summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/ethantkoenig/rupture/flushing_batch.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/ethantkoenig/rupture/flushing_batch.go')
-rw-r--r--vendor/github.com/ethantkoenig/rupture/flushing_batch.go67
1 files changed, 67 insertions, 0 deletions
diff --git a/vendor/github.com/ethantkoenig/rupture/flushing_batch.go b/vendor/github.com/ethantkoenig/rupture/flushing_batch.go
new file mode 100644
index 0000000000..b4948f674c
--- /dev/null
+++ b/vendor/github.com/ethantkoenig/rupture/flushing_batch.go
@@ -0,0 +1,67 @@
+package rupture
+
+import (
+ "github.com/blevesearch/bleve"
+)
+
+// FlushingBatch is a batch of operations that automatically flushes to the
+// underlying index once it reaches a certain size.
+type FlushingBatch interface {
+ // Index adds the specified index operation batch, possibly triggering a
+ // flush.
+ Index(id string, data interface{}) error
+ // Remove adds the specified delete operation to the batch, possibly
+ // triggering a flush.
+ Delete(id string) error
+ // Flush flushes the batch's contents.
+ Flush() error
+}
+
+type singleIndexFlushingBatch struct {
+ maxBatchSize int
+ batch *bleve.Batch
+ index bleve.Index
+}
+
+func newFlushingBatch(index bleve.Index, maxBatchSize int) *singleIndexFlushingBatch {
+ return &singleIndexFlushingBatch{
+ maxBatchSize: maxBatchSize,
+ batch: index.NewBatch(),
+ index: index,
+ }
+}
+
+// NewFlushingBatch creates a new flushing batch for the specified index. Once
+// the number of operations in the batch reaches the specified limit, the batch
+// automatically flushes its operations to the index.
+func NewFlushingBatch(index bleve.Index, maxBatchSize int) FlushingBatch {
+ return newFlushingBatch(index, maxBatchSize)
+}
+
+func (b *singleIndexFlushingBatch) Index(id string, data interface{}) error {
+ if err := b.batch.Index(id, data); err != nil {
+ return err
+ }
+ return b.flushIfFull()
+}
+
+func (b *singleIndexFlushingBatch) Delete(id string) error {
+ b.batch.Delete(id)
+ return b.flushIfFull()
+}
+
+func (b *singleIndexFlushingBatch) flushIfFull() error {
+ if b.batch.Size() < b.maxBatchSize {
+ return nil
+ }
+ return b.Flush()
+}
+
+func (b *singleIndexFlushingBatch) Flush() error {
+ err := b.index.Batch(b.batch)
+ if err != nil {
+ return err
+ }
+ b.batch.Reset()
+ return nil
+}