diff options
29 files changed, 1934 insertions, 500 deletions
diff --git a/docs/content/doc/advanced/config-cheat-sheet.en-us.md b/docs/content/doc/advanced/config-cheat-sheet.en-us.md index 7e5b39e480..cbf05b5349 100644 --- a/docs/content/doc/advanced/config-cheat-sheet.en-us.md +++ b/docs/content/doc/advanced/config-cheat-sheet.en-us.md @@ -252,6 +252,10 @@ relation to port exhaustion. - `BATCH_LENGTH`: **20**: Batch data before passing to the handler - `CONN_STR`: **addrs=127.0.0.1:6379 db=0**: Connection string for the redis queue type. - `QUEUE_NAME`: **_queue**: The suffix for default redis queue name. Individual queues will default to **`name`**`QUEUE_NAME` but can be overriden in the specific `queue.name` section. +- `SET_NAME`: **_unique**: The suffix that will added to the default redis +set name for unique queues. Individual queues will default to +**`name`**`QUEUE_NAME`_`SET_NAME`_ but can be overridden in the specific +`queue.name` section. - `WRAP_IF_NECESSARY`: **true**: Will wrap queues with a timeoutable queue if the selected queue is not ready to be created - (Only relevant for the level queue.) - `MAX_ATTEMPTS`: **10**: Maximum number of attempts to create the wrapped queue - `TIMEOUT`: **GRACEFUL_HAMMER_TIME + 30s**: Timeout the creation of the wrapped queue if it takes longer than this to create. @@ -4,7 +4,7 @@ go 1.13 require ( cloud.google.com/go v0.45.0 // indirect - gitea.com/lunny/levelqueue v0.1.0 + gitea.com/lunny/levelqueue v0.2.0 gitea.com/macaron/binding v0.0.0-20190822013154-a5f53841ed2b gitea.com/macaron/cache v0.0.0-20190822004001-a6e7fee4ee76 gitea.com/macaron/captcha v0.0.0-20190822015246-daa973478bae @@ -11,6 +11,8 @@ cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbf cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= gitea.com/lunny/levelqueue v0.1.0 h1:7wMk0VH6mvKN6vZEZCy9nUDgRmdPLgeNrm1NkW8EHNk= gitea.com/lunny/levelqueue v0.1.0/go.mod h1:G7hVb908t0Bl0uk7zGSg14fyzNtxgtD9Shf04wkMK7s= +gitea.com/lunny/levelqueue v0.2.0 h1:lR/5EAwQtFcn5YvPEkNMw0p9pAy2/O2nSP5ImECLA2E= +gitea.com/lunny/levelqueue v0.2.0/go.mod h1:G7hVb908t0Bl0uk7zGSg14fyzNtxgtD9Shf04wkMK7s= gitea.com/macaron/binding v0.0.0-20190822013154-a5f53841ed2b h1:vXt85uYV17KURaUlhU7v4GbCShkqRZDSfo0TkC0YCjQ= gitea.com/macaron/binding v0.0.0-20190822013154-a5f53841ed2b/go.mod h1:Cxadig6POWpPYYSfg23E7jo35Yf0yvsdC1lifoKWmPo= gitea.com/macaron/cache v0.0.0-20190822004001-a6e7fee4ee76 h1:mMsMEg90c5KXQgRWsH8D6GHXfZIW1RAe5S9VYIb12lM= diff --git a/modules/queue/bytefifo.go b/modules/queue/bytefifo.go new file mode 100644 index 0000000000..2cd0ba0b95 --- /dev/null +++ b/modules/queue/bytefifo.go @@ -0,0 +1,61 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +// ByteFIFO defines a FIFO that takes a byte array +type ByteFIFO interface { + // Len returns the length of the fifo + Len() int64 + // PushFunc pushes data to the end of the fifo and calls the callback if it is added + PushFunc(data []byte, fn func() error) error + // Pop pops data from the start of the fifo + Pop() ([]byte, error) + // Close this fifo + Close() error +} + +// UniqueByteFIFO defines a FIFO that Uniques its contents +type UniqueByteFIFO interface { + ByteFIFO + // Has returns whether the fifo contains this data + Has(data []byte) (bool, error) +} + +var _ (ByteFIFO) = &DummyByteFIFO{} + +// DummyByteFIFO represents a dummy fifo +type DummyByteFIFO struct{} + +// PushFunc returns nil +func (*DummyByteFIFO) PushFunc(data []byte, fn func() error) error { + return nil +} + +// Pop returns nil +func (*DummyByteFIFO) Pop() ([]byte, error) { + return []byte{}, nil +} + +// Close returns nil +func (*DummyByteFIFO) Close() error { + return nil +} + +// Len is always 0 +func (*DummyByteFIFO) Len() int64 { + return 0 +} + +var _ (UniqueByteFIFO) = &DummyUniqueByteFIFO{} + +// DummyUniqueByteFIFO represents a dummy unique fifo +type DummyUniqueByteFIFO struct { + DummyByteFIFO +} + +// Has always returns false +func (*DummyUniqueByteFIFO) Has([]byte) (bool, error) { + return false, nil +} diff --git a/modules/queue/queue.go b/modules/queue/queue.go index 094699d4af..e3c63310be 100644 --- a/modules/queue/queue.go +++ b/modules/queue/queue.go @@ -74,25 +74,35 @@ type DummyQueue struct { } // Run does nothing -func (b *DummyQueue) Run(_, _ func(context.Context, func())) {} +func (*DummyQueue) Run(_, _ func(context.Context, func())) {} // Push fakes a push of data to the queue -func (b *DummyQueue) Push(Data) error { +func (*DummyQueue) Push(Data) error { return nil } +// PushFunc fakes a push of data to the queue with a function. The function is never run. +func (*DummyQueue) PushFunc(Data, func() error) error { + return nil +} + +// Has always returns false as this queue never does anything +func (*DummyQueue) Has(Data) (bool, error) { + return false, nil +} + // Flush always returns nil -func (b *DummyQueue) Flush(time.Duration) error { +func (*DummyQueue) Flush(time.Duration) error { return nil } // FlushWithContext always returns nil -func (b *DummyQueue) FlushWithContext(context.Context) error { +func (*DummyQueue) FlushWithContext(context.Context) error { return nil } // IsEmpty asserts that the queue is empty -func (b *DummyQueue) IsEmpty() bool { +func (*DummyQueue) IsEmpty() bool { return true } diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go new file mode 100644 index 0000000000..cad258bda8 --- /dev/null +++ b/modules/queue/queue_bytefifo.go @@ -0,0 +1,227 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "time" + + "code.gitea.io/gitea/modules/log" +) + +// ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue +type ByteFIFOQueueConfiguration struct { + WorkerPoolConfiguration + Workers int + Name string +} + +var _ (Queue) = &ByteFIFOQueue{} + +// ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool +type ByteFIFOQueue struct { + *WorkerPool + byteFIFO ByteFIFO + typ Type + closed chan struct{} + terminated chan struct{} + exemplar interface{} + workers int + name string + lock sync.Mutex +} + +// NewByteFIFOQueue creates a new ByteFIFOQueue +func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOQueue, error) { + configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(ByteFIFOQueueConfiguration) + + return &ByteFIFOQueue{ + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + byteFIFO: byteFIFO, + typ: typ, + closed: make(chan struct{}), + terminated: make(chan struct{}), + exemplar: exemplar, + workers: config.Workers, + name: config.Name, + }, nil +} + +// Name returns the name of this queue +func (q *ByteFIFOQueue) Name() string { + return q.name +} + +// Push pushes data to the fifo +func (q *ByteFIFOQueue) Push(data Data) error { + return q.PushFunc(data, nil) +} + +// PushFunc pushes data to the fifo +func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error { + if !assignableTo(data, q.exemplar) { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) + } + bs, err := json.Marshal(data) + if err != nil { + return err + } + return q.byteFIFO.PushFunc(bs, fn) +} + +// IsEmpty checks if the queue is empty +func (q *ByteFIFOQueue) IsEmpty() bool { + q.lock.Lock() + defer q.lock.Unlock() + if !q.WorkerPool.IsEmpty() { + return false + } + return q.byteFIFO.Len() == 0 +} + +// Run runs the bytefifo queue +func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func())) { + atShutdown(context.Background(), q.Shutdown) + atTerminate(context.Background(), q.Terminate) + log.Debug("%s: %s Starting", q.typ, q.name) + + go func() { + _ = q.AddWorkers(q.workers, 0) + }() + + go q.readToChan() + + log.Trace("%s: %s Waiting til closed", q.typ, q.name) + <-q.closed + log.Trace("%s: %s Waiting til done", q.typ, q.name) + q.Wait() + + log.Trace("%s: %s Waiting til cleaned", q.typ, q.name) + ctx, cancel := context.WithCancel(context.Background()) + atTerminate(ctx, cancel) + q.CleanUp(ctx) + cancel() +} + +func (q *ByteFIFOQueue) readToChan() { + for { + select { + case <-q.closed: + // tell the pool to shutdown. + q.cancel() + return + default: + q.lock.Lock() + bs, err := q.byteFIFO.Pop() + if err != nil { + q.lock.Unlock() + log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err) + time.Sleep(time.Millisecond * 100) + continue + } + + if len(bs) == 0 { + q.lock.Unlock() + time.Sleep(time.Millisecond * 100) + continue + } + + data, err := unmarshalAs(bs, q.exemplar) + if err != nil { + log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err) + q.lock.Unlock() + time.Sleep(time.Millisecond * 100) + continue + } + + log.Trace("%s %s: Task found: %#v", q.typ, q.name, data) + q.WorkerPool.Push(data) + q.lock.Unlock() + } + } +} + +// Shutdown processing from this queue +func (q *ByteFIFOQueue) Shutdown() { + log.Trace("%s: %s Shutting down", q.typ, q.name) + q.lock.Lock() + select { + case <-q.closed: + default: + close(q.closed) + } + q.lock.Unlock() + log.Debug("%s: %s Shutdown", q.typ, q.name) +} + +// Terminate this queue and close the queue +func (q *ByteFIFOQueue) Terminate() { + log.Trace("%s: %s Terminating", q.typ, q.name) + q.Shutdown() + q.lock.Lock() + select { + case <-q.terminated: + q.lock.Unlock() + return + default: + } + close(q.terminated) + q.lock.Unlock() + if log.IsDebug() { + log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len()) + } + if err := q.byteFIFO.Close(); err != nil { + log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err) + } + log.Debug("%s: %s Terminated", q.typ, q.name) +} + +var _ (UniqueQueue) = &ByteFIFOUniqueQueue{} + +// ByteFIFOUniqueQueue represents a UniqueQueue formed from a UniqueByteFifo +type ByteFIFOUniqueQueue struct { + ByteFIFOQueue +} + +// NewByteFIFOUniqueQueue creates a new ByteFIFOUniqueQueue +func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOUniqueQueue, error) { + configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(ByteFIFOQueueConfiguration) + + return &ByteFIFOUniqueQueue{ + ByteFIFOQueue: ByteFIFOQueue{ + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + byteFIFO: byteFIFO, + typ: typ, + closed: make(chan struct{}), + terminated: make(chan struct{}), + exemplar: exemplar, + workers: config.Workers, + name: config.Name, + }, + }, nil +} + +// Has checks if the provided data is in the queue +func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) { + if !assignableTo(data, q.exemplar) { + return false, fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) + } + bs, err := json.Marshal(data) + if err != nil { + return false, err + } + return q.byteFIFO.(UniqueByteFIFO).Has(bs) +} diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index 45df8a443e..d7a11e79f5 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -53,31 +53,31 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro } // Run starts to run the queue -func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { +func (q *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { atShutdown(context.Background(), func() { - log.Warn("ChannelQueue: %s is not shutdownable!", c.name) + log.Warn("ChannelQueue: %s is not shutdownable!", q.name) }) atTerminate(context.Background(), func() { - log.Warn("ChannelQueue: %s is not terminatable!", c.name) + log.Warn("ChannelQueue: %s is not terminatable!", q.name) }) - log.Debug("ChannelQueue: %s Starting", c.name) + log.Debug("ChannelQueue: %s Starting", q.name) go func() { - _ = c.AddWorkers(c.workers, 0) + _ = q.AddWorkers(q.workers, 0) }() } // Push will push data into the queue -func (c *ChannelQueue) Push(data Data) error { - if !assignableTo(data, c.exemplar) { - return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name) +func (q *ChannelQueue) Push(data Data) error { + if !assignableTo(data, q.exemplar) { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name) } - c.WorkerPool.Push(data) + q.WorkerPool.Push(data) return nil } // Name returns the name of this queue -func (c *ChannelQueue) Name() string { - return c.name +func (q *ChannelQueue) Name() string { + return q.name } func init() { diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go index ca3e230e3d..ff0876488b 100644 --- a/modules/queue/queue_disk.go +++ b/modules/queue/queue_disk.go @@ -5,15 +5,6 @@ package queue import ( - "context" - "encoding/json" - "fmt" - "sync" - "sync/atomic" - "time" - - "code.gitea.io/gitea/modules/log" - "gitea.com/lunny/levelqueue" ) @@ -22,22 +13,13 @@ const LevelQueueType Type = "level" // LevelQueueConfiguration is the configuration for a LevelQueue type LevelQueueConfiguration struct { - WorkerPoolConfiguration + ByteFIFOQueueConfiguration DataDir string - Workers int - Name string } // LevelQueue implements a disk library queue type LevelQueue struct { - *WorkerPool - queue *levelqueue.Queue - closed chan struct{} - terminated chan struct{} - lock sync.Mutex - exemplar interface{} - workers int - name string + *ByteFIFOQueue } // NewLevelQueue creates a ledis local queue @@ -48,149 +30,69 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) } config := configInterface.(LevelQueueConfiguration) - internal, err := levelqueue.Open(config.DataDir) + byteFIFO, err := NewLevelQueueByteFIFO(config.DataDir) + if err != nil { + return nil, err + } + + byteFIFOQueue, err := NewByteFIFOQueue(LevelQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar) if err != nil { return nil, err } queue := &LevelQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), - queue: internal, - exemplar: exemplar, - closed: make(chan struct{}), - terminated: make(chan struct{}), - workers: config.Workers, - name: config.Name, + ByteFIFOQueue: byteFIFOQueue, } queue.qid = GetManager().Add(queue, LevelQueueType, config, exemplar) return queue, nil } -// Run starts to run the queue -func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { - atShutdown(context.Background(), l.Shutdown) - atTerminate(context.Background(), l.Terminate) - log.Debug("LevelQueue: %s Starting", l.name) - - go func() { - _ = l.AddWorkers(l.workers, 0) - }() - - go l.readToChan() - - log.Trace("LevelQueue: %s Waiting til closed", l.name) - <-l.closed - - log.Trace("LevelQueue: %s Waiting til done", l.name) - l.Wait() - - log.Trace("LevelQueue: %s Waiting til cleaned", l.name) - ctx, cancel := context.WithCancel(context.Background()) - atTerminate(ctx, cancel) - l.CleanUp(ctx) - cancel() - log.Trace("LevelQueue: %s Cleaned", l.name) - -} +var _ (ByteFIFO) = &LevelQueueByteFIFO{} -func (l *LevelQueue) readToChan() { - for { - select { - case <-l.closed: - // tell the pool to shutdown. - l.cancel() - return - default: - atomic.AddInt64(&l.numInQueue, 1) - bs, err := l.queue.RPop() - if err != nil { - if err != levelqueue.ErrNotFound { - log.Error("LevelQueue: %s Error on RPop: %v", l.name, err) - } - atomic.AddInt64(&l.numInQueue, -1) - time.Sleep(time.Millisecond * 100) - continue - } - - if len(bs) == 0 { - atomic.AddInt64(&l.numInQueue, -1) - time.Sleep(time.Millisecond * 100) - continue - } - - data, err := unmarshalAs(bs, l.exemplar) - if err != nil { - log.Error("LevelQueue: %s Failed to unmarshal with error: %v", l.name, err) - atomic.AddInt64(&l.numInQueue, -1) - time.Sleep(time.Millisecond * 100) - continue - } - - log.Trace("LevelQueue %s: Task found: %#v", l.name, data) - l.WorkerPool.Push(data) - atomic.AddInt64(&l.numInQueue, -1) - } - } +// LevelQueueByteFIFO represents a ByteFIFO formed from a LevelQueue +type LevelQueueByteFIFO struct { + internal *levelqueue.Queue } -// Push will push the indexer data to queue -func (l *LevelQueue) Push(data Data) error { - if !assignableTo(data, l.exemplar) { - return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, l.exemplar, l.name) - } - bs, err := json.Marshal(data) +// NewLevelQueueByteFIFO creates a ByteFIFO formed from a LevelQueue +func NewLevelQueueByteFIFO(dataDir string) (*LevelQueueByteFIFO, error) { + internal, err := levelqueue.Open(dataDir) if err != nil { - return err + return nil, err } - return l.queue.LPush(bs) + + return &LevelQueueByteFIFO{ + internal: internal, + }, nil } -// IsEmpty checks whether the queue is empty -func (l *LevelQueue) IsEmpty() bool { - if !l.WorkerPool.IsEmpty() { - return false +// PushFunc will push data into the fifo +func (fifo *LevelQueueByteFIFO) PushFunc(data []byte, fn func() error) error { + if fn != nil { + if err := fn(); err != nil { + return err + } } - return l.queue.Len() == 0 + return fifo.internal.LPush(data) } -// Shutdown this queue and stop processing -func (l *LevelQueue) Shutdown() { - l.lock.Lock() - defer l.lock.Unlock() - log.Trace("LevelQueue: %s Shutting down", l.name) - select { - case <-l.closed: - default: - close(l.closed) +// Pop pops data from the start of the fifo +func (fifo *LevelQueueByteFIFO) Pop() ([]byte, error) { + data, err := fifo.internal.RPop() + if err != nil && err != levelqueue.ErrNotFound { + return nil, err } - log.Debug("LevelQueue: %s Shutdown", l.name) + return data, nil } -// Terminate this queue and close the queue -func (l *LevelQueue) Terminate() { - log.Trace("LevelQueue: %s Terminating", l.name) - l.Shutdown() - l.lock.Lock() - select { - case <-l.terminated: - l.lock.Unlock() - default: - close(l.terminated) - l.lock.Unlock() - if log.IsDebug() { - log.Debug("LevelQueue: %s Closing with %d tasks left in queue", l.name, l.queue.Len()) - } - if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" { - log.Error("Error whilst closing internal queue in %s: %v", l.name, err) - } - - } - log.Debug("LevelQueue: %s Terminated", l.name) +// Close this fifo +func (fifo *LevelQueueByteFIFO) Close() error { + return fifo.internal.Close() } -// Name returns the name of this queue -func (l *LevelQueue) Name() string { - return l.name +// Len returns the length of the fifo +func (fifo *LevelQueueByteFIFO) Len() int64 { + return fifo.internal.Len() } func init() { diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index 961187ab0d..433435c301 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -69,17 +69,19 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( // the level backend only needs temporary workers to catch up with the previously dropped work levelCfg := LevelQueueConfiguration{ - WorkerPoolConfiguration: WorkerPoolConfiguration{ - QueueLength: config.QueueLength, - BatchLength: config.BatchLength, - BlockTimeout: 1 * time.Second, - BoostTimeout: 5 * time.Minute, - BoostWorkers: 5, - MaxWorkers: 6, + ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{ + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: config.QueueLength, + BatchLength: config.BatchLength, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, + MaxWorkers: 6, + }, + Workers: 1, + Name: config.Name + "-level", }, DataDir: config.DataDir, - Workers: 1, - Name: config.Name + "-level", } levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar) @@ -116,67 +118,67 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( } // Name returns the name of this queue -func (p *PersistableChannelQueue) Name() string { - return p.delayedStarter.name +func (q *PersistableChannelQueue) Name() string { + return q.delayedStarter.name } // Push will push the indexer data to queue -func (p *PersistableChannelQueue) Push(data Data) error { +func (q *PersistableChannelQueue) Push(data Data) error { select { - case <-p.closed: - return p.internal.Push(data) + case <-q.closed: + return q.internal.Push(data) default: - return p.channelQueue.Push(data) + return q.channelQueue.Push(data) } } // Run starts to run the queue -func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { - log.Debug("PersistableChannelQueue: %s Starting", p.delayedStarter.name) +func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { + log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name) - p.lock.Lock() - if p.internal == nil { - err := p.setInternal(atShutdown, p.channelQueue.handle, p.channelQueue.exemplar) - p.lock.Unlock() + q.lock.Lock() + if q.internal == nil { + err := q.setInternal(atShutdown, q.channelQueue.handle, q.channelQueue.exemplar) + q.lock.Unlock() if err != nil { - log.Fatal("Unable to create internal queue for %s Error: %v", p.Name(), err) + log.Fatal("Unable to create internal queue for %s Error: %v", q.Name(), err) return } } else { - p.lock.Unlock() + q.lock.Unlock() } - atShutdown(context.Background(), p.Shutdown) - atTerminate(context.Background(), p.Terminate) + atShutdown(context.Background(), q.Shutdown) + atTerminate(context.Background(), q.Terminate) // Just run the level queue - we shut it down later - go p.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) + go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) go func() { - _ = p.channelQueue.AddWorkers(p.channelQueue.workers, 0) + _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0) }() - log.Trace("PersistableChannelQueue: %s Waiting til closed", p.delayedStarter.name) - <-p.closed - log.Trace("PersistableChannelQueue: %s Cancelling pools", p.delayedStarter.name) - p.channelQueue.cancel() - p.internal.(*LevelQueue).cancel() - log.Trace("PersistableChannelQueue: %s Waiting til done", p.delayedStarter.name) - p.channelQueue.Wait() - p.internal.(*LevelQueue).Wait() + log.Trace("PersistableChannelQueue: %s Waiting til closed", q.delayedStarter.name) + <-q.closed + log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name) + q.channelQueue.cancel() + q.internal.(*LevelQueue).cancel() + log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name) + q.channelQueue.Wait() + q.internal.(*LevelQueue).Wait() // Redirect all remaining data in the chan to the internal channel go func() { - log.Trace("PersistableChannelQueue: %s Redirecting remaining data", p.delayedStarter.name) - for data := range p.channelQueue.dataChan { - _ = p.internal.Push(data) - atomic.AddInt64(&p.channelQueue.numInQueue, -1) + log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name) + for data := range q.channelQueue.dataChan { + _ = q.internal.Push(data) + atomic.AddInt64(&q.channelQueue.numInQueue, -1) } - log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", p.delayedStarter.name) + log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name) }() - log.Trace("PersistableChannelQueue: %s Done main loop", p.delayedStarter.name) + log.Trace("PersistableChannelQueue: %s Done main loop", q.delayedStarter.name) } // Flush flushes the queue and blocks till the queue is empty -func (p *PersistableChannelQueue) Flush(timeout time.Duration) error { +func (q *PersistableChannelQueue) Flush(timeout time.Duration) error { var ctx context.Context var cancel context.CancelFunc if timeout > 0 { @@ -185,24 +187,24 @@ func (p *PersistableChannelQueue) Flush(timeout time.Duration) error { ctx, cancel = context.WithCancel(context.Background()) } defer cancel() - return p.FlushWithContext(ctx) + return q.FlushWithContext(ctx) } // FlushWithContext flushes the queue and blocks till the queue is empty -func (p *PersistableChannelQueue) FlushWithContext(ctx context.Context) error { +func (q *PersistableChannelQueue) FlushWithContext(ctx context.Context) error { errChan := make(chan error, 1) go func() { - errChan <- p.channelQueue.FlushWithContext(ctx) + errChan <- q.channelQueue.FlushWithContext(ctx) }() go func() { - p.lock.Lock() - if p.internal == nil { - p.lock.Unlock() - errChan <- fmt.Errorf("not ready to flush internal queue %s yet", p.Name()) + q.lock.Lock() + if q.internal == nil { + q.lock.Unlock() + errChan <- fmt.Errorf("not ready to flush internal queue %s yet", q.Name()) return } - p.lock.Unlock() - errChan <- p.internal.FlushWithContext(ctx) + q.lock.Unlock() + errChan <- q.internal.FlushWithContext(ctx) }() err1 := <-errChan err2 := <-errChan @@ -214,44 +216,44 @@ func (p *PersistableChannelQueue) FlushWithContext(ctx context.Context) error { } // IsEmpty checks if a queue is empty -func (p *PersistableChannelQueue) IsEmpty() bool { - if !p.channelQueue.IsEmpty() { +func (q *PersistableChannelQueue) IsEmpty() bool { + if !q.channelQueue.IsEmpty() { return false } - p.lock.Lock() - defer p.lock.Unlock() - if p.internal == nil { + q.lock.Lock() + defer q.lock.Unlock() + if q.internal == nil { return false } - return p.internal.IsEmpty() + return q.internal.IsEmpty() } // Shutdown processing this queue -func (p *PersistableChannelQueue) Shutdown() { - log.Trace("PersistableChannelQueue: %s Shutting down", p.delayedStarter.name) +func (q *PersistableChannelQueue) Shutdown() { + log.Trace("PersistableChannelQueue: %s Shutting down", q.delayedStarter.name) + q.lock.Lock() + defer q.lock.Unlock() select { - case <-p.closed: + case <-q.closed: default: - p.lock.Lock() - defer p.lock.Unlock() - if p.internal != nil { - p.internal.(*LevelQueue).Shutdown() + if q.internal != nil { + q.internal.(*LevelQueue).Shutdown() } - close(p.closed) + close(q.closed) + log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name) } - log.Debug("PersistableChannelQueue: %s Shutdown", p.delayedStarter.name) } // Terminate this queue and close the queue -func (p *PersistableChannelQueue) Terminate() { - log.Trace("PersistableChannelQueue: %s Terminating", p.delayedStarter.name) - p.Shutdown() - p.lock.Lock() - defer p.lock.Unlock() - if p.internal != nil { - p.internal.(*LevelQueue).Terminate() +func (q *PersistableChannelQueue) Terminate() { + log.Trace("PersistableChannelQueue: %s Terminating", q.delayedStarter.name) + q.Shutdown() + q.lock.Lock() + defer q.lock.Unlock() + if q.internal != nil { + q.internal.(*LevelQueue).Terminate() } - log.Debug("PersistableChannelQueue: %s Terminated", p.delayedStarter.name) + log.Debug("PersistableChannelQueue: %s Terminated", q.delayedStarter.name) } func init() { diff --git a/modules/queue/queue_disk_test.go b/modules/queue/queue_disk_test.go index 038d7d8223..c7d3eb160b 100644 --- a/modules/queue/queue_disk_test.go +++ b/modules/queue/queue_disk_test.go @@ -34,16 +34,18 @@ func TestLevelQueue(t *testing.T) { defer os.RemoveAll(tmpDir) queue, err := NewLevelQueue(handle, LevelQueueConfiguration{ - WorkerPoolConfiguration: WorkerPoolConfiguration{ - QueueLength: 20, - BatchLength: 2, - BlockTimeout: 1 * time.Second, - BoostTimeout: 5 * time.Minute, - BoostWorkers: 5, - MaxWorkers: 10, + ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{ + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: 20, + BatchLength: 2, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, + MaxWorkers: 10, + }, + Workers: 1, }, DataDir: tmpDir, - Workers: 1, }, &testData{}) assert.NoError(t, err) @@ -105,16 +107,18 @@ func TestLevelQueue(t *testing.T) { WrappedQueueConfiguration{ Underlying: LevelQueueType, Config: LevelQueueConfiguration{ - WorkerPoolConfiguration: WorkerPoolConfiguration{ - QueueLength: 20, - BatchLength: 2, - BlockTimeout: 1 * time.Second, - BoostTimeout: 5 * time.Minute, - BoostWorkers: 5, - MaxWorkers: 10, + ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{ + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: 20, + BatchLength: 2, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, + MaxWorkers: 10, + }, + Workers: 1, }, DataDir: tmpDir, - Workers: 1, }, }, &testData{}) assert.NoError(t, err) diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go index 0167c1ec49..8a395cd5aa 100644 --- a/modules/queue/queue_redis.go +++ b/modules/queue/queue_redis.go @@ -5,14 +5,8 @@ package queue import ( - "context" - "encoding/json" "errors" - "fmt" "strings" - "sync" - "sync/atomic" - "time" "code.gitea.io/gitea/modules/log" @@ -22,204 +16,130 @@ import ( // RedisQueueType is the type for redis queue const RedisQueueType Type = "redis" +// RedisQueueConfiguration is the configuration for the redis queue +type RedisQueueConfiguration struct { + ByteFIFOQueueConfiguration + RedisByteFIFOConfiguration +} + +// RedisQueue redis queue +type RedisQueue struct { + *ByteFIFOQueue +} + +// NewRedisQueue creates single redis or cluster redis queue +func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { + configInterface, err := toConfig(RedisQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(RedisQueueConfiguration) + + byteFIFO, err := NewRedisByteFIFO(config.RedisByteFIFOConfiguration) + if err != nil { + return nil, err + } + + byteFIFOQueue, err := NewByteFIFOQueue(RedisQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar) + if err != nil { + return nil, err + } + + queue := &RedisQueue{ + ByteFIFOQueue: byteFIFOQueue, + } + + queue.qid = GetManager().Add(queue, RedisQueueType, config, exemplar) + + return queue, nil +} + type redisClient interface { RPush(key string, args ...interface{}) *redis.IntCmd LPop(key string) *redis.StringCmd LLen(key string) *redis.IntCmd + SAdd(key string, members ...interface{}) *redis.IntCmd + SRem(key string, members ...interface{}) *redis.IntCmd + SIsMember(key string, member interface{}) *redis.BoolCmd Ping() *redis.StatusCmd Close() error } -// RedisQueue redis queue -type RedisQueue struct { - *WorkerPool - client redisClient - queueName string - closed chan struct{} - terminated chan struct{} - exemplar interface{} - workers int - name string - lock sync.Mutex +var _ (ByteFIFO) = &RedisByteFIFO{} + +// RedisByteFIFO represents a ByteFIFO formed from a redisClient +type RedisByteFIFO struct { + client redisClient + queueName string } -// RedisQueueConfiguration is the configuration for the redis queue -type RedisQueueConfiguration struct { - WorkerPoolConfiguration +// RedisByteFIFOConfiguration is the configuration for the RedisByteFIFO +type RedisByteFIFOConfiguration struct { Network string Addresses string Password string DBIndex int QueueName string - Workers int - Name string } -// NewRedisQueue creates single redis or cluster redis queue -func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { - configInterface, err := toConfig(RedisQueueConfiguration{}, cfg) - if err != nil { - return nil, err +// NewRedisByteFIFO creates a ByteFIFO formed from a redisClient +func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error) { + fifo := &RedisByteFIFO{ + queueName: config.QueueName, } - config := configInterface.(RedisQueueConfiguration) - dbs := strings.Split(config.Addresses, ",") - - var queue = &RedisQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), - queueName: config.QueueName, - exemplar: exemplar, - closed: make(chan struct{}), - terminated: make(chan struct{}), - workers: config.Workers, - name: config.Name, - } if len(dbs) == 0 { return nil, errors.New("no redis host specified") } else if len(dbs) == 1 { - queue.client = redis.NewClient(&redis.Options{ + fifo.client = redis.NewClient(&redis.Options{ Network: config.Network, Addr: strings.TrimSpace(dbs[0]), // use default Addr Password: config.Password, // no password set DB: config.DBIndex, // use default DB }) } else { - queue.client = redis.NewClusterClient(&redis.ClusterOptions{ + fifo.client = redis.NewClusterClient(&redis.ClusterOptions{ Addrs: dbs, }) } - if err := queue.client.Ping().Err(); err != nil { + if err := fifo.client.Ping().Err(); err != nil { return nil, err } - queue.qid = GetManager().Add(queue, RedisQueueType, config, exemplar) - - return queue, nil + return fifo, nil } -// Run runs the redis queue -func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) { - atShutdown(context.Background(), r.Shutdown) - atTerminate(context.Background(), r.Terminate) - log.Debug("RedisQueue: %s Starting", r.name) - - go func() { - _ = r.AddWorkers(r.workers, 0) - }() - - go r.readToChan() - - log.Trace("RedisQueue: %s Waiting til closed", r.name) - <-r.closed - log.Trace("RedisQueue: %s Waiting til done", r.name) - r.Wait() - - log.Trace("RedisQueue: %s Waiting til cleaned", r.name) - ctx, cancel := context.WithCancel(context.Background()) - atTerminate(ctx, cancel) - r.CleanUp(ctx) - cancel() -} - -func (r *RedisQueue) readToChan() { - for { - select { - case <-r.closed: - // tell the pool to shutdown - r.cancel() - return - default: - atomic.AddInt64(&r.numInQueue, 1) - bs, err := r.client.LPop(r.queueName).Bytes() - if err != nil && err != redis.Nil { - log.Error("RedisQueue: %s Error on LPop: %v", r.name, err) - atomic.AddInt64(&r.numInQueue, -1) - time.Sleep(time.Millisecond * 100) - continue - } - - if len(bs) == 0 { - atomic.AddInt64(&r.numInQueue, -1) - time.Sleep(time.Millisecond * 100) - continue - } - - data, err := unmarshalAs(bs, r.exemplar) - if err != nil { - log.Error("RedisQueue: %s Error on Unmarshal: %v", r.name, err) - atomic.AddInt64(&r.numInQueue, -1) - time.Sleep(time.Millisecond * 100) - continue - } - - log.Trace("RedisQueue: %s Task found: %#v", r.name, data) - r.WorkerPool.Push(data) - atomic.AddInt64(&r.numInQueue, -1) +// PushFunc pushes data to the end of the fifo and calls the callback if it is added +func (fifo *RedisByteFIFO) PushFunc(data []byte, fn func() error) error { + if fn != nil { + if err := fn(); err != nil { + return err } } + return fifo.client.RPush(fifo.queueName, data).Err() } -// Push implements Queue -func (r *RedisQueue) Push(data Data) error { - if !assignableTo(data, r.exemplar) { - return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, r.exemplar, r.name) - } - bs, err := json.Marshal(data) - if err != nil { - return err - } - return r.client.RPush(r.queueName, bs).Err() -} - -// IsEmpty checks if the queue is empty -func (r *RedisQueue) IsEmpty() bool { - if !r.WorkerPool.IsEmpty() { - return false +// Pop pops data from the start of the fifo +func (fifo *RedisByteFIFO) Pop() ([]byte, error) { + data, err := fifo.client.LPop(fifo.queueName).Bytes() + if err != nil && err == redis.Nil { + return data, nil } - length, err := r.client.LLen(r.queueName).Result() - if err != nil { - log.Error("Error whilst getting queue length for %s: Error: %v", r.name, err) - return false - } - return length == 0 + return data, err } -// Shutdown processing from this queue -func (r *RedisQueue) Shutdown() { - log.Trace("RedisQueue: %s Shutting down", r.name) - r.lock.Lock() - select { - case <-r.closed: - default: - close(r.closed) - } - r.lock.Unlock() - log.Debug("RedisQueue: %s Shutdown", r.name) +// Close this fifo +func (fifo *RedisByteFIFO) Close() error { + return fifo.client.Close() } -// Terminate this queue and close the queue -func (r *RedisQueue) Terminate() { - log.Trace("RedisQueue: %s Terminating", r.name) - r.Shutdown() - r.lock.Lock() - select { - case <-r.terminated: - r.lock.Unlock() - default: - close(r.terminated) - r.lock.Unlock() - if log.IsDebug() { - log.Debug("RedisQueue: %s Closing with %d tasks left in queue", r.name, r.client.LLen(r.queueName)) - } - if err := r.client.Close(); err != nil { - log.Error("Error whilst closing internal redis client in %s: %v", r.name, err) - } +// Len returns the length of the fifo +func (fifo *RedisByteFIFO) Len() int64 { + val, err := fifo.client.LLen(fifo.queueName).Result() + if err != nil { + log.Error("Error whilst getting length of redis queue %s: Error: %v", fifo.queueName, err) + return -1 } - log.Debug("RedisQueue: %s Terminated", r.name) -} - -// Name returns the name of this queue -func (r *RedisQueue) Name() string { - return r.name + return val } func init() { diff --git a/modules/queue/setting.go b/modules/queue/setting.go index 8760c09ae8..c47e85f756 100644 --- a/modules/queue/setting.go +++ b/modules/queue/setting.go @@ -7,6 +7,7 @@ package queue import ( "encoding/json" "fmt" + "strings" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" @@ -36,6 +37,7 @@ func getQueueSettings(name string) (setting.QueueSettings, []byte) { opts["Password"] = q.Password opts["DBIndex"] = q.DBIndex opts["QueueName"] = q.QueueName + opts["SetName"] = q.SetName opts["Workers"] = q.Workers opts["MaxWorkers"] = q.MaxWorkers opts["BlockTimeout"] = q.BlockTimeout @@ -81,3 +83,41 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue { } return returnable } + +// CreateUniqueQueue for name with provided handler and exemplar +func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) UniqueQueue { + q, cfg := getQueueSettings(name) + if len(cfg) == 0 { + return nil + } + + if len(q.Type) > 0 && q.Type != "dummy" && !strings.HasPrefix(q.Type, "unique-") { + q.Type = "unique-" + q.Type + } + + typ, err := validType(q.Type) + if err != nil || typ == PersistableChannelQueueType { + typ = PersistableChannelUniqueQueueType + if err != nil { + log.Error("Invalid type %s provided for queue named %s defaulting to %s", q.Type, name, string(typ)) + } + } + + returnable, err := NewQueue(typ, handle, cfg, exemplar) + if q.WrapIfNecessary && err != nil { + log.Warn("Unable to create unique queue for %s: %v", name, err) + log.Warn("Attempting to create wrapped queue") + returnable, err = NewQueue(WrappedUniqueQueueType, handle, WrappedUniqueQueueConfiguration{ + Underlying: typ, + Timeout: q.Timeout, + MaxAttempts: q.MaxAttempts, + Config: cfg, + QueueLength: q.Length, + }, exemplar) + } + if err != nil { + log.Error("Unable to create unique queue for %s: %v", name, err) + return nil + } + return returnable.(UniqueQueue) +} diff --git a/modules/queue/unique_queue.go b/modules/queue/unique_queue.go new file mode 100644 index 0000000000..87e0594ecf --- /dev/null +++ b/modules/queue/unique_queue.go @@ -0,0 +1,29 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "fmt" +) + +// UniqueQueue defines a queue which guarantees only one instance of same +// data is in the queue. Instances with same identity will be +// discarded if there is already one in the line. +// +// This queue is particularly useful for preventing duplicated task +// of same purpose - please note that this does not guarantee that a particular +// task cannot be processed twice or more at the same time. Uniqueness is +// only guaranteed whilst the task is waiting in the queue. +// +// Users of this queue should be careful to push only the identifier of the +// data +type UniqueQueue interface { + Queue + PushFunc(Data, func() error) error + Has(Data) (bool, error) +} + +// ErrAlreadyInQueue is returned when trying to push data to the queue that is already in the queue +var ErrAlreadyInQueue = fmt.Errorf("already in queue") diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go new file mode 100644 index 0000000000..dec1cfc5c0 --- /dev/null +++ b/modules/queue/unique_queue_channel.go @@ -0,0 +1,132 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "fmt" + "sync" + + "code.gitea.io/gitea/modules/log" +) + +// ChannelUniqueQueueType is the type for channel queue +const ChannelUniqueQueueType Type = "unique-channel" + +// ChannelUniqueQueueConfiguration is the configuration for a ChannelUniqueQueue +type ChannelUniqueQueueConfiguration ChannelQueueConfiguration + +// ChannelUniqueQueue implements UniqueQueue +// +// It is basically a thin wrapper around a WorkerPool but keeps a store of +// what has been pushed within a table. +// +// Please note that this Queue does not guarantee that a particular +// task cannot be processed twice or more at the same time. Uniqueness is +// only guaranteed whilst the task is waiting in the queue. +type ChannelUniqueQueue struct { + *WorkerPool + lock sync.Mutex + table map[Data]bool + exemplar interface{} + workers int + name string +} + +// NewChannelUniqueQueue create a memory channel queue +func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { + configInterface, err := toConfig(ChannelUniqueQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(ChannelUniqueQueueConfiguration) + if config.BatchLength == 0 { + config.BatchLength = 1 + } + queue := &ChannelUniqueQueue{ + table: map[Data]bool{}, + exemplar: exemplar, + workers: config.Workers, + name: config.Name, + } + queue.WorkerPool = NewWorkerPool(func(data ...Data) { + for _, datum := range data { + queue.lock.Lock() + delete(queue.table, datum) + queue.lock.Unlock() + handle(datum) + } + }, config.WorkerPoolConfiguration) + + queue.qid = GetManager().Add(queue, ChannelUniqueQueueType, config, exemplar) + return queue, nil +} + +// Run starts to run the queue +func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) { + atShutdown(context.Background(), func() { + log.Warn("ChannelUniqueQueue: %s is not shutdownable!", q.name) + }) + atTerminate(context.Background(), func() { + log.Warn("ChannelUniqueQueue: %s is not terminatable!", q.name) + }) + log.Debug("ChannelUniqueQueue: %s Starting", q.name) + go func() { + _ = q.AddWorkers(q.workers, 0) + }() +} + +// Push will push data into the queue if the data is not already in the queue +func (q *ChannelUniqueQueue) Push(data Data) error { + return q.PushFunc(data, nil) +} + +// PushFunc will push data into the queue +func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error { + if !assignableTo(data, q.exemplar) { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name) + } + q.lock.Lock() + locked := true + defer func() { + if locked { + q.lock.Unlock() + } + }() + if _, ok := q.table[data]; ok { + return ErrAlreadyInQueue + } + // FIXME: We probably need to implement some sort of limit here + // If the downstream queue blocks this table will grow without limit + q.table[data] = true + if fn != nil { + err := fn() + if err != nil { + delete(q.table, data) + return err + } + } + locked = false + q.lock.Unlock() + q.WorkerPool.Push(data) + return nil +} + +// Has checks if the data is in the queue +func (q *ChannelUniqueQueue) Has(data Data) (bool, error) { + q.lock.Lock() + defer q.lock.Unlock() + _, has := q.table[data] + return has, nil +} + +// Name returns the name of this queue +func (q *ChannelUniqueQueue) Name() string { + return q.name +} + +func init() { + queuesMap[ChannelUniqueQueueType] = NewChannelUniqueQueue +} diff --git a/modules/queue/unique_queue_disk.go b/modules/queue/unique_queue_disk.go new file mode 100644 index 0000000000..bfe7aeed83 --- /dev/null +++ b/modules/queue/unique_queue_disk.go @@ -0,0 +1,104 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "gitea.com/lunny/levelqueue" +) + +// LevelUniqueQueueType is the type for level queue +const LevelUniqueQueueType Type = "unique-level" + +// LevelUniqueQueueConfiguration is the configuration for a LevelUniqueQueue +type LevelUniqueQueueConfiguration struct { + ByteFIFOQueueConfiguration + DataDir string +} + +// LevelUniqueQueue implements a disk library queue +type LevelUniqueQueue struct { + *ByteFIFOUniqueQueue +} + +// NewLevelUniqueQueue creates a ledis local queue +// +// Please note that this Queue does not guarantee that a particular +// task cannot be processed twice or more at the same time. Uniqueness is +// only guaranteed whilst the task is waiting in the queue. +func NewLevelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { + configInterface, err := toConfig(LevelUniqueQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(LevelUniqueQueueConfiguration) + + byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.DataDir) + if err != nil { + return nil, err + } + + byteFIFOQueue, err := NewByteFIFOUniqueQueue(LevelUniqueQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar) + if err != nil { + return nil, err + } + + queue := &LevelUniqueQueue{ + ByteFIFOUniqueQueue: byteFIFOQueue, + } + queue.qid = GetManager().Add(queue, LevelUniqueQueueType, config, exemplar) + return queue, nil +} + +var _ (UniqueByteFIFO) = &LevelUniqueQueueByteFIFO{} + +// LevelUniqueQueueByteFIFO represents a ByteFIFO formed from a LevelUniqueQueue +type LevelUniqueQueueByteFIFO struct { + internal *levelqueue.UniqueQueue +} + +// NewLevelUniqueQueueByteFIFO creates a new ByteFIFO formed from a LevelUniqueQueue +func NewLevelUniqueQueueByteFIFO(dataDir string) (*LevelUniqueQueueByteFIFO, error) { + internal, err := levelqueue.OpenUnique(dataDir) + if err != nil { + return nil, err + } + + return &LevelUniqueQueueByteFIFO{ + internal: internal, + }, nil +} + +// PushFunc pushes data to the end of the fifo and calls the callback if it is added +func (fifo *LevelUniqueQueueByteFIFO) PushFunc(data []byte, fn func() error) error { + return fifo.internal.LPushFunc(data, fn) +} + +// Pop pops data from the start of the fifo +func (fifo *LevelUniqueQueueByteFIFO) Pop() ([]byte, error) { + data, err := fifo.internal.RPop() + if err != nil && err != levelqueue.ErrNotFound { + return nil, err + } + return data, nil +} + +// Len returns the length of the fifo +func (fifo *LevelUniqueQueueByteFIFO) Len() int64 { + return fifo.internal.Len() +} + +// Has returns whether the fifo contains this data +func (fifo *LevelUniqueQueueByteFIFO) Has(data []byte) (bool, error) { + return fifo.internal.Has(data) +} + +// Close this fifo +func (fifo *LevelUniqueQueueByteFIFO) Close() error { + return fifo.internal.Close() +} + +func init() { + queuesMap[LevelUniqueQueueType] = NewLevelUniqueQueue +} diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go new file mode 100644 index 0000000000..71049f3259 --- /dev/null +++ b/modules/queue/unique_queue_disk_channel.go @@ -0,0 +1,241 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "sync" + "time" + + "code.gitea.io/gitea/modules/log" +) + +// PersistableChannelUniqueQueueType is the type for persistable queue +const PersistableChannelUniqueQueueType Type = "unique-persistable-channel" + +// PersistableChannelUniqueQueueConfiguration is the configuration for a PersistableChannelUniqueQueue +type PersistableChannelUniqueQueueConfiguration struct { + Name string + DataDir string + BatchLength int + QueueLength int + Timeout time.Duration + MaxAttempts int + Workers int + MaxWorkers int + BlockTimeout time.Duration + BoostTimeout time.Duration + BoostWorkers int +} + +// PersistableChannelUniqueQueue wraps a channel queue and level queue together +// +// Please note that this Queue does not guarantee that a particular +// task cannot be processed twice or more at the same time. Uniqueness is +// only guaranteed whilst the task is waiting in the queue. +type PersistableChannelUniqueQueue struct { + *ChannelUniqueQueue + delayedStarter + lock sync.Mutex + closed chan struct{} +} + +// NewPersistableChannelUniqueQueue creates a wrapped batched channel queue with persistable level queue backend when shutting down +// This differs from a wrapped queue in that the persistent queue is only used to persist at shutdown/terminate +func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { + configInterface, err := toConfig(PersistableChannelUniqueQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(PersistableChannelUniqueQueueConfiguration) + + channelUniqueQueue, err := NewChannelUniqueQueue(handle, ChannelUniqueQueueConfiguration{ + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: config.QueueLength, + BatchLength: config.BatchLength, + BlockTimeout: config.BlockTimeout, + BoostTimeout: config.BoostTimeout, + BoostWorkers: config.BoostWorkers, + MaxWorkers: config.MaxWorkers, + }, + Workers: config.Workers, + Name: config.Name + "-channel", + }, exemplar) + if err != nil { + return nil, err + } + + // the level backend only needs temporary workers to catch up with the previously dropped work + levelCfg := LevelUniqueQueueConfiguration{ + ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{ + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: config.QueueLength, + BatchLength: config.BatchLength, + BlockTimeout: 0, + BoostTimeout: 0, + BoostWorkers: 0, + MaxWorkers: 1, + }, + Workers: 1, + Name: config.Name + "-level", + }, + DataDir: config.DataDir, + } + + queue := &PersistableChannelUniqueQueue{ + ChannelUniqueQueue: channelUniqueQueue.(*ChannelUniqueQueue), + closed: make(chan struct{}), + } + + levelQueue, err := NewLevelUniqueQueue(func(data ...Data) { + for _, datum := range data { + err := queue.Push(datum) + if err != nil && err != ErrAlreadyInQueue { + log.Error("Unable push to channelled queue: %v", err) + } + } + }, levelCfg, exemplar) + if err == nil { + queue.delayedStarter = delayedStarter{ + internal: levelQueue.(*LevelUniqueQueue), + name: config.Name, + } + + _ = GetManager().Add(queue, PersistableChannelUniqueQueueType, config, exemplar) + return queue, nil + } + if IsErrInvalidConfiguration(err) { + // Retrying ain't gonna make this any better... + return nil, ErrInvalidConfiguration{cfg: cfg} + } + + queue.delayedStarter = delayedStarter{ + cfg: levelCfg, + underlying: LevelUniqueQueueType, + timeout: config.Timeout, + maxAttempts: config.MaxAttempts, + name: config.Name, + } + _ = GetManager().Add(queue, PersistableChannelUniqueQueueType, config, exemplar) + return queue, nil +} + +// Name returns the name of this queue +func (q *PersistableChannelUniqueQueue) Name() string { + return q.delayedStarter.name +} + +// Push will push the indexer data to queue +func (q *PersistableChannelUniqueQueue) Push(data Data) error { + return q.PushFunc(data, nil) +} + +// PushFunc will push the indexer data to queue +func (q *PersistableChannelUniqueQueue) PushFunc(data Data, fn func() error) error { + select { + case <-q.closed: + return q.internal.(UniqueQueue).PushFunc(data, fn) + default: + return q.ChannelUniqueQueue.PushFunc(data, fn) + } +} + +// Has will test if the queue has the data +func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) { + // This is more difficult... + has, err := q.ChannelUniqueQueue.Has(data) + if err != nil || has { + return has, err + } + return q.internal.(UniqueQueue).Has(data) +} + +// Run starts to run the queue +func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) { + log.Debug("PersistableChannelUniqueQueue: %s Starting", q.delayedStarter.name) + + q.lock.Lock() + if q.internal == nil { + err := q.setInternal(atShutdown, func(data ...Data) { + for _, datum := range data { + err := q.Push(datum) + if err != nil && err != ErrAlreadyInQueue { + log.Error("Unable push to channelled queue: %v", err) + } + } + }, q.exemplar) + q.lock.Unlock() + if err != nil { + log.Fatal("Unable to create internal queue for %s Error: %v", q.Name(), err) + return + } + } else { + q.lock.Unlock() + } + atShutdown(context.Background(), q.Shutdown) + atTerminate(context.Background(), q.Terminate) + + // Just run the level queue - we shut it down later + go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) + + go func() { + _ = q.ChannelUniqueQueue.AddWorkers(q.workers, 0) + }() + + log.Trace("PersistableChannelUniqueQueue: %s Waiting til closed", q.delayedStarter.name) + <-q.closed + log.Trace("PersistableChannelUniqueQueue: %s Cancelling pools", q.delayedStarter.name) + q.internal.(*LevelUniqueQueue).cancel() + q.ChannelUniqueQueue.cancel() + log.Trace("PersistableChannelUniqueQueue: %s Waiting til done", q.delayedStarter.name) + q.ChannelUniqueQueue.Wait() + q.internal.(*LevelUniqueQueue).Wait() + // Redirect all remaining data in the chan to the internal channel + go func() { + log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name) + for data := range q.ChannelUniqueQueue.dataChan { + _ = q.internal.Push(data) + } + log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name) + }() + log.Trace("PersistableChannelUniqueQueue: %s Done main loop", q.delayedStarter.name) +} + +// Flush flushes the queue +func (q *PersistableChannelUniqueQueue) Flush(timeout time.Duration) error { + return q.ChannelUniqueQueue.Flush(timeout) +} + +// Shutdown processing this queue +func (q *PersistableChannelUniqueQueue) Shutdown() { + log.Trace("PersistableChannelUniqueQueue: %s Shutting down", q.delayedStarter.name) + q.lock.Lock() + defer q.lock.Unlock() + select { + case <-q.closed: + default: + if q.internal != nil { + q.internal.(*LevelUniqueQueue).Shutdown() + } + close(q.closed) + } + log.Debug("PersistableChannelUniqueQueue: %s Shutdown", q.delayedStarter.name) +} + +// Terminate this queue and close the queue +func (q *PersistableChannelUniqueQueue) Terminate() { + log.Trace("PersistableChannelUniqueQueue: %s Terminating", q.delayedStarter.name) + q.Shutdown() + q.lock.Lock() + defer q.lock.Unlock() + if q.internal != nil { + q.internal.(*LevelUniqueQueue).Terminate() + } + log.Debug("PersistableChannelUniqueQueue: %s Terminated", q.delayedStarter.name) +} + +func init() { + queuesMap[PersistableChannelUniqueQueueType] = NewPersistableChannelUniqueQueue +} diff --git a/modules/queue/unique_queue_redis.go b/modules/queue/unique_queue_redis.go new file mode 100644 index 0000000000..e5b2c48dbb --- /dev/null +++ b/modules/queue/unique_queue_redis.go @@ -0,0 +1,124 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +// RedisUniqueQueueType is the type for redis queue +const RedisUniqueQueueType Type = "unique-redis" + +// RedisUniqueQueue redis queue +type RedisUniqueQueue struct { + *ByteFIFOUniqueQueue +} + +// RedisUniqueQueueConfiguration is the configuration for the redis queue +type RedisUniqueQueueConfiguration struct { + ByteFIFOQueueConfiguration + RedisUniqueByteFIFOConfiguration +} + +// NewRedisUniqueQueue creates single redis or cluster redis queue. +// +// Please note that this Queue does not guarantee that a particular +// task cannot be processed twice or more at the same time. Uniqueness is +// only guaranteed whilst the task is waiting in the queue. +func NewRedisUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { + configInterface, err := toConfig(RedisUniqueQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(RedisUniqueQueueConfiguration) + + byteFIFO, err := NewRedisUniqueByteFIFO(config.RedisUniqueByteFIFOConfiguration) + if err != nil { + return nil, err + } + + if len(byteFIFO.setName) == 0 { + byteFIFO.setName = byteFIFO.queueName + "_unique" + } + + byteFIFOQueue, err := NewByteFIFOUniqueQueue(RedisUniqueQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar) + if err != nil { + return nil, err + } + + queue := &RedisUniqueQueue{ + ByteFIFOUniqueQueue: byteFIFOQueue, + } + + queue.qid = GetManager().Add(queue, RedisUniqueQueueType, config, exemplar) + + return queue, nil +} + +var _ (UniqueByteFIFO) = &RedisUniqueByteFIFO{} + +// RedisUniqueByteFIFO represents a UniqueByteFIFO formed from a redisClient +type RedisUniqueByteFIFO struct { + RedisByteFIFO + setName string +} + +// RedisUniqueByteFIFOConfiguration is the configuration for the RedisUniqueByteFIFO +type RedisUniqueByteFIFOConfiguration struct { + RedisByteFIFOConfiguration + SetName string +} + +// NewRedisUniqueByteFIFO creates a UniqueByteFIFO formed from a redisClient +func NewRedisUniqueByteFIFO(config RedisUniqueByteFIFOConfiguration) (*RedisUniqueByteFIFO, error) { + internal, err := NewRedisByteFIFO(config.RedisByteFIFOConfiguration) + if err != nil { + return nil, err + } + + fifo := &RedisUniqueByteFIFO{ + RedisByteFIFO: *internal, + setName: config.SetName, + } + + return fifo, nil +} + +// PushFunc pushes data to the end of the fifo and calls the callback if it is added +func (fifo *RedisUniqueByteFIFO) PushFunc(data []byte, fn func() error) error { + added, err := fifo.client.SAdd(fifo.setName, data).Result() + if err != nil { + return err + } + if added == 0 { + return ErrAlreadyInQueue + } + if fn != nil { + if err := fn(); err != nil { + return err + } + } + return fifo.client.RPush(fifo.queueName, data).Err() +} + +// Pop pops data from the start of the fifo +func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error) { + data, err := fifo.client.LPop(fifo.queueName).Bytes() + if err != nil { + return data, err + } + + if len(data) == 0 { + return data, nil + } + + err = fifo.client.SRem(fifo.setName, data).Err() + return data, err +} + +// Has returns whether the fifo contains this data +func (fifo *RedisUniqueByteFIFO) Has(data []byte) (bool, error) { + return fifo.client.SIsMember(fifo.setName, data).Result() +} + +func init() { + queuesMap[RedisUniqueQueueType] = NewRedisUniqueQueue +} diff --git a/modules/queue/unique_queue_wrapped.go b/modules/queue/unique_queue_wrapped.go new file mode 100644 index 0000000000..8c815218dd --- /dev/null +++ b/modules/queue/unique_queue_wrapped.go @@ -0,0 +1,172 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "fmt" + "sync" + "time" +) + +// WrappedUniqueQueueType is the type for a wrapped delayed starting queue +const WrappedUniqueQueueType Type = "unique-wrapped" + +// WrappedUniqueQueueConfiguration is the configuration for a WrappedUniqueQueue +type WrappedUniqueQueueConfiguration struct { + Underlying Type + Timeout time.Duration + MaxAttempts int + Config interface{} + QueueLength int + Name string +} + +// WrappedUniqueQueue wraps a delayed starting unique queue +type WrappedUniqueQueue struct { + *WrappedQueue + table map[Data]bool + tlock sync.Mutex + ready bool +} + +// NewWrappedUniqueQueue will attempt to create a unique queue of the provided type, +// but if there is a problem creating this queue it will instead create +// a WrappedUniqueQueue with delayed startup of the queue instead and a +// channel which will be redirected to the queue +// +// Please note that this Queue does not guarantee that a particular +// task cannot be processed twice or more at the same time. Uniqueness is +// only guaranteed whilst the task is waiting in the queue. +func NewWrappedUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { + configInterface, err := toConfig(WrappedUniqueQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(WrappedUniqueQueueConfiguration) + + queue, err := NewQueue(config.Underlying, handle, config.Config, exemplar) + if err == nil { + // Just return the queue there is no need to wrap + return queue, nil + } + if IsErrInvalidConfiguration(err) { + // Retrying ain't gonna make this any better... + return nil, ErrInvalidConfiguration{cfg: cfg} + } + + wrapped := &WrappedUniqueQueue{ + WrappedQueue: &WrappedQueue{ + channel: make(chan Data, config.QueueLength), + exemplar: exemplar, + delayedStarter: delayedStarter{ + cfg: config.Config, + underlying: config.Underlying, + timeout: config.Timeout, + maxAttempts: config.MaxAttempts, + name: config.Name, + }, + }, + table: map[Data]bool{}, + } + + // wrapped.handle is passed to the delayedStarting internal queue and is run to handle + // data passed to + wrapped.handle = func(data ...Data) { + for _, datum := range data { + wrapped.tlock.Lock() + if !wrapped.ready { + delete(wrapped.table, data) + // If our table is empty all of the requests we have buffered between the + // wrapper queue starting and the internal queue starting have been handled. + // We can stop buffering requests in our local table and just pass Push + // direct to the internal queue + if len(wrapped.table) == 0 { + wrapped.ready = true + } + } + wrapped.tlock.Unlock() + handle(datum) + } + } + _ = GetManager().Add(queue, WrappedUniqueQueueType, config, exemplar) + return wrapped, nil +} + +// Push will push the data to the internal channel checking it against the exemplar +func (q *WrappedUniqueQueue) Push(data Data) error { + return q.PushFunc(data, nil) +} + +// PushFunc will push the data to the internal channel checking it against the exemplar +func (q *WrappedUniqueQueue) PushFunc(data Data, fn func() error) error { + if !assignableTo(data, q.exemplar) { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) + } + + q.tlock.Lock() + if q.ready { + // ready means our table is empty and all of the requests we have buffered between the + // wrapper queue starting and the internal queue starting have been handled. + // We can stop buffering requests in our local table and just pass Push + // direct to the internal queue + q.tlock.Unlock() + return q.internal.(UniqueQueue).PushFunc(data, fn) + } + + locked := true + defer func() { + if locked { + q.tlock.Unlock() + } + }() + if _, ok := q.table[data]; ok { + return ErrAlreadyInQueue + } + // FIXME: We probably need to implement some sort of limit here + // If the downstream queue blocks this table will grow without limit + q.table[data] = true + if fn != nil { + err := fn() + if err != nil { + delete(q.table, data) + return err + } + } + locked = false + q.tlock.Unlock() + + q.channel <- data + return nil +} + +// Has checks if the data is in the queue +func (q *WrappedUniqueQueue) Has(data Data) (bool, error) { + q.tlock.Lock() + defer q.tlock.Unlock() + if q.ready { + return q.internal.(UniqueQueue).Has(data) + } + _, has := q.table[data] + return has, nil +} + +// IsEmpty checks whether the queue is empty +func (q *WrappedUniqueQueue) IsEmpty() bool { + q.tlock.Lock() + if len(q.table) > 0 { + q.tlock.Unlock() + return false + } + if q.ready { + q.tlock.Unlock() + return q.internal.IsEmpty() + } + q.tlock.Unlock() + return false +} + +func init() { + queuesMap[WrappedUniqueQueueType] = NewWrappedUniqueQueue +} diff --git a/modules/setting/queue.go b/modules/setting/queue.go index 934c5a8108..8bdca1017f 100644 --- a/modules/setting/queue.go +++ b/modules/setting/queue.go @@ -26,6 +26,7 @@ type QueueSettings struct { Addresses string Password string QueueName string + SetName string DBIndex int WrapIfNecessary bool MaxAttempts int @@ -54,8 +55,13 @@ func GetQueueSettings(name string) QueueSettings { q.DataDir = key.MustString(q.DataDir) case "QUEUE_NAME": q.QueueName = key.MustString(q.QueueName) + case "SET_NAME": + q.SetName = key.MustString(q.SetName) } } + if len(q.SetName) == 0 && len(Queue.SetName) > 0 { + q.SetName = q.QueueName + Queue.SetName + } if !filepath.IsAbs(q.DataDir) { q.DataDir = filepath.Join(AppDataPath, q.DataDir) } @@ -100,6 +106,7 @@ func NewQueueService() { Queue.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(5 * time.Minute) Queue.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(5) Queue.QueueName = sec.Key("QUEUE_NAME").MustString("_queue") + Queue.SetName = sec.Key("SET_NAME").MustString("") // Now handle the old issue_indexer configuration section := Cfg.Section("queue.issue_indexer") @@ -142,6 +149,17 @@ func NewQueueService() { if _, ok := sectionMap["LENGTH"]; !ok { _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100))) } + + // Handle the old test pull requests configuration + // Please note this will be a unique queue + section = Cfg.Section("queue.pr_patch_checker") + sectionMap = map[string]bool{} + for _, key := range section.Keys() { + sectionMap[key.Name()] = true + } + if _, ok := sectionMap["LENGTH"]; !ok { + _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.PullRequestQueueLength)) + } } // ParseQueueConnStr parses a queue connection string diff --git a/routers/init.go b/routers/init.go index 1d7cf78438..f86a7ad4b2 100644 --- a/routers/init.go +++ b/routers/init.go @@ -113,7 +113,9 @@ func GlobalInit(ctx context.Context) { code_indexer.Init() mirror_service.InitSyncMirrors() webhook.InitDeliverHooks() - pull_service.Init() + if err := pull_service.Init(); err != nil { + log.Fatal("Failed to initialize test pull requests queue: %v", err) + } if err := task.Init(); err != nil { log.Fatal("Failed to initialize task scheduler: %v", err) } diff --git a/services/pull/check.go b/services/pull/check.go index 5d380b4609..d64f49de3b 100644 --- a/services/pull/check.go +++ b/services/pull/check.go @@ -10,6 +10,7 @@ import ( "fmt" "io/ioutil" "os" + "strconv" "strings" "code.gitea.io/gitea/models" @@ -17,24 +18,32 @@ import ( "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/notification" - "code.gitea.io/gitea/modules/setting" - "code.gitea.io/gitea/modules/sync" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/timeutil" "github.com/unknwon/com" ) -// pullRequestQueue represents a queue to handle update pull request tests -var pullRequestQueue = sync.NewUniqueQueue(setting.Repository.PullRequestQueueLength) +// prQueue represents a queue to handle update pull request tests +var prQueue queue.UniqueQueue // AddToTaskQueue adds itself to pull request test task queue. func AddToTaskQueue(pr *models.PullRequest) { - go pullRequestQueue.AddFunc(pr.ID, func() { - pr.Status = models.PullRequestStatusChecking - if err := pr.UpdateCols("status"); err != nil { - log.Error("AddToTaskQueue.UpdateCols[%d].(add to queue): %v", pr.ID, err) + go func() { + err := prQueue.PushFunc(strconv.FormatInt(pr.ID, 10), func() error { + pr.Status = models.PullRequestStatusChecking + err := pr.UpdateCols("status") + if err != nil { + log.Error("AddToTaskQueue.UpdateCols[%d].(add to queue): %v", pr.ID, err) + } else { + log.Trace("Adding PR ID: %d to the test pull requests queue", pr.ID) + } + return err + }) + if err != nil && err != queue.ErrAlreadyInQueue { + log.Error("Error adding prID %d to the test pull requests queue: %v", pr.ID, err) } - }) + }() } // checkAndUpdateStatus checks if pull request is possible to leaving checking status, @@ -46,7 +55,12 @@ func checkAndUpdateStatus(pr *models.PullRequest) { } // Make sure there is no waiting test to process before leaving the checking status. - if !pullRequestQueue.Exist(pr.ID) { + has, err := prQueue.Has(strconv.FormatInt(pr.ID, 10)) + if err != nil { + log.Error("Unable to check if the queue is waiting to reprocess pr.ID %d. Error: %v", pr.ID, err) + } + + if !has { if err := pr.UpdateCols("status, conflicted_files"); err != nil { log.Error("Update[%d]: %v", pr.ID, err) } @@ -73,7 +87,8 @@ func getMergeCommit(pr *models.PullRequest) (*git.Commit, error) { headFile := pr.GetGitRefName() // Check if a pull request is merged into BaseBranch - _, err = git.NewCommand("merge-base", "--is-ancestor", headFile, pr.BaseBranch).RunInDirWithEnv(pr.BaseRepo.RepoPath(), []string{"GIT_INDEX_FILE=" + indexTmpPath, "GIT_DIR=" + pr.BaseRepo.RepoPath()}) + _, err = git.NewCommand("merge-base", "--is-ancestor", headFile, pr.BaseBranch). + RunInDirWithEnv(pr.BaseRepo.RepoPath(), []string{"GIT_INDEX_FILE=" + indexTmpPath, "GIT_DIR=" + pr.BaseRepo.RepoPath()}) if err != nil { // Errors are signaled by a non-zero status that is not 1 if strings.Contains(err.Error(), "exit status 1") { @@ -93,7 +108,8 @@ func getMergeCommit(pr *models.PullRequest) (*git.Commit, error) { cmd := commitID[:40] + ".." + pr.BaseBranch // Get the commit from BaseBranch where the pull request got merged - mergeCommit, err := git.NewCommand("rev-list", "--ancestry-path", "--merges", "--reverse", cmd).RunInDirWithEnv("", []string{"GIT_INDEX_FILE=" + indexTmpPath, "GIT_DIR=" + pr.BaseRepo.RepoPath()}) + mergeCommit, err := git.NewCommand("rev-list", "--ancestry-path", "--merges", "--reverse", cmd). + RunInDirWithEnv("", []string{"GIT_INDEX_FILE=" + indexTmpPath, "GIT_DIR=" + pr.BaseRepo.RepoPath()}) if err != nil { return nil, fmt.Errorf("git rev-list --ancestry-path --merges --reverse: %v", err) } else if len(mergeCommit) < 40 { @@ -155,61 +171,65 @@ func manuallyMerged(pr *models.PullRequest) bool { return false } -// TestPullRequests checks and tests untested patches of pull requests. -// TODO: test more pull requests at same time. -func TestPullRequests(ctx context.Context) { - - go func() { - prs, err := models.GetPullRequestIDsByCheckStatus(models.PullRequestStatusChecking) - if err != nil { - log.Error("Find Checking PRs: %v", err) +// InitializePullRequests checks and tests untested patches of pull requests. +func InitializePullRequests(ctx context.Context) { + prs, err := models.GetPullRequestIDsByCheckStatus(models.PullRequestStatusChecking) + if err != nil { + log.Error("Find Checking PRs: %v", err) + return + } + for _, prID := range prs { + select { + case <-ctx.Done(): return - } - for _, prID := range prs { - select { - case <-ctx.Done(): - return - default: - pullRequestQueue.Add(prID) + default: + if err := prQueue.PushFunc(strconv.FormatInt(prID, 10), func() error { + log.Trace("Adding PR ID: %d to the pull requests patch checking queue", prID) + return nil + }); err != nil { + log.Error("Error adding prID: %s to the pull requests patch checking queue %v", prID, err) } } - }() + } +} - // Start listening on new test requests. - for { - select { - case prID := <-pullRequestQueue.Queue(): - log.Trace("TestPullRequests[%v]: processing test task", prID) - pullRequestQueue.Remove(prID) +// handle passed PR IDs and test the PRs +func handle(data ...queue.Data) { + for _, datum := range data { + prID := datum.(string) + id := com.StrTo(prID).MustInt64() - id := com.StrTo(prID).MustInt64() + log.Trace("Testing PR ID %d from the pull requests patch checking queue", id) - pr, err := models.GetPullRequestByID(id) - if err != nil { - log.Error("GetPullRequestByID[%s]: %v", prID, err) - continue - } else if pr.Status != models.PullRequestStatusChecking { - continue - } else if manuallyMerged(pr) { - continue - } else if err = TestPatch(pr); err != nil { - log.Error("testPatch[%d]: %v", pr.ID, err) - pr.Status = models.PullRequestStatusError - if err := pr.UpdateCols("status"); err != nil { - log.Error("update pr [%d] status to PullRequestStatusError failed: %v", pr.ID, err) - } - continue + pr, err := models.GetPullRequestByID(id) + if err != nil { + log.Error("GetPullRequestByID[%s]: %v", prID, err) + continue + } else if pr.Status != models.PullRequestStatusChecking { + continue + } else if manuallyMerged(pr) { + continue + } else if err = TestPatch(pr); err != nil { + log.Error("testPatch[%d]: %v", pr.ID, err) + pr.Status = models.PullRequestStatusError + if err := pr.UpdateCols("status"); err != nil { + log.Error("update pr [%d] status to PullRequestStatusError failed: %v", pr.ID, err) } - checkAndUpdateStatus(pr) - case <-ctx.Done(): - pullRequestQueue.Close() - log.Info("PID: %d Pull Request testing shutdown", os.Getpid()) - return + continue } + checkAndUpdateStatus(pr) } } // Init runs the task queue to test all the checking status pull requests -func Init() { - go graceful.GetManager().RunWithShutdownContext(TestPullRequests) +func Init() error { + prQueue = queue.CreateUniqueQueue("pr_patch_checker", handle, "").(queue.UniqueQueue) + + if prQueue == nil { + return fmt.Errorf("Unable to create pr_patch_checker Queue") + } + + go graceful.GetManager().RunWithShutdownFns(prQueue.Run) + go graceful.GetManager().RunWithShutdownContext(InitializePullRequests) + return nil } diff --git a/services/pull/check_test.go b/services/pull/check_test.go index 48a7774a61..4591edd7aa 100644 --- a/services/pull/check_test.go +++ b/services/pull/check_test.go @@ -6,29 +6,82 @@ package pull import ( + "context" "strconv" "testing" "time" "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/queue" "github.com/stretchr/testify/assert" + "github.com/unknwon/com" ) func TestPullRequest_AddToTaskQueue(t *testing.T) { assert.NoError(t, models.PrepareTestDatabase()) + idChan := make(chan int64, 10) + + q, err := queue.NewChannelUniqueQueue(func(data ...queue.Data) { + for _, datum := range data { + prID := datum.(string) + id := com.StrTo(prID).MustInt64() + idChan <- id + } + }, queue.ChannelUniqueQueueConfiguration{ + WorkerPoolConfiguration: queue.WorkerPoolConfiguration{ + QueueLength: 10, + BatchLength: 1, + }, + Workers: 1, + Name: "temporary-queue", + }, "") + assert.NoError(t, err) + + queueShutdown := []func(){} + queueTerminate := []func(){} + + prQueue = q.(queue.UniqueQueue) + pr := models.AssertExistsAndLoadBean(t, &models.PullRequest{ID: 1}).(*models.PullRequest) AddToTaskQueue(pr) + assert.Eventually(t, func() bool { + pr = models.AssertExistsAndLoadBean(t, &models.PullRequest{ID: 1}).(*models.PullRequest) + return pr.Status == models.PullRequestStatusChecking + }, 1*time.Second, 100*time.Millisecond) + + has, err := prQueue.Has(strconv.FormatInt(pr.ID, 10)) + assert.True(t, has) + assert.NoError(t, err) + + prQueue.Run(func(_ context.Context, shutdown func()) { + queueShutdown = append(queueShutdown, shutdown) + }, func(_ context.Context, terminate func()) { + queueTerminate = append(queueTerminate, terminate) + }) + select { - case id := <-pullRequestQueue.Queue(): - assert.EqualValues(t, strconv.FormatInt(pr.ID, 10), id) + case id := <-idChan: + assert.EqualValues(t, pr.ID, id) case <-time.After(time.Second): assert.Fail(t, "Timeout: nothing was added to pullRequestQueue") } - assert.True(t, pullRequestQueue.Exist(pr.ID)) + has, err = prQueue.Has(strconv.FormatInt(pr.ID, 10)) + assert.False(t, has) + assert.NoError(t, err) + pr = models.AssertExistsAndLoadBean(t, &models.PullRequest{ID: 1}).(*models.PullRequest) assert.Equal(t, models.PullRequestStatusChecking, pr.Status) + + for _, callback := range queueShutdown { + callback() + } + for _, callback := range queueTerminate { + callback() + } + + prQueue = nil } diff --git a/vendor/gitea.com/lunny/levelqueue/.gitignore b/vendor/gitea.com/lunny/levelqueue/.gitignore index 59a8bdee30..ab1fe76029 100644 --- a/vendor/gitea.com/lunny/levelqueue/.gitignore +++ b/vendor/gitea.com/lunny/levelqueue/.gitignore @@ -1,3 +1,7 @@ queue/ queue_pop/ -queue_push/
\ No newline at end of file +queue_push/ +uniquequeue/ +uniquequeue_pop/ +uniquequeue_push/ +set/ diff --git a/vendor/gitea.com/lunny/levelqueue/README.md b/vendor/gitea.com/lunny/levelqueue/README.md index 80a0853cf6..21db280839 100644 --- a/vendor/gitea.com/lunny/levelqueue/README.md +++ b/vendor/gitea.com/lunny/levelqueue/README.md @@ -25,4 +25,36 @@ data, err = queue.LPop() queue.LHandle(func(dt []byte) error{ return nil }) -```
\ No newline at end of file +``` + +You can now create a Set from a leveldb: + +```Go +set, err := levelqueue.OpenSet("./set") + +added, err:= set.Add([]byte("member1")) + +has, err := set.Has([]byte("member1")) + +members, err := set.Members() + +removed, err := set.Remove([]byte("member1")) +``` + +And you can create a UniqueQueue from a leveldb: + +```Go +queue, err := levelqueue.OpenUnique("./queue") + +err := queue.RPush([]byte("member1")) + +err = queue.LPush([]byte("member1")) +// Will return ErrAlreadyInQueue + +// and so on. +``` + +## Creating Queues, UniqueQueues and Sets from already open DB + +If you have an already open DB you can create these from this using the +`NewQueue`, `NewUniqueQueue` and `NewSet` functions.
\ No newline at end of file diff --git a/vendor/gitea.com/lunny/levelqueue/error.go b/vendor/gitea.com/lunny/levelqueue/error.go index d639c5d496..648c185655 100644 --- a/vendor/gitea.com/lunny/levelqueue/error.go +++ b/vendor/gitea.com/lunny/levelqueue/error.go @@ -7,6 +7,8 @@ package levelqueue import "errors" var ( - // ErrNotFound means no element in queue + // ErrNotFound means no elements in queue ErrNotFound = errors.New("no key found") + + ErrAlreadyInQueue = errors.New("value already in queue") ) diff --git a/vendor/gitea.com/lunny/levelqueue/queue.go b/vendor/gitea.com/lunny/levelqueue/queue.go index af624db8e4..20ed90100c 100644 --- a/vendor/gitea.com/lunny/levelqueue/queue.go +++ b/vendor/gitea.com/lunny/levelqueue/queue.go @@ -12,37 +12,62 @@ import ( "github.com/syndtr/goleveldb/leveldb" ) +const ( + lowKeyStr = "low" + highKeyStr = "high" +) + // Queue defines a queue struct type Queue struct { - db *leveldb.DB - highLock sync.Mutex - lowLock sync.Mutex - low int64 - high int64 + db *leveldb.DB + highLock sync.Mutex + lowLock sync.Mutex + low int64 + high int64 + lowKey []byte + highKey []byte + prefix []byte + closeUnderlyingDB bool } -// Open opens a queue object or create it if not exist +// Open opens a queue from the db path or creates a +// queue if it doesn't exist. +// The keys will not be prefixed by default func Open(dataDir string) (*Queue, error) { db, err := leveldb.OpenFile(dataDir, nil) if err != nil { return nil, err } + return NewQueue(db, []byte{}, true) +} + +// NewQueue creates a queue from a db. The keys will be prefixed with prefix +// and at close the db will be closed as per closeUnderlyingDB +func NewQueue(db *leveldb.DB, prefix []byte, closeUnderlyingDB bool) (*Queue, error) { + var err error var queue = &Queue{ - db: db, + db: db, + closeUnderlyingDB: closeUnderlyingDB, } - queue.low, err = queue.readID(lowKey) + + queue.prefix = make([]byte, len(prefix)) + copy(queue.prefix, prefix) + queue.lowKey = withPrefix(prefix, []byte(lowKeyStr)) + queue.highKey = withPrefix(prefix, []byte(highKeyStr)) + + queue.low, err = queue.readID(queue.lowKey) if err == leveldb.ErrNotFound { queue.low = 1 - err = db.Put(lowKey, id2bytes(1), nil) + err = db.Put(queue.lowKey, id2bytes(1), nil) } if err != nil { return nil, err } - queue.high, err = queue.readID(highKey) + queue.high, err = queue.readID(queue.highKey) if err == leveldb.ErrNotFound { - err = db.Put(highKey, id2bytes(0), nil) + err = db.Put(queue.highKey, id2bytes(0), nil) } if err != nil { return nil, err @@ -59,15 +84,10 @@ func (queue *Queue) readID(key []byte) (int64, error) { return bytes2id(bs) } -var ( - lowKey = []byte("low") - highKey = []byte("high") -) - func (queue *Queue) highincrement() (int64, error) { id := queue.high + 1 queue.high = id - err := queue.db.Put(highKey, id2bytes(queue.high), nil) + err := queue.db.Put(queue.highKey, id2bytes(queue.high), nil) if err != nil { queue.high = queue.high - 1 return 0, err @@ -77,7 +97,7 @@ func (queue *Queue) highincrement() (int64, error) { func (queue *Queue) highdecrement() (int64, error) { queue.high = queue.high - 1 - err := queue.db.Put(highKey, id2bytes(queue.high), nil) + err := queue.db.Put(queue.highKey, id2bytes(queue.high), nil) if err != nil { queue.high = queue.high + 1 return 0, err @@ -87,7 +107,7 @@ func (queue *Queue) highdecrement() (int64, error) { func (queue *Queue) lowincrement() (int64, error) { queue.low = queue.low + 1 - err := queue.db.Put(lowKey, id2bytes(queue.low), nil) + err := queue.db.Put(queue.lowKey, id2bytes(queue.low), nil) if err != nil { queue.low = queue.low - 1 return 0, err @@ -97,7 +117,7 @@ func (queue *Queue) lowincrement() (int64, error) { func (queue *Queue) lowdecrement() (int64, error) { queue.low = queue.low - 1 - err := queue.db.Put(lowKey, id2bytes(queue.low), nil) + err := queue.db.Put(queue.lowKey, id2bytes(queue.low), nil) if err != nil { queue.low = queue.low + 1 return 0, err @@ -125,6 +145,17 @@ func bytes2id(b []byte) (int64, error) { return binary.ReadVarint(bytes.NewReader(b)) } +func withPrefix(prefix []byte, value []byte) []byte { + if len(prefix) == 0 { + return value + } + prefixed := make([]byte, len(prefix)+1+len(value)) + copy(prefixed[0:len(prefix)], prefix) + prefixed[len(prefix)] = '-' + copy(prefixed[len(prefix)+1:], value) + return prefixed +} + // RPush pushes a data from right of queue func (queue *Queue) RPush(data []byte) error { queue.highLock.Lock() @@ -133,7 +164,7 @@ func (queue *Queue) RPush(data []byte) error { queue.highLock.Unlock() return err } - err = queue.db.Put(id2bytes(id), data, nil) + err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil) queue.highLock.Unlock() return err } @@ -146,7 +177,7 @@ func (queue *Queue) LPush(data []byte) error { queue.lowLock.Unlock() return err } - err = queue.db.Put(id2bytes(id), data, nil) + err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil) queue.lowLock.Unlock() return err } @@ -157,7 +188,7 @@ func (queue *Queue) RPop() ([]byte, error) { defer queue.highLock.Unlock() currentID := queue.high - res, err := queue.db.Get(id2bytes(currentID), nil) + res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil) if err != nil { if err == leveldb.ErrNotFound { return nil, ErrNotFound @@ -170,7 +201,7 @@ func (queue *Queue) RPop() ([]byte, error) { return nil, err } - err = queue.db.Delete(id2bytes(currentID), nil) + err = queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil) if err != nil { return nil, err } @@ -183,7 +214,7 @@ func (queue *Queue) RHandle(h func([]byte) error) error { defer queue.highLock.Unlock() currentID := queue.high - res, err := queue.db.Get(id2bytes(currentID), nil) + res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil) if err != nil { if err == leveldb.ErrNotFound { return ErrNotFound @@ -200,7 +231,7 @@ func (queue *Queue) RHandle(h func([]byte) error) error { return err } - return queue.db.Delete(id2bytes(currentID), nil) + return queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil) } // LPop pop a data from left of queue @@ -209,7 +240,7 @@ func (queue *Queue) LPop() ([]byte, error) { defer queue.lowLock.Unlock() currentID := queue.low - res, err := queue.db.Get(id2bytes(currentID), nil) + res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil) if err != nil { if err == leveldb.ErrNotFound { return nil, ErrNotFound @@ -222,7 +253,7 @@ func (queue *Queue) LPop() ([]byte, error) { return nil, err } - err = queue.db.Delete(id2bytes(currentID), nil) + err = queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil) if err != nil { return nil, err } @@ -235,7 +266,7 @@ func (queue *Queue) LHandle(h func([]byte) error) error { defer queue.lowLock.Unlock() currentID := queue.low - res, err := queue.db.Get(id2bytes(currentID), nil) + res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil) if err != nil { if err == leveldb.ErrNotFound { return ErrNotFound @@ -252,11 +283,15 @@ func (queue *Queue) LHandle(h func([]byte) error) error { return err } - return queue.db.Delete(id2bytes(currentID), nil) + return queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil) } -// Close closes the queue +// Close closes the queue (and the underlying db is set to closeUnderlyingDB) func (queue *Queue) Close() error { + if !queue.closeUnderlyingDB { + queue.db = nil + return nil + } err := queue.db.Close() queue.db = nil return err diff --git a/vendor/gitea.com/lunny/levelqueue/set.go b/vendor/gitea.com/lunny/levelqueue/set.go new file mode 100644 index 0000000000..88f4e9b1d1 --- /dev/null +++ b/vendor/gitea.com/lunny/levelqueue/set.go @@ -0,0 +1,110 @@ +// Copyright 2020 Andrew Thornton. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package levelqueue + +import ( + "sync" + + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/util" +) + +const ( + setPrefixStr = "set" +) + +// Set defines a set struct +type Set struct { + db *leveldb.DB + closeUnderlyingDB bool + lock sync.Mutex + prefix []byte +} + +// OpenSet opens a set from the db path or creates a set if it doesn't exist. +// The keys will be prefixed with "set-" by default +func OpenSet(dataDir string) (*Set, error) { + db, err := leveldb.OpenFile(dataDir, nil) + if err != nil { + return nil, err + } + return NewSet(db, []byte(setPrefixStr), true) +} + +// NewSet creates a set from a db. The keys will be prefixed with prefix +// and at close the db will be closed as per closeUnderlyingDB +func NewSet(db *leveldb.DB, prefix []byte, closeUnderlyingDB bool) (*Set, error) { + set := &Set{ + db: db, + closeUnderlyingDB: closeUnderlyingDB, + } + set.prefix = make([]byte, len(prefix)) + copy(set.prefix, prefix) + + return set, nil +} + +// Add adds a member string to a key set, returns true if the member was not already present +func (set *Set) Add(value []byte) (bool, error) { + set.lock.Lock() + defer set.lock.Unlock() + setKey := withPrefix(set.prefix, value) + has, err := set.db.Has(setKey, nil) + if err != nil || has { + return !has, err + } + return !has, set.db.Put(setKey, []byte(""), nil) +} + +// Members returns the current members of the set +func (set *Set) Members() ([][]byte, error) { + set.lock.Lock() + defer set.lock.Unlock() + var members [][]byte + prefix := withPrefix(set.prefix, []byte{}) + iter := set.db.NewIterator(util.BytesPrefix(prefix), nil) + for iter.Next() { + slice := iter.Key()[len(prefix):] + value := make([]byte, len(slice)) + copy(value, slice) + members = append(members, value) + } + iter.Release() + return members, iter.Error() +} + +// Has returns if the member is in the set +func (set *Set) Has(value []byte) (bool, error) { + set.lock.Lock() + defer set.lock.Unlock() + setKey := withPrefix(set.prefix, value) + + return set.db.Has(setKey, nil) +} + +// Remove removes a member from the set, returns true if the member was present +func (set *Set) Remove(value []byte) (bool, error) { + set.lock.Lock() + defer set.lock.Unlock() + setKey := withPrefix(set.prefix, value) + + has, err := set.db.Has(setKey, nil) + if err != nil || !has { + return has, err + } + + return has, set.db.Delete(setKey, nil) +} + +// Close closes the set (and the underlying db if set to closeUnderlyingDB) +func (set *Set) Close() error { + if !set.closeUnderlyingDB { + set.db = nil + return nil + } + err := set.db.Close() + set.db = nil + return err +} diff --git a/vendor/gitea.com/lunny/levelqueue/uniquequeue.go b/vendor/gitea.com/lunny/levelqueue/uniquequeue.go new file mode 100644 index 0000000000..8d2676b0d2 --- /dev/null +++ b/vendor/gitea.com/lunny/levelqueue/uniquequeue.go @@ -0,0 +1,184 @@ +// Copyright 2020 Andrew Thornton. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package levelqueue + +import ( + "fmt" + + "github.com/syndtr/goleveldb/leveldb" +) + +const ( + uniqueQueuePrefixStr = "unique" +) + +// UniqueQueue defines an unique queue struct +type UniqueQueue struct { + q *Queue + set *Set + db *leveldb.DB + closeUnderlyingDB bool +} + +// OpenUnique opens an unique queue from the db path or creates a set if it doesn't exist. +// The keys in the queue portion will not be prefixed, and the set keys will be prefixed with "set-" +func OpenUnique(dataDir string) (*UniqueQueue, error) { + db, err := leveldb.OpenFile(dataDir, nil) + if err != nil { + return nil, err + } + return NewUniqueQueue(db, []byte{}, []byte(uniqueQueuePrefixStr), true) +} + +// NewUniqueQueue creates a new unique queue from a db. +// The queue keys will be prefixed with queuePrefix and the set keys with setPrefix +// and at close the db will be closed as per closeUnderlyingDB +func NewUniqueQueue(db *leveldb.DB, queuePrefix []byte, setPrefix []byte, closeUnderlyingDB bool) (*UniqueQueue, error) { + internal, err := NewQueue(db, queuePrefix, false) + if err != nil { + return nil, err + } + set, err := NewSet(db, setPrefix, false) + if err != nil { + return nil, err + } + queue := &UniqueQueue{ + q: internal, + set: set, + db: db, + closeUnderlyingDB: closeUnderlyingDB, + } + + return queue, err +} + +// LPush pushes data to the left of the queue +func (queue *UniqueQueue) LPush(data []byte) error { + return queue.LPushFunc(data, nil) +} + +// LPushFunc pushes data to the left of the queue and calls the callback if it is added +func (queue *UniqueQueue) LPushFunc(data []byte, fn func() error) error { + added, err := queue.set.Add(data) + if err != nil { + return err + } + if !added { + return ErrAlreadyInQueue + } + + if fn != nil { + err = fn() + if err != nil { + _, remErr := queue.set.Remove(data) + if remErr != nil { + return fmt.Errorf("%v & %v", err, remErr) + } + return err + } + } + + return queue.q.LPush(data) +} + +// RPush pushes data to the right of the queue +func (queue *UniqueQueue) RPush(data []byte) error { + return queue.RPushFunc(data, nil) +} + +// RPushFunc pushes data to the right of the queue and calls the callback if is added +func (queue *UniqueQueue) RPushFunc(data []byte, fn func() error) error { + added, err := queue.set.Add(data) + if err != nil { + return err + } + if !added { + return ErrAlreadyInQueue + } + + if fn != nil { + err = fn() + if err != nil { + _, remErr := queue.set.Remove(data) + if remErr != nil { + return fmt.Errorf("%v & %v", err, remErr) + } + return err + } + } + + return queue.q.RPush(data) +} + +// RPop pop data from the right of the queue +func (queue *UniqueQueue) RPop() ([]byte, error) { + popped, err := queue.q.RPop() + if err != nil { + return popped, err + } + _, err = queue.set.Remove(popped) + + return popped, err +} + +// RHandle receives a user callback function to handle the right element of the queue, if the function returns nil, then delete the element, otherwise keep the element. +func (queue *UniqueQueue) RHandle(h func([]byte) error) error { + return queue.q.RHandle(func(data []byte) error { + err := h(data) + if err != nil { + return err + } + _, err = queue.set.Remove(data) + return err + }) +} + +// LPop pops data from left of the queue +func (queue *UniqueQueue) LPop() ([]byte, error) { + popped, err := queue.q.LPop() + if err != nil { + return popped, err + } + _, err = queue.set.Remove(popped) + + return popped, err +} + +// LHandle receives a user callback function to handle the left element of the queue, if the function returns nil, then delete the element, otherwise keep the element. +func (queue *UniqueQueue) LHandle(h func([]byte) error) error { + return queue.q.LHandle(func(data []byte) error { + err := h(data) + if err != nil { + return err + } + _, err = queue.set.Remove(data) + return err + }) +} + +// Has checks whether the data is already in the queue +func (queue *UniqueQueue) Has(data []byte) (bool, error) { + return queue.set.Has(data) +} + +// Len returns the length of the queue +func (queue *UniqueQueue) Len() int64 { + queue.set.lock.Lock() + defer queue.set.lock.Unlock() + return queue.q.Len() +} + +// Close closes the queue (and the underlying DB if set to closeUnderlyingDB) +func (queue *UniqueQueue) Close() error { + _ = queue.q.Close() + _ = queue.set.Close() + if !queue.closeUnderlyingDB { + queue.db = nil + return nil + } + err := queue.db.Close() + queue.db = nil + return err +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 5203c24e4a..947008d63c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,6 +1,6 @@ # cloud.google.com/go v0.45.0 cloud.google.com/go/compute/metadata -# gitea.com/lunny/levelqueue v0.1.0 +# gitea.com/lunny/levelqueue v0.2.0 gitea.com/lunny/levelqueue # gitea.com/macaron/binding v0.0.0-20190822013154-a5f53841ed2b gitea.com/macaron/binding |