import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
@Override
public void upsert(Object obj, KEY key) throws Exception {
- this.updateDocument(this.normalizer.normalizeNested(obj, key), key);
+ long t0 = System.currentTimeMillis();
+ List<UpdateRequest> requests = this.normalizer.normalizeNested(obj, key);
+ long t1 = System.currentTimeMillis();
+ this.updateDocument(requests, key);
+ long t2 = System.currentTimeMillis();
+ LOG.debug("UPSERT [object] time:{}ms ({}ms normalize, {}ms elastic)",
+ t2-t0, t1-t0, t2-t1);
}
@Override
public void upsertByDto(DTO item) {
try {
- this.updateDocument(normalizer.normalize(item), item.getKey());
+ long t0 = System.currentTimeMillis();
+ List<UpdateRequest> request = normalizer.normalize(item);
+ long t1 = System.currentTimeMillis();
+ this.updateDocument(request, item.getKey());
+ long t2 = System.currentTimeMillis();
+ LOG.debug("UPSERT [dto] time:{}ms ({}ms normalize, {}ms elastic)",
+ t2-t0, t1-t0, t2-t1);
} catch (Exception e) {
LOG.error("Could not update document for index {}: {}",
this.getIndexName(), e.getMessage(), e);
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.sonar.api.ServerComponent;
import org.sonar.core.cluster.WorkQueue;
import org.sonar.server.search.action.EmbeddedIndexAction;
public class IndexQueue extends LinkedBlockingQueue<Runnable>
implements ServerComponent, WorkQueue<IndexAction> {
- private static final Integer DEFAULT_QUEUE_SIZE = 20;
+ private static final Logger LOGGER = LoggerFactory.getLogger(IndexQueue.class);
+
+ private static final Integer DEFAULT_QUEUE_SIZE = 200;
public IndexQueue() {
super(DEFAULT_QUEUE_SIZE);
@Override
public void enqueue(List<IndexAction> actions) {
+ int bcount = 0;
+ int ecount = 0;
+ List<String> refreshes = Lists.newArrayList();
+ Set<String> types = Sets.newHashSet();
+ long all_start = System.currentTimeMillis();
+ long indexTime;
+ long refreshTime;
+ long embeddedTime;
+
if (actions.size() == 1) {
/* Atomic update here */
CountDownLatch latch = new CountDownLatch(1);
IndexAction action = actions.get(0);
action.setLatch(latch);
try {
+ indexTime = System.currentTimeMillis();
this.offer(action, 1000, TimeUnit.SECONDS);
- latch.await(1500, TimeUnit.MILLISECONDS);
+ latch.await(1000, TimeUnit.MILLISECONDS);
+ bcount ++;
+ indexTime = System.currentTimeMillis() - indexTime;
// refresh the index.
+ refreshTime = System.currentTimeMillis();
action.getIndex().refresh();
+ refreshTime = System.currentTimeMillis() - refreshTime;
+ refreshes.add(action.getIndex().getIndexName());
+ types.add(action.getPayloadClass().getSimpleName());
} catch (InterruptedException e) {
throw new IllegalStateException("ES update has been interrupted", e);
}
/* execute all item actions */
Multimap<String, IndexAction> itemBulks = makeBulkByType(itemActions);
CountDownLatch itemLatch = new CountDownLatch(itemBulks.size());
+ indexTime = System.currentTimeMillis();
for (IndexAction action : itemBulks.values()) {
action.setLatch(itemLatch);
this.offer(action, 1000, TimeUnit.SECONDS);
+ types.add(action.getPayloadClass().getSimpleName());
+ bcount++;
+
}
- itemLatch.await(1500, TimeUnit.MILLISECONDS);
+ itemLatch.await(2000, TimeUnit.MILLISECONDS);
+ indexTime = System.currentTimeMillis() - indexTime;
/* and now push the embedded */
CountDownLatch embeddedLatch = new CountDownLatch(embeddedActions.size());
+ embeddedTime = System.currentTimeMillis();
for (IndexAction action : embeddedActions) {
action.setLatch(embeddedLatch);
this.offer(action, 1000, TimeUnit.SECONDS);
+ types.add(action.getPayloadClass().getSimpleName());
+ ecount ++;
}
embeddedLatch.await(1500, TimeUnit.MILLISECONDS);
+ embeddedTime = System.currentTimeMillis() - embeddedTime;
/* Finally refresh affected indexes */
Set<String> refreshedIndexes = new HashSet<String>();
+ refreshTime = System.currentTimeMillis();
for (IndexAction action : actions) {
if (action.getIndex() != null &&
!refreshedIndexes.contains(action.getIndex().getIndexName())){
- action.getIndex().refresh();
refreshedIndexes.add(action.getIndex().getIndexName());
+ action.getIndex().refresh();
+ refreshes.add(action.getIndex().getIndexName());
}
}
-
+ refreshTime = System.currentTimeMillis() - refreshTime;
} catch (InterruptedException e) {
throw new IllegalStateException("ES update has been interrupted", e);
}
+ LOGGER.debug("INDEX - time:{}ms ({}ms index, {}ms embedded, {}ms refresh)\ttypes:[{}],\tbulk:{}\tembedded:{}\trefresh:[{}]",
+ (System.currentTimeMillis() - all_start), indexTime, embeddedTime, refreshTime,
+ StringUtils.join(types,","),
+ bcount, ecount, StringUtils.join(refreshes, ","));
}
}