summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/blevesearch/bleve
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/blevesearch/bleve')
-rw-r--r--vendor/github.com/blevesearch/bleve/README.md7
-rw-r--r--vendor/github.com/blevesearch/bleve/builder.go94
-rw-r--r--vendor/github.com/blevesearch/bleve/go.mod13
-rw-r--r--vendor/github.com/blevesearch/bleve/index.go14
-rw-r--r--vendor/github.com/blevesearch/bleve/index/index.go7
-rw-r--r--vendor/github.com/blevesearch/bleve/index/scorch/builder.go334
-rw-r--r--vendor/github.com/blevesearch/bleve/index/scorch/event.go8
-rw-r--r--vendor/github.com/blevesearch/bleve/index/scorch/introducer.go18
-rw-r--r--vendor/github.com/blevesearch/bleve/index/scorch/merge.go168
-rw-r--r--vendor/github.com/blevesearch/bleve/index/scorch/mergeplan/merge_plan.go13
-rw-r--r--vendor/github.com/blevesearch/bleve/index/scorch/optimize.go60
-rw-r--r--vendor/github.com/blevesearch/bleve/index/scorch/persister.go85
-rw-r--r--vendor/github.com/blevesearch/bleve/index/scorch/rollback.go (renamed from vendor/github.com/blevesearch/bleve/index/scorch/snapshot_rollback.go)0
-rw-r--r--vendor/github.com/blevesearch/bleve/index/scorch/scorch.go62
-rw-r--r--vendor/github.com/blevesearch/bleve/index/scorch/segment/unadorned.go22
-rw-r--r--vendor/github.com/blevesearch/bleve/index/scorch/segment_plugin.go26
-rw-r--r--vendor/github.com/blevesearch/bleve/index/scorch/snapshot_index.go21
-rw-r--r--vendor/github.com/blevesearch/bleve/index/scorch/snapshot_index_tfr.go3
-rw-r--r--vendor/github.com/blevesearch/bleve/index/scorch/stats.go13
-rw-r--r--vendor/github.com/blevesearch/bleve/index_alias_impl.go16
-rw-r--r--vendor/github.com/blevesearch/bleve/index_impl.go3
-rw-r--r--vendor/github.com/blevesearch/bleve/mapping/document.go3
-rw-r--r--vendor/github.com/blevesearch/bleve/mapping/index.go16
-rw-r--r--vendor/github.com/blevesearch/bleve/search.go23
-rw-r--r--vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction.go14
-rw-r--r--vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction_heap.go4
-rw-r--r--vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction_slice.go4
-rw-r--r--vendor/github.com/blevesearch/bleve/search/searcher/search_fuzzy.go4
-rw-r--r--vendor/github.com/blevesearch/bleve/search/searcher/search_geoboundingbox.go184
-rw-r--r--vendor/github.com/blevesearch/bleve/search/searcher/search_multi_term.go180
-rw-r--r--vendor/github.com/blevesearch/bleve/search/searcher/search_numeric_range.go7
-rw-r--r--vendor/github.com/blevesearch/bleve/search/searcher/search_regexp.go2
-rw-r--r--vendor/github.com/blevesearch/bleve/search/searcher/search_term.go2
-rw-r--r--vendor/github.com/blevesearch/bleve/search/searcher/search_term_prefix.go2
-rw-r--r--vendor/github.com/blevesearch/bleve/search/sort.go9
35 files changed, 1144 insertions, 297 deletions
diff --git a/vendor/github.com/blevesearch/bleve/README.md b/vendor/github.com/blevesearch/bleve/README.md
index 7c1a7c7c46..eff0be97e4 100644
--- a/vendor/github.com/blevesearch/bleve/README.md
+++ b/vendor/github.com/blevesearch/bleve/README.md
@@ -1,10 +1,13 @@
# ![bleve](docs/bleve.png) bleve
-[![Build Status](https://travis-ci.org/blevesearch/bleve.svg?branch=master)](https://travis-ci.org/blevesearch/bleve) [![Coverage Status](https://coveralls.io/repos/github/blevesearch/bleve/badge.svg?branch=master)](https://coveralls.io/github/blevesearch/bleve?branch=master) [![GoDoc](https://godoc.org/github.com/blevesearch/bleve?status.svg)](https://godoc.org/github.com/blevesearch/bleve)
+[![Tests](https://github.com/blevesearch/bleve/workflows/Tests/badge.svg?branch=master&event=push)](https://github.com/blevesearch/bleve/actions?query=workflow%3ATests+event%3Apush+branch%3Amaster)
+[![Coverage Status](https://coveralls.io/repos/github/blevesearch/bleve/badge.svg?branch=master)](https://coveralls.io/github/blevesearch/bleve?branch=master)
+[![GoDoc](https://godoc.org/github.com/blevesearch/bleve?status.svg)](https://godoc.org/github.com/blevesearch/bleve)
[![Join the chat at https://gitter.im/blevesearch/bleve](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/blevesearch/bleve?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
[![codebeat](https://codebeat.co/badges/38a7cbc9-9cf5-41c0-a315-0746178230f4)](https://codebeat.co/projects/github-com-blevesearch-bleve)
[![Go Report Card](https://goreportcard.com/badge/blevesearch/bleve)](https://goreportcard.com/report/blevesearch/bleve)
-[![Sourcegraph](https://sourcegraph.com/github.com/blevesearch/bleve/-/badge.svg)](https://sourcegraph.com/github.com/blevesearch/bleve?badge) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
+[![Sourcegraph](https://sourcegraph.com/github.com/blevesearch/bleve/-/badge.svg)](https://sourcegraph.com/github.com/blevesearch/bleve?badge)
+[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
modern text indexing in go - [blevesearch.com](http://www.blevesearch.com/)
diff --git a/vendor/github.com/blevesearch/bleve/builder.go b/vendor/github.com/blevesearch/bleve/builder.go
new file mode 100644
index 0000000000..de00c97b6e
--- /dev/null
+++ b/vendor/github.com/blevesearch/bleve/builder.go
@@ -0,0 +1,94 @@
+// Copyright (c) 2019 Couchbase, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package bleve
+
+import (
+ "encoding/json"
+ "fmt"
+
+ "github.com/blevesearch/bleve/document"
+ "github.com/blevesearch/bleve/index"
+ "github.com/blevesearch/bleve/index/scorch"
+ "github.com/blevesearch/bleve/mapping"
+)
+
+type builderImpl struct {
+ b index.IndexBuilder
+ m mapping.IndexMapping
+}
+
+func (b *builderImpl) Index(id string, data interface{}) error {
+ if id == "" {
+ return ErrorEmptyID
+ }
+
+ doc := document.NewDocument(id)
+ err := b.m.MapDocument(doc, data)
+ if err != nil {
+ return err
+ }
+ err = b.b.Index(doc)
+ return err
+}
+
+func (b *builderImpl) Close() error {
+ return b.b.Close()
+}
+
+func newBuilder(path string, mapping mapping.IndexMapping, config map[string]interface{}) (Builder, error) {
+ if path == "" {
+ return nil, fmt.Errorf("builder requires path")
+ }
+
+ err := mapping.Validate()
+ if err != nil {
+ return nil, err
+ }
+
+ if config == nil {
+ config = map[string]interface{}{}
+ }
+
+ // the builder does not have an API to interact with internal storage
+ // however we can pass k/v pairs through the config
+ mappingBytes, err := json.Marshal(mapping)
+ if err != nil {
+ return nil, err
+ }
+ config["internal"] = map[string][]byte{
+ string(mappingInternalKey): mappingBytes,
+ }
+
+ // do not use real config, as these are options for the builder,
+ // not the resulting index
+ meta := newIndexMeta(scorch.Name, scorch.Name, map[string]interface{}{})
+ err = meta.Save(path)
+ if err != nil {
+ return nil, err
+ }
+
+ config["path"] = indexStorePath(path)
+
+ b, err := scorch.NewBuilder(config)
+ if err != nil {
+ return nil, err
+ }
+ rv := &builderImpl{
+ b: b,
+ m: mapping,
+ }
+
+ return rv, nil
+}
diff --git a/vendor/github.com/blevesearch/bleve/go.mod b/vendor/github.com/blevesearch/bleve/go.mod
index d38cf8f921..9a12454fda 100644
--- a/vendor/github.com/blevesearch/bleve/go.mod
+++ b/vendor/github.com/blevesearch/bleve/go.mod
@@ -3,16 +3,17 @@ module github.com/blevesearch/bleve
go 1.13
require (
- github.com/RoaringBitmap/roaring v0.4.21
+ github.com/RoaringBitmap/roaring v0.4.23
github.com/blevesearch/blevex v0.0.0-20190916190636-152f0fe5c040
github.com/blevesearch/go-porterstemmer v1.0.3
github.com/blevesearch/segment v0.9.0
github.com/blevesearch/snowballstem v0.9.0
- github.com/blevesearch/zap/v11 v11.0.7
- github.com/blevesearch/zap/v12 v12.0.7
- github.com/couchbase/ghistogram v0.1.0 // indirect
+ github.com/blevesearch/zap/v11 v11.0.10
+ github.com/blevesearch/zap/v12 v12.0.10
+ github.com/blevesearch/zap/v13 v13.0.2
+ github.com/blevesearch/zap/v14 v14.0.1
github.com/couchbase/moss v0.1.0
- github.com/couchbase/vellum v1.0.1
+ github.com/couchbase/vellum v1.0.2
github.com/golang/protobuf v1.3.2
github.com/kljensen/snowball v0.6.0
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563
@@ -20,6 +21,6 @@ require (
github.com/steveyen/gtreap v0.1.0
github.com/syndtr/goleveldb v1.0.0
github.com/willf/bitset v1.1.10
- go.etcd.io/bbolt v1.3.4
+ go.etcd.io/bbolt v1.3.5
golang.org/x/text v0.3.0
)
diff --git a/vendor/github.com/blevesearch/bleve/index.go b/vendor/github.com/blevesearch/bleve/index.go
index ef6ede9343..974358b81e 100644
--- a/vendor/github.com/blevesearch/bleve/index.go
+++ b/vendor/github.com/blevesearch/bleve/index.go
@@ -293,3 +293,17 @@ func Open(path string) (Index, error) {
func OpenUsing(path string, runtimeConfig map[string]interface{}) (Index, error) {
return openIndexUsing(path, runtimeConfig)
}
+
+// Builder is a limited interface, used to build indexes in an offline mode.
+// Items cannot be updated or deleted, and the caller MUST ensure a document is
+// indexed only once.
+type Builder interface {
+ Index(id string, data interface{}) error
+ Close() error
+}
+
+// NewBuilder creates a builder, which will build an index at the specified path,
+// using the specified mapping and options.
+func NewBuilder(path string, mapping mapping.IndexMapping, config map[string]interface{}) (Builder, error) {
+ return newBuilder(path, mapping, config)
+}
diff --git a/vendor/github.com/blevesearch/bleve/index/index.go b/vendor/github.com/blevesearch/bleve/index/index.go
index 3e866f3aab..551f8de842 100644
--- a/vendor/github.com/blevesearch/bleve/index/index.go
+++ b/vendor/github.com/blevesearch/bleve/index/index.go
@@ -367,3 +367,10 @@ type OptimizableContext interface {
type DocValueReader interface {
VisitDocValues(id IndexInternalID, visitor DocumentFieldTermVisitor) error
}
+
+// IndexBuilder is an interface supported by some index schemes
+// to allow direct write-only index building
+type IndexBuilder interface {
+ Index(doc *document.Document) error
+ Close() error
+}
diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/builder.go b/vendor/github.com/blevesearch/bleve/index/scorch/builder.go
new file mode 100644
index 0000000000..1f4b41d639
--- /dev/null
+++ b/vendor/github.com/blevesearch/bleve/index/scorch/builder.go
@@ -0,0 +1,334 @@
+// Copyright (c) 2019 Couchbase, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package scorch
+
+import (
+ "fmt"
+ "io/ioutil"
+ "os"
+ "sync"
+
+ "github.com/RoaringBitmap/roaring"
+ "github.com/blevesearch/bleve/document"
+ "github.com/blevesearch/bleve/index"
+ "github.com/blevesearch/bleve/index/scorch/segment"
+ bolt "go.etcd.io/bbolt"
+)
+
+const DefaultBuilderBatchSize = 1000
+const DefaultBuilderMergeMax = 10
+
+type Builder struct {
+ m sync.Mutex
+ segCount uint64
+ path string
+ buildPath string
+ segPaths []string
+ batchSize int
+ mergeMax int
+ batch *index.Batch
+ internal map[string][]byte
+ segPlugin segment.Plugin
+}
+
+func NewBuilder(config map[string]interface{}) (*Builder, error) {
+ path, ok := config["path"].(string)
+ if !ok {
+ return nil, fmt.Errorf("must specify path")
+ }
+
+ buildPathPrefix, _ := config["buildPathPrefix"].(string)
+ buildPath, err := ioutil.TempDir(buildPathPrefix, "scorch-offline-build")
+ if err != nil {
+ return nil, err
+ }
+
+ rv := &Builder{
+ path: path,
+ buildPath: buildPath,
+ mergeMax: DefaultBuilderMergeMax,
+ batchSize: DefaultBuilderBatchSize,
+ batch: index.NewBatch(),
+ segPlugin: defaultSegmentPlugin,
+ }
+
+ err = rv.parseConfig(config)
+ if err != nil {
+ return nil, fmt.Errorf("error parsing builder config: %v", err)
+ }
+
+ return rv, nil
+}
+
+func (o *Builder) parseConfig(config map[string]interface{}) (err error) {
+ if v, ok := config["mergeMax"]; ok {
+ var t int
+ if t, err = parseToInteger(v); err != nil {
+ return fmt.Errorf("mergeMax parse err: %v", err)
+ }
+ if t > 0 {
+ o.mergeMax = t
+ }
+ }
+
+ if v, ok := config["batchSize"]; ok {
+ var t int
+ if t, err = parseToInteger(v); err != nil {
+ return fmt.Errorf("batchSize parse err: %v", err)
+ }
+ if t > 0 {
+ o.batchSize = t
+ }
+ }
+
+ if v, ok := config["internal"]; ok {
+ if vinternal, ok := v.(map[string][]byte); ok {
+ o.internal = vinternal
+ }
+ }
+
+ forcedSegmentType, forcedSegmentVersion, err := configForceSegmentTypeVersion(config)
+ if err != nil {
+ return err
+ }
+ if forcedSegmentType != "" && forcedSegmentVersion != 0 {
+ segPlugin, err := chooseSegmentPlugin(forcedSegmentType,
+ uint32(forcedSegmentVersion))
+ if err != nil {
+ return err
+ }
+ o.segPlugin = segPlugin
+ }
+
+ return nil
+}
+
+// Index will place the document into the index.
+// It is invalid to index the same document multiple times.
+func (o *Builder) Index(doc *document.Document) error {
+ o.m.Lock()
+ defer o.m.Unlock()
+
+ o.batch.Update(doc)
+
+ return o.maybeFlushBatchLOCKED(o.batchSize)
+}
+
+func (o *Builder) maybeFlushBatchLOCKED(moreThan int) error {
+ if len(o.batch.IndexOps) >= moreThan {
+ defer o.batch.Reset()
+ return o.executeBatchLOCKED(o.batch)
+ }
+ return nil
+}
+
+func (o *Builder) executeBatchLOCKED(batch *index.Batch) (err error) {
+ analysisResults := make([]*index.AnalysisResult, 0, len(batch.IndexOps))
+ for _, doc := range batch.IndexOps {
+ if doc != nil {
+ // insert _id field
+ doc.AddField(document.NewTextFieldCustom("_id", nil, []byte(doc.ID), document.IndexField|document.StoreField, nil))
+ // perform analysis directly
+ analysisResult := analyze(doc)
+ analysisResults = append(analysisResults, analysisResult)
+ }
+ }
+
+ seg, _, err := o.segPlugin.New(analysisResults)
+ if err != nil {
+ return fmt.Errorf("error building segment base: %v", err)
+ }
+
+ filename := zapFileName(o.segCount)
+ o.segCount++
+ path := o.buildPath + string(os.PathSeparator) + filename
+
+ if segUnpersisted, ok := seg.(segment.UnpersistedSegment); ok {
+ err = segUnpersisted.Persist(path)
+ if err != nil {
+ return fmt.Errorf("error persisting segment base to %s: %v", path, err)
+ }
+
+ o.segPaths = append(o.segPaths, path)
+ return nil
+ }
+
+ return fmt.Errorf("new segment does not implement unpersisted: %T", seg)
+}
+
+func (o *Builder) doMerge() error {
+ // as long as we have more than 1 segment, keep merging
+ for len(o.segPaths) > 1 {
+
+ // merge the next <mergeMax> number of segments into one new one
+ // or, if there are fewer than <mergeMax> remaining, merge them all
+ mergeCount := o.mergeMax
+ if mergeCount > len(o.segPaths) {
+ mergeCount = len(o.segPaths)
+ }
+
+ mergePaths := o.segPaths[0:mergeCount]
+ o.segPaths = o.segPaths[mergeCount:]
+
+ // open each of the segments to be merged
+ mergeSegs := make([]segment.Segment, 0, mergeCount)
+
+ // closeOpenedSegs attempts to close all opened
+ // segments even if an error occurs, in which case
+ // the first error is returned
+ closeOpenedSegs := func() error {
+ var err error
+ for _, seg := range mergeSegs {
+ clErr := seg.Close()
+ if clErr != nil && err == nil {
+ err = clErr
+ }
+ }
+ return err
+ }
+
+ for _, mergePath := range mergePaths {
+ seg, err := o.segPlugin.Open(mergePath)
+ if err != nil {
+ _ = closeOpenedSegs()
+ return fmt.Errorf("error opening segment (%s) for merge: %v", mergePath, err)
+ }
+ mergeSegs = append(mergeSegs, seg)
+ }
+
+ // do the merge
+ mergedSegPath := o.buildPath + string(os.PathSeparator) + zapFileName(o.segCount)
+ drops := make([]*roaring.Bitmap, mergeCount)
+ _, _, err := o.segPlugin.Merge(mergeSegs, drops, mergedSegPath, nil, nil)
+ if err != nil {
+ _ = closeOpenedSegs()
+ return fmt.Errorf("error merging segments (%v): %v", mergePaths, err)
+ }
+ o.segCount++
+ o.segPaths = append(o.segPaths, mergedSegPath)
+
+ // close segments opened for merge
+ err = closeOpenedSegs()
+ if err != nil {
+ return fmt.Errorf("error closing opened segments: %v", err)
+ }
+
+ // remove merged segments
+ for _, mergePath := range mergePaths {
+ err = os.RemoveAll(mergePath)
+ if err != nil {
+ return fmt.Errorf("error removing segment %s after merge: %v", mergePath, err)
+ }
+ }
+ }
+
+ return nil
+}
+
+func (o *Builder) Close() error {
+ o.m.Lock()
+ defer o.m.Unlock()
+
+ // see if there is a partial batch
+ err := o.maybeFlushBatchLOCKED(1)
+ if err != nil {
+ return fmt.Errorf("error flushing batch before close: %v", err)
+ }
+
+ // perform all the merging
+ err = o.doMerge()
+ if err != nil {
+ return fmt.Errorf("error while merging: %v", err)
+ }
+
+ // ensure the store path exists
+ err = os.MkdirAll(o.path, 0700)
+ if err != nil {
+ return err
+ }
+
+ // move final segment into place
+ // segment id 2 is chosen to match the behavior of a scorch
+ // index which indexes a single batch of data
+ finalSegPath := o.path + string(os.PathSeparator) + zapFileName(2)
+ err = os.Rename(o.segPaths[0], finalSegPath)
+ if err != nil {
+ return fmt.Errorf("error moving final segment into place: %v", err)
+ }
+
+ // remove the buildPath, as it is no longer needed
+ err = os.RemoveAll(o.buildPath)
+ if err != nil {
+ return fmt.Errorf("error removing build path: %v", err)
+ }
+
+ // prepare wrapping
+ seg, err := o.segPlugin.Open(finalSegPath)
+ if err != nil {
+ return fmt.Errorf("error opening final segment")
+ }
+
+ // create a segment snapshot for this segment
+ ss := &SegmentSnapshot{
+ segment: seg,
+ }
+ is := &IndexSnapshot{
+ epoch: 3, // chosen to match scorch behavior when indexing a single batch
+ segment: []*SegmentSnapshot{ss},
+ creator: "scorch-builder",
+ internal: o.internal,
+ }
+
+ // create the root bolt
+ rootBoltPath := o.path + string(os.PathSeparator) + "root.bolt"
+ rootBolt, err := bolt.Open(rootBoltPath, 0600, nil)
+ if err != nil {
+ return err
+ }
+
+ // start a write transaction
+ tx, err := rootBolt.Begin(true)
+ if err != nil {
+ return err
+ }
+
+ // fill the root bolt with this fake index snapshot
+ _, _, err = prepareBoltSnapshot(is, tx, o.path, o.segPlugin)
+ if err != nil {
+ _ = tx.Rollback()
+ _ = rootBolt.Close()
+ return fmt.Errorf("error preparing bolt snapshot in root.bolt: %v", err)
+ }
+
+ // commit bolt data
+ err = tx.Commit()
+ if err != nil {
+ _ = rootBolt.Close()
+ return fmt.Errorf("error committing bolt tx in root.bolt: %v", err)
+ }
+
+ // close bolt
+ err = rootBolt.Close()
+ if err != nil {
+ return fmt.Errorf("error closing root.bolt: %v", err)
+ }
+
+ // close final segment
+ err = seg.Close()
+ if err != nil {
+ return fmt.Errorf("error closing final segment: %v", err)
+ }
+ return nil
+}
diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/event.go b/vendor/github.com/blevesearch/bleve/index/scorch/event.go
index dd79d6d066..8f3fc1914d 100644
--- a/vendor/github.com/blevesearch/bleve/index/scorch/event.go
+++ b/vendor/github.com/blevesearch/bleve/index/scorch/event.go
@@ -54,3 +54,11 @@ var EventKindBatchIntroductionStart = EventKind(5)
// EventKindBatchIntroduction is fired when Batch() completes.
var EventKindBatchIntroduction = EventKind(6)
+
+// EventKindMergeTaskIntroductionStart is fired when the merger is about to
+// start the introduction of merged segment from a single merge task.
+var EventKindMergeTaskIntroductionStart = EventKind(7)
+
+// EventKindMergeTaskIntroduction is fired when the merger has completed
+// the introduction of merged segment from a single merge task.
+var EventKindMergeTaskIntroduction = EventKind(8)
diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/introducer.go b/vendor/github.com/blevesearch/bleve/index/scorch/introducer.go
index e5f00f80e1..7770c41c52 100644
--- a/vendor/github.com/blevesearch/bleve/index/scorch/introducer.go
+++ b/vendor/github.com/blevesearch/bleve/index/scorch/introducer.go
@@ -45,13 +45,7 @@ type epochWatcher struct {
notifyCh notificationChan
}
-type snapshotReversion struct {
- snapshot *IndexSnapshot
- applied chan error
- persisted chan error
-}
-
-func (s *Scorch) mainLoop() {
+func (s *Scorch) introducerLoop() {
var epochWatchers []*epochWatcher
OUTER:
for {
@@ -389,6 +383,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
}
}
}
+ var skipped bool
// In case where all the docs in the newly merged segment getting
// deleted by the time we reach here, can skip the introduction.
if nextMerge.new != nil &&
@@ -411,6 +406,9 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
docsToPersistCount += nextMerge.new.Count() - newSegmentDeleted.GetCardinality()
memSegments++
}
+ } else {
+ skipped = true
+ atomic.AddUint64(&s.stats.TotFileMergeIntroductionsObsoleted, 1)
}
atomic.StoreUint64(&s.stats.TotItemsToPersist, docsToPersistCount)
@@ -435,8 +433,10 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
}
// notify requester that we incorporated this
- nextMerge.notify <- newSnapshot
- close(nextMerge.notify)
+ nextMerge.notifyCh <- &mergeTaskIntroStatus{
+ indexSnapshot: newSnapshot,
+ skipped: skipped}
+ close(nextMerge.notifyCh)
}
func isMemorySegment(s *SegmentSnapshot) bool {
diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/merge.go b/vendor/github.com/blevesearch/bleve/index/scorch/merge.go
index 37dca529a6..56c0953f46 100644
--- a/vendor/github.com/blevesearch/bleve/index/scorch/merge.go
+++ b/vendor/github.com/blevesearch/bleve/index/scorch/merge.go
@@ -15,6 +15,7 @@
package scorch
import (
+ "context"
"encoding/json"
"fmt"
"os"
@@ -29,12 +30,16 @@ import (
func (s *Scorch) mergerLoop() {
var lastEpochMergePlanned uint64
+ var ctrlMsg *mergerCtrl
mergePlannerOptions, err := s.parseMergePlannerOptions()
if err != nil {
s.fireAsyncError(fmt.Errorf("mergePlannerOption json parsing err: %v", err))
s.asyncTasks.Done()
return
}
+ ctrlMsgDflt := &mergerCtrl{ctx: context.Background(),
+ options: mergePlannerOptions,
+ doneCh: nil}
OUTER:
for {
@@ -53,16 +58,30 @@ OUTER:
atomic.StoreUint64(&s.iStats.mergeEpoch, ourSnapshot.epoch)
s.rootLock.Unlock()
- if ourSnapshot.epoch != lastEpochMergePlanned {
+ if ctrlMsg == nil && ourSnapshot.epoch != lastEpochMergePlanned {
+ ctrlMsg = ctrlMsgDflt
+ }
+ if ctrlMsg != nil {
startTime := time.Now()
// lets get started
- err := s.planMergeAtSnapshot(ourSnapshot, mergePlannerOptions)
+ err := s.planMergeAtSnapshot(ctrlMsg.ctx, ctrlMsg.options,
+ ourSnapshot)
if err != nil {
atomic.StoreUint64(&s.iStats.mergeEpoch, 0)
if err == segment.ErrClosed {
// index has been closed
_ = ourSnapshot.DecRef()
+
+ // continue the workloop on a user triggered cancel
+ if ctrlMsg.doneCh != nil {
+ close(ctrlMsg.doneCh)
+ ctrlMsg = nil
+ continue OUTER
+ }
+
+ // exit the workloop on index closure
+ ctrlMsg = nil
break OUTER
}
s.fireAsyncError(fmt.Errorf("merging err: %v", err))
@@ -70,6 +89,12 @@ OUTER:
atomic.AddUint64(&s.stats.TotFileMergeLoopErr, 1)
continue OUTER
}
+
+ if ctrlMsg.doneCh != nil {
+ close(ctrlMsg.doneCh)
+ }
+ ctrlMsg = nil
+
lastEpochMergePlanned = ourSnapshot.epoch
atomic.StoreUint64(&s.stats.LastMergedEpoch, ourSnapshot.epoch)
@@ -90,6 +115,8 @@ OUTER:
case <-s.closeCh:
break OUTER
case s.persisterNotifier <- ew:
+ case ctrlMsg = <-s.forceMergeRequestCh:
+ continue OUTER
}
// now wait for persister (but also detect close)
@@ -97,6 +124,7 @@ OUTER:
case <-s.closeCh:
break OUTER
case <-ew.notifyCh:
+ case ctrlMsg = <-s.forceMergeRequestCh:
}
}
@@ -106,6 +134,58 @@ OUTER:
s.asyncTasks.Done()
}
+type mergerCtrl struct {
+ ctx context.Context
+ options *mergeplan.MergePlanOptions
+ doneCh chan struct{}
+}
+
+// ForceMerge helps users trigger a merge operation on
+// an online scorch index.
+func (s *Scorch) ForceMerge(ctx context.Context,
+ mo *mergeplan.MergePlanOptions) error {
+ // check whether force merge is already under processing
+ s.rootLock.Lock()
+ if s.stats.TotFileMergeForceOpsStarted >
+ s.stats.TotFileMergeForceOpsCompleted {
+ s.rootLock.Unlock()
+ return fmt.Errorf("force merge already in progress")
+ }
+
+ s.stats.TotFileMergeForceOpsStarted++
+ s.rootLock.Unlock()
+
+ if mo != nil {
+ err := mergeplan.ValidateMergePlannerOptions(mo)
+ if err != nil {
+ return err
+ }
+ } else {
+ // assume the default single segment merge policy
+ mo = &mergeplan.SingleSegmentMergePlanOptions
+ }
+ msg := &mergerCtrl{options: mo,
+ doneCh: make(chan struct{}),
+ ctx: ctx,
+ }
+
+ // request the merger perform a force merge
+ select {
+ case s.forceMergeRequestCh <- msg:
+ case <-s.closeCh:
+ return nil
+ }
+
+ // wait for the force merge operation completion
+ select {
+ case <-msg.doneCh:
+ atomic.AddUint64(&s.stats.TotFileMergeForceOpsCompleted, 1)
+ case <-s.closeCh:
+ }
+
+ return nil
+}
+
func (s *Scorch) parseMergePlannerOptions() (*mergeplan.MergePlanOptions,
error) {
mergePlannerOptions := mergeplan.DefaultMergePlanOptions
@@ -128,8 +208,39 @@ func (s *Scorch) parseMergePlannerOptions() (*mergeplan.MergePlanOptions,
return &mergePlannerOptions, nil
}
-func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
- options *mergeplan.MergePlanOptions) error {
+type closeChWrapper struct {
+ ch1 chan struct{}
+ ctx context.Context
+ closeCh chan struct{}
+}
+
+func newCloseChWrapper(ch1 chan struct{},
+ ctx context.Context) *closeChWrapper {
+ return &closeChWrapper{ch1: ch1,
+ ctx: ctx,
+ closeCh: make(chan struct{})}
+}
+
+func (w *closeChWrapper) close() {
+ select {
+ case <-w.closeCh:
+ default:
+ close(w.closeCh)
+ }
+}
+
+func (w *closeChWrapper) listen() {
+ select {
+ case <-w.ch1:
+ w.close()
+ case <-w.ctx.Done():
+ w.close()
+ case <-w.closeCh:
+ }
+}
+
+func (s *Scorch) planMergeAtSnapshot(ctx context.Context,
+ options *mergeplan.MergePlanOptions, ourSnapshot *IndexSnapshot) error {
// build list of persisted segments in this snapshot
var onlyPersistedSnapshots []mergeplan.Segment
for _, segmentSnapshot := range ourSnapshot.segment {
@@ -158,6 +269,11 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
// process tasks in serial for now
var filenames []string
+ cw := newCloseChWrapper(s.closeCh, ctx)
+ defer cw.close()
+
+ go cw.listen()
+
for _, task := range resultMergePlan.Tasks {
if len(task.Segments) == 0 {
atomic.AddUint64(&s.stats.TotFileMergePlanTasksSegmentsEmpty, 1)
@@ -194,8 +310,9 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
var oldNewDocNums map[uint64][]uint64
var seg segment.Segment
+ var filename string
if len(segmentsToMerge) > 0 {
- filename := zapFileName(newSegmentID)
+ filename = zapFileName(newSegmentID)
s.markIneligibleForRemoval(filename)
path := s.path + string(os.PathSeparator) + filename
@@ -203,7 +320,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
atomic.AddUint64(&s.stats.TotFileMergeZapBeg, 1)
newDocNums, _, err := s.segPlugin.Merge(segmentsToMerge, docsToDrop, path,
- s.closeCh, s)
+ cw.closeCh, s)
atomic.AddUint64(&s.stats.TotFileMergeZapEnd, 1)
fileMergeZapTime := uint64(time.Since(fileMergeZapStartTime))
@@ -240,9 +357,11 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
old: oldMap,
oldNewDocNums: oldNewDocNums,
new: seg,
- notify: make(chan *IndexSnapshot),
+ notifyCh: make(chan *mergeTaskIntroStatus),
}
+ s.fireEvent(EventKindMergeTaskIntroductionStart, 0)
+
// give it to the introducer
select {
case <-s.closeCh:
@@ -255,18 +374,25 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
introStartTime := time.Now()
// it is safe to blockingly wait for the merge introduction
// here as the introducer is bound to handle the notify channel.
- newSnapshot := <-sm.notify
+ introStatus := <-sm.notifyCh
introTime := uint64(time.Since(introStartTime))
atomic.AddUint64(&s.stats.TotFileMergeZapIntroductionTime, introTime)
if atomic.LoadUint64(&s.stats.MaxFileMergeZapIntroductionTime) < introTime {
atomic.StoreUint64(&s.stats.MaxFileMergeZapIntroductionTime, introTime)
}
atomic.AddUint64(&s.stats.TotFileMergeIntroductionsDone, 1)
- if newSnapshot != nil {
- _ = newSnapshot.DecRef()
+ if introStatus != nil && introStatus.indexSnapshot != nil {
+ _ = introStatus.indexSnapshot.DecRef()
+ if introStatus.skipped {
+ // close the segment on skipping introduction.
+ s.unmarkIneligibleForRemoval(filename)
+ _ = seg.Close()
+ }
}
atomic.AddUint64(&s.stats.TotFileMergePlanTasksDone, 1)
+
+ s.fireEvent(EventKindMergeTaskIntroduction, 0)
}
// once all the newly merged segment introductions are done,
@@ -279,12 +405,17 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
return nil
}
+type mergeTaskIntroStatus struct {
+ indexSnapshot *IndexSnapshot
+ skipped bool
+}
+
type segmentMerge struct {
id uint64
old map[uint64]*SegmentSnapshot
oldNewDocNums map[uint64][]uint64
new segment.Segment
- notify chan *IndexSnapshot
+ notifyCh chan *mergeTaskIntroStatus
}
// perform a merging of the given SegmentBase instances into a new,
@@ -334,7 +465,7 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
old: make(map[uint64]*SegmentSnapshot),
oldNewDocNums: make(map[uint64][]uint64),
new: seg,
- notify: make(chan *IndexSnapshot),
+ notifyCh: make(chan *mergeTaskIntroStatus),
}
for i, idx := range sbsIndexes {
@@ -351,11 +482,20 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
}
// blockingly wait for the introduction to complete
- newSnapshot := <-sm.notify
- if newSnapshot != nil {
+ var newSnapshot *IndexSnapshot
+ introStatus := <-sm.notifyCh
+ if introStatus != nil && introStatus.indexSnapshot != nil {
+ newSnapshot = introStatus.indexSnapshot
atomic.AddUint64(&s.stats.TotMemMergeSegments, uint64(len(sbs)))
atomic.AddUint64(&s.stats.TotMemMergeDone, 1)
+ if introStatus.skipped {
+ // close the segment on skipping introduction.
+ _ = newSnapshot.DecRef()
+ _ = seg.Close()
+ newSnapshot = nil
+ }
}
+
return newSnapshot, newSegmentID, nil
}
diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/mergeplan/merge_plan.go b/vendor/github.com/blevesearch/bleve/index/scorch/mergeplan/merge_plan.go
index c2a0d3c644..7523506626 100644
--- a/vendor/github.com/blevesearch/bleve/index/scorch/mergeplan/merge_plan.go
+++ b/vendor/github.com/blevesearch/bleve/index/scorch/mergeplan/merge_plan.go
@@ -134,6 +134,17 @@ var DefaultMergePlanOptions = MergePlanOptions{
ReclaimDeletesWeight: 2.0,
}
+// SingleSegmentMergePlanOptions helps in creating a
+// single segment index.
+var SingleSegmentMergePlanOptions = MergePlanOptions{
+ MaxSegmentsPerTier: 1,
+ MaxSegmentSize: 1 << 30,
+ TierGrowth: 1.0,
+ SegmentsPerMergeTask: 10,
+ FloorSegmentSize: 1 << 30,
+ ReclaimDeletesWeight: 2.0,
+}
+
// -------------------------------------------
func plan(segmentsIn []Segment, o *MergePlanOptions) (*MergePlan, error) {
@@ -173,7 +184,7 @@ func plan(segmentsIn []Segment, o *MergePlanOptions) (*MergePlan, error) {
calcBudget = CalcBudget
}
- budgetNumSegments := CalcBudget(eligiblesLiveSize, minLiveSize, o)
+ budgetNumSegments := calcBudget(eligiblesLiveSize, minLiveSize, o)
scoreSegments := o.ScoreSegments
if scoreSegments == nil {
diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/optimize.go b/vendor/github.com/blevesearch/bleve/index/scorch/optimize.go
index b9cb9228af..658354cd71 100644
--- a/vendor/github.com/blevesearch/bleve/index/scorch/optimize.go
+++ b/vendor/github.com/blevesearch/bleve/index/scorch/optimize.go
@@ -16,10 +16,10 @@ package scorch
import (
"fmt"
-
"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/scorch/segment"
+ "sync/atomic"
)
var OptimizeConjunction = true
@@ -40,7 +40,7 @@ func (s *IndexSnapshotTermFieldReader) Optimize(kind string,
return s.optimizeDisjunctionUnadorned(octx)
}
- return octx, nil
+ return nil, nil
}
var OptimizeDisjunctionUnadornedMinChildCardinality = uint64(256)
@@ -161,16 +161,8 @@ func (o *OptimizeTFRConjunctionUnadorned) Finish() (rv index.Optimized, err erro
// We use an artificial term and field because the optimized
// termFieldReader can represent multiple terms and fields.
- oTFR := &IndexSnapshotTermFieldReader{
- term: OptimizeTFRConjunctionUnadornedTerm,
- field: OptimizeTFRConjunctionUnadornedField,
- snapshot: o.snapshot,
- iterators: make([]segment.PostingsIterator, len(o.snapshot.segment)),
- segmentOffset: 0,
- includeFreq: false,
- includeNorm: false,
- includeTermVectors: false,
- }
+ oTFR := o.snapshot.unadornedTermFieldReader(
+ OptimizeTFRConjunctionUnadornedTerm, OptimizeTFRConjunctionUnadornedField)
var actualBMs []*roaring.Bitmap // Collected from regular posting lists.
@@ -265,6 +257,7 @@ OUTER:
oTFR.iterators[i] = segment.NewUnadornedPostingsIteratorFromBitmap(bm)
}
+ atomic.AddUint64(&o.snapshot.parent.stats.TotTermSearchersStarted, uint64(1))
return oTFR, nil
}
@@ -277,7 +270,9 @@ OUTER:
func (s *IndexSnapshotTermFieldReader) optimizeDisjunctionUnadorned(
octx index.OptimizableContext) (index.OptimizableContext, error) {
if octx == nil {
- octx = &OptimizeTFRDisjunctionUnadorned{snapshot: s.snapshot}
+ octx = &OptimizeTFRDisjunctionUnadorned{
+ snapshot: s.snapshot,
+ }
}
o, ok := octx.(*OptimizeTFRDisjunctionUnadorned)
@@ -328,27 +323,12 @@ func (o *OptimizeTFRDisjunctionUnadorned) Finish() (rv index.Optimized, err erro
}
}
}
-
- // Heuristic to skip the optimization if all the constituent
- // bitmaps are too small, where the processing & resource
- // overhead to create the OR'ed bitmap outweighs the benefit.
- if cMax < OptimizeDisjunctionUnadornedMinChildCardinality {
- return nil, nil
- }
}
// We use an artificial term and field because the optimized
// termFieldReader can represent multiple terms and fields.
- oTFR := &IndexSnapshotTermFieldReader{
- term: OptimizeTFRDisjunctionUnadornedTerm,
- field: OptimizeTFRDisjunctionUnadornedField,
- snapshot: o.snapshot,
- iterators: make([]segment.PostingsIterator, len(o.snapshot.segment)),
- segmentOffset: 0,
- includeFreq: false,
- includeNorm: false,
- includeTermVectors: false,
- }
+ oTFR := o.snapshot.unadornedTermFieldReader(
+ OptimizeTFRDisjunctionUnadornedTerm, OptimizeTFRDisjunctionUnadornedField)
var docNums []uint32 // Collected docNum's from 1-hit posting lists.
var actualBMs []*roaring.Bitmap // Collected from regular posting lists.
@@ -392,5 +372,25 @@ func (o *OptimizeTFRDisjunctionUnadorned) Finish() (rv index.Optimized, err erro
oTFR.iterators[i] = segment.NewUnadornedPostingsIteratorFromBitmap(bm)
}
+ atomic.AddUint64(&o.snapshot.parent.stats.TotTermSearchersStarted, uint64(1))
return oTFR, nil
}
+
+// ----------------------------------------------------------------
+
+func (i *IndexSnapshot) unadornedTermFieldReader(
+ term []byte, field string) *IndexSnapshotTermFieldReader {
+ // This IndexSnapshotTermFieldReader will not be recycled, more
+ // conversation here: https://github.com/blevesearch/bleve/pull/1438
+ return &IndexSnapshotTermFieldReader{
+ term: term,
+ field: field,
+ snapshot: i,
+ iterators: make([]segment.PostingsIterator, len(i.segment)),
+ segmentOffset: 0,
+ includeFreq: false,
+ includeNorm: false,
+ includeTermVectors: false,
+ recycle: false,
+ }
+}
diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/persister.go b/vendor/github.com/blevesearch/bleve/index/scorch/persister.go
index 30e75df77f..498378a4f8 100644
--- a/vendor/github.com/blevesearch/bleve/index/scorch/persister.go
+++ b/vendor/github.com/blevesearch/bleve/index/scorch/persister.go
@@ -256,7 +256,7 @@ func (s *Scorch) pausePersisterForMergerCatchUp(lastPersistedEpoch uint64,
// for sufficient in-memory segments to pile up for the next
// memory merge cum persist loop.
if numFilesOnDisk < uint64(po.PersisterNapUnderNumFiles) &&
- po.PersisterNapTimeMSec > 0 && s.paused() == 0 {
+ po.PersisterNapTimeMSec > 0 && s.NumEventsBlocking() == 0 {
select {
case <-s.closeCh:
case <-time.After(time.Millisecond * time.Duration(po.PersisterNapTimeMSec)):
@@ -333,7 +333,7 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot,
// Perform in-memory segment merging only when the memory pressure is
// below the configured threshold, else the persister performs the
// direct persistence of segments.
- if s.paused() < po.MemoryPressurePauseThreshold {
+ if s.NumEventsBlocking() < po.MemoryPressurePauseThreshold {
persisted, err := s.persistSnapshotMaybeMerge(snapshot)
if err != nil {
return err
@@ -428,55 +428,44 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) (
return true, nil
}
-func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot) (err error) {
- // start a write transaction
- tx, err := s.rootBolt.Begin(true)
- if err != nil {
- return err
- }
- // defer rollback on error
- defer func() {
- if err != nil {
- _ = tx.Rollback()
- }
- }()
-
+func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
+ segPlugin segment.Plugin) ([]string, map[uint64]string, error) {
snapshotsBucket, err := tx.CreateBucketIfNotExists(boltSnapshotsBucket)
if err != nil {
- return err
+ return nil, nil, err
}
newSnapshotKey := segment.EncodeUvarintAscending(nil, snapshot.epoch)
snapshotBucket, err := snapshotsBucket.CreateBucketIfNotExists(newSnapshotKey)
if err != nil {
- return err
+ return nil, nil, err
}
// persist meta values
metaBucket, err := snapshotBucket.CreateBucketIfNotExists(boltMetaDataKey)
if err != nil {
- return err
+ return nil, nil, err
}
- err = metaBucket.Put(boltMetaDataSegmentTypeKey, []byte(s.segPlugin.Type()))
+ err = metaBucket.Put(boltMetaDataSegmentTypeKey, []byte(segPlugin.Type()))
if err != nil {
- return err
+ return nil, nil, err
}
buf := make([]byte, binary.MaxVarintLen32)
- binary.BigEndian.PutUint32(buf, s.segPlugin.Version())
+ binary.BigEndian.PutUint32(buf, segPlugin.Version())
err = metaBucket.Put(boltMetaDataSegmentVersionKey, buf)
if err != nil {
- return err
+ return nil, nil, err
}
// persist internal values
internalBucket, err := snapshotBucket.CreateBucketIfNotExists(boltInternalKey)
if err != nil {
- return err
+ return nil, nil, err
}
// TODO optimize writing these in order?
for k, v := range snapshot.internal {
err = internalBucket.Put([]byte(k), v)
if err != nil {
- return err
+ return nil, nil, err
}
}
@@ -488,49 +477,69 @@ func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot) (err error) {
snapshotSegmentKey := segment.EncodeUvarintAscending(nil, segmentSnapshot.id)
snapshotSegmentBucket, err := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey)
if err != nil {
- return err
+ return nil, nil, err
}
switch seg := segmentSnapshot.segment.(type) {
case segment.PersistedSegment:
- path := seg.Path()
- filename := strings.TrimPrefix(path, s.path+string(os.PathSeparator))
+ segPath := seg.Path()
+ filename := strings.TrimPrefix(segPath, path+string(os.PathSeparator))
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
if err != nil {
- return err
+ return nil, nil, err
}
filenames = append(filenames, filename)
case segment.UnpersistedSegment:
// need to persist this to disk
filename := zapFileName(segmentSnapshot.id)
- path := s.path + string(os.PathSeparator) + filename
+ path := path + string(os.PathSeparator) + filename
err = seg.Persist(path)
if err != nil {
- return fmt.Errorf("error persisting segment: %v", err)
+ return nil, nil, fmt.Errorf("error persisting segment: %v", err)
}
newSegmentPaths[segmentSnapshot.id] = path
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
if err != nil {
- return err
+ return nil, nil, err
}
filenames = append(filenames, filename)
-
default:
- return fmt.Errorf("unknown segment type: %T", seg)
+ return nil, nil, fmt.Errorf("unknown segment type: %T", seg)
}
// store current deleted bits
var roaringBuf bytes.Buffer
if segmentSnapshot.deleted != nil {
_, err = segmentSnapshot.deleted.WriteTo(&roaringBuf)
if err != nil {
- return fmt.Errorf("error persisting roaring bytes: %v", err)
+ return nil, nil, fmt.Errorf("error persisting roaring bytes: %v", err)
}
err = snapshotSegmentBucket.Put(boltDeletedKey, roaringBuf.Bytes())
if err != nil {
- return err
+ return nil, nil, err
}
}
}
+ return filenames, newSegmentPaths, nil
+}
+
+func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot) (err error) {
+ // start a write transaction
+ tx, err := s.rootBolt.Begin(true)
+ if err != nil {
+ return err
+ }
+ // defer rollback on error
+ defer func() {
+ if err != nil {
+ _ = tx.Rollback()
+ }
+ }()
+
+ filenames, newSegmentPaths, err := prepareBoltSnapshot(snapshot, tx, s.path, s.segPlugin)
+ if err != nil {
+ return err
+ }
+
// we need to swap in a new root only when we've persisted 1 or
// more segments -- whereby the new root would have 1-for-1
// replacements of in-memory segments with file-based segments
@@ -780,12 +789,6 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro
return rv, nil
}
-type uint64Descending []uint64
-
-func (p uint64Descending) Len() int { return len(p) }
-func (p uint64Descending) Less(i, j int) bool { return p[i] > p[j] }
-func (p uint64Descending) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
-
func (s *Scorch) removeOldData() {
removed, err := s.removeOldBoltSnapshots()
if err != nil {
diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_rollback.go b/vendor/github.com/blevesearch/bleve/index/scorch/rollback.go
index 7cc87bdea0..7cc87bdea0 100644
--- a/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_rollback.go
+++ b/vendor/github.com/blevesearch/bleve/index/scorch/rollback.go
diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/scorch.go b/vendor/github.com/blevesearch/bleve/index/scorch/scorch.go
index 80f9e3a797..ba98a460d3 100644
--- a/vendor/github.com/blevesearch/bleve/index/scorch/scorch.go
+++ b/vendor/github.com/blevesearch/bleve/index/scorch/scorch.go
@@ -73,9 +73,7 @@ type Scorch struct {
onEvent func(event Event)
onAsyncError func(err error)
- pauseLock sync.RWMutex
-
- pauseCount uint64
+ forceMergeRequestCh chan *mergerCtrl
segPlugin segment.Plugin
}
@@ -101,18 +99,15 @@ func NewScorch(storeName string,
nextSnapshotEpoch: 1,
closeCh: make(chan struct{}),
ineligibleForRemoval: map[string]bool{},
+ forceMergeRequestCh: make(chan *mergerCtrl, 1),
segPlugin: defaultSegmentPlugin,
}
- // check if the caller has requested a specific segment type/version
- forcedSegmentVersion, ok := config["forceSegmentVersion"].(int)
- if ok {
- forcedSegmentType, ok2 := config["forceSegmentType"].(string)
- if !ok2 {
- return nil, fmt.Errorf(
- "forceSegmentVersion set to %d, must also specify forceSegmentType", forcedSegmentVersion)
- }
-
+ forcedSegmentType, forcedSegmentVersion, err := configForceSegmentTypeVersion(config)
+ if err != nil {
+ return nil, err
+ }
+ if forcedSegmentType != "" && forcedSegmentVersion != 0 {
err := rv.loadSegmentPlugin(forcedSegmentType,
uint32(forcedSegmentVersion))
if err != nil {
@@ -140,30 +135,34 @@ func NewScorch(storeName string,
return rv, nil
}
-func (s *Scorch) paused() uint64 {
- s.pauseLock.Lock()
- pc := s.pauseCount
- s.pauseLock.Unlock()
- return pc
-}
+// configForceSegmentTypeVersion checks if the caller has requested a
+// specific segment type/version
+func configForceSegmentTypeVersion(config map[string]interface{}) (string, uint32, error) {
+ forcedSegmentVersion, err := parseToInteger(config["forceSegmentVersion"])
+ if err != nil {
+ return "", 0, nil
+ }
-func (s *Scorch) incrPause() {
- s.pauseLock.Lock()
- s.pauseCount++
- s.pauseLock.Unlock()
+ forcedSegmentType, ok := config["forceSegmentType"].(string)
+ if !ok {
+ return "", 0, fmt.Errorf(
+ "forceSegmentVersion set to %d, must also specify forceSegmentType", forcedSegmentVersion)
+ }
+
+ return forcedSegmentType, uint32(forcedSegmentVersion), nil
}
-func (s *Scorch) decrPause() {
- s.pauseLock.Lock()
- s.pauseCount--
- s.pauseLock.Unlock()
+func (s *Scorch) NumEventsBlocking() uint64 {
+ eventsCompleted := atomic.LoadUint64(&s.stats.TotEventTriggerCompleted)
+ eventsStarted := atomic.LoadUint64(&s.stats.TotEventTriggerStarted)
+ return eventsStarted - eventsCompleted
}
func (s *Scorch) fireEvent(kind EventKind, dur time.Duration) {
if s.onEvent != nil {
- s.incrPause()
+ atomic.AddUint64(&s.stats.TotEventTriggerStarted, 1)
s.onEvent(Event{Kind: kind, Scorch: s, Duration: dur})
- s.decrPause()
+ atomic.AddUint64(&s.stats.TotEventTriggerCompleted, 1)
}
}
@@ -181,7 +180,7 @@ func (s *Scorch) Open() error {
}
s.asyncTasks.Add(1)
- go s.mainLoop()
+ go s.introducerLoop()
if !s.readOnly && s.path != "" {
s.asyncTasks.Add(1)
@@ -241,6 +240,7 @@ func (s *Scorch) openBolt() error {
s.introducerNotifier = make(chan *epochWatcher, 1)
s.persisterNotifier = make(chan *epochWatcher, 1)
s.closeCh = make(chan struct{})
+ s.forceMergeRequestCh = make(chan *mergerCtrl, 1)
if !s.readOnly && s.path != "" {
err := s.removeOldZapFiles() // Before persister or merger create any new files.
@@ -567,6 +567,10 @@ func (s *Scorch) StatsMap() map[string]interface{} {
}
func (s *Scorch) Analyze(d *document.Document) *index.AnalysisResult {
+ return analyze(d)
+}
+
+func analyze(d *document.Document) *index.AnalysisResult {
rv := &index.AnalysisResult{
Document: d,
Analyzed: make([]analysis.TokenFrequencies, len(d.Fields)+len(d.CompositeFields)),
diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/segment/unadorned.go b/vendor/github.com/blevesearch/bleve/index/scorch/segment/unadorned.go
index 9a4d6c76c9..db06562df1 100644
--- a/vendor/github.com/blevesearch/bleve/index/scorch/segment/unadorned.go
+++ b/vendor/github.com/blevesearch/bleve/index/scorch/segment/unadorned.go
@@ -24,7 +24,6 @@ var reflectStaticSizeUnadornedPostingsIteratorBitmap int
var reflectStaticSizeUnadornedPostingsIterator1Hit int
var reflectStaticSizeUnadornedPosting int
-
func init() {
var pib UnadornedPostingsIteratorBitmap
reflectStaticSizeUnadornedPostingsIteratorBitmap = int(reflect.TypeOf(pib).Size())
@@ -34,7 +33,7 @@ func init() {
reflectStaticSizeUnadornedPosting = int(reflect.TypeOf(up).Size())
}
-type UnadornedPostingsIteratorBitmap struct{
+type UnadornedPostingsIteratorBitmap struct {
actual roaring.IntPeekable
actualBM *roaring.Bitmap
}
@@ -72,16 +71,29 @@ func (i *UnadornedPostingsIteratorBitmap) Size() int {
return reflectStaticSizeUnadornedPostingsIteratorBitmap
}
+func (i *UnadornedPostingsIteratorBitmap) ActualBitmap() *roaring.Bitmap {
+ return i.actualBM
+}
+
+func (i *UnadornedPostingsIteratorBitmap) DocNum1Hit() (uint64, bool) {
+ return 0, false
+}
+
+func (i *UnadornedPostingsIteratorBitmap) ReplaceActual(actual *roaring.Bitmap) {
+ i.actualBM = actual
+ i.actual = actual.Iterator()
+}
+
func NewUnadornedPostingsIteratorFromBitmap(bm *roaring.Bitmap) PostingsIterator {
return &UnadornedPostingsIteratorBitmap{
actualBM: bm,
- actual: bm.Iterator(),
+ actual: bm.Iterator(),
}
}
const docNum1HitFinished = math.MaxUint64
-type UnadornedPostingsIterator1Hit struct{
+type UnadornedPostingsIterator1Hit struct {
docNum uint64
}
@@ -145,4 +157,4 @@ func (p UnadornedPosting) Locations() []Location {
func (p UnadornedPosting) Size() int {
return reflectStaticSizeUnadornedPosting
-} \ No newline at end of file
+}
diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/segment_plugin.go b/vendor/github.com/blevesearch/bleve/index/scorch/segment_plugin.go
index 01eda7fbd5..b830b2c052 100644
--- a/vendor/github.com/blevesearch/bleve/index/scorch/segment_plugin.go
+++ b/vendor/github.com/blevesearch/bleve/index/scorch/segment_plugin.go
@@ -21,6 +21,8 @@ import (
zapv11 "github.com/blevesearch/zap/v11"
zapv12 "github.com/blevesearch/zap/v12"
+ zapv13 "github.com/blevesearch/zap/v13"
+ zapv14 "github.com/blevesearch/zap/v14"
)
var supportedSegmentPlugins map[string]map[uint32]segment.Plugin
@@ -28,6 +30,8 @@ var defaultSegmentPlugin segment.Plugin
func init() {
ResetPlugins()
+ RegisterPlugin(zapv14.Plugin(), false)
+ RegisterPlugin(zapv13.Plugin(), false)
RegisterPlugin(zapv12.Plugin(), false)
RegisterPlugin(zapv11.Plugin(), true)
}
@@ -60,18 +64,28 @@ func SupportedSegmentTypeVersions(typ string) (rv []uint32) {
return rv
}
-func (s *Scorch) loadSegmentPlugin(forcedSegmentType string,
- forcedSegmentVersion uint32) error {
+func chooseSegmentPlugin(forcedSegmentType string,
+ forcedSegmentVersion uint32) (segment.Plugin, error) {
if versions, ok := supportedSegmentPlugins[forcedSegmentType]; ok {
if segPlugin, ok := versions[uint32(forcedSegmentVersion)]; ok {
- s.segPlugin = segPlugin
- return nil
+ return segPlugin, nil
}
- return fmt.Errorf(
+ return nil, fmt.Errorf(
"unsupported version %d for segment type: %s, supported: %v",
forcedSegmentVersion, forcedSegmentType,
SupportedSegmentTypeVersions(forcedSegmentType))
}
- return fmt.Errorf("unsupported segment type: %s, supported: %v",
+ return nil, fmt.Errorf("unsupported segment type: %s, supported: %v",
forcedSegmentType, SupportedSegmentTypes())
}
+
+func (s *Scorch) loadSegmentPlugin(forcedSegmentType string,
+ forcedSegmentVersion uint32) error {
+ segPlugin, err := chooseSegmentPlugin(forcedSegmentType,
+ forcedSegmentVersion)
+ if err != nil {
+ return err
+ }
+ s.segPlugin = segPlugin
+ return nil
+}
diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_index.go b/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_index.go
index 47cc809b21..9d17bcb2c5 100644
--- a/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_index.go
+++ b/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_index.go
@@ -303,9 +303,12 @@ func (i *IndexSnapshot) newDocIDReader(results chan *asynchSegmentResult) (index
var err error
for count := 0; count < len(i.segment); count++ {
asr := <-results
- if asr.err != nil && err != nil {
- err = asr.err
- } else {
+ if asr.err != nil {
+ if err == nil {
+ // returns the first error encountered
+ err = asr.err
+ }
+ } else if err == nil {
rv.iterators[asr.index] = asr.docs.Iterator()
}
}
@@ -511,10 +514,20 @@ func (i *IndexSnapshot) allocTermFieldReaderDicts(field string) (tfr *IndexSnaps
}
}
i.m2.Unlock()
- return &IndexSnapshotTermFieldReader{}
+ return &IndexSnapshotTermFieldReader{
+ recycle: true,
+ }
}
func (i *IndexSnapshot) recycleTermFieldReader(tfr *IndexSnapshotTermFieldReader) {
+ if !tfr.recycle {
+ // Do not recycle an optimized unadorned term field reader (used for
+ // ConjunctionUnadorned or DisjunctionUnadorned), during when a fresh
+ // roaring.Bitmap is built by AND-ing or OR-ing individual bitmaps,
+ // and we'll need to release them for GC. (See MB-40916)
+ return
+ }
+
i.parent.rootLock.RLock()
obsolete := i.parent.root != i
i.parent.rootLock.RUnlock()
diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_index_tfr.go b/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_index_tfr.go
index 5d56f19441..239f68fbe7 100644
--- a/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_index_tfr.go
+++ b/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_index_tfr.go
@@ -45,6 +45,7 @@ type IndexSnapshotTermFieldReader struct {
includeTermVectors bool
currPosting segment.Posting
currID index.IndexInternalID
+ recycle bool
}
func (i *IndexSnapshotTermFieldReader) Size() int {
@@ -133,6 +134,8 @@ func (i *IndexSnapshotTermFieldReader) Advance(ID index.IndexInternalID, preAllo
if err != nil {
return nil, err
}
+ // close the current term field reader before replacing it with a new one
+ _ = i.Close()
*i = *(i2.(*IndexSnapshotTermFieldReader))
}
num, err := docInternalToNumber(ID)
diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/stats.go b/vendor/github.com/blevesearch/bleve/index/scorch/stats.go
index e638362a71..626fff2e47 100644
--- a/vendor/github.com/blevesearch/bleve/index/scorch/stats.go
+++ b/vendor/github.com/blevesearch/bleve/index/scorch/stats.go
@@ -47,6 +47,9 @@ type Stats struct {
TotTermSearchersStarted uint64
TotTermSearchersFinished uint64
+ TotEventTriggerStarted uint64
+ TotEventTriggerCompleted uint64
+
TotIntroduceLoop uint64
TotIntroduceSegmentBeg uint64
TotIntroduceSegmentEnd uint64
@@ -82,6 +85,9 @@ type Stats struct {
TotFileMergeLoopErr uint64
TotFileMergeLoopEnd uint64
+ TotFileMergeForceOpsStarted uint64
+ TotFileMergeForceOpsCompleted uint64
+
TotFileMergePlan uint64
TotFileMergePlanErr uint64
TotFileMergePlanNone uint64
@@ -105,9 +111,10 @@ type Stats struct {
TotFileMergeZapIntroductionTime uint64
MaxFileMergeZapIntroductionTime uint64
- TotFileMergeIntroductions uint64
- TotFileMergeIntroductionsDone uint64
- TotFileMergeIntroductionsSkipped uint64
+ TotFileMergeIntroductions uint64
+ TotFileMergeIntroductionsDone uint64
+ TotFileMergeIntroductionsSkipped uint64
+ TotFileMergeIntroductionsObsoleted uint64
CurFilesIneligibleForRemoval uint64
TotSnapshotsRemovedFromMetaStore uint64
diff --git a/vendor/github.com/blevesearch/bleve/index_alias_impl.go b/vendor/github.com/blevesearch/bleve/index_alias_impl.go
index 4366fc7956..5aa57d8ac8 100644
--- a/vendor/github.com/blevesearch/bleve/index_alias_impl.go
+++ b/vendor/github.com/blevesearch/bleve/index_alias_impl.go
@@ -16,7 +16,6 @@ package bleve
import (
"context"
- "sort"
"sync"
"time"
@@ -44,6 +43,16 @@ func NewIndexAlias(indexes ...Index) *indexAliasImpl {
}
}
+// VisitIndexes invokes the visit callback on every
+// indexes included in the index alias.
+func (i *indexAliasImpl) VisitIndexes(visit func(Index)) {
+ i.mutex.RLock()
+ for _, idx := range i.indexes {
+ visit(idx)
+ }
+ i.mutex.RUnlock()
+}
+
func (i *indexAliasImpl) isAliasToSingleIndex() error {
if len(i.indexes) < 1 {
return ErrorAliasEmpty
@@ -511,10 +520,11 @@ func MultiSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*Se
}
}
+ sortFunc := req.SortFunc()
// sort all hits with the requested order
if len(req.Sort) > 0 {
sorter := newSearchHitSorter(req.Sort, sr.Hits)
- sort.Sort(sorter)
+ sortFunc(sorter)
}
// now skip over the correct From
@@ -539,7 +549,7 @@ func MultiSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*Se
req.Sort.Reverse()
// resort using the original order
mhs := newSearchHitSorter(req.Sort, sr.Hits)
- sort.Sort(mhs)
+ sortFunc(mhs)
// reset request
req.SearchBefore = req.SearchAfter
req.SearchAfter = nil
diff --git a/vendor/github.com/blevesearch/bleve/index_impl.go b/vendor/github.com/blevesearch/bleve/index_impl.go
index 6324d960eb..629cc9b2ff 100644
--- a/vendor/github.com/blevesearch/bleve/index_impl.go
+++ b/vendor/github.com/blevesearch/bleve/index_impl.go
@@ -19,7 +19,6 @@ import (
"encoding/json"
"fmt"
"os"
- "sort"
"sync"
"sync/atomic"
"time"
@@ -579,7 +578,7 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr
req.Sort.Reverse()
// resort using the original order
mhs := newSearchHitSorter(req.Sort, hits)
- sort.Sort(mhs)
+ req.SortFunc()(mhs)
// reset request
req.SearchBefore = req.SearchAfter
req.SearchAfter = nil
diff --git a/vendor/github.com/blevesearch/bleve/mapping/document.go b/vendor/github.com/blevesearch/bleve/mapping/document.go
index 15cb6b5fa1..355a602e55 100644
--- a/vendor/github.com/blevesearch/bleve/mapping/document.go
+++ b/vendor/github.com/blevesearch/bleve/mapping/document.go
@@ -251,7 +251,6 @@ func (dm *DocumentMapping) AddFieldMapping(fm *FieldMapping) {
// UnmarshalJSON offers custom unmarshaling with optional strict validation
func (dm *DocumentMapping) UnmarshalJSON(data []byte) error {
-
var tmp map[string]json.RawMessage
err := json.Unmarshal(data, &tmp)
if err != nil {
@@ -308,8 +307,8 @@ func (dm *DocumentMapping) UnmarshalJSON(data []byte) error {
}
func (dm *DocumentMapping) defaultAnalyzerName(path []string) string {
- rv := ""
current := dm
+ rv := current.DefaultAnalyzer
for _, pathElement := range path {
var ok bool
current, ok = current.Properties[pathElement]
diff --git a/vendor/github.com/blevesearch/bleve/mapping/index.go b/vendor/github.com/blevesearch/bleve/mapping/index.go
index 602764cbbf..21ca5cce36 100644
--- a/vendor/github.com/blevesearch/bleve/mapping/index.go
+++ b/vendor/github.com/blevesearch/bleve/mapping/index.go
@@ -101,26 +101,26 @@ func (im *IndexMappingImpl) AddCustomTokenFilter(name string, config map[string]
// returned analyzer is registered in the IndexMapping.
//
// bleve comes with predefined analyzers, like
-// github.com/blevesearch/bleve/analysis/analyzers/custom_analyzer. They are
+// github.com/blevesearch/bleve/analysis/analyzer/custom. They are
// available only if their package is imported by client code. To achieve this,
// use their metadata to fill configuration entries:
//
// import (
-// "github.com/blevesearch/bleve/analysis/analyzers/custom_analyzer"
-// "github.com/blevesearch/bleve/analysis/char_filters/html_char_filter"
-// "github.com/blevesearch/bleve/analysis/token_filters/lower_case_filter"
-// "github.com/blevesearch/bleve/analysis/tokenizers/unicode"
+// "github.com/blevesearch/bleve/analysis/analyzer/custom"
+// "github.com/blevesearch/bleve/analysis/char/html"
+// "github.com/blevesearch/bleve/analysis/token/lowercase"
+// "github.com/blevesearch/bleve/analysis/tokenizer/unicode"
// )
//
// m := bleve.NewIndexMapping()
// err := m.AddCustomAnalyzer("html", map[string]interface{}{
-// "type": custom_analyzer.Name,
+// "type": custom.Name,
// "char_filters": []string{
-// html_char_filter.Name,
+// html.Name,
// },
// "tokenizer": unicode.Name,
// "token_filters": []string{
-// lower_case_filter.Name,
+// lowercase.Name,
// ...
// },
// })
diff --git a/vendor/github.com/blevesearch/bleve/search.go b/vendor/github.com/blevesearch/bleve/search.go
index b337edc9e4..f67450779a 100644
--- a/vendor/github.com/blevesearch/bleve/search.go
+++ b/vendor/github.com/blevesearch/bleve/search.go
@@ -18,6 +18,7 @@ import (
"encoding/json"
"fmt"
"reflect"
+ "sort"
"time"
"github.com/blevesearch/bleve/analysis"
@@ -264,6 +265,7 @@ func (h *HighlightRequest) AddField(field string) {
// Score controls the kind of scoring performed
// SearchAfter supports deep paging by providing a minimum sort key
// SearchBefore supports deep paging by providing a maximum sort key
+// sortFunc specifies the sort implementation to use for sorting results.
//
// A special field named "*" can be used to return all fields.
type SearchRequest struct {
@@ -279,6 +281,8 @@ type SearchRequest struct {
Score string `json:"score,omitempty"`
SearchAfter []string `json:"search_after"`
SearchBefore []string `json:"search_before"`
+
+ sortFunc func(sort.Interface)
}
func (r *SearchRequest) Validate() error {
@@ -606,3 +610,22 @@ func MemoryNeededForSearchResult(req *SearchRequest) uint64 {
return uint64(estimate)
}
+
+// SetSortFunc sets the sort implementation to use when sorting hits.
+//
+// SearchRequests can specify a custom sort implementation to meet
+// their needs. For instance, by specifying a parallel sort
+// that uses all available cores.
+func (r *SearchRequest) SetSortFunc(s func(sort.Interface)) {
+ r.sortFunc = s
+}
+
+// SortFunc returns the sort implementation to use when sorting hits.
+// Defaults to sort.Sort.
+func (r *SearchRequest) SortFunc() func(data sort.Interface) {
+ if r.sortFunc != nil {
+ return r.sortFunc
+ }
+
+ return sort.Sort
+}
diff --git a/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction.go b/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction.go
index 6a296b68fa..f47da27c4f 100644
--- a/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction.go
+++ b/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction.go
@@ -16,7 +16,6 @@ package searcher
import (
"fmt"
-
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/search"
)
@@ -37,6 +36,11 @@ func NewDisjunctionSearcher(indexReader index.IndexReader,
return newDisjunctionSearcher(indexReader, qsearchers, min, options, true)
}
+func optionsDisjunctionOptimizable(options search.SearcherOptions) bool {
+ rv := options.Score == "none" && !options.IncludeTermVectors
+ return rv
+}
+
func newDisjunctionSearcher(indexReader index.IndexReader,
qsearchers []search.Searcher, min float64, options search.SearcherOptions,
limit bool) (search.Searcher, error) {
@@ -44,7 +48,7 @@ func newDisjunctionSearcher(indexReader index.IndexReader,
// do not need extra information like freq-norm's or term vectors
// and the requested min is simple
if len(qsearchers) > 1 && min <= 1 &&
- options.Score == "none" && !options.IncludeTermVectors {
+ optionsDisjunctionOptimizable(options) {
rv, err := optimizeCompositeSearcher("disjunction:unadorned",
indexReader, qsearchers, options)
if err != nil || rv != nil {
@@ -103,7 +107,7 @@ func tooManyClauses(count int) bool {
return false
}
-func tooManyClausesErr(count int) error {
- return fmt.Errorf("TooManyClauses[%d > maxClauseCount, which is set to %d]",
- count, DisjunctionMaxClauseCount)
+func tooManyClausesErr(field string, count int) error {
+ return fmt.Errorf("TooManyClauses over field: `%s` [%d > maxClauseCount,"+
+ " which is set to %d]", field, count, DisjunctionMaxClauseCount)
}
diff --git a/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction_heap.go b/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction_heap.go
index ec133f1f83..7f0a5a00e3 100644
--- a/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction_heap.go
+++ b/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction_heap.go
@@ -62,7 +62,7 @@ func newDisjunctionHeapSearcher(indexReader index.IndexReader,
limit bool) (
*DisjunctionHeapSearcher, error) {
if limit && tooManyClauses(len(searchers)) {
- return nil, tooManyClausesErr(len(searchers))
+ return nil, tooManyClausesErr("", len(searchers))
}
// build our searcher
@@ -310,7 +310,7 @@ func (s *DisjunctionHeapSearcher) Optimize(kind string, octx index.OptimizableCo
}
}
- return octx, nil
+ return nil, nil
}
// heap impl
diff --git a/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction_slice.go b/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction_slice.go
index e47f39ad09..dc566ade57 100644
--- a/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction_slice.go
+++ b/vendor/github.com/blevesearch/bleve/search/searcher/search_disjunction_slice.go
@@ -50,7 +50,7 @@ func newDisjunctionSliceSearcher(indexReader index.IndexReader,
limit bool) (
*DisjunctionSliceSearcher, error) {
if limit && tooManyClauses(len(qsearchers)) {
- return nil, tooManyClausesErr(len(qsearchers))
+ return nil, tooManyClausesErr("", len(qsearchers))
}
// build the downstream searchers
searchers := make(OrderedSearcherList, len(qsearchers))
@@ -294,5 +294,5 @@ func (s *DisjunctionSliceSearcher) Optimize(kind string, octx index.OptimizableC
}
}
- return octx, nil
+ return nil, nil
}
diff --git a/vendor/github.com/blevesearch/bleve/search/searcher/search_fuzzy.go b/vendor/github.com/blevesearch/bleve/search/searcher/search_fuzzy.go
index 8176e59b51..aca8a7d9fa 100644
--- a/vendor/github.com/blevesearch/bleve/search/searcher/search_fuzzy.go
+++ b/vendor/github.com/blevesearch/bleve/search/searcher/search_fuzzy.go
@@ -75,7 +75,7 @@ func findFuzzyCandidateTerms(indexReader index.IndexReader, term string,
for err == nil && tfd != nil {
rv = append(rv, tfd.Term)
if tooManyClauses(len(rv)) {
- return nil, tooManyClausesErr(len(rv))
+ return nil, tooManyClausesErr(field, len(rv))
}
tfd, err = fieldDict.Next()
}
@@ -107,7 +107,7 @@ func findFuzzyCandidateTerms(indexReader index.IndexReader, term string,
if !exceeded && ld <= fuzziness {
rv = append(rv, tfd.Term)
if tooManyClauses(len(rv)) {
- return nil, tooManyClausesErr(len(rv))
+ return nil, tooManyClausesErr(field, len(rv))
}
}
tfd, err = fieldDict.Next()
diff --git a/vendor/github.com/blevesearch/bleve/search/searcher/search_geoboundingbox.go b/vendor/github.com/blevesearch/bleve/search/searcher/search_geoboundingbox.go
index c4b8af9270..76157f01a1 100644
--- a/vendor/github.com/blevesearch/bleve/search/searcher/search_geoboundingbox.go
+++ b/vendor/github.com/blevesearch/bleve/search/searcher/search_geoboundingbox.go
@@ -24,7 +24,7 @@ import (
type filterFunc func(key []byte) bool
-var GeoBitsShift1 = (geo.GeoBits << 1)
+var GeoBitsShift1 = geo.GeoBits << 1
var GeoBitsShift1Minus1 = GeoBitsShift1 - 1
func NewGeoBoundingBoxSearcher(indexReader index.IndexReader, minLon, minLat,
@@ -100,30 +100,42 @@ func NewGeoBoundingBoxSearcher(indexReader index.IndexReader, minLon, minLat,
var geoMaxShift = document.GeoPrecisionStep * 4
var geoDetailLevel = ((geo.GeoBits << 1) - geoMaxShift) / 2
+type closeFunc func() error
func ComputeGeoRange(term uint64, shift uint,
sminLon, sminLat, smaxLon, smaxLat float64, checkBoundaries bool,
indexReader index.IndexReader, field string) (
onBoundary [][]byte, notOnBoundary [][]byte, err error) {
- preallocBytesLen := 32
- preallocBytes := make([]byte, preallocBytesLen)
- makePrefixCoded := func(in int64, shift uint) (rv numeric.PrefixCoded) {
- if len(preallocBytes) <= 0 {
- preallocBytesLen = preallocBytesLen * 2
- preallocBytes = make([]byte, preallocBytesLen)
- }
-
- rv, preallocBytes, err =
- numeric.NewPrefixCodedInt64Prealloc(in, shift, preallocBytes)
+ isIndexed, closeF, err := buildIsIndexedFunc(indexReader, field)
+ if closeF != nil {
+ defer func() {
+ cerr := closeF()
+ if cerr != nil {
+ err = cerr
+ }
+ }()
+ }
- return rv
+ grc := &geoRangeCompute{
+ preallocBytesLen: 32,
+ preallocBytes: make([]byte, 32),
+ sminLon: sminLon,
+ sminLat: sminLat,
+ smaxLon: smaxLon,
+ smaxLat: smaxLat,
+ checkBoundaries: checkBoundaries,
+ isIndexed: isIndexed,
}
- var fieldDict index.FieldDictContains
- var isIndexed filterFunc
+ grc.computeGeoRange(term, shift)
+
+ return grc.onBoundary, grc.notOnBoundary, nil
+}
+
+func buildIsIndexedFunc(indexReader index.IndexReader, field string) (isIndexed filterFunc, closeF closeFunc, err error) {
if irr, ok := indexReader.(index.IndexReaderContains); ok {
- fieldDict, err = irr.FieldDictContains(field)
+ fieldDict, err := irr.FieldDictContains(field)
if err != nil {
return nil, nil, err
}
@@ -132,22 +144,18 @@ func ComputeGeoRange(term uint64, shift uint,
found, err := fieldDict.Contains(term)
return err == nil && found
}
- }
- defer func() {
- if fieldDict != nil {
+ closeF = func() error {
if fd, ok := fieldDict.(index.FieldDict); ok {
- cerr := fd.Close()
- if cerr != nil {
- err = cerr
+ err := fd.Close()
+ if err != nil {
+ return err
}
}
+ return nil
}
- }()
-
- if isIndexed == nil {
+ } else if indexReader != nil {
isIndexed = func(term []byte) bool {
- if indexReader != nil {
reader, err := indexReader.TermFieldReader(term, field, false, false, false)
if err != nil || reader == nil {
return false
@@ -157,68 +165,15 @@ func ComputeGeoRange(term uint64, shift uint,
return false
}
_ = reader.Close()
- }
- return true
+ return true
}
- }
- var computeGeoRange func(term uint64, shift uint) // declare for recursion
-
- relateAndRecurse := func(start, end uint64, res, level uint) {
- minLon := geo.MortonUnhashLon(start)
- minLat := geo.MortonUnhashLat(start)
- maxLon := geo.MortonUnhashLon(end)
- maxLat := geo.MortonUnhashLat(end)
-
- within := res%document.GeoPrecisionStep == 0 &&
- geo.RectWithin(minLon, minLat, maxLon, maxLat,
- sminLon, sminLat, smaxLon, smaxLat)
- if within || (level == geoDetailLevel &&
- geo.RectIntersects(minLon, minLat, maxLon, maxLat,
- sminLon, sminLat, smaxLon, smaxLat)) {
- codedTerm := makePrefixCoded(int64(start), res)
- if isIndexed(codedTerm) {
- if !within && checkBoundaries {
- onBoundary = append(onBoundary, codedTerm)
- } else {
- notOnBoundary = append(notOnBoundary, codedTerm)
- }
- }
- } else if level < geoDetailLevel &&
- geo.RectIntersects(minLon, minLat, maxLon, maxLat,
- sminLon, sminLat, smaxLon, smaxLat) {
- computeGeoRange(start, res-1)
+ } else {
+ isIndexed = func([]byte) bool {
+ return true
}
}
-
- computeGeoRange = func(term uint64, shift uint) {
- if err != nil {
- return
- }
-
- split := term | uint64(0x1)<<shift
- var upperMax uint64
- if shift < 63 {
- upperMax = term | ((uint64(1) << (shift + 1)) - 1)
- } else {
- upperMax = 0xffffffffffffffff
- }
-
- lowerMax := split - 1
-
- level := (GeoBitsShift1 - shift) >> 1
-
- relateAndRecurse(term, lowerMax, shift, level)
- relateAndRecurse(split, upperMax, shift, level)
- }
-
- computeGeoRange(term, shift)
-
- if err != nil {
- return nil, nil, err
- }
-
- return onBoundary, notOnBoundary, err
+ return isIndexed, closeF, err
}
func buildRectFilter(dvReader index.DocValueReader, field string,
@@ -252,3 +207,66 @@ func buildRectFilter(dvReader index.DocValueReader, field string,
return false
}
}
+
+type geoRangeCompute struct {
+ preallocBytesLen int
+ preallocBytes []byte
+ sminLon, sminLat, smaxLon, smaxLat float64
+ checkBoundaries bool
+ onBoundary, notOnBoundary [][]byte
+ isIndexed func(term []byte) bool
+}
+
+func (grc *geoRangeCompute) makePrefixCoded(in int64, shift uint) (rv numeric.PrefixCoded) {
+ if len(grc.preallocBytes) <= 0 {
+ grc.preallocBytesLen = grc.preallocBytesLen * 2
+ grc.preallocBytes = make([]byte, grc.preallocBytesLen)
+ }
+
+ rv, grc.preallocBytes, _ =
+ numeric.NewPrefixCodedInt64Prealloc(in, shift, grc.preallocBytes)
+
+ return rv
+}
+
+func (grc *geoRangeCompute) computeGeoRange(term uint64, shift uint) {
+ split := term | uint64(0x1)<<shift
+ var upperMax uint64
+ if shift < 63 {
+ upperMax = term | ((uint64(1) << (shift + 1)) - 1)
+ } else {
+ upperMax = 0xffffffffffffffff
+ }
+ lowerMax := split - 1
+ grc.relateAndRecurse(term, lowerMax, shift)
+ grc.relateAndRecurse(split, upperMax, shift)
+}
+
+func (grc *geoRangeCompute) relateAndRecurse(start, end uint64, res uint) {
+ minLon := geo.MortonUnhashLon(start)
+ minLat := geo.MortonUnhashLat(start)
+ maxLon := geo.MortonUnhashLon(end)
+ maxLat := geo.MortonUnhashLat(end)
+
+ level := (GeoBitsShift1 - res) >> 1
+
+ within := res%document.GeoPrecisionStep == 0 &&
+ geo.RectWithin(minLon, minLat, maxLon, maxLat,
+ grc.sminLon, grc.sminLat, grc.smaxLon, grc.smaxLat)
+ if within || (level == geoDetailLevel &&
+ geo.RectIntersects(minLon, minLat, maxLon, maxLat,
+ grc.sminLon, grc.sminLat, grc.smaxLon, grc.smaxLat)) {
+ codedTerm := grc.makePrefixCoded(int64(start), res)
+ if grc.isIndexed(codedTerm) {
+ if !within && grc.checkBoundaries {
+ grc.onBoundary = append(grc.onBoundary, codedTerm)
+ } else {
+ grc.notOnBoundary = append(grc.notOnBoundary, codedTerm)
+ }
+ }
+ } else if level < geoDetailLevel &&
+ geo.RectIntersects(minLon, minLat, maxLon, maxLat,
+ grc.sminLon, grc.sminLat, grc.smaxLon, grc.smaxLat) {
+ grc.computeGeoRange(start, res-1)
+ }
+} \ No newline at end of file
diff --git a/vendor/github.com/blevesearch/bleve/search/searcher/search_multi_term.go b/vendor/github.com/blevesearch/bleve/search/searcher/search_multi_term.go
index c48366ee27..70a2fa38c0 100644
--- a/vendor/github.com/blevesearch/bleve/search/searcher/search_multi_term.go
+++ b/vendor/github.com/blevesearch/bleve/search/searcher/search_multi_term.go
@@ -15,6 +15,7 @@
package searcher
import (
+ "fmt"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/search"
)
@@ -22,10 +23,113 @@ import (
func NewMultiTermSearcher(indexReader index.IndexReader, terms []string,
field string, boost float64, options search.SearcherOptions, limit bool) (
search.Searcher, error) {
- if limit && tooManyClauses(len(terms)) {
- return nil, tooManyClausesErr(len(terms))
+
+ if tooManyClauses(len(terms)) {
+ if optionsDisjunctionOptimizable(options) {
+ return optimizeMultiTermSearcher(indexReader, terms, field, boost, options)
+ }
+ if limit {
+ return nil, tooManyClausesErr(field, len(terms))
+ }
+ }
+
+ qsearchers, err := makeBatchSearchers(indexReader, terms, field, boost, options)
+ if err != nil {
+ return nil, err
}
+ // build disjunction searcher of these ranges
+ return newMultiTermSearcherInternal(indexReader, qsearchers, field, boost,
+ options, limit)
+}
+
+func NewMultiTermSearcherBytes(indexReader index.IndexReader, terms [][]byte,
+ field string, boost float64, options search.SearcherOptions, limit bool) (
+ search.Searcher, error) {
+
+ if tooManyClauses(len(terms)) {
+ if optionsDisjunctionOptimizable(options) {
+ return optimizeMultiTermSearcherBytes(indexReader, terms, field, boost, options)
+ }
+
+ if limit {
+ return nil, tooManyClausesErr(field, len(terms))
+ }
+ }
+
+ qsearchers, err := makeBatchSearchersBytes(indexReader, terms, field, boost, options)
+ if err != nil {
+ return nil, err
+ }
+
+ // build disjunction searcher of these ranges
+ return newMultiTermSearcherInternal(indexReader, qsearchers, field, boost,
+ options, limit)
+}
+
+func newMultiTermSearcherInternal(indexReader index.IndexReader,
+ searchers []search.Searcher, field string, boost float64,
+ options search.SearcherOptions, limit bool) (
+ search.Searcher, error) {
+
+ // build disjunction searcher of these ranges
+ searcher, err := newDisjunctionSearcher(indexReader, searchers, 0, options,
+ limit)
+ if err != nil {
+ for _, s := range searchers {
+ _ = s.Close()
+ }
+ return nil, err
+ }
+
+ return searcher, nil
+}
+
+func optimizeMultiTermSearcher(indexReader index.IndexReader, terms []string,
+ field string, boost float64, options search.SearcherOptions) (
+ search.Searcher, error) {
+ var finalSearcher search.Searcher
+ for len(terms) > 0 {
+ var batchTerms []string
+ if len(terms) > DisjunctionMaxClauseCount {
+ batchTerms = terms[:DisjunctionMaxClauseCount]
+ terms = terms[DisjunctionMaxClauseCount:]
+ } else {
+ batchTerms = terms
+ terms = nil
+ }
+ batch, err := makeBatchSearchers(indexReader, batchTerms, field, boost, options)
+ if err != nil {
+ return nil, err
+ }
+ if finalSearcher != nil {
+ batch = append(batch, finalSearcher)
+ }
+ cleanup := func() {
+ for _, searcher := range batch {
+ if searcher != nil {
+ _ = searcher.Close()
+ }
+ }
+ }
+ finalSearcher, err = optimizeCompositeSearcher("disjunction:unadorned",
+ indexReader, batch, options)
+ // all searchers in batch should be closed, regardless of error or optimization failure
+ // either we're returning, or continuing and only finalSearcher is needed for next loop
+ cleanup()
+ if err != nil {
+ return nil, err
+ }
+ if finalSearcher == nil {
+ return nil, fmt.Errorf("unable to optimize")
+ }
+ }
+ return finalSearcher, nil
+}
+
+func makeBatchSearchers(indexReader index.IndexReader, terms []string, field string,
+ boost float64, options search.SearcherOptions) ([]search.Searcher, error) {
+
qsearchers := make([]search.Searcher, len(terms))
qsearchersClose := func() {
for _, searcher := range qsearchers {
@@ -42,17 +146,54 @@ func NewMultiTermSearcher(indexReader index.IndexReader, terms []string,
return nil, err
}
}
- // build disjunction searcher of these ranges
- return newMultiTermSearcherBytes(indexReader, qsearchers, field, boost,
- options, limit)
+ return qsearchers, nil
}
-func NewMultiTermSearcherBytes(indexReader index.IndexReader, terms [][]byte,
- field string, boost float64, options search.SearcherOptions, limit bool) (
+func optimizeMultiTermSearcherBytes(indexReader index.IndexReader, terms [][]byte,
+ field string, boost float64, options search.SearcherOptions) (
search.Searcher, error) {
- if limit && tooManyClauses(len(terms)) {
- return nil, tooManyClausesErr(len(terms))
+
+ var finalSearcher search.Searcher
+ for len(terms) > 0 {
+ var batchTerms [][]byte
+ if len(terms) > DisjunctionMaxClauseCount {
+ batchTerms = terms[:DisjunctionMaxClauseCount]
+ terms = terms[DisjunctionMaxClauseCount:]
+ } else {
+ batchTerms = terms
+ terms = nil
+ }
+ batch, err := makeBatchSearchersBytes(indexReader, batchTerms, field, boost, options)
+ if err != nil {
+ return nil, err
+ }
+ if finalSearcher != nil {
+ batch = append(batch, finalSearcher)
+ }
+ cleanup := func() {
+ for _, searcher := range batch {
+ if searcher != nil {
+ _ = searcher.Close()
+ }
+ }
+ }
+ finalSearcher, err = optimizeCompositeSearcher("disjunction:unadorned",
+ indexReader, batch, options)
+ // all searchers in batch should be closed, regardless of error or optimization failure
+ // either we're returning, or continuing and only finalSearcher is needed for next loop
+ cleanup()
+ if err != nil {
+ return nil, err
+ }
+ if finalSearcher == nil {
+ return nil, fmt.Errorf("unable to optimize")
+ }
}
+ return finalSearcher, nil
+}
+
+func makeBatchSearchersBytes(indexReader index.IndexReader, terms [][]byte, field string,
+ boost float64, options search.SearcherOptions) ([]search.Searcher, error) {
qsearchers := make([]search.Searcher, len(terms))
qsearchersClose := func() {
@@ -70,24 +211,5 @@ func NewMultiTermSearcherBytes(indexReader index.IndexReader, terms [][]byte,
return nil, err
}
}
- return newMultiTermSearcherBytes(indexReader, qsearchers, field, boost,
- options, limit)
-}
-
-func newMultiTermSearcherBytes(indexReader index.IndexReader,
- searchers []search.Searcher, field string, boost float64,
- options search.SearcherOptions, limit bool) (
- search.Searcher, error) {
-
- // build disjunction searcher of these ranges
- searcher, err := newDisjunctionSearcher(indexReader, searchers, 0, options,
- limit)
- if err != nil {
- for _, s := range searchers {
- _ = s.Close()
- }
- return nil, err
- }
-
- return searcher, nil
+ return qsearchers, nil
}
diff --git a/vendor/github.com/blevesearch/bleve/search/searcher/search_numeric_range.go b/vendor/github.com/blevesearch/bleve/search/searcher/search_numeric_range.go
index 83107f0201..48d6226e11 100644
--- a/vendor/github.com/blevesearch/bleve/search/searcher/search_numeric_range.go
+++ b/vendor/github.com/blevesearch/bleve/search/searcher/search_numeric_range.go
@@ -74,9 +74,8 @@ func NewNumericRangeSearcher(indexReader index.IndexReader,
terms := termRanges.Enumerate(isIndexed)
if fieldDict != nil {
if fd, ok := fieldDict.(index.FieldDict); ok {
- cerr := fd.Close()
- if cerr != nil {
- err = cerr
+ if err = fd.Close(); err != nil {
+ return nil, err
}
}
}
@@ -97,7 +96,7 @@ func NewNumericRangeSearcher(indexReader index.IndexReader,
}
if tooManyClauses(len(terms)) {
- return nil, tooManyClausesErr(len(terms))
+ return nil, tooManyClausesErr(field, len(terms))
}
return NewMultiTermSearcherBytes(indexReader, terms, field, boost, options,
diff --git a/vendor/github.com/blevesearch/bleve/search/searcher/search_regexp.go b/vendor/github.com/blevesearch/bleve/search/searcher/search_regexp.go
index 4def832c47..11a44f159f 100644
--- a/vendor/github.com/blevesearch/bleve/search/searcher/search_regexp.go
+++ b/vendor/github.com/blevesearch/bleve/search/searcher/search_regexp.go
@@ -110,7 +110,7 @@ func findRegexpCandidateTerms(indexReader index.IndexReader,
if matchPos != nil && matchPos[0] == 0 && matchPos[1] == len(tfd.Term) {
rv = append(rv, tfd.Term)
if tooManyClauses(len(rv)) {
- return rv, tooManyClausesErr(len(rv))
+ return rv, tooManyClausesErr(field, len(rv))
}
}
tfd, err = fieldDict.Next()
diff --git a/vendor/github.com/blevesearch/bleve/search/searcher/search_term.go b/vendor/github.com/blevesearch/bleve/search/searcher/search_term.go
index c1af74c76e..e07d25333a 100644
--- a/vendor/github.com/blevesearch/bleve/search/searcher/search_term.go
+++ b/vendor/github.com/blevesearch/bleve/search/searcher/search_term.go
@@ -137,5 +137,5 @@ func (s *TermSearcher) Optimize(kind string, octx index.OptimizableContext) (
return o.Optimize(kind, octx)
}
- return octx, nil
+ return nil, nil
}
diff --git a/vendor/github.com/blevesearch/bleve/search/searcher/search_term_prefix.go b/vendor/github.com/blevesearch/bleve/search/searcher/search_term_prefix.go
index b5af4631fe..2a8f22cff1 100644
--- a/vendor/github.com/blevesearch/bleve/search/searcher/search_term_prefix.go
+++ b/vendor/github.com/blevesearch/bleve/search/searcher/search_term_prefix.go
@@ -38,7 +38,7 @@ func NewTermPrefixSearcher(indexReader index.IndexReader, prefix string,
for err == nil && tfd != nil {
terms = append(terms, tfd.Term)
if tooManyClauses(len(terms)) {
- return nil, tooManyClausesErr(len(terms))
+ return nil, tooManyClausesErr(field, len(terms))
}
tfd, err = fieldDict.Next()
}
diff --git a/vendor/github.com/blevesearch/bleve/search/sort.go b/vendor/github.com/blevesearch/bleve/search/sort.go
index 6e4ed80fa2..dca422ebd4 100644
--- a/vendor/github.com/blevesearch/bleve/search/sort.go
+++ b/vendor/github.com/blevesearch/bleve/search/sort.go
@@ -233,7 +233,11 @@ func (so SortOrder) Compare(cachedScoring, cachedDesc []bool, i, j *DocumentMatc
} else {
iVal := i.Sort[x]
jVal := j.Sort[x]
- c = strings.Compare(iVal, jVal)
+ if iVal < jVal {
+ c = -1
+ } else if iVal > jVal {
+ c = 1
+ }
}
if c == 0 {
@@ -423,7 +427,8 @@ func (s *SortField) filterTermsByType(terms [][]byte) [][]byte {
allTermsPrefixCoded = false
}
}
- if allTermsPrefixCoded {
+ // reset the terms only when valid zero shift terms are found.
+ if allTermsPrefixCoded && len(termsWithShiftZero) > 0 {
terms = termsWithShiftZero
s.tmp = termsWithShiftZero[:0]
}