You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

queue.go 6.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. // Copyright 2019 Lunny Xiao. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package levelqueue
  5. import (
  6. "bytes"
  7. "encoding/binary"
  8. "sync"
  9. "github.com/syndtr/goleveldb/leveldb"
  10. "github.com/syndtr/goleveldb/leveldb/errors"
  11. )
  // lowKeyStr is the unprefixed name of the key that stores the queue's low
  // ID counter.
  const lowKeyStr = "low"

  // highKeyStr is the unprefixed name of the key that stores the queue's high
  // ID counter.
  const highKeyStr = "high"
  16. // Queue defines a queue struct
  17. type Queue struct {
  18. db *leveldb.DB
  19. highLock sync.Mutex
  20. lowLock sync.Mutex
  21. low int64
  22. high int64
  23. lowKey []byte
  24. highKey []byte
  25. prefix []byte
  26. closeUnderlyingDB bool
  27. }
  28. // Open opens a queue from the db path or creates a
  29. // queue if it doesn't exist.
  30. // The keys will not be prefixed by default
  31. func Open(dataDir string) (*Queue, error) {
  32. db, err := leveldb.OpenFile(dataDir, nil)
  33. if err != nil {
  34. if !errors.IsCorrupted(err) {
  35. return nil, err
  36. }
  37. db, err = leveldb.RecoverFile(dataDir, nil)
  38. if err != nil {
  39. return nil, err
  40. }
  41. }
  42. return NewQueue(db, []byte{}, true)
  43. }
  44. // NewQueue creates a queue from a db. The keys will be prefixed with prefix
  45. // and at close the db will be closed as per closeUnderlyingDB
  46. func NewQueue(db *leveldb.DB, prefix []byte, closeUnderlyingDB bool) (*Queue, error) {
  47. var err error
  48. var queue = &Queue{
  49. db: db,
  50. closeUnderlyingDB: closeUnderlyingDB,
  51. }
  52. queue.prefix = make([]byte, len(prefix))
  53. copy(queue.prefix, prefix)
  54. queue.lowKey = withPrefix(prefix, []byte(lowKeyStr))
  55. queue.highKey = withPrefix(prefix, []byte(highKeyStr))
  56. queue.low, err = queue.readID(queue.lowKey)
  57. if err == leveldb.ErrNotFound {
  58. queue.low = 1
  59. err = db.Put(queue.lowKey, id2bytes(1), nil)
  60. }
  61. if err != nil {
  62. return nil, err
  63. }
  64. queue.high, err = queue.readID(queue.highKey)
  65. if err == leveldb.ErrNotFound {
  66. err = db.Put(queue.highKey, id2bytes(0), nil)
  67. }
  68. if err != nil {
  69. return nil, err
  70. }
  71. return queue, nil
  72. }
  73. func (queue *Queue) readID(key []byte) (int64, error) {
  74. bs, err := queue.db.Get(key, nil)
  75. if err != nil {
  76. return 0, err
  77. }
  78. return bytes2id(bs)
  79. }
  80. func (queue *Queue) highincrement() (int64, error) {
  81. id := queue.high + 1
  82. queue.high = id
  83. err := queue.db.Put(queue.highKey, id2bytes(queue.high), nil)
  84. if err != nil {
  85. queue.high = queue.high - 1
  86. return 0, err
  87. }
  88. return id, nil
  89. }
  90. func (queue *Queue) highdecrement() (int64, error) {
  91. queue.high = queue.high - 1
  92. err := queue.db.Put(queue.highKey, id2bytes(queue.high), nil)
  93. if err != nil {
  94. queue.high = queue.high + 1
  95. return 0, err
  96. }
  97. return queue.high, nil
  98. }
  99. func (queue *Queue) lowincrement() (int64, error) {
  100. queue.low = queue.low + 1
  101. err := queue.db.Put(queue.lowKey, id2bytes(queue.low), nil)
  102. if err != nil {
  103. queue.low = queue.low - 1
  104. return 0, err
  105. }
  106. return queue.low, nil
  107. }
  108. func (queue *Queue) lowdecrement() (int64, error) {
  109. queue.low = queue.low - 1
  110. err := queue.db.Put(queue.lowKey, id2bytes(queue.low), nil)
  111. if err != nil {
  112. queue.low = queue.low + 1
  113. return 0, err
  114. }
  115. return queue.low, nil
  116. }
  117. // Len returns the length of the queue
  118. func (queue *Queue) Len() int64 {
  119. queue.lowLock.Lock()
  120. queue.highLock.Lock()
  121. l := queue.high - queue.low + 1
  122. queue.highLock.Unlock()
  123. queue.lowLock.Unlock()
  124. return l
  125. }
  // id2bytes encodes id as a varint for use as a database key or counter value.
  //
  // The result is zero-padded to at least 8 bytes, which is the fixed width
  // this encoding has always produced, so existing keys keep their exact byte
  // form. IDs whose varint needs more than 8 bytes (|id| >= 2^55) are returned
  // at their natural length of 9 or 10 bytes instead of overflowing the buffer,
  // which made binary.PutVarint panic.
  func id2bytes(id int64) []byte {
  	// minLen is the historical fixed width of an encoded ID.
  	const minLen = 8

  	buf := make([]byte, binary.MaxVarintLen64)
  	n := binary.PutVarint(buf, id)
  	if n < minLen {
  		n = minLen
  	}
  	// Cap the slice at its length so callers never see the spare bytes.
  	return buf[:n:n]
  }
  // bytes2id decodes the varint at the start of b into an ID. Bytes following
  // the varint are ignored; a decoding failure is returned as the error.
  func bytes2id(b []byte) (int64, error) {
  	r := bytes.NewReader(b)
  	return binary.ReadVarint(r)
  }
  // withPrefix returns prefix + "-" + value in a newly allocated slice.
  // When prefix is empty, value itself is returned unchanged (not copied).
  func withPrefix(prefix []byte, value []byte) []byte {
  	if len(prefix) == 0 {
  		return value
  	}
  	key := make([]byte, 0, len(prefix)+1+len(value))
  	key = append(key, prefix...)
  	key = append(key, '-')
  	key = append(key, value...)
  	return key
  }
  144. // RPush pushes a data from right of queue
  145. func (queue *Queue) RPush(data []byte) error {
  146. queue.highLock.Lock()
  147. id, err := queue.highincrement()
  148. if err != nil {
  149. queue.highLock.Unlock()
  150. return err
  151. }
  152. err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil)
  153. queue.highLock.Unlock()
  154. return err
  155. }
  156. // LPush pushes a data from left of queue
  157. func (queue *Queue) LPush(data []byte) error {
  158. queue.lowLock.Lock()
  159. id, err := queue.lowdecrement()
  160. if err != nil {
  161. queue.lowLock.Unlock()
  162. return err
  163. }
  164. err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil)
  165. queue.lowLock.Unlock()
  166. return err
  167. }
  168. // RPop pop a data from right of queue
  169. func (queue *Queue) RPop() ([]byte, error) {
  170. queue.highLock.Lock()
  171. defer queue.highLock.Unlock()
  172. currentID := queue.high
  173. res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
  174. if err != nil {
  175. if err == leveldb.ErrNotFound {
  176. return nil, ErrNotFound
  177. }
  178. return nil, err
  179. }
  180. _, err = queue.highdecrement()
  181. if err != nil {
  182. return nil, err
  183. }
  184. err = queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
  185. if err != nil {
  186. return nil, err
  187. }
  188. return res, nil
  189. }
  190. // RHandle receives a user callback function to handle the right element of the queue, if function return nil, then delete the element, otherwise keep the element.
  191. func (queue *Queue) RHandle(h func([]byte) error) error {
  192. queue.highLock.Lock()
  193. defer queue.highLock.Unlock()
  194. currentID := queue.high
  195. res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
  196. if err != nil {
  197. if err == leveldb.ErrNotFound {
  198. return ErrNotFound
  199. }
  200. return err
  201. }
  202. if err = h(res); err != nil {
  203. return err
  204. }
  205. _, err = queue.highdecrement()
  206. if err != nil {
  207. return err
  208. }
  209. return queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
  210. }
  211. // LPop pop a data from left of queue
  212. func (queue *Queue) LPop() ([]byte, error) {
  213. queue.lowLock.Lock()
  214. defer queue.lowLock.Unlock()
  215. currentID := queue.low
  216. res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
  217. if err != nil {
  218. if err == leveldb.ErrNotFound {
  219. return nil, ErrNotFound
  220. }
  221. return nil, err
  222. }
  223. _, err = queue.lowincrement()
  224. if err != nil {
  225. return nil, err
  226. }
  227. err = queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
  228. if err != nil {
  229. return nil, err
  230. }
  231. return res, nil
  232. }
  233. // LHandle receives a user callback function to handle the left element of the queue, if function return nil, then delete the element, otherwise keep the element.
  234. func (queue *Queue) LHandle(h func([]byte) error) error {
  235. queue.lowLock.Lock()
  236. defer queue.lowLock.Unlock()
  237. currentID := queue.low
  238. res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
  239. if err != nil {
  240. if err == leveldb.ErrNotFound {
  241. return ErrNotFound
  242. }
  243. return err
  244. }
  245. if err = h(res); err != nil {
  246. return err
  247. }
  248. _, err = queue.lowincrement()
  249. if err != nil {
  250. return err
  251. }
  252. return queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
  253. }
  254. // Close closes the queue (and the underlying db is set to closeUnderlyingDB)
  255. func (queue *Queue) Close() error {
  256. if !queue.closeUnderlyingDB {
  257. queue.db = nil
  258. return nil
  259. }
  260. err := queue.db.Close()
  261. queue.db = nil
  262. return err
  263. }