Spring Kafka with Spring Boot Tutorial
Introduction
Spring Kafka is a project that adds support for the Apache Kafka messaging system to Spring applications. It provides a higher-level abstraction over the Kafka client API, making it easier to work with Kafka producers and consumers.
In this tutorial, we will create a Spring Boot application that demonstrates how to produce and consume messages using Spring Kafka.
Prerequisites
Before we start, you should have the following installed:
- Java Development Kit (JDK) 8 or higher
- Apache Kafka (make sure you have a running broker; see the commands after this list if you need to start one locally)
- Maven or Gradle for dependency management
- Spring Boot CLI (optional, but helpful)
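If you do not already have a broker running, the commands below show a minimal local setup. They assume a recent Apache Kafka distribution extracted to a local directory and are run from that directory; adjust the paths to match your installation.
# Terminal 1: start ZooKeeper (not needed if your Kafka version runs in KRaft mode)
bin/zookeeper-server-start.sh config/zookeeper.properties
# Terminal 2: start the Kafka broker
bin/kafka-server-start.sh config/server.properties
# Terminal 3: create the topic used throughout this tutorial
bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1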
Setting Up a Spring Boot Project
You can create a Spring Boot project using Spring Initializr. Go to start.spring.io and select the following options:
- Project: Maven Project
- Language: Java
- Spring Boot: 2.5.0 (or latest)
- Dependencies: Spring Web, Spring for Apache Kafka
Click on "Generate" to download your project. Extract it and open it in your favorite IDE.
Configuration
Open the application.properties file located in src/main/resources and add the following properties to configure Kafka:
application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
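The Java configuration classes in the next sections set the serializers and deserializers explicitly so that the options are visible in code. If you prefer to keep everything in application.properties instead, Spring Boot exposes equivalent properties; for reference, those entries would be:
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
Either approach works; this tutorial uses the Java configuration shown below.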
Creating the Producer
Create a new class KafkaProducerConfig.java under src/main/java/com/example/demo/config to configure the Kafka producer:
KafkaProducerConfig.java
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    // Creates producer instances with String keys and String values
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    // The template used by application code to send messages
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
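The producer assumes that the topic my-topic already exists. As an optional addition (not part of the original setup), Spring Boot's auto-configured KafkaAdmin creates any NewTopic beans it finds on startup, so you can declare the topic alongside the producer configuration. A minimal sketch, assuming Spring Kafka 2.3 or later for TopicBuilder; add the bean to KafkaProducerConfig (or any @Configuration class):

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.kafka.config.TopicBuilder;

// Created automatically on startup if it does not already exist
@Bean
public NewTopic myTopic() {
    return TopicBuilder.name("my-topic").partitions(1).replicas(1).build();
}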
Creating the Consumer
Create a new class KafkaConsumerConfig.java under src/main/java/com/example/demo/config to configure the Kafka consumer:
KafkaConsumerConfig.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    // Creates consumer instances with String keys and String values
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    // Container factory used by @KafkaListener methods
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    // Invoked for every record published to my-topic
    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(String message) {
        System.out.println("Received Message: " + message);
    }
}
Sending Messages
Create a new REST controller MessageController.java to send messages to Kafka:
MessageController.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Publishes the request body to the my-topic topic
    @PostMapping("/send")
    public String sendMessage(@RequestBody String message) {
        kafkaTemplate.send("my-topic", message);
        return "Message sent to Kafka: " + message;
    }
}
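Note that send() is asynchronous: it returns immediately and the record is delivered in the background. If you want to log whether delivery actually succeeded, you can attach a callback to the returned future. This is an optional variation of the handler above, assuming Spring Kafka 2.x, where send() returns a ListenableFuture (in 3.x it returns a CompletableFuture, so you would use whenComplete instead):

@PostMapping("/send")
public String sendMessage(@RequestBody String message) {
    // The callbacks run once the broker acknowledges or rejects the record
    kafkaTemplate.send("my-topic", message).addCallback(
            result -> System.out.println("Delivered at offset " + result.getRecordMetadata().offset()),
            ex -> System.err.println("Delivery failed: " + ex.getMessage()));
    return "Message sent to Kafka: " + message;
}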
Running the Application
Now that we have everything set up, let’s run the application. Use the following command to start your Spring Boot application:
mvn spring-boot:run
You can send a message to Kafka by making a POST request to http://localhost:8080/send with a message body. You can use tools like Postman or curl for this.
curl -X POST http://localhost:8080/send -H "Content-Type: text/plain" -d "Hello Kafka!"
Conclusion
In this tutorial, we demonstrated how to set up a Spring Boot application with Spring Kafka. We created a producer to send messages and a consumer to receive messages from a Kafka topic. This basic setup can be expanded with more features such as error handling, message serialization, and much more.
For further learning, consider exploring more advanced Kafka features and Spring Kafka's capabilities.