// If batchSize is increased then we should return a list instead of a single element
return notifications.get(0).toNotification();
+ }
+ public long count() {
+ return notificationQueueDao.count();
}
/**
MyBatis.closeQuietly(session);
}
}
+
+ public long count() {
+ SqlSession session = mybatis.openSession();
+ try {
+ return session.getMapper(NotificationQueueMapper.class).count();
+ } finally {
+ MyBatis.closeQuietly(session);
+ }
+ }
}
*/
public class NotificationQueueDto {
+ private static final String UNABLE_TO_READ_NOTIFICATION = "Unable to read notification";
private Long id;
private byte[] data;
return (Notification) result;
} catch (IOException e) {
- throw new SonarException("Unable to read notification", e);
+ throw new SonarException(UNABLE_TO_READ_NOTIFICATION, e);
} catch (ClassNotFoundException e) {
- throw new SonarException("Unable to read notification", e);
+ throw new SonarException(UNABLE_TO_READ_NOTIFICATION, e);
} finally {
IOUtils.closeQuietly(byteArrayInputStream);
List<NotificationQueueDto> findOldest(int count);
+ long count();
+
}
delete from notifications where id=#{id}
</delete>
+ <select id="count" resultType="long">
+ select count(*) from notifications
+ </select>
+
<select id="findOldest" parameterType="int" resultType="NotificationQueue">
select id, data
from notifications
assertThat(dao.findOldest(1).get(0).toNotification().getType()).isEqualTo("email");
}
+ @Test
+ public void should_count_notification_queue() {
+ NotificationQueueDto notificationQueueDto = NotificationQueueDto.toNotificationQueueDto(new Notification("email"));
+
+ assertThat(dao.count()).isEqualTo(0);
+
+ dao.insert(Arrays.asList(notificationQueueDto));
+
+ assertThat(dao.count()).isEqualTo(1);
+ }
+
@Test
public void should_delete_notification() {
setupData("should_delete_notification");
@VisibleForTesting
synchronized void processQueue() {
TIME_PROFILER.start("Processing notifications queue");
+ long start = now();
+ long lastLog = start;
+ long notifSentCount = 0;
Notification notifToSend = manager.getFromQueue();
while (notifToSend != null) {
deliver(notifToSend);
+ notifSentCount++;
if (stopping) {
break;
}
+ long now = now();
+ if (now - lastLog > 10 * 60 * 1000) {
+ long remainingNotifCount = manager.count();
+ lastLog = now;
+ long spentTimeInMinutes = (now - start) / (60 * 1000);
+ log(notifSentCount, remainingNotifCount, spentTimeInMinutes);
+ }
notifToSend = manager.getFromQueue();
}
TIME_PROFILER.stop();
}
+ @VisibleForTesting
+ void log(long notifSentCount, long remainingNotifCount, long spentTimeInMinutes) {
+ LOG.info("{} notifications sent during the past {} minutes and {} still waiting to be sent", new Object[] {notifSentCount, spentTimeInMinutes, remainingNotifCount});
+ }
+
+ @VisibleForTesting
+ long now() {
+ return System.currentTimeMillis();
+ }
+
private void deliver(Notification notification) {
LOG.debug("Delivering notification " + notification);
final SetMultimap<String, NotificationChannel> recipients = HashMultimap.create();
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
assertThat(service.getDispatchers()).hasSize(0);
}
+ @Test
+ public void shouldLogEvery10Minutes() throws InterruptedException {
+ setUpMocks(CREATOR_EVGENY, ASSIGNEE_SIMON);
+ // Emulate 2 notifications in DB
+ when(manager.getFromQueue()).thenReturn(notification).thenReturn(notification).thenReturn(null);
+ when(manager.count()).thenReturn(1L).thenReturn(0L);
+ service = spy(service);
+ // Emulate processing of each notification take 10 min to have a log each time
+ when(service.now()).thenReturn(0L).thenReturn(10 * 60 * 1000 + 1L).thenReturn(20 * 60 * 1000 + 2L);
+ service.start();
+ verify(service, timeout(100)).log(1, 1, 10);
+ verify(service, timeout(100)).log(2, 0, 20);
+ service.stop();
+ }
+
private static Answer<Object> addUser(final String user, final NotificationChannel channel) {
return addUser(user, new NotificationChannel[] {channel});
}