diff options
Diffstat (limited to 'vendor/github.com/lunny/nodb/t_kv.go')
-rw-r--r-- | vendor/github.com/lunny/nodb/t_kv.go | 387 |
1 files changed, 387 insertions, 0 deletions
diff --git a/vendor/github.com/lunny/nodb/t_kv.go b/vendor/github.com/lunny/nodb/t_kv.go new file mode 100644 index 0000000000..82a12f7027 --- /dev/null +++ b/vendor/github.com/lunny/nodb/t_kv.go @@ -0,0 +1,387 @@ +package nodb + +import ( + "errors" + "time" +) + +type KVPair struct { + Key []byte + Value []byte +} + +var errKVKey = errors.New("invalid encode kv key") + +func checkKeySize(key []byte) error { + if len(key) > MaxKeySize || len(key) == 0 { + return errKeySize + } + return nil +} + +func checkValueSize(value []byte) error { + if len(value) > MaxValueSize { + return errValueSize + } + + return nil +} + +func (db *DB) encodeKVKey(key []byte) []byte { + ek := make([]byte, len(key)+2) + ek[0] = db.index + ek[1] = KVType + copy(ek[2:], key) + return ek +} + +func (db *DB) decodeKVKey(ek []byte) ([]byte, error) { + if len(ek) < 2 || ek[0] != db.index || ek[1] != KVType { + return nil, errKVKey + } + + return ek[2:], nil +} + +func (db *DB) encodeKVMinKey() []byte { + ek := db.encodeKVKey(nil) + return ek +} + +func (db *DB) encodeKVMaxKey() []byte { + ek := db.encodeKVKey(nil) + ek[len(ek)-1] = KVType + 1 + return ek +} + +func (db *DB) incr(key []byte, delta int64) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } + + var err error + key = db.encodeKVKey(key) + + t := db.kvBatch + + t.Lock() + defer t.Unlock() + + var n int64 + n, err = StrInt64(db.bucket.Get(key)) + if err != nil { + return 0, err + } + + n += delta + + t.Put(key, StrPutInt64(n)) + + //todo binlog + + err = t.Commit() + return n, err +} + +// ps : here just focus on deleting the key-value data, +// any other likes expire is ignore. +func (db *DB) delete(t *batch, key []byte) int64 { + key = db.encodeKVKey(key) + t.Delete(key) + return 1 +} + +func (db *DB) setExpireAt(key []byte, when int64) (int64, error) { + t := db.kvBatch + t.Lock() + defer t.Unlock() + + if exist, err := db.Exists(key); err != nil || exist == 0 { + return 0, err + } else { + db.expireAt(t, KVType, key, when) + if err := t.Commit(); err != nil { + return 0, err + } + } + return 1, nil +} + +func (db *DB) Decr(key []byte) (int64, error) { + return db.incr(key, -1) +} + +func (db *DB) DecrBy(key []byte, decrement int64) (int64, error) { + return db.incr(key, -decrement) +} + +func (db *DB) Del(keys ...[]byte) (int64, error) { + if len(keys) == 0 { + return 0, nil + } + + codedKeys := make([][]byte, len(keys)) + for i, k := range keys { + codedKeys[i] = db.encodeKVKey(k) + } + + t := db.kvBatch + t.Lock() + defer t.Unlock() + + for i, k := range keys { + t.Delete(codedKeys[i]) + db.rmExpire(t, KVType, k) + } + + err := t.Commit() + return int64(len(keys)), err +} + +func (db *DB) Exists(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } + + var err error + key = db.encodeKVKey(key) + + var v []byte + v, err = db.bucket.Get(key) + if v != nil && err == nil { + return 1, nil + } + + return 0, err +} + +func (db *DB) Get(key []byte) ([]byte, error) { + if err := checkKeySize(key); err != nil { + return nil, err + } + + key = db.encodeKVKey(key) + + return db.bucket.Get(key) +} + +func (db *DB) GetSet(key []byte, value []byte) ([]byte, error) { + if err := checkKeySize(key); err != nil { + return nil, err + } else if err := checkValueSize(value); err != nil { + return nil, err + } + + key = db.encodeKVKey(key) + + t := db.kvBatch + + t.Lock() + defer t.Unlock() + + oldValue, err := db.bucket.Get(key) + if err != nil { + return nil, err + } + + t.Put(key, value) + //todo, binlog + + err = t.Commit() + + return oldValue, err +} + +func (db *DB) Incr(key []byte) (int64, error) { + return db.incr(key, 1) +} + +func (db *DB) IncrBy(key []byte, increment int64) (int64, error) { + return db.incr(key, increment) +} + +func (db *DB) MGet(keys ...[]byte) ([][]byte, error) { + values := make([][]byte, len(keys)) + + it := db.bucket.NewIterator() + defer it.Close() + + for i := range keys { + if err := checkKeySize(keys[i]); err != nil { + return nil, err + } + + values[i] = it.Find(db.encodeKVKey(keys[i])) + } + + return values, nil +} + +func (db *DB) MSet(args ...KVPair) error { + if len(args) == 0 { + return nil + } + + t := db.kvBatch + + var err error + var key []byte + var value []byte + + t.Lock() + defer t.Unlock() + + for i := 0; i < len(args); i++ { + if err := checkKeySize(args[i].Key); err != nil { + return err + } else if err := checkValueSize(args[i].Value); err != nil { + return err + } + + key = db.encodeKVKey(args[i].Key) + + value = args[i].Value + + t.Put(key, value) + + //todo binlog + } + + err = t.Commit() + return err +} + +func (db *DB) Set(key []byte, value []byte) error { + if err := checkKeySize(key); err != nil { + return err + } else if err := checkValueSize(value); err != nil { + return err + } + + var err error + key = db.encodeKVKey(key) + + t := db.kvBatch + + t.Lock() + defer t.Unlock() + + t.Put(key, value) + + err = t.Commit() + + return err +} + +func (db *DB) SetNX(key []byte, value []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } else if err := checkValueSize(value); err != nil { + return 0, err + } + + var err error + key = db.encodeKVKey(key) + + var n int64 = 1 + + t := db.kvBatch + + t.Lock() + defer t.Unlock() + + if v, err := db.bucket.Get(key); err != nil { + return 0, err + } else if v != nil { + n = 0 + } else { + t.Put(key, value) + + //todo binlog + + err = t.Commit() + } + + return n, err +} + +func (db *DB) flush() (drop int64, err error) { + t := db.kvBatch + t.Lock() + defer t.Unlock() + return db.flushType(t, KVType) +} + +//if inclusive is true, scan range [key, inf) else (key, inf) +func (db *DB) Scan(key []byte, count int, inclusive bool, match string) ([][]byte, error) { + return db.scan(KVType, key, count, inclusive, match) +} + +func (db *DB) Expire(key []byte, duration int64) (int64, error) { + if duration <= 0 { + return 0, errExpireValue + } + + return db.setExpireAt(key, time.Now().Unix()+duration) +} + +func (db *DB) ExpireAt(key []byte, when int64) (int64, error) { + if when <= time.Now().Unix() { + return 0, errExpireValue + } + + return db.setExpireAt(key, when) +} + +func (db *DB) TTL(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return -1, err + } + + return db.ttl(KVType, key) +} + +func (db *DB) Persist(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } + + t := db.kvBatch + t.Lock() + defer t.Unlock() + n, err := db.rmExpire(t, KVType, key) + if err != nil { + return 0, err + } + + err = t.Commit() + return n, err +} + +func (db *DB) Lock() { + t := db.kvBatch + t.Lock() +} + +func (db *DB) Remove(key []byte) bool { + if len(key) == 0 { + return false + } + t := db.kvBatch + t.Delete(db.encodeKVKey(key)) + _, err := db.rmExpire(t, KVType, key) + if err != nil { + return false + } + return true +} + +func (db *DB) Commit() error { + t := db.kvBatch + return t.Commit() +} + +func (db *DB) Unlock() { + t := db.kvBatch + t.Unlock() +} |