summaryrefslogtreecommitdiffstats
path: root/vendor/gitea.com/lunny/levelqueue
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2020-02-02 23:19:58 +0000
committerGitHub <noreply@github.com>2020-02-02 23:19:58 +0000
commit2c903383b5154795b90e4b4ed8eaadc6fac17a13 (patch)
treed5ca361d9597e027ad92f1e02a841be1d266b554 /vendor/gitea.com/lunny/levelqueue
parentb4914249ee389a733e7dcfd2df20708ab3215827 (diff)
downloadgitea-2c903383b5154795b90e4b4ed8eaadc6fac17a13.tar.gz
gitea-2c903383b5154795b90e4b4ed8eaadc6fac17a13.zip
Add Unique Queue infrastructure and move TestPullRequests to this (#9856)
* Upgrade levelqueue to version 0.2.0 This adds functionality for Unique Queues * Add UniqueQueue interface and functions to create them * Add UniqueQueue implementations * Move TestPullRequests over to use UniqueQueue * Reduce code duplication * Add bytefifos * Ensure invalid types are logged * Fix close race in PersistableChannelQueue Shutdown
Diffstat (limited to 'vendor/gitea.com/lunny/levelqueue')
-rw-r--r--vendor/gitea.com/lunny/levelqueue/.gitignore6
-rw-r--r--vendor/gitea.com/lunny/levelqueue/README.md34
-rw-r--r--vendor/gitea.com/lunny/levelqueue/error.go4
-rw-r--r--vendor/gitea.com/lunny/levelqueue/queue.go97
-rw-r--r--vendor/gitea.com/lunny/levelqueue/set.go110
-rw-r--r--vendor/gitea.com/lunny/levelqueue/uniquequeue.go184
6 files changed, 401 insertions, 34 deletions
diff --git a/vendor/gitea.com/lunny/levelqueue/.gitignore b/vendor/gitea.com/lunny/levelqueue/.gitignore
index 59a8bdee30..ab1fe76029 100644
--- a/vendor/gitea.com/lunny/levelqueue/.gitignore
+++ b/vendor/gitea.com/lunny/levelqueue/.gitignore
@@ -1,3 +1,7 @@
queue/
queue_pop/
-queue_push/ \ No newline at end of file
+queue_push/
+uniquequeue/
+uniquequeue_pop/
+uniquequeue_push/
+set/
diff --git a/vendor/gitea.com/lunny/levelqueue/README.md b/vendor/gitea.com/lunny/levelqueue/README.md
index 80a0853cf6..21db280839 100644
--- a/vendor/gitea.com/lunny/levelqueue/README.md
+++ b/vendor/gitea.com/lunny/levelqueue/README.md
@@ -25,4 +25,36 @@ data, err = queue.LPop()
queue.LHandle(func(dt []byte) error{
return nil
})
-``` \ No newline at end of file
+```
+
+You can now create a Set from a leveldb:
+
+```Go
+set, err := levelqueue.OpenSet("./set")
+
+added, err:= set.Add([]byte("member1"))
+
+has, err := set.Has([]byte("member1"))
+
+members, err := set.Members()
+
+removed, err := set.Remove([]byte("member1"))
+```
+
+And you can create a UniqueQueue from a leveldb:
+
+```Go
+queue, err := levelqueue.OpenUnique("./queue")
+
+err := queue.RPush([]byte("member1"))
+
+err = queue.LPush([]byte("member1"))
+// Will return ErrAlreadyInQueue
+
+// and so on.
+```
+
+## Creating Queues, UniqueQueues and Sets from already open DB
+
+If you have an already open DB you can create these from this using the
+`NewQueue`, `NewUniqueQueue` and `NewSet` functions. \ No newline at end of file
diff --git a/vendor/gitea.com/lunny/levelqueue/error.go b/vendor/gitea.com/lunny/levelqueue/error.go
index d639c5d496..648c185655 100644
--- a/vendor/gitea.com/lunny/levelqueue/error.go
+++ b/vendor/gitea.com/lunny/levelqueue/error.go
@@ -7,6 +7,8 @@ package levelqueue
import "errors"
var (
- // ErrNotFound means no element in queue
+ // ErrNotFound means no elements in queue
ErrNotFound = errors.New("no key found")
+
+ ErrAlreadyInQueue = errors.New("value already in queue")
)
diff --git a/vendor/gitea.com/lunny/levelqueue/queue.go b/vendor/gitea.com/lunny/levelqueue/queue.go
index af624db8e4..20ed90100c 100644
--- a/vendor/gitea.com/lunny/levelqueue/queue.go
+++ b/vendor/gitea.com/lunny/levelqueue/queue.go
@@ -12,37 +12,62 @@ import (
"github.com/syndtr/goleveldb/leveldb"
)
+const (
+ lowKeyStr = "low"
+ highKeyStr = "high"
+)
+
// Queue defines a queue struct
type Queue struct {
- db *leveldb.DB
- highLock sync.Mutex
- lowLock sync.Mutex
- low int64
- high int64
+ db *leveldb.DB
+ highLock sync.Mutex
+ lowLock sync.Mutex
+ low int64
+ high int64
+ lowKey []byte
+ highKey []byte
+ prefix []byte
+ closeUnderlyingDB bool
}
-// Open opens a queue object or create it if not exist
+// Open opens a queue from the db path or creates a
+// queue if it doesn't exist.
+// The keys will not be prefixed by default
func Open(dataDir string) (*Queue, error) {
db, err := leveldb.OpenFile(dataDir, nil)
if err != nil {
return nil, err
}
+ return NewQueue(db, []byte{}, true)
+}
+
+// NewQueue creates a queue from a db. The keys will be prefixed with prefix
+// and at close the db will be closed as per closeUnderlyingDB
+func NewQueue(db *leveldb.DB, prefix []byte, closeUnderlyingDB bool) (*Queue, error) {
+ var err error
var queue = &Queue{
- db: db,
+ db: db,
+ closeUnderlyingDB: closeUnderlyingDB,
}
- queue.low, err = queue.readID(lowKey)
+
+ queue.prefix = make([]byte, len(prefix))
+ copy(queue.prefix, prefix)
+ queue.lowKey = withPrefix(prefix, []byte(lowKeyStr))
+ queue.highKey = withPrefix(prefix, []byte(highKeyStr))
+
+ queue.low, err = queue.readID(queue.lowKey)
if err == leveldb.ErrNotFound {
queue.low = 1
- err = db.Put(lowKey, id2bytes(1), nil)
+ err = db.Put(queue.lowKey, id2bytes(1), nil)
}
if err != nil {
return nil, err
}
- queue.high, err = queue.readID(highKey)
+ queue.high, err = queue.readID(queue.highKey)
if err == leveldb.ErrNotFound {
- err = db.Put(highKey, id2bytes(0), nil)
+ err = db.Put(queue.highKey, id2bytes(0), nil)
}
if err != nil {
return nil, err
@@ -59,15 +84,10 @@ func (queue *Queue) readID(key []byte) (int64, error) {
return bytes2id(bs)
}
-var (
- lowKey = []byte("low")
- highKey = []byte("high")
-)
-
func (queue *Queue) highincrement() (int64, error) {
id := queue.high + 1
queue.high = id
- err := queue.db.Put(highKey, id2bytes(queue.high), nil)
+ err := queue.db.Put(queue.highKey, id2bytes(queue.high), nil)
if err != nil {
queue.high = queue.high - 1
return 0, err
@@ -77,7 +97,7 @@ func (queue *Queue) highincrement() (int64, error) {
func (queue *Queue) highdecrement() (int64, error) {
queue.high = queue.high - 1
- err := queue.db.Put(highKey, id2bytes(queue.high), nil)
+ err := queue.db.Put(queue.highKey, id2bytes(queue.high), nil)
if err != nil {
queue.high = queue.high + 1
return 0, err
@@ -87,7 +107,7 @@ func (queue *Queue) highdecrement() (int64, error) {
func (queue *Queue) lowincrement() (int64, error) {
queue.low = queue.low + 1
- err := queue.db.Put(lowKey, id2bytes(queue.low), nil)
+ err := queue.db.Put(queue.lowKey, id2bytes(queue.low), nil)
if err != nil {
queue.low = queue.low - 1
return 0, err
@@ -97,7 +117,7 @@ func (queue *Queue) lowincrement() (int64, error) {
func (queue *Queue) lowdecrement() (int64, error) {
queue.low = queue.low - 1
- err := queue.db.Put(lowKey, id2bytes(queue.low), nil)
+ err := queue.db.Put(queue.lowKey, id2bytes(queue.low), nil)
if err != nil {
queue.low = queue.low + 1
return 0, err
@@ -125,6 +145,17 @@ func bytes2id(b []byte) (int64, error) {
return binary.ReadVarint(bytes.NewReader(b))
}
+func withPrefix(prefix []byte, value []byte) []byte {
+ if len(prefix) == 0 {
+ return value
+ }
+ prefixed := make([]byte, len(prefix)+1+len(value))
+ copy(prefixed[0:len(prefix)], prefix)
+ prefixed[len(prefix)] = '-'
+ copy(prefixed[len(prefix)+1:], value)
+ return prefixed
+}
+
// RPush pushes a data from right of queue
func (queue *Queue) RPush(data []byte) error {
queue.highLock.Lock()
@@ -133,7 +164,7 @@ func (queue *Queue) RPush(data []byte) error {
queue.highLock.Unlock()
return err
}
- err = queue.db.Put(id2bytes(id), data, nil)
+ err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil)
queue.highLock.Unlock()
return err
}
@@ -146,7 +177,7 @@ func (queue *Queue) LPush(data []byte) error {
queue.lowLock.Unlock()
return err
}
- err = queue.db.Put(id2bytes(id), data, nil)
+ err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil)
queue.lowLock.Unlock()
return err
}
@@ -157,7 +188,7 @@ func (queue *Queue) RPop() ([]byte, error) {
defer queue.highLock.Unlock()
currentID := queue.high
- res, err := queue.db.Get(id2bytes(currentID), nil)
+ res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
if err != nil {
if err == leveldb.ErrNotFound {
return nil, ErrNotFound
@@ -170,7 +201,7 @@ func (queue *Queue) RPop() ([]byte, error) {
return nil, err
}
- err = queue.db.Delete(id2bytes(currentID), nil)
+ err = queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
if err != nil {
return nil, err
}
@@ -183,7 +214,7 @@ func (queue *Queue) RHandle(h func([]byte) error) error {
defer queue.highLock.Unlock()
currentID := queue.high
- res, err := queue.db.Get(id2bytes(currentID), nil)
+ res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
if err != nil {
if err == leveldb.ErrNotFound {
return ErrNotFound
@@ -200,7 +231,7 @@ func (queue *Queue) RHandle(h func([]byte) error) error {
return err
}
- return queue.db.Delete(id2bytes(currentID), nil)
+ return queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
}
// LPop pop a data from left of queue
@@ -209,7 +240,7 @@ func (queue *Queue) LPop() ([]byte, error) {
defer queue.lowLock.Unlock()
currentID := queue.low
- res, err := queue.db.Get(id2bytes(currentID), nil)
+ res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
if err != nil {
if err == leveldb.ErrNotFound {
return nil, ErrNotFound
@@ -222,7 +253,7 @@ func (queue *Queue) LPop() ([]byte, error) {
return nil, err
}
- err = queue.db.Delete(id2bytes(currentID), nil)
+ err = queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
if err != nil {
return nil, err
}
@@ -235,7 +266,7 @@ func (queue *Queue) LHandle(h func([]byte) error) error {
defer queue.lowLock.Unlock()
currentID := queue.low
- res, err := queue.db.Get(id2bytes(currentID), nil)
+ res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
if err != nil {
if err == leveldb.ErrNotFound {
return ErrNotFound
@@ -252,11 +283,15 @@ func (queue *Queue) LHandle(h func([]byte) error) error {
return err
}
- return queue.db.Delete(id2bytes(currentID), nil)
+ return queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
}
-// Close closes the queue
+// Close closes the queue (and the underlying db is set to closeUnderlyingDB)
func (queue *Queue) Close() error {
+ if !queue.closeUnderlyingDB {
+ queue.db = nil
+ return nil
+ }
err := queue.db.Close()
queue.db = nil
return err
diff --git a/vendor/gitea.com/lunny/levelqueue/set.go b/vendor/gitea.com/lunny/levelqueue/set.go
new file mode 100644
index 0000000000..88f4e9b1d1
--- /dev/null
+++ b/vendor/gitea.com/lunny/levelqueue/set.go
@@ -0,0 +1,110 @@
+// Copyright 2020 Andrew Thornton. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package levelqueue
+
+import (
+ "sync"
+
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/util"
+)
+
+const (
+ setPrefixStr = "set"
+)
+
+// Set defines a set struct
+type Set struct {
+ db *leveldb.DB
+ closeUnderlyingDB bool
+ lock sync.Mutex
+ prefix []byte
+}
+
+// OpenSet opens a set from the db path or creates a set if it doesn't exist.
+// The keys will be prefixed with "set-" by default
+func OpenSet(dataDir string) (*Set, error) {
+ db, err := leveldb.OpenFile(dataDir, nil)
+ if err != nil {
+ return nil, err
+ }
+ return NewSet(db, []byte(setPrefixStr), true)
+}
+
+// NewSet creates a set from a db. The keys will be prefixed with prefix
+// and at close the db will be closed as per closeUnderlyingDB
+func NewSet(db *leveldb.DB, prefix []byte, closeUnderlyingDB bool) (*Set, error) {
+ set := &Set{
+ db: db,
+ closeUnderlyingDB: closeUnderlyingDB,
+ }
+ set.prefix = make([]byte, len(prefix))
+ copy(set.prefix, prefix)
+
+ return set, nil
+}
+
+// Add adds a member string to a key set, returns true if the member was not already present
+func (set *Set) Add(value []byte) (bool, error) {
+ set.lock.Lock()
+ defer set.lock.Unlock()
+ setKey := withPrefix(set.prefix, value)
+ has, err := set.db.Has(setKey, nil)
+ if err != nil || has {
+ return !has, err
+ }
+ return !has, set.db.Put(setKey, []byte(""), nil)
+}
+
+// Members returns the current members of the set
+func (set *Set) Members() ([][]byte, error) {
+ set.lock.Lock()
+ defer set.lock.Unlock()
+ var members [][]byte
+ prefix := withPrefix(set.prefix, []byte{})
+ iter := set.db.NewIterator(util.BytesPrefix(prefix), nil)
+ for iter.Next() {
+ slice := iter.Key()[len(prefix):]
+ value := make([]byte, len(slice))
+ copy(value, slice)
+ members = append(members, value)
+ }
+ iter.Release()
+ return members, iter.Error()
+}
+
+// Has returns if the member is in the set
+func (set *Set) Has(value []byte) (bool, error) {
+ set.lock.Lock()
+ defer set.lock.Unlock()
+ setKey := withPrefix(set.prefix, value)
+
+ return set.db.Has(setKey, nil)
+}
+
+// Remove removes a member from the set, returns true if the member was present
+func (set *Set) Remove(value []byte) (bool, error) {
+ set.lock.Lock()
+ defer set.lock.Unlock()
+ setKey := withPrefix(set.prefix, value)
+
+ has, err := set.db.Has(setKey, nil)
+ if err != nil || !has {
+ return has, err
+ }
+
+ return has, set.db.Delete(setKey, nil)
+}
+
+// Close closes the set (and the underlying db if set to closeUnderlyingDB)
+func (set *Set) Close() error {
+ if !set.closeUnderlyingDB {
+ set.db = nil
+ return nil
+ }
+ err := set.db.Close()
+ set.db = nil
+ return err
+}
diff --git a/vendor/gitea.com/lunny/levelqueue/uniquequeue.go b/vendor/gitea.com/lunny/levelqueue/uniquequeue.go
new file mode 100644
index 0000000000..8d2676b0d2
--- /dev/null
+++ b/vendor/gitea.com/lunny/levelqueue/uniquequeue.go
@@ -0,0 +1,184 @@
+// Copyright 2020 Andrew Thornton. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package levelqueue
+
+import (
+ "fmt"
+
+ "github.com/syndtr/goleveldb/leveldb"
+)
+
+const (
+ uniqueQueuePrefixStr = "unique"
+)
+
+// UniqueQueue defines an unique queue struct
+type UniqueQueue struct {
+ q *Queue
+ set *Set
+ db *leveldb.DB
+ closeUnderlyingDB bool
+}
+
+// OpenUnique opens an unique queue from the db path or creates a set if it doesn't exist.
+// The keys in the queue portion will not be prefixed, and the set keys will be prefixed with "set-"
+func OpenUnique(dataDir string) (*UniqueQueue, error) {
+ db, err := leveldb.OpenFile(dataDir, nil)
+ if err != nil {
+ return nil, err
+ }
+ return NewUniqueQueue(db, []byte{}, []byte(uniqueQueuePrefixStr), true)
+}
+
+// NewUniqueQueue creates a new unique queue from a db.
+// The queue keys will be prefixed with queuePrefix and the set keys with setPrefix
+// and at close the db will be closed as per closeUnderlyingDB
+func NewUniqueQueue(db *leveldb.DB, queuePrefix []byte, setPrefix []byte, closeUnderlyingDB bool) (*UniqueQueue, error) {
+ internal, err := NewQueue(db, queuePrefix, false)
+ if err != nil {
+ return nil, err
+ }
+ set, err := NewSet(db, setPrefix, false)
+ if err != nil {
+ return nil, err
+ }
+ queue := &UniqueQueue{
+ q: internal,
+ set: set,
+ db: db,
+ closeUnderlyingDB: closeUnderlyingDB,
+ }
+
+ return queue, err
+}
+
+// LPush pushes data to the left of the queue
+func (queue *UniqueQueue) LPush(data []byte) error {
+ return queue.LPushFunc(data, nil)
+}
+
+// LPushFunc pushes data to the left of the queue and calls the callback if it is added
+func (queue *UniqueQueue) LPushFunc(data []byte, fn func() error) error {
+ added, err := queue.set.Add(data)
+ if err != nil {
+ return err
+ }
+ if !added {
+ return ErrAlreadyInQueue
+ }
+
+ if fn != nil {
+ err = fn()
+ if err != nil {
+ _, remErr := queue.set.Remove(data)
+ if remErr != nil {
+ return fmt.Errorf("%v & %v", err, remErr)
+ }
+ return err
+ }
+ }
+
+ return queue.q.LPush(data)
+}
+
+// RPush pushes data to the right of the queue
+func (queue *UniqueQueue) RPush(data []byte) error {
+ return queue.RPushFunc(data, nil)
+}
+
+// RPushFunc pushes data to the right of the queue and calls the callback if is added
+func (queue *UniqueQueue) RPushFunc(data []byte, fn func() error) error {
+ added, err := queue.set.Add(data)
+ if err != nil {
+ return err
+ }
+ if !added {
+ return ErrAlreadyInQueue
+ }
+
+ if fn != nil {
+ err = fn()
+ if err != nil {
+ _, remErr := queue.set.Remove(data)
+ if remErr != nil {
+ return fmt.Errorf("%v & %v", err, remErr)
+ }
+ return err
+ }
+ }
+
+ return queue.q.RPush(data)
+}
+
+// RPop pop data from the right of the queue
+func (queue *UniqueQueue) RPop() ([]byte, error) {
+ popped, err := queue.q.RPop()
+ if err != nil {
+ return popped, err
+ }
+ _, err = queue.set.Remove(popped)
+
+ return popped, err
+}
+
+// RHandle receives a user callback function to handle the right element of the queue, if the function returns nil, then delete the element, otherwise keep the element.
+func (queue *UniqueQueue) RHandle(h func([]byte) error) error {
+ return queue.q.RHandle(func(data []byte) error {
+ err := h(data)
+ if err != nil {
+ return err
+ }
+ _, err = queue.set.Remove(data)
+ return err
+ })
+}
+
+// LPop pops data from left of the queue
+func (queue *UniqueQueue) LPop() ([]byte, error) {
+ popped, err := queue.q.LPop()
+ if err != nil {
+ return popped, err
+ }
+ _, err = queue.set.Remove(popped)
+
+ return popped, err
+}
+
+// LHandle receives a user callback function to handle the left element of the queue, if the function returns nil, then delete the element, otherwise keep the element.
+func (queue *UniqueQueue) LHandle(h func([]byte) error) error {
+ return queue.q.LHandle(func(data []byte) error {
+ err := h(data)
+ if err != nil {
+ return err
+ }
+ _, err = queue.set.Remove(data)
+ return err
+ })
+}
+
+// Has checks whether the data is already in the queue
+func (queue *UniqueQueue) Has(data []byte) (bool, error) {
+ return queue.set.Has(data)
+}
+
+// Len returns the length of the queue
+func (queue *UniqueQueue) Len() int64 {
+ queue.set.lock.Lock()
+ defer queue.set.lock.Unlock()
+ return queue.q.Len()
+}
+
+// Close closes the queue (and the underlying DB if set to closeUnderlyingDB)
+func (queue *UniqueQueue) Close() error {
+ _ = queue.q.Close()
+ _ = queue.set.Close()
+ if !queue.closeUnderlyingDB {
+ queue.db = nil
+ return nil
+ }
+ err := queue.db.Close()
+ queue.db = nil
+ return err
+}