diff options
Diffstat (limited to 'vendor/github.com/syndtr/goleveldb/leveldb/session_util.go')
-rw-r--r-- | vendor/github.com/syndtr/goleveldb/leveldb/session_util.go | 271 |
1 files changed, 271 insertions, 0 deletions
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go b/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go new file mode 100644 index 0000000000..40cb2cf957 --- /dev/null +++ b/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go @@ -0,0 +1,271 @@ +// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> +// All rights reserved. +// +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package leveldb + +import ( + "fmt" + "sync/atomic" + + "github.com/syndtr/goleveldb/leveldb/journal" + "github.com/syndtr/goleveldb/leveldb/storage" +) + +// Logging. + +type dropper struct { + s *session + fd storage.FileDesc +} + +func (d dropper) Drop(err error) { + if e, ok := err.(*journal.ErrCorrupted); ok { + d.s.logf("journal@drop %s-%d S·%s %q", d.fd.Type, d.fd.Num, shortenb(e.Size), e.Reason) + } else { + d.s.logf("journal@drop %s-%d %q", d.fd.Type, d.fd.Num, err) + } +} + +func (s *session) log(v ...interface{}) { s.stor.Log(fmt.Sprint(v...)) } +func (s *session) logf(format string, v ...interface{}) { s.stor.Log(fmt.Sprintf(format, v...)) } + +// File utils. + +func (s *session) newTemp() storage.FileDesc { + num := atomic.AddInt64(&s.stTempFileNum, 1) - 1 + return storage.FileDesc{Type: storage.TypeTemp, Num: num} +} + +func (s *session) addFileRef(fd storage.FileDesc, ref int) int { + ref += s.fileRef[fd.Num] + if ref > 0 { + s.fileRef[fd.Num] = ref + } else if ref == 0 { + delete(s.fileRef, fd.Num) + } else { + panic(fmt.Sprintf("negative ref: %v", fd)) + } + return ref +} + +// Session state. + +// Get current version. This will incr version ref, must call +// version.release (exactly once) after use. +func (s *session) version() *version { + s.vmu.Lock() + defer s.vmu.Unlock() + s.stVersion.incref() + return s.stVersion +} + +func (s *session) tLen(level int) int { + s.vmu.Lock() + defer s.vmu.Unlock() + return s.stVersion.tLen(level) +} + +// Set current version to v. +func (s *session) setVersion(v *version) { + s.vmu.Lock() + defer s.vmu.Unlock() + // Hold by session. It is important to call this first before releasing + // current version, otherwise the still used files might get released. + v.incref() + if s.stVersion != nil { + // Release current version. + s.stVersion.releaseNB() + } + s.stVersion = v +} + +// Get current unused file number. +func (s *session) nextFileNum() int64 { + return atomic.LoadInt64(&s.stNextFileNum) +} + +// Set current unused file number to num. +func (s *session) setNextFileNum(num int64) { + atomic.StoreInt64(&s.stNextFileNum, num) +} + +// Mark file number as used. +func (s *session) markFileNum(num int64) { + nextFileNum := num + 1 + for { + old, x := s.stNextFileNum, nextFileNum + if old > x { + x = old + } + if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) { + break + } + } +} + +// Allocate a file number. +func (s *session) allocFileNum() int64 { + return atomic.AddInt64(&s.stNextFileNum, 1) - 1 +} + +// Reuse given file number. +func (s *session) reuseFileNum(num int64) { + for { + old, x := s.stNextFileNum, num + if old != x+1 { + x = old + } + if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) { + break + } + } +} + +// Set compaction ptr at given level; need external synchronization. +func (s *session) setCompPtr(level int, ik internalKey) { + if level >= len(s.stCompPtrs) { + newCompPtrs := make([]internalKey, level+1) + copy(newCompPtrs, s.stCompPtrs) + s.stCompPtrs = newCompPtrs + } + s.stCompPtrs[level] = append(internalKey{}, ik...) +} + +// Get compaction ptr at given level; need external synchronization. +func (s *session) getCompPtr(level int) internalKey { + if level >= len(s.stCompPtrs) { + return nil + } + return s.stCompPtrs[level] +} + +// Manifest related utils. + +// Fill given session record obj with current states; need external +// synchronization. +func (s *session) fillRecord(r *sessionRecord, snapshot bool) { + r.setNextFileNum(s.nextFileNum()) + + if snapshot { + if !r.has(recJournalNum) { + r.setJournalNum(s.stJournalNum) + } + + if !r.has(recSeqNum) { + r.setSeqNum(s.stSeqNum) + } + + for level, ik := range s.stCompPtrs { + if ik != nil { + r.addCompPtr(level, ik) + } + } + + r.setComparer(s.icmp.uName()) + } +} + +// Mark if record has been committed, this will update session state; +// need external synchronization. +func (s *session) recordCommited(rec *sessionRecord) { + if rec.has(recJournalNum) { + s.stJournalNum = rec.journalNum + } + + if rec.has(recPrevJournalNum) { + s.stPrevJournalNum = rec.prevJournalNum + } + + if rec.has(recSeqNum) { + s.stSeqNum = rec.seqNum + } + + for _, r := range rec.compPtrs { + s.setCompPtr(r.level, internalKey(r.ikey)) + } +} + +// Create a new manifest file; need external synchronization. +func (s *session) newManifest(rec *sessionRecord, v *version) (err error) { + fd := storage.FileDesc{Type: storage.TypeManifest, Num: s.allocFileNum()} + writer, err := s.stor.Create(fd) + if err != nil { + return + } + jw := journal.NewWriter(writer) + + if v == nil { + v = s.version() + defer v.release() + } + if rec == nil { + rec = &sessionRecord{} + } + s.fillRecord(rec, true) + v.fillRecord(rec) + + defer func() { + if err == nil { + s.recordCommited(rec) + if s.manifest != nil { + s.manifest.Close() + } + if s.manifestWriter != nil { + s.manifestWriter.Close() + } + if !s.manifestFd.Zero() { + s.stor.Remove(s.manifestFd) + } + s.manifestFd = fd + s.manifestWriter = writer + s.manifest = jw + } else { + writer.Close() + s.stor.Remove(fd) + s.reuseFileNum(fd.Num) + } + }() + + w, err := jw.Next() + if err != nil { + return + } + err = rec.encode(w) + if err != nil { + return + } + err = jw.Flush() + if err != nil { + return + } + err = s.stor.SetMeta(fd) + return +} + +// Flush record to disk. +func (s *session) flushManifest(rec *sessionRecord) (err error) { + s.fillRecord(rec, false) + w, err := s.manifest.Next() + if err != nil { + return + } + err = rec.encode(w) + if err != nil { + return + } + err = s.manifest.Flush() + if err != nil { + return + } + if !s.o.GetNoSync() { + err = s.manifestWriter.Sync() + if err != nil { + return + } + } + s.recordCommited(rec) + return +} |