5.2. RSocketRequester
RSocketRequester๋ RSocket ์์ฒญ์ ์ฒ๋ฆฌํ๋ ๋ฅ์ํ API ์ ๊ณตํ์ฌ, ํ์ ์์ค ๋ฐ์ดํฐ ๋ฒํผ ๋์ ์ ๋ฐ์ดํฐ์ ๋ฉํ ๋ฐ์ดํฐ์ ๋ํ ๊ฐ์ฒด๋ฅผ
๋ฐ๊ณ ๋ฐํํ๋ค. ํด๋ผ์ด์ธํธ ์์ฒญ๊ณผ ์๋ฒ ์์ฒญ ๋๋ค ์์ฑํ๊ธฐ ์ํด RSocketRequest๋ฅผ ์ฌ์ฉํ ์ ์๋ค.
5.2.1. Client Requester
ํด๋ผ์ด์ธํธ ์ธก์์ RSocketRequester๋ฅผ ์ป์ผ๋ ค๋ฉด ์๋ฒ์ ์ปค๋ฅ์
์ ์์ฒญํ๊ณ RSocket SETUP ํ๋ ์์ ์ค๋นํ์ฌ ์ ์กํด์ผ ํ๋ค.
RSocketRequester๋ ์ด๋ฅผ ์ํ ๋น๋๋ฅผ ์ ๊ณตํ๋ค. ๋ด๋ถ์ ์ผ๋ก io.rsocket.core.RSocketConnector๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ํ๋ค.
์๋๋ ๋ํดํธ ์ค์ ์ผ๋ก ์ปค๋ฅ์ ์ ๋งบ๋ ๊ฐ์ฅ ๊ธฐ๋ณธ์ ์ธ ๋ฐฉ๋ฒ์ด๋ค:
Java:
Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
.connectTcp("localhost", 7000);
Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
.connectWebSocket(URI.create("https://example.org:8080/rsocket"));
Kotlin:
import org.springframework.messaging.rsocket.connectTcpAndAwait
import org.springframework.messaging.rsocket.connectWebSocketAndAwait
val requester = RSocketRequester.builder()
.connectTcpAndAwait("localhost", 7000)
val requester = RSocketRequester.builder()
.connectWebSocketAndAwait(URI.create("https://example.org:8080/rsocket"))
์ ์์ ์ฝ๋๋ ๋ฐ๋ก ์ปค๋ฅ์ ์ ๋งบ์ง ์๊ณ ์ฐ๊ธฐํ๋ค. ์ค์ ๋ก ์ปค๋ฅ์ ์ ๋งบ๊ณ requester๋ฅผ ์ฌ์ฉํ๋ ค๋ฉด ๋ค์์ ์ํํ๋ผ:
Java:
// Connect asynchronously
RSocketRequester.builder().connectTcp("localhost", 7000)
.subscribe(requester -> {
// ...
});
// Or block
RSocketRequester requester = RSocketRequester.builder()
.connectTcp("localhost", 7000)
.block(Duration.ofSeconds(5));
Kotlin:
// Connect asynchronously
import org.springframework.messaging.rsocket.connectTcpAndAwait
class MyService {
private var requester: RSocketRequester? = null
private suspend fun requester() = requester ?:
RSocketRequester.builder().connectTcpAndAwait("localhost", 7000).also { requester = it }
suspend fun doSomething() = requester().route(...)
}
// Or block
import org.springframework.messaging.rsocket.connectTcpAndAwait
class MyService {
private val requester = runBlocking {
RSocketRequester.builder().connectTcpAndAwait("localhost", 7000)
}
suspend fun doSomething() = requester.route(...)
}
์ปค๋ฅ์ ์ค์ (Connection Setup)
RSocketRequester.Builder๋ ์ด๊ธฐ SETUP ํ๋ ์์ ์ปค์คํ
ํ๊ธฐ ์ํด ๋ค์์ ์ ๊ณตํ๋ค:
dataMimeType(MimeType)- ์ปค๋ฅ์ ์ ํตํ๋ ๋ฐ์ดํฐ์ mime ํ์ ์ค์ metadataMimeType(MimeType)- ์ปค๋ฅ์ ์ ํตํ๋ ๋ฉํ ๋ฐ์ดํฐ์ mime ํ์ ์ค์ setupData(Object)-SETUPํ๋ ์์ ํฌํจํ ๋ฐ์ดํฐ ์ค์ setupRoute(String, Object...)-SETUPํ๋ ์์ ํฌํจ๋ ๋ฉํ ๋ฐ์ดํฐ๋ฅผ ๋ผ์ฐํsetupMetadata(Object, MimeType)-SETUPํ๋ ์์ ํฌํจ๋ ๋ค๋ฅธ ๋ฉํ๋ฐ์ดํฐ
๋ฐ์ดํฐ์ ๊ธฐ๋ณธ mime ํ์
์ ์ฒ์ ์ค์ ๋ Decoder๋ก ๊ฒฐ์ ๋๋ค. ๋ฉํ ๋ฐ์ดํฐ์ ๊ฒฝ์ฐ ๊ธฐ๋ณธ mime ํ์
์ ์์ฒญ ๋น ๋ฉํ ๋ฐ์ดํฐ์ mime ํ์
์
์ฌ๋ฌ๊ฐ ์ฌ์ฉํ ์ ์๋
composite metadata๋ค. ์ผ๋ฐ์ ์ผ๋ก ๋ ๋ค ๋ณ๊ฒฝํ ํ์๋ ์๋ค.
SETUP ํ๋ ์์ ๋ฐ์ดํฐ์ ๋ฉํ ๋ฐ์ดํฐ๋ ์ ํ์ฌํญ์ด๋ค. ์๋ฒ ์ธก์์
@ConnectionMapping ๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ฌ ์ปค๋ฅ์
์ ์์๊ณผ SETUP ํ๋ ์์ ์ปจํ
์ธ ๋ฅผ ์ฒ๋ฆฌํ๋ค. ๋ฉํ ๋ฐ์ดํฐ๋ ์ปค๋ฅ์
๋ ๋ฒจ ๋ณด์์
์ฌ์ฉ๋ ์ ์๋ค.
์ ๋ต(Strategies)
RSocketRequster.Builder๋ RSocketStrategies๋ฅผ ๋ฐ์ requester๋ฅผ ์ค์ ํ๋ค. ์ด๋ฅผ ์ฌ์ฉํ์ฌ ๋ฐ์ดํฐ์ ๋ฉํ ๋ฐ์ดํฐ ๊ฐ์
(์ญ)์ง๋ ฌํํ ์ธ์ฝ๋์ ๋์ฝ๋๋ฅผ ์ ๊ณตํ๋ค. ๊ธฐ๋ณธ์ ์ผ๋ก spring-core์ ์๋ String, byte[], ByteBuffer์ ๋ํ ์ฝ๋ฑ๋ง
๊ธฐ๋ณธ๊ฐ์ผ๋ก ๋ฑ๋ก๋๋ค. spring-web ๋ชจ๋์ ์ฌ์ฉํ์ฌ ๋ค์๊ณผ ๊ฐ์ ์ฝ๋ฑ์ ์ถ๊ฐ ๋ฑ๋กํ ์ ์๋ค:
Java:
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
.rsocketStrategies(strategies)
.connectTcp("localhost", 7000);
Kotlin:
import org.springframework.messaging.rsocket.connectTcpAndAwait
val strategies = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.build()
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.connectTcpAndAwait("localhost", 7000)
RSocketStrategies๋ ์ฌ์ฌ์ฉ์ด ๊ณ ๋ ค๋์ด ๋ง๋ค์ด์ก๋ค. ์ผ๋ถ ์๋๋ฆฌ์ค์์๋, ์๋ฅผ ๋ค๋ฉด ํด๋ผ์ด์ธํธ์ ์๋ฒ๊ฐ ๊ฐ์ ์ ํ๋ฆฌ์ผ์ด์
์ ๋์ํ ๋,
RSocketStrategies๋ฅผ ์คํ๋ง ์ค์ ์ ์ ์ธํ๋ ๊ฒ์ด ๋ ๋์ ์ ์๋ค.
ํด๋ผ์ด์ธํธ ์๋ต์(Client Responders)
RSocketRequester.Builder๋ฅผ ์ฌ์ฉํ์ฌ ์๋ฒ์ ์์ฒญ์ ๋ํ responder๋ฅผ ๊ตฌ์ฑํ ์ ์๋ค. ํด๋ผ์ด์ธํธ ์ธก์ ์๋ต์ ์ด๋
ธํ
์ด์
ํธ๋ค๋ฌ๋ฅผ
์ฌ์ฉํ ์ ์๋ค. ์๋ฒ์์ ์ฌ์ฉ๋๋ ๊ฒ๊ณผ ๋์ผํ ์ธํ๋ผ๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ํ์ง๋ง, ๋ค์๊ณผ ๊ฐ์ ํ๋ก๊ทธ๋๋ฐ ๋ฐฉ์์ผ๋ก ๋ฑ๋กํ๋ค:
Java:
RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher()) (1)
.build();
SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)
Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder)) (3)
.connectTcp("localhost", 7000);
Kotlin:
import org.springframework.messaging.rsocket.connectTcpAndAwait
val strategies = RSocketStrategies.builder()
.routeMatcher(PathPatternRouteMatcher()) (1)
.build()
val responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(responder) } (3)
.connectTcpAndAwait("localhost", 7000)
(1)
spring-web๋ชจ๋์ด ์กด์ฌํ๋ ๊ฒฝ์ฐ, ํจ๊ณผ์ ์ธ ๋ผ์ฐํ ๋งค์นญ์ ์ํ์ฌPathPatternRouteMatcher๋ฅผ ์ฌ์ฉํ๋ค.
(2)@MessageMapping๋๋@ConnectMapping๋ฉ์๋๋ฅผ ํฌํจํ๋ responder๋ฅผ ๋ง๋ ๋ค.
(3) responder๋ฅผ ๋ฑ๋กํ๋ค.
์์ ๋ด์ฉ์ ํด๋ผ์ด์ธํธ responder๋ฅผ ํ๋ก๊ทธ๋๋ฐ ๋ฐฉ์์ผ๋ก ๋ฑ๋กํ๋ ๊ฐ๋จํ ์์ ๋ค. responder๊ฐ ์คํ๋ง ์ค์ ์ ์๋ ๋ค๋ฅธ ์๋๋ฆฌ์ค์ ๊ฒฝ์ฐ์๋
๋ค์๊ณผ ๊ฐ์ด RSocketMessageHandler๋ฅผ ์คํ๋ง ๋น์ผ๋ก ์ ์ธํ ํ์ ๋ฑ๋กํ๋ฉด ๋๋ค:
Java:
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.connectTcp("localhost", 7000);
Kotlin:
import org.springframework.beans.factory.getBean
import org.springframework.messaging.rsocket.connectTcpAndAwait
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(handler.responder()) }
.connectTcpAndAwait("localhost", 7000)
์์ ๊ฒฝ์ฐ RSocketMessageHandler์ setHandlerPredicate๋ฅผ ์ฌ์ฉํ์ฌ ํด๋ผ์ด์ธํธ responder๋ฅผ ๊ฐ์งํ๊ธฐ ์ํ ๋ค๋ฅธ ์ ๋ต์ผ๋ก
์ ํํด์ผ ํ ์๋ ์๋ค. @Controller๊ณผ ๊ฐ์ ๊ธฐ๋ณธ ์ด๋
ธํ
์ด์
๋์ @RSocketClientResponder์ ๊ฐ์ ์ปค์คํ
์ด๋
ธํ
์ด์
์ฌ์ฉ์
์๋ก ๋ค ์ ์๋ค. ํด๋ผ์ด์ธํธ์ ์๋ฒ ๋๋ ์ฌ๋ฌ ํด๋ผ์ด์ธํธ๊ฐ ๊ฐ์ ์ ํ๋ฆฌ์ผ์ด์
์์ ๋์ํ๋ ์๋๋ฆฌ์ค์์ ์ด๋ฐ ์ ๋ต ์ ํ์ด ํ์ํ๋ค.
ํ๋ก๊ทธ๋๋ฐ ๋ชจ๋ธ์ ๋ํ ์์ธํ ๋ด์ฉ์ Annotated Responders๋ฅผ ์ฐธ์กฐํ๋ผ.
๊ณ ๊ธ(Advanced)
RSocketRequesterBuilder๋ keepalive ๊ฐ๊ฒฉ, ์ธ์
์ ๊ฐ, ์ธํฐ์
ํฐ ๋ฑ์ ์ํ ์ถ๊ฐ ์ค์ ์ต์
์ ์ํด
io.rsocket.core.RSocketConnector ์ฝ๋ฐฑ์ ์ ๊ณตํ๋ค. ๋ค์๊ณผ ๊ฐ์ด ํด๋น ๋ ๋ฒจ์์ ์ต์
์ ์ค์ ํ ์ ์๋ค:
Java:
Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.connectTcp("localhost", 7000);
Kotlin:
import org.springframework.messaging.rsocket.connectTcpAndAwait
val requester = RSocketRequester.builder()
.rsocketConnector {
//...
}.connectTcpAndAwait("localhost", 7000)
5.2.2. Server Requester
์๋ฒ์์ ์ฐ๊ฒฐ๋ ํด๋ผ์ด์ธํธ๋ก ์์ฒญํ๋ ๊ฒ์ ์๋ฒ์์ ์ฐ๊ฒฐ๋ ํด๋ผ์ด์ธํธ์ ๋ํ requester๋ฅผ ์ป๋ ์ผ์ด๋ค.
์ด๋
ธํ
์ด์
์๋ต์
(Annotated Responders)์์๋ @ConnectMapping๊ณผ @MessageMapping ๋ฉ์๋๋ RSocketRequester๋ฅผ ์ธ์๋ก ๋ฐ๋๋ค.
์ด๋ฅผ ์ฌ์ฉํ์ฌ ์ปค๋ฅ์
์ ๋งบ์ requester์ ์ ๊ทผํ๋ค. @ConnectMapping ๋ฉ์๋๋ ์์ฒญ ์์ ์ ์ ๋ฐ๋์ ์ฒ๋ฆฌํด์ผํ๋ SETUP ํ๋ ์์
๊ธฐ๋ณธ์ ์ธ ํธ๋ค๋ฌ์ด๋ค. ๋ฐ๋ผ์ ์์ฒญ์ ์์ํ ๋ ํธ๋ค๋ง๊ณผ ๋ถ๋ฆฌ๋์ด์ผ ํ๋ค.
Java:
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> { (1)
// ...
});
return ... (2)
}
(1) ์ฒ๋ฆฌ์ ๋ฌด๊ดํ๊ฒ ๋ ๋ฆฝ์ ์ผ๋ก ๋น๋๊ธฐ ์์ฒญ์ ์์ํ๋ค.
(2) ์ฒ๋ฆฌ๋ฅผ ์๋ฃํ๊ณMono<Void>๋ฅผ ๋ฐํํ๋ค.
Kotlin:
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
GlobalScope.launch {
requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
// ...
}
}
/// ... (2)
}
(1) ์ฒ๋ฆฌ์ ๋ฌด๊ดํ๊ฒ ๋ ๋ฆฝ์ ์ผ๋ก ๋น๋๊ธฐ ์์ฒญ์ ์์ํ๋ค.
(2) suspend ํจ์์์ ์ฒ๋ฆฌํ๋ค.
5.2.3. Requests
ํด๋ผ์ด์ธํธ (client) ๋๋ ์๋ฒ(server) ์์ฒญ์๋ฅผ ๊ฐ์ง๋ฉด, ๋ค์๊ณผ ๊ฐ์ด ์์ฒญํ ์ ์๋ค:
Java:
ViewBox viewBox = ... ;
Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
.data(viewBox) (2)
.retrieveFlux(AirportLocation.class); (3)
Kotlin:
val viewBox: ViewBox = ...
val locations = requester.route("locate.radars.within") (1)
.data(viewBox) (2)
.retrieveFlow<AirportLocation>() (3)
(1) ์์ฒญ ๋ฉ์์ง์ ๋ฉํ ๋ฐ์ดํฐ์ ๋ผ์ฐํ ์ ๋ณด๋ฅผ ์ง์ ํ๋ค.
(2) ์์ฒญ ๋ฉ์์ง์ ๋ํ ๋ฐ์ดํฐ๋ฅผ ์ ๊ณตํ๋ค.
(3) ์์๋๋ ์๋ต์ ์ ์ธํ๋ค.
์ํธ์์ฉ ํ์
์ ์
๋ ฅ๊ณผ ์ถ๋ ฅ์ ์นด๋๋๋ฆฌํฐ๋ก๋ถํฐ ์๋ฌต์ ์ผ๋ก ๊ฒฐ์ ๋๋ค. ์ ์์ ๋ ํ๋์ ๊ฐ์ด ์ ์ก๋๊ณ ํ๋์ ์คํธ๋ฆผ ๊ฐ์ ์์ ํ๊ธฐ ๋๋ฌธ์
Request-Stream ์ด๋ค. ์ฌ์ฉํ๋ ์
๋ ฅ ๋ฐ ์ถ๋ ฅ ์ ํ์ด RSocket ์ธํฐ๋์
ํ์
๊ณผ rsponder๊ฐ ์์ํ๋ ์
๋ ฅ ๋ฐ ์ถ๋ ฅ ํ์
๊ณผ ์ผ์นํ๋ค๋ฉด
์ด๋ฅผ ๊ณ ๋ คํ ํ์๋ ์๋ค. ์ ํจํ์ง ์์ ์กฐํฉ์ ์ ์ผํ ์๋ ๋ค๋์ผ(many-to-one)์ธ ๊ฒฝ์ฐ๋ค.
data(Object) ๋ฉ์๋๋ ๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ Publisher๋ฅผ ๋ฐ์ ์ ์๋ค. Publisher๋ Flux์ Mono๋ฅผ ํฌํจ,
ReactiveAdapterRegistry์ ๋ฑ๋ก๋ ๋ค๋ฅธ producer๋ ํฌํจํ๋ค. ๋์ผํ ํ์
์ ๊ฐ์ ์์ฑํ๋ Flux์ ๊ฐ์
๋ค์ค๊ฐ(multi-value) Publisher์ ๋ํด์๋, ์ค๋ฒ๋ก๋ํ data ๋ฉ์๋ ์ค ํ๋๋ฅผ ์ฌ์ฉํ์ฌ ํ์
์ฒดํฌ์ ๋ชจ๋ ์์์ ๋ํ
Encoder ๊ฒ์์ ๋ฐฉ์ง๋ฅผ ๊ณ ๋ คํ๋ผ.
data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
data(Object) ๋จ๊ณ๋ ์ ํ์ ์ด๋ค. ๋ฐ์ดํฐ๋ฅผ ๋ณด๋ด์ง ์๋ ์์ฒญ์ธ ๊ฒฝ์ฐ ์๋ตํ์:
Java:
Mono<AirportLocation> location = requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);
Kotlin:
import org.springframework.messaging.rsocket.retrieveAndAwait
val location = requester.route("find.radar.EWR")
.retrieveAndAwait<AirportLocation>()
composite metadata(๊ธฐ๋ณธ๊ฐ)๋ฅผ ์ฌ์ฉํ๊ณ ๋ฑ๋กํ Encoder๊ฐ ์ง์ํ๋ ๊ฐ์ธ ๊ฒฝ์ฐ, ๋ฉํ ๋ฐ์ดํฐ๋ฅผ ์ถ๊ฐํ ์ ์๋ค.
์๋ฅผ ๋ค๋ฉด ๋ค์๊ณผ ๊ฐ๋ค:
Java:
String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
Kotlin:
import org.springframework.messaging.rsocket.retrieveFlow
val requester: RSocketRequester = ...
val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
val locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlow<AirportLocation>()
Fire-and-Forget์ ๊ฒฝ์ฐ Mono<Void>๋ฅผ ๋ฐํํ๋ send() ๋ฉ์๋๋ฅผ ์ฌ์ฉํ๋ค. Mono๋ ๋ฉ์์ง๊ฐ ์ฑ๊ณต์ ์ผ๋ก ์ ์ก๋์๋ค๋ ๋ป์ผ ๋ฟ,
์ฒ๋ฆฌ๋์ง ์์์์ ๋ํ๋ธ๋ค.
๋ชฉ์ฐจ ๊ฐ์ด๋