summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2020-02-02 23:19:58 +0000
committerGitHub <noreply@github.com>2020-02-02 23:19:58 +0000
commit2c903383b5154795b90e4b4ed8eaadc6fac17a13 (patch)
treed5ca361d9597e027ad92f1e02a841be1d266b554
parentb4914249ee389a733e7dcfd2df20708ab3215827 (diff)
downloadgitea-2c903383b5154795b90e4b4ed8eaadc6fac17a13.tar.gz
gitea-2c903383b5154795b90e4b4ed8eaadc6fac17a13.zip
Add Unique Queue infrastructure and move TestPullRequests to this (#9856)
* Upgrade levelqueue to version 0.2.0 This adds functionality for Unique Queues * Add UniqueQueue interface and functions to create them * Add UniqueQueue implementations * Move TestPullRequests over to use UniqueQueue * Reduce code duplication * Add bytefifos * Ensure invalid types are logged * Fix close race in PersistableChannelQueue Shutdown
-rw-r--r--docs/content/doc/advanced/config-cheat-sheet.en-us.md4
-rw-r--r--go.mod2
-rw-r--r--go.sum2
-rw-r--r--modules/queue/bytefifo.go61
-rw-r--r--modules/queue/queue.go20
-rw-r--r--modules/queue/queue_bytefifo.go227
-rw-r--r--modules/queue/queue_channel.go22
-rw-r--r--modules/queue/queue_disk.go178
-rw-r--r--modules/queue/queue_disk_channel.go150
-rw-r--r--modules/queue/queue_disk_test.go36
-rw-r--r--modules/queue/queue_redis.go238
-rw-r--r--modules/queue/setting.go40
-rw-r--r--modules/queue/unique_queue.go29
-rw-r--r--modules/queue/unique_queue_channel.go132
-rw-r--r--modules/queue/unique_queue_disk.go104
-rw-r--r--modules/queue/unique_queue_disk_channel.go241
-rw-r--r--modules/queue/unique_queue_redis.go124
-rw-r--r--modules/queue/unique_queue_wrapped.go172
-rw-r--r--modules/setting/queue.go18
-rw-r--r--routers/init.go4
-rw-r--r--services/pull/check.go134
-rw-r--r--services/pull/check_test.go59
-rw-r--r--vendor/gitea.com/lunny/levelqueue/.gitignore6
-rw-r--r--vendor/gitea.com/lunny/levelqueue/README.md34
-rw-r--r--vendor/gitea.com/lunny/levelqueue/error.go4
-rw-r--r--vendor/gitea.com/lunny/levelqueue/queue.go97
-rw-r--r--vendor/gitea.com/lunny/levelqueue/set.go110
-rw-r--r--vendor/gitea.com/lunny/levelqueue/uniquequeue.go184
-rw-r--r--vendor/modules.txt2
29 files changed, 1934 insertions, 500 deletions
diff --git a/docs/content/doc/advanced/config-cheat-sheet.en-us.md b/docs/content/doc/advanced/config-cheat-sheet.en-us.md
index 7e5b39e480..cbf05b5349 100644
--- a/docs/content/doc/advanced/config-cheat-sheet.en-us.md
+++ b/docs/content/doc/advanced/config-cheat-sheet.en-us.md
@@ -252,6 +252,10 @@ relation to port exhaustion.
- `BATCH_LENGTH`: **20**: Batch data before passing to the handler
- `CONN_STR`: **addrs=127.0.0.1:6379 db=0**: Connection string for the redis queue type.
- `QUEUE_NAME`: **_queue**: The suffix for default redis queue name. Individual queues will default to **`name`**`QUEUE_NAME` but can be overriden in the specific `queue.name` section.
+- `SET_NAME`: **_unique**: The suffix that will added to the default redis
+set name for unique queues. Individual queues will default to
+**`name`**`QUEUE_NAME`_`SET_NAME`_ but can be overridden in the specific
+`queue.name` section.
- `WRAP_IF_NECESSARY`: **true**: Will wrap queues with a timeoutable queue if the selected queue is not ready to be created - (Only relevant for the level queue.)
- `MAX_ATTEMPTS`: **10**: Maximum number of attempts to create the wrapped queue
- `TIMEOUT`: **GRACEFUL_HAMMER_TIME + 30s**: Timeout the creation of the wrapped queue if it takes longer than this to create.
diff --git a/go.mod b/go.mod
index cb1dca4b5d..508024d39c 100644
--- a/go.mod
+++ b/go.mod
@@ -4,7 +4,7 @@ go 1.13
require (
cloud.google.com/go v0.45.0 // indirect
- gitea.com/lunny/levelqueue v0.1.0
+ gitea.com/lunny/levelqueue v0.2.0
gitea.com/macaron/binding v0.0.0-20190822013154-a5f53841ed2b
gitea.com/macaron/cache v0.0.0-20190822004001-a6e7fee4ee76
gitea.com/macaron/captcha v0.0.0-20190822015246-daa973478bae
diff --git a/go.sum b/go.sum
index 13ffa77502..27ad269429 100644
--- a/go.sum
+++ b/go.sum
@@ -11,6 +11,8 @@ cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbf
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
gitea.com/lunny/levelqueue v0.1.0 h1:7wMk0VH6mvKN6vZEZCy9nUDgRmdPLgeNrm1NkW8EHNk=
gitea.com/lunny/levelqueue v0.1.0/go.mod h1:G7hVb908t0Bl0uk7zGSg14fyzNtxgtD9Shf04wkMK7s=
+gitea.com/lunny/levelqueue v0.2.0 h1:lR/5EAwQtFcn5YvPEkNMw0p9pAy2/O2nSP5ImECLA2E=
+gitea.com/lunny/levelqueue v0.2.0/go.mod h1:G7hVb908t0Bl0uk7zGSg14fyzNtxgtD9Shf04wkMK7s=
gitea.com/macaron/binding v0.0.0-20190822013154-a5f53841ed2b h1:vXt85uYV17KURaUlhU7v4GbCShkqRZDSfo0TkC0YCjQ=
gitea.com/macaron/binding v0.0.0-20190822013154-a5f53841ed2b/go.mod h1:Cxadig6POWpPYYSfg23E7jo35Yf0yvsdC1lifoKWmPo=
gitea.com/macaron/cache v0.0.0-20190822004001-a6e7fee4ee76 h1:mMsMEg90c5KXQgRWsH8D6GHXfZIW1RAe5S9VYIb12lM=
diff --git a/modules/queue/bytefifo.go b/modules/queue/bytefifo.go
new file mode 100644
index 0000000000..2cd0ba0b95
--- /dev/null
+++ b/modules/queue/bytefifo.go
@@ -0,0 +1,61 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+// ByteFIFO defines a FIFO that takes a byte array
+type ByteFIFO interface {
+ // Len returns the length of the fifo
+ Len() int64
+ // PushFunc pushes data to the end of the fifo and calls the callback if it is added
+ PushFunc(data []byte, fn func() error) error
+ // Pop pops data from the start of the fifo
+ Pop() ([]byte, error)
+ // Close this fifo
+ Close() error
+}
+
+// UniqueByteFIFO defines a FIFO that Uniques its contents
+type UniqueByteFIFO interface {
+ ByteFIFO
+ // Has returns whether the fifo contains this data
+ Has(data []byte) (bool, error)
+}
+
+var _ (ByteFIFO) = &DummyByteFIFO{}
+
+// DummyByteFIFO represents a dummy fifo
+type DummyByteFIFO struct{}
+
+// PushFunc returns nil
+func (*DummyByteFIFO) PushFunc(data []byte, fn func() error) error {
+ return nil
+}
+
+// Pop returns nil
+func (*DummyByteFIFO) Pop() ([]byte, error) {
+ return []byte{}, nil
+}
+
+// Close returns nil
+func (*DummyByteFIFO) Close() error {
+ return nil
+}
+
+// Len is always 0
+func (*DummyByteFIFO) Len() int64 {
+ return 0
+}
+
+var _ (UniqueByteFIFO) = &DummyUniqueByteFIFO{}
+
+// DummyUniqueByteFIFO represents a dummy unique fifo
+type DummyUniqueByteFIFO struct {
+ DummyByteFIFO
+}
+
+// Has always returns false
+func (*DummyUniqueByteFIFO) Has([]byte) (bool, error) {
+ return false, nil
+}
diff --git a/modules/queue/queue.go b/modules/queue/queue.go
index 094699d4af..e3c63310be 100644
--- a/modules/queue/queue.go
+++ b/modules/queue/queue.go
@@ -74,25 +74,35 @@ type DummyQueue struct {
}
// Run does nothing
-func (b *DummyQueue) Run(_, _ func(context.Context, func())) {}
+func (*DummyQueue) Run(_, _ func(context.Context, func())) {}
// Push fakes a push of data to the queue
-func (b *DummyQueue) Push(Data) error {
+func (*DummyQueue) Push(Data) error {
return nil
}
+// PushFunc fakes a push of data to the queue with a function. The function is never run.
+func (*DummyQueue) PushFunc(Data, func() error) error {
+ return nil
+}
+
+// Has always returns false as this queue never does anything
+func (*DummyQueue) Has(Data) (bool, error) {
+ return false, nil
+}
+
// Flush always returns nil
-func (b *DummyQueue) Flush(time.Duration) error {
+func (*DummyQueue) Flush(time.Duration) error {
return nil
}
// FlushWithContext always returns nil
-func (b *DummyQueue) FlushWithContext(context.Context) error {
+func (*DummyQueue) FlushWithContext(context.Context) error {
return nil
}
// IsEmpty asserts that the queue is empty
-func (b *DummyQueue) IsEmpty() bool {
+func (*DummyQueue) IsEmpty() bool {
return true
}
diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go
new file mode 100644
index 0000000000..cad258bda8
--- /dev/null
+++ b/modules/queue/queue_bytefifo.go
@@ -0,0 +1,227 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "sync"
+ "time"
+
+ "code.gitea.io/gitea/modules/log"
+)
+
+// ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue
+type ByteFIFOQueueConfiguration struct {
+ WorkerPoolConfiguration
+ Workers int
+ Name string
+}
+
+var _ (Queue) = &ByteFIFOQueue{}
+
+// ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool
+type ByteFIFOQueue struct {
+ *WorkerPool
+ byteFIFO ByteFIFO
+ typ Type
+ closed chan struct{}
+ terminated chan struct{}
+ exemplar interface{}
+ workers int
+ name string
+ lock sync.Mutex
+}
+
+// NewByteFIFOQueue creates a new ByteFIFOQueue
+func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOQueue, error) {
+ configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg)
+ if err != nil {
+ return nil, err
+ }
+ config := configInterface.(ByteFIFOQueueConfiguration)
+
+ return &ByteFIFOQueue{
+ WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
+ byteFIFO: byteFIFO,
+ typ: typ,
+ closed: make(chan struct{}),
+ terminated: make(chan struct{}),
+ exemplar: exemplar,
+ workers: config.Workers,
+ name: config.Name,
+ }, nil
+}
+
+// Name returns the name of this queue
+func (q *ByteFIFOQueue) Name() string {
+ return q.name
+}
+
+// Push pushes data to the fifo
+func (q *ByteFIFOQueue) Push(data Data) error {
+ return q.PushFunc(data, nil)
+}
+
+// PushFunc pushes data to the fifo
+func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
+ if !assignableTo(data, q.exemplar) {
+ return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
+ }
+ bs, err := json.Marshal(data)
+ if err != nil {
+ return err
+ }
+ return q.byteFIFO.PushFunc(bs, fn)
+}
+
+// IsEmpty checks if the queue is empty
+func (q *ByteFIFOQueue) IsEmpty() bool {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ if !q.WorkerPool.IsEmpty() {
+ return false
+ }
+ return q.byteFIFO.Len() == 0
+}
+
+// Run runs the bytefifo queue
+func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+ atShutdown(context.Background(), q.Shutdown)
+ atTerminate(context.Background(), q.Terminate)
+ log.Debug("%s: %s Starting", q.typ, q.name)
+
+ go func() {
+ _ = q.AddWorkers(q.workers, 0)
+ }()
+
+ go q.readToChan()
+
+ log.Trace("%s: %s Waiting til closed", q.typ, q.name)
+ <-q.closed
+ log.Trace("%s: %s Waiting til done", q.typ, q.name)
+ q.Wait()
+
+ log.Trace("%s: %s Waiting til cleaned", q.typ, q.name)
+ ctx, cancel := context.WithCancel(context.Background())
+ atTerminate(ctx, cancel)
+ q.CleanUp(ctx)
+ cancel()
+}
+
+func (q *ByteFIFOQueue) readToChan() {
+ for {
+ select {
+ case <-q.closed:
+ // tell the pool to shutdown.
+ q.cancel()
+ return
+ default:
+ q.lock.Lock()
+ bs, err := q.byteFIFO.Pop()
+ if err != nil {
+ q.lock.Unlock()
+ log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
+ time.Sleep(time.Millisecond * 100)
+ continue
+ }
+
+ if len(bs) == 0 {
+ q.lock.Unlock()
+ time.Sleep(time.Millisecond * 100)
+ continue
+ }
+
+ data, err := unmarshalAs(bs, q.exemplar)
+ if err != nil {
+ log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
+ q.lock.Unlock()
+ time.Sleep(time.Millisecond * 100)
+ continue
+ }
+
+ log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
+ q.WorkerPool.Push(data)
+ q.lock.Unlock()
+ }
+ }
+}
+
+// Shutdown processing from this queue
+func (q *ByteFIFOQueue) Shutdown() {
+ log.Trace("%s: %s Shutting down", q.typ, q.name)
+ q.lock.Lock()
+ select {
+ case <-q.closed:
+ default:
+ close(q.closed)
+ }
+ q.lock.Unlock()
+ log.Debug("%s: %s Shutdown", q.typ, q.name)
+}
+
+// Terminate this queue and close the queue
+func (q *ByteFIFOQueue) Terminate() {
+ log.Trace("%s: %s Terminating", q.typ, q.name)
+ q.Shutdown()
+ q.lock.Lock()
+ select {
+ case <-q.terminated:
+ q.lock.Unlock()
+ return
+ default:
+ }
+ close(q.terminated)
+ q.lock.Unlock()
+ if log.IsDebug() {
+ log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len())
+ }
+ if err := q.byteFIFO.Close(); err != nil {
+ log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err)
+ }
+ log.Debug("%s: %s Terminated", q.typ, q.name)
+}
+
+var _ (UniqueQueue) = &ByteFIFOUniqueQueue{}
+
+// ByteFIFOUniqueQueue represents a UniqueQueue formed from a UniqueByteFifo
+type ByteFIFOUniqueQueue struct {
+ ByteFIFOQueue
+}
+
+// NewByteFIFOUniqueQueue creates a new ByteFIFOUniqueQueue
+func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOUniqueQueue, error) {
+ configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg)
+ if err != nil {
+ return nil, err
+ }
+ config := configInterface.(ByteFIFOQueueConfiguration)
+
+ return &ByteFIFOUniqueQueue{
+ ByteFIFOQueue: ByteFIFOQueue{
+ WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
+ byteFIFO: byteFIFO,
+ typ: typ,
+ closed: make(chan struct{}),
+ terminated: make(chan struct{}),
+ exemplar: exemplar,
+ workers: config.Workers,
+ name: config.Name,
+ },
+ }, nil
+}
+
+// Has checks if the provided data is in the queue
+func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) {
+ if !assignableTo(data, q.exemplar) {
+ return false, fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
+ }
+ bs, err := json.Marshal(data)
+ if err != nil {
+ return false, err
+ }
+ return q.byteFIFO.(UniqueByteFIFO).Has(bs)
+}
diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go
index 45df8a443e..d7a11e79f5 100644
--- a/modules/queue/queue_channel.go
+++ b/modules/queue/queue_channel.go
@@ -53,31 +53,31 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
}
// Run starts to run the queue
-func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+func (q *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
atShutdown(context.Background(), func() {
- log.Warn("ChannelQueue: %s is not shutdownable!", c.name)
+ log.Warn("ChannelQueue: %s is not shutdownable!", q.name)
})
atTerminate(context.Background(), func() {
- log.Warn("ChannelQueue: %s is not terminatable!", c.name)
+ log.Warn("ChannelQueue: %s is not terminatable!", q.name)
})
- log.Debug("ChannelQueue: %s Starting", c.name)
+ log.Debug("ChannelQueue: %s Starting", q.name)
go func() {
- _ = c.AddWorkers(c.workers, 0)
+ _ = q.AddWorkers(q.workers, 0)
}()
}
// Push will push data into the queue
-func (c *ChannelQueue) Push(data Data) error {
- if !assignableTo(data, c.exemplar) {
- return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name)
+func (q *ChannelQueue) Push(data Data) error {
+ if !assignableTo(data, q.exemplar) {
+ return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
}
- c.WorkerPool.Push(data)
+ q.WorkerPool.Push(data)
return nil
}
// Name returns the name of this queue
-func (c *ChannelQueue) Name() string {
- return c.name
+func (q *ChannelQueue) Name() string {
+ return q.name
}
func init() {
diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go
index ca3e230e3d..ff0876488b 100644
--- a/modules/queue/queue_disk.go
+++ b/modules/queue/queue_disk.go
@@ -5,15 +5,6 @@
package queue
import (
- "context"
- "encoding/json"
- "fmt"
- "sync"
- "sync/atomic"
- "time"
-
- "code.gitea.io/gitea/modules/log"
-
"gitea.com/lunny/levelqueue"
)
@@ -22,22 +13,13 @@ const LevelQueueType Type = "level"
// LevelQueueConfiguration is the configuration for a LevelQueue
type LevelQueueConfiguration struct {
- WorkerPoolConfiguration
+ ByteFIFOQueueConfiguration
DataDir string
- Workers int
- Name string
}
// LevelQueue implements a disk library queue
type LevelQueue struct {
- *WorkerPool
- queue *levelqueue.Queue
- closed chan struct{}
- terminated chan struct{}
- lock sync.Mutex
- exemplar interface{}
- workers int
- name string
+ *ByteFIFOQueue
}
// NewLevelQueue creates a ledis local queue
@@ -48,149 +30,69 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
}
config := configInterface.(LevelQueueConfiguration)
- internal, err := levelqueue.Open(config.DataDir)
+ byteFIFO, err := NewLevelQueueByteFIFO(config.DataDir)
+ if err != nil {
+ return nil, err
+ }
+
+ byteFIFOQueue, err := NewByteFIFOQueue(LevelQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
if err != nil {
return nil, err
}
queue := &LevelQueue{
- WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
- queue: internal,
- exemplar: exemplar,
- closed: make(chan struct{}),
- terminated: make(chan struct{}),
- workers: config.Workers,
- name: config.Name,
+ ByteFIFOQueue: byteFIFOQueue,
}
queue.qid = GetManager().Add(queue, LevelQueueType, config, exemplar)
return queue, nil
}
-// Run starts to run the queue
-func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
- atShutdown(context.Background(), l.Shutdown)
- atTerminate(context.Background(), l.Terminate)
- log.Debug("LevelQueue: %s Starting", l.name)
-
- go func() {
- _ = l.AddWorkers(l.workers, 0)
- }()
-
- go l.readToChan()
-
- log.Trace("LevelQueue: %s Waiting til closed", l.name)
- <-l.closed
-
- log.Trace("LevelQueue: %s Waiting til done", l.name)
- l.Wait()
-
- log.Trace("LevelQueue: %s Waiting til cleaned", l.name)
- ctx, cancel := context.WithCancel(context.Background())
- atTerminate(ctx, cancel)
- l.CleanUp(ctx)
- cancel()
- log.Trace("LevelQueue: %s Cleaned", l.name)
-
-}
+var _ (ByteFIFO) = &LevelQueueByteFIFO{}
-func (l *LevelQueue) readToChan() {
- for {
- select {
- case <-l.closed:
- // tell the pool to shutdown.
- l.cancel()
- return
- default:
- atomic.AddInt64(&l.numInQueue, 1)
- bs, err := l.queue.RPop()
- if err != nil {
- if err != levelqueue.ErrNotFound {
- log.Error("LevelQueue: %s Error on RPop: %v", l.name, err)
- }
- atomic.AddInt64(&l.numInQueue, -1)
- time.Sleep(time.Millisecond * 100)
- continue
- }
-
- if len(bs) == 0 {
- atomic.AddInt64(&l.numInQueue, -1)
- time.Sleep(time.Millisecond * 100)
- continue
- }
-
- data, err := unmarshalAs(bs, l.exemplar)
- if err != nil {
- log.Error("LevelQueue: %s Failed to unmarshal with error: %v", l.name, err)
- atomic.AddInt64(&l.numInQueue, -1)
- time.Sleep(time.Millisecond * 100)
- continue
- }
-
- log.Trace("LevelQueue %s: Task found: %#v", l.name, data)
- l.WorkerPool.Push(data)
- atomic.AddInt64(&l.numInQueue, -1)
- }
- }
+// LevelQueueByteFIFO represents a ByteFIFO formed from a LevelQueue
+type LevelQueueByteFIFO struct {
+ internal *levelqueue.Queue
}
-// Push will push the indexer data to queue
-func (l *LevelQueue) Push(data Data) error {
- if !assignableTo(data, l.exemplar) {
- return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, l.exemplar, l.name)
- }
- bs, err := json.Marshal(data)
+// NewLevelQueueByteFIFO creates a ByteFIFO formed from a LevelQueue
+func NewLevelQueueByteFIFO(dataDir string) (*LevelQueueByteFIFO, error) {
+ internal, err := levelqueue.Open(dataDir)
if err != nil {
- return err
+ return nil, err
}
- return l.queue.LPush(bs)
+
+ return &LevelQueueByteFIFO{
+ internal: internal,
+ }, nil
}
-// IsEmpty checks whether the queue is empty
-func (l *LevelQueue) IsEmpty() bool {
- if !l.WorkerPool.IsEmpty() {
- return false
+// PushFunc will push data into the fifo
+func (fifo *LevelQueueByteFIFO) PushFunc(data []byte, fn func() error) error {
+ if fn != nil {
+ if err := fn(); err != nil {
+ return err
+ }
}
- return l.queue.Len() == 0
+ return fifo.internal.LPush(data)
}
-// Shutdown this queue and stop processing
-func (l *LevelQueue) Shutdown() {
- l.lock.Lock()
- defer l.lock.Unlock()
- log.Trace("LevelQueue: %s Shutting down", l.name)
- select {
- case <-l.closed:
- default:
- close(l.closed)
+// Pop pops data from the start of the fifo
+func (fifo *LevelQueueByteFIFO) Pop() ([]byte, error) {
+ data, err := fifo.internal.RPop()
+ if err != nil && err != levelqueue.ErrNotFound {
+ return nil, err
}
- log.Debug("LevelQueue: %s Shutdown", l.name)
+ return data, nil
}
-// Terminate this queue and close the queue
-func (l *LevelQueue) Terminate() {
- log.Trace("LevelQueue: %s Terminating", l.name)
- l.Shutdown()
- l.lock.Lock()
- select {
- case <-l.terminated:
- l.lock.Unlock()
- default:
- close(l.terminated)
- l.lock.Unlock()
- if log.IsDebug() {
- log.Debug("LevelQueue: %s Closing with %d tasks left in queue", l.name, l.queue.Len())
- }
- if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" {
- log.Error("Error whilst closing internal queue in %s: %v", l.name, err)
- }
-
- }
- log.Debug("LevelQueue: %s Terminated", l.name)
+// Close this fifo
+func (fifo *LevelQueueByteFIFO) Close() error {
+ return fifo.internal.Close()
}
-// Name returns the name of this queue
-func (l *LevelQueue) Name() string {
- return l.name
+// Len returns the length of the fifo
+func (fifo *LevelQueueByteFIFO) Len() int64 {
+ return fifo.internal.Len()
}
func init() {
diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go
index 961187ab0d..433435c301 100644
--- a/modules/queue/queue_disk_channel.go
+++ b/modules/queue/queue_disk_channel.go
@@ -69,17 +69,19 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
// the level backend only needs temporary workers to catch up with the previously dropped work
levelCfg := LevelQueueConfiguration{
- WorkerPoolConfiguration: WorkerPoolConfiguration{
- QueueLength: config.QueueLength,
- BatchLength: config.BatchLength,
- BlockTimeout: 1 * time.Second,
- BoostTimeout: 5 * time.Minute,
- BoostWorkers: 5,
- MaxWorkers: 6,
+ ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{
+ WorkerPoolConfiguration: WorkerPoolConfiguration{
+ QueueLength: config.QueueLength,
+ BatchLength: config.BatchLength,
+ BlockTimeout: 1 * time.Second,
+ BoostTimeout: 5 * time.Minute,
+ BoostWorkers: 5,
+ MaxWorkers: 6,
+ },
+ Workers: 1,
+ Name: config.Name + "-level",
},
DataDir: config.DataDir,
- Workers: 1,
- Name: config.Name + "-level",
}
levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar)
@@ -116,67 +118,67 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
}
// Name returns the name of this queue
-func (p *PersistableChannelQueue) Name() string {
- return p.delayedStarter.name
+func (q *PersistableChannelQueue) Name() string {
+ return q.delayedStarter.name
}
// Push will push the indexer data to queue
-func (p *PersistableChannelQueue) Push(data Data) error {
+func (q *PersistableChannelQueue) Push(data Data) error {
select {
- case <-p.closed:
- return p.internal.Push(data)
+ case <-q.closed:
+ return q.internal.Push(data)
default:
- return p.channelQueue.Push(data)
+ return q.channelQueue.Push(data)
}
}
// Run starts to run the queue
-func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
- log.Debug("PersistableChannelQueue: %s Starting", p.delayedStarter.name)
+func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+ log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name)
- p.lock.Lock()
- if p.internal == nil {
- err := p.setInternal(atShutdown, p.channelQueue.handle, p.channelQueue.exemplar)
- p.lock.Unlock()
+ q.lock.Lock()
+ if q.internal == nil {
+ err := q.setInternal(atShutdown, q.channelQueue.handle, q.channelQueue.exemplar)
+ q.lock.Unlock()
if err != nil {
- log.Fatal("Unable to create internal queue for %s Error: %v", p.Name(), err)
+ log.Fatal("Unable to create internal queue for %s Error: %v", q.Name(), err)
return
}
} else {
- p.lock.Unlock()
+ q.lock.Unlock()
}
- atShutdown(context.Background(), p.Shutdown)
- atTerminate(context.Background(), p.Terminate)
+ atShutdown(context.Background(), q.Shutdown)
+ atTerminate(context.Background(), q.Terminate)
// Just run the level queue - we shut it down later
- go p.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
+ go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
go func() {
- _ = p.channelQueue.AddWorkers(p.channelQueue.workers, 0)
+ _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
}()
- log.Trace("PersistableChannelQueue: %s Waiting til closed", p.delayedStarter.name)
- <-p.closed
- log.Trace("PersistableChannelQueue: %s Cancelling pools", p.delayedStarter.name)
- p.channelQueue.cancel()
- p.internal.(*LevelQueue).cancel()
- log.Trace("PersistableChannelQueue: %s Waiting til done", p.delayedStarter.name)
- p.channelQueue.Wait()
- p.internal.(*LevelQueue).Wait()
+ log.Trace("PersistableChannelQueue: %s Waiting til closed", q.delayedStarter.name)
+ <-q.closed
+ log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name)
+ q.channelQueue.cancel()
+ q.internal.(*LevelQueue).cancel()
+ log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name)
+ q.channelQueue.Wait()
+ q.internal.(*LevelQueue).Wait()
// Redirect all remaining data in the chan to the internal channel
go func() {
- log.Trace("PersistableChannelQueue: %s Redirecting remaining data", p.delayedStarter.name)
- for data := range p.channelQueue.dataChan {
- _ = p.internal.Push(data)
- atomic.AddInt64(&p.channelQueue.numInQueue, -1)
+ log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
+ for data := range q.channelQueue.dataChan {
+ _ = q.internal.Push(data)
+ atomic.AddInt64(&q.channelQueue.numInQueue, -1)
}
- log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", p.delayedStarter.name)
+ log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
}()
- log.Trace("PersistableChannelQueue: %s Done main loop", p.delayedStarter.name)
+ log.Trace("PersistableChannelQueue: %s Done main loop", q.delayedStarter.name)
}
// Flush flushes the queue and blocks till the queue is empty
-func (p *PersistableChannelQueue) Flush(timeout time.Duration) error {
+func (q *PersistableChannelQueue) Flush(timeout time.Duration) error {
var ctx context.Context
var cancel context.CancelFunc
if timeout > 0 {
@@ -185,24 +187,24 @@ func (p *PersistableChannelQueue) Flush(timeout time.Duration) error {
ctx, cancel = context.WithCancel(context.Background())
}
defer cancel()
- return p.FlushWithContext(ctx)
+ return q.FlushWithContext(ctx)
}
// FlushWithContext flushes the queue and blocks till the queue is empty
-func (p *PersistableChannelQueue) FlushWithContext(ctx context.Context) error {
+func (q *PersistableChannelQueue) FlushWithContext(ctx context.Context) error {
errChan := make(chan error, 1)
go func() {
- errChan <- p.channelQueue.FlushWithContext(ctx)
+ errChan <- q.channelQueue.FlushWithContext(ctx)
}()
go func() {
- p.lock.Lock()
- if p.internal == nil {
- p.lock.Unlock()
- errChan <- fmt.Errorf("not ready to flush internal queue %s yet", p.Name())
+ q.lock.Lock()
+ if q.internal == nil {
+ q.lock.Unlock()
+ errChan <- fmt.Errorf("not ready to flush internal queue %s yet", q.Name())
return
}
- p.lock.Unlock()
- errChan <- p.internal.FlushWithContext(ctx)
+ q.lock.Unlock()
+ errChan <- q.internal.FlushWithContext(ctx)
}()
err1 := <-errChan
err2 := <-errChan
@@ -214,44 +216,44 @@ func (p *PersistableChannelQueue) FlushWithContext(ctx context.Context) error {
}
// IsEmpty checks if a queue is empty
-func (p *PersistableChannelQueue) IsEmpty() bool {
- if !p.channelQueue.IsEmpty() {
+func (q *PersistableChannelQueue) IsEmpty() bool {
+ if !q.channelQueue.IsEmpty() {
return false
}
- p.lock.Lock()
- defer p.lock.Unlock()
- if p.internal == nil {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ if q.internal == nil {
return false
}
- return p.internal.IsEmpty()
+ return q.internal.IsEmpty()
}
// Shutdown processing this queue
-func (p *PersistableChannelQueue) Shutdown() {
- log.Trace("PersistableChannelQueue: %s Shutting down", p.delayedStarter.name)
+func (q *PersistableChannelQueue) Shutdown() {
+ log.Trace("PersistableChannelQueue: %s Shutting down", q.delayedStarter.name)
+ q.lock.Lock()
+ defer q.lock.Unlock()
select {
- case <-p.closed:
+ case <-q.closed:
default:
- p.lock.Lock()
- defer p.lock.Unlock()
- if p.internal != nil {
- p.internal.(*LevelQueue).Shutdown()
+ if q.internal != nil {
+ q.internal.(*LevelQueue).Shutdown()
}
- close(p.closed)
+ close(q.closed)
+ log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
}
- log.Debug("PersistableChannelQueue: %s Shutdown", p.delayedStarter.name)
}
// Terminate this queue and close the queue
-func (p *PersistableChannelQueue) Terminate() {
- log.Trace("PersistableChannelQueue: %s Terminating", p.delayedStarter.name)
- p.Shutdown()
- p.lock.Lock()
- defer p.lock.Unlock()
- if p.internal != nil {
- p.internal.(*LevelQueue).Terminate()
+func (q *PersistableChannelQueue) Terminate() {
+ log.Trace("PersistableChannelQueue: %s Terminating", q.delayedStarter.name)
+ q.Shutdown()
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ if q.internal != nil {
+ q.internal.(*LevelQueue).Terminate()
}
- log.Debug("PersistableChannelQueue: %s Terminated", p.delayedStarter.name)
+ log.Debug("PersistableChannelQueue: %s Terminated", q.delayedStarter.name)
}
func init() {
diff --git a/modules/queue/queue_disk_test.go b/modules/queue/queue_disk_test.go
index 038d7d8223..c7d3eb160b 100644
--- a/modules/queue/queue_disk_test.go
+++ b/modules/queue/queue_disk_test.go
@@ -34,16 +34,18 @@ func TestLevelQueue(t *testing.T) {
defer os.RemoveAll(tmpDir)
queue, err := NewLevelQueue(handle, LevelQueueConfiguration{
- WorkerPoolConfiguration: WorkerPoolConfiguration{
- QueueLength: 20,
- BatchLength: 2,
- BlockTimeout: 1 * time.Second,
- BoostTimeout: 5 * time.Minute,
- BoostWorkers: 5,
- MaxWorkers: 10,
+ ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{
+ WorkerPoolConfiguration: WorkerPoolConfiguration{
+ QueueLength: 20,
+ BatchLength: 2,
+ BlockTimeout: 1 * time.Second,
+ BoostTimeout: 5 * time.Minute,
+ BoostWorkers: 5,
+ MaxWorkers: 10,
+ },
+ Workers: 1,
},
DataDir: tmpDir,
- Workers: 1,
}, &testData{})
assert.NoError(t, err)
@@ -105,16 +107,18 @@ func TestLevelQueue(t *testing.T) {
WrappedQueueConfiguration{
Underlying: LevelQueueType,
Config: LevelQueueConfiguration{
- WorkerPoolConfiguration: WorkerPoolConfiguration{
- QueueLength: 20,
- BatchLength: 2,
- BlockTimeout: 1 * time.Second,
- BoostTimeout: 5 * time.Minute,
- BoostWorkers: 5,
- MaxWorkers: 10,
+ ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{
+ WorkerPoolConfiguration: WorkerPoolConfiguration{
+ QueueLength: 20,
+ BatchLength: 2,
+ BlockTimeout: 1 * time.Second,
+ BoostTimeout: 5 * time.Minute,
+ BoostWorkers: 5,
+ MaxWorkers: 10,
+ },
+ Workers: 1,
},
DataDir: tmpDir,
- Workers: 1,
},
}, &testData{})
assert.NoError(t, err)
diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go
index 0167c1ec49..8a395cd5aa 100644
--- a/modules/queue/queue_redis.go
+++ b/modules/queue/queue_redis.go
@@ -5,14 +5,8 @@
package queue
import (
- "context"
- "encoding/json"
"errors"
- "fmt"
"strings"
- "sync"
- "sync/atomic"
- "time"
"code.gitea.io/gitea/modules/log"
@@ -22,204 +16,130 @@ import (
// RedisQueueType is the type for redis queue
const RedisQueueType Type = "redis"
+// RedisQueueConfiguration is the configuration for the redis queue
+type RedisQueueConfiguration struct {
+ ByteFIFOQueueConfiguration
+ RedisByteFIFOConfiguration
+}
+
+// RedisQueue redis queue
+type RedisQueue struct {
+ *ByteFIFOQueue
+}
+
+// NewRedisQueue creates single redis or cluster redis queue
+func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
+ configInterface, err := toConfig(RedisQueueConfiguration{}, cfg)
+ if err != nil {
+ return nil, err
+ }
+ config := configInterface.(RedisQueueConfiguration)
+
+ byteFIFO, err := NewRedisByteFIFO(config.RedisByteFIFOConfiguration)
+ if err != nil {
+ return nil, err
+ }
+
+ byteFIFOQueue, err := NewByteFIFOQueue(RedisQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
+ if err != nil {
+ return nil, err
+ }
+
+ queue := &RedisQueue{
+ ByteFIFOQueue: byteFIFOQueue,
+ }
+
+ queue.qid = GetManager().Add(queue, RedisQueueType, config, exemplar)
+
+ return queue, nil
+}
+
type redisClient interface {
RPush(key string, args ...interface{}) *redis.IntCmd
LPop(key string) *redis.StringCmd
LLen(key string) *redis.IntCmd
+ SAdd(key string, members ...interface{}) *redis.IntCmd
+ SRem(key string, members ...interface{}) *redis.IntCmd
+ SIsMember(key string, member interface{}) *redis.BoolCmd
Ping() *redis.StatusCmd
Close() error
}
-// RedisQueue redis queue
-type RedisQueue struct {
- *WorkerPool
- client redisClient
- queueName string
- closed chan struct{}
- terminated chan struct{}
- exemplar interface{}
- workers int
- name string
- lock sync.Mutex
+var _ (ByteFIFO) = &RedisByteFIFO{}
+
+// RedisByteFIFO represents a ByteFIFO formed from a redisClient
+type RedisByteFIFO struct {
+ client redisClient
+ queueName string
}
-// RedisQueueConfiguration is the configuration for the redis queue
-type RedisQueueConfiguration struct {
- WorkerPoolConfiguration
+// RedisByteFIFOConfiguration is the configuration for the RedisByteFIFO
+type RedisByteFIFOConfiguration struct {
Network string
Addresses string
Password string
DBIndex int
QueueName string
- Workers int
- Name string
}
-// NewRedisQueue creates single redis or cluster redis queue
-func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
- configInterface, err := toConfig(RedisQueueConfiguration{}, cfg)
- if err != nil {
- return nil, err
+// NewRedisByteFIFO creates a ByteFIFO formed from a redisClient
+func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error) {
+ fifo := &RedisByteFIFO{
+ queueName: config.QueueName,
}
- config := configInterface.(RedisQueueConfiguration)
-
dbs := strings.Split(config.Addresses, ",")
-
- var queue = &RedisQueue{
- WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
- queueName: config.QueueName,
- exemplar: exemplar,
- closed: make(chan struct{}),
- terminated: make(chan struct{}),
- workers: config.Workers,
- name: config.Name,
- }
if len(dbs) == 0 {
return nil, errors.New("no redis host specified")
} else if len(dbs) == 1 {
- queue.client = redis.NewClient(&redis.Options{
+ fifo.client = redis.NewClient(&redis.Options{
Network: config.Network,
Addr: strings.TrimSpace(dbs[0]), // use default Addr
Password: config.Password, // no password set
DB: config.DBIndex, // use default DB
})
} else {
- queue.client = redis.NewClusterClient(&redis.ClusterOptions{
+ fifo.client = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: dbs,
})
}
- if err := queue.client.Ping().Err(); err != nil {
+ if err := fifo.client.Ping().Err(); err != nil {
return nil, err
}
- queue.qid = GetManager().Add(queue, RedisQueueType, config, exemplar)
-
- return queue, nil
+ return fifo, nil
}
-// Run runs the redis queue
-func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
- atShutdown(context.Background(), r.Shutdown)
- atTerminate(context.Background(), r.Terminate)
- log.Debug("RedisQueue: %s Starting", r.name)
-
- go func() {
- _ = r.AddWorkers(r.workers, 0)
- }()
-
- go r.readToChan()
-
- log.Trace("RedisQueue: %s Waiting til closed", r.name)
- <-r.closed
- log.Trace("RedisQueue: %s Waiting til done", r.name)
- r.Wait()
-
- log.Trace("RedisQueue: %s Waiting til cleaned", r.name)
- ctx, cancel := context.WithCancel(context.Background())
- atTerminate(ctx, cancel)
- r.CleanUp(ctx)
- cancel()
-}
-
-func (r *RedisQueue) readToChan() {
- for {
- select {
- case <-r.closed:
- // tell the pool to shutdown
- r.cancel()
- return
- default:
- atomic.AddInt64(&r.numInQueue, 1)
- bs, err := r.client.LPop(r.queueName).Bytes()
- if err != nil && err != redis.Nil {
- log.Error("RedisQueue: %s Error on LPop: %v", r.name, err)
- atomic.AddInt64(&r.numInQueue, -1)
- time.Sleep(time.Millisecond * 100)
- continue
- }
-
- if len(bs) == 0 {
- atomic.AddInt64(&r.numInQueue, -1)
- time.Sleep(time.Millisecond * 100)
- continue
- }
-
- data, err := unmarshalAs(bs, r.exemplar)
- if err != nil {
- log.Error("RedisQueue: %s Error on Unmarshal: %v", r.name, err)
- atomic.AddInt64(&r.numInQueue, -1)
- time.Sleep(time.Millisecond * 100)
- continue
- }
-
- log.Trace("RedisQueue: %s Task found: %#v", r.name, data)
- r.WorkerPool.Push(data)
- atomic.AddInt64(&r.numInQueue, -1)
+// PushFunc pushes data to the end of the fifo and calls the callback if it is added
+func (fifo *RedisByteFIFO) PushFunc(data []byte, fn func() error) error {
+ if fn != nil {
+ if err := fn(); err != nil {
+ return err
}
}
+ return fifo.client.RPush(fifo.queueName, data).Err()
}
-// Push implements Queue
-func (r *RedisQueue) Push(data Data) error {
- if !assignableTo(data, r.exemplar) {
- return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, r.exemplar, r.name)
- }
- bs, err := json.Marshal(data)
- if err != nil {
- return err
- }
- return r.client.RPush(r.queueName, bs).Err()
-}
-
-// IsEmpty checks if the queue is empty
-func (r *RedisQueue) IsEmpty() bool {
- if !r.WorkerPool.IsEmpty() {
- return false
+// Pop pops data from the start of the fifo
+func (fifo *RedisByteFIFO) Pop() ([]byte, error) {
+ data, err := fifo.client.LPop(fifo.queueName).Bytes()
+ if err != nil && err == redis.Nil {
+ return data, nil
}
- length, err := r.client.LLen(r.queueName).Result()
- if err != nil {
- log.Error("Error whilst getting queue length for %s: Error: %v", r.name, err)
- return false
- }
- return length == 0
+ return data, err
}
-// Shutdown processing from this queue
-func (r *RedisQueue) Shutdown() {
- log.Trace("RedisQueue: %s Shutting down", r.name)
- r.lock.Lock()
- select {
- case <-r.closed:
- default:
- close(r.closed)
- }
- r.lock.Unlock()
- log.Debug("RedisQueue: %s Shutdown", r.name)
+// Close this fifo
+func (fifo *RedisByteFIFO) Close() error {
+ return fifo.client.Close()
}
-// Terminate this queue and close the queue
-func (r *RedisQueue) Terminate() {
- log.Trace("RedisQueue: %s Terminating", r.name)
- r.Shutdown()
- r.lock.Lock()
- select {
- case <-r.terminated:
- r.lock.Unlock()
- default:
- close(r.terminated)
- r.lock.Unlock()
- if log.IsDebug() {
- log.Debug("RedisQueue: %s Closing with %d tasks left in queue", r.name, r.client.LLen(r.queueName))
- }
- if err := r.client.Close(); err != nil {
- log.Error("Error whilst closing internal redis client in %s: %v", r.name, err)
- }
+// Len returns the length of the fifo
+func (fifo *RedisByteFIFO) Len() int64 {
+ val, err := fifo.client.LLen(fifo.queueName).Result()
+ if err != nil {
+ log.Error("Error whilst getting length of redis queue %s: Error: %v", fifo.queueName, err)
+ return -1
}
- log.Debug("RedisQueue: %s Terminated", r.name)
-}
-
-// Name returns the name of this queue
-func (r *RedisQueue) Name() string {
- return r.name
+ return val
}
func init() {
diff --git a/modules/queue/setting.go b/modules/queue/setting.go
index 8760c09ae8..c47e85f756 100644
--- a/modules/queue/setting.go
+++ b/modules/queue/setting.go
@@ -7,6 +7,7 @@ package queue
import (
"encoding/json"
"fmt"
+ "strings"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
@@ -36,6 +37,7 @@ func getQueueSettings(name string) (setting.QueueSettings, []byte) {
opts["Password"] = q.Password
opts["DBIndex"] = q.DBIndex
opts["QueueName"] = q.QueueName
+ opts["SetName"] = q.SetName
opts["Workers"] = q.Workers
opts["MaxWorkers"] = q.MaxWorkers
opts["BlockTimeout"] = q.BlockTimeout
@@ -81,3 +83,41 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue {
}
return returnable
}
+
+// CreateUniqueQueue for name with provided handler and exemplar
+func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) UniqueQueue {
+ q, cfg := getQueueSettings(name)
+ if len(cfg) == 0 {
+ return nil
+ }
+
+ if len(q.Type) > 0 && q.Type != "dummy" && !strings.HasPrefix(q.Type, "unique-") {
+ q.Type = "unique-" + q.Type
+ }
+
+ typ, err := validType(q.Type)
+ if err != nil || typ == PersistableChannelQueueType {
+ typ = PersistableChannelUniqueQueueType
+ if err != nil {
+ log.Error("Invalid type %s provided for queue named %s defaulting to %s", q.Type, name, string(typ))
+ }
+ }
+
+ returnable, err := NewQueue(typ, handle, cfg, exemplar)
+ if q.WrapIfNecessary && err != nil {
+ log.Warn("Unable to create unique queue for %s: %v", name, err)
+ log.Warn("Attempting to create wrapped queue")
+ returnable, err = NewQueue(WrappedUniqueQueueType, handle, WrappedUniqueQueueConfiguration{
+ Underlying: typ,
+ Timeout: q.Timeout,
+ MaxAttempts: q.MaxAttempts,
+ Config: cfg,
+ QueueLength: q.Length,
+ }, exemplar)
+ }
+ if err != nil {
+ log.Error("Unable to create unique queue for %s: %v", name, err)
+ return nil
+ }
+ return returnable.(UniqueQueue)
+}
diff --git a/modules/queue/unique_queue.go b/modules/queue/unique_queue.go
new file mode 100644
index 0000000000..87e0594ecf
--- /dev/null
+++ b/modules/queue/unique_queue.go
@@ -0,0 +1,29 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "fmt"
+)
+
+// UniqueQueue defines a queue which guarantees only one instance of same
+// data is in the queue. Instances with same identity will be
+// discarded if there is already one in the line.
+//
+// This queue is particularly useful for preventing duplicated task
+// of same purpose - please note that this does not guarantee that a particular
+// task cannot be processed twice or more at the same time. Uniqueness is
+// only guaranteed whilst the task is waiting in the queue.
+//
+// Users of this queue should be careful to push only the identifier of the
+// data
+type UniqueQueue interface {
+ Queue
+ PushFunc(Data, func() error) error
+ Has(Data) (bool, error)
+}
+
+// ErrAlreadyInQueue is returned when trying to push data to the queue that is already in the queue
+var ErrAlreadyInQueue = fmt.Errorf("already in queue")
diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go
new file mode 100644
index 0000000000..dec1cfc5c0
--- /dev/null
+++ b/modules/queue/unique_queue_channel.go
@@ -0,0 +1,132 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "context"
+ "fmt"
+ "sync"
+
+ "code.gitea.io/gitea/modules/log"
+)
+
+// ChannelUniqueQueueType is the type for channel queue
+const ChannelUniqueQueueType Type = "unique-channel"
+
+// ChannelUniqueQueueConfiguration is the configuration for a ChannelUniqueQueue
+type ChannelUniqueQueueConfiguration ChannelQueueConfiguration
+
+// ChannelUniqueQueue implements UniqueQueue
+//
+// It is basically a thin wrapper around a WorkerPool but keeps a store of
+// what has been pushed within a table.
+//
+// Please note that this Queue does not guarantee that a particular
+// task cannot be processed twice or more at the same time. Uniqueness is
+// only guaranteed whilst the task is waiting in the queue.
+type ChannelUniqueQueue struct {
+ *WorkerPool
+ lock sync.Mutex
+ table map[Data]bool
+ exemplar interface{}
+ workers int
+ name string
+}
+
+// NewChannelUniqueQueue create a memory channel queue
+func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
+ configInterface, err := toConfig(ChannelUniqueQueueConfiguration{}, cfg)
+ if err != nil {
+ return nil, err
+ }
+ config := configInterface.(ChannelUniqueQueueConfiguration)
+ if config.BatchLength == 0 {
+ config.BatchLength = 1
+ }
+ queue := &ChannelUniqueQueue{
+ table: map[Data]bool{},
+ exemplar: exemplar,
+ workers: config.Workers,
+ name: config.Name,
+ }
+ queue.WorkerPool = NewWorkerPool(func(data ...Data) {
+ for _, datum := range data {
+ queue.lock.Lock()
+ delete(queue.table, datum)
+ queue.lock.Unlock()
+ handle(datum)
+ }
+ }, config.WorkerPoolConfiguration)
+
+ queue.qid = GetManager().Add(queue, ChannelUniqueQueueType, config, exemplar)
+ return queue, nil
+}
+
+// Run starts to run the queue
+func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+ atShutdown(context.Background(), func() {
+ log.Warn("ChannelUniqueQueue: %s is not shutdownable!", q.name)
+ })
+ atTerminate(context.Background(), func() {
+ log.Warn("ChannelUniqueQueue: %s is not terminatable!", q.name)
+ })
+ log.Debug("ChannelUniqueQueue: %s Starting", q.name)
+ go func() {
+ _ = q.AddWorkers(q.workers, 0)
+ }()
+}
+
+// Push will push data into the queue if the data is not already in the queue
+func (q *ChannelUniqueQueue) Push(data Data) error {
+ return q.PushFunc(data, nil)
+}
+
+// PushFunc will push data into the queue
+func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
+ if !assignableTo(data, q.exemplar) {
+ return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
+ }
+ q.lock.Lock()
+ locked := true
+ defer func() {
+ if locked {
+ q.lock.Unlock()
+ }
+ }()
+ if _, ok := q.table[data]; ok {
+ return ErrAlreadyInQueue
+ }
+ // FIXME: We probably need to implement some sort of limit here
+ // If the downstream queue blocks this table will grow without limit
+ q.table[data] = true
+ if fn != nil {
+ err := fn()
+ if err != nil {
+ delete(q.table, data)
+ return err
+ }
+ }
+ locked = false
+ q.lock.Unlock()
+ q.WorkerPool.Push(data)
+ return nil
+}
+
+// Has checks if the data is in the queue
+func (q *ChannelUniqueQueue) Has(data Data) (bool, error) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ _, has := q.table[data]
+ return has, nil
+}
+
+// Name returns the name of this queue
+func (q *ChannelUniqueQueue) Name() string {
+ return q.name
+}
+
+func init() {
+ queuesMap[ChannelUniqueQueueType] = NewChannelUniqueQueue
+}
diff --git a/modules/queue/unique_queue_disk.go b/modules/queue/unique_queue_disk.go
new file mode 100644
index 0000000000..bfe7aeed83
--- /dev/null
+++ b/modules/queue/unique_queue_disk.go
@@ -0,0 +1,104 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "gitea.com/lunny/levelqueue"
+)
+
+// LevelUniqueQueueType is the type for level queue
+const LevelUniqueQueueType Type = "unique-level"
+
+// LevelUniqueQueueConfiguration is the configuration for a LevelUniqueQueue
+type LevelUniqueQueueConfiguration struct {
+ ByteFIFOQueueConfiguration
+ DataDir string
+}
+
+// LevelUniqueQueue implements a disk library queue
+type LevelUniqueQueue struct {
+ *ByteFIFOUniqueQueue
+}
+
+// NewLevelUniqueQueue creates a ledis local queue
+//
+// Please note that this Queue does not guarantee that a particular
+// task cannot be processed twice or more at the same time. Uniqueness is
+// only guaranteed whilst the task is waiting in the queue.
+func NewLevelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
+ configInterface, err := toConfig(LevelUniqueQueueConfiguration{}, cfg)
+ if err != nil {
+ return nil, err
+ }
+ config := configInterface.(LevelUniqueQueueConfiguration)
+
+ byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.DataDir)
+ if err != nil {
+ return nil, err
+ }
+
+ byteFIFOQueue, err := NewByteFIFOUniqueQueue(LevelUniqueQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
+ if err != nil {
+ return nil, err
+ }
+
+ queue := &LevelUniqueQueue{
+ ByteFIFOUniqueQueue: byteFIFOQueue,
+ }
+ queue.qid = GetManager().Add(queue, LevelUniqueQueueType, config, exemplar)
+ return queue, nil
+}
+
+var _ (UniqueByteFIFO) = &LevelUniqueQueueByteFIFO{}
+
+// LevelUniqueQueueByteFIFO represents a ByteFIFO formed from a LevelUniqueQueue
+type LevelUniqueQueueByteFIFO struct {
+ internal *levelqueue.UniqueQueue
+}
+
+// NewLevelUniqueQueueByteFIFO creates a new ByteFIFO formed from a LevelUniqueQueue
+func NewLevelUniqueQueueByteFIFO(dataDir string) (*LevelUniqueQueueByteFIFO, error) {
+ internal, err := levelqueue.OpenUnique(dataDir)
+ if err != nil {
+ return nil, err
+ }
+
+ return &LevelUniqueQueueByteFIFO{
+ internal: internal,
+ }, nil
+}
+
+// PushFunc pushes data to the end of the fifo and calls the callback if it is added
+func (fifo *LevelUniqueQueueByteFIFO) PushFunc(data []byte, fn func() error) error {
+ return fifo.internal.LPushFunc(data, fn)
+}
+
+// Pop pops data from the start of the fifo
+func (fifo *LevelUniqueQueueByteFIFO) Pop() ([]byte, error) {
+ data, err := fifo.internal.RPop()
+ if err != nil && err != levelqueue.ErrNotFound {
+ return nil, err
+ }
+ return data, nil
+}
+
+// Len returns the length of the fifo
+func (fifo *LevelUniqueQueueByteFIFO) Len() int64 {
+ return fifo.internal.Len()
+}
+
+// Has returns whether the fifo contains this data
+func (fifo *LevelUniqueQueueByteFIFO) Has(data []byte) (bool, error) {
+ return fifo.internal.Has(data)
+}
+
+// Close this fifo
+func (fifo *LevelUniqueQueueByteFIFO) Close() error {
+ return fifo.internal.Close()
+}
+
+func init() {
+ queuesMap[LevelUniqueQueueType] = NewLevelUniqueQueue
+}
diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go
new file mode 100644
index 0000000000..71049f3259
--- /dev/null
+++ b/modules/queue/unique_queue_disk_channel.go
@@ -0,0 +1,241 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "code.gitea.io/gitea/modules/log"
+)
+
+// PersistableChannelUniqueQueueType is the type for persistable queue
+const PersistableChannelUniqueQueueType Type = "unique-persistable-channel"
+
+// PersistableChannelUniqueQueueConfiguration is the configuration for a PersistableChannelUniqueQueue
+type PersistableChannelUniqueQueueConfiguration struct {
+ Name string
+ DataDir string
+ BatchLength int
+ QueueLength int
+ Timeout time.Duration
+ MaxAttempts int
+ Workers int
+ MaxWorkers int
+ BlockTimeout time.Duration
+ BoostTimeout time.Duration
+ BoostWorkers int
+}
+
+// PersistableChannelUniqueQueue wraps a channel queue and level queue together
+//
+// Please note that this Queue does not guarantee that a particular
+// task cannot be processed twice or more at the same time. Uniqueness is
+// only guaranteed whilst the task is waiting in the queue.
+type PersistableChannelUniqueQueue struct {
+ *ChannelUniqueQueue
+ delayedStarter
+ lock sync.Mutex
+ closed chan struct{}
+}
+
+// NewPersistableChannelUniqueQueue creates a wrapped batched channel queue with persistable level queue backend when shutting down
+// This differs from a wrapped queue in that the persistent queue is only used to persist at shutdown/terminate
+func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
+ configInterface, err := toConfig(PersistableChannelUniqueQueueConfiguration{}, cfg)
+ if err != nil {
+ return nil, err
+ }
+ config := configInterface.(PersistableChannelUniqueQueueConfiguration)
+
+ channelUniqueQueue, err := NewChannelUniqueQueue(handle, ChannelUniqueQueueConfiguration{
+ WorkerPoolConfiguration: WorkerPoolConfiguration{
+ QueueLength: config.QueueLength,
+ BatchLength: config.BatchLength,
+ BlockTimeout: config.BlockTimeout,
+ BoostTimeout: config.BoostTimeout,
+ BoostWorkers: config.BoostWorkers,
+ MaxWorkers: config.MaxWorkers,
+ },
+ Workers: config.Workers,
+ Name: config.Name + "-channel",
+ }, exemplar)
+ if err != nil {
+ return nil, err
+ }
+
+ // the level backend only needs temporary workers to catch up with the previously dropped work
+ levelCfg := LevelUniqueQueueConfiguration{
+ ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{
+ WorkerPoolConfiguration: WorkerPoolConfiguration{
+ QueueLength: config.QueueLength,
+ BatchLength: config.BatchLength,
+ BlockTimeout: 0,
+ BoostTimeout: 0,
+ BoostWorkers: 0,
+ MaxWorkers: 1,
+ },
+ Workers: 1,
+ Name: config.Name + "-level",
+ },
+ DataDir: config.DataDir,
+ }
+
+ queue := &PersistableChannelUniqueQueue{
+ ChannelUniqueQueue: channelUniqueQueue.(*ChannelUniqueQueue),
+ closed: make(chan struct{}),
+ }
+
+ levelQueue, err := NewLevelUniqueQueue(func(data ...Data) {
+ for _, datum := range data {
+ err := queue.Push(datum)
+ if err != nil && err != ErrAlreadyInQueue {
+ log.Error("Unable push to channelled queue: %v", err)
+ }
+ }
+ }, levelCfg, exemplar)
+ if err == nil {
+ queue.delayedStarter = delayedStarter{
+ internal: levelQueue.(*LevelUniqueQueue),
+ name: config.Name,
+ }
+
+ _ = GetManager().Add(queue, PersistableChannelUniqueQueueType, config, exemplar)
+ return queue, nil
+ }
+ if IsErrInvalidConfiguration(err) {
+ // Retrying ain't gonna make this any better...
+ return nil, ErrInvalidConfiguration{cfg: cfg}
+ }
+
+ queue.delayedStarter = delayedStarter{
+ cfg: levelCfg,
+ underlying: LevelUniqueQueueType,
+ timeout: config.Timeout,
+ maxAttempts: config.MaxAttempts,
+ name: config.Name,
+ }
+ _ = GetManager().Add(queue, PersistableChannelUniqueQueueType, config, exemplar)
+ return queue, nil
+}
+
+// Name returns the name of this queue
+func (q *PersistableChannelUniqueQueue) Name() string {
+ return q.delayedStarter.name
+}
+
+// Push will push the indexer data to queue
+func (q *PersistableChannelUniqueQueue) Push(data Data) error {
+ return q.PushFunc(data, nil)
+}
+
+// PushFunc will push the indexer data to queue
+func (q *PersistableChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
+ select {
+ case <-q.closed:
+ return q.internal.(UniqueQueue).PushFunc(data, fn)
+ default:
+ return q.ChannelUniqueQueue.PushFunc(data, fn)
+ }
+}
+
+// Has will test if the queue has the data
+func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) {
+ // This is more difficult...
+ has, err := q.ChannelUniqueQueue.Has(data)
+ if err != nil || has {
+ return has, err
+ }
+ return q.internal.(UniqueQueue).Has(data)
+}
+
+// Run starts to run the queue
+func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+ log.Debug("PersistableChannelUniqueQueue: %s Starting", q.delayedStarter.name)
+
+ q.lock.Lock()
+ if q.internal == nil {
+ err := q.setInternal(atShutdown, func(data ...Data) {
+ for _, datum := range data {
+ err := q.Push(datum)
+ if err != nil && err != ErrAlreadyInQueue {
+ log.Error("Unable push to channelled queue: %v", err)
+ }
+ }
+ }, q.exemplar)
+ q.lock.Unlock()
+ if err != nil {
+ log.Fatal("Unable to create internal queue for %s Error: %v", q.Name(), err)
+ return
+ }
+ } else {
+ q.lock.Unlock()
+ }
+ atShutdown(context.Background(), q.Shutdown)
+ atTerminate(context.Background(), q.Terminate)
+
+ // Just run the level queue - we shut it down later
+ go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
+
+ go func() {
+ _ = q.ChannelUniqueQueue.AddWorkers(q.workers, 0)
+ }()
+
+ log.Trace("PersistableChannelUniqueQueue: %s Waiting til closed", q.delayedStarter.name)
+ <-q.closed
+ log.Trace("PersistableChannelUniqueQueue: %s Cancelling pools", q.delayedStarter.name)
+ q.internal.(*LevelUniqueQueue).cancel()
+ q.ChannelUniqueQueue.cancel()
+ log.Trace("PersistableChannelUniqueQueue: %s Waiting til done", q.delayedStarter.name)
+ q.ChannelUniqueQueue.Wait()
+ q.internal.(*LevelUniqueQueue).Wait()
+ // Redirect all remaining data in the chan to the internal channel
+ go func() {
+ log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
+ for data := range q.ChannelUniqueQueue.dataChan {
+ _ = q.internal.Push(data)
+ }
+ log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
+ }()
+ log.Trace("PersistableChannelUniqueQueue: %s Done main loop", q.delayedStarter.name)
+}
+
+// Flush flushes the queue
+func (q *PersistableChannelUniqueQueue) Flush(timeout time.Duration) error {
+ return q.ChannelUniqueQueue.Flush(timeout)
+}
+
+// Shutdown processing this queue
+func (q *PersistableChannelUniqueQueue) Shutdown() {
+ log.Trace("PersistableChannelUniqueQueue: %s Shutting down", q.delayedStarter.name)
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ select {
+ case <-q.closed:
+ default:
+ if q.internal != nil {
+ q.internal.(*LevelUniqueQueue).Shutdown()
+ }
+ close(q.closed)
+ }
+ log.Debug("PersistableChannelUniqueQueue: %s Shutdown", q.delayedStarter.name)
+}
+
+// Terminate this queue and close the queue
+func (q *PersistableChannelUniqueQueue) Terminate() {
+ log.Trace("PersistableChannelUniqueQueue: %s Terminating", q.delayedStarter.name)
+ q.Shutdown()
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ if q.internal != nil {
+ q.internal.(*LevelUniqueQueue).Terminate()
+ }
+ log.Debug("PersistableChannelUniqueQueue: %s Terminated", q.delayedStarter.name)
+}
+
+func init() {
+ queuesMap[PersistableChannelUniqueQueueType] = NewPersistableChannelUniqueQueue
+}
diff --git a/modules/queue/unique_queue_redis.go b/modules/queue/unique_queue_redis.go
new file mode 100644
index 0000000000..e5b2c48dbb
--- /dev/null
+++ b/modules/queue/unique_queue_redis.go
@@ -0,0 +1,124 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+// RedisUniqueQueueType is the type for redis queue
+const RedisUniqueQueueType Type = "unique-redis"
+
+// RedisUniqueQueue redis queue
+type RedisUniqueQueue struct {
+ *ByteFIFOUniqueQueue
+}
+
+// RedisUniqueQueueConfiguration is the configuration for the redis queue
+type RedisUniqueQueueConfiguration struct {
+ ByteFIFOQueueConfiguration
+ RedisUniqueByteFIFOConfiguration
+}
+
+// NewRedisUniqueQueue creates single redis or cluster redis queue.
+//
+// Please note that this Queue does not guarantee that a particular
+// task cannot be processed twice or more at the same time. Uniqueness is
+// only guaranteed whilst the task is waiting in the queue.
+func NewRedisUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
+ configInterface, err := toConfig(RedisUniqueQueueConfiguration{}, cfg)
+ if err != nil {
+ return nil, err
+ }
+ config := configInterface.(RedisUniqueQueueConfiguration)
+
+ byteFIFO, err := NewRedisUniqueByteFIFO(config.RedisUniqueByteFIFOConfiguration)
+ if err != nil {
+ return nil, err
+ }
+
+ if len(byteFIFO.setName) == 0 {
+ byteFIFO.setName = byteFIFO.queueName + "_unique"
+ }
+
+ byteFIFOQueue, err := NewByteFIFOUniqueQueue(RedisUniqueQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
+ if err != nil {
+ return nil, err
+ }
+
+ queue := &RedisUniqueQueue{
+ ByteFIFOUniqueQueue: byteFIFOQueue,
+ }
+
+ queue.qid = GetManager().Add(queue, RedisUniqueQueueType, config, exemplar)
+
+ return queue, nil
+}
+
+var _ (UniqueByteFIFO) = &RedisUniqueByteFIFO{}
+
+// RedisUniqueByteFIFO represents a UniqueByteFIFO formed from a redisClient
+type RedisUniqueByteFIFO struct {
+ RedisByteFIFO
+ setName string
+}
+
+// RedisUniqueByteFIFOConfiguration is the configuration for the RedisUniqueByteFIFO
+type RedisUniqueByteFIFOConfiguration struct {
+ RedisByteFIFOConfiguration
+ SetName string
+}
+
+// NewRedisUniqueByteFIFO creates a UniqueByteFIFO formed from a redisClient
+func NewRedisUniqueByteFIFO(config RedisUniqueByteFIFOConfiguration) (*RedisUniqueByteFIFO, error) {
+ internal, err := NewRedisByteFIFO(config.RedisByteFIFOConfiguration)
+ if err != nil {
+ return nil, err
+ }
+
+ fifo := &RedisUniqueByteFIFO{
+ RedisByteFIFO: *internal,
+ setName: config.SetName,
+ }
+
+ return fifo, nil
+}
+
+// PushFunc pushes data to the end of the fifo and calls the callback if it is added
+func (fifo *RedisUniqueByteFIFO) PushFunc(data []byte, fn func() error) error {
+ added, err := fifo.client.SAdd(fifo.setName, data).Result()
+ if err != nil {
+ return err
+ }
+ if added == 0 {
+ return ErrAlreadyInQueue
+ }
+ if fn != nil {
+ if err := fn(); err != nil {
+ return err
+ }
+ }
+ return fifo.client.RPush(fifo.queueName, data).Err()
+}
+
+// Pop pops data from the start of the fifo
+func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error) {
+ data, err := fifo.client.LPop(fifo.queueName).Bytes()
+ if err != nil {
+ return data, err
+ }
+
+ if len(data) == 0 {
+ return data, nil
+ }
+
+ err = fifo.client.SRem(fifo.setName, data).Err()
+ return data, err
+}
+
+// Has returns whether the fifo contains this data
+func (fifo *RedisUniqueByteFIFO) Has(data []byte) (bool, error) {
+ return fifo.client.SIsMember(fifo.setName, data).Result()
+}
+
+func init() {
+ queuesMap[RedisUniqueQueueType] = NewRedisUniqueQueue
+}
diff --git a/modules/queue/unique_queue_wrapped.go b/modules/queue/unique_queue_wrapped.go
new file mode 100644
index 0000000000..8c815218dd
--- /dev/null
+++ b/modules/queue/unique_queue_wrapped.go
@@ -0,0 +1,172 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "fmt"
+ "sync"
+ "time"
+)
+
+// WrappedUniqueQueueType is the type for a wrapped delayed starting queue
+const WrappedUniqueQueueType Type = "unique-wrapped"
+
+// WrappedUniqueQueueConfiguration is the configuration for a WrappedUniqueQueue
+type WrappedUniqueQueueConfiguration struct {
+ Underlying Type
+ Timeout time.Duration
+ MaxAttempts int
+ Config interface{}
+ QueueLength int
+ Name string
+}
+
+// WrappedUniqueQueue wraps a delayed starting unique queue
+type WrappedUniqueQueue struct {
+ *WrappedQueue
+ table map[Data]bool
+ tlock sync.Mutex
+ ready bool
+}
+
+// NewWrappedUniqueQueue will attempt to create a unique queue of the provided type,
+// but if there is a problem creating this queue it will instead create
+// a WrappedUniqueQueue with delayed startup of the queue instead and a
+// channel which will be redirected to the queue
+//
+// Please note that this Queue does not guarantee that a particular
+// task cannot be processed twice or more at the same time. Uniqueness is
+// only guaranteed whilst the task is waiting in the queue.
+func NewWrappedUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
+ configInterface, err := toConfig(WrappedUniqueQueueConfiguration{}, cfg)
+ if err != nil {
+ return nil, err
+ }
+ config := configInterface.(WrappedUniqueQueueConfiguration)
+
+ queue, err := NewQueue(config.Underlying, handle, config.Config, exemplar)
+ if err == nil {
+ // Just return the queue there is no need to wrap
+ return queue, nil
+ }
+ if IsErrInvalidConfiguration(err) {
+ // Retrying ain't gonna make this any better...
+ return nil, ErrInvalidConfiguration{cfg: cfg}
+ }
+
+ wrapped := &WrappedUniqueQueue{
+ WrappedQueue: &WrappedQueue{
+ channel: make(chan Data, config.QueueLength),
+ exemplar: exemplar,
+ delayedStarter: delayedStarter{
+ cfg: config.Config,
+ underlying: config.Underlying,
+ timeout: config.Timeout,
+ maxAttempts: config.MaxAttempts,
+ name: config.Name,
+ },
+ },
+ table: map[Data]bool{},
+ }
+
+ // wrapped.handle is passed to the delayedStarting internal queue and is run to handle
+ // data passed to
+ wrapped.handle = func(data ...Data) {
+ for _, datum := range data {
+ wrapped.tlock.Lock()
+ if !wrapped.ready {
+ delete(wrapped.table, data)
+ // If our table is empty all of the requests we have buffered between the
+ // wrapper queue starting and the internal queue starting have been handled.
+ // We can stop buffering requests in our local table and just pass Push
+ // direct to the internal queue
+ if len(wrapped.table) == 0 {
+ wrapped.ready = true
+ }
+ }
+ wrapped.tlock.Unlock()
+ handle(datum)
+ }
+ }
+ _ = GetManager().Add(queue, WrappedUniqueQueueType, config, exemplar)
+ return wrapped, nil
+}
+
+// Push will push the data to the internal channel checking it against the exemplar
+func (q *WrappedUniqueQueue) Push(data Data) error {
+ return q.PushFunc(data, nil)
+}
+
+// PushFunc will push the data to the internal channel checking it against the exemplar
+func (q *WrappedUniqueQueue) PushFunc(data Data, fn func() error) error {
+ if !assignableTo(data, q.exemplar) {
+ return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
+ }
+
+ q.tlock.Lock()
+ if q.ready {
+ // ready means our table is empty and all of the requests we have buffered between the
+ // wrapper queue starting and the internal queue starting have been handled.
+ // We can stop buffering requests in our local table and just pass Push
+ // direct to the internal queue
+ q.tlock.Unlock()
+ return q.internal.(UniqueQueue).PushFunc(data, fn)
+ }
+
+ locked := true
+ defer func() {
+ if locked {
+ q.tlock.Unlock()
+ }
+ }()
+ if _, ok := q.table[data]; ok {
+ return ErrAlreadyInQueue
+ }
+ // FIXME: We probably need to implement some sort of limit here
+ // If the downstream queue blocks this table will grow without limit
+ q.table[data] = true
+ if fn != nil {
+ err := fn()
+ if err != nil {
+ delete(q.table, data)
+ return err
+ }
+ }
+ locked = false
+ q.tlock.Unlock()
+
+ q.channel <- data
+ return nil
+}
+
+// Has checks if the data is in the queue
+func (q *WrappedUniqueQueue) Has(data Data) (bool, error) {
+ q.tlock.Lock()
+ defer q.tlock.Unlock()
+ if q.ready {
+ return q.internal.(UniqueQueue).Has(data)
+ }
+ _, has := q.table[data]
+ return has, nil
+}
+
+// IsEmpty checks whether the queue is empty
+func (q *WrappedUniqueQueue) IsEmpty() bool {
+ q.tlock.Lock()
+ if len(q.table) > 0 {
+ q.tlock.Unlock()
+ return false
+ }
+ if q.ready {
+ q.tlock.Unlock()
+ return q.internal.IsEmpty()
+ }
+ q.tlock.Unlock()
+ return false
+}
+
+func init() {
+ queuesMap[WrappedUniqueQueueType] = NewWrappedUniqueQueue
+}
diff --git a/modules/setting/queue.go b/modules/setting/queue.go
index 934c5a8108..8bdca1017f 100644
--- a/modules/setting/queue.go
+++ b/modules/setting/queue.go
@@ -26,6 +26,7 @@ type QueueSettings struct {
Addresses string
Password string
QueueName string
+ SetName string
DBIndex int
WrapIfNecessary bool
MaxAttempts int
@@ -54,8 +55,13 @@ func GetQueueSettings(name string) QueueSettings {
q.DataDir = key.MustString(q.DataDir)
case "QUEUE_NAME":
q.QueueName = key.MustString(q.QueueName)
+ case "SET_NAME":
+ q.SetName = key.MustString(q.SetName)
}
}
+ if len(q.SetName) == 0 && len(Queue.SetName) > 0 {
+ q.SetName = q.QueueName + Queue.SetName
+ }
if !filepath.IsAbs(q.DataDir) {
q.DataDir = filepath.Join(AppDataPath, q.DataDir)
}
@@ -100,6 +106,7 @@ func NewQueueService() {
Queue.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(5 * time.Minute)
Queue.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(5)
Queue.QueueName = sec.Key("QUEUE_NAME").MustString("_queue")
+ Queue.SetName = sec.Key("SET_NAME").MustString("")
// Now handle the old issue_indexer configuration
section := Cfg.Section("queue.issue_indexer")
@@ -142,6 +149,17 @@ func NewQueueService() {
if _, ok := sectionMap["LENGTH"]; !ok {
_, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100)))
}
+
+ // Handle the old test pull requests configuration
+ // Please note this will be a unique queue
+ section = Cfg.Section("queue.pr_patch_checker")
+ sectionMap = map[string]bool{}
+ for _, key := range section.Keys() {
+ sectionMap[key.Name()] = true
+ }
+ if _, ok := sectionMap["LENGTH"]; !ok {
+ _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.PullRequestQueueLength))
+ }
}
// ParseQueueConnStr parses a queue connection string
diff --git a/routers/init.go b/routers/init.go
index 1d7cf78438..f86a7ad4b2 100644
--- a/routers/init.go
+++ b/routers/init.go
@@ -113,7 +113,9 @@ func GlobalInit(ctx context.Context) {
code_indexer.Init()
mirror_service.InitSyncMirrors()
webhook.InitDeliverHooks()
- pull_service.Init()
+ if err := pull_service.Init(); err != nil {
+ log.Fatal("Failed to initialize test pull requests queue: %v", err)
+ }
if err := task.Init(); err != nil {
log.Fatal("Failed to initialize task scheduler: %v", err)
}
diff --git a/services/pull/check.go b/services/pull/check.go
index 5d380b4609..d64f49de3b 100644
--- a/services/pull/check.go
+++ b/services/pull/check.go
@@ -10,6 +10,7 @@ import (
"fmt"
"io/ioutil"
"os"
+ "strconv"
"strings"
"code.gitea.io/gitea/models"
@@ -17,24 +18,32 @@ import (
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/notification"
- "code.gitea.io/gitea/modules/setting"
- "code.gitea.io/gitea/modules/sync"
+ "code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/timeutil"
"github.com/unknwon/com"
)
-// pullRequestQueue represents a queue to handle update pull request tests
-var pullRequestQueue = sync.NewUniqueQueue(setting.Repository.PullRequestQueueLength)
+// prQueue represents a queue to handle update pull request tests
+var prQueue queue.UniqueQueue
// AddToTaskQueue adds itself to pull request test task queue.
func AddToTaskQueue(pr *models.PullRequest) {
- go pullRequestQueue.AddFunc(pr.ID, func() {
- pr.Status = models.PullRequestStatusChecking
- if err := pr.UpdateCols("status"); err != nil {
- log.Error("AddToTaskQueue.UpdateCols[%d].(add to queue): %v", pr.ID, err)
+ go func() {
+ err := prQueue.PushFunc(strconv.FormatInt(pr.ID, 10), func() error {
+ pr.Status = models.PullRequestStatusChecking
+ err := pr.UpdateCols("status")
+ if err != nil {
+ log.Error("AddToTaskQueue.UpdateCols[%d].(add to queue): %v", pr.ID, err)
+ } else {
+ log.Trace("Adding PR ID: %d to the test pull requests queue", pr.ID)
+ }
+ return err
+ })
+ if err != nil && err != queue.ErrAlreadyInQueue {
+ log.Error("Error adding prID %d to the test pull requests queue: %v", pr.ID, err)
}
- })
+ }()
}
// checkAndUpdateStatus checks if pull request is possible to leaving checking status,
@@ -46,7 +55,12 @@ func checkAndUpdateStatus(pr *models.PullRequest) {
}
// Make sure there is no waiting test to process before leaving the checking status.
- if !pullRequestQueue.Exist(pr.ID) {
+ has, err := prQueue.Has(strconv.FormatInt(pr.ID, 10))
+ if err != nil {
+ log.Error("Unable to check if the queue is waiting to reprocess pr.ID %d. Error: %v", pr.ID, err)
+ }
+
+ if !has {
if err := pr.UpdateCols("status, conflicted_files"); err != nil {
log.Error("Update[%d]: %v", pr.ID, err)
}
@@ -73,7 +87,8 @@ func getMergeCommit(pr *models.PullRequest) (*git.Commit, error) {
headFile := pr.GetGitRefName()
// Check if a pull request is merged into BaseBranch
- _, err = git.NewCommand("merge-base", "--is-ancestor", headFile, pr.BaseBranch).RunInDirWithEnv(pr.BaseRepo.RepoPath(), []string{"GIT_INDEX_FILE=" + indexTmpPath, "GIT_DIR=" + pr.BaseRepo.RepoPath()})
+ _, err = git.NewCommand("merge-base", "--is-ancestor", headFile, pr.BaseBranch).
+ RunInDirWithEnv(pr.BaseRepo.RepoPath(), []string{"GIT_INDEX_FILE=" + indexTmpPath, "GIT_DIR=" + pr.BaseRepo.RepoPath()})
if err != nil {
// Errors are signaled by a non-zero status that is not 1
if strings.Contains(err.Error(), "exit status 1") {
@@ -93,7 +108,8 @@ func getMergeCommit(pr *models.PullRequest) (*git.Commit, error) {
cmd := commitID[:40] + ".." + pr.BaseBranch
// Get the commit from BaseBranch where the pull request got merged
- mergeCommit, err := git.NewCommand("rev-list", "--ancestry-path", "--merges", "--reverse", cmd).RunInDirWithEnv("", []string{"GIT_INDEX_FILE=" + indexTmpPath, "GIT_DIR=" + pr.BaseRepo.RepoPath()})
+ mergeCommit, err := git.NewCommand("rev-list", "--ancestry-path", "--merges", "--reverse", cmd).
+ RunInDirWithEnv("", []string{"GIT_INDEX_FILE=" + indexTmpPath, "GIT_DIR=" + pr.BaseRepo.RepoPath()})
if err != nil {
return nil, fmt.Errorf("git rev-list --ancestry-path --merges --reverse: %v", err)
} else if len(mergeCommit) < 40 {
@@ -155,61 +171,65 @@ func manuallyMerged(pr *models.PullRequest) bool {
return false
}
-// TestPullRequests checks and tests untested patches of pull requests.
-// TODO: test more pull requests at same time.
-func TestPullRequests(ctx context.Context) {
-
- go func() {
- prs, err := models.GetPullRequestIDsByCheckStatus(models.PullRequestStatusChecking)
- if err != nil {
- log.Error("Find Checking PRs: %v", err)
+// InitializePullRequests checks and tests untested patches of pull requests.
+func InitializePullRequests(ctx context.Context) {
+ prs, err := models.GetPullRequestIDsByCheckStatus(models.PullRequestStatusChecking)
+ if err != nil {
+ log.Error("Find Checking PRs: %v", err)
+ return
+ }
+ for _, prID := range prs {
+ select {
+ case <-ctx.Done():
return
- }
- for _, prID := range prs {
- select {
- case <-ctx.Done():
- return
- default:
- pullRequestQueue.Add(prID)
+ default:
+ if err := prQueue.PushFunc(strconv.FormatInt(prID, 10), func() error {
+ log.Trace("Adding PR ID: %d to the pull requests patch checking queue", prID)
+ return nil
+ }); err != nil {
+ log.Error("Error adding prID: %s to the pull requests patch checking queue %v", prID, err)
}
}
- }()
+ }
+}
- // Start listening on new test requests.
- for {
- select {
- case prID := <-pullRequestQueue.Queue():
- log.Trace("TestPullRequests[%v]: processing test task", prID)
- pullRequestQueue.Remove(prID)
+// handle passed PR IDs and test the PRs
+func handle(data ...queue.Data) {
+ for _, datum := range data {
+ prID := datum.(string)
+ id := com.StrTo(prID).MustInt64()
- id := com.StrTo(prID).MustInt64()
+ log.Trace("Testing PR ID %d from the pull requests patch checking queue", id)
- pr, err := models.GetPullRequestByID(id)
- if err != nil {
- log.Error("GetPullRequestByID[%s]: %v", prID, err)
- continue
- } else if pr.Status != models.PullRequestStatusChecking {
- continue
- } else if manuallyMerged(pr) {
- continue
- } else if err = TestPatch(pr); err != nil {
- log.Error("testPatch[%d]: %v", pr.ID, err)
- pr.Status = models.PullRequestStatusError
- if err := pr.UpdateCols("status"); err != nil {
- log.Error("update pr [%d] status to PullRequestStatusError failed: %v", pr.ID, err)
- }
- continue
+ pr, err := models.GetPullRequestByID(id)
+ if err != nil {
+ log.Error("GetPullRequestByID[%s]: %v", prID, err)
+ continue
+ } else if pr.Status != models.PullRequestStatusChecking {
+ continue
+ } else if manuallyMerged(pr) {
+ continue
+ } else if err = TestPatch(pr); err != nil {
+ log.Error("testPatch[%d]: %v", pr.ID, err)
+ pr.Status = models.PullRequestStatusError
+ if err := pr.UpdateCols("status"); err != nil {
+ log.Error("update pr [%d] status to PullRequestStatusError failed: %v", pr.ID, err)
}
- checkAndUpdateStatus(pr)
- case <-ctx.Done():
- pullRequestQueue.Close()
- log.Info("PID: %d Pull Request testing shutdown", os.Getpid())
- return
+ continue
}
+ checkAndUpdateStatus(pr)
}
}
// Init runs the task queue to test all the checking status pull requests
-func Init() {
- go graceful.GetManager().RunWithShutdownContext(TestPullRequests)
+func Init() error {
+ prQueue = queue.CreateUniqueQueue("pr_patch_checker", handle, "").(queue.UniqueQueue)
+
+ if prQueue == nil {
+ return fmt.Errorf("Unable to create pr_patch_checker Queue")
+ }
+
+ go graceful.GetManager().RunWithShutdownFns(prQueue.Run)
+ go graceful.GetManager().RunWithShutdownContext(InitializePullRequests)
+ return nil
}
diff --git a/services/pull/check_test.go b/services/pull/check_test.go
index 48a7774a61..4591edd7aa 100644
--- a/services/pull/check_test.go
+++ b/services/pull/check_test.go
@@ -6,29 +6,82 @@
package pull
import (
+ "context"
"strconv"
"testing"
"time"
"code.gitea.io/gitea/models"
+ "code.gitea.io/gitea/modules/queue"
"github.com/stretchr/testify/assert"
+ "github.com/unknwon/com"
)
func TestPullRequest_AddToTaskQueue(t *testing.T) {
assert.NoError(t, models.PrepareTestDatabase())
+ idChan := make(chan int64, 10)
+
+ q, err := queue.NewChannelUniqueQueue(func(data ...queue.Data) {
+ for _, datum := range data {
+ prID := datum.(string)
+ id := com.StrTo(prID).MustInt64()
+ idChan <- id
+ }
+ }, queue.ChannelUniqueQueueConfiguration{
+ WorkerPoolConfiguration: queue.WorkerPoolConfiguration{
+ QueueLength: 10,
+ BatchLength: 1,
+ },
+ Workers: 1,
+ Name: "temporary-queue",
+ }, "")
+ assert.NoError(t, err)
+
+ queueShutdown := []func(){}
+ queueTerminate := []func(){}
+
+ prQueue = q.(queue.UniqueQueue)
+
pr := models.AssertExistsAndLoadBean(t, &models.PullRequest{ID: 1}).(*models.PullRequest)
AddToTaskQueue(pr)
+ assert.Eventually(t, func() bool {
+ pr = models.AssertExistsAndLoadBean(t, &models.PullRequest{ID: 1}).(*models.PullRequest)
+ return pr.Status == models.PullRequestStatusChecking
+ }, 1*time.Second, 100*time.Millisecond)
+
+ has, err := prQueue.Has(strconv.FormatInt(pr.ID, 10))
+ assert.True(t, has)
+ assert.NoError(t, err)
+
+ prQueue.Run(func(_ context.Context, shutdown func()) {
+ queueShutdown = append(queueShutdown, shutdown)
+ }, func(_ context.Context, terminate func()) {
+ queueTerminate = append(queueTerminate, terminate)
+ })
+
select {
- case id := <-pullRequestQueue.Queue():
- assert.EqualValues(t, strconv.FormatInt(pr.ID, 10), id)
+ case id := <-idChan:
+ assert.EqualValues(t, pr.ID, id)
case <-time.After(time.Second):
assert.Fail(t, "Timeout: nothing was added to pullRequestQueue")
}
- assert.True(t, pullRequestQueue.Exist(pr.ID))
+ has, err = prQueue.Has(strconv.FormatInt(pr.ID, 10))
+ assert.False(t, has)
+ assert.NoError(t, err)
+
pr = models.AssertExistsAndLoadBean(t, &models.PullRequest{ID: 1}).(*models.PullRequest)
assert.Equal(t, models.PullRequestStatusChecking, pr.Status)
+
+ for _, callback := range queueShutdown {
+ callback()
+ }
+ for _, callback := range queueTerminate {
+ callback()
+ }
+
+ prQueue = nil
}
diff --git a/vendor/gitea.com/lunny/levelqueue/.gitignore b/vendor/gitea.com/lunny/levelqueue/.gitignore
index 59a8bdee30..ab1fe76029 100644
--- a/vendor/gitea.com/lunny/levelqueue/.gitignore
+++ b/vendor/gitea.com/lunny/levelqueue/.gitignore
@@ -1,3 +1,7 @@
queue/
queue_pop/
-queue_push/ \ No newline at end of file
+queue_push/
+uniquequeue/
+uniquequeue_pop/
+uniquequeue_push/
+set/
diff --git a/vendor/gitea.com/lunny/levelqueue/README.md b/vendor/gitea.com/lunny/levelqueue/README.md
index 80a0853cf6..21db280839 100644
--- a/vendor/gitea.com/lunny/levelqueue/README.md
+++ b/vendor/gitea.com/lunny/levelqueue/README.md
@@ -25,4 +25,36 @@ data, err = queue.LPop()
queue.LHandle(func(dt []byte) error{
return nil
})
-``` \ No newline at end of file
+```
+
+You can now create a Set from a leveldb:
+
+```Go
+set, err := levelqueue.OpenSet("./set")
+
+added, err:= set.Add([]byte("member1"))
+
+has, err := set.Has([]byte("member1"))
+
+members, err := set.Members()
+
+removed, err := set.Remove([]byte("member1"))
+```
+
+And you can create a UniqueQueue from a leveldb:
+
+```Go
+queue, err := levelqueue.OpenUnique("./queue")
+
+err := queue.RPush([]byte("member1"))
+
+err = queue.LPush([]byte("member1"))
+// Will return ErrAlreadyInQueue
+
+// and so on.
+```
+
+## Creating Queues, UniqueQueues and Sets from already open DB
+
+If you have an already open DB you can create these from this using the
+`NewQueue`, `NewUniqueQueue` and `NewSet` functions. \ No newline at end of file
diff --git a/vendor/gitea.com/lunny/levelqueue/error.go b/vendor/gitea.com/lunny/levelqueue/error.go
index d639c5d496..648c185655 100644
--- a/vendor/gitea.com/lunny/levelqueue/error.go
+++ b/vendor/gitea.com/lunny/levelqueue/error.go
@@ -7,6 +7,8 @@ package levelqueue
import "errors"
var (
- // ErrNotFound means no element in queue
+ // ErrNotFound means no elements in queue
ErrNotFound = errors.New("no key found")
+
+ ErrAlreadyInQueue = errors.New("value already in queue")
)
diff --git a/vendor/gitea.com/lunny/levelqueue/queue.go b/vendor/gitea.com/lunny/levelqueue/queue.go
index af624db8e4..20ed90100c 100644
--- a/vendor/gitea.com/lunny/levelqueue/queue.go
+++ b/vendor/gitea.com/lunny/levelqueue/queue.go
@@ -12,37 +12,62 @@ import (
"github.com/syndtr/goleveldb/leveldb"
)
+const (
+ lowKeyStr = "low"
+ highKeyStr = "high"
+)
+
// Queue defines a queue struct
type Queue struct {
- db *leveldb.DB
- highLock sync.Mutex
- lowLock sync.Mutex
- low int64
- high int64
+ db *leveldb.DB
+ highLock sync.Mutex
+ lowLock sync.Mutex
+ low int64
+ high int64
+ lowKey []byte
+ highKey []byte
+ prefix []byte
+ closeUnderlyingDB bool
}
-// Open opens a queue object or create it if not exist
+// Open opens a queue from the db path or creates a
+// queue if it doesn't exist.
+// The keys will not be prefixed by default
func Open(dataDir string) (*Queue, error) {
db, err := leveldb.OpenFile(dataDir, nil)
if err != nil {
return nil, err
}
+ return NewQueue(db, []byte{}, true)
+}
+
+// NewQueue creates a queue from a db. The keys will be prefixed with prefix
+// and at close the db will be closed as per closeUnderlyingDB
+func NewQueue(db *leveldb.DB, prefix []byte, closeUnderlyingDB bool) (*Queue, error) {
+ var err error
var queue = &Queue{
- db: db,
+ db: db,
+ closeUnderlyingDB: closeUnderlyingDB,
}
- queue.low, err = queue.readID(lowKey)
+
+ queue.prefix = make([]byte, len(prefix))
+ copy(queue.prefix, prefix)
+ queue.lowKey = withPrefix(prefix, []byte(lowKeyStr))
+ queue.highKey = withPrefix(prefix, []byte(highKeyStr))
+
+ queue.low, err = queue.readID(queue.lowKey)
if err == leveldb.ErrNotFound {
queue.low = 1
- err = db.Put(lowKey, id2bytes(1), nil)
+ err = db.Put(queue.lowKey, id2bytes(1), nil)
}
if err != nil {
return nil, err
}
- queue.high, err = queue.readID(highKey)
+ queue.high, err = queue.readID(queue.highKey)
if err == leveldb.ErrNotFound {
- err = db.Put(highKey, id2bytes(0), nil)
+ err = db.Put(queue.highKey, id2bytes(0), nil)
}
if err != nil {
return nil, err
@@ -59,15 +84,10 @@ func (queue *Queue) readID(key []byte) (int64, error) {
return bytes2id(bs)
}
-var (
- lowKey = []byte("low")
- highKey = []byte("high")
-)
-
func (queue *Queue) highincrement() (int64, error) {
id := queue.high + 1
queue.high = id
- err := queue.db.Put(highKey, id2bytes(queue.high), nil)
+ err := queue.db.Put(queue.highKey, id2bytes(queue.high), nil)
if err != nil {
queue.high = queue.high - 1
return 0, err
@@ -77,7 +97,7 @@ func (queue *Queue) highincrement() (int64, error) {
func (queue *Queue) highdecrement() (int64, error) {
queue.high = queue.high - 1
- err := queue.db.Put(highKey, id2bytes(queue.high), nil)
+ err := queue.db.Put(queue.highKey, id2bytes(queue.high), nil)
if err != nil {
queue.high = queue.high + 1
return 0, err
@@ -87,7 +107,7 @@ func (queue *Queue) highdecrement() (int64, error) {
func (queue *Queue) lowincrement() (int64, error) {
queue.low = queue.low + 1
- err := queue.db.Put(lowKey, id2bytes(queue.low), nil)
+ err := queue.db.Put(queue.lowKey, id2bytes(queue.low), nil)
if err != nil {
queue.low = queue.low - 1
return 0, err
@@ -97,7 +117,7 @@ func (queue *Queue) lowincrement() (int64, error) {
func (queue *Queue) lowdecrement() (int64, error) {
queue.low = queue.low - 1
- err := queue.db.Put(lowKey, id2bytes(queue.low), nil)
+ err := queue.db.Put(queue.lowKey, id2bytes(queue.low), nil)
if err != nil {
queue.low = queue.low + 1
return 0, err
@@ -125,6 +145,17 @@ func bytes2id(b []byte) (int64, error) {
return binary.ReadVarint(bytes.NewReader(b))
}
+func withPrefix(prefix []byte, value []byte) []byte {
+ if len(prefix) == 0 {
+ return value
+ }
+ prefixed := make([]byte, len(prefix)+1+len(value))
+ copy(prefixed[0:len(prefix)], prefix)
+ prefixed[len(prefix)] = '-'
+ copy(prefixed[len(prefix)+1:], value)
+ return prefixed
+}
+
// RPush pushes a data from right of queue
func (queue *Queue) RPush(data []byte) error {
queue.highLock.Lock()
@@ -133,7 +164,7 @@ func (queue *Queue) RPush(data []byte) error {
queue.highLock.Unlock()
return err
}
- err = queue.db.Put(id2bytes(id), data, nil)
+ err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil)
queue.highLock.Unlock()
return err
}
@@ -146,7 +177,7 @@ func (queue *Queue) LPush(data []byte) error {
queue.lowLock.Unlock()
return err
}
- err = queue.db.Put(id2bytes(id), data, nil)
+ err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil)
queue.lowLock.Unlock()
return err
}
@@ -157,7 +188,7 @@ func (queue *Queue) RPop() ([]byte, error) {
defer queue.highLock.Unlock()
currentID := queue.high
- res, err := queue.db.Get(id2bytes(currentID), nil)
+ res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
if err != nil {
if err == leveldb.ErrNotFound {
return nil, ErrNotFound
@@ -170,7 +201,7 @@ func (queue *Queue) RPop() ([]byte, error) {
return nil, err
}
- err = queue.db.Delete(id2bytes(currentID), nil)
+ err = queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
if err != nil {
return nil, err
}
@@ -183,7 +214,7 @@ func (queue *Queue) RHandle(h func([]byte) error) error {
defer queue.highLock.Unlock()
currentID := queue.high
- res, err := queue.db.Get(id2bytes(currentID), nil)
+ res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
if err != nil {
if err == leveldb.ErrNotFound {
return ErrNotFound
@@ -200,7 +231,7 @@ func (queue *Queue) RHandle(h func([]byte) error) error {
return err
}
- return queue.db.Delete(id2bytes(currentID), nil)
+ return queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
}
// LPop pop a data from left of queue
@@ -209,7 +240,7 @@ func (queue *Queue) LPop() ([]byte, error) {
defer queue.lowLock.Unlock()
currentID := queue.low
- res, err := queue.db.Get(id2bytes(currentID), nil)
+ res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
if err != nil {
if err == leveldb.ErrNotFound {
return nil, ErrNotFound
@@ -222,7 +253,7 @@ func (queue *Queue) LPop() ([]byte, error) {
return nil, err
}
- err = queue.db.Delete(id2bytes(currentID), nil)
+ err = queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
if err != nil {
return nil, err
}
@@ -235,7 +266,7 @@ func (queue *Queue) LHandle(h func([]byte) error) error {
defer queue.lowLock.Unlock()
currentID := queue.low
- res, err := queue.db.Get(id2bytes(currentID), nil)
+ res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
if err != nil {
if err == leveldb.ErrNotFound {
return ErrNotFound
@@ -252,11 +283,15 @@ func (queue *Queue) LHandle(h func([]byte) error) error {
return err
}
- return queue.db.Delete(id2bytes(currentID), nil)
+ return queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
}
-// Close closes the queue
+// Close closes the queue (and the underlying db is set to closeUnderlyingDB)
func (queue *Queue) Close() error {
+ if !queue.closeUnderlyingDB {
+ queue.db = nil
+ return nil
+ }
err := queue.db.Close()
queue.db = nil
return err
diff --git a/vendor/gitea.com/lunny/levelqueue/set.go b/vendor/gitea.com/lunny/levelqueue/set.go
new file mode 100644
index 0000000000..88f4e9b1d1
--- /dev/null
+++ b/vendor/gitea.com/lunny/levelqueue/set.go
@@ -0,0 +1,110 @@
+// Copyright 2020 Andrew Thornton. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package levelqueue
+
+import (
+ "sync"
+
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/util"
+)
+
+const (
+ setPrefixStr = "set"
+)
+
+// Set defines a set struct
+type Set struct {
+ db *leveldb.DB
+ closeUnderlyingDB bool
+ lock sync.Mutex
+ prefix []byte
+}
+
+// OpenSet opens a set from the db path or creates a set if it doesn't exist.
+// The keys will be prefixed with "set-" by default
+func OpenSet(dataDir string) (*Set, error) {
+ db, err := leveldb.OpenFile(dataDir, nil)
+ if err != nil {
+ return nil, err
+ }
+ return NewSet(db, []byte(setPrefixStr), true)
+}
+
+// NewSet creates a set from a db. The keys will be prefixed with prefix
+// and at close the db will be closed as per closeUnderlyingDB
+func NewSet(db *leveldb.DB, prefix []byte, closeUnderlyingDB bool) (*Set, error) {
+ set := &Set{
+ db: db,
+ closeUnderlyingDB: closeUnderlyingDB,
+ }
+ set.prefix = make([]byte, len(prefix))
+ copy(set.prefix, prefix)
+
+ return set, nil
+}
+
+// Add adds a member string to a key set, returns true if the member was not already present
+func (set *Set) Add(value []byte) (bool, error) {
+ set.lock.Lock()
+ defer set.lock.Unlock()
+ setKey := withPrefix(set.prefix, value)
+ has, err := set.db.Has(setKey, nil)
+ if err != nil || has {
+ return !has, err
+ }
+ return !has, set.db.Put(setKey, []byte(""), nil)
+}
+
+// Members returns the current members of the set
+func (set *Set) Members() ([][]byte, error) {
+ set.lock.Lock()
+ defer set.lock.Unlock()
+ var members [][]byte
+ prefix := withPrefix(set.prefix, []byte{})
+ iter := set.db.NewIterator(util.BytesPrefix(prefix), nil)
+ for iter.Next() {
+ slice := iter.Key()[len(prefix):]
+ value := make([]byte, len(slice))
+ copy(value, slice)
+ members = append(members, value)
+ }
+ iter.Release()
+ return members, iter.Error()
+}
+
+// Has returns if the member is in the set
+func (set *Set) Has(value []byte) (bool, error) {
+ set.lock.Lock()
+ defer set.lock.Unlock()
+ setKey := withPrefix(set.prefix, value)
+
+ return set.db.Has(setKey, nil)
+}
+
+// Remove removes a member from the set, returns true if the member was present
+func (set *Set) Remove(value []byte) (bool, error) {
+ set.lock.Lock()
+ defer set.lock.Unlock()
+ setKey := withPrefix(set.prefix, value)
+
+ has, err := set.db.Has(setKey, nil)
+ if err != nil || !has {
+ return has, err
+ }
+
+ return has, set.db.Delete(setKey, nil)
+}
+
+// Close closes the set (and the underlying db if set to closeUnderlyingDB)
+func (set *Set) Close() error {
+ if !set.closeUnderlyingDB {
+ set.db = nil
+ return nil
+ }
+ err := set.db.Close()
+ set.db = nil
+ return err
+}
diff --git a/vendor/gitea.com/lunny/levelqueue/uniquequeue.go b/vendor/gitea.com/lunny/levelqueue/uniquequeue.go
new file mode 100644
index 0000000000..8d2676b0d2
--- /dev/null
+++ b/vendor/gitea.com/lunny/levelqueue/uniquequeue.go
@@ -0,0 +1,184 @@
+// Copyright 2020 Andrew Thornton. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package levelqueue
+
+import (
+ "fmt"
+
+ "github.com/syndtr/goleveldb/leveldb"
+)
+
+const (
+ uniqueQueuePrefixStr = "unique"
+)
+
+// UniqueQueue defines an unique queue struct
+type UniqueQueue struct {
+ q *Queue
+ set *Set
+ db *leveldb.DB
+ closeUnderlyingDB bool
+}
+
+// OpenUnique opens an unique queue from the db path or creates a set if it doesn't exist.
+// The keys in the queue portion will not be prefixed, and the set keys will be prefixed with "set-"
+func OpenUnique(dataDir string) (*UniqueQueue, error) {
+ db, err := leveldb.OpenFile(dataDir, nil)
+ if err != nil {
+ return nil, err
+ }
+ return NewUniqueQueue(db, []byte{}, []byte(uniqueQueuePrefixStr), true)
+}
+
+// NewUniqueQueue creates a new unique queue from a db.
+// The queue keys will be prefixed with queuePrefix and the set keys with setPrefix
+// and at close the db will be closed as per closeUnderlyingDB
+func NewUniqueQueue(db *leveldb.DB, queuePrefix []byte, setPrefix []byte, closeUnderlyingDB bool) (*UniqueQueue, error) {
+ internal, err := NewQueue(db, queuePrefix, false)
+ if err != nil {
+ return nil, err
+ }
+ set, err := NewSet(db, setPrefix, false)
+ if err != nil {
+ return nil, err
+ }
+ queue := &UniqueQueue{
+ q: internal,
+ set: set,
+ db: db,
+ closeUnderlyingDB: closeUnderlyingDB,
+ }
+
+ return queue, err
+}
+
+// LPush pushes data to the left of the queue
+func (queue *UniqueQueue) LPush(data []byte) error {
+ return queue.LPushFunc(data, nil)
+}
+
+// LPushFunc pushes data to the left of the queue and calls the callback if it is added
+func (queue *UniqueQueue) LPushFunc(data []byte, fn func() error) error {
+ added, err := queue.set.Add(data)
+ if err != nil {
+ return err
+ }
+ if !added {
+ return ErrAlreadyInQueue
+ }
+
+ if fn != nil {
+ err = fn()
+ if err != nil {
+ _, remErr := queue.set.Remove(data)
+ if remErr != nil {
+ return fmt.Errorf("%v & %v", err, remErr)
+ }
+ return err
+ }
+ }
+
+ return queue.q.LPush(data)
+}
+
+// RPush pushes data to the right of the queue
+func (queue *UniqueQueue) RPush(data []byte) error {
+ return queue.RPushFunc(data, nil)
+}
+
+// RPushFunc pushes data to the right of the queue and calls the callback if is added
+func (queue *UniqueQueue) RPushFunc(data []byte, fn func() error) error {
+ added, err := queue.set.Add(data)
+ if err != nil {
+ return err
+ }
+ if !added {
+ return ErrAlreadyInQueue
+ }
+
+ if fn != nil {
+ err = fn()
+ if err != nil {
+ _, remErr := queue.set.Remove(data)
+ if remErr != nil {
+ return fmt.Errorf("%v & %v", err, remErr)
+ }
+ return err
+ }
+ }
+
+ return queue.q.RPush(data)
+}
+
+// RPop pop data from the right of the queue
+func (queue *UniqueQueue) RPop() ([]byte, error) {
+ popped, err := queue.q.RPop()
+ if err != nil {
+ return popped, err
+ }
+ _, err = queue.set.Remove(popped)
+
+ return popped, err
+}
+
+// RHandle receives a user callback function to handle the right element of the queue, if the function returns nil, then delete the element, otherwise keep the element.
+func (queue *UniqueQueue) RHandle(h func([]byte) error) error {
+ return queue.q.RHandle(func(data []byte) error {
+ err := h(data)
+ if err != nil {
+ return err
+ }
+ _, err = queue.set.Remove(data)
+ return err
+ })
+}
+
+// LPop pops data from left of the queue
+func (queue *UniqueQueue) LPop() ([]byte, error) {
+ popped, err := queue.q.LPop()
+ if err != nil {
+ return popped, err
+ }
+ _, err = queue.set.Remove(popped)
+
+ return popped, err
+}
+
+// LHandle receives a user callback function to handle the left element of the queue, if the function returns nil, then delete the element, otherwise keep the element.
+func (queue *UniqueQueue) LHandle(h func([]byte) error) error {
+ return queue.q.LHandle(func(data []byte) error {
+ err := h(data)
+ if err != nil {
+ return err
+ }
+ _, err = queue.set.Remove(data)
+ return err
+ })
+}
+
+// Has checks whether the data is already in the queue
+func (queue *UniqueQueue) Has(data []byte) (bool, error) {
+ return queue.set.Has(data)
+}
+
+// Len returns the length of the queue
+func (queue *UniqueQueue) Len() int64 {
+ queue.set.lock.Lock()
+ defer queue.set.lock.Unlock()
+ return queue.q.Len()
+}
+
+// Close closes the queue (and the underlying DB if set to closeUnderlyingDB)
+func (queue *UniqueQueue) Close() error {
+ _ = queue.q.Close()
+ _ = queue.set.Close()
+ if !queue.closeUnderlyingDB {
+ queue.db = nil
+ return nil
+ }
+ err := queue.db.Close()
+ queue.db = nil
+ return err
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 5203c24e4a..947008d63c 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1,6 +1,6 @@
# cloud.google.com/go v0.45.0
cloud.google.com/go/compute/metadata
-# gitea.com/lunny/levelqueue v0.1.0
+# gitea.com/lunny/levelqueue v0.2.0
gitea.com/lunny/levelqueue
# gitea.com/macaron/binding v0.0.0-20190822013154-a5f53841ed2b
gitea.com/macaron/binding