Kafka Streams ์๋ฆฌ์ฆ ๋ชฉ์ฐจ
- 1. Kafka Streams ๊ฐ๋ ๊ณผ ์ํคํ ์ฒ
- 2. Kafka Streams KStream๊ณผ KTable
- 3. Kafka Streams ์๋์ฐ์ ์กฐ์ธ
- 4. Kafka Streams ์๋ฌ ์ฒ๋ฆฌ์ ๋ณต๊ตฌ ์ ๋ต
์ด๋ฒคํธ์ ์ํ, ๋ ๊ฐ์ง ๊ด์
์จ๋ผ์ธ ์ผํ๋ชฐ์์ ์ฌ์ฉ์์ ์ฃผ๋ฌธ ์ด๋ฒคํธ๋ฅผ ์ฒ๋ฆฌํ๋ค๊ณ ์๊ฐํด ๋ณด์. โ์ฌ์ฉ์ A๊ฐ ์ํ X๋ฅผ ์ฃผ๋ฌธํ๋คโ๋ ์ด๋ฒคํธ๋ ๋ฐ์ํ ๋๋ง๋ค ํ๋์ ๋ ๋ฆฝ์ ์ธ ์ฌ์ค์ด๋ค. ๊ฐ์ ์ฌ์ฉ์๊ฐ ๋ ๋ฒ ์ฃผ๋ฌธํ๋ฉด ๋ ๊ฐ์ ์ด๋ฒคํธ๊ฐ ์์ธ๋ค. ๋ฐ๋ฉด โ์ฌ์ฉ์ A์ ํ์ฌ ๋ฑ๊ธ์ ๊ณจ๋๋คโ๋ผ๋ ์ ๋ณด๋ ๋ฑ๊ธ์ด ๋ฐ๋ ๋๋ง๋ค ์ด์ ๊ฐ์ ๋์ฒดํ๋ ์ต์ ์ํ๋ค.
Kafka Streams๋ ์ด ๋ ๊ฐ์ง ๊ด์ ์ ๊ฐ๊ฐ KStream๊ณผ KTable์ด๋ผ๋ ์ถ์ํ๋ก ํํํ๋ค.
KStream: ๋์๋ ์ด๋ฒคํธ ํ๋ฆ
KStream์ ํ ํฝ์ ๊ฐ ๋ ์ฝ๋๋ฅผ ๋ ๋ฆฝ์ ์ธ ์ด๋ฒคํธ(INSERT)๋ก ํด์ํ๋ค. ๊ฐ์ key๋ฅผ ๊ฐ์ง ๋ ์ฝ๋๊ฐ ์ฌ๋ฌ ๋ฒ ๋ค์ด์๋ ์ด์ ๊ฐ์ ๋ฎ์ด์ฐ์ง ์๋๋ค. ๋ชจ๋ ๋ ์ฝ๋๊ฐ ์๋ฏธ ์๋ ๋ฐ์ดํฐ๋ค.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> orderStream = builder.stream("order-events");
order-events ํ ํฝ์ ์๋ 4๊ฐ์ ๋ ์ฝ๋๊ฐ ๋ค์ด์จ๋ค๊ณ ํ์.
key=user-A, value={"item":"X", "amount":15000}
key=user-B, value={"item":"Y", "amount":8000}
key=user-A, value={"item":"Z", "amount":22000}
key=user-B, value={"item":"W", "amount":5000}
KStream์ ์ด 4๊ฐ๋ฅผ ๋ชจ๋ ๋
๋ฆฝ ์ด๋ฒคํธ๋ก ์ฒ๋ฆฌํ๋ค. key๊ฐ user-A์ธ ๋ ์ฝ๋๊ฐ 2๊ฐ์ง๋ง ์๋ก ๋์ฒดํ์ง ์๋๋ค.
KTable: key ๊ธฐ์ค ์ต์ ์ํ
KTable์ ๊ฐ์ key์ ๋ ์ฝ๋๊ฐ ๋ค์ด์ค๋ฉด ์ด์ ๊ฐ์ UPDATEํ๋ค. ํญ์ key๋น ์ต์ ๊ฐ ํ๋๋ง ์ ์งํ๋, ๋ฐ์ดํฐ๋ฒ ์ด์ค ํ ์ด๋ธ๊ณผ ๋น์ทํ ๊ตฌ์กฐ๋ค.
KTable<String, String> userGradeTable = builder.table("user-grades");
user-grades ํ ํฝ์ ์๋ ๋ ์ฝ๋๊ฐ ์์๋๋ก ๋ค์ด์ค๋ฉด:
key=user-A, value="SILVER"
key=user-B, value="GOLD"
key=user-A, value="GOLD"
KTable์ ์ต์ข
์ํ๋ user-A=GOLD, user-B=GOLD์ด๋ค. user-A์ ์ฒซ ๋ฒ์งธ ๊ฐ SILVER๋ GOLD๋ก ๋์ฒด๋๋ค.
๊ทธ๋ ๋ค๋ฉด ๊ฐ์ ํ ํฝ์ KStream์ผ๋ก ์ฝ์ผ๋ฉด ์ด๋ป๊ฒ ๋ ๊น? KStream์ 3๊ฐ์ ๋ ์ฝ๋๋ฅผ ๋ชจ๋ ๋ ๋ฆฝ์ ์ผ๋ก ์ฒ๋ฆฌํ๋ค. โSILVER๊ฐ GOLD๋ก ๋ฐ๋์๋คโ๋ ํด์ ์์ด 3๊ฑด์ ์ด๋ฒคํธ๊ฐ ๊ฐ๊ฐ ํ๋ฌ๊ฐ๋ค.
๊ฐ์ ๋ฐ์ดํฐ๋ฅผ KStream์ผ๋ก ๋ณผ์ง KTable๋ก ๋ณผ์ง๋ ๋น์ฆ๋์ค ์๋ฏธ์ ๋ฐ๋ผ ๊ฒฐ์ ํ๋ค.
| ๊ตฌ๋ถ | KStream | KTable |
|---|---|---|
| ๋ ์ฝ๋ ํด์ | INSERT (๋ ๋ฆฝ ์ด๋ฒคํธ) | UPDATE (์ต์ ์ํ) |
| ๊ฐ์ key | ๋ชจ๋ ์ ์ง | ๋ง์ง๋ง ๊ฐ๋ง ์ ์ง |
| State Store | ๊ธฐ๋ณธ์ ์ผ๋ก ์ฌ์ฉํ์ง ์์ (stateful ์ฐ์ฐ ์ ์ฌ์ฉ) | ์ฌ์ฉ |
| ์ ํฉํ ๋ฐ์ดํฐ | ์ฃผ๋ฌธ, ํด๋ฆญ, ๋ก๊ทธ | ์ฌ์ฉ์ ํ๋กํ, ์ฌ๊ณ , ์ค์ ๊ฐ |
KStream๊ณผ KTable์ ์ธ์ ์ฐ๋๊ฐ
KStream์ ๋ฐ์ํ๋ ์ด๋ฒคํธ๋ฅผ ๋น ์ง์์ด ์ฒ๋ฆฌํด์ผ ํ ๋ ์ ํฉํ๋ค. ์ฃผ๋ฌธ, ํด๋ฆญ ๋ก๊ทธ, ์ผ์ ๋ฐ์ดํฐ์ฒ๋ผ ๊ฐ๋ณ ๊ฑด ์์ฒด๊ฐ ์๋ฏธ ์๋ ๋ฐ์ดํฐ๋ KStream์ผ๋ก ์ฝ๋ ๊ฒ์ด ์์ฐ์ค๋ฝ๋ค. ํ ๊ฑด ํ ๊ฑด์ด ๋ ๋ฆฝ์ ์ด๋ฏ๋ก ํํฐ๋ง, ๋ณํ, ๋ถ๊ธฐ ๊ฐ์ Stateless ์ฐ์ฐ์ ๊ฑฐ์ณ ๋ฐ๋ก ๋ค์ ์ฒ๋ฆฌ๋ก ๋๊ธธ ์ ์๋ค.
KTable์ ์ต์ ์ํ๋ฅผ ๊ด๋ฆฌํด์ผ ํ ๋ ์ ํฉํ๋ค. ์ฌ์ฉ์ ํ๋กํ, ์ํ ์ฌ๊ณ , ํ์จ์ฒ๋ผ ๊ฐ์ด ๊ฐฑ์ ๋๋ฉด ์ด์ ๊ฐ์ ์๋ฏธ๊ฐ ์์ด์ง๋ ๋ฐ์ดํฐ๊ฐ ๋ํ์ ์ด๋ค. State Store์ key ๋จ์๋ก ์ต์ ๊ฐ์ ์ ์งํ๋ฏ๋ก, ์ค์๊ฐ ์กฐํ๊ฐ ํ์ํ ์ฐธ์กฐ ๋ฐ์ดํฐ๋ฅผ ๋ค๋ฃจ๊ธฐ์ ์ข๋ค.
๋์ ์กฐํฉ์ ์ด๋ฒคํธ์ ์ต์ ์ํ๋ฅผ ๊ฒฐํฉํด์ผ ํ ๋ ์ง๊ฐ๋ฅผ ๋ฐํํ๋ค. ์๋ฅผ ๋ค์ด, ์ฃผ๋ฌธ ์ด๋ฒคํธ(KStream)๊ฐ ๋ค์ด์ฌ ๋๋ง๋ค ํด๋น ์ฌ์ฉ์์ ํ์ฌ ๋ฑ๊ธ(KTable)์ ๋ถ์ฌ์ ๋ฑ๊ธ๋ณ ํ ์ธ์ ์ ์ฉํ ์ ์๋ค. KStream ๋จ๋ ์ด๋ผ๋ฉด ๋ฑ๊ธ ์ ๋ณด๋ฅผ ๋งค๋ฒ ์ธ๋ถ ์์คํ ์ ์กฐํํด์ผ ํ๊ณ , KTable ๋จ๋ ์ด๋ผ๋ฉด ์ด๋ฒคํธ์ ํ๋ฆ ์์ฒด๋ฅผ ์ฒ๋ฆฌํ ์ ์๋ค. KStream-KTable ์กฐ์ธ์ ์ด๋ฒคํธ ํ๋ฆ๊ณผ ์ต์ ์ํ๋ฅผ Kafka ์์์ ๊ฒฐํฉํ๋ฏ๋ก, ์ธ๋ถ ํธ์ถ ์์ด๋ ํ๋ถํ ์ค์๊ฐ ์ฒ๋ฆฌ๊ฐ ๊ฐ๋ฅํด์ง๋ค.
GlobalKTable: ์ ์ฒด ํํฐ์ ๋ฐ์ดํฐ๋ฅผ ๋ก์ปฌ์
KTable์ ํํฐ์ ๋จ์๋ก ๋ถํ ๋๋ค. ์ฆ, ์ธ์คํด์ค A๋ ํํฐ์ 0~1์ ๋ฐ์ดํฐ๋ง ๊ฐ๊ณ , ์ธ์คํด์ค B๋ ํํฐ์ 2~3์ ๋ฐ์ดํฐ๋ง ๊ฐ๋๋ค. ์ด ํน์ฑ ๋๋ฌธ์ KStream๊ณผ KTable์ ์กฐ์ธํ๋ ค๋ฉด ๋ ํ ํฝ์ ํํฐ์ ์๊ฐ ๊ฐ๊ณ , ๊ฐ์ key๊ฐ ๊ฐ์ ํํฐ์ ์ ์์ด์ผ ํ๋ค. ์ด๋ฅผ co-partitioning์ด๋ผ ํ๋ฉฐ, ์กฐ์ธ ๋์ ํ ํฝ๋ค์ด ๊ฐ์ ํํฐ์ ์์ ๊ฐ์ ํํฐ์ ๋ ์ ๋ต์ ์ฌ์ฉํ๋ ๊ฒ์ ๋ปํ๋ค.
GlobalKTable์ ์ด ์ ์ฝ์ ์์ค๋ค. ๋ชจ๋ ์ธ์คํด์ค๊ฐ ํ ํฝ์ ์ ์ฒด ํํฐ์ ๋ฐ์ดํฐ๋ฅผ ๋ก์ปฌ์ ๋ณต์ ํ๋ค.
GlobalKTable<String, String> productTable = builder.globalTable("products");
GlobalKTable์ co-partitioning ์์ด ์กฐ์ธ์ด ๊ฐ๋ฅํ๊ณ , ์ด๋ค key๋ก๋ ๋ก์ปฌ์์ ์กฐํํ ์ ์๋ค. ๋ค๋ง ํ ํฝ์ ์ ์ฒด ๋ฐ์ดํฐ๋ฅผ ๊ฐ ์ธ์คํด์ค๊ฐ ๋ชจ๋ ๊ฐ๊ณ ์์ผ๋ฏ๋ก, ๋ฐ์ดํฐ๊ฐ ๋ง์ผ๋ฉด ๋ฉ๋ชจ๋ฆฌ์ ๋์คํฌ ์ฌ์ฉ๋์ด ์ปค์ง๋ค. ์ํ ๋ง์คํฐ, ๊ตญ๊ฐ ์ฝ๋, ํ์จ ๊ฐ์ ์กฐํ์ฉ ์ฐธ์กฐ ๋ฐ์ดํฐ์ ์ ํฉํ๋ค.
Stateless ์ฐ์ฐ
Stateless ์ฐ์ฐ์ ํ์ฌ ๋ ์ฝ๋๋ง ๋ณด๊ณ ๊ฒฐ๊ณผ๋ฅผ ๋ด๋ ์ฐ์ฐ์ด๋ค. ์ด์ ๋ ์ฝ๋์ ์ํ๋ฅผ ๊ธฐ์ตํ ํ์๊ฐ ์์ผ๋ฏ๋ก State Store๋ฅผ ์ฌ์ฉํ์ง ์๋๋ค.
์๋ ์ฐ์ฐ๋ค์ ์ฃผ๋ก KStream์์ ์ฌ์ฉํ์ง๋ง, filter์ mapValues๋ KTable์์๋ ๋์ผํ๊ฒ ์ฌ์ฉํ ์ ์๋ค.
filter
์กฐ๊ฑด์ ๋ง๋ ๋ ์ฝ๋๋ง ํต๊ณผ์ํจ๋ค.
KStream<String, OrderEvent> highValueOrders = orderStream
.filter((key, event) -> event.getAmount() >= 10000);
map / mapValues
๋ ์ฝ๋์ key์ value๋ฅผ ๋ณํํ๋ค. mapValues๋ key๋ฅผ ๊ทธ๋๋ก ๋๊ณ value๋ง ๋ณํํ๋ฏ๋ก, ์ฌํํฐ์
๋์ด ๋ฐ์ํ์ง ์๋๋ค.
// key๋ ์ ์ง, value๋ง ๋ณํ
KStream<String, String> summaries = orderStream
.mapValues(event -> event.getItem() + ":" + event.getAmount());
// key์ value ๋ชจ๋ ๋ณํ (์ฌํํฐ์
๋ ๋ฐ์ ๊ฐ๋ฅ)
KStream<String, Integer> amountByItem = orderStream
.map((key, event) -> KeyValue.pair(event.getItem(), event.getAmount()));
map์ผ๋ก key๋ฅผ ๋ณ๊ฒฝํ ๋ค groupByKey๋ join ๊ฐ์ key ๊ธฐ๋ฐ ์ฐ์ฐ์ด ๋ค๋ฐ๋ฅด๋ฉด, Kafka Streams๊ฐ ๋ด๋ถ์ ์ผ๋ก repartition ํ ํฝ์ ์์ฑํด์ ๋ฐ์ดํฐ๋ฅผ ์ฌ๋ถ๋ฐฐํ๋ค. repartition ํ ํฝ์ ์ key ๊ธฐ์ค์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ฌ๋ฐ๋ฅธ ํํฐ์
์ ๋ค์ ๋ฐฐ์นํ๋ ์ค๊ฐ ํ ํฝ์ด๋ค.
key ๋ณ๊ฒฝ์ด ๋ถํ์ํ๋ค๋ฉด mapValues๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ด ์ฑ๋ฅ์ ์ ๋ฆฌํ๋ค.
repartition์ด ๋ฐ์ํ๋ฉด ๋ชจ๋ ๋ ์ฝ๋๊ฐ ์ key ๊ธฐ์ค์ผ๋ก ์ง๋ ฌํ๋์ด repartition ํ ํฝ์ ์ฐ์ธ ๋ค, ๋ค์ ์ฝํ์ ๋ค์ ํ๋ก์ธ์๋ก ์ ๋ฌ๋๋ค.
์ด ๊ณผ์ ์์ ๋คํธ์ํฌ I/O์ ๋์คํฌ I/O๊ฐ ์ถ๊ฐ๋ก ๋ฐ์ํ๊ณ , repartition ํ ํฝ์ ์
๋ ฅ ํ ํฝ๊ณผ ๊ฐ์ ํํฐ์
์๋ก ์์ฑ๋๋ฏ๋ก ๋ธ๋ก์ปค์ ๋์คํฌ ์ฌ์ฉ๋๋ ๋์ด๋๋ค.
map, selectKey, groupBy๋ก key๋ฅผ ๋ณ๊ฒฝํ ๋ค key ๊ธฐ๋ฐ ์ฐ์ฐ(groupByKey, join ๋ฑ)์ด ๋ค๋ฐ๋ฅด๋ฉด repartition์ด ๋ฐ์ํ ์ ์๋ค. key ๋ณ๊ฒฝ์ด ํ์ ์๋ค๋ฉด mapValues์ groupByKey๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ด ์์ ํ๋ค.
// repartition ๋ฐ์: selectKey๋ก key๋ฅผ ๋ณ๊ฒฝํ ๋ค groupByKey
orderStream
.selectKey((key, event) -> event.getItem())
.groupByKey()
.count();
// repartition ์์: groupBy ๋์ ๊ธฐ์กด key๋ฅผ ์ ์ง
orderStream
.groupByKey()
.count();
flatMapValues
ํ๋์ ๋ ์ฝ๋๋ฅผ ์ฌ๋ฌ ๊ฐ๋ก ํผ์น๋ค. 1ํธ์์ ๋ณธ WordCount์ ๋จ์ด ๋ถ๋ฆฌ๊ฐ ๋ํ์ ์ธ ์๋ค.
KStream<String, String> words = textLines
.flatMapValues(line -> Arrays.asList(line.split("\\W+")));
branch
์กฐ๊ฑด์ ๋ฐ๋ผ ์คํธ๋ฆผ์ ์ฌ๋ฌ ๊ฐ๋๋ก ๋ถ๊ธฐํ๋ค.
@SuppressWarnings("unchecked")
KStream<String, OrderEvent>[] branches = orderStream.branch(
(key, event) -> event.getAmount() >= 50000, // [0] ๊ณ ๊ฐ ์ฃผ๋ฌธ
(key, event) -> event.getAmount() >= 10000, // [1] ์ค๊ฐ ์ฃผ๋ฌธ
(key, event) -> true // [2] ๊ทธ ์ธ
);
KStream<String, OrderEvent> premiumOrders = branches[0];
KStream<String, OrderEvent> standardOrders = branches[1];
KStream<String, OrderEvent> budgetOrders = branches[2];
branch์ ์กฐ๊ฑด์ ์์๋๋ก ํ๊ฐ๋๋ฉฐ, ์ฒซ ๋ฒ์งธ๋ก ์ผ์นํ๋ ๋ถ๊ธฐ๋ก ๋ผ์ฐํ
๋๋ค.
๋ง์ง๋ง ์กฐ๊ฑด์ (key, event) -> true๋ก ๋๋ฉด ์์ ์กฐ๊ฑด์ ๊ฑธ๋ฆฌ์ง ์์ ๋ชจ๋ ๋ ์ฝ๋๋ฅผ ์ก์๋ผ ์ ์๋ค.
branch๋ ๋ฐฐ์ด ๋ฐํ ๋ฐฉ์์ด๋ผ ํ์
์์ ์ฑ์ด ๋จ์ด์ง๋๋ฐ, Kafka 2.8 ์ดํ์๋ split().branch().defaultBranch()๋ก ๋์ฒดํ ์ ์๋ค.
Stateful ์ฐ์ฐ
Stateful ์ฐ์ฐ์ ์ฌ๋ฌ ๋ ์ฝ๋์ ๊ฑธ์ณ ์ํ๋ฅผ ์ ์งํ๋ฉฐ ๊ฒฐ๊ณผ๋ฅผ ๋์ ํ๋ ์ฐ์ฐ์ด๋ค. State Store(๊ธฐ๋ณธ RocksDB)์ ์ค๊ฐ ์ํ๋ฅผ ์ ์ฅํ๊ณ , changelog ํ ํฝ์ ๋ณ๊ฒฝ ์ฌํญ์ ๊ธฐ๋กํด ์ฅ์ ๋ณต๊ตฌ๋ฅผ ๋ณด์ฅํ๋ค.
KStream์์ groupByKey๋ groupBy๋ก ๋ ์ฝ๋๋ฅผ ๋ฌถ์ผ๋ฉด KGroupedStream์ด ๋๊ณ , ์ฌ๊ธฐ์ count, reduce, aggregate ๊ฐ์ ์ง๊ณ๋ฅผ ์ ์ฉํ๋ฉด ๊ฒฐ๊ณผ๊ฐ KTable๋ก ๋ฐํ๋๋ค.
groupByKey vs groupBy
// key๋ฅผ ๋ณ๊ฒฝํ์ง ์์ ๋ (์ฌํํฐ์
๋ ์์)
KGroupedStream<String, OrderEvent> groupedByUser = orderStream.groupByKey();
// key๋ฅผ ๋ณ๊ฒฝํ ๋ (์ฌํํฐ์
๋ ๋ฐ์)
KGroupedStream<String, OrderEvent> groupedByItem = orderStream
.groupBy((key, event) -> event.getItem());
groupByKey๋ ํ์ฌ key๋ฅผ ๊ทธ๋๋ก ์ฌ์ฉํ๋ฏ๋ก, ์๋ key๊ฐ ์ ์ง๋ ๊ฒฝ์ฐ์๋ ์ฌํํฐ์
๋์ด ๋ฐ์ํ์ง ์๋๋ค. ๋ค๋ง ์
์คํธ๋ฆผ์์ selectKey๋ map์ผ๋ก key๋ฅผ ์ด๋ฏธ ๋ณ๊ฒฝํ ์ํ๋ผ๋ฉด groupByKey()๋ ๋ด๋ถ์ ์ผ๋ก repartition์ด ํ์ํ๋ค.
groupBy๋ ์ key๋ฅผ ์ง์ ํ๋ฏ๋ก ๋ด๋ถ์ ์ผ๋ก repartition ํ ํฝ์ด ์๊ธด๋ค.
๊ฐ๋ฅํ๋ฉด groupByKey๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ด ๋คํธ์ํฌ ๋น์ฉ๊ณผ ์ฒ๋ฆฌ ์ง์ฐ์ ์ค์ผ ์ ์๋ค.
count
key๋ณ ๋ ์ฝ๋ ์๋ฅผ ์ผ๋ค. ๊ฒฐ๊ณผ๋ KTable๋ก ๋ฐํ๋๋ค.
KTable<String, Long> orderCountByUser = orderStream
.groupByKey()
.count();
reduce
๊ฐ์ ํ์ ์ ๋ ๊ฐ์ ํ๋๋ก ํฉ์น๋ค. ์ง๊ณ ๊ฒฐ๊ณผ์ ํ์ ์ด ์ ๋ ฅ ๋ ์ฝ๋์ value ํ์ ๊ณผ ๊ฐ์์ผ ํ๋ค.
// ์ฌ์ฉ์๋ณ ์ต๋ ์ฃผ๋ฌธ ๊ธ์ก
KTable<String, OrderEvent> maxOrderByUser = orderStream
.groupByKey()
.reduce((aggValue, newValue) ->
aggValue.getAmount() >= newValue.getAmount() ? aggValue : newValue
);
aggregate
reduce๋ณด๋ค ์ ์ฐํ ์ง๊ณ ์ฐ์ฐ์ด๋ค. ์ด๊ธฐ๊ฐ(initializer)๊ณผ ์ง๊ณ ํจ์(aggregator)๋ฅผ ๋ฐ๊ณ , ๊ฒฐ๊ณผ ํ์
์ด ์
๋ ฅ๊ณผ ๋ฌ๋ผ๋ ๋๋ค.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Materialized;
KTable<String, Long> totalAmountByUser = orderStream
.groupByKey()
.aggregate(
() -> 0L, // ์ด๊ธฐ๊ฐ
(key, event, currentTotal) -> currentTotal + event.getAmount(),
Materialized.with(Serdes.String(), Serdes.Long())
);
Materialized.with(...)๋ก State Store์ key/value Serdes๋ฅผ ์ง์ ํ๋ค.
๊ฒฐ๊ณผ ํ์
์ด ์
๋ ฅ๊ณผ ๋ค๋ฅด๊ธฐ ๋๋ฌธ์(OrderEvent โ Long) Serdes๋ฅผ ๋ช
์ํด์ผ ํ๋ค.
KStream-KTable ์กฐ์ธ
์ฃผ๋ฌธ ์ด๋ฒคํธ(KStream)์ ์ฌ์ฉ์ ๋ฑ๊ธ ์ ๋ณด(KTable)๋ฅผ ๋ถ์ด๋ ๊ฒ์ ์ค๋ฌด์์ ๋งค์ฐ ํํ ํจํด์ด๋ค. KStream-KTable ์กฐ์ธ์ ์คํธ๋ฆผ์ ๊ฐ ๋ ์ฝ๋์ ๋ํด ๊ฐ์ key๋ฅผ ๊ฐ์ง KTable์ ํ์ฌ ๊ฐ์ ์ฐพ์ ๊ฒฐํฉํ๋ค.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, OrderEvent> orders = builder.stream(
"order-events",
Consumed.with(Serdes.String(), orderEventSerde)
);
KTable<String, String> userGrades = builder.table(
"user-grades",
Consumed.with(Serdes.String(), Serdes.String())
);
// ์ฃผ๋ฌธ ์ด๋ฒคํธ์ ์ฌ์ฉ์ ๋ฑ๊ธ์ ๋ถ์ธ๋ค
KStream<String, EnrichedOrder> enrichedOrders = orders.join(
userGrades,
(order, grade) -> new EnrichedOrder(
order.getOrderId(),
order.getItem(),
order.getAmount(),
grade // KTable์์ ๊ฐ์ ธ์จ ํ์ฌ ๋ฑ๊ธ
)
);
enrichedOrders.to("enriched-orders",
Produced.with(Serdes.String(), enrichedOrderSerde));
์ด ์กฐ์ธ์ด ๋์ํ๋ ค๋ฉด order-events์ user-grades ํ ํฝ์ key๊ฐ ๋์ผํ ๊ธฐ์ค(์: userId)์ด๊ณ , ํํฐ์
์๊ฐ ๊ฐ์์ผ ํ๋ค(co-partitioning).
KStream-KTable ์กฐ์ธ์ inner join์ด ๊ธฐ๋ณธ์ด๋ค. ์ฆ, KTable์ ํด๋น key๊ฐ ์์ผ๋ฉด ๊ทธ ์คํธ๋ฆผ ๋ ์ฝ๋๋ ๊ฒฐ๊ณผ์ ํฌํจ๋์ง ์๋๋ค.
๋ชจ๋ ์ฃผ๋ฌธ ์ด๋ฒคํธ๋ฅผ ๊ฒฐ๊ณผ์ ํฌํจํ๊ณ ์ถ๋ค๋ฉด leftJoin์ ์ฌ์ฉํ๋ค.
KStream<String, EnrichedOrder> enrichedOrders = orders.leftJoin(
userGrades,
(order, grade) -> new EnrichedOrder(
order.getOrderId(),
order.getItem(),
order.getAmount(),
grade != null ? grade : "UNKNOWN"
)
);
leftJoin์์ KTable์ ๋งค์นญ๋๋ ๊ฐ์ด ์์ผ๋ฉด grade๊ฐ null๋ก ์ ๋ฌ๋๋ค.
๋ง์ฝ co-partitioning ์ ์ฝ์ด ๋ฌธ์ ๊ฐ ๋๋ค๋ฉด GlobalKTable์ ์ฌ์ฉํ ์ ์๋ค.
GlobalKTable ์กฐ์ธ์์๋ KeyValueMapper๋ก ์กฐ์ธ key๋ฅผ ์ง์ ์ง์ ํ๋ฏ๋ก ํํฐ์
์๊ฐ ๋ฌ๋ผ๋ ๋์ํ๋ค.
GlobalKTable<String, String> productTable = builder.globalTable("products");
KStream<String, EnrichedOrder> withProduct = orders.join(
productTable,
(orderKey, orderValue) -> orderValue.getItem(), // products ํ ํฝ์ key์ ๋งค์นญํ ๊ฐ
(order, productInfo) -> new EnrichedOrder(order, productInfo)
);
KTable์ ์ค๊ฐ ๊ฐฑ์ ๊ณผ suppress
KTable์ key์ ๊ฐ์ด ๊ฐฑ์ ๋๋ฉด ๋ค์ด์คํธ๋ฆผ(downstream) ํ๋ก์ธ์์ ๋ณ๊ฒฝ ๋ ์ฝ๋๋ฅผ ๋ด๋ณด๋ธ๋ค.
์ค์ ๋ก๋ Kafka Streams์ Record Cache(statestore.cache.max.bytes, ๊ธฐ๋ณธ 10MB)๊ฐ ๊ฐ์ key์ ๋ํ ์ฐ์ ๊ฐฑ์ ์ ๋ณํฉํด์ ์บ์๊ฐ ํ๋ฌ์๋ ๋ ๋ง์ง๋ง ๊ฐ๋ง ์ ๋ฌํ๋ฏ๋ก, ๋ชจ๋ ์ค๊ฐ ๊ฐ์ด ๊ทธ๋๋ก ๋๊ฐ์ง๋ ์๋๋ค.
๊ทธ๋ผ์๋ ์บ์ ํ๋ฌ์ ์ฃผ๊ธฐ์ ๋ฐ๋ผ ์๋น์์ ์ค๊ฐ ๊ฐฑ์ ์ด ๋ค์ด์คํธ๋ฆผ์ผ๋ก ์ ๋ฌ๋ ์ ์๋ค.
์๋ฅผ ๋ค์ด count()๋ก ๋จ์ด ์นด์ดํธ๋ฅผ ๊ตฌํ๋ฉด, ๊ฐ์ ๋จ์ด๊ฐ ๋ค์ด์ฌ ๋๋ง๋ค ์นด์ดํธ๊ฐ 1, 2, 3, โฆ ์ผ๋ก ๊ฐฑ์ ๋๋ฉฐ ์บ์ ํ๋ฌ์ ์์ ๋ง๋ค ์ค๊ฐ ๊ฒฐ๊ณผ๊ฐ ์ถ๋ ฅ๋๋ค.
์๋์ฐ ์ง๊ณ์์๋ ์ด ์ค๊ฐ ๊ฐฑ์ ์ด ํนํ ๋ฌธ์ ๊ฐ ๋๋ค. 5๋ถ ์๋์ฐ ์์์ ์์ฒ ๊ฑด์ ๋ ์ฝ๋๊ฐ ๋ค์ด์ค๋ฉด ์บ์๊ฐ ์ค์ฌ์ฃผ๋๋ผ๋ ์๋น์์ ์ค๊ฐ ๊ฒฐ๊ณผ๊ฐ ์ถ๋ ฅ ํ ํฝ์ ์ฐ์ธ๋ค.
์ต์ข
๊ฒฐ๊ณผ๋ง ํ์ํ๋ค๋ฉด suppress๋ฅผ ์ฌ์ฉํ ์ ์๋ค.
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import java.time.Duration;
KTable<Windowed<String>, Long> finalCounts = orders
.groupByKey()
.windowedBy(
TimeWindows.ofSizeAndGrace(
Duration.ofMinutes(5), Duration.ofMinutes(1)))
.count()
.suppress(
Suppressed.untilWindowCloses(
Suppressed.BufferConfig.unbounded()));
suppress(untilWindowCloses(unbounded()))๋ ์๋์ฐ๊ฐ ๋ซํ ๋๊น์ง ์ค๊ฐ ๊ฒฐ๊ณผ๋ฅผ ๋ฒํผ์ ๋ณด๊ดํ๋ค๊ฐ, ์๋์ฐ๊ฐ ๋ซํ๋ ์์ ์ ์ต์ข
๊ฒฐ๊ณผ ํ๋๋ง ๋ด๋ณด๋ธ๋ค.
unbounded()๋ ๋ฒํผ ํฌ๊ธฐ์ ์ ํ์ ๋์ง ์๋๋ค๋ ์๋ฏธ๋ค. key ์นด๋๋๋ฆฌํฐ๊ฐ ๋๊ฑฐ๋ ์๋์ฐ๊ฐ ๊ธธ๋ฉด ๋ฒํผ ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ๋์ด ์ปค์ง ์ ์์ผ๋ฏ๋ก, ์ํ์ด ํ์ํ๋ค๋ฉด maxRecords๋ maxBytes๋ก ์ ํ๋ ๋ฒํผ๋ฅผ ์ฌ์ฉํ ์ ์๋ค.
Suppressed.BufferConfig
.maxRecords(10000)
.shutDownWhenFull()
shutDownWhenFull()์ ๋ฒํผ๊ฐ ๊ฐ๋ ์ฐจ๋ฉด ์คํธ๋ฆผ์ ์ข
๋ฃํ๊ณ , emitEarlyWhenFull()์ ๋ฒํผ๊ฐ ์ฐจ๋ฉด ์ค๊ฐ ๊ฒฐ๊ณผ๋ฅผ ๋ด๋ณด๋ด๋ฉด์ ๊ณ์ ๋์ํ๋ค.
DSL๊ณผ Processor API ํผํฉ ์ฌ์ฉ
1ํธ์์ Streams DSL๊ณผ Processor API์ ์ฐจ์ด๋ฅผ ๋ค๋ค๋ค. ๋๋ถ๋ถ์ ๊ฒฝ์ฐ DSL๋ง์ผ๋ก ์ถฉ๋ถํ์ง๋ง, DSL ์ฒด์ธ ์ค๊ฐ์ State Store์ ์ง์ ์ ๊ทผํ๊ฑฐ๋ ์กฐ๊ฑด๋ถ ๋ผ์ฐํ ์ด ํ์ํ ๊ฒฝ์ฐ๊ฐ ์๋ค.
process()๋ฅผ ์ฌ์ฉํ๋ฉด DSL ์ฒด์ธ ์์์ Processor API์ Processor๋ฅผ ๋ผ์ ๋ฃ์ ์ ์๋ค.
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("dedup-store"),
Serdes.String(),
Serdes.Long()
);
builder.addStateStore(storeBuilder);
orderStream.process(() -> new Processor<
String, OrderEvent, String, OrderEvent>() {
private KeyValueStore<String, Long> store;
private ProcessorContext<String, OrderEvent> context;
@Override
public void init(
ProcessorContext<String, OrderEvent> ctx) {
this.context = ctx;
this.store = ctx.getStateStore("dedup-store");
}
@Override
public void process(
Record<String, OrderEvent> record) {
String orderId = record.value().getOrderId();
if (store.get(orderId) != null) {
return;
}
store.put(orderId, record.timestamp());
context.forward(record);
}
}, "dedup-store");
์ด ์์๋ ์ฃผ๋ฌธ ID ๊ธฐ์ค์ผ๋ก ์ค๋ณต ๋ ์ฝ๋๋ฅผ ๊ฑธ๋ฌ๋ด๋ ํจํด์ด๋ค. State Store์ ์ด๋ฏธ ์ฒ๋ฆฌํ ์ฃผ๋ฌธ ID๋ฅผ ๊ธฐ๋กํด ๋๊ณ , ๊ฐ์ ID๊ฐ ๋ค์ ๋ค์ด์ค๋ฉด ๋ค์ด์คํธ๋ฆผ์ผ๋ก ์ ๋ฌํ์ง ์๋๋ค.
process()์ ๋ง์ง๋ง ์ธ์๋ก ์ ๊ทผํ State Store ์ด๋ฆ์ ์ ๋ฌํด์ผ ํ๋ค.
Serdes ์ค์ ๊ณผ ์ง๋ ฌํ
Kafka Streams์์ ๋ ์ฝ๋์ key์ value๋ฅผ ์ง๋ ฌํ/์ญ์ง๋ ฌํํ๋ ๊ฒ์ด Serdes๋ค.
Serdes๋ Serializer์ Deserializer๋ฅผ ๋ฌถ์ ๋ํผ ํด๋์ค์ด๋ฉฐ, Kafka Streams์ ๋ชจ๋ ์
์ถ๋ ฅ ์ง์ ์์ ์ฌ์ฉ๋๋ค.
๊ธฐ๋ณธ ์ ๊ณต๋๋ Serdes๋ Serdes.String(), Serdes.Long(), Serdes.Integer(), Serdes.ByteArray() ๋ฑ์ด ์๋ค.
JSON ๊ฐ์ ์ปค์คํ
ํฌ๋งท์ ์ง์ Serdes๋ฅผ ๋ง๋ค์ด์ผ ํ๋ค.
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
public class JsonSerde<T> implements Serde<T> {
private final ObjectMapper objectMapper = new ObjectMapper();
private final Class<T> targetType;
public JsonSerde(Class<T> targetType) {
this.targetType = targetType;
}
@Override
public Serializer<T> serializer() {
return (topic, data) -> {
try {
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("JSON ์ง๋ ฌํ ์คํจ. topic=" + topic, e);
}
};
}
@Override
public Deserializer<T> deserializer() {
return (topic, data) -> {
try {
return objectMapper.readValue(data, targetType);
} catch (Exception e) {
throw new SerializationException("JSON ์ญ์ง๋ ฌํ ์คํจ. topic=" + topic, e);
}
};
}
}
์ด Serdes๋ฅผ ์คํธ๋ฆผ ์ฐ์ฐ์์ ์ฌ์ฉํ ๋๋ Consumed, Produced, Materialized ๋ฑ์ ์ ๋ฌํ๋ค.
Serde<OrderEvent> orderEventSerde = new JsonSerde<>(OrderEvent.class);
KStream<String, OrderEvent> orders = builder.stream(
"order-events",
Consumed.with(Serdes.String(), orderEventSerde)
);
Serdes๋ฅผ ์ง์ ํ์ง ์์ผ๋ฉด StreamsConfig์ ์ค์ ํ DEFAULT_KEY_SERDE_CLASS_CONFIG์ DEFAULT_VALUE_SERDE_CLASS_CONFIG๊ฐ ์ฌ์ฉ๋๋ค.
๊ธฐ๋ณธ Serdes์ ์ค์ ๋ฐ์ดํฐ ํ์
์ด ๋ง์ง ์์ผ๋ฉด ๋ฐํ์์ ClassCastException์ด ๋ฐ์ํ๋ฏ๋ก, ํ์
์ด ๋ค๋ฅธ ์ฐ์ฐ์์๋ Serdes๋ฅผ ๋ช
์์ ์ผ๋ก ์ง์ ํ๋ ๊ฒ์ด ์์ ํ๋ค.
ํนํ aggregate๋ count ๊ฐ์ stateful ์ฐ์ฐ์์ ๊ฒฐ๊ณผ ํ์
์ด ๋ฐ๋๋ ๊ฒฝ์ฐ Materialized.with(...)๋ก Serdes๋ฅผ ๋ฐ๋์ ์ง์ ํด์ผ ํ๋ค.
์ด๋ฅผ ๋น ๋จ๋ฆฌ๋ฉด State Store์ ๊ฐ์ ์ฐ๊ฑฐ๋ ์ฝ์ ๋ ์ง๋ ฌํ ์ค๋ฅ๊ฐ ๋ฐ์ํ๋ค.
๋ค์ ๊ธ์์ ๋ค๋ฃฐ ๋ด์ฉ
์ด ๊ธ์์๋ KStream๊ณผ KTable์ ์ฐจ์ด, Stateless/Stateful ์ฐ์ฐ, ์กฐ์ธ, Serdes ์ค์ ์ ๋ค๋ค๋ค. ๋ค์ ๊ธ์์๋ ์๊ฐ ๊ธฐ๋ฐ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ๋ฌถ๋ ์๋์ฐ(Tumbling, Hopping, Session)์ KStream-KStream ์๊ฐ ์กฐ์ธ, Grace Period๋ฅผ ๋ค๋ฃฌ๋ค.