aboutsummaryrefslogtreecommitdiffstats
path: root/server/sonar-ce
diff options
context:
space:
mode:
Diffstat (limited to 'server/sonar-ce')
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/monitoring/CeTasksMBeanImpl.java10
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java2
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java4
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskInterrupterProvider.java46
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskInterrupterWorkerExecutionListener.java44
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java8
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java12
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerController.java (renamed from server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java)25
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerControllerImpl.java (renamed from server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java)22
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java12
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java237
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/SimpleCeTaskInterrupter.java50
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/TimeoutCeTaskInterrupter.java103
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/container/ComputeEngineContainerImplTest.java2
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/monitoring/CeTasksMBeanImplTest.java10
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java2
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskInterrupterProviderTest.java127
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskInterrupterWorkerExecutionListenerTest.java56
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskProcessorModuleTest.java13
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerControllerImplTest.java (renamed from server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImplTest.java)69
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java4
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java208
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/ComputingThread.java55
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/SimpleCeTaskInterrupterTest.java78
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/TimeoutCeTaskInterrupterTest.java223
25 files changed, 1295 insertions, 127 deletions
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/monitoring/CeTasksMBeanImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/monitoring/CeTasksMBeanImpl.java
index 08cd1464c45..0b53f4b6adb 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/monitoring/CeTasksMBeanImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/monitoring/CeTasksMBeanImpl.java
@@ -25,8 +25,8 @@ import java.util.stream.Collectors;
import org.picocontainer.Startable;
import org.sonar.ce.configuration.CeConfiguration;
import org.sonar.ce.taskprocessor.CeWorker;
+import org.sonar.ce.taskprocessor.CeWorkerController;
import org.sonar.ce.taskprocessor.CeWorkerFactory;
-import org.sonar.ce.taskprocessor.EnabledCeWorkerController;
import org.sonar.process.Jmx;
import org.sonar.process.systeminfo.SystemInfoSection;
import org.sonar.process.systeminfo.protobuf.ProtobufSystemInfo;
@@ -35,13 +35,13 @@ public class CeTasksMBeanImpl implements CeTasksMBean, Startable, SystemInfoSect
private final CEQueueStatus queueStatus;
private final CeConfiguration ceConfiguration;
private final CeWorkerFactory ceWorkerFactory;
- private final EnabledCeWorkerController enabledCeWorkerController;
+ private final CeWorkerController ceWorkerController;
- public CeTasksMBeanImpl(CEQueueStatus queueStatus, CeConfiguration ceConfiguration, CeWorkerFactory ceWorkerFactory, EnabledCeWorkerController enabledCeWorkerController) {
+ public CeTasksMBeanImpl(CEQueueStatus queueStatus, CeConfiguration ceConfiguration, CeWorkerFactory ceWorkerFactory, CeWorkerController CeWorkerController) {
this.queueStatus = queueStatus;
this.ceConfiguration = ceConfiguration;
this.ceWorkerFactory = ceWorkerFactory;
- this.enabledCeWorkerController = enabledCeWorkerController;
+ this.ceWorkerController = CeWorkerController;
}
@Override
@@ -105,7 +105,7 @@ public class CeTasksMBeanImpl implements CeTasksMBean, Startable, SystemInfoSect
public List<String> getEnabledWorkerUuids() {
Set<CeWorker> workers = ceWorkerFactory.getWorkers();
return workers.stream()
- .filter(enabledCeWorkerController::isEnabled)
+ .filter(ceWorkerController::isEnabled)
.map(CeWorker::getUUID)
.sorted()
.collect(Collectors.toList());
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
index 17ca3b6f474..6cd77dddbbf 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
@@ -37,8 +37,8 @@ import org.sonar.ce.container.ComputeEngineStatus;
import org.sonar.ce.monitoring.CEQueueStatus;
import org.sonar.ce.task.CeTask;
import org.sonar.ce.task.CeTaskResult;
-import org.sonar.ce.task.projectanalysis.component.VisitException;
import org.sonar.ce.task.TypedException;
+import org.sonar.ce.task.projectanalysis.component.VisitException;
import org.sonar.core.util.UuidFactory;
import org.sonar.db.DbClient;
import org.sonar.db.DbSession;
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java
index 2a25ae0cbe0..4ce1f7210ec 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java
@@ -41,12 +41,12 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler {
private final long delayBetweenEnabledTasks;
private final TimeUnit timeUnit;
private final ChainingCallback[] chainingCallbacks;
- private final EnabledCeWorkerController ceWorkerController;
+ private final CeWorkerController ceWorkerController;
private final int gracefulStopTimeoutInMs;
public CeProcessingSchedulerImpl(CeConfiguration ceConfiguration,
CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerFactory ceCeWorkerFactory,
- EnabledCeWorkerController ceWorkerController) {
+ CeWorkerController ceWorkerController) {
this.executorService = processingExecutorService;
this.delayBetweenEnabledTasks = ceConfiguration.getQueuePollingDelay();
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskInterrupterProvider.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskInterrupterProvider.java
new file mode 100644
index 00000000000..e082dd276c4
--- /dev/null
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskInterrupterProvider.java
@@ -0,0 +1,46 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import org.picocontainer.injectors.ProviderAdapter;
+import org.sonar.api.config.Configuration;
+import org.sonar.api.utils.System2;
+import org.sonar.ce.task.CeTaskInterrupter;
+
+import static com.google.common.base.Preconditions.checkState;
+
+public class CeTaskInterrupterProvider extends ProviderAdapter {
+ private static final String PROPERTY_CE_TASK_TIMEOUT = "sonar.ce.task.timeoutSeconds";
+
+ private CeTaskInterrupter instance;
+
+ public CeTaskInterrupter provide(Configuration configuration, CeWorkerController ceWorkerController, System2 system2) {
+ if (instance == null) {
+ instance = configuration.getLong(PROPERTY_CE_TASK_TIMEOUT)
+ .filter(timeOutInSeconds -> {
+ checkState(timeOutInSeconds >= 1, "The property '%s' must be a long value >= 1. Got '%s'", PROPERTY_CE_TASK_TIMEOUT, timeOutInSeconds);
+ return true;
+ })
+ .map(timeOutInSeconds -> (CeTaskInterrupter) new TimeoutCeTaskInterrupter(timeOutInSeconds * 1_000L, ceWorkerController, system2))
+ .orElseGet(SimpleCeTaskInterrupter::new);
+ }
+ return instance;
+ }
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskInterrupterWorkerExecutionListener.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskInterrupterWorkerExecutionListener.java
new file mode 100644
index 00000000000..4cb42c123f4
--- /dev/null
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskInterrupterWorkerExecutionListener.java
@@ -0,0 +1,44 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import javax.annotation.Nullable;
+import org.sonar.ce.task.CeTask;
+import org.sonar.ce.task.CeTaskInterrupter;
+import org.sonar.ce.task.CeTaskResult;
+import org.sonar.db.ce.CeActivityDto;
+
+public class CeTaskInterrupterWorkerExecutionListener implements CeWorker.ExecutionListener {
+ private final CeTaskInterrupter interrupter;
+
+ public CeTaskInterrupterWorkerExecutionListener(CeTaskInterrupter interrupter) {
+ this.interrupter = interrupter;
+ }
+
+ @Override
+ public void onStart(CeTask ceTask) {
+ interrupter.onStart(ceTask);
+ }
+
+ @Override
+ public void onEnd(CeTask ceTask, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
+ interrupter.onEnd(ceTask);
+ }
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java
index 4f3b0ba6989..bbd351a844b 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java
@@ -29,9 +29,13 @@ public class CeTaskProcessorModule extends Module {
CeTaskProcessorRepositoryImpl.class,
CeLoggingWorkerExecutionListener.class,
ReportAnalysisFailureNotificationExecutionListener.class,
+ new CeTaskInterrupterProvider(),
+ CeTaskInterrupterWorkerExecutionListener.class,
CeWorkerFactoryImpl.class,
- EnabledCeWorkerControllerImpl.class,
+ CeWorkerControllerImpl.class,
CeProcessingSchedulerExecutorServiceImpl.class,
- CeProcessingSchedulerImpl.class);
+ CeProcessingSchedulerImpl.class
+
+ );
}
}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java
index 74b416fd0bb..8bfeffc4017 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java
@@ -19,6 +19,7 @@
*/
package org.sonar.ce.taskprocessor;
+import java.util.Optional;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
import org.sonar.ce.queue.CeQueue;
@@ -32,6 +33,7 @@ import org.sonar.db.ce.CeActivityDto;
* {@code false} otherwise.
*/
public interface CeWorker extends Callable<CeWorker.Result> {
+
enum Result {
/** Worker is disabled */
DISABLED,
@@ -52,6 +54,16 @@ public interface CeWorker extends Callable<CeWorker.Result> {
String getUUID();
/**
+ * @return {@code true} if this CeWorker currently being executed by the specified {@link Thread}.
+ */
+ boolean isExecutedBy(Thread thread);
+
+ /**
+ * @return the {@link CeTask} currently being executed by this worker, if any.
+ */
+ Optional<CeTask> getCurrentTask();
+
+ /**
* Classes implementing will be called a task start and finishes executing.
* All classes implementing this interface are guaranted to be called for each event, even if another implementation
* failed when called.
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerController.java
index e11063d311c..df0d32820bc 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerController.java
@@ -19,25 +19,40 @@
*/
package org.sonar.ce.taskprocessor;
+import java.util.Optional;
+
/**
* This class is responsible of knowing/deciding which {@link CeWorker} is enabled and should actually try and find a
* task to process.
*/
-public interface EnabledCeWorkerController {
+public interface CeWorkerController {
interface ProcessingRecorderHook extends AutoCloseable {
+ /**
+ * Override to not declare any exception thrown.
+ */
+ @Override
+ void close();
}
/**
+ * Registers to the controller that the specified {@link CeWorker}
+ */
+ ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker);
+
+ /**
* Returns {@code true} if the specified {@link CeWorker} is enabled
*/
boolean isEnabled(CeWorker ceWorker);
- ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker);
+ /**
+ * @return the {@link CeWorker} running in the specified {@link Thread}, if any.
+ */
+ Optional<CeWorker> getCeWorkerIn(Thread thread);
/**
- * Whether at least one worker is being processed a task or not.
- * Returns {@code false} when all workers are waiting for tasks
- * or are being stopped.
+ * Whether at least one worker is processing a task or not.
+ *
+ * @return {@code false} when all workers are waiting for tasks or are being stopped.
*/
boolean hasAtLeastOneProcessingWorker();
}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerControllerImpl.java
index 7370647a589..114be1e47f1 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerControllerImpl.java
@@ -19,19 +19,20 @@
*/
package org.sonar.ce.taskprocessor;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.sonar.api.utils.log.Loggers;
import org.sonar.ce.configuration.CeConfiguration;
-public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController {
- private final ConcurrentHashMap<CeWorker, Status> map = new ConcurrentHashMap<>();
+public class CeWorkerControllerImpl implements CeWorkerController {
+ private final ConcurrentHashMap<CeWorker, Status> workerStatuses = new ConcurrentHashMap<>();
private final CeConfiguration ceConfiguration;
enum Status {
PROCESSING, PAUSED
}
- public EnabledCeWorkerControllerImpl(CeConfiguration ceConfiguration) {
+ public CeWorkerControllerImpl(CeConfiguration ceConfiguration) {
this.ceConfiguration = ceConfiguration;
logEnabledWorkerCount();
}
@@ -39,18 +40,25 @@ public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController
private void logEnabledWorkerCount() {
int workerCount = ceConfiguration.getWorkerCount();
if (workerCount > 1) {
- Loggers.get(EnabledCeWorkerController.class).info("Compute Engine will use {} concurrent workers to process tasks", workerCount);
+ Loggers.get(CeWorkerController.class).info("Compute Engine will use {} concurrent workers to process tasks", workerCount);
}
}
@Override
+ public Optional<CeWorker> getCeWorkerIn(Thread thread) {
+ return workerStatuses.keySet().stream()
+ .filter(t -> t.isExecutedBy(thread))
+ .findFirst();
+ }
+
+ @Override
public ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker) {
return new ProcessingRecorderHookImpl(ceWorker);
}
@Override
public boolean hasAtLeastOneProcessingWorker() {
- return map.entrySet().stream().anyMatch(e -> e.getValue() == Status.PROCESSING);
+ return workerStatuses.entrySet().stream().anyMatch(e -> e.getValue() == Status.PROCESSING);
}
/**
@@ -69,12 +77,12 @@ public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController
private ProcessingRecorderHookImpl(CeWorker ceWorker) {
this.ceWorker = ceWorker;
- map.put(this.ceWorker, Status.PROCESSING);
+ workerStatuses.put(this.ceWorker, Status.PROCESSING);
}
@Override
public void close() {
- map.put(ceWorker, Status.PAUSED);
+ workerStatuses.put(ceWorker, Status.PAUSED);
}
}
}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java
index 5ddff907822..6a508b5118d 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java
@@ -30,7 +30,7 @@ public class CeWorkerFactoryImpl implements CeWorkerFactory {
private final UuidFactory uuidFactory;
private final InternalCeQueue queue;
private final CeTaskProcessorRepository taskProcessorRepository;
- private final EnabledCeWorkerController enabledCeWorkerController;
+ private final CeWorkerController ceWorkerController;
private final CeWorker.ExecutionListener[] executionListeners;
private Set<CeWorker> ceWorkers = Collections.emptySet();
@@ -38,24 +38,24 @@ public class CeWorkerFactoryImpl implements CeWorkerFactory {
* Used by Pico when there is no {@link CeWorker.ExecutionListener} in the container.
*/
public CeWorkerFactoryImpl(InternalCeQueue queue, CeTaskProcessorRepository taskProcessorRepository,
- UuidFactory uuidFactory, EnabledCeWorkerController enabledCeWorkerController) {
- this(queue, taskProcessorRepository, uuidFactory, enabledCeWorkerController, new CeWorker.ExecutionListener[0]);
+ UuidFactory uuidFactory, CeWorkerController ceWorkerController) {
+ this(queue, taskProcessorRepository, uuidFactory, ceWorkerController, new CeWorker.ExecutionListener[0]);
}
public CeWorkerFactoryImpl(InternalCeQueue queue, CeTaskProcessorRepository taskProcessorRepository,
- UuidFactory uuidFactory, EnabledCeWorkerController enabledCeWorkerController,
+ UuidFactory uuidFactory, CeWorkerController ceWorkerController,
CeWorker.ExecutionListener[] executionListeners) {
this.queue = queue;
this.taskProcessorRepository = taskProcessorRepository;
this.uuidFactory = uuidFactory;
- this.enabledCeWorkerController = enabledCeWorkerController;
+ this.ceWorkerController = ceWorkerController;
this.executionListeners = executionListeners;
}
@Override
public CeWorker create(int ordinal) {
String uuid = uuidFactory.create();
- CeWorkerImpl ceWorker = new CeWorkerImpl(ordinal, uuid, queue, taskProcessorRepository, enabledCeWorkerController, executionListeners);
+ CeWorkerImpl ceWorker = new CeWorkerImpl(ordinal, uuid, queue, taskProcessorRepository, ceWorkerController, executionListeners);
ceWorkers = Stream.concat(ceWorkers.stream(), Stream.of(ceWorker)).collect(MoreCollectors.toSet(ceWorkers.size() + 1));
return ceWorker;
}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java
index f2c775a120a..8bb2c17023e 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java
@@ -23,7 +23,9 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
@@ -32,6 +34,7 @@ import org.sonar.api.utils.log.Logger;
import org.sonar.api.utils.log.Loggers;
import org.sonar.ce.queue.InternalCeQueue;
import org.sonar.ce.task.CeTask;
+import org.sonar.ce.task.CeTaskInterruptedException;
import org.sonar.ce.task.CeTaskResult;
import org.sonar.ce.task.taskprocessor.CeTaskProcessor;
import org.sonar.core.util.logs.Profiler;
@@ -39,9 +42,11 @@ import org.sonar.db.ce.CeActivityDto;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.String.format;
+import static org.sonar.ce.task.CeTaskInterruptedException.isTaskInterruptedException;
import static org.sonar.ce.taskprocessor.CeWorker.Result.DISABLED;
import static org.sonar.ce.taskprocessor.CeWorker.Result.NO_TASK;
import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED;
+import static org.sonar.db.ce.CeActivityDto.Status.FAILED;
public class CeWorkerImpl implements CeWorker {
@@ -51,18 +56,19 @@ public class CeWorkerImpl implements CeWorker {
private final String uuid;
private final InternalCeQueue queue;
private final CeTaskProcessorRepository taskProcessorRepository;
- private final EnabledCeWorkerController enabledCeWorkerController;
+ private final CeWorkerController ceWorkerController;
private final List<ExecutionListener> listeners;
+ private final AtomicReference<RunningState> runningState = new AtomicReference<>();
public CeWorkerImpl(int ordinal, String uuid,
InternalCeQueue queue, CeTaskProcessorRepository taskProcessorRepository,
- EnabledCeWorkerController enabledCeWorkerController,
+ CeWorkerController ceWorkerController,
ExecutionListener... listeners) {
this.ordinal = checkOrdinal(ordinal);
this.uuid = uuid;
this.queue = queue;
this.taskProcessorRepository = taskProcessorRepository;
- this.enabledCeWorkerController = enabledCeWorkerController;
+ this.ceWorkerController = ceWorkerController;
this.listeners = Arrays.asList(listeners);
}
@@ -73,22 +79,71 @@ public class CeWorkerImpl implements CeWorker {
@Override
public Result call() {
- return withCustomizedThreadName(this::findAndProcessTask);
+ try (TrackRunningState trackRunningState = new TrackRunningState(this::findAndProcessTask)) {
+ return trackRunningState.get();
+ }
}
- private <T> T withCustomizedThreadName(Supplier<T> supplier) {
- Thread currentThread = Thread.currentThread();
- String oldName = currentThread.getName();
- try {
- currentThread.setName(String.format("Worker %s (UUID=%s) on %s", getOrdinal(), getUUID(), oldName));
- return supplier.get();
- } finally {
- currentThread.setName(oldName);
+ @Override
+ public int getOrdinal() {
+ return ordinal;
+ }
+
+ @Override
+ public String getUUID() {
+ return uuid;
+ }
+
+ @Override
+ public boolean isExecutedBy(Thread thread) {
+ return Optional.ofNullable(runningState.get())
+ .filter(state -> state.runningThread.equals(thread))
+ .isPresent();
+ }
+
+ @Override
+ public Optional<CeTask> getCurrentTask() {
+ return Optional.ofNullable(runningState.get())
+ .flatMap(RunningState::getTask);
+ }
+
+ private class TrackRunningState implements AutoCloseable, Supplier<Result> {
+ private final RunningState localRunningState;
+ private final Function<RunningState, Result> delegate;
+ private final String oldName;
+
+ private TrackRunningState(Function<RunningState, Result> delegate) {
+ Thread currentThread = Thread.currentThread();
+ localRunningState = new RunningState(currentThread);
+ if (!runningState.compareAndSet(null, localRunningState)) {
+ LOG.warn("Worker {} (UUID=%s) starts executing with new Thread {} while running state isn't null. " +
+ "Forcefully updating Workers's running state to new Thread.",
+ getOrdinal(), getUUID(), currentThread);
+ runningState.set(localRunningState);
+ }
+ this.delegate = delegate;
+ this.oldName = currentThread.getName();
+ }
+
+ @Override
+ public Result get() {
+ localRunningState.runningThread.setName(String.format("Worker %s (UUID=%s) on %s", getOrdinal(), getUUID(), oldName));
+ return delegate.apply(localRunningState);
+ }
+
+ @Override
+ public void close() {
+ localRunningState.runningThread.setName(oldName);
+ if (!runningState.compareAndSet(localRunningState, null)) {
+ LOG.warn("Worker {} (UUID=%s) ending execution in Thread {} while running state has already changed." +
+ " Keeping this new state.",
+ getOrdinal(), getUUID(), localRunningState.runningThread);
+ }
}
}
- private Result findAndProcessTask() {
- if (!enabledCeWorkerController.isEnabled(this)) {
+ private Result findAndProcessTask(RunningState localRunningState) {
+ if (!ceWorkerController.isEnabled(this)) {
return DISABLED;
}
Optional<CeTask> ceTask = tryAndFindTaskToExecute();
@@ -96,8 +151,9 @@ public class CeWorkerImpl implements CeWorker {
return NO_TASK;
}
- try (EnabledCeWorkerController.ProcessingRecorderHook processing = enabledCeWorkerController.registerProcessingFor(this)) {
- executeTask(ceTask.get());
+ try (CeWorkerController.ProcessingRecorderHook processing = ceWorkerController.registerProcessingFor(this);
+ ExecuteTask executeTask = new ExecuteTask(localRunningState, ceTask.get())) {
+ executeTask.run();
} catch (Exception e) {
LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e);
}
@@ -113,68 +169,95 @@ public class CeWorkerImpl implements CeWorker {
return Optional.empty();
}
- @Override
- public int getOrdinal() {
- return ordinal;
- }
+ private final class ExecuteTask implements Runnable, AutoCloseable {
+ private final CeTask task;
+ private final RunningState localRunningState;
+ private final Profiler ceProfiler;
+ private CeActivityDto.Status status = FAILED;
+ private CeTaskResult taskResult = null;
+ private Throwable error = null;
- @Override
- public String getUUID() {
- return uuid;
- }
+ private ExecuteTask(RunningState localRunningState, CeTask task) {
+ this.task = task;
+ this.localRunningState = localRunningState;
+ this.ceProfiler = startLogProfiler(task);
+ }
- private void executeTask(CeTask task) {
- callListeners(t -> t.onStart(task));
- Profiler ceProfiler = startLogProfiler(task);
+ @Override
+ public void run() {
+ beforeExecute();
+ executeTask();
+ }
- CeActivityDto.Status status = CeActivityDto.Status.FAILED;
- CeTaskResult taskResult = null;
- Throwable error = null;
- try {
- // TODO delegate the message to the related task processor, according to task type
- Optional<CeTaskProcessor> taskProcessor = taskProcessorRepository.getForCeTask(task);
- if (taskProcessor.isPresent()) {
- taskResult = taskProcessor.get().process(task);
- status = CeActivityDto.Status.SUCCESS;
- } else {
- LOG.error("No CeTaskProcessor is defined for task of type {}. Plugin configuration may have changed", task.getType());
- status = CeActivityDto.Status.FAILED;
+ @Override
+ public void close() {
+ afterExecute();
+ }
+
+ private void beforeExecute() {
+ localRunningState.setTask(task);
+ callListeners(t -> t.onStart(task));
+ }
+
+ private void executeTask() {
+ try {
+ // TODO delegate the message to the related task processor, according to task type
+ Optional<CeTaskProcessor> taskProcessor = taskProcessorRepository.getForCeTask(task);
+ if (taskProcessor.isPresent()) {
+ taskResult = taskProcessor.get().process(task);
+ status = CeActivityDto.Status.SUCCESS;
+ } else {
+ LOG.error("No CeTaskProcessor is defined for task of type {}. Plugin configuration may have changed", task.getType());
+ status = FAILED;
+ }
+ } catch (MessageException e) {
+ // error
+ error = e;
+ } catch (Throwable e) {
+ Optional<CeTaskInterruptedException> taskInterruptedException = isTaskInterruptedException(e);
+ if (taskInterruptedException.isPresent()) {
+ LOG.trace("Task interrupted", e);
+ CeTaskInterruptedException exception = taskInterruptedException.get();
+ CeActivityDto.Status interruptionStatus = exception.getStatus();
+ status = interruptionStatus;
+ error = (interruptionStatus == FAILED ? exception : null);
+ } else {
+ // error
+ LOG.error("Failed to execute task {}", task.getUuid(), e);
+ error = e;
+ }
}
- } catch (MessageException e) {
- // error
- error = e;
- } catch (Throwable e) {
- // error
- LOG.error("Failed to execute task {}", task.getUuid(), e);
- error = e;
- } finally {
+ }
+
+ private void afterExecute() {
+ localRunningState.setTask(null);
finalizeTask(task, ceProfiler, status, taskResult, error);
}
- }
- private void callListeners(Consumer<ExecutionListener> call) {
- listeners.forEach(listener -> {
+ private void finalizeTask(CeTask task, Profiler ceProfiler, CeActivityDto.Status status,
+ @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
try {
- call.accept(listener);
- } catch (Throwable t) {
- LOG.error(format("Call to listener %s failed.", listener.getClass().getSimpleName()), t);
+ queue.remove(task, status, taskResult, error);
+ } catch (Exception e) {
+ if (error != null) {
+ e.addSuppressed(error);
+ }
+ LOG.error(format("Failed to finalize task with uuid '%s' and persist its state to db", task.getUuid()), e);
+ } finally {
+ // finalize
+ stopLogProfiler(ceProfiler, status);
+ callListeners(t -> t.onEnd(task, status, taskResult, error));
}
- });
- }
+ }
- private void finalizeTask(CeTask task, Profiler ceProfiler, CeActivityDto.Status status,
- @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
- try {
- queue.remove(task, status, taskResult, error);
- } catch (Exception e) {
- if (error != null) {
- e.addSuppressed(error);
- }
- LOG.error(format("Failed to finalize task with uuid '%s' and persist its state to db", task.getUuid()), e);
- } finally {
- // finalize
- stopLogProfiler(ceProfiler, status);
- callListeners(t -> t.onEnd(task, status, taskResult, error));
+ private void callListeners(Consumer<ExecutionListener> call) {
+ listeners.forEach(listener -> {
+ try {
+ call.accept(listener);
+ } catch (Throwable t) {
+ LOG.error(format("Call to listener %s failed.", listener.getClass().getSimpleName()), t);
+ }
+ });
}
}
@@ -210,4 +293,22 @@ public class CeWorkerImpl implements CeWorker {
profiler.addContext("status", status.name());
profiler.stopInfo("Executed task");
}
+
+ private static final class RunningState {
+ private final Thread runningThread;
+ private CeTask task;
+
+ private RunningState(Thread runningThread) {
+ this.runningThread = runningThread;
+ }
+
+ public Optional<CeTask> getTask() {
+ return Optional.ofNullable(task);
+ }
+
+ public void setTask(@Nullable CeTask task) {
+ this.task = task;
+ }
+ }
+
}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/SimpleCeTaskInterrupter.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/SimpleCeTaskInterrupter.java
new file mode 100644
index 00000000000..0847cbf2248
--- /dev/null
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/SimpleCeTaskInterrupter.java
@@ -0,0 +1,50 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import org.sonar.ce.task.CeTask;
+import org.sonar.ce.task.CeTaskCanceledException;
+import org.sonar.ce.task.CeTaskInterruptedException;
+import org.sonar.ce.task.CeTaskInterrupter;
+
+/**
+ * An implementation of {@link CeTaskInterrupter} which will only interrupt the processing if the current thread
+ * has been interrupted.
+ *
+ * @see Thread#isInterrupted()
+ */
+public class SimpleCeTaskInterrupter implements CeTaskInterrupter {
+ @Override
+ public void check(Thread currentThread) throws CeTaskInterruptedException {
+ if (currentThread.isInterrupted()) {
+ throw new CeTaskCanceledException(currentThread);
+ }
+ }
+
+ @Override
+ public void onStart(CeTask ceTask) {
+ // nothing to do
+ }
+
+ @Override
+ public void onEnd(CeTask ceTask) {
+ // nothing to do
+ }
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/TimeoutCeTaskInterrupter.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/TimeoutCeTaskInterrupter.java
new file mode 100644
index 00000000000..1bebdee8819
--- /dev/null
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/TimeoutCeTaskInterrupter.java
@@ -0,0 +1,103 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.sonar.api.utils.System2;
+import org.sonar.api.utils.log.Loggers;
+import org.sonar.ce.task.CeTask;
+import org.sonar.ce.task.CeTaskInterruptedException;
+import org.sonar.ce.task.CeTaskTimeoutException;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.lang.String.format;
+
+/**
+ * An implementation of {@link org.sonar.ce.task.CeTaskInterrupter} which interrupts the processing of the task
+ * if:
+ * <ul>
+ * <li>the thread has been interrupted</li>
+ * <li>it's been running for more than a certain, configurable, amount of time</li>
+ * </ul>
+ */
+public class TimeoutCeTaskInterrupter extends SimpleCeTaskInterrupter {
+ private final long taskTimeoutThreshold;
+ private final CeWorkerController ceWorkerController;
+ private final System2 system2;
+ private final Map<String, Long> startTimestampByCeTaskUuid = new HashMap<>();
+
+ public TimeoutCeTaskInterrupter(long taskTimeoutThreshold, CeWorkerController ceWorkerController, System2 system2) {
+ checkArgument(taskTimeoutThreshold >= 1, "threshold must be >= 1");
+ Loggers.get(TimeoutCeTaskInterrupter.class).info("Compute Engine Task timeout enabled: {} ms", taskTimeoutThreshold);
+
+ this.taskTimeoutThreshold = taskTimeoutThreshold;
+ this.ceWorkerController = ceWorkerController;
+ this.system2 = system2;
+ }
+
+ @Override
+ public void check(Thread currentThread) throws CeTaskInterruptedException {
+ super.check(currentThread);
+
+ computeTimeOutOf(taskOf(currentThread))
+ .ifPresent(timeout -> {
+ throw new CeTaskTimeoutException(format("Execution of task timed out after %s ms", timeout));
+ });
+ }
+
+ private Optional<Long> computeTimeOutOf(CeTask ceTask) {
+ Long startTimestamp = startTimestampByCeTaskUuid.get(ceTask.getUuid());
+ checkState(startTimestamp != null, "No start time recorded for task %s", ceTask.getUuid());
+
+ long duration = system2.now() - startTimestamp;
+ return Optional.of(duration)
+ .filter(t -> t > taskTimeoutThreshold);
+ }
+
+ private CeTask taskOf(Thread currentThread) {
+ return ceWorkerController.getCeWorkerIn(currentThread)
+ .flatMap(CeWorker::getCurrentTask)
+ .orElseThrow(() -> new IllegalStateException(format("Could not find the CeTask being executed in thread '%s'", currentThread.getName())));
+ }
+
+ @Override
+ public void onStart(CeTask ceTask) {
+ long now = system2.now();
+ Long existingTimestamp = startTimestampByCeTaskUuid.put(ceTask.getUuid(), now);
+ if (existingTimestamp != null) {
+ Loggers.get(TimeoutCeTaskInterrupter.class)
+ .warn("Notified of start of execution of task %s but start had already been recorded at %s. Recording new start at %s",
+ ceTask.getUuid(), existingTimestamp, now);
+ }
+ }
+
+ @Override
+ public void onEnd(CeTask ceTask) {
+ Long startTimestamp = startTimestampByCeTaskUuid.remove(ceTask.getUuid());
+ if (startTimestamp == null) {
+ Loggers.get(TimeoutCeTaskInterrupter.class)
+ .warn("Notified of end of execution of task %s but start wasn't recorded", ceTask.getUuid());
+ }
+ }
+
+}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/container/ComputeEngineContainerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/container/ComputeEngineContainerImplTest.java
index 046bfd3d8c0..a05b5ebe74b 100644
--- a/server/sonar-ce/src/test/java/org/sonar/ce/container/ComputeEngineContainerImplTest.java
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/container/ComputeEngineContainerImplTest.java
@@ -103,7 +103,7 @@ public class ComputeEngineContainerImplTest {
+ 3 // content of CeHttpModule
+ 3 // content of CeTaskCommonsModule
+ 4 // content of ProjectAnalysisTaskModule
- + 7 // content of CeTaskProcessorModule
+ + 9 // content of CeTaskProcessorModule
+ 3 // content of ReportAnalysisFailureNotificationModule
+ 3 // CeCleaningModule + its content
+ 4 // WebhookModule
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/monitoring/CeTasksMBeanImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/monitoring/CeTasksMBeanImplTest.java
index 18cc1c0462a..53c83fc0138 100644
--- a/server/sonar-ce/src/test/java/org/sonar/ce/monitoring/CeTasksMBeanImplTest.java
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/monitoring/CeTasksMBeanImplTest.java
@@ -35,8 +35,8 @@ import org.apache.commons.lang.RandomStringUtils;
import org.junit.Test;
import org.sonar.ce.configuration.CeConfiguration;
import org.sonar.ce.taskprocessor.CeWorker;
+import org.sonar.ce.taskprocessor.CeWorkerController;
import org.sonar.ce.taskprocessor.CeWorkerFactory;
-import org.sonar.ce.taskprocessor.EnabledCeWorkerController;
import org.sonar.core.util.stream.MoreCollectors;
import org.sonar.process.systeminfo.protobuf.ProtobufSystemInfo;
@@ -61,8 +61,8 @@ public class CeTasksMBeanImplTest {
})
.collect(MoreCollectors.toSet());
- private EnabledCeWorkerController enabledCeWorkerController = mock(EnabledCeWorkerController.class);
- private CeTasksMBeanImpl underTest = new CeTasksMBeanImpl(new DumbCEQueueStatus(), new DumbCeConfiguration(), new DumbCeWorkerFactory(), enabledCeWorkerController);
+ private CeWorkerController ceWorkerController = mock(CeWorkerController.class);
+ private CeTasksMBeanImpl underTest = new CeTasksMBeanImpl(new DumbCEQueueStatus(), new DumbCeConfiguration(), new DumbCeWorkerFactory(), ceWorkerController);
@Test
public void register_and_unregister() throws Exception {
@@ -124,9 +124,9 @@ public class CeTasksMBeanImplTest {
for (CeWorker worker : WORKERS) {
if (i < enabledWorkerCount) {
enabledWorkers[i] = worker;
- when(enabledCeWorkerController.isEnabled(worker)).thenReturn(true);
+ when(ceWorkerController.isEnabled(worker)).thenReturn(true);
} else {
- when(enabledCeWorkerController.isEnabled(worker)).thenReturn(false);
+ when(ceWorkerController.isEnabled(worker)).thenReturn(false);
}
i++;
}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java
index 9c1f68683e1..2fdbfdf9029 100644
--- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java
@@ -78,7 +78,7 @@ public class CeProcessingSchedulerImplTest {
private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorker, 2000L, MILLISECONDS);
private SchedulerCall extendedDelayedPoll = new SchedulerCall(ceWorker, 30000L, MILLISECONDS);
private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorker);
- private EnabledCeWorkerController ceWorkerController = new EnabledCeWorkerControllerImpl(ceConfiguration);
+ private CeWorkerController ceWorkerController = new CeWorkerControllerImpl(ceConfiguration);
private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory, ceWorkerController);
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskInterrupterProviderTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskInterrupterProviderTest.java
new file mode 100644
index 00000000000..05062eb1c76
--- /dev/null
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskInterrupterProviderTest.java
@@ -0,0 +1,127 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.lang.reflect.Field;
+import java.util.Random;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.sonar.api.config.internal.MapSettings;
+import org.sonar.api.utils.System2;
+import org.sonar.ce.task.CeTaskInterrupter;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+public class CeTaskInterrupterProviderTest {
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ private MapSettings settings = new MapSettings();
+ private CeWorkerController ceWorkerController = mock(CeWorkerController.class);
+ private System2 system2 = mock(System2.class);
+ private CeTaskInterrupterProvider underTest = new CeTaskInterrupterProvider();
+
+ @Test
+ public void provide_returns_a_SimpleCeTaskInterrupter_instance_if_configuration_is_empty() {
+ CeTaskInterrupter instance = underTest.provide(settings.asConfig(), ceWorkerController, system2);
+
+ assertThat(instance)
+ .isInstanceOf(SimpleCeTaskInterrupter.class);
+ }
+
+ @Test
+ public void provide_always_return_the_same_SimpleCeTaskInterrupter_instance() {
+ CeTaskInterrupter instance = underTest.provide(settings.asConfig(), ceWorkerController, system2);
+
+ assertThat(instance)
+ .isSameAs(underTest.provide(settings.asConfig(), ceWorkerController, system2))
+ .isSameAs(underTest.provide(new MapSettings().asConfig(), ceWorkerController, system2));
+ }
+
+ @Test
+ public void provide_returns_a_TimeoutCeTaskInterrupter_instance_if_property_taskTimeout_has_a_value() throws IllegalAccessException, NoSuchFieldException {
+ int timeout = 1 + new Random().nextInt(2222);
+ settings.setProperty("sonar.ce.task.timeoutSeconds", timeout);
+
+ CeTaskInterrupter instance = underTest.provide(settings.asConfig(), ceWorkerController, system2);
+
+ assertThat(instance)
+ .isInstanceOf(TimeoutCeTaskInterrupter.class);
+
+ assertThat(readField(instance, "taskTimeoutThreshold"))
+ .isEqualTo(timeout * 1_000L);
+ assertThat(readField(instance, "ceWorkerController"))
+ .isSameAs(ceWorkerController);
+ assertThat(readField(instance, "system2"))
+ .isSameAs(system2);
+ }
+
+ @Test
+ public void provide_fails_with_ISE_if_property_is_not_a_long() {
+ settings.setProperty("sonar.ce.task.timeoutSeconds", "foo");
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("The property 'sonar.ce.task.timeoutSeconds' is not an long value: For input string: \"foo\"");
+
+ underTest.provide(settings.asConfig(), ceWorkerController, system2);
+ }
+
+ @Test
+ public void provide_fails_with_ISE_if_property_is_zero() {
+ settings.setProperty("sonar.ce.task.timeoutSeconds", "0");
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("The property 'sonar.ce.task.timeoutSeconds' must be a long value >= 1. Got '0'");
+
+ underTest.provide(settings.asConfig(), ceWorkerController, system2);
+ }
+
+ @Test
+ public void provide_fails_with_ISE_if_property_is_less_than_zero() {
+ int negativeValue = -(1 + new Random().nextInt(1_212));
+ settings.setProperty("sonar.ce.task.timeoutSeconds", negativeValue);
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("The property 'sonar.ce.task.timeoutSeconds' must be a long value >= 1. Got '" + negativeValue + "'");
+
+ underTest.provide(settings.asConfig(), ceWorkerController, system2);
+ }
+
+ @Test
+ public void provide_always_return_the_same_TimeoutCeTaskInterrupter_instance() {
+ int timeout = 1 + new Random().nextInt(2222);
+ settings.setProperty("sonar.ce.task.timeoutSeconds", timeout);
+
+ CeTaskInterrupter instance = underTest.provide(settings.asConfig(), ceWorkerController, system2);
+
+ assertThat(instance)
+ .isSameAs(underTest.provide(settings.asConfig(), ceWorkerController, system2))
+ .isSameAs(underTest.provide(new MapSettings().setProperty("sonar.ce.task.timeoutSeconds", 999).asConfig(), ceWorkerController, system2));
+ }
+
+ private static Object readField(CeTaskInterrupter instance, String fieldName) throws NoSuchFieldException, IllegalAccessException {
+ Class<?> clazz = instance.getClass();
+ Field timeoutField = clazz.getDeclaredField(fieldName);
+ timeoutField.setAccessible(true);
+ return timeoutField.get(instance);
+ }
+}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskInterrupterWorkerExecutionListenerTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskInterrupterWorkerExecutionListenerTest.java
new file mode 100644
index 00000000000..a2e12caaf77
--- /dev/null
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskInterrupterWorkerExecutionListenerTest.java
@@ -0,0 +1,56 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.Random;
+import org.junit.Test;
+import org.sonar.ce.task.CeTask;
+import org.sonar.ce.task.CeTaskInterrupter;
+import org.sonar.db.ce.CeActivityDto;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class CeTaskInterrupterWorkerExecutionListenerTest {
+ private CeTaskInterrupter ceTaskInterrupter = mock(CeTaskInterrupter.class);
+ private CeTaskInterrupterWorkerExecutionListener underTest = new CeTaskInterrupterWorkerExecutionListener(ceTaskInterrupter);
+
+
+ @Test
+ public void onStart_delegates_to_ceTaskInterrupter_onStart() {
+ CeTask ceTask = mock(CeTask.class);
+
+ underTest.onStart(ceTask);
+
+ verify(ceTaskInterrupter).onStart(same(ceTask));
+ }
+
+ @Test
+ public void onEnd_delegates_to_ceTaskInterrupter_onEnd() {
+ CeTask ceTask = mock(CeTask.class);
+ CeActivityDto.Status randomStatus = CeActivityDto.Status.values()[new Random().nextInt(CeActivityDto.Status.values().length)];
+
+ underTest.onEnd(ceTask, randomStatus, null, null);
+
+ verify(ceTaskInterrupter).onEnd(same(ceTask));
+ }
+}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskProcessorModuleTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskProcessorModuleTest.java
index f11d88125e8..2f90ceb80d9 100644
--- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskProcessorModuleTest.java
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskProcessorModuleTest.java
@@ -21,7 +21,9 @@ package org.sonar.ce.taskprocessor;
import org.junit.Test;
import org.picocontainer.ComponentAdapter;
+import org.sonar.api.config.internal.MapSettings;
import org.sonar.ce.notification.ReportAnalysisFailureNotificationExecutionListener;
+import org.sonar.ce.task.CeTaskInterrupter;
import org.sonar.core.platform.ComponentContainer;
import static org.assertj.core.api.Assertions.assertThat;
@@ -52,4 +54,15 @@ public class CeTaskProcessorModuleTest {
.map(ComponentAdapter::getComponentImplementation))
.contains(ReportAnalysisFailureNotificationExecutionListener.class);
}
+
+ @Test
+ public void defines_CeTaskInterrupterProvider_object() {
+ ComponentContainer container = new ComponentContainer();
+
+ underTest.configure(container);
+
+
+ assertThat(container.getPicoContainer().getComponentAdapter(CeTaskInterrupter.class))
+ .isInstanceOf(CeTaskInterrupterProvider.class);
+ }
}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerControllerImplTest.java
index 54c90fb1b7f..14f1ac32a0a 100644
--- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImplTest.java
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerControllerImplTest.java
@@ -27,10 +27,12 @@ import org.sonar.api.utils.log.LoggerLevel;
import org.sonar.ce.configuration.CeConfigurationRule;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.when;
-public class EnabledCeWorkerControllerImplTest {
+public class CeWorkerControllerImplTest {
private Random random = new Random();
/** 1 <= workerCount <= 5 */
private int randomWorkerCount = 1 + random.nextInt(5);
@@ -42,7 +44,7 @@ public class EnabledCeWorkerControllerImplTest {
public LogTester logTester = new LogTester();
private CeWorker ceWorker = mock(CeWorker.class);
- private EnabledCeWorkerControllerImpl underTest = new EnabledCeWorkerControllerImpl(ceConfigurationRule);
+ private CeWorkerControllerImpl underTest = new CeWorkerControllerImpl(ceConfigurationRule);
@Test
public void isEnabled_returns_true_if_worker_ordinal_is_less_than_CeConfiguration_workerCount() {
@@ -76,7 +78,7 @@ public class EnabledCeWorkerControllerImplTest {
ceConfigurationRule.setWorkerCount(1);
logTester.clear();
- new EnabledCeWorkerControllerImpl(ceConfigurationRule);
+ new CeWorkerControllerImpl(ceConfigurationRule);
assertThat(logTester.logs()).isEmpty();
}
@@ -87,7 +89,7 @@ public class EnabledCeWorkerControllerImplTest {
ceConfigurationRule.setWorkerCount(newWorkerCount);
logTester.clear();
- new EnabledCeWorkerControllerImpl(ceConfigurationRule);
+ new CeWorkerControllerImpl(ceConfigurationRule);
verifyInfoLog(newWorkerCount);
}
@@ -103,6 +105,65 @@ public class EnabledCeWorkerControllerImplTest {
assertThat(underTest.isEnabled(ceWorker)).isTrue();
}
+ @Test
+ public void getCeWorkerIn_returns_empty_if_worker_is_unregistered_in_CeWorkerController() {
+ CeWorker ceWorker = mock(CeWorker.class);
+ Thread currentThread = Thread.currentThread();
+ Thread otherThread = new Thread();
+
+ mockWorkerIsRunningOnNoThread(ceWorker);
+ assertThat(underTest.getCeWorkerIn(currentThread)).isEmpty();
+ assertThat(underTest.getCeWorkerIn(otherThread)).isEmpty();
+
+ mockWorkerIsRunningOnThread(ceWorker, currentThread);
+ assertThat(underTest.getCeWorkerIn(currentThread)).isEmpty();
+ assertThat(underTest.getCeWorkerIn(otherThread)).isEmpty();
+
+ mockWorkerIsRunningOnThread(ceWorker, otherThread);
+ assertThat(underTest.getCeWorkerIn(currentThread)).isEmpty();
+ assertThat(underTest.getCeWorkerIn(otherThread)).isEmpty();
+ }
+
+ @Test
+ public void getCeWorkerIn_returns_empty_if_worker_registered_in_CeWorkerController_but_has_no_current_thread() {
+ CeWorker ceWorker = mock(CeWorker.class);
+ Thread currentThread = Thread.currentThread();
+ Thread otherThread = new Thread();
+
+ underTest.registerProcessingFor(ceWorker);
+
+ mockWorkerIsRunningOnNoThread(ceWorker);
+ assertThat(underTest.getCeWorkerIn(currentThread)).isEmpty();
+ assertThat(underTest.getCeWorkerIn(otherThread)).isEmpty();
+ }
+
+ @Test
+ public void getCeWorkerIn_returns_thread_if_worker_registered_in_CeWorkerController_but_has_a_current_thread() {
+ CeWorker ceWorker = mock(CeWorker.class);
+ Thread currentThread = Thread.currentThread();
+ Thread otherThread = new Thread();
+
+ underTest.registerProcessingFor(ceWorker);
+
+ mockWorkerIsRunningOnThread(ceWorker, currentThread);
+ assertThat(underTest.getCeWorkerIn(currentThread)).contains(ceWorker);
+ assertThat(underTest.getCeWorkerIn(otherThread)).isEmpty();
+
+ mockWorkerIsRunningOnThread(ceWorker, otherThread);
+ assertThat(underTest.getCeWorkerIn(currentThread)).isEmpty();
+ assertThat(underTest.getCeWorkerIn(otherThread)).contains(ceWorker);
+ }
+
+ private void mockWorkerIsRunningOnThread(CeWorker ceWorker, Thread thread) {
+ reset(ceWorker);
+ when(ceWorker.isExecutedBy(thread)).thenReturn(true);
+ }
+
+ private void mockWorkerIsRunningOnNoThread(CeWorker ceWorker) {
+ reset(ceWorker);
+ when(ceWorker.isExecutedBy(any())).thenReturn(false);
+ }
+
private void verifyInfoLog(int workerCount) {
assertThat(logTester.logs()).hasSize(1);
assertThat(logTester.logs(LoggerLevel.INFO))
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java
index 4f0603ef59f..b88fbd4f7df 100644
--- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java
@@ -35,7 +35,7 @@ import static org.mockito.Mockito.mock;
public class CeWorkerFactoryImplTest {
private int randomOrdinal = new Random().nextInt(20);
private CeWorkerFactoryImpl underTest = new CeWorkerFactoryImpl(mock(InternalCeQueue.class),
- mock(CeTaskProcessorRepository.class), UuidFactoryImpl.INSTANCE, mock(EnabledCeWorkerController.class));
+ mock(CeTaskProcessorRepository.class), UuidFactoryImpl.INSTANCE, mock(CeWorkerController.class));
@Test
public void create_return_CeWorker_object_with_specified_ordinal() {
@@ -49,7 +49,7 @@ public class CeWorkerFactoryImplTest {
CeWorker.ExecutionListener executionListener1 = mock(CeWorker.ExecutionListener.class);
CeWorker.ExecutionListener executionListener2 = mock(CeWorker.ExecutionListener.class);
CeWorkerFactoryImpl underTest = new CeWorkerFactoryImpl(mock(InternalCeQueue.class),
- mock(CeTaskProcessorRepository.class), UuidFactoryImpl.INSTANCE, mock(EnabledCeWorkerController.class),
+ mock(CeTaskProcessorRepository.class), UuidFactoryImpl.INSTANCE, mock(CeWorkerController.class),
new CeWorker.ExecutionListener[] {executionListener1, executionListener2});
CeWorker ceWorker = underTest.create(randomOrdinal);
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java
index 0b15cca7ef3..db549d4c779 100644
--- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java
@@ -24,10 +24,12 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
-import org.apache.commons.lang.RandomStringUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -35,6 +37,7 @@ import org.junit.rules.ExpectedException;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
import org.sonar.api.utils.MessageException;
import org.sonar.api.utils.System2;
import org.sonar.api.utils.internal.TestSystem2;
@@ -45,6 +48,7 @@ import org.sonar.ce.queue.InternalCeQueue;
import org.sonar.ce.task.CeTask;
import org.sonar.ce.task.CeTaskResult;
import org.sonar.ce.task.projectanalysis.taskprocessor.ReportTaskProcessor;
+import org.sonar.ce.task.taskprocessor.CeTaskProcessor;
import org.sonar.db.DbSession;
import org.sonar.db.DbTester;
import org.sonar.db.ce.CeActivityDto;
@@ -53,6 +57,7 @@ import org.sonar.db.user.UserDto;
import org.sonar.db.user.UserTesting;
import org.sonar.server.organization.BillingValidations;
+import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
@@ -85,19 +90,19 @@ public class CeWorkerImplTest {
private ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class);
private CeWorker.ExecutionListener executionListener1 = mock(CeWorker.ExecutionListener.class);
private CeWorker.ExecutionListener executionListener2 = mock(CeWorker.ExecutionListener.class);
- private EnabledCeWorkerController enabledCeWorkerController = mock(EnabledCeWorkerController.class);
+ private CeWorkerController ceWorkerController = mock(CeWorkerController.class);
private ArgumentCaptor<String> workerUuidCaptor = ArgumentCaptor.forClass(String.class);
private int randomOrdinal = new Random().nextInt(50);
private String workerUuid = UUID.randomUUID().toString();
- private CeWorker underTest = new CeWorkerImpl(randomOrdinal, workerUuid, queue, taskProcessorRepository, enabledCeWorkerController,
+ private CeWorker underTest = new CeWorkerImpl(randomOrdinal, workerUuid, queue, taskProcessorRepository, ceWorkerController,
executionListener1, executionListener2);
- private CeWorker underTestNoListener = new CeWorkerImpl(randomOrdinal, workerUuid, queue, taskProcessorRepository, enabledCeWorkerController);
+ private CeWorker underTestNoListener = new CeWorkerImpl(randomOrdinal, workerUuid, queue, taskProcessorRepository, ceWorkerController);
private InOrder inOrder = Mockito.inOrder(taskProcessor, queue, executionListener1, executionListener2);
private final CeTask.User submitter = new CeTask.User("UUID_USER_1", "LOGIN_1");
@Before
public void setUp() {
- when(enabledCeWorkerController.isEnabled(any(CeWorker.class))).thenReturn(true);
+ when(ceWorkerController.isEnabled(any(CeWorker.class))).thenReturn(true);
}
@Test
@@ -105,20 +110,20 @@ public class CeWorkerImplTest {
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Ordinal must be >= 0");
- new CeWorkerImpl(-1 - new Random().nextInt(20), workerUuid, queue, taskProcessorRepository, enabledCeWorkerController);
+ new CeWorkerImpl(-1 - new Random().nextInt(20), workerUuid, queue, taskProcessorRepository, ceWorkerController);
}
@Test
public void getUUID_must_return_the_uuid_of_constructor() {
String uuid = UUID.randomUUID().toString();
- CeWorker underTest = new CeWorkerImpl(randomOrdinal, uuid, queue, taskProcessorRepository, enabledCeWorkerController);
+ CeWorker underTest = new CeWorkerImpl(randomOrdinal, uuid, queue, taskProcessorRepository, ceWorkerController);
assertThat(underTest.getUUID()).isEqualTo(uuid);
}
@Test
public void worker_disabled() throws Exception {
- reset(enabledCeWorkerController);
- when(enabledCeWorkerController.isEnabled(underTest)).thenReturn(false);
+ reset(ceWorkerController);
+ when(ceWorkerController.isEnabled(underTest)).thenReturn(false);
assertThat(underTest.call()).isEqualTo(DISABLED);
@@ -127,8 +132,8 @@ public class CeWorkerImplTest {
@Test
public void worker_disabled_no_listener() throws Exception {
- reset(enabledCeWorkerController);
- when(enabledCeWorkerController.isEnabled(underTest)).thenReturn(false);
+ reset(ceWorkerController);
+ when(ceWorkerController.isEnabled(underTest)).thenReturn(false);
assertThat(underTestNoListener.call()).isEqualTo(DISABLED);
@@ -391,7 +396,7 @@ public class CeWorkerImplTest {
@Test
public void call_sets_and_restores_thread_name_with_information_of_worker_when_there_is_no_task_to_process() throws Exception {
- String threadName = RandomStringUtils.randomAlphabetic(3);
+ String threadName = randomAlphabetic(3);
when(queue.peek(anyString())).thenAnswer(invocation -> {
assertThat(Thread.currentThread().getName())
.isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName);
@@ -405,7 +410,7 @@ public class CeWorkerImplTest {
@Test
public void call_sets_and_restores_thread_name_with_information_of_worker_when_a_task_is_processed() throws Exception {
- String threadName = RandomStringUtils.randomAlphabetic(3);
+ String threadName = randomAlphabetic(3);
when(queue.peek(anyString())).thenAnswer(invocation -> {
assertThat(Thread.currentThread().getName())
.isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName);
@@ -420,7 +425,7 @@ public class CeWorkerImplTest {
@Test
public void call_sets_and_restores_thread_name_with_information_of_worker_when_an_error_occurs() throws Exception {
- String threadName = RandomStringUtils.randomAlphabetic(3);
+ String threadName = randomAlphabetic(3);
CeTask ceTask = createCeTask(submitter);
when(queue.peek(anyString())).thenAnswer(invocation -> {
assertThat(Thread.currentThread().getName())
@@ -437,10 +442,10 @@ public class CeWorkerImplTest {
@Test
public void call_sets_and_restores_thread_name_with_information_of_worker_when_worker_is_disabled() throws Exception {
- reset(enabledCeWorkerController);
- when(enabledCeWorkerController.isEnabled(underTest)).thenReturn(false);
+ reset(ceWorkerController);
+ when(ceWorkerController.isEnabled(underTest)).thenReturn(false);
- String threadName = RandomStringUtils.randomAlphabetic(3);
+ String threadName = randomAlphabetic(3);
Thread newThread = createThreadNameVerifyingThread(threadName);
newThread.start();
@@ -561,6 +566,166 @@ public class CeWorkerImplTest {
assertThat(((Exception) arg1).getSuppressed()).containsOnly(ex);
}
+ @Test
+ public void isExecutedBy_returns_false_when_no_interaction_with_instance() {
+ assertThat(underTest.isExecutedBy(Thread.currentThread())).isFalse();
+ assertThat(underTest.isExecutedBy(new Thread())).isFalse();
+ }
+
+ @Test
+ public void isExecutedBy_returns_false_unless_a_thread_is_currently_calling_call() throws InterruptedException {
+ CountDownLatch inCallLatch = new CountDownLatch(1);
+ CountDownLatch assertionsDoneLatch = new CountDownLatch(1);
+ // mock long running peek(String) call => Thread is executing call() but not running a task
+ when(queue.peek(anyString())).thenAnswer((Answer<Optional<CeTask>>) invocation -> {
+ inCallLatch.countDown();
+ try {
+ assertionsDoneLatch.await(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return Optional.empty();
+ });
+ Thread t = callCallInNewThread(underTest);
+
+ try {
+ t.start();
+
+ inCallLatch.await(10, TimeUnit.SECONDS);
+ assertThat(underTest.isExecutedBy(Thread.currentThread())).isFalse();
+ assertThat(underTest.isExecutedBy(new Thread())).isFalse();
+ assertThat(underTest.isExecutedBy(t)).isTrue();
+ } finally {
+ assertionsDoneLatch.countDown();
+ t.join();
+ }
+
+ assertThat(underTest.isExecutedBy(Thread.currentThread())).isFalse();
+ assertThat(underTest.isExecutedBy(new Thread())).isFalse();
+ assertThat(underTest.isExecutedBy(t)).isFalse();
+ }
+
+ @Test
+ public void isExecutedBy_returns_false_unless_a_thread_is_currently_executing_a_task() throws InterruptedException {
+ CountDownLatch inCallLatch = new CountDownLatch(1);
+ CountDownLatch assertionsDoneLatch = new CountDownLatch(1);
+ String taskType = randomAlphabetic(12);
+ CeTask ceTask = mock(CeTask.class);
+ when(ceTask.getType()).thenReturn(taskType);
+ when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+ taskProcessorRepository.setProcessorForTask(taskType, new SimpleCeTaskProcessor() {
+ @CheckForNull
+ @Override
+ public CeTaskResult process(CeTask task) {
+ inCallLatch.countDown();
+ try {
+ assertionsDoneLatch.await(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
+ });
+ Thread t = callCallInNewThread(underTest);
+
+ try {
+ t.start();
+
+ inCallLatch.await(10, TimeUnit.SECONDS);
+ assertThat(underTest.isExecutedBy(Thread.currentThread())).isFalse();
+ assertThat(underTest.isExecutedBy(new Thread())).isFalse();
+ assertThat(underTest.isExecutedBy(t)).isTrue();
+ } finally {
+ assertionsDoneLatch.countDown();
+ t.join();
+ }
+
+ assertThat(underTest.isExecutedBy(Thread.currentThread())).isFalse();
+ assertThat(underTest.isExecutedBy(new Thread())).isFalse();
+ assertThat(underTest.isExecutedBy(t)).isFalse();
+ }
+
+ @Test
+ public void getCurrentTask_returns_empty_when_no_interaction_with_instance() {
+ assertThat(underTest.getCurrentTask()).isEmpty();
+ }
+
+ @Test
+ public void getCurrentTask_returns_empty_when_a_thread_is_currently_calling_call_but_not_executing_a_task() throws InterruptedException {
+ CountDownLatch inCallLatch = new CountDownLatch(1);
+ CountDownLatch assertionsDoneLatch = new CountDownLatch(1);
+ // mock long running peek(String) call => Thread is executing call() but not running a task
+ when(queue.peek(anyString())).thenAnswer((Answer<Optional<CeTask>>) invocation -> {
+ inCallLatch.countDown();
+ try {
+ assertionsDoneLatch.await(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return Optional.empty();
+ });
+ Thread t = callCallInNewThread(underTest);
+
+ try {
+ t.start();
+
+ inCallLatch.await(10, TimeUnit.SECONDS);
+ assertThat(underTest.getCurrentTask()).isEmpty();
+ } finally {
+ assertionsDoneLatch.countDown();
+ t.join();
+ }
+
+ assertThat(underTest.getCurrentTask()).isEmpty();
+ }
+
+ @Test
+ public void getCurrentTask_returns_empty_unless_a_thread_is_currently_executing_a_task() throws InterruptedException {
+ CountDownLatch inCallLatch = new CountDownLatch(1);
+ CountDownLatch assertionsDoneLatch = new CountDownLatch(1);
+ String taskType = randomAlphabetic(12);
+ CeTask ceTask = mock(CeTask.class);
+ when(ceTask.getType()).thenReturn(taskType);
+ when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+ taskProcessorRepository.setProcessorForTask(taskType, new SimpleCeTaskProcessor() {
+
+ @CheckForNull
+ @Override
+ public CeTaskResult process(CeTask task) {
+ inCallLatch.countDown();
+ try {
+ assertionsDoneLatch.await(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
+ });
+ Thread t = callCallInNewThread(underTest);
+
+ try {
+ t.start();
+
+ inCallLatch.await(10, TimeUnit.SECONDS);
+ assertThat(underTest.getCurrentTask()).contains(ceTask);
+ } finally {
+ assertionsDoneLatch.countDown();
+ t.join();
+ }
+
+ assertThat(underTest.getCurrentTask()).isEmpty();
+ }
+
+ private Thread callCallInNewThread(CeWorker underTest) {
+ return new Thread(() -> {
+ try {
+ underTest.call();
+ } catch (Exception e) {
+ throw new RuntimeException("call to call() failed and this is unexpected. Fix the UT.", e);
+ }
+ });
+ }
+
private Thread createThreadNameVerifyingThread(String threadName) {
return new Thread(() -> {
verifyUnchangedThreadName(threadName);
@@ -597,7 +762,7 @@ public class CeWorkerImplTest {
.setCharacteristics(characteristicMap)
.build();
}
-
+
private UserDto insertRandomUser() {
UserDto userDto = UserTesting.newUserDto();
db.getDbClient().userDao().insert(session, userDto);
@@ -617,4 +782,11 @@ public class CeWorkerImplTest {
doThrow(t).when(taskProcessor).process(task);
return t;
}
+
+ private static abstract class SimpleCeTaskProcessor implements CeTaskProcessor {
+ @Override
+ public Set<String> getHandledCeTaskTypes() {
+ throw new UnsupportedOperationException("getHandledCeTaskTypes should not be called");
+ }
+ }
}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/ComputingThread.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/ComputingThread.java
new file mode 100644
index 00000000000..3bb0a82186a
--- /dev/null
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/ComputingThread.java
@@ -0,0 +1,55 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+class ComputingThread extends Thread {
+ private boolean kill = false;
+
+ public ComputingThread(String name) {
+ setName(name);
+ }
+
+ private long fibo(int i) {
+ if (kill) {
+ return i;
+ }
+ if (i == 0) {
+ return 0;
+ }
+ if (i == 1) {
+ return 1;
+ }
+ return fibo(i - 1) + fibo(i - 2);
+ }
+
+ @Override
+ public void run() {
+ for (int i = 2; i < 50; i++) {
+ fibo(i);
+ if (kill) {
+ break;
+ }
+ }
+ }
+
+ public void kill() {
+ this.kill = true;
+ }
+}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/SimpleCeTaskInterrupterTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/SimpleCeTaskInterrupterTest.java
new file mode 100644
index 00000000000..c8f40e9e9fb
--- /dev/null
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/SimpleCeTaskInterrupterTest.java
@@ -0,0 +1,78 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.sonar.ce.task.CeTask;
+import org.sonar.ce.task.CeTaskCanceledException;
+
+import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+public class SimpleCeTaskInterrupterTest {
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ private SimpleCeTaskInterrupter underTest = new SimpleCeTaskInterrupter();
+
+ @Test
+ public void check_throws_CeTaskCanceledException_if_provided_thread_is_interrupted() throws InterruptedException {
+ String threadName = randomAlphabetic(30);
+ ComputingThread t = new ComputingThread(threadName);
+
+ try {
+ t.start();
+
+ // will not fail
+ underTest.check(t);
+
+ t.interrupt();
+
+ expectedException.expect(CeTaskCanceledException.class);
+ expectedException.expectMessage("CeWorker executing in Thread '" + threadName + "' has been interrupted");
+
+ underTest.check(t);
+ } finally {
+ t.kill();
+ t.join(1_000);
+ }
+ }
+
+ @Test
+ public void onStart_has_no_effect() {
+ CeTask ceTask = mock(CeTask.class);
+
+ underTest.onStart(ceTask);
+
+ verifyZeroInteractions(ceTask);
+ }
+
+ @Test
+ public void onEnd_has_no_effect() {
+ CeTask ceTask = mock(CeTask.class);
+
+ underTest.onEnd(ceTask);
+
+ verifyZeroInteractions(ceTask);
+ }
+}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/TimeoutCeTaskInterrupterTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/TimeoutCeTaskInterrupterTest.java
new file mode 100644
index 00000000000..a49c8120565
--- /dev/null
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/TimeoutCeTaskInterrupterTest.java
@@ -0,0 +1,223 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.Optional;
+import java.util.Random;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.sonar.api.utils.System2;
+import org.sonar.api.utils.log.LogTester;
+import org.sonar.api.utils.log.LoggerLevel;
+import org.sonar.ce.task.CeTask;
+import org.sonar.ce.task.CeTaskCanceledException;
+import org.sonar.ce.task.CeTaskTimeoutException;
+
+import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TimeoutCeTaskInterrupterTest {
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+ @Rule
+ public LogTester logTester = new LogTester();
+
+ private int timeoutInSeconds = 1 + new Random().nextInt(20);
+ private int timeoutInMs = timeoutInSeconds * 1_000;
+ private CeWorkerController ceWorkerController = mock(CeWorkerController.class);
+ private System2 system2 = mock(System2.class);
+ private CeWorker ceWorker = mock(CeWorker.class);
+ private CeTask ceTask = mock(CeTask.class);
+ private TimeoutCeTaskInterrupter underTest = new TimeoutCeTaskInterrupter(timeoutInMs, ceWorkerController, system2);
+
+ @Test
+ public void constructor_fails_with_IAE_if_timeout_is_0() {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("threshold must be >= 1");
+
+ new TimeoutCeTaskInterrupter(0, ceWorkerController, system2);
+ }
+
+ @Test
+ public void constructor_fails_with_IAE_if_timeout_is_less_than_0() {
+ long timeout = - (1 + new Random().nextInt(299));
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("threshold must be >= 1");
+
+ new TimeoutCeTaskInterrupter(timeout, ceWorkerController, system2);
+ }
+
+ @Test
+ public void constructor_log_timeout_in_ms_at_INFO_level() {
+ int timeout = 1 + new Random().nextInt(9_999);
+
+ new TimeoutCeTaskInterrupter(timeout, ceWorkerController, system2);
+
+ assertThat(logTester.logs()).hasSize(1);
+ assertThat(logTester.logs(LoggerLevel.INFO))
+ .containsExactly("Compute Engine Task timeout enabled: " + timeout + " ms");
+ }
+
+ @Test
+ public void check_fails_with_ISE_if_thread_is_not_running_a_CeWorker() {
+ Thread t = newThreadWithRandomName();
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("Could not find the CeTask being executed in thread '" + t.getName() + "'");
+
+ underTest.check(t);
+ }
+
+ @Test
+ public void check_fails_with_ISE_if_thread_is_not_running_a_CeWorker_with_no_current_CeTask() {
+ Thread t = newThreadWithRandomName();
+ mockWorkerOnThread(t, ceWorker);
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("Could not find the CeTask being executed in thread '" + t.getName() + "'");
+
+ underTest.check(t);
+ }
+
+ @Test
+ public void check_fails_with_ISE_if_thread_is_executing_a_CeTask_but_on_start_has_not_been_called_on_it() {
+ String taskUuid = randomAlphabetic(15);
+ Thread t = new Thread();
+ mockWorkerOnThread(t, ceWorker);
+ mockWorkerWithTask(ceTask);
+ when(ceTask.getUuid()).thenReturn(taskUuid);
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("No start time recorded for task " + taskUuid);
+
+ underTest.check(t);
+ }
+
+ @Test
+ public void check_fails_with_ISE_if_thread_is_executing_a_CeTask_but_on_start_and_on_end_have_not_been_called_on_it() {
+ String taskUuid = randomAlphabetic(15);
+ Thread t = new Thread();
+ mockWorkerOnThread(t, ceWorker);
+ mockWorkerWithTask(ceTask);
+ when(ceTask.getUuid()).thenReturn(taskUuid);
+ underTest.onStart(this.ceTask);
+ underTest.onEnd(this.ceTask);
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("No start time recorded for task " + taskUuid);
+
+ underTest.check(t);
+ }
+
+ @Test
+ public void check_throws_CeTaskCanceledException_if_provided_thread_is_interrupted() throws InterruptedException {
+ String threadName = randomAlphabetic(30);
+ ComputingThread t = new ComputingThread(threadName);
+ mockWorkerOnThread(t, ceWorker);
+ mockWorkerWithTask(ceTask);
+ underTest.onStart(ceTask);
+
+ try {
+ t.start();
+
+ // will not fail as thread is not interrupted nor timed out
+ underTest.check(t);
+
+ t.interrupt();
+
+ expectedException.expect(CeTaskCanceledException.class);
+ expectedException.expectMessage("CeWorker executing in Thread '" + threadName + "' has been interrupted");
+
+ underTest.check(t);
+ } finally {
+ t.kill();
+ t.join(1_000);
+ }
+ }
+
+ @Test
+ public void check_throws_CeTaskTimeoutException_if_check_called_later_than_timeout_milliseconds_after_on_start() {
+ Thread thread = newThreadWithRandomName();
+ mockWorkerOnThread(thread, ceWorker);
+ mockWorkerWithTask(ceTask);
+ long now = 3_776_663_999L;
+ when(system2.now()).thenReturn(now);
+ underTest.onStart(ceTask);
+
+ // timeout not passed => no exception thrown
+ int beforeTimeoutOffset = 1 + new Random().nextInt(timeoutInMs - 1);
+ when(system2.now()).thenReturn(now + timeoutInMs - beforeTimeoutOffset);
+ underTest.check(thread);
+
+ int afterTimeoutOffset = new Random().nextInt(7_112);
+ when(system2.now()).thenReturn(now + timeoutInMs + afterTimeoutOffset);
+
+ expectedException.expect(CeTaskTimeoutException.class);
+ expectedException.expectMessage("Execution of task timed out after " + (timeoutInMs + afterTimeoutOffset) + " ms");
+
+ underTest.check(thread);
+ }
+
+ @Test
+ public void check_throws_CeTaskCanceledException_if_provided_thread_is_interrupted_even_if_timed_out() throws InterruptedException {
+ String threadName = randomAlphabetic(30);
+ ComputingThread t = new ComputingThread(threadName);
+ mockWorkerOnThread(t, ceWorker);
+ mockWorkerWithTask(ceTask);
+ long now = 3_776_663_999L;
+ when(system2.now()).thenReturn(now);
+ underTest.onStart(ceTask);
+
+ try {
+ t.start();
+ t.interrupt();
+
+ // will not fail as thread is not interrupted nor timed out
+ int afterTimeoutOffset = new Random().nextInt(7_112);
+ when(system2.now()).thenReturn(now + timeoutInMs + afterTimeoutOffset);
+
+ expectedException.expect(CeTaskCanceledException.class);
+ expectedException.expectMessage("CeWorker executing in Thread '" + threadName + "' has been interrupted");
+
+ underTest.check(t);
+ } finally {
+ t.kill();
+ t.join(1_000);
+ }
+ }
+
+ private static Thread newThreadWithRandomName() {
+ String threadName = randomAlphabetic(30);
+ Thread t = new Thread();
+ t.setName(threadName);
+ return t;
+ }
+
+ private void mockWorkerOnThread(Thread t, CeWorker ceWorker) {
+ when(ceWorkerController.getCeWorkerIn(t)).thenReturn(Optional.of(ceWorker));
+ }
+
+ private void mockWorkerWithTask(CeTask ceTask) {
+ when(ceWorker.getCurrentTask()).thenReturn(Optional.of(ceTask));
+ }
+}