
merge.go

// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scorch

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"strings"
	"sync/atomic"
	"time"

	"github.com/RoaringBitmap/roaring"
	"github.com/blevesearch/bleve/v2/index/scorch/mergeplan"
	segment "github.com/blevesearch/scorch_segment_api/v2"
)

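// mergerLoop runs in its own goroutine and drives file merging: it takes a
// reference to the current root snapshot, plans and executes merges for it
// via planMergeAtSnapshot, and then either waits on the persister for a newer
// epoch or services a force-merge request arriving on forceMergeRequestCh.
// The loop exits when closeCh is closed, or when a merge fails because the
// index has been closed.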
func (s *Scorch) mergerLoop() {
	var lastEpochMergePlanned uint64
	var ctrlMsg *mergerCtrl
	mergePlannerOptions, err := s.parseMergePlannerOptions()
	if err != nil {
		s.fireAsyncError(fmt.Errorf("mergePlannerOption json parsing err: %v", err))
		s.asyncTasks.Done()
		return
	}
	ctrlMsgDflt := &mergerCtrl{ctx: context.Background(),
		options: mergePlannerOptions,
		doneCh:  nil}

OUTER:
	for {
		atomic.AddUint64(&s.stats.TotFileMergeLoopBeg, 1)

		select {
		case <-s.closeCh:
			break OUTER

		default:
			// check to see if there is a new snapshot to persist
			s.rootLock.Lock()
			ourSnapshot := s.root
			ourSnapshot.AddRef()
			atomic.StoreUint64(&s.iStats.mergeSnapshotSize, uint64(ourSnapshot.Size()))
			atomic.StoreUint64(&s.iStats.mergeEpoch, ourSnapshot.epoch)
			s.rootLock.Unlock()

			if ctrlMsg == nil && ourSnapshot.epoch != lastEpochMergePlanned {
				ctrlMsg = ctrlMsgDflt
			}
			if ctrlMsg != nil {
				startTime := time.Now()

				// let's get started
				err := s.planMergeAtSnapshot(ctrlMsg.ctx, ctrlMsg.options,
					ourSnapshot)
				if err != nil {
					atomic.StoreUint64(&s.iStats.mergeEpoch, 0)
					if err == segment.ErrClosed {
						// index has been closed
						_ = ourSnapshot.DecRef()

						// continue the work loop on a user-triggered cancel
						if ctrlMsg.doneCh != nil {
							close(ctrlMsg.doneCh)
							ctrlMsg = nil
							continue OUTER
						}

						// exit the work loop on index closure
						ctrlMsg = nil
						break OUTER
					}
					s.fireAsyncError(fmt.Errorf("merging err: %v", err))
					_ = ourSnapshot.DecRef()
					atomic.AddUint64(&s.stats.TotFileMergeLoopErr, 1)
					continue OUTER
				}

				if ctrlMsg.doneCh != nil {
					close(ctrlMsg.doneCh)
				}
				ctrlMsg = nil

				lastEpochMergePlanned = ourSnapshot.epoch

				atomic.StoreUint64(&s.stats.LastMergedEpoch, ourSnapshot.epoch)

				s.fireEvent(EventKindMergerProgress, time.Since(startTime))
			}
			_ = ourSnapshot.DecRef()

			// tell the persister we're waiting for changes:
			// first make an epochWatcher chan
			ew := &epochWatcher{
				epoch:    lastEpochMergePlanned,
				notifyCh: make(notificationChan, 1),
			}

			// give it to the persister
			select {
			case <-s.closeCh:
				break OUTER
			case s.persisterNotifier <- ew:
			case ctrlMsg = <-s.forceMergeRequestCh:
				continue OUTER
			}

			// now wait for the persister (but also detect close)
			select {
			case <-s.closeCh:
				break OUTER
			case <-ew.notifyCh:
			case ctrlMsg = <-s.forceMergeRequestCh:
			}
		}

		atomic.AddUint64(&s.stats.TotFileMergeLoopEnd, 1)
	}

	s.asyncTasks.Done()
}

type mergerCtrl struct {
	ctx     context.Context
	options *mergeplan.MergePlanOptions
	doneCh  chan struct{}
}

// ForceMerge helps users trigger a merge operation on
// an online scorch index.
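//
// A minimal usage sketch (illustrative only, not part of this file; assumes
// s is an open *Scorch index): passing nil options falls back to
// mergeplan.SingleSegmentMergePlanOptions, i.e. merging down to a single
// segment.
//
//	if err := s.ForceMerge(context.Background(), nil); err != nil {
//		// non-nil means the options failed validation or another
//		// force merge is already in progress
//	}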
func (s *Scorch) ForceMerge(ctx context.Context,
	mo *mergeplan.MergePlanOptions) error {
	// check whether a force merge is already in progress
	s.rootLock.Lock()
	if s.stats.TotFileMergeForceOpsStarted >
		s.stats.TotFileMergeForceOpsCompleted {
		s.rootLock.Unlock()
		return fmt.Errorf("force merge already in progress")
	}

	s.stats.TotFileMergeForceOpsStarted++
	s.rootLock.Unlock()

	if mo != nil {
		err := mergeplan.ValidateMergePlannerOptions(mo)
		if err != nil {
			return err
		}
	} else {
		// assume the default single-segment merge policy
		mo = &mergeplan.SingleSegmentMergePlanOptions
	}
	msg := &mergerCtrl{options: mo,
		doneCh: make(chan struct{}),
		ctx:    ctx,
	}

	// request that the merger perform a force merge
	select {
	case s.forceMergeRequestCh <- msg:
	case <-s.closeCh:
		return nil
	}

	// wait for the force merge operation to complete
	select {
	case <-msg.doneCh:
		atomic.AddUint64(&s.stats.TotFileMergeForceOpsCompleted, 1)
	case <-s.closeCh:
	}

	return nil
}

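// parseMergePlannerOptions builds the merge planner options for this index.
// It starts from mergeplan.DefaultMergePlanOptions and, when the index config
// carries a "scorchMergePlanOptions" entry, round-trips that value through
// JSON into the options struct and validates the result.
//
// A hedged sketch of supplying custom options via the config map (the field
// names below are assumptions drawn from mergeplan.MergePlanOptions; consult
// that package for the authoritative set):
//
//	config := map[string]interface{}{
//		"scorchMergePlanOptions": map[string]interface{}{
//			"MaxSegmentsPerTier": 10,
//			"MaxSegmentSize":     2000000,
//		},
//	}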
func (s *Scorch) parseMergePlannerOptions() (*mergeplan.MergePlanOptions,
	error) {
	mergePlannerOptions := mergeplan.DefaultMergePlanOptions
	if v, ok := s.config["scorchMergePlanOptions"]; ok {
		b, err := json.Marshal(v)
		if err != nil {
			return &mergePlannerOptions, err
		}

		err = json.Unmarshal(b, &mergePlannerOptions)
		if err != nil {
			return &mergePlannerOptions, err
		}
		err = mergeplan.ValidateMergePlannerOptions(&mergePlannerOptions)
		if err != nil {
			return nil, err
		}
	}
	return &mergePlannerOptions, nil
}

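// closeChWrapper fans two cancellation sources, the index-level close channel
// (ch1) and the caller's context, into a single closeCh, so that either one
// can abort an in-flight file merge handed to the segment plugin.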
type closeChWrapper struct {
	ch1     chan struct{}
	ctx     context.Context
	closeCh chan struct{}
}

func newCloseChWrapper(ch1 chan struct{},
	ctx context.Context) *closeChWrapper {
	return &closeChWrapper{ch1: ch1,
		ctx:     ctx,
		closeCh: make(chan struct{})}
}

func (w *closeChWrapper) close() {
	select {
	case <-w.closeCh:
	default:
		close(w.closeCh)
	}
}

func (w *closeChWrapper) listen() {
	select {
	case <-w.ch1:
		w.close()
	case <-w.ctx.Done():
		w.close()
	case <-w.closeCh:
	}
}

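// planMergeAtSnapshot runs the merge planner over the persisted segments of
// ourSnapshot, executes each resulting task by merging the chosen segment
// files into a new segment file, and hands each merged segment to the
// introducer, blocking until the introduction is acknowledged.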
func (s *Scorch) planMergeAtSnapshot(ctx context.Context,
	options *mergeplan.MergePlanOptions, ourSnapshot *IndexSnapshot) error {
	// build a list of persisted segments in this snapshot
	var onlyPersistedSnapshots []mergeplan.Segment
	for _, segmentSnapshot := range ourSnapshot.segment {
		if _, ok := segmentSnapshot.segment.(segment.PersistedSegment); ok {
			onlyPersistedSnapshots = append(onlyPersistedSnapshots, segmentSnapshot)
		}
	}

	atomic.AddUint64(&s.stats.TotFileMergePlan, 1)

	// give this list to the planner
	resultMergePlan, err := mergeplan.Plan(onlyPersistedSnapshots, options)
	if err != nil {
		atomic.AddUint64(&s.stats.TotFileMergePlanErr, 1)
		return fmt.Errorf("merge planning err: %v", err)
	}
	if resultMergePlan == nil {
		// nothing to do
		atomic.AddUint64(&s.stats.TotFileMergePlanNone, 1)
		return nil
	}
	atomic.AddUint64(&s.stats.TotFileMergePlanOk, 1)

	atomic.AddUint64(&s.stats.TotFileMergePlanTasks, uint64(len(resultMergePlan.Tasks)))

	// process tasks in serial for now
	var filenames []string

	cw := newCloseChWrapper(s.closeCh, ctx)
	defer cw.close()

	go cw.listen()

	for _, task := range resultMergePlan.Tasks {
		if len(task.Segments) == 0 {
			atomic.AddUint64(&s.stats.TotFileMergePlanTasksSegmentsEmpty, 1)
			continue
		}

		atomic.AddUint64(&s.stats.TotFileMergePlanTasksSegments, uint64(len(task.Segments)))

		oldMap := make(map[uint64]*SegmentSnapshot)
		newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
		segmentsToMerge := make([]segment.Segment, 0, len(task.Segments))
		docsToDrop := make([]*roaring.Bitmap, 0, len(task.Segments))

		for _, planSegment := range task.Segments {
			if segSnapshot, ok := planSegment.(*SegmentSnapshot); ok {
				oldMap[segSnapshot.id] = segSnapshot
				if persistedSeg, ok := segSnapshot.segment.(segment.PersistedSegment); ok {
					if segSnapshot.LiveSize() == 0 {
						atomic.AddUint64(&s.stats.TotFileMergeSegmentsEmpty, 1)
						oldMap[segSnapshot.id] = nil
					} else {
						segmentsToMerge = append(segmentsToMerge, segSnapshot.segment)
						docsToDrop = append(docsToDrop, segSnapshot.deleted)
					}
					// track the files getting merged so we can later unset their
					// removal ineligibility; this keeps files from staying flipped
					// even with a fast merger and a slow persister
					path := persistedSeg.Path()
					filenames = append(filenames,
						strings.TrimPrefix(path, s.path+string(os.PathSeparator)))
				}
			}
		}

		var oldNewDocNums map[uint64][]uint64
		var seg segment.Segment
		var filename string
		if len(segmentsToMerge) > 0 {
			filename = zapFileName(newSegmentID)
			s.markIneligibleForRemoval(filename)
			path := s.path + string(os.PathSeparator) + filename

			fileMergeZapStartTime := time.Now()

			atomic.AddUint64(&s.stats.TotFileMergeZapBeg, 1)
			newDocNums, _, err := s.segPlugin.Merge(segmentsToMerge, docsToDrop, path,
				cw.closeCh, s)
			atomic.AddUint64(&s.stats.TotFileMergeZapEnd, 1)

			fileMergeZapTime := uint64(time.Since(fileMergeZapStartTime))
			atomic.AddUint64(&s.stats.TotFileMergeZapTime, fileMergeZapTime)
			if atomic.LoadUint64(&s.stats.MaxFileMergeZapTime) < fileMergeZapTime {
				atomic.StoreUint64(&s.stats.MaxFileMergeZapTime, fileMergeZapTime)
			}

			if err != nil {
				s.unmarkIneligibleForRemoval(filename)
				atomic.AddUint64(&s.stats.TotFileMergePlanTasksErr, 1)
				if err == segment.ErrClosed {
					return err
				}
				return fmt.Errorf("merging failed: %v", err)
			}

			seg, err = s.segPlugin.Open(path)
			if err != nil {
				s.unmarkIneligibleForRemoval(filename)
				atomic.AddUint64(&s.stats.TotFileMergePlanTasksErr, 1)
				return err
			}
			oldNewDocNums = make(map[uint64][]uint64)
			for i, segNewDocNums := range newDocNums {
				oldNewDocNums[task.Segments[i].Id()] = segNewDocNums
			}

			atomic.AddUint64(&s.stats.TotFileMergeSegments, uint64(len(segmentsToMerge)))
		}

		sm := &segmentMerge{
			id:            newSegmentID,
			old:           oldMap,
			oldNewDocNums: oldNewDocNums,
			new:           seg,
			notifyCh:      make(chan *mergeTaskIntroStatus),
		}

		s.fireEvent(EventKindMergeTaskIntroductionStart, 0)

		// give it to the introducer
		select {
		case <-s.closeCh:
			_ = seg.Close()
			return segment.ErrClosed
		case s.merges <- sm:
			atomic.AddUint64(&s.stats.TotFileMergeIntroductions, 1)
		}

		introStartTime := time.Now()
		// it is safe to block here waiting for the merge introduction,
		// as the introducer is bound to handle the notify channel
		introStatus := <-sm.notifyCh
		introTime := uint64(time.Since(introStartTime))
		atomic.AddUint64(&s.stats.TotFileMergeZapIntroductionTime, introTime)
		if atomic.LoadUint64(&s.stats.MaxFileMergeZapIntroductionTime) < introTime {
			atomic.StoreUint64(&s.stats.MaxFileMergeZapIntroductionTime, introTime)
		}
		atomic.AddUint64(&s.stats.TotFileMergeIntroductionsDone, 1)
		if introStatus != nil && introStatus.indexSnapshot != nil {
			_ = introStatus.indexSnapshot.DecRef()
			if introStatus.skipped {
				// close the segment on skipping introduction
				s.unmarkIneligibleForRemoval(filename)
				_ = seg.Close()
			}
		}

		atomic.AddUint64(&s.stats.TotFileMergePlanTasksDone, 1)

		s.fireEvent(EventKindMergeTaskIntroduction, 0)
	}

	// once all the newly merged segment introductions are done,
	// it's safe to unflip the removal ineligibility for the replaced
	// older segments
	for _, f := range filenames {
		s.unmarkIneligibleForRemoval(f)
	}

	return nil
}

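// mergeTaskIntroStatus is sent back by the introducer over a segmentMerge's
// notifyCh; it carries the resulting index snapshot and whether the
// introduction of the merged segment was skipped.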
type mergeTaskIntroStatus struct {
	indexSnapshot *IndexSnapshot
	skipped       bool
}

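// segmentMerge describes a completed merge for the introducer: the new
// segment, the old segment snapshots it replaces (keyed by segment id), and
// the old-to-new doc number mapping for each replaced segment.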
type segmentMerge struct {
	id            uint64
	old           map[uint64]*SegmentSnapshot
	oldNewDocNums map[uint64][]uint64
	new           segment.Segment
	notifyCh      chan *mergeTaskIntroStatus
}

// mergeSegmentBases merges the given SegmentBase instances into a new,
// persisted segment, and synchronously introduces that new segment
// into the root.
func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
	sbs []segment.Segment, sbsDrops []*roaring.Bitmap,
	sbsIndexes []int) (*IndexSnapshot, uint64, error) {
	atomic.AddUint64(&s.stats.TotMemMergeBeg, 1)

	memMergeZapStartTime := time.Now()

	atomic.AddUint64(&s.stats.TotMemMergeZapBeg, 1)

	newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
	filename := zapFileName(newSegmentID)
	path := s.path + string(os.PathSeparator) + filename

	newDocNums, _, err :=
		s.segPlugin.Merge(sbs, sbsDrops, path, s.closeCh, s)

	atomic.AddUint64(&s.stats.TotMemMergeZapEnd, 1)

	memMergeZapTime := uint64(time.Since(memMergeZapStartTime))
	atomic.AddUint64(&s.stats.TotMemMergeZapTime, memMergeZapTime)
	if atomic.LoadUint64(&s.stats.MaxMemMergeZapTime) < memMergeZapTime {
		atomic.StoreUint64(&s.stats.MaxMemMergeZapTime, memMergeZapTime)
	}

	if err != nil {
		atomic.AddUint64(&s.stats.TotMemMergeErr, 1)
		return nil, 0, err
	}

	seg, err := s.segPlugin.Open(path)
	if err != nil {
		atomic.AddUint64(&s.stats.TotMemMergeErr, 1)
		return nil, 0, err
	}

	// update persisted stats
	atomic.AddUint64(&s.stats.TotPersistedItems, seg.Count())
	atomic.AddUint64(&s.stats.TotPersistedSegments, 1)

	sm := &segmentMerge{
		id:            newSegmentID,
		old:           make(map[uint64]*SegmentSnapshot),
		oldNewDocNums: make(map[uint64][]uint64),
		new:           seg,
		notifyCh:      make(chan *mergeTaskIntroStatus),
	}

	for i, idx := range sbsIndexes {
		ss := snapshot.segment[idx]
		sm.old[ss.id] = ss
		sm.oldNewDocNums[ss.id] = newDocNums[i]
	}

	select { // send to introducer
	case <-s.closeCh:
		_ = seg.DecRef()
		return nil, 0, segment.ErrClosed
	case s.merges <- sm:
	}

	// block waiting for the introduction to complete
	var newSnapshot *IndexSnapshot
	introStatus := <-sm.notifyCh
	if introStatus != nil && introStatus.indexSnapshot != nil {
		newSnapshot = introStatus.indexSnapshot
		atomic.AddUint64(&s.stats.TotMemMergeSegments, uint64(len(sbs)))
		atomic.AddUint64(&s.stats.TotMemMergeDone, 1)
		if introStatus.skipped {
			// close the segment on skipping introduction
			_ = newSnapshot.DecRef()
			_ = seg.Close()
			newSnapshot = nil
		}
	}

	return newSnapshot, newSegmentID, nil
}

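// ReportBytesWritten accumulates bytes written during file merging into the
// TotFileMergeWrittenBytes stat. The index is passed as the final argument to
// segPlugin.Merge above, presumably so the plugin can report its write volume
// back through this method.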
func (s *Scorch) ReportBytesWritten(bytesWritten uint64) {
	atomic.AddUint64(&s.stats.TotFileMergeWrittenBytes, bytesWritten)
}