Spring webSocket with stomp 기본 개념 정리
web/websocket

Spring webSocket with stomp 기본 개념 정리

반응형

 

웹 소켓 관련하여 정리가 필요하여 spring document 내용을 발췌 하여 필요한 부분만 정리했다.

공유용 보다는 개인적인 정리용으로 크게 도움이 되지 않을 수 있다.

출처 : https://docs.spring.io/spring-framework/docs/5.2.6.RELEASE/spring-framework-reference/web.html#websocket

 

웹 소켓 소개

웹소켓 프로토콜은 웹 애플리케이션을 위한 새로운 기능으로써 클라이언트 양방향 통신의 오랜 역사를 가지고 있다. HTTP와는 다른 TCP 프로토콜이지만 HTTP에서 동작가능하게 디자인 되었고 80, 443 포트를 사용하며 방화벽규칙을 재사용할 수 있도록 되어있다.

 

일반 HTTP 요청에 Upgrade 헤더를 포함한 reqeust를 전송하면 WebSocket protocol로 변환되며 WebSocket interaction이 시작된다.

[요청]

GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket             ---- 1
Connection: Upgrade            ---- 2
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080

1. Upgrade 헤더

2. Upgrade connection 사용

위와 같은 요청을 받은 웹 소켓을 지원하는 서버는 일반적으로 200 상태 코드 대신에 아래와 같은 응답을 지원한다.

 

[응답]

HTTP/1.1 101 Switching Protocols     ---- 1
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp

1. 프로토콜 스위치

 

http upgrade 요청을 받아 성공적인 핸드쉐이크가 성공한 이후에 tcp 소켓은 클라이언트와 계속 열어두고 메시지를 주고 받는다.

만약 websocket이 웹서버(nginx)에서 실행될 경우 websocket upgrade request를 웹 소켓 서버로 전달할 수 있도록 설정해줘야한다. 마찬가지로 만약에 어플리케이션이 클라우드 환경에서 동작할 때 클라우드가 웹소켓을 지원하는지 여부를 확인해야한다.

 

 

 

Http와 대비되는 WebSocket

웹소켓이 비록 Http와 비슷하게 설게되었고 Http request로 시작했을지라도 두개의 시스템은 서로 다른 아키텍쳐를 가지고 있고 다른 애플리케이션 프로그래밍 모델을 따르고 있다.

 

Http 그리고 Rest에서 애플리케이션은 많은 url을 가지고 있다. 애플리케이션과 클라이언트가 통신하기 위해서는 url에 요청을 보내야하고 request-response 형태를 띄고 있다. 서버는 Http Url, 메서드 및 헤더를 기반으로 요청을 적절한 handler에 라우팅 시킨다.

 

반대로 웹소켓은 초기 연결을 위한 하나의 연결만 사용한다. 그후에 모든 애플리케이션 메시지는 같은 tcp connection을 가지고 주고 받게 된다. 이것은 완전히 다른 비동기식 이벤트 기반 메시징 아키텍처이다.

 

웹소켓은 또한 http와 다르게 메시지 내용의 규정을 정의하지 않는 low level 전송 프로토콜입니다. 그래서 클라이언트와 서버가 미리 정의한 메시지 규약없이는 메시지가 라우팅 되거나 처리되지 못한다.

 

웹소켓 클라이언트와 서버는 Http 핸드쉐이크 요청에서 Sec-WebSocket-Protocol을 사용해서 STOMP와 같은 높은 레벨의 메시징 프로토콜을 사용할 수 있다. 이런 방법이 아니라면 별도의 다른 규약이 필요하다.

 

 

 

SockJS Fallback

WebSocket을 먼저 사용하고 여러 인터넷 등의 상황으로 upgrade 헤더 요청을 보내지 못할 경우등에 사용할 fallback으로 Http 기반의 기술로 WebSocket interaction을 에뮬레이트하고 같은 애플리케이션 api를 노출하는 방식

 

Servlet 스택에서 Spring Framework는 SockJS 프로토콜에 대한 서버(및 클라이언트) 지원을 모두 제공한다.

주요 목적은 서버가 웹소켓 사용이 불가능할 경우에 사용하기 위함

 

 

 

웹소켓에 대한 기본 설정

import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myHandler(), "/myHandler")
	        // interceptor
            .addInterceptors(new HttpSessionHandshakeInterceptor())
            // 허용 도메인
            .setAllowedOrigins("https://mydomain.com");
            // fallback
            .withSockJS();
    }

    @Bean
    public WebSocketHandler myHandler() {
        return new MyHandler();
    }

	// websocket 관련 설정
	@Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxTextMessageBufferSize(8192);
        container.setMaxBinaryMessageBufferSize(8192);
        return container;
    }

}

 

 

HartBeats

SockJs 프로토콜은 서버에 heartbeats 메시지를 보내서 서버가 hang이 걸리는것을 방지한다. Spring SockJS에는 hartbeatTime이라는 속성이 있는데 이 속성을 이용해서 빈도수를 custom 할 수 있다. 기본적으로 heartbeats의 기본은 어떠한 메시지가 연결에 보내지지 않았다는 가정하에 25초이다. 

 

WebSocket 그리고 SockJs를 사용하여 STOMP를 사용할 때 만약 STOMP 클라이언트와 서버의 heartbeats 협약이 변경될 경우 SockJS heartbeats는 무시된다.

 

또한 Spring SockJS는 TaskScheduler를통해 heartbeats task를 스케줄화 할 수 있다. task 스케줄러는 thread pool에 의해 관리되며 기본적으로 가용가능한 프로세서의 수만큼 동작한다. 필요에 의해서 커스터마이징이 가능하다.

 

또한 Spring SockJS는 기본값 25초의 heartbeats를 서버에 보내서 서버의 연결없는 상태를 heartbeats주기나 데이터 송수신 시 알 수 있다.

 

 

Stomp (스트리밍 텍스트 지향 메시지 프로토콜)

[주요 특징]

  • @Controller → @MessageMapping으로 연결한 후 브로커에다가 보내는데 브로커는 메모리도 가능하고 RabbitMQ, ActiveMq등도 사용이 가능하다.
  • spring은 브로커에 대한 tcp 연결을 유지하고 연결된 websocket client에게 메시지를 전달한다.
  • client는 메시지를 받고 또 메시지를 수신한다.
  • client에서 메시지를 보내면 @MessageMapping에서 받아서 처리한다.
  • 메시지를 받을 endpoint는 /endpoint/..., /endpoint/** 등을 지원한다.
  • 서버의 모든 메시지는 특정 클라이언트 구독에 대한 응답이어야 하며 서버 메시지의 subscription-id 헤더는 클라이언트 구독의 id 헤더와 동일해야한다.

 

[장점]

  • raw websocket보다 더 많은 프로그래밍 모델을 지원
  • 여러 브로커(카프카, 등등)을 사용가능
  • spring framework를 사용하면 사용가능
  • 메시지 포맷을 정할 필요가 없다.
  • 애플리케이션 로직은 여러 @Controller 인스턴스로 구성될 수 있으며 주어진 연결에 대해 단일 WebSocketHandler를 사용하여 원시 WebSocket 메시지를 처리하는 대신 STOMP 대상 헤더를 기반으로 메시지를 라우팅할 수 있습니다.

 

[기본 설정]

import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

@Configuration
@EnableWebSocketMessageBroker -- 1
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS();  -- 2
    }

		// stotmp
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes("/app"); -- 3
        config.enableSimpleBroker("/topic", "/queue"); -- 4
    }
}
  1. @EnableWebSocketMessageBroker를 통해 메시지 플로우를 모으기 위해 컴포넌트를 구성합니다.
  2. handshake endpoint
  3. 목적 headerrk /app으로 시작되는 stomp message는 @Controller클래스 내부에 @MessageMapping 메소드로 라우팅 된다. 결국 여기서도 broker로 메시지를 전달
  4. /topic, /queue로 시작하는 메시지를 브로커로 라우팅

 

[built-in simple message broker flow chart]

- 메모리 베이스 simple broker 사용했을 때 차트

# simple broker는 클라이언트에서 전달 받은 subscription을 메모리에 저장하고 연결된 client에게 메시지를 보내는 역할을 한다.

  • clientInboundChannel: 웹소켓 클라이언트로 받은 데이터를 받기 위해서 사용하는 채널
  • clientOutboundChannel: 클라이언트에게 메시지를 보내기 위한 채널
  • brokerChannel: 어플리케이션에서 브로커에게 메시지를 전송하기 위해 사용하는 채널

 

[external broker flow chart]

- RabbitMQ와 같은 외부 broker 사용 시 차트

두 개 다이어그램의 차이는 tcp를 통해 외부 stomp 브로커 메시지를 전달하고 브로커에게 구독된 클라이언트로 메시지를 전달하기 위해 'broker-relay'를 사용한다는 것이다.

 

메시지를 클라이언트에게 전달받으면 stomp frame으로 디코딩 하고 spring Message형태로 변환한 후 clientInboundChannel로 다음 프로세스를 진행하기 위해 전달한다.

 

예를들어 STOMP 메시지 /app으로 시작하는 destination header를 가지고 들어오는 요청은 Controller 애노테이션 되어있는 클래스 내부에 @MessageMapping메소드가 할당곳으로 매핑되고 반면에 /topic, /queue의 요청을 가지고 들어오게 되면 바로 message broker에게 전달 된다.

 

클라이언트에게 전달받은 stomp 메시지를 다루는 @Controller 애노테이션은 메시지를 broker channel을 통해서 메시지 브로커에게 메시지를 전달하고 브로커는 clientOutboundChannel을 통해 메시지를 매칭되는 subscriber에게 값을 전달한다.

 

동일한 컨트롤러 내에서 http 요청을 받을 수 있기 때문에 요청을 받고 브로커에게 메시지 응답을 보내서 가입된 클라이언트에 브로드 캐스트 할 수 있다.

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app");
        registry.enableSimpleBroker("/topic")
	        .setTaskScheduler(taskScheduler())
            // 안쓰면 그냥 10s 씩 (inbound, outbound)
            .setHeartbeatValue(new long[] {3000L, 3000L});
    }
    
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.initialize();
        return taskScheduler;
    }
}

@Controller
public class GreetingController {

    @MessageMapping("/greeting") {
    public String handle(String greeting) {
        return "[" + getTimestamp() + ": " + greeting;
    }
}
  1. localhost:8080/portfolio를 클라이언트에서 연동하여 client와 websocket이 연결되면 stomp frame은 그 커넥션 위에서 동작하게 된다.
  2. 클라이언트가 /topic/greeting header 목적지에 SUBSCRIBE frame을 전송한다. 수신되고 decode된 메시지는 clientInboundChannel에 전송되고 메시지 브로커에게 route되고 client subscription에 저장된다.
  3. client가 SEND frame을 /app/greeting에 보낸다. /app prefix는 controller애노테이션된곳에 요청을 라우트 하고 /app prefix없이 @MessageMapping에 /greeting이 걸려있는 곳에 메시지 요청이 매핑된다.
  4. Gretting Controller로 부터 반환된 값은 반환값과 /topic/greeting의 기본 대상 헤더를 기반으로 하는 페이로드가 포함된 spring message로 전환된다.
  5. 메시지 브로커는 매칭된 모든 subscriber에게 MESSAGE frame을 clientOutboundChannel을 통해서 전송하고 STOMP frame에 인코딩 된 상태로 client에게 전송된다.

 

 

 

Annotated Controllers

클라이언트로 부터 받은 메시지를 다루기 위해 classes에 @Controller 애노테이션을 달수 있다. 각각의 클래스들은 @MessageMapping, @SubscribeMapping, @ExceptionHandler 메소드를 선언할 수 있다.

 

@MessageMapping

  • 목적지를 기반으로 메시지를 라우팅하는 메소드에 annotaion을 달 수 있다. 메소드와 type level에서 annotation 할 수 있다.
  • 여러 /app/**, @DestinationVariable /app/{id} 등을 사용할 수 있다.
  • 여러 Method arguement 등을 사용할 수 있다.
  • 반환 타입
    • 일반적으로 MessageConverter를 사용하여 serialized된 응답 형태가 반환되고 Messagesms brokerChannel에 전달된다. outbound 메시지는 바로 브로커에게 전달하는 /topic에 경우에는 동일한 형태로 값을 전달 받는다.
    • @SendTo, @SendToUser 애노테이션을 사용해서 output message에 목적지를 커스터마이징 할 수 있다. @SendTo는 목적지를 커스터마이징 하기위해서 사용되거나 여러 목적지에 값을 전달하기 위해 사용된다. @SendToUser는 inputMessage를 보낸 user에게만 응답을 보내기 위해서 사용된다.
    • @SendTo, @SendToUser를 같이 사용할 수 있고 클래스 레벨에서도 사용할 수 있으며 클래스 레벨에 있던 애노테이션은 메소드 레벨에서 재정의 될 수 있다.
    • Message는 비동기로 다뤄지고 @MessageMapping은 ListenableFuture, CompletableFuture, CompletionStage를 반환한다.
    • 사실 @SendTo와 @SendToUser는 SimpleMessagingTemplate를 사용하는것과 다르지 않다.

 

@SubscribeMapping

  • @MessageMapping과 비슷하지만 구독 메시지를 받는데 한정된 매핑만을 지원한다. 지원하는 argument는 @MessageMapping과 동일하지만 기본적으로 메시지는 다이렉트로 client에게 전송되며 broker에게 전송하지 않는다.
  • /topic, /queue가 브로커에 매핑되어있고 application 컨토롤러에 /app이 매핑되어 있다고 가정해보자. 브로커는 반복적으로 브로드 캐스트하는 두개 엔드포인트로 오는 subscription을 저장하고 애플리케이션은 작업을 따로 할게 없다. 클라이언트는 또한 /app 목적지를 구독할 수 있으며 컨트롤러에서 broker에 저장하거나 구독을 다시 사용하는 등에 작업 없이 응답을 바로 보낼 수 있다. ui 초기 데이터 등에 사용될 때 유용하다.
  • 어떤 이유로 구독을 포함하여 메시지를 둘 다 독립적으로 처리하지 않으려면 브로커와 컨트롤러를 동일한 대상 접두사에 매핑하지 말아야한다. 들어온 메시지는 병렬로 들어온다. broker와 controller에 들어오는 메시지의 순서가 보장되지 않기 때문에 만약 subscription이 준비가 되어 brodcast되었다는 노티가 필요할 경우 클라이언트는 서버가 준비되었는지 물어볼 수 있다.

 

@MessageExceptionHandler

  • @MessageMapping에서 발생한 에러를 핸들링 하기 위해서 사용된다. Exception을 파라미터로써 사용할 수 있다.
  • @Controller
    public class MyController {
    
        // ...
    
        @MessageExceptionHandler
        public ApplicationError handleException(MyException exception) {
            // ...
            return appError;
        }
    }
  • @MessageExceptionHandler 메소드는 flexible한 메소드 시그니쳐를 지원하고 같은 메소드 파라미터 타입과 반환 타입을 지원한다.
  • 일반적으로 @MessageExceptionHandler 메소드는 @Controller 클래스내부에서 동작한다. 만약 전역적으로 적용하기 위해서는 Spring Mvc와 비슷하게 @ControllerAdvice를 적용할 수 있다.

 

 

메시지 순서

  • setPreservePublishOrder를 통해 순서대로 메시지를 받을 수 있다.

 

 

 

User Destination

  • 애플리케이션은 specfic user에게 메시지를 전송할 수 있고 Spring stomp는 prefix /user를 통해서 지원한다. 예를 들어 클라이언트가 /user/queue/position-updates를 subscribe 하였을 때 이 destination은 UserDestinationMessageHandler를 통해 다뤄지고 특별한 유저 세션으로 destination이 변경된다. (ex. /queue/position-updates-user-123) 이를 통해서 고유의 요청을 전달할 수 있어 각 유저간의 요청의 충돌을 막을 수 있다.
  • 보내는쪽에서는 /user/{username}/queue/position-updates와 같이 보내면 된다. 이를 통해 응용프로그램 내의 모든 구성 요소는 이름과 일반 대상외에는 알 필요 없이 특정 사용자를 대상으로 하는 메시지를 서비스 할 수 있다. 또한 annotation과 messaging 템플릿을 통해 사용할 수 있다.
  • message handling 메소드에서 @SendToUser를 통해서 specific한 유저에게 응답을 보낼 수 있다.
  • @Controller
    public class PortfolioController {
    
        @MessageMapping("/trade")
        @SendToUser("/queue/position-updates")
        public TradeResult executeTrade(Trade trade, Principal principal) {
            // ...
            return tradeResult;
        }
    }
  • 만약 기본값으로 유저가 하나이상의 세션을 가지게 되었을 때, 모든 세션이 응답을 받게 되는데 brodcast option을 false로 지정하면 오직 하나의 세션에만 응답이 가게 할 수 있다.
  • @Controller
    public class MyController {
    
        @MessageMapping("/action")
        public void handleAction() throws Exception{
            // raise MyBusinessException here
        }
    
        @MessageExceptionHandler
        @SendToUser(destinations="/queue/errors", broadcast=false)
        public ApplicationError handleException(MyBusinessException exception) {
            // ...
            return appError;
        }
    }

 

 

 

 

Perfermance

  • 메시징 어플리케이션에서 메시지는 쓰레드풀 내에서 비동기로 처리된다.
  • clientInboundChannel, clientOutboundChannel thread pool이 필요하고 기본적으로 가용한 프로세서의 두배가 기본설정이다.
  • 만약에 어노테이션이 달리 메소드에서 주로 메시지 처리가 될경우에는 clientInboundChannel 쓰레드 수는 프로세서 수에 가깝게 유지해야하고 이 작업들이 IO에 더많이 의존하고 데이터베이스나 외부 시스템에서 차단하거나 대기해야하는 경우에는 쓰레드풀을 늘려야한다.
  • 기본적으로 쓰레드 풀에는 3가지 주요 요점이 있는데 core thread pool, max thread pool, capacity for quque이다. 우선 core는 10, max는 20 으로 되어있고 queue capacity는 Integer.MAX_VALUE로 되어있어서 스레드 풀은 core thread pool 이상 넘어갈 수 없도 왜냐면 queue capacity가 커서 queue안에 요청이 들어가기 때문이다.
  • clientOutboundChannel 입장에서봤을 때 client로 보내는 전송 속도가 빠르면 프로세서와 동일한 수준으로 thread pool을 맞추면 되고 그게 아니라 느리다고 한다면 thread pool 사이즈를 느려야한다.
  • clientOutboundChannel에 sendTimeLimit, sendBufferSizeLimit를 두어서 전송 데이터와 크기등을 제한할 수 있다.
  • @Configuration
    @EnableWebSocketMessageBroker
    public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    
        @Override
        public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
    				// 시간과 보내는 버퍼 사이즈 제한
            registration.setSendTimeLimit(15 * 1000).setSendBufferSizeLimit(512 * 1024);
    				// 전송받고 보낼 메시지 사이즈 제한
    				registration.setMessageSizeLimit(128 * 1024);
        }
    
        // ...
    
    }
반응형