aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorStephane Gamard <stephane.gamard@searchbox.com>2014-08-20 16:22:33 +0200
committerStephane Gamard <stephane.gamard@searchbox.com>2014-08-20 18:18:38 +0200
commit8e32f1abcb4858f867936a44d66a7030324ae828 (patch)
treef1957d7a300ad43ef5947bab9502f8b9dd643d3f
parent008c1d58cc8e0b6fa077f85a18148233db289a04 (diff)
downloadsonarqube-8e32f1abcb4858f867936a44d66a7030324ae828.tar.gz
sonarqube-8e32f1abcb4858f867936a44d66a7030324ae828.zip
Updated indexQueue mechanism for performance (no compression yet).
-rwxr-xr-xfork.sh2
-rw-r--r--server/sonar-search/src/main/resources/org/sonar/search/logback.xml2
-rw-r--r--server/sonar-server/pom.xml1
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/activity/index/ActivityIndex.java7
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/db/BaseDao.java28
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/platform/ServerComponents.java2
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/qualityprofile/index/ActiveRuleIndex.java7
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/qualityprofile/index/ActiveRuleNormalizer.java2
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleIndex.java7
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleNormalizer.java2
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java124
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/search/Index.java12
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/search/IndexQueue.java281
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/search/IndexQueueWorker.java69
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/search/SearchClient.java11
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteDTO.java (renamed from server/sonar-server/src/main/java/org/sonar/server/search/action/DtoIndexAction.java)46
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteKey.java (renamed from server/sonar-server/src/main/java/org/sonar/server/search/action/KeyIndexAction.java)42
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteNestedItem.java (renamed from server/sonar-server/src/main/java/org/sonar/server/search/action/EmbeddedIndexAction.java)36
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/search/action/IndexActionRequest.java72
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertDTO.java (renamed from server/sonar-server/src/main/java/org/sonar/server/search/action/IndexAction.java)48
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertNestedItem.java66
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/search/es/ListUpdate.java148
-rw-r--r--server/sonar-server/src/test/java/org/sonar/server/search/BaseIndexTest.java3
-rw-r--r--sonar-core/pom.xml5
-rw-r--r--sonar-core/src/main/java/org/sonar/core/cluster/ClusterAction.java (renamed from sonar-core/src/main/java/org/sonar/core/cluster/QueueAction.java)23
-rw-r--r--sonar-core/src/main/java/org/sonar/core/cluster/NullQueue.java9
-rw-r--r--sonar-core/src/main/java/org/sonar/core/cluster/WorkQueue.java5
-rw-r--r--sonar-core/src/main/java/org/sonar/core/persistence/DbSession.java10
28 files changed, 446 insertions, 624 deletions
diff --git a/fork.sh b/fork.sh
index 6a0cad70879..fbbf69331f4 100755
--- a/fork.sh
+++ b/fork.sh
@@ -1,6 +1,6 @@
#!/bin/sh
-mvn clean install -DskipTests -pl :sonar-process -amd
+mvn clean install -DskipTests -pl :sonar-core,sonar-search -amd
if [[ "$OSTYPE" == "darwin"* ]]; then
OS='macosx-universal-64'
diff --git a/server/sonar-search/src/main/resources/org/sonar/search/logback.xml b/server/sonar-search/src/main/resources/org/sonar/search/logback.xml
index 5f714eec3e7..648ece82e28 100644
--- a/server/sonar-search/src/main/resources/org/sonar/search/logback.xml
+++ b/server/sonar-search/src/main/resources/org/sonar/search/logback.xml
@@ -28,7 +28,7 @@
</appender>
<root>
- <level value="DEBUG"/>
+ <level value="INFO"/>
<appender-ref ref="LOGFILE"/>
</root>
diff --git a/server/sonar-server/pom.xml b/server/sonar-server/pom.xml
index e1b8f74bb22..4790413b37c 100644
--- a/server/sonar-server/pom.xml
+++ b/server/sonar-server/pom.xml
@@ -243,7 +243,6 @@
<groupId>org.codehaus.sonar</groupId>
<artifactId>sonar-search</artifactId>
<version>${project.version}</version>
- <scope>test</scope>
</dependency>
</dependencies>
diff --git a/server/sonar-server/src/main/java/org/sonar/server/activity/index/ActivityIndex.java b/server/sonar-server/src/main/java/org/sonar/server/activity/index/ActivityIndex.java
index 01ffc420f82..73c538333c5 100644
--- a/server/sonar-server/src/main/java/org/sonar/server/activity/index/ActivityIndex.java
+++ b/server/sonar-server/src/main/java/org/sonar/server/activity/index/ActivityIndex.java
@@ -33,13 +33,12 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.sonar.core.activity.Activity;
import org.sonar.core.activity.db.ActivityDto;
-import org.sonar.core.cluster.WorkQueue;
import org.sonar.server.search.BaseIndex;
-import org.sonar.server.search.SearchClient;
import org.sonar.server.search.IndexDefinition;
import org.sonar.server.search.IndexField;
import org.sonar.server.search.QueryOptions;
import org.sonar.server.search.Result;
+import org.sonar.server.search.SearchClient;
import javax.annotation.Nullable;
import java.io.IOException;
@@ -51,8 +50,8 @@ import java.util.Map;
*/
public class ActivityIndex extends BaseIndex<Activity, ActivityDto, String> {
- public ActivityIndex(ActivityNormalizer normalizer, WorkQueue workQueue, SearchClient node) {
- super(IndexDefinition.LOG, normalizer, workQueue, node);
+ public ActivityIndex(ActivityNormalizer normalizer, SearchClient node) {
+ super(IndexDefinition.LOG, normalizer, node);
}
@Override
diff --git a/server/sonar-server/src/main/java/org/sonar/server/db/BaseDao.java b/server/sonar-server/src/main/java/org/sonar/server/db/BaseDao.java
index 5637b742d43..214ae8d49db 100644
--- a/server/sonar-server/src/main/java/org/sonar/server/db/BaseDao.java
+++ b/server/sonar-server/src/main/java/org/sonar/server/db/BaseDao.java
@@ -26,10 +26,10 @@ import org.sonar.core.persistence.DbSession;
import org.sonar.core.persistence.Dto;
import org.sonar.server.exceptions.NotFoundException;
import org.sonar.server.search.IndexDefinition;
-import org.sonar.server.search.action.DtoIndexAction;
-import org.sonar.server.search.action.EmbeddedIndexAction;
-import org.sonar.server.search.action.IndexAction;
-import org.sonar.server.search.action.KeyIndexAction;
+import org.sonar.server.search.action.DeleteKey;
+import org.sonar.server.search.action.DeleteNestedItem;
+import org.sonar.server.search.action.UpsertDto;
+import org.sonar.server.search.action.UpsertNestedItem;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
@@ -176,7 +176,7 @@ public abstract class BaseDao<M, E extends Dto<K>, K extends Serializable> imple
item.setUpdatedAt(now);
doUpdate(session, item);
if (hasIndex()) {
- session.enqueue(new DtoIndexAction<E>(getIndexType(), IndexAction.Method.UPSERT, item));
+ session.enqueue(new UpsertDto(getIndexType(), item));
}
} catch (Exception e) {
throw new IllegalStateException("Fail to update item in db: " + item, e);
@@ -216,7 +216,7 @@ public abstract class BaseDao<M, E extends Dto<K>, K extends Serializable> imple
try {
doInsert(session, item);
if (hasIndex()) {
- session.enqueue(new DtoIndexAction<E>(getIndexType(), IndexAction.Method.UPSERT, item));
+ session.enqueue(new UpsertDto<E>(getIndexType(), item));
}
} catch (Exception e) {
throw new IllegalStateException("Fail to insert item in db: " + item, e.getCause());
@@ -249,7 +249,7 @@ public abstract class BaseDao<M, E extends Dto<K>, K extends Serializable> imple
try {
doDeleteByKey(session, key);
if (hasIndex()) {
- session.enqueue(new KeyIndexAction<K>(getIndexType(), IndexAction.Method.DELETE, key));
+ session.enqueue(new DeleteKey<K>(getIndexType(), key));
}
} catch (Exception e) {
throw new IllegalStateException("Fail to delete item from db: " + key, e);
@@ -258,31 +258,29 @@ public abstract class BaseDao<M, E extends Dto<K>, K extends Serializable> imple
protected final void enqueueUpdate(Object nestedItem, K key, DbSession session) {
if (hasIndex()) {
- session.enqueue(new EmbeddedIndexAction<K>(
- this.getIndexType(), IndexAction.Method.UPSERT, key, nestedItem));
+ session.enqueue(new UpsertNestedItem<K>(
+ this.getIndexType(), key, nestedItem));
}
}
public void enqueueDelete(Object nestedItem, K key, DbSession session) {
if (hasIndex()) {
- session.enqueue(new EmbeddedIndexAction<K>(
- this.getIndexType(), IndexAction.Method.DELETE, key, nestedItem));
+ session.enqueue(new DeleteNestedItem<K>(
+ this.getIndexType(), key, nestedItem));
session.commit();
}
}
public void enqueueInsert(Object nestedItem, K key, DbSession session) {
if (hasIndex()) {
- session.enqueue(new EmbeddedIndexAction<K>(
- this.getIndexType(), IndexAction.Method.UPSERT, key, nestedItem));
+ this.enqueueUpdate(nestedItem, key, session);
}
}
@Override
public final void synchronizeAfter(final DbSession session, Date date) {
for (E dto : this.findAfterDate(session, date)) {
- session.enqueue(new DtoIndexAction<E>(getIndexType(), IndexAction.Method.UPSERT,
- dto));
+ session.enqueue(new UpsertDto<E>(getIndexType(), dto));
}
session.commit();
}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/platform/ServerComponents.java b/server/sonar-server/src/main/java/org/sonar/server/platform/ServerComponents.java
index 4cfce8e9a81..fafdd32b722 100644
--- a/server/sonar-server/src/main/java/org/sonar/server/platform/ServerComponents.java
+++ b/server/sonar-server/src/main/java/org/sonar/server/platform/ServerComponents.java
@@ -220,7 +220,7 @@ class ServerComponents {
ActiveRuleNormalizer.class,
RuleIndex.class,
ActiveRuleIndex.class,
- IndexQueueWorker.class,
+ //IndexQueueWorker.class,
IndexClient.class,
ActivityNormalizer.class,
ActivityIndex.class,
diff --git a/server/sonar-server/src/main/java/org/sonar/server/qualityprofile/index/ActiveRuleIndex.java b/server/sonar-server/src/main/java/org/sonar/server/qualityprofile/index/ActiveRuleIndex.java
index e71647d3415..b12c5f8d8ca 100644
--- a/server/sonar-server/src/main/java/org/sonar/server/qualityprofile/index/ActiveRuleIndex.java
+++ b/server/sonar-server/src/main/java/org/sonar/server/qualityprofile/index/ActiveRuleIndex.java
@@ -33,16 +33,15 @@ import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.sonar.api.rule.RuleKey;
import org.sonar.api.rule.RuleStatus;
-import org.sonar.core.cluster.WorkQueue;
import org.sonar.core.qualityprofile.db.ActiveRuleDto;
import org.sonar.core.qualityprofile.db.ActiveRuleKey;
import org.sonar.server.qualityprofile.ActiveRule;
import org.sonar.server.rule.index.RuleNormalizer;
import org.sonar.server.search.BaseIndex;
-import org.sonar.server.search.SearchClient;
import org.sonar.server.search.FacetValue;
import org.sonar.server.search.IndexDefinition;
import org.sonar.server.search.IndexField;
+import org.sonar.server.search.SearchClient;
import java.io.IOException;
import java.util.ArrayList;
@@ -52,8 +51,8 @@ import java.util.Map;
public class ActiveRuleIndex extends BaseIndex<ActiveRule, ActiveRuleDto, ActiveRuleKey> {
- public ActiveRuleIndex(ActiveRuleNormalizer normalizer, WorkQueue workQueue, SearchClient node) {
- super(IndexDefinition.ACTIVE_RULE, normalizer, workQueue, node);
+ public ActiveRuleIndex(ActiveRuleNormalizer normalizer, SearchClient node) {
+ super(IndexDefinition.ACTIVE_RULE, normalizer, node);
}
@Override
diff --git a/server/sonar-server/src/main/java/org/sonar/server/qualityprofile/index/ActiveRuleNormalizer.java b/server/sonar-server/src/main/java/org/sonar/server/qualityprofile/index/ActiveRuleNormalizer.java
index 8ece6b6eca8..3f593744058 100644
--- a/server/sonar-server/src/main/java/org/sonar/server/qualityprofile/index/ActiveRuleNormalizer.java
+++ b/server/sonar-server/src/main/java/org/sonar/server/qualityprofile/index/ActiveRuleNormalizer.java
@@ -28,13 +28,13 @@ import org.sonar.core.qualityprofile.db.ActiveRuleDto;
import org.sonar.core.qualityprofile.db.ActiveRuleKey;
import org.sonar.core.qualityprofile.db.ActiveRuleParamDto;
import org.sonar.core.qualityprofile.db.QualityProfileDto;
+import org.sonar.search.script.ListUpdate;
import org.sonar.server.db.DbClient;
import org.sonar.server.qualityprofile.ActiveRule;
import org.sonar.server.search.BaseNormalizer;
import org.sonar.server.search.IndexDefinition;
import org.sonar.server.search.IndexField;
import org.sonar.server.search.Indexable;
-import org.sonar.server.search.es.ListUpdate;
import java.lang.reflect.Field;
import java.util.ArrayList;
diff --git a/server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleIndex.java b/server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleIndex.java
index a80f4d13ef6..53b6ca26d24 100644
--- a/server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleIndex.java
+++ b/server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleIndex.java
@@ -44,16 +44,15 @@ import org.elasticsearch.search.sort.SortOrder;
import org.sonar.api.rule.RuleKey;
import org.sonar.api.rule.RuleStatus;
import org.sonar.api.server.debt.DebtCharacteristic;
-import org.sonar.core.cluster.WorkQueue;
import org.sonar.core.rule.RuleDto;
import org.sonar.server.qualityprofile.index.ActiveRuleNormalizer;
import org.sonar.server.rule.Rule;
import org.sonar.server.search.BaseIndex;
-import org.sonar.server.search.SearchClient;
import org.sonar.server.search.IndexDefinition;
import org.sonar.server.search.IndexField;
import org.sonar.server.search.QueryOptions;
import org.sonar.server.search.Result;
+import org.sonar.server.search.SearchClient;
import javax.annotation.CheckForNull;
import java.io.IOException;
@@ -69,8 +68,8 @@ import static com.google.common.collect.Lists.newArrayList;
public class RuleIndex extends BaseIndex<Rule, RuleDto, RuleKey> {
- public RuleIndex(RuleNormalizer normalizer, WorkQueue workQueue, SearchClient node) {
- super(IndexDefinition.RULE, normalizer, workQueue, node);
+ public RuleIndex(RuleNormalizer normalizer, SearchClient client) {
+ super(IndexDefinition.RULE, normalizer, client);
}
protected String getKeyValue(RuleKey key) {
diff --git a/server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleNormalizer.java b/server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleNormalizer.java
index 9071d30faf2..3a347255d7d 100644
--- a/server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleNormalizer.java
+++ b/server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleNormalizer.java
@@ -33,12 +33,12 @@ import org.sonar.core.rule.RuleDto;
import org.sonar.core.rule.RuleParamDto;
import org.sonar.core.technicaldebt.db.CharacteristicDto;
import org.sonar.markdown.Markdown;
+import org.sonar.search.script.ListUpdate;
import org.sonar.server.db.DbClient;
import org.sonar.server.search.BaseNormalizer;
import org.sonar.server.search.IndexDefinition;
import org.sonar.server.search.IndexField;
import org.sonar.server.search.Indexable;
-import org.sonar.server.search.es.ListUpdate;
import java.lang.reflect.Field;
import java.util.ArrayList;
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java b/server/sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java
index 5abddd2db69..f4baaf004b8 100644
--- a/server/sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java
+++ b/server/sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java
@@ -23,19 +23,13 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.count.CountRequestBuilder;
import org.elasticsearch.action.count.CountResponse;
-import org.elasticsearch.action.delete.DeleteRequestBuilder;
-import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
-import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolFilterBuilder;
@@ -53,15 +47,19 @@ import org.elasticsearch.search.aggregations.metrics.valuecount.InternalValueCou
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.sonar.core.cluster.WorkQueue;
import org.sonar.core.persistence.Dto;
import javax.annotation.Nullable;
-
import java.io.IOException;
import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Queue;
public abstract class BaseIndex<DOMAIN, DTO extends Dto<KEY>, KEY extends Serializable>
implements Index<DOMAIN, DTO, KEY> {
@@ -72,13 +70,16 @@ public abstract class BaseIndex<DOMAIN, DTO extends Dto<KEY>, KEY extends Serial
private final BaseNormalizer<DTO, KEY> normalizer;
private final IndexDefinition indexDefinition;
- protected BaseIndex(IndexDefinition indexDefinition, BaseNormalizer<DTO, KEY> normalizer,
- WorkQueue workQueue, SearchClient client) {
+ protected BaseIndex(IndexDefinition indexDefinition, BaseNormalizer<DTO, KEY> normalizer, SearchClient client) {
this.normalizer = normalizer;
this.client = client;
this.indexDefinition = indexDefinition;
}
+ public BaseNormalizer<DTO, KEY> getNormalizer() {
+ return normalizer;
+ }
+
@Override
public final String getIndexName() {
return this.indexDefinition.getIndexName();
@@ -390,105 +391,6 @@ public abstract class BaseIndex<DOMAIN, DTO extends Dto<KEY>, KEY extends Serial
return null;
}
- protected void updateDocument(Collection<UpdateRequest> requests, KEY key) {
- LOG.debug("UPDATE _id:{} in index {}", key, this.getIndexName());
- BulkRequestBuilder bulkRequest = client.prepareBulk();
- for (UpdateRequest request : requests) {
- // if request has no ID then no upsert possible!
- if (request.id() == null || request.id().isEmpty()) {
- bulkRequest.add(new IndexRequest()
- .source(request.doc().sourceAsMap())
- .type(this.getIndexType())
- .index(this.getIndexName()));
- } else {
- bulkRequest.add(request
- .id(this.getKeyValue(key))
- .index(this.getIndexName())
- .type(this.getIndexType()));
- }
- }
- BulkResponse response = client.execute(bulkRequest);
- }
-
- @Override
- public void upsert(KEY key, Object object, Object... objects) throws Exception {
- long t0 = System.currentTimeMillis();
- List<UpdateRequest> requests = this.normalizer.normalizeNested(object, key);
- for (Object additionalObject : objects) {
- requests.addAll(this.normalizer.normalizeNested(additionalObject, key));
- }
- long t1 = System.currentTimeMillis();
- this.updateDocument(requests, key);
- long t2 = System.currentTimeMillis();
- LOG.debug("UPSERT [object] time:{}ms ({}ms normalize, {}ms elastic)",
- t2 - t0, t1 - t0, t2 - t1);
- }
-
- @Override
- public void upsert(DTO item, DTO... items) {
- try {
- long t0 = System.currentTimeMillis();
- List<UpdateRequest> requests = normalizer.normalize(item);
- for (DTO additionalItem : items) {
- requests.addAll(normalizer.normalize(additionalItem));
- }
- long t1 = System.currentTimeMillis();
- this.updateDocument(requests, item.getKey());
- long t2 = System.currentTimeMillis();
- LOG.debug("UPSERT [dto] time:{}ms ({}ms normalize, {}ms elastic)",
- t2 - t0, t1 - t0, t2 - t1);
- } catch (Exception e) {
- LOG.error("Could not update document for index {}: {}",
- this.getIndexName(), e.getMessage(), e);
- }
- }
-
- private void deleteDocument(KEY key) throws ExecutionException, InterruptedException {
- LOG.debug("DELETE _id:{} in index {}", key, this.getIndexName());
- DeleteRequestBuilder request = client
- .prepareDelete()
- .setIndex(this.getIndexName())
- .setType(this.getIndexType())
- .setId(this.getKeyValue(key));
- DeleteResponse response = client.execute(request);
- }
-
- @Override
- public void delete(KEY key, Object object, Object... objects) throws Exception {
- LOG.debug("DELETE NESTED _id:{} in index {}", key, this.getIndexName());
- List<UpdateRequest> requests = this.normalizer.deleteNested(object, key);
- for (Object additionalObject : objects) {
- requests.addAll(this.normalizer.deleteNested(additionalObject, key));
- }
- this.updateDocument(requests, key);
- }
-
- @Override
- public void deleteByKey(KEY key, KEY... keys) {
- try {
- this.deleteDocument(key);
- for (KEY additionalKey : keys) {
- this.deleteDocument(additionalKey);
- }
- } catch (Exception e) {
- throw new IllegalStateException(String.format("Could not DELETE _id = '%s' for index '%s",
- getKeyValue(key), getIndexName()), e);
- }
- }
-
- @Override
- public void deleteByDto(DTO item, DTO... items) {
- try {
- this.deleteDocument(item.getKey());
- for (DTO additionalItem : items) {
- this.deleteDocument(additionalItem.getKey());
- }
- } catch (Exception e) {
- throw new IllegalStateException(String.format("Could not DELETE _id = '%s' for index '%s",
- getKeyValue(item.getKey()), getIndexName()), e);
- }
- }
-
/* ES QueryHelper Methods */
protected BoolFilterBuilder addTermFilter(BoolFilterBuilder filter, String field, @Nullable Collection<String> values) {
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/Index.java b/server/sonar-server/src/main/java/org/sonar/server/search/Index.java
index 08e8836c564..13a7a935c4e 100644
--- a/server/sonar-server/src/main/java/org/sonar/server/search/Index.java
+++ b/server/sonar-server/src/main/java/org/sonar/server/search/Index.java
@@ -40,19 +40,11 @@ public interface Index<DOMAIN, DTO extends Dto<KEY>, KEY extends Serializable> e
void refresh();
- void upsert(KEY key, Object object, Object... objects) throws Exception;
-
- void upsert(DTO dto, DTO... dtos);
-
- void delete(KEY key, Object object, Object... objects) throws Exception;
-
- void deleteByKey(KEY key, KEY... keys);
-
- void deleteByDto(DTO dto, DTO... dtos);
-
Date getLastSynchronization();
IndexStat getIndexStat();
Iterator<DOMAIN> scroll(String scrollId);
+
+ BaseNormalizer<DTO, KEY> getNormalizer();
}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/IndexQueue.java b/server/sonar-server/src/main/java/org/sonar/server/search/IndexQueue.java
index 81cfdf6092f..89ddcb752d5 100644
--- a/server/sonar-server/src/main/java/org/sonar/server/search/IndexQueue.java
+++ b/server/sonar-server/src/main/java/org/sonar/server/search/IndexQueue.java
@@ -19,154 +19,201 @@
*/
package org.sonar.server.search;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang.StringUtils;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.update.UpdateRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sonar.api.ServerComponent;
import org.sonar.api.config.Settings;
+import org.sonar.api.platform.ComponentContainer;
import org.sonar.core.cluster.WorkQueue;
import org.sonar.core.profiling.Profiling;
-import org.sonar.core.profiling.StopWatch;
-import org.sonar.server.search.action.EmbeddedIndexAction;
-import org.sonar.server.search.action.IndexAction;
+import org.sonar.server.search.action.IndexActionRequest;
-import java.util.HashSet;
+import java.util.HashMap;
import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
public class IndexQueue extends LinkedBlockingQueue<Runnable>
- implements ServerComponent, WorkQueue<IndexAction> {
+ implements ServerComponent, WorkQueue<IndexActionRequest> {
protected final Profiling profiling;
+ private final SearchClient searchClient;
+ private final ComponentContainer container;
+
private static final Logger LOGGER = LoggerFactory.getLogger(IndexQueue.class);
private static final Integer DEFAULT_QUEUE_SIZE = 200;
private static final int TIMEOUT = 30000;
- public IndexQueue(Settings settings) {
+ public IndexQueue(Settings settings, SearchClient searchClient, ComponentContainer container) {
super(DEFAULT_QUEUE_SIZE);
+ this.searchClient = searchClient;
+ this.container = container;
this.profiling = new Profiling(settings);
}
@Override
- public void enqueue(IndexAction action) {
- this.enqueue(ImmutableList.of(action));
- }
-
- @Override
- public void enqueue(List<IndexAction> actions) {
-
-
- int bcount = 0;
- int ecount = 0;
- List<String> refreshes = Lists.newArrayList();
- Set<String> types = Sets.newHashSet();
- long all_start = System.currentTimeMillis();
- long indexTime;
- long refreshTime;
- long embeddedTime;
-
- if (actions.size() == 1) {
- /* Atomic update here */
- CountDownLatch latch = new CountDownLatch(1);
- IndexAction action = actions.get(0);
- action.setLatch(latch);
- try {
- indexTime = System.currentTimeMillis();
- this.offer(action, TIMEOUT, TimeUnit.MILLISECONDS);
- if (!latch.await(TIMEOUT, TimeUnit.MILLISECONDS)) {
- throw new IllegalStateException("ES update could not be completed within: " + TIMEOUT + "ms");
- }
- bcount++;
- indexTime = System.currentTimeMillis() - indexTime;
- // refresh the index.
- Index<?, ?, ?> index = action.getIndex();
- if (index != null) {
- refreshTime = System.currentTimeMillis();
- index.refresh();
- refreshTime = System.currentTimeMillis() - refreshTime;
- refreshes.add(index.getIndexName());
- }
- types.add(action.getPayloadClass().getSimpleName());
- } catch (InterruptedException e) {
- throw new IllegalStateException("ES update has been interrupted", e);
- }
- } else if (actions.size() > 1) {
- StopWatch basicProfile = profiling.start("search", Profiling.Level.BASIC);
-
- /* Purge actions that would be overridden */
- Long purgeStart = System.currentTimeMillis();
- List<IndexAction> itemActions = Lists.newArrayList();
- List<IndexAction> embeddedActions = Lists.newArrayList();
-
- for (IndexAction action : actions) {
- if (action.getClass().isAssignableFrom(EmbeddedIndexAction.class)) {
- embeddedActions.add(action);
- } else {
- itemActions.add(action);
- }
- }
+ public void enqueue(List<IndexActionRequest> actions) {
- LOGGER.debug("INDEX - compressed {} items into {} in {}ms,",
- actions.size(), itemActions.size() + embeddedActions.size(), System.currentTimeMillis() - purgeStart);
+ if (actions.isEmpty()) {
+ return;
+ }
+ try {
- try {
- /* execute all item actions */
- CountDownLatch itemLatch = new CountDownLatch(itemActions.size());
- indexTime = System.currentTimeMillis();
- for (IndexAction action : itemActions) {
- action.setLatch(itemLatch);
- this.offer(action, TIMEOUT, TimeUnit.MILLISECONDS);
- types.add(action.getPayloadClass().getSimpleName());
- bcount++;
+ BulkRequestBuilder bulkRequestBuilder = new BulkRequestBuilder(searchClient);
+ Map<String,Index> indexes = getIndexMap();
+ for (IndexActionRequest action : actions) {
+ action.setIndex(indexes.get(action.getIndexType()));
+ }
- }
- if (!itemLatch.await(TIMEOUT, TimeUnit.MILLISECONDS)) {
- throw new IllegalStateException("ES update could not be completed within: " + TIMEOUT + "ms");
- }
- indexTime = System.currentTimeMillis() - indexTime;
-
- /* and now push the embedded */
- CountDownLatch embeddedLatch = new CountDownLatch(embeddedActions.size());
- embeddedTime = System.currentTimeMillis();
- for (IndexAction action : embeddedActions) {
- action.setLatch(embeddedLatch);
- this.offer(action, TIMEOUT, TimeUnit.SECONDS);
- types.add(action.getPayloadClass().getSimpleName());
- ecount++;
- }
- if (!embeddedLatch.await(TIMEOUT, TimeUnit.MILLISECONDS)) {
- throw new IllegalStateException("ES embedded update could not be completed within: " + TIMEOUT + "ms");
- }
- embeddedTime = System.currentTimeMillis() - embeddedTime;
-
- /* Finally refresh affected indexes */
- Set<String> refreshedIndexes = new HashSet<String>();
- refreshTime = System.currentTimeMillis();
- for (IndexAction action : actions) {
- if (action.getIndex() != null &&
- !refreshedIndexes.contains(action.getIndex().getIndexName())) {
- refreshedIndexes.add(action.getIndex().getIndexName());
- action.getIndex().refresh();
- refreshes.add(action.getIndex().getIndexName());
+ ExecutorService executorService = Executors.newFixedThreadPool(4);
+
+ //invokeAll() blocks until ALL tasks submitted to executor complete
+ for (Future<List<ActionRequest>> updateRequests : executorService.invokeAll(actions)) {
+ for (ActionRequest update : updateRequests.get()) {
+ if (UpdateRequest.class.isAssignableFrom(update.getClass())) {
+ bulkRequestBuilder.add((UpdateRequest)update);
+ } else if (DeleteRequest.class.isAssignableFrom(update.getClass())) {
+ bulkRequestBuilder.add((DeleteRequest)update);
+ } else {
+ throw new IllegalStateException("Un-managed request type: " + update.getClass());
}
}
- refreshTime = System.currentTimeMillis() - refreshTime;
- } catch (InterruptedException e) {
- throw new IllegalStateException("ES update has been interrupted", e);
}
+ executorService.shutdown();
- basicProfile.stop("INDEX - time:%sms (%sms index, %sms embedded, %sms refresh)\ttypes:[%s],\tbulk:%s\tembedded:%s\trefresh:[%s]",
- (System.currentTimeMillis() - all_start), indexTime, embeddedTime, refreshTime,
- StringUtils.join(types, ","),
- bcount, ecount, StringUtils.join(refreshes, ","));
+ LOGGER.info("Executing batch request of size: " + bulkRequestBuilder.numberOfActions());
+
+ //execute the request
+ BulkResponse response = searchClient.execute(bulkRequestBuilder.setRefresh(true));
+ } catch (Exception e) {
+ e.printStackTrace();
}
}
+
+ private Map<String, Index> getIndexMap() {
+ Map<String, Index> indexes = new HashMap<String, Index>();
+ for (Index index : container.getComponentsByType(Index.class)) {
+ indexes.put(index.getIndexType(), index);
+ }
+ return indexes;
+ }
+
+
+//
+// int bcount = 0;
+// int ecount = 0;
+// List<String> refreshes = Lists.newArrayList();
+// Set<String> types = Sets.newHashSet();
+// long all_start = System.currentTimeMillis();
+// long indexTime;
+// long refreshTime;
+// long embeddedTime;
+//
+// if (actions.size() == 1) {
+// /* Atomic update here */
+// CountDownLatch latch = new CountDownLatch(1);
+// IndexAction action = actions.get(0);
+// action.setLatch(latch);
+// try {
+// indexTime = System.currentTimeMillis();
+// this.offer(action, TIMEOUT, TimeUnit.MILLISECONDS);
+// if (!latch.await(TIMEOUT, TimeUnit.MILLISECONDS)) {
+// throw new IllegalStateException("ES update could not be completed within: " + TIMEOUT + "ms");
+// }
+// bcount++;
+// indexTime = System.currentTimeMillis() - indexTime;
+// // refresh the index.
+// Index<?, ?, ?> index = action.getIndex();
+// if (index != null) {
+// refreshTime = System.currentTimeMillis();
+// index.refresh();
+// refreshTime = System.currentTimeMillis() - refreshTime;
+// refreshes.add(index.getIndexName());
+// }
+// types.add(action.getPayloadClass().getSimpleName());
+// } catch (InterruptedException e) {
+// throw new IllegalStateException("ES update has been interrupted", e);
+// }
+// } else if (actions.size() > 1) {
+// StopWatch basicProfile = profiling.start("search", Profiling.Level.BASIC);
+//
+// /* Purge actions that would be overridden */
+// Long purgeStart = System.currentTimeMillis();
+// List<IndexAction> itemActions = Lists.newArrayList();
+// List<IndexAction> embeddedActions = Lists.newArrayList();
+//
+// for (IndexAction action : actions) {
+// if (action.getClass().isAssignableFrom(EmbeddedIndexAction.class)) {
+// embeddedActions.add(action);
+// } else {
+// itemActions.add(action);
+// }
+// }
+//
+// LOGGER.debug("INDEX - compressed {} items into {} in {}ms,",
+// actions.size(), itemActions.size() + embeddedActions.size(), System.currentTimeMillis() - purgeStart);
+//
+// try {
+// /* execute all item actions */
+// CountDownLatch itemLatch = new CountDownLatch(itemActions.size());
+// indexTime = System.currentTimeMillis();
+// for (IndexAction action : itemActions) {
+// action.setLatch(itemLatch);
+// this.offer(action, TIMEOUT, TimeUnit.MILLISECONDS);
+// types.add(action.getPayloadClass().getSimpleName());
+// bcount++;
+//
+// }
+// if (!itemLatch.await(TIMEOUT, TimeUnit.MILLISECONDS)) {
+// throw new IllegalStateException("ES update could not be completed within: " + TIMEOUT + "ms");
+// }
+// indexTime = System.currentTimeMillis() - indexTime;
+//
+// /* and now push the embedded */
+// CountDownLatch embeddedLatch = new CountDownLatch(embeddedActions.size());
+// embeddedTime = System.currentTimeMillis();
+// for (IndexAction action : embeddedActions) {
+// action.setLatch(embeddedLatch);
+// this.offer(action, TIMEOUT, TimeUnit.SECONDS);
+// types.add(action.getPayloadClass().getSimpleName());
+// ecount++;
+// }
+// if (!embeddedLatch.await(TIMEOUT, TimeUnit.MILLISECONDS)) {
+// throw new IllegalStateException("ES embedded update could not be completed within: " + TIMEOUT + "ms");
+// }
+// embeddedTime = System.currentTimeMillis() - embeddedTime;
+//
+// /* Finally refresh affected indexes */
+// Set<String> refreshedIndexes = new HashSet<String>();
+// refreshTime = System.currentTimeMillis();
+// for (IndexAction action : actions) {
+// if (action.getIndex() != null &&
+// !refreshedIndexes.contains(action.getIndex().getIndexName())) {
+// refreshedIndexes.add(action.getIndex().getIndexName());
+// action.getIndex().refresh();
+// refreshes.add(action.getIndex().getIndexName());
+// }
+// }
+// refreshTime = System.currentTimeMillis() - refreshTime;
+// } catch (InterruptedException e) {
+// throw new IllegalStateException("ES update has been interrupted", e);
+// }
+//
+// basicProfile.stop("INDEX - time:%sms (%sms index, %sms embedded, %sms refresh)\ttypes:[%s],\tbulk:%s\tembedded:%s\trefresh:[%s]",
+// (System.currentTimeMillis() - all_start), indexTime, embeddedTime, refreshTime,
+// StringUtils.join(types, ","),
+// bcount, ecount, StringUtils.join(refreshes, ","));
+// }
+
+
}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/IndexQueueWorker.java b/server/sonar-server/src/main/java/org/sonar/server/search/IndexQueueWorker.java
deleted file mode 100644
index c2948b717d9..00000000000
--- a/server/sonar-server/src/main/java/org/sonar/server/search/IndexQueueWorker.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.search;
-
-import org.picocontainer.Startable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.sonar.api.ServerComponent;
-import org.sonar.server.search.action.IndexAction;
-
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-public class IndexQueueWorker extends ThreadPoolExecutor
- implements ServerComponent, Startable {
-
- private static final Logger LOG = LoggerFactory.getLogger(IndexQueueWorker.class);
-
- private final IndexClient indexes;
-
- public IndexQueueWorker(IndexQueue queue, IndexClient indexes) {
- super(1,1, 0L, TimeUnit.MILLISECONDS, queue);
- this.indexes = indexes;
- }
-
- protected void beforeExecute(Thread t, Runnable r) {
- LOG.debug("Starting task: {}", r);
- super.beforeExecute(t, r);
- if (IndexAction.class.isAssignableFrom(r.getClass())) {
- IndexAction ia = (IndexAction) r;
- LOG.debug("Task is an IndexAction for {}", ia.getIndexType());
- ia.setIndex(indexes.getByType(ia.getIndexType()));
- }
- }
-
- protected void afterExecute(Runnable r, Throwable t) {
- super.afterExecute(r, t);
- if (t != null) {
- throw new IllegalStateException(t);
- }
- }
-
- @Override
- public void start() {
- this.prestartAllCoreThreads();
- }
-
- @Override
- public void stop() {
- this.shutdown();
- }
-}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/SearchClient.java b/server/sonar-server/src/main/java/org/sonar/server/search/SearchClient.java
index 1810cee8cfd..0c919160eda 100644
--- a/server/sonar-server/src/main/java/org/sonar/server/search/SearchClient.java
+++ b/server/sonar-server/src/main/java/org/sonar/server/search/SearchClient.java
@@ -24,7 +24,6 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
-import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsNodes;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse;
@@ -36,6 +35,8 @@ import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.sonar.api.config.Settings;
import org.sonar.core.profiling.Profiling;
import org.sonar.core.profiling.StopWatch;
@@ -45,6 +46,8 @@ import org.sonar.core.profiling.StopWatch;
*/
public class SearchClient extends TransportClient {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SearchClient.class);
+
private static final String DEFAULT_HEALTH_TIMEOUT = "30s";
private final Settings settings;
@@ -114,10 +117,10 @@ public class SearchClient extends TransportClient {
public <K extends ActionResponse> K execute(ActionRequestBuilder request) {
StopWatch fullProfile = profiling.start("search", Profiling.Level.FULL);
- ListenableActionFuture acc = request.execute();
+ K response = null;
try {
- K response = (K) acc.get();
+ response = (K) request.get();
if (profiling.isProfilingEnabled(Profiling.Level.BASIC)) {
if (ToXContent.class.isAssignableFrom(request.getClass())) {
@@ -144,7 +147,9 @@ public class SearchClient extends TransportClient {
}
return response;
} catch (Exception e) {
+ LOGGER.error("could not execute request: " + response);
throw new IllegalStateException("ES error: ", e);
+
}
}
}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/DtoIndexAction.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteDTO.java
index 3f09d708cbd..8b801a45b72 100644
--- a/server/sonar-server/src/main/java/org/sonar/server/search/action/DtoIndexAction.java
+++ b/server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteDTO.java
@@ -19,43 +19,39 @@
*/
package org.sonar.server.search.action;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.client.Requests;
import org.sonar.core.persistence.Dto;
+import org.sonar.server.search.Index;
-public class DtoIndexAction<E extends Dto> extends IndexAction {
+import java.util.ArrayList;
+import java.util.List;
- private final E item;
- private final E[] items;
+public class DeleteDto<DTO extends Dto> extends IndexActionRequest {
- public DtoIndexAction(String indexType, Method method, E item, E... items) {
- super(indexType, method);
- this.item = item;
- this.items = items;
+ private final DTO dto;
+
+ public DeleteDto(String indexType, DTO dto) {
+ super(indexType);
+ this.dto = dto;
}
@Override
- public Class<?> getPayloadClass() {
- return item.getClass();
+ public String getKey() {
+ return dto.getKey().toString();
}
@Override
- public String getKey() {
- return item.getKey().toString();
+ public Class<?> getPayloadClass() {
+ return dto.getClass();
}
@Override
- public void doExecute() {
- try {
- if (this.getMethod().equals(Method.DELETE)) {
- index.deleteByDto(this.item, this.items);
- } else if (this.getMethod().equals(Method.UPSERT)) {
- index.upsert(this.item, this.items);
- }
- } catch (Exception e) {
- throw new IllegalStateException(this.getClass().getSimpleName() +
- " cannot execute " + this.getMethod() + " for " + this.item.getClass().getSimpleName() +
- " as " + this.getIndexType() +
- " on key: " + this.item.getKey(), e);
- }
+ public List<ActionRequest> doCall(Index index) throws Exception {
+ List<ActionRequest> requests = new ArrayList<ActionRequest>();
+ requests.add(Requests.deleteRequest(index.getIndexName())
+ .id(dto.getKey().toString())
+ .type(indexType));
+ return requests;
}
}
-
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/KeyIndexAction.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteKey.java
index 3e23d5514e8..adc15a9a9a5 100644
--- a/server/sonar-server/src/main/java/org/sonar/server/search/action/KeyIndexAction.java
+++ b/server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteKey.java
@@ -19,42 +19,40 @@
*/
package org.sonar.server.search.action;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.client.Requests;
+import org.sonar.server.search.Index;
+
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
-public class KeyIndexAction<K extends Serializable> extends IndexAction {
+public class DeleteKey<K extends Serializable> extends IndexActionRequest {
private final K key;
- private final K[] keys;
- public KeyIndexAction(String indexType, Method method, K key, K... keys) {
- super(indexType, method);
+ public DeleteKey(String indexType, K key) {
+ super(indexType);
this.key = key;
- this.keys = keys;
}
@Override
- public Class<?> getPayloadClass() {
- return String.class;
+ public String getKey() {
+ return key.toString();
}
@Override
- public String getKey() {
- return this.key.toString();
+ public Class<?> getPayloadClass() {
+ throw new IllegalStateException("Deletion by key does not have an object payload!");
}
@Override
- public void doExecute() {
- try {
- if (this.getMethod().equals(Method.DELETE)) {
- index.deleteByKey(this.key, this.keys);
- } else if (this.getMethod().equals(Method.UPSERT)) {
- throw new IllegalStateException("Upsert by Key is not supported anymore");
- }
- } catch (Exception e) {
- throw new IllegalStateException(this.getClass().getSimpleName() +
- "cannot execute " + this.getMethod() + " for " + this.key.getClass().getSimpleName() +
- " on type: " + this.getIndexType() +
- " on key: " + this.key, e);
- }
+ public List<ActionRequest> doCall(Index index) throws Exception {
+ List<ActionRequest> requests = new ArrayList<ActionRequest>();
+ requests.add(Requests.deleteRequest(index.getIndexName())
+ .id(getKey())
+ .type(indexType));
+ return requests;
}
+
}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/EmbeddedIndexAction.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteNestedItem.java
index 728ea8ece6c..15555a4481b 100644
--- a/server/sonar-server/src/main/java/org/sonar/server/search/action/EmbeddedIndexAction.java
+++ b/server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteNestedItem.java
@@ -19,18 +19,21 @@
*/
package org.sonar.server.search.action;
+import org.elasticsearch.action.ActionRequest;
+import org.sonar.server.search.Index;
+
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
-public class EmbeddedIndexAction<K extends Serializable> extends IndexAction {
+public class DeleteNestedItem<K extends Serializable> extends IndexActionRequest {
private final K key;
private final Object item;
private final Object[] items;
- public EmbeddedIndexAction(String indexType, Method method, K key, Object item, Object... items) {
- super(indexType, method);
- this.indexType = indexType;
- this.method = method;
+ public DeleteNestedItem(String indexType, K key, Object item, Object... items) {
+ super(indexType);
this.key = key;
this.item = item;
this.items = items;
@@ -47,19 +50,16 @@ public class EmbeddedIndexAction<K extends Serializable> extends IndexAction {
}
@Override
- public void doExecute() {
-
- try {
- if (this.getMethod().equals(Method.DELETE)) {
- index.delete(this.key, this.item, this.items);
- } else if (this.getMethod().equals(Method.UPSERT)) {
- index.upsert(this.key, this.item, this.items);
- }
- } catch (Exception e) {
- throw new IllegalStateException(this.getClass().getSimpleName() +
- "cannot execute " + this.getMethod() + " for " + this.item.getClass().getSimpleName() +
- " as " + this.getIndexType() +
- " on key: " + this.key, e);
+ public List<ActionRequest> doCall(Index index) throws Exception {
+ List<ActionRequest> updates = new ArrayList<ActionRequest>();
+ updates.addAll(deleteItem(index, item, key));
+ for (Object otherItem : items) {
+ updates.addAll(deleteItem(index, otherItem, key));
}
+ return updates;
+ }
+
+ private List<ActionRequest> deleteItem(Index index, Object item, K key) {
+ return index.getNormalizer().deleteNested(item, key);
}
}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexActionRequest.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexActionRequest.java
new file mode 100644
index 00000000000..889cf7a130c
--- /dev/null
+++ b/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexActionRequest.java
@@ -0,0 +1,72 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.search.action;
+
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.sonar.core.cluster.ClusterAction;
+import org.sonar.server.search.Index;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class IndexActionRequest implements ClusterAction<List<ActionRequest>> {
+
+ protected final String indexType;
+ private Index index;
+
+ public IndexActionRequest(String indexType) {
+ super();
+ this.indexType = indexType;
+ }
+
+ public abstract String getKey();
+
+ public abstract Class<?> getPayloadClass();
+
+ public String getIndexType() {
+ return indexType;
+ }
+
+
+ public void setIndex(Index index) {
+ this.index = index;
+ }
+
+ @Override
+ public final List<ActionRequest> call() throws Exception {
+ if (index == null) {
+ throw new IllegalStateException("Cannot execute request - Index is null");
+ }
+ List<ActionRequest> finalRequests = new ArrayList<ActionRequest>();
+ for (ActionRequest request : doCall(index)) {
+ if (request.getClass().isAssignableFrom(UpdateRequest.class)) {
+ ((UpdateRequest) request)
+ .type(index.getIndexType())
+ .index(index.getIndexName());
+ }
+ finalRequests.add(request);
+ }
+ return finalRequests;
+ }
+
+ public abstract List<ActionRequest> doCall(Index index) throws Exception;
+}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexAction.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertDTO.java
index c2001b4257c..1e31a35d788 100644
--- a/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexAction.java
+++ b/server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertDTO.java
@@ -19,47 +19,33 @@
*/
package org.sonar.server.search.action;
-import org.sonar.core.cluster.QueueAction;
+import org.elasticsearch.action.ActionRequest;
+import org.sonar.core.persistence.Dto;
import org.sonar.server.search.Index;
-public abstract class IndexAction extends QueueAction {
+import java.util.List;
+public class UpsertDto<DTO extends Dto> extends IndexActionRequest {
- public abstract Class<?> getPayloadClass();
+ private final DTO dto;
- public enum Method {
- UPSERT, DELETE
+ public UpsertDto(String indexType, DTO dto) {
+ super(indexType);
+ this.dto = dto;
}
- protected String indexType;
- protected Method method;
- protected Index index;
-
- public IndexAction(String indexType, Method method) {
- super();
- this.indexType = indexType;
- this.method = method;
- }
-
- public abstract String getKey();
-
- public Method getMethod() {
- return this.method;
- }
-
- public String getIndexType() {
- return indexType;
- }
-
- public void setMethod(Method method) {
- this.method = method;
+ @Override
+ public String getKey() {
+ return dto.getKey().toString();
}
- public void setIndex(Index index) {
- this.index = index;
+ @Override
+ public Class<?> getPayloadClass() {
+ return dto.getClass();
}
- public Index getIndex() {
- return index;
+ @Override
+ public List<ActionRequest> doCall(Index index) throws Exception {
+ return index.getNormalizer().normalize(dto);
}
}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertNestedItem.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertNestedItem.java
new file mode 100644
index 00000000000..e87fe604e86
--- /dev/null
+++ b/server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertNestedItem.java
@@ -0,0 +1,66 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.search.action;
+
+import org.elasticsearch.action.ActionRequest;
+import org.sonar.server.search.Index;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class UpsertNestedItem<K extends Serializable> extends IndexActionRequest {
+
+
+ private final K key;
+ private final Object item;
+ private final Object[] items;
+
+ public UpsertNestedItem(String indexType, K key, Object item, Object... items) {
+ super(indexType);
+ this.key = key;
+ this.item = item;
+ this.items = items;
+ }
+
+ @Override
+ public String getKey() {
+ return key.toString();
+ }
+
+ @Override
+ public Class<?> getPayloadClass() {
+ return item.getClass();
+ }
+
+ @Override
+ public List<ActionRequest> doCall(Index index) throws Exception {
+ List<ActionRequest> updates = new ArrayList<ActionRequest>();
+ updates.addAll(normalizeItem(index, item, key));
+ for (Object otherItem : items) {
+ updates.addAll(normalizeItem(index, otherItem, key));
+ }
+ return updates;
+ }
+
+ private List<ActionRequest> normalizeItem(Index index, Object item, K key) {
+ return index.getNormalizer().normalizeNested(item, key);
+ }
+}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/es/ListUpdate.java b/server/sonar-server/src/main/java/org/sonar/server/search/es/ListUpdate.java
deleted file mode 100644
index 6de0df6a978..00000000000
--- a/server/sonar-server/src/main/java/org/sonar/server/search/es/ListUpdate.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.search.es;
-
-import com.google.common.collect.ImmutableSet;
-import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.xcontent.support.XContentMapValues;
-import org.elasticsearch.script.AbstractExecutableScript;
-import org.elasticsearch.script.ExecutableScript;
-import org.elasticsearch.script.NativeScriptFactory;
-
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * @since 4.4
- */
-public class ListUpdate extends AbstractExecutableScript {
-
- public static final String NAME = "listUpdate";
-
- public static final String ID_FIELD = "idField";
- public static final String ID_VALUE = "idValue";
- public static final String FIELD = "field";
- public static final String VALUE = "value";
-
- public static class UpdateListScriptFactory implements NativeScriptFactory {
- @Override
- public ExecutableScript newScript(@Nullable Map<String, Object> params) {
- String idField = XContentMapValues.nodeStringValue(params.get(ID_FIELD), null);
- String idValue = XContentMapValues.nodeStringValue(params.get(ID_VALUE), null);
- String field = XContentMapValues.nodeStringValue(params.get(FIELD), null);
- Map value = null;
- if (idField == null) {
- throw new IllegalStateException("Missing '" + ID_FIELD + "' parameter");
- }
- if (idValue == null) {
- throw new IllegalStateException("Missing '" + ID_VALUE + "' parameter");
- }
- if (field == null) {
- throw new IllegalStateException("Missing '" + FIELD + "' parameter");
- }
-
- //NULL case is deletion of nested item
- if (params.containsKey(VALUE)) {
- Object obj = params.get(VALUE);
- if (obj != null) {
- value = XContentMapValues.nodeMapValue(params.get(VALUE), "Update item");
- }
- }
-
- return new ListUpdate(idField, idValue, field, value);
- }
- }
-
-
- private final String idField;
- private final String idValue;
- private final String field;
- private final Map<String, Object> value;
-
- private Map<String, Object> ctx;
-
- public ListUpdate(String idField, String idValue, String field, Map<String, Object> value) {
- this.idField = idField;
- this.idValue = idValue;
- this.field = field;
- this.value = value;
- }
-
- @Override
- public void setNextVar(String name, Object value) {
- if (name.equals("ctx")) {
- ctx = (Map<String, Object>) value;
- }
- }
-
- @Override
- public Object unwrap(Object value) {
- return value;
- }
-
- @Override
- public Object run() {
- try {
- //Get the Document's source from ctx
- Map<String, Object> source = XContentMapValues.nodeMapValue(ctx.get("_source"), "source from context");
-
- //Get the Object for list update
- Object fieldValue = source.get(field);
-
- if (fieldValue == null && value != null) {
- // 0. The field does not exist (this is a upsert then)
- source.put(field, value);
- } else if (!XContentMapValues.isArray(fieldValue) && value != null) {
- // 1. The field is not yet a list
- Map currentFieldValue = XContentMapValues.nodeMapValue(fieldValue, "current FieldValue");
- if (XContentMapValues.nodeStringValue(currentFieldValue.get(idField), null).equals(idValue)) {
- source.put(field, value);
- } else {
- source.put(field, ImmutableSet.of(fieldValue, value));
- }
- } else {
- // 3. field is a list
- Collection items = (Collection) fieldValue;
- Object target = null;
- for (Object item : items) {
- Map<String, Object> fields = (Map<String, Object>) item;
- String itemIdValue = XContentMapValues.nodeStringValue(fields.get(idField), null);
- if (itemIdValue != null && itemIdValue.equals(idValue)) {
- target = item;
- break;
- }
- }
- if (target != null) {
- items.remove(target);
- }
-
- //Supporting the update by NULL = deletion case
- if (value != null) {
- items.add(value);
- }
- source.put(field, items);
- }
- } catch (Exception e) {
- throw new IllegalStateException("failed to execute listUpdate script", e);
- }
- return null;
-
- }
-}
diff --git a/server/sonar-server/src/test/java/org/sonar/server/search/BaseIndexTest.java b/server/sonar-server/src/test/java/org/sonar/server/search/BaseIndexTest.java
index 92daecaef82..e92fd5b936f 100644
--- a/server/sonar-server/src/test/java/org/sonar/server/search/BaseIndexTest.java
+++ b/server/sonar-server/src/test/java/org/sonar/server/search/BaseIndexTest.java
@@ -29,7 +29,6 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.sonar.api.config.Settings;
-import org.sonar.core.cluster.NullQueue;
import org.sonar.process.MonitoredProcess;
import org.sonar.process.NetworkUtils;
import org.sonar.process.Props;
@@ -107,7 +106,7 @@ public class BaseIndexTest {
private BaseIndex getIndex(final SearchClient searchClient) {
BaseIndex index = new BaseIndex(
IndexDefinition.TEST,
- null, new NullQueue(), searchClient) {
+ null, searchClient) {
@Override
protected String getKeyValue(Serializable key) {
return null;
diff --git a/sonar-core/pom.xml b/sonar-core/pom.xml
index bad0f4f5503..34c5d976e5b 100644
--- a/sonar-core/pom.xml
+++ b/sonar-core/pom.xml
@@ -184,6 +184,11 @@
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>1.1.2</version>
+ </dependency>
</dependencies>
<build>
diff --git a/sonar-core/src/main/java/org/sonar/core/cluster/QueueAction.java b/sonar-core/src/main/java/org/sonar/core/cluster/ClusterAction.java
index f83078d3bb5..bac9518865f 100644
--- a/sonar-core/src/main/java/org/sonar/core/cluster/QueueAction.java
+++ b/sonar-core/src/main/java/org/sonar/core/cluster/ClusterAction.java
@@ -19,27 +19,10 @@
*/
package org.sonar.core.cluster;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Callable;
-public abstract class QueueAction implements Runnable {
-
- protected CountDownLatch latch;
-
- public QueueAction() {
- this.latch = null;
- }
-
- public void setLatch(CountDownLatch latch){
- this.latch = latch;
- }
-
- public abstract void doExecute();
+public interface ClusterAction<K> extends Callable<K> {
@Override
- public void run(){
- this.doExecute();
- if (latch != null){
- latch.countDown();
- }
- }
+ public K call() throws Exception;
}
diff --git a/sonar-core/src/main/java/org/sonar/core/cluster/NullQueue.java b/sonar-core/src/main/java/org/sonar/core/cluster/NullQueue.java
index 235d2078fef..cccdf7033bc 100644
--- a/sonar-core/src/main/java/org/sonar/core/cluster/NullQueue.java
+++ b/sonar-core/src/main/java/org/sonar/core/cluster/NullQueue.java
@@ -21,15 +21,10 @@ package org.sonar.core.cluster;
import java.util.List;
-public class NullQueue implements WorkQueue<QueueAction> {
+public class NullQueue implements WorkQueue<ClusterAction> {
@Override
- public void enqueue(QueueAction action) {
- // do nothing
- }
-
- @Override
- public void enqueue(List<QueueAction> actions) {
+ public void enqueue(List<ClusterAction> actions) {
// do nothing
}
}
diff --git a/sonar-core/src/main/java/org/sonar/core/cluster/WorkQueue.java b/sonar-core/src/main/java/org/sonar/core/cluster/WorkQueue.java
index 9b4141f510d..9d5cf998564 100644
--- a/sonar-core/src/main/java/org/sonar/core/cluster/WorkQueue.java
+++ b/sonar-core/src/main/java/org/sonar/core/cluster/WorkQueue.java
@@ -20,10 +20,9 @@
package org.sonar.core.cluster;
import java.util.List;
+import java.util.concurrent.Callable;
-public interface WorkQueue<K extends QueueAction> {
-
- void enqueue(K action);
+public interface WorkQueue<K extends Callable> {
void enqueue(List<K> actions);
diff --git a/sonar-core/src/main/java/org/sonar/core/persistence/DbSession.java b/sonar-core/src/main/java/org/sonar/core/persistence/DbSession.java
index 10e82b803cc..b663b22e35e 100644
--- a/sonar-core/src/main/java/org/sonar/core/persistence/DbSession.java
+++ b/sonar-core/src/main/java/org/sonar/core/persistence/DbSession.java
@@ -24,7 +24,7 @@ import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.apache.ibatis.session.SqlSession;
-import org.sonar.core.cluster.QueueAction;
+import org.sonar.core.cluster.ClusterAction;
import org.sonar.core.cluster.WorkQueue;
import java.sql.Connection;
@@ -34,8 +34,8 @@ import java.util.Map;
public class DbSession implements SqlSession {
- private static final Integer IMPLICIT_COMMIT_SIZE = 200;
- private List<QueueAction> actions;
+ private static final Integer IMPLICIT_COMMIT_SIZE = 1000;
+ private List<ClusterAction> actions;
private WorkQueue queue;
private SqlSession session;
@@ -43,10 +43,10 @@ public class DbSession implements SqlSession {
DbSession(WorkQueue queue, SqlSession session) {
this.session = session;
this.queue = queue;
- this.actions = new ArrayList<QueueAction>();
+ this.actions = new ArrayList<ClusterAction>();
}
- public void enqueue(QueueAction action) {
+ public void enqueue(ClusterAction action) {
this.actions.add(action);
if (this.actions.size() > IMPLICIT_COMMIT_SIZE) {
this.commit();