]> source.dussan.org Git - gitea.git/commitdiff
Multiple Queue improvements: LevelDB Wait on empty, shutdown empty shadow level queue...
authorzeripath <art27@cantab.net>
Sat, 15 May 2021 14:22:26 +0000 (15:22 +0100)
committerGitHub <noreply@github.com>
Sat, 15 May 2021 14:22:26 +0000 (16:22 +0200)
* move shutdownfns, terminatefns and hammerfns out of separate goroutines

Coalesce the shutdownfns etc into a list of functions that get run at shutdown
rather then have them run at goroutines blocked on selects.

This may help reduce the background select/poll load in certain
configurations.

* The LevelDB queues can actually wait on empty instead of polling

Slight refactor to cause leveldb queues to wait on empty instead of polling.

* Shutdown the shadow level queue once it is empty

* Remove bytefifo additional goroutine for readToChan as it can just be run in run

* Remove additional removeWorkers goroutine for workers

* Simplify the AtShutdown and AtTerminate functions and add Channel Flusher

* Add shutdown flusher to CUQ

* move persistable channel shutdown stuff to Shutdown Fn

* Ensure that UPCQ has the correct config

* handle shutdown during the flushing

* reduce risk of race between zeroBoost and addWorkers

* prevent double shutdown

Signed-off-by: Andrew Thornton <art27@cantab.net>
24 files changed:
modules/graceful/context.go
modules/graceful/manager.go
modules/graceful/manager_unix.go
modules/graceful/manager_windows.go
modules/indexer/code/indexer.go
modules/indexer/issues/indexer.go
modules/queue/bytefifo.go
modules/queue/manager.go
modules/queue/queue.go
modules/queue/queue_bytefifo.go
modules/queue/queue_channel.go
modules/queue/queue_channel_test.go
modules/queue/queue_disk.go
modules/queue/queue_disk_channel.go
modules/queue/queue_disk_channel_test.go
modules/queue/queue_disk_test.go
modules/queue/queue_redis.go
modules/queue/queue_wrapped.go
modules/queue/unique_queue_channel.go
modules/queue/unique_queue_disk.go
modules/queue/unique_queue_disk_channel.go
modules/queue/unique_queue_redis.go
modules/queue/workerpool.go
services/pull/check_test.go

index 1ad1109b4e5bd048177c1a4ba34a9aeb384ff632..9d955329a42b994b51f5105357e5c9ec2d51845e 100644 (file)
@@ -6,17 +6,9 @@ package graceful
 
 import (
        "context"
-       "fmt"
        "time"
 )
 
-// Errors for context.Err()
-var (
-       ErrShutdown  = fmt.Errorf("Graceful Manager called Shutdown")
-       ErrHammer    = fmt.Errorf("Graceful Manager called Hammer")
-       ErrTerminate = fmt.Errorf("Graceful Manager called Terminate")
-)
-
 // ChannelContext is a context that wraps a channel and error as a context
 type ChannelContext struct {
        done <-chan struct{}
@@ -63,28 +55,19 @@ func (ctx *ChannelContext) Value(key interface{}) interface{} {
 // Callers using this context should ensure that they are registered as a running server
 // in order that they are waited for.
 func (g *Manager) ShutdownContext() context.Context {
-       return &ChannelContext{
-               done: g.IsShutdown(),
-               err:  ErrShutdown,
-       }
+       return g.shutdownCtx
 }
 
 // HammerContext returns a context.Context that is Done at hammer
 // Callers using this context should ensure that they are registered as a running server
 // in order that they are waited for.
 func (g *Manager) HammerContext() context.Context {
-       return &ChannelContext{
-               done: g.IsHammer(),
-               err:  ErrHammer,
-       }
+       return g.hammerCtx
 }
 
 // TerminateContext returns a context.Context that is Done at terminate
 // Callers using this context should ensure that they are registered as a terminating server
 // in order that they are waited for.
 func (g *Manager) TerminateContext() context.Context {
-       return &ChannelContext{
-               done: g.IsTerminate(),
-               err:  ErrTerminate,
-       }
+       return g.terminateCtx
 }
index 903d05ed21f4133253417bd2da0467ff8d1e8124..8c3b95c4aa74de3ef5910e386d669d1f79f449fa 100644 (file)
@@ -54,8 +54,8 @@ func InitManager(ctx context.Context) {
        })
 }
 
-// CallbackWithContext is combined runnable and context to watch to see if the caller has finished
-type CallbackWithContext func(ctx context.Context, callback func())
+// WithCallback is a runnable to call when the caller has finished
+type WithCallback func(callback func())
 
 // RunnableWithShutdownFns is a runnable with functions to run at shutdown and terminate
 // After the callback to atShutdown is called and is complete, the main function must return.
@@ -63,7 +63,7 @@ type CallbackWithContext func(ctx context.Context, callback func())
 // Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals
 // - users must therefore be careful to only call these as necessary.
 // If run is not expected to run indefinitely RunWithShutdownChan is likely to be more appropriate.
-type RunnableWithShutdownFns func(atShutdown, atTerminate func(context.Context, func()))
+type RunnableWithShutdownFns func(atShutdown, atTerminate func(func()))
 
 // RunWithShutdownFns takes a function that has both atShutdown and atTerminate callbacks
 // After the callback to atShutdown is called and is complete, the main function must return.
@@ -80,17 +80,21 @@ func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) {
                        g.doShutdown()
                }
        }()
-       run(func(ctx context.Context, atShutdown func()) {
-               go func() {
-                       select {
-                       case <-g.IsShutdown():
+       run(func(atShutdown func()) {
+               g.lock.Lock()
+               defer g.lock.Unlock()
+               g.toRunAtShutdown = append(g.toRunAtShutdown,
+                       func() {
+                               defer func() {
+                                       if err := recover(); err != nil {
+                                               log.Critical("PANIC during RunWithShutdownFns: %v\nStacktrace: %s", err, log.Stack(2))
+                                               g.doShutdown()
+                                       }
+                               }()
                                atShutdown()
-                       case <-ctx.Done():
-                               return
-                       }
-               }()
-       }, func(ctx context.Context, atTerminate func()) {
-               g.RunAtTerminate(ctx, atTerminate)
+                       })
+       }, func(atTerminate func()) {
+               g.RunAtTerminate(atTerminate)
        })
 }
 
@@ -99,7 +103,7 @@ func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) {
 // (Optionally IsHammer may be waited for instead however, this should be avoided if possible.)
 // The callback function provided to atTerminate must return once termination is complete.
 // Please note that use of the atTerminate function will create a go-routine that will wait till terminate - users must therefore be careful to only call this as necessary.
-type RunnableWithShutdownChan func(atShutdown <-chan struct{}, atTerminate CallbackWithContext)
+type RunnableWithShutdownChan func(atShutdown <-chan struct{}, atTerminate WithCallback)
 
 // RunWithShutdownChan takes a function that has channel to watch for shutdown and atTerminate callbacks
 // After the atShutdown channel is closed, the main function must return once shutdown is complete.
@@ -115,8 +119,8 @@ func (g *Manager) RunWithShutdownChan(run RunnableWithShutdownChan) {
                        g.doShutdown()
                }
        }()
-       run(g.IsShutdown(), func(ctx context.Context, atTerminate func()) {
-               g.RunAtTerminate(ctx, atTerminate)
+       run(g.IsShutdown(), func(atTerminate func()) {
+               g.RunAtTerminate(atTerminate)
        })
 }
 
@@ -136,60 +140,65 @@ func (g *Manager) RunWithShutdownContext(run func(context.Context)) {
 }
 
 // RunAtTerminate adds to the terminate wait group and creates a go-routine to run the provided function at termination
-func (g *Manager) RunAtTerminate(ctx context.Context, terminate func()) {
+func (g *Manager) RunAtTerminate(terminate func()) {
        g.terminateWaitGroup.Add(1)
-       go func() {
-               defer g.terminateWaitGroup.Done()
-               defer func() {
-                       if err := recover(); err != nil {
-                               log.Critical("PANIC during RunAtTerminate: %v\nStacktrace: %s", err, log.Stack(2))
-                       }
-               }()
-               select {
-               case <-g.IsTerminate():
+       g.lock.Lock()
+       defer g.lock.Unlock()
+       g.toRunAtTerminate = append(g.toRunAtTerminate,
+               func() {
+                       defer g.terminateWaitGroup.Done()
+                       defer func() {
+                               if err := recover(); err != nil {
+                                       log.Critical("PANIC during RunAtTerminate: %v\nStacktrace: %s", err, log.Stack(2))
+                               }
+                       }()
                        terminate()
-               case <-ctx.Done():
-               }
-       }()
+               })
 }
 
 // RunAtShutdown creates a go-routine to run the provided function at shutdown
 func (g *Manager) RunAtShutdown(ctx context.Context, shutdown func()) {
-       go func() {
-               defer func() {
-                       if err := recover(); err != nil {
-                               log.Critical("PANIC during RunAtShutdown: %v\nStacktrace: %s", err, log.Stack(2))
+       g.lock.Lock()
+       defer g.lock.Unlock()
+       g.toRunAtShutdown = append(g.toRunAtShutdown,
+               func() {
+                       defer func() {
+                               if err := recover(); err != nil {
+                                       log.Critical("PANIC during RunAtShutdown: %v\nStacktrace: %s", err, log.Stack(2))
+                               }
+                       }()
+                       select {
+                       case <-ctx.Done():
+                               return
+                       default:
+                               shutdown()
                        }
-               }()
-               select {
-               case <-g.IsShutdown():
-                       shutdown()
-               case <-ctx.Done():
-               }
-       }()
+               })
 }
 
 // RunAtHammer creates a go-routine to run the provided function at shutdown
-func (g *Manager) RunAtHammer(ctx context.Context, hammer func()) {
-       go func() {
-               defer func() {
-                       if err := recover(); err != nil {
-                               log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2))
-                       }
-               }()
-               select {
-               case <-g.IsHammer():
+func (g *Manager) RunAtHammer(hammer func()) {
+       g.lock.Lock()
+       defer g.lock.Unlock()
+       g.toRunAtHammer = append(g.toRunAtHammer,
+               func() {
+                       defer func() {
+                               if err := recover(); err != nil {
+                                       log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2))
+                               }
+                       }()
                        hammer()
-               case <-ctx.Done():
-               }
-       }()
+               })
 }
 func (g *Manager) doShutdown() {
        if !g.setStateTransition(stateRunning, stateShuttingDown) {
                return
        }
        g.lock.Lock()
-       close(g.shutdown)
+       g.shutdownCtxCancel()
+       for _, fn := range g.toRunAtShutdown {
+               go fn()
+       }
        g.lock.Unlock()
 
        if setting.GracefulHammerTime >= 0 {
@@ -203,7 +212,7 @@ func (g *Manager) doShutdown() {
                g.doTerminate()
                g.WaitForTerminate()
                g.lock.Lock()
-               close(g.done)
+               g.doneCtxCancel()
                g.lock.Unlock()
        }()
 }
@@ -212,10 +221,13 @@ func (g *Manager) doHammerTime(d time.Duration) {
        time.Sleep(d)
        g.lock.Lock()
        select {
-       case <-g.hammer:
+       case <-g.hammerCtx.Done():
        default:
                log.Warn("Setting Hammer condition")
-               close(g.hammer)
+               g.hammerCtxCancel()
+               for _, fn := range g.toRunAtHammer {
+                       go fn()
+               }
        }
        g.lock.Unlock()
 }
@@ -226,10 +238,13 @@ func (g *Manager) doTerminate() {
        }
        g.lock.Lock()
        select {
-       case <-g.terminate:
+       case <-g.terminateCtx.Done():
        default:
                log.Warn("Terminating")
-               close(g.terminate)
+               g.terminateCtxCancel()
+               for _, fn := range g.toRunAtTerminate {
+                       go fn()
+               }
        }
        g.lock.Unlock()
 }
@@ -242,7 +257,7 @@ func (g *Manager) IsChild() bool {
 // IsShutdown returns a channel which will be closed at shutdown.
 // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate
 func (g *Manager) IsShutdown() <-chan struct{} {
-       return g.shutdown
+       return g.shutdownCtx.Done()
 }
 
 // IsHammer returns a channel which will be closed at hammer
@@ -250,14 +265,14 @@ func (g *Manager) IsShutdown() <-chan struct{} {
 // Servers running within the running server wait group should respond to IsHammer
 // if not shutdown already
 func (g *Manager) IsHammer() <-chan struct{} {
-       return g.hammer
+       return g.hammerCtx.Done()
 }
 
 // IsTerminate returns a channel which will be closed at terminate
 // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate
 // IsTerminate will only close once all running servers have stopped
 func (g *Manager) IsTerminate() <-chan struct{} {
-       return g.terminate
+       return g.terminateCtx.Done()
 }
 
 // ServerDone declares a running server done and subtracts one from the
@@ -314,25 +329,20 @@ func (g *Manager) InformCleanup() {
 
 // Done allows the manager to be viewed as a context.Context, it returns a channel that is closed when the server is finished terminating
 func (g *Manager) Done() <-chan struct{} {
-       return g.done
+       return g.doneCtx.Done()
 }
 
-// Err allows the manager to be viewed as a context.Context done at Terminate, it returns ErrTerminate
+// Err allows the manager to be viewed as a context.Context done at Terminate
 func (g *Manager) Err() error {
-       select {
-       case <-g.Done():
-               return ErrTerminate
-       default:
-               return nil
-       }
+       return g.doneCtx.Err()
 }
 
-// Value allows the manager to be viewed as a context.Context done at Terminate, it has no values
+// Value allows the manager to be viewed as a context.Context done at Terminate
 func (g *Manager) Value(key interface{}) interface{} {
-       return nil
+       return g.doneCtx.Value(key)
 }
 
 // Deadline returns nil as there is no fixed Deadline for the manager, it allows the manager to be viewed as a context.Context
 func (g *Manager) Deadline() (deadline time.Time, ok bool) {
-       return
+       return g.doneCtx.Deadline()
 }
index 540974454c34c808e31e5b0d4299971aeb467e4f..20d9b3905c4fb0a9b20d8c8925b457141f9bacec 100644 (file)
@@ -25,13 +25,21 @@ type Manager struct {
        forked                 bool
        lock                   *sync.RWMutex
        state                  state
-       shutdown               chan struct{}
-       hammer                 chan struct{}
-       terminate              chan struct{}
-       done                   chan struct{}
+       shutdownCtx            context.Context
+       hammerCtx              context.Context
+       terminateCtx           context.Context
+       doneCtx                context.Context
+       shutdownCtxCancel      context.CancelFunc
+       hammerCtxCancel        context.CancelFunc
+       terminateCtxCancel     context.CancelFunc
+       doneCtxCancel          context.CancelFunc
        runningServerWaitGroup sync.WaitGroup
        createServerWaitGroup  sync.WaitGroup
        terminateWaitGroup     sync.WaitGroup
+
+       toRunAtShutdown  []func()
+       toRunAtHammer    []func()
+       toRunAtTerminate []func()
 }
 
 func newGracefulManager(ctx context.Context) *Manager {
@@ -45,11 +53,11 @@ func newGracefulManager(ctx context.Context) *Manager {
 }
 
 func (g *Manager) start(ctx context.Context) {
-       // Make channels
-       g.terminate = make(chan struct{})
-       g.shutdown = make(chan struct{})
-       g.hammer = make(chan struct{})
-       g.done = make(chan struct{})
+       // Make contexts
+       g.terminateCtx, g.terminateCtxCancel = context.WithCancel(ctx)
+       g.shutdownCtx, g.shutdownCtxCancel = context.WithCancel(ctx)
+       g.hammerCtx, g.hammerCtxCancel = context.WithCancel(ctx)
+       g.doneCtx, g.doneCtxCancel = context.WithCancel(ctx)
 
        // Set the running state & handle signals
        g.setState(stateRunning)
index 14923c2a9b7d27f00656117be4ca5ace4a47c044..51f29778ba7afbb32bcb459b37eda77972d84afd 100644 (file)
@@ -36,14 +36,22 @@ type Manager struct {
        isChild                bool
        lock                   *sync.RWMutex
        state                  state
-       shutdown               chan struct{}
-       hammer                 chan struct{}
-       terminate              chan struct{}
-       done                   chan struct{}
+       shutdownCtx            context.Context
+       hammerCtx              context.Context
+       terminateCtx           context.Context
+       doneCtx                context.Context
+       shutdownCtxCancel      context.CancelFunc
+       hammerCtxCancel        context.CancelFunc
+       terminateCtxCancel     context.CancelFunc
+       doneCtxCancel          context.CancelFunc
        runningServerWaitGroup sync.WaitGroup
        createServerWaitGroup  sync.WaitGroup
        terminateWaitGroup     sync.WaitGroup
        shutdownRequested      chan struct{}
+
+       toRunAtShutdown  []func()
+       toRunAtHammer    []func()
+       toRunAtTerminate []func()
 }
 
 func newGracefulManager(ctx context.Context) *Manager {
@@ -58,11 +66,13 @@ func newGracefulManager(ctx context.Context) *Manager {
 }
 
 func (g *Manager) start() {
+       // Make contexts
+       g.terminateCtx, g.terminateCtxCancel = context.WithCancel(g.ctx)
+       g.shutdownCtx, g.shutdownCtxCancel = context.WithCancel(g.ctx)
+       g.hammerCtx, g.hammerCtxCancel = context.WithCancel(g.ctx)
+       g.doneCtx, g.doneCtxCancel = context.WithCancel(g.ctx)
+
        // Make channels
-       g.terminate = make(chan struct{})
-       g.shutdown = make(chan struct{})
-       g.hammer = make(chan struct{})
-       g.done = make(chan struct{})
        g.shutdownRequested = make(chan struct{})
 
        // Set the running state
@@ -171,7 +181,7 @@ hammerLoop:
                        default:
                                log.Debug("Unexpected control request: %v", change.Cmd)
                        }
-               case <-g.hammer:
+               case <-g.hammerCtx.Done():
                        break hammerLoop
                }
        }
index a7d78e9fdc82c2fe12fd455e8fb1005fc780769a..67fa43eda89dc35f318280a784313ee270455fcd 100644 (file)
@@ -115,7 +115,13 @@ func Init() {
 
        ctx, cancel := context.WithCancel(context.Background())
 
-       graceful.GetManager().RunAtTerminate(ctx, func() {
+       graceful.GetManager().RunAtTerminate(func() {
+               select {
+               case <-ctx.Done():
+                       return
+               default:
+               }
+               cancel()
                log.Debug("Closing repository indexer")
                indexer.Close()
                log.Info("PID: %d Repository Indexer closed", os.Getpid())
index 9edaef6bdd017916651c64a16046346030c64c8e..676b6686ea5b2aa714dbaab97b7ebccaea267ee1 100644 (file)
@@ -160,7 +160,7 @@ func InitIssueIndexer(syncReindex bool) {
                        }
                        populate = !exist
                        holder.set(issueIndexer)
-                       graceful.GetManager().RunAtTerminate(context.Background(), func() {
+                       graceful.GetManager().RunAtTerminate(func() {
                                log.Debug("Closing issue indexer")
                                issueIndexer := holder.get()
                                if issueIndexer != nil {
@@ -170,7 +170,7 @@ func InitIssueIndexer(syncReindex bool) {
                        })
                        log.Debug("Created Bleve Indexer")
                case "elasticsearch":
-                       graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(context.Context, func())) {
+                       graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(func())) {
                                issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName)
                                if err != nil {
                                        log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
index 94478e6f05c4bb2d8451603a5ecda409db047ca5..3a10c8e1259c6d048e94ec9f84754ac5ba652df8 100644 (file)
@@ -4,14 +4,16 @@
 
 package queue
 
+import "context"
+
 // ByteFIFO defines a FIFO that takes a byte array
 type ByteFIFO interface {
        // Len returns the length of the fifo
-       Len() int64
+       Len(ctx context.Context) int64
        // PushFunc pushes data to the end of the fifo and calls the callback if it is added
-       PushFunc(data []byte, fn func() error) error
+       PushFunc(ctx context.Context, data []byte, fn func() error) error
        // Pop pops data from the start of the fifo
-       Pop() ([]byte, error)
+       Pop(ctx context.Context) ([]byte, error)
        // Close this fifo
        Close() error
 }
@@ -20,7 +22,7 @@ type ByteFIFO interface {
 type UniqueByteFIFO interface {
        ByteFIFO
        // Has returns whether the fifo contains this data
-       Has(data []byte) (bool, error)
+       Has(ctx context.Context, data []byte) (bool, error)
 }
 
 var _ ByteFIFO = &DummyByteFIFO{}
@@ -29,12 +31,12 @@ var _ ByteFIFO = &DummyByteFIFO{}
 type DummyByteFIFO struct{}
 
 // PushFunc returns nil
-func (*DummyByteFIFO) PushFunc(data []byte, fn func() error) error {
+func (*DummyByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
        return nil
 }
 
 // Pop returns nil
-func (*DummyByteFIFO) Pop() ([]byte, error) {
+func (*DummyByteFIFO) Pop(ctx context.Context) ([]byte, error) {
        return []byte{}, nil
 }
 
@@ -44,7 +46,7 @@ func (*DummyByteFIFO) Close() error {
 }
 
 // Len is always 0
-func (*DummyByteFIFO) Len() int64 {
+func (*DummyByteFIFO) Len(ctx context.Context) int64 {
        return 0
 }
 
@@ -56,6 +58,6 @@ type DummyUniqueByteFIFO struct {
 }
 
 // Has always returns false
-func (*DummyUniqueByteFIFO) Has([]byte) (bool, error) {
+func (*DummyUniqueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) {
        return false, nil
 }
index c3ec735af504dafd3e7a8eba35f236836453759b..a6d48575ab674191a38bec6841f25b502a33101d 100644 (file)
@@ -187,14 +187,14 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
                        if flushable, ok := mq.Managed.(Flushable); ok {
                                log.Debug("Flushing (flushable) queue: %s", mq.Name)
                                go func(q *ManagedQueue) {
-                                       localCtx, localCancel := context.WithCancel(ctx)
-                                       pid := q.RegisterWorkers(1, start, hasTimeout, end, localCancel, true)
+                                       localCtx, localCtxCancel := context.WithCancel(ctx)
+                                       pid := q.RegisterWorkers(1, start, hasTimeout, end, localCtxCancel, true)
                                        err := flushable.FlushWithContext(localCtx)
                                        if err != nil && err != ctx.Err() {
                                                cancel()
                                        }
                                        q.CancelWorkers(pid)
-                                       localCancel()
+                                       localCtxCancel()
                                        wg.Done()
                                }(mq)
                        } else {
index d08cba35a1ea5422569d6b1b6d64475f40f9e1d2..7159048c11689d2f91289f65a30b4813c30ab06c 100644 (file)
@@ -57,7 +57,7 @@ type Named interface {
 // Queues will handle their own contents in the Run method
 type Queue interface {
        Flushable
-       Run(atShutdown, atTerminate func(context.Context, func()))
+       Run(atShutdown, atTerminate func(func()))
        Push(Data) error
 }
 
@@ -74,7 +74,7 @@ type DummyQueue struct {
 }
 
 // Run does nothing
-func (*DummyQueue) Run(_, _ func(context.Context, func())) {}
+func (*DummyQueue) Run(_, _ func(func())) {}
 
 // Push fakes a push of data to the queue
 func (*DummyQueue) Push(Data) error {
@@ -122,7 +122,7 @@ type Immediate struct {
 }
 
 // Run does nothing
-func (*Immediate) Run(_, _ func(context.Context, func())) {}
+func (*Immediate) Run(_, _ func(func())) {}
 
 // Push fakes a push of data to the queue
 func (q *Immediate) Push(data Data) error {
index fe1fb7807e831ca652d423ee6e2982380a1d044a..3ea61aad0e4c5469150955ecfa39ffe782b592c6 100644 (file)
@@ -17,8 +17,9 @@ import (
 // ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue
 type ByteFIFOQueueConfiguration struct {
        WorkerPoolConfiguration
-       Workers int
-       Name    string
+       Workers     int
+       Name        string
+       WaitOnEmpty bool
 }
 
 var _ Queue = &ByteFIFOQueue{}
@@ -26,14 +27,18 @@ var _ Queue = &ByteFIFOQueue{}
 // ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool
 type ByteFIFOQueue struct {
        *WorkerPool
-       byteFIFO   ByteFIFO
-       typ        Type
-       closed     chan struct{}
-       terminated chan struct{}
-       exemplar   interface{}
-       workers    int
-       name       string
-       lock       sync.Mutex
+       byteFIFO           ByteFIFO
+       typ                Type
+       shutdownCtx        context.Context
+       shutdownCtxCancel  context.CancelFunc
+       terminateCtx       context.Context
+       terminateCtxCancel context.CancelFunc
+       exemplar           interface{}
+       workers            int
+       name               string
+       lock               sync.Mutex
+       waitOnEmpty        bool
+       pushed             chan struct{}
 }
 
 // NewByteFIFOQueue creates a new ByteFIFOQueue
@@ -44,15 +49,22 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem
        }
        config := configInterface.(ByteFIFOQueueConfiguration)
 
+       terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
+       shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
+
        return &ByteFIFOQueue{
-               WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
-               byteFIFO:   byteFIFO,
-               typ:        typ,
-               closed:     make(chan struct{}),
-               terminated: make(chan struct{}),
-               exemplar:   exemplar,
-               workers:    config.Workers,
-               name:       config.Name,
+               WorkerPool:         NewWorkerPool(handle, config.WorkerPoolConfiguration),
+               byteFIFO:           byteFIFO,
+               typ:                typ,
+               shutdownCtx:        shutdownCtx,
+               shutdownCtxCancel:  shutdownCtxCancel,
+               terminateCtx:       terminateCtx,
+               terminateCtxCancel: terminateCtxCancel,
+               exemplar:           exemplar,
+               workers:            config.Workers,
+               name:               config.Name,
+               waitOnEmpty:        config.WaitOnEmpty,
+               pushed:             make(chan struct{}, 1),
        }, nil
 }
 
@@ -76,7 +88,15 @@ func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
        if err != nil {
                return err
        }
-       return q.byteFIFO.PushFunc(bs, fn)
+       if q.waitOnEmpty {
+               defer func() {
+                       select {
+                       case q.pushed <- struct{}{}:
+                       default:
+                       }
+               }()
+       }
+       return q.byteFIFO.PushFunc(q.terminateCtx, bs, fn)
 }
 
 // IsEmpty checks if the queue is empty
@@ -86,135 +106,160 @@ func (q *ByteFIFOQueue) IsEmpty() bool {
        if !q.WorkerPool.IsEmpty() {
                return false
        }
-       return q.byteFIFO.Len() == 0
+       return q.byteFIFO.Len(q.terminateCtx) == 0
 }
 
 // Run runs the bytefifo queue
-func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
-       atShutdown(context.Background(), q.Shutdown)
-       atTerminate(context.Background(), q.Terminate)
+func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(func())) {
+       atShutdown(q.Shutdown)
+       atTerminate(q.Terminate)
        log.Debug("%s: %s Starting", q.typ, q.name)
 
-       go func() {
-               _ = q.AddWorkers(q.workers, 0)
-       }()
+       _ = q.AddWorkers(q.workers, 0)
 
-       go q.readToChan()
+       log.Trace("%s: %s Now running", q.typ, q.name)
+       q.readToChan()
 
-       log.Trace("%s: %s Waiting til closed", q.typ, q.name)
-       <-q.closed
+       <-q.shutdownCtx.Done()
        log.Trace("%s: %s Waiting til done", q.typ, q.name)
        q.Wait()
 
        log.Trace("%s: %s Waiting til cleaned", q.typ, q.name)
-       ctx, cancel := context.WithCancel(context.Background())
-       atTerminate(ctx, cancel)
-       q.CleanUp(ctx)
-       cancel()
+       q.CleanUp(q.terminateCtx)
+       q.terminateCtxCancel()
 }
 
+const maxBackOffTime = time.Second * 3
+
 func (q *ByteFIFOQueue) readToChan() {
        // handle quick cancels
        select {
-       case <-q.closed:
+       case <-q.shutdownCtx.Done():
                // tell the pool to shutdown.
-               q.cancel()
+               q.baseCtxCancel()
                return
        default:
        }
 
+       // Default backoff values
        backOffTime := time.Millisecond * 100
-       maxBackOffTime := time.Second * 3
-       for {
-               success, resetBackoff := q.doPop()
-               if resetBackoff {
-                       backOffTime = 100 * time.Millisecond
-               }
 
-               if success {
+loop:
+       for {
+               err := q.doPop()
+               if err == errQueueEmpty {
+                       log.Trace("%s: %s Waiting on Empty", q.typ, q.name)
                        select {
-                       case <-q.closed:
-                               // tell the pool to shutdown.
-                               q.cancel()
+                       case <-q.pushed:
+                               // reset backOffTime
+                               backOffTime = 100 * time.Millisecond
+                               continue loop
+                       case <-q.shutdownCtx.Done():
+                               // Oops we've been shutdown whilst waiting
+                               // Make sure the worker pool is shutdown too
+                               q.baseCtxCancel()
                                return
-                       default:
                        }
-               } else {
+               }
+
+               // Reset the backOffTime if there is no error or an unmarshalError
+               if err == nil || err == errUnmarshal {
+                       backOffTime = 100 * time.Millisecond
+               }
+
+               if err != nil {
+                       // Need to Backoff
                        select {
-                       case <-q.closed:
-                               // tell the pool to shutdown.
-                               q.cancel()
+                       case <-q.shutdownCtx.Done():
+                               // Oops we've been shutdown whilst backing off
+                               // Make sure the worker pool is shutdown too
+                               q.baseCtxCancel()
                                return
                        case <-time.After(backOffTime):
-                       }
-                       backOffTime += backOffTime / 2
-                       if backOffTime > maxBackOffTime {
-                               backOffTime = maxBackOffTime
+                               // OK we've waited - so backoff a bit
+                               backOffTime += backOffTime / 2
+                               if backOffTime > maxBackOffTime {
+                                       backOffTime = maxBackOffTime
+                               }
+                               continue loop
                        }
                }
+               select {
+               case <-q.shutdownCtx.Done():
+                       // Oops we've been shutdown
+                       // Make sure the worker pool is shutdown too
+                       q.baseCtxCancel()
+                       return
+               default:
+                       continue loop
+               }
        }
 }
 
-func (q *ByteFIFOQueue) doPop() (success, resetBackoff bool) {
+var errQueueEmpty = fmt.Errorf("empty queue")
+var errEmptyBytes = fmt.Errorf("empty bytes")
+var errUnmarshal = fmt.Errorf("failed to unmarshal")
+
+func (q *ByteFIFOQueue) doPop() error {
        q.lock.Lock()
        defer q.lock.Unlock()
-       bs, err := q.byteFIFO.Pop()
+       bs, err := q.byteFIFO.Pop(q.shutdownCtx)
        if err != nil {
+               if err == context.Canceled {
+                       q.baseCtxCancel()
+                       return err
+               }
                log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
-               return
+               return err
        }
        if len(bs) == 0 {
-               return
+               if q.waitOnEmpty && q.byteFIFO.Len(q.shutdownCtx) == 0 {
+                       return errQueueEmpty
+               }
+               return errEmptyBytes
        }
 
-       resetBackoff = true
-
        data, err := unmarshalAs(bs, q.exemplar)
        if err != nil {
                log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
-               return
+               return errUnmarshal
        }
 
        log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
        q.WorkerPool.Push(data)
-       success = true
-       return
+       return nil
 }
 
 // Shutdown processing from this queue
 func (q *ByteFIFOQueue) Shutdown() {
        log.Trace("%s: %s Shutting down", q.typ, q.name)
-       q.lock.Lock()
        select {
-       case <-q.closed:
+       case <-q.shutdownCtx.Done():
+               return
        default:
-               close(q.closed)
        }
-       q.lock.Unlock()
+       q.shutdownCtxCancel()
        log.Debug("%s: %s Shutdown", q.typ, q.name)
 }
 
 // IsShutdown returns a channel which is closed when this Queue is shutdown
 func (q *ByteFIFOQueue) IsShutdown() <-chan struct{} {
-       return q.closed
+       return q.shutdownCtx.Done()
 }
 
 // Terminate this queue and close the queue
 func (q *ByteFIFOQueue) Terminate() {
        log.Trace("%s: %s Terminating", q.typ, q.name)
        q.Shutdown()
-       q.lock.Lock()
        select {
-       case <-q.terminated:
-               q.lock.Unlock()
+       case <-q.terminateCtx.Done():
                return
        default:
        }
-       close(q.terminated)
-       q.lock.Unlock()
        if log.IsDebug() {
-               log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len())
+               log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len(q.terminateCtx))
        }
+       q.terminateCtxCancel()
        if err := q.byteFIFO.Close(); err != nil {
                log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err)
        }
@@ -223,7 +268,7 @@ func (q *ByteFIFOQueue) Terminate() {
 
 // IsTerminated returns a channel which is closed when this Queue is terminated
 func (q *ByteFIFOQueue) IsTerminated() <-chan struct{} {
-       return q.terminated
+       return q.terminateCtx.Done()
 }
 
 var _ UniqueQueue = &ByteFIFOUniqueQueue{}
@@ -240,17 +285,21 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun
                return nil, err
        }
        config := configInterface.(ByteFIFOQueueConfiguration)
+       terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
+       shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
 
        return &ByteFIFOUniqueQueue{
                ByteFIFOQueue: ByteFIFOQueue{
-                       WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
-                       byteFIFO:   byteFIFO,
-                       typ:        typ,
-                       closed:     make(chan struct{}),
-                       terminated: make(chan struct{}),
-                       exemplar:   exemplar,
-                       workers:    config.Workers,
-                       name:       config.Name,
+                       WorkerPool:         NewWorkerPool(handle, config.WorkerPoolConfiguration),
+                       byteFIFO:           byteFIFO,
+                       typ:                typ,
+                       shutdownCtx:        shutdownCtx,
+                       shutdownCtxCancel:  shutdownCtxCancel,
+                       terminateCtx:       terminateCtx,
+                       terminateCtxCancel: terminateCtxCancel,
+                       exemplar:           exemplar,
+                       workers:            config.Workers,
+                       name:               config.Name,
                },
        }, nil
 }
@@ -265,5 +314,5 @@ func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) {
        if err != nil {
                return false, err
        }
-       return q.byteFIFO.(UniqueByteFIFO).Has(bs)
+       return q.byteFIFO.(UniqueByteFIFO).Has(q.terminateCtx, bs)
 }
index d7a11e79f5dc637f4b11386e1bad2ceeee1b9ead..4df64b69ee5ee88387e0b981924535e506442794 100644 (file)
@@ -27,9 +27,13 @@ type ChannelQueueConfiguration struct {
 // It is basically a very thin wrapper around a WorkerPool
 type ChannelQueue struct {
        *WorkerPool
-       exemplar interface{}
-       workers  int
-       name     string
+       shutdownCtx        context.Context
+       shutdownCtxCancel  context.CancelFunc
+       terminateCtx       context.Context
+       terminateCtxCancel context.CancelFunc
+       exemplar           interface{}
+       workers            int
+       name               string
 }
 
 // NewChannelQueue creates a memory channel queue
@@ -42,28 +46,30 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
        if config.BatchLength == 0 {
                config.BatchLength = 1
        }
+
+       terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
+       shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
+
        queue := &ChannelQueue{
-               WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
-               exemplar:   exemplar,
-               workers:    config.Workers,
-               name:       config.Name,
+               WorkerPool:         NewWorkerPool(handle, config.WorkerPoolConfiguration),
+               shutdownCtx:        shutdownCtx,
+               shutdownCtxCancel:  shutdownCtxCancel,
+               terminateCtx:       terminateCtx,
+               terminateCtxCancel: terminateCtxCancel,
+               exemplar:           exemplar,
+               workers:            config.Workers,
+               name:               config.Name,
        }
        queue.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar)
        return queue, nil
 }
 
 // Run starts to run the queue
-func (q *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
-       atShutdown(context.Background(), func() {
-               log.Warn("ChannelQueue: %s is not shutdownable!", q.name)
-       })
-       atTerminate(context.Background(), func() {
-               log.Warn("ChannelQueue: %s is not terminatable!", q.name)
-       })
+func (q *ChannelQueue) Run(atShutdown, atTerminate func(func())) {
+       atShutdown(q.Shutdown)
+       atTerminate(q.Terminate)
        log.Debug("ChannelQueue: %s Starting", q.name)
-       go func() {
-               _ = q.AddWorkers(q.workers, 0)
-       }()
+       _ = q.AddWorkers(q.workers, 0)
 }
 
 // Push will push data into the queue
@@ -75,6 +81,42 @@ func (q *ChannelQueue) Push(data Data) error {
        return nil
 }
 
+// Shutdown processing from this queue
+func (q *ChannelQueue) Shutdown() {
+       q.lock.Lock()
+       defer q.lock.Unlock()
+       select {
+       case <-q.shutdownCtx.Done():
+               log.Trace("ChannelQueue: %s Already Shutting down", q.name)
+               return
+       default:
+       }
+       log.Trace("ChannelQueue: %s Shutting down", q.name)
+       go func() {
+               log.Trace("ChannelQueue: %s Flushing", q.name)
+               if err := q.FlushWithContext(q.terminateCtx); err != nil {
+                       log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name)
+                       return
+               }
+               log.Debug("ChannelQueue: %s Flushed", q.name)
+       }()
+       q.shutdownCtxCancel()
+       log.Debug("ChannelQueue: %s Shutdown", q.name)
+}
+
+// Terminate this queue and close the queue
+func (q *ChannelQueue) Terminate() {
+       log.Trace("ChannelQueue: %s Terminating", q.name)
+       q.Shutdown()
+       select {
+       case <-q.terminateCtx.Done():
+               return
+       default:
+       }
+       q.terminateCtxCancel()
+       log.Debug("ChannelQueue: %s Terminated", q.name)
+}
+
 // Name returns the name of this queue
 func (q *ChannelQueue) Name() string {
        return q.name
index bca81d50fdaeaef69f90815f7e9cd1fa77202d61..e7abe5b50b76451b04cdd2ed06ecc38817174042 100644 (file)
@@ -5,7 +5,6 @@
 package queue
 
 import (
-       "context"
        "testing"
        "time"
 
@@ -21,7 +20,7 @@ func TestChannelQueue(t *testing.T) {
                }
        }
 
-       nilFn := func(_ context.Context, _ func()) {}
+       nilFn := func(_ func()) {}
 
        queue, err := NewChannelQueue(handle,
                ChannelQueueConfiguration{
@@ -61,7 +60,7 @@ func TestChannelQueue_Batch(t *testing.T) {
                }
        }
 
-       nilFn := func(_ context.Context, _ func()) {}
+       nilFn := func(_ func()) {}
 
        queue, err := NewChannelQueue(handle,
                ChannelQueueConfiguration{
index 6c15a8e63be2901dc36e424126e6bf1bc24eace0..911233a5d9a011aa136f7c634b3ec6ed28e19695 100644 (file)
@@ -5,6 +5,8 @@
 package queue
 
 import (
+       "context"
+
        "code.gitea.io/gitea/modules/nosql"
 
        "gitea.com/lunny/levelqueue"
@@ -37,6 +39,7 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
        if len(config.ConnectionString) == 0 {
                config.ConnectionString = config.DataDir
        }
+       config.WaitOnEmpty = true
 
        byteFIFO, err := NewLevelQueueByteFIFO(config.ConnectionString, config.QueueName)
        if err != nil {
@@ -82,7 +85,7 @@ func NewLevelQueueByteFIFO(connection, prefix string) (*LevelQueueByteFIFO, erro
 }
 
 // PushFunc will push data into the fifo
-func (fifo *LevelQueueByteFIFO) PushFunc(data []byte, fn func() error) error {
+func (fifo *LevelQueueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
        if fn != nil {
                if err := fn(); err != nil {
                        return err
@@ -92,7 +95,7 @@ func (fifo *LevelQueueByteFIFO) PushFunc(data []byte, fn func() error) error {
 }
 
 // Pop pops data from the start of the fifo
-func (fifo *LevelQueueByteFIFO) Pop() ([]byte, error) {
+func (fifo *LevelQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) {
        data, err := fifo.internal.RPop()
        if err != nil && err != levelqueue.ErrNotFound {
                return nil, err
@@ -108,7 +111,7 @@ func (fifo *LevelQueueByteFIFO) Close() error {
 }
 
 // Len returns the length of the fifo
-func (fifo *LevelQueueByteFIFO) Len() int64 {
+func (fifo *LevelQueueByteFIFO) Len(ctx context.Context) int64 {
        return fifo.internal.Len()
 }
 
index 801fd8a12235c7593b78573c51707d7f8e120650..c3a1c5781ef09df4397ce19c5dd4a28646a16c2f 100644 (file)
@@ -133,8 +133,9 @@ func (q *PersistableChannelQueue) Push(data Data) error {
 }
 
 // Run starts to run the queue
-func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
        log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name)
+       _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
 
        q.lock.Lock()
        if q.internal == nil {
@@ -147,34 +148,32 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte
        } else {
                q.lock.Unlock()
        }
-       atShutdown(context.Background(), q.Shutdown)
-       atTerminate(context.Background(), q.Terminate)
+       atShutdown(q.Shutdown)
+       atTerminate(q.Terminate)
 
-       // Just run the level queue - we shut it down later
-       go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
-
-       go func() {
-               _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
-       }()
+       if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.shutdownCtx) != 0 {
+               // Just run the level queue - we shut it down once it's flushed
+               go q.internal.Run(func(_ func()) {}, func(_ func()) {})
+               go func() {
+                       for !q.IsEmpty() {
+                               _ = q.internal.Flush(0)
+                               select {
+                               case <-time.After(100 * time.Millisecond):
+                               case <-q.internal.(*LevelQueue).shutdownCtx.Done():
+                                       log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name())
+                                       return
+                               }
+                       }
+                       log.Debug("LevelQueue: %s flushed so shutting down", q.internal.(*LevelQueue).Name())
+                       q.internal.(*LevelQueue).Shutdown()
+                       GetManager().Remove(q.internal.(*LevelQueue).qid)
+               }()
+       } else {
+               log.Debug("PersistableChannelQueue: %s Skipping running the empty level queue", q.delayedStarter.name)
+               q.internal.(*LevelQueue).Shutdown()
+               GetManager().Remove(q.internal.(*LevelQueue).qid)
+       }
 
-       log.Trace("PersistableChannelQueue: %s Waiting til closed", q.delayedStarter.name)
-       <-q.closed
-       log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name)
-       q.channelQueue.cancel()
-       q.internal.(*LevelQueue).cancel()
-       log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name)
-       q.channelQueue.Wait()
-       q.internal.(*LevelQueue).Wait()
-       // Redirect all remaining data in the chan to the internal channel
-       go func() {
-               log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
-               for data := range q.channelQueue.dataChan {
-                       _ = q.internal.Push(data)
-                       atomic.AddInt64(&q.channelQueue.numInQueue, -1)
-               }
-               log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
-       }()
-       log.Trace("PersistableChannelQueue: %s Done main loop", q.delayedStarter.name)
 }
 
 // Flush flushes the queue and blocks till the queue is empty
@@ -232,16 +231,37 @@ func (q *PersistableChannelQueue) IsEmpty() bool {
 func (q *PersistableChannelQueue) Shutdown() {
        log.Trace("PersistableChannelQueue: %s Shutting down", q.delayedStarter.name)
        q.lock.Lock()
-       defer q.lock.Unlock()
+
        select {
        case <-q.closed:
+               q.lock.Unlock()
+               return
        default:
-               if q.internal != nil {
-                       q.internal.(*LevelQueue).Shutdown()
-               }
-               close(q.closed)
-               log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
        }
+       q.channelQueue.Shutdown()
+       if q.internal != nil {
+               q.internal.(*LevelQueue).Shutdown()
+       }
+       close(q.closed)
+       q.lock.Unlock()
+
+       log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name)
+       q.channelQueue.baseCtxCancel()
+       q.internal.(*LevelQueue).baseCtxCancel()
+       log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name)
+       q.channelQueue.Wait()
+       q.internal.(*LevelQueue).Wait()
+       // Redirect all remaining data in the chan to the internal channel
+       go func() {
+               log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
+               for data := range q.channelQueue.dataChan {
+                       _ = q.internal.Push(data)
+                       atomic.AddInt64(&q.channelQueue.numInQueue, -1)
+               }
+               log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
+       }()
+
+       log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
 }
 
 // Terminate this queue and close the queue
@@ -250,6 +270,7 @@ func (q *PersistableChannelQueue) Terminate() {
        q.Shutdown()
        q.lock.Lock()
        defer q.lock.Unlock()
+       q.channelQueue.Terminate()
        if q.internal != nil {
                q.internal.(*LevelQueue).Terminate()
        }
index 93061bffc65865c679a4ee6ccc54b51d27ad7575..561f98ca907b6ad68cc43606f81edc559957827f 100644 (file)
@@ -5,10 +5,8 @@
 package queue
 
 import (
-       "context"
        "io/ioutil"
        "testing"
-       "time"
 
        "code.gitea.io/gitea/modules/util"
        "github.com/stretchr/testify/assert"
@@ -32,17 +30,19 @@ func TestPersistableChannelQueue(t *testing.T) {
        defer util.RemoveAll(tmpDir)
 
        queue, err := NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
-               DataDir:     tmpDir,
-               BatchLength: 2,
-               QueueLength: 20,
-               Workers:     1,
-               MaxWorkers:  10,
+               DataDir:      tmpDir,
+               BatchLength:  2,
+               QueueLength:  20,
+               Workers:      1,
+               BoostWorkers: 0,
+               MaxWorkers:   10,
+               Name:         "first",
        }, &testData{})
        assert.NoError(t, err)
 
-       go queue.Run(func(_ context.Context, shutdown func()) {
+       go queue.Run(func(shutdown func()) {
                queueShutdown = append(queueShutdown, shutdown)
-       }, func(_ context.Context, terminate func()) {
+       }, func(terminate func()) {
                queueTerminate = append(queueTerminate, terminate)
        })
 
@@ -64,13 +64,18 @@ func TestPersistableChannelQueue(t *testing.T) {
        assert.Equal(t, test2.TestString, result2.TestString)
        assert.Equal(t, test2.TestInt, result2.TestInt)
 
+       // test1 is a testData not a *testData so will be rejected
        err = queue.Push(test1)
        assert.Error(t, err)
 
+       // Now shutdown the queue
        for _, callback := range queueShutdown {
                callback()
        }
-       time.Sleep(200 * time.Millisecond)
+
+       // Wait til it is closed
+       <-queue.(*PersistableChannelQueue).closed
+
        err = queue.Push(&test1)
        assert.NoError(t, err)
        err = queue.Push(&test2)
@@ -80,23 +85,33 @@ func TestPersistableChannelQueue(t *testing.T) {
                assert.Fail(t, "Handler processing should have stopped")
        default:
        }
+
+       // terminate the queue
        for _, callback := range queueTerminate {
                callback()
        }
 
+       select {
+       case <-handleChan:
+               assert.Fail(t, "Handler processing should have stopped")
+       default:
+       }
+
        // Reopen queue
        queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
-               DataDir:     tmpDir,
-               BatchLength: 2,
-               QueueLength: 20,
-               Workers:     1,
-               MaxWorkers:  10,
+               DataDir:      tmpDir,
+               BatchLength:  2,
+               QueueLength:  20,
+               Workers:      1,
+               BoostWorkers: 0,
+               MaxWorkers:   10,
+               Name:         "second",
        }, &testData{})
        assert.NoError(t, err)
 
-       go queue.Run(func(_ context.Context, shutdown func()) {
+       go queue.Run(func(shutdown func()) {
                queueShutdown = append(queueShutdown, shutdown)
-       }, func(_ context.Context, terminate func()) {
+       }, func(terminate func()) {
                queueTerminate = append(queueTerminate, terminate)
        })
 
index edaed49a52396f5a78a0da64b0762d4884186326..1f884d4f8d76db1fef0db10b07893a1de6b4f3af 100644 (file)
@@ -5,7 +5,6 @@
 package queue
 
 import (
-       "context"
        "io/ioutil"
        "sync"
        "testing"
@@ -49,11 +48,11 @@ func TestLevelQueue(t *testing.T) {
        }, &testData{})
        assert.NoError(t, err)
 
-       go queue.Run(func(_ context.Context, shutdown func()) {
+       go queue.Run(func(shutdown func()) {
                lock.Lock()
                queueShutdown = append(queueShutdown, shutdown)
                lock.Unlock()
-       }, func(_ context.Context, terminate func()) {
+       }, func(terminate func()) {
                lock.Lock()
                queueTerminate = append(queueTerminate, terminate)
                lock.Unlock()
@@ -123,11 +122,11 @@ func TestLevelQueue(t *testing.T) {
                }, &testData{})
        assert.NoError(t, err)
 
-       go queue.Run(func(_ context.Context, shutdown func()) {
+       go queue.Run(func(shutdown func()) {
                lock.Lock()
                queueShutdown = append(queueShutdown, shutdown)
                lock.Unlock()
-       }, func(_ context.Context, terminate func()) {
+       }, func(terminate func()) {
                lock.Lock()
                queueTerminate = append(queueTerminate, terminate)
                lock.Unlock()
index af2cc30335b78785e0dd19e428329abc26093514..a5fb866dc1e11aa6f09bdc1711a30875b4834717 100644 (file)
@@ -6,7 +6,6 @@ package queue
 
 import (
        "context"
-       "fmt"
 
        "code.gitea.io/gitea/modules/graceful"
        "code.gitea.io/gitea/modules/log"
@@ -47,8 +46,6 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
                return nil, err
        }
 
-       byteFIFO.ctx = graceful.NewChannelContext(byteFIFOQueue.IsTerminated(), fmt.Errorf("queue has been terminated"))
-
        queue := &RedisQueue{
                ByteFIFOQueue: byteFIFOQueue,
        }
@@ -73,8 +70,8 @@ var _ ByteFIFO = &RedisByteFIFO{}
 
 // RedisByteFIFO represents a ByteFIFO formed from a redisClient
 type RedisByteFIFO struct {
-       ctx       context.Context
-       client    redisClient
+       client redisClient
+
        queueName string
 }
 
@@ -89,7 +86,6 @@ func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error)
        fifo := &RedisByteFIFO{
                queueName: config.QueueName,
        }
-       fifo.ctx = graceful.GetManager().TerminateContext()
        fifo.client = nosql.GetManager().GetRedisClient(config.ConnectionString)
        if err := fifo.client.Ping(graceful.GetManager().ShutdownContext()).Err(); err != nil {
                return nil, err
@@ -98,18 +94,18 @@ func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error)
 }
 
 // PushFunc pushes data to the end of the fifo and calls the callback if it is added
-func (fifo *RedisByteFIFO) PushFunc(data []byte, fn func() error) error {
+func (fifo *RedisByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
        if fn != nil {
                if err := fn(); err != nil {
                        return err
                }
        }
-       return fifo.client.RPush(fifo.ctx, fifo.queueName, data).Err()
+       return fifo.client.RPush(ctx, fifo.queueName, data).Err()
 }
 
 // Pop pops data from the start of the fifo
-func (fifo *RedisByteFIFO) Pop() ([]byte, error) {
-       data, err := fifo.client.LPop(fifo.ctx, fifo.queueName).Bytes()
+func (fifo *RedisByteFIFO) Pop(ctx context.Context) ([]byte, error) {
+       data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes()
        if err == nil || err == redis.Nil {
                return data, nil
        }
@@ -122,8 +118,8 @@ func (fifo *RedisByteFIFO) Close() error {
 }
 
 // Len returns the length of the fifo
-func (fifo *RedisByteFIFO) Len() int64 {
-       val, err := fifo.client.LLen(fifo.ctx, fifo.queueName).Result()
+func (fifo *RedisByteFIFO) Len(ctx context.Context) int64 {
+       val, err := fifo.client.LLen(ctx, fifo.queueName).Result()
        if err != nil {
                log.Error("Error whilst getting length of redis queue %s: Error: %v", fifo.queueName, err)
                return -1
index 88d64e82464f4c8d5bd13ae6e2e091a80c65aa10..ec30ab028197229d534978c92c7e53f818470cb8 100644 (file)
@@ -38,7 +38,7 @@ type delayedStarter struct {
 }
 
 // setInternal must be called with the lock locked.
-func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), handle HandlerFunc, exemplar interface{}) error {
+func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc, exemplar interface{}) error {
        var ctx context.Context
        var cancel context.CancelFunc
        if q.timeout > 0 {
@@ -49,9 +49,7 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h
 
        defer cancel()
        // Ensure we also stop at shutdown
-       atShutdown(ctx, func() {
-               cancel()
-       })
+       atShutdown(cancel)
 
        i := 1
        for q.internal == nil {
@@ -221,7 +219,7 @@ func (q *WrappedQueue) IsEmpty() bool {
 }
 
 // Run starts to run the queue and attempts to create the internal queue
-func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+func (q *WrappedQueue) Run(atShutdown, atTerminate func(func())) {
        log.Debug("WrappedQueue: %s Starting", q.name)
        q.lock.Lock()
        if q.internal == nil {
index dec1cfc5c06e3ab6829e4f8aa5e5fc00c93085f6..5bec67c4d355cc669d008f853360f16c61499005 100644 (file)
@@ -28,11 +28,15 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration
 // only guaranteed whilst the task is waiting in the queue.
 type ChannelUniqueQueue struct {
        *WorkerPool
-       lock     sync.Mutex
-       table    map[Data]bool
-       exemplar interface{}
-       workers  int
-       name     string
+       lock               sync.Mutex
+       table              map[Data]bool
+       shutdownCtx        context.Context
+       shutdownCtxCancel  context.CancelFunc
+       terminateCtx       context.Context
+       terminateCtxCancel context.CancelFunc
+       exemplar           interface{}
+       workers            int
+       name               string
 }
 
 // NewChannelUniqueQueue create a memory channel queue
@@ -45,11 +49,19 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
        if config.BatchLength == 0 {
                config.BatchLength = 1
        }
+
+       terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
+       shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
+
        queue := &ChannelUniqueQueue{
-               table:    map[Data]bool{},
-               exemplar: exemplar,
-               workers:  config.Workers,
-               name:     config.Name,
+               table:              map[Data]bool{},
+               shutdownCtx:        shutdownCtx,
+               shutdownCtxCancel:  shutdownCtxCancel,
+               terminateCtx:       terminateCtx,
+               terminateCtxCancel: terminateCtxCancel,
+               exemplar:           exemplar,
+               workers:            config.Workers,
+               name:               config.Name,
        }
        queue.WorkerPool = NewWorkerPool(func(data ...Data) {
                for _, datum := range data {
@@ -65,17 +77,11 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
 }
 
 // Run starts to run the queue
-func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
-       atShutdown(context.Background(), func() {
-               log.Warn("ChannelUniqueQueue: %s is not shutdownable!", q.name)
-       })
-       atTerminate(context.Background(), func() {
-               log.Warn("ChannelUniqueQueue: %s is not terminatable!", q.name)
-       })
+func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) {
+       atShutdown(q.Shutdown)
+       atTerminate(q.Terminate)
        log.Debug("ChannelUniqueQueue: %s Starting", q.name)
-       go func() {
-               _ = q.AddWorkers(q.workers, 0)
-       }()
+       _ = q.AddWorkers(q.workers, 0)
 }
 
 // Push will push data into the queue if the data is not already in the queue
@@ -122,6 +128,39 @@ func (q *ChannelUniqueQueue) Has(data Data) (bool, error) {
        return has, nil
 }
 
+// Shutdown processing from this queue
+func (q *ChannelUniqueQueue) Shutdown() {
+       log.Trace("ChannelUniqueQueue: %s Shutting down", q.name)
+       select {
+       case <-q.shutdownCtx.Done():
+               return
+       default:
+       }
+       go func() {
+               log.Trace("ChannelUniqueQueue: %s Flushing", q.name)
+               if err := q.FlushWithContext(q.terminateCtx); err != nil {
+                       log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name)
+                       return
+               }
+               log.Debug("ChannelUniqueQueue: %s Flushed", q.name)
+       }()
+       q.shutdownCtxCancel()
+       log.Debug("ChannelUniqueQueue: %s Shutdown", q.name)
+}
+
+// Terminate this queue and close the queue
+func (q *ChannelUniqueQueue) Terminate() {
+       log.Trace("ChannelUniqueQueue: %s Terminating", q.name)
+       q.Shutdown()
+       select {
+       case <-q.terminateCtx.Done():
+               return
+       default:
+       }
+       q.terminateCtxCancel()
+       log.Debug("ChannelUniqueQueue: %s Terminated", q.name)
+}
+
 // Name returns the name of this queue
 func (q *ChannelUniqueQueue) Name() string {
        return q.name
index 8ec8848bc498b48cbc6bffaecb5f96e6f28c562d..bb0eb7d950c599db59a2ce3cc8376d2c4c6e6fbd 100644 (file)
@@ -5,6 +5,8 @@
 package queue
 
 import (
+       "context"
+
        "code.gitea.io/gitea/modules/nosql"
 
        "gitea.com/lunny/levelqueue"
@@ -41,6 +43,7 @@ func NewLevelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue,
        if len(config.ConnectionString) == 0 {
                config.ConnectionString = config.DataDir
        }
+       config.WaitOnEmpty = true
 
        byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.ConnectionString, config.QueueName)
        if err != nil {
@@ -86,12 +89,12 @@ func NewLevelUniqueQueueByteFIFO(connection, prefix string) (*LevelUniqueQueueBy
 }
 
 // PushFunc pushes data to the end of the fifo and calls the callback if it is added
-func (fifo *LevelUniqueQueueByteFIFO) PushFunc(data []byte, fn func() error) error {
+func (fifo *LevelUniqueQueueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
        return fifo.internal.LPushFunc(data, fn)
 }
 
 // Pop pops data from the start of the fifo
-func (fifo *LevelUniqueQueueByteFIFO) Pop() ([]byte, error) {
+func (fifo *LevelUniqueQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) {
        data, err := fifo.internal.RPop()
        if err != nil && err != levelqueue.ErrNotFound {
                return nil, err
@@ -100,12 +103,12 @@ func (fifo *LevelUniqueQueueByteFIFO) Pop() ([]byte, error) {
 }
 
 // Len returns the length of the fifo
-func (fifo *LevelUniqueQueueByteFIFO) Len() int64 {
+func (fifo *LevelUniqueQueueByteFIFO) Len(ctx context.Context) int64 {
        return fifo.internal.Len()
 }
 
 // Has returns whether the fifo contains this data
-func (fifo *LevelUniqueQueueByteFIFO) Has(data []byte) (bool, error) {
+func (fifo *LevelUniqueQueueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) {
        return fifo.internal.Has(data)
 }
 
index 47c4f2bdd574da2eedcf26f6637350bebe7e4a42..65a39415199543f64a0f78188de29bf05dfd0676 100644 (file)
@@ -36,7 +36,7 @@ type PersistableChannelUniqueQueueConfiguration struct {
 // task cannot be processed twice or more at the same time. Uniqueness is
 // only guaranteed whilst the task is waiting in the queue.
 type PersistableChannelUniqueQueue struct {
-       *ChannelUniqueQueue
+       channelQueue *ChannelUniqueQueue
        delayedStarter
        lock   sync.Mutex
        closed chan struct{}
@@ -85,8 +85,8 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
        }
 
        queue := &PersistableChannelUniqueQueue{
-               ChannelUniqueQueue: channelUniqueQueue.(*ChannelUniqueQueue),
-               closed:             make(chan struct{}),
+               channelQueue: channelUniqueQueue.(*ChannelUniqueQueue),
+               closed:       make(chan struct{}),
        }
 
        levelQueue, err := NewLevelUniqueQueue(func(data ...Data) {
@@ -138,14 +138,14 @@ func (q *PersistableChannelUniqueQueue) PushFunc(data Data, fn func() error) err
        case <-q.closed:
                return q.internal.(UniqueQueue).PushFunc(data, fn)
        default:
-               return q.ChannelUniqueQueue.PushFunc(data, fn)
+               return q.channelQueue.PushFunc(data, fn)
        }
 }
 
 // Has will test if the queue has the data
 func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) {
        // This is more difficult...
-       has, err := q.ChannelUniqueQueue.Has(data)
+       has, err := q.channelQueue.Has(data)
        if err != nil || has {
                return has, err
        }
@@ -158,7 +158,7 @@ func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) {
 }
 
 // Run starts to run the queue
-func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) {
        log.Debug("PersistableChannelUniqueQueue: %s Starting", q.delayedStarter.name)
 
        q.lock.Lock()
@@ -170,7 +170,7 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context
                                        log.Error("Unable push to channelled queue: %v", err)
                                }
                        }
-               }, q.exemplar)
+               }, q.channelQueue.exemplar)
                q.lock.Unlock()
                if err != nil {
                        log.Fatal("Unable to create internal queue for %s Error: %v", q.Name(), err)
@@ -179,53 +179,73 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context
        } else {
                q.lock.Unlock()
        }
-       atShutdown(context.Background(), q.Shutdown)
-       atTerminate(context.Background(), q.Terminate)
+       atShutdown(q.Shutdown)
+       atTerminate(q.Terminate)
+       _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
 
-       // Just run the level queue - we shut it down later
-       go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
-
-       go func() {
-               _ = q.ChannelUniqueQueue.AddWorkers(q.workers, 0)
-       }()
+       if luq, ok := q.internal.(*LevelUniqueQueue); ok && luq.ByteFIFOUniqueQueue.byteFIFO.Len(luq.shutdownCtx) != 0 {
+               // Just run the level queue - we shut it down once it's flushed
+               go q.internal.Run(func(_ func()) {}, func(_ func()) {})
+               go func() {
+                       _ = q.internal.Flush(0)
+                       log.Debug("LevelUniqueQueue: %s flushed so shutting down", q.internal.(*LevelQueue).Name())
+                       q.internal.(*LevelUniqueQueue).Shutdown()
+                       GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
+               }()
+       } else {
+               log.Debug("PersistableChannelUniqueQueue: %s Skipping running the empty level queue", q.delayedStarter.name)
+               q.internal.(*LevelUniqueQueue).Shutdown()
+               GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
+       }
 
-       log.Trace("PersistableChannelUniqueQueue: %s Waiting til closed", q.delayedStarter.name)
-       <-q.closed
-       log.Trace("PersistableChannelUniqueQueue: %s Cancelling pools", q.delayedStarter.name)
-       q.internal.(*LevelUniqueQueue).cancel()
-       q.ChannelUniqueQueue.cancel()
-       log.Trace("PersistableChannelUniqueQueue: %s Waiting til done", q.delayedStarter.name)
-       q.ChannelUniqueQueue.Wait()
-       q.internal.(*LevelUniqueQueue).Wait()
-       // Redirect all remaining data in the chan to the internal channel
-       go func() {
-               log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
-               for data := range q.ChannelUniqueQueue.dataChan {
-                       _ = q.internal.Push(data)
-               }
-               log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
-       }()
-       log.Trace("PersistableChannelUniqueQueue: %s Done main loop", q.delayedStarter.name)
 }
 
 // Flush flushes the queue
 func (q *PersistableChannelUniqueQueue) Flush(timeout time.Duration) error {
-       return q.ChannelUniqueQueue.Flush(timeout)
+       return q.channelQueue.Flush(timeout)
+}
+
+// FlushWithContext flushes the queue
+func (q *PersistableChannelUniqueQueue) FlushWithContext(ctx context.Context) error {
+       return q.channelQueue.FlushWithContext(ctx)
+}
+
+// IsEmpty checks if a queue is empty
+func (q *PersistableChannelUniqueQueue) IsEmpty() bool {
+       return q.channelQueue.IsEmpty()
 }
 
 // Shutdown processing this queue
 func (q *PersistableChannelUniqueQueue) Shutdown() {
        log.Trace("PersistableChannelUniqueQueue: %s Shutting down", q.delayedStarter.name)
        q.lock.Lock()
-       defer q.lock.Unlock()
        select {
        case <-q.closed:
+               q.lock.Unlock()
+               return
        default:
                if q.internal != nil {
                        q.internal.(*LevelUniqueQueue).Shutdown()
                }
                close(q.closed)
+               q.lock.Unlock()
        }
+
+       log.Trace("PersistableChannelUniqueQueue: %s Cancelling pools", q.delayedStarter.name)
+       q.internal.(*LevelUniqueQueue).baseCtxCancel()
+       q.channelQueue.baseCtxCancel()
+       log.Trace("PersistableChannelUniqueQueue: %s Waiting til done", q.delayedStarter.name)
+       q.channelQueue.Wait()
+       q.internal.(*LevelUniqueQueue).Wait()
+       // Redirect all remaining data in the chan to the internal channel
+       go func() {
+               log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
+               for data := range q.channelQueue.dataChan {
+                       _ = q.internal.Push(data)
+               }
+               log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
+       }()
+
        log.Debug("PersistableChannelUniqueQueue: %s Shutdown", q.delayedStarter.name)
 }
 
index 20a50cc1f235f50b4ae7ddb042f7cab594878bfc..7474c096655d3b596c552295a11c74fc8b2a4f31 100644 (file)
@@ -5,9 +5,8 @@
 package queue
 
 import (
-       "fmt"
+       "context"
 
-       "code.gitea.io/gitea/modules/graceful"
        "github.com/go-redis/redis/v8"
 )
 
@@ -51,8 +50,6 @@ func NewRedisUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue,
                return nil, err
        }
 
-       byteFIFO.ctx = graceful.NewChannelContext(byteFIFOQueue.IsTerminated(), fmt.Errorf("queue has been terminated"))
-
        queue := &RedisUniqueQueue{
                ByteFIFOUniqueQueue: byteFIFOQueue,
        }
@@ -92,8 +89,8 @@ func NewRedisUniqueByteFIFO(config RedisUniqueByteFIFOConfiguration) (*RedisUniq
 }
 
 // PushFunc pushes data to the end of the fifo and calls the callback if it is added
-func (fifo *RedisUniqueByteFIFO) PushFunc(data []byte, fn func() error) error {
-       added, err := fifo.client.SAdd(fifo.ctx, fifo.setName, data).Result()
+func (fifo *RedisUniqueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
+       added, err := fifo.client.SAdd(ctx, fifo.setName, data).Result()
        if err != nil {
                return err
        }
@@ -105,12 +102,12 @@ func (fifo *RedisUniqueByteFIFO) PushFunc(data []byte, fn func() error) error {
                        return err
                }
        }
-       return fifo.client.RPush(fifo.ctx, fifo.queueName, data).Err()
+       return fifo.client.RPush(ctx, fifo.queueName, data).Err()
 }
 
 // Pop pops data from the start of the fifo
-func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error) {
-       data, err := fifo.client.LPop(fifo.ctx, fifo.queueName).Bytes()
+func (fifo *RedisUniqueByteFIFO) Pop(ctx context.Context) ([]byte, error) {
+       data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes()
        if err != nil && err != redis.Nil {
                return data, err
        }
@@ -119,13 +116,13 @@ func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error) {
                return data, nil
        }
 
-       err = fifo.client.SRem(fifo.ctx, fifo.setName, data).Err()
+       err = fifo.client.SRem(ctx, fifo.setName, data).Err()
        return data, err
 }
 
 // Has returns whether the fifo contains this data
-func (fifo *RedisUniqueByteFIFO) Has(data []byte) (bool, error) {
-       return fifo.client.SIsMember(fifo.ctx, fifo.setName, data).Result()
+func (fifo *RedisUniqueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) {
+       return fifo.client.SIsMember(ctx, fifo.setName, data).Result()
 }
 
 func init() {
index 0f15ccac9efd70b06b9ab676f2d72dd76fe19b27..0176e2e0b2d20a467c2b5b2fe96afb76d1f476e7 100644 (file)
@@ -21,7 +21,7 @@ import (
 type WorkerPool struct {
        lock               sync.Mutex
        baseCtx            context.Context
-       cancel             context.CancelFunc
+       baseCtxCancel      context.CancelFunc
        cond               *sync.Cond
        qid                int64
        maxNumberOfWorkers int
@@ -52,7 +52,7 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo
        dataChan := make(chan Data, config.QueueLength)
        pool := &WorkerPool{
                baseCtx:            ctx,
-               cancel:             cancel,
+               baseCtxCancel:      cancel,
                batchLength:        config.BatchLength,
                dataChan:           dataChan,
                handle:             handle,
@@ -83,7 +83,7 @@ func (p *WorkerPool) Push(data Data) {
 }
 
 func (p *WorkerPool) zeroBoost() {
-       ctx, cancel := context.WithCancel(p.baseCtx)
+       ctx, cancel := context.WithTimeout(p.baseCtx, p.boostTimeout)
        mq := GetManager().GetManagedQueue(p.qid)
        boost := p.boostWorkers
        if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 {
@@ -94,26 +94,14 @@ func (p *WorkerPool) zeroBoost() {
 
                start := time.Now()
                pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false)
-               go func() {
-                       select {
-                       case <-ctx.Done():
-                       case <-time.After(p.boostTimeout):
-                       }
+               cancel = func() {
                        mq.RemoveWorkers(pid)
-                       cancel()
-               }()
+               }
        } else {
                log.Warn("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout)
-               go func() {
-                       select {
-                       case <-ctx.Done():
-                       case <-time.After(p.boostTimeout):
-                       }
-                       cancel()
-               }()
        }
        p.lock.Unlock()
-       p.addWorkers(ctx, boost)
+       p.addWorkers(ctx, cancel, boost)
 }
 
 func (p *WorkerPool) pushBoost(data Data) {
@@ -140,7 +128,7 @@ func (p *WorkerPool) pushBoost(data Data) {
                                return
                        }
                        p.blockTimeout *= 2
-                       ctx, cancel := context.WithCancel(p.baseCtx)
+                       boostCtx, boostCtxCancel := context.WithCancel(p.baseCtx)
                        mq := GetManager().GetManagedQueue(p.qid)
                        boost := p.boostWorkers
                        if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 {
@@ -150,24 +138,24 @@ func (p *WorkerPool) pushBoost(data Data) {
                                log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout)
 
                                start := time.Now()
-                               pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false)
+                               pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), boostCtxCancel, false)
                                go func() {
-                                       <-ctx.Done()
+                                       <-boostCtx.Done()
                                        mq.RemoveWorkers(pid)
-                                       cancel()
+                                       boostCtxCancel()
                                }()
                        } else {
                                log.Warn("WorkerPool: %d Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout)
                        }
                        go func() {
                                <-time.After(p.boostTimeout)
-                               cancel()
+                               boostCtxCancel()
                                p.lock.Lock()
                                p.blockTimeout /= 2
                                p.lock.Unlock()
                        }()
                        p.lock.Unlock()
-                       p.addWorkers(ctx, boost)
+                       p.addWorkers(boostCtx, boostCtxCancel, boost)
                        p.dataChan <- data
                }
        }
@@ -243,28 +231,25 @@ func (p *WorkerPool) commonRegisterWorkers(number int, timeout time.Duration, is
        mq := GetManager().GetManagedQueue(p.qid)
        if mq != nil {
                pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel, isFlusher)
-               go func() {
-                       <-ctx.Done()
-                       mq.RemoveWorkers(pid)
-                       cancel()
-               }()
                log.Trace("WorkerPool: %d (for %s) adding %d workers with group id: %d", p.qid, mq.Name, number, pid)
-       } else {
-               log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number)
-
+               return ctx, func() {
+                       mq.RemoveWorkers(pid)
+               }
        }
+       log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number)
+
        return ctx, cancel
 }
 
 // AddWorkers adds workers to the pool - this allows the number of workers to go above the limit
 func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
        ctx, cancel := p.commonRegisterWorkers(number, timeout, false)
-       p.addWorkers(ctx, number)
+       p.addWorkers(ctx, cancel, number)
        return cancel
 }
 
 // addWorkers adds workers to the pool
-func (p *WorkerPool) addWorkers(ctx context.Context, number int) {
+func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc, number int) {
        for i := 0; i < number; i++ {
                p.lock.Lock()
                if p.cond == nil {
@@ -279,11 +264,13 @@ func (p *WorkerPool) addWorkers(ctx context.Context, number int) {
                        p.numberOfWorkers--
                        if p.numberOfWorkers == 0 {
                                p.cond.Broadcast()
+                               cancel()
                        } else if p.numberOfWorkers < 0 {
                                // numberOfWorkers can't go negative but...
                                log.Warn("Number of Workers < 0 for QID %d - this shouldn't happen", p.qid)
                                p.numberOfWorkers = 0
                                p.cond.Broadcast()
+                               cancel()
                        }
                        p.lock.Unlock()
                }()
index 33a230e5ab86b9fdd98eb83710312d9fc6c82990..f6614ea0ad27fadf83c99c6f460d11d649682fe9 100644 (file)
@@ -6,7 +6,6 @@
 package pull
 
 import (
-       "context"
        "strconv"
        "testing"
        "time"
@@ -54,9 +53,9 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) {
        assert.True(t, has)
        assert.NoError(t, err)
 
-       prQueue.Run(func(_ context.Context, shutdown func()) {
+       prQueue.Run(func(shutdown func()) {
                queueShutdown = append(queueShutdown, shutdown)
-       }, func(_ context.Context, terminate func()) {
+       }, func(terminate func()) {
                queueTerminate = append(queueTerminate, terminate)
        })