From 52213a0f26e3f22a4bee652a2b0c6d0f239fc8dd Mon Sep 17 00:00:00 2001 From: Stephane Gamard Date: Thu, 21 Aug 2014 15:49:02 +0200 Subject: [PATCH] indexQueue can explicitly avoid inline refresh. --- fork.sh | 2 +- .../resources/org/sonar/search/logback.xml | 2 +- .../java/org/sonar/server/db/BaseDao.java | 2 +- .../server/rule/index/RuleNormalizer.java | 4 -- .../org/sonar/server/search/IndexQueue.java | 40 +++++++++++++++---- .../search/action/IndexActionRequest.java | 15 ++++++- .../sonar/server/search/action/UpsertDto.java | 6 ++- 7 files changed, 54 insertions(+), 17 deletions(-) diff --git a/fork.sh b/fork.sh index fbbf69331f4..1b904826697 100755 --- a/fork.sh +++ b/fork.sh @@ -1,6 +1,6 @@ #!/bin/sh -mvn clean install -DskipTests -pl :sonar-core,sonar-search -amd +mvn clean install -DskipTests -pl :sonar-server,:sonar-search -amd if [[ "$OSTYPE" == "darwin"* ]]; then OS='macosx-universal-64' diff --git a/server/sonar-search/src/main/resources/org/sonar/search/logback.xml b/server/sonar-search/src/main/resources/org/sonar/search/logback.xml index 648ece82e28..5f714eec3e7 100644 --- a/server/sonar-search/src/main/resources/org/sonar/search/logback.xml +++ b/server/sonar-search/src/main/resources/org/sonar/search/logback.xml @@ -28,7 +28,7 @@ - + diff --git a/server/sonar-server/src/main/java/org/sonar/server/db/BaseDao.java b/server/sonar-server/src/main/java/org/sonar/server/db/BaseDao.java index 95698625934..170d446df1b 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/db/BaseDao.java +++ b/server/sonar-server/src/main/java/org/sonar/server/db/BaseDao.java @@ -280,7 +280,7 @@ public abstract class BaseDao, K extends Serializable> imple @Override public final void synchronizeAfter(final DbSession session, Date date) { for (E dto : this.findAfterDate(session, date)) { - session.enqueue(new UpsertDto(getIndexType(), dto)); + session.enqueue(new UpsertDto(getIndexType(), dto, false)); } session.commit(); } diff --git a/server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleNormalizer.java b/server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleNormalizer.java index 3a347255d7d..a27b0eafa94 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleNormalizer.java +++ b/server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleNormalizer.java @@ -22,8 +22,6 @@ package org.sonar.server.rule.index; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; -import org.elasticsearch.action.WriteConsistencyLevel; -import org.elasticsearch.action.support.replication.ReplicationType; import org.elasticsearch.action.update.UpdateRequest; import org.sonar.api.rule.RuleKey; import org.sonar.api.rule.RuleStatus; @@ -281,8 +279,6 @@ public class RuleNormalizer extends BaseNormalizer { /** Creating updateRequest */ requests.add(new UpdateRequest() - .replicationType(ReplicationType.ASYNC) - .consistencyLevel(WriteConsistencyLevel.QUORUM) .id(rule.getKey().toString()) .doc(update) .upsert(upsert)); diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/IndexQueue.java b/server/sonar-server/src/main/java/org/sonar/server/search/IndexQueue.java index 564c63810e0..8061f04ab64 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/search/IndexQueue.java +++ b/server/sonar-server/src/main/java/org/sonar/server/search/IndexQueue.java @@ -34,8 +34,10 @@ import org.sonar.core.profiling.Profiling; import org.sonar.server.search.action.IndexActionRequest; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -69,32 +71,56 @@ public class IndexQueue extends LinkedBlockingQueue } try { + long normTime = System.currentTimeMillis(); BulkRequestBuilder bulkRequestBuilder = new BulkRequestBuilder(searchClient); Map indexes = getIndexMap(); + Set indices = new HashSet(); for (IndexActionRequest action : actions) { - action.setIndex(indexes.get(action.getIndexType())); + Index index = indexes.get(action.getIndexType()); + action.setIndex(index); + indices.add(index.getIndexName()); } - ExecutorService executorService = Executors.newFixedThreadPool(4); + ExecutorService executorService = Executors.newFixedThreadPool(10); + + // Do we need to refresh + boolean requiresRefresh = false; + for (IndexActionRequest action : actions) { + if (action.needsRefresh()) { + requiresRefresh = true; + break; + } + } //invokeAll() blocks until ALL tasks submitted to executor complete for (Future> updateRequests : executorService.invokeAll(actions)) { for (ActionRequest update : updateRequests.get()) { if (UpdateRequest.class.isAssignableFrom(update.getClass())) { - bulkRequestBuilder.add((UpdateRequest) update); + bulkRequestBuilder.add(((UpdateRequest) update).refresh(false)); } else if (DeleteRequest.class.isAssignableFrom(update.getClass())) { - bulkRequestBuilder.add((DeleteRequest) update); + bulkRequestBuilder.add(((DeleteRequest) update).refresh(false)); } else { throw new IllegalStateException("Un-managed request type: " + update.getClass()); } } } executorService.shutdown(); - - LOGGER.info("Executing batch request of size: " + bulkRequestBuilder.numberOfActions()); + normTime = System.currentTimeMillis() - normTime; //execute the request - BulkResponse response = searchClient.execute(bulkRequestBuilder.setRefresh(true)); + long indexTime = System.currentTimeMillis(); + BulkResponse response = searchClient.execute(bulkRequestBuilder.setRefresh(false)); + indexTime = System.currentTimeMillis() - indexTime; + + long refreshTime = System.currentTimeMillis(); + if (requiresRefresh) { + searchClient.admin().indices().prepareRefresh(indices.toArray(new String[indices.size()])).setForce(false).get(); + } + refreshTime = System.currentTimeMillis() - refreshTime; + + LOGGER.info("-- submitted {} items with {}ms in normalization, {}ms indexing and {}ms refresh({}). Total: {}ms", + bulkRequestBuilder.numberOfActions(), normTime, indexTime, refreshTime, indices, (normTime + indexTime + refreshTime)); + } catch (Exception e) { e.printStackTrace(); } diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexActionRequest.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexActionRequest.java index 889cf7a130c..2d36fa0d5dc 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexActionRequest.java +++ b/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexActionRequest.java @@ -31,11 +31,17 @@ import java.util.List; public abstract class IndexActionRequest implements ClusterAction> { protected final String indexType; + private final boolean requiresRefresh; private Index index; - public IndexActionRequest(String indexType) { + protected IndexActionRequest(String indexType) { + this(indexType, true); + } + + protected IndexActionRequest(String indexType, boolean requiresRefresh) { super(); this.indexType = indexType; + this.requiresRefresh = requiresRefresh; } public abstract String getKey(); @@ -61,7 +67,8 @@ public abstract class IndexActionRequest implements ClusterAction doCall(Index index) throws Exception; + + public boolean needsRefresh() { + return this.requiresRefresh; + } } diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertDto.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertDto.java index 1e31a35d788..5ad7af6c3f5 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertDto.java +++ b/server/sonar-server/src/main/java/org/sonar/server/search/action/UpsertDto.java @@ -30,7 +30,11 @@ public class UpsertDto extends IndexActionRequest { private final DTO dto; public UpsertDto(String indexType, DTO dto) { - super(indexType); + this(indexType, dto, true); + } + + public UpsertDto(String indexType, DTO dto, boolean requiresRefresh) { + super(indexType, requiresRefresh); this.dto = dto; } -- 2.39.5