aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/couchbase/go-couchbase/upr.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/couchbase/go-couchbase/upr.go')
-rw-r--r--vendor/github.com/couchbase/go-couchbase/upr.go399
1 files changed, 399 insertions, 0 deletions
diff --git a/vendor/github.com/couchbase/go-couchbase/upr.go b/vendor/github.com/couchbase/go-couchbase/upr.go
new file mode 100644
index 0000000000..844bf91510
--- /dev/null
+++ b/vendor/github.com/couchbase/go-couchbase/upr.go
@@ -0,0 +1,399 @@
+package couchbase
+
+import (
+ "log"
+ "sync"
+ "time"
+
+ "fmt"
+ "github.com/couchbase/gomemcached"
+ "github.com/couchbase/gomemcached/client"
+ "github.com/couchbase/goutils/logging"
+)
+
+// A UprFeed streams mutation events from a bucket.
+//
+// Events from the bucket can be read from the channel 'C'. Remember
+// to call Close() on it when you're done, unless its channel has
+// closed itself already.
+type UprFeed struct {
+ C <-chan *memcached.UprEvent
+
+ bucket *Bucket
+ nodeFeeds map[string]*FeedInfo // The UPR feeds of the individual nodes
+ output chan *memcached.UprEvent // Same as C but writeably-typed
+ outputClosed bool
+ quit chan bool
+ name string // name of this UPR feed
+ sequence uint32 // sequence number for this feed
+ connected bool
+ killSwitch chan bool
+ closing bool
+ wg sync.WaitGroup
+ dcp_buffer_size uint32
+ data_chan_size int
+}
+
+// UprFeed from a single connection
+type FeedInfo struct {
+ uprFeed *memcached.UprFeed // UPR feed handle
+ host string // hostname
+ connected bool // connected
+ quit chan bool // quit channel
+}
+
+type FailoverLog map[uint16]memcached.FailoverLog
+
+// GetFailoverLogs, get the failover logs for a set of vbucket ids
+func (b *Bucket) GetFailoverLogs(vBuckets []uint16) (FailoverLog, error) {
+
+ // map vbids to their corresponding hosts
+ vbHostList := make(map[string][]uint16)
+ vbm := b.VBServerMap()
+ if len(vbm.VBucketMap) < len(vBuckets) {
+ return nil, fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v",
+ vbm.VBucketMap, vBuckets)
+ }
+
+ for _, vb := range vBuckets {
+ masterID := vbm.VBucketMap[vb][0]
+ master := b.getMasterNode(masterID)
+ if master == "" {
+ return nil, fmt.Errorf("No master found for vb %d", vb)
+ }
+
+ vbList := vbHostList[master]
+ if vbList == nil {
+ vbList = make([]uint16, 0)
+ }
+ vbList = append(vbList, vb)
+ vbHostList[master] = vbList
+ }
+
+ failoverLogMap := make(FailoverLog)
+ for _, serverConn := range b.getConnPools(false /* not already locked */) {
+
+ vbList := vbHostList[serverConn.host]
+ if vbList == nil {
+ continue
+ }
+
+ mc, err := serverConn.Get()
+ if err != nil {
+ logging.Infof("No Free connections for vblist %v", vbList)
+ return nil, fmt.Errorf("No Free connections for host %s",
+ serverConn.host)
+
+ }
+ // close the connection so that it doesn't get reused for upr data
+ // connection
+ defer mc.Close()
+ mc.SetDeadline(getDeadline(time.Time{}, DefaultTimeout))
+ failoverlogs, err := mc.UprGetFailoverLog(vbList)
+ if err != nil {
+ return nil, fmt.Errorf("Error getting failover log %s host %s",
+ err.Error(), serverConn.host)
+
+ }
+
+ for vb, log := range failoverlogs {
+ failoverLogMap[vb] = *log
+ }
+ }
+
+ return failoverLogMap, nil
+}
+
+func (b *Bucket) StartUprFeed(name string, sequence uint32) (*UprFeed, error) {
+ return b.StartUprFeedWithConfig(name, sequence, 10, DEFAULT_WINDOW_SIZE)
+}
+
+// StartUprFeed creates and starts a new Upr feed
+// No data will be sent on the channel unless vbuckets streams are requested
+func (b *Bucket) StartUprFeedWithConfig(name string, sequence uint32, data_chan_size int, dcp_buffer_size uint32) (*UprFeed, error) {
+
+ feed := &UprFeed{
+ bucket: b,
+ output: make(chan *memcached.UprEvent, data_chan_size),
+ quit: make(chan bool),
+ nodeFeeds: make(map[string]*FeedInfo, 0),
+ name: name,
+ sequence: sequence,
+ killSwitch: make(chan bool),
+ dcp_buffer_size: dcp_buffer_size,
+ data_chan_size: data_chan_size,
+ }
+
+ err := feed.connectToNodes()
+ if err != nil {
+ return nil, fmt.Errorf("Cannot connect to bucket %s", err.Error())
+ }
+ feed.connected = true
+ go feed.run()
+
+ feed.C = feed.output
+ return feed, nil
+}
+
+// UprRequestStream starts a stream for a vb on a feed
+func (feed *UprFeed) UprRequestStream(vb uint16, opaque uint16, flags uint32,
+ vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error {
+
+ defer func() {
+ if r := recover(); r != nil {
+ log.Panicf("Panic in UprRequestStream. Feed %v Bucket %v", feed, feed.bucket)
+ }
+ }()
+
+ vbm := feed.bucket.VBServerMap()
+ if len(vbm.VBucketMap) < int(vb) {
+ return fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v",
+ vb, vbm.VBucketMap)
+ }
+
+ if int(vb) >= len(vbm.VBucketMap) {
+ return fmt.Errorf("Invalid vbucket id %d", vb)
+ }
+
+ masterID := vbm.VBucketMap[vb][0]
+ master := feed.bucket.getMasterNode(masterID)
+ if master == "" {
+ return fmt.Errorf("Master node not found for vbucket %d", vb)
+ }
+ singleFeed := feed.nodeFeeds[master]
+ if singleFeed == nil {
+ return fmt.Errorf("UprFeed for this host not found")
+ }
+
+ if err := singleFeed.uprFeed.UprRequestStream(vb, opaque, flags,
+ vuuid, startSequence, endSequence, snapStart, snapEnd); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// UprCloseStream ends a vbucket stream.
+func (feed *UprFeed) UprCloseStream(vb, opaqueMSB uint16) error {
+
+ defer func() {
+ if r := recover(); r != nil {
+ log.Panicf("Panic in UprCloseStream. Feed %v Bucket %v ", feed, feed.bucket)
+ }
+ }()
+
+ vbm := feed.bucket.VBServerMap()
+ if len(vbm.VBucketMap) < int(vb) {
+ return fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v",
+ vb, vbm.VBucketMap)
+ }
+
+ if int(vb) >= len(vbm.VBucketMap) {
+ return fmt.Errorf("Invalid vbucket id %d", vb)
+ }
+
+ masterID := vbm.VBucketMap[vb][0]
+ master := feed.bucket.getMasterNode(masterID)
+ if master == "" {
+ return fmt.Errorf("Master node not found for vbucket %d", vb)
+ }
+ singleFeed := feed.nodeFeeds[master]
+ if singleFeed == nil {
+ return fmt.Errorf("UprFeed for this host not found")
+ }
+
+ if err := singleFeed.uprFeed.CloseStream(vb, opaqueMSB); err != nil {
+ return err
+ }
+ return nil
+}
+
+// Goroutine that runs the feed
+func (feed *UprFeed) run() {
+ retryInterval := initialRetryInterval
+ bucketOK := true
+ for {
+ // Connect to the UPR feed of each server node:
+ if bucketOK {
+ // Run until one of the sub-feeds fails:
+ select {
+ case <-feed.killSwitch:
+ case <-feed.quit:
+ return
+ }
+ //feed.closeNodeFeeds()
+ retryInterval = initialRetryInterval
+ }
+
+ if feed.closing == true {
+ // we have been asked to shut down
+ return
+ }
+
+ // On error, try to refresh the bucket in case the list of nodes changed:
+ logging.Infof("go-couchbase: UPR connection lost; reconnecting to bucket %q in %v",
+ feed.bucket.Name, retryInterval)
+
+ if err := feed.bucket.Refresh(); err != nil {
+ // if we fail to refresh the bucket, exit the feed
+ // MB-14917
+ logging.Infof("Unable to refresh bucket %s ", err.Error())
+ close(feed.output)
+ feed.outputClosed = true
+ feed.closeNodeFeeds()
+ return
+ }
+
+ // this will only connect to nodes that are not connected or changed
+ // user will have to reconnect the stream
+ err := feed.connectToNodes()
+ if err != nil {
+ logging.Infof("Unable to connect to nodes..exit ")
+ close(feed.output)
+ feed.outputClosed = true
+ feed.closeNodeFeeds()
+ return
+ }
+ bucketOK = err == nil
+
+ select {
+ case <-time.After(retryInterval):
+ case <-feed.quit:
+ return
+ }
+ if retryInterval *= 2; retryInterval > maximumRetryInterval {
+ retryInterval = maximumRetryInterval
+ }
+ }
+}
+
+func (feed *UprFeed) connectToNodes() (err error) {
+ nodeCount := 0
+ for _, serverConn := range feed.bucket.getConnPools(false /* not already locked */) {
+
+ // this maybe a reconnection, so check if the connection to the node
+ // already exists. Connect only if the node is not found in the list
+ // or connected == false
+ nodeFeed := feed.nodeFeeds[serverConn.host]
+
+ if nodeFeed != nil && nodeFeed.connected == true {
+ continue
+ }
+
+ var singleFeed *memcached.UprFeed
+ var name string
+ if feed.name == "" {
+ name = "DefaultUprClient"
+ } else {
+ name = feed.name
+ }
+ singleFeed, err = serverConn.StartUprFeed(name, feed.sequence, feed.dcp_buffer_size, feed.data_chan_size)
+ if err != nil {
+ logging.Errorf("go-couchbase: Error connecting to upr feed of %s: %v", serverConn.host, err)
+ feed.closeNodeFeeds()
+ return
+ }
+ // add the node to the connection map
+ feedInfo := &FeedInfo{
+ uprFeed: singleFeed,
+ connected: true,
+ host: serverConn.host,
+ quit: make(chan bool),
+ }
+ feed.nodeFeeds[serverConn.host] = feedInfo
+ go feed.forwardUprEvents(feedInfo, feed.killSwitch, serverConn.host)
+ feed.wg.Add(1)
+ nodeCount++
+ }
+ if nodeCount == 0 {
+ return fmt.Errorf("No connection to bucket")
+ }
+
+ return nil
+}
+
+// Goroutine that forwards Upr events from a single node's feed to the aggregate feed.
+func (feed *UprFeed) forwardUprEvents(nodeFeed *FeedInfo, killSwitch chan bool, host string) {
+ singleFeed := nodeFeed.uprFeed
+
+ defer func() {
+ feed.wg.Done()
+ if r := recover(); r != nil {
+ //if feed is not closing, re-throw the panic
+ if feed.outputClosed != true && feed.closing != true {
+ panic(r)
+ } else {
+ logging.Errorf("Panic is recovered. Since feed is closed, exit gracefully")
+
+ }
+ }
+ }()
+
+ for {
+ select {
+ case <-nodeFeed.quit:
+ nodeFeed.connected = false
+ return
+
+ case event, ok := <-singleFeed.C:
+ if !ok {
+ if singleFeed.Error != nil {
+ logging.Errorf("go-couchbase: Upr feed from %s failed: %v", host, singleFeed.Error)
+ }
+ killSwitch <- true
+ return
+ }
+ if feed.outputClosed == true {
+ // someone closed the node feed
+ logging.Infof("Node need closed, returning from forwardUprEvent")
+ return
+ }
+ feed.output <- event
+ if event.Status == gomemcached.NOT_MY_VBUCKET {
+ logging.Infof(" Got a not my vbucket error !! ")
+ if err := feed.bucket.Refresh(); err != nil {
+ logging.Errorf("Unable to refresh bucket %s ", err.Error())
+ feed.closeNodeFeeds()
+ return
+ }
+ // this will only connect to nodes that are not connected or changed
+ // user will have to reconnect the stream
+ if err := feed.connectToNodes(); err != nil {
+ logging.Errorf("Unable to connect to nodes %s", err.Error())
+ return
+ }
+
+ }
+ }
+ }
+}
+
+func (feed *UprFeed) closeNodeFeeds() {
+ for _, f := range feed.nodeFeeds {
+ logging.Infof(" Sending close to forwardUprEvent ")
+ close(f.quit)
+ f.uprFeed.Close()
+ }
+ feed.nodeFeeds = nil
+}
+
+// Close a Upr feed.
+func (feed *UprFeed) Close() error {
+ select {
+ case <-feed.quit:
+ return nil
+ default:
+ }
+
+ feed.closing = true
+ feed.closeNodeFeeds()
+ close(feed.quit)
+
+ feed.wg.Wait()
+ if feed.outputClosed == false {
+ feed.outputClosed = true
+ close(feed.output)
+ }
+
+ return nil
+}