]> source.dussan.org Git - sonarqube.git/commitdiff
Missing Elasticsearch bulk requests from TRACE logs
authorSimon Brandhof <simon.brandhof@sonarsource.com>
Thu, 12 Apr 2018 13:56:52 +0000 (15:56 +0200)
committerSonarTech <sonartech@sonarsource.com>
Tue, 8 May 2018 18:20:44 +0000 (20:20 +0200)
server/sonar-server/src/main/java/org/sonar/server/es/BulkIndexer.java
server/sonar-server/src/test/java/org/sonar/server/es/BulkIndexerTest.java

index 5bfd70b3f9f5c9c79fac67686be3a928215814ee..ab2d1f790a940511562b3563eb83fc7f06334c0e 100644 (file)
 package org.sonar.server.es;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.LinkedHashMultiset;
+import com.google.common.collect.Multiset;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.elasticsearch.action.DocWriteRequest;
@@ -40,6 +43,7 @@ import org.elasticsearch.action.delete.DeleteRequestBuilder;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -49,6 +53,7 @@ import org.elasticsearch.search.SearchHitField;
 import org.elasticsearch.search.sort.SortOrder;
 import org.sonar.api.utils.log.Logger;
 import org.sonar.api.utils.log.Loggers;
+import org.sonar.api.utils.log.Profiler;
 import org.sonar.core.util.ProgressLogger;
 
 import static java.lang.String.format;
@@ -193,13 +198,16 @@ public class BulkIndexer {
   }
 
   private final class BulkProcessorListener implements Listener {
+    private final Profiler profiler = Profiler.createIfTrace(EsClient.LOGGER);
+
     @Override
     public void beforeBulk(long executionId, BulkRequest request) {
-      // no action required
+      profiler.start();
     }
 
     @Override
     public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+      stopProfiler(request);
       List<DocId> successDocIds = new ArrayList<>();
       for (BulkItemResponse item : response.getItems()) {
         if (item.isFailed()) {
@@ -213,8 +221,93 @@ public class BulkIndexer {
     }
 
     @Override
-    public void afterBulk(long executionId, BulkRequest req, Throwable e) {
-      LOGGER.error("Fail to execute bulk index request: " + req, e);
+    public void afterBulk(long executionId, BulkRequest request, Throwable e) {
+      LOGGER.error("Fail to execute bulk index request: " + request, e);
+      stopProfiler(request);
+    }
+
+    private void stopProfiler(BulkRequest request) {
+      if (profiler.isTraceEnabled()) {
+        profiler.stopTrace(toString(request));
+      }
+    }
+
+    private String toString(BulkRequest bulkRequest) {
+      StringBuilder message = new StringBuilder();
+      message.append("Bulk[");
+      Multiset<BulkRequestKey> groupedRequests = LinkedHashMultiset.create();
+      for (int i = 0; i < bulkRequest.requests().size(); i++) {
+        DocWriteRequest item = bulkRequest.requests().get(i);
+        String requestType;
+        if (item instanceof IndexRequest) {
+          requestType = "index";
+        } else if (item instanceof UpdateRequest) {
+          requestType = "update";
+        } else if (item instanceof DeleteRequest) {
+          requestType = "delete";
+        } else {
+          // Cannot happen, not allowed by BulkRequest's contract
+          throw new IllegalStateException("Unsupported bulk request type: " + item.getClass());
+        }
+        groupedRequests.add(new BulkRequestKey(requestType, item.index(), item.type()));
+      }
+
+      Set<Multiset.Entry<BulkRequestKey>> entrySet = groupedRequests.entrySet();
+      int size = entrySet.size();
+      int current = 0;
+      for (Multiset.Entry<BulkRequestKey> requestEntry : entrySet) {
+        message.append(requestEntry.getCount()).append(" ").append(requestEntry.getElement().toString());
+        current++;
+        if (current < size) {
+          message.append(", ");
+        }
+      }
+
+      message.append("]");
+      return message.toString();
+    }
+  }
+
+  private static class BulkRequestKey {
+    private String requestType;
+    private String index;
+    private String docType;
+
+    private BulkRequestKey(String requestType, String index, String docType) {
+      this.requestType = requestType;
+      this.index = index;
+      this.docType = docType;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      BulkRequestKey that = (BulkRequestKey) o;
+      if (!docType.equals(that.docType)) {
+        return false;
+      }
+      if (!index.equals(that.index)) {
+        return false;
+      }
+      return requestType.equals(that.requestType);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = requestType.hashCode();
+      result = 31 * result + index.hashCode();
+      result = 31 * result + docType.hashCode();
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("%s requests on %s/%s", requestType, index, docType);
     }
   }
 
index 4ab149e922424485d00ac1d3ebd5a8f97d4b9672..c7ad44e0dd3eaf6295954c065628cfd353109e91 100644 (file)
@@ -30,6 +30,8 @@ import org.elasticsearch.index.query.QueryBuilders;
 import org.junit.Rule;
 import org.junit.Test;
 import org.sonar.api.utils.internal.TestSystem2;
+import org.sonar.api.utils.log.LogTester;
+import org.sonar.api.utils.log.LoggerLevel;
 import org.sonar.db.DbTester;
 import org.sonar.server.es.BulkIndexer.Size;
 
@@ -46,6 +48,8 @@ public class BulkIndexerTest {
   public EsTester es = EsTester.createCustom(new FakeIndexDefinition().setReplicas(1));
   @Rule
   public DbTester dbTester = DbTester.create(testSystem2);
+  @Rule
+  public LogTester logTester = new LogTester();
 
   @Test
   public void index_nothing() {
@@ -154,6 +158,24 @@ public class BulkIndexerTest {
     assertThat(listener.calledResult.getTotal()).isEqualTo(2);
   }
 
+  @Test
+  public void log_requests_when_TRACE_level_is_enabled() {
+    logTester.setLevel(LoggerLevel.TRACE);
+
+    BulkIndexer indexer = new BulkIndexer(es.client(), INDEX_TYPE_FAKE, Size.REGULAR, new FakeListener());
+    indexer.start();
+    indexer.add(newIndexRequestWithDocId("foo"));
+    indexer.addDeletion(INDEX_TYPE_FAKE, "foo");
+    indexer.add(newIndexRequestWithDocId("bar"));
+    indexer.stop();
+
+    assertThat(logTester.logs(LoggerLevel.TRACE)
+      .stream()
+      .filter(log -> log.contains("Bulk[2 index requests on fakes/fake, 1 delete requests on fakes/fake]"))
+      .count()).isNotZero();
+
+  }
+
   private static class FakeListener implements IndexingListener {
     private final List<DocId> calledDocIds = new ArrayList<>();
     private IndexingResult calledResult;