Spring 비동기 프로세스 Callable, DeferredResult, SseEmitter

web/Spring|2018. 5. 27. 11:59

Callable

Spring MVC 3.2는 Servlet 3를 기반으로한 비동기 요청 프로세스를 공개했다. 이 프로세스는 보통 값을 바로 반환하는 것 대신에 컨트롤러는 java.util.concurrent.Callable를 먼저 반환하고, Spring에서 관리하는 별도의 Thread에서 값을 반환한다.

위의 과정을 진행하는 동안 주요 Servlet Container Thread는 해당 요청에서 벗어나서 다른 Request를 받을 수 있게 벗어난다. 그리고 Spring MVC는 TaskExecutor의 도음으로 각각에 Thread에서 Callable 작업을 실행한다. 그리고 Callable 작업이 종료된 후 Callable 객체를 반환한다. 그럼 다시 요청이 Servlet container로 돌아가게 되고 Spring MVC Thread에게 Callable 객체를 반환 받는다. 



1
2
3
4
5
6
7
8
9
@RequestMapping(method=RequestMethod.POST)*public* Callable<String> processUpload(*final* MultipartFile file) {
 
    *return* *new* Callable<String>() {
        *public* String call() *throws* Exception {
            // ...*return* "someView";
        }
    };
 
}
cs



주요특징
- Controller는 Callable 객체를 바로 리턴한다.
- Spring MVC는 내부 TaskExecutor에게 해당 작업을 전달한다.
- DispatherServlet 그리고 모든 필터들은 Servlet Container Thread에서 벗어나지만 response는 여전히 열려있는 상태이다.
- Callable은 결과를 생성하고 Spring MVC는 Servlet container에게 재 요청을 진행한다.
- DispatcherServlet은 Callable에게서 반환된 결과값을 가지고 다시 작업을 진행시키다.


DeferredResult

Callable 이외의 다른 옵션이 있는데 DeferredResult 인스턴스이다. DeferredResult 도 또한 다른 Thread로 부터 생산된 값을 반환하는 것은 동일하다. 하지만 이 Thread가 Spring MVC에서 관리되는 것은 아니다. 동작에 대해 예를 들면 JMS message, scheduled task 등과 같은 외부 의 몇몇 이벤트로 부터 생상된 된 response를 반환해야 하는 경우에 사용된다. 이는 long polling 방식이라고도 하는데, 특정 작업이 진행이 되거나 이벤트가 완료된 후에 요청을 했던 client에게 값을 전달하는 방식이다.



1
2
3
4
5
6
@RequestMapping("/quotes")@ResponseBody*public* DeferredResult<String> quotes() {
    DeferredResult<String> deferredResult = *new* DeferredResult<String>();
    // Save the deferredResult somewhere..*return* deferredResult;
}
// In some other thread...
deferredResult.setResult(data);
cs



주요특징

- Controller는 DeferredResult를 반환하고, 내부 Queue나 List에 생성한 DeferedResult를 보관하고 있는다.
- Spring MVC는 비동기 작업을 진행한다.
- DispatcherServlet과 모든 필터들은 해당 요청에서 벗아나지만 response는 열린체 대기한다.
- Application은 DefererResult에 완료된 작업 결과를 설정하고, Spring MVC는 해당 요청을 다시 Servlet Container에게 붙힌다.
- DispatcherServlet은 비동기로 생상된 결과를 가지고 작업을 다시진행한다.

=> 이는 Long polling 프로세스에서 다음과 같이 사용될 수 있다.
client에게 비동기 요청을 받는다.
생성된 DeferedResult를 맵에 저장하고 있는다.
client에게 전달해야 하는 변동사항이 생겼을 때, map에서 해당 client에 DeferredResult를 꺼내서 setResult에 결과를 전달한다



HTTP Streaming (server send event)
 Callable과 DerferedResult는 하나의 요청에 하나의 응답만 가능하다. 하지만 Http5가 표준이 되면서 하나의 Http 요청에 여러개의 응답이 가능하다. 그 이유는 Spring MVC가 ResponseBodyEmitter를 통해서 여러개의 오브젝트를 보낼 수 있기 때문이다.



1
2
3
4
5
6
7
8
9
10
@RequestMapping("/events")*public* ResponseBodyEmitter<String> handle() {
    ResponseBodyEmitter<String> emitter = *new* ResponseBodyEmitter<String>();
    // Save the emitter somewhere..*return* emitter;
}
// In some other thread
emitter.send("Hello once");
// and again later on
emitter.send("Hello again");
// and done at some point
emitter.complete();
cs



- emitter는 send()메소드를 통해 하나 이상의 결과를 반환할 수 있고, 더 이상 보낼 게 없을 때 complete() 메소드를 실행하면 response가 반환된다.

다음의 예는 
서버와 connection을 맽어놓고 서버에 변경사항이 생겼을 때 연결되어 있는 client에게 신호를 전송하도록 sse 통신을 만들어 보았다.

여기서 서버에 변경사항이 생긴것 체크는 페이지에서 버튼으로 표현하였다.

1. web.xml 설정


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<!-- Processes application requests -->
<servlet>
    <servlet-name>appServlet</servlet-name>
    <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    <init-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>/WEB-INF/spring/appServlet/servlet-context.xml</param-value>
    </init-param>
    <load-on-startup>1</load-on-startup>
    <async-supported>true</async-supported>
</servlet>
    
<servlet-mapping>
    <servlet-name>appServlet</servlet-name>
    <url-pattern>/</url-pattern>
</servlet-mapping>
cs



비동기는 web.xml 서브 엘리먼트인 <async-supported>true</async-supported> 설정을 통해서 설정된 DispathcherServlet에서 가능하다.

2. controller


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.wedul.test;
 
import java.io.IOException;
import java.text.DateFormat;
import java.util.Date;
import java.util.Locale;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 
/**
 * Handles requests for the application home page.
 */
@Controller
public class HomeController {
    
    private static final Logger logger = LoggerFactory.getLogger(HomeController.class);
    
    @Autowired
    TestService testService;
  
    
    @RequestMapping("/events")
    public SseEmitter handle(String key) {
        SseEmitter emitter = new SseEmitter();
        testService.setEmitter(key, emitter);
        
        emitter.onCompletion(() -> {
            synchronized (this) {
                    testService.remove(key);
            }
        });
 
        emitter.onTimeout(()-> {
                emitter.complete();
        });
 
        
        return emitter;
    }
    
    @RequestMapping(value = "/signal", method = RequestMethod.POST)
    public void signal(String key) throws IOException {
        testService.sendEmitter(key);
    }
    
}
cs



- controller에서 events를 통해 요청 받은 다음 그 요청에서 SseEmitter객체를 생성하여 서비스로 넘겨 전달받은 key와 함께 맵에 보관한 후, emitter는 먼저 반환한다.
- signal url로 전송받은 key에 해당하는 emitter의 send()를 통해 결과를 전송한다. 


3. service


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.wedul.test;
 
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 
@Service
public class TestService {
    
    private Map<String, SseEmitter> datas = new ConcurrentHashMap<String, SseEmitter>();
    
    public void setEmitter(String key, SseEmitter emitter) {
        this.datas.put(key, emitter);
    }
    
    public void sendEmitter(String key) throws IOException {
        for (Map.Entry<String, SseEmitter> data : this.datas.entrySet()) {
            if (data.getKey().equals(key)) {
                data.getValue().send(key + " is sended");
            }
        }
        
    }
    
    public void remove(String key) {
        this.datas.remove(key);
    }
 
}
cs



SseEmitter 객체는 send메소드를 통해 결과를 발송하면 요청결과가 반환된다.


4. ajax


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
<%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c" %>
<%@ page session="false" %>
<html>
<head>
    <title>Home</title>
</head>
<body>
<h1>
    Hello world!  
</h1>
 
<P>  The time on the server is ${serverTime}. </P>
 
<button id="id">interupt</button>
</body>
<script
  src="https://code.jquery.com/jquery-3.3.1.js"
  integrity="sha256-2Kok7MbOyxpgUVvAk/HJ2jigOSYS2auK4Pfzbm7uH60="
  crossorigin="anonymous"></script>
<script>
const $id = $('#id');
 
var goSSE = function () {
    const eventSource = new EventSource('http://localhost:8080/test/events?key=babo'); 
    eventSource.onmessage = event => {
      console.log(event);
    };
};
 
(function() {
    goSSE();
    console.log('gogo');
})();
 
 
$id.click(function (e) {
    console.log('clicked');
    
    $.ajax({
        url : '/test/signal',
        type : 'POST',
        data : {
            'key' : 'babo'
        },
        dataType:'application/x-www-form-urlencoded',
        success : function(data) {              
        },
        error : function(request,error)
        {
        }
    });
});
 
</script>
 
</html>
 
 
cs



동작화면



- /home 페이지에 들어가면 자동으로 /events 를 요청하여 long polling을 설정한다.
- event 요청이 넘어간 것을 옆에서 볼 수 있다.




- 왼쪽에 interrupt 버튼을 클릭하면 /signal을 보내서 같이 전송된 key에 맞는 SseEmitter의 send()메소드를 사용하여 결과를 반환하면 다음과 같이 결과가 뒤늦게 도착한다.
- 이런식으로 Long polling을 설정할 수 있다.



댓글()