]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-5531 - Optimized refresh while mass indexing
authorStephane Gamard <stephane.gamard@sonarsource.com>
Tue, 23 Sep 2014 08:43:05 +0000 (10:43 +0200)
committerStephane Gamard <stephane.gamard@sonarsource.com>
Tue, 23 Sep 2014 08:43:05 +0000 (10:43 +0200)
18 files changed:
server/sonar-search/src/main/java/org/sonar/search/SearchServer.java
server/sonar-server/src/main/java/org/sonar/server/db/BaseDao.java
server/sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java
server/sonar-server/src/main/java/org/sonar/server/search/Index.java
server/sonar-server/src/main/java/org/sonar/server/search/IndexQueue.java
server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteDto.java
server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteKey.java
server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteNestedItem.java
server/sonar-server/src/main/java/org/sonar/server/search/action/IndexAction.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/search/action/IndexActionRequest.java [deleted file]
server/sonar-server/src/main/java/org/sonar/server/search/action/RefreshIndex.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertDto.java
server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertNestedItem.java
server/sonar-server/src/test/java/org/sonar/server/activity/ActivityBackendMediumTest.java
server/sonar-server/src/test/java/org/sonar/server/db/BaseDaoTest.java
server/sonar-server/src/test/java/org/sonar/server/issue/db/IssueAuthorizationDaoTest.java
server/sonar-server/src/test/java/org/sonar/server/tester/BackendCleanup.java
sonar-core/src/main/java/org/sonar/core/persistence/DbSession.java

index 1433bd71b6e709f86f50656c6eaa466bc2f9c407..1c2ffdc57e735fbca2d0f488b736262adc871c17 100644 (file)
@@ -88,9 +88,13 @@ public class SearchServer implements Monitored {
       .put("discovery.zen.ping.multicast.enabled", "false")
 
       // Index storage policies
+      .put("index.refresh_interval", "30")
       .put("index.number_of_shards", "1")
       .put("index.number_of_replicas", MINIMUM_INDEX_REPLICATION)
       .put("index.store.type", "mmapfs")
+      .put("indices.store.throttle.type", "none")
+      .put("index.merge.scheduler.max_thread_count",
+        Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors() / 2)))
 
       // Install our own listUpdate scripts
       .put("script.default_lang", "native")
index a87021146bb99efc9a7e4b499cbdb491a6d90b7f..6dcffcc43b9f87f1251c500dd5b378321e601876 100644 (file)
@@ -34,15 +34,20 @@ import org.sonar.server.search.DbSynchronizationHandler;
 import org.sonar.server.search.IndexDefinition;
 import org.sonar.server.search.action.DeleteKey;
 import org.sonar.server.search.action.DeleteNestedItem;
+import org.sonar.server.search.action.RefreshIndex;
 import org.sonar.server.search.action.UpsertDto;
 import org.sonar.server.search.action.UpsertNestedItem;
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nullable;
-
 import java.io.Serializable;
 import java.sql.Timestamp;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
 
 import static com.google.common.collect.Maps.newHashMap;
 
@@ -323,7 +328,7 @@ public abstract class BaseDao<MAPPER, DTO extends Dto<KEY>, KEY extends Serializ
       @Override
       public void handleResult(ResultContext resultContext) {
         DTO dto = (DTO) resultContext.getResultObject();
-        session.enqueue(new UpsertDto<DTO>(getIndexType(), dto, true));
+        session.enqueue(new UpsertDto<DTO>(getIndexType(), dto, false));
         count++;
         if (count % 100000 == 0) {
           LOGGER.info(" - synchronized {} {}", count, getIndexType());
@@ -358,6 +363,7 @@ public abstract class BaseDao<MAPPER, DTO extends Dto<KEY>, KEY extends Serializ
       DbSynchronizationHandler handler = getSynchronizationResultHandler(session);
       session.select(getSynchronizeStatementFQN(), getSynchronizationParams(date, params), handler);
       handler.enqueueCollected();
+      session.enqueue(new RefreshIndex(this.getIndexType()));
     } catch (Exception e) {
       throw new IllegalStateException(e);
     }
index 6faafd538df6eda512dfbde9fffddb3b130bd51f..65c73da1ffa240e2289d0418d61a3dc3b99c0beb 100644 (file)
@@ -359,16 +359,6 @@ public abstract class BaseIndex<DOMAIN, DTO extends Dto<KEY>, KEY extends Serial
     return mapping;
   }
 
-  @Override
-  public void refresh() {
-    client.execute(client
-      .admin()
-      .indices()
-      .prepareRefresh(this.getIndexName())
-      .setForce(false)
-      .setIndices(this.getIndexName()));
-  }
-
   /* Base CRUD methods */
 
   protected abstract DOMAIN toDoc(Map<String, Object> fields);
index 13a7a935c4e78ceebe04b1d6218abdfed42b4bdb..13acb6a633292bb2885e77337534bc2ebe7710bf 100644 (file)
@@ -38,8 +38,6 @@ public interface Index<DOMAIN, DTO extends Dto<KEY>, KEY extends Serializable> e
 
   String getIndexName();
 
-  void refresh();
-
   Date getLastSynchronization();
 
   IndexStat getIndexStat();
index de7636d8596774ab18ec12311927f838e303fa61..79a59ef8502d79d40f5c1ce49140edc8bac1e671 100644 (file)
@@ -20,6 +20,7 @@
 package org.sonar.server.search;
 
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequestBuilder;
 import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -33,7 +34,7 @@ import org.sonar.api.config.Settings;
 import org.sonar.api.platform.ComponentContainer;
 import org.sonar.core.cluster.WorkQueue;
 import org.sonar.core.profiling.Profiling;
-import org.sonar.server.search.action.IndexActionRequest;
+import org.sonar.server.search.action.IndexAction;
 
 import java.util.HashMap;
 import java.util.HashSet;
@@ -43,8 +44,9 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
-public class IndexQueue implements ServerComponent, WorkQueue<IndexActionRequest> {
+public class IndexQueue implements ServerComponent, WorkQueue<IndexAction<?>> {
 
   protected final Profiling profiling;
 
@@ -62,47 +64,54 @@ public class IndexQueue implements ServerComponent, WorkQueue<IndexActionRequest
   }
 
   @Override
-  public void enqueue(List<IndexActionRequest> actions) {
+  public void enqueue(List<IndexAction<?>> actions) {
 
     if (actions.isEmpty()) {
       return;
     }
     try {
 
+      boolean refreshRequired = false;
+
       Map<String, Index> indexes = getIndexMap();
       Set<String> indices = new HashSet<String>();
-      for (IndexActionRequest action : actions) {
+      for (IndexAction action : actions) {
         Index index = indexes.get(action.getIndexType());
         action.setIndex(index);
         if (action.needsRefresh()) {
+          refreshRequired = true;
           indices.add(index.getIndexName());
         }
       }
 
       BulkRequestBuilder bulkRequestBuilder = new BulkRequestBuilder(searchClient);
 
-      long normTime = executeNormalization(bulkRequestBuilder, actions);
+      long normTime = processActionsIntoQueries(bulkRequestBuilder, actions);
 
-      //execute the request
-      long indexTime = System.currentTimeMillis();
-      BulkResponse response = searchClient.execute(bulkRequestBuilder.setRefresh(false));
-      indexTime = System.currentTimeMillis() - indexTime;
+      if (bulkRequestBuilder.numberOfActions() > 0) {
+        // execute the request
+        long indexTime = System.currentTimeMillis();
+        BulkResponse response = searchClient.execute(bulkRequestBuilder.setRefresh(false));
 
-      long refreshTime = this.refreshRequiredIndex(indices);
+        indexTime = System.currentTimeMillis() - indexTime;
 
-      LOGGER.debug("-- submitted {} items with {}ms in normalization, {}ms indexing and {}ms refresh({}). Total: {}ms",
-        bulkRequestBuilder.numberOfActions(), normTime, indexTime, refreshTime, indices, (normTime + indexTime + refreshTime));
+        long refreshTime = 0;
+        if (refreshRequired) {
+          refreshTime = this.refreshRequiredIndex(indices);
+        }
 
-      if (response.hasFailures()) {
-        throw new IllegalStateException("Errors while indexing stack: " + response.buildFailureMessage());
-      }
+        LOGGER.debug("-- submitted {} items with {}ms in normalization, {}ms indexing and {}ms refresh({}). Total: {}ms",
+          bulkRequestBuilder.numberOfActions(), normTime, indexTime, refreshTime, indices, (normTime + indexTime + refreshTime));
 
+        if (response.hasFailures()) {
+          throw new IllegalStateException("Errors while indexing stack: " + response.buildFailureMessage());
+        }
+      }
     } catch (Exception e) {
       LOGGER.error("Could not commit to ElasticSearch", e);
     }
   }
 
-
   private long refreshRequiredIndex(Set<String> indices) {
 
     long refreshTime = System.currentTimeMillis();
@@ -120,23 +129,34 @@ public class IndexQueue implements ServerComponent, WorkQueue<IndexActionRequest
     return System.currentTimeMillis() - refreshTime;
   }
 
-  private long executeNormalization(BulkRequestBuilder bulkRequestBuilder, List<IndexActionRequest> actions) {
+  private long processActionsIntoQueries(BulkRequestBuilder bulkRequestBuilder, List<IndexAction<?>> actions) {
     long normTime = System.currentTimeMillis();
     try {
+      boolean hasInlineRefreshRequest = false;
       ExecutorService executorService = Executors.newFixedThreadPool(CONCURRENT_NORMALIZATION_FACTOR);
-      //invokeAll() blocks until ALL tasks submitted to executor complete
-      for (Future<List<ActionRequest>> updateRequests : executorService.invokeAll(actions)) {
-        for (ActionRequest update : updateRequests.get()) {
+      // invokeAll() blocks until ALL tasks submitted to executor complete
+      List<Future<List<? extends ActionRequest>>> requests = executorService.invokeAll(actions, 20, TimeUnit.SECONDS);
+      for (Future<List<? extends ActionRequest>> updates : requests) {
+        for (ActionRequest update : updates.get()) {
           if (UpdateRequest.class.isAssignableFrom(update.getClass())) {
-            bulkRequestBuilder.add(((UpdateRequest) update).refresh(false));
-          } else if (DeleteRequest.class.isAssignableFrom(update.getClass())) {
-            bulkRequestBuilder.add(((DeleteRequest) update).refresh(false));
-          } else {
+            bulkRequestBuilder.add(((UpdateRequest) update));
+          }
+
+          else if (DeleteRequest.class.isAssignableFrom(update.getClass())) {
+            bulkRequestBuilder.add(((DeleteRequest) update));
+          }
+
+          else if (RefreshRequest.class.isAssignableFrom(update.getClass())) {
+            hasInlineRefreshRequest = true;
+          }
+
+          else {
             throw new IllegalStateException("Un-managed request type: " + update.getClass());
           }
         }
       }
       executorService.shutdown();
+      bulkRequestBuilder.setRefresh(hasInlineRefreshRequest);
     } catch (Exception e) {
       throw new IllegalStateException("Could not execute normalization for stack", e);
     }
index 73941663882aba68c46431bd7a32464a359df062..fe66d94b521c7fe21a6a709b58f2e51968722a55 100644 (file)
@@ -19,7 +19,7 @@
  */
 package org.sonar.server.search.action;
 
-import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.client.Requests;
 import org.sonar.core.persistence.Dto;
 import org.sonar.server.search.Index;
@@ -27,7 +27,7 @@ import org.sonar.server.search.Index;
 import java.util.ArrayList;
 import java.util.List;
 
-public class DeleteDto<DTO extends Dto> extends IndexActionRequest {
+public class DeleteDto<DTO extends Dto> extends IndexAction<DeleteRequest> {
 
   private final DTO dto;
 
@@ -47,8 +47,8 @@ public class DeleteDto<DTO extends Dto> extends IndexActionRequest {
   }
 
   @Override
-  public List<ActionRequest> doCall(Index index) throws Exception {
-    List<ActionRequest> requests = new ArrayList<ActionRequest>();
+  public List<DeleteRequest> doCall(Index index) throws Exception {
+    List<DeleteRequest> requests = new ArrayList<DeleteRequest>();
     requests.add(Requests.deleteRequest(index.getIndexName())
       .id(dto.getKey().toString())
       .type(indexType));
index 772f34f90418a3be6d68b3686333a63f7a4737ab..1af33653d7fff37ad3eede14310a27d2ada749cf 100644 (file)
@@ -19,7 +19,7 @@
  */
 package org.sonar.server.search.action;
 
-import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.client.Requests;
 import org.sonar.server.search.Index;
 
@@ -27,7 +27,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
-public class DeleteKey<K extends Serializable> extends IndexActionRequest {
+public class DeleteKey<K extends Serializable> extends IndexAction<DeleteRequest> {
 
   private final K key;
 
@@ -47,8 +47,8 @@ public class DeleteKey<K extends Serializable> extends IndexActionRequest {
   }
 
   @Override
-  public List<ActionRequest> doCall(Index index) throws Exception {
-    List<ActionRequest> requests = new ArrayList<ActionRequest>();
+  public List<DeleteRequest> doCall(Index index) throws Exception {
+    List<DeleteRequest> requests = new ArrayList<DeleteRequest>();
     requests.add(Requests.deleteRequest(index.getIndexName())
       .id(getKey())
       .type(indexType));
index e85dff4279a4d6c6e6f3042c250fee7a87a9dbaa..ff37eb92259a6c438f696e4ec4d4c68e7b2be64a 100644 (file)
  */
 package org.sonar.server.search.action;
 
-import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 import org.sonar.server.search.Index;
 
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
-public class DeleteNestedItem<K extends Serializable> extends IndexActionRequest {
+public class DeleteNestedItem<K extends Serializable> extends IndexAction<UpdateRequest> {
 
   private final K key;
   private final Object item;
@@ -50,8 +50,8 @@ public class DeleteNestedItem<K extends Serializable> extends IndexActionRequest
   }
 
   @Override
-  public List<ActionRequest> doCall(Index index) throws Exception {
-    List<ActionRequest> updates = new ArrayList<ActionRequest>();
+  public List<UpdateRequest> doCall(Index index) throws Exception {
+    List<UpdateRequest> updates = new ArrayList<UpdateRequest>();
     updates.addAll(deleteItem(index, item, key));
     for (Object otherItem : items) {
       updates.addAll(deleteItem(index, otherItem, key));
@@ -59,7 +59,13 @@ public class DeleteNestedItem<K extends Serializable> extends IndexActionRequest
     return updates;
   }
 
-  private List<ActionRequest> deleteItem(Index index, Object item, K key) {
-    return index.getNormalizer().deleteNested(item, key);
+  private List<UpdateRequest> deleteItem(Index index, Object item, K key) {
+    List<UpdateRequest> updates = index.getNormalizer().deleteNested(item, key);
+    for (UpdateRequest update : updates) {
+      update.index(index.getIndexName())
+        .type(index.getIndexType())
+        .refresh(needsRefresh());
+    }
+    return updates;
   }
 }
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexAction.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexAction.java
new file mode 100644 (file)
index 0000000..0204bd0
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.search.action;
+
+import org.elasticsearch.action.ActionRequest;
+import org.sonar.core.cluster.ClusterAction;
+import org.sonar.server.search.Index;
+
+import java.util.List;
+
+public abstract class IndexAction<K extends ActionRequest> implements ClusterAction<List<K>> {
+
+  protected final String indexType;
+  private final boolean requiresRefresh;
+  private Index index;
+
+  protected IndexAction(String indexType) {
+    this(indexType, true);
+  }
+
+  protected IndexAction(String indexType, boolean requiresRefresh) {
+    this.indexType = indexType;
+    this.requiresRefresh = requiresRefresh;
+  }
+
+  public abstract String getKey();
+
+  public abstract Class getPayloadClass();
+
+  public String getIndexType() {
+    return indexType;
+  }
+
+  public IndexAction<K> setIndex(Index index) {
+    this.index = index;
+    return this;
+  }
+
+  @Override
+  public final List<K> call() throws Exception {
+    if (index == null) {
+      throw new IllegalStateException("Cannot execute request on null index");
+    }
+    return doCall(index);
+  }
+
+  public abstract List<K> doCall(Index index) throws Exception;
+
+  public boolean needsRefresh() {
+    return this.requiresRefresh;
+  }
+}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexActionRequest.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexActionRequest.java
deleted file mode 100644 (file)
index 8eb540d..0000000
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.server.search.action;
-
-
-import org.elasticsearch.action.ActionRequest;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.sonar.core.cluster.ClusterAction;
-import org.sonar.server.search.Index;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public abstract class IndexActionRequest implements ClusterAction<List<ActionRequest>> {
-
-  protected final String indexType;
-  private final boolean requiresRefresh;
-  private Index index;
-
-  protected IndexActionRequest(String indexType) {
-    this(indexType, true);
-  }
-
-  protected IndexActionRequest(String indexType, boolean requiresRefresh) {
-    super();
-    this.indexType = indexType;
-    this.requiresRefresh = requiresRefresh;
-  }
-
-  public abstract String getKey();
-
-  public abstract Class getPayloadClass();
-
-  public String getIndexType() {
-    return indexType;
-  }
-
-
-  public void setIndex(Index index) {
-    this.index = index;
-  }
-
-  @Override
-  public final List<ActionRequest> call() throws Exception {
-    if (index == null) {
-      throw new IllegalStateException("Cannot execute request - Index is null");
-    }
-    List<ActionRequest> finalRequests = new ArrayList<ActionRequest>();
-    for (ActionRequest request : doCall(index)) {
-      if (request.getClass().isAssignableFrom(UpdateRequest.class)) {
-        ((UpdateRequest) request)
-          .type(index.getIndexType())
-          .index(index.getIndexName())
-          .refresh(false);
-      }
-      finalRequests.add(request);
-    }
-    return finalRequests;
-  }
-
-  public abstract List<ActionRequest> doCall(Index index) throws Exception;
-
-  public boolean needsRefresh() {
-    return this.requiresRefresh;
-  }
-}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/RefreshIndex.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/RefreshIndex.java
new file mode 100644 (file)
index 0000000..d4d7a17
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.search.action;
+
+import com.google.common.collect.ImmutableList;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.sonar.server.search.Index;
+
+import java.util.List;
+
+public class RefreshIndex extends IndexAction<RefreshRequest> {
+
+  public RefreshIndex(String indexType) {
+    super(indexType);
+  }
+
+  @Override
+  public String getKey() {
+    return null;
+  }
+
+  @Override
+  public Class getPayloadClass() {
+    return null;
+  }
+
+  @Override
+  public List<RefreshRequest> doCall(Index index) throws Exception {
+    return ImmutableList.<RefreshRequest>of(
+      new RefreshRequest()
+        .force(false)
+        .indices(index.getIndexName()));
+  }
+}
index c2c6b5c0e65edc2cdd2a3ad133d6f2f942e354d9..4b3e7a346195932426344dfd58d1b030a8ccc21e 100644 (file)
  */
 package org.sonar.server.search.action;
 
-import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 import org.sonar.core.persistence.Dto;
 import org.sonar.server.search.Index;
 
 import java.util.List;
 
-public class UpsertDto<DTO extends Dto> extends IndexActionRequest {
+public class UpsertDto<DTO extends Dto> extends IndexAction<UpdateRequest> {
 
   private final DTO dto;
 
@@ -49,7 +49,13 @@ public class UpsertDto<DTO extends Dto> extends IndexActionRequest {
   }
 
   @Override
-  public List<ActionRequest> doCall(Index index) throws Exception {
-    return index.getNormalizer().normalize(dto);
+  public List<UpdateRequest> doCall(Index index) throws Exception {
+    List<UpdateRequest> updates = index.getNormalizer().normalize(dto);
+    for (UpdateRequest update : updates) {
+      update.index(index.getIndexName())
+        .type(index.getIndexType())
+        .refresh(needsRefresh());
+    }
+    return updates;
   }
 }
index a16ba90bfa66199f7a593e625fb7fb8c7deae7ff..a7bd4b89da6698476902e20b3e75dc7301ad2648 100644 (file)
  */
 package org.sonar.server.search.action;
 
-import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 import org.sonar.server.search.Index;
 
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
-public class UpsertNestedItem<K extends Serializable> extends IndexActionRequest {
-
+public class UpsertNestedItem<K extends Serializable> extends IndexAction<UpdateRequest> {
 
   private final K key;
   private final Object item;
@@ -51,8 +50,8 @@ public class UpsertNestedItem<K extends Serializable> extends IndexActionRequest
   }
 
   @Override
-  public List<ActionRequest> doCall(Index index) throws Exception {
-    List<ActionRequest> updates = new ArrayList<ActionRequest>();
+  public List<UpdateRequest> doCall(Index index) throws Exception {
+    List<UpdateRequest> updates = new ArrayList<UpdateRequest>();
     updates.addAll(normalizeItem(index, item, key));
     for (Object otherItem : items) {
       updates.addAll(normalizeItem(index, otherItem, key));
@@ -60,7 +59,13 @@ public class UpsertNestedItem<K extends Serializable> extends IndexActionRequest
     return updates;
   }
 
-  private List<ActionRequest> normalizeItem(Index index, Object item, K key) {
-    return index.getNormalizer().normalizeNested(item, key);
+  private List<UpdateRequest> normalizeItem(Index index, Object item, K key) {
+    List<UpdateRequest> updates = index.getNormalizer().normalizeNested(item, key);
+    for (UpdateRequest update : updates) {
+      update.index(index.getIndexName())
+        .type(index.getIndexType())
+        .refresh(needsRefresh());
+    }
+    return updates;
   }
 }
index e20aea655835b047f118c54b54e06f095adc515b..2d1c4725ec985735478ffc4c67d3328741151f42 100644 (file)
@@ -115,9 +115,12 @@ public class ActivityBackendMediumTest {
   @Test
   public void massive_insert() {
 
+    // Set qeue's implicit commit size to 10
+    dbSession.setImplicitCommitSize(10);
+
     // 0 Assert no logs in DB
     assertThat(dao.findAll(dbSession)).hasSize(0);
-    int max = 400;
+    int max = 35;
     final String testValue = "hello world";
     for (int i = 0; i < max; i++) {
 
@@ -156,9 +159,12 @@ public class ActivityBackendMediumTest {
   @Test
   public void massive_log_insert() {
 
+    // Set qeue's implicit commit size to 10
+    dbSession.setImplicitCommitSize(10);
+
     // 0 Assert no logs in DB
     assertThat(dao.findAll(dbSession)).hasSize(0);
-    int max = 400;
+    int max = 40;
     final String testValue = "hello world";
     for (int i = 0; i < max; i++) {
       TestActivityLog log = new TestActivityLog(testValue + "_" + i, Activity.Type.QPROFILE.toString());
index 5ee795cb6e60e649d75a9d61451d09d843dc3ab7..c4b354193b7f0c208b532f643a1475ebfe936018 100644 (file)
@@ -117,6 +117,7 @@ public class BaseDaoTest {
     assertThat(session.getActionCount()).isEqualTo(1);
 
     dao.synchronizeAfter(session, new Date(t0));
-    assertThat(session.getActionCount()).isEqualTo(2);
+    // Synchronize adds an implicit action to the queue before finishing.
+    assertThat(session.getActionCount()).isEqualTo(3);
   }
 }
index 0b8a8c988c4ad5f654475e0f6a150859c4e02da3..aedf757c64f53cc4e03b2e49f45c8535667850c4 100644 (file)
@@ -61,8 +61,8 @@ public class IssueAuthorizationDaoTest extends AbstractDaoTestCase {
     assertThat(session.getActionCount()).isEqualTo(0);
 
     dao.synchronizeAfter(session, new Date(0));
-
-    assertThat(session.getActionCount()).isEqualTo(1);
+    // SynchronizeAfter adds an implicit action (refresh) after execution of synchronization
+    assertThat(session.getActionCount()).isEqualTo(2);
   }
 
   @Test
@@ -70,7 +70,8 @@ public class IssueAuthorizationDaoTest extends AbstractDaoTestCase {
     setupData("synchronize_after_since_given_date");
 
     dao.synchronizeAfter(session, DateUtils.parseDate("2014-09-01"));
-    assertThat(session.getActionCount()).isEqualTo(1);
+    // SynchronizeAfter adds an implicit action (refresh) after execution of synchronization
+    assertThat(session.getActionCount()).isEqualTo(2);
   }
 
   @Test
@@ -78,7 +79,8 @@ public class IssueAuthorizationDaoTest extends AbstractDaoTestCase {
     setupData("synchronize_after_with_project");
 
     dao.synchronizeAfter(session, DateUtils.parseDate("2014-01-01"), ImmutableMap.of(IssueAuthorizationDao.PROJECT_KEY, "org.sonar:sample"));
-    assertThat(session.getActionCount()).isEqualTo(1);
+    // SynchronizeAfter adds an implicit action (refresh) after execution of synchronization
+    assertThat(session.getActionCount()).isEqualTo(2);
   }
 
 }
index cb79ed6d56952d8c3e8cef3977ebf3b68f81026b..cfb22cb7eb41ac79857e5b1754ba785022bf2325 100644 (file)
@@ -67,6 +67,12 @@ public class BackendCleanup implements ServerComponent {
       .getState().getMetaData().concreteAllIndices())
       .setQuery(QueryBuilders.matchAllQuery())
       .get();
-
+    searchClient.admin().indices().prepareRefresh(searchClient.admin().cluster().prepareState().get()
+      .getState().getMetaData().concreteAllIndices())
+      .setForce(true)
+      .get();
+    searchClient.admin().indices().prepareFlush(searchClient.admin().cluster().prepareState().get()
+      .getState().getMetaData().concreteAllIndices())
+      .get();
   }
 }
index 3ca049d44eec88186d02e66951e257779cf79a6c..2bca0acee02b4df2e6455581af0529455d266f8e 100644 (file)
@@ -41,6 +41,8 @@ public class DbSession implements SqlSession {
   private SqlSession session;
   private int actionCount;
 
+  private Integer implicitCommitSize = IMPLICIT_COMMIT_SIZE;
+
   DbSession(WorkQueue queue, SqlSession session) {
     this.actionCount = 0;
     this.session = session;
@@ -48,10 +50,18 @@ public class DbSession implements SqlSession {
     this.actions = new ArrayList<ClusterAction>();
   }
 
+  public Integer getImplicitCommitSize() {
+    return implicitCommitSize;
+  }
+
+  public void setImplicitCommitSize(Integer implicitCommitSize) {
+    this.implicitCommitSize = implicitCommitSize;
+  }
+
   public void enqueue(ClusterAction action) {
     actionCount++;
     this.actions.add(action);
-    if (this.actions.size() > IMPLICIT_COMMIT_SIZE) {
+    if (this.actions.size() > getImplicitCommitSize()) {
       this.commit();
     }
   }