Too many dynamic script compilations 에러

elasticsearch를 사용하여 개발을 하다보면 스크립트를 사용하는 경우가 굉장히 많이 발생한다. 나는 아무생각없이 스크립트를 만들어서 사용했는데 어느날 운영에 반영을 하는데 본적이 없던 에러를 발견했다.

 

java.lang.AssertionError: 
Expecting code not to raise a throwable but caught
  <"ElasticsearchStatusException[Elasticsearch exception [type=search_phase_execution_exception, reason=all shards failed]]; nested: ElasticsearchException[Elasticsearch exception [type=circuit_breaking_exception, reason=[script] Too many dynamic script compilations within, max: [75/5m]; please use indexed, or scripts with parameters instead; this limit can be changed by the [script.max_compilations_rate] setting]];

스크립트 컴파일 에러가 발생했다고 하는데 왜 발생한 것인지... 몰라 검색해봤다.

 

 

에러 원인

elasticsearch는 기본적으로 컴파일하여 사용할 수 있는 스크립트 수를 제한을 한다고 한다. 그 제한의 기본값은 75/5m rate 즉 5분동안 75개의 스크립트만 컴파일하여 사용이 가능하다고 한다.

 

그래서 그 이상의 스크립트를 컴파일 하려고 할 시 Elasticsearch에서 out of meory 방지를 위해 circuit을 열어버린다. 

https://www.elastic.co/guide/en/elasticsearch/reference/current/circuit-breaker.html

 

근데 나는 스크립트를 그 정도로 많이 만들어서 사용하지는 않았는데 어떻게 그럴수가 있을까 하고 있던 찰나에 같이 일하는 동료분께서 내가 짠 스크립트에서 문제가 있다고 확인해주셨다.

 

바로 데이터를 스크립트에 사용되는 유동적인 데이터 중 일부가 param의 형태로 들어가지 않고 스크립트를 만들 때 동적으로 들어가게 해놨던 것이었다....

 

예를 들면 다음과 같이 스크립트를 매번 만들었던 것이었다. ㅜㅜ 그래서 당연히 초당 요청이 엄청 많은 우리 서비스에서 75요청은 그냥 넘어갔고 그 결과 서킷이 열려버려서 elasticsearch에서 400에러를 내뱉었다.

for (int i = 0; i <= 76; i++) {
	testRepository.search(QueryBuilders.scriptQuery(new Script(ScriptType.INLINE, "painless", "return 0 <= " + i + ";", Collections.emptyMap())));
}

아뿔싸 나 때문에 오전부터 고생했던 배포를 못하게 되었다 ㅜㅜ

 

 

해결방법

이 문제를 해결하기 위한 방식은 2가지가 있다.

 

 

먼저 유동적으로 들어오는 데이터로인해 스크립트가 계속 새로 컴파일 되지 못하도록 유동성 데이터는 param으로 넘겨서 사용하는 방식이다. 나 또한 이 방식으로 데이터를 바꿨다. 위의 예시 기준으로 다음과 같이 변경하였다.

for (int i = 0; i <= 76; i++) {
	testRepository.search(QueryBuilders.scriptQuery(new Script(ScriptType.INLINE, "painless", "return 0 <= params[i];", Collections.emptyMap("i", i))));
}

 

 

그리고 또다른 방식으로는 실제로 스크립트가 많이 컴파일 되어야 할 때는 상황에 맞게 그 rate를 조절해야 할 수도 있다. 이때는 다음과 같이 변경해주면된다.

// dsl
PUT http://localhost:9200/_cluster/settings
{
  "transient": {
    "script.max_compilations_rate": "150/1m"
}

// java (rest high level client)
public void changeScriptMaxCompileRate(String rate) {

	ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
	request.transientSettings(ImmutableMap.of(
		"script.max_compilations_rate", rate
	));

	restHighLevelClient.cluster().putSettings(
		request, RequestOptions.DEFAULT
	);
}

 

하지만 이와 관련된 이슈에서 다음과 같은 문구를 찾았다. 

The best solution is actually to not to increase the limit. If a test suite breaks the amount of compilations allowed, it will absolutely blow up in any serious environments.

The best solution is to figure out which painless script(s) are always recompiling, and parameterize them instead. I've had that happen on a few occasions, and I just needed to move some literal values out of the script and into params. Check out this PR: https://github.com/elastic/beats/pull/9308/files#diff-759f580883147ab049f76cd3501ec965R32

 

무조건 limit를 늘리는건 방식이 아니고 재 컴파일 되지 않도록 수정하는게 옳은 방식이라고 한다.

물론 나 때문에 문제가 발생하였지만 몰랐고 놓쳤던 부분을 알게 되어서 값비싼 수업을 들은 기분이 들었다.

댓글()

RestHighLevelClient를 사용하여 search after 기능 구현하기

web/Spring|2019. 11. 14. 17:55

https://wedul.site/541에서 search after 기능을 사용해서 검색을 하는 이유를 알아봤었다.

그럼 spring boot에서 RestHighLevelClient를 이용해서 search after를 구현을 해보자.

 

1. Mapping

우선 index가 필요한데 간단하게 상품명과 지역 가격정보들을 가지고 있는 wedul_product 인덱스를 만들어 사용한다.

{
    "settings": {
        "index": {
            "analysis": {
                "tokenizer": {
                    "nori_user_dict": {
                        "type": "nori_tokenizer",
                        "decompound_mode": "mixed",
                        "user_dictionary": "analysis/userdict_ko.txt"
                    }
                },
                "analyzer": {
                    "wedul_analyzer": {
                        "tokenizer": "nori_user_dict",
                        "filter": [
                            "synonym"
                        ]
                    }
                },
                "filter": {
                    "synonym": {
                        "type": "synonym",
                        "synonyms_path": "analysis/synonyms.txt"
                    }
                }
            }
        }
    },
    "mappings": {
        "dynamic": "false",
        "properties": {
            "productId": {
                "type": "keyword"
            },
            "place": {
                "type": "text",
                "fields": {
                    "keyword": {
                        "type": "keyword"
                    }
                }
            },
            "message": {
                "type": "text"
            },
            "query": {
                "type": "percolator"
            },
            "name": {
                "type": "text",
                "analyzer": "wedul_analyzer",
                "fields": {
                    "keyword": {
                        "type": "keyword"
                    }
                }
            },
            "price": {
                "type": "integer"
            },
            "updateAt": {
                "type": "date",
                "format": "epoch_second"
            },
            "createAt": {
                "type": "date",
                "format": "epoch_second"
            }
        }
    }
}

값은 적당하게 3개정도 삽입하였다.

저장되어 있는 초기값.

 

2. 라이브러리 

사용에 필요한 라이브러리들을 gradle을 사용해서 추가한다. 

plugins {
    id 'org.springframework.boot' version '2.2.0.RELEASE'
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
}

ext {
    set('elasticsearch.version', '7.4.2')
}

group = 'com.wedul'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

repositories {
    mavenCentral()
    maven { url "https://plugins.gradle.org/m2/" }
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    compileOnly 'org.projectlombok:lombok'
    compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.9'
    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.10.0'
    annotationProcessor 'org.projectlombok:lombok'
    testCompile group: 'org.mockito', name: 'mockito-all', version:'1.9.5'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'

    // gson
    compile group: 'com.google.code.gson', name: 'gson', version: '2.8.6'

    // elasticsearch
    compile 'org.elasticsearch.client:elasticsearch-rest-high-level-client:7.4.2'
    compile group: 'org.elasticsearch', name: 'elasticsearch', version: '7.4.2'
}

 

 

3.RestHighLevelClient configuration

restHighLevelClient 사용을 위한 Configuration 파일을 만들어주는데 id와 pw는 AppConfig라는 별도 properties를 관리하는 bean에서 받아서 사용하는데 base64로 인코딩되어있어서 이를 decoding후 사용한다. (부족한 코드는 글 맨 아래있는 github 링크 참조)

package com.wedul.study.common.config;

import com.wedul.study.common.util.EncodingUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

/**
 * study
 *
 * @author wedul
 * @since 2019-11-07
 **/
@Configuration
@Slf4j
public class ElasticsearchClientConfig implements FactoryBean<RestHighLevelClient>, InitializingBean, DisposableBean {

    @Autowired
    AppConfig appConfig;

    private RestHighLevelClient restHighLevelClient;

    @Override
    public RestHighLevelClient getObject() {
        return restHighLevelClient;
    }

    @Override
    public Class<?> getObjectType() {
        return RestHighLevelClient.class;
    }

    @Override
    public void destroy() {
        try {
            if (null != restHighLevelClient) {
                restHighLevelClient.close();
            }
        } catch (Exception e) {
            log.error("Error closing ElasticSearch client: ", e);
        }
    }

    @Override
    public boolean isSingleton() {
        return false;
    }

    @Override
    public void afterPropertiesSet() {
        restHighLevelClient = buildClient();
    }

    private RestHighLevelClient buildClient() {
        try {
            String id = EncodingUtil.decodingBase64(appConfig.getElasticsearchConfig().getId());
            String pw = EncodingUtil.decodingBase64(appConfig.getElasticsearchConfig().getPw());

            // 계정 설정
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(id, pw));

            // client 설정
            RestClientBuilder builder = RestClient.builder(
                new HttpHost(appConfig.getElasticsearchConfig().getIp(),
                    appConfig.getElasticsearchConfig().getPort(), "http"))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));

            restHighLevelClient = new RestHighLevelClient(builder);
        } catch (Exception e) {
            log.error(e.getMessage());
        }
        return restHighLevelClient;
    }

}

 

 

4. Handler 추가

자주 사용되는 Elasticsearch 문법을 처리하기 위해서 만들어 놓은 ElasticsearchHandler에 search after에 사용 될 메소드를 추가한다. search after는 sort 필드가 없으면 사용이 불가능 하기 때문에 sort 필드가 없는 경우 에러를 전달한다.

public static SearchSourceBuilder searchAfter(Map<String, SortOrder> sortFields, QueryBuilder query, Object[] searchAfter, int size) {
    return searchAfterBuilder(sortFields, query, searchAfter,  size);
}

public static SearchSourceBuilder searchAfter(Map<String, SortOrder> sortFields, QueryBuilder query, Object[] searchAfter) {
    return searchAfterBuilder(sortFields, query, searchAfter, 20);
}

private static SearchSourceBuilder searchAfterBuilder(Map<String, SortOrder> sortFields, QueryBuilder query, Object[] searchAfter, int size) {
    SearchSourceBuilder builder = new SearchSourceBuilder();

    if (CollectionUtils.isEmpty(sortFields)) {
        throw new InternalServerException("잘못된 필드 요청입니다.");
    }

    sortFields.forEach((field, sort) -> {
        builder.sort(field, sort);
    });
    builder.size(size);
    builder.query(query);

    if (ArrayUtils.isNotEmpty(searchAfter)) {
        builder.searchAfter(searchAfter);
    }

    return builder;
}

 

 

5. 기능 구현

위의 기능들을 이용해서 실제로 구현해보자. productService와 productRepository 클래스를 통해서 구현하였다. 자세한 설명없이 간단하기 때문에 소스를 보면 알 수 있다. 

 

우선 최종 결과물로 사용될 클래스는 ElasticResult인데 다음과 같이 현재 요청이 마지막인지 표시하는 isLast와 다음 요청을 위해 보내줘야 하는 cursor값과 결과값 전체 total과 결과 리스트 list 필드가 존재한다.

@Builder
@Data
public class ElasticResult<T extends ElasticsearchDto> {

    private boolean isLast;
    private long total;
    private List<T> list;
    private Object[] cursor;

}

 

그 다음 service로직을 통해 결과를 얻어서 위 ElasticResult에 결과를 담아보자. products 메서드는 요청을 받아서 elasticsearch에 실제 조작요청을 하는 productRepository에 동작을 요청하고 값을 받아서 처리하는 메서드이다. 그리고 extractProductList는 결과값에서 ProductDto 값을 뽑아내는 메서드이다.

public ElasticResult<ProductDto> products(String index, Object[] searchAfter, int size) throws IOException {
    SearchResponse searchResponse = productRepository.products(index, searchAfter, size);
    SearchHits searchHits = searchResponse.getHits();
    int hitCnt = searchHits.getHits().length;
    boolean isLast = 0 == hitCnt || size > hitCnt;

    return ElasticResult.<ProductDto>builder()
        .cursor(isLast ? null : searchHits.getHits()[hitCnt - 1].getSortValues())
        .isLast(isLast)
        .list(extractProductList(searchHits))
        .total(searchHits.getTotalHits().value)
        .build();
}

private List<ProductDto> extractProductList(SearchHits searchHits) {
    List<ProductDto> productList = new ArrayList<>();

    searchHits.forEach(hit -> {
        Map<String, Object> result = hit.getSourceAsMap();

        productList.add(ProductDto.builder()
            .name(String.valueOf(result.get("name")))
            .productId(String.valueOf(result.get("productId")))
            .place(String.valueOf(result.get("place")))
            .price(Integer.valueOf(result.get("price").toString()))
            .updateAt(Long.valueOf(result.get("updateAt").toString()))
            .createAt(Long.valueOf(result.get("createAt").toString())).build());
    });

    return productList;
}

 

그리고 마지막으로 es에 직접적으로 콜을 하는 productRepository 이다. 여기서 정렬 키워드는 name과 place를 사용한다.

public SearchResponse products(String index, Object[] searchAfter, int size) throws IOException {
    SearchRequest searchRequest = new SearchRequest(index);
    Map<String, SortOrder> sorts = new HashMap<String, SortOrder>() {
        {
            put("name.keyword", SortOrder.DESC);
            put("place.keyword", SortOrder.DESC);
        }
    };

    SearchSourceBuilder searchSourceBuilder = ElasticsearchHandler.searchAfter(sorts, QueryBuilders.matchAllQuery(), searchAfter, size);
    searchRequest.source(searchSourceBuilder);
    return restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
}

 

 

6. 테스트

그럼 위에 내용이 잘 구현되었는지 테스트를 해보자. 총 3개의 데이터가 있는데 이 데이터를 1개씩 search after를 통해서 값을 받아서 저장하고 한번에 출력하도록 해보자.

@Test
@DisplayName("search after")
public void searchAfter() throws IOException {
    ElasticResult<ProductDto> result = productService.products(PRODUCT_INDEX, new Object[]{}, 1);
    List<ProductDto> productDtos = new ArrayList<>();

    while(result != null && !result.isLast()) {
        productDtos.addAll(result.getList());
        result = productService.products(PRODUCT_INDEX, result.getCursor(), 1);
    }
    productDtos.addAll(result.getList());

    productDtos.forEach(productDto -> {
        System.out.println("이름 : " + productDto.getName());
        System.out.println("장소 : " + productDto.getPlace());
    });
}

결과는 정상적으로 3가지 모두 잘 출력되는 걸 알 수있다.

 

우선 기능 구현을 해보기 위해서 진행하였는데 더 다듬어야 할 것같다.

자세한 소스는 github참조

댓글()

nginx 서버에 filebeat를 이용하여 ELK에 로그 기록하기

IT 지식/Docker|2019. 10. 15. 22:00
git clone https://github.com/deviantony/docker-elk

nginx를 설치하고 docker 기반으로 ELK (elasticsearch, logstash, kibana)를 설치하고 nginx 로그를 filebeat를 설치하여 acces.log, error.log, syslog등을 전송해보자.

 

설치 

ELK를 도커에 설치하는 스크립트를 아래 github에 잘 정리되어 제공해주고 있다.
https://github.com/deviantony/docker-elk

ELK는 이걸로 설치하면 되는데 docker-compose로 nginx와 filebeat까지 함께 설치하기 위해서 아래 저장소에서 제공하는 nginx-filebeat 스크립트를 혼합해서 사용해보자.
https://github.com/spujadas/elk-docker/tree/master/nginx-filebeat

1. 우선 ELK 설치 스크립트를 가져오자.

git clone https://github.com/deviantony/docker-elk



2. 그리고 nginx-filebeat 파일을 다운 받아서 docker-elk 디렉토리 내부에 추가한다.

git clone https://github.com/spujadas/elk-docker
mv ./elk-docker/nginx-filebeat ./docker-elk



3. 그리고 nginx-filebeat까지 사용할 수 있도록 docker-compose.yml 스크립트를 수정해준다. 그리고 nginx.conf 파일을 쉽게 보고 생성된 log도 로컬에서 보기 위해서 mount를 로컬 폴더로 진행한다.

version: '3.2'

services:
  elasticsearch:
    container_name: "elasticsearch"
    build:
      context: elasticsearch/
      args:
        ELK_VERSION: $ELK_VERSION
    volumes:
      - type: bind
        source: ./elasticsearch/config/elasticsearch.yml
        target: /usr/share/elasticsearch/config/elasticsearch.yml
        read_only: true
      - type: volume
        source: elasticsearch
        target: /usr/share/elasticsearch/data
    ports:
      - "9200:9200"
      - "9300:9300"
    environment:
      ES_JAVA_OPTS: "-Xmx256m -Xms256m"
      ELASTIC_PASSWORD: changeme
    networks:
      - mynet

  logstash:
    container_name: "logstash"
    build:
      context: logstash/
      args:
        ELK_VERSION: $ELK_VERSION
    volumes:
      - type: bind
        source: ./logstash/config/logstash.yml
        target: /usr/share/logstash/config/logstash.yml
        read_only: true
      - type: bind
        source: ./logstash/pipeline
        target: /usr/share/logstash/pipeline
        read_only: true
    ports:
      - "5000:5000"
      - "9600:9600"
    environment:
      LS_JAVA_OPTS: "-Xmx256m -Xms256m"
    networks:
      - mynet
    depends_on:
      - elasticsearch

  kibana:
    container_name: "kibana"
    build:
      context: kibana/
      args:
        ELK_VERSION: $ELK_VERSION
    volumes:
      - type: bind
        source: ./kibana/config/kibana.yml
        target: /usr/share/kibana/config/kibana.yml
        read_only: true
    ports:
      - "5601:5601"
    networks:
      - mynet
    depends_on:
      - elasticsearch

  nginx:
    container_name: "nginx"
    build:
      context: nginx-filebeat/
    volumes:
      - type: bind
        source: /Users/we/Documents/docker/nginx
        target: /etc/nginx
      - type: bind
        source: /Users/we/Documents/docker/nginx_log
        target: /var/log
    ports:
      - "8080:80"
    networks:
      - mynet

networks:
  mynet:
    driver: bridge

volumes:
  elasticsearch:

 

4. 마지막으로 filebeat.xml에서 ssl 통신을 하지 않기 때문에 ssl 부분을 제거해준다. (이유는 하단에 나온다.)

output:
  logstash:
    enabled: true
    hosts:
      - logstash:5044
    timeout: 15

filebeat:
  inputs:
    -
      paths:
        - /var/log/syslog
        - /var/log/auth.log
      document_type: syslog
    -
      paths:
        - "/var/log/nginx/*.log"
      document_type: nginx-access

 

5. 그럼 지금까지 수정한 내용을 이용해서 docker-compose up -d 통해 설치 진행해보자. 설치후 제거 하고 싶으면 (docker-compose -v down) 명령어를 통해 제거 할 수 있다.

docker-compose up -d

 

6. 설치가 완료되면 키바나에 접속해서 확인해보면 logstash, elasticsearch, kibana 모두 설치 된 것을 알 수 있다. 초기 계정은 elastic / changeme이다.

ELK와 nginx docker process

7. logstash pipeline을 만들어줘야하는데 kibana에서 management → logstash → pipeline에서 설정해주면된다. 간단하게 5044포트로 받고 nginx_log 인덱스로 넣게 설정한다.

input {
    beats {
        client_inactivity_timeout => 19909
        port => "5044"
        ssl => false
    }
}
filter {
  if [type] == "nginx-access" {
    grok {
      match => { "message" => "%{NGINXACCESS}" }
    }
  }
}
output {
    elasticsearch {
        hosts => ["elasticsearch:9200"]
        index => "nginx_log"
        user => "elastic"
        password => "changeme"
    }
   stdout { codec => rubydebug }
}

 

8. 그럼 filebeat가 정상적으로 동작하는지 확인해보자.

filebeat 실행 상태 확인

/etc/init.d/filebeat status

 

filebeat.xml 기준으로 설정 정상 적용 상태 확인

filebeat test config

 

filebeat.xml에 설정된 output 정상 여부 확인

filebeat test output

filebeat 테스트 결과

위에 테스트를 진행하면 위에 화면처럼 나오는게 나와야 정상이다. 정상적으로 가동된걸 확인했다.

그럼 실제 nginx에서 나온 로그가 filebeat로 수집되어 logstash > elasticsearch로 정상적으로 적재되는지 보자.
localhost:8080에 접속하여 로그 발생시킨 후 filebeat 로그를 확인했는데 왠걸 다음과 같은 오류가 발생했다.

[2019-10-15T06:12:48,844][INFO ][org.logstash.beats.BeatsHandler] [local: 172.28.0.4:5044, remote: 172.28.0.2:55946] Handling exception: org.logstash.beats.BeatsParser$InvalidFrameProtocolException: Invalid Frame Type, received: 1
[2019-10-15T06:12:48,845][WARN ][io.netty.channel.DefaultChannelPipeline] An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.DecoderException: org.logstash.beats.BeatsParser$InvalidFrameProtocolException: Invalid Frame Type, received: 1
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:459) ~[logstash-input-tcp-6.0.3.jar:?]
at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:392) ~[logstash-input-tcp-6.0.3.jar:?]
at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:359) ~[logstash-input-tcp-6.0.3.jar:?]
at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:342) ~[logstash-input-tcp-6.0.3.jar:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245) ~[logstash-input-tcp-6.0.3.jar:?]
at io.netty.channel.AbstractChannelHandlerContext.access$300(AbstractChannelHandlerContext.java:38) ~[logstash-input-tcp-6.0.3.jar:?]
at io.netty.channel.AbstractChannelHandlerContext$4.run(AbstractChannelHandlerContext.java:236) ~[logstash-input-tcp-6.0.3.jar:?]
at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66) ~[logstash-input-tcp-6.0.3.jar:?]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) [logstash-input-tcp-6.0.3.jar:?]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [logstash-input-tcp-6.0.3.jar:?]
at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: org.logstash.beats.BeatsParser$InvalidFrameProtocolException: Invalid Frame Type, received: 1
at org.logstash.beats.BeatsParser.decode(BeatsParser.java:92) ~[logstash-input-beats-6.0.0.jar:?]
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489) ~[logstash-input-tcp-6.0.3.jar:?]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428) ~[logstash-input-tcp-6.0.3.jar:?]
... 10 more

이 문제는 logStash 또는 filebeat가 동시에 ssl을 사용하지 않는데 한쪽만 ssl 통신을 했을 때 발생되는 오류이다.

그래서 아까 위에 filebeat에 ssl 부문을 지우고 pipeline에 ssl 설정을 false로 지정한 것이다. 그럼 다시한번 localhost:8080에 들어가보자.

지금은 index.html을 만들어 놓지 않아서 404 에러가 발생된다.

GET nginx_log/_search
{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "nginx_log",
        "_type" : "_doc",
        "_id" : "vc04zm0BZBO8vgBrBmV7",
        "_score" : 1.0,
        "_source" : {
          "ecs" : {
            "version" : "1.1.0"
          },
          "@version" : "1",
          "tags" : [
            "beats_input_codec_plain_applied"
          ],
          "message" : """2019/10/15 07:00:31 [error] 25#25: *16 "/etc/nginx/html/index.html" is not found (2: No such file or directory), client: 172.28.0.1, server: , request: "GET / HTTP/1.1", host: "localhost:8080"""",
          "host" : {
            "name" : "a31d3333d22b"
          },
          "agent" : {
            "type" : "filebeat",
            "version" : "7.4.0",
            "hostname" : "a31d3333d22b",
            "id" : "cc4eb582-e09c-4a83-bb2e-9721c39ee508",
            "ephemeral_id" : "a5918a0b-4688-458f-bbc2-4eb49a3fff03"
          },
          "log" : {
            "file" : {
              "path" : "/var/log/nginx/error.log"
            },
            "offset" : 8524
          },
          "@timestamp" : "2019-10-15T07:00:38.443Z"
        }
      },
      {
        "_index" : "nginx_log",
        "_type" : "_doc",
        "_id" : "vs04zm0BZBO8vgBrBmV8",
        "_score" : 1.0,
        "_source" : {
          "ecs" : {
            "version" : "1.1.0"
          },
          "@version" : "1",
          "tags" : [
            "beats_input_codec_plain_applied"
          ],
          "message" : """172.28.0.1 - - [15/Oct/2019:07:00:31 +0000] "GET / HTTP/1.1" 404 555 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36"""",
          "host" : {
            "name" : "a31d3333d22b"
          },
          "agent" : {
            "type" : "filebeat",
            "version" : "7.4.0",
            "hostname" : "a31d3333d22b",
            "id" : "cc4eb582-e09c-4a83-bb2e-9721c39ee508",
            "ephemeral_id" : "a5918a0b-4688-458f-bbc2-4eb49a3fff03"
          },
          "log" : {
            "file" : {
              "path" : "/var/log/nginx/access.log"
            },
            "offset" : 8528
          },
          "@timestamp" : "2019-10-15T07:00:38.443Z"
        }
      }
    ]
  }
}

 

정상적으로 적재가 잘되는것을 확인할 수 있다.

이로써 docker로 구성된 elk로 로그 적재를 진행해봤다. 

 

관련 패키지는 github에 올려놓았다.

https://github.com/weduls/elk_with_nginx

댓글()

Elasticsearch node 종류와 기본 설정 옵션

Elasticsearch의 노드 

Elasticsearch의 인스턴스를 시작하는 동시에 노드도 같이 시작된다. 노드들을 연결해놓은 것을 클러스터라고 한다.

만약 하나의 엘라스틱 서치 노드만을 실행시킨 경우도 하나의 노드를 가진 클러스터라고 한다. 

클러스터안에서 모든 노드는 HTTP와 Transport 트래픽을 기본적으로 다룬다. Transport 레이어는 오로지 노드들과 Java TransportClient와의 통신에만 사용된다. Http 레이어는 오직 외부 Rest Cliente들과 통신할 때 사용된다.

모든 노드는 클러스터 안에서 서로 다른 노드들에 대하여 알고 있고 client에 요청을 적적한 노드로 향하게 조절해준다. 기본적으로 노드는 master-eligible, data, ingest, machine learning이 존대한다.

 

Elasticsearch의 노드 종류

Master-eligible 노드

- node.master를 true로 지정하며 클러스터의 컨트롤을 통해 마스터 노드로 선택될 자격을 가지게 된다.

- 마스터 노드는 클러스터에서 인덱스를 만들고 지우는 행위, 클러스터에서 노드들을 트래킹하고 각각의 노드를 샤드를 할당할건지 결정한다.

- Masger Eligible Node등에서 마스터 노드는 마스터 설출 프로세스를 통해 선출된다.

(https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery.html)

- 마스터 노드는 데이터 노드처럼 데이터와 폴더에 접근 권한이 있어야 한다. 노드가 재시작 하는 사이에도 클러스터 상태가 유지되어야 하기 때문에 접근이 가능해야 한다.

- 데이터를 인덱싱하고 찾고 하는 작업은 CPU, Memory, I/O 자원을 많이 사용하기 때문에 큰 규모에서는 data node와 master node를 구별한다.

- 마스터 노드도 Coordinating Node 처럼 데이터를 routing하고 모으고 하는 작업이 가능하지만 이는 마스터 노드가 하는 주 목적이 아니다. 안정적인 마스터 노드를 운영하기 위해서는 자기 일만 딱 하게 해주는 것이 좋다.

- 마스터 노드로 노드를 지정하기 위한 기본 설정 값은 다음과 같다.

node.master: true 
node.data: false 
node.ingest: false 
node.ml: false 
xpack.ml.enabled: true 
cluster.remote.connect: false

 

Data 노드

- node.data가 true로 지정된 노드는 데이터를 가지고 있을 수 있고 CRUD, 검색, aggregation 등의 데이터와 관련된 작업이 가능하다.

- 데이터 노드는 인덱싱 된 Document를 포함하고 있는 샤드를 관리한다.

- 데이터 노드는 데이터를 직접적으로 다루기 때문에 리소스 자원이 많이 필요하다.

- 데이터 노드로 노드를 지정하기 위한 기본 설정 값은 다음과 같다.

node.master: false 
node.data: true 
node.ingest: false 
node.ml: false 
cluster.remote.connect: false

 

Ingest 노드 

- node.ingest가 true로 지정된 노드가 Document가 인덱싱 되기 전에 변형되고 풍성하게 하기 위해서 Document를 ingest pipeline으로 적용할 수 있다.

- ingetst node는 pre processing 파이프라인을 실행하고 하나 또는 하나 이상의 ingest processor들을 모으는 작업을 한다.

- ingest를 로드하는건 무겁기 때문에 데이터나 마스터 노드에서는 node.ingest를 false로 지정하는 것이 좋다.

- 많은 리소스를 잡아먹기 때문에 ingest node는 별도로 지정하는 것이 좋다.

- ingest 노드로 노드를 지정하기 위한 기본 설정 값은 다음과 같다.

node.master: false 
node.data: false 
node.ingest: true 
node.ml: false 
cluster.remote.connect: false

 

Machine Learning 노드

- xpack.ml이 true로 지정되어 있고 node.ml이 true로 설정되어 있는 노드는 기본적으로 엘라스틱서치에서 분배하는 행위를 한다.

- 만약 머신러닝 특징을 사용하고 싶으면 적어도 클러스터 내에 하나의 머신러닝 노드가 있어야 한다.

 

Coordinating 노드

- 검색 요청과 bulk indexing과 같은 요청들은 다른 노드들의 있는 데이터를 많이 다룬다.

- 데이터가 흩어져 있는 경우 데이터가 있는 노드로 향하게 조정해준다. 각각의 데이터 노드는 요청을 자체적으로 처리하고 그것의 값을 Coordinating 노드에 전달해준다. 그럼 Coordinating 노드는 이를 모아서 하나의 데이터 형태로 정제하여 반환한다.

- 각각의 노드는 Coordinating node가 될 수 있다. 대신 node.master, node.data, node.integer가 false로 되어 있어야 한다.

- 또한 데이터를 모으고 조작하고 하는 작업이 많기 때문에 Coordinating 노드는 메모리랑 CPU에 대한 자원이 많아야 한다. 그렇기 때문에 오직 요청을 라우팅하고 검색 구절을 조절하고, bulk indexing 분배작업을 하는 노드로만 사용하는 게 좋다.

- Coordinating node는 본질적으로 로드 밸런싱 같은 역할을 한다.

- Coordinating node로 지정하기 위한 설정은 다음과 같다.

node.master: false 
node.data: false 
node.ingest: false 
node.ml: false 
cluster.remote.connect: false

 

Node Data Path Setting

path.data

- 모든 데이터와 master-eligible 노드는 샤드 그리고 인덱스, 클러스터 메타데이터가 저장되어 있는 데이터 디렉토리를 접근한다.

- path.data는 기본적으로 $ES_HOME/data로 지정되어 있지만 elasticsearch.yml을 통해서 바꿀 수 있다.

댓글()

Elasticsearch version conflict 에러

배치를 이용해서 Elasticsearch에 데이터를 삽입하던 중 version conflict라는 오류가 자주 발생했다. 처음에는 Elasticsearch 버전이 동일한데 왜? 오류가 나는지 몰랐다.

그래서 검색해보니 인덱스안에 document에는 각자 관리하는 version이 존재한다. 이 version은 document가 수정될 때 하나씩 올라가게 되는데 version이 10인 상태에 document에 여러 서버 모듈에서 해당 document에 업데이트를 하려고 하니 문제가 발생하였다.

그 이유는 version 10인 상태에서 작업에 들어간 두 모듈은 한 모듈이 먼저 11로 업데이트를 시키고 다음 모듈이 작업을 진행하려고 할 때 자기가 알고 있던 마지막 version인 10이 아니라 11로 바껴있는것을 보고 에러를 뱉어내는것이다. 이렇게 까지 세심하게 챙겨줄지 몰랐다. 알면 알수록 elasticsearch라는 db는 정말 매력적이다.

PUT wedul_index 
{
  "mappings": {
      "_doc": {
        "dynamic": "false",
        "properties": {
          "name": {
            "type": "text"
          }
        }
      }
  }
}

위와 같이 인덱스가 있고 document 하나가 들어있다. 여기에 age라는 값과 gender를 집어넣어보자. 이를 동시에 호출해보자.

document

그럼 document 하나에 필드를 동시에 업데이트하는 update.sh라는 스크립트를 만들어서 실행시켜보자.

curl -X POST "localhost:9200/wedul_index/_update_by_query" -H 'Content-Type: application/json' -d' { "script": { "source": "ctx._source[\u0027gender\u0027] = \u0027M\u0027"}, "query": { "match": { "name": "위들" } } } ‘
curl -X POST "localhost:9200/wedul_index/_update_by_query" -H 'Content-Type: application/json' -d' { "script": { "source": "ctx._source.age = 10", "lang": "painless" }, "query": { "match": { "name": "위들" } } } ‘

그럼 위에 설명했던 것 처럼 버전이 먼저 변경이 되면서 다음과 같은 에러를 뱉어낸다.

[{"index":"wedul_index","type":"_doc","id":"3MSd5WsB_jV9Cf9TkYLV","cause":{"type":"version_conflict_engine_exception","reason":"[_doc][3MSd5WsB_jV9Cf9TkYLV]: version conflict, current version [3] is different than the one provided [2]","index_uuid":"sJI8sBnrTP-OW8OG8YBqWA","shard":"3","index":"wedul_index"},"status":409}]

 

이를 해결하기 위해서는 retry_on_conflict 옵션을 함꼐 부여할 수 있는데 이 옵션은 version conflict이 발생했을 때, 업데이트 재시도를 몇회 할건지 지정하는 옵션이다.

좀 더 자세한 사항은 아래 elasticsearch 메뉴얼을 보면 자세히 나와있다.

참조
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html

댓글()

Elasticsearch reindex시 alias를 사용하여 무중단으로 진행하기 & big index 리인덱싱 시 비동기 처리 방법

Elasticsearch reindex를 진행할 때, 단순하게 새로운 인덱스를 만들고 reindex api를 진행하고 기존 인덱스를 지우고 새로 만들어서 다시 reindex를 해줬다. (이전글: https://wedul.site/611?category=680504)

하지만 그것은 해당 인덱스의 document의 수가 적어서 금방 진행이 되었었고 만약 document수가 10만가지만 넘어도 생각보다 오래걸려서 서비스의 흐름이 끊어지게 된다는걸 인지하지 못했다. 같은 회사 동료분께서 해당 부분에 대해서 말씀해주셨고, 그 분이 가이드 해주신대로 진행해서 reindex를 무중단하게 진행하는 방법을 찾아봤다.

 

Alias를 이용하여 reindex하기


기존 index wedul의 매핑구조이다.

PUT wedul 
{
  "mappings": {
    "dynamic": false,
    "properties": {
      "name": {
        "type": "text"
      }
    }
  }
}

해당 인덱스의 데이터는 현재 다음과 같이 들어있는 것을 볼 수 있다. 여기서 age는 매핑이 안되어있어서 검색에 잡을 수 없기에 이를 reindex를 통해 매핑 정보를 업데이트해주자.

wedul 인덱스에 들어있는 데이터(왼), age로 검색이 안됨 (우)

그럼 reindex를 위해 새로운 인덱스 wedul_v1을 만들어보자.

reindex를 진행할 새로운 index, wedul_v1

그리고 wedul_v1으로 reindex를 실행해준다. 이때 주의사항이 있는데 document양이 10만 이상이 넘어가게 되면 작업이 오래걸리기에 kibana에서 504 gateway timeout이 발생하고 작업이 중단된다. 그래서 해당 작업을 비동기로 실행시키는 옵션인 wait_for_completion=false를 함께 설정해주고 진행해야한다.

POST _reindex?wait_for_completion=false
{
  "source": {
    "index": "wedul"
  },
  "dest": {
    "index": "wedul_v1"
  }
}

그럼 위에 이미지처럼 task 프로세스 번호가 나오고 이 프로세스에 시작시간 상태 취소 가능여부 등등을 GET _task 명령어를 통해 볼 수 있다. 여기서 프로세스가 종료되면 reindex가 다 된것이다.

그 다음 wedul_v1에 wedul이라는 alias를 지정해줘야한다. 

POST _aliases
{
  "actions": [
    {
      "add": {
        "index": "wedul_v1",
        "alias": "wedul"
      }
    }
  ]
}

alias를 지정하기 전에 기존 인덱스 wedul을 지워줘야한다. DELETE wedul 명령어를 날려서 기존 인덱스를 지우고 위의 alias 명령어를 실행시킨다. 

그럼 정상적으로 alias를 통해 무중단 reindex를 실행되었다. 정상적으로 실행 되었는지 age에 대한 query를 날려보자.

ㅋㅋ 정상적으로 실행되었다.

앞으로 이런 방식으로 진행해야겠다.

 

출처 : https://discuss.elastic.co/t/reindex-big-index/83047

 

Reindex big index

I would like to reindex a very big index. When I run reindex API with elasticsearchjs client I will receive the requestTimeout error, or Gateway timeout error. It's ok because the reindex process is still running in Elastic server. However, what I want to

discuss.elastic.co

https://www.elastic.co/kr/blog/changing-mapping-with-zero-downtime

댓글()

Elasticsearch nori 형태소 분석기에서 readingform filter를 이용해서 한자 내용을 한글로 변환하기

Elasticsearch filter에서 한자로 검색했을 때 일치하는 한글 결과로 tokenizing하게 해주는 filter가 있다. 해당 filter는 nori-readingform이다. 적용 방법은 기존에 synonmys나 speech필터 적용과 동일하다.

 

인덱스 생성


위에서 부터 사용했던 인덱스에 nori_readingform 필터를 추가해서 생성만 해주면 된다.

PUT wedul_anaylyzer
{
  "settings": {
    "index" : {
      "analysis" : {
        "tokenizer": {
          "nori_user_dict": {
            "type": "nori_tokenizer",
            "decompound_mode": "none",
            "user_dictionary": "dic/nori_userdict_ko.txt"
          }
        },
        "analyzer" : {
          "custom_analyze" : {
            "type": "custom",
            "tokenizer" : "nori_user_dict",
            "filter": [
              "my_posfilter",
              "nori_readingform"
            ]
          }
        },
        "filter": {
          "my_posfilter": {
            "type": "nori_part_of_speech",
            "stoptags": [
              "NP", "UNKNOWN"
            ]
          }
        }
      }
    }
  }
}

이렇게 만든 인덱스를 이용해서 한자를 이용해서 한글 내용을 뽑아내보자

결과


행복이라는 한자를 입력하여 검색해보자. 필터가 정상적으로 적용된다면 행복이라는 내용을 가진 결과가 나올것이다

GET wedul_analyzer/_analyze
{
"analyzer": "custom_analyze",
"text": "幸福 사랑"
}

결과는 정상적으로 행복 그리고 사랑이라는 단어로 추출되었다. nori를 공부하면서 좋은 기본 필터 많은걸 알게 되서 좋다.

댓글()

Elasticsearch 특정 형태소 종류를 제외하여 검색하는 필터 nori_part_of_speech 적용

Elasticsearch를 사용하여 analyze를 사용하다가 조사, 형용사 등등을 제외하고 형태소 토크나이즈가 되어야 했다. 그래서 정식 문서를 찾아보더니 nori_part_of_speech라는 필터가 있었다.

우선 저번 시간에 만들었던 wedul_analyzer 인덱스를 이용해서 토크나이즈를 해보자.

{
  "tokens": [
    {
      "token": "바보",
      "start_offset": 0,
      "end_offset": 2,
      "type": "word",
      "position": 0
    },
    {
      "token": "위들",
      "start_offset": 3,
      "end_offset": 5,
      "type": "word",
      "position": 1
    },
    {
      "token": "이",
      "start_offset": 5,
      "end_offset": 6,
      "type": "word",
      "position": 2
    },
    {
      "token": "집에",
      "start_offset": 7,
      "end_offset": 9,
      "type": "word",
      "position": 3
    },
    {
      "token": "서",
      "start_offset": 9,
      "end_offset": 10,
      "type": "word",
      "position": 4
    },
    {
      "token": "나",
      "start_offset": 11,
      "end_offset": 12,
      "type": "word",
      "position": 5
    },
    {
      "token": "왔다",
      "start_offset": 12,
      "end_offset": 14,
      "type": "word",
      "position": 6
    }
  ]
}

여기서 '나'와 '왔다'를 없애고 토크나이즈 결과가 나왔으면 좋겠다.

그럼 '나'와 '왔다'의 형태소가 어떻게 되는지 우선 알아보자. analyzer api에 explain: true 옵션을 부여하면 해당 토크나이즈에 분리된 형태소들의 정보가 나온다.

GET _analyze
{
  "analyzer": "nori",
  "explain": true, 
  "text": "바보 위들이 집에서 나왔다"
}

'나'와 '왔다'는 NP와 UNKNOWN이다.  이 두개를 nori_part_of_speech필터를 이용해서 제거해보자.

 {
          "token": "나",
          "start_offset": 11,
          "end_offset": 12,
          "type": "word",
          "position": 6,
          "bytes": "[eb 82 98]",
          "leftPOS": "NP(Pronoun)",
          "morphemes": null,
          "posType": "MORPHEME",
          "positionLength": 1,
          "reading": null,
          "rightPOS": "NP(Pronoun)",
          "termFrequency": 1
        },
        {
          "token": "왔다",
          "start_offset": 12,
          "end_offset": 14,
          "type": "word",
          "position": 7,
          "bytes": "[ec 99 94 eb 8b a4]",
          "leftPOS": "UNKNOWN(Unknown)",
          "morphemes": null,
          "posType": "MORPHEME",
          "positionLength": 1,
          "reading": null,
          "rightPOS": "UNKNOWN(Unknown)",
          "termFrequency": 1
        }

custom analyzer를 만들면서 nori_part_of_speech 필터를 추가해주면된다. 이 필터에서 stoptags 배열에 제거하고 싶은 형태소 요형을 추가하면 해당 형태소를 제거한 결과만 출력된다.

PUT wedul_anaylyzer
{
  "settings": {
    "index" : {
      "analysis" : {
        "tokenizer": {
          "nori_user_dict": {
            "type": "nori_tokenizer",
            "decompound_mode": "none",
            "user_dictionary": "dic/nori_userdict_ko.txt"
          }
        },
        "analyzer" : {
          "custom_analyze" : {
            "type": "custom",
            "tokenizer" : "nori_user_dict",
            "filter": [
              "my_posfilter"
            ]
          }
        },
        "filter": {
          "my_posfilter": {
            "type": "nori_part_of_speech",
            "stoptags": [
              "NP", "UNKNOWN"
            ]
          }
        }
      }
    }
  }
}

이렇게 만든 analyze를 이용해서 다시한번 확인해보자. 

아래 결과 처럼 '나'와 '왔다' 두개의 형태소가 사라진 것을 확인할 수 있다.

{
  "tokens": [
    {
      "token": "바보",
      "start_offset": 0,
      "end_offset": 2,
      "type": "word",
      "position": 0
    },
    {
      "token": "위들",
      "start_offset": 3,
      "end_offset": 5,
      "type": "word",
      "position": 1
    },
    {
      "token": "이",
      "start_offset": 5,
      "end_offset": 6,
      "type": "word",
      "position": 2
    },
    {
      "token": "집에",
      "start_offset": 7,
      "end_offset": 9,
      "type": "word",
      "position": 3
    },
    {
      "token": "서",
      "start_offset": 9,
      "end_offset": 10,
      "type": "word",
      "position": 4
    }
  ]
}

 

기본적으로 stoptags를 적용하지 않으면 10몇가지의 형태소 종류가 기본으로 배제된다.

NP, VPC등 형태소들에 대한 용어는 하단 사이트에 잘 정리되어 있다.

https://coding-start.tistory.com/167
http://kkma.snu.ac.kr/documents/?doc=postag

 

꼬꼬마, 한글 형태소 분석기 (Kind Korean Morpheme Analyzer, KKMA)

꼬꼬마 한국어 형태소 분석기 한글 형태소 품사 (Part Of Speech, POS) 태그표 한글 형태소의 품사를 '체언, 용언, 관형사, 부사, 감탄사, 조사, 어미, 접사, 어근, 부호, 한글 이외'와 같이 나누고 각 세부 품사를 구분한다. 대분류 세종 품사 태그 심광섭 품사 태그 KKMA 단일 태그 V 1.0 태그 설명 Class 설명 묶음1 묶음2 태그 설명 확률태그 저장사전 체언 NNG 일반 명사 NN 명사 N NN NNG 보통 명사 NNA no

kkma.snu.ac.kr

 

출처
https://www.elastic.co/guide/en/elasticsearch/plugins/6.4/analysis-nori-speech.html

댓글()