5.3. Annotated Responders

RSocket ์‘๋‹ต์ž๋Š” @MessageMapping๊ณผ @ConnectMapping ๋ฉ”์„œ๋“œ๋กœ ๊ตฌํ˜„ํ•  ์ˆ˜ ์žˆ๋‹ค. @MessageMapping ๋ฉ”์„œ๋“œ๋Š” ๊ฐ ์š”์ฒญ์„ ์ฒ˜๋ฆฌํ•˜๊ณ  @ConnectMapping ๋ฉ”์„œ๋“œ๋Š” ์ปค๋„ฅ์…˜ ๋ ˆ๋ฒจ ์ด๋ฒคํŠธ(์„ค์ •๊ณผ ๋ฉ”ํƒ€ ๋ฐ์ดํ„ฐ ํ‘ธ์‹œ)๋ฅผ ์ฒ˜๋ฆฌํ•œ๋‹ค. ์–ด๋…ธํ…Œ์ด์…˜ ๋‹ฌ๋ฆฐ ์‘๋‹ต์ž(annotated responder)๋Š” ์„œ๋ฒ„ ์ธก์—์„œ ์‘๋‹ตํ•˜๊ณ  ํด๋ผ์ด์–ธํŠธ ์ธก์—์„œ ์‘๋‹ตํ•˜๊ธฐ ์œ„ํ•ด ๋Œ€์นญ์ ์œผ๋กœ ๋ชจ๋‘ ์ง€์›ํ•œ๋‹ค.


5.3.1. Server Responders

์„œ๋ฒ„ ์ธก์—์„œ ์–ด๋…ธํ…Œ์ด์…˜ ๋‹ฌ๋ฆฐ ์‘๋‹ต์ž(annotated responder)๋ฅผ ์‚ฌ์šฉํ•˜๋ ค๋ฉด RSocketMessageHandler๋ฅผ ์Šคํ”„๋ง ์„ค์ •์— ์ถ”๊ฐ€ํ•˜์—ฌ @Controller ๋นˆ๊ณผ @MessageMapping ๋ฐ @ConnectMapping ๋ฉ”์„œ๋“œ๋ฅผ ๊ฐ์ง€ํ•˜๋„๋ก ํ•œ๋‹ค.

Java:

@Configuration
static class ServerConfig {

    @Bean
    public RSocketMessageHandler rsocketMessageHandler() {
        RSocketMessageHandler handler = new RSocketMessageHandler();
        handler.routeMatcher(new PathPatternRouteMatcher());
        return handler;
    }
}

Kotlin:

@Configuration
class ServerConfig {

    @Bean
    fun rsocketMessageHandler() = RSocketMessageHandler().apply {
        routeMatcher = PathPatternRouteMatcher()
    }
}

๋‹ค์Œ์œผ๋กœ ์ž๋ฐ” RSocket API๋ฅผ ํ†ตํ•ด RSocket ์„œ๋ฒ„๋ฅผ ์‹œ์ž‘ํ•˜๊ณ  ๋‹ค์Œ๊ณผ ๊ฐ™์ด responder์— ๋Œ€ํ•œ RSocketMessageHandler๋ฅผ ์—ฐ๊ฒฐํ•œ๋‹ค:

Java:

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

CloseableChannel server =
    RSocketServer.create(handler.responder())
        .bind(TcpServerTransport.create("localhost", 7000))
        .block();

Kotlin:

import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val server = RSocketServer.create(handler.responder())
        .bind(TcpServerTransport.create("localhost", 7000))
        .awaitFirst()

RSocketMessageHandler๋Š” ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ composite์™€ routing์„ ๊ธฐ๋ณธ์ ์œผ๋กœ ์ง€์›ํ•œ๋‹ค. ๋‹ค๋ฅธ mime ํƒ€์ž…์œผ๋กœ ๋ฐ”๊พธ๊ฑฐ๋‚˜ ๋‹ค๋ฅธ ๋ฉ”ํƒ€ ๋ฐ์ดํ„ฐ mime ํƒ€์ž…์„ ๋“ฑ๋กํ•˜๋ ค๋ฉด MetadataExtractor์„ ์„ค์ •ํ•  ์ˆ˜ ์žˆ๋‹ค.

๋ฉ”ํƒ€ ๋ฐ์ดํ„ฐ์™€ ๋ฐ์ดํ„ฐ ํฌ๋งท์„ ์ง€์›ํ•˜๋Š”๋ฐ ํ•„์š”ํ•œ Encoder์™€ Decoder ์ธ์Šคํ„ด์Šค๋ฅผ ์„ค์ •ํ•ด์•ผ ํ•œ๋‹ค. ์ฝ”๋ฑ ๊ตฌํ˜„์„ ์œ„ํ•ด์„œ๋Š” spring-web ๋ชจ๋“ˆ์ด ํ•„์š”ํ•  ์ˆ˜ ์žˆ๋‹ค.

๊ธฐ๋ณธ์ ์œผ๋กœ SimpleRouteMatcher๋Š” AntPathMatcher๋ฅผ ํ†ตํ•ด ๋ผ์šฐํŒ… ๋งค์นญํ•˜๋Š”๋ฐ ์‚ฌ์šฉ๋œ๋‹ค. ํšจ๊ณผ์ ์ธ ๋ผ์šฐํŒ… ๋งค์นญ์„ ์œ„ํ•ด spring-web ๋ชจ๋“ˆ์˜ PathPatternRouteMatcher๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ์ข‹๋‹ค. RSocket ๋ผ์šฐํŒ…์€ ๊ณ„์ธต์ ์ผ ์ˆ˜ ์žˆ์ง€๋งŒ URL path๋Š” ์•„๋‹ˆ๋‹ค. ๋‘ ๋ผ์šฐํŒ… ๋งค์นญ์€ ๊ธฐ๋ณธ์ ์œผ๋กœ โ€œ.โ€๋ฅผ ๊ตฌ๋ถ„์ž๋กœ ์‚ฌ์šฉํ•˜๋ฉฐ HTTP URL๊ณผ ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ URL ๋””์ฝ”๋”ฉ์€ ์—†๋‹ค.

RSocketMessageHandler๋Š” RSocketStrategies๋ฅผ ํ†ตํ•ด ์„ค์ •ํ•  ์ˆ˜ ์žˆ์œผ๋ฉฐ, ๋™์ผํ•œ ํ”„๋กœ์„ธ์Šค์—์„œ ํด๋ผ์ด์–ธํŠธ์™€ ์„œ๋ฒ„ ์‚ฌ์ด์˜ ์„ค์ •์„ ๊ณต์œ ํ•ด์•ผํ•˜๋Š” ๊ฒฝ์šฐ ์œ ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค:

Java:

@Configuration
static class ServerConfig {

    @Bean
    public RSocketMessageHandler rsocketMessageHandler() {
        RSocketMessageHandler handler = new RSocketMessageHandler();
        handler.setRSocketStrategies(rsocketStrategies());
        return handler;
    }

    @Bean
    public RSocketStrategies rsocketStrategies() {
        return RSocketStrategies.builder()
            .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
            .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
            .routeMatcher(new PathPatternRouteMatcher())
            .build();
    }
}

Kotlin:

@Configuration
class ServerConfig {

    @Bean
    fun rsocketMessageHandler() = RSocketMessageHandler().apply {
        rSocketStrategies = rsocketStrategies()
    }

    @Bean
    fun rsocketStrategies() = RSocketStrategies.builder()
            .encoders { it.add(Jackson2CborEncoder()) }
            .decoders { it.add(Jackson2CborDecoder()) }
            .routeMatcher(PathPatternRouteMatcher())
            .build()
}


5.3.2. Client Responders

ํด๋ผ์ด์–ธํŠธ ์ธก์˜ ์–ด๋…ธํ…Œ์ด์…˜ ์‘๋‹ต์ž๋Š” RSocketRequester.Builder์—์„œ ์„ค์ •ํ•ด์•ผ ํ•œ๋‹ค. ์ž์„ธํ•œ ๋‚ด์šฉ์€ Client Responders์„ ์ฐธ์กฐํ•˜๋ผ.


5.3.3. @MessageMapping

์„œ๋ฒ„ ๋˜๋Š” ํด๋ผ์ด์–ธํŠธ responder ์„ค์ •์ด ์™„๋ฃŒ๋˜๋ฉด @MessageMapping ๋ฉ”์„œ๋“œ๋ฅผ ๋‹ค์Œ๊ณผ ๊ฐ™์ด ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค:

Java:

@Controller
public class RadarsController {

    @MessageMapping("locate.radars.within")
    public Flux<AirportLocation> radars(MapRequest request) {
        // ...
    }
}

Kotlin:

@Controller
class RadarsController {

    @MessageMapping("locate.radars.within")
    fun radars(request: MapRequest): Flow<AirportLocation> {
        // ...
    }
}

์œ„์˜ @MessageMapping ๋ฉ”์„œ๋“œ๋Š” โ€œlocate.radars.withinโ€ ๋ผ์šฐํŒ…์„ ๊ฐ€์ง„ Request-Stream ์ƒํ˜ธ์ž‘์šฉ์— ์‘๋‹ตํ•œ๋‹ค. ์ด ๋ฉ”์„œ๋“œ๋Š” ๋‹ค์Œ ๋ฉ”์„œ๋“œ ์ธ์ž๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ์œ ์—ฐํ•œ ๋ฉ”์„œ๋“œ ์‹œ๊ทธ๋‹ˆ์ฒ˜๋ฅผ ์ง€์›ํ•œ๋‹ค:

๋ฉ”์„œ๋“œ ์ธ์ž ์„ค๋ช…
@Payload ์š”์ฒญ์˜ ํŽ˜์ด๋กœ๋“œ. Mono ๋˜๋Š” Flux์™€ ๊ฐ™์ด ๋น„๋™๊ธฐ ํƒ€์ž…์˜ ๊ตฌ์ฒด์ ์ธ ๊ฐ’์ด ๋  ์ˆ˜ ์žˆ๋‹ค.

์ฐธ๊ณ : ์–ด๋…ธํ…Œ์ด์…˜ ์‚ฌ์šฉ์€ ์„ ํƒ์‚ฌํ•ญ์ด๋‹ค. ๋‹จ์ˆœ ํƒ€์ž…์ด ์•„๋‹ˆ๋ฉด์„œ ์ง€์›๋˜๋Š” ์ธ์ž๊ฐ€ ์•„๋‹Œ ๊ฒฝ์šฐ์—๋Š” ํŽ˜์ด๋กœ๋“œ๋กœ ๊ฐ„์ฃผ๋œ๋‹ค.
RSocketRequester ์›๊ฒฉ ์ข…๋ฃŒ ์š”์ฒญ์„ ๋ณด๋‚ธ ์š”์ฒญ์ž(requester)
@DestinationVariable ๋งคํ•‘ ํŒจํ„ด์˜ ๋ณ€์ˆ˜์— ๊ธฐ๋ฐ˜ํ•œ ๋ผ์šฐํŒ…์œผ๋กœ๋ถ€ํ„ฐ ์ถ”์ถœ๋œ ๊ฐ’. ์˜ˆ๋กœ, @MessageMapping("find.radar.{id}")
@Header ๋“ฑ๋ก๋œ ๋ฉ”ํƒ€ ๋ฐ์ดํ„ฐ๋ฅผ ์ถ”์ถœํ•œ ๊ฐ’. MetadataExtractor ์ฐธ๊ณ 
@Headers Map<String, Object> ๋“ฑ๋ก๋œ ๋ชจ๋“  ๋ฉ”ํƒ€ ๋ฐ์ดํ„ฐ๋ฅผ ์ถ”์ถœํ•œ ๊ฐ’. MetadataExtractor ์ฐธ๊ณ 

๋ฐ˜ํ™˜๊ฐ’์€ ์‘๋‹ต ํŽ˜์ด๋กœ๋“œ๋กœ ์ง๋ ฌํ™”๋  ํ•˜๋‚˜ ์ด์ƒ์˜ ๊ฐ์ฒด๋กœ ์˜ˆ์ƒ๋œ๋‹ค. ๋ฐ˜ํ™˜๊ฐ’์€ Mono ๋˜๋Š” Flux์™€ ๊ฐ™์€ ๋น„๋™๊ธฐ ํƒ€์ž…์ด๊ฑฐ๋‚˜, ๊ตฌ์ฒด์ ์ธ ๊ฐ’ ๋˜๋Š” void ๋˜๋Š” Mono<Void>์™€ ๊ฐ™์€ ๊ฐ’์ด ์—†๋Š”(no-value) ๋น„๋™๊ธฐ ํƒ€์ž…์ผ ์ˆ˜ ์žˆ๋‹ค.

@MessageMapping ๋ฉ”์„œ๋“œ๊ฐ€ ์ง€์›ํ•˜๋Š” RSocket ์ƒํ˜ธ์ž‘์šฉ ํƒ€์ž…์€ ์ž…๋ ฅ(์˜ˆ: @Payload ์ธ์ž) ๋ฐ ์ถœ๋ ฅ์˜ ์นด๋””๋„๋ฆฌํ‹ฐ๋กœ๋ถ€ํ„ฐ ๊ฒฐ์ •๋œ๋‹ค. ์—ฌ๊ธฐ์„œ ์นด๋””๋„๋ฆฌํ‹ฐ๋Š” ๋‹ค์Œ์„ ์˜๋ฏธํ•œ๋‹ค:

์นด๋””๋„๋ฆฌํ‹ฐ(Cardinality) ์„ค๋ช…
1 ๋ช…์‹œ์ ์ธ ๊ฐ’, ํ˜น์€ Mono<T>์™€ ๊ฐ™์€ ๋‹จ์ผ๊ฐ’(single-value) ๋น„๋™๊ธฐ ํƒ€์ž…
Many Flux<T>์™€ ๊ฐ™์€ ๋‹ค์ค‘๊ฐ’(multi-value) ๋น„๋™๊ธฐ ํƒ€์ž…
0 ์ž…๋ ฅ์—์„œ๋Š” ๋ฉ”์„œ๋“œ์— @Payload ์ธ์ž๊ฐ€ ์—†์Œ์„ ์˜๋ฏธํ•œ๋‹ค.

์ถœ๋ ฅ์˜ ๊ฒฝ์šฐ void ๋˜๋Š” Mono<Void>์™€ ๊ฐ™์€ ๊ฐ’์ด ์—†๋Š”(no-value) ๋น„๋™๊ธฐ ํƒ€์ž…

๋‹ค์Œ์€ ๋ชจ๋“  ์ž…๋ ฅ๊ณผ ์ถœ๋ ฅ ์นด๋””๋„๋ฆฌํ‹ฐ ์กฐํ•ฉ๊ณผ ๊ทธ์— ๋”ฐ๋ฅธ ์ƒํ˜ธ์ž‘์šฉ ํƒ€์ž… ์œ ํ˜•์ด๋‹ค:

์ž…๋ ฅ ์นด๋””๋„๋ฆฌํ‹ฐ
(Input Cardinality)
์ถœ๋ ฅ ์นด๋””๋„๋ฆฌํ‹ฐ
(Output Cardinality)
์ƒํ˜ธ๋™์ž‘ ์œ ํ˜•
(Interaction Types)
0, 1 0 Fire-and-Forget, Request-Response
0, 1 1 Request-Response
0, 1 Many Request-Stream
Many 0, 1, Many Request-Channel


5.3.4. @ConnectMapping

@ConnectMapping์€ RSocket ์ปค๋„ฅ์…˜ ์‹œ์ž‘์‹œ SETUP ํ”„๋ ˆ์ž„์„ ํ•ธ๋“ค๋งํ•œ๋‹ค. ๊ทธ๋ฆฌ๊ณ  METADATA_PUSH ํ”„๋ ˆ์ž„ (์˜ˆ: io.rsocket.RSocket์˜ metadataPush(payload))์œผ๋กœ ์ด์–ด์ง€๋Š” ๋ฉ”ํƒ€ ๋ฐ์ดํ„ฐ ํ‘ธ์‹œ ์•Œ๋ฆผ์„ ํ•ธ๋“ค๋งํ•œ๋‹ค.

@ConnectMapping ๋ฉ”์„œ๋“œ๋Š” @MessageMapping๊ณผ ๋™์ผํ•œ ์ธ์ž๋ฅผ ์ง€์›ํ•˜์ง€๋งŒ SETUP๊ณผ METADATA_PUSH ํ”„๋ ˆ์ž„์˜ ๋ฉ”ํƒ€ ๋ฐ์ดํ„ฐ์™€ ๋ฐ์ดํ„ฐ๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ํ•œ๋‹ค. @ConnectMapping์— ํŒจํ„ด์„ ์ง€์ •ํ•˜๋ฉด ํŠน์ • ๋ผ์šฐํŒ… ์ •๋ณด๊ฐ€ ์žˆ๋Š” ์ปค๋„ฅ์…˜๋งŒ ์ฒ˜๋ฆฌํ•œ๋‹ค. ์•„๋ฌด ํŒจํ„ด๋„ ์„ ์–ธ๋˜์ง€ ์•Š์€ ๊ฒฝ์šฐ๋ผ๋ฉด ๋ชจ๋“  ์ปค๋„ฅ์…˜์ด ๋งค์นญ๋œ๋‹ค.

@ConnectMapping ๋ฉ”์„œ๋“œ๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ˜ํ™˜ํ•  ์ˆ˜ ์—†์œผ๋ฉฐ void ๋˜๋Š” Mono<Void>๋ฅผ ๋ฐ˜ํ™˜ ํƒ€์ž…์œผ๋กœ ์„ ์–ธํ•ด์•ผ ํ•œ๋‹ค. ๋งŒ์ผ ์‹ ๊ทœ ์ปค๋„ฅ์…˜์— ๋Œ€ํ•ด ์˜ค๋ฅ˜๋ฅผ ๋ฐ˜ํ™˜ํ•˜๋ฉด ์ปค๋„ฅ์…˜์€ ๊ฑฐ์ ˆ๋œ๋‹ค. RSocketRequester์— ์ปค๋„ฅ์…˜์„ ์š”์ฒญํ•˜๊ธฐ ์œ„ํ•ด ์ฒ˜๋ฆฌ๋ฅผ ๋ณด๋ฅ˜ํ•ด์„œ๋Š” ์•ˆ ๋œ๋‹ค. ์ž์„ธํ•œ ๋‚ด์šฉ์€ Server Requester๋ฅผ ์ฐธ์กฐํ•˜๋ผ.


๋ชฉ์ฐจ ๊ฐ€์ด๋“œ