diff options
Diffstat (limited to 'vendor')
-rw-r--r-- | vendor/github.com/lunny/levelqueue/LICENSE | 19 | ||||
-rw-r--r-- | vendor/github.com/lunny/levelqueue/error.go | 12 | ||||
-rw-r--r-- | vendor/github.com/lunny/levelqueue/queue.go | 214 |
3 files changed, 245 insertions, 0 deletions
diff --git a/vendor/github.com/lunny/levelqueue/LICENSE b/vendor/github.com/lunny/levelqueue/LICENSE new file mode 100644 index 0000000000..4a5a4ea0ff --- /dev/null +++ b/vendor/github.com/lunny/levelqueue/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2019 Lunny Xiao + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/vendor/github.com/lunny/levelqueue/error.go b/vendor/github.com/lunny/levelqueue/error.go new file mode 100644 index 0000000000..d639c5d496 --- /dev/null +++ b/vendor/github.com/lunny/levelqueue/error.go @@ -0,0 +1,12 @@ +// Copyright 2019 Lunny Xiao. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package levelqueue + +import "errors" + +var ( + // ErrNotFound means no element in queue + ErrNotFound = errors.New("no key found") +) diff --git a/vendor/github.com/lunny/levelqueue/queue.go b/vendor/github.com/lunny/levelqueue/queue.go new file mode 100644 index 0000000000..0b2bef6c84 --- /dev/null +++ b/vendor/github.com/lunny/levelqueue/queue.go @@ -0,0 +1,214 @@ +// Copyright 2019 Lunny Xiao. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package levelqueue + +import ( + "bytes" + "encoding/binary" + "sync" + + "github.com/syndtr/goleveldb/leveldb" +) + +// Queue defines a queue struct +type Queue struct { + db *leveldb.DB + highLock sync.Mutex + lowLock sync.Mutex + low int64 + high int64 +} + +// Open opens a queue object or create it if not exist +func Open(dataDir string) (*Queue, error) { + db, err := leveldb.OpenFile(dataDir, nil) + if err != nil { + return nil, err + } + + var queue = &Queue{ + db: db, + } + queue.low, err = queue.readID(lowKey) + if err == leveldb.ErrNotFound { + queue.low = 1 + err = db.Put(lowKey, id2bytes(1), nil) + } + if err != nil { + return nil, err + } + + queue.high, err = queue.readID(highKey) + if err == leveldb.ErrNotFound { + err = db.Put(highKey, id2bytes(0), nil) + } + if err != nil { + return nil, err + } + + return queue, nil +} + +func (queue *Queue) readID(key []byte) (int64, error) { + bs, err := queue.db.Get(key, nil) + if err != nil { + return 0, err + } + return bytes2id(bs) +} + +var ( + lowKey = []byte("low") + highKey = []byte("high") +) + +func (queue *Queue) highincrement() (int64, error) { + id := queue.high + 1 + queue.high = id + err := queue.db.Put(highKey, id2bytes(queue.high), nil) + if err != nil { + queue.high = queue.high - 1 + return 0, err + } + return id, nil +} + +func (queue *Queue) highdecrement() (int64, error) { + queue.high = queue.high - 1 + err := queue.db.Put(highKey, id2bytes(queue.high), nil) + if err != nil { + queue.high = queue.high + 1 + return 0, err + } + return queue.high, nil +} + +func (queue *Queue) lowincrement() (int64, error) { + queue.low = queue.low + 1 + err := queue.db.Put(lowKey, id2bytes(queue.low), nil) + if err != nil { + queue.low = queue.low - 1 + return 0, err + } + return queue.low, nil +} + +func (queue *Queue) lowdecrement() (int64, error) { + queue.low = queue.low - 1 + err := queue.db.Put(lowKey, id2bytes(queue.low), nil) + if err != nil { + queue.low = queue.low + 1 + return 0, err + } + return queue.low, nil +} + +// Len returns the length of the queue +func (queue *Queue) Len() int64 { + queue.lowLock.Lock() + queue.highLock.Lock() + l := queue.high - queue.low + 1 + queue.highLock.Unlock() + queue.lowLock.Unlock() + return l +} + +func id2bytes(id int64) []byte { + var buf = make([]byte, 8) + binary.PutVarint(buf, id) + return buf +} + +func bytes2id(b []byte) (int64, error) { + return binary.ReadVarint(bytes.NewReader(b)) +} + +// RPush pushes a data from right of queue +func (queue *Queue) RPush(data []byte) error { + queue.highLock.Lock() + id, err := queue.highincrement() + if err != nil { + queue.highLock.Unlock() + return err + } + err = queue.db.Put(id2bytes(id), data, nil) + queue.highLock.Unlock() + return err +} + +// LPush pushes a data from left of queue +func (queue *Queue) LPush(data []byte) error { + queue.highLock.Lock() + id, err := queue.lowdecrement() + if err != nil { + queue.highLock.Unlock() + return err + } + err = queue.db.Put(id2bytes(id), data, nil) + queue.highLock.Unlock() + return err +} + +// RPop pop a data from right of queue +func (queue *Queue) RPop() ([]byte, error) { + queue.highLock.Lock() + currentID := queue.high + + res, err := queue.db.Get(id2bytes(currentID), nil) + if err != nil { + queue.highLock.Unlock() + if err == leveldb.ErrNotFound { + return nil, ErrNotFound + } + return nil, err + } + + _, err = queue.highdecrement() + if err != nil { + queue.highLock.Unlock() + return nil, err + } + + err = queue.db.Delete(id2bytes(currentID), nil) + queue.highLock.Unlock() + if err != nil { + return nil, err + } + return res, nil +} + +// LPop pop a data from left of queue +func (queue *Queue) LPop() ([]byte, error) { + queue.lowLock.Lock() + currentID := queue.low + + res, err := queue.db.Get(id2bytes(currentID), nil) + if err != nil { + queue.lowLock.Unlock() + if err == leveldb.ErrNotFound { + return nil, ErrNotFound + } + return nil, err + } + + _, err = queue.lowincrement() + if err != nil { + return nil, err + } + + err = queue.db.Delete(id2bytes(currentID), nil) + queue.lowLock.Unlock() + if err != nil { + return nil, err + } + return res, nil +} + +// Close closes the queue +func (queue *Queue) Close() error { + err := queue.db.Close() + queue.db = nil + return err +} |