diff options
author | zeripath <art27@cantab.net> | 2020-02-02 23:19:58 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-02 23:19:58 +0000 |
commit | 2c903383b5154795b90e4b4ed8eaadc6fac17a13 (patch) | |
tree | d5ca361d9597e027ad92f1e02a841be1d266b554 /vendor/gitea.com/lunny/levelqueue/uniquequeue.go | |
parent | b4914249ee389a733e7dcfd2df20708ab3215827 (diff) | |
download | gitea-2c903383b5154795b90e4b4ed8eaadc6fac17a13.tar.gz gitea-2c903383b5154795b90e4b4ed8eaadc6fac17a13.zip |
Add Unique Queue infrastructure and move TestPullRequests to this (#9856)
* Upgrade levelqueue to version 0.2.0
This adds functionality for Unique Queues
* Add UniqueQueue interface and functions to create them
* Add UniqueQueue implementations
* Move TestPullRequests over to use UniqueQueue
* Reduce code duplication
* Add bytefifos
* Ensure invalid types are logged
* Fix close race in PersistableChannelQueue Shutdown
Diffstat (limited to 'vendor/gitea.com/lunny/levelqueue/uniquequeue.go')
-rw-r--r-- | vendor/gitea.com/lunny/levelqueue/uniquequeue.go | 184 |
1 files changed, 184 insertions, 0 deletions
diff --git a/vendor/gitea.com/lunny/levelqueue/uniquequeue.go b/vendor/gitea.com/lunny/levelqueue/uniquequeue.go new file mode 100644 index 0000000000..8d2676b0d2 --- /dev/null +++ b/vendor/gitea.com/lunny/levelqueue/uniquequeue.go @@ -0,0 +1,184 @@ +// Copyright 2020 Andrew Thornton. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package levelqueue + +import ( + "fmt" + + "github.com/syndtr/goleveldb/leveldb" +) + +const ( + uniqueQueuePrefixStr = "unique" +) + +// UniqueQueue defines an unique queue struct +type UniqueQueue struct { + q *Queue + set *Set + db *leveldb.DB + closeUnderlyingDB bool +} + +// OpenUnique opens an unique queue from the db path or creates a set if it doesn't exist. +// The keys in the queue portion will not be prefixed, and the set keys will be prefixed with "set-" +func OpenUnique(dataDir string) (*UniqueQueue, error) { + db, err := leveldb.OpenFile(dataDir, nil) + if err != nil { + return nil, err + } + return NewUniqueQueue(db, []byte{}, []byte(uniqueQueuePrefixStr), true) +} + +// NewUniqueQueue creates a new unique queue from a db. +// The queue keys will be prefixed with queuePrefix and the set keys with setPrefix +// and at close the db will be closed as per closeUnderlyingDB +func NewUniqueQueue(db *leveldb.DB, queuePrefix []byte, setPrefix []byte, closeUnderlyingDB bool) (*UniqueQueue, error) { + internal, err := NewQueue(db, queuePrefix, false) + if err != nil { + return nil, err + } + set, err := NewSet(db, setPrefix, false) + if err != nil { + return nil, err + } + queue := &UniqueQueue{ + q: internal, + set: set, + db: db, + closeUnderlyingDB: closeUnderlyingDB, + } + + return queue, err +} + +// LPush pushes data to the left of the queue +func (queue *UniqueQueue) LPush(data []byte) error { + return queue.LPushFunc(data, nil) +} + +// LPushFunc pushes data to the left of the queue and calls the callback if it is added +func (queue *UniqueQueue) LPushFunc(data []byte, fn func() error) error { + added, err := queue.set.Add(data) + if err != nil { + return err + } + if !added { + return ErrAlreadyInQueue + } + + if fn != nil { + err = fn() + if err != nil { + _, remErr := queue.set.Remove(data) + if remErr != nil { + return fmt.Errorf("%v & %v", err, remErr) + } + return err + } + } + + return queue.q.LPush(data) +} + +// RPush pushes data to the right of the queue +func (queue *UniqueQueue) RPush(data []byte) error { + return queue.RPushFunc(data, nil) +} + +// RPushFunc pushes data to the right of the queue and calls the callback if is added +func (queue *UniqueQueue) RPushFunc(data []byte, fn func() error) error { + added, err := queue.set.Add(data) + if err != nil { + return err + } + if !added { + return ErrAlreadyInQueue + } + + if fn != nil { + err = fn() + if err != nil { + _, remErr := queue.set.Remove(data) + if remErr != nil { + return fmt.Errorf("%v & %v", err, remErr) + } + return err + } + } + + return queue.q.RPush(data) +} + +// RPop pop data from the right of the queue +func (queue *UniqueQueue) RPop() ([]byte, error) { + popped, err := queue.q.RPop() + if err != nil { + return popped, err + } + _, err = queue.set.Remove(popped) + + return popped, err +} + +// RHandle receives a user callback function to handle the right element of the queue, if the function returns nil, then delete the element, otherwise keep the element. +func (queue *UniqueQueue) RHandle(h func([]byte) error) error { + return queue.q.RHandle(func(data []byte) error { + err := h(data) + if err != nil { + return err + } + _, err = queue.set.Remove(data) + return err + }) +} + +// LPop pops data from left of the queue +func (queue *UniqueQueue) LPop() ([]byte, error) { + popped, err := queue.q.LPop() + if err != nil { + return popped, err + } + _, err = queue.set.Remove(popped) + + return popped, err +} + +// LHandle receives a user callback function to handle the left element of the queue, if the function returns nil, then delete the element, otherwise keep the element. +func (queue *UniqueQueue) LHandle(h func([]byte) error) error { + return queue.q.LHandle(func(data []byte) error { + err := h(data) + if err != nil { + return err + } + _, err = queue.set.Remove(data) + return err + }) +} + +// Has checks whether the data is already in the queue +func (queue *UniqueQueue) Has(data []byte) (bool, error) { + return queue.set.Has(data) +} + +// Len returns the length of the queue +func (queue *UniqueQueue) Len() int64 { + queue.set.lock.Lock() + defer queue.set.lock.Unlock() + return queue.q.Len() +} + +// Close closes the queue (and the underlying DB if set to closeUnderlyingDB) +func (queue *UniqueQueue) Close() error { + _ = queue.q.Close() + _ = queue.set.Close() + if !queue.closeUnderlyingDB { + queue.db = nil + return nil + } + err := queue.db.Close() + queue.db = nil + return err +} |