데이터베이스/Elasticsearch

Java Lowlevel client bulk api에서 filter_path 사용하기

반응형

https://wedul.site/690

 

Bulk Index 진행 시 search api 느려지는 현상 해결 방법 리서치

현재 회사에서 하고있는 프로젝트에 경우 Elasticsearch를 사용해서 데이터를 제공하고 있다. 서비스 특성상 초당 받는 데이터 업데이트 요청이 많고 real time engine이 아닌 elasticsearch에 거의 리얼타

wedul.site

이전글에서 작성하였듯이 계속해서 쓰기 작업 시 발생할 수 있는 순단을 줄이기 위해서 여러가지 방법을 찾고 있다.

그중 쓰기 작업이 많이 발생할 때 불필요한 response를 줄이기 위해서 filter_path를 적용해보고자 한다.


 

Filter Path

rest api 작업 시 필요한 응답값만 받을 수 있는 기능이다. 하지만 Java High Level REST Client에서는 transport layer를 사용하고 있기 때문에 해당기능을 사용할 수 없다. 단 low level client를 사용하면 이용할 수 있다.

https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#common-options-response-filtering

 

Common options | Elasticsearch Guide [7.13] | Elastic

The following options can be applied to all of the REST APIs. Pretty Resultsedit When appending ?pretty=true to any request made, the JSON returned will be pretty formatted (use it for debugging only!). Another option is to set ?format=yaml which will caus

www.elastic.co

 

 

 

공통 코드

그럼 두개 기능을 구현하기 위한 공통으로 사용하기 위한 코드를 작성해보자.

 

build.gradle

rest high level client와 web, lombok등 필요한 라이브러리 추가

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-validation'
    compileOnly 'org.projectlombok:lombok'
    implementation 'org.elasticsearch.client:elasticsearch-rest-high-level-client'
    annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

 

StudentDto

document 단위가 될 student 정보를 보관할 dto

package com.wedul.estest.domain.student.dto;

import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;

import java.io.Serializable;

@Getter
@ToString
@NoArgsConstructor
public class StudentDto implements Serializable {

    private Long id;
    private String name;
    private int age;

    @Builder
    public StudentDto(Long id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

}

 

Proerties

yaml에서 연결정보를 읽어 보관할 Properties

package com.wedul.estest.config;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;

import javax.validation.constraints.NotNull;

@Setter
@Getter
@ConfigurationProperties("spring.elasticsearch")
@Validated
public class ElasticsearchConfiguration {
    @NotNull
    private String host;

    @NotNull
    private int port;
}

 

Configuration

client를 생성하기 위한 간단한 configuration

package com.wedul.estest.config;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.List;

@Configuration
@EnableConfigurationProperties(ElasticsearchConfiguration.class)
public class RestHighLevelClientConfig {

    @Bean
    public RestHighLevelClient restHighLevelClient(ElasticsearchConfiguration configuration) {
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost(configuration.getHost(), configuration.getPort()));
        return new RestHighLevelClient(RestClient.builder(httpHosts.toArray(new HttpHost[0])));
    }

}

 

yaml 파일

spring:
  profiles:
    group:
      "dev": "beta"
      "test": "local"
    active: "test"

---
spring:
  config:
    activate:
      on-profile: local
  elasticsearch:
    host: localhost
    port: 9200

---
spring:
  config:
    activate:
      on-profile: beta
  elasticsearch:
    host: localhost
    port: 9200

 

 

기존 high level rest client bulk Response

그럼 기존에 high level rest client에서 제공하는 bulk api를 사용하면 어느 정도의 response가 들어오는지 확인하기 위한 코드를 작성해보자.

 

query builder

bulk update request 생성을 위한 query builder

package com.wedul.estest.domain.student.querybuilder;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wedul.estest.domain.student.dto.StudentDto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

@Slf4j
@Component
@RequiredArgsConstructor
public class StudentQueryBuilder {

    private static final String ALIAS = "student";
    private final ObjectMapper objectMapper;

    public BulkRequest updateStudents(List<StudentDto> studentDtoList) {
        BulkRequest bulkRequest = new BulkRequest();

        studentDtoList.stream().map(studentDto -> {
            UpdateRequest updateRequest = new UpdateRequest();
            try {
                String doc = objectMapper.writeValueAsString(studentDto);
                updateRequest.index(ALIAS)
                    .id(studentDto.getId().toString())
                    .doc(doc, XContentType.JSON)
                    .docAsUpsert(true);
            } catch (JsonProcessingException e) {
                log.error("request create error", e);
            }
            return updateRequest;
        }).forEach(bulkRequest::add);
        return bulkRequest;
    }

}

 

service

package com.wedul.estest.domain.student.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wedul.estest.domain.student.dto.StudentDto;
import com.wedul.estest.domain.student.querybuilder.StudentQueryBuilder;
import lombok.RequiredArgsConstructor;
import org.apache.http.HttpEntity;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

@Service
@RequiredArgsConstructor
public class StudentService {

    private final RestHighLevelClient restHighLevelClient;
    private final StudentQueryBuilder studentQueryBuilder;

    public BulkResponse updateStudent(List<StudentDto> studentDtos) throws IOException {
         return restHighLevelClient.bulk(studentQueryBuilder.updateStudents(studentDtos), RequestOptions.DEFAULT);
    }

}

 

test code

reponse 출력을 위해 만든 테스트 코드로 실제 기능 검증을 위해 만든건 아니다.

package com.wedul.estest.domain.student.service;

import com.wedul.estest.domain.student.dto.StudentDto;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Response;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@SpringBootTest
class StudentServiceTest {

    @Autowired
    private StudentService studentService;

    @Test
    @DisplayName("bulk update response 확인")
    void bulk_update_response_check() throws IOException {
        // given
        List<StudentDto> studentDtoList = Arrays.asList(StudentDto.builder()
                .age(10)
                .id(1L)
                .name("wedul")
                .build()
        );

        // when
        BulkResponse bulkItemResponses = studentService.updateStudent(studentDtoList);

        // then
        Arrays.stream(bulkItemResponses.getItems()).forEach(item -> {
            System.out.println(item.getResponse());
        });
    }

}

 

그럼 테스트 코드를 사용해서 실제 출력되는 response 형태와 사이즈를 확인해보면 407 byte이고 items 내부에 데이터만 확인해보면 347byte가 되는걸 알 수 있다. 만약 1000개씩 bulk를 주기적으로 때린다고 하면 338kb 정도의 응답을 불필요하게 주고 받게 되는데 물론 이게 큰 영향을 주지는 않겠지만 불필요하다면 받지 않는게 좋을 것 같다.

{
  "took" : 6,
  "errors" : false,
  "items" : [
    {
      "update" : {
        "_index" : "student",
        "_type" : "_doc",
        "_id" : "1",
        "_version" : 6,
        "result" : "updated",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 6,
        "_primary_term" : 1,
        "status" : 200
      }
    }
  ]
}

 

 

low level client를 사용해 filter_path 적용

그럼 low level client를 사용해서 error내용을 제외하고는 받지 않도록 추가해보자

 

query builder

low level client의 경우 실제 rest api를 요청하는 방식과 동일하기 때문에 동일한 요청문을 string 형태로 만들어줘야한다. bulk api의 request body 형태는 이곳을 참조하면 된다.

 

package com.wedul.estest.domain.student.querybuilder;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wedul.estest.domain.student.dto.StudentDto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

@Slf4j
@Component
@RequiredArgsConstructor
public class StudentQueryBuilder {

    private static final String ALIAS = "student";
    private final ObjectMapper objectMapper;

	public Request updateStudentJson(List<StudentDto> studentDtoList) {
        Request request = new Request("POST", "_bulk");
        request.addParameter("filter_path", "items.*.error");
        String json = studentDtoList.stream().map(studentDto -> {
            try {
                String update = "{ \"update\" : {\"_id\" : " + studentDto.getId() + ", \"_index\" : \"" + ALIAS + "\"} }";
                String body = "{ \"doc\" : " + objectMapper.writeValueAsString(studentDto) + ", \"doc_as_upsert\" : true }";
                return update + "\n" + body + "\n";
            } catch (JsonProcessingException e) {
                log.error("request create error", e);
            }
            return null;
        }).filter(Objects::nonNull).collect(Collectors.joining());
        request.setJsonEntity(json);
        return request;
    }

}

여기서 주의 할 점은 각 json object 사이에 \n의 개행문자가 있어야 한다. 만약 존재하지 않으면 “The bulk request must be terminated by a newline [\n]”

에러가 발생된다.

 

 

service

low level client를 통해 studentQueryBuilder를 통해 생성되어 받은  reqeust 내용을 호출한다.

package com.wedul.estest.domain.student.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wedul.estest.domain.student.dto.StudentDto;
import com.wedul.estest.domain.student.querybuilder.StudentQueryBuilder;
import lombok.RequiredArgsConstructor;
import org.apache.http.HttpEntity;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

@Service
@RequiredArgsConstructor
public class StudentService {

    private final RestHighLevelClient restHighLevelClient;
    private final StudentQueryBuilder studentQueryBuilder;
    private final ObjectMapper objectMapper;

	public Response updateStudentUseLowLevelClient(List<StudentDto> studentDtos) throws IOException {
        return restHighLevelClient.getLowLevelClient().performRequest(studentQueryBuilder.updateStudentJson(studentDtos));
    }

}

 

test code

reponse 출력을 위해 만든 테스트 코드로 실제 기능 검증을 위해 만든건 아니다.

package com.wedul.estest.domain.student.service;

import com.wedul.estest.domain.student.dto.StudentDto;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Response;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@SpringBootTest
class StudentServiceTest {

    @Autowired
    private StudentService studentService;

    @Test
    @DisplayName("low level client 사용")
    void low_level_client_test() throws IOException {
        // given
        List<StudentDto> studentDtoList = Arrays.asList(StudentDto.builder()
                .age(10)
                .id(1L)
                .name("wedul")
                .build(),
            StudentDto.builder()
                .age(20)
                .id(2L)
                .name("chul")
                .build()
        );

        // when
        Response response = studentService.updateStudentUseLowLevelClient(studentDtoList);

        // then
        InputStreamReader reader = new InputStreamReader(response.getEntity().getContent());
        Stream<String> streamOfString= new BufferedReader(reader).lines();
        String streamToString = streamOfString.collect(Collectors.joining());
        System.out.println(streamToString);
    }

}

 

출력 결과는 {} 빈 object이다 filter_path에서 적용했던 items.*.error 내용이 없기 때문에 출력되어 나올 데이터가 없기 때문이다.

 

 

주의 (Aws에 Es를 올려서 사용하는경우)

Aws에 Elasticsearch를 올려서 사용하는 경우에는 아래와 같이 _bulk url앞에 /를 붙여서 사용해야한다. 정확한 이유는 모르겠지만 elb에서 400 bad request라는 응답을 계속 돌려주기에 정상적으로 요청이 가지못한다.

	public Request updateStudentJson(List<StudentDto> studentDtoList) {
        Request request = new Request("POST", "/_bulk");

참고 

https://github.com/elastic/elasticsearch/issues/28923

 

결론


이렇게 작은 response라고 해도 transfer data를 줄여서 네트워크 비용과 불필요한 deserialize 작업들을 제거할 수 있으면 훨씬 장점이 될 것 같고 bulk api는 보통 5~15mb 수준에서 데이터를 보내기 때문에 위 student 기준으로 실질적으로 bulk 작업시 보내게 되는 데이터의 양은 더 많을 것이다. 이때 filter_path를 적용하게 되면 최소 몇 mb 수준의 불필요한 응답을 매 요청 시 마다 줄일 수 있을 것이다. 

 

하지만 그에 따른 단점으로 low level client를 사용해야하다 보니 코드 작성도 번거롭고 깔끔하지 못하다. 이 정도는 불필요한 사양을 줄이고 속도 향상에 이득이 된다고 하면 크게 문제가 될만한 상황은 아니긴 하다.

 

 

상황에 따라 잘 사용하는게 중요할 것 같다. 

github 소스 : https://github.com/weduls/es-test

반응형