summaryrefslogtreecommitdiffstats
path: root/modules
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2021-05-15 15:22:26 +0100
committerGitHub <noreply@github.com>2021-05-15 16:22:26 +0200
commitba526ceffe33a54b6015cdfbdc9bba920484dc23 (patch)
treeddd9ff13b0da7b272b5a60445a997319cb0de882 /modules
parent9f19c2b8cca9edf2ad7b8803e6ed72b1aea322a5 (diff)
downloadgitea-ba526ceffe33a54b6015cdfbdc9bba920484dc23.tar.gz
gitea-ba526ceffe33a54b6015cdfbdc9bba920484dc23.zip
Multiple Queue improvements: LevelDB Wait on empty, shutdown empty shadow level queue, reduce goroutines etc (#15693)
* move shutdownfns, terminatefns and hammerfns out of separate goroutines Coalesce the shutdownfns etc into a list of functions that get run at shutdown rather then have them run at goroutines blocked on selects. This may help reduce the background select/poll load in certain configurations. * The LevelDB queues can actually wait on empty instead of polling Slight refactor to cause leveldb queues to wait on empty instead of polling. * Shutdown the shadow level queue once it is empty * Remove bytefifo additional goroutine for readToChan as it can just be run in run * Remove additional removeWorkers goroutine for workers * Simplify the AtShutdown and AtTerminate functions and add Channel Flusher * Add shutdown flusher to CUQ * move persistable channel shutdown stuff to Shutdown Fn * Ensure that UPCQ has the correct config * handle shutdown during the flushing * reduce risk of race between zeroBoost and addWorkers * prevent double shutdown Signed-off-by: Andrew Thornton <art27@cantab.net>
Diffstat (limited to 'modules')
-rw-r--r--modules/graceful/context.go23
-rw-r--r--modules/graceful/manager.go154
-rw-r--r--modules/graceful/manager_unix.go26
-rw-r--r--modules/graceful/manager_windows.go28
-rw-r--r--modules/indexer/code/indexer.go8
-rw-r--r--modules/indexer/issues/indexer.go4
-rw-r--r--modules/queue/bytefifo.go18
-rw-r--r--modules/queue/manager.go6
-rw-r--r--modules/queue/queue.go6
-rw-r--r--modules/queue/queue_bytefifo.go217
-rw-r--r--modules/queue/queue_channel.go76
-rw-r--r--modules/queue/queue_channel_test.go5
-rw-r--r--modules/queue/queue_disk.go9
-rw-r--r--modules/queue/queue_disk_channel.go87
-rw-r--r--modules/queue/queue_disk_channel_test.go49
-rw-r--r--modules/queue/queue_disk_test.go9
-rw-r--r--modules/queue/queue_redis.go20
-rw-r--r--modules/queue/queue_wrapped.go8
-rw-r--r--modules/queue/unique_queue_channel.go77
-rw-r--r--modules/queue/unique_queue_disk.go11
-rw-r--r--modules/queue/unique_queue_disk_channel.go88
-rw-r--r--modules/queue/unique_queue_redis.go21
-rw-r--r--modules/queue/workerpool.go55
23 files changed, 596 insertions, 409 deletions
diff --git a/modules/graceful/context.go b/modules/graceful/context.go
index 1ad1109b4e..9d955329a4 100644
--- a/modules/graceful/context.go
+++ b/modules/graceful/context.go
@@ -6,17 +6,9 @@ package graceful
import (
"context"
- "fmt"
"time"
)
-// Errors for context.Err()
-var (
- ErrShutdown = fmt.Errorf("Graceful Manager called Shutdown")
- ErrHammer = fmt.Errorf("Graceful Manager called Hammer")
- ErrTerminate = fmt.Errorf("Graceful Manager called Terminate")
-)
-
// ChannelContext is a context that wraps a channel and error as a context
type ChannelContext struct {
done <-chan struct{}
@@ -63,28 +55,19 @@ func (ctx *ChannelContext) Value(key interface{}) interface{} {
// Callers using this context should ensure that they are registered as a running server
// in order that they are waited for.
func (g *Manager) ShutdownContext() context.Context {
- return &ChannelContext{
- done: g.IsShutdown(),
- err: ErrShutdown,
- }
+ return g.shutdownCtx
}
// HammerContext returns a context.Context that is Done at hammer
// Callers using this context should ensure that they are registered as a running server
// in order that they are waited for.
func (g *Manager) HammerContext() context.Context {
- return &ChannelContext{
- done: g.IsHammer(),
- err: ErrHammer,
- }
+ return g.hammerCtx
}
// TerminateContext returns a context.Context that is Done at terminate
// Callers using this context should ensure that they are registered as a terminating server
// in order that they are waited for.
func (g *Manager) TerminateContext() context.Context {
- return &ChannelContext{
- done: g.IsTerminate(),
- err: ErrTerminate,
- }
+ return g.terminateCtx
}
diff --git a/modules/graceful/manager.go b/modules/graceful/manager.go
index 903d05ed21..8c3b95c4aa 100644
--- a/modules/graceful/manager.go
+++ b/modules/graceful/manager.go
@@ -54,8 +54,8 @@ func InitManager(ctx context.Context) {
})
}
-// CallbackWithContext is combined runnable and context to watch to see if the caller has finished
-type CallbackWithContext func(ctx context.Context, callback func())
+// WithCallback is a runnable to call when the caller has finished
+type WithCallback func(callback func())
// RunnableWithShutdownFns is a runnable with functions to run at shutdown and terminate
// After the callback to atShutdown is called and is complete, the main function must return.
@@ -63,7 +63,7 @@ type CallbackWithContext func(ctx context.Context, callback func())
// Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals
// - users must therefore be careful to only call these as necessary.
// If run is not expected to run indefinitely RunWithShutdownChan is likely to be more appropriate.
-type RunnableWithShutdownFns func(atShutdown, atTerminate func(context.Context, func()))
+type RunnableWithShutdownFns func(atShutdown, atTerminate func(func()))
// RunWithShutdownFns takes a function that has both atShutdown and atTerminate callbacks
// After the callback to atShutdown is called and is complete, the main function must return.
@@ -80,17 +80,21 @@ func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) {
g.doShutdown()
}
}()
- run(func(ctx context.Context, atShutdown func()) {
- go func() {
- select {
- case <-g.IsShutdown():
+ run(func(atShutdown func()) {
+ g.lock.Lock()
+ defer g.lock.Unlock()
+ g.toRunAtShutdown = append(g.toRunAtShutdown,
+ func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Critical("PANIC during RunWithShutdownFns: %v\nStacktrace: %s", err, log.Stack(2))
+ g.doShutdown()
+ }
+ }()
atShutdown()
- case <-ctx.Done():
- return
- }
- }()
- }, func(ctx context.Context, atTerminate func()) {
- g.RunAtTerminate(ctx, atTerminate)
+ })
+ }, func(atTerminate func()) {
+ g.RunAtTerminate(atTerminate)
})
}
@@ -99,7 +103,7 @@ func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) {
// (Optionally IsHammer may be waited for instead however, this should be avoided if possible.)
// The callback function provided to atTerminate must return once termination is complete.
// Please note that use of the atTerminate function will create a go-routine that will wait till terminate - users must therefore be careful to only call this as necessary.
-type RunnableWithShutdownChan func(atShutdown <-chan struct{}, atTerminate CallbackWithContext)
+type RunnableWithShutdownChan func(atShutdown <-chan struct{}, atTerminate WithCallback)
// RunWithShutdownChan takes a function that has channel to watch for shutdown and atTerminate callbacks
// After the atShutdown channel is closed, the main function must return once shutdown is complete.
@@ -115,8 +119,8 @@ func (g *Manager) RunWithShutdownChan(run RunnableWithShutdownChan) {
g.doShutdown()
}
}()
- run(g.IsShutdown(), func(ctx context.Context, atTerminate func()) {
- g.RunAtTerminate(ctx, atTerminate)
+ run(g.IsShutdown(), func(atTerminate func()) {
+ g.RunAtTerminate(atTerminate)
})
}
@@ -136,60 +140,65 @@ func (g *Manager) RunWithShutdownContext(run func(context.Context)) {
}
// RunAtTerminate adds to the terminate wait group and creates a go-routine to run the provided function at termination
-func (g *Manager) RunAtTerminate(ctx context.Context, terminate func()) {
+func (g *Manager) RunAtTerminate(terminate func()) {
g.terminateWaitGroup.Add(1)
- go func() {
- defer g.terminateWaitGroup.Done()
- defer func() {
- if err := recover(); err != nil {
- log.Critical("PANIC during RunAtTerminate: %v\nStacktrace: %s", err, log.Stack(2))
- }
- }()
- select {
- case <-g.IsTerminate():
+ g.lock.Lock()
+ defer g.lock.Unlock()
+ g.toRunAtTerminate = append(g.toRunAtTerminate,
+ func() {
+ defer g.terminateWaitGroup.Done()
+ defer func() {
+ if err := recover(); err != nil {
+ log.Critical("PANIC during RunAtTerminate: %v\nStacktrace: %s", err, log.Stack(2))
+ }
+ }()
terminate()
- case <-ctx.Done():
- }
- }()
+ })
}
// RunAtShutdown creates a go-routine to run the provided function at shutdown
func (g *Manager) RunAtShutdown(ctx context.Context, shutdown func()) {
- go func() {
- defer func() {
- if err := recover(); err != nil {
- log.Critical("PANIC during RunAtShutdown: %v\nStacktrace: %s", err, log.Stack(2))
+ g.lock.Lock()
+ defer g.lock.Unlock()
+ g.toRunAtShutdown = append(g.toRunAtShutdown,
+ func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Critical("PANIC during RunAtShutdown: %v\nStacktrace: %s", err, log.Stack(2))
+ }
+ }()
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ shutdown()
}
- }()
- select {
- case <-g.IsShutdown():
- shutdown()
- case <-ctx.Done():
- }
- }()
+ })
}
// RunAtHammer creates a go-routine to run the provided function at shutdown
-func (g *Manager) RunAtHammer(ctx context.Context, hammer func()) {
- go func() {
- defer func() {
- if err := recover(); err != nil {
- log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2))
- }
- }()
- select {
- case <-g.IsHammer():
+func (g *Manager) RunAtHammer(hammer func()) {
+ g.lock.Lock()
+ defer g.lock.Unlock()
+ g.toRunAtHammer = append(g.toRunAtHammer,
+ func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2))
+ }
+ }()
hammer()
- case <-ctx.Done():
- }
- }()
+ })
}
func (g *Manager) doShutdown() {
if !g.setStateTransition(stateRunning, stateShuttingDown) {
return
}
g.lock.Lock()
- close(g.shutdown)
+ g.shutdownCtxCancel()
+ for _, fn := range g.toRunAtShutdown {
+ go fn()
+ }
g.lock.Unlock()
if setting.GracefulHammerTime >= 0 {
@@ -203,7 +212,7 @@ func (g *Manager) doShutdown() {
g.doTerminate()
g.WaitForTerminate()
g.lock.Lock()
- close(g.done)
+ g.doneCtxCancel()
g.lock.Unlock()
}()
}
@@ -212,10 +221,13 @@ func (g *Manager) doHammerTime(d time.Duration) {
time.Sleep(d)
g.lock.Lock()
select {
- case <-g.hammer:
+ case <-g.hammerCtx.Done():
default:
log.Warn("Setting Hammer condition")
- close(g.hammer)
+ g.hammerCtxCancel()
+ for _, fn := range g.toRunAtHammer {
+ go fn()
+ }
}
g.lock.Unlock()
}
@@ -226,10 +238,13 @@ func (g *Manager) doTerminate() {
}
g.lock.Lock()
select {
- case <-g.terminate:
+ case <-g.terminateCtx.Done():
default:
log.Warn("Terminating")
- close(g.terminate)
+ g.terminateCtxCancel()
+ for _, fn := range g.toRunAtTerminate {
+ go fn()
+ }
}
g.lock.Unlock()
}
@@ -242,7 +257,7 @@ func (g *Manager) IsChild() bool {
// IsShutdown returns a channel which will be closed at shutdown.
// The order of closure is IsShutdown, IsHammer (potentially), IsTerminate
func (g *Manager) IsShutdown() <-chan struct{} {
- return g.shutdown
+ return g.shutdownCtx.Done()
}
// IsHammer returns a channel which will be closed at hammer
@@ -250,14 +265,14 @@ func (g *Manager) IsShutdown() <-chan struct{} {
// Servers running within the running server wait group should respond to IsHammer
// if not shutdown already
func (g *Manager) IsHammer() <-chan struct{} {
- return g.hammer
+ return g.hammerCtx.Done()
}
// IsTerminate returns a channel which will be closed at terminate
// The order of closure is IsShutdown, IsHammer (potentially), IsTerminate
// IsTerminate will only close once all running servers have stopped
func (g *Manager) IsTerminate() <-chan struct{} {
- return g.terminate
+ return g.terminateCtx.Done()
}
// ServerDone declares a running server done and subtracts one from the
@@ -314,25 +329,20 @@ func (g *Manager) InformCleanup() {
// Done allows the manager to be viewed as a context.Context, it returns a channel that is closed when the server is finished terminating
func (g *Manager) Done() <-chan struct{} {
- return g.done
+ return g.doneCtx.Done()
}
-// Err allows the manager to be viewed as a context.Context done at Terminate, it returns ErrTerminate
+// Err allows the manager to be viewed as a context.Context done at Terminate
func (g *Manager) Err() error {
- select {
- case <-g.Done():
- return ErrTerminate
- default:
- return nil
- }
+ return g.doneCtx.Err()
}
-// Value allows the manager to be viewed as a context.Context done at Terminate, it has no values
+// Value allows the manager to be viewed as a context.Context done at Terminate
func (g *Manager) Value(key interface{}) interface{} {
- return nil
+ return g.doneCtx.Value(key)
}
// Deadline returns nil as there is no fixed Deadline for the manager, it allows the manager to be viewed as a context.Context
func (g *Manager) Deadline() (deadline time.Time, ok bool) {
- return
+ return g.doneCtx.Deadline()
}
diff --git a/modules/graceful/manager_unix.go b/modules/graceful/manager_unix.go
index 540974454c..20d9b3905c 100644
--- a/modules/graceful/manager_unix.go
+++ b/modules/graceful/manager_unix.go
@@ -25,13 +25,21 @@ type Manager struct {
forked bool
lock *sync.RWMutex
state state
- shutdown chan struct{}
- hammer chan struct{}
- terminate chan struct{}
- done chan struct{}
+ shutdownCtx context.Context
+ hammerCtx context.Context
+ terminateCtx context.Context
+ doneCtx context.Context
+ shutdownCtxCancel context.CancelFunc
+ hammerCtxCancel context.CancelFunc
+ terminateCtxCancel context.CancelFunc
+ doneCtxCancel context.CancelFunc
runningServerWaitGroup sync.WaitGroup
createServerWaitGroup sync.WaitGroup
terminateWaitGroup sync.WaitGroup
+
+ toRunAtShutdown []func()
+ toRunAtHammer []func()
+ toRunAtTerminate []func()
}
func newGracefulManager(ctx context.Context) *Manager {
@@ -45,11 +53,11 @@ func newGracefulManager(ctx context.Context) *Manager {
}
func (g *Manager) start(ctx context.Context) {
- // Make channels
- g.terminate = make(chan struct{})
- g.shutdown = make(chan struct{})
- g.hammer = make(chan struct{})
- g.done = make(chan struct{})
+ // Make contexts
+ g.terminateCtx, g.terminateCtxCancel = context.WithCancel(ctx)
+ g.shutdownCtx, g.shutdownCtxCancel = context.WithCancel(ctx)
+ g.hammerCtx, g.hammerCtxCancel = context.WithCancel(ctx)
+ g.doneCtx, g.doneCtxCancel = context.WithCancel(ctx)
// Set the running state & handle signals
g.setState(stateRunning)
diff --git a/modules/graceful/manager_windows.go b/modules/graceful/manager_windows.go
index 14923c2a9b..51f29778ba 100644
--- a/modules/graceful/manager_windows.go
+++ b/modules/graceful/manager_windows.go
@@ -36,14 +36,22 @@ type Manager struct {
isChild bool
lock *sync.RWMutex
state state
- shutdown chan struct{}
- hammer chan struct{}
- terminate chan struct{}
- done chan struct{}
+ shutdownCtx context.Context
+ hammerCtx context.Context
+ terminateCtx context.Context
+ doneCtx context.Context
+ shutdownCtxCancel context.CancelFunc
+ hammerCtxCancel context.CancelFunc
+ terminateCtxCancel context.CancelFunc
+ doneCtxCancel context.CancelFunc
runningServerWaitGroup sync.WaitGroup
createServerWaitGroup sync.WaitGroup
terminateWaitGroup sync.WaitGroup
shutdownRequested chan struct{}
+
+ toRunAtShutdown []func()
+ toRunAtHammer []func()
+ toRunAtTerminate []func()
}
func newGracefulManager(ctx context.Context) *Manager {
@@ -58,11 +66,13 @@ func newGracefulManager(ctx context.Context) *Manager {
}
func (g *Manager) start() {
+ // Make contexts
+ g.terminateCtx, g.terminateCtxCancel = context.WithCancel(g.ctx)
+ g.shutdownCtx, g.shutdownCtxCancel = context.WithCancel(g.ctx)
+ g.hammerCtx, g.hammerCtxCancel = context.WithCancel(g.ctx)
+ g.doneCtx, g.doneCtxCancel = context.WithCancel(g.ctx)
+
// Make channels
- g.terminate = make(chan struct{})
- g.shutdown = make(chan struct{})
- g.hammer = make(chan struct{})
- g.done = make(chan struct{})
g.shutdownRequested = make(chan struct{})
// Set the running state
@@ -171,7 +181,7 @@ hammerLoop:
default:
log.Debug("Unexpected control request: %v", change.Cmd)
}
- case <-g.hammer:
+ case <-g.hammerCtx.Done():
break hammerLoop
}
}
diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go
index a7d78e9fdc..67fa43eda8 100644
--- a/modules/indexer/code/indexer.go
+++ b/modules/indexer/code/indexer.go
@@ -115,7 +115,13 @@ func Init() {
ctx, cancel := context.WithCancel(context.Background())
- graceful.GetManager().RunAtTerminate(ctx, func() {
+ graceful.GetManager().RunAtTerminate(func() {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+ cancel()
log.Debug("Closing repository indexer")
indexer.Close()
log.Info("PID: %d Repository Indexer closed", os.Getpid())
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go
index 9edaef6bdd..676b6686ea 100644
--- a/modules/indexer/issues/indexer.go
+++ b/modules/indexer/issues/indexer.go
@@ -160,7 +160,7 @@ func InitIssueIndexer(syncReindex bool) {
}
populate = !exist
holder.set(issueIndexer)
- graceful.GetManager().RunAtTerminate(context.Background(), func() {
+ graceful.GetManager().RunAtTerminate(func() {
log.Debug("Closing issue indexer")
issueIndexer := holder.get()
if issueIndexer != nil {
@@ -170,7 +170,7 @@ func InitIssueIndexer(syncReindex bool) {
})
log.Debug("Created Bleve Indexer")
case "elasticsearch":
- graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(context.Context, func())) {
+ graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(func())) {
issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName)
if err != nil {
log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
diff --git a/modules/queue/bytefifo.go b/modules/queue/bytefifo.go
index 94478e6f05..3a10c8e125 100644
--- a/modules/queue/bytefifo.go
+++ b/modules/queue/bytefifo.go
@@ -4,14 +4,16 @@
package queue
+import "context"
+
// ByteFIFO defines a FIFO that takes a byte array
type ByteFIFO interface {
// Len returns the length of the fifo
- Len() int64
+ Len(ctx context.Context) int64
// PushFunc pushes data to the end of the fifo and calls the callback if it is added
- PushFunc(data []byte, fn func() error) error
+ PushFunc(ctx context.Context, data []byte, fn func() error) error
// Pop pops data from the start of the fifo
- Pop() ([]byte, error)
+ Pop(ctx context.Context) ([]byte, error)
// Close this fifo
Close() error
}
@@ -20,7 +22,7 @@ type ByteFIFO interface {
type UniqueByteFIFO interface {
ByteFIFO
// Has returns whether the fifo contains this data
- Has(data []byte) (bool, error)
+ Has(ctx context.Context, data []byte) (bool, error)
}
var _ ByteFIFO = &DummyByteFIFO{}
@@ -29,12 +31,12 @@ var _ ByteFIFO = &DummyByteFIFO{}
type DummyByteFIFO struct{}
// PushFunc returns nil
-func (*DummyByteFIFO) PushFunc(data []byte, fn func() error) error {
+func (*DummyByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
return nil
}
// Pop returns nil
-func (*DummyByteFIFO) Pop() ([]byte, error) {
+func (*DummyByteFIFO) Pop(ctx context.Context) ([]byte, error) {
return []byte{}, nil
}
@@ -44,7 +46,7 @@ func (*DummyByteFIFO) Close() error {
}
// Len is always 0
-func (*DummyByteFIFO) Len() int64 {
+func (*DummyByteFIFO) Len(ctx context.Context) int64 {
return 0
}
@@ -56,6 +58,6 @@ type DummyUniqueByteFIFO struct {
}
// Has always returns false
-func (*DummyUniqueByteFIFO) Has([]byte) (bool, error) {
+func (*DummyUniqueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) {
return false, nil
}
diff --git a/modules/queue/manager.go b/modules/queue/manager.go
index c3ec735af5..a6d48575ab 100644
--- a/modules/queue/manager.go
+++ b/modules/queue/manager.go
@@ -187,14 +187,14 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
if flushable, ok := mq.Managed.(Flushable); ok {
log.Debug("Flushing (flushable) queue: %s", mq.Name)
go func(q *ManagedQueue) {
- localCtx, localCancel := context.WithCancel(ctx)
- pid := q.RegisterWorkers(1, start, hasTimeout, end, localCancel, true)
+ localCtx, localCtxCancel := context.WithCancel(ctx)
+ pid := q.RegisterWorkers(1, start, hasTimeout, end, localCtxCancel, true)
err := flushable.FlushWithContext(localCtx)
if err != nil && err != ctx.Err() {
cancel()
}
q.CancelWorkers(pid)
- localCancel()
+ localCtxCancel()
wg.Done()
}(mq)
} else {
diff --git a/modules/queue/queue.go b/modules/queue/queue.go
index d08cba35a1..7159048c11 100644
--- a/modules/queue/queue.go
+++ b/modules/queue/queue.go
@@ -57,7 +57,7 @@ type Named interface {
// Queues will handle their own contents in the Run method
type Queue interface {
Flushable
- Run(atShutdown, atTerminate func(context.Context, func()))
+ Run(atShutdown, atTerminate func(func()))
Push(Data) error
}
@@ -74,7 +74,7 @@ type DummyQueue struct {
}
// Run does nothing
-func (*DummyQueue) Run(_, _ func(context.Context, func())) {}
+func (*DummyQueue) Run(_, _ func(func())) {}
// Push fakes a push of data to the queue
func (*DummyQueue) Push(Data) error {
@@ -122,7 +122,7 @@ type Immediate struct {
}
// Run does nothing
-func (*Immediate) Run(_, _ func(context.Context, func())) {}
+func (*Immediate) Run(_, _ func(func())) {}
// Push fakes a push of data to the queue
func (q *Immediate) Push(data Data) error {
diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go
index fe1fb7807e..3ea61aad0e 100644
--- a/modules/queue/queue_bytefifo.go
+++ b/modules/queue/queue_bytefifo.go
@@ -17,8 +17,9 @@ import (
// ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue
type ByteFIFOQueueConfiguration struct {
WorkerPoolConfiguration
- Workers int
- Name string
+ Workers int
+ Name string
+ WaitOnEmpty bool
}
var _ Queue = &ByteFIFOQueue{}
@@ -26,14 +27,18 @@ var _ Queue = &ByteFIFOQueue{}
// ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool
type ByteFIFOQueue struct {
*WorkerPool
- byteFIFO ByteFIFO
- typ Type
- closed chan struct{}
- terminated chan struct{}
- exemplar interface{}
- workers int
- name string
- lock sync.Mutex
+ byteFIFO ByteFIFO
+ typ Type
+ shutdownCtx context.Context
+ shutdownCtxCancel context.CancelFunc
+ terminateCtx context.Context
+ terminateCtxCancel context.CancelFunc
+ exemplar interface{}
+ workers int
+ name string
+ lock sync.Mutex
+ waitOnEmpty bool
+ pushed chan struct{}
}
// NewByteFIFOQueue creates a new ByteFIFOQueue
@@ -44,15 +49,22 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem
}
config := configInterface.(ByteFIFOQueueConfiguration)
+ terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
+ shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
+
return &ByteFIFOQueue{
- WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
- byteFIFO: byteFIFO,
- typ: typ,
- closed: make(chan struct{}),
- terminated: make(chan struct{}),
- exemplar: exemplar,
- workers: config.Workers,
- name: config.Name,
+ WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
+ byteFIFO: byteFIFO,
+ typ: typ,
+ shutdownCtx: shutdownCtx,
+ shutdownCtxCancel: shutdownCtxCancel,
+ terminateCtx: terminateCtx,
+ terminateCtxCancel: terminateCtxCancel,
+ exemplar: exemplar,
+ workers: config.Workers,
+ name: config.Name,
+ waitOnEmpty: config.WaitOnEmpty,
+ pushed: make(chan struct{}, 1),
}, nil
}
@@ -76,7 +88,15 @@ func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
if err != nil {
return err
}
- return q.byteFIFO.PushFunc(bs, fn)
+ if q.waitOnEmpty {
+ defer func() {
+ select {
+ case q.pushed <- struct{}{}:
+ default:
+ }
+ }()
+ }
+ return q.byteFIFO.PushFunc(q.terminateCtx, bs, fn)
}
// IsEmpty checks if the queue is empty
@@ -86,135 +106,160 @@ func (q *ByteFIFOQueue) IsEmpty() bool {
if !q.WorkerPool.IsEmpty() {
return false
}
- return q.byteFIFO.Len() == 0
+ return q.byteFIFO.Len(q.terminateCtx) == 0
}
// Run runs the bytefifo queue
-func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
- atShutdown(context.Background(), q.Shutdown)
- atTerminate(context.Background(), q.Terminate)
+func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(func())) {
+ atShutdown(q.Shutdown)
+ atTerminate(q.Terminate)
log.Debug("%s: %s Starting", q.typ, q.name)
- go func() {
- _ = q.AddWorkers(q.workers, 0)
- }()
+ _ = q.AddWorkers(q.workers, 0)
- go q.readToChan()
+ log.Trace("%s: %s Now running", q.typ, q.name)
+ q.readToChan()
- log.Trace("%s: %s Waiting til closed", q.typ, q.name)
- <-q.closed
+ <-q.shutdownCtx.Done()
log.Trace("%s: %s Waiting til done", q.typ, q.name)
q.Wait()
log.Trace("%s: %s Waiting til cleaned", q.typ, q.name)
- ctx, cancel := context.WithCancel(context.Background())
- atTerminate(ctx, cancel)
- q.CleanUp(ctx)
- cancel()
+ q.CleanUp(q.terminateCtx)
+ q.terminateCtxCancel()
}
+const maxBackOffTime = time.Second * 3
+
func (q *ByteFIFOQueue) readToChan() {
// handle quick cancels
select {
- case <-q.closed:
+ case <-q.shutdownCtx.Done():
// tell the pool to shutdown.
- q.cancel()
+ q.baseCtxCancel()
return
default:
}
+ // Default backoff values
backOffTime := time.Millisecond * 100
- maxBackOffTime := time.Second * 3
- for {
- success, resetBackoff := q.doPop()
- if resetBackoff {
- backOffTime = 100 * time.Millisecond
- }
- if success {
+loop:
+ for {
+ err := q.doPop()
+ if err == errQueueEmpty {
+ log.Trace("%s: %s Waiting on Empty", q.typ, q.name)
select {
- case <-q.closed:
- // tell the pool to shutdown.
- q.cancel()
+ case <-q.pushed:
+ // reset backOffTime
+ backOffTime = 100 * time.Millisecond
+ continue loop
+ case <-q.shutdownCtx.Done():
+ // Oops we've been shutdown whilst waiting
+ // Make sure the worker pool is shutdown too
+ q.baseCtxCancel()
return
- default:
}
- } else {
+ }
+
+ // Reset the backOffTime if there is no error or an unmarshalError
+ if err == nil || err == errUnmarshal {
+ backOffTime = 100 * time.Millisecond
+ }
+
+ if err != nil {
+ // Need to Backoff
select {
- case <-q.closed:
- // tell the pool to shutdown.
- q.cancel()
+ case <-q.shutdownCtx.Done():
+ // Oops we've been shutdown whilst backing off
+ // Make sure the worker pool is shutdown too
+ q.baseCtxCancel()
return
case <-time.After(backOffTime):
- }
- backOffTime += backOffTime / 2
- if backOffTime > maxBackOffTime {
- backOffTime = maxBackOffTime
+ // OK we've waited - so backoff a bit
+ backOffTime += backOffTime / 2
+ if backOffTime > maxBackOffTime {
+ backOffTime = maxBackOffTime
+ }
+ continue loop
}
}
+ select {
+ case <-q.shutdownCtx.Done():
+ // Oops we've been shutdown
+ // Make sure the worker pool is shutdown too
+ q.baseCtxCancel()
+ return
+ default:
+ continue loop
+ }
}
}
-func (q *ByteFIFOQueue) doPop() (success, resetBackoff bool) {
+var errQueueEmpty = fmt.Errorf("empty queue")
+var errEmptyBytes = fmt.Errorf("empty bytes")
+var errUnmarshal = fmt.Errorf("failed to unmarshal")
+
+func (q *ByteFIFOQueue) doPop() error {
q.lock.Lock()
defer q.lock.Unlock()
- bs, err := q.byteFIFO.Pop()
+ bs, err := q.byteFIFO.Pop(q.shutdownCtx)
if err != nil {
+ if err == context.Canceled {
+ q.baseCtxCancel()
+ return err
+ }
log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
- return
+ return err
}
if len(bs) == 0 {
- return
+ if q.waitOnEmpty && q.byteFIFO.Len(q.shutdownCtx) == 0 {
+ return errQueueEmpty
+ }
+ return errEmptyBytes
}
- resetBackoff = true
-
data, err := unmarshalAs(bs, q.exemplar)
if err != nil {
log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
- return
+ return errUnmarshal
}
log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
q.WorkerPool.Push(data)
- success = true
- return
+ return nil
}
// Shutdown processing from this queue
func (q *ByteFIFOQueue) Shutdown() {
log.Trace("%s: %s Shutting down", q.typ, q.name)
- q.lock.Lock()
select {
- case <-q.closed:
+ case <-q.shutdownCtx.Done():
+ return
default:
- close(q.closed)
}
- q.lock.Unlock()
+ q.shutdownCtxCancel()
log.Debug("%s: %s Shutdown", q.typ, q.name)
}
// IsShutdown returns a channel which is closed when this Queue is shutdown
func (q *ByteFIFOQueue) IsShutdown() <-chan struct{} {
- return q.closed
+ return q.shutdownCtx.Done()
}
// Terminate this queue and close the queue
func (q *ByteFIFOQueue) Terminate() {
log.Trace("%s: %s Terminating", q.typ, q.name)
q.Shutdown()
- q.lock.Lock()
select {
- case <-q.terminated:
- q.lock.Unlock()
+ case <-q.terminateCtx.Done():
return
default:
}
- close(q.terminated)
- q.lock.Unlock()
if log.IsDebug() {
- log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len())
+ log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len(q.terminateCtx))
}
+ q.terminateCtxCancel()
if err := q.byteFIFO.Close(); err != nil {
log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err)
}
@@ -223,7 +268,7 @@ func (q *ByteFIFOQueue) Terminate() {
// IsTerminated returns a channel which is closed when this Queue is terminated
func (q *ByteFIFOQueue) IsTerminated() <-chan struct{} {
- return q.terminated
+ return q.terminateCtx.Done()
}
var _ UniqueQueue = &ByteFIFOUniqueQueue{}
@@ -240,17 +285,21 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun
return nil, err
}
config := configInterface.(ByteFIFOQueueConfiguration)
+ terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
+ shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
return &ByteFIFOUniqueQueue{
ByteFIFOQueue: ByteFIFOQueue{
- WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
- byteFIFO: byteFIFO,
- typ: typ,
- closed: make(chan struct{}),
- terminated: make(chan struct{}),
- exemplar: exemplar,
- workers: config.Workers,
- name: config.Name,
+ WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
+ byteFIFO: byteFIFO,
+ typ: typ,
+ shutdownCtx: shutdownCtx,
+ shutdownCtxCancel: shutdownCtxCancel,
+ terminateCtx: terminateCtx,
+ terminateCtxCancel: terminateCtxCancel,
+ exemplar: exemplar,
+ workers: config.Workers,
+ name: config.Name,
},
}, nil
}
@@ -265,5 +314,5 @@ func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) {
if err != nil {
return false, err
}
- return q.byteFIFO.(UniqueByteFIFO).Has(bs)
+ return q.byteFIFO.(UniqueByteFIFO).Has(q.terminateCtx, bs)
}
diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go
index d7a11e79f5..4df64b69ee 100644
--- a/modules/queue/queue_channel.go
+++ b/modules/queue/queue_channel.go
@@ -27,9 +27,13 @@ type ChannelQueueConfiguration struct {
// It is basically a very thin wrapper around a WorkerPool
type ChannelQueue struct {
*WorkerPool
- exemplar interface{}
- workers int
- name string
+ shutdownCtx context.Context
+ shutdownCtxCancel context.CancelFunc
+ terminateCtx context.Context
+ terminateCtxCancel context.CancelFunc
+ exemplar interface{}
+ workers int
+ name string
}
// NewChannelQueue creates a memory channel queue
@@ -42,28 +46,30 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
if config.BatchLength == 0 {
config.BatchLength = 1
}
+
+ terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
+ shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
+
queue := &ChannelQueue{
- WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
- exemplar: exemplar,
- workers: config.Workers,
- name: config.Name,
+ WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
+ shutdownCtx: shutdownCtx,
+ shutdownCtxCancel: shutdownCtxCancel,
+ terminateCtx: terminateCtx,
+ terminateCtxCancel: terminateCtxCancel,
+ exemplar: exemplar,
+ workers: config.Workers,
+ name: config.Name,
}
queue.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar)
return queue, nil
}
// Run starts to run the queue
-func (q *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
- atShutdown(context.Background(), func() {
- log.Warn("ChannelQueue: %s is not shutdownable!", q.name)
- })
- atTerminate(context.Background(), func() {
- log.Warn("ChannelQueue: %s is not terminatable!", q.name)
- })
+func (q *ChannelQueue) Run(atShutdown, atTerminate func(func())) {
+ atShutdown(q.Shutdown)
+ atTerminate(q.Terminate)
log.Debug("ChannelQueue: %s Starting", q.name)
- go func() {
- _ = q.AddWorkers(q.workers, 0)
- }()
+ _ = q.AddWorkers(q.workers, 0)
}
// Push will push data into the queue
@@ -75,6 +81,42 @@ func (q *ChannelQueue) Push(data Data) error {
return nil
}
+// Shutdown processing from this queue
+func (q *ChannelQueue) Shutdown() {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ select {
+ case <-q.shutdownCtx.Done():
+ log.Trace("ChannelQueue: %s Already Shutting down", q.name)
+ return
+ default:
+ }
+ log.Trace("ChannelQueue: %s Shutting down", q.name)
+ go func() {
+ log.Trace("ChannelQueue: %s Flushing", q.name)
+ if err := q.FlushWithContext(q.terminateCtx); err != nil {
+ log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name)
+ return
+ }
+ log.Debug("ChannelQueue: %s Flushed", q.name)
+ }()
+ q.shutdownCtxCancel()
+ log.Debug("ChannelQueue: %s Shutdown", q.name)
+}
+
+// Terminate this queue and close the queue
+func (q *ChannelQueue) Terminate() {
+ log.Trace("ChannelQueue: %s Terminating", q.name)
+ q.Shutdown()
+ select {
+ case <-q.terminateCtx.Done():
+ return
+ default:
+ }
+ q.terminateCtxCancel()
+ log.Debug("ChannelQueue: %s Terminated", q.name)
+}
+
// Name returns the name of this queue
func (q *ChannelQueue) Name() string {
return q.name
diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go
index bca81d50fd..e7abe5b50b 100644
--- a/modules/queue/queue_channel_test.go
+++ b/modules/queue/queue_channel_test.go
@@ -5,7 +5,6 @@
package queue
import (
- "context"
"testing"
"time"
@@ -21,7 +20,7 @@ func TestChannelQueue(t *testing.T) {
}
}
- nilFn := func(_ context.Context, _ func()) {}
+ nilFn := func(_ func()) {}
queue, err := NewChannelQueue(handle,
ChannelQueueConfiguration{
@@ -61,7 +60,7 @@ func TestChannelQueue_Batch(t *testing.T) {
}
}
- nilFn := func(_ context.Context, _ func()) {}
+ nilFn := func(_ func()) {}
queue, err := NewChannelQueue(handle,
ChannelQueueConfiguration{
diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go
index 6c15a8e63b..911233a5d9 100644
--- a/modules/queue/queue_disk.go
+++ b/modules/queue/queue_disk.go
@@ -5,6 +5,8 @@
package queue
import (
+ "context"
+
"code.gitea.io/gitea/modules/nosql"
"gitea.com/lunny/levelqueue"
@@ -37,6 +39,7 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
if len(config.ConnectionString) == 0 {
config.ConnectionString = config.DataDir
}
+ config.WaitOnEmpty = true
byteFIFO, err := NewLevelQueueByteFIFO(config.ConnectionString, config.QueueName)
if err != nil {
@@ -82,7 +85,7 @@ func NewLevelQueueByteFIFO(connection, prefix string) (*LevelQueueByteFIFO, erro
}
// PushFunc will push data into the fifo
-func (fifo *LevelQueueByteFIFO) PushFunc(data []byte, fn func() error) error {
+func (fifo *LevelQueueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
if fn != nil {
if err := fn(); err != nil {
return err
@@ -92,7 +95,7 @@ func (fifo *LevelQueueByteFIFO) PushFunc(data []byte, fn func() error) error {
}
// Pop pops data from the start of the fifo
-func (fifo *LevelQueueByteFIFO) Pop() ([]byte, error) {
+func (fifo *LevelQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) {
data, err := fifo.internal.RPop()
if err != nil && err != levelqueue.ErrNotFound {
return nil, err
@@ -108,7 +111,7 @@ func (fifo *LevelQueueByteFIFO) Close() error {
}
// Len returns the length of the fifo
-func (fifo *LevelQueueByteFIFO) Len() int64 {
+func (fifo *LevelQueueByteFIFO) Len(ctx context.Context) int64 {
return fifo.internal.Len()
}
diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go
index 801fd8a122..c3a1c5781e 100644
--- a/modules/queue/queue_disk_channel.go
+++ b/modules/queue/queue_disk_channel.go
@@ -133,8 +133,9 @@ func (q *PersistableChannelQueue) Push(data Data) error {
}
// Run starts to run the queue
-func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name)
+ _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
q.lock.Lock()
if q.internal == nil {
@@ -147,34 +148,32 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte
} else {
q.lock.Unlock()
}
- atShutdown(context.Background(), q.Shutdown)
- atTerminate(context.Background(), q.Terminate)
+ atShutdown(q.Shutdown)
+ atTerminate(q.Terminate)
- // Just run the level queue - we shut it down later
- go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
-
- go func() {
- _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
- }()
+ if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.shutdownCtx) != 0 {
+ // Just run the level queue - we shut it down once it's flushed
+ go q.internal.Run(func(_ func()) {}, func(_ func()) {})
+ go func() {
+ for !q.IsEmpty() {
+ _ = q.internal.Flush(0)
+ select {
+ case <-time.After(100 * time.Millisecond):
+ case <-q.internal.(*LevelQueue).shutdownCtx.Done():
+ log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name())
+ return
+ }
+ }
+ log.Debug("LevelQueue: %s flushed so shutting down", q.internal.(*LevelQueue).Name())
+ q.internal.(*LevelQueue).Shutdown()
+ GetManager().Remove(q.internal.(*LevelQueue).qid)
+ }()
+ } else {
+ log.Debug("PersistableChannelQueue: %s Skipping running the empty level queue", q.delayedStarter.name)
+ q.internal.(*LevelQueue).Shutdown()
+ GetManager().Remove(q.internal.(*LevelQueue).qid)
+ }
- log.Trace("PersistableChannelQueue: %s Waiting til closed", q.delayedStarter.name)
- <-q.closed
- log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name)
- q.channelQueue.cancel()
- q.internal.(*LevelQueue).cancel()
- log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name)
- q.channelQueue.Wait()
- q.internal.(*LevelQueue).Wait()
- // Redirect all remaining data in the chan to the internal channel
- go func() {
- log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
- for data := range q.channelQueue.dataChan {
- _ = q.internal.Push(data)
- atomic.AddInt64(&q.channelQueue.numInQueue, -1)
- }
- log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
- }()
- log.Trace("PersistableChannelQueue: %s Done main loop", q.delayedStarter.name)
}
// Flush flushes the queue and blocks till the queue is empty
@@ -232,16 +231,37 @@ func (q *PersistableChannelQueue) IsEmpty() bool {
func (q *PersistableChannelQueue) Shutdown() {
log.Trace("PersistableChannelQueue: %s Shutting down", q.delayedStarter.name)
q.lock.Lock()
- defer q.lock.Unlock()
+
select {
case <-q.closed:
+ q.lock.Unlock()
+ return
default:
- if q.internal != nil {
- q.internal.(*LevelQueue).Shutdown()
- }
- close(q.closed)
- log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
}
+ q.channelQueue.Shutdown()
+ if q.internal != nil {
+ q.internal.(*LevelQueue).Shutdown()
+ }
+ close(q.closed)
+ q.lock.Unlock()
+
+ log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name)
+ q.channelQueue.baseCtxCancel()
+ q.internal.(*LevelQueue).baseCtxCancel()
+ log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name)
+ q.channelQueue.Wait()
+ q.internal.(*LevelQueue).Wait()
+ // Redirect all remaining data in the chan to the internal channel
+ go func() {
+ log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
+ for data := range q.channelQueue.dataChan {
+ _ = q.internal.Push(data)
+ atomic.AddInt64(&q.channelQueue.numInQueue, -1)
+ }
+ log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
+ }()
+
+ log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
}
// Terminate this queue and close the queue
@@ -250,6 +270,7 @@ func (q *PersistableChannelQueue) Terminate() {
q.Shutdown()
q.lock.Lock()
defer q.lock.Unlock()
+ q.channelQueue.Terminate()
if q.internal != nil {
q.internal.(*LevelQueue).Terminate()
}
diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go
index 93061bffc6..561f98ca90 100644
--- a/modules/queue/queue_disk_channel_test.go
+++ b/modules/queue/queue_disk_channel_test.go
@@ -5,10 +5,8 @@
package queue
import (
- "context"
"io/ioutil"
"testing"
- "time"
"code.gitea.io/gitea/modules/util"
"github.com/stretchr/testify/assert"
@@ -32,17 +30,19 @@ func TestPersistableChannelQueue(t *testing.T) {
defer util.RemoveAll(tmpDir)
queue, err := NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
- DataDir: tmpDir,
- BatchLength: 2,
- QueueLength: 20,
- Workers: 1,
- MaxWorkers: 10,
+ DataDir: tmpDir,
+ BatchLength: 2,
+ QueueLength: 20,
+ Workers: 1,
+ BoostWorkers: 0,
+ MaxWorkers: 10,
+ Name: "first",
}, &testData{})
assert.NoError(t, err)
- go queue.Run(func(_ context.Context, shutdown func()) {
+ go queue.Run(func(shutdown func()) {
queueShutdown = append(queueShutdown, shutdown)
- }, func(_ context.Context, terminate func()) {
+ }, func(terminate func()) {
queueTerminate = append(queueTerminate, terminate)
})
@@ -64,13 +64,18 @@ func TestPersistableChannelQueue(t *testing.T) {
assert.Equal(t, test2.TestString, result2.TestString)
assert.Equal(t, test2.TestInt, result2.TestInt)
+ // test1 is a testData not a *testData so will be rejected
err = queue.Push(test1)
assert.Error(t, err)
+ // Now shutdown the queue
for _, callback := range queueShutdown {
callback()
}
- time.Sleep(200 * time.Millisecond)
+
+ // Wait til it is closed
+ <-queue.(*PersistableChannelQueue).closed
+
err = queue.Push(&test1)
assert.NoError(t, err)
err = queue.Push(&test2)
@@ -80,23 +85,33 @@ func TestPersistableChannelQueue(t *testing.T) {
assert.Fail(t, "Handler processing should have stopped")
default:
}
+
+ // terminate the queue
for _, callback := range queueTerminate {
callback()
}
+ select {
+ case <-handleChan:
+ assert.Fail(t, "Handler processing should have stopped")
+ default:
+ }
+
// Reopen queue
queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
- DataDir: tmpDir,
- BatchLength: 2,
- QueueLength: 20,
- Workers: 1,
- MaxWorkers: 10,
+ DataDir: tmpDir,
+ BatchLength: 2,
+ QueueLength: 20,
+ Workers: 1,
+ BoostWorkers: 0,
+ MaxWorkers: 10,
+ Name: "second",
}, &testData{})
assert.NoError(t, err)
- go queue.Run(func(_ context.Context, shutdown func()) {
+ go queue.Run(func(shutdown func()) {
queueShutdown = append(queueShutdown, shutdown)
- }, func(_ context.Context, terminate func()) {
+ }, func(terminate func()) {
queueTerminate = append(queueTerminate, terminate)
})
diff --git a/modules/queue/queue_disk_test.go b/modules/queue/queue_disk_test.go
index edaed49a52..1f884d4f8d 100644
--- a/modules/queue/queue_disk_test.go
+++ b/modules/queue/queue_disk_test.go
@@ -5,7 +5,6 @@
package queue
import (
- "context"
"io/ioutil"
"sync"
"testing"
@@ -49,11 +48,11 @@ func TestLevelQueue(t *testing.T) {
}, &testData{})
assert.NoError(t, err)
- go queue.Run(func(_ context.Context, shutdown func()) {
+ go queue.Run(func(shutdown func()) {
lock.Lock()
queueShutdown = append(queueShutdown, shutdown)
lock.Unlock()
- }, func(_ context.Context, terminate func()) {
+ }, func(terminate func()) {
lock.Lock()
queueTerminate = append(queueTerminate, terminate)
lock.Unlock()
@@ -123,11 +122,11 @@ func TestLevelQueue(t *testing.T) {
}, &testData{})
assert.NoError(t, err)
- go queue.Run(func(_ context.Context, shutdown func()) {
+ go queue.Run(func(shutdown func()) {
lock.Lock()
queueShutdown = append(queueShutdown, shutdown)
lock.Unlock()
- }, func(_ context.Context, terminate func()) {
+ }, func(terminate func()) {
lock.Lock()
queueTerminate = append(queueTerminate, terminate)
lock.Unlock()
diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go
index af2cc30335..a5fb866dc1 100644
--- a/modules/queue/queue_redis.go
+++ b/modules/queue/queue_redis.go
@@ -6,7 +6,6 @@ package queue
import (
"context"
- "fmt"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
@@ -47,8 +46,6 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
return nil, err
}
- byteFIFO.ctx = graceful.NewChannelContext(byteFIFOQueue.IsTerminated(), fmt.Errorf("queue has been terminated"))
-
queue := &RedisQueue{
ByteFIFOQueue: byteFIFOQueue,
}
@@ -73,8 +70,8 @@ var _ ByteFIFO = &RedisByteFIFO{}
// RedisByteFIFO represents a ByteFIFO formed from a redisClient
type RedisByteFIFO struct {
- ctx context.Context
- client redisClient
+ client redisClient
+
queueName string
}
@@ -89,7 +86,6 @@ func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error)
fifo := &RedisByteFIFO{
queueName: config.QueueName,
}
- fifo.ctx = graceful.GetManager().TerminateContext()
fifo.client = nosql.GetManager().GetRedisClient(config.ConnectionString)
if err := fifo.client.Ping(graceful.GetManager().ShutdownContext()).Err(); err != nil {
return nil, err
@@ -98,18 +94,18 @@ func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error)
}
// PushFunc pushes data to the end of the fifo and calls the callback if it is added
-func (fifo *RedisByteFIFO) PushFunc(data []byte, fn func() error) error {
+func (fifo *RedisByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
if fn != nil {
if err := fn(); err != nil {
return err
}
}
- return fifo.client.RPush(fifo.ctx, fifo.queueName, data).Err()
+ return fifo.client.RPush(ctx, fifo.queueName, data).Err()
}
// Pop pops data from the start of the fifo
-func (fifo *RedisByteFIFO) Pop() ([]byte, error) {
- data, err := fifo.client.LPop(fifo.ctx, fifo.queueName).Bytes()
+func (fifo *RedisByteFIFO) Pop(ctx context.Context) ([]byte, error) {
+ data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes()
if err == nil || err == redis.Nil {
return data, nil
}
@@ -122,8 +118,8 @@ func (fifo *RedisByteFIFO) Close() error {
}
// Len returns the length of the fifo
-func (fifo *RedisByteFIFO) Len() int64 {
- val, err := fifo.client.LLen(fifo.ctx, fifo.queueName).Result()
+func (fifo *RedisByteFIFO) Len(ctx context.Context) int64 {
+ val, err := fifo.client.LLen(ctx, fifo.queueName).Result()
if err != nil {
log.Error("Error whilst getting length of redis queue %s: Error: %v", fifo.queueName, err)
return -1
diff --git a/modules/queue/queue_wrapped.go b/modules/queue/queue_wrapped.go
index 88d64e8246..ec30ab0281 100644
--- a/modules/queue/queue_wrapped.go
+++ b/modules/queue/queue_wrapped.go
@@ -38,7 +38,7 @@ type delayedStarter struct {
}
// setInternal must be called with the lock locked.
-func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), handle HandlerFunc, exemplar interface{}) error {
+func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc, exemplar interface{}) error {
var ctx context.Context
var cancel context.CancelFunc
if q.timeout > 0 {
@@ -49,9 +49,7 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h
defer cancel()
// Ensure we also stop at shutdown
- atShutdown(ctx, func() {
- cancel()
- })
+ atShutdown(cancel)
i := 1
for q.internal == nil {
@@ -221,7 +219,7 @@ func (q *WrappedQueue) IsEmpty() bool {
}
// Run starts to run the queue and attempts to create the internal queue
-func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+func (q *WrappedQueue) Run(atShutdown, atTerminate func(func())) {
log.Debug("WrappedQueue: %s Starting", q.name)
q.lock.Lock()
if q.internal == nil {
diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go
index dec1cfc5c0..5bec67c4d3 100644
--- a/modules/queue/unique_queue_channel.go
+++ b/modules/queue/unique_queue_channel.go
@@ -28,11 +28,15 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration
// only guaranteed whilst the task is waiting in the queue.
type ChannelUniqueQueue struct {
*WorkerPool
- lock sync.Mutex
- table map[Data]bool
- exemplar interface{}
- workers int
- name string
+ lock sync.Mutex
+ table map[Data]bool
+ shutdownCtx context.Context
+ shutdownCtxCancel context.CancelFunc
+ terminateCtx context.Context
+ terminateCtxCancel context.CancelFunc
+ exemplar interface{}
+ workers int
+ name string
}
// NewChannelUniqueQueue create a memory channel queue
@@ -45,11 +49,19 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
if config.BatchLength == 0 {
config.BatchLength = 1
}
+
+ terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
+ shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
+
queue := &ChannelUniqueQueue{
- table: map[Data]bool{},
- exemplar: exemplar,
- workers: config.Workers,
- name: config.Name,
+ table: map[Data]bool{},
+ shutdownCtx: shutdownCtx,
+ shutdownCtxCancel: shutdownCtxCancel,
+ terminateCtx: terminateCtx,
+ terminateCtxCancel: terminateCtxCancel,
+ exemplar: exemplar,
+ workers: config.Workers,
+ name: config.Name,
}
queue.WorkerPool = NewWorkerPool(func(data ...Data) {
for _, datum := range data {
@@ -65,17 +77,11 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
}
// Run starts to run the queue
-func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
- atShutdown(context.Background(), func() {
- log.Warn("ChannelUniqueQueue: %s is not shutdownable!", q.name)
- })
- atTerminate(context.Background(), func() {
- log.Warn("ChannelUniqueQueue: %s is not terminatable!", q.name)
- })
+func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) {
+ atShutdown(q.Shutdown)
+ atTerminate(q.Terminate)
log.Debug("ChannelUniqueQueue: %s Starting", q.name)
- go func() {
- _ = q.AddWorkers(q.workers, 0)
- }()
+ _ = q.AddWorkers(q.workers, 0)
}
// Push will push data into the queue if the data is not already in the queue
@@ -122,6 +128,39 @@ func (q *ChannelUniqueQueue) Has(data Data) (bool, error) {
return has, nil
}
+// Shutdown processing from this queue
+func (q *ChannelUniqueQueue) Shutdown() {
+ log.Trace("ChannelUniqueQueue: %s Shutting down", q.name)
+ select {
+ case <-q.shutdownCtx.Done():
+ return
+ default:
+ }
+ go func() {
+ log.Trace("ChannelUniqueQueue: %s Flushing", q.name)
+ if err := q.FlushWithContext(q.terminateCtx); err != nil {
+ log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name)
+ return
+ }
+ log.Debug("ChannelUniqueQueue: %s Flushed", q.name)
+ }()
+ q.shutdownCtxCancel()
+ log.Debug("ChannelUniqueQueue: %s Shutdown", q.name)
+}
+
+// Terminate this queue and close the queue
+func (q *ChannelUniqueQueue) Terminate() {
+ log.Trace("ChannelUniqueQueue: %s Terminating", q.name)
+ q.Shutdown()
+ select {
+ case <-q.terminateCtx.Done():
+ return
+ default:
+ }
+ q.terminateCtxCancel()
+ log.Debug("ChannelUniqueQueue: %s Terminated", q.name)
+}
+
// Name returns the name of this queue
func (q *ChannelUniqueQueue) Name() string {
return q.name
diff --git a/modules/queue/unique_queue_disk.go b/modules/queue/unique_queue_disk.go
index 8ec8848bc4..bb0eb7d950 100644
--- a/modules/queue/unique_queue_disk.go
+++ b/modules/queue/unique_queue_disk.go
@@ -5,6 +5,8 @@
package queue
import (
+ "context"
+
"code.gitea.io/gitea/modules/nosql"
"gitea.com/lunny/levelqueue"
@@ -41,6 +43,7 @@ func NewLevelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue,
if len(config.ConnectionString) == 0 {
config.ConnectionString = config.DataDir
}
+ config.WaitOnEmpty = true
byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.ConnectionString, config.QueueName)
if err != nil {
@@ -86,12 +89,12 @@ func NewLevelUniqueQueueByteFIFO(connection, prefix string) (*LevelUniqueQueueBy
}
// PushFunc pushes data to the end of the fifo and calls the callback if it is added
-func (fifo *LevelUniqueQueueByteFIFO) PushFunc(data []byte, fn func() error) error {
+func (fifo *LevelUniqueQueueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
return fifo.internal.LPushFunc(data, fn)
}
// Pop pops data from the start of the fifo
-func (fifo *LevelUniqueQueueByteFIFO) Pop() ([]byte, error) {
+func (fifo *LevelUniqueQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) {
data, err := fifo.internal.RPop()
if err != nil && err != levelqueue.ErrNotFound {
return nil, err
@@ -100,12 +103,12 @@ func (fifo *LevelUniqueQueueByteFIFO) Pop() ([]byte, error) {
}
// Len returns the length of the fifo
-func (fifo *LevelUniqueQueueByteFIFO) Len() int64 {
+func (fifo *LevelUniqueQueueByteFIFO) Len(ctx context.Context) int64 {
return fifo.internal.Len()
}
// Has returns whether the fifo contains this data
-func (fifo *LevelUniqueQueueByteFIFO) Has(data []byte) (bool, error) {
+func (fifo *LevelUniqueQueueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) {
return fifo.internal.Has(data)
}
diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go
index 47c4f2bdd5..65a3941519 100644
--- a/modules/queue/unique_queue_disk_channel.go
+++ b/modules/queue/unique_queue_disk_channel.go
@@ -36,7 +36,7 @@ type PersistableChannelUniqueQueueConfiguration struct {
// task cannot be processed twice or more at the same time. Uniqueness is
// only guaranteed whilst the task is waiting in the queue.
type PersistableChannelUniqueQueue struct {
- *ChannelUniqueQueue
+ channelQueue *ChannelUniqueQueue
delayedStarter
lock sync.Mutex
closed chan struct{}
@@ -85,8 +85,8 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
}
queue := &PersistableChannelUniqueQueue{
- ChannelUniqueQueue: channelUniqueQueue.(*ChannelUniqueQueue),
- closed: make(chan struct{}),
+ channelQueue: channelUniqueQueue.(*ChannelUniqueQueue),
+ closed: make(chan struct{}),
}
levelQueue, err := NewLevelUniqueQueue(func(data ...Data) {
@@ -138,14 +138,14 @@ func (q *PersistableChannelUniqueQueue) PushFunc(data Data, fn func() error) err
case <-q.closed:
return q.internal.(UniqueQueue).PushFunc(data, fn)
default:
- return q.ChannelUniqueQueue.PushFunc(data, fn)
+ return q.channelQueue.PushFunc(data, fn)
}
}
// Has will test if the queue has the data
func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) {
// This is more difficult...
- has, err := q.ChannelUniqueQueue.Has(data)
+ has, err := q.channelQueue.Has(data)
if err != nil || has {
return has, err
}
@@ -158,7 +158,7 @@ func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) {
}
// Run starts to run the queue
-func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) {
log.Debug("PersistableChannelUniqueQueue: %s Starting", q.delayedStarter.name)
q.lock.Lock()
@@ -170,7 +170,7 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context
log.Error("Unable push to channelled queue: %v", err)
}
}
- }, q.exemplar)
+ }, q.channelQueue.exemplar)
q.lock.Unlock()
if err != nil {
log.Fatal("Unable to create internal queue for %s Error: %v", q.Name(), err)
@@ -179,53 +179,73 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context
} else {
q.lock.Unlock()
}
- atShutdown(context.Background(), q.Shutdown)
- atTerminate(context.Background(), q.Terminate)
+ atShutdown(q.Shutdown)
+ atTerminate(q.Terminate)
+ _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
- // Just run the level queue - we shut it down later
- go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
-
- go func() {
- _ = q.ChannelUniqueQueue.AddWorkers(q.workers, 0)
- }()
+ if luq, ok := q.internal.(*LevelUniqueQueue); ok && luq.ByteFIFOUniqueQueue.byteFIFO.Len(luq.shutdownCtx) != 0 {
+ // Just run the level queue - we shut it down once it's flushed
+ go q.internal.Run(func(_ func()) {}, func(_ func()) {})
+ go func() {
+ _ = q.internal.Flush(0)
+ log.Debug("LevelUniqueQueue: %s flushed so shutting down", q.internal.(*LevelQueue).Name())
+ q.internal.(*LevelUniqueQueue).Shutdown()
+ GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
+ }()
+ } else {
+ log.Debug("PersistableChannelUniqueQueue: %s Skipping running the empty level queue", q.delayedStarter.name)
+ q.internal.(*LevelUniqueQueue).Shutdown()
+ GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
+ }
- log.Trace("PersistableChannelUniqueQueue: %s Waiting til closed", q.delayedStarter.name)
- <-q.closed
- log.Trace("PersistableChannelUniqueQueue: %s Cancelling pools", q.delayedStarter.name)
- q.internal.(*LevelUniqueQueue).cancel()
- q.ChannelUniqueQueue.cancel()
- log.Trace("PersistableChannelUniqueQueue: %s Waiting til done", q.delayedStarter.name)
- q.ChannelUniqueQueue.Wait()
- q.internal.(*LevelUniqueQueue).Wait()
- // Redirect all remaining data in the chan to the internal channel
- go func() {
- log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
- for data := range q.ChannelUniqueQueue.dataChan {
- _ = q.internal.Push(data)
- }
- log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
- }()
- log.Trace("PersistableChannelUniqueQueue: %s Done main loop", q.delayedStarter.name)
}
// Flush flushes the queue
func (q *PersistableChannelUniqueQueue) Flush(timeout time.Duration) error {
- return q.ChannelUniqueQueue.Flush(timeout)
+ return q.channelQueue.Flush(timeout)
+}
+
+// FlushWithContext flushes the queue
+func (q *PersistableChannelUniqueQueue) FlushWithContext(ctx context.Context) error {
+ return q.channelQueue.FlushWithContext(ctx)
+}
+
+// IsEmpty checks if a queue is empty
+func (q *PersistableChannelUniqueQueue) IsEmpty() bool {
+ return q.channelQueue.IsEmpty()
}
// Shutdown processing this queue
func (q *PersistableChannelUniqueQueue) Shutdown() {
log.Trace("PersistableChannelUniqueQueue: %s Shutting down", q.delayedStarter.name)
q.lock.Lock()
- defer q.lock.Unlock()
select {
case <-q.closed:
+ q.lock.Unlock()
+ return
default:
if q.internal != nil {
q.internal.(*LevelUniqueQueue).Shutdown()
}
close(q.closed)
+ q.lock.Unlock()
}
+
+ log.Trace("PersistableChannelUniqueQueue: %s Cancelling pools", q.delayedStarter.name)
+ q.internal.(*LevelUniqueQueue).baseCtxCancel()
+ q.channelQueue.baseCtxCancel()
+ log.Trace("PersistableChannelUniqueQueue: %s Waiting til done", q.delayedStarter.name)
+ q.channelQueue.Wait()
+ q.internal.(*LevelUniqueQueue).Wait()
+ // Redirect all remaining data in the chan to the internal channel
+ go func() {
+ log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
+ for data := range q.channelQueue.dataChan {
+ _ = q.internal.Push(data)
+ }
+ log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
+ }()
+
log.Debug("PersistableChannelUniqueQueue: %s Shutdown", q.delayedStarter.name)
}
diff --git a/modules/queue/unique_queue_redis.go b/modules/queue/unique_queue_redis.go
index 20a50cc1f2..7474c09665 100644
--- a/modules/queue/unique_queue_redis.go
+++ b/modules/queue/unique_queue_redis.go
@@ -5,9 +5,8 @@
package queue
import (
- "fmt"
+ "context"
- "code.gitea.io/gitea/modules/graceful"
"github.com/go-redis/redis/v8"
)
@@ -51,8 +50,6 @@ func NewRedisUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue,
return nil, err
}
- byteFIFO.ctx = graceful.NewChannelContext(byteFIFOQueue.IsTerminated(), fmt.Errorf("queue has been terminated"))
-
queue := &RedisUniqueQueue{
ByteFIFOUniqueQueue: byteFIFOQueue,
}
@@ -92,8 +89,8 @@ func NewRedisUniqueByteFIFO(config RedisUniqueByteFIFOConfiguration) (*RedisUniq
}
// PushFunc pushes data to the end of the fifo and calls the callback if it is added
-func (fifo *RedisUniqueByteFIFO) PushFunc(data []byte, fn func() error) error {
- added, err := fifo.client.SAdd(fifo.ctx, fifo.setName, data).Result()
+func (fifo *RedisUniqueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
+ added, err := fifo.client.SAdd(ctx, fifo.setName, data).Result()
if err != nil {
return err
}
@@ -105,12 +102,12 @@ func (fifo *RedisUniqueByteFIFO) PushFunc(data []byte, fn func() error) error {
return err
}
}
- return fifo.client.RPush(fifo.ctx, fifo.queueName, data).Err()
+ return fifo.client.RPush(ctx, fifo.queueName, data).Err()
}
// Pop pops data from the start of the fifo
-func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error) {
- data, err := fifo.client.LPop(fifo.ctx, fifo.queueName).Bytes()
+func (fifo *RedisUniqueByteFIFO) Pop(ctx context.Context) ([]byte, error) {
+ data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes()
if err != nil && err != redis.Nil {
return data, err
}
@@ -119,13 +116,13 @@ func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error) {
return data, nil
}
- err = fifo.client.SRem(fifo.ctx, fifo.setName, data).Err()
+ err = fifo.client.SRem(ctx, fifo.setName, data).Err()
return data, err
}
// Has returns whether the fifo contains this data
-func (fifo *RedisUniqueByteFIFO) Has(data []byte) (bool, error) {
- return fifo.client.SIsMember(fifo.ctx, fifo.setName, data).Result()
+func (fifo *RedisUniqueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) {
+ return fifo.client.SIsMember(ctx, fifo.setName, data).Result()
}
func init() {
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index 0f15ccac9e..0176e2e0b2 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -21,7 +21,7 @@ import (
type WorkerPool struct {
lock sync.Mutex
baseCtx context.Context
- cancel context.CancelFunc
+ baseCtxCancel context.CancelFunc
cond *sync.Cond
qid int64
maxNumberOfWorkers int
@@ -52,7 +52,7 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo
dataChan := make(chan Data, config.QueueLength)
pool := &WorkerPool{
baseCtx: ctx,
- cancel: cancel,
+ baseCtxCancel: cancel,
batchLength: config.BatchLength,
dataChan: dataChan,
handle: handle,
@@ -83,7 +83,7 @@ func (p *WorkerPool) Push(data Data) {
}
func (p *WorkerPool) zeroBoost() {
- ctx, cancel := context.WithCancel(p.baseCtx)
+ ctx, cancel := context.WithTimeout(p.baseCtx, p.boostTimeout)
mq := GetManager().GetManagedQueue(p.qid)
boost := p.boostWorkers
if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 {
@@ -94,26 +94,14 @@ func (p *WorkerPool) zeroBoost() {
start := time.Now()
pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false)
- go func() {
- select {
- case <-ctx.Done():
- case <-time.After(p.boostTimeout):
- }
+ cancel = func() {
mq.RemoveWorkers(pid)
- cancel()
- }()
+ }
} else {
log.Warn("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout)
- go func() {
- select {
- case <-ctx.Done():
- case <-time.After(p.boostTimeout):
- }
- cancel()
- }()
}
p.lock.Unlock()
- p.addWorkers(ctx, boost)
+ p.addWorkers(ctx, cancel, boost)
}
func (p *WorkerPool) pushBoost(data Data) {
@@ -140,7 +128,7 @@ func (p *WorkerPool) pushBoost(data Data) {
return
}
p.blockTimeout *= 2
- ctx, cancel := context.WithCancel(p.baseCtx)
+ boostCtx, boostCtxCancel := context.WithCancel(p.baseCtx)
mq := GetManager().GetManagedQueue(p.qid)
boost := p.boostWorkers
if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 {
@@ -150,24 +138,24 @@ func (p *WorkerPool) pushBoost(data Data) {
log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout)
start := time.Now()
- pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false)
+ pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), boostCtxCancel, false)
go func() {
- <-ctx.Done()
+ <-boostCtx.Done()
mq.RemoveWorkers(pid)
- cancel()
+ boostCtxCancel()
}()
} else {
log.Warn("WorkerPool: %d Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout)
}
go func() {
<-time.After(p.boostTimeout)
- cancel()
+ boostCtxCancel()
p.lock.Lock()
p.blockTimeout /= 2
p.lock.Unlock()
}()
p.lock.Unlock()
- p.addWorkers(ctx, boost)
+ p.addWorkers(boostCtx, boostCtxCancel, boost)
p.dataChan <- data
}
}
@@ -243,28 +231,25 @@ func (p *WorkerPool) commonRegisterWorkers(number int, timeout time.Duration, is
mq := GetManager().GetManagedQueue(p.qid)
if mq != nil {
pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel, isFlusher)
- go func() {
- <-ctx.Done()
- mq.RemoveWorkers(pid)
- cancel()
- }()
log.Trace("WorkerPool: %d (for %s) adding %d workers with group id: %d", p.qid, mq.Name, number, pid)
- } else {
- log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number)
-
+ return ctx, func() {
+ mq.RemoveWorkers(pid)
+ }
}
+ log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number)
+
return ctx, cancel
}
// AddWorkers adds workers to the pool - this allows the number of workers to go above the limit
func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
ctx, cancel := p.commonRegisterWorkers(number, timeout, false)
- p.addWorkers(ctx, number)
+ p.addWorkers(ctx, cancel, number)
return cancel
}
// addWorkers adds workers to the pool
-func (p *WorkerPool) addWorkers(ctx context.Context, number int) {
+func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc, number int) {
for i := 0; i < number; i++ {
p.lock.Lock()
if p.cond == nil {
@@ -279,11 +264,13 @@ func (p *WorkerPool) addWorkers(ctx context.Context, number int) {
p.numberOfWorkers--
if p.numberOfWorkers == 0 {
p.cond.Broadcast()
+ cancel()
} else if p.numberOfWorkers < 0 {
// numberOfWorkers can't go negative but...
log.Warn("Number of Workers < 0 for QID %d - this shouldn't happen", p.qid)
p.numberOfWorkers = 0
p.cond.Broadcast()
+ cancel()
}
p.lock.Unlock()
}()