diff options
Diffstat (limited to 'vendor/github.com/lunny/nodb/replication.go')
-rw-r--r-- | vendor/github.com/lunny/nodb/replication.go | 312 |
1 files changed, 0 insertions, 312 deletions
diff --git a/vendor/github.com/lunny/nodb/replication.go b/vendor/github.com/lunny/nodb/replication.go deleted file mode 100644 index f9bc951085..0000000000 --- a/vendor/github.com/lunny/nodb/replication.go +++ /dev/null @@ -1,312 +0,0 @@ -package nodb - -import ( - "bufio" - "bytes" - "errors" - "io" - "os" - "time" - - "github.com/lunny/log" - "github.com/lunny/nodb/store/driver" -) - -const ( - maxReplBatchNum = 100 - maxReplLogSize = 1 * 1024 * 1024 -) - -var ( - ErrSkipEvent = errors.New("skip to next event") -) - -var ( - errInvalidBinLogEvent = errors.New("invalid binglog event") - errInvalidBinLogFile = errors.New("invalid binlog file") -) - -type replBatch struct { - wb driver.IWriteBatch - events [][]byte - l *Nodb - - lastHead *BinLogHead -} - -func (b *replBatch) Commit() error { - b.l.commitLock.Lock() - defer b.l.commitLock.Unlock() - - err := b.wb.Commit() - if err != nil { - b.Rollback() - return err - } - - if b.l.binlog != nil { - if err = b.l.binlog.Log(b.events...); err != nil { - b.Rollback() - return err - } - } - - b.events = [][]byte{} - b.lastHead = nil - - return nil -} - -func (b *replBatch) Rollback() error { - b.wb.Rollback() - b.events = [][]byte{} - b.lastHead = nil - return nil -} - -func (l *Nodb) replicateEvent(b *replBatch, event []byte) error { - if len(event) == 0 { - return errInvalidBinLogEvent - } - - b.events = append(b.events, event) - - logType := uint8(event[0]) - switch logType { - case BinLogTypePut: - return l.replicatePutEvent(b, event) - case BinLogTypeDeletion: - return l.replicateDeleteEvent(b, event) - default: - return errInvalidBinLogEvent - } -} - -func (l *Nodb) replicatePutEvent(b *replBatch, event []byte) error { - key, value, err := decodeBinLogPut(event) - if err != nil { - return err - } - - b.wb.Put(key, value) - - return nil -} - -func (l *Nodb) replicateDeleteEvent(b *replBatch, event []byte) error { - key, err := decodeBinLogDelete(event) - if err != nil { - return err - } - - b.wb.Delete(key) - - return nil -} - -func ReadEventFromReader(rb io.Reader, f func(head *BinLogHead, event []byte) error) error { - head := &BinLogHead{} - var err error - - for { - if err = head.Read(rb); err != nil { - if err == io.EOF { - break - } else { - return err - } - } - - var dataBuf bytes.Buffer - - if _, err = io.CopyN(&dataBuf, rb, int64(head.PayloadLen)); err != nil { - return err - } - - err = f(head, dataBuf.Bytes()) - if err != nil && err != ErrSkipEvent { - return err - } - } - - return nil -} - -func (l *Nodb) ReplicateFromReader(rb io.Reader) error { - b := new(replBatch) - - b.wb = l.ldb.NewWriteBatch() - b.l = l - - f := func(head *BinLogHead, event []byte) error { - if b.lastHead == nil { - b.lastHead = head - } else if !b.lastHead.InSameBatch(head) { - if err := b.Commit(); err != nil { - log.Fatal("replication error %s, skip to next", err.Error()) - return ErrSkipEvent - } - b.lastHead = head - } - - err := l.replicateEvent(b, event) - if err != nil { - log.Fatal("replication error %s, skip to next", err.Error()) - return ErrSkipEvent - } - return nil - } - - err := ReadEventFromReader(rb, f) - if err != nil { - b.Rollback() - return err - } - return b.Commit() -} - -func (l *Nodb) ReplicateFromData(data []byte) error { - rb := bytes.NewReader(data) - - err := l.ReplicateFromReader(rb) - - return err -} - -func (l *Nodb) ReplicateFromBinLog(filePath string) error { - f, err := os.Open(filePath) - if err != nil { - return err - } - - rb := bufio.NewReaderSize(f, 4096) - - err = l.ReplicateFromReader(rb) - - f.Close() - - return err -} - -// try to read events, if no events read, try to wait the new event singal until timeout seconds -func (l *Nodb) ReadEventsToTimeout(info *BinLogAnchor, w io.Writer, timeout int) (n int, err error) { - lastIndex := info.LogFileIndex - lastPos := info.LogPos - - n = 0 - if l.binlog == nil { - //binlog not supported - info.LogFileIndex = 0 - info.LogPos = 0 - return - } - - n, err = l.ReadEventsTo(info, w) - if err == nil && info.LogFileIndex == lastIndex && info.LogPos == lastPos { - //no events read - select { - case <-l.binlog.Wait(): - case <-time.After(time.Duration(timeout) * time.Second): - } - return l.ReadEventsTo(info, w) - } - return -} - -func (l *Nodb) ReadEventsTo(info *BinLogAnchor, w io.Writer) (n int, err error) { - n = 0 - if l.binlog == nil { - //binlog not supported - info.LogFileIndex = 0 - info.LogPos = 0 - return - } - - index := info.LogFileIndex - offset := info.LogPos - - filePath := l.binlog.FormatLogFilePath(index) - - var f *os.File - f, err = os.Open(filePath) - if os.IsNotExist(err) { - lastIndex := l.binlog.LogFileIndex() - - if index == lastIndex { - //no binlog at all - info.LogPos = 0 - } else { - //slave binlog info had lost - info.LogFileIndex = -1 - } - } - - if err != nil { - if os.IsNotExist(err) { - err = nil - } - return - } - - defer f.Close() - - var fileSize int64 - st, _ := f.Stat() - fileSize = st.Size() - - if fileSize == info.LogPos { - return - } - - if _, err = f.Seek(offset, os.SEEK_SET); err != nil { - //may be invliad seek offset - return - } - - var lastHead *BinLogHead = nil - - head := &BinLogHead{} - - batchNum := 0 - - for { - if err = head.Read(f); err != nil { - if err == io.EOF { - //we will try to use next binlog - if index < l.binlog.LogFileIndex() { - info.LogFileIndex += 1 - info.LogPos = 0 - } - err = nil - return - } else { - return - } - - } - - if lastHead == nil { - lastHead = head - batchNum++ - } else if !lastHead.InSameBatch(head) { - lastHead = head - batchNum++ - if batchNum > maxReplBatchNum || n > maxReplLogSize { - return - } - } - - if err = head.Write(w); err != nil { - return - } - - if _, err = io.CopyN(w, f, int64(head.PayloadLen)); err != nil { - return - } - - n += (head.Len() + int(head.PayloadLen)) - info.LogPos = info.LogPos + int64(head.Len()) + int64(head.PayloadLen) - } - - return -} |