By Using Kafka we can easily horizontally scale our application to do asynchronous pagination in ElasticSearch.
Let’s say you have an ElasticSearch Index of 1,000,000 documents, and you need to run an operation on those documents. We already know how expensive the deep-paging in ElasticSearch is, especially index.max_result_window and doing Search ‘from:’.
GET /_search
{
"from": 5,
"size": 20,
"query": {
"match": {
"user.id": "jay"
}
}
}
Enter fullscreen mode Exit fullscreen mode
One way of overcoming the problem is to use search_after. In this case, your process is becoming synchronous, which means you cannot call the second chunk of data without having the results from the first call, for example:
GET /_search
{
"size": 10000,
"query": {
"match" : {
"user.id" : "jay"
}
},
"sort": [
{"@timestamp": "asc"}
]
}
Enter fullscreen mode Exit fullscreen mode
results:
{
"took" : ...,
"timed_out" : false,
"_shards" : ...,
"hits" : {
"total" : ...,
"max_score" : null,
"hits" : [
...
{
"_source" : ...,
"sort" : [
4098435132000
]
}
]
}
}
Enter fullscreen mode Exit fullscreen mode
As you can see, the results give you a sort value (4098435132000) where you can use in your second call-in “search_after” to get the next chunk as:
GET /_search
{
"size": 10000,
"query": {
"match" : {
"user.id" : "jay"
}
},
"sort": [
{"@timestamp": "asc"}
],
"search_after": [
4098435132000
]
}
Enter fullscreen mode Exit fullscreen mode
then get the next sort value and use it in your next call.
This process is sequentially asynchronous, which means to get through 1,000,000 documents you need to call ES 1,000,000/10,000=100 times one after the other.
What can go wrong?
- let’s say your application dies or runs out of memory during these calls
- how to know where to start after it failed
- maybe this process is very slow for your application use-case
Solution By Using “Slice”
One call to ES to get the count of the documents (which in this case is 1M), then create a Kafka producer to put the sequences (0–100) into your Kafka topic and define your partition size as the number of your consumer app. Let’s say you have 10 consumer applications.
Create a Kafka consumer app, which is calling Elastic Search as:
GET /_search?scroll=1m
{
"slice": {
"id":0,
"max":100
},
"size": 10000,
"sort": [
{"@timestamp": "asc"}
]
}
Enter fullscreen mode Exit fullscreen mode
The consumer gets the sequence number as a parameter and replaces it with the id. As long as all consumers have the same consumer “group.id” they can process these calls in parallel and have a resilient framework.
Implementation with Java/Spring Cloud
I happened to use java with spring cloud for my project.
Let’s run your Elastic Search and Kafka in docker-compose by running the following command:
# Elastic Search
docker-compose -f docker-compose-es.yml up -d
# Kafka
docker-compose -f docker-compose-kafka.yml up -d
Enter fullscreen mode Exit fullscreen mode
you can find the docker-compose-es.yml and docker-compose-kafka.yml in my Github account.
The producer first calls ES to get the count of the documents in the index, then divides the count by 500, which will be the number of documents you expect to be retrieved in every ES call.
public void paginationProcess() {
log.debug("paginationProcess called");
//call to ES and get the total count
Response response = restHighLevelClient.getLowLevelClient()
.performRequest(new Request("GET", String.format("%s/_count", INDEX_NAME)));
ResponseCountDto responseCountDto = objectMapper.readValue(EntityUtils.toString(response.getEntity()), ResponseCountDto.class);
log.debug("responseCountDto: {}", responseCountDto.getCount());
//let say you want to have a page size of 500 then count / 500
int max = responseCountDto.getCount() / 500;
log.debug("count: {} , max: {}", responseCountDto.getCount(), max);
//producer
IntStream.range(0, max).forEach(i -> paginationBinder.paginationOut()//
.send(MessageBuilder.withPayload(//
PaginationDto.builder()//
.id(i)//slice id
.max(max)// let say i want to have page size of 500 then: count / 500
.build())//
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build()));
}
Enter fullscreen mode Exit fullscreen mode
Then the Consumer receives the slice number as a parameter. In case of any error in the consumer method, we retry it 5 times, and then it puts the message into to DLQ method.
@StreamListener(PaginationBinder.PAGINATION_IN)
public void paginationProcess(@Payload PaginationDto paginationDto) {
// Call ES
log.debug("paginationProcess: {}", paginationDto);
try {
Request request = new Request("GET", String.format("%s/_search?scroll=1m", INDEX_NAME));
//sorted by localDateTime and slice by id and max as parameters
request.setJsonEntity(String.format("{\"slice\":{\"id\":%s,\"max\":%s},\"size\":10000,\"sort\":[{\"localDateTime\":\"asc\"}]}", paginationDto.getId(), paginationDto.getMax()));
Response response = restHighLevelClient.getLowLevelClient().performRequest(request);
//do something with the response ...
log.debug("response: {}", response);
} catch (IOException e) {
e.printStackTrace();
}
}
Enter fullscreen mode Exit fullscreen mode
Note: Assume no new documents are inserting into the result set or at least within that period. Otherwise, we lose the consistency of the result during the pagination.
application.yml file
spring:
application:
name: es-pagination
cloud.stream:
bindings:
pagination-out:
destination: pagination
producer:
partition-count: 10
pagination-in:
destination: pagination
group: ${spring.application.name}.pagination-group
consumer:
maxAttempts: 5
pagination-in-dlq:
destination: paginationDLQ
group: ${spring.application.name}.pagination-group
kafka:
streams:
bindings:
pagination-in:
consumer:
enableDlq: true
dlqName: paginationDLQ
autoCommitOnError: true
autoCommitOffset: true
binder:
autoAddPartitions: true
min-partition-count: 10
configuration:
commit.interval.ms: 100
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
Enter fullscreen mode Exit fullscreen mode
Avoid toasting the memory
If the number of slices is bigger than the number of shards, the slice filter will become very slow on the first calls. It has a complexity of O(N) and a memory cost which equals N bits per slice where N is the total number of documents in the shard. After a few calls, the filter should be cached and subsequent calls should be faster, but you should limit the number of sliced queries you perform in parallel to avoid the memory explosion.
You can find the project in my Github account:
https://github.com/ehsaniara/es-kafka-pagination
Refrence:
https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html
暂无评论内容