You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

t_list.go 9.0KB


  1. package nodb
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "time"
  6. "github.com/lunny/nodb/store"
  7. )
  // Sequence constants for list storage. listHeadSeq and listTailSeq are
  // selectors telling the push/pop helpers which end of the list to operate on;
  // the remaining values bound the int32 sequence numbers given to elements.
  const (
  	listHeadSeq int32 = 1
  	listTailSeq int32 = 2

  	// Element sequences must stay strictly between listMinSeq and listMaxSeq.
  	listMinSeq int32 = 1000
  	listMaxSeq int32 = 1<<31 - 1000

  	// listInitialSeq is the midpoint of the allowed range; an empty list
  	// starts with both head and tail here.
  	listInitialSeq int32 = listMinSeq + (listMaxSeq-listMinSeq)/2
  )

  // Errors reported by the list key codecs and by pushes that would leave the
  // allowed sequence range.
  var (
  	errLMetaKey = errors.New("invalid lmeta key")
  	errListKey  = errors.New("invalid list key")
  	errListSeq  = errors.New("invalid list sequence, overflow")
  )
  18. func (db *DB) lEncodeMetaKey(key []byte) []byte {
  19. buf := make([]byte, len(key)+2)
  20. buf[0] = db.index
  21. buf[1] = LMetaType
  22. copy(buf[2:], key)
  23. return buf
  24. }
  25. func (db *DB) lDecodeMetaKey(ek []byte) ([]byte, error) {
  26. if len(ek) < 2 || ek[0] != db.index || ek[1] != LMetaType {
  27. return nil, errLMetaKey
  28. }
  29. return ek[2:], nil
  30. }
  31. func (db *DB) lEncodeListKey(key []byte, seq int32) []byte {
  32. buf := make([]byte, len(key)+8)
  33. pos := 0
  34. buf[pos] = db.index
  35. pos++
  36. buf[pos] = ListType
  37. pos++
  38. binary.BigEndian.PutUint16(buf[pos:], uint16(len(key)))
  39. pos += 2
  40. copy(buf[pos:], key)
  41. pos += len(key)
  42. binary.BigEndian.PutUint32(buf[pos:], uint32(seq))
  43. return buf
  44. }
  45. func (db *DB) lDecodeListKey(ek []byte) (key []byte, seq int32, err error) {
  46. if len(ek) < 8 || ek[0] != db.index || ek[1] != ListType {
  47. err = errListKey
  48. return
  49. }
  50. keyLen := int(binary.BigEndian.Uint16(ek[2:]))
  51. if keyLen+8 != len(ek) {
  52. err = errListKey
  53. return
  54. }
  55. key = ek[4 : 4+keyLen]
  56. seq = int32(binary.BigEndian.Uint32(ek[4+keyLen:]))
  57. return
  58. }
  59. func (db *DB) lpush(key []byte, whereSeq int32, args ...[]byte) (int64, error) {
  60. if err := checkKeySize(key); err != nil {
  61. return 0, err
  62. }
  63. var headSeq int32
  64. var tailSeq int32
  65. var size int32
  66. var err error
  67. t := db.listBatch
  68. t.Lock()
  69. defer t.Unlock()
  70. metaKey := db.lEncodeMetaKey(key)
  71. headSeq, tailSeq, size, err = db.lGetMeta(nil, metaKey)
  72. if err != nil {
  73. return 0, err
  74. }
  75. var pushCnt int = len(args)
  76. if pushCnt == 0 {
  77. return int64(size), nil
  78. }
  79. var seq int32 = headSeq
  80. var delta int32 = -1
  81. if whereSeq == listTailSeq {
  82. seq = tailSeq
  83. delta = 1
  84. }
  85. // append elements
  86. if size > 0 {
  87. seq += delta
  88. }
  89. for i := 0; i < pushCnt; i++ {
  90. ek := db.lEncodeListKey(key, seq+int32(i)*delta)
  91. t.Put(ek, args[i])
  92. }
  93. seq += int32(pushCnt-1) * delta
  94. if seq <= listMinSeq || seq >= listMaxSeq {
  95. return 0, errListSeq
  96. }
  97. // set meta info
  98. if whereSeq == listHeadSeq {
  99. headSeq = seq
  100. } else {
  101. tailSeq = seq
  102. }
  103. db.lSetMeta(metaKey, headSeq, tailSeq)
  104. err = t.Commit()
  105. return int64(size) + int64(pushCnt), err
  106. }
  107. func (db *DB) lpop(key []byte, whereSeq int32) ([]byte, error) {
  108. if err := checkKeySize(key); err != nil {
  109. return nil, err
  110. }
  111. t := db.listBatch
  112. t.Lock()
  113. defer t.Unlock()
  114. var headSeq int32
  115. var tailSeq int32
  116. var err error
  117. metaKey := db.lEncodeMetaKey(key)
  118. headSeq, tailSeq, _, err = db.lGetMeta(nil, metaKey)
  119. if err != nil {
  120. return nil, err
  121. }
  122. var value []byte
  123. var seq int32 = headSeq
  124. if whereSeq == listTailSeq {
  125. seq = tailSeq
  126. }
  127. itemKey := db.lEncodeListKey(key, seq)
  128. value, err = db.bucket.Get(itemKey)
  129. if err != nil {
  130. return nil, err
  131. }
  132. if whereSeq == listHeadSeq {
  133. headSeq += 1
  134. } else {
  135. tailSeq -= 1
  136. }
  137. t.Delete(itemKey)
  138. size := db.lSetMeta(metaKey, headSeq, tailSeq)
  139. if size == 0 {
  140. db.rmExpire(t, HashType, key)
  141. }
  142. err = t.Commit()
  143. return value, err
  144. }
  145. // ps : here just focus on deleting the list data,
  146. // any other likes expire is ignore.
  147. func (db *DB) lDelete(t *batch, key []byte) int64 {
  148. mk := db.lEncodeMetaKey(key)
  149. var headSeq int32
  150. var tailSeq int32
  151. var err error
  152. it := db.bucket.NewIterator()
  153. defer it.Close()
  154. headSeq, tailSeq, _, err = db.lGetMeta(it, mk)
  155. if err != nil {
  156. return 0
  157. }
  158. var num int64 = 0
  159. startKey := db.lEncodeListKey(key, headSeq)
  160. stopKey := db.lEncodeListKey(key, tailSeq)
  161. rit := store.NewRangeIterator(it, &store.Range{startKey, stopKey, store.RangeClose})
  162. for ; rit.Valid(); rit.Next() {
  163. t.Delete(rit.RawKey())
  164. num++
  165. }
  166. t.Delete(mk)
  167. return num
  168. }
  169. func (db *DB) lGetMeta(it *store.Iterator, ek []byte) (headSeq int32, tailSeq int32, size int32, err error) {
  170. var v []byte
  171. if it != nil {
  172. v = it.Find(ek)
  173. } else {
  174. v, err = db.bucket.Get(ek)
  175. }
  176. if err != nil {
  177. return
  178. } else if v == nil {
  179. headSeq = listInitialSeq
  180. tailSeq = listInitialSeq
  181. size = 0
  182. return
  183. } else {
  184. headSeq = int32(binary.LittleEndian.Uint32(v[0:4]))
  185. tailSeq = int32(binary.LittleEndian.Uint32(v[4:8]))
  186. size = tailSeq - headSeq + 1
  187. }
  188. return
  189. }
  190. func (db *DB) lSetMeta(ek []byte, headSeq int32, tailSeq int32) int32 {
  191. t := db.listBatch
  192. var size int32 = tailSeq - headSeq + 1
  193. if size < 0 {
  194. // todo : log error + panic
  195. } else if size == 0 {
  196. t.Delete(ek)
  197. } else {
  198. buf := make([]byte, 8)
  199. binary.LittleEndian.PutUint32(buf[0:4], uint32(headSeq))
  200. binary.LittleEndian.PutUint32(buf[4:8], uint32(tailSeq))
  201. t.Put(ek, buf)
  202. }
  203. return size
  204. }
  205. func (db *DB) lExpireAt(key []byte, when int64) (int64, error) {
  206. t := db.listBatch
  207. t.Lock()
  208. defer t.Unlock()
  209. if llen, err := db.LLen(key); err != nil || llen == 0 {
  210. return 0, err
  211. } else {
  212. db.expireAt(t, ListType, key, when)
  213. if err := t.Commit(); err != nil {
  214. return 0, err
  215. }
  216. }
  217. return 1, nil
  218. }
  219. func (db *DB) LIndex(key []byte, index int32) ([]byte, error) {
  220. if err := checkKeySize(key); err != nil {
  221. return nil, err
  222. }
  223. var seq int32
  224. var headSeq int32
  225. var tailSeq int32
  226. var err error
  227. metaKey := db.lEncodeMetaKey(key)
  228. it := db.bucket.NewIterator()
  229. defer it.Close()
  230. headSeq, tailSeq, _, err = db.lGetMeta(it, metaKey)
  231. if err != nil {
  232. return nil, err
  233. }
  234. if index >= 0 {
  235. seq = headSeq + index
  236. } else {
  237. seq = tailSeq + index + 1
  238. }
  239. sk := db.lEncodeListKey(key, seq)
  240. v := it.Find(sk)
  241. return v, nil
  242. }
  243. func (db *DB) LLen(key []byte) (int64, error) {
  244. if err := checkKeySize(key); err != nil {
  245. return 0, err
  246. }
  247. ek := db.lEncodeMetaKey(key)
  248. _, _, size, err := db.lGetMeta(nil, ek)
  249. return int64(size), err
  250. }
  251. func (db *DB) LPop(key []byte) ([]byte, error) {
  252. return db.lpop(key, listHeadSeq)
  253. }
  254. func (db *DB) LPush(key []byte, arg1 []byte, args ...[]byte) (int64, error) {
  255. var argss = [][]byte{arg1}
  256. argss = append(argss, args...)
  257. return db.lpush(key, listHeadSeq, argss...)
  258. }
  259. func (db *DB) LRange(key []byte, start int32, stop int32) ([][]byte, error) {
  260. if err := checkKeySize(key); err != nil {
  261. return nil, err
  262. }
  263. var headSeq int32
  264. var llen int32
  265. var err error
  266. metaKey := db.lEncodeMetaKey(key)
  267. it := db.bucket.NewIterator()
  268. defer it.Close()
  269. if headSeq, _, llen, err = db.lGetMeta(it, metaKey); err != nil {
  270. return nil, err
  271. }
  272. if start < 0 {
  273. start = llen + start
  274. }
  275. if stop < 0 {
  276. stop = llen + stop
  277. }
  278. if start < 0 {
  279. start = 0
  280. }
  281. if start > stop || start >= llen {
  282. return [][]byte{}, nil
  283. }
  284. if stop >= llen {
  285. stop = llen - 1
  286. }
  287. limit := (stop - start) + 1
  288. headSeq += start
  289. v := make([][]byte, 0, limit)
  290. startKey := db.lEncodeListKey(key, headSeq)
  291. rit := store.NewRangeLimitIterator(it,
  292. &store.Range{
  293. Min: startKey,
  294. Max: nil,
  295. Type: store.RangeClose},
  296. &store.Limit{
  297. Offset: 0,
  298. Count: int(limit)})
  299. for ; rit.Valid(); rit.Next() {
  300. v = append(v, rit.Value())
  301. }
  302. return v, nil
  303. }
  304. func (db *DB) RPop(key []byte) ([]byte, error) {
  305. return db.lpop(key, listTailSeq)
  306. }
  307. func (db *DB) RPush(key []byte, arg1 []byte, args ...[]byte) (int64, error) {
  308. var argss = [][]byte{arg1}
  309. argss = append(argss, args...)
  310. return db.lpush(key, listTailSeq, argss...)
  311. }
  312. func (db *DB) LClear(key []byte) (int64, error) {
  313. if err := checkKeySize(key); err != nil {
  314. return 0, err
  315. }
  316. t := db.listBatch
  317. t.Lock()
  318. defer t.Unlock()
  319. num := db.lDelete(t, key)
  320. db.rmExpire(t, ListType, key)
  321. err := t.Commit()
  322. return num, err
  323. }
  324. func (db *DB) LMclear(keys ...[]byte) (int64, error) {
  325. t := db.listBatch
  326. t.Lock()
  327. defer t.Unlock()
  328. for _, key := range keys {
  329. if err := checkKeySize(key); err != nil {
  330. return 0, err
  331. }
  332. db.lDelete(t, key)
  333. db.rmExpire(t, ListType, key)
  334. }
  335. err := t.Commit()
  336. return int64(len(keys)), err
  337. }
  338. func (db *DB) lFlush() (drop int64, err error) {
  339. t := db.listBatch
  340. t.Lock()
  341. defer t.Unlock()
  342. return db.flushType(t, ListType)
  343. }
  344. func (db *DB) LExpire(key []byte, duration int64) (int64, error) {
  345. if duration <= 0 {
  346. return 0, errExpireValue
  347. }
  348. return db.lExpireAt(key, time.Now().Unix()+duration)
  349. }
  350. func (db *DB) LExpireAt(key []byte, when int64) (int64, error) {
  351. if when <= time.Now().Unix() {
  352. return 0, errExpireValue
  353. }
  354. return db.lExpireAt(key, when)
  355. }
  356. func (db *DB) LTTL(key []byte) (int64, error) {
  357. if err := checkKeySize(key); err != nil {
  358. return -1, err
  359. }
  360. return db.ttl(ListType, key)
  361. }
  362. func (db *DB) LPersist(key []byte) (int64, error) {
  363. if err := checkKeySize(key); err != nil {
  364. return 0, err
  365. }
  366. t := db.listBatch
  367. t.Lock()
  368. defer t.Unlock()
  369. n, err := db.rmExpire(t, ListType, key)
  370. if err != nil {
  371. return 0, err
  372. }
  373. err = t.Commit()
  374. return n, err
  375. }
  376. func (db *DB) LScan(key []byte, count int, inclusive bool, match string) ([][]byte, error) {
  377. return db.scan(LMetaType, key, count, inclusive, match)
  378. }
  379. func (db *DB) lEncodeMinKey() []byte {
  380. return db.lEncodeMetaKey(nil)
  381. }
  382. func (db *DB) lEncodeMaxKey() []byte {
  383. ek := db.lEncodeMetaKey(nil)
  384. ek[len(ek)-1] = LMetaType + 1
  385. return ek
  386. }