diff options
Diffstat (limited to 'vendor/github.com/couchbase/go-couchbase/upr.go')
-rw-r--r-- | vendor/github.com/couchbase/go-couchbase/upr.go | 399 |
1 files changed, 399 insertions, 0 deletions
diff --git a/vendor/github.com/couchbase/go-couchbase/upr.go b/vendor/github.com/couchbase/go-couchbase/upr.go new file mode 100644 index 0000000000..844bf91510 --- /dev/null +++ b/vendor/github.com/couchbase/go-couchbase/upr.go @@ -0,0 +1,399 @@ +package couchbase + +import ( + "log" + "sync" + "time" + + "fmt" + "github.com/couchbase/gomemcached" + "github.com/couchbase/gomemcached/client" + "github.com/couchbase/goutils/logging" +) + +// A UprFeed streams mutation events from a bucket. +// +// Events from the bucket can be read from the channel 'C'. Remember +// to call Close() on it when you're done, unless its channel has +// closed itself already. +type UprFeed struct { + C <-chan *memcached.UprEvent + + bucket *Bucket + nodeFeeds map[string]*FeedInfo // The UPR feeds of the individual nodes + output chan *memcached.UprEvent // Same as C but writeably-typed + outputClosed bool + quit chan bool + name string // name of this UPR feed + sequence uint32 // sequence number for this feed + connected bool + killSwitch chan bool + closing bool + wg sync.WaitGroup + dcp_buffer_size uint32 + data_chan_size int +} + +// UprFeed from a single connection +type FeedInfo struct { + uprFeed *memcached.UprFeed // UPR feed handle + host string // hostname + connected bool // connected + quit chan bool // quit channel +} + +type FailoverLog map[uint16]memcached.FailoverLog + +// GetFailoverLogs, get the failover logs for a set of vbucket ids +func (b *Bucket) GetFailoverLogs(vBuckets []uint16) (FailoverLog, error) { + + // map vbids to their corresponding hosts + vbHostList := make(map[string][]uint16) + vbm := b.VBServerMap() + if len(vbm.VBucketMap) < len(vBuckets) { + return nil, fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v", + vbm.VBucketMap, vBuckets) + } + + for _, vb := range vBuckets { + masterID := vbm.VBucketMap[vb][0] + master := b.getMasterNode(masterID) + if master == "" { + return nil, fmt.Errorf("No master found for vb %d", vb) + } + + vbList := vbHostList[master] + if vbList == nil { + vbList = make([]uint16, 0) + } + vbList = append(vbList, vb) + vbHostList[master] = vbList + } + + failoverLogMap := make(FailoverLog) + for _, serverConn := range b.getConnPools(false /* not already locked */) { + + vbList := vbHostList[serverConn.host] + if vbList == nil { + continue + } + + mc, err := serverConn.Get() + if err != nil { + logging.Infof("No Free connections for vblist %v", vbList) + return nil, fmt.Errorf("No Free connections for host %s", + serverConn.host) + + } + // close the connection so that it doesn't get reused for upr data + // connection + defer mc.Close() + mc.SetDeadline(getDeadline(time.Time{}, DefaultTimeout)) + failoverlogs, err := mc.UprGetFailoverLog(vbList) + if err != nil { + return nil, fmt.Errorf("Error getting failover log %s host %s", + err.Error(), serverConn.host) + + } + + for vb, log := range failoverlogs { + failoverLogMap[vb] = *log + } + } + + return failoverLogMap, nil +} + +func (b *Bucket) StartUprFeed(name string, sequence uint32) (*UprFeed, error) { + return b.StartUprFeedWithConfig(name, sequence, 10, DEFAULT_WINDOW_SIZE) +} + +// StartUprFeed creates and starts a new Upr feed +// No data will be sent on the channel unless vbuckets streams are requested +func (b *Bucket) StartUprFeedWithConfig(name string, sequence uint32, data_chan_size int, dcp_buffer_size uint32) (*UprFeed, error) { + + feed := &UprFeed{ + bucket: b, + output: make(chan *memcached.UprEvent, data_chan_size), + quit: make(chan bool), + nodeFeeds: make(map[string]*FeedInfo, 0), + name: name, + sequence: sequence, + killSwitch: make(chan bool), + dcp_buffer_size: dcp_buffer_size, + data_chan_size: data_chan_size, + } + + err := feed.connectToNodes() + if err != nil { + return nil, fmt.Errorf("Cannot connect to bucket %s", err.Error()) + } + feed.connected = true + go feed.run() + + feed.C = feed.output + return feed, nil +} + +// UprRequestStream starts a stream for a vb on a feed +func (feed *UprFeed) UprRequestStream(vb uint16, opaque uint16, flags uint32, + vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error { + + defer func() { + if r := recover(); r != nil { + log.Panicf("Panic in UprRequestStream. Feed %v Bucket %v", feed, feed.bucket) + } + }() + + vbm := feed.bucket.VBServerMap() + if len(vbm.VBucketMap) < int(vb) { + return fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v", + vb, vbm.VBucketMap) + } + + if int(vb) >= len(vbm.VBucketMap) { + return fmt.Errorf("Invalid vbucket id %d", vb) + } + + masterID := vbm.VBucketMap[vb][0] + master := feed.bucket.getMasterNode(masterID) + if master == "" { + return fmt.Errorf("Master node not found for vbucket %d", vb) + } + singleFeed := feed.nodeFeeds[master] + if singleFeed == nil { + return fmt.Errorf("UprFeed for this host not found") + } + + if err := singleFeed.uprFeed.UprRequestStream(vb, opaque, flags, + vuuid, startSequence, endSequence, snapStart, snapEnd); err != nil { + return err + } + + return nil +} + +// UprCloseStream ends a vbucket stream. +func (feed *UprFeed) UprCloseStream(vb, opaqueMSB uint16) error { + + defer func() { + if r := recover(); r != nil { + log.Panicf("Panic in UprCloseStream. Feed %v Bucket %v ", feed, feed.bucket) + } + }() + + vbm := feed.bucket.VBServerMap() + if len(vbm.VBucketMap) < int(vb) { + return fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v", + vb, vbm.VBucketMap) + } + + if int(vb) >= len(vbm.VBucketMap) { + return fmt.Errorf("Invalid vbucket id %d", vb) + } + + masterID := vbm.VBucketMap[vb][0] + master := feed.bucket.getMasterNode(masterID) + if master == "" { + return fmt.Errorf("Master node not found for vbucket %d", vb) + } + singleFeed := feed.nodeFeeds[master] + if singleFeed == nil { + return fmt.Errorf("UprFeed for this host not found") + } + + if err := singleFeed.uprFeed.CloseStream(vb, opaqueMSB); err != nil { + return err + } + return nil +} + +// Goroutine that runs the feed +func (feed *UprFeed) run() { + retryInterval := initialRetryInterval + bucketOK := true + for { + // Connect to the UPR feed of each server node: + if bucketOK { + // Run until one of the sub-feeds fails: + select { + case <-feed.killSwitch: + case <-feed.quit: + return + } + //feed.closeNodeFeeds() + retryInterval = initialRetryInterval + } + + if feed.closing == true { + // we have been asked to shut down + return + } + + // On error, try to refresh the bucket in case the list of nodes changed: + logging.Infof("go-couchbase: UPR connection lost; reconnecting to bucket %q in %v", + feed.bucket.Name, retryInterval) + + if err := feed.bucket.Refresh(); err != nil { + // if we fail to refresh the bucket, exit the feed + // MB-14917 + logging.Infof("Unable to refresh bucket %s ", err.Error()) + close(feed.output) + feed.outputClosed = true + feed.closeNodeFeeds() + return + } + + // this will only connect to nodes that are not connected or changed + // user will have to reconnect the stream + err := feed.connectToNodes() + if err != nil { + logging.Infof("Unable to connect to nodes..exit ") + close(feed.output) + feed.outputClosed = true + feed.closeNodeFeeds() + return + } + bucketOK = err == nil + + select { + case <-time.After(retryInterval): + case <-feed.quit: + return + } + if retryInterval *= 2; retryInterval > maximumRetryInterval { + retryInterval = maximumRetryInterval + } + } +} + +func (feed *UprFeed) connectToNodes() (err error) { + nodeCount := 0 + for _, serverConn := range feed.bucket.getConnPools(false /* not already locked */) { + + // this maybe a reconnection, so check if the connection to the node + // already exists. Connect only if the node is not found in the list + // or connected == false + nodeFeed := feed.nodeFeeds[serverConn.host] + + if nodeFeed != nil && nodeFeed.connected == true { + continue + } + + var singleFeed *memcached.UprFeed + var name string + if feed.name == "" { + name = "DefaultUprClient" + } else { + name = feed.name + } + singleFeed, err = serverConn.StartUprFeed(name, feed.sequence, feed.dcp_buffer_size, feed.data_chan_size) + if err != nil { + logging.Errorf("go-couchbase: Error connecting to upr feed of %s: %v", serverConn.host, err) + feed.closeNodeFeeds() + return + } + // add the node to the connection map + feedInfo := &FeedInfo{ + uprFeed: singleFeed, + connected: true, + host: serverConn.host, + quit: make(chan bool), + } + feed.nodeFeeds[serverConn.host] = feedInfo + go feed.forwardUprEvents(feedInfo, feed.killSwitch, serverConn.host) + feed.wg.Add(1) + nodeCount++ + } + if nodeCount == 0 { + return fmt.Errorf("No connection to bucket") + } + + return nil +} + +// Goroutine that forwards Upr events from a single node's feed to the aggregate feed. +func (feed *UprFeed) forwardUprEvents(nodeFeed *FeedInfo, killSwitch chan bool, host string) { + singleFeed := nodeFeed.uprFeed + + defer func() { + feed.wg.Done() + if r := recover(); r != nil { + //if feed is not closing, re-throw the panic + if feed.outputClosed != true && feed.closing != true { + panic(r) + } else { + logging.Errorf("Panic is recovered. Since feed is closed, exit gracefully") + + } + } + }() + + for { + select { + case <-nodeFeed.quit: + nodeFeed.connected = false + return + + case event, ok := <-singleFeed.C: + if !ok { + if singleFeed.Error != nil { + logging.Errorf("go-couchbase: Upr feed from %s failed: %v", host, singleFeed.Error) + } + killSwitch <- true + return + } + if feed.outputClosed == true { + // someone closed the node feed + logging.Infof("Node need closed, returning from forwardUprEvent") + return + } + feed.output <- event + if event.Status == gomemcached.NOT_MY_VBUCKET { + logging.Infof(" Got a not my vbucket error !! ") + if err := feed.bucket.Refresh(); err != nil { + logging.Errorf("Unable to refresh bucket %s ", err.Error()) + feed.closeNodeFeeds() + return + } + // this will only connect to nodes that are not connected or changed + // user will have to reconnect the stream + if err := feed.connectToNodes(); err != nil { + logging.Errorf("Unable to connect to nodes %s", err.Error()) + return + } + + } + } + } +} + +func (feed *UprFeed) closeNodeFeeds() { + for _, f := range feed.nodeFeeds { + logging.Infof(" Sending close to forwardUprEvent ") + close(f.quit) + f.uprFeed.Close() + } + feed.nodeFeeds = nil +} + +// Close a Upr feed. +func (feed *UprFeed) Close() error { + select { + case <-feed.quit: + return nil + default: + } + + feed.closing = true + feed.closeNodeFeeds() + close(feed.quit) + + feed.wg.Wait() + if feed.outputClosed == false { + feed.outputClosed = true + close(feed.output) + } + + return nil +} |