Using Bulk Processor
提供了一个简单的接口,可以根据请求的数量或大小,或者在给定的时间段后自动刷新批量操作。
要使用它,首先要创建一个处理器实例:
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
BulkProcessor bulkProcessor = BulkProcessor.builder(
client,
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId,
BulkRequest request) { ... }
@Override
public void afterBulk(long executionId,
BulkRequest request,
BulkResponse response) { ... }
@Override
public void afterBulk(long executionId,
BulkRequest request,
Throwable failure) { ... }
})
.setBulkActions(10000)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(5))
.setConcurrentRequests(1)
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();
默认情况下,BulkProcessor:
bulkActions设置为1000
bulkSize设置为5 mb
不设置flushInterval
将concurrent请求设置为1,这意味着冲洗操作的异步执行。
将反向策略设置为一个指数后退,8次重试,开始延迟50毫秒。总等待时间大约是5.1秒。
Add Request
然后,您可以简单地将您的请求添加到BulkProcessor:
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
Closing the Bulk Processoredit
当所有文档都加载到BulkProcessor时,它可以通过使用awaitClose 或close methods来关闭:
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
或:
bulkProcessor.close();
这两种方法都可以刷新任何剩余的文档,如果它们是通过设置flushInterval来安排的,则禁用所有其他预定的刷新。如果并发请求被启用,则“等待”方法将等待所有批量请求完成的指定超时,然后返回true;如果指定的等待时间在所有批量请求完成之前都已过期,则返回false。关闭方法不会等待任何剩余的批量请求来完成和立即退出。
Using Bulk Processor in testsedit
如果您正在使用ElasticSearch来运行测试,并且正在使用散货处理器来填充数据集,那么您应该更好地将并发请求的数量设置为0,这样批量的操作就会以一种同步的方式执行:
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
.setBulkActions(10000)
.setConcurrentRequests(0)
.build();
// Add your requests
bulkProcessor.add(/* Your requests */);
// Flush any remaining requests
bulkProcessor.flush();
// Or close the bulkProcessor if you don't need it anymore
bulkProcessor.close();
// Refresh your indices
client.admin().indices().prepareRefresh().get();
// Now you can start searching!
client.prepareSearch().get();
» 订阅本站:https://www.kgraph.cn