diff options
Diffstat (limited to 'vendor/gitea.com/lunny/levelqueue/queue.go')
-rw-r--r-- | vendor/gitea.com/lunny/levelqueue/queue.go | 97 |
1 files changed, 66 insertions, 31 deletions
diff --git a/vendor/gitea.com/lunny/levelqueue/queue.go b/vendor/gitea.com/lunny/levelqueue/queue.go index af624db8e4..20ed90100c 100644 --- a/vendor/gitea.com/lunny/levelqueue/queue.go +++ b/vendor/gitea.com/lunny/levelqueue/queue.go @@ -12,37 +12,62 @@ import ( "github.com/syndtr/goleveldb/leveldb" ) +const ( + lowKeyStr = "low" + highKeyStr = "high" +) + // Queue defines a queue struct type Queue struct { - db *leveldb.DB - highLock sync.Mutex - lowLock sync.Mutex - low int64 - high int64 + db *leveldb.DB + highLock sync.Mutex + lowLock sync.Mutex + low int64 + high int64 + lowKey []byte + highKey []byte + prefix []byte + closeUnderlyingDB bool } -// Open opens a queue object or create it if not exist +// Open opens a queue from the db path or creates a +// queue if it doesn't exist. +// The keys will not be prefixed by default func Open(dataDir string) (*Queue, error) { db, err := leveldb.OpenFile(dataDir, nil) if err != nil { return nil, err } + return NewQueue(db, []byte{}, true) +} + +// NewQueue creates a queue from a db. The keys will be prefixed with prefix +// and at close the db will be closed as per closeUnderlyingDB +func NewQueue(db *leveldb.DB, prefix []byte, closeUnderlyingDB bool) (*Queue, error) { + var err error var queue = &Queue{ - db: db, + db: db, + closeUnderlyingDB: closeUnderlyingDB, } - queue.low, err = queue.readID(lowKey) + + queue.prefix = make([]byte, len(prefix)) + copy(queue.prefix, prefix) + queue.lowKey = withPrefix(prefix, []byte(lowKeyStr)) + queue.highKey = withPrefix(prefix, []byte(highKeyStr)) + + queue.low, err = queue.readID(queue.lowKey) if err == leveldb.ErrNotFound { queue.low = 1 - err = db.Put(lowKey, id2bytes(1), nil) + err = db.Put(queue.lowKey, id2bytes(1), nil) } if err != nil { return nil, err } - queue.high, err = queue.readID(highKey) + queue.high, err = queue.readID(queue.highKey) if err == leveldb.ErrNotFound { - err = db.Put(highKey, id2bytes(0), nil) + err = db.Put(queue.highKey, id2bytes(0), nil) } if err != nil { return nil, err @@ -59,15 +84,10 @@ func (queue *Queue) readID(key []byte) (int64, error) { return bytes2id(bs) } -var ( - lowKey = []byte("low") - highKey = []byte("high") -) - func (queue *Queue) highincrement() (int64, error) { id := queue.high + 1 queue.high = id - err := queue.db.Put(highKey, id2bytes(queue.high), nil) + err := queue.db.Put(queue.highKey, id2bytes(queue.high), nil) if err != nil { queue.high = queue.high - 1 return 0, err @@ -77,7 +97,7 @@ func (queue *Queue) highincrement() (int64, error) { func (queue *Queue) highdecrement() (int64, error) { queue.high = queue.high - 1 - err := queue.db.Put(highKey, id2bytes(queue.high), nil) + err := queue.db.Put(queue.highKey, id2bytes(queue.high), nil) if err != nil { queue.high = queue.high + 1 return 0, err @@ -87,7 +107,7 @@ func (queue *Queue) highdecrement() (int64, error) { func (queue *Queue) lowincrement() (int64, error) { queue.low = queue.low + 1 - err := queue.db.Put(lowKey, id2bytes(queue.low), nil) + err := queue.db.Put(queue.lowKey, id2bytes(queue.low), nil) if err != nil { queue.low = queue.low - 1 return 0, err @@ -97,7 +117,7 @@ func (queue *Queue) lowincrement() (int64, error) { func (queue *Queue) lowdecrement() (int64, error) { queue.low = queue.low - 1 - err := queue.db.Put(lowKey, id2bytes(queue.low), nil) + err := queue.db.Put(queue.lowKey, id2bytes(queue.low), nil) if err != nil { queue.low = queue.low + 1 return 0, err @@ -125,6 +145,17 @@ func bytes2id(b []byte) (int64, error) { return binary.ReadVarint(bytes.NewReader(b)) } +func withPrefix(prefix []byte, value []byte) []byte { + if len(prefix) == 0 { + return value + } + prefixed := make([]byte, len(prefix)+1+len(value)) + copy(prefixed[0:len(prefix)], prefix) + prefixed[len(prefix)] = '-' + copy(prefixed[len(prefix)+1:], value) + return prefixed +} + // RPush pushes a data from right of queue func (queue *Queue) RPush(data []byte) error { queue.highLock.Lock() @@ -133,7 +164,7 @@ func (queue *Queue) RPush(data []byte) error { queue.highLock.Unlock() return err } - err = queue.db.Put(id2bytes(id), data, nil) + err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil) queue.highLock.Unlock() return err } @@ -146,7 +177,7 @@ func (queue *Queue) LPush(data []byte) error { queue.lowLock.Unlock() return err } - err = queue.db.Put(id2bytes(id), data, nil) + err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil) queue.lowLock.Unlock() return err } @@ -157,7 +188,7 @@ func (queue *Queue) RPop() ([]byte, error) { defer queue.highLock.Unlock() currentID := queue.high - res, err := queue.db.Get(id2bytes(currentID), nil) + res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil) if err != nil { if err == leveldb.ErrNotFound { return nil, ErrNotFound @@ -170,7 +201,7 @@ func (queue *Queue) RPop() ([]byte, error) { return nil, err } - err = queue.db.Delete(id2bytes(currentID), nil) + err = queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil) if err != nil { return nil, err } @@ -183,7 +214,7 @@ func (queue *Queue) RHandle(h func([]byte) error) error { defer queue.highLock.Unlock() currentID := queue.high - res, err := queue.db.Get(id2bytes(currentID), nil) + res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil) if err != nil { if err == leveldb.ErrNotFound { return ErrNotFound @@ -200,7 +231,7 @@ func (queue *Queue) RHandle(h func([]byte) error) error { return err } - return queue.db.Delete(id2bytes(currentID), nil) + return queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil) } // LPop pop a data from left of queue @@ -209,7 +240,7 @@ func (queue *Queue) LPop() ([]byte, error) { defer queue.lowLock.Unlock() currentID := queue.low - res, err := queue.db.Get(id2bytes(currentID), nil) + res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil) if err != nil { if err == leveldb.ErrNotFound { return nil, ErrNotFound @@ -222,7 +253,7 @@ func (queue *Queue) LPop() ([]byte, error) { return nil, err } - err = queue.db.Delete(id2bytes(currentID), nil) + err = queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil) if err != nil { return nil, err } @@ -235,7 +266,7 @@ func (queue *Queue) LHandle(h func([]byte) error) error { defer queue.lowLock.Unlock() currentID := queue.low - res, err := queue.db.Get(id2bytes(currentID), nil) + res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil) if err != nil { if err == leveldb.ErrNotFound { return ErrNotFound @@ -252,11 +283,15 @@ func (queue *Queue) LHandle(h func([]byte) error) error { return err } - return queue.db.Delete(id2bytes(currentID), nil) + return queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil) } -// Close closes the queue +// Close closes the queue (and the underlying db is set to closeUnderlyingDB) func (queue *Queue) Close() error { + if !queue.closeUnderlyingDB { + queue.db = nil + return nil + } err := queue.db.Close() queue.db = nil return err |