You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

introducer.go 14KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528
  1. // Copyright (c) 2017 Couchbase, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package scorch
  15. import (
  16. "fmt"
  17. "sync/atomic"
  18. "github.com/RoaringBitmap/roaring"
  19. "github.com/blevesearch/bleve/index"
  20. "github.com/blevesearch/bleve/index/scorch/segment"
  21. "github.com/blevesearch/bleve/index/scorch/segment/zap"
  22. )
// segmentIntroduction describes a newly built segment to be spliced into
// the root IndexSnapshot, along with the batch bookkeeping needed to
// apply its deletions and signal completion.
type segmentIntroduction struct {
	id        uint64
	data      segment.Segment
	obsoletes map[uint64]*roaring.Bitmap // precomputed deletions per existing segment id
	ids       []string                   // doc ids updated/deleted by this batch
	internal  map[string][]byte          // internal k/v updates; a nil value deletes the key

	applied           chan error          // receives an error on failure; closed once applied
	persisted         chan error          // if non-nil, registered to be notified on persistence
	persistedCallback index.BatchCallback // if non-nil, registered to run after persistence
}
// persistIntroduction carries persisted (file-backed) replacement
// segments, keyed by segment id, to be swapped in for their in-memory
// counterparts in the root snapshot.
type persistIntroduction struct {
	persisted map[uint64]segment.Segment
	applied   notificationChan // closed once the replacements are swapped into the root
}
// epochWatcher allows a caller to be notified (via close of notifyCh)
// once the root snapshot's epoch has advanced beyond the given epoch.
type epochWatcher struct {
	epoch    uint64
	notifyCh notificationChan
}
// snapshotReversion requests that the root be replaced by a previously
// obtained snapshot (rollback).
type snapshotReversion struct {
	snapshot  *IndexSnapshot
	applied   chan error // receives an error on failure; closed once applied
	persisted chan error // if non-nil, registered to be notified on persistence
}
// mainLoop is the introducer goroutine: it is the single writer of s.root,
// serializing all snapshot mutations (segment introductions, merges,
// persist swaps, and snapshot reversions) through one select loop.
// It exits when s.closeCh is closed, then marks the async task done.
func (s *Scorch) mainLoop() {
	var epochWatchers []*epochWatcher
OUTER:
	for {
		atomic.AddUint64(&s.stats.TotIntroduceLoop, 1)

		select {
		case <-s.closeCh:
			break OUTER

		case epochWatcher := <-s.introducerNotifier:
			epochWatchers = append(epochWatchers, epochWatcher)

		case nextMerge := <-s.merges:
			s.introduceMerge(nextMerge)

		case next := <-s.introductions:
			err := s.introduceSegment(next)
			if err != nil {
				continue OUTER
			}

		case persist := <-s.persists:
			s.introducePersist(persist)

		case revertTo := <-s.revertToSnapshots:
			err := s.revertToSnapshot(revertTo)
			if err != nil {
				continue OUTER
			}
		}

		// After handling an event, read the current root epoch and
		// notify any watchers whose target epoch has been passed.
		var epochCurr uint64
		s.rootLock.RLock()
		if s.root != nil {
			epochCurr = s.root.epoch
		}
		s.rootLock.RUnlock()

		var epochWatchersNext []*epochWatcher
		for _, w := range epochWatchers {
			if w.epoch < epochCurr {
				// epoch advanced past this watcher's target; signal it
				close(w.notifyCh)
			} else {
				epochWatchersNext = append(epochWatchersNext, w)
			}
		}
		epochWatchers = epochWatchersNext
	}

	s.asyncTasks.Done()
}
// introduceSegment builds a new root IndexSnapshot that applies the
// batch's deletions to every existing segment and appends the newly
// built segment (next.data), then atomically swaps it in as s.root.
// Runs only on the introducer goroutine (mainLoop).
//
// On failure the error is sent on next.applied (then closed) and
// returned; on success next.applied is simply closed.
func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
	atomic.AddUint64(&s.stats.TotIntroduceSegmentBeg, 1)
	defer atomic.AddUint64(&s.stats.TotIntroduceSegmentEnd, 1)

	s.rootLock.RLock()
	root := s.root
	root.AddRef() // hold the current root alive while building its successor
	s.rootLock.RUnlock()

	defer func() { _ = root.DecRef() }()

	nsegs := len(root.segment)

	// prepare new index snapshot (capacity +1 for the incoming segment)
	newSnapshot := &IndexSnapshot{
		parent:   s,
		segment:  make([]*SegmentSnapshot, 0, nsegs+1),
		offsets:  make([]uint64, 0, nsegs+1),
		internal: make(map[string][]byte, len(root.internal)),
		refs:     1,
		creator:  "introduceSegment",
	}

	// iterate through current segments
	var running uint64
	var docsToPersistCount, memSegments, fileSegments uint64
	for i := range root.segment {
		// see if optimistic work included this segment
		delta, ok := next.obsoletes[root.segment[i].id]
		if !ok {
			// not precomputed; resolve the batch's doc ids against this segment
			var err error
			delta, err = root.segment[i].segment.DocNumbers(next.ids)
			if err != nil {
				next.applied <- fmt.Errorf("error computing doc numbers: %v", err)
				close(next.applied)
				_ = newSnapshot.DecRef()
				return err
			}
		}

		newss := &SegmentSnapshot{
			id:         root.segment[i].id,
			segment:    root.segment[i].segment,
			cachedDocs: root.segment[i].cachedDocs,
			creator:    root.segment[i].creator,
		}

		// apply new obsoletions: union the fresh deletions with any
		// already recorded for this segment
		if root.segment[i].deleted == nil {
			newss.deleted = delta
		} else {
			newss.deleted = roaring.Or(root.segment[i].deleted, delta)
		}
		if newss.deleted.IsEmpty() {
			newss.deleted = nil
		}

		// check for live size before copying; fully-deleted segments are dropped
		if newss.LiveSize() > 0 {
			newSnapshot.segment = append(newSnapshot.segment, newss)
			// the new snapshot shares this segment, so take a ref
			root.segment[i].segment.AddRef()
			newSnapshot.offsets = append(newSnapshot.offsets, running)
			running += newss.segment.Count()
		}

		if isMemorySegment(root.segment[i]) {
			docsToPersistCount += root.segment[i].Count()
			memSegments++
		} else {
			fileSegments++
		}
	}

	atomic.StoreUint64(&s.stats.TotItemsToPersist, docsToPersistCount)
	atomic.StoreUint64(&s.stats.TotMemorySegmentsAtRoot, memSegments)
	atomic.StoreUint64(&s.stats.TotFileSegmentsAtRoot, fileSegments)

	// append new segment, if any, to end of the new index snapshot
	if next.data != nil {
		newSegmentSnapshot := &SegmentSnapshot{
			id:         next.id,
			segment:    next.data, // take ownership of next.data's ref-count
			cachedDocs: &cachedDocs{cache: nil},
			creator:    "introduceSegment",
		}
		newSnapshot.segment = append(newSnapshot.segment, newSegmentSnapshot)
		newSnapshot.offsets = append(newSnapshot.offsets, running)

		// increment numItemsIntroduced which tracks the number of items
		// queued for persistence.
		atomic.AddUint64(&s.stats.TotIntroducedItems, newSegmentSnapshot.Count())
		atomic.AddUint64(&s.stats.TotIntroducedSegmentsBatch, 1)
	}

	// copy old values
	for key, oldVal := range root.internal {
		newSnapshot.internal[key] = oldVal
	}
	// set new values and apply deletes (a nil value deletes the key)
	for key, newVal := range next.internal {
		if newVal != nil {
			newSnapshot.internal[key] = newVal
		} else {
			delete(newSnapshot.internal, key)
		}
	}

	newSnapshot.updateSize()

	s.rootLock.Lock()
	if next.persisted != nil {
		s.rootPersisted = append(s.rootPersisted, next.persisted)
	}
	if next.persistedCallback != nil {
		s.persistedCallbacks = append(s.persistedCallbacks, next.persistedCallback)
	}
	// swap in new index snapshot
	newSnapshot.epoch = s.nextSnapshotEpoch
	s.nextSnapshotEpoch++
	rootPrev := s.root
	s.root = newSnapshot
	atomic.StoreUint64(&s.stats.CurRootEpoch, s.root.epoch)
	// release lock
	s.rootLock.Unlock()

	// release the ref the old root held as s.root
	if rootPrev != nil {
		_ = rootPrev.DecRef()
	}

	close(next.applied)

	return nil
}
// introducePersist builds a new root IndexSnapshot in which any segment
// that has a persisted (file-backed) replacement in persist.persisted is
// swapped for that replacement, then installs it as s.root. Runs only on
// the introducer goroutine (mainLoop). Closes persist.applied when done.
func (s *Scorch) introducePersist(persist *persistIntroduction) {
	atomic.AddUint64(&s.stats.TotIntroducePersistBeg, 1)
	defer atomic.AddUint64(&s.stats.TotIntroducePersistEnd, 1)

	// reserve the next epoch up front, under the write lock
	s.rootLock.Lock()
	root := s.root
	root.AddRef() // hold the current root alive while building its successor
	nextSnapshotEpoch := s.nextSnapshotEpoch
	s.nextSnapshotEpoch++
	s.rootLock.Unlock()

	defer func() { _ = root.DecRef() }()

	newIndexSnapshot := &IndexSnapshot{
		parent:   s,
		epoch:    nextSnapshotEpoch,
		segment:  make([]*SegmentSnapshot, len(root.segment)),
		offsets:  make([]uint64, len(root.offsets)),
		internal: make(map[string][]byte, len(root.internal)),
		refs:     1,
		creator:  "introducePersist",
	}

	var docsToPersistCount, memSegments, fileSegments uint64
	for i, segmentSnapshot := range root.segment {
		// see if this segment has been replaced
		if replacement, ok := persist.persisted[segmentSnapshot.id]; ok {
			newSegmentSnapshot := &SegmentSnapshot{
				id:         segmentSnapshot.id,
				segment:    replacement, // take ownership of the replacement's ref
				deleted:    segmentSnapshot.deleted,
				cachedDocs: segmentSnapshot.cachedDocs,
				creator:    "introducePersist",
			}
			newIndexSnapshot.segment[i] = newSegmentSnapshot
			delete(persist.persisted, segmentSnapshot.id)

			// update items persisted incase of a new segment snapshot
			atomic.AddUint64(&s.stats.TotPersistedItems, newSegmentSnapshot.Count())
			atomic.AddUint64(&s.stats.TotPersistedSegments, 1)
			fileSegments++
		} else {
			// unchanged segment; share it and take a ref
			newIndexSnapshot.segment[i] = root.segment[i]
			newIndexSnapshot.segment[i].segment.AddRef()

			if isMemorySegment(root.segment[i]) {
				docsToPersistCount += root.segment[i].Count()
				memSegments++
			} else {
				fileSegments++
			}
		}
		newIndexSnapshot.offsets[i] = root.offsets[i]
	}

	// carry over internal k/v data unchanged
	for k, v := range root.internal {
		newIndexSnapshot.internal[k] = v
	}

	atomic.StoreUint64(&s.stats.TotItemsToPersist, docsToPersistCount)
	atomic.StoreUint64(&s.stats.TotMemorySegmentsAtRoot, memSegments)
	atomic.StoreUint64(&s.stats.TotFileSegmentsAtRoot, fileSegments)

	newIndexSnapshot.updateSize()

	// swap in the new snapshot and drop the old root's ref
	s.rootLock.Lock()
	rootPrev := s.root
	s.root = newIndexSnapshot
	atomic.StoreUint64(&s.stats.CurRootEpoch, s.root.epoch)
	s.rootLock.Unlock()

	if rootPrev != nil {
		_ = rootPrev.DecRef()
	}

	close(persist.applied)
}
// introduceMerge builds a new root IndexSnapshot in which the segments
// recorded in nextMerge.old are replaced by the single merged segment
// nextMerge.new, carrying forward any deletions that occurred after the
// merge began. Runs only on the introducer goroutine (mainLoop).
// Sends the new snapshot (with an extra ref) on nextMerge.notify, then
// closes it.
func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
	atomic.AddUint64(&s.stats.TotIntroduceMergeBeg, 1)
	defer atomic.AddUint64(&s.stats.TotIntroduceMergeEnd, 1)

	s.rootLock.RLock()
	root := s.root
	root.AddRef() // hold the current root alive while building its successor
	s.rootLock.RUnlock()

	defer func() { _ = root.DecRef() }()

	newSnapshot := &IndexSnapshot{
		parent:   s,
		internal: root.internal, // shared; merges don't touch internal k/v
		refs:     1,
		creator:  "introduceMerge",
	}

	// iterate through current segments
	newSegmentDeleted := roaring.NewBitmap()
	var running, docsToPersistCount, memSegments, fileSegments uint64
	for i := range root.segment {
		segmentID := root.segment[i].id
		if segSnapAtMerge, ok := nextMerge.old[segmentID]; ok {
			// this segment is going away, see if anything else was deleted since we started the merge
			if segSnapAtMerge != nil && root.segment[i].deleted != nil {
				// assume all these deletes are new
				deletedSince := root.segment[i].deleted
				// if we already knew about some of them, remove
				if segSnapAtMerge.deleted != nil {
					deletedSince = roaring.AndNot(root.segment[i].deleted, segSnapAtMerge.deleted)
				}
				// translate the late deletions into the merged segment's doc numbers
				deletedSinceItr := deletedSince.Iterator()
				for deletedSinceItr.HasNext() {
					oldDocNum := deletedSinceItr.Next()
					newDocNum := nextMerge.oldNewDocNums[segmentID][oldDocNum]
					newSegmentDeleted.Add(uint32(newDocNum))
				}
			}
			// clean up the old segment map to figure out the
			// obsolete segments wrt root in meantime, whatever
			// segments left behind in old map after processing
			// the root segments would be the obsolete segment set
			delete(nextMerge.old, segmentID)

		} else if root.segment[i].LiveSize() > 0 {
			// this segment is staying
			newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
				id:         root.segment[i].id,
				segment:    root.segment[i].segment,
				deleted:    root.segment[i].deleted,
				cachedDocs: root.segment[i].cachedDocs,
				creator:    root.segment[i].creator,
			})
			root.segment[i].segment.AddRef() // new snapshot shares this segment
			newSnapshot.offsets = append(newSnapshot.offsets, running)
			running += root.segment[i].segment.Count()

			if isMemorySegment(root.segment[i]) {
				docsToPersistCount += root.segment[i].Count()
				memSegments++
			} else {
				fileSegments++
			}
		}
	}

	// before the newMerge introduction, need to clean the newly
	// merged segment wrt the current root segments, hence
	// applying the obsolete segment contents to newly merged segment
	for segID, ss := range nextMerge.old {
		obsoleted := ss.DocNumbersLive()
		if obsoleted != nil {
			obsoletedIter := obsoleted.Iterator()
			for obsoletedIter.HasNext() {
				oldDocNum := obsoletedIter.Next()
				newDocNum := nextMerge.oldNewDocNums[segID][oldDocNum]
				newSegmentDeleted.Add(uint32(newDocNum))
			}
		}
	}
	// In case where all the docs in the newly merged segment getting
	// deleted by the time we reach here, can skip the introduction.
	if nextMerge.new != nil &&
		nextMerge.new.Count() > newSegmentDeleted.GetCardinality() {
		// put new segment at end
		newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
			id:         nextMerge.id,
			segment:    nextMerge.new, // take ownership for nextMerge.new's ref-count
			deleted:    newSegmentDeleted,
			cachedDocs: &cachedDocs{cache: nil},
			creator:    "introduceMerge",
		})
		newSnapshot.offsets = append(newSnapshot.offsets, running)
		atomic.AddUint64(&s.stats.TotIntroducedSegmentsMerge, 1)

		// classify the merged segment: in-memory (SegmentBase) vs file-backed
		switch nextMerge.new.(type) {
		case *zap.SegmentBase:
			docsToPersistCount += nextMerge.new.Count() - newSegmentDeleted.GetCardinality()
			memSegments++
		case *zap.Segment:
			fileSegments++
		}
	}

	atomic.StoreUint64(&s.stats.TotItemsToPersist, docsToPersistCount)
	atomic.StoreUint64(&s.stats.TotMemorySegmentsAtRoot, memSegments)
	atomic.StoreUint64(&s.stats.TotFileSegmentsAtRoot, fileSegments)

	newSnapshot.AddRef() // 1 ref for the nextMerge.notify response

	newSnapshot.updateSize()

	s.rootLock.Lock()
	// swap in new index snapshot
	newSnapshot.epoch = s.nextSnapshotEpoch
	s.nextSnapshotEpoch++
	rootPrev := s.root
	s.root = newSnapshot
	atomic.StoreUint64(&s.stats.CurRootEpoch, s.root.epoch)
	// release lock
	s.rootLock.Unlock()

	if rootPrev != nil {
		_ = rootPrev.DecRef()
	}

	// notify requester that we incorporated this
	nextMerge.notify <- newSnapshot
	close(nextMerge.notify)
}
  386. func (s *Scorch) revertToSnapshot(revertTo *snapshotReversion) error {
  387. atomic.AddUint64(&s.stats.TotIntroduceRevertBeg, 1)
  388. defer atomic.AddUint64(&s.stats.TotIntroduceRevertEnd, 1)
  389. if revertTo.snapshot == nil {
  390. err := fmt.Errorf("Cannot revert to a nil snapshot")
  391. revertTo.applied <- err
  392. return err
  393. }
  394. // acquire lock
  395. s.rootLock.Lock()
  396. // prepare a new index snapshot, based on next snapshot
  397. newSnapshot := &IndexSnapshot{
  398. parent: s,
  399. segment: make([]*SegmentSnapshot, len(revertTo.snapshot.segment)),
  400. offsets: revertTo.snapshot.offsets,
  401. internal: revertTo.snapshot.internal,
  402. epoch: s.nextSnapshotEpoch,
  403. refs: 1,
  404. creator: "revertToSnapshot",
  405. }
  406. s.nextSnapshotEpoch++
  407. var docsToPersistCount, memSegments, fileSegments uint64
  408. // iterate through segments
  409. for i, segmentSnapshot := range revertTo.snapshot.segment {
  410. newSnapshot.segment[i] = &SegmentSnapshot{
  411. id: segmentSnapshot.id,
  412. segment: segmentSnapshot.segment,
  413. deleted: segmentSnapshot.deleted,
  414. cachedDocs: segmentSnapshot.cachedDocs,
  415. creator: segmentSnapshot.creator,
  416. }
  417. newSnapshot.segment[i].segment.AddRef()
  418. // remove segment from ineligibleForRemoval map
  419. filename := zapFileName(segmentSnapshot.id)
  420. delete(s.ineligibleForRemoval, filename)
  421. if isMemorySegment(segmentSnapshot) {
  422. docsToPersistCount += segmentSnapshot.Count()
  423. memSegments++
  424. } else {
  425. fileSegments++
  426. }
  427. }
  428. atomic.StoreUint64(&s.stats.TotItemsToPersist, docsToPersistCount)
  429. atomic.StoreUint64(&s.stats.TotMemorySegmentsAtRoot, memSegments)
  430. atomic.StoreUint64(&s.stats.TotFileSegmentsAtRoot, fileSegments)
  431. if revertTo.persisted != nil {
  432. s.rootPersisted = append(s.rootPersisted, revertTo.persisted)
  433. }
  434. newSnapshot.updateSize()
  435. // swap in new snapshot
  436. rootPrev := s.root
  437. s.root = newSnapshot
  438. atomic.StoreUint64(&s.stats.CurRootEpoch, s.root.epoch)
  439. // release lock
  440. s.rootLock.Unlock()
  441. if rootPrev != nil {
  442. _ = rootPrev.DecRef()
  443. }
  444. close(revertTo.applied)
  445. return nil
  446. }
  447. func isMemorySegment(s *SegmentSnapshot) bool {
  448. switch s.segment.(type) {
  449. case *zap.SegmentBase:
  450. return true
  451. default:
  452. return false
  453. }
  454. }