int getWorkerCount();
/**
- * The delay in millisecond before a {@link CeWorker} shall try and find a task
+ * The delay in millisecond before a {@link org.sonar.ce.taskprocessor.CeWorker} shall try and find a task
* to process when it's previous execution had nothing to do.
*/
long getQueuePollingDelay();
/**
- * Delay before running job that cancels worn out tasks for the first time (in minutes).
+ * Delay before running job that cleans CE tasks for the first time (in minutes).
*/
- long getCancelWornOutsInitialDelay();
+ long getCleanCeTasksInitialDelay();
/**
- * Delay between the end of a run and the start of the next one of the job that cancels worn out CE tasks (in minutes).
+ * Delay between the end of a run and the start of the next one of the job that cleans CE tasks (in minutes).
*/
- long getCancelWornOutsDelay();
+ long getCleanCeTasksDelay();
+
}
}
@Override
- public long getCancelWornOutsInitialDelay() {
+ public long getCleanCeTasksInitialDelay() {
return CANCEL_WORN_OUTS_INITIAL_DELAY;
}
@Override
- public long getCancelWornOutsDelay() {
+ public long getCleanCeTasksDelay() {
return CANCEL_WORN_OUTS_DELAY;
}
+
}
}
@Test
- public void getCancelWornOutsInitialDelay_returns_1() {
- assertThat(new CeConfigurationImpl(settings).getCancelWornOutsInitialDelay())
+ public void getCleanCeTasksInitialDelay_returns_1() {
+ assertThat(new CeConfigurationImpl(settings).getCleanCeTasksInitialDelay())
.isEqualTo(1L);
}
@Test
- public void getCancelWornOutsDelay_returns_10() {
- assertThat(new CeConfigurationImpl(settings).getCancelWornOutsDelay())
+ public void getCleanCeTasksDelay_returns_10() {
+ assertThat(new CeConfigurationImpl(settings).getCleanCeTasksDelay())
.isEqualTo(10L);
}
}
import org.sonar.api.utils.log.Logger;
import org.sonar.api.utils.log.Loggers;
+import org.sonar.ce.CeDistributedInformation;
import org.sonar.ce.configuration.CeConfiguration;
import org.sonar.ce.queue.InternalCeQueue;
private final CeCleaningExecutorService executorService;
private final CeConfiguration ceConfiguration;
private final InternalCeQueue internalCeQueue;
+ private final CeDistributedInformation ceDistributedInformation;
- public CeCleaningSchedulerImpl(CeCleaningExecutorService executorService, CeConfiguration ceConfiguration, InternalCeQueue internalCeQueue) {
+ public CeCleaningSchedulerImpl(CeCleaningExecutorService executorService, CeConfiguration ceConfiguration,
+ InternalCeQueue internalCeQueue, CeDistributedInformation ceDistributedInformation) {
this.executorService = executorService;
this.internalCeQueue = internalCeQueue;
this.ceConfiguration = ceConfiguration;
+ this.ceDistributedInformation = ceDistributedInformation;
}
@Override
public void startScheduling() {
- executorService.scheduleWithFixedDelay(this::cancelWornOuts,
- ceConfiguration.getCancelWornOutsInitialDelay(),
- ceConfiguration.getCancelWornOutsDelay(),
+ executorService.scheduleWithFixedDelay(this::cleanCeQueue,
+ ceConfiguration.getCleanCeTasksInitialDelay(),
+ ceConfiguration.getCleanCeTasksDelay(),
MINUTES);
}
+ private void cleanCeQueue() {
+ cancelWornOuts();
+ resetTasksWithUnknownWorkerUUIDs();
+ }
+
private void cancelWornOuts() {
try {
- LOG.info("Deleting any worn out task");
+ LOG.debug("Deleting any worn out task");
internalCeQueue.cancelWornOuts();
} catch (Exception e) {
LOG.warn("Failed to cancel worn out tasks", e);
}
}
+
+ private void resetTasksWithUnknownWorkerUUIDs() {
+ try {
+ LOG.debug("Resetting state of tasks with unknown worker UUIDs");
+ internalCeQueue.resetTasksWithUnknownWorkerUUIDs(ceDistributedInformation.getWorkerUUIDs());
+ } catch (Exception e) {
+ LOG.warn("Failed to reset tasks with unknown worker UUIDs", e);
+ }
+ }
}
package org.sonar.ce.queue;
import java.util.Optional;
+import java.util.Set;
import javax.annotation.Nullable;
-import org.sonar.ce.queue.CeQueue;
-import org.sonar.ce.queue.CeTask;
-import org.sonar.ce.queue.CeTaskResult;
import org.sonar.db.ce.CeActivityDto.Status;
/**
void cancelWornOuts();
+ void resetTasksWithUnknownWorkerUUIDs(Set<String> knownWorkerUUIDs);
+
void pausePeek();
void resumePeek();
import java.io.PrintWriter;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
queueStatus.addInProgress();
}
return Optional.ofNullable(task);
-
}
}
}
}
+ @Override
+ public void resetTasksWithUnknownWorkerUUIDs(Set<String> knownWorkerUUIDs) {
+ try (DbSession dbSession = dbClient.openSession(false)) {
+ dbClient.ceQueueDao().resetTasksWithUnknownWorkerUUIDs(dbSession, knownWorkerUUIDs);
+ dbSession.commit();
+ }
+ }
+
@Override
public void pausePeek() {
this.peekPaused.set(true);
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.Test;
+import org.sonar.ce.CeDistributedInformation;
import org.sonar.ce.configuration.CeConfiguration;
import org.sonar.ce.queue.InternalCeQueue;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
public class CeCleaningSchedulerImplTest {
@Test
- public void startScheduling_does_not_fail_if_cancelWornOuts_send_even_an_Exception() {
+ public void startScheduling_does_not_fail_if_cleaning_methods_send_even_an_Exception() {
InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() {
@Override
command.run();
return null;
}
- }, mockCeConfiguration(1, 10), mockedInternalCeQueue);
- doThrow(new IllegalArgumentException("faking unchecked exception thrown by cancelWornOuts")).when(mockedInternalCeQueue).cancelWornOuts();
+ }, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class));
+ Exception exception = new IllegalArgumentException("faking unchecked exception thrown by cancelWornOuts");
+ doThrow(exception).when(mockedInternalCeQueue).cancelWornOuts();
+ doThrow(exception).when(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any());
underTest.startScheduling();
verify(mockedInternalCeQueue).cancelWornOuts();
+ verify(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any());
}
@Test
- public void startScheduling_fails_if_cancelWornOuts_send_even_an_Error() {
+ public void startScheduling_fails_if_cancelWornOuts_send_an_Error() {
InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() {
@Override
command.run();
return null;
}
- }, mockCeConfiguration(1, 10), mockedInternalCeQueue);
+ }, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class));
Error expected = new Error("faking Error thrown by cancelWornOuts");
doThrow(expected).when(mockedInternalCeQueue).cancelWornOuts();
} catch (Error e) {
assertThat(e).isSameAs(expected);
}
+ verify(mockedInternalCeQueue).cancelWornOuts();
}
@Test
- public void startScheduling_calls_cancelWornOuts_of_internalCeQueue_at_fixed_rate_with_value_from_CeConfiguration() {
+ public void startScheduling_fails_if_resetTasksWithUnknownWorkerUUIDs_send_an_Error() {
InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
- long initialDelay = 10L;
- long delay = 20L;
- CeConfiguration mockedCeConfiguration = mockCeConfiguration(initialDelay, delay);
+ CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() {
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ // synchronously execute command
+ command.run();
+ return null;
+ }
+ }, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class));
+ Error expected = new Error("faking Error thrown by cancelWornOuts");
+ doThrow(expected).when(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any());
+
+ try {
+ underTest.startScheduling();
+ fail("the error should have been thrown");
+ } catch (Error e) {
+ assertThat(e).isSameAs(expected);
+ }
+ verify(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any());
+ }
+
+ @Test
+ public void startScheduling_calls_cleaning_methods_of_internalCeQueue_at_fixed_rate_with_value_from_CeConfiguration() {
+ InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
+ long wornOutInitialDelay = 10L;
+ long wornOutDelay = 20L;
+ long unknownWorkerInitialDelay = 11L;
+ long unknownWorkerDelay = 21L;
+ CeConfiguration mockedCeConfiguration = mockCeConfiguration(wornOutInitialDelay, wornOutDelay);
CeCleaningAdapter executorService = new CeCleaningAdapter() {
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initDelay, long period, TimeUnit unit) {
- assertThat(initDelay).isEqualTo(initialDelay);
- assertThat(period).isEqualTo(delay);
- assertThat(unit).isEqualTo(TimeUnit.MINUTES);
+ schedulerCounter++;
+ switch(schedulerCounter) {
+ case 1:
+ assertThat(initDelay).isEqualTo(wornOutInitialDelay);
+ assertThat(period).isEqualTo(wornOutDelay);
+ assertThat(unit).isEqualTo(TimeUnit.MINUTES);
+ break;
+ case 2:
+ assertThat(initDelay).isEqualTo(unknownWorkerInitialDelay);
+ assertThat(period).isEqualTo(unknownWorkerDelay);
+ assertThat(unit).isEqualTo(TimeUnit.MINUTES);
+ break;
+ default:
+ fail("Unknwon call of scheduleWithFixedDelay");
+ }
// synchronously execute command
command.run();
return null;
}
};
- CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(executorService, mockedCeConfiguration, mockedInternalCeQueue);
+ CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(executorService, mockedCeConfiguration, mockedInternalCeQueue, mock(CeDistributedInformation.class));
underTest.startScheduling();
-
+ assertThat(executorService.schedulerCounter).isEqualTo(1);
verify(mockedInternalCeQueue).cancelWornOuts();
}
- private CeConfiguration mockCeConfiguration(long initialDelay, long delay) {
+ private CeConfiguration mockCeConfiguration(long cleanCeTasksInitialDelay, long cleanCeTasksDelay) {
CeConfiguration mockedCeConfiguration = mock(CeConfiguration.class);
- when(mockedCeConfiguration.getCancelWornOutsInitialDelay()).thenReturn(initialDelay);
- when(mockedCeConfiguration.getCancelWornOutsDelay()).thenReturn(delay);
+ when(mockedCeConfiguration.getCleanCeTasksInitialDelay()).thenReturn(cleanCeTasksInitialDelay);
+ when(mockedCeConfiguration.getCleanCeTasksDelay()).thenReturn(cleanCeTasksDelay);
return mockedCeConfiguration;
}
* method.
*/
private static class CeCleaningAdapter implements CeCleaningExecutorService {
+ protected int schedulerCounter = 0;
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
}
@Override
- public long getCancelWornOutsInitialDelay() {
+ public long getCleanCeTasksInitialDelay() {
return cancelWornOutsInitialDelay;
}
- public void setCancelWornOutsInitialDelay(long cancelWornOutsInitialDelay) {
+ public void setCleanCeTasksInitialDelay(long cancelWornOutsInitialDelay) {
checkArgument(cancelWornOutsInitialDelay > 0, "cancel worn-outs polling initial delay must be >= 1");
this.cancelWornOutsInitialDelay = cancelWornOutsInitialDelay;
}
@Override
- public long getCancelWornOutsDelay() {
+ public long getCleanCeTasksDelay() {
return cancelWornOutsDelay;
}
- public void setCancelWornOutsDelay(long cancelWornOutsDelay) {
+ public void setCleanCeTasksDelay(long cancelWornOutsDelay) {
checkArgument(cancelWornOutsDelay > 0, "cancel worn-outs polling delay must be >= 1");
this.cancelWornOutsDelay = cancelWornOutsDelay;
}
}
@Override
- public long getCancelWornOutsInitialDelay() {
- throw new UnsupportedOperationException("getCancelWornOutsInitialDelay is not implemented");
+ public long getCleanCeTasksInitialDelay() {
+ throw new UnsupportedOperationException("getCleanCeTasksInitialDelay is not implemented");
}
@Override
- public long getCancelWornOutsDelay() {
- throw new UnsupportedOperationException("getCancelWornOutsDelay is not implemented");
+ public long getCleanCeTasksDelay() {
+ throw new UnsupportedOperationException("getCleanCeTasksDelay is not implemented");
}
+
}
@CheckForNull
*/
package org.sonar.ce.queue;
+import com.google.common.collect.ImmutableSet;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.List;
underTest.cancelWornOuts();
- verifyUnmodifiedByCancelWornOuts(u1);
- verifyUnmodifiedByCancelWornOuts(u2);
+ verifyUnmodified(u1);
+ verifyUnmodified(u2);
verifyCanceled(u3);
verifyCanceled(u4);
- verifyUnmodifiedByCancelWornOuts(u5);
- verifyUnmodifiedByCancelWornOuts(u6);
- verifyUnmodifiedByCancelWornOuts(u7);
- verifyUnmodifiedByCancelWornOuts(u8);
+ verifyUnmodified(u5);
+ verifyUnmodified(u6);
+ verifyUnmodified(u7);
+ verifyUnmodified(u8);
}
- private void verifyUnmodifiedByCancelWornOuts(CeQueueDto original) {
+ @Test
+ public void resetTasksWithUnknownWorkerUUIDs_reset_only_in_progress_tasks() {
+ CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, 0, null);
+ CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, 1, "worker1");
+ CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, 2, null);
+ CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, 3, "worker2");
+ CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, 0, null);
+ CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, 1, "worker1");
+ CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, 2, "worker2");
+ CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, 3, "worker3");
+
+ underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of("worker2", "worker3"));
+
+ // Pending tasks must not be modified even if a workerUUID is not present
+ verifyUnmodified(u1);
+ verifyUnmodified(u2);
+ verifyUnmodified(u3);
+ verifyUnmodified(u4);
+
+ // Unknown worker : null, "worker1"
+ verifyReset(u5);
+ verifyReset(u6);
+
+ // Known workers : "worker2", "worker3"
+ verifyUnmodified(u7);
+ verifyUnmodified(u8);
+ }
+
+ @Test
+ public void resetTasksWithUnknownWorkerUUIDs_with_empty_set_will_reset_all_in_progress_tasks() {
+ CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, 0, null);
+ CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, 1, "worker1");
+ CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, 2, null);
+ CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, 3, "worker2");
+ CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, 0, null);
+ CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, 1, "worker1");
+ CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, 2, "worker2");
+ CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, 3, "worker3");
+
+ underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of());
+
+ // Pending tasks must not be modified even if a workerUUID is not present
+ verifyUnmodified(u1);
+ verifyUnmodified(u2);
+ verifyUnmodified(u3);
+ verifyUnmodified(u4);
+
+ // Unknown worker : null, "worker1"
+ verifyReset(u5);
+ verifyReset(u6);
+ verifyReset(u7);
+ verifyReset(u8);
+ }
+
+ @Test
+ public void resetTasksWithUnknownWorkerUUIDs_with_worker_without_tasks_will_reset_all_in_progress_tasks() {
+ CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, 0, null);
+ CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, 1, "worker1");
+ CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, 2, null);
+ CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, 3, "worker2");
+ CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, 0, null);
+ CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, 1, "worker1");
+ CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, 2, "worker2");
+ CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, 3, "worker3");
+
+ underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of("worker1000", "worker1001"));
+
+ // Pending tasks must not be modified even if a workerUUID is not present
+ verifyUnmodified(u1);
+ verifyUnmodified(u2);
+ verifyUnmodified(u3);
+ verifyUnmodified(u4);
+
+ // Unknown worker : null, "worker1"
+ verifyReset(u5);
+ verifyReset(u6);
+ verifyReset(u7);
+ verifyReset(u8);
+ }
+
+ private void verifyReset(CeQueueDto original) {
+ CeQueueDto dto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), original.getUuid()).get();
+ // We do not touch ExecutionCount nor CreatedAt
+ assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount());
+ assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt());
+
+ // Status must have changed to PENDING and must not be equal to previous status
+ assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING).isNotEqualTo(original.getStatus());
+ // UpdatedAt must have been updated
+ assertThat(dto.getUpdatedAt()).isNotEqualTo(original.getUpdatedAt());
+ // StartedAt must be null
+ assertThat(dto.getStartedAt()).isNull();
+ // WorkerUuid must be null
+ assertThat(dto.getWorkerUuid()).isNull();
+ }
+
+ private void verifyUnmodified(CeQueueDto original) {
CeQueueDto dto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), original.getUuid()).get();
assertThat(dto.getStatus()).isEqualTo(original.getStatus());
assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount());
import java.util.List;
import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.ibatis.session.RowBounds;
import org.sonar.api.utils.System2;
import org.sonar.db.Pagination;
import static java.util.Collections.emptyList;
+import static org.sonar.db.DatabaseUtils.executeLargeUpdates;
import static org.sonar.db.ce.CeQueueDto.Status.IN_PROGRESS;
import static org.sonar.db.ce.CeQueueDto.Status.PENDING;
return mapper(dbSession).selectPendingByMinimumExecutionCount(minExecutionCount);
}
+ public void resetTasksWithUnknownWorkerUUIDs(DbSession dbSession, Set<String> knownWorkerUUIDs) {
+ if (knownWorkerUUIDs.isEmpty()) {
+ mapper(dbSession).resetAllInProgressTasks(system2.now());
+ } else {
+ // executeLargeUpdates won't call the SQL command if knownWorkerUUIDs is empty
+ executeLargeUpdates(knownWorkerUUIDs,
+ (Consumer<List<String>>) uuids -> mapper(dbSession).resetTasksWithUnknownWorkerUUIDs(uuids, system2.now())
+ );
+ }
+ }
+
public CeQueueDto insert(DbSession session, CeQueueDto dto) {
if (dto.getCreatedAt() == 0L || dto.getUpdatedAt() == 0L) {
long now = system2.now();
*/
List<CeQueueDto> selectPendingByMinimumExecutionCount(@Param("minExecutionCount") int minExecutionCount);
+ /**
+ * Select all tasks whose worker UUID is not present in {@code knownWorkerUUIDs}
+ */
+ void resetTasksWithUnknownWorkerUUIDs(@Param("knownWorkerUUIDs") List<String> knownWorkerUUIDs, @Param("updatedAt") long updatedAt);
+
+ /**
+ * Reset all IN_PROGRESS TASKS
+ */
+ void resetAllInProgressTasks(@Param("updatedAt") long updatedAt);
+
+
int countByStatusAndComponentUuid(@Param("status") CeQueueDto.Status status, @Nullable @Param("componentUuid") String componentUuid);
void insert(CeQueueDto dto);
uuid=#{uuid,jdbcType=VARCHAR}
</delete>
+ <update id="resetTasksWithUnknownWorkerUUIDs">
+ update ce_queue set
+ status='PENDING',
+ worker_uuid=NULL,
+ started_at=NULL,
+ updated_at=#{updatedAt,jdbcType=BIGINT}
+ where
+ status = 'IN_PROGRESS'
+ and (
+ worker_uuid is NULL
+ or worker_uuid not in
+ <foreach collection="knownWorkerUUIDs" open="(" close=")" item="workerUUID" separator=",">
+ #{workerUUID,jdbcType=VARCHAR}
+ </foreach>
+ )
+ </update>
+
+ <update id="resetAllInProgressTasks">
+ update ce_queue set
+ status='PENDING',
+ worker_uuid=NULL,
+ started_at=NULL,
+ updated_at=#{updatedAt,jdbcType=BIGINT}
+ where
+ status = 'IN_PROGRESS'
+ </update>
</mapper>
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
verifyUnchangedByResetToPendingForWorker(o4);
}
+
+ @Test
+ public void resetTasksWithUnknownWorkerUUIDs_with_empty_set_resets_status_of_all_pending_tasks() {
+ long startedAt = 2_099_888L;
+ CeQueueDto u1 = insert("u1", CeQueueDto.Status.IN_PROGRESS, 1, WORKER_UUID_1, startedAt);
+ CeQueueDto u2 = insert("u2", CeQueueDto.Status.PENDING, 1, WORKER_UUID_1, startedAt);
+ CeQueueDto u3 = insert("u3", CeQueueDto.Status.PENDING, 0, WORKER_UUID_1, startedAt);
+ CeQueueDto u4 = insert("u4", CeQueueDto.Status.IN_PROGRESS, 2, WORKER_UUID_1, startedAt);
+ CeQueueDto o1 = insert("o1", CeQueueDto.Status.IN_PROGRESS, 1, WORKER_UUID_2, startedAt);
+ CeQueueDto o2 = insert("o2", CeQueueDto.Status.PENDING, 1, WORKER_UUID_2, startedAt);
+ CeQueueDto o3 = insert("o3", CeQueueDto.Status.PENDING, 0, WORKER_UUID_2, startedAt);
+ CeQueueDto o4 = insert("o4", CeQueueDto.Status.IN_PROGRESS, 2, WORKER_UUID_2, startedAt);
+
+ underTestAlwaysIncreasingSystem2.resetTasksWithUnknownWorkerUUIDs(db.getSession(), ImmutableSet.of());
+
+ verifyResetByResetTasks(u1);
+ verifyUnchangedByResetToPendingForWorker(u2);
+ verifyUnchangedByResetToPendingForWorker(u3);
+ verifyResetByResetTasks(u4);
+ verifyResetByResetTasks(o1);
+ verifyUnchangedByResetToPendingForWorker(o2);
+ verifyUnchangedByResetToPendingForWorker(o3);
+ verifyResetByResetTasks(o4);
+ }
+
+ @Test
+ public void resetTasksWithUnknownWorkerUUIDs_set_resets_status_of_all_pending_tasks_with_unknown_workers() {
+ long startedAt = 2_099_888L;
+ CeQueueDto u1 = insert("u1", CeQueueDto.Status.IN_PROGRESS, 1, WORKER_UUID_1, startedAt);
+ CeQueueDto u2 = insert("u2", CeQueueDto.Status.PENDING, 1, WORKER_UUID_1, startedAt);
+ CeQueueDto u3 = insert("u3", CeQueueDto.Status.PENDING, 0, WORKER_UUID_1, startedAt);
+ CeQueueDto u4 = insert("u4", CeQueueDto.Status.IN_PROGRESS, 2, WORKER_UUID_1, startedAt);
+ CeQueueDto o1 = insert("o1", CeQueueDto.Status.IN_PROGRESS, 1, WORKER_UUID_2, startedAt);
+ CeQueueDto o2 = insert("o2", CeQueueDto.Status.PENDING, 1, WORKER_UUID_2, startedAt);
+ CeQueueDto o3 = insert("o3", CeQueueDto.Status.PENDING, 0, WORKER_UUID_2, startedAt);
+ CeQueueDto o4 = insert("o4", CeQueueDto.Status.IN_PROGRESS, 2, WORKER_UUID_2, startedAt);
+
+ underTestAlwaysIncreasingSystem2.resetTasksWithUnknownWorkerUUIDs(db.getSession(), ImmutableSet.of(WORKER_UUID_1, "unknown"));
+
+ verifyUnchangedByResetToPendingForWorker(u1);
+ verifyUnchangedByResetToPendingForWorker(u2);
+ verifyUnchangedByResetToPendingForWorker(u3);
+ verifyUnchangedByResetToPendingForWorker(u4);
+ verifyResetByResetTasks(o1);
+ verifyUnchangedByResetToPendingForWorker(o2);
+ verifyUnchangedByResetToPendingForWorker(o3);
+ verifyResetByResetTasks(o4);
+ }
+
+ private void verifyResetByResetTasks(CeQueueDto original) {
+ CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get();
+ assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING).isNotEqualTo(original.getStatus());
+ assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount());
+ assertThat(dto.getStartedAt()).isNull();
+ assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt());
+ assertThat(dto.getUpdatedAt()).isGreaterThan(original.getUpdatedAt());
+ assertThat(dto.getWorkerUuid()).isNull();
+ }
+
private void verifyResetToPendingForWorker(CeQueueDto original) {
CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get();
assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING);