diff options
author | zeripath <art27@cantab.net> | 2020-02-02 23:19:58 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-02 23:19:58 +0000 |
commit | 2c903383b5154795b90e4b4ed8eaadc6fac17a13 (patch) | |
tree | d5ca361d9597e027ad92f1e02a841be1d266b554 /vendor/gitea.com/lunny/levelqueue | |
parent | b4914249ee389a733e7dcfd2df20708ab3215827 (diff) | |
download | gitea-2c903383b5154795b90e4b4ed8eaadc6fac17a13.tar.gz gitea-2c903383b5154795b90e4b4ed8eaadc6fac17a13.zip |
Add Unique Queue infrastructure and move TestPullRequests to this (#9856)
* Upgrade levelqueue to version 0.2.0
This adds functionality for Unique Queues
* Add UniqueQueue interface and functions to create them
* Add UniqueQueue implementations
* Move TestPullRequests over to use UniqueQueue
* Reduce code duplication
* Add bytefifos
* Ensure invalid types are logged
* Fix close race in PersistableChannelQueue Shutdown
Diffstat (limited to 'vendor/gitea.com/lunny/levelqueue')
-rw-r--r-- | vendor/gitea.com/lunny/levelqueue/.gitignore | 6 | ||||
-rw-r--r-- | vendor/gitea.com/lunny/levelqueue/README.md | 34 | ||||
-rw-r--r-- | vendor/gitea.com/lunny/levelqueue/error.go | 4 | ||||
-rw-r--r-- | vendor/gitea.com/lunny/levelqueue/queue.go | 97 | ||||
-rw-r--r-- | vendor/gitea.com/lunny/levelqueue/set.go | 110 | ||||
-rw-r--r-- | vendor/gitea.com/lunny/levelqueue/uniquequeue.go | 184 |
6 files changed, 401 insertions, 34 deletions
diff --git a/vendor/gitea.com/lunny/levelqueue/.gitignore b/vendor/gitea.com/lunny/levelqueue/.gitignore index 59a8bdee30..ab1fe76029 100644 --- a/vendor/gitea.com/lunny/levelqueue/.gitignore +++ b/vendor/gitea.com/lunny/levelqueue/.gitignore @@ -1,3 +1,7 @@ queue/ queue_pop/ -queue_push/
\ No newline at end of file +queue_push/ +uniquequeue/ +uniquequeue_pop/ +uniquequeue_push/ +set/ diff --git a/vendor/gitea.com/lunny/levelqueue/README.md b/vendor/gitea.com/lunny/levelqueue/README.md index 80a0853cf6..21db280839 100644 --- a/vendor/gitea.com/lunny/levelqueue/README.md +++ b/vendor/gitea.com/lunny/levelqueue/README.md @@ -25,4 +25,36 @@ data, err = queue.LPop() queue.LHandle(func(dt []byte) error{ return nil }) -```
\ No newline at end of file +``` + +You can now create a Set from a leveldb: + +```Go +set, err := levelqueue.OpenSet("./set") + +added, err:= set.Add([]byte("member1")) + +has, err := set.Has([]byte("member1")) + +members, err := set.Members() + +removed, err := set.Remove([]byte("member1")) +``` + +And you can create a UniqueQueue from a leveldb: + +```Go +queue, err := levelqueue.OpenUnique("./queue") + +err := queue.RPush([]byte("member1")) + +err = queue.LPush([]byte("member1")) +// Will return ErrAlreadyInQueue + +// and so on. +``` + +## Creating Queues, UniqueQueues and Sets from already open DB + +If you have an already open DB you can create these from this using the +`NewQueue`, `NewUniqueQueue` and `NewSet` functions.
\ No newline at end of file diff --git a/vendor/gitea.com/lunny/levelqueue/error.go b/vendor/gitea.com/lunny/levelqueue/error.go index d639c5d496..648c185655 100644 --- a/vendor/gitea.com/lunny/levelqueue/error.go +++ b/vendor/gitea.com/lunny/levelqueue/error.go @@ -7,6 +7,8 @@ package levelqueue import "errors" var ( - // ErrNotFound means no element in queue + // ErrNotFound means no elements in queue ErrNotFound = errors.New("no key found") + + ErrAlreadyInQueue = errors.New("value already in queue") ) diff --git a/vendor/gitea.com/lunny/levelqueue/queue.go b/vendor/gitea.com/lunny/levelqueue/queue.go index af624db8e4..20ed90100c 100644 --- a/vendor/gitea.com/lunny/levelqueue/queue.go +++ b/vendor/gitea.com/lunny/levelqueue/queue.go @@ -12,37 +12,62 @@ import ( "github.com/syndtr/goleveldb/leveldb" ) +const ( + lowKeyStr = "low" + highKeyStr = "high" +) + // Queue defines a queue struct type Queue struct { - db *leveldb.DB - highLock sync.Mutex - lowLock sync.Mutex - low int64 - high int64 + db *leveldb.DB + highLock sync.Mutex + lowLock sync.Mutex + low int64 + high int64 + lowKey []byte + highKey []byte + prefix []byte + closeUnderlyingDB bool } -// Open opens a queue object or create it if not exist +// Open opens a queue from the db path or creates a +// queue if it doesn't exist. +// The keys will not be prefixed by default func Open(dataDir string) (*Queue, error) { db, err := leveldb.OpenFile(dataDir, nil) if err != nil { return nil, err } + return NewQueue(db, []byte{}, true) +} + +// NewQueue creates a queue from a db. The keys will be prefixed with prefix +// and at close the db will be closed as per closeUnderlyingDB +func NewQueue(db *leveldb.DB, prefix []byte, closeUnderlyingDB bool) (*Queue, error) { + var err error var queue = &Queue{ - db: db, + db: db, + closeUnderlyingDB: closeUnderlyingDB, } - queue.low, err = queue.readID(lowKey) + + queue.prefix = make([]byte, len(prefix)) + copy(queue.prefix, prefix) + queue.lowKey = withPrefix(prefix, []byte(lowKeyStr)) + queue.highKey = withPrefix(prefix, []byte(highKeyStr)) + + queue.low, err = queue.readID(queue.lowKey) if err == leveldb.ErrNotFound { queue.low = 1 - err = db.Put(lowKey, id2bytes(1), nil) + err = db.Put(queue.lowKey, id2bytes(1), nil) } if err != nil { return nil, err } - queue.high, err = queue.readID(highKey) + queue.high, err = queue.readID(queue.highKey) if err == leveldb.ErrNotFound { - err = db.Put(highKey, id2bytes(0), nil) + err = db.Put(queue.highKey, id2bytes(0), nil) } if err != nil { return nil, err @@ -59,15 +84,10 @@ func (queue *Queue) readID(key []byte) (int64, error) { return bytes2id(bs) } -var ( - lowKey = []byte("low") - highKey = []byte("high") -) - func (queue *Queue) highincrement() (int64, error) { id := queue.high + 1 queue.high = id - err := queue.db.Put(highKey, id2bytes(queue.high), nil) + err := queue.db.Put(queue.highKey, id2bytes(queue.high), nil) if err != nil { queue.high = queue.high - 1 return 0, err @@ -77,7 +97,7 @@ func (queue *Queue) highincrement() (int64, error) { func (queue *Queue) highdecrement() (int64, error) { queue.high = queue.high - 1 - err := queue.db.Put(highKey, id2bytes(queue.high), nil) + err := queue.db.Put(queue.highKey, id2bytes(queue.high), nil) if err != nil { queue.high = queue.high + 1 return 0, err @@ -87,7 +107,7 @@ func (queue *Queue) highdecrement() (int64, error) { func (queue *Queue) lowincrement() (int64, error) { queue.low = queue.low + 1 - err := queue.db.Put(lowKey, id2bytes(queue.low), nil) + err := queue.db.Put(queue.lowKey, id2bytes(queue.low), nil) if err != nil { queue.low = queue.low - 1 return 0, err @@ -97,7 +117,7 @@ func (queue *Queue) lowincrement() (int64, error) { func (queue *Queue) lowdecrement() (int64, error) { queue.low = queue.low - 1 - err := queue.db.Put(lowKey, id2bytes(queue.low), nil) + err := queue.db.Put(queue.lowKey, id2bytes(queue.low), nil) if err != nil { queue.low = queue.low + 1 return 0, err @@ -125,6 +145,17 @@ func bytes2id(b []byte) (int64, error) { return binary.ReadVarint(bytes.NewReader(b)) } +func withPrefix(prefix []byte, value []byte) []byte { + if len(prefix) == 0 { + return value + } + prefixed := make([]byte, len(prefix)+1+len(value)) + copy(prefixed[0:len(prefix)], prefix) + prefixed[len(prefix)] = '-' + copy(prefixed[len(prefix)+1:], value) + return prefixed +} + // RPush pushes a data from right of queue func (queue *Queue) RPush(data []byte) error { queue.highLock.Lock() @@ -133,7 +164,7 @@ func (queue *Queue) RPush(data []byte) error { queue.highLock.Unlock() return err } - err = queue.db.Put(id2bytes(id), data, nil) + err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil) queue.highLock.Unlock() return err } @@ -146,7 +177,7 @@ func (queue *Queue) LPush(data []byte) error { queue.lowLock.Unlock() return err } - err = queue.db.Put(id2bytes(id), data, nil) + err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil) queue.lowLock.Unlock() return err } @@ -157,7 +188,7 @@ func (queue *Queue) RPop() ([]byte, error) { defer queue.highLock.Unlock() currentID := queue.high - res, err := queue.db.Get(id2bytes(currentID), nil) + res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil) if err != nil { if err == leveldb.ErrNotFound { return nil, ErrNotFound @@ -170,7 +201,7 @@ func (queue *Queue) RPop() ([]byte, error) { return nil, err } - err = queue.db.Delete(id2bytes(currentID), nil) + err = queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil) if err != nil { return nil, err } @@ -183,7 +214,7 @@ func (queue *Queue) RHandle(h func([]byte) error) error { defer queue.highLock.Unlock() currentID := queue.high - res, err := queue.db.Get(id2bytes(currentID), nil) + res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil) if err != nil { if err == leveldb.ErrNotFound { return ErrNotFound @@ -200,7 +231,7 @@ func (queue *Queue) RHandle(h func([]byte) error) error { return err } - return queue.db.Delete(id2bytes(currentID), nil) + return queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil) } // LPop pop a data from left of queue @@ -209,7 +240,7 @@ func (queue *Queue) LPop() ([]byte, error) { defer queue.lowLock.Unlock() currentID := queue.low - res, err := queue.db.Get(id2bytes(currentID), nil) + res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil) if err != nil { if err == leveldb.ErrNotFound { return nil, ErrNotFound @@ -222,7 +253,7 @@ func (queue *Queue) LPop() ([]byte, error) { return nil, err } - err = queue.db.Delete(id2bytes(currentID), nil) + err = queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil) if err != nil { return nil, err } @@ -235,7 +266,7 @@ func (queue *Queue) LHandle(h func([]byte) error) error { defer queue.lowLock.Unlock() currentID := queue.low - res, err := queue.db.Get(id2bytes(currentID), nil) + res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil) if err != nil { if err == leveldb.ErrNotFound { return ErrNotFound @@ -252,11 +283,15 @@ func (queue *Queue) LHandle(h func([]byte) error) error { return err } - return queue.db.Delete(id2bytes(currentID), nil) + return queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil) } -// Close closes the queue +// Close closes the queue (and the underlying db is set to closeUnderlyingDB) func (queue *Queue) Close() error { + if !queue.closeUnderlyingDB { + queue.db = nil + return nil + } err := queue.db.Close() queue.db = nil return err diff --git a/vendor/gitea.com/lunny/levelqueue/set.go b/vendor/gitea.com/lunny/levelqueue/set.go new file mode 100644 index 0000000000..88f4e9b1d1 --- /dev/null +++ b/vendor/gitea.com/lunny/levelqueue/set.go @@ -0,0 +1,110 @@ +// Copyright 2020 Andrew Thornton. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package levelqueue + +import ( + "sync" + + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/util" +) + +const ( + setPrefixStr = "set" +) + +// Set defines a set struct +type Set struct { + db *leveldb.DB + closeUnderlyingDB bool + lock sync.Mutex + prefix []byte +} + +// OpenSet opens a set from the db path or creates a set if it doesn't exist. +// The keys will be prefixed with "set-" by default +func OpenSet(dataDir string) (*Set, error) { + db, err := leveldb.OpenFile(dataDir, nil) + if err != nil { + return nil, err + } + return NewSet(db, []byte(setPrefixStr), true) +} + +// NewSet creates a set from a db. The keys will be prefixed with prefix +// and at close the db will be closed as per closeUnderlyingDB +func NewSet(db *leveldb.DB, prefix []byte, closeUnderlyingDB bool) (*Set, error) { + set := &Set{ + db: db, + closeUnderlyingDB: closeUnderlyingDB, + } + set.prefix = make([]byte, len(prefix)) + copy(set.prefix, prefix) + + return set, nil +} + +// Add adds a member string to a key set, returns true if the member was not already present +func (set *Set) Add(value []byte) (bool, error) { + set.lock.Lock() + defer set.lock.Unlock() + setKey := withPrefix(set.prefix, value) + has, err := set.db.Has(setKey, nil) + if err != nil || has { + return !has, err + } + return !has, set.db.Put(setKey, []byte(""), nil) +} + +// Members returns the current members of the set +func (set *Set) Members() ([][]byte, error) { + set.lock.Lock() + defer set.lock.Unlock() + var members [][]byte + prefix := withPrefix(set.prefix, []byte{}) + iter := set.db.NewIterator(util.BytesPrefix(prefix), nil) + for iter.Next() { + slice := iter.Key()[len(prefix):] + value := make([]byte, len(slice)) + copy(value, slice) + members = append(members, value) + } + iter.Release() + return members, iter.Error() +} + +// Has returns if the member is in the set +func (set *Set) Has(value []byte) (bool, error) { + set.lock.Lock() + defer set.lock.Unlock() + setKey := withPrefix(set.prefix, value) + + return set.db.Has(setKey, nil) +} + +// Remove removes a member from the set, returns true if the member was present +func (set *Set) Remove(value []byte) (bool, error) { + set.lock.Lock() + defer set.lock.Unlock() + setKey := withPrefix(set.prefix, value) + + has, err := set.db.Has(setKey, nil) + if err != nil || !has { + return has, err + } + + return has, set.db.Delete(setKey, nil) +} + +// Close closes the set (and the underlying db if set to closeUnderlyingDB) +func (set *Set) Close() error { + if !set.closeUnderlyingDB { + set.db = nil + return nil + } + err := set.db.Close() + set.db = nil + return err +} diff --git a/vendor/gitea.com/lunny/levelqueue/uniquequeue.go b/vendor/gitea.com/lunny/levelqueue/uniquequeue.go new file mode 100644 index 0000000000..8d2676b0d2 --- /dev/null +++ b/vendor/gitea.com/lunny/levelqueue/uniquequeue.go @@ -0,0 +1,184 @@ +// Copyright 2020 Andrew Thornton. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package levelqueue + +import ( + "fmt" + + "github.com/syndtr/goleveldb/leveldb" +) + +const ( + uniqueQueuePrefixStr = "unique" +) + +// UniqueQueue defines an unique queue struct +type UniqueQueue struct { + q *Queue + set *Set + db *leveldb.DB + closeUnderlyingDB bool +} + +// OpenUnique opens an unique queue from the db path or creates a set if it doesn't exist. +// The keys in the queue portion will not be prefixed, and the set keys will be prefixed with "set-" +func OpenUnique(dataDir string) (*UniqueQueue, error) { + db, err := leveldb.OpenFile(dataDir, nil) + if err != nil { + return nil, err + } + return NewUniqueQueue(db, []byte{}, []byte(uniqueQueuePrefixStr), true) +} + +// NewUniqueQueue creates a new unique queue from a db. +// The queue keys will be prefixed with queuePrefix and the set keys with setPrefix +// and at close the db will be closed as per closeUnderlyingDB +func NewUniqueQueue(db *leveldb.DB, queuePrefix []byte, setPrefix []byte, closeUnderlyingDB bool) (*UniqueQueue, error) { + internal, err := NewQueue(db, queuePrefix, false) + if err != nil { + return nil, err + } + set, err := NewSet(db, setPrefix, false) + if err != nil { + return nil, err + } + queue := &UniqueQueue{ + q: internal, + set: set, + db: db, + closeUnderlyingDB: closeUnderlyingDB, + } + + return queue, err +} + +// LPush pushes data to the left of the queue +func (queue *UniqueQueue) LPush(data []byte) error { + return queue.LPushFunc(data, nil) +} + +// LPushFunc pushes data to the left of the queue and calls the callback if it is added +func (queue *UniqueQueue) LPushFunc(data []byte, fn func() error) error { + added, err := queue.set.Add(data) + if err != nil { + return err + } + if !added { + return ErrAlreadyInQueue + } + + if fn != nil { + err = fn() + if err != nil { + _, remErr := queue.set.Remove(data) + if remErr != nil { + return fmt.Errorf("%v & %v", err, remErr) + } + return err + } + } + + return queue.q.LPush(data) +} + +// RPush pushes data to the right of the queue +func (queue *UniqueQueue) RPush(data []byte) error { + return queue.RPushFunc(data, nil) +} + +// RPushFunc pushes data to the right of the queue and calls the callback if is added +func (queue *UniqueQueue) RPushFunc(data []byte, fn func() error) error { + added, err := queue.set.Add(data) + if err != nil { + return err + } + if !added { + return ErrAlreadyInQueue + } + + if fn != nil { + err = fn() + if err != nil { + _, remErr := queue.set.Remove(data) + if remErr != nil { + return fmt.Errorf("%v & %v", err, remErr) + } + return err + } + } + + return queue.q.RPush(data) +} + +// RPop pop data from the right of the queue +func (queue *UniqueQueue) RPop() ([]byte, error) { + popped, err := queue.q.RPop() + if err != nil { + return popped, err + } + _, err = queue.set.Remove(popped) + + return popped, err +} + +// RHandle receives a user callback function to handle the right element of the queue, if the function returns nil, then delete the element, otherwise keep the element. +func (queue *UniqueQueue) RHandle(h func([]byte) error) error { + return queue.q.RHandle(func(data []byte) error { + err := h(data) + if err != nil { + return err + } + _, err = queue.set.Remove(data) + return err + }) +} + +// LPop pops data from left of the queue +func (queue *UniqueQueue) LPop() ([]byte, error) { + popped, err := queue.q.LPop() + if err != nil { + return popped, err + } + _, err = queue.set.Remove(popped) + + return popped, err +} + +// LHandle receives a user callback function to handle the left element of the queue, if the function returns nil, then delete the element, otherwise keep the element. +func (queue *UniqueQueue) LHandle(h func([]byte) error) error { + return queue.q.LHandle(func(data []byte) error { + err := h(data) + if err != nil { + return err + } + _, err = queue.set.Remove(data) + return err + }) +} + +// Has checks whether the data is already in the queue +func (queue *UniqueQueue) Has(data []byte) (bool, error) { + return queue.set.Has(data) +} + +// Len returns the length of the queue +func (queue *UniqueQueue) Len() int64 { + queue.set.lock.Lock() + defer queue.set.lock.Unlock() + return queue.q.Len() +} + +// Close closes the queue (and the underlying DB if set to closeUnderlyingDB) +func (queue *UniqueQueue) Close() error { + _ = queue.q.Close() + _ = queue.set.Close() + if !queue.closeUnderlyingDB { + queue.db = nil + return nil + } + err := queue.db.Close() + queue.db = nil + return err +} |