]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-5531 - Updated ES refresh while synchronizing
authorStephane Gamard <stephane.gamard@sonarsource.com>
Tue, 16 Sep 2014 19:44:11 +0000 (21:44 +0200)
committerStephane Gamard <stephane.gamard@sonarsource.com>
Mon, 22 Sep 2014 10:07:20 +0000 (12:07 +0200)
server/sonar-search/src/main/java/org/sonar/search/SearchServer.java
server/sonar-server/src/main/java/org/sonar/server/db/BaseDao.java
server/sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java
server/sonar-server/src/main/java/org/sonar/server/search/IndexQueue.java
server/sonar-server/src/main/java/org/sonar/server/search/IndexSynchronizer.java
server/sonar-server/src/main/java/org/sonar/server/search/action/IndexActionRequest.java
server/sonar-server/src/test/java/org/sonar/server/activity/ActivityBackendMediumTest.java
server/sonar-server/src/test/java/org/sonar/server/search/IndexSynchronizerMediumTest.java
server/sonar-server/src/test/java/org/sonar/server/tester/BackendCleanup.java
server/sonar-server/src/test/java/org/sonar/server/tester/ServerTester.java
sonar-core/src/main/java/org/sonar/core/persistence/DbSession.java

index 1433bd71b6e709f86f50656c6eaa466bc2f9c407..c1b3bb4c1507d552682ca794c3913da670654039 100644 (file)
@@ -88,9 +88,14 @@ public class SearchServer implements Monitored {
       .put("discovery.zen.ping.multicast.enabled", "false")
 
       // Index storage policies
+      .put("index.refresh_interval", "30")
       .put("index.number_of_shards", "1")
       .put("index.number_of_replicas", MINIMUM_INDEX_REPLICATION)
       .put("index.store.type", "mmapfs")
+//      .put("indices.store.throttle.type", "merge")
+//      .put("indices.store.throttle.max_bytes_per_sec", "500mb")
+      .put("index.merge.scheduler.max_thread_count",
+        Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors() / 2)))
 
       // Install our own listUpdate scripts
       .put("script.default_lang", "native")
index a87021146bb99efc9a7e4b499cbdb491a6d90b7f..59183eb307a6fb1bec713bc5328308014fa48a0e 100644 (file)
@@ -323,7 +323,7 @@ public abstract class BaseDao<MAPPER, DTO extends Dto<KEY>, KEY extends Serializ
       @Override
       public void handleResult(ResultContext resultContext) {
         DTO dto = (DTO) resultContext.getResultObject();
-        session.enqueue(new UpsertDto<DTO>(getIndexType(), dto, true));
+        session.enqueue(new UpsertDto<DTO>(getIndexType(), dto, false));
         count++;
         if (count % 100000 == 0) {
           LOGGER.info(" - synchronized {} {}", count, getIndexType());
@@ -358,6 +358,7 @@ public abstract class BaseDao<MAPPER, DTO extends Dto<KEY>, KEY extends Serializ
       DbSynchronizationHandler handler = getSynchronizationResultHandler(session);
       session.select(getSynchronizeStatementFQN(), getSynchronizationParams(date, params), handler);
       handler.enqueueCollected();
+
     } catch (Exception e) {
       throw new IllegalStateException(e);
     }
index 6faafd538df6eda512dfbde9fffddb3b130bd51f..a2b1597adc98c158327451f350bb5eac917a6bff 100644 (file)
@@ -365,7 +365,7 @@ public abstract class BaseIndex<DOMAIN, DTO extends Dto<KEY>, KEY extends Serial
       .admin()
       .indices()
       .prepareRefresh(this.getIndexName())
-      .setForce(false)
+      .setForce(true)
       .setIndices(this.getIndexName()));
   }
 
index de7636d8596774ab18ec12311927f838e303fa61..e5f01c3ca9e05c354c11daf173a72e7fa917cb8c 100644 (file)
@@ -69,9 +69,14 @@ public class IndexQueue implements ServerComponent, WorkQueue<IndexActionRequest
     }
     try {
 
+      boolean refreshRequired = false;
+
       Map<String, Index> indexes = getIndexMap();
       Set<String> indices = new HashSet<String>();
       for (IndexActionRequest action : actions) {
+        if (action.needsRefresh()) {
+          refreshRequired = true;
+        }
         Index index = indexes.get(action.getIndexType());
         action.setIndex(index);
         if (action.needsRefresh()) {
@@ -83,12 +88,15 @@ public class IndexQueue implements ServerComponent, WorkQueue<IndexActionRequest
 
       long normTime = executeNormalization(bulkRequestBuilder, actions);
 
-      //execute the request
+      // execute the request
       long indexTime = System.currentTimeMillis();
       BulkResponse response = searchClient.execute(bulkRequestBuilder.setRefresh(false));
       indexTime = System.currentTimeMillis() - indexTime;
 
-      long refreshTime = this.refreshRequiredIndex(indices);
+      long refreshTime = 0;
+      if (refreshRequired) {
+        refreshTime = this.refreshRequiredIndex(indices);
+      }
 
       LOGGER.debug("-- submitted {} items with {}ms in normalization, {}ms indexing and {}ms refresh({}). Total: {}ms",
         bulkRequestBuilder.numberOfActions(), normTime, indexTime, refreshTime, indices, (normTime + indexTime + refreshTime));
@@ -102,7 +110,6 @@ public class IndexQueue implements ServerComponent, WorkQueue<IndexActionRequest
     }
   }
 
-
   private long refreshRequiredIndex(Set<String> indices) {
 
     long refreshTime = System.currentTimeMillis();
@@ -124,7 +131,7 @@ public class IndexQueue implements ServerComponent, WorkQueue<IndexActionRequest
     long normTime = System.currentTimeMillis();
     try {
       ExecutorService executorService = Executors.newFixedThreadPool(CONCURRENT_NORMALIZATION_FACTOR);
-      //invokeAll() blocks until ALL tasks submitted to executor complete
+      // invokeAll() blocks until ALL tasks submitted to executor complete
       for (Future<List<ActionRequest>> updateRequests : executorService.invokeAll(actions)) {
         for (ActionRequest update : updateRequests.get()) {
           if (UpdateRequest.class.isAssignableFrom(update.getClass())) {
index 1ffcb84bf468227893895d4c904c0cbb97c5c396..0cbacbe16c87e78cd706efc8d98206bc88276a86 100644 (file)
@@ -80,5 +80,6 @@ public class IndexSynchronizer {
     }
     dao.synchronizeAfter(session,
       index.getLastSynchronization());
+    index.refresh();
   }
 }
index 8eb540df4a62331eb3ef60e95aa2cc639bf5184a..6b4e193373942988d4351a71501fc313f28040c3 100644 (file)
@@ -19,7 +19,6 @@
  */
 package org.sonar.server.search.action;
 
-
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.sonar.core.cluster.ClusterAction;
@@ -39,7 +38,6 @@ public abstract class IndexActionRequest implements ClusterAction<List<ActionReq
   }
 
   protected IndexActionRequest(String indexType, boolean requiresRefresh) {
-    super();
     this.indexType = indexType;
     this.requiresRefresh = requiresRefresh;
   }
@@ -52,7 +50,6 @@ public abstract class IndexActionRequest implements ClusterAction<List<ActionReq
     return indexType;
   }
 
-
   public void setIndex(Index index) {
     this.index = index;
   }
index e20aea655835b047f118c54b54e06f095adc515b..6156409adf64bcf57c39699f78c15b726fa6eaca 100644 (file)
@@ -115,9 +115,12 @@ public class ActivityBackendMediumTest {
   @Test
   public void massive_insert() {
 
+    // Set qeue's implicit commit size to 10
+    dbSession.setImplicitCommitSize(10);
+
     // 0 Assert no logs in DB
     assertThat(dao.findAll(dbSession)).hasSize(0);
-    int max = 400;
+    int max = 40;
     final String testValue = "hello world";
     for (int i = 0; i < max; i++) {
 
@@ -156,9 +159,12 @@ public class ActivityBackendMediumTest {
   @Test
   public void massive_log_insert() {
 
+    // Set qeue's implicit commit size to 10
+    dbSession.setImplicitCommitSize(10);
+
     // 0 Assert no logs in DB
     assertThat(dao.findAll(dbSession)).hasSize(0);
-    int max = 400;
+    int max = 40;
     final String testValue = "hello world";
     for (int i = 0; i < max; i++) {
       TestActivityLog log = new TestActivityLog(testValue + "_" + i, Activity.Type.QPROFILE.toString());
index 1e31323634384c4c5f01492ef14c1927c39c5a39..7c95432a00cff4f6c957659b245b99fe93edf21a 100644 (file)
@@ -78,6 +78,7 @@ public class IndexSynchronizerMediumTest {
 
     synchronizer.synchronize(dbSession, dbClient.ruleDao(), indexClient.get(RuleIndex.class));
     dbSession.commit();
+    Thread.sleep(1000);
     assertThat(indexClient.get(RuleIndex.class).countAll()).isEqualTo(numberOfRules);
   }
 }
index cb79ed6d56952d8c3e8cef3977ebf3b68f81026b..2938252bf03fafd9c155a45d814038d1ccc523d3 100644 (file)
@@ -67,6 +67,9 @@ public class BackendCleanup implements ServerComponent {
       .getState().getMetaData().concreteAllIndices())
       .setQuery(QueryBuilders.matchAllQuery())
       .get();
-
+    searchClient.admin().indices().prepareRefresh(searchClient.admin().cluster().prepareState().get()
+      .getState().getMetaData().concreteAllIndices())
+      .setForce(true)
+      .get();
   }
 }
index 43ef301e1f678b2abbd12c2f1c897699bbb32b69..9f884aea4658d4bf26c81bcab52200140d1072f9 100644 (file)
@@ -192,6 +192,7 @@ public class ServerTester extends ExternalResource {
   public void clearIndexes() {
     checkStarted();
     get(BackendCleanup.class).clearIndexes();
+
   }
 
   /**
index 3ca049d44eec88186d02e66951e257779cf79a6c..2bca0acee02b4df2e6455581af0529455d266f8e 100644 (file)
@@ -41,6 +41,8 @@ public class DbSession implements SqlSession {
   private SqlSession session;
   private int actionCount;
 
+  private Integer implicitCommitSize = IMPLICIT_COMMIT_SIZE;
+
   DbSession(WorkQueue queue, SqlSession session) {
     this.actionCount = 0;
     this.session = session;
@@ -48,10 +50,18 @@ public class DbSession implements SqlSession {
     this.actions = new ArrayList<ClusterAction>();
   }
 
+  public Integer getImplicitCommitSize() {
+    return implicitCommitSize;
+  }
+
+  public void setImplicitCommitSize(Integer implicitCommitSize) {
+    this.implicitCommitSize = implicitCommitSize;
+  }
+
   public void enqueue(ClusterAction action) {
     actionCount++;
     this.actions.add(action);
-    if (this.actions.size() > IMPLICIT_COMMIT_SIZE) {
+    if (this.actions.size() > getImplicitCommitSize()) {
       this.commit();
     }
   }