Getting Started With Kafka!
Kafka Producer์ Consumer๋ฅผ ์๋ฐ๋ก ์ง์ ๊ตฌํํ๋ ๊ฒ์ ์๊ฐ๋ณด๋ค ๊ฐ๋จํฉ๋๋ค. ํ์ง๋ง ์ฝ๋๋ฅผ ์คํํ์ฌ ๊ฒฐ๊ณผ๊น์ง ํ์ธํ๊ธฐ ์ํด์๋ ์๋์ ๊ฐ์ด Kafka ์ค์น ๊ณผ์ ์ด ํ์ํฉ๋๋ค. kafka๋ zookeeper์ ๊ฐ์ด ์์ง์ ๋๋ค. ๊ทธ๋ ๊ธฐ ๋๋ฌธ์ zookeeper ์ค์น๋ ํ์ํฉ๋๋ค.
kafka & zookeeper ์ค์นํ๊ธฐ
brew install kafka
brew install zookeeper
zkServer start # zookeeper running
kafka-server-start /usr/local/etc/kafka/server.properties # kafka running
kafka topic ์์ฑํ๊ธฐ
# replication-factor : ๋ณต์ ๋ณธ ๊ฐ์(1)
# partitions : ํํฐ์
๊ฐ์(1)
# topic : ํ ํฝ๋ช
(taeng)
$ kafka-topics --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic taeng
์ผ๋จ ์นดํ์นด๋ฅผ ์ฝ๋๋ก ๊ตฌํํ ์ค๋น๋ ๋ชจ๋ ๋๋ฌ์ต๋๋ค. ๋ค๋ง ๊ฐ๋จํ ์์ ๋ฅผ ์ดํด๋ณด๊ธฐ ์ํ ์ต์ํ์ ์ค๋น์ ๋๋ค. ์ค์ ์ค๋ฌด์์๋ ์ง๊ธ๊ณผ ๊ฐ์ด ๋จ์ผ ์์คํ ์ผ๋ก ์ฌ์ฉํ์ง๋ ์์ต๋๋ค. ๊ทธ๋ฆฌ๊ณ ๋ณธ๊ฒฉ์ ์ธ ๊ตฌํ์ ์์! ์ด๋ฒ ํฌ์คํ ์์๋ kafka๋ zookeeper์ ๋ํ ์ด๋ก ์ ์ธ ๋ถ๋ถ์ ์์ธํ ๋ค๋ฃจ์ง ์์ต๋๋ค.
๊ตฌํํด๋ณด์! Kafka Producer
์์ ๋ ๊ฐ๋จํฉ๋๋ค. ๋จ์ํ๊ฒ ํค๋ณด๋ ์ ๋ ฅ์ ๋ฐ์ ์ ์กํ๋ ๊ฒ์ด์ง์. ์ค๋ฌด์์๋ kafka ์ ์ก์ ์์ด์ ์ฐ๊ฒฐ์ ๋๊ฑฐ๋ ์ ์งํ๋ ๊ฒ์ด ์๊ฒฉํ๊ฒ ๊ด๋ฆฌ๋๊ฒ ์ง๋ง ์ง๊ธ์ ๊ฐ๋จํ ์์ ์ธ๋งํผ ๋จ์ํ๊ฒ ํน์ ๋ฉ์์ง๋ฅผ ์ ๋ ฅํ๋ฉด ์ฐ๊ฒฐ์ด ์ข ๋ฃ๋๋๋ก ํฉ์๋ค.
/**
* Kafka Producer.
* keyboard input์ ํตํด ๋ฉ์์ง๋ฅผ ์ ์กํ๋ค.
*
* @author Kimtaeng
* Created on 2018. 9. 10.
*/
public class MyKafkaProducer {
private static final String TOPIC_NAME = "taeng";
private static final String FIN_MESSAGE = "exit";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
while(true) {
Scanner sc = new Scanner(System.in);
System.out.print("Input > ");
String message = sc.nextLine();
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
try {
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// some exception
}
});
} catch (Exception e) {
// exception
} finally {
producer.flush();
}
if(StringUtils.equals(message, FIN_MESSAGE)) {
producer.close();
break;
}
}
}
}
๊ตฌํํด๋ณด์! Kafka Consumer
Consumer์ ๊ฒฝ์ฐ๋ ๊ตฌ๋ (subscribe)์ ์์ํ ํ poll์ ํตํด ๋ ์ฝ๋๋ฅผ ์ฒ๋ฆฌํฉ๋๋ค. topic์ ๊ฒฝ์ฐ์ List๋ก ์ค์ ๊ฐ๋ฅํฉ๋๋ค. ๋จ์ผ topic์ด ์๋๋ผ๋ ๊ฒ์ด์ง์. poll ๋ฉ์๋์ ํ๋ผ๋ฏธํฐ๋ ๋ ์ฝ๋๋ฅผ ๊ธฐ๋ค๋ฆด ์ต๋ ๋ธ๋ญ ์๊ฐ์ ๋๋ค. ๊ทธ๋ฆฌ๊ณ ์์ ์ดํด๋ณธ Producer์ ๋์ผํ๊ฒ ํน์ ๋ฉ์์ง๋ฅผ ๋ฐ์ผ๋ฉด ์ข ๋ฃํ๊ฒ ๋ฉ๋๋ค.
/**
* Kafka Consumer.
* Producer๊ฐ ์ ์กํ ๋ฉ์์ง๋ฅผ ๋ฐ๋๋ค.
*
* @author Kimtaeng
* Created on 2018. 9. 10.
*/
public class MyKafkaConsumer {
private static final String TOPIC_NAME = "taeng";
private static final String FIN_MESSAGE = "exit";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, TOPIC_NAME);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
String message = null;
try {
do {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100000));
for (ConsumerRecord<String, String> record : records) {
message = record.value();
System.out.println(message);
}
} while (!StringUtils.equals(message, FIN_MESSAGE));
} catch(Exception e) {
// exception
} finally {
consumer.close();
}
}
}
์คํํด๋ณด์!
๋จผ์ Consumer๋ฅผ ๋ฏธ๋ฆฌ ์คํํ ํ์ Producer๋ฅผ ์คํํด์ผ ํฉ๋๋ค. Producer๋ฅผ ์คํํ ํ ํค๋ณด๋ ์ ๋ ฅ์ผ๋ก ๋ฉ์์ง๋ฅผ ์ ๋ ฅํ๋ฉด Consumer์ ์ฝ์ ํ๋ฉด์์ ์ ์ก๋ ๋ฉ์์ง๋ฅผ ํ์ธํ ์ ์์ต๋๋ค.


์๋ฐ๋ก Kafka Producer์ Consumer๋ฅผ ๊ฐ๋จํ๊ฒ ๊ตฌํํ๊ณ ์คํํด๋ณด์์ต๋๋ค. ์์ ์์ ์ฝ๋์์ ์ดํด๋ณธ ๊ฒ ์ธ์๋ ๋ ๋ง์ ์ต์ ์ค์ ์ด ์๋๋ฐ์. ์ด ์ค์ ๋ค์ ์ด์ด์ง๋ ๊ธ์ ํตํด์ ์์๋ด ์๋ค.