'web'에 해당되는 글 146건

web/node.js

node.js에서 multer 사용하여 이미지 업로드 및 텍스트 파일 다루기

Multer는 파일 업로드를 위해서 사용되는 multipart/form-data를 다루기 위한 node.js 미들웨어이다. busyboy를 기반으로 하고 있다.
자세한 내용은 https://github.com/expressjs/multer/blob/master/doc/README-ko.md 이곳에서 참고하면 된다.

그럼 간단하게 multipart/form-data로 올린 이미지 파일과 텍스트파일을 request post로 받아서 처리하는 코드를 만들어보자.

우선 multer를 설치한다.
npm i multer

그리고 이미지 파일을 특정 경로에 저장해놓고 사용할 수 있지만 나는 메모리 스토리지를 사용해서 조작하는 방식으로 진행해보겠다.

multer 라이브러리를 선언하고 memoryStorage를 사용할 수 있도록 추가적으로 선언해준다.

1
2
3
const multer = require('multer');
const storage = multer.memoryStorage();
const upload = multer({ storage });
cs

그리고 router에 미들웨어로 upload.single('productImage')를 넣어서 productImage 필드로 넘어온 값을 버퍼로 가져오도록 한다. array, fields 등등 다른 메소드들도 있으나 나는 이미지가 하나라서 single을 사용했다.

1
2
3
4
5
6
7
8
9
router.post('/test', upload.single('productImg'), async (req, res, next) => {
    try {
      console.log(req.file);
      res.json(req.body);
    } catch (e) {
      next(e);
    }
  });
 
cs

그럼 postman을 통해서 데이터를 보내보자.


전송정보

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{
"method": "POST",
"url": "/info/test",
"headers": {
"authorization": "Bearer 8c6076013db8af716df89b1b48b90c9b2b0fad6c",
"content-type": "multipart/form-data; boundary=--------------------------820616490467091968093229",
"cache-control": "no-cache",
"postman-token": "217b466f-1f40-402e-b2cd-153426c61cc7",
"user-agent": "PostmanRuntime/7.6.0",
"accept": "*/*",
"host": "127.0.0.1:8081",
"accept-encoding": "gzip, deflate",
"content-length": "97764",
"connection": "keep-alive"
},
"body": {},
"query": {},
"level": "info",
"message": "Access"
}
 
cs

이렇게 전송하고 debug로 전송된 정보를 체크해보면 다음과 같이 출력된다.

이 정보를 이용해서 text 파일들은 파일대로 다루고 이미지 파일은 버퍼를 사용해서 s3에 저장을 하던지 여러가지 동작을 진행할 수 있다.

web/kafka

[번역] shared message queues와 publish-subscribe 방식에 Custom Group 방식을 더한 Kafka 소개

전통적으로 메시지 모델은 Shared Message Queue, Publish-subscribe로 구분된다. 두 가지 모델 모두 그들만에 pros and cons를 보유하고 있다. 하지만 이 두개의 모두 최초 디자인 제한 때문에 큰 데이터를 다루기에는 부족했다. Apache Kafka는 두 모델 중 publish-subscribe 메시징 모델을 구현한 모델로 부족했던 부분을 수정하고 실시간 분석을 위한 스트리밍 데이터를 처리할 수 있도록 가능해졌다. kafka는 LinkedIn에서 2010년에 방대한 데이터 처리를 위해서 개발되었다. Apache Kafka는 전통적인 메시징 모델이 달성하지 못한 격차를 해소했다. Kafka는 두 모델의 개념을 구현하여 단점을 극복하고 동시에 두 가지 방법론을 모두 통합 할 수있는 유연성을 제공한다.


Shared Message Queues
Shared Message Queues 메시지 큐는 producer에서 single consumer에게 스트리밍 데이터를 전송할 수 있다. 큐에 저장된 메시지는 한번만 읽기가 가능하고 하나의 consumer만 읽을 수 있다. subscribers는 메시지를 큐의 끝에서 메시지를 읽어서 가지고 온다. Queueing 시스템은 성공적으로 읽혀진 메시지를 큐에서 제거한다.

약점
한번 읽고 지워지는 SharedMessage Queue는 같은 도메인에 속해 있고 event-driving programming을 하는 명령어와 같은 메시지에서 적합하다. 만약 많은 consumer들이 shared queue에 접근을 하게 된다면 해당 컨슈머들은 logical domain이 같아야 하고 같은 기능을 수행해야 한다. 그렇기 때무에 shared queue는 single domain 소비로 제한된다.

Publish-Subscribe System
publish-subscribe모델은 여러 publisher가 발행이 가능하고 여러 subscriber가 구독이 가능하게 설계되어 있다. 그래서 모든 메시지는 토픽을 구독하는 모든 subscriber들에게 전송이 가능하도록 되어있다.

약점
subscriber로부터 publisher의 logical 결합이 loosely-coupled 되어 있지만 scale은 한정적이다. 각각의 subscriber는 모든 파티션으로 부터 메시지를 접근하기 위해서는 모든 파티션을 접근해야한다. 그러므로 전통적인 pub-sub 모델은 작은 네트워크에서 동작하도록 되어있다.

또한 subscriber와 publisher에 디커플링이 메시지의 신뢰도를 낮추는 영향을 준다. 모든 메시지가 모든 subscriber들에게 전송되기 때문에 메시지가 다른 subscirber에게 전송되는 경우 subscriber들 사이에 sync를 맞추는게 실질적으로 어렵다.


그럼 어떻게 Kafka는 두 모델을 결합했을까?
kafka는 shared message queue 시스템과 pub-sub 모델의 장점을 가지고 만들어졌다. 그 성공은 두개의 컨셉을 기준으로 만들어졌다.
  • consumer group 사용
  • broker들로 부터 메시지 리텐션

consumer가 그룹에 소속되고 topic을 구독할 때 오직 하나의 consumer만 그룹내에서 토픽의 메시지를 읽는다. 그리고 메시지는 broker 내부 토픽에서 사라지지 않고 보유되는데 이는 shared message queue 시스템과 다른점이다.

여러 consumer group은 같은 토픽에서 값을 읽을 수 있으며, 또한 서로 다른 logical application domain에서 다른 시간데에서도 읽을 수 있다. 그러므로 kafka는 같은 consumer group에 속한 consumer들의 높은 확정성을 제공하고 동시에 독립적인 애플리케이션들이 동작할 수 있는 이점이 있다.

Consumer Group
consumer group은 kafka가 message queue와 pub-sub 모델들의 이점을을 가질 수 있도록하는 유연성을 제공한다. 같은 그룹에 속한 consumer들은 group id를 공유한다. 이 consumer들은 토픽의 파티션을 공장하게 나눈다. 이 각각의 파티션들은 오직 그룹내에 하나의 consumer에서만 소비된다.

kafka Consumer Groups
만약 같은 그룹에 모든 consumer가 들어있으면 kafka 모델은 전통적인 message queue처럼 동작한다. 왜냐면 각각의 메시지가 하나의 consumer에게만 발행되는 부분이 같기 때문이다. 각각의 파티션은 거의 그룹내에 하나의 consumer와 연결된다.

여러 consumer group가 존재할 때 데이터 소비 모델은 전형적인 pub-sub 모델을 따른다. 메시지는 모든 consumer group에게 전송된다.

만약 하나의 consumer만 들어있는 그룹이 있으면 그 consumer가 모든 파티션을 담당한다.

이상적으로 topic의 파티션 수와 consumer group에 consumer 수가 맞으면 최적으로 효율을 가진다. 만약 consumer가 파티션보다 많으면 consumer들이 idle상태에 빠지게 되므로 자원 낭비가 발생된다. 만약 partition이 consumer보다 많은 경우 consumer들은 여러 파티션에서 값을 있는데 이는 각 파티션에서 읽는 값이 서로 순서가 맞지 않게 읽게 되기 때문에 문제의 소지가 있다. kafka는 파티션 사이에서 메시지의 순서를 보장하지 않는다. 그러므로 kafka는 오직 하나의 consumer가 하나의 파티션의 내용을 구독할 때만 순서가 보장된다. 메시지는 또한 processing중에 그룹화된 키를 통해서 정렬될 수 있다.

kafka는 offset commit과 form을 사용하여 브로커에서 구독 그룹으로 메시지가 전송되었느지 보증한다. 파티션은 consumer그룹내에 오직 하나 또는 하나 이상의 관계를 consumer와 맺을 수 있기 때문에 메시지 중복을 피하기 위해서 한번에 그룹내에서 한번에 하나의 그룹에게만 메시지를 전송한다.

Reblancing
consumer그룹이 scales up & down을 하기 때문에 동작중인 consumer들은 파티선을 그 들 사이에서 쪼갠다. Reblancing은 consumer와 broker의 충돌 또는 topic이나 partition 추가로 인해 파티션과 consumer의 소유권이 변경되면서 진행된다. 이 매커니즘을 이용하여 안전하게 시스템으로 부터 consumer의 추가와 제거가 가능하다.
-> 요약하면 consumer에 문제가 발생하면 다른 consumer가 파티션의 메시지를 받는다 이런 것 같다.

kafka가 시작되면서 브로커는 consumer의 RegisterConsumer 요청을 수신한 consumer group의 하위 집합에 대한 코디네이터로 표시되고 소유해야할 파티션 목록이 포함된 RegisterConsumer Response를 반환한다. 또한 코디네이터는 컨슈머가 살아있거나 죽었는지 체크하고 결함을 찾기위해 동작을 시작한다. 또한 세션이 끊어지기 전에 consumer가 코디네이터 브로커에게 heartbeat 신호 전송에 실패하면 코디네이터는 해당 consumer를 dead로 표시하고 rebalance를 실행한다. 세션 타임아웃 시간은 session.timeout.ms 속성에서 설정할 수 있다. 예를들어 그룹 A의 C2가 실패 했으면 C1그리고 C3가 짧게 자기들의 파티션에서의 메시지 소비를 정지하고 파티션들은 C1과 C2에 대해서 reassian된다. C2 consumer는 잃게 되지만 rebalancing 프로세스가 실행되고 파티션이 그룹내에 다른 consumer들에게 재 할당된다. GroupB는 Group A의 이런 현상이 전혀 영향을 주지 않는다.

결론
shared message queue는 메시지 프로세스내에서 scale을 허용한다 하지만 싱글 도메인에서만 사용이 가능하다. pub-sub 모델은 consumer들에게 브로드캐스팅을 지원하지만 scale이 제한된다. kafka는 shared message queue에서 scale을 가져왔고 pub-sub 아키텍쳐를 consumer gorup를 구현함으로써 단점을 보안하여 재 구현하여 가져왔다. consumer group를 구현함으로써 scale과 멀티 도메인을 사용할 수 있게 되었다. 그리고 kafka에 rebalacing을 통해서 그룹내에서 문제 발생시 문제를 해결할 수 있다.

원문
https://blog.cloudera.com/blog/2018/05/scalability-of-kafka-messaging-using-consumer-groups/




web/Spring

Spring Boot에서 6.4 Elasticsearch 연결 및 간단 CRUD

Elasticsearch를 Spring Boot에서 작업을 하는 간단한 정리를 해보자.


1. Library 추가

Elasticsearch를 사용하기 위해서는 spring-data-elasticsearch 라이브러리가 추가되어야 한다. 

gradle에 추가해보자.

1
2
3
4
5
6
7
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-elasticsearch'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    compileOnly "org.projectlombok:lombok:1.16.16"
}
 
cs


spring-data-elasticsearch 버전별로 호환되는 elasticsearch가 상이하니 참고

spring data elasticsearchelasticsearch
3.2.x6.5.0
3.1.x6.2.2
3.0.x5.5.0
2.1.x2.4.0
2.0.x2.2.0
1.3.x1.5.2


2. Configuration

Elasticsearch에 접속하기 위한 Configuration을 정의해준다.

Elasticsearch  접속을 위해서는 host, port, cluster name이 필요하다. cluster name을 알아야 하는데 docker에 설치 한 경우 여기서 확인하면 된다.

우선 docker exec -it elastic bash로 콘솔에 접속한 후에 elasticsearch.yml에 적혀있는 cluster name을 확인한다.

그리고 application.properties에 설정 내용을 적어준다.

1
2
3
4
elasticsearch.host=127.0.0.1
elasticsearch.port=9300
elasticsearch.cluster_name=docker-cluster
spring.main.allow-bean-definition-overriding=true
cs

그리고 EnableElasticsearchRepositories 애노테이션을 설정한 Configuration 클래스를 만들어준다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.elasticsearch.study.configuration;
 
import org.springframework.beans.factory.annotation.Value;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
 
import java.net.InetAddress;
 
/**
 * Elasticsearch Configuration
 *
 * @author wedul
 * @since 2019-02-09
 **/
@EnableElasticsearchRepositories
@Configuration
public class ElasticConfiguration {
 
  @Value("${elasticsearch.host}")
  private String host;
 
  @Value("${elasticsearch.port}")
  private int port;
 
  @Value("${elasticsearch.cluster_name")
  private String clusterName;
 
  @Bean
  public Client client() throws Exception {
    Settings settings = Settings.builder().put("cluster.name", clusterName).build();
 
    TransportClient client = new PreBuiltTransportClient(settings);
    client.addTransportAddress(new TransportAddress(InetAddress.getByName(host), port));
    return client;
  }
 
  @Bean
  public ElasticsearchOperations elasticsearchTemplate() throws Exception {
    return new ElasticsearchTemplate(client());
  }
 
}
 
cs


3. DTO 생성

Elasticsearch에서 Document 내용을 담을 DTO를 만들어주고 @Document 애노테이션을 달고 index name과 type을 정의해준다.

@Id 어노테이션이 붙은 필드는 각 Doucument에 붙어있는 _id 값이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.elasticsearch.study.dto;
 
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
 
/**
 * studyFor
 *
 * @author wedul
 * @since 2019-02-09
 **/
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Document(indexName = "wedul_play", type = "story")
public class WedulPlay {
 
  @Id
  private String id;
  private String title;
  private String user;
  private long startAt;
  private long endAt;
 
}
 
cs


4. Repository

JPA를 사용하면 익숙할 패턴으로 Elasticsearch에서도 ElasticsearchRepository가 존재한다. 사용방법은 JPA와 동일하게 저장할 때는 save, 조회할 때는 find(), findByUser()등으로 사용할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.elasticsearch.study.repository;
 
import com.elasticsearch.study.dto.WedulPlay;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;
 
/**
 * study
 *
 * @author wedul
 * @since 2019-02-09
 **/
@Repository("wedulPlayRepository")
public interface WedulPlayRepository extends ElasticsearchRepository<WedulPlay, String> {
 
  WedulPlay findByUser(String user);
  
}
 
cs


5. Service

지금 테스트 하는 부분에서는 크게 비즈니스 로직에 들어갈 소스가 없다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.elasticsearch.study.service;
 
import com.elasticsearch.study.dto.WedulPlay;
import com.elasticsearch.study.repository.WedulPlayRepository;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Service;
 
import java.util.List;
 
/**
 * study
 *
 * @author wedul
 * @since 2019-02-09
 **/
@AllArgsConstructor
@NoArgsConstructor
@Service
public class WedulPlayService {
 
  private WedulPlayRepository wedulPlayRepository;
 
  public void save(WedulPlay play) {
    wedulPlayRepository.save(play);
  }
 
  public List<WedulPlay> findAll() {
    return Lists.newArrayList(wedulPlayRepository.findAll());
  }
 
  public WedulPlay findByUser(String user) {
    return wedulPlayRepository.findByUser(user);
  }
 
}
 
cs


6. Test 코드

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package com.elasticsearch.study.wedulplay;
 
import com.elasticsearch.study.dto.WedulPlay;
import com.elasticsearch.study.repository.WedulPlayRepository;
import com.elasticsearch.study.service.WedulPlayService;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
 
import java.util.List;
 
import org.hamcrest.core.IsNull;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
 
/**
 * wedul play document 조회
 *
 * @author wedul
 * @since 2019-02-09
 **/
@RunWith(SpringRunner.class)
@SpringBootTest
public class WedulPlayTest {
 
  WedulPlayService wedulPlayService;
 
  @Autowired
  @Qualifier("wedulPlayRepository")
  WedulPlayRepository wedulPlayRepository;
 
  @Before
  public void setup() {
    wedulPlayService = new WedulPlayService(wedulPlayRepository);
  }
 
  @Test
  public void whenValidParameter_thenSuccessFind() {
    List<WedulPlay> list = wedulPlayService.findAll();
 
    assertNotNull(list);
  }
 
  @Test
  public void whenValidParameter_thenSuccessSave() {
    Exception ex = null;
 
    try {
      wedulPlayService.save(WedulPlay.builder().title("안녕 이건 테스트야").user("위들").startAt(1242421424).endAt(23214124).build());
    } catch (Exception exception) {
      ex = exception;
    }
 
    assertTrue(null == ex);
  }
 
  @Test
  public void whenValidParameter_thenSuccessFindByUser() {
    Exception ex = null;
 
    try {
      WedulPlay play = wedulPlayService.findByUser("위들");
 
      assertThat(play, is(IsNull.notNullValue()));
    } catch (Exception exception) {
      ex = exception;
    }
 
    assertTrue(null == ex);
  }
 
 
}
 
cs


설정이 간단하다. 

나중에 이용해 먹어야지


자세한 소스코드는 여기 참조

https://github.com/weduls/spring_elastic

web/node.js

Nginx, Apache 그리고 node.js 성능 관련 잡다한 정리

Nginx과 Apache 비교


 

 Nginx

Apache 

특징 

- Nginx는 싱글 스레드 Event driven 방식

- 미리 설정된 worker 프로세스 안에서 요청이 들어올 때 마다 요청을 분배하여 worker에게 역할을 분배

- 기존에 정해놓은 리소스를 사용하기 때문에 CPU, Memory 등의 자원 사용률이 낮음

- 요청이 올 때마다 쓰레드를 생성하여 할당한다. 작업이 많아질 경우 많은 쓰레드할당이 필요하다. 그리고 쓰레드들이 작업을 진행 할 때마다  CPU를 사용하려 하기 때문에 문맥교환이 자주 발생된다. 

차이점 

 - Apache에 경우 Blocking 방식으로 Network, DB 등 별도의 동작이 진행 될 때 Block되지만 Nginx는 Non-blocking 방식을 지원함


Non-blocking, Event Loop 방식의 Node.js


- Event Loop 방식은 요청이 오면 무조건 이벤트 핸들러에게 요청을 넘김. 그 것이 처리되면 완료되었다고 연락이온다. 그래서 Single Thread Non-blocking 방식이 가능

- Single Thread이기 때문에 Multi thread의 자원에 대한 크리티컬 섹션에 대한 문제가 발생하지 않음.

- Node.js는 Single Thread를 사용하기 때문에 CPU에 문맥교환이 발생하지 않아서 Multi Thread를 지원하는 프레임워크보다 무조건 빠를다고 하지만 그렇지는 않다. Node.js에서 요청은 비동기를 지원하지만 내부적으로 Event를 처리할 때는 libio의 Thread Pool에 의해 동작하기 때문에 I/O 작업이 많을 때는 Multi Thread 방식이 유리하다.(Node.js는 메시지 처리와 같이 I/O가 적은 작업이 어울림.) 

- Single Thread이기 때문에 CPU를 많이 잡아먹는 연산일 경우 다른 작업도 모두 느려지기 때문에 연산이 오래걸리는 작업에는 어울리지 않음.

- Node.js에 특성상 오류가 발생하면 프로그램이 죽어버림

- Single Thread Async를 지원하고 있지만 네트워크, DB와 같은 I/O가 발생할 때는 libio의 Thread Pool에 작업이 전달되고 완료되면 EventQueue에 CallBack 함수에 전달된다.


=> 결론 

Single Thread를 사용. 비동기지만 I/O 발생 시 내부적으로 비동기처럼 동작하기 위해 쓰레드 풀을 사용하여 진행한다. 그래서 요청이 많을 경우 문맥교환이 일어나는 Multi Thread보다 오히려 성능이 나쁠 수 있다.


Nginx와 Node.js

Nginx와 Node.js를 사용할 때 성능적으로 도움이 되는 설정을 정리했다.


Port Range

- Nginx에서 Node.js에 요청을 보낼 때는 2개의 TCP 소켓이 필요하다.(2개의 포트가 필요) 그래서 가용 가능한 포트가 적을 경우에 요청을 받지 못하는 문제가 발생한다. 그래서 상황에 따라서 sysctl의 net.ipv4.ip_local_port_range를 사용하여 설정한다.


Time Wait

- Time wait는 커넥션이 종료 되었으나 할당된 연결이 아직 release가 되지 않은 자원이다. 그래서 요청이 와도 자원을 사용할 수 없어서 문제가 발생할 수 있어서 이를 사용할 수있게 해줘야 한다. sysctl의 net.ipv4.tcp_tw_reuse값을 1로 변경해줘야 한다.


Time Wait 시간

- Time Wait 자원을 사용할 수 있는 자원으로 변경되는 시간을 줄여서 빠르게 사용할 수 있게 해주는 것이 좋다. sysctl의 net.ipv4.tcp_fin.timeout값을 줄이면 된다.


Context Switch 

Node.js에서 사용중인 CPU 코어가 다른 프로세스에서 사용되지 않도록하면 성능 향상에 도움이 된다. 여러 프로세스에서 하나의 코어를 같이 사용할 수 있다. 각각의 프로세스는 돌아가면서 코어 스케줄대로 이용한다. 이 경우 문맥교환이 많이 발생하여 CPU 부하가 증가한다. 그렇기 때문에 node에서 사용하는 CPU 코어는 다른 업무를 못하게하여 서로 코어를 잡아먹으려 하는 문맥교환을 줄여야한다.

※ Context switch

- 문맥 교환(Context Switch)이란 하나의 프로세스가 CPU를 사용 중인 상태에서 다른 프로세스가 CPU를 사용하도록 하기 위해, 이전의 프로세스의 상태(문맥)를 보관하고 새로운 프로세스의 상태를 적재하는 작업을 말한다.


현재 Connection 상태를 확인하는 명령어

netstat -tan | awk '{print $6}' | sort | uniq -c


Nginx에서 Upstream이란?

Nginx 설정에서 Upstream이 있다. 여기서 Upstream은 순차적으로 서비스를 처리하기 위해 사용되는 서버를 의미한다. Nginx에서 내장된 Upstream 모듈은 설정된 서버들의 부하분산, 속도 개선을 담당한다. 일반적으로 설정된 서버의 Upstream은 라운드 로빈 방식으로 진행된다.


※ 라운드 로빈

라운드 로빈 스케줄링(Round Robin Scheduling, RR)은 시분할 시스템을 위해 설계된 선점형 스케줄링의 하나로서, 프로세스들 사이에 우선순위를 두지 않고, 순서대로 시간단위(Time Quantum)로 CPU를 할당하는 방식의 CPU 스케줄링 알고리즘이다.


web/Spring

kafka docker에 간단 설치 후 Spring boot 연동 테스트

간단하게 Kafka 설치

docker-compose.yml 생성 후 docker-compose up -d를 통해 설치

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: wedul.pos
      KAFKA_CREATE_TOPICS: "test:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
cs


설치된 프로세스 확인


생성된 토픽 확인

- test 토픽이 파티션 1와 replication 1로 생성되었는지 여부 확인

1
2
3
$ docker exec -it simple_kafka_1 bash
$ cd /opt/kafka/bin
$ kafka-topics.sh --describe --topic test --zookeeper simple_zookeeper_1
cs


Spring Boot 프로젝트 생성


카프카 설정

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.kafka.study.configuration;
 
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
 
import java.util.HashMap;
import java.util.Map;
 
/**
 * 카프카 설정
 *
 * @author wedul
 * @since 2019-01-24
 **/
@Configuration
@EnableKafka
@PropertySource("classpath:kafka.properties")
public class KafkaConfiguration {
 
  @Autowired
  private Environment env;
 
  private Map<String, Object> producerConfig() {
    Map<String, Object> config = new HashMap<>();
 
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 
    return config;
  }
 
  @Bean
  public KafkaTemplate<StringString > kafkaTemplate() {
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfig()));
  }
 
}
 
cs


카프카 컨트롤러

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.kafka.study.ctrl;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
 
/**
 * study
 *
 * @author wedul
 * @since 2019-01-24
 **/
@RestController
@Slf4j
public class KafkaCtrl {
 
  private final KafkaTemplate kafkaTemplate;
 
  public KafkaCtrl(KafkaTemplate kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }
 
  @PostMapping("/send")
  public ResponseEntity<String> sendMessage(String message) {
    if(!StringUtils.isEmpty(message)) kafkaTemplate.send("test""Message is " + message);
    log.info(message);
    return ResponseEntity.ok("");
  }
}
 
cs


Console-consumer 모니터링 모드

카프카에 메시지를 send 했을 때 모니터링하기 위한 모드 스크립트 실행

1
bash-4.4# kafka-console-consumer.sh --bootstrap-server wedul.pos:9092 --topic test
cs


실행해보면 콘솔에 메시지가 전송된것을 확인할 수 있다.


Kafka Licenser 설정 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.kafka.study.configuration;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
 
/**
 * study
 *
 * @author wedul
 * @since 2019-01-25
 **/
@Slf4j
@Component
public class ReceiveConfiguration {
 
  @KafkaListener(topics = "test", groupId = "console-consumer-1970")
  public void receive(String payload) {
    log.info("received payload='{}'", payload);
  }
 
}
 
cs


보내면 바로 consumer에서 메시지를 받을 수 있도록 리스너를 설정해보자.

그리고 테스트!

1
2
3
4
5
2019-01-25 00:09:43.033  INFO 1760 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 2.0.1
2019-01-25 00:09:43.034  INFO 1760 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fa14705e51bd2ce5
2019-01-25 00:09:43.041  INFO 1760 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : Cluster ID: 8gNzLx__SHq-4p0b_WsydA
2019-01-25 00:09:43.047  INFO 1760 --- [nio-8080-exec-1] com.kafka.study.ctrl.KafkaCtrl           : babo
2019-01-25 00:09:43.069  INFO 1760 --- [ntainer#0-0-C-1] c.k.s.c.ReceiveConfiguration             : received payload='Message is babo'
cs


git 저장소 : https://github.com/weduls/kafka_example

푸터바

알림

이 블로그는 구글에서 제공한 크롬에 최적화 되어있고, 네이버에서 제공한 나눔글꼴이 적용되어 있습니다.

카운터

  • Today : 36
  • Yesterday : 651
  • Total : 55,511