web 170

kafka cluster replica partition reassignment

kafka replica kafka는 내장애성을 위해 복제본을 만든다고 한다. 그래서 안정적인 고가용성을 유지하기 위해 일정 수준의 replica partition이 살아있지 않은 경우 ISR(In-Sync Replica)이 복구 될 때까지 데이터를 쓸 수가 없는 상태가 된다. 만약 replica partition이 있는 broker가 down 될 경우에 새로운 broker를 클러스터에 추가하게 되면 broker와 함께 죽어버린 replica partition이 자동으로 assign 되면서 생성될까? 확인 결과 그렇지 않다고 한다. 하둡이나 ES처럼 자동으로 새로운 노드, broker가 붙는다고 해서 auto assign을 하지 않는다고 한다. (엄청 불편하군) 그럼 실제로 broker가 죽었다고 생각하..

web/kafka 2021.03.20

Kafka cluster에서 topic 지우기

카프카 토픽을 지우기 위해 kafka-topic command를 사용해서 시도했다. [root@f6ed7547e36f /]# kafka-topics --delete --zookeeper centos1:2181,centos2:2181,centos3:2181 --topic wedul Topic wedul is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true. for deletion을 위함 marked가 되었다는 내용가 delete.topic.enable을 true로 하지 않으면 실제 효과가 없다는 알림을 받았다. 이에 /etc/kafka/server.properties에 있는 delete...

web/kafka 2021.03.20

Kafka Connect 정리

Kafka Connect - kafka에서 외부로 데이터로 출력을 하거나 kafka로 데이터를 입력하도록 도와주는 도구 - kafka connect는 kafka connect와 플러그인 형태로 되어 있으며 데이터를 producer를 통해 넣는 부분을 source라고 하고 consumer 쪽으로 보내는 곳을 sink라고 한다. - 공개된 플러그인이 있으며 sink, source 양쪽 모두 공개된 플러그인만 사용하는게 좋다. (신뢰도가 떨어지는 플러그인은 버그가..) - kafka connect는 브로커와 동일한 서버에서 동작할 수 있기 때문에 kafka 클러스터와 kafka connect 클러스터를 함께 구성도 가능 - kafka connect 클러스터에 source, sinke의 플러그인으로 데이터를 입..

web/kafka 2021.03.14

docker에서 Kafka + zookeeper 클러스터 구성하기

설명 주키퍼와 브로커가 한 서버에 위치하는 구조의 클러스터를 구성해보자. Docker #network - docker container가 서로 통신할 수 있도록 bridge driver형태의 네크워크 생성 sudo docker network create mynetwork #centos docker container 생성 - 일반 centos 이미지를 사용할 경우 systemd command가 정상동작하지 않기 때문에 centos/systemd 이미지를 사용할 것 docker run --privileged -d --name centos1 --network mynetwork -P centos/systemd init docker run --privileged -d --name centos2 --network ..

web/kafka 2021.02.22

Spring Webflux url length 제한 413 에러 해결 방법

프로젝트에서 사용되는 특정 api에 request param의 길이가 길어지는 api가 현재 구조상 어쩔 수 없이 존재하게 되었다. 그렇게 조심스럽게 운영하다가 request param의 길이가 너무 길어져서 api에서 413에러를 내뱉으면서 결과를 반환 받지 못하는 문제가 생겼다. 우선 nginx에 있는 request param 길이 관련 설정이 문제를 유발하는지 확인해봤다. 우선 테스트를 위한 api를 하나 만들었다. 단순하게 요청을 받고 바로 반환하는 api package com.wedul.reactivetest; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annot..

web/Spring 2020.12.26

WebFlux에서 Mono, Flux에 Map 또는 flatMap을 사용할 때 null을 리턴하는 경우

WebFlux로 구성되어 있는 프로젝트에서 Mono, Flux stream에서 map이나 flatMap을 사용해서 특정 데이터를 매핑하는 과정에서 특정한 경우에 대해서 null을 리턴하고 다음 파이프라인에서 filter로 Objects의 nonNull을 사용해서 컨텐츠를 필터링 하려고 했다. 작성하려고 한 코드의 일부 예시를 만들어서 작성해봤다. 만약 resultData를 통해 전달받은 데이터가 예상 대로라면 map과정에서 500보다 큰 5189값만 정상 반환하고 나머지는 null을 반환한 후 filter를 통해 정상적으로 하나의 데이터만 남을 것이라고 예상했다. @Test @DisplayName("map 과정에서 반환된 null이 정상적으로 필터링 되었는지 확인하는 테스트") void mono_null..

web/Spring 2020.10.30

Spring Boot version 2.1.x에서 2.2.x (spring frame 5.2 이상)으로 버전업 진행 시 Spring Cloud AWS SnsAutoConfiguration 에서 TypeNotPresentExceptionProxy 가 발생하며 실행안되는 문제

문제 발생 평소 문제가 많았던 webflux 부분 수정을 위해 spring boot 2.1.3에서 2.2.7버전으로 업그레이드를 진행하기 위해 gradle에서 spring boot version을 2.2.7로 변경하고 애플리케이션을 실행 시켰다. 그런데 평소에는 자주본적이 없던 TypeNotPresentExceptionProxy 에러가 발생했다. org.springframework.beans.factory.BeanDefinitionStoreException: Failed to process import candidates for configuration class [com.baemin.bmart.search.BmartSearchAdminApplication]; nested exception is java..

web/Spring 2020.08.21

Spring reactor 2.1.2 (netty 0.8.4) Mono.zip readTimeoutException 문제

Mono zip 각 Mono 처리 스레드를 병렬로 실행하고 이를 묶어서 사용할 수 있는게 Mono.zip이다. 근데 Mono zip에서 병렬로 실행되는 작업 중 하나가 empty 또는 error가 발생 되면 바로 error 또는 complete를 내뱉게 되어있다. 하지만 각 Mono 구독 작업에 error와 empty 발생 시 문제에 대해 fallback 처리를 해주면 에러가 발생하더라도 그 로직을 타게 되어있다. 하지만 2.1.2(netty 0.8.4) 버전을 사용하고 있을 때 호출 체인에서 첫 번째 요청의 실패 이후에 두 번째 요청이 정상적으로 이루어 지지 않아서 readTimeout이 발생되는 문제를 경험하였습니다. 이 문제를 해결하기 위해서 알아보던 중 2.1.2버전에 문제가 있는 것을 알게 되어..

web/Spring 2020.07.12

Spring boot2 resilience4j를 이용한 circuit breaker 사용

fault tolerance library (장애 허용 시스템) fault tolerance library는 무엇인가? 간단하게 이야기해보자. MSA 환경에서 한 개의 서비스에서 다른 api를 호출 할 때 일시적으로 에러가 발생하고 있다고 가정해보자. 만약 이 시기에 요청이 계속 들어오면 계속 500에러를 내보내게 된다. 그럼 사용자들은 이 서비스에 대해서 신뢰를 잃어 버리게 되고 안좋은 인식을 만들 수 있다. 그래서 특정 api 호출과 같은 작업에 에러가 발생했을 때, 그 횟수를 정해놓고 그 횟수 이상 에러를 초과하면 기존에 설정해 놓은 fallback에 맞게 동작하게 하고 일정 시간 후에 다시 시도하여 진행하는 등에 작업이 필요하다. 이게 바로 fault tolerance library (장애 허용 ..

web/Spring 2020.02.23 (1)

spring cloud resilience4j 사용시 CircuitBreakerConfiguration 에러

CircuitBreaker 테스트를 위해서 Resilience4j를 사용했다. 버전은 1.3.0을 사용하려고 했다. //Resilience4J compile("io.github.resilience4j:resilience4j-spring-boot2:1.3.0") compile("io.github.resilience4j:resilience4j-reactor:1.3.0") compile("io.github.resilience4j:resilience4j-timelimiter:1.3.0") 그런데 분명 1.3.0을 사용한다고 명시하였고 gradle도 clean하고 사용하는 denpendency도 확인하였는데 계속해서 다음과 같이 1.1.0 라이브러리를 사용하려고 해서 문제가 발행했다. Cannot resolve ..

web/Spring 2020.02.23