]> source.dussan.org Git - sonarqube.git/commitdiff
indexQueue can explicitly avoid inline refresh.
authorStephane Gamard <stephane.gamard@searchbox.com>
Thu, 21 Aug 2014 13:49:02 +0000 (15:49 +0200)
committerStephane Gamard <stephane.gamard@searchbox.com>
Thu, 21 Aug 2014 13:49:02 +0000 (15:49 +0200)
fork.sh
server/sonar-search/src/main/resources/org/sonar/search/logback.xml
server/sonar-server/src/main/java/org/sonar/server/db/BaseDao.java
server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleNormalizer.java
server/sonar-server/src/main/java/org/sonar/server/search/IndexQueue.java
server/sonar-server/src/main/java/org/sonar/server/search/action/IndexActionRequest.java
server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertDto.java

diff --git a/fork.sh b/fork.sh
index fbbf69331f408483d1b38b5de1b9dad7d5cb1a4e..1b9048266972a5eddbb1b961587fdd95c1544053 100755 (executable)
--- a/fork.sh
+++ b/fork.sh
@@ -1,6 +1,6 @@
 #!/bin/sh
 
-mvn clean install -DskipTests -pl :sonar-core,sonar-search -amd
+mvn clean install -DskipTests -pl :sonar-server,:sonar-search -amd
 
 if [[ "$OSTYPE" == "darwin"* ]]; then
   OS='macosx-universal-64'
index 648ece82e289f684f1dc7f386f0d8a4c68fe94c6..5f714eec3e7b06fb9fb806ca7c284728b9391b9e 100644 (file)
@@ -28,7 +28,7 @@
   </appender>
 
   <root>
-    <level value="INFO"/>
+    <level value="DEBUG"/>
     <appender-ref ref="LOGFILE"/>
   </root>
 
index 956986259343e3cf381ccd5b051cfe193de421b9..170d446df1b98c3283657522ffd2ec215e0dcfc6 100644 (file)
@@ -280,7 +280,7 @@ public abstract class BaseDao<M, E extends Dto<K>, K extends Serializable> imple
   @Override
   public final void synchronizeAfter(final DbSession session, Date date) {
     for (E dto : this.findAfterDate(session, date)) {
-      session.enqueue(new UpsertDto<E>(getIndexType(), dto));
+      session.enqueue(new UpsertDto<E>(getIndexType(), dto, false));
     }
     session.commit();
   }
index 3a347255d7d93eb976804b90b428008c2fa98df3..a27b0eafa9446ec0d363b0afcc34511a2aacc4c4 100644 (file)
@@ -22,8 +22,6 @@ package org.sonar.server.rule.index;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
-import org.elasticsearch.action.WriteConsistencyLevel;
-import org.elasticsearch.action.support.replication.ReplicationType;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.sonar.api.rule.RuleKey;
 import org.sonar.api.rule.RuleStatus;
@@ -281,8 +279,6 @@ public class RuleNormalizer extends BaseNormalizer<RuleDto, RuleKey> {
 
       /** Creating updateRequest */
       requests.add(new UpdateRequest()
-        .replicationType(ReplicationType.ASYNC)
-        .consistencyLevel(WriteConsistencyLevel.QUORUM)
         .id(rule.getKey().toString())
         .doc(update)
         .upsert(upsert));
index 564c63810e04d4dc073197f51fd7ea67cd59851b..8061f04ab641a410f5a203d89260b148f4f8f016 100644 (file)
@@ -34,8 +34,10 @@ import org.sonar.core.profiling.Profiling;
 import org.sonar.server.search.action.IndexActionRequest;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -69,32 +71,56 @@ public class IndexQueue extends LinkedBlockingQueue<Runnable>
     }
     try {
 
+      long normTime = System.currentTimeMillis();
       BulkRequestBuilder bulkRequestBuilder = new BulkRequestBuilder(searchClient);
       Map<String, Index> indexes = getIndexMap();
+      Set<String> indices = new HashSet<String>();
       for (IndexActionRequest action : actions) {
-        action.setIndex(indexes.get(action.getIndexType()));
+        Index index = indexes.get(action.getIndexType());
+        action.setIndex(index);
+        indices.add(index.getIndexName());
       }
 
-      ExecutorService executorService = Executors.newFixedThreadPool(4);
+      ExecutorService executorService = Executors.newFixedThreadPool(10);
+
+      // Do we need to refresh
+      boolean requiresRefresh = false;
+      for (IndexActionRequest action : actions) {
+        if (action.needsRefresh()) {
+          requiresRefresh = true;
+          break;
+        }
+      }
 
       //invokeAll() blocks until ALL tasks submitted to executor complete
       for (Future<List<ActionRequest>> updateRequests : executorService.invokeAll(actions)) {
         for (ActionRequest update : updateRequests.get()) {
           if (UpdateRequest.class.isAssignableFrom(update.getClass())) {
-            bulkRequestBuilder.add((UpdateRequest) update);
+            bulkRequestBuilder.add(((UpdateRequest) update).refresh(false));
           } else if (DeleteRequest.class.isAssignableFrom(update.getClass())) {
-            bulkRequestBuilder.add((DeleteRequest) update);
+            bulkRequestBuilder.add(((DeleteRequest) update).refresh(false));
           } else {
             throw new IllegalStateException("Un-managed request type: " + update.getClass());
           }
         }
       }
       executorService.shutdown();
-
-      LOGGER.info("Executing batch request of size: " + bulkRequestBuilder.numberOfActions());
+      normTime = System.currentTimeMillis() - normTime;
 
       //execute the request
-      BulkResponse response = searchClient.execute(bulkRequestBuilder.setRefresh(true));
+      long indexTime = System.currentTimeMillis();
+      BulkResponse response = searchClient.execute(bulkRequestBuilder.setRefresh(false));
+      indexTime = System.currentTimeMillis() - indexTime;
+
+      long refreshTime = System.currentTimeMillis();
+      if (requiresRefresh) {
+        searchClient.admin().indices().prepareRefresh(indices.toArray(new String[indices.size()])).setForce(false).get();
+      }
+      refreshTime = System.currentTimeMillis() - refreshTime;
+
+      LOGGER.info("-- submitted {} items with {}ms in normalization, {}ms indexing and {}ms refresh({}). Total: {}ms",
+        bulkRequestBuilder.numberOfActions(), normTime, indexTime, refreshTime, indices, (normTime + indexTime + refreshTime));
+
     } catch (Exception e) {
       e.printStackTrace();
     }
index 889cf7a130c09e0e50e1193c3a8ae1f55dc8e766..2d36fa0d5dc68459659c7b2c0570665bd0de134e 100644 (file)
@@ -31,11 +31,17 @@ import java.util.List;
 public abstract class IndexActionRequest implements ClusterAction<List<ActionRequest>> {
 
   protected final String indexType;
+  private final boolean requiresRefresh;
   private Index index;
 
-  public IndexActionRequest(String indexType) {
+  protected IndexActionRequest(String indexType) {
+    this(indexType, true);
+  }
+
+  protected IndexActionRequest(String indexType, boolean requiresRefresh) {
     super();
     this.indexType = indexType;
+    this.requiresRefresh = requiresRefresh;
   }
 
   public abstract String getKey();
@@ -61,7 +67,8 @@ public abstract class IndexActionRequest implements ClusterAction<List<ActionReq
       if (request.getClass().isAssignableFrom(UpdateRequest.class)) {
         ((UpdateRequest) request)
           .type(index.getIndexType())
-          .index(index.getIndexName());
+          .index(index.getIndexName())
+          .refresh(false);
       }
       finalRequests.add(request);
     }
@@ -69,4 +76,8 @@ public abstract class IndexActionRequest implements ClusterAction<List<ActionReq
   }
 
   public abstract List<ActionRequest> doCall(Index index) throws Exception;
+
+  public boolean needsRefresh() {
+    return this.requiresRefresh;
+  }
 }
index 1e31a35d788784bf7686bae19a2332fc9c82d31d..5ad7af6c3f53598c9a49aa4a1297c9a184f8664a 100644 (file)
@@ -30,7 +30,11 @@ public class UpsertDto<DTO extends Dto> extends IndexActionRequest {
   private final DTO dto;
 
   public UpsertDto(String indexType, DTO dto) {
-    super(indexType);
+    this(indexType, dto, true);
+  }
+
+  public UpsertDto(String indexType, DTO dto, boolean requiresRefresh) {
+    super(indexType, requiresRefresh);
     this.dto = dto;
   }