이전글에서 작성하였듯이 계속해서 쓰기 작업 시 발생할 수 있는 순단을 줄이기 위해서 여러가지 방법을 찾고 있다.
그중 쓰기 작업이 많이 발생할 때 불필요한 response를 줄이기 위해서 filter_path를 적용해보고자 한다.
Filter Path
rest api 작업 시 필요한 응답값만 받을 수 있는 기능이다. 하지만 Java High Level REST Client에서는 transport layer를 사용하고 있기 때문에 해당기능을 사용할 수 없다. 단 low level client를 사용하면 이용할 수 있다.
공통 코드
그럼 두개 기능을 구현하기 위한 공통으로 사용하기 위한 코드를 작성해보자.
build.gradle
rest high level client와 web, lombok등 필요한 라이브러리 추가
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-validation'
compileOnly 'org.projectlombok:lombok'
implementation 'org.elasticsearch.client:elasticsearch-rest-high-level-client'
annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
StudentDto
document 단위가 될 student 정보를 보관할 dto
package com.wedul.estest.domain.student.dto;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.io.Serializable;
@Getter
@ToString
@NoArgsConstructor
public class StudentDto implements Serializable {
private Long id;
private String name;
private int age;
@Builder
public StudentDto(Long id, String name, int age) {
this.id = id;
this.name = name;
this.age = age;
}
}
Proerties
yaml에서 연결정보를 읽어 보관할 Properties
package com.wedul.estest.config;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
import javax.validation.constraints.NotNull;
@Setter
@Getter
@ConfigurationProperties("spring.elasticsearch")
@Validated
public class ElasticsearchConfiguration {
@NotNull
private String host;
@NotNull
private int port;
}
Configuration
client를 생성하기 위한 간단한 configuration
package com.wedul.estest.config;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.List;
@Configuration
@EnableConfigurationProperties(ElasticsearchConfiguration.class)
public class RestHighLevelClientConfig {
@Bean
public RestHighLevelClient restHighLevelClient(ElasticsearchConfiguration configuration) {
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost(configuration.getHost(), configuration.getPort()));
return new RestHighLevelClient(RestClient.builder(httpHosts.toArray(new HttpHost[0])));
}
}
yaml 파일
spring:
profiles:
group:
"dev": "beta"
"test": "local"
active: "test"
---
spring:
config:
activate:
on-profile: local
elasticsearch:
host: localhost
port: 9200
---
spring:
config:
activate:
on-profile: beta
elasticsearch:
host: localhost
port: 9200
기존 high level rest client bulk Response
그럼 기존에 high level rest client에서 제공하는 bulk api를 사용하면 어느 정도의 response가 들어오는지 확인하기 위한 코드를 작성해보자.
query builder
bulk update request 생성을 위한 query builder
package com.wedul.estest.domain.student.querybuilder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wedul.estest.domain.student.dto.StudentDto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
@Slf4j
@Component
@RequiredArgsConstructor
public class StudentQueryBuilder {
private static final String ALIAS = "student";
private final ObjectMapper objectMapper;
public BulkRequest updateStudents(List<StudentDto> studentDtoList) {
BulkRequest bulkRequest = new BulkRequest();
studentDtoList.stream().map(studentDto -> {
UpdateRequest updateRequest = new UpdateRequest();
try {
String doc = objectMapper.writeValueAsString(studentDto);
updateRequest.index(ALIAS)
.id(studentDto.getId().toString())
.doc(doc, XContentType.JSON)
.docAsUpsert(true);
} catch (JsonProcessingException e) {
log.error("request create error", e);
}
return updateRequest;
}).forEach(bulkRequest::add);
return bulkRequest;
}
}
service
package com.wedul.estest.domain.student.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wedul.estest.domain.student.dto.StudentDto;
import com.wedul.estest.domain.student.querybuilder.StudentQueryBuilder;
import lombok.RequiredArgsConstructor;
import org.apache.http.HttpEntity;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
@Service
@RequiredArgsConstructor
public class StudentService {
private final RestHighLevelClient restHighLevelClient;
private final StudentQueryBuilder studentQueryBuilder;
public BulkResponse updateStudent(List<StudentDto> studentDtos) throws IOException {
return restHighLevelClient.bulk(studentQueryBuilder.updateStudents(studentDtos), RequestOptions.DEFAULT);
}
}
test code
reponse 출력을 위해 만든 테스트 코드로 실제 기능 검증을 위해 만든건 아니다.
package com.wedul.estest.domain.student.service;
import com.wedul.estest.domain.student.dto.StudentDto;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Response;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@SpringBootTest
class StudentServiceTest {
@Autowired
private StudentService studentService;
@Test
@DisplayName("bulk update response 확인")
void bulk_update_response_check() throws IOException {
// given
List<StudentDto> studentDtoList = Arrays.asList(StudentDto.builder()
.age(10)
.id(1L)
.name("wedul")
.build()
);
// when
BulkResponse bulkItemResponses = studentService.updateStudent(studentDtoList);
// then
Arrays.stream(bulkItemResponses.getItems()).forEach(item -> {
System.out.println(item.getResponse());
});
}
}
그럼 테스트 코드를 사용해서 실제 출력되는 response 형태와 사이즈를 확인해보면 407 byte이고 items 내부에 데이터만 확인해보면 347byte가 되는걸 알 수 있다. 만약 1000개씩 bulk를 주기적으로 때린다고 하면 338kb 정도의 응답을 불필요하게 주고 받게 되는데 물론 이게 큰 영향을 주지는 않겠지만 불필요하다면 받지 않는게 좋을 것 같다.
{
"took" : 6,
"errors" : false,
"items" : [
{
"update" : {
"_index" : "student",
"_type" : "_doc",
"_id" : "1",
"_version" : 6,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 6,
"_primary_term" : 1,
"status" : 200
}
}
]
}
low level client를 사용해 filter_path 적용
그럼 low level client를 사용해서 error내용을 제외하고는 받지 않도록 추가해보자
query builder
low level client의 경우 실제 rest api를 요청하는 방식과 동일하기 때문에 동일한 요청문을 string 형태로 만들어줘야한다. bulk api의 request body 형태는 이곳을 참조하면 된다.
package com.wedul.estest.domain.student.querybuilder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wedul.estest.domain.student.dto.StudentDto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
@Slf4j
@Component
@RequiredArgsConstructor
public class StudentQueryBuilder {
private static final String ALIAS = "student";
private final ObjectMapper objectMapper;
public Request updateStudentJson(List<StudentDto> studentDtoList) {
Request request = new Request("POST", "_bulk");
request.addParameter("filter_path", "items.*.error");
String json = studentDtoList.stream().map(studentDto -> {
try {
String update = "{ \"update\" : {\"_id\" : " + studentDto.getId() + ", \"_index\" : \"" + ALIAS + "\"} }";
String body = "{ \"doc\" : " + objectMapper.writeValueAsString(studentDto) + ", \"doc_as_upsert\" : true }";
return update + "\n" + body + "\n";
} catch (JsonProcessingException e) {
log.error("request create error", e);
}
return null;
}).filter(Objects::nonNull).collect(Collectors.joining());
request.setJsonEntity(json);
return request;
}
}
여기서 주의 할 점은 각 json object 사이에 \n의 개행문자가 있어야 한다. 만약 존재하지 않으면 “The bulk request must be terminated by a newline [\n]”
에러가 발생된다.
service
low level client를 통해 studentQueryBuilder를 통해 생성되어 받은 reqeust 내용을 호출한다.
package com.wedul.estest.domain.student.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wedul.estest.domain.student.dto.StudentDto;
import com.wedul.estest.domain.student.querybuilder.StudentQueryBuilder;
import lombok.RequiredArgsConstructor;
import org.apache.http.HttpEntity;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
@Service
@RequiredArgsConstructor
public class StudentService {
private final RestHighLevelClient restHighLevelClient;
private final StudentQueryBuilder studentQueryBuilder;
private final ObjectMapper objectMapper;
public Response updateStudentUseLowLevelClient(List<StudentDto> studentDtos) throws IOException {
return restHighLevelClient.getLowLevelClient().performRequest(studentQueryBuilder.updateStudentJson(studentDtos));
}
}
test code
reponse 출력을 위해 만든 테스트 코드로 실제 기능 검증을 위해 만든건 아니다.
package com.wedul.estest.domain.student.service;
import com.wedul.estest.domain.student.dto.StudentDto;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Response;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@SpringBootTest
class StudentServiceTest {
@Autowired
private StudentService studentService;
@Test
@DisplayName("low level client 사용")
void low_level_client_test() throws IOException {
// given
List<StudentDto> studentDtoList = Arrays.asList(StudentDto.builder()
.age(10)
.id(1L)
.name("wedul")
.build(),
StudentDto.builder()
.age(20)
.id(2L)
.name("chul")
.build()
);
// when
Response response = studentService.updateStudentUseLowLevelClient(studentDtoList);
// then
InputStreamReader reader = new InputStreamReader(response.getEntity().getContent());
Stream<String> streamOfString= new BufferedReader(reader).lines();
String streamToString = streamOfString.collect(Collectors.joining());
System.out.println(streamToString);
}
}
출력 결과는 {} 빈 object이다 filter_path에서 적용했던 items.*.error 내용이 없기 때문에 출력되어 나올 데이터가 없기 때문이다.
주의 (Aws에 Es를 올려서 사용하는경우)
Aws에 Elasticsearch를 올려서 사용하는 경우에는 아래와 같이 _bulk url앞에 /를 붙여서 사용해야한다. 정확한 이유는 모르겠지만 elb에서 400 bad request라는 응답을 계속 돌려주기에 정상적으로 요청이 가지못한다.
public Request updateStudentJson(List<StudentDto> studentDtoList) {
Request request = new Request("POST", "/_bulk");
참고
https://github.com/elastic/elasticsearch/issues/28923
결론
이렇게 작은 response라고 해도 transfer data를 줄여서 네트워크 비용과 불필요한 deserialize 작업들을 제거할 수 있으면 훨씬 장점이 될 것 같고 bulk api는 보통 5~15mb 수준에서 데이터를 보내기 때문에 위 student 기준으로 실질적으로 bulk 작업시 보내게 되는 데이터의 양은 더 많을 것이다. 이때 filter_path를 적용하게 되면 최소 몇 mb 수준의 불필요한 응답을 매 요청 시 마다 줄일 수 있을 것이다.
하지만 그에 따른 단점으로 low level client를 사용해야하다 보니 코드 작성도 번거롭고 깔끔하지 못하다. 이 정도는 불필요한 사양을 줄이고 속도 향상에 이득이 된다고 하면 크게 문제가 될만한 상황은 아니긴 하다.
상황에 따라 잘 사용하는게 중요할 것 같다.
github 소스 : https://github.com/weduls/es-test
'데이터베이스 > Elasticsearch' 카테고리의 다른 글
Line 세미나. 대규모 음악 데이터 검색 기능을 위한 Elasticsearch 구성 및 속도 개선 방법 - 2. 클러스터 튜닝 (0) | 2021.11.14 |
---|---|
Line 세미나. 대규모 음악 데이터 검색 기능을 위한 Elasticsearch 구성 및 속도 개선 방법 - 1. 검색 쿼리 개선 (0) | 2021.11.14 |
Bulk Index 진행 시 search api 느려지는 현상 해결 방법 리서치 (0) | 2021.07.24 |
Nested field에 대한 대체 필드 flattened type (0) | 2021.06.14 |
Elasticsearch의 Translog 설명 (0) | 2021.01.22 |