You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

flushing_batch.go 1.7KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package rupture
  2. import (
  3. "github.com/blevesearch/bleve"
  4. )
  5. // FlushingBatch is a batch of operations that automatically flushes to the
  6. // underlying index once it reaches a certain size.
  7. type FlushingBatch interface {
  8. // Index adds the specified index operation batch, possibly triggering a
  9. // flush.
  10. Index(id string, data interface{}) error
  11. // Remove adds the specified delete operation to the batch, possibly
  12. // triggering a flush.
  13. Delete(id string) error
  14. // Flush flushes the batch's contents.
  15. Flush() error
  16. }
  17. type singleIndexFlushingBatch struct {
  18. maxBatchSize int
  19. batch *bleve.Batch
  20. index bleve.Index
  21. }
  22. func newFlushingBatch(index bleve.Index, maxBatchSize int) *singleIndexFlushingBatch {
  23. return &singleIndexFlushingBatch{
  24. maxBatchSize: maxBatchSize,
  25. batch: index.NewBatch(),
  26. index: index,
  27. }
  28. }
  29. // NewFlushingBatch creates a new flushing batch for the specified index. Once
  30. // the number of operations in the batch reaches the specified limit, the batch
  31. // automatically flushes its operations to the index.
  32. func NewFlushingBatch(index bleve.Index, maxBatchSize int) FlushingBatch {
  33. return newFlushingBatch(index, maxBatchSize)
  34. }
  35. func (b *singleIndexFlushingBatch) Index(id string, data interface{}) error {
  36. if err := b.batch.Index(id, data); err != nil {
  37. return err
  38. }
  39. return b.flushIfFull()
  40. }
  41. func (b *singleIndexFlushingBatch) Delete(id string) error {
  42. b.batch.Delete(id)
  43. return b.flushIfFull()
  44. }
  45. func (b *singleIndexFlushingBatch) flushIfFull() error {
  46. if b.batch.Size() < b.maxBatchSize {
  47. return nil
  48. }
  49. return b.Flush()
  50. }
  51. func (b *singleIndexFlushingBatch) Flush() error {
  52. err := b.index.Batch(b.batch)
  53. if err != nil {
  54. return err
  55. }
  56. b.batch.Reset()
  57. return nil
  58. }