diff options
Diffstat (limited to 'modules/queue/queue_disk_channel.go')
-rw-r--r-- | modules/queue/queue_disk_channel.go | 126 |
1 files changed, 95 insertions, 31 deletions
diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index 6bb5a1be97..961187ab0d 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -6,7 +6,9 @@ package queue import ( "context" + "fmt" "sync" + "sync/atomic" "time" "code.gitea.io/gitea/modules/log" @@ -31,8 +33,10 @@ type PersistableChannelQueueConfiguration struct { } // PersistableChannelQueue wraps a channel queue and level queue together +// The disk level queue will be used to store data at shutdown and terminate - and will be restored +// on start up. type PersistableChannelQueue struct { - *ChannelQueue + channelQueue *ChannelQueue delayedStarter lock sync.Mutex closed chan struct{} @@ -48,14 +52,16 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( config := configInterface.(PersistableChannelQueueConfiguration) channelQueue, err := NewChannelQueue(handle, ChannelQueueConfiguration{ - QueueLength: config.QueueLength, - BatchLength: config.BatchLength, - Workers: config.Workers, - MaxWorkers: config.MaxWorkers, - BlockTimeout: config.BlockTimeout, - BoostTimeout: config.BoostTimeout, - BoostWorkers: config.BoostWorkers, - Name: config.Name + "-channel", + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: config.QueueLength, + BatchLength: config.BatchLength, + BlockTimeout: config.BlockTimeout, + BoostTimeout: config.BoostTimeout, + BoostWorkers: config.BoostWorkers, + MaxWorkers: config.MaxWorkers, + }, + Workers: config.Workers, + Name: config.Name + "-channel", }, exemplar) if err != nil { return nil, err @@ -63,28 +69,30 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( // the level backend only needs temporary workers to catch up with the previously dropped work levelCfg := LevelQueueConfiguration{ - DataDir: config.DataDir, - QueueLength: config.QueueLength, - BatchLength: config.BatchLength, - Workers: 1, - MaxWorkers: 6, - BlockTimeout: 1 * time.Second, - BoostTimeout: 5 * time.Minute, - BoostWorkers: 5, - Name: config.Name + "-level", + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: config.QueueLength, + BatchLength: config.BatchLength, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, + MaxWorkers: 6, + }, + DataDir: config.DataDir, + Workers: 1, + Name: config.Name + "-level", } levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar) if err == nil { queue := &PersistableChannelQueue{ - ChannelQueue: channelQueue.(*ChannelQueue), + channelQueue: channelQueue.(*ChannelQueue), delayedStarter: delayedStarter{ internal: levelQueue.(*LevelQueue), name: config.Name, }, closed: make(chan struct{}), } - _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil) + _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar) return queue, nil } if IsErrInvalidConfiguration(err) { @@ -93,7 +101,7 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( } queue := &PersistableChannelQueue{ - ChannelQueue: channelQueue.(*ChannelQueue), + channelQueue: channelQueue.(*ChannelQueue), delayedStarter: delayedStarter{ cfg: levelCfg, underlying: LevelQueueType, @@ -103,7 +111,7 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( }, closed: make(chan struct{}), } - _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil) + _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar) return queue, nil } @@ -118,15 +126,17 @@ func (p *PersistableChannelQueue) Push(data Data) error { case <-p.closed: return p.internal.Push(data) default: - return p.ChannelQueue.Push(data) + return p.channelQueue.Push(data) } } // Run starts to run the queue func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { + log.Debug("PersistableChannelQueue: %s Starting", p.delayedStarter.name) + p.lock.Lock() if p.internal == nil { - err := p.setInternal(atShutdown, p.ChannelQueue.pool.handle, p.exemplar) + err := p.setInternal(atShutdown, p.channelQueue.handle, p.channelQueue.exemplar) p.lock.Unlock() if err != nil { log.Fatal("Unable to create internal queue for %s Error: %v", p.Name(), err) @@ -142,31 +152,83 @@ func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte go p.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) go func() { - _ = p.ChannelQueue.pool.AddWorkers(p.workers, 0) + _ = p.channelQueue.AddWorkers(p.channelQueue.workers, 0) }() log.Trace("PersistableChannelQueue: %s Waiting til closed", p.delayedStarter.name) <-p.closed log.Trace("PersistableChannelQueue: %s Cancelling pools", p.delayedStarter.name) - p.ChannelQueue.pool.cancel() - p.internal.(*LevelQueue).pool.cancel() + p.channelQueue.cancel() + p.internal.(*LevelQueue).cancel() log.Trace("PersistableChannelQueue: %s Waiting til done", p.delayedStarter.name) - p.ChannelQueue.pool.Wait() - p.internal.(*LevelQueue).pool.Wait() + p.channelQueue.Wait() + p.internal.(*LevelQueue).Wait() // Redirect all remaining data in the chan to the internal channel go func() { log.Trace("PersistableChannelQueue: %s Redirecting remaining data", p.delayedStarter.name) - for data := range p.ChannelQueue.pool.dataChan { + for data := range p.channelQueue.dataChan { _ = p.internal.Push(data) + atomic.AddInt64(&p.channelQueue.numInQueue, -1) } log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", p.delayedStarter.name) }() log.Trace("PersistableChannelQueue: %s Done main loop", p.delayedStarter.name) } +// Flush flushes the queue and blocks till the queue is empty +func (p *PersistableChannelQueue) Flush(timeout time.Duration) error { + var ctx context.Context + var cancel context.CancelFunc + if timeout > 0 { + ctx, cancel = context.WithTimeout(context.Background(), timeout) + } else { + ctx, cancel = context.WithCancel(context.Background()) + } + defer cancel() + return p.FlushWithContext(ctx) +} + +// FlushWithContext flushes the queue and blocks till the queue is empty +func (p *PersistableChannelQueue) FlushWithContext(ctx context.Context) error { + errChan := make(chan error, 1) + go func() { + errChan <- p.channelQueue.FlushWithContext(ctx) + }() + go func() { + p.lock.Lock() + if p.internal == nil { + p.lock.Unlock() + errChan <- fmt.Errorf("not ready to flush internal queue %s yet", p.Name()) + return + } + p.lock.Unlock() + errChan <- p.internal.FlushWithContext(ctx) + }() + err1 := <-errChan + err2 := <-errChan + + if err1 != nil { + return err1 + } + return err2 +} + +// IsEmpty checks if a queue is empty +func (p *PersistableChannelQueue) IsEmpty() bool { + if !p.channelQueue.IsEmpty() { + return false + } + p.lock.Lock() + defer p.lock.Unlock() + if p.internal == nil { + return false + } + return p.internal.IsEmpty() +} + // Shutdown processing this queue func (p *PersistableChannelQueue) Shutdown() { - log.Trace("PersistableChannelQueue: %s Shutdown", p.delayedStarter.name) + log.Trace("PersistableChannelQueue: %s Shutting down", p.delayedStarter.name) select { case <-p.closed: default: @@ -177,6 +239,7 @@ func (p *PersistableChannelQueue) Shutdown() { } close(p.closed) } + log.Debug("PersistableChannelQueue: %s Shutdown", p.delayedStarter.name) } // Terminate this queue and close the queue @@ -188,6 +251,7 @@ func (p *PersistableChannelQueue) Terminate() { if p.internal != nil { p.internal.(*LevelQueue).Terminate() } + log.Debug("PersistableChannelQueue: %s Terminated", p.delayedStarter.name) } func init() { |