From 74c27dcda863eee86475940f0187cf2de352433f Mon Sep 17 00:00:00 2001 From: Stephane Gamard Date: Fri, 25 Apr 2014 12:42:13 +0200 Subject: [PATCH] SONAR-5237 - updated Index interface and intitial Index abstract --- .../org/sonar/server/cluster/WorkQueue.java | 15 +- .../org/sonar/server/search/BaseIndex.java | 180 ++++++++++++++++++ .../java/org/sonar/server/search/Index.java | 9 +- .../server/search/IndexSynchronizer.java | 69 +++++++ 4 files changed, 262 insertions(+), 11 deletions(-) create mode 100644 sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java create mode 100644 sonar-server/src/main/java/org/sonar/server/search/IndexSynchronizer.java diff --git a/sonar-server/src/main/java/org/sonar/server/cluster/WorkQueue.java b/sonar-server/src/main/java/org/sonar/server/cluster/WorkQueue.java index d7ffbd5d448..1b6a89117c9 100644 --- a/sonar-server/src/main/java/org/sonar/server/cluster/WorkQueue.java +++ b/sonar-server/src/main/java/org/sonar/server/cluster/WorkQueue.java @@ -19,22 +19,19 @@ */ package org.sonar.server.cluster; -import java.io.Serializable; - - public interface WorkQueue { - Integer enqueInsert(String indexName, Serializable key); + Integer enqueInsert(String indexName, Object key); - Integer enqueUpdate(String indexName, Serializable key); + Integer enqueUpdate(String indexName, Object key); - Integer enqueDelete(String indexName, Serializable key); + Integer enqueDelete(String indexName, Object key); - Serializable dequeInsert(String indexName); + Object dequeInsert(String indexName); - Serializable dequeUpdate(String indexName); + Object dequeUpdate(String indexName); - Serializable dequeDelete(String indexName); + Object dequeDelete(String indexName); Status getStatus(Integer workId); diff --git a/sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java b/sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java new file mode 100644 index 00000000000..e3716781f89 --- /dev/null +++ b/sonar-server/src/main/java/org/sonar/server/search/BaseIndex.java @@ -0,0 +1,180 @@ +/* + * SonarQube, open source software quality management tool. + * Copyright (C) 2008-2014 SonarSource + * mailto:contact AT sonarsource DOT com + * + * SonarQube is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * SonarQube is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.server.search; + +import org.elasticsearch.action.admin.cluster.stats.ClusterStatsNodes; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.sonar.core.profiling.Profiling; +import org.sonar.core.profiling.Profiling.Level; +import org.sonar.core.profiling.StopWatch; +import org.sonar.server.cluster.WorkQueue; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +public abstract class BaseIndex implements Index{ + + private static final String ES_EXECUTE_FAILED = "Failed execution of {}. Root is {}"; + + private static final String BULK_EXECUTE_FAILED = "Execution of bulk operation failed"; + private static final String BULK_INTERRUPTED = "Interrupted during bulk operation"; + + private static final String PROFILE_DOMAIN = "es"; + private static final Logger LOG = LoggerFactory.getLogger(BaseIndex.class); + + public static final String ES_CLUSTER_NAME = "sonarcluster"; + + private static final String LOCAL_ES_NODE_HOST = "localhost"; + private static final int LOCAL_ES_NODE_PORT = 9300; + + private final Profiling profiling; + private Client client; + private WorkQueue workQueue; + private IndexSynchronizer synchronizer; + + public BaseIndex(WorkQueue workQueue, Profiling profiling) { + this.profiling = profiling; + this.workQueue = workQueue; + this.synchronizer = IndexSynchronizer.getOnetimeSynchronizer(this, this.workQueue); + } + + @Override + public void start() { + + /* Settings to access our local ES node */ + Settings settings = ImmutableSettings.settingsBuilder() + .put("client.transport.sniff", true) + .put("cluster.name", ES_CLUSTER_NAME) + .put("node.name", "localclient_") + .build(); + + this.client = new TransportClient(settings) + .addTransportAddress(new InetSocketTransportAddress(LOCAL_ES_NODE_HOST, LOCAL_ES_NODE_PORT)); + + /* Cannot do that yet, need version >= 1.0 + ImmutableList nodes = client.connectedNodes(); + if (nodes.isEmpty()) { + throw new ElasticSearchUnavailableException("No nodes available. Verify ES is running!"); + } else { + log.info("connected to nodes: " + nodes.toString()); + } + */ + + /* Launch synchronization */ + synchronizer.start(); + } + + @Override + public void stop() { + if (client != null) { + client.close(); + } + } + + public Collection synchronizeSince(Long date) { + // TODO Auto-generated method stub + return Collections.EMPTY_LIST; + } + + public ClusterStatsNodes getNodesStats() { + StopWatch watch = createWatch(); + try { + return client.admin().cluster().prepareClusterStats().get().getNodesStats(); + } finally { + watch.stop("ping from transport client"); + } + } + + private StopWatch createWatch() { + return profiling.start(PROFILE_DOMAIN, Level.FULL); + } + + @Override + public Hit getByKey(K key) { + // TODO Auto-generated method stub + return null; + } + + @Override + public void insert(K key) { + // TODO Auto-generated method stub + + } + + @Override + public void udpate(K key) { + // TODO Auto-generated method stub + + } + + @Override + public void delete(K key) { + // TODO Auto-generated method stub + + } + + @Override + public K dequeueInsert() { + // TODO Auto-generated method stub + return null; + } + + @Override + public K dequeueUpdate() { + // TODO Auto-generated method stub + return null; + } + + @Override + public K dequeueDelete() { + // TODO Auto-generated method stub + return null; + } + + @Override + public abstract Map normalize(K key); + + + @Override + public String getIndexName() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setLastSynchronization(Long time) { + // TODO Auto-generated method stub + + } + + @Override + public Long getLastSynchronization() { + // TODO Auto-generated method stub + return null; + } +} diff --git a/sonar-server/src/main/java/org/sonar/server/search/Index.java b/sonar-server/src/main/java/org/sonar/server/search/Index.java index a93d2db89e0..4505806039c 100644 --- a/sonar-server/src/main/java/org/sonar/server/search/Index.java +++ b/sonar-server/src/main/java/org/sonar/server/search/Index.java @@ -28,6 +28,8 @@ import java.util.Map; public interface Index extends Startable { + String getIndexName(); + Hit getByKey(K key); void insert(K key); @@ -44,7 +46,10 @@ public interface Index extends Startable { Map normalize(K key); - Date getLastSynchronization(); + Long getLastSynchronization(); + + void setLastSynchronization(Long time); + + Collection synchronizeSince(Long time); - Collection synchronizeSince(Date date); } diff --git a/sonar-server/src/main/java/org/sonar/server/search/IndexSynchronizer.java b/sonar-server/src/main/java/org/sonar/server/search/IndexSynchronizer.java new file mode 100644 index 00000000000..ae26a60cf0f --- /dev/null +++ b/sonar-server/src/main/java/org/sonar/server/search/IndexSynchronizer.java @@ -0,0 +1,69 @@ +package org.sonar.server.search; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.sonar.server.cluster.WorkQueue; + +import java.util.Date; + +public class IndexSynchronizer { + + private static final Logger LOG = LoggerFactory.getLogger(IndexSynchronizer.class); + + private static final Long DEFAULT_WAIT_TIME = 5000l; + + private long wait = 0; + private boolean continuous; + + private final Index index; + private final WorkQueue workQueue; + + public static IndexSynchronizer getContinuousSynchronizer(Index index, WorkQueue workQueue) { + return new IndexSynchronizer(index, workQueue) + .setContinuous(true) + .setWait(DEFAULT_WAIT_TIME); + } + + public static IndexSynchronizer getContinuousSynchronizer(Index index, WorkQueue workQueue, Long wait) { + return new IndexSynchronizer(index, workQueue) + .setContinuous(true) + .setWait(wait); + } + + public static IndexSynchronizer getOnetimeSynchronizer(Index index, WorkQueue workQueue) { + return new IndexSynchronizer(index, workQueue) + .setContinuous(false); + } + + private IndexSynchronizer(Index index, WorkQueue workQueue) { + this.index = index; + this.workQueue = workQueue; + } + + private IndexSynchronizer setWait(Long wait) { + this.wait = wait; + return this; + } + + private IndexSynchronizer setContinuous(Boolean continuous) { + this.continuous = continuous; + return this; + } + + public IndexSynchronizer start() { + + LOG.info("Starting synchronization thread for ", index.getClass().getSimpleName()); + + Long since = index.getLastSynchronization(); + index.setLastSynchronization(System.currentTimeMillis()); + + for (Object key : index.synchronizeSince(since)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Adding {} to workQueue for {}", key, index.getClass().getSimpleName()); + } + workQueue.enqueInsert(index.getIndexName(), key); + } + + return this; + } +} -- 2.39.5