diff options
author | Antoine GIRARD <sapk@users.noreply.github.com> | 2018-11-11 00:55:36 +0100 |
---|---|---|
committer | techknowlogick <hello@techknowlogick.com> | 2018-11-10 18:55:36 -0500 |
commit | 4c1f1f96465e809161f7d634a07eb60b4511db35 (patch) | |
tree | 923ab95c7c96c133ce39fce7ca2b9872f116fb2c /vendor/github.com/blevesearch | |
parent | b3000ae623c30a58d3eb25c9fd7104db274381b7 (diff) | |
download | gitea-4c1f1f96465e809161f7d634a07eb60b4511db35.tar.gz gitea-4c1f1f96465e809161f7d634a07eb60b4511db35.zip |
Remove x/net/context vendor by using std package (#5202)
* Update dep github.com/markbates/goth
* Update dep github.com/blevesearch/bleve
* Update dep golang.org/x/oauth2
* Fix github.com/blevesearch/bleve to c74e08f039e56cef576e4336382b2a2d12d9e026
* Update dep golang.org/x/oauth2
Diffstat (limited to 'vendor/github.com/blevesearch')
24 files changed, 1044 insertions, 466 deletions
diff --git a/vendor/github.com/blevesearch/bleve/index.go b/vendor/github.com/blevesearch/bleve/index.go index e85652d967..ea7b3832ac 100644 --- a/vendor/github.com/blevesearch/bleve/index.go +++ b/vendor/github.com/blevesearch/bleve/index.go @@ -15,11 +15,12 @@ package bleve import ( + "context" + "github.com/blevesearch/bleve/document" "github.com/blevesearch/bleve/index" "github.com/blevesearch/bleve/index/store" "github.com/blevesearch/bleve/mapping" - "golang.org/x/net/context" ) // A Batch groups together multiple Index and Delete diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/introducer.go b/vendor/github.com/blevesearch/bleve/index/scorch/introducer.go index 4499fa41bd..1a7d656ca7 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/introducer.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/introducer.go @@ -100,8 +100,8 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error { // prepare new index snapshot newSnapshot := &IndexSnapshot{ parent: s, - segment: make([]*SegmentSnapshot, nsegs, nsegs+1), - offsets: make([]uint64, nsegs, nsegs+1), + segment: make([]*SegmentSnapshot, 0, nsegs+1), + offsets: make([]uint64, 0, nsegs+1), internal: make(map[string][]byte, len(s.root.internal)), epoch: s.nextSnapshotEpoch, refs: 1, @@ -124,24 +124,29 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error { return err } } - newSnapshot.segment[i] = &SegmentSnapshot{ + + newss := &SegmentSnapshot{ id: s.root.segment[i].id, segment: s.root.segment[i].segment, cachedDocs: s.root.segment[i].cachedDocs, } - s.root.segment[i].segment.AddRef() // apply new obsoletions if s.root.segment[i].deleted == nil { - newSnapshot.segment[i].deleted = delta + newss.deleted = delta } else { - newSnapshot.segment[i].deleted = roaring.Or(s.root.segment[i].deleted, delta) + newss.deleted = roaring.Or(s.root.segment[i].deleted, delta) } - newSnapshot.offsets[i] = running - running += s.root.segment[i].Count() - + // check for live size before copying + if newss.LiveSize() > 0 { + newSnapshot.segment = append(newSnapshot.segment, newss) + s.root.segment[i].segment.AddRef() + newSnapshot.offsets = append(newSnapshot.offsets, running) + running += s.root.segment[i].Count() + } } + // append new segment, if any, to end of the new index snapshot if next.data != nil { newSegmentSnapshot := &SegmentSnapshot{ @@ -193,6 +198,12 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) { // prepare new index snapshot currSize := len(s.root.segment) newSize := currSize + 1 - len(nextMerge.old) + + // empty segments deletion + if nextMerge.new == nil { + newSize-- + } + newSnapshot := &IndexSnapshot{ parent: s, segment: make([]*SegmentSnapshot, 0, newSize), @@ -210,7 +221,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) { segmentID := s.root.segment[i].id if segSnapAtMerge, ok := nextMerge.old[segmentID]; ok { // this segment is going away, see if anything else was deleted since we started the merge - if s.root.segment[i].deleted != nil { + if segSnapAtMerge != nil && s.root.segment[i].deleted != nil { // assume all these deletes are new deletedSince := s.root.segment[i].deleted // if we already knew about some of them, remove @@ -224,7 +235,13 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) { newSegmentDeleted.Add(uint32(newDocNum)) } } - } else { + // clean up the old segment map to figure out the + // obsolete segments wrt root in meantime, whatever + // segments left behind in old map after processing + // the root segments would be the obsolete segment set + delete(nextMerge.old, segmentID) + + } else if s.root.segment[i].LiveSize() > 0 { // this segment is staying newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{ id: s.root.segment[i].id, @@ -238,14 +255,35 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) { } } - // put new segment at end - newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{ - id: nextMerge.id, - segment: nextMerge.new, // take ownership for nextMerge.new's ref-count - deleted: newSegmentDeleted, - cachedDocs: &cachedDocs{cache: nil}, - }) - newSnapshot.offsets = append(newSnapshot.offsets, running) + // before the newMerge introduction, need to clean the newly + // merged segment wrt the current root segments, hence + // applying the obsolete segment contents to newly merged segment + for segID, ss := range nextMerge.old { + obsoleted := ss.DocNumbersLive() + if obsoleted != nil { + obsoletedIter := obsoleted.Iterator() + for obsoletedIter.HasNext() { + oldDocNum := obsoletedIter.Next() + newDocNum := nextMerge.oldNewDocNums[segID][oldDocNum] + newSegmentDeleted.Add(uint32(newDocNum)) + } + } + } + // In case where all the docs in the newly merged segment getting + // deleted by the time we reach here, can skip the introduction. + if nextMerge.new != nil && + nextMerge.new.Count() > newSegmentDeleted.GetCardinality() { + // put new segment at end + newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{ + id: nextMerge.id, + segment: nextMerge.new, // take ownership for nextMerge.new's ref-count + deleted: newSegmentDeleted, + cachedDocs: &cachedDocs{cache: nil}, + }) + newSnapshot.offsets = append(newSnapshot.offsets, running) + } + + newSnapshot.AddRef() // 1 ref for the nextMerge.notify response // swap in new segment rootPrev := s.root @@ -257,7 +295,8 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) { _ = rootPrev.DecRef() } - // notify merger we incorporated this + // notify requester that we incorporated this + nextMerge.notify <- newSnapshot close(nextMerge.notify) } diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/merge.go b/vendor/github.com/blevesearch/bleve/index/scorch/merge.go index 5ded29b5a3..ad756588a6 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/merge.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/merge.go @@ -15,6 +15,9 @@ package scorch import ( + "bytes" + "encoding/json" + "fmt" "os" "sync/atomic" @@ -28,6 +31,13 @@ import ( func (s *Scorch) mergerLoop() { var lastEpochMergePlanned uint64 + mergePlannerOptions, err := s.parseMergePlannerOptions() + if err != nil { + s.fireAsyncError(fmt.Errorf("mergePlannerOption json parsing err: %v", err)) + s.asyncTasks.Done() + return + } + OUTER: for { select { @@ -45,7 +55,7 @@ OUTER: startTime := time.Now() // lets get started - err := s.planMergeAtSnapshot(ourSnapshot) + err := s.planMergeAtSnapshot(ourSnapshot, mergePlannerOptions) if err != nil { s.fireAsyncError(fmt.Errorf("merging err: %v", err)) _ = ourSnapshot.DecRef() @@ -58,51 +68,49 @@ OUTER: _ = ourSnapshot.DecRef() // tell the persister we're waiting for changes - // first make a notification chan - notifyUs := make(notificationChan) + // first make a epochWatcher chan + ew := &epochWatcher{ + epoch: lastEpochMergePlanned, + notifyCh: make(notificationChan, 1), + } // give it to the persister select { case <-s.closeCh: break OUTER - case s.persisterNotifier <- notifyUs: - } - - // check again - s.rootLock.RLock() - ourSnapshot = s.root - ourSnapshot.AddRef() - s.rootLock.RUnlock() - - if ourSnapshot.epoch != lastEpochMergePlanned { - startTime := time.Now() - - // lets get started - err := s.planMergeAtSnapshot(ourSnapshot) - if err != nil { - s.fireAsyncError(fmt.Errorf("merging err: %v", err)) - _ = ourSnapshot.DecRef() - continue OUTER - } - lastEpochMergePlanned = ourSnapshot.epoch - - s.fireEvent(EventKindMergerProgress, time.Since(startTime)) + case s.persisterNotifier <- ew: } - _ = ourSnapshot.DecRef() - // now wait for it (but also detect close) + // now wait for persister (but also detect close) select { case <-s.closeCh: break OUTER - case <-notifyUs: - // woken up, next loop should pick up work + case <-ew.notifyCh: } } } s.asyncTasks.Done() } -func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error { +func (s *Scorch) parseMergePlannerOptions() (*mergeplan.MergePlanOptions, + error) { + mergePlannerOptions := mergeplan.DefaultMergePlanOptions + if v, ok := s.config["scorchMergePlanOptions"]; ok { + b, err := json.Marshal(v) + if err != nil { + return &mergePlannerOptions, err + } + + err = json.Unmarshal(b, &mergePlannerOptions) + if err != nil { + return &mergePlannerOptions, err + } + } + return &mergePlannerOptions, nil +} + +func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot, + options *mergeplan.MergePlanOptions) error { // build list of zap segments in this snapshot var onlyZapSnapshots []mergeplan.Segment for _, segmentSnapshot := range ourSnapshot.segment { @@ -112,7 +120,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error { } // give this list to the planner - resultMergePlan, err := mergeplan.Plan(onlyZapSnapshots, nil) + resultMergePlan, err := mergeplan.Plan(onlyZapSnapshots, options) if err != nil { return fmt.Errorf("merge planning err: %v", err) } @@ -122,8 +130,12 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error { } // process tasks in serial for now - var notifications []notificationChan + var notifications []chan *IndexSnapshot for _, task := range resultMergePlan.Tasks { + if len(task.Segments) == 0 { + continue + } + oldMap := make(map[uint64]*SegmentSnapshot) newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1) segmentsToMerge := make([]*zap.Segment, 0, len(task.Segments)) @@ -132,40 +144,51 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error { if segSnapshot, ok := planSegment.(*SegmentSnapshot); ok { oldMap[segSnapshot.id] = segSnapshot if zapSeg, ok := segSnapshot.segment.(*zap.Segment); ok { - segmentsToMerge = append(segmentsToMerge, zapSeg) - docsToDrop = append(docsToDrop, segSnapshot.deleted) + if segSnapshot.LiveSize() == 0 { + oldMap[segSnapshot.id] = nil + } else { + segmentsToMerge = append(segmentsToMerge, zapSeg) + docsToDrop = append(docsToDrop, segSnapshot.deleted) + } } } } - filename := zapFileName(newSegmentID) - s.markIneligibleForRemoval(filename) - path := s.path + string(os.PathSeparator) + filename - newDocNums, err := zap.Merge(segmentsToMerge, docsToDrop, path, DefaultChunkFactor) - if err != nil { - s.unmarkIneligibleForRemoval(filename) - return fmt.Errorf("merging failed: %v", err) - } - segment, err := zap.Open(path) - if err != nil { - s.unmarkIneligibleForRemoval(filename) - return err + var oldNewDocNums map[uint64][]uint64 + var segment segment.Segment + if len(segmentsToMerge) > 0 { + filename := zapFileName(newSegmentID) + s.markIneligibleForRemoval(filename) + path := s.path + string(os.PathSeparator) + filename + newDocNums, err := zap.Merge(segmentsToMerge, docsToDrop, path, 1024) + if err != nil { + s.unmarkIneligibleForRemoval(filename) + return fmt.Errorf("merging failed: %v", err) + } + segment, err = zap.Open(path) + if err != nil { + s.unmarkIneligibleForRemoval(filename) + return err + } + oldNewDocNums = make(map[uint64][]uint64) + for i, segNewDocNums := range newDocNums { + oldNewDocNums[task.Segments[i].Id()] = segNewDocNums + } } + sm := &segmentMerge{ id: newSegmentID, old: oldMap, - oldNewDocNums: make(map[uint64][]uint64), + oldNewDocNums: oldNewDocNums, new: segment, - notify: make(notificationChan), + notify: make(chan *IndexSnapshot, 1), } notifications = append(notifications, sm.notify) - for i, segNewDocNums := range newDocNums { - sm.oldNewDocNums[task.Segments[i].Id()] = segNewDocNums - } // give it to the introducer select { case <-s.closeCh: + _ = segment.Close() return nil case s.merges <- sm: } @@ -174,7 +197,10 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error { select { case <-s.closeCh: return nil - case <-notification: + case newSnapshot := <-notification: + if newSnapshot != nil { + _ = newSnapshot.DecRef() + } } } return nil @@ -185,5 +211,72 @@ type segmentMerge struct { old map[uint64]*SegmentSnapshot oldNewDocNums map[uint64][]uint64 new segment.Segment - notify notificationChan + notify chan *IndexSnapshot +} + +// perform a merging of the given SegmentBase instances into a new, +// persisted segment, and synchronously introduce that new segment +// into the root +func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot, + sbs []*zap.SegmentBase, sbsDrops []*roaring.Bitmap, sbsIndexes []int, + chunkFactor uint32) (uint64, *IndexSnapshot, uint64, error) { + var br bytes.Buffer + + cr := zap.NewCountHashWriter(&br) + + newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset, + docValueOffset, dictLocs, fieldsInv, fieldsMap, err := + zap.MergeToWriter(sbs, sbsDrops, chunkFactor, cr) + if err != nil { + return 0, nil, 0, err + } + + sb, err := zap.InitSegmentBase(br.Bytes(), cr.Sum32(), chunkFactor, + fieldsMap, fieldsInv, numDocs, storedIndexOffset, fieldsIndexOffset, + docValueOffset, dictLocs) + if err != nil { + return 0, nil, 0, err + } + + newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1) + + filename := zapFileName(newSegmentID) + path := s.path + string(os.PathSeparator) + filename + err = zap.PersistSegmentBase(sb, path) + if err != nil { + return 0, nil, 0, err + } + + segment, err := zap.Open(path) + if err != nil { + return 0, nil, 0, err + } + + sm := &segmentMerge{ + id: newSegmentID, + old: make(map[uint64]*SegmentSnapshot), + oldNewDocNums: make(map[uint64][]uint64), + new: segment, + notify: make(chan *IndexSnapshot, 1), + } + + for i, idx := range sbsIndexes { + ss := snapshot.segment[idx] + sm.old[ss.id] = ss + sm.oldNewDocNums[ss.id] = newDocNums[i] + } + + select { // send to introducer + case <-s.closeCh: + _ = segment.DecRef() + return 0, nil, 0, nil // TODO: return ErrInterruptedClosed? + case s.merges <- sm: + } + + select { // wait for introduction to complete + case <-s.closeCh: + return 0, nil, 0, nil // TODO: return ErrInterruptedClosed? + case newSnapshot := <-sm.notify: + return numDocs, newSnapshot, newSegmentID, nil + } } diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/mergeplan/merge_plan.go b/vendor/github.com/blevesearch/bleve/index/scorch/mergeplan/merge_plan.go index 0afc3ce5c6..62f643f431 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/mergeplan/merge_plan.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/mergeplan/merge_plan.go @@ -186,13 +186,13 @@ func plan(segmentsIn []Segment, o *MergePlanOptions) (*MergePlan, error) { // While we’re over budget, keep looping, which might produce // another MergeTask. - for len(eligibles) > budgetNumSegments { + for len(eligibles) > 0 && (len(eligibles)+len(rv.Tasks)) > budgetNumSegments { // Track a current best roster as we examine and score // potential rosters of merges. var bestRoster []Segment var bestRosterScore float64 // Lower score is better. - for startIdx := 0; startIdx < len(eligibles)-o.SegmentsPerMergeTask; startIdx++ { + for startIdx := 0; startIdx < len(eligibles); startIdx++ { var roster []Segment var rosterLiveSize int64 diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/persister.go b/vendor/github.com/blevesearch/bleve/index/scorch/persister.go index cdcee37c2e..c21bb14394 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/persister.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/persister.go @@ -34,22 +34,39 @@ import ( var DefaultChunkFactor uint32 = 1024 +// Arbitrary number, need to make it configurable. +// Lower values like 10/making persister really slow +// doesn't work well as it is creating more files to +// persist for in next persist iteration and spikes the # FDs. +// Ideal value should let persister also proceed at +// an optimum pace so that the merger can skip +// many intermediate snapshots. +// This needs to be based on empirical data. +// TODO - may need to revisit this approach/value. +var epochDistance = uint64(5) + type notificationChan chan struct{} func (s *Scorch) persisterLoop() { defer s.asyncTasks.Done() - var notifyChs []notificationChan - var lastPersistedEpoch uint64 + var persistWatchers []*epochWatcher + var lastPersistedEpoch, lastMergedEpoch uint64 + var ew *epochWatcher OUTER: for { select { case <-s.closeCh: break OUTER - case notifyCh := <-s.persisterNotifier: - notifyChs = append(notifyChs, notifyCh) + case ew = <-s.persisterNotifier: + persistWatchers = append(persistWatchers, ew) default: } + if ew != nil && ew.epoch > lastMergedEpoch { + lastMergedEpoch = ew.epoch + } + persistWatchers = s.pausePersisterForMergerCatchUp(lastPersistedEpoch, + &lastMergedEpoch, persistWatchers) var ourSnapshot *IndexSnapshot var ourPersisted []chan error @@ -81,10 +98,11 @@ OUTER: } lastPersistedEpoch = ourSnapshot.epoch - for _, notifyCh := range notifyChs { - close(notifyCh) + for _, ew := range persistWatchers { + close(ew.notifyCh) } - notifyChs = nil + + persistWatchers = nil _ = ourSnapshot.DecRef() changed := false @@ -120,27 +138,155 @@ OUTER: break OUTER case <-w.notifyCh: // woken up, next loop should pick up work + continue OUTER + case ew = <-s.persisterNotifier: + // if the watchers are already caught up then let them wait, + // else let them continue to do the catch up + persistWatchers = append(persistWatchers, ew) + } + } +} + +func notifyMergeWatchers(lastPersistedEpoch uint64, + persistWatchers []*epochWatcher) []*epochWatcher { + var watchersNext []*epochWatcher + for _, w := range persistWatchers { + if w.epoch < lastPersistedEpoch { + close(w.notifyCh) + } else { + watchersNext = append(watchersNext, w) } } + return watchersNext +} + +func (s *Scorch) pausePersisterForMergerCatchUp(lastPersistedEpoch uint64, lastMergedEpoch *uint64, + persistWatchers []*epochWatcher) []*epochWatcher { + + // first, let the watchers proceed if they lag behind + persistWatchers = notifyMergeWatchers(lastPersistedEpoch, persistWatchers) + +OUTER: + // check for slow merger and await until the merger catch up + for lastPersistedEpoch > *lastMergedEpoch+epochDistance { + + select { + case <-s.closeCh: + break OUTER + case ew := <-s.persisterNotifier: + persistWatchers = append(persistWatchers, ew) + *lastMergedEpoch = ew.epoch + } + + // let the watchers proceed if they lag behind + persistWatchers = notifyMergeWatchers(lastPersistedEpoch, persistWatchers) + } + + return persistWatchers } func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { - // start a write transaction - tx, err := s.rootBolt.Begin(true) + persisted, err := s.persistSnapshotMaybeMerge(snapshot) if err != nil { return err } - // defer fsync of the rootbolt - defer func() { - if err == nil { - err = s.rootBolt.Sync() + if persisted { + return nil + } + + return s.persistSnapshotDirect(snapshot) +} + +// DefaultMinSegmentsForInMemoryMerge represents the default number of +// in-memory zap segments that persistSnapshotMaybeMerge() needs to +// see in an IndexSnapshot before it decides to merge and persist +// those segments +var DefaultMinSegmentsForInMemoryMerge = 2 + +// persistSnapshotMaybeMerge examines the snapshot and might merge and +// persist the in-memory zap segments if there are enough of them +func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) ( + bool, error) { + // collect the in-memory zap segments (SegmentBase instances) + var sbs []*zap.SegmentBase + var sbsDrops []*roaring.Bitmap + var sbsIndexes []int + + for i, segmentSnapshot := range snapshot.segment { + if sb, ok := segmentSnapshot.segment.(*zap.SegmentBase); ok { + sbs = append(sbs, sb) + sbsDrops = append(sbsDrops, segmentSnapshot.deleted) + sbsIndexes = append(sbsIndexes, i) } + } + + if len(sbs) < DefaultMinSegmentsForInMemoryMerge { + return false, nil + } + + _, newSnapshot, newSegmentID, err := s.mergeSegmentBases( + snapshot, sbs, sbsDrops, sbsIndexes, DefaultChunkFactor) + if err != nil { + return false, err + } + if newSnapshot == nil { + return false, nil + } + + defer func() { + _ = newSnapshot.DecRef() }() - // defer commit/rollback transaction + + mergedSegmentIDs := map[uint64]struct{}{} + for _, idx := range sbsIndexes { + mergedSegmentIDs[snapshot.segment[idx].id] = struct{}{} + } + + // construct a snapshot that's logically equivalent to the input + // snapshot, but with merged segments replaced by the new segment + equiv := &IndexSnapshot{ + parent: snapshot.parent, + segment: make([]*SegmentSnapshot, 0, len(snapshot.segment)), + internal: snapshot.internal, + epoch: snapshot.epoch, + } + + // copy to the equiv the segments that weren't replaced + for _, segment := range snapshot.segment { + if _, wasMerged := mergedSegmentIDs[segment.id]; !wasMerged { + equiv.segment = append(equiv.segment, segment) + } + } + + // append to the equiv the new segment + for _, segment := range newSnapshot.segment { + if segment.id == newSegmentID { + equiv.segment = append(equiv.segment, &SegmentSnapshot{ + id: newSegmentID, + segment: segment.segment, + deleted: nil, // nil since merging handled deletions + }) + break + } + } + + err = s.persistSnapshotDirect(equiv) + if err != nil { + return false, err + } + + return true, nil +} + +func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot) (err error) { + // start a write transaction + tx, err := s.rootBolt.Begin(true) + if err != nil { + return err + } + // defer rollback on error defer func() { - if err == nil { - err = tx.Commit() - } else { + if err != nil { _ = tx.Rollback() } }() @@ -172,20 +318,20 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { newSegmentPaths := make(map[uint64]string) // first ensure that each segment in this snapshot has been persisted - for i, segmentSnapshot := range snapshot.segment { - snapshotSegmentKey := segment.EncodeUvarintAscending(nil, uint64(i)) - snapshotSegmentBucket, err2 := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey) - if err2 != nil { - return err2 + for _, segmentSnapshot := range snapshot.segment { + snapshotSegmentKey := segment.EncodeUvarintAscending(nil, segmentSnapshot.id) + snapshotSegmentBucket, err := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey) + if err != nil { + return err } switch seg := segmentSnapshot.segment.(type) { case *zap.SegmentBase: // need to persist this to disk filename := zapFileName(segmentSnapshot.id) path := s.path + string(os.PathSeparator) + filename - err2 := zap.PersistSegmentBase(seg, path) - if err2 != nil { - return fmt.Errorf("error persisting segment: %v", err2) + err = zap.PersistSegmentBase(seg, path) + if err != nil { + return fmt.Errorf("error persisting segment: %v", err) } newSegmentPaths[segmentSnapshot.id] = path err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename)) @@ -218,19 +364,28 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { } } - // only alter the root if we actually persisted a segment - // (sometimes its just a new snapshot, possibly with new internal values) + // we need to swap in a new root only when we've persisted 1 or + // more segments -- whereby the new root would have 1-for-1 + // replacements of in-memory segments with file-based segments + // + // other cases like updates to internal values only, and/or when + // there are only deletions, are already covered and persisted by + // the newly populated boltdb snapshotBucket above if len(newSegmentPaths) > 0 { // now try to open all the new snapshots newSegments := make(map[uint64]segment.Segment) + defer func() { + for _, s := range newSegments { + if s != nil { + // cleanup segments that were opened but not + // swapped into the new root + _ = s.Close() + } + } + }() for segmentID, path := range newSegmentPaths { newSegments[segmentID], err = zap.Open(path) if err != nil { - for _, s := range newSegments { - if s != nil { - _ = s.Close() // cleanup segments that were successfully opened - } - } return fmt.Errorf("error opening new segment at %s, %v", path, err) } } @@ -255,6 +410,7 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { cachedDocs: segmentSnapshot.cachedDocs, } newIndexSnapshot.segment[i] = newSegmentSnapshot + delete(newSegments, segmentSnapshot.id) // update items persisted incase of a new segment snapshot atomic.AddUint64(&s.stats.numItemsPersisted, newSegmentSnapshot.Count()) } else { @@ -266,9 +422,7 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { for k, v := range s.root.internal { newIndexSnapshot.internal[k] = v } - for _, filename := range filenames { - delete(s.ineligibleForRemoval, filename) - } + rootPrev := s.root s.root = newIndexSnapshot s.rootLock.Unlock() @@ -277,6 +431,24 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { } } + err = tx.Commit() + if err != nil { + return err + } + + err = s.rootBolt.Sync() + if err != nil { + return err + } + + // allow files to become eligible for removal after commit, such + // as file segments from snapshots that came from the merger + s.rootLock.Lock() + for _, filename := range filenames { + delete(s.ineligibleForRemoval, filename) + } + s.rootLock.Unlock() + return nil } diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/scorch.go b/vendor/github.com/blevesearch/bleve/index/scorch/scorch.go index 311077653a..f539313d1c 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/scorch.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/scorch.go @@ -61,7 +61,7 @@ type Scorch struct { merges chan *segmentMerge introducerNotifier chan *epochWatcher revertToSnapshots chan *snapshotReversion - persisterNotifier chan notificationChan + persisterNotifier chan *epochWatcher rootBolt *bolt.DB asyncTasks sync.WaitGroup @@ -114,6 +114,25 @@ func (s *Scorch) fireAsyncError(err error) { } func (s *Scorch) Open() error { + err := s.openBolt() + if err != nil { + return err + } + + s.asyncTasks.Add(1) + go s.mainLoop() + + if !s.readOnly && s.path != "" { + s.asyncTasks.Add(1) + go s.persisterLoop() + s.asyncTasks.Add(1) + go s.mergerLoop() + } + + return nil +} + +func (s *Scorch) openBolt() error { var ok bool s.path, ok = s.config["path"].(string) if !ok { @@ -136,6 +155,7 @@ func (s *Scorch) Open() error { } } } + rootBoltPath := s.path + string(os.PathSeparator) + "root.bolt" var err error if s.path != "" { @@ -156,7 +176,7 @@ func (s *Scorch) Open() error { s.merges = make(chan *segmentMerge) s.introducerNotifier = make(chan *epochWatcher, 1) s.revertToSnapshots = make(chan *snapshotReversion) - s.persisterNotifier = make(chan notificationChan) + s.persisterNotifier = make(chan *epochWatcher, 1) if !s.readOnly && s.path != "" { err := s.removeOldZapFiles() // Before persister or merger create any new files. @@ -166,16 +186,6 @@ func (s *Scorch) Open() error { } } - s.asyncTasks.Add(1) - go s.mainLoop() - - if !s.readOnly && s.path != "" { - s.asyncTasks.Add(1) - go s.persisterLoop() - s.asyncTasks.Add(1) - go s.mergerLoop() - } - return nil } @@ -310,17 +320,21 @@ func (s *Scorch) prepareSegment(newSegment segment.Segment, ids []string, introduction.persisted = make(chan error, 1) } - // get read lock, to optimistically prepare obsoleted info + // optimistically prepare obsoletes outside of rootLock s.rootLock.RLock() - for _, seg := range s.root.segment { + root := s.root + root.AddRef() + s.rootLock.RUnlock() + + for _, seg := range root.segment { delta, err := seg.segment.DocNumbers(ids) if err != nil { - s.rootLock.RUnlock() return err } introduction.obsoletes[seg.id] = delta } - s.rootLock.RUnlock() + + _ = root.DecRef() s.introductions <- introduction diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/segment/mem/build.go b/vendor/github.com/blevesearch/bleve/index/scorch/segment/mem/build.go index d3344ce301..57d60dc890 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/segment/mem/build.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/segment/mem/build.go @@ -95,6 +95,21 @@ func (s *Segment) initializeDict(results []*index.AnalysisResult) { var numTokenFrequencies int var totLocs int + // initial scan for all fieldID's to sort them + for _, result := range results { + for _, field := range result.Document.CompositeFields { + s.getOrDefineField(field.Name()) + } + for _, field := range result.Document.Fields { + s.getOrDefineField(field.Name()) + } + } + sort.Strings(s.FieldsInv[1:]) // keep _id as first field + s.FieldsMap = make(map[string]uint16, len(s.FieldsInv)) + for fieldID, fieldName := range s.FieldsInv { + s.FieldsMap[fieldName] = uint16(fieldID + 1) + } + processField := func(fieldID uint16, tfs analysis.TokenFrequencies) { for term, tf := range tfs { pidPlus1, exists := s.Dicts[fieldID][term] diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/segment/mem/dict.go b/vendor/github.com/blevesearch/bleve/index/scorch/segment/mem/dict.go index 939c287e98..cf92ef71f6 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/segment/mem/dict.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/segment/mem/dict.go @@ -76,6 +76,8 @@ type DictionaryIterator struct { prefix string end string offset int + + dictEntry index.DictEntry // reused across Next()'s } // Next returns the next entry in the dictionary @@ -95,8 +97,7 @@ func (d *DictionaryIterator) Next() (*index.DictEntry, error) { d.offset++ postingID := d.d.segment.Dicts[d.d.fieldID][next] - return &index.DictEntry{ - Term: next, - Count: d.d.segment.Postings[postingID-1].GetCardinality(), - }, nil + d.dictEntry.Term = next + d.dictEntry.Count = d.d.segment.Postings[postingID-1].GetCardinality() + return &d.dictEntry, nil } diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/build.go b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/build.go index 58f9faeaf6..72357ae7d7 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/build.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/build.go @@ -28,7 +28,7 @@ import ( "github.com/golang/snappy" ) -const version uint32 = 2 +const version uint32 = 3 const fieldNotUninverted = math.MaxUint64 @@ -187,79 +187,42 @@ func persistBase(memSegment *mem.Segment, cr *CountHashWriter, chunkFactor uint3 } func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error) { - var curr int var metaBuf bytes.Buffer var data, compressed []byte + metaEncoder := govarint.NewU64Base128Encoder(&metaBuf) + docNumOffsets := make(map[int]uint64, len(memSegment.Stored)) for docNum, storedValues := range memSegment.Stored { if docNum != 0 { // reset buffer if necessary + curr = 0 metaBuf.Reset() data = data[:0] compressed = compressed[:0] - curr = 0 } - metaEncoder := govarint.NewU64Base128Encoder(&metaBuf) - st := memSegment.StoredTypes[docNum] sp := memSegment.StoredPos[docNum] // encode fields in order for fieldID := range memSegment.FieldsInv { if storedFieldValues, ok := storedValues[uint16(fieldID)]; ok { - // has stored values for this field - num := len(storedFieldValues) - stf := st[uint16(fieldID)] spf := sp[uint16(fieldID)] - // process each value - for i := 0; i < num; i++ { - // encode field - _, err2 := metaEncoder.PutU64(uint64(fieldID)) - if err2 != nil { - return 0, err2 - } - // encode type - _, err2 = metaEncoder.PutU64(uint64(stf[i])) - if err2 != nil { - return 0, err2 - } - // encode start offset - _, err2 = metaEncoder.PutU64(uint64(curr)) - if err2 != nil { - return 0, err2 - } - // end len - _, err2 = metaEncoder.PutU64(uint64(len(storedFieldValues[i]))) - if err2 != nil { - return 0, err2 - } - // encode number of array pos - _, err2 = metaEncoder.PutU64(uint64(len(spf[i]))) - if err2 != nil { - return 0, err2 - } - // encode all array positions - for _, pos := range spf[i] { - _, err2 = metaEncoder.PutU64(pos) - if err2 != nil { - return 0, err2 - } - } - // append data - data = append(data, storedFieldValues[i]...) - // update curr - curr += len(storedFieldValues[i]) + var err2 error + curr, data, err2 = persistStoredFieldValues(fieldID, + storedFieldValues, stf, spf, curr, metaEncoder, data) + if err2 != nil { + return 0, err2 } } } - metaEncoder.Close() + metaEncoder.Close() metaBytes := metaBuf.Bytes() // compress the data @@ -299,6 +262,51 @@ func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error) return rv, nil } +func persistStoredFieldValues(fieldID int, + storedFieldValues [][]byte, stf []byte, spf [][]uint64, + curr int, metaEncoder *govarint.Base128Encoder, data []byte) ( + int, []byte, error) { + for i := 0; i < len(storedFieldValues); i++ { + // encode field + _, err := metaEncoder.PutU64(uint64(fieldID)) + if err != nil { + return 0, nil, err + } + // encode type + _, err = metaEncoder.PutU64(uint64(stf[i])) + if err != nil { + return 0, nil, err + } + // encode start offset + _, err = metaEncoder.PutU64(uint64(curr)) + if err != nil { + return 0, nil, err + } + // end len + _, err = metaEncoder.PutU64(uint64(len(storedFieldValues[i]))) + if err != nil { + return 0, nil, err + } + // encode number of array pos + _, err = metaEncoder.PutU64(uint64(len(spf[i]))) + if err != nil { + return 0, nil, err + } + // encode all array positions + for _, pos := range spf[i] { + _, err = metaEncoder.PutU64(pos) + if err != nil { + return 0, nil, err + } + } + + data = append(data, storedFieldValues[i]...) + curr += len(storedFieldValues[i]) + } + + return curr, data, nil +} + func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFactor uint32) ([]uint64, []uint64, error) { var freqOffsets, locOfffsets []uint64 tfEncoder := newChunkedIntCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1)) @@ -580,7 +588,7 @@ func persistDocValues(memSegment *mem.Segment, w *CountHashWriter, if err != nil { return nil, err } - // resetting encoder for the next field + // reseting encoder for the next field fdvEncoder.Reset() } @@ -625,12 +633,21 @@ func NewSegmentBase(memSegment *mem.Segment, chunkFactor uint32) (*SegmentBase, return nil, err } + return InitSegmentBase(br.Bytes(), cr.Sum32(), chunkFactor, + memSegment.FieldsMap, memSegment.FieldsInv, numDocs, + storedIndexOffset, fieldsIndexOffset, docValueOffset, dictLocs) +} + +func InitSegmentBase(mem []byte, memCRC uint32, chunkFactor uint32, + fieldsMap map[string]uint16, fieldsInv []string, numDocs uint64, + storedIndexOffset uint64, fieldsIndexOffset uint64, docValueOffset uint64, + dictLocs []uint64) (*SegmentBase, error) { sb := &SegmentBase{ - mem: br.Bytes(), - memCRC: cr.Sum32(), + mem: mem, + memCRC: memCRC, chunkFactor: chunkFactor, - fieldsMap: memSegment.FieldsMap, - fieldsInv: memSegment.FieldsInv, + fieldsMap: fieldsMap, + fieldsInv: fieldsInv, numDocs: numDocs, storedIndexOffset: storedIndexOffset, fieldsIndexOffset: fieldsIndexOffset, @@ -639,7 +656,7 @@ func NewSegmentBase(memSegment *mem.Segment, chunkFactor uint32) (*SegmentBase, fieldDvIterMap: make(map[uint16]*docValueIterator), } - err = sb.loadDvIterators() + err := sb.loadDvIterators() if err != nil { return nil, err } diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/contentcoder.go b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/contentcoder.go index b03940497f..83457146ec 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/contentcoder.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/contentcoder.go @@ -39,7 +39,7 @@ type chunkedContentCoder struct { // MetaData represents the data information inside a // chunk. type MetaData struct { - DocID uint64 // docid of the data inside the chunk + DocNum uint64 // docNum of the data inside the chunk DocDvLoc uint64 // starting offset for a given docid DocDvLen uint64 // length of data inside the chunk for the given docid } @@ -52,7 +52,7 @@ func newChunkedContentCoder(chunkSize uint64, rv := &chunkedContentCoder{ chunkSize: chunkSize, chunkLens: make([]uint64, total), - chunkMeta: []MetaData{}, + chunkMeta: make([]MetaData, 0, total), } return rv @@ -68,7 +68,7 @@ func (c *chunkedContentCoder) Reset() { for i := range c.chunkLens { c.chunkLens[i] = 0 } - c.chunkMeta = []MetaData{} + c.chunkMeta = c.chunkMeta[:0] } // Close indicates you are done calling Add() this allows @@ -88,7 +88,7 @@ func (c *chunkedContentCoder) flushContents() error { // write out the metaData slice for _, meta := range c.chunkMeta { - _, err := writeUvarints(&c.chunkMetaBuf, meta.DocID, meta.DocDvLoc, meta.DocDvLen) + _, err := writeUvarints(&c.chunkMetaBuf, meta.DocNum, meta.DocDvLoc, meta.DocDvLen) if err != nil { return err } @@ -118,7 +118,7 @@ func (c *chunkedContentCoder) Add(docNum uint64, vals []byte) error { // clearing the chunk specific meta for next chunk c.chunkBuf.Reset() c.chunkMetaBuf.Reset() - c.chunkMeta = []MetaData{} + c.chunkMeta = c.chunkMeta[:0] c.currChunk = chunk } @@ -130,7 +130,7 @@ func (c *chunkedContentCoder) Add(docNum uint64, vals []byte) error { } c.chunkMeta = append(c.chunkMeta, MetaData{ - DocID: docNum, + DocNum: docNum, DocDvLoc: uint64(dvOffset), DocDvLen: uint64(dvSize), }) diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/dict.go b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/dict.go index 0f5145fba8..e5d7126866 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/dict.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/dict.go @@ -34,32 +34,47 @@ type Dictionary struct { // PostingsList returns the postings list for the specified term func (d *Dictionary) PostingsList(term string, except *roaring.Bitmap) (segment.PostingsList, error) { - return d.postingsList([]byte(term), except) + return d.postingsList([]byte(term), except, nil) } -func (d *Dictionary) postingsList(term []byte, except *roaring.Bitmap) (*PostingsList, error) { - rv := &PostingsList{ - sb: d.sb, - term: term, - except: except, +func (d *Dictionary) postingsList(term []byte, except *roaring.Bitmap, rv *PostingsList) (*PostingsList, error) { + if d.fst == nil { + return d.postingsListInit(rv, except), nil } - if d.fst != nil { - postingsOffset, exists, err := d.fst.Get(term) - if err != nil { - return nil, fmt.Errorf("vellum err: %v", err) - } - if exists { - err = rv.read(postingsOffset, d) - if err != nil { - return nil, err - } - } + postingsOffset, exists, err := d.fst.Get(term) + if err != nil { + return nil, fmt.Errorf("vellum err: %v", err) + } + if !exists { + return d.postingsListInit(rv, except), nil + } + + return d.postingsListFromOffset(postingsOffset, except, rv) +} + +func (d *Dictionary) postingsListFromOffset(postingsOffset uint64, except *roaring.Bitmap, rv *PostingsList) (*PostingsList, error) { + rv = d.postingsListInit(rv, except) + + err := rv.read(postingsOffset, d) + if err != nil { + return nil, err } return rv, nil } +func (d *Dictionary) postingsListInit(rv *PostingsList, except *roaring.Bitmap) *PostingsList { + if rv == nil { + rv = &PostingsList{} + } else { + *rv = PostingsList{} // clear the struct + } + rv.sb = d.sb + rv.except = except + return rv +} + // Iterator returns an iterator for this dictionary func (d *Dictionary) Iterator() segment.DictionaryIterator { rv := &DictionaryIterator{ diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/docvalues.go b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/docvalues.go index fb5b348a5b..0514bd307c 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/docvalues.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/docvalues.go @@ -99,7 +99,7 @@ func (s *SegmentBase) loadFieldDocValueIterator(field string, func (di *docValueIterator) loadDvChunk(chunkNumber, localDocNum uint64, s *SegmentBase) error { // advance to the chunk where the docValues - // reside for the given docID + // reside for the given docNum destChunkDataLoc := di.dvDataLoc for i := 0; i < int(chunkNumber); i++ { destChunkDataLoc += di.chunkLens[i] @@ -116,7 +116,7 @@ func (di *docValueIterator) loadDvChunk(chunkNumber, offset := uint64(0) di.curChunkHeader = make([]MetaData, int(numDocs)) for i := 0; i < int(numDocs); i++ { - di.curChunkHeader[i].DocID, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) + di.curChunkHeader[i].DocNum, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) offset += uint64(read) di.curChunkHeader[i].DocDvLoc, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) offset += uint64(read) @@ -131,10 +131,10 @@ func (di *docValueIterator) loadDvChunk(chunkNumber, return nil } -func (di *docValueIterator) visitDocValues(docID uint64, +func (di *docValueIterator) visitDocValues(docNum uint64, visitor index.DocumentFieldTermVisitor) error { - // binary search the term locations for the docID - start, length := di.getDocValueLocs(docID) + // binary search the term locations for the docNum + start, length := di.getDocValueLocs(docNum) if start == math.MaxUint64 || length == math.MaxUint64 { return nil } @@ -144,7 +144,7 @@ func (di *docValueIterator) visitDocValues(docID uint64, return err } - // pick the terms for the given docID + // pick the terms for the given docNum uncompressed = uncompressed[start : start+length] for { i := bytes.Index(uncompressed, termSeparatorSplitSlice) @@ -159,11 +159,11 @@ func (di *docValueIterator) visitDocValues(docID uint64, return nil } -func (di *docValueIterator) getDocValueLocs(docID uint64) (uint64, uint64) { +func (di *docValueIterator) getDocValueLocs(docNum uint64) (uint64, uint64) { i := sort.Search(len(di.curChunkHeader), func(i int) bool { - return di.curChunkHeader[i].DocID >= docID + return di.curChunkHeader[i].DocNum >= docNum }) - if i < len(di.curChunkHeader) && di.curChunkHeader[i].DocID == docID { + if i < len(di.curChunkHeader) && di.curChunkHeader[i].DocNum == docNum { return di.curChunkHeader[i].DocDvLoc, di.curChunkHeader[i].DocDvLen } return math.MaxUint64, math.MaxUint64 diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/enumerator.go b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/enumerator.go new file mode 100644 index 0000000000..3c708dd577 --- /dev/null +++ b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/enumerator.go @@ -0,0 +1,124 @@ +// Copyright (c) 2018 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zap + +import ( + "bytes" + + "github.com/couchbase/vellum" +) + +// enumerator provides an ordered traversal of multiple vellum +// iterators. Like JOIN of iterators, the enumerator produces a +// sequence of (key, iteratorIndex, value) tuples, sorted by key ASC, +// then iteratorIndex ASC, where the same key might be seen or +// repeated across multiple child iterators. +type enumerator struct { + itrs []vellum.Iterator + currKs [][]byte + currVs []uint64 + + lowK []byte + lowIdxs []int + lowCurr int +} + +// newEnumerator returns a new enumerator over the vellum Iterators +func newEnumerator(itrs []vellum.Iterator) (*enumerator, error) { + rv := &enumerator{ + itrs: itrs, + currKs: make([][]byte, len(itrs)), + currVs: make([]uint64, len(itrs)), + lowIdxs: make([]int, 0, len(itrs)), + } + for i, itr := range rv.itrs { + rv.currKs[i], rv.currVs[i] = itr.Current() + } + rv.updateMatches() + if rv.lowK == nil { + return rv, vellum.ErrIteratorDone + } + return rv, nil +} + +// updateMatches maintains the low key matches based on the currKs +func (m *enumerator) updateMatches() { + m.lowK = nil + m.lowIdxs = m.lowIdxs[:0] + m.lowCurr = 0 + + for i, key := range m.currKs { + if key == nil { + continue + } + + cmp := bytes.Compare(key, m.lowK) + if cmp < 0 || m.lowK == nil { + // reached a new low + m.lowK = key + m.lowIdxs = m.lowIdxs[:0] + m.lowIdxs = append(m.lowIdxs, i) + } else if cmp == 0 { + m.lowIdxs = append(m.lowIdxs, i) + } + } +} + +// Current returns the enumerator's current key, iterator-index, and +// value. If the enumerator is not pointing at a valid value (because +// Next returned an error previously), Current will return nil,0,0. +func (m *enumerator) Current() ([]byte, int, uint64) { + var i int + var v uint64 + if m.lowCurr < len(m.lowIdxs) { + i = m.lowIdxs[m.lowCurr] + v = m.currVs[i] + } + return m.lowK, i, v +} + +// Next advances the enumerator to the next key/iterator/value result, +// else vellum.ErrIteratorDone is returned. +func (m *enumerator) Next() error { + m.lowCurr += 1 + if m.lowCurr >= len(m.lowIdxs) { + // move all the current low iterators forwards + for _, vi := range m.lowIdxs { + err := m.itrs[vi].Next() + if err != nil && err != vellum.ErrIteratorDone { + return err + } + m.currKs[vi], m.currVs[vi] = m.itrs[vi].Current() + } + m.updateMatches() + } + if m.lowK == nil { + return vellum.ErrIteratorDone + } + return nil +} + +// Close all the underlying Iterators. The first error, if any, will +// be returned. +func (m *enumerator) Close() error { + var rv error + for _, itr := range m.itrs { + err := itr.Close() + if rv == nil { + rv = err + } + } + return rv +} diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/intcoder.go b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/intcoder.go index e9f295023b..b505fec94e 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/intcoder.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/intcoder.go @@ -30,6 +30,8 @@ type chunkedIntCoder struct { encoder *govarint.Base128Encoder chunkLens []uint64 currChunk uint64 + + buf []byte } // newChunkedIntCoder returns a new chunk int coder which packs data into @@ -67,12 +69,8 @@ func (c *chunkedIntCoder) Add(docNum uint64, vals ...uint64) error { // starting a new chunk if c.encoder != nil { // close out last - c.encoder.Close() - encodingBytes := c.chunkBuf.Bytes() - c.chunkLens[c.currChunk] = uint64(len(encodingBytes)) - c.final = append(c.final, encodingBytes...) + c.Close() c.chunkBuf.Reset() - c.encoder = govarint.NewU64Base128Encoder(&c.chunkBuf) } c.currChunk = chunk } @@ -98,26 +96,25 @@ func (c *chunkedIntCoder) Close() { // Write commits all the encoded chunked integers to the provided writer. func (c *chunkedIntCoder) Write(w io.Writer) (int, error) { - var tw int - buf := make([]byte, binary.MaxVarintLen64) - // write out the number of chunks + bufNeeded := binary.MaxVarintLen64 * (1 + len(c.chunkLens)) + if len(c.buf) < bufNeeded { + c.buf = make([]byte, bufNeeded) + } + buf := c.buf + + // write out the number of chunks & each chunkLen n := binary.PutUvarint(buf, uint64(len(c.chunkLens))) - nw, err := w.Write(buf[:n]) - tw += nw + for _, chunkLen := range c.chunkLens { + n += binary.PutUvarint(buf[n:], uint64(chunkLen)) + } + + tw, err := w.Write(buf[:n]) if err != nil { return tw, err } - // write out the chunk lens - for _, chunkLen := range c.chunkLens { - n := binary.PutUvarint(buf, uint64(chunkLen)) - nw, err = w.Write(buf[:n]) - tw += nw - if err != nil { - return tw, err - } - } + // write out the data - nw, err = w.Write(c.final) + nw, err := w.Write(c.final) tw += nw if err != nil { return tw, err diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/merge.go b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/merge.go index cc348d7207..ae8c5b197b 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/merge.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/merge.go @@ -21,6 +21,7 @@ import ( "fmt" "math" "os" + "sort" "github.com/RoaringBitmap/roaring" "github.com/Smerity/govarint" @@ -28,6 +29,8 @@ import ( "github.com/golang/snappy" ) +const docDropped = math.MaxUint64 // sentinel docNum to represent a deleted doc + // Merge takes a slice of zap segments and bit masks describing which // documents may be dropped, and creates a new segment containing the // remaining data. This new segment is built at the specified path, @@ -46,47 +49,26 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string, _ = os.Remove(path) } + segmentBases := make([]*SegmentBase, len(segments)) + for segmenti, segment := range segments { + segmentBases[segmenti] = &segment.SegmentBase + } + // buffer the output br := bufio.NewWriter(f) // wrap it for counting (tracking offsets) cr := NewCountHashWriter(br) - fieldsInv := mergeFields(segments) - fieldsMap := mapFields(fieldsInv) - - var newDocNums [][]uint64 - var storedIndexOffset uint64 - fieldDvLocsOffset := uint64(fieldNotUninverted) - var dictLocs []uint64 - - newSegDocCount := computeNewDocCount(segments, drops) - if newSegDocCount > 0 { - storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops, - fieldsMap, fieldsInv, newSegDocCount, cr) - if err != nil { - cleanup() - return nil, err - } - - dictLocs, fieldDvLocsOffset, err = persistMergedRest(segments, drops, fieldsInv, fieldsMap, - newDocNums, newSegDocCount, chunkFactor, cr) - if err != nil { - cleanup() - return nil, err - } - } else { - dictLocs = make([]uint64, len(fieldsInv)) - } - - fieldsIndexOffset, err := persistFields(fieldsInv, cr, dictLocs) + newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, _, _, _, err := + MergeToWriter(segmentBases, drops, chunkFactor, cr) if err != nil { cleanup() return nil, err } - err = persistFooter(newSegDocCount, storedIndexOffset, - fieldsIndexOffset, fieldDvLocsOffset, chunkFactor, cr.Sum32(), cr) + err = persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset, + docValueOffset, chunkFactor, cr.Sum32(), cr) if err != nil { cleanup() return nil, err @@ -113,21 +95,59 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string, return newDocNums, nil } -// mapFields takes the fieldsInv list and builds the map +func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, + chunkFactor uint32, cr *CountHashWriter) ( + newDocNums [][]uint64, + numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset uint64, + dictLocs []uint64, fieldsInv []string, fieldsMap map[string]uint16, + err error) { + docValueOffset = uint64(fieldNotUninverted) + + var fieldsSame bool + fieldsSame, fieldsInv = mergeFields(segments) + fieldsMap = mapFields(fieldsInv) + + numDocs = computeNewDocCount(segments, drops) + if numDocs > 0 { + storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops, + fieldsMap, fieldsInv, fieldsSame, numDocs, cr) + if err != nil { + return nil, 0, 0, 0, 0, nil, nil, nil, err + } + + dictLocs, docValueOffset, err = persistMergedRest(segments, drops, fieldsInv, fieldsMap, + newDocNums, numDocs, chunkFactor, cr) + if err != nil { + return nil, 0, 0, 0, 0, nil, nil, nil, err + } + } else { + dictLocs = make([]uint64, len(fieldsInv)) + } + + fieldsIndexOffset, err = persistFields(fieldsInv, cr, dictLocs) + if err != nil { + return nil, 0, 0, 0, 0, nil, nil, nil, err + } + + return newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, dictLocs, fieldsInv, fieldsMap, nil +} + +// mapFields takes the fieldsInv list and returns a map of fieldName +// to fieldID+1 func mapFields(fields []string) map[string]uint16 { rv := make(map[string]uint16, len(fields)) for i, fieldName := range fields { - rv[fieldName] = uint16(i) + rv[fieldName] = uint16(i) + 1 } return rv } // computeNewDocCount determines how many documents will be in the newly // merged segment when obsoleted docs are dropped -func computeNewDocCount(segments []*Segment, drops []*roaring.Bitmap) uint64 { +func computeNewDocCount(segments []*SegmentBase, drops []*roaring.Bitmap) uint64 { var newDocCount uint64 for segI, segment := range segments { - newDocCount += segment.NumDocs() + newDocCount += segment.numDocs if drops[segI] != nil { newDocCount -= drops[segI].GetCardinality() } @@ -135,8 +155,8 @@ func computeNewDocCount(segments []*Segment, drops []*roaring.Bitmap) uint64 { return newDocCount } -func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap, - fieldsInv []string, fieldsMap map[string]uint16, newDocNums [][]uint64, +func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap, + fieldsInv []string, fieldsMap map[string]uint16, newDocNumsIn [][]uint64, newSegDocCount uint64, chunkFactor uint32, w *CountHashWriter) ([]uint64, uint64, error) { @@ -144,9 +164,14 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap, var bufMaxVarintLen64 []byte = make([]byte, binary.MaxVarintLen64) var bufLoc []uint64 + var postings *PostingsList + var postItr *PostingsIterator + rv := make([]uint64, len(fieldsInv)) fieldDvLocs := make([]uint64, len(fieldsInv)) - fieldDvLocsOffset := uint64(fieldNotUninverted) + + tfEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1) + locEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1) // docTermMap is keyed by docNum, where the array impl provides // better memory usage behavior than a sparse-friendlier hashmap @@ -166,36 +191,31 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap, return nil, 0, err } - // collect FST iterators from all segments for this field + // collect FST iterators from all active segments for this field + var newDocNums [][]uint64 + var drops []*roaring.Bitmap var dicts []*Dictionary var itrs []vellum.Iterator - for _, segment := range segments { + + for segmentI, segment := range segments { dict, err2 := segment.dictionary(fieldName) if err2 != nil { return nil, 0, err2 } - dicts = append(dicts, dict) - if dict != nil && dict.fst != nil { itr, err2 := dict.fst.Iterator(nil, nil) if err2 != nil && err2 != vellum.ErrIteratorDone { return nil, 0, err2 } if itr != nil { + newDocNums = append(newDocNums, newDocNumsIn[segmentI]) + drops = append(drops, dropsIn[segmentI]) + dicts = append(dicts, dict) itrs = append(itrs, itr) } } } - // create merging iterator - mergeItr, err := vellum.NewMergeIterator(itrs, func(postingOffsets []uint64) uint64 { - // we don't actually use the merged value - return 0 - }) - - tfEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1) - locEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1) - if uint64(cap(docTermMap)) < newSegDocCount { docTermMap = make([][]byte, newSegDocCount) } else { @@ -205,70 +225,14 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap, } } - for err == nil { - term, _ := mergeItr.Current() - - newRoaring := roaring.NewBitmap() - newRoaringLocs := roaring.NewBitmap() - - tfEncoder.Reset() - locEncoder.Reset() - - // now go back and get posting list for this term - // but pass in the deleted docs for that segment - for dictI, dict := range dicts { - if dict == nil { - continue - } - postings, err2 := dict.postingsList(term, drops[dictI]) - if err2 != nil { - return nil, 0, err2 - } - - postItr := postings.Iterator() - next, err2 := postItr.Next() - for next != nil && err2 == nil { - hitNewDocNum := newDocNums[dictI][next.Number()] - if hitNewDocNum == docDropped { - return nil, 0, fmt.Errorf("see hit with dropped doc num") - } - newRoaring.Add(uint32(hitNewDocNum)) - // encode norm bits - norm := next.Norm() - normBits := math.Float32bits(float32(norm)) - err = tfEncoder.Add(hitNewDocNum, next.Frequency(), uint64(normBits)) - if err != nil { - return nil, 0, err - } - locs := next.Locations() - if len(locs) > 0 { - newRoaringLocs.Add(uint32(hitNewDocNum)) - for _, loc := range locs { - if cap(bufLoc) < 5+len(loc.ArrayPositions()) { - bufLoc = make([]uint64, 0, 5+len(loc.ArrayPositions())) - } - args := bufLoc[0:5] - args[0] = uint64(fieldsMap[loc.Field()]) - args[1] = loc.Pos() - args[2] = loc.Start() - args[3] = loc.End() - args[4] = uint64(len(loc.ArrayPositions())) - args = append(args, loc.ArrayPositions()...) - err = locEncoder.Add(hitNewDocNum, args...) - if err != nil { - return nil, 0, err - } - } - } + var prevTerm []byte - docTermMap[hitNewDocNum] = - append(append(docTermMap[hitNewDocNum], term...), termSeparator) + newRoaring := roaring.NewBitmap() + newRoaringLocs := roaring.NewBitmap() - next, err2 = postItr.Next() - } - if err2 != nil { - return nil, 0, err2 - } + finishTerm := func(term []byte) error { + if term == nil { + return nil } tfEncoder.Close() @@ -277,59 +241,142 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap, if newRoaring.GetCardinality() > 0 { // this field/term actually has hits in the new segment, lets write it down freqOffset := uint64(w.Count()) - _, err = tfEncoder.Write(w) + _, err := tfEncoder.Write(w) if err != nil { - return nil, 0, err + return err } locOffset := uint64(w.Count()) _, err = locEncoder.Write(w) if err != nil { - return nil, 0, err + return err } postingLocOffset := uint64(w.Count()) _, err = writeRoaringWithLen(newRoaringLocs, w, &bufReuse, bufMaxVarintLen64) if err != nil { - return nil, 0, err + return err } postingOffset := uint64(w.Count()) + // write out the start of the term info - buf := bufMaxVarintLen64 - n := binary.PutUvarint(buf, freqOffset) - _, err = w.Write(buf[:n]) + n := binary.PutUvarint(bufMaxVarintLen64, freqOffset) + _, err = w.Write(bufMaxVarintLen64[:n]) if err != nil { - return nil, 0, err + return err } - // write out the start of the loc info - n = binary.PutUvarint(buf, locOffset) - _, err = w.Write(buf[:n]) + n = binary.PutUvarint(bufMaxVarintLen64, locOffset) + _, err = w.Write(bufMaxVarintLen64[:n]) if err != nil { - return nil, 0, err + return err } - - // write out the start of the loc posting list - n = binary.PutUvarint(buf, postingLocOffset) - _, err = w.Write(buf[:n]) + // write out the start of the posting locs + n = binary.PutUvarint(bufMaxVarintLen64, postingLocOffset) + _, err = w.Write(bufMaxVarintLen64[:n]) if err != nil { - return nil, 0, err + return err } _, err = writeRoaringWithLen(newRoaring, w, &bufReuse, bufMaxVarintLen64) if err != nil { - return nil, 0, err + return err } err = newVellum.Insert(term, postingOffset) if err != nil { + return err + } + } + + newRoaring = roaring.NewBitmap() + newRoaringLocs = roaring.NewBitmap() + + tfEncoder.Reset() + locEncoder.Reset() + + return nil + } + + enumerator, err := newEnumerator(itrs) + + for err == nil { + term, itrI, postingsOffset := enumerator.Current() + + if !bytes.Equal(prevTerm, term) { + // if the term changed, write out the info collected + // for the previous term + err2 := finishTerm(prevTerm) + if err2 != nil { + return nil, 0, err2 + } + } + + var err2 error + postings, err2 = dicts[itrI].postingsListFromOffset( + postingsOffset, drops[itrI], postings) + if err2 != nil { + return nil, 0, err2 + } + + newDocNumsI := newDocNums[itrI] + + postItr = postings.iterator(postItr) + next, err2 := postItr.Next() + for next != nil && err2 == nil { + hitNewDocNum := newDocNumsI[next.Number()] + if hitNewDocNum == docDropped { + return nil, 0, fmt.Errorf("see hit with dropped doc num") + } + newRoaring.Add(uint32(hitNewDocNum)) + // encode norm bits + norm := next.Norm() + normBits := math.Float32bits(float32(norm)) + err = tfEncoder.Add(hitNewDocNum, next.Frequency(), uint64(normBits)) + if err != nil { return nil, 0, err } + locs := next.Locations() + if len(locs) > 0 { + newRoaringLocs.Add(uint32(hitNewDocNum)) + for _, loc := range locs { + if cap(bufLoc) < 5+len(loc.ArrayPositions()) { + bufLoc = make([]uint64, 0, 5+len(loc.ArrayPositions())) + } + args := bufLoc[0:5] + args[0] = uint64(fieldsMap[loc.Field()] - 1) + args[1] = loc.Pos() + args[2] = loc.Start() + args[3] = loc.End() + args[4] = uint64(len(loc.ArrayPositions())) + args = append(args, loc.ArrayPositions()...) + err = locEncoder.Add(hitNewDocNum, args...) + if err != nil { + return nil, 0, err + } + } + } + + docTermMap[hitNewDocNum] = + append(append(docTermMap[hitNewDocNum], term...), termSeparator) + + next, err2 = postItr.Next() + } + if err2 != nil { + return nil, 0, err2 } - err = mergeItr.Next() + prevTerm = prevTerm[:0] // copy to prevTerm in case Next() reuses term mem + prevTerm = append(prevTerm, term...) + + err = enumerator.Next() } if err != nil && err != vellum.ErrIteratorDone { return nil, 0, err } + err = finishTerm(prevTerm) + if err != nil { + return nil, 0, err + } + dictOffset := uint64(w.Count()) err = newVellum.Close() @@ -378,7 +425,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap, } } - fieldDvLocsOffset = uint64(w.Count()) + fieldDvLocsOffset := uint64(w.Count()) buf := bufMaxVarintLen64 for _, offset := range fieldDvLocs { @@ -392,10 +439,8 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap, return rv, fieldDvLocsOffset, nil } -const docDropped = math.MaxUint64 - -func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, - fieldsMap map[string]uint16, fieldsInv []string, newSegDocCount uint64, +func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, + fieldsMap map[string]uint16, fieldsInv []string, fieldsSame bool, newSegDocCount uint64, w *CountHashWriter) (uint64, [][]uint64, error) { var rv [][]uint64 // The remapped or newDocNums for each segment. @@ -417,10 +462,30 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, for segI, segment := range segments { segNewDocNums := make([]uint64, segment.numDocs) + dropsI := drops[segI] + + // optimize when the field mapping is the same across all + // segments and there are no deletions, via byte-copying + // of stored docs bytes directly to the writer + if fieldsSame && (dropsI == nil || dropsI.GetCardinality() == 0) { + err := segment.copyStoredDocs(newDocNum, docNumOffsets, w) + if err != nil { + return 0, nil, err + } + + for i := uint64(0); i < segment.numDocs; i++ { + segNewDocNums[i] = newDocNum + newDocNum++ + } + rv = append(rv, segNewDocNums) + + continue + } + // for each doc num for docNum := uint64(0); docNum < segment.numDocs; docNum++ { // TODO: roaring's API limits docNums to 32-bits? - if drops[segI] != nil && drops[segI].Contains(uint32(docNum)) { + if dropsI != nil && dropsI.Contains(uint32(docNum)) { segNewDocNums[docNum] = docDropped continue } @@ -439,7 +504,7 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, poss[i] = poss[i][:0] } err := segment.VisitDocument(docNum, func(field string, typ byte, value []byte, pos []uint64) bool { - fieldID := int(fieldsMap[field]) + fieldID := int(fieldsMap[field]) - 1 vals[fieldID] = append(vals[fieldID], value) typs[fieldID] = append(typs[fieldID], typ) poss[fieldID] = append(poss[fieldID], pos) @@ -453,47 +518,14 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, for fieldID := range fieldsInv { storedFieldValues := vals[int(fieldID)] - // has stored values for this field - num := len(storedFieldValues) + stf := typs[int(fieldID)] + spf := poss[int(fieldID)] - // process each value - for i := 0; i < num; i++ { - // encode field - _, err2 := metaEncoder.PutU64(uint64(fieldID)) - if err2 != nil { - return 0, nil, err2 - } - // encode type - _, err2 = metaEncoder.PutU64(uint64(typs[int(fieldID)][i])) - if err2 != nil { - return 0, nil, err2 - } - // encode start offset - _, err2 = metaEncoder.PutU64(uint64(curr)) - if err2 != nil { - return 0, nil, err2 - } - // end len - _, err2 = metaEncoder.PutU64(uint64(len(storedFieldValues[i]))) - if err2 != nil { - return 0, nil, err2 - } - // encode number of array pos - _, err2 = metaEncoder.PutU64(uint64(len(poss[int(fieldID)][i]))) - if err2 != nil { - return 0, nil, err2 - } - // encode all array positions - for j := 0; j < len(poss[int(fieldID)][i]); j++ { - _, err2 = metaEncoder.PutU64(poss[int(fieldID)][i][j]) - if err2 != nil { - return 0, nil, err2 - } - } - // append data - data = append(data, storedFieldValues[i]...) - // update curr - curr += len(storedFieldValues[i]) + var err2 error + curr, data, err2 = persistStoredFieldValues(fieldID, + storedFieldValues, stf, spf, curr, metaEncoder, data) + if err2 != nil { + return 0, nil, err2 } } @@ -528,36 +560,87 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, } // return value is the start of the stored index - offset := uint64(w.Count()) + storedIndexOffset := uint64(w.Count()) // now write out the stored doc index - for docNum := range docNumOffsets { - err := binary.Write(w, binary.BigEndian, docNumOffsets[docNum]) + for _, docNumOffset := range docNumOffsets { + err := binary.Write(w, binary.BigEndian, docNumOffset) if err != nil { return 0, nil, err } } - return offset, rv, nil + return storedIndexOffset, rv, nil } -// mergeFields builds a unified list of fields used across all the input segments -func mergeFields(segments []*Segment) []string { - fieldsMap := map[string]struct{}{} +// copyStoredDocs writes out a segment's stored doc info, optimized by +// using a single Write() call for the entire set of bytes. The +// newDocNumOffsets is filled with the new offsets for each doc. +func (s *SegmentBase) copyStoredDocs(newDocNum uint64, newDocNumOffsets []uint64, + w *CountHashWriter) error { + if s.numDocs <= 0 { + return nil + } + + indexOffset0, storedOffset0, _, _, _ := + s.getDocStoredOffsets(0) // the segment's first doc + + indexOffsetN, storedOffsetN, readN, metaLenN, dataLenN := + s.getDocStoredOffsets(s.numDocs - 1) // the segment's last doc + + storedOffset0New := uint64(w.Count()) + + storedBytes := s.mem[storedOffset0 : storedOffsetN+readN+metaLenN+dataLenN] + _, err := w.Write(storedBytes) + if err != nil { + return err + } + + // remap the storedOffset's for the docs into new offsets relative + // to storedOffset0New, filling the given docNumOffsetsOut array + for indexOffset := indexOffset0; indexOffset <= indexOffsetN; indexOffset += 8 { + storedOffset := binary.BigEndian.Uint64(s.mem[indexOffset : indexOffset+8]) + storedOffsetNew := storedOffset - storedOffset0 + storedOffset0New + newDocNumOffsets[newDocNum] = storedOffsetNew + newDocNum += 1 + } + + return nil +} + +// mergeFields builds a unified list of fields used across all the +// input segments, and computes whether the fields are the same across +// segments (which depends on fields to be sorted in the same way +// across segments) +func mergeFields(segments []*SegmentBase) (bool, []string) { + fieldsSame := true + + var segment0Fields []string + if len(segments) > 0 { + segment0Fields = segments[0].Fields() + } + + fieldsExist := map[string]struct{}{} for _, segment := range segments { fields := segment.Fields() - for _, field := range fields { - fieldsMap[field] = struct{}{} + for fieldi, field := range fields { + fieldsExist[field] = struct{}{} + if len(segment0Fields) != len(fields) || segment0Fields[fieldi] != field { + fieldsSame = false + } } } - rv := make([]string, 0, len(fieldsMap)) + rv := make([]string, 0, len(fieldsExist)) // ensure _id stays first rv = append(rv, "_id") - for k := range fieldsMap { + for k := range fieldsExist { if k != "_id" { rv = append(rv, k) } } - return rv + + sort.Strings(rv[1:]) // leave _id as first + + return fieldsSame, rv } diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/posting.go b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/posting.go index 67e08d1ae3..d504885d05 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/posting.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/posting.go @@ -28,21 +28,27 @@ import ( // PostingsList is an in-memory represenation of a postings list type PostingsList struct { sb *SegmentBase - term []byte postingsOffset uint64 freqOffset uint64 locOffset uint64 locBitmap *roaring.Bitmap postings *roaring.Bitmap except *roaring.Bitmap - postingKey []byte } // Iterator returns an iterator for this postings list func (p *PostingsList) Iterator() segment.PostingsIterator { - rv := &PostingsIterator{ - postings: p, + return p.iterator(nil) +} + +func (p *PostingsList) iterator(rv *PostingsIterator) *PostingsIterator { + if rv == nil { + rv = &PostingsIterator{} + } else { + *rv = PostingsIterator{} // clear the struct } + rv.postings = p + if p.postings != nil { // prepare the freq chunk details var n uint64 diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/read.go b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/read.go index 0c5b9e17fa..e47d4c6abd 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/read.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/read.go @@ -17,15 +17,27 @@ package zap import "encoding/binary" func (s *SegmentBase) getDocStoredMetaAndCompressed(docNum uint64) ([]byte, []byte) { - docStoredStartAddr := s.storedIndexOffset + (8 * docNum) - docStoredStart := binary.BigEndian.Uint64(s.mem[docStoredStartAddr : docStoredStartAddr+8]) + _, storedOffset, n, metaLen, dataLen := s.getDocStoredOffsets(docNum) + + meta := s.mem[storedOffset+n : storedOffset+n+metaLen] + data := s.mem[storedOffset+n+metaLen : storedOffset+n+metaLen+dataLen] + + return meta, data +} + +func (s *SegmentBase) getDocStoredOffsets(docNum uint64) ( + uint64, uint64, uint64, uint64, uint64) { + indexOffset := s.storedIndexOffset + (8 * docNum) + + storedOffset := binary.BigEndian.Uint64(s.mem[indexOffset : indexOffset+8]) + var n uint64 - metaLen, read := binary.Uvarint(s.mem[docStoredStart : docStoredStart+binary.MaxVarintLen64]) + + metaLen, read := binary.Uvarint(s.mem[storedOffset : storedOffset+binary.MaxVarintLen64]) n += uint64(read) - var dataLen uint64 - dataLen, read = binary.Uvarint(s.mem[docStoredStart+n : docStoredStart+n+binary.MaxVarintLen64]) + + dataLen, read := binary.Uvarint(s.mem[storedOffset+n : storedOffset+n+binary.MaxVarintLen64]) n += uint64(read) - meta := s.mem[docStoredStart+n : docStoredStart+n+metaLen] - data := s.mem[docStoredStart+n+metaLen : docStoredStart+n+metaLen+dataLen] - return meta, data + + return indexOffset, storedOffset, n, metaLen, dataLen } diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/segment.go b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/segment.go index 94268caceb..40c0af2741 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/segment.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/segment.go @@ -343,8 +343,9 @@ func (s *SegmentBase) DocNumbers(ids []string) (*roaring.Bitmap, error) { return nil, err } + var postings *PostingsList for _, id := range ids { - postings, err := idDict.postingsList([]byte(id), nil) + postings, err = idDict.postingsList([]byte(id), nil, postings) if err != nil { return nil, err } diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_rollback.go b/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_rollback.go index 43c3ba9f1e..247003311e 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_rollback.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_rollback.go @@ -31,10 +31,9 @@ func (r *RollbackPoint) GetInternal(key []byte) []byte { return r.meta[string(key)] } -// RollbackPoints returns an array of rollback points available -// for the application to make a decision on where to rollback -// to. A nil return value indicates that there are no available -// rollback points. +// RollbackPoints returns an array of rollback points available for +// the application to rollback to, with more recent rollback points +// (higher epochs) coming first. func (s *Scorch) RollbackPoints() ([]*RollbackPoint, error) { if s.rootBolt == nil { return nil, fmt.Errorf("RollbackPoints: root is nil") @@ -54,7 +53,7 @@ func (s *Scorch) RollbackPoints() ([]*RollbackPoint, error) { snapshots := tx.Bucket(boltSnapshotsBucket) if snapshots == nil { - return nil, fmt.Errorf("RollbackPoints: no snapshots available") + return nil, nil } rollbackPoints := []*RollbackPoint{} @@ -150,10 +149,7 @@ func (s *Scorch) Rollback(to *RollbackPoint) error { revert.snapshot = indexSnapshot revert.applied = make(chan error) - - if !s.unsafeBatch { - revert.persisted = make(chan error) - } + revert.persisted = make(chan error) return nil }) @@ -173,9 +169,5 @@ func (s *Scorch) Rollback(to *RollbackPoint) error { return fmt.Errorf("Rollback: failed with err: %v", err) } - if revert.persisted != nil { - err = <-revert.persisted - } - - return err + return <-revert.persisted } diff --git a/vendor/github.com/blevesearch/bleve/index/upsidedown/upsidedown.go b/vendor/github.com/blevesearch/bleve/index/upsidedown/upsidedown.go index 1243375b76..70e6e457f6 100644 --- a/vendor/github.com/blevesearch/bleve/index/upsidedown/upsidedown.go +++ b/vendor/github.com/blevesearch/bleve/index/upsidedown/upsidedown.go @@ -837,6 +837,11 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { docBackIndexRowErr = err return } + defer func() { + if cerr := kvreader.Close(); err == nil && cerr != nil { + docBackIndexRowErr = cerr + } + }() for docID, doc := range batch.IndexOps { backIndexRow, err := backIndexRowForDoc(kvreader, index.IndexInternalID(docID)) @@ -847,12 +852,6 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { docBackIndexRowCh <- &docBackIndexRow{docID, doc, backIndexRow} } - - err = kvreader.Close() - if err != nil { - docBackIndexRowErr = err - return - } }() // wait for analysis result diff --git a/vendor/github.com/blevesearch/bleve/index_alias_impl.go b/vendor/github.com/blevesearch/bleve/index_alias_impl.go index 9e9a3594ff..f678a059b7 100644 --- a/vendor/github.com/blevesearch/bleve/index_alias_impl.go +++ b/vendor/github.com/blevesearch/bleve/index_alias_impl.go @@ -15,12 +15,11 @@ package bleve import ( + "context" "sort" "sync" "time" - "golang.org/x/net/context" - "github.com/blevesearch/bleve/document" "github.com/blevesearch/bleve/index" "github.com/blevesearch/bleve/index/store" diff --git a/vendor/github.com/blevesearch/bleve/index_impl.go b/vendor/github.com/blevesearch/bleve/index_impl.go index 799b582a06..caea1b8e04 100644 --- a/vendor/github.com/blevesearch/bleve/index_impl.go +++ b/vendor/github.com/blevesearch/bleve/index_impl.go @@ -15,6 +15,7 @@ package bleve import ( + "context" "encoding/json" "fmt" "os" @@ -22,8 +23,6 @@ import ( "sync/atomic" "time" - "golang.org/x/net/context" - "github.com/blevesearch/bleve/document" "github.com/blevesearch/bleve/index" "github.com/blevesearch/bleve/index/store" diff --git a/vendor/github.com/blevesearch/bleve/search/collector.go b/vendor/github.com/blevesearch/bleve/search/collector.go index cba4829d46..0d163a9d9d 100644 --- a/vendor/github.com/blevesearch/bleve/search/collector.go +++ b/vendor/github.com/blevesearch/bleve/search/collector.go @@ -15,11 +15,10 @@ package search import ( + "context" "time" "github.com/blevesearch/bleve/index" - - "golang.org/x/net/context" ) type Collector interface { diff --git a/vendor/github.com/blevesearch/bleve/search/collector/topn.go b/vendor/github.com/blevesearch/bleve/search/collector/topn.go index 2c7c6752df..388370e7e7 100644 --- a/vendor/github.com/blevesearch/bleve/search/collector/topn.go +++ b/vendor/github.com/blevesearch/bleve/search/collector/topn.go @@ -15,11 +15,11 @@ package collector import ( + "context" "time" "github.com/blevesearch/bleve/index" "github.com/blevesearch/bleve/search" - "golang.org/x/net/context" ) type collectorStore interface { |