[Web on Reactive Stack] 5. RSocket: 5.2. RSocketRequester

한글로 번역한 Web on Reactive Stack, 5. RSocket: 5.2. RSocketRequester

#spring #reactive #rsocket


5.2. RSocketRequester

RSocketRequester는 RSocket 요청을 처리하는 능숙한 API 제공하여, 하위 수준 데이터 버퍼 대신에 데이터와 메타 데이터에 대한 객체를 받고 반환한다. 클라이언트 요청과 서버 요청 둘다 작성하기 위해 RSocketRequest를 사용할 수 있다.


5.2.1. Client Requester

클라이언트 측에서 RSocketRequester를 얻으려면 서버에 커넥션을 요청하고 RSocket SETUP 프레임을 준비하여 전송해야 한다. RSocketRequester는 이를 위한 빌더를 제공한다. 내부적으로 io.rsocket.core.RSocketConnector를 기반으로 한다.

아래는 디폴트 설정으로 커넥션을 맺는 가장 기본적인 방법이다:

Java:

Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
    .connectTcp("localhost", 7000);

Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
    .connectWebSocket(URI.create("https://example.org:8080/rsocket"));

Kotlin:

import org.springframework.messaging.rsocket.connectTcpAndAwait
import org.springframework.messaging.rsocket.connectWebSocketAndAwait

val requester = RSocketRequester.builder()
        .connectTcpAndAwait("localhost", 7000)

val requester = RSocketRequester.builder()
        .connectWebSocketAndAwait(URI.create("https://example.org:8080/rsocket"))

위 예제 코드는 바로 커넥션을 맺지 않고 연기한다. 실제로 커넥션을 맺고 requester를 사용하려면 다음을 수행하라:

Java:

// Connect asynchronously
RSocketRequester.builder().connectTcp("localhost", 7000)
    .subscribe(requester -> {
        // ...
    });

// Or block
RSocketRequester requester = RSocketRequester.builder()
    .connectTcp("localhost", 7000)
    .block(Duration.ofSeconds(5));

Kotlin:

// Connect asynchronously
import org.springframework.messaging.rsocket.connectTcpAndAwait

class MyService {

    private var requester: RSocketRequester? = null

    private suspend fun requester() = requester ?:
        RSocketRequester.builder().connectTcpAndAwait("localhost", 7000).also { requester = it }

    suspend fun doSomething() = requester().route(...)
}

// Or block
import org.springframework.messaging.rsocket.connectTcpAndAwait

class MyService {

    private val requester = runBlocking {
        RSocketRequester.builder().connectTcpAndAwait("localhost", 7000)
    }

    suspend fun doSomething() = requester.route(...)
}


커넥션 설정(Connection Setup)

RSocketRequester.Builder는 초기 SETUP 프레임을 커스텀하기 위해 다음을 제공한다:

  • dataMimeType(MimeType) - 커넥션을 통하는 데이터의 mime 타입 설정
  • metadataMimeType(MimeType) - 커넥션을 통하는 메타 데이터의 mime 타입 설정
  • setupData(Object) - SETUP 프레임에 포함할 데이터 설정
  • setupRoute(String, Object...) - SETUP 프레임에 포함될 메타 데이터를 라우팅
  • setupMetadata(Object, MimeType) - SETUP 프레임에 포함될 다른 메타데이터

데이터의 기본 mime 타입은 처음 설정된 Decoder로 결정된다. 메타 데이터의 경우 기본 mime 타입은 요청 당 메타 데이터와 mime 타입을 여러개 사용할 수 있는 composite metadata다. 일반적으로 둘 다 변경할 필요는 없다.

SETUP 프레임의 데이터와 메타 데이터는 선택사항이다. 서버 측에서 @ConnectionMapping 메서드를 사용하여 커넥션의 시작과 SETUP 프레임의 컨텐츠를 처리한다. 메타 데이터는 커넥션 레벨 보안에 사용될 수 있다.


전략(Strategies)

RSocketRequster.BuilderRSocketStrategies를 받아 requester를 설정한다. 이를 사용하여 데이터와 메타 데이터 값을 (역)직렬화할 인코더와 디코더를 제공한다. 기본적으로 spring-core에 있는 String, byte[], ByteBuffer에 대한 코덱만 기본값으로 등록된다. spring-web 모듈을 사용하여 다음과 같은 코덱을 추가 등록할 수 있다:

Java:

RSocketStrategies strategies = RSocketStrategies.builder()
    .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
    .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
    .build();

Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
    .rsocketStrategies(strategies)
    .connectTcp("localhost", 7000);

Kotlin:

import org.springframework.messaging.rsocket.connectTcpAndAwait

val strategies = RSocketStrategies.builder()
        .encoders { it.add(Jackson2CborEncoder()) }
        .decoders { it.add(Jackson2CborDecoder()) }
        .build()

val requester = RSocketRequester.builder()
        .rsocketStrategies(strategies)
        .connectTcpAndAwait("localhost", 7000)

RSocketStrategies는 재사용이 고려되어 만들어졌다. 일부 시나리오에서는, 예를 들면 클라이언트와 서버가 같은 애플리케이션에 동작할 때, RSocketStrategies를 스프링 설정에 선언하는 것이 더 나을 수 있다.


클라이언트 응답자(Client Responders)

RSocketRequester.Builder를 사용하여 서버의 요청에 대한 responder를 구성할 수 있다. 클라이언트 측의 응답에 어노테이션 핸들러를 사용할 수 있다. 서버에서 사용되는 것과 동일한 인프라를 기반으로 하지만, 다음과 같은 프로그래밍 방식으로 등록한다:

Java:

RSocketStrategies strategies = RSocketStrategies.builder()
    .routeMatcher(new PathPatternRouteMatcher())  (1)
    .build();

SocketAcceptor responder =
    RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
    .rsocketConnector(connector -> connector.acceptor(responder)) (3)
    .connectTcp("localhost", 7000);

Kotlin:

import org.springframework.messaging.rsocket.connectTcpAndAwait

val strategies = RSocketStrategies.builder()
        .routeMatcher(PathPatternRouteMatcher())  (1)
        .build()

val responder =
    RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

val requester = RSocketRequester.builder()
        .rsocketConnector { it.acceptor(responder) } (3)
        .connectTcpAndAwait("localhost", 7000)

(1) spring-web 모듈이 존재하는 경우, 효과적인 라우팅 매칭을 위하여 PathPatternRouteMatcher를 사용한다.
(2) @MessageMapping 또는 @ConnectMapping 메서드를 포함하는 responder를 만든다.
(3) responder를 등록한다.

위의 내용은 클라이언트 responder를 프로그래밍 방식으로 등록하는 간단한 예제다. responder가 스프링 설정에 있는 다른 시나리오의 경우에는 다음과 같이 RSocketMessageHandler를 스프링 빈으로 선언한 후에 등록하면 된다:

Java:

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
    .rsocketConnector(connector -> connector.acceptor(handler.responder()))
    .connectTcp("localhost", 7000);

Kotlin:

import org.springframework.beans.factory.getBean
import org.springframework.messaging.rsocket.connectTcpAndAwait

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val requester = RSocketRequester.builder()
        .rsocketConnector { it.acceptor(handler.responder()) }
        .connectTcpAndAwait("localhost", 7000)

위의 경우 RSocketMessageHandlersetHandlerPredicate를 사용하여 클라이언트 responder를 감지하기 위한 다른 전략으로 전환해야 할 수도 있다. @Controller과 같은 기본 어노테이션 대신 @RSocketClientResponder와 같은 커스텀 어노테이션 사용을 예로 들 수 있다. 클라이언트와 서버 또는 여러 클라이언트가 같은 애플리케이션에서 동작하는 시나리오에서 이런 전략 전환이 필요하다.

프로그래밍 모델에 대한 자세한 내용은 Annotated Responders를 참조하라.


고급(Advanced)

RSocketRequesterBuilder는 keepalive 간격, 세션 제개, 인터셉터 등을 위한 추가 설정 옵션을 위해 io.rsocket.core.RSocketConnector 콜백을 제공한다. 다음과 같이 해당 레벨에서 옵션을 설정할 수 있다:

Java:

Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
    .rsocketConnector(connector -> {
        // ...
    })
    .connectTcp("localhost", 7000);

Kotlin:

import org.springframework.messaging.rsocket.connectTcpAndAwait

val requester = RSocketRequester.builder()
        .rsocketConnector {
            //...
        }.connectTcpAndAwait("localhost", 7000)


5.2.2. Server Requester

서버에서 연결된 클라이언트로 요청하는 것은 서버에서 연결된 클라이언트에 대한 requester를 얻는 일이다.

어노테이션 응답자 (Annotated Responders)에서는 @ConnectMapping@MessageMapping 메서드는 RSocketRequester를 인자로 받는다. 이를 사용하여 커넥션을 맺은 requester에 접근한다. @ConnectMapping 메서드는 요청 시작 전에 반드시 처리해야하는 SETUP 프레임의 기본적인 핸들러이다. 따라서 요청은 시작할 때 핸들링과 분리되어야 한다.

Java:

@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
    requester.route("status").data("5")
        .retrieveFlux(StatusReport.class)
        .subscribe(bar -> { (1)
            // ...
        });
    return ... (2)
}

(1) 처리와 무관하게 독립적으로 비동기 요청을 시작한다.
(2) 처리를 완료하고 Mono<Void>를 반환한다.

Kotlin:

@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
    GlobalScope.launch {
        requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
            // ...
        }
    }
    /// ... (2)
}

(1) 처리와 무관하게 독립적으로 비동기 요청을 시작한다.
(2) suspend 함수에서 처리한다.


5.2.3. Requests

클라이언트 (client) 또는 서버(server) 요청자를 가지면, 다음과 같이 요청할 수 있다:

Java:

ViewBox viewBox = ... ;

Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
        .data(viewBox) (2)
        .retrieveFlux(AirportLocation.class); (3)

Kotlin:

val viewBox: ViewBox = ...

val locations = requester.route("locate.radars.within") (1)
        .data(viewBox) (2)
        .retrieveFlow<AirportLocation>() (3)

(1) 요청 메시지의 메타 데이터에 라우팅 정보를 지정한다.
(2) 요청 메시지에 대한 데이터를 제공한다.
(3) 예상되는 응답을 선언한다.

상호작용 타입은 입력과 출력의 카디널리티로부터 암묵적으로 결정된다. 위 예제는 하나의 값이 전송되고 하나의 스트림 값을 수신하기 때문에 Request-Stream 이다. 사용하는 입력 및 출력 선택이 RSocket 인터랙션 타입과 rsponder가 예상하는 입력 및 출력 타입과 일치하다면 이를 고려할 필요는 없다. 유효하지 않은 조합의 유일한 예는 다대일(many-to-one)인 경우다.

data(Object) 메서드는 리액티브 스트림 Publisher를 받을 수 있다. Publisher는 FluxMono를 포함, ReactiveAdapterRegistry에 등록된 다른 producer도 포함한다. 동일한 타입의 값을 생성하는 Flux와 같은 다중값(multi-value) Publisher에 대해서는, 오버로드한 data 메서드 중 하나를 사용하여 타입 체크와 모든 요소에 대한 Encoder 검색을 방지를 고려하라.

data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);

data(Object) 단계는 선택적이다. 데이터를 보내지 않는 요청인 경우 생략하자:

Java:

Mono<AirportLocation> location = requester.route("find.radar.EWR"))
    .retrieveMono(AirportLocation.class);

Kotlin:

import org.springframework.messaging.rsocket.retrieveAndAwait

val location = requester.route("find.radar.EWR")
    .retrieveAndAwait<AirportLocation>()

composite metadata(기본값)를 사용하고 등록한 Encoder가 지원하는 값인 경우, 메타 데이터를 추가할 수 있다. 예를 들면 다음과 같다:

Java:

String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");

Flux<AirportLocation> locations = requester.route("locate.radars.within")
        .metadata(securityToken, mimeType)
        .data(viewBox)
        .retrieveFlux(AirportLocation.class);

Kotlin:

import org.springframework.messaging.rsocket.retrieveFlow

val requester: RSocketRequester = ...

val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")

val locations = requester.route("locate.radars.within")
        .metadata(securityToken, mimeType)
        .data(viewBox)
        .retrieveFlow<AirportLocation>()

Fire-and-Forget의 경우 Mono<Void>를 반환하는 send() 메서드를 사용한다. Mono는 메시지가 성공적으로 전송되었다는 뜻일 뿐, 처리되지 않았음을 나타낸다.


목차 가이드