summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/lunny/nodb/replication.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/lunny/nodb/replication.go')
-rw-r--r--vendor/github.com/lunny/nodb/replication.go312
1 files changed, 312 insertions, 0 deletions
diff --git a/vendor/github.com/lunny/nodb/replication.go b/vendor/github.com/lunny/nodb/replication.go
new file mode 100644
index 0000000000..f9bc951085
--- /dev/null
+++ b/vendor/github.com/lunny/nodb/replication.go
@@ -0,0 +1,312 @@
+package nodb
+
+import (
+ "bufio"
+ "bytes"
+ "errors"
+ "io"
+ "os"
+ "time"
+
+ "github.com/lunny/log"
+ "github.com/lunny/nodb/store/driver"
+)
+
+const (
+ maxReplBatchNum = 100
+ maxReplLogSize = 1 * 1024 * 1024
+)
+
+var (
+ ErrSkipEvent = errors.New("skip to next event")
+)
+
+var (
+ errInvalidBinLogEvent = errors.New("invalid binglog event")
+ errInvalidBinLogFile = errors.New("invalid binlog file")
+)
+
+type replBatch struct {
+ wb driver.IWriteBatch
+ events [][]byte
+ l *Nodb
+
+ lastHead *BinLogHead
+}
+
+func (b *replBatch) Commit() error {
+ b.l.commitLock.Lock()
+ defer b.l.commitLock.Unlock()
+
+ err := b.wb.Commit()
+ if err != nil {
+ b.Rollback()
+ return err
+ }
+
+ if b.l.binlog != nil {
+ if err = b.l.binlog.Log(b.events...); err != nil {
+ b.Rollback()
+ return err
+ }
+ }
+
+ b.events = [][]byte{}
+ b.lastHead = nil
+
+ return nil
+}
+
+func (b *replBatch) Rollback() error {
+ b.wb.Rollback()
+ b.events = [][]byte{}
+ b.lastHead = nil
+ return nil
+}
+
+func (l *Nodb) replicateEvent(b *replBatch, event []byte) error {
+ if len(event) == 0 {
+ return errInvalidBinLogEvent
+ }
+
+ b.events = append(b.events, event)
+
+ logType := uint8(event[0])
+ switch logType {
+ case BinLogTypePut:
+ return l.replicatePutEvent(b, event)
+ case BinLogTypeDeletion:
+ return l.replicateDeleteEvent(b, event)
+ default:
+ return errInvalidBinLogEvent
+ }
+}
+
+func (l *Nodb) replicatePutEvent(b *replBatch, event []byte) error {
+ key, value, err := decodeBinLogPut(event)
+ if err != nil {
+ return err
+ }
+
+ b.wb.Put(key, value)
+
+ return nil
+}
+
+func (l *Nodb) replicateDeleteEvent(b *replBatch, event []byte) error {
+ key, err := decodeBinLogDelete(event)
+ if err != nil {
+ return err
+ }
+
+ b.wb.Delete(key)
+
+ return nil
+}
+
+func ReadEventFromReader(rb io.Reader, f func(head *BinLogHead, event []byte) error) error {
+ head := &BinLogHead{}
+ var err error
+
+ for {
+ if err = head.Read(rb); err != nil {
+ if err == io.EOF {
+ break
+ } else {
+ return err
+ }
+ }
+
+ var dataBuf bytes.Buffer
+
+ if _, err = io.CopyN(&dataBuf, rb, int64(head.PayloadLen)); err != nil {
+ return err
+ }
+
+ err = f(head, dataBuf.Bytes())
+ if err != nil && err != ErrSkipEvent {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (l *Nodb) ReplicateFromReader(rb io.Reader) error {
+ b := new(replBatch)
+
+ b.wb = l.ldb.NewWriteBatch()
+ b.l = l
+
+ f := func(head *BinLogHead, event []byte) error {
+ if b.lastHead == nil {
+ b.lastHead = head
+ } else if !b.lastHead.InSameBatch(head) {
+ if err := b.Commit(); err != nil {
+ log.Fatal("replication error %s, skip to next", err.Error())
+ return ErrSkipEvent
+ }
+ b.lastHead = head
+ }
+
+ err := l.replicateEvent(b, event)
+ if err != nil {
+ log.Fatal("replication error %s, skip to next", err.Error())
+ return ErrSkipEvent
+ }
+ return nil
+ }
+
+ err := ReadEventFromReader(rb, f)
+ if err != nil {
+ b.Rollback()
+ return err
+ }
+ return b.Commit()
+}
+
+func (l *Nodb) ReplicateFromData(data []byte) error {
+ rb := bytes.NewReader(data)
+
+ err := l.ReplicateFromReader(rb)
+
+ return err
+}
+
+func (l *Nodb) ReplicateFromBinLog(filePath string) error {
+ f, err := os.Open(filePath)
+ if err != nil {
+ return err
+ }
+
+ rb := bufio.NewReaderSize(f, 4096)
+
+ err = l.ReplicateFromReader(rb)
+
+ f.Close()
+
+ return err
+}
+
+// try to read events, if no events read, try to wait the new event singal until timeout seconds
+func (l *Nodb) ReadEventsToTimeout(info *BinLogAnchor, w io.Writer, timeout int) (n int, err error) {
+ lastIndex := info.LogFileIndex
+ lastPos := info.LogPos
+
+ n = 0
+ if l.binlog == nil {
+ //binlog not supported
+ info.LogFileIndex = 0
+ info.LogPos = 0
+ return
+ }
+
+ n, err = l.ReadEventsTo(info, w)
+ if err == nil && info.LogFileIndex == lastIndex && info.LogPos == lastPos {
+ //no events read
+ select {
+ case <-l.binlog.Wait():
+ case <-time.After(time.Duration(timeout) * time.Second):
+ }
+ return l.ReadEventsTo(info, w)
+ }
+ return
+}
+
+func (l *Nodb) ReadEventsTo(info *BinLogAnchor, w io.Writer) (n int, err error) {
+ n = 0
+ if l.binlog == nil {
+ //binlog not supported
+ info.LogFileIndex = 0
+ info.LogPos = 0
+ return
+ }
+
+ index := info.LogFileIndex
+ offset := info.LogPos
+
+ filePath := l.binlog.FormatLogFilePath(index)
+
+ var f *os.File
+ f, err = os.Open(filePath)
+ if os.IsNotExist(err) {
+ lastIndex := l.binlog.LogFileIndex()
+
+ if index == lastIndex {
+ //no binlog at all
+ info.LogPos = 0
+ } else {
+ //slave binlog info had lost
+ info.LogFileIndex = -1
+ }
+ }
+
+ if err != nil {
+ if os.IsNotExist(err) {
+ err = nil
+ }
+ return
+ }
+
+ defer f.Close()
+
+ var fileSize int64
+ st, _ := f.Stat()
+ fileSize = st.Size()
+
+ if fileSize == info.LogPos {
+ return
+ }
+
+ if _, err = f.Seek(offset, os.SEEK_SET); err != nil {
+ //may be invliad seek offset
+ return
+ }
+
+ var lastHead *BinLogHead = nil
+
+ head := &BinLogHead{}
+
+ batchNum := 0
+
+ for {
+ if err = head.Read(f); err != nil {
+ if err == io.EOF {
+ //we will try to use next binlog
+ if index < l.binlog.LogFileIndex() {
+ info.LogFileIndex += 1
+ info.LogPos = 0
+ }
+ err = nil
+ return
+ } else {
+ return
+ }
+
+ }
+
+ if lastHead == nil {
+ lastHead = head
+ batchNum++
+ } else if !lastHead.InSameBatch(head) {
+ lastHead = head
+ batchNum++
+ if batchNum > maxReplBatchNum || n > maxReplLogSize {
+ return
+ }
+ }
+
+ if err = head.Write(w); err != nil {
+ return
+ }
+
+ if _, err = io.CopyN(w, f, int64(head.PayloadLen)); err != nil {
+ return
+ }
+
+ n += (head.Len() + int(head.PayloadLen))
+ info.LogPos = info.LogPos + int64(head.Len()) + int64(head.PayloadLen)
+ }
+
+ return
+}