diff options
Diffstat (limited to 'vendor/github.com/lunny/nodb/replication.go')
-rw-r--r-- | vendor/github.com/lunny/nodb/replication.go | 312 |
1 files changed, 312 insertions, 0 deletions
diff --git a/vendor/github.com/lunny/nodb/replication.go b/vendor/github.com/lunny/nodb/replication.go new file mode 100644 index 0000000000..f9bc951085 --- /dev/null +++ b/vendor/github.com/lunny/nodb/replication.go @@ -0,0 +1,312 @@ +package nodb + +import ( + "bufio" + "bytes" + "errors" + "io" + "os" + "time" + + "github.com/lunny/log" + "github.com/lunny/nodb/store/driver" +) + +const ( + maxReplBatchNum = 100 + maxReplLogSize = 1 * 1024 * 1024 +) + +var ( + ErrSkipEvent = errors.New("skip to next event") +) + +var ( + errInvalidBinLogEvent = errors.New("invalid binglog event") + errInvalidBinLogFile = errors.New("invalid binlog file") +) + +type replBatch struct { + wb driver.IWriteBatch + events [][]byte + l *Nodb + + lastHead *BinLogHead +} + +func (b *replBatch) Commit() error { + b.l.commitLock.Lock() + defer b.l.commitLock.Unlock() + + err := b.wb.Commit() + if err != nil { + b.Rollback() + return err + } + + if b.l.binlog != nil { + if err = b.l.binlog.Log(b.events...); err != nil { + b.Rollback() + return err + } + } + + b.events = [][]byte{} + b.lastHead = nil + + return nil +} + +func (b *replBatch) Rollback() error { + b.wb.Rollback() + b.events = [][]byte{} + b.lastHead = nil + return nil +} + +func (l *Nodb) replicateEvent(b *replBatch, event []byte) error { + if len(event) == 0 { + return errInvalidBinLogEvent + } + + b.events = append(b.events, event) + + logType := uint8(event[0]) + switch logType { + case BinLogTypePut: + return l.replicatePutEvent(b, event) + case BinLogTypeDeletion: + return l.replicateDeleteEvent(b, event) + default: + return errInvalidBinLogEvent + } +} + +func (l *Nodb) replicatePutEvent(b *replBatch, event []byte) error { + key, value, err := decodeBinLogPut(event) + if err != nil { + return err + } + + b.wb.Put(key, value) + + return nil +} + +func (l *Nodb) replicateDeleteEvent(b *replBatch, event []byte) error { + key, err := decodeBinLogDelete(event) + if err != nil { + return err + } + + b.wb.Delete(key) + + return nil +} + +func ReadEventFromReader(rb io.Reader, f func(head *BinLogHead, event []byte) error) error { + head := &BinLogHead{} + var err error + + for { + if err = head.Read(rb); err != nil { + if err == io.EOF { + break + } else { + return err + } + } + + var dataBuf bytes.Buffer + + if _, err = io.CopyN(&dataBuf, rb, int64(head.PayloadLen)); err != nil { + return err + } + + err = f(head, dataBuf.Bytes()) + if err != nil && err != ErrSkipEvent { + return err + } + } + + return nil +} + +func (l *Nodb) ReplicateFromReader(rb io.Reader) error { + b := new(replBatch) + + b.wb = l.ldb.NewWriteBatch() + b.l = l + + f := func(head *BinLogHead, event []byte) error { + if b.lastHead == nil { + b.lastHead = head + } else if !b.lastHead.InSameBatch(head) { + if err := b.Commit(); err != nil { + log.Fatal("replication error %s, skip to next", err.Error()) + return ErrSkipEvent + } + b.lastHead = head + } + + err := l.replicateEvent(b, event) + if err != nil { + log.Fatal("replication error %s, skip to next", err.Error()) + return ErrSkipEvent + } + return nil + } + + err := ReadEventFromReader(rb, f) + if err != nil { + b.Rollback() + return err + } + return b.Commit() +} + +func (l *Nodb) ReplicateFromData(data []byte) error { + rb := bytes.NewReader(data) + + err := l.ReplicateFromReader(rb) + + return err +} + +func (l *Nodb) ReplicateFromBinLog(filePath string) error { + f, err := os.Open(filePath) + if err != nil { + return err + } + + rb := bufio.NewReaderSize(f, 4096) + + err = l.ReplicateFromReader(rb) + + f.Close() + + return err +} + +// try to read events, if no events read, try to wait the new event singal until timeout seconds +func (l *Nodb) ReadEventsToTimeout(info *BinLogAnchor, w io.Writer, timeout int) (n int, err error) { + lastIndex := info.LogFileIndex + lastPos := info.LogPos + + n = 0 + if l.binlog == nil { + //binlog not supported + info.LogFileIndex = 0 + info.LogPos = 0 + return + } + + n, err = l.ReadEventsTo(info, w) + if err == nil && info.LogFileIndex == lastIndex && info.LogPos == lastPos { + //no events read + select { + case <-l.binlog.Wait(): + case <-time.After(time.Duration(timeout) * time.Second): + } + return l.ReadEventsTo(info, w) + } + return +} + +func (l *Nodb) ReadEventsTo(info *BinLogAnchor, w io.Writer) (n int, err error) { + n = 0 + if l.binlog == nil { + //binlog not supported + info.LogFileIndex = 0 + info.LogPos = 0 + return + } + + index := info.LogFileIndex + offset := info.LogPos + + filePath := l.binlog.FormatLogFilePath(index) + + var f *os.File + f, err = os.Open(filePath) + if os.IsNotExist(err) { + lastIndex := l.binlog.LogFileIndex() + + if index == lastIndex { + //no binlog at all + info.LogPos = 0 + } else { + //slave binlog info had lost + info.LogFileIndex = -1 + } + } + + if err != nil { + if os.IsNotExist(err) { + err = nil + } + return + } + + defer f.Close() + + var fileSize int64 + st, _ := f.Stat() + fileSize = st.Size() + + if fileSize == info.LogPos { + return + } + + if _, err = f.Seek(offset, os.SEEK_SET); err != nil { + //may be invliad seek offset + return + } + + var lastHead *BinLogHead = nil + + head := &BinLogHead{} + + batchNum := 0 + + for { + if err = head.Read(f); err != nil { + if err == io.EOF { + //we will try to use next binlog + if index < l.binlog.LogFileIndex() { + info.LogFileIndex += 1 + info.LogPos = 0 + } + err = nil + return + } else { + return + } + + } + + if lastHead == nil { + lastHead = head + batchNum++ + } else if !lastHead.InSameBatch(head) { + lastHead = head + batchNum++ + if batchNum > maxReplBatchNum || n > maxReplLogSize { + return + } + } + + if err = head.Write(w); err != nil { + return + } + + if _, err = io.CopyN(w, f, int64(head.PayloadLen)); err != nil { + return + } + + n += (head.Len() + int(head.PayloadLen)) + info.LogPos = info.LogPos + int64(head.Len()) + int64(head.PayloadLen) + } + + return +} |