diff options
-rw-r--r-- | server/sonar-server/src/main/java/org/sonar/server/es/BulkIndexer.java | 41 |
1 files changed, 37 insertions, 4 deletions
diff --git a/server/sonar-server/src/main/java/org/sonar/server/es/BulkIndexer.java b/server/sonar-server/src/main/java/org/sonar/server/es/BulkIndexer.java index 32b76ccd4e4..350b74fac71 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/es/BulkIndexer.java +++ b/server/sonar-server/src/main/java/org/sonar/server/es/BulkIndexer.java @@ -20,11 +20,14 @@ package org.sonar.server.es; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -32,6 +35,7 @@ import org.picocontainer.Startable; import org.slf4j.LoggerFactory; import org.sonar.server.util.ProgressLogger; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -44,7 +48,7 @@ import java.util.concurrent.atomic.AtomicLong; * </ul> */ public class BulkIndexer implements Startable { - + private static final Logger LOGGER = LoggerFactory.getLogger(BulkIndexer.class); private static final long FLUSH_BYTE_SIZE = new ByteSizeValue(5, ByteSizeUnit.MB).bytes(); private static final String REFRESH_INTERVAL_SETTING = "index.refresh_interval"; @@ -162,9 +166,38 @@ public class BulkIndexer implements Startable { } private void executeBulk(BulkRequestBuilder bulkRequest) { - bulkRequest.get(); + List<ActionRequest> retries = Lists.newArrayList(); + BulkResponse response = bulkRequest.get(); + + for (BulkItemResponse item : response.getItems()) { + if (item.isFailed()) { + ActionRequest retry = bulkRequest.request().requests().get(item.getItemId()); + retries.add(retry); + } + } - // TODO check failures - // WARNING - complexity of response#hasFailures() and #buildFailureMessages() is O(n) + if (!retries.isEmpty()) { + LOGGER.warn(String.format("%d index requests failed. Trying again.", retries.size())); + BulkRequestBuilder retryBulk = client.prepareBulk(); + for (ActionRequest retry : retries) { + retryBulk.request().add(retry); + } + BulkResponse retryBulkResponse = retryBulk.get(); + if (retryBulkResponse.hasFailures()) { + LOGGER.error("New attempt to index documents failed"); + for (int index = 0; index < retryBulkResponse.getItems().length; index++) { + BulkItemResponse item = retryBulkResponse.getItems()[index]; + if (item.isFailed()) { + StringBuilder sb = new StringBuilder(); + String msg = sb.append("\n[").append(index) + .append("]: index [").append(item.getIndex()).append("], type [").append(item.getType()).append("], id [").append(item.getId()) + .append("], message [").append(item.getFailureMessage()).append("]").toString(); + LOGGER.error(msg); + } + } + } else { + LOGGER.info("New index attempt succeeded"); + } + } } } |