summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/ethantkoenig/rupture/sharded_index.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/ethantkoenig/rupture/sharded_index.go')
-rw-r--r--vendor/github.com/ethantkoenig/rupture/sharded_index.go146
1 files changed, 146 insertions, 0 deletions
diff --git a/vendor/github.com/ethantkoenig/rupture/sharded_index.go b/vendor/github.com/ethantkoenig/rupture/sharded_index.go
new file mode 100644
index 0000000000..8e4cb9338c
--- /dev/null
+++ b/vendor/github.com/ethantkoenig/rupture/sharded_index.go
@@ -0,0 +1,146 @@
+package rupture
+
+import (
+ "fmt"
+ "hash/fnv"
+ "path/filepath"
+ "strconv"
+
+ "github.com/blevesearch/bleve"
+ "github.com/blevesearch/bleve/document"
+ "github.com/blevesearch/bleve/mapping"
+)
+
+// ShardedIndex an index that is built onto of multiple underlying bleve
+// indices (i.e. shards). Similar to bleve's index aliases, some methods may
+// not be supported.
+type ShardedIndex interface {
+ bleve.Index
+ shards() []bleve.Index
+}
+
+// a type alias for bleve.Index, so that the anonymous field of
+// shardedIndex does not conflict with the Index(..) method.
+type bleveIndex bleve.Index
+
+type shardedIndex struct {
+ bleveIndex
+ indices []bleve.Index
+}
+
+func hash(id string, n int) uint64 {
+ fnvHash := fnv.New64()
+ fnvHash.Write([]byte(id))
+ return fnvHash.Sum64() % uint64(n)
+}
+
+func childIndexerPath(rootPath string, i int) string {
+ return filepath.Join(rootPath, strconv.Itoa(i))
+}
+
+// NewShardedIndex creates a sharded index at the specified path, with the
+// specified mapping and number of shards.
+func NewShardedIndex(path string, mapping mapping.IndexMapping, numShards int) (ShardedIndex, error) {
+ if numShards <= 0 {
+ return nil, fmt.Errorf("Invalid number of shards: %d", numShards)
+ }
+ err := writeJSON(shardedIndexMetadataPath(path), &shardedIndexMetadata{NumShards: numShards})
+ if err != nil {
+ return nil, err
+ }
+
+ s := &shardedIndex{
+ indices: make([]bleve.Index, numShards),
+ }
+ for i := 0; i < numShards; i++ {
+ s.indices[i], err = bleve.New(childIndexerPath(path, i), mapping)
+ if err != nil {
+ return nil, err
+ }
+ }
+ s.bleveIndex = bleve.NewIndexAlias(s.indices...)
+ return s, nil
+}
+
+// OpenShardedIndex opens a sharded index at the specified path.
+func OpenShardedIndex(path string) (ShardedIndex, error) {
+ var meta shardedIndexMetadata
+ var err error
+ if err = readJSON(shardedIndexMetadataPath(path), &meta); err != nil {
+ return nil, err
+ }
+
+ s := &shardedIndex{
+ indices: make([]bleve.Index, meta.NumShards),
+ }
+ for i := 0; i < meta.NumShards; i++ {
+ s.indices[i], err = bleve.Open(childIndexerPath(path, i))
+ if err != nil {
+ return nil, err
+ }
+ }
+ s.bleveIndex = bleve.NewIndexAlias(s.indices...)
+ return s, nil
+}
+
+func (s *shardedIndex) Index(id string, data interface{}) error {
+ return s.indices[hash(id, len(s.indices))].Index(id, data)
+}
+
+func (s *shardedIndex) Delete(id string) error {
+ return s.indices[hash(id, len(s.indices))].Delete(id)
+}
+
+func (s *shardedIndex) Document(id string) (*document.Document, error) {
+ return s.indices[hash(id, len(s.indices))].Document(id)
+}
+
+func (s *shardedIndex) Close() error {
+ if err := s.bleveIndex.Close(); err != nil {
+ return err
+ }
+ for _, index := range s.indices {
+ if err := index.Close(); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (s *shardedIndex) shards() []bleve.Index {
+ return s.indices
+}
+
+type shardedIndexFlushingBatch struct {
+ batches []*singleIndexFlushingBatch
+}
+
+// NewShardedFlushingBatch creates a flushing batch with the specified batch
+// size for the specified sharded index.
+func NewShardedFlushingBatch(index ShardedIndex, maxBatchSize int) FlushingBatch {
+ indices := index.shards()
+ b := &shardedIndexFlushingBatch{
+ batches: make([]*singleIndexFlushingBatch, len(indices)),
+ }
+ for i, index := range indices {
+ b.batches[i] = newFlushingBatch(index, maxBatchSize)
+ }
+ return b
+}
+
+func (b *shardedIndexFlushingBatch) Index(id string, data interface{}) error {
+ return b.batches[hash(id, len(b.batches))].Index(id, data)
+}
+
+func (b *shardedIndexFlushingBatch) Delete(id string) error {
+ return b.batches[hash(id, len(b.batches))].Delete(id)
+}
+
+func (b *shardedIndexFlushingBatch) Flush() error {
+ for _, batch := range b.batches {
+ if err := batch.Flush(); err != nil {
+ return err
+ }
+ }
+ return nil
+}