private final EsClient client;
private final String indexName;
private boolean large = false;
+ private long flushByteSize = FLUSH_BYTE_SIZE; // estimated-size threshold (bytes) that add() compares against; overridable via setFlushByteSize()
private boolean disableRefresh = false;
private BulkRequestBuilder bulkRequest = null;
private Map<String, Object> largeInitialSettings = null;
return this;
}
+ public BulkIndexer setFlushByteSize(long flushByteSize) { // replaces the FLUSH_BYTE_SIZE default used by add() as its flush threshold
+ this.flushByteSize = flushByteSize;
+ return this; // fluent style: hands back this indexer so setters can be chained
+ }
+
/**
* By default refresh of index is executed in method {@link #stop()}. Set to true
* to disable refresh.
public void add(ActionRequest request) {
bulkRequest.request().add(request);
- if (bulkRequest.request().estimatedSizeInBytes() >= FLUSH_BYTE_SIZE) {
+ if (bulkRequest.request().estimatedSizeInBytes() >= flushByteSize) { // threshold is now the per-instance field, not the constant
executeBulk();
}
}
searchRequest
.setScroll(TimeValue.timeValueMinutes(5))
.setSearchType(SearchType.SCAN)
+ .setSize(100)
// load only doc ids, not _source fields
.setFetchSource(false);
// Same semaphore can't be reused because of potential deadlock (requires to acquire
// two locks)
SearchResponse searchResponse = searchRequest.get();
- searchResponse = client.prepareSearchScroll(searchResponse.getScrollId()).get();
- for (SearchHit hit : searchResponse.getHits()) {
- add(client.prepareDelete(hit.index(), hit.type(), hit.getId()).request());
+
+ while (true) {
+ searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
+ .setScroll(TimeValue.timeValueMinutes(5))
+ .get();
+ SearchHit[] hits = searchResponse.getHits().getHits();
+ for (SearchHit hit : hits) {
+ add(client.prepareDelete(hit.index(), hit.type(), hit.getId()).request());
+ }
+ if (hits.length == 0) {
+ break;
+ }
}
}
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.index.query.FilterBuilders;
+import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Rule;
import org.junit.Test;
+import java.util.Map;
+
import static org.assertj.core.api.Assertions.assertThat;
public class BulkIndexerTest {
assertThat(replicas()).isEqualTo(1);
BulkIndexer indexer = new BulkIndexer(esTester.client(), FakeIndexDefinition.INDEX)
+ .setFlushByteSize(500)
.setLarge(true);
indexer.start();
assertThat(replicas()).isEqualTo(1);
}
+ @Test
+ public void bulk_delete() throws Exception {
+   // Seed the fake index with one document per INT_FIELD value in [0, docCount).
+   final int docCount = 500;
+   final int firstRemoved = 200;
+   Map[] seeded = new Map[docCount];
+   int value = 0;
+   while (value < docCount) {
+     seeded[value] = ImmutableMap.of(FakeIndexDefinition.INT_FIELD, value);
+     value++;
+   }
+   esTester.putDocuments(FakeIndexDefinition.INDEX, FakeIndexDefinition.TYPE, seeded);
+   assertThat(count()).isEqualTo(docCount);
+
+   // Target every document whose INT_FIELD is >= firstRemoved and hand the search to the bulk delete.
+   SearchRequestBuilder toDelete = esTester.client()
+     .prepareSearch(FakeIndexDefinition.INDEX)
+     .setTypes(FakeIndexDefinition.TYPE)
+     .setQuery(QueryBuilders.filteredQuery(
+       QueryBuilders.matchAllQuery(),
+       FilterBuilders.rangeFilter(FakeIndexDefinition.INT_FIELD).gte(firstRemoved)));
+   BulkIndexer.delete(esTester.client(), FakeIndexDefinition.INDEX, toDelete);
+
+   // Only the documents with values below firstRemoved are expected to remain.
+   assertThat(count()).isEqualTo(firstRemoved);
+ }
+
@Test
public void disable_refresh() throws Exception {
BulkIndexer indexer = new BulkIndexer(esTester.client(), FakeIndexDefinition.INDEX)