Getting Started With Kafka!

Kafka Producer์™€ Consumer๋ฅผ ์ž๋ฐ”๋กœ ์ง์ ‘ ๊ตฌํ˜„ํ•˜๋Š” ๊ฒƒ์€ ์ƒ๊ฐ๋ณด๋‹ค ๊ฐ„๋‹จํ•ฉ๋‹ˆ๋‹ค. ํ•˜์ง€๋งŒ ์ฝ”๋“œ๋ฅผ ์‹คํ–‰ํ•˜์—ฌ ๊ฒฐ๊ณผ๊นŒ์ง€ ํ™•์ธํ•˜๊ธฐ ์œ„ํ•ด์„œ๋Š” ์•„๋ž˜์™€ ๊ฐ™์ด Kafka ์„ค์น˜ ๊ณผ์ •์ด ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค. kafka๋Š” zookeeper์™€ ๊ฐ™์ด ์›€์ง์ž…๋‹ˆ๋‹ค. ๊ทธ๋ ‡๊ธฐ ๋•Œ๋ฌธ์— zookeeper ์„ค์น˜๋„ ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค.

kafka & zookeeper ์„ค์น˜ํ•˜๊ธฐ

brew install kafka
brew install zookeeper
zkServer start # zookeeper running
kafka-server-start /usr/local/etc/kafka/server.properties # kafka running

kafka topic ์ƒ์„ฑํ•˜๊ธฐ

# replication-factor : ๋ณต์ œ๋ณธ ๊ฐœ์ˆ˜(1)
# partitions : ํŒŒํ‹ฐ์…˜ ๊ฐœ์ˆ˜(1)
# topic : ํ† ํ”ฝ๋ช…(taeng)
$ kafka-topics --create --zookeeper localhost:2181 \ 
      --replication-factor 1 --partitions 1 --topic taeng

์ผ๋‹จ ์นดํ”„์นด๋ฅผ ์ฝ”๋“œ๋กœ ๊ตฌํ˜„ํ•  ์ค€๋น„๋Š” ๋ชจ๋‘ ๋๋‚ฌ์Šต๋‹ˆ๋‹ค. ๋‹ค๋งŒ ๊ฐ„๋‹จํ•œ ์˜ˆ์ œ๋ฅผ ์‚ดํŽด๋ณด๊ธฐ ์œ„ํ•œ ์ตœ์†Œํ•œ์˜ ์ค€๋น„์ž…๋‹ˆ๋‹ค. ์‹ค์ œ ์‹ค๋ฌด์—์„œ๋Š” ์ง€๊ธˆ๊ณผ ๊ฐ™์ด ๋‹จ์ผ ์‹œ์Šคํ…œ์œผ๋กœ ์‚ฌ์šฉํ•˜์ง€๋Š” ์•Š์Šต๋‹ˆ๋‹ค. ๊ทธ๋ฆฌ๊ณ  ๋ณธ๊ฒฉ์ ์ธ ๊ตฌํ˜„์— ์•ž์„œ! ์ด๋ฒˆ ํฌ์ŠคํŒ…์—์„œ๋Š” kafka๋‚˜ zookeeper์— ๋Œ€ํ•œ ์ด๋ก ์ ์ธ ๋ถ€๋ถ„์€ ์ž์„ธํžˆ ๋‹ค๋ฃจ์ง€ ์•Š์Šต๋‹ˆ๋‹ค.


๊ตฌํ˜„ํ•ด๋ณด์ž! Kafka Producer

์˜ˆ์ œ๋Š” ๊ฐ„๋‹จํ•ฉ๋‹ˆ๋‹ค. ๋‹จ์ˆœํ•˜๊ฒŒ ํ‚ค๋ณด๋“œ ์ž…๋ ฅ์„ ๋ฐ›์•„ ์ „์†กํ•˜๋Š” ๊ฒƒ์ด์ง€์š”. ์‹ค๋ฌด์—์„œ๋Š” kafka ์ „์†ก์— ์žˆ์–ด์„œ ์—ฐ๊ฒฐ์„ ๋Š๊ฑฐ๋‚˜ ์œ ์ง€ํ•˜๋Š” ๊ฒƒ์ด ์—„๊ฒฉํ•˜๊ฒŒ ๊ด€๋ฆฌ๋˜๊ฒ ์ง€๋งŒ ์ง€๊ธˆ์€ ๊ฐ„๋‹จํ•œ ์˜ˆ์ œ์ธ๋งŒํผ ๋‹จ์ˆœํ•˜๊ฒŒ ํŠน์ • ๋ฉ”์‹œ์ง€๋ฅผ ์ž…๋ ฅํ•˜๋ฉด ์—ฐ๊ฒฐ์ด ์ข…๋ฃŒ๋˜๋„๋ก ํ•ฉ์‹œ๋‹ค.

/**
 * Kafka Producer.
 * keyboard input์„ ํ†ตํ•ด ๋ฉ”์‹œ์ง€๋ฅผ ์ „์†กํ•œ๋‹ค.
 *
 * @author Kimtaeng
 * Created on 2018. 9. 10.
 */
public class MyKafkaProducer {
    private static final String TOPIC_NAME = "taeng";
    private static final String FIN_MESSAGE = "exit";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        while(true) {
            Scanner sc = new Scanner(System.in);
            System.out.print("Input > ");
            String message = sc.nextLine();

            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
            try {
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        // some exception
                    }
                });

            } catch (Exception e) {
                // exception
            } finally {
                producer.flush();
            }

            if(StringUtils.equals(message, FIN_MESSAGE)) {
                producer.close();
                break;
            }
        }
    }
}


๊ตฌํ˜„ํ•ด๋ณด์ž! Kafka Consumer

Consumer์˜ ๊ฒฝ์šฐ๋Š” ๊ตฌ๋…(subscribe)์„ ์‹œ์ž‘ํ•œ ํ›„ poll์„ ํ†ตํ•ด ๋ ˆ์ฝ”๋“œ๋ฅผ ์ฒ˜๋ฆฌํ•ฉ๋‹ˆ๋‹ค. topic์˜ ๊ฒฝ์šฐ์— List๋กœ ์„ค์ • ๊ฐ€๋Šฅํ•ฉ๋‹ˆ๋‹ค. ๋‹จ์ผ topic์ด ์•„๋‹ˆ๋ผ๋Š” ๊ฒƒ์ด์ง€์š”. poll ๋ฉ”์„œ๋“œ์˜ ํŒŒ๋ผ๋ฏธํ„ฐ๋Š” ๋ ˆ์ฝ”๋“œ๋ฅผ ๊ธฐ๋‹ค๋ฆด ์ตœ๋Œ€ ๋ธ”๋Ÿญ ์‹œ๊ฐ„์ž…๋‹ˆ๋‹ค. ๊ทธ๋ฆฌ๊ณ  ์•ž์„œ ์‚ดํŽด๋ณธ Producer์™€ ๋™์ผํ•˜๊ฒŒ ํŠน์ • ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐ›์œผ๋ฉด ์ข…๋ฃŒํ•˜๊ฒŒ ๋ฉ๋‹ˆ๋‹ค.

/**
 * Kafka Consumer.
 * Producer๊ฐ€ ์ „์†กํ•œ ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐ›๋Š”๋‹ค.
 *
 * @author Kimtaeng
 * Created on 2018. 9. 10.
 */
public class MyKafkaConsumer {
    private static final String TOPIC_NAME = "taeng";
    private static final String FIN_MESSAGE = "exit";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, TOPIC_NAME);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        String message = null;
        try {
            do {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100000));

                for (ConsumerRecord<String, String> record : records) {
                    message = record.value();
                    System.out.println(message);
                }
            } while (!StringUtils.equals(message, FIN_MESSAGE));
        } catch(Exception e) {
            // exception
        } finally {
            consumer.close();
        }
    }
}


์‹คํ–‰ํ•ด๋ณด์ž!

๋จผ์ € Consumer๋ฅผ ๋ฏธ๋ฆฌ ์‹คํ–‰ํ•œ ํ›„์— Producer๋ฅผ ์‹คํ–‰ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. Producer๋ฅผ ์‹คํ–‰ํ•œ ํ›„ ํ‚ค๋ณด๋“œ ์ž…๋ ฅ์œผ๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ์ž…๋ ฅํ•˜๋ฉด Consumer์˜ ์ฝ˜์†” ํ™”๋ฉด์—์„œ ์ „์†ก๋œ ๋ฉ”์‹œ์ง€๋ฅผ ํ™•์ธํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

kafka producer

kafka consumer

์ž๋ฐ”๋กœ Kafka Producer์™€ Consumer๋ฅผ ๊ฐ„๋‹จํ•˜๊ฒŒ ๊ตฌํ˜„ํ•˜๊ณ  ์‹คํ–‰ํ•ด๋ณด์•˜์Šต๋‹ˆ๋‹ค. ์œ„์˜ ์˜ˆ์ œ ์ฝ”๋“œ์—์„œ ์‚ดํŽด๋ณธ ๊ฒƒ ์™ธ์—๋„ ๋” ๋งŽ์€ ์˜ต์…˜ ์„ค์ •์ด ์žˆ๋Š”๋ฐ์š”. ์ด ์„ค์ •๋“ค์€ ์ด์–ด์ง€๋Š” ๊ธ€์„ ํ†ตํ•ด์„œ ์•Œ์•„๋ด…์‹œ๋‹ค.