import org.sonar.server.search.IndexDefinition;
import org.sonar.server.search.action.DeleteKey;
import org.sonar.server.search.action.DeleteNestedItem;
+import org.sonar.server.search.action.RefreshActionRequest;
import org.sonar.server.search.action.UpsertDto;
import org.sonar.server.search.action.UpsertNestedItem;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
-
import java.io.Serializable;
import java.sql.Timestamp;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
import static com.google.common.collect.Maps.newHashMap;
DbSynchronizationHandler handler = getSynchronizationResultHandler(session);
session.select(getSynchronizeStatementFQN(), getSynchronizationParams(date, params), handler);
handler.enqueueCollected();
-
+ session.enqueue(new RefreshActionRequest(this.getIndexType()));
} catch (Exception e) {
throw new IllegalStateException(e);
}
return mapping;
}
- @Override
- public void refresh() {
- client.execute(client
- .admin()
- .indices()
- .prepareRefresh(this.getIndexName())
- .setForce(true)
- .setIndices(this.getIndexName()));
- }
-
/* Base CRUD methods */
protected abstract DOMAIN toDoc(Map<String, Object> fields);
String getIndexName();
- void refresh();
-
Date getLastSynchronization();
IndexStat getIndexStat();
package org.sonar.server.search;
import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequestBuilder;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.sonar.api.ServerComponent;
import org.sonar.api.config.Settings;
import org.sonar.api.platform.ComponentContainer;
+import org.sonar.core.cluster.ClusterAction;
import org.sonar.core.cluster.WorkQueue;
import org.sonar.core.profiling.Profiling;
import org.sonar.server.search.action.IndexActionRequest;
+import org.sonar.server.search.action.IndexWorker;
+import org.sonar.server.search.action.RefreshActionRequest;
import java.util.HashMap;
import java.util.HashSet;
Map<String, Index> indexes = getIndexMap();
Set<String> indices = new HashSet<String>();
- for (IndexActionRequest action : actions) {
- if (action.needsRefresh()) {
- refreshRequired = true;
+ for (ClusterAction action : actions) {
+ if (IndexActionRequest.class.isAssignableFrom(action.getClass())) {
+ IndexActionRequest worker = (IndexActionRequest) action;
+ if (worker.needsRefresh()) {
+ refreshRequired = true;
+ indices.add(indexes.get(worker.getIndexType()).getIndexName());
+ }
+ }
+
+ if (IndexWorker.class.isAssignableFrom(action.getClass())) {
+ IndexWorker worker = (IndexWorker) action;
+ Index index = indexes.get(worker.getIndexType());
+ worker.setIndex(index);
}
- Index index = indexes.get(action.getIndexType());
- action.setIndex(index);
- if (action.needsRefresh()) {
- indices.add(index.getIndexName());
+
+ if (RefreshActionRequest.class.isAssignableFrom(action.getClass())) {
+ refreshRequired = true;
+ indices.add(indexes.get(((RefreshActionRequest) action).getIndexType()).getIndexName());
}
}
long normTime = executeNormalization(bulkRequestBuilder, actions);
- // execute the request
- long indexTime = System.currentTimeMillis();
- BulkResponse response = searchClient.execute(bulkRequestBuilder.setRefresh(false));
- indexTime = System.currentTimeMillis() - indexTime;
+ if (bulkRequestBuilder.numberOfActions() > 0) {
+ // execute the request
+ long indexTime = System.currentTimeMillis();
+ BulkResponse response = searchClient.execute(bulkRequestBuilder.setRefresh(false));
- long refreshTime = 0;
- if (refreshRequired) {
- refreshTime = this.refreshRequiredIndex(indices);
- }
+ indexTime = System.currentTimeMillis() - indexTime;
- LOGGER.debug("-- submitted {} items with {}ms in normalization, {}ms indexing and {}ms refresh({}). Total: {}ms",
- bulkRequestBuilder.numberOfActions(), normTime, indexTime, refreshTime, indices, (normTime + indexTime + refreshTime));
+ long refreshTime = 0;
+ if (refreshRequired) {
+ refreshTime = this.refreshRequiredIndex(indices);
+ }
- if (response.hasFailures()) {
- throw new IllegalStateException("Errors while indexing stack: " + response.buildFailureMessage());
- }
+ LOGGER.debug("-- submitted {} items with {}ms in normalization, {}ms indexing and {}ms refresh({}). Total: {}ms",
+ bulkRequestBuilder.numberOfActions(), normTime, indexTime, refreshTime, indices, (normTime + indexTime + refreshTime));
+ if (response.hasFailures()) {
+ throw new IllegalStateException("Errors while indexing stack: " + response.buildFailureMessage());
+ }
+ }
} catch (Exception e) {
LOGGER.error("Could not commit to ElasticSearch", e);
}
private long executeNormalization(BulkRequestBuilder bulkRequestBuilder, List<IndexActionRequest> actions) {
long normTime = System.currentTimeMillis();
try {
+ boolean hasInlineRefreshRequest = false;
ExecutorService executorService = Executors.newFixedThreadPool(CONCURRENT_NORMALIZATION_FACTOR);
// invokeAll() blocks until ALL tasks submitted to executor complete
for (Future<List<ActionRequest>> updateRequests : executorService.invokeAll(actions)) {
for (ActionRequest update : updateRequests.get()) {
if (UpdateRequest.class.isAssignableFrom(update.getClass())) {
- bulkRequestBuilder.add(((UpdateRequest) update).refresh(false));
+ bulkRequestBuilder.add(((UpdateRequest) update));
} else if (DeleteRequest.class.isAssignableFrom(update.getClass())) {
- bulkRequestBuilder.add(((DeleteRequest) update).refresh(false));
+ bulkRequestBuilder.add(((DeleteRequest) update));
+ } else if (RefreshRequest.class.isAssignableFrom(update.getClass())) {
+ hasInlineRefreshRequest = true;
} else {
throw new IllegalStateException("Un-managed request type: " + update.getClass());
}
}
}
executorService.shutdown();
+ bulkRequestBuilder.setRefresh(hasInlineRefreshRequest);
} catch (Exception e) {
throw new IllegalStateException("Could not execute normalization for stack", e);
}
}
dao.synchronizeAfter(session,
index.getLastSynchronization());
- index.refresh();
}
}
import java.util.ArrayList;
import java.util.List;
-public abstract class IndexActionRequest implements ClusterAction<List<ActionRequest>> {
+public abstract class IndexActionRequest implements IndexWorker, ClusterAction<ActionRequest> {
protected final String indexType;
private final boolean requiresRefresh;
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.search.action;
+
+import org.sonar.server.search.Index;
+
+public interface IndexWorker {
+
+ public String getIndexType();
+
+ public void setIndex(Index index);
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.search.action;
+
+import com.google.common.collect.ImmutableList;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.sonar.core.cluster.ClusterAction;
+import org.sonar.server.search.Index;
+
+import java.util.List;
+
+public class RefreshActionRequest implements IndexWorker, ClusterAction<RefreshRequest> {
+
+ private Index<?, ?, ?> index;
+ private final String indexType;
+
+ public RefreshActionRequest(String indexType) {
+ this.indexType = indexType;
+ }
+
+ @Override
+ public List<RefreshRequest> call() throws Exception {
+ return ImmutableList.<RefreshRequest>of(
+ new RefreshRequest()
+ .force(false)
+ .indices(index.getIndexName()));
+ }
+
+ @Override
+ public String getIndexType() {
+ return indexType;
+ }
+
+ @Override
+ public void setIndex(Index index) {
+ this.index = index;
+ }
+}
assertThat(session.getActionCount()).isEqualTo(1);
dao.synchronizeAfter(session, new Date(t0));
- assertThat(session.getActionCount()).isEqualTo(2);
+ // Synchronize adds an implicit action to the queue before finishing.
+ assertThat(session.getActionCount()).isEqualTo(3);
}
}
assertThat(session.getActionCount()).isEqualTo(0);
dao.synchronizeAfter(session, new Date(0));
-
- assertThat(session.getActionCount()).isEqualTo(1);
+ // SynchronizeAfter adds an implicit action (refresh) after execution of synchronization
+ assertThat(session.getActionCount()).isEqualTo(2);
}
@Test
setupData("synchronize_after_since_given_date");
dao.synchronizeAfter(session, DateUtils.parseDate("2014-09-01"));
- assertThat(session.getActionCount()).isEqualTo(1);
+ // SynchronizeAfter adds an implicit action (refresh) after execution of synchronization
+ assertThat(session.getActionCount()).isEqualTo(2);
}
@Test
setupData("synchronize_after_with_project");
dao.synchronizeAfter(session, DateUtils.parseDate("2014-01-01"), ImmutableMap.of(IssueAuthorizationDao.PROJECT_KEY, "org.sonar:sample"));
- assertThat(session.getActionCount()).isEqualTo(1);
+ // SynchronizeAfter adds an implicit action (refresh) after execution of synchronization
+ assertThat(session.getActionCount()).isEqualTo(2);
}
}
*/
package org.sonar.core.cluster;
+import java.util.List;
import java.util.concurrent.Callable;
-public interface ClusterAction<K> extends Callable<K> {
+public interface ClusterAction<K> extends Callable<List<K>> {
@Override
- public K call() throws Exception;
+ public List<K> call() throws Exception;
}