diff options
author | Stephane Gamard <stephane.gamard@searchbox.com> | 2014-08-20 16:22:33 +0200 |
---|---|---|
committer | Stephane Gamard <stephane.gamard@searchbox.com> | 2014-08-20 18:18:38 +0200 |
commit | 8e32f1abcb4858f867936a44d66a7030324ae828 (patch) | |
tree | f1957d7a300ad43ef5947bab9502f8b9dd643d3f | |
parent | 008c1d58cc8e0b6fa077f85a18148233db289a04 (diff) | |
download | sonarqube-8e32f1abcb4858f867936a44d66a7030324ae828.tar.gz sonarqube-8e32f1abcb4858f867936a44d66a7030324ae828.zip |
Updated indexQueue mechanism for performance (no compression yet).
28 files changed, 446 insertions, 624 deletions
@@ -1,6 +1,6 @@ #!/bin/sh -mvn clean install -DskipTests -pl :sonar-process -amd +mvn clean install -DskipTests -pl :sonar-core,sonar-search -amd if [[ "$OSTYPE" == "darwin"* ]]; then OS='macosx-universal-64' diff --git a/server/sonar-search/src/main/resources/org/sonar/search/logback.xml b/server/sonar-search/src/main/resources/org/sonar/search/logback.xml index 5f714eec3e7..648ece82e28 100644 --- a/server/sonar-search/src/main/resources/org/sonar/search/logback.xml +++ b/server/sonar-search/src/main/resources/org/sonar/search/logback.xml @@ -28,7 +28,7 @@ </appender> <root> - <level value="DEBUG"/> + <level value="INFO"/> <appender-ref ref="LOGFILE"/> </root> diff --git a/server/sonar-server/pom.xml b/server/sonar-server/pom.xml index e1b8f74bb22..4790413b37c 100644 --- a/server/sonar-server/pom.xml +++ b/server/sonar-server/pom.xml @@ -243,7 +243,6 @@ <groupId>org.codehaus.sonar</groupId> <artifactId>sonar-search</artifactId> <version>${project.version}</version> - <scope>test</scope> </dependency> </dependencies> diff --git a/server/sonar-server/src/main/java/org/sonar/server/activity/index/ActivityIndex.java b/server/sonar-server/src/main/java/org/sonar/server/activity/index/ActivityIndex.java index 01ffc420f82..73c538333c5 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/activity/index/ActivityIndex.java +++ b/server/sonar-server/src/main/java/org/sonar/server/activity/index/ActivityIndex.java @@ -33,13 +33,12 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.sort.SortOrder; import org.sonar.core.activity.Activity; import org.sonar.core.activity.db.ActivityDto; -import org.sonar.core.cluster.WorkQueue; import org.sonar.server.search.BaseIndex; -import org.sonar.server.search.SearchClient; import org.sonar.server.search.IndexDefinition; import org.sonar.server.search.IndexField; import org.sonar.server.search.QueryOptions; import org.sonar.server.search.Result; +import org.sonar.server.search.SearchClient; import javax.annotation.Nullable; import java.io.IOException; @@ -51,8 +50,8 @@ import java.util.Map; */ public class ActivityIndex extends BaseIndex<Activity, ActivityDto, String> { - public ActivityIndex(ActivityNormalizer normalizer, WorkQueue workQueue, SearchClient node) { - super(IndexDefinition.LOG, normalizer, workQueue, node); + public ActivityIndex(ActivityNormalizer normalizer, SearchClient node) { + super(IndexDefinition.LOG, normalizer, node); } @Override diff --git a/server/sonar-server/src/main/java/org/sonar/server/db/BaseDao.java b/server/sonar-server/src/main/java/org/sonar/server/db/BaseDao.java index 5637b742d43..214ae8d49db 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/db/BaseDao.java +++ b/server/sonar-server/src/main/java/org/sonar/server/db/BaseDao.java @@ -26,10 +26,10 @@ import org.sonar.core.persistence.DbSession; import org.sonar.core.persistence.Dto; import org.sonar.server.exceptions.NotFoundException; import org.sonar.server.search.IndexDefinition; -import org.sonar.server.search.action.DtoIndexAction; -import org.sonar.server.search.action.EmbeddedIndexAction; -import org.sonar.server.search.action.IndexAction; -import org.sonar.server.search.action.KeyIndexAction; +import org.sonar.server.search.action.DeleteKey; +import org.sonar.server.search.action.DeleteNestedItem; +import org.sonar.server.search.action.UpsertDto; +import org.sonar.server.search.action.UpsertNestedItem; import javax.annotation.CheckForNull; import javax.annotation.Nullable; @@ -176,7 +176,7 @@ public abstract class BaseDao<M, E extends Dto<K>, K extends Serializable> imple item.setUpdatedAt(now); doUpdate(session, item); if (hasIndex()) { - session.enqueue(new DtoIndexAction<E>(getIndexType(), IndexAction.Method.UPSERT, item)); + session.enqueue(new UpsertDto(getIndexType(), item)); } } catch (Exception e) { throw new IllegalStateException("Fail to update item in db: " + item, e); @@ -216,7 +216,7 @@ public abstract class BaseDao<M, E extends Dto<K>, K extends Serializable> imple try { doInsert(session, item); if (hasIndex()) { - session.enqueue(new DtoIndexAction<E>(getIndexType(), IndexAction.Method.UPSERT, item)); + session.enqueue(new UpsertDto<E>(getIndexType(), item)); } } catch (Exception e) { throw new IllegalStateException("Fail to insert item in db: " + item, e.getCause()); @@ -249,7 +249,7 @@ public abstract class BaseDao<M, E extends Dto<K>, K extends Serializable> imple try { doDeleteByKey(session, key); if (hasIndex()) { - session.enqueue(new KeyIndexAction<K>(getIndexType(), IndexAction.Method.DELETE, key)); + session.enqueue(new DeleteKey<K>(getIndexType(), key)); } } catch (Exception e) { throw new IllegalStateException("Fail to delete item from db: " + key, e); @@ -258,31 +258,29 @@ public abstract class BaseDao<M, E extends Dto<K>, K extends Serializable> imple protected final void enqueueUpdate(Object nestedItem, K key, DbSession session) { if (hasIndex()) { - session.enqueue(new EmbeddedIndexAction<K>( - this.getIndexType(), IndexAction.Method.UPSERT, key, nestedItem)); + session.enqueue(new UpsertNestedItem<K>( + this.getIndexType(), key, nestedItem)); } } public void enqueueDelete(Object nestedItem, K key, DbSession session) { if (hasIndex()) { - session.enqueue(new EmbeddedIndexAction<K>( - this.getIndexType(), IndexAction.Method.DELETE, key, nestedItem)); + session.enqueue(new DeleteNestedItem<K>( + this.getIndexType(), key, nestedItem)); session.commit(); } } public void enqueueInsert(Object nestedItem, K key, DbSession session) { if (hasIndex()) { - session.enqueue(new EmbeddedIndexAction<K>( - this.getIndexType(), IndexAction.Method.UPSERT, key, nestedItem)); + this.enqueueUpdate(nestedItem, key, session); } } @Override public final void synchronizeAfter(final DbSession session, Date date) { for (E dto : this.findAfterDate(session, date)) { - session.enqueue(new DtoIndexAction<E>(getIndexType(), IndexAction.Method.UPSERT, - dto)); + session.enqueue(new UpsertDto<E>(getIndexType(), dto)); } session.commit(); } diff --git a/server/sonar-server/src/main/java/org/sonar/server/platform/ServerComponents.java b/server/sonar-server/src/main/java/org/sonar/server/platform/ServerComponents.java index 4cfce8e9a81..fafdd32b722 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/platform/ServerComponents.java +++ b/server/sonar-server/src/main/java/org/sonar/server/platform/ServerComponents.java @@ -220,7 +220,7 @@ class ServerComponents { ActiveRuleNormalizer.class, RuleIndex.class, ActiveRuleIndex.class, - IndexQueueWorker.class, + //IndexQueueWorker.class, IndexClient.class, ActivityNormalizer.class, ActivityIndex.class, diff --git a/server/sonar-server/src/main/java/org/sonar/server/qualityprofile/index/ActiveRuleIndex.java b/server/sonar-server/src/main/java/org/sonar/server/qualityprofile/index/ActiveRuleIndex.java index e71647d3415..b12c5f8d8ca 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/qualityprofile/index/ActiveRuleIndex.java +++ b/server/sonar-server/src/main/java/org/sonar/server/qualityprofile/index/ActiveRuleIndex.java @@ -33,16 +33,15 @@ import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.sonar.api.rule.RuleKey; import org.sonar.api.rule.RuleStatus; -import org.sonar.core.cluster.WorkQueue; import org.sonar.core.qualityprofile.db.ActiveRuleDto; import org.sonar.core.qualityprofile.db.ActiveRuleKey; import org.sonar.server.qualityprofile.ActiveRule; import org.sonar.server.rule.index.RuleNormalizer; import org.sonar.server.search.BaseIndex; -import org.sonar.server.search.SearchClient; import org.sonar.server.search.FacetValue; import org.sonar.server.search.IndexDefinition; import org.sonar.server.search.IndexField; +import org.sonar.server.search.SearchClient; import java.io.IOException; import java.util.ArrayList; @@ -52,8 +51,8 @@ import java.util.Map; public class ActiveRuleIndex extends BaseIndex<ActiveRule, ActiveRuleDto, ActiveRuleKey> { - public ActiveRuleIndex(ActiveRuleNormalizer normalizer, WorkQueue workQueue, SearchClient node) { - super(IndexDefinition.ACTIVE_RULE, normalizer, workQueue, node); + public ActiveRuleIndex(ActiveRuleNormalizer normalizer, SearchClient node) { + super(IndexDefinition.ACTIVE_RULE, normalizer, node); } @Override diff --git a/server/sonar-server/src/main/java/org/sonar/server/qualityprofile/index/ActiveRuleNormalizer.java b/server/sonar-server/src/main/java/org/sonar/server/qualityprofile/index/ActiveRuleNormalizer.java index 8ece6b6eca8..3f593744058 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/qualityprofile/index/ActiveRuleNormalizer.java +++ b/server/sonar-server/src/main/java/org/sonar/server/qualityprofile/index/ActiveRuleNormalizer.java @@ -28,13 +28,13 @@ import org.sonar.core.qualityprofile.db.ActiveRuleDto; import org.sonar.core.qualityprofile.db.ActiveRuleKey; import org.sonar.core.qualityprofile.db.ActiveRuleParamDto; import org.sonar.core.qualityprofile.db.QualityProfileDto; +import org.sonar.search.script.ListUpdate; import org.sonar.server.db.DbClient; import org.sonar.server.qualityprofile.ActiveRule; import org.sonar.server.search.BaseNormalizer; import org.sonar.server.search.IndexDefinition; import org.sonar.server.search.IndexField; import org.sonar.server.search.Indexable; -import org.sonar.server.search.es.ListUpdate; import java.lang.reflect.Field; import java.util.ArrayList; diff --git a/server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleIndex.java b/server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleIndex.java index a80f4d13ef6..53b6ca26d24 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleIndex.java +++ b/server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleIndex.java @@ -44,16 +44,15 @@ import org.elasticsearch.search.sort.SortOrder; import org.sonar.api.rule.RuleKey; import org.sonar.api.rule.RuleStatus; import org.sonar.api.server.debt.DebtCharacteristic; -import org.sonar.core.cluster.WorkQueue; import org.sonar.core.rule.RuleDto; import org.sonar.server.qualityprofile.index.ActiveRuleNormalizer; import org.sonar.server.rule.Rule; import org.sonar.server.search.BaseIndex; -import org.sonar.server.search.SearchClient; import org.sonar.server.search.IndexDefinition; import org.sonar.server.search.IndexField; import org.sonar.server.search.QueryOptions; import org.sonar.server.search.Result; +import org.sonar.server.search.SearchClient; import javax.annotation.CheckForNull; import java.io.IOException; @@ -69,8 +68,8 @@ import static com.google.common.collect.Lists.newArrayList; public class RuleIndex extends BaseIndex<Rule, RuleDto, RuleKey> { - public RuleIndex(RuleNormalizer normalizer, WorkQueue workQueue, SearchClient node) { - super(IndexDefinition.RULE, normalizer, workQueue, node); + public RuleIndex(RuleNormalizer normalizer, SearchClient client) { + super(IndexDefinition.RULE, normalizer, client); } protected String getKeyValue(RuleKey key) { diff --git a/server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleNormalizer.java b/server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleNormalizer.java index 9071d30faf2..3a347255d7d 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleNormalizer.java +++ b/server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleNormalizer.java @@ -33,12 +33,12 @@ import org.sonar.core.rule.RuleDto; import org.sonar.core.rule.RuleParamDto; import org.sonar.core.technicaldebt.db.CharacteristicDto; import org.sonar.markdown.Markdown; +import org.sonar.search.script.ListUpdate; import org.sonar.server.db.DbClient; import org.sonar.server.search.BaseNormalizer; import org.sonar.server.search.IndexDefinition; import org.sonar.server.search.IndexField; import org.sonar.server.search.Indexable; -import org.sonar.server.search.es.ListUpdate; import java.lang.reflect.Field; import java.util.ArrayList; diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java b/server/sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java index 5abddd2db69..f4baaf004b8 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java +++ b/server/sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java @@ -23,19 +23,13 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Multimap; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; -import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.count.CountRequestBuilder; import org.elasticsearch.action.count.CountResponse; -import org.elasticsearch.action.delete.DeleteRequestBuilder; -import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequestBuilder; import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequestBuilder; -import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.BoolFilterBuilder; @@ -53,15 +47,19 @@ import org.elasticsearch.search.aggregations.metrics.valuecount.InternalValueCou import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.sonar.core.cluster.WorkQueue; import org.sonar.core.persistence.Dto; import javax.annotation.Nullable; - import java.io.IOException; import java.io.Serializable; -import java.util.*; -import java.util.concurrent.ExecutionException; +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Queue; public abstract class BaseIndex<DOMAIN, DTO extends Dto<KEY>, KEY extends Serializable> implements Index<DOMAIN, DTO, KEY> { @@ -72,13 +70,16 @@ public abstract class BaseIndex<DOMAIN, DTO extends Dto<KEY>, KEY extends Serial private final BaseNormalizer<DTO, KEY> normalizer; private final IndexDefinition indexDefinition; - protected BaseIndex(IndexDefinition indexDefinition, BaseNormalizer<DTO, KEY> normalizer, - WorkQueue workQueue, SearchClient client) { + protected BaseIndex(IndexDefinition indexDefinition, BaseNormalizer<DTO, KEY> normalizer, SearchClient client) { this.normalizer = normalizer; this.client = client; this.indexDefinition = indexDefinition; } + public BaseNormalizer<DTO, KEY> getNormalizer() { + return normalizer; + } + @Override public final String getIndexName() { return this.indexDefinition.getIndexName(); @@ -390,105 +391,6 @@ public abstract class BaseIndex<DOMAIN, DTO extends Dto<KEY>, KEY extends Serial return null; } - protected void updateDocument(Collection<UpdateRequest> requests, KEY key) { - LOG.debug("UPDATE _id:{} in index {}", key, this.getIndexName()); - BulkRequestBuilder bulkRequest = client.prepareBulk(); - for (UpdateRequest request : requests) { - // if request has no ID then no upsert possible! - if (request.id() == null || request.id().isEmpty()) { - bulkRequest.add(new IndexRequest() - .source(request.doc().sourceAsMap()) - .type(this.getIndexType()) - .index(this.getIndexName())); - } else { - bulkRequest.add(request - .id(this.getKeyValue(key)) - .index(this.getIndexName()) - .type(this.getIndexType())); - } - } - BulkResponse response = client.execute(bulkRequest); - } - - @Override - public void upsert(KEY key, Object object, Object... objects) throws Exception { - long t0 = System.currentTimeMillis(); - List<UpdateRequest> requests = this.normalizer.normalizeNested(object, key); - for (Object additionalObject : objects) { - requests.addAll(this.normalizer.normalizeNested(additionalObject, key)); - } - long t1 = System.currentTimeMillis(); - this.updateDocument(requests, key); - long t2 = System.currentTimeMillis(); - LOG.debug("UPSERT [object] time:{}ms ({}ms normalize, {}ms elastic)", - t2 - t0, t1 - t0, t2 - t1); - } - - @Override - public void upsert(DTO item, DTO... items) { - try { - long t0 = System.currentTimeMillis(); - List<UpdateRequest> requests = normalizer.normalize(item); - for (DTO additionalItem : items) { - requests.addAll(normalizer.normalize(additionalItem)); - } - long t1 = System.currentTimeMillis(); - this.updateDocument(requests, item.getKey()); - long t2 = System.currentTimeMillis(); - LOG.debug("UPSERT [dto] time:{}ms ({}ms normalize, {}ms elastic)", - t2 - t0, t1 - t0, t2 - t1); - } catch (Exception e) { - LOG.error("Could not update document for index {}: {}", - this.getIndexName(), e.getMessage(), e); - } - } - - private void deleteDocument(KEY key) throws ExecutionException, InterruptedException { - LOG.debug("DELETE _id:{} in index {}", key, this.getIndexName()); - DeleteRequestBuilder request = client - .prepareDelete() - .setIndex(this.getIndexName()) - .setType(this.getIndexType()) - .setId(this.getKeyValue(key)); - DeleteResponse response = client.execute(request); - } - - @Override - public void delete(KEY key, Object object, Object... objects) throws Exception { - LOG.debug("DELETE NESTED _id:{} in index {}", key, this.getIndexName()); - List<UpdateRequest> requests = this.normalizer.deleteNested(object, key); - for (Object additionalObject : objects) { - requests.addAll(this.normalizer.deleteNested(additionalObject, key)); - } - this.updateDocument(requests, key); - } - - @Override - public void deleteByKey(KEY key, KEY... keys) { - try { - this.deleteDocument(key); - for (KEY additionalKey : keys) { - this.deleteDocument(additionalKey); - } - } catch (Exception e) { - throw new IllegalStateException(String.format("Could not DELETE _id = '%s' for index '%s", - getKeyValue(key), getIndexName()), e); - } - } - - @Override - public void deleteByDto(DTO item, DTO... items) { - try { - this.deleteDocument(item.getKey()); - for (DTO additionalItem : items) { - this.deleteDocument(additionalItem.getKey()); - } - } catch (Exception e) { - throw new IllegalStateException(String.format("Could not DELETE _id = '%s' for index '%s", - getKeyValue(item.getKey()), getIndexName()), e); - } - } - /* ES QueryHelper Methods */ protected BoolFilterBuilder addTermFilter(BoolFilterBuilder filter, String field, @Nullable Collection<String> values) { diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/Index.java b/server/sonar-server/src/main/java/org/sonar/server/search/Index.java index 08e8836c564..13a7a935c4e 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/search/Index.java +++ b/server/sonar-server/src/main/java/org/sonar/server/search/Index.java @@ -40,19 +40,11 @@ public interface Index<DOMAIN, DTO extends Dto<KEY>, KEY extends Serializable> e void refresh(); - void upsert(KEY key, Object object, Object... objects) throws Exception; - - void upsert(DTO dto, DTO... dtos); - - void delete(KEY key, Object object, Object... objects) throws Exception; - - void deleteByKey(KEY key, KEY... keys); - - void deleteByDto(DTO dto, DTO... dtos); - Date getLastSynchronization(); IndexStat getIndexStat(); Iterator<DOMAIN> scroll(String scrollId); + + BaseNormalizer<DTO, KEY> getNormalizer(); } diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/IndexQueue.java b/server/sonar-server/src/main/java/org/sonar/server/search/IndexQueue.java index 81cfdf6092f..89ddcb752d5 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/search/IndexQueue.java +++ b/server/sonar-server/src/main/java/org/sonar/server/search/IndexQueue.java @@ -19,154 +19,201 @@ */ package org.sonar.server.search; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import org.apache.commons.lang.StringUtils; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.update.UpdateRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.sonar.api.ServerComponent; import org.sonar.api.config.Settings; +import org.sonar.api.platform.ComponentContainer; import org.sonar.core.cluster.WorkQueue; import org.sonar.core.profiling.Profiling; -import org.sonar.core.profiling.StopWatch; -import org.sonar.server.search.action.EmbeddedIndexAction; -import org.sonar.server.search.action.IndexAction; +import org.sonar.server.search.action.IndexActionRequest; -import java.util.HashSet; +import java.util.HashMap; import java.util.List; -import java.util.Set; -import java.util.concurrent.CountDownLatch; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; public class IndexQueue extends LinkedBlockingQueue<Runnable> - implements ServerComponent, WorkQueue<IndexAction> { + implements ServerComponent, WorkQueue<IndexActionRequest> { protected final Profiling profiling; + private final SearchClient searchClient; + private final ComponentContainer container; + private static final Logger LOGGER = LoggerFactory.getLogger(IndexQueue.class); private static final Integer DEFAULT_QUEUE_SIZE = 200; private static final int TIMEOUT = 30000; - public IndexQueue(Settings settings) { + public IndexQueue(Settings settings, SearchClient searchClient, ComponentContainer container) { super(DEFAULT_QUEUE_SIZE); + this.searchClient = searchClient; + this.container = container; this.profiling = new Profiling(settings); } @Override - public void enqueue(IndexAction action) { - this.enqueue(ImmutableList.of(action)); - } - - @Override - public void enqueue(List<IndexAction> actions) { - - - int bcount = 0; - int ecount = 0; - List<String> refreshes = Lists.newArrayList(); - Set<String> types = Sets.newHashSet(); - long all_start = System.currentTimeMillis(); - long indexTime; - long refreshTime; - long embeddedTime; - - if (actions.size() == 1) { - /* Atomic update here */ - CountDownLatch latch = new CountDownLatch(1); - IndexAction action = actions.get(0); - action.setLatch(latch); - try { - indexTime = System.currentTimeMillis(); - this.offer(action, TIMEOUT, TimeUnit.MILLISECONDS); - if (!latch.await(TIMEOUT, TimeUnit.MILLISECONDS)) { - throw new IllegalStateException("ES update could not be completed within: " + TIMEOUT + "ms"); - } - bcount++; - indexTime = System.currentTimeMillis() - indexTime; - // refresh the index. - Index<?, ?, ?> index = action.getIndex(); - if (index != null) { - refreshTime = System.currentTimeMillis(); - index.refresh(); - refreshTime = System.currentTimeMillis() - refreshTime; - refreshes.add(index.getIndexName()); - } - types.add(action.getPayloadClass().getSimpleName()); - } catch (InterruptedException e) { - throw new IllegalStateException("ES update has been interrupted", e); - } - } else if (actions.size() > 1) { - StopWatch basicProfile = profiling.start("search", Profiling.Level.BASIC); - - /* Purge actions that would be overridden */ - Long purgeStart = System.currentTimeMillis(); - List<IndexAction> itemActions = Lists.newArrayList(); - List<IndexAction> embeddedActions = Lists.newArrayList(); - - for (IndexAction action : actions) { - if (action.getClass().isAssignableFrom(EmbeddedIndexAction.class)) { - embeddedActions.add(action); - } else { - itemActions.add(action); - } - } + public void enqueue(List<IndexActionRequest> actions) { - LOGGER.debug("INDEX - compressed {} items into {} in {}ms,", - actions.size(), itemActions.size() + embeddedActions.size(), System.currentTimeMillis() - purgeStart); + if (actions.isEmpty()) { + return; + } + try { - try { - /* execute all item actions */ - CountDownLatch itemLatch = new CountDownLatch(itemActions.size()); - indexTime = System.currentTimeMillis(); - for (IndexAction action : itemActions) { - action.setLatch(itemLatch); - this.offer(action, TIMEOUT, TimeUnit.MILLISECONDS); - types.add(action.getPayloadClass().getSimpleName()); - bcount++; + BulkRequestBuilder bulkRequestBuilder = new BulkRequestBuilder(searchClient); + Map<String,Index> indexes = getIndexMap(); + for (IndexActionRequest action : actions) { + action.setIndex(indexes.get(action.getIndexType())); + } - } - if (!itemLatch.await(TIMEOUT, TimeUnit.MILLISECONDS)) { - throw new IllegalStateException("ES update could not be completed within: " + TIMEOUT + "ms"); - } - indexTime = System.currentTimeMillis() - indexTime; - - /* and now push the embedded */ - CountDownLatch embeddedLatch = new CountDownLatch(embeddedActions.size()); - embeddedTime = System.currentTimeMillis(); - for (IndexAction action : embeddedActions) { - action.setLatch(embeddedLatch); - this.offer(action, TIMEOUT, TimeUnit.SECONDS); - types.add(action.getPayloadClass().getSimpleName()); - ecount++; - } - if (!embeddedLatch.await(TIMEOUT, TimeUnit.MILLISECONDS)) { - throw new IllegalStateException("ES embedded update could not be completed within: " + TIMEOUT + "ms"); - } - embeddedTime = System.currentTimeMillis() - embeddedTime; - - /* Finally refresh affected indexes */ - Set<String> refreshedIndexes = new HashSet<String>(); - refreshTime = System.currentTimeMillis(); - for (IndexAction action : actions) { - if (action.getIndex() != null && - !refreshedIndexes.contains(action.getIndex().getIndexName())) { - refreshedIndexes.add(action.getIndex().getIndexName()); - action.getIndex().refresh(); - refreshes.add(action.getIndex().getIndexName()); + ExecutorService executorService = Executors.newFixedThreadPool(4); + + //invokeAll() blocks until ALL tasks submitted to executor complete + for (Future<List<ActionRequest>> updateRequests : executorService.invokeAll(actions)) { + for (ActionRequest update : updateRequests.get()) { + if (UpdateRequest.class.isAssignableFrom(update.getClass())) { + bulkRequestBuilder.add((UpdateRequest)update); + } else if (DeleteRequest.class.isAssignableFrom(update.getClass())) { + bulkRequestBuilder.add((DeleteRequest)update); + } else { + throw new IllegalStateException("Un-managed request type: " + update.getClass()); } } - refreshTime = System.currentTimeMillis() - refreshTime; - } catch (InterruptedException e) { - throw new IllegalStateException("ES update has been interrupted", e); } + executorService.shutdown(); - basicProfile.stop("INDEX - time:%sms (%sms index, %sms embedded, %sms refresh)\ttypes:[%s],\tbulk:%s\tembedded:%s\trefresh:[%s]", - (System.currentTimeMillis() - all_start), indexTime, embeddedTime, refreshTime, - StringUtils.join(types, ","), - bcount, ecount, StringUtils.join(refreshes, ",")); + LOGGER.info("Executing batch request of size: " + bulkRequestBuilder.numberOfActions()); + + //execute the request + BulkResponse response = searchClient.execute(bulkRequestBuilder.setRefresh(true)); + } catch (Exception e) { + e.printStackTrace(); } } + + private Map<String, Index> getIndexMap() { + Map<String, Index> indexes = new HashMap<String, Index>(); + for (Index index : container.getComponentsByType(Index.class)) { + indexes.put(index.getIndexType(), index); + } + return indexes; + } + + +// +// int bcount = 0; +// int ecount = 0; +// List<String> refreshes = Lists.newArrayList(); +// Set<String> types = Sets.newHashSet(); +// long all_start = System.currentTimeMillis(); +// long indexTime; +// long refreshTime; +// long embeddedTime; +// +// if (actions.size() == 1) { +// /* Atomic update here */ +// CountDownLatch latch = new CountDownLatch(1); +// IndexAction action = actions.get(0); +// action.setLatch(latch); +// try { +// indexTime = System.currentTimeMillis(); +// this.offer(action, TIMEOUT, TimeUnit.MILLISECONDS); +// if (!latch.await(TIMEOUT, TimeUnit.MILLISECONDS)) { +// throw new IllegalStateException("ES update could not be completed within: " + TIMEOUT + "ms"); +// } +// bcount++; +// indexTime = System.currentTimeMillis() - indexTime; +// // refresh the index. +// Index<?, ?, ?> index = action.getIndex(); +// if (index != null) { +// refreshTime = System.currentTimeMillis(); +// index.refresh(); +// refreshTime = System.currentTimeMillis() - refreshTime; +// refreshes.add(index.getIndexName()); +// } +// types.add(action.getPayloadClass().getSimpleName()); +// } catch (InterruptedException e) { +// throw new IllegalStateException("ES update has been interrupted", e); +// } +// } else if (actions.size() > 1) { +// StopWatch basicProfile = profiling.start("search", Profiling.Level.BASIC); +// +// /* Purge actions that would be overridden */ +// Long purgeStart = System.currentTimeMillis(); +// List<IndexAction> itemActions = Lists.newArrayList(); +// List<IndexAction> embeddedActions = Lists.newArrayList(); +// +// for (IndexAction action : actions) { +// if (action.getClass().isAssignableFrom(EmbeddedIndexAction.class)) { +// embeddedActions.add(action); +// } else { +// itemActions.add(action); +// } +// } +// +// LOGGER.debug("INDEX - compressed {} items into {} in {}ms,", +// actions.size(), itemActions.size() + embeddedActions.size(), System.currentTimeMillis() - purgeStart); +// +// try { +// /* execute all item actions */ +// CountDownLatch itemLatch = new CountDownLatch(itemActions.size()); +// indexTime = System.currentTimeMillis(); +// for (IndexAction action : itemActions) { +// action.setLatch(itemLatch); +// this.offer(action, TIMEOUT, TimeUnit.MILLISECONDS); +// types.add(action.getPayloadClass().getSimpleName()); +// bcount++; +// +// } +// if (!itemLatch.await(TIMEOUT, TimeUnit.MILLISECONDS)) { +// throw new IllegalStateException("ES update could not be completed within: " + TIMEOUT + "ms"); +// } +// indexTime = System.currentTimeMillis() - indexTime; +// +// /* and now push the embedded */ +// CountDownLatch embeddedLatch = new CountDownLatch(embeddedActions.size()); +// embeddedTime = System.currentTimeMillis(); +// for (IndexAction action : embeddedActions) { +// action.setLatch(embeddedLatch); +// this.offer(action, TIMEOUT, TimeUnit.SECONDS); +// types.add(action.getPayloadClass().getSimpleName()); +// ecount++; +// } +// if (!embeddedLatch.await(TIMEOUT, TimeUnit.MILLISECONDS)) { +// throw new IllegalStateException("ES embedded update could not be completed within: " + TIMEOUT + "ms"); +// } +// embeddedTime = System.currentTimeMillis() - embeddedTime; +// +// /* Finally refresh affected indexes */ +// Set<String> refreshedIndexes = new HashSet<String>(); +// refreshTime = System.currentTimeMillis(); +// for (IndexAction action : actions) { +// if (action.getIndex() != null && +// !refreshedIndexes.contains(action.getIndex().getIndexName())) { +// refreshedIndexes.add(action.getIndex().getIndexName()); +// action.getIndex().refresh(); +// refreshes.add(action.getIndex().getIndexName()); +// } +// } +// refreshTime = System.currentTimeMillis() - refreshTime; +// } catch (InterruptedException e) { +// throw new IllegalStateException("ES update has been interrupted", e); +// } +// +// basicProfile.stop("INDEX - time:%sms (%sms index, %sms embedded, %sms refresh)\ttypes:[%s],\tbulk:%s\tembedded:%s\trefresh:[%s]", +// (System.currentTimeMillis() - all_start), indexTime, embeddedTime, refreshTime, +// StringUtils.join(types, ","), +// bcount, ecount, StringUtils.join(refreshes, ",")); +// } + + } diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/IndexQueueWorker.java b/server/sonar-server/src/main/java/org/sonar/server/search/IndexQueueWorker.java deleted file mode 100644 index c2948b717d9..00000000000 --- a/server/sonar-server/src/main/java/org/sonar/server/search/IndexQueueWorker.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * SonarQube, open source software quality management tool. - * Copyright (C) 2008-2014 SonarSource - * mailto:contact AT sonarsource DOT com - * - * SonarQube is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 3 of the License, or (at your option) any later version. - * - * SonarQube is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ -package org.sonar.server.search; - -import org.picocontainer.Startable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.sonar.api.ServerComponent; -import org.sonar.server.search.action.IndexAction; - -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -public class IndexQueueWorker extends ThreadPoolExecutor - implements ServerComponent, Startable { - - private static final Logger LOG = LoggerFactory.getLogger(IndexQueueWorker.class); - - private final IndexClient indexes; - - public IndexQueueWorker(IndexQueue queue, IndexClient indexes) { - super(1,1, 0L, TimeUnit.MILLISECONDS, queue); - this.indexes = indexes; - } - - protected void beforeExecute(Thread t, Runnable r) { - LOG.debug("Starting task: {}", r); - super.beforeExecute(t, r); - if (IndexAction.class.isAssignableFrom(r.getClass())) { - IndexAction ia = (IndexAction) r; - LOG.debug("Task is an IndexAction for {}", ia.getIndexType()); - ia.setIndex(indexes.getByType(ia.getIndexType())); - } - } - - protected void afterExecute(Runnable r, Throwable t) { - super.afterExecute(r, t); - if (t != null) { - throw new IllegalStateException(t); - } - } - - @Override - public void start() { - this.prestartAllCoreThreads(); - } - - @Override - public void stop() { - this.shutdown(); - } -} diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/SearchClient.java b/server/sonar-server/src/main/java/org/sonar/server/search/SearchClient.java index 1810cee8cfd..0c919160eda 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/search/SearchClient.java +++ b/server/sonar-server/src/main/java/org/sonar/server/search/SearchClient.java @@ -24,7 +24,6 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.StringUtils; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsNodes; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse; @@ -36,6 +35,8 @@ import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.sonar.api.config.Settings; import org.sonar.core.profiling.Profiling; import org.sonar.core.profiling.StopWatch; @@ -45,6 +46,8 @@ import org.sonar.core.profiling.StopWatch; */ public class SearchClient extends TransportClient { + private static final Logger LOGGER = LoggerFactory.getLogger(SearchClient.class); + private static final String DEFAULT_HEALTH_TIMEOUT = "30s"; private final Settings settings; @@ -114,10 +117,10 @@ public class SearchClient extends TransportClient { public <K extends ActionResponse> K execute(ActionRequestBuilder request) { StopWatch fullProfile = profiling.start("search", Profiling.Level.FULL); - ListenableActionFuture acc = request.execute(); + K response = null; try { - K response = (K) acc.get(); + response = (K) request.get(); if (profiling.isProfilingEnabled(Profiling.Level.BASIC)) { if (ToXContent.class.isAssignableFrom(request.getClass())) { @@ -144,7 +147,9 @@ public class SearchClient extends TransportClient { } return response; } catch (Exception e) { + LOGGER.error("could not execute request: " + response); throw new IllegalStateException("ES error: ", e); + } } } diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/DtoIndexAction.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteDTO.java index 3f09d708cbd..8b801a45b72 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/search/action/DtoIndexAction.java +++ b/server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteDTO.java @@ -19,43 +19,39 @@ */ package org.sonar.server.search.action; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.client.Requests; import org.sonar.core.persistence.Dto; +import org.sonar.server.search.Index; -public class DtoIndexAction<E extends Dto> extends IndexAction { +import java.util.ArrayList; +import java.util.List; - private final E item; - private final E[] items; +public class DeleteDto<DTO extends Dto> extends IndexActionRequest { - public DtoIndexAction(String indexType, Method method, E item, E... items) { - super(indexType, method); - this.item = item; - this.items = items; + private final DTO dto; + + public DeleteDto(String indexType, DTO dto) { + super(indexType); + this.dto = dto; } @Override - public Class<?> getPayloadClass() { - return item.getClass(); + public String getKey() { + return dto.getKey().toString(); } @Override - public String getKey() { - return item.getKey().toString(); + public Class<?> getPayloadClass() { + return dto.getClass(); } @Override - public void doExecute() { - try { - if (this.getMethod().equals(Method.DELETE)) { - index.deleteByDto(this.item, this.items); - } else if (this.getMethod().equals(Method.UPSERT)) { - index.upsert(this.item, this.items); - } - } catch (Exception e) { - throw new IllegalStateException(this.getClass().getSimpleName() + - " cannot execute " + this.getMethod() + " for " + this.item.getClass().getSimpleName() + - " as " + this.getIndexType() + - " on key: " + this.item.getKey(), e); - } + public List<ActionRequest> doCall(Index index) throws Exception { + List<ActionRequest> requests = new ArrayList<ActionRequest>(); + requests.add(Requests.deleteRequest(index.getIndexName()) + .id(dto.getKey().toString()) + .type(indexType)); + return requests; } } - diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/KeyIndexAction.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteKey.java index 3e23d5514e8..adc15a9a9a5 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/search/action/KeyIndexAction.java +++ b/server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteKey.java @@ -19,42 +19,40 @@ */ package org.sonar.server.search.action; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.client.Requests; +import org.sonar.server.search.Index; + import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; -public class KeyIndexAction<K extends Serializable> extends IndexAction { +public class DeleteKey<K extends Serializable> extends IndexActionRequest { private final K key; - private final K[] keys; - public KeyIndexAction(String indexType, Method method, K key, K... keys) { - super(indexType, method); + public DeleteKey(String indexType, K key) { + super(indexType); this.key = key; - this.keys = keys; } @Override - public Class<?> getPayloadClass() { - return String.class; + public String getKey() { + return key.toString(); } @Override - public String getKey() { - return this.key.toString(); + public Class<?> getPayloadClass() { + throw new IllegalStateException("Deletion by key does not have an object payload!"); } @Override - public void doExecute() { - try { - if (this.getMethod().equals(Method.DELETE)) { - index.deleteByKey(this.key, this.keys); - } else if (this.getMethod().equals(Method.UPSERT)) { - throw new IllegalStateException("Upsert by Key is not supported anymore"); - } - } catch (Exception e) { - throw new IllegalStateException(this.getClass().getSimpleName() + - "cannot execute " + this.getMethod() + " for " + this.key.getClass().getSimpleName() + - " on type: " + this.getIndexType() + - " on key: " + this.key, e); - } + public List<ActionRequest> doCall(Index index) throws Exception { + List<ActionRequest> requests = new ArrayList<ActionRequest>(); + requests.add(Requests.deleteRequest(index.getIndexName()) + .id(getKey()) + .type(indexType)); + return requests; } + } diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/EmbeddedIndexAction.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteNestedItem.java index 728ea8ece6c..15555a4481b 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/search/action/EmbeddedIndexAction.java +++ b/server/sonar-server/src/main/java/org/sonar/server/search/action/DeleteNestedItem.java @@ -19,18 +19,21 @@ */ package org.sonar.server.search.action; +import org.elasticsearch.action.ActionRequest; +import org.sonar.server.search.Index; + import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; -public class EmbeddedIndexAction<K extends Serializable> extends IndexAction { +public class DeleteNestedItem<K extends Serializable> extends IndexActionRequest { private final K key; private final Object item; private final Object[] items; - public EmbeddedIndexAction(String indexType, Method method, K key, Object item, Object... items) { - super(indexType, method); - this.indexType = indexType; - this.method = method; + public DeleteNestedItem(String indexType, K key, Object item, Object... items) { + super(indexType); this.key = key; this.item = item; this.items = items; @@ -47,19 +50,16 @@ public class EmbeddedIndexAction<K extends Serializable> extends IndexAction { } @Override - public void doExecute() { - - try { - if (this.getMethod().equals(Method.DELETE)) { - index.delete(this.key, this.item, this.items); - } else if (this.getMethod().equals(Method.UPSERT)) { - index.upsert(this.key, this.item, this.items); - } - } catch (Exception e) { - throw new IllegalStateException(this.getClass().getSimpleName() + - "cannot execute " + this.getMethod() + " for " + this.item.getClass().getSimpleName() + - " as " + this.getIndexType() + - " on key: " + this.key, e); + public List<ActionRequest> doCall(Index index) throws Exception { + List<ActionRequest> updates = new ArrayList<ActionRequest>(); + updates.addAll(deleteItem(index, item, key)); + for (Object otherItem : items) { + updates.addAll(deleteItem(index, otherItem, key)); } + return updates; + } + + private List<ActionRequest> deleteItem(Index index, Object item, K key) { + return index.getNormalizer().deleteNested(item, key); } } diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexActionRequest.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexActionRequest.java new file mode 100644 index 00000000000..889cf7a130c --- /dev/null +++ b/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexActionRequest.java @@ -0,0 +1,72 @@ +/* + * SonarQube, open source software quality management tool. + * Copyright (C) 2008-2014 SonarSource + * mailto:contact AT sonarsource DOT com + * + * SonarQube is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * SonarQube is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.server.search.action; + + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.sonar.core.cluster.ClusterAction; +import org.sonar.server.search.Index; + +import java.util.ArrayList; +import java.util.List; + +public abstract class IndexActionRequest implements ClusterAction<List<ActionRequest>> { + + protected final String indexType; + private Index index; + + public IndexActionRequest(String indexType) { + super(); + this.indexType = indexType; + } + + public abstract String getKey(); + + public abstract Class<?> getPayloadClass(); + + public String getIndexType() { + return indexType; + } + + + public void setIndex(Index index) { + this.index = index; + } + + @Override + public final List<ActionRequest> call() throws Exception { + if (index == null) { + throw new IllegalStateException("Cannot execute request - Index is null"); + } + List<ActionRequest> finalRequests = new ArrayList<ActionRequest>(); + for (ActionRequest request : doCall(index)) { + if (request.getClass().isAssignableFrom(UpdateRequest.class)) { + ((UpdateRequest) request) + .type(index.getIndexType()) + .index(index.getIndexName()); + } + finalRequests.add(request); + } + return finalRequests; + } + + public abstract List<ActionRequest> doCall(Index index) throws Exception; +} diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexAction.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertDTO.java index c2001b4257c..1e31a35d788 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexAction.java +++ b/server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertDTO.java @@ -19,47 +19,33 @@ */ package org.sonar.server.search.action; -import org.sonar.core.cluster.QueueAction; +import org.elasticsearch.action.ActionRequest; +import org.sonar.core.persistence.Dto; import org.sonar.server.search.Index; -public abstract class IndexAction extends QueueAction { +import java.util.List; +public class UpsertDto<DTO extends Dto> extends IndexActionRequest { - public abstract Class<?> getPayloadClass(); + private final DTO dto; - public enum Method { - UPSERT, DELETE + public UpsertDto(String indexType, DTO dto) { + super(indexType); + this.dto = dto; } - protected String indexType; - protected Method method; - protected Index index; - - public IndexAction(String indexType, Method method) { - super(); - this.indexType = indexType; - this.method = method; - } - - public abstract String getKey(); - - public Method getMethod() { - return this.method; - } - - public String getIndexType() { - return indexType; - } - - public void setMethod(Method method) { - this.method = method; + @Override + public String getKey() { + return dto.getKey().toString(); } - public void setIndex(Index index) { - this.index = index; + @Override + public Class<?> getPayloadClass() { + return dto.getClass(); } - public Index getIndex() { - return index; + @Override + public List<ActionRequest> doCall(Index index) throws Exception { + return index.getNormalizer().normalize(dto); } } diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertNestedItem.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertNestedItem.java new file mode 100644 index 00000000000..e87fe604e86 --- /dev/null +++ b/server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertNestedItem.java @@ -0,0 +1,66 @@ +/* + * SonarQube, open source software quality management tool. + * Copyright (C) 2008-2014 SonarSource + * mailto:contact AT sonarsource DOT com + * + * SonarQube is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * SonarQube is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.server.search.action; + +import org.elasticsearch.action.ActionRequest; +import org.sonar.server.search.Index; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class UpsertNestedItem<K extends Serializable> extends IndexActionRequest { + + + private final K key; + private final Object item; + private final Object[] items; + + public UpsertNestedItem(String indexType, K key, Object item, Object... items) { + super(indexType); + this.key = key; + this.item = item; + this.items = items; + } + + @Override + public String getKey() { + return key.toString(); + } + + @Override + public Class<?> getPayloadClass() { + return item.getClass(); + } + + @Override + public List<ActionRequest> doCall(Index index) throws Exception { + List<ActionRequest> updates = new ArrayList<ActionRequest>(); + updates.addAll(normalizeItem(index, item, key)); + for (Object otherItem : items) { + updates.addAll(normalizeItem(index, otherItem, key)); + } + return updates; + } + + private List<ActionRequest> normalizeItem(Index index, Object item, K key) { + return index.getNormalizer().normalizeNested(item, key); + } +} diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/es/ListUpdate.java b/server/sonar-server/src/main/java/org/sonar/server/search/es/ListUpdate.java deleted file mode 100644 index 6de0df6a978..00000000000 --- a/server/sonar-server/src/main/java/org/sonar/server/search/es/ListUpdate.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * SonarQube, open source software quality management tool. - * Copyright (C) 2008-2014 SonarSource - * mailto:contact AT sonarsource DOT com - * - * SonarQube is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 3 of the License, or (at your option) any later version. - * - * SonarQube is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ -package org.sonar.server.search.es; - -import com.google.common.collect.ImmutableSet; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.xcontent.support.XContentMapValues; -import org.elasticsearch.script.AbstractExecutableScript; -import org.elasticsearch.script.ExecutableScript; -import org.elasticsearch.script.NativeScriptFactory; - -import java.util.Collection; -import java.util.Map; - -/** - * @since 4.4 - */ -public class ListUpdate extends AbstractExecutableScript { - - public static final String NAME = "listUpdate"; - - public static final String ID_FIELD = "idField"; - public static final String ID_VALUE = "idValue"; - public static final String FIELD = "field"; - public static final String VALUE = "value"; - - public static class UpdateListScriptFactory implements NativeScriptFactory { - @Override - public ExecutableScript newScript(@Nullable Map<String, Object> params) { - String idField = XContentMapValues.nodeStringValue(params.get(ID_FIELD), null); - String idValue = XContentMapValues.nodeStringValue(params.get(ID_VALUE), null); - String field = XContentMapValues.nodeStringValue(params.get(FIELD), null); - Map value = null; - if (idField == null) { - throw new IllegalStateException("Missing '" + ID_FIELD + "' parameter"); - } - if (idValue == null) { - throw new IllegalStateException("Missing '" + ID_VALUE + "' parameter"); - } - if (field == null) { - throw new IllegalStateException("Missing '" + FIELD + "' parameter"); - } - - //NULL case is deletion of nested item - if (params.containsKey(VALUE)) { - Object obj = params.get(VALUE); - if (obj != null) { - value = XContentMapValues.nodeMapValue(params.get(VALUE), "Update item"); - } - } - - return new ListUpdate(idField, idValue, field, value); - } - } - - - private final String idField; - private final String idValue; - private final String field; - private final Map<String, Object> value; - - private Map<String, Object> ctx; - - public ListUpdate(String idField, String idValue, String field, Map<String, Object> value) { - this.idField = idField; - this.idValue = idValue; - this.field = field; - this.value = value; - } - - @Override - public void setNextVar(String name, Object value) { - if (name.equals("ctx")) { - ctx = (Map<String, Object>) value; - } - } - - @Override - public Object unwrap(Object value) { - return value; - } - - @Override - public Object run() { - try { - //Get the Document's source from ctx - Map<String, Object> source = XContentMapValues.nodeMapValue(ctx.get("_source"), "source from context"); - - //Get the Object for list update - Object fieldValue = source.get(field); - - if (fieldValue == null && value != null) { - // 0. The field does not exist (this is a upsert then) - source.put(field, value); - } else if (!XContentMapValues.isArray(fieldValue) && value != null) { - // 1. The field is not yet a list - Map currentFieldValue = XContentMapValues.nodeMapValue(fieldValue, "current FieldValue"); - if (XContentMapValues.nodeStringValue(currentFieldValue.get(idField), null).equals(idValue)) { - source.put(field, value); - } else { - source.put(field, ImmutableSet.of(fieldValue, value)); - } - } else { - // 3. field is a list - Collection items = (Collection) fieldValue; - Object target = null; - for (Object item : items) { - Map<String, Object> fields = (Map<String, Object>) item; - String itemIdValue = XContentMapValues.nodeStringValue(fields.get(idField), null); - if (itemIdValue != null && itemIdValue.equals(idValue)) { - target = item; - break; - } - } - if (target != null) { - items.remove(target); - } - - //Supporting the update by NULL = deletion case - if (value != null) { - items.add(value); - } - source.put(field, items); - } - } catch (Exception e) { - throw new IllegalStateException("failed to execute listUpdate script", e); - } - return null; - - } -} diff --git a/server/sonar-server/src/test/java/org/sonar/server/search/BaseIndexTest.java b/server/sonar-server/src/test/java/org/sonar/server/search/BaseIndexTest.java index 92daecaef82..e92fd5b936f 100644 --- a/server/sonar-server/src/test/java/org/sonar/server/search/BaseIndexTest.java +++ b/server/sonar-server/src/test/java/org/sonar/server/search/BaseIndexTest.java @@ -29,7 +29,6 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.sonar.api.config.Settings; -import org.sonar.core.cluster.NullQueue; import org.sonar.process.MonitoredProcess; import org.sonar.process.NetworkUtils; import org.sonar.process.Props; @@ -107,7 +106,7 @@ public class BaseIndexTest { private BaseIndex getIndex(final SearchClient searchClient) { BaseIndex index = new BaseIndex( IndexDefinition.TEST, - null, new NullQueue(), searchClient) { + null, searchClient) { @Override protected String getKeyValue(Serializable key) { return null; diff --git a/sonar-core/pom.xml b/sonar-core/pom.xml index bad0f4f5503..34c5d976e5b 100644 --- a/sonar-core/pom.xml +++ b/sonar-core/pom.xml @@ -184,6 +184,11 @@ <artifactId>h2</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + <version>1.1.2</version> + </dependency> </dependencies> <build> diff --git a/sonar-core/src/main/java/org/sonar/core/cluster/QueueAction.java b/sonar-core/src/main/java/org/sonar/core/cluster/ClusterAction.java index f83078d3bb5..bac9518865f 100644 --- a/sonar-core/src/main/java/org/sonar/core/cluster/QueueAction.java +++ b/sonar-core/src/main/java/org/sonar/core/cluster/ClusterAction.java @@ -19,27 +19,10 @@ */ package org.sonar.core.cluster; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Callable; -public abstract class QueueAction implements Runnable { - - protected CountDownLatch latch; - - public QueueAction() { - this.latch = null; - } - - public void setLatch(CountDownLatch latch){ - this.latch = latch; - } - - public abstract void doExecute(); +public interface ClusterAction<K> extends Callable<K> { @Override - public void run(){ - this.doExecute(); - if (latch != null){ - latch.countDown(); - } - } + public K call() throws Exception; } diff --git a/sonar-core/src/main/java/org/sonar/core/cluster/NullQueue.java b/sonar-core/src/main/java/org/sonar/core/cluster/NullQueue.java index 235d2078fef..cccdf7033bc 100644 --- a/sonar-core/src/main/java/org/sonar/core/cluster/NullQueue.java +++ b/sonar-core/src/main/java/org/sonar/core/cluster/NullQueue.java @@ -21,15 +21,10 @@ package org.sonar.core.cluster; import java.util.List; -public class NullQueue implements WorkQueue<QueueAction> { +public class NullQueue implements WorkQueue<ClusterAction> { @Override - public void enqueue(QueueAction action) { - // do nothing - } - - @Override - public void enqueue(List<QueueAction> actions) { + public void enqueue(List<ClusterAction> actions) { // do nothing } } diff --git a/sonar-core/src/main/java/org/sonar/core/cluster/WorkQueue.java b/sonar-core/src/main/java/org/sonar/core/cluster/WorkQueue.java index 9b4141f510d..9d5cf998564 100644 --- a/sonar-core/src/main/java/org/sonar/core/cluster/WorkQueue.java +++ b/sonar-core/src/main/java/org/sonar/core/cluster/WorkQueue.java @@ -20,10 +20,9 @@ package org.sonar.core.cluster; import java.util.List; +import java.util.concurrent.Callable; -public interface WorkQueue<K extends QueueAction> { - - void enqueue(K action); +public interface WorkQueue<K extends Callable> { void enqueue(List<K> actions); diff --git a/sonar-core/src/main/java/org/sonar/core/persistence/DbSession.java b/sonar-core/src/main/java/org/sonar/core/persistence/DbSession.java index 10e82b803cc..b663b22e35e 100644 --- a/sonar-core/src/main/java/org/sonar/core/persistence/DbSession.java +++ b/sonar-core/src/main/java/org/sonar/core/persistence/DbSession.java @@ -24,7 +24,7 @@ import org.apache.ibatis.session.Configuration; import org.apache.ibatis.session.ResultHandler; import org.apache.ibatis.session.RowBounds; import org.apache.ibatis.session.SqlSession; -import org.sonar.core.cluster.QueueAction; +import org.sonar.core.cluster.ClusterAction; import org.sonar.core.cluster.WorkQueue; import java.sql.Connection; @@ -34,8 +34,8 @@ import java.util.Map; public class DbSession implements SqlSession { - private static final Integer IMPLICIT_COMMIT_SIZE = 200; - private List<QueueAction> actions; + private static final Integer IMPLICIT_COMMIT_SIZE = 1000; + private List<ClusterAction> actions; private WorkQueue queue; private SqlSession session; @@ -43,10 +43,10 @@ public class DbSession implements SqlSession { DbSession(WorkQueue queue, SqlSession session) { this.session = session; this.queue = queue; - this.actions = new ArrayList<QueueAction>(); + this.actions = new ArrayList<ClusterAction>(); } - public void enqueue(QueueAction action) { + public void enqueue(ClusterAction action) { this.actions.add(action); if (this.actions.size() > IMPLICIT_COMMIT_SIZE) { this.commit(); |