]> source.dussan.org Git - gitea.git/commitdiff
Only attempt to flush queue if the underlying worker pool is not finished (#18593...
authorzeripath <art27@cantab.net>
Sun, 6 Feb 2022 06:55:44 +0000 (06:55 +0000)
committerGitHub <noreply@github.com>
Sun, 6 Feb 2022 06:55:44 +0000 (14:55 +0800)
* Only attempt to flush queue if the underlying worker pool is not finished (#18593)

Backport #18593

There is a possible race whereby a worker pool could be cancelled but yet the
underlying queue is not empty. This will lead to flush-all cycling because it
cannot empty the pool.

* On shutdown of Persistant Channel Queues close datachan and empty

Partial Backport #18415

Although we attempt to empty the datachan in queues - due to
races we are better off just closing the channel and forcibly emptying
it in shutdown.

Fix #18618

Signed-off-by: Andrew Thornton <art27@cantab.net>
* Move zero workers warning to debug

Fix #18617

Signed-off-by: Andrew Thornton <art27@cantab.net>
* Update modules/queue/manager.go

Co-authored-by: Gusted <williamzijl7@hotmail.com>
* Update modules/queue/manager.go

Co-authored-by: Gusted <williamzijl7@hotmail.com>
Co-authored-by: Gusted <williamzijl7@hotmail.com>
modules/queue/manager.go
modules/queue/queue_disk_channel.go
modules/queue/workerpool.go

index 23e96155a91dc792e23b7bb39cb5708a4acac3fd..310f3cd4e15bc564723e00a7ee7fcbceb239d19d 100644 (file)
@@ -72,6 +72,8 @@ type ManagedPool interface {
        BoostWorkers() int
        // SetPoolSettings sets the user updatable settings for the pool
        SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
+       // Done returns a channel that will be closed when the Pool's baseCtx is closed
+       Done() <-chan struct{}
 }
 
 // ManagedQueueList implements the sort.Interface
@@ -141,7 +143,6 @@ func (m *Manager) Remove(qid int64) {
        delete(m.Queues, qid)
        m.mutex.Unlock()
        log.Trace("Queue Manager removed: QID: %d", qid)
-
 }
 
 // GetManagedQueue by qid
@@ -193,6 +194,17 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
                                wg.Done()
                                continue
                        }
+
+                       if pool, ok := mq.Managed.(ManagedPool); ok {
+                               // No point into flushing pools when their base's ctx is already done.
+                               select {
+                               case <-pool.Done():
+                                       wg.Done()
+                                       continue
+                               default:
+                               }
+                       }
+
                        allEmpty = false
                        if flushable, ok := mq.Managed.(Flushable); ok {
                                log.Debug("Flushing (flushable) queue: %s", mq.Name)
@@ -225,7 +237,6 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
                wg.Wait()
        }
        return nil
-
 }
 
 // ManagedQueues returns the managed queues
index c3a1c5781ef09df4397ce19c5dd4a28646a16c2f..72f330670a3dc9edf5fdfa30b32d7430ada8daef 100644 (file)
@@ -173,7 +173,6 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
                q.internal.(*LevelQueue).Shutdown()
                GetManager().Remove(q.internal.(*LevelQueue).qid)
        }
-
 }
 
 // Flush flushes the queue and blocks till the queue is empty
@@ -252,14 +251,13 @@ func (q *PersistableChannelQueue) Shutdown() {
        q.channelQueue.Wait()
        q.internal.(*LevelQueue).Wait()
        // Redirect all remaining data in the chan to the internal channel
-       go func() {
-               log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
-               for data := range q.channelQueue.dataChan {
-                       _ = q.internal.Push(data)
-                       atomic.AddInt64(&q.channelQueue.numInQueue, -1)
-               }
-               log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
-       }()
+       close(q.channelQueue.dataChan)
+       log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
+       for data := range q.channelQueue.dataChan {
+               _ = q.internal.Push(data)
+               atomic.AddInt64(&q.channelQueue.numInQueue, -1)
+       }
+       log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
 
        log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
 }
index 0176e2e0b2d20a467c2b5b2fe96afb76d1f476e7..dc6ff3b6331e71f51640d37867d113c0efece812 100644 (file)
@@ -65,6 +65,11 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo
        return pool
 }
 
+// Done returns when this worker pool's base context has been cancelled
+func (p *WorkerPool) Done() <-chan struct{} {
+       return p.baseCtx.Done()
+}
+
 // Push pushes the data to the internal channel
 func (p *WorkerPool) Push(data Data) {
        atomic.AddInt64(&p.numInQueue, 1)
@@ -90,7 +95,7 @@ func (p *WorkerPool) zeroBoost() {
                boost = p.maxNumberOfWorkers - p.numberOfWorkers
        }
        if mq != nil {
-               log.Warn("WorkerPool: %d (for %s) has zero workers - adding %d temporary workers for %s", p.qid, mq.Name, boost, p.boostTimeout)
+               log.Debug("WorkerPool: %d (for %s) has zero workers - adding %d temporary workers for %s", p.qid, mq.Name, boost, p.boostTimeout)
 
                start := time.Now()
                pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false)
@@ -98,7 +103,7 @@ func (p *WorkerPool) zeroBoost() {
                        mq.RemoveWorkers(pid)
                }
        } else {
-               log.Warn("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout)
+               log.Debug("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout)
        }
        p.lock.Unlock()
        p.addWorkers(ctx, cancel, boost)
@@ -326,7 +331,10 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
        log.Trace("WorkerPool: %d Flush", p.qid)
        for {
                select {
-               case data := <-p.dataChan:
+               case data, ok := <-p.dataChan:
+                       if !ok {
+                               return nil
+                       }
                        p.handle(data)
                        atomic.AddInt64(&p.numInQueue, -1)
                case <-p.baseCtx.Done():
@@ -341,7 +349,7 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
 
 func (p *WorkerPool) doWork(ctx context.Context) {
        delay := time.Millisecond * 300
-       var data = make([]Data, 0, p.batchLength)
+       data := make([]Data, 0, p.batchLength)
        for {
                select {
                case <-ctx.Done():