You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

introducer.go 12KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449
  1. // Copyright (c) 2017 Couchbase, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package scorch
  15. import (
  16. "fmt"
  17. "sync/atomic"
  18. "github.com/RoaringBitmap/roaring"
  19. "github.com/blevesearch/bleve/index"
  20. "github.com/blevesearch/bleve/index/scorch/segment"
  21. )
// segmentIntroduction describes a new segment (and/or a set of deletions)
// to be folded into the root IndexSnapshot by the introducer goroutine.
type segmentIntroduction struct {
	id        uint64                     // unique id assigned to the incoming segment
	data      segment.Segment            // the incoming segment; may be nil for a delete-only batch
	obsoletes map[uint64]*roaring.Bitmap // precomputed deletions per existing segment id (optimistic work)
	ids       []string                   // doc ids updated/deleted by this batch
	internal  map[string][]byte          // internal k/v updates; a nil value deletes the key

	applied           chan error          // closed once the introduction has been applied to the root
	persisted         chan error          // optional: registered on s.rootPersisted for persistence notification
	persistedCallback index.BatchCallback // optional: registered on s.persistedCallbacks
}
// persistIntroduction carries persisted (on-disk) replacements for
// in-memory segments, keyed by segment id.
type persistIntroduction struct {
	persisted map[uint64]segment.Segment // segment id -> persisted replacement segment
	applied   notificationChan           // closed once the replacements are swapped into the root
}
// epochWatcher is a request to be notified (by closing notifyCh) once the
// root snapshot's epoch has advanced strictly beyond the given epoch.
type epochWatcher struct {
	epoch    uint64
	notifyCh notificationChan
}
// snapshotReversion requests that a previous IndexSnapshot be reinstated
// as the root. NOTE(review): no handler for this type is visible in this
// file's mainLoop — presumably consumed elsewhere; verify against callers.
type snapshotReversion struct {
	snapshot  *IndexSnapshot
	applied   chan error
	persisted chan error
}
// mainLoop is the introducer goroutine: it serializes every mutation of
// s.root (segment introductions, merges, persist swaps) through a single
// select loop, and wakes epoch watchers whose target epoch has passed.
func (s *Scorch) mainLoop() {
	var epochWatchers []*epochWatcher
OUTER:
	for {
		atomic.AddUint64(&s.stats.TotIntroduceLoop, 1)

		select {
		case <-s.closeCh:
			break OUTER
		case epochWatcher := <-s.introducerNotifier:
			epochWatchers = append(epochWatchers, epochWatcher)
		case nextMerge := <-s.merges:
			s.introduceMerge(nextMerge)
		case next := <-s.introductions:
			err := s.introduceSegment(next)
			if err != nil {
				// the error was already delivered on next.applied;
				// skip the epoch-watcher sweep and keep serving requests
				continue OUTER
			}
		case persist := <-s.persists:
			s.introducePersist(persist)
		}

		// read the current root epoch under the lock
		var epochCurr uint64
		s.rootLock.RLock()
		if s.root != nil {
			epochCurr = s.root.epoch
		}
		s.rootLock.RUnlock()

		// wake watchers whose epoch has passed; retain the rest
		var epochWatchersNext []*epochWatcher
		for _, w := range epochWatchers {
			if w.epoch < epochCurr {
				close(w.notifyCh)
			} else {
				epochWatchersNext = append(epochWatchersNext, w)
			}
		}
		epochWatchers = epochWatchersNext
	}

	s.asyncTasks.Done()
}
// introduceSegment builds a new root IndexSnapshot that carries forward all
// still-live existing segments (with this batch's deletions applied) plus
// the newly introduced segment (if any), applies internal k/v updates, and
// atomically swaps the new snapshot in as s.root.
//
// A non-nil error is returned (and also delivered on next.applied) only if
// the doc numbers for the batch's ids could not be computed; on success
// next.applied is simply closed.
func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
	atomic.AddUint64(&s.stats.TotIntroduceSegmentBeg, 1)
	defer atomic.AddUint64(&s.stats.TotIntroduceSegmentEnd, 1)

	// pin the current root for the duration of this introduction
	s.rootLock.RLock()
	root := s.root
	root.AddRef()
	s.rootLock.RUnlock()

	defer func() { _ = root.DecRef() }()

	nsegs := len(root.segment)

	// prepare new index snapshot
	newSnapshot := &IndexSnapshot{
		parent:   s,
		segment:  make([]*SegmentSnapshot, 0, nsegs+1),
		offsets:  make([]uint64, 0, nsegs+1),
		internal: make(map[string][]byte, len(root.internal)),
		refs:     1,
		creator:  "introduceSegment",
	}

	// iterate through current segments
	var running uint64
	var docsToPersistCount, memSegments, fileSegments uint64
	for i := range root.segment {
		// see if optimistic work included this segment
		delta, ok := next.obsoletes[root.segment[i].id]
		if !ok {
			var err error
			delta, err = root.segment[i].segment.DocNumbers(next.ids)
			if err != nil {
				next.applied <- fmt.Errorf("error computing doc numbers: %v", err)
				close(next.applied)
				_ = newSnapshot.DecRef()
				return err
			}
		}

		newss := &SegmentSnapshot{
			id:         root.segment[i].id,
			segment:    root.segment[i].segment,
			cachedDocs: root.segment[i].cachedDocs,
			creator:    root.segment[i].creator,
		}

		// apply new obsoletions
		if root.segment[i].deleted == nil {
			newss.deleted = delta
		} else {
			newss.deleted = roaring.Or(root.segment[i].deleted, delta)
		}
		if newss.deleted.IsEmpty() {
			newss.deleted = nil
		}

		// check for live size before copying; fully-deleted segments
		// are dropped from the new snapshot (no AddRef, no offset)
		if newss.LiveSize() > 0 {
			newSnapshot.segment = append(newSnapshot.segment, newss)
			root.segment[i].segment.AddRef()
			newSnapshot.offsets = append(newSnapshot.offsets, running)
			running += newss.segment.Count()
		}

		if isMemorySegment(root.segment[i]) {
			docsToPersistCount += root.segment[i].Count()
			memSegments++
		} else {
			fileSegments++
		}
	}

	atomic.StoreUint64(&s.stats.TotItemsToPersist, docsToPersistCount)
	atomic.StoreUint64(&s.stats.TotMemorySegmentsAtRoot, memSegments)
	atomic.StoreUint64(&s.stats.TotFileSegmentsAtRoot, fileSegments)

	// append new segment, if any, to end of the new index snapshot
	if next.data != nil {
		newSegmentSnapshot := &SegmentSnapshot{
			id:         next.id,
			segment:    next.data, // take ownership of next.data's ref-count
			cachedDocs: &cachedDocs{cache: nil},
			creator:    "introduceSegment",
		}
		newSnapshot.segment = append(newSnapshot.segment, newSegmentSnapshot)
		newSnapshot.offsets = append(newSnapshot.offsets, running)

		// increment numItemsIntroduced which tracks the number of items
		// queued for persistence.
		atomic.AddUint64(&s.stats.TotIntroducedItems, newSegmentSnapshot.Count())
		atomic.AddUint64(&s.stats.TotIntroducedSegmentsBatch, 1)
	}

	// copy old values
	for key, oldVal := range root.internal {
		newSnapshot.internal[key] = oldVal
	}

	// set new values and apply deletes (nil value deletes the key)
	for key, newVal := range next.internal {
		if newVal != nil {
			newSnapshot.internal[key] = newVal
		} else {
			delete(newSnapshot.internal, key)
		}
	}

	newSnapshot.updateSize()

	s.rootLock.Lock()
	if next.persisted != nil {
		s.rootPersisted = append(s.rootPersisted, next.persisted)
	}
	if next.persistedCallback != nil {
		s.persistedCallbacks = append(s.persistedCallbacks, next.persistedCallback)
	}
	// swap in new index snapshot
	newSnapshot.epoch = s.nextSnapshotEpoch
	s.nextSnapshotEpoch++
	rootPrev := s.root
	s.root = newSnapshot
	atomic.StoreUint64(&s.stats.CurRootEpoch, s.root.epoch)
	// release lock
	s.rootLock.Unlock()

	// drop the previous root's reference now that it is unlinked
	if rootPrev != nil {
		_ = rootPrev.DecRef()
	}

	close(next.applied)

	return nil
}
// introducePersist builds a new root IndexSnapshot in which each segment
// found in persist.persisted is replaced by its on-disk counterpart
// (keeping the existing deletions and cached docs), then installs the new
// snapshot as s.root and closes persist.applied.
func (s *Scorch) introducePersist(persist *persistIntroduction) {
	atomic.AddUint64(&s.stats.TotIntroducePersistBeg, 1)
	defer atomic.AddUint64(&s.stats.TotIntroducePersistEnd, 1)

	// pin the current root and reserve the next epoch under the write lock
	s.rootLock.Lock()
	root := s.root
	root.AddRef()
	nextSnapshotEpoch := s.nextSnapshotEpoch
	s.nextSnapshotEpoch++
	s.rootLock.Unlock()

	defer func() { _ = root.DecRef() }()

	newIndexSnapshot := &IndexSnapshot{
		parent:   s,
		epoch:    nextSnapshotEpoch,
		segment:  make([]*SegmentSnapshot, len(root.segment)),
		offsets:  make([]uint64, len(root.offsets)),
		internal: make(map[string][]byte, len(root.internal)),
		refs:     1,
		creator:  "introducePersist",
	}

	var docsToPersistCount, memSegments, fileSegments uint64
	for i, segmentSnapshot := range root.segment {
		// see if this segment has been replaced
		if replacement, ok := persist.persisted[segmentSnapshot.id]; ok {
			newSegmentSnapshot := &SegmentSnapshot{
				id:         segmentSnapshot.id,
				segment:    replacement,
				deleted:    segmentSnapshot.deleted,
				cachedDocs: segmentSnapshot.cachedDocs,
				creator:    "introducePersist",
			}
			newIndexSnapshot.segment[i] = newSegmentSnapshot
			delete(persist.persisted, segmentSnapshot.id)

			// update items persisted incase of a new segment snapshot
			atomic.AddUint64(&s.stats.TotPersistedItems, newSegmentSnapshot.Count())
			atomic.AddUint64(&s.stats.TotPersistedSegments, 1)
			fileSegments++
		} else {
			// not replaced: carry the existing snapshot forward with a new ref
			newIndexSnapshot.segment[i] = root.segment[i]
			newIndexSnapshot.segment[i].segment.AddRef()

			if isMemorySegment(root.segment[i]) {
				docsToPersistCount += root.segment[i].Count()
				memSegments++
			} else {
				fileSegments++
			}
		}
		newIndexSnapshot.offsets[i] = root.offsets[i]
	}

	for k, v := range root.internal {
		newIndexSnapshot.internal[k] = v
	}

	atomic.StoreUint64(&s.stats.TotItemsToPersist, docsToPersistCount)
	atomic.StoreUint64(&s.stats.TotMemorySegmentsAtRoot, memSegments)
	atomic.StoreUint64(&s.stats.TotFileSegmentsAtRoot, fileSegments)

	newIndexSnapshot.updateSize()

	// swap in the new root and release the previous one
	s.rootLock.Lock()
	rootPrev := s.root
	s.root = newIndexSnapshot
	atomic.StoreUint64(&s.stats.CurRootEpoch, s.root.epoch)
	s.rootLock.Unlock()

	if rootPrev != nil {
		_ = rootPrev.DecRef()
	}

	close(persist.applied)
}
// introduceMerge installs the result of a background merge: the merged-away
// source segments are dropped from the root snapshot and the single merged
// segment is appended in their place, with any deletions that arrived after
// the merge started remapped onto the merged segment via oldNewDocNums.
//
// The introducer should definitely handle the segmentMerge.notify
// channel before exiting the introduceMerge.
func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
	atomic.AddUint64(&s.stats.TotIntroduceMergeBeg, 1)
	defer atomic.AddUint64(&s.stats.TotIntroduceMergeEnd, 1)

	// pin the current root for the duration of this introduction
	s.rootLock.RLock()
	root := s.root
	root.AddRef()
	s.rootLock.RUnlock()

	defer func() { _ = root.DecRef() }()

	newSnapshot := &IndexSnapshot{
		parent:   s,
		internal: root.internal,
		refs:     1,
		creator:  "introduceMerge",
	}

	// iterate through current segments
	newSegmentDeleted := roaring.NewBitmap()
	var running, docsToPersistCount, memSegments, fileSegments uint64
	for i := range root.segment {
		segmentID := root.segment[i].id
		if segSnapAtMerge, ok := nextMerge.old[segmentID]; ok {
			// this segment is going away, see if anything else was deleted since we started the merge
			if segSnapAtMerge != nil && root.segment[i].deleted != nil {
				// assume all these deletes are new
				deletedSince := root.segment[i].deleted
				// if we already knew about some of them, remove
				if segSnapAtMerge.deleted != nil {
					deletedSince = roaring.AndNot(root.segment[i].deleted, segSnapAtMerge.deleted)
				}
				// remap each late deletion onto its doc number in the merged segment
				deletedSinceItr := deletedSince.Iterator()
				for deletedSinceItr.HasNext() {
					oldDocNum := deletedSinceItr.Next()
					newDocNum := nextMerge.oldNewDocNums[segmentID][oldDocNum]
					newSegmentDeleted.Add(uint32(newDocNum))
				}
			}
			// clean up the old segment map to figure out the
			// obsolete segments wrt root in meantime, whatever
			// segments left behind in old map after processing
			// the root segments would be the obsolete segment set
			delete(nextMerge.old, segmentID)
		} else if root.segment[i].LiveSize() > 0 {
			// this segment is staying
			newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
				id:         root.segment[i].id,
				segment:    root.segment[i].segment,
				deleted:    root.segment[i].deleted,
				cachedDocs: root.segment[i].cachedDocs,
				creator:    root.segment[i].creator,
			})
			root.segment[i].segment.AddRef()
			newSnapshot.offsets = append(newSnapshot.offsets, running)
			running += root.segment[i].segment.Count()

			if isMemorySegment(root.segment[i]) {
				docsToPersistCount += root.segment[i].Count()
				memSegments++
			} else {
				fileSegments++
			}
		}
	}

	// before the newMerge introduction, need to clean the newly
	// merged segment wrt the current root segments, hence
	// applying the obsolete segment contents to newly merged segment
	for segID, ss := range nextMerge.old {
		obsoleted := ss.DocNumbersLive()
		if obsoleted != nil {
			obsoletedIter := obsoleted.Iterator()
			for obsoletedIter.HasNext() {
				oldDocNum := obsoletedIter.Next()
				newDocNum := nextMerge.oldNewDocNums[segID][oldDocNum]
				newSegmentDeleted.Add(uint32(newDocNum))
			}
		}
	}
	// In case where all the docs in the newly merged segment getting
	// deleted by the time we reach here, can skip the introduction.
	if nextMerge.new != nil &&
		nextMerge.new.Count() > newSegmentDeleted.GetCardinality() {
		// put new segment at end
		newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
			id:         nextMerge.id,
			segment:    nextMerge.new, // take ownership for nextMerge.new's ref-count
			deleted:    newSegmentDeleted,
			cachedDocs: &cachedDocs{cache: nil},
			creator:    "introduceMerge",
		})
		newSnapshot.offsets = append(newSnapshot.offsets, running)
		atomic.AddUint64(&s.stats.TotIntroducedSegmentsMerge, 1)

		switch nextMerge.new.(type) {
		case segment.PersistedSegment:
			fileSegments++
		default:
			docsToPersistCount += nextMerge.new.Count() - newSegmentDeleted.GetCardinality()
			memSegments++
		}
	}

	atomic.StoreUint64(&s.stats.TotItemsToPersist, docsToPersistCount)
	atomic.StoreUint64(&s.stats.TotMemorySegmentsAtRoot, memSegments)
	atomic.StoreUint64(&s.stats.TotFileSegmentsAtRoot, fileSegments)

	newSnapshot.AddRef() // 1 ref for the nextMerge.notify response

	newSnapshot.updateSize()

	s.rootLock.Lock()
	// swap in new index snapshot
	newSnapshot.epoch = s.nextSnapshotEpoch
	s.nextSnapshotEpoch++
	rootPrev := s.root
	s.root = newSnapshot
	atomic.StoreUint64(&s.stats.CurRootEpoch, s.root.epoch)
	// release lock
	s.rootLock.Unlock()

	if rootPrev != nil {
		_ = rootPrev.DecRef()
	}

	// notify requester that we incorporated this
	nextMerge.notify <- newSnapshot
	close(nextMerge.notify)
}
  382. func isMemorySegment(s *SegmentSnapshot) bool {
  383. switch s.segment.(type) {
  384. case segment.PersistedSegment:
  385. return false
  386. default:
  387. return true
  388. }
  389. }