#!/bin/sh
-mvn clean install -DskipTests -pl :sonar-process -amd
+mvn clean install -DskipTests -pl :sonar-core,sonar-search -amd
if [[ "$OSTYPE" == "darwin"* ]]; then
OS='macosx-universal-64'
</appender>
<root>
- <level value="DEBUG"/>
+ <level value="INFO"/>
<appender-ref ref="LOGFILE"/>
</root>
<groupId>org.codehaus.sonar</groupId>
<artifactId>sonar-search</artifactId>
<version>${project.version}</version>
- <scope>test</scope>
</dependency>
</dependencies>
import org.elasticsearch.search.sort.SortOrder;
import org.sonar.core.activity.Activity;
import org.sonar.core.activity.db.ActivityDto;
-import org.sonar.core.cluster.WorkQueue;
import org.sonar.server.search.BaseIndex;
-import org.sonar.server.search.SearchClient;
import org.sonar.server.search.IndexDefinition;
import org.sonar.server.search.IndexField;
import org.sonar.server.search.QueryOptions;
import org.sonar.server.search.Result;
+import org.sonar.server.search.SearchClient;
import javax.annotation.Nullable;
import java.io.IOException;
*/
public class ActivityIndex extends BaseIndex<Activity, ActivityDto, String> {
- public ActivityIndex(ActivityNormalizer normalizer, WorkQueue workQueue, SearchClient node) {
- super(IndexDefinition.LOG, normalizer, workQueue, node);
+ public ActivityIndex(ActivityNormalizer normalizer, SearchClient node) {
+ super(IndexDefinition.LOG, normalizer, node);
}
@Override
import org.sonar.core.persistence.Dto;
import org.sonar.server.exceptions.NotFoundException;
import org.sonar.server.search.IndexDefinition;
-import org.sonar.server.search.action.DtoIndexAction;
-import org.sonar.server.search.action.EmbeddedIndexAction;
-import org.sonar.server.search.action.IndexAction;
-import org.sonar.server.search.action.KeyIndexAction;
+import org.sonar.server.search.action.DeleteKey;
+import org.sonar.server.search.action.DeleteNestedItem;
+import org.sonar.server.search.action.UpsertDto;
+import org.sonar.server.search.action.UpsertNestedItem;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
item.setUpdatedAt(now);
doUpdate(session, item);
if (hasIndex()) {
- session.enqueue(new DtoIndexAction<E>(getIndexType(), IndexAction.Method.UPSERT, item));
+ session.enqueue(new UpsertDto(getIndexType(), item));
}
} catch (Exception e) {
throw new IllegalStateException("Fail to update item in db: " + item, e);
try {
doInsert(session, item);
if (hasIndex()) {
- session.enqueue(new DtoIndexAction<E>(getIndexType(), IndexAction.Method.UPSERT, item));
+ session.enqueue(new UpsertDto<E>(getIndexType(), item));
}
} catch (Exception e) {
throw new IllegalStateException("Fail to insert item in db: " + item, e.getCause());
try {
doDeleteByKey(session, key);
if (hasIndex()) {
- session.enqueue(new KeyIndexAction<K>(getIndexType(), IndexAction.Method.DELETE, key));
+ session.enqueue(new DeleteKey<K>(getIndexType(), key));
}
} catch (Exception e) {
throw new IllegalStateException("Fail to delete item from db: " + key, e);
protected final void enqueueUpdate(Object nestedItem, K key, DbSession session) {
if (hasIndex()) {
- session.enqueue(new EmbeddedIndexAction<K>(
- this.getIndexType(), IndexAction.Method.UPSERT, key, nestedItem));
+ session.enqueue(new UpsertNestedItem<K>(
+ this.getIndexType(), key, nestedItem));
}
}
public void enqueueDelete(Object nestedItem, K key, DbSession session) {
if (hasIndex()) {
- session.enqueue(new EmbeddedIndexAction<K>(
- this.getIndexType(), IndexAction.Method.DELETE, key, nestedItem));
+ session.enqueue(new DeleteNestedItem<K>(
+ this.getIndexType(), key, nestedItem));
session.commit();
}
}
public void enqueueInsert(Object nestedItem, K key, DbSession session) {
if (hasIndex()) {
- session.enqueue(new EmbeddedIndexAction<K>(
- this.getIndexType(), IndexAction.Method.UPSERT, key, nestedItem));
+ this.enqueueUpdate(nestedItem, key, session);
}
}
@Override
public final void synchronizeAfter(final DbSession session, Date date) {
for (E dto : this.findAfterDate(session, date)) {
- session.enqueue(new DtoIndexAction<E>(getIndexType(), IndexAction.Method.UPSERT,
- dto));
+ session.enqueue(new UpsertDto<E>(getIndexType(), dto));
}
session.commit();
}
ActiveRuleNormalizer.class,
RuleIndex.class,
ActiveRuleIndex.class,
- IndexQueueWorker.class,
+ //IndexQueueWorker.class,
IndexClient.class,
ActivityNormalizer.class,
ActivityIndex.class,
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.sonar.api.rule.RuleKey;
import org.sonar.api.rule.RuleStatus;
-import org.sonar.core.cluster.WorkQueue;
import org.sonar.core.qualityprofile.db.ActiveRuleDto;
import org.sonar.core.qualityprofile.db.ActiveRuleKey;
import org.sonar.server.qualityprofile.ActiveRule;
import org.sonar.server.rule.index.RuleNormalizer;
import org.sonar.server.search.BaseIndex;
-import org.sonar.server.search.SearchClient;
import org.sonar.server.search.FacetValue;
import org.sonar.server.search.IndexDefinition;
import org.sonar.server.search.IndexField;
+import org.sonar.server.search.SearchClient;
import java.io.IOException;
import java.util.ArrayList;
public class ActiveRuleIndex extends BaseIndex<ActiveRule, ActiveRuleDto, ActiveRuleKey> {
- public ActiveRuleIndex(ActiveRuleNormalizer normalizer, WorkQueue workQueue, SearchClient node) {
- super(IndexDefinition.ACTIVE_RULE, normalizer, workQueue, node);
+ public ActiveRuleIndex(ActiveRuleNormalizer normalizer, SearchClient node) {
+ super(IndexDefinition.ACTIVE_RULE, normalizer, node);
}
@Override
import org.sonar.core.qualityprofile.db.ActiveRuleKey;
import org.sonar.core.qualityprofile.db.ActiveRuleParamDto;
import org.sonar.core.qualityprofile.db.QualityProfileDto;
+import org.sonar.search.script.ListUpdate;
import org.sonar.server.db.DbClient;
import org.sonar.server.qualityprofile.ActiveRule;
import org.sonar.server.search.BaseNormalizer;
import org.sonar.server.search.IndexDefinition;
import org.sonar.server.search.IndexField;
import org.sonar.server.search.Indexable;
-import org.sonar.server.search.es.ListUpdate;
import java.lang.reflect.Field;
import java.util.ArrayList;
import org.sonar.api.rule.RuleKey;
import org.sonar.api.rule.RuleStatus;
import org.sonar.api.server.debt.DebtCharacteristic;
-import org.sonar.core.cluster.WorkQueue;
import org.sonar.core.rule.RuleDto;
import org.sonar.server.qualityprofile.index.ActiveRuleNormalizer;
import org.sonar.server.rule.Rule;
import org.sonar.server.search.BaseIndex;
-import org.sonar.server.search.SearchClient;
import org.sonar.server.search.IndexDefinition;
import org.sonar.server.search.IndexField;
import org.sonar.server.search.QueryOptions;
import org.sonar.server.search.Result;
+import org.sonar.server.search.SearchClient;
import javax.annotation.CheckForNull;
import java.io.IOException;
public class RuleIndex extends BaseIndex<Rule, RuleDto, RuleKey> {
- public RuleIndex(RuleNormalizer normalizer, WorkQueue workQueue, SearchClient node) {
- super(IndexDefinition.RULE, normalizer, workQueue, node);
+ public RuleIndex(RuleNormalizer normalizer, SearchClient client) {
+ super(IndexDefinition.RULE, normalizer, client);
}
protected String getKeyValue(RuleKey key) {
import org.sonar.core.rule.RuleParamDto;
import org.sonar.core.technicaldebt.db.CharacteristicDto;
import org.sonar.markdown.Markdown;
+import org.sonar.search.script.ListUpdate;
import org.sonar.server.db.DbClient;
import org.sonar.server.search.BaseNormalizer;
import org.sonar.server.search.IndexDefinition;
import org.sonar.server.search.IndexField;
import org.sonar.server.search.Indexable;
-import org.sonar.server.search.es.ListUpdate;
import java.lang.reflect.Field;
import java.util.ArrayList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.count.CountRequestBuilder;
import org.elasticsearch.action.count.CountResponse;
-import org.elasticsearch.action.delete.DeleteRequestBuilder;
-import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
-import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolFilterBuilder;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.sonar.core.cluster.WorkQueue;
import org.sonar.core.persistence.Dto;
import javax.annotation.Nullable;
-
import java.io.IOException;
import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Queue;
public abstract class BaseIndex<DOMAIN, DTO extends Dto<KEY>, KEY extends Serializable>
implements Index<DOMAIN, DTO, KEY> {
private final BaseNormalizer<DTO, KEY> normalizer;
private final IndexDefinition indexDefinition;
- protected BaseIndex(IndexDefinition indexDefinition, BaseNormalizer<DTO, KEY> normalizer,
- WorkQueue workQueue, SearchClient client) {
+ protected BaseIndex(IndexDefinition indexDefinition, BaseNormalizer<DTO, KEY> normalizer, SearchClient client) {
this.normalizer = normalizer;
this.client = client;
this.indexDefinition = indexDefinition;
}
+ public BaseNormalizer<DTO, KEY> getNormalizer() {
+ return normalizer;
+ }
+
@Override
public final String getIndexName() {
return this.indexDefinition.getIndexName();
return null;
}
- protected void updateDocument(Collection<UpdateRequest> requests, KEY key) {
- LOG.debug("UPDATE _id:{} in index {}", key, this.getIndexName());
- BulkRequestBuilder bulkRequest = client.prepareBulk();
- for (UpdateRequest request : requests) {
- // if request has no ID then no upsert possible!
- if (request.id() == null || request.id().isEmpty()) {
- bulkRequest.add(new IndexRequest()
- .source(request.doc().sourceAsMap())
- .type(this.getIndexType())
- .index(this.getIndexName()));
- } else {
- bulkRequest.add(request
- .id(this.getKeyValue(key))
- .index(this.getIndexName())
- .type(this.getIndexType()));
- }
- }
- BulkResponse response = client.execute(bulkRequest);
- }
-
- @Override
- public void upsert(KEY key, Object object, Object... objects) throws Exception {
- long t0 = System.currentTimeMillis();
- List<UpdateRequest> requests = this.normalizer.normalizeNested(object, key);
- for (Object additionalObject : objects) {
- requests.addAll(this.normalizer.normalizeNested(additionalObject, key));
- }
- long t1 = System.currentTimeMillis();
- this.updateDocument(requests, key);
- long t2 = System.currentTimeMillis();
- LOG.debug("UPSERT [object] time:{}ms ({}ms normalize, {}ms elastic)",
- t2 - t0, t1 - t0, t2 - t1);
- }
-
- @Override
- public void upsert(DTO item, DTO... items) {
- try {
- long t0 = System.currentTimeMillis();
- List<UpdateRequest> requests = normalizer.normalize(item);
- for (DTO additionalItem : items) {
- requests.addAll(normalizer.normalize(additionalItem));
- }
- long t1 = System.currentTimeMillis();
- this.updateDocument(requests, item.getKey());
- long t2 = System.currentTimeMillis();
- LOG.debug("UPSERT [dto] time:{}ms ({}ms normalize, {}ms elastic)",
- t2 - t0, t1 - t0, t2 - t1);
- } catch (Exception e) {
- LOG.error("Could not update document for index {}: {}",
- this.getIndexName(), e.getMessage(), e);
- }
- }
-
- private void deleteDocument(KEY key) throws ExecutionException, InterruptedException {
- LOG.debug("DELETE _id:{} in index {}", key, this.getIndexName());
- DeleteRequestBuilder request = client
- .prepareDelete()
- .setIndex(this.getIndexName())
- .setType(this.getIndexType())
- .setId(this.getKeyValue(key));
- DeleteResponse response = client.execute(request);
- }
-
- @Override
- public void delete(KEY key, Object object, Object... objects) throws Exception {
- LOG.debug("DELETE NESTED _id:{} in index {}", key, this.getIndexName());
- List<UpdateRequest> requests = this.normalizer.deleteNested(object, key);
- for (Object additionalObject : objects) {
- requests.addAll(this.normalizer.deleteNested(additionalObject, key));
- }
- this.updateDocument(requests, key);
- }
-
- @Override
- public void deleteByKey(KEY key, KEY... keys) {
- try {
- this.deleteDocument(key);
- for (KEY additionalKey : keys) {
- this.deleteDocument(additionalKey);
- }
- } catch (Exception e) {
- throw new IllegalStateException(String.format("Could not DELETE _id = '%s' for index '%s",
- getKeyValue(key), getIndexName()), e);
- }
- }
-
- @Override
- public void deleteByDto(DTO item, DTO... items) {
- try {
- this.deleteDocument(item.getKey());
- for (DTO additionalItem : items) {
- this.deleteDocument(additionalItem.getKey());
- }
- } catch (Exception e) {
- throw new IllegalStateException(String.format("Could not DELETE _id = '%s' for index '%s",
- getKeyValue(item.getKey()), getIndexName()), e);
- }
- }
-
/* ES QueryHelper Methods */
protected BoolFilterBuilder addTermFilter(BoolFilterBuilder filter, String field, @Nullable Collection<String> values) {
void refresh();
- void upsert(KEY key, Object object, Object... objects) throws Exception;
-
- void upsert(DTO dto, DTO... dtos);
-
- void delete(KEY key, Object object, Object... objects) throws Exception;
-
- void deleteByKey(KEY key, KEY... keys);
-
- void deleteByDto(DTO dto, DTO... dtos);
-
Date getLastSynchronization();
IndexStat getIndexStat();
Iterator<DOMAIN> scroll(String scrollId);
+
+ BaseNormalizer<DTO, KEY> getNormalizer();
}
*/
package org.sonar.server.search;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang.StringUtils;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.update.UpdateRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sonar.api.ServerComponent;
import org.sonar.api.config.Settings;
+import org.sonar.api.platform.ComponentContainer;
import org.sonar.core.cluster.WorkQueue;
import org.sonar.core.profiling.Profiling;
-import org.sonar.core.profiling.StopWatch;
-import org.sonar.server.search.action.EmbeddedIndexAction;
-import org.sonar.server.search.action.IndexAction;
+import org.sonar.server.search.action.IndexActionRequest;
-import java.util.HashSet;
+import java.util.HashMap;
import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
public class IndexQueue extends LinkedBlockingQueue<Runnable>
- implements ServerComponent, WorkQueue<IndexAction> {
+ implements ServerComponent, WorkQueue<IndexActionRequest> {
protected final Profiling profiling;
+ private final SearchClient searchClient;
+ private final ComponentContainer container;
+
private static final Logger LOGGER = LoggerFactory.getLogger(IndexQueue.class);
private static final Integer DEFAULT_QUEUE_SIZE = 200;
private static final int TIMEOUT = 30000;
- public IndexQueue(Settings settings) {
+ public IndexQueue(Settings settings, SearchClient searchClient, ComponentContainer container) {
super(DEFAULT_QUEUE_SIZE);
+ this.searchClient = searchClient;
+ this.container = container;
this.profiling = new Profiling(settings);
}
@Override
- public void enqueue(IndexAction action) {
- this.enqueue(ImmutableList.of(action));
- }
-
- @Override
- public void enqueue(List<IndexAction> actions) {
-
-
- int bcount = 0;
- int ecount = 0;
- List<String> refreshes = Lists.newArrayList();
- Set<String> types = Sets.newHashSet();
- long all_start = System.currentTimeMillis();
- long indexTime;
- long refreshTime;
- long embeddedTime;
-
- if (actions.size() == 1) {
- /* Atomic update here */
- CountDownLatch latch = new CountDownLatch(1);
- IndexAction action = actions.get(0);
- action.setLatch(latch);
- try {
- indexTime = System.currentTimeMillis();
- this.offer(action, TIMEOUT, TimeUnit.MILLISECONDS);
- if (!latch.await(TIMEOUT, TimeUnit.MILLISECONDS)) {
- throw new IllegalStateException("ES update could not be completed within: " + TIMEOUT + "ms");
- }
- bcount++;
- indexTime = System.currentTimeMillis() - indexTime;
- // refresh the index.
- Index<?, ?, ?> index = action.getIndex();
- if (index != null) {
- refreshTime = System.currentTimeMillis();
- index.refresh();
- refreshTime = System.currentTimeMillis() - refreshTime;
- refreshes.add(index.getIndexName());
- }
- types.add(action.getPayloadClass().getSimpleName());
- } catch (InterruptedException e) {
- throw new IllegalStateException("ES update has been interrupted", e);
- }
- } else if (actions.size() > 1) {
- StopWatch basicProfile = profiling.start("search", Profiling.Level.BASIC);
-
- /* Purge actions that would be overridden */
- Long purgeStart = System.currentTimeMillis();
- List<IndexAction> itemActions = Lists.newArrayList();
- List<IndexAction> embeddedActions = Lists.newArrayList();
-
- for (IndexAction action : actions) {
- if (action.getClass().isAssignableFrom(EmbeddedIndexAction.class)) {
- embeddedActions.add(action);
- } else {
- itemActions.add(action);
- }
- }
+ public void enqueue(List<IndexActionRequest> actions) {
- LOGGER.debug("INDEX - compressed {} items into {} in {}ms,",
- actions.size(), itemActions.size() + embeddedActions.size(), System.currentTimeMillis() - purgeStart);
+ if (actions.isEmpty()) {
+ return;
+ }
+ try {
- try {
- /* execute all item actions */
- CountDownLatch itemLatch = new CountDownLatch(itemActions.size());
- indexTime = System.currentTimeMillis();
- for (IndexAction action : itemActions) {
- action.setLatch(itemLatch);
- this.offer(action, TIMEOUT, TimeUnit.MILLISECONDS);
- types.add(action.getPayloadClass().getSimpleName());
- bcount++;
+ BulkRequestBuilder bulkRequestBuilder = new BulkRequestBuilder(searchClient);
+ Map<String,Index> indexes = getIndexMap();
+ for (IndexActionRequest action : actions) {
+ action.setIndex(indexes.get(action.getIndexType()));
+ }
- }
- if (!itemLatch.await(TIMEOUT, TimeUnit.MILLISECONDS)) {
- throw new IllegalStateException("ES update could not be completed within: " + TIMEOUT + "ms");
- }
- indexTime = System.currentTimeMillis() - indexTime;
-
- /* and now push the embedded */
- CountDownLatch embeddedLatch = new CountDownLatch(embeddedActions.size());
- embeddedTime = System.currentTimeMillis();
- for (IndexAction action : embeddedActions) {
- action.setLatch(embeddedLatch);
- this.offer(action, TIMEOUT, TimeUnit.SECONDS);
- types.add(action.getPayloadClass().getSimpleName());
- ecount++;
- }
- if (!embeddedLatch.await(TIMEOUT, TimeUnit.MILLISECONDS)) {
- throw new IllegalStateException("ES embedded update could not be completed within: " + TIMEOUT + "ms");
- }
- embeddedTime = System.currentTimeMillis() - embeddedTime;
-
- /* Finally refresh affected indexes */
- Set<String> refreshedIndexes = new HashSet<String>();
- refreshTime = System.currentTimeMillis();
- for (IndexAction action : actions) {
- if (action.getIndex() != null &&
- !refreshedIndexes.contains(action.getIndex().getIndexName())) {
- refreshedIndexes.add(action.getIndex().getIndexName());
- action.getIndex().refresh();
- refreshes.add(action.getIndex().getIndexName());
+ ExecutorService executorService = Executors.newFixedThreadPool(4);
+
+ //invokeAll() blocks until ALL tasks submitted to executor complete
+ for (Future<List<ActionRequest>> updateRequests : executorService.invokeAll(actions)) {
+ for (ActionRequest update : updateRequests.get()) {
+ if (UpdateRequest.class.isAssignableFrom(update.getClass())) {
+ bulkRequestBuilder.add((UpdateRequest)update);
+ } else if (DeleteRequest.class.isAssignableFrom(update.getClass())) {
+ bulkRequestBuilder.add((DeleteRequest)update);
+ } else {
+ throw new IllegalStateException("Un-managed request type: " + update.getClass());
}
}
- refreshTime = System.currentTimeMillis() - refreshTime;
- } catch (InterruptedException e) {
- throw new IllegalStateException("ES update has been interrupted", e);
}
+ executorService.shutdown();
- basicProfile.stop("INDEX - time:%sms (%sms index, %sms embedded, %sms refresh)\ttypes:[%s],\tbulk:%s\tembedded:%s\trefresh:[%s]",
- (System.currentTimeMillis() - all_start), indexTime, embeddedTime, refreshTime,
- StringUtils.join(types, ","),
- bcount, ecount, StringUtils.join(refreshes, ","));
+ LOGGER.info("Executing batch request of size: " + bulkRequestBuilder.numberOfActions());
+
+ //execute the request
+ BulkResponse response = searchClient.execute(bulkRequestBuilder.setRefresh(true));
+ } catch (Exception e) {
+ e.printStackTrace();
}
}
+
+ private Map<String, Index> getIndexMap() {
+ Map<String, Index> indexes = new HashMap<String, Index>();
+ for (Index index : container.getComponentsByType(Index.class)) {
+ indexes.put(index.getIndexType(), index);
+ }
+ return indexes;
+ }
+
+
+//
+// int bcount = 0;
+// int ecount = 0;
+// List<String> refreshes = Lists.newArrayList();
+// Set<String> types = Sets.newHashSet();
+// long all_start = System.currentTimeMillis();
+// long indexTime;
+// long refreshTime;
+// long embeddedTime;
+//
+// if (actions.size() == 1) {
+// /* Atomic update here */
+// CountDownLatch latch = new CountDownLatch(1);
+// IndexAction action = actions.get(0);
+// action.setLatch(latch);
+// try {
+// indexTime = System.currentTimeMillis();
+// this.offer(action, TIMEOUT, TimeUnit.MILLISECONDS);
+// if (!latch.await(TIMEOUT, TimeUnit.MILLISECONDS)) {
+// throw new IllegalStateException("ES update could not be completed within: " + TIMEOUT + "ms");
+// }
+// bcount++;
+// indexTime = System.currentTimeMillis() - indexTime;
+// // refresh the index.
+// Index<?, ?, ?> index = action.getIndex();
+// if (index != null) {
+// refreshTime = System.currentTimeMillis();
+// index.refresh();
+// refreshTime = System.currentTimeMillis() - refreshTime;
+// refreshes.add(index.getIndexName());
+// }
+// types.add(action.getPayloadClass().getSimpleName());
+// } catch (InterruptedException e) {
+// throw new IllegalStateException("ES update has been interrupted", e);
+// }
+// } else if (actions.size() > 1) {
+// StopWatch basicProfile = profiling.start("search", Profiling.Level.BASIC);
+//
+// /* Purge actions that would be overridden */
+// Long purgeStart = System.currentTimeMillis();
+// List<IndexAction> itemActions = Lists.newArrayList();
+// List<IndexAction> embeddedActions = Lists.newArrayList();
+//
+// for (IndexAction action : actions) {
+// if (action.getClass().isAssignableFrom(EmbeddedIndexAction.class)) {
+// embeddedActions.add(action);
+// } else {
+// itemActions.add(action);
+// }
+// }
+//
+// LOGGER.debug("INDEX - compressed {} items into {} in {}ms,",
+// actions.size(), itemActions.size() + embeddedActions.size(), System.currentTimeMillis() - purgeStart);
+//
+// try {
+// /* execute all item actions */
+// CountDownLatch itemLatch = new CountDownLatch(itemActions.size());
+// indexTime = System.currentTimeMillis();
+// for (IndexAction action : itemActions) {
+// action.setLatch(itemLatch);
+// this.offer(action, TIMEOUT, TimeUnit.MILLISECONDS);
+// types.add(action.getPayloadClass().getSimpleName());
+// bcount++;
+//
+// }
+// if (!itemLatch.await(TIMEOUT, TimeUnit.MILLISECONDS)) {
+// throw new IllegalStateException("ES update could not be completed within: " + TIMEOUT + "ms");
+// }
+// indexTime = System.currentTimeMillis() - indexTime;
+//
+// /* and now push the embedded */
+// CountDownLatch embeddedLatch = new CountDownLatch(embeddedActions.size());
+// embeddedTime = System.currentTimeMillis();
+// for (IndexAction action : embeddedActions) {
+// action.setLatch(embeddedLatch);
+// this.offer(action, TIMEOUT, TimeUnit.SECONDS);
+// types.add(action.getPayloadClass().getSimpleName());
+// ecount++;
+// }
+// if (!embeddedLatch.await(TIMEOUT, TimeUnit.MILLISECONDS)) {
+// throw new IllegalStateException("ES embedded update could not be completed within: " + TIMEOUT + "ms");
+// }
+// embeddedTime = System.currentTimeMillis() - embeddedTime;
+//
+// /* Finally refresh affected indexes */
+// Set<String> refreshedIndexes = new HashSet<String>();
+// refreshTime = System.currentTimeMillis();
+// for (IndexAction action : actions) {
+// if (action.getIndex() != null &&
+// !refreshedIndexes.contains(action.getIndex().getIndexName())) {
+// refreshedIndexes.add(action.getIndex().getIndexName());
+// action.getIndex().refresh();
+// refreshes.add(action.getIndex().getIndexName());
+// }
+// }
+// refreshTime = System.currentTimeMillis() - refreshTime;
+// } catch (InterruptedException e) {
+// throw new IllegalStateException("ES update has been interrupted", e);
+// }
+//
+// basicProfile.stop("INDEX - time:%sms (%sms index, %sms embedded, %sms refresh)\ttypes:[%s],\tbulk:%s\tembedded:%s\trefresh:[%s]",
+// (System.currentTimeMillis() - all_start), indexTime, embeddedTime, refreshTime,
+// StringUtils.join(types, ","),
+// bcount, ecount, StringUtils.join(refreshes, ","));
+// }
+
+
}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.search;
-
-import org.picocontainer.Startable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.sonar.api.ServerComponent;
-import org.sonar.server.search.action.IndexAction;
-
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-public class IndexQueueWorker extends ThreadPoolExecutor
- implements ServerComponent, Startable {
-
- private static final Logger LOG = LoggerFactory.getLogger(IndexQueueWorker.class);
-
- private final IndexClient indexes;
-
- public IndexQueueWorker(IndexQueue queue, IndexClient indexes) {
- super(1,1, 0L, TimeUnit.MILLISECONDS, queue);
- this.indexes = indexes;
- }
-
- protected void beforeExecute(Thread t, Runnable r) {
- LOG.debug("Starting task: {}", r);
- super.beforeExecute(t, r);
- if (IndexAction.class.isAssignableFrom(r.getClass())) {
- IndexAction ia = (IndexAction) r;
- LOG.debug("Task is an IndexAction for {}", ia.getIndexType());
- ia.setIndex(indexes.getByType(ia.getIndexType()));
- }
- }
-
- protected void afterExecute(Runnable r, Throwable t) {
- super.afterExecute(r, t);
- if (t != null) {
- throw new IllegalStateException(t);
- }
- }
-
- @Override
- public void start() {
- this.prestartAllCoreThreads();
- }
-
- @Override
- public void stop() {
- this.shutdown();
- }
-}
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
-import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsNodes;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.sonar.api.config.Settings;
import org.sonar.core.profiling.Profiling;
import org.sonar.core.profiling.StopWatch;
*/
public class SearchClient extends TransportClient {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SearchClient.class);
+
private static final String DEFAULT_HEALTH_TIMEOUT = "30s";
private final Settings settings;
public <K extends ActionResponse> K execute(ActionRequestBuilder request) {
StopWatch fullProfile = profiling.start("search", Profiling.Level.FULL);
- ListenableActionFuture acc = request.execute();
+ K response = null;
try {
- K response = (K) acc.get();
+ response = (K) request.get();
if (profiling.isProfilingEnabled(Profiling.Level.BASIC)) {
if (ToXContent.class.isAssignableFrom(request.getClass())) {
}
return response;
} catch (Exception e) {
+ LOGGER.error("could not execute request: " + response);
throw new IllegalStateException("ES error: ", e);
+
}
}
}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.search.action;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.client.Requests;
+import org.sonar.core.persistence.Dto;
+import org.sonar.server.search.Index;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DeleteDto<DTO extends Dto> extends IndexActionRequest {
+
+ private final DTO dto;
+
+ public DeleteDto(String indexType, DTO dto) {
+ super(indexType);
+ this.dto = dto;
+ }
+
+ @Override
+ public String getKey() {
+ return dto.getKey().toString();
+ }
+
+ @Override
+ public Class<?> getPayloadClass() {
+ return dto.getClass();
+ }
+
+ @Override
+ public List<ActionRequest> doCall(Index index) throws Exception {
+ List<ActionRequest> requests = new ArrayList<ActionRequest>();
+ requests.add(Requests.deleteRequest(index.getIndexName())
+ .id(dto.getKey().toString())
+ .type(indexType));
+ return requests;
+ }
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.search.action;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.client.Requests;
+import org.sonar.server.search.Index;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DeleteKey<K extends Serializable> extends IndexActionRequest {
+
+ private final K key;
+
+ public DeleteKey(String indexType, K key) {
+ super(indexType);
+ this.key = key;
+ }
+
+ @Override
+ public String getKey() {
+ return key.toString();
+ }
+
+ @Override
+ public Class<?> getPayloadClass() {
+ throw new IllegalStateException("Deletion by key does not have an object payload!");
+ }
+
+ @Override
+ public List<ActionRequest> doCall(Index index) throws Exception {
+ List<ActionRequest> requests = new ArrayList<ActionRequest>();
+ requests.add(Requests.deleteRequest(index.getIndexName())
+ .id(getKey())
+ .type(indexType));
+ return requests;
+ }
+
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.search.action;
+
+import org.elasticsearch.action.ActionRequest;
+import org.sonar.server.search.Index;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DeleteNestedItem<K extends Serializable> extends IndexActionRequest {
+
+ private final K key;
+ private final Object item;
+ private final Object[] items;
+
+ public DeleteNestedItem(String indexType, K key, Object item, Object... items) {
+ super(indexType);
+ this.key = key;
+ this.item = item;
+ this.items = items;
+ }
+
+ @Override
+ public String getKey() {
+ return this.key.toString();
+ }
+
+ @Override
+ public Class<?> getPayloadClass() {
+ return item.getClass();
+ }
+
+ @Override
+ public List<ActionRequest> doCall(Index index) throws Exception {
+ List<ActionRequest> updates = new ArrayList<ActionRequest>();
+ updates.addAll(deleteItem(index, item, key));
+ for (Object otherItem : items) {
+ updates.addAll(deleteItem(index, otherItem, key));
+ }
+ return updates;
+ }
+
+ private List<ActionRequest> deleteItem(Index index, Object item, K key) {
+ return index.getNormalizer().deleteNested(item, key);
+ }
+}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.search.action;
-
-import org.sonar.core.persistence.Dto;
-
-public class DtoIndexAction<E extends Dto> extends IndexAction {
-
- private final E item;
- private final E[] items;
-
- public DtoIndexAction(String indexType, Method method, E item, E... items) {
- super(indexType, method);
- this.item = item;
- this.items = items;
- }
-
- @Override
- public Class<?> getPayloadClass() {
- return item.getClass();
- }
-
- @Override
- public String getKey() {
- return item.getKey().toString();
- }
-
- @Override
- public void doExecute() {
- try {
- if (this.getMethod().equals(Method.DELETE)) {
- index.deleteByDto(this.item, this.items);
- } else if (this.getMethod().equals(Method.UPSERT)) {
- index.upsert(this.item, this.items);
- }
- } catch (Exception e) {
- throw new IllegalStateException(this.getClass().getSimpleName() +
- " cannot execute " + this.getMethod() + " for " + this.item.getClass().getSimpleName() +
- " as " + this.getIndexType() +
- " on key: " + this.item.getKey(), e);
- }
- }
-}
-
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.search.action;
-
-import java.io.Serializable;
-
-public class EmbeddedIndexAction<K extends Serializable> extends IndexAction {
-
- private final K key;
- private final Object item;
- private final Object[] items;
-
- public EmbeddedIndexAction(String indexType, Method method, K key, Object item, Object... items) {
- super(indexType, method);
- this.indexType = indexType;
- this.method = method;
- this.key = key;
- this.item = item;
- this.items = items;
- }
-
- @Override
- public String getKey() {
- return this.key.toString();
- }
-
- @Override
- public Class<?> getPayloadClass() {
- return item.getClass();
- }
-
- @Override
- public void doExecute() {
-
- try {
- if (this.getMethod().equals(Method.DELETE)) {
- index.delete(this.key, this.item, this.items);
- } else if (this.getMethod().equals(Method.UPSERT)) {
- index.upsert(this.key, this.item, this.items);
- }
- } catch (Exception e) {
- throw new IllegalStateException(this.getClass().getSimpleName() +
- "cannot execute " + this.getMethod() + " for " + this.item.getClass().getSimpleName() +
- " as " + this.getIndexType() +
- " on key: " + this.key, e);
- }
- }
-}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.search.action;
-
-import org.sonar.core.cluster.QueueAction;
-import org.sonar.server.search.Index;
-
-public abstract class IndexAction extends QueueAction {
-
-
- public abstract Class<?> getPayloadClass();
-
- public enum Method {
- UPSERT, DELETE
- }
-
- protected String indexType;
- protected Method method;
- protected Index index;
-
- public IndexAction(String indexType, Method method) {
- super();
- this.indexType = indexType;
- this.method = method;
- }
-
- public abstract String getKey();
-
- public Method getMethod() {
- return this.method;
- }
-
- public String getIndexType() {
- return indexType;
- }
-
- public void setMethod(Method method) {
- this.method = method;
- }
-
- public void setIndex(Index index) {
- this.index = index;
- }
-
- public Index getIndex() {
- return index;
- }
-}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.search.action;
+
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.sonar.core.cluster.ClusterAction;
+import org.sonar.server.search.Index;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class IndexActionRequest implements ClusterAction<List<ActionRequest>> {
+
+ protected final String indexType;
+ private Index index;
+
+ public IndexActionRequest(String indexType) {
+ super();
+ this.indexType = indexType;
+ }
+
+ public abstract String getKey();
+
+ public abstract Class<?> getPayloadClass();
+
+ public String getIndexType() {
+ return indexType;
+ }
+
+
+ public void setIndex(Index index) {
+ this.index = index;
+ }
+
+ @Override
+ public final List<ActionRequest> call() throws Exception {
+ if (index == null) {
+ throw new IllegalStateException("Cannot execute request - Index is null");
+ }
+ List<ActionRequest> finalRequests = new ArrayList<ActionRequest>();
+ for (ActionRequest request : doCall(index)) {
+ if (request.getClass().isAssignableFrom(UpdateRequest.class)) {
+ ((UpdateRequest) request)
+ .type(index.getIndexType())
+ .index(index.getIndexName());
+ }
+ finalRequests.add(request);
+ }
+ return finalRequests;
+ }
+
+ public abstract List<ActionRequest> doCall(Index index) throws Exception;
+}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.search.action;
-
-import java.io.Serializable;
-
-public class KeyIndexAction<K extends Serializable> extends IndexAction {
-
- private final K key;
- private final K[] keys;
-
- public KeyIndexAction(String indexType, Method method, K key, K... keys) {
- super(indexType, method);
- this.key = key;
- this.keys = keys;
- }
-
- @Override
- public Class<?> getPayloadClass() {
- return String.class;
- }
-
- @Override
- public String getKey() {
- return this.key.toString();
- }
-
- @Override
- public void doExecute() {
- try {
- if (this.getMethod().equals(Method.DELETE)) {
- index.deleteByKey(this.key, this.keys);
- } else if (this.getMethod().equals(Method.UPSERT)) {
- throw new IllegalStateException("Upsert by Key is not supported anymore");
- }
- } catch (Exception e) {
- throw new IllegalStateException(this.getClass().getSimpleName() +
- "cannot execute " + this.getMethod() + " for " + this.key.getClass().getSimpleName() +
- " on type: " + this.getIndexType() +
- " on key: " + this.key, e);
- }
- }
-}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.search.action;
+
+import org.elasticsearch.action.ActionRequest;
+import org.sonar.core.persistence.Dto;
+import org.sonar.server.search.Index;
+
+import java.util.List;
+
+public class UpsertDto<DTO extends Dto> extends IndexActionRequest {
+
+ private final DTO dto;
+
+ public UpsertDto(String indexType, DTO dto) {
+ super(indexType);
+ this.dto = dto;
+ }
+
+ @Override
+ public String getKey() {
+ return dto.getKey().toString();
+ }
+
+ @Override
+ public Class<?> getPayloadClass() {
+ return dto.getClass();
+ }
+
+ @Override
+ public List<ActionRequest> doCall(Index index) throws Exception {
+ return index.getNormalizer().normalize(dto);
+ }
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.search.action;
+
+import org.elasticsearch.action.ActionRequest;
+import org.sonar.server.search.Index;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class UpsertNestedItem<K extends Serializable> extends IndexActionRequest {
+
+
+ private final K key;
+ private final Object item;
+ private final Object[] items;
+
+ public UpsertNestedItem(String indexType, K key, Object item, Object... items) {
+ super(indexType);
+ this.key = key;
+ this.item = item;
+ this.items = items;
+ }
+
+ @Override
+ public String getKey() {
+ return key.toString();
+ }
+
+ @Override
+ public Class<?> getPayloadClass() {
+ return item.getClass();
+ }
+
+ @Override
+ public List<ActionRequest> doCall(Index index) throws Exception {
+ List<ActionRequest> updates = new ArrayList<ActionRequest>();
+ updates.addAll(normalizeItem(index, item, key));
+ for (Object otherItem : items) {
+ updates.addAll(normalizeItem(index, otherItem, key));
+ }
+ return updates;
+ }
+
+ private List<ActionRequest> normalizeItem(Index index, Object item, K key) {
+ return index.getNormalizer().normalizeNested(item, key);
+ }
+}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.search.es;
-
-import com.google.common.collect.ImmutableSet;
-import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.xcontent.support.XContentMapValues;
-import org.elasticsearch.script.AbstractExecutableScript;
-import org.elasticsearch.script.ExecutableScript;
-import org.elasticsearch.script.NativeScriptFactory;
-
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * @since 4.4
- */
-public class ListUpdate extends AbstractExecutableScript {
-
- public static final String NAME = "listUpdate";
-
- public static final String ID_FIELD = "idField";
- public static final String ID_VALUE = "idValue";
- public static final String FIELD = "field";
- public static final String VALUE = "value";
-
- public static class UpdateListScriptFactory implements NativeScriptFactory {
- @Override
- public ExecutableScript newScript(@Nullable Map<String, Object> params) {
- String idField = XContentMapValues.nodeStringValue(params.get(ID_FIELD), null);
- String idValue = XContentMapValues.nodeStringValue(params.get(ID_VALUE), null);
- String field = XContentMapValues.nodeStringValue(params.get(FIELD), null);
- Map value = null;
- if (idField == null) {
- throw new IllegalStateException("Missing '" + ID_FIELD + "' parameter");
- }
- if (idValue == null) {
- throw new IllegalStateException("Missing '" + ID_VALUE + "' parameter");
- }
- if (field == null) {
- throw new IllegalStateException("Missing '" + FIELD + "' parameter");
- }
-
- //NULL case is deletion of nested item
- if (params.containsKey(VALUE)) {
- Object obj = params.get(VALUE);
- if (obj != null) {
- value = XContentMapValues.nodeMapValue(params.get(VALUE), "Update item");
- }
- }
-
- return new ListUpdate(idField, idValue, field, value);
- }
- }
-
-
- private final String idField;
- private final String idValue;
- private final String field;
- private final Map<String, Object> value;
-
- private Map<String, Object> ctx;
-
- public ListUpdate(String idField, String idValue, String field, Map<String, Object> value) {
- this.idField = idField;
- this.idValue = idValue;
- this.field = field;
- this.value = value;
- }
-
- @Override
- public void setNextVar(String name, Object value) {
- if (name.equals("ctx")) {
- ctx = (Map<String, Object>) value;
- }
- }
-
- @Override
- public Object unwrap(Object value) {
- return value;
- }
-
- @Override
- public Object run() {
- try {
- //Get the Document's source from ctx
- Map<String, Object> source = XContentMapValues.nodeMapValue(ctx.get("_source"), "source from context");
-
- //Get the Object for list update
- Object fieldValue = source.get(field);
-
- if (fieldValue == null && value != null) {
- // 0. The field does not exist (this is a upsert then)
- source.put(field, value);
- } else if (!XContentMapValues.isArray(fieldValue) && value != null) {
- // 1. The field is not yet a list
- Map currentFieldValue = XContentMapValues.nodeMapValue(fieldValue, "current FieldValue");
- if (XContentMapValues.nodeStringValue(currentFieldValue.get(idField), null).equals(idValue)) {
- source.put(field, value);
- } else {
- source.put(field, ImmutableSet.of(fieldValue, value));
- }
- } else {
- // 3. field is a list
- Collection items = (Collection) fieldValue;
- Object target = null;
- for (Object item : items) {
- Map<String, Object> fields = (Map<String, Object>) item;
- String itemIdValue = XContentMapValues.nodeStringValue(fields.get(idField), null);
- if (itemIdValue != null && itemIdValue.equals(idValue)) {
- target = item;
- break;
- }
- }
- if (target != null) {
- items.remove(target);
- }
-
- //Supporting the update by NULL = deletion case
- if (value != null) {
- items.add(value);
- }
- source.put(field, items);
- }
- } catch (Exception e) {
- throw new IllegalStateException("failed to execute listUpdate script", e);
- }
- return null;
-
- }
-}
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.sonar.api.config.Settings;
-import org.sonar.core.cluster.NullQueue;
import org.sonar.process.MonitoredProcess;
import org.sonar.process.NetworkUtils;
import org.sonar.process.Props;
private BaseIndex getIndex(final SearchClient searchClient) {
BaseIndex index = new BaseIndex(
IndexDefinition.TEST,
- null, new NullQueue(), searchClient) {
+ null, searchClient) {
@Override
protected String getKeyValue(Serializable key) {
return null;
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>1.1.2</version>
+ </dependency>
</dependencies>
<build>
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.core.cluster;
+
+import java.util.concurrent.Callable;
+
+public interface ClusterAction<K> extends Callable<K> {
+
+ @Override
+ public K call() throws Exception;
+}
import java.util.List;
-public class NullQueue implements WorkQueue<QueueAction> {
+public class NullQueue implements WorkQueue<ClusterAction> {
@Override
- public void enqueue(QueueAction action) {
- // do nothing
- }
-
- @Override
- public void enqueue(List<QueueAction> actions) {
+ public void enqueue(List<ClusterAction> actions) {
// do nothing
}
}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.core.cluster;
-
-import java.util.concurrent.CountDownLatch;
-
-public abstract class QueueAction implements Runnable {
-
- protected CountDownLatch latch;
-
- public QueueAction() {
- this.latch = null;
- }
-
- public void setLatch(CountDownLatch latch){
- this.latch = latch;
- }
-
- public abstract void doExecute();
-
- @Override
- public void run(){
- this.doExecute();
- if (latch != null){
- latch.countDown();
- }
- }
-}
package org.sonar.core.cluster;
import java.util.List;
+import java.util.concurrent.Callable;
-public interface WorkQueue<K extends QueueAction> {
-
- void enqueue(K action);
+public interface WorkQueue<K extends Callable> {
void enqueue(List<K> actions);
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.apache.ibatis.session.SqlSession;
-import org.sonar.core.cluster.QueueAction;
+import org.sonar.core.cluster.ClusterAction;
import org.sonar.core.cluster.WorkQueue;
import java.sql.Connection;
public class DbSession implements SqlSession {
- private static final Integer IMPLICIT_COMMIT_SIZE = 200;
- private List<QueueAction> actions;
+ private static final Integer IMPLICIT_COMMIT_SIZE = 1000;
+ private List<ClusterAction> actions;
private WorkQueue queue;
private SqlSession session;
DbSession(WorkQueue queue, SqlSession session) {
this.session = session;
this.queue = queue;
- this.actions = new ArrayList<QueueAction>();
+ this.actions = new ArrayList<ClusterAction>();
}
- public void enqueue(QueueAction action) {
+ public void enqueue(ClusterAction action) {
this.actions.add(action);
if (this.actions.size() > IMPLICIT_COMMIT_SIZE) {
this.commit();