import java.io.Serializable;
-public class IndexAction {
+public class IndexAction<K extends Serializable> {
public enum Method {
INSERT, UPDATE, DELETE
}
String indexName;
- Serializable key;
+ K key;
Method method;
- public IndexAction(String indexName, Method method, Serializable key){
+ public IndexAction(String indexName, Method method, K key){
this.indexName = indexName;
this.method = method;
this.key = key;
this.indexName = indexName;
}
- public Serializable getKey() {
+ public K getKey() {
return key;
}
- public void setKey(Serializable key) {
+ public void setKey(K key) {
this.key = key;
}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.core.cluster;
-
-import org.jfree.util.Log;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-public class LocalNonBlockingWorkQueue implements WorkQueue{
-
- private ConcurrentLinkedQueue<IndexAction> actions;
-
- public LocalNonBlockingWorkQueue(){
- this.actions = new ConcurrentLinkedQueue<IndexAction>();
- }
-
- @Override
- public Integer enqueue(IndexAction... indexActions){
- for(IndexAction action:indexActions){
- actions.offer(action);
- }
- return 0;
- }
-
- @Override
- public Object dequeue(){
- Object out = actions.poll();
- while(out == null){
- try {
- Thread.sleep(200);
- } catch (InterruptedException e) {
- Log.error("Oops");
- }
- out = actions.poll();
- }
- return out;
- }
-
- @Override
- public Status getStatus(Integer workId) {
- // TODO Auto-generated method stub
- return null;
- }
-
-}
}
@Override
- public Integer enqueue(IndexAction... action) {
+ public Integer enqueue(IndexAction<?>... action) {
// TODO Auto-generated method stub
return null;
}
@Override
- public Object dequeue() {
+ public IndexAction<?> dequeue() {
// TODO Auto-generated method stub
return null;
}
-
- @Override
- public Status getStatus(Integer workId) {
- // TODO Auto-generated method stub
- return null;
- }
-
}
public interface WorkQueue {
- Integer enqueue(IndexAction... action);
+ Integer enqueue(IndexAction<?>... action);
- Object dequeue();
+ IndexAction<?> dequeue();
- Status getStatus(Integer workId);
-
- interface Status {
-
- }
}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.core.cluster;
-
-import org.sonar.core.cluster.LocalNonBlockingWorkQueue;
-
-import org.junit.Test;
-
-import java.io.Serializable;
-import java.util.Map;
-
-import static org.fest.assertions.Assertions.assertThat;
-
-
-public class LocalNonBlockingWorkQueueTest {
-//
-// private static final String WORKING_INDEX = "working_index";
-// private static final String NON_WORKING_INDEX = "non_working_index";
-//
-// @Test
-// public void test_insert_queue(){
-// LocalNonBlockingWorkQueue queue = new LocalNonBlockingWorkQueue();
-//
-// assertThat(queue.dequeInsert(WORKING_INDEX)).isNull();
-// assertThat(queue.dequeInsert(NON_WORKING_INDEX)).isNull();
-//
-// queue.enqueInsert(WORKING_INDEX, new Integer(0));
-// assertThat(queue.dequeInsert(NON_WORKING_INDEX)).isNull();
-//
-// Object dequeued = queue.dequeInsert(WORKING_INDEX);
-// assertThat(queue.dequeInsert(NON_WORKING_INDEX)).isNull();
-// assertThat(queue.dequeInsert(WORKING_INDEX)).isNull();
-//
-// assertThat(dequeued).isEqualTo(new Integer(0));
-// }
-//
-// @Test
-// public void test_update_queue(){
-// LocalNonBlockingWorkQueue queue = new LocalNonBlockingWorkQueue();
-//
-// assertThat(queue.dequeUpdate(WORKING_INDEX)).isNull();
-// assertThat(queue.dequeUpdate(NON_WORKING_INDEX)).isNull();
-//
-// queue.enqueUpdate(WORKING_INDEX, new Integer(0));
-// assertThat(queue.dequeUpdate(NON_WORKING_INDEX)).isNull();
-//
-// Object dequeued = queue.dequeUpdate(WORKING_INDEX);
-// assertThat(queue.dequeUpdate(NON_WORKING_INDEX)).isNull();
-// assertThat(queue.dequeUpdate(WORKING_INDEX)).isNull();
-//
-// assertThat(dequeued).isEqualTo(new Integer(0));
-// }
-//
-// @Test
-// public void test_delete_queue(){
-// LocalNonBlockingWorkQueue queue = new LocalNonBlockingWorkQueue();
-//
-// assertThat(queue.dequeDelete(WORKING_INDEX)).isNull();
-// assertThat(queue.dequeDelete(NON_WORKING_INDEX)).isNull();
-//
-// queue.enqueDelete(WORKING_INDEX, new Integer(0));
-// assertThat(queue.dequeDelete(NON_WORKING_INDEX)).isNull();
-//
-// Object dequeued = queue.dequeDelete(WORKING_INDEX);
-// assertThat(queue.dequeDelete(NON_WORKING_INDEX)).isNull();
-// assertThat(queue.dequeDelete(WORKING_INDEX)).isNull();
-//
-// assertThat(dequeued).isEqualTo(new Integer(0));
-// }
-//
-// @Test
-// public void test_enque_seralizable_object(){
-//
-// LocalNonBlockingWorkQueue queue = new LocalNonBlockingWorkQueue();
-//
-// class NonSerializable implements Serializable{
-// private Object var1;
-// private Map<String, Object> objs;
-// }
-//
-// NonSerializable nonSer = new NonSerializable();
-// assertThat(queue.enqueInsert(WORKING_INDEX, nonSer)).isNotNull();
-//
-// Object dequeued = queue.dequeInsert(WORKING_INDEX);
-// assertThat(queue.dequeInsert(NON_WORKING_INDEX)).isNull();
-//
-// assertThat(dequeued).isNotNull();
-// assertThat(dequeued.getClass()).isEqualTo(NonSerializable.class);
-// }
-//
-// @Test
-// public void test_under_queue_capacity(){
-// LocalNonBlockingWorkQueue queue = new LocalNonBlockingWorkQueue();
-//
-// for(int i = 0; i < 10; i++){
-// assertThat(queue.enqueDelete(WORKING_INDEX, i)).isNotNull();
-// }
-//
-// for(int i = 0; i < 10; i++){
-// assertThat(queue.dequeDelete(WORKING_INDEX)).isNotNull();
-// }
-// assertThat(queue.dequeDelete(WORKING_INDEX)).isNull();
-//
-// }
-//
-// @Test
-// public void test_over_queue_capacity(){
-// LocalNonBlockingWorkQueue queue = new LocalNonBlockingWorkQueue();
-//
-// for(int i = 0; i < 100; i++){
-// assertThat(queue.enqueDelete(WORKING_INDEX, i)).isNotNull();
-// }
-//
-// for(int i = 0; i < 100; i++){
-// assertThat(queue.dequeDelete(WORKING_INDEX)).isNotNull();
-// }
-// assertThat(queue.dequeDelete(WORKING_INDEX)).isNull();
-//
-// }
-
-}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.cluster;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.sonar.core.cluster.IndexAction;
+import org.sonar.core.cluster.WorkQueue;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class LocalNonBlockingWorkQueue implements WorkQueue {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LocalNonBlockingWorkQueue.class);
+
+ private final ConcurrentLinkedQueue<IndexAction<?>> actions;
+
+ public LocalNonBlockingWorkQueue() {
+ this.actions = new ConcurrentLinkedQueue<IndexAction<?>>();
+ }
+
+ @Override
+ public Integer enqueue(IndexAction<?>... indexActions) {
+ for (IndexAction<?> action : indexActions) {
+ actions.offer(action);
+ }
+ return 0;
+ }
+
+ @Override
+ public IndexAction<?> dequeue() {
+ IndexAction<?> out = actions.poll();
+ return out;
+ }
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.cluster;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.jfree.util.Log;
+import org.sonar.core.cluster.IndexAction;
+import org.sonar.server.search.Index;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.picocontainer.Startable;
+import org.sonar.api.ServerComponent;
+import org.sonar.core.cluster.WorkQueue;
+
+public class LocalQueueWorker implements ServerComponent, Startable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LocalNonBlockingWorkQueue.class);
+
+ private WorkQueue queue;
+
+ private volatile Thread worker;
+ private Map<String, Index<?>> indexes;
+
+ class Worker implements Runnable {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void run() {
+ LOG.info("Starting Worker Thread");
+ Thread thisThread = Thread.currentThread();
+ while (worker == thisThread) {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ Log.error("Oops");
+ }
+
+ @SuppressWarnings("rawtypes")
+ IndexAction action = queue.dequeue();
+
+ if (action != null && indexes.containsKey(action.getIndexName())) {
+ indexes.get(action.getIndexName()).executeAction(action);
+ }
+ }
+ LOG.info("Stoping Worker Thread");
+ }
+ }
+
+ public LocalQueueWorker(WorkQueue queue, Index<?>... indexes) {
+ this.queue = queue;
+ this.worker = new Thread(new Worker());
+ this.indexes = new HashMap<String, Index<?>>();
+ for(Index<?> index:indexes){
+ this.indexes.put(index.getIndexName(), index);
+ }
+ }
+
+
+ @Override
+ public void start() {
+ this.worker.start();
+ }
+
+ @Override
+ public void stop() {
+ this.worker = null;
+ }
+}
*/
package org.sonar.server.platform;
+import org.sonar.server.search.IndexUtils;
+
import com.google.common.collect.Lists;
import org.apache.commons.configuration.BaseConfiguration;
import org.sonar.api.config.EmailSettings;
import org.sonar.api.utils.System2;
import org.sonar.api.utils.UriReader;
import org.sonar.api.utils.internal.TempFolderCleaner;
-import org.sonar.core.cluster.LocalNonBlockingWorkQueue;
import org.sonar.core.component.SnapshotPerspectives;
import org.sonar.core.component.db.ComponentDao;
import org.sonar.core.config.CorePropertyDefinitions;
import org.sonar.core.metric.DefaultMetricFinder;
import org.sonar.core.notification.DefaultNotificationManager;
import org.sonar.core.permission.PermissionFacade;
-import org.sonar.core.persistence.*;
+import org.sonar.core.persistence.DaoUtils;
+import org.sonar.core.persistence.DatabaseVersion;
+import org.sonar.core.persistence.DefaultDatabase;
+import org.sonar.core.persistence.MyBatis;
+import org.sonar.core.persistence.PreviewDatabaseFactory;
+import org.sonar.core.persistence.SemaphoreUpdater;
+import org.sonar.core.persistence.SemaphoresImpl;
import org.sonar.core.preview.PreviewCache;
import org.sonar.core.profiling.Profiling;
import org.sonar.core.purge.PurgeProfiler;
import org.sonar.jpa.session.DefaultDatabaseConnector;
import org.sonar.jpa.session.ThreadLocalDatabaseSessionFactory;
import org.sonar.server.charts.ChartFactory;
+import org.sonar.server.cluster.LocalNonBlockingWorkQueue;
import org.sonar.server.component.DefaultComponentFinder;
import org.sonar.server.component.DefaultRubyComponentService;
import org.sonar.server.db.EmbeddedDatabaseFactory;
import org.sonar.server.db.migrations.DatabaseMigrations;
import org.sonar.server.db.migrations.DatabaseMigrator;
-import org.sonar.server.debt.*;
+import org.sonar.server.debt.DebtCharacteristicsXMLImporter;
+import org.sonar.server.debt.DebtModelBackup;
+import org.sonar.server.debt.DebtModelLookup;
+import org.sonar.server.debt.DebtModelOperations;
+import org.sonar.server.debt.DebtModelPluginRepository;
+import org.sonar.server.debt.DebtModelService;
+import org.sonar.server.debt.DebtModelXMLExporter;
+import org.sonar.server.debt.DebtRulesXMLImporter;
import org.sonar.server.es.ESIndex;
import org.sonar.server.es.ESNode;
-import org.sonar.server.issue.*;
+import org.sonar.server.issue.ActionPlanService;
+import org.sonar.server.issue.ActionService;
+import org.sonar.server.issue.AssignAction;
+import org.sonar.server.issue.CommentAction;
+import org.sonar.server.issue.DefaultIssueFinder;
+import org.sonar.server.issue.InternalRubyIssueService;
+import org.sonar.server.issue.IssueBulkChangeService;
+import org.sonar.server.issue.IssueChangelogFormatter;
+import org.sonar.server.issue.IssueChangelogService;
+import org.sonar.server.issue.IssueCommentService;
+import org.sonar.server.issue.IssueService;
+import org.sonar.server.issue.IssueStatsFinder;
+import org.sonar.server.issue.PlanAction;
+import org.sonar.server.issue.PublicRubyIssueService;
+import org.sonar.server.issue.ServerIssueStorage;
+import org.sonar.server.issue.SetSeverityAction;
+import org.sonar.server.issue.TransitionAction;
import org.sonar.server.issue.filter.IssueFilterService;
import org.sonar.server.issue.filter.IssueFilterWs;
import org.sonar.server.issue.ws.ActionPlanWs;
import org.sonar.server.permission.PermissionFinder;
import org.sonar.server.platform.ws.RestartHandler;
import org.sonar.server.platform.ws.SystemWs;
-import org.sonar.server.plugins.*;
+import org.sonar.server.plugins.BatchWs;
+import org.sonar.server.plugins.InstalledPluginReferentialFactory;
+import org.sonar.server.plugins.PluginDownloader;
+import org.sonar.server.plugins.ServerExtensionInstaller;
+import org.sonar.server.plugins.ServerPluginJarInstaller;
+import org.sonar.server.plugins.ServerPluginJarsInstaller;
+import org.sonar.server.plugins.ServerPluginRepository;
+import org.sonar.server.plugins.UpdateCenterClient;
+import org.sonar.server.plugins.UpdateCenterMatrixFactory;
import org.sonar.server.qualitygate.QgateProjectFinder;
import org.sonar.server.qualitygate.QualityGates;
import org.sonar.server.qualitygate.RegisterQualityGates;
import org.sonar.server.qualitygate.ws.QgateAppHandler;
import org.sonar.server.qualitygate.ws.QualityGatesWs;
-import org.sonar.server.qualityprofile.*;
+import org.sonar.server.qualityprofile.ESActiveRule;
+import org.sonar.server.qualityprofile.ProfilesManager;
+import org.sonar.server.qualityprofile.QProfileActiveRuleOperations;
+import org.sonar.server.qualityprofile.QProfileBackup;
+import org.sonar.server.qualityprofile.QProfileLookup;
+import org.sonar.server.qualityprofile.QProfileOperations;
+import org.sonar.server.qualityprofile.QProfileProjectLookup;
+import org.sonar.server.qualityprofile.QProfileProjectOperations;
+import org.sonar.server.qualityprofile.QProfileRepositoryExporter;
+import org.sonar.server.qualityprofile.QProfileRuleLookup;
+import org.sonar.server.qualityprofile.QProfiles;
import org.sonar.server.qualityprofile.ws.QProfileBackupWsHandler;
import org.sonar.server.qualityprofile.ws.QProfilesWs;
-import org.sonar.server.rule.*;
-import org.sonar.server.rule.ws.*;
+import org.sonar.server.rule.DeprecatedRulesDefinition;
+import org.sonar.server.rule.ESRuleTags;
+import org.sonar.server.rule.RegisterRules;
+import org.sonar.server.rule.RubyRuleService;
+import org.sonar.server.rule.RuleDefinitionsLoader;
+import org.sonar.server.rule.RuleOperations;
+import org.sonar.server.rule.RuleRegistry;
+import org.sonar.server.rule.RuleRepositories;
+import org.sonar.server.rule.RuleTagLookup;
+import org.sonar.server.rule.RuleTagOperations;
+import org.sonar.server.rule.RuleTags;
+import org.sonar.server.rule.Rules;
+import org.sonar.server.rule.ws.AddTagsWsHandler;
+import org.sonar.server.rule.ws.RemoveTagsWsHandler;
+import org.sonar.server.rule.ws.RuleSearchWsHandler;
+import org.sonar.server.rule.ws.RuleShowWsHandler;
+import org.sonar.server.rule.ws.RuleTagsWs;
+import org.sonar.server.rule.ws.RulesWs;
import org.sonar.server.source.CodeColorizers;
import org.sonar.server.source.DeprecatedSourceDecorator;
import org.sonar.server.source.HtmlSourceDecorator;
import org.sonar.server.source.SourceService;
import org.sonar.server.source.ws.SourcesShowWsHandler;
import org.sonar.server.source.ws.SourcesWs;
-import org.sonar.server.startup.*;
+import org.sonar.server.startup.CleanPreviewAnalysisCache;
+import org.sonar.server.startup.CopyRequirementsFromCharacteristicsToRules;
+import org.sonar.server.startup.GeneratePluginIndex;
+import org.sonar.server.startup.GwtPublisher;
+import org.sonar.server.startup.JdbcDriverDeployer;
+import org.sonar.server.startup.LogServerId;
+import org.sonar.server.startup.RegisterDashboards;
+import org.sonar.server.startup.RegisterDebtModel;
+import org.sonar.server.startup.RegisterMetrics;
+import org.sonar.server.startup.RegisterNewMeasureFilters;
+import org.sonar.server.startup.RegisterPermissionTemplates;
+import org.sonar.server.startup.RegisterQualityProfiles;
+import org.sonar.server.startup.RegisterServletFilters;
+import org.sonar.server.startup.RenameDeprecatedPropertyKeys;
+import org.sonar.server.startup.ServerMetadataPersister;
import org.sonar.server.text.MacroInterpreter;
import org.sonar.server.text.RubyTextService;
import org.sonar.server.ui.JRubyI18n;
import org.sonar.server.ui.JRubyProfiling;
import org.sonar.server.ui.PageDecorations;
import org.sonar.server.ui.Views;
-import org.sonar.server.user.*;
-import org.sonar.server.util.*;
+import org.sonar.server.user.DefaultUserService;
+import org.sonar.server.user.DoPrivileged;
+import org.sonar.server.user.GroupMembershipFinder;
+import org.sonar.server.user.GroupMembershipService;
+import org.sonar.server.user.NewUserNotifier;
+import org.sonar.server.user.SecurityRealmFactory;
+import org.sonar.server.util.BooleanTypeValidation;
+import org.sonar.server.util.FloatTypeValidation;
+import org.sonar.server.util.IntegerTypeValidation;
+import org.sonar.server.util.StringListTypeValidation;
+import org.sonar.server.util.StringTypeValidation;
+import org.sonar.server.util.TextTypeValidation;
+import org.sonar.server.util.TypeValidations;
import org.sonar.server.ws.ListingWs;
import org.sonar.server.ws.WebServiceEngine;
components.addAll(CorePropertyDefinitions.all());
components.addAll(DatabaseMigrations.CLASSES);
components.addAll(DaoUtils.getDaoClasses());
+ components.addAll(IndexUtils.getIndexClasses());
return components;
}
*/
package org.sonar.server.rule2;
+import org.sonar.core.cluster.IndexAction;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
//Use a MyBatis to normalize the Rule form multiple Table
return null;
}
-
- @Override
- public Collection<RuleKey> synchronizeSince(Long date) {
- //Use MyBatis to get the RuleKey created since date X
- return null;
- }
}
import org.elasticsearch.index.query.QueryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.sonar.core.cluster.IndexAction;
+import org.sonar.core.cluster.IndexAction.Method;
import org.sonar.core.cluster.WorkQueue;
import org.sonar.core.db.Dao;
import org.sonar.core.profiling.Profiling;
import org.sonar.core.profiling.StopWatch;
import java.io.Serializable;
-import java.util.Collection;
public abstract class BaseIndex<K extends Serializable> implements Index<K> {
private Client client;
private WorkQueue workQueue;
private IndexSynchronizer<K> synchronizer;
- protected Dao<?,K> dao;
+ protected Dao<?, K> dao;
- public BaseIndex(WorkQueue workQueue, Dao<?,K> dao, Profiling profiling) {
+ public BaseIndex(WorkQueue workQueue, Dao<?, K> dao, Profiling profiling) {
this.profiling = profiling;
this.workQueue = workQueue;
- this.synchronizer = IndexSynchronizer.getOnetimeSynchronizer(this, this.workQueue);
+ this.synchronizer = new IndexSynchronizer<K>(this, dao, this.workQueue);
this.dao = dao;
}
- protected Dao<?,K> getDao(){
+ protected Dao<?, K> getDao() {
return this.dao;
}
- protected Client getClient(){
+ protected Client getClient() {
return this.client;
}
return profiling.start(PROFILE_DOMAIN, Level.FULL);
}
- public void connect(){
+ public void connect() {
/* Settings to access our local ES node */
Settings settings = ImmutableSettings.settingsBuilder()
.put("client.transport.sniff", true)
}
}
-
public ClusterStatsNodes getNodesStats() {
StopWatch watch = createWatch();
try {
}
}
+ /* Index Action Methods */
- /* Index management and Tx methods */
+ @Override
+ public void executeAction(IndexAction<K> action) {
+ if (action.getMethod().equals(Method.DELETE)) {
+ this.delete(action.getKey());
+ } else if (action.getMethod().equals(Method.INSERT)) {
+ this.insert(action.getKey());
+ } else if (action.getMethod().equals(Method.UPDATE)) {
+ this.update(action.getKey());
+ }
+ }
+
+ /* Index management methods */
protected abstract Settings getIndexSettings();
protected abstract XContentBuilder getMapping();
- public abstract Collection<K> synchronizeSince(Long date);
-
-
/* Base CRUD methods */
protected abstract QueryBuilder getKeyQuery(K key);
@Override
public void setLastSynchronization(Long time) {
- if(time > (getLastSynchronization() + cooldown)){
+ if (time > (getLastSynchronization() + cooldown)) {
LOG.trace("Updating synchTime updating");
lastSynch = time;
} else {
package org.sonar.server.search;
import org.picocontainer.Startable;
+import org.sonar.core.cluster.IndexAction;
import java.io.Serializable;
-import java.util.Collection;
import java.util.Map;
public interface Index<K extends Serializable> extends Startable {
String getIndexName();
+ void executeAction(IndexAction<K> action);
+
Hit getByKey(K key);
void insert(K key);
void setLastSynchronization(Long time);
- Collection<K> synchronizeSince(Long time);
-
}
*/
package org.sonar.server.search;
-import org.sonar.core.cluster.IndexAction;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.sonar.core.cluster.IndexAction;
import org.sonar.core.cluster.WorkQueue;
+import org.sonar.core.db.Dao;
import java.io.Serializable;
private static final Logger LOG = LoggerFactory.getLogger(IndexSynchronizer.class);
- private static final Long DEFAULT_WAIT_TIME = 5000l;
-
- private long wait = 0;
- private boolean continuous;
-
private final Index<K> index;
+ private final Dao<?,K> dao;
private final WorkQueue workQueue;
- public static <K extends Serializable> IndexSynchronizer<K> getContinuousSynchronizer(Index<K> index, WorkQueue workQueue) {
- return new IndexSynchronizer<K>(index, workQueue)
- .setContinuous(true)
- .setWait(DEFAULT_WAIT_TIME);
- }
-
- public static <K extends Serializable> IndexSynchronizer<K> getContinuousSynchronizer(Index<K> index, WorkQueue workQueue, Long wait) {
- return new IndexSynchronizer<K>(index, workQueue)
- .setContinuous(true)
- .setWait(wait);
- }
-
- public static <K extends Serializable> IndexSynchronizer<K> getOnetimeSynchronizer(Index<K> index, WorkQueue workQueue) {
- return new IndexSynchronizer<K>(index, workQueue)
- .setContinuous(false);
- }
-
- private IndexSynchronizer(Index<K> index, WorkQueue workQueue) {
+ public IndexSynchronizer(Index<K> index, Dao<?,K> dao, WorkQueue workQueue) {
this.index = index;
+ this.dao = dao;
this.workQueue = workQueue;
}
- private IndexSynchronizer<K> setWait(Long wait) {
- this.wait = wait;
- return this;
- }
-
- private IndexSynchronizer<K> setContinuous(Boolean continuous) {
- this.continuous = continuous;
- return this;
- }
-
public IndexSynchronizer<K> start() {
LOG.info("Starting synchronization thread for ", index.getClass().getSimpleName());
Long since = index.getLastSynchronization();
index.setLastSynchronization(System.currentTimeMillis());
- for (K key : index.synchronizeSince(since)) {
+ for (K key : dao.keysOfRowsUpdatedAfter(since)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Adding {} to workQueue for {}", key, index.getClass().getSimpleName());
}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.search;
+
+import com.google.common.collect.ImmutableList;
+import org.sonar.server.rule2.RuleIndex;
+
+import java.util.List;
+
+public final class IndexUtils {
+
+ private IndexUtils() {
+ }
+
+ @SuppressWarnings("unchecked")
+ public static List<Class> getIndexClasses() {
+ return ImmutableList.<Class>of(
+ RuleIndex.class
+ );
+ }
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.cluster;
+
+import org.sonar.core.cluster.IndexAction;
+import org.junit.Test;
+import static org.fest.assertions.Assertions.assertThat;
+
+public class LocalNonBlockingWorkQueueTest {
+
+ private static final String WORKING_IDNEX = "working_index";
+
+ @Test
+ public void test_enqueue_dequeue_indexAction(){
+ LocalNonBlockingWorkQueue queue = new LocalNonBlockingWorkQueue();
+ queue.enqueue(new IndexAction(WORKING_IDNEX, IndexAction.Method.INSERT,new Integer(33)));
+ IndexAction action = queue.dequeue();
+ assertThat(action.getIndexName()).isEqualTo(WORKING_IDNEX);
+ assertThat(action.getKey()).isEqualTo(new Integer(33));
+ }
+
+ @Test
+ public void test_enqueue_dequeue_to_null(){
+ LocalNonBlockingWorkQueue queue = new LocalNonBlockingWorkQueue();
+ queue.enqueue(new IndexAction<Integer>(WORKING_IDNEX, IndexAction.Method.INSERT,new Integer(33)));
+ queue.enqueue(new IndexAction(WORKING_IDNEX, IndexAction.Method.INSERT,new Integer(33)));
+ assertThat(queue.dequeue()).isNotNull();
+ assertThat(queue.dequeue()).isNotNull();
+ assertThat(queue.dequeue()).isNull();
+ }
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.cluster;
+
+import org.sonar.core.cluster.IndexAction;
+import org.junit.Test;
+import static org.fest.assertions.Assertions.assertThat;
+
+public class LocalQueueWorkerTest {
+
+ private static final String WORKING_IDNEX = "working_index";
+
+ @Test
+ public void test_worker_dequeue(){
+ LocalNonBlockingWorkQueue queue = new LocalNonBlockingWorkQueue();
+ LocalQueueWorker worker = new LocalQueueWorker(queue);
+ worker.start();
+ queue.enqueue(new IndexAction(WORKING_IDNEX, IndexAction.Method.INSERT,new Integer(33)));
+ queue.enqueue(new IndexAction(WORKING_IDNEX, IndexAction.Method.INSERT,new Integer(33)));
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ assertThat(queue.dequeue()).isNull();
+ queue = null;
+ }
+}
import org.sonar.api.utils.DateUtils;
import org.sonar.api.utils.MessageException;
import org.sonar.api.utils.System2;
-import org.sonar.core.cluster.LocalNonBlockingWorkQueue;
import org.sonar.core.cluster.WorkQueue;
import org.sonar.core.persistence.AbstractDaoTestCase;
import org.sonar.core.persistence.MyBatis;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class RulesTest {
*/
package org.sonar.server.rule2;
-import org.junit.Ignore;
-
-import org.sonar.core.cluster.LocalNonBlockingWorkQueue;
import com.github.tlrx.elasticsearch.test.annotations.ElasticsearchNode;
import com.github.tlrx.elasticsearch.test.support.junit.runners.ElasticsearchRunner;
import org.elasticsearch.node.Node;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.sonar.api.config.Settings;
import org.sonar.core.profiling.Profiling;
+import org.sonar.server.cluster.LocalNonBlockingWorkQueue;
import org.sonar.server.search.BaseIndex;
+
import static org.fest.assertions.Assertions.assertThat;
@RunWith(ElasticsearchRunner.class)
*/
package org.sonar.server.search;
+import org.sonar.core.cluster.IndexAction;
+
import com.github.tlrx.elasticsearch.test.annotations.ElasticsearchNode;
import com.github.tlrx.elasticsearch.test.support.junit.runners.ElasticsearchRunner;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.sonar.api.config.Settings;
-import org.sonar.core.cluster.LocalNonBlockingWorkQueue;
import org.sonar.core.profiling.Profiling;
+import org.sonar.server.cluster.LocalNonBlockingWorkQueue;
import java.io.Serializable;
import java.util.Collection;
}
@Override
- public Collection<Serializable> synchronizeSince(Long date) {
+ protected QueryBuilder getKeyQuery(Serializable key) {
// TODO Auto-generated method stub
return null;
}
@Override
- protected QueryBuilder getKeyQuery(Serializable key) {
+ public Map<String, Object> normalize(Serializable key) {
// TODO Auto-generated method stub
return null;
}
@Override
- public Map<String, Object> normalize(Serializable key) {
+ public void executeAction(IndexAction action) {
// TODO Auto-generated method stub
- return null;
+
}
};
}
package org.sonar.server.ws;
import org.junit.Test;
-import org.sonar.api.server.ws.*;
+import org.sonar.api.server.ws.Request;
+import org.sonar.api.server.ws.RequestHandler;
+import org.sonar.api.server.ws.Response;
+import org.sonar.api.server.ws.WebService;
import static org.fest.assertions.Assertions.assertThat;