규칙 68 - 스레드보다는 실행자와 태스크를 이용하라.

JAVA/Effective Java|2018. 6. 13. 09:35

여러 쓰레드를 실행해야할 때,  큐에 넣고 작업을 진행하거나 할 수 있으면 더욱 효율적으로 관리 할 수있다.


그래서 자바 1.5부터 자바 플랫폼에는 java.util.concurrent가 추가되었다. 이 패키지에는 Executor Framework가 들어 있는데 이는 인터페이스 기반 task 실행 프레임워크이다.


해당 Executor를 실행하기 위해서는 다음과 같이 입력하면 된다.

1
2
3
4
5
6
7
ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.    
    @Override
    public void run() {
        System.out.println("test");
    }
});
cs


그리고 만약 executor안에 요소가 모두 실행이 끝난뒤에는 실행자가 꺼지지는 않는다.

이를 명시적으로 꺼주어야 하는데 그때 shutdown() 메소드를 이용하면 된다.


이른 기능을 제외하고도 임의에 task의 작업을 기다릴 수도 있고 task가 끝날때마다 정보를 가져오도록 할수도 있다. 


여러 작업을 관리해야하는 경우에는 ThreadPool을 만들어서 작업할 수 있다. 이런 작업을 제공하는 ThreadPool은 newCachedThreadPool과 newFixedThreadPool이 있다.


먼저 newCachedThreadPool은 부하가 크지않고 작은 프로그램에서 사용하기에 적합하다. 설정도 필요없고 보통 많은일을 알아서 처리한다. 하지만 작업량이 많은곳에서는 적합하지 않다. 

왜냐하면 해당 쓰레드풀은 작업이 큐에 들어가는 것이 아니라 실행을 담당하는 스레드에 바로 넘겨진다. 그렇기 때문에 task가 많이 지정될 경우에는 상당히 많은 양의 쓰레드가 생성이 되어 CPU의 사용량이 증가된다.


그렇기 때문에 이를 보안하는 ThreadPool이 있는데 newFixedThreadPool이다.


newFixedThreadPool은 스레드 개수가 고정된 풀을 만들어서 제어가 손쉽다.


이런 실행자들을 이용하여 쓰레드를 관리하면 별도의 개별쓰레드를 만들어서 관리하는 것보다 훨씬 편하고 안정적이다.


  출처 : 조슈아 블로크, 『 Effective Java 2/E』, 이병준 옮김, 인사이트(2014.9.1), 규칙68



댓글()

Spring 비동기 프로세스 Callable, DeferredResult, SseEmitter

web/Spring|2018. 5. 27. 11:59

Callable

Spring MVC 3.2는 Servlet 3를 기반으로한 비동기 요청 프로세스를 공개했다. 이 프로세스는 보통 값을 바로 반환하는 것 대신에 컨트롤러는 java.util.concurrent.Callable를 먼저 반환하고, Spring에서 관리하는 별도의 Thread에서 값을 반환한다.

위의 과정을 진행하는 동안 주요 Servlet Container Thread는 해당 요청에서 벗어나서 다른 Request를 받을 수 있게 벗어난다. 그리고 Spring MVC는 TaskExecutor의 도음으로 각각에 Thread에서 Callable 작업을 실행한다. 그리고 Callable 작업이 종료된 후 Callable 객체를 반환한다. 그럼 다시 요청이 Servlet container로 돌아가게 되고 Spring MVC Thread에게 Callable 객체를 반환 받는다. 



1
2
3
4
5
6
7
8
9
@RequestMapping(method=RequestMethod.POST)*public* Callable<String> processUpload(*final* MultipartFile file) {
 
    *return* *new* Callable<String>() {
        *public* String call() *throws* Exception {
            // ...*return* "someView";
        }
    };
 
}
cs



주요특징
- Controller는 Callable 객체를 바로 리턴한다.
- Spring MVC는 내부 TaskExecutor에게 해당 작업을 전달한다.
- DispatherServlet 그리고 모든 필터들은 Servlet Container Thread에서 벗어나지만 response는 여전히 열려있는 상태이다.
- Callable은 결과를 생성하고 Spring MVC는 Servlet container에게 재 요청을 진행한다.
- DispatcherServlet은 Callable에게서 반환된 결과값을 가지고 다시 작업을 진행시키다.


DeferredResult

Callable 이외의 다른 옵션이 있는데 DeferredResult 인스턴스이다. DeferredResult 도 또한 다른 Thread로 부터 생산된 값을 반환하는 것은 동일하다. 하지만 이 Thread가 Spring MVC에서 관리되는 것은 아니다. 동작에 대해 예를 들면 JMS message, scheduled task 등과 같은 외부 의 몇몇 이벤트로 부터 생상된 된 response를 반환해야 하는 경우에 사용된다. 이는 long polling 방식이라고도 하는데, 특정 작업이 진행이 되거나 이벤트가 완료된 후에 요청을 했던 client에게 값을 전달하는 방식이다.



1
2
3
4
5
6
@RequestMapping("/quotes")@ResponseBody*public* DeferredResult<String> quotes() {
    DeferredResult<String> deferredResult = *new* DeferredResult<String>();
    // Save the deferredResult somewhere..*return* deferredResult;
}
// In some other thread...
deferredResult.setResult(data);
cs



주요특징

- Controller는 DeferredResult를 반환하고, 내부 Queue나 List에 생성한 DeferedResult를 보관하고 있는다.
- Spring MVC는 비동기 작업을 진행한다.
- DispatcherServlet과 모든 필터들은 해당 요청에서 벗아나지만 response는 열린체 대기한다.
- Application은 DefererResult에 완료된 작업 결과를 설정하고, Spring MVC는 해당 요청을 다시 Servlet Container에게 붙힌다.
- DispatcherServlet은 비동기로 생상된 결과를 가지고 작업을 다시진행한다.

=> 이는 Long polling 프로세스에서 다음과 같이 사용될 수 있다.
client에게 비동기 요청을 받는다.
생성된 DeferedResult를 맵에 저장하고 있는다.
client에게 전달해야 하는 변동사항이 생겼을 때, map에서 해당 client에 DeferredResult를 꺼내서 setResult에 결과를 전달한다



HTTP Streaming (server send event)
 Callable과 DerferedResult는 하나의 요청에 하나의 응답만 가능하다. 하지만 Http5가 표준이 되면서 하나의 Http 요청에 여러개의 응답이 가능하다. 그 이유는 Spring MVC가 ResponseBodyEmitter를 통해서 여러개의 오브젝트를 보낼 수 있기 때문이다.



1
2
3
4
5
6
7
8
9
10
@RequestMapping("/events")*public* ResponseBodyEmitter<String> handle() {
    ResponseBodyEmitter<String> emitter = *new* ResponseBodyEmitter<String>();
    // Save the emitter somewhere..*return* emitter;
}
// In some other thread
emitter.send("Hello once");
// and again later on
emitter.send("Hello again");
// and done at some point
emitter.complete();
cs



- emitter는 send()메소드를 통해 하나 이상의 결과를 반환할 수 있고, 더 이상 보낼 게 없을 때 complete() 메소드를 실행하면 response가 반환된다.

다음의 예는 
서버와 connection을 맽어놓고 서버에 변경사항이 생겼을 때 연결되어 있는 client에게 신호를 전송하도록 sse 통신을 만들어 보았다.

여기서 서버에 변경사항이 생긴것 체크는 페이지에서 버튼으로 표현하였다.

1. web.xml 설정


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<!-- Processes application requests -->
<servlet>
    <servlet-name>appServlet</servlet-name>
    <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    <init-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>/WEB-INF/spring/appServlet/servlet-context.xml</param-value>
    </init-param>
    <load-on-startup>1</load-on-startup>
    <async-supported>true</async-supported>
</servlet>
    
<servlet-mapping>
    <servlet-name>appServlet</servlet-name>
    <url-pattern>/</url-pattern>
</servlet-mapping>
cs



비동기는 web.xml 서브 엘리먼트인 <async-supported>true</async-supported> 설정을 통해서 설정된 DispathcherServlet에서 가능하다.

2. controller


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.wedul.test;
 
import java.io.IOException;
import java.text.DateFormat;
import java.util.Date;
import java.util.Locale;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 
/**
 * Handles requests for the application home page.
 */
@Controller
public class HomeController {
    
    private static final Logger logger = LoggerFactory.getLogger(HomeController.class);
    
    @Autowired
    TestService testService;
  
    
    @RequestMapping("/events")
    public SseEmitter handle(String key) {
        SseEmitter emitter = new SseEmitter();
        testService.setEmitter(key, emitter);
        
        emitter.onCompletion(() -> {
            synchronized (this) {
                    testService.remove(key);
            }
        });
 
        emitter.onTimeout(()-> {
                emitter.complete();
        });
 
        
        return emitter;
    }
    
    @RequestMapping(value = "/signal", method = RequestMethod.POST)
    public void signal(String key) throws IOException {
        testService.sendEmitter(key);
    }
    
}
cs



- controller에서 events를 통해 요청 받은 다음 그 요청에서 SseEmitter객체를 생성하여 서비스로 넘겨 전달받은 key와 함께 맵에 보관한 후, emitter는 먼저 반환한다.
- signal url로 전송받은 key에 해당하는 emitter의 send()를 통해 결과를 전송한다. 


3. service


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.wedul.test;
 
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 
@Service
public class TestService {
    
    private Map<String, SseEmitter> datas = new ConcurrentHashMap<String, SseEmitter>();
    
    public void setEmitter(String key, SseEmitter emitter) {
        this.datas.put(key, emitter);
    }
    
    public void sendEmitter(String key) throws IOException {
        for (Map.Entry<String, SseEmitter> data : this.datas.entrySet()) {
            if (data.getKey().equals(key)) {
                data.getValue().send(key + " is sended");
            }
        }
        
    }
    
    public void remove(String key) {
        this.datas.remove(key);
    }
 
}
cs



SseEmitter 객체는 send메소드를 통해 결과를 발송하면 요청결과가 반환된다.


4. ajax


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
<%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c" %>
<%@ page session="false" %>
<html>
<head>
    <title>Home</title>
</head>
<body>
<h1>
    Hello world!  
</h1>
 
<P>  The time on the server is ${serverTime}. </P>
 
<button id="id">interupt</button>
</body>
<script
  src="https://code.jquery.com/jquery-3.3.1.js"
  integrity="sha256-2Kok7MbOyxpgUVvAk/HJ2jigOSYS2auK4Pfzbm7uH60="
  crossorigin="anonymous"></script>
<script>
const $id = $('#id');
 
var goSSE = function () {
    const eventSource = new EventSource('http://localhost:8080/test/events?key=babo'); 
    eventSource.onmessage = event => {
      console.log(event);
    };
};
 
(function() {
    goSSE();
    console.log('gogo');
})();
 
 
$id.click(function (e) {
    console.log('clicked');
    
    $.ajax({
        url : '/test/signal',
        type : 'POST',
        data : {
            'key' : 'babo'
        },
        dataType:'application/x-www-form-urlencoded',
        success : function(data) {              
        },
        error : function(request,error)
        {
        }
    });
});
 
</script>
 
</html>
 
 
cs



동작화면



- /home 페이지에 들어가면 자동으로 /events 를 요청하여 long polling을 설정한다.
- event 요청이 넘어간 것을 옆에서 볼 수 있다.




- 왼쪽에 interrupt 버튼을 클릭하면 /signal을 보내서 같이 전송된 key에 맞는 SseEmitter의 send()메소드를 사용하여 결과를 반환하면 다음과 같이 결과가 뒤늦게 도착한다.
- 이런식으로 Long polling을 설정할 수 있다.



댓글()

JAVA Thread Futher, Callable, Executor

JAVA/Thread|2016. 12. 21. 22:42

Futher, Callable, Executor

  • 스레드가 필요할  스레드를 생성하여 제공하는 ExecutorService
  • Callabe 객체를 생성하여 Executor 추가하여 객체를 순차적으로 실행한  Future 클래스로 결과를 반환 받는다.
  • Callable 객체는 인터페이스이고 어떤 항목이든 담을  있는 call() 메소드 하나만 포함한다.

EX) Callable 객체

Class findmaxTask implements Callable<Integer>{

 

FindMaxTas(int[] data, int start, int end){

Super(data,start,end);

}

 

Public Integer getMax(){

생략

}

}

 

MultiThreadedMaxFinder 객체

 

Public class MultithreadedMaxFinder {

 

FindMaxTask task1 = new FindMaxTask(data, 0, 100);

FindMaxTask task2 = new FindMaxTask(data, 1000, 1000);

 

ExecutorService service = Executor.newFixedThreadPool(2);//작업을 두개로 분할

 

//Future 결과값 반환

Future<Integer> future1 = service.submit(task1);

Future<Integer> future2 = service.submit(task2);

 

Return Math.max(future1.get(), future2.get());

// future1.get() 호출될   메소드는 먼저 FindMaxTask 끝날때까지 대기한다그러고 난후 future2.get() 호출 한다 개의 스레드가 정상적으로 종료된 후에 결과값 Max 리턴한다.

}

'JAVA > Thread' 카테고리의 다른 글

Thread 크리티컬 세션  (0) 2016.12.21
java thread pool 소개  (0) 2016.12.21
JAVA 스레드 스케줄링  (0) 2016.12.21
Thread 동기화 문제  (0) 2016.12.21
JAVA Thread Futher, Callable, Executor  (0) 2016.12.21
Thread 폴링 방식  (0) 2016.12.21

댓글()