diff options
Diffstat (limited to 'modules/queue/unique_queue_disk_channel_test.go')
-rw-r--r-- | modules/queue/unique_queue_disk_channel_test.go | 55 |
1 files changed, 29 insertions, 26 deletions
diff --git a/modules/queue/unique_queue_disk_channel_test.go b/modules/queue/unique_queue_disk_channel_test.go index e2fe4aceee..11a1d4b88d 100644 --- a/modules/queue/unique_queue_disk_channel_test.go +++ b/modules/queue/unique_queue_disk_channel_test.go @@ -4,9 +4,9 @@ package queue import ( - "os" "strconv" "sync" + "sync/atomic" "testing" "time" @@ -16,10 +16,7 @@ import ( ) func TestPersistableChannelUniqueQueue(t *testing.T) { - if os.Getenv("CI") != "" { - t.Skip("Skipping because test is flaky on CI") - } - + // Create a temporary directory for the queue tmpDir := t.TempDir() _ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`) @@ -100,7 +97,7 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { executedInitial := map[string][]string{} hasInitial := map[string][]string{} - fillQueue := func(name string, done chan struct{}) { + fillQueue := func(name string, done chan int64) { t.Run("Initial Filling: "+name, func(t *testing.T) { lock := sync.Mutex{} @@ -157,33 +154,39 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { assert.Equal(t, 101, len(executedInitial[name])+len(hasInitial[name])) mapLock.Unlock() }) + mapLock.Lock() + count := int64(len(hasInitial[name])) + mapLock.Unlock() + done <- count close(done) } - doneA := make(chan struct{}) - doneB := make(chan struct{}) + hasQueueAChan := make(chan int64) + hasQueueBChan := make(chan int64) - go fillQueue("QueueA", doneA) - go fillQueue("QueueB", doneB) + go fillQueue("QueueA", hasQueueAChan) + go fillQueue("QueueB", hasQueueBChan) - <-doneA - <-doneB + hasA := <-hasQueueAChan + hasB := <-hasQueueBChan executedEmpty := map[string][]string{} hasEmpty := map[string][]string{} - emptyQueue := func(name string, done chan struct{}) { + emptyQueue := func(name string, numInQueue int64, done chan struct{}) { t.Run("Empty Queue: "+name, func(t *testing.T) { lock := sync.Mutex{} stop := make(chan struct{}) // collect the tasks that have been executed + atomicCount := int64(0) handle := func(data ...Data) []Data { lock.Lock() for _, datum := range data { mapLock.Lock() executedEmpty[name] = append(executedEmpty[name], datum.(string)) mapLock.Unlock() - if datum.(string) == "final" { + count := atomic.AddInt64(&atomicCount, 1) + if count >= numInQueue { close(stop) } } @@ -217,11 +220,11 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { close(done) } - doneA = make(chan struct{}) - doneB = make(chan struct{}) + doneA := make(chan struct{}) + doneB := make(chan struct{}) - go emptyQueue("QueueA", doneA) - go emptyQueue("QueueB", doneB) + go emptyQueue("QueueA", hasA, doneA) + go emptyQueue("QueueB", hasB, doneB) <-doneA <-doneB @@ -237,20 +240,20 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { hasEmpty = map[string][]string{} mapLock.Unlock() - doneA = make(chan struct{}) - doneB = make(chan struct{}) + hasQueueAChan = make(chan int64) + hasQueueBChan = make(chan int64) - go fillQueue("QueueA", doneA) - go fillQueue("QueueB", doneB) + go fillQueue("QueueA", hasQueueAChan) + go fillQueue("QueueB", hasQueueBChan) - <-doneA - <-doneB + hasA = <-hasQueueAChan + hasB = <-hasQueueBChan doneA = make(chan struct{}) doneB = make(chan struct{}) - go emptyQueue("QueueA", doneA) - go emptyQueue("QueueB", doneB) + go emptyQueue("QueueA", hasA, doneA) + go emptyQueue("QueueB", hasB, doneB) <-doneA <-doneB |