]> source.dussan.org Git - gitea.git/commitdiff
Correctly handle select on multiple channels in Queues (#22146) (#22428)
authorzeripath <art27@cantab.net>
Fri, 13 Jan 2023 20:42:42 +0000 (20:42 +0000)
committerGitHub <noreply@github.com>
Fri, 13 Jan 2023 20:42:42 +0000 (20:42 +0000)
Backport #22146

There are a few places in FlushQueueWithContext which make an incorrect
assumption about how `select` on multiple channels works.

The problem is best expressed by looking at the following example:

```go
package main

import "fmt"

func main() {
    closedChan := make(chan struct{})
    close(closedChan)
    toClose := make(chan struct{})
    count := 0

    for {
        select {
        case <-closedChan:
            count++
            fmt.Println(count)
            if count == 2 {
                close(toClose)
            }
        case <-toClose:
            return
        }
    }
}
```

This PR double-checks that the contexts are closed outside of checking
if there is data in the dataChan. It also rationalises the WorkerPool
FlushWithContext because the previous implementation failed to handle
pausing correctly. This will probably fix the underlying problem in
 #22145

Fix #22145

Signed-off-by: Andrew Thornton <art27@cantab.net>
modules/queue/queue_channel.go
modules/queue/unique_queue_channel.go
modules/queue/workerpool.go

index 028023d50032c13d49f20d92221d18ee4059c66c..3a375fdb4124cb64370fafe6540265ba6b1022b5 100644 (file)
@@ -110,32 +110,6 @@ func (q *ChannelQueue) Flush(timeout time.Duration) error {
        return q.FlushWithContext(ctx)
 }
 
-// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
-func (q *ChannelQueue) FlushWithContext(ctx context.Context) error {
-       log.Trace("ChannelQueue: %d Flush", q.qid)
-       paused, _ := q.IsPausedIsResumed()
-       for {
-               select {
-               case <-paused:
-                       return nil
-               case data, ok := <-q.dataChan:
-                       if !ok {
-                               return nil
-                       }
-                       if unhandled := q.handle(data); unhandled != nil {
-                               log.Error("Unhandled Data whilst flushing queue %d", q.qid)
-                       }
-                       atomic.AddInt64(&q.numInQueue, -1)
-               case <-q.baseCtx.Done():
-                       return q.baseCtx.Err()
-               case <-ctx.Done():
-                       return ctx.Err()
-               default:
-                       return nil
-               }
-       }
-}
-
 // Shutdown processing from this queue
 func (q *ChannelQueue) Shutdown() {
        q.lock.Lock()
index d1bf7239eb945eca457f00ddc98f57c488a15c85..8458e8c52e9cf4492624ff5ffed7901c14d20833 100644 (file)
@@ -9,7 +9,6 @@ import (
        "fmt"
        "runtime/pprof"
        "sync"
-       "sync/atomic"
        "time"
 
        "code.gitea.io/gitea/modules/container"
@@ -168,35 +167,6 @@ func (q *ChannelUniqueQueue) Flush(timeout time.Duration) error {
        return q.FlushWithContext(ctx)
 }
 
-// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
-func (q *ChannelUniqueQueue) FlushWithContext(ctx context.Context) error {
-       log.Trace("ChannelUniqueQueue: %d Flush", q.qid)
-       paused, _ := q.IsPausedIsResumed()
-       for {
-               select {
-               case <-paused:
-                       return nil
-               default:
-               }
-               select {
-               case data, ok := <-q.dataChan:
-                       if !ok {
-                               return nil
-                       }
-                       if unhandled := q.handle(data); unhandled != nil {
-                               log.Error("Unhandled Data whilst flushing queue %d", q.qid)
-                       }
-                       atomic.AddInt64(&q.numInQueue, -1)
-               case <-q.baseCtx.Done():
-                       return q.baseCtx.Err()
-               case <-ctx.Done():
-                       return ctx.Err()
-               default:
-                       return nil
-               }
-       }
-}
-
 // Shutdown processing from this queue
 func (q *ChannelUniqueQueue) Shutdown() {
        log.Trace("ChannelUniqueQueue: %s Shutting down", q.name)
index bdf04a363b79fc5b6cc3cc035a8ef77395498374..9403bfddeca65de10c276009f3eb90a653e3655a 100644 (file)
@@ -464,13 +464,43 @@ func (p *WorkerPool) IsEmpty() bool {
        return atomic.LoadInt64(&p.numInQueue) == 0
 }
 
+// contextError returns either ctx.Done(), the base context's error or nil
+func (p *WorkerPool) contextError(ctx context.Context) error {
+       select {
+       case <-p.baseCtx.Done():
+               return p.baseCtx.Err()
+       case <-ctx.Done():
+               return ctx.Err()
+       default:
+               return nil
+       }
+}
+
 // FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
 // NB: The worker will not be registered with the manager.
 func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
        log.Trace("WorkerPool: %d Flush", p.qid)
+       paused, _ := p.IsPausedIsResumed()
        for {
+               // Because select will return any case that is satisified at random we precheck here before looking at dataChan.
+               select {
+               case <-paused:
+                       // Ensure that even if paused that the cancelled error is still sent
+                       return p.contextError(ctx)
+               case <-p.baseCtx.Done():
+                       return p.baseCtx.Err()
+               case <-ctx.Done():
+                       return ctx.Err()
+               default:
+               }
+
                select {
-               case data := <-p.dataChan:
+               case <-paused:
+                       return p.contextError(ctx)
+               case data, ok := <-p.dataChan:
+                       if !ok {
+                               return nil
+                       }
                        if unhandled := p.handle(data); unhandled != nil {
                                log.Error("Unhandled Data whilst flushing queue %d", p.qid)
                        }
@@ -496,6 +526,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
        paused, _ := p.IsPausedIsResumed()
        data := make([]Data, 0, p.batchLength)
        for {
+               // Because select will return any case that is satisified at random we precheck here before looking at dataChan.
                select {
                case <-paused:
                        log.Trace("Worker for Queue %d Pausing", p.qid)
@@ -516,8 +547,19 @@ func (p *WorkerPool) doWork(ctx context.Context) {
                                log.Trace("Worker shutting down")
                                return
                        }
+               case <-ctx.Done():
+                       if len(data) > 0 {
+                               log.Trace("Handling: %d data, %v", len(data), data)
+                               if unhandled := p.handle(data...); unhandled != nil {
+                                       log.Error("Unhandled Data in queue %d", p.qid)
+                               }
+                               atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
+                       }
+                       log.Trace("Worker shutting down")
+                       return
                default:
                }
+
                select {
                case <-paused:
                        // go back around