Using Kafka in 2022


We will cover briefly:

  1. Intro to Kafka
  2. Setup Kafka on machine
  3. Create Kafka Producer
  4. Create Kafka Consumer
  5. Running Producer and Consumer

Intro to Kafka

As per the docs, Kafka is an event streaming platform that can be used for collecting, processing, storing, and analyzing data at scale. Kafka is used by thousands of companies including over 60% of the Fortune 100.

Kafka is known for its excellent performance, low latency, fault tolerance, and high throughput; it is capable of handling thousands of messages per second.

Kafka Overview

Use cases:

  • Process payments and financial transactions in real time
  • Continuously capture and analyze sensor data from IoT devices
  • Collect and immediately react to customer interactions and orders
  • Track and monitor cars, trucks, fleets, and shipments in real time

Setup Kafka on machine

This section walks through setting up Kafka on your machine.

Note: We are using a Mac.

  • To install Kafka, go to the Apache Kafka downloads page. Navigate to the latest binary downloads (as of today, the Scala 2.13 build) and click the link. This opens another page, where you click the download link shown below:

Download link for Kafka

  • Extract the archive and navigate into the extracted folder (Downloads/kafka_2.12-3.1.0 in our case).

Note: Run the terminal commands below from the directory where you extracted the Kafka archive.

sh bin/zookeeper-server-start.sh config/zookeeper.properties


Start Zookeeper

The above command starts ZooKeeper. Kafka uses it primarily to track the status of nodes in the cluster and to maintain metadata such as the list of topics and their configuration.

Note: Starting with v2.8, Kafka can be run without ZooKeeper (KRaft mode). However, as of the 3.1.0 release used here, this mode isn't ready for production use.

Keep the above terminal open.

Start Kafka broker service

  • Open another terminal at the above path (Downloads/kafka_2.12-3.1.0) and type
sh bin/kafka-server-start.sh config/server.properties


This starts the Kafka Broker. A Kafka cluster is made up of multiple Kafka Brokers, each with a unique numeric ID. Brokers hold the topic log partitions. Connecting to one broker bootstraps a client to the entire Kafka cluster. A cluster can have 10, 100, or 1,000 brokers if needed.

Keep this terminal open.
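
Before moving on, it can help to confirm that something is actually listening on the broker port. The following is a minimal sketch using only the Java standard library, assuming the broker listens on localhost:9092 as in the commands in this post. The class name BrokerPortCheck and its isReachable method are hypothetical helpers, not part of Kafka, and a successful TCP connection only shows that the port is open, not that the process behind it is a healthy broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Kafka: checks whether a TCP port accepts connections.
final class BrokerPortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumes the broker address used throughout this post
        boolean up = isReachable("localhost", 9092, 1000);
        System.out.println(up
                ? "Something is listening on localhost:9092"
                : "Nothing is listening on localhost:9092");
    }
}
```

If this reports that nothing is listening, check the broker terminal for startup errors before creating topics.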

Create a topic

  • Open a new terminal at the above path and type
sh bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic


Producers publish events to Kafka from many different machines; these events are organized and stored in topics.

In the above command, we specify that

  • the server is localhost:9092, and
  • the topic to create is test-topic.
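
To get a feel for what "stored in topics" means, here is a toy model of a single topic partition as an append-only log, where each record's offset is simply its position in the log. This is only a sketch for intuition: TopicLogSketch is a hypothetical class, it lives in memory, and it is not how Kafka is implemented. It does show where the offset values printed by the consumer later in this post come from.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one topic partition: an append-only list where each
// record's offset is its position. Hypothetical class, not part of Kafka.
final class TopicLogSketch {
    private final List<String> records = new ArrayList<>();

    // Appends a record and returns the offset it was stored at.
    long append(String value) {
        records.add(value);
        return records.size() - 1;
    }

    // Returns every record stored at or after the given offset.
    List<String> readFrom(long offset) {
        int start = (int) Math.min(Math.max(offset, 0), records.size());
        return new ArrayList<>(records.subList(start, records.size()));
    }

    public static void main(String[] args) {
        TopicLogSketch log = new TopicLogSketch();
        System.out.println(log.append("message-1")); // prints 0
        System.out.println(log.append("message-2")); // prints 1
        System.out.println(log.readFrom(1));         // prints [message-2]
    }
}
```

Records are never modified once appended; a reader only needs to remember the offset it has reached.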

Verify the topic created 

  • Open a new terminal at the above path and type
sh bin/kafka-topics.sh --bootstrap-server localhost:9092 --list


This should list test-topic.

Create Kafka Producer

Note: Kafka Broker should be running in the terminal

Install dependencies

We will use the Eclipse IDE to create our Kafka Producer.

Kafka Producer

  • Create a Maven project using Eclipse.
  • Go to the pom.xml, put in the following, and update your maven project
<dependencies>

   <!-- Kafka Client -->
   <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.1.0</version>
   </dependency>

   <!-- Simple logging -->
   <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>2.0.0-alpha6</version>
   </dependency>
   <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>2.0.0-alpha6</version>
   </dependency>
</dependencies>


This will add Kafka Client and slf4j to our project.
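
If Eclipse does not pick up the new dependencies, a quick check like the one below can tell us whether the classes are visible at runtime. This is a small sketch under the assumption that it is run from inside the same Maven project; ClasspathCheck and isOnClasspath are hypothetical names chosen for this post.

```java
// Hypothetical helper: reports whether the classes we depend on can be loaded.
final class ClasspathCheck {

    // Returns true if the named class can be loaded from the current classpath.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] expected = {
            "org.apache.kafka.clients.producer.KafkaProducer",
            "org.apache.kafka.clients.consumer.KafkaConsumer",
            "org.slf4j.Logger"
        };
        for (String name : expected) {
            System.out.println(name + " -> " + (isOnClasspath(name) ? "found" : "MISSING"));
        }
    }
}
```

Any line reporting MISSING usually means the Maven project still needs to be updated.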

Create SampleProducer.java

  • Create a class called SampleProducer.java
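import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SampleProducer {

 public SampleProducer() throws InterruptedException {
  Properties properties = new Properties();
  properties.put("bootstrap.servers", "localhost:9092");
  properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

  // Custom key kept for this example only; it is not a Kafka client setting
  properties.put("kafka.topic", "test-topic");

  KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  int i = 0;

  try {
   while (true) {
    i++;
    ProducerRecord<String, String> producerRecord =
      new ProducerRecord<>("test-topic", "key", "message-" + i);

    // Wait 3 seconds, then hand the record to the producer for sending
    Thread.sleep(3000);
    kafkaProducer.send(producerRecord);
   }
  } finally {
   // Flush pending records and release resources
   kafkaProducer.close();
  }
 }

}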


We initialize the Properties with the following keys and values:

bootstrap.servers : your local server host and port

key.serializer : the class used to serialize the record key before it is sent over the network.

value.serializer : the class used to serialize the record value before it is sent over the network.

kafka.topic : your Kafka topic (a custom key used by this example, not a Kafka client setting)
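
The keys listed above do not have to be hard-coded. As a sketch of one alternative, the same settings can be written in .properties syntax and loaded with java.util.Properties. Here the text is kept in a string so the example is self-contained; in a real project it would more likely come from a file. ProducerPropertiesSketch, CONFIG_TEXT, and load are hypothetical names for this post.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper: loads the producer settings from .properties-style text.
final class ProducerPropertiesSketch {

    // Same keys and values as in SampleProducer, in .properties file syntax
    static final String CONFIG_TEXT = String.join("\n",
            "bootstrap.servers=localhost:9092",
            "key.serializer=org.apache.kafka.common.serialization.StringSerializer",
            "value.serializer=org.apache.kafka.common.serialization.StringSerializer",
            "kafka.topic=test-topic");

    // Parses the given text into a Properties object.
    static Properties load(String text) throws IOException {
        Properties properties = new Properties();
        properties.load(new StringReader(text));
        return properties;
    }

    public static void main(String[] args) throws IOException {
        Properties properties = load(CONFIG_TEXT);
        System.out.println(properties.getProperty("bootstrap.servers")); // prints localhost:9092
        System.out.println(properties.getProperty("kafka.topic"));       // prints test-topic
    }
}
```

The resulting Properties object can be passed to the KafkaProducer constructor in the same way as the hand-built one.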

  • Next, we create an instance of KafkaProducer with the properties (as specified above).
  • We create a ProducerRecord to hold the data we want to send.

This ProducerRecord takes in the topic name, key, and the value to be sent.

  • We send the record using kafkaProducer.
  • This block of code is placed inside the while loop, so a record is sent to the Kafka broker every 3 seconds.
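
The key in a ProducerRecord is more than a label: for records that have a key, the producer picks the partition from a hash of that key, so records with the same key end up in the same partition. The sketch below illustrates the idea only. It uses String.hashCode as a stand-in, whereas Kafka's default partitioner uses a murmur2 hash of the serialized key, and the partition count of 3 is an assumption for illustration. KeyPartitionSketch and partitionFor are hypothetical names.

```java
// Hypothetical illustration of key-based partitioning; not Kafka's actual partitioner.
final class KeyPartitionSketch {

    // Maps a key to one of numPartitions partitions. String.hashCode is a
    // stand-in here; Kafka's default partitioner uses a murmur2 hash of the
    // serialized key bytes.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3; // assumed partition count, for illustration only
        for (int i = 1; i <= 3; i++) {
            System.out.println("message-" + i + " with key \"key\" -> partition "
                    + partitionFor("key", numPartitions));
        }
    }
}
```

Since our producer always uses the same key, all three lines report the same partition.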

Create KafkaProducerRunner.java

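  • This will be the class that invokes the above SampleProducer
public class KafkaProducerRunner {
 // SampleProducer's constructor declares InterruptedException, so main must too
 public static void main(String[] args) throws InterruptedException {
  SampleProducer sampleProducer = new SampleProducer();
 }
}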


Create Kafka Consumer

Note: Kafka Broker should be running in the terminal

  • Create a class called SampleConsumer.java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SampleConsumer {

 public SampleConsumer() {
  Properties properties = new Properties();
  properties.put("bootstrap.servers", "localhost:9092");
  // Custom key read by this example only; it is not a Kafka client setting
  properties.put("kafka.topic", "test-topic");
  properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  properties.put("group.id", "my-group");

  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  consumer.subscribe(Arrays.asList(properties.getProperty("kafka.topic")));

  while (true) {
   // Wait up to 100 ms for new records
   ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
   for (ConsumerRecord<String, String> record : records) {
    System.out.printf("partition = %d, offset = %d, key = %s, value = %s\n",
      record.partition(), record.offset(), record.key(), record.value());
   }
  }
 }
}


We initialize the Properties with the following keys and values:

bootstrap.servers : your local server host and port

kafka.topic : your Kafka topic (a custom key used by this example, not a Kafka client setting)

key.deserializer : the class used to deserialize the record key received over the network.

value.deserializer : the class used to deserialize the record value received over the network.

group.id : the consumer group this consumer belongs to; it is set only on the Consumer side.

  • Next, we create an instance of KafkaConsumer with the properties (as specified above). This KafkaConsumer consumes the records from a Kafka cluster.
  • Next, we subscribe to the topic (the same one the producer writes to) using subscribe.
  • We call poll in an infinite loop, waiting up to 100 ms each time, and get the data back in the form of ConsumerRecords.
  • For each consumer record, we extract the partition, offset, key, and value.
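
The group.id setting matters once more than one consumer is running: consumers that share a group id split the topic's partitions between them, so each partition is read by only one consumer in the group. The sketch below illustrates that idea with a simple round-robin deal. It is an assumption-laden toy: Kafka's real partition assignors are more involved, and GroupAssignmentSketch, assign, and the consumer names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy illustration of splitting partitions across a consumer group; not Kafka's assignor.
final class GroupAssignmentSketch {

    // Deals partitions 0..numPartitions-1 out to the consumers in turn.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment; // nobody to assign partitions to
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two consumers in the same group sharing a topic with 3 partitions
        System.out.println(assign(List.of("consumer-a", "consumer-b"), 3));
        // prints {consumer-a=[0, 2], consumer-b=[1]}
    }
}
```

In this post there is a single consumer in my-group, so it receives every partition of test-topic.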

Create KafkaConsumerRunner.java

  • This will be the class that invokes the above SampleConsumer
public class KafkaConsumerRunner {
 public static void main(String[] args) {
  SampleConsumer sampleConsumer = new SampleConsumer();
 }
}


Running Producer and Consumer

If you followed the above steps correctly, you should have the files below.

Let’s run KafkaProducerRunner first. We should see something like this:

Kafka Producer logs

Our Producer produces data every 3 seconds.

  • Let’s run KafkaConsumerRunner. We should see something like this:

Kafka Consumer logs

Our consumer receives the data every 3 seconds, and we print it to the console.

Source code.
