diff options
Diffstat (limited to 'vendor/github.com/ethantkoenig/rupture/sharded_index.go')
-rw-r--r-- | vendor/github.com/ethantkoenig/rupture/sharded_index.go | 146 |
1 files changed, 146 insertions, 0 deletions
diff --git a/vendor/github.com/ethantkoenig/rupture/sharded_index.go b/vendor/github.com/ethantkoenig/rupture/sharded_index.go new file mode 100644 index 0000000000..8e4cb9338c --- /dev/null +++ b/vendor/github.com/ethantkoenig/rupture/sharded_index.go @@ -0,0 +1,146 @@ +package rupture + +import ( + "fmt" + "hash/fnv" + "path/filepath" + "strconv" + + "github.com/blevesearch/bleve" + "github.com/blevesearch/bleve/document" + "github.com/blevesearch/bleve/mapping" +) + +// ShardedIndex an index that is built onto of multiple underlying bleve +// indices (i.e. shards). Similar to bleve's index aliases, some methods may +// not be supported. +type ShardedIndex interface { + bleve.Index + shards() []bleve.Index +} + +// a type alias for bleve.Index, so that the anonymous field of +// shardedIndex does not conflict with the Index(..) method. +type bleveIndex bleve.Index + +type shardedIndex struct { + bleveIndex + indices []bleve.Index +} + +func hash(id string, n int) uint64 { + fnvHash := fnv.New64() + fnvHash.Write([]byte(id)) + return fnvHash.Sum64() % uint64(n) +} + +func childIndexerPath(rootPath string, i int) string { + return filepath.Join(rootPath, strconv.Itoa(i)) +} + +// NewShardedIndex creates a sharded index at the specified path, with the +// specified mapping and number of shards. +func NewShardedIndex(path string, mapping mapping.IndexMapping, numShards int) (ShardedIndex, error) { + if numShards <= 0 { + return nil, fmt.Errorf("Invalid number of shards: %d", numShards) + } + err := writeJSON(shardedIndexMetadataPath(path), &shardedIndexMetadata{NumShards: numShards}) + if err != nil { + return nil, err + } + + s := &shardedIndex{ + indices: make([]bleve.Index, numShards), + } + for i := 0; i < numShards; i++ { + s.indices[i], err = bleve.New(childIndexerPath(path, i), mapping) + if err != nil { + return nil, err + } + } + s.bleveIndex = bleve.NewIndexAlias(s.indices...) + return s, nil +} + +// OpenShardedIndex opens a sharded index at the specified path. +func OpenShardedIndex(path string) (ShardedIndex, error) { + var meta shardedIndexMetadata + var err error + if err = readJSON(shardedIndexMetadataPath(path), &meta); err != nil { + return nil, err + } + + s := &shardedIndex{ + indices: make([]bleve.Index, meta.NumShards), + } + for i := 0; i < meta.NumShards; i++ { + s.indices[i], err = bleve.Open(childIndexerPath(path, i)) + if err != nil { + return nil, err + } + } + s.bleveIndex = bleve.NewIndexAlias(s.indices...) + return s, nil +} + +func (s *shardedIndex) Index(id string, data interface{}) error { + return s.indices[hash(id, len(s.indices))].Index(id, data) +} + +func (s *shardedIndex) Delete(id string) error { + return s.indices[hash(id, len(s.indices))].Delete(id) +} + +func (s *shardedIndex) Document(id string) (*document.Document, error) { + return s.indices[hash(id, len(s.indices))].Document(id) +} + +func (s *shardedIndex) Close() error { + if err := s.bleveIndex.Close(); err != nil { + return err + } + for _, index := range s.indices { + if err := index.Close(); err != nil { + return err + } + } + return nil +} + +func (s *shardedIndex) shards() []bleve.Index { + return s.indices +} + +type shardedIndexFlushingBatch struct { + batches []*singleIndexFlushingBatch +} + +// NewShardedFlushingBatch creates a flushing batch with the specified batch +// size for the specified sharded index. +func NewShardedFlushingBatch(index ShardedIndex, maxBatchSize int) FlushingBatch { + indices := index.shards() + b := &shardedIndexFlushingBatch{ + batches: make([]*singleIndexFlushingBatch, len(indices)), + } + for i, index := range indices { + b.batches[i] = newFlushingBatch(index, maxBatchSize) + } + return b +} + +func (b *shardedIndexFlushingBatch) Index(id string, data interface{}) error { + return b.batches[hash(id, len(b.batches))].Index(id, data) +} + +func (b *shardedIndexFlushingBatch) Delete(id string) error { + return b.batches[hash(id, len(b.batches))].Delete(id) +} + +func (b *shardedIndexFlushingBatch) Flush() error { + for _, batch := range b.batches { + if err := batch.Flush(); err != nil { + return err + } + } + return nil +} |