summaryrefslogtreecommitdiffstats
path: root/sonar-server
diff options
context:
space:
mode:
authorStephane Gamard <stephane.gamard@searchbox.com>2014-04-25 12:42:13 +0200
committerStephane Gamard <stephane.gamard@searchbox.com>2014-04-25 12:42:13 +0200
commit74c27dcda863eee86475940f0187cf2de352433f (patch)
tree82a188380085cdf2806b7982c59d60db9ef6f953 /sonar-server
parentbb373c4a41e454b376c4f6ffb2b7434d11dcc3d2 (diff)
downloadsonarqube-74c27dcda863eee86475940f0187cf2de352433f.tar.gz
sonarqube-74c27dcda863eee86475940f0187cf2de352433f.zip
SONAR-5237 - updated Index interface and intitial Index abstract
Diffstat (limited to 'sonar-server')
-rw-r--r--sonar-server/src/main/java/org/sonar/server/cluster/WorkQueue.java15
-rw-r--r--sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java180
-rw-r--r--sonar-server/src/main/java/org/sonar/server/search/Index.java9
-rw-r--r--sonar-server/src/main/java/org/sonar/server/search/IndexSynchronizer.java69
4 files changed, 262 insertions, 11 deletions
diff --git a/sonar-server/src/main/java/org/sonar/server/cluster/WorkQueue.java b/sonar-server/src/main/java/org/sonar/server/cluster/WorkQueue.java
index d7ffbd5d448..1b6a89117c9 100644
--- a/sonar-server/src/main/java/org/sonar/server/cluster/WorkQueue.java
+++ b/sonar-server/src/main/java/org/sonar/server/cluster/WorkQueue.java
@@ -19,22 +19,19 @@
*/
package org.sonar.server.cluster;
-import java.io.Serializable;
-
-
public interface WorkQueue {
- Integer enqueInsert(String indexName, Serializable key);
+ Integer enqueInsert(String indexName, Object key);
- Integer enqueUpdate(String indexName, Serializable key);
+ Integer enqueUpdate(String indexName, Object key);
- Integer enqueDelete(String indexName, Serializable key);
+ Integer enqueDelete(String indexName, Object key);
- Serializable dequeInsert(String indexName);
+ Object dequeInsert(String indexName);
- Serializable dequeUpdate(String indexName);
+ Object dequeUpdate(String indexName);
- Serializable dequeDelete(String indexName);
+ Object dequeDelete(String indexName);
Status getStatus(Integer workId);
diff --git a/sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java b/sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java
new file mode 100644
index 00000000000..e3716781f89
--- /dev/null
+++ b/sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java
@@ -0,0 +1,180 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.search;
+
+import org.elasticsearch.action.admin.cluster.stats.ClusterStatsNodes;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.sonar.core.profiling.Profiling;
+import org.sonar.core.profiling.Profiling.Level;
+import org.sonar.core.profiling.StopWatch;
+import org.sonar.server.cluster.WorkQueue;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+public abstract class BaseIndex<K extends Serializable> implements Index<K>{
+
+ private static final String ES_EXECUTE_FAILED = "Failed execution of {}. Root is {}";
+
+ private static final String BULK_EXECUTE_FAILED = "Execution of bulk operation failed";
+ private static final String BULK_INTERRUPTED = "Interrupted during bulk operation";
+
+ private static final String PROFILE_DOMAIN = "es";
+ private static final Logger LOG = LoggerFactory.getLogger(BaseIndex.class);
+
+ public static final String ES_CLUSTER_NAME = "sonarcluster";
+
+ private static final String LOCAL_ES_NODE_HOST = "localhost";
+ private static final int LOCAL_ES_NODE_PORT = 9300;
+
+ private final Profiling profiling;
+ private Client client;
+ private WorkQueue workQueue;
+ private IndexSynchronizer synchronizer;
+
+ public BaseIndex(WorkQueue workQueue, Profiling profiling) {
+ this.profiling = profiling;
+ this.workQueue = workQueue;
+ this.synchronizer = IndexSynchronizer.getOnetimeSynchronizer(this, this.workQueue);
+ }
+
+ @Override
+ public void start() {
+
+ /* Settings to access our local ES node */
+ Settings settings = ImmutableSettings.settingsBuilder()
+ .put("client.transport.sniff", true)
+ .put("cluster.name", ES_CLUSTER_NAME)
+ .put("node.name", "localclient_")
+ .build();
+
+ this.client = new TransportClient(settings)
+ .addTransportAddress(new InetSocketTransportAddress(LOCAL_ES_NODE_HOST, LOCAL_ES_NODE_PORT));
+
+ /* Cannot do that yet, need version >= 1.0
+ ImmutableList<DiscoveryNode> nodes = client.connectedNodes();
+ if (nodes.isEmpty()) {
+ throw new ElasticSearchUnavailableException("No nodes available. Verify ES is running!");
+ } else {
+ log.info("connected to nodes: " + nodes.toString());
+ }
+ */
+
+ /* Launch synchronization */
+ synchronizer.start();
+ }
+
+ @Override
+ public void stop() {
+ if (client != null) {
+ client.close();
+ }
+ }
+
+ public Collection<K> synchronizeSince(Long date) {
+ // TODO Auto-generated method stub
+ return Collections.EMPTY_LIST;
+ }
+
+ public ClusterStatsNodes getNodesStats() {
+ StopWatch watch = createWatch();
+ try {
+ return client.admin().cluster().prepareClusterStats().get().getNodesStats();
+ } finally {
+ watch.stop("ping from transport client");
+ }
+ }
+
+ private StopWatch createWatch() {
+ return profiling.start(PROFILE_DOMAIN, Level.FULL);
+ }
+
+ @Override
+ public Hit getByKey(K key) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void insert(K key) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void udpate(K key) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void delete(K key) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public K dequeueInsert() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public K dequeueUpdate() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public K dequeueDelete() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public abstract Map<String, Object> normalize(K key);
+
+
+ @Override
+ public String getIndexName() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void setLastSynchronization(Long time) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public Long getLastSynchronization() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+}
diff --git a/sonar-server/src/main/java/org/sonar/server/search/Index.java b/sonar-server/src/main/java/org/sonar/server/search/Index.java
index a93d2db89e0..4505806039c 100644
--- a/sonar-server/src/main/java/org/sonar/server/search/Index.java
+++ b/sonar-server/src/main/java/org/sonar/server/search/Index.java
@@ -28,6 +28,8 @@ import java.util.Map;
public interface Index<K extends Serializable> extends Startable {
+ String getIndexName();
+
Hit getByKey(K key);
void insert(K key);
@@ -44,7 +46,10 @@ public interface Index<K extends Serializable> extends Startable {
Map<String, Object> normalize(K key);
- Date getLastSynchronization();
+ Long getLastSynchronization();
+
+ void setLastSynchronization(Long time);
+
+ Collection<K> synchronizeSince(Long time);
- Collection<K> synchronizeSince(Date date);
}
diff --git a/sonar-server/src/main/java/org/sonar/server/search/IndexSynchronizer.java b/sonar-server/src/main/java/org/sonar/server/search/IndexSynchronizer.java
new file mode 100644
index 00000000000..ae26a60cf0f
--- /dev/null
+++ b/sonar-server/src/main/java/org/sonar/server/search/IndexSynchronizer.java
@@ -0,0 +1,69 @@
+package org.sonar.server.search;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.sonar.server.cluster.WorkQueue;
+
+import java.util.Date;
+
+public class IndexSynchronizer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(IndexSynchronizer.class);
+
+ private static final Long DEFAULT_WAIT_TIME = 5000l;
+
+ private long wait = 0;
+ private boolean continuous;
+
+ private final Index<?> index;
+ private final WorkQueue workQueue;
+
+ public static IndexSynchronizer getContinuousSynchronizer(Index<?> index, WorkQueue workQueue) {
+ return new IndexSynchronizer(index, workQueue)
+ .setContinuous(true)
+ .setWait(DEFAULT_WAIT_TIME);
+ }
+
+ public static IndexSynchronizer getContinuousSynchronizer(Index<?> index, WorkQueue workQueue, Long wait) {
+ return new IndexSynchronizer(index, workQueue)
+ .setContinuous(true)
+ .setWait(wait);
+ }
+
+ public static IndexSynchronizer getOnetimeSynchronizer(Index<?> index, WorkQueue workQueue) {
+ return new IndexSynchronizer(index, workQueue)
+ .setContinuous(false);
+ }
+
+ private IndexSynchronizer(Index<?> index, WorkQueue workQueue) {
+ this.index = index;
+ this.workQueue = workQueue;
+ }
+
+ private IndexSynchronizer setWait(Long wait) {
+ this.wait = wait;
+ return this;
+ }
+
+ private IndexSynchronizer setContinuous(Boolean continuous) {
+ this.continuous = continuous;
+ return this;
+ }
+
+ public IndexSynchronizer start() {
+
+ LOG.info("Starting synchronization thread for ", index.getClass().getSimpleName());
+
+ Long since = index.getLastSynchronization();
+ index.setLastSynchronization(System.currentTimeMillis());
+
+ for (Object key : index.synchronizeSince(since)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Adding {} to workQueue for {}", key, index.getClass().getSimpleName());
+ }
+ workQueue.enqueInsert(index.getIndexName(), key);
+ }
+
+ return this;
+ }
+}