import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
-import org.elasticsearch.action.WriteConsistencyLevel;
-import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.update.UpdateRequest;
import org.sonar.api.rule.RuleKey;
import org.sonar.api.rule.RuleStatus;
/** Creating updateRequest */
requests.add(new UpdateRequest()
- .replicationType(ReplicationType.ASYNC)
- .consistencyLevel(WriteConsistencyLevel.QUORUM)
.id(rule.getKey().toString())
.doc(update)
.upsert(upsert));
import org.sonar.server.search.action.IndexActionRequest;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
}
try {
+ long normTime = System.currentTimeMillis();
BulkRequestBuilder bulkRequestBuilder = new BulkRequestBuilder(searchClient);
Map<String, Index> indexes = getIndexMap();
+ Set<String> indices = new HashSet<String>();
for (IndexActionRequest action : actions) {
- action.setIndex(indexes.get(action.getIndexType()));
+ Index index = indexes.get(action.getIndexType());
+ action.setIndex(index);
+ indices.add(index.getIndexName());
}
- ExecutorService executorService = Executors.newFixedThreadPool(4);
+ ExecutorService executorService = Executors.newFixedThreadPool(10);
+
+ // Do we need to refresh
+ boolean requiresRefresh = false;
+ for (IndexActionRequest action : actions) {
+ if (action.needsRefresh()) {
+ requiresRefresh = true;
+ break;
+ }
+ }
//invokeAll() blocks until ALL tasks submitted to executor complete
for (Future<List<ActionRequest>> updateRequests : executorService.invokeAll(actions)) {
for (ActionRequest update : updateRequests.get()) {
if (UpdateRequest.class.isAssignableFrom(update.getClass())) {
- bulkRequestBuilder.add((UpdateRequest) update);
+ bulkRequestBuilder.add(((UpdateRequest) update).refresh(false));
} else if (DeleteRequest.class.isAssignableFrom(update.getClass())) {
- bulkRequestBuilder.add((DeleteRequest) update);
+ bulkRequestBuilder.add(((DeleteRequest) update).refresh(false));
} else {
throw new IllegalStateException("Un-managed request type: " + update.getClass());
}
}
}
executorService.shutdown();
-
- LOGGER.info("Executing batch request of size: " + bulkRequestBuilder.numberOfActions());
+ normTime = System.currentTimeMillis() - normTime;
//execute the request
- BulkResponse response = searchClient.execute(bulkRequestBuilder.setRefresh(true));
+ long indexTime = System.currentTimeMillis();
+ BulkResponse response = searchClient.execute(bulkRequestBuilder.setRefresh(false));
+ indexTime = System.currentTimeMillis() - indexTime;
+
+ long refreshTime = System.currentTimeMillis();
+ if (requiresRefresh) {
+ searchClient.admin().indices().prepareRefresh(indices.toArray(new String[indices.size()])).setForce(false).get();
+ }
+ refreshTime = System.currentTimeMillis() - refreshTime;
+
+ LOGGER.info("-- submitted {} items with {}ms in normalization, {}ms indexing and {}ms refresh({}). Total: {}ms",
+ bulkRequestBuilder.numberOfActions(), normTime, indexTime, refreshTime, indices, (normTime + indexTime + refreshTime));
+
} catch (Exception e) {
e.printStackTrace();
}
public abstract class IndexActionRequest implements ClusterAction<List<ActionRequest>> {
protected final String indexType;
+ private final boolean requiresRefresh;
private Index index;
- public IndexActionRequest(String indexType) {
+ protected IndexActionRequest(String indexType) {
+ this(indexType, true);
+ }
+
+ protected IndexActionRequest(String indexType, boolean requiresRefresh) {
super();
this.indexType = indexType;
+ this.requiresRefresh = requiresRefresh;
}
public abstract String getKey();
if (request.getClass().isAssignableFrom(UpdateRequest.class)) {
((UpdateRequest) request)
.type(index.getIndexType())
- .index(index.getIndexName());
+ .index(index.getIndexName())
+ .refresh(false);
}
finalRequests.add(request);
}
}
public abstract List<ActionRequest> doCall(Index index) throws Exception;
+
+ public boolean needsRefresh() {
+ return this.requiresRefresh;
+ }
}