Getting Started With Kafka

Implementing a Kafka Producer and Consumer in Java is simpler than you might expect. To run and verify the code, install Kafka as described below. Kafka requires ZooKeeper, so ZooKeeper must be installed as well.

Installing Kafka & ZooKeeper

brew install kafka
brew install zookeeper
zkServer start # zookeeper running
kafka-server-start /usr/local/etc/kafka/server.properties # kafka running
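
Before moving on, it can help to confirm that both services are actually listening. The following is a minimal sketch using only the Java standard library; the `PortCheck` class and `isPortOpen` helper are hypothetical names introduced for illustration, and the ports are the defaults used elsewhere in this post (2181 for ZooKeeper, 9092 for Kafka).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {
    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed default ports: 2181 (ZooKeeper), 9092 (Kafka)
        System.out.println("ZooKeeper (2181): " + isPortOpen("localhost", 2181, 1000));
        System.out.println("Kafka (9092): " + isPortOpen("localhost", 9092, 1000));
    }
}
```

An open port only shows that something is listening there; it does not prove the broker is healthy, so treat this as a quick sanity check.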

Creating a Kafka Topic

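# replication-factor : number of replicas (1)
# partitions : number of partitions (1)
# topic : topic name (taeng)
# Note: on Kafka 2.2 and later, --bootstrap-server localhost:9092 can be used
# instead of --zookeeper (the --zookeeper option was removed in Kafka 3.0)
kafka-topics --create --zookeeper localhost:2181 \
      --replication-factor 1 --partitions 1 --topic taeng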

The Kafka setup is now complete. This is a minimal setup for trying out simple examples; production deployments do not use a single-node system like this. Note: This post doesn't cover the theory behind Kafka or ZooKeeper in detail.


Implementing Kafka Producer

This example reads keyboard input and sends each line as a message. In production, the connection to Kafka would be managed more strictly, but this simple example just closes the connection when a specific message ("exit") is entered.

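import java.util.Properties;
import java.util.Scanner;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * Kafka Producer.
 * Sends messages through keyboard input.
 *
 * @author Kimtaeng
 * Created on 2018. 9. 10.
 */
public class MyKafkaProducer {
    private static final String TOPIC_NAME = "taeng";
    private static final String FIN_MESSAGE = "exit";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // Create the Scanner once instead of on every loop iteration
        Scanner sc = new Scanner(System.in);

        while (true) {
            System.out.print("Input > ");
            String message = sc.nextLine();

            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
            try {
                // send() is asynchronous; the callback runs once the send completes or fails
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Failed to send message: " + exception.getMessage());
                    }
                });
            } catch (Exception e) {
                // Errors raised synchronously by send() itself
                System.err.println("Error while sending: " + e.getMessage());
            } finally {
                // Block until buffered records have been sent
                producer.flush();
            }

            // The exit message is sent too, so the Consumer can also stop on it
            if (FIN_MESSAGE.equals(message)) {
                producer.close();
                break;
            }
        }
    }
}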
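
The read, send, and exit flow can also be tried without a running broker by swapping the producer for any callback. Below is a minimal sketch under that assumption: `InputLoopSketch` and `forwardLines` are hypothetical names, and the `Consumer<String>` sink (from `java.util.function`, not Kafka) simply stands in for `producer.send`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.function.Consumer;

class InputLoopSketch {
    static final String FIN_MESSAGE = "exit";

    // Forwards each input line to the sink until FIN_MESSAGE is read or the
    // input ends. FIN_MESSAGE itself is forwarded as well. Returns the number
    // of lines forwarded.
    static int forwardLines(Scanner in, Consumer<String> sink) {
        int sent = 0;
        while (in.hasNextLine()) {
            String message = in.nextLine();
            sink.accept(message);
            sent++;
            if (FIN_MESSAGE.equals(message)) {
                break;
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        List<String> captured = new ArrayList<>();
        // A Scanner over a String replaces keyboard input for this demo
        int sent = forwardLines(new Scanner("hello\nkafka\nexit\nignored\n"), captured::add);
        System.out.println(sent + " " + captured); // prints: 3 [hello, kafka, exit]
    }
}
```

Checking `hasNextLine()` before reading also means the loop ends cleanly when the input is closed, instead of failing with a `NoSuchElementException`.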


Implementing Kafka Consumer

The Consumer subscribes to a topic and then consumes records by polling. Topics are passed to subscribe as a list, so a single Consumer can subscribe to multiple topics. The poll method's parameter is the maximum time to block while waiting for records. Like the Producer, the Consumer terminates when a specific message is received.

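import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * Kafka Consumer.
 * Receives messages sent by Producer.
 *
 * @author Kimtaeng
 * Created on 2018. 9. 10.
 */
public class MyKafkaConsumer {
    private static final String TOPIC_NAME = "taeng";
    private static final String FIN_MESSAGE = "exit";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // The topic name is reused as the consumer group id for simplicity
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, TOPIC_NAME);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        boolean finished = false;
        try {
            while (!finished) {
                // Block for up to 100 seconds waiting for records
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100000));

                for (ConsumerRecord<String, String> record : records) {
                    String message = record.value();
                    System.out.println(message);
                    // Stop as soon as the exit message is seen, even mid-batch
                    if (FIN_MESSAGE.equals(message)) {
                        finished = true;
                        break;
                    }
                }
            }
        } catch (Exception e) {
            System.err.println("Error while consuming: " + e.getMessage());
        } finally {
            consumer.close();
        }
    }
}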
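
The "maximum blocking time" behavior of poll can be illustrated with a standard-library analogy. This is only a sketch: `PollTimeoutSketch` and `pollOnce` are hypothetical names, and a `BlockingQueue` stands in for the broker. One difference to keep in mind is that `KafkaConsumer.poll` returns an empty `ConsumerRecords` when the timeout expires, whereas `BlockingQueue.poll` returns null.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class PollTimeoutSketch {
    // Waits up to timeoutMillis for a value; returns null if none arrives in time.
    static String pollOnce(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report "nothing received"
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("hello");
        // Data is already available, so this returns without waiting
        System.out.println(pollOnce(queue, 1000));
        // The queue is now empty, so this gives up after about 200 ms
        System.out.println(pollOnce(queue, 200));
    }
}
```

The timeout is an upper bound on the wait, not a fixed delay: a long value such as the 100 seconds used above does not slow down delivery when records are already available.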


Running the Code

Start the Consumer first, then the Producer. When you type messages into the Producer, they appear on the Consumer's console.

[Screenshot: Kafka producer console]

[Screenshot: Kafka consumer console]

We have now implemented and run a Kafka Producer and Consumer in Java. Beyond what the example code above covers, there are many more configuration options. We will look at those settings in the following post.