Kafka with Storm Tutorial
Introduction
Apache Kafka is a distributed streaming platform that can handle real-time data feeds. Apache Storm is a distributed real-time computation system. By integrating Kafka with Storm, you can create an efficient and robust system for processing and analyzing streaming data.
Prerequisites
Before you start, ensure that you have the following:
- Java 8 or later installed
- Apache Kafka installed and running
- Apache Storm installed and running
- Basic knowledge of Kafka and Storm
Step 1: Setting Up Kafka
First, we need to set up Kafka and create a topic that we will use to send messages.
Create a Kafka topic named "storm-topic":
kafka-topics.sh --create --topic storm-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
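To confirm the topic exists before wiring up Storm, you can describe it (assuming the Kafka scripts are on your PATH, as above):

kafka-topics.sh --describe --topic storm-topic --bootstrap-server localhost:9092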
Step 2: Setting Up Storm
Next, we need to set up Storm to read from the Kafka topic. We'll use the storm-kafka-client library for this purpose.
If you are using Maven, add the following dependency to your pom.xml:
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>2.2.0</version>
</dependency>
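The topology code below also compiles against the core Storm API, so you need the Storm dependencies on the classpath as well. A minimal sketch, assuming Storm 2.2.0; note that in Storm 2.x the LocalCluster class used for local testing lives in the storm-server module:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-client</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-server</artifactId>
    <version>2.2.0</version>
</dependency>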
Step 3: Writing a Storm Topology
In this step, we'll write a simple Storm topology that reads messages from the Kafka topic and processes them.
Create a Java class named KafkaStormTopology:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaStormTopology {
    public static void main(String[] args) throws Exception {
        String bootstrapServers = "localhost:9092";
        String topic = "storm-topic";

        // Configure the Kafka spout to consume storm-topic as part of the
        // "storm-group" consumer group.
        KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.builder(bootstrapServers, topic)
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "storm-group")
                .build();

        // Wire the spout into a bolt that prints every message it receives.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout<>(spoutConfig), 1);
        builder.setBolt("print-bolt", new PrintBolt()).shuffleGrouping("kafka-spout");

        Config config = new Config();
        config.setDebug(true);

        // Run the topology in-process for ten seconds, then shut down.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("KafkaStormTopology", config, builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
    }
}
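LocalCluster runs the topology in-process, which is handy for development but not how you deploy to production. As a sketch, the same topology can instead be submitted to a running Storm cluster with StormSubmitter (the packaged jar would then be launched with the storm jar command):

import org.apache.storm.StormSubmitter;

// Replace the LocalCluster block with a cluster submission:
StormSubmitter.submitTopology("KafkaStormTopology", config, builder.createTopology());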
Step 4: Implementing the PrintBolt
The PrintBolt class is responsible for printing the messages received from Kafka.
Create a Java class named PrintBolt:
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

import java.util.Map;

public class PrintBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // The Kafka spout emits the record payload under the "value" field.
        String message = input.getStringByField("value");
        System.out.println("Received message: " + message);
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("value"));
    }
}
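Note that the default record translator in storm-kafka-client emits five fields per record: "topic", "partition", "offset", "key", and "value". If the bolt needs more than the payload, it can read the other fields too; a small sketch, assuming the default translator is in use:

// Inside execute(): record metadata exposed by the default translator.
String key = input.getStringByField("key");   // null for unkeyed messages
long offset = input.getLongByField("offset");
System.out.println("key=" + key + ", offset=" + offset);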
Step 5: Running the Topology
Compile and run the topology to start processing messages from Kafka; you should see the messages printed to the console:
mvn compile exec:java -Dexec.mainClass="KafkaStormTopology"
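With the topology running, publish a few test messages from another terminal using the console producer that ships with Kafka:

kafka-console-producer.sh --topic storm-topic --bootstrap-server localhost:9092

Each line you type at the producer prompt should show up in the topology's output as "Received message: ...".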
Conclusion
In this tutorial, we have demonstrated how to integrate Apache Kafka with Apache Storm. We created a Kafka topic, set up a Storm topology to read from the Kafka topic, and processed the messages using a custom bolt. This setup allows you to build a powerful real-time processing pipeline for streaming data.