diff options
Diffstat (limited to 'modules/queue')
-rw-r--r-- | modules/queue/bytefifo.go | 18 | ||||
-rw-r--r-- | modules/queue/manager.go | 6 | ||||
-rw-r--r-- | modules/queue/queue.go | 6 | ||||
-rw-r--r-- | modules/queue/queue_bytefifo.go | 217 | ||||
-rw-r--r-- | modules/queue/queue_channel.go | 76 | ||||
-rw-r--r-- | modules/queue/queue_channel_test.go | 5 | ||||
-rw-r--r-- | modules/queue/queue_disk.go | 9 | ||||
-rw-r--r-- | modules/queue/queue_disk_channel.go | 87 | ||||
-rw-r--r-- | modules/queue/queue_disk_channel_test.go | 49 | ||||
-rw-r--r-- | modules/queue/queue_disk_test.go | 9 | ||||
-rw-r--r-- | modules/queue/queue_redis.go | 20 | ||||
-rw-r--r-- | modules/queue/queue_wrapped.go | 8 | ||||
-rw-r--r-- | modules/queue/unique_queue_channel.go | 77 | ||||
-rw-r--r-- | modules/queue/unique_queue_disk.go | 11 | ||||
-rw-r--r-- | modules/queue/unique_queue_disk_channel.go | 88 | ||||
-rw-r--r-- | modules/queue/unique_queue_redis.go | 21 | ||||
-rw-r--r-- | modules/queue/workerpool.go | 55 |
17 files changed, 466 insertions, 296 deletions
diff --git a/modules/queue/bytefifo.go b/modules/queue/bytefifo.go index 94478e6f05..3a10c8e125 100644 --- a/modules/queue/bytefifo.go +++ b/modules/queue/bytefifo.go @@ -4,14 +4,16 @@ package queue +import "context" + // ByteFIFO defines a FIFO that takes a byte array type ByteFIFO interface { // Len returns the length of the fifo - Len() int64 + Len(ctx context.Context) int64 // PushFunc pushes data to the end of the fifo and calls the callback if it is added - PushFunc(data []byte, fn func() error) error + PushFunc(ctx context.Context, data []byte, fn func() error) error // Pop pops data from the start of the fifo - Pop() ([]byte, error) + Pop(ctx context.Context) ([]byte, error) // Close this fifo Close() error } @@ -20,7 +22,7 @@ type ByteFIFO interface { type UniqueByteFIFO interface { ByteFIFO // Has returns whether the fifo contains this data - Has(data []byte) (bool, error) + Has(ctx context.Context, data []byte) (bool, error) } var _ ByteFIFO = &DummyByteFIFO{} @@ -29,12 +31,12 @@ var _ ByteFIFO = &DummyByteFIFO{} type DummyByteFIFO struct{} // PushFunc returns nil -func (*DummyByteFIFO) PushFunc(data []byte, fn func() error) error { +func (*DummyByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error { return nil } // Pop returns nil -func (*DummyByteFIFO) Pop() ([]byte, error) { +func (*DummyByteFIFO) Pop(ctx context.Context) ([]byte, error) { return []byte{}, nil } @@ -44,7 +46,7 @@ func (*DummyByteFIFO) Close() error { } // Len is always 0 -func (*DummyByteFIFO) Len() int64 { +func (*DummyByteFIFO) Len(ctx context.Context) int64 { return 0 } @@ -56,6 +58,6 @@ type DummyUniqueByteFIFO struct { } // Has always returns false -func (*DummyUniqueByteFIFO) Has([]byte) (bool, error) { +func (*DummyUniqueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) { return false, nil } diff --git a/modules/queue/manager.go b/modules/queue/manager.go index c3ec735af5..a6d48575ab 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -187,14 +187,14 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error if flushable, ok := mq.Managed.(Flushable); ok { log.Debug("Flushing (flushable) queue: %s", mq.Name) go func(q *ManagedQueue) { - localCtx, localCancel := context.WithCancel(ctx) - pid := q.RegisterWorkers(1, start, hasTimeout, end, localCancel, true) + localCtx, localCtxCancel := context.WithCancel(ctx) + pid := q.RegisterWorkers(1, start, hasTimeout, end, localCtxCancel, true) err := flushable.FlushWithContext(localCtx) if err != nil && err != ctx.Err() { cancel() } q.CancelWorkers(pid) - localCancel() + localCtxCancel() wg.Done() }(mq) } else { diff --git a/modules/queue/queue.go b/modules/queue/queue.go index d08cba35a1..7159048c11 100644 --- a/modules/queue/queue.go +++ b/modules/queue/queue.go @@ -57,7 +57,7 @@ type Named interface { // Queues will handle their own contents in the Run method type Queue interface { Flushable - Run(atShutdown, atTerminate func(context.Context, func())) + Run(atShutdown, atTerminate func(func())) Push(Data) error } @@ -74,7 +74,7 @@ type DummyQueue struct { } // Run does nothing -func (*DummyQueue) Run(_, _ func(context.Context, func())) {} +func (*DummyQueue) Run(_, _ func(func())) {} // Push fakes a push of data to the queue func (*DummyQueue) Push(Data) error { @@ -122,7 +122,7 @@ type Immediate struct { } // Run does nothing -func (*Immediate) Run(_, _ func(context.Context, func())) {} +func (*Immediate) Run(_, _ func(func())) {} // Push fakes a push of data to the queue func (q *Immediate) Push(data Data) error { diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index fe1fb7807e..3ea61aad0e 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -17,8 +17,9 @@ import ( // ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue type ByteFIFOQueueConfiguration struct { WorkerPoolConfiguration - Workers int - Name string + Workers int + Name string + WaitOnEmpty bool } var _ Queue = &ByteFIFOQueue{} @@ -26,14 +27,18 @@ var _ Queue = &ByteFIFOQueue{} // ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool type ByteFIFOQueue struct { *WorkerPool - byteFIFO ByteFIFO - typ Type - closed chan struct{} - terminated chan struct{} - exemplar interface{} - workers int - name string - lock sync.Mutex + byteFIFO ByteFIFO + typ Type + shutdownCtx context.Context + shutdownCtxCancel context.CancelFunc + terminateCtx context.Context + terminateCtxCancel context.CancelFunc + exemplar interface{} + workers int + name string + lock sync.Mutex + waitOnEmpty bool + pushed chan struct{} } // NewByteFIFOQueue creates a new ByteFIFOQueue @@ -44,15 +49,22 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem } config := configInterface.(ByteFIFOQueueConfiguration) + terminateCtx, terminateCtxCancel := context.WithCancel(context.Background()) + shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) + return &ByteFIFOQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), - byteFIFO: byteFIFO, - typ: typ, - closed: make(chan struct{}), - terminated: make(chan struct{}), - exemplar: exemplar, - workers: config.Workers, - name: config.Name, + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + byteFIFO: byteFIFO, + typ: typ, + shutdownCtx: shutdownCtx, + shutdownCtxCancel: shutdownCtxCancel, + terminateCtx: terminateCtx, + terminateCtxCancel: terminateCtxCancel, + exemplar: exemplar, + workers: config.Workers, + name: config.Name, + waitOnEmpty: config.WaitOnEmpty, + pushed: make(chan struct{}, 1), }, nil } @@ -76,7 +88,15 @@ func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error { if err != nil { return err } - return q.byteFIFO.PushFunc(bs, fn) + if q.waitOnEmpty { + defer func() { + select { + case q.pushed <- struct{}{}: + default: + } + }() + } + return q.byteFIFO.PushFunc(q.terminateCtx, bs, fn) } // IsEmpty checks if the queue is empty @@ -86,135 +106,160 @@ func (q *ByteFIFOQueue) IsEmpty() bool { if !q.WorkerPool.IsEmpty() { return false } - return q.byteFIFO.Len() == 0 + return q.byteFIFO.Len(q.terminateCtx) == 0 } // Run runs the bytefifo queue -func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func())) { - atShutdown(context.Background(), q.Shutdown) - atTerminate(context.Background(), q.Terminate) +func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(func())) { + atShutdown(q.Shutdown) + atTerminate(q.Terminate) log.Debug("%s: %s Starting", q.typ, q.name) - go func() { - _ = q.AddWorkers(q.workers, 0) - }() + _ = q.AddWorkers(q.workers, 0) - go q.readToChan() + log.Trace("%s: %s Now running", q.typ, q.name) + q.readToChan() - log.Trace("%s: %s Waiting til closed", q.typ, q.name) - <-q.closed + <-q.shutdownCtx.Done() log.Trace("%s: %s Waiting til done", q.typ, q.name) q.Wait() log.Trace("%s: %s Waiting til cleaned", q.typ, q.name) - ctx, cancel := context.WithCancel(context.Background()) - atTerminate(ctx, cancel) - q.CleanUp(ctx) - cancel() + q.CleanUp(q.terminateCtx) + q.terminateCtxCancel() } +const maxBackOffTime = time.Second * 3 + func (q *ByteFIFOQueue) readToChan() { // handle quick cancels select { - case <-q.closed: + case <-q.shutdownCtx.Done(): // tell the pool to shutdown. - q.cancel() + q.baseCtxCancel() return default: } + // Default backoff values backOffTime := time.Millisecond * 100 - maxBackOffTime := time.Second * 3 - for { - success, resetBackoff := q.doPop() - if resetBackoff { - backOffTime = 100 * time.Millisecond - } - if success { +loop: + for { + err := q.doPop() + if err == errQueueEmpty { + log.Trace("%s: %s Waiting on Empty", q.typ, q.name) select { - case <-q.closed: - // tell the pool to shutdown. - q.cancel() + case <-q.pushed: + // reset backOffTime + backOffTime = 100 * time.Millisecond + continue loop + case <-q.shutdownCtx.Done(): + // Oops we've been shutdown whilst waiting + // Make sure the worker pool is shutdown too + q.baseCtxCancel() return - default: } - } else { + } + + // Reset the backOffTime if there is no error or an unmarshalError + if err == nil || err == errUnmarshal { + backOffTime = 100 * time.Millisecond + } + + if err != nil { + // Need to Backoff select { - case <-q.closed: - // tell the pool to shutdown. - q.cancel() + case <-q.shutdownCtx.Done(): + // Oops we've been shutdown whilst backing off + // Make sure the worker pool is shutdown too + q.baseCtxCancel() return case <-time.After(backOffTime): - } - backOffTime += backOffTime / 2 - if backOffTime > maxBackOffTime { - backOffTime = maxBackOffTime + // OK we've waited - so backoff a bit + backOffTime += backOffTime / 2 + if backOffTime > maxBackOffTime { + backOffTime = maxBackOffTime + } + continue loop } } + select { + case <-q.shutdownCtx.Done(): + // Oops we've been shutdown + // Make sure the worker pool is shutdown too + q.baseCtxCancel() + return + default: + continue loop + } } } -func (q *ByteFIFOQueue) doPop() (success, resetBackoff bool) { +var errQueueEmpty = fmt.Errorf("empty queue") +var errEmptyBytes = fmt.Errorf("empty bytes") +var errUnmarshal = fmt.Errorf("failed to unmarshal") + +func (q *ByteFIFOQueue) doPop() error { q.lock.Lock() defer q.lock.Unlock() - bs, err := q.byteFIFO.Pop() + bs, err := q.byteFIFO.Pop(q.shutdownCtx) if err != nil { + if err == context.Canceled { + q.baseCtxCancel() + return err + } log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err) - return + return err } if len(bs) == 0 { - return + if q.waitOnEmpty && q.byteFIFO.Len(q.shutdownCtx) == 0 { + return errQueueEmpty + } + return errEmptyBytes } - resetBackoff = true - data, err := unmarshalAs(bs, q.exemplar) if err != nil { log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err) - return + return errUnmarshal } log.Trace("%s %s: Task found: %#v", q.typ, q.name, data) q.WorkerPool.Push(data) - success = true - return + return nil } // Shutdown processing from this queue func (q *ByteFIFOQueue) Shutdown() { log.Trace("%s: %s Shutting down", q.typ, q.name) - q.lock.Lock() select { - case <-q.closed: + case <-q.shutdownCtx.Done(): + return default: - close(q.closed) } - q.lock.Unlock() + q.shutdownCtxCancel() log.Debug("%s: %s Shutdown", q.typ, q.name) } // IsShutdown returns a channel which is closed when this Queue is shutdown func (q *ByteFIFOQueue) IsShutdown() <-chan struct{} { - return q.closed + return q.shutdownCtx.Done() } // Terminate this queue and close the queue func (q *ByteFIFOQueue) Terminate() { log.Trace("%s: %s Terminating", q.typ, q.name) q.Shutdown() - q.lock.Lock() select { - case <-q.terminated: - q.lock.Unlock() + case <-q.terminateCtx.Done(): return default: } - close(q.terminated) - q.lock.Unlock() if log.IsDebug() { - log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len()) + log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len(q.terminateCtx)) } + q.terminateCtxCancel() if err := q.byteFIFO.Close(); err != nil { log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err) } @@ -223,7 +268,7 @@ func (q *ByteFIFOQueue) Terminate() { // IsTerminated returns a channel which is closed when this Queue is terminated func (q *ByteFIFOQueue) IsTerminated() <-chan struct{} { - return q.terminated + return q.terminateCtx.Done() } var _ UniqueQueue = &ByteFIFOUniqueQueue{} @@ -240,17 +285,21 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun return nil, err } config := configInterface.(ByteFIFOQueueConfiguration) + terminateCtx, terminateCtxCancel := context.WithCancel(context.Background()) + shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) return &ByteFIFOUniqueQueue{ ByteFIFOQueue: ByteFIFOQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), - byteFIFO: byteFIFO, - typ: typ, - closed: make(chan struct{}), - terminated: make(chan struct{}), - exemplar: exemplar, - workers: config.Workers, - name: config.Name, + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + byteFIFO: byteFIFO, + typ: typ, + shutdownCtx: shutdownCtx, + shutdownCtxCancel: shutdownCtxCancel, + terminateCtx: terminateCtx, + terminateCtxCancel: terminateCtxCancel, + exemplar: exemplar, + workers: config.Workers, + name: config.Name, }, }, nil } @@ -265,5 +314,5 @@ func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) { if err != nil { return false, err } - return q.byteFIFO.(UniqueByteFIFO).Has(bs) + return q.byteFIFO.(UniqueByteFIFO).Has(q.terminateCtx, bs) } diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index d7a11e79f5..4df64b69ee 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -27,9 +27,13 @@ type ChannelQueueConfiguration struct { // It is basically a very thin wrapper around a WorkerPool type ChannelQueue struct { *WorkerPool - exemplar interface{} - workers int - name string + shutdownCtx context.Context + shutdownCtxCancel context.CancelFunc + terminateCtx context.Context + terminateCtxCancel context.CancelFunc + exemplar interface{} + workers int + name string } // NewChannelQueue creates a memory channel queue @@ -42,28 +46,30 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro if config.BatchLength == 0 { config.BatchLength = 1 } + + terminateCtx, terminateCtxCancel := context.WithCancel(context.Background()) + shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) + queue := &ChannelQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), - exemplar: exemplar, - workers: config.Workers, - name: config.Name, + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + shutdownCtx: shutdownCtx, + shutdownCtxCancel: shutdownCtxCancel, + terminateCtx: terminateCtx, + terminateCtxCancel: terminateCtxCancel, + exemplar: exemplar, + workers: config.Workers, + name: config.Name, } queue.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar) return queue, nil } // Run starts to run the queue -func (q *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { - atShutdown(context.Background(), func() { - log.Warn("ChannelQueue: %s is not shutdownable!", q.name) - }) - atTerminate(context.Background(), func() { - log.Warn("ChannelQueue: %s is not terminatable!", q.name) - }) +func (q *ChannelQueue) Run(atShutdown, atTerminate func(func())) { + atShutdown(q.Shutdown) + atTerminate(q.Terminate) log.Debug("ChannelQueue: %s Starting", q.name) - go func() { - _ = q.AddWorkers(q.workers, 0) - }() + _ = q.AddWorkers(q.workers, 0) } // Push will push data into the queue @@ -75,6 +81,42 @@ func (q *ChannelQueue) Push(data Data) error { return nil } +// Shutdown processing from this queue +func (q *ChannelQueue) Shutdown() { + q.lock.Lock() + defer q.lock.Unlock() + select { + case <-q.shutdownCtx.Done(): + log.Trace("ChannelQueue: %s Already Shutting down", q.name) + return + default: + } + log.Trace("ChannelQueue: %s Shutting down", q.name) + go func() { + log.Trace("ChannelQueue: %s Flushing", q.name) + if err := q.FlushWithContext(q.terminateCtx); err != nil { + log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name) + return + } + log.Debug("ChannelQueue: %s Flushed", q.name) + }() + q.shutdownCtxCancel() + log.Debug("ChannelQueue: %s Shutdown", q.name) +} + +// Terminate this queue and close the queue +func (q *ChannelQueue) Terminate() { + log.Trace("ChannelQueue: %s Terminating", q.name) + q.Shutdown() + select { + case <-q.terminateCtx.Done(): + return + default: + } + q.terminateCtxCancel() + log.Debug("ChannelQueue: %s Terminated", q.name) +} + // Name returns the name of this queue func (q *ChannelQueue) Name() string { return q.name diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go index bca81d50fd..e7abe5b50b 100644 --- a/modules/queue/queue_channel_test.go +++ b/modules/queue/queue_channel_test.go @@ -5,7 +5,6 @@ package queue import ( - "context" "testing" "time" @@ -21,7 +20,7 @@ func TestChannelQueue(t *testing.T) { } } - nilFn := func(_ context.Context, _ func()) {} + nilFn := func(_ func()) {} queue, err := NewChannelQueue(handle, ChannelQueueConfiguration{ @@ -61,7 +60,7 @@ func TestChannelQueue_Batch(t *testing.T) { } } - nilFn := func(_ context.Context, _ func()) {} + nilFn := func(_ func()) {} queue, err := NewChannelQueue(handle, ChannelQueueConfiguration{ diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go index 6c15a8e63b..911233a5d9 100644 --- a/modules/queue/queue_disk.go +++ b/modules/queue/queue_disk.go @@ -5,6 +5,8 @@ package queue import ( + "context" + "code.gitea.io/gitea/modules/nosql" "gitea.com/lunny/levelqueue" @@ -37,6 +39,7 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) if len(config.ConnectionString) == 0 { config.ConnectionString = config.DataDir } + config.WaitOnEmpty = true byteFIFO, err := NewLevelQueueByteFIFO(config.ConnectionString, config.QueueName) if err != nil { @@ -82,7 +85,7 @@ func NewLevelQueueByteFIFO(connection, prefix string) (*LevelQueueByteFIFO, erro } // PushFunc will push data into the fifo -func (fifo *LevelQueueByteFIFO) PushFunc(data []byte, fn func() error) error { +func (fifo *LevelQueueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error { if fn != nil { if err := fn(); err != nil { return err @@ -92,7 +95,7 @@ func (fifo *LevelQueueByteFIFO) PushFunc(data []byte, fn func() error) error { } // Pop pops data from the start of the fifo -func (fifo *LevelQueueByteFIFO) Pop() ([]byte, error) { +func (fifo *LevelQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) { data, err := fifo.internal.RPop() if err != nil && err != levelqueue.ErrNotFound { return nil, err @@ -108,7 +111,7 @@ func (fifo *LevelQueueByteFIFO) Close() error { } // Len returns the length of the fifo -func (fifo *LevelQueueByteFIFO) Len() int64 { +func (fifo *LevelQueueByteFIFO) Len(ctx context.Context) int64 { return fifo.internal.Len() } diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index 801fd8a122..c3a1c5781e 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -133,8 +133,9 @@ func (q *PersistableChannelQueue) Push(data Data) error { } // Run starts to run the queue -func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { +func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) { log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name) + _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0) q.lock.Lock() if q.internal == nil { @@ -147,34 +148,32 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte } else { q.lock.Unlock() } - atShutdown(context.Background(), q.Shutdown) - atTerminate(context.Background(), q.Terminate) + atShutdown(q.Shutdown) + atTerminate(q.Terminate) - // Just run the level queue - we shut it down later - go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) - - go func() { - _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0) - }() + if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.shutdownCtx) != 0 { + // Just run the level queue - we shut it down once it's flushed + go q.internal.Run(func(_ func()) {}, func(_ func()) {}) + go func() { + for !q.IsEmpty() { + _ = q.internal.Flush(0) + select { + case <-time.After(100 * time.Millisecond): + case <-q.internal.(*LevelQueue).shutdownCtx.Done(): + log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name()) + return + } + } + log.Debug("LevelQueue: %s flushed so shutting down", q.internal.(*LevelQueue).Name()) + q.internal.(*LevelQueue).Shutdown() + GetManager().Remove(q.internal.(*LevelQueue).qid) + }() + } else { + log.Debug("PersistableChannelQueue: %s Skipping running the empty level queue", q.delayedStarter.name) + q.internal.(*LevelQueue).Shutdown() + GetManager().Remove(q.internal.(*LevelQueue).qid) + } - log.Trace("PersistableChannelQueue: %s Waiting til closed", q.delayedStarter.name) - <-q.closed - log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name) - q.channelQueue.cancel() - q.internal.(*LevelQueue).cancel() - log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name) - q.channelQueue.Wait() - q.internal.(*LevelQueue).Wait() - // Redirect all remaining data in the chan to the internal channel - go func() { - log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name) - for data := range q.channelQueue.dataChan { - _ = q.internal.Push(data) - atomic.AddInt64(&q.channelQueue.numInQueue, -1) - } - log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name) - }() - log.Trace("PersistableChannelQueue: %s Done main loop", q.delayedStarter.name) } // Flush flushes the queue and blocks till the queue is empty @@ -232,16 +231,37 @@ func (q *PersistableChannelQueue) IsEmpty() bool { func (q *PersistableChannelQueue) Shutdown() { log.Trace("PersistableChannelQueue: %s Shutting down", q.delayedStarter.name) q.lock.Lock() - defer q.lock.Unlock() + select { case <-q.closed: + q.lock.Unlock() + return default: - if q.internal != nil { - q.internal.(*LevelQueue).Shutdown() - } - close(q.closed) - log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name) } + q.channelQueue.Shutdown() + if q.internal != nil { + q.internal.(*LevelQueue).Shutdown() + } + close(q.closed) + q.lock.Unlock() + + log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name) + q.channelQueue.baseCtxCancel() + q.internal.(*LevelQueue).baseCtxCancel() + log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name) + q.channelQueue.Wait() + q.internal.(*LevelQueue).Wait() + // Redirect all remaining data in the chan to the internal channel + go func() { + log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name) + for data := range q.channelQueue.dataChan { + _ = q.internal.Push(data) + atomic.AddInt64(&q.channelQueue.numInQueue, -1) + } + log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name) + }() + + log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name) } // Terminate this queue and close the queue @@ -250,6 +270,7 @@ func (q *PersistableChannelQueue) Terminate() { q.Shutdown() q.lock.Lock() defer q.lock.Unlock() + q.channelQueue.Terminate() if q.internal != nil { q.internal.(*LevelQueue).Terminate() } diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index 93061bffc6..561f98ca90 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -5,10 +5,8 @@ package queue import ( - "context" "io/ioutil" "testing" - "time" "code.gitea.io/gitea/modules/util" "github.com/stretchr/testify/assert" @@ -32,17 +30,19 @@ func TestPersistableChannelQueue(t *testing.T) { defer util.RemoveAll(tmpDir) queue, err := NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{ - DataDir: tmpDir, - BatchLength: 2, - QueueLength: 20, - Workers: 1, - MaxWorkers: 10, + DataDir: tmpDir, + BatchLength: 2, + QueueLength: 20, + Workers: 1, + BoostWorkers: 0, + MaxWorkers: 10, + Name: "first", }, &testData{}) assert.NoError(t, err) - go queue.Run(func(_ context.Context, shutdown func()) { + go queue.Run(func(shutdown func()) { queueShutdown = append(queueShutdown, shutdown) - }, func(_ context.Context, terminate func()) { + }, func(terminate func()) { queueTerminate = append(queueTerminate, terminate) }) @@ -64,13 +64,18 @@ func TestPersistableChannelQueue(t *testing.T) { assert.Equal(t, test2.TestString, result2.TestString) assert.Equal(t, test2.TestInt, result2.TestInt) + // test1 is a testData not a *testData so will be rejected err = queue.Push(test1) assert.Error(t, err) + // Now shutdown the queue for _, callback := range queueShutdown { callback() } - time.Sleep(200 * time.Millisecond) + + // Wait til it is closed + <-queue.(*PersistableChannelQueue).closed + err = queue.Push(&test1) assert.NoError(t, err) err = queue.Push(&test2) @@ -80,23 +85,33 @@ func TestPersistableChannelQueue(t *testing.T) { assert.Fail(t, "Handler processing should have stopped") default: } + + // terminate the queue for _, callback := range queueTerminate { callback() } + select { + case <-handleChan: + assert.Fail(t, "Handler processing should have stopped") + default: + } + // Reopen queue queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{ - DataDir: tmpDir, - BatchLength: 2, - QueueLength: 20, - Workers: 1, - MaxWorkers: 10, + DataDir: tmpDir, + BatchLength: 2, + QueueLength: 20, + Workers: 1, + BoostWorkers: 0, + MaxWorkers: 10, + Name: "second", }, &testData{}) assert.NoError(t, err) - go queue.Run(func(_ context.Context, shutdown func()) { + go queue.Run(func(shutdown func()) { queueShutdown = append(queueShutdown, shutdown) - }, func(_ context.Context, terminate func()) { + }, func(terminate func()) { queueTerminate = append(queueTerminate, terminate) }) diff --git a/modules/queue/queue_disk_test.go b/modules/queue/queue_disk_test.go index edaed49a52..1f884d4f8d 100644 --- a/modules/queue/queue_disk_test.go +++ b/modules/queue/queue_disk_test.go @@ -5,7 +5,6 @@ package queue import ( - "context" "io/ioutil" "sync" "testing" @@ -49,11 +48,11 @@ func TestLevelQueue(t *testing.T) { }, &testData{}) assert.NoError(t, err) - go queue.Run(func(_ context.Context, shutdown func()) { + go queue.Run(func(shutdown func()) { lock.Lock() queueShutdown = append(queueShutdown, shutdown) lock.Unlock() - }, func(_ context.Context, terminate func()) { + }, func(terminate func()) { lock.Lock() queueTerminate = append(queueTerminate, terminate) lock.Unlock() @@ -123,11 +122,11 @@ func TestLevelQueue(t *testing.T) { }, &testData{}) assert.NoError(t, err) - go queue.Run(func(_ context.Context, shutdown func()) { + go queue.Run(func(shutdown func()) { lock.Lock() queueShutdown = append(queueShutdown, shutdown) lock.Unlock() - }, func(_ context.Context, terminate func()) { + }, func(terminate func()) { lock.Lock() queueTerminate = append(queueTerminate, terminate) lock.Unlock() diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go index af2cc30335..a5fb866dc1 100644 --- a/modules/queue/queue_redis.go +++ b/modules/queue/queue_redis.go @@ -6,7 +6,6 @@ package queue import ( "context" - "fmt" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" @@ -47,8 +46,6 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) return nil, err } - byteFIFO.ctx = graceful.NewChannelContext(byteFIFOQueue.IsTerminated(), fmt.Errorf("queue has been terminated")) - queue := &RedisQueue{ ByteFIFOQueue: byteFIFOQueue, } @@ -73,8 +70,8 @@ var _ ByteFIFO = &RedisByteFIFO{} // RedisByteFIFO represents a ByteFIFO formed from a redisClient type RedisByteFIFO struct { - ctx context.Context - client redisClient + client redisClient + queueName string } @@ -89,7 +86,6 @@ func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error) fifo := &RedisByteFIFO{ queueName: config.QueueName, } - fifo.ctx = graceful.GetManager().TerminateContext() fifo.client = nosql.GetManager().GetRedisClient(config.ConnectionString) if err := fifo.client.Ping(graceful.GetManager().ShutdownContext()).Err(); err != nil { return nil, err @@ -98,18 +94,18 @@ func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error) } // PushFunc pushes data to the end of the fifo and calls the callback if it is added -func (fifo *RedisByteFIFO) PushFunc(data []byte, fn func() error) error { +func (fifo *RedisByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error { if fn != nil { if err := fn(); err != nil { return err } } - return fifo.client.RPush(fifo.ctx, fifo.queueName, data).Err() + return fifo.client.RPush(ctx, fifo.queueName, data).Err() } // Pop pops data from the start of the fifo -func (fifo *RedisByteFIFO) Pop() ([]byte, error) { - data, err := fifo.client.LPop(fifo.ctx, fifo.queueName).Bytes() +func (fifo *RedisByteFIFO) Pop(ctx context.Context) ([]byte, error) { + data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes() if err == nil || err == redis.Nil { return data, nil } @@ -122,8 +118,8 @@ func (fifo *RedisByteFIFO) Close() error { } // Len returns the length of the fifo -func (fifo *RedisByteFIFO) Len() int64 { - val, err := fifo.client.LLen(fifo.ctx, fifo.queueName).Result() +func (fifo *RedisByteFIFO) Len(ctx context.Context) int64 { + val, err := fifo.client.LLen(ctx, fifo.queueName).Result() if err != nil { log.Error("Error whilst getting length of redis queue %s: Error: %v", fifo.queueName, err) return -1 diff --git a/modules/queue/queue_wrapped.go b/modules/queue/queue_wrapped.go index 88d64e8246..ec30ab0281 100644 --- a/modules/queue/queue_wrapped.go +++ b/modules/queue/queue_wrapped.go @@ -38,7 +38,7 @@ type delayedStarter struct { } // setInternal must be called with the lock locked. -func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), handle HandlerFunc, exemplar interface{}) error { +func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc, exemplar interface{}) error { var ctx context.Context var cancel context.CancelFunc if q.timeout > 0 { @@ -49,9 +49,7 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h defer cancel() // Ensure we also stop at shutdown - atShutdown(ctx, func() { - cancel() - }) + atShutdown(cancel) i := 1 for q.internal == nil { @@ -221,7 +219,7 @@ func (q *WrappedQueue) IsEmpty() bool { } // Run starts to run the queue and attempts to create the internal queue -func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())) { +func (q *WrappedQueue) Run(atShutdown, atTerminate func(func())) { log.Debug("WrappedQueue: %s Starting", q.name) q.lock.Lock() if q.internal == nil { diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index dec1cfc5c0..5bec67c4d3 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -28,11 +28,15 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration // only guaranteed whilst the task is waiting in the queue. type ChannelUniqueQueue struct { *WorkerPool - lock sync.Mutex - table map[Data]bool - exemplar interface{} - workers int - name string + lock sync.Mutex + table map[Data]bool + shutdownCtx context.Context + shutdownCtxCancel context.CancelFunc + terminateCtx context.Context + terminateCtxCancel context.CancelFunc + exemplar interface{} + workers int + name string } // NewChannelUniqueQueue create a memory channel queue @@ -45,11 +49,19 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue if config.BatchLength == 0 { config.BatchLength = 1 } + + terminateCtx, terminateCtxCancel := context.WithCancel(context.Background()) + shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) + queue := &ChannelUniqueQueue{ - table: map[Data]bool{}, - exemplar: exemplar, - workers: config.Workers, - name: config.Name, + table: map[Data]bool{}, + shutdownCtx: shutdownCtx, + shutdownCtxCancel: shutdownCtxCancel, + terminateCtx: terminateCtx, + terminateCtxCancel: terminateCtxCancel, + exemplar: exemplar, + workers: config.Workers, + name: config.Name, } queue.WorkerPool = NewWorkerPool(func(data ...Data) { for _, datum := range data { @@ -65,17 +77,11 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue } // Run starts to run the queue -func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) { - atShutdown(context.Background(), func() { - log.Warn("ChannelUniqueQueue: %s is not shutdownable!", q.name) - }) - atTerminate(context.Background(), func() { - log.Warn("ChannelUniqueQueue: %s is not terminatable!", q.name) - }) +func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) { + atShutdown(q.Shutdown) + atTerminate(q.Terminate) log.Debug("ChannelUniqueQueue: %s Starting", q.name) - go func() { - _ = q.AddWorkers(q.workers, 0) - }() + _ = q.AddWorkers(q.workers, 0) } // Push will push data into the queue if the data is not already in the queue @@ -122,6 +128,39 @@ func (q *ChannelUniqueQueue) Has(data Data) (bool, error) { return has, nil } +// Shutdown processing from this queue +func (q *ChannelUniqueQueue) Shutdown() { + log.Trace("ChannelUniqueQueue: %s Shutting down", q.name) + select { + case <-q.shutdownCtx.Done(): + return + default: + } + go func() { + log.Trace("ChannelUniqueQueue: %s Flushing", q.name) + if err := q.FlushWithContext(q.terminateCtx); err != nil { + log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name) + return + } + log.Debug("ChannelUniqueQueue: %s Flushed", q.name) + }() + q.shutdownCtxCancel() + log.Debug("ChannelUniqueQueue: %s Shutdown", q.name) +} + +// Terminate this queue and close the queue +func (q *ChannelUniqueQueue) Terminate() { + log.Trace("ChannelUniqueQueue: %s Terminating", q.name) + q.Shutdown() + select { + case <-q.terminateCtx.Done(): + return + default: + } + q.terminateCtxCancel() + log.Debug("ChannelUniqueQueue: %s Terminated", q.name) +} + // Name returns the name of this queue func (q *ChannelUniqueQueue) Name() string { return q.name diff --git a/modules/queue/unique_queue_disk.go b/modules/queue/unique_queue_disk.go index 8ec8848bc4..bb0eb7d950 100644 --- a/modules/queue/unique_queue_disk.go +++ b/modules/queue/unique_queue_disk.go @@ -5,6 +5,8 @@ package queue import ( + "context" + "code.gitea.io/gitea/modules/nosql" "gitea.com/lunny/levelqueue" @@ -41,6 +43,7 @@ func NewLevelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, if len(config.ConnectionString) == 0 { config.ConnectionString = config.DataDir } + config.WaitOnEmpty = true byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.ConnectionString, config.QueueName) if err != nil { @@ -86,12 +89,12 @@ func NewLevelUniqueQueueByteFIFO(connection, prefix string) (*LevelUniqueQueueBy } // PushFunc pushes data to the end of the fifo and calls the callback if it is added -func (fifo *LevelUniqueQueueByteFIFO) PushFunc(data []byte, fn func() error) error { +func (fifo *LevelUniqueQueueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error { return fifo.internal.LPushFunc(data, fn) } // Pop pops data from the start of the fifo -func (fifo *LevelUniqueQueueByteFIFO) Pop() ([]byte, error) { +func (fifo *LevelUniqueQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) { data, err := fifo.internal.RPop() if err != nil && err != levelqueue.ErrNotFound { return nil, err @@ -100,12 +103,12 @@ func (fifo *LevelUniqueQueueByteFIFO) Pop() ([]byte, error) { } // Len returns the length of the fifo -func (fifo *LevelUniqueQueueByteFIFO) Len() int64 { +func (fifo *LevelUniqueQueueByteFIFO) Len(ctx context.Context) int64 { return fifo.internal.Len() } // Has returns whether the fifo contains this data -func (fifo *LevelUniqueQueueByteFIFO) Has(data []byte) (bool, error) { +func (fifo *LevelUniqueQueueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) { return fifo.internal.Has(data) } diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index 47c4f2bdd5..65a3941519 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -36,7 +36,7 @@ type PersistableChannelUniqueQueueConfiguration struct { // task cannot be processed twice or more at the same time. Uniqueness is // only guaranteed whilst the task is waiting in the queue. type PersistableChannelUniqueQueue struct { - *ChannelUniqueQueue + channelQueue *ChannelUniqueQueue delayedStarter lock sync.Mutex closed chan struct{} @@ -85,8 +85,8 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac } queue := &PersistableChannelUniqueQueue{ - ChannelUniqueQueue: channelUniqueQueue.(*ChannelUniqueQueue), - closed: make(chan struct{}), + channelQueue: channelUniqueQueue.(*ChannelUniqueQueue), + closed: make(chan struct{}), } levelQueue, err := NewLevelUniqueQueue(func(data ...Data) { @@ -138,14 +138,14 @@ func (q *PersistableChannelUniqueQueue) PushFunc(data Data, fn func() error) err case <-q.closed: return q.internal.(UniqueQueue).PushFunc(data, fn) default: - return q.ChannelUniqueQueue.PushFunc(data, fn) + return q.channelQueue.PushFunc(data, fn) } } // Has will test if the queue has the data func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) { // This is more difficult... - has, err := q.ChannelUniqueQueue.Has(data) + has, err := q.channelQueue.Has(data) if err != nil || has { return has, err } @@ -158,7 +158,7 @@ func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) { } // Run starts to run the queue -func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) { +func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) { log.Debug("PersistableChannelUniqueQueue: %s Starting", q.delayedStarter.name) q.lock.Lock() @@ -170,7 +170,7 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context log.Error("Unable push to channelled queue: %v", err) } } - }, q.exemplar) + }, q.channelQueue.exemplar) q.lock.Unlock() if err != nil { log.Fatal("Unable to create internal queue for %s Error: %v", q.Name(), err) @@ -179,53 +179,73 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context } else { q.lock.Unlock() } - atShutdown(context.Background(), q.Shutdown) - atTerminate(context.Background(), q.Terminate) + atShutdown(q.Shutdown) + atTerminate(q.Terminate) + _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0) - // Just run the level queue - we shut it down later - go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) - - go func() { - _ = q.ChannelUniqueQueue.AddWorkers(q.workers, 0) - }() + if luq, ok := q.internal.(*LevelUniqueQueue); ok && luq.ByteFIFOUniqueQueue.byteFIFO.Len(luq.shutdownCtx) != 0 { + // Just run the level queue - we shut it down once it's flushed + go q.internal.Run(func(_ func()) {}, func(_ func()) {}) + go func() { + _ = q.internal.Flush(0) + log.Debug("LevelUniqueQueue: %s flushed so shutting down", q.internal.(*LevelQueue).Name()) + q.internal.(*LevelUniqueQueue).Shutdown() + GetManager().Remove(q.internal.(*LevelUniqueQueue).qid) + }() + } else { + log.Debug("PersistableChannelUniqueQueue: %s Skipping running the empty level queue", q.delayedStarter.name) + q.internal.(*LevelUniqueQueue).Shutdown() + GetManager().Remove(q.internal.(*LevelUniqueQueue).qid) + } - log.Trace("PersistableChannelUniqueQueue: %s Waiting til closed", q.delayedStarter.name) - <-q.closed - log.Trace("PersistableChannelUniqueQueue: %s Cancelling pools", q.delayedStarter.name) - q.internal.(*LevelUniqueQueue).cancel() - q.ChannelUniqueQueue.cancel() - log.Trace("PersistableChannelUniqueQueue: %s Waiting til done", q.delayedStarter.name) - q.ChannelUniqueQueue.Wait() - q.internal.(*LevelUniqueQueue).Wait() - // Redirect all remaining data in the chan to the internal channel - go func() { - log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name) - for data := range q.ChannelUniqueQueue.dataChan { - _ = q.internal.Push(data) - } - log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name) - }() - log.Trace("PersistableChannelUniqueQueue: %s Done main loop", q.delayedStarter.name) } // Flush flushes the queue func (q *PersistableChannelUniqueQueue) Flush(timeout time.Duration) error { - return q.ChannelUniqueQueue.Flush(timeout) + return q.channelQueue.Flush(timeout) +} + +// FlushWithContext flushes the queue +func (q *PersistableChannelUniqueQueue) FlushWithContext(ctx context.Context) error { + return q.channelQueue.FlushWithContext(ctx) +} + +// IsEmpty checks if a queue is empty +func (q *PersistableChannelUniqueQueue) IsEmpty() bool { + return q.channelQueue.IsEmpty() } // Shutdown processing this queue func (q *PersistableChannelUniqueQueue) Shutdown() { log.Trace("PersistableChannelUniqueQueue: %s Shutting down", q.delayedStarter.name) q.lock.Lock() - defer q.lock.Unlock() select { case <-q.closed: + q.lock.Unlock() + return default: if q.internal != nil { q.internal.(*LevelUniqueQueue).Shutdown() } close(q.closed) + q.lock.Unlock() } + + log.Trace("PersistableChannelUniqueQueue: %s Cancelling pools", q.delayedStarter.name) + q.internal.(*LevelUniqueQueue).baseCtxCancel() + q.channelQueue.baseCtxCancel() + log.Trace("PersistableChannelUniqueQueue: %s Waiting til done", q.delayedStarter.name) + q.channelQueue.Wait() + q.internal.(*LevelUniqueQueue).Wait() + // Redirect all remaining data in the chan to the internal channel + go func() { + log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name) + for data := range q.channelQueue.dataChan { + _ = q.internal.Push(data) + } + log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name) + }() + log.Debug("PersistableChannelUniqueQueue: %s Shutdown", q.delayedStarter.name) } diff --git a/modules/queue/unique_queue_redis.go b/modules/queue/unique_queue_redis.go index 20a50cc1f2..7474c09665 100644 --- a/modules/queue/unique_queue_redis.go +++ b/modules/queue/unique_queue_redis.go @@ -5,9 +5,8 @@ package queue import ( - "fmt" + "context" - "code.gitea.io/gitea/modules/graceful" "github.com/go-redis/redis/v8" ) @@ -51,8 +50,6 @@ func NewRedisUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, return nil, err } - byteFIFO.ctx = graceful.NewChannelContext(byteFIFOQueue.IsTerminated(), fmt.Errorf("queue has been terminated")) - queue := &RedisUniqueQueue{ ByteFIFOUniqueQueue: byteFIFOQueue, } @@ -92,8 +89,8 @@ func NewRedisUniqueByteFIFO(config RedisUniqueByteFIFOConfiguration) (*RedisUniq } // PushFunc pushes data to the end of the fifo and calls the callback if it is added -func (fifo *RedisUniqueByteFIFO) PushFunc(data []byte, fn func() error) error { - added, err := fifo.client.SAdd(fifo.ctx, fifo.setName, data).Result() +func (fifo *RedisUniqueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error { + added, err := fifo.client.SAdd(ctx, fifo.setName, data).Result() if err != nil { return err } @@ -105,12 +102,12 @@ func (fifo *RedisUniqueByteFIFO) PushFunc(data []byte, fn func() error) error { return err } } - return fifo.client.RPush(fifo.ctx, fifo.queueName, data).Err() + return fifo.client.RPush(ctx, fifo.queueName, data).Err() } // Pop pops data from the start of the fifo -func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error) { - data, err := fifo.client.LPop(fifo.ctx, fifo.queueName).Bytes() +func (fifo *RedisUniqueByteFIFO) Pop(ctx context.Context) ([]byte, error) { + data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes() if err != nil && err != redis.Nil { return data, err } @@ -119,13 +116,13 @@ func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error) { return data, nil } - err = fifo.client.SRem(fifo.ctx, fifo.setName, data).Err() + err = fifo.client.SRem(ctx, fifo.setName, data).Err() return data, err } // Has returns whether the fifo contains this data -func (fifo *RedisUniqueByteFIFO) Has(data []byte) (bool, error) { - return fifo.client.SIsMember(fifo.ctx, fifo.setName, data).Result() +func (fifo *RedisUniqueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) { + return fifo.client.SIsMember(ctx, fifo.setName, data).Result() } func init() { diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 0f15ccac9e..0176e2e0b2 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -21,7 +21,7 @@ import ( type WorkerPool struct { lock sync.Mutex baseCtx context.Context - cancel context.CancelFunc + baseCtxCancel context.CancelFunc cond *sync.Cond qid int64 maxNumberOfWorkers int @@ -52,7 +52,7 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo dataChan := make(chan Data, config.QueueLength) pool := &WorkerPool{ baseCtx: ctx, - cancel: cancel, + baseCtxCancel: cancel, batchLength: config.BatchLength, dataChan: dataChan, handle: handle, @@ -83,7 +83,7 @@ func (p *WorkerPool) Push(data Data) { } func (p *WorkerPool) zeroBoost() { - ctx, cancel := context.WithCancel(p.baseCtx) + ctx, cancel := context.WithTimeout(p.baseCtx, p.boostTimeout) mq := GetManager().GetManagedQueue(p.qid) boost := p.boostWorkers if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 { @@ -94,26 +94,14 @@ func (p *WorkerPool) zeroBoost() { start := time.Now() pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false) - go func() { - select { - case <-ctx.Done(): - case <-time.After(p.boostTimeout): - } + cancel = func() { mq.RemoveWorkers(pid) - cancel() - }() + } } else { log.Warn("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout) - go func() { - select { - case <-ctx.Done(): - case <-time.After(p.boostTimeout): - } - cancel() - }() } p.lock.Unlock() - p.addWorkers(ctx, boost) + p.addWorkers(ctx, cancel, boost) } func (p *WorkerPool) pushBoost(data Data) { @@ -140,7 +128,7 @@ func (p *WorkerPool) pushBoost(data Data) { return } p.blockTimeout *= 2 - ctx, cancel := context.WithCancel(p.baseCtx) + boostCtx, boostCtxCancel := context.WithCancel(p.baseCtx) mq := GetManager().GetManagedQueue(p.qid) boost := p.boostWorkers if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 { @@ -150,24 +138,24 @@ func (p *WorkerPool) pushBoost(data Data) { log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout) start := time.Now() - pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false) + pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), boostCtxCancel, false) go func() { - <-ctx.Done() + <-boostCtx.Done() mq.RemoveWorkers(pid) - cancel() + boostCtxCancel() }() } else { log.Warn("WorkerPool: %d Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout) } go func() { <-time.After(p.boostTimeout) - cancel() + boostCtxCancel() p.lock.Lock() p.blockTimeout /= 2 p.lock.Unlock() }() p.lock.Unlock() - p.addWorkers(ctx, boost) + p.addWorkers(boostCtx, boostCtxCancel, boost) p.dataChan <- data } } @@ -243,28 +231,25 @@ func (p *WorkerPool) commonRegisterWorkers(number int, timeout time.Duration, is mq := GetManager().GetManagedQueue(p.qid) if mq != nil { pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel, isFlusher) - go func() { - <-ctx.Done() - mq.RemoveWorkers(pid) - cancel() - }() log.Trace("WorkerPool: %d (for %s) adding %d workers with group id: %d", p.qid, mq.Name, number, pid) - } else { - log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number) - + return ctx, func() { + mq.RemoveWorkers(pid) + } } + log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number) + return ctx, cancel } // AddWorkers adds workers to the pool - this allows the number of workers to go above the limit func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc { ctx, cancel := p.commonRegisterWorkers(number, timeout, false) - p.addWorkers(ctx, number) + p.addWorkers(ctx, cancel, number) return cancel } // addWorkers adds workers to the pool -func (p *WorkerPool) addWorkers(ctx context.Context, number int) { +func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc, number int) { for i := 0; i < number; i++ { p.lock.Lock() if p.cond == nil { @@ -279,11 +264,13 @@ func (p *WorkerPool) addWorkers(ctx context.Context, number int) { p.numberOfWorkers-- if p.numberOfWorkers == 0 { p.cond.Broadcast() + cancel() } else if p.numberOfWorkers < 0 { // numberOfWorkers can't go negative but... log.Warn("Number of Workers < 0 for QID %d - this shouldn't happen", p.qid) p.numberOfWorkers = 0 p.cond.Broadcast() + cancel() } p.lock.Unlock() }() |