5.2. RSocketRequester

RSocketRequester๋Š” RSocket ์š”์ฒญ์„ ์ฒ˜๋ฆฌํ•˜๋Š” ๋Šฅ์ˆ™ํ•œ API ์ œ๊ณตํ•˜์—ฌ, ํ•˜์œ„ ์ˆ˜์ค€ ๋ฐ์ดํ„ฐ ๋ฒ„ํผ ๋Œ€์‹ ์— ๋ฐ์ดํ„ฐ์™€ ๋ฉ”ํƒ€ ๋ฐ์ดํ„ฐ์— ๋Œ€ํ•œ ๊ฐ์ฒด๋ฅผ ๋ฐ›๊ณ  ๋ฐ˜ํ™˜ํ•œ๋‹ค. ํด๋ผ์ด์–ธํŠธ ์š”์ฒญ๊ณผ ์„œ๋ฒ„ ์š”์ฒญ ๋‘˜๋‹ค ์ž‘์„ฑํ•˜๊ธฐ ์œ„ํ•ด RSocketRequest๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.


5.2.1. Client Requester

ํด๋ผ์ด์–ธํŠธ ์ธก์—์„œ RSocketRequester๋ฅผ ์–ป์œผ๋ ค๋ฉด ์„œ๋ฒ„์— ์ปค๋„ฅ์…˜์„ ์š”์ฒญํ•˜๊ณ  RSocket SETUP ํ”„๋ ˆ์ž„์„ ์ค€๋น„ํ•˜์—ฌ ์ „์†กํ•ด์•ผ ํ•œ๋‹ค. RSocketRequester๋Š” ์ด๋ฅผ ์œ„ํ•œ ๋นŒ๋”๋ฅผ ์ œ๊ณตํ•œ๋‹ค. ๋‚ด๋ถ€์ ์œผ๋กœ io.rsocket.core.RSocketConnector๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ํ•œ๋‹ค.

์•„๋ž˜๋Š” ๋””ํดํŠธ ์„ค์ •์œผ๋กœ ์ปค๋„ฅ์…˜์„ ๋งบ๋Š” ๊ฐ€์žฅ ๊ธฐ๋ณธ์ ์ธ ๋ฐฉ๋ฒ•์ด๋‹ค:

Java:

Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
    .connectTcp("localhost", 7000);

Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
    .connectWebSocket(URI.create("https://example.org:8080/rsocket"));

Kotlin:

import org.springframework.messaging.rsocket.connectTcpAndAwait
import org.springframework.messaging.rsocket.connectWebSocketAndAwait

val requester = RSocketRequester.builder()
        .connectTcpAndAwait("localhost", 7000)

val requester = RSocketRequester.builder()
        .connectWebSocketAndAwait(URI.create("https://example.org:8080/rsocket"))

์œ„ ์˜ˆ์ œ ์ฝ”๋“œ๋Š” ๋ฐ”๋กœ ์ปค๋„ฅ์…˜์„ ๋งบ์ง€ ์•Š๊ณ  ์—ฐ๊ธฐํ•œ๋‹ค. ์‹ค์ œ๋กœ ์ปค๋„ฅ์…˜์„ ๋งบ๊ณ  requester๋ฅผ ์‚ฌ์šฉํ•˜๋ ค๋ฉด ๋‹ค์Œ์„ ์ˆ˜ํ–‰ํ•˜๋ผ:

Java:

// Connect asynchronously
RSocketRequester.builder().connectTcp("localhost", 7000)
    .subscribe(requester -> {
        // ...
    });

// Or block
RSocketRequester requester = RSocketRequester.builder()
    .connectTcp("localhost", 7000)
    .block(Duration.ofSeconds(5));

Kotlin:

// Connect asynchronously
import org.springframework.messaging.rsocket.connectTcpAndAwait

class MyService {

    private var requester: RSocketRequester? = null

    private suspend fun requester() = requester ?:
        RSocketRequester.builder().connectTcpAndAwait("localhost", 7000).also { requester = it }

    suspend fun doSomething() = requester().route(...)
}

// Or block
import org.springframework.messaging.rsocket.connectTcpAndAwait

class MyService {

    private val requester = runBlocking {
        RSocketRequester.builder().connectTcpAndAwait("localhost", 7000)
    }

    suspend fun doSomething() = requester.route(...)
}


์ปค๋„ฅ์…˜ ์„ค์ •(Connection Setup)

RSocketRequester.Builder๋Š” ์ดˆ๊ธฐ SETUP ํ”„๋ ˆ์ž„์„ ์ปค์Šคํ…€ํ•˜๊ธฐ ์œ„ํ•ด ๋‹ค์Œ์„ ์ œ๊ณตํ•œ๋‹ค:

  • dataMimeType(MimeType) - ์ปค๋„ฅ์…˜์„ ํ†ตํ•˜๋Š” ๋ฐ์ดํ„ฐ์˜ mime ํƒ€์ž… ์„ค์ •
  • metadataMimeType(MimeType) - ์ปค๋„ฅ์…˜์„ ํ†ตํ•˜๋Š” ๋ฉ”ํƒ€ ๋ฐ์ดํ„ฐ์˜ mime ํƒ€์ž… ์„ค์ •
  • setupData(Object) - SETUP ํ”„๋ ˆ์ž„์— ํฌํ•จํ•  ๋ฐ์ดํ„ฐ ์„ค์ •
  • setupRoute(String, Object...) - SETUP ํ”„๋ ˆ์ž„์— ํฌํ•จ๋  ๋ฉ”ํƒ€ ๋ฐ์ดํ„ฐ๋ฅผ ๋ผ์šฐํŒ…
  • setupMetadata(Object, MimeType) - SETUP ํ”„๋ ˆ์ž„์— ํฌํ•จ๋  ๋‹ค๋ฅธ ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ

๋ฐ์ดํ„ฐ์˜ ๊ธฐ๋ณธ mime ํƒ€์ž…์€ ์ฒ˜์Œ ์„ค์ •๋œ Decoder๋กœ ๊ฒฐ์ •๋œ๋‹ค. ๋ฉ”ํƒ€ ๋ฐ์ดํ„ฐ์˜ ๊ฒฝ์šฐ ๊ธฐ๋ณธ mime ํƒ€์ž…์€ ์š”์ฒญ ๋‹น ๋ฉ”ํƒ€ ๋ฐ์ดํ„ฐ์™€ mime ํƒ€์ž…์„ ์—ฌ๋Ÿฌ๊ฐœ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋Š” composite metadata๋‹ค. ์ผ๋ฐ˜์ ์œผ๋กœ ๋‘˜ ๋‹ค ๋ณ€๊ฒฝํ•  ํ•„์š”๋Š” ์—†๋‹ค.

SETUP ํ”„๋ ˆ์ž„์˜ ๋ฐ์ดํ„ฐ์™€ ๋ฉ”ํƒ€ ๋ฐ์ดํ„ฐ๋Š” ์„ ํƒ์‚ฌํ•ญ์ด๋‹ค. ์„œ๋ฒ„ ์ธก์—์„œ @ConnectionMapping ๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ปค๋„ฅ์…˜์˜ ์‹œ์ž‘๊ณผ SETUP ํ”„๋ ˆ์ž„์˜ ์ปจํ…์ธ ๋ฅผ ์ฒ˜๋ฆฌํ•œ๋‹ค. ๋ฉ”ํƒ€ ๋ฐ์ดํ„ฐ๋Š” ์ปค๋„ฅ์…˜ ๋ ˆ๋ฒจ ๋ณด์•ˆ์— ์‚ฌ์šฉ๋  ์ˆ˜ ์žˆ๋‹ค.


์ „๋žต(Strategies)

RSocketRequster.Builder๋Š” RSocketStrategies๋ฅผ ๋ฐ›์•„ requester๋ฅผ ์„ค์ •ํ•œ๋‹ค. ์ด๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋ฐ์ดํ„ฐ์™€ ๋ฉ”ํƒ€ ๋ฐ์ดํ„ฐ ๊ฐ’์„ (์—ญ)์ง๋ ฌํ™”ํ•  ์ธ์ฝ”๋”์™€ ๋””์ฝ”๋”๋ฅผ ์ œ๊ณตํ•œ๋‹ค. ๊ธฐ๋ณธ์ ์œผ๋กœ spring-core์— ์žˆ๋Š” String, byte[], ByteBuffer์— ๋Œ€ํ•œ ์ฝ”๋ฑ๋งŒ ๊ธฐ๋ณธ๊ฐ’์œผ๋กœ ๋“ฑ๋ก๋œ๋‹ค. spring-web ๋ชจ๋“ˆ์„ ์‚ฌ์šฉํ•˜์—ฌ ๋‹ค์Œ๊ณผ ๊ฐ™์€ ์ฝ”๋ฑ์„ ์ถ”๊ฐ€ ๋“ฑ๋กํ•  ์ˆ˜ ์žˆ๋‹ค:

Java:

RSocketStrategies strategies = RSocketStrategies.builder()
    .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
    .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
    .build();

Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
    .rsocketStrategies(strategies)
    .connectTcp("localhost", 7000);

Kotlin:

import org.springframework.messaging.rsocket.connectTcpAndAwait

val strategies = RSocketStrategies.builder()
        .encoders { it.add(Jackson2CborEncoder()) }
        .decoders { it.add(Jackson2CborDecoder()) }
        .build()

val requester = RSocketRequester.builder()
        .rsocketStrategies(strategies)
        .connectTcpAndAwait("localhost", 7000)

RSocketStrategies๋Š” ์žฌ์‚ฌ์šฉ์ด ๊ณ ๋ ค๋˜์–ด ๋งŒ๋“ค์–ด์กŒ๋‹ค. ์ผ๋ถ€ ์‹œ๋‚˜๋ฆฌ์˜ค์—์„œ๋Š”, ์˜ˆ๋ฅผ ๋“ค๋ฉด ํด๋ผ์ด์–ธํŠธ์™€ ์„œ๋ฒ„๊ฐ€ ๊ฐ™์€ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์— ๋™์ž‘ํ•  ๋•Œ, RSocketStrategies๋ฅผ ์Šคํ”„๋ง ์„ค์ •์— ์„ ์–ธํ•˜๋Š” ๊ฒƒ์ด ๋” ๋‚˜์„ ์ˆ˜ ์žˆ๋‹ค.


ํด๋ผ์ด์–ธํŠธ ์‘๋‹ต์ž(Client Responders)

RSocketRequester.Builder๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์„œ๋ฒ„์˜ ์š”์ฒญ์— ๋Œ€ํ•œ responder๋ฅผ ๊ตฌ์„ฑํ•  ์ˆ˜ ์žˆ๋‹ค. ํด๋ผ์ด์–ธํŠธ ์ธก์˜ ์‘๋‹ต์— ์–ด๋…ธํ…Œ์ด์…˜ ํ•ธ๋“ค๋Ÿฌ๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค. ์„œ๋ฒ„์—์„œ ์‚ฌ์šฉ๋˜๋Š” ๊ฒƒ๊ณผ ๋™์ผํ•œ ์ธํ”„๋ผ๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ํ•˜์ง€๋งŒ, ๋‹ค์Œ๊ณผ ๊ฐ™์€ ํ”„๋กœ๊ทธ๋ž˜๋ฐ ๋ฐฉ์‹์œผ๋กœ ๋“ฑ๋กํ•œ๋‹ค:

Java:

RSocketStrategies strategies = RSocketStrategies.builder()
    .routeMatcher(new PathPatternRouteMatcher())  (1)
    .build();

SocketAcceptor responder =
    RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
    .rsocketConnector(connector -> connector.acceptor(responder)) (3)
    .connectTcp("localhost", 7000);

Kotlin:

import org.springframework.messaging.rsocket.connectTcpAndAwait

val strategies = RSocketStrategies.builder()
        .routeMatcher(PathPatternRouteMatcher())  (1)
        .build()

val responder =
    RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

val requester = RSocketRequester.builder()
        .rsocketConnector { it.acceptor(responder) } (3)
        .connectTcpAndAwait("localhost", 7000)

(1) spring-web ๋ชจ๋“ˆ์ด ์กด์žฌํ•˜๋Š” ๊ฒฝ์šฐ, ํšจ๊ณผ์ ์ธ ๋ผ์šฐํŒ… ๋งค์นญ์„ ์œ„ํ•˜์—ฌ PathPatternRouteMatcher๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.
(2) @MessageMapping ๋˜๋Š” @ConnectMapping ๋ฉ”์„œ๋“œ๋ฅผ ํฌํ•จํ•˜๋Š” responder๋ฅผ ๋งŒ๋“ ๋‹ค.
(3) responder๋ฅผ ๋“ฑ๋กํ•œ๋‹ค.

์œ„์˜ ๋‚ด์šฉ์€ ํด๋ผ์ด์–ธํŠธ responder๋ฅผ ํ”„๋กœ๊ทธ๋ž˜๋ฐ ๋ฐฉ์‹์œผ๋กœ ๋“ฑ๋กํ•˜๋Š” ๊ฐ„๋‹จํ•œ ์˜ˆ์ œ๋‹ค. responder๊ฐ€ ์Šคํ”„๋ง ์„ค์ •์— ์žˆ๋Š” ๋‹ค๋ฅธ ์‹œ๋‚˜๋ฆฌ์˜ค์˜ ๊ฒฝ์šฐ์—๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™์ด RSocketMessageHandler๋ฅผ ์Šคํ”„๋ง ๋นˆ์œผ๋กœ ์„ ์–ธํ•œ ํ›„์— ๋“ฑ๋กํ•˜๋ฉด ๋œ๋‹ค:

Java:

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
    .rsocketConnector(connector -> connector.acceptor(handler.responder()))
    .connectTcp("localhost", 7000);

Kotlin:

import org.springframework.beans.factory.getBean
import org.springframework.messaging.rsocket.connectTcpAndAwait

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val requester = RSocketRequester.builder()
        .rsocketConnector { it.acceptor(handler.responder()) }
        .connectTcpAndAwait("localhost", 7000)

์œ„์˜ ๊ฒฝ์šฐ RSocketMessageHandler์˜ setHandlerPredicate๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ํด๋ผ์ด์–ธํŠธ responder๋ฅผ ๊ฐ์ง€ํ•˜๊ธฐ ์œ„ํ•œ ๋‹ค๋ฅธ ์ „๋žต์œผ๋กœ ์ „ํ™˜ํ•ด์•ผ ํ•  ์ˆ˜๋„ ์žˆ๋‹ค. @Controller๊ณผ ๊ฐ™์€ ๊ธฐ๋ณธ ์–ด๋…ธํ…Œ์ด์…˜ ๋Œ€์‹  @RSocketClientResponder์™€ ๊ฐ™์€ ์ปค์Šคํ…€ ์–ด๋…ธํ…Œ์ด์…˜ ์‚ฌ์šฉ์„ ์˜ˆ๋กœ ๋“ค ์ˆ˜ ์žˆ๋‹ค. ํด๋ผ์ด์–ธํŠธ์™€ ์„œ๋ฒ„ ๋˜๋Š” ์—ฌ๋Ÿฌ ํด๋ผ์ด์–ธํŠธ๊ฐ€ ๊ฐ™์€ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์—์„œ ๋™์ž‘ํ•˜๋Š” ์‹œ๋‚˜๋ฆฌ์˜ค์—์„œ ์ด๋Ÿฐ ์ „๋žต ์ „ํ™˜์ด ํ•„์š”ํ•˜๋‹ค.

ํ”„๋กœ๊ทธ๋ž˜๋ฐ ๋ชจ๋ธ์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ์€ Annotated Responders๋ฅผ ์ฐธ์กฐํ•˜๋ผ.


๊ณ ๊ธ‰(Advanced)

RSocketRequesterBuilder๋Š” keepalive ๊ฐ„๊ฒฉ, ์„ธ์…˜ ์ œ๊ฐœ, ์ธํ„ฐ์…‰ํ„ฐ ๋“ฑ์„ ์œ„ํ•œ ์ถ”๊ฐ€ ์„ค์ • ์˜ต์…˜์„ ์œ„ํ•ด io.rsocket.core.RSocketConnector ์ฝœ๋ฐฑ์„ ์ œ๊ณตํ•œ๋‹ค. ๋‹ค์Œ๊ณผ ๊ฐ™์ด ํ•ด๋‹น ๋ ˆ๋ฒจ์—์„œ ์˜ต์…˜์„ ์„ค์ •ํ•  ์ˆ˜ ์žˆ๋‹ค:

Java:

Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
    .rsocketConnector(connector -> {
        // ...
    })
    .connectTcp("localhost", 7000);

Kotlin:

import org.springframework.messaging.rsocket.connectTcpAndAwait

val requester = RSocketRequester.builder()
        .rsocketConnector {
            //...
        }.connectTcpAndAwait("localhost", 7000)


5.2.2. Server Requester

์„œ๋ฒ„์—์„œ ์—ฐ๊ฒฐ๋œ ํด๋ผ์ด์–ธํŠธ๋กœ ์š”์ฒญํ•˜๋Š” ๊ฒƒ์€ ์„œ๋ฒ„์—์„œ ์—ฐ๊ฒฐ๋œ ํด๋ผ์ด์–ธํŠธ์— ๋Œ€ํ•œ requester๋ฅผ ์–ป๋Š” ์ผ์ด๋‹ค.

์–ด๋…ธํ…Œ์ด์…˜ ์‘๋‹ต์ž (Annotated Responders)์—์„œ๋Š” @ConnectMapping๊ณผ @MessageMapping ๋ฉ”์„œ๋“œ๋Š” RSocketRequester๋ฅผ ์ธ์ž๋กœ ๋ฐ›๋Š”๋‹ค. ์ด๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ปค๋„ฅ์…˜์„ ๋งบ์€ requester์— ์ ‘๊ทผํ•œ๋‹ค. @ConnectMapping ๋ฉ”์„œ๋“œ๋Š” ์š”์ฒญ ์‹œ์ž‘ ์ „์— ๋ฐ˜๋“œ์‹œ ์ฒ˜๋ฆฌํ•ด์•ผํ•˜๋Š” SETUP ํ”„๋ ˆ์ž„์˜ ๊ธฐ๋ณธ์ ์ธ ํ•ธ๋“ค๋Ÿฌ์ด๋‹ค. ๋”ฐ๋ผ์„œ ์š”์ฒญ์€ ์‹œ์ž‘ํ•  ๋•Œ ํ•ธ๋“ค๋ง๊ณผ ๋ถ„๋ฆฌ๋˜์–ด์•ผ ํ•œ๋‹ค.

Java:

@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
    requester.route("status").data("5")
        .retrieveFlux(StatusReport.class)
        .subscribe(bar -> { (1)
            // ...
        });
    return ... (2)
}

(1) ์ฒ˜๋ฆฌ์™€ ๋ฌด๊ด€ํ•˜๊ฒŒ ๋…๋ฆฝ์ ์œผ๋กœ ๋น„๋™๊ธฐ ์š”์ฒญ์„ ์‹œ์ž‘ํ•œ๋‹ค.
(2) ์ฒ˜๋ฆฌ๋ฅผ ์™„๋ฃŒํ•˜๊ณ  Mono<Void>๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค.

Kotlin:

@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
    GlobalScope.launch {
        requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
            // ...
        }
    }
    /// ... (2)
}

(1) ์ฒ˜๋ฆฌ์™€ ๋ฌด๊ด€ํ•˜๊ฒŒ ๋…๋ฆฝ์ ์œผ๋กœ ๋น„๋™๊ธฐ ์š”์ฒญ์„ ์‹œ์ž‘ํ•œ๋‹ค.
(2) suspend ํ•จ์ˆ˜์—์„œ ์ฒ˜๋ฆฌํ•œ๋‹ค.


5.2.3. Requests

ํด๋ผ์ด์–ธํŠธ (client) ๋˜๋Š” ์„œ๋ฒ„(server) ์š”์ฒญ์ž๋ฅผ ๊ฐ€์ง€๋ฉด, ๋‹ค์Œ๊ณผ ๊ฐ™์ด ์š”์ฒญํ•  ์ˆ˜ ์žˆ๋‹ค:

Java:

ViewBox viewBox = ... ;

Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
        .data(viewBox) (2)
        .retrieveFlux(AirportLocation.class); (3)

Kotlin:

val viewBox: ViewBox = ...

val locations = requester.route("locate.radars.within") (1)
        .data(viewBox) (2)
        .retrieveFlow<AirportLocation>() (3)

(1) ์š”์ฒญ ๋ฉ”์‹œ์ง€์˜ ๋ฉ”ํƒ€ ๋ฐ์ดํ„ฐ์— ๋ผ์šฐํŒ… ์ •๋ณด๋ฅผ ์ง€์ •ํ•œ๋‹ค.
(2) ์š”์ฒญ ๋ฉ”์‹œ์ง€์— ๋Œ€ํ•œ ๋ฐ์ดํ„ฐ๋ฅผ ์ œ๊ณตํ•œ๋‹ค.
(3) ์˜ˆ์ƒ๋˜๋Š” ์‘๋‹ต์„ ์„ ์–ธํ•œ๋‹ค.

์ƒํ˜ธ์ž‘์šฉ ํƒ€์ž…์€ ์ž…๋ ฅ๊ณผ ์ถœ๋ ฅ์˜ ์นด๋””๋„๋ฆฌํ‹ฐ๋กœ๋ถ€ํ„ฐ ์•”๋ฌต์ ์œผ๋กœ ๊ฒฐ์ •๋œ๋‹ค. ์œ„ ์˜ˆ์ œ๋Š” ํ•˜๋‚˜์˜ ๊ฐ’์ด ์ „์†ก๋˜๊ณ  ํ•˜๋‚˜์˜ ์ŠคํŠธ๋ฆผ ๊ฐ’์„ ์ˆ˜์‹ ํ•˜๊ธฐ ๋•Œ๋ฌธ์— Request-Stream ์ด๋‹ค. ์‚ฌ์šฉํ•˜๋Š” ์ž…๋ ฅ ๋ฐ ์ถœ๋ ฅ ์„ ํƒ์ด RSocket ์ธํ„ฐ๋ž™์…˜ ํƒ€์ž…๊ณผ rsponder๊ฐ€ ์˜ˆ์ƒํ•˜๋Š” ์ž…๋ ฅ ๋ฐ ์ถœ๋ ฅ ํƒ€์ž…๊ณผ ์ผ์น˜ํ•˜๋‹ค๋ฉด ์ด๋ฅผ ๊ณ ๋ คํ•  ํ•„์š”๋Š” ์—†๋‹ค. ์œ ํšจํ•˜์ง€ ์•Š์€ ์กฐํ•ฉ์˜ ์œ ์ผํ•œ ์˜ˆ๋Š” ๋‹ค๋Œ€์ผ(many-to-one)์ธ ๊ฒฝ์šฐ๋‹ค.

data(Object) ๋ฉ”์„œ๋“œ๋Š” ๋ฆฌ์•กํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ Publisher๋ฅผ ๋ฐ›์„ ์ˆ˜ ์žˆ๋‹ค. Publisher๋Š” Flux์™€ Mono๋ฅผ ํฌํ•จ, ReactiveAdapterRegistry์— ๋“ฑ๋ก๋œ ๋‹ค๋ฅธ producer๋„ ํฌํ•จํ•œ๋‹ค. ๋™์ผํ•œ ํƒ€์ž…์˜ ๊ฐ’์„ ์ƒ์„ฑํ•˜๋Š” Flux์™€ ๊ฐ™์€ ๋‹ค์ค‘๊ฐ’(multi-value) Publisher์— ๋Œ€ํ•ด์„œ๋Š”, ์˜ค๋ฒ„๋กœ๋“œํ•œ data ๋ฉ”์„œ๋“œ ์ค‘ ํ•˜๋‚˜๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ํƒ€์ž… ์ฒดํฌ์™€ ๋ชจ๋“  ์š”์†Œ์— ๋Œ€ํ•œ Encoder ๊ฒ€์ƒ‰์„ ๋ฐฉ์ง€๋ฅผ ๊ณ ๋ คํ•˜๋ผ.

data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);

data(Object) ๋‹จ๊ณ„๋Š” ์„ ํƒ์ ์ด๋‹ค. ๋ฐ์ดํ„ฐ๋ฅผ ๋ณด๋‚ด์ง€ ์•Š๋Š” ์š”์ฒญ์ธ ๊ฒฝ์šฐ ์ƒ๋žตํ•˜์ž:

Java:

Mono<AirportLocation> location = requester.route("find.radar.EWR"))
    .retrieveMono(AirportLocation.class);

Kotlin:

import org.springframework.messaging.rsocket.retrieveAndAwait

val location = requester.route("find.radar.EWR")
    .retrieveAndAwait<AirportLocation>()

composite metadata(๊ธฐ๋ณธ๊ฐ’)๋ฅผ ์‚ฌ์šฉํ•˜๊ณ  ๋“ฑ๋กํ•œ Encoder๊ฐ€ ์ง€์›ํ•˜๋Š” ๊ฐ’์ธ ๊ฒฝ์šฐ, ๋ฉ”ํƒ€ ๋ฐ์ดํ„ฐ๋ฅผ ์ถ”๊ฐ€ํ•  ์ˆ˜ ์žˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค:

Java:

String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");

Flux<AirportLocation> locations = requester.route("locate.radars.within")
        .metadata(securityToken, mimeType)
        .data(viewBox)
        .retrieveFlux(AirportLocation.class);

Kotlin:

import org.springframework.messaging.rsocket.retrieveFlow

val requester: RSocketRequester = ...

val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")

val locations = requester.route("locate.radars.within")
        .metadata(securityToken, mimeType)
        .data(viewBox)
        .retrieveFlow<AirportLocation>()

Fire-and-Forget์˜ ๊ฒฝ์šฐ Mono<Void>๋ฅผ ๋ฐ˜ํ™˜ํ•˜๋Š” send() ๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค. Mono๋Š” ๋ฉ”์‹œ์ง€๊ฐ€ ์„ฑ๊ณต์ ์œผ๋กœ ์ „์†ก๋˜์—ˆ๋‹ค๋Š” ๋œป์ผ ๋ฟ, ์ฒ˜๋ฆฌ๋˜์ง€ ์•Š์•˜์Œ์„ ๋‚˜ํƒ€๋‚ธ๋‹ค.


๋ชฉ์ฐจ ๊ฐ€์ด๋“œ