You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

introducer.go 9.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. // Copyright (c) 2017 Couchbase, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package scorch
  15. import (
  16. "fmt"
  17. "sync/atomic"
  18. "github.com/RoaringBitmap/roaring"
  19. "github.com/blevesearch/bleve/index/scorch/segment"
  20. )
// segmentIntroduction carries a new segment — and the deletions it
// implies against the segments already in the root snapshot — from an
// indexing batch to the introducer goroutine (see mainLoop /
// introduceSegment).
type segmentIntroduction struct {
	id        uint64                     // id assigned to the incoming segment
	data      segment.Segment            // new segment data; may be nil, in which case introduceSegment appends no new segment
	obsoletes map[uint64]*roaring.Bitmap // precomputed obsoleted doc numbers, keyed by existing segment id
	ids       []string                   // doc ids in this batch; used to compute obsoletes for segments missing from the map above
	internal  map[string][]byte          // internal k/v updates; a nil value deletes the key

	applied   chan error // receives an error then is closed on failure, or is just closed once applied
	persisted chan error // if non-nil, appended to s.rootPersisted to be resolved at persistence time
}
// epochWatcher registers interest in the root snapshot's epoch
// advancing beyond the given epoch; mainLoop closes notifyCh once the
// current root epoch is greater than epoch.
type epochWatcher struct {
	epoch    uint64
	notifyCh notificationChan
}
// snapshotReversion asks the introducer to install a previous
// IndexSnapshot as the new root (see revertToSnapshot).
type snapshotReversion struct {
	snapshot  *IndexSnapshot // the snapshot to revert to; must be non-nil
	applied   chan error     // receives an error on failure, or is closed once the reversion is applied
	persisted chan error     // if non-nil, appended to s.rootPersisted to be resolved at persistence time
}
  39. func (s *Scorch) mainLoop() {
  40. var epochWatchers []*epochWatcher
  41. OUTER:
  42. for {
  43. select {
  44. case <-s.closeCh:
  45. break OUTER
  46. case epochWatcher := <-s.introducerNotifier:
  47. epochWatchers = append(epochWatchers, epochWatcher)
  48. case nextMerge := <-s.merges:
  49. s.introduceMerge(nextMerge)
  50. case next := <-s.introductions:
  51. err := s.introduceSegment(next)
  52. if err != nil {
  53. continue OUTER
  54. }
  55. case revertTo := <-s.revertToSnapshots:
  56. err := s.revertToSnapshot(revertTo)
  57. if err != nil {
  58. continue OUTER
  59. }
  60. }
  61. var epochCurr uint64
  62. s.rootLock.RLock()
  63. if s.root != nil {
  64. epochCurr = s.root.epoch
  65. }
  66. s.rootLock.RUnlock()
  67. var epochWatchersNext []*epochWatcher
  68. for _, w := range epochWatchers {
  69. if w.epoch < epochCurr {
  70. close(w.notifyCh)
  71. } else {
  72. epochWatchersNext = append(epochWatchersNext, w)
  73. }
  74. }
  75. epochWatchers = epochWatchersNext
  76. }
  77. s.asyncTasks.Done()
  78. }
// introduceSegment builds a new root IndexSnapshot that contains all
// still-live existing segments (with next's deletions applied) plus
// next.data (if non-nil) appended at the end, then atomically swaps it
// in as s.root.  On success it closes next.applied and returns nil; on
// failure it sends the error on next.applied, closes it, and returns
// the error, leaving s.root unchanged.
func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
	// acquire lock
	s.rootLock.Lock()

	nsegs := len(s.root.segment)

	// prepare new index snapshot; capacity +1 leaves room for the
	// incoming segment
	newSnapshot := &IndexSnapshot{
		parent:   s,
		segment:  make([]*SegmentSnapshot, 0, nsegs+1),
		offsets:  make([]uint64, 0, nsegs+1),
		internal: make(map[string][]byte, len(s.root.internal)),
		epoch:    s.nextSnapshotEpoch,
		refs:     1,
	}
	s.nextSnapshotEpoch++

	// iterate through current segments
	var running uint64 // running doc-count offset for each retained segment
	for i := range s.root.segment {
		// see if optimistic work included this segment
		delta, ok := next.obsoletes[s.root.segment[i].id]
		if !ok {
			// obsoletes were not precomputed for this segment;
			// compute them now from the batch's doc ids
			var err error
			delta, err = s.root.segment[i].segment.DocNumbers(next.ids)
			if err != nil {
				// error path: release the lock, report on the applied
				// channel, and drop our ref on the partial snapshot
				s.rootLock.Unlock()
				next.applied <- fmt.Errorf("error computing doc numbers: %v", err)
				close(next.applied)
				_ = newSnapshot.DecRef()
				return err
			}
		}
		newss := &SegmentSnapshot{
			id:         s.root.segment[i].id,
			segment:    s.root.segment[i].segment,
			cachedDocs: s.root.segment[i].cachedDocs,
		}
		// apply new obsoletions
		if s.root.segment[i].deleted == nil {
			newss.deleted = delta
		} else {
			// union prior deletions with the new ones
			newss.deleted = roaring.Or(s.root.segment[i].deleted, delta)
		}
		// check for live size before copying; fully-deleted segments
		// are dropped from the new snapshot entirely
		if newss.LiveSize() > 0 {
			newSnapshot.segment = append(newSnapshot.segment, newss)
			s.root.segment[i].segment.AddRef() // new snapshot holds its own ref
			newSnapshot.offsets = append(newSnapshot.offsets, running)
			running += s.root.segment[i].Count()
		}
	}

	// append new segment, if any, to end of the new index snapshot
	if next.data != nil {
		newSegmentSnapshot := &SegmentSnapshot{
			id:         next.id,
			segment:    next.data, // take ownership of next.data's ref-count
			cachedDocs: &cachedDocs{cache: nil},
		}
		newSnapshot.segment = append(newSnapshot.segment, newSegmentSnapshot)
		newSnapshot.offsets = append(newSnapshot.offsets, running)

		// increment numItemsIntroduced which tracks the number of items
		// queued for persistence.
		atomic.AddUint64(&s.stats.numItemsIntroduced, newSegmentSnapshot.Count())
	}

	// copy old values
	for key, oldVal := range s.root.internal {
		newSnapshot.internal[key] = oldVal
	}
	// set new values and apply deletes (nil value means delete)
	for key, newVal := range next.internal {
		if newVal != nil {
			newSnapshot.internal[key] = newVal
		} else {
			delete(newSnapshot.internal, key)
		}
	}

	if next.persisted != nil {
		s.rootPersisted = append(s.rootPersisted, next.persisted)
	}

	// swap in new index snapshot
	rootPrev := s.root
	s.root = newSnapshot
	// release lock
	s.rootLock.Unlock()

	// drop the previous root's ref outside the lock
	if rootPrev != nil {
		_ = rootPrev.DecRef()
	}

	close(next.applied)

	return nil
}
// introduceMerge atomically replaces the old segments named in
// nextMerge.old with the single merged segment nextMerge.new, carrying
// forward any deletions that landed on the old segments after the merge
// started.  The resulting snapshot (with one extra ref for the caller)
// is sent on nextMerge.notify, which is then closed.
func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
	// acquire lock
	s.rootLock.Lock()

	// prepare new index snapshot: current segments, minus the merged-away
	// ones, plus one slot for the merged segment
	currSize := len(s.root.segment)
	newSize := currSize + 1 - len(nextMerge.old)

	// empty segments deletion: no merged segment will be appended
	if nextMerge.new == nil {
		newSize--
	}

	newSnapshot := &IndexSnapshot{
		parent:   s,
		segment:  make([]*SegmentSnapshot, 0, newSize),
		offsets:  make([]uint64, 0, newSize),
		internal: s.root.internal, // merges don't touch internal k/v, so share the map
		epoch:    s.nextSnapshotEpoch,
		refs:     1,
	}
	s.nextSnapshotEpoch++

	// iterate through current segments
	newSegmentDeleted := roaring.NewBitmap() // deletions to apply to the merged segment, in its doc-num space
	var running uint64
	for i := range s.root.segment {
		segmentID := s.root.segment[i].id
		if segSnapAtMerge, ok := nextMerge.old[segmentID]; ok {
			// this segment is going away, see if anything else was deleted since we started the merge
			if segSnapAtMerge != nil && s.root.segment[i].deleted != nil {
				// assume all these deletes are new
				deletedSince := s.root.segment[i].deleted
				// if we already knew about some of them, remove
				if segSnapAtMerge.deleted != nil {
					deletedSince = roaring.AndNot(s.root.segment[i].deleted, segSnapAtMerge.deleted)
				}
				// translate each late deletion into the merged
				// segment's doc-number space
				deletedSinceItr := deletedSince.Iterator()
				for deletedSinceItr.HasNext() {
					oldDocNum := deletedSinceItr.Next()
					newDocNum := nextMerge.oldNewDocNums[segmentID][oldDocNum]
					newSegmentDeleted.Add(uint32(newDocNum))
				}
			}
			// clean up the old segment map to figure out the
			// obsolete segments wrt root in meantime, whatever
			// segments left behind in old map after processing
			// the root segments would be the obsolete segment set
			delete(nextMerge.old, segmentID)
		} else if s.root.segment[i].LiveSize() > 0 {
			// this segment is staying
			newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
				id:         s.root.segment[i].id,
				segment:    s.root.segment[i].segment,
				deleted:    s.root.segment[i].deleted,
				cachedDocs: s.root.segment[i].cachedDocs,
			})
			s.root.segment[i].segment.AddRef() // new snapshot holds its own ref
			newSnapshot.offsets = append(newSnapshot.offsets, running)
			running += s.root.segment[i].Count()
		}
	}

	// before the newMerge introduction, need to clean the newly
	// merged segment wrt the current root segments, hence
	// applying the obsolete segment contents to newly merged segment
	for segID, ss := range nextMerge.old {
		obsoleted := ss.DocNumbersLive()
		if obsoleted != nil {
			obsoletedIter := obsoleted.Iterator()
			for obsoletedIter.HasNext() {
				oldDocNum := obsoletedIter.Next()
				newDocNum := nextMerge.oldNewDocNums[segID][oldDocNum]
				newSegmentDeleted.Add(uint32(newDocNum))
			}
		}
	}

	// In case where all the docs in the newly merged segment getting
	// deleted by the time we reach here, can skip the introduction.
	if nextMerge.new != nil &&
		nextMerge.new.Count() > newSegmentDeleted.GetCardinality() {
		// put new segment at end
		newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
			id:         nextMerge.id,
			segment:    nextMerge.new, // take ownership for nextMerge.new's ref-count
			deleted:    newSegmentDeleted,
			cachedDocs: &cachedDocs{cache: nil},
		})
		newSnapshot.offsets = append(newSnapshot.offsets, running)
	}

	newSnapshot.AddRef() // 1 ref for the nextMerge.notify response

	// swap in new segment
	rootPrev := s.root
	s.root = newSnapshot
	// release lock
	s.rootLock.Unlock()

	// drop the previous root's ref outside the lock
	if rootPrev != nil {
		_ = rootPrev.DecRef()
	}

	// notify requester that we incorporated this
	nextMerge.notify <- newSnapshot
	close(nextMerge.notify)
}
  265. func (s *Scorch) revertToSnapshot(revertTo *snapshotReversion) error {
  266. if revertTo.snapshot == nil {
  267. err := fmt.Errorf("Cannot revert to a nil snapshot")
  268. revertTo.applied <- err
  269. return err
  270. }
  271. // acquire lock
  272. s.rootLock.Lock()
  273. // prepare a new index snapshot, based on next snapshot
  274. newSnapshot := &IndexSnapshot{
  275. parent: s,
  276. segment: make([]*SegmentSnapshot, len(revertTo.snapshot.segment)),
  277. offsets: revertTo.snapshot.offsets,
  278. internal: revertTo.snapshot.internal,
  279. epoch: s.nextSnapshotEpoch,
  280. refs: 1,
  281. }
  282. s.nextSnapshotEpoch++
  283. // iterate through segments
  284. for i, segmentSnapshot := range revertTo.snapshot.segment {
  285. newSnapshot.segment[i] = &SegmentSnapshot{
  286. id: segmentSnapshot.id,
  287. segment: segmentSnapshot.segment,
  288. deleted: segmentSnapshot.deleted,
  289. cachedDocs: segmentSnapshot.cachedDocs,
  290. }
  291. newSnapshot.segment[i].segment.AddRef()
  292. // remove segment from ineligibleForRemoval map
  293. filename := zapFileName(segmentSnapshot.id)
  294. delete(s.ineligibleForRemoval, filename)
  295. }
  296. if revertTo.persisted != nil {
  297. s.rootPersisted = append(s.rootPersisted, revertTo.persisted)
  298. }
  299. // swap in new snapshot
  300. rootPrev := s.root
  301. s.root = newSnapshot
  302. // release lock
  303. s.rootLock.Unlock()
  304. if rootPrev != nil {
  305. _ = rootPrev.DecRef()
  306. }
  307. close(revertTo.applied)
  308. return nil
  309. }