// tell the pool to shutdown.
q.baseCtxCancel()
return
- case data := <-q.dataChan:
+ case data, ok := <-q.dataChan:
+ if !ok {
+ return
+ }
if err := q.PushBack(data); err != nil {
log.Error("Unable to push back data into queue %s", q.name)
}
select {
case <-paused:
return nil
- case data := <-q.dataChan:
+ case data, ok := <-q.dataChan:
+ if !ok {
+ return nil
+ }
if unhandled := q.handle(data); unhandled != nil {
log.Error("Unhandled Data whilst flushing queue %d", q.qid)
}
"testing"
"time"
+ "code.gitea.io/gitea/modules/log"
"github.com/stretchr/testify/assert"
)
if pausable, ok := queue.(Pausable); ok {
pausable.Pause()
}
- pushBack = false
lock.Unlock()
return data
}
}
return nil
}
- nilFn := func(_ func()) {}
+
+ queueShutdown := []func(){}
+ queueTerminate := []func(){}
queue, err = NewChannelQueue(handle,
ChannelQueueConfiguration{
}, &testData{})
assert.NoError(t, err)
- go queue.Run(nilFn, nilFn)
+ go queue.Run(func(shutdown func()) {
+ lock.Lock()
+ defer lock.Unlock()
+ queueShutdown = append(queueShutdown, shutdown)
+ }, func(terminate func()) {
+ lock.Lock()
+ defer lock.Unlock()
+ queueTerminate = append(queueTerminate, terminate)
+ })
+
+ // Shutdown and Terminate in defer
+ defer func() {
+ lock.Lock()
+ callbacks := make([]func(), len(queueShutdown))
+ copy(callbacks, queueShutdown)
+ lock.Unlock()
+ for _, callback := range callbacks {
+ callback()
+ }
+ lock.Lock()
+ log.Info("Finally terminating")
+ callbacks = make([]func(), len(queueTerminate))
+ copy(callbacks, queueTerminate)
+ lock.Unlock()
+ for _, callback := range callbacks {
+ callback()
+ }
+ }()
test1 := testData{"A", 1}
test2 := testData{"B", 2}
pausable.Pause()
- paused, resumed := pausable.IsPausedIsResumed()
+ paused, _ := pausable.IsPausedIsResumed()
select {
case <-paused:
- case <-resumed:
- assert.Fail(t, "Queue should not be resumed")
- return
- default:
+ case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue is not paused")
return
}
assert.Nil(t, result2)
pausable.Resume()
+ _, resumed := pausable.IsPausedIsResumed()
select {
case <-resumed:
- default:
+ case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue should be resumed")
}
pushBack = true
lock.Unlock()
- paused, resumed = pausable.IsPausedIsResumed()
+ _, resumed = pausable.IsPausedIsResumed()
select {
- case <-paused:
- assert.Fail(t, "Queue should not be paused")
- return
case <-resumed:
- default:
+ case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue is not resumed")
return
}
queue.Push(&test1)
+ paused, _ = pausable.IsPausedIsResumed()
select {
case <-paused:
case <-handleChan:
assert.Fail(t, "handler chan should not contain test1")
return
- case <-time.After(500 * time.Millisecond):
+ case <-time.After(100 * time.Millisecond):
assert.Fail(t, "queue should be paused")
return
}
- paused, resumed = pausable.IsPausedIsResumed()
+ lock.Lock()
+ pushBack = false
+ lock.Unlock()
+
+ paused, _ = pausable.IsPausedIsResumed()
select {
case <-paused:
- case <-resumed:
- assert.Fail(t, "Queue should not be resumed")
- return
- default:
+ case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue is not paused")
return
}
pausable.Resume()
+ _, resumed = pausable.IsPausedIsResumed()
select {
case <-resumed:
- default:
+ case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue should be resumed")
}
q.channelQueue.Wait()
q.internal.(*LevelQueue).Wait()
// Redirect all remaining data in the chan to the internal channel
- go func() {
- log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
- for data := range q.channelQueue.dataChan {
- _ = q.internal.Push(data)
- atomic.AddInt64(&q.channelQueue.numInQueue, -1)
- }
- log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
- }()
+ log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
+ close(q.channelQueue.dataChan)
+ for data := range q.channelQueue.dataChan {
+ _ = q.internal.Push(data)
+ atomic.AddInt64(&q.channelQueue.numInQueue, -1)
+ }
+ log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
}
log.Info("pausing")
pausable.Pause()
}
- pushBack = false
lock.Unlock()
return data
}
queueTerminate = append(queueTerminate, terminate)
})
+ // Shutdown and Terminate in defer
+ defer func() {
+ lock.Lock()
+ callbacks := make([]func(), len(queueShutdown))
+ copy(callbacks, queueShutdown)
+ lock.Unlock()
+ for _, callback := range callbacks {
+ callback()
+ }
+ lock.Lock()
+ log.Info("Finally terminating")
+ callbacks = make([]func(), len(queueTerminate))
+ copy(callbacks, queueTerminate)
+ lock.Unlock()
+ for _, callback := range callbacks {
+ callback()
+ }
+ }()
+
test1 := testData{"A", 1}
test2 := testData{"B", 2}
assert.Equal(t, test1.TestInt, result1.TestInt)
pausable.Pause()
- paused, resumed := pausable.IsPausedIsResumed()
+ paused, _ := pausable.IsPausedIsResumed()
select {
case <-paused:
- case <-resumed:
- assert.Fail(t, "Queue should not be resumed")
- return
- default:
+ case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue is not paused")
return
}
assert.Nil(t, result2)
pausable.Resume()
- paused, resumed = pausable.IsPausedIsResumed()
+ _, resumed := pausable.IsPausedIsResumed()
select {
- case <-paused:
- assert.Fail(t, "Queue should be resumed")
- return
case <-resumed:
- default:
+ case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue should be resumed")
return
}
assert.Equal(t, test2.TestString, result2.TestString)
assert.Equal(t, test2.TestInt, result2.TestInt)
+ // Set pushBack to so that the next handle will result in a Pause
lock.Lock()
pushBack = true
lock.Unlock()
- paused, resumed = pausable.IsPausedIsResumed()
+ // Ensure that we're still resumed
+ _, resumed = pausable.IsPausedIsResumed()
select {
- case <-paused:
- assert.Fail(t, "Queue should not be paused")
- return
case <-resumed:
- default:
+ case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue is not resumed")
return
}
+ // push test1
queue.Push(&test1)
+ // Now as this is handled it should pause
+ paused, _ = pausable.IsPausedIsResumed()
+
select {
case <-paused:
case <-handleChan:
return
}
- paused, resumed = pausable.IsPausedIsResumed()
-
- select {
- case <-paused:
- case <-resumed:
- assert.Fail(t, "Queue should not be resumed")
- return
- default:
- assert.Fail(t, "Queue is not paused")
- return
- }
+ lock.Lock()
+ pushBack = false
+ lock.Unlock()
pausable.Resume()
- paused, resumed = pausable.IsPausedIsResumed()
+ _, resumed = pausable.IsPausedIsResumed()
select {
- case <-paused:
- assert.Fail(t, "Queue should not be paused")
- return
case <-resumed:
- default:
+ case <-time.After(500 * time.Millisecond):
assert.Fail(t, "Queue should be resumed")
return
}
lock.Lock()
callbacks := make([]func(), len(queueShutdown))
copy(callbacks, queueShutdown)
+ queueShutdown = queueShutdown[:0]
lock.Unlock()
// Now shutdown the queue
for _, callback := range callbacks {
lock.Lock()
callbacks = make([]func(), len(queueTerminate))
copy(callbacks, queueTerminate)
+ queueShutdown = queueTerminate[:0]
lock.Unlock()
for _, callback := range callbacks {
callback()
case <-paused:
}
- paused, resumed = pausable.IsPausedIsResumed()
+ paused, _ = pausable.IsPausedIsResumed()
select {
case <-paused:
- case <-resumed:
- assert.Fail(t, "Queue should not be resumed")
- return
- default:
+ case <-time.After(500 * time.Millisecond):
assert.Fail(t, "Queue is not paused")
return
}
default:
}
+ lock.Lock()
+ pushBack = false
+ lock.Unlock()
+
pausable.Resume()
- paused, resumed = pausable.IsPausedIsResumed()
+ _, resumed = pausable.IsPausedIsResumed()
select {
- case <-paused:
- assert.Fail(t, "Queue should not be paused")
- return
case <-resumed:
- default:
+ case <-time.After(500 * time.Millisecond):
assert.Fail(t, "Queue should be resumed")
return
}
assert.Equal(t, test2.TestString, result4.TestString)
assert.Equal(t, test2.TestInt, result4.TestInt)
- lock.Lock()
- callbacks = make([]func(), len(queueShutdown))
- copy(callbacks, queueShutdown)
- lock.Unlock()
- for _, callback := range callbacks {
- callback()
- }
- lock.Lock()
- callbacks = make([]func(), len(queueTerminate))
- copy(callbacks, queueTerminate)
- lock.Unlock()
- for _, callback := range callbacks {
- callback()
- }
}
default:
}
select {
- case data := <-q.dataChan:
+ case data, ok := <-q.dataChan:
+ if !ok {
+ return nil
+ }
if unhandled := q.handle(data); unhandled != nil {
log.Error("Unhandled Data whilst flushing queue %d", q.qid)
}
q.channelQueue.Wait()
q.internal.(*LevelUniqueQueue).Wait()
// Redirect all remaining data in the chan to the internal channel
- go func() {
- log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
- for data := range q.channelQueue.dataChan {
- _ = q.internal.Push(data)
- }
- log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
- }()
+ close(q.channelQueue.dataChan)
+ log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
+ for data := range q.channelQueue.dataChan {
+ _ = q.internal.Push(data)
+ }
+ log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
log.Debug("PersistableChannelUniqueQueue: %s Shutdown", q.delayedStarter.name)
}