Kafka with Scala Tutorial

Introduction to Kafka

Apache Kafka is a distributed streaming platform for building real-time data pipelines and streaming applications. It is designed to handle high throughput with low latency, which makes it well suited to big data workloads.

Kafka works on a publish-subscribe model, where producers publish messages to topics, and consumers subscribe to those topics to receive messages. This decouples the producers from the consumers, allowing for more flexible and scalable architectures.

Setting Up Kafka

Before we begin coding in Scala, we need to set up Kafka on our machine. Follow these steps to install Kafka:

  1. Download Kafka from the official website.
  2. Extract the downloaded archive and change into the extracted directory.
  3. Start the ZooKeeper service:
     $ bin/zookeeper-server-start.sh config/zookeeper.properties
  4. In a separate terminal, start the Kafka broker:
     $ bin/kafka-server-start.sh config/server.properties

You now have a basic Kafka setup running on your local machine.
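Optionally, create the my-topic topic that we will use in the examples below. Brokers typically auto-create topics on first use (auto.create.topics.enable defaults to true), but creating the topic explicitly lets you control partitions and replication:

$ bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1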

Creating a Scala Project

To work with Kafka in Scala, we will use sbt (the Scala Build Tool). Create a new directory for your project and navigate to it:

$ mkdir KafkaScalaExample
$ cd KafkaScalaExample

Create a new file named build.sbt and add the following settings. The examples in this tutorial only use Kafka's client APIs, so the kafka-clients artifact is sufficient (it is a plain Java library, hence the single % rather than %%):

name := "KafkaScalaExample"
version := "0.1"
scalaVersion := "2.13.6"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.8.0"

Create a src/main/scala directory structure for your Scala code:
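$ mkdir -p src/main/scala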

Producing Messages to Kafka

Now we will write a Scala application that produces messages to a Kafka topic. Create a new Scala file ProducerExample.scala in the src/main/scala directory with the following code:

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.util.Properties

object ProducerExample {
    def main(args: Array[String]): Unit = {
        // Configure the producer: broker address and serializers for keys and values
        val props = new Properties()
        props.put("bootstrap.servers", "localhost:9092")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

        val producer = new KafkaProducer[String, String](props)

        // Send 10 keyed messages; send() is asynchronous and returns immediately
        for (i <- 1 to 10) {
            val record = new ProducerRecord[String, String]("my-topic", s"key-$i", s"value-$i")
            producer.send(record)
        }

        // close() flushes any buffered records before shutting down
        producer.close()
    }
}

This code initializes a Kafka producer and sends 10 keyed messages to the my-topic topic. Note that the producer does not create the topic itself; the broker auto-creates it on first use, or you can create it explicitly as shown earlier.
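Because send() is asynchronous, delivery failures only surface later. To confirm delivery or log errors per record, you can pass a Callback to send(). A minimal sketch: replace the send call inside the loop with the following, and add Callback and RecordMetadata to the producer import:

producer.send(record, new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        if (exception != null)
            exception.printStackTrace()  // delivery failed
        else
            println(s"Delivered to ${metadata.topic()}-${metadata.partition()} at offset ${metadata.offset()}")
    }
})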

Consuming Messages from Kafka

Next, we will create a consumer that reads messages from the Kafka topic. Create a new Scala file ConsumerExample.scala in the same directory with the following code:

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.consumer.ConsumerRecords
import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters._

object ConsumerExample {
    def main(args: Array[String]): Unit = {
        // Configure the consumer: broker address, consumer group, and deserializers
        val props = new Properties()
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
        // Start from the earliest offset when this group has no committed offset yet
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

        val consumer = new KafkaConsumer[String, String](props)
        consumer.subscribe(java.util.Collections.singletonList("my-topic"))

        // Poll in a loop; each poll returns a (possibly empty) batch of records
        while (true) {
            val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofMillis(100))
            records.asScala.foreach { record =>
                println(s"Consumed message: key=${record.key()}, value=${record.value()}")
            }
        }
    }
}

This consumer subscribes to the my-topic topic and continuously polls for new messages, printing them to the console.
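As written, the loop never exits, so the consumer is never closed. In a longer-lived application you would typically trigger a clean shutdown with consumer.wakeup(), which makes a blocked poll() throw a WakeupException. A minimal sketch of this standard pattern, adapting the loop above (the structure here is illustrative):

import org.apache.kafka.common.errors.WakeupException

// After creating and subscribing the consumer:
val mainThread = Thread.currentThread()
sys.addShutdownHook {
    consumer.wakeup()    // interrupts a blocked poll()
    mainThread.join()    // wait for the loop below to close the consumer
}

try {
    while (true) {
        val records = consumer.poll(Duration.ofMillis(100))
        records.asScala.foreach { record =>
            println(s"Consumed message: key=${record.key()}, value=${record.value()}")
        }
    }
} catch {
    case _: WakeupException => // expected during shutdown
} finally {
    consumer.close()    // commits offsets (if auto-commit is enabled) and leaves the group
}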

Running the Examples

To run the producer and consumer, follow these steps:

  1. In one terminal, start the consumer:
     $ sbt "runMain ConsumerExample"
  2. In another terminal, run the producer:
     $ sbt "runMain ProducerExample"

Note that the project now contains two main classes, so a plain sbt run would prompt you to choose which one to execute; runMain selects the class explicitly.

You should see the messages produced by the producer appear in the consumer's terminal in real time.
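Because the println in ConsumerExample formats each record the same way, the consumer terminal should show output along these lines (with the single-partition topic created earlier, the messages arrive in order):

Consumed message: key=key-1, value=value-1
Consumed message: key=key-2, value=value-2
...
Consumed message: key=key-10, value=value-10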

Conclusion

In this tutorial, we covered the basics of using Kafka with Scala. We set up a Kafka environment, created a Scala project, and built producer and consumer applications. Kafka's ability to handle large volumes of data makes it an essential tool in the world of big data and real-time analytics.

For further exploration, consider looking into Kafka Streams for real-time processing, or Schema Registry for managing data schemas.
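As a first taste of Kafka Streams, here is a minimal, hypothetical sketch that reads my-topic and writes upper-cased values to a new my-topic-upper topic. It assumes you add the kafka-streams-scala dependency to build.sbt, e.g. libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.8.0":

import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.serialization.Serdes._

object StreamsExample {
    def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-example")
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

        // Read each record from my-topic, upper-case its value, write to my-topic-upper
        val builder = new StreamsBuilder()
        builder.stream[String, String]("my-topic")
            .mapValues(_.toUpperCase)
            .to("my-topic-upper")

        val streams = new KafkaStreams(builder.build(), props)
        streams.start()
        sys.addShutdownHook(streams.close())
    }
}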