Kafka Integration with Go

Introduction

Apache Kafka is a distributed event streaming platform for publishing, storing, and consuming real-time data feeds. Go, also known as Golang, is a statically typed, compiled programming language designed for simplicity and efficiency. Integrating Kafka with Go lets developers build high-performance applications that produce and process streams of data in real time.

Prerequisites

Before diving into Kafka integration with Go, ensure you have the following:

  • Basic understanding of Kafka and Go.
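  • A Kafka broker running on your machine and reachable at localhost:9092 (the address used in the examples), plus ZooKeeper if your Kafka version depends on it.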
  • Go environment set up on your machine.

Installing Kafka Client for Go

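We will use Sarama, a popular Go client library for Apache Kafka. The examples here import it as github.com/Shopify/sarama; newer releases are published under the module path github.com/IBM/sarama, so adjust the install command and import lines if you use that path. To install it, run the following command from inside your Go module: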

go get -u github.com/Shopify/sarama

Creating a Kafka Producer

Let's create a simple Kafka producer in Go. This producer will send messages to a Kafka topic.

package main

import (
    "fmt"
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    // Passing a nil config makes Sarama use its defaults for a sync producer.
    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)
    if err != nil {
        log.Fatalf("Failed to start Sarama producer: %v", err)
    }
    // Release the producer's broker connections when main returns.
    defer producer.Close()

    // A message needs at least a topic and a value; no key is set here.
    msg := &sarama.ProducerMessage{
        Topic: "test_topic",
        Value: sarama.StringEncoder("Hello Kafka!"),
    }

    // SendMessage blocks until the send succeeds or fails.
    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
        log.Fatalf("Failed to send message: %v", err)
    }

    fmt.Printf("Message is stored in topic(%s)/partition(%d)/offset(%d)\n", msg.Topic, partition, offset)
}

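Run the above code (for example with go run) to send a message to the Kafka topic test_topic. On success, it prints the partition and offset at which the message was stored.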

Creating a Kafka Consumer

Now, let's create a Kafka consumer in Go. This consumer reads messages from partition 0 of the test_topic topic.

package main

import (
    "fmt"
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    // Passing a nil config makes Sarama use its default consumer settings.
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
    if err != nil {
        log.Fatalf("Failed to start Sarama consumer: %v", err)
    }
    defer consumer.Close()

    // Read partition 0 only, starting with messages produced from now on.
    partitionConsumer, err := consumer.ConsumePartition("test_topic", 0, sarama.OffsetNewest)
    if err != nil {
        log.Fatalf("Failed to start partition consumer: %v", err)
    }
    defer partitionConsumer.Close()

    // Messages() is a channel; this loop blocks waiting for new messages.
    for msg := range partitionConsumer.Messages() {
        fmt.Printf("Consumed message offset %d: %s\n", msg.Offset, string(msg.Value))
    }
}

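Run the above code to consume messages from the Kafka topic test_topic. Start the consumer before running the producer: because it begins at sarama.OffsetNewest, it only receives messages written after it starts. To read messages that are already in the partition, pass sarama.OffsetOldest instead. The loop keeps waiting for new messages until the program is interrupted, for example with Ctrl+C.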

Handling Errors

Both the producer and consumer can encounter errors. It's important to handle these errors properly. Here's an example of how to handle errors in the Kafka consumer:

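package main

import (
    "fmt"
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    // By default Sarama logs consumer errors instead of delivering them.
    // Return.Errors must be true for the Errors() channel to receive anything.
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true

    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    if err != nil {
        log.Fatalf("Failed to start Sarama consumer: %v", err)
    }
    defer consumer.Close()

    partitionConsumer, err := consumer.ConsumePartition("test_topic", 0, sarama.OffsetNewest)
    if err != nil {
        log.Fatalf("Failed to start partition consumer: %v", err)
    }
    defer partitionConsumer.Close()

    // Drain the error channel in the background so errors are reported
    // while the main loop handles messages.
    go func() {
        for err := range partitionConsumer.Errors() {
            log.Println("Consumer error:", err)
        }
    }()

    for msg := range partitionConsumer.Messages() {
        fmt.Printf("Consumed message offset %d: %s\n", msg.Offset, string(msg.Value))
    }
}

This code enables Consumer.Return.Errors and starts a goroutine that listens for errors and prints them. Without that setting, Sarama logs consumer errors itself and the Errors() channel stays empty.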

Conclusion

In this tutorial, we've covered the basics of integrating Kafka with Go. We learned how to create a Kafka producer and consumer, and how to handle errors. With these skills, you can start building powerful real-time data processing applications using Kafka and Go.