
Core Concepts: Messages and Batches in Kafka

Introduction to Messages and Batches

In Kafka, messages and batches are the fundamental units of data processing and transfer. Understanding how they work is essential for optimizing throughput and ensuring efficient communication between producers and consumers.

What are Kafka Messages?

A message in Kafka is the unit of data that is written to and read from Kafka topics. Each message consists of an optional key, a value, and metadata such as headers and a timestamp.

Message Structure

Kafka messages have the following structure:

  • Key: An optional attribute used for partitioning; with the default partitioner, messages that share the same key are always written to the same partition.
  • Value: The main content of the message, i.e., the actual data being transmitted.
  • Headers: Optional key-value metadata that provides additional information about the message.
  • Timestamp: The time the message was produced or, depending on topic configuration, the time the broker appended it to the log.
Example:

A Kafka message representing a user login event:


{
  "key": "user123",
  "value": {
    "event": "login",
    "timestamp": "2024-07-05T12:34:56Z"
  },
  "headers": {
    "source": "webapp",
    "eventType": "userEvent"
  }
}
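The JSON above is a conceptual view of a record. In the Java producer API, headers are attached to a ProducerRecord as raw bytes; a minimal sketch mirroring the headers in the example:

import org.apache.kafka.clients.producer.ProducerRecord;
import java.nio.charset.StandardCharsets;

ProducerRecord<String, String> record = new ProducerRecord<>(
        "user_events", "user123", "{\"event\":\"login\",\"timestamp\":\"2024-07-05T12:34:56Z\"}");
// Header values are byte arrays; serialize them however your consumers expect
record.headers().add("source", "webapp".getBytes(StandardCharsets.UTF_8));
record.headers().add("eventType", "userEvent".getBytes(StandardCharsets.UTF_8));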
        

Producing Messages

Producers are responsible for creating and sending messages to Kafka topics. Here is an example of producing a message using KafkaProducer in Java:


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Typed producer: String keys, String values
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("user_events", "user123", "{\"event\":\"login\",\"timestamp\":\"2024-07-05T12:34:56Z\"}"));
producer.close();
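Note that send() is asynchronous: it appends the record to an in-memory buffer and returns a Future immediately, which is what makes batching possible. To observe the delivery result, you can pass a callback; a minimal sketch:

producer.send(
        new ProducerRecord<>("user_events", "user123", "{\"event\":\"login\"}"),
        (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace(); // Delivery failed after any retries
            } else {
                System.out.printf("delivered to partition %d at offset %d%n",
                        metadata.partition(), metadata.offset());
            }
        });
producer.flush(); // Block until all buffered records have been sent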
    

What are Kafka Batches?

A batch in Kafka is a collection of messages accumulated for the same topic partition and sent to the broker in a single request. By batching messages, Kafka amortizes the per-request network overhead and improves throughput.

Batching Messages

When producing messages, KafkaProducer can be configured to accumulate messages into batches before sending them to the broker. This behavior is controlled by the batch.size and linger.ms settings: a batch is sent when it reaches batch.size bytes or when linger.ms milliseconds have elapsed, whichever comes first. The example below configures a batch size of 16 KB and a linger time of 10 milliseconds:


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("batch.size", 16384); // Maximum batch size in bytes (16 KB)
props.put("linger.ms", 10);     // Wait up to 10 ms for more records before sending a batch

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("user_events", "user123", "{\"event\":\"login\",\"timestamp\":\"2024-07-05T12:34:56Z\"}"));
producer.close();
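One way to check whether batching is taking effect is to read the producer's built-in metrics before closing it. A sketch that prints the average batch size, assuming the batch-size-avg metric from the producer metrics group:

// Print the average size (in bytes) of batches actually sent; run before producer.close()
producer.metrics().forEach((name, metric) -> {
    if ("batch-size-avg".equals(name.name())) {
        System.out.println("batch-size-avg: " + metric.metricValue());
    }
});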
    

Consuming Messages

Consumers read messages from Kafka topics, and each call to poll() returns records in batches. Here is an example of consuming messages using KafkaConsumer in Java:


import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.util.Properties;
import java.util.Collections;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "user_events_group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user_events"));

while (true) {
    // Poll returns all records fetched since the last call, waiting up to 100 ms
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
    

Batch Processing with Consumers

Consumers fetch messages in batches, and the size of each fetch can be tuned with the fetch.min.bytes and fetch.max.wait.ms properties. The example below configures the consumer to fetch a minimum of 1 KB of data per request, waiting at most 500 milliseconds for that much data to accumulate:


import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.util.Properties;
import java.util.Collections;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "user_events_group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("fetch.min.bytes", 1024);   // Broker returns at least 1 KB of data per fetch (or times out)
props.put("fetch.max.wait.ms", 500);  // Broker waits at most 500 ms to accumulate fetch.min.bytes

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user_events"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
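Each call to poll() already returns a batch of records. For batch-oriented processing, a common pattern is to group the records by partition and commit offsets only after a partition's batch has been fully handled. A minimal sketch of this pattern, assuming enable.auto.commit is set to false and a hypothetical process() handler:

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.List;

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (TopicPartition partition : records.partitions()) {
        List<ConsumerRecord<String, String>> batch = records.records(partition);
        for (ConsumerRecord<String, String> record : batch) {
            process(record); // Hypothetical per-record handler
        }
        // Commit the offset just past the last record in this partition's batch
        long lastOffset = batch.get(batch.size() - 1).offset();
        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
    }
}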
    

Message Compression

Kafka supports message compression to reduce the amount of data sent over the network and stored on disk. Producers can be configured, via the compression.type setting, to compress each batch with gzip, snappy, lz4, or zstd. Because compression is applied per batch, larger batches generally achieve better compression ratios. The example below configures the producer to use gzip:


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("compression.type", "gzip"); // Compress each batch with gzip before sending

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("user_events", "user123", "{\"event\":\"login\",\"timestamp\":\"2024-07-05T12:34:56Z\"}"));
producer.close();
    

Conclusion

In this tutorial, we've covered the core concepts of Kafka messages and batches, including how to produce and consume messages efficiently. Understanding these concepts is essential for optimizing Kafka-based data pipelines and ensuring high throughput.