aboutsummaryrefslogtreecommitdiffstats
path: root/server
diff options
context:
space:
mode:
authorSimon Brandhof <simon.brandhof@sonarsource.com>2014-12-04 17:38:47 +0100
committerSimon Brandhof <simon.brandhof@sonarsource.com>2014-12-04 23:35:08 +0100
commitfb610e795c04d2afa465b96106488af97f0143f7 (patch)
treec21f1d14dbf5e0b749b4ab55c62e49ed0c53adcd /server
parente3c2aab3d26f7a68fb623524b536abcc3f892b98 (diff)
downloadsonarqube-fb610e795c04d2afa465b96106488af97f0143f7.tar.gz
sonarqube-fb610e795c04d2afa465b96106488af97f0143f7.zip
Retry bulk index request on failures
Diffstat (limited to 'server')
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/es/BulkIndexer.java41
1 files changed, 37 insertions, 4 deletions
diff --git a/server/sonar-server/src/main/java/org/sonar/server/es/BulkIndexer.java b/server/sonar-server/src/main/java/org/sonar/server/es/BulkIndexer.java
index 32b76ccd4e4..350b74fac71 100644
--- a/server/sonar-server/src/main/java/org/sonar/server/es/BulkIndexer.java
+++ b/server/sonar-server/src/main/java/org/sonar/server/es/BulkIndexer.java
@@ -20,11 +20,14 @@
package org.sonar.server.es;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder;
+import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -32,6 +35,7 @@ import org.picocontainer.Startable;
import org.slf4j.LoggerFactory;
import org.sonar.server.util.ProgressLogger;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
@@ -44,7 +48,7 @@ import java.util.concurrent.atomic.AtomicLong;
* </ul>
*/
public class BulkIndexer implements Startable {
-
+ private static final Logger LOGGER = LoggerFactory.getLogger(BulkIndexer.class);
private static final long FLUSH_BYTE_SIZE = new ByteSizeValue(5, ByteSizeUnit.MB).bytes();
private static final String REFRESH_INTERVAL_SETTING = "index.refresh_interval";
@@ -162,9 +166,38 @@ public class BulkIndexer implements Startable {
}
private void executeBulk(BulkRequestBuilder bulkRequest) {
- bulkRequest.get();
+ List<ActionRequest> retries = Lists.newArrayList();
+ BulkResponse response = bulkRequest.get();
+
+ for (BulkItemResponse item : response.getItems()) {
+ if (item.isFailed()) {
+ ActionRequest retry = bulkRequest.request().requests().get(item.getItemId());
+ retries.add(retry);
+ }
+ }
- // TODO check failures
- // WARNING - complexity of response#hasFailures() and #buildFailureMessages() is O(n)
+ if (!retries.isEmpty()) {
+ LOGGER.warn(String.format("%d index requests failed. Trying again.", retries.size()));
+ BulkRequestBuilder retryBulk = client.prepareBulk();
+ for (ActionRequest retry : retries) {
+ retryBulk.request().add(retry);
+ }
+ BulkResponse retryBulkResponse = retryBulk.get();
+ if (retryBulkResponse.hasFailures()) {
+ LOGGER.error("New attempt to index documents failed");
+ for (int index = 0; index < retryBulkResponse.getItems().length; index++) {
+ BulkItemResponse item = retryBulkResponse.getItems()[index];
+ if (item.isFailed()) {
+ StringBuilder sb = new StringBuilder();
+ String msg = sb.append("\n[").append(index)
+ .append("]: index [").append(item.getIndex()).append("], type [").append(item.getType()).append("], id [").append(item.getId())
+ .append("], message [").append(item.getFailureMessage()).append("]").toString();
+ LOGGER.error(msg);
+ }
+ }
+ } else {
+ LOGGER.info("New index attempt succeeded");
+ }
+ }
}
}