]> source.dussan.org Git - sonarqube.git/commitdiff
Updated indexQueue mechanism for performance (no compression yet).
authorStephane Gamard <stephane.gamard@searchbox.com>
Wed, 20 Aug 2014 14:22:33 +0000 (16:22 +0200)
committerStephane Gamard <stephane.gamard@searchbox.com>
Wed, 20 Aug 2014 16:18:38 +0000 (18:18 +0200)
33 files changed:
fork.sh
server/sonar-search/src/main/resources/org/sonar/search/logback.xml
server/sonar-server/pom.xml
server/sonar-server/src/main/java/org/sonar/server/activity/index/ActivityIndex.java
server/sonar-server/src/main/java/org/sonar/server/db/BaseDao.java
server/sonar-server/src/main/java/org/sonar/server/platform/ServerComponents.java
server/sonar-server/src/main/java/org/sonar/server/qualityprofile/index/ActiveRuleIndex.java
server/sonar-server/src/main/java/org/sonar/server/qualityprofile/index/ActiveRuleNormalizer.java
server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleIndex.java
server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleNormalizer.java
server/sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java
server/sonar-server/src/main/java/org/sonar/server/search/Index.java
server/sonar-server/src/main/java/org/sonar/server/search/IndexQueue.java
server/sonar-server/src/main/java/org/sonar/server/search/IndexQueueWorker.java [deleted file]
server/sonar-server/src/main/java/org/sonar/server/search/SearchClient.java
server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteDTO.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteKey.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteNestedItem.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/search/action/DtoIndexAction.java [deleted file]
server/sonar-server/src/main/java/org/sonar/server/search/action/EmbeddedIndexAction.java [deleted file]
server/sonar-server/src/main/java/org/sonar/server/search/action/IndexAction.java [deleted file]
server/sonar-server/src/main/java/org/sonar/server/search/action/IndexActionRequest.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/search/action/KeyIndexAction.java [deleted file]
server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertDTO.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertNestedItem.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/search/es/ListUpdate.java [deleted file]
server/sonar-server/src/test/java/org/sonar/server/search/BaseIndexTest.java
sonar-core/pom.xml
sonar-core/src/main/java/org/sonar/core/cluster/ClusterAction.java [new file with mode: 0644]
sonar-core/src/main/java/org/sonar/core/cluster/NullQueue.java
sonar-core/src/main/java/org/sonar/core/cluster/QueueAction.java [deleted file]
sonar-core/src/main/java/org/sonar/core/cluster/WorkQueue.java
sonar-core/src/main/java/org/sonar/core/persistence/DbSession.java

diff --git a/fork.sh b/fork.sh
index 6a0cad70879ed0ff8eb6a06221acf5257bf8fc7e..fbbf69331f408483d1b38b5de1b9dad7d5cb1a4e 100755 (executable)
--- a/fork.sh
+++ b/fork.sh
@@ -1,6 +1,6 @@
 #!/bin/sh
 
-mvn clean install -DskipTests -pl :sonar-process -amd
+mvn clean install -DskipTests -pl :sonar-core,sonar-search -amd
 
 if [[ "$OSTYPE" == "darwin"* ]]; then
   OS='macosx-universal-64'
index 5f714eec3e7b06fb9fb806ca7c284728b9391b9e..648ece82e289f684f1dc7f386f0d8a4c68fe94c6 100644 (file)
@@ -28,7 +28,7 @@
   </appender>
 
   <root>
-    <level value="DEBUG"/>
+    <level value="INFO"/>
     <appender-ref ref="LOGFILE"/>
   </root>
 
index e1b8f74bb226aa64300d1706b8e9faaa5732c931..4790413b37c881740cb978116d111047bc95e64e 100644 (file)
       <groupId>org.codehaus.sonar</groupId>
       <artifactId>sonar-search</artifactId>
       <version>${project.version}</version>
-      <scope>test</scope>
     </dependency>
   </dependencies>
 
index 01ffc420f82c1a4fd14049d51858053d371ceed3..73c538333c527b0e2769902b0599ef645bb1a47a 100644 (file)
@@ -33,13 +33,12 @@ import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.sort.SortOrder;
 import org.sonar.core.activity.Activity;
 import org.sonar.core.activity.db.ActivityDto;
-import org.sonar.core.cluster.WorkQueue;
 import org.sonar.server.search.BaseIndex;
-import org.sonar.server.search.SearchClient;
 import org.sonar.server.search.IndexDefinition;
 import org.sonar.server.search.IndexField;
 import org.sonar.server.search.QueryOptions;
 import org.sonar.server.search.Result;
+import org.sonar.server.search.SearchClient;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
@@ -51,8 +50,8 @@ import java.util.Map;
  */
 public class ActivityIndex extends BaseIndex<Activity, ActivityDto, String> {
 
-  public ActivityIndex(ActivityNormalizer normalizer, WorkQueue workQueue, SearchClient node) {
-    super(IndexDefinition.LOG, normalizer, workQueue, node);
+  public ActivityIndex(ActivityNormalizer normalizer, SearchClient node) {
+    super(IndexDefinition.LOG, normalizer, node);
   }
 
   @Override
index 5637b742d438098a0c2565c2882b514d19b0120b..214ae8d49dbf5158bf8fd5272185037c7d0ef85a 100644 (file)
@@ -26,10 +26,10 @@ import org.sonar.core.persistence.DbSession;
 import org.sonar.core.persistence.Dto;
 import org.sonar.server.exceptions.NotFoundException;
 import org.sonar.server.search.IndexDefinition;
-import org.sonar.server.search.action.DtoIndexAction;
-import org.sonar.server.search.action.EmbeddedIndexAction;
-import org.sonar.server.search.action.IndexAction;
-import org.sonar.server.search.action.KeyIndexAction;
+import org.sonar.server.search.action.DeleteKey;
+import org.sonar.server.search.action.DeleteNestedItem;
+import org.sonar.server.search.action.UpsertDto;
+import org.sonar.server.search.action.UpsertNestedItem;
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nullable;
@@ -176,7 +176,7 @@ public abstract class BaseDao<M, E extends Dto<K>, K extends Serializable> imple
       item.setUpdatedAt(now);
       doUpdate(session, item);
       if (hasIndex()) {
-        session.enqueue(new DtoIndexAction<E>(getIndexType(), IndexAction.Method.UPSERT, item));
+        session.enqueue(new UpsertDto(getIndexType(), item));
       }
     } catch (Exception e) {
       throw new IllegalStateException("Fail to update item in db: " + item, e);
@@ -216,7 +216,7 @@ public abstract class BaseDao<M, E extends Dto<K>, K extends Serializable> imple
     try {
       doInsert(session, item);
       if (hasIndex()) {
-        session.enqueue(new DtoIndexAction<E>(getIndexType(), IndexAction.Method.UPSERT, item));
+        session.enqueue(new UpsertDto<E>(getIndexType(), item));
       }
     } catch (Exception e) {
       throw new IllegalStateException("Fail to insert item in db: " + item, e.getCause());
@@ -249,7 +249,7 @@ public abstract class BaseDao<M, E extends Dto<K>, K extends Serializable> imple
     try {
       doDeleteByKey(session, key);
       if (hasIndex()) {
-        session.enqueue(new KeyIndexAction<K>(getIndexType(), IndexAction.Method.DELETE, key));
+        session.enqueue(new DeleteKey<K>(getIndexType(), key));
       }
     } catch (Exception e) {
       throw new IllegalStateException("Fail to delete item from db: " + key, e);
@@ -258,31 +258,29 @@ public abstract class BaseDao<M, E extends Dto<K>, K extends Serializable> imple
 
   protected final void enqueueUpdate(Object nestedItem, K key, DbSession session) {
     if (hasIndex()) {
-      session.enqueue(new EmbeddedIndexAction<K>(
-        this.getIndexType(), IndexAction.Method.UPSERT, key, nestedItem));
+      session.enqueue(new UpsertNestedItem<K>(
+        this.getIndexType(), key, nestedItem));
     }
   }
 
   public void enqueueDelete(Object nestedItem, K key, DbSession session) {
     if (hasIndex()) {
-      session.enqueue(new EmbeddedIndexAction<K>(
-        this.getIndexType(), IndexAction.Method.DELETE, key, nestedItem));
+      session.enqueue(new DeleteNestedItem<K>(
+        this.getIndexType(), key, nestedItem));
       session.commit();
     }
   }
 
   public void enqueueInsert(Object nestedItem, K key, DbSession session) {
     if (hasIndex()) {
-      session.enqueue(new EmbeddedIndexAction<K>(
-        this.getIndexType(), IndexAction.Method.UPSERT, key, nestedItem));
+      this.enqueueUpdate(nestedItem, key, session);
     }
   }
 
   @Override
   public final void synchronizeAfter(final DbSession session, Date date) {
     for (E dto : this.findAfterDate(session, date)) {
-      session.enqueue(new DtoIndexAction<E>(getIndexType(), IndexAction.Method.UPSERT,
-        dto));
+      session.enqueue(new UpsertDto<E>(getIndexType(), dto));
     }
     session.commit();
   }
index 4cfce8e9a817fa937d18e76f22133ade5e940e0c..fafdd32b722cb7f8f9714bddb4747b105d2a844c 100644 (file)
@@ -220,7 +220,7 @@ class ServerComponents {
       ActiveRuleNormalizer.class,
       RuleIndex.class,
       ActiveRuleIndex.class,
-      IndexQueueWorker.class,
+      //IndexQueueWorker.class,
       IndexClient.class,
       ActivityNormalizer.class,
       ActivityIndex.class,
index e71647d3415fe8c0b773eff3f5829486938616af..b12c5f8d8cad24098a9191dc464f31ecb736897b 100644 (file)
@@ -33,16 +33,15 @@ import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.sonar.api.rule.RuleKey;
 import org.sonar.api.rule.RuleStatus;
-import org.sonar.core.cluster.WorkQueue;
 import org.sonar.core.qualityprofile.db.ActiveRuleDto;
 import org.sonar.core.qualityprofile.db.ActiveRuleKey;
 import org.sonar.server.qualityprofile.ActiveRule;
 import org.sonar.server.rule.index.RuleNormalizer;
 import org.sonar.server.search.BaseIndex;
-import org.sonar.server.search.SearchClient;
 import org.sonar.server.search.FacetValue;
 import org.sonar.server.search.IndexDefinition;
 import org.sonar.server.search.IndexField;
+import org.sonar.server.search.SearchClient;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -52,8 +51,8 @@ import java.util.Map;
 
 public class ActiveRuleIndex extends BaseIndex<ActiveRule, ActiveRuleDto, ActiveRuleKey> {
 
-  public ActiveRuleIndex(ActiveRuleNormalizer normalizer, WorkQueue workQueue, SearchClient node) {
-    super(IndexDefinition.ACTIVE_RULE, normalizer, workQueue, node);
+  public ActiveRuleIndex(ActiveRuleNormalizer normalizer, SearchClient node) {
+    super(IndexDefinition.ACTIVE_RULE, normalizer, node);
   }
 
   @Override
index 8ece6b6eca8a53dd2646201bd8ce3153418c4c4c..3f59374405888e31a560060347d35dec04895e59 100644 (file)
@@ -28,13 +28,13 @@ import org.sonar.core.qualityprofile.db.ActiveRuleDto;
 import org.sonar.core.qualityprofile.db.ActiveRuleKey;
 import org.sonar.core.qualityprofile.db.ActiveRuleParamDto;
 import org.sonar.core.qualityprofile.db.QualityProfileDto;
+import org.sonar.search.script.ListUpdate;
 import org.sonar.server.db.DbClient;
 import org.sonar.server.qualityprofile.ActiveRule;
 import org.sonar.server.search.BaseNormalizer;
 import org.sonar.server.search.IndexDefinition;
 import org.sonar.server.search.IndexField;
 import org.sonar.server.search.Indexable;
-import org.sonar.server.search.es.ListUpdate;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
index a80f4d13ef62f5fc64afe7dcaa74880c07991eaa..53b6ca26d24705acae5f294c684a7581e31f20d0 100644 (file)
@@ -44,16 +44,15 @@ import org.elasticsearch.search.sort.SortOrder;
 import org.sonar.api.rule.RuleKey;
 import org.sonar.api.rule.RuleStatus;
 import org.sonar.api.server.debt.DebtCharacteristic;
-import org.sonar.core.cluster.WorkQueue;
 import org.sonar.core.rule.RuleDto;
 import org.sonar.server.qualityprofile.index.ActiveRuleNormalizer;
 import org.sonar.server.rule.Rule;
 import org.sonar.server.search.BaseIndex;
-import org.sonar.server.search.SearchClient;
 import org.sonar.server.search.IndexDefinition;
 import org.sonar.server.search.IndexField;
 import org.sonar.server.search.QueryOptions;
 import org.sonar.server.search.Result;
+import org.sonar.server.search.SearchClient;
 
 import javax.annotation.CheckForNull;
 import java.io.IOException;
@@ -69,8 +68,8 @@ import static com.google.common.collect.Lists.newArrayList;
 
 public class RuleIndex extends BaseIndex<Rule, RuleDto, RuleKey> {
 
-  public RuleIndex(RuleNormalizer normalizer, WorkQueue workQueue, SearchClient node) {
-    super(IndexDefinition.RULE, normalizer, workQueue, node);
+  public RuleIndex(RuleNormalizer normalizer, SearchClient client) {
+    super(IndexDefinition.RULE, normalizer, client);
   }
 
   protected String getKeyValue(RuleKey key) {
index 9071d30faf23ac25e54dbf73fbceb3d0790ff131..3a347255d7d93eb976804b90b428008c2fa98df3 100644 (file)
@@ -33,12 +33,12 @@ import org.sonar.core.rule.RuleDto;
 import org.sonar.core.rule.RuleParamDto;
 import org.sonar.core.technicaldebt.db.CharacteristicDto;
 import org.sonar.markdown.Markdown;
+import org.sonar.search.script.ListUpdate;
 import org.sonar.server.db.DbClient;
 import org.sonar.server.search.BaseNormalizer;
 import org.sonar.server.search.IndexDefinition;
 import org.sonar.server.search.IndexField;
 import org.sonar.server.search.Indexable;
-import org.sonar.server.search.es.ListUpdate;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
index 5abddd2db696c01e8358626788ea7dc08653fb25..f4baaf004b8e6fde26d9695f93ed1b1bcdebecfb 100644 (file)
@@ -23,19 +23,13 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.count.CountRequestBuilder;
 import org.elasticsearch.action.count.CountResponse;
-import org.elasticsearch.action.delete.DeleteRequestBuilder;
-import org.elasticsearch.action.delete.DeleteResponse;
 import org.elasticsearch.action.get.GetRequestBuilder;
 import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchScrollRequestBuilder;
-import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.query.BoolFilterBuilder;
@@ -53,15 +47,19 @@ import org.elasticsearch.search.aggregations.metrics.valuecount.InternalValueCou
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.sonar.core.cluster.WorkQueue;
 import org.sonar.core.persistence.Dto;
 
 import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Queue;
 
 public abstract class BaseIndex<DOMAIN, DTO extends Dto<KEY>, KEY extends Serializable>
   implements Index<DOMAIN, DTO, KEY> {
@@ -72,13 +70,16 @@ public abstract class BaseIndex<DOMAIN, DTO extends Dto<KEY>, KEY extends Serial
   private final BaseNormalizer<DTO, KEY> normalizer;
   private final IndexDefinition indexDefinition;
 
-  protected BaseIndex(IndexDefinition indexDefinition, BaseNormalizer<DTO, KEY> normalizer,
-                      WorkQueue workQueue, SearchClient client) {
+  protected BaseIndex(IndexDefinition indexDefinition, BaseNormalizer<DTO, KEY> normalizer, SearchClient client) {
     this.normalizer = normalizer;
     this.client = client;
     this.indexDefinition = indexDefinition;
   }
 
+  public BaseNormalizer<DTO, KEY> getNormalizer() {
+    return normalizer;
+  }
+
   @Override
   public final String getIndexName() {
     return this.indexDefinition.getIndexName();
@@ -390,105 +391,6 @@ public abstract class BaseIndex<DOMAIN, DTO extends Dto<KEY>, KEY extends Serial
     return null;
   }
 
-  protected void updateDocument(Collection<UpdateRequest> requests, KEY key) {
-    LOG.debug("UPDATE _id:{} in index {}", key, this.getIndexName());
-    BulkRequestBuilder bulkRequest = client.prepareBulk();
-    for (UpdateRequest request : requests) {
-      // if request has no ID then no upsert possible!
-      if (request.id() == null || request.id().isEmpty()) {
-        bulkRequest.add(new IndexRequest()
-          .source(request.doc().sourceAsMap())
-          .type(this.getIndexType())
-          .index(this.getIndexName()));
-      } else {
-        bulkRequest.add(request
-          .id(this.getKeyValue(key))
-          .index(this.getIndexName())
-          .type(this.getIndexType()));
-      }
-    }
-    BulkResponse response = client.execute(bulkRequest);
-  }
-
-  @Override
-  public void upsert(KEY key, Object object, Object... objects) throws Exception {
-    long t0 = System.currentTimeMillis();
-    List<UpdateRequest> requests = this.normalizer.normalizeNested(object, key);
-    for (Object additionalObject : objects) {
-      requests.addAll(this.normalizer.normalizeNested(additionalObject, key));
-    }
-    long t1 = System.currentTimeMillis();
-    this.updateDocument(requests, key);
-    long t2 = System.currentTimeMillis();
-    LOG.debug("UPSERT [object] time:{}ms ({}ms normalize, {}ms elastic)",
-      t2 - t0, t1 - t0, t2 - t1);
-  }
-
-  @Override
-  public void upsert(DTO item, DTO... items) {
-    try {
-      long t0 = System.currentTimeMillis();
-      List<UpdateRequest> requests = normalizer.normalize(item);
-      for (DTO additionalItem : items) {
-        requests.addAll(normalizer.normalize(additionalItem));
-      }
-      long t1 = System.currentTimeMillis();
-      this.updateDocument(requests, item.getKey());
-      long t2 = System.currentTimeMillis();
-      LOG.debug("UPSERT [dto] time:{}ms ({}ms normalize, {}ms elastic)",
-        t2 - t0, t1 - t0, t2 - t1);
-    } catch (Exception e) {
-      LOG.error("Could not update document for index {}: {}",
-        this.getIndexName(), e.getMessage(), e);
-    }
-  }
-
-  private void deleteDocument(KEY key) throws ExecutionException, InterruptedException {
-    LOG.debug("DELETE _id:{} in index {}", key, this.getIndexName());
-    DeleteRequestBuilder request = client
-      .prepareDelete()
-      .setIndex(this.getIndexName())
-      .setType(this.getIndexType())
-      .setId(this.getKeyValue(key));
-    DeleteResponse response = client.execute(request);
-  }
-
-  @Override
-  public void delete(KEY key, Object object, Object... objects) throws Exception {
-    LOG.debug("DELETE NESTED _id:{} in index {}", key, this.getIndexName());
-    List<UpdateRequest> requests = this.normalizer.deleteNested(object, key);
-    for (Object additionalObject : objects) {
-      requests.addAll(this.normalizer.deleteNested(additionalObject, key));
-    }
-    this.updateDocument(requests, key);
-  }
-
-  @Override
-  public void deleteByKey(KEY key, KEY... keys) {
-    try {
-      this.deleteDocument(key);
-      for (KEY additionalKey : keys) {
-        this.deleteDocument(additionalKey);
-      }
-    } catch (Exception e) {
-      throw new IllegalStateException(String.format("Could not DELETE _id = '%s' for index '%s",
-        getKeyValue(key), getIndexName()), e);
-    }
-  }
-
-  @Override
-  public void deleteByDto(DTO item, DTO... items) {
-    try {
-      this.deleteDocument(item.getKey());
-      for (DTO additionalItem : items) {
-        this.deleteDocument(additionalItem.getKey());
-      }
-    } catch (Exception e) {
-      throw new IllegalStateException(String.format("Could not DELETE _id = '%s' for index '%s",
-        getKeyValue(item.getKey()), getIndexName()), e);
-    }
-  }
-
   /* ES QueryHelper Methods */
 
   protected BoolFilterBuilder addTermFilter(BoolFilterBuilder filter, String field, @Nullable Collection<String> values) {
index 08e8836c564eb894c2ce6c3923461523a2a975dd..13a7a935c4e78ceebe04b1d6218abdfed42b4bdb 100644 (file)
@@ -40,19 +40,11 @@ public interface Index<DOMAIN, DTO extends Dto<KEY>, KEY extends Serializable> e
 
   void refresh();
 
-  void upsert(KEY key, Object object, Object... objects) throws Exception;
-
-  void upsert(DTO dto, DTO... dtos);
-
-  void delete(KEY key, Object object, Object... objects) throws Exception;
-
-  void deleteByKey(KEY key, KEY... keys);
-
-  void deleteByDto(DTO dto, DTO... dtos);
-
   Date getLastSynchronization();
 
   IndexStat getIndexStat();
 
   Iterator<DOMAIN> scroll(String scrollId);
+
+  BaseNormalizer<DTO, KEY> getNormalizer();
 }
index 81cfdf6092f3abea7970a2baf13c96480f707fba..89ddcb752d543a1dbc4f79f8c2c909991c6a6b2c 100644 (file)
  */
 package org.sonar.server.search;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang.StringUtils;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.sonar.api.ServerComponent;
 import org.sonar.api.config.Settings;
+import org.sonar.api.platform.ComponentContainer;
 import org.sonar.core.cluster.WorkQueue;
 import org.sonar.core.profiling.Profiling;
-import org.sonar.core.profiling.StopWatch;
-import org.sonar.server.search.action.EmbeddedIndexAction;
-import org.sonar.server.search.action.IndexAction;
+import org.sonar.server.search.action.IndexActionRequest;
 
-import java.util.HashSet;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 public class IndexQueue extends LinkedBlockingQueue<Runnable>
-  implements ServerComponent, WorkQueue<IndexAction> {
+  implements ServerComponent, WorkQueue<IndexActionRequest> {
 
   protected final Profiling profiling;
 
+  private final SearchClient searchClient;
+  private final ComponentContainer container;
+
   private static final Logger LOGGER = LoggerFactory.getLogger(IndexQueue.class);
 
   private static final Integer DEFAULT_QUEUE_SIZE = 200;
   private static final int TIMEOUT = 30000;
 
-  public IndexQueue(Settings settings) {
+  public IndexQueue(Settings settings, SearchClient searchClient, ComponentContainer container) {
     super(DEFAULT_QUEUE_SIZE);
+    this.searchClient = searchClient;
+    this.container = container;
     this.profiling = new Profiling(settings);
   }
 
   @Override
-  public void enqueue(IndexAction action) {
-    this.enqueue(ImmutableList.of(action));
-  }
-
-  @Override
-  public void enqueue(List<IndexAction> actions) {
-
-
-    int bcount = 0;
-    int ecount = 0;
-    List<String> refreshes = Lists.newArrayList();
-    Set<String> types = Sets.newHashSet();
-    long all_start = System.currentTimeMillis();
-    long indexTime;
-    long refreshTime;
-    long embeddedTime;
-
-    if (actions.size() == 1) {
-      /* Atomic update here */
-      CountDownLatch latch = new CountDownLatch(1);
-      IndexAction action = actions.get(0);
-      action.setLatch(latch);
-      try {
-        indexTime = System.currentTimeMillis();
-        this.offer(action, TIMEOUT, TimeUnit.MILLISECONDS);
-        if (!latch.await(TIMEOUT, TimeUnit.MILLISECONDS)) {
-          throw new IllegalStateException("ES update could not be completed within: " + TIMEOUT + "ms");
-        }
-        bcount++;
-        indexTime = System.currentTimeMillis() - indexTime;
-        // refresh the index.
-        Index<?, ?, ?> index = action.getIndex();
-        if (index != null) {
-          refreshTime = System.currentTimeMillis();
-          index.refresh();
-          refreshTime = System.currentTimeMillis() - refreshTime;
-          refreshes.add(index.getIndexName());
-        }
-        types.add(action.getPayloadClass().getSimpleName());
-      } catch (InterruptedException e) {
-        throw new IllegalStateException("ES update has been interrupted", e);
-      }
-    } else if (actions.size() > 1) {
-      StopWatch basicProfile = profiling.start("search", Profiling.Level.BASIC);
-
-      /* Purge actions that would be overridden  */
-      Long purgeStart = System.currentTimeMillis();
-      List<IndexAction> itemActions = Lists.newArrayList();
-      List<IndexAction> embeddedActions = Lists.newArrayList();
-
-      for (IndexAction action : actions) {
-        if (action.getClass().isAssignableFrom(EmbeddedIndexAction.class)) {
-          embeddedActions.add(action);
-        } else {
-          itemActions.add(action);
-        }
-      }
+  public void enqueue(List<IndexActionRequest> actions) {
 
-      LOGGER.debug("INDEX - compressed {} items into {} in {}ms,",
-        actions.size(), itemActions.size() + embeddedActions.size(), System.currentTimeMillis() - purgeStart);
+    if (actions.isEmpty()) {
+      return;
+    }
+    try {
 
-      try {
-        /* execute all item actions */
-        CountDownLatch itemLatch = new CountDownLatch(itemActions.size());
-        indexTime = System.currentTimeMillis();
-        for (IndexAction action : itemActions) {
-          action.setLatch(itemLatch);
-          this.offer(action, TIMEOUT, TimeUnit.MILLISECONDS);
-          types.add(action.getPayloadClass().getSimpleName());
-          bcount++;
+      BulkRequestBuilder bulkRequestBuilder = new BulkRequestBuilder(searchClient);
+      Map<String,Index> indexes = getIndexMap();
+      for (IndexActionRequest action : actions) {
+        action.setIndex(indexes.get(action.getIndexType()));
+      }
 
-        }
-        if (!itemLatch.await(TIMEOUT, TimeUnit.MILLISECONDS)) {
-          throw new IllegalStateException("ES update could not be completed within: " + TIMEOUT + "ms");
-        }
-        indexTime = System.currentTimeMillis() - indexTime;
-
-        /* and now push the embedded */
-        CountDownLatch embeddedLatch = new CountDownLatch(embeddedActions.size());
-        embeddedTime = System.currentTimeMillis();
-        for (IndexAction action : embeddedActions) {
-          action.setLatch(embeddedLatch);
-          this.offer(action, TIMEOUT, TimeUnit.SECONDS);
-          types.add(action.getPayloadClass().getSimpleName());
-          ecount++;
-        }
-        if (!embeddedLatch.await(TIMEOUT, TimeUnit.MILLISECONDS)) {
-          throw new IllegalStateException("ES embedded update could not be completed within: " + TIMEOUT + "ms");
-        }
-        embeddedTime = System.currentTimeMillis() - embeddedTime;
-
-        /* Finally refresh affected indexes */
-        Set<String> refreshedIndexes = new HashSet<String>();
-        refreshTime = System.currentTimeMillis();
-        for (IndexAction action : actions) {
-          if (action.getIndex() != null &&
-            !refreshedIndexes.contains(action.getIndex().getIndexName())) {
-            refreshedIndexes.add(action.getIndex().getIndexName());
-            action.getIndex().refresh();
-            refreshes.add(action.getIndex().getIndexName());
+      ExecutorService executorService = Executors.newFixedThreadPool(4);
+
+      //invokeAll() blocks until ALL tasks submitted to executor complete
+      for (Future<List<ActionRequest>> updateRequests : executorService.invokeAll(actions)) {
+        for (ActionRequest update : updateRequests.get()) {
+          if (UpdateRequest.class.isAssignableFrom(update.getClass())) {
+            bulkRequestBuilder.add((UpdateRequest)update);
+          } else if (DeleteRequest.class.isAssignableFrom(update.getClass())) {
+            bulkRequestBuilder.add((DeleteRequest)update);
+          } else {
+            throw new IllegalStateException("Un-managed request type: " + update.getClass());
           }
         }
-        refreshTime = System.currentTimeMillis() - refreshTime;
-      } catch (InterruptedException e) {
-        throw new IllegalStateException("ES update has been interrupted", e);
       }
+      executorService.shutdown();
 
-      basicProfile.stop("INDEX - time:%sms (%sms index, %sms embedded, %sms refresh)\ttypes:[%s],\tbulk:%s\tembedded:%s\trefresh:[%s]",
-        (System.currentTimeMillis() - all_start), indexTime, embeddedTime, refreshTime,
-        StringUtils.join(types, ","),
-        bcount, ecount, StringUtils.join(refreshes, ","));
+      LOGGER.info("Executing batch request of size: " + bulkRequestBuilder.numberOfActions());
+
+      //execute the request
+      BulkResponse response = searchClient.execute(bulkRequestBuilder.setRefresh(true));
+    } catch (Exception e) {
+      e.printStackTrace();
     }
   }
+
+  private Map<String, Index> getIndexMap() {
+    Map<String, Index> indexes = new HashMap<String, Index>();
+    for (Index index : container.getComponentsByType(Index.class)) {
+      indexes.put(index.getIndexType(), index);
+    }
+    return indexes;
+  }
+
+
+//
+//    int bcount = 0;
+//    int ecount = 0;
+//    List<String> refreshes = Lists.newArrayList();
+//    Set<String> types = Sets.newHashSet();
+//    long all_start = System.currentTimeMillis();
+//    long indexTime;
+//    long refreshTime;
+//    long embeddedTime;
+//
+//    if (actions.size() == 1) {
+//      /* Atomic update here */
+//      CountDownLatch latch = new CountDownLatch(1);
+//      IndexAction action = actions.get(0);
+//      action.setLatch(latch);
+//      try {
+//        indexTime = System.currentTimeMillis();
+//        this.offer(action, TIMEOUT, TimeUnit.MILLISECONDS);
+//        if (!latch.await(TIMEOUT, TimeUnit.MILLISECONDS)) {
+//          throw new IllegalStateException("ES update could not be completed within: " + TIMEOUT + "ms");
+//        }
+//        bcount++;
+//        indexTime = System.currentTimeMillis() - indexTime;
+//        // refresh the index.
+//        Index<?, ?, ?> index = action.getIndex();
+//        if (index != null) {
+//          refreshTime = System.currentTimeMillis();
+//          index.refresh();
+//          refreshTime = System.currentTimeMillis() - refreshTime;
+//          refreshes.add(index.getIndexName());
+//        }
+//        types.add(action.getPayloadClass().getSimpleName());
+//      } catch (InterruptedException e) {
+//        throw new IllegalStateException("ES update has been interrupted", e);
+//      }
+//    } else if (actions.size() > 1) {
+//      StopWatch basicProfile = profiling.start("search", Profiling.Level.BASIC);
+//
+//      /* Purge actions that would be overridden  */
+//      Long purgeStart = System.currentTimeMillis();
+//      List<IndexAction> itemActions = Lists.newArrayList();
+//      List<IndexAction> embeddedActions = Lists.newArrayList();
+//
+//      for (IndexAction action : actions) {
+//        if (action.getClass().isAssignableFrom(EmbeddedIndexAction.class)) {
+//          embeddedActions.add(action);
+//        } else {
+//          itemActions.add(action);
+//        }
+//      }
+//
+//      LOGGER.debug("INDEX - compressed {} items into {} in {}ms,",
+//        actions.size(), itemActions.size() + embeddedActions.size(), System.currentTimeMillis() - purgeStart);
+//
+//      try {
+//        /* execute all item actions */
+//        CountDownLatch itemLatch = new CountDownLatch(itemActions.size());
+//        indexTime = System.currentTimeMillis();
+//        for (IndexAction action : itemActions) {
+//          action.setLatch(itemLatch);
+//          this.offer(action, TIMEOUT, TimeUnit.MILLISECONDS);
+//          types.add(action.getPayloadClass().getSimpleName());
+//          bcount++;
+//
+//        }
+//        if (!itemLatch.await(TIMEOUT, TimeUnit.MILLISECONDS)) {
+//          throw new IllegalStateException("ES update could not be completed within: " + TIMEOUT + "ms");
+//        }
+//        indexTime = System.currentTimeMillis() - indexTime;
+//
+//        /* and now push the embedded */
+//        CountDownLatch embeddedLatch = new CountDownLatch(embeddedActions.size());
+//        embeddedTime = System.currentTimeMillis();
+//        for (IndexAction action : embeddedActions) {
+//          action.setLatch(embeddedLatch);
+//          this.offer(action, TIMEOUT, TimeUnit.SECONDS);
+//          types.add(action.getPayloadClass().getSimpleName());
+//          ecount++;
+//        }
+//        if (!embeddedLatch.await(TIMEOUT, TimeUnit.MILLISECONDS)) {
+//          throw new IllegalStateException("ES embedded update could not be completed within: " + TIMEOUT + "ms");
+//        }
+//        embeddedTime = System.currentTimeMillis() - embeddedTime;
+//
+//        /* Finally refresh affected indexes */
+//        Set<String> refreshedIndexes = new HashSet<String>();
+//        refreshTime = System.currentTimeMillis();
+//        for (IndexAction action : actions) {
+//          if (action.getIndex() != null &&
+//            !refreshedIndexes.contains(action.getIndex().getIndexName())) {
+//            refreshedIndexes.add(action.getIndex().getIndexName());
+//            action.getIndex().refresh();
+//            refreshes.add(action.getIndex().getIndexName());
+//          }
+//        }
+//        refreshTime = System.currentTimeMillis() - refreshTime;
+//      } catch (InterruptedException e) {
+//        throw new IllegalStateException("ES update has been interrupted", e);
+//      }
+//
+//      basicProfile.stop("INDEX - time:%sms (%sms index, %sms embedded, %sms refresh)\ttypes:[%s],\tbulk:%s\tembedded:%s\trefresh:[%s]",
+//        (System.currentTimeMillis() - all_start), indexTime, embeddedTime, refreshTime,
+//        StringUtils.join(types, ","),
+//        bcount, ecount, StringUtils.join(refreshes, ","));
+//    }
+
+
 }
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/IndexQueueWorker.java b/server/sonar-server/src/main/java/org/sonar/server/search/IndexQueueWorker.java
deleted file mode 100644 (file)
index c2948b7..0000000
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.server.search;
-
-import org.picocontainer.Startable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.sonar.api.ServerComponent;
-import org.sonar.server.search.action.IndexAction;
-
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-public class IndexQueueWorker extends ThreadPoolExecutor
-  implements ServerComponent, Startable {
-
-  private static final Logger LOG = LoggerFactory.getLogger(IndexQueueWorker.class);
-
-  private final IndexClient indexes;
-
-  public IndexQueueWorker(IndexQueue queue, IndexClient indexes) {
-    super(1,1, 0L, TimeUnit.MILLISECONDS, queue);
-    this.indexes = indexes;
-  }
-
-  protected void beforeExecute(Thread t, Runnable r) {
-    LOG.debug("Starting task: {}", r);
-    super.beforeExecute(t, r);
-    if (IndexAction.class.isAssignableFrom(r.getClass())) {
-      IndexAction ia = (IndexAction) r;
-      LOG.debug("Task is an IndexAction for {}", ia.getIndexType());
-      ia.setIndex(indexes.getByType(ia.getIndexType()));
-    }
-  }
-
-  protected void afterExecute(Runnable r, Throwable t) {
-    super.afterExecute(r, t);
-    if (t != null) {
-      throw new IllegalStateException(t);
-    }
-  }
-
-  @Override
-  public void start() {
-    this.prestartAllCoreThreads();
-  }
-
-  @Override
-  public void stop() {
-    this.shutdown();
-  }
-}
index 1810cee8cfde82086e195ffe25a23ddf589ae7da..0c919160eda0ba7c566838c413e946aa15f77450 100644 (file)
@@ -24,7 +24,6 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.StringUtils;
 import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.action.ActionResponse;
-import org.elasticsearch.action.ListenableActionFuture;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.action.admin.cluster.stats.ClusterStatsNodes;
 import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse;
@@ -36,6 +35,8 @@ import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.sonar.api.config.Settings;
 import org.sonar.core.profiling.Profiling;
 import org.sonar.core.profiling.StopWatch;
@@ -45,6 +46,8 @@ import org.sonar.core.profiling.StopWatch;
  */
 public class SearchClient extends TransportClient {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(SearchClient.class);
+
   private static final String DEFAULT_HEALTH_TIMEOUT = "30s";
 
   private final Settings settings;
@@ -114,10 +117,10 @@ public class SearchClient extends TransportClient {
 
   public <K extends ActionResponse> K execute(ActionRequestBuilder request) {
     StopWatch fullProfile = profiling.start("search", Profiling.Level.FULL);
-    ListenableActionFuture acc = request.execute();
+    K response = null;
     try {
 
-      K response = (K) acc.get();
+      response = (K) request.get();
 
       if (profiling.isProfilingEnabled(Profiling.Level.BASIC)) {
         if (ToXContent.class.isAssignableFrom(request.getClass())) {
@@ -144,7 +147,9 @@ public class SearchClient extends TransportClient {
       }
       return response;
     } catch (Exception e) {
+      LOGGER.error("could not execute request: "   + response);
       throw new IllegalStateException("ES error: ", e);
+
     }
   }
 }
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteDTO.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteDTO.java
new file mode 100644 (file)
index 0000000..8b801a4
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.search.action;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.client.Requests;
+import org.sonar.core.persistence.Dto;
+import org.sonar.server.search.Index;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DeleteDto<DTO extends Dto> extends IndexActionRequest {
+
+  private final DTO dto;
+
+  public DeleteDto(String indexType, DTO dto) {
+    super(indexType);
+    this.dto = dto;
+  }
+
+  @Override
+  public String getKey() {
+    return dto.getKey().toString();
+  }
+
+  @Override
+  public Class<?> getPayloadClass() {
+    return dto.getClass();
+  }
+
+  @Override
+  public List<ActionRequest> doCall(Index index) throws Exception {
+    List<ActionRequest> requests = new ArrayList<ActionRequest>();
+    requests.add(Requests.deleteRequest(index.getIndexName())
+      .id(dto.getKey().toString())
+      .type(indexType));
+    return requests;
+  }
+}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteKey.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteKey.java
new file mode 100644 (file)
index 0000000..adc15a9
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.search.action;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.client.Requests;
+import org.sonar.server.search.Index;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DeleteKey<K extends Serializable> extends IndexActionRequest {
+
+  private final K key;
+
+  public DeleteKey(String indexType, K key) {
+    super(indexType);
+    this.key = key;
+  }
+
+  @Override
+  public String getKey() {
+    return key.toString();
+  }
+
+  @Override
+  public Class<?> getPayloadClass() {
+    throw new IllegalStateException("Deletion by key does not have an object payload!");
+  }
+
+  @Override
+  public List<ActionRequest> doCall(Index index) throws Exception {
+    List<ActionRequest> requests = new ArrayList<ActionRequest>();
+    requests.add(Requests.deleteRequest(index.getIndexName())
+      .id(getKey())
+      .type(indexType));
+    return requests;
+  }
+
+}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteNestedItem.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteNestedItem.java
new file mode 100644 (file)
index 0000000..15555a4
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.search.action;
+
+import org.elasticsearch.action.ActionRequest;
+import org.sonar.server.search.Index;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DeleteNestedItem<K extends Serializable> extends IndexActionRequest {
+
+  private final K key;
+  private final Object item;
+  private final Object[] items;
+
+  public DeleteNestedItem(String indexType, K key, Object item, Object... items) {
+    super(indexType);
+    this.key = key;
+    this.item = item;
+    this.items = items;
+  }
+
+  @Override
+  public String getKey() {
+    return this.key.toString();
+  }
+
+  @Override
+  public Class<?> getPayloadClass() {
+    return item.getClass();
+  }
+
+  @Override
+  public List<ActionRequest> doCall(Index index) throws Exception {
+    List<ActionRequest> updates = new ArrayList<ActionRequest>();
+    updates.addAll(deleteItem(index, item, key));
+    for (Object otherItem : items) {
+      updates.addAll(deleteItem(index, otherItem, key));
+    }
+    return updates;
+  }
+
+  private List<ActionRequest> deleteItem(Index index, Object item, K key) {
+    return index.getNormalizer().deleteNested(item, key);
+  }
+}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/DtoIndexAction.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/DtoIndexAction.java
deleted file mode 100644 (file)
index 3f09d70..0000000
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.server.search.action;
-
-import org.sonar.core.persistence.Dto;
-
-public class DtoIndexAction<E extends Dto> extends IndexAction {
-
-  private final E item;
-  private final E[] items;
-
-  public DtoIndexAction(String indexType, Method method, E item, E... items) {
-    super(indexType, method);
-    this.item = item;
-    this.items = items;
-  }
-
-  @Override
-  public Class<?> getPayloadClass() {
-    return item.getClass();
-  }
-
-  @Override
-  public String getKey() {
-    return item.getKey().toString();
-  }
-
-  @Override
-  public void doExecute() {
-    try {
-      if (this.getMethod().equals(Method.DELETE)) {
-        index.deleteByDto(this.item, this.items);
-      } else if (this.getMethod().equals(Method.UPSERT)) {
-        index.upsert(this.item, this.items);
-      }
-    } catch (Exception e) {
-      throw new IllegalStateException(this.getClass().getSimpleName() +
-        " cannot execute " + this.getMethod() + " for " + this.item.getClass().getSimpleName() +
-        " as " + this.getIndexType() +
-        " on key: " + this.item.getKey(), e);
-    }
-  }
-}
-
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/EmbeddedIndexAction.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/EmbeddedIndexAction.java
deleted file mode 100644 (file)
index 728ea8e..0000000
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.server.search.action;
-
-import java.io.Serializable;
-
-public class EmbeddedIndexAction<K extends Serializable> extends IndexAction {
-
-  private final K key;
-  private final Object item;
-  private final Object[] items;
-
-  public EmbeddedIndexAction(String indexType, Method method, K key, Object item, Object... items) {
-    super(indexType, method);
-    this.indexType = indexType;
-    this.method = method;
-    this.key = key;
-    this.item = item;
-    this.items = items;
-  }
-
-  @Override
-  public String getKey() {
-    return this.key.toString();
-  }
-
-  @Override
-  public Class<?> getPayloadClass() {
-    return item.getClass();
-  }
-
-  @Override
-  public void doExecute() {
-
-    try {
-      if (this.getMethod().equals(Method.DELETE)) {
-        index.delete(this.key, this.item, this.items);
-      } else if (this.getMethod().equals(Method.UPSERT)) {
-        index.upsert(this.key, this.item, this.items);
-      }
-    } catch (Exception e) {
-      throw new IllegalStateException(this.getClass().getSimpleName() +
-        "cannot execute " + this.getMethod() + " for " + this.item.getClass().getSimpleName() +
-        " as " + this.getIndexType() +
-        " on key: " + this.key, e);
-    }
-  }
-}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexAction.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexAction.java
deleted file mode 100644 (file)
index c2001b4..0000000
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.server.search.action;
-
-import org.sonar.core.cluster.QueueAction;
-import org.sonar.server.search.Index;
-
-public abstract class IndexAction extends QueueAction {
-
-
-  public abstract Class<?> getPayloadClass();
-
-  public enum Method {
-    UPSERT, DELETE
-  }
-
-  protected String indexType;
-  protected Method method;
-  protected Index index;
-
-  public IndexAction(String indexType, Method method) {
-    super();
-    this.indexType = indexType;
-    this.method = method;
-  }
-
-  public abstract String getKey();
-
-  public Method getMethod() {
-    return this.method;
-  }
-
-  public String getIndexType() {
-    return indexType;
-  }
-
-  public void setMethod(Method method) {
-    this.method = method;
-  }
-
-  public void setIndex(Index index) {
-    this.index = index;
-  }
-
-  public Index getIndex() {
-    return index;
-  }
-}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexActionRequest.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexActionRequest.java
new file mode 100644 (file)
index 0000000..889cf7a
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.search.action;
+
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.sonar.core.cluster.ClusterAction;
+import org.sonar.server.search.Index;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class IndexActionRequest implements ClusterAction<List<ActionRequest>> {
+
+  protected final String indexType;
+  private Index index;
+
+  public IndexActionRequest(String indexType) {
+    super();
+    this.indexType = indexType;
+  }
+
+  public abstract String getKey();
+
+  public abstract Class<?> getPayloadClass();
+
+  public String getIndexType() {
+    return indexType;
+  }
+
+
+  public void setIndex(Index index) {
+    this.index = index;
+  }
+
+  @Override
+  public final List<ActionRequest> call() throws Exception {
+    if (index == null) {
+      throw new IllegalStateException("Cannot execute request - Index is null");
+    }
+    List<ActionRequest> finalRequests = new ArrayList<ActionRequest>();
+    for (ActionRequest request : doCall(index)) {
+      if (request.getClass().isAssignableFrom(UpdateRequest.class)) {
+        ((UpdateRequest) request)
+          .type(index.getIndexType())
+          .index(index.getIndexName());
+      }
+      finalRequests.add(request);
+    }
+    return finalRequests;
+  }
+
+  public abstract List<ActionRequest> doCall(Index index) throws Exception;
+}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/KeyIndexAction.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/KeyIndexAction.java
deleted file mode 100644 (file)
index 3e23d55..0000000
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.server.search.action;
-
-import java.io.Serializable;
-
-public class KeyIndexAction<K extends Serializable> extends IndexAction {
-
-  private final K key;
-  private final K[] keys;
-
-  public KeyIndexAction(String indexType, Method method, K key, K... keys) {
-    super(indexType, method);
-    this.key = key;
-    this.keys = keys;
-  }
-
-  @Override
-  public Class<?> getPayloadClass() {
-    return String.class;
-  }
-
-  @Override
-  public String getKey() {
-    return this.key.toString();
-  }
-
-  @Override
-  public void doExecute() {
-    try {
-      if (this.getMethod().equals(Method.DELETE)) {
-        index.deleteByKey(this.key, this.keys);
-      } else if (this.getMethod().equals(Method.UPSERT)) {
-        throw new IllegalStateException("Upsert by Key is not supported anymore");
-      }
-    } catch (Exception e) {
-      throw new IllegalStateException(this.getClass().getSimpleName() +
-        "cannot execute " + this.getMethod() + " for " + this.key.getClass().getSimpleName() +
-        " on type: " + this.getIndexType() +
-        " on key: " + this.key, e);
-    }
-  }
-}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertDTO.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertDTO.java
new file mode 100644 (file)
index 0000000..1e31a35
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.search.action;
+
+import org.elasticsearch.action.ActionRequest;
+import org.sonar.core.persistence.Dto;
+import org.sonar.server.search.Index;
+
+import java.util.List;
+
+public class UpsertDto<DTO extends Dto> extends IndexActionRequest {
+
+  private final DTO dto;
+
+  public UpsertDto(String indexType, DTO dto) {
+    super(indexType);
+    this.dto = dto;
+  }
+
+  @Override
+  public String getKey() {
+    return dto.getKey().toString();
+  }
+
+  @Override
+  public Class<?> getPayloadClass() {
+    return dto.getClass();
+  }
+
+  @Override
+  public List<ActionRequest> doCall(Index index) throws Exception {
+    return index.getNormalizer().normalize(dto);
+  }
+}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertNestedItem.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertNestedItem.java
new file mode 100644 (file)
index 0000000..e87fe60
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.search.action;
+
+import org.elasticsearch.action.ActionRequest;
+import org.sonar.server.search.Index;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class UpsertNestedItem<K extends Serializable> extends IndexActionRequest {
+
+
+  private final K key;
+  private final Object item;
+  private final Object[] items;
+
+  public UpsertNestedItem(String indexType, K key, Object item, Object... items) {
+    super(indexType);
+    this.key = key;
+    this.item = item;
+    this.items = items;
+  }
+
+  @Override
+  public String getKey() {
+    return key.toString();
+  }
+
+  @Override
+  public Class<?> getPayloadClass() {
+    return item.getClass();
+  }
+
+  @Override
+  public List<ActionRequest> doCall(Index index) throws Exception {
+    List<ActionRequest> updates = new ArrayList<ActionRequest>();
+    updates.addAll(normalizeItem(index, item, key));
+    for (Object otherItem : items) {
+      updates.addAll(normalizeItem(index, otherItem, key));
+    }
+    return updates;
+  }
+
+  private List<ActionRequest> normalizeItem(Index index, Object item, K key) {
+    return index.getNormalizer().normalizeNested(item, key);
+  }
+}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/es/ListUpdate.java b/server/sonar-server/src/main/java/org/sonar/server/search/es/ListUpdate.java
deleted file mode 100644 (file)
index 6de0df6..0000000
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.server.search.es;
-
-import com.google.common.collect.ImmutableSet;
-import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.xcontent.support.XContentMapValues;
-import org.elasticsearch.script.AbstractExecutableScript;
-import org.elasticsearch.script.ExecutableScript;
-import org.elasticsearch.script.NativeScriptFactory;
-
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * @since 4.4
- */
-public class ListUpdate extends AbstractExecutableScript {
-
-  public static final String NAME = "listUpdate";
-
-  public static final String ID_FIELD = "idField";
-  public static final String ID_VALUE = "idValue";
-  public static final String FIELD = "field";
-  public static final String VALUE = "value";
-
-  public static class UpdateListScriptFactory implements NativeScriptFactory {
-    @Override
-    public ExecutableScript newScript(@Nullable Map<String, Object> params) {
-      String idField = XContentMapValues.nodeStringValue(params.get(ID_FIELD), null);
-      String idValue = XContentMapValues.nodeStringValue(params.get(ID_VALUE), null);
-      String field = XContentMapValues.nodeStringValue(params.get(FIELD), null);
-      Map value = null;
-      if (idField == null) {
-        throw new IllegalStateException("Missing '" + ID_FIELD + "' parameter");
-      }
-      if (idValue == null) {
-        throw new IllegalStateException("Missing '" + ID_VALUE + "' parameter");
-      }
-      if (field == null) {
-        throw new IllegalStateException("Missing '" + FIELD + "' parameter");
-      }
-
-      //NULL case is deletion of nested item
-      if (params.containsKey(VALUE)) {
-        Object obj = params.get(VALUE);
-        if (obj != null) {
-          value = XContentMapValues.nodeMapValue(params.get(VALUE), "Update item");
-        }
-      }
-
-      return new ListUpdate(idField, idValue, field, value);
-    }
-  }
-
-
-  private final String idField;
-  private final String idValue;
-  private final String field;
-  private final Map<String, Object> value;
-
-  private Map<String, Object> ctx;
-
-  public ListUpdate(String idField, String idValue, String field, Map<String, Object> value) {
-    this.idField = idField;
-    this.idValue = idValue;
-    this.field = field;
-    this.value = value;
-  }
-
-  @Override
-  public void setNextVar(String name, Object value) {
-    if (name.equals("ctx")) {
-      ctx = (Map<String, Object>) value;
-    }
-  }
-
-  @Override
-  public Object unwrap(Object value) {
-    return value;
-  }
-
-  @Override
-  public Object run() {
-    try {
-      //Get the Document's source from ctx
-      Map<String, Object> source = XContentMapValues.nodeMapValue(ctx.get("_source"), "source from context");
-
-      //Get the Object for list update
-      Object fieldValue = source.get(field);
-
-      if (fieldValue == null && value != null) {
-        // 0. The field does not exist (this is a upsert then)
-        source.put(field, value);
-      } else if (!XContentMapValues.isArray(fieldValue) && value != null) {
-        // 1. The field is not yet a list
-        Map currentFieldValue = XContentMapValues.nodeMapValue(fieldValue, "current FieldValue");
-        if (XContentMapValues.nodeStringValue(currentFieldValue.get(idField), null).equals(idValue)) {
-          source.put(field, value);
-        } else {
-          source.put(field, ImmutableSet.of(fieldValue, value));
-        }
-      } else {
-        // 3. field is a list
-        Collection items = (Collection) fieldValue;
-        Object target = null;
-        for (Object item : items) {
-          Map<String, Object> fields = (Map<String, Object>) item;
-          String itemIdValue = XContentMapValues.nodeStringValue(fields.get(idField), null);
-          if (itemIdValue != null && itemIdValue.equals(idValue)) {
-            target = item;
-            break;
-          }
-        }
-        if (target != null) {
-          items.remove(target);
-        }
-
-        //Supporting the update by NULL = deletion case
-        if (value != null) {
-          items.add(value);
-        }
-        source.put(field, items);
-      }
-    } catch (Exception e) {
-      throw new IllegalStateException("failed to execute listUpdate script", e);
-    }
-    return null;
-
-  }
-}
index 92daecaef82e1c312aa90b38fffa7133fb22e8fb..e92fd5b936fe8c1da072b9cb28f4f7e4b15864a0 100644 (file)
@@ -29,7 +29,6 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.sonar.api.config.Settings;
-import org.sonar.core.cluster.NullQueue;
 import org.sonar.process.MonitoredProcess;
 import org.sonar.process.NetworkUtils;
 import org.sonar.process.Props;
@@ -107,7 +106,7 @@ public class BaseIndexTest {
   private BaseIndex getIndex(final SearchClient searchClient) {
     BaseIndex index = new BaseIndex(
       IndexDefinition.TEST,
-      null, new NullQueue(), searchClient) {
+      null, searchClient) {
       @Override
       protected String getKeyValue(Serializable key) {
         return null;
index bad0f4f55035d02b84963188e55b575dfe3d6970..34c5d976e5b703bbaae26cb088375d155c9e9800 100644 (file)
       <artifactId>h2</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.elasticsearch</groupId>
+      <artifactId>elasticsearch</artifactId>
+      <version>1.1.2</version>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/sonar-core/src/main/java/org/sonar/core/cluster/ClusterAction.java b/sonar-core/src/main/java/org/sonar/core/cluster/ClusterAction.java
new file mode 100644 (file)
index 0000000..bac9518
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.core.cluster;
+
+import java.util.concurrent.Callable;
+
+public interface ClusterAction<K> extends Callable<K> {
+
+  @Override
+  public K call() throws Exception;
+}
index 235d2078fef75f4f22d2011f3118a39aec6bb32f..cccdf7033bc2c4fef42f198b3d717283a83ce04a 100644 (file)
@@ -21,15 +21,10 @@ package org.sonar.core.cluster;
 
 import java.util.List;
 
-public class NullQueue implements WorkQueue<QueueAction> {
+public class NullQueue implements WorkQueue<ClusterAction> {
 
   @Override
-  public void enqueue(QueueAction action) {
-    // do nothing
-  }
-
-  @Override
-  public void enqueue(List<QueueAction> actions) {
+  public void enqueue(List<ClusterAction> actions) {
     // do nothing
   }
 }
diff --git a/sonar-core/src/main/java/org/sonar/core/cluster/QueueAction.java b/sonar-core/src/main/java/org/sonar/core/cluster/QueueAction.java
deleted file mode 100644 (file)
index f83078d..0000000
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.core.cluster;
-
-import java.util.concurrent.CountDownLatch;
-
-public abstract class QueueAction implements Runnable {
-
-  protected CountDownLatch latch;
-
-  public QueueAction() {
-    this.latch = null;
-  }
-
-  public void setLatch(CountDownLatch latch){
-    this.latch = latch;
-  }
-
-  public abstract void doExecute();
-
-  @Override
-  public void run(){
-    this.doExecute();
-    if (latch != null){
-      latch.countDown();
-    }
-  }
-}
index 9b4141f510defa5f4ef758ce6db556b007fa65e1..9d5cf998564beb5cb558b34c3d890275c5d03149 100644 (file)
 package org.sonar.core.cluster;
 
 import java.util.List;
+import java.util.concurrent.Callable;
 
-public interface WorkQueue<K extends QueueAction> {
-
-  void enqueue(K action);
+public interface WorkQueue<K extends Callable> {
 
   void enqueue(List<K> actions);
 
index 10e82b803cc3d520f0ca7e90947e06affd7d1589..b663b22e35e7a5574a2cd0c45681a91d2bb3ca74 100644 (file)
@@ -24,7 +24,7 @@ import org.apache.ibatis.session.Configuration;
 import org.apache.ibatis.session.ResultHandler;
 import org.apache.ibatis.session.RowBounds;
 import org.apache.ibatis.session.SqlSession;
-import org.sonar.core.cluster.QueueAction;
+import org.sonar.core.cluster.ClusterAction;
 import org.sonar.core.cluster.WorkQueue;
 
 import java.sql.Connection;
@@ -34,8 +34,8 @@ import java.util.Map;
 
 public class DbSession implements SqlSession {
 
-  private static final Integer IMPLICIT_COMMIT_SIZE = 200;
-  private List<QueueAction> actions;
+  private static final Integer IMPLICIT_COMMIT_SIZE = 1000;
+  private List<ClusterAction> actions;
 
   private WorkQueue queue;
   private SqlSession session;
@@ -43,10 +43,10 @@ public class DbSession implements SqlSession {
   DbSession(WorkQueue queue, SqlSession session) {
     this.session = session;
     this.queue = queue;
-    this.actions = new ArrayList<QueueAction>();
+    this.actions = new ArrayList<ClusterAction>();
   }
 
-  public void enqueue(QueueAction action) {
+  public void enqueue(ClusterAction action) {
     this.actions.add(action);
     if (this.actions.size() > IMPLICIT_COMMIT_SIZE) {
       this.commit();