]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-8092 use BulkProcessor for es indexing
authorDaniel Schwarz <bartfastiel@users.noreply.github.com>
Tue, 9 May 2017 09:16:31 +0000 (11:16 +0200)
committerGitHub <noreply@github.com>
Tue, 9 May 2017 09:16:31 +0000 (11:16 +0200)
13 files changed:
server/sonar-server/src/main/java/org/sonar/server/component/index/ComponentIndexer.java
server/sonar-server/src/main/java/org/sonar/server/es/BulkIndexer.java
server/sonar-server/src/main/java/org/sonar/server/es/EsClient.java
server/sonar-server/src/main/java/org/sonar/server/issue/index/IssueIndexer.java
server/sonar-server/src/main/java/org/sonar/server/measure/index/ProjectMeasuresIndexer.java
server/sonar-server/src/main/java/org/sonar/server/permission/index/PermissionIndexer.java
server/sonar-server/src/main/java/org/sonar/server/qualityprofile/index/ActiveRuleIndexer.java
server/sonar-server/src/main/java/org/sonar/server/rule/index/RuleIndexer.java
server/sonar-server/src/main/java/org/sonar/server/test/index/TestIndexer.java
server/sonar-server/src/main/java/org/sonar/server/user/index/UserIndexer.java
server/sonar-server/src/main/java/org/sonar/server/view/index/ViewIndexer.java
server/sonar-server/src/test/java/org/sonar/server/es/BulkIndexerConcurrentRequestCalculationTest.java [new file with mode: 0644]
server/sonar-server/src/test/java/org/sonar/server/es/BulkIndexerTest.java

index 5a1100624d5a3a5264661ce9d1344f2ddf905e20..429ae9dedf6b46fc831971f43f23fb2374be0117 100644 (file)
@@ -85,12 +85,11 @@ public class ComponentIndexer implements ProjectIndexer, NeedAuthorizationIndexe
   }
 
   /**
-   * @param projectUuid the uuid of the project to analyze, or <code>null</code> if all content should be indexed.<br/>
-   * <b>Warning:</b> only use <code>null</code> during startup.
+   * @param projectUuid the uuid of the project to analyze, or {@code null} if all content should be indexed.<br/>
+   * <b>Warning:</b> only use {@code null} during startup.
    */
   private void doIndexByProjectUuid(@Nullable String projectUuid, Size bulkSize) {
-    BulkIndexer bulk = new BulkIndexer(esClient, INDEX_TYPE_COMPONENT.getIndex());
-    bulk.setSize(bulkSize);
+    BulkIndexer bulk = new BulkIndexer(esClient, INDEX_TYPE_COMPONENT.getIndex(), bulkSize);
 
     bulk.start();
     try (DbSession dbSession = dbClient.openSession(false)) {
@@ -112,15 +111,14 @@ public class ComponentIndexer implements ProjectIndexer, NeedAuthorizationIndexe
   }
 
   public void delete(String projectUuid, Collection<String> disabledComponentUuids) {
-    BulkIndexer bulk = new BulkIndexer(esClient, INDEX_TYPE_COMPONENT.getIndex());
+    BulkIndexer bulk = new BulkIndexer(esClient, INDEX_TYPE_COMPONENT.getIndex(), Size.REGULAR);
     bulk.start();
-    disabledComponentUuids.stream().forEach(uuid -> bulk.addDeletion(INDEX_TYPE_COMPONENT, uuid, projectUuid));
+    disabledComponentUuids.forEach(uuid -> bulk.addDeletion(INDEX_TYPE_COMPONENT, uuid, projectUuid));
     bulk.stop();
   }
 
   void index(ComponentDto... docs) {
-    BulkIndexer bulk = new BulkIndexer(esClient, INDEX_TYPE_COMPONENT.getIndex());
-    bulk.setSize(Size.REGULAR);
+    BulkIndexer bulk = new BulkIndexer(esClient, INDEX_TYPE_COMPONENT.getIndex(), Size.REGULAR);
     bulk.start();
     Arrays.stream(docs)
       .map(ComponentIndexer::toDocument)
index 4679c63a70eb0713ffc9cac637678e862b3a1860..6d6adce0883235b45ab70b7f181845898a36b774 100644 (file)
  */
 package org.sonar.server.es;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
 import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder;
+import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkProcessor.Listener;
+import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.delete.DeleteRequestBuilder;
 import org.elasticsearch.action.search.SearchRequestBuilder;
@@ -54,93 +55,54 @@ import static java.lang.String.format;
  * <ul>
  *   <li>bulk request is sent on the wire when its size is higher than 5Mb</li>
  *   <li>on large table indexing, replicas and automatic refresh can be temporarily disabled</li>
- *   <li>index refresh is optional (enabled by default)</li>
  * </ul>
  */
 public class BulkIndexer implements Startable {
 
   private static final Logger LOGGER = Loggers.get(BulkIndexer.class);
-  private static final long FLUSH_BYTE_SIZE = new ByteSizeValue(1, ByteSizeUnit.MB).bytes();
+  private static final ByteSizeValue FLUSH_BYTE_SIZE = new ByteSizeValue(1, ByteSizeUnit.MB);
+  private static final int FLUSH_ACTIONS = -1;
   private static final String REFRESH_INTERVAL_SETTING = "index.refresh_interval";
-  private static final String ALREADY_STARTED_MESSAGE = "Bulk indexing is already started";
+  private static final int DEFAULT_NUMBER_OF_SHARDS = 5;
 
   private final EsClient client;
   private final String indexName;
-  private Size size = Size.REGULAR;
-  private long flushByteSize = FLUSH_BYTE_SIZE;
-  private BulkRequestBuilder bulkRequest = null;
-  private Map<String, Object> largeInitialSettings = null;
+  private final BulkProcessor bulkProcessor;
   private final AtomicLong counter = new AtomicLong(0L);
-  private final int concurrentRequests;
-  private final Semaphore semaphore;
-  private final ProgressLogger progress;
+  private final SizeHandler sizeHandler;
 
-  public BulkIndexer(EsClient client, String indexName) {
+  public BulkIndexer(EsClient client, String indexName, Size size) {
     this.client = client;
     this.indexName = indexName;
-    this.progress = new ProgressLogger(format("Progress[BulkIndexer[%s]]", indexName), counter, LOGGER)
-      .setPluralLabel("requests");
-
-    // see https://jira.sonarsource.com/browse/SONAR-8075
-    this.concurrentRequests = Math.max(1, Runtime.getRuntime().availableProcessors() / 5);
-    this.semaphore = new Semaphore(concurrentRequests);
-  }
-
-  public enum Size {
-    /** Use this size for a limited number of documents. */
-    REGULAR,
-
-    /** Use this size for initial indexing and if you expect unusual huge numbers of documents. */
-    LARGE;
-  }
-
-  /**
-   * Large indexing is an heavy operation that populates an index generally from scratch. Replicas and
-   * automatic refresh are disabled during bulk indexing and lucene segments are optimized at the end.
-   */
-  public BulkIndexer setSize(Size size) {
-    Preconditions.checkState(bulkRequest == null, ALREADY_STARTED_MESSAGE);
-    this.size = size;
-    return this;
-  }
-
-  public BulkIndexer setFlushByteSize(long flushByteSize) {
-    this.flushByteSize = flushByteSize;
-    return this;
+    this.sizeHandler = size.createHandler(Runtime2.INSTANCE);
+    this.bulkProcessor = BulkProcessor.builder(client.nativeClient(), new BulkProcessorListener())
+      .setBackoffPolicy(BackoffPolicy.exponentialBackoff())
+      .setBulkSize(FLUSH_BYTE_SIZE)
+      .setBulkActions(FLUSH_ACTIONS)
+      .setConcurrentRequests(sizeHandler.getConcurrentRequests())
+      .build();
   }
 
   @Override
   public void start() {
-    Preconditions.checkState(bulkRequest == null, ALREADY_STARTED_MESSAGE);
-    if (size == Size.LARGE) {
-      largeInitialSettings = Maps.newHashMap();
-      Map<String, Object> bulkSettings = Maps.newHashMap();
-      GetSettingsResponse settingsResp = client.nativeClient().admin().indices().prepareGetSettings(indexName).get();
-
-      // deactivate replicas
-      int initialReplicas = Integer.parseInt(settingsResp.getSetting(indexName, IndexMetaData.SETTING_NUMBER_OF_REPLICAS));
-      if (initialReplicas > 0) {
-        largeInitialSettings.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, initialReplicas);
-        bulkSettings.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0);
-      }
-
-      // deactivate periodical refresh
-      String refreshInterval = settingsResp.getSetting(indexName, REFRESH_INTERVAL_SETTING);
-      largeInitialSettings.put(REFRESH_INTERVAL_SETTING, refreshInterval);
-      bulkSettings.put(REFRESH_INTERVAL_SETTING, "-1");
+    sizeHandler.beforeStart(this);
+    counter.set(0L);
+  }
 
-      updateSettings(bulkSettings);
+  @Override
+  public void stop() {
+    try {
+      bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("Elasticsearch bulk requests still being executed after 10 minutes", e);
     }
-    bulkRequest = client.prepareBulk().setRefresh(false);
-    counter.set(0L);
-    progress.start();
+    client.prepareRefresh(indexName).get();
+    sizeHandler.afterStop(this);
   }
 
   public void add(ActionRequest<?> request) {
-    bulkRequest.request().add(request);
-    if (bulkRequest.request().estimatedSizeInBytes() >= flushByteSize) {
-      executeBulk();
-    }
+    bulkProcessor.add(request);
   }
 
   public void addDeletion(SearchRequestBuilder searchRequest) {
@@ -192,73 +154,142 @@ public class BulkIndexer implements Startable {
    * Note that the parameter indexName could be removed if progress logs are not needed.
    */
   public static void delete(EsClient client, String indexName, SearchRequestBuilder searchRequest) {
-    BulkIndexer bulk = new BulkIndexer(client, indexName);
+    BulkIndexer bulk = new BulkIndexer(client, indexName, Size.REGULAR);
     bulk.start();
     bulk.addDeletion(searchRequest);
     bulk.stop();
   }
 
-  @Override
-  public void stop() {
-    if (bulkRequest.numberOfActions() > 0) {
-      executeBulk();
+  private final class BulkProcessorListener implements Listener {
+
+    @Override
+    public void beforeBulk(long executionId, BulkRequest request) {
+      // no action required
     }
-    try {
-      if (semaphore.tryAcquire(concurrentRequests, 10, TimeUnit.MINUTES)) {
-        semaphore.release(concurrentRequests);
+
+    @Override
+    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+      counter.addAndGet(response.getItems().length);
+
+      for (BulkItemResponse item : response.getItems()) {
+        if (item.isFailed()) {
+          LOGGER.error("index [{}], type [{}], id [{}], message [{}]", item.getIndex(), item.getType(), item.getId(), item.getFailureMessage());
+        }
       }
-    } catch (InterruptedException e) {
-      throw new IllegalStateException("Elasticsearch bulk requests still being executed after 10 minutes", e);
     }
-    progress.stop();
-    client.prepareRefresh(indexName).get();
-    if (size == Size.LARGE) {
-      // optimize lucene segments and revert index settings
-      // Optimization must be done before re-applying replicas:
-      // http://www.elasticsearch.org/blog/performance-considerations-elasticsearch-indexing/
-      client.prepareForceMerge(indexName).get();
 
-      updateSettings(largeInitialSettings);
+    @Override
+    public void afterBulk(long executionId, BulkRequest req, Throwable e) {
+      LOGGER.error("Fail to execute bulk index request: " + req, e);
     }
-    bulkRequest = null;
   }
 
-  private void updateSettings(Map<String, Object> settings) {
-    UpdateSettingsRequestBuilder req = client.nativeClient().admin().indices().prepareUpdateSettings(indexName);
-    req.setSettings(settings);
-    req.get();
+  public enum Size {
+    /** Use this size for a limited number of documents. */
+    REGULAR {
+      @Override
+      SizeHandler createHandler(Runtime2 runtime2) {
+        return new SizeHandler();
+      }
+    },
+
+    /**
+     * Large indexing is an heavy operation that populates an index generally from scratch. Replicas and
+     * automatic refresh are disabled during bulk indexing and lucene segments are optimized at the end.
+     * Use this size for initial indexing and if you expect unusual huge numbers of documents.
+     */
+    LARGE {
+      @Override
+      SizeHandler createHandler(Runtime2 runtime2) {
+        return new LargeSizeHandler(runtime2);
+      }
+    };
+
+    abstract SizeHandler createHandler(Runtime2 runtime2);
   }
 
-  private void executeBulk() {
-    final BulkRequestBuilder req = this.bulkRequest;
-    this.bulkRequest = client.prepareBulk().setRefresh(false);
-    semaphore.acquireUninterruptibly();
-    req.execute(new BulkResponseActionListener(req));
+  @VisibleForTesting
+  static class Runtime2 {
+    private static final Runtime2 INSTANCE = new Runtime2();
+
+    int getCores() {
+      return Runtime.getRuntime().availableProcessors();
+    }
+  }
+
+  static class SizeHandler {
+    /**
+     * @see BulkProcessor.Builder#setConcurrentRequests(int)
+     */
+    int getConcurrentRequests() {
+      // in the same thread by default
+      return 0;
+    }
+
+    void beforeStart(BulkIndexer bulkIndexer) {
+    }
+
+    void afterStop(BulkIndexer bulkIndexer) {
+    }
   }
 
-  private class BulkResponseActionListener implements ActionListener<BulkResponse> {
-    private final BulkRequestBuilder req;
+  static class LargeSizeHandler extends SizeHandler {
+
+    private final Map<String, Object> initialSettings = new HashMap<>();
+    private final Runtime2 runtime2;
+    private ProgressLogger progress;
 
-    BulkResponseActionListener(BulkRequestBuilder req) {
-      this.req = req;
+    LargeSizeHandler(Runtime2 runtime2) {
+      this.runtime2 = runtime2;
     }
 
     @Override
-    public void onResponse(BulkResponse response) {
-      semaphore.release();
-      counter.addAndGet(response.getItems().length);
+    int getConcurrentRequests() {
+      // see SONAR-8075
+      int cores = runtime2.getCores();
+      // FIXME do not use DEFAULT_NUMBER_OF_SHARDS
+      return Math.max(1, cores / DEFAULT_NUMBER_OF_SHARDS) - 1;
+    }
 
-      for (BulkItemResponse item : response.getItems()) {
-        if (item.isFailed()) {
-          LOGGER.error("index [{}], type [{}], id [{}], message [{}]", item.getIndex(), item.getType(), item.getId(), item.getFailureMessage());
-        }
+    @Override
+    void beforeStart(BulkIndexer bulkIndexer) {
+      this.progress = new ProgressLogger(format("Progress[BulkIndexer[%s]]", bulkIndexer.indexName), bulkIndexer.counter, LOGGER)
+        .setPluralLabel("requests");
+      this.progress.start();
+      Map<String, Object> temporarySettings = new HashMap<>();
+      GetSettingsResponse settingsResp = bulkIndexer.client.nativeClient().admin().indices().prepareGetSettings(bulkIndexer.indexName).get();
+
+      // deactivate replicas
+      int initialReplicas = Integer.parseInt(settingsResp.getSetting(bulkIndexer.indexName, IndexMetaData.SETTING_NUMBER_OF_REPLICAS));
+      if (initialReplicas > 0) {
+        initialSettings.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, initialReplicas);
+        temporarySettings.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0);
       }
+
+      // deactivate periodical refresh
+      String refreshInterval = settingsResp.getSetting(bulkIndexer.indexName, REFRESH_INTERVAL_SETTING);
+      initialSettings.put(REFRESH_INTERVAL_SETTING, refreshInterval);
+      temporarySettings.put(REFRESH_INTERVAL_SETTING, "-1");
+
+      updateSettings(bulkIndexer, temporarySettings);
     }
 
     @Override
-    public void onFailure(Throwable e) {
-      semaphore.release();
-      LOGGER.error("Fail to execute bulk index request: " + req, e);
+    void afterStop(BulkIndexer bulkIndexer) {
+      // optimize lucene segments and revert index settings
+      // Optimization must be done before re-applying replicas:
+      // http://www.elasticsearch.org/blog/performance-considerations-elasticsearch-indexing/
+      bulkIndexer.client.prepareForceMerge(bulkIndexer.indexName).get();
+
+      updateSettings(bulkIndexer, initialSettings);
+      this.progress.stop();
+    }
+
+    private static void updateSettings(BulkIndexer bulkIndexer, Map<String, Object> settings) {
+      UpdateSettingsRequestBuilder req = bulkIndexer.client.nativeClient().admin().indices().prepareUpdateSettings(bulkIndexer.indexName);
+      req.setSettings(settings);
+      req.get();
     }
   }
+
 }
index 7149c34fca48e13504f2edc050361c2a888497b5..2bf75677dd92cbf070ee3695d18e6fa71531d95a 100644 (file)
  */
 package org.sonar.server.es;
 
+import static java.util.Objects.requireNonNull;
+
 import java.io.Closeable;
+
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestBuilder;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
@@ -68,8 +71,6 @@ import org.sonar.server.es.request.ProxyRefreshRequestBuilder;
 import org.sonar.server.es.request.ProxySearchRequestBuilder;
 import org.sonar.server.es.request.ProxySearchScrollRequestBuilder;
 
-import static java.util.Objects.requireNonNull;
-
 /**
  * Facade to connect to Elasticsearch node. Handles correctly errors (logging + exceptions
  * with context) and profiling of requests.
index 421d9ade205e74cd4e3f9bab8b9bd07e6560663f..4eaa6523a30dff884ae9bae736eee1b34fa3f60c 100644 (file)
@@ -125,7 +125,7 @@ public class IssueIndexer implements ProjectIndexer, NeedAuthorizationIndexer, S
 
   @Override
   public void deleteProject(String uuid) {
-    BulkIndexer bulk = new BulkIndexer(esClient, INDEX_TYPE_ISSUE.getIndex());
+    BulkIndexer bulk = new BulkIndexer(esClient, INDEX_TYPE_ISSUE.getIndex(), Size.REGULAR);
     bulk.start();
     SearchRequestBuilder search = esClient.prepareSearch(INDEX_TYPE_ISSUE)
       .setRouting(uuid)
@@ -157,8 +157,7 @@ public class IssueIndexer implements ProjectIndexer, NeedAuthorizationIndexer, S
   }
 
   private BulkIndexer createBulkIndexer(Size bulkSize) {
-    return new BulkIndexer(esClient, INDEX_TYPE_ISSUE.getIndex())
-      .setSize(bulkSize);
+    return new BulkIndexer(esClient, INDEX_TYPE_ISSUE.getIndex(), bulkSize);
   }
 
   private static IndexRequest newIndexRequest(IssueDoc issue) {
index 5268ade753bb61379e553f0668d229202a36e392..e7851052f4936f5415cedc67204d4e5fd7c8c072 100644 (file)
@@ -111,8 +111,7 @@ public class ProjectMeasuresIndexer implements ProjectIndexer, NeedAuthorization
   }
 
   private BulkIndexer createBulkIndexer(Size bulkSize) {
-    return new BulkIndexer(esClient, INDEX_TYPE_PROJECT_MEASURES.getIndex())
-      .setSize(bulkSize);
+    return new BulkIndexer(esClient, INDEX_TYPE_PROJECT_MEASURES.getIndex(), bulkSize);
   }
 
   private static IndexRequest newIndexRequest(ProjectMeasuresDoc doc) {
index 9ee21a3ef2b5fb6ab321492fcc9396333b031478..41dd5b64ba4c72995e831251064c1beeaf928d20 100644 (file)
@@ -148,8 +148,7 @@ public class PermissionIndexer implements ProjectIndexer, StartupIndexer {
   private void index(Collection<PermissionIndexerDao.Dto> authorizations, AuthorizationScope scope, Size bulkSize) {
     IndexType indexType = scope.getIndexType();
 
-    BulkIndexer bulkIndexer = new BulkIndexer(esClient, indexType.getIndex());
-    bulkIndexer.setSize(bulkSize);
+    BulkIndexer bulkIndexer = new BulkIndexer(esClient, indexType.getIndex(), bulkSize);
     bulkIndexer.start();
 
     authorizations.stream()
index 7b54ecb583393ddac4e7fec6a0b7b9af52e03cc3..c5b385b487e7abf7bf5a679f199217bb711d8d91 100644 (file)
@@ -53,11 +53,11 @@ public class ActiveRuleIndexer extends BaseIndexer {
 
   @Override
   protected long doIndex(long lastUpdatedAt) {
-    return doIndex(createBulkIndexer(Size.REGULAR), lastUpdatedAt);
+    return doIndex(createBulkIndexer(), lastUpdatedAt);
   }
 
   public void index(Iterator<ActiveRuleDoc> rules) {
-    doIndex(createBulkIndexer(Size.REGULAR), rules);
+    doIndex(createBulkIndexer(), rules);
   }
 
   private long doIndex(BulkIndexer bulk, long lastUpdatedAt) {
@@ -94,7 +94,7 @@ public class ActiveRuleIndexer extends BaseIndexer {
   }
 
   public void deleteByProfileKeys(Collection<String> profileKeys) {
-    BulkIndexer bulk = new BulkIndexer(esClient, INDEX_TYPE_ACTIVE_RULE.getIndex());
+    BulkIndexer bulk = createBulkIndexer();
     bulk.start();
     profileKeys.forEach(profileKey -> {
       SearchRequestBuilder search = esClient.prepareSearch(INDEX_TYPE_ACTIVE_RULE)
@@ -105,7 +105,7 @@ public class ActiveRuleIndexer extends BaseIndexer {
   }
 
   private void deleteKeys(List<ActiveRuleKey> keys) {
-    BulkIndexer bulk = new BulkIndexer(esClient, INDEX_TYPE_ACTIVE_RULE.getIndex());
+    BulkIndexer bulk = createBulkIndexer();
     bulk.start();
     SearchRequestBuilder search = esClient.prepareSearch(INDEX_TYPE_ACTIVE_RULE)
       .setQuery(QueryBuilders.boolQuery().must(termsQuery(FIELD_ACTIVE_RULE_KEY, keys)));
@@ -113,10 +113,8 @@ public class ActiveRuleIndexer extends BaseIndexer {
     bulk.stop();
   }
 
-  private BulkIndexer createBulkIndexer(Size size) {
-    BulkIndexer bulk = new BulkIndexer(esClient, INDEX_TYPE_ACTIVE_RULE.getIndex());
-    bulk.setSize(size);
-    return bulk;
+  private BulkIndexer createBulkIndexer() {
+    return new BulkIndexer(esClient, INDEX_TYPE_ACTIVE_RULE.getIndex(), Size.REGULAR);
   }
 
   private static IndexRequest newIndexRequest(ActiveRuleDoc doc) {
index b5032644f38fcb6ee417d22e149e6667399c962d..adf9617d2e589ab4ed317dfc368f99127c595a36 100644 (file)
@@ -56,7 +56,7 @@ public class RuleIndexer implements StartupIndexer {
 
   @Override
   public void indexOnStartup(Set<IndexType> uninitializedIndexTypes) {
-    BulkIndexer bulk = new BulkIndexer(esClient, RuleIndexDefinition.INDEX).setSize(Size.LARGE);
+    BulkIndexer bulk = new BulkIndexer(esClient, RuleIndexDefinition.INDEXSize.LARGE);
     bulk.start();
 
     // index all definitions and system extensions
@@ -81,7 +81,7 @@ public class RuleIndexer implements StartupIndexer {
   }
 
   public void indexRuleDefinitions(List<RuleKey> ruleKeys) {
-    BulkIndexer bulk = new BulkIndexer(esClient, RuleIndexDefinition.INDEX).setSize(Size.REGULAR);
+    BulkIndexer bulk = new BulkIndexer(esClient, RuleIndexDefinition.INDEXSize.REGULAR);
     bulk.start();
 
     try (RuleIterator rules = new RuleIteratorForMultipleChunks(dbClient, ruleKeys)) {
@@ -98,12 +98,10 @@ public class RuleIndexer implements StartupIndexer {
         .map(ruleExtension -> RuleExtensionDoc.of(ruleKey, RuleExtensionScope.organization(organization), ruleExtension))
         .map(Arrays::asList)
         .map(List::iterator)
-        .ifPresent(metadatas -> {
-          BulkIndexer bulk = new BulkIndexer(esClient, RuleIndexDefinition.INDEX).setSize(Size.REGULAR);
+        .ifPresent(metadata -> {
+          BulkIndexer bulk = new BulkIndexer(esClient, RuleIndexDefinition.INDEXSize.REGULAR);
           bulk.start();
-
-          doIndexRuleExtensions(metadatas, bulk);
-
+          doIndexRuleExtensions(metadata, bulk);
           bulk.stop();
         });
     }
index bb6b0553bbcc1cd58c8df1e6489fed79cad63c11..4e0eccebfcbb38f6b3a3e9f380294901e5dd94d5 100644 (file)
@@ -82,23 +82,18 @@ public class TestIndexer implements ProjectIndexer, StartupIndexer {
   }
 
   public long index(Iterator<FileSourcesUpdaterHelper.Row> dbRows) {
-    BulkIndexer bulk = new BulkIndexer(esClient, INDEX_TYPE_TEST.getIndex());
+    BulkIndexer bulk = new BulkIndexer(esClient, INDEX_TYPE_TEST.getIndex(), Size.REGULAR);
     return doIndex(bulk, dbRows);
   }
 
   private long doIndex(@Nullable String projectUuid, Size bulkSize) {
-    final BulkIndexer bulk = new BulkIndexer(esClient, INDEX_TYPE_TEST.getIndex());
-    bulk.setSize(bulkSize);
+    final BulkIndexer bulk = new BulkIndexer(esClient, INDEX_TYPE_TEST.getIndex(), bulkSize);
 
-    DbSession dbSession = dbClient.openSession(false);
-    try {
+    try (DbSession dbSession = dbClient.openSession(false)) {
       TestResultSetIterator rowIt = TestResultSetIterator.create(dbClient, dbSession, projectUuid);
       long maxUpdatedAt = doIndex(bulk, rowIt);
       rowIt.close();
       return maxUpdatedAt;
-
-    } finally {
-      dbSession.close();
     }
   }
 
index e8c535915b376efdc6799b8193c801ffe2673d9d..65241119240ffccf7d48c3f446242a793e7399d1 100644 (file)
@@ -98,9 +98,7 @@ public class UserIndexer implements StartupIndexer {
   }
 
   private BulkIndexer newBulkIndexer(Size bulkSize) {
-    final BulkIndexer bulk = new BulkIndexer(esClient, UserIndexDefinition.INDEX_TYPE_USER.getIndex());
-    bulk.setSize(bulkSize);
-    return bulk;
+    return new BulkIndexer(esClient, UserIndexDefinition.INDEX_TYPE_USER.getIndex(), bulkSize);
   }
 
   private static IndexRequest newIndexRequest(UserDoc user) {
index 0f0eed695d2687b18fd889ac99205d07af2a21db..14b5ee7d892cac0a4ad0d31522b66bcdd6b5e95f 100644 (file)
@@ -88,15 +88,14 @@ public class ViewIndexer implements StartupIndexer {
    * The views lookup cache will be cleared
    */
   public void index(ViewDoc viewDoc) {
-    final BulkIndexer bulk = new BulkIndexer(esClient, ViewIndexDefinition.INDEX_TYPE_VIEW.getIndex());
+    BulkIndexer bulk = new BulkIndexer(esClient, ViewIndexDefinition.INDEX_TYPE_VIEW.getIndex(), Size.REGULAR);
     bulk.start();
     doIndex(bulk, viewDoc, true);
     bulk.stop();
   }
 
   private void index(DbSession dbSession, Map<String, String> viewAndProjectViewUuidMap, boolean needClearCache, Size bulkSize) {
-    final BulkIndexer bulk = new BulkIndexer(esClient, ViewIndexDefinition.INDEX_TYPE_VIEW.getIndex());
-    bulk.setSize(bulkSize);
+    BulkIndexer bulk = new BulkIndexer(esClient, ViewIndexDefinition.INDEX_TYPE_VIEW.getIndex(), bulkSize);
     bulk.start();
     for (Map.Entry<String, String> entry : viewAndProjectViewUuidMap.entrySet()) {
       String viewUuid = entry.getKey();
diff --git a/server/sonar-server/src/test/java/org/sonar/server/es/BulkIndexerConcurrentRequestCalculationTest.java b/server/sonar-server/src/test/java/org/sonar/server/es/BulkIndexerConcurrentRequestCalculationTest.java
new file mode 100644 (file)
index 0000000..66d0f39
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.es;
+
+import org.assertj.core.api.AbstractIntegerAssert;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class BulkIndexerConcurrentRequestCalculationTest {
+
+  @Test
+  public void should_not_parallelize_if_regular_size() {
+    assertConcurrentRequests(BulkIndexer.Size.REGULAR, cores(4))
+      .isEqualTo(0);
+  }
+
+  @Test
+  public void should_not_parallelize_if_large_indexing_but_few_cores() {
+    assertConcurrentRequests(BulkIndexer.Size.LARGE, cores(4))
+      .isEqualTo(0);
+  }
+
+  /**
+   * see https://jira.sonarsource.com/browse/SONAR-8075
+   */
+  @Test
+  public void should_heavily_parallelize_on_96_cores_if_large_indexing() {
+    assertConcurrentRequests(BulkIndexer.Size.LARGE, cores(96))
+      .isEqualTo(18);
+  }
+
+  private AbstractIntegerAssert<?> assertConcurrentRequests(BulkIndexer.Size size, BulkIndexer.Runtime2 runtime2) {
+    return assertThat(size.createHandler(runtime2).getConcurrentRequests());
+  }
+
+  private static BulkIndexer.Runtime2 cores(int cores) {
+    BulkIndexer.Runtime2 runtime = mock(BulkIndexer.Runtime2.class);
+    when(runtime.getCores()).thenReturn(cores);
+    return runtime;
+  }
+}
index c56819fe9cfd43c3d3ea149ee48855f7b689254e..00a6028c46b3250aabedea8d51eaee161ab801ba 100644 (file)
@@ -40,7 +40,7 @@ public class BulkIndexerTest {
 
   @Test
   public void index_nothing() {
-    BulkIndexer indexer = new BulkIndexer(esTester.client(), INDEX);
+    BulkIndexer indexer = new BulkIndexer(esTester.client(), INDEX, Size.REGULAR);
     indexer.start();
     indexer.stop();
 
@@ -49,7 +49,7 @@ public class BulkIndexerTest {
 
   @Test
   public void index_documents() {
-    BulkIndexer indexer = new BulkIndexer(esTester.client(), INDEX);
+    BulkIndexer indexer = new BulkIndexer(esTester.client(), INDEX, Size.REGULAR);
     indexer.start();
     indexer.add(newIndexRequest(42));
     indexer.add(newIndexRequest(78));
@@ -67,9 +67,7 @@ public class BulkIndexerTest {
     // index has one replica
     assertThat(replicas()).isEqualTo(1);
 
-    BulkIndexer indexer = new BulkIndexer(esTester.client(), INDEX)
-      .setFlushByteSize(500)
-      .setSize(Size.LARGE);
+    BulkIndexer indexer = new BulkIndexer(esTester.client(), INDEX, Size.LARGE);
     indexer.start();
 
     // replicas are temporarily disabled