]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-5531 - Fixed missing implicit synchronization
authorStephane Gamard <stephane.gamard@sonarsource.com>
Mon, 22 Sep 2014 09:45:06 +0000 (11:45 +0200)
committerStephane Gamard <stephane.gamard@sonarsource.com>
Mon, 22 Sep 2014 10:07:21 +0000 (12:07 +0200)
server/sonar-server/src/main/java/org/sonar/server/db/BaseDao.java
server/sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java
server/sonar-server/src/main/java/org/sonar/server/search/Index.java
server/sonar-server/src/main/java/org/sonar/server/search/IndexQueue.java
server/sonar-server/src/main/java/org/sonar/server/search/IndexSynchronizer.java
server/sonar-server/src/main/java/org/sonar/server/search/action/IndexActionRequest.java
server/sonar-server/src/main/java/org/sonar/server/search/action/IndexWorker.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/search/action/RefreshActionRequest.java [new file with mode: 0644]
server/sonar-server/src/test/java/org/sonar/server/db/BaseDaoTest.java
server/sonar-server/src/test/java/org/sonar/server/issue/db/IssueAuthorizationDaoTest.java
sonar-core/src/main/java/org/sonar/core/cluster/ClusterAction.java

index 59183eb307a6fb1bec713bc5328308014fa48a0e..37cf02a20340de3ebd16578658238120a2b70d29 100644 (file)
@@ -34,15 +34,20 @@ import org.sonar.server.search.DbSynchronizationHandler;
 import org.sonar.server.search.IndexDefinition;
 import org.sonar.server.search.action.DeleteKey;
 import org.sonar.server.search.action.DeleteNestedItem;
+import org.sonar.server.search.action.RefreshActionRequest;
 import org.sonar.server.search.action.UpsertDto;
 import org.sonar.server.search.action.UpsertNestedItem;
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nullable;
-
 import java.io.Serializable;
 import java.sql.Timestamp;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
 
 import static com.google.common.collect.Maps.newHashMap;
 
@@ -358,7 +363,7 @@ public abstract class BaseDao<MAPPER, DTO extends Dto<KEY>, KEY extends Serializ
       DbSynchronizationHandler handler = getSynchronizationResultHandler(session);
       session.select(getSynchronizeStatementFQN(), getSynchronizationParams(date, params), handler);
       handler.enqueueCollected();
-
+      session.enqueue(new RefreshActionRequest(this.getIndexType()));
     } catch (Exception e) {
       throw new IllegalStateException(e);
     }
index a2b1597adc98c158327451f350bb5eac917a6bff..65c73da1ffa240e2289d0418d61a3dc3b99c0beb 100644 (file)
@@ -359,16 +359,6 @@ public abstract class BaseIndex<DOMAIN, DTO extends Dto<KEY>, KEY extends Serial
     return mapping;
   }
 
-  @Override
-  public void refresh() {
-    client.execute(client
-      .admin()
-      .indices()
-      .prepareRefresh(this.getIndexName())
-      .setForce(true)
-      .setIndices(this.getIndexName()));
-  }
-
   /* Base CRUD methods */
 
   protected abstract DOMAIN toDoc(Map<String, Object> fields);
index 13a7a935c4e78ceebe04b1d6218abdfed42b4bdb..13acb6a633292bb2885e77337534bc2ebe7710bf 100644 (file)
@@ -38,8 +38,6 @@ public interface Index<DOMAIN, DTO extends Dto<KEY>, KEY extends Serializable> e
 
   String getIndexName();
 
-  void refresh();
-
   Date getLastSynchronization();
 
   IndexStat getIndexStat();
index e5f01c3ca9e05c354c11daf173a72e7fa917cb8c..d548648af5c705fe88abcc8c68828a76eed3ce10 100644 (file)
@@ -20,6 +20,7 @@
 package org.sonar.server.search;
 
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequestBuilder;
 import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -31,9 +32,12 @@ import org.slf4j.LoggerFactory;
 import org.sonar.api.ServerComponent;
 import org.sonar.api.config.Settings;
 import org.sonar.api.platform.ComponentContainer;
+import org.sonar.core.cluster.ClusterAction;
 import org.sonar.core.cluster.WorkQueue;
 import org.sonar.core.profiling.Profiling;
 import org.sonar.server.search.action.IndexActionRequest;
+import org.sonar.server.search.action.IndexWorker;
+import org.sonar.server.search.action.RefreshActionRequest;
 
 import java.util.HashMap;
 import java.util.HashSet;
@@ -73,14 +77,24 @@ public class IndexQueue implements ServerComponent, WorkQueue<IndexActionRequest
 
       Map<String, Index> indexes = getIndexMap();
       Set<String> indices = new HashSet<String>();
-      for (IndexActionRequest action : actions) {
-        if (action.needsRefresh()) {
-          refreshRequired = true;
+      for (ClusterAction action : actions) {
+        if (IndexActionRequest.class.isAssignableFrom(action.getClass())) {
+          IndexActionRequest worker = (IndexActionRequest) action;
+          if (worker.needsRefresh()) {
+            refreshRequired = true;
+            indices.add(indexes.get(worker.getIndexType()).getIndexName());
+          }
+        }
+
+        if (IndexWorker.class.isAssignableFrom(action.getClass())) {
+          IndexWorker worker = (IndexWorker) action;
+          Index index = indexes.get(worker.getIndexType());
+          worker.setIndex(index);
         }
-        Index index = indexes.get(action.getIndexType());
-        action.setIndex(index);
-        if (action.needsRefresh()) {
-          indices.add(index.getIndexName());
+
+        if (RefreshActionRequest.class.isAssignableFrom(action.getClass())) {
+          refreshRequired = true;
+          indices.add(indexes.get(((RefreshActionRequest) action).getIndexType()).getIndexName());
         }
       }
 
@@ -88,23 +102,25 @@ public class IndexQueue implements ServerComponent, WorkQueue<IndexActionRequest
 
       long normTime = executeNormalization(bulkRequestBuilder, actions);
 
-      // execute the request
-      long indexTime = System.currentTimeMillis();
-      BulkResponse response = searchClient.execute(bulkRequestBuilder.setRefresh(false));
-      indexTime = System.currentTimeMillis() - indexTime;
+      if (bulkRequestBuilder.numberOfActions() > 0) {
+        // execute the request
+        long indexTime = System.currentTimeMillis();
+        BulkResponse response = searchClient.execute(bulkRequestBuilder.setRefresh(false));
 
-      long refreshTime = 0;
-      if (refreshRequired) {
-        refreshTime = this.refreshRequiredIndex(indices);
-      }
+        indexTime = System.currentTimeMillis() - indexTime;
 
-      LOGGER.debug("-- submitted {} items with {}ms in normalization, {}ms indexing and {}ms refresh({}). Total: {}ms",
-        bulkRequestBuilder.numberOfActions(), normTime, indexTime, refreshTime, indices, (normTime + indexTime + refreshTime));
+        long refreshTime = 0;
+        if (refreshRequired) {
+          refreshTime = this.refreshRequiredIndex(indices);
+        }
 
-      if (response.hasFailures()) {
-        throw new IllegalStateException("Errors while indexing stack: " + response.buildFailureMessage());
-      }
+        LOGGER.debug("-- submitted {} items with {}ms in normalization, {}ms indexing and {}ms refresh({}). Total: {}ms",
+          bulkRequestBuilder.numberOfActions(), normTime, indexTime, refreshTime, indices, (normTime + indexTime + refreshTime));
 
+        if (response.hasFailures()) {
+          throw new IllegalStateException("Errors while indexing stack: " + response.buildFailureMessage());
+        }
+      }
     } catch (Exception e) {
       LOGGER.error("Could not commit to ElasticSearch", e);
     }
@@ -130,20 +146,24 @@ public class IndexQueue implements ServerComponent, WorkQueue<IndexActionRequest
   private long executeNormalization(BulkRequestBuilder bulkRequestBuilder, List<IndexActionRequest> actions) {
     long normTime = System.currentTimeMillis();
     try {
+      boolean hasInlineRefreshRequest = false;
       ExecutorService executorService = Executors.newFixedThreadPool(CONCURRENT_NORMALIZATION_FACTOR);
       // invokeAll() blocks until ALL tasks submitted to executor complete
       for (Future<List<ActionRequest>> updateRequests : executorService.invokeAll(actions)) {
         for (ActionRequest update : updateRequests.get()) {
           if (UpdateRequest.class.isAssignableFrom(update.getClass())) {
-            bulkRequestBuilder.add(((UpdateRequest) update).refresh(false));
+            bulkRequestBuilder.add(((UpdateRequest) update));
           } else if (DeleteRequest.class.isAssignableFrom(update.getClass())) {
-            bulkRequestBuilder.add(((DeleteRequest) update).refresh(false));
+            bulkRequestBuilder.add(((DeleteRequest) update));
+          } else if (RefreshRequest.class.isAssignableFrom(update.getClass())) {
+            hasInlineRefreshRequest = true;
           } else {
             throw new IllegalStateException("Un-managed request type: " + update.getClass());
           }
         }
       }
       executorService.shutdown();
+      bulkRequestBuilder.setRefresh(hasInlineRefreshRequest);
     } catch (Exception e) {
       throw new IllegalStateException("Could not execute normalization for stack", e);
     }
index 0cbacbe16c87e78cd706efc8d98206bc88276a86..1ffcb84bf468227893895d4c904c0cbb97c5c396 100644 (file)
@@ -80,6 +80,5 @@ public class IndexSynchronizer {
     }
     dao.synchronizeAfter(session,
       index.getLastSynchronization());
-    index.refresh();
   }
 }
index ebc13b7f40b7de665a31b07a7a6e36f22c725720..940316d82d86e9a96ccd2e3d5a04f0cfb2767b77 100644 (file)
@@ -27,7 +27,7 @@ import org.sonar.server.search.Index;
 import java.util.ArrayList;
 import java.util.List;
 
-public abstract class IndexActionRequest implements ClusterAction<List<ActionRequest>> {
+public abstract class IndexActionRequest implements IndexWorker, ClusterAction<ActionRequest> {
 
   protected final String indexType;
   private final boolean requiresRefresh;
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexWorker.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexWorker.java
new file mode 100644 (file)
index 0000000..42989b6
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.search.action;
+
+import org.sonar.server.search.Index;
+
+public interface IndexWorker {
+
+  public String getIndexType();
+
+  public void setIndex(Index index);
+}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/RefreshActionRequest.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/RefreshActionRequest.java
new file mode 100644 (file)
index 0000000..0fa0393
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.search.action;
+
+import com.google.common.collect.ImmutableList;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.sonar.core.cluster.ClusterAction;
+import org.sonar.server.search.Index;
+
+import java.util.List;
+
+public class RefreshActionRequest implements IndexWorker, ClusterAction<RefreshRequest> {
+
+  private Index<?, ?, ?> index;
+  private final String indexType;
+
+  public RefreshActionRequest(String indexType) {
+    this.indexType = indexType;
+  }
+
+  @Override
+  public List<RefreshRequest> call() throws Exception {
+    return ImmutableList.<RefreshRequest>of(
+      new RefreshRequest()
+        .force(false)
+        .indices(index.getIndexName()));
+  }
+
+  @Override
+  public String getIndexType() {
+    return indexType;
+  }
+
+  @Override
+  public void setIndex(Index index) {
+    this.index = index;
+  }
+}
index 5ee795cb6e60e649d75a9d61451d09d843dc3ab7..c4b354193b7f0c208b532f643a1475ebfe936018 100644 (file)
@@ -117,6 +117,7 @@ public class BaseDaoTest {
     assertThat(session.getActionCount()).isEqualTo(1);
 
     dao.synchronizeAfter(session, new Date(t0));
-    assertThat(session.getActionCount()).isEqualTo(2);
+    // Synchronize adds an implicit action to the queue before finishing.
+    assertThat(session.getActionCount()).isEqualTo(3);
   }
 }
index 0b8a8c988c4ad5f654475e0f6a150859c4e02da3..aedf757c64f53cc4e03b2e49f45c8535667850c4 100644 (file)
@@ -61,8 +61,8 @@ public class IssueAuthorizationDaoTest extends AbstractDaoTestCase {
     assertThat(session.getActionCount()).isEqualTo(0);
 
     dao.synchronizeAfter(session, new Date(0));
-
-    assertThat(session.getActionCount()).isEqualTo(1);
+    // SynchronizeAfter adds an implicit action (refresh) after execution of synchronization
+    assertThat(session.getActionCount()).isEqualTo(2);
   }
 
   @Test
@@ -70,7 +70,8 @@ public class IssueAuthorizationDaoTest extends AbstractDaoTestCase {
     setupData("synchronize_after_since_given_date");
 
     dao.synchronizeAfter(session, DateUtils.parseDate("2014-09-01"));
-    assertThat(session.getActionCount()).isEqualTo(1);
+    // SynchronizeAfter adds an implicit action (refresh) after execution of synchronization
+    assertThat(session.getActionCount()).isEqualTo(2);
   }
 
   @Test
@@ -78,7 +79,8 @@ public class IssueAuthorizationDaoTest extends AbstractDaoTestCase {
     setupData("synchronize_after_with_project");
 
     dao.synchronizeAfter(session, DateUtils.parseDate("2014-01-01"), ImmutableMap.of(IssueAuthorizationDao.PROJECT_KEY, "org.sonar:sample"));
-    assertThat(session.getActionCount()).isEqualTo(1);
+    // SynchronizeAfter adds an implicit action (refresh) after execution of synchronization
+    assertThat(session.getActionCount()).isEqualTo(2);
   }
 
 }
index bac9518865f8c2a357b4bfcc196769108e18179a..13aac68b2e13f8a4d4d47067af20dfe0513a1940 100644 (file)
  */
 package org.sonar.core.cluster;
 
+import java.util.List;
 import java.util.concurrent.Callable;
 
-public interface ClusterAction<K> extends Callable<K> {
+public interface ClusterAction<K> extends Callable<List<K>> {
 
   @Override
-  public K call() throws Exception;
+  public List<K> call() throws Exception;
 }