Kafka Streams ์‹œ๋ฆฌ์ฆˆ ๋ชฉ์ฐจ


์ด๋ฒคํŠธ์™€ ์ƒํƒœ, ๋‘ ๊ฐ€์ง€ ๊ด€์ 

์˜จ๋ผ์ธ ์‡ผํ•‘๋ชฐ์—์„œ ์‚ฌ์šฉ์ž์˜ ์ฃผ๋ฌธ ์ด๋ฒคํŠธ๋ฅผ ์ฒ˜๋ฆฌํ•œ๋‹ค๊ณ  ์ƒ๊ฐํ•ด ๋ณด์ž. โ€œ์‚ฌ์šฉ์ž A๊ฐ€ ์ƒํ’ˆ X๋ฅผ ์ฃผ๋ฌธํ–ˆ๋‹คโ€๋Š” ์ด๋ฒคํŠธ๋Š” ๋ฐœ์ƒํ•  ๋•Œ๋งˆ๋‹ค ํ•˜๋‚˜์˜ ๋…๋ฆฝ์ ์ธ ์‚ฌ์‹ค์ด๋‹ค. ๊ฐ™์€ ์‚ฌ์šฉ์ž๊ฐ€ ๋‘ ๋ฒˆ ์ฃผ๋ฌธํ•˜๋ฉด ๋‘ ๊ฐœ์˜ ์ด๋ฒคํŠธ๊ฐ€ ์Œ“์ธ๋‹ค. ๋ฐ˜๋ฉด โ€œ์‚ฌ์šฉ์ž A์˜ ํ˜„์žฌ ๋“ฑ๊ธ‰์€ ๊ณจ๋“œ๋‹คโ€๋ผ๋Š” ์ •๋ณด๋Š” ๋“ฑ๊ธ‰์ด ๋ฐ”๋€” ๋•Œ๋งˆ๋‹ค ์ด์ „ ๊ฐ’์„ ๋Œ€์ฒดํ•˜๋Š” ์ตœ์‹  ์ƒํƒœ๋‹ค.

Kafka Streams๋Š” ์ด ๋‘ ๊ฐ€์ง€ ๊ด€์ ์„ ๊ฐ๊ฐ KStream๊ณผ KTable์ด๋ผ๋Š” ์ถ”์ƒํ™”๋กœ ํ‘œํ˜„ํ•œ๋‹ค.


KStream: ๋์—†๋Š” ์ด๋ฒคํŠธ ํ๋ฆ„

KStream์€ ํ† ํ”ฝ์˜ ๊ฐ ๋ ˆ์ฝ”๋“œ๋ฅผ ๋…๋ฆฝ์ ์ธ ์ด๋ฒคํŠธ(INSERT)๋กœ ํ•ด์„ํ•œ๋‹ค. ๊ฐ™์€ key๋ฅผ ๊ฐ€์ง„ ๋ ˆ์ฝ”๋“œ๊ฐ€ ์—ฌ๋Ÿฌ ๋ฒˆ ๋“ค์–ด์™€๋„ ์ด์ „ ๊ฐ’์„ ๋ฎ์–ด์“ฐ์ง€ ์•Š๋Š”๋‹ค. ๋ชจ๋“  ๋ ˆ์ฝ”๋“œ๊ฐ€ ์˜๋ฏธ ์žˆ๋Š” ๋ฐ์ดํ„ฐ๋‹ค.

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> orderStream = builder.stream("order-events");

order-events ํ† ํ”ฝ์— ์•„๋ž˜ 4๊ฐœ์˜ ๋ ˆ์ฝ”๋“œ๊ฐ€ ๋“ค์–ด์˜จ๋‹ค๊ณ  ํ•˜์ž.

key=user-A, value={"item":"X", "amount":15000}
key=user-B, value={"item":"Y", "amount":8000}
key=user-A, value={"item":"Z", "amount":22000}
key=user-B, value={"item":"W", "amount":5000}

KStream์€ ์ด 4๊ฐœ๋ฅผ ๋ชจ๋‘ ๋…๋ฆฝ ์ด๋ฒคํŠธ๋กœ ์ฒ˜๋ฆฌํ•œ๋‹ค. key๊ฐ€ user-A์ธ ๋ ˆ์ฝ”๋“œ๊ฐ€ 2๊ฐœ์ง€๋งŒ ์„œ๋กœ ๋Œ€์ฒดํ•˜์ง€ ์•Š๋Š”๋‹ค.


KTable: key ๊ธฐ์ค€ ์ตœ์‹  ์ƒํƒœ

KTable์€ ๊ฐ™์€ key์˜ ๋ ˆ์ฝ”๋“œ๊ฐ€ ๋“ค์–ด์˜ค๋ฉด ์ด์ „ ๊ฐ’์„ UPDATEํ•œ๋‹ค. ํ•ญ์ƒ key๋‹น ์ตœ์‹  ๊ฐ’ ํ•˜๋‚˜๋งŒ ์œ ์ง€ํ•˜๋Š”, ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ํ…Œ์ด๋ธ”๊ณผ ๋น„์Šทํ•œ ๊ตฌ์กฐ๋‹ค.

KTable<String, String> userGradeTable = builder.table("user-grades");

user-grades ํ† ํ”ฝ์— ์•„๋ž˜ ๋ ˆ์ฝ”๋“œ๊ฐ€ ์ˆœ์„œ๋Œ€๋กœ ๋“ค์–ด์˜ค๋ฉด:

key=user-A, value="SILVER"
key=user-B, value="GOLD"
key=user-A, value="GOLD"

KTable์˜ ์ตœ์ข… ์ƒํƒœ๋Š” user-A=GOLD, user-B=GOLD์ด๋‹ค. user-A์˜ ์ฒซ ๋ฒˆ์งธ ๊ฐ’ SILVER๋Š” GOLD๋กœ ๋Œ€์ฒด๋๋‹ค.

๊ทธ๋ ‡๋‹ค๋ฉด ๊ฐ™์€ ํ† ํ”ฝ์„ KStream์œผ๋กœ ์ฝ์œผ๋ฉด ์–ด๋–ป๊ฒŒ ๋ ๊นŒ? KStream์€ 3๊ฐœ์˜ ๋ ˆ์ฝ”๋“œ๋ฅผ ๋ชจ๋‘ ๋…๋ฆฝ์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•œ๋‹ค. โ€œSILVER๊ฐ€ GOLD๋กœ ๋ฐ”๋€Œ์—ˆ๋‹คโ€๋Š” ํ•ด์„ ์—†์ด 3๊ฑด์˜ ์ด๋ฒคํŠธ๊ฐ€ ๊ฐ๊ฐ ํ˜๋Ÿฌ๊ฐ„๋‹ค.

๊ฐ™์€ ๋ฐ์ดํ„ฐ๋ฅผ KStream์œผ๋กœ ๋ณผ์ง€ KTable๋กœ ๋ณผ์ง€๋Š” ๋น„์ฆˆ๋‹ˆ์Šค ์˜๋ฏธ์— ๋”ฐ๋ผ ๊ฒฐ์ •ํ•œ๋‹ค.

๊ตฌ๋ถ„ KStream KTable
๋ ˆ์ฝ”๋“œ ํ•ด์„ INSERT (๋…๋ฆฝ ์ด๋ฒคํŠธ) UPDATE (์ตœ์‹  ์ƒํƒœ)
๊ฐ™์€ key ๋ชจ๋‘ ์œ ์ง€ ๋งˆ์ง€๋ง‰ ๊ฐ’๋งŒ ์œ ์ง€
State Store ๊ธฐ๋ณธ์ ์œผ๋กœ ์‚ฌ์šฉํ•˜์ง€ ์•Š์Œ (stateful ์—ฐ์‚ฐ ์‹œ ์‚ฌ์šฉ) ์‚ฌ์šฉ
์ ํ•ฉํ•œ ๋ฐ์ดํ„ฐ ์ฃผ๋ฌธ, ํด๋ฆญ, ๋กœ๊ทธ ์‚ฌ์šฉ์ž ํ”„๋กœํ•„, ์žฌ๊ณ , ์„ค์ •๊ฐ’


KStream๊ณผ KTable์€ ์–ธ์ œ ์“ฐ๋Š”๊ฐ€

KStream์€ ๋ฐœ์ƒํ•˜๋Š” ์ด๋ฒคํŠธ๋ฅผ ๋น ์ง์—†์ด ์ฒ˜๋ฆฌํ•ด์•ผ ํ•  ๋•Œ ์ ํ•ฉํ•˜๋‹ค. ์ฃผ๋ฌธ, ํด๋ฆญ ๋กœ๊ทธ, ์„ผ์„œ ๋ฐ์ดํ„ฐ์ฒ˜๋Ÿผ ๊ฐœ๋ณ„ ๊ฑด ์ž์ฒด๊ฐ€ ์˜๋ฏธ ์žˆ๋Š” ๋ฐ์ดํ„ฐ๋Š” KStream์œผ๋กœ ์ฝ๋Š” ๊ฒƒ์ด ์ž์—ฐ์Šค๋Ÿฝ๋‹ค. ํ•œ ๊ฑด ํ•œ ๊ฑด์ด ๋…๋ฆฝ์ ์ด๋ฏ€๋กœ ํ•„ํ„ฐ๋ง, ๋ณ€ํ™˜, ๋ถ„๊ธฐ ๊ฐ™์€ Stateless ์—ฐ์‚ฐ์„ ๊ฑฐ์ณ ๋ฐ”๋กœ ๋‹ค์Œ ์ฒ˜๋ฆฌ๋กœ ๋„˜๊ธธ ์ˆ˜ ์žˆ๋‹ค.

KTable์€ ์ตœ์‹  ์ƒํƒœ๋ฅผ ๊ด€๋ฆฌํ•ด์•ผ ํ•  ๋•Œ ์ ํ•ฉํ•˜๋‹ค. ์‚ฌ์šฉ์ž ํ”„๋กœํ•„, ์ƒํ’ˆ ์žฌ๊ณ , ํ™˜์œจ์ฒ˜๋Ÿผ ๊ฐ’์ด ๊ฐฑ์‹ ๋˜๋ฉด ์ด์ „ ๊ฐ’์€ ์˜๋ฏธ๊ฐ€ ์—†์–ด์ง€๋Š” ๋ฐ์ดํ„ฐ๊ฐ€ ๋Œ€ํ‘œ์ ์ด๋‹ค. State Store์— key ๋‹จ์œ„๋กœ ์ตœ์‹  ๊ฐ’์„ ์œ ์ง€ํ•˜๋ฏ€๋กœ, ์‹ค์‹œ๊ฐ„ ์กฐํšŒ๊ฐ€ ํ•„์š”ํ•œ ์ฐธ์กฐ ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค๋ฃจ๊ธฐ์— ์ข‹๋‹ค.

๋‘˜์˜ ์กฐํ•ฉ์€ ์ด๋ฒคํŠธ์— ์ตœ์‹  ์ƒํƒœ๋ฅผ ๊ฒฐํ•ฉํ•ด์•ผ ํ•  ๋•Œ ์ง„๊ฐ€๋ฅผ ๋ฐœํœ˜ํ•œ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด, ์ฃผ๋ฌธ ์ด๋ฒคํŠธ(KStream)๊ฐ€ ๋“ค์–ด์˜ฌ ๋•Œ๋งˆ๋‹ค ํ•ด๋‹น ์‚ฌ์šฉ์ž์˜ ํ˜„์žฌ ๋“ฑ๊ธ‰(KTable)์„ ๋ถ™์—ฌ์„œ ๋“ฑ๊ธ‰๋ณ„ ํ• ์ธ์„ ์ ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค. KStream ๋‹จ๋…์ด๋ผ๋ฉด ๋“ฑ๊ธ‰ ์ •๋ณด๋ฅผ ๋งค๋ฒˆ ์™ธ๋ถ€ ์‹œ์Šคํ…œ์— ์กฐํšŒํ•ด์•ผ ํ•˜๊ณ , KTable ๋‹จ๋…์ด๋ผ๋ฉด ์ด๋ฒคํŠธ์˜ ํ๋ฆ„ ์ž์ฒด๋ฅผ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์—†๋‹ค. KStream-KTable ์กฐ์ธ์€ ์ด๋ฒคํŠธ ํ๋ฆ„๊ณผ ์ตœ์‹  ์ƒํƒœ๋ฅผ Kafka ์•ˆ์—์„œ ๊ฒฐํ•ฉํ•˜๋ฏ€๋กœ, ์™ธ๋ถ€ ํ˜ธ์ถœ ์—†์ด๋„ ํ’๋ถ€ํ•œ ์‹ค์‹œ๊ฐ„ ์ฒ˜๋ฆฌ๊ฐ€ ๊ฐ€๋Šฅํ•ด์ง„๋‹ค.


GlobalKTable: ์ „์ฒด ํŒŒํ‹ฐ์…˜ ๋ฐ์ดํ„ฐ๋ฅผ ๋กœ์ปฌ์—

KTable์€ ํŒŒํ‹ฐ์…˜ ๋‹จ์œ„๋กœ ๋ถ„ํ• ๋œ๋‹ค. ์ฆ‰, ์ธ์Šคํ„ด์Šค A๋Š” ํŒŒํ‹ฐ์…˜ 0~1์˜ ๋ฐ์ดํ„ฐ๋งŒ ๊ฐ–๊ณ , ์ธ์Šคํ„ด์Šค B๋Š” ํŒŒํ‹ฐ์…˜ 2~3์˜ ๋ฐ์ดํ„ฐ๋งŒ ๊ฐ–๋Š”๋‹ค. ์ด ํŠน์„ฑ ๋•Œ๋ฌธ์— KStream๊ณผ KTable์„ ์กฐ์ธํ•˜๋ ค๋ฉด ๋‘ ํ† ํ”ฝ์˜ ํŒŒํ‹ฐ์…˜ ์ˆ˜๊ฐ€ ๊ฐ™๊ณ , ๊ฐ™์€ key๊ฐ€ ๊ฐ™์€ ํŒŒํ‹ฐ์…˜์— ์žˆ์–ด์•ผ ํ•œ๋‹ค. ์ด๋ฅผ co-partitioning์ด๋ผ ํ•˜๋ฉฐ, ์กฐ์ธ ๋Œ€์ƒ ํ† ํ”ฝ๋“ค์ด ๊ฐ™์€ ํŒŒํ‹ฐ์…˜ ์ˆ˜์™€ ๊ฐ™์€ ํŒŒํ‹ฐ์…”๋‹ ์ „๋žต์„ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์„ ๋œปํ•œ๋‹ค.

GlobalKTable์€ ์ด ์ œ์•ฝ์„ ์—†์•ค๋‹ค. ๋ชจ๋“  ์ธ์Šคํ„ด์Šค๊ฐ€ ํ† ํ”ฝ์˜ ์ „์ฒด ํŒŒํ‹ฐ์…˜ ๋ฐ์ดํ„ฐ๋ฅผ ๋กœ์ปฌ์— ๋ณต์ œํ•œ๋‹ค.

GlobalKTable<String, String> productTable = builder.globalTable("products");

GlobalKTable์€ co-partitioning ์—†์ด ์กฐ์ธ์ด ๊ฐ€๋Šฅํ•˜๊ณ , ์–ด๋–ค key๋กœ๋“  ๋กœ์ปฌ์—์„œ ์กฐํšŒํ•  ์ˆ˜ ์žˆ๋‹ค. ๋‹ค๋งŒ ํ† ํ”ฝ์˜ ์ „์ฒด ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ ์ธ์Šคํ„ด์Šค๊ฐ€ ๋ชจ๋‘ ๊ฐ–๊ณ  ์žˆ์œผ๋ฏ€๋กœ, ๋ฐ์ดํ„ฐ๊ฐ€ ๋งŽ์œผ๋ฉด ๋ฉ”๋ชจ๋ฆฌ์™€ ๋””์Šคํฌ ์‚ฌ์šฉ๋Ÿ‰์ด ์ปค์ง„๋‹ค. ์ƒํ’ˆ ๋งˆ์Šคํ„ฐ, ๊ตญ๊ฐ€ ์ฝ”๋“œ, ํ™˜์œจ ๊ฐ™์€ ์กฐํšŒ์šฉ ์ฐธ์กฐ ๋ฐ์ดํ„ฐ์— ์ ํ•ฉํ•˜๋‹ค.


Stateless ์—ฐ์‚ฐ

Stateless ์—ฐ์‚ฐ์€ ํ˜„์žฌ ๋ ˆ์ฝ”๋“œ๋งŒ ๋ณด๊ณ  ๊ฒฐ๊ณผ๋ฅผ ๋‚ด๋Š” ์—ฐ์‚ฐ์ด๋‹ค. ์ด์ „ ๋ ˆ์ฝ”๋“œ์˜ ์ƒํƒœ๋ฅผ ๊ธฐ์–ตํ•  ํ•„์š”๊ฐ€ ์—†์œผ๋ฏ€๋กœ State Store๋ฅผ ์‚ฌ์šฉํ•˜์ง€ ์•Š๋Š”๋‹ค. ์•„๋ž˜ ์—ฐ์‚ฐ๋“ค์€ ์ฃผ๋กœ KStream์—์„œ ์‚ฌ์šฉํ•˜์ง€๋งŒ, filter์™€ mapValues๋Š” KTable์—์„œ๋„ ๋™์ผํ•˜๊ฒŒ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

filter

์กฐ๊ฑด์— ๋งž๋Š” ๋ ˆ์ฝ”๋“œ๋งŒ ํ†ต๊ณผ์‹œํ‚จ๋‹ค.

KStream<String, OrderEvent> highValueOrders = orderStream
    .filter((key, event) -> event.getAmount() >= 10000);

map / mapValues

๋ ˆ์ฝ”๋“œ์˜ key์™€ value๋ฅผ ๋ณ€ํ™˜ํ•œ๋‹ค. mapValues๋Š” key๋ฅผ ๊ทธ๋Œ€๋กœ ๋‘๊ณ  value๋งŒ ๋ณ€ํ™˜ํ•˜๋ฏ€๋กœ, ์žฌํŒŒํ‹ฐ์…”๋‹์ด ๋ฐœ์ƒํ•˜์ง€ ์•Š๋Š”๋‹ค.

// key๋Š” ์œ ์ง€, value๋งŒ ๋ณ€ํ™˜
KStream<String, String> summaries = orderStream
    .mapValues(event -> event.getItem() + ":" + event.getAmount());

// key์™€ value ๋ชจ๋‘ ๋ณ€ํ™˜ (์žฌํŒŒํ‹ฐ์…”๋‹ ๋ฐœ์ƒ ๊ฐ€๋Šฅ)
KStream<String, Integer> amountByItem = orderStream
    .map((key, event) -> KeyValue.pair(event.getItem(), event.getAmount()));

map์œผ๋กœ key๋ฅผ ๋ณ€๊ฒฝํ•œ ๋’ค groupByKey๋‚˜ join ๊ฐ™์€ key ๊ธฐ๋ฐ˜ ์—ฐ์‚ฐ์ด ๋’ค๋”ฐ๋ฅด๋ฉด, Kafka Streams๊ฐ€ ๋‚ด๋ถ€์ ์œผ๋กœ repartition ํ† ํ”ฝ์„ ์ƒ์„ฑํ•ด์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์žฌ๋ถ„๋ฐฐํ•œ๋‹ค. repartition ํ† ํ”ฝ์€ ์ƒˆ key ๊ธฐ์ค€์œผ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์˜ฌ๋ฐ”๋ฅธ ํŒŒํ‹ฐ์…˜์— ๋‹ค์‹œ ๋ฐฐ์น˜ํ•˜๋Š” ์ค‘๊ฐ„ ํ† ํ”ฝ์ด๋‹ค. key ๋ณ€๊ฒฝ์ด ๋ถˆํ•„์š”ํ•˜๋‹ค๋ฉด mapValues๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ์„ฑ๋Šฅ์ƒ ์œ ๋ฆฌํ•˜๋‹ค.

repartition์ด ๋ฐœ์ƒํ•˜๋ฉด ๋ชจ๋“  ๋ ˆ์ฝ”๋“œ๊ฐ€ ์ƒˆ key ๊ธฐ์ค€์œผ๋กœ ์ง๋ ฌํ™”๋˜์–ด repartition ํ† ํ”ฝ์— ์“ฐ์ธ ๋’ค, ๋‹ค์‹œ ์ฝํ˜€์„œ ๋‹ค์Œ ํ”„๋กœ์„ธ์„œ๋กœ ์ „๋‹ฌ๋œ๋‹ค. ์ด ๊ณผ์ •์—์„œ ๋„คํŠธ์›Œํฌ I/O์™€ ๋””์Šคํฌ I/O๊ฐ€ ์ถ”๊ฐ€๋กœ ๋ฐœ์ƒํ•˜๊ณ , repartition ํ† ํ”ฝ์€ ์ž…๋ ฅ ํ† ํ”ฝ๊ณผ ๊ฐ™์€ ํŒŒํ‹ฐ์…˜ ์ˆ˜๋กœ ์ƒ์„ฑ๋˜๋ฏ€๋กœ ๋ธŒ๋กœ์ปค์˜ ๋””์Šคํฌ ์‚ฌ์šฉ๋Ÿ‰๋„ ๋Š˜์–ด๋‚œ๋‹ค. map, selectKey, groupBy๋กœ key๋ฅผ ๋ณ€๊ฒฝํ•œ ๋’ค key ๊ธฐ๋ฐ˜ ์—ฐ์‚ฐ(groupByKey, join ๋“ฑ)์ด ๋’ค๋”ฐ๋ฅด๋ฉด repartition์ด ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ๋‹ค. key ๋ณ€๊ฒฝ์ด ํ•„์š” ์—†๋‹ค๋ฉด mapValues์™€ groupByKey๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ์•ˆ์ „ํ•˜๋‹ค.

// repartition ๋ฐœ์ƒ: selectKey๋กœ key๋ฅผ ๋ณ€๊ฒฝํ•œ ๋’ค groupByKey
orderStream
    .selectKey((key, event) -> event.getItem())
    .groupByKey()
    .count();

// repartition ์—†์Œ: groupBy ๋Œ€์‹  ๊ธฐ์กด key๋ฅผ ์œ ์ง€
orderStream
    .groupByKey()
    .count();

flatMapValues

ํ•˜๋‚˜์˜ ๋ ˆ์ฝ”๋“œ๋ฅผ ์—ฌ๋Ÿฌ ๊ฐœ๋กœ ํŽผ์นœ๋‹ค. 1ํŽธ์—์„œ ๋ณธ WordCount์˜ ๋‹จ์–ด ๋ถ„๋ฆฌ๊ฐ€ ๋Œ€ํ‘œ์ ์ธ ์˜ˆ๋‹ค.

KStream<String, String> words = textLines
    .flatMapValues(line -> Arrays.asList(line.split("\\W+")));

branch

์กฐ๊ฑด์— ๋”ฐ๋ผ ์ŠคํŠธ๋ฆผ์„ ์—ฌ๋Ÿฌ ๊ฐˆ๋ž˜๋กœ ๋ถ„๊ธฐํ•œ๋‹ค.

@SuppressWarnings("unchecked")
KStream<String, OrderEvent>[] branches = orderStream.branch(
    (key, event) -> event.getAmount() >= 50000,  // [0] ๊ณ ๊ฐ€ ์ฃผ๋ฌธ
    (key, event) -> event.getAmount() >= 10000,   // [1] ์ค‘๊ฐ€ ์ฃผ๋ฌธ
    (key, event) -> true                           // [2] ๊ทธ ์™ธ
);

KStream<String, OrderEvent> premiumOrders = branches[0];
KStream<String, OrderEvent> standardOrders = branches[1];
KStream<String, OrderEvent> budgetOrders = branches[2];

branch์˜ ์กฐ๊ฑด์€ ์ˆœ์„œ๋Œ€๋กœ ํ‰๊ฐ€๋˜๋ฉฐ, ์ฒซ ๋ฒˆ์งธ๋กœ ์ผ์น˜ํ•˜๋Š” ๋ถ„๊ธฐ๋กœ ๋ผ์šฐํŒ…๋œ๋‹ค. ๋งˆ์ง€๋ง‰ ์กฐ๊ฑด์„ (key, event) -> true๋กœ ๋‘๋ฉด ์•ž์˜ ์กฐ๊ฑด์— ๊ฑธ๋ฆฌ์ง€ ์•Š์€ ๋ชจ๋“  ๋ ˆ์ฝ”๋“œ๋ฅผ ์žก์•„๋‚ผ ์ˆ˜ ์žˆ๋‹ค. branch๋Š” ๋ฐฐ์—ด ๋ฐ˜ํ™˜ ๋ฐฉ์‹์ด๋ผ ํƒ€์ž… ์•ˆ์ „์„ฑ์ด ๋–จ์–ด์ง€๋Š”๋ฐ, Kafka 2.8 ์ดํ›„์—๋Š” split().branch().defaultBranch()๋กœ ๋Œ€์ฒดํ•  ์ˆ˜ ์žˆ๋‹ค.


Stateful ์—ฐ์‚ฐ

Stateful ์—ฐ์‚ฐ์€ ์—ฌ๋Ÿฌ ๋ ˆ์ฝ”๋“œ์— ๊ฑธ์ณ ์ƒํƒœ๋ฅผ ์œ ์ง€ํ•˜๋ฉฐ ๊ฒฐ๊ณผ๋ฅผ ๋ˆ„์ ํ•˜๋Š” ์—ฐ์‚ฐ์ด๋‹ค. State Store(๊ธฐ๋ณธ RocksDB)์— ์ค‘๊ฐ„ ์ƒํƒœ๋ฅผ ์ €์žฅํ•˜๊ณ , changelog ํ† ํ”ฝ์— ๋ณ€๊ฒฝ ์‚ฌํ•ญ์„ ๊ธฐ๋กํ•ด ์žฅ์•  ๋ณต๊ตฌ๋ฅผ ๋ณด์žฅํ•œ๋‹ค.

KStream์—์„œ groupByKey๋‚˜ groupBy๋กœ ๋ ˆ์ฝ”๋“œ๋ฅผ ๋ฌถ์œผ๋ฉด KGroupedStream์ด ๋˜๊ณ , ์—ฌ๊ธฐ์— count, reduce, aggregate ๊ฐ™์€ ์ง‘๊ณ„๋ฅผ ์ ์šฉํ•˜๋ฉด ๊ฒฐ๊ณผ๊ฐ€ KTable๋กœ ๋ฐ˜ํ™˜๋œ๋‹ค.

groupByKey vs groupBy

// key๋ฅผ ๋ณ€๊ฒฝํ•˜์ง€ ์•Š์„ ๋•Œ (์žฌํŒŒํ‹ฐ์…”๋‹ ์—†์Œ)
KGroupedStream<String, OrderEvent> groupedByUser = orderStream.groupByKey();

// key๋ฅผ ๋ณ€๊ฒฝํ•  ๋•Œ (์žฌํŒŒํ‹ฐ์…”๋‹ ๋ฐœ์ƒ)
KGroupedStream<String, OrderEvent> groupedByItem = orderStream
    .groupBy((key, event) -> event.getItem());

groupByKey๋Š” ํ˜„์žฌ key๋ฅผ ๊ทธ๋Œ€๋กœ ์‚ฌ์šฉํ•˜๋ฏ€๋กœ, ์›๋ž˜ key๊ฐ€ ์œ ์ง€๋œ ๊ฒฝ์šฐ์—๋Š” ์žฌํŒŒํ‹ฐ์…”๋‹์ด ๋ฐœ์ƒํ•˜์ง€ ์•Š๋Š”๋‹ค. ๋‹ค๋งŒ ์—…์ŠคํŠธ๋ฆผ์—์„œ selectKey๋‚˜ map์œผ๋กœ key๋ฅผ ์ด๋ฏธ ๋ณ€๊ฒฝํ•œ ์ƒํƒœ๋ผ๋ฉด groupByKey()๋„ ๋‚ด๋ถ€์ ์œผ๋กœ repartition์ด ํ•„์š”ํ•˜๋‹ค. groupBy๋Š” ์ƒˆ key๋ฅผ ์ง€์ •ํ•˜๋ฏ€๋กœ ๋‚ด๋ถ€์ ์œผ๋กœ repartition ํ† ํ”ฝ์ด ์ƒ๊ธด๋‹ค. ๊ฐ€๋Šฅํ•˜๋ฉด groupByKey๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ๋„คํŠธ์›Œํฌ ๋น„์šฉ๊ณผ ์ฒ˜๋ฆฌ ์ง€์—ฐ์„ ์ค„์ผ ์ˆ˜ ์žˆ๋‹ค.

count

key๋ณ„ ๋ ˆ์ฝ”๋“œ ์ˆ˜๋ฅผ ์„ผ๋‹ค. ๊ฒฐ๊ณผ๋Š” KTable๋กœ ๋ฐ˜ํ™˜๋œ๋‹ค.

KTable<String, Long> orderCountByUser = orderStream
    .groupByKey()
    .count();

reduce

๊ฐ™์€ ํƒ€์ž…์˜ ๋‘ ๊ฐ’์„ ํ•˜๋‚˜๋กœ ํ•ฉ์นœ๋‹ค. ์ง‘๊ณ„ ๊ฒฐ๊ณผ์˜ ํƒ€์ž…์ด ์ž…๋ ฅ ๋ ˆ์ฝ”๋“œ์˜ value ํƒ€์ž…๊ณผ ๊ฐ™์•„์•ผ ํ•œ๋‹ค.

// ์‚ฌ์šฉ์ž๋ณ„ ์ตœ๋Œ€ ์ฃผ๋ฌธ ๊ธˆ์•ก
KTable<String, OrderEvent> maxOrderByUser = orderStream
    .groupByKey()
    .reduce((aggValue, newValue) ->
        aggValue.getAmount() >= newValue.getAmount() ? aggValue : newValue
    );

aggregate

reduce๋ณด๋‹ค ์œ ์—ฐํ•œ ์ง‘๊ณ„ ์—ฐ์‚ฐ์ด๋‹ค. ์ดˆ๊ธฐ๊ฐ’(initializer)๊ณผ ์ง‘๊ณ„ ํ•จ์ˆ˜(aggregator)๋ฅผ ๋ฐ›๊ณ , ๊ฒฐ๊ณผ ํƒ€์ž…์ด ์ž…๋ ฅ๊ณผ ๋‹ฌ๋ผ๋„ ๋œ๋‹ค.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Materialized;

KTable<String, Long> totalAmountByUser = orderStream
    .groupByKey()
    .aggregate(
        () -> 0L,  // ์ดˆ๊ธฐ๊ฐ’
        (key, event, currentTotal) -> currentTotal + event.getAmount(),
        Materialized.with(Serdes.String(), Serdes.Long())
    );

Materialized.with(...)๋กœ State Store์˜ key/value Serdes๋ฅผ ์ง€์ •ํ•œ๋‹ค. ๊ฒฐ๊ณผ ํƒ€์ž…์ด ์ž…๋ ฅ๊ณผ ๋‹ค๋ฅด๊ธฐ ๋•Œ๋ฌธ์—(OrderEvent โ†’ Long) Serdes๋ฅผ ๋ช…์‹œํ•ด์•ผ ํ•œ๋‹ค.


KStream-KTable ์กฐ์ธ

์ฃผ๋ฌธ ์ด๋ฒคํŠธ(KStream)์— ์‚ฌ์šฉ์ž ๋“ฑ๊ธ‰ ์ •๋ณด(KTable)๋ฅผ ๋ถ™์ด๋Š” ๊ฒƒ์€ ์‹ค๋ฌด์—์„œ ๋งค์šฐ ํ”ํ•œ ํŒจํ„ด์ด๋‹ค. KStream-KTable ์กฐ์ธ์€ ์ŠคํŠธ๋ฆผ์˜ ๊ฐ ๋ ˆ์ฝ”๋“œ์— ๋Œ€ํ•ด ๊ฐ™์€ key๋ฅผ ๊ฐ€์ง„ KTable์˜ ํ˜„์žฌ ๊ฐ’์„ ์ฐพ์•„ ๊ฒฐํ•ฉํ•œ๋‹ค.

StreamsBuilder builder = new StreamsBuilder();

KStream<String, OrderEvent> orders = builder.stream(
    "order-events",
    Consumed.with(Serdes.String(), orderEventSerde)
);

KTable<String, String> userGrades = builder.table(
    "user-grades",
    Consumed.with(Serdes.String(), Serdes.String())
);

// ์ฃผ๋ฌธ ์ด๋ฒคํŠธ์— ์‚ฌ์šฉ์ž ๋“ฑ๊ธ‰์„ ๋ถ™์ธ๋‹ค
KStream<String, EnrichedOrder> enrichedOrders = orders.join(
    userGrades,
    (order, grade) -> new EnrichedOrder(
        order.getOrderId(),
        order.getItem(),
        order.getAmount(),
        grade  // KTable์—์„œ ๊ฐ€์ ธ์˜จ ํ˜„์žฌ ๋“ฑ๊ธ‰
    )
);

enrichedOrders.to("enriched-orders",
    Produced.with(Serdes.String(), enrichedOrderSerde));

์ด ์กฐ์ธ์ด ๋™์ž‘ํ•˜๋ ค๋ฉด order-events์™€ user-grades ํ† ํ”ฝ์˜ key๊ฐ€ ๋™์ผํ•œ ๊ธฐ์ค€(์˜ˆ: userId)์ด๊ณ , ํŒŒํ‹ฐ์…˜ ์ˆ˜๊ฐ€ ๊ฐ™์•„์•ผ ํ•œ๋‹ค(co-partitioning).

KStream-KTable ์กฐ์ธ์€ inner join์ด ๊ธฐ๋ณธ์ด๋‹ค. ์ฆ‰, KTable์— ํ•ด๋‹น key๊ฐ€ ์—†์œผ๋ฉด ๊ทธ ์ŠคํŠธ๋ฆผ ๋ ˆ์ฝ”๋“œ๋Š” ๊ฒฐ๊ณผ์— ํฌํ•จ๋˜์ง€ ์•Š๋Š”๋‹ค. ๋ชจ๋“  ์ฃผ๋ฌธ ์ด๋ฒคํŠธ๋ฅผ ๊ฒฐ๊ณผ์— ํฌํ•จํ•˜๊ณ  ์‹ถ๋‹ค๋ฉด leftJoin์„ ์‚ฌ์šฉํ•œ๋‹ค.

KStream<String, EnrichedOrder> enrichedOrders = orders.leftJoin(
    userGrades,
    (order, grade) -> new EnrichedOrder(
        order.getOrderId(),
        order.getItem(),
        order.getAmount(),
        grade != null ? grade : "UNKNOWN"
    )
);

leftJoin์—์„œ KTable์— ๋งค์นญ๋˜๋Š” ๊ฐ’์ด ์—†์œผ๋ฉด grade๊ฐ€ null๋กœ ์ „๋‹ฌ๋œ๋‹ค.

๋งŒ์•ฝ co-partitioning ์ œ์•ฝ์ด ๋ฌธ์ œ๊ฐ€ ๋œ๋‹ค๋ฉด GlobalKTable์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค. GlobalKTable ์กฐ์ธ์—์„œ๋Š” KeyValueMapper๋กœ ์กฐ์ธ key๋ฅผ ์ง์ ‘ ์ง€์ •ํ•˜๋ฏ€๋กœ ํŒŒํ‹ฐ์…˜ ์ˆ˜๊ฐ€ ๋‹ฌ๋ผ๋„ ๋™์ž‘ํ•œ๋‹ค.

GlobalKTable<String, String> productTable = builder.globalTable("products");

KStream<String, EnrichedOrder> withProduct = orders.join(
    productTable,
    (orderKey, orderValue) -> orderValue.getItem(),  // products ํ† ํ”ฝ์˜ key์™€ ๋งค์นญํ•  ๊ฐ’
    (order, productInfo) -> new EnrichedOrder(order, productInfo)
);


KTable์˜ ์ค‘๊ฐ„ ๊ฐฑ์‹ ๊ณผ suppress

KTable์€ key์˜ ๊ฐ’์ด ๊ฐฑ์‹ ๋˜๋ฉด ๋‹ค์šด์ŠคํŠธ๋ฆผ(downstream) ํ”„๋กœ์„ธ์„œ์— ๋ณ€๊ฒฝ ๋ ˆ์ฝ”๋“œ๋ฅผ ๋‚ด๋ณด๋‚ธ๋‹ค. ์‹ค์ œ๋กœ๋Š” Kafka Streams์˜ Record Cache(statestore.cache.max.bytes, ๊ธฐ๋ณธ 10MB)๊ฐ€ ๊ฐ™์€ key์— ๋Œ€ํ•œ ์—ฐ์† ๊ฐฑ์‹ ์„ ๋ณ‘ํ•ฉํ•ด์„œ ์บ์‹œ๊ฐ€ ํ”Œ๋Ÿฌ์‹œ๋  ๋•Œ ๋งˆ์ง€๋ง‰ ๊ฐ’๋งŒ ์ „๋‹ฌํ•˜๋ฏ€๋กœ, ๋ชจ๋“  ์ค‘๊ฐ„ ๊ฐ’์ด ๊ทธ๋Œ€๋กœ ๋‚˜๊ฐ€์ง€๋Š” ์•Š๋Š”๋‹ค. ๊ทธ๋Ÿผ์—๋„ ์บ์‹œ ํ”Œ๋Ÿฌ์‹œ ์ฃผ๊ธฐ์— ๋”ฐ๋ผ ์ƒ๋‹น์ˆ˜์˜ ์ค‘๊ฐ„ ๊ฐฑ์‹ ์ด ๋‹ค์šด์ŠคํŠธ๋ฆผ์œผ๋กœ ์ „๋‹ฌ๋  ์ˆ˜ ์žˆ๋‹ค.

์˜ˆ๋ฅผ ๋“ค์–ด count()๋กœ ๋‹จ์–ด ์นด์šดํŠธ๋ฅผ ๊ตฌํ•˜๋ฉด, ๊ฐ™์€ ๋‹จ์–ด๊ฐ€ ๋“ค์–ด์˜ฌ ๋•Œ๋งˆ๋‹ค ์นด์šดํŠธ๊ฐ€ 1, 2, 3, โ€ฆ ์œผ๋กœ ๊ฐฑ์‹ ๋˜๋ฉฐ ์บ์‹œ ํ”Œ๋Ÿฌ์‹œ ์‹œ์ ๋งˆ๋‹ค ์ค‘๊ฐ„ ๊ฒฐ๊ณผ๊ฐ€ ์ถœ๋ ฅ๋œ๋‹ค.

์œˆ๋„์šฐ ์ง‘๊ณ„์—์„œ๋Š” ์ด ์ค‘๊ฐ„ ๊ฐฑ์‹ ์ด ํŠนํžˆ ๋ฌธ์ œ๊ฐ€ ๋œ๋‹ค. 5๋ถ„ ์œˆ๋„์šฐ ์•ˆ์—์„œ ์ˆ˜์ฒœ ๊ฑด์˜ ๋ ˆ์ฝ”๋“œ๊ฐ€ ๋“ค์–ด์˜ค๋ฉด ์บ์‹œ๊ฐ€ ์ค„์—ฌ์ฃผ๋”๋ผ๋„ ์ƒ๋‹น์ˆ˜์˜ ์ค‘๊ฐ„ ๊ฒฐ๊ณผ๊ฐ€ ์ถœ๋ ฅ ํ† ํ”ฝ์— ์“ฐ์ธ๋‹ค. ์ตœ์ข… ๊ฒฐ๊ณผ๋งŒ ํ•„์š”ํ•˜๋‹ค๋ฉด suppress๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import java.time.Duration;

KTable<Windowed<String>, Long> finalCounts = orders
    .groupByKey()
    .windowedBy(
        TimeWindows.ofSizeAndGrace(
            Duration.ofMinutes(5), Duration.ofMinutes(1)))
    .count()
    .suppress(
        Suppressed.untilWindowCloses(
            Suppressed.BufferConfig.unbounded()));

suppress(untilWindowCloses(unbounded()))๋Š” ์œˆ๋„์šฐ๊ฐ€ ๋‹ซํž ๋•Œ๊นŒ์ง€ ์ค‘๊ฐ„ ๊ฒฐ๊ณผ๋ฅผ ๋ฒ„ํผ์— ๋ณด๊ด€ํ•˜๋‹ค๊ฐ€, ์œˆ๋„์šฐ๊ฐ€ ๋‹ซํžˆ๋Š” ์‹œ์ ์— ์ตœ์ข… ๊ฒฐ๊ณผ ํ•˜๋‚˜๋งŒ ๋‚ด๋ณด๋‚ธ๋‹ค.

unbounded()๋Š” ๋ฒ„ํผ ํฌ๊ธฐ์— ์ œํ•œ์„ ๋‘์ง€ ์•Š๋Š”๋‹ค๋Š” ์˜๋ฏธ๋‹ค. key ์นด๋””๋„๋ฆฌํ‹ฐ๊ฐ€ ๋†’๊ฑฐ๋‚˜ ์œˆ๋„์šฐ๊ฐ€ ๊ธธ๋ฉด ๋ฒ„ํผ ๋ฉ”๋ชจ๋ฆฌ ์‚ฌ์šฉ๋Ÿ‰์ด ์ปค์งˆ ์ˆ˜ ์žˆ์œผ๋ฏ€๋กœ, ์ƒํ•œ์ด ํ•„์š”ํ•˜๋‹ค๋ฉด maxRecords๋‚˜ maxBytes๋กœ ์ œํ•œ๋œ ๋ฒ„ํผ๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

Suppressed.BufferConfig
    .maxRecords(10000)
    .shutDownWhenFull()

shutDownWhenFull()์€ ๋ฒ„ํผ๊ฐ€ ๊ฐ€๋“ ์ฐจ๋ฉด ์ŠคํŠธ๋ฆผ์„ ์ข…๋ฃŒํ•˜๊ณ , emitEarlyWhenFull()์€ ๋ฒ„ํผ๊ฐ€ ์ฐจ๋ฉด ์ค‘๊ฐ„ ๊ฒฐ๊ณผ๋ฅผ ๋‚ด๋ณด๋‚ด๋ฉด์„œ ๊ณ„์† ๋™์ž‘ํ•œ๋‹ค.


DSL๊ณผ Processor API ํ˜ผํ•ฉ ์‚ฌ์šฉ

1ํŽธ์—์„œ Streams DSL๊ณผ Processor API์˜ ์ฐจ์ด๋ฅผ ๋‹ค๋ค˜๋‹ค. ๋Œ€๋ถ€๋ถ„์˜ ๊ฒฝ์šฐ DSL๋งŒ์œผ๋กœ ์ถฉ๋ถ„ํ•˜์ง€๋งŒ, DSL ์ฒด์ธ ์ค‘๊ฐ„์— State Store์— ์ง์ ‘ ์ ‘๊ทผํ•˜๊ฑฐ๋‚˜ ์กฐ๊ฑด๋ถ€ ๋ผ์šฐํŒ…์ด ํ•„์š”ํ•œ ๊ฒฝ์šฐ๊ฐ€ ์žˆ๋‹ค.

process()๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด DSL ์ฒด์ธ ์•ˆ์—์„œ Processor API์˜ Processor๋ฅผ ๋ผ์›Œ ๋„ฃ์„ ์ˆ˜ ์žˆ๋‹ค.

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("dedup-store"),
        Serdes.String(),
        Serdes.Long()
    );

builder.addStateStore(storeBuilder);

orderStream.process(() -> new Processor<
        String, OrderEvent, String, OrderEvent>() {

    private KeyValueStore<String, Long> store;
    private ProcessorContext<String, OrderEvent> context;

    @Override
    public void init(
            ProcessorContext<String, OrderEvent> ctx) {
        this.context = ctx;
        this.store = ctx.getStateStore("dedup-store");
    }

    @Override
    public void process(
            Record<String, OrderEvent> record) {
        String orderId = record.value().getOrderId();
        if (store.get(orderId) != null) {
            return;
        }
        store.put(orderId, record.timestamp());
        context.forward(record);
    }
}, "dedup-store");

์ด ์˜ˆ์‹œ๋Š” ์ฃผ๋ฌธ ID ๊ธฐ์ค€์œผ๋กœ ์ค‘๋ณต ๋ ˆ์ฝ”๋“œ๋ฅผ ๊ฑธ๋Ÿฌ๋‚ด๋Š” ํŒจํ„ด์ด๋‹ค. State Store์— ์ด๋ฏธ ์ฒ˜๋ฆฌํ•œ ์ฃผ๋ฌธ ID๋ฅผ ๊ธฐ๋กํ•ด ๋‘๊ณ , ๊ฐ™์€ ID๊ฐ€ ๋‹ค์‹œ ๋“ค์–ด์˜ค๋ฉด ๋‹ค์šด์ŠคํŠธ๋ฆผ์œผ๋กœ ์ „๋‹ฌํ•˜์ง€ ์•Š๋Š”๋‹ค. process()์˜ ๋งˆ์ง€๋ง‰ ์ธ์ž๋กœ ์ ‘๊ทผํ•  State Store ์ด๋ฆ„์„ ์ „๋‹ฌํ•ด์•ผ ํ•œ๋‹ค.


Serdes ์„ค์ •๊ณผ ์ง๋ ฌํ™”

Kafka Streams์—์„œ ๋ ˆ์ฝ”๋“œ์˜ key์™€ value๋ฅผ ์ง๋ ฌํ™”/์—ญ์ง๋ ฌํ™”ํ•˜๋Š” ๊ฒƒ์ด Serdes๋‹ค. Serdes๋Š” Serializer์™€ Deserializer๋ฅผ ๋ฌถ์€ ๋ž˜ํผ ํด๋ž˜์Šค์ด๋ฉฐ, Kafka Streams์˜ ๋ชจ๋“  ์ž…์ถœ๋ ฅ ์ง€์ ์—์„œ ์‚ฌ์šฉ๋œ๋‹ค.

๊ธฐ๋ณธ ์ œ๊ณต๋˜๋Š” Serdes๋Š” Serdes.String(), Serdes.Long(), Serdes.Integer(), Serdes.ByteArray() ๋“ฑ์ด ์žˆ๋‹ค. JSON ๊ฐ™์€ ์ปค์Šคํ…€ ํฌ๋งท์€ ์ง์ ‘ Serdes๋ฅผ ๋งŒ๋“ค์–ด์•ผ ํ•œ๋‹ค.

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerde<T> implements Serde<T> {

    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Class<T> targetType;

    public JsonSerde(Class<T> targetType) {
        this.targetType = targetType;
    }

    @Override
    public Serializer<T> serializer() {
        return (topic, data) -> {
            try {
                return objectMapper.writeValueAsBytes(data);
            } catch (Exception e) {
                throw new SerializationException("JSON ์ง๋ ฌํ™” ์‹คํŒจ. topic=" + topic, e);
            }
        };
    }

    @Override
    public Deserializer<T> deserializer() {
        return (topic, data) -> {
            try {
                return objectMapper.readValue(data, targetType);
            } catch (Exception e) {
                throw new SerializationException("JSON ์—ญ์ง๋ ฌํ™” ์‹คํŒจ. topic=" + topic, e);
            }
        };
    }
}

์ด Serdes๋ฅผ ์ŠคํŠธ๋ฆผ ์—ฐ์‚ฐ์—์„œ ์‚ฌ์šฉํ•  ๋•Œ๋Š” Consumed, Produced, Materialized ๋“ฑ์— ์ „๋‹ฌํ•œ๋‹ค.

Serde<OrderEvent> orderEventSerde = new JsonSerde<>(OrderEvent.class);

KStream<String, OrderEvent> orders = builder.stream(
    "order-events",
    Consumed.with(Serdes.String(), orderEventSerde)
);

Serdes๋ฅผ ์ง€์ •ํ•˜์ง€ ์•Š์œผ๋ฉด StreamsConfig์— ์„ค์ •ํ•œ DEFAULT_KEY_SERDE_CLASS_CONFIG์™€ DEFAULT_VALUE_SERDE_CLASS_CONFIG๊ฐ€ ์‚ฌ์šฉ๋œ๋‹ค. ๊ธฐ๋ณธ Serdes์™€ ์‹ค์ œ ๋ฐ์ดํ„ฐ ํƒ€์ž…์ด ๋งž์ง€ ์•Š์œผ๋ฉด ๋Ÿฐํƒ€์ž„์— ClassCastException์ด ๋ฐœ์ƒํ•˜๋ฏ€๋กœ, ํƒ€์ž…์ด ๋‹ค๋ฅธ ์—ฐ์‚ฐ์—์„œ๋Š” Serdes๋ฅผ ๋ช…์‹œ์ ์œผ๋กœ ์ง€์ •ํ•˜๋Š” ๊ฒƒ์ด ์•ˆ์ „ํ•˜๋‹ค.

ํŠนํžˆ aggregate๋‚˜ count ๊ฐ™์€ stateful ์—ฐ์‚ฐ์—์„œ ๊ฒฐ๊ณผ ํƒ€์ž…์ด ๋ฐ”๋€Œ๋Š” ๊ฒฝ์šฐ Materialized.with(...)๋กœ Serdes๋ฅผ ๋ฐ˜๋“œ์‹œ ์ง€์ •ํ•ด์•ผ ํ•œ๋‹ค. ์ด๋ฅผ ๋น ๋œจ๋ฆฌ๋ฉด State Store์— ๊ฐ’์„ ์“ฐ๊ฑฐ๋‚˜ ์ฝ์„ ๋•Œ ์ง๋ ฌํ™” ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ•œ๋‹ค.


๋‹ค์Œ ๊ธ€์—์„œ ๋‹ค๋ฃฐ ๋‚ด์šฉ

์ด ๊ธ€์—์„œ๋Š” KStream๊ณผ KTable์˜ ์ฐจ์ด, Stateless/Stateful ์—ฐ์‚ฐ, ์กฐ์ธ, Serdes ์„ค์ •์„ ๋‹ค๋ค˜๋‹ค. ๋‹ค์Œ ๊ธ€์—์„œ๋Š” ์‹œ๊ฐ„ ๊ธฐ๋ฐ˜์œผ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฌถ๋Š” ์œˆ๋„์šฐ(Tumbling, Hopping, Session)์™€ KStream-KStream ์‹œ๊ฐ„ ์กฐ์ธ, Grace Period๋ฅผ ๋‹ค๋ฃฌ๋‹ค.


์ฐธ๊ณ