BulkRequestBuilder bulkRequestBuilder = searchClient.prepareBulk();
- long normTime = processActionsIntoQueries(bulkRequestBuilder, actions);
+ processActionsIntoQueries(bulkRequestBuilder, actions);
if (bulkRequestBuilder.numberOfActions() > 0) {
// execute the request
- long indexTime = System.currentTimeMillis();
BulkResponse response = bulkRequestBuilder.setRefresh(false).get();
- indexTime = System.currentTimeMillis() - indexTime;
-
- long refreshTime = 0;
if (refreshRequired) {
- refreshTime = this.refreshRequiredIndex(indices);
+ this.refreshRequiredIndex(indices);
}
- LOGGER.debug("-- submitted {} items with {}ms in normalization, {}ms indexing and {}ms refresh({}). Total: {}ms",
- bulkRequestBuilder.numberOfActions(), normTime, indexTime, refreshTime, indices, (normTime + indexTime + refreshTime));
-
if (response.hasFailures()) {
throw new IllegalStateException("Errors while indexing stack: " + response.buildFailureMessage());
}
}
}
- private long refreshRequiredIndex(Set<String> indices) {
- long refreshTime = System.currentTimeMillis();
+ private void refreshRequiredIndex(Set<String> indices) {
if (!indices.isEmpty()) {
RefreshRequestBuilder refreshRequest = searchClient.prepareRefresh(indices.toArray(new String[indices.size()]))
.setForce(false);
LOGGER.warn("{} Shard(s) did not refresh", refreshResponse.getFailedShards());
}
}
- return System.currentTimeMillis() - refreshTime;
}
- private long processActionsIntoQueries(BulkRequestBuilder bulkRequestBuilder, List<IndexAction<?>> actions) {
- long normTime = System.currentTimeMillis();
+ private void processActionsIntoQueries(BulkRequestBuilder bulkRequestBuilder, List<IndexAction<?>> actions) {
try {
boolean hasInlineRefreshRequest = false;
ExecutorService executorService = Executors.newFixedThreadPool(CONCURRENT_NORMALIZATION_FACTOR);
} catch (Exception e) {
throw new IllegalStateException("Could not execute normalization for stack", e);
}
- return System.currentTimeMillis() - normTime;
}
private Map<String, Index> getIndexMap() {