*/
package org.sonar.server.es;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder;
+import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkProcessor.Listener;
+import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
* <ul>
- * <li>bulk request is sent on the wire when its size is higher than 5Mb</li>
+ * <li>bulk request is sent on the wire when its size is higher than 1Mb</li>
* <li>on large table indexing, replicas and automatic refresh can be temporarily disabled</li>
- * <li>index refresh is optional (enabled by default)</li>
* </ul>
*/
public class BulkIndexer implements Startable {
private static final Logger LOGGER = Loggers.get(BulkIndexer.class);
- private static final long FLUSH_BYTE_SIZE = new ByteSizeValue(1, ByteSizeUnit.MB).bytes();
+ private static final ByteSizeValue FLUSH_BYTE_SIZE = new ByteSizeValue(1, ByteSizeUnit.MB); // handed to BulkProcessor.Builder#setBulkSize in the constructor
+ private static final int FLUSH_ACTIONS = -1; // handed to BulkProcessor.Builder#setBulkActions; NOTE(review): -1 presumably disables flushing on action count - confirm against the BulkProcessor documentation
private static final String REFRESH_INTERVAL_SETTING = "index.refresh_interval";
- private static final String ALREADY_STARTED_MESSAGE = "Bulk indexing is already started";
+ private static final int DEFAULT_NUMBER_OF_SHARDS = 5; // divisor applied to the core count by LargeSizeHandler#getConcurrentRequests
private final EsClient client;
private final String indexName;
- private Size size = Size.REGULAR;
- private long flushByteSize = FLUSH_BYTE_SIZE;
- private BulkRequestBuilder bulkRequest = null;
- private Map<String, Object> largeInitialSettings = null;
+ private final BulkProcessor bulkProcessor; // receives every request passed to add() and is closed by stop()
private final AtomicLong counter = new AtomicLong(0L);
- private final int concurrentRequests;
- private final Semaphore semaphore;
- private final ProgressLogger progress;
+ private final SizeHandler sizeHandler; // size-specific hooks invoked by start() and stop()
- public BulkIndexer(EsClient client, String indexName) {
+ /**
+ * Creates an indexer backed by a {@link BulkProcessor} built on the native client of {@code client}.
+ * The processor uses an exponential backoff policy, {@code FLUSH_BYTE_SIZE} as bulk size,
+ * {@code FLUSH_ACTIONS} as bulk actions and the number of concurrent requests reported by
+ * the handler created for {@code size}.
+ *
+ * @param client Elasticsearch client wrapper
+ * @param indexName name of the index that is refreshed when {@link #stop()} is called
+ * @param size selects the {@link SizeHandler} whose hooks run in {@link #start()} and {@link #stop()}
+ */
+ public BulkIndexer(EsClient client, String indexName, Size size) {
this.client = client;
this.indexName = indexName;
- this.progress = new ProgressLogger(format("Progress[BulkIndexer[%s]]", indexName), counter, LOGGER)
- .setPluralLabel("requests");
-
- // see https://jira.sonarsource.com/browse/SONAR-8075
- this.concurrentRequests = Math.max(1, Runtime.getRuntime().availableProcessors() / 5);
- this.semaphore = new Semaphore(concurrentRequests);
- }
-
- public enum Size {
- /** Use this size for a limited number of documents. */
- REGULAR,
-
- /** Use this size for initial indexing and if you expect unusual huge numbers of documents. */
- LARGE;
- }
-
- /**
- * Large indexing is an heavy operation that populates an index generally from scratch. Replicas and
- * automatic refresh are disabled during bulk indexing and lucene segments are optimized at the end.
- */
- public BulkIndexer setSize(Size size) {
- Preconditions.checkState(bulkRequest == null, ALREADY_STARTED_MESSAGE);
- this.size = size;
- return this;
- }
-
- public BulkIndexer setFlushByteSize(long flushByteSize) {
- this.flushByteSize = flushByteSize;
- return this;
+ this.sizeHandler = size.createHandler(Runtime2.INSTANCE);
+ this.bulkProcessor = BulkProcessor.builder(client.nativeClient(), new BulkProcessorListener())
+ .setBackoffPolicy(BackoffPolicy.exponentialBackoff())
+ .setBulkSize(FLUSH_BYTE_SIZE)
+ .setBulkActions(FLUSH_ACTIONS)
+ .setConcurrentRequests(sizeHandler.getConcurrentRequests())
+ .build();
}
+ /**
+ * Runs the size handler's {@code beforeStart} hook, then resets the request counter to zero.
+ */
@Override
public void start() {
- Preconditions.checkState(bulkRequest == null, ALREADY_STARTED_MESSAGE);
- if (size == Size.LARGE) {
- largeInitialSettings = Maps.newHashMap();
- Map<String, Object> bulkSettings = Maps.newHashMap();
- GetSettingsResponse settingsResp = client.nativeClient().admin().indices().prepareGetSettings(indexName).get();
-
- // deactivate replicas
- int initialReplicas = Integer.parseInt(settingsResp.getSetting(indexName, IndexMetaData.SETTING_NUMBER_OF_REPLICAS));
- if (initialReplicas > 0) {
- largeInitialSettings.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, initialReplicas);
- bulkSettings.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0);
- }
-
- // deactivate periodical refresh
- String refreshInterval = settingsResp.getSetting(indexName, REFRESH_INTERVAL_SETTING);
- largeInitialSettings.put(REFRESH_INTERVAL_SETTING, refreshInterval);
- bulkSettings.put(REFRESH_INTERVAL_SETTING, "-1");
+ sizeHandler.beforeStart(this);
+ counter.set(0L);
+ }
- updateSettings(bulkSettings);
+ /**
+ * Closes the bulk processor, waiting up to 10 minutes for pending bulk requests, then refreshes
+ * the index and runs the size handler's {@code afterStop} hook.
+ * <p>
+ * A timeout is logged as an error and the method still proceeds, so that the index is refreshed
+ * and the handler can revert whatever it changed in {@link #start()}.
+ *
+ * @throws IllegalStateException if the calling thread is interrupted while waiting; the interrupt
+ * flag is restored before throwing
+ */
+ @Override
+ public void stop() {
+ try {
+ // awaitClose reports a timeout through its return value, not through an exception
+ if (!bulkProcessor.awaitClose(10, TimeUnit.MINUTES)) {
+ LOGGER.error("Elasticsearch bulk requests still being executed after 10 minutes");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException("Interrupted while waiting for Elasticsearch bulk requests to complete", e);
}
- bulkRequest = client.prepareBulk().setRefresh(false);
- counter.set(0L);
- progress.start();
+ client.prepareRefresh(indexName).get();
+ sizeHandler.afterStop(this);
}
+ /**
+ * Hands the request over to the bulk processor. This method no longer flushes explicitly;
+ * NOTE(review): the processor presumably sends the bulk once the thresholds configured in the
+ * constructor are reached - confirm against the BulkProcessor documentation.
+ *
+ * @param request index or delete request to add to the current bulk
+ */
public void add(ActionRequest<?> request) {
- bulkRequest.request().add(request);
- if (bulkRequest.request().estimatedSizeInBytes() >= flushByteSize) {
- executeBulk();
- }
+ bulkProcessor.add(request);
}
public void addDeletion(SearchRequestBuilder searchRequest) {
* Note that the parameter indexName could be removed if progress logs are not needed.
*/
public static void delete(EsClient client, String indexName, SearchRequestBuilder searchRequest) {
- BulkIndexer bulk = new BulkIndexer(client, indexName);
+ BulkIndexer bulk = new BulkIndexer(client, indexName, Size.REGULAR);
bulk.start();
bulk.addDeletion(searchRequest);
bulk.stop();
}
- @Override
- public void stop() {
- if (bulkRequest.numberOfActions() > 0) {
- executeBulk();
+ /**
+ * Listener registered on the bulk processor: it feeds the enclosing indexer's request counter
+ * and logs failures. Declared as an inner (non-static) class because it reads {@code counter}.
+ */
+ private final class BulkProcessorListener implements Listener {
+
+ @Override
+ public void beforeBulk(long executionId, BulkRequest request) {
+ // no action required
}
- try {
- if (semaphore.tryAcquire(concurrentRequests, 10, TimeUnit.MINUTES)) {
- semaphore.release(concurrentRequests);
+
+ /**
+ * Adds the number of items of the response to the counter, then logs every failed item
+ * (index, type, id and failure message) as an error.
+ */
+ @Override
+ public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+ counter.addAndGet(response.getItems().length);
+
+ for (BulkItemResponse item : response.getItems()) {
+ if (item.isFailed()) {
+ LOGGER.error("index [{}], type [{}], id [{}], message [{}]", item.getIndex(), item.getType(), item.getId(), item.getFailureMessage());
+ }
}
- } catch (InterruptedException e) {
- throw new IllegalStateException("Elasticsearch bulk requests still being executed after 10 minutes", e);
}
- progress.stop();
- client.prepareRefresh(indexName).get();
- if (size == Size.LARGE) {
- // optimize lucene segments and revert index settings
- // Optimization must be done before re-applying replicas:
- // http://www.elasticsearch.org/blog/performance-considerations-elasticsearch-indexing/
- client.prepareForceMerge(indexName).get();
- updateSettings(largeInitialSettings);
+ /**
+ * Logs the failure of the bulk request; the error is not rethrown and the counter is left untouched.
+ */
+ @Override
+ public void afterBulk(long executionId, BulkRequest req, Throwable e) {
+ LOGGER.error("Fail to execute bulk index request: " + req, e);
}
- bulkRequest = null;
}
- private void updateSettings(Map<String, Object> settings) {
- UpdateSettingsRequestBuilder req = client.nativeClient().admin().indices().prepareUpdateSettings(indexName);
- req.setSettings(settings);
- req.get();
+ /**
+ * Indexing profile passed to the {@link BulkIndexer} constructor. Each constant creates the
+ * {@link SizeHandler} that provides the number of concurrent requests and the hooks run
+ * around start and stop.
+ */
+ public enum Size {
+ /** Use this size for a limited number of documents. */
+ REGULAR {
+ @Override
+ SizeHandler createHandler(Runtime2 runtime2) {
+ // the base handler keeps the defaults: no concurrency and no-op hooks
+ return new SizeHandler();
+ }
+ },
+
+ /**
+ * Large indexing is an heavy operation that populates an index generally from scratch. Replicas and
+ * automatic refresh are disabled during bulk indexing and lucene segments are optimized at the end.
+ * Use this size for initial indexing and if you expect unusual huge numbers of documents.
+ */
+ LARGE {
+ @Override
+ SizeHandler createHandler(Runtime2 runtime2) {
+ return new LargeSizeHandler(runtime2);
+ }
+ };
+
+ /**
+ * Creates the handler for this size.
+ *
+ * @param runtime2 source of the number of cores; only the LARGE handler uses it
+ */
+ abstract SizeHandler createHandler(Runtime2 runtime2);
}
- private void executeBulk() {
- final BulkRequestBuilder req = this.bulkRequest;
- this.bulkRequest = client.prepareBulk().setRefresh(false);
- semaphore.acquireUninterruptibly();
- req.execute(new BulkResponseActionListener(req));
+ /**
+ * Thin wrapper around {@link Runtime} exposing the number of available processors.
+ * NOTE(review): presumably exists so that tests can supply a fixed core count - confirm against the tests.
+ */
+ @VisibleForTesting
+ static class Runtime2 {
+ private static final Runtime2 INSTANCE = new Runtime2();
+
+ /** Returns {@code Runtime.getRuntime().availableProcessors()}. */
+ int getCores() {
+ return Runtime.getRuntime().availableProcessors();
+ }
+ }
+
+ /**
+ * Default hooks used for {@code Size.REGULAR}: no concurrent requests, and nothing to do
+ * before start or after stop. {@link LargeSizeHandler} overrides all three methods.
+ */
+ static class SizeHandler {
+ /**
+ * @see BulkProcessor.Builder#setConcurrentRequests(int)
+ */
+ int getConcurrentRequests() {
+ // in the same thread by default
+ return 0;
+ }
+
+ /** Called by {@code BulkIndexer#start()} before the counter is reset; no-op by default. */
+ void beforeStart(BulkIndexer bulkIndexer) {
+ }
+
+ /** Called by {@code BulkIndexer#stop()} after the index refresh; no-op by default. */
+ void afterStop(BulkIndexer bulkIndexer) {
+ }
}
- private class BulkResponseActionListener implements ActionListener<BulkResponse> {
- private final BulkRequestBuilder req;
+ /**
+ * Handler for {@code Size.LARGE}: before indexing it sets the number of replicas to 0 and disables
+ * the periodical refresh, remembering the initial values; after indexing it force-merges the index
+ * and re-applies the initial settings. It also drives a progress logger over the indexer's counter.
+ */
+ static class LargeSizeHandler extends SizeHandler {
+
+ private final Map<String, Object> initialSettings = new HashMap<>();
+ private final Runtime2 runtime2;
+ private ProgressLogger progress;
- BulkResponseActionListener(BulkRequestBuilder req) {
- this.req = req;
+ LargeSizeHandler(Runtime2 runtime2) {
+ this.runtime2 = runtime2;
}
@Override
- public void onResponse(BulkResponse response) {
- semaphore.release();
- counter.addAndGet(response.getItems().length);
+ int getConcurrentRequests() {
+ // see SONAR-8075
+ int cores = runtime2.getCores();
+ // FIXME do not use DEFAULT_NUMBER_OF_SHARDS
+ return Math.max(1, cores / DEFAULT_NUMBER_OF_SHARDS) - 1;
+ }
- for (BulkItemResponse item : response.getItems()) {
- if (item.isFailed()) {
- LOGGER.error("index [{}], type [{}], id [{}], message [{}]", item.getIndex(), item.getType(), item.getId(), item.getFailureMessage());
- }
+ /**
+ * Starts the progress logger, then stores the current replica count (when greater than 0) and
+ * refresh interval of the index and replaces them with 0 replicas and a refresh interval of -1.
+ */
+ @Override
+ void beforeStart(BulkIndexer bulkIndexer) {
+ this.progress = new ProgressLogger(format("Progress[BulkIndexer[%s]]", bulkIndexer.indexName), bulkIndexer.counter, LOGGER)
+ .setPluralLabel("requests");
+ this.progress.start();
+ Map<String, Object> temporarySettings = new HashMap<>();
+ GetSettingsResponse settingsResp = bulkIndexer.client.nativeClient().admin().indices().prepareGetSettings(bulkIndexer.indexName).get();
+
+ // deactivate replicas
+ int initialReplicas = Integer.parseInt(settingsResp.getSetting(bulkIndexer.indexName, IndexMetaData.SETTING_NUMBER_OF_REPLICAS));
+ if (initialReplicas > 0) {
+ initialSettings.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, initialReplicas);
+ temporarySettings.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0);
}
+
+ // deactivate periodical refresh
+ String refreshInterval = settingsResp.getSetting(bulkIndexer.indexName, REFRESH_INTERVAL_SETTING);
+ initialSettings.put(REFRESH_INTERVAL_SETTING, refreshInterval);
+ temporarySettings.put(REFRESH_INTERVAL_SETTING, "-1");
+
+ updateSettings(bulkIndexer, temporarySettings);
}
+ /**
+ * Force-merges the index, re-applies the settings saved by {@code beforeStart} and stops the
+ * progress logger. The settings are restored and the logger is stopped even when an earlier
+ * step throws, so that a failure does not leave the index without replicas or refresh.
+ */
@Override
- public void onFailure(Throwable e) {
- semaphore.release();
- LOGGER.error("Fail to execute bulk index request: " + req, e);
+ void afterStop(BulkIndexer bulkIndexer) {
+ try {
+ // optimize lucene segments and revert index settings
+ // Optimization must be done before re-applying replicas:
+ // http://www.elasticsearch.org/blog/performance-considerations-elasticsearch-indexing/
+ bulkIndexer.client.prepareForceMerge(bulkIndexer.indexName).get();
+ } finally {
+ restoreInitialState(bulkIndexer);
+ }
+ }
+
+ /** Re-applies the saved index settings, then stops the progress logger if it was started. */
+ private void restoreInitialState(BulkIndexer bulkIndexer) {
+ try {
+ updateSettings(bulkIndexer, initialSettings);
+ } finally {
+ // progress is only assigned by beforeStart
+ if (progress != null) {
+ progress.stop();
+ }
+ }
+ }
+
+ /** Applies the given settings to the indexer's index and waits for the response. */
+ private static void updateSettings(BulkIndexer bulkIndexer, Map<String, Object> settings) {
+ UpdateSettingsRequestBuilder req = bulkIndexer.client.nativeClient().admin().indices().prepareUpdateSettings(bulkIndexer.indexName);
+ req.setSettings(settings);
+ req.get();
}
}
+
}