summaryrefslogtreecommitdiffstats
path: root/vendor/gitea.com/lunny/levelqueue/queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/gitea.com/lunny/levelqueue/queue.go')
-rw-r--r--vendor/gitea.com/lunny/levelqueue/queue.go97
1 files changed, 66 insertions, 31 deletions
diff --git a/vendor/gitea.com/lunny/levelqueue/queue.go b/vendor/gitea.com/lunny/levelqueue/queue.go
index af624db8e4..20ed90100c 100644
--- a/vendor/gitea.com/lunny/levelqueue/queue.go
+++ b/vendor/gitea.com/lunny/levelqueue/queue.go
@@ -12,37 +12,62 @@ import (
"github.com/syndtr/goleveldb/leveldb"
)
+const (
+ lowKeyStr = "low"
+ highKeyStr = "high"
+)
+
// Queue defines a queue struct
type Queue struct {
- db *leveldb.DB
- highLock sync.Mutex
- lowLock sync.Mutex
- low int64
- high int64
+ db *leveldb.DB
+ highLock sync.Mutex
+ lowLock sync.Mutex
+ low int64
+ high int64
+ lowKey []byte
+ highKey []byte
+ prefix []byte
+ closeUnderlyingDB bool
}
-// Open opens a queue object or create it if not exist
+// Open opens a queue from the db path or creates a
+// queue if it doesn't exist.
+// The keys will not be prefixed by default
func Open(dataDir string) (*Queue, error) {
db, err := leveldb.OpenFile(dataDir, nil)
if err != nil {
return nil, err
}
+ return NewQueue(db, []byte{}, true)
+}
+
+// NewQueue creates a queue from a db. The keys will be prefixed with prefix
+// and at close the db will be closed as per closeUnderlyingDB
+func NewQueue(db *leveldb.DB, prefix []byte, closeUnderlyingDB bool) (*Queue, error) {
+ var err error
var queue = &Queue{
- db: db,
+ db: db,
+ closeUnderlyingDB: closeUnderlyingDB,
}
- queue.low, err = queue.readID(lowKey)
+
+ queue.prefix = make([]byte, len(prefix))
+ copy(queue.prefix, prefix)
+ queue.lowKey = withPrefix(prefix, []byte(lowKeyStr))
+ queue.highKey = withPrefix(prefix, []byte(highKeyStr))
+
+ queue.low, err = queue.readID(queue.lowKey)
if err == leveldb.ErrNotFound {
queue.low = 1
- err = db.Put(lowKey, id2bytes(1), nil)
+ err = db.Put(queue.lowKey, id2bytes(1), nil)
}
if err != nil {
return nil, err
}
- queue.high, err = queue.readID(highKey)
+ queue.high, err = queue.readID(queue.highKey)
if err == leveldb.ErrNotFound {
- err = db.Put(highKey, id2bytes(0), nil)
+ err = db.Put(queue.highKey, id2bytes(0), nil)
}
if err != nil {
return nil, err
@@ -59,15 +84,10 @@ func (queue *Queue) readID(key []byte) (int64, error) {
return bytes2id(bs)
}
-var (
- lowKey = []byte("low")
- highKey = []byte("high")
-)
-
func (queue *Queue) highincrement() (int64, error) {
id := queue.high + 1
queue.high = id
- err := queue.db.Put(highKey, id2bytes(queue.high), nil)
+ err := queue.db.Put(queue.highKey, id2bytes(queue.high), nil)
if err != nil {
queue.high = queue.high - 1
return 0, err
@@ -77,7 +97,7 @@ func (queue *Queue) highincrement() (int64, error) {
func (queue *Queue) highdecrement() (int64, error) {
queue.high = queue.high - 1
- err := queue.db.Put(highKey, id2bytes(queue.high), nil)
+ err := queue.db.Put(queue.highKey, id2bytes(queue.high), nil)
if err != nil {
queue.high = queue.high + 1
return 0, err
@@ -87,7 +107,7 @@ func (queue *Queue) highdecrement() (int64, error) {
func (queue *Queue) lowincrement() (int64, error) {
queue.low = queue.low + 1
- err := queue.db.Put(lowKey, id2bytes(queue.low), nil)
+ err := queue.db.Put(queue.lowKey, id2bytes(queue.low), nil)
if err != nil {
queue.low = queue.low - 1
return 0, err
@@ -97,7 +117,7 @@ func (queue *Queue) lowincrement() (int64, error) {
func (queue *Queue) lowdecrement() (int64, error) {
queue.low = queue.low - 1
- err := queue.db.Put(lowKey, id2bytes(queue.low), nil)
+ err := queue.db.Put(queue.lowKey, id2bytes(queue.low), nil)
if err != nil {
queue.low = queue.low + 1
return 0, err
@@ -125,6 +145,17 @@ func bytes2id(b []byte) (int64, error) {
return binary.ReadVarint(bytes.NewReader(b))
}
+func withPrefix(prefix []byte, value []byte) []byte {
+ if len(prefix) == 0 {
+ return value
+ }
+ prefixed := make([]byte, len(prefix)+1+len(value))
+ copy(prefixed[0:len(prefix)], prefix)
+ prefixed[len(prefix)] = '-'
+ copy(prefixed[len(prefix)+1:], value)
+ return prefixed
+}
+
// RPush pushes a data from right of queue
func (queue *Queue) RPush(data []byte) error {
queue.highLock.Lock()
@@ -133,7 +164,7 @@ func (queue *Queue) RPush(data []byte) error {
queue.highLock.Unlock()
return err
}
- err = queue.db.Put(id2bytes(id), data, nil)
+ err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil)
queue.highLock.Unlock()
return err
}
@@ -146,7 +177,7 @@ func (queue *Queue) LPush(data []byte) error {
queue.lowLock.Unlock()
return err
}
- err = queue.db.Put(id2bytes(id), data, nil)
+ err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil)
queue.lowLock.Unlock()
return err
}
@@ -157,7 +188,7 @@ func (queue *Queue) RPop() ([]byte, error) {
defer queue.highLock.Unlock()
currentID := queue.high
- res, err := queue.db.Get(id2bytes(currentID), nil)
+ res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
if err != nil {
if err == leveldb.ErrNotFound {
return nil, ErrNotFound
@@ -170,7 +201,7 @@ func (queue *Queue) RPop() ([]byte, error) {
return nil, err
}
- err = queue.db.Delete(id2bytes(currentID), nil)
+ err = queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
if err != nil {
return nil, err
}
@@ -183,7 +214,7 @@ func (queue *Queue) RHandle(h func([]byte) error) error {
defer queue.highLock.Unlock()
currentID := queue.high
- res, err := queue.db.Get(id2bytes(currentID), nil)
+ res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
if err != nil {
if err == leveldb.ErrNotFound {
return ErrNotFound
@@ -200,7 +231,7 @@ func (queue *Queue) RHandle(h func([]byte) error) error {
return err
}
- return queue.db.Delete(id2bytes(currentID), nil)
+ return queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
}
// LPop pop a data from left of queue
@@ -209,7 +240,7 @@ func (queue *Queue) LPop() ([]byte, error) {
defer queue.lowLock.Unlock()
currentID := queue.low
- res, err := queue.db.Get(id2bytes(currentID), nil)
+ res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
if err != nil {
if err == leveldb.ErrNotFound {
return nil, ErrNotFound
@@ -222,7 +253,7 @@ func (queue *Queue) LPop() ([]byte, error) {
return nil, err
}
- err = queue.db.Delete(id2bytes(currentID), nil)
+ err = queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
if err != nil {
return nil, err
}
@@ -235,7 +266,7 @@ func (queue *Queue) LHandle(h func([]byte) error) error {
defer queue.lowLock.Unlock()
currentID := queue.low
- res, err := queue.db.Get(id2bytes(currentID), nil)
+ res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
if err != nil {
if err == leveldb.ErrNotFound {
return ErrNotFound
@@ -252,11 +283,15 @@ func (queue *Queue) LHandle(h func([]byte) error) error {
return err
}
- return queue.db.Delete(id2bytes(currentID), nil)
+ return queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
}
-// Close closes the queue
+// Close closes the queue (and the underlying db is set to closeUnderlyingDB)
func (queue *Queue) Close() error {
+ if !queue.closeUnderlyingDB {
+ queue.db = nil
+ return nil
+ }
err := queue.db.Close()
queue.db = nil
return err