]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-9040 ignore queued tasks with more than 2 execution attempts
authorSébastien Lesaint <sebastien.lesaint@sonarsource.com>
Tue, 28 Mar 2017 14:48:42 +0000 (16:48 +0200)
committerEric Hartmann <hartmann.eric@gmail.Com>
Thu, 27 Apr 2017 07:23:18 +0000 (09:23 +0200)
server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java
server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java
server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml
server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java
server/sonar-server/src/main/java/org/sonar/server/computation/queue/InternalCeQueue.java
server/sonar-server/src/main/java/org/sonar/server/computation/queue/InternalCeQueueImpl.java
server/sonar-server/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java
server/sonar-server/src/test/java/org/sonar/server/computation/queue/InternalCeQueueImplTest.java

index dc94239d88282bff3f92802771c075777400257e..3c630e3f0349ce91ca31674a4fda8953527ef268 100644 (file)
@@ -110,8 +110,8 @@ public class CeQueueDao implements Dao {
     return mapper(dbSession).countByStatusAndComponentUuid(status, componentUuid);
   }
 
-  public Optional<CeQueueDto> peek(DbSession session, String workerUuid) {
-    List<EligibleTaskDto> eligibles = mapper(session).selectEligibleForPeek(ONE_RESULT_PAGINATION);
+  public Optional<CeQueueDto> peek(DbSession session, String workerUuid, int maxExecutionCount) {
+    List<EligibleTaskDto> eligibles = mapper(session).selectEligibleForPeek(maxExecutionCount, ONE_RESULT_PAGINATION);
     if (eligibles.isEmpty()) {
       return Optional.absent();
     }
index d605e2b8be1a7e8b43cd9073c6c64573861790cd..1f7161b7fad6016154565f445bf5696b36ac5a98 100644 (file)
@@ -36,7 +36,7 @@ public interface CeQueueMapper {
 
   int countByQuery(@Param("query") CeTaskQuery query);
 
-  List<EligibleTaskDto> selectEligibleForPeek(@Param("pagination") Pagination pagination);
+  List<EligibleTaskDto> selectEligibleForPeek(@Param("maxExecutionCount") int maxExecutionCount, @Param("pagination") Pagination pagination);
 
   @CheckForNull
   CeQueueDto selectByUuid(@Param("uuid") String uuid);
index 2d1f28c90c9202bcfa3fe0567b8e0fc8adf908bd..383fc06394db663148e15cace4ea4b311cd98d6b 100644 (file)
       ce_queue cq
     where
       cq.status='PENDING'
+      and cq.execution_count &lt; #{maxExecutionCount,jdbcType=INTEGER}
       and not exists (
         select
           1
index 2290193f241884c07416219c985abfcb55d781b1..925d8c61e3fb0777ecd46c2b70737f071d55a7ff 100644 (file)
@@ -26,12 +26,14 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import org.junit.Rule;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.sonar.api.utils.System2;
+import org.sonar.api.utils.internal.AlwaysIncreasingSystem2;
 import org.sonar.api.utils.internal.TestSystem2;
 import org.sonar.db.DbTester;
 
@@ -57,6 +59,7 @@ public class CeQueueDaoTest {
   private static final String WORKER_UUID_1 = "worker uuid 1";
   private static final String WORKER_UUID_2 = "worker uuid 2";
   private static final int EXECUTION_COUNT = 42;
+  private static final int MAX_EXECUTION_COUNT = 2;
 
   private TestSystem2 system2 = new TestSystem2().setNow(INIT_TIME);
 
@@ -67,6 +70,7 @@ public class CeQueueDaoTest {
 
   private CeQueueDao underTest = new CeQueueDao(system2);
   private CeQueueDao underTestWithSystem2Mock = new CeQueueDao(mockedSystem2);
+  private CeQueueDao underTestAlwaysIncreasingSystem2 = new CeQueueDao(new AlwaysIncreasingSystem2());
 
   @Test
   public void insert_populates_createdAt_and_updateAt_from_System2_with_same_value_if_any_is_not_set() {
@@ -195,7 +199,7 @@ public class CeQueueDaoTest {
   @Test
   public void resetAllToPendingStatus_resets_startedAt() {
     assertThat(insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING).getStartedAt()).isNull();
-    assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).get().getUuid()).isEqualTo(TASK_UUID_1);
+    assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, MAX_EXECUTION_COUNT).get().getUuid()).isEqualTo(TASK_UUID_1);
     assertThat(underTest.selectByUuid(db.getSession(), TASK_UUID_1).get().getStartedAt()).isNotNull();
 
     underTest.resetAllToPendingStatus(db.getSession());
@@ -225,11 +229,11 @@ public class CeQueueDaoTest {
 
   @Test
   public void peek_none_if_no_pendings() throws Exception {
-    assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).isPresent()).isFalse();
+    assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, MAX_EXECUTION_COUNT).isPresent()).isFalse();
 
     // not pending, but in progress
     insert(TASK_UUID_1, COMPONENT_UUID_1, IN_PROGRESS);
-    assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).isPresent()).isFalse();
+    assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, MAX_EXECUTION_COUNT).isPresent()).isFalse();
   }
 
   @Test
@@ -242,7 +246,7 @@ public class CeQueueDaoTest {
     verifyCeQueueStatuses(TASK_UUID_1, PENDING, TASK_UUID_2, PENDING);
 
     // peek first one
-    Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1);
+    Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1, MAX_EXECUTION_COUNT);
     assertThat(peek.isPresent()).isTrue();
     assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_1);
     assertThat(peek.get().getStatus()).isEqualTo(IN_PROGRESS);
@@ -251,7 +255,7 @@ public class CeQueueDaoTest {
     verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, PENDING);
 
     // peek second one
-    peek = underTest.peek(db.getSession(), WORKER_UUID_2);
+    peek = underTest.peek(db.getSession(), WORKER_UUID_2, MAX_EXECUTION_COUNT);
     assertThat(peek.isPresent()).isTrue();
     assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_2);
     assertThat(peek.get().getStatus()).isEqualTo(IN_PROGRESS);
@@ -260,7 +264,7 @@ public class CeQueueDaoTest {
     verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, IN_PROGRESS);
 
     // no more pendings
-    assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).isPresent()).isFalse();
+    assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, MAX_EXECUTION_COUNT).isPresent()).isFalse();
   }
 
   @Test
@@ -270,7 +274,7 @@ public class CeQueueDaoTest {
     system2.setNow(INIT_TIME + 3_000_000);
     insert(TASK_UUID_2, COMPONENT_UUID_1, PENDING);
 
-    Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1);
+    Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1, MAX_EXECUTION_COUNT);
     assertThat(peek.isPresent()).isTrue();
     assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_1);
     assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_1);
@@ -278,17 +282,61 @@ public class CeQueueDaoTest {
     verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, PENDING);
 
     // do not peek second task as long as the first one is in progress
-    peek = underTest.peek(db.getSession(), WORKER_UUID_1);
+    peek = underTest.peek(db.getSession(), WORKER_UUID_1, MAX_EXECUTION_COUNT);
     assertThat(peek.isPresent()).isFalse();
 
     // first one is finished
     underTest.deleteByUuid(db.getSession(), TASK_UUID_1);
-    peek = underTest.peek(db.getSession(), WORKER_UUID_2);
+    peek = underTest.peek(db.getSession(), WORKER_UUID_2, MAX_EXECUTION_COUNT);
     assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_2);
     assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_2);
     assertThat(peek.get().getExecutionCount()).isEqualTo(1);
   }
 
+  @Test
+  public void peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount_0() {
+    peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount(0, null);
+  }
+
+  @Test
+  public void peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount_1() {
+    peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount(1, "u0");
+  }
+
+  @Test
+  public void peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount_2() {
+    peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount(2, "u1");
+  }
+
+  @Test
+  public void peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount_3() {
+    peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount(3, "u2");
+  }
+
+  @Test
+  public void peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount_4() {
+    peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount(4, "u3");
+  }
+
+  @Test
+  public void peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount_more_then_4() {
+    peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount(4 + Math.abs(new Random().nextInt(100)), "u3");
+  }
+
+  private void peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount(int maxExecutionCount, @Nullable String expected) {
+    insert("u3", CeQueueDto.Status.PENDING, 3);
+    insert("u2", CeQueueDto.Status.PENDING, 2);
+    insert("u1", CeQueueDto.Status.PENDING, 1);
+    insert("u0", CeQueueDto.Status.PENDING, 0);
+
+    Optional<CeQueueDto> dto = underTest.peek(db.getSession(), WORKER_UUID_1, maxExecutionCount);
+    if (expected == null) {
+      assertThat(dto.isPresent()).isFalse();
+    } else {
+      assertThat(dto.get().getUuid()).isEqualTo(expected);
+    }
+  }
+
   @Test
   public void select_by_query() {
     // task status not in query
@@ -421,6 +469,18 @@ public class CeQueueDaoTest {
     db.commit();
   }
 
+  private CeQueueDto insert(String uuid, CeQueueDto.Status status, int executionCount) {
+    CeQueueDto dto = new CeQueueDto();
+    dto.setUuid(uuid);
+    dto.setTaskType(CeTaskTypes.REPORT);
+    dto.setStatus(status);
+    dto.setSubmitterLogin("henri");
+    dto.setExecutionCount(executionCount);
+    underTestAlwaysIncreasingSystem2.insert(db.getSession(), dto);
+    db.getSession().commit();
+    return dto;
+  }
+
   private CeQueueDto insert(String uuid, String componentUuid, CeQueueDto.Status status) {
     CeQueueDto dto = new CeQueueDto();
     dto.setUuid(uuid);
index a0280f3a667f140f2944ab218f81a799d56405a7..265e314224bc59e6c7f64f2e46143df5a4d6ecbc 100644 (file)
@@ -45,6 +45,9 @@ public interface InternalCeQueue extends CeQueue {
    * <p>Only a single task can be peeked by project.</p>
    *
    * <p>An unchecked exception may be thrown on technical errors (db connection, ...).</p>
+   *
+   * <p>Tasks which have been executed twice already but are still {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}
+   * are ignored</p>
    */
   Optional<CeTask> peek(String workerUuid);
 
index d1aaf4786eb4a7bece4c3aa832096786f23fc60c..566843818a06ab140ce56f9e2b5c24e111c476d6 100644 (file)
@@ -48,6 +48,8 @@ import static java.util.Objects.requireNonNull;
 @ComputeEngineSide
 public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue {
 
+  private static final int MAX_EXECUTION_COUNT = 2;
+
   private final System2 system2;
   private final DbClient dbClient;
   private final CEQueueStatus queueStatus;
@@ -71,7 +73,7 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
       return Optional.absent();
     }
     try (DbSession dbSession = dbClient.openSession(false)) {
-      Optional<CeQueueDto> dto = dbClient.ceQueueDao().peek(dbSession, workerUuid);
+      Optional<CeQueueDto> dto = dbClient.ceQueueDao().peek(dbSession, workerUuid, MAX_EXECUTION_COUNT);
       CeTask task = null;
       if (dto.isPresent()) {
         task = loadTask(dbSession, dto.get());
index 2c7a38560c33ebaf7f5516363ada58dfa25e16d1..6e2b2b7dc7adcf4f8db1b0a498a0cd6576c304c8 100644 (file)
@@ -46,6 +46,8 @@ import static org.hamcrest.Matchers.startsWith;
 public class CeQueueImplTest {
 
   private static final String WORKER_UUID = "workerUuid";
+  private static final int MAX_EXECUTION_COUNT = 3;
+
   private System2 system2 = new TestSystem2().setNow(1_450_000_000_000L);
 
   @Rule
@@ -147,7 +149,7 @@ public class CeQueueImplTest {
 
     CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
 
-    dbTester.getDbClient().ceQueueDao().peek(session, WORKER_UUID);
+    dbTester.getDbClient().ceQueueDao().peek(session, WORKER_UUID, MAX_EXECUTION_COUNT);
 
     underTest.cancel(task.getUuid());
   }
@@ -158,7 +160,7 @@ public class CeQueueImplTest {
     CeTask pendingTask1 = submit(CeTaskTypes.REPORT, "PROJECT_2");
     CeTask pendingTask2 = submit(CeTaskTypes.REPORT, "PROJECT_3");
 
-    dbTester.getDbClient().ceQueueDao().peek(session, WORKER_UUID);
+    dbTester.getDbClient().ceQueueDao().peek(session, WORKER_UUID, MAX_EXECUTION_COUNT);
 
     int canceledCount = underTest.cancelAll();
     assertThat(canceledCount).isEqualTo(2);
index 0151e48d37ad8a1e78e8798eca689e13fbd1b1e9..869f3076612acac8ba413ffe5152ff7ae1c403aa 100644 (file)
@@ -23,6 +23,7 @@ import com.google.common.base.Optional;
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 import java.util.List;
+import java.util.Random;
 import javax.annotation.Nullable;
 import org.junit.Rule;
 import org.junit.Test;
@@ -267,20 +268,18 @@ public class InternalCeQueueImplTest {
   }
 
   @Test
-  public void peek_increases_executionCount_and_override_workerUuid_to_argument() {
+  public void peek_overrides_workerUuid_to_argument() {
     dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
       .setUuid("uuid")
       .setTaskType("foo")
       .setStatus(CeQueueDto.Status.PENDING)
-      .setWorkerUuid("must be overriden")
-      .setExecutionCount(2));
+      .setWorkerUuid("must be overriden"));
     dbTester.commit();
 
     underTest.peek(WORKER_UUID_1);
 
     CeQueueDto ceQueueDto = dbTester.getDbClient().ceQueueDao().selectByUuid(session, "uuid").get();
     assertThat(ceQueueDto.getWorkerUuid()).isEqualTo(WORKER_UUID_1);
-    assertThat(ceQueueDto.getExecutionCount()).isEqualTo(3);
   }
 
   @Test
@@ -292,6 +291,56 @@ public class InternalCeQueueImplTest {
     assertThat(peek.isPresent()).isFalse();
   }
 
+  @Test
+  public void peek_peeks_pending_tasks_with_executionCount_equal_to_0_and_increases_it() {
+    dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
+        .setUuid("uuid")
+        .setTaskType("foo")
+        .setStatus(CeQueueDto.Status.PENDING)
+        .setExecutionCount(0));
+    dbTester.commit();
+
+    assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("uuid");
+    assertThat(dbTester.getDbClient().ceQueueDao().selectByUuid(session, "uuid").get().getExecutionCount()).isEqualTo(1);
+  }
+
+  @Test
+  public void peek_peeks_pending_tasks_with_executionCount_equal_to_1_and_increases_it() {
+    dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
+        .setUuid("uuid")
+        .setTaskType("foo")
+        .setStatus(CeQueueDto.Status.PENDING)
+        .setExecutionCount(1));
+    dbTester.commit();
+
+    assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("uuid");
+    assertThat(dbTester.getDbClient().ceQueueDao().selectByUuid(session, "uuid").get().getExecutionCount()).isEqualTo(2);
+  }
+
+  @Test
+  public void peek_ignores_pending_tasks_with_executionCount_equal_to_2() {
+    dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
+        .setUuid("uuid")
+        .setTaskType("foo")
+        .setStatus(CeQueueDto.Status.PENDING)
+        .setExecutionCount(2));
+    dbTester.commit();
+
+    assertThat(underTest.peek(WORKER_UUID_1).isPresent()).isFalse();
+  }
+
+  @Test
+  public void peek_ignores_pending_tasks_with_executionCount_greater_than_2() {
+    dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
+        .setUuid("uuid")
+        .setTaskType("foo")
+        .setStatus(CeQueueDto.Status.PENDING)
+        .setExecutionCount(2 + Math.abs(new Random().nextInt(100))));
+    dbTester.commit();
+
+    assertThat(underTest.peek(WORKER_UUID_1).isPresent()).isFalse();
+  }
+
   @Test
   public void cancel_pending() throws Exception {
     CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");