diff options
author | zeripath <art27@cantab.net> | 2020-02-02 23:19:58 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-02 23:19:58 +0000 |
commit | 2c903383b5154795b90e4b4ed8eaadc6fac17a13 (patch) | |
tree | d5ca361d9597e027ad92f1e02a841be1d266b554 /modules/queue | |
parent | b4914249ee389a733e7dcfd2df20708ab3215827 (diff) | |
download | gitea-2c903383b5154795b90e4b4ed8eaadc6fac17a13.tar.gz gitea-2c903383b5154795b90e4b4ed8eaadc6fac17a13.zip |
Add Unique Queue infrastructure and move TestPullRequests to this (#9856)
* Upgrade levelqueue to version 0.2.0
This adds functionality for Unique Queues
* Add UniqueQueue interface and functions to create them
* Add UniqueQueue implementations
* Move TestPullRequests over to use UniqueQueue
* Reduce code duplication
* Add bytefifos
* Ensure invalid types are logged
* Fix close race in PersistableChannelQueue Shutdown
Diffstat (limited to 'modules/queue')
-rw-r--r-- | modules/queue/bytefifo.go | 61 | ||||
-rw-r--r-- | modules/queue/queue.go | 20 | ||||
-rw-r--r-- | modules/queue/queue_bytefifo.go | 227 | ||||
-rw-r--r-- | modules/queue/queue_channel.go | 22 | ||||
-rw-r--r-- | modules/queue/queue_disk.go | 178 | ||||
-rw-r--r-- | modules/queue/queue_disk_channel.go | 150 | ||||
-rw-r--r-- | modules/queue/queue_disk_test.go | 36 | ||||
-rw-r--r-- | modules/queue/queue_redis.go | 238 | ||||
-rw-r--r-- | modules/queue/setting.go | 40 | ||||
-rw-r--r-- | modules/queue/unique_queue.go | 29 | ||||
-rw-r--r-- | modules/queue/unique_queue_channel.go | 132 | ||||
-rw-r--r-- | modules/queue/unique_queue_disk.go | 104 | ||||
-rw-r--r-- | modules/queue/unique_queue_disk_channel.go | 241 | ||||
-rw-r--r-- | modules/queue/unique_queue_redis.go | 124 | ||||
-rw-r--r-- | modules/queue/unique_queue_wrapped.go | 172 |
15 files changed, 1371 insertions, 403 deletions
diff --git a/modules/queue/bytefifo.go b/modules/queue/bytefifo.go new file mode 100644 index 0000000000..2cd0ba0b95 --- /dev/null +++ b/modules/queue/bytefifo.go @@ -0,0 +1,61 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +// ByteFIFO defines a FIFO that takes a byte array +type ByteFIFO interface { + // Len returns the length of the fifo + Len() int64 + // PushFunc pushes data to the end of the fifo and calls the callback if it is added + PushFunc(data []byte, fn func() error) error + // Pop pops data from the start of the fifo + Pop() ([]byte, error) + // Close this fifo + Close() error +} + +// UniqueByteFIFO defines a FIFO that Uniques its contents +type UniqueByteFIFO interface { + ByteFIFO + // Has returns whether the fifo contains this data + Has(data []byte) (bool, error) +} + +var _ (ByteFIFO) = &DummyByteFIFO{} + +// DummyByteFIFO represents a dummy fifo +type DummyByteFIFO struct{} + +// PushFunc returns nil +func (*DummyByteFIFO) PushFunc(data []byte, fn func() error) error { + return nil +} + +// Pop returns nil +func (*DummyByteFIFO) Pop() ([]byte, error) { + return []byte{}, nil +} + +// Close returns nil +func (*DummyByteFIFO) Close() error { + return nil +} + +// Len is always 0 +func (*DummyByteFIFO) Len() int64 { + return 0 +} + +var _ (UniqueByteFIFO) = &DummyUniqueByteFIFO{} + +// DummyUniqueByteFIFO represents a dummy unique fifo +type DummyUniqueByteFIFO struct { + DummyByteFIFO +} + +// Has always returns false +func (*DummyUniqueByteFIFO) Has([]byte) (bool, error) { + return false, nil +} diff --git a/modules/queue/queue.go b/modules/queue/queue.go index 094699d4af..e3c63310be 100644 --- a/modules/queue/queue.go +++ b/modules/queue/queue.go @@ -74,25 +74,35 @@ type DummyQueue struct { } // Run does nothing -func (b *DummyQueue) Run(_, _ func(context.Context, func())) {} +func (*DummyQueue) Run(_, _ func(context.Context, func())) {} // Push fakes a push of data to the queue -func (b *DummyQueue) Push(Data) error { +func (*DummyQueue) Push(Data) error { return nil } +// PushFunc fakes a push of data to the queue with a function. The function is never run. +func (*DummyQueue) PushFunc(Data, func() error) error { + return nil +} + +// Has always returns false as this queue never does anything +func (*DummyQueue) Has(Data) (bool, error) { + return false, nil +} + // Flush always returns nil -func (b *DummyQueue) Flush(time.Duration) error { +func (*DummyQueue) Flush(time.Duration) error { return nil } // FlushWithContext always returns nil -func (b *DummyQueue) FlushWithContext(context.Context) error { +func (*DummyQueue) FlushWithContext(context.Context) error { return nil } // IsEmpty asserts that the queue is empty -func (b *DummyQueue) IsEmpty() bool { +func (*DummyQueue) IsEmpty() bool { return true } diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go new file mode 100644 index 0000000000..cad258bda8 --- /dev/null +++ b/modules/queue/queue_bytefifo.go @@ -0,0 +1,227 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "time" + + "code.gitea.io/gitea/modules/log" +) + +// ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue +type ByteFIFOQueueConfiguration struct { + WorkerPoolConfiguration + Workers int + Name string +} + +var _ (Queue) = &ByteFIFOQueue{} + +// ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool +type ByteFIFOQueue struct { + *WorkerPool + byteFIFO ByteFIFO + typ Type + closed chan struct{} + terminated chan struct{} + exemplar interface{} + workers int + name string + lock sync.Mutex +} + +// NewByteFIFOQueue creates a new ByteFIFOQueue +func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOQueue, error) { + configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(ByteFIFOQueueConfiguration) + + return &ByteFIFOQueue{ + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + byteFIFO: byteFIFO, + typ: typ, + closed: make(chan struct{}), + terminated: make(chan struct{}), + exemplar: exemplar, + workers: config.Workers, + name: config.Name, + }, nil +} + +// Name returns the name of this queue +func (q *ByteFIFOQueue) Name() string { + return q.name +} + +// Push pushes data to the fifo +func (q *ByteFIFOQueue) Push(data Data) error { + return q.PushFunc(data, nil) +} + +// PushFunc pushes data to the fifo +func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error { + if !assignableTo(data, q.exemplar) { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) + } + bs, err := json.Marshal(data) + if err != nil { + return err + } + return q.byteFIFO.PushFunc(bs, fn) +} + +// IsEmpty checks if the queue is empty +func (q *ByteFIFOQueue) IsEmpty() bool { + q.lock.Lock() + defer q.lock.Unlock() + if !q.WorkerPool.IsEmpty() { + return false + } + return q.byteFIFO.Len() == 0 +} + +// Run runs the bytefifo queue +func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func())) { + atShutdown(context.Background(), q.Shutdown) + atTerminate(context.Background(), q.Terminate) + log.Debug("%s: %s Starting", q.typ, q.name) + + go func() { + _ = q.AddWorkers(q.workers, 0) + }() + + go q.readToChan() + + log.Trace("%s: %s Waiting til closed", q.typ, q.name) + <-q.closed + log.Trace("%s: %s Waiting til done", q.typ, q.name) + q.Wait() + + log.Trace("%s: %s Waiting til cleaned", q.typ, q.name) + ctx, cancel := context.WithCancel(context.Background()) + atTerminate(ctx, cancel) + q.CleanUp(ctx) + cancel() +} + +func (q *ByteFIFOQueue) readToChan() { + for { + select { + case <-q.closed: + // tell the pool to shutdown. + q.cancel() + return + default: + q.lock.Lock() + bs, err := q.byteFIFO.Pop() + if err != nil { + q.lock.Unlock() + log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err) + time.Sleep(time.Millisecond * 100) + continue + } + + if len(bs) == 0 { + q.lock.Unlock() + time.Sleep(time.Millisecond * 100) + continue + } + + data, err := unmarshalAs(bs, q.exemplar) + if err != nil { + log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err) + q.lock.Unlock() + time.Sleep(time.Millisecond * 100) + continue + } + + log.Trace("%s %s: Task found: %#v", q.typ, q.name, data) + q.WorkerPool.Push(data) + q.lock.Unlock() + } + } +} + +// Shutdown processing from this queue +func (q *ByteFIFOQueue) Shutdown() { + log.Trace("%s: %s Shutting down", q.typ, q.name) + q.lock.Lock() + select { + case <-q.closed: + default: + close(q.closed) + } + q.lock.Unlock() + log.Debug("%s: %s Shutdown", q.typ, q.name) +} + +// Terminate this queue and close the queue +func (q *ByteFIFOQueue) Terminate() { + log.Trace("%s: %s Terminating", q.typ, q.name) + q.Shutdown() + q.lock.Lock() + select { + case <-q.terminated: + q.lock.Unlock() + return + default: + } + close(q.terminated) + q.lock.Unlock() + if log.IsDebug() { + log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len()) + } + if err := q.byteFIFO.Close(); err != nil { + log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err) + } + log.Debug("%s: %s Terminated", q.typ, q.name) +} + +var _ (UniqueQueue) = &ByteFIFOUniqueQueue{} + +// ByteFIFOUniqueQueue represents a UniqueQueue formed from a UniqueByteFifo +type ByteFIFOUniqueQueue struct { + ByteFIFOQueue +} + +// NewByteFIFOUniqueQueue creates a new ByteFIFOUniqueQueue +func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOUniqueQueue, error) { + configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(ByteFIFOQueueConfiguration) + + return &ByteFIFOUniqueQueue{ + ByteFIFOQueue: ByteFIFOQueue{ + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + byteFIFO: byteFIFO, + typ: typ, + closed: make(chan struct{}), + terminated: make(chan struct{}), + exemplar: exemplar, + workers: config.Workers, + name: config.Name, + }, + }, nil +} + +// Has checks if the provided data is in the queue +func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) { + if !assignableTo(data, q.exemplar) { + return false, fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) + } + bs, err := json.Marshal(data) + if err != nil { + return false, err + } + return q.byteFIFO.(UniqueByteFIFO).Has(bs) +} diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index 45df8a443e..d7a11e79f5 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -53,31 +53,31 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro } // Run starts to run the queue -func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { +func (q *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { atShutdown(context.Background(), func() { - log.Warn("ChannelQueue: %s is not shutdownable!", c.name) + log.Warn("ChannelQueue: %s is not shutdownable!", q.name) }) atTerminate(context.Background(), func() { - log.Warn("ChannelQueue: %s is not terminatable!", c.name) + log.Warn("ChannelQueue: %s is not terminatable!", q.name) }) - log.Debug("ChannelQueue: %s Starting", c.name) + log.Debug("ChannelQueue: %s Starting", q.name) go func() { - _ = c.AddWorkers(c.workers, 0) + _ = q.AddWorkers(q.workers, 0) }() } // Push will push data into the queue -func (c *ChannelQueue) Push(data Data) error { - if !assignableTo(data, c.exemplar) { - return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name) +func (q *ChannelQueue) Push(data Data) error { + if !assignableTo(data, q.exemplar) { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name) } - c.WorkerPool.Push(data) + q.WorkerPool.Push(data) return nil } // Name returns the name of this queue -func (c *ChannelQueue) Name() string { - return c.name +func (q *ChannelQueue) Name() string { + return q.name } func init() { diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go index ca3e230e3d..ff0876488b 100644 --- a/modules/queue/queue_disk.go +++ b/modules/queue/queue_disk.go @@ -5,15 +5,6 @@ package queue import ( - "context" - "encoding/json" - "fmt" - "sync" - "sync/atomic" - "time" - - "code.gitea.io/gitea/modules/log" - "gitea.com/lunny/levelqueue" ) @@ -22,22 +13,13 @@ const LevelQueueType Type = "level" // LevelQueueConfiguration is the configuration for a LevelQueue type LevelQueueConfiguration struct { - WorkerPoolConfiguration + ByteFIFOQueueConfiguration DataDir string - Workers int - Name string } // LevelQueue implements a disk library queue type LevelQueue struct { - *WorkerPool - queue *levelqueue.Queue - closed chan struct{} - terminated chan struct{} - lock sync.Mutex - exemplar interface{} - workers int - name string + *ByteFIFOQueue } // NewLevelQueue creates a ledis local queue @@ -48,149 +30,69 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) } config := configInterface.(LevelQueueConfiguration) - internal, err := levelqueue.Open(config.DataDir) + byteFIFO, err := NewLevelQueueByteFIFO(config.DataDir) + if err != nil { + return nil, err + } + + byteFIFOQueue, err := NewByteFIFOQueue(LevelQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar) if err != nil { return nil, err } queue := &LevelQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), - queue: internal, - exemplar: exemplar, - closed: make(chan struct{}), - terminated: make(chan struct{}), - workers: config.Workers, - name: config.Name, + ByteFIFOQueue: byteFIFOQueue, } queue.qid = GetManager().Add(queue, LevelQueueType, config, exemplar) return queue, nil } -// Run starts to run the queue -func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { - atShutdown(context.Background(), l.Shutdown) - atTerminate(context.Background(), l.Terminate) - log.Debug("LevelQueue: %s Starting", l.name) - - go func() { - _ = l.AddWorkers(l.workers, 0) - }() - - go l.readToChan() - - log.Trace("LevelQueue: %s Waiting til closed", l.name) - <-l.closed - - log.Trace("LevelQueue: %s Waiting til done", l.name) - l.Wait() - - log.Trace("LevelQueue: %s Waiting til cleaned", l.name) - ctx, cancel := context.WithCancel(context.Background()) - atTerminate(ctx, cancel) - l.CleanUp(ctx) - cancel() - log.Trace("LevelQueue: %s Cleaned", l.name) - -} +var _ (ByteFIFO) = &LevelQueueByteFIFO{} -func (l *LevelQueue) readToChan() { - for { - select { - case <-l.closed: - // tell the pool to shutdown. - l.cancel() - return - default: - atomic.AddInt64(&l.numInQueue, 1) - bs, err := l.queue.RPop() - if err != nil { - if err != levelqueue.ErrNotFound { - log.Error("LevelQueue: %s Error on RPop: %v", l.name, err) - } - atomic.AddInt64(&l.numInQueue, -1) - time.Sleep(time.Millisecond * 100) - continue - } - - if len(bs) == 0 { - atomic.AddInt64(&l.numInQueue, -1) - time.Sleep(time.Millisecond * 100) - continue - } - - data, err := unmarshalAs(bs, l.exemplar) - if err != nil { - log.Error("LevelQueue: %s Failed to unmarshal with error: %v", l.name, err) - atomic.AddInt64(&l.numInQueue, -1) - time.Sleep(time.Millisecond * 100) - continue - } - - log.Trace("LevelQueue %s: Task found: %#v", l.name, data) - l.WorkerPool.Push(data) - atomic.AddInt64(&l.numInQueue, -1) - } - } +// LevelQueueByteFIFO represents a ByteFIFO formed from a LevelQueue +type LevelQueueByteFIFO struct { + internal *levelqueue.Queue } -// Push will push the indexer data to queue -func (l *LevelQueue) Push(data Data) error { - if !assignableTo(data, l.exemplar) { - return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, l.exemplar, l.name) - } - bs, err := json.Marshal(data) +// NewLevelQueueByteFIFO creates a ByteFIFO formed from a LevelQueue +func NewLevelQueueByteFIFO(dataDir string) (*LevelQueueByteFIFO, error) { + internal, err := levelqueue.Open(dataDir) if err != nil { - return err + return nil, err } - return l.queue.LPush(bs) + + return &LevelQueueByteFIFO{ + internal: internal, + }, nil } -// IsEmpty checks whether the queue is empty -func (l *LevelQueue) IsEmpty() bool { - if !l.WorkerPool.IsEmpty() { - return false +// PushFunc will push data into the fifo +func (fifo *LevelQueueByteFIFO) PushFunc(data []byte, fn func() error) error { + if fn != nil { + if err := fn(); err != nil { + return err + } } - return l.queue.Len() == 0 + return fifo.internal.LPush(data) } -// Shutdown this queue and stop processing -func (l *LevelQueue) Shutdown() { - l.lock.Lock() - defer l.lock.Unlock() - log.Trace("LevelQueue: %s Shutting down", l.name) - select { - case <-l.closed: - default: - close(l.closed) +// Pop pops data from the start of the fifo +func (fifo *LevelQueueByteFIFO) Pop() ([]byte, error) { + data, err := fifo.internal.RPop() + if err != nil && err != levelqueue.ErrNotFound { + return nil, err } - log.Debug("LevelQueue: %s Shutdown", l.name) + return data, nil } -// Terminate this queue and close the queue -func (l *LevelQueue) Terminate() { - log.Trace("LevelQueue: %s Terminating", l.name) - l.Shutdown() - l.lock.Lock() - select { - case <-l.terminated: - l.lock.Unlock() - default: - close(l.terminated) - l.lock.Unlock() - if log.IsDebug() { - log.Debug("LevelQueue: %s Closing with %d tasks left in queue", l.name, l.queue.Len()) - } - if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" { - log.Error("Error whilst closing internal queue in %s: %v", l.name, err) - } - - } - log.Debug("LevelQueue: %s Terminated", l.name) +// Close this fifo +func (fifo *LevelQueueByteFIFO) Close() error { + return fifo.internal.Close() } -// Name returns the name of this queue -func (l *LevelQueue) Name() string { - return l.name +// Len returns the length of the fifo +func (fifo *LevelQueueByteFIFO) Len() int64 { + return fifo.internal.Len() } func init() { diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index 961187ab0d..433435c301 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -69,17 +69,19 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( // the level backend only needs temporary workers to catch up with the previously dropped work levelCfg := LevelQueueConfiguration{ - WorkerPoolConfiguration: WorkerPoolConfiguration{ - QueueLength: config.QueueLength, - BatchLength: config.BatchLength, - BlockTimeout: 1 * time.Second, - BoostTimeout: 5 * time.Minute, - BoostWorkers: 5, - MaxWorkers: 6, + ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{ + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: config.QueueLength, + BatchLength: config.BatchLength, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, + MaxWorkers: 6, + }, + Workers: 1, + Name: config.Name + "-level", }, DataDir: config.DataDir, - Workers: 1, - Name: config.Name + "-level", } levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar) @@ -116,67 +118,67 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( } // Name returns the name of this queue -func (p *PersistableChannelQueue) Name() string { - return p.delayedStarter.name +func (q *PersistableChannelQueue) Name() string { + return q.delayedStarter.name } // Push will push the indexer data to queue -func (p *PersistableChannelQueue) Push(data Data) error { +func (q *PersistableChannelQueue) Push(data Data) error { select { - case <-p.closed: - return p.internal.Push(data) + case <-q.closed: + return q.internal.Push(data) default: - return p.channelQueue.Push(data) + return q.channelQueue.Push(data) } } // Run starts to run the queue -func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { - log.Debug("PersistableChannelQueue: %s Starting", p.delayedStarter.name) +func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { + log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name) - p.lock.Lock() - if p.internal == nil { - err := p.setInternal(atShutdown, p.channelQueue.handle, p.channelQueue.exemplar) - p.lock.Unlock() + q.lock.Lock() + if q.internal == nil { + err := q.setInternal(atShutdown, q.channelQueue.handle, q.channelQueue.exemplar) + q.lock.Unlock() if err != nil { - log.Fatal("Unable to create internal queue for %s Error: %v", p.Name(), err) + log.Fatal("Unable to create internal queue for %s Error: %v", q.Name(), err) return } } else { - p.lock.Unlock() + q.lock.Unlock() } - atShutdown(context.Background(), p.Shutdown) - atTerminate(context.Background(), p.Terminate) + atShutdown(context.Background(), q.Shutdown) + atTerminate(context.Background(), q.Terminate) // Just run the level queue - we shut it down later - go p.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) + go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) go func() { - _ = p.channelQueue.AddWorkers(p.channelQueue.workers, 0) + _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0) }() - log.Trace("PersistableChannelQueue: %s Waiting til closed", p.delayedStarter.name) - <-p.closed - log.Trace("PersistableChannelQueue: %s Cancelling pools", p.delayedStarter.name) - p.channelQueue.cancel() - p.internal.(*LevelQueue).cancel() - log.Trace("PersistableChannelQueue: %s Waiting til done", p.delayedStarter.name) - p.channelQueue.Wait() - p.internal.(*LevelQueue).Wait() + log.Trace("PersistableChannelQueue: %s Waiting til closed", q.delayedStarter.name) + <-q.closed + log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name) + q.channelQueue.cancel() + q.internal.(*LevelQueue).cancel() + log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name) + q.channelQueue.Wait() + q.internal.(*LevelQueue).Wait() // Redirect all remaining data in the chan to the internal channel go func() { - log.Trace("PersistableChannelQueue: %s Redirecting remaining data", p.delayedStarter.name) - for data := range p.channelQueue.dataChan { - _ = p.internal.Push(data) - atomic.AddInt64(&p.channelQueue.numInQueue, -1) + log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name) + for data := range q.channelQueue.dataChan { + _ = q.internal.Push(data) + atomic.AddInt64(&q.channelQueue.numInQueue, -1) } - log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", p.delayedStarter.name) + log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name) }() - log.Trace("PersistableChannelQueue: %s Done main loop", p.delayedStarter.name) + log.Trace("PersistableChannelQueue: %s Done main loop", q.delayedStarter.name) } // Flush flushes the queue and blocks till the queue is empty -func (p *PersistableChannelQueue) Flush(timeout time.Duration) error { +func (q *PersistableChannelQueue) Flush(timeout time.Duration) error { var ctx context.Context var cancel context.CancelFunc if timeout > 0 { @@ -185,24 +187,24 @@ func (p *PersistableChannelQueue) Flush(timeout time.Duration) error { ctx, cancel = context.WithCancel(context.Background()) } defer cancel() - return p.FlushWithContext(ctx) + return q.FlushWithContext(ctx) } // FlushWithContext flushes the queue and blocks till the queue is empty -func (p *PersistableChannelQueue) FlushWithContext(ctx context.Context) error { +func (q *PersistableChannelQueue) FlushWithContext(ctx context.Context) error { errChan := make(chan error, 1) go func() { - errChan <- p.channelQueue.FlushWithContext(ctx) + errChan <- q.channelQueue.FlushWithContext(ctx) }() go func() { - p.lock.Lock() - if p.internal == nil { - p.lock.Unlock() - errChan <- fmt.Errorf("not ready to flush internal queue %s yet", p.Name()) + q.lock.Lock() + if q.internal == nil { + q.lock.Unlock() + errChan <- fmt.Errorf("not ready to flush internal queue %s yet", q.Name()) return } - p.lock.Unlock() - errChan <- p.internal.FlushWithContext(ctx) + q.lock.Unlock() + errChan <- q.internal.FlushWithContext(ctx) }() err1 := <-errChan err2 := <-errChan @@ -214,44 +216,44 @@ func (p *PersistableChannelQueue) FlushWithContext(ctx context.Context) error { } // IsEmpty checks if a queue is empty -func (p *PersistableChannelQueue) IsEmpty() bool { - if !p.channelQueue.IsEmpty() { +func (q *PersistableChannelQueue) IsEmpty() bool { + if !q.channelQueue.IsEmpty() { return false } - p.lock.Lock() - defer p.lock.Unlock() - if p.internal == nil { + q.lock.Lock() + defer q.lock.Unlock() + if q.internal == nil { return false } - return p.internal.IsEmpty() + return q.internal.IsEmpty() } // Shutdown processing this queue -func (p *PersistableChannelQueue) Shutdown() { - log.Trace("PersistableChannelQueue: %s Shutting down", p.delayedStarter.name) +func (q *PersistableChannelQueue) Shutdown() { + log.Trace("PersistableChannelQueue: %s Shutting down", q.delayedStarter.name) + q.lock.Lock() + defer q.lock.Unlock() select { - case <-p.closed: + case <-q.closed: default: - p.lock.Lock() - defer p.lock.Unlock() - if p.internal != nil { - p.internal.(*LevelQueue).Shutdown() + if q.internal != nil { + q.internal.(*LevelQueue).Shutdown() } - close(p.closed) + close(q.closed) + log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name) } - log.Debug("PersistableChannelQueue: %s Shutdown", p.delayedStarter.name) } // Terminate this queue and close the queue -func (p *PersistableChannelQueue) Terminate() { - log.Trace("PersistableChannelQueue: %s Terminating", p.delayedStarter.name) - p.Shutdown() - p.lock.Lock() - defer p.lock.Unlock() - if p.internal != nil { - p.internal.(*LevelQueue).Terminate() +func (q *PersistableChannelQueue) Terminate() { + log.Trace("PersistableChannelQueue: %s Terminating", q.delayedStarter.name) + q.Shutdown() + q.lock.Lock() + defer q.lock.Unlock() + if q.internal != nil { + q.internal.(*LevelQueue).Terminate() } - log.Debug("PersistableChannelQueue: %s Terminated", p.delayedStarter.name) + log.Debug("PersistableChannelQueue: %s Terminated", q.delayedStarter.name) } func init() { diff --git a/modules/queue/queue_disk_test.go b/modules/queue/queue_disk_test.go index 038d7d8223..c7d3eb160b 100644 --- a/modules/queue/queue_disk_test.go +++ b/modules/queue/queue_disk_test.go @@ -34,16 +34,18 @@ func TestLevelQueue(t *testing.T) { defer os.RemoveAll(tmpDir) queue, err := NewLevelQueue(handle, LevelQueueConfiguration{ - WorkerPoolConfiguration: WorkerPoolConfiguration{ - QueueLength: 20, - BatchLength: 2, - BlockTimeout: 1 * time.Second, - BoostTimeout: 5 * time.Minute, - BoostWorkers: 5, - MaxWorkers: 10, + ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{ + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: 20, + BatchLength: 2, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, + MaxWorkers: 10, + }, + Workers: 1, }, DataDir: tmpDir, - Workers: 1, }, &testData{}) assert.NoError(t, err) @@ -105,16 +107,18 @@ func TestLevelQueue(t *testing.T) { WrappedQueueConfiguration{ Underlying: LevelQueueType, Config: LevelQueueConfiguration{ - WorkerPoolConfiguration: WorkerPoolConfiguration{ - QueueLength: 20, - BatchLength: 2, - BlockTimeout: 1 * time.Second, - BoostTimeout: 5 * time.Minute, - BoostWorkers: 5, - MaxWorkers: 10, + ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{ + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: 20, + BatchLength: 2, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, + MaxWorkers: 10, + }, + Workers: 1, }, DataDir: tmpDir, - Workers: 1, }, }, &testData{}) assert.NoError(t, err) diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go index 0167c1ec49..8a395cd5aa 100644 --- a/modules/queue/queue_redis.go +++ b/modules/queue/queue_redis.go @@ -5,14 +5,8 @@ package queue import ( - "context" - "encoding/json" "errors" - "fmt" "strings" - "sync" - "sync/atomic" - "time" "code.gitea.io/gitea/modules/log" @@ -22,204 +16,130 @@ import ( // RedisQueueType is the type for redis queue const RedisQueueType Type = "redis" +// RedisQueueConfiguration is the configuration for the redis queue +type RedisQueueConfiguration struct { + ByteFIFOQueueConfiguration + RedisByteFIFOConfiguration +} + +// RedisQueue redis queue +type RedisQueue struct { + *ByteFIFOQueue +} + +// NewRedisQueue creates single redis or cluster redis queue +func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { + configInterface, err := toConfig(RedisQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(RedisQueueConfiguration) + + byteFIFO, err := NewRedisByteFIFO(config.RedisByteFIFOConfiguration) + if err != nil { + return nil, err + } + + byteFIFOQueue, err := NewByteFIFOQueue(RedisQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar) + if err != nil { + return nil, err + } + + queue := &RedisQueue{ + ByteFIFOQueue: byteFIFOQueue, + } + + queue.qid = GetManager().Add(queue, RedisQueueType, config, exemplar) + + return queue, nil +} + type redisClient interface { RPush(key string, args ...interface{}) *redis.IntCmd LPop(key string) *redis.StringCmd LLen(key string) *redis.IntCmd + SAdd(key string, members ...interface{}) *redis.IntCmd + SRem(key string, members ...interface{}) *redis.IntCmd + SIsMember(key string, member interface{}) *redis.BoolCmd Ping() *redis.StatusCmd Close() error } -// RedisQueue redis queue -type RedisQueue struct { - *WorkerPool - client redisClient - queueName string - closed chan struct{} - terminated chan struct{} - exemplar interface{} - workers int - name string - lock sync.Mutex +var _ (ByteFIFO) = &RedisByteFIFO{} + +// RedisByteFIFO represents a ByteFIFO formed from a redisClient +type RedisByteFIFO struct { + client redisClient + queueName string } -// RedisQueueConfiguration is the configuration for the redis queue -type RedisQueueConfiguration struct { - WorkerPoolConfiguration +// RedisByteFIFOConfiguration is the configuration for the RedisByteFIFO +type RedisByteFIFOConfiguration struct { Network string Addresses string Password string DBIndex int QueueName string - Workers int - Name string } -// NewRedisQueue creates single redis or cluster redis queue -func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { - configInterface, err := toConfig(RedisQueueConfiguration{}, cfg) - if err != nil { - return nil, err +// NewRedisByteFIFO creates a ByteFIFO formed from a redisClient +func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error) { + fifo := &RedisByteFIFO{ + queueName: config.QueueName, } - config := configInterface.(RedisQueueConfiguration) - dbs := strings.Split(config.Addresses, ",") - - var queue = &RedisQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), - queueName: config.QueueName, - exemplar: exemplar, - closed: make(chan struct{}), - terminated: make(chan struct{}), - workers: config.Workers, - name: config.Name, - } if len(dbs) == 0 { return nil, errors.New("no redis host specified") } else if len(dbs) == 1 { - queue.client = redis.NewClient(&redis.Options{ + fifo.client = redis.NewClient(&redis.Options{ Network: config.Network, Addr: strings.TrimSpace(dbs[0]), // use default Addr Password: config.Password, // no password set DB: config.DBIndex, // use default DB }) } else { - queue.client = redis.NewClusterClient(&redis.ClusterOptions{ + fifo.client = redis.NewClusterClient(&redis.ClusterOptions{ Addrs: dbs, }) } - if err := queue.client.Ping().Err(); err != nil { + if err := fifo.client.Ping().Err(); err != nil { return nil, err } - queue.qid = GetManager().Add(queue, RedisQueueType, config, exemplar) - - return queue, nil + return fifo, nil } -// Run runs the redis queue -func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) { - atShutdown(context.Background(), r.Shutdown) - atTerminate(context.Background(), r.Terminate) - log.Debug("RedisQueue: %s Starting", r.name) - - go func() { - _ = r.AddWorkers(r.workers, 0) - }() - - go r.readToChan() - - log.Trace("RedisQueue: %s Waiting til closed", r.name) - <-r.closed - log.Trace("RedisQueue: %s Waiting til done", r.name) - r.Wait() - - log.Trace("RedisQueue: %s Waiting til cleaned", r.name) - ctx, cancel := context.WithCancel(context.Background()) - atTerminate(ctx, cancel) - r.CleanUp(ctx) - cancel() -} - -func (r *RedisQueue) readToChan() { - for { - select { - case <-r.closed: - // tell the pool to shutdown - r.cancel() - return - default: - atomic.AddInt64(&r.numInQueue, 1) - bs, err := r.client.LPop(r.queueName).Bytes() - if err != nil && err != redis.Nil { - log.Error("RedisQueue: %s Error on LPop: %v", r.name, err) - atomic.AddInt64(&r.numInQueue, -1) - time.Sleep(time.Millisecond * 100) - continue - } - - if len(bs) == 0 { - atomic.AddInt64(&r.numInQueue, -1) - time.Sleep(time.Millisecond * 100) - continue - } - - data, err := unmarshalAs(bs, r.exemplar) - if err != nil { - log.Error("RedisQueue: %s Error on Unmarshal: %v", r.name, err) - atomic.AddInt64(&r.numInQueue, -1) - time.Sleep(time.Millisecond * 100) - continue - } - - log.Trace("RedisQueue: %s Task found: %#v", r.name, data) - r.WorkerPool.Push(data) - atomic.AddInt64(&r.numInQueue, -1) +// PushFunc pushes data to the end of the fifo and calls the callback if it is added +func (fifo *RedisByteFIFO) PushFunc(data []byte, fn func() error) error { + if fn != nil { + if err := fn(); err != nil { + return err } } + return fifo.client.RPush(fifo.queueName, data).Err() } -// Push implements Queue -func (r *RedisQueue) Push(data Data) error { - if !assignableTo(data, r.exemplar) { - return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, r.exemplar, r.name) - } - bs, err := json.Marshal(data) - if err != nil { - return err - } - return r.client.RPush(r.queueName, bs).Err() -} - -// IsEmpty checks if the queue is empty -func (r *RedisQueue) IsEmpty() bool { - if !r.WorkerPool.IsEmpty() { - return false +// Pop pops data from the start of the fifo +func (fifo *RedisByteFIFO) Pop() ([]byte, error) { + data, err := fifo.client.LPop(fifo.queueName).Bytes() + if err != nil && err == redis.Nil { + return data, nil } - length, err := r.client.LLen(r.queueName).Result() - if err != nil { - log.Error("Error whilst getting queue length for %s: Error: %v", r.name, err) - return false - } - return length == 0 + return data, err } -// Shutdown processing from this queue -func (r *RedisQueue) Shutdown() { - log.Trace("RedisQueue: %s Shutting down", r.name) - r.lock.Lock() - select { - case <-r.closed: - default: - close(r.closed) - } - r.lock.Unlock() - log.Debug("RedisQueue: %s Shutdown", r.name) +// Close this fifo +func (fifo *RedisByteFIFO) Close() error { + return fifo.client.Close() } -// Terminate this queue and close the queue -func (r *RedisQueue) Terminate() { - log.Trace("RedisQueue: %s Terminating", r.name) - r.Shutdown() - r.lock.Lock() - select { - case <-r.terminated: - r.lock.Unlock() - default: - close(r.terminated) - r.lock.Unlock() - if log.IsDebug() { - log.Debug("RedisQueue: %s Closing with %d tasks left in queue", r.name, r.client.LLen(r.queueName)) - } - if err := r.client.Close(); err != nil { - log.Error("Error whilst closing internal redis client in %s: %v", r.name, err) - } +// Len returns the length of the fifo +func (fifo *RedisByteFIFO) Len() int64 { + val, err := fifo.client.LLen(fifo.queueName).Result() + if err != nil { + log.Error("Error whilst getting length of redis queue %s: Error: %v", fifo.queueName, err) + return -1 } - log.Debug("RedisQueue: %s Terminated", r.name) -} - -// Name returns the name of this queue -func (r *RedisQueue) Name() string { - return r.name + return val } func init() { diff --git a/modules/queue/setting.go b/modules/queue/setting.go index 8760c09ae8..c47e85f756 100644 --- a/modules/queue/setting.go +++ b/modules/queue/setting.go @@ -7,6 +7,7 @@ package queue import ( "encoding/json" "fmt" + "strings" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" @@ -36,6 +37,7 @@ func getQueueSettings(name string) (setting.QueueSettings, []byte) { opts["Password"] = q.Password opts["DBIndex"] = q.DBIndex opts["QueueName"] = q.QueueName + opts["SetName"] = q.SetName opts["Workers"] = q.Workers opts["MaxWorkers"] = q.MaxWorkers opts["BlockTimeout"] = q.BlockTimeout @@ -81,3 +83,41 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue { } return returnable } + +// CreateUniqueQueue for name with provided handler and exemplar +func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) UniqueQueue { + q, cfg := getQueueSettings(name) + if len(cfg) == 0 { + return nil + } + + if len(q.Type) > 0 && q.Type != "dummy" && !strings.HasPrefix(q.Type, "unique-") { + q.Type = "unique-" + q.Type + } + + typ, err := validType(q.Type) + if err != nil || typ == PersistableChannelQueueType { + typ = PersistableChannelUniqueQueueType + if err != nil { + log.Error("Invalid type %s provided for queue named %s defaulting to %s", q.Type, name, string(typ)) + } + } + + returnable, err := NewQueue(typ, handle, cfg, exemplar) + if q.WrapIfNecessary && err != nil { + log.Warn("Unable to create unique queue for %s: %v", name, err) + log.Warn("Attempting to create wrapped queue") + returnable, err = NewQueue(WrappedUniqueQueueType, handle, WrappedUniqueQueueConfiguration{ + Underlying: typ, + Timeout: q.Timeout, + MaxAttempts: q.MaxAttempts, + Config: cfg, + QueueLength: q.Length, + }, exemplar) + } + if err != nil { + log.Error("Unable to create unique queue for %s: %v", name, err) + return nil + } + return returnable.(UniqueQueue) +} diff --git a/modules/queue/unique_queue.go b/modules/queue/unique_queue.go new file mode 100644 index 0000000000..87e0594ecf --- /dev/null +++ b/modules/queue/unique_queue.go @@ -0,0 +1,29 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "fmt" +) + +// UniqueQueue defines a queue which guarantees only one instance of same +// data is in the queue. Instances with same identity will be +// discarded if there is already one in the line. +// +// This queue is particularly useful for preventing duplicated task +// of same purpose - please note that this does not guarantee that a particular +// task cannot be processed twice or more at the same time. Uniqueness is +// only guaranteed whilst the task is waiting in the queue. +// +// Users of this queue should be careful to push only the identifier of the +// data +type UniqueQueue interface { + Queue + PushFunc(Data, func() error) error + Has(Data) (bool, error) +} + +// ErrAlreadyInQueue is returned when trying to push data to the queue that is already in the queue +var ErrAlreadyInQueue = fmt.Errorf("already in queue") diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go new file mode 100644 index 0000000000..dec1cfc5c0 --- /dev/null +++ b/modules/queue/unique_queue_channel.go @@ -0,0 +1,132 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "fmt" + "sync" + + "code.gitea.io/gitea/modules/log" +) + +// ChannelUniqueQueueType is the type for channel queue +const ChannelUniqueQueueType Type = "unique-channel" + +// ChannelUniqueQueueConfiguration is the configuration for a ChannelUniqueQueue +type ChannelUniqueQueueConfiguration ChannelQueueConfiguration + +// ChannelUniqueQueue implements UniqueQueue +// +// It is basically a thin wrapper around a WorkerPool but keeps a store of +// what has been pushed within a table. +// +// Please note that this Queue does not guarantee that a particular +// task cannot be processed twice or more at the same time. Uniqueness is +// only guaranteed whilst the task is waiting in the queue. +type ChannelUniqueQueue struct { + *WorkerPool + lock sync.Mutex + table map[Data]bool + exemplar interface{} + workers int + name string +} + +// NewChannelUniqueQueue create a memory channel queue +func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { + configInterface, err := toConfig(ChannelUniqueQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(ChannelUniqueQueueConfiguration) + if config.BatchLength == 0 { + config.BatchLength = 1 + } + queue := &ChannelUniqueQueue{ + table: map[Data]bool{}, + exemplar: exemplar, + workers: config.Workers, + name: config.Name, + } + queue.WorkerPool = NewWorkerPool(func(data ...Data) { + for _, datum := range data { + queue.lock.Lock() + delete(queue.table, datum) + queue.lock.Unlock() + handle(datum) + } + }, config.WorkerPoolConfiguration) + + queue.qid = GetManager().Add(queue, ChannelUniqueQueueType, config, exemplar) + return queue, nil +} + +// Run starts to run the queue +func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) { + atShutdown(context.Background(), func() { + log.Warn("ChannelUniqueQueue: %s is not shutdownable!", q.name) + }) + atTerminate(context.Background(), func() { + log.Warn("ChannelUniqueQueue: %s is not terminatable!", q.name) + }) + log.Debug("ChannelUniqueQueue: %s Starting", q.name) + go func() { + _ = q.AddWorkers(q.workers, 0) + }() +} + +// Push will push data into the queue if the data is not already in the queue +func (q *ChannelUniqueQueue) Push(data Data) error { + return q.PushFunc(data, nil) +} + +// PushFunc will push data into the queue +func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error { + if !assignableTo(data, q.exemplar) { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name) + } + q.lock.Lock() + locked := true + defer func() { + if locked { + q.lock.Unlock() + } + }() + if _, ok := q.table[data]; ok { + return ErrAlreadyInQueue + } + // FIXME: We probably need to implement some sort of limit here + // If the downstream queue blocks this table will grow without limit + q.table[data] = true + if fn != nil { + err := fn() + if err != nil { + delete(q.table, data) + return err + } + } + locked = false + q.lock.Unlock() + q.WorkerPool.Push(data) + return nil +} + +// Has checks if the data is in the queue +func (q *ChannelUniqueQueue) Has(data Data) (bool, error) { + q.lock.Lock() + defer q.lock.Unlock() + _, has := q.table[data] + return has, nil +} + +// Name returns the name of this queue +func (q *ChannelUniqueQueue) Name() string { + return q.name +} + +func init() { + queuesMap[ChannelUniqueQueueType] = NewChannelUniqueQueue +} diff --git a/modules/queue/unique_queue_disk.go b/modules/queue/unique_queue_disk.go new file mode 100644 index 0000000000..bfe7aeed83 --- /dev/null +++ b/modules/queue/unique_queue_disk.go @@ -0,0 +1,104 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "gitea.com/lunny/levelqueue" +) + +// LevelUniqueQueueType is the type for level queue +const LevelUniqueQueueType Type = "unique-level" + +// LevelUniqueQueueConfiguration is the configuration for a LevelUniqueQueue +type LevelUniqueQueueConfiguration struct { + ByteFIFOQueueConfiguration + DataDir string +} + +// LevelUniqueQueue implements a disk library queue +type LevelUniqueQueue struct { + *ByteFIFOUniqueQueue +} + +// NewLevelUniqueQueue creates a ledis local queue +// +// Please note that this Queue does not guarantee that a particular +// task cannot be processed twice or more at the same time. Uniqueness is +// only guaranteed whilst the task is waiting in the queue. +func NewLevelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { + configInterface, err := toConfig(LevelUniqueQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(LevelUniqueQueueConfiguration) + + byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.DataDir) + if err != nil { + return nil, err + } + + byteFIFOQueue, err := NewByteFIFOUniqueQueue(LevelUniqueQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar) + if err != nil { + return nil, err + } + + queue := &LevelUniqueQueue{ + ByteFIFOUniqueQueue: byteFIFOQueue, + } + queue.qid = GetManager().Add(queue, LevelUniqueQueueType, config, exemplar) + return queue, nil +} + +var _ (UniqueByteFIFO) = &LevelUniqueQueueByteFIFO{} + +// LevelUniqueQueueByteFIFO represents a ByteFIFO formed from a LevelUniqueQueue +type LevelUniqueQueueByteFIFO struct { + internal *levelqueue.UniqueQueue +} + +// NewLevelUniqueQueueByteFIFO creates a new ByteFIFO formed from a LevelUniqueQueue +func NewLevelUniqueQueueByteFIFO(dataDir string) (*LevelUniqueQueueByteFIFO, error) { + internal, err := levelqueue.OpenUnique(dataDir) + if err != nil { + return nil, err + } + + return &LevelUniqueQueueByteFIFO{ + internal: internal, + }, nil +} + +// PushFunc pushes data to the end of the fifo and calls the callback if it is added +func (fifo *LevelUniqueQueueByteFIFO) PushFunc(data []byte, fn func() error) error { + return fifo.internal.LPushFunc(data, fn) +} + +// Pop pops data from the start of the fifo +func (fifo *LevelUniqueQueueByteFIFO) Pop() ([]byte, error) { + data, err := fifo.internal.RPop() + if err != nil && err != levelqueue.ErrNotFound { + return nil, err + } + return data, nil +} + +// Len returns the length of the fifo +func (fifo *LevelUniqueQueueByteFIFO) Len() int64 { + return fifo.internal.Len() +} + +// Has returns whether the fifo contains this data +func (fifo *LevelUniqueQueueByteFIFO) Has(data []byte) (bool, error) { + return fifo.internal.Has(data) +} + +// Close this fifo +func (fifo *LevelUniqueQueueByteFIFO) Close() error { + return fifo.internal.Close() +} + +func init() { + queuesMap[LevelUniqueQueueType] = NewLevelUniqueQueue +} diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go new file mode 100644 index 0000000000..71049f3259 --- /dev/null +++ b/modules/queue/unique_queue_disk_channel.go @@ -0,0 +1,241 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "sync" + "time" + + "code.gitea.io/gitea/modules/log" +) + +// PersistableChannelUniqueQueueType is the type for persistable queue +const PersistableChannelUniqueQueueType Type = "unique-persistable-channel" + +// PersistableChannelUniqueQueueConfiguration is the configuration for a PersistableChannelUniqueQueue +type PersistableChannelUniqueQueueConfiguration struct { + Name string + DataDir string + BatchLength int + QueueLength int + Timeout time.Duration + MaxAttempts int + Workers int + MaxWorkers int + BlockTimeout time.Duration + BoostTimeout time.Duration + BoostWorkers int +} + +// PersistableChannelUniqueQueue wraps a channel queue and level queue together +// +// Please note that this Queue does not guarantee that a particular +// task cannot be processed twice or more at the same time. Uniqueness is +// only guaranteed whilst the task is waiting in the queue. +type PersistableChannelUniqueQueue struct { + *ChannelUniqueQueue + delayedStarter + lock sync.Mutex + closed chan struct{} +} + +// NewPersistableChannelUniqueQueue creates a wrapped batched channel queue with persistable level queue backend when shutting down +// This differs from a wrapped queue in that the persistent queue is only used to persist at shutdown/terminate +func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { + configInterface, err := toConfig(PersistableChannelUniqueQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(PersistableChannelUniqueQueueConfiguration) + + channelUniqueQueue, err := NewChannelUniqueQueue(handle, ChannelUniqueQueueConfiguration{ + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: config.QueueLength, + BatchLength: config.BatchLength, + BlockTimeout: config.BlockTimeout, + BoostTimeout: config.BoostTimeout, + BoostWorkers: config.BoostWorkers, + MaxWorkers: config.MaxWorkers, + }, + Workers: config.Workers, + Name: config.Name + "-channel", + }, exemplar) + if err != nil { + return nil, err + } + + // the level backend only needs temporary workers to catch up with the previously dropped work + levelCfg := LevelUniqueQueueConfiguration{ + ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{ + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: config.QueueLength, + BatchLength: config.BatchLength, + BlockTimeout: 0, + BoostTimeout: 0, + BoostWorkers: 0, + MaxWorkers: 1, + }, + Workers: 1, + Name: config.Name + "-level", + }, + DataDir: config.DataDir, + } + + queue := &PersistableChannelUniqueQueue{ + ChannelUniqueQueue: channelUniqueQueue.(*ChannelUniqueQueue), + closed: make(chan struct{}), + } + + levelQueue, err := NewLevelUniqueQueue(func(data ...Data) { + for _, datum := range data { + err := queue.Push(datum) + if err != nil && err != ErrAlreadyInQueue { + log.Error("Unable push to channelled queue: %v", err) + } + } + }, levelCfg, exemplar) + if err == nil { + queue.delayedStarter = delayedStarter{ + internal: levelQueue.(*LevelUniqueQueue), + name: config.Name, + } + + _ = GetManager().Add(queue, PersistableChannelUniqueQueueType, config, exemplar) + return queue, nil + } + if IsErrInvalidConfiguration(err) { + // Retrying ain't gonna make this any better... + return nil, ErrInvalidConfiguration{cfg: cfg} + } + + queue.delayedStarter = delayedStarter{ + cfg: levelCfg, + underlying: LevelUniqueQueueType, + timeout: config.Timeout, + maxAttempts: config.MaxAttempts, + name: config.Name, + } + _ = GetManager().Add(queue, PersistableChannelUniqueQueueType, config, exemplar) + return queue, nil +} + +// Name returns the name of this queue +func (q *PersistableChannelUniqueQueue) Name() string { + return q.delayedStarter.name +} + +// Push will push the indexer data to queue +func (q *PersistableChannelUniqueQueue) Push(data Data) error { + return q.PushFunc(data, nil) +} + +// PushFunc will push the indexer data to queue +func (q *PersistableChannelUniqueQueue) PushFunc(data Data, fn func() error) error { + select { + case <-q.closed: + return q.internal.(UniqueQueue).PushFunc(data, fn) + default: + return q.ChannelUniqueQueue.PushFunc(data, fn) + } +} + +// Has will test if the queue has the data +func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) { + // This is more difficult... + has, err := q.ChannelUniqueQueue.Has(data) + if err != nil || has { + return has, err + } + return q.internal.(UniqueQueue).Has(data) +} + +// Run starts to run the queue +func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) { + log.Debug("PersistableChannelUniqueQueue: %s Starting", q.delayedStarter.name) + + q.lock.Lock() + if q.internal == nil { + err := q.setInternal(atShutdown, func(data ...Data) { + for _, datum := range data { + err := q.Push(datum) + if err != nil && err != ErrAlreadyInQueue { + log.Error("Unable push to channelled queue: %v", err) + } + } + }, q.exemplar) + q.lock.Unlock() + if err != nil { + log.Fatal("Unable to create internal queue for %s Error: %v", q.Name(), err) + return + } + } else { + q.lock.Unlock() + } + atShutdown(context.Background(), q.Shutdown) + atTerminate(context.Background(), q.Terminate) + + // Just run the level queue - we shut it down later + go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) + + go func() { + _ = q.ChannelUniqueQueue.AddWorkers(q.workers, 0) + }() + + log.Trace("PersistableChannelUniqueQueue: %s Waiting til closed", q.delayedStarter.name) + <-q.closed + log.Trace("PersistableChannelUniqueQueue: %s Cancelling pools", q.delayedStarter.name) + q.internal.(*LevelUniqueQueue).cancel() + q.ChannelUniqueQueue.cancel() + log.Trace("PersistableChannelUniqueQueue: %s Waiting til done", q.delayedStarter.name) + q.ChannelUniqueQueue.Wait() + q.internal.(*LevelUniqueQueue).Wait() + // Redirect all remaining data in the chan to the internal channel + go func() { + log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name) + for data := range q.ChannelUniqueQueue.dataChan { + _ = q.internal.Push(data) + } + log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name) + }() + log.Trace("PersistableChannelUniqueQueue: %s Done main loop", q.delayedStarter.name) +} + +// Flush flushes the queue +func (q *PersistableChannelUniqueQueue) Flush(timeout time.Duration) error { + return q.ChannelUniqueQueue.Flush(timeout) +} + +// Shutdown processing this queue +func (q *PersistableChannelUniqueQueue) Shutdown() { + log.Trace("PersistableChannelUniqueQueue: %s Shutting down", q.delayedStarter.name) + q.lock.Lock() + defer q.lock.Unlock() + select { + case <-q.closed: + default: + if q.internal != nil { + q.internal.(*LevelUniqueQueue).Shutdown() + } + close(q.closed) + } + log.Debug("PersistableChannelUniqueQueue: %s Shutdown", q.delayedStarter.name) +} + +// Terminate this queue and close the queue +func (q *PersistableChannelUniqueQueue) Terminate() { + log.Trace("PersistableChannelUniqueQueue: %s Terminating", q.delayedStarter.name) + q.Shutdown() + q.lock.Lock() + defer q.lock.Unlock() + if q.internal != nil { + q.internal.(*LevelUniqueQueue).Terminate() + } + log.Debug("PersistableChannelUniqueQueue: %s Terminated", q.delayedStarter.name) +} + +func init() { + queuesMap[PersistableChannelUniqueQueueType] = NewPersistableChannelUniqueQueue +} diff --git a/modules/queue/unique_queue_redis.go b/modules/queue/unique_queue_redis.go new file mode 100644 index 0000000000..e5b2c48dbb --- /dev/null +++ b/modules/queue/unique_queue_redis.go @@ -0,0 +1,124 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +// RedisUniqueQueueType is the type for redis queue +const RedisUniqueQueueType Type = "unique-redis" + +// RedisUniqueQueue redis queue +type RedisUniqueQueue struct { + *ByteFIFOUniqueQueue +} + +// RedisUniqueQueueConfiguration is the configuration for the redis queue +type RedisUniqueQueueConfiguration struct { + ByteFIFOQueueConfiguration + RedisUniqueByteFIFOConfiguration +} + +// NewRedisUniqueQueue creates single redis or cluster redis queue. +// +// Please note that this Queue does not guarantee that a particular +// task cannot be processed twice or more at the same time. Uniqueness is +// only guaranteed whilst the task is waiting in the queue. +func NewRedisUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { + configInterface, err := toConfig(RedisUniqueQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(RedisUniqueQueueConfiguration) + + byteFIFO, err := NewRedisUniqueByteFIFO(config.RedisUniqueByteFIFOConfiguration) + if err != nil { + return nil, err + } + + if len(byteFIFO.setName) == 0 { + byteFIFO.setName = byteFIFO.queueName + "_unique" + } + + byteFIFOQueue, err := NewByteFIFOUniqueQueue(RedisUniqueQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar) + if err != nil { + return nil, err + } + + queue := &RedisUniqueQueue{ + ByteFIFOUniqueQueue: byteFIFOQueue, + } + + queue.qid = GetManager().Add(queue, RedisUniqueQueueType, config, exemplar) + + return queue, nil +} + +var _ (UniqueByteFIFO) = &RedisUniqueByteFIFO{} + +// RedisUniqueByteFIFO represents a UniqueByteFIFO formed from a redisClient +type RedisUniqueByteFIFO struct { + RedisByteFIFO + setName string +} + +// RedisUniqueByteFIFOConfiguration is the configuration for the RedisUniqueByteFIFO +type RedisUniqueByteFIFOConfiguration struct { + RedisByteFIFOConfiguration + SetName string +} + +// NewRedisUniqueByteFIFO creates a UniqueByteFIFO formed from a redisClient +func NewRedisUniqueByteFIFO(config RedisUniqueByteFIFOConfiguration) (*RedisUniqueByteFIFO, error) { + internal, err := NewRedisByteFIFO(config.RedisByteFIFOConfiguration) + if err != nil { + return nil, err + } + + fifo := &RedisUniqueByteFIFO{ + RedisByteFIFO: *internal, + setName: config.SetName, + } + + return fifo, nil +} + +// PushFunc pushes data to the end of the fifo and calls the callback if it is added +func (fifo *RedisUniqueByteFIFO) PushFunc(data []byte, fn func() error) error { + added, err := fifo.client.SAdd(fifo.setName, data).Result() + if err != nil { + return err + } + if added == 0 { + return ErrAlreadyInQueue + } + if fn != nil { + if err := fn(); err != nil { + return err + } + } + return fifo.client.RPush(fifo.queueName, data).Err() +} + +// Pop pops data from the start of the fifo +func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error) { + data, err := fifo.client.LPop(fifo.queueName).Bytes() + if err != nil { + return data, err + } + + if len(data) == 0 { + return data, nil + } + + err = fifo.client.SRem(fifo.setName, data).Err() + return data, err +} + +// Has returns whether the fifo contains this data +func (fifo *RedisUniqueByteFIFO) Has(data []byte) (bool, error) { + return fifo.client.SIsMember(fifo.setName, data).Result() +} + +func init() { + queuesMap[RedisUniqueQueueType] = NewRedisUniqueQueue +} diff --git a/modules/queue/unique_queue_wrapped.go b/modules/queue/unique_queue_wrapped.go new file mode 100644 index 0000000000..8c815218dd --- /dev/null +++ b/modules/queue/unique_queue_wrapped.go @@ -0,0 +1,172 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "fmt" + "sync" + "time" +) + +// WrappedUniqueQueueType is the type for a wrapped delayed starting queue +const WrappedUniqueQueueType Type = "unique-wrapped" + +// WrappedUniqueQueueConfiguration is the configuration for a WrappedUniqueQueue +type WrappedUniqueQueueConfiguration struct { + Underlying Type + Timeout time.Duration + MaxAttempts int + Config interface{} + QueueLength int + Name string +} + +// WrappedUniqueQueue wraps a delayed starting unique queue +type WrappedUniqueQueue struct { + *WrappedQueue + table map[Data]bool + tlock sync.Mutex + ready bool +} + +// NewWrappedUniqueQueue will attempt to create a unique queue of the provided type, +// but if there is a problem creating this queue it will instead create +// a WrappedUniqueQueue with delayed startup of the queue instead and a +// channel which will be redirected to the queue +// +// Please note that this Queue does not guarantee that a particular +// task cannot be processed twice or more at the same time. Uniqueness is +// only guaranteed whilst the task is waiting in the queue. +func NewWrappedUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { + configInterface, err := toConfig(WrappedUniqueQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(WrappedUniqueQueueConfiguration) + + queue, err := NewQueue(config.Underlying, handle, config.Config, exemplar) + if err == nil { + // Just return the queue there is no need to wrap + return queue, nil + } + if IsErrInvalidConfiguration(err) { + // Retrying ain't gonna make this any better... + return nil, ErrInvalidConfiguration{cfg: cfg} + } + + wrapped := &WrappedUniqueQueue{ + WrappedQueue: &WrappedQueue{ + channel: make(chan Data, config.QueueLength), + exemplar: exemplar, + delayedStarter: delayedStarter{ + cfg: config.Config, + underlying: config.Underlying, + timeout: config.Timeout, + maxAttempts: config.MaxAttempts, + name: config.Name, + }, + }, + table: map[Data]bool{}, + } + + // wrapped.handle is passed to the delayedStarting internal queue and is run to handle + // data passed to + wrapped.handle = func(data ...Data) { + for _, datum := range data { + wrapped.tlock.Lock() + if !wrapped.ready { + delete(wrapped.table, data) + // If our table is empty all of the requests we have buffered between the + // wrapper queue starting and the internal queue starting have been handled. + // We can stop buffering requests in our local table and just pass Push + // direct to the internal queue + if len(wrapped.table) == 0 { + wrapped.ready = true + } + } + wrapped.tlock.Unlock() + handle(datum) + } + } + _ = GetManager().Add(queue, WrappedUniqueQueueType, config, exemplar) + return wrapped, nil +} + +// Push will push the data to the internal channel checking it against the exemplar +func (q *WrappedUniqueQueue) Push(data Data) error { + return q.PushFunc(data, nil) +} + +// PushFunc will push the data to the internal channel checking it against the exemplar +func (q *WrappedUniqueQueue) PushFunc(data Data, fn func() error) error { + if !assignableTo(data, q.exemplar) { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) + } + + q.tlock.Lock() + if q.ready { + // ready means our table is empty and all of the requests we have buffered between the + // wrapper queue starting and the internal queue starting have been handled. + // We can stop buffering requests in our local table and just pass Push + // direct to the internal queue + q.tlock.Unlock() + return q.internal.(UniqueQueue).PushFunc(data, fn) + } + + locked := true + defer func() { + if locked { + q.tlock.Unlock() + } + }() + if _, ok := q.table[data]; ok { + return ErrAlreadyInQueue + } + // FIXME: We probably need to implement some sort of limit here + // If the downstream queue blocks this table will grow without limit + q.table[data] = true + if fn != nil { + err := fn() + if err != nil { + delete(q.table, data) + return err + } + } + locked = false + q.tlock.Unlock() + + q.channel <- data + return nil +} + +// Has checks if the data is in the queue +func (q *WrappedUniqueQueue) Has(data Data) (bool, error) { + q.tlock.Lock() + defer q.tlock.Unlock() + if q.ready { + return q.internal.(UniqueQueue).Has(data) + } + _, has := q.table[data] + return has, nil +} + +// IsEmpty checks whether the queue is empty +func (q *WrappedUniqueQueue) IsEmpty() bool { + q.tlock.Lock() + if len(q.table) > 0 { + q.tlock.Unlock() + return false + } + if q.ready { + q.tlock.Unlock() + return q.internal.IsEmpty() + } + q.tlock.Unlock() + return false +} + +func init() { + queuesMap[WrappedUniqueQueueType] = NewWrappedUniqueQueue +} |