3.2. ์น์์ผ API(WebSocket API)
์คํ๋ง ํ๋ ์์ํฌ๋ ์น์์ผ ๋ฉ์์ง๋ฅผ ํธ๋ค๋งํ๋ ํด๋ผ์ด์ธํธ์ ์๋ฒ ์ฌ์ด๋ ์ ํ๋ฆฌ์ผ์ด์ ์ ์์ฑํ๋๋ฐ ์ฌ์ฉํ ์ ์๋ ์น์์ผ API๋ฅผ ์ ๊ณตํ๋ค.
3.2.1. ์๋ฒ(Server)
์น์์ผ ์๋ฒ๋ฅผ ์์ฑํ๋ ค๋ฉด, ๋จผ์ WebSocketHandler๋ฅผ ์์ฑํด์ผ ํ๋ค. ๋ค์ ์์ ๋ ์ด๋ฅผ ์์ฑํ๋ ๋ฐฉ๋ฒ์ด๋ค:
Java:
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
public class MyWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
// ...
}
}
Kotlin:
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
class MyWebSocketHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
// ...
}
}
๊ทธ ๋ค์์๋ ํธ๋ค๋ฌ๋ฅผ URL์ ๋งตํํ๊ณ WebSocketHandlerAdapter๋ฅผ ์ถ๊ฐํ๋ค. ๋ค์ ์์ ๋ฅผ ์ฐธ์กฐํ๋ผ:
Java:
@Configuration
class WebConfig {
@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/path", new MyWebSocketHandler());
int order = -1; // before annotated controllers
return new SimpleUrlHandlerMapping(map, order);
}
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
Kotlin:
@Configuration
class WebConfig {
@Bean
fun handlerMapping(): HandlerMapping {
val map = mapOf("/path" to MyWebSocketHandler())
val order = -1 // before annotated controllers
return SimpleUrlHandlerMapping(map, order)
}
@Bean
fun handlerAdapter() = WebSocketHandlerAdapter()
}
3.2.2. WebSocketHandler
WebSocketHandler์ handle ๋ฉ์๋๋ WebSocketSession์ ๋ฐ์์ ์ธ์
์ฒ๋ฆฌ๊ฐ ์๋ฃ๋ ๊ฒ์ ๋ํ๋ด๊ธฐ ์ํด Mono<Void>๋ฅผ
๋ฐํํ๋ค. ์ธ์
์ ๋ ๊ฐ์ ์คํธ๋ฆผ์ ํตํด ์ฒ๋ฆฌ๋๋๋ฐ, ๊ฐ๊ฐ ์ธ๋ฐ์ด๋ ๋ฉ์์ง์ ์์๋ฐ์ด๋ ๋ฉ์์ง๋ฅผ ์ํ ๊ฒ์ด๋ค. ๋ค์ ํ๋ ์คํธ๋ฆผ์ ์ฒ๋ฆฌํ๋ ๋ ๊ฐ์ง
๋ฉ์๋์ ๋ํด์ ์ค๋ช
ํ๋ค:
WebSocketSession ๋ฉ์๋ |
์ค๋ช |
|---|---|
Flux<WebSocketMessage> receive() |
์ธ๋ฐ์ด๋ ๋ฉ์์ง ์คํธ๋ฆผ์ ์ ๊ทผํ๊ณ ์ปค๋ฅ์ ์ด ๋ซํ ๋ ์๋ฃ๋๋ค. |
Mono<Void> send(Publisher<WebSocketMessage>) |
์ ์กํ ๋ฉ์์ง๋ฅผ ๋ฐ์ ๋ฉ์์ง๋ฅผ ์์ฑํ๊ณ , ์์ค๊ฐ ์๋ฃ๋๊ณ ๋ฉ์์ง ์์ฑ์ด ๋๋ ๋, Mono<Void>๋ฅผ ๋ฐํํ๋ค. |
WebSocketHandler๋ ์ธ๋ฐ์ด๋์ ์์ด๋ฐ์ด๋ ์คํธ๋ฆผ์ ํ๋๋ก ํตํฉํ ํ๋ก์ฐ๋ก ๊ตฌ์ฑํ๊ณ , ์ด ํ๋ก์ฐ์ ์๋ฃ๋ฅผ ๋ํ๋ด๋ Mono<Void>๋ฅผ
๋ฆฌํดํด์ผ ํ๋ค. ์ ํ๋ฆฌ์ผ์ด์
์๊ตฌ ์ฌํญ์ ๋ฐ๋ผ์ ํตํฉ๋ ํ๋ก์ฐ๋ ๋ค์๊ณผ ๊ฐ์ ์ํฉ์์ ์๋ฃ๋๋ค:
- ์ธ๋ฐ์ด๋ ๋๋ ์์๋ฐ์ด๋ ๋ฉ์์ง ์คํธ๋ฆผ์ด ์๋ฃ๋์ ๋
- ์ธ๋ฐ์ด๋ ์คํธ๋ฆผ์ด ์๋ฃ๋๊ณ (์ฆ, ์ปค๋ฅ์ ์ด ๋ซํ์ ๋), ์์๋ฐ์ด๋ ์คํธ๋ฆผ์ด ๋ฌดํ ์คํธ๋ฆผ์ผ ๋
- ์ ํ๋ ์์ ์์,
WebSocketSession์close๋ฉ์๋๋ฅผ ํตํด์
์ธ๋ฐ์ด๋์ ์์๋ฐ์ธ๋ ๋ฉ์์ง ์คํธ๋ฆผ์ด ํจ๊ป ๊ตฌ์ฑ๋ ๊ฒฝ์ฐ, ์ปค๋ฅ์ ์ด ์ด๋ ค์๋์ง ํใ ๋ํ ํ์๊ฐ ์๋ค. ๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ ์๊ทธ๋์ด ํ๋์ ์ข ๋ฃํ๊ธฐ ๋๋ฌธ์ด๋ค. ์ธ๋ฐ์ด๋ ์คํธ๋ฆผ์ ์๋ฃ ๋๋ ์ค๋ฅ ์ ํธ๋ฅผ ์์ ํ๊ณ ์์๋ฐ์ด๋ ์คํธ๋ฆผ์ ์ทจ์ ์ ํธ๋ฅผ ์์ ํ๋ค.
ํธ๋ค๋ฌ์ ๊ฐ์ฅ ๊ธฐ๋ณธ์ ์ธ ๊ตฌํ์ฒด๋ ์ธ๋ฐ์ด๋ ์คํธ๋ฆผ์ ์ฒ๋ฆฌํ๋ ๊ฒ์ด๋ค. ๋ค์ ์์ ๋ ์ด๋ฅผ ๋ณด์ฌ์ค๋ค:
Java:
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.receive() (1)
.doOnNext(message -> {
// ... (2)
})
.concatMap(message -> {
// ... (3)
})
.then(); (4)
}
}
Kotlin:
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
return session.receive() (1)
.doOnNext {
// ... (2)
}
.concatMap {
// ... (3)
}
.then() (4)
}
}
(1) ์ธ๋ฐ์ด๋ ๋ฉ์์ง ์คํธ๋ฆผ์ ์ ๊ทผํ๋ค.
(2) ๊ฐ ๋ฉ์์ง์ ๋ํ ์ฒ๋ฆฌ๋ฅผ ์ํํ๋ค.
(3) ๋ฉ์์ง ์ฝํ ์ธ ๋ฅผ ์ฌ์ฉํ๋ ์ค์ฒฉ๋ ๋น๋๊ธฐ ์์ ์ ์ํํ๋ค.
(4) ์์ ์ด ์๋ฃ๋๋ฉดMono<Void>๋ฅผ ๋ฐํํ๋ค.
์ค์ฒฉ๋ ๋น๋๊ธฐ ์์ ์ ๊ฒฝ์ฐ, ํ๋ง๋ ๋ฐ์ดํฐ ๋ฒํผ(pooled data buffers)๋ฅผ ์ฌ์ฉํ๋ ์๋ฒ(์๋ฅผ ๋ค๋ฉด, Netty)์์
message.retain()์ ํธ์ถํด์ผ ํ ์ ์๋ค. ๊ทธ๋ ์ง ์์ผ๋ฉด ๋ฐ์ดํฐ๋ฅผ ์ฝ๊ธฐ ์ ์ ๋ฒํผ๊ฐ ๋น์์ง ์ ์๋ค. ์์ธํ ๋ด์ฉ์ ๋ฐ์ดํฐ ๋ฒํผ์ ์ฝ๋ฑ(Data Buffers and Codecs)์ ์ฐธ๊ณ ํ๋ผ.
๋ค์์ ์ธ๋ฐ์ด๋์ ์์๋ฐ์ด๋ ์คํธ๋ฆผ์ ๊ฒฐํฉํ๋ค:
Java:
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> output = session.receive() (1)
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.map(value -> session.textMessage("Echo " + value)); (2)
return session.send(output); (3)
}
}
Kotlin:
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val output = session.receive() (1)
.doOnNext {
// ...
}
.concatMap {
// ...
}
.map { session.textMessage("Echo $it") } (2)
return session.send(output) (3)
}
}
(1) ์ธ๋ฐ์ด๋ ๋ฉ์์ง ์คํธ๋ฆผ์ ์ฒ๋ฆฌํ๋ค.
(2) ์์๋ฐ์ด๋ ๋ฉ์์ง๋ฅผ ์์ฑํ๊ณ ๊ฒฐํฉ๋ ํ๋ก์ฐ๋ฅผ ๋ง๋ ๋ค.
(3) ์์ ํ๋ ๋์์๋ ์ฒ๋ฆฌ๊ฐ ์๋ฃํ์ง ์๋Mono<Void>๋ฅผ ๋ฐํํ๋ค.
์ธ๋ฐ์ด๋์ ์์๋ฐ์ด๋ ์คํธ๋ฆผ์ ๋ ๋ฆฝ์ ์ผ ์ ์์ผ๋ฉฐ, ์๋ฃ์์๋ง ๊ฒฐํฉ๋ ์ ์๋ค. ๋ค์์ ๊ทธ ์์ ๋ค:
Java:
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Mono<Void> input = session.receive() (1)
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.then();
Flux<String> source = ... ;
Mono<Void> output = session.send(source.map(session::textMessage)); (2)
return Mono.zip(input, output).then(); (3)
}
}
Kotlin:
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive()
.doOnNext {
// ...
}
.concatMap {
// ...
}
.then()
val source: Flux<String> = ...
val output = session.send(source.map(session::textMessage))
return Mono.zip(input, output).then()
}
}
(1) ์ธ๋ฐ์ด๋ ๋ฉ์์ง ์คํธ๋ฆผ์ ์ฒ๋ฆฌํ๋ค.
(2) ๋ฐ์ ๋ฉ์์ง๋ฅผ ์ ์กํ๋ค.
(3) ์คํธ๋ฆผ์ ๊ฒฐํฉํ๊ณ ๋ ์คํธ๋ฆผ์ด ๋ชจ๋ ๋๋๋ฉด ์ข ๋ฃํ๋Mono<Void>๋ฅผ ๋ฐํํ๋ค.
3.2.3. DataBuffer
DataBuffer๋ ์นํ๋ญ์ค์ ๋ฐ์ดํธ ๋ฒํผ๋ค. ๊ด๋ จํด์๋ ์คํ๋ง ์ฝ์ด ๋ ํผ๋ฐ์ค์
๋ฐ์ดํฐ ๋ฒํผ์ ์ฝ๋ฑ(Data Buffers and Codecs) ์น์
์์ ๋ ์์ธํ ์ค๋ช
ํ๋ค.
์ค์ํ ์ ์ ๋คํฐ(Netty)์ ๊ฐ์ ์ผ๋ถ ์๋ฒ์์๋ ๋ฐ์ดํธ ๋ฒํผ๋ฅผ ๋ฉ๋ชจ๋ฆฌ ํ์ ์ฌ์ฉํ์ฌ ์ฒ๋ฆฌํ๊ณ ์ฐธ์กฐ๋ฅผ ์นด์ดํธํ๊ธฐ ๋๋ฌธ์ ๋ฉ๋ชจ๋ฆฌ ๋์๋ฅผ
ํผํ๋ ค๋ฉด ์๋น(consume)ํ ๋ค์์๋ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ํด์ ํด์ผ ํ๋ค๋ ๊ฒ์ด๋ค.
๋คํฐ์์ ์ ํ๋ฆฌ์ผ์ด์
์ ์คํํ๋ ๊ฒฝ์ฐ, ์
๋ ฅ ๋ฐ์ดํฐ ๋ฒํผ๊ฐ ํด์ ๋์ง ์๊ณ ์ ์งํด์ผ ํ๋ค๋ฉด DataBufferUtils.retain(dataBuffer)๋ฅผ
์ฌ์ฉํ๊ณ , ๋ฒํผ์ ์๋ ๋ฐ์ดํฐ๋ฅผ ์๋นํ๋ค๋ฉด DataBufferUtils.release(dataBuffer)๋ฅผ ํธ์ถํด์ผ ํ๋ค.
3.2.4. Handshake
WebSocketHandlerAdapter๋ WebSocketService์ ์ฒ๋ฆฌ๋ฅผ ์์ํ๋ค. ๊ธฐ๋ณธ ๊ตฌํ์ฒด HandshakeWebSocketService๋
์น์์ผ ์์ฒญ์ ๋ํ ๊ธฐ๋ณธ์ ์ธ ๊ฒ์ฌ๋ฅผ ํ ๋ค์์ ๊ตฌ๋์ค์ธ ์๋ฒ์ ๋ํด์ RequestUpgradeStrategy๋ฅผ ์ฌ์ฉํ๋ค.
ํ์ฌ ๋ฆฌ์กํฐ ๋คํฐ(Reactor Netty), ํฐ์บฃ(Tomcat), ์ ํฐ(Jetty) ๊ทธ๋ฆฌ๊ณ ์ธ๋ํ ์ฐ(Undertow)๋ฅผ ๊ธฐ๋ณธ์ ์ผ๋ก ์ง์ํ๋ค.
HandshakeWebSocketService๋ sessionAttributePredicate ์์ฑ์ ๊ฐ์ง๊ณ ์์ผ๋ฉฐ, Predicate<String>๋ฅผ ์ค์ ํ์ฌ
WebSession์ผ๋ก๋ถํฐ ์์ฑ์ ์ถ์ถํ๊ณ WebSocketSession์ ์์ฑ์ผ๋ก ์
๋ ฅํ๋ค.
3.2.5. ์๋ฒ ์ค์ (Server Configuration)
๊ฐ ์๋ฒ์ RequestUpgradeStrategy ๊ตฌํ์ฒด๋ฅผ ์ด์ฉํ์ฌ ์น์์ผ ์์ง ๊ด๋ จํ ์น์์ผ ๊ด๋ จ ์ค์ ์ ํ ์ ์๋ค. ๋ค์ ์์ ๋ ํฐ์บฃ์์ ์คํ๋๋
์น์์ผ ์ต์
์ ์ค์ ํ๋ค:
Java:
@Configuration
class WebConfig {
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter(webSocketService());
}
@Bean
public WebSocketService webSocketService() {
TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
strategy.setMaxSessionIdleTimeout(0L);
return new HandshakeWebSocketService(strategy);
}
}
Kotlin:
@Configuration
class WebConfig {
@Bean
fun handlerAdapter() =
WebSocketHandlerAdapter(webSocketService())
@Bean
fun webSocketService(): WebSocketService {
val strategy = TomcatRequestUpgradeStrategy().apply {
setMaxSessionIdleTimeout(0L)
}
return HandshakeWebSocketService(strategy)
}
}
์ฌ์ฉ ๊ฐ๋ฅํ ์ต์ ์ ํ์ธํ๋ ค๋ฉด ์๋ฒ์ ์ ๊ทธ๋ ์ด๋ ์ ๋ต(upgrade strategy)์ ํ์ธํ๋ผ. ํ์ฌ ํฐ์บฃ๊ณผ ์ ํฐ๋ง์ด ์ด ์ต์ ์ ์ ๊ณตํ๋ค.
3.2.6. CORS
CORS๋ฅผ ์ค์ ํ๊ณ ์น์์ผ ์๋ํฌ์ธํธ๋ก์ ์ ๊ทผ์ ์ ํํ๋ ๊ฐ์ฅ ๊ฐ๋จํ ๋ฐฉ๋ฒ์ WebSocketHandler๊ฐ CorsConfigurationSource๋ฅผ
๊ตฌํํ์ฌ ํ์ฉํ origin, ํค๋ ๊ทธ๋ฆฌ๊ณ ๋ค๋ฅธ ์์ธ ์ค์ ๋ฑ์ ๊ฐ์ง CorsConfiguration์ ๋ฐํํ๋ ๊ฒ์ด๋ค. ๋ง์ผ ์ด๋ ๊ฒ ํ ์ ์๋ค๋ฉด,
SimpleUrlHandler์ corsConfigurations ์์ฑ์ URL ํจํด ๋ณ๋ก CORS ์ค์ ์ ๋ฃ์ ์ ์๋ค. ๋ง์ผ ๋ ๋ฐฉ๋ฒ ๋ชจ๋ ์ฌ์ฉํ๋ค๋ฉด,
CorsConfiguration์ comine ๋ฉ์๋์์ ๋ ์ค์ ์ ๊ฒฐํฉ๋๋ค.
3.2.7. ํด๋ผ์ด์ธํธ(Client)
์คํ๋ง ์นํ๋ญ์ค๋ ๋ฆฌ์กํฐ ๋คํฐ(Reactor Netty), ํฐ์บฃ(Tomcat), ์ ํฐ(Jetty), ์ธ๋ํ ์ฐ(Undertow) ๊ทธ๋ฆฌ๊ณ ํ์ค ์๋ฐ(JSR-356)์
๋ํ ๊ตฌํ์ฒด๋ก WebSocketClient ์ธํฐํ์ด์ค๋ฅผ ์ ๊ณตํ๋ค.
์น์์ผ ์ธ์
์ ์์ํ๊ธฐ ์ํด ํด๋ผ์ด์ธํธ์ ์ธ์คํด์ค๋ฅผ ์์ฑํ๊ณ ํด๋น execute ๋ฉ์๋๋ฅผ ์ฌ์ฉํ๋ค.
Java:
WebSocketClient client = new ReactorNettyWebSocketClient();
URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
session.receive()
.doOnNext(System.out::println)
.then());
Kotlin:
val client = ReactorNettyWebSocketClient()
val url = URI("ws://localhost:8080/path")
client.execute(url) { session ->
session.receive()
.doOnNext(::println)
.then()
}
์ ํฐ(Jetty)์ ๊ฐ์ LifeCycle ์ธํฐํ์ด์ค๋ฅผ ๊ตฌํํ๋ ์ผ๋ถ ํด๋ผ์ด์ธํธ๋ ์ฌ์ฉํ๊ธฐ ์ ์ ์ค์งํ๊ณ ์์ํด์ผ ํ๋ค. ๋ชจ๋ ํด๋ผ์ด์ธํธ๋ ๊ธฐ๋ณธ์ ์ผ๋ก
๊ธฐ๋ณธ ์น์์ผ ํด๋ผ์ด์ธํธ์ ์ค์ ๊ณผ ๊ด๋ จ๋ ์์ฑ์ ์ต์
์ด ์๋ค.
๋ชฉ์ฐจ ๊ฐ์ด๋