Kafka with Storm Tutorial

Introduction

Apache Kafka is a distributed streaming platform that handles real-time data feeds. Apache Storm is a distributed real-time computation system. In this tutorial, Kafka acts as the message source and a Storm topology consumes and processes the messages as they arrive.

Prerequisites

Before you start, ensure that you have the following:

  • Java 8 or later installed
  • Apache Kafka installed and running
  • Apache Storm installed and running
  • Basic knowledge of Kafka and Storm

Step 1: Setting Up Kafka

First, we need to set up Kafka and create a topic that we will use to send messages.

Create a Kafka topic named "storm-topic":

kafka-topics.sh --create --topic storm-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
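
The --partitions 1 setting limits parallelism on the read side: within a consumer group, each partition is read by at most one consumer at a time, so a single-partition topic cannot be consumed in parallel. The following is a minimal plain-Java sketch of that constraint, assuming a simple round-robin assignment. PartitionAssignmentSketch and assign are hypothetical names, and this is not Kafka's or Storm's actual assignment code.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of spreading topic partitions over consumer tasks.
// PartitionAssignmentSketch and assign(...) are hypothetical names; real
// Kafka and Storm assignment strategies are more involved.
final class PartitionAssignmentSketch {

    // Returns, for each task, the list of partition ids it would read.
    static List<List<Integer>> assign(int numPartitions, int numTasks) {
        List<List<Integer>> perTask = new ArrayList<>();
        for (int t = 0; t < numTasks; t++) {
            perTask.add(new ArrayList<>());
        }
        // Round-robin: partition p goes to task p % numTasks.
        for (int p = 0; p < numPartitions; p++) {
            perTask.get(p % numTasks).add(p);
        }
        return perTask;
    }

    public static void main(String[] args) {
        // One partition, two tasks: the second task has nothing to read.
        System.out.println(assign(1, 2)); // prints [[0], []]
        // Four partitions, two tasks: each task reads two partitions.
        System.out.println(assign(4, 2)); // prints [[0, 2], [1, 3]]
    }
}
```

If you later want several consumers to read the topic in parallel, create it with at least as many partitions as consumers.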

Step 2: Setting Up Storm

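Next, we need to set up Storm to read from the Kafka topic. We'll use the storm-kafka-client library for this purpose.

Add the following dependency to your pom.xml if you are using Maven. storm-kafka-client declares its Kafka and Storm dependencies with provided scope, so your project also needs org.apache.kafka:kafka-clients (in a version compatible with your brokers), org.apache.storm:storm-client, and, for the LocalCluster used in Step 3, org.apache.storm:storm-server: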

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>2.2.0</version>
</dependency>
                

Step 3: Writing a Storm Topology

In this step, we'll write a simple Storm topology that reads messages from the Kafka topic and processes them.

Create a Java class named KafkaStormTopology:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaStormTopology {
    public static void main(String[] args) throws Exception {
        String bootstrapServers = "localhost:9092";
        String topic = "storm-topic";

        // Configure the spout; the default deserializers produce String keys and values.
        KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.builder(bootstrapServers, topic)
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "storm-group")
                .build();

        // Wire the spout to the bolt: one spout task feeding PrintBolt.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout<>(spoutConfig), 1);
        builder.setBolt("print-bolt", new PrintBolt()).shuffleGrouping("kafka-spout");

        Config config = new Config();
        config.setDebug(true);

        // Run the topology in-process instead of submitting it to a real cluster.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("KafkaStormTopology", config, builder.createTopology());

        // Let the topology run for 10 seconds, then stop it.
        Thread.sleep(10000);
        cluster.shutdown();
    }
}
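
The shuffleGrouping call in the topology tells Storm to spread the spout's tuples randomly but evenly across the tasks of print-bolt. With a single bolt task every tuple goes to the same place, but the effect shows once the bolt's parallelism is raised. The following plain-Java sketch models that behavior, assuming a round-robin over a shuffled list of task ids. ShuffleGroupingSketch and its methods are hypothetical names; this is an illustration, not Storm's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Simplified model of shuffle grouping: random task order, even distribution.
// All names here are hypothetical and no Storm classes are used.
final class ShuffleGroupingSketch {
    private final List<Integer> taskOrder = new ArrayList<>();
    private int next = 0;

    ShuffleGroupingSketch(int numTasks, long seed) {
        for (int task = 0; task < numTasks; task++) {
            taskOrder.add(task);
        }
        // Randomize the order in which tasks are visited.
        Collections.shuffle(taskOrder, new Random(seed));
    }

    // Picks the target task for the next tuple by cycling through the shuffled order.
    int chooseTask() {
        int task = taskOrder.get(next);
        next = (next + 1) % taskOrder.size();
        return task;
    }

    // Sends numTuples tuples and counts how many each task receives.
    static int[] countPerTask(int numTasks, int numTuples, long seed) {
        ShuffleGroupingSketch grouping = new ShuffleGroupingSketch(numTasks, seed);
        int[] counts = new int[numTasks];
        for (int i = 0; i < numTuples; i++) {
            counts[grouping.chooseTask()]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Nine tuples over three tasks: every task receives three.
        System.out.println(Arrays.toString(countPerTask(3, 9, 42L))); // prints [3, 3, 3]
    }
}
```

Because PrintBolt keeps no per-key state, it does not matter which task receives a given message, which is why shuffle grouping is a reasonable choice here.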
                

Step 4: Implementing the PrintBolt

The PrintBolt class will be responsible for printing the messages received from Kafka.

Create a Java class named PrintBolt:

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.Map;

public class PrintBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // The Kafka spout emits the record payload in the "value" field.
        String message = input.getStringByField("value");
        System.out.println("Received message: " + message);
        // Acknowledge the tuple so the spout can treat it as processed.
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt only prints and emits nothing, so it declares no output fields.
    }
}
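
The "value" field read by PrintBolt comes from the Kafka spout's default record translator, which emits each Kafka record as a tuple with the fields topic, partition, offset, key, and value. The plain-Java sketch below models that mapping with an ordinary map so the field lookup is easy to see. KafkaTupleFieldsSketch and toTupleFields are hypothetical names, and no Storm or Kafka classes are involved.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Models how a Kafka record becomes named tuple fields.
// Hypothetical illustration only; Storm uses its own Tuple type, not a Map.
final class KafkaTupleFieldsSketch {

    // Builds the field-name-to-value view of one record, in the default field order.
    static Map<String, Object> toTupleFields(String topic, int partition, long offset,
                                             String key, String value) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("topic", topic);
        fields.put("partition", partition);
        fields.put("offset", offset);
        fields.put("key", key);     // may be null when the producer sends no key
        fields.put("value", value); // the message payload that PrintBolt prints
        return fields;
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = toTupleFields("storm-topic", 0, 42L, null, "hello");
        System.out.println(tuple.keySet());     // prints [topic, partition, offset, key, value]
        System.out.println(tuple.get("value")); // prints hello
    }
}
```

The other fields are available to a bolt in the same way, for example to log the offset next to each message.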
                

Step 5: Running the Topology

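Compile and run the topology, then publish a few messages to storm-topic (for example with kafka-console-producer.sh). Each message should appear on the console as a "Received message: ..." line, mixed in with Storm's debug logging. The local cluster in KafkaStormTopology shuts down after 10 seconds; increase the Thread.sleep value if you need more time.

Run the following command: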

mvn compile exec:java -Dexec.mainClass="KafkaStormTopology"

Conclusion

In this tutorial, we have demonstrated how to integrate Apache Kafka with Apache Storm. We created a Kafka topic, set up a Storm topology to read from the Kafka topic, and processed the messages using a custom bolt. This setup is a starting point for building a real-time processing pipeline for streaming data.