From 895b4895152499b74c6e6daa4f2499bd4abb1756 Mon Sep 17 00:00:00 2001 From: Simon Brandhof Date: Thu, 12 Apr 2018 15:56:52 +0200 Subject: [PATCH] Missing Elasticsearch bulk requests from TRACE logs --- .../java/org/sonar/server/es/BulkIndexer.java | 99 ++++++++++++++++++- .../org/sonar/server/es/BulkIndexerTest.java | 22 +++++ 2 files changed, 118 insertions(+), 3 deletions(-) diff --git a/server/sonar-server/src/main/java/org/sonar/server/es/BulkIndexer.java b/server/sonar-server/src/main/java/org/sonar/server/es/BulkIndexer.java index 5bfd70b3f9f..ab2d1f790a9 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/es/BulkIndexer.java +++ b/server/sonar-server/src/main/java/org/sonar/server/es/BulkIndexer.java @@ -20,10 +20,13 @@ package org.sonar.server.es; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.LinkedHashMultiset; +import com.google.common.collect.Multiset; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.elasticsearch.action.DocWriteRequest; @@ -40,6 +43,7 @@ import org.elasticsearch.action.delete.DeleteRequestBuilder; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -49,6 +53,7 @@ import org.elasticsearch.search.SearchHitField; import org.elasticsearch.search.sort.SortOrder; import org.sonar.api.utils.log.Logger; import org.sonar.api.utils.log.Loggers; +import org.sonar.api.utils.log.Profiler; import org.sonar.core.util.ProgressLogger; import static java.lang.String.format; @@ -193,13 +198,16 @@ public class BulkIndexer { } private final class BulkProcessorListener implements Listener { + private final Profiler profiler = Profiler.createIfTrace(EsClient.LOGGER); + @Override public void beforeBulk(long executionId, BulkRequest request) { - // no action required + profiler.start(); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + stopProfiler(request); List successDocIds = new ArrayList<>(); for (BulkItemResponse item : response.getItems()) { if (item.isFailed()) { @@ -213,8 +221,93 @@ public class BulkIndexer { } @Override - public void afterBulk(long executionId, BulkRequest req, Throwable e) { - LOGGER.error("Fail to execute bulk index request: " + req, e); + public void afterBulk(long executionId, BulkRequest request, Throwable e) { + LOGGER.error("Fail to execute bulk index request: " + request, e); + stopProfiler(request); + } + + private void stopProfiler(BulkRequest request) { + if (profiler.isTraceEnabled()) { + profiler.stopTrace(toString(request)); + } + } + + private String toString(BulkRequest bulkRequest) { + StringBuilder message = new StringBuilder(); + message.append("Bulk["); + Multiset groupedRequests = LinkedHashMultiset.create(); + for (int i = 0; i < bulkRequest.requests().size(); i++) { + DocWriteRequest item = bulkRequest.requests().get(i); + String requestType; + if (item instanceof IndexRequest) { + requestType = "index"; + } else if (item instanceof UpdateRequest) { + requestType = "update"; + } else if (item instanceof DeleteRequest) { + requestType = "delete"; + } else { + // Cannot happen, not allowed by BulkRequest's contract + throw new IllegalStateException("Unsupported bulk request type: " + item.getClass()); + } + groupedRequests.add(new BulkRequestKey(requestType, item.index(), item.type())); + } + + Set> entrySet = groupedRequests.entrySet(); + int size = entrySet.size(); + int current = 0; + for (Multiset.Entry requestEntry : entrySet) { + message.append(requestEntry.getCount()).append(" ").append(requestEntry.getElement().toString()); + current++; + if (current < size) { + message.append(", "); + } + } + + message.append("]"); + return message.toString(); + } + } + + private static class BulkRequestKey { + private String requestType; + private String index; + private String docType; + + private BulkRequestKey(String requestType, String index, String docType) { + this.requestType = requestType; + this.index = index; + this.docType = docType; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + BulkRequestKey that = (BulkRequestKey) o; + if (!docType.equals(that.docType)) { + return false; + } + if (!index.equals(that.index)) { + return false; + } + return requestType.equals(that.requestType); + } + + @Override + public int hashCode() { + int result = requestType.hashCode(); + result = 31 * result + index.hashCode(); + result = 31 * result + docType.hashCode(); + return result; + } + + @Override + public String toString() { + return String.format("%s requests on %s/%s", requestType, index, docType); } } diff --git a/server/sonar-server/src/test/java/org/sonar/server/es/BulkIndexerTest.java b/server/sonar-server/src/test/java/org/sonar/server/es/BulkIndexerTest.java index 4ab149e9224..c7ad44e0dd3 100644 --- a/server/sonar-server/src/test/java/org/sonar/server/es/BulkIndexerTest.java +++ b/server/sonar-server/src/test/java/org/sonar/server/es/BulkIndexerTest.java @@ -30,6 +30,8 @@ import org.elasticsearch.index.query.QueryBuilders; import org.junit.Rule; import org.junit.Test; import org.sonar.api.utils.internal.TestSystem2; +import org.sonar.api.utils.log.LogTester; +import org.sonar.api.utils.log.LoggerLevel; import org.sonar.db.DbTester; import org.sonar.server.es.BulkIndexer.Size; @@ -46,6 +48,8 @@ public class BulkIndexerTest { public EsTester es = EsTester.createCustom(new FakeIndexDefinition().setReplicas(1)); @Rule public DbTester dbTester = DbTester.create(testSystem2); + @Rule + public LogTester logTester = new LogTester(); @Test public void index_nothing() { @@ -154,6 +158,24 @@ public class BulkIndexerTest { assertThat(listener.calledResult.getTotal()).isEqualTo(2); } + @Test + public void log_requests_when_TRACE_level_is_enabled() { + logTester.setLevel(LoggerLevel.TRACE); + + BulkIndexer indexer = new BulkIndexer(es.client(), INDEX_TYPE_FAKE, Size.REGULAR, new FakeListener()); + indexer.start(); + indexer.add(newIndexRequestWithDocId("foo")); + indexer.addDeletion(INDEX_TYPE_FAKE, "foo"); + indexer.add(newIndexRequestWithDocId("bar")); + indexer.stop(); + + assertThat(logTester.logs(LoggerLevel.TRACE) + .stream() + .filter(log -> log.contains("Bulk[2 index requests on fakes/fake, 1 delete requests on fakes/fake]")) + .count()).isNotZero(); + + } + private static class FakeListener implements IndexingListener { private final List calledDocIds = new ArrayList<>(); private IndexingResult calledResult; -- 2.39.5