Redis pub/sub 기능을 이용한 여러 서버 메시지 관리
web/websocket

Redis pub/sub 기능을 이용한 여러 서버 메시지 관리

반응형

지금까지 SimpleMessageBroker, Stomp등 기본적인 것들에 대해 많이 공부를 했다. 그럼 실제 사례를 가정해보고 코드를 만들어 보자.

 

사례

고객이 주문을 하고 배달 시작을 누르면 기본시간을 받고 변경된 시간을 주기적으로 업데이트 받으며 같은 id로 로그인한 다른 디바이스에서도 subscribe시 같은 배달 예상시간을 안내받을 수 있도록 하고 배달이 종료되면 alert을 내려주는 코드를 만들어보자.

1. client에서 stomp서버로 connect를 한다.

2. user별 메시지를 받을 수 있는 /user/topic/data, 모든 broadcast message를 받을 수 있는 /topic/message를 subscribe한다.

3. 초기에 connect(주문시작)되면 redis에 저장된 initial data를 전달한다. 만약 주문 시작 이후에 사용자가 다른 디바이스로 접속 시에는 initial data를 현재 진행중인 주문의 시간을 전달받는다.

4. worker가 없어서 terminal로 redis의 channel topic에 user별 변경 된 배달 예상시간 정보를 전송한다.

5, 6. redis에 channel topic을 subscribe하는 listener에서 전달받은 user의 업데이트 시간을 session에 전달한다. 이때 user별로 변경된 시간을 각자 받을 수 있기 때문에 server에 연결된 user의 session 모두에 값이 전달된다.

7. 배달이 완료된 사용자에게 알람 메시지를 전달한다.


 

 

설정

WebSocketBrokerConfig

우선 websocket 사용을 위한 WebSocketMessageBrokerConfigurer를 구현해야한다. 우선 stomp websocket의 경우에 handshake도중에 custom header를 사용할 수 있는 방법이 없기 때문에 ChannelInterceptor를 통해서 유저 정보를 인증해야한다. 나는 여기서 configureClientInboundChannel 에서 CONNECT를 맽는 순간에 nativeHeader user정보를 뽑아서 Principal을 세션에 지정해줬다. 나머지 설정 enableSimpleBroker, setApplicationDestinationPrefixes, setUserDestinationPrefix등은 이전에 공부했던 부분과 동일하게 설정했다.

package com.wedul.websocket.config;

import com.wedul.websocket.handler.CustomHandShakeHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

import java.util.ArrayList;
import java.util.List;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        //
        config.enableSimpleBroker("/topic", "/direct")
            .setTaskScheduler(taskScheduler())
            // 안쓰면 그냥 10s 씩 (inbound, outbound)
            .setHeartbeatValue(new long[] {3000L, 3000L});
        // client -> server로 보내는 endpoint의 기본 prefix
        config.setApplicationDestinationPrefixes("/app");
        // specific user endpoint prefix (default /user)
        config.setUserDestinationPrefix("/user");
    }

    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.initialize();
        return taskScheduler;
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/gs-guide-websocket")
                .setHandshakeHandler(new CustomHandShakeHandler())
                .withSockJS();
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(new ChannelInterceptor() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                StompHeaderAccessor accessor =
                        MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);

                if (StompCommand.CONNECT.equals(accessor.getCommand())) {
                    String user = accessor.getFirstNativeHeader("user");
                    if (user != null) {
                        List<GrantedAuthority> authorities = new ArrayList<>();
                        authorities.add(new SimpleGrantedAuthority("ROLE_USER"));
                        Authentication auth = new UsernamePasswordAuthenticationToken(user, user, authorities);
                        SecurityContextHolder.getContext().setAuthentication(auth);
                        accessor.setUser(auth);
                    }
                }

                return message;
            }
        });
    }

}

 

RedisConfiguration

yml에 있는 정보를 이용해서 스프링 기본 redis Lattuce를 사용해서 RedisConnectionFactory를 만들어주고 channel topic을 듣는 리스너를 등록해준다.

package com.wedul.websocket.config;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;

@Getter
@Setter
@ConfigurationProperties("spring.redis")
public class RedisProperties {

    private String host;
    private int port;
}
package com.wedul.websocket.config;

import com.wedul.websocket.handler.RedisSubscriber;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;

@Configuration
@EnableConfigurationProperties(RedisProperties.class)
public class RedisConfiguration {

    private static final String TOPIC_NAME = "channel";

    @Bean
    public RedisConnectionFactory redisConnectionFactory(RedisProperties redisProperties) {
        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
        redisStandaloneConfiguration.setHostName(redisProperties.getHost());
        redisStandaloneConfiguration.setPort(redisProperties.getPort());
        return new LettuceConnectionFactory(redisStandaloneConfiguration);
    }

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, RedisSubscriber redisSubscriber) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        container.addMessageListener(redisSubscriber, createTopic());
        return container;
    }

    @Bean
    public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        return redisTemplate;
    }

    private Topic createTopic() {
        return new ChannelTopic(TOPIC_NAME);
    }

}

 

메시지

DTO

client, stomp server, redis와 주고 받을 message로 사용할 DTO 

package com.wedul.websocket.dto;

import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;

@Getter
@NoArgsConstructor
public class SubscribeDto {

    private SubscribeType type;
    private String name;
    private int time;
    private String message;

    @Builder
    public SubscribeDto(SubscribeType type, String name, int time, String message) {
        this.type = type;
        this.name = name;
        this.time = time;
        this.message = message;
    }
}

SubscribeType에 따라서 BROAD_CAST 메시지인지 specific user에게 보내는 메시지인지 정해진다.

package com.wedul.websocket.dto;

import lombok.Getter;

@Getter
public enum SubscribeType {
    BROAD_CAST,
    USER;
}

 

 

handler

WebSocketSubscribeHandler를 사용해서 subscribe가 발생했을 때 event listen을 할 수 있도록 하는 코드를 추가했다. 그 중 사용자 배달 예상시간을 subscribe하는 destination(/user/topic/data)에 대해 subscribe가 발생되었을 때 사용자별 배달 예상시간을 저장하고 있는 redis에서 해당 사용자의 배달 예상시간을 꺼내서 전달해주도록 했다. (없을 경우 20분으로 지정) 

/*
 * Copyright 2014-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.wedul.websocket.config;

import com.wedul.websocket.handler.WebSocketSubscribeHandler;
import com.wedul.websocket.handler.WebSocketUnSubscribeHandler;
import com.wedul.websocket.repository.UserDeliveryTimeService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.SimpMessagingTemplate;

import javax.websocket.Session;

/**
 * These handlers are separated from WebSocketConfig because they are specific to this
 * application and do not demonstrate a typical Spring Session setup.
 *
 * @author Rob Winch
 */
@Configuration
public class WebSocketHandlersConfig<S extends Session> {

	@Bean
	public WebSocketSubscribeHandler<S> webSocketSubscribeHandler(UserDeliveryTimeService userDeliveryTimeService, SimpMessagingTemplate messageTemplate) {
		return new WebSocketSubscribeHandler<>(messageTemplate, userDeliveryTimeService);
	}

	@Bean
	public WebSocketUnSubscribeHandler<S> webSocketUnSubscribeHandler(UserDeliveryTimeService userDeliveryTimeService) {
		return new WebSocketUnSubscribeHandler<>(userDeliveryTimeService);
	}

}
/*
 * Copyright 2014-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.wedul.websocket.handler;

import com.wedul.websocket.dto.SubscribeDto;
import com.wedul.websocket.dto.SubscribeType;
import com.wedul.websocket.repository.UserDeliveryTimeService;
import org.springframework.context.ApplicationListener;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.web.socket.messaging.SessionSubscribeEvent;

import java.security.Principal;
import java.util.List;
import java.util.Map;

public class WebSocketSubscribeHandler<S> implements ApplicationListener<SessionSubscribeEvent> {

	private SimpMessagingTemplate messageTemplate;
	private UserDeliveryTimeService userDeliveryTimeService;
	private static final String TARGET = "/topic/data";

	public WebSocketSubscribeHandler() {
		super();
	}

	public WebSocketSubscribeHandler(SimpMessagingTemplate messageTemplate, UserDeliveryTimeService userDeliveryTimeService) {
		this.userDeliveryTimeService = userDeliveryTimeService;
		this.messageTemplate = messageTemplate;
	}

	@Override
	public void onApplicationEvent(SessionSubscribeEvent event) {
		String source = String.valueOf(((List) ((Map) event.getMessage().getHeaders().get("nativeHeaders")).get("destination")).get(0));

		if (source.contains(TARGET)) {
			Principal user = event.getUser();
			if (user == null || user.getName() == null) {
				return;
			}
			messageTemplate.convertAndSendToUser(user.getName(), TARGET, SubscribeDto.builder()
					.type(SubscribeType.USER)
					.time(userDeliveryTimeService.getTime(user.getName(), 20))
					.build()
			);
		}
	}
}

 

RedisSubscriber

channel 토픽을 리슨하고 있는 MessageListener에 메시지가 들어왔을 때 TYPE이 BROAD_CAST이면 /topic/message에 message를 전달하도록 하고 USER일 때는 convertAndSendToUser를 통해서 특정 사용자에게 변경된 시간 정보를 전달한다. 그리고 이때 사용자별 배달 예상시간을 저장하고 있는 redis에 배달 예상시간을 저장한다. 이 값을 이용해서 새로운 device가 붙어도 사용자는 변경된 배예시 정보를 유지해서 받아볼 수 있다.

package com.wedul.websocket.handler;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.wedul.websocket.dto.SubscribeDto;
import com.wedul.websocket.dto.SubscribeType;
import com.wedul.websocket.repository.UserDeliveryTimeService;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class RedisSubscriber implements MessageListener {

    private final ObjectMapper objectMapper;
    private final SimpMessagingTemplate messageTemplate;
    private final UserDeliveryTimeService userDeliveryTimeService;

    @SneakyThrows
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String subscribeMessage = new String(message.getBody());
        SubscribeDto subscribe = objectMapper.readValue(subscribeMessage, SubscribeDto.class);
        log.info("[subscribe][message] {}", subscribeMessage);

        if (SubscribeType.BROAD_CAST == subscribe.getType()) {
            messageTemplate.convertAndSend("/topic/message", subscribe);
            return;
        }
        userDeliveryTimeService.updateTime(subscribe.getName(), subscribe.getTime());
        messageTemplate.convertAndSendToUser(subscribe.getName(), "/topic/data", subscribe);
    }
}
package com.wedul.websocket.repository;

import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class UserDeliveryTimeService {

    private final RedisTemplate redisTemplate;

    public int getTime(String user, int defaultVal) {
        Object time = redisTemplate.opsForValue().get(user);
        if (null == time) {
            return defaultVal;
        }
        return (int) time;
    }

    public void updateTime(String user, int time) {
        redisTemplate.opsForValue().set(user, time);
    }

    public void deleteTime(String user) {
        redisTemplate.delete(user);
    }

}

 

client

browser client에서는 wedul이라는 사용자를 별도 session으로 두개를 가지도록 했고 chul이라는 user에 session 하나를 가지도록 했다. 그 이유는 같은 서버에서 convertAndSendToUser를 사용해서 특정 유저에게 메시지를 보냈을 때 모든 세션에게 전송되는지 확인해보기 위해서이다.

<!DOCTYPE html>
<html>
<head>
    <title>Hello WebSocket</title>
    <link href="/webjars/bootstrap/css/bootstrap.min.css" rel="stylesheet">
    <link href="/main.css" rel="stylesheet">
    <script src="/webjars/jquery/jquery.min.js"></script>
    <script src="/webjars/sockjs-client/sockjs.min.js"></script>
    <script src="/webjars/stomp-websocket/stomp.min.js"></script>
    <script src="/app.js"></script>
</head>
<meta charset="utf-8">
<body>
<noscript><h2 style="color: #ff0000">Seems your browser doesn't support Javascript! Websocket relies on Javascript being
    enabled. Please enable
    Javascript and reload this page!</h2></noscript>
<div id="main-content" class="container">
    <div class="row">
        <div class="col-md-6">
            <form class="form-inline">
                <div class="form-group">
                    <label for="connect">WebSocket connection:</label>
                    <button id="connect" class="btn btn-default" type="submit">주문시작</button>
                    <button id="disconnect" class="btn btn-default" type="submit" disabled="disabled">주문종료
                    </button>
                </div>
            </form>
        </div>
    </div>
    <div class="row">
        <div class="col-md-12">
            <div>
                <span>고객 이름 : wedul, device : 1, 배달 예상 시간 : </span> <span class="wedul">0</span>
            </div>
            <div>
                <span>고객 이름 : wedul, device : 2, 배달 예상 시간 : </span> <span class="wedul">0</span>
            </div>
            <div>
                <span>고객 이름 : chul, device : 1, 배달 예상 시간 : </span> <span class="chul">0</span>
            </div>
        </div>
    </div>
</div>
</body>
</html>
let wedulStompClient1 = null;
let wedulStompClient2 = null;
let chulStompClient = null;

function setConnected(connected) {
    $("#connect").prop("disabled", connected);
    $("#disconnect").prop("disabled", !connected);
    if (connected) {
        $("#conversation").show();
    }
    else {
        $("#conversation").hide();
    }
    $("#greetings").html("");
}

function connect(user) {
    let socket = new SockJS('/gs-guide-websocket');
    let client = Stomp.over(socket);
    client.heartbeat.outgoing = 0;
    client.heartbeat.incoming = 0;
    client.connect({
        user
    }, function (frame) {
        setConnected(true);
        console.log('Connected: ' + frame);
        // user 메시지
        client.subscribe('/user/topic/data', function (message) {
            $('.' + user).text(JSON.parse(message.body).time);
        })

        // broadcast message
        client.subscribe('/topic/message', function (message) {
            console.log(JSON.parse(message.body).message);
            alert(JSON.parse(message.body).message);
        });

        // 에러메시지 핸들링
        client.subscribe('/user/queue/errors', function (message) {
            console.log(message);
            alert("[error] receive User : " + user + " "  + message);
        });
        client.heartbeat.outgoing = 0;
    });

    return client;
}

function disconnect(client, user) {
    if (client !== null) {
        client.disconnect();
    }
    setConnected(false);
    console.log(user + " Disconnected");
}

$(function () {
    $("form").on('submit', function (e) {
        e.preventDefault();
    });
    $( "#connect" ).click(function() {
        wedulStompClient1 = connect("wedul");
        wedulStompClient2 = connect("wedul");
        chulStompClient = connect("chul");
    });
    $( "#disconnect" ).click(function() {
        disconnect(wedulStompClient1, "wedul");
        disconnect(wedulStompClient2, "wedul");
        disconnect(chulStompClient, "chul");
    });
});


 

 

테스트

처음 시작 시 wedul 사용자 세션 2개 chul 사용자 세션 1개를 사용하여 배달 시작(CONNECT)를 시작하면 배달 예상시간 초기 20이 지정된다.

그리고 wedul 사용자에게 시간을 17분으로 단축하는 이벤트를 redis chnnel topic에 전달한다.

그럼 정상적으로 user wedul로 구독하고 있는 두개의 세션(device)의 값이 변경된 것을 알 수 있다.

그리고 chul도 14분으로 변경해보자. 잘 변경되는 것을 확인했다.

 

그럼 새로운 server와 client를 띄워보자. 기존 서버가 8080이었으니 8090으로 실행해서 CONNECT, SUBSCRIBE를 했을 때 잘 연동되는 것을 확인했다. (새로운 장비가 붙어도 연동 가능)

그리고 다시한번 wedul을 10분 chul을 9분으로 바꿔보자.

서버와 클라이언트 확장에 상관없이 잘 반영되는 것을 확인했다.

 

그럼 마지막으로 BROAD_CAST 메시지도 하나 보내보자.

 

 

잘된다. 

이걸 활용하면 많은 걸 만들 수 있을 것 같다.

 

 

코드 : https://github.com/weduls/websocket_study

반응형