Kafka with Scala Tutorial
Introduction to Kafka
Apache Kafka is a distributed streaming platform that is used for building real-time data pipelines and streaming applications. It is designed to handle high throughput and low latency, making it ideal for big data use cases.
Kafka works on a publish-subscribe model, where producers publish messages to topics, and consumers subscribe to those topics to receive messages. This decouples the producers from the consumers, allowing for more flexible and scalable architectures.
Setting Up Kafka
Before we begin coding in Scala, we need to set up Kafka on our machine. Follow these steps to install Kafka:
- Download Kafka from the official website.
- Extract the downloaded archive.
- Start the ZooKeeper service (from the extracted Kafka directory):
$ bin/zookeeper-server-start.sh config/zookeeper.properties
- Start the Kafka broker in a separate terminal:
$ bin/kafka-server-start.sh config/server.properties
You now have a basic Kafka setup running on your local machine.
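As a quick sanity check that the broker is up, you can list its topics (the list will most likely be empty at this point); this assumes you are still in the extracted Kafka directory and the broker is on the default port 9092:
$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092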
Creating a Scala Project
To work with Kafka in Scala, we will use sbt (the Scala Build Tool). Create a new directory for your project and navigate into it:
$ mkdir KafkaScalaExample
$ cd KafkaScalaExample
Create a new file named build.sbt and add the following build settings and dependency:
version := "0.1"
scalaVersion := "2.13.6"
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.8.0"
Create a src/main/scala directory structure for your Scala code.
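For example, from the project root on a Unix-like shell:
$ mkdir -p src/main/scala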
Producing Messages to Kafka
Now we will write a Scala application that produces messages to a Kafka topic. Create a new Scala file ProducerExample.scala in the src/main/scala directory with the following code:
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.util.Properties

object ProducerExample {
  def main(args: Array[String]): Unit = {
    // Configure the producer: broker address and key/value serializers
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send 10 keyed messages to the "my-topic" topic
    for (i <- 1 to 10) {
      val record = new ProducerRecord[String, String]("my-topic", s"key-$i", s"value-$i")
      producer.send(record)
    }

    // close() flushes any buffered records before shutting down
    producer.close()
  }
}
This code initializes a Kafka producer and sends 10 keyed messages to the topic my-topic. Note that the producer does not create the topic itself; with the default broker setting auto.create.topics.enable=true, the topic is created automatically the first time a message is sent to it.
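If auto topic creation is disabled on your broker, you can create my-topic up front with Kafka's AdminClient. Below is a minimal sketch; the object name CreateTopicExample is only for illustration, and the single partition with replication factor 1 is an assumption that suits a local single-broker setup:
import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
import java.util.{Collections, Properties}

object CreateTopicExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")

    val admin = AdminClient.create(props)
    // 1 partition, replication factor 1 -- only sensible for a local single-broker setup
    val topic = new NewTopic("my-topic", 1, 1.toShort)
    admin.createTopics(Collections.singleton(topic)).all().get()
    admin.close()
  }
}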
Consuming Messages from Kafka
Next, we will create a consumer that reads messages from the Kafka topic. Create a new Scala file ConsumerExample.scala
in the same directory with the following code:
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecords, KafkaConsumer}
import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters._

object ConsumerExample {
  def main(args: Array[String]): Unit = {
    // Configure the consumer: broker address, consumer group, and key/value deserializers
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(java.util.Collections.singletonList("my-topic"))

    // Poll for new records indefinitely and print each one
    while (true) {
      val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofMillis(100))
      records.asScala.foreach { record =>
        println(s"Consumed message: key=${record.key()}, value=${record.value()}")
      }
    }
  }
}
This consumer subscribes to the my-topic topic and continuously polls for new messages, printing them to the console.
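The while (true) loop above never returns, so the consumer is never closed. For anything beyond a quick demo, one common pattern (sketched below under the hypothetical name GracefulConsumerExample) is to call consumer.wakeup() from a JVM shutdown hook and catch the resulting WakeupException so the consumer can be closed cleanly:
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.errors.WakeupException
import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters._

object GracefulConsumerExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(java.util.Collections.singletonList("my-topic"))

    // On Ctrl+C, wakeup() makes the blocked poll() throw WakeupException,
    // which lets the main thread fall through to the finally block and close the consumer.
    val mainThread = Thread.currentThread()
    sys.addShutdownHook {
      consumer.wakeup()
      mainThread.join()
    }

    try {
      while (true) {
        val records = consumer.poll(Duration.ofMillis(100))
        records.asScala.foreach { record =>
          println(s"Consumed message: key=${record.key()}, value=${record.value()}")
        }
      }
    } catch {
      case _: WakeupException => // expected during shutdown, nothing to do
    } finally {
      consumer.close()
    }
  }
}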
Running the Examples
To run the producer and consumer, follow these steps:
- First, run the consumer in a terminal:
$ sbt "runMain ConsumerExample"
- Then, run the producer in another terminal:
$ sbt "runMain ProducerExample"
You should see the messages produced by the producer being consumed by the consumer in real-time.
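Since the output comes from the println call in ConsumerExample, the consumer terminal should show lines roughly like the following (and so on up to key-10):
Consumed message: key=key-1, value=value-1
Consumed message: key=key-2, value=value-2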
Conclusion
In this tutorial, we covered the basics of using Kafka with Scala. We set up a Kafka environment, created a Scala project, and built producer and consumer applications. Kafka's ability to handle large volumes of data makes it an essential tool in the world of big data and real-time analytics.
For further exploration, consider looking into Kafka Streams for real-time processing, or Schema Registry for managing data schemas.
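As a small taste of Kafka Streams, here is a hedged sketch of an application that reads my-topic and prints each record. It assumes you add the dependency "org.apache.kafka" % "kafka-streams" % "2.8.0" to build.sbt; the application id streams-example and the object name StreamsExample are arbitrary choices for this example:
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
import org.apache.kafka.streams.kstream.KStream
import java.util.Properties

object StreamsExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-example")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)

    // Build a topology that reads my-topic and prints each record
    val builder = new StreamsBuilder()
    val stream: KStream[String, String] = builder.stream[String, String]("my-topic")
    stream.foreach((key: String, value: String) => println(s"Streamed: key=$key, value=$value"))

    val streams = new KafkaStreams(builder.build(), props)
    streams.start()

    // Close the streams application when the JVM shuts down
    sys.addShutdownHook(streams.close())
  }
}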