Spring reactor 2.1.2 (netty 0.8.4) Mono.zip readTimeoutException 문제

web/Spring|2020. 7. 12. 20:34

 

Mono zip


각 Mono 처리 스레드를 병렬로 실행하고 이를 묶어서 사용할 수 있는게 Mono.zip이다. 

근데 Mono zip에서 병렬로 실행되는 작업 중 하나가 empty 또는 error가 발생 되면 바로 error 또는 complete를 내뱉게 되어있다. 하지만 각 Mono 구독 작업에 error와 empty 발생 시 문제에 대해 fallback 처리를 해주면 에러가 발생하더라도 그 로직을 타게 되어있다. 

 

하지만 2.1.2(netty 0.8.4) 버전을 사용하고 있을 때 호출 체인에서 첫 번째 요청의 실패 이후에 두 번째 요청이 정상적으로 이루어 지지 않아서 readTimeout이 발생되는 문제를 경험하였습니다.

 

이 문제를 해결하기 위해서 알아보던 중 2.1.2버전에 문제가 있는 것을 알게 되어 테스트를 해봤다.

 

 

테스트


아래 Mono.zip을 보면 두 개의 Mono 구독 작업을 병렬로 진행하도록 지정해놨고 각 작업 종료 후 response에 대한 부분을 출력하도록 해놨다.

public Mono<WedulResponse> circuitTest(WedulRequest request) {
    return Mono.zip(
        wedulClient.isWedulExist(request)
            .doOnError(e -> log.error("service error", e))
            .defaultIfEmpty(WedulResponse.builder().type("Error Return").build())
            .onErrorReturn(WedulResponse.builder().type("Error Return").build()),
        wedulTestClient.isWedulTestExist(request)
    ).map(
        d -> {
            System.out.println(d.getT2().getPage());
            System.out.println(d.getT1().getType());
            return WedulResponse.builder().isExist(d.getT1().isExist()).build();
        }
    ).doOnError(e -> log.error("error {}", e));
}

이 때 첫 번째 요청은 error가 발생하거나 empty 응답이 발생했을 때 기본값을 주도록 하고 socket timeout의 값은 1ms로 극단적으로 무조건 타임아웃이 나도록 지정해 놨다.

 

그리고 두 번째 요청http://wedul.space에서 사용중인 정상적인 api를 호출하도록 하였고 socket timeout 시간도 3000ms로 아주 넉넉하게 주었고 실제로 타임아웃이 날 이유가 없다.

 

그럼 정상적인 테스트 결과라고 한다면 아래와 같이 정상적인 응답이 와야한다. (page는 무조건 5로 나오게 지정해놨다.)

5
Error Return

 

 

실제로 응답은 예상된 대로 잘 왔다. 하지만 간혈적으로 아래와 같은 readTimeout exception이 별도로 계속 떨어졌다.

2020-07-12 20:17:30.998 ERROR 12214 --- [ctor-http-nio-3] r.netty.http.client.HttpClientConnect    : [id: 0x029e73cc, L:/127.0.0.1:63695 - R:localhost/127.0.0.1:8081] The connection observed an error

io.netty.handler.timeout.ReadTimeoutException: null

 

그래서 왜 그럴까 하고 검색을 해보니 2.1.2버전에 문제가 있어서 버전업을 하면 해결된다고 들었다. 

https://stackoverflow.com/questions/56048216/spring-webflux-timeout-with-multiple-clients

 

Spring webflux timeout with multiple clients

I have a service that interacts with a couple of other services. So I created separate webclients for them ( because of different basepaths). I had set timeouts for them individually based on https://

stackoverflow.com

 

그래서 버전을 2.2.4버전으로 업데이트하고 다시 테스트 해봤다. 실제로 아까 발생했던 readTimeoutException 문제는 더 발생하지 않았다.

 

 

실제 코드가 어떤 부분이 문제였는지는 찾지 못했지만 그래도 문제는 해결되어서 다행이다.

 

테스트 코드

https://github.com/weduls/circuit_breaker_test

댓글()

스프링 부트와 RabbitMQ를 사용한 리액티브 마이크로 서비스

web/마이크로서비스|2018. 10. 4. 23:44

publisher와 subscriber가 외부의 메시지 큐(RabbitMQ)와 연결되어 있는 애플리케이션을 만들어보자.


Rabbitmq
 설치

- 자세한 설치법은 검색을 해서 찾아보면 간단하게 나온다.
- 하단의 내용은 local docker가 설치되어 있을때 docker-compose.yml을 작성할때 붙혀넣으면 된다.

rabbitmq:
  image: rabbitmq:management 
  ports:
    - "5672:5672” // 연결 포트 
    - "15672:15672” // 관리자 페이지 포트 (localhost:15672) 


Maven

의존성 추가

1
2
3
4
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
cs


Sender 클래스

Queue를 생성 해당 생성자의 파라미터는 Queue의 이름과 duration으로 구성된다. durable의 값이 true이면 서버가 재시작 되어도 큐에 내용이 살아있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Component
public class Sender {
 
    @Autowired
    RabbitMessagingTemplate template;
 
    @Bean
    Queue queue() {
        // 뒤에 durable의 경우에는 true이면 서버가 재시작 되어도 큐가 살아있다. (이름과 지속성여부 부여)
        return new Queue("TestQ"false);
    }
 
    public void send(String message) {
        template.convertAndSend("TestQ", message);
    }
 
}
cs


Receiver 클래스

RabbitListener을 통해 메시지를 받을 수 있다.

1
2
3
4
5
6
7
8
9
public class Receiver {
 
    // RabbitListener만 추가하면 메시지를 받을 수 있다.
    @RabbitListener(queues = "TestQ")
    public void processMessage(String content) {
        System.out.println(content);
    }
 
}
cs


Spring Application 설정

CommandLineRunner 인터페이스를 구현할 경우 Override하는 run메서드는 어플리케이션이 올라갈때 실행된다. 이때 Sender 객체에 send 메서드를 호출시켜 Queue에 메시지를 전송하다. 

정상적으로 전송이 완료되면 console창에 test 00!! 이라는 문구가 출력된다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@SpringBootApplication
public class MicroserviceApplication implements CommandLineRunner {
 
    @Autowired
    Sender sender;
 
    public static void main(String[] args) {
        SpringApplication.run(MicroserviceApplication.class, args);
    }
 
    @Override
    public void run(String... args) throws Exception {
        sender.send("test 00!!!");
    }
}
cs


댓글()

리액티브 마이크로 서비스 정리

web/마이크로서비스|2018. 10. 4. 01:00

동일한 마이크로서비스는 서로 공유 및 통신하게 되어있다. 

예를 들면 주문과 결재, 배송서비스들은 서로 공유되어있다. 이 서비스들의 호출 관계를 단순하게 동기 방식으로 호출하게 된다면 강한 의존성을 가지게 되기 때문에 마이크로 서비스의 강점을 충분하게 살릴 수 없다. 결국 모노토릭 서비스와 크게 다를게 없어진다.

그래서 도입되는 개념이 리액티브 마이크로 서비스이다. 
리액티브 프로그래밍은 회복성(resilient), 응답성(responsive), 메시지 기반(message driven), 탄력성(elastic) 이렇게 4가지 기둥이 존재한다. 

서로간에 마이크로서비스가 독립적으로 되어있기 때문에 특정 부분에 문제가 발생하면 해당 마이크로서비스의 복제본이 이를 대체할 수 있다. 

하지만 이렇게 격리가 되었다고 해도 서비스 사이의 통신 방식과 의존 관계가 동기 블로킹방식의 RPC로 구성된다면 서비스는 완전히 격리된 것이 아니고 서로 의존관계가 있기에 오류가 전파될 수 밖에 없다.  

그래서 서비스 사이의 통신을 비동기 논블로킹 호출을 사용하는 리액티브 스타일로 설계하는 것이 중요하다.



위에 그림을 보면 각 마이크로서비스들은 이벤트를 리스닝 하고있다. 각 서비스에서 발생한 출력값이 다른 서비스에 입력값으로 들어가게 되는데 특정 마이크로서비스의 값을 대기하는게 아니라 단지 자신의입력 큐에 이벤트가 들어오기를 기다리고 있다. 

그래서 마이크로 서비스들은 서로의 존재를 알지 못한다. 단지 이벤트 발생을 리스닝 하고 있을 뿐이다. 

이렇게 되면 한 서비스의 결과 큐를 다른 서비스의 입력 큐로 연결하는 연출(choreography)효과를 낼 수 있다.  

그렇기 때문에 동기로 호출 되기를 무작정 기다리는 블로킹보다 효율적이다.

그리고 상황에 따라서 서비스를 복제하여 여러 인스턴스로 띄워서 확장할 수 도 있다. 그러면 이벤트를 리스닝하는 서비스가 증가하고 이벤트를 받은 서비스는 각자의 역할대로 이벤트를 처리하게 되면 분산처리가 가능하다. 

이런 리액티브 마이크로서비스에는 흐름 제어가 자동으로 이루어지며 중앙에서 제어해주는 밸런서가 따로 존재하지 않는다. 

대신 메시지와 입력, 출력에 대한 마이크로서비스 사이의 관계가 주어지는데 필요에 따라 입력큐와 출력큐를 바꾸기만 하면 되기 때문에 서비스의 영향없이 운영할 수 있다. 

이런 리액티브 웹 애플리케이션을 개발할 수 있도록 스프링 부트에는 기능이 내장되어 있다. 추 후에 Spring Cloud Streams에 대해 공부해보자.


댓글()