From 600f1bd16ed55d83b66c2e36fc0376a4d6712118 Mon Sep 17 00:00:00 2001 From: Stephane Gamard Date: Tue, 16 Sep 2014 21:44:11 +0200 Subject: [PATCH] SONAR-5531 - Updated ES refresh while synchronizing --- .../main/java/org/sonar/search/SearchServer.java | 5 +++++ .../main/java/org/sonar/server/db/BaseDao.java | 3 ++- .../java/org/sonar/server/search/BaseIndex.java | 2 +- .../java/org/sonar/server/search/IndexQueue.java | 15 +++++++++++---- .../sonar/server/search/IndexSynchronizer.java | 1 + .../server/search/action/IndexActionRequest.java | 3 --- .../activity/ActivityBackendMediumTest.java | 10 ++++++++-- .../search/IndexSynchronizerMediumTest.java | 1 + .../org/sonar/server/tester/BackendCleanup.java | 5 ++++- .../org/sonar/server/tester/ServerTester.java | 1 + .../org/sonar/core/persistence/DbSession.java | 12 +++++++++++- 11 files changed, 45 insertions(+), 13 deletions(-) diff --git a/server/sonar-search/src/main/java/org/sonar/search/SearchServer.java b/server/sonar-search/src/main/java/org/sonar/search/SearchServer.java index 1433bd71b6e..c1b3bb4c150 100644 --- a/server/sonar-search/src/main/java/org/sonar/search/SearchServer.java +++ b/server/sonar-search/src/main/java/org/sonar/search/SearchServer.java @@ -88,9 +88,14 @@ public class SearchServer implements Monitored { .put("discovery.zen.ping.multicast.enabled", "false") // Index storage policies + .put("index.refresh_interval", "30") .put("index.number_of_shards", "1") .put("index.number_of_replicas", MINIMUM_INDEX_REPLICATION) .put("index.store.type", "mmapfs") +// .put("indices.store.throttle.type", "merge") +// .put("indices.store.throttle.max_bytes_per_sec", "500mb") + .put("index.merge.scheduler.max_thread_count", + Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors() / 2))) // Install our own listUpdate scripts .put("script.default_lang", "native") diff --git a/server/sonar-server/src/main/java/org/sonar/server/db/BaseDao.java b/server/sonar-server/src/main/java/org/sonar/server/db/BaseDao.java index a87021146bb..59183eb307a 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/db/BaseDao.java +++ b/server/sonar-server/src/main/java/org/sonar/server/db/BaseDao.java @@ -323,7 +323,7 @@ public abstract class BaseDao, KEY extends Serializ @Override public void handleResult(ResultContext resultContext) { DTO dto = (DTO) resultContext.getResultObject(); - session.enqueue(new UpsertDto(getIndexType(), dto, true)); + session.enqueue(new UpsertDto(getIndexType(), dto, false)); count++; if (count % 100000 == 0) { LOGGER.info(" - synchronized {} {}", count, getIndexType()); @@ -358,6 +358,7 @@ public abstract class BaseDao, KEY extends Serializ DbSynchronizationHandler handler = getSynchronizationResultHandler(session); session.select(getSynchronizeStatementFQN(), getSynchronizationParams(date, params), handler); handler.enqueueCollected(); + } catch (Exception e) { throw new IllegalStateException(e); } diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java b/server/sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java index 6faafd538df..a2b1597adc9 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java +++ b/server/sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java @@ -365,7 +365,7 @@ public abstract class BaseIndex, KEY extends Serial .admin() .indices() .prepareRefresh(this.getIndexName()) - .setForce(false) + .setForce(true) .setIndices(this.getIndexName())); } diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/IndexQueue.java b/server/sonar-server/src/main/java/org/sonar/server/search/IndexQueue.java index de7636d8596..e5f01c3ca9e 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/search/IndexQueue.java +++ b/server/sonar-server/src/main/java/org/sonar/server/search/IndexQueue.java @@ -69,9 +69,14 @@ public class IndexQueue implements ServerComponent, WorkQueue indexes = getIndexMap(); Set indices = new HashSet(); for (IndexActionRequest action : actions) { + if (action.needsRefresh()) { + refreshRequired = true; + } Index index = indexes.get(action.getIndexType()); action.setIndex(index); if (action.needsRefresh()) { @@ -83,12 +88,15 @@ public class IndexQueue implements ServerComponent, WorkQueue indices) { long refreshTime = System.currentTimeMillis(); @@ -124,7 +131,7 @@ public class IndexQueue implements ServerComponent, WorkQueue> updateRequests : executorService.invokeAll(actions)) { for (ActionRequest update : updateRequests.get()) { if (UpdateRequest.class.isAssignableFrom(update.getClass())) { diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/IndexSynchronizer.java b/server/sonar-server/src/main/java/org/sonar/server/search/IndexSynchronizer.java index 1ffcb84bf46..0cbacbe16c8 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/search/IndexSynchronizer.java +++ b/server/sonar-server/src/main/java/org/sonar/server/search/IndexSynchronizer.java @@ -80,5 +80,6 @@ public class IndexSynchronizer { } dao.synchronizeAfter(session, index.getLastSynchronization()); + index.refresh(); } } diff --git a/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexActionRequest.java b/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexActionRequest.java index 8eb540df4a6..6b4e1933739 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexActionRequest.java +++ b/server/sonar-server/src/main/java/org/sonar/server/search/action/IndexActionRequest.java @@ -19,7 +19,6 @@ */ package org.sonar.server.search.action; - import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.update.UpdateRequest; import org.sonar.core.cluster.ClusterAction; @@ -39,7 +38,6 @@ public abstract class IndexActionRequest implements ClusterAction(); } + public Integer getImplicitCommitSize() { + return implicitCommitSize; + } + + public void setImplicitCommitSize(Integer implicitCommitSize) { + this.implicitCommitSize = implicitCommitSize; + } + public void enqueue(ClusterAction action) { actionCount++; this.actions.add(action); - if (this.actions.size() > IMPLICIT_COMMIT_SIZE) { + if (this.actions.size() > getImplicitCommitSize()) { this.commit(); } } -- 2.39.5