Elasticsearch의 노드 

Elasticsearch의 인스턴스를 시작하는 동시에 노드도 같이 시작된다. 노드들을 연결해놓은 것을 클러스터라고 한다.

만약 하나의 엘라스틱 서치 노드만을 실행시킨 경우도 하나의 노드를 가진 클러스터라고 한다. 

클러스터안에서 모든 노드는 HTTP와 Transport 트래픽을 기본적으로 다룬다. Transport 레이어는 오로지 노드들과 Java TransportClient와의 통신에만 사용된다. Http 레이어는 오직 외부 Rest Cliente들과 통신할 때 사용된다.

모든 노드는 클러스터 안에서 서로 다른 노드들에 대하여 알고 있고 client에 요청을 적적한 노드로 향하게 조절해준다. 기본적으로 노드는 master-eligible, data, ingest, machine learning이 존대한다.

 

Elasticsearch의 노드 종류

Master-eligible 노드

- node.master를 true로 지정하며 클러스터의 컨트롤을 통해 마스터 노드로 선택될 자격을 가지게 된다.

- 마스터 노드는 클러스터에서 인덱스를 만들고 지우는 행위, 클러스터에서 노드들을 트래킹하고 각각의 노드를 샤드를 할당할건지 결정한다.

- Masger Eligible Node등에서 마스터 노드는 마스터 설출 프로세스를 통해 선출된다.

(https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery.html)

- 마스터 노드는 데이터 노드처럼 데이터와 폴더에 접근 권한이 있어야 한다. 노드가 재시작 하는 사이에도 클러스터 상태가 유지되어야 하기 때문에 접근이 가능해야 한다.

- 데이터를 인덱싱하고 찾고 하는 작업은 CPU, Memory, I/O 자원을 많이 사용하기 때문에 큰 규모에서는 data node와 master node를 구별한다.

- 마스터 노드도 Coordinating Node 처럼 데이터를 routing하고 모으고 하는 작업이 가능하지만 이는 마스터 노드가 하는 주 목적이 아니다. 안정적인 마스터 노드를 운영하기 위해서는 자기 일만 딱 하게 해주는 것이 좋다.

- 마스터 노드로 노드를 지정하기 위한 기본 설정 값은 다음과 같다.

node.master: true 
node.data: false 
node.ingest: false 
node.ml: false 
xpack.ml.enabled: true 
cluster.remote.connect: false

 

Data 노드

- node.data가 true로 지정된 노드는 데이터를 가지고 있을 수 있고 CRUD, 검색, aggregation 등의 데이터와 관련된 작업이 가능하다.

- 데이터 노드는 인덱싱 된 Document를 포함하고 있는 샤드를 관리한다.

- 데이터 노드는 데이터를 직접적으로 다루기 때문에 리소스 자원이 많이 필요하다.

- 데이터 노드로 노드를 지정하기 위한 기본 설정 값은 다음과 같다.

node.master: false 
node.data: true 
node.ingest: false 
node.ml: false 
cluster.remote.connect: false

 

Ingest 노드 

- node.ingest가 true로 지정된 노드가 Document가 인덱싱 되기 전에 변형되고 풍성하게 하기 위해서 Document를 ingest pipeline으로 적용할 수 있다.

- ingetst node는 pre processing 파이프라인을 실행하고 하나 또는 하나 이상의 ingest processor들을 모으는 작업을 한다.

- ingest를 로드하는건 무겁기 때문에 데이터나 마스터 노드에서는 node.ingest를 false로 지정하는 것이 좋다.

- 많은 리소스를 잡아먹기 때문에 ingest node는 별도로 지정하는 것이 좋다.

- ingest 노드로 노드를 지정하기 위한 기본 설정 값은 다음과 같다.

node.master: false 
node.data: false 
node.ingest: true 
node.ml: false 
cluster.remote.connect: false

 

Machine Learning 노드

- xpack.ml이 true로 지정되어 있고 node.ml이 true로 설정되어 있는 노드는 기본적으로 엘라스틱서치에서 분배하는 행위를 한다.

- 만약 머신러닝 특징을 사용하고 싶으면 적어도 클러스터 내에 하나의 머신러닝 노드가 있어야 한다.

 

Coordinating 노드

- 검색 요청과 bulk indexing과 같은 요청들은 다른 노드들의 있는 데이터를 많이 다룬다.

- 데이터가 흩어져 있는 경우 데이터가 있는 노드로 향하게 조정해준다. 각각의 데이터 노드는 요청을 자체적으로 처리하고 그것의 값을 Coordinating 노드에 전달해준다. 그럼 Coordinating 노드는 이를 모아서 하나의 데이터 형태로 정제하여 반환한다.

- 각각의 노드는 Coordinating node가 될 수 있다. 대신 node.master, node.data, node.integer가 false로 되어 있어야 한다.

- 또한 데이터를 모으고 조작하고 하는 작업이 많기 때문에 Coordinating 노드는 메모리랑 CPU에 대한 자원이 많아야 한다. 그렇기 때문에 오직 요청을 라우팅하고 검색 구절을 조절하고, bulk indexing 분배작업을 하는 노드로만 사용하는 게 좋다.

- Coordinating node는 본질적으로 로드 밸런싱 같은 역할을 한다.

- Coordinating node로 지정하기 위한 설정은 다음과 같다.

node.master: false 
node.data: false 
node.ingest: false 
node.ml: false 
cluster.remote.connect: false

 

Node Data Path Setting

path.data

- 모든 데이터와 master-eligible 노드는 샤드 그리고 인덱스, 클러스터 메타데이터가 저장되어 있는 데이터 디렉토리를 접근한다.

- path.data는 기본적으로 $ES_HOME/data로 지정되어 있지만 elasticsearch.yml을 통해서 바꿀 수 있다.

배치를 이용해서 Elasticsearch에 데이터를 삽입하던 중 version conflict라는 오류가 자주 발생했다. 처음에는 Elasticsearch 버전이 동일한데 왜? 오류가 나는지 몰랐다.

그래서 검색해보니 인덱스안에 document에는 각자 관리하는 version이 존재한다. 이 version은 document가 수정될 때 하나씩 올라가게 되는데 version이 10인 상태에 document에 여러 서버 모듈에서 해당 document에 업데이트를 하려고 하니 문제가 발생하였다.

그 이유는 version 10인 상태에서 작업에 들어간 두 모듈은 한 모듈이 먼저 11로 업데이트를 시키고 다음 모듈이 작업을 진행하려고 할 때 자기가 알고 있던 마지막 version인 10이 아니라 11로 바껴있는것을 보고 에러를 뱉어내는것이다. 이렇게 까지 세심하게 챙겨줄지 몰랐다. 알면 알수록 elasticsearch라는 db는 정말 매력적이다.

PUT wedul_index 
{
  "mappings": {
      "_doc": {
        "dynamic": "false",
        "properties": {
          "name": {
            "type": "text"
          }
        }
      }
  }
}

위와 같이 인덱스가 있고 document 하나가 들어있다. 여기에 age라는 값과 gender를 집어넣어보자. 이를 동시에 호출해보자.

document

그럼 document 하나에 필드를 동시에 업데이트하는 update.sh라는 스크립트를 만들어서 실행시켜보자.

curl -X POST "localhost:9200/wedul_index/_update_by_query" -H 'Content-Type: application/json' -d' { "script": { "source": "ctx._source[\u0027gender\u0027] = \u0027M\u0027"}, "query": { "match": { "name": "위들" } } } ‘
curl -X POST "localhost:9200/wedul_index/_update_by_query" -H 'Content-Type: application/json' -d' { "script": { "source": "ctx._source.age = 10", "lang": "painless" }, "query": { "match": { "name": "위들" } } } ‘

그럼 위에 설명했던 것 처럼 버전이 먼저 변경이 되면서 다음과 같은 에러를 뱉어낸다.

[{"index":"wedul_index","type":"_doc","id":"3MSd5WsB_jV9Cf9TkYLV","cause":{"type":"version_conflict_engine_exception","reason":"[_doc][3MSd5WsB_jV9Cf9TkYLV]: version conflict, current version [3] is different than the one provided [2]","index_uuid":"sJI8sBnrTP-OW8OG8YBqWA","shard":"3","index":"wedul_index"},"status":409}]

 

이를 해결하기 위해서는 retry_on_conflict 옵션을 함꼐 부여할 수 있는데 이 옵션은 version conflict이 발생했을 때, 업데이트 재시도를 몇회 할건지 지정하는 옵션이다.

좀 더 자세한 사항은 아래 elasticsearch 메뉴얼을 보면 자세히 나와있다.

참조
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html

Elasticsearch reindex를 진행할 때, 단순하게 새로운 인덱스를 만들고 reindex api를 진행하고 기존 인덱스를 지우고 새로 만들어서 다시 reindex를 해줬다. (이전글: https://wedul.site/611?category=680504)

하지만 그것은 해당 인덱스의 document의 수가 적어서 금방 진행이 되었었고 만약 document수가 10만가지만 넘어도 생각보다 오래걸려서 서비스의 흐름이 끊어지게 된다는걸 인지하지 못했다. 같은 회사 동료분께서 해당 부분에 대해서 말씀해주셨고, 그 분이 가이드 해주신대로 진행해서 reindex를 무중단하게 진행하는 방법을 찾아봤다.

 

Alias를 이용하여 reindex하기


기존 index wedul의 매핑구조이다.

PUT wedul 
{
  "mappings": {
    "dynamic": false,
    "properties": {
      "name": {
        "type": "text"
      }
    }
  }
}

해당 인덱스의 데이터는 현재 다음과 같이 들어있는 것을 볼 수 있다. 여기서 age는 매핑이 안되어있어서 검색에 잡을 수 없기에 이를 reindex를 통해 매핑 정보를 업데이트해주자.

wedul 인덱스에 들어있는 데이터(왼), age로 검색이 안됨 (우)

그럼 reindex를 위해 새로운 인덱스 wedul_v1을 만들어보자.

reindex를 진행할 새로운 index, wedul_v1

그리고 wedul_v1으로 reindex를 실행해준다. 이때 주의사항이 있는데 document양이 10만 이상이 넘어가게 되면 작업이 오래걸리기에 kibana에서 504 gateway timeout이 발생하고 작업이 중단된다. 그래서 해당 작업을 비동기로 실행시키는 옵션인 wait_for_completion=false를 함께 설정해주고 진행해야한다.

POST _reindex?wait_for_completion=false
{
  "source": {
    "index": "wedul"
  },
  "dest": {
    "index": "wedul_v1"
  }
}

그럼 위에 이미지처럼 task 프로세스 번호가 나오고 이 프로세스에 시작시간 상태 취소 가능여부 등등을 GET _task 명령어를 통해 볼 수 있다. 여기서 프로세스가 종료되면 reindex가 다 된것이다.

그 다음 wedul_v1에 wedul이라는 alias를 지정해줘야한다. 

POST _aliases
{
  "actions": [
    {
      "add": {
        "index": "wedul_v1",
        "alias": "wedul"
      }
    }
  ]
}

alias를 지정하기 전에 기존 인덱스 wedul을 지워줘야한다. DELETE wedul 명령어를 날려서 기존 인덱스를 지우고 위의 alias 명령어를 실행시킨다. 

그럼 정상적으로 alias를 통해 무중단 reindex를 실행되었다. 정상적으로 실행 되었는지 age에 대한 query를 날려보자.

ㅋㅋ 정상적으로 실행되었다.

앞으로 이런 방식으로 진행해야겠다.

 

출처 : https://discuss.elastic.co/t/reindex-big-index/83047

 

Reindex big index

I would like to reindex a very big index. When I run reindex API with elasticsearchjs client I will receive the requestTimeout error, or Gateway timeout error. It's ok because the reindex process is still running in Elastic server. However, what I want to

discuss.elastic.co

https://www.elastic.co/kr/blog/changing-mapping-with-zero-downtime

Elasticsearch를 Spring Boot에서 작업을 하는 간단한 정리를 해보자.


1. Library 추가

Elasticsearch를 사용하기 위해서는 spring-data-elasticsearch 라이브러리가 추가되어야 한다. 

gradle에 추가해보자.

1
2
3
4
5
6
7
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-elasticsearch'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    compileOnly "org.projectlombok:lombok:1.16.16"
}
 
cs


spring-data-elasticsearch 버전별로 호환되는 elasticsearch가 상이하니 참고

spring data elasticsearchelasticsearch
3.2.x6.5.0
3.1.x6.2.2
3.0.x5.5.0
2.1.x2.4.0
2.0.x2.2.0
1.3.x1.5.2


2. Configuration

Elasticsearch에 접속하기 위한 Configuration을 정의해준다.

Elasticsearch  접속을 위해서는 host, port, cluster name이 필요하다. cluster name을 알아야 하는데 docker에 설치 한 경우 여기서 확인하면 된다.

우선 docker exec -it elastic bash로 콘솔에 접속한 후에 elasticsearch.yml에 적혀있는 cluster name을 확인한다.

그리고 application.properties에 설정 내용을 적어준다.

1
2
3
4
elasticsearch.host=127.0.0.1
elasticsearch.port=9300
elasticsearch.cluster_name=docker-cluster
spring.main.allow-bean-definition-overriding=true
cs

그리고 EnableElasticsearchRepositories 애노테이션을 설정한 Configuration 클래스를 만들어준다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.elasticsearch.study.configuration;
 
import org.springframework.beans.factory.annotation.Value;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
 
import java.net.InetAddress;
 
/**
 * Elasticsearch Configuration
 *
 * @author wedul
 * @since 2019-02-09
 **/
@EnableElasticsearchRepositories
@Configuration
public class ElasticConfiguration {
 
  @Value("${elasticsearch.host}")
  private String host;
 
  @Value("${elasticsearch.port}")
  private int port;
 
  @Value("${elasticsearch.cluster_name")
  private String clusterName;
 
  @Bean
  public Client client() throws Exception {
    Settings settings = Settings.builder().put("cluster.name", clusterName).build();
 
    TransportClient client = new PreBuiltTransportClient(settings);
    client.addTransportAddress(new TransportAddress(InetAddress.getByName(host), port));
    return client;
  }
 
  @Bean
  public ElasticsearchOperations elasticsearchTemplate() throws Exception {
    return new ElasticsearchTemplate(client());
  }
 
}
 
cs


3. DTO 생성

Elasticsearch에서 Document 내용을 담을 DTO를 만들어주고 @Document 애노테이션을 달고 index name과 type을 정의해준다.

@Id 어노테이션이 붙은 필드는 각 Doucument에 붙어있는 _id 값이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.elasticsearch.study.dto;
 
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
 
/**
 * studyFor
 *
 * @author wedul
 * @since 2019-02-09
 **/
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Document(indexName = "wedul_play", type = "story")
public class WedulPlay {
 
  @Id
  private String id;
  private String title;
  private String user;
  private long startAt;
  private long endAt;
 
}
 
cs


4. Repository

JPA를 사용하면 익숙할 패턴으로 Elasticsearch에서도 ElasticsearchRepository가 존재한다. 사용방법은 JPA와 동일하게 저장할 때는 save, 조회할 때는 find(), findByUser()등으로 사용할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.elasticsearch.study.repository;
 
import com.elasticsearch.study.dto.WedulPlay;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;
 
/**
 * study
 *
 * @author wedul
 * @since 2019-02-09
 **/
@Repository("wedulPlayRepository")
public interface WedulPlayRepository extends ElasticsearchRepository<WedulPlay, String> {
 
  WedulPlay findByUser(String user);
  
}
 
cs


5. Service

지금 테스트 하는 부분에서는 크게 비즈니스 로직에 들어갈 소스가 없다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.elasticsearch.study.service;
 
import com.elasticsearch.study.dto.WedulPlay;
import com.elasticsearch.study.repository.WedulPlayRepository;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Service;
 
import java.util.List;
 
/**
 * study
 *
 * @author wedul
 * @since 2019-02-09
 **/
@AllArgsConstructor
@NoArgsConstructor
@Service
public class WedulPlayService {
 
  private WedulPlayRepository wedulPlayRepository;
 
  public void save(WedulPlay play) {
    wedulPlayRepository.save(play);
  }
 
  public List<WedulPlay> findAll() {
    return Lists.newArrayList(wedulPlayRepository.findAll());
  }
 
  public WedulPlay findByUser(String user) {
    return wedulPlayRepository.findByUser(user);
  }
 
}
 
cs


6. Test 코드

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package com.elasticsearch.study.wedulplay;
 
import com.elasticsearch.study.dto.WedulPlay;
import com.elasticsearch.study.repository.WedulPlayRepository;
import com.elasticsearch.study.service.WedulPlayService;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
 
import java.util.List;
 
import org.hamcrest.core.IsNull;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
 
/**
 * wedul play document 조회
 *
 * @author wedul
 * @since 2019-02-09
 **/
@RunWith(SpringRunner.class)
@SpringBootTest
public class WedulPlayTest {
 
  WedulPlayService wedulPlayService;
 
  @Autowired
  @Qualifier("wedulPlayRepository")
  WedulPlayRepository wedulPlayRepository;
 
  @Before
  public void setup() {
    wedulPlayService = new WedulPlayService(wedulPlayRepository);
  }
 
  @Test
  public void whenValidParameter_thenSuccessFind() {
    List<WedulPlay> list = wedulPlayService.findAll();
 
    assertNotNull(list);
  }
 
  @Test
  public void whenValidParameter_thenSuccessSave() {
    Exception ex = null;
 
    try {
      wedulPlayService.save(WedulPlay.builder().title("안녕 이건 테스트야").user("위들").startAt(1242421424).endAt(23214124).build());
    } catch (Exception exception) {
      ex = exception;
    }
 
    assertTrue(null == ex);
  }
 
  @Test
  public void whenValidParameter_thenSuccessFindByUser() {
    Exception ex = null;
 
    try {
      WedulPlay play = wedulPlayService.findByUser("위들");
 
      assertThat(play, is(IsNull.notNullValue()));
    } catch (Exception exception) {
      ex = exception;
    }
 
    assertTrue(null == ex);
  }
 
 
}
 
cs


설정이 간단하다. 

나중에 이용해 먹어야지


자세한 소스코드는 여기 참조

https://github.com/weduls/spring_elastic

ELK에서 logstash를 제외하고는 모두 경험해봤다.

이제 logstash를 사용해서 log파일을 elasticsearch에 기록해보자.


설치

elasticseach도 kibana도 pc에 직접 설치하고 싶지 않아서 docker에 설치해서 사용했다. logstash도 docker에 설치해서 사용해보자.


물론 logstash를 사용하기전에 elasticseach와 kibana가 설치되어 있어야한다. 설치법은 저번 게시물에 올려놨다.


logstash를 이름을 지정해서 background에서 동작하도록 실행시킨다.

1
docker run --name logstash -d docker.elastic.co/logstash/logstash:6.4.0
cs


설정파일

logstash를 설치하면 내부에 다음과 같은 설정파일이 존재한다.

 이름

 설명

logstash.yml 

 logstash 구성 플래그가 들어있다. 이곳에 설정하면 command에 직접 설정할 필요가 없다. 그래도 command명령이 logstash.yml 파일의 우선순위보다 높다.

 pipelines.yml

 single logstash 인스턴스에서 파이프라인을 실행시키기 위한 설정내용을 담고 있다. input, filter, output등을 설정한다,

 jvm.options

 jvm 설정을 포함하고 있어서 힙사이즈 조절이 가능하다.

 log4j2.properties

log4j 설정을 할 수있다. 


설치가 완료된 후 logstash를 기존에 설치된 elasticsearch에 연동해서 상태를 보려고 할때 오류가 발생한다. config/logstash.yml에서 호스트 네임을 설정해주면 정상적으로 연결이 되고 monitoring까지 할 수 있다.



파이프라인

logstash에서 파이프라인은 input, output, filter가 존재한다. input과 output은 데이터 입출력을 위해 필수값이고 filter는 정재하기 위한 설정으로 선택값이다.

파이프라인 설정은 /pipieline 내부에 conf파일을 생성하여 설정할 수있다.


로그기록

그럼 설치된 logstash에서 특정 로그파일을 읽어서 elasticsearch에 기록해보자. pipeline.yml 파일을 수정해서 다음과 같이 기록한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
input {
  file {
    path => "/wedul.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
 
output {
  elasticsearch {
    hosts => "http://we.local:9200"
    index=>"wedulpos"
  }
}
cs


위 내용은 파일을 읽고 elasticsearch wedulpos인덱스에 저장하라는 내용이다. start_position이 beginning이면 처음부터 읽으라는 것이고 sincedb는 내부적으로 어디까지 읽었는지 확인하기위해 저장하는 db이다. 현재 테스트할때 sincedb 설정이 제대로 되지 않아 No sincedb_path set, generating one based on the "path" setting 오류가 발생하여 우선적으로 /dev/null로 진행했다.

설정이 끝나고 logstash를 실행시키면 다음과 같이 저장되는걸 알 수있다.


모니터링에서도 해당 로그 기록 상황을 볼 수있다.


이 밖에도 input, output을 jdbc, cloudwatch등으로 설정해서 진행할수도 있다.




+ Recent posts