diff options
author | techknowlogick <matti@mdranta.net> | 2019-02-05 11:52:51 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-02-05 11:52:51 -0500 |
commit | 9de871a0f8911030f8e06a881803cf722b8798ea (patch) | |
tree | 206400f0a5873d7d078fcdd004956036f07a1db5 /vendor/github.com/lunny | |
parent | bf4badad1d68c18d7ffb92c69e09e4e8aa252935 (diff) | |
download | gitea-9de871a0f8911030f8e06a881803cf722b8798ea.tar.gz gitea-9de871a0f8911030f8e06a881803cf722b8798ea.zip |
add other session providers (#5963)
Diffstat (limited to 'vendor/github.com/lunny')
41 files changed, 8075 insertions, 0 deletions
diff --git a/vendor/github.com/lunny/log/LICENSE b/vendor/github.com/lunny/log/LICENSE new file mode 100644 index 0000000000..c9338f8293 --- /dev/null +++ b/vendor/github.com/lunny/log/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2014 - 2016 lunny +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of the {organization} nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/lunny/log/dbwriter.go b/vendor/github.com/lunny/log/dbwriter.go new file mode 100644 index 0000000000..e8ff00bd89 --- /dev/null +++ b/vendor/github.com/lunny/log/dbwriter.go @@ -0,0 +1,36 @@ +package log + +import ( + "database/sql" + "time" +) + +type DBWriter struct { + db *sql.DB + stmt *sql.Stmt + content chan []byte +} + +func NewDBWriter(db *sql.DB) (*DBWriter, error) { + _, err := db.Exec("CREATE TABLE IF NOT EXISTS log (id int, content text, created datetime)") + if err != nil { + return nil, err + } + stmt, err := db.Prepare("INSERT INTO log (content, created) values (?, ?)") + if err != nil { + return nil, err + } + return &DBWriter{db, stmt, make(chan []byte, 1000)}, nil +} + +func (w *DBWriter) Write(p []byte) (n int, err error) { + _, err = w.stmt.Exec(string(p), time.Now()) + if err == nil { + n = len(p) + } + return +} + +func (w *DBWriter) Close() { + w.stmt.Close() +} diff --git a/vendor/github.com/lunny/log/filewriter.go b/vendor/github.com/lunny/log/filewriter.go new file mode 100644 index 0000000000..f0bb4d1df1 --- /dev/null +++ b/vendor/github.com/lunny/log/filewriter.go @@ -0,0 +1,112 @@ +package log + +import ( + "io" + "os" + "path/filepath" + "sync" + "time" +) + +var _ io.Writer = &Files{} + +type ByType int + +const ( + ByDay ByType = iota + ByHour + ByMonth +) + +var ( + formats = map[ByType]string{ + ByDay: "2006-01-02", + ByHour: "2006-01-02-15", + ByMonth: "2006-01", + } +) + +func SetFileFormat(t ByType, format string) { + formats[t] = format +} + +func (b ByType) Format() string { + return formats[b] +} + +type Files struct { + FileOptions + f *os.File + lastFormat string + lock sync.Mutex +} + +type FileOptions struct { + Dir string + ByType ByType + Loc *time.Location +} + +func prepareFileOption(opts []FileOptions) FileOptions { + var opt FileOptions + if len(opts) > 0 { + opt = opts[0] + } + if opt.Dir == "" { + opt.Dir = "./" + } + err := os.MkdirAll(opt.Dir, os.ModePerm) + if err != nil { + panic(err.Error()) + } + + if opt.Loc == nil { + opt.Loc = time.Local + } + return opt +} + +func NewFileWriter(opts ...FileOptions) *Files { + opt := prepareFileOption(opts) + return &Files{ + FileOptions: opt, + } +} + +func (f *Files) getFile() (*os.File, error) { + var err error + t := time.Now().In(f.Loc) + if f.f == nil { + f.lastFormat = t.Format(f.ByType.Format()) + f.f, err = os.OpenFile(filepath.Join(f.Dir, f.lastFormat+".log"), + os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600) + return f.f, err + } + if f.lastFormat != t.Format(f.ByType.Format()) { + f.f.Close() + f.lastFormat = t.Format(f.ByType.Format()) + f.f, err = os.OpenFile(filepath.Join(f.Dir, f.lastFormat+".log"), + os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600) + return f.f, err + } + return f.f, nil +} + +func (f *Files) Write(bs []byte) (int, error) { + f.lock.Lock() + defer f.lock.Unlock() + + w, err := f.getFile() + if err != nil { + return 0, err + } + return w.Write(bs) +} + +func (f *Files) Close() { + if f.f != nil { + f.f.Close() + f.f = nil + } + f.lastFormat = "" +} diff --git a/vendor/github.com/lunny/log/logext.go b/vendor/github.com/lunny/log/logext.go new file mode 100644 index 0000000000..215c45f309 --- /dev/null +++ b/vendor/github.com/lunny/log/logext.go @@ -0,0 +1,595 @@ +package log + +import ( + "bytes" + "fmt" + "io" + "os" + "runtime" + "strings" + "sync" + "time" +) + +// These flags define which text to prefix to each log entry generated by the Logger. +const ( + // Bits or'ed together to control what's printed. There is no control over the + // order they appear (the order listed here) or the format they present (as + // described in the comments). A colon appears after these items: + // 2009/0123 01:23:23.123123 /a/b/c/d.go:23: message + Ldate = 1 << iota // the date: 2009/0123 + Ltime // the time: 01:23:23 + Lmicroseconds // microsecond resolution: 01:23:23.123123. assumes Ltime. + Llongfile // full file name and line number: /a/b/c/d.go:23 + Lshortfile // final file name element and line number: d.go:23. overrides Llongfile + Lmodule // module name + Llevel // level: 0(Debug), 1(Info), 2(Warn), 3(Error), 4(Panic), 5(Fatal) + Llongcolor // color will start [info] end of line + Lshortcolor // color only include [info] + LstdFlags = Ldate | Ltime // initial values for the standard logger + //Ldefault = Llevel | LstdFlags | Lshortfile | Llongcolor +) // [prefix][time][level][module][shortfile|longfile] + +func Ldefault() int { + if runtime.GOOS == "windows" { + return Llevel | LstdFlags | Lshortfile + } + return Llevel | LstdFlags | Lshortfile | Llongcolor +} + +func Version() string { + return "0.2.0.1121" +} + +const ( + Lall = iota +) +const ( + Ldebug = iota + Linfo + Lwarn + Lerror + Lpanic + Lfatal + Lnone +) + +const ( + ForeBlack = iota + 30 //30 + ForeRed //31 + ForeGreen //32 + ForeYellow //33 + ForeBlue //34 + ForePurple //35 + ForeCyan //36 + ForeWhite //37 +) + +const ( + BackBlack = iota + 40 //40 + BackRed //41 + BackGreen //42 + BackYellow //43 + BackBlue //44 + BackPurple //45 + BackCyan //46 + BackWhite //47 +) + +var levels = []string{ + "[Debug]", + "[Info]", + "[Warn]", + "[Error]", + "[Panic]", + "[Fatal]", +} + +// MUST called before all logs +func SetLevels(lvs []string) { + levels = lvs +} + +var colors = []int{ + ForeCyan, + ForeGreen, + ForeYellow, + ForeRed, + ForePurple, + ForeBlue, +} + +// MUST called before all logs +func SetColors(cls []int) { + colors = cls +} + +// A Logger represents an active logging object that generates lines of +// output to an io.Writer. Each logging operation makes a single call to +// the Writer's Write method. A Logger can be used simultaneously from +// multiple goroutines; it guarantees to serialize access to the Writer. +type Logger struct { + mu sync.Mutex // ensures atomic writes; protects the following fields + prefix string // prefix to write at beginning of each line + flag int // properties + Level int + out io.Writer // destination for output + buf bytes.Buffer // for accumulating text to write + levelStats [6]int64 + loc *time.Location +} + +// New creates a new Logger. The out variable sets the +// destination to which log data will be written. +// The prefix appears at the beginning of each generated log line. +// The flag argument defines the logging properties. +func New(out io.Writer, prefix string, flag int) *Logger { + l := &Logger{out: out, prefix: prefix, Level: 1, flag: flag, loc: time.Local} + if out != os.Stdout { + l.flag = RmColorFlags(l.flag) + } + return l +} + +var Std = New(os.Stderr, "", Ldefault()) + +// Cheap integer to fixed-width decimal ASCII. Give a negative width to avoid zero-padding. +// Knows the buffer has capacity. +func itoa(buf *bytes.Buffer, i int, wid int) { + var u uint = uint(i) + if u == 0 && wid <= 1 { + buf.WriteByte('0') + return + } + + // Assemble decimal in reverse order. + var b [32]byte + bp := len(b) + for ; u > 0 || wid > 0; u /= 10 { + bp-- + wid-- + b[bp] = byte(u%10) + '0' + } + + // avoid slicing b to avoid an allocation. + for bp < len(b) { + buf.WriteByte(b[bp]) + bp++ + } +} + +func moduleOf(file string) string { + pos := strings.LastIndex(file, "/") + if pos != -1 { + pos1 := strings.LastIndex(file[:pos], "/src/") + if pos1 != -1 { + return file[pos1+5 : pos] + } + } + return "UNKNOWN" +} + +func (l *Logger) formatHeader(buf *bytes.Buffer, t time.Time, + file string, line int, lvl int, reqId string) { + if l.prefix != "" { + buf.WriteString(l.prefix) + } + if l.flag&(Ldate|Ltime|Lmicroseconds) != 0 { + if l.flag&Ldate != 0 { + year, month, day := t.Date() + itoa(buf, year, 4) + buf.WriteByte('/') + itoa(buf, int(month), 2) + buf.WriteByte('/') + itoa(buf, day, 2) + buf.WriteByte(' ') + } + if l.flag&(Ltime|Lmicroseconds) != 0 { + hour, min, sec := t.Clock() + itoa(buf, hour, 2) + buf.WriteByte(':') + itoa(buf, min, 2) + buf.WriteByte(':') + itoa(buf, sec, 2) + if l.flag&Lmicroseconds != 0 { + buf.WriteByte('.') + itoa(buf, t.Nanosecond()/1e3, 6) + } + buf.WriteByte(' ') + } + } + if reqId != "" { + buf.WriteByte('[') + buf.WriteString(reqId) + buf.WriteByte(']') + buf.WriteByte(' ') + } + + if l.flag&(Lshortcolor|Llongcolor) != 0 { + buf.WriteString(fmt.Sprintf("\033[1;%dm", colors[lvl])) + } + if l.flag&Llevel != 0 { + buf.WriteString(levels[lvl]) + buf.WriteByte(' ') + } + if l.flag&Lshortcolor != 0 { + buf.WriteString("\033[0m") + } + + if l.flag&Lmodule != 0 { + buf.WriteByte('[') + buf.WriteString(moduleOf(file)) + buf.WriteByte(']') + buf.WriteByte(' ') + } + if l.flag&(Lshortfile|Llongfile) != 0 { + if l.flag&Lshortfile != 0 { + short := file + for i := len(file) - 1; i > 0; i-- { + if file[i] == '/' { + short = file[i+1:] + break + } + } + file = short + } + buf.WriteString(file) + buf.WriteByte(':') + itoa(buf, line, -1) + buf.WriteByte(' ') + } +} + +// Output writes the output for a logging event. The string s contains +// the text to print after the prefix specified by the flags of the +// Logger. A newline is appended if the last character of s is not +// already a newline. Calldepth is used to recover the PC and is +// provided for generality, although at the moment on all pre-defined +// paths it will be 2. +func (l *Logger) Output(reqId string, lvl int, calldepth int, s string) error { + if lvl < l.Level { + return nil + } + now := time.Now().In(l.loc) // get this early. + var file string + var line int + l.mu.Lock() + defer l.mu.Unlock() + if l.flag&(Lshortfile|Llongfile|Lmodule) != 0 { + // release lock while getting caller info - it's expensive. + l.mu.Unlock() + var ok bool + _, file, line, ok = runtime.Caller(calldepth) + if !ok { + file = "???" + line = 0 + } + l.mu.Lock() + } + l.levelStats[lvl]++ + l.buf.Reset() + l.formatHeader(&l.buf, now, file, line, lvl, reqId) + l.buf.WriteString(s) + if l.flag&Llongcolor != 0 { + l.buf.WriteString("\033[0m") + } + if len(s) > 0 && s[len(s)-1] != '\n' { + l.buf.WriteByte('\n') + } + _, err := l.out.Write(l.buf.Bytes()) + return err +} + +// ----------------------------------------- + +// Printf calls l.Output to print to the logger. +// Arguments are handled in the manner of fmt.Printf. +func (l *Logger) Printf(format string, v ...interface{}) { + l.Output("", Linfo, 2, fmt.Sprintf(format, v...)) +} + +// Print calls l.Output to print to the logger. +// Arguments are handled in the manner of fmt.Print. +func (l *Logger) Print(v ...interface{}) { + l.Output("", Linfo, 2, fmt.Sprint(v...)) +} + +// Println calls l.Output to print to the logger. +// Arguments are handled in the manner of fmt.Println. +func (l *Logger) Println(v ...interface{}) { + l.Output("", Linfo, 2, fmt.Sprintln(v...)) +} + +// ----------------------------------------- + +func (l *Logger) Debugf(format string, v ...interface{}) { + l.Output("", Ldebug, 2, fmt.Sprintf(format, v...)) +} + +func (l *Logger) Debug(v ...interface{}) { + l.Output("", Ldebug, 2, fmt.Sprintln(v...)) +} + +// ----------------------------------------- +func (l *Logger) Infof(format string, v ...interface{}) { + l.Output("", Linfo, 2, fmt.Sprintf(format, v...)) +} + +func (l *Logger) Info(v ...interface{}) { + l.Output("", Linfo, 2, fmt.Sprintln(v...)) +} + +// ----------------------------------------- +func (l *Logger) Warnf(format string, v ...interface{}) { + l.Output("", Lwarn, 2, fmt.Sprintf(format, v...)) +} + +func (l *Logger) Warn(v ...interface{}) { + l.Output("", Lwarn, 2, fmt.Sprintln(v...)) +} + +// ----------------------------------------- + +func (l *Logger) Errorf(format string, v ...interface{}) { + l.Output("", Lerror, 2, fmt.Sprintf(format, v...)) +} + +func (l *Logger) Error(v ...interface{}) { + l.Output("", Lerror, 2, fmt.Sprintln(v...)) +} + +// ----------------------------------------- + +func (l *Logger) Fatal(v ...interface{}) { + l.Output("", Lfatal, 2, fmt.Sprintln(v...)) + os.Exit(1) +} + +// Fatalf is equivalent to l.Printf() followed by a call to os.Exit(1). +func (l *Logger) Fatalf(format string, v ...interface{}) { + l.Output("", Lfatal, 2, fmt.Sprintf(format, v...)) + os.Exit(1) +} + +// ----------------------------------------- +// Panic is equivalent to l.Print() followed by a call to panic(). +func (l *Logger) Panic(v ...interface{}) { + s := fmt.Sprintln(v...) + l.Output("", Lpanic, 2, s) + panic(s) +} + +// Panicf is equivalent to l.Printf() followed by a call to panic(). +func (l *Logger) Panicf(format string, v ...interface{}) { + s := fmt.Sprintf(format, v...) + l.Output("", Lpanic, 2, s) + panic(s) +} + +// ----------------------------------------- +func (l *Logger) Stack(v ...interface{}) { + s := fmt.Sprint(v...) + s += "\n" + buf := make([]byte, 1024*1024) + n := runtime.Stack(buf, true) + s += string(buf[:n]) + s += "\n" + l.Output("", Lerror, 2, s) +} + +// ----------------------------------------- +func (l *Logger) Stat() (stats []int64) { + l.mu.Lock() + v := l.levelStats + l.mu.Unlock() + return v[:] +} + +// Flags returns the output flags for the logger. +func (l *Logger) Flags() int { + l.mu.Lock() + defer l.mu.Unlock() + return l.flag +} + +func RmColorFlags(flag int) int { + // for un std out, it should not show color since almost them don't support + if flag&Llongcolor != 0 { + flag = flag ^ Llongcolor + } + if flag&Lshortcolor != 0 { + flag = flag ^ Lshortcolor + } + return flag +} + +func (l *Logger) Location() *time.Location { + return l.loc +} + +func (l *Logger) SetLocation(loc *time.Location) { + l.loc = loc +} + +// SetFlags sets the output flags for the logger. +func (l *Logger) SetFlags(flag int) { + l.mu.Lock() + defer l.mu.Unlock() + if l.out != os.Stdout { + flag = RmColorFlags(flag) + } + l.flag = flag +} + +// Prefix returns the output prefix for the logger. +func (l *Logger) Prefix() string { + l.mu.Lock() + defer l.mu.Unlock() + return l.prefix +} + +// SetPrefix sets the output prefix for the logger. +func (l *Logger) SetPrefix(prefix string) { + l.mu.Lock() + defer l.mu.Unlock() + l.prefix = prefix +} + +// SetOutputLevel sets the output level for the logger. +func (l *Logger) SetOutputLevel(lvl int) { + l.mu.Lock() + defer l.mu.Unlock() + l.Level = lvl +} + +func (l *Logger) OutputLevel() int { + return l.Level +} + +func (l *Logger) SetOutput(w io.Writer) { + l.mu.Lock() + defer l.mu.Unlock() + l.out = w + if w != os.Stdout { + l.flag = RmColorFlags(l.flag) + } +} + +// SetOutput sets the output destination for the standard logger. +func SetOutput(w io.Writer) { + Std.SetOutput(w) +} + +func SetLocation(loc *time.Location) { + Std.SetLocation(loc) +} + +func Location() *time.Location { + return Std.Location() +} + +// Flags returns the output flags for the standard logger. +func Flags() int { + return Std.Flags() +} + +// SetFlags sets the output flags for the standard logger. +func SetFlags(flag int) { + Std.SetFlags(flag) +} + +// Prefix returns the output prefix for the standard logger. +func Prefix() string { + return Std.Prefix() +} + +// SetPrefix sets the output prefix for the standard logger. +func SetPrefix(prefix string) { + Std.SetPrefix(prefix) +} + +func SetOutputLevel(lvl int) { + Std.SetOutputLevel(lvl) +} + +func OutputLevel() int { + return Std.OutputLevel() +} + +// ----------------------------------------- + +// Print calls Output to print to the standard logger. +// Arguments are handled in the manner of fmt.Print. +func Print(v ...interface{}) { + Std.Output("", Linfo, 2, fmt.Sprintln(v...)) +} + +// Printf calls Output to print to the standard logger. +// Arguments are handled in the manner of fmt.Printf. +func Printf(format string, v ...interface{}) { + Std.Output("", Linfo, 2, fmt.Sprintf(format, v...)) +} + +// Println calls Output to print to the standard logger. +// Arguments are handled in the manner of fmt.Println. +func Println(v ...interface{}) { + Std.Output("", Linfo, 2, fmt.Sprintln(v...)) +} + +// ----------------------------------------- + +func Debugf(format string, v ...interface{}) { + Std.Output("", Ldebug, 2, fmt.Sprintf(format, v...)) +} + +func Debug(v ...interface{}) { + Std.Output("", Ldebug, 2, fmt.Sprintln(v...)) +} + +// ----------------------------------------- + +func Infof(format string, v ...interface{}) { + Std.Output("", Linfo, 2, fmt.Sprintf(format, v...)) +} + +func Info(v ...interface{}) { + Std.Output("", Linfo, 2, fmt.Sprintln(v...)) +} + +// ----------------------------------------- + +func Warnf(format string, v ...interface{}) { + Std.Output("", Lwarn, 2, fmt.Sprintf(format, v...)) +} + +func Warn(v ...interface{}) { + Std.Output("", Lwarn, 2, fmt.Sprintln(v...)) +} + +// ----------------------------------------- + +func Errorf(format string, v ...interface{}) { + Std.Output("", Lerror, 2, fmt.Sprintf(format, v...)) +} + +func Error(v ...interface{}) { + Std.Output("", Lerror, 2, fmt.Sprintln(v...)) +} + +// ----------------------------------------- + +// Fatal is equivalent to Print() followed by a call to os.Exit(1). +func Fatal(v ...interface{}) { + Std.Output("", Lfatal, 2, fmt.Sprintln(v...)) +} + +// Fatalf is equivalent to Printf() followed by a call to os.Exit(1). +func Fatalf(format string, v ...interface{}) { + Std.Output("", Lfatal, 2, fmt.Sprintf(format, v...)) +} + +// ----------------------------------------- + +// Panic is equivalent to Print() followed by a call to panic(). +func Panic(v ...interface{}) { + Std.Output("", Lpanic, 2, fmt.Sprintln(v...)) +} + +// Panicf is equivalent to Printf() followed by a call to panic(). +func Panicf(format string, v ...interface{}) { + Std.Output("", Lpanic, 2, fmt.Sprintf(format, v...)) +} + +// ----------------------------------------- + +func Stack(v ...interface{}) { + s := fmt.Sprint(v...) + s += "\n" + buf := make([]byte, 1024*1024) + n := runtime.Stack(buf, true) + s += string(buf[:n]) + s += "\n" + Std.Output("", Lerror, 2, s) +} + +// ----------------------------------------- diff --git a/vendor/github.com/lunny/nodb/LICENSE b/vendor/github.com/lunny/nodb/LICENSE new file mode 100644 index 0000000000..7ece9fdf5a --- /dev/null +++ b/vendor/github.com/lunny/nodb/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2014 siddontang + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE.
\ No newline at end of file diff --git a/vendor/github.com/lunny/nodb/batch.go b/vendor/github.com/lunny/nodb/batch.go new file mode 100644 index 0000000000..e69d96a122 --- /dev/null +++ b/vendor/github.com/lunny/nodb/batch.go @@ -0,0 +1,106 @@ +package nodb + +import ( + "sync" + + "github.com/lunny/nodb/store" +) + +type batch struct { + l *Nodb + + store.WriteBatch + + sync.Locker + + logs [][]byte + + tx *Tx +} + +func (b *batch) Commit() error { + b.l.commitLock.Lock() + defer b.l.commitLock.Unlock() + + err := b.WriteBatch.Commit() + + if b.l.binlog != nil { + if err == nil { + if b.tx == nil { + b.l.binlog.Log(b.logs...) + } else { + b.tx.logs = append(b.tx.logs, b.logs...) + } + } + b.logs = [][]byte{} + } + + return err +} + +func (b *batch) Lock() { + b.Locker.Lock() +} + +func (b *batch) Unlock() { + if b.l.binlog != nil { + b.logs = [][]byte{} + } + b.WriteBatch.Rollback() + b.Locker.Unlock() +} + +func (b *batch) Put(key []byte, value []byte) { + if b.l.binlog != nil { + buf := encodeBinLogPut(key, value) + b.logs = append(b.logs, buf) + } + b.WriteBatch.Put(key, value) +} + +func (b *batch) Delete(key []byte) { + if b.l.binlog != nil { + buf := encodeBinLogDelete(key) + b.logs = append(b.logs, buf) + } + b.WriteBatch.Delete(key) +} + +type dbBatchLocker struct { + l *sync.Mutex + wrLock *sync.RWMutex +} + +func (l *dbBatchLocker) Lock() { + l.wrLock.RLock() + l.l.Lock() +} + +func (l *dbBatchLocker) Unlock() { + l.l.Unlock() + l.wrLock.RUnlock() +} + +type txBatchLocker struct { +} + +func (l *txBatchLocker) Lock() {} +func (l *txBatchLocker) Unlock() {} + +type multiBatchLocker struct { +} + +func (l *multiBatchLocker) Lock() {} +func (l *multiBatchLocker) Unlock() {} + +func (l *Nodb) newBatch(wb store.WriteBatch, locker sync.Locker, tx *Tx) *batch { + b := new(batch) + b.l = l + b.WriteBatch = wb + + b.tx = tx + b.Locker = locker + + b.logs = [][]byte{} + return b +} diff --git a/vendor/github.com/lunny/nodb/binlog.go b/vendor/github.com/lunny/nodb/binlog.go new file mode 100644 index 0000000000..4c094d9463 --- /dev/null +++ b/vendor/github.com/lunny/nodb/binlog.go @@ -0,0 +1,391 @@ +package nodb + +import ( + "bufio" + "encoding/binary" + "fmt" + "io" + "io/ioutil" + "os" + "path" + "strconv" + "strings" + "sync" + "time" + + "github.com/lunny/log" + "github.com/lunny/nodb/config" +) + +type BinLogHead struct { + CreateTime uint32 + BatchId uint32 + PayloadLen uint32 +} + +func (h *BinLogHead) Len() int { + return 12 +} + +func (h *BinLogHead) Write(w io.Writer) error { + if err := binary.Write(w, binary.BigEndian, h.CreateTime); err != nil { + return err + } + + if err := binary.Write(w, binary.BigEndian, h.BatchId); err != nil { + return err + } + + if err := binary.Write(w, binary.BigEndian, h.PayloadLen); err != nil { + return err + } + + return nil +} + +func (h *BinLogHead) handleReadError(err error) error { + if err == io.EOF { + return io.ErrUnexpectedEOF + } else { + return err + } +} + +func (h *BinLogHead) Read(r io.Reader) error { + var err error + if err = binary.Read(r, binary.BigEndian, &h.CreateTime); err != nil { + return err + } + + if err = binary.Read(r, binary.BigEndian, &h.BatchId); err != nil { + return h.handleReadError(err) + } + + if err = binary.Read(r, binary.BigEndian, &h.PayloadLen); err != nil { + return h.handleReadError(err) + } + + return nil +} + +func (h *BinLogHead) InSameBatch(ho *BinLogHead) bool { + if h.CreateTime == ho.CreateTime && h.BatchId == ho.BatchId { + return true + } else { + return false + } +} + +/* +index file format: +ledis-bin.00001 +ledis-bin.00002 +ledis-bin.00003 + +log file format + +Log: Head|PayloadData + +Head: createTime|batchId|payloadData + +*/ + +type BinLog struct { + sync.Mutex + + path string + + cfg *config.BinLogConfig + + logFile *os.File + + logWb *bufio.Writer + + indexName string + logNames []string + lastLogIndex int64 + + batchId uint32 + + ch chan struct{} +} + +func NewBinLog(cfg *config.Config) (*BinLog, error) { + l := new(BinLog) + + l.cfg = &cfg.BinLog + l.cfg.Adjust() + + l.path = path.Join(cfg.DataDir, "binlog") + + if err := os.MkdirAll(l.path, os.ModePerm); err != nil { + return nil, err + } + + l.logNames = make([]string, 0, 16) + + l.ch = make(chan struct{}) + + if err := l.loadIndex(); err != nil { + return nil, err + } + + return l, nil +} + +func (l *BinLog) flushIndex() error { + data := strings.Join(l.logNames, "\n") + + bakName := fmt.Sprintf("%s.bak", l.indexName) + f, err := os.OpenFile(bakName, os.O_WRONLY|os.O_CREATE, 0666) + if err != nil { + log.Error("create binlog bak index error %s", err.Error()) + return err + } + + if _, err := f.WriteString(data); err != nil { + log.Error("write binlog index error %s", err.Error()) + f.Close() + return err + } + + f.Close() + + if err := os.Rename(bakName, l.indexName); err != nil { + log.Error("rename binlog bak index error %s", err.Error()) + return err + } + + return nil +} + +func (l *BinLog) loadIndex() error { + l.indexName = path.Join(l.path, fmt.Sprintf("ledis-bin.index")) + if _, err := os.Stat(l.indexName); os.IsNotExist(err) { + //no index file, nothing to do + } else { + indexData, err := ioutil.ReadFile(l.indexName) + if err != nil { + return err + } + + lines := strings.Split(string(indexData), "\n") + for _, line := range lines { + line = strings.Trim(line, "\r\n ") + if len(line) == 0 { + continue + } + + if _, err := os.Stat(path.Join(l.path, line)); err != nil { + log.Error("load index line %s error %s", line, err.Error()) + return err + } else { + l.logNames = append(l.logNames, line) + } + } + } + if l.cfg.MaxFileNum > 0 && len(l.logNames) > l.cfg.MaxFileNum { + //remove oldest logfile + if err := l.Purge(len(l.logNames) - l.cfg.MaxFileNum); err != nil { + return err + } + } + + var err error + if len(l.logNames) == 0 { + l.lastLogIndex = 1 + } else { + lastName := l.logNames[len(l.logNames)-1] + + if l.lastLogIndex, err = strconv.ParseInt(path.Ext(lastName)[1:], 10, 64); err != nil { + log.Error("invalid logfile name %s", err.Error()) + return err + } + + //like mysql, if server restart, a new binlog will create + l.lastLogIndex++ + } + + return nil +} + +func (l *BinLog) getLogFile() string { + return l.FormatLogFileName(l.lastLogIndex) +} + +func (l *BinLog) openNewLogFile() error { + var err error + lastName := l.getLogFile() + + logPath := path.Join(l.path, lastName) + if l.logFile, err = os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY, 0666); err != nil { + log.Error("open new logfile error %s", err.Error()) + return err + } + + if l.cfg.MaxFileNum > 0 && len(l.logNames) == l.cfg.MaxFileNum { + l.purge(1) + } + + l.logNames = append(l.logNames, lastName) + + if l.logWb == nil { + l.logWb = bufio.NewWriterSize(l.logFile, 1024) + } else { + l.logWb.Reset(l.logFile) + } + + if err = l.flushIndex(); err != nil { + return err + } + + return nil +} + +func (l *BinLog) checkLogFileSize() bool { + if l.logFile == nil { + return false + } + + st, _ := l.logFile.Stat() + if st.Size() >= int64(l.cfg.MaxFileSize) { + l.closeLog() + return true + } + + return false +} + +func (l *BinLog) closeLog() { + l.lastLogIndex++ + + l.logFile.Close() + l.logFile = nil +} + +func (l *BinLog) purge(n int) { + for i := 0; i < n; i++ { + logPath := path.Join(l.path, l.logNames[i]) + os.Remove(logPath) + } + + copy(l.logNames[0:], l.logNames[n:]) + l.logNames = l.logNames[0 : len(l.logNames)-n] +} + +func (l *BinLog) Close() { + if l.logFile != nil { + l.logFile.Close() + l.logFile = nil + } +} + +func (l *BinLog) LogNames() []string { + return l.logNames +} + +func (l *BinLog) LogFileName() string { + return l.getLogFile() +} + +func (l *BinLog) LogFilePos() int64 { + if l.logFile == nil { + return 0 + } else { + st, _ := l.logFile.Stat() + return st.Size() + } +} + +func (l *BinLog) LogFileIndex() int64 { + return l.lastLogIndex +} + +func (l *BinLog) FormatLogFileName(index int64) string { + return fmt.Sprintf("ledis-bin.%07d", index) +} + +func (l *BinLog) FormatLogFilePath(index int64) string { + return path.Join(l.path, l.FormatLogFileName(index)) +} + +func (l *BinLog) LogPath() string { + return l.path +} + +func (l *BinLog) Purge(n int) error { + l.Lock() + defer l.Unlock() + + if len(l.logNames) == 0 { + return nil + } + + if n >= len(l.logNames) { + n = len(l.logNames) + //can not purge current log file + if l.logNames[n-1] == l.getLogFile() { + n = n - 1 + } + } + + l.purge(n) + + return l.flushIndex() +} + +func (l *BinLog) PurgeAll() error { + l.Lock() + defer l.Unlock() + + l.closeLog() + return l.openNewLogFile() +} + +func (l *BinLog) Log(args ...[]byte) error { + l.Lock() + defer l.Unlock() + + var err error + + if l.logFile == nil { + if err = l.openNewLogFile(); err != nil { + return err + } + } + + head := &BinLogHead{} + + head.CreateTime = uint32(time.Now().Unix()) + head.BatchId = l.batchId + + l.batchId++ + + for _, data := range args { + head.PayloadLen = uint32(len(data)) + + if err := head.Write(l.logWb); err != nil { + return err + } + + if _, err := l.logWb.Write(data); err != nil { + return err + } + } + + if err = l.logWb.Flush(); err != nil { + log.Error("write log error %s", err.Error()) + return err + } + + l.checkLogFileSize() + + close(l.ch) + l.ch = make(chan struct{}) + + return nil +} + +func (l *BinLog) Wait() <-chan struct{} { + return l.ch +} diff --git a/vendor/github.com/lunny/nodb/binlog_util.go b/vendor/github.com/lunny/nodb/binlog_util.go new file mode 100644 index 0000000000..22124dda07 --- /dev/null +++ b/vendor/github.com/lunny/nodb/binlog_util.go @@ -0,0 +1,215 @@ +package nodb + +import ( + "encoding/binary" + "errors" + "fmt" + "strconv" +) + +var ( + errBinLogDeleteType = errors.New("invalid bin log delete type") + errBinLogPutType = errors.New("invalid bin log put type") + errBinLogCommandType = errors.New("invalid bin log command type") +) + +func encodeBinLogDelete(key []byte) []byte { + buf := make([]byte, 1+len(key)) + buf[0] = BinLogTypeDeletion + copy(buf[1:], key) + return buf +} + +func decodeBinLogDelete(sz []byte) ([]byte, error) { + if len(sz) < 1 || sz[0] != BinLogTypeDeletion { + return nil, errBinLogDeleteType + } + + return sz[1:], nil +} + +func encodeBinLogPut(key []byte, value []byte) []byte { + buf := make([]byte, 3+len(key)+len(value)) + buf[0] = BinLogTypePut + pos := 1 + binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) + pos += 2 + copy(buf[pos:], key) + pos += len(key) + copy(buf[pos:], value) + + return buf +} + +func decodeBinLogPut(sz []byte) ([]byte, []byte, error) { + if len(sz) < 3 || sz[0] != BinLogTypePut { + return nil, nil, errBinLogPutType + } + + keyLen := int(binary.BigEndian.Uint16(sz[1:])) + if 3+keyLen > len(sz) { + return nil, nil, errBinLogPutType + } + + return sz[3 : 3+keyLen], sz[3+keyLen:], nil +} + +func FormatBinLogEvent(event []byte) (string, error) { + logType := uint8(event[0]) + + var err error + var k []byte + var v []byte + + var buf []byte = make([]byte, 0, 1024) + + switch logType { + case BinLogTypePut: + k, v, err = decodeBinLogPut(event) + buf = append(buf, "PUT "...) + case BinLogTypeDeletion: + k, err = decodeBinLogDelete(event) + buf = append(buf, "DELETE "...) + default: + err = errInvalidBinLogEvent + } + + if err != nil { + return "", err + } + + if buf, err = formatDataKey(buf, k); err != nil { + return "", err + } + + if v != nil && len(v) != 0 { + buf = append(buf, fmt.Sprintf(" %q", v)...) + } + + return String(buf), nil +} + +func formatDataKey(buf []byte, k []byte) ([]byte, error) { + if len(k) < 2 { + return nil, errInvalidBinLogEvent + } + + buf = append(buf, fmt.Sprintf("DB:%2d ", k[0])...) + buf = append(buf, fmt.Sprintf("%s ", TypeName[k[1]])...) + + db := new(DB) + db.index = k[0] + + //to do format at respective place + + switch k[1] { + case KVType: + if key, err := db.decodeKVKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + } + case HashType: + if key, field, err := db.hDecodeHashKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendQuote(buf, String(field)) + } + case HSizeType: + if key, err := db.hDecodeSizeKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + } + case ListType: + if key, seq, err := db.lDecodeListKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendInt(buf, int64(seq), 10) + } + case LMetaType: + if key, err := db.lDecodeMetaKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + } + case ZSetType: + if key, m, err := db.zDecodeSetKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendQuote(buf, String(m)) + } + case ZSizeType: + if key, err := db.zDecodeSizeKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + } + case ZScoreType: + if key, m, score, err := db.zDecodeScoreKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendQuote(buf, String(m)) + buf = append(buf, ' ') + buf = strconv.AppendInt(buf, score, 10) + } + case BitType: + if key, seq, err := db.bDecodeBinKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendUint(buf, uint64(seq), 10) + } + case BitMetaType: + if key, err := db.bDecodeMetaKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + } + case SetType: + if key, member, err := db.sDecodeSetKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendQuote(buf, String(member)) + } + case SSizeType: + if key, err := db.sDecodeSizeKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + } + case ExpTimeType: + if tp, key, t, err := db.expDecodeTimeKey(k); err != nil { + return nil, err + } else { + buf = append(buf, TypeName[tp]...) + buf = append(buf, ' ') + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendInt(buf, t, 10) + } + case ExpMetaType: + if tp, key, err := db.expDecodeMetaKey(k); err != nil { + return nil, err + } else { + buf = append(buf, TypeName[tp]...) + buf = append(buf, ' ') + buf = strconv.AppendQuote(buf, String(key)) + } + default: + return nil, errInvalidBinLogEvent + } + + return buf, nil +} diff --git a/vendor/github.com/lunny/nodb/config/config.go b/vendor/github.com/lunny/nodb/config/config.go new file mode 100644 index 0000000000..3b44d3043f --- /dev/null +++ b/vendor/github.com/lunny/nodb/config/config.go @@ -0,0 +1,135 @@ +package config + +import ( + "io/ioutil" + + "github.com/BurntSushi/toml" +) + +type Size int + +const ( + DefaultAddr string = "127.0.0.1:6380" + DefaultHttpAddr string = "127.0.0.1:11181" + + DefaultDBName string = "goleveldb" + + DefaultDataDir string = "./data" +) + +const ( + MaxBinLogFileSize int = 1024 * 1024 * 1024 + MaxBinLogFileNum int = 10000 + + DefaultBinLogFileSize int = MaxBinLogFileSize + DefaultBinLogFileNum int = 10 +) + +type LevelDBConfig struct { + Compression bool `toml:"compression"` + BlockSize int `toml:"block_size"` + WriteBufferSize int `toml:"write_buffer_size"` + CacheSize int `toml:"cache_size"` + MaxOpenFiles int `toml:"max_open_files"` +} + +type LMDBConfig struct { + MapSize int `toml:"map_size"` + NoSync bool `toml:"nosync"` +} + +type BinLogConfig struct { + MaxFileSize int `toml:"max_file_size"` + MaxFileNum int `toml:"max_file_num"` +} + +type Config struct { + DataDir string `toml:"data_dir"` + + DBName string `toml:"db_name"` + + LevelDB LevelDBConfig `toml:"leveldb"` + + LMDB LMDBConfig `toml:"lmdb"` + + BinLog BinLogConfig `toml:"binlog"` + + SlaveOf string `toml:"slaveof"` + + AccessLog string `toml:"access_log"` +} + +func NewConfigWithFile(fileName string) (*Config, error) { + data, err := ioutil.ReadFile(fileName) + if err != nil { + return nil, err + } + + return NewConfigWithData(data) +} + +func NewConfigWithData(data []byte) (*Config, error) { + cfg := NewConfigDefault() + + _, err := toml.Decode(string(data), cfg) + if err != nil { + return nil, err + } + + return cfg, nil +} + +func NewConfigDefault() *Config { + cfg := new(Config) + + cfg.DataDir = DefaultDataDir + + cfg.DBName = DefaultDBName + + // disable binlog + cfg.BinLog.MaxFileNum = 0 + cfg.BinLog.MaxFileSize = 0 + + // disable replication + cfg.SlaveOf = "" + + // disable access log + cfg.AccessLog = "" + + cfg.LMDB.MapSize = 20 * 1024 * 1024 + cfg.LMDB.NoSync = true + + return cfg +} + +func (cfg *LevelDBConfig) Adjust() { + if cfg.CacheSize <= 0 { + cfg.CacheSize = 4 * 1024 * 1024 + } + + if cfg.BlockSize <= 0 { + cfg.BlockSize = 4 * 1024 + } + + if cfg.WriteBufferSize <= 0 { + cfg.WriteBufferSize = 4 * 1024 * 1024 + } + + if cfg.MaxOpenFiles < 1024 { + cfg.MaxOpenFiles = 1024 + } +} + +func (cfg *BinLogConfig) Adjust() { + if cfg.MaxFileSize <= 0 { + cfg.MaxFileSize = DefaultBinLogFileSize + } else if cfg.MaxFileSize > MaxBinLogFileSize { + cfg.MaxFileSize = MaxBinLogFileSize + } + + if cfg.MaxFileNum <= 0 { + cfg.MaxFileNum = DefaultBinLogFileNum + } else if cfg.MaxFileNum > MaxBinLogFileNum { + cfg.MaxFileNum = MaxBinLogFileNum + } +} diff --git a/vendor/github.com/lunny/nodb/const.go b/vendor/github.com/lunny/nodb/const.go new file mode 100644 index 0000000000..446dae634e --- /dev/null +++ b/vendor/github.com/lunny/nodb/const.go @@ -0,0 +1,98 @@ +package nodb + +import ( + "errors" +) + +const ( + NoneType byte = 0 + KVType byte = 1 + HashType byte = 2 + HSizeType byte = 3 + ListType byte = 4 + LMetaType byte = 5 + ZSetType byte = 6 + ZSizeType byte = 7 + ZScoreType byte = 8 + BitType byte = 9 + BitMetaType byte = 10 + SetType byte = 11 + SSizeType byte = 12 + + maxDataType byte = 100 + + ExpTimeType byte = 101 + ExpMetaType byte = 102 +) + +var ( + TypeName = map[byte]string{ + KVType: "kv", + HashType: "hash", + HSizeType: "hsize", + ListType: "list", + LMetaType: "lmeta", + ZSetType: "zset", + ZSizeType: "zsize", + ZScoreType: "zscore", + BitType: "bit", + BitMetaType: "bitmeta", + SetType: "set", + SSizeType: "ssize", + ExpTimeType: "exptime", + ExpMetaType: "expmeta", + } +) + +const ( + defaultScanCount int = 10 +) + +var ( + errKeySize = errors.New("invalid key size") + errValueSize = errors.New("invalid value size") + errHashFieldSize = errors.New("invalid hash field size") + errSetMemberSize = errors.New("invalid set member size") + errZSetMemberSize = errors.New("invalid zset member size") + errExpireValue = errors.New("invalid expire value") +) + +const ( + //we don't support too many databases + MaxDBNumber uint8 = 16 + + //max key size + MaxKeySize int = 1024 + + //max hash field size + MaxHashFieldSize int = 1024 + + //max zset member size + MaxZSetMemberSize int = 1024 + + //max set member size + MaxSetMemberSize int = 1024 + + //max value size + MaxValueSize int = 10 * 1024 * 1024 +) + +var ( + ErrScoreMiss = errors.New("zset score miss") +) + +const ( + BinLogTypeDeletion uint8 = 0x0 + BinLogTypePut uint8 = 0x1 + BinLogTypeCommand uint8 = 0x2 +) + +const ( + DBAutoCommit uint8 = 0x0 + DBInTransaction uint8 = 0x1 + DBInMulti uint8 = 0x2 +) + +var ( + Version = "0.1" +) diff --git a/vendor/github.com/lunny/nodb/doc.go b/vendor/github.com/lunny/nodb/doc.go new file mode 100644 index 0000000000..2f7df33ffd --- /dev/null +++ b/vendor/github.com/lunny/nodb/doc.go @@ -0,0 +1,61 @@ +// package nodb is a high performance embedded NoSQL. +// +// nodb supports various data structure like kv, list, hash and zset like redis. +// +// Other features include binlog replication, data with a limited time-to-live. +// +// Usage +// +// First create a nodb instance before use: +// +// l := nodb.Open(cfg) +// +// cfg is a Config instance which contains configuration for nodb use, +// like DataDir (root directory for nodb working to store data). +// +// After you create a nodb instance, you can select a DB to store you data: +// +// db, _ := l.Select(0) +// +// DB must be selected by a index, nodb supports only 16 databases, so the index range is [0-15]. +// +// KV +// +// KV is the most basic nodb type like any other key-value database. +// +// err := db.Set(key, value) +// value, err := db.Get(key) +// +// List +// +// List is simply lists of values, sorted by insertion order. +// You can push or pop value on the list head (left) or tail (right). +// +// err := db.LPush(key, value1) +// err := db.RPush(key, value2) +// value1, err := db.LPop(key) +// value2, err := db.RPop(key) +// +// Hash +// +// Hash is a map between fields and values. +// +// n, err := db.HSet(key, field1, value1) +// n, err := db.HSet(key, field2, value2) +// value1, err := db.HGet(key, field1) +// value2, err := db.HGet(key, field2) +// +// ZSet +// +// ZSet is a sorted collections of values. +// Every member of zset is associated with score, a int64 value which used to sort, from smallest to greatest score. +// Members are unique, but score may be same. +// +// n, err := db.ZAdd(key, ScorePair{score1, member1}, ScorePair{score2, member2}) +// ay, err := db.ZRangeByScore(key, minScore, maxScore, 0, -1) +// +// Binlog +// +// nodb supports binlog, so you can sync binlog to another server for replication. If you want to open binlog support, set UseBinLog to true in config. +// +package nodb diff --git a/vendor/github.com/lunny/nodb/dump.go b/vendor/github.com/lunny/nodb/dump.go new file mode 100644 index 0000000000..3c9722e00d --- /dev/null +++ b/vendor/github.com/lunny/nodb/dump.go @@ -0,0 +1,200 @@ +package nodb + +import ( + "bufio" + "bytes" + "encoding/binary" + "io" + "os" + + "github.com/siddontang/go-snappy/snappy" +) + +//dump format +// fileIndex(bigendian int64)|filePos(bigendian int64) +// |keylen(bigendian int32)|key|valuelen(bigendian int32)|value...... +// +//key and value are both compressed for fast transfer dump on network using snappy + +type BinLogAnchor struct { + LogFileIndex int64 + LogPos int64 +} + +func (m *BinLogAnchor) WriteTo(w io.Writer) error { + if err := binary.Write(w, binary.BigEndian, m.LogFileIndex); err != nil { + return err + } + + if err := binary.Write(w, binary.BigEndian, m.LogPos); err != nil { + return err + } + return nil +} + +func (m *BinLogAnchor) ReadFrom(r io.Reader) error { + err := binary.Read(r, binary.BigEndian, &m.LogFileIndex) + if err != nil { + return err + } + + err = binary.Read(r, binary.BigEndian, &m.LogPos) + if err != nil { + return err + } + + return nil +} + +func (l *Nodb) DumpFile(path string) error { + f, err := os.Create(path) + if err != nil { + return err + } + defer f.Close() + + return l.Dump(f) +} + +func (l *Nodb) Dump(w io.Writer) error { + m := new(BinLogAnchor) + + var err error + + l.wLock.Lock() + defer l.wLock.Unlock() + + if l.binlog != nil { + m.LogFileIndex = l.binlog.LogFileIndex() + m.LogPos = l.binlog.LogFilePos() + } + + wb := bufio.NewWriterSize(w, 4096) + if err = m.WriteTo(wb); err != nil { + return err + } + + it := l.ldb.NewIterator() + it.SeekToFirst() + + compressBuf := make([]byte, 4096) + + var key []byte + var value []byte + for ; it.Valid(); it.Next() { + key = it.RawKey() + value = it.RawValue() + + if key, err = snappy.Encode(compressBuf, key); err != nil { + return err + } + + if err = binary.Write(wb, binary.BigEndian, uint16(len(key))); err != nil { + return err + } + + if _, err = wb.Write(key); err != nil { + return err + } + + if value, err = snappy.Encode(compressBuf, value); err != nil { + return err + } + + if err = binary.Write(wb, binary.BigEndian, uint32(len(value))); err != nil { + return err + } + + if _, err = wb.Write(value); err != nil { + return err + } + } + + if err = wb.Flush(); err != nil { + return err + } + + compressBuf = nil + + return nil +} + +func (l *Nodb) LoadDumpFile(path string) (*BinLogAnchor, error) { + f, err := os.Open(path) + if err != nil { + return nil, err + } + defer f.Close() + + return l.LoadDump(f) +} + +func (l *Nodb) LoadDump(r io.Reader) (*BinLogAnchor, error) { + l.wLock.Lock() + defer l.wLock.Unlock() + + info := new(BinLogAnchor) + + rb := bufio.NewReaderSize(r, 4096) + + err := info.ReadFrom(rb) + if err != nil { + return nil, err + } + + var keyLen uint16 + var valueLen uint32 + + var keyBuf bytes.Buffer + var valueBuf bytes.Buffer + + deKeyBuf := make([]byte, 4096) + deValueBuf := make([]byte, 4096) + + var key, value []byte + + for { + if err = binary.Read(rb, binary.BigEndian, &keyLen); err != nil && err != io.EOF { + return nil, err + } else if err == io.EOF { + break + } + + if _, err = io.CopyN(&keyBuf, rb, int64(keyLen)); err != nil { + return nil, err + } + + if key, err = snappy.Decode(deKeyBuf, keyBuf.Bytes()); err != nil { + return nil, err + } + + if err = binary.Read(rb, binary.BigEndian, &valueLen); err != nil { + return nil, err + } + + if _, err = io.CopyN(&valueBuf, rb, int64(valueLen)); err != nil { + return nil, err + } + + if value, err = snappy.Decode(deValueBuf, valueBuf.Bytes()); err != nil { + return nil, err + } + + if err = l.ldb.Put(key, value); err != nil { + return nil, err + } + + keyBuf.Reset() + valueBuf.Reset() + } + + deKeyBuf = nil + deValueBuf = nil + + //if binlog enable, we will delete all binlogs and open a new one for handling simply + if l.binlog != nil { + l.binlog.PurgeAll() + } + + return info, nil +} diff --git a/vendor/github.com/lunny/nodb/info.go b/vendor/github.com/lunny/nodb/info.go new file mode 100644 index 0000000000..3fd37e3d44 --- /dev/null +++ b/vendor/github.com/lunny/nodb/info.go @@ -0,0 +1,24 @@ +package nodb + +// todo, add info + +// type Keyspace struct { +// Kvs int `json:"kvs"` +// KvExpires int `json:"kv_expires"` + +// Lists int `json:"lists"` +// ListExpires int `json:"list_expires"` + +// Bitmaps int `json:"bitmaps"` +// BitmapExpires int `json:"bitmap_expires"` + +// ZSets int `json:"zsets"` +// ZSetExpires int `json:"zset_expires"` + +// Hashes int `json:"hashes"` +// HashExpires int `json:"hahsh_expires"` +// } + +// type Info struct { +// KeySpaces [MaxDBNumber]Keyspace +// } diff --git a/vendor/github.com/lunny/nodb/multi.go b/vendor/github.com/lunny/nodb/multi.go new file mode 100644 index 0000000000..ca581ce9a2 --- /dev/null +++ b/vendor/github.com/lunny/nodb/multi.go @@ -0,0 +1,73 @@ +package nodb + +import ( + "errors" + "fmt" +) + +var ( + ErrNestMulti = errors.New("nest multi not supported") + ErrMultiDone = errors.New("multi has been closed") +) + +type Multi struct { + *DB +} + +func (db *DB) IsInMulti() bool { + return db.status == DBInMulti +} + +// begin a mutli to execute commands, +// it will block any other write operations before you close the multi, unlike transaction, mutli can not rollback +func (db *DB) Multi() (*Multi, error) { + if db.IsInMulti() { + return nil, ErrNestMulti + } + + m := new(Multi) + + m.DB = new(DB) + m.DB.status = DBInMulti + + m.DB.l = db.l + + m.l.wLock.Lock() + + m.DB.sdb = db.sdb + + m.DB.bucket = db.sdb + + m.DB.index = db.index + + m.DB.kvBatch = m.newBatch() + m.DB.listBatch = m.newBatch() + m.DB.hashBatch = m.newBatch() + m.DB.zsetBatch = m.newBatch() + m.DB.binBatch = m.newBatch() + m.DB.setBatch = m.newBatch() + + return m, nil +} + +func (m *Multi) newBatch() *batch { + return m.l.newBatch(m.bucket.NewWriteBatch(), &multiBatchLocker{}, nil) +} + +func (m *Multi) Close() error { + if m.bucket == nil { + return ErrMultiDone + } + m.l.wLock.Unlock() + m.bucket = nil + return nil +} + +func (m *Multi) Select(index int) error { + if index < 0 || index >= int(MaxDBNumber) { + return fmt.Errorf("invalid db index %d", index) + } + + m.DB.index = uint8(index) + return nil +} diff --git a/vendor/github.com/lunny/nodb/nodb.go b/vendor/github.com/lunny/nodb/nodb.go new file mode 100644 index 0000000000..fdd0272c94 --- /dev/null +++ b/vendor/github.com/lunny/nodb/nodb.go @@ -0,0 +1,128 @@ +package nodb + +import ( + "fmt" + "sync" + "time" + + "github.com/lunny/log" + "github.com/lunny/nodb/config" + "github.com/lunny/nodb/store" +) + +type Nodb struct { + cfg *config.Config + + ldb *store.DB + dbs [MaxDBNumber]*DB + + quit chan struct{} + jobs *sync.WaitGroup + + binlog *BinLog + + wLock sync.RWMutex //allow one write at same time + commitLock sync.Mutex //allow one write commit at same time +} + +func Open(cfg *config.Config) (*Nodb, error) { + if len(cfg.DataDir) == 0 { + cfg.DataDir = config.DefaultDataDir + } + + ldb, err := store.Open(cfg) + if err != nil { + return nil, err + } + + l := new(Nodb) + + l.quit = make(chan struct{}) + l.jobs = new(sync.WaitGroup) + + l.ldb = ldb + + if cfg.BinLog.MaxFileNum > 0 && cfg.BinLog.MaxFileSize > 0 { + l.binlog, err = NewBinLog(cfg) + if err != nil { + return nil, err + } + } else { + l.binlog = nil + } + + for i := uint8(0); i < MaxDBNumber; i++ { + l.dbs[i] = l.newDB(i) + } + + l.activeExpireCycle() + + return l, nil +} + +func (l *Nodb) Close() { + close(l.quit) + l.jobs.Wait() + + l.ldb.Close() + + if l.binlog != nil { + l.binlog.Close() + l.binlog = nil + } +} + +func (l *Nodb) Select(index int) (*DB, error) { + if index < 0 || index >= int(MaxDBNumber) { + return nil, fmt.Errorf("invalid db index %d", index) + } + + return l.dbs[index], nil +} + +func (l *Nodb) FlushAll() error { + for index, db := range l.dbs { + if _, err := db.FlushAll(); err != nil { + log.Error("flush db %d error %s", index, err.Error()) + } + } + + return nil +} + +// very dangerous to use +func (l *Nodb) DataDB() *store.DB { + return l.ldb +} + +func (l *Nodb) activeExpireCycle() { + var executors []*elimination = make([]*elimination, len(l.dbs)) + for i, db := range l.dbs { + executors[i] = db.newEliminator() + } + + l.jobs.Add(1) + go func() { + tick := time.NewTicker(1 * time.Second) + end := false + done := make(chan struct{}) + for !end { + select { + case <-tick.C: + go func() { + for _, eli := range executors { + eli.active() + } + done <- struct{}{} + }() + <-done + case <-l.quit: + end = true + break + } + } + + tick.Stop() + l.jobs.Done() + }() +} diff --git a/vendor/github.com/lunny/nodb/nodb_db.go b/vendor/github.com/lunny/nodb/nodb_db.go new file mode 100644 index 0000000000..f68ebaa0d4 --- /dev/null +++ b/vendor/github.com/lunny/nodb/nodb_db.go @@ -0,0 +1,171 @@ +package nodb + +import ( + "fmt" + "sync" + + "github.com/lunny/nodb/store" +) + +type ibucket interface { + Get(key []byte) ([]byte, error) + + Put(key []byte, value []byte) error + Delete(key []byte) error + + NewIterator() *store.Iterator + + NewWriteBatch() store.WriteBatch + + RangeIterator(min []byte, max []byte, rangeType uint8) *store.RangeLimitIterator + RevRangeIterator(min []byte, max []byte, rangeType uint8) *store.RangeLimitIterator + RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *store.RangeLimitIterator + RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *store.RangeLimitIterator +} + +type DB struct { + l *Nodb + + sdb *store.DB + + bucket ibucket + + index uint8 + + kvBatch *batch + listBatch *batch + hashBatch *batch + zsetBatch *batch + binBatch *batch + setBatch *batch + + status uint8 +} + +func (l *Nodb) newDB(index uint8) *DB { + d := new(DB) + + d.l = l + + d.sdb = l.ldb + + d.bucket = d.sdb + + d.status = DBAutoCommit + d.index = index + + d.kvBatch = d.newBatch() + d.listBatch = d.newBatch() + d.hashBatch = d.newBatch() + d.zsetBatch = d.newBatch() + d.binBatch = d.newBatch() + d.setBatch = d.newBatch() + + return d +} + +func (db *DB) newBatch() *batch { + return db.l.newBatch(db.bucket.NewWriteBatch(), &dbBatchLocker{l: &sync.Mutex{}, wrLock: &db.l.wLock}, nil) +} + +func (db *DB) Index() int { + return int(db.index) +} + +func (db *DB) IsAutoCommit() bool { + return db.status == DBAutoCommit +} + +func (db *DB) FlushAll() (drop int64, err error) { + all := [...](func() (int64, error)){ + db.flush, + db.lFlush, + db.hFlush, + db.zFlush, + db.bFlush, + db.sFlush} + + for _, flush := range all { + if n, e := flush(); e != nil { + err = e + return + } else { + drop += n + } + } + + return +} + +func (db *DB) newEliminator() *elimination { + eliminator := newEliminator(db) + + eliminator.regRetireContext(KVType, db.kvBatch, db.delete) + eliminator.regRetireContext(ListType, db.listBatch, db.lDelete) + eliminator.regRetireContext(HashType, db.hashBatch, db.hDelete) + eliminator.regRetireContext(ZSetType, db.zsetBatch, db.zDelete) + eliminator.regRetireContext(BitType, db.binBatch, db.bDelete) + eliminator.regRetireContext(SetType, db.setBatch, db.sDelete) + + return eliminator +} + +func (db *DB) flushRegion(t *batch, minKey []byte, maxKey []byte) (drop int64, err error) { + it := db.bucket.RangeIterator(minKey, maxKey, store.RangeROpen) + for ; it.Valid(); it.Next() { + t.Delete(it.RawKey()) + drop++ + if drop&1023 == 0 { + if err = t.Commit(); err != nil { + return + } + } + } + it.Close() + return +} + +func (db *DB) flushType(t *batch, dataType byte) (drop int64, err error) { + var deleteFunc func(t *batch, key []byte) int64 + var metaDataType byte + switch dataType { + case KVType: + deleteFunc = db.delete + metaDataType = KVType + case ListType: + deleteFunc = db.lDelete + metaDataType = LMetaType + case HashType: + deleteFunc = db.hDelete + metaDataType = HSizeType + case ZSetType: + deleteFunc = db.zDelete + metaDataType = ZSizeType + case BitType: + deleteFunc = db.bDelete + metaDataType = BitMetaType + case SetType: + deleteFunc = db.sDelete + metaDataType = SSizeType + default: + return 0, fmt.Errorf("invalid data type: %s", TypeName[dataType]) + } + + var keys [][]byte + keys, err = db.scan(metaDataType, nil, 1024, false, "") + for len(keys) != 0 || err != nil { + for _, key := range keys { + deleteFunc(t, key) + db.rmExpire(t, dataType, key) + + } + + if err = t.Commit(); err != nil { + return + } else { + drop += int64(len(keys)) + } + keys, err = db.scan(metaDataType, nil, 1024, false, "") + } + return +} diff --git a/vendor/github.com/lunny/nodb/replication.go b/vendor/github.com/lunny/nodb/replication.go new file mode 100644 index 0000000000..f9bc951085 --- /dev/null +++ b/vendor/github.com/lunny/nodb/replication.go @@ -0,0 +1,312 @@ +package nodb + +import ( + "bufio" + "bytes" + "errors" + "io" + "os" + "time" + + "github.com/lunny/log" + "github.com/lunny/nodb/store/driver" +) + +const ( + maxReplBatchNum = 100 + maxReplLogSize = 1 * 1024 * 1024 +) + +var ( + ErrSkipEvent = errors.New("skip to next event") +) + +var ( + errInvalidBinLogEvent = errors.New("invalid binglog event") + errInvalidBinLogFile = errors.New("invalid binlog file") +) + +type replBatch struct { + wb driver.IWriteBatch + events [][]byte + l *Nodb + + lastHead *BinLogHead +} + +func (b *replBatch) Commit() error { + b.l.commitLock.Lock() + defer b.l.commitLock.Unlock() + + err := b.wb.Commit() + if err != nil { + b.Rollback() + return err + } + + if b.l.binlog != nil { + if err = b.l.binlog.Log(b.events...); err != nil { + b.Rollback() + return err + } + } + + b.events = [][]byte{} + b.lastHead = nil + + return nil +} + +func (b *replBatch) Rollback() error { + b.wb.Rollback() + b.events = [][]byte{} + b.lastHead = nil + return nil +} + +func (l *Nodb) replicateEvent(b *replBatch, event []byte) error { + if len(event) == 0 { + return errInvalidBinLogEvent + } + + b.events = append(b.events, event) + + logType := uint8(event[0]) + switch logType { + case BinLogTypePut: + return l.replicatePutEvent(b, event) + case BinLogTypeDeletion: + return l.replicateDeleteEvent(b, event) + default: + return errInvalidBinLogEvent + } +} + +func (l *Nodb) replicatePutEvent(b *replBatch, event []byte) error { + key, value, err := decodeBinLogPut(event) + if err != nil { + return err + } + + b.wb.Put(key, value) + + return nil +} + +func (l *Nodb) replicateDeleteEvent(b *replBatch, event []byte) error { + key, err := decodeBinLogDelete(event) + if err != nil { + return err + } + + b.wb.Delete(key) + + return nil +} + +func ReadEventFromReader(rb io.Reader, f func(head *BinLogHead, event []byte) error) error { + head := &BinLogHead{} + var err error + + for { + if err = head.Read(rb); err != nil { + if err == io.EOF { + break + } else { + return err + } + } + + var dataBuf bytes.Buffer + + if _, err = io.CopyN(&dataBuf, rb, int64(head.PayloadLen)); err != nil { + return err + } + + err = f(head, dataBuf.Bytes()) + if err != nil && err != ErrSkipEvent { + return err + } + } + + return nil +} + +func (l *Nodb) ReplicateFromReader(rb io.Reader) error { + b := new(replBatch) + + b.wb = l.ldb.NewWriteBatch() + b.l = l + + f := func(head *BinLogHead, event []byte) error { + if b.lastHead == nil { + b.lastHead = head + } else if !b.lastHead.InSameBatch(head) { + if err := b.Commit(); err != nil { + log.Fatal("replication error %s, skip to next", err.Error()) + return ErrSkipEvent + } + b.lastHead = head + } + + err := l.replicateEvent(b, event) + if err != nil { + log.Fatal("replication error %s, skip to next", err.Error()) + return ErrSkipEvent + } + return nil + } + + err := ReadEventFromReader(rb, f) + if err != nil { + b.Rollback() + return err + } + return b.Commit() +} + +func (l *Nodb) ReplicateFromData(data []byte) error { + rb := bytes.NewReader(data) + + err := l.ReplicateFromReader(rb) + + return err +} + +func (l *Nodb) ReplicateFromBinLog(filePath string) error { + f, err := os.Open(filePath) + if err != nil { + return err + } + + rb := bufio.NewReaderSize(f, 4096) + + err = l.ReplicateFromReader(rb) + + f.Close() + + return err +} + +// try to read events, if no events read, try to wait the new event singal until timeout seconds +func (l *Nodb) ReadEventsToTimeout(info *BinLogAnchor, w io.Writer, timeout int) (n int, err error) { + lastIndex := info.LogFileIndex + lastPos := info.LogPos + + n = 0 + if l.binlog == nil { + //binlog not supported + info.LogFileIndex = 0 + info.LogPos = 0 + return + } + + n, err = l.ReadEventsTo(info, w) + if err == nil && info.LogFileIndex == lastIndex && info.LogPos == lastPos { + //no events read + select { + case <-l.binlog.Wait(): + case <-time.After(time.Duration(timeout) * time.Second): + } + return l.ReadEventsTo(info, w) + } + return +} + +func (l *Nodb) ReadEventsTo(info *BinLogAnchor, w io.Writer) (n int, err error) { + n = 0 + if l.binlog == nil { + //binlog not supported + info.LogFileIndex = 0 + info.LogPos = 0 + return + } + + index := info.LogFileIndex + offset := info.LogPos + + filePath := l.binlog.FormatLogFilePath(index) + + var f *os.File + f, err = os.Open(filePath) + if os.IsNotExist(err) { + lastIndex := l.binlog.LogFileIndex() + + if index == lastIndex { + //no binlog at all + info.LogPos = 0 + } else { + //slave binlog info had lost + info.LogFileIndex = -1 + } + } + + if err != nil { + if os.IsNotExist(err) { + err = nil + } + return + } + + defer f.Close() + + var fileSize int64 + st, _ := f.Stat() + fileSize = st.Size() + + if fileSize == info.LogPos { + return + } + + if _, err = f.Seek(offset, os.SEEK_SET); err != nil { + //may be invliad seek offset + return + } + + var lastHead *BinLogHead = nil + + head := &BinLogHead{} + + batchNum := 0 + + for { + if err = head.Read(f); err != nil { + if err == io.EOF { + //we will try to use next binlog + if index < l.binlog.LogFileIndex() { + info.LogFileIndex += 1 + info.LogPos = 0 + } + err = nil + return + } else { + return + } + + } + + if lastHead == nil { + lastHead = head + batchNum++ + } else if !lastHead.InSameBatch(head) { + lastHead = head + batchNum++ + if batchNum > maxReplBatchNum || n > maxReplLogSize { + return + } + } + + if err = head.Write(w); err != nil { + return + } + + if _, err = io.CopyN(w, f, int64(head.PayloadLen)); err != nil { + return + } + + n += (head.Len() + int(head.PayloadLen)) + info.LogPos = info.LogPos + int64(head.Len()) + int64(head.PayloadLen) + } + + return +} diff --git a/vendor/github.com/lunny/nodb/scan.go b/vendor/github.com/lunny/nodb/scan.go new file mode 100644 index 0000000000..e989db3fed --- /dev/null +++ b/vendor/github.com/lunny/nodb/scan.go @@ -0,0 +1,144 @@ +package nodb + +import ( + "bytes" + "errors" + "regexp" + + "github.com/lunny/nodb/store" +) + +var errDataType = errors.New("error data type") +var errMetaKey = errors.New("error meta key") + +// Seek search the prefix key +func (db *DB) Seek(key []byte) (*store.Iterator, error) { + return db.seek(KVType, key) +} + +func (db *DB) seek(dataType byte, key []byte) (*store.Iterator, error) { + var minKey []byte + var err error + + if len(key) > 0 { + if err = checkKeySize(key); err != nil { + return nil, err + } + if minKey, err = db.encodeMetaKey(dataType, key); err != nil { + return nil, err + } + + } else { + if minKey, err = db.encodeMinKey(dataType); err != nil { + return nil, err + } + } + + it := db.bucket.NewIterator() + it.Seek(minKey) + return it, nil +} + +func (db *DB) MaxKey() ([]byte, error) { + return db.encodeMaxKey(KVType) +} + +func (db *DB) Key(it *store.Iterator) ([]byte, error) { + return db.decodeMetaKey(KVType, it.Key()) +} + +func (db *DB) scan(dataType byte, key []byte, count int, inclusive bool, match string) ([][]byte, error) { + var minKey, maxKey []byte + var err error + var r *regexp.Regexp + + if len(match) > 0 { + if r, err = regexp.Compile(match); err != nil { + return nil, err + } + } + + if len(key) > 0 { + if err = checkKeySize(key); err != nil { + return nil, err + } + if minKey, err = db.encodeMetaKey(dataType, key); err != nil { + return nil, err + } + + } else { + if minKey, err = db.encodeMinKey(dataType); err != nil { + return nil, err + } + } + + if maxKey, err = db.encodeMaxKey(dataType); err != nil { + return nil, err + } + + if count <= 0 { + count = defaultScanCount + } + + v := make([][]byte, 0, count) + + it := db.bucket.NewIterator() + it.Seek(minKey) + + if !inclusive { + if it.Valid() && bytes.Equal(it.RawKey(), minKey) { + it.Next() + } + } + + for i := 0; it.Valid() && i < count && bytes.Compare(it.RawKey(), maxKey) < 0; it.Next() { + if k, err := db.decodeMetaKey(dataType, it.Key()); err != nil { + continue + } else if r != nil && !r.Match(k) { + continue + } else { + v = append(v, k) + i++ + } + } + it.Close() + return v, nil +} + +func (db *DB) encodeMinKey(dataType byte) ([]byte, error) { + return db.encodeMetaKey(dataType, nil) +} + +func (db *DB) encodeMaxKey(dataType byte) ([]byte, error) { + k, err := db.encodeMetaKey(dataType, nil) + if err != nil { + return nil, err + } + k[len(k)-1] = dataType + 1 + return k, nil +} + +func (db *DB) encodeMetaKey(dataType byte, key []byte) ([]byte, error) { + switch dataType { + case KVType: + return db.encodeKVKey(key), nil + case LMetaType: + return db.lEncodeMetaKey(key), nil + case HSizeType: + return db.hEncodeSizeKey(key), nil + case ZSizeType: + return db.zEncodeSizeKey(key), nil + case BitMetaType: + return db.bEncodeMetaKey(key), nil + case SSizeType: + return db.sEncodeSizeKey(key), nil + default: + return nil, errDataType + } +} +func (db *DB) decodeMetaKey(dataType byte, ek []byte) ([]byte, error) { + if len(ek) < 2 || ek[0] != db.index || ek[1] != dataType { + return nil, errMetaKey + } + return ek[2:], nil +} diff --git a/vendor/github.com/lunny/nodb/store/db.go b/vendor/github.com/lunny/nodb/store/db.go new file mode 100644 index 0000000000..00a8831a67 --- /dev/null +++ b/vendor/github.com/lunny/nodb/store/db.go @@ -0,0 +1,61 @@ +package store + +import ( + "github.com/lunny/nodb/store/driver" +) + +type DB struct { + driver.IDB +} + +func (db *DB) NewIterator() *Iterator { + it := new(Iterator) + it.it = db.IDB.NewIterator() + + return it +} + +func (db *DB) NewWriteBatch() WriteBatch { + return db.IDB.NewWriteBatch() +} + +func (db *DB) NewSnapshot() (*Snapshot, error) { + var err error + s := &Snapshot{} + if s.ISnapshot, err = db.IDB.NewSnapshot(); err != nil { + return nil, err + } + + return s, nil +} + +func (db *DB) RangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator { + return NewRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1}) +} + +func (db *DB) RevRangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator { + return NewRevRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1}) +} + +//count < 0, unlimit. +// +//offset must >= 0, if < 0, will get nothing. +func (db *DB) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator { + return NewRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count}) +} + +//count < 0, unlimit. +// +//offset must >= 0, if < 0, will get nothing. +func (db *DB) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator { + return NewRevRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count}) +} + +func (db *DB) Begin() (*Tx, error) { + tx, err := db.IDB.Begin() + if err != nil { + return nil, err + } + + return &Tx{tx}, nil +} diff --git a/vendor/github.com/lunny/nodb/store/driver/batch.go b/vendor/github.com/lunny/nodb/store/driver/batch.go new file mode 100644 index 0000000000..6b79c21c48 --- /dev/null +++ b/vendor/github.com/lunny/nodb/store/driver/batch.go @@ -0,0 +1,39 @@ +package driver + +type BatchPuter interface { + BatchPut([]Write) error +} + +type Write struct { + Key []byte + Value []byte +} + +type WriteBatch struct { + batch BatchPuter + wb []Write +} + +func (w *WriteBatch) Put(key, value []byte) { + if value == nil { + value = []byte{} + } + w.wb = append(w.wb, Write{key, value}) +} + +func (w *WriteBatch) Delete(key []byte) { + w.wb = append(w.wb, Write{key, nil}) +} + +func (w *WriteBatch) Commit() error { + return w.batch.BatchPut(w.wb) +} + +func (w *WriteBatch) Rollback() error { + w.wb = w.wb[0:0] + return nil +} + +func NewWriteBatch(puter BatchPuter) IWriteBatch { + return &WriteBatch{puter, []Write{}} +} diff --git a/vendor/github.com/lunny/nodb/store/driver/driver.go b/vendor/github.com/lunny/nodb/store/driver/driver.go new file mode 100644 index 0000000000..6da67df083 --- /dev/null +++ b/vendor/github.com/lunny/nodb/store/driver/driver.go @@ -0,0 +1,67 @@ +package driver + +import ( + "errors" +) + +var ( + ErrTxSupport = errors.New("transaction is not supported") +) + +type IDB interface { + Close() error + + Get(key []byte) ([]byte, error) + + Put(key []byte, value []byte) error + Delete(key []byte) error + + NewIterator() IIterator + + NewWriteBatch() IWriteBatch + + NewSnapshot() (ISnapshot, error) + + Begin() (Tx, error) +} + +type ISnapshot interface { + Get(key []byte) ([]byte, error) + NewIterator() IIterator + Close() +} + +type IIterator interface { + Close() error + + First() + Last() + Seek(key []byte) + + Next() + Prev() + + Valid() bool + + Key() []byte + Value() []byte +} + +type IWriteBatch interface { + Put(key []byte, value []byte) + Delete(key []byte) + Commit() error + Rollback() error +} + +type Tx interface { + Get(key []byte) ([]byte, error) + Put(key []byte, value []byte) error + Delete(key []byte) error + + NewIterator() IIterator + NewWriteBatch() IWriteBatch + + Commit() error + Rollback() error +} diff --git a/vendor/github.com/lunny/nodb/store/driver/store.go b/vendor/github.com/lunny/nodb/store/driver/store.go new file mode 100644 index 0000000000..173431d4c1 --- /dev/null +++ b/vendor/github.com/lunny/nodb/store/driver/store.go @@ -0,0 +1,46 @@ +package driver + +import ( + "fmt" + + "github.com/lunny/nodb/config" +) + +type Store interface { + String() string + Open(path string, cfg *config.Config) (IDB, error) + Repair(path string, cfg *config.Config) error +} + +var dbs = map[string]Store{} + +func Register(s Store) { + name := s.String() + if _, ok := dbs[name]; ok { + panic(fmt.Errorf("store %s is registered", s)) + } + + dbs[name] = s +} + +func ListStores() []string { + s := []string{} + for k, _ := range dbs { + s = append(s, k) + } + + return s +} + +func GetStore(cfg *config.Config) (Store, error) { + if len(cfg.DBName) == 0 { + cfg.DBName = config.DefaultDBName + } + + s, ok := dbs[cfg.DBName] + if !ok { + return nil, fmt.Errorf("store %s is not registered", cfg.DBName) + } + + return s, nil +} diff --git a/vendor/github.com/lunny/nodb/store/goleveldb/batch.go b/vendor/github.com/lunny/nodb/store/goleveldb/batch.go new file mode 100644 index 0000000000..b17e85e750 --- /dev/null +++ b/vendor/github.com/lunny/nodb/store/goleveldb/batch.go @@ -0,0 +1,27 @@ +package goleveldb + +import ( + "github.com/syndtr/goleveldb/leveldb" +) + +type WriteBatch struct { + db *DB + wbatch *leveldb.Batch +} + +func (w *WriteBatch) Put(key, value []byte) { + w.wbatch.Put(key, value) +} + +func (w *WriteBatch) Delete(key []byte) { + w.wbatch.Delete(key) +} + +func (w *WriteBatch) Commit() error { + return w.db.db.Write(w.wbatch, nil) +} + +func (w *WriteBatch) Rollback() error { + w.wbatch.Reset() + return nil +} diff --git a/vendor/github.com/lunny/nodb/store/goleveldb/const.go b/vendor/github.com/lunny/nodb/store/goleveldb/const.go new file mode 100644 index 0000000000..2fffa7c82b --- /dev/null +++ b/vendor/github.com/lunny/nodb/store/goleveldb/const.go @@ -0,0 +1,4 @@ +package goleveldb + +const DBName = "goleveldb" +const MemDBName = "memory" diff --git a/vendor/github.com/lunny/nodb/store/goleveldb/db.go b/vendor/github.com/lunny/nodb/store/goleveldb/db.go new file mode 100644 index 0000000000..a36e87f628 --- /dev/null +++ b/vendor/github.com/lunny/nodb/store/goleveldb/db.go @@ -0,0 +1,187 @@ +package goleveldb + +import ( + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/cache" + "github.com/syndtr/goleveldb/leveldb/filter" + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/storage" + + "github.com/lunny/nodb/config" + "github.com/lunny/nodb/store/driver" + + "os" +) + +const defaultFilterBits int = 10 + +type Store struct { +} + +func (s Store) String() string { + return DBName +} + +type MemStore struct { +} + +func (s MemStore) String() string { + return MemDBName +} + +type DB struct { + path string + + cfg *config.LevelDBConfig + + db *leveldb.DB + + opts *opt.Options + + iteratorOpts *opt.ReadOptions + + cache cache.Cache + + filter filter.Filter +} + +func (s Store) Open(path string, cfg *config.Config) (driver.IDB, error) { + if err := os.MkdirAll(path, os.ModePerm); err != nil { + return nil, err + } + + db := new(DB) + db.path = path + db.cfg = &cfg.LevelDB + + db.initOpts() + + var err error + db.db, err = leveldb.OpenFile(db.path, db.opts) + + if err != nil { + return nil, err + } + + return db, nil +} + +func (s Store) Repair(path string, cfg *config.Config) error { + db, err := leveldb.RecoverFile(path, newOptions(&cfg.LevelDB)) + if err != nil { + return err + } + + db.Close() + return nil +} + +func (s MemStore) Open(path string, cfg *config.Config) (driver.IDB, error) { + db := new(DB) + db.path = path + db.cfg = &cfg.LevelDB + + db.initOpts() + + var err error + db.db, err = leveldb.Open(storage.NewMemStorage(), db.opts) + if err != nil { + return nil, err + } + + return db, nil +} + +func (s MemStore) Repair(path string, cfg *config.Config) error { + return nil +} + +func (db *DB) initOpts() { + db.opts = newOptions(db.cfg) + + db.iteratorOpts = &opt.ReadOptions{} + db.iteratorOpts.DontFillCache = true +} + +func newOptions(cfg *config.LevelDBConfig) *opt.Options { + opts := &opt.Options{} + opts.ErrorIfMissing = false + + cfg.Adjust() + + //opts.BlockCacher = cache.NewLRU(cfg.CacheSize) + opts.BlockCacheCapacity = cfg.CacheSize + + //we must use bloomfilter + opts.Filter = filter.NewBloomFilter(defaultFilterBits) + + if !cfg.Compression { + opts.Compression = opt.NoCompression + } else { + opts.Compression = opt.SnappyCompression + } + + opts.BlockSize = cfg.BlockSize + opts.WriteBuffer = cfg.WriteBufferSize + + return opts +} + +func (db *DB) Close() error { + return db.db.Close() +} + +func (db *DB) Put(key, value []byte) error { + return db.db.Put(key, value, nil) +} + +func (db *DB) Get(key []byte) ([]byte, error) { + v, err := db.db.Get(key, nil) + if err == leveldb.ErrNotFound { + return nil, nil + } + return v, nil +} + +func (db *DB) Delete(key []byte) error { + return db.db.Delete(key, nil) +} + +func (db *DB) NewWriteBatch() driver.IWriteBatch { + wb := &WriteBatch{ + db: db, + wbatch: new(leveldb.Batch), + } + return wb +} + +func (db *DB) NewIterator() driver.IIterator { + it := &Iterator{ + db.db.NewIterator(nil, db.iteratorOpts), + } + + return it +} + +func (db *DB) Begin() (driver.Tx, error) { + return nil, driver.ErrTxSupport +} + +func (db *DB) NewSnapshot() (driver.ISnapshot, error) { + snapshot, err := db.db.GetSnapshot() + if err != nil { + return nil, err + } + + s := &Snapshot{ + db: db, + snp: snapshot, + } + + return s, nil +} + +func init() { + driver.Register(Store{}) + driver.Register(MemStore{}) +} diff --git a/vendor/github.com/lunny/nodb/store/goleveldb/iterator.go b/vendor/github.com/lunny/nodb/store/goleveldb/iterator.go new file mode 100644 index 0000000000..c1fd8b5573 --- /dev/null +++ b/vendor/github.com/lunny/nodb/store/goleveldb/iterator.go @@ -0,0 +1,49 @@ +package goleveldb + +import ( + "github.com/syndtr/goleveldb/leveldb/iterator" +) + +type Iterator struct { + it iterator.Iterator +} + +func (it *Iterator) Key() []byte { + return it.it.Key() +} + +func (it *Iterator) Value() []byte { + return it.it.Value() +} + +func (it *Iterator) Close() error { + if it.it != nil { + it.it.Release() + it.it = nil + } + return nil +} + +func (it *Iterator) Valid() bool { + return it.it.Valid() +} + +func (it *Iterator) Next() { + it.it.Next() +} + +func (it *Iterator) Prev() { + it.it.Prev() +} + +func (it *Iterator) First() { + it.it.First() +} + +func (it *Iterator) Last() { + it.it.Last() +} + +func (it *Iterator) Seek(key []byte) { + it.it.Seek(key) +} diff --git a/vendor/github.com/lunny/nodb/store/goleveldb/snapshot.go b/vendor/github.com/lunny/nodb/store/goleveldb/snapshot.go new file mode 100644 index 0000000000..fe2b409c3f --- /dev/null +++ b/vendor/github.com/lunny/nodb/store/goleveldb/snapshot.go @@ -0,0 +1,26 @@ +package goleveldb + +import ( + "github.com/lunny/nodb/store/driver" + "github.com/syndtr/goleveldb/leveldb" +) + +type Snapshot struct { + db *DB + snp *leveldb.Snapshot +} + +func (s *Snapshot) Get(key []byte) ([]byte, error) { + return s.snp.Get(key, s.db.iteratorOpts) +} + +func (s *Snapshot) NewIterator() driver.IIterator { + it := &Iterator{ + s.snp.NewIterator(nil, s.db.iteratorOpts), + } + return it +} + +func (s *Snapshot) Close() { + s.snp.Release() +} diff --git a/vendor/github.com/lunny/nodb/store/iterator.go b/vendor/github.com/lunny/nodb/store/iterator.go new file mode 100644 index 0000000000..27bf689da2 --- /dev/null +++ b/vendor/github.com/lunny/nodb/store/iterator.go @@ -0,0 +1,327 @@ +package store + +import ( + "bytes" + + "github.com/lunny/nodb/store/driver" +) + +const ( + IteratorForward uint8 = 0 + IteratorBackward uint8 = 1 +) + +const ( + RangeClose uint8 = 0x00 + RangeLOpen uint8 = 0x01 + RangeROpen uint8 = 0x10 + RangeOpen uint8 = 0x11 +) + +// min must less or equal than max +// +// range type: +// +// close: [min, max] +// open: (min, max) +// lopen: (min, max] +// ropen: [min, max) +// +type Range struct { + Min []byte + Max []byte + + Type uint8 +} + +type Limit struct { + Offset int + Count int +} + +type Iterator struct { + it driver.IIterator +} + +// Returns a copy of key. +func (it *Iterator) Key() []byte { + k := it.it.Key() + if k == nil { + return nil + } + + return append([]byte{}, k...) +} + +// Returns a copy of value. +func (it *Iterator) Value() []byte { + v := it.it.Value() + if v == nil { + return nil + } + + return append([]byte{}, v...) +} + +// Returns a reference of key. +// you must be careful that it will be changed after next iterate. +func (it *Iterator) RawKey() []byte { + return it.it.Key() +} + +// Returns a reference of value. +// you must be careful that it will be changed after next iterate. +func (it *Iterator) RawValue() []byte { + return it.it.Value() +} + +// Copy key to b, if b len is small or nil, returns a new one. +func (it *Iterator) BufKey(b []byte) []byte { + k := it.RawKey() + if k == nil { + return nil + } + if b == nil { + b = []byte{} + } + + b = b[0:0] + return append(b, k...) +} + +// Copy value to b, if b len is small or nil, returns a new one. +func (it *Iterator) BufValue(b []byte) []byte { + v := it.RawValue() + if v == nil { + return nil + } + + if b == nil { + b = []byte{} + } + + b = b[0:0] + return append(b, v...) +} + +func (it *Iterator) Close() { + if it.it != nil { + it.it.Close() + it.it = nil + } +} + +func (it *Iterator) Valid() bool { + return it.it.Valid() +} + +func (it *Iterator) Next() { + it.it.Next() +} + +func (it *Iterator) Prev() { + it.it.Prev() +} + +func (it *Iterator) SeekToFirst() { + it.it.First() +} + +func (it *Iterator) SeekToLast() { + it.it.Last() +} + +func (it *Iterator) Seek(key []byte) { + it.it.Seek(key) +} + +// Finds by key, if not found, nil returns. +func (it *Iterator) Find(key []byte) []byte { + it.Seek(key) + if it.Valid() { + k := it.RawKey() + if k == nil { + return nil + } else if bytes.Equal(k, key) { + return it.Value() + } + } + + return nil +} + +// Finds by key, if not found, nil returns, else a reference of value returns. +// you must be careful that it will be changed after next iterate. +func (it *Iterator) RawFind(key []byte) []byte { + it.Seek(key) + if it.Valid() { + k := it.RawKey() + if k == nil { + return nil + } else if bytes.Equal(k, key) { + return it.RawValue() + } + } + + return nil +} + +type RangeLimitIterator struct { + it *Iterator + + r *Range + l *Limit + + step int + + //0 for IteratorForward, 1 for IteratorBackward + direction uint8 +} + +func (it *RangeLimitIterator) Key() []byte { + return it.it.Key() +} + +func (it *RangeLimitIterator) Value() []byte { + return it.it.Value() +} + +func (it *RangeLimitIterator) RawKey() []byte { + return it.it.RawKey() +} + +func (it *RangeLimitIterator) RawValue() []byte { + return it.it.RawValue() +} + +func (it *RangeLimitIterator) BufKey(b []byte) []byte { + return it.it.BufKey(b) +} + +func (it *RangeLimitIterator) BufValue(b []byte) []byte { + return it.it.BufValue(b) +} + +func (it *RangeLimitIterator) Valid() bool { + if it.l.Offset < 0 { + return false + } else if !it.it.Valid() { + return false + } else if it.l.Count >= 0 && it.step >= it.l.Count { + return false + } + + if it.direction == IteratorForward { + if it.r.Max != nil { + r := bytes.Compare(it.it.RawKey(), it.r.Max) + if it.r.Type&RangeROpen > 0 { + return !(r >= 0) + } else { + return !(r > 0) + } + } + } else { + if it.r.Min != nil { + r := bytes.Compare(it.it.RawKey(), it.r.Min) + if it.r.Type&RangeLOpen > 0 { + return !(r <= 0) + } else { + return !(r < 0) + } + } + } + + return true +} + +func (it *RangeLimitIterator) Next() { + it.step++ + + if it.direction == IteratorForward { + it.it.Next() + } else { + it.it.Prev() + } +} + +func (it *RangeLimitIterator) Close() { + it.it.Close() +} + +func NewRangeLimitIterator(i *Iterator, r *Range, l *Limit) *RangeLimitIterator { + return rangeLimitIterator(i, r, l, IteratorForward) +} + +func NewRevRangeLimitIterator(i *Iterator, r *Range, l *Limit) *RangeLimitIterator { + return rangeLimitIterator(i, r, l, IteratorBackward) +} + +func NewRangeIterator(i *Iterator, r *Range) *RangeLimitIterator { + return rangeLimitIterator(i, r, &Limit{0, -1}, IteratorForward) +} + +func NewRevRangeIterator(i *Iterator, r *Range) *RangeLimitIterator { + return rangeLimitIterator(i, r, &Limit{0, -1}, IteratorBackward) +} + +func rangeLimitIterator(i *Iterator, r *Range, l *Limit, direction uint8) *RangeLimitIterator { + it := new(RangeLimitIterator) + + it.it = i + + it.r = r + it.l = l + it.direction = direction + + it.step = 0 + + if l.Offset < 0 { + return it + } + + if direction == IteratorForward { + if r.Min == nil { + it.it.SeekToFirst() + } else { + it.it.Seek(r.Min) + + if r.Type&RangeLOpen > 0 { + if it.it.Valid() && bytes.Equal(it.it.RawKey(), r.Min) { + it.it.Next() + } + } + } + } else { + if r.Max == nil { + it.it.SeekToLast() + } else { + it.it.Seek(r.Max) + + if !it.it.Valid() { + it.it.SeekToLast() + } else { + if !bytes.Equal(it.it.RawKey(), r.Max) { + it.it.Prev() + } + } + + if r.Type&RangeROpen > 0 { + if it.it.Valid() && bytes.Equal(it.it.RawKey(), r.Max) { + it.it.Prev() + } + } + } + } + + for i := 0; i < l.Offset; i++ { + if it.it.Valid() { + if it.direction == IteratorForward { + it.it.Next() + } else { + it.it.Prev() + } + } + } + + return it +} diff --git a/vendor/github.com/lunny/nodb/store/snapshot.go b/vendor/github.com/lunny/nodb/store/snapshot.go new file mode 100644 index 0000000000..75ba0497db --- /dev/null +++ b/vendor/github.com/lunny/nodb/store/snapshot.go @@ -0,0 +1,16 @@ +package store + +import ( + "github.com/lunny/nodb/store/driver" +) + +type Snapshot struct { + driver.ISnapshot +} + +func (s *Snapshot) NewIterator() *Iterator { + it := new(Iterator) + it.it = s.ISnapshot.NewIterator() + + return it +} diff --git a/vendor/github.com/lunny/nodb/store/store.go b/vendor/github.com/lunny/nodb/store/store.go new file mode 100644 index 0000000000..5d0ade1bf0 --- /dev/null +++ b/vendor/github.com/lunny/nodb/store/store.go @@ -0,0 +1,51 @@ +package store + +import ( + "fmt" + "os" + "path" + "github.com/lunny/nodb/config" + "github.com/lunny/nodb/store/driver" + + _ "github.com/lunny/nodb/store/goleveldb" +) + +func getStorePath(cfg *config.Config) string { + return path.Join(cfg.DataDir, fmt.Sprintf("%s_data", cfg.DBName)) +} + +func Open(cfg *config.Config) (*DB, error) { + s, err := driver.GetStore(cfg) + if err != nil { + return nil, err + } + + path := getStorePath(cfg) + + if err := os.MkdirAll(path, os.ModePerm); err != nil { + return nil, err + } + + idb, err := s.Open(path, cfg) + if err != nil { + return nil, err + } + + db := &DB{idb} + + return db, nil +} + +func Repair(cfg *config.Config) error { + s, err := driver.GetStore(cfg) + if err != nil { + return err + } + + path := getStorePath(cfg) + + return s.Repair(path, cfg) +} + +func init() { +} diff --git a/vendor/github.com/lunny/nodb/store/tx.go b/vendor/github.com/lunny/nodb/store/tx.go new file mode 100644 index 0000000000..32bcbcda4b --- /dev/null +++ b/vendor/github.com/lunny/nodb/store/tx.go @@ -0,0 +1,42 @@ +package store + +import ( + "github.com/lunny/nodb/store/driver" +) + +type Tx struct { + driver.Tx +} + +func (tx *Tx) NewIterator() *Iterator { + it := new(Iterator) + it.it = tx.Tx.NewIterator() + + return it +} + +func (tx *Tx) NewWriteBatch() WriteBatch { + return tx.Tx.NewWriteBatch() +} + +func (tx *Tx) RangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator { + return NewRangeLimitIterator(tx.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1}) +} + +func (tx *Tx) RevRangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator { + return NewRevRangeLimitIterator(tx.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1}) +} + +//count < 0, unlimit. +// +//offset must >= 0, if < 0, will get nothing. +func (tx *Tx) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator { + return NewRangeLimitIterator(tx.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count}) +} + +//count < 0, unlimit. +// +//offset must >= 0, if < 0, will get nothing. +func (tx *Tx) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator { + return NewRevRangeLimitIterator(tx.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count}) +} diff --git a/vendor/github.com/lunny/nodb/store/writebatch.go b/vendor/github.com/lunny/nodb/store/writebatch.go new file mode 100644 index 0000000000..23e079eba6 --- /dev/null +++ b/vendor/github.com/lunny/nodb/store/writebatch.go @@ -0,0 +1,9 @@ +package store + +import ( + "github.com/lunny/nodb/store/driver" +) + +type WriteBatch interface { + driver.IWriteBatch +} diff --git a/vendor/github.com/lunny/nodb/t_bit.go b/vendor/github.com/lunny/nodb/t_bit.go new file mode 100644 index 0000000000..930d4ba568 --- /dev/null +++ b/vendor/github.com/lunny/nodb/t_bit.go @@ -0,0 +1,922 @@ +package nodb + +import ( + "encoding/binary" + "errors" + "sort" + "time" + + "github.com/lunny/nodb/store" +) + +const ( + OPand uint8 = iota + 1 + OPor + OPxor + OPnot +) + +type BitPair struct { + Pos int32 + Val uint8 +} + +type segBitInfo struct { + Seq uint32 + Off uint32 + Val uint8 +} + +type segBitInfoArray []segBitInfo + +const ( + // byte + segByteWidth uint32 = 9 + segByteSize uint32 = 1 << segByteWidth + + // bit + segBitWidth uint32 = segByteWidth + 3 + segBitSize uint32 = segByteSize << 3 + + maxByteSize uint32 = 8 << 20 + maxSegCount uint32 = maxByteSize / segByteSize + + minSeq uint32 = 0 + maxSeq uint32 = uint32((maxByteSize << 3) - 1) +) + +var bitsInByte = [256]int32{0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, + 4, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 1, 2, 2, 3, 2, 3, + 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, + 5, 5, 6, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, + 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, + 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 1, 2, + 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, + 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, + 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 2, 3, 3, 4, 3, 4, 4, + 5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, + 6, 7, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 4, 5, 5, 6, 5, + 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8} + +var fillBits = [...]uint8{1, 3, 7, 15, 31, 63, 127, 255} + +var emptySegment []byte = make([]byte, segByteSize, segByteSize) + +var fillSegment []byte = func() []byte { + data := make([]byte, segByteSize, segByteSize) + for i := uint32(0); i < segByteSize; i++ { + data[i] = 0xff + } + return data +}() + +var errBinKey = errors.New("invalid bin key") +var errOffset = errors.New("invalid offset") +var errDuplicatePos = errors.New("duplicate bit pos") + +func getBit(sz []byte, offset uint32) uint8 { + index := offset >> 3 + if index >= uint32(len(sz)) { + return 0 // error("overflow") + } + + offset -= index << 3 + return sz[index] >> offset & 1 +} + +func setBit(sz []byte, offset uint32, val uint8) bool { + if val != 1 && val != 0 { + return false // error("invalid val") + } + + index := offset >> 3 + if index >= uint32(len(sz)) { + return false // error("overflow") + } + + offset -= index << 3 + if sz[index]>>offset&1 != val { + sz[index] ^= (1 << offset) + } + return true +} + +func (datas segBitInfoArray) Len() int { + return len(datas) +} + +func (datas segBitInfoArray) Less(i, j int) bool { + res := (datas)[i].Seq < (datas)[j].Seq + if !res && (datas)[i].Seq == (datas)[j].Seq { + res = (datas)[i].Off < (datas)[j].Off + } + return res +} + +func (datas segBitInfoArray) Swap(i, j int) { + datas[i], datas[j] = datas[j], datas[i] +} + +func (db *DB) bEncodeMetaKey(key []byte) []byte { + mk := make([]byte, len(key)+2) + mk[0] = db.index + mk[1] = BitMetaType + + copy(mk[2:], key) + return mk +} + +func (db *DB) bDecodeMetaKey(bkey []byte) ([]byte, error) { + if len(bkey) < 2 || bkey[0] != db.index || bkey[1] != BitMetaType { + return nil, errBinKey + } + + return bkey[2:], nil +} + +func (db *DB) bEncodeBinKey(key []byte, seq uint32) []byte { + bk := make([]byte, len(key)+8) + + pos := 0 + bk[pos] = db.index + pos++ + bk[pos] = BitType + pos++ + + binary.BigEndian.PutUint16(bk[pos:], uint16(len(key))) + pos += 2 + + copy(bk[pos:], key) + pos += len(key) + + binary.BigEndian.PutUint32(bk[pos:], seq) + + return bk +} + +func (db *DB) bDecodeBinKey(bkey []byte) (key []byte, seq uint32, err error) { + if len(bkey) < 8 || bkey[0] != db.index { + err = errBinKey + return + } + + keyLen := binary.BigEndian.Uint16(bkey[2:4]) + if int(keyLen+8) != len(bkey) { + err = errBinKey + return + } + + key = bkey[4 : 4+keyLen] + seq = uint32(binary.BigEndian.Uint32(bkey[4+keyLen:])) + return +} + +func (db *DB) bCapByteSize(seq uint32, off uint32) uint32 { + var offByteSize uint32 = (off >> 3) + 1 + if offByteSize > segByteSize { + offByteSize = segByteSize + } + + return seq<<segByteWidth + offByteSize +} + +func (db *DB) bParseOffset(key []byte, offset int32) (seq uint32, off uint32, err error) { + if offset < 0 { + if tailSeq, tailOff, e := db.bGetMeta(key); e != nil { + err = e + return + } else if tailSeq >= 0 { + offset += int32((uint32(tailSeq)<<segBitWidth | uint32(tailOff)) + 1) + if offset < 0 { + err = errOffset + return + } + } + } + + off = uint32(offset) + + seq = off >> segBitWidth + off &= (segBitSize - 1) + return +} + +func (db *DB) bGetMeta(key []byte) (tailSeq int32, tailOff int32, err error) { + var v []byte + + mk := db.bEncodeMetaKey(key) + v, err = db.bucket.Get(mk) + if err != nil { + return + } + + if v != nil { + tailSeq = int32(binary.LittleEndian.Uint32(v[0:4])) + tailOff = int32(binary.LittleEndian.Uint32(v[4:8])) + } else { + tailSeq = -1 + tailOff = -1 + } + return +} + +func (db *DB) bSetMeta(t *batch, key []byte, tailSeq uint32, tailOff uint32) { + ek := db.bEncodeMetaKey(key) + + buf := make([]byte, 8) + binary.LittleEndian.PutUint32(buf[0:4], tailSeq) + binary.LittleEndian.PutUint32(buf[4:8], tailOff) + + t.Put(ek, buf) + return +} + +func (db *DB) bUpdateMeta(t *batch, key []byte, seq uint32, off uint32) (tailSeq uint32, tailOff uint32, err error) { + var tseq, toff int32 + var update bool = false + + if tseq, toff, err = db.bGetMeta(key); err != nil { + return + } else if tseq < 0 { + update = true + } else { + tailSeq = uint32(MaxInt32(tseq, 0)) + tailOff = uint32(MaxInt32(toff, 0)) + update = (seq > tailSeq || (seq == tailSeq && off > tailOff)) + } + + if update { + db.bSetMeta(t, key, seq, off) + tailSeq = seq + tailOff = off + } + return +} + +func (db *DB) bDelete(t *batch, key []byte) (drop int64) { + mk := db.bEncodeMetaKey(key) + t.Delete(mk) + + minKey := db.bEncodeBinKey(key, minSeq) + maxKey := db.bEncodeBinKey(key, maxSeq) + it := db.bucket.RangeIterator(minKey, maxKey, store.RangeClose) + for ; it.Valid(); it.Next() { + t.Delete(it.RawKey()) + drop++ + } + it.Close() + + return drop +} + +func (db *DB) bGetSegment(key []byte, seq uint32) ([]byte, []byte, error) { + bk := db.bEncodeBinKey(key, seq) + segment, err := db.bucket.Get(bk) + if err != nil { + return bk, nil, err + } + return bk, segment, nil +} + +func (db *DB) bAllocateSegment(key []byte, seq uint32) ([]byte, []byte, error) { + bk, segment, err := db.bGetSegment(key, seq) + if err == nil && segment == nil { + segment = make([]byte, segByteSize, segByteSize) + } + return bk, segment, err +} + +func (db *DB) bIterator(key []byte) *store.RangeLimitIterator { + sk := db.bEncodeBinKey(key, minSeq) + ek := db.bEncodeBinKey(key, maxSeq) + return db.bucket.RangeIterator(sk, ek, store.RangeClose) +} + +func (db *DB) bSegAnd(a []byte, b []byte, res *[]byte) { + if a == nil || b == nil { + *res = nil + return + } + + data := *res + if data == nil { + data = make([]byte, segByteSize, segByteSize) + *res = data + } + + for i := uint32(0); i < segByteSize; i++ { + data[i] = a[i] & b[i] + } + return +} + +func (db *DB) bSegOr(a []byte, b []byte, res *[]byte) { + if a == nil || b == nil { + if a == nil && b == nil { + *res = nil + } else if a == nil { + *res = b + } else { + *res = a + } + return + } + + data := *res + if data == nil { + data = make([]byte, segByteSize, segByteSize) + *res = data + } + + for i := uint32(0); i < segByteSize; i++ { + data[i] = a[i] | b[i] + } + return +} + +func (db *DB) bSegXor(a []byte, b []byte, res *[]byte) { + if a == nil && b == nil { + *res = fillSegment + return + } + + if a == nil { + a = emptySegment + } + + if b == nil { + b = emptySegment + } + + data := *res + if data == nil { + data = make([]byte, segByteSize, segByteSize) + *res = data + } + + for i := uint32(0); i < segByteSize; i++ { + data[i] = a[i] ^ b[i] + } + + return +} + +func (db *DB) bExpireAt(key []byte, when int64) (int64, error) { + t := db.binBatch + t.Lock() + defer t.Unlock() + + if seq, _, err := db.bGetMeta(key); err != nil || seq < 0 { + return 0, err + } else { + db.expireAt(t, BitType, key, when) + if err := t.Commit(); err != nil { + return 0, err + } + } + return 1, nil +} + +func (db *DB) bCountByte(val byte, soff uint32, eoff uint32) int32 { + if soff > eoff { + soff, eoff = eoff, soff + } + + mask := uint8(0) + if soff > 0 { + mask |= fillBits[soff-1] + } + if eoff < 7 { + mask |= (fillBits[7] ^ fillBits[eoff]) + } + mask = fillBits[7] ^ mask + + return bitsInByte[val&mask] +} + +func (db *DB) bCountSeg(key []byte, seq uint32, soff uint32, eoff uint32) (cnt int32, err error) { + if soff >= segBitSize || soff < 0 || + eoff >= segBitSize || eoff < 0 { + return + } + + var segment []byte + if _, segment, err = db.bGetSegment(key, seq); err != nil { + return + } + + if segment == nil { + return + } + + if soff > eoff { + soff, eoff = eoff, soff + } + + headIdx := int(soff >> 3) + endIdx := int(eoff >> 3) + sByteOff := soff - ((soff >> 3) << 3) + eByteOff := eoff - ((eoff >> 3) << 3) + + if headIdx == endIdx { + cnt = db.bCountByte(segment[headIdx], sByteOff, eByteOff) + } else { + cnt = db.bCountByte(segment[headIdx], sByteOff, 7) + + db.bCountByte(segment[endIdx], 0, eByteOff) + } + + // sum up following bytes + for idx, end := headIdx+1, endIdx-1; idx <= end; idx += 1 { + cnt += bitsInByte[segment[idx]] + if idx == end { + break + } + } + + return +} + +func (db *DB) BGet(key []byte) (data []byte, err error) { + if err = checkKeySize(key); err != nil { + return + } + + var ts, to int32 + if ts, to, err = db.bGetMeta(key); err != nil || ts < 0 { + return + } + + var tailSeq, tailOff = uint32(ts), uint32(to) + var capByteSize uint32 = db.bCapByteSize(tailSeq, tailOff) + data = make([]byte, capByteSize, capByteSize) + + minKey := db.bEncodeBinKey(key, minSeq) + maxKey := db.bEncodeBinKey(key, tailSeq) + it := db.bucket.RangeIterator(minKey, maxKey, store.RangeClose) + + var seq, s, e uint32 + for ; it.Valid(); it.Next() { + if _, seq, err = db.bDecodeBinKey(it.RawKey()); err != nil { + data = nil + break + } + + s = seq << segByteWidth + e = MinUInt32(s+segByteSize, capByteSize) + copy(data[s:e], it.RawValue()) + } + it.Close() + + return +} + +func (db *DB) BDelete(key []byte) (drop int64, err error) { + if err = checkKeySize(key); err != nil { + return + } + + t := db.binBatch + t.Lock() + defer t.Unlock() + + drop = db.bDelete(t, key) + db.rmExpire(t, BitType, key) + + err = t.Commit() + return +} + +func (db *DB) BSetBit(key []byte, offset int32, val uint8) (ori uint8, err error) { + if err = checkKeySize(key); err != nil { + return + } + + // todo : check offset + var seq, off uint32 + if seq, off, err = db.bParseOffset(key, offset); err != nil { + return 0, err + } + + var bk, segment []byte + if bk, segment, err = db.bAllocateSegment(key, seq); err != nil { + return 0, err + } + + if segment != nil { + ori = getBit(segment, off) + if setBit(segment, off, val) { + t := db.binBatch + t.Lock() + defer t.Unlock() + + t.Put(bk, segment) + if _, _, e := db.bUpdateMeta(t, key, seq, off); e != nil { + err = e + return + } + + err = t.Commit() + } + } + + return +} + +func (db *DB) BMSetBit(key []byte, args ...BitPair) (place int64, err error) { + if err = checkKeySize(key); err != nil { + return + } + + // (ps : so as to aviod wasting memory copy while calling db.Get() and batch.Put(), + // here we sequence the params by pos, so that we can merge the execution of + // diff pos setting which targets on the same segment respectively. ) + + // #1 : sequence request data + var argCnt = len(args) + var bitInfos segBitInfoArray = make(segBitInfoArray, argCnt) + var seq, off uint32 + + for i, info := range args { + if seq, off, err = db.bParseOffset(key, info.Pos); err != nil { + return + } + + bitInfos[i].Seq = seq + bitInfos[i].Off = off + bitInfos[i].Val = info.Val + } + + sort.Sort(bitInfos) + + for i := 1; i < argCnt; i++ { + if bitInfos[i].Seq == bitInfos[i-1].Seq && bitInfos[i].Off == bitInfos[i-1].Off { + return 0, errDuplicatePos + } + } + + // #2 : execute bit set in order + t := db.binBatch + t.Lock() + defer t.Unlock() + + var curBinKey, curSeg []byte + var curSeq, maxSeq, maxOff uint32 + + for _, info := range bitInfos { + if curSeg != nil && info.Seq != curSeq { + t.Put(curBinKey, curSeg) + curSeg = nil + } + + if curSeg == nil { + curSeq = info.Seq + if curBinKey, curSeg, err = db.bAllocateSegment(key, info.Seq); err != nil { + return + } + + if curSeg == nil { + continue + } + } + + if setBit(curSeg, info.Off, info.Val) { + maxSeq = info.Seq + maxOff = info.Off + place++ + } + } + + if curSeg != nil { + t.Put(curBinKey, curSeg) + } + + // finally, update meta + if place > 0 { + if _, _, err = db.bUpdateMeta(t, key, maxSeq, maxOff); err != nil { + return + } + + err = t.Commit() + } + + return +} + +func (db *DB) BGetBit(key []byte, offset int32) (uint8, error) { + if seq, off, err := db.bParseOffset(key, offset); err != nil { + return 0, err + } else { + _, segment, err := db.bGetSegment(key, seq) + if err != nil { + return 0, err + } + + if segment == nil { + return 0, nil + } else { + return getBit(segment, off), nil + } + } +} + +// func (db *DB) BGetRange(key []byte, start int32, end int32) ([]byte, error) { +// section := make([]byte) + +// return +// } + +func (db *DB) BCount(key []byte, start int32, end int32) (cnt int32, err error) { + var sseq, soff uint32 + if sseq, soff, err = db.bParseOffset(key, start); err != nil { + return + } + + var eseq, eoff uint32 + if eseq, eoff, err = db.bParseOffset(key, end); err != nil { + return + } + + if sseq > eseq || (sseq == eseq && soff > eoff) { + sseq, eseq = eseq, sseq + soff, eoff = eoff, soff + } + + var segCnt int32 + if eseq == sseq { + if segCnt, err = db.bCountSeg(key, sseq, soff, eoff); err != nil { + return 0, err + } + + cnt = segCnt + + } else { + if segCnt, err = db.bCountSeg(key, sseq, soff, segBitSize-1); err != nil { + return 0, err + } else { + cnt += segCnt + } + + if segCnt, err = db.bCountSeg(key, eseq, 0, eoff); err != nil { + return 0, err + } else { + cnt += segCnt + } + } + + // middle segs + var segment []byte + skey := db.bEncodeBinKey(key, sseq) + ekey := db.bEncodeBinKey(key, eseq) + + it := db.bucket.RangeIterator(skey, ekey, store.RangeOpen) + for ; it.Valid(); it.Next() { + segment = it.RawValue() + for _, bt := range segment { + cnt += bitsInByte[bt] + } + } + it.Close() + + return +} + +func (db *DB) BTail(key []byte) (int32, error) { + // effective length of data, the highest bit-pos set in history + tailSeq, tailOff, err := db.bGetMeta(key) + if err != nil { + return 0, err + } + + tail := int32(-1) + if tailSeq >= 0 { + tail = int32(uint32(tailSeq)<<segBitWidth | uint32(tailOff)) + } + + return tail, nil +} + +func (db *DB) BOperation(op uint8, dstkey []byte, srckeys ...[]byte) (blen int32, err error) { + // blen - + // the total bit size of data stored in destination key, + // that is equal to the size of the longest input string. + + var exeOp func([]byte, []byte, *[]byte) + switch op { + case OPand: + exeOp = db.bSegAnd + case OPor: + exeOp = db.bSegOr + case OPxor, OPnot: + exeOp = db.bSegXor + default: + return + } + + if dstkey == nil || srckeys == nil { + return + } + + t := db.binBatch + t.Lock() + defer t.Unlock() + + var srcKseq, srcKoff int32 + var seq, off, maxDstSeq, maxDstOff uint32 + + var keyNum int = len(srckeys) + var validKeyNum int + for i := 0; i < keyNum; i++ { + if srcKseq, srcKoff, err = db.bGetMeta(srckeys[i]); err != nil { + return + } else if srcKseq < 0 { + srckeys[i] = nil + continue + } + + validKeyNum++ + + seq = uint32(srcKseq) + off = uint32(srcKoff) + if seq > maxDstSeq || (seq == maxDstSeq && off > maxDstOff) { + maxDstSeq = seq + maxDstOff = off + } + } + + if (op == OPnot && validKeyNum != 1) || + (op != OPnot && validKeyNum < 2) { + return // with not enough existing source key + } + + var srcIdx int + for srcIdx = 0; srcIdx < keyNum; srcIdx++ { + if srckeys[srcIdx] != nil { + break + } + } + + // init - data + var segments = make([][]byte, maxDstSeq+1) + + if op == OPnot { + // ps : + // ( ~num == num ^ 0x11111111 ) + // we init the result segments with all bit set, + // then we can calculate through the way of 'xor'. + + // ahead segments bin format : 1111 ... 1111 + for i := uint32(0); i < maxDstSeq; i++ { + segments[i] = fillSegment + } + + // last segment bin format : 1111..1100..0000 + var tailSeg = make([]byte, segByteSize, segByteSize) + var fillByte = fillBits[7] + var tailSegLen = db.bCapByteSize(uint32(0), maxDstOff) + for i := uint32(0); i < tailSegLen-1; i++ { + tailSeg[i] = fillByte + } + tailSeg[tailSegLen-1] = fillBits[maxDstOff-(tailSegLen-1)<<3] + segments[maxDstSeq] = tailSeg + + } else { + // ps : init segments by data corresponding to the 1st valid source key + it := db.bIterator(srckeys[srcIdx]) + for ; it.Valid(); it.Next() { + if _, seq, err = db.bDecodeBinKey(it.RawKey()); err != nil { + // to do ... + it.Close() + return + } + segments[seq] = it.Value() + } + it.Close() + srcIdx++ + } + + // operation with following keys + var res []byte + for i := srcIdx; i < keyNum; i++ { + if srckeys[i] == nil { + continue + } + + it := db.bIterator(srckeys[i]) + for idx, end := uint32(0), false; !end; it.Next() { + end = !it.Valid() + if !end { + if _, seq, err = db.bDecodeBinKey(it.RawKey()); err != nil { + // to do ... + it.Close() + return + } + } else { + seq = maxDstSeq + 1 + } + + // todo : + // operation 'and' can be optimize here : + // if seq > max_segments_idx, this loop can be break, + // which can avoid cost from Key() and bDecodeBinKey() + + for ; idx < seq; idx++ { + res = nil + exeOp(segments[idx], nil, &res) + segments[idx] = res + } + + if !end { + res = it.Value() + exeOp(segments[seq], res, &res) + segments[seq] = res + idx++ + } + } + it.Close() + } + + // clear the old data in case + db.bDelete(t, dstkey) + db.rmExpire(t, BitType, dstkey) + + // set data + db.bSetMeta(t, dstkey, maxDstSeq, maxDstOff) + + var bk []byte + for seq, segt := range segments { + if segt != nil { + bk = db.bEncodeBinKey(dstkey, uint32(seq)) + t.Put(bk, segt) + } + } + + err = t.Commit() + if err == nil { + // blen = int32(db.bCapByteSize(maxDstOff, maxDstOff)) + blen = int32(maxDstSeq<<segBitWidth | maxDstOff + 1) + } + + return +} + +func (db *DB) BExpire(key []byte, duration int64) (int64, error) { + if duration <= 0 { + return 0, errExpireValue + } + + if err := checkKeySize(key); err != nil { + return -1, err + } + + return db.bExpireAt(key, time.Now().Unix()+duration) +} + +func (db *DB) BExpireAt(key []byte, when int64) (int64, error) { + if when <= time.Now().Unix() { + return 0, errExpireValue + } + + if err := checkKeySize(key); err != nil { + return -1, err + } + + return db.bExpireAt(key, when) +} + +func (db *DB) BTTL(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return -1, err + } + + return db.ttl(BitType, key) +} + +func (db *DB) BPersist(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } + + t := db.binBatch + t.Lock() + defer t.Unlock() + + n, err := db.rmExpire(t, BitType, key) + if err != nil { + return 0, err + } + + err = t.Commit() + return n, err +} + +func (db *DB) BScan(key []byte, count int, inclusive bool, match string) ([][]byte, error) { + return db.scan(BitMetaType, key, count, inclusive, match) +} + +func (db *DB) bFlush() (drop int64, err error) { + t := db.binBatch + t.Lock() + defer t.Unlock() + + return db.flushType(t, BitType) +} diff --git a/vendor/github.com/lunny/nodb/t_hash.go b/vendor/github.com/lunny/nodb/t_hash.go new file mode 100644 index 0000000000..bedfbf7c3e --- /dev/null +++ b/vendor/github.com/lunny/nodb/t_hash.go @@ -0,0 +1,509 @@ +package nodb + +import ( + "encoding/binary" + "errors" + "time" + + "github.com/lunny/nodb/store" +) + +type FVPair struct { + Field []byte + Value []byte +} + +var errHashKey = errors.New("invalid hash key") +var errHSizeKey = errors.New("invalid hsize key") + +const ( + hashStartSep byte = ':' + hashStopSep byte = hashStartSep + 1 +) + +func checkHashKFSize(key []byte, field []byte) error { + if len(key) > MaxKeySize || len(key) == 0 { + return errKeySize + } else if len(field) > MaxHashFieldSize || len(field) == 0 { + return errHashFieldSize + } + return nil +} + +func (db *DB) hEncodeSizeKey(key []byte) []byte { + buf := make([]byte, len(key)+2) + + buf[0] = db.index + buf[1] = HSizeType + + copy(buf[2:], key) + return buf +} + +func (db *DB) hDecodeSizeKey(ek []byte) ([]byte, error) { + if len(ek) < 2 || ek[0] != db.index || ek[1] != HSizeType { + return nil, errHSizeKey + } + + return ek[2:], nil +} + +func (db *DB) hEncodeHashKey(key []byte, field []byte) []byte { + buf := make([]byte, len(key)+len(field)+1+1+2+1) + + pos := 0 + buf[pos] = db.index + pos++ + buf[pos] = HashType + pos++ + + binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) + pos += 2 + + copy(buf[pos:], key) + pos += len(key) + + buf[pos] = hashStartSep + pos++ + copy(buf[pos:], field) + + return buf +} + +func (db *DB) hDecodeHashKey(ek []byte) ([]byte, []byte, error) { + if len(ek) < 5 || ek[0] != db.index || ek[1] != HashType { + return nil, nil, errHashKey + } + + pos := 2 + keyLen := int(binary.BigEndian.Uint16(ek[pos:])) + pos += 2 + + if keyLen+5 > len(ek) { + return nil, nil, errHashKey + } + + key := ek[pos : pos+keyLen] + pos += keyLen + + if ek[pos] != hashStartSep { + return nil, nil, errHashKey + } + + pos++ + field := ek[pos:] + return key, field, nil +} + +func (db *DB) hEncodeStartKey(key []byte) []byte { + return db.hEncodeHashKey(key, nil) +} + +func (db *DB) hEncodeStopKey(key []byte) []byte { + k := db.hEncodeHashKey(key, nil) + + k[len(k)-1] = hashStopSep + + return k +} + +func (db *DB) hSetItem(key []byte, field []byte, value []byte) (int64, error) { + t := db.hashBatch + + ek := db.hEncodeHashKey(key, field) + + var n int64 = 1 + if v, _ := db.bucket.Get(ek); v != nil { + n = 0 + } else { + if _, err := db.hIncrSize(key, 1); err != nil { + return 0, err + } + } + + t.Put(ek, value) + return n, nil +} + +// ps : here just focus on deleting the hash data, +// any other likes expire is ignore. +func (db *DB) hDelete(t *batch, key []byte) int64 { + sk := db.hEncodeSizeKey(key) + start := db.hEncodeStartKey(key) + stop := db.hEncodeStopKey(key) + + var num int64 = 0 + it := db.bucket.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1) + for ; it.Valid(); it.Next() { + t.Delete(it.Key()) + num++ + } + it.Close() + + t.Delete(sk) + return num +} + +func (db *DB) hExpireAt(key []byte, when int64) (int64, error) { + t := db.hashBatch + t.Lock() + defer t.Unlock() + + if hlen, err := db.HLen(key); err != nil || hlen == 0 { + return 0, err + } else { + db.expireAt(t, HashType, key, when) + if err := t.Commit(); err != nil { + return 0, err + } + } + return 1, nil +} + +func (db *DB) HLen(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } + + return Int64(db.bucket.Get(db.hEncodeSizeKey(key))) +} + +func (db *DB) HSet(key []byte, field []byte, value []byte) (int64, error) { + if err := checkHashKFSize(key, field); err != nil { + return 0, err + } else if err := checkValueSize(value); err != nil { + return 0, err + } + + t := db.hashBatch + t.Lock() + defer t.Unlock() + + n, err := db.hSetItem(key, field, value) + if err != nil { + return 0, err + } + + //todo add binlog + + err = t.Commit() + return n, err +} + +func (db *DB) HGet(key []byte, field []byte) ([]byte, error) { + if err := checkHashKFSize(key, field); err != nil { + return nil, err + } + + return db.bucket.Get(db.hEncodeHashKey(key, field)) +} + +func (db *DB) HMset(key []byte, args ...FVPair) error { + t := db.hashBatch + t.Lock() + defer t.Unlock() + + var err error + var ek []byte + var num int64 = 0 + for i := 0; i < len(args); i++ { + if err := checkHashKFSize(key, args[i].Field); err != nil { + return err + } else if err := checkValueSize(args[i].Value); err != nil { + return err + } + + ek = db.hEncodeHashKey(key, args[i].Field) + + if v, err := db.bucket.Get(ek); err != nil { + return err + } else if v == nil { + num++ + } + + t.Put(ek, args[i].Value) + } + + if _, err = db.hIncrSize(key, num); err != nil { + return err + } + + //todo add binglog + err = t.Commit() + return err +} + +func (db *DB) HMget(key []byte, args ...[]byte) ([][]byte, error) { + var ek []byte + + it := db.bucket.NewIterator() + defer it.Close() + + r := make([][]byte, len(args)) + for i := 0; i < len(args); i++ { + if err := checkHashKFSize(key, args[i]); err != nil { + return nil, err + } + + ek = db.hEncodeHashKey(key, args[i]) + + r[i] = it.Find(ek) + } + + return r, nil +} + +func (db *DB) HDel(key []byte, args ...[]byte) (int64, error) { + t := db.hashBatch + + var ek []byte + var v []byte + var err error + + t.Lock() + defer t.Unlock() + + it := db.bucket.NewIterator() + defer it.Close() + + var num int64 = 0 + for i := 0; i < len(args); i++ { + if err := checkHashKFSize(key, args[i]); err != nil { + return 0, err + } + + ek = db.hEncodeHashKey(key, args[i]) + + v = it.RawFind(ek) + if v == nil { + continue + } else { + num++ + t.Delete(ek) + } + } + + if _, err = db.hIncrSize(key, -num); err != nil { + return 0, err + } + + err = t.Commit() + + return num, err +} + +func (db *DB) hIncrSize(key []byte, delta int64) (int64, error) { + t := db.hashBatch + sk := db.hEncodeSizeKey(key) + + var err error + var size int64 = 0 + if size, err = Int64(db.bucket.Get(sk)); err != nil { + return 0, err + } else { + size += delta + if size <= 0 { + size = 0 + t.Delete(sk) + db.rmExpire(t, HashType, key) + } else { + t.Put(sk, PutInt64(size)) + } + } + + return size, nil +} + +func (db *DB) HIncrBy(key []byte, field []byte, delta int64) (int64, error) { + if err := checkHashKFSize(key, field); err != nil { + return 0, err + } + + t := db.hashBatch + var ek []byte + var err error + + t.Lock() + defer t.Unlock() + + ek = db.hEncodeHashKey(key, field) + + var n int64 = 0 + if n, err = StrInt64(db.bucket.Get(ek)); err != nil { + return 0, err + } + + n += delta + + _, err = db.hSetItem(key, field, StrPutInt64(n)) + if err != nil { + return 0, err + } + + err = t.Commit() + + return n, err +} + +func (db *DB) HGetAll(key []byte) ([]FVPair, error) { + if err := checkKeySize(key); err != nil { + return nil, err + } + + start := db.hEncodeStartKey(key) + stop := db.hEncodeStopKey(key) + + v := make([]FVPair, 0, 16) + + it := db.bucket.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1) + for ; it.Valid(); it.Next() { + _, f, err := db.hDecodeHashKey(it.Key()) + if err != nil { + return nil, err + } + + v = append(v, FVPair{Field: f, Value: it.Value()}) + } + + it.Close() + + return v, nil +} + +func (db *DB) HKeys(key []byte) ([][]byte, error) { + if err := checkKeySize(key); err != nil { + return nil, err + } + + start := db.hEncodeStartKey(key) + stop := db.hEncodeStopKey(key) + + v := make([][]byte, 0, 16) + + it := db.bucket.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1) + for ; it.Valid(); it.Next() { + _, f, err := db.hDecodeHashKey(it.Key()) + if err != nil { + return nil, err + } + v = append(v, f) + } + + it.Close() + + return v, nil +} + +func (db *DB) HValues(key []byte) ([][]byte, error) { + if err := checkKeySize(key); err != nil { + return nil, err + } + + start := db.hEncodeStartKey(key) + stop := db.hEncodeStopKey(key) + + v := make([][]byte, 0, 16) + + it := db.bucket.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1) + for ; it.Valid(); it.Next() { + _, _, err := db.hDecodeHashKey(it.Key()) + if err != nil { + return nil, err + } + + v = append(v, it.Value()) + } + + it.Close() + + return v, nil +} + +func (db *DB) HClear(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } + + t := db.hashBatch + t.Lock() + defer t.Unlock() + + num := db.hDelete(t, key) + db.rmExpire(t, HashType, key) + + err := t.Commit() + return num, err +} + +func (db *DB) HMclear(keys ...[]byte) (int64, error) { + t := db.hashBatch + t.Lock() + defer t.Unlock() + + for _, key := range keys { + if err := checkKeySize(key); err != nil { + return 0, err + } + + db.hDelete(t, key) + db.rmExpire(t, HashType, key) + } + + err := t.Commit() + return int64(len(keys)), err +} + +func (db *DB) hFlush() (drop int64, err error) { + t := db.hashBatch + + t.Lock() + defer t.Unlock() + + return db.flushType(t, HashType) +} + +func (db *DB) HScan(key []byte, count int, inclusive bool, match string) ([][]byte, error) { + return db.scan(HSizeType, key, count, inclusive, match) +} + +func (db *DB) HExpire(key []byte, duration int64) (int64, error) { + if duration <= 0 { + return 0, errExpireValue + } + + return db.hExpireAt(key, time.Now().Unix()+duration) +} + +func (db *DB) HExpireAt(key []byte, when int64) (int64, error) { + if when <= time.Now().Unix() { + return 0, errExpireValue + } + + return db.hExpireAt(key, when) +} + +func (db *DB) HTTL(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return -1, err + } + + return db.ttl(HashType, key) +} + +func (db *DB) HPersist(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } + + t := db.hashBatch + t.Lock() + defer t.Unlock() + + n, err := db.rmExpire(t, HashType, key) + if err != nil { + return 0, err + } + + err = t.Commit() + return n, err +} diff --git a/vendor/github.com/lunny/nodb/t_kv.go b/vendor/github.com/lunny/nodb/t_kv.go new file mode 100644 index 0000000000..82a12f7027 --- /dev/null +++ b/vendor/github.com/lunny/nodb/t_kv.go @@ -0,0 +1,387 @@ +package nodb + +import ( + "errors" + "time" +) + +type KVPair struct { + Key []byte + Value []byte +} + +var errKVKey = errors.New("invalid encode kv key") + +func checkKeySize(key []byte) error { + if len(key) > MaxKeySize || len(key) == 0 { + return errKeySize + } + return nil +} + +func checkValueSize(value []byte) error { + if len(value) > MaxValueSize { + return errValueSize + } + + return nil +} + +func (db *DB) encodeKVKey(key []byte) []byte { + ek := make([]byte, len(key)+2) + ek[0] = db.index + ek[1] = KVType + copy(ek[2:], key) + return ek +} + +func (db *DB) decodeKVKey(ek []byte) ([]byte, error) { + if len(ek) < 2 || ek[0] != db.index || ek[1] != KVType { + return nil, errKVKey + } + + return ek[2:], nil +} + +func (db *DB) encodeKVMinKey() []byte { + ek := db.encodeKVKey(nil) + return ek +} + +func (db *DB) encodeKVMaxKey() []byte { + ek := db.encodeKVKey(nil) + ek[len(ek)-1] = KVType + 1 + return ek +} + +func (db *DB) incr(key []byte, delta int64) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } + + var err error + key = db.encodeKVKey(key) + + t := db.kvBatch + + t.Lock() + defer t.Unlock() + + var n int64 + n, err = StrInt64(db.bucket.Get(key)) + if err != nil { + return 0, err + } + + n += delta + + t.Put(key, StrPutInt64(n)) + + //todo binlog + + err = t.Commit() + return n, err +} + +// ps : here just focus on deleting the key-value data, +// any other likes expire is ignore. +func (db *DB) delete(t *batch, key []byte) int64 { + key = db.encodeKVKey(key) + t.Delete(key) + return 1 +} + +func (db *DB) setExpireAt(key []byte, when int64) (int64, error) { + t := db.kvBatch + t.Lock() + defer t.Unlock() + + if exist, err := db.Exists(key); err != nil || exist == 0 { + return 0, err + } else { + db.expireAt(t, KVType, key, when) + if err := t.Commit(); err != nil { + return 0, err + } + } + return 1, nil +} + +func (db *DB) Decr(key []byte) (int64, error) { + return db.incr(key, -1) +} + +func (db *DB) DecrBy(key []byte, decrement int64) (int64, error) { + return db.incr(key, -decrement) +} + +func (db *DB) Del(keys ...[]byte) (int64, error) { + if len(keys) == 0 { + return 0, nil + } + + codedKeys := make([][]byte, len(keys)) + for i, k := range keys { + codedKeys[i] = db.encodeKVKey(k) + } + + t := db.kvBatch + t.Lock() + defer t.Unlock() + + for i, k := range keys { + t.Delete(codedKeys[i]) + db.rmExpire(t, KVType, k) + } + + err := t.Commit() + return int64(len(keys)), err +} + +func (db *DB) Exists(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } + + var err error + key = db.encodeKVKey(key) + + var v []byte + v, err = db.bucket.Get(key) + if v != nil && err == nil { + return 1, nil + } + + return 0, err +} + +func (db *DB) Get(key []byte) ([]byte, error) { + if err := checkKeySize(key); err != nil { + return nil, err + } + + key = db.encodeKVKey(key) + + return db.bucket.Get(key) +} + +func (db *DB) GetSet(key []byte, value []byte) ([]byte, error) { + if err := checkKeySize(key); err != nil { + return nil, err + } else if err := checkValueSize(value); err != nil { + return nil, err + } + + key = db.encodeKVKey(key) + + t := db.kvBatch + + t.Lock() + defer t.Unlock() + + oldValue, err := db.bucket.Get(key) + if err != nil { + return nil, err + } + + t.Put(key, value) + //todo, binlog + + err = t.Commit() + + return oldValue, err +} + +func (db *DB) Incr(key []byte) (int64, error) { + return db.incr(key, 1) +} + +func (db *DB) IncrBy(key []byte, increment int64) (int64, error) { + return db.incr(key, increment) +} + +func (db *DB) MGet(keys ...[]byte) ([][]byte, error) { + values := make([][]byte, len(keys)) + + it := db.bucket.NewIterator() + defer it.Close() + + for i := range keys { + if err := checkKeySize(keys[i]); err != nil { + return nil, err + } + + values[i] = it.Find(db.encodeKVKey(keys[i])) + } + + return values, nil +} + +func (db *DB) MSet(args ...KVPair) error { + if len(args) == 0 { + return nil + } + + t := db.kvBatch + + var err error + var key []byte + var value []byte + + t.Lock() + defer t.Unlock() + + for i := 0; i < len(args); i++ { + if err := checkKeySize(args[i].Key); err != nil { + return err + } else if err := checkValueSize(args[i].Value); err != nil { + return err + } + + key = db.encodeKVKey(args[i].Key) + + value = args[i].Value + + t.Put(key, value) + + //todo binlog + } + + err = t.Commit() + return err +} + +func (db *DB) Set(key []byte, value []byte) error { + if err := checkKeySize(key); err != nil { + return err + } else if err := checkValueSize(value); err != nil { + return err + } + + var err error + key = db.encodeKVKey(key) + + t := db.kvBatch + + t.Lock() + defer t.Unlock() + + t.Put(key, value) + + err = t.Commit() + + return err +} + +func (db *DB) SetNX(key []byte, value []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } else if err := checkValueSize(value); err != nil { + return 0, err + } + + var err error + key = db.encodeKVKey(key) + + var n int64 = 1 + + t := db.kvBatch + + t.Lock() + defer t.Unlock() + + if v, err := db.bucket.Get(key); err != nil { + return 0, err + } else if v != nil { + n = 0 + } else { + t.Put(key, value) + + //todo binlog + + err = t.Commit() + } + + return n, err +} + +func (db *DB) flush() (drop int64, err error) { + t := db.kvBatch + t.Lock() + defer t.Unlock() + return db.flushType(t, KVType) +} + +//if inclusive is true, scan range [key, inf) else (key, inf) +func (db *DB) Scan(key []byte, count int, inclusive bool, match string) ([][]byte, error) { + return db.scan(KVType, key, count, inclusive, match) +} + +func (db *DB) Expire(key []byte, duration int64) (int64, error) { + if duration <= 0 { + return 0, errExpireValue + } + + return db.setExpireAt(key, time.Now().Unix()+duration) +} + +func (db *DB) ExpireAt(key []byte, when int64) (int64, error) { + if when <= time.Now().Unix() { + return 0, errExpireValue + } + + return db.setExpireAt(key, when) +} + +func (db *DB) TTL(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return -1, err + } + + return db.ttl(KVType, key) +} + +func (db *DB) Persist(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } + + t := db.kvBatch + t.Lock() + defer t.Unlock() + n, err := db.rmExpire(t, KVType, key) + if err != nil { + return 0, err + } + + err = t.Commit() + return n, err +} + +func (db *DB) Lock() { + t := db.kvBatch + t.Lock() +} + +func (db *DB) Remove(key []byte) bool { + if len(key) == 0 { + return false + } + t := db.kvBatch + t.Delete(db.encodeKVKey(key)) + _, err := db.rmExpire(t, KVType, key) + if err != nil { + return false + } + return true +} + +func (db *DB) Commit() error { + t := db.kvBatch + return t.Commit() +} + +func (db *DB) Unlock() { + t := db.kvBatch + t.Unlock() +} diff --git a/vendor/github.com/lunny/nodb/t_list.go b/vendor/github.com/lunny/nodb/t_list.go new file mode 100644 index 0000000000..5b9d9d9c21 --- /dev/null +++ b/vendor/github.com/lunny/nodb/t_list.go @@ -0,0 +1,492 @@ +package nodb + +import ( + "encoding/binary" + "errors" + "time" + + "github.com/lunny/nodb/store" +) + +const ( + listHeadSeq int32 = 1 + listTailSeq int32 = 2 + + listMinSeq int32 = 1000 + listMaxSeq int32 = 1<<31 - 1000 + listInitialSeq int32 = listMinSeq + (listMaxSeq-listMinSeq)/2 +) + +var errLMetaKey = errors.New("invalid lmeta key") +var errListKey = errors.New("invalid list key") +var errListSeq = errors.New("invalid list sequence, overflow") + +func (db *DB) lEncodeMetaKey(key []byte) []byte { + buf := make([]byte, len(key)+2) + buf[0] = db.index + buf[1] = LMetaType + + copy(buf[2:], key) + return buf +} + +func (db *DB) lDecodeMetaKey(ek []byte) ([]byte, error) { + if len(ek) < 2 || ek[0] != db.index || ek[1] != LMetaType { + return nil, errLMetaKey + } + + return ek[2:], nil +} + +func (db *DB) lEncodeListKey(key []byte, seq int32) []byte { + buf := make([]byte, len(key)+8) + + pos := 0 + buf[pos] = db.index + pos++ + buf[pos] = ListType + pos++ + + binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) + pos += 2 + + copy(buf[pos:], key) + pos += len(key) + + binary.BigEndian.PutUint32(buf[pos:], uint32(seq)) + + return buf +} + +func (db *DB) lDecodeListKey(ek []byte) (key []byte, seq int32, err error) { + if len(ek) < 8 || ek[0] != db.index || ek[1] != ListType { + err = errListKey + return + } + + keyLen := int(binary.BigEndian.Uint16(ek[2:])) + if keyLen+8 != len(ek) { + err = errListKey + return + } + + key = ek[4 : 4+keyLen] + seq = int32(binary.BigEndian.Uint32(ek[4+keyLen:])) + return +} + +func (db *DB) lpush(key []byte, whereSeq int32, args ...[]byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } + + var headSeq int32 + var tailSeq int32 + var size int32 + var err error + + t := db.listBatch + t.Lock() + defer t.Unlock() + + metaKey := db.lEncodeMetaKey(key) + headSeq, tailSeq, size, err = db.lGetMeta(nil, metaKey) + if err != nil { + return 0, err + } + + var pushCnt int = len(args) + if pushCnt == 0 { + return int64(size), nil + } + + var seq int32 = headSeq + var delta int32 = -1 + if whereSeq == listTailSeq { + seq = tailSeq + delta = 1 + } + + // append elements + if size > 0 { + seq += delta + } + + for i := 0; i < pushCnt; i++ { + ek := db.lEncodeListKey(key, seq+int32(i)*delta) + t.Put(ek, args[i]) + } + + seq += int32(pushCnt-1) * delta + if seq <= listMinSeq || seq >= listMaxSeq { + return 0, errListSeq + } + + // set meta info + if whereSeq == listHeadSeq { + headSeq = seq + } else { + tailSeq = seq + } + + db.lSetMeta(metaKey, headSeq, tailSeq) + + err = t.Commit() + return int64(size) + int64(pushCnt), err +} + +func (db *DB) lpop(key []byte, whereSeq int32) ([]byte, error) { + if err := checkKeySize(key); err != nil { + return nil, err + } + + t := db.listBatch + t.Lock() + defer t.Unlock() + + var headSeq int32 + var tailSeq int32 + var err error + + metaKey := db.lEncodeMetaKey(key) + headSeq, tailSeq, _, err = db.lGetMeta(nil, metaKey) + if err != nil { + return nil, err + } + + var value []byte + + var seq int32 = headSeq + if whereSeq == listTailSeq { + seq = tailSeq + } + + itemKey := db.lEncodeListKey(key, seq) + value, err = db.bucket.Get(itemKey) + if err != nil { + return nil, err + } + + if whereSeq == listHeadSeq { + headSeq += 1 + } else { + tailSeq -= 1 + } + + t.Delete(itemKey) + size := db.lSetMeta(metaKey, headSeq, tailSeq) + if size == 0 { + db.rmExpire(t, HashType, key) + } + + err = t.Commit() + return value, err +} + +// ps : here just focus on deleting the list data, +// any other likes expire is ignore. +func (db *DB) lDelete(t *batch, key []byte) int64 { + mk := db.lEncodeMetaKey(key) + + var headSeq int32 + var tailSeq int32 + var err error + + it := db.bucket.NewIterator() + defer it.Close() + + headSeq, tailSeq, _, err = db.lGetMeta(it, mk) + if err != nil { + return 0 + } + + var num int64 = 0 + startKey := db.lEncodeListKey(key, headSeq) + stopKey := db.lEncodeListKey(key, tailSeq) + + rit := store.NewRangeIterator(it, &store.Range{startKey, stopKey, store.RangeClose}) + for ; rit.Valid(); rit.Next() { + t.Delete(rit.RawKey()) + num++ + } + + t.Delete(mk) + + return num +} + +func (db *DB) lGetMeta(it *store.Iterator, ek []byte) (headSeq int32, tailSeq int32, size int32, err error) { + var v []byte + if it != nil { + v = it.Find(ek) + } else { + v, err = db.bucket.Get(ek) + } + if err != nil { + return + } else if v == nil { + headSeq = listInitialSeq + tailSeq = listInitialSeq + size = 0 + return + } else { + headSeq = int32(binary.LittleEndian.Uint32(v[0:4])) + tailSeq = int32(binary.LittleEndian.Uint32(v[4:8])) + size = tailSeq - headSeq + 1 + } + return +} + +func (db *DB) lSetMeta(ek []byte, headSeq int32, tailSeq int32) int32 { + t := db.listBatch + + var size int32 = tailSeq - headSeq + 1 + if size < 0 { + // todo : log error + panic + } else if size == 0 { + t.Delete(ek) + } else { + buf := make([]byte, 8) + + binary.LittleEndian.PutUint32(buf[0:4], uint32(headSeq)) + binary.LittleEndian.PutUint32(buf[4:8], uint32(tailSeq)) + + t.Put(ek, buf) + } + + return size +} + +func (db *DB) lExpireAt(key []byte, when int64) (int64, error) { + t := db.listBatch + t.Lock() + defer t.Unlock() + + if llen, err := db.LLen(key); err != nil || llen == 0 { + return 0, err + } else { + db.expireAt(t, ListType, key, when) + if err := t.Commit(); err != nil { + return 0, err + } + } + return 1, nil +} + +func (db *DB) LIndex(key []byte, index int32) ([]byte, error) { + if err := checkKeySize(key); err != nil { + return nil, err + } + + var seq int32 + var headSeq int32 + var tailSeq int32 + var err error + + metaKey := db.lEncodeMetaKey(key) + + it := db.bucket.NewIterator() + defer it.Close() + + headSeq, tailSeq, _, err = db.lGetMeta(it, metaKey) + if err != nil { + return nil, err + } + + if index >= 0 { + seq = headSeq + index + } else { + seq = tailSeq + index + 1 + } + + sk := db.lEncodeListKey(key, seq) + v := it.Find(sk) + + return v, nil +} + +func (db *DB) LLen(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } + + ek := db.lEncodeMetaKey(key) + _, _, size, err := db.lGetMeta(nil, ek) + return int64(size), err +} + +func (db *DB) LPop(key []byte) ([]byte, error) { + return db.lpop(key, listHeadSeq) +} + +func (db *DB) LPush(key []byte, arg1 []byte, args ...[]byte) (int64, error) { + var argss = [][]byte{arg1} + argss = append(argss, args...) + return db.lpush(key, listHeadSeq, argss...) +} + +func (db *DB) LRange(key []byte, start int32, stop int32) ([][]byte, error) { + if err := checkKeySize(key); err != nil { + return nil, err + } + + var headSeq int32 + var llen int32 + var err error + + metaKey := db.lEncodeMetaKey(key) + + it := db.bucket.NewIterator() + defer it.Close() + + if headSeq, _, llen, err = db.lGetMeta(it, metaKey); err != nil { + return nil, err + } + + if start < 0 { + start = llen + start + } + if stop < 0 { + stop = llen + stop + } + if start < 0 { + start = 0 + } + + if start > stop || start >= llen { + return [][]byte{}, nil + } + + if stop >= llen { + stop = llen - 1 + } + + limit := (stop - start) + 1 + headSeq += start + + v := make([][]byte, 0, limit) + + startKey := db.lEncodeListKey(key, headSeq) + rit := store.NewRangeLimitIterator(it, + &store.Range{ + Min: startKey, + Max: nil, + Type: store.RangeClose}, + &store.Limit{ + Offset: 0, + Count: int(limit)}) + + for ; rit.Valid(); rit.Next() { + v = append(v, rit.Value()) + } + + return v, nil +} + +func (db *DB) RPop(key []byte) ([]byte, error) { + return db.lpop(key, listTailSeq) +} + +func (db *DB) RPush(key []byte, arg1 []byte, args ...[]byte) (int64, error) { + var argss = [][]byte{arg1} + argss = append(argss, args...) + return db.lpush(key, listTailSeq, argss...) +} + +func (db *DB) LClear(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } + + t := db.listBatch + t.Lock() + defer t.Unlock() + + num := db.lDelete(t, key) + db.rmExpire(t, ListType, key) + + err := t.Commit() + return num, err +} + +func (db *DB) LMclear(keys ...[]byte) (int64, error) { + t := db.listBatch + t.Lock() + defer t.Unlock() + + for _, key := range keys { + if err := checkKeySize(key); err != nil { + return 0, err + } + + db.lDelete(t, key) + db.rmExpire(t, ListType, key) + + } + + err := t.Commit() + return int64(len(keys)), err +} + +func (db *DB) lFlush() (drop int64, err error) { + t := db.listBatch + t.Lock() + defer t.Unlock() + return db.flushType(t, ListType) +} + +func (db *DB) LExpire(key []byte, duration int64) (int64, error) { + if duration <= 0 { + return 0, errExpireValue + } + + return db.lExpireAt(key, time.Now().Unix()+duration) +} + +func (db *DB) LExpireAt(key []byte, when int64) (int64, error) { + if when <= time.Now().Unix() { + return 0, errExpireValue + } + + return db.lExpireAt(key, when) +} + +func (db *DB) LTTL(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return -1, err + } + + return db.ttl(ListType, key) +} + +func (db *DB) LPersist(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } + + t := db.listBatch + t.Lock() + defer t.Unlock() + + n, err := db.rmExpire(t, ListType, key) + if err != nil { + return 0, err + } + + err = t.Commit() + return n, err +} + +func (db *DB) LScan(key []byte, count int, inclusive bool, match string) ([][]byte, error) { + return db.scan(LMetaType, key, count, inclusive, match) +} + +func (db *DB) lEncodeMinKey() []byte { + return db.lEncodeMetaKey(nil) +} + +func (db *DB) lEncodeMaxKey() []byte { + ek := db.lEncodeMetaKey(nil) + ek[len(ek)-1] = LMetaType + 1 + return ek +} diff --git a/vendor/github.com/lunny/nodb/t_set.go b/vendor/github.com/lunny/nodb/t_set.go new file mode 100644 index 0000000000..41ce30e8ce --- /dev/null +++ b/vendor/github.com/lunny/nodb/t_set.go @@ -0,0 +1,601 @@ +package nodb + +import ( + "encoding/binary" + "errors" + "time" + + "github.com/lunny/nodb/store" +) + +var errSetKey = errors.New("invalid set key") +var errSSizeKey = errors.New("invalid ssize key") + +const ( + setStartSep byte = ':' + setStopSep byte = setStartSep + 1 + UnionType byte = 51 + DiffType byte = 52 + InterType byte = 53 +) + +func checkSetKMSize(key []byte, member []byte) error { + if len(key) > MaxKeySize || len(key) == 0 { + return errKeySize + } else if len(member) > MaxSetMemberSize || len(member) == 0 { + return errSetMemberSize + } + return nil +} + +func (db *DB) sEncodeSizeKey(key []byte) []byte { + buf := make([]byte, len(key)+2) + + buf[0] = db.index + buf[1] = SSizeType + + copy(buf[2:], key) + return buf +} + +func (db *DB) sDecodeSizeKey(ek []byte) ([]byte, error) { + if len(ek) < 2 || ek[0] != db.index || ek[1] != SSizeType { + return nil, errSSizeKey + } + + return ek[2:], nil +} + +func (db *DB) sEncodeSetKey(key []byte, member []byte) []byte { + buf := make([]byte, len(key)+len(member)+1+1+2+1) + + pos := 0 + buf[pos] = db.index + pos++ + buf[pos] = SetType + pos++ + + binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) + pos += 2 + + copy(buf[pos:], key) + pos += len(key) + + buf[pos] = setStartSep + pos++ + copy(buf[pos:], member) + + return buf +} + +func (db *DB) sDecodeSetKey(ek []byte) ([]byte, []byte, error) { + if len(ek) < 5 || ek[0] != db.index || ek[1] != SetType { + return nil, nil, errSetKey + } + + pos := 2 + keyLen := int(binary.BigEndian.Uint16(ek[pos:])) + pos += 2 + + if keyLen+5 > len(ek) { + return nil, nil, errSetKey + } + + key := ek[pos : pos+keyLen] + pos += keyLen + + if ek[pos] != hashStartSep { + return nil, nil, errSetKey + } + + pos++ + member := ek[pos:] + return key, member, nil +} + +func (db *DB) sEncodeStartKey(key []byte) []byte { + return db.sEncodeSetKey(key, nil) +} + +func (db *DB) sEncodeStopKey(key []byte) []byte { + k := db.sEncodeSetKey(key, nil) + + k[len(k)-1] = setStopSep + + return k +} + +func (db *DB) sFlush() (drop int64, err error) { + + t := db.setBatch + t.Lock() + defer t.Unlock() + + return db.flushType(t, SetType) +} + +func (db *DB) sDelete(t *batch, key []byte) int64 { + sk := db.sEncodeSizeKey(key) + start := db.sEncodeStartKey(key) + stop := db.sEncodeStopKey(key) + + var num int64 = 0 + it := db.bucket.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1) + for ; it.Valid(); it.Next() { + t.Delete(it.RawKey()) + num++ + } + + it.Close() + + t.Delete(sk) + return num +} + +func (db *DB) sIncrSize(key []byte, delta int64) (int64, error) { + t := db.setBatch + sk := db.sEncodeSizeKey(key) + + var err error + var size int64 = 0 + if size, err = Int64(db.bucket.Get(sk)); err != nil { + return 0, err + } else { + size += delta + if size <= 0 { + size = 0 + t.Delete(sk) + db.rmExpire(t, SetType, key) + } else { + t.Put(sk, PutInt64(size)) + } + } + + return size, nil +} + +func (db *DB) sExpireAt(key []byte, when int64) (int64, error) { + t := db.setBatch + t.Lock() + defer t.Unlock() + + if scnt, err := db.SCard(key); err != nil || scnt == 0 { + return 0, err + } else { + db.expireAt(t, SetType, key, when) + if err := t.Commit(); err != nil { + return 0, err + } + + } + + return 1, nil +} + +func (db *DB) sSetItem(key []byte, member []byte) (int64, error) { + t := db.setBatch + ek := db.sEncodeSetKey(key, member) + + var n int64 = 1 + if v, _ := db.bucket.Get(ek); v != nil { + n = 0 + } else { + if _, err := db.sIncrSize(key, 1); err != nil { + return 0, err + } + } + + t.Put(ek, nil) + return n, nil +} + +func (db *DB) SAdd(key []byte, args ...[]byte) (int64, error) { + t := db.setBatch + t.Lock() + defer t.Unlock() + + var err error + var ek []byte + var num int64 = 0 + for i := 0; i < len(args); i++ { + if err := checkSetKMSize(key, args[i]); err != nil { + return 0, err + } + + ek = db.sEncodeSetKey(key, args[i]) + + if v, err := db.bucket.Get(ek); err != nil { + return 0, err + } else if v == nil { + num++ + } + + t.Put(ek, nil) + } + + if _, err = db.sIncrSize(key, num); err != nil { + return 0, err + } + + err = t.Commit() + return num, err + +} + +func (db *DB) SCard(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } + + sk := db.sEncodeSizeKey(key) + + return Int64(db.bucket.Get(sk)) +} + +func (db *DB) sDiffGeneric(keys ...[]byte) ([][]byte, error) { + destMap := make(map[string]bool) + + members, err := db.SMembers(keys[0]) + if err != nil { + return nil, err + } + + for _, m := range members { + destMap[String(m)] = true + } + + for _, k := range keys[1:] { + members, err := db.SMembers(k) + if err != nil { + return nil, err + } + + for _, m := range members { + if _, ok := destMap[String(m)]; !ok { + continue + } else if ok { + delete(destMap, String(m)) + } + } + // O - A = O, O is zero set. + if len(destMap) == 0 { + return nil, nil + } + } + + slice := make([][]byte, len(destMap)) + idx := 0 + for k, v := range destMap { + if !v { + continue + } + slice[idx] = []byte(k) + idx++ + } + + return slice, nil +} + +func (db *DB) SDiff(keys ...[]byte) ([][]byte, error) { + v, err := db.sDiffGeneric(keys...) + return v, err +} + +func (db *DB) SDiffStore(dstKey []byte, keys ...[]byte) (int64, error) { + n, err := db.sStoreGeneric(dstKey, DiffType, keys...) + return n, err +} + +func (db *DB) sInterGeneric(keys ...[]byte) ([][]byte, error) { + destMap := make(map[string]bool) + + members, err := db.SMembers(keys[0]) + if err != nil { + return nil, err + } + + for _, m := range members { + destMap[String(m)] = true + } + + for _, key := range keys[1:] { + if err := checkKeySize(key); err != nil { + return nil, err + } + + members, err := db.SMembers(key) + if err != nil { + return nil, err + } else if len(members) == 0 { + return nil, err + } + + tempMap := make(map[string]bool) + for _, member := range members { + if err := checkKeySize(member); err != nil { + return nil, err + } + if _, ok := destMap[String(member)]; ok { + tempMap[String(member)] = true //mark this item as selected + } + } + destMap = tempMap //reduce the size of the result set + if len(destMap) == 0 { + return nil, nil + } + } + + slice := make([][]byte, len(destMap)) + idx := 0 + for k, v := range destMap { + if !v { + continue + } + + slice[idx] = []byte(k) + idx++ + } + + return slice, nil + +} + +func (db *DB) SInter(keys ...[]byte) ([][]byte, error) { + v, err := db.sInterGeneric(keys...) + return v, err + +} + +func (db *DB) SInterStore(dstKey []byte, keys ...[]byte) (int64, error) { + n, err := db.sStoreGeneric(dstKey, InterType, keys...) + return n, err +} + +func (db *DB) SIsMember(key []byte, member []byte) (int64, error) { + ek := db.sEncodeSetKey(key, member) + + var n int64 = 1 + if v, err := db.bucket.Get(ek); err != nil { + return 0, err + } else if v == nil { + n = 0 + } + return n, nil +} + +func (db *DB) SMembers(key []byte) ([][]byte, error) { + if err := checkKeySize(key); err != nil { + return nil, err + } + + start := db.sEncodeStartKey(key) + stop := db.sEncodeStopKey(key) + + v := make([][]byte, 0, 16) + + it := db.bucket.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1) + for ; it.Valid(); it.Next() { + _, m, err := db.sDecodeSetKey(it.Key()) + if err != nil { + return nil, err + } + + v = append(v, m) + } + + it.Close() + + return v, nil +} + +func (db *DB) SRem(key []byte, args ...[]byte) (int64, error) { + t := db.setBatch + t.Lock() + defer t.Unlock() + + var ek []byte + var v []byte + var err error + + it := db.bucket.NewIterator() + defer it.Close() + + var num int64 = 0 + for i := 0; i < len(args); i++ { + if err := checkSetKMSize(key, args[i]); err != nil { + return 0, err + } + + ek = db.sEncodeSetKey(key, args[i]) + + v = it.RawFind(ek) + if v == nil { + continue + } else { + num++ + t.Delete(ek) + } + } + + if _, err = db.sIncrSize(key, -num); err != nil { + return 0, err + } + + err = t.Commit() + return num, err + +} + +func (db *DB) sUnionGeneric(keys ...[]byte) ([][]byte, error) { + dstMap := make(map[string]bool) + + for _, key := range keys { + if err := checkKeySize(key); err != nil { + return nil, err + } + + members, err := db.SMembers(key) + if err != nil { + return nil, err + } + + for _, member := range members { + dstMap[String(member)] = true + } + } + + slice := make([][]byte, len(dstMap)) + idx := 0 + for k, v := range dstMap { + if !v { + continue + } + slice[idx] = []byte(k) + idx++ + } + + return slice, nil +} + +func (db *DB) SUnion(keys ...[]byte) ([][]byte, error) { + v, err := db.sUnionGeneric(keys...) + return v, err +} + +func (db *DB) SUnionStore(dstKey []byte, keys ...[]byte) (int64, error) { + n, err := db.sStoreGeneric(dstKey, UnionType, keys...) + return n, err +} + +func (db *DB) sStoreGeneric(dstKey []byte, optType byte, keys ...[]byte) (int64, error) { + if err := checkKeySize(dstKey); err != nil { + return 0, err + } + + t := db.setBatch + t.Lock() + defer t.Unlock() + + db.sDelete(t, dstKey) + + var err error + var ek []byte + var v [][]byte + + switch optType { + case UnionType: + v, err = db.sUnionGeneric(keys...) + case DiffType: + v, err = db.sDiffGeneric(keys...) + case InterType: + v, err = db.sInterGeneric(keys...) + } + + if err != nil { + return 0, err + } + + for _, m := range v { + if err := checkSetKMSize(dstKey, m); err != nil { + return 0, err + } + + ek = db.sEncodeSetKey(dstKey, m) + + if _, err := db.bucket.Get(ek); err != nil { + return 0, err + } + + t.Put(ek, nil) + } + + var num = int64(len(v)) + sk := db.sEncodeSizeKey(dstKey) + t.Put(sk, PutInt64(num)) + + if err = t.Commit(); err != nil { + return 0, err + } + return num, nil +} + +func (db *DB) SClear(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } + + t := db.setBatch + t.Lock() + defer t.Unlock() + + num := db.sDelete(t, key) + db.rmExpire(t, SetType, key) + + err := t.Commit() + return num, err +} + +func (db *DB) SMclear(keys ...[]byte) (int64, error) { + t := db.setBatch + t.Lock() + defer t.Unlock() + + for _, key := range keys { + if err := checkKeySize(key); err != nil { + return 0, err + } + + db.sDelete(t, key) + db.rmExpire(t, SetType, key) + } + + err := t.Commit() + return int64(len(keys)), err +} + +func (db *DB) SExpire(key []byte, duration int64) (int64, error) { + if duration <= 0 { + return 0, errExpireValue + } + + return db.sExpireAt(key, time.Now().Unix()+duration) + +} + +func (db *DB) SExpireAt(key []byte, when int64) (int64, error) { + if when <= time.Now().Unix() { + return 0, errExpireValue + } + + return db.sExpireAt(key, when) + +} + +func (db *DB) STTL(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return -1, err + } + + return db.ttl(SetType, key) +} + +func (db *DB) SPersist(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } + + t := db.setBatch + t.Lock() + defer t.Unlock() + + n, err := db.rmExpire(t, SetType, key) + if err != nil { + return 0, err + } + err = t.Commit() + return n, err +} + +func (db *DB) SScan(key []byte, count int, inclusive bool, match string) ([][]byte, error) { + return db.scan(SSizeType, key, count, inclusive, match) +} diff --git a/vendor/github.com/lunny/nodb/t_ttl.go b/vendor/github.com/lunny/nodb/t_ttl.go new file mode 100644 index 0000000000..5c3638891c --- /dev/null +++ b/vendor/github.com/lunny/nodb/t_ttl.go @@ -0,0 +1,195 @@ +package nodb + +import ( + "encoding/binary" + "errors" + "time" + + "github.com/lunny/nodb/store" +) + +var ( + errExpMetaKey = errors.New("invalid expire meta key") + errExpTimeKey = errors.New("invalid expire time key") +) + +type retireCallback func(*batch, []byte) int64 + +type elimination struct { + db *DB + exp2Tx []*batch + exp2Retire []retireCallback +} + +var errExpType = errors.New("invalid expire type") + +func (db *DB) expEncodeTimeKey(dataType byte, key []byte, when int64) []byte { + buf := make([]byte, len(key)+11) + + buf[0] = db.index + buf[1] = ExpTimeType + buf[2] = dataType + pos := 3 + + binary.BigEndian.PutUint64(buf[pos:], uint64(when)) + pos += 8 + + copy(buf[pos:], key) + + return buf +} + +func (db *DB) expEncodeMetaKey(dataType byte, key []byte) []byte { + buf := make([]byte, len(key)+3) + + buf[0] = db.index + buf[1] = ExpMetaType + buf[2] = dataType + pos := 3 + + copy(buf[pos:], key) + + return buf +} + +func (db *DB) expDecodeMetaKey(mk []byte) (byte, []byte, error) { + if len(mk) <= 3 || mk[0] != db.index || mk[1] != ExpMetaType { + return 0, nil, errExpMetaKey + } + + return mk[2], mk[3:], nil +} + +func (db *DB) expDecodeTimeKey(tk []byte) (byte, []byte, int64, error) { + if len(tk) < 11 || tk[0] != db.index || tk[1] != ExpTimeType { + return 0, nil, 0, errExpTimeKey + } + + return tk[2], tk[11:], int64(binary.BigEndian.Uint64(tk[3:])), nil +} + +func (db *DB) expire(t *batch, dataType byte, key []byte, duration int64) { + db.expireAt(t, dataType, key, time.Now().Unix()+duration) +} + +func (db *DB) expireAt(t *batch, dataType byte, key []byte, when int64) { + mk := db.expEncodeMetaKey(dataType, key) + tk := db.expEncodeTimeKey(dataType, key, when) + + t.Put(tk, mk) + t.Put(mk, PutInt64(when)) +} + +func (db *DB) ttl(dataType byte, key []byte) (t int64, err error) { + mk := db.expEncodeMetaKey(dataType, key) + + if t, err = Int64(db.bucket.Get(mk)); err != nil || t == 0 { + t = -1 + } else { + t -= time.Now().Unix() + if t <= 0 { + t = -1 + } + // if t == -1 : to remove ???? + } + + return t, err +} + +func (db *DB) rmExpire(t *batch, dataType byte, key []byte) (int64, error) { + mk := db.expEncodeMetaKey(dataType, key) + if v, err := db.bucket.Get(mk); err != nil { + return 0, err + } else if v == nil { + return 0, nil + } else if when, err2 := Int64(v, nil); err2 != nil { + return 0, err2 + } else { + tk := db.expEncodeTimeKey(dataType, key, when) + t.Delete(mk) + t.Delete(tk) + return 1, nil + } +} + +func (db *DB) expFlush(t *batch, dataType byte) (err error) { + minKey := make([]byte, 3) + minKey[0] = db.index + minKey[1] = ExpTimeType + minKey[2] = dataType + + maxKey := make([]byte, 3) + maxKey[0] = db.index + maxKey[1] = ExpMetaType + maxKey[2] = dataType + 1 + + _, err = db.flushRegion(t, minKey, maxKey) + err = t.Commit() + return +} + +////////////////////////////////////////////////////////// +// +////////////////////////////////////////////////////////// + +func newEliminator(db *DB) *elimination { + eli := new(elimination) + eli.db = db + eli.exp2Tx = make([]*batch, maxDataType) + eli.exp2Retire = make([]retireCallback, maxDataType) + return eli +} + +func (eli *elimination) regRetireContext(dataType byte, t *batch, onRetire retireCallback) { + + // todo .. need to ensure exist - mapExpMetaType[expType] + + eli.exp2Tx[dataType] = t + eli.exp2Retire[dataType] = onRetire +} + +// call by outside ... (from *db to another *db) +func (eli *elimination) active() { + now := time.Now().Unix() + db := eli.db + dbGet := db.bucket.Get + + minKey := db.expEncodeTimeKey(NoneType, nil, 0) + maxKey := db.expEncodeTimeKey(maxDataType, nil, now) + + it := db.bucket.RangeLimitIterator(minKey, maxKey, store.RangeROpen, 0, -1) + for ; it.Valid(); it.Next() { + tk := it.RawKey() + mk := it.RawValue() + + dt, k, _, err := db.expDecodeTimeKey(tk) + if err != nil { + continue + } + + t := eli.exp2Tx[dt] + onRetire := eli.exp2Retire[dt] + if tk == nil || onRetire == nil { + continue + } + + t.Lock() + + if exp, err := Int64(dbGet(mk)); err == nil { + // check expire again + if exp <= now { + onRetire(t, k) + t.Delete(tk) + t.Delete(mk) + + t.Commit() + } + + } + + t.Unlock() + } + it.Close() + + return +} diff --git a/vendor/github.com/lunny/nodb/t_zset.go b/vendor/github.com/lunny/nodb/t_zset.go new file mode 100644 index 0000000000..d0ffb7ccf3 --- /dev/null +++ b/vendor/github.com/lunny/nodb/t_zset.go @@ -0,0 +1,943 @@ +package nodb + +import ( + "bytes" + "encoding/binary" + "errors" + "time" + + "github.com/lunny/nodb/store" +) + +const ( + MinScore int64 = -1<<63 + 1 + MaxScore int64 = 1<<63 - 1 + InvalidScore int64 = -1 << 63 + + AggregateSum byte = 0 + AggregateMin byte = 1 + AggregateMax byte = 2 +) + +type ScorePair struct { + Score int64 + Member []byte +} + +var errZSizeKey = errors.New("invalid zsize key") +var errZSetKey = errors.New("invalid zset key") +var errZScoreKey = errors.New("invalid zscore key") +var errScoreOverflow = errors.New("zset score overflow") +var errInvalidAggregate = errors.New("invalid aggregate") +var errInvalidWeightNum = errors.New("invalid weight number") +var errInvalidSrcKeyNum = errors.New("invalid src key number") + +const ( + zsetNScoreSep byte = '<' + zsetPScoreSep byte = zsetNScoreSep + 1 + zsetStopScoreSep byte = zsetPScoreSep + 1 + + zsetStartMemSep byte = ':' + zsetStopMemSep byte = zsetStartMemSep + 1 +) + +func checkZSetKMSize(key []byte, member []byte) error { + if len(key) > MaxKeySize || len(key) == 0 { + return errKeySize + } else if len(member) > MaxZSetMemberSize || len(member) == 0 { + return errZSetMemberSize + } + return nil +} + +func (db *DB) zEncodeSizeKey(key []byte) []byte { + buf := make([]byte, len(key)+2) + buf[0] = db.index + buf[1] = ZSizeType + + copy(buf[2:], key) + return buf +} + +func (db *DB) zDecodeSizeKey(ek []byte) ([]byte, error) { + if len(ek) < 2 || ek[0] != db.index || ek[1] != ZSizeType { + return nil, errZSizeKey + } + + return ek[2:], nil +} + +func (db *DB) zEncodeSetKey(key []byte, member []byte) []byte { + buf := make([]byte, len(key)+len(member)+5) + + pos := 0 + buf[pos] = db.index + pos++ + + buf[pos] = ZSetType + pos++ + + binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) + pos += 2 + + copy(buf[pos:], key) + pos += len(key) + + buf[pos] = zsetStartMemSep + pos++ + + copy(buf[pos:], member) + + return buf +} + +func (db *DB) zDecodeSetKey(ek []byte) ([]byte, []byte, error) { + if len(ek) < 5 || ek[0] != db.index || ek[1] != ZSetType { + return nil, nil, errZSetKey + } + + keyLen := int(binary.BigEndian.Uint16(ek[2:])) + if keyLen+5 > len(ek) { + return nil, nil, errZSetKey + } + + key := ek[4 : 4+keyLen] + + if ek[4+keyLen] != zsetStartMemSep { + return nil, nil, errZSetKey + } + + member := ek[5+keyLen:] + return key, member, nil +} + +func (db *DB) zEncodeStartSetKey(key []byte) []byte { + k := db.zEncodeSetKey(key, nil) + return k +} + +func (db *DB) zEncodeStopSetKey(key []byte) []byte { + k := db.zEncodeSetKey(key, nil) + k[len(k)-1] = zsetStartMemSep + 1 + return k +} + +func (db *DB) zEncodeScoreKey(key []byte, member []byte, score int64) []byte { + buf := make([]byte, len(key)+len(member)+14) + + pos := 0 + buf[pos] = db.index + pos++ + + buf[pos] = ZScoreType + pos++ + + binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) + pos += 2 + + copy(buf[pos:], key) + pos += len(key) + + if score < 0 { + buf[pos] = zsetNScoreSep + } else { + buf[pos] = zsetPScoreSep + } + + pos++ + binary.BigEndian.PutUint64(buf[pos:], uint64(score)) + pos += 8 + + buf[pos] = zsetStartMemSep + pos++ + + copy(buf[pos:], member) + return buf +} + +func (db *DB) zEncodeStartScoreKey(key []byte, score int64) []byte { + return db.zEncodeScoreKey(key, nil, score) +} + +func (db *DB) zEncodeStopScoreKey(key []byte, score int64) []byte { + k := db.zEncodeScoreKey(key, nil, score) + k[len(k)-1] = zsetStopMemSep + return k +} + +func (db *DB) zDecodeScoreKey(ek []byte) (key []byte, member []byte, score int64, err error) { + if len(ek) < 14 || ek[0] != db.index || ek[1] != ZScoreType { + err = errZScoreKey + return + } + + keyLen := int(binary.BigEndian.Uint16(ek[2:])) + if keyLen+14 > len(ek) { + err = errZScoreKey + return + } + + key = ek[4 : 4+keyLen] + pos := 4 + keyLen + + if (ek[pos] != zsetNScoreSep) && (ek[pos] != zsetPScoreSep) { + err = errZScoreKey + return + } + pos++ + + score = int64(binary.BigEndian.Uint64(ek[pos:])) + pos += 8 + + if ek[pos] != zsetStartMemSep { + err = errZScoreKey + return + } + + pos++ + + member = ek[pos:] + return +} + +func (db *DB) zSetItem(t *batch, key []byte, score int64, member []byte) (int64, error) { + if score <= MinScore || score >= MaxScore { + return 0, errScoreOverflow + } + + var exists int64 = 0 + ek := db.zEncodeSetKey(key, member) + + if v, err := db.bucket.Get(ek); err != nil { + return 0, err + } else if v != nil { + exists = 1 + + if s, err := Int64(v, err); err != nil { + return 0, err + } else { + sk := db.zEncodeScoreKey(key, member, s) + t.Delete(sk) + } + } + + t.Put(ek, PutInt64(score)) + + sk := db.zEncodeScoreKey(key, member, score) + t.Put(sk, []byte{}) + + return exists, nil +} + +func (db *DB) zDelItem(t *batch, key []byte, member []byte, skipDelScore bool) (int64, error) { + ek := db.zEncodeSetKey(key, member) + if v, err := db.bucket.Get(ek); err != nil { + return 0, err + } else if v == nil { + //not exists + return 0, nil + } else { + //exists + if !skipDelScore { + //we must del score + if s, err := Int64(v, err); err != nil { + return 0, err + } else { + sk := db.zEncodeScoreKey(key, member, s) + t.Delete(sk) + } + } + } + + t.Delete(ek) + + return 1, nil +} + +func (db *DB) zDelete(t *batch, key []byte) int64 { + delMembCnt, _ := db.zRemRange(t, key, MinScore, MaxScore, 0, -1) + // todo : log err + return delMembCnt +} + +func (db *DB) zExpireAt(key []byte, when int64) (int64, error) { + t := db.zsetBatch + t.Lock() + defer t.Unlock() + + if zcnt, err := db.ZCard(key); err != nil || zcnt == 0 { + return 0, err + } else { + db.expireAt(t, ZSetType, key, when) + if err := t.Commit(); err != nil { + return 0, err + } + } + return 1, nil +} + +func (db *DB) ZAdd(key []byte, args ...ScorePair) (int64, error) { + if len(args) == 0 { + return 0, nil + } + + t := db.zsetBatch + t.Lock() + defer t.Unlock() + + var num int64 = 0 + for i := 0; i < len(args); i++ { + score := args[i].Score + member := args[i].Member + + if err := checkZSetKMSize(key, member); err != nil { + return 0, err + } + + if n, err := db.zSetItem(t, key, score, member); err != nil { + return 0, err + } else if n == 0 { + //add new + num++ + } + } + + if _, err := db.zIncrSize(t, key, num); err != nil { + return 0, err + } + + //todo add binlog + err := t.Commit() + return num, err +} + +func (db *DB) zIncrSize(t *batch, key []byte, delta int64) (int64, error) { + sk := db.zEncodeSizeKey(key) + + size, err := Int64(db.bucket.Get(sk)) + if err != nil { + return 0, err + } else { + size += delta + if size <= 0 { + size = 0 + t.Delete(sk) + db.rmExpire(t, ZSetType, key) + } else { + t.Put(sk, PutInt64(size)) + } + } + + return size, nil +} + +func (db *DB) ZCard(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } + + sk := db.zEncodeSizeKey(key) + return Int64(db.bucket.Get(sk)) +} + +func (db *DB) ZScore(key []byte, member []byte) (int64, error) { + if err := checkZSetKMSize(key, member); err != nil { + return InvalidScore, err + } + + var score int64 = InvalidScore + + k := db.zEncodeSetKey(key, member) + if v, err := db.bucket.Get(k); err != nil { + return InvalidScore, err + } else if v == nil { + return InvalidScore, ErrScoreMiss + } else { + if score, err = Int64(v, nil); err != nil { + return InvalidScore, err + } + } + + return score, nil +} + +func (db *DB) ZRem(key []byte, members ...[]byte) (int64, error) { + if len(members) == 0 { + return 0, nil + } + + t := db.zsetBatch + t.Lock() + defer t.Unlock() + + var num int64 = 0 + for i := 0; i < len(members); i++ { + if err := checkZSetKMSize(key, members[i]); err != nil { + return 0, err + } + + if n, err := db.zDelItem(t, key, members[i], false); err != nil { + return 0, err + } else if n == 1 { + num++ + } + } + + if _, err := db.zIncrSize(t, key, -num); err != nil { + return 0, err + } + + err := t.Commit() + return num, err +} + +func (db *DB) ZIncrBy(key []byte, delta int64, member []byte) (int64, error) { + if err := checkZSetKMSize(key, member); err != nil { + return InvalidScore, err + } + + t := db.zsetBatch + t.Lock() + defer t.Unlock() + + ek := db.zEncodeSetKey(key, member) + + var oldScore int64 = 0 + v, err := db.bucket.Get(ek) + if err != nil { + return InvalidScore, err + } else if v == nil { + db.zIncrSize(t, key, 1) + } else { + if oldScore, err = Int64(v, err); err != nil { + return InvalidScore, err + } + } + + newScore := oldScore + delta + if newScore >= MaxScore || newScore <= MinScore { + return InvalidScore, errScoreOverflow + } + + sk := db.zEncodeScoreKey(key, member, newScore) + t.Put(sk, []byte{}) + t.Put(ek, PutInt64(newScore)) + + if v != nil { + // so as to update score, we must delete the old one + oldSk := db.zEncodeScoreKey(key, member, oldScore) + t.Delete(oldSk) + } + + err = t.Commit() + return newScore, err +} + +func (db *DB) ZCount(key []byte, min int64, max int64) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } + minKey := db.zEncodeStartScoreKey(key, min) + maxKey := db.zEncodeStopScoreKey(key, max) + + rangeType := store.RangeROpen + + it := db.bucket.RangeLimitIterator(minKey, maxKey, rangeType, 0, -1) + var n int64 = 0 + for ; it.Valid(); it.Next() { + n++ + } + it.Close() + + return n, nil +} + +func (db *DB) zrank(key []byte, member []byte, reverse bool) (int64, error) { + if err := checkZSetKMSize(key, member); err != nil { + return 0, err + } + + k := db.zEncodeSetKey(key, member) + + it := db.bucket.NewIterator() + defer it.Close() + + if v := it.Find(k); v == nil { + return -1, nil + } else { + if s, err := Int64(v, nil); err != nil { + return 0, err + } else { + var rit *store.RangeLimitIterator + + sk := db.zEncodeScoreKey(key, member, s) + + if !reverse { + minKey := db.zEncodeStartScoreKey(key, MinScore) + + rit = store.NewRangeIterator(it, &store.Range{minKey, sk, store.RangeClose}) + } else { + maxKey := db.zEncodeStopScoreKey(key, MaxScore) + rit = store.NewRevRangeIterator(it, &store.Range{sk, maxKey, store.RangeClose}) + } + + var lastKey []byte = nil + var n int64 = 0 + + for ; rit.Valid(); rit.Next() { + n++ + + lastKey = rit.BufKey(lastKey) + } + + if _, m, _, err := db.zDecodeScoreKey(lastKey); err == nil && bytes.Equal(m, member) { + n-- + return n, nil + } + } + } + + return -1, nil +} + +func (db *DB) zIterator(key []byte, min int64, max int64, offset int, count int, reverse bool) *store.RangeLimitIterator { + minKey := db.zEncodeStartScoreKey(key, min) + maxKey := db.zEncodeStopScoreKey(key, max) + + if !reverse { + return db.bucket.RangeLimitIterator(minKey, maxKey, store.RangeClose, offset, count) + } else { + return db.bucket.RevRangeLimitIterator(minKey, maxKey, store.RangeClose, offset, count) + } +} + +func (db *DB) zRemRange(t *batch, key []byte, min int64, max int64, offset int, count int) (int64, error) { + if len(key) > MaxKeySize { + return 0, errKeySize + } + + it := db.zIterator(key, min, max, offset, count, false) + var num int64 = 0 + for ; it.Valid(); it.Next() { + sk := it.RawKey() + _, m, _, err := db.zDecodeScoreKey(sk) + if err != nil { + continue + } + + if n, err := db.zDelItem(t, key, m, true); err != nil { + return 0, err + } else if n == 1 { + num++ + } + + t.Delete(sk) + } + it.Close() + + if _, err := db.zIncrSize(t, key, -num); err != nil { + return 0, err + } + + return num, nil +} + +func (db *DB) zRange(key []byte, min int64, max int64, offset int, count int, reverse bool) ([]ScorePair, error) { + if len(key) > MaxKeySize { + return nil, errKeySize + } + + if offset < 0 { + return []ScorePair{}, nil + } + + nv := 64 + if count > 0 { + nv = count + } + + v := make([]ScorePair, 0, nv) + + var it *store.RangeLimitIterator + + //if reverse and offset is 0, count < 0, we may use forward iterator then reverse + //because store iterator prev is slower than next + if !reverse || (offset == 0 && count < 0) { + it = db.zIterator(key, min, max, offset, count, false) + } else { + it = db.zIterator(key, min, max, offset, count, true) + } + + for ; it.Valid(); it.Next() { + _, m, s, err := db.zDecodeScoreKey(it.Key()) + //may be we will check key equal? + if err != nil { + continue + } + + v = append(v, ScorePair{Member: m, Score: s}) + } + it.Close() + + if reverse && (offset == 0 && count < 0) { + for i, j := 0, len(v)-1; i < j; i, j = i+1, j-1 { + v[i], v[j] = v[j], v[i] + } + } + + return v, nil +} + +func (db *DB) zParseLimit(key []byte, start int, stop int) (offset int, count int, err error) { + if start < 0 || stop < 0 { + //refer redis implementation + var size int64 + size, err = db.ZCard(key) + if err != nil { + return + } + + llen := int(size) + + if start < 0 { + start = llen + start + } + if stop < 0 { + stop = llen + stop + } + + if start < 0 { + start = 0 + } + + if start >= llen { + offset = -1 + return + } + } + + if start > stop { + offset = -1 + return + } + + offset = start + count = (stop - start) + 1 + return +} + +func (db *DB) ZClear(key []byte) (int64, error) { + t := db.zsetBatch + t.Lock() + defer t.Unlock() + + rmCnt, err := db.zRemRange(t, key, MinScore, MaxScore, 0, -1) + if err == nil { + err = t.Commit() + } + + return rmCnt, err +} + +func (db *DB) ZMclear(keys ...[]byte) (int64, error) { + t := db.zsetBatch + t.Lock() + defer t.Unlock() + + for _, key := range keys { + if _, err := db.zRemRange(t, key, MinScore, MaxScore, 0, -1); err != nil { + return 0, err + } + } + + err := t.Commit() + + return int64(len(keys)), err +} + +func (db *DB) ZRange(key []byte, start int, stop int) ([]ScorePair, error) { + return db.ZRangeGeneric(key, start, stop, false) +} + +//min and max must be inclusive +//if no limit, set offset = 0 and count = -1 +func (db *DB) ZRangeByScore(key []byte, min int64, max int64, + offset int, count int) ([]ScorePair, error) { + return db.ZRangeByScoreGeneric(key, min, max, offset, count, false) +} + +func (db *DB) ZRank(key []byte, member []byte) (int64, error) { + return db.zrank(key, member, false) +} + +func (db *DB) ZRemRangeByRank(key []byte, start int, stop int) (int64, error) { + offset, count, err := db.zParseLimit(key, start, stop) + if err != nil { + return 0, err + } + + var rmCnt int64 + + t := db.zsetBatch + t.Lock() + defer t.Unlock() + + rmCnt, err = db.zRemRange(t, key, MinScore, MaxScore, offset, count) + if err == nil { + err = t.Commit() + } + + return rmCnt, err +} + +//min and max must be inclusive +func (db *DB) ZRemRangeByScore(key []byte, min int64, max int64) (int64, error) { + t := db.zsetBatch + t.Lock() + defer t.Unlock() + + rmCnt, err := db.zRemRange(t, key, min, max, 0, -1) + if err == nil { + err = t.Commit() + } + + return rmCnt, err +} + +func (db *DB) ZRevRange(key []byte, start int, stop int) ([]ScorePair, error) { + return db.ZRangeGeneric(key, start, stop, true) +} + +func (db *DB) ZRevRank(key []byte, member []byte) (int64, error) { + return db.zrank(key, member, true) +} + +//min and max must be inclusive +//if no limit, set offset = 0 and count = -1 +func (db *DB) ZRevRangeByScore(key []byte, min int64, max int64, offset int, count int) ([]ScorePair, error) { + return db.ZRangeByScoreGeneric(key, min, max, offset, count, true) +} + +func (db *DB) ZRangeGeneric(key []byte, start int, stop int, reverse bool) ([]ScorePair, error) { + offset, count, err := db.zParseLimit(key, start, stop) + if err != nil { + return nil, err + } + + return db.zRange(key, MinScore, MaxScore, offset, count, reverse) +} + +//min and max must be inclusive +//if no limit, set offset = 0 and count = -1 +func (db *DB) ZRangeByScoreGeneric(key []byte, min int64, max int64, + offset int, count int, reverse bool) ([]ScorePair, error) { + + return db.zRange(key, min, max, offset, count, reverse) +} + +func (db *DB) zFlush() (drop int64, err error) { + t := db.zsetBatch + t.Lock() + defer t.Unlock() + return db.flushType(t, ZSetType) +} + +func (db *DB) ZExpire(key []byte, duration int64) (int64, error) { + if duration <= 0 { + return 0, errExpireValue + } + + return db.zExpireAt(key, time.Now().Unix()+duration) +} + +func (db *DB) ZExpireAt(key []byte, when int64) (int64, error) { + if when <= time.Now().Unix() { + return 0, errExpireValue + } + + return db.zExpireAt(key, when) +} + +func (db *DB) ZTTL(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return -1, err + } + + return db.ttl(ZSetType, key) +} + +func (db *DB) ZPersist(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } + + t := db.zsetBatch + t.Lock() + defer t.Unlock() + + n, err := db.rmExpire(t, ZSetType, key) + if err != nil { + return 0, err + } + + err = t.Commit() + return n, err +} + +func getAggregateFunc(aggregate byte) func(int64, int64) int64 { + switch aggregate { + case AggregateSum: + return func(a int64, b int64) int64 { + return a + b + } + case AggregateMax: + return func(a int64, b int64) int64 { + if a > b { + return a + } + return b + } + case AggregateMin: + return func(a int64, b int64) int64 { + if a > b { + return b + } + return a + } + } + return nil +} + +func (db *DB) ZUnionStore(destKey []byte, srcKeys [][]byte, weights []int64, aggregate byte) (int64, error) { + + var destMap = map[string]int64{} + aggregateFunc := getAggregateFunc(aggregate) + if aggregateFunc == nil { + return 0, errInvalidAggregate + } + if len(srcKeys) < 1 { + return 0, errInvalidSrcKeyNum + } + if weights != nil { + if len(srcKeys) != len(weights) { + return 0, errInvalidWeightNum + } + } else { + weights = make([]int64, len(srcKeys)) + for i := 0; i < len(weights); i++ { + weights[i] = 1 + } + } + + for i, key := range srcKeys { + scorePairs, err := db.ZRange(key, 0, -1) + if err != nil { + return 0, err + } + for _, pair := range scorePairs { + if score, ok := destMap[String(pair.Member)]; !ok { + destMap[String(pair.Member)] = pair.Score * weights[i] + } else { + destMap[String(pair.Member)] = aggregateFunc(score, pair.Score*weights[i]) + } + } + } + + t := db.zsetBatch + t.Lock() + defer t.Unlock() + + db.zDelete(t, destKey) + + for member, score := range destMap { + if err := checkZSetKMSize(destKey, []byte(member)); err != nil { + return 0, err + } + + if _, err := db.zSetItem(t, destKey, score, []byte(member)); err != nil { + return 0, err + } + } + + var num = int64(len(destMap)) + sk := db.zEncodeSizeKey(destKey) + t.Put(sk, PutInt64(num)) + + //todo add binlog + if err := t.Commit(); err != nil { + return 0, err + } + return num, nil +} + +func (db *DB) ZInterStore(destKey []byte, srcKeys [][]byte, weights []int64, aggregate byte) (int64, error) { + + aggregateFunc := getAggregateFunc(aggregate) + if aggregateFunc == nil { + return 0, errInvalidAggregate + } + if len(srcKeys) < 1 { + return 0, errInvalidSrcKeyNum + } + if weights != nil { + if len(srcKeys) != len(weights) { + return 0, errInvalidWeightNum + } + } else { + weights = make([]int64, len(srcKeys)) + for i := 0; i < len(weights); i++ { + weights[i] = 1 + } + } + + var destMap = map[string]int64{} + scorePairs, err := db.ZRange(srcKeys[0], 0, -1) + if err != nil { + return 0, err + } + for _, pair := range scorePairs { + destMap[String(pair.Member)] = pair.Score * weights[0] + } + + for i, key := range srcKeys[1:] { + scorePairs, err := db.ZRange(key, 0, -1) + if err != nil { + return 0, err + } + tmpMap := map[string]int64{} + for _, pair := range scorePairs { + if score, ok := destMap[String(pair.Member)]; ok { + tmpMap[String(pair.Member)] = aggregateFunc(score, pair.Score*weights[i+1]) + } + } + destMap = tmpMap + } + + t := db.zsetBatch + t.Lock() + defer t.Unlock() + + db.zDelete(t, destKey) + + for member, score := range destMap { + if err := checkZSetKMSize(destKey, []byte(member)); err != nil { + return 0, err + } + if _, err := db.zSetItem(t, destKey, score, []byte(member)); err != nil { + return 0, err + } + } + + var num int64 = int64(len(destMap)) + sk := db.zEncodeSizeKey(destKey) + t.Put(sk, PutInt64(num)) + //todo add binlog + if err := t.Commit(); err != nil { + return 0, err + } + return num, nil +} + +func (db *DB) ZScan(key []byte, count int, inclusive bool, match string) ([][]byte, error) { + return db.scan(ZSizeType, key, count, inclusive, match) +} diff --git a/vendor/github.com/lunny/nodb/tx.go b/vendor/github.com/lunny/nodb/tx.go new file mode 100644 index 0000000000..5ce99db57a --- /dev/null +++ b/vendor/github.com/lunny/nodb/tx.go @@ -0,0 +1,113 @@ +package nodb + +import ( + "errors" + "fmt" + + "github.com/lunny/nodb/store" +) + +var ( + ErrNestTx = errors.New("nest transaction not supported") + ErrTxDone = errors.New("Transaction has already been committed or rolled back") +) + +type Tx struct { + *DB + + tx *store.Tx + + logs [][]byte +} + +func (db *DB) IsTransaction() bool { + return db.status == DBInTransaction +} + +// Begin a transaction, it will block all other write operations before calling Commit or Rollback. +// You must be very careful to prevent long-time transaction. +func (db *DB) Begin() (*Tx, error) { + if db.IsTransaction() { + return nil, ErrNestTx + } + + tx := new(Tx) + + tx.DB = new(DB) + tx.DB.l = db.l + + tx.l.wLock.Lock() + + tx.DB.sdb = db.sdb + + var err error + tx.tx, err = db.sdb.Begin() + if err != nil { + tx.l.wLock.Unlock() + return nil, err + } + + tx.DB.bucket = tx.tx + + tx.DB.status = DBInTransaction + + tx.DB.index = db.index + + tx.DB.kvBatch = tx.newBatch() + tx.DB.listBatch = tx.newBatch() + tx.DB.hashBatch = tx.newBatch() + tx.DB.zsetBatch = tx.newBatch() + tx.DB.binBatch = tx.newBatch() + tx.DB.setBatch = tx.newBatch() + + return tx, nil +} + +func (tx *Tx) Commit() error { + if tx.tx == nil { + return ErrTxDone + } + + tx.l.commitLock.Lock() + err := tx.tx.Commit() + tx.tx = nil + + if len(tx.logs) > 0 { + tx.l.binlog.Log(tx.logs...) + } + + tx.l.commitLock.Unlock() + + tx.l.wLock.Unlock() + + tx.DB.bucket = nil + + return err +} + +func (tx *Tx) Rollback() error { + if tx.tx == nil { + return ErrTxDone + } + + err := tx.tx.Rollback() + tx.tx = nil + + tx.l.wLock.Unlock() + tx.DB.bucket = nil + + return err +} + +func (tx *Tx) newBatch() *batch { + return tx.l.newBatch(tx.tx.NewWriteBatch(), &txBatchLocker{}, tx) +} + +func (tx *Tx) Select(index int) error { + if index < 0 || index >= int(MaxDBNumber) { + return fmt.Errorf("invalid db index %d", index) + } + + tx.DB.index = uint8(index) + return nil +} diff --git a/vendor/github.com/lunny/nodb/util.go b/vendor/github.com/lunny/nodb/util.go new file mode 100644 index 0000000000..d5949a96e6 --- /dev/null +++ b/vendor/github.com/lunny/nodb/util.go @@ -0,0 +1,113 @@ +package nodb + +import ( + "encoding/binary" + "errors" + "reflect" + "strconv" + "unsafe" +) + +var errIntNumber = errors.New("invalid integer") + +// no copy to change slice to string +// use your own risk +func String(b []byte) (s string) { + pbytes := (*reflect.SliceHeader)(unsafe.Pointer(&b)) + pstring := (*reflect.StringHeader)(unsafe.Pointer(&s)) + pstring.Data = pbytes.Data + pstring.Len = pbytes.Len + return +} + +// no copy to change string to slice +// use your own risk +func Slice(s string) (b []byte) { + pbytes := (*reflect.SliceHeader)(unsafe.Pointer(&b)) + pstring := (*reflect.StringHeader)(unsafe.Pointer(&s)) + pbytes.Data = pstring.Data + pbytes.Len = pstring.Len + pbytes.Cap = pstring.Len + return +} + +func Int64(v []byte, err error) (int64, error) { + if err != nil { + return 0, err + } else if v == nil || len(v) == 0 { + return 0, nil + } else if len(v) != 8 { + return 0, errIntNumber + } + + return int64(binary.LittleEndian.Uint64(v)), nil +} + +func PutInt64(v int64) []byte { + var b []byte + pbytes := (*reflect.SliceHeader)(unsafe.Pointer(&b)) + pbytes.Data = uintptr(unsafe.Pointer(&v)) + pbytes.Len = 8 + pbytes.Cap = 8 + return b +} + +func StrInt64(v []byte, err error) (int64, error) { + if err != nil { + return 0, err + } else if v == nil { + return 0, nil + } else { + return strconv.ParseInt(String(v), 10, 64) + } +} + +func StrInt32(v []byte, err error) (int32, error) { + if err != nil { + return 0, err + } else if v == nil { + return 0, nil + } else { + res, err := strconv.ParseInt(String(v), 10, 32) + return int32(res), err + } +} + +func StrInt8(v []byte, err error) (int8, error) { + if err != nil { + return 0, err + } else if v == nil { + return 0, nil + } else { + res, err := strconv.ParseInt(String(v), 10, 8) + return int8(res), err + } +} + +func StrPutInt64(v int64) []byte { + return strconv.AppendInt(nil, v, 10) +} + +func MinUInt32(a uint32, b uint32) uint32 { + if a > b { + return b + } else { + return a + } +} + +func MaxUInt32(a uint32, b uint32) uint32 { + if a > b { + return a + } else { + return b + } +} + +func MaxInt32(a int32, b int32) int32 { + if a > b { + return a + } else { + return b + } +} |