Core Concepts: Offsets in Kafka

Introduction to Offsets

Offsets are a fundamental concept in Kafka, used to uniquely identify each message within a partition. Understanding how offsets work is crucial for managing data consumption, ensuring data integrity, and handling message processing efficiently.

What is an Offset?

An offset is an identifier assigned to each message within a partition. Offsets are sequential: they increase monotonically as new messages are appended to the partition, which lets a consumer track its position with a single number. Offsets are unique only within a partition, so offset 5 in partition 0 and offset 5 in partition 1 are different messages.

Offset Structure

The offset is simply an integer (a long in the Java client) that represents the position of a message in a partition.

Example:

If a partition contains the following messages:

0 => {"user": "Alice", "action": "login"}
1 => {"user": "Bob", "action": "logout"}
2 => {"user": "Charlie", "action": "login"}
        

Here, 0, 1, and 2 are the offsets of the messages.
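To make the numbering concrete, here is a minimal in-memory sketch of a single partition. The class ToyPartition and its methods are hypothetical: they only model the idea that each appended message receives the next offset, and they are not part of the Kafka client API.

```java
import java.util.ArrayList;
import java.util.List;

// Toy, in-memory model of one partition (hypothetical class, not a Kafka API).
// Each appended message receives the next offset, starting at 0.
class ToyPartition {
    private final List<String> log = new ArrayList<>();

    // Appends a message and returns the offset assigned to it.
    long append(String message) {
        log.add(message);
        return log.size() - 1;
    }

    // Returns the message stored at the given offset.
    String read(long offset) {
        return log.get((int) offset);
    }

    // Offset the next appended message will receive (Kafka calls this the log end offset).
    long logEndOffset() {
        return log.size();
    }

    public static void main(String[] args) {
        ToyPartition p = new ToyPartition();
        long a = p.append("{\"user\": \"Alice\", \"action\": \"login\"}");
        long b = p.append("{\"user\": \"Bob\", \"action\": \"logout\"}");
        long c = p.append("{\"user\": \"Charlie\", \"action\": \"login\"}");
        System.out.println(a + " " + b + " " + c); // prints "0 1 2"
        System.out.println(p.read(1));             // prints the Bob message
        System.out.println(p.logEndOffset());      // prints "3"
    }
}
```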

Managing Offsets

Consumers use offsets to keep track of which messages have been processed. Kafka provides several ways to manage offsets:

  • Automatic Offset Committing
  • Manual Offset Committing
  • Storing Offsets in Kafka

Automatic Offset Committing

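With automatic offset committing, the consumer commits offsets on the application's behalf at regular intervals. This is controlled by the enable.auto.commit and auto.commit.interval.ms properties. In the Java client these commits are made from within poll() and cover the records returned by earlier poll() calls, so records processed after the last commit may be delivered again if the consumer crashes.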


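import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my_group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "true"); // Enable automatic offset committing
props.put("auto.commit.interval.ms", "1000"); // Commit offsets every 1000 milliseconds

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my_topic"));

while (true) {
    // poll(Duration) replaces the deprecated poll(long); auto-commits are triggered from inside poll()
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}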
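The sketch below is a toy model, assuming the Java consumer's behavior in which auto-commits are triggered from inside poll() and store the position reached by previously returned records. AutoCommitModel is a hypothetical class driven by simulated timestamps, not a Kafka API. It shows why records handed out after the last commit can be delivered again after a crash.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical class, not part of the Kafka client) of time-based auto-commit.
// It mirrors two properties of the Java consumer: commits only happen inside poll(),
// and a commit records the position reached by the records that earlier polls returned.
class AutoCommitModel {
    private final long intervalMs;   // plays the role of auto.commit.interval.ms
    private long lastCommitMs;
    private long position = 0;       // offset of the next record poll() will hand out
    private long committed = 0;      // offset a restarted consumer would resume from

    AutoCommitModel(long intervalMs, long startMs) {
        this.intervalMs = intervalMs;
        this.lastCommitMs = startMs;
    }

    // Auto-commits if the interval has elapsed, then returns the next batchSize offsets.
    List<Long> poll(long nowMs, int batchSize) {
        if (nowMs - lastCommitMs >= intervalMs) {
            committed = position;
            lastCommitMs = nowMs;
        }
        List<Long> batch = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            batch.add(position++);
        }
        return batch;
    }

    long committed() {
        return committed;
    }

    public static void main(String[] args) {
        AutoCommitModel consumer = new AutoCommitModel(1000, 0);
        consumer.poll(0, 3);    // returns offsets 0-2, interval not yet elapsed
        consumer.poll(500, 3);  // returns offsets 3-5, still no commit
        consumer.poll(1200, 3); // commits 6 (covering offsets 0-5), then returns 6-8
        // If the process crashes now, a restart resumes at the committed offset,
        // so offsets 6-8 are delivered again.
        System.out.println("restart resumes at " + consumer.committed()); // prints "restart resumes at 6"
    }
}
```

Shortening auto.commit.interval.ms narrows the window of redelivered records but does not close it, which is why auto-commit with processing done inside the poll loop is usually described as at-least-once.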
    

Manual Offset Committing

With manual offset committing, the application explicitly commits offsets after processing messages. This provides more control over when offsets are committed.


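import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my_group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false"); // Disable automatic offset committing

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my_topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    // Commit only after the whole batch is processed; commitSync() commits the positions
    // reached by the last poll() and blocks until the broker acknowledges the commit
    consumer.commitSync(); // Manually commit offsets
}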
    

Storing Offsets in Kafka

Kafka stores offsets in a special topic called __consumer_offsets. This allows offsets to be managed and replicated like any other data in Kafka, providing fault tolerance and durability.

Example:

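Conceptually, each entry in the __consumer_offsets topic records the committed offset of one consumer group for one topic partition. The actual records use a binary format; the JSON below is a simplified illustration:

my_topic, partition 0:
{"group": "my_group", "topic": "my_topic", "partition": 0, "offset": 123}

my_topic, partition 1:
{"group": "my_group", "topic": "my_topic", "partition": 1, "offset": 456}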
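A minimal model of this "latest committed offset per (group, topic, partition)" view, assuming nothing about the real binary record format. OffsetStore is a hypothetical class; the map's overwrite-on-put behavior stands in for log compaction of __consumer_offsets, which retains only the latest record per key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Toy stand-in (hypothetical class) for the committed offsets kept in __consumer_offsets.
// A map that overwrites on each commit models the "latest value per key wins" view.
class OffsetStore {
    // One entry per (group, topic, partition).
    static final class Key {
        final String group;
        final String topic;
        final int partition;

        Key(String group, String topic, int partition) {
            this.group = group;
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) return false;
            Key k = (Key) o;
            return partition == k.partition && group.equals(k.group) && topic.equals(k.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(group, topic, partition);
        }
    }

    private final Map<Key, Long> committed = new HashMap<>();

    void commit(String group, String topic, int partition, long offset) {
        committed.put(new Key(group, topic, partition), offset);
    }

    // Returns null when the group has never committed an offset for this partition.
    Long fetch(String group, String topic, int partition) {
        return committed.get(new Key(group, topic, partition));
    }

    public static void main(String[] args) {
        OffsetStore store = new OffsetStore();
        store.commit("my_group", "my_topic", 0, 100);
        store.commit("my_group", "my_topic", 0, 123); // a later commit replaces the earlier one
        store.commit("my_group", "my_topic", 1, 456);
        System.out.println(store.fetch("my_group", "my_topic", 0));    // prints 123
        System.out.println(store.fetch("my_group", "my_topic", 1));    // prints 456
        System.out.println(store.fetch("other_group", "my_topic", 0)); // prints null
    }
}
```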
        

Rebalancing and Offset Management

When a consumer group is rebalanced (e.g., when a consumer joins or leaves the group), Kafka redistributes partitions among the group's consumers. During this process, each consumer uses the committed offsets to resume processing its newly assigned partitions from the correct position.


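import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Assumes consumer is a KafkaConsumer<String, String> created with enable.auto.commit=false
consumer.subscribe(Collections.singletonList("my_topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync(); // Commit processed offsets before the partitions move to another consumer
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // The consumer resumes from committed offsets by default; the explicit seek makes this visible
        Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(new HashSet<>(partitions));
        for (TopicPartition partition : partitions) {
            OffsetAndMetadata offsetAndMetadata = committed.get(partition);
            if (offsetAndMetadata != null) { // null: nothing committed yet, so auto.offset.reset applies
                consumer.seek(partition, offsetAndMetadata.offset());
            }
        }
    }
});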
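The following toy model illustrates a rebalance: partitions are redistributed when a second consumer joins, and the new owner of each partition resumes from the group's committed offset. The helpers assign and resumeFrom are hypothetical; the round-robin strategy is a simplification (Kafka ships several assignment strategies), and the committed offsets are made-up values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy rebalance model (hypothetical helpers, not Kafka's assignor implementation).
class RebalanceModel {
    // Spreads partitions 0..numPartitions-1 over the consumers in round-robin order.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new TreeMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(consumers.get(p % consumers.size())).add(p);
        }
        return assignment;
    }

    // A newly assigned owner resumes at the group's committed offset; 0 stands in for
    // auto.offset.reset=earliest on a log that still starts at offset 0 (an assumption).
    static long resumeFrom(Map<Integer, Long> committed, int partition) {
        return committed.getOrDefault(partition, 0L);
    }

    public static void main(String[] args) {
        // Offsets committed while c1 owned all four partitions (made-up values).
        Map<Integer, Long> committed = new HashMap<>();
        committed.put(0, 120L);
        committed.put(1, 45L);
        committed.put(2, 300L);
        // partition 3 has no committed offset yet

        // c2 joins, so the partitions are redistributed between c1 and c2.
        Map<String, List<Integer>> assignment = assign(Arrays.asList("c1", "c2"), 4);
        System.out.println(assignment); // prints {c1=[0, 2], c2=[1, 3]}
        for (int partition : assignment.get("c2")) {
            System.out.println("c2 resumes partition " + partition + " at offset "
                    + resumeFrom(committed, partition));
        }
        // prints "c2 resumes partition 1 at offset 45" and "c2 resumes partition 3 at offset 0"
    }
}
```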
    

Offset Reset Policies

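Kafka provides offset reset policies for cases where a consumer has no valid committed offset: either the group has never committed an offset for the partition, or the committed offset is out of range (for example, because retention has already deleted those messages). The auto.offset.reset property controls this behavior. Its values are earliest, latest (the default), and none, which makes the consumer throw an exception instead of choosing a position.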


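import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my_group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest"); // Start from the earliest offset if no valid offset is found

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my_topic"));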
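This sketch models the decision that auto.offset.reset controls, under the simplifying assumption that a committed offset is valid exactly when it lies between the oldest retained offset and the log end offset. startingOffset is a hypothetical helper; it throws IllegalStateException for none purely for illustration, while the real client throws its own exception type.

```java
// Toy version (hypothetical helper, not the Kafka client's code) of how a consumer
// chooses where to start reading a partition. logStart is the oldest offset still
// retained; logEnd is the offset the next appended message will receive.
class ResetPolicy {
    static long startingOffset(Long committed, long logStart, long logEnd, String policy) {
        // A committed offset inside [logStart, logEnd] is valid: resume there.
        if (committed != null && committed >= logStart && committed <= logEnd) {
            return committed;
        }
        // No committed offset, or it points at data that no longer exists (or never did).
        switch (policy) {
            case "earliest":
                return logStart; // re-read everything still retained
            case "latest":
                return logEnd;   // only read messages appended from now on
            case "none":
                throw new IllegalStateException("no valid offset and auto.offset.reset=none");
            default:
                throw new IllegalArgumentException("unknown policy: " + policy);
        }
    }

    public static void main(String[] args) {
        // Retention has deleted offsets 0-99, and the partition currently ends at 500.
        System.out.println(startingOffset(250L, 100, 500, "earliest")); // prints 250
        System.out.println(startingOffset(50L, 100, 500, "earliest"));  // prints 100
        System.out.println(startingOffset(null, 100, 500, "latest"));   // prints 500
    }
}
```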
    

Conclusion

In this tutorial, we've covered the core concepts of Kafka offsets, including how they work, how to manage them, and how they are used during rebalancing and offset reset scenarios. Understanding these concepts is essential for building robust and efficient Kafka-based applications.