RestHighLevelClient를 사용하여 search after 기능 구현하기

web/Spring|2019. 11. 14. 17:55

https://wedul.site/541에서 search after 기능을 사용해서 검색을 하는 이유를 알아봤었다.

그럼 spring boot에서 RestHighLevelClient를 이용해서 search after를 구현을 해보자.

 

1. Mapping

우선 index가 필요한데 간단하게 상품명과 지역 가격정보들을 가지고 있는 wedul_product 인덱스를 만들어 사용한다.

{
    "settings": {
        "index": {
            "analysis": {
                "tokenizer": {
                    "nori_user_dict": {
                        "type": "nori_tokenizer",
                        "decompound_mode": "mixed",
                        "user_dictionary": "analysis/userdict_ko.txt"
                    }
                },
                "analyzer": {
                    "wedul_analyzer": {
                        "tokenizer": "nori_user_dict",
                        "filter": [
                            "synonym"
                        ]
                    }
                },
                "filter": {
                    "synonym": {
                        "type": "synonym",
                        "synonyms_path": "analysis/synonyms.txt"
                    }
                }
            }
        }
    },
    "mappings": {
        "dynamic": "false",
        "properties": {
            "productId": {
                "type": "keyword"
            },
            "place": {
                "type": "text",
                "fields": {
                    "keyword": {
                        "type": "keyword"
                    }
                }
            },
            "message": {
                "type": "text"
            },
            "query": {
                "type": "percolator"
            },
            "name": {
                "type": "text",
                "analyzer": "wedul_analyzer",
                "fields": {
                    "keyword": {
                        "type": "keyword"
                    }
                }
            },
            "price": {
                "type": "integer"
            },
            "updateAt": {
                "type": "date",
                "format": "epoch_second"
            },
            "createAt": {
                "type": "date",
                "format": "epoch_second"
            }
        }
    }
}

값은 적당하게 3개정도 삽입하였다.

저장되어 있는 초기값.

 

2. 라이브러리 

사용에 필요한 라이브러리들을 gradle을 사용해서 추가한다. 

plugins {
    id 'org.springframework.boot' version '2.2.0.RELEASE'
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
}

ext {
    set('elasticsearch.version', '7.4.2')
}

group = 'com.wedul'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

repositories {
    mavenCentral()
    maven { url "https://plugins.gradle.org/m2/" }
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    compileOnly 'org.projectlombok:lombok'
    compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.9'
    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.10.0'
    annotationProcessor 'org.projectlombok:lombok'
    testCompile group: 'org.mockito', name: 'mockito-all', version:'1.9.5'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'

    // gson
    compile group: 'com.google.code.gson', name: 'gson', version: '2.8.6'

    // elasticsearch
    compile 'org.elasticsearch.client:elasticsearch-rest-high-level-client:7.4.2'
    compile group: 'org.elasticsearch', name: 'elasticsearch', version: '7.4.2'
}

 

 

3.RestHighLevelClient configuration

restHighLevelClient 사용을 위한 Configuration 파일을 만들어주는데 id와 pw는 AppConfig라는 별도 properties를 관리하는 bean에서 받아서 사용하는데 base64로 인코딩되어있어서 이를 decoding후 사용한다. (부족한 코드는 글 맨 아래있는 github 링크 참조)

package com.wedul.study.common.config;

import com.wedul.study.common.util.EncodingUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

/**
 * study
 *
 * @author wedul
 * @since 2019-11-07
 **/
@Configuration
@Slf4j
public class ElasticsearchClientConfig implements FactoryBean<RestHighLevelClient>, InitializingBean, DisposableBean {

    @Autowired
    AppConfig appConfig;

    private RestHighLevelClient restHighLevelClient;

    @Override
    public RestHighLevelClient getObject() {
        return restHighLevelClient;
    }

    @Override
    public Class<?> getObjectType() {
        return RestHighLevelClient.class;
    }

    @Override
    public void destroy() {
        try {
            if (null != restHighLevelClient) {
                restHighLevelClient.close();
            }
        } catch (Exception e) {
            log.error("Error closing ElasticSearch client: ", e);
        }
    }

    @Override
    public boolean isSingleton() {
        return false;
    }

    @Override
    public void afterPropertiesSet() {
        restHighLevelClient = buildClient();
    }

    private RestHighLevelClient buildClient() {
        try {
            String id = EncodingUtil.decodingBase64(appConfig.getElasticsearchConfig().getId());
            String pw = EncodingUtil.decodingBase64(appConfig.getElasticsearchConfig().getPw());

            // 계정 설정
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(id, pw));

            // client 설정
            RestClientBuilder builder = RestClient.builder(
                new HttpHost(appConfig.getElasticsearchConfig().getIp(),
                    appConfig.getElasticsearchConfig().getPort(), "http"))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));

            restHighLevelClient = new RestHighLevelClient(builder);
        } catch (Exception e) {
            log.error(e.getMessage());
        }
        return restHighLevelClient;
    }

}

 

 

4. Handler 추가

자주 사용되는 Elasticsearch 문법을 처리하기 위해서 만들어 놓은 ElasticsearchHandler에 search after에 사용 될 메소드를 추가한다. search after는 sort 필드가 없으면 사용이 불가능 하기 때문에 sort 필드가 없는 경우 에러를 전달한다.

public static SearchSourceBuilder searchAfter(Map<String, SortOrder> sortFields, QueryBuilder query, Object[] searchAfter, int size) {
    return searchAfterBuilder(sortFields, query, searchAfter,  size);
}

public static SearchSourceBuilder searchAfter(Map<String, SortOrder> sortFields, QueryBuilder query, Object[] searchAfter) {
    return searchAfterBuilder(sortFields, query, searchAfter, 20);
}

private static SearchSourceBuilder searchAfterBuilder(Map<String, SortOrder> sortFields, QueryBuilder query, Object[] searchAfter, int size) {
    SearchSourceBuilder builder = new SearchSourceBuilder();

    if (CollectionUtils.isEmpty(sortFields)) {
        throw new InternalServerException("잘못된 필드 요청입니다.");
    }

    sortFields.forEach((field, sort) -> {
        builder.sort(field, sort);
    });
    builder.size(size);
    builder.query(query);

    if (ArrayUtils.isNotEmpty(searchAfter)) {
        builder.searchAfter(searchAfter);
    }

    return builder;
}

 

 

5. 기능 구현

위의 기능들을 이용해서 실제로 구현해보자. productService와 productRepository 클래스를 통해서 구현하였다. 자세한 설명없이 간단하기 때문에 소스를 보면 알 수 있다. 

 

우선 최종 결과물로 사용될 클래스는 ElasticResult인데 다음과 같이 현재 요청이 마지막인지 표시하는 isLast와 다음 요청을 위해 보내줘야 하는 cursor값과 결과값 전체 total과 결과 리스트 list 필드가 존재한다.

@Builder
@Data
public class ElasticResult<T extends ElasticsearchDto> {

    private boolean isLast;
    private long total;
    private List<T> list;
    private Object[] cursor;

}

 

그 다음 service로직을 통해 결과를 얻어서 위 ElasticResult에 결과를 담아보자. products 메서드는 요청을 받아서 elasticsearch에 실제 조작요청을 하는 productRepository에 동작을 요청하고 값을 받아서 처리하는 메서드이다. 그리고 extractProductList는 결과값에서 ProductDto 값을 뽑아내는 메서드이다.

public ElasticResult<ProductDto> products(String index, Object[] searchAfter, int size) throws IOException {
    SearchResponse searchResponse = productRepository.products(index, searchAfter, size);
    SearchHits searchHits = searchResponse.getHits();
    int hitCnt = searchHits.getHits().length;
    boolean isLast = 0 == hitCnt || size > hitCnt;

    return ElasticResult.<ProductDto>builder()
        .cursor(isLast ? null : searchHits.getHits()[hitCnt - 1].getSortValues())
        .isLast(isLast)
        .list(extractProductList(searchHits))
        .total(searchHits.getTotalHits().value)
        .build();
}

private List<ProductDto> extractProductList(SearchHits searchHits) {
    List<ProductDto> productList = new ArrayList<>();

    searchHits.forEach(hit -> {
        Map<String, Object> result = hit.getSourceAsMap();

        productList.add(ProductDto.builder()
            .name(String.valueOf(result.get("name")))
            .productId(String.valueOf(result.get("productId")))
            .place(String.valueOf(result.get("place")))
            .price(Integer.valueOf(result.get("price").toString()))
            .updateAt(Long.valueOf(result.get("updateAt").toString()))
            .createAt(Long.valueOf(result.get("createAt").toString())).build());
    });

    return productList;
}

 

그리고 마지막으로 es에 직접적으로 콜을 하는 productRepository 이다. 여기서 정렬 키워드는 name과 place를 사용한다.

public SearchResponse products(String index, Object[] searchAfter, int size) throws IOException {
    SearchRequest searchRequest = new SearchRequest(index);
    Map<String, SortOrder> sorts = new HashMap<String, SortOrder>() {
        {
            put("name.keyword", SortOrder.DESC);
            put("place.keyword", SortOrder.DESC);
        }
    };

    SearchSourceBuilder searchSourceBuilder = ElasticsearchHandler.searchAfter(sorts, QueryBuilders.matchAllQuery(), searchAfter, size);
    searchRequest.source(searchSourceBuilder);
    return restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
}

 

 

6. 테스트

그럼 위에 내용이 잘 구현되었는지 테스트를 해보자. 총 3개의 데이터가 있는데 이 데이터를 1개씩 search after를 통해서 값을 받아서 저장하고 한번에 출력하도록 해보자.

@Test
@DisplayName("search after")
public void searchAfter() throws IOException {
    ElasticResult<ProductDto> result = productService.products(PRODUCT_INDEX, new Object[]{}, 1);
    List<ProductDto> productDtos = new ArrayList<>();

    while(result != null && !result.isLast()) {
        productDtos.addAll(result.getList());
        result = productService.products(PRODUCT_INDEX, result.getCursor(), 1);
    }
    productDtos.addAll(result.getList());

    productDtos.forEach(productDto -> {
        System.out.println("이름 : " + productDto.getName());
        System.out.println("장소 : " + productDto.getPlace());
    });
}

결과는 정상적으로 3가지 모두 잘 출력되는 걸 알 수있다.

 

우선 기능 구현을 해보기 위해서 진행하였는데 더 다듬어야 할 것같다.

자세한 소스는 github참조

댓글()

Elasticsearch에서 search_after 기능 사용하여 조회하기

elasticsearch에서 search_after를 이용하여 데이터를 조회하는 방법을 정리해보자.

우선 사용할 인덱스를 생성하자.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
PUT wedul
{
  "mappings": {
    "cjung": {
      "properties": {
        "id": {
          "type": "keyword"
        },
        "name": {
          "type": "text",
          "analyzer": "nori",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        }
      }
    }
  }
}
cs


생성된 인덱스에 데이터 몇개만 삽입하여보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
POST wedul/cjung?pretty { "id": "wemakeprice", "name": "원더쇼핑" } POST wedul/cjung { "id": "dauns", "name": "다운" }
cs


그리고 일반적으로 사용하는 방식으로 데이터를 조회해보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
GET wedul/cjung/_search
{
  "from": 0, 
  "size": 2, 
  "query": {
    "match_all": {}
  }
}
 
 
{
  "took" : 8,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 3,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "wedul",
        "_type" : "cjung",
        "_id" : "_update",
        "_score" : 1.0,
        "_source" : {
          "doc" : {
            "id" : "wedul",
            "name" : "정철"
          }
        }
      },
      {
        "_index" : "wedul",
        "_type" : "cjung",
        "_id" : "tSNYH2cBvWxWFgHQJ6J4",
        "_score" : 1.0,
        "_source" : {
          "doc" : {
            "id" : "dauns",
            "name" : "다운"
          }
        }
      }
    ]
  }
}
 
cs

정상적으로 조회가 된다. 하지만 여기서 만약 size가 10000이 넘은 곳을 검색하고 싶다면 어떻게 될까? 저번에 공부해서 정리한 글 처럼 10000개 이상에 데이터에 접근하려고 하면 오류가 발생한다.

참고 : https://wedul.tistory.com/518?category=680504


그럼 어떻게 조회해야 할까? 그래서 제공되는 방법이 search_after를 이용하여 검색하는 방법이다.

search_after는 라이브 커서를 제공하여 다음 페이지를 계속 조회하는 방식으로 검색기능을 제공한다. 


기본적인 검색 방법은 다음과 같다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
GET wedul/cjung/_search
{
  "sort": [
    {
      "id": {
        "order": "asc"
      },"name.keyword": {
        "order": "desc"
      }
    }
  ], 
  "size": 1, 
  "query": {
    "match_all": {}
  }
}
cs


이렇게 검색을하게 되면 다음과 같이 결과가 나오는데 여기서 나온 sort  필드를 이용하여 다음 필드를 조회해 나가는 것이 search_after 기능이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
{
  "took" : 9,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : null,
    "hits" : [
      {
        "_index" : "wedul",
        "_type" : "cjung",
        "_id" : "uyNoH2cBvWxWFgHQ86L9",
        "_score" : null,
        "_source" : {
          "id" : "wemakeprice",
          "name" : "원더쇼핑"
        },
        "sort" : [
          "wemakeprice",
          "원더쇼핑"
        ]
      }
    ]
  }
}
 
cs


위에 나온 검색결과 sort에 출력된 wemakeprice와 원더쇼핑을 사용하여 다음 데이터를 조회한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
GET wedul/cjung/_search
{
  "search_after": ["wemakeprice",
          "원더쇼핑"],
  "sort": [
    {
      "id": {
        "order": "asc"
      },"name.keyword": {
        "order": "desc"
      }
    }
  ], 
  "query": {
    "match_all": {}
  }
}
cs


그렇다면 저 위에 sort 필드는 과연 무엇인가 하고 생각이 들 수 있다. sort 필드는 바로 검색 dsl에서 사용했던 sort필드의 값 들이다. 이 값 다음에 나오는 데이터를 조회 하라는 뜻이다. 그렇기 때문에 무조건 search_after를 사용하기 위해서는 데이터를 정렬하는것이 필수이다. 그리고 정말 중요한 것은 그 sort필드에 들어가는 데이터중 하나는 무조건 unique한 값 이어야 한다는 것이다. 그렇지 않으면 어디서 부터 검색을 시작해야할지 알지 못하기 때문이다. 


참고

https://www.elastic.co/guide/en/elasticsearch/reference/master/search-request-search-after.html

댓글()