try {
BulkRequestBuilder bulkRequestBuilder = new BulkRequestBuilder(searchClient);
- Map<String,Index> indexes = getIndexMap();
+ Map<String, Index> indexes = getIndexMap();
for (IndexActionRequest action : actions) {
action.setIndex(indexes.get(action.getIndexType()));
}
for (Future<List<ActionRequest>> updateRequests : executorService.invokeAll(actions)) {
for (ActionRequest update : updateRequests.get()) {
if (UpdateRequest.class.isAssignableFrom(update.getClass())) {
- bulkRequestBuilder.add((UpdateRequest)update);
+ bulkRequestBuilder.add((UpdateRequest) update);
} else if (DeleteRequest.class.isAssignableFrom(update.getClass())) {
- bulkRequestBuilder.add((DeleteRequest)update);
+ bulkRequestBuilder.add((DeleteRequest) update);
} else {
throw new IllegalStateException("Un-managed request type: " + update.getClass());
}
}
return indexes;
}
-
-
-//
-// int bcount = 0;
-// int ecount = 0;
-// List<String> refreshes = Lists.newArrayList();
-// Set<String> types = Sets.newHashSet();
-// long all_start = System.currentTimeMillis();
-// long indexTime;
-// long refreshTime;
-// long embeddedTime;
-//
-// if (actions.size() == 1) {
-// /* Atomic update here */
-// CountDownLatch latch = new CountDownLatch(1);
-// IndexAction action = actions.get(0);
-// action.setLatch(latch);
-// try {
-// indexTime = System.currentTimeMillis();
-// this.offer(action, TIMEOUT, TimeUnit.MILLISECONDS);
-// if (!latch.await(TIMEOUT, TimeUnit.MILLISECONDS)) {
-// throw new IllegalStateException("ES update could not be completed within: " + TIMEOUT + "ms");
-// }
-// bcount++;
-// indexTime = System.currentTimeMillis() - indexTime;
-// // refresh the index.
-// Index<?, ?, ?> index = action.getIndex();
-// if (index != null) {
-// refreshTime = System.currentTimeMillis();
-// index.refresh();
-// refreshTime = System.currentTimeMillis() - refreshTime;
-// refreshes.add(index.getIndexName());
-// }
-// types.add(action.getPayloadClass().getSimpleName());
-// } catch (InterruptedException e) {
-// throw new IllegalStateException("ES update has been interrupted", e);
-// }
-// } else if (actions.size() > 1) {
-// StopWatch basicProfile = profiling.start("search", Profiling.Level.BASIC);
-//
-// /* Purge actions that would be overridden */
-// Long purgeStart = System.currentTimeMillis();
-// List<IndexAction> itemActions = Lists.newArrayList();
-// List<IndexAction> embeddedActions = Lists.newArrayList();
-//
-// for (IndexAction action : actions) {
-// if (action.getClass().isAssignableFrom(EmbeddedIndexAction.class)) {
-// embeddedActions.add(action);
-// } else {
-// itemActions.add(action);
-// }
-// }
-//
-// LOGGER.debug("INDEX - compressed {} items into {} in {}ms,",
-// actions.size(), itemActions.size() + embeddedActions.size(), System.currentTimeMillis() - purgeStart);
-//
-// try {
-// /* execute all item actions */
-// CountDownLatch itemLatch = new CountDownLatch(itemActions.size());
-// indexTime = System.currentTimeMillis();
-// for (IndexAction action : itemActions) {
-// action.setLatch(itemLatch);
-// this.offer(action, TIMEOUT, TimeUnit.MILLISECONDS);
-// types.add(action.getPayloadClass().getSimpleName());
-// bcount++;
-//
-// }
-// if (!itemLatch.await(TIMEOUT, TimeUnit.MILLISECONDS)) {
-// throw new IllegalStateException("ES update could not be completed within: " + TIMEOUT + "ms");
-// }
-// indexTime = System.currentTimeMillis() - indexTime;
-//
-// /* and now push the embedded */
-// CountDownLatch embeddedLatch = new CountDownLatch(embeddedActions.size());
-// embeddedTime = System.currentTimeMillis();
-// for (IndexAction action : embeddedActions) {
-// action.setLatch(embeddedLatch);
-// this.offer(action, TIMEOUT, TimeUnit.SECONDS);
-// types.add(action.getPayloadClass().getSimpleName());
-// ecount++;
-// }
-// if (!embeddedLatch.await(TIMEOUT, TimeUnit.MILLISECONDS)) {
-// throw new IllegalStateException("ES embedded update could not be completed within: " + TIMEOUT + "ms");
-// }
-// embeddedTime = System.currentTimeMillis() - embeddedTime;
-//
-// /* Finally refresh affected indexes */
-// Set<String> refreshedIndexes = new HashSet<String>();
-// refreshTime = System.currentTimeMillis();
-// for (IndexAction action : actions) {
-// if (action.getIndex() != null &&
-// !refreshedIndexes.contains(action.getIndex().getIndexName())) {
-// refreshedIndexes.add(action.getIndex().getIndexName());
-// action.getIndex().refresh();
-// refreshes.add(action.getIndex().getIndexName());
-// }
-// }
-// refreshTime = System.currentTimeMillis() - refreshTime;
-// } catch (InterruptedException e) {
-// throw new IllegalStateException("ES update has been interrupted", e);
-// }
-//
-// basicProfile.stop("INDEX - time:%sms (%sms index, %sms embedded, %sms refresh)\ttypes:[%s],\tbulk:%s\tembedded:%s\trefresh:[%s]",
-// (System.currentTimeMillis() - all_start), indexTime, embeddedTime, refreshTime,
-// StringUtils.join(types, ","),
-// bcount, ecount, StringUtils.join(refreshes, ","));
-// }
-
-
}