
Implementing a Kafka Producer and Consumer using Spring Cloud Stream

Spring Cloud Stream is a framework built on Spring Boot that provides a high-level abstraction for building event-driven microservices. It makes it easy to produce and consume messages by binding your code to a messaging system through a pluggable "binder"; binder implementations exist for several messaging technologies, such as Apache Kafka, RabbitMQ, and Google Cloud Pub/Sub. This article uses the Kafka binder.

Implementing a Kafka Producer

To implement a Kafka producer using Spring Cloud Stream, you will need to:

  1. Add the following dependency to your pom.xml file:
XML
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
  2. Create a Spring Boot application. No Kafka-specific annotation is required; Spring Boot auto-configures the Kafka binder when it is on the classpath.
Java
@SpringBootApplication
public class KafkaProducerApp {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApp.class, args);
    }
}
  3. Create a producer bean that returns a java.util.function.Supplier. Spring Cloud Stream polls the Supplier (by default once per second) and publishes each value it returns to the bound Kafka topic.
Java
@Bean
public Supplier<String> produceMessage() {
    return () -> {
        String message = "Hello, world!";
        System.out.println("Sending message: " + message);
        return message; // Spring Cloud Stream sends the returned value to Kafka
    };
}
  4. Start the application; Spring Cloud Stream will begin polling produceMessage() and sending its output to the Kafka topic.
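Spring Cloud Stream maps functional beans to topics through binding properties. Assuming a producer bean named produceMessage as above, a minimal application.yml might look like this (the binding name produceMessage-out-0 follows Spring Cloud Stream's default <beanName>-out-0 naming convention; the topic name my-topic is an example):

```yaml
spring:
  cloud:
    function:
      definition: produceMessage   # which functional bean to bind
    stream:
      bindings:
        produceMessage-out-0:      # output binding of the Supplier bean
          destination: my-topic    # Kafka topic to publish to
```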

Implementing a Kafka Consumer

To implement a Kafka consumer using Spring Cloud Stream, you will need to:

  1. Add the following dependency to your pom.xml file:
XML
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
  2. Create a Spring Boot application. As with the producer, no Kafka-specific annotation is required.
Java
@SpringBootApplication
public class KafkaConsumerApp {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApp.class, args);
    }
}
  3. Create a consumer bean that returns a java.util.function.Consumer. Spring Cloud Stream invokes it for every message that arrives on the bound Kafka topic.
Java
@Bean
public Consumer<String> consumeMessage() {
    return message -> {
        System.out.println("Received message: " + message);
        // Process the message
    };
}
  4. Start the application and the consumer will start listening for messages on the Kafka topic.
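The consumer side is configured the same way. Assuming a consumer bean named consumeMessage as above, a minimal application.yml might look like this (consumeMessage-in-0 follows the default <beanName>-in-0 convention; the topic and group names are examples):

```yaml
spring:
  cloud:
    function:
      definition: consumeMessage   # which functional bean to bind
    stream:
      bindings:
        consumeMessage-in-0:       # input binding of the Consumer bean
          destination: my-topic    # Kafka topic to subscribe to
          group: my-group          # consumer group for load-balanced delivery
```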

Example

The following example shows how to implement a Kafka producer and consumer using Spring Cloud Stream:

Java
// Producer
import java.util.function.Supplier;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class KafkaProducerApp {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApp.class, args);
    }

    @Bean
    public Supplier<String> produceMessage() {
        return () -> {
            String message = "Hello, world!";
            System.out.println("Sending message: " + message);
            return message;
        };
    }
}

Java
// Consumer
import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class KafkaConsumerApp {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApp.class, args);
    }

    @Bean
    public Consumer<String> consumeMessage() {
        return message -> {
            System.out.println("Received message: " + message);
            // Process the message
        };
    }
}

To start the producer and the consumer, run the following command in each project:

mvn spring-boot:run

Once both applications are running, the Supplier-based producer publishes to the topic automatically; no manual trigger is needed. (If you want to send messages on demand instead, for example from an HTTP endpoint, you can inject Spring Cloud Stream's StreamBridge and call its send() method rather than defining a Supplier.)

The consumer will receive the message and print it to the console:

Received message: Hello, world!
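Stripped of Spring and Kafka, the functional model used above is plain java.util.function composition: a Supplier produces messages and a Consumer handles them, with the binder moving values between the two through a topic. The following self-contained sketch (hypothetical class and method names, no Spring involved) shows that hand-off directly:

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

public class FunctionalModelSketch {

    // Stands in for the producer's @Bean method: supplies one message per poll.
    static Supplier<String> produceMessage() {
        return () -> "Hello, world!";
    }

    // Stands in for the consumer's @Bean method: handles each delivered message.
    static Consumer<String> consumeMessage() {
        return message -> System.out.println("Received message: " + message);
    }

    public static void main(String[] args) {
        // Spring Cloud Stream's binder performs this hand-off through a Kafka
        // topic; here we simply connect the two functions in-process.
        String message = produceMessage().get();
        consumeMessage().accept(message);  // prints "Received message: Hello, world!"
    }
}
```

In the real applications, neither side calls the other: each binds only to the topic name, which is what lets the producer and consumer be deployed and scaled independently.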
