Diffstat (limited to 'vendor/github.com/blevesearch/bleve')
35 files changed, 1144 insertions, 297 deletions
diff --git a/vendor/github.com/blevesearch/bleve/README.md b/vendor/github.com/blevesearch/bleve/README.md index 7c1a7c7c46..eff0be97e4 100644 --- a/vendor/github.com/blevesearch/bleve/README.md +++ b/vendor/github.com/blevesearch/bleve/README.md @@ -1,10 +1,13 @@ # ![bleve](docs/bleve.png) bleve -[![Build Status](https://travis-ci.org/blevesearch/bleve.svg?branch=master)](https://travis-ci.org/blevesearch/bleve) [![Coverage Status](https://coveralls.io/repos/github/blevesearch/bleve/badge.svg?branch=master)](https://coveralls.io/github/blevesearch/bleve?branch=master) [![GoDoc](https://godoc.org/github.com/blevesearch/bleve?status.svg)](https://godoc.org/github.com/blevesearch/bleve) +[![Tests](https://github.com/blevesearch/bleve/workflows/Tests/badge.svg?branch=master&event=push)](https://github.com/blevesearch/bleve/actions?query=workflow%3ATests+event%3Apush+branch%3Amaster) +[![Coverage Status](https://coveralls.io/repos/github/blevesearch/bleve/badge.svg?branch=master)](https://coveralls.io/github/blevesearch/bleve?branch=master) +[![GoDoc](https://godoc.org/github.com/blevesearch/bleve?status.svg)](https://godoc.org/github.com/blevesearch/bleve) [![Join the chat at https://gitter.im/blevesearch/bleve](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/blevesearch/bleve?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) [![codebeat](https://codebeat.co/badges/38a7cbc9-9cf5-41c0-a315-0746178230f4)](https://codebeat.co/projects/github-com-blevesearch-bleve) [![Go Report Card](https://goreportcard.com/badge/blevesearch/bleve)](https://goreportcard.com/report/blevesearch/bleve) -[![Sourcegraph](https://sourcegraph.com/github.com/blevesearch/bleve/-/badge.svg)](https://sourcegraph.com/github.com/blevesearch/bleve?badge) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) +[![Sourcegraph](https://sourcegraph.com/github.com/blevesearch/bleve/-/badge.svg)](https://sourcegraph.com/github.com/blevesearch/bleve?badge) +[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) modern text indexing in go - [blevesearch.com](http://www.blevesearch.com/) diff --git a/vendor/github.com/blevesearch/bleve/builder.go b/vendor/github.com/blevesearch/bleve/builder.go new file mode 100644 index 0000000000..de00c97b6e --- /dev/null +++ b/vendor/github.com/blevesearch/bleve/builder.go @@ -0,0 +1,94 @@ +// Copyright (c) 2019 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package bleve + +import ( + "encoding/json" + "fmt" + + "github.com/blevesearch/bleve/document" + "github.com/blevesearch/bleve/index" + "github.com/blevesearch/bleve/index/scorch" + "github.com/blevesearch/bleve/mapping" +) + +type builderImpl struct { + b index.IndexBuilder + m mapping.IndexMapping +} + +func (b *builderImpl) Index(id string, data interface{}) error { + if id == "" { + return ErrorEmptyID + } + + doc := document.NewDocument(id) + err := b.m.MapDocument(doc, data) + if err != nil { + return err + } + err = b.b.Index(doc) + return err +} + +func (b *builderImpl) Close() error { + return b.b.Close() +} + +func newBuilder(path string, mapping mapping.IndexMapping, config map[string]interface{}) (Builder, error) { + if path == "" { + return nil, fmt.Errorf("builder requires path") + } + + err := mapping.Validate() + if err != nil { + return nil, err + } + + if config == nil { + config = map[string]interface{}{} + } + + // the builder does not have an API to interact with internal storage + // however we can pass k/v pairs through the config + mappingBytes, err := json.Marshal(mapping) + if err != nil { + return nil, err + } + config["internal"] = map[string][]byte{ + string(mappingInternalKey): mappingBytes, + } + + // do not use real config, as these are options for the builder, + // not the resulting index + meta := newIndexMeta(scorch.Name, scorch.Name, map[string]interface{}{}) + err = meta.Save(path) + if err != nil { + return nil, err + } + + config["path"] = indexStorePath(path) + + b, err := scorch.NewBuilder(config) + if err != nil { + return nil, err + } + rv := &builderImpl{ + b: b, + m: mapping, + } + + return rv, nil +} diff --git a/vendor/github.com/blevesearch/bleve/go.mod b/vendor/github.com/blevesearch/bleve/go.mod index d38cf8f921..9a12454fda 100644 --- a/vendor/github.com/blevesearch/bleve/go.mod +++ b/vendor/github.com/blevesearch/bleve/go.mod @@ -3,16 +3,17 @@ module github.com/blevesearch/bleve go 1.13 require ( - github.com/RoaringBitmap/roaring v0.4.21 + github.com/RoaringBitmap/roaring v0.4.23 github.com/blevesearch/blevex v0.0.0-20190916190636-152f0fe5c040 github.com/blevesearch/go-porterstemmer v1.0.3 github.com/blevesearch/segment v0.9.0 github.com/blevesearch/snowballstem v0.9.0 - github.com/blevesearch/zap/v11 v11.0.7 - github.com/blevesearch/zap/v12 v12.0.7 - github.com/couchbase/ghistogram v0.1.0 // indirect + github.com/blevesearch/zap/v11 v11.0.10 + github.com/blevesearch/zap/v12 v12.0.10 + github.com/blevesearch/zap/v13 v13.0.2 + github.com/blevesearch/zap/v14 v14.0.1 github.com/couchbase/moss v0.1.0 - github.com/couchbase/vellum v1.0.1 + github.com/couchbase/vellum v1.0.2 github.com/golang/protobuf v1.3.2 github.com/kljensen/snowball v0.6.0 github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 @@ -20,6 +21,6 @@ require ( github.com/steveyen/gtreap v0.1.0 github.com/syndtr/goleveldb v1.0.0 github.com/willf/bitset v1.1.10 - go.etcd.io/bbolt v1.3.4 + go.etcd.io/bbolt v1.3.5 golang.org/x/text v0.3.0 ) diff --git a/vendor/github.com/blevesearch/bleve/index.go b/vendor/github.com/blevesearch/bleve/index.go index ef6ede9343..974358b81e 100644 --- a/vendor/github.com/blevesearch/bleve/index.go +++ b/vendor/github.com/blevesearch/bleve/index.go @@ -293,3 +293,17 @@ func Open(path string) (Index, error) { func OpenUsing(path string, runtimeConfig map[string]interface{}) (Index, error) { return openIndexUsing(path, runtimeConfig) } + +// Builder is a limited interface, used to build indexes in an offline mode. 
+// Items cannot be updated or deleted, and the caller MUST ensure a document is +// indexed only once. +type Builder interface { + Index(id string, data interface{}) error + Close() error +} + +// NewBuilder creates a builder, which will build an index at the specified path, +// using the specified mapping and options. +func NewBuilder(path string, mapping mapping.IndexMapping, config map[string]interface{}) (Builder, error) { + return newBuilder(path, mapping, config) +} diff --git a/vendor/github.com/blevesearch/bleve/index/index.go b/vendor/github.com/blevesearch/bleve/index/index.go index 3e866f3aab..551f8de842 100644 --- a/vendor/github.com/blevesearch/bleve/index/index.go +++ b/vendor/github.com/blevesearch/bleve/index/index.go @@ -367,3 +367,10 @@ type OptimizableContext interface { type DocValueReader interface { VisitDocValues(id IndexInternalID, visitor DocumentFieldTermVisitor) error } + +// IndexBuilder is an interface supported by some index schemes +// to allow direct write-only index building +type IndexBuilder interface { + Index(doc *document.Document) error + Close() error +} diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/builder.go b/vendor/github.com/blevesearch/bleve/index/scorch/builder.go new file mode 100644 index 0000000000..1f4b41d639 --- /dev/null +++ b/vendor/github.com/blevesearch/bleve/index/scorch/builder.go @@ -0,0 +1,334 @@ +// Copyright (c) 2019 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
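The Builder interface and NewBuilder entry point above add write-only, offline index construction to the public API. A minimal usage sketch (the path and document here are hypothetical; a nil config map selects the defaults):

```go
package main

import (
	"log"

	"github.com/blevesearch/bleve"
)

func main() {
	// NewBuilder builds a scorch index offline at the given path;
	// nil config selects the builder defaults.
	b, err := bleve.NewBuilder("/tmp/offline.bleve", bleve.NewIndexMapping(), nil)
	if err != nil {
		log.Fatal(err)
	}
	// Items cannot be updated or deleted, and each document id
	// must be indexed exactly once.
	if err := b.Index("doc-1", map[string]interface{}{"name": "bleve"}); err != nil {
		log.Fatal(err)
	}
	// Close flushes the final batch, merges segments, and writes root.bolt.
	if err := b.Close(); err != nil {
		log.Fatal(err)
	}
}
```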
+ +package scorch + +import ( + "fmt" + "io/ioutil" + "os" + "sync" + + "github.com/RoaringBitmap/roaring" + "github.com/blevesearch/bleve/document" + "github.com/blevesearch/bleve/index" + "github.com/blevesearch/bleve/index/scorch/segment" + bolt "go.etcd.io/bbolt" +) + +const DefaultBuilderBatchSize = 1000 +const DefaultBuilderMergeMax = 10 + +type Builder struct { + m sync.Mutex + segCount uint64 + path string + buildPath string + segPaths []string + batchSize int + mergeMax int + batch *index.Batch + internal map[string][]byte + segPlugin segment.Plugin +} + +func NewBuilder(config map[string]interface{}) (*Builder, error) { + path, ok := config["path"].(string) + if !ok { + return nil, fmt.Errorf("must specify path") + } + + buildPathPrefix, _ := config["buildPathPrefix"].(string) + buildPath, err := ioutil.TempDir(buildPathPrefix, "scorch-offline-build") + if err != nil { + return nil, err + } + + rv := &Builder{ + path: path, + buildPath: buildPath, + mergeMax: DefaultBuilderMergeMax, + batchSize: DefaultBuilderBatchSize, + batch: index.NewBatch(), + segPlugin: defaultSegmentPlugin, + } + + err = rv.parseConfig(config) + if err != nil { + return nil, fmt.Errorf("error parsing builder config: %v", err) + } + + return rv, nil +} + +func (o *Builder) parseConfig(config map[string]interface{}) (err error) { + if v, ok := config["mergeMax"]; ok { + var t int + if t, err = parseToInteger(v); err != nil { + return fmt.Errorf("mergeMax parse err: %v", err) + } + if t > 0 { + o.mergeMax = t + } + } + + if v, ok := config["batchSize"]; ok { + var t int + if t, err = parseToInteger(v); err != nil { + return fmt.Errorf("batchSize parse err: %v", err) + } + if t > 0 { + o.batchSize = t + } + } + + if v, ok := config["internal"]; ok { + if vinternal, ok := v.(map[string][]byte); ok { + o.internal = vinternal + } + } + + forcedSegmentType, forcedSegmentVersion, err := configForceSegmentTypeVersion(config) + if err != nil { + return err + } + if forcedSegmentType != "" && forcedSegmentVersion != 0 { + segPlugin, err := chooseSegmentPlugin(forcedSegmentType, + uint32(forcedSegmentVersion)) + if err != nil { + return err + } + o.segPlugin = segPlugin + } + + return nil +} + +// Index will place the document into the index. +// It is invalid to index the same document multiple times. 
+func (o *Builder) Index(doc *document.Document) error { + o.m.Lock() + defer o.m.Unlock() + + o.batch.Update(doc) + + return o.maybeFlushBatchLOCKED(o.batchSize) +} + +func (o *Builder) maybeFlushBatchLOCKED(moreThan int) error { + if len(o.batch.IndexOps) >= moreThan { + defer o.batch.Reset() + return o.executeBatchLOCKED(o.batch) + } + return nil +} + +func (o *Builder) executeBatchLOCKED(batch *index.Batch) (err error) { + analysisResults := make([]*index.AnalysisResult, 0, len(batch.IndexOps)) + for _, doc := range batch.IndexOps { + if doc != nil { + // insert _id field + doc.AddField(document.NewTextFieldCustom("_id", nil, []byte(doc.ID), document.IndexField|document.StoreField, nil)) + // perform analysis directly + analysisResult := analyze(doc) + analysisResults = append(analysisResults, analysisResult) + } + } + + seg, _, err := o.segPlugin.New(analysisResults) + if err != nil { + return fmt.Errorf("error building segment base: %v", err) + } + + filename := zapFileName(o.segCount) + o.segCount++ + path := o.buildPath + string(os.PathSeparator) + filename + + if segUnpersisted, ok := seg.(segment.UnpersistedSegment); ok { + err = segUnpersisted.Persist(path) + if err != nil { + return fmt.Errorf("error persisting segment base to %s: %v", path, err) + } + + o.segPaths = append(o.segPaths, path) + return nil + } + + return fmt.Errorf("new segment does not implement unpersisted: %T", seg) +} + +func (o *Builder) doMerge() error { + // as long as we have more than 1 segment, keep merging + for len(o.segPaths) > 1 { + + // merge the next <mergeMax> number of segments into one new one + // or, if there are fewer than <mergeMax> remaining, merge them all + mergeCount := o.mergeMax + if mergeCount > len(o.segPaths) { + mergeCount = len(o.segPaths) + } + + mergePaths := o.segPaths[0:mergeCount] + o.segPaths = o.segPaths[mergeCount:] + + // open each of the segments to be merged + mergeSegs := make([]segment.Segment, 0, mergeCount) + + // closeOpenedSegs attempts to close all opened + // segments even if an error occurs, in which case + // the first error is returned + closeOpenedSegs := func() error { + var err error + for _, seg := range mergeSegs { + clErr := seg.Close() + if clErr != nil && err == nil { + err = clErr + } + } + return err + } + + for _, mergePath := range mergePaths { + seg, err := o.segPlugin.Open(mergePath) + if err != nil { + _ = closeOpenedSegs() + return fmt.Errorf("error opening segment (%s) for merge: %v", mergePath, err) + } + mergeSegs = append(mergeSegs, seg) + } + + // do the merge + mergedSegPath := o.buildPath + string(os.PathSeparator) + zapFileName(o.segCount) + drops := make([]*roaring.Bitmap, mergeCount) + _, _, err := o.segPlugin.Merge(mergeSegs, drops, mergedSegPath, nil, nil) + if err != nil { + _ = closeOpenedSegs() + return fmt.Errorf("error merging segments (%v): %v", mergePaths, err) + } + o.segCount++ + o.segPaths = append(o.segPaths, mergedSegPath) + + // close segments opened for merge + err = closeOpenedSegs() + if err != nil { + return fmt.Errorf("error closing opened segments: %v", err) + } + + // remove merged segments + for _, mergePath := range mergePaths { + err = os.RemoveAll(mergePath) + if err != nil { + return fmt.Errorf("error removing segment %s after merge: %v", mergePath, err) + } + } + } + + return nil +} + +func (o *Builder) Close() error { + o.m.Lock() + defer o.m.Unlock() + + // see if there is a partial batch + err := o.maybeFlushBatchLOCKED(1) + if err != nil { + return fmt.Errorf("error flushing batch before close: %v", 
err) + } + + // perform all the merging + err = o.doMerge() + if err != nil { + return fmt.Errorf("error while merging: %v", err) + } + + // ensure the store path exists + err = os.MkdirAll(o.path, 0700) + if err != nil { + return err + } + + // move final segment into place + // segment id 2 is chosen to match the behavior of a scorch + // index which indexes a single batch of data + finalSegPath := o.path + string(os.PathSeparator) + zapFileName(2) + err = os.Rename(o.segPaths[0], finalSegPath) + if err != nil { + return fmt.Errorf("error moving final segment into place: %v", err) + } + + // remove the buildPath, as it is no longer needed + err = os.RemoveAll(o.buildPath) + if err != nil { + return fmt.Errorf("error removing build path: %v", err) + } + + // prepare wrapping + seg, err := o.segPlugin.Open(finalSegPath) + if err != nil { + return fmt.Errorf("error opening final segment") + } + + // create a segment snapshot for this segment + ss := &SegmentSnapshot{ + segment: seg, + } + is := &IndexSnapshot{ + epoch: 3, // chosen to match scorch behavior when indexing a single batch + segment: []*SegmentSnapshot{ss}, + creator: "scorch-builder", + internal: o.internal, + } + + // create the root bolt + rootBoltPath := o.path + string(os.PathSeparator) + "root.bolt" + rootBolt, err := bolt.Open(rootBoltPath, 0600, nil) + if err != nil { + return err + } + + // start a write transaction + tx, err := rootBolt.Begin(true) + if err != nil { + return err + } + + // fill the root bolt with this fake index snapshot + _, _, err = prepareBoltSnapshot(is, tx, o.path, o.segPlugin) + if err != nil { + _ = tx.Rollback() + _ = rootBolt.Close() + return fmt.Errorf("error preparing bolt snapshot in root.bolt: %v", err) + } + + // commit bolt data + err = tx.Commit() + if err != nil { + _ = rootBolt.Close() + return fmt.Errorf("error committing bolt tx in root.bolt: %v", err) + } + + // close bolt + err = rootBolt.Close() + if err != nil { + return fmt.Errorf("error closing root.bolt: %v", err) + } + + // close final segment + err = seg.Close() + if err != nil { + return fmt.Errorf("error closing final segment: %v", err) + } + return nil +} diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/event.go b/vendor/github.com/blevesearch/bleve/index/scorch/event.go index dd79d6d066..8f3fc1914d 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/event.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/event.go @@ -54,3 +54,11 @@ var EventKindBatchIntroductionStart = EventKind(5) // EventKindBatchIntroduction is fired when Batch() completes. var EventKindBatchIntroduction = EventKind(6) + +// EventKindMergeTaskIntroductionStart is fired when the merger is about to +// start the introduction of merged segment from a single merge task. +var EventKindMergeTaskIntroductionStart = EventKind(7) + +// EventKindMergeTaskIntroduction is fired when the merger has completed +// the introduction of merged segment from a single merge task. 
+var EventKindMergeTaskIntroduction = EventKind(8) diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/introducer.go b/vendor/github.com/blevesearch/bleve/index/scorch/introducer.go index e5f00f80e1..7770c41c52 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/introducer.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/introducer.go @@ -45,13 +45,7 @@ type epochWatcher struct { notifyCh notificationChan } -type snapshotReversion struct { - snapshot *IndexSnapshot - applied chan error - persisted chan error -} - -func (s *Scorch) mainLoop() { +func (s *Scorch) introducerLoop() { var epochWatchers []*epochWatcher OUTER: for { @@ -389,6 +383,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) { } } } + var skipped bool // In case where all the docs in the newly merged segment getting // deleted by the time we reach here, can skip the introduction. if nextMerge.new != nil && @@ -411,6 +406,9 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) { docsToPersistCount += nextMerge.new.Count() - newSegmentDeleted.GetCardinality() memSegments++ } + } else { + skipped = true + atomic.AddUint64(&s.stats.TotFileMergeIntroductionsObsoleted, 1) } atomic.StoreUint64(&s.stats.TotItemsToPersist, docsToPersistCount) @@ -435,8 +433,10 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) { } // notify requester that we incorporated this - nextMerge.notify <- newSnapshot - close(nextMerge.notify) + nextMerge.notifyCh <- &mergeTaskIntroStatus{ + indexSnapshot: newSnapshot, + skipped: skipped} + close(nextMerge.notifyCh) } func isMemorySegment(s *SegmentSnapshot) bool { diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/merge.go b/vendor/github.com/blevesearch/bleve/index/scorch/merge.go index 37dca529a6..56c0953f46 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/merge.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/merge.go @@ -15,6 +15,7 @@ package scorch import ( + "context" "encoding/json" "fmt" "os" @@ -29,12 +30,16 @@ import ( func (s *Scorch) mergerLoop() { var lastEpochMergePlanned uint64 + var ctrlMsg *mergerCtrl mergePlannerOptions, err := s.parseMergePlannerOptions() if err != nil { s.fireAsyncError(fmt.Errorf("mergePlannerOption json parsing err: %v", err)) s.asyncTasks.Done() return } + ctrlMsgDflt := &mergerCtrl{ctx: context.Background(), + options: mergePlannerOptions, + doneCh: nil} OUTER: for { @@ -53,16 +58,30 @@ OUTER: atomic.StoreUint64(&s.iStats.mergeEpoch, ourSnapshot.epoch) s.rootLock.Unlock() - if ourSnapshot.epoch != lastEpochMergePlanned { + if ctrlMsg == nil && ourSnapshot.epoch != lastEpochMergePlanned { + ctrlMsg = ctrlMsgDflt + } + if ctrlMsg != nil { startTime := time.Now() // lets get started - err := s.planMergeAtSnapshot(ourSnapshot, mergePlannerOptions) + err := s.planMergeAtSnapshot(ctrlMsg.ctx, ctrlMsg.options, + ourSnapshot) if err != nil { atomic.StoreUint64(&s.iStats.mergeEpoch, 0) if err == segment.ErrClosed { // index has been closed _ = ourSnapshot.DecRef() + + // continue the workloop on a user triggered cancel + if ctrlMsg.doneCh != nil { + close(ctrlMsg.doneCh) + ctrlMsg = nil + continue OUTER + } + + // exit the workloop on index closure + ctrlMsg = nil break OUTER } s.fireAsyncError(fmt.Errorf("merging err: %v", err)) @@ -70,6 +89,12 @@ OUTER: atomic.AddUint64(&s.stats.TotFileMergeLoopErr, 1) continue OUTER } + + if ctrlMsg.doneCh != nil { + close(ctrlMsg.doneCh) + } + ctrlMsg = nil + lastEpochMergePlanned = ourSnapshot.epoch atomic.StoreUint64(&s.stats.LastMergedEpoch, 
ourSnapshot.epoch) @@ -90,6 +115,8 @@ OUTER: case <-s.closeCh: break OUTER case s.persisterNotifier <- ew: + case ctrlMsg = <-s.forceMergeRequestCh: + continue OUTER } // now wait for persister (but also detect close) @@ -97,6 +124,7 @@ OUTER: case <-s.closeCh: break OUTER case <-ew.notifyCh: + case ctrlMsg = <-s.forceMergeRequestCh: } } @@ -106,6 +134,58 @@ OUTER: s.asyncTasks.Done() } +type mergerCtrl struct { + ctx context.Context + options *mergeplan.MergePlanOptions + doneCh chan struct{} +} + +// ForceMerge helps users trigger a merge operation on +// an online scorch index. +func (s *Scorch) ForceMerge(ctx context.Context, + mo *mergeplan.MergePlanOptions) error { + // check whether force merge is already under processing + s.rootLock.Lock() + if s.stats.TotFileMergeForceOpsStarted > + s.stats.TotFileMergeForceOpsCompleted { + s.rootLock.Unlock() + return fmt.Errorf("force merge already in progress") + } + + s.stats.TotFileMergeForceOpsStarted++ + s.rootLock.Unlock() + + if mo != nil { + err := mergeplan.ValidateMergePlannerOptions(mo) + if err != nil { + return err + } + } else { + // assume the default single segment merge policy + mo = &mergeplan.SingleSegmentMergePlanOptions + } + msg := &mergerCtrl{options: mo, + doneCh: make(chan struct{}), + ctx: ctx, + } + + // request the merger perform a force merge + select { + case s.forceMergeRequestCh <- msg: + case <-s.closeCh: + return nil + } + + // wait for the force merge operation completion + select { + case <-msg.doneCh: + atomic.AddUint64(&s.stats.TotFileMergeForceOpsCompleted, 1) + case <-s.closeCh: + } + + return nil +} + func (s *Scorch) parseMergePlannerOptions() (*mergeplan.MergePlanOptions, error) { mergePlannerOptions := mergeplan.DefaultMergePlanOptions @@ -128,8 +208,39 @@ func (s *Scorch) parseMergePlannerOptions() (*mergeplan.MergePlanOptions, return &mergePlannerOptions, nil } -func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot, - options *mergeplan.MergePlanOptions) error { +type closeChWrapper struct { + ch1 chan struct{} + ctx context.Context + closeCh chan struct{} +} + +func newCloseChWrapper(ch1 chan struct{}, + ctx context.Context) *closeChWrapper { + return &closeChWrapper{ch1: ch1, + ctx: ctx, + closeCh: make(chan struct{})} +} + +func (w *closeChWrapper) close() { + select { + case <-w.closeCh: + default: + close(w.closeCh) + } +} + +func (w *closeChWrapper) listen() { + select { + case <-w.ch1: + w.close() + case <-w.ctx.Done(): + w.close() + case <-w.closeCh: + } +} + +func (s *Scorch) planMergeAtSnapshot(ctx context.Context, + options *mergeplan.MergePlanOptions, ourSnapshot *IndexSnapshot) error { // build list of persisted segments in this snapshot var onlyPersistedSnapshots []mergeplan.Segment for _, segmentSnapshot := range ourSnapshot.segment { @@ -158,6 +269,11 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot, // process tasks in serial for now var filenames []string + cw := newCloseChWrapper(s.closeCh, ctx) + defer cw.close() + + go cw.listen() + for _, task := range resultMergePlan.Tasks { if len(task.Segments) == 0 { atomic.AddUint64(&s.stats.TotFileMergePlanTasksSegmentsEmpty, 1) @@ -194,8 +310,9 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot, var oldNewDocNums map[uint64][]uint64 var seg segment.Segment + var filename string if len(segmentsToMerge) > 0 { - filename := zapFileName(newSegmentID) + filename = zapFileName(newSegmentID) s.markIneligibleForRemoval(filename) path := s.path + string(os.PathSeparator) + filename @@ -203,7 +320,7 @@ 
func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot, atomic.AddUint64(&s.stats.TotFileMergeZapBeg, 1) newDocNums, _, err := s.segPlugin.Merge(segmentsToMerge, docsToDrop, path, - s.closeCh, s) + cw.closeCh, s) atomic.AddUint64(&s.stats.TotFileMergeZapEnd, 1) fileMergeZapTime := uint64(time.Since(fileMergeZapStartTime)) @@ -240,9 +357,11 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot, old: oldMap, oldNewDocNums: oldNewDocNums, new: seg, - notify: make(chan *IndexSnapshot), + notifyCh: make(chan *mergeTaskIntroStatus), } + s.fireEvent(EventKindMergeTaskIntroductionStart, 0) + // give it to the introducer select { case <-s.closeCh: @@ -255,18 +374,25 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot, introStartTime := time.Now() // it is safe to blockingly wait for the merge introduction // here as the introducer is bound to handle the notify channel. - newSnapshot := <-sm.notify + introStatus := <-sm.notifyCh introTime := uint64(time.Since(introStartTime)) atomic.AddUint64(&s.stats.TotFileMergeZapIntroductionTime, introTime) if atomic.LoadUint64(&s.stats.MaxFileMergeZapIntroductionTime) < introTime { atomic.StoreUint64(&s.stats.MaxFileMergeZapIntroductionTime, introTime) } atomic.AddUint64(&s.stats.TotFileMergeIntroductionsDone, 1) - if newSnapshot != nil { - _ = newSnapshot.DecRef() + if introStatus != nil && introStatus.indexSnapshot != nil { + _ = introStatus.indexSnapshot.DecRef() + if introStatus.skipped { + // close the segment on skipping introduction. + s.unmarkIneligibleForRemoval(filename) + _ = seg.Close() + } } atomic.AddUint64(&s.stats.TotFileMergePlanTasksDone, 1) + + s.fireEvent(EventKindMergeTaskIntroduction, 0) } // once all the newly merged segment introductions are done, @@ -279,12 +405,17 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot, return nil } +type mergeTaskIntroStatus struct { + indexSnapshot *IndexSnapshot + skipped bool +} + type segmentMerge struct { id uint64 old map[uint64]*SegmentSnapshot oldNewDocNums map[uint64][]uint64 new segment.Segment - notify chan *IndexSnapshot + notifyCh chan *mergeTaskIntroStatus } // perform a merging of the given SegmentBase instances into a new, @@ -334,7 +465,7 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot, old: make(map[uint64]*SegmentSnapshot), oldNewDocNums: make(map[uint64][]uint64), new: seg, - notify: make(chan *IndexSnapshot), + notifyCh: make(chan *mergeTaskIntroStatus), } for i, idx := range sbsIndexes { @@ -351,11 +482,20 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot, } // blockingly wait for the introduction to complete - newSnapshot := <-sm.notify - if newSnapshot != nil { + var newSnapshot *IndexSnapshot + introStatus := <-sm.notifyCh + if introStatus != nil && introStatus.indexSnapshot != nil { + newSnapshot = introStatus.indexSnapshot atomic.AddUint64(&s.stats.TotMemMergeSegments, uint64(len(sbs))) atomic.AddUint64(&s.stats.TotMemMergeDone, 1) + if introStatus.skipped { + // close the segment on skipping introduction. 
+ _ = newSnapshot.DecRef() + _ = seg.Close() + newSnapshot = nil + } } + return newSnapshot, newSegmentID, nil } diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/mergeplan/merge_plan.go b/vendor/github.com/blevesearch/bleve/index/scorch/mergeplan/merge_plan.go index c2a0d3c644..7523506626 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/mergeplan/merge_plan.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/mergeplan/merge_plan.go @@ -134,6 +134,17 @@ var DefaultMergePlanOptions = MergePlanOptions{ ReclaimDeletesWeight: 2.0, } +// SingleSegmentMergePlanOptions helps in creating a +// single segment index. +var SingleSegmentMergePlanOptions = MergePlanOptions{ + MaxSegmentsPerTier: 1, + MaxSegmentSize: 1 << 30, + TierGrowth: 1.0, + SegmentsPerMergeTask: 10, + FloorSegmentSize: 1 << 30, + ReclaimDeletesWeight: 2.0, +} + // ------------------------------------------- func plan(segmentsIn []Segment, o *MergePlanOptions) (*MergePlan, error) { @@ -173,7 +184,7 @@ func plan(segmentsIn []Segment, o *MergePlanOptions) (*MergePlan, error) { calcBudget = CalcBudget } - budgetNumSegments := CalcBudget(eligiblesLiveSize, minLiveSize, o) + budgetNumSegments := calcBudget(eligiblesLiveSize, minLiveSize, o) scoreSegments := o.ScoreSegments if scoreSegments == nil { diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/optimize.go b/vendor/github.com/blevesearch/bleve/index/scorch/optimize.go index b9cb9228af..658354cd71 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/optimize.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/optimize.go @@ -16,10 +16,10 @@ package scorch import ( "fmt" - "github.com/RoaringBitmap/roaring" "github.com/blevesearch/bleve/index" "github.com/blevesearch/bleve/index/scorch/segment" + "sync/atomic" ) var OptimizeConjunction = true @@ -40,7 +40,7 @@ func (s *IndexSnapshotTermFieldReader) Optimize(kind string, return s.optimizeDisjunctionUnadorned(octx) } - return octx, nil + return nil, nil } var OptimizeDisjunctionUnadornedMinChildCardinality = uint64(256) @@ -161,16 +161,8 @@ func (o *OptimizeTFRConjunctionUnadorned) Finish() (rv index.Optimized, err erro // We use an artificial term and field because the optimized // termFieldReader can represent multiple terms and fields. - oTFR := &IndexSnapshotTermFieldReader{ - term: OptimizeTFRConjunctionUnadornedTerm, - field: OptimizeTFRConjunctionUnadornedField, - snapshot: o.snapshot, - iterators: make([]segment.PostingsIterator, len(o.snapshot.segment)), - segmentOffset: 0, - includeFreq: false, - includeNorm: false, - includeTermVectors: false, - } + oTFR := o.snapshot.unadornedTermFieldReader( + OptimizeTFRConjunctionUnadornedTerm, OptimizeTFRConjunctionUnadornedField) var actualBMs []*roaring.Bitmap // Collected from regular posting lists. 
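The ForceMerge API and SingleSegmentMergePlanOptions introduced in merge.go and merge_plan.go above combine to let a caller compact an online scorch index down to a single segment. A hedged sketch of the call (the helper name is ours; per the ForceMerge code above, passing nil options falls back to SingleSegmentMergePlanOptions):

```go
package main

import (
	"context"
	"log"

	"github.com/blevesearch/bleve/index/scorch"
)

// compactToSingleSegment is a hypothetical helper around the new API.
// A nil *mergeplan.MergePlanOptions makes ForceMerge assume the
// single segment merge policy.
func compactToSingleSegment(ctx context.Context, s *scorch.Scorch) error {
	return s.ForceMerge(ctx, nil)
}

func main() {
	var s *scorch.Scorch // obtained elsewhere, e.g. via scorch.NewScorch
	if s == nil {
		return // nothing to compact in this sketch
	}
	if err := compactToSingleSegment(context.Background(), s); err != nil {
		log.Fatal(err)
	}
}
```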
@@ -265,6 +257,7 @@ OUTER: oTFR.iterators[i] = segment.NewUnadornedPostingsIteratorFromBitmap(bm) } + atomic.AddUint64(&o.snapshot.parent.stats.TotTermSearchersStarted, uint64(1)) return oTFR, nil } @@ -277,7 +270,9 @@ OUTER: func (s *IndexSnapshotTermFieldReader) optimizeDisjunctionUnadorned( octx index.OptimizableContext) (index.OptimizableContext, error) { if octx == nil { - octx = &OptimizeTFRDisjunctionUnadorned{snapshot: s.snapshot} + octx = &OptimizeTFRDisjunctionUnadorned{ + snapshot: s.snapshot, + } } o, ok := octx.(*OptimizeTFRDisjunctionUnadorned) @@ -328,27 +323,12 @@ func (o *OptimizeTFRDisjunctionUnadorned) Finish() (rv index.Optimized, err erro } } } - - // Heuristic to skip the optimization if all the constituent - // bitmaps are too small, where the processing & resource - // overhead to create the OR'ed bitmap outweighs the benefit. - if cMax < OptimizeDisjunctionUnadornedMinChildCardinality { - return nil, nil - } } // We use an artificial term and field because the optimized // termFieldReader can represent multiple terms and fields. - oTFR := &IndexSnapshotTermFieldReader{ - term: OptimizeTFRDisjunctionUnadornedTerm, - field: OptimizeTFRDisjunctionUnadornedField, - snapshot: o.snapshot, - iterators: make([]segment.PostingsIterator, len(o.snapshot.segment)), - segmentOffset: 0, - includeFreq: false, - includeNorm: false, - includeTermVectors: false, - } + oTFR := o.snapshot.unadornedTermFieldReader( + OptimizeTFRDisjunctionUnadornedTerm, OptimizeTFRDisjunctionUnadornedField) var docNums []uint32 // Collected docNum's from 1-hit posting lists. var actualBMs []*roaring.Bitmap // Collected from regular posting lists. @@ -392,5 +372,25 @@ func (o *OptimizeTFRDisjunctionUnadorned) Finish() (rv index.Optimized, err erro oTFR.iterators[i] = segment.NewUnadornedPostingsIteratorFromBitmap(bm) } + atomic.AddUint64(&o.snapshot.parent.stats.TotTermSearchersStarted, uint64(1)) return oTFR, nil } + +// ---------------------------------------------------------------- + +func (i *IndexSnapshot) unadornedTermFieldReader( + term []byte, field string) *IndexSnapshotTermFieldReader { + // This IndexSnapshotTermFieldReader will not be recycled, more + // conversation here: https://github.com/blevesearch/bleve/pull/1438 + return &IndexSnapshotTermFieldReader{ + term: term, + field: field, + snapshot: i, + iterators: make([]segment.PostingsIterator, len(i.segment)), + segmentOffset: 0, + includeFreq: false, + includeNorm: false, + includeTermVectors: false, + recycle: false, + } +} diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/persister.go b/vendor/github.com/blevesearch/bleve/index/scorch/persister.go index 30e75df77f..498378a4f8 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/persister.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/persister.go @@ -256,7 +256,7 @@ func (s *Scorch) pausePersisterForMergerCatchUp(lastPersistedEpoch uint64, // for sufficient in-memory segments to pile up for the next // memory merge cum persist loop. if numFilesOnDisk < uint64(po.PersisterNapUnderNumFiles) && - po.PersisterNapTimeMSec > 0 && s.paused() == 0 { + po.PersisterNapTimeMSec > 0 && s.NumEventsBlocking() == 0 { select { case <-s.closeCh: case <-time.After(time.Millisecond * time.Duration(po.PersisterNapTimeMSec)): @@ -333,7 +333,7 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot, // Perform in-memory segment merging only when the memory pressure is // below the configured threshold, else the persister performs the // direct persistence of segments. 
- if s.paused() < po.MemoryPressurePauseThreshold { + if s.NumEventsBlocking() < po.MemoryPressurePauseThreshold { persisted, err := s.persistSnapshotMaybeMerge(snapshot) if err != nil { return err @@ -428,55 +428,44 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) ( return true, nil } -func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot) (err error) { - // start a write transaction - tx, err := s.rootBolt.Begin(true) - if err != nil { - return err - } - // defer rollback on error - defer func() { - if err != nil { - _ = tx.Rollback() - } - }() - +func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, + segPlugin segment.Plugin) ([]string, map[uint64]string, error) { snapshotsBucket, err := tx.CreateBucketIfNotExists(boltSnapshotsBucket) if err != nil { - return err + return nil, nil, err } newSnapshotKey := segment.EncodeUvarintAscending(nil, snapshot.epoch) snapshotBucket, err := snapshotsBucket.CreateBucketIfNotExists(newSnapshotKey) if err != nil { - return err + return nil, nil, err } // persist meta values metaBucket, err := snapshotBucket.CreateBucketIfNotExists(boltMetaDataKey) if err != nil { - return err + return nil, nil, err } - err = metaBucket.Put(boltMetaDataSegmentTypeKey, []byte(s.segPlugin.Type())) + err = metaBucket.Put(boltMetaDataSegmentTypeKey, []byte(segPlugin.Type())) if err != nil { - return err + return nil, nil, err } buf := make([]byte, binary.MaxVarintLen32) - binary.BigEndian.PutUint32(buf, s.segPlugin.Version()) + binary.BigEndian.PutUint32(buf, segPlugin.Version()) err = metaBucket.Put(boltMetaDataSegmentVersionKey, buf) if err != nil { - return err + return nil, nil, err } // persist internal values internalBucket, err := snapshotBucket.CreateBucketIfNotExists(boltInternalKey) if err != nil { - return err + return nil, nil, err } // TODO optimize writing these in order? 
for k, v := range snapshot.internal { err = internalBucket.Put([]byte(k), v) if err != nil { - return err + return nil, nil, err } } @@ -488,49 +477,69 @@ func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot) (err error) { snapshotSegmentKey := segment.EncodeUvarintAscending(nil, segmentSnapshot.id) snapshotSegmentBucket, err := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey) if err != nil { - return err + return nil, nil, err } switch seg := segmentSnapshot.segment.(type) { case segment.PersistedSegment: - path := seg.Path() - filename := strings.TrimPrefix(path, s.path+string(os.PathSeparator)) + segPath := seg.Path() + filename := strings.TrimPrefix(segPath, path+string(os.PathSeparator)) err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename)) if err != nil { - return err + return nil, nil, err } filenames = append(filenames, filename) case segment.UnpersistedSegment: // need to persist this to disk filename := zapFileName(segmentSnapshot.id) - path := s.path + string(os.PathSeparator) + filename + path := path + string(os.PathSeparator) + filename err = seg.Persist(path) if err != nil { - return fmt.Errorf("error persisting segment: %v", err) + return nil, nil, fmt.Errorf("error persisting segment: %v", err) } newSegmentPaths[segmentSnapshot.id] = path err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename)) if err != nil { - return err + return nil, nil, err } filenames = append(filenames, filename) - default: - return fmt.Errorf("unknown segment type: %T", seg) + return nil, nil, fmt.Errorf("unknown segment type: %T", seg) } // store current deleted bits var roaringBuf bytes.Buffer if segmentSnapshot.deleted != nil { _, err = segmentSnapshot.deleted.WriteTo(&roaringBuf) if err != nil { - return fmt.Errorf("error persisting roaring bytes: %v", err) + return nil, nil, fmt.Errorf("error persisting roaring bytes: %v", err) } err = snapshotSegmentBucket.Put(boltDeletedKey, roaringBuf.Bytes()) if err != nil { - return err + return nil, nil, err } } } + return filenames, newSegmentPaths, nil +} + +func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot) (err error) { + // start a write transaction + tx, err := s.rootBolt.Begin(true) + if err != nil { + return err + } + // defer rollback on error + defer func() { + if err != nil { + _ = tx.Rollback() + } + }() + + filenames, newSegmentPaths, err := prepareBoltSnapshot(snapshot, tx, s.path, s.segPlugin) + if err != nil { + return err + } + // we need to swap in a new root only when we've persisted 1 or // more segments -- whereby the new root would have 1-for-1 // replacements of in-memory segments with file-based segments @@ -780,12 +789,6 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro return rv, nil } -type uint64Descending []uint64 - -func (p uint64Descending) Len() int { return len(p) } -func (p uint64Descending) Less(i, j int) bool { return p[i] > p[j] } -func (p uint64Descending) Swap(i, j int) { p[i], p[j] = p[j], p[i] } - func (s *Scorch) removeOldData() { removed, err := s.removeOldBoltSnapshots() if err != nil { diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_rollback.go b/vendor/github.com/blevesearch/bleve/index/scorch/rollback.go index 7cc87bdea0..7cc87bdea0 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_rollback.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/rollback.go diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/scorch.go 
b/vendor/github.com/blevesearch/bleve/index/scorch/scorch.go index 80f9e3a797..ba98a460d3 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/scorch.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/scorch.go @@ -73,9 +73,7 @@ type Scorch struct { onEvent func(event Event) onAsyncError func(err error) - pauseLock sync.RWMutex - - pauseCount uint64 + forceMergeRequestCh chan *mergerCtrl segPlugin segment.Plugin } @@ -101,18 +99,15 @@ func NewScorch(storeName string, nextSnapshotEpoch: 1, closeCh: make(chan struct{}), ineligibleForRemoval: map[string]bool{}, + forceMergeRequestCh: make(chan *mergerCtrl, 1), segPlugin: defaultSegmentPlugin, } - // check if the caller has requested a specific segment type/version - forcedSegmentVersion, ok := config["forceSegmentVersion"].(int) - if ok { - forcedSegmentType, ok2 := config["forceSegmentType"].(string) - if !ok2 { - return nil, fmt.Errorf( - "forceSegmentVersion set to %d, must also specify forceSegmentType", forcedSegmentVersion) - } - + forcedSegmentType, forcedSegmentVersion, err := configForceSegmentTypeVersion(config) + if err != nil { + return nil, err + } + if forcedSegmentType != "" && forcedSegmentVersion != 0 { err := rv.loadSegmentPlugin(forcedSegmentType, uint32(forcedSegmentVersion)) if err != nil { @@ -140,30 +135,34 @@ func NewScorch(storeName string, return rv, nil } -func (s *Scorch) paused() uint64 { - s.pauseLock.Lock() - pc := s.pauseCount - s.pauseLock.Unlock() - return pc -} +// configForceSegmentTypeVersion checks if the caller has requested a +// specific segment type/version +func configForceSegmentTypeVersion(config map[string]interface{}) (string, uint32, error) { + forcedSegmentVersion, err := parseToInteger(config["forceSegmentVersion"]) + if err != nil { + return "", 0, nil + } -func (s *Scorch) incrPause() { - s.pauseLock.Lock() - s.pauseCount++ - s.pauseLock.Unlock() + forcedSegmentType, ok := config["forceSegmentType"].(string) + if !ok { + return "", 0, fmt.Errorf( + "forceSegmentVersion set to %d, must also specify forceSegmentType", forcedSegmentVersion) + } + + return forcedSegmentType, uint32(forcedSegmentVersion), nil } -func (s *Scorch) decrPause() { - s.pauseLock.Lock() - s.pauseCount-- - s.pauseLock.Unlock() +func (s *Scorch) NumEventsBlocking() uint64 { + eventsCompleted := atomic.LoadUint64(&s.stats.TotEventTriggerCompleted) + eventsStarted := atomic.LoadUint64(&s.stats.TotEventTriggerStarted) + return eventsStarted - eventsCompleted } func (s *Scorch) fireEvent(kind EventKind, dur time.Duration) { if s.onEvent != nil { - s.incrPause() + atomic.AddUint64(&s.stats.TotEventTriggerStarted, 1) s.onEvent(Event{Kind: kind, Scorch: s, Duration: dur}) - s.decrPause() + atomic.AddUint64(&s.stats.TotEventTriggerCompleted, 1) } } @@ -181,7 +180,7 @@ func (s *Scorch) Open() error { } s.asyncTasks.Add(1) - go s.mainLoop() + go s.introducerLoop() if !s.readOnly && s.path != "" { s.asyncTasks.Add(1) @@ -241,6 +240,7 @@ func (s *Scorch) openBolt() error { s.introducerNotifier = make(chan *epochWatcher, 1) s.persisterNotifier = make(chan *epochWatcher, 1) s.closeCh = make(chan struct{}) + s.forceMergeRequestCh = make(chan *mergerCtrl, 1) if !s.readOnly && s.path != "" { err := s.removeOldZapFiles() // Before persister or merger create any new files. 
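The pause bookkeeping removed above (pauseLock/pauseCount) is replaced by two monotonically increasing atomic counters, TotEventTriggerStarted and TotEventTriggerCompleted; NumEventsBlocking is simply their difference. A standalone sketch of the pattern, with names of our own choosing:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// eventCounter mirrors the pattern: bump "started" before running a
// callback, bump "completed" after, and report in-flight callbacks as
// the difference, all without taking a mutex.
type eventCounter struct {
	started   uint64
	completed uint64
}

func (c *eventCounter) fire(f func()) {
	atomic.AddUint64(&c.started, 1)
	f() // the callback may block; readers stay lock-free
	atomic.AddUint64(&c.completed, 1)
}

func (c *eventCounter) numBlocking() uint64 {
	return atomic.LoadUint64(&c.started) - atomic.LoadUint64(&c.completed)
}

func main() {
	var c eventCounter
	c.fire(func() { fmt.Println("event handled") })
	fmt.Println("blocking now:", c.numBlocking()) // 0 once fire returns
}
```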
@@ -567,6 +567,10 @@ func (s *Scorch) StatsMap() map[string]interface{} { } func (s *Scorch) Analyze(d *document.Document) *index.AnalysisResult { + return analyze(d) +} + +func analyze(d *document.Document) *index.AnalysisResult { rv := &index.AnalysisResult{ Document: d, Analyzed: make([]analysis.TokenFrequencies, len(d.Fields)+len(d.CompositeFields)), diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/segment/unadorned.go b/vendor/github.com/blevesearch/bleve/index/scorch/segment/unadorned.go index 9a4d6c76c9..db06562df1 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/segment/unadorned.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/segment/unadorned.go @@ -24,7 +24,6 @@ var reflectStaticSizeUnadornedPostingsIteratorBitmap int var reflectStaticSizeUnadornedPostingsIterator1Hit int var reflectStaticSizeUnadornedPosting int - func init() { var pib UnadornedPostingsIteratorBitmap reflectStaticSizeUnadornedPostingsIteratorBitmap = int(reflect.TypeOf(pib).Size()) @@ -34,7 +33,7 @@ func init() { reflectStaticSizeUnadornedPosting = int(reflect.TypeOf(up).Size()) } -type UnadornedPostingsIteratorBitmap struct{ +type UnadornedPostingsIteratorBitmap struct { actual roaring.IntPeekable actualBM *roaring.Bitmap } @@ -72,16 +71,29 @@ func (i *UnadornedPostingsIteratorBitmap) Size() int { return reflectStaticSizeUnadornedPostingsIteratorBitmap } +func (i *UnadornedPostingsIteratorBitmap) ActualBitmap() *roaring.Bitmap { + return i.actualBM +} + +func (i *UnadornedPostingsIteratorBitmap) DocNum1Hit() (uint64, bool) { + return 0, false +} + +func (i *UnadornedPostingsIteratorBitmap) ReplaceActual(actual *roaring.Bitmap) { + i.actualBM = actual + i.actual = actual.Iterator() +} + func NewUnadornedPostingsIteratorFromBitmap(bm *roaring.Bitmap) PostingsIterator { return &UnadornedPostingsIteratorBitmap{ actualBM: bm, - actual: bm.Iterator(), + actual: bm.Iterator(), } } const docNum1HitFinished = math.MaxUint64 -type UnadornedPostingsIterator1Hit struct{ +type UnadornedPostingsIterator1Hit struct { docNum uint64 } @@ -145,4 +157,4 @@ func (p UnadornedPosting) Locations() []Location { func (p UnadornedPosting) Size() int { return reflectStaticSizeUnadornedPosting -}
\ No newline at end of file +} diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/segment_plugin.go b/vendor/github.com/blevesearch/bleve/index/scorch/segment_plugin.go index 01eda7fbd5..b830b2c052 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/segment_plugin.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/segment_plugin.go @@ -21,6 +21,8 @@ import ( zapv11 "github.com/blevesearch/zap/v11" zapv12 "github.com/blevesearch/zap/v12" + zapv13 "github.com/blevesearch/zap/v13" + zapv14 "github.com/blevesearch/zap/v14" ) var supportedSegmentPlugins map[string]map[uint32]segment.Plugin @@ -28,6 +30,8 @@ var defaultSegmentPlugin segment.Plugin func init() { ResetPlugins() + RegisterPlugin(zapv14.Plugin(), false) + RegisterPlugin(zapv13.Plugin(), false) RegisterPlugin(zapv12.Plugin(), false) RegisterPlugin(zapv11.Plugin(), true) } @@ -60,18 +64,28 @@ func SupportedSegmentTypeVersions(typ string) (rv []uint32) { return rv } -func (s *Scorch) loadSegmentPlugin(forcedSegmentType string, - forcedSegmentVersion uint32) error { +func chooseSegmentPlugin(forcedSegmentType string, + forcedSegmentVersion uint32) (segment.Plugin, error) { if versions, ok := supportedSegmentPlugins[forcedSegmentType]; ok { if segPlugin, ok := versions[uint32(forcedSegmentVersion)]; ok { - s.segPlugin = segPlugin - return nil + return segPlugin, nil } - return fmt.Errorf( + return nil, fmt.Errorf( "unsupported version %d for segment type: %s, supported: %v", forcedSegmentVersion, forcedSegmentType, SupportedSegmentTypeVersions(forcedSegmentType)) } - return fmt.Errorf("unsupported segment type: %s, supported: %v", + return nil, fmt.Errorf("unsupported segment type: %s, supported: %v", forcedSegmentType, SupportedSegmentTypes()) } + +func (s *Scorch) loadSegmentPlugin(forcedSegmentType string, + forcedSegmentVersion uint32) error { + segPlugin, err := chooseSegmentPlugin(forcedSegmentType, + forcedSegmentVersion) + if err != nil { + return err + } + s.segPlugin = segPlugin + return nil +} diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_index.go b/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_index.go index 47cc809b21..9d17bcb2c5 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_index.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_index.go @@ -303,9 +303,12 @@ func (i *IndexSnapshot) newDocIDReader(results chan *asynchSegmentResult) (index var err error for count := 0; count < len(i.segment); count++ { asr := <-results - if asr.err != nil && err != nil { - err = asr.err - } else { + if asr.err != nil { + if err == nil { + // returns the first error encountered + err = asr.err + } + } else if err == nil { rv.iterators[asr.index] = asr.docs.Iterator() } } @@ -511,10 +514,20 @@ func (i *IndexSnapshot) allocTermFieldReaderDicts(field string) (tfr *IndexSnaps } } i.m2.Unlock() - return &IndexSnapshotTermFieldReader{} + return &IndexSnapshotTermFieldReader{ + recycle: true, + } } func (i *IndexSnapshot) recycleTermFieldReader(tfr *IndexSnapshotTermFieldReader) { + if !tfr.recycle { + // Do not recycle an optimized unadorned term field reader (used for + // ConjunctionUnadorned or DisjunctionUnadorned), during when a fresh + // roaring.Bitmap is built by AND-ing or OR-ing individual bitmaps, + // and we'll need to release them for GC. 
(See MB-40916) + return + } + i.parent.rootLock.RLock() obsolete := i.parent.root != i i.parent.rootLock.RUnlock() diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_index_tfr.go b/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_index_tfr.go index 5d56f19441..239f68fbe7 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_index_tfr.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_index_tfr.go @@ -45,6 +45,7 @@ type IndexSnapshotTermFieldReader struct { includeTermVectors bool currPosting segment.Posting currID index.IndexInternalID + recycle bool } func (i *IndexSnapshotTermFieldReader) Size() int { @@ -133,6 +134,8 @@ func (i *IndexSnapshotTermFieldReader) Advance(ID index.IndexInternalID, preAllo if err != nil { return nil, err } + // close the current term field reader before replacing it with a new one + _ = i.Close() *i = *(i2.(*IndexSnapshotTermFieldReader)) } num, err := docInternalToNumber(ID) diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/stats.go b/vendor/github.com/blevesearch/bleve/index/scorch/stats.go index e638362a71..626fff2e47 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/stats.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/stats.go @@ -47,6 +47,9 @@ type Stats struct { TotTermSearchersStarted uint64 TotTermSearchersFinished uint64 + TotEventTriggerStarted uint64 + TotEventTriggerCompleted uint64 + TotIntroduceLoop uint64 TotIntroduceSegmentBeg uint64 TotIntroduceSegmentEnd uint64 @@ -82,6 +85,9 @@ type Stats struct { TotFileMergeLoopErr uint64 TotFileMergeLoopEnd uint64 + TotFileMergeForceOpsStarted uint64 + TotFileMergeForceOpsCompleted uint64 + TotFileMergePlan uint64 TotFileMergePlanErr uint64 TotFileMergePlanNone uint64 @@ -105,9 +111,10 @@ type Stats struct { TotFileMergeZapIntroductionTime uint64 MaxFileMergeZapIntroductionTime uint64 - TotFileMergeIntroductions uint64 - TotFileMergeIntroductionsDone uint64 - TotFileMergeIntroductionsSkipped uint64 + TotFileMergeIntroductions uint64 + TotFileMergeIntroductionsDone uint64 + TotFileMergeIntroductionsSkipped uint64 + TotFileMergeIntroductionsObsoleted uint64 CurFilesIneligibleForRemoval uint64 TotSnapshotsRemovedFromMetaStore uint64 diff --git a/vendor/github.com/blevesearch/bleve/index_alias_impl.go b/vendor/github.com/blevesearch/bleve/index_alias_impl.go index 4366fc7956..5aa57d8ac8 100644 --- a/vendor/github.com/blevesearch/bleve/index_alias_impl.go +++ b/vendor/github.com/blevesearch/bleve/index_alias_impl.go @@ -16,7 +16,6 @@ package bleve import ( "context" - "sort" "sync" "time" @@ -44,6 +43,16 @@ func NewIndexAlias(indexes ...Index) *indexAliasImpl { } } +// VisitIndexes invokes the visit callback on every +// indexes included in the index alias. 
+func (i *indexAliasImpl) VisitIndexes(visit func(Index)) { + i.mutex.RLock() + for _, idx := range i.indexes { + visit(idx) + } + i.mutex.RUnlock() +} + func (i *indexAliasImpl) isAliasToSingleIndex() error { if len(i.indexes) < 1 { return ErrorAliasEmpty @@ -511,10 +520,11 @@ func MultiSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*Se } } + sortFunc := req.SortFunc() // sort all hits with the requested order if len(req.Sort) > 0 { sorter := newSearchHitSorter(req.Sort, sr.Hits) - sort.Sort(sorter) + sortFunc(sorter) } // now skip over the correct From @@ -539,7 +549,7 @@ func MultiSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*Se req.Sort.Reverse() // resort using the original order mhs := newSearchHitSorter(req.Sort, sr.Hits) - sort.Sort(mhs) + sortFunc(mhs) // reset request req.SearchBefore = req.SearchAfter req.SearchAfter = nil diff --git a/vendor/github.com/blevesearch/bleve/index_impl.go b/vendor/github.com/blevesearch/bleve/index_impl.go index 6324d960eb..629cc9b2ff 100644 --- a/vendor/github.com/blevesearch/bleve/index_impl.go +++ b/vendor/github.com/blevesearch/bleve/index_impl.go @@ -19,7 +19,6 @@ import ( "encoding/json" "fmt" "os" - "sort" "sync" "sync/atomic" "time" @@ -579,7 +578,7 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr req.Sort.Reverse() // resort using the original order mhs := newSearchHitSorter(req.Sort, hits) - sort.Sort(mhs) + req.SortFunc()(mhs) // reset request req.SearchBefore = req.SearchAfter req.SearchAfter = nil diff --git a/vendor/github.com/blevesearch/bleve/mapping/document.go b/vendor/github.com/blevesearch/bleve/mapping/document.go index 15cb6b5fa1..355a602e55 100644 --- a/vendor/github.com/blevesearch/bleve/mapping/document.go +++ b/vendor/github.com/blevesearch/bleve/mapping/document.go @@ -251,7 +251,6 @@ func (dm *DocumentMapping) AddFieldMapping(fm *FieldMapping) { // UnmarshalJSON offers custom unmarshaling with optional strict validation func (dm *DocumentMapping) UnmarshalJSON(data []byte) error { - var tmp map[string]json.RawMessage err := json.Unmarshal(data, &tmp) if err != nil { @@ -308,8 +307,8 @@ func (dm *DocumentMapping) UnmarshalJSON(data []byte) error { } func (dm *DocumentMapping) defaultAnalyzerName(path []string) string { - rv := "" current := dm + rv := current.DefaultAnalyzer for _, pathElement := range path { var ok bool current, ok = current.Properties[pathElement] diff --git a/vendor/github.com/blevesearch/bleve/mapping/index.go b/vendor/github.com/blevesearch/bleve/mapping/index.go index 602764cbbf..21ca5cce36 100644 --- a/vendor/github.com/blevesearch/bleve/mapping/index.go +++ b/vendor/github.com/blevesearch/bleve/mapping/index.go @@ -101,26 +101,26 @@ func (im *IndexMappingImpl) AddCustomTokenFilter(name string, config map[string] // returned analyzer is registered in the IndexMapping. // // bleve comes with predefined analyzers, like -// github.com/blevesearch/bleve/analysis/analyzers/custom_analyzer. They are +// github.com/blevesearch/bleve/analysis/analyzer/custom. They are // available only if their package is imported by client code. 
To achieve this, // use their metadata to fill configuration entries: // // import ( -// "github.com/blevesearch/bleve/analysis/analyzers/custom_analyzer" -// "github.com/blevesearch/bleve/analysis/char_filters/html_char_filter" -// "github.com/blevesearch/bleve/analysis/token_filters/lower_case_filter" -// "github.com/blevesearch/bleve/analysis/tokenizers/unicode" +// "github.com/blevesearch/bleve/analysis/analyzer/custom" +// "github.com/blevesearch/bleve/analysis/char/html" +// "github.com/blevesearch/bleve/analysis/token/lowercase" +// "github.com/blevesearch/bleve/analysis/tokenizer/unicode" // ) // // m := bleve.NewIndexMapping() // err := m.AddCustomAnalyzer("html", map[string]interface{}{ -// "type": custom_analyzer.Name, +// "type": custom.Name, // "char_filters": []string{ -// html_char_filter.Name, +// html.Name, // }, // "tokenizer": unicode.Name, // "token_filters": []string{ -// lower_case_filter.Name, +// lowercase.Name, // ... // }, // }) diff --git a/vendor/github.com/blevesearch/bleve/search.go b/vendor/github.com/blevesearch/bleve/search.go index b337edc9e4..f67450779a 100644 --- a/vendor/github.com/blevesearch/bleve/search.go +++ b/vendor/github.com/blevesearch/bleve/search.go @@ -18,6 +18,7 @@ import ( "encoding/json" "fmt" "reflect" + "sort" "time" "github.com/blevesearch/bleve/analysis" @@ -264,6 +265,7 @@ func (h *HighlightRequest) AddField(field string) { // Score controls the kind of scoring performed // SearchAfter supports deep paging by providing a minimum sort key // SearchBefore supports deep paging by providing a maximum sort key +// sortFunc specifies the sort implementation to use for sorting results. // // A special field named "*" can be used to return all fields. type SearchRequest struct { @@ -279,6 +281,8 @@ type SearchRequest struct { Score string `json:"score,omitempty"` SearchAfter []string `json:"search_after"` SearchBefore []string `json:"search_before"` + + sortFunc func(sort.Interface) } func (r *SearchRequest) Validate() error { @@ -606,3 +610,22 @@ func MemoryNeededForSearchResult(req *SearchRequest) uint64 { return uint64(estimate) } + +// SetSortFunc sets the sort implementation to use when sorting hits. +// +// SearchRequests can specify a custom sort implementation to meet +// their needs. For instance, by specifying a parallel sort +// that uses all available cores. +func (r *SearchRequest) SetSortFunc(s func(sort.Interface)) { + r.sortFunc = s +} + +// SortFunc returns the sort implementation to use when sorting hits. +// Defaults to sort.Sort. 
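SetSortFunc's stated purpose is letting a SearchRequest swap in a custom sort implementation, for instance a parallel one. A minimal sketch, using the standard library's sort.Stable as a stand-in for such an implementation:

```go
package main

import (
	"sort"

	"github.com/blevesearch/bleve"
)

func main() {
	req := bleve.NewSearchRequest(bleve.NewMatchQuery("text indexing"))
	// Any func(sort.Interface) is accepted; sort.Stable stands in here
	// for a custom (e.g. parallel) sorter. Left unset, SortFunc()
	// returns sort.Sort.
	req.SetSortFunc(sort.Stable)
	// req is then used with Index.Search / Index.SearchInContext as usual.
}
```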
+func (r *SearchRequest) SortFunc() func(data sort.Interface) { + if r.sortFunc != nil { + return r.sortFunc + } + + return sort.Sort +} diff --git a/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction.go b/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction.go index 6a296b68fa..f47da27c4f 100644 --- a/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction.go +++ b/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction.go @@ -16,7 +16,6 @@ package searcher import ( "fmt" - "github.com/blevesearch/bleve/index" "github.com/blevesearch/bleve/search" ) @@ -37,6 +36,11 @@ func NewDisjunctionSearcher(indexReader index.IndexReader, return newDisjunctionSearcher(indexReader, qsearchers, min, options, true) } +func optionsDisjunctionOptimizable(options search.SearcherOptions) bool { + rv := options.Score == "none" && !options.IncludeTermVectors + return rv +} + func newDisjunctionSearcher(indexReader index.IndexReader, qsearchers []search.Searcher, min float64, options search.SearcherOptions, limit bool) (search.Searcher, error) { @@ -44,7 +48,7 @@ func newDisjunctionSearcher(indexReader index.IndexReader, // do not need extra information like freq-norm's or term vectors // and the requested min is simple if len(qsearchers) > 1 && min <= 1 && - options.Score == "none" && !options.IncludeTermVectors { + optionsDisjunctionOptimizable(options) { rv, err := optimizeCompositeSearcher("disjunction:unadorned", indexReader, qsearchers, options) if err != nil || rv != nil { @@ -103,7 +107,7 @@ func tooManyClauses(count int) bool { return false } -func tooManyClausesErr(count int) error { - return fmt.Errorf("TooManyClauses[%d > maxClauseCount, which is set to %d]", - count, DisjunctionMaxClauseCount) +func tooManyClausesErr(field string, count int) error { + return fmt.Errorf("TooManyClauses over field: `%s` [%d > maxClauseCount,"+ + " which is set to %d]", field, count, DisjunctionMaxClauseCount) } diff --git a/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction_heap.go b/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction_heap.go index ec133f1f83..7f0a5a00e3 100644 --- a/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction_heap.go +++ b/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction_heap.go @@ -62,7 +62,7 @@ func newDisjunctionHeapSearcher(indexReader index.IndexReader, limit bool) ( *DisjunctionHeapSearcher, error) { if limit && tooManyClauses(len(searchers)) { - return nil, tooManyClausesErr(len(searchers)) + return nil, tooManyClausesErr("", len(searchers)) } // build our searcher @@ -310,7 +310,7 @@ func (s *DisjunctionHeapSearcher) Optimize(kind string, octx index.OptimizableCo } } - return octx, nil + return nil, nil } // heap impl diff --git a/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction_slice.go b/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction_slice.go index e47f39ad09..dc566ade57 100644 --- a/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction_slice.go +++ b/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction_slice.go @@ -50,7 +50,7 @@ func newDisjunctionSliceSearcher(indexReader index.IndexReader, limit bool) ( *DisjunctionSliceSearcher, error) { if limit && tooManyClauses(len(qsearchers)) { - return nil, tooManyClausesErr(len(qsearchers)) + return nil, tooManyClausesErr("", len(qsearchers)) } // build the downstream searchers searchers := 
diff --git a/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction_slice.go b/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction_slice.go
index e47f39ad09..dc566ade57 100644
--- a/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction_slice.go
+++ b/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction_slice.go
@@ -50,7 +50,7 @@ func newDisjunctionSliceSearcher(indexReader index.IndexReader,
	limit bool) (
	*DisjunctionSliceSearcher, error) {
	if limit && tooManyClauses(len(qsearchers)) {
-		return nil, tooManyClausesErr(len(qsearchers))
+		return nil, tooManyClausesErr("", len(qsearchers))
	}
	// build the downstream searchers
	searchers := make(OrderedSearcherList, len(qsearchers))
@@ -294,5 +294,5 @@ func (s *DisjunctionSliceSearcher) Optimize(kind string, octx index.OptimizableC
		}
	}

-	return octx, nil
+	return nil, nil
}
diff --git a/vendor/github.com/blevesearch/bleve/search/searcher/search_fuzzy.go b/vendor/github.com/blevesearch/bleve/search/searcher/search_fuzzy.go
index 8176e59b51..aca8a7d9fa 100644
--- a/vendor/github.com/blevesearch/bleve/search/searcher/search_fuzzy.go
+++ b/vendor/github.com/blevesearch/bleve/search/searcher/search_fuzzy.go
@@ -75,7 +75,7 @@ func findFuzzyCandidateTerms(indexReader index.IndexReader, term string,
	for err == nil && tfd != nil {
		rv = append(rv, tfd.Term)
		if tooManyClauses(len(rv)) {
-			return nil, tooManyClausesErr(len(rv))
+			return nil, tooManyClausesErr(field, len(rv))
		}
		tfd, err = fieldDict.Next()
	}
@@ -107,7 +107,7 @@ func findFuzzyCandidateTerms(indexReader index.IndexReader, term string,
		if !exceeded && ld <= fuzziness {
			rv = append(rv, tfd.Term)
			if tooManyClauses(len(rv)) {
-				return nil, tooManyClausesErr(len(rv))
+				return nil, tooManyClausesErr(field, len(rv))
			}
		}
		tfd, err = fieldDict.Next()
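Several searchers above now pass the field name into tooManyClausesErr so the clause-limit error identifies its source. A sketch of what this surfaces, assuming the exported DisjunctionMaxClauseCount variable in the searcher package and illustrative field/count values; the error string is reproduced from the change above, not generated by a real search:

    package main

    import (
        "fmt"

        "github.com/blevesearch/bleve/search/searcher"
    )

    func main() {
        // raise the global clause limit before searching
        searcher.DisjunctionMaxClauseCount = 1024

        // the new error format, reproduced here for illustration only
        err := fmt.Errorf("TooManyClauses over field: `%s` [%d > maxClauseCount,"+
            " which is set to %d]", "city", 2048, searcher.DisjunctionMaxClauseCount)
        fmt.Println(err)
    }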
diff --git a/vendor/github.com/blevesearch/bleve/search/searcher/search_geoboundingbox.go b/vendor/github.com/blevesearch/bleve/search/searcher/search_geoboundingbox.go
index c4b8af9270..76157f01a1 100644
--- a/vendor/github.com/blevesearch/bleve/search/searcher/search_geoboundingbox.go
+++ b/vendor/github.com/blevesearch/bleve/search/searcher/search_geoboundingbox.go
@@ -24,7 +24,7 @@ import (

type filterFunc func(key []byte) bool

-var GeoBitsShift1 = (geo.GeoBits << 1)
+var GeoBitsShift1 = geo.GeoBits << 1
var GeoBitsShift1Minus1 = GeoBitsShift1 - 1

func NewGeoBoundingBoxSearcher(indexReader index.IndexReader, minLon, minLat,
@@ -100,30 +100,42 @@ func NewGeoBoundingBoxSearcher(indexReader index.IndexReader, minLon, minLat,
var geoMaxShift = document.GeoPrecisionStep * 4
var geoDetailLevel = ((geo.GeoBits << 1) - geoMaxShift) / 2

+type closeFunc func() error
+
func ComputeGeoRange(term uint64, shift uint,
	sminLon, sminLat, smaxLon, smaxLat float64, checkBoundaries bool,
	indexReader index.IndexReader, field string) (
	onBoundary [][]byte, notOnBoundary [][]byte, err error) {
-	preallocBytesLen := 32
-	preallocBytes := make([]byte, preallocBytesLen)
-
-	makePrefixCoded := func(in int64, shift uint) (rv numeric.PrefixCoded) {
-		if len(preallocBytes) <= 0 {
-			preallocBytesLen = preallocBytesLen * 2
-			preallocBytes = make([]byte, preallocBytesLen)
-		}
-
-		rv, preallocBytes, err =
-			numeric.NewPrefixCodedInt64Prealloc(in, shift, preallocBytes)
+	isIndexed, closeF, err := buildIsIndexedFunc(indexReader, field)
+	if closeF != nil {
+		defer func() {
+			cerr := closeF()
+			if cerr != nil {
+				err = cerr
+			}
+		}()
+	}

-		return rv
+	grc := &geoRangeCompute{
+		preallocBytesLen: 32,
+		preallocBytes:    make([]byte, 32),
+		sminLon:          sminLon,
+		sminLat:          sminLat,
+		smaxLon:          smaxLon,
+		smaxLat:          smaxLat,
+		checkBoundaries:  checkBoundaries,
+		isIndexed:        isIndexed,
	}

-	var fieldDict index.FieldDictContains
-	var isIndexed filterFunc
+	grc.computeGeoRange(term, shift)
+
+	return grc.onBoundary, grc.notOnBoundary, nil
+}
+
+func buildIsIndexedFunc(indexReader index.IndexReader, field string) (isIndexed filterFunc, closeF closeFunc, err error) {
	if irr, ok := indexReader.(index.IndexReaderContains); ok {
-		fieldDict, err = irr.FieldDictContains(field)
+		fieldDict, err := irr.FieldDictContains(field)
		if err != nil {
			return nil, nil, err
		}
@@ -132,22 +144,18 @@ func ComputeGeoRange(term uint64, shift uint,
			found, err := fieldDict.Contains(term)
			return err == nil && found
		}
-	}

-	defer func() {
-		if fieldDict != nil {
+		closeF = func() error {
			if fd, ok := fieldDict.(index.FieldDict); ok {
-				cerr := fd.Close()
-				if cerr != nil {
-					err = cerr
+				err := fd.Close()
+				if err != nil {
+					return err
				}
			}
+			return nil
		}
-	}()
-
-	if isIndexed == nil {
+	} else if indexReader != nil {
		isIndexed = func(term []byte) bool {
-			if indexReader != nil {
			reader, err := indexReader.TermFieldReader(term, field,
				false, false, false)
			if err != nil || reader == nil {
				return false
@@ -157,68 +165,15 @@ func ComputeGeoRange(term uint64, shift uint,
				return false
			}
			_ = reader.Close()
-			}
-			return true
+			return true
		}
-	}

-	var computeGeoRange func(term uint64, shift uint) // declare for recursion
-
-	relateAndRecurse := func(start, end uint64, res, level uint) {
-		minLon := geo.MortonUnhashLon(start)
-		minLat := geo.MortonUnhashLat(start)
-		maxLon := geo.MortonUnhashLon(end)
-		maxLat := geo.MortonUnhashLat(end)
-
-		within := res%document.GeoPrecisionStep == 0 &&
-			geo.RectWithin(minLon, minLat, maxLon, maxLat,
-				sminLon, sminLat, smaxLon, smaxLat)
-		if within || (level == geoDetailLevel &&
-			geo.RectIntersects(minLon, minLat, maxLon, maxLat,
-				sminLon, sminLat, smaxLon, smaxLat)) {
-			codedTerm := makePrefixCoded(int64(start), res)
-			if isIndexed(codedTerm) {
-				if !within && checkBoundaries {
-					onBoundary = append(onBoundary, codedTerm)
-				} else {
-					notOnBoundary = append(notOnBoundary, codedTerm)
-				}
-			}
-		} else if level < geoDetailLevel &&
-			geo.RectIntersects(minLon, minLat, maxLon, maxLat,
-				sminLon, sminLat, smaxLon, smaxLat) {
-			computeGeoRange(start, res-1)
+	} else {
+		isIndexed = func([]byte) bool {
+			return true
		}
	}
-
-	computeGeoRange = func(term uint64, shift uint) {
-		if err != nil {
-			return
-		}
-
-		split := term | uint64(0x1)<<shift
-		var upperMax uint64
-		if shift < 63 {
-			upperMax = term | ((uint64(1) << (shift + 1)) - 1)
-		} else {
-			upperMax = 0xffffffffffffffff
-		}
-
-		lowerMax := split - 1
-
-		level := (GeoBitsShift1 - shift) >> 1
-
-		relateAndRecurse(term, lowerMax, shift, level)
-		relateAndRecurse(split, upperMax, shift, level)
-	}
-
-	computeGeoRange(term, shift)
-
-	if err != nil {
-		return nil, nil, err
-	}
-
-	return onBoundary, notOnBoundary, err
+	return isIndexed, closeF, err
}

func buildRectFilter(dvReader index.DocValueReader, field string,
@@ -252,3 +207,66 @@ func buildRectFilter(dvReader index.DocValueReader, field string,
		return false
	}
}
+
+type geoRangeCompute struct {
+	preallocBytesLen                   int
+	preallocBytes                      []byte
+	sminLon, sminLat, smaxLon, smaxLat float64
+	checkBoundaries                    bool
+	onBoundary, notOnBoundary          [][]byte
+	isIndexed                          func(term []byte) bool
+}
+
+func (grc *geoRangeCompute) makePrefixCoded(in int64, shift uint) (rv numeric.PrefixCoded) {
+	if len(grc.preallocBytes) <= 0 {
+		grc.preallocBytesLen = grc.preallocBytesLen * 2
+		grc.preallocBytes = make([]byte, grc.preallocBytesLen)
+	}
+
+	rv, grc.preallocBytes, _ =
+		numeric.NewPrefixCodedInt64Prealloc(in, shift, grc.preallocBytes)
+
+	return rv
+}
+
+func (grc *geoRangeCompute) computeGeoRange(term uint64, shift uint) {
+	split := term | uint64(0x1)<<shift
+	var upperMax uint64
+	if shift < 63 {
+		upperMax = term | ((uint64(1) << (shift + 1)) - 1)
+	} else {
+		upperMax = 0xffffffffffffffff
+	}
+	lowerMax := split - 1
+	grc.relateAndRecurse(term, lowerMax, shift)
+	grc.relateAndRecurse(split, upperMax, shift)
+}
+
+func (grc *geoRangeCompute) relateAndRecurse(start, end uint64, res uint) {
+	minLon := geo.MortonUnhashLon(start)
+	minLat := geo.MortonUnhashLat(start)
+	maxLon := geo.MortonUnhashLon(end)
+	maxLat := geo.MortonUnhashLat(end)
+
+	level := (GeoBitsShift1 - res) >> 1
+
+	within := res%document.GeoPrecisionStep == 0 &&
+		geo.RectWithin(minLon, minLat, maxLon, maxLat,
+			grc.sminLon, grc.sminLat, grc.smaxLon, grc.smaxLat)
+	if within || (level == geoDetailLevel &&
+		geo.RectIntersects(minLon, minLat, maxLon, maxLat,
+			grc.sminLon, grc.sminLat, grc.smaxLon, grc.smaxLat)) {
+		codedTerm := grc.makePrefixCoded(int64(start), res)
+		if grc.isIndexed(codedTerm) {
+			if !within && grc.checkBoundaries {
+				grc.onBoundary = append(grc.onBoundary, codedTerm)
+			} else {
+				grc.notOnBoundary = append(grc.notOnBoundary, codedTerm)
+			}
+		}
+	} else if level < geoDetailLevel &&
+		geo.RectIntersects(minLon, minLat, maxLon, maxLat,
+			grc.sminLon, grc.sminLat, grc.smaxLon, grc.smaxLat) {
+		grc.computeGeoRange(start, res-1)
+	}
+}
\ No newline at end of file
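The computeGeoRange/relateAndRecurse pair above walks the Morton-interleaved space by repeatedly splitting a bit range in half. A self-contained sketch of just that split arithmetic, with values chosen arbitrarily for illustration:

    package main

    import "fmt"

    // At bit position shift, the range [term, upperMax] divides into a
    // lower half [term, split-1] and an upper half [split, upperMax],
    // exactly as in computeGeoRange above.
    func main() {
        var term uint64 = 0
        var shift uint = 62

        split := term | uint64(0x1)<<shift
        var upperMax uint64
        if shift < 63 {
            upperMax = term | ((uint64(1) << (shift + 1)) - 1)
        } else {
            upperMax = 0xffffffffffffffff
        }
        lowerMax := split - 1

        fmt.Printf("lower: [%#x, %#x]\n", term, lowerMax)  // [0x0, 0x3fffffffffffffff]
        fmt.Printf("upper: [%#x, %#x]\n", split, upperMax) // [0x4000000000000000, 0x7fffffffffffffff]
    }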
diff --git a/vendor/github.com/blevesearch/bleve/search/searcher/search_multi_term.go b/vendor/github.com/blevesearch/bleve/search/searcher/search_multi_term.go
index c48366ee27..70a2fa38c0 100644
--- a/vendor/github.com/blevesearch/bleve/search/searcher/search_multi_term.go
+++ b/vendor/github.com/blevesearch/bleve/search/searcher/search_multi_term.go
@@ -15,6 +15,7 @@
package searcher

import (
+	"fmt"
	"github.com/blevesearch/bleve/index"
	"github.com/blevesearch/bleve/search"
)
@@ -22,10 +23,113 @@ import (
func NewMultiTermSearcher(indexReader index.IndexReader, terms []string,
	field string, boost float64, options search.SearcherOptions, limit bool) (
	search.Searcher, error) {
-	if limit && tooManyClauses(len(terms)) {
-		return nil, tooManyClausesErr(len(terms))
+
+	if tooManyClauses(len(terms)) {
+		if optionsDisjunctionOptimizable(options) {
+			return optimizeMultiTermSearcher(indexReader, terms, field, boost, options)
+		}
+		if limit {
+			return nil, tooManyClausesErr(field, len(terms))
+		}
+	}
+
+	qsearchers, err := makeBatchSearchers(indexReader, terms, field, boost, options)
+	if err != nil {
+		return nil, err
	}

+	// build disjunction searcher of these ranges
+	return newMultiTermSearcherInternal(indexReader, qsearchers, field, boost,
+		options, limit)
+}
+
+func NewMultiTermSearcherBytes(indexReader index.IndexReader, terms [][]byte,
+	field string, boost float64, options search.SearcherOptions, limit bool) (
+	search.Searcher, error) {
+
+	if tooManyClauses(len(terms)) {
+		if optionsDisjunctionOptimizable(options) {
+			return optimizeMultiTermSearcherBytes(indexReader, terms, field, boost, options)
+		}
+
+		if limit {
+			return nil, tooManyClausesErr(field, len(terms))
+		}
+	}
+
+	qsearchers, err := makeBatchSearchersBytes(indexReader, terms, field, boost, options)
+	if err != nil {
+		return nil, err
+	}
+
+	// build disjunction searcher of these ranges
+	return newMultiTermSearcherInternal(indexReader, qsearchers, field, boost,
+		options, limit)
+}
+
+func newMultiTermSearcherInternal(indexReader index.IndexReader,
+	searchers []search.Searcher, field string, boost float64,
+	options search.SearcherOptions, limit bool) (
+	search.Searcher, error) {
+
+	// build disjunction searcher of these ranges
+	searcher, err := newDisjunctionSearcher(indexReader, searchers, 0, options,
+		limit)
+	if err != nil {
+		for _, s := range searchers {
+			_ = s.Close()
+		}
+		return nil, err
+	}
+
+	return searcher, nil
+}
+
+func optimizeMultiTermSearcher(indexReader index.IndexReader, terms []string,
+	field string, boost float64, options search.SearcherOptions) (
+	search.Searcher, error) {
+	var finalSearcher search.Searcher
+	for len(terms) > 0 {
+		var batchTerms []string
+		if len(terms) > DisjunctionMaxClauseCount {
+			batchTerms = terms[:DisjunctionMaxClauseCount]
+			terms = terms[DisjunctionMaxClauseCount:]
+		} else {
+			batchTerms = terms
+			terms = nil
+		}
+		batch, err := makeBatchSearchers(indexReader, batchTerms, field, boost, options)
+		if err != nil {
+			return nil, err
+		}
+		if finalSearcher != nil {
+			batch = append(batch, finalSearcher)
+		}
+		cleanup := func() {
+			for _, searcher := range batch {
+				if searcher != nil {
+					_ = searcher.Close()
+				}
+			}
+		}
+		finalSearcher, err = optimizeCompositeSearcher("disjunction:unadorned",
+			indexReader, batch, options)
+		// all searchers in batch should be closed, regardless of error or optimization failure
+		// either we're returning, or continuing and only finalSearcher is needed for next loop
+		cleanup()
+		if err != nil {
+			return nil, err
+		}
+		if finalSearcher == nil {
+			return nil, fmt.Errorf("unable to optimize")
+		}
+	}
+	return finalSearcher, nil
+}
+
+func makeBatchSearchers(indexReader index.IndexReader, terms []string, field string,
+	boost float64, options search.SearcherOptions) ([]search.Searcher, error) {
+
	qsearchers := make([]search.Searcher, len(terms))
	qsearchersClose := func() {
		for _, searcher := range qsearchers {
@@ -42,17 +146,54 @@ func NewMultiTermSearcher(indexReader index.IndexReader, terms []string,
			return nil, err
		}
	}
-	// build disjunction searcher of these ranges
-	return newMultiTermSearcherBytes(indexReader, qsearchers, field, boost,
-		options, limit)
+	return qsearchers, nil
}

-func NewMultiTermSearcherBytes(indexReader index.IndexReader, terms [][]byte,
-	field string, boost float64, options search.SearcherOptions, limit bool) (
+func optimizeMultiTermSearcherBytes(indexReader index.IndexReader, terms [][]byte,
+	field string, boost float64, options search.SearcherOptions) (
	search.Searcher, error) {
-	if limit && tooManyClauses(len(terms)) {
-		return nil, tooManyClausesErr(len(terms))
+
+	var finalSearcher search.Searcher
+	for len(terms) > 0 {
+		var batchTerms [][]byte
+		if len(terms) > DisjunctionMaxClauseCount {
+			batchTerms = terms[:DisjunctionMaxClauseCount]
+			terms = terms[DisjunctionMaxClauseCount:]
+		} else {
+			batchTerms = terms
+			terms = nil
+		}
+		batch, err := makeBatchSearchersBytes(indexReader, batchTerms, field, boost, options)
+		if err != nil {
+			return nil, err
+		}
+		if finalSearcher != nil {
+			batch = append(batch, finalSearcher)
+		}
+		cleanup := func() {
+			for _, searcher := range batch {
+				if searcher != nil {
+					_ = searcher.Close()
+				}
+			}
+		}
+		finalSearcher, err = optimizeCompositeSearcher("disjunction:unadorned",
+			indexReader, batch, options)
+		// all searchers in batch should be closed, regardless of error or optimization failure
+		// either we're returning, or continuing and only finalSearcher is needed for next loop
+		cleanup()
+		if err != nil {
+			return nil, err
+		}
+		if finalSearcher == nil {
+			return nil, fmt.Errorf("unable to optimize")
+		}
	}
+	return finalSearcher, nil
+}
+
+func makeBatchSearchersBytes(indexReader index.IndexReader, terms [][]byte, field string,
+	boost float64, options search.SearcherOptions) ([]search.Searcher, error) {

	qsearchers := make([]search.Searcher, len(terms))
	qsearchersClose := func() {
@@ -70,24 +211,5 @@ func NewMultiTermSearcherBytes(indexReader index.IndexReader, terms [][]byte,
			return nil, err
		}
	}
-	return newMultiTermSearcherBytes(indexReader, qsearchers, field, boost,
-		options, limit)
-}
-
-func newMultiTermSearcherBytes(indexReader index.IndexReader,
-	searchers []search.Searcher, field string, boost float64,
-	options search.SearcherOptions, limit bool) (
-	search.Searcher, error) {
-
-	// build disjunction searcher of these ranges
-	searcher, err := newDisjunctionSearcher(indexReader, searchers, 0, options,
-		limit)
-	if err != nil {
-		for _, s := range searchers {
-			_ = s.Close()
-		}
-		return nil, err
-	}
-
-	return searcher, nil
+	return qsearchers, nil
}
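optimizeMultiTermSearcher and its bytes twin consume the term list in DisjunctionMaxClauseCount-sized batches, folding each batch's optimized searcher into the next round. A simplified sketch of just the chunking loop (searcher construction and optimization elided; batchSize stands in for DisjunctionMaxClauseCount):

    package main

    import "fmt"

    func main() {
        terms := []string{"a", "b", "c", "d", "e"}
        batchSize := 2 // stand-in for DisjunctionMaxClauseCount

        for len(terms) > 0 {
            var batch []string
            if len(terms) > batchSize {
                batch = terms[:batchSize]
                terms = terms[batchSize:]
            } else {
                batch = terms
                terms = nil
            }
            fmt.Println("batch:", batch) // [a b], then [c d], then [e]
        }
    }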
diff --git a/vendor/github.com/blevesearch/bleve/search/searcher/search_numeric_range.go b/vendor/github.com/blevesearch/bleve/search/searcher/search_numeric_range.go
index 83107f0201..48d6226e11 100644
--- a/vendor/github.com/blevesearch/bleve/search/searcher/search_numeric_range.go
+++ b/vendor/github.com/blevesearch/bleve/search/searcher/search_numeric_range.go
@@ -74,9 +74,8 @@ func NewNumericRangeSearcher(indexReader index.IndexReader,
	terms := termRanges.Enumerate(isIndexed)
	if fieldDict != nil {
		if fd, ok := fieldDict.(index.FieldDict); ok {
-			cerr := fd.Close()
-			if cerr != nil {
-				err = cerr
+			if err = fd.Close(); err != nil {
+				return nil, err
			}
		}
	}
@@ -97,7 +96,7 @@
	}

	if tooManyClauses(len(terms)) {
-		return nil, tooManyClausesErr(len(terms))
+		return nil, tooManyClausesErr(field, len(terms))
	}

	return NewMultiTermSearcherBytes(indexReader, terms, field, boost, options,
diff --git a/vendor/github.com/blevesearch/bleve/search/searcher/search_regexp.go b/vendor/github.com/blevesearch/bleve/search/searcher/search_regexp.go
index 4def832c47..11a44f159f 100644
--- a/vendor/github.com/blevesearch/bleve/search/searcher/search_regexp.go
+++ b/vendor/github.com/blevesearch/bleve/search/searcher/search_regexp.go
@@ -110,7 +110,7 @@ func findRegexpCandidateTerms(indexReader index.IndexReader,
		if matchPos != nil && matchPos[0] == 0 && matchPos[1] == len(tfd.Term) {
			rv = append(rv, tfd.Term)
			if tooManyClauses(len(rv)) {
-				return rv, tooManyClausesErr(len(rv))
+				return rv, tooManyClausesErr(field, len(rv))
			}
		}
		tfd, err = fieldDict.Next()
diff --git a/vendor/github.com/blevesearch/bleve/search/searcher/search_term.go b/vendor/github.com/blevesearch/bleve/search/searcher/search_term.go
index c1af74c76e..e07d25333a 100644
--- a/vendor/github.com/blevesearch/bleve/search/searcher/search_term.go
+++ b/vendor/github.com/blevesearch/bleve/search/searcher/search_term.go
@@ -137,5 +137,5 @@ func (s *TermSearcher) Optimize(kind string, octx index.OptimizableContext) (
		return o.Optimize(kind, octx)
	}

-	return octx, nil
+	return nil, nil
}
diff --git a/vendor/github.com/blevesearch/bleve/search/searcher/search_term_prefix.go b/vendor/github.com/blevesearch/bleve/search/searcher/search_term_prefix.go
index b5af4631fe..2a8f22cff1 100644
--- a/vendor/github.com/blevesearch/bleve/search/searcher/search_term_prefix.go
+++ b/vendor/github.com/blevesearch/bleve/search/searcher/search_term_prefix.go
@@ -38,7 +38,7 @@ func NewTermPrefixSearcher(indexReader index.IndexReader, prefix string,
	for err == nil && tfd != nil {
		terms = append(terms, tfd.Term)
		if tooManyClauses(len(terms)) {
-			return nil, tooManyClausesErr(len(terms))
+			return nil, tooManyClausesErr(field, len(terms))
		}
		tfd, err = fieldDict.Next()
	}
diff --git a/vendor/github.com/blevesearch/bleve/search/sort.go b/vendor/github.com/blevesearch/bleve/search/sort.go
index 6e4ed80fa2..dca422ebd4 100644
--- a/vendor/github.com/blevesearch/bleve/search/sort.go
+++ b/vendor/github.com/blevesearch/bleve/search/sort.go
@@ -233,7 +233,11 @@ func (so SortOrder) Compare(cachedScoring, cachedDesc []bool, i, j *DocumentMatc
	} else {
		iVal := i.Sort[x]
		jVal := j.Sort[x]
-		c = strings.Compare(iVal, jVal)
+		if iVal < jVal {
+			c = -1
+		} else if iVal > jVal {
+			c = 1
+		}
	}

	if c == 0 {
@@ -423,7 +427,8 @@ func (s *SortField) filterTermsByType(terms [][]byte) [][]byte {
			allTermsPrefixCoded = false
		}
	}
-	if allTermsPrefixCoded {
+	// reset the terms only when valid zero shift terms are found.
+	if allTermsPrefixCoded && len(termsWithShiftZero) > 0 {
		terms = termsWithShiftZero
		s.tmp = termsWithShiftZero[:0]
	}
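A note on the sort.go change: swapping strings.Compare for the comparison operators is a deliberate micro-optimization; the standard library itself documents strings.Compare as included only for symmetry with package bytes, with the built-in operators usually clearer and always faster. A minimal equivalent of the new branch:

    package main

    import "fmt"

    // compareStrings mirrors the three-way comparison now used in
    // SortOrder.Compare, without the strings.Compare call.
    func compareStrings(iVal, jVal string) int {
        c := 0
        if iVal < jVal {
            c = -1
        } else if iVal > jVal {
            c = 1
        }
        return c
    }

    func main() {
        fmt.Println(compareStrings("apple", "banana")) // -1
        fmt.Println(compareStrings("same", "same"))    // 0
        fmt.Println(compareStrings("zoo", "ant"))      // 1
    }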