Du kan inte välja fler än 25 ämnen Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.

upr.go 10KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398
  1. package couchbase
  2. import (
  3. "log"
  4. "sync"
  5. "time"
  6. "fmt"
  7. "github.com/couchbase/gomemcached"
  8. "github.com/couchbase/gomemcached/client"
  9. "github.com/couchbase/goutils/logging"
  10. )
// A UprFeed streams mutation events from a bucket.
//
// Events from the bucket can be read from the channel 'C'. Remember
// to call Close() on it when you're done, unless its channel has
// closed itself already.
//
// A UprFeed aggregates one per-node feed (see nodeFeeds) into the single
// output channel; it is created by StartUprFeed / StartUprFeedWithConfig.
type UprFeed struct {
	// C is the receive-only view of output that consumers read from.
	C <-chan *memcached.UprEvent
	// bucket supplies the vbucket map and connection pools used by the feed.
	bucket *Bucket
	nodeFeeds map[string]*FeedInfo // The UPR feeds of the individual nodes
	output chan *memcached.UprEvent // Same as C but writeably-typed
	// outputClosed is set once output has been closed, so it is not closed twice.
	outputClosed bool
	// quit is closed by Close() to stop the run goroutine.
	quit chan bool
	name string // name of this UPR feed
	sequence uint32 // sequence number for this feed
	// connected is set after the initial connection to the nodes succeeded.
	connected bool
	// killSwitch receives a value from a forwarding goroutine whose node
	// feed channel was closed; the run goroutine waits on it.
	killSwitch chan bool
	// closing is set by Close() before the node feeds are torn down.
	closing bool
	// wg tracks the forwarding goroutines; Close() waits on it.
	wg sync.WaitGroup
	// dcp_buffer_size and data_chan_size are passed through to every
	// per-node feed; data_chan_size is also the capacity of output.
	dcp_buffer_size uint32
	data_chan_size int
}
// FeedInfo describes the UPR feed of a single node connection, together
// with the state needed to stop the goroutine that forwards its events.
type FeedInfo struct {
	uprFeed *memcached.UprFeed // UPR feed handle
	host string // hostname
	connected bool // connected
	// quit is closed by closeNodeFeeds; the forwarding goroutine for this
	// node returns when it observes the close.
	quit chan bool // quit channel
}
// FailoverLog maps a vbucket id to the failover log of that vbucket, as
// returned by GetFailoverLogs.
type FailoverLog map[uint16]memcached.FailoverLog
  40. // GetFailoverLogs, get the failover logs for a set of vbucket ids
  41. func (b *Bucket) GetFailoverLogs(vBuckets []uint16) (FailoverLog, error) {
  42. // map vbids to their corresponding hosts
  43. vbHostList := make(map[string][]uint16)
  44. vbm := b.VBServerMap()
  45. if len(vbm.VBucketMap) < len(vBuckets) {
  46. return nil, fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v",
  47. vbm.VBucketMap, vBuckets)
  48. }
  49. for _, vb := range vBuckets {
  50. masterID := vbm.VBucketMap[vb][0]
  51. master := b.getMasterNode(masterID)
  52. if master == "" {
  53. return nil, fmt.Errorf("No master found for vb %d", vb)
  54. }
  55. vbList := vbHostList[master]
  56. if vbList == nil {
  57. vbList = make([]uint16, 0)
  58. }
  59. vbList = append(vbList, vb)
  60. vbHostList[master] = vbList
  61. }
  62. failoverLogMap := make(FailoverLog)
  63. for _, serverConn := range b.getConnPools(false /* not already locked */) {
  64. vbList := vbHostList[serverConn.host]
  65. if vbList == nil {
  66. continue
  67. }
  68. mc, err := serverConn.Get()
  69. if err != nil {
  70. logging.Infof("No Free connections for vblist %v", vbList)
  71. return nil, fmt.Errorf("No Free connections for host %s",
  72. serverConn.host)
  73. }
  74. // close the connection so that it doesn't get reused for upr data
  75. // connection
  76. defer mc.Close()
  77. failoverlogs, err := mc.UprGetFailoverLog(vbList)
  78. if err != nil {
  79. return nil, fmt.Errorf("Error getting failover log %s host %s",
  80. err.Error(), serverConn.host)
  81. }
  82. for vb, log := range failoverlogs {
  83. failoverLogMap[vb] = *log
  84. }
  85. }
  86. return failoverLogMap, nil
  87. }
  88. func (b *Bucket) StartUprFeed(name string, sequence uint32) (*UprFeed, error) {
  89. return b.StartUprFeedWithConfig(name, sequence, 10, DEFAULT_WINDOW_SIZE)
  90. }
  91. // StartUprFeed creates and starts a new Upr feed
  92. // No data will be sent on the channel unless vbuckets streams are requested
  93. func (b *Bucket) StartUprFeedWithConfig(name string, sequence uint32, data_chan_size int, dcp_buffer_size uint32) (*UprFeed, error) {
  94. feed := &UprFeed{
  95. bucket: b,
  96. output: make(chan *memcached.UprEvent, data_chan_size),
  97. quit: make(chan bool),
  98. nodeFeeds: make(map[string]*FeedInfo, 0),
  99. name: name,
  100. sequence: sequence,
  101. killSwitch: make(chan bool),
  102. dcp_buffer_size: dcp_buffer_size,
  103. data_chan_size: data_chan_size,
  104. }
  105. err := feed.connectToNodes()
  106. if err != nil {
  107. return nil, fmt.Errorf("Cannot connect to bucket %s", err.Error())
  108. }
  109. feed.connected = true
  110. go feed.run()
  111. feed.C = feed.output
  112. return feed, nil
  113. }
  114. // UprRequestStream starts a stream for a vb on a feed
  115. func (feed *UprFeed) UprRequestStream(vb uint16, opaque uint16, flags uint32,
  116. vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error {
  117. defer func() {
  118. if r := recover(); r != nil {
  119. log.Panicf("Panic in UprRequestStream. Feed %v Bucket %v", feed, feed.bucket)
  120. }
  121. }()
  122. vbm := feed.bucket.VBServerMap()
  123. if len(vbm.VBucketMap) < int(vb) {
  124. return fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v",
  125. vb, vbm.VBucketMap)
  126. }
  127. if int(vb) >= len(vbm.VBucketMap) {
  128. return fmt.Errorf("Invalid vbucket id %d", vb)
  129. }
  130. masterID := vbm.VBucketMap[vb][0]
  131. master := feed.bucket.getMasterNode(masterID)
  132. if master == "" {
  133. return fmt.Errorf("Master node not found for vbucket %d", vb)
  134. }
  135. singleFeed := feed.nodeFeeds[master]
  136. if singleFeed == nil {
  137. return fmt.Errorf("UprFeed for this host not found")
  138. }
  139. if err := singleFeed.uprFeed.UprRequestStream(vb, opaque, flags,
  140. vuuid, startSequence, endSequence, snapStart, snapEnd); err != nil {
  141. return err
  142. }
  143. return nil
  144. }
  145. // UprCloseStream ends a vbucket stream.
  146. func (feed *UprFeed) UprCloseStream(vb, opaqueMSB uint16) error {
  147. defer func() {
  148. if r := recover(); r != nil {
  149. log.Panicf("Panic in UprCloseStream. Feed %v Bucket %v ", feed, feed.bucket)
  150. }
  151. }()
  152. vbm := feed.bucket.VBServerMap()
  153. if len(vbm.VBucketMap) < int(vb) {
  154. return fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v",
  155. vb, vbm.VBucketMap)
  156. }
  157. if int(vb) >= len(vbm.VBucketMap) {
  158. return fmt.Errorf("Invalid vbucket id %d", vb)
  159. }
  160. masterID := vbm.VBucketMap[vb][0]
  161. master := feed.bucket.getMasterNode(masterID)
  162. if master == "" {
  163. return fmt.Errorf("Master node not found for vbucket %d", vb)
  164. }
  165. singleFeed := feed.nodeFeeds[master]
  166. if singleFeed == nil {
  167. return fmt.Errorf("UprFeed for this host not found")
  168. }
  169. if err := singleFeed.uprFeed.CloseStream(vb, opaqueMSB); err != nil {
  170. return err
  171. }
  172. return nil
  173. }
  174. // Goroutine that runs the feed
  175. func (feed *UprFeed) run() {
  176. retryInterval := initialRetryInterval
  177. bucketOK := true
  178. for {
  179. // Connect to the UPR feed of each server node:
  180. if bucketOK {
  181. // Run until one of the sub-feeds fails:
  182. select {
  183. case <-feed.killSwitch:
  184. case <-feed.quit:
  185. return
  186. }
  187. //feed.closeNodeFeeds()
  188. retryInterval = initialRetryInterval
  189. }
  190. if feed.closing == true {
  191. // we have been asked to shut down
  192. return
  193. }
  194. // On error, try to refresh the bucket in case the list of nodes changed:
  195. logging.Infof("go-couchbase: UPR connection lost; reconnecting to bucket %q in %v",
  196. feed.bucket.Name, retryInterval)
  197. if err := feed.bucket.Refresh(); err != nil {
  198. // if we fail to refresh the bucket, exit the feed
  199. // MB-14917
  200. logging.Infof("Unable to refresh bucket %s ", err.Error())
  201. close(feed.output)
  202. feed.outputClosed = true
  203. feed.closeNodeFeeds()
  204. return
  205. }
  206. // this will only connect to nodes that are not connected or changed
  207. // user will have to reconnect the stream
  208. err := feed.connectToNodes()
  209. if err != nil {
  210. logging.Infof("Unable to connect to nodes..exit ")
  211. close(feed.output)
  212. feed.outputClosed = true
  213. feed.closeNodeFeeds()
  214. return
  215. }
  216. bucketOK = err == nil
  217. select {
  218. case <-time.After(retryInterval):
  219. case <-feed.quit:
  220. return
  221. }
  222. if retryInterval *= 2; retryInterval > maximumRetryInterval {
  223. retryInterval = maximumRetryInterval
  224. }
  225. }
  226. }
  227. func (feed *UprFeed) connectToNodes() (err error) {
  228. nodeCount := 0
  229. for _, serverConn := range feed.bucket.getConnPools(false /* not already locked */) {
  230. // this maybe a reconnection, so check if the connection to the node
  231. // already exists. Connect only if the node is not found in the list
  232. // or connected == false
  233. nodeFeed := feed.nodeFeeds[serverConn.host]
  234. if nodeFeed != nil && nodeFeed.connected == true {
  235. continue
  236. }
  237. var singleFeed *memcached.UprFeed
  238. var name string
  239. if feed.name == "" {
  240. name = "DefaultUprClient"
  241. } else {
  242. name = feed.name
  243. }
  244. singleFeed, err = serverConn.StartUprFeed(name, feed.sequence, feed.dcp_buffer_size, feed.data_chan_size)
  245. if err != nil {
  246. logging.Errorf("go-couchbase: Error connecting to upr feed of %s: %v", serverConn.host, err)
  247. feed.closeNodeFeeds()
  248. return
  249. }
  250. // add the node to the connection map
  251. feedInfo := &FeedInfo{
  252. uprFeed: singleFeed,
  253. connected: true,
  254. host: serverConn.host,
  255. quit: make(chan bool),
  256. }
  257. feed.nodeFeeds[serverConn.host] = feedInfo
  258. go feed.forwardUprEvents(feedInfo, feed.killSwitch, serverConn.host)
  259. feed.wg.Add(1)
  260. nodeCount++
  261. }
  262. if nodeCount == 0 {
  263. return fmt.Errorf("No connection to bucket")
  264. }
  265. return nil
  266. }
  267. // Goroutine that forwards Upr events from a single node's feed to the aggregate feed.
  268. func (feed *UprFeed) forwardUprEvents(nodeFeed *FeedInfo, killSwitch chan bool, host string) {
  269. singleFeed := nodeFeed.uprFeed
  270. defer func() {
  271. feed.wg.Done()
  272. if r := recover(); r != nil {
  273. //if feed is not closing, re-throw the panic
  274. if feed.outputClosed != true && feed.closing != true {
  275. panic(r)
  276. } else {
  277. logging.Errorf("Panic is recovered. Since feed is closed, exit gracefully")
  278. }
  279. }
  280. }()
  281. for {
  282. select {
  283. case <-nodeFeed.quit:
  284. nodeFeed.connected = false
  285. return
  286. case event, ok := <-singleFeed.C:
  287. if !ok {
  288. if singleFeed.Error != nil {
  289. logging.Errorf("go-couchbase: Upr feed from %s failed: %v", host, singleFeed.Error)
  290. }
  291. killSwitch <- true
  292. return
  293. }
  294. if feed.outputClosed == true {
  295. // someone closed the node feed
  296. logging.Infof("Node need closed, returning from forwardUprEvent")
  297. return
  298. }
  299. feed.output <- event
  300. if event.Status == gomemcached.NOT_MY_VBUCKET {
  301. logging.Infof(" Got a not my vbucket error !! ")
  302. if err := feed.bucket.Refresh(); err != nil {
  303. logging.Errorf("Unable to refresh bucket %s ", err.Error())
  304. feed.closeNodeFeeds()
  305. return
  306. }
  307. // this will only connect to nodes that are not connected or changed
  308. // user will have to reconnect the stream
  309. if err := feed.connectToNodes(); err != nil {
  310. logging.Errorf("Unable to connect to nodes %s", err.Error())
  311. return
  312. }
  313. }
  314. }
  315. }
  316. }
  317. func (feed *UprFeed) closeNodeFeeds() {
  318. for _, f := range feed.nodeFeeds {
  319. logging.Infof(" Sending close to forwardUprEvent ")
  320. close(f.quit)
  321. f.uprFeed.Close()
  322. }
  323. feed.nodeFeeds = nil
  324. }
  325. // Close a Upr feed.
  326. func (feed *UprFeed) Close() error {
  327. select {
  328. case <-feed.quit:
  329. return nil
  330. default:
  331. }
  332. feed.closing = true
  333. feed.closeNodeFeeds()
  334. close(feed.quit)
  335. feed.wg.Wait()
  336. if feed.outputClosed == false {
  337. feed.outputClosed = true
  338. close(feed.output)
  339. }
  340. return nil
  341. }