diff options
Diffstat (limited to 'vendor/github.com/blevesearch/zap/v11/docvalues.go')
-rw-r--r-- | vendor/github.com/blevesearch/zap/v11/docvalues.go | 307 |
1 files changed, 307 insertions, 0 deletions
diff --git a/vendor/github.com/blevesearch/zap/v11/docvalues.go b/vendor/github.com/blevesearch/zap/v11/docvalues.go new file mode 100644 index 0000000000..2566dc6d8c --- /dev/null +++ b/vendor/github.com/blevesearch/zap/v11/docvalues.go @@ -0,0 +1,307 @@ +// Copyright (c) 2017 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zap + +import ( + "bytes" + "encoding/binary" + "fmt" + "math" + "reflect" + "sort" + + "github.com/blevesearch/bleve/index" + "github.com/blevesearch/bleve/index/scorch/segment" + "github.com/blevesearch/bleve/size" + "github.com/golang/snappy" +) + +var reflectStaticSizedocValueReader int + +func init() { + var dvi docValueReader + reflectStaticSizedocValueReader = int(reflect.TypeOf(dvi).Size()) +} + +type docNumTermsVisitor func(docNum uint64, terms []byte) error + +type docVisitState struct { + dvrs map[uint16]*docValueReader + segment *SegmentBase +} + +type docValueReader struct { + field string + curChunkNum uint64 + chunkOffsets []uint64 + dvDataLoc uint64 + curChunkHeader []MetaData + curChunkData []byte // compressed data cache + uncompressed []byte // temp buf for snappy decompression +} + +func (di *docValueReader) size() int { + return reflectStaticSizedocValueReader + size.SizeOfPtr + + len(di.field) + + len(di.chunkOffsets)*size.SizeOfUint64 + + len(di.curChunkHeader)*reflectStaticSizeMetaData + + len(di.curChunkData) +} + +func (di *docValueReader) cloneInto(rv *docValueReader) *docValueReader { + if rv == nil { + rv = &docValueReader{} + } + + rv.field = di.field + rv.curChunkNum = math.MaxUint64 + rv.chunkOffsets = di.chunkOffsets // immutable, so it's sharable + rv.dvDataLoc = di.dvDataLoc + rv.curChunkHeader = rv.curChunkHeader[:0] + rv.curChunkData = nil + rv.uncompressed = rv.uncompressed[:0] + + return rv +} + +func (di *docValueReader) curChunkNumber() uint64 { + return di.curChunkNum +} + +func (s *SegmentBase) loadFieldDocValueReader(field string, + fieldDvLocStart, fieldDvLocEnd uint64) (*docValueReader, error) { + // get the docValue offset for the given fields + if fieldDvLocStart == fieldNotUninverted { + // no docValues found, nothing to do + return nil, nil + } + + // read the number of chunks, and chunk offsets position + var numChunks, chunkOffsetsPosition uint64 + + if fieldDvLocEnd-fieldDvLocStart > 16 { + numChunks = binary.BigEndian.Uint64(s.mem[fieldDvLocEnd-8 : fieldDvLocEnd]) + // read the length of chunk offsets + chunkOffsetsLen := binary.BigEndian.Uint64(s.mem[fieldDvLocEnd-16 : fieldDvLocEnd-8]) + // acquire position of chunk offsets + chunkOffsetsPosition = (fieldDvLocEnd - 16) - chunkOffsetsLen + } else { + return nil, fmt.Errorf("loadFieldDocValueReader: fieldDvLoc too small: %d-%d", fieldDvLocEnd, fieldDvLocStart) + } + + fdvIter := &docValueReader{ + curChunkNum: math.MaxUint64, + field: field, + chunkOffsets: make([]uint64, int(numChunks)), + } + + // read the chunk offsets + var offset uint64 + for i := 0; i < int(numChunks); i++ { + loc, read := binary.Uvarint(s.mem[chunkOffsetsPosition+offset : chunkOffsetsPosition+offset+binary.MaxVarintLen64]) + if read <= 0 { + return nil, fmt.Errorf("corrupted chunk offset during segment load") + } + fdvIter.chunkOffsets[i] = loc + offset += uint64(read) + } + + // set the data offset + fdvIter.dvDataLoc = fieldDvLocStart + + return fdvIter, nil +} + +func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *SegmentBase) error { + // advance to the chunk where the docValues + // reside for the given docNum + destChunkDataLoc, curChunkEnd := di.dvDataLoc, di.dvDataLoc + start, end := readChunkBoundary(int(chunkNumber), di.chunkOffsets) + if start >= end { + di.curChunkHeader = di.curChunkHeader[:0] + di.curChunkData = nil + di.curChunkNum = chunkNumber + di.uncompressed = di.uncompressed[:0] + return nil + } + + destChunkDataLoc += start + curChunkEnd += end + + // read the number of docs reside in the chunk + numDocs, read := binary.Uvarint(s.mem[destChunkDataLoc : destChunkDataLoc+binary.MaxVarintLen64]) + if read <= 0 { + return fmt.Errorf("failed to read the chunk") + } + chunkMetaLoc := destChunkDataLoc + uint64(read) + + offset := uint64(0) + if cap(di.curChunkHeader) < int(numDocs) { + di.curChunkHeader = make([]MetaData, int(numDocs)) + } else { + di.curChunkHeader = di.curChunkHeader[:int(numDocs)] + } + for i := 0; i < int(numDocs); i++ { + di.curChunkHeader[i].DocNum, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) + offset += uint64(read) + di.curChunkHeader[i].DocDvOffset, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) + offset += uint64(read) + } + + compressedDataLoc := chunkMetaLoc + offset + dataLength := curChunkEnd - compressedDataLoc + di.curChunkData = s.mem[compressedDataLoc : compressedDataLoc+dataLength] + di.curChunkNum = chunkNumber + di.uncompressed = di.uncompressed[:0] + return nil +} + +func (di *docValueReader) iterateAllDocValues(s *SegmentBase, visitor docNumTermsVisitor) error { + for i := 0; i < len(di.chunkOffsets); i++ { + err := di.loadDvChunk(uint64(i), s) + if err != nil { + return err + } + if di.curChunkData == nil || len(di.curChunkHeader) == 0 { + continue + } + + // uncompress the already loaded data + uncompressed, err := snappy.Decode(di.uncompressed[:cap(di.uncompressed)], di.curChunkData) + if err != nil { + return err + } + di.uncompressed = uncompressed + + start := uint64(0) + for _, entry := range di.curChunkHeader { + err = visitor(entry.DocNum, uncompressed[start:entry.DocDvOffset]) + if err != nil { + return err + } + + start = entry.DocDvOffset + } + } + + return nil +} + +func (di *docValueReader) visitDocValues(docNum uint64, + visitor index.DocumentFieldTermVisitor) error { + // binary search the term locations for the docNum + start, end := di.getDocValueLocs(docNum) + if start == math.MaxUint64 || end == math.MaxUint64 || start == end { + return nil + } + + var uncompressed []byte + var err error + // use the uncompressed copy if available + if len(di.uncompressed) > 0 { + uncompressed = di.uncompressed + } else { + // uncompress the already loaded data + uncompressed, err = snappy.Decode(di.uncompressed[:cap(di.uncompressed)], di.curChunkData) + if err != nil { + return err + } + di.uncompressed = uncompressed + } + + // pick the terms for the given docNum + uncompressed = uncompressed[start:end] + for { + i := bytes.Index(uncompressed, termSeparatorSplitSlice) + if i < 0 { + break + } + + visitor(di.field, uncompressed[0:i]) + uncompressed = uncompressed[i+1:] + } + + return nil +} + +func (di *docValueReader) getDocValueLocs(docNum uint64) (uint64, uint64) { + i := sort.Search(len(di.curChunkHeader), func(i int) bool { + return di.curChunkHeader[i].DocNum >= docNum + }) + if i < len(di.curChunkHeader) && di.curChunkHeader[i].DocNum == docNum { + return ReadDocValueBoundary(i, di.curChunkHeader) + } + return math.MaxUint64, math.MaxUint64 +} + +// VisitDocumentFieldTerms is an implementation of the +// DocumentFieldTermVisitable interface +func (s *SegmentBase) VisitDocumentFieldTerms(localDocNum uint64, fields []string, + visitor index.DocumentFieldTermVisitor, dvsIn segment.DocVisitState) ( + segment.DocVisitState, error) { + dvs, ok := dvsIn.(*docVisitState) + if !ok || dvs == nil { + dvs = &docVisitState{} + } else { + if dvs.segment != s { + dvs.segment = s + dvs.dvrs = nil + } + } + + var fieldIDPlus1 uint16 + if dvs.dvrs == nil { + dvs.dvrs = make(map[uint16]*docValueReader, len(fields)) + for _, field := range fields { + if fieldIDPlus1, ok = s.fieldsMap[field]; !ok { + continue + } + fieldID := fieldIDPlus1 - 1 + if dvIter, exists := s.fieldDvReaders[fieldID]; exists && + dvIter != nil { + dvs.dvrs[fieldID] = dvIter.cloneInto(dvs.dvrs[fieldID]) + } + } + } + + // find the chunkNumber where the docValues are stored + docInChunk := localDocNum / uint64(s.chunkFactor) + var dvr *docValueReader + for _, field := range fields { + if fieldIDPlus1, ok = s.fieldsMap[field]; !ok { + continue + } + fieldID := fieldIDPlus1 - 1 + if dvr, ok = dvs.dvrs[fieldID]; ok && dvr != nil { + // check if the chunk is already loaded + if docInChunk != dvr.curChunkNumber() { + err := dvr.loadDvChunk(docInChunk, s) + if err != nil { + return dvs, err + } + } + + _ = dvr.visitDocValues(localDocNum, visitor) + } + } + return dvs, nil +} + +// VisitableDocValueFields returns the list of fields with +// persisted doc value terms ready to be visitable using the +// VisitDocumentFieldTerms method. +func (s *SegmentBase) VisitableDocValueFields() ([]string, error) { + return s.fieldDvNames, nil +} |