web/websocket

SimpleBroker로 알아보는 WebSocket Massage Broker

반응형

Simple Message Broker

simple broker는 클라이언트에서 전달 받은 subscription을 메모리에 저장하고 연결된 client에게 메시지를 보내는 역할을 한다.

하지만 문서에 보면 알수 있듯이 제공하는 일부의 기능만 있으며 간단한 메시지 전송 루프에 의존하며 클러스터링에 적합하지 않다. 이에 대한 대한으로 RabbitMQ와 같은 여러 message broker를 사용할 수 있다.

The simple broker is great for getting started but supports only a subset of STOMP commands (it does not support acks, receipts, and some other features), relies on a simple message-sending loop, and is not suitable for clustering. As an alternative, you can upgrade your applications to use a full-featured message broker.

 

stomp broker relay는 Spring MessageHandler로 메시지를 message broker에게 전달하는 역할을 한다. 그렇게 하기 위해 tcp 연결이 성립되고 나면 모든 메시지는 broker에게 전달하고 다시 broker에게 전달받은 메시지를  WebSocket session을 통해서 클라이언트에게 보낸다. 이 작업을 relay라고 하고 broker <-> messageHandler <-> clinet 양방향으로 진행된다.


 

 

Simple Message Broker enable

SimpleMessage broker를 사용하기 위해서는 아래와 같이 @Configuration과 @EnableWebSocketMessageBroker를 anntated하고 WebSocketMessageBrokerConfigurer를 상속한 클래스에서 configureMessageBroker를 overrride하여 MessageBrokerRegistry에 enableSimpleBroker를 등록하면서 simple broker를 사용하게 할 수 있다.

 

이때 등록하는 SimpleBrokerRegistration은 heartbeat interval time과 heartbeat를 체크할 scheduler를 등록할 수 있다.

package com.wedul.websocket.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/gs-guide-websocket").withSockJS();
    }
}

 

등록된 후 애플리케이션이 실행시키면 AbstractMessageBrokerConfiguration가 올라오면서 simpleBrokerMessageHandler를 bean으로 등록하는 과정에서 아까 만든 SimpleBrokerRegistration을 사용해서 설정한 heartbeats, scheduler등을 사용해서  SimpleBrokerMessageHandler를 생성해서 반환한다. 그럼 이 MessageHandler는 빈으로 등록이 된다. (MessageHandler에 역할은 위에 참조)


 

 

SimpleBrokerMessageHandler

실질적으로 Broker내부에서 동작하는 역할을 하는 MessageHandler의 동작방식을 SimpleBrokerMessageHandler의 코드를 보면서 파악해보자.

 

 

1. HeartBeat Task Scheduler 등록

heartBeat 값은 log array 형태로 값을 받는데 첫번재는 서버쪽 두번째는 클라이언트쪽을 의미한다. 만약 heatbeatValue가 null이거나 두 가지 값 모두 0일경우 해당 스케줄러는 실행되지 않는다.

 

그리고 initHeartbeatTaskDelay를 보면 알 수 있듯이 두가지 값중에 작은 값이 interval로 사용된다.

이 interval은 주기적으로 schedule을 돌면서 세션들에 마지막 read, write 타임을 체크하면서 각 세션 클라이언트 커넥션 시 Header에 포함되어 왔던 heartbeats의 타임아웃 주기랑 비교하면서 설정된 주기이상으로 요청이 없었을 시 타임아웃 시키고 그게 아니라면 pong응답을 보낸다. (client도 주기적으로 connection 연결을 위해 Ping을 보낸다. client에서 ping을 보내는걸 없애면 실제 데이터가 주고 받고 하지 않는 이상 task scheduler에 의해 끊어지게 된다.)

 

@Override
public void startInternal() {
	publishBrokerAvailableEvent();
	if (this.taskScheduler != null) {
		long interval = initHeartbeatTaskDelay();
		if (interval > 0) {
			this.heartbeatFuture = this.taskScheduler.scheduleWithFixedDelay(new HeartbeatTask(), interval);
		}
	}
	else {
		Assert.isTrue(getHeartbeatValue() == null ||
				(getHeartbeatValue()[0] == 0 && getHeartbeatValue()[1] == 0),
				"Heartbeat values configured but no TaskScheduler provided");
	}
}

private long initHeartbeatTaskDelay() {
	if (getHeartbeatValue() == null) {
		return 0;
	}
	else if (getHeartbeatValue()[0] > 0 && getHeartbeatValue()[1] > 0) {
		return Math.min(getHeartbeatValue()[0], getHeartbeatValue()[1]);
	}
	else {
		return (getHeartbeatValue()[0] > 0 ? getHeartbeatValue()[0] : getHeartbeatValue()[1]);
	}
}

@Override
public void stopInternal() {
	publishBrokerUnavailableEvent();
	if (this.heartbeatFuture != null) {
		this.heartbeatFuture.cancel(true);
	}
}

interval 이후에 실행되는 task는 마지막 요청 시간과 현재 시간과의 interval이 heartbeat로 설정한 Interval 보다 클 경우 세션을 종료시키는 작업을 진행한다.

 

private class HeartbeatTask implements Runnable {

	@Override
	public void run() {
		long now = System.currentTimeMillis();
		for (SessionInfo info : sessions.values()) {
			if (info.getReadInterval() > 0 && (now - info.getLastReadTime()) > info.getReadInterval()) {
				handleDisconnect(info.getSessionId(), info.getUser(), null);
			}
			if (info.getWriteInterval() > 0 && (now - info.getLastWriteTime()) > info.getWriteInterval()) {
				SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.HEARTBEAT);
				accessor.setSessionId(info.getSessionId());
				Principal user = info.getUser();
				if (user != null) {
					accessor.setUser(user);
				}
				initHeaders(accessor);
				accessor.setLeaveMutable(true);
				MessageHeaders headers = accessor.getMessageHeaders();
				info.getClientOutboundChannel().send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers));
			}
		}
	}
}

 

 

2. handleMessageInternal

client로 부터 들어온 메시지를 broker에서 처리하는 메소드이다.

@Override
protected void handleMessageInternal(Message<?> message) {
	// 1
	MessageHeaders headers = message.getHeaders();
	String destination = SimpMessageHeaderAccessor.getDestination(headers);
	String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);

	updateSessionReadTime(sessionId);

	// 2
	if (!checkDestinationPrefix(destination)) {
		return;
	}

	// 3
	SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
	if (SimpMessageType.MESSAGE.equals(messageType)) {
		logMessage(message);
		sendMessageToSubscribers(destination, message);
	}
	else if (SimpMessageType.CONNECT.equals(messageType)) {
		logMessage(message);
		if (sessionId != null) {
			long[] heartbeatIn = SimpMessageHeaderAccessor.getHeartbeat(headers);
			long[] heartbeatOut = getHeartbeatValue();
			Principal user = SimpMessageHeaderAccessor.getUser(headers);
			MessageChannel outChannel = getClientOutboundChannelForSession(sessionId);
			this.sessions.put(sessionId, new SessionInfo(sessionId, user, outChannel, heartbeatIn, heartbeatOut));
			SimpMessageHeaderAccessor connectAck = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
			initHeaders(connectAck);
			connectAck.setSessionId(sessionId);
			if (user != null) {
				connectAck.setUser(user);
			}
			connectAck.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, message);
			connectAck.setHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER, heartbeatOut);
			Message<byte[]> messageOut = MessageBuilder.createMessage(EMPTY_PAYLOAD, connectAck.getMessageHeaders());
			getClientOutboundChannel().send(messageOut);
		}
	}
	else if (SimpMessageType.DISCONNECT.equals(messageType)) {
		logMessage(message);
		if (sessionId != null) {
			Principal user = SimpMessageHeaderAccessor.getUser(headers);
			handleDisconnect(sessionId, user, message);
		}
	}
	else if (SimpMessageType.SUBSCRIBE.equals(messageType)) {
		logMessage(message);
		this.subscriptionRegistry.registerSubscription(message);
	}
	else if (SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
		logMessage(message);
		this.subscriptionRegistry.unregisterSubscription(message);
	}
}

 

1. 

- 들어온 message에 header를 추출하고 destination과 sessionId를 추출하고 sessionId의 readTime을 갱신한다. (heartBeat에서 사용)

- WebSocketMessageBrokerConfigurer에서 선언했던 setApplicationDestinationPrefixes에서 선언한 prefix가 있는지 확인한다. 이때, user destination(/user/**)이거나 prefix가 없으면 false를 반환하고 선언한 prefix를 달고 들어온 destination이면 true를 반환한다.

protected boolean checkDestinationPrefix(@Nullable String destination) {
	if (destination == null) {
		return true;
	}
	if (CollectionUtils.isEmpty(this.destinationPrefixes)) {
		return !isUserDestination(destination);
	}
	for (String prefix : this.destinationPrefixes) {
		if (destination.startsWith(prefix)) {
			return true;
		}
	}
	return false;
}

 

 

2.

header에서 message Type을 추출하는데 이 메시지 type은 STOMP simple message 프로토콜에서 사용하는 종류이다.

1) MESSAGE

- 메시지일 경우 해당 내용을 로깅하고 client에게 메시지를 보내는 sendMessageToSubscribers를 호출한다.

 

2) CONNECT

- 들어온 요청을 로깅하고 sessionId가 있을 경우 내부적으로 session을 관리하는 Map에 세션을 저장하고 client에 ACK 메시지를 전달한다.

 

3) DISCONNECT

- 들어온 요청을 로깅하고 sessionId를 찾아서 sessionMap에서 제거 하고 클라이언트에 Disconnect 메시지를 클라이언트에 전달한다.

 

4) SUBSCRIBE

- 들어온 요청을 로깅하고 sessionId, subscriptionId, destination정보를 이용하여 세션의 subscription 정보를 저장한다.

- session하나당 destination 별 여러 subscription으로 구성되어 있다. 

@Override
public final void registerSubscription(Message<?> message) {
	MessageHeaders headers = message.getHeaders();

	SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
	if (!SimpMessageType.SUBSCRIBE.equals(messageType)) {
		throw new IllegalArgumentException("Expected SUBSCRIBE: " + message);
	}

	String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
	if (sessionId == null) {
		if (logger.isErrorEnabled()) {
			logger.error("No sessionId in  " + message);
		}
		return;
	}

	String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);
	if (subscriptionId == null) {
		if (logger.isErrorEnabled()) {
			logger.error("No subscriptionId in " + message);
		}
		return;
	}

	String destination = SimpMessageHeaderAccessor.getDestination(headers);
	if (destination == null) {
		if (logger.isErrorEnabled()) {
			logger.error("No destination in " + message);
		}
		return;
	}

	addSubscriptionInternal(sessionId, subscriptionId, destination, message);
}

@Override
protected void addSubscriptionInternal(
		String sessionId, String subscriptionId, String destination, Message<?> message) {

	boolean isPattern = this.pathMatcher.isPattern(destination);
	Expression expression = getSelectorExpression(message.getHeaders());
	Subscription subscription = new Subscription(subscriptionId, destination, isPattern, expression);

	this.sessionRegistry.addSubscription(sessionId, subscription);
	this.destinationCache.updateAfterNewSubscription(sessionId, subscription);
}

public void addSubscription(String sessionId, Subscription subscription) {
	SessionInfo info = this.sessions.computeIfAbsent(sessionId, _sessionId -> new SessionInfo());
	info.addSubscription(subscription);
}

 

5) UNSUBSCRIBE

- 4번의 SUBSCRIBE와 반대로 sessionId, subscriptionId, destination 정보를 이용해서 세션에서 subscription을 제거한다. 

 

 

 

3. sendMessageToSubscribers

들어온 MESSAGE에서 sessionId와 path(destination)로 subscriptionIds를 찾고 모든 subscription에 메시지를 전달한다.

protected void sendMessageToSubscribers(@Nullable String destination, Message<?> message) {
	MultiValueMap<String,String> subscriptions = this.subscriptionRegistry.findSubscriptions(message);
	if (!subscriptions.isEmpty() && logger.isDebugEnabled()) {
		logger.debug("Broadcasting to " + subscriptions.size() + " sessions.");
	}
	long now = System.currentTimeMillis();
	subscriptions.forEach((sessionId, subscriptionIds) -> {
		for (String subscriptionId : subscriptionIds) {
			SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
			initHeaders(headerAccessor);
			headerAccessor.setSessionId(sessionId);
			headerAccessor.setSubscriptionId(subscriptionId);
			headerAccessor.copyHeadersIfAbsent(message.getHeaders());
			headerAccessor.setLeaveMutable(true);
			Object payload = message.getPayload();
			Message<?> reply = MessageBuilder.createMessage(payload, headerAccessor.getMessageHeaders());
			SessionInfo info = this.sessions.get(sessionId);
			if (info != null) {
				try {
					info.getClientOutboundChannel().send(reply);
				}
				catch (Throwable ex) {
					if (logger.isErrorEnabled()) {
						logger.error("Failed to send " + message, ex);
					}
				}
				finally {
					info.setLastWriteTime(now);
				}
			}
		}
	});
}

 

 

반응형