Spring reactor Mono와 Flux 정리

web/마이크로서비스|2019. 1. 6. 22:07

지금까지 Spring5에서 추가되었던 리액트 프로그램을 사용하여 간단한 프로그램을 만들어 봤지만 정확하게 Mono와 Flux에 차이와 정의를 정리하지 못한 것 같다. 이번기회에 두 개의 정확한 차이와 사용방법등을 정리해보자.


리액티브 프로그래밍
비동기 블록킹 프로세스로 동작하는 애플리케이션을 논블록킹 프로세스로 동작하기 위해서 지원하는 프로그래밍. (현재 node.js의 동작방식과 유사)


기존 Spring 블록킹 방식
웹에서 서버에 요청이 왔을때 서버는 요청에 대한 적절한 응답을 보내야 하는데 만약 작업이 오래 걸릴 경우에는 요청에 대한 응답이 모두 종료될 때까지 블록킹된다. Spring에서는 그래서 동시 요청 처리를 위해서 멀티 thread를 지원한다. 그러면 하나의 작업이 thread에서 진행되고 다른 thread가 다른 요청을 할당받아서 처리한다. 하지만 이렇게 결국 thread가 늘어나게 되는 경우에는 thread 할당에 필요한 리소스가 늘어나게 되어 비 효율적이 될 수도 있다.


Spring5의 Non blocking
Spring 5가 도입 되면서 클라이언트에 요청에 별도의 thread를 생성하지 않고 buffer를 사용해서 요청을 받고 뒤에서 처리하는 처리하는 thread는 여러개를 두어서 처리한다. 결국 node.js의 싱글스레드 논블로킹을 따라가는 것 같다.

그럼 왜 블로킹 방식을 지원하던 스프링에서 왜 논블로킹 방식을 생각하게 된걸까? 만약에 수천개의 스트림 데이터가 초당 계속 업데이트 되는 시스템이고 적절하게 응답을 해줘야할 때 기존의 블로킹 방식에 경우 상당한 부담을 받게 된다. 그래서 이런 부담을 효율적으로 처리하기 위해서 도입되었다.


Mono와 Flux
Mono는 0-1개의 결과만을 처리하기 위한 Reactor 객체
Flux는 0-N개의 결과물을 처리하기 위한 Reactor 객체

보통 여러 스트림을 하나의 결과를 모아줄 때 Mono를 쓰고 각각의 Mono를 합쳐서 하나의 여러 개의 값을 여러개의 값을 처리할 떄 Flux를 사용한다.

근데 이 부분에서 의문이 있다. 왜 그럼 Flux를 사용하면 되는거지 한개까지만 데이터를 처리할 수 있는 Mono라는 타입이 있는걸까? Mono와 Flux는 같은 Publisher 인터페이스를 구현해서 만들어졌다. 하지만 어떤 시스템에서는 Multi Result가 아닌 하나의 결과셋만 있는 경우가 있다. 그럴경우에는 Mono를 사용한다. 예를 들어 우리가 보통 자바에서 하나의 결과 또는 결과가 없는경우에 List를 사용해서 결과를 받지 않는다. 그와 동일한 개념이라고 생각하면 좋다.

그럼 Mono와 Flux를 사용해서 리액티브 프로그래밍을 하는 방식을정리해보자.

리액티브 스트림

  • 비동기 스트림 처리를 위한 표준으로써 next는 다음신호를 담고 complete는 신호가 끝난것 그리고 error은 신호보내는 도중 에러가 발생한 것을 의미한다.
  • Publisher가 전송하면 데이터는 sequence 대로 전송한다. 그러면 Subscriber가 데이터를 수신한다.
  • next, complete, error 신호를 발생시킨다.


기본적인 설명

1
2
3
4
5
6
// Integer 값을 발생하는 Flux 생성
Flux<Integer> seq = Flux.just(4, 5, 6); 
 
// 구독
seq.subscribe(System.out::println); 
 
cs


Flux.just(1, 2, 3);
--1-2-3-|→ 이처럼 1, 2, 3 세개의 next신호를 발생하고 마지막에 complete 신호를 발생시켜 시퀀스를 끝낸다.

Flux.just();
아무런 sequence가 없는 경우에는 complete 신호만 발생시킨다.

Mono.just(1);
--1-|→ Mono와 Flux의 차이는 Mono는 최대 발생할 수 있는 값이 1개이다.


구독과 신호 발생
sequence는 바로 신호를 발생하지 않는다. 구독을 하는 시점에 신호를 발생하기 시작한다.

1
2
3
4
Flux.just(1, 2, 3)
 .doOnNext(i -> System.out.println("호출: " + i))
 .subscribe(i -> System.out.println("출력 결과: " + i));
 
cs

-> doOnNext메소드는 consumer로부터 구독이 일어났을때 실행된다. 그래서 위에 메시지에 next 신호가 발생했을때 다음과 같은 결과가 발생한다.

호출: 1
출력 결과: 1
호출: 2
출력 결과: 2
호출: 3
출력 결과: 3


Subscriber 인터페이스 메서드 사용방법 정의
subscriber에서 제공하는 메소드는 다음과 같고 구독이 발생하면 onSubscribe가 호출되고 다음 값을 요청하면 onNext 오류가 발생하면 onError 모든 데이터 요청이 끝나면 onComplete가 호출된다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Flux<Integer> seq = Flux.just(1, 2, 3);
 
seq.subscribe(new Subscriber<>() {
    private Subscription subscription;
    @Override
    public void onSubscribe(Subscription s) {
        // 구독 시작
        this.subscription = s;
        this.subscription.request(1);
    }
 
    @Override
    public void onNext(Integer i) {
        System.out.println("Costomer가 Publisher에게 데이터 요청: " + i);
        this.subscription.request(1);
    }
 
    @Override
    public void onError(Throwable t) {
        System.out.println("Subscriber.onError: " + t.getMessage());
    }
 
    @Override
    public void onComplete() {
        System.out.println("Subscriber.onComplete");
    }
});
cs

-> seq.subscribe 메서드에서 전달한 임의 Subscriber 객체를 onSubscribe 메서드에서 인자로 받아서 이를 필드로 저장하여 사용한다. request(1)은 한개의 데이터를 요청한다는 뜻이다. 만약 모든 데이터를 한번에 받고 싶다면 다음과 같이 지정하면 된다

1
2
3
4
5
6
@Override
public void onSubscribe(Subscription s) {
    System.out.println("Subscriber.onSubscribe");
    this.subscription = s;
    this.subscription.request(Long.MAX_VALUE);
}
cs


콜드 시퀀스와 핫 시퀀스
시퀀스는 구독 시점부터 데이터를 새로 생성하는 Cold sequence와 구독하는 customer와 상관 없이 데이터를 생성하는 hot sequence가 존재한다.

앞 예제 Flux.just()로 생성한 시퀀스가 콜드 시퀀스이다.
콜드 시퀀스는 위에 보면 알겠지만 subscribe가 발생하지 않는다.

Flux<Integer> seq = Flux.just(1, 2, 3);
seq.subscribe(v -> System.out.println(“첫번 째 요청: " + v)); // 구독
seq.subscribe(v -> System.out.println("두번 째 요청: " + v)); // 구독

-> 이 코드를 보면 알겠지만 seq 시퀀스는 구독을 두번한다.이 결과를 seq 시퀀스는 각 구독마다 데이터를 새롭게 생성한다. 마치 API에서 호출하는 것 처럼 매 호출마다 새로운 응답을 만들어 낸다.
첫번 째 요청: 1
첫번 째 요청: 2
첫번 째 요청: 3
두번 째 요청: 1
두번 째 요청: 2
두번 째 요청: 3

핫 시퀀스는 구독여부에 상관없이 데이터가 생성된다. 구독을 하면 구독한 시점이후에 발생하는 데이터부터 신호를 받는다.
-> 예전부터 있었던 데이터를 똑같은 응답을 받는게 아니라 구독을 시작한 부분부터 받는다.


다음 시간에는 스프링으로 직접 만들어보자.



Spring reactor
https://ahea.wordpress.com/2017/02/15/spring-reactive/

단계별 설명
http://wiki.sys4u.co.kr/pages/viewpage.action?pageId=8552586


댓글()

Mysql Exists와 IN절 설명과 차이점

데이터베이스/mysql|2018. 10. 3. 23:34

두 개 모두 where절에 조건을 보고 결과를 걸러낼때 사용하는데 정리가 잘 안되서 정리해봤다.

Exists 
서브쿼리가 반화나는 결과값이 있는지를 조사한다.
단지 반환된 행이 있는지 없는지만 보고 값이 있으면 참 없으면 거짓을 반환한다.

1
SELECT * FROM sample1;
cs

1
SELECT * FROM sample2;
cs


두 개의 테이블중 조건에 맞는 Row만 추출된다.

1
SELECT * FROM sample1 s1 WHERE EXISTS(SELECT * FROM sample2 s2 WHERE s1.no = s2.no);
cs

그럼 반대로 조건에 맞지 않는 ROW만 추출하고 싶으면 어떻게 해야할까?

1
SELECT * FROM sample1 s1 WHERE NOT EXISTS(SELECT * FROM sample2 s2 WHERE s1.no = s2.no);
cs


IN 
집합 내부에 값이 존재하는지 여부 확인한다. 
실제로 존재하는 데이터의 값을 비교하기 때문에 Exists보다 속도가 느린경우가 있다.

두가지 경우로 사용이 가능하다. 

1.집합군

1
SELECT * FROM sample1 s1 WHERE NO IN (45);
cs


2.서브쿼리

1
SELECT * FROM sample1 s1 WHERE NO IN (SELECT NO FROM sample2 s2);
cs

반대로 포함되지 않은 경우를 추출하고 싶은경우에는 NOT IN 사용

1
SELECT * FROM sample1 s1 WHERE NO NOT IN (SELECT NO FROM sample2 s2);
cs


※ 주의
하지만 NOT IN에 경우에 조건에 맞는 데이터가 있어도 중간에 NULL이 존재하게되면 no rows selected가 나오게 되니 NVL 처리로 NULL 처리를 해야한다.

NULL이 포함된 sample3 테이블

쿼리 조회에 사용될 sample1 테이블


NOT IN절

1
SELECT * FROM sample1 s1 WHERE a IN (SELECT val FROM sample3 s3);
cs

sample1테이블에 null 값이 포함되어 있기 때문에 검색시 no rows selected 결과가 나온다.


NVL 처리 후 확인

1
SELECT * FROM sample1 s1 WHERE a Not IN (SELECT IFNULL (val, 'd'FROM sample3 s3);
cs




댓글()

Git의 rebase를 이용한 커밋 정리 (merge와 차이)

IT 지식/Git|2018. 6. 21. 23:26

Git을 처음 입사 후 진행했던 프로젝트에서 Gitlab을 통해 처음 접해보았다.

확실히 저장소 관리하는 방식과 커밋 전 단계가 제공되는 것 등등 편한 것이 많았다. 하지만 그 때 당시 딱히 팀원이 없었기에 히스토리 관리 및 flow 관리를 해야한다는 필요성을 알지도 못했고 듣지도 못했었다.

그러나 공부를 진행해보면서 git에 rebase라는 좋은 기능이 있다는 것을 알게되었고 배워보고 싶어 nhnent에서 제공하는 사내 교육에 참여하였다.

우선 첫 번째 글로 rebase와 merge에 차이를 설명하고 rebase를 진행해보자.

rebase란? 우선 rebase는 base를 다시 지정한다 (re-base)의 의미이다. base가 무엇인가? 다음 그림을 보자.

그림을 보면 master의 c4와 experiment 브랜치의 c3 커밋이 있다. 두 개의 커밋은 c2라는 같은 조상에서 시작되었다. 바로 이 c2가 base이다. 그럼 이 베이스를 다시 한다는 것은 어떤 의미일까?

여기서 중요한 개념이 rebase와 merge에 차이이다.

그림을 보면 두 개의 master와 experiment 브랜치는 서로 merge를 하였고, 그러면서 새로운 c5 커밋이 생겼다. 이 두개의 커밋이 합쳐져서 이제 하나의 뿌리로 시작하겠지만 커밋 히스토리를 보게되면 위와 같이 뿌리가 나눠져 있어서 히스토리를 찾아갈 때 보기가 어렵다.

이건 단순한 한가지 코드지만 만약 브랜치가 엄청 많아진다면? 정말 히스토리를 보기 어려울 것이다. 그렇기 때문에 리베이스 개념이 사용되는데 베이스를 다시 재정의하여 새롭게 커밋라인을 정리하여 히스토리를 깔끔하게 볼 수있게 해준다.

이 그림이 rebase가 완료된 그림이다.

브랜치가 사라지고 두 개의 브랜치의 base였던 c2를 기준으로 정리가 된것을 볼 수있다.

rebase 단계 계산 및 진행과정

현재 위와 같은 상황이라고 가정해보자. 현재 head는 feature 브랜치에 있고 아래 master branch를 향해서 rebase를 진행할 것이다.

그러면 c2까지의 변경내용에 대해 계산을 선 진행한 후 다음과 같이 변경 내용들을 master 브랜치 앞에 붙힌다.

그리고 master 브랜치를 새로 리베이스된 커밋 앞으로 fast-forward를 진행하면 다음과 같이 완료된다.

그럼 리베이스를 실습해보자.

rebase 하기

git의 기능을 편리하게 사용할 수 있도록 제공해주는 sourceTree를 사용해서 진행해보겠다.

우선 저장소를 먼저 생성하고 마스터 브랜치에 새로운 파일을 하나 추가하고 첫 번째 커밋을 진행한다.

그리고 마스터 브랜치에 파일을 하나 더 추가하고 한번더 커밋을 제공하면 다음과 같이 히스토리를 볼 수 있다.

그리고 feature 브랜치를 하나 생성한다.

생성이 완료된 feature로 체크아웃한 후 두 개의 커밋을 더 진행한다. 그리고 마스터로 돌아와서 한번도 커밋을 진행하면 다음과 같은 히스토리를 볼 수 있다.

그럼 여기서 feature 브랜치의 내용을 master로 rebase를 진행해보겠다.

우선 feature 브랜치로 checkout 진행 후 sourceTree에서 master branch를 우클릭한 후 현재 변경사항을 master에 재배치를 선택한다.

그러면 다음과 같이 rebase가 진행된 것을 볼 수 있다. 깔끔하다!

그럼 fast-forward 까지 진행하고 마무리 해보자.

회사에서 지금은 웹팀으로 옮겨서 SVN을 사용중이다. 물론 SVN이 심플해서 우리회사에는 더 어울리는 것 같다. 그래도 요새 대다수가 사용하는 git에 대해 항상 더 알고싶었는데 기회가 있어서 교육을 들어 좋았다.

계속해서 다양한 사례에서 rebase를 진행하는 걸 알아보자.

댓글()
  1. jun 2019.10.30 09:27 댓글주소  수정/삭제  댓글쓰기

    설명 깔끔하고 이해되기 쉬웠습니다!
    감사합니다 ㅎㅎ

JQuery함수의 append와 appendTo 차이

web/javaScript|2018. 6. 19. 21:31

하나의 dom 객체를 다른 dom에 붙히려고 할때 JQuery에서 제공하는 append나 appendTo를 사용한다.


그런데 append와 appendTo가 헷갈리는 경우가 있는데 이 두개의 차이를 알아보자.


두 개 메서드의 결정적인 차이는 붙는 대상이 무엇이냐에 있다.

아래 예제를 살펴보자

1
2
3
4
5
6
<div id="test">
 
 
 
$('<div>').appendTo($('#test'));
$('#test').append($('<div>'));
cs


코드를 보면 두 개의 결과는 동일하다. 짐작하겠지만 appendTo는 뒤에있는 객체에 앞에있는 내용을 붙히겠다는 의미이고 append는 앞에있는 객체에 뒤라는 객체를 붙히겠다는 뜻이다.


영어 뜻을 잘 해석해서 진행하면 이해하기가 더 쉬울 것 같다.

댓글()

Java List 인터페이스 중 CopyOnWriteArrayList 소개

JAVA/JAVA 관련|2018. 6. 3. 16:36

자바에는 크게 4개의 List 인터페이스를 구현한 클래스가 있다.

- Vector, ArrayList, LinkedList, CopyOnWriteArrayList


그 중 가장 생소한 이름이 있는데 CopyOnWriteArrayList이다. CopyOnWriteArrayList는 그냥 ArrayList랑 다르길래 화려한 이름을 가지고 있는걸까?


ArrayList vs CopyOnWriteArrayList

일반 ArrayList의 경우 스레드에 안전하게 설게되어 있지 않기때문에 만약 스레드 처리가 필요한 List의 경우에 Vector를 사용하거나 ArrayList에 synchroized를 사용하여 처리하였다. 하지만 자바 1.5부터 있던 CopyOnWriteArrayList를 쉽게 이문제를 해결할 수 있다.


CopyOnWriteArrayList의 경우 ArrayList와 모든 부분이 동일하나 어디에 컨텐츠를 전달할 때 컨텐츠를 복사해서 전달한다. 그렇기 때문에 전달 후 해당 List의 내용이 변경될 것을 우려하지 않아도 된다.


CopyOnWriteArrayList는 그냥 ArrayList보다 물론 부담이 있을수 있다. 하지만 잘못설계된 동기화코드보다 더 안전하고 비용이 절감될 수도 있다.


그리고 CopyOnWriteArrayList는 객체를 매번 복사하는 것이 아니라 전달시 해당 상태를 스냅샷으로 가지고 있는 방식으로 진행한다.

자세한 내용은 아래 사이트를 참조하면 도움이 될 것이다.


https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CopyOnWriteArrayList.html

댓글()