Core Concepts: Messages and Batches in Kafka
Introduction to Messages and Batches
In Kafka, messages and batches are the fundamental units of data transfer. Understanding how they work is essential for optimizing throughput and ensuring efficient communication between producers and consumers.
What are Kafka Messages?
A message in Kafka is a unit of data written to and read from Kafka topics. Each message consists of an optional key, a value, and metadata including headers and a timestamp.
Message Structure
Kafka messages have the following structure:
- Key: An optional attribute used for partitioning and grouping messages.
- Value: The main content of the message, which is the actual data being transmitted.
- Headers: Optional metadata that can be used to provide additional information about the message.
- Timestamp: The time when the message was produced.
A Kafka message representing a user login event (rendered here as JSON for readability; on the wire, keys and values are byte arrays):
{
  "key": "user123",
  "value": {
    "event": "login",
    "timestamp": "2024-07-05T12:34:56Z"
  },
  "headers": {
    "source": "webapp",
    "eventType": "userEvent"
  }
}
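The same four fields can be modeled in plain Java to make the structure concrete. This is a sketch for illustration only; the class and field names are ours, not the Kafka client API's:

```java
import java.util.Map;

public class MessageSketch {
    // Illustrative model of the four message fields (not a Kafka client class).
    record Message(String key, String value, Map<String, String> headers, long timestamp) {}

    public static void main(String[] args) {
        Message m = new Message(
                "user123",
                "{\"event\":\"login\",\"timestamp\":\"2024-07-05T12:34:56Z\"}",
                Map.of("source", "webapp", "eventType", "userEvent"),
                System.currentTimeMillis());
        // The key determines the partition; the value carries the payload.
        System.out.println(m.key() + " -> " + m.value());
    }
}
```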
Producing Messages
Producers are responsible for creating and sending messages to Kafka topics. Here is an example of producing a message using KafkaProducer in Java:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("user_events", "user123", "{\"event\":\"login\",\"timestamp\":\"2024-07-05T12:34:56Z\"}"));
producer.close();
What are Kafka Batches?
Batches in Kafka are collections of messages that are sent together to improve efficiency. By batching messages, Kafka reduces the overhead associated with network communication and improves throughput.
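A back-of-envelope calculation shows why this pays off. The overhead figures below are made-up, illustrative numbers, not measurements of any real cluster:

```java
public class BatchingMath {
    public static void main(String[] args) {
        // Assumed (illustrative) costs: a fixed 1.0 ms of per-request
        // overhead plus 0.01 ms of transfer time per message.
        double perRequestMs = 1.0;
        double perMessageMs = 0.01;
        int messages = 10_000;
        int batchSize = 100; // messages per batch

        // One request per message pays the fixed overhead every time.
        double oneByOne = messages * (perRequestMs + perMessageMs);
        // Batching pays the fixed overhead once per batch.
        double batched = (messages / (double) batchSize) * perRequestMs
                + messages * perMessageMs;

        // prints: one-by-one: 10100 ms, batched: 200 ms
        System.out.printf("one-by-one: %.0f ms, batched: %.0f ms%n", oneByOne, batched);
    }
}
```

The per-message cost is unchanged; only the fixed per-request overhead is amortized, which is exactly what Kafka's producer batching does.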
Batching Messages
When producing messages, KafkaProducer can be configured to batch messages before sending them to the broker. This is controlled by the batch.size and linger.ms configurations: batch.size caps the size of a batch in bytes, and linger.ms is how long the producer waits for more records before sending a partially full batch.
Configuring KafkaProducer to batch messages with a batch size of 16 KB and a linger time of 10 milliseconds:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("batch.size", 16384); // Batch size in bytes
props.put("linger.ms", 10); // Linger time in milliseconds
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("user_events", "user123", "{\"event\":\"login\",\"timestamp\":\"2024-07-05T12:34:56Z\"}"));
producer.close();
Consuming Messages
Consumers read messages from Kafka topics, and they can also process messages in batches to improve efficiency. Here is an example of consuming messages using KafkaConsumer in Java:
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.util.Properties;
import java.util.Collections;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "user_events_group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user_events"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
Batch Processing with Consumers
Consumers can process messages in batches to improve performance. This can be controlled by configuring the fetch.min.bytes and fetch.max.wait.ms properties: the broker waits until at least fetch.min.bytes of data is available, or fetch.max.wait.ms has elapsed, before answering a fetch request.
Configuring KafkaConsumer to fetch a minimum of 1 KB of data and wait a maximum of 500 milliseconds:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "user_events_group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("fetch.min.bytes", 1024); // Minimum amount of data the server should return for a fetch request
props.put("fetch.max.wait.ms", 500); // Maximum amount of time the server will block before answering the fetch request
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user_events"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
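Once a batch of records arrives, a common way to exploit it is to group the records by key before processing, so that, for example, all events for one user trigger a single downstream write. The sketch below uses plain Java with made-up records standing in for one poll() result:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchGrouping {
    // Group a batch of (key, value) pairs by key, preserving arrival order.
    static Map<String, List<String>> groupByKey(List<Map.Entry<String, String>> batch) {
        Map<String, List<String>> byKey = new LinkedHashMap<>();
        for (Map.Entry<String, String> rec : batch) {
            byKey.computeIfAbsent(rec.getKey(), k -> new ArrayList<>()).add(rec.getValue());
        }
        return byKey;
    }

    public static void main(String[] args) {
        // Illustrative data standing in for one fetched batch.
        List<Map.Entry<String, String>> batch = List.of(
                Map.entry("user123", "login"),
                Map.entry("user456", "login"),
                Map.entry("user123", "logout"));
        // prints: {user123=[login, logout], user456=[login]}
        System.out.println(groupByKey(batch));
    }
}
```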
Message Compression
Kafka supports message compression to reduce the size of data on the wire and improve throughput. Producers can be configured via the compression.type setting to use algorithms such as gzip, snappy, lz4, or zstd. Compression is applied to whole batches, so larger batches typically compress better.
Configuring KafkaProducer to use gzip compression:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("compression.type", "gzip"); // Use gzip compression
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("user_events", "user123", "{\"event\":\"login\",\"timestamp\":\"2024-07-05T12:34:56Z\"}"));
producer.close();
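To see why compressing whole batches of similar JSON events pays off, the standalone sketch below gzips 100 copies of the login event in memory using only the JDK (this illustrates the general size reduction, not Kafka's internal record framing):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class CompressionDemo {
    // gzip-compress a byte array in memory and return the compressed size.
    static int gzipSize(byte[] raw) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(buf)) {
            gz.write(raw);
        }
        return buf.size();
    }

    public static void main(String[] args) throws IOException {
        // 100 copies of a repetitive JSON event, similar to a producer batch.
        String payload = "{\"event\":\"login\",\"timestamp\":\"2024-07-05T12:34:56Z\"}".repeat(100);
        byte[] raw = payload.getBytes(StandardCharsets.UTF_8);
        // Repetitive payloads compress very well; the exact ratio varies.
        System.out.printf("raw: %d bytes, gzipped: %d bytes%n", raw.length, gzipSize(raw));
    }
}
```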
Conclusion
In this tutorial, we've covered the core concepts of Kafka messages and batches, including how to produce and consume messages efficiently. Understanding these concepts is essential for optimizing Kafka-based data pipelines and ensuring high throughput.