import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-public class IndexQueue extends LinkedBlockingQueue<Runnable>
- implements ServerComponent, WorkQueue<IndexActionRequest> {
+public class IndexQueue implements ServerComponent, WorkQueue<IndexActionRequest> {
protected final Profiling profiling;
private static final Logger LOGGER = LoggerFactory.getLogger(IndexQueue.class);
- private static final Integer DEFAULT_QUEUE_SIZE = 200;
- private static final Integer CONCURRENT_NORMALIZATION_FACTOR = 5;
+ private static final Integer CONCURRENT_NORMALIZATION_FACTOR = 3;
public IndexQueue(Settings settings, SearchClient searchClient, ComponentContainer container) {
- super(DEFAULT_QUEUE_SIZE);
this.searchClient = searchClient;
this.container = container;
this.profiling = new Profiling(settings);
long refreshTime = this.refreshRequiredIndex(indices);
- LOGGER.debug("-- submitted {} items with {}ms in normalization, {}ms indexing and {}ms refresh({}). Total: {}ms",
+ LOGGER.info("-- submitted {} items with {}ms in normalization, {}ms indexing and {}ms refresh({}). Total: {}ms",
bulkRequestBuilder.numberOfActions(), normTime, indexTime, refreshTime, indices, (normTime + indexTime + refreshTime));
if (response.hasFailures()) {
private long executeNormalization(BulkRequestBuilder bulkRequestBuilder, List<IndexActionRequest> actions) {
long normTime = System.currentTimeMillis();
- ExecutorService executorService = Executors.newFixedThreadPool(CONCURRENT_NORMALIZATION_FACTOR);
- //invokeAll() blocks until ALL tasks submitted to executor complete
try {
+ ExecutorService executorService = Executors.newFixedThreadPool(CONCURRENT_NORMALIZATION_FACTOR);
+ //invokeAll() blocks until ALL tasks submitted to executor complete
for (Future<List<ActionRequest>> updateRequests : executorService.invokeAll(actions)) {
for (ActionRequest update : updateRequests.get()) {
if (UpdateRequest.class.isAssignableFrom(update.getClass())) {
}
}
}
+ executorService.shutdown();
} catch (Exception e) {
throw new IllegalStateException("Could not execute normalization for stack", e);
}
- executorService.shutdown();
return System.currentTimeMillis() - normTime;
}