diff options
Diffstat (limited to 'vendor/github.com/syndtr/goleveldb/leveldb/db_write.go')
-rw-r--r-- | vendor/github.com/syndtr/goleveldb/leveldb/db_write.go | 358 |
1 files changed, 0 insertions, 358 deletions
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go deleted file mode 100644 index 5576761fed..0000000000 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go +++ /dev/null @@ -1,358 +0,0 @@ -// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> -// All rights reserved. -// -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -package leveldb - -import ( - "time" - - "github.com/syndtr/goleveldb/leveldb/memdb" - "github.com/syndtr/goleveldb/leveldb/opt" - "github.com/syndtr/goleveldb/leveldb/util" -) - -func (db *DB) writeJournal(b *Batch) error { - w, err := db.journal.Next() - if err != nil { - return err - } - if _, err := w.Write(b.encode()); err != nil { - return err - } - if err := db.journal.Flush(); err != nil { - return err - } - if b.sync { - return db.journalWriter.Sync() - } - return nil -} - -func (db *DB) jWriter() { - defer db.closeW.Done() - for { - select { - case b := <-db.journalC: - if b != nil { - db.journalAckC <- db.writeJournal(b) - } - case _, _ = <-db.closeC: - return - } - } -} - -func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) { - // Wait for pending memdb compaction. - err = db.compTriggerWait(db.mcompCmdC) - if err != nil { - return - } - - // Create new memdb and journal. - mem, err = db.newMem(n) - if err != nil { - return - } - - // Schedule memdb compaction. - if wait { - err = db.compTriggerWait(db.mcompCmdC) - } else { - db.compTrigger(db.mcompCmdC) - } - return -} - -func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) { - delayed := false - flush := func() (retry bool) { - v := db.s.version() - defer v.release() - mdb = db.getEffectiveMem() - defer func() { - if retry { - mdb.decref() - mdb = nil - } - }() - mdbFree = mdb.Free() - switch { - case v.tLen(0) >= db.s.o.GetWriteL0SlowdownTrigger() && !delayed: - delayed = true - time.Sleep(time.Millisecond) - case mdbFree >= n: - return false - case v.tLen(0) >= db.s.o.GetWriteL0PauseTrigger(): - delayed = true - err = db.compTriggerWait(db.tcompCmdC) - if err != nil { - return false - } - default: - // Allow memdb to grow if it has no entry. - if mdb.Len() == 0 { - mdbFree = n - } else { - mdb.decref() - mdb, err = db.rotateMem(n, false) - if err == nil { - mdbFree = mdb.Free() - } else { - mdbFree = 0 - } - } - return false - } - return true - } - start := time.Now() - for flush() { - } - if delayed { - db.writeDelay += time.Since(start) - db.writeDelayN++ - } else if db.writeDelayN > 0 { - db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay) - db.writeDelay = 0 - db.writeDelayN = 0 - } - return -} - -// Write apply the given batch to the DB. The batch will be applied -// sequentially. -// -// It is safe to modify the contents of the arguments after Write returns. -func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) { - err = db.ok() - if err != nil || b == nil || b.Len() == 0 { - return - } - - b.init(wo.GetSync() && !db.s.o.GetNoSync()) - - if b.size() > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() { - // Writes using transaction. - tr, err1 := db.OpenTransaction() - if err1 != nil { - return err1 - } - if err1 := tr.Write(b, wo); err1 != nil { - tr.Discard() - return err1 - } - return tr.Commit() - } - - // The write happen synchronously. - select { - case db.writeC <- b: - if <-db.writeMergedC { - return <-db.writeAckC - } - // Continue, the write lock already acquired by previous writer - // and handed out to us. - case db.writeLockC <- struct{}{}: - case err = <-db.compPerErrC: - return - case _, _ = <-db.closeC: - return ErrClosed - } - - merged := 0 - danglingMerge := false - defer func() { - for i := 0; i < merged; i++ { - db.writeAckC <- err - } - if danglingMerge { - // Only one dangling merge at most, so this is safe. - db.writeMergedC <- false - } else { - <-db.writeLockC - } - }() - - mdb, mdbFree, err := db.flush(b.size()) - if err != nil { - return - } - defer mdb.decref() - - // Calculate maximum size of the batch. - m := 1 << 20 - if x := b.size(); x <= 128<<10 { - m = x + (128 << 10) - } - m = minInt(m, mdbFree) - - // Merge with other batch. -drain: - for b.size() < m && !b.sync { - select { - case nb := <-db.writeC: - if b.size()+nb.size() <= m { - b.append(nb) - db.writeMergedC <- true - merged++ - } else { - danglingMerge = true - break drain - } - default: - break drain - } - } - - // Set batch first seq number relative from last seq. - b.seq = db.seq + 1 - - // Write journal concurrently if it is large enough. - if b.size() >= (128 << 10) { - // Push the write batch to the journal writer - select { - case db.journalC <- b: - // Write into memdb - if berr := b.memReplay(mdb.DB); berr != nil { - panic(berr) - } - case err = <-db.compPerErrC: - return - case _, _ = <-db.closeC: - err = ErrClosed - return - } - // Wait for journal writer - select { - case err = <-db.journalAckC: - if err != nil { - // Revert memdb if error detected - if berr := b.revertMemReplay(mdb.DB); berr != nil { - panic(berr) - } - return - } - case _, _ = <-db.closeC: - err = ErrClosed - return - } - } else { - err = db.writeJournal(b) - if err != nil { - return - } - if berr := b.memReplay(mdb.DB); berr != nil { - panic(berr) - } - } - - // Set last seq number. - db.addSeq(uint64(b.Len())) - - if b.size() >= mdbFree { - db.rotateMem(0, false) - } - return -} - -// Put sets the value for the given key. It overwrites any previous value -// for that key; a DB is not a multi-map. -// -// It is safe to modify the contents of the arguments after Put returns. -func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error { - b := new(Batch) - b.Put(key, value) - return db.Write(b, wo) -} - -// Delete deletes the value for the given key. -// -// It is safe to modify the contents of the arguments after Delete returns. -func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error { - b := new(Batch) - b.Delete(key) - return db.Write(b, wo) -} - -func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool { - iter := mem.NewIterator(nil) - defer iter.Release() - return (max == nil || (iter.First() && icmp.uCompare(max, internalKey(iter.Key()).ukey()) >= 0)) && - (min == nil || (iter.Last() && icmp.uCompare(min, internalKey(iter.Key()).ukey()) <= 0)) -} - -// CompactRange compacts the underlying DB for the given key range. -// In particular, deleted and overwritten versions are discarded, -// and the data is rearranged to reduce the cost of operations -// needed to access the data. This operation should typically only -// be invoked by users who understand the underlying implementation. -// -// A nil Range.Start is treated as a key before all keys in the DB. -// And a nil Range.Limit is treated as a key after all keys in the DB. -// Therefore if both is nil then it will compact entire DB. -func (db *DB) CompactRange(r util.Range) error { - if err := db.ok(); err != nil { - return err - } - - // Lock writer. - select { - case db.writeLockC <- struct{}{}: - case err := <-db.compPerErrC: - return err - case _, _ = <-db.closeC: - return ErrClosed - } - - // Check for overlaps in memdb. - mdb := db.getEffectiveMem() - defer mdb.decref() - if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) { - // Memdb compaction. - if _, err := db.rotateMem(0, false); err != nil { - <-db.writeLockC - return err - } - <-db.writeLockC - if err := db.compTriggerWait(db.mcompCmdC); err != nil { - return err - } - } else { - <-db.writeLockC - } - - // Table compaction. - return db.compTriggerRange(db.tcompCmdC, -1, r.Start, r.Limit) -} - -// SetReadOnly makes DB read-only. It will stay read-only until reopened. -func (db *DB) SetReadOnly() error { - if err := db.ok(); err != nil { - return err - } - - // Lock writer. - select { - case db.writeLockC <- struct{}{}: - db.compWriteLocking = true - case err := <-db.compPerErrC: - return err - case _, _ = <-db.closeC: - return ErrClosed - } - - // Set compaction read-only. - select { - case db.compErrSetC <- ErrReadOnly: - case perr := <-db.compPerErrC: - return perr - case _, _ = <-db.closeC: - return ErrClosed - } - - return nil -} |