Java Tutorial - Kafka Integration
Introduction
Kafka is a distributed streaming platform that is used for building real-time data pipelines and streaming applications. This tutorial will guide you through integrating Kafka with a Java application. We will cover the basics of setting up Kafka, producing and consuming messages using Java, and running a simple example.
Prerequisites
Before we start, ensure you have the following installed:
- Java Development Kit (JDK) 8 or higher
- Apache Kafka
- Maven
Setting Up Kafka
Download and extract Kafka from the official Apache Kafka Downloads page. Start the ZooKeeper server and Kafka server using the following commands:
bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties
Creating a Maven Project
Create a new Maven project using your IDE or the command line. Add the following dependencies to your pom.xml
:
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency> </dependencies>
Producing Messages
Create a Java class named KafkaProducerExample
to produce messages to a Kafka topic:
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class KafkaProducerExample { public static void main(String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducerproducer = new KafkaProducer<>(properties); ProducerRecord record = new ProducerRecord<>("my-topic", "key", "value"); producer.send(record); producer.close(); } }
Consuming Messages
Create a Java class named KafkaConsumerExample
to consume messages from a Kafka topic:
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KafkaConsumerconsumer = new KafkaConsumer<>(properties); consumer.subscribe(Collections.singletonList("my-topic")); while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value()); } } } }
Running the Example
Compile and run the KafkaProducerExample
class to produce a message. Then, compile and run the KafkaConsumerExample
class to consume the message. You should see the consumed message printed in the console.
Conclusion
In this tutorial, we covered the basics of integrating Kafka with a Java application. We set up Kafka, produced and consumed messages using Java, and ran a simple example. Kafka is a powerful tool for building real-time data pipelines and streaming applications, and integrating it with Java can help you leverage its capabilities for your projects.