aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/lunny
diff options
context:
space:
mode:
authortechknowlogick <matti@mdranta.net>2019-02-05 11:52:51 -0500
committerGitHub <noreply@github.com>2019-02-05 11:52:51 -0500
commit9de871a0f8911030f8e06a881803cf722b8798ea (patch)
tree206400f0a5873d7d078fcdd004956036f07a1db5 /vendor/github.com/lunny
parentbf4badad1d68c18d7ffb92c69e09e4e8aa252935 (diff)
downloadgitea-9de871a0f8911030f8e06a881803cf722b8798ea.tar.gz
gitea-9de871a0f8911030f8e06a881803cf722b8798ea.zip
add other session providers (#5963)
Diffstat (limited to 'vendor/github.com/lunny')
-rw-r--r--vendor/github.com/lunny/log/LICENSE27
-rw-r--r--vendor/github.com/lunny/log/dbwriter.go36
-rw-r--r--vendor/github.com/lunny/log/filewriter.go112
-rw-r--r--vendor/github.com/lunny/log/logext.go595
-rw-r--r--vendor/github.com/lunny/nodb/LICENSE21
-rw-r--r--vendor/github.com/lunny/nodb/batch.go106
-rw-r--r--vendor/github.com/lunny/nodb/binlog.go391
-rw-r--r--vendor/github.com/lunny/nodb/binlog_util.go215
-rw-r--r--vendor/github.com/lunny/nodb/config/config.go135
-rw-r--r--vendor/github.com/lunny/nodb/const.go98
-rw-r--r--vendor/github.com/lunny/nodb/doc.go61
-rw-r--r--vendor/github.com/lunny/nodb/dump.go200
-rw-r--r--vendor/github.com/lunny/nodb/info.go24
-rw-r--r--vendor/github.com/lunny/nodb/multi.go73
-rw-r--r--vendor/github.com/lunny/nodb/nodb.go128
-rw-r--r--vendor/github.com/lunny/nodb/nodb_db.go171
-rw-r--r--vendor/github.com/lunny/nodb/replication.go312
-rw-r--r--vendor/github.com/lunny/nodb/scan.go144
-rw-r--r--vendor/github.com/lunny/nodb/store/db.go61
-rw-r--r--vendor/github.com/lunny/nodb/store/driver/batch.go39
-rw-r--r--vendor/github.com/lunny/nodb/store/driver/driver.go67
-rw-r--r--vendor/github.com/lunny/nodb/store/driver/store.go46
-rw-r--r--vendor/github.com/lunny/nodb/store/goleveldb/batch.go27
-rw-r--r--vendor/github.com/lunny/nodb/store/goleveldb/const.go4
-rw-r--r--vendor/github.com/lunny/nodb/store/goleveldb/db.go187
-rw-r--r--vendor/github.com/lunny/nodb/store/goleveldb/iterator.go49
-rw-r--r--vendor/github.com/lunny/nodb/store/goleveldb/snapshot.go26
-rw-r--r--vendor/github.com/lunny/nodb/store/iterator.go327
-rw-r--r--vendor/github.com/lunny/nodb/store/snapshot.go16
-rw-r--r--vendor/github.com/lunny/nodb/store/store.go51
-rw-r--r--vendor/github.com/lunny/nodb/store/tx.go42
-rw-r--r--vendor/github.com/lunny/nodb/store/writebatch.go9
-rw-r--r--vendor/github.com/lunny/nodb/t_bit.go922
-rw-r--r--vendor/github.com/lunny/nodb/t_hash.go509
-rw-r--r--vendor/github.com/lunny/nodb/t_kv.go387
-rw-r--r--vendor/github.com/lunny/nodb/t_list.go492
-rw-r--r--vendor/github.com/lunny/nodb/t_set.go601
-rw-r--r--vendor/github.com/lunny/nodb/t_ttl.go195
-rw-r--r--vendor/github.com/lunny/nodb/t_zset.go943
-rw-r--r--vendor/github.com/lunny/nodb/tx.go113
-rw-r--r--vendor/github.com/lunny/nodb/util.go113
41 files changed, 8075 insertions, 0 deletions
diff --git a/vendor/github.com/lunny/log/LICENSE b/vendor/github.com/lunny/log/LICENSE
new file mode 100644
index 0000000000..c9338f8293
--- /dev/null
+++ b/vendor/github.com/lunny/log/LICENSE
@@ -0,0 +1,27 @@
+Copyright (c) 2014 - 2016 lunny
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+* Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+* Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+* Neither the name of the {organization} nor the names of its
+ contributors may be used to endorse or promote products derived from
+ this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/vendor/github.com/lunny/log/dbwriter.go b/vendor/github.com/lunny/log/dbwriter.go
new file mode 100644
index 0000000000..e8ff00bd89
--- /dev/null
+++ b/vendor/github.com/lunny/log/dbwriter.go
@@ -0,0 +1,36 @@
+package log
+
+import (
+ "database/sql"
+ "time"
+)
+
+type DBWriter struct {
+ db *sql.DB
+ stmt *sql.Stmt
+ content chan []byte
+}
+
+func NewDBWriter(db *sql.DB) (*DBWriter, error) {
+ _, err := db.Exec("CREATE TABLE IF NOT EXISTS log (id int, content text, created datetime)")
+ if err != nil {
+ return nil, err
+ }
+ stmt, err := db.Prepare("INSERT INTO log (content, created) values (?, ?)")
+ if err != nil {
+ return nil, err
+ }
+ return &DBWriter{db, stmt, make(chan []byte, 1000)}, nil
+}
+
+func (w *DBWriter) Write(p []byte) (n int, err error) {
+ _, err = w.stmt.Exec(string(p), time.Now())
+ if err == nil {
+ n = len(p)
+ }
+ return
+}
+
+func (w *DBWriter) Close() {
+ w.stmt.Close()
+}
diff --git a/vendor/github.com/lunny/log/filewriter.go b/vendor/github.com/lunny/log/filewriter.go
new file mode 100644
index 0000000000..f0bb4d1df1
--- /dev/null
+++ b/vendor/github.com/lunny/log/filewriter.go
@@ -0,0 +1,112 @@
+package log
+
+import (
+ "io"
+ "os"
+ "path/filepath"
+ "sync"
+ "time"
+)
+
+var _ io.Writer = &Files{}
+
+type ByType int
+
+const (
+ ByDay ByType = iota
+ ByHour
+ ByMonth
+)
+
+var (
+ formats = map[ByType]string{
+ ByDay: "2006-01-02",
+ ByHour: "2006-01-02-15",
+ ByMonth: "2006-01",
+ }
+)
+
+func SetFileFormat(t ByType, format string) {
+ formats[t] = format
+}
+
+func (b ByType) Format() string {
+ return formats[b]
+}
+
+type Files struct {
+ FileOptions
+ f *os.File
+ lastFormat string
+ lock sync.Mutex
+}
+
+type FileOptions struct {
+ Dir string
+ ByType ByType
+ Loc *time.Location
+}
+
+func prepareFileOption(opts []FileOptions) FileOptions {
+ var opt FileOptions
+ if len(opts) > 0 {
+ opt = opts[0]
+ }
+ if opt.Dir == "" {
+ opt.Dir = "./"
+ }
+ err := os.MkdirAll(opt.Dir, os.ModePerm)
+ if err != nil {
+ panic(err.Error())
+ }
+
+ if opt.Loc == nil {
+ opt.Loc = time.Local
+ }
+ return opt
+}
+
+func NewFileWriter(opts ...FileOptions) *Files {
+ opt := prepareFileOption(opts)
+ return &Files{
+ FileOptions: opt,
+ }
+}
+
+func (f *Files) getFile() (*os.File, error) {
+ var err error
+ t := time.Now().In(f.Loc)
+ if f.f == nil {
+ f.lastFormat = t.Format(f.ByType.Format())
+ f.f, err = os.OpenFile(filepath.Join(f.Dir, f.lastFormat+".log"),
+ os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
+ return f.f, err
+ }
+ if f.lastFormat != t.Format(f.ByType.Format()) {
+ f.f.Close()
+ f.lastFormat = t.Format(f.ByType.Format())
+ f.f, err = os.OpenFile(filepath.Join(f.Dir, f.lastFormat+".log"),
+ os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
+ return f.f, err
+ }
+ return f.f, nil
+}
+
+func (f *Files) Write(bs []byte) (int, error) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ w, err := f.getFile()
+ if err != nil {
+ return 0, err
+ }
+ return w.Write(bs)
+}
+
+func (f *Files) Close() {
+ if f.f != nil {
+ f.f.Close()
+ f.f = nil
+ }
+ f.lastFormat = ""
+}
diff --git a/vendor/github.com/lunny/log/logext.go b/vendor/github.com/lunny/log/logext.go
new file mode 100644
index 0000000000..215c45f309
--- /dev/null
+++ b/vendor/github.com/lunny/log/logext.go
@@ -0,0 +1,595 @@
+package log
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "os"
+ "runtime"
+ "strings"
+ "sync"
+ "time"
+)
+
+// These flags define which text to prefix to each log entry generated by the Logger.
+const (
+ // Bits or'ed together to control what's printed. There is no control over the
+ // order they appear (the order listed here) or the format they present (as
+ // described in the comments). A colon appears after these items:
+ // 2009/0123 01:23:23.123123 /a/b/c/d.go:23: message
+ Ldate = 1 << iota // the date: 2009/0123
+ Ltime // the time: 01:23:23
+ Lmicroseconds // microsecond resolution: 01:23:23.123123. assumes Ltime.
+ Llongfile // full file name and line number: /a/b/c/d.go:23
+ Lshortfile // final file name element and line number: d.go:23. overrides Llongfile
+ Lmodule // module name
+ Llevel // level: 0(Debug), 1(Info), 2(Warn), 3(Error), 4(Panic), 5(Fatal)
+ Llongcolor // color will start [info] end of line
+ Lshortcolor // color only include [info]
+ LstdFlags = Ldate | Ltime // initial values for the standard logger
+ //Ldefault = Llevel | LstdFlags | Lshortfile | Llongcolor
+) // [prefix][time][level][module][shortfile|longfile]
+
+func Ldefault() int {
+ if runtime.GOOS == "windows" {
+ return Llevel | LstdFlags | Lshortfile
+ }
+ return Llevel | LstdFlags | Lshortfile | Llongcolor
+}
+
+func Version() string {
+ return "0.2.0.1121"
+}
+
+const (
+ Lall = iota
+)
+const (
+ Ldebug = iota
+ Linfo
+ Lwarn
+ Lerror
+ Lpanic
+ Lfatal
+ Lnone
+)
+
+const (
+ ForeBlack = iota + 30 //30
+ ForeRed //31
+ ForeGreen //32
+ ForeYellow //33
+ ForeBlue //34
+ ForePurple //35
+ ForeCyan //36
+ ForeWhite //37
+)
+
+const (
+ BackBlack = iota + 40 //40
+ BackRed //41
+ BackGreen //42
+ BackYellow //43
+ BackBlue //44
+ BackPurple //45
+ BackCyan //46
+ BackWhite //47
+)
+
+var levels = []string{
+ "[Debug]",
+ "[Info]",
+ "[Warn]",
+ "[Error]",
+ "[Panic]",
+ "[Fatal]",
+}
+
+// MUST called before all logs
+func SetLevels(lvs []string) {
+ levels = lvs
+}
+
+var colors = []int{
+ ForeCyan,
+ ForeGreen,
+ ForeYellow,
+ ForeRed,
+ ForePurple,
+ ForeBlue,
+}
+
+// MUST called before all logs
+func SetColors(cls []int) {
+ colors = cls
+}
+
+// A Logger represents an active logging object that generates lines of
+// output to an io.Writer. Each logging operation makes a single call to
+// the Writer's Write method. A Logger can be used simultaneously from
+// multiple goroutines; it guarantees to serialize access to the Writer.
+type Logger struct {
+ mu sync.Mutex // ensures atomic writes; protects the following fields
+ prefix string // prefix to write at beginning of each line
+ flag int // properties
+ Level int
+ out io.Writer // destination for output
+ buf bytes.Buffer // for accumulating text to write
+ levelStats [6]int64
+ loc *time.Location
+}
+
+// New creates a new Logger. The out variable sets the
+// destination to which log data will be written.
+// The prefix appears at the beginning of each generated log line.
+// The flag argument defines the logging properties.
+func New(out io.Writer, prefix string, flag int) *Logger {
+ l := &Logger{out: out, prefix: prefix, Level: 1, flag: flag, loc: time.Local}
+ if out != os.Stdout {
+ l.flag = RmColorFlags(l.flag)
+ }
+ return l
+}
+
+var Std = New(os.Stderr, "", Ldefault())
+
+// Cheap integer to fixed-width decimal ASCII. Give a negative width to avoid zero-padding.
+// Knows the buffer has capacity.
+func itoa(buf *bytes.Buffer, i int, wid int) {
+ var u uint = uint(i)
+ if u == 0 && wid <= 1 {
+ buf.WriteByte('0')
+ return
+ }
+
+ // Assemble decimal in reverse order.
+ var b [32]byte
+ bp := len(b)
+ for ; u > 0 || wid > 0; u /= 10 {
+ bp--
+ wid--
+ b[bp] = byte(u%10) + '0'
+ }
+
+ // avoid slicing b to avoid an allocation.
+ for bp < len(b) {
+ buf.WriteByte(b[bp])
+ bp++
+ }
+}
+
+func moduleOf(file string) string {
+ pos := strings.LastIndex(file, "/")
+ if pos != -1 {
+ pos1 := strings.LastIndex(file[:pos], "/src/")
+ if pos1 != -1 {
+ return file[pos1+5 : pos]
+ }
+ }
+ return "UNKNOWN"
+}
+
+func (l *Logger) formatHeader(buf *bytes.Buffer, t time.Time,
+ file string, line int, lvl int, reqId string) {
+ if l.prefix != "" {
+ buf.WriteString(l.prefix)
+ }
+ if l.flag&(Ldate|Ltime|Lmicroseconds) != 0 {
+ if l.flag&Ldate != 0 {
+ year, month, day := t.Date()
+ itoa(buf, year, 4)
+ buf.WriteByte('/')
+ itoa(buf, int(month), 2)
+ buf.WriteByte('/')
+ itoa(buf, day, 2)
+ buf.WriteByte(' ')
+ }
+ if l.flag&(Ltime|Lmicroseconds) != 0 {
+ hour, min, sec := t.Clock()
+ itoa(buf, hour, 2)
+ buf.WriteByte(':')
+ itoa(buf, min, 2)
+ buf.WriteByte(':')
+ itoa(buf, sec, 2)
+ if l.flag&Lmicroseconds != 0 {
+ buf.WriteByte('.')
+ itoa(buf, t.Nanosecond()/1e3, 6)
+ }
+ buf.WriteByte(' ')
+ }
+ }
+ if reqId != "" {
+ buf.WriteByte('[')
+ buf.WriteString(reqId)
+ buf.WriteByte(']')
+ buf.WriteByte(' ')
+ }
+
+ if l.flag&(Lshortcolor|Llongcolor) != 0 {
+ buf.WriteString(fmt.Sprintf("\033[1;%dm", colors[lvl]))
+ }
+ if l.flag&Llevel != 0 {
+ buf.WriteString(levels[lvl])
+ buf.WriteByte(' ')
+ }
+ if l.flag&Lshortcolor != 0 {
+ buf.WriteString("\033[0m")
+ }
+
+ if l.flag&Lmodule != 0 {
+ buf.WriteByte('[')
+ buf.WriteString(moduleOf(file))
+ buf.WriteByte(']')
+ buf.WriteByte(' ')
+ }
+ if l.flag&(Lshortfile|Llongfile) != 0 {
+ if l.flag&Lshortfile != 0 {
+ short := file
+ for i := len(file) - 1; i > 0; i-- {
+ if file[i] == '/' {
+ short = file[i+1:]
+ break
+ }
+ }
+ file = short
+ }
+ buf.WriteString(file)
+ buf.WriteByte(':')
+ itoa(buf, line, -1)
+ buf.WriteByte(' ')
+ }
+}
+
+// Output writes the output for a logging event. The string s contains
+// the text to print after the prefix specified by the flags of the
+// Logger. A newline is appended if the last character of s is not
+// already a newline. Calldepth is used to recover the PC and is
+// provided for generality, although at the moment on all pre-defined
+// paths it will be 2.
+func (l *Logger) Output(reqId string, lvl int, calldepth int, s string) error {
+ if lvl < l.Level {
+ return nil
+ }
+ now := time.Now().In(l.loc) // get this early.
+ var file string
+ var line int
+ l.mu.Lock()
+ defer l.mu.Unlock()
+ if l.flag&(Lshortfile|Llongfile|Lmodule) != 0 {
+ // release lock while getting caller info - it's expensive.
+ l.mu.Unlock()
+ var ok bool
+ _, file, line, ok = runtime.Caller(calldepth)
+ if !ok {
+ file = "???"
+ line = 0
+ }
+ l.mu.Lock()
+ }
+ l.levelStats[lvl]++
+ l.buf.Reset()
+ l.formatHeader(&l.buf, now, file, line, lvl, reqId)
+ l.buf.WriteString(s)
+ if l.flag&Llongcolor != 0 {
+ l.buf.WriteString("\033[0m")
+ }
+ if len(s) > 0 && s[len(s)-1] != '\n' {
+ l.buf.WriteByte('\n')
+ }
+ _, err := l.out.Write(l.buf.Bytes())
+ return err
+}
+
+// -----------------------------------------
+
+// Printf calls l.Output to print to the logger.
+// Arguments are handled in the manner of fmt.Printf.
+func (l *Logger) Printf(format string, v ...interface{}) {
+ l.Output("", Linfo, 2, fmt.Sprintf(format, v...))
+}
+
+// Print calls l.Output to print to the logger.
+// Arguments are handled in the manner of fmt.Print.
+func (l *Logger) Print(v ...interface{}) {
+ l.Output("", Linfo, 2, fmt.Sprint(v...))
+}
+
+// Println calls l.Output to print to the logger.
+// Arguments are handled in the manner of fmt.Println.
+func (l *Logger) Println(v ...interface{}) {
+ l.Output("", Linfo, 2, fmt.Sprintln(v...))
+}
+
+// -----------------------------------------
+
+func (l *Logger) Debugf(format string, v ...interface{}) {
+ l.Output("", Ldebug, 2, fmt.Sprintf(format, v...))
+}
+
+func (l *Logger) Debug(v ...interface{}) {
+ l.Output("", Ldebug, 2, fmt.Sprintln(v...))
+}
+
+// -----------------------------------------
+func (l *Logger) Infof(format string, v ...interface{}) {
+ l.Output("", Linfo, 2, fmt.Sprintf(format, v...))
+}
+
+func (l *Logger) Info(v ...interface{}) {
+ l.Output("", Linfo, 2, fmt.Sprintln(v...))
+}
+
+// -----------------------------------------
+func (l *Logger) Warnf(format string, v ...interface{}) {
+ l.Output("", Lwarn, 2, fmt.Sprintf(format, v...))
+}
+
+func (l *Logger) Warn(v ...interface{}) {
+ l.Output("", Lwarn, 2, fmt.Sprintln(v...))
+}
+
+// -----------------------------------------
+
+func (l *Logger) Errorf(format string, v ...interface{}) {
+ l.Output("", Lerror, 2, fmt.Sprintf(format, v...))
+}
+
+func (l *Logger) Error(v ...interface{}) {
+ l.Output("", Lerror, 2, fmt.Sprintln(v...))
+}
+
+// -----------------------------------------
+
+func (l *Logger) Fatal(v ...interface{}) {
+ l.Output("", Lfatal, 2, fmt.Sprintln(v...))
+ os.Exit(1)
+}
+
+// Fatalf is equivalent to l.Printf() followed by a call to os.Exit(1).
+func (l *Logger) Fatalf(format string, v ...interface{}) {
+ l.Output("", Lfatal, 2, fmt.Sprintf(format, v...))
+ os.Exit(1)
+}
+
+// -----------------------------------------
+// Panic is equivalent to l.Print() followed by a call to panic().
+func (l *Logger) Panic(v ...interface{}) {
+ s := fmt.Sprintln(v...)
+ l.Output("", Lpanic, 2, s)
+ panic(s)
+}
+
+// Panicf is equivalent to l.Printf() followed by a call to panic().
+func (l *Logger) Panicf(format string, v ...interface{}) {
+ s := fmt.Sprintf(format, v...)
+ l.Output("", Lpanic, 2, s)
+ panic(s)
+}
+
+// -----------------------------------------
+func (l *Logger) Stack(v ...interface{}) {
+ s := fmt.Sprint(v...)
+ s += "\n"
+ buf := make([]byte, 1024*1024)
+ n := runtime.Stack(buf, true)
+ s += string(buf[:n])
+ s += "\n"
+ l.Output("", Lerror, 2, s)
+}
+
+// -----------------------------------------
+func (l *Logger) Stat() (stats []int64) {
+ l.mu.Lock()
+ v := l.levelStats
+ l.mu.Unlock()
+ return v[:]
+}
+
+// Flags returns the output flags for the logger.
+func (l *Logger) Flags() int {
+ l.mu.Lock()
+ defer l.mu.Unlock()
+ return l.flag
+}
+
+func RmColorFlags(flag int) int {
+ // for un std out, it should not show color since almost them don't support
+ if flag&Llongcolor != 0 {
+ flag = flag ^ Llongcolor
+ }
+ if flag&Lshortcolor != 0 {
+ flag = flag ^ Lshortcolor
+ }
+ return flag
+}
+
+func (l *Logger) Location() *time.Location {
+ return l.loc
+}
+
+func (l *Logger) SetLocation(loc *time.Location) {
+ l.loc = loc
+}
+
+// SetFlags sets the output flags for the logger.
+func (l *Logger) SetFlags(flag int) {
+ l.mu.Lock()
+ defer l.mu.Unlock()
+ if l.out != os.Stdout {
+ flag = RmColorFlags(flag)
+ }
+ l.flag = flag
+}
+
+// Prefix returns the output prefix for the logger.
+func (l *Logger) Prefix() string {
+ l.mu.Lock()
+ defer l.mu.Unlock()
+ return l.prefix
+}
+
+// SetPrefix sets the output prefix for the logger.
+func (l *Logger) SetPrefix(prefix string) {
+ l.mu.Lock()
+ defer l.mu.Unlock()
+ l.prefix = prefix
+}
+
+// SetOutputLevel sets the output level for the logger.
+func (l *Logger) SetOutputLevel(lvl int) {
+ l.mu.Lock()
+ defer l.mu.Unlock()
+ l.Level = lvl
+}
+
+func (l *Logger) OutputLevel() int {
+ return l.Level
+}
+
+func (l *Logger) SetOutput(w io.Writer) {
+ l.mu.Lock()
+ defer l.mu.Unlock()
+ l.out = w
+ if w != os.Stdout {
+ l.flag = RmColorFlags(l.flag)
+ }
+}
+
+// SetOutput sets the output destination for the standard logger.
+func SetOutput(w io.Writer) {
+ Std.SetOutput(w)
+}
+
+func SetLocation(loc *time.Location) {
+ Std.SetLocation(loc)
+}
+
+func Location() *time.Location {
+ return Std.Location()
+}
+
+// Flags returns the output flags for the standard logger.
+func Flags() int {
+ return Std.Flags()
+}
+
+// SetFlags sets the output flags for the standard logger.
+func SetFlags(flag int) {
+ Std.SetFlags(flag)
+}
+
+// Prefix returns the output prefix for the standard logger.
+func Prefix() string {
+ return Std.Prefix()
+}
+
+// SetPrefix sets the output prefix for the standard logger.
+func SetPrefix(prefix string) {
+ Std.SetPrefix(prefix)
+}
+
+func SetOutputLevel(lvl int) {
+ Std.SetOutputLevel(lvl)
+}
+
+func OutputLevel() int {
+ return Std.OutputLevel()
+}
+
+// -----------------------------------------
+
+// Print calls Output to print to the standard logger.
+// Arguments are handled in the manner of fmt.Print.
+func Print(v ...interface{}) {
+ Std.Output("", Linfo, 2, fmt.Sprintln(v...))
+}
+
+// Printf calls Output to print to the standard logger.
+// Arguments are handled in the manner of fmt.Printf.
+func Printf(format string, v ...interface{}) {
+ Std.Output("", Linfo, 2, fmt.Sprintf(format, v...))
+}
+
+// Println calls Output to print to the standard logger.
+// Arguments are handled in the manner of fmt.Println.
+func Println(v ...interface{}) {
+ Std.Output("", Linfo, 2, fmt.Sprintln(v...))
+}
+
+// -----------------------------------------
+
+func Debugf(format string, v ...interface{}) {
+ Std.Output("", Ldebug, 2, fmt.Sprintf(format, v...))
+}
+
+func Debug(v ...interface{}) {
+ Std.Output("", Ldebug, 2, fmt.Sprintln(v...))
+}
+
+// -----------------------------------------
+
+func Infof(format string, v ...interface{}) {
+ Std.Output("", Linfo, 2, fmt.Sprintf(format, v...))
+}
+
+func Info(v ...interface{}) {
+ Std.Output("", Linfo, 2, fmt.Sprintln(v...))
+}
+
+// -----------------------------------------
+
+func Warnf(format string, v ...interface{}) {
+ Std.Output("", Lwarn, 2, fmt.Sprintf(format, v...))
+}
+
+func Warn(v ...interface{}) {
+ Std.Output("", Lwarn, 2, fmt.Sprintln(v...))
+}
+
+// -----------------------------------------
+
+func Errorf(format string, v ...interface{}) {
+ Std.Output("", Lerror, 2, fmt.Sprintf(format, v...))
+}
+
+func Error(v ...interface{}) {
+ Std.Output("", Lerror, 2, fmt.Sprintln(v...))
+}
+
+// -----------------------------------------
+
+// Fatal is equivalent to Print() followed by a call to os.Exit(1).
+func Fatal(v ...interface{}) {
+ Std.Output("", Lfatal, 2, fmt.Sprintln(v...))
+}
+
+// Fatalf is equivalent to Printf() followed by a call to os.Exit(1).
+func Fatalf(format string, v ...interface{}) {
+ Std.Output("", Lfatal, 2, fmt.Sprintf(format, v...))
+}
+
+// -----------------------------------------
+
+// Panic is equivalent to Print() followed by a call to panic().
+func Panic(v ...interface{}) {
+ Std.Output("", Lpanic, 2, fmt.Sprintln(v...))
+}
+
+// Panicf is equivalent to Printf() followed by a call to panic().
+func Panicf(format string, v ...interface{}) {
+ Std.Output("", Lpanic, 2, fmt.Sprintf(format, v...))
+}
+
+// -----------------------------------------
+
+func Stack(v ...interface{}) {
+ s := fmt.Sprint(v...)
+ s += "\n"
+ buf := make([]byte, 1024*1024)
+ n := runtime.Stack(buf, true)
+ s += string(buf[:n])
+ s += "\n"
+ Std.Output("", Lerror, 2, s)
+}
+
+// -----------------------------------------
diff --git a/vendor/github.com/lunny/nodb/LICENSE b/vendor/github.com/lunny/nodb/LICENSE
new file mode 100644
index 0000000000..7ece9fdf5a
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/LICENSE
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+
+Copyright (c) 2014 siddontang
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE. \ No newline at end of file
diff --git a/vendor/github.com/lunny/nodb/batch.go b/vendor/github.com/lunny/nodb/batch.go
new file mode 100644
index 0000000000..e69d96a122
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/batch.go
@@ -0,0 +1,106 @@
+package nodb
+
+import (
+ "sync"
+
+ "github.com/lunny/nodb/store"
+)
+
+type batch struct {
+ l *Nodb
+
+ store.WriteBatch
+
+ sync.Locker
+
+ logs [][]byte
+
+ tx *Tx
+}
+
+func (b *batch) Commit() error {
+ b.l.commitLock.Lock()
+ defer b.l.commitLock.Unlock()
+
+ err := b.WriteBatch.Commit()
+
+ if b.l.binlog != nil {
+ if err == nil {
+ if b.tx == nil {
+ b.l.binlog.Log(b.logs...)
+ } else {
+ b.tx.logs = append(b.tx.logs, b.logs...)
+ }
+ }
+ b.logs = [][]byte{}
+ }
+
+ return err
+}
+
+func (b *batch) Lock() {
+ b.Locker.Lock()
+}
+
+func (b *batch) Unlock() {
+ if b.l.binlog != nil {
+ b.logs = [][]byte{}
+ }
+ b.WriteBatch.Rollback()
+ b.Locker.Unlock()
+}
+
+func (b *batch) Put(key []byte, value []byte) {
+ if b.l.binlog != nil {
+ buf := encodeBinLogPut(key, value)
+ b.logs = append(b.logs, buf)
+ }
+ b.WriteBatch.Put(key, value)
+}
+
+func (b *batch) Delete(key []byte) {
+ if b.l.binlog != nil {
+ buf := encodeBinLogDelete(key)
+ b.logs = append(b.logs, buf)
+ }
+ b.WriteBatch.Delete(key)
+}
+
+type dbBatchLocker struct {
+ l *sync.Mutex
+ wrLock *sync.RWMutex
+}
+
+func (l *dbBatchLocker) Lock() {
+ l.wrLock.RLock()
+ l.l.Lock()
+}
+
+func (l *dbBatchLocker) Unlock() {
+ l.l.Unlock()
+ l.wrLock.RUnlock()
+}
+
+type txBatchLocker struct {
+}
+
+func (l *txBatchLocker) Lock() {}
+func (l *txBatchLocker) Unlock() {}
+
+type multiBatchLocker struct {
+}
+
+func (l *multiBatchLocker) Lock() {}
+func (l *multiBatchLocker) Unlock() {}
+
+func (l *Nodb) newBatch(wb store.WriteBatch, locker sync.Locker, tx *Tx) *batch {
+ b := new(batch)
+ b.l = l
+ b.WriteBatch = wb
+
+ b.tx = tx
+ b.Locker = locker
+
+ b.logs = [][]byte{}
+ return b
+}
diff --git a/vendor/github.com/lunny/nodb/binlog.go b/vendor/github.com/lunny/nodb/binlog.go
new file mode 100644
index 0000000000..4c094d9463
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/binlog.go
@@ -0,0 +1,391 @@
+package nodb
+
+import (
+ "bufio"
+ "encoding/binary"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "os"
+ "path"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/lunny/log"
+ "github.com/lunny/nodb/config"
+)
+
+type BinLogHead struct {
+ CreateTime uint32
+ BatchId uint32
+ PayloadLen uint32
+}
+
+func (h *BinLogHead) Len() int {
+ return 12
+}
+
+func (h *BinLogHead) Write(w io.Writer) error {
+ if err := binary.Write(w, binary.BigEndian, h.CreateTime); err != nil {
+ return err
+ }
+
+ if err := binary.Write(w, binary.BigEndian, h.BatchId); err != nil {
+ return err
+ }
+
+ if err := binary.Write(w, binary.BigEndian, h.PayloadLen); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (h *BinLogHead) handleReadError(err error) error {
+ if err == io.EOF {
+ return io.ErrUnexpectedEOF
+ } else {
+ return err
+ }
+}
+
+func (h *BinLogHead) Read(r io.Reader) error {
+ var err error
+ if err = binary.Read(r, binary.BigEndian, &h.CreateTime); err != nil {
+ return err
+ }
+
+ if err = binary.Read(r, binary.BigEndian, &h.BatchId); err != nil {
+ return h.handleReadError(err)
+ }
+
+ if err = binary.Read(r, binary.BigEndian, &h.PayloadLen); err != nil {
+ return h.handleReadError(err)
+ }
+
+ return nil
+}
+
+func (h *BinLogHead) InSameBatch(ho *BinLogHead) bool {
+ if h.CreateTime == ho.CreateTime && h.BatchId == ho.BatchId {
+ return true
+ } else {
+ return false
+ }
+}
+
+/*
+index file format:
+ledis-bin.00001
+ledis-bin.00002
+ledis-bin.00003
+
+log file format
+
+Log: Head|PayloadData
+
+Head: createTime|batchId|payloadData
+
+*/
+
+type BinLog struct {
+ sync.Mutex
+
+ path string
+
+ cfg *config.BinLogConfig
+
+ logFile *os.File
+
+ logWb *bufio.Writer
+
+ indexName string
+ logNames []string
+ lastLogIndex int64
+
+ batchId uint32
+
+ ch chan struct{}
+}
+
+func NewBinLog(cfg *config.Config) (*BinLog, error) {
+ l := new(BinLog)
+
+ l.cfg = &cfg.BinLog
+ l.cfg.Adjust()
+
+ l.path = path.Join(cfg.DataDir, "binlog")
+
+ if err := os.MkdirAll(l.path, os.ModePerm); err != nil {
+ return nil, err
+ }
+
+ l.logNames = make([]string, 0, 16)
+
+ l.ch = make(chan struct{})
+
+ if err := l.loadIndex(); err != nil {
+ return nil, err
+ }
+
+ return l, nil
+}
+
+func (l *BinLog) flushIndex() error {
+ data := strings.Join(l.logNames, "\n")
+
+ bakName := fmt.Sprintf("%s.bak", l.indexName)
+ f, err := os.OpenFile(bakName, os.O_WRONLY|os.O_CREATE, 0666)
+ if err != nil {
+ log.Error("create binlog bak index error %s", err.Error())
+ return err
+ }
+
+ if _, err := f.WriteString(data); err != nil {
+ log.Error("write binlog index error %s", err.Error())
+ f.Close()
+ return err
+ }
+
+ f.Close()
+
+ if err := os.Rename(bakName, l.indexName); err != nil {
+ log.Error("rename binlog bak index error %s", err.Error())
+ return err
+ }
+
+ return nil
+}
+
+func (l *BinLog) loadIndex() error {
+ l.indexName = path.Join(l.path, fmt.Sprintf("ledis-bin.index"))
+ if _, err := os.Stat(l.indexName); os.IsNotExist(err) {
+ //no index file, nothing to do
+ } else {
+ indexData, err := ioutil.ReadFile(l.indexName)
+ if err != nil {
+ return err
+ }
+
+ lines := strings.Split(string(indexData), "\n")
+ for _, line := range lines {
+ line = strings.Trim(line, "\r\n ")
+ if len(line) == 0 {
+ continue
+ }
+
+ if _, err := os.Stat(path.Join(l.path, line)); err != nil {
+ log.Error("load index line %s error %s", line, err.Error())
+ return err
+ } else {
+ l.logNames = append(l.logNames, line)
+ }
+ }
+ }
+ if l.cfg.MaxFileNum > 0 && len(l.logNames) > l.cfg.MaxFileNum {
+ //remove oldest logfile
+ if err := l.Purge(len(l.logNames) - l.cfg.MaxFileNum); err != nil {
+ return err
+ }
+ }
+
+ var err error
+ if len(l.logNames) == 0 {
+ l.lastLogIndex = 1
+ } else {
+ lastName := l.logNames[len(l.logNames)-1]
+
+ if l.lastLogIndex, err = strconv.ParseInt(path.Ext(lastName)[1:], 10, 64); err != nil {
+ log.Error("invalid logfile name %s", err.Error())
+ return err
+ }
+
+ //like mysql, if server restart, a new binlog will create
+ l.lastLogIndex++
+ }
+
+ return nil
+}
+
+func (l *BinLog) getLogFile() string {
+ return l.FormatLogFileName(l.lastLogIndex)
+}
+
+func (l *BinLog) openNewLogFile() error {
+ var err error
+ lastName := l.getLogFile()
+
+ logPath := path.Join(l.path, lastName)
+ if l.logFile, err = os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY, 0666); err != nil {
+ log.Error("open new logfile error %s", err.Error())
+ return err
+ }
+
+ if l.cfg.MaxFileNum > 0 && len(l.logNames) == l.cfg.MaxFileNum {
+ l.purge(1)
+ }
+
+ l.logNames = append(l.logNames, lastName)
+
+ if l.logWb == nil {
+ l.logWb = bufio.NewWriterSize(l.logFile, 1024)
+ } else {
+ l.logWb.Reset(l.logFile)
+ }
+
+ if err = l.flushIndex(); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (l *BinLog) checkLogFileSize() bool {
+ if l.logFile == nil {
+ return false
+ }
+
+ st, _ := l.logFile.Stat()
+ if st.Size() >= int64(l.cfg.MaxFileSize) {
+ l.closeLog()
+ return true
+ }
+
+ return false
+}
+
+func (l *BinLog) closeLog() {
+ l.lastLogIndex++
+
+ l.logFile.Close()
+ l.logFile = nil
+}
+
+func (l *BinLog) purge(n int) {
+ for i := 0; i < n; i++ {
+ logPath := path.Join(l.path, l.logNames[i])
+ os.Remove(logPath)
+ }
+
+ copy(l.logNames[0:], l.logNames[n:])
+ l.logNames = l.logNames[0 : len(l.logNames)-n]
+}
+
+func (l *BinLog) Close() {
+ if l.logFile != nil {
+ l.logFile.Close()
+ l.logFile = nil
+ }
+}
+
+func (l *BinLog) LogNames() []string {
+ return l.logNames
+}
+
+func (l *BinLog) LogFileName() string {
+ return l.getLogFile()
+}
+
+func (l *BinLog) LogFilePos() int64 {
+ if l.logFile == nil {
+ return 0
+ } else {
+ st, _ := l.logFile.Stat()
+ return st.Size()
+ }
+}
+
+func (l *BinLog) LogFileIndex() int64 {
+ return l.lastLogIndex
+}
+
+func (l *BinLog) FormatLogFileName(index int64) string {
+ return fmt.Sprintf("ledis-bin.%07d", index)
+}
+
+func (l *BinLog) FormatLogFilePath(index int64) string {
+ return path.Join(l.path, l.FormatLogFileName(index))
+}
+
+func (l *BinLog) LogPath() string {
+ return l.path
+}
+
+func (l *BinLog) Purge(n int) error {
+ l.Lock()
+ defer l.Unlock()
+
+ if len(l.logNames) == 0 {
+ return nil
+ }
+
+ if n >= len(l.logNames) {
+ n = len(l.logNames)
+ //can not purge current log file
+ if l.logNames[n-1] == l.getLogFile() {
+ n = n - 1
+ }
+ }
+
+ l.purge(n)
+
+ return l.flushIndex()
+}
+
+func (l *BinLog) PurgeAll() error {
+ l.Lock()
+ defer l.Unlock()
+
+ l.closeLog()
+ return l.openNewLogFile()
+}
+
+func (l *BinLog) Log(args ...[]byte) error {
+ l.Lock()
+ defer l.Unlock()
+
+ var err error
+
+ if l.logFile == nil {
+ if err = l.openNewLogFile(); err != nil {
+ return err
+ }
+ }
+
+ head := &BinLogHead{}
+
+ head.CreateTime = uint32(time.Now().Unix())
+ head.BatchId = l.batchId
+
+ l.batchId++
+
+ for _, data := range args {
+ head.PayloadLen = uint32(len(data))
+
+ if err := head.Write(l.logWb); err != nil {
+ return err
+ }
+
+ if _, err := l.logWb.Write(data); err != nil {
+ return err
+ }
+ }
+
+ if err = l.logWb.Flush(); err != nil {
+ log.Error("write log error %s", err.Error())
+ return err
+ }
+
+ l.checkLogFileSize()
+
+ close(l.ch)
+ l.ch = make(chan struct{})
+
+ return nil
+}
+
+func (l *BinLog) Wait() <-chan struct{} {
+ return l.ch
+}
diff --git a/vendor/github.com/lunny/nodb/binlog_util.go b/vendor/github.com/lunny/nodb/binlog_util.go
new file mode 100644
index 0000000000..22124dda07
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/binlog_util.go
@@ -0,0 +1,215 @@
+package nodb
+
+import (
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "strconv"
+)
+
+var (
+ errBinLogDeleteType = errors.New("invalid bin log delete type")
+ errBinLogPutType = errors.New("invalid bin log put type")
+ errBinLogCommandType = errors.New("invalid bin log command type")
+)
+
+func encodeBinLogDelete(key []byte) []byte {
+ buf := make([]byte, 1+len(key))
+ buf[0] = BinLogTypeDeletion
+ copy(buf[1:], key)
+ return buf
+}
+
+func decodeBinLogDelete(sz []byte) ([]byte, error) {
+ if len(sz) < 1 || sz[0] != BinLogTypeDeletion {
+ return nil, errBinLogDeleteType
+ }
+
+ return sz[1:], nil
+}
+
+func encodeBinLogPut(key []byte, value []byte) []byte {
+ buf := make([]byte, 3+len(key)+len(value))
+ buf[0] = BinLogTypePut
+ pos := 1
+ binary.BigEndian.PutUint16(buf[pos:], uint16(len(key)))
+ pos += 2
+ copy(buf[pos:], key)
+ pos += len(key)
+ copy(buf[pos:], value)
+
+ return buf
+}
+
+func decodeBinLogPut(sz []byte) ([]byte, []byte, error) {
+ if len(sz) < 3 || sz[0] != BinLogTypePut {
+ return nil, nil, errBinLogPutType
+ }
+
+ keyLen := int(binary.BigEndian.Uint16(sz[1:]))
+ if 3+keyLen > len(sz) {
+ return nil, nil, errBinLogPutType
+ }
+
+ return sz[3 : 3+keyLen], sz[3+keyLen:], nil
+}
+
+func FormatBinLogEvent(event []byte) (string, error) {
+ logType := uint8(event[0])
+
+ var err error
+ var k []byte
+ var v []byte
+
+ var buf []byte = make([]byte, 0, 1024)
+
+ switch logType {
+ case BinLogTypePut:
+ k, v, err = decodeBinLogPut(event)
+ buf = append(buf, "PUT "...)
+ case BinLogTypeDeletion:
+ k, err = decodeBinLogDelete(event)
+ buf = append(buf, "DELETE "...)
+ default:
+ err = errInvalidBinLogEvent
+ }
+
+ if err != nil {
+ return "", err
+ }
+
+ if buf, err = formatDataKey(buf, k); err != nil {
+ return "", err
+ }
+
+ if v != nil && len(v) != 0 {
+ buf = append(buf, fmt.Sprintf(" %q", v)...)
+ }
+
+ return String(buf), nil
+}
+
+func formatDataKey(buf []byte, k []byte) ([]byte, error) {
+ if len(k) < 2 {
+ return nil, errInvalidBinLogEvent
+ }
+
+ buf = append(buf, fmt.Sprintf("DB:%2d ", k[0])...)
+ buf = append(buf, fmt.Sprintf("%s ", TypeName[k[1]])...)
+
+ db := new(DB)
+ db.index = k[0]
+
+ //to do format at respective place
+
+ switch k[1] {
+ case KVType:
+ if key, err := db.decodeKVKey(k); err != nil {
+ return nil, err
+ } else {
+ buf = strconv.AppendQuote(buf, String(key))
+ }
+ case HashType:
+ if key, field, err := db.hDecodeHashKey(k); err != nil {
+ return nil, err
+ } else {
+ buf = strconv.AppendQuote(buf, String(key))
+ buf = append(buf, ' ')
+ buf = strconv.AppendQuote(buf, String(field))
+ }
+ case HSizeType:
+ if key, err := db.hDecodeSizeKey(k); err != nil {
+ return nil, err
+ } else {
+ buf = strconv.AppendQuote(buf, String(key))
+ }
+ case ListType:
+ if key, seq, err := db.lDecodeListKey(k); err != nil {
+ return nil, err
+ } else {
+ buf = strconv.AppendQuote(buf, String(key))
+ buf = append(buf, ' ')
+ buf = strconv.AppendInt(buf, int64(seq), 10)
+ }
+ case LMetaType:
+ if key, err := db.lDecodeMetaKey(k); err != nil {
+ return nil, err
+ } else {
+ buf = strconv.AppendQuote(buf, String(key))
+ }
+ case ZSetType:
+ if key, m, err := db.zDecodeSetKey(k); err != nil {
+ return nil, err
+ } else {
+ buf = strconv.AppendQuote(buf, String(key))
+ buf = append(buf, ' ')
+ buf = strconv.AppendQuote(buf, String(m))
+ }
+ case ZSizeType:
+ if key, err := db.zDecodeSizeKey(k); err != nil {
+ return nil, err
+ } else {
+ buf = strconv.AppendQuote(buf, String(key))
+ }
+ case ZScoreType:
+ if key, m, score, err := db.zDecodeScoreKey(k); err != nil {
+ return nil, err
+ } else {
+ buf = strconv.AppendQuote(buf, String(key))
+ buf = append(buf, ' ')
+ buf = strconv.AppendQuote(buf, String(m))
+ buf = append(buf, ' ')
+ buf = strconv.AppendInt(buf, score, 10)
+ }
+ case BitType:
+ if key, seq, err := db.bDecodeBinKey(k); err != nil {
+ return nil, err
+ } else {
+ buf = strconv.AppendQuote(buf, String(key))
+ buf = append(buf, ' ')
+ buf = strconv.AppendUint(buf, uint64(seq), 10)
+ }
+ case BitMetaType:
+ if key, err := db.bDecodeMetaKey(k); err != nil {
+ return nil, err
+ } else {
+ buf = strconv.AppendQuote(buf, String(key))
+ }
+ case SetType:
+ if key, member, err := db.sDecodeSetKey(k); err != nil {
+ return nil, err
+ } else {
+ buf = strconv.AppendQuote(buf, String(key))
+ buf = append(buf, ' ')
+ buf = strconv.AppendQuote(buf, String(member))
+ }
+ case SSizeType:
+ if key, err := db.sDecodeSizeKey(k); err != nil {
+ return nil, err
+ } else {
+ buf = strconv.AppendQuote(buf, String(key))
+ }
+ case ExpTimeType:
+ if tp, key, t, err := db.expDecodeTimeKey(k); err != nil {
+ return nil, err
+ } else {
+ buf = append(buf, TypeName[tp]...)
+ buf = append(buf, ' ')
+ buf = strconv.AppendQuote(buf, String(key))
+ buf = append(buf, ' ')
+ buf = strconv.AppendInt(buf, t, 10)
+ }
+ case ExpMetaType:
+ if tp, key, err := db.expDecodeMetaKey(k); err != nil {
+ return nil, err
+ } else {
+ buf = append(buf, TypeName[tp]...)
+ buf = append(buf, ' ')
+ buf = strconv.AppendQuote(buf, String(key))
+ }
+ default:
+ return nil, errInvalidBinLogEvent
+ }
+
+ return buf, nil
+}
diff --git a/vendor/github.com/lunny/nodb/config/config.go b/vendor/github.com/lunny/nodb/config/config.go
new file mode 100644
index 0000000000..3b44d3043f
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/config/config.go
@@ -0,0 +1,135 @@
+package config
+
+import (
+ "io/ioutil"
+
+ "github.com/BurntSushi/toml"
+)
+
+type Size int
+
+const (
+ DefaultAddr string = "127.0.0.1:6380"
+ DefaultHttpAddr string = "127.0.0.1:11181"
+
+ DefaultDBName string = "goleveldb"
+
+ DefaultDataDir string = "./data"
+)
+
+const (
+ MaxBinLogFileSize int = 1024 * 1024 * 1024
+ MaxBinLogFileNum int = 10000
+
+ DefaultBinLogFileSize int = MaxBinLogFileSize
+ DefaultBinLogFileNum int = 10
+)
+
+type LevelDBConfig struct {
+ Compression bool `toml:"compression"`
+ BlockSize int `toml:"block_size"`
+ WriteBufferSize int `toml:"write_buffer_size"`
+ CacheSize int `toml:"cache_size"`
+ MaxOpenFiles int `toml:"max_open_files"`
+}
+
+type LMDBConfig struct {
+ MapSize int `toml:"map_size"`
+ NoSync bool `toml:"nosync"`
+}
+
+type BinLogConfig struct {
+ MaxFileSize int `toml:"max_file_size"`
+ MaxFileNum int `toml:"max_file_num"`
+}
+
+type Config struct {
+ DataDir string `toml:"data_dir"`
+
+ DBName string `toml:"db_name"`
+
+ LevelDB LevelDBConfig `toml:"leveldb"`
+
+ LMDB LMDBConfig `toml:"lmdb"`
+
+ BinLog BinLogConfig `toml:"binlog"`
+
+ SlaveOf string `toml:"slaveof"`
+
+ AccessLog string `toml:"access_log"`
+}
+
+func NewConfigWithFile(fileName string) (*Config, error) {
+ data, err := ioutil.ReadFile(fileName)
+ if err != nil {
+ return nil, err
+ }
+
+ return NewConfigWithData(data)
+}
+
+func NewConfigWithData(data []byte) (*Config, error) {
+ cfg := NewConfigDefault()
+
+ _, err := toml.Decode(string(data), cfg)
+ if err != nil {
+ return nil, err
+ }
+
+ return cfg, nil
+}
+
+func NewConfigDefault() *Config {
+ cfg := new(Config)
+
+ cfg.DataDir = DefaultDataDir
+
+ cfg.DBName = DefaultDBName
+
+ // disable binlog
+ cfg.BinLog.MaxFileNum = 0
+ cfg.BinLog.MaxFileSize = 0
+
+ // disable replication
+ cfg.SlaveOf = ""
+
+ // disable access log
+ cfg.AccessLog = ""
+
+ cfg.LMDB.MapSize = 20 * 1024 * 1024
+ cfg.LMDB.NoSync = true
+
+ return cfg
+}
+
+func (cfg *LevelDBConfig) Adjust() {
+ if cfg.CacheSize <= 0 {
+ cfg.CacheSize = 4 * 1024 * 1024
+ }
+
+ if cfg.BlockSize <= 0 {
+ cfg.BlockSize = 4 * 1024
+ }
+
+ if cfg.WriteBufferSize <= 0 {
+ cfg.WriteBufferSize = 4 * 1024 * 1024
+ }
+
+ if cfg.MaxOpenFiles < 1024 {
+ cfg.MaxOpenFiles = 1024
+ }
+}
+
+func (cfg *BinLogConfig) Adjust() {
+ if cfg.MaxFileSize <= 0 {
+ cfg.MaxFileSize = DefaultBinLogFileSize
+ } else if cfg.MaxFileSize > MaxBinLogFileSize {
+ cfg.MaxFileSize = MaxBinLogFileSize
+ }
+
+ if cfg.MaxFileNum <= 0 {
+ cfg.MaxFileNum = DefaultBinLogFileNum
+ } else if cfg.MaxFileNum > MaxBinLogFileNum {
+ cfg.MaxFileNum = MaxBinLogFileNum
+ }
+}
diff --git a/vendor/github.com/lunny/nodb/const.go b/vendor/github.com/lunny/nodb/const.go
new file mode 100644
index 0000000000..446dae634e
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/const.go
@@ -0,0 +1,98 @@
+package nodb
+
+import (
+ "errors"
+)
+
+const (
+ NoneType byte = 0
+ KVType byte = 1
+ HashType byte = 2
+ HSizeType byte = 3
+ ListType byte = 4
+ LMetaType byte = 5
+ ZSetType byte = 6
+ ZSizeType byte = 7
+ ZScoreType byte = 8
+ BitType byte = 9
+ BitMetaType byte = 10
+ SetType byte = 11
+ SSizeType byte = 12
+
+ maxDataType byte = 100
+
+ ExpTimeType byte = 101
+ ExpMetaType byte = 102
+)
+
+var (
+ TypeName = map[byte]string{
+ KVType: "kv",
+ HashType: "hash",
+ HSizeType: "hsize",
+ ListType: "list",
+ LMetaType: "lmeta",
+ ZSetType: "zset",
+ ZSizeType: "zsize",
+ ZScoreType: "zscore",
+ BitType: "bit",
+ BitMetaType: "bitmeta",
+ SetType: "set",
+ SSizeType: "ssize",
+ ExpTimeType: "exptime",
+ ExpMetaType: "expmeta",
+ }
+)
+
+const (
+ defaultScanCount int = 10
+)
+
+var (
+ errKeySize = errors.New("invalid key size")
+ errValueSize = errors.New("invalid value size")
+ errHashFieldSize = errors.New("invalid hash field size")
+ errSetMemberSize = errors.New("invalid set member size")
+ errZSetMemberSize = errors.New("invalid zset member size")
+ errExpireValue = errors.New("invalid expire value")
+)
+
+const (
+ //we don't support too many databases
+ MaxDBNumber uint8 = 16
+
+ //max key size
+ MaxKeySize int = 1024
+
+ //max hash field size
+ MaxHashFieldSize int = 1024
+
+ //max zset member size
+ MaxZSetMemberSize int = 1024
+
+ //max set member size
+ MaxSetMemberSize int = 1024
+
+ //max value size
+ MaxValueSize int = 10 * 1024 * 1024
+)
+
+var (
+ ErrScoreMiss = errors.New("zset score miss")
+)
+
+const (
+ BinLogTypeDeletion uint8 = 0x0
+ BinLogTypePut uint8 = 0x1
+ BinLogTypeCommand uint8 = 0x2
+)
+
+const (
+ DBAutoCommit uint8 = 0x0
+ DBInTransaction uint8 = 0x1
+ DBInMulti uint8 = 0x2
+)
+
+var (
+ Version = "0.1"
+)
diff --git a/vendor/github.com/lunny/nodb/doc.go b/vendor/github.com/lunny/nodb/doc.go
new file mode 100644
index 0000000000..2f7df33ffd
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/doc.go
@@ -0,0 +1,61 @@
+// package nodb is a high performance embedded NoSQL.
+//
+// nodb supports various data structure like kv, list, hash and zset like redis.
+//
+// Other features include binlog replication, data with a limited time-to-live.
+//
+// Usage
+//
+// First create a nodb instance before use:
+//
+// l := nodb.Open(cfg)
+//
+// cfg is a Config instance which contains configuration for nodb use,
+// like DataDir (root directory for nodb working to store data).
+//
+// After you create a nodb instance, you can select a DB to store you data:
+//
+// db, _ := l.Select(0)
+//
+// DB must be selected by a index, nodb supports only 16 databases, so the index range is [0-15].
+//
+// KV
+//
+// KV is the most basic nodb type like any other key-value database.
+//
+// err := db.Set(key, value)
+// value, err := db.Get(key)
+//
+// List
+//
+// List is simply lists of values, sorted by insertion order.
+// You can push or pop value on the list head (left) or tail (right).
+//
+// err := db.LPush(key, value1)
+// err := db.RPush(key, value2)
+// value1, err := db.LPop(key)
+// value2, err := db.RPop(key)
+//
+// Hash
+//
+// Hash is a map between fields and values.
+//
+// n, err := db.HSet(key, field1, value1)
+// n, err := db.HSet(key, field2, value2)
+// value1, err := db.HGet(key, field1)
+// value2, err := db.HGet(key, field2)
+//
+// ZSet
+//
+// ZSet is a sorted collections of values.
+// Every member of zset is associated with score, a int64 value which used to sort, from smallest to greatest score.
+// Members are unique, but score may be same.
+//
+// n, err := db.ZAdd(key, ScorePair{score1, member1}, ScorePair{score2, member2})
+// ay, err := db.ZRangeByScore(key, minScore, maxScore, 0, -1)
+//
+// Binlog
+//
+// nodb supports binlog, so you can sync binlog to another server for replication. If you want to open binlog support, set UseBinLog to true in config.
+//
+package nodb
diff --git a/vendor/github.com/lunny/nodb/dump.go b/vendor/github.com/lunny/nodb/dump.go
new file mode 100644
index 0000000000..3c9722e00d
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/dump.go
@@ -0,0 +1,200 @@
+package nodb
+
+import (
+ "bufio"
+ "bytes"
+ "encoding/binary"
+ "io"
+ "os"
+
+ "github.com/siddontang/go-snappy/snappy"
+)
+
+//dump format
+// fileIndex(bigendian int64)|filePos(bigendian int64)
+// |keylen(bigendian int32)|key|valuelen(bigendian int32)|value......
+//
+//key and value are both compressed for fast transfer dump on network using snappy
+
+type BinLogAnchor struct {
+ LogFileIndex int64
+ LogPos int64
+}
+
+func (m *BinLogAnchor) WriteTo(w io.Writer) error {
+ if err := binary.Write(w, binary.BigEndian, m.LogFileIndex); err != nil {
+ return err
+ }
+
+ if err := binary.Write(w, binary.BigEndian, m.LogPos); err != nil {
+ return err
+ }
+ return nil
+}
+
+func (m *BinLogAnchor) ReadFrom(r io.Reader) error {
+ err := binary.Read(r, binary.BigEndian, &m.LogFileIndex)
+ if err != nil {
+ return err
+ }
+
+ err = binary.Read(r, binary.BigEndian, &m.LogPos)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (l *Nodb) DumpFile(path string) error {
+ f, err := os.Create(path)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+
+ return l.Dump(f)
+}
+
+func (l *Nodb) Dump(w io.Writer) error {
+ m := new(BinLogAnchor)
+
+ var err error
+
+ l.wLock.Lock()
+ defer l.wLock.Unlock()
+
+ if l.binlog != nil {
+ m.LogFileIndex = l.binlog.LogFileIndex()
+ m.LogPos = l.binlog.LogFilePos()
+ }
+
+ wb := bufio.NewWriterSize(w, 4096)
+ if err = m.WriteTo(wb); err != nil {
+ return err
+ }
+
+ it := l.ldb.NewIterator()
+ it.SeekToFirst()
+
+ compressBuf := make([]byte, 4096)
+
+ var key []byte
+ var value []byte
+ for ; it.Valid(); it.Next() {
+ key = it.RawKey()
+ value = it.RawValue()
+
+ if key, err = snappy.Encode(compressBuf, key); err != nil {
+ return err
+ }
+
+ if err = binary.Write(wb, binary.BigEndian, uint16(len(key))); err != nil {
+ return err
+ }
+
+ if _, err = wb.Write(key); err != nil {
+ return err
+ }
+
+ if value, err = snappy.Encode(compressBuf, value); err != nil {
+ return err
+ }
+
+ if err = binary.Write(wb, binary.BigEndian, uint32(len(value))); err != nil {
+ return err
+ }
+
+ if _, err = wb.Write(value); err != nil {
+ return err
+ }
+ }
+
+ if err = wb.Flush(); err != nil {
+ return err
+ }
+
+ compressBuf = nil
+
+ return nil
+}
+
+func (l *Nodb) LoadDumpFile(path string) (*BinLogAnchor, error) {
+ f, err := os.Open(path)
+ if err != nil {
+ return nil, err
+ }
+ defer f.Close()
+
+ return l.LoadDump(f)
+}
+
+func (l *Nodb) LoadDump(r io.Reader) (*BinLogAnchor, error) {
+ l.wLock.Lock()
+ defer l.wLock.Unlock()
+
+ info := new(BinLogAnchor)
+
+ rb := bufio.NewReaderSize(r, 4096)
+
+ err := info.ReadFrom(rb)
+ if err != nil {
+ return nil, err
+ }
+
+ var keyLen uint16
+ var valueLen uint32
+
+ var keyBuf bytes.Buffer
+ var valueBuf bytes.Buffer
+
+ deKeyBuf := make([]byte, 4096)
+ deValueBuf := make([]byte, 4096)
+
+ var key, value []byte
+
+ for {
+ if err = binary.Read(rb, binary.BigEndian, &keyLen); err != nil && err != io.EOF {
+ return nil, err
+ } else if err == io.EOF {
+ break
+ }
+
+ if _, err = io.CopyN(&keyBuf, rb, int64(keyLen)); err != nil {
+ return nil, err
+ }
+
+ if key, err = snappy.Decode(deKeyBuf, keyBuf.Bytes()); err != nil {
+ return nil, err
+ }
+
+ if err = binary.Read(rb, binary.BigEndian, &valueLen); err != nil {
+ return nil, err
+ }
+
+ if _, err = io.CopyN(&valueBuf, rb, int64(valueLen)); err != nil {
+ return nil, err
+ }
+
+ if value, err = snappy.Decode(deValueBuf, valueBuf.Bytes()); err != nil {
+ return nil, err
+ }
+
+ if err = l.ldb.Put(key, value); err != nil {
+ return nil, err
+ }
+
+ keyBuf.Reset()
+ valueBuf.Reset()
+ }
+
+ deKeyBuf = nil
+ deValueBuf = nil
+
+ //if binlog enable, we will delete all binlogs and open a new one for handling simply
+ if l.binlog != nil {
+ l.binlog.PurgeAll()
+ }
+
+ return info, nil
+}
diff --git a/vendor/github.com/lunny/nodb/info.go b/vendor/github.com/lunny/nodb/info.go
new file mode 100644
index 0000000000..3fd37e3d44
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/info.go
@@ -0,0 +1,24 @@
+package nodb
+
+// todo, add info
+
+// type Keyspace struct {
+// Kvs int `json:"kvs"`
+// KvExpires int `json:"kv_expires"`
+
+// Lists int `json:"lists"`
+// ListExpires int `json:"list_expires"`
+
+// Bitmaps int `json:"bitmaps"`
+// BitmapExpires int `json:"bitmap_expires"`
+
+// ZSets int `json:"zsets"`
+// ZSetExpires int `json:"zset_expires"`
+
+// Hashes int `json:"hashes"`
+// HashExpires int `json:"hahsh_expires"`
+// }
+
+// type Info struct {
+// KeySpaces [MaxDBNumber]Keyspace
+// }
diff --git a/vendor/github.com/lunny/nodb/multi.go b/vendor/github.com/lunny/nodb/multi.go
new file mode 100644
index 0000000000..ca581ce9a2
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/multi.go
@@ -0,0 +1,73 @@
+package nodb
+
+import (
+ "errors"
+ "fmt"
+)
+
+var (
+ ErrNestMulti = errors.New("nest multi not supported")
+ ErrMultiDone = errors.New("multi has been closed")
+)
+
+type Multi struct {
+ *DB
+}
+
+func (db *DB) IsInMulti() bool {
+ return db.status == DBInMulti
+}
+
+// begin a mutli to execute commands,
+// it will block any other write operations before you close the multi, unlike transaction, mutli can not rollback
+func (db *DB) Multi() (*Multi, error) {
+ if db.IsInMulti() {
+ return nil, ErrNestMulti
+ }
+
+ m := new(Multi)
+
+ m.DB = new(DB)
+ m.DB.status = DBInMulti
+
+ m.DB.l = db.l
+
+ m.l.wLock.Lock()
+
+ m.DB.sdb = db.sdb
+
+ m.DB.bucket = db.sdb
+
+ m.DB.index = db.index
+
+ m.DB.kvBatch = m.newBatch()
+ m.DB.listBatch = m.newBatch()
+ m.DB.hashBatch = m.newBatch()
+ m.DB.zsetBatch = m.newBatch()
+ m.DB.binBatch = m.newBatch()
+ m.DB.setBatch = m.newBatch()
+
+ return m, nil
+}
+
+func (m *Multi) newBatch() *batch {
+ return m.l.newBatch(m.bucket.NewWriteBatch(), &multiBatchLocker{}, nil)
+}
+
+func (m *Multi) Close() error {
+ if m.bucket == nil {
+ return ErrMultiDone
+ }
+ m.l.wLock.Unlock()
+ m.bucket = nil
+ return nil
+}
+
+func (m *Multi) Select(index int) error {
+ if index < 0 || index >= int(MaxDBNumber) {
+ return fmt.Errorf("invalid db index %d", index)
+ }
+
+ m.DB.index = uint8(index)
+ return nil
+}
diff --git a/vendor/github.com/lunny/nodb/nodb.go b/vendor/github.com/lunny/nodb/nodb.go
new file mode 100644
index 0000000000..fdd0272c94
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/nodb.go
@@ -0,0 +1,128 @@
+package nodb
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/lunny/log"
+ "github.com/lunny/nodb/config"
+ "github.com/lunny/nodb/store"
+)
+
+type Nodb struct {
+ cfg *config.Config
+
+ ldb *store.DB
+ dbs [MaxDBNumber]*DB
+
+ quit chan struct{}
+ jobs *sync.WaitGroup
+
+ binlog *BinLog
+
+ wLock sync.RWMutex //allow one write at same time
+ commitLock sync.Mutex //allow one write commit at same time
+}
+
+func Open(cfg *config.Config) (*Nodb, error) {
+ if len(cfg.DataDir) == 0 {
+ cfg.DataDir = config.DefaultDataDir
+ }
+
+ ldb, err := store.Open(cfg)
+ if err != nil {
+ return nil, err
+ }
+
+ l := new(Nodb)
+
+ l.quit = make(chan struct{})
+ l.jobs = new(sync.WaitGroup)
+
+ l.ldb = ldb
+
+ if cfg.BinLog.MaxFileNum > 0 && cfg.BinLog.MaxFileSize > 0 {
+ l.binlog, err = NewBinLog(cfg)
+ if err != nil {
+ return nil, err
+ }
+ } else {
+ l.binlog = nil
+ }
+
+ for i := uint8(0); i < MaxDBNumber; i++ {
+ l.dbs[i] = l.newDB(i)
+ }
+
+ l.activeExpireCycle()
+
+ return l, nil
+}
+
+func (l *Nodb) Close() {
+ close(l.quit)
+ l.jobs.Wait()
+
+ l.ldb.Close()
+
+ if l.binlog != nil {
+ l.binlog.Close()
+ l.binlog = nil
+ }
+}
+
+func (l *Nodb) Select(index int) (*DB, error) {
+ if index < 0 || index >= int(MaxDBNumber) {
+ return nil, fmt.Errorf("invalid db index %d", index)
+ }
+
+ return l.dbs[index], nil
+}
+
+func (l *Nodb) FlushAll() error {
+ for index, db := range l.dbs {
+ if _, err := db.FlushAll(); err != nil {
+ log.Error("flush db %d error %s", index, err.Error())
+ }
+ }
+
+ return nil
+}
+
+// very dangerous to use
+func (l *Nodb) DataDB() *store.DB {
+ return l.ldb
+}
+
+func (l *Nodb) activeExpireCycle() {
+ var executors []*elimination = make([]*elimination, len(l.dbs))
+ for i, db := range l.dbs {
+ executors[i] = db.newEliminator()
+ }
+
+ l.jobs.Add(1)
+ go func() {
+ tick := time.NewTicker(1 * time.Second)
+ end := false
+ done := make(chan struct{})
+ for !end {
+ select {
+ case <-tick.C:
+ go func() {
+ for _, eli := range executors {
+ eli.active()
+ }
+ done <- struct{}{}
+ }()
+ <-done
+ case <-l.quit:
+ end = true
+ break
+ }
+ }
+
+ tick.Stop()
+ l.jobs.Done()
+ }()
+}
diff --git a/vendor/github.com/lunny/nodb/nodb_db.go b/vendor/github.com/lunny/nodb/nodb_db.go
new file mode 100644
index 0000000000..f68ebaa0d4
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/nodb_db.go
@@ -0,0 +1,171 @@
+package nodb
+
+import (
+ "fmt"
+ "sync"
+
+ "github.com/lunny/nodb/store"
+)
+
+type ibucket interface {
+ Get(key []byte) ([]byte, error)
+
+ Put(key []byte, value []byte) error
+ Delete(key []byte) error
+
+ NewIterator() *store.Iterator
+
+ NewWriteBatch() store.WriteBatch
+
+ RangeIterator(min []byte, max []byte, rangeType uint8) *store.RangeLimitIterator
+ RevRangeIterator(min []byte, max []byte, rangeType uint8) *store.RangeLimitIterator
+ RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *store.RangeLimitIterator
+ RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *store.RangeLimitIterator
+}
+
+type DB struct {
+ l *Nodb
+
+ sdb *store.DB
+
+ bucket ibucket
+
+ index uint8
+
+ kvBatch *batch
+ listBatch *batch
+ hashBatch *batch
+ zsetBatch *batch
+ binBatch *batch
+ setBatch *batch
+
+ status uint8
+}
+
+func (l *Nodb) newDB(index uint8) *DB {
+ d := new(DB)
+
+ d.l = l
+
+ d.sdb = l.ldb
+
+ d.bucket = d.sdb
+
+ d.status = DBAutoCommit
+ d.index = index
+
+ d.kvBatch = d.newBatch()
+ d.listBatch = d.newBatch()
+ d.hashBatch = d.newBatch()
+ d.zsetBatch = d.newBatch()
+ d.binBatch = d.newBatch()
+ d.setBatch = d.newBatch()
+
+ return d
+}
+
+func (db *DB) newBatch() *batch {
+ return db.l.newBatch(db.bucket.NewWriteBatch(), &dbBatchLocker{l: &sync.Mutex{}, wrLock: &db.l.wLock}, nil)
+}
+
+func (db *DB) Index() int {
+ return int(db.index)
+}
+
+func (db *DB) IsAutoCommit() bool {
+ return db.status == DBAutoCommit
+}
+
+func (db *DB) FlushAll() (drop int64, err error) {
+ all := [...](func() (int64, error)){
+ db.flush,
+ db.lFlush,
+ db.hFlush,
+ db.zFlush,
+ db.bFlush,
+ db.sFlush}
+
+ for _, flush := range all {
+ if n, e := flush(); e != nil {
+ err = e
+ return
+ } else {
+ drop += n
+ }
+ }
+
+ return
+}
+
+func (db *DB) newEliminator() *elimination {
+ eliminator := newEliminator(db)
+
+ eliminator.regRetireContext(KVType, db.kvBatch, db.delete)
+ eliminator.regRetireContext(ListType, db.listBatch, db.lDelete)
+ eliminator.regRetireContext(HashType, db.hashBatch, db.hDelete)
+ eliminator.regRetireContext(ZSetType, db.zsetBatch, db.zDelete)
+ eliminator.regRetireContext(BitType, db.binBatch, db.bDelete)
+ eliminator.regRetireContext(SetType, db.setBatch, db.sDelete)
+
+ return eliminator
+}
+
+func (db *DB) flushRegion(t *batch, minKey []byte, maxKey []byte) (drop int64, err error) {
+ it := db.bucket.RangeIterator(minKey, maxKey, store.RangeROpen)
+ for ; it.Valid(); it.Next() {
+ t.Delete(it.RawKey())
+ drop++
+ if drop&1023 == 0 {
+ if err = t.Commit(); err != nil {
+ return
+ }
+ }
+ }
+ it.Close()
+ return
+}
+
+func (db *DB) flushType(t *batch, dataType byte) (drop int64, err error) {
+ var deleteFunc func(t *batch, key []byte) int64
+ var metaDataType byte
+ switch dataType {
+ case KVType:
+ deleteFunc = db.delete
+ metaDataType = KVType
+ case ListType:
+ deleteFunc = db.lDelete
+ metaDataType = LMetaType
+ case HashType:
+ deleteFunc = db.hDelete
+ metaDataType = HSizeType
+ case ZSetType:
+ deleteFunc = db.zDelete
+ metaDataType = ZSizeType
+ case BitType:
+ deleteFunc = db.bDelete
+ metaDataType = BitMetaType
+ case SetType:
+ deleteFunc = db.sDelete
+ metaDataType = SSizeType
+ default:
+ return 0, fmt.Errorf("invalid data type: %s", TypeName[dataType])
+ }
+
+ var keys [][]byte
+ keys, err = db.scan(metaDataType, nil, 1024, false, "")
+ for len(keys) != 0 || err != nil {
+ for _, key := range keys {
+ deleteFunc(t, key)
+ db.rmExpire(t, dataType, key)
+
+ }
+
+ if err = t.Commit(); err != nil {
+ return
+ } else {
+ drop += int64(len(keys))
+ }
+ keys, err = db.scan(metaDataType, nil, 1024, false, "")
+ }
+ return
+}
diff --git a/vendor/github.com/lunny/nodb/replication.go b/vendor/github.com/lunny/nodb/replication.go
new file mode 100644
index 0000000000..f9bc951085
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/replication.go
@@ -0,0 +1,312 @@
+package nodb
+
+import (
+ "bufio"
+ "bytes"
+ "errors"
+ "io"
+ "os"
+ "time"
+
+ "github.com/lunny/log"
+ "github.com/lunny/nodb/store/driver"
+)
+
+const (
+ maxReplBatchNum = 100
+ maxReplLogSize = 1 * 1024 * 1024
+)
+
+var (
+ ErrSkipEvent = errors.New("skip to next event")
+)
+
+var (
+ errInvalidBinLogEvent = errors.New("invalid binglog event")
+ errInvalidBinLogFile = errors.New("invalid binlog file")
+)
+
+type replBatch struct {
+ wb driver.IWriteBatch
+ events [][]byte
+ l *Nodb
+
+ lastHead *BinLogHead
+}
+
+func (b *replBatch) Commit() error {
+ b.l.commitLock.Lock()
+ defer b.l.commitLock.Unlock()
+
+ err := b.wb.Commit()
+ if err != nil {
+ b.Rollback()
+ return err
+ }
+
+ if b.l.binlog != nil {
+ if err = b.l.binlog.Log(b.events...); err != nil {
+ b.Rollback()
+ return err
+ }
+ }
+
+ b.events = [][]byte{}
+ b.lastHead = nil
+
+ return nil
+}
+
+func (b *replBatch) Rollback() error {
+ b.wb.Rollback()
+ b.events = [][]byte{}
+ b.lastHead = nil
+ return nil
+}
+
+func (l *Nodb) replicateEvent(b *replBatch, event []byte) error {
+ if len(event) == 0 {
+ return errInvalidBinLogEvent
+ }
+
+ b.events = append(b.events, event)
+
+ logType := uint8(event[0])
+ switch logType {
+ case BinLogTypePut:
+ return l.replicatePutEvent(b, event)
+ case BinLogTypeDeletion:
+ return l.replicateDeleteEvent(b, event)
+ default:
+ return errInvalidBinLogEvent
+ }
+}
+
+func (l *Nodb) replicatePutEvent(b *replBatch, event []byte) error {
+ key, value, err := decodeBinLogPut(event)
+ if err != nil {
+ return err
+ }
+
+ b.wb.Put(key, value)
+
+ return nil
+}
+
+func (l *Nodb) replicateDeleteEvent(b *replBatch, event []byte) error {
+ key, err := decodeBinLogDelete(event)
+ if err != nil {
+ return err
+ }
+
+ b.wb.Delete(key)
+
+ return nil
+}
+
+func ReadEventFromReader(rb io.Reader, f func(head *BinLogHead, event []byte) error) error {
+ head := &BinLogHead{}
+ var err error
+
+ for {
+ if err = head.Read(rb); err != nil {
+ if err == io.EOF {
+ break
+ } else {
+ return err
+ }
+ }
+
+ var dataBuf bytes.Buffer
+
+ if _, err = io.CopyN(&dataBuf, rb, int64(head.PayloadLen)); err != nil {
+ return err
+ }
+
+ err = f(head, dataBuf.Bytes())
+ if err != nil && err != ErrSkipEvent {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (l *Nodb) ReplicateFromReader(rb io.Reader) error {
+ b := new(replBatch)
+
+ b.wb = l.ldb.NewWriteBatch()
+ b.l = l
+
+ f := func(head *BinLogHead, event []byte) error {
+ if b.lastHead == nil {
+ b.lastHead = head
+ } else if !b.lastHead.InSameBatch(head) {
+ if err := b.Commit(); err != nil {
+ log.Fatal("replication error %s, skip to next", err.Error())
+ return ErrSkipEvent
+ }
+ b.lastHead = head
+ }
+
+ err := l.replicateEvent(b, event)
+ if err != nil {
+ log.Fatal("replication error %s, skip to next", err.Error())
+ return ErrSkipEvent
+ }
+ return nil
+ }
+
+ err := ReadEventFromReader(rb, f)
+ if err != nil {
+ b.Rollback()
+ return err
+ }
+ return b.Commit()
+}
+
+func (l *Nodb) ReplicateFromData(data []byte) error {
+ rb := bytes.NewReader(data)
+
+ err := l.ReplicateFromReader(rb)
+
+ return err
+}
+
+func (l *Nodb) ReplicateFromBinLog(filePath string) error {
+ f, err := os.Open(filePath)
+ if err != nil {
+ return err
+ }
+
+ rb := bufio.NewReaderSize(f, 4096)
+
+ err = l.ReplicateFromReader(rb)
+
+ f.Close()
+
+ return err
+}
+
+// try to read events, if no events read, try to wait the new event singal until timeout seconds
+func (l *Nodb) ReadEventsToTimeout(info *BinLogAnchor, w io.Writer, timeout int) (n int, err error) {
+ lastIndex := info.LogFileIndex
+ lastPos := info.LogPos
+
+ n = 0
+ if l.binlog == nil {
+ //binlog not supported
+ info.LogFileIndex = 0
+ info.LogPos = 0
+ return
+ }
+
+ n, err = l.ReadEventsTo(info, w)
+ if err == nil && info.LogFileIndex == lastIndex && info.LogPos == lastPos {
+ //no events read
+ select {
+ case <-l.binlog.Wait():
+ case <-time.After(time.Duration(timeout) * time.Second):
+ }
+ return l.ReadEventsTo(info, w)
+ }
+ return
+}
+
+func (l *Nodb) ReadEventsTo(info *BinLogAnchor, w io.Writer) (n int, err error) {
+ n = 0
+ if l.binlog == nil {
+ //binlog not supported
+ info.LogFileIndex = 0
+ info.LogPos = 0
+ return
+ }
+
+ index := info.LogFileIndex
+ offset := info.LogPos
+
+ filePath := l.binlog.FormatLogFilePath(index)
+
+ var f *os.File
+ f, err = os.Open(filePath)
+ if os.IsNotExist(err) {
+ lastIndex := l.binlog.LogFileIndex()
+
+ if index == lastIndex {
+ //no binlog at all
+ info.LogPos = 0
+ } else {
+ //slave binlog info had lost
+ info.LogFileIndex = -1
+ }
+ }
+
+ if err != nil {
+ if os.IsNotExist(err) {
+ err = nil
+ }
+ return
+ }
+
+ defer f.Close()
+
+ var fileSize int64
+ st, _ := f.Stat()
+ fileSize = st.Size()
+
+ if fileSize == info.LogPos {
+ return
+ }
+
+ if _, err = f.Seek(offset, os.SEEK_SET); err != nil {
+ //may be invliad seek offset
+ return
+ }
+
+ var lastHead *BinLogHead = nil
+
+ head := &BinLogHead{}
+
+ batchNum := 0
+
+ for {
+ if err = head.Read(f); err != nil {
+ if err == io.EOF {
+ //we will try to use next binlog
+ if index < l.binlog.LogFileIndex() {
+ info.LogFileIndex += 1
+ info.LogPos = 0
+ }
+ err = nil
+ return
+ } else {
+ return
+ }
+
+ }
+
+ if lastHead == nil {
+ lastHead = head
+ batchNum++
+ } else if !lastHead.InSameBatch(head) {
+ lastHead = head
+ batchNum++
+ if batchNum > maxReplBatchNum || n > maxReplLogSize {
+ return
+ }
+ }
+
+ if err = head.Write(w); err != nil {
+ return
+ }
+
+ if _, err = io.CopyN(w, f, int64(head.PayloadLen)); err != nil {
+ return
+ }
+
+ n += (head.Len() + int(head.PayloadLen))
+ info.LogPos = info.LogPos + int64(head.Len()) + int64(head.PayloadLen)
+ }
+
+ return
+}
diff --git a/vendor/github.com/lunny/nodb/scan.go b/vendor/github.com/lunny/nodb/scan.go
new file mode 100644
index 0000000000..e989db3fed
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/scan.go
@@ -0,0 +1,144 @@
+package nodb
+
+import (
+ "bytes"
+ "errors"
+ "regexp"
+
+ "github.com/lunny/nodb/store"
+)
+
+var errDataType = errors.New("error data type")
+var errMetaKey = errors.New("error meta key")
+
+// Seek search the prefix key
+func (db *DB) Seek(key []byte) (*store.Iterator, error) {
+ return db.seek(KVType, key)
+}
+
+func (db *DB) seek(dataType byte, key []byte) (*store.Iterator, error) {
+ var minKey []byte
+ var err error
+
+ if len(key) > 0 {
+ if err = checkKeySize(key); err != nil {
+ return nil, err
+ }
+ if minKey, err = db.encodeMetaKey(dataType, key); err != nil {
+ return nil, err
+ }
+
+ } else {
+ if minKey, err = db.encodeMinKey(dataType); err != nil {
+ return nil, err
+ }
+ }
+
+ it := db.bucket.NewIterator()
+ it.Seek(minKey)
+ return it, nil
+}
+
+func (db *DB) MaxKey() ([]byte, error) {
+ return db.encodeMaxKey(KVType)
+}
+
+func (db *DB) Key(it *store.Iterator) ([]byte, error) {
+ return db.decodeMetaKey(KVType, it.Key())
+}
+
+func (db *DB) scan(dataType byte, key []byte, count int, inclusive bool, match string) ([][]byte, error) {
+ var minKey, maxKey []byte
+ var err error
+ var r *regexp.Regexp
+
+ if len(match) > 0 {
+ if r, err = regexp.Compile(match); err != nil {
+ return nil, err
+ }
+ }
+
+ if len(key) > 0 {
+ if err = checkKeySize(key); err != nil {
+ return nil, err
+ }
+ if minKey, err = db.encodeMetaKey(dataType, key); err != nil {
+ return nil, err
+ }
+
+ } else {
+ if minKey, err = db.encodeMinKey(dataType); err != nil {
+ return nil, err
+ }
+ }
+
+ if maxKey, err = db.encodeMaxKey(dataType); err != nil {
+ return nil, err
+ }
+
+ if count <= 0 {
+ count = defaultScanCount
+ }
+
+ v := make([][]byte, 0, count)
+
+ it := db.bucket.NewIterator()
+ it.Seek(minKey)
+
+ if !inclusive {
+ if it.Valid() && bytes.Equal(it.RawKey(), minKey) {
+ it.Next()
+ }
+ }
+
+ for i := 0; it.Valid() && i < count && bytes.Compare(it.RawKey(), maxKey) < 0; it.Next() {
+ if k, err := db.decodeMetaKey(dataType, it.Key()); err != nil {
+ continue
+ } else if r != nil && !r.Match(k) {
+ continue
+ } else {
+ v = append(v, k)
+ i++
+ }
+ }
+ it.Close()
+ return v, nil
+}
+
+func (db *DB) encodeMinKey(dataType byte) ([]byte, error) {
+ return db.encodeMetaKey(dataType, nil)
+}
+
+func (db *DB) encodeMaxKey(dataType byte) ([]byte, error) {
+ k, err := db.encodeMetaKey(dataType, nil)
+ if err != nil {
+ return nil, err
+ }
+ k[len(k)-1] = dataType + 1
+ return k, nil
+}
+
+func (db *DB) encodeMetaKey(dataType byte, key []byte) ([]byte, error) {
+ switch dataType {
+ case KVType:
+ return db.encodeKVKey(key), nil
+ case LMetaType:
+ return db.lEncodeMetaKey(key), nil
+ case HSizeType:
+ return db.hEncodeSizeKey(key), nil
+ case ZSizeType:
+ return db.zEncodeSizeKey(key), nil
+ case BitMetaType:
+ return db.bEncodeMetaKey(key), nil
+ case SSizeType:
+ return db.sEncodeSizeKey(key), nil
+ default:
+ return nil, errDataType
+ }
+}
+func (db *DB) decodeMetaKey(dataType byte, ek []byte) ([]byte, error) {
+ if len(ek) < 2 || ek[0] != db.index || ek[1] != dataType {
+ return nil, errMetaKey
+ }
+ return ek[2:], nil
+}
diff --git a/vendor/github.com/lunny/nodb/store/db.go b/vendor/github.com/lunny/nodb/store/db.go
new file mode 100644
index 0000000000..00a8831a67
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/store/db.go
@@ -0,0 +1,61 @@
+package store
+
+import (
+ "github.com/lunny/nodb/store/driver"
+)
+
+type DB struct {
+ driver.IDB
+}
+
+func (db *DB) NewIterator() *Iterator {
+ it := new(Iterator)
+ it.it = db.IDB.NewIterator()
+
+ return it
+}
+
+func (db *DB) NewWriteBatch() WriteBatch {
+ return db.IDB.NewWriteBatch()
+}
+
+func (db *DB) NewSnapshot() (*Snapshot, error) {
+ var err error
+ s := &Snapshot{}
+ if s.ISnapshot, err = db.IDB.NewSnapshot(); err != nil {
+ return nil, err
+ }
+
+ return s, nil
+}
+
+func (db *DB) RangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator {
+ return NewRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1})
+}
+
+func (db *DB) RevRangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator {
+ return NewRevRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1})
+}
+
+//count < 0, unlimit.
+//
+//offset must >= 0, if < 0, will get nothing.
+func (db *DB) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator {
+ return NewRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count})
+}
+
+//count < 0, unlimit.
+//
+//offset must >= 0, if < 0, will get nothing.
+func (db *DB) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator {
+ return NewRevRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count})
+}
+
+func (db *DB) Begin() (*Tx, error) {
+ tx, err := db.IDB.Begin()
+ if err != nil {
+ return nil, err
+ }
+
+ return &Tx{tx}, nil
+}
diff --git a/vendor/github.com/lunny/nodb/store/driver/batch.go b/vendor/github.com/lunny/nodb/store/driver/batch.go
new file mode 100644
index 0000000000..6b79c21c48
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/store/driver/batch.go
@@ -0,0 +1,39 @@
+package driver
+
+type BatchPuter interface {
+ BatchPut([]Write) error
+}
+
+type Write struct {
+ Key []byte
+ Value []byte
+}
+
+type WriteBatch struct {
+ batch BatchPuter
+ wb []Write
+}
+
+func (w *WriteBatch) Put(key, value []byte) {
+ if value == nil {
+ value = []byte{}
+ }
+ w.wb = append(w.wb, Write{key, value})
+}
+
+func (w *WriteBatch) Delete(key []byte) {
+ w.wb = append(w.wb, Write{key, nil})
+}
+
+func (w *WriteBatch) Commit() error {
+ return w.batch.BatchPut(w.wb)
+}
+
+func (w *WriteBatch) Rollback() error {
+ w.wb = w.wb[0:0]
+ return nil
+}
+
+func NewWriteBatch(puter BatchPuter) IWriteBatch {
+ return &WriteBatch{puter, []Write{}}
+}
diff --git a/vendor/github.com/lunny/nodb/store/driver/driver.go b/vendor/github.com/lunny/nodb/store/driver/driver.go
new file mode 100644
index 0000000000..6da67df083
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/store/driver/driver.go
@@ -0,0 +1,67 @@
+package driver
+
+import (
+ "errors"
+)
+
+var (
+ ErrTxSupport = errors.New("transaction is not supported")
+)
+
+type IDB interface {
+ Close() error
+
+ Get(key []byte) ([]byte, error)
+
+ Put(key []byte, value []byte) error
+ Delete(key []byte) error
+
+ NewIterator() IIterator
+
+ NewWriteBatch() IWriteBatch
+
+ NewSnapshot() (ISnapshot, error)
+
+ Begin() (Tx, error)
+}
+
+type ISnapshot interface {
+ Get(key []byte) ([]byte, error)
+ NewIterator() IIterator
+ Close()
+}
+
+type IIterator interface {
+ Close() error
+
+ First()
+ Last()
+ Seek(key []byte)
+
+ Next()
+ Prev()
+
+ Valid() bool
+
+ Key() []byte
+ Value() []byte
+}
+
+type IWriteBatch interface {
+ Put(key []byte, value []byte)
+ Delete(key []byte)
+ Commit() error
+ Rollback() error
+}
+
+type Tx interface {
+ Get(key []byte) ([]byte, error)
+ Put(key []byte, value []byte) error
+ Delete(key []byte) error
+
+ NewIterator() IIterator
+ NewWriteBatch() IWriteBatch
+
+ Commit() error
+ Rollback() error
+}
diff --git a/vendor/github.com/lunny/nodb/store/driver/store.go b/vendor/github.com/lunny/nodb/store/driver/store.go
new file mode 100644
index 0000000000..173431d4c1
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/store/driver/store.go
@@ -0,0 +1,46 @@
+package driver
+
+import (
+ "fmt"
+
+ "github.com/lunny/nodb/config"
+)
+
+type Store interface {
+ String() string
+ Open(path string, cfg *config.Config) (IDB, error)
+ Repair(path string, cfg *config.Config) error
+}
+
+var dbs = map[string]Store{}
+
+func Register(s Store) {
+ name := s.String()
+ if _, ok := dbs[name]; ok {
+ panic(fmt.Errorf("store %s is registered", s))
+ }
+
+ dbs[name] = s
+}
+
+func ListStores() []string {
+ s := []string{}
+ for k, _ := range dbs {
+ s = append(s, k)
+ }
+
+ return s
+}
+
+func GetStore(cfg *config.Config) (Store, error) {
+ if len(cfg.DBName) == 0 {
+ cfg.DBName = config.DefaultDBName
+ }
+
+ s, ok := dbs[cfg.DBName]
+ if !ok {
+ return nil, fmt.Errorf("store %s is not registered", cfg.DBName)
+ }
+
+ return s, nil
+}
diff --git a/vendor/github.com/lunny/nodb/store/goleveldb/batch.go b/vendor/github.com/lunny/nodb/store/goleveldb/batch.go
new file mode 100644
index 0000000000..b17e85e750
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/store/goleveldb/batch.go
@@ -0,0 +1,27 @@
+package goleveldb
+
+import (
+ "github.com/syndtr/goleveldb/leveldb"
+)
+
+type WriteBatch struct {
+ db *DB
+ wbatch *leveldb.Batch
+}
+
+func (w *WriteBatch) Put(key, value []byte) {
+ w.wbatch.Put(key, value)
+}
+
+func (w *WriteBatch) Delete(key []byte) {
+ w.wbatch.Delete(key)
+}
+
+func (w *WriteBatch) Commit() error {
+ return w.db.db.Write(w.wbatch, nil)
+}
+
+func (w *WriteBatch) Rollback() error {
+ w.wbatch.Reset()
+ return nil
+}
diff --git a/vendor/github.com/lunny/nodb/store/goleveldb/const.go b/vendor/github.com/lunny/nodb/store/goleveldb/const.go
new file mode 100644
index 0000000000..2fffa7c82b
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/store/goleveldb/const.go
@@ -0,0 +1,4 @@
+package goleveldb
+
+const DBName = "goleveldb"
+const MemDBName = "memory"
diff --git a/vendor/github.com/lunny/nodb/store/goleveldb/db.go b/vendor/github.com/lunny/nodb/store/goleveldb/db.go
new file mode 100644
index 0000000000..a36e87f628
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/store/goleveldb/db.go
@@ -0,0 +1,187 @@
+package goleveldb
+
+import (
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/cache"
+ "github.com/syndtr/goleveldb/leveldb/filter"
+ "github.com/syndtr/goleveldb/leveldb/opt"
+ "github.com/syndtr/goleveldb/leveldb/storage"
+
+ "github.com/lunny/nodb/config"
+ "github.com/lunny/nodb/store/driver"
+
+ "os"
+)
+
+const defaultFilterBits int = 10
+
+type Store struct {
+}
+
+func (s Store) String() string {
+ return DBName
+}
+
+type MemStore struct {
+}
+
+func (s MemStore) String() string {
+ return MemDBName
+}
+
+type DB struct {
+ path string
+
+ cfg *config.LevelDBConfig
+
+ db *leveldb.DB
+
+ opts *opt.Options
+
+ iteratorOpts *opt.ReadOptions
+
+ cache cache.Cache
+
+ filter filter.Filter
+}
+
+func (s Store) Open(path string, cfg *config.Config) (driver.IDB, error) {
+ if err := os.MkdirAll(path, os.ModePerm); err != nil {
+ return nil, err
+ }
+
+ db := new(DB)
+ db.path = path
+ db.cfg = &cfg.LevelDB
+
+ db.initOpts()
+
+ var err error
+ db.db, err = leveldb.OpenFile(db.path, db.opts)
+
+ if err != nil {
+ return nil, err
+ }
+
+ return db, nil
+}
+
+func (s Store) Repair(path string, cfg *config.Config) error {
+ db, err := leveldb.RecoverFile(path, newOptions(&cfg.LevelDB))
+ if err != nil {
+ return err
+ }
+
+ db.Close()
+ return nil
+}
+
+func (s MemStore) Open(path string, cfg *config.Config) (driver.IDB, error) {
+ db := new(DB)
+ db.path = path
+ db.cfg = &cfg.LevelDB
+
+ db.initOpts()
+
+ var err error
+ db.db, err = leveldb.Open(storage.NewMemStorage(), db.opts)
+ if err != nil {
+ return nil, err
+ }
+
+ return db, nil
+}
+
+func (s MemStore) Repair(path string, cfg *config.Config) error {
+ return nil
+}
+
+func (db *DB) initOpts() {
+ db.opts = newOptions(db.cfg)
+
+ db.iteratorOpts = &opt.ReadOptions{}
+ db.iteratorOpts.DontFillCache = true
+}
+
+func newOptions(cfg *config.LevelDBConfig) *opt.Options {
+ opts := &opt.Options{}
+ opts.ErrorIfMissing = false
+
+ cfg.Adjust()
+
+ //opts.BlockCacher = cache.NewLRU(cfg.CacheSize)
+ opts.BlockCacheCapacity = cfg.CacheSize
+
+ //we must use bloomfilter
+ opts.Filter = filter.NewBloomFilter(defaultFilterBits)
+
+ if !cfg.Compression {
+ opts.Compression = opt.NoCompression
+ } else {
+ opts.Compression = opt.SnappyCompression
+ }
+
+ opts.BlockSize = cfg.BlockSize
+ opts.WriteBuffer = cfg.WriteBufferSize
+
+ return opts
+}
+
+func (db *DB) Close() error {
+ return db.db.Close()
+}
+
+func (db *DB) Put(key, value []byte) error {
+ return db.db.Put(key, value, nil)
+}
+
+func (db *DB) Get(key []byte) ([]byte, error) {
+ v, err := db.db.Get(key, nil)
+ if err == leveldb.ErrNotFound {
+ return nil, nil
+ }
+ return v, nil
+}
+
+func (db *DB) Delete(key []byte) error {
+ return db.db.Delete(key, nil)
+}
+
+func (db *DB) NewWriteBatch() driver.IWriteBatch {
+ wb := &WriteBatch{
+ db: db,
+ wbatch: new(leveldb.Batch),
+ }
+ return wb
+}
+
+func (db *DB) NewIterator() driver.IIterator {
+ it := &Iterator{
+ db.db.NewIterator(nil, db.iteratorOpts),
+ }
+
+ return it
+}
+
+func (db *DB) Begin() (driver.Tx, error) {
+ return nil, driver.ErrTxSupport
+}
+
+func (db *DB) NewSnapshot() (driver.ISnapshot, error) {
+ snapshot, err := db.db.GetSnapshot()
+ if err != nil {
+ return nil, err
+ }
+
+ s := &Snapshot{
+ db: db,
+ snp: snapshot,
+ }
+
+ return s, nil
+}
+
+func init() {
+ driver.Register(Store{})
+ driver.Register(MemStore{})
+}
diff --git a/vendor/github.com/lunny/nodb/store/goleveldb/iterator.go b/vendor/github.com/lunny/nodb/store/goleveldb/iterator.go
new file mode 100644
index 0000000000..c1fd8b5573
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/store/goleveldb/iterator.go
@@ -0,0 +1,49 @@
+package goleveldb
+
+import (
+ "github.com/syndtr/goleveldb/leveldb/iterator"
+)
+
+type Iterator struct {
+ it iterator.Iterator
+}
+
+func (it *Iterator) Key() []byte {
+ return it.it.Key()
+}
+
+func (it *Iterator) Value() []byte {
+ return it.it.Value()
+}
+
+func (it *Iterator) Close() error {
+ if it.it != nil {
+ it.it.Release()
+ it.it = nil
+ }
+ return nil
+}
+
+func (it *Iterator) Valid() bool {
+ return it.it.Valid()
+}
+
+func (it *Iterator) Next() {
+ it.it.Next()
+}
+
+func (it *Iterator) Prev() {
+ it.it.Prev()
+}
+
+func (it *Iterator) First() {
+ it.it.First()
+}
+
+func (it *Iterator) Last() {
+ it.it.Last()
+}
+
+func (it *Iterator) Seek(key []byte) {
+ it.it.Seek(key)
+}
diff --git a/vendor/github.com/lunny/nodb/store/goleveldb/snapshot.go b/vendor/github.com/lunny/nodb/store/goleveldb/snapshot.go
new file mode 100644
index 0000000000..fe2b409c3f
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/store/goleveldb/snapshot.go
@@ -0,0 +1,26 @@
+package goleveldb
+
+import (
+ "github.com/lunny/nodb/store/driver"
+ "github.com/syndtr/goleveldb/leveldb"
+)
+
+type Snapshot struct {
+ db *DB
+ snp *leveldb.Snapshot
+}
+
+func (s *Snapshot) Get(key []byte) ([]byte, error) {
+ return s.snp.Get(key, s.db.iteratorOpts)
+}
+
+func (s *Snapshot) NewIterator() driver.IIterator {
+ it := &Iterator{
+ s.snp.NewIterator(nil, s.db.iteratorOpts),
+ }
+ return it
+}
+
+func (s *Snapshot) Close() {
+ s.snp.Release()
+}
diff --git a/vendor/github.com/lunny/nodb/store/iterator.go b/vendor/github.com/lunny/nodb/store/iterator.go
new file mode 100644
index 0000000000..27bf689da2
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/store/iterator.go
@@ -0,0 +1,327 @@
+package store
+
+import (
+ "bytes"
+
+ "github.com/lunny/nodb/store/driver"
+)
+
+const (
+ IteratorForward uint8 = 0
+ IteratorBackward uint8 = 1
+)
+
+const (
+ RangeClose uint8 = 0x00
+ RangeLOpen uint8 = 0x01
+ RangeROpen uint8 = 0x10
+ RangeOpen uint8 = 0x11
+)
+
+// min must less or equal than max
+//
+// range type:
+//
+// close: [min, max]
+// open: (min, max)
+// lopen: (min, max]
+// ropen: [min, max)
+//
+type Range struct {
+ Min []byte
+ Max []byte
+
+ Type uint8
+}
+
+type Limit struct {
+ Offset int
+ Count int
+}
+
+type Iterator struct {
+ it driver.IIterator
+}
+
+// Returns a copy of key.
+func (it *Iterator) Key() []byte {
+ k := it.it.Key()
+ if k == nil {
+ return nil
+ }
+
+ return append([]byte{}, k...)
+}
+
+// Returns a copy of value.
+func (it *Iterator) Value() []byte {
+ v := it.it.Value()
+ if v == nil {
+ return nil
+ }
+
+ return append([]byte{}, v...)
+}
+
+// Returns a reference of key.
+// you must be careful that it will be changed after next iterate.
+func (it *Iterator) RawKey() []byte {
+ return it.it.Key()
+}
+
+// Returns a reference of value.
+// you must be careful that it will be changed after next iterate.
+func (it *Iterator) RawValue() []byte {
+ return it.it.Value()
+}
+
+// Copy key to b, if b len is small or nil, returns a new one.
+func (it *Iterator) BufKey(b []byte) []byte {
+ k := it.RawKey()
+ if k == nil {
+ return nil
+ }
+ if b == nil {
+ b = []byte{}
+ }
+
+ b = b[0:0]
+ return append(b, k...)
+}
+
+// Copy value to b, if b len is small or nil, returns a new one.
+func (it *Iterator) BufValue(b []byte) []byte {
+ v := it.RawValue()
+ if v == nil {
+ return nil
+ }
+
+ if b == nil {
+ b = []byte{}
+ }
+
+ b = b[0:0]
+ return append(b, v...)
+}
+
+func (it *Iterator) Close() {
+ if it.it != nil {
+ it.it.Close()
+ it.it = nil
+ }
+}
+
+func (it *Iterator) Valid() bool {
+ return it.it.Valid()
+}
+
+func (it *Iterator) Next() {
+ it.it.Next()
+}
+
+func (it *Iterator) Prev() {
+ it.it.Prev()
+}
+
+func (it *Iterator) SeekToFirst() {
+ it.it.First()
+}
+
+func (it *Iterator) SeekToLast() {
+ it.it.Last()
+}
+
+func (it *Iterator) Seek(key []byte) {
+ it.it.Seek(key)
+}
+
+// Finds by key, if not found, nil returns.
+func (it *Iterator) Find(key []byte) []byte {
+ it.Seek(key)
+ if it.Valid() {
+ k := it.RawKey()
+ if k == nil {
+ return nil
+ } else if bytes.Equal(k, key) {
+ return it.Value()
+ }
+ }
+
+ return nil
+}
+
+// Finds by key, if not found, nil returns, else a reference of value returns.
+// you must be careful that it will be changed after next iterate.
+func (it *Iterator) RawFind(key []byte) []byte {
+ it.Seek(key)
+ if it.Valid() {
+ k := it.RawKey()
+ if k == nil {
+ return nil
+ } else if bytes.Equal(k, key) {
+ return it.RawValue()
+ }
+ }
+
+ return nil
+}
+
+type RangeLimitIterator struct {
+ it *Iterator
+
+ r *Range
+ l *Limit
+
+ step int
+
+ //0 for IteratorForward, 1 for IteratorBackward
+ direction uint8
+}
+
+func (it *RangeLimitIterator) Key() []byte {
+ return it.it.Key()
+}
+
+func (it *RangeLimitIterator) Value() []byte {
+ return it.it.Value()
+}
+
+func (it *RangeLimitIterator) RawKey() []byte {
+ return it.it.RawKey()
+}
+
+func (it *RangeLimitIterator) RawValue() []byte {
+ return it.it.RawValue()
+}
+
+func (it *RangeLimitIterator) BufKey(b []byte) []byte {
+ return it.it.BufKey(b)
+}
+
+func (it *RangeLimitIterator) BufValue(b []byte) []byte {
+ return it.it.BufValue(b)
+}
+
+func (it *RangeLimitIterator) Valid() bool {
+ if it.l.Offset < 0 {
+ return false
+ } else if !it.it.Valid() {
+ return false
+ } else if it.l.Count >= 0 && it.step >= it.l.Count {
+ return false
+ }
+
+ if it.direction == IteratorForward {
+ if it.r.Max != nil {
+ r := bytes.Compare(it.it.RawKey(), it.r.Max)
+ if it.r.Type&RangeROpen > 0 {
+ return !(r >= 0)
+ } else {
+ return !(r > 0)
+ }
+ }
+ } else {
+ if it.r.Min != nil {
+ r := bytes.Compare(it.it.RawKey(), it.r.Min)
+ if it.r.Type&RangeLOpen > 0 {
+ return !(r <= 0)
+ } else {
+ return !(r < 0)
+ }
+ }
+ }
+
+ return true
+}
+
+func (it *RangeLimitIterator) Next() {
+ it.step++
+
+ if it.direction == IteratorForward {
+ it.it.Next()
+ } else {
+ it.it.Prev()
+ }
+}
+
+func (it *RangeLimitIterator) Close() {
+ it.it.Close()
+}
+
+func NewRangeLimitIterator(i *Iterator, r *Range, l *Limit) *RangeLimitIterator {
+ return rangeLimitIterator(i, r, l, IteratorForward)
+}
+
+func NewRevRangeLimitIterator(i *Iterator, r *Range, l *Limit) *RangeLimitIterator {
+ return rangeLimitIterator(i, r, l, IteratorBackward)
+}
+
+func NewRangeIterator(i *Iterator, r *Range) *RangeLimitIterator {
+ return rangeLimitIterator(i, r, &Limit{0, -1}, IteratorForward)
+}
+
+func NewRevRangeIterator(i *Iterator, r *Range) *RangeLimitIterator {
+ return rangeLimitIterator(i, r, &Limit{0, -1}, IteratorBackward)
+}
+
+func rangeLimitIterator(i *Iterator, r *Range, l *Limit, direction uint8) *RangeLimitIterator {
+ it := new(RangeLimitIterator)
+
+ it.it = i
+
+ it.r = r
+ it.l = l
+ it.direction = direction
+
+ it.step = 0
+
+ if l.Offset < 0 {
+ return it
+ }
+
+ if direction == IteratorForward {
+ if r.Min == nil {
+ it.it.SeekToFirst()
+ } else {
+ it.it.Seek(r.Min)
+
+ if r.Type&RangeLOpen > 0 {
+ if it.it.Valid() && bytes.Equal(it.it.RawKey(), r.Min) {
+ it.it.Next()
+ }
+ }
+ }
+ } else {
+ if r.Max == nil {
+ it.it.SeekToLast()
+ } else {
+ it.it.Seek(r.Max)
+
+ if !it.it.Valid() {
+ it.it.SeekToLast()
+ } else {
+ if !bytes.Equal(it.it.RawKey(), r.Max) {
+ it.it.Prev()
+ }
+ }
+
+ if r.Type&RangeROpen > 0 {
+ if it.it.Valid() && bytes.Equal(it.it.RawKey(), r.Max) {
+ it.it.Prev()
+ }
+ }
+ }
+ }
+
+ for i := 0; i < l.Offset; i++ {
+ if it.it.Valid() {
+ if it.direction == IteratorForward {
+ it.it.Next()
+ } else {
+ it.it.Prev()
+ }
+ }
+ }
+
+ return it
+}
diff --git a/vendor/github.com/lunny/nodb/store/snapshot.go b/vendor/github.com/lunny/nodb/store/snapshot.go
new file mode 100644
index 0000000000..75ba0497db
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/store/snapshot.go
@@ -0,0 +1,16 @@
+package store
+
+import (
+ "github.com/lunny/nodb/store/driver"
+)
+
+type Snapshot struct {
+ driver.ISnapshot
+}
+
+func (s *Snapshot) NewIterator() *Iterator {
+ it := new(Iterator)
+ it.it = s.ISnapshot.NewIterator()
+
+ return it
+}
diff --git a/vendor/github.com/lunny/nodb/store/store.go b/vendor/github.com/lunny/nodb/store/store.go
new file mode 100644
index 0000000000..5d0ade1bf0
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/store/store.go
@@ -0,0 +1,51 @@
+package store
+
+import (
+ "fmt"
+ "os"
+ "path"
+ "github.com/lunny/nodb/config"
+ "github.com/lunny/nodb/store/driver"
+
+ _ "github.com/lunny/nodb/store/goleveldb"
+)
+
+func getStorePath(cfg *config.Config) string {
+ return path.Join(cfg.DataDir, fmt.Sprintf("%s_data", cfg.DBName))
+}
+
+func Open(cfg *config.Config) (*DB, error) {
+ s, err := driver.GetStore(cfg)
+ if err != nil {
+ return nil, err
+ }
+
+ path := getStorePath(cfg)
+
+ if err := os.MkdirAll(path, os.ModePerm); err != nil {
+ return nil, err
+ }
+
+ idb, err := s.Open(path, cfg)
+ if err != nil {
+ return nil, err
+ }
+
+ db := &DB{idb}
+
+ return db, nil
+}
+
+func Repair(cfg *config.Config) error {
+ s, err := driver.GetStore(cfg)
+ if err != nil {
+ return err
+ }
+
+ path := getStorePath(cfg)
+
+ return s.Repair(path, cfg)
+}
+
+func init() {
+}
diff --git a/vendor/github.com/lunny/nodb/store/tx.go b/vendor/github.com/lunny/nodb/store/tx.go
new file mode 100644
index 0000000000..32bcbcda4b
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/store/tx.go
@@ -0,0 +1,42 @@
+package store
+
+import (
+ "github.com/lunny/nodb/store/driver"
+)
+
+type Tx struct {
+ driver.Tx
+}
+
+func (tx *Tx) NewIterator() *Iterator {
+ it := new(Iterator)
+ it.it = tx.Tx.NewIterator()
+
+ return it
+}
+
+func (tx *Tx) NewWriteBatch() WriteBatch {
+ return tx.Tx.NewWriteBatch()
+}
+
+func (tx *Tx) RangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator {
+ return NewRangeLimitIterator(tx.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1})
+}
+
+func (tx *Tx) RevRangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator {
+ return NewRevRangeLimitIterator(tx.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1})
+}
+
+//count < 0, unlimit.
+//
+//offset must >= 0, if < 0, will get nothing.
+func (tx *Tx) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator {
+ return NewRangeLimitIterator(tx.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count})
+}
+
+//count < 0, unlimit.
+//
+//offset must >= 0, if < 0, will get nothing.
+func (tx *Tx) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator {
+ return NewRevRangeLimitIterator(tx.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count})
+}
diff --git a/vendor/github.com/lunny/nodb/store/writebatch.go b/vendor/github.com/lunny/nodb/store/writebatch.go
new file mode 100644
index 0000000000..23e079eba6
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/store/writebatch.go
@@ -0,0 +1,9 @@
+package store
+
+import (
+ "github.com/lunny/nodb/store/driver"
+)
+
+type WriteBatch interface {
+ driver.IWriteBatch
+}
diff --git a/vendor/github.com/lunny/nodb/t_bit.go b/vendor/github.com/lunny/nodb/t_bit.go
new file mode 100644
index 0000000000..930d4ba568
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/t_bit.go
@@ -0,0 +1,922 @@
+package nodb
+
+import (
+ "encoding/binary"
+ "errors"
+ "sort"
+ "time"
+
+ "github.com/lunny/nodb/store"
+)
+
+const (
+ OPand uint8 = iota + 1
+ OPor
+ OPxor
+ OPnot
+)
+
+type BitPair struct {
+ Pos int32
+ Val uint8
+}
+
+type segBitInfo struct {
+ Seq uint32
+ Off uint32
+ Val uint8
+}
+
+type segBitInfoArray []segBitInfo
+
+const (
+ // byte
+ segByteWidth uint32 = 9
+ segByteSize uint32 = 1 << segByteWidth
+
+ // bit
+ segBitWidth uint32 = segByteWidth + 3
+ segBitSize uint32 = segByteSize << 3
+
+ maxByteSize uint32 = 8 << 20
+ maxSegCount uint32 = maxByteSize / segByteSize
+
+ minSeq uint32 = 0
+ maxSeq uint32 = uint32((maxByteSize << 3) - 1)
+)
+
+var bitsInByte = [256]int32{0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3,
+ 4, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 1, 2, 2, 3, 2, 3,
+ 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4,
+ 5, 5, 6, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4,
+ 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4,
+ 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 1, 2,
+ 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3,
+ 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
+ 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 2, 3, 3, 4, 3, 4, 4,
+ 5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6,
+ 6, 7, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 4, 5, 5, 6, 5,
+ 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8}
+
+var fillBits = [...]uint8{1, 3, 7, 15, 31, 63, 127, 255}
+
+var emptySegment []byte = make([]byte, segByteSize, segByteSize)
+
+var fillSegment []byte = func() []byte {
+ data := make([]byte, segByteSize, segByteSize)
+ for i := uint32(0); i < segByteSize; i++ {
+ data[i] = 0xff
+ }
+ return data
+}()
+
+var errBinKey = errors.New("invalid bin key")
+var errOffset = errors.New("invalid offset")
+var errDuplicatePos = errors.New("duplicate bit pos")
+
+func getBit(sz []byte, offset uint32) uint8 {
+ index := offset >> 3
+ if index >= uint32(len(sz)) {
+ return 0 // error("overflow")
+ }
+
+ offset -= index << 3
+ return sz[index] >> offset & 1
+}
+
+func setBit(sz []byte, offset uint32, val uint8) bool {
+ if val != 1 && val != 0 {
+ return false // error("invalid val")
+ }
+
+ index := offset >> 3
+ if index >= uint32(len(sz)) {
+ return false // error("overflow")
+ }
+
+ offset -= index << 3
+ if sz[index]>>offset&1 != val {
+ sz[index] ^= (1 << offset)
+ }
+ return true
+}
+
+func (datas segBitInfoArray) Len() int {
+ return len(datas)
+}
+
+func (datas segBitInfoArray) Less(i, j int) bool {
+ res := (datas)[i].Seq < (datas)[j].Seq
+ if !res && (datas)[i].Seq == (datas)[j].Seq {
+ res = (datas)[i].Off < (datas)[j].Off
+ }
+ return res
+}
+
+func (datas segBitInfoArray) Swap(i, j int) {
+ datas[i], datas[j] = datas[j], datas[i]
+}
+
+func (db *DB) bEncodeMetaKey(key []byte) []byte {
+ mk := make([]byte, len(key)+2)
+ mk[0] = db.index
+ mk[1] = BitMetaType
+
+ copy(mk[2:], key)
+ return mk
+}
+
+func (db *DB) bDecodeMetaKey(bkey []byte) ([]byte, error) {
+ if len(bkey) < 2 || bkey[0] != db.index || bkey[1] != BitMetaType {
+ return nil, errBinKey
+ }
+
+ return bkey[2:], nil
+}
+
+func (db *DB) bEncodeBinKey(key []byte, seq uint32) []byte {
+ bk := make([]byte, len(key)+8)
+
+ pos := 0
+ bk[pos] = db.index
+ pos++
+ bk[pos] = BitType
+ pos++
+
+ binary.BigEndian.PutUint16(bk[pos:], uint16(len(key)))
+ pos += 2
+
+ copy(bk[pos:], key)
+ pos += len(key)
+
+ binary.BigEndian.PutUint32(bk[pos:], seq)
+
+ return bk
+}
+
+func (db *DB) bDecodeBinKey(bkey []byte) (key []byte, seq uint32, err error) {
+ if len(bkey) < 8 || bkey[0] != db.index {
+ err = errBinKey
+ return
+ }
+
+ keyLen := binary.BigEndian.Uint16(bkey[2:4])
+ if int(keyLen+8) != len(bkey) {
+ err = errBinKey
+ return
+ }
+
+ key = bkey[4 : 4+keyLen]
+ seq = uint32(binary.BigEndian.Uint32(bkey[4+keyLen:]))
+ return
+}
+
+func (db *DB) bCapByteSize(seq uint32, off uint32) uint32 {
+ var offByteSize uint32 = (off >> 3) + 1
+ if offByteSize > segByteSize {
+ offByteSize = segByteSize
+ }
+
+ return seq<<segByteWidth + offByteSize
+}
+
+func (db *DB) bParseOffset(key []byte, offset int32) (seq uint32, off uint32, err error) {
+ if offset < 0 {
+ if tailSeq, tailOff, e := db.bGetMeta(key); e != nil {
+ err = e
+ return
+ } else if tailSeq >= 0 {
+ offset += int32((uint32(tailSeq)<<segBitWidth | uint32(tailOff)) + 1)
+ if offset < 0 {
+ err = errOffset
+ return
+ }
+ }
+ }
+
+ off = uint32(offset)
+
+ seq = off >> segBitWidth
+ off &= (segBitSize - 1)
+ return
+}
+
+func (db *DB) bGetMeta(key []byte) (tailSeq int32, tailOff int32, err error) {
+ var v []byte
+
+ mk := db.bEncodeMetaKey(key)
+ v, err = db.bucket.Get(mk)
+ if err != nil {
+ return
+ }
+
+ if v != nil {
+ tailSeq = int32(binary.LittleEndian.Uint32(v[0:4]))
+ tailOff = int32(binary.LittleEndian.Uint32(v[4:8]))
+ } else {
+ tailSeq = -1
+ tailOff = -1
+ }
+ return
+}
+
+func (db *DB) bSetMeta(t *batch, key []byte, tailSeq uint32, tailOff uint32) {
+ ek := db.bEncodeMetaKey(key)
+
+ buf := make([]byte, 8)
+ binary.LittleEndian.PutUint32(buf[0:4], tailSeq)
+ binary.LittleEndian.PutUint32(buf[4:8], tailOff)
+
+ t.Put(ek, buf)
+ return
+}
+
+func (db *DB) bUpdateMeta(t *batch, key []byte, seq uint32, off uint32) (tailSeq uint32, tailOff uint32, err error) {
+ var tseq, toff int32
+ var update bool = false
+
+ if tseq, toff, err = db.bGetMeta(key); err != nil {
+ return
+ } else if tseq < 0 {
+ update = true
+ } else {
+ tailSeq = uint32(MaxInt32(tseq, 0))
+ tailOff = uint32(MaxInt32(toff, 0))
+ update = (seq > tailSeq || (seq == tailSeq && off > tailOff))
+ }
+
+ if update {
+ db.bSetMeta(t, key, seq, off)
+ tailSeq = seq
+ tailOff = off
+ }
+ return
+}
+
+func (db *DB) bDelete(t *batch, key []byte) (drop int64) {
+ mk := db.bEncodeMetaKey(key)
+ t.Delete(mk)
+
+ minKey := db.bEncodeBinKey(key, minSeq)
+ maxKey := db.bEncodeBinKey(key, maxSeq)
+ it := db.bucket.RangeIterator(minKey, maxKey, store.RangeClose)
+ for ; it.Valid(); it.Next() {
+ t.Delete(it.RawKey())
+ drop++
+ }
+ it.Close()
+
+ return drop
+}
+
+func (db *DB) bGetSegment(key []byte, seq uint32) ([]byte, []byte, error) {
+ bk := db.bEncodeBinKey(key, seq)
+ segment, err := db.bucket.Get(bk)
+ if err != nil {
+ return bk, nil, err
+ }
+ return bk, segment, nil
+}
+
+func (db *DB) bAllocateSegment(key []byte, seq uint32) ([]byte, []byte, error) {
+ bk, segment, err := db.bGetSegment(key, seq)
+ if err == nil && segment == nil {
+ segment = make([]byte, segByteSize, segByteSize)
+ }
+ return bk, segment, err
+}
+
+func (db *DB) bIterator(key []byte) *store.RangeLimitIterator {
+ sk := db.bEncodeBinKey(key, minSeq)
+ ek := db.bEncodeBinKey(key, maxSeq)
+ return db.bucket.RangeIterator(sk, ek, store.RangeClose)
+}
+
+func (db *DB) bSegAnd(a []byte, b []byte, res *[]byte) {
+ if a == nil || b == nil {
+ *res = nil
+ return
+ }
+
+ data := *res
+ if data == nil {
+ data = make([]byte, segByteSize, segByteSize)
+ *res = data
+ }
+
+ for i := uint32(0); i < segByteSize; i++ {
+ data[i] = a[i] & b[i]
+ }
+ return
+}
+
+func (db *DB) bSegOr(a []byte, b []byte, res *[]byte) {
+ if a == nil || b == nil {
+ if a == nil && b == nil {
+ *res = nil
+ } else if a == nil {
+ *res = b
+ } else {
+ *res = a
+ }
+ return
+ }
+
+ data := *res
+ if data == nil {
+ data = make([]byte, segByteSize, segByteSize)
+ *res = data
+ }
+
+ for i := uint32(0); i < segByteSize; i++ {
+ data[i] = a[i] | b[i]
+ }
+ return
+}
+
+func (db *DB) bSegXor(a []byte, b []byte, res *[]byte) {
+ if a == nil && b == nil {
+ *res = fillSegment
+ return
+ }
+
+ if a == nil {
+ a = emptySegment
+ }
+
+ if b == nil {
+ b = emptySegment
+ }
+
+ data := *res
+ if data == nil {
+ data = make([]byte, segByteSize, segByteSize)
+ *res = data
+ }
+
+ for i := uint32(0); i < segByteSize; i++ {
+ data[i] = a[i] ^ b[i]
+ }
+
+ return
+}
+
+func (db *DB) bExpireAt(key []byte, when int64) (int64, error) {
+ t := db.binBatch
+ t.Lock()
+ defer t.Unlock()
+
+ if seq, _, err := db.bGetMeta(key); err != nil || seq < 0 {
+ return 0, err
+ } else {
+ db.expireAt(t, BitType, key, when)
+ if err := t.Commit(); err != nil {
+ return 0, err
+ }
+ }
+ return 1, nil
+}
+
+func (db *DB) bCountByte(val byte, soff uint32, eoff uint32) int32 {
+ if soff > eoff {
+ soff, eoff = eoff, soff
+ }
+
+ mask := uint8(0)
+ if soff > 0 {
+ mask |= fillBits[soff-1]
+ }
+ if eoff < 7 {
+ mask |= (fillBits[7] ^ fillBits[eoff])
+ }
+ mask = fillBits[7] ^ mask
+
+ return bitsInByte[val&mask]
+}
+
+func (db *DB) bCountSeg(key []byte, seq uint32, soff uint32, eoff uint32) (cnt int32, err error) {
+ if soff >= segBitSize || soff < 0 ||
+ eoff >= segBitSize || eoff < 0 {
+ return
+ }
+
+ var segment []byte
+ if _, segment, err = db.bGetSegment(key, seq); err != nil {
+ return
+ }
+
+ if segment == nil {
+ return
+ }
+
+ if soff > eoff {
+ soff, eoff = eoff, soff
+ }
+
+ headIdx := int(soff >> 3)
+ endIdx := int(eoff >> 3)
+ sByteOff := soff - ((soff >> 3) << 3)
+ eByteOff := eoff - ((eoff >> 3) << 3)
+
+ if headIdx == endIdx {
+ cnt = db.bCountByte(segment[headIdx], sByteOff, eByteOff)
+ } else {
+ cnt = db.bCountByte(segment[headIdx], sByteOff, 7) +
+ db.bCountByte(segment[endIdx], 0, eByteOff)
+ }
+
+ // sum up following bytes
+ for idx, end := headIdx+1, endIdx-1; idx <= end; idx += 1 {
+ cnt += bitsInByte[segment[idx]]
+ if idx == end {
+ break
+ }
+ }
+
+ return
+}
+
+func (db *DB) BGet(key []byte) (data []byte, err error) {
+ if err = checkKeySize(key); err != nil {
+ return
+ }
+
+ var ts, to int32
+ if ts, to, err = db.bGetMeta(key); err != nil || ts < 0 {
+ return
+ }
+
+ var tailSeq, tailOff = uint32(ts), uint32(to)
+ var capByteSize uint32 = db.bCapByteSize(tailSeq, tailOff)
+ data = make([]byte, capByteSize, capByteSize)
+
+ minKey := db.bEncodeBinKey(key, minSeq)
+ maxKey := db.bEncodeBinKey(key, tailSeq)
+ it := db.bucket.RangeIterator(minKey, maxKey, store.RangeClose)
+
+ var seq, s, e uint32
+ for ; it.Valid(); it.Next() {
+ if _, seq, err = db.bDecodeBinKey(it.RawKey()); err != nil {
+ data = nil
+ break
+ }
+
+ s = seq << segByteWidth
+ e = MinUInt32(s+segByteSize, capByteSize)
+ copy(data[s:e], it.RawValue())
+ }
+ it.Close()
+
+ return
+}
+
+func (db *DB) BDelete(key []byte) (drop int64, err error) {
+ if err = checkKeySize(key); err != nil {
+ return
+ }
+
+ t := db.binBatch
+ t.Lock()
+ defer t.Unlock()
+
+ drop = db.bDelete(t, key)
+ db.rmExpire(t, BitType, key)
+
+ err = t.Commit()
+ return
+}
+
+func (db *DB) BSetBit(key []byte, offset int32, val uint8) (ori uint8, err error) {
+ if err = checkKeySize(key); err != nil {
+ return
+ }
+
+ // todo : check offset
+ var seq, off uint32
+ if seq, off, err = db.bParseOffset(key, offset); err != nil {
+ return 0, err
+ }
+
+ var bk, segment []byte
+ if bk, segment, err = db.bAllocateSegment(key, seq); err != nil {
+ return 0, err
+ }
+
+ if segment != nil {
+ ori = getBit(segment, off)
+ if setBit(segment, off, val) {
+ t := db.binBatch
+ t.Lock()
+ defer t.Unlock()
+
+ t.Put(bk, segment)
+ if _, _, e := db.bUpdateMeta(t, key, seq, off); e != nil {
+ err = e
+ return
+ }
+
+ err = t.Commit()
+ }
+ }
+
+ return
+}
+
+func (db *DB) BMSetBit(key []byte, args ...BitPair) (place int64, err error) {
+ if err = checkKeySize(key); err != nil {
+ return
+ }
+
+ // (ps : so as to aviod wasting memory copy while calling db.Get() and batch.Put(),
+ // here we sequence the params by pos, so that we can merge the execution of
+ // diff pos setting which targets on the same segment respectively. )
+
+ // #1 : sequence request data
+ var argCnt = len(args)
+ var bitInfos segBitInfoArray = make(segBitInfoArray, argCnt)
+ var seq, off uint32
+
+ for i, info := range args {
+ if seq, off, err = db.bParseOffset(key, info.Pos); err != nil {
+ return
+ }
+
+ bitInfos[i].Seq = seq
+ bitInfos[i].Off = off
+ bitInfos[i].Val = info.Val
+ }
+
+ sort.Sort(bitInfos)
+
+ for i := 1; i < argCnt; i++ {
+ if bitInfos[i].Seq == bitInfos[i-1].Seq && bitInfos[i].Off == bitInfos[i-1].Off {
+ return 0, errDuplicatePos
+ }
+ }
+
+ // #2 : execute bit set in order
+ t := db.binBatch
+ t.Lock()
+ defer t.Unlock()
+
+ var curBinKey, curSeg []byte
+ var curSeq, maxSeq, maxOff uint32
+
+ for _, info := range bitInfos {
+ if curSeg != nil && info.Seq != curSeq {
+ t.Put(curBinKey, curSeg)
+ curSeg = nil
+ }
+
+ if curSeg == nil {
+ curSeq = info.Seq
+ if curBinKey, curSeg, err = db.bAllocateSegment(key, info.Seq); err != nil {
+ return
+ }
+
+ if curSeg == nil {
+ continue
+ }
+ }
+
+ if setBit(curSeg, info.Off, info.Val) {
+ maxSeq = info.Seq
+ maxOff = info.Off
+ place++
+ }
+ }
+
+ if curSeg != nil {
+ t.Put(curBinKey, curSeg)
+ }
+
+ // finally, update meta
+ if place > 0 {
+ if _, _, err = db.bUpdateMeta(t, key, maxSeq, maxOff); err != nil {
+ return
+ }
+
+ err = t.Commit()
+ }
+
+ return
+}
+
+func (db *DB) BGetBit(key []byte, offset int32) (uint8, error) {
+ if seq, off, err := db.bParseOffset(key, offset); err != nil {
+ return 0, err
+ } else {
+ _, segment, err := db.bGetSegment(key, seq)
+ if err != nil {
+ return 0, err
+ }
+
+ if segment == nil {
+ return 0, nil
+ } else {
+ return getBit(segment, off), nil
+ }
+ }
+}
+
+// func (db *DB) BGetRange(key []byte, start int32, end int32) ([]byte, error) {
+// section := make([]byte)
+
+// return
+// }
+
+func (db *DB) BCount(key []byte, start int32, end int32) (cnt int32, err error) {
+ var sseq, soff uint32
+ if sseq, soff, err = db.bParseOffset(key, start); err != nil {
+ return
+ }
+
+ var eseq, eoff uint32
+ if eseq, eoff, err = db.bParseOffset(key, end); err != nil {
+ return
+ }
+
+ if sseq > eseq || (sseq == eseq && soff > eoff) {
+ sseq, eseq = eseq, sseq
+ soff, eoff = eoff, soff
+ }
+
+ var segCnt int32
+ if eseq == sseq {
+ if segCnt, err = db.bCountSeg(key, sseq, soff, eoff); err != nil {
+ return 0, err
+ }
+
+ cnt = segCnt
+
+ } else {
+ if segCnt, err = db.bCountSeg(key, sseq, soff, segBitSize-1); err != nil {
+ return 0, err
+ } else {
+ cnt += segCnt
+ }
+
+ if segCnt, err = db.bCountSeg(key, eseq, 0, eoff); err != nil {
+ return 0, err
+ } else {
+ cnt += segCnt
+ }
+ }
+
+ // middle segs
+ var segment []byte
+ skey := db.bEncodeBinKey(key, sseq)
+ ekey := db.bEncodeBinKey(key, eseq)
+
+ it := db.bucket.RangeIterator(skey, ekey, store.RangeOpen)
+ for ; it.Valid(); it.Next() {
+ segment = it.RawValue()
+ for _, bt := range segment {
+ cnt += bitsInByte[bt]
+ }
+ }
+ it.Close()
+
+ return
+}
+
+func (db *DB) BTail(key []byte) (int32, error) {
+ // effective length of data, the highest bit-pos set in history
+ tailSeq, tailOff, err := db.bGetMeta(key)
+ if err != nil {
+ return 0, err
+ }
+
+ tail := int32(-1)
+ if tailSeq >= 0 {
+ tail = int32(uint32(tailSeq)<<segBitWidth | uint32(tailOff))
+ }
+
+ return tail, nil
+}
+
+func (db *DB) BOperation(op uint8, dstkey []byte, srckeys ...[]byte) (blen int32, err error) {
+ // blen -
+ // the total bit size of data stored in destination key,
+ // that is equal to the size of the longest input string.
+
+ var exeOp func([]byte, []byte, *[]byte)
+ switch op {
+ case OPand:
+ exeOp = db.bSegAnd
+ case OPor:
+ exeOp = db.bSegOr
+ case OPxor, OPnot:
+ exeOp = db.bSegXor
+ default:
+ return
+ }
+
+ if dstkey == nil || srckeys == nil {
+ return
+ }
+
+ t := db.binBatch
+ t.Lock()
+ defer t.Unlock()
+
+ var srcKseq, srcKoff int32
+ var seq, off, maxDstSeq, maxDstOff uint32
+
+ var keyNum int = len(srckeys)
+ var validKeyNum int
+ for i := 0; i < keyNum; i++ {
+ if srcKseq, srcKoff, err = db.bGetMeta(srckeys[i]); err != nil {
+ return
+ } else if srcKseq < 0 {
+ srckeys[i] = nil
+ continue
+ }
+
+ validKeyNum++
+
+ seq = uint32(srcKseq)
+ off = uint32(srcKoff)
+ if seq > maxDstSeq || (seq == maxDstSeq && off > maxDstOff) {
+ maxDstSeq = seq
+ maxDstOff = off
+ }
+ }
+
+ if (op == OPnot && validKeyNum != 1) ||
+ (op != OPnot && validKeyNum < 2) {
+ return // with not enough existing source key
+ }
+
+ var srcIdx int
+ for srcIdx = 0; srcIdx < keyNum; srcIdx++ {
+ if srckeys[srcIdx] != nil {
+ break
+ }
+ }
+
+ // init - data
+ var segments = make([][]byte, maxDstSeq+1)
+
+ if op == OPnot {
+ // ps :
+ // ( ~num == num ^ 0x11111111 )
+ // we init the result segments with all bit set,
+ // then we can calculate through the way of 'xor'.
+
+ // ahead segments bin format : 1111 ... 1111
+ for i := uint32(0); i < maxDstSeq; i++ {
+ segments[i] = fillSegment
+ }
+
+ // last segment bin format : 1111..1100..0000
+ var tailSeg = make([]byte, segByteSize, segByteSize)
+ var fillByte = fillBits[7]
+ var tailSegLen = db.bCapByteSize(uint32(0), maxDstOff)
+ for i := uint32(0); i < tailSegLen-1; i++ {
+ tailSeg[i] = fillByte
+ }
+ tailSeg[tailSegLen-1] = fillBits[maxDstOff-(tailSegLen-1)<<3]
+ segments[maxDstSeq] = tailSeg
+
+ } else {
+ // ps : init segments by data corresponding to the 1st valid source key
+ it := db.bIterator(srckeys[srcIdx])
+ for ; it.Valid(); it.Next() {
+ if _, seq, err = db.bDecodeBinKey(it.RawKey()); err != nil {
+ // to do ...
+ it.Close()
+ return
+ }
+ segments[seq] = it.Value()
+ }
+ it.Close()
+ srcIdx++
+ }
+
+ // operation with following keys
+ var res []byte
+ for i := srcIdx; i < keyNum; i++ {
+ if srckeys[i] == nil {
+ continue
+ }
+
+ it := db.bIterator(srckeys[i])
+ for idx, end := uint32(0), false; !end; it.Next() {
+ end = !it.Valid()
+ if !end {
+ if _, seq, err = db.bDecodeBinKey(it.RawKey()); err != nil {
+ // to do ...
+ it.Close()
+ return
+ }
+ } else {
+ seq = maxDstSeq + 1
+ }
+
+ // todo :
+ // operation 'and' can be optimize here :
+ // if seq > max_segments_idx, this loop can be break,
+ // which can avoid cost from Key() and bDecodeBinKey()
+
+ for ; idx < seq; idx++ {
+ res = nil
+ exeOp(segments[idx], nil, &res)
+ segments[idx] = res
+ }
+
+ if !end {
+ res = it.Value()
+ exeOp(segments[seq], res, &res)
+ segments[seq] = res
+ idx++
+ }
+ }
+ it.Close()
+ }
+
+ // clear the old data in case
+ db.bDelete(t, dstkey)
+ db.rmExpire(t, BitType, dstkey)
+
+ // set data
+ db.bSetMeta(t, dstkey, maxDstSeq, maxDstOff)
+
+ var bk []byte
+ for seq, segt := range segments {
+ if segt != nil {
+ bk = db.bEncodeBinKey(dstkey, uint32(seq))
+ t.Put(bk, segt)
+ }
+ }
+
+ err = t.Commit()
+ if err == nil {
+ // blen = int32(db.bCapByteSize(maxDstOff, maxDstOff))
+ blen = int32(maxDstSeq<<segBitWidth | maxDstOff + 1)
+ }
+
+ return
+}
+
+func (db *DB) BExpire(key []byte, duration int64) (int64, error) {
+ if duration <= 0 {
+ return 0, errExpireValue
+ }
+
+ if err := checkKeySize(key); err != nil {
+ return -1, err
+ }
+
+ return db.bExpireAt(key, time.Now().Unix()+duration)
+}
+
+func (db *DB) BExpireAt(key []byte, when int64) (int64, error) {
+ if when <= time.Now().Unix() {
+ return 0, errExpireValue
+ }
+
+ if err := checkKeySize(key); err != nil {
+ return -1, err
+ }
+
+ return db.bExpireAt(key, when)
+}
+
+func (db *DB) BTTL(key []byte) (int64, error) {
+ if err := checkKeySize(key); err != nil {
+ return -1, err
+ }
+
+ return db.ttl(BitType, key)
+}
+
+func (db *DB) BPersist(key []byte) (int64, error) {
+ if err := checkKeySize(key); err != nil {
+ return 0, err
+ }
+
+ t := db.binBatch
+ t.Lock()
+ defer t.Unlock()
+
+ n, err := db.rmExpire(t, BitType, key)
+ if err != nil {
+ return 0, err
+ }
+
+ err = t.Commit()
+ return n, err
+}
+
+func (db *DB) BScan(key []byte, count int, inclusive bool, match string) ([][]byte, error) {
+ return db.scan(BitMetaType, key, count, inclusive, match)
+}
+
+func (db *DB) bFlush() (drop int64, err error) {
+ t := db.binBatch
+ t.Lock()
+ defer t.Unlock()
+
+ return db.flushType(t, BitType)
+}
diff --git a/vendor/github.com/lunny/nodb/t_hash.go b/vendor/github.com/lunny/nodb/t_hash.go
new file mode 100644
index 0000000000..bedfbf7c3e
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/t_hash.go
@@ -0,0 +1,509 @@
+package nodb
+
+import (
+ "encoding/binary"
+ "errors"
+ "time"
+
+ "github.com/lunny/nodb/store"
+)
+
+type FVPair struct {
+ Field []byte
+ Value []byte
+}
+
+var errHashKey = errors.New("invalid hash key")
+var errHSizeKey = errors.New("invalid hsize key")
+
+const (
+ hashStartSep byte = ':'
+ hashStopSep byte = hashStartSep + 1
+)
+
+func checkHashKFSize(key []byte, field []byte) error {
+ if len(key) > MaxKeySize || len(key) == 0 {
+ return errKeySize
+ } else if len(field) > MaxHashFieldSize || len(field) == 0 {
+ return errHashFieldSize
+ }
+ return nil
+}
+
+func (db *DB) hEncodeSizeKey(key []byte) []byte {
+ buf := make([]byte, len(key)+2)
+
+ buf[0] = db.index
+ buf[1] = HSizeType
+
+ copy(buf[2:], key)
+ return buf
+}
+
+func (db *DB) hDecodeSizeKey(ek []byte) ([]byte, error) {
+ if len(ek) < 2 || ek[0] != db.index || ek[1] != HSizeType {
+ return nil, errHSizeKey
+ }
+
+ return ek[2:], nil
+}
+
+func (db *DB) hEncodeHashKey(key []byte, field []byte) []byte {
+ buf := make([]byte, len(key)+len(field)+1+1+2+1)
+
+ pos := 0
+ buf[pos] = db.index
+ pos++
+ buf[pos] = HashType
+ pos++
+
+ binary.BigEndian.PutUint16(buf[pos:], uint16(len(key)))
+ pos += 2
+
+ copy(buf[pos:], key)
+ pos += len(key)
+
+ buf[pos] = hashStartSep
+ pos++
+ copy(buf[pos:], field)
+
+ return buf
+}
+
+func (db *DB) hDecodeHashKey(ek []byte) ([]byte, []byte, error) {
+ if len(ek) < 5 || ek[0] != db.index || ek[1] != HashType {
+ return nil, nil, errHashKey
+ }
+
+ pos := 2
+ keyLen := int(binary.BigEndian.Uint16(ek[pos:]))
+ pos += 2
+
+ if keyLen+5 > len(ek) {
+ return nil, nil, errHashKey
+ }
+
+ key := ek[pos : pos+keyLen]
+ pos += keyLen
+
+ if ek[pos] != hashStartSep {
+ return nil, nil, errHashKey
+ }
+
+ pos++
+ field := ek[pos:]
+ return key, field, nil
+}
+
+func (db *DB) hEncodeStartKey(key []byte) []byte {
+ return db.hEncodeHashKey(key, nil)
+}
+
+func (db *DB) hEncodeStopKey(key []byte) []byte {
+ k := db.hEncodeHashKey(key, nil)
+
+ k[len(k)-1] = hashStopSep
+
+ return k
+}
+
+func (db *DB) hSetItem(key []byte, field []byte, value []byte) (int64, error) {
+ t := db.hashBatch
+
+ ek := db.hEncodeHashKey(key, field)
+
+ var n int64 = 1
+ if v, _ := db.bucket.Get(ek); v != nil {
+ n = 0
+ } else {
+ if _, err := db.hIncrSize(key, 1); err != nil {
+ return 0, err
+ }
+ }
+
+ t.Put(ek, value)
+ return n, nil
+}
+
+// ps : here just focus on deleting the hash data,
+// any other likes expire is ignore.
+func (db *DB) hDelete(t *batch, key []byte) int64 {
+ sk := db.hEncodeSizeKey(key)
+ start := db.hEncodeStartKey(key)
+ stop := db.hEncodeStopKey(key)
+
+ var num int64 = 0
+ it := db.bucket.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1)
+ for ; it.Valid(); it.Next() {
+ t.Delete(it.Key())
+ num++
+ }
+ it.Close()
+
+ t.Delete(sk)
+ return num
+}
+
+func (db *DB) hExpireAt(key []byte, when int64) (int64, error) {
+ t := db.hashBatch
+ t.Lock()
+ defer t.Unlock()
+
+ if hlen, err := db.HLen(key); err != nil || hlen == 0 {
+ return 0, err
+ } else {
+ db.expireAt(t, HashType, key, when)
+ if err := t.Commit(); err != nil {
+ return 0, err
+ }
+ }
+ return 1, nil
+}
+
+func (db *DB) HLen(key []byte) (int64, error) {
+ if err := checkKeySize(key); err != nil {
+ return 0, err
+ }
+
+ return Int64(db.bucket.Get(db.hEncodeSizeKey(key)))
+}
+
+func (db *DB) HSet(key []byte, field []byte, value []byte) (int64, error) {
+ if err := checkHashKFSize(key, field); err != nil {
+ return 0, err
+ } else if err := checkValueSize(value); err != nil {
+ return 0, err
+ }
+
+ t := db.hashBatch
+ t.Lock()
+ defer t.Unlock()
+
+ n, err := db.hSetItem(key, field, value)
+ if err != nil {
+ return 0, err
+ }
+
+ //todo add binlog
+
+ err = t.Commit()
+ return n, err
+}
+
+func (db *DB) HGet(key []byte, field []byte) ([]byte, error) {
+ if err := checkHashKFSize(key, field); err != nil {
+ return nil, err
+ }
+
+ return db.bucket.Get(db.hEncodeHashKey(key, field))
+}
+
+func (db *DB) HMset(key []byte, args ...FVPair) error {
+ t := db.hashBatch
+ t.Lock()
+ defer t.Unlock()
+
+ var err error
+ var ek []byte
+ var num int64 = 0
+ for i := 0; i < len(args); i++ {
+ if err := checkHashKFSize(key, args[i].Field); err != nil {
+ return err
+ } else if err := checkValueSize(args[i].Value); err != nil {
+ return err
+ }
+
+ ek = db.hEncodeHashKey(key, args[i].Field)
+
+ if v, err := db.bucket.Get(ek); err != nil {
+ return err
+ } else if v == nil {
+ num++
+ }
+
+ t.Put(ek, args[i].Value)
+ }
+
+ if _, err = db.hIncrSize(key, num); err != nil {
+ return err
+ }
+
+ //todo add binglog
+ err = t.Commit()
+ return err
+}
+
+func (db *DB) HMget(key []byte, args ...[]byte) ([][]byte, error) {
+ var ek []byte
+
+ it := db.bucket.NewIterator()
+ defer it.Close()
+
+ r := make([][]byte, len(args))
+ for i := 0; i < len(args); i++ {
+ if err := checkHashKFSize(key, args[i]); err != nil {
+ return nil, err
+ }
+
+ ek = db.hEncodeHashKey(key, args[i])
+
+ r[i] = it.Find(ek)
+ }
+
+ return r, nil
+}
+
+func (db *DB) HDel(key []byte, args ...[]byte) (int64, error) {
+ t := db.hashBatch
+
+ var ek []byte
+ var v []byte
+ var err error
+
+ t.Lock()
+ defer t.Unlock()
+
+ it := db.bucket.NewIterator()
+ defer it.Close()
+
+ var num int64 = 0
+ for i := 0; i < len(args); i++ {
+ if err := checkHashKFSize(key, args[i]); err != nil {
+ return 0, err
+ }
+
+ ek = db.hEncodeHashKey(key, args[i])
+
+ v = it.RawFind(ek)
+ if v == nil {
+ continue
+ } else {
+ num++
+ t.Delete(ek)
+ }
+ }
+
+ if _, err = db.hIncrSize(key, -num); err != nil {
+ return 0, err
+ }
+
+ err = t.Commit()
+
+ return num, err
+}
+
+func (db *DB) hIncrSize(key []byte, delta int64) (int64, error) {
+ t := db.hashBatch
+ sk := db.hEncodeSizeKey(key)
+
+ var err error
+ var size int64 = 0
+ if size, err = Int64(db.bucket.Get(sk)); err != nil {
+ return 0, err
+ } else {
+ size += delta
+ if size <= 0 {
+ size = 0
+ t.Delete(sk)
+ db.rmExpire(t, HashType, key)
+ } else {
+ t.Put(sk, PutInt64(size))
+ }
+ }
+
+ return size, nil
+}
+
+func (db *DB) HIncrBy(key []byte, field []byte, delta int64) (int64, error) {
+ if err := checkHashKFSize(key, field); err != nil {
+ return 0, err
+ }
+
+ t := db.hashBatch
+ var ek []byte
+ var err error
+
+ t.Lock()
+ defer t.Unlock()
+
+ ek = db.hEncodeHashKey(key, field)
+
+ var n int64 = 0
+ if n, err = StrInt64(db.bucket.Get(ek)); err != nil {
+ return 0, err
+ }
+
+ n += delta
+
+ _, err = db.hSetItem(key, field, StrPutInt64(n))
+ if err != nil {
+ return 0, err
+ }
+
+ err = t.Commit()
+
+ return n, err
+}
+
+func (db *DB) HGetAll(key []byte) ([]FVPair, error) {
+ if err := checkKeySize(key); err != nil {
+ return nil, err
+ }
+
+ start := db.hEncodeStartKey(key)
+ stop := db.hEncodeStopKey(key)
+
+ v := make([]FVPair, 0, 16)
+
+ it := db.bucket.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1)
+ for ; it.Valid(); it.Next() {
+ _, f, err := db.hDecodeHashKey(it.Key())
+ if err != nil {
+ return nil, err
+ }
+
+ v = append(v, FVPair{Field: f, Value: it.Value()})
+ }
+
+ it.Close()
+
+ return v, nil
+}
+
+func (db *DB) HKeys(key []byte) ([][]byte, error) {
+ if err := checkKeySize(key); err != nil {
+ return nil, err
+ }
+
+ start := db.hEncodeStartKey(key)
+ stop := db.hEncodeStopKey(key)
+
+ v := make([][]byte, 0, 16)
+
+ it := db.bucket.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1)
+ for ; it.Valid(); it.Next() {
+ _, f, err := db.hDecodeHashKey(it.Key())
+ if err != nil {
+ return nil, err
+ }
+ v = append(v, f)
+ }
+
+ it.Close()
+
+ return v, nil
+}
+
+func (db *DB) HValues(key []byte) ([][]byte, error) {
+ if err := checkKeySize(key); err != nil {
+ return nil, err
+ }
+
+ start := db.hEncodeStartKey(key)
+ stop := db.hEncodeStopKey(key)
+
+ v := make([][]byte, 0, 16)
+
+ it := db.bucket.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1)
+ for ; it.Valid(); it.Next() {
+ _, _, err := db.hDecodeHashKey(it.Key())
+ if err != nil {
+ return nil, err
+ }
+
+ v = append(v, it.Value())
+ }
+
+ it.Close()
+
+ return v, nil
+}
+
+func (db *DB) HClear(key []byte) (int64, error) {
+ if err := checkKeySize(key); err != nil {
+ return 0, err
+ }
+
+ t := db.hashBatch
+ t.Lock()
+ defer t.Unlock()
+
+ num := db.hDelete(t, key)
+ db.rmExpire(t, HashType, key)
+
+ err := t.Commit()
+ return num, err
+}
+
+func (db *DB) HMclear(keys ...[]byte) (int64, error) {
+ t := db.hashBatch
+ t.Lock()
+ defer t.Unlock()
+
+ for _, key := range keys {
+ if err := checkKeySize(key); err != nil {
+ return 0, err
+ }
+
+ db.hDelete(t, key)
+ db.rmExpire(t, HashType, key)
+ }
+
+ err := t.Commit()
+ return int64(len(keys)), err
+}
+
+func (db *DB) hFlush() (drop int64, err error) {
+ t := db.hashBatch
+
+ t.Lock()
+ defer t.Unlock()
+
+ return db.flushType(t, HashType)
+}
+
+func (db *DB) HScan(key []byte, count int, inclusive bool, match string) ([][]byte, error) {
+ return db.scan(HSizeType, key, count, inclusive, match)
+}
+
+func (db *DB) HExpire(key []byte, duration int64) (int64, error) {
+ if duration <= 0 {
+ return 0, errExpireValue
+ }
+
+ return db.hExpireAt(key, time.Now().Unix()+duration)
+}
+
+func (db *DB) HExpireAt(key []byte, when int64) (int64, error) {
+ if when <= time.Now().Unix() {
+ return 0, errExpireValue
+ }
+
+ return db.hExpireAt(key, when)
+}
+
+func (db *DB) HTTL(key []byte) (int64, error) {
+ if err := checkKeySize(key); err != nil {
+ return -1, err
+ }
+
+ return db.ttl(HashType, key)
+}
+
+func (db *DB) HPersist(key []byte) (int64, error) {
+ if err := checkKeySize(key); err != nil {
+ return 0, err
+ }
+
+ t := db.hashBatch
+ t.Lock()
+ defer t.Unlock()
+
+ n, err := db.rmExpire(t, HashType, key)
+ if err != nil {
+ return 0, err
+ }
+
+ err = t.Commit()
+ return n, err
+}
diff --git a/vendor/github.com/lunny/nodb/t_kv.go b/vendor/github.com/lunny/nodb/t_kv.go
new file mode 100644
index 0000000000..82a12f7027
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/t_kv.go
@@ -0,0 +1,387 @@
+package nodb
+
+import (
+ "errors"
+ "time"
+)
+
+type KVPair struct {
+ Key []byte
+ Value []byte
+}
+
+var errKVKey = errors.New("invalid encode kv key")
+
+func checkKeySize(key []byte) error {
+ if len(key) > MaxKeySize || len(key) == 0 {
+ return errKeySize
+ }
+ return nil
+}
+
+func checkValueSize(value []byte) error {
+ if len(value) > MaxValueSize {
+ return errValueSize
+ }
+
+ return nil
+}
+
+func (db *DB) encodeKVKey(key []byte) []byte {
+ ek := make([]byte, len(key)+2)
+ ek[0] = db.index
+ ek[1] = KVType
+ copy(ek[2:], key)
+ return ek
+}
+
+func (db *DB) decodeKVKey(ek []byte) ([]byte, error) {
+ if len(ek) < 2 || ek[0] != db.index || ek[1] != KVType {
+ return nil, errKVKey
+ }
+
+ return ek[2:], nil
+}
+
+func (db *DB) encodeKVMinKey() []byte {
+ ek := db.encodeKVKey(nil)
+ return ek
+}
+
+func (db *DB) encodeKVMaxKey() []byte {
+ ek := db.encodeKVKey(nil)
+ ek[len(ek)-1] = KVType + 1
+ return ek
+}
+
+func (db *DB) incr(key []byte, delta int64) (int64, error) {
+ if err := checkKeySize(key); err != nil {
+ return 0, err
+ }
+
+ var err error
+ key = db.encodeKVKey(key)
+
+ t := db.kvBatch
+
+ t.Lock()
+ defer t.Unlock()
+
+ var n int64
+ n, err = StrInt64(db.bucket.Get(key))
+ if err != nil {
+ return 0, err
+ }
+
+ n += delta
+
+ t.Put(key, StrPutInt64(n))
+
+ //todo binlog
+
+ err = t.Commit()
+ return n, err
+}
+
+// ps : here just focus on deleting the key-value data,
+// any other likes expire is ignore.
+func (db *DB) delete(t *batch, key []byte) int64 {
+ key = db.encodeKVKey(key)
+ t.Delete(key)
+ return 1
+}
+
+func (db *DB) setExpireAt(key []byte, when int64) (int64, error) {
+ t := db.kvBatch
+ t.Lock()
+ defer t.Unlock()
+
+ if exist, err := db.Exists(key); err != nil || exist == 0 {
+ return 0, err
+ } else {
+ db.expireAt(t, KVType, key, when)
+ if err := t.Commit(); err != nil {
+ return 0, err
+ }
+ }
+ return 1, nil
+}
+
+func (db *DB) Decr(key []byte) (int64, error) {
+ return db.incr(key, -1)
+}
+
+func (db *DB) DecrBy(key []byte, decrement int64) (int64, error) {
+ return db.incr(key, -decrement)
+}
+
+func (db *DB) Del(keys ...[]byte) (int64, error) {
+ if len(keys) == 0 {
+ return 0, nil
+ }
+
+ codedKeys := make([][]byte, len(keys))
+ for i, k := range keys {
+ codedKeys[i] = db.encodeKVKey(k)
+ }
+
+ t := db.kvBatch
+ t.Lock()
+ defer t.Unlock()
+
+ for i, k := range keys {
+ t.Delete(codedKeys[i])
+ db.rmExpire(t, KVType, k)
+ }
+
+ err := t.Commit()
+ return int64(len(keys)), err
+}
+
+func (db *DB) Exists(key []byte) (int64, error) {
+ if err := checkKeySize(key); err != nil {
+ return 0, err
+ }
+
+ var err error
+ key = db.encodeKVKey(key)
+
+ var v []byte
+ v, err = db.bucket.Get(key)
+ if v != nil && err == nil {
+ return 1, nil
+ }
+
+ return 0, err
+}
+
+func (db *DB) Get(key []byte) ([]byte, error) {
+ if err := checkKeySize(key); err != nil {
+ return nil, err
+ }
+
+ key = db.encodeKVKey(key)
+
+ return db.bucket.Get(key)
+}
+
+func (db *DB) GetSet(key []byte, value []byte) ([]byte, error) {
+ if err := checkKeySize(key); err != nil {
+ return nil, err
+ } else if err := checkValueSize(value); err != nil {
+ return nil, err
+ }
+
+ key = db.encodeKVKey(key)
+
+ t := db.kvBatch
+
+ t.Lock()
+ defer t.Unlock()
+
+ oldValue, err := db.bucket.Get(key)
+ if err != nil {
+ return nil, err
+ }
+
+ t.Put(key, value)
+ //todo, binlog
+
+ err = t.Commit()
+
+ return oldValue, err
+}
+
+func (db *DB) Incr(key []byte) (int64, error) {
+ return db.incr(key, 1)
+}
+
+func (db *DB) IncrBy(key []byte, increment int64) (int64, error) {
+ return db.incr(key, increment)
+}
+
+func (db *DB) MGet(keys ...[]byte) ([][]byte, error) {
+ values := make([][]byte, len(keys))
+
+ it := db.bucket.NewIterator()
+ defer it.Close()
+
+ for i := range keys {
+ if err := checkKeySize(keys[i]); err != nil {
+ return nil, err
+ }
+
+ values[i] = it.Find(db.encodeKVKey(keys[i]))
+ }
+
+ return values, nil
+}
+
+func (db *DB) MSet(args ...KVPair) error {
+ if len(args) == 0 {
+ return nil
+ }
+
+ t := db.kvBatch
+
+ var err error
+ var key []byte
+ var value []byte
+
+ t.Lock()
+ defer t.Unlock()
+
+ for i := 0; i < len(args); i++ {
+ if err := checkKeySize(args[i].Key); err != nil {
+ return err
+ } else if err := checkValueSize(args[i].Value); err != nil {
+ return err
+ }
+
+ key = db.encodeKVKey(args[i].Key)
+
+ value = args[i].Value
+
+ t.Put(key, value)
+
+ //todo binlog
+ }
+
+ err = t.Commit()
+ return err
+}
+
+func (db *DB) Set(key []byte, value []byte) error {
+ if err := checkKeySize(key); err != nil {
+ return err
+ } else if err := checkValueSize(value); err != nil {
+ return err
+ }
+
+ var err error
+ key = db.encodeKVKey(key)
+
+ t := db.kvBatch
+
+ t.Lock()
+ defer t.Unlock()
+
+ t.Put(key, value)
+
+ err = t.Commit()
+
+ return err
+}
+
+func (db *DB) SetNX(key []byte, value []byte) (int64, error) {
+ if err := checkKeySize(key); err != nil {
+ return 0, err
+ } else if err := checkValueSize(value); err != nil {
+ return 0, err
+ }
+
+ var err error
+ key = db.encodeKVKey(key)
+
+ var n int64 = 1
+
+ t := db.kvBatch
+
+ t.Lock()
+ defer t.Unlock()
+
+ if v, err := db.bucket.Get(key); err != nil {
+ return 0, err
+ } else if v != nil {
+ n = 0
+ } else {
+ t.Put(key, value)
+
+ //todo binlog
+
+ err = t.Commit()
+ }
+
+ return n, err
+}
+
+func (db *DB) flush() (drop int64, err error) {
+ t := db.kvBatch
+ t.Lock()
+ defer t.Unlock()
+ return db.flushType(t, KVType)
+}
+
+//if inclusive is true, scan range [key, inf) else (key, inf)
+func (db *DB) Scan(key []byte, count int, inclusive bool, match string) ([][]byte, error) {
+ return db.scan(KVType, key, count, inclusive, match)
+}
+
+func (db *DB) Expire(key []byte, duration int64) (int64, error) {
+ if duration <= 0 {
+ return 0, errExpireValue
+ }
+
+ return db.setExpireAt(key, time.Now().Unix()+duration)
+}
+
+func (db *DB) ExpireAt(key []byte, when int64) (int64, error) {
+ if when <= time.Now().Unix() {
+ return 0, errExpireValue
+ }
+
+ return db.setExpireAt(key, when)
+}
+
+func (db *DB) TTL(key []byte) (int64, error) {
+ if err := checkKeySize(key); err != nil {
+ return -1, err
+ }
+
+ return db.ttl(KVType, key)
+}
+
+func (db *DB) Persist(key []byte) (int64, error) {
+ if err := checkKeySize(key); err != nil {
+ return 0, err
+ }
+
+ t := db.kvBatch
+ t.Lock()
+ defer t.Unlock()
+ n, err := db.rmExpire(t, KVType, key)
+ if err != nil {
+ return 0, err
+ }
+
+ err = t.Commit()
+ return n, err
+}
+
+func (db *DB) Lock() {
+ t := db.kvBatch
+ t.Lock()
+}
+
+func (db *DB) Remove(key []byte) bool {
+ if len(key) == 0 {
+ return false
+ }
+ t := db.kvBatch
+ t.Delete(db.encodeKVKey(key))
+ _, err := db.rmExpire(t, KVType, key)
+ if err != nil {
+ return false
+ }
+ return true
+}
+
+func (db *DB) Commit() error {
+ t := db.kvBatch
+ return t.Commit()
+}
+
+func (db *DB) Unlock() {
+ t := db.kvBatch
+ t.Unlock()
+}
diff --git a/vendor/github.com/lunny/nodb/t_list.go b/vendor/github.com/lunny/nodb/t_list.go
new file mode 100644
index 0000000000..5b9d9d9c21
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/t_list.go
@@ -0,0 +1,492 @@
+package nodb
+
+import (
+ "encoding/binary"
+ "errors"
+ "time"
+
+ "github.com/lunny/nodb/store"
+)
+
+const (
+ listHeadSeq int32 = 1
+ listTailSeq int32 = 2
+
+ listMinSeq int32 = 1000
+ listMaxSeq int32 = 1<<31 - 1000
+ listInitialSeq int32 = listMinSeq + (listMaxSeq-listMinSeq)/2
+)
+
+var errLMetaKey = errors.New("invalid lmeta key")
+var errListKey = errors.New("invalid list key")
+var errListSeq = errors.New("invalid list sequence, overflow")
+
+func (db *DB) lEncodeMetaKey(key []byte) []byte {
+ buf := make([]byte, len(key)+2)
+ buf[0] = db.index
+ buf[1] = LMetaType
+
+ copy(buf[2:], key)
+ return buf
+}
+
+func (db *DB) lDecodeMetaKey(ek []byte) ([]byte, error) {
+ if len(ek) < 2 || ek[0] != db.index || ek[1] != LMetaType {
+ return nil, errLMetaKey
+ }
+
+ return ek[2:], nil
+}
+
+func (db *DB) lEncodeListKey(key []byte, seq int32) []byte {
+ buf := make([]byte, len(key)+8)
+
+ pos := 0
+ buf[pos] = db.index
+ pos++
+ buf[pos] = ListType
+ pos++
+
+ binary.BigEndian.PutUint16(buf[pos:], uint16(len(key)))
+ pos += 2
+
+ copy(buf[pos:], key)
+ pos += len(key)
+
+ binary.BigEndian.PutUint32(buf[pos:], uint32(seq))
+
+ return buf
+}
+
+func (db *DB) lDecodeListKey(ek []byte) (key []byte, seq int32, err error) {
+ if len(ek) < 8 || ek[0] != db.index || ek[1] != ListType {
+ err = errListKey
+ return
+ }
+
+ keyLen := int(binary.BigEndian.Uint16(ek[2:]))
+ if keyLen+8 != len(ek) {
+ err = errListKey
+ return
+ }
+
+ key = ek[4 : 4+keyLen]
+ seq = int32(binary.BigEndian.Uint32(ek[4+keyLen:]))
+ return
+}
+
+func (db *DB) lpush(key []byte, whereSeq int32, args ...[]byte) (int64, error) {
+ if err := checkKeySize(key); err != nil {
+ return 0, err
+ }
+
+ var headSeq int32
+ var tailSeq int32
+ var size int32
+ var err error
+
+ t := db.listBatch
+ t.Lock()
+ defer t.Unlock()
+
+ metaKey := db.lEncodeMetaKey(key)
+ headSeq, tailSeq, size, err = db.lGetMeta(nil, metaKey)
+ if err != nil {
+ return 0, err
+ }
+
+ var pushCnt int = len(args)
+ if pushCnt == 0 {
+ return int64(size), nil
+ }
+
+ var seq int32 = headSeq
+ var delta int32 = -1
+ if whereSeq == listTailSeq {
+ seq = tailSeq
+ delta = 1
+ }
+
+ // append elements
+ if size > 0 {
+ seq += delta
+ }
+
+ for i := 0; i < pushCnt; i++ {
+ ek := db.lEncodeListKey(key, seq+int32(i)*delta)
+ t.Put(ek, args[i])
+ }
+
+ seq += int32(pushCnt-1) * delta
+ if seq <= listMinSeq || seq >= listMaxSeq {
+ return 0, errListSeq
+ }
+
+ // set meta info
+ if whereSeq == listHeadSeq {
+ headSeq = seq
+ } else {
+ tailSeq = seq
+ }
+
+ db.lSetMeta(metaKey, headSeq, tailSeq)
+
+ err = t.Commit()
+ return int64(size) + int64(pushCnt), err
+}
+
+func (db *DB) lpop(key []byte, whereSeq int32) ([]byte, error) {
+ if err := checkKeySize(key); err != nil {
+ return nil, err
+ }
+
+ t := db.listBatch
+ t.Lock()
+ defer t.Unlock()
+
+ var headSeq int32
+ var tailSeq int32
+ var err error
+
+ metaKey := db.lEncodeMetaKey(key)
+ headSeq, tailSeq, _, err = db.lGetMeta(nil, metaKey)
+ if err != nil {
+ return nil, err
+ }
+
+ var value []byte
+
+ var seq int32 = headSeq
+ if whereSeq == listTailSeq {
+ seq = tailSeq
+ }
+
+ itemKey := db.lEncodeListKey(key, seq)
+ value, err = db.bucket.Get(itemKey)
+ if err != nil {
+ return nil, err
+ }
+
+ if whereSeq == listHeadSeq {
+ headSeq += 1
+ } else {
+ tailSeq -= 1
+ }
+
+ t.Delete(itemKey)
+ size := db.lSetMeta(metaKey, headSeq, tailSeq)
+ if size == 0 {
+ db.rmExpire(t, HashType, key)
+ }
+
+ err = t.Commit()
+ return value, err
+}
+
+// ps : here just focus on deleting the list data,
+// any other likes expire is ignore.
+func (db *DB) lDelete(t *batch, key []byte) int64 {
+ mk := db.lEncodeMetaKey(key)
+
+ var headSeq int32
+ var tailSeq int32
+ var err error
+
+ it := db.bucket.NewIterator()
+ defer it.Close()
+
+ headSeq, tailSeq, _, err = db.lGetMeta(it, mk)
+ if err != nil {
+ return 0
+ }
+
+ var num int64 = 0
+ startKey := db.lEncodeListKey(key, headSeq)
+ stopKey := db.lEncodeListKey(key, tailSeq)
+
+ rit := store.NewRangeIterator(it, &store.Range{startKey, stopKey, store.RangeClose})
+ for ; rit.Valid(); rit.Next() {
+ t.Delete(rit.RawKey())
+ num++
+ }
+
+ t.Delete(mk)
+
+ return num
+}
+
+func (db *DB) lGetMeta(it *store.Iterator, ek []byte) (headSeq int32, tailSeq int32, size int32, err error) {
+ var v []byte
+ if it != nil {
+ v = it.Find(ek)
+ } else {
+ v, err = db.bucket.Get(ek)
+ }
+ if err != nil {
+ return
+ } else if v == nil {
+ headSeq = listInitialSeq
+ tailSeq = listInitialSeq
+ size = 0
+ return
+ } else {
+ headSeq = int32(binary.LittleEndian.Uint32(v[0:4]))
+ tailSeq = int32(binary.LittleEndian.Uint32(v[4:8]))
+ size = tailSeq - headSeq + 1
+ }
+ return
+}
+
+func (db *DB) lSetMeta(ek []byte, headSeq int32, tailSeq int32) int32 {
+ t := db.listBatch
+
+ var size int32 = tailSeq - headSeq + 1
+ if size < 0 {
+ // todo : log error + panic
+ } else if size == 0 {
+ t.Delete(ek)
+ } else {
+ buf := make([]byte, 8)
+
+ binary.LittleEndian.PutUint32(buf[0:4], uint32(headSeq))
+ binary.LittleEndian.PutUint32(buf[4:8], uint32(tailSeq))
+
+ t.Put(ek, buf)
+ }
+
+ return size
+}
+
+func (db *DB) lExpireAt(key []byte, when int64) (int64, error) {
+ t := db.listBatch
+ t.Lock()
+ defer t.Unlock()
+
+ if llen, err := db.LLen(key); err != nil || llen == 0 {
+ return 0, err
+ } else {
+ db.expireAt(t, ListType, key, when)
+ if err := t.Commit(); err != nil {
+ return 0, err
+ }
+ }
+ return 1, nil
+}
+
+func (db *DB) LIndex(key []byte, index int32) ([]byte, error) {
+ if err := checkKeySize(key); err != nil {
+ return nil, err
+ }
+
+ var seq int32
+ var headSeq int32
+ var tailSeq int32
+ var err error
+
+ metaKey := db.lEncodeMetaKey(key)
+
+ it := db.bucket.NewIterator()
+ defer it.Close()
+
+ headSeq, tailSeq, _, err = db.lGetMeta(it, metaKey)
+ if err != nil {
+ return nil, err
+ }
+
+ if index >= 0 {
+ seq = headSeq + index
+ } else {
+ seq = tailSeq + index + 1
+ }
+
+ sk := db.lEncodeListKey(key, seq)
+ v := it.Find(sk)
+
+ return v, nil
+}
+
+func (db *DB) LLen(key []byte) (int64, error) {
+ if err := checkKeySize(key); err != nil {
+ return 0, err
+ }
+
+ ek := db.lEncodeMetaKey(key)
+ _, _, size, err := db.lGetMeta(nil, ek)
+ return int64(size), err
+}
+
+func (db *DB) LPop(key []byte) ([]byte, error) {
+ return db.lpop(key, listHeadSeq)
+}
+
+func (db *DB) LPush(key []byte, arg1 []byte, args ...[]byte) (int64, error) {
+ var argss = [][]byte{arg1}
+ argss = append(argss, args...)
+ return db.lpush(key, listHeadSeq, argss...)
+}
+
+func (db *DB) LRange(key []byte, start int32, stop int32) ([][]byte, error) {
+ if err := checkKeySize(key); err != nil {
+ return nil, err
+ }
+
+ var headSeq int32
+ var llen int32
+ var err error
+
+ metaKey := db.lEncodeMetaKey(key)
+
+ it := db.bucket.NewIterator()
+ defer it.Close()
+
+ if headSeq, _, llen, err = db.lGetMeta(it, metaKey); err != nil {
+ return nil, err
+ }
+
+ if start < 0 {
+ start = llen + start
+ }
+ if stop < 0 {
+ stop = llen + stop
+ }
+ if start < 0 {
+ start = 0
+ }
+
+ if start > stop || start >= llen {
+ return [][]byte{}, nil
+ }
+
+ if stop >= llen {
+ stop = llen - 1
+ }
+
+ limit := (stop - start) + 1
+ headSeq += start
+
+ v := make([][]byte, 0, limit)
+
+ startKey := db.lEncodeListKey(key, headSeq)
+ rit := store.NewRangeLimitIterator(it,
+ &store.Range{
+ Min: startKey,
+ Max: nil,
+ Type: store.RangeClose},
+ &store.Limit{
+ Offset: 0,
+ Count: int(limit)})
+
+ for ; rit.Valid(); rit.Next() {
+ v = append(v, rit.Value())
+ }
+
+ return v, nil
+}
+
+func (db *DB) RPop(key []byte) ([]byte, error) {
+ return db.lpop(key, listTailSeq)
+}
+
+func (db *DB) RPush(key []byte, arg1 []byte, args ...[]byte) (int64, error) {
+ var argss = [][]byte{arg1}
+ argss = append(argss, args...)
+ return db.lpush(key, listTailSeq, argss...)
+}
+
+func (db *DB) LClear(key []byte) (int64, error) {
+ if err := checkKeySize(key); err != nil {
+ return 0, err
+ }
+
+ t := db.listBatch
+ t.Lock()
+ defer t.Unlock()
+
+ num := db.lDelete(t, key)
+ db.rmExpire(t, ListType, key)
+
+ err := t.Commit()
+ return num, err
+}
+
+func (db *DB) LMclear(keys ...[]byte) (int64, error) {
+ t := db.listBatch
+ t.Lock()
+ defer t.Unlock()
+
+ for _, key := range keys {
+ if err := checkKeySize(key); err != nil {
+ return 0, err
+ }
+
+ db.lDelete(t, key)
+ db.rmExpire(t, ListType, key)
+
+ }
+
+ err := t.Commit()
+ return int64(len(keys)), err
+}
+
+func (db *DB) lFlush() (drop int64, err error) {
+ t := db.listBatch
+ t.Lock()
+ defer t.Unlock()
+ return db.flushType(t, ListType)
+}
+
+func (db *DB) LExpire(key []byte, duration int64) (int64, error) {
+ if duration <= 0 {
+ return 0, errExpireValue
+ }
+
+ return db.lExpireAt(key, time.Now().Unix()+duration)
+}
+
+func (db *DB) LExpireAt(key []byte, when int64) (int64, error) {
+ if when <= time.Now().Unix() {
+ return 0, errExpireValue
+ }
+
+ return db.lExpireAt(key, when)
+}
+
+func (db *DB) LTTL(key []byte) (int64, error) {
+ if err := checkKeySize(key); err != nil {
+ return -1, err
+ }
+
+ return db.ttl(ListType, key)
+}
+
+func (db *DB) LPersist(key []byte) (int64, error) {
+ if err := checkKeySize(key); err != nil {
+ return 0, err
+ }
+
+ t := db.listBatch
+ t.Lock()
+ defer t.Unlock()
+
+ n, err := db.rmExpire(t, ListType, key)
+ if err != nil {
+ return 0, err
+ }
+
+ err = t.Commit()
+ return n, err
+}
+
+func (db *DB) LScan(key []byte, count int, inclusive bool, match string) ([][]byte, error) {
+ return db.scan(LMetaType, key, count, inclusive, match)
+}
+
+func (db *DB) lEncodeMinKey() []byte {
+ return db.lEncodeMetaKey(nil)
+}
+
+func (db *DB) lEncodeMaxKey() []byte {
+ ek := db.lEncodeMetaKey(nil)
+ ek[len(ek)-1] = LMetaType + 1
+ return ek
+}
diff --git a/vendor/github.com/lunny/nodb/t_set.go b/vendor/github.com/lunny/nodb/t_set.go
new file mode 100644
index 0000000000..41ce30e8ce
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/t_set.go
@@ -0,0 +1,601 @@
+package nodb
+
+import (
+ "encoding/binary"
+ "errors"
+ "time"
+
+ "github.com/lunny/nodb/store"
+)
+
+var errSetKey = errors.New("invalid set key")
+var errSSizeKey = errors.New("invalid ssize key")
+
+const (
+ setStartSep byte = ':'
+ setStopSep byte = setStartSep + 1
+ UnionType byte = 51
+ DiffType byte = 52
+ InterType byte = 53
+)
+
+func checkSetKMSize(key []byte, member []byte) error {
+ if len(key) > MaxKeySize || len(key) == 0 {
+ return errKeySize
+ } else if len(member) > MaxSetMemberSize || len(member) == 0 {
+ return errSetMemberSize
+ }
+ return nil
+}
+
+func (db *DB) sEncodeSizeKey(key []byte) []byte {
+ buf := make([]byte, len(key)+2)
+
+ buf[0] = db.index
+ buf[1] = SSizeType
+
+ copy(buf[2:], key)
+ return buf
+}
+
+func (db *DB) sDecodeSizeKey(ek []byte) ([]byte, error) {
+ if len(ek) < 2 || ek[0] != db.index || ek[1] != SSizeType {
+ return nil, errSSizeKey
+ }
+
+ return ek[2:], nil
+}
+
+func (db *DB) sEncodeSetKey(key []byte, member []byte) []byte {
+ buf := make([]byte, len(key)+len(member)+1+1+2+1)
+
+ pos := 0
+ buf[pos] = db.index
+ pos++
+ buf[pos] = SetType
+ pos++
+
+ binary.BigEndian.PutUint16(buf[pos:], uint16(len(key)))
+ pos += 2
+
+ copy(buf[pos:], key)
+ pos += len(key)
+
+ buf[pos] = setStartSep
+ pos++
+ copy(buf[pos:], member)
+
+ return buf
+}
+
+func (db *DB) sDecodeSetKey(ek []byte) ([]byte, []byte, error) {
+ if len(ek) < 5 || ek[0] != db.index || ek[1] != SetType {
+ return nil, nil, errSetKey
+ }
+
+ pos := 2
+ keyLen := int(binary.BigEndian.Uint16(ek[pos:]))
+ pos += 2
+
+ if keyLen+5 > len(ek) {
+ return nil, nil, errSetKey
+ }
+
+ key := ek[pos : pos+keyLen]
+ pos += keyLen
+
+ if ek[pos] != hashStartSep {
+ return nil, nil, errSetKey
+ }
+
+ pos++
+ member := ek[pos:]
+ return key, member, nil
+}
+
+func (db *DB) sEncodeStartKey(key []byte) []byte {
+ return db.sEncodeSetKey(key, nil)
+}
+
+func (db *DB) sEncodeStopKey(key []byte) []byte {
+ k := db.sEncodeSetKey(key, nil)
+
+ k[len(k)-1] = setStopSep
+
+ return k
+}
+
+func (db *DB) sFlush() (drop int64, err error) {
+
+ t := db.setBatch
+ t.Lock()
+ defer t.Unlock()
+
+ return db.flushType(t, SetType)
+}
+
+func (db *DB) sDelete(t *batch, key []byte) int64 {
+ sk := db.sEncodeSizeKey(key)
+ start := db.sEncodeStartKey(key)
+ stop := db.sEncodeStopKey(key)
+
+ var num int64 = 0
+ it := db.bucket.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1)
+ for ; it.Valid(); it.Next() {
+ t.Delete(it.RawKey())
+ num++
+ }
+
+ it.Close()
+
+ t.Delete(sk)
+ return num
+}
+
+func (db *DB) sIncrSize(key []byte, delta int64) (int64, error) {
+ t := db.setBatch
+ sk := db.sEncodeSizeKey(key)
+
+ var err error
+ var size int64 = 0
+ if size, err = Int64(db.bucket.Get(sk)); err != nil {
+ return 0, err
+ } else {
+ size += delta
+ if size <= 0 {
+ size = 0
+ t.Delete(sk)
+ db.rmExpire(t, SetType, key)
+ } else {
+ t.Put(sk, PutInt64(size))
+ }
+ }
+
+ return size, nil
+}
+
+func (db *DB) sExpireAt(key []byte, when int64) (int64, error) {
+ t := db.setBatch
+ t.Lock()
+ defer t.Unlock()
+
+ if scnt, err := db.SCard(key); err != nil || scnt == 0 {
+ return 0, err
+ } else {
+ db.expireAt(t, SetType, key, when)
+ if err := t.Commit(); err != nil {
+ return 0, err
+ }
+
+ }
+
+ return 1, nil
+}
+
+func (db *DB) sSetItem(key []byte, member []byte) (int64, error) {
+ t := db.setBatch
+ ek := db.sEncodeSetKey(key, member)
+
+ var n int64 = 1
+ if v, _ := db.bucket.Get(ek); v != nil {
+ n = 0
+ } else {
+ if _, err := db.sIncrSize(key, 1); err != nil {
+ return 0, err
+ }
+ }
+
+ t.Put(ek, nil)
+ return n, nil
+}
+
+func (db *DB) SAdd(key []byte, args ...[]byte) (int64, error) {
+ t := db.setBatch
+ t.Lock()
+ defer t.Unlock()
+
+ var err error
+ var ek []byte
+ var num int64 = 0
+ for i := 0; i < len(args); i++ {
+ if err := checkSetKMSize(key, args[i]); err != nil {
+ return 0, err
+ }
+
+ ek = db.sEncodeSetKey(key, args[i])
+
+ if v, err := db.bucket.Get(ek); err != nil {
+ return 0, err
+ } else if v == nil {
+ num++
+ }
+
+ t.Put(ek, nil)
+ }
+
+ if _, err = db.sIncrSize(key, num); err != nil {
+ return 0, err
+ }
+
+ err = t.Commit()
+ return num, err
+
+}
+
+func (db *DB) SCard(key []byte) (int64, error) {
+ if err := checkKeySize(key); err != nil {
+ return 0, err
+ }
+
+ sk := db.sEncodeSizeKey(key)
+
+ return Int64(db.bucket.Get(sk))
+}
+
+func (db *DB) sDiffGeneric(keys ...[]byte) ([][]byte, error) {
+ destMap := make(map[string]bool)
+
+ members, err := db.SMembers(keys[0])
+ if err != nil {
+ return nil, err
+ }
+
+ for _, m := range members {
+ destMap[String(m)] = true
+ }
+
+ for _, k := range keys[1:] {
+ members, err := db.SMembers(k)
+ if err != nil {
+ return nil, err
+ }
+
+ for _, m := range members {
+ if _, ok := destMap[String(m)]; !ok {
+ continue
+ } else if ok {
+ delete(destMap, String(m))
+ }
+ }
+ // O - A = O, O is zero set.
+ if len(destMap) == 0 {
+ return nil, nil
+ }
+ }
+
+ slice := make([][]byte, len(destMap))
+ idx := 0
+ for k, v := range destMap {
+ if !v {
+ continue
+ }
+ slice[idx] = []byte(k)
+ idx++
+ }
+
+ return slice, nil
+}
+
+func (db *DB) SDiff(keys ...[]byte) ([][]byte, error) {
+ v, err := db.sDiffGeneric(keys...)
+ return v, err
+}
+
+func (db *DB) SDiffStore(dstKey []byte, keys ...[]byte) (int64, error) {
+ n, err := db.sStoreGeneric(dstKey, DiffType, keys...)
+ return n, err
+}
+
+func (db *DB) sInterGeneric(keys ...[]byte) ([][]byte, error) {
+ destMap := make(map[string]bool)
+
+ members, err := db.SMembers(keys[0])
+ if err != nil {
+ return nil, err
+ }
+
+ for _, m := range members {
+ destMap[String(m)] = true
+ }
+
+ for _, key := range keys[1:] {
+ if err := checkKeySize(key); err != nil {
+ return nil, err
+ }
+
+ members, err := db.SMembers(key)
+ if err != nil {
+ return nil, err
+ } else if len(members) == 0 {
+ return nil, err
+ }
+
+ tempMap := make(map[string]bool)
+ for _, member := range members {
+ if err := checkKeySize(member); err != nil {
+ return nil, err
+ }
+ if _, ok := destMap[String(member)]; ok {
+ tempMap[String(member)] = true //mark this item as selected
+ }
+ }
+ destMap = tempMap //reduce the size of the result set
+ if len(destMap) == 0 {
+ return nil, nil
+ }
+ }
+
+ slice := make([][]byte, len(destMap))
+ idx := 0
+ for k, v := range destMap {
+ if !v {
+ continue
+ }
+
+ slice[idx] = []byte(k)
+ idx++
+ }
+
+ return slice, nil
+
+}
+
+func (db *DB) SInter(keys ...[]byte) ([][]byte, error) {
+ v, err := db.sInterGeneric(keys...)
+ return v, err
+
+}
+
+func (db *DB) SInterStore(dstKey []byte, keys ...[]byte) (int64, error) {
+ n, err := db.sStoreGeneric(dstKey, InterType, keys...)
+ return n, err
+}
+
+func (db *DB) SIsMember(key []byte, member []byte) (int64, error) {
+ ek := db.sEncodeSetKey(key, member)
+
+ var n int64 = 1
+ if v, err := db.bucket.Get(ek); err != nil {
+ return 0, err
+ } else if v == nil {
+ n = 0
+ }
+ return n, nil
+}
+
+func (db *DB) SMembers(key []byte) ([][]byte, error) {
+ if err := checkKeySize(key); err != nil {
+ return nil, err
+ }
+
+ start := db.sEncodeStartKey(key)
+ stop := db.sEncodeStopKey(key)
+
+ v := make([][]byte, 0, 16)
+
+ it := db.bucket.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1)
+ for ; it.Valid(); it.Next() {
+ _, m, err := db.sDecodeSetKey(it.Key())
+ if err != nil {
+ return nil, err
+ }
+
+ v = append(v, m)
+ }
+
+ it.Close()
+
+ return v, nil
+}
+
+func (db *DB) SRem(key []byte, args ...[]byte) (int64, error) {
+ t := db.setBatch
+ t.Lock()
+ defer t.Unlock()
+
+ var ek []byte
+ var v []byte
+ var err error
+
+ it := db.bucket.NewIterator()
+ defer it.Close()
+
+ var num int64 = 0
+ for i := 0; i < len(args); i++ {
+ if err := checkSetKMSize(key, args[i]); err != nil {
+ return 0, err
+ }
+
+ ek = db.sEncodeSetKey(key, args[i])
+
+ v = it.RawFind(ek)
+ if v == nil {
+ continue
+ } else {
+ num++
+ t.Delete(ek)
+ }
+ }
+
+ if _, err = db.sIncrSize(key, -num); err != nil {
+ return 0, err
+ }
+
+ err = t.Commit()
+ return num, err
+
+}
+
+func (db *DB) sUnionGeneric(keys ...[]byte) ([][]byte, error) {
+ dstMap := make(map[string]bool)
+
+ for _, key := range keys {
+ if err := checkKeySize(key); err != nil {
+ return nil, err
+ }
+
+ members, err := db.SMembers(key)
+ if err != nil {
+ return nil, err
+ }
+
+ for _, member := range members {
+ dstMap[String(member)] = true
+ }
+ }
+
+ slice := make([][]byte, len(dstMap))
+ idx := 0
+ for k, v := range dstMap {
+ if !v {
+ continue
+ }
+ slice[idx] = []byte(k)
+ idx++
+ }
+
+ return slice, nil
+}
+
+func (db *DB) SUnion(keys ...[]byte) ([][]byte, error) {
+ v, err := db.sUnionGeneric(keys...)
+ return v, err
+}
+
+func (db *DB) SUnionStore(dstKey []byte, keys ...[]byte) (int64, error) {
+ n, err := db.sStoreGeneric(dstKey, UnionType, keys...)
+ return n, err
+}
+
+func (db *DB) sStoreGeneric(dstKey []byte, optType byte, keys ...[]byte) (int64, error) {
+ if err := checkKeySize(dstKey); err != nil {
+ return 0, err
+ }
+
+ t := db.setBatch
+ t.Lock()
+ defer t.Unlock()
+
+ db.sDelete(t, dstKey)
+
+ var err error
+ var ek []byte
+ var v [][]byte
+
+ switch optType {
+ case UnionType:
+ v, err = db.sUnionGeneric(keys...)
+ case DiffType:
+ v, err = db.sDiffGeneric(keys...)
+ case InterType:
+ v, err = db.sInterGeneric(keys...)
+ }
+
+ if err != nil {
+ return 0, err
+ }
+
+ for _, m := range v {
+ if err := checkSetKMSize(dstKey, m); err != nil {
+ return 0, err
+ }
+
+ ek = db.sEncodeSetKey(dstKey, m)
+
+ if _, err := db.bucket.Get(ek); err != nil {
+ return 0, err
+ }
+
+ t.Put(ek, nil)
+ }
+
+ var num = int64(len(v))
+ sk := db.sEncodeSizeKey(dstKey)
+ t.Put(sk, PutInt64(num))
+
+ if err = t.Commit(); err != nil {
+ return 0, err
+ }
+ return num, nil
+}
+
+func (db *DB) SClear(key []byte) (int64, error) {
+ if err := checkKeySize(key); err != nil {
+ return 0, err
+ }
+
+ t := db.setBatch
+ t.Lock()
+ defer t.Unlock()
+
+ num := db.sDelete(t, key)
+ db.rmExpire(t, SetType, key)
+
+ err := t.Commit()
+ return num, err
+}
+
+func (db *DB) SMclear(keys ...[]byte) (int64, error) {
+ t := db.setBatch
+ t.Lock()
+ defer t.Unlock()
+
+ for _, key := range keys {
+ if err := checkKeySize(key); err != nil {
+ return 0, err
+ }
+
+ db.sDelete(t, key)
+ db.rmExpire(t, SetType, key)
+ }
+
+ err := t.Commit()
+ return int64(len(keys)), err
+}
+
+func (db *DB) SExpire(key []byte, duration int64) (int64, error) {
+ if duration <= 0 {
+ return 0, errExpireValue
+ }
+
+ return db.sExpireAt(key, time.Now().Unix()+duration)
+
+}
+
+func (db *DB) SExpireAt(key []byte, when int64) (int64, error) {
+ if when <= time.Now().Unix() {
+ return 0, errExpireValue
+ }
+
+ return db.sExpireAt(key, when)
+
+}
+
+func (db *DB) STTL(key []byte) (int64, error) {
+ if err := checkKeySize(key); err != nil {
+ return -1, err
+ }
+
+ return db.ttl(SetType, key)
+}
+
+func (db *DB) SPersist(key []byte) (int64, error) {
+ if err := checkKeySize(key); err != nil {
+ return 0, err
+ }
+
+ t := db.setBatch
+ t.Lock()
+ defer t.Unlock()
+
+ n, err := db.rmExpire(t, SetType, key)
+ if err != nil {
+ return 0, err
+ }
+ err = t.Commit()
+ return n, err
+}
+
+func (db *DB) SScan(key []byte, count int, inclusive bool, match string) ([][]byte, error) {
+ return db.scan(SSizeType, key, count, inclusive, match)
+}
diff --git a/vendor/github.com/lunny/nodb/t_ttl.go b/vendor/github.com/lunny/nodb/t_ttl.go
new file mode 100644
index 0000000000..5c3638891c
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/t_ttl.go
@@ -0,0 +1,195 @@
+package nodb
+
+import (
+ "encoding/binary"
+ "errors"
+ "time"
+
+ "github.com/lunny/nodb/store"
+)
+
+var (
+ errExpMetaKey = errors.New("invalid expire meta key")
+ errExpTimeKey = errors.New("invalid expire time key")
+)
+
+type retireCallback func(*batch, []byte) int64
+
+type elimination struct {
+ db *DB
+ exp2Tx []*batch
+ exp2Retire []retireCallback
+}
+
+var errExpType = errors.New("invalid expire type")
+
+func (db *DB) expEncodeTimeKey(dataType byte, key []byte, when int64) []byte {
+ buf := make([]byte, len(key)+11)
+
+ buf[0] = db.index
+ buf[1] = ExpTimeType
+ buf[2] = dataType
+ pos := 3
+
+ binary.BigEndian.PutUint64(buf[pos:], uint64(when))
+ pos += 8
+
+ copy(buf[pos:], key)
+
+ return buf
+}
+
+func (db *DB) expEncodeMetaKey(dataType byte, key []byte) []byte {
+ buf := make([]byte, len(key)+3)
+
+ buf[0] = db.index
+ buf[1] = ExpMetaType
+ buf[2] = dataType
+ pos := 3
+
+ copy(buf[pos:], key)
+
+ return buf
+}
+
+func (db *DB) expDecodeMetaKey(mk []byte) (byte, []byte, error) {
+ if len(mk) <= 3 || mk[0] != db.index || mk[1] != ExpMetaType {
+ return 0, nil, errExpMetaKey
+ }
+
+ return mk[2], mk[3:], nil
+}
+
+func (db *DB) expDecodeTimeKey(tk []byte) (byte, []byte, int64, error) {
+ if len(tk) < 11 || tk[0] != db.index || tk[1] != ExpTimeType {
+ return 0, nil, 0, errExpTimeKey
+ }
+
+ return tk[2], tk[11:], int64(binary.BigEndian.Uint64(tk[3:])), nil
+}
+
+func (db *DB) expire(t *batch, dataType byte, key []byte, duration int64) {
+ db.expireAt(t, dataType, key, time.Now().Unix()+duration)
+}
+
+func (db *DB) expireAt(t *batch, dataType byte, key []byte, when int64) {
+ mk := db.expEncodeMetaKey(dataType, key)
+ tk := db.expEncodeTimeKey(dataType, key, when)
+
+ t.Put(tk, mk)
+ t.Put(mk, PutInt64(when))
+}
+
+func (db *DB) ttl(dataType byte, key []byte) (t int64, err error) {
+ mk := db.expEncodeMetaKey(dataType, key)
+
+ if t, err = Int64(db.bucket.Get(mk)); err != nil || t == 0 {
+ t = -1
+ } else {
+ t -= time.Now().Unix()
+ if t <= 0 {
+ t = -1
+ }
+ // if t == -1 : to remove ????
+ }
+
+ return t, err
+}
+
+func (db *DB) rmExpire(t *batch, dataType byte, key []byte) (int64, error) {
+ mk := db.expEncodeMetaKey(dataType, key)
+ if v, err := db.bucket.Get(mk); err != nil {
+ return 0, err
+ } else if v == nil {
+ return 0, nil
+ } else if when, err2 := Int64(v, nil); err2 != nil {
+ return 0, err2
+ } else {
+ tk := db.expEncodeTimeKey(dataType, key, when)
+ t.Delete(mk)
+ t.Delete(tk)
+ return 1, nil
+ }
+}
+
+func (db *DB) expFlush(t *batch, dataType byte) (err error) {
+ minKey := make([]byte, 3)
+ minKey[0] = db.index
+ minKey[1] = ExpTimeType
+ minKey[2] = dataType
+
+ maxKey := make([]byte, 3)
+ maxKey[0] = db.index
+ maxKey[1] = ExpMetaType
+ maxKey[2] = dataType + 1
+
+ _, err = db.flushRegion(t, minKey, maxKey)
+ err = t.Commit()
+ return
+}
+
+//////////////////////////////////////////////////////////
+//
+//////////////////////////////////////////////////////////
+
+func newEliminator(db *DB) *elimination {
+ eli := new(elimination)
+ eli.db = db
+ eli.exp2Tx = make([]*batch, maxDataType)
+ eli.exp2Retire = make([]retireCallback, maxDataType)
+ return eli
+}
+
+func (eli *elimination) regRetireContext(dataType byte, t *batch, onRetire retireCallback) {
+
+ // todo .. need to ensure exist - mapExpMetaType[expType]
+
+ eli.exp2Tx[dataType] = t
+ eli.exp2Retire[dataType] = onRetire
+}
+
+// call by outside ... (from *db to another *db)
+func (eli *elimination) active() {
+ now := time.Now().Unix()
+ db := eli.db
+ dbGet := db.bucket.Get
+
+ minKey := db.expEncodeTimeKey(NoneType, nil, 0)
+ maxKey := db.expEncodeTimeKey(maxDataType, nil, now)
+
+ it := db.bucket.RangeLimitIterator(minKey, maxKey, store.RangeROpen, 0, -1)
+ for ; it.Valid(); it.Next() {
+ tk := it.RawKey()
+ mk := it.RawValue()
+
+ dt, k, _, err := db.expDecodeTimeKey(tk)
+ if err != nil {
+ continue
+ }
+
+ t := eli.exp2Tx[dt]
+ onRetire := eli.exp2Retire[dt]
+ if tk == nil || onRetire == nil {
+ continue
+ }
+
+ t.Lock()
+
+ if exp, err := Int64(dbGet(mk)); err == nil {
+ // check expire again
+ if exp <= now {
+ onRetire(t, k)
+ t.Delete(tk)
+ t.Delete(mk)
+
+ t.Commit()
+ }
+
+ }
+
+ t.Unlock()
+ }
+ it.Close()
+
+ return
+}
diff --git a/vendor/github.com/lunny/nodb/t_zset.go b/vendor/github.com/lunny/nodb/t_zset.go
new file mode 100644
index 0000000000..d0ffb7ccf3
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/t_zset.go
@@ -0,0 +1,943 @@
+package nodb
+
+import (
+ "bytes"
+ "encoding/binary"
+ "errors"
+ "time"
+
+ "github.com/lunny/nodb/store"
+)
+
+const (
+ MinScore int64 = -1<<63 + 1
+ MaxScore int64 = 1<<63 - 1
+ InvalidScore int64 = -1 << 63
+
+ AggregateSum byte = 0
+ AggregateMin byte = 1
+ AggregateMax byte = 2
+)
+
+type ScorePair struct {
+ Score int64
+ Member []byte
+}
+
+var errZSizeKey = errors.New("invalid zsize key")
+var errZSetKey = errors.New("invalid zset key")
+var errZScoreKey = errors.New("invalid zscore key")
+var errScoreOverflow = errors.New("zset score overflow")
+var errInvalidAggregate = errors.New("invalid aggregate")
+var errInvalidWeightNum = errors.New("invalid weight number")
+var errInvalidSrcKeyNum = errors.New("invalid src key number")
+
+const (
+ zsetNScoreSep byte = '<'
+ zsetPScoreSep byte = zsetNScoreSep + 1
+ zsetStopScoreSep byte = zsetPScoreSep + 1
+
+ zsetStartMemSep byte = ':'
+ zsetStopMemSep byte = zsetStartMemSep + 1
+)
+
+func checkZSetKMSize(key []byte, member []byte) error {
+ if len(key) > MaxKeySize || len(key) == 0 {
+ return errKeySize
+ } else if len(member) > MaxZSetMemberSize || len(member) == 0 {
+ return errZSetMemberSize
+ }
+ return nil
+}
+
+func (db *DB) zEncodeSizeKey(key []byte) []byte {
+ buf := make([]byte, len(key)+2)
+ buf[0] = db.index
+ buf[1] = ZSizeType
+
+ copy(buf[2:], key)
+ return buf
+}
+
+func (db *DB) zDecodeSizeKey(ek []byte) ([]byte, error) {
+ if len(ek) < 2 || ek[0] != db.index || ek[1] != ZSizeType {
+ return nil, errZSizeKey
+ }
+
+ return ek[2:], nil
+}
+
+func (db *DB) zEncodeSetKey(key []byte, member []byte) []byte {
+ buf := make([]byte, len(key)+len(member)+5)
+
+ pos := 0
+ buf[pos] = db.index
+ pos++
+
+ buf[pos] = ZSetType
+ pos++
+
+ binary.BigEndian.PutUint16(buf[pos:], uint16(len(key)))
+ pos += 2
+
+ copy(buf[pos:], key)
+ pos += len(key)
+
+ buf[pos] = zsetStartMemSep
+ pos++
+
+ copy(buf[pos:], member)
+
+ return buf
+}
+
+func (db *DB) zDecodeSetKey(ek []byte) ([]byte, []byte, error) {
+ if len(ek) < 5 || ek[0] != db.index || ek[1] != ZSetType {
+ return nil, nil, errZSetKey
+ }
+
+ keyLen := int(binary.BigEndian.Uint16(ek[2:]))
+ if keyLen+5 > len(ek) {
+ return nil, nil, errZSetKey
+ }
+
+ key := ek[4 : 4+keyLen]
+
+ if ek[4+keyLen] != zsetStartMemSep {
+ return nil, nil, errZSetKey
+ }
+
+ member := ek[5+keyLen:]
+ return key, member, nil
+}
+
+func (db *DB) zEncodeStartSetKey(key []byte) []byte {
+ k := db.zEncodeSetKey(key, nil)
+ return k
+}
+
+func (db *DB) zEncodeStopSetKey(key []byte) []byte {
+ k := db.zEncodeSetKey(key, nil)
+ k[len(k)-1] = zsetStartMemSep + 1
+ return k
+}
+
+func (db *DB) zEncodeScoreKey(key []byte, member []byte, score int64) []byte {
+ buf := make([]byte, len(key)+len(member)+14)
+
+ pos := 0
+ buf[pos] = db.index
+ pos++
+
+ buf[pos] = ZScoreType
+ pos++
+
+ binary.BigEndian.PutUint16(buf[pos:], uint16(len(key)))
+ pos += 2
+
+ copy(buf[pos:], key)
+ pos += len(key)
+
+ if score < 0 {
+ buf[pos] = zsetNScoreSep
+ } else {
+ buf[pos] = zsetPScoreSep
+ }
+
+ pos++
+ binary.BigEndian.PutUint64(buf[pos:], uint64(score))
+ pos += 8
+
+ buf[pos] = zsetStartMemSep
+ pos++
+
+ copy(buf[pos:], member)
+ return buf
+}
+
+func (db *DB) zEncodeStartScoreKey(key []byte, score int64) []byte {
+ return db.zEncodeScoreKey(key, nil, score)
+}
+
+func (db *DB) zEncodeStopScoreKey(key []byte, score int64) []byte {
+ k := db.zEncodeScoreKey(key, nil, score)
+ k[len(k)-1] = zsetStopMemSep
+ return k
+}
+
+func (db *DB) zDecodeScoreKey(ek []byte) (key []byte, member []byte, score int64, err error) {
+ if len(ek) < 14 || ek[0] != db.index || ek[1] != ZScoreType {
+ err = errZScoreKey
+ return
+ }
+
+ keyLen := int(binary.BigEndian.Uint16(ek[2:]))
+ if keyLen+14 > len(ek) {
+ err = errZScoreKey
+ return
+ }
+
+ key = ek[4 : 4+keyLen]
+ pos := 4 + keyLen
+
+ if (ek[pos] != zsetNScoreSep) && (ek[pos] != zsetPScoreSep) {
+ err = errZScoreKey
+ return
+ }
+ pos++
+
+ score = int64(binary.BigEndian.Uint64(ek[pos:]))
+ pos += 8
+
+ if ek[pos] != zsetStartMemSep {
+ err = errZScoreKey
+ return
+ }
+
+ pos++
+
+ member = ek[pos:]
+ return
+}
+
+func (db *DB) zSetItem(t *batch, key []byte, score int64, member []byte) (int64, error) {
+ if score <= MinScore || score >= MaxScore {
+ return 0, errScoreOverflow
+ }
+
+ var exists int64 = 0
+ ek := db.zEncodeSetKey(key, member)
+
+ if v, err := db.bucket.Get(ek); err != nil {
+ return 0, err
+ } else if v != nil {
+ exists = 1
+
+ if s, err := Int64(v, err); err != nil {
+ return 0, err
+ } else {
+ sk := db.zEncodeScoreKey(key, member, s)
+ t.Delete(sk)
+ }
+ }
+
+ t.Put(ek, PutInt64(score))
+
+ sk := db.zEncodeScoreKey(key, member, score)
+ t.Put(sk, []byte{})
+
+ return exists, nil
+}
+
+func (db *DB) zDelItem(t *batch, key []byte, member []byte, skipDelScore bool) (int64, error) {
+ ek := db.zEncodeSetKey(key, member)
+ if v, err := db.bucket.Get(ek); err != nil {
+ return 0, err
+ } else if v == nil {
+ //not exists
+ return 0, nil
+ } else {
+ //exists
+ if !skipDelScore {
+ //we must del score
+ if s, err := Int64(v, err); err != nil {
+ return 0, err
+ } else {
+ sk := db.zEncodeScoreKey(key, member, s)
+ t.Delete(sk)
+ }
+ }
+ }
+
+ t.Delete(ek)
+
+ return 1, nil
+}
+
+func (db *DB) zDelete(t *batch, key []byte) int64 {
+ delMembCnt, _ := db.zRemRange(t, key, MinScore, MaxScore, 0, -1)
+ // todo : log err
+ return delMembCnt
+}
+
+func (db *DB) zExpireAt(key []byte, when int64) (int64, error) {
+ t := db.zsetBatch
+ t.Lock()
+ defer t.Unlock()
+
+ if zcnt, err := db.ZCard(key); err != nil || zcnt == 0 {
+ return 0, err
+ } else {
+ db.expireAt(t, ZSetType, key, when)
+ if err := t.Commit(); err != nil {
+ return 0, err
+ }
+ }
+ return 1, nil
+}
+
+func (db *DB) ZAdd(key []byte, args ...ScorePair) (int64, error) {
+ if len(args) == 0 {
+ return 0, nil
+ }
+
+ t := db.zsetBatch
+ t.Lock()
+ defer t.Unlock()
+
+ var num int64 = 0
+ for i := 0; i < len(args); i++ {
+ score := args[i].Score
+ member := args[i].Member
+
+ if err := checkZSetKMSize(key, member); err != nil {
+ return 0, err
+ }
+
+ if n, err := db.zSetItem(t, key, score, member); err != nil {
+ return 0, err
+ } else if n == 0 {
+ //add new
+ num++
+ }
+ }
+
+ if _, err := db.zIncrSize(t, key, num); err != nil {
+ return 0, err
+ }
+
+ //todo add binlog
+ err := t.Commit()
+ return num, err
+}
+
+func (db *DB) zIncrSize(t *batch, key []byte, delta int64) (int64, error) {
+ sk := db.zEncodeSizeKey(key)
+
+ size, err := Int64(db.bucket.Get(sk))
+ if err != nil {
+ return 0, err
+ } else {
+ size += delta
+ if size <= 0 {
+ size = 0
+ t.Delete(sk)
+ db.rmExpire(t, ZSetType, key)
+ } else {
+ t.Put(sk, PutInt64(size))
+ }
+ }
+
+ return size, nil
+}
+
+func (db *DB) ZCard(key []byte) (int64, error) {
+ if err := checkKeySize(key); err != nil {
+ return 0, err
+ }
+
+ sk := db.zEncodeSizeKey(key)
+ return Int64(db.bucket.Get(sk))
+}
+
+func (db *DB) ZScore(key []byte, member []byte) (int64, error) {
+ if err := checkZSetKMSize(key, member); err != nil {
+ return InvalidScore, err
+ }
+
+ var score int64 = InvalidScore
+
+ k := db.zEncodeSetKey(key, member)
+ if v, err := db.bucket.Get(k); err != nil {
+ return InvalidScore, err
+ } else if v == nil {
+ return InvalidScore, ErrScoreMiss
+ } else {
+ if score, err = Int64(v, nil); err != nil {
+ return InvalidScore, err
+ }
+ }
+
+ return score, nil
+}
+
+func (db *DB) ZRem(key []byte, members ...[]byte) (int64, error) {
+ if len(members) == 0 {
+ return 0, nil
+ }
+
+ t := db.zsetBatch
+ t.Lock()
+ defer t.Unlock()
+
+ var num int64 = 0
+ for i := 0; i < len(members); i++ {
+ if err := checkZSetKMSize(key, members[i]); err != nil {
+ return 0, err
+ }
+
+ if n, err := db.zDelItem(t, key, members[i], false); err != nil {
+ return 0, err
+ } else if n == 1 {
+ num++
+ }
+ }
+
+ if _, err := db.zIncrSize(t, key, -num); err != nil {
+ return 0, err
+ }
+
+ err := t.Commit()
+ return num, err
+}
+
+func (db *DB) ZIncrBy(key []byte, delta int64, member []byte) (int64, error) {
+ if err := checkZSetKMSize(key, member); err != nil {
+ return InvalidScore, err
+ }
+
+ t := db.zsetBatch
+ t.Lock()
+ defer t.Unlock()
+
+ ek := db.zEncodeSetKey(key, member)
+
+ var oldScore int64 = 0
+ v, err := db.bucket.Get(ek)
+ if err != nil {
+ return InvalidScore, err
+ } else if v == nil {
+ db.zIncrSize(t, key, 1)
+ } else {
+ if oldScore, err = Int64(v, err); err != nil {
+ return InvalidScore, err
+ }
+ }
+
+ newScore := oldScore + delta
+ if newScore >= MaxScore || newScore <= MinScore {
+ return InvalidScore, errScoreOverflow
+ }
+
+ sk := db.zEncodeScoreKey(key, member, newScore)
+ t.Put(sk, []byte{})
+ t.Put(ek, PutInt64(newScore))
+
+ if v != nil {
+ // so as to update score, we must delete the old one
+ oldSk := db.zEncodeScoreKey(key, member, oldScore)
+ t.Delete(oldSk)
+ }
+
+ err = t.Commit()
+ return newScore, err
+}
+
+func (db *DB) ZCount(key []byte, min int64, max int64) (int64, error) {
+ if err := checkKeySize(key); err != nil {
+ return 0, err
+ }
+ minKey := db.zEncodeStartScoreKey(key, min)
+ maxKey := db.zEncodeStopScoreKey(key, max)
+
+ rangeType := store.RangeROpen
+
+ it := db.bucket.RangeLimitIterator(minKey, maxKey, rangeType, 0, -1)
+ var n int64 = 0
+ for ; it.Valid(); it.Next() {
+ n++
+ }
+ it.Close()
+
+ return n, nil
+}
+
+func (db *DB) zrank(key []byte, member []byte, reverse bool) (int64, error) {
+ if err := checkZSetKMSize(key, member); err != nil {
+ return 0, err
+ }
+
+ k := db.zEncodeSetKey(key, member)
+
+ it := db.bucket.NewIterator()
+ defer it.Close()
+
+ if v := it.Find(k); v == nil {
+ return -1, nil
+ } else {
+ if s, err := Int64(v, nil); err != nil {
+ return 0, err
+ } else {
+ var rit *store.RangeLimitIterator
+
+ sk := db.zEncodeScoreKey(key, member, s)
+
+ if !reverse {
+ minKey := db.zEncodeStartScoreKey(key, MinScore)
+
+ rit = store.NewRangeIterator(it, &store.Range{minKey, sk, store.RangeClose})
+ } else {
+ maxKey := db.zEncodeStopScoreKey(key, MaxScore)
+ rit = store.NewRevRangeIterator(it, &store.Range{sk, maxKey, store.RangeClose})
+ }
+
+ var lastKey []byte = nil
+ var n int64 = 0
+
+ for ; rit.Valid(); rit.Next() {
+ n++
+
+ lastKey = rit.BufKey(lastKey)
+ }
+
+ if _, m, _, err := db.zDecodeScoreKey(lastKey); err == nil && bytes.Equal(m, member) {
+ n--
+ return n, nil
+ }
+ }
+ }
+
+ return -1, nil
+}
+
+func (db *DB) zIterator(key []byte, min int64, max int64, offset int, count int, reverse bool) *store.RangeLimitIterator {
+ minKey := db.zEncodeStartScoreKey(key, min)
+ maxKey := db.zEncodeStopScoreKey(key, max)
+
+ if !reverse {
+ return db.bucket.RangeLimitIterator(minKey, maxKey, store.RangeClose, offset, count)
+ } else {
+ return db.bucket.RevRangeLimitIterator(minKey, maxKey, store.RangeClose, offset, count)
+ }
+}
+
+func (db *DB) zRemRange(t *batch, key []byte, min int64, max int64, offset int, count int) (int64, error) {
+ if len(key) > MaxKeySize {
+ return 0, errKeySize
+ }
+
+ it := db.zIterator(key, min, max, offset, count, false)
+ var num int64 = 0
+ for ; it.Valid(); it.Next() {
+ sk := it.RawKey()
+ _, m, _, err := db.zDecodeScoreKey(sk)
+ if err != nil {
+ continue
+ }
+
+ if n, err := db.zDelItem(t, key, m, true); err != nil {
+ return 0, err
+ } else if n == 1 {
+ num++
+ }
+
+ t.Delete(sk)
+ }
+ it.Close()
+
+ if _, err := db.zIncrSize(t, key, -num); err != nil {
+ return 0, err
+ }
+
+ return num, nil
+}
+
+func (db *DB) zRange(key []byte, min int64, max int64, offset int, count int, reverse bool) ([]ScorePair, error) {
+ if len(key) > MaxKeySize {
+ return nil, errKeySize
+ }
+
+ if offset < 0 {
+ return []ScorePair{}, nil
+ }
+
+ nv := 64
+ if count > 0 {
+ nv = count
+ }
+
+ v := make([]ScorePair, 0, nv)
+
+ var it *store.RangeLimitIterator
+
+ //if reverse and offset is 0, count < 0, we may use forward iterator then reverse
+ //because store iterator prev is slower than next
+ if !reverse || (offset == 0 && count < 0) {
+ it = db.zIterator(key, min, max, offset, count, false)
+ } else {
+ it = db.zIterator(key, min, max, offset, count, true)
+ }
+
+ for ; it.Valid(); it.Next() {
+ _, m, s, err := db.zDecodeScoreKey(it.Key())
+ //may be we will check key equal?
+ if err != nil {
+ continue
+ }
+
+ v = append(v, ScorePair{Member: m, Score: s})
+ }
+ it.Close()
+
+ if reverse && (offset == 0 && count < 0) {
+ for i, j := 0, len(v)-1; i < j; i, j = i+1, j-1 {
+ v[i], v[j] = v[j], v[i]
+ }
+ }
+
+ return v, nil
+}
+
+func (db *DB) zParseLimit(key []byte, start int, stop int) (offset int, count int, err error) {
+ if start < 0 || stop < 0 {
+ //refer redis implementation
+ var size int64
+ size, err = db.ZCard(key)
+ if err != nil {
+ return
+ }
+
+ llen := int(size)
+
+ if start < 0 {
+ start = llen + start
+ }
+ if stop < 0 {
+ stop = llen + stop
+ }
+
+ if start < 0 {
+ start = 0
+ }
+
+ if start >= llen {
+ offset = -1
+ return
+ }
+ }
+
+ if start > stop {
+ offset = -1
+ return
+ }
+
+ offset = start
+ count = (stop - start) + 1
+ return
+}
+
+func (db *DB) ZClear(key []byte) (int64, error) {
+ t := db.zsetBatch
+ t.Lock()
+ defer t.Unlock()
+
+ rmCnt, err := db.zRemRange(t, key, MinScore, MaxScore, 0, -1)
+ if err == nil {
+ err = t.Commit()
+ }
+
+ return rmCnt, err
+}
+
+func (db *DB) ZMclear(keys ...[]byte) (int64, error) {
+ t := db.zsetBatch
+ t.Lock()
+ defer t.Unlock()
+
+ for _, key := range keys {
+ if _, err := db.zRemRange(t, key, MinScore, MaxScore, 0, -1); err != nil {
+ return 0, err
+ }
+ }
+
+ err := t.Commit()
+
+ return int64(len(keys)), err
+}
+
+func (db *DB) ZRange(key []byte, start int, stop int) ([]ScorePair, error) {
+ return db.ZRangeGeneric(key, start, stop, false)
+}
+
+//min and max must be inclusive
+//if no limit, set offset = 0 and count = -1
+func (db *DB) ZRangeByScore(key []byte, min int64, max int64,
+ offset int, count int) ([]ScorePair, error) {
+ return db.ZRangeByScoreGeneric(key, min, max, offset, count, false)
+}
+
+func (db *DB) ZRank(key []byte, member []byte) (int64, error) {
+ return db.zrank(key, member, false)
+}
+
+func (db *DB) ZRemRangeByRank(key []byte, start int, stop int) (int64, error) {
+ offset, count, err := db.zParseLimit(key, start, stop)
+ if err != nil {
+ return 0, err
+ }
+
+ var rmCnt int64
+
+ t := db.zsetBatch
+ t.Lock()
+ defer t.Unlock()
+
+ rmCnt, err = db.zRemRange(t, key, MinScore, MaxScore, offset, count)
+ if err == nil {
+ err = t.Commit()
+ }
+
+ return rmCnt, err
+}
+
+//min and max must be inclusive
+func (db *DB) ZRemRangeByScore(key []byte, min int64, max int64) (int64, error) {
+ t := db.zsetBatch
+ t.Lock()
+ defer t.Unlock()
+
+ rmCnt, err := db.zRemRange(t, key, min, max, 0, -1)
+ if err == nil {
+ err = t.Commit()
+ }
+
+ return rmCnt, err
+}
+
+func (db *DB) ZRevRange(key []byte, start int, stop int) ([]ScorePair, error) {
+ return db.ZRangeGeneric(key, start, stop, true)
+}
+
+func (db *DB) ZRevRank(key []byte, member []byte) (int64, error) {
+ return db.zrank(key, member, true)
+}
+
+//min and max must be inclusive
+//if no limit, set offset = 0 and count = -1
+func (db *DB) ZRevRangeByScore(key []byte, min int64, max int64, offset int, count int) ([]ScorePair, error) {
+ return db.ZRangeByScoreGeneric(key, min, max, offset, count, true)
+}
+
+func (db *DB) ZRangeGeneric(key []byte, start int, stop int, reverse bool) ([]ScorePair, error) {
+ offset, count, err := db.zParseLimit(key, start, stop)
+ if err != nil {
+ return nil, err
+ }
+
+ return db.zRange(key, MinScore, MaxScore, offset, count, reverse)
+}
+
+//min and max must be inclusive
+//if no limit, set offset = 0 and count = -1
+func (db *DB) ZRangeByScoreGeneric(key []byte, min int64, max int64,
+ offset int, count int, reverse bool) ([]ScorePair, error) {
+
+ return db.zRange(key, min, max, offset, count, reverse)
+}
+
+func (db *DB) zFlush() (drop int64, err error) {
+ t := db.zsetBatch
+ t.Lock()
+ defer t.Unlock()
+ return db.flushType(t, ZSetType)
+}
+
+func (db *DB) ZExpire(key []byte, duration int64) (int64, error) {
+ if duration <= 0 {
+ return 0, errExpireValue
+ }
+
+ return db.zExpireAt(key, time.Now().Unix()+duration)
+}
+
+func (db *DB) ZExpireAt(key []byte, when int64) (int64, error) {
+ if when <= time.Now().Unix() {
+ return 0, errExpireValue
+ }
+
+ return db.zExpireAt(key, when)
+}
+
+func (db *DB) ZTTL(key []byte) (int64, error) {
+ if err := checkKeySize(key); err != nil {
+ return -1, err
+ }
+
+ return db.ttl(ZSetType, key)
+}
+
+func (db *DB) ZPersist(key []byte) (int64, error) {
+ if err := checkKeySize(key); err != nil {
+ return 0, err
+ }
+
+ t := db.zsetBatch
+ t.Lock()
+ defer t.Unlock()
+
+ n, err := db.rmExpire(t, ZSetType, key)
+ if err != nil {
+ return 0, err
+ }
+
+ err = t.Commit()
+ return n, err
+}
+
+func getAggregateFunc(aggregate byte) func(int64, int64) int64 {
+ switch aggregate {
+ case AggregateSum:
+ return func(a int64, b int64) int64 {
+ return a + b
+ }
+ case AggregateMax:
+ return func(a int64, b int64) int64 {
+ if a > b {
+ return a
+ }
+ return b
+ }
+ case AggregateMin:
+ return func(a int64, b int64) int64 {
+ if a > b {
+ return b
+ }
+ return a
+ }
+ }
+ return nil
+}
+
+func (db *DB) ZUnionStore(destKey []byte, srcKeys [][]byte, weights []int64, aggregate byte) (int64, error) {
+
+ var destMap = map[string]int64{}
+ aggregateFunc := getAggregateFunc(aggregate)
+ if aggregateFunc == nil {
+ return 0, errInvalidAggregate
+ }
+ if len(srcKeys) < 1 {
+ return 0, errInvalidSrcKeyNum
+ }
+ if weights != nil {
+ if len(srcKeys) != len(weights) {
+ return 0, errInvalidWeightNum
+ }
+ } else {
+ weights = make([]int64, len(srcKeys))
+ for i := 0; i < len(weights); i++ {
+ weights[i] = 1
+ }
+ }
+
+ for i, key := range srcKeys {
+ scorePairs, err := db.ZRange(key, 0, -1)
+ if err != nil {
+ return 0, err
+ }
+ for _, pair := range scorePairs {
+ if score, ok := destMap[String(pair.Member)]; !ok {
+ destMap[String(pair.Member)] = pair.Score * weights[i]
+ } else {
+ destMap[String(pair.Member)] = aggregateFunc(score, pair.Score*weights[i])
+ }
+ }
+ }
+
+ t := db.zsetBatch
+ t.Lock()
+ defer t.Unlock()
+
+ db.zDelete(t, destKey)
+
+ for member, score := range destMap {
+ if err := checkZSetKMSize(destKey, []byte(member)); err != nil {
+ return 0, err
+ }
+
+ if _, err := db.zSetItem(t, destKey, score, []byte(member)); err != nil {
+ return 0, err
+ }
+ }
+
+ var num = int64(len(destMap))
+ sk := db.zEncodeSizeKey(destKey)
+ t.Put(sk, PutInt64(num))
+
+ //todo add binlog
+ if err := t.Commit(); err != nil {
+ return 0, err
+ }
+ return num, nil
+}
+
+func (db *DB) ZInterStore(destKey []byte, srcKeys [][]byte, weights []int64, aggregate byte) (int64, error) {
+
+ aggregateFunc := getAggregateFunc(aggregate)
+ if aggregateFunc == nil {
+ return 0, errInvalidAggregate
+ }
+ if len(srcKeys) < 1 {
+ return 0, errInvalidSrcKeyNum
+ }
+ if weights != nil {
+ if len(srcKeys) != len(weights) {
+ return 0, errInvalidWeightNum
+ }
+ } else {
+ weights = make([]int64, len(srcKeys))
+ for i := 0; i < len(weights); i++ {
+ weights[i] = 1
+ }
+ }
+
+ var destMap = map[string]int64{}
+ scorePairs, err := db.ZRange(srcKeys[0], 0, -1)
+ if err != nil {
+ return 0, err
+ }
+ for _, pair := range scorePairs {
+ destMap[String(pair.Member)] = pair.Score * weights[0]
+ }
+
+ for i, key := range srcKeys[1:] {
+ scorePairs, err := db.ZRange(key, 0, -1)
+ if err != nil {
+ return 0, err
+ }
+ tmpMap := map[string]int64{}
+ for _, pair := range scorePairs {
+ if score, ok := destMap[String(pair.Member)]; ok {
+ tmpMap[String(pair.Member)] = aggregateFunc(score, pair.Score*weights[i+1])
+ }
+ }
+ destMap = tmpMap
+ }
+
+ t := db.zsetBatch
+ t.Lock()
+ defer t.Unlock()
+
+ db.zDelete(t, destKey)
+
+ for member, score := range destMap {
+ if err := checkZSetKMSize(destKey, []byte(member)); err != nil {
+ return 0, err
+ }
+ if _, err := db.zSetItem(t, destKey, score, []byte(member)); err != nil {
+ return 0, err
+ }
+ }
+
+ var num int64 = int64(len(destMap))
+ sk := db.zEncodeSizeKey(destKey)
+ t.Put(sk, PutInt64(num))
+ //todo add binlog
+ if err := t.Commit(); err != nil {
+ return 0, err
+ }
+ return num, nil
+}
+
+func (db *DB) ZScan(key []byte, count int, inclusive bool, match string) ([][]byte, error) {
+ return db.scan(ZSizeType, key, count, inclusive, match)
+}
diff --git a/vendor/github.com/lunny/nodb/tx.go b/vendor/github.com/lunny/nodb/tx.go
new file mode 100644
index 0000000000..5ce99db57a
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/tx.go
@@ -0,0 +1,113 @@
+package nodb
+
+import (
+ "errors"
+ "fmt"
+
+ "github.com/lunny/nodb/store"
+)
+
+var (
+ ErrNestTx = errors.New("nest transaction not supported")
+ ErrTxDone = errors.New("Transaction has already been committed or rolled back")
+)
+
+type Tx struct {
+ *DB
+
+ tx *store.Tx
+
+ logs [][]byte
+}
+
+func (db *DB) IsTransaction() bool {
+ return db.status == DBInTransaction
+}
+
+// Begin a transaction, it will block all other write operations before calling Commit or Rollback.
+// You must be very careful to prevent long-time transaction.
+func (db *DB) Begin() (*Tx, error) {
+ if db.IsTransaction() {
+ return nil, ErrNestTx
+ }
+
+ tx := new(Tx)
+
+ tx.DB = new(DB)
+ tx.DB.l = db.l
+
+ tx.l.wLock.Lock()
+
+ tx.DB.sdb = db.sdb
+
+ var err error
+ tx.tx, err = db.sdb.Begin()
+ if err != nil {
+ tx.l.wLock.Unlock()
+ return nil, err
+ }
+
+ tx.DB.bucket = tx.tx
+
+ tx.DB.status = DBInTransaction
+
+ tx.DB.index = db.index
+
+ tx.DB.kvBatch = tx.newBatch()
+ tx.DB.listBatch = tx.newBatch()
+ tx.DB.hashBatch = tx.newBatch()
+ tx.DB.zsetBatch = tx.newBatch()
+ tx.DB.binBatch = tx.newBatch()
+ tx.DB.setBatch = tx.newBatch()
+
+ return tx, nil
+}
+
+func (tx *Tx) Commit() error {
+ if tx.tx == nil {
+ return ErrTxDone
+ }
+
+ tx.l.commitLock.Lock()
+ err := tx.tx.Commit()
+ tx.tx = nil
+
+ if len(tx.logs) > 0 {
+ tx.l.binlog.Log(tx.logs...)
+ }
+
+ tx.l.commitLock.Unlock()
+
+ tx.l.wLock.Unlock()
+
+ tx.DB.bucket = nil
+
+ return err
+}
+
+func (tx *Tx) Rollback() error {
+ if tx.tx == nil {
+ return ErrTxDone
+ }
+
+ err := tx.tx.Rollback()
+ tx.tx = nil
+
+ tx.l.wLock.Unlock()
+ tx.DB.bucket = nil
+
+ return err
+}
+
+func (tx *Tx) newBatch() *batch {
+ return tx.l.newBatch(tx.tx.NewWriteBatch(), &txBatchLocker{}, tx)
+}
+
+func (tx *Tx) Select(index int) error {
+ if index < 0 || index >= int(MaxDBNumber) {
+ return fmt.Errorf("invalid db index %d", index)
+ }
+
+ tx.DB.index = uint8(index)
+ return nil
+}
diff --git a/vendor/github.com/lunny/nodb/util.go b/vendor/github.com/lunny/nodb/util.go
new file mode 100644
index 0000000000..d5949a96e6
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/util.go
@@ -0,0 +1,113 @@
+package nodb
+
+import (
+ "encoding/binary"
+ "errors"
+ "reflect"
+ "strconv"
+ "unsafe"
+)
+
+var errIntNumber = errors.New("invalid integer")
+
+// no copy to change slice to string
+// use your own risk
+func String(b []byte) (s string) {
+ pbytes := (*reflect.SliceHeader)(unsafe.Pointer(&b))
+ pstring := (*reflect.StringHeader)(unsafe.Pointer(&s))
+ pstring.Data = pbytes.Data
+ pstring.Len = pbytes.Len
+ return
+}
+
+// no copy to change string to slice
+// use your own risk
+func Slice(s string) (b []byte) {
+ pbytes := (*reflect.SliceHeader)(unsafe.Pointer(&b))
+ pstring := (*reflect.StringHeader)(unsafe.Pointer(&s))
+ pbytes.Data = pstring.Data
+ pbytes.Len = pstring.Len
+ pbytes.Cap = pstring.Len
+ return
+}
+
+func Int64(v []byte, err error) (int64, error) {
+ if err != nil {
+ return 0, err
+ } else if v == nil || len(v) == 0 {
+ return 0, nil
+ } else if len(v) != 8 {
+ return 0, errIntNumber
+ }
+
+ return int64(binary.LittleEndian.Uint64(v)), nil
+}
+
+func PutInt64(v int64) []byte {
+ var b []byte
+ pbytes := (*reflect.SliceHeader)(unsafe.Pointer(&b))
+ pbytes.Data = uintptr(unsafe.Pointer(&v))
+ pbytes.Len = 8
+ pbytes.Cap = 8
+ return b
+}
+
+func StrInt64(v []byte, err error) (int64, error) {
+ if err != nil {
+ return 0, err
+ } else if v == nil {
+ return 0, nil
+ } else {
+ return strconv.ParseInt(String(v), 10, 64)
+ }
+}
+
+func StrInt32(v []byte, err error) (int32, error) {
+ if err != nil {
+ return 0, err
+ } else if v == nil {
+ return 0, nil
+ } else {
+ res, err := strconv.ParseInt(String(v), 10, 32)
+ return int32(res), err
+ }
+}
+
+func StrInt8(v []byte, err error) (int8, error) {
+ if err != nil {
+ return 0, err
+ } else if v == nil {
+ return 0, nil
+ } else {
+ res, err := strconv.ParseInt(String(v), 10, 8)
+ return int8(res), err
+ }
+}
+
+func StrPutInt64(v int64) []byte {
+ return strconv.AppendInt(nil, v, 10)
+}
+
+func MinUInt32(a uint32, b uint32) uint32 {
+ if a > b {
+ return b
+ } else {
+ return a
+ }
+}
+
+func MaxUInt32(a uint32, b uint32) uint32 {
+ if a > b {
+ return a
+ } else {
+ return b
+ }
+}
+
+func MaxInt32(a int32, b int32) int32 {
+ if a > b {
+ return a
+ } else {
+ return b
+ }
+}