.put("discovery.zen.ping.multicast.enabled", "false")
// Index storage policies
+ .put("index.refresh_interval", "30")
.put("index.number_of_shards", "1")
.put("index.number_of_replicas", MINIMUM_INDEX_REPLICATION)
.put("index.store.type", "mmapfs")
+ .put("indices.store.throttle.type", "none")
+ .put("index.merge.scheduler.max_thread_count",
+ Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors() / 2)))
// Install our own listUpdate scripts
.put("script.default_lang", "native")
import org.sonar.server.search.IndexDefinition;
import org.sonar.server.search.action.DeleteKey;
import org.sonar.server.search.action.DeleteNestedItem;
+import org.sonar.server.search.action.RefreshIndex;
import org.sonar.server.search.action.UpsertDto;
import org.sonar.server.search.action.UpsertNestedItem;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
-
import java.io.Serializable;
import java.sql.Timestamp;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
import static com.google.common.collect.Maps.newHashMap;
@Override
public void handleResult(ResultContext resultContext) {
DTO dto = (DTO) resultContext.getResultObject();
- session.enqueue(new UpsertDto<DTO>(getIndexType(), dto, true));
+ session.enqueue(new UpsertDto<DTO>(getIndexType(), dto, false));
count++;
if (count % 100000 == 0) {
LOGGER.info(" - synchronized {} {}", count, getIndexType());
DbSynchronizationHandler handler = getSynchronizationResultHandler(session);
session.select(getSynchronizeStatementFQN(), getSynchronizationParams(date, params), handler);
handler.enqueueCollected();
+ session.enqueue(new RefreshIndex(this.getIndexType()));
} catch (Exception e) {
throw new IllegalStateException(e);
}
return mapping;
}
- @Override
- public void refresh() {
- client.execute(client
- .admin()
- .indices()
- .prepareRefresh(this.getIndexName())
- .setForce(false)
- .setIndices(this.getIndexName()));
- }
-
/* Base CRUD methods */
protected abstract DOMAIN toDoc(Map<String, Object> fields);
String getIndexName();
- void refresh();
-
Date getLastSynchronization();
IndexStat getIndexStat();
package org.sonar.server.search;
import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequestBuilder;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.sonar.api.platform.ComponentContainer;
import org.sonar.core.cluster.WorkQueue;
import org.sonar.core.profiling.Profiling;
-import org.sonar.server.search.action.IndexActionRequest;
+import org.sonar.server.search.action.IndexAction;
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
-public class IndexQueue implements ServerComponent, WorkQueue<IndexActionRequest> {
+public class IndexQueue implements ServerComponent, WorkQueue<IndexAction<?>> {
protected final Profiling profiling;
}
@Override
- public void enqueue(List<IndexActionRequest> actions) {
+ public void enqueue(List<IndexAction<?>> actions) {
if (actions.isEmpty()) {
return;
}
try {
+ boolean refreshRequired = false;
+
Map<String, Index> indexes = getIndexMap();
Set<String> indices = new HashSet<String>();
- for (IndexActionRequest action : actions) {
+ for (IndexAction action : actions) {
Index index = indexes.get(action.getIndexType());
action.setIndex(index);
if (action.needsRefresh()) {
+ refreshRequired = true;
indices.add(index.getIndexName());
}
}
BulkRequestBuilder bulkRequestBuilder = new BulkRequestBuilder(searchClient);
- long normTime = executeNormalization(bulkRequestBuilder, actions);
+ long normTime = processActionsIntoQueries(bulkRequestBuilder, actions);
- //execute the request
- long indexTime = System.currentTimeMillis();
- BulkResponse response = searchClient.execute(bulkRequestBuilder.setRefresh(false));
- indexTime = System.currentTimeMillis() - indexTime;
+ if (bulkRequestBuilder.numberOfActions() > 0) {
+ // execute the request
+ long indexTime = System.currentTimeMillis();
+ BulkResponse response = searchClient.execute(bulkRequestBuilder.setRefresh(false));
- long refreshTime = this.refreshRequiredIndex(indices);
+ indexTime = System.currentTimeMillis() - indexTime;
- LOGGER.debug("-- submitted {} items with {}ms in normalization, {}ms indexing and {}ms refresh({}). Total: {}ms",
- bulkRequestBuilder.numberOfActions(), normTime, indexTime, refreshTime, indices, (normTime + indexTime + refreshTime));
+ long refreshTime = 0;
+ if (refreshRequired) {
+ refreshTime = this.refreshRequiredIndex(indices);
+ }
- if (response.hasFailures()) {
- throw new IllegalStateException("Errors while indexing stack: " + response.buildFailureMessage());
- }
+ LOGGER.debug("-- submitted {} items with {}ms in normalization, {}ms indexing and {}ms refresh({}). Total: {}ms",
+ bulkRequestBuilder.numberOfActions(), normTime, indexTime, refreshTime, indices, (normTime + indexTime + refreshTime));
+ if (response.hasFailures()) {
+ throw new IllegalStateException("Errors while indexing stack: " + response.buildFailureMessage());
+ }
+ }
} catch (Exception e) {
LOGGER.error("Could not commit to ElasticSearch", e);
}
}
-
private long refreshRequiredIndex(Set<String> indices) {
long refreshTime = System.currentTimeMillis();
return System.currentTimeMillis() - refreshTime;
}
- private long executeNormalization(BulkRequestBuilder bulkRequestBuilder, List<IndexActionRequest> actions) {
+ private long processActionsIntoQueries(BulkRequestBuilder bulkRequestBuilder, List<IndexAction<?>> actions) {
long normTime = System.currentTimeMillis();
try {
+ boolean hasInlineRefreshRequest = false;
ExecutorService executorService = Executors.newFixedThreadPool(CONCURRENT_NORMALIZATION_FACTOR);
- //invokeAll() blocks until ALL tasks submitted to executor complete
- for (Future<List<ActionRequest>> updateRequests : executorService.invokeAll(actions)) {
- for (ActionRequest update : updateRequests.get()) {
+ // invokeAll() blocks until ALL tasks submitted to executor complete
+ List<Future<List<? extends ActionRequest>>> requests = executorService.invokeAll(actions, 20, TimeUnit.SECONDS);
+ for (Future<List<? extends ActionRequest>> updates : requests) {
+ for (ActionRequest update : updates.get()) {
if (UpdateRequest.class.isAssignableFrom(update.getClass())) {
- bulkRequestBuilder.add(((UpdateRequest) update).refresh(false));
- } else if (DeleteRequest.class.isAssignableFrom(update.getClass())) {
- bulkRequestBuilder.add(((DeleteRequest) update).refresh(false));
- } else {
+ bulkRequestBuilder.add(((UpdateRequest) update));
+ }
+
+ else if (DeleteRequest.class.isAssignableFrom(update.getClass())) {
+ bulkRequestBuilder.add(((DeleteRequest) update));
+ }
+
+ else if (RefreshRequest.class.isAssignableFrom(update.getClass())) {
+ hasInlineRefreshRequest = true;
+ }
+
+ else {
throw new IllegalStateException("Un-managed request type: " + update.getClass());
}
}
}
executorService.shutdown();
+ bulkRequestBuilder.setRefresh(hasInlineRefreshRequest);
} catch (Exception e) {
throw new IllegalStateException("Could not execute normalization for stack", e);
}
*/
package org.sonar.server.search.action;
-import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.Requests;
import org.sonar.core.persistence.Dto;
import org.sonar.server.search.Index;
import java.util.ArrayList;
import java.util.List;
-public class DeleteDto<DTO extends Dto> extends IndexActionRequest {
+public class DeleteDto<DTO extends Dto> extends IndexAction<DeleteRequest> {
private final DTO dto;
}
@Override
- public List<ActionRequest> doCall(Index index) throws Exception {
- List<ActionRequest> requests = new ArrayList<ActionRequest>();
+ public List<DeleteRequest> doCall(Index index) throws Exception {
+ List<DeleteRequest> requests = new ArrayList<DeleteRequest>();
requests.add(Requests.deleteRequest(index.getIndexName())
.id(dto.getKey().toString())
.type(indexType));
*/
package org.sonar.server.search.action;
-import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.Requests;
import org.sonar.server.search.Index;
import java.util.ArrayList;
import java.util.List;
-public class DeleteKey<K extends Serializable> extends IndexActionRequest {
+public class DeleteKey<K extends Serializable> extends IndexAction<DeleteRequest> {
private final K key;
}
@Override
- public List<ActionRequest> doCall(Index index) throws Exception {
- List<ActionRequest> requests = new ArrayList<ActionRequest>();
+ public List<DeleteRequest> doCall(Index index) throws Exception {
+ List<DeleteRequest> requests = new ArrayList<DeleteRequest>();
requests.add(Requests.deleteRequest(index.getIndexName())
.id(getKey())
.type(indexType));
*/
package org.sonar.server.search.action;
-import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.update.UpdateRequest;
import org.sonar.server.search.Index;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-public class DeleteNestedItem<K extends Serializable> extends IndexActionRequest {
+public class DeleteNestedItem<K extends Serializable> extends IndexAction<UpdateRequest> {
private final K key;
private final Object item;
}
@Override
- public List<ActionRequest> doCall(Index index) throws Exception {
- List<ActionRequest> updates = new ArrayList<ActionRequest>();
+ public List<UpdateRequest> doCall(Index index) throws Exception {
+ List<UpdateRequest> updates = new ArrayList<UpdateRequest>();
updates.addAll(deleteItem(index, item, key));
for (Object otherItem : items) {
updates.addAll(deleteItem(index, otherItem, key));
return updates;
}
- private List<ActionRequest> deleteItem(Index index, Object item, K key) {
- return index.getNormalizer().deleteNested(item, key);
+ private List<UpdateRequest> deleteItem(Index index, Object item, K key) {
+ List<UpdateRequest> updates = index.getNormalizer().deleteNested(item, key);
+ for (UpdateRequest update : updates) {
+ update.index(index.getIndexName())
+ .type(index.getIndexType())
+ .refresh(needsRefresh());
+ }
+ return updates;
}
}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.search.action;
+
+import org.elasticsearch.action.ActionRequest;
+import org.sonar.core.cluster.ClusterAction;
+import org.sonar.server.search.Index;
+
+import java.util.List;
+
+public abstract class IndexAction<K extends ActionRequest> implements ClusterAction<List<K>> {
+
+ protected final String indexType;
+ private final boolean requiresRefresh;
+ private Index index;
+
+ protected IndexAction(String indexType) {
+ this(indexType, true);
+ }
+
+ protected IndexAction(String indexType, boolean requiresRefresh) {
+ this.indexType = indexType;
+ this.requiresRefresh = requiresRefresh;
+ }
+
+ public abstract String getKey();
+
+ public abstract Class getPayloadClass();
+
+ public String getIndexType() {
+ return indexType;
+ }
+
+ public IndexAction<K> setIndex(Index index) {
+ this.index = index;
+ return this;
+ }
+
+ @Override
+ public final List<K> call() throws Exception {
+ if (index == null) {
+ throw new IllegalStateException("Cannot execute request on null index");
+ }
+ return doCall(index);
+ }
+
+ public abstract List<K> doCall(Index index) throws Exception;
+
+ public boolean needsRefresh() {
+ return this.requiresRefresh;
+ }
+}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.search.action;
-
-
-import org.elasticsearch.action.ActionRequest;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.sonar.core.cluster.ClusterAction;
-import org.sonar.server.search.Index;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public abstract class IndexActionRequest implements ClusterAction<List<ActionRequest>> {
-
- protected final String indexType;
- private final boolean requiresRefresh;
- private Index index;
-
- protected IndexActionRequest(String indexType) {
- this(indexType, true);
- }
-
- protected IndexActionRequest(String indexType, boolean requiresRefresh) {
- super();
- this.indexType = indexType;
- this.requiresRefresh = requiresRefresh;
- }
-
- public abstract String getKey();
-
- public abstract Class getPayloadClass();
-
- public String getIndexType() {
- return indexType;
- }
-
-
- public void setIndex(Index index) {
- this.index = index;
- }
-
- @Override
- public final List<ActionRequest> call() throws Exception {
- if (index == null) {
- throw new IllegalStateException("Cannot execute request - Index is null");
- }
- List<ActionRequest> finalRequests = new ArrayList<ActionRequest>();
- for (ActionRequest request : doCall(index)) {
- if (request.getClass().isAssignableFrom(UpdateRequest.class)) {
- ((UpdateRequest) request)
- .type(index.getIndexType())
- .index(index.getIndexName())
- .refresh(false);
- }
- finalRequests.add(request);
- }
- return finalRequests;
- }
-
- public abstract List<ActionRequest> doCall(Index index) throws Exception;
-
- public boolean needsRefresh() {
- return this.requiresRefresh;
- }
-}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.search.action;
+
+import com.google.common.collect.ImmutableList;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.sonar.server.search.Index;
+
+import java.util.List;
+
+public class RefreshIndex extends IndexAction<RefreshRequest> {
+
+ public RefreshIndex(String indexType) {
+ super(indexType);
+ }
+
+ @Override
+ public String getKey() {
+ return null;
+ }
+
+ @Override
+ public Class getPayloadClass() {
+ return null;
+ }
+
+ @Override
+ public List<RefreshRequest> doCall(Index index) throws Exception {
+ return ImmutableList.<RefreshRequest>of(
+ new RefreshRequest()
+ .force(false)
+ .indices(index.getIndexName()));
+ }
+}
*/
package org.sonar.server.search.action;
-import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.update.UpdateRequest;
import org.sonar.core.persistence.Dto;
import org.sonar.server.search.Index;
import java.util.List;
-public class UpsertDto<DTO extends Dto> extends IndexActionRequest {
+public class UpsertDto<DTO extends Dto> extends IndexAction<UpdateRequest> {
private final DTO dto;
}
@Override
- public List<ActionRequest> doCall(Index index) throws Exception {
- return index.getNormalizer().normalize(dto);
+ public List<UpdateRequest> doCall(Index index) throws Exception {
+ List<UpdateRequest> updates = index.getNormalizer().normalize(dto);
+ for (UpdateRequest update : updates) {
+ update.index(index.getIndexName())
+ .type(index.getIndexType())
+ .refresh(needsRefresh());
+ }
+ return updates;
}
}
*/
package org.sonar.server.search.action;
-import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.update.UpdateRequest;
import org.sonar.server.search.Index;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-public class UpsertNestedItem<K extends Serializable> extends IndexActionRequest {
-
+public class UpsertNestedItem<K extends Serializable> extends IndexAction<UpdateRequest> {
private final K key;
private final Object item;
}
@Override
- public List<ActionRequest> doCall(Index index) throws Exception {
- List<ActionRequest> updates = new ArrayList<ActionRequest>();
+ public List<UpdateRequest> doCall(Index index) throws Exception {
+ List<UpdateRequest> updates = new ArrayList<UpdateRequest>();
updates.addAll(normalizeItem(index, item, key));
for (Object otherItem : items) {
updates.addAll(normalizeItem(index, otherItem, key));
return updates;
}
- private List<ActionRequest> normalizeItem(Index index, Object item, K key) {
- return index.getNormalizer().normalizeNested(item, key);
+ private List<UpdateRequest> normalizeItem(Index index, Object item, K key) {
+ List<UpdateRequest> updates = index.getNormalizer().normalizeNested(item, key);
+ for (UpdateRequest update : updates) {
+ update.index(index.getIndexName())
+ .type(index.getIndexType())
+ .refresh(needsRefresh());
+ }
+ return updates;
}
}
@Test
public void massive_insert() {
+ // Set qeue's implicit commit size to 10
+ dbSession.setImplicitCommitSize(10);
+
// 0 Assert no logs in DB
assertThat(dao.findAll(dbSession)).hasSize(0);
- int max = 400;
+ int max = 35;
final String testValue = "hello world";
for (int i = 0; i < max; i++) {
@Test
public void massive_log_insert() {
+ // Set qeue's implicit commit size to 10
+ dbSession.setImplicitCommitSize(10);
+
// 0 Assert no logs in DB
assertThat(dao.findAll(dbSession)).hasSize(0);
- int max = 400;
+ int max = 40;
final String testValue = "hello world";
for (int i = 0; i < max; i++) {
TestActivityLog log = new TestActivityLog(testValue + "_" + i, Activity.Type.QPROFILE.toString());
assertThat(session.getActionCount()).isEqualTo(1);
dao.synchronizeAfter(session, new Date(t0));
- assertThat(session.getActionCount()).isEqualTo(2);
+ // Synchronize adds an implicit action to the queue before finishing.
+ assertThat(session.getActionCount()).isEqualTo(3);
}
}
assertThat(session.getActionCount()).isEqualTo(0);
dao.synchronizeAfter(session, new Date(0));
-
- assertThat(session.getActionCount()).isEqualTo(1);
+ // SynchronizeAfter adds an implicit action (refresh) after execution of synchronization
+ assertThat(session.getActionCount()).isEqualTo(2);
}
@Test
setupData("synchronize_after_since_given_date");
dao.synchronizeAfter(session, DateUtils.parseDate("2014-09-01"));
- assertThat(session.getActionCount()).isEqualTo(1);
+ // SynchronizeAfter adds an implicit action (refresh) after execution of synchronization
+ assertThat(session.getActionCount()).isEqualTo(2);
}
@Test
setupData("synchronize_after_with_project");
dao.synchronizeAfter(session, DateUtils.parseDate("2014-01-01"), ImmutableMap.of(IssueAuthorizationDao.PROJECT_KEY, "org.sonar:sample"));
- assertThat(session.getActionCount()).isEqualTo(1);
+ // SynchronizeAfter adds an implicit action (refresh) after execution of synchronization
+ assertThat(session.getActionCount()).isEqualTo(2);
}
}
.getState().getMetaData().concreteAllIndices())
.setQuery(QueryBuilders.matchAllQuery())
.get();
-
+ searchClient.admin().indices().prepareRefresh(searchClient.admin().cluster().prepareState().get()
+ .getState().getMetaData().concreteAllIndices())
+ .setForce(true)
+ .get();
+ searchClient.admin().indices().prepareFlush(searchClient.admin().cluster().prepareState().get()
+ .getState().getMetaData().concreteAllIndices())
+ .get();
}
}
private SqlSession session;
private int actionCount;
+ private Integer implicitCommitSize = IMPLICIT_COMMIT_SIZE;
+
DbSession(WorkQueue queue, SqlSession session) {
this.actionCount = 0;
this.session = session;
this.actions = new ArrayList<ClusterAction>();
}
+ public Integer getImplicitCommitSize() {
+ return implicitCommitSize;
+ }
+
+ public void setImplicitCommitSize(Integer implicitCommitSize) {
+ this.implicitCommitSize = implicitCommitSize;
+ }
+
public void enqueue(ClusterAction action) {
actionCount++;
this.actions.add(action);
- if (this.actions.size() > IMPLICIT_COMMIT_SIZE) {
+ if (this.actions.size() > getImplicitCommitSize()) {
this.commit();
}
}