Core Concepts: Producers and Consumers in Kafka
Introduction to Producers and Consumers
In Kafka, producers and consumers are the client-side components that write data to and read data from Kafka topics. Understanding how they work is essential for designing efficient data pipelines and streaming applications.
What are Producers?
Producers are the entities that publish data to one or more Kafka topics. They are responsible for creating and sending messages to Kafka brokers, which then store the messages in the appropriate topic partitions.
Creating a Producer
To create a producer in Kafka, you typically use the KafkaProducer class provided by the Kafka client library. Here is an example in Java:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Create a producer that uses String keys and values
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Send one record to the topic, then release the producer's resources
producer.send(new ProducerRecord<>("my_topic", "key", "value"));
producer.close();
Creating a producer that sends messages to the user_logs topic:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// The record key "user123" routes every event for that user to the same partition
producer.send(new ProducerRecord<>("user_logs", "user123", "login"));
producer.close();
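Note that send() is asynchronous: it hands the record to a background I/O thread and returns immediately. To learn whether a record actually reached the broker, you can pass a callback as the second argument to send(). The following is a minimal sketch, reusing the user_logs producer configured above:

// Send asynchronously and handle the outcome when the broker responds.
// On success the callback receives the record's metadata; on failure, the exception.
producer.send(new ProducerRecord<>("user_logs", "user123", "login"), (metadata, exception) -> {
    if (exception != null) {
        System.err.println("Send failed: " + exception.getMessage());
    } else {
        System.out.printf("Written to partition %d at offset %d%n", metadata.partition(), metadata.offset());
    }
});
// close() flushes any records still in flight before shutting down
producer.close();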
What are Consumers?
Consumers are the entities that subscribe to Kafka topics and process the feed of published messages. They pull data from Kafka brokers and can be part of a consumer group to enable parallel processing of data.
Creating a Consumer
To create a consumer in Kafka, you typically use the KafkaConsumer class provided by the Kafka client library. Here is an example in Java:
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.util.Properties;
import java.util.Collections;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my_group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Create the consumer and subscribe to the topic
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my_topic"));

// Poll the broker in a loop and print each record
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
Creating a consumer that reads messages from the user_logs topic:
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.util.Properties;
import java.util.Collections;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "user_log_group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Subscribe to the user_logs topic as part of the user_log_group consumer group
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user_logs"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
Consumer Groups
Consumer groups allow multiple consumers to work together to process data from a topic. Each consumer in a group reads data from a subset of the partitions in the topic, enabling parallel processing.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my_group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Both consumers share the same group.id, so Kafka assigns each of them
// a subset of the topic's partitions
KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(props);
KafkaConsumer<String, String> consumer2 = new KafkaConsumer<>(props);
consumer1.subscribe(Collections.singletonList("my_topic"));
consumer2.subscribe(Collections.singletonList("my_topic"));
Creating two consumers in the same group to read from the user_logs topic:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "user_log_group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Two members of user_log_group; the user_logs partitions are split between them
KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(props);
KafkaConsumer<String, String> consumer2 = new KafkaConsumer<>(props);
consumer1.subscribe(Collections.singletonList("user_logs"));
consumer2.subscribe(Collections.singletonList("user_logs"));
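KafkaConsumer is not thread-safe, so in practice each member of a group runs its poll loop in its own thread (or its own process). The sketch below shows one way to do that; the pollLoop helper and the use of an ExecutorService are illustrative choices rather than part of the Kafka API, and it assumes consumer1 and consumer2 from the snippet above:

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Give each group member its own thread, since a KafkaConsumer instance
// must not be shared between threads
ExecutorService pool = Executors.newFixedThreadPool(2);
pool.submit(() -> pollLoop(consumer1));
pool.submit(() -> pollLoop(consumer2));

// Illustrative helper: each thread polls its own consumer forever
static void pollLoop(KafkaConsumer<String, String> consumer) {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("partition = %d, offset = %d, value = %s%n",
                    record.partition(), record.offset(), record.value());
        }
    }
}

If user_logs has four partitions, for example, each consumer is assigned two of them; adding more consumers to the group than there are partitions leaves the extra consumers idle.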
Offset Management
Offsets are sequential identifiers that uniquely mark each record's position within a partition. Consumers use offsets to keep track of which records have already been processed. By default, Kafka commits offsets automatically at a regular interval (controlled by the enable.auto.commit and auto.commit.interval.ms settings); if you set enable.auto.commit to false, you decide exactly when offsets are committed by calling commitSync() or commitAsync().
consumer.commitSync();
Manually committing offsets after processing records (this assumes enable.auto.commit has been set to "false" in the consumer properties):
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    // Commit the offsets returned by the last poll() only after every record
    // in the batch has been processed
    consumer.commitSync();
}
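commitSync() blocks until the broker acknowledges the commit. When that per-batch latency matters, commitAsync() commits in the background and can report the result through an optional callback. A sketch of the same loop using an asynchronous commit:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    // Commit in the background; the callback reports the committed offsets,
    // or the exception if the commit failed
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            System.err.println("Commit failed for " + offsets + ": " + exception.getMessage());
        }
    });
}

A common pattern is to use commitAsync() inside the loop for throughput and a final commitSync() during shutdown so the last processed offsets are reliably recorded.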
Conclusion
In this tutorial, we've covered the core concepts of Kafka producers and consumers: how to create them, how consumer groups enable parallel processing, and how offsets are managed. Understanding these concepts is essential for building scalable and efficient Kafka-based applications.