summaryrefslogtreecommitdiffstats
path: root/vendor
diff options
context:
space:
mode:
Diffstat (limited to 'vendor')
-rw-r--r--vendor/github.com/lunny/levelqueue/LICENSE19
-rw-r--r--vendor/github.com/lunny/levelqueue/error.go12
-rw-r--r--vendor/github.com/lunny/levelqueue/queue.go214
3 files changed, 245 insertions, 0 deletions
diff --git a/vendor/github.com/lunny/levelqueue/LICENSE b/vendor/github.com/lunny/levelqueue/LICENSE
new file mode 100644
index 0000000000..4a5a4ea0ff
--- /dev/null
+++ b/vendor/github.com/lunny/levelqueue/LICENSE
@@ -0,0 +1,19 @@
+Copyright (c) 2019 Lunny Xiao
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/vendor/github.com/lunny/levelqueue/error.go b/vendor/github.com/lunny/levelqueue/error.go
new file mode 100644
index 0000000000..d639c5d496
--- /dev/null
+++ b/vendor/github.com/lunny/levelqueue/error.go
@@ -0,0 +1,12 @@
+// Copyright 2019 Lunny Xiao. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package levelqueue
+
+import "errors"
+
+var (
+ // ErrNotFound means no element in queue
+ ErrNotFound = errors.New("no key found")
+)
diff --git a/vendor/github.com/lunny/levelqueue/queue.go b/vendor/github.com/lunny/levelqueue/queue.go
new file mode 100644
index 0000000000..0b2bef6c84
--- /dev/null
+++ b/vendor/github.com/lunny/levelqueue/queue.go
@@ -0,0 +1,214 @@
+// Copyright 2019 Lunny Xiao. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package levelqueue
+
+import (
+ "bytes"
+ "encoding/binary"
+ "sync"
+
+ "github.com/syndtr/goleveldb/leveldb"
+)
+
+// Queue defines a queue struct
+type Queue struct {
+ db *leveldb.DB
+ highLock sync.Mutex
+ lowLock sync.Mutex
+ low int64
+ high int64
+}
+
+// Open opens a queue object or create it if not exist
+func Open(dataDir string) (*Queue, error) {
+ db, err := leveldb.OpenFile(dataDir, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ var queue = &Queue{
+ db: db,
+ }
+ queue.low, err = queue.readID(lowKey)
+ if err == leveldb.ErrNotFound {
+ queue.low = 1
+ err = db.Put(lowKey, id2bytes(1), nil)
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ queue.high, err = queue.readID(highKey)
+ if err == leveldb.ErrNotFound {
+ err = db.Put(highKey, id2bytes(0), nil)
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ return queue, nil
+}
+
+func (queue *Queue) readID(key []byte) (int64, error) {
+ bs, err := queue.db.Get(key, nil)
+ if err != nil {
+ return 0, err
+ }
+ return bytes2id(bs)
+}
+
+var (
+ lowKey = []byte("low")
+ highKey = []byte("high")
+)
+
+func (queue *Queue) highincrement() (int64, error) {
+ id := queue.high + 1
+ queue.high = id
+ err := queue.db.Put(highKey, id2bytes(queue.high), nil)
+ if err != nil {
+ queue.high = queue.high - 1
+ return 0, err
+ }
+ return id, nil
+}
+
+func (queue *Queue) highdecrement() (int64, error) {
+ queue.high = queue.high - 1
+ err := queue.db.Put(highKey, id2bytes(queue.high), nil)
+ if err != nil {
+ queue.high = queue.high + 1
+ return 0, err
+ }
+ return queue.high, nil
+}
+
+func (queue *Queue) lowincrement() (int64, error) {
+ queue.low = queue.low + 1
+ err := queue.db.Put(lowKey, id2bytes(queue.low), nil)
+ if err != nil {
+ queue.low = queue.low - 1
+ return 0, err
+ }
+ return queue.low, nil
+}
+
+func (queue *Queue) lowdecrement() (int64, error) {
+ queue.low = queue.low - 1
+ err := queue.db.Put(lowKey, id2bytes(queue.low), nil)
+ if err != nil {
+ queue.low = queue.low + 1
+ return 0, err
+ }
+ return queue.low, nil
+}
+
+// Len returns the length of the queue
+func (queue *Queue) Len() int64 {
+ queue.lowLock.Lock()
+ queue.highLock.Lock()
+ l := queue.high - queue.low + 1
+ queue.highLock.Unlock()
+ queue.lowLock.Unlock()
+ return l
+}
+
+func id2bytes(id int64) []byte {
+ var buf = make([]byte, 8)
+ binary.PutVarint(buf, id)
+ return buf
+}
+
+func bytes2id(b []byte) (int64, error) {
+ return binary.ReadVarint(bytes.NewReader(b))
+}
+
+// RPush pushes a data from right of queue
+func (queue *Queue) RPush(data []byte) error {
+ queue.highLock.Lock()
+ id, err := queue.highincrement()
+ if err != nil {
+ queue.highLock.Unlock()
+ return err
+ }
+ err = queue.db.Put(id2bytes(id), data, nil)
+ queue.highLock.Unlock()
+ return err
+}
+
+// LPush pushes a data from left of queue
+func (queue *Queue) LPush(data []byte) error {
+ queue.highLock.Lock()
+ id, err := queue.lowdecrement()
+ if err != nil {
+ queue.highLock.Unlock()
+ return err
+ }
+ err = queue.db.Put(id2bytes(id), data, nil)
+ queue.highLock.Unlock()
+ return err
+}
+
+// RPop pop a data from right of queue
+func (queue *Queue) RPop() ([]byte, error) {
+ queue.highLock.Lock()
+ currentID := queue.high
+
+ res, err := queue.db.Get(id2bytes(currentID), nil)
+ if err != nil {
+ queue.highLock.Unlock()
+ if err == leveldb.ErrNotFound {
+ return nil, ErrNotFound
+ }
+ return nil, err
+ }
+
+ _, err = queue.highdecrement()
+ if err != nil {
+ queue.highLock.Unlock()
+ return nil, err
+ }
+
+ err = queue.db.Delete(id2bytes(currentID), nil)
+ queue.highLock.Unlock()
+ if err != nil {
+ return nil, err
+ }
+ return res, nil
+}
+
+// LPop pop a data from left of queue
+func (queue *Queue) LPop() ([]byte, error) {
+ queue.lowLock.Lock()
+ currentID := queue.low
+
+ res, err := queue.db.Get(id2bytes(currentID), nil)
+ if err != nil {
+ queue.lowLock.Unlock()
+ if err == leveldb.ErrNotFound {
+ return nil, ErrNotFound
+ }
+ return nil, err
+ }
+
+ _, err = queue.lowincrement()
+ if err != nil {
+ return nil, err
+ }
+
+ err = queue.db.Delete(id2bytes(currentID), nil)
+ queue.lowLock.Unlock()
+ if err != nil {
+ return nil, err
+ }
+ return res, nil
+}
+
+// Close closes the queue
+func (queue *Queue) Close() error {
+ err := queue.db.Close()
+ queue.db = nil
+ return err
+}