aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/couchbase/go-couchbase/observe.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/couchbase/go-couchbase/observe.go')
-rw-r--r--vendor/github.com/couchbase/go-couchbase/observe.go300
1 files changed, 300 insertions, 0 deletions
diff --git a/vendor/github.com/couchbase/go-couchbase/observe.go b/vendor/github.com/couchbase/go-couchbase/observe.go
new file mode 100644
index 0000000000..6e746f5a16
--- /dev/null
+++ b/vendor/github.com/couchbase/go-couchbase/observe.go
@@ -0,0 +1,300 @@
+package couchbase
+
+import (
+ "fmt"
+ "github.com/couchbase/goutils/logging"
+ "sync"
+)
+
+type PersistTo uint8
+
+const (
+ PersistNone = PersistTo(0x00)
+ PersistMaster = PersistTo(0x01)
+ PersistOne = PersistTo(0x02)
+ PersistTwo = PersistTo(0x03)
+ PersistThree = PersistTo(0x04)
+ PersistFour = PersistTo(0x05)
+)
+
+type ObserveTo uint8
+
+const (
+ ObserveNone = ObserveTo(0x00)
+ ObserveReplicateOne = ObserveTo(0x01)
+ ObserveReplicateTwo = ObserveTo(0x02)
+ ObserveReplicateThree = ObserveTo(0x03)
+ ObserveReplicateFour = ObserveTo(0x04)
+)
+
+type JobType uint8
+
+const (
+ OBSERVE = JobType(0x00)
+ PERSIST = JobType(0x01)
+)
+
+type ObservePersistJob struct {
+ vb uint16
+ vbuuid uint64
+ hostname string
+ jobType JobType
+ failover uint8
+ lastPersistedSeqNo uint64
+ currentSeqNo uint64
+ resultChan chan *ObservePersistJob
+ errorChan chan *OPErrResponse
+}
+
+type OPErrResponse struct {
+ vb uint16
+ vbuuid uint64
+ err error
+ job *ObservePersistJob
+}
+
+var ObservePersistPool = NewPool(1024)
+var OPJobChan = make(chan *ObservePersistJob, 1024)
+var OPJobDone = make(chan bool)
+
+var wg sync.WaitGroup
+
+func (b *Bucket) StartOPPollers(maxWorkers int) {
+
+ for i := 0; i < maxWorkers; i++ {
+ go b.OPJobPoll()
+ wg.Add(1)
+ }
+ wg.Wait()
+}
+
+func (b *Bucket) SetObserveAndPersist(nPersist PersistTo, nObserve ObserveTo) (err error) {
+
+ numNodes := len(b.Nodes())
+ if int(nPersist) > numNodes || int(nObserve) > numNodes {
+ return fmt.Errorf("Not enough healthy nodes in the cluster")
+ }
+
+ if int(nPersist) > (b.Replicas+1) || int(nObserve) > b.Replicas {
+ return fmt.Errorf("Not enough replicas in the cluster")
+ }
+
+ if EnableMutationToken == false {
+ return fmt.Errorf("Mutation Tokens not enabled ")
+ }
+
+ b.ds = &DurablitySettings{Persist: PersistTo(nPersist), Observe: ObserveTo(nObserve)}
+ return
+}
+
+func (b *Bucket) ObserveAndPersistPoll(vb uint16, vbuuid uint64, seqNo uint64) (err error, failover bool) {
+ b.RLock()
+ ds := b.ds
+ b.RUnlock()
+
+ if ds == nil {
+ return
+ }
+
+ nj := 0 // total number of jobs
+ resultChan := make(chan *ObservePersistJob, 10)
+ errChan := make(chan *OPErrResponse, 10)
+
+ nodes := b.GetNodeList(vb)
+ if int(ds.Observe) > len(nodes) || int(ds.Persist) > len(nodes) {
+ return fmt.Errorf("Not enough healthy nodes in the cluster"), false
+ }
+
+ logging.Infof("Node list %v", nodes)
+
+ if ds.Observe >= ObserveReplicateOne {
+ // create a job for each host
+ for i := ObserveReplicateOne; i < ds.Observe+1; i++ {
+ opJob := ObservePersistPool.Get()
+ opJob.vb = vb
+ opJob.vbuuid = vbuuid
+ opJob.jobType = OBSERVE
+ opJob.hostname = nodes[i]
+ opJob.resultChan = resultChan
+ opJob.errorChan = errChan
+
+ OPJobChan <- opJob
+ nj++
+
+ }
+ }
+
+ if ds.Persist >= PersistMaster {
+ for i := PersistMaster; i < ds.Persist+1; i++ {
+ opJob := ObservePersistPool.Get()
+ opJob.vb = vb
+ opJob.vbuuid = vbuuid
+ opJob.jobType = PERSIST
+ opJob.hostname = nodes[i]
+ opJob.resultChan = resultChan
+ opJob.errorChan = errChan
+
+ OPJobChan <- opJob
+ nj++
+
+ }
+ }
+
+ ok := true
+ for ok {
+ select {
+ case res := <-resultChan:
+ jobDone := false
+ if res.failover == 0 {
+ // no failover
+ if res.jobType == PERSIST {
+ if res.lastPersistedSeqNo >= seqNo {
+ jobDone = true
+ }
+
+ } else {
+ if res.currentSeqNo >= seqNo {
+ jobDone = true
+ }
+ }
+
+ if jobDone == true {
+ nj--
+ ObservePersistPool.Put(res)
+ } else {
+ // requeue this job
+ OPJobChan <- res
+ }
+
+ } else {
+ // Not currently handling failover scenarios TODO
+ nj--
+ ObservePersistPool.Put(res)
+ failover = true
+ }
+
+ if nj == 0 {
+ // done with all the jobs
+ ok = false
+ close(resultChan)
+ close(errChan)
+ }
+
+ case Err := <-errChan:
+ logging.Errorf("Error in Observe/Persist %v", Err.err)
+ err = fmt.Errorf("Error in Observe/Persist job %v", Err.err)
+ nj--
+ ObservePersistPool.Put(Err.job)
+ if nj == 0 {
+ close(resultChan)
+ close(errChan)
+ ok = false
+ }
+ }
+ }
+
+ return
+}
+
+func (b *Bucket) OPJobPoll() {
+
+ ok := true
+ for ok == true {
+ select {
+ case job := <-OPJobChan:
+ pool := b.getConnPoolByHost(job.hostname, false /* bucket not already locked */)
+ if pool == nil {
+ errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid}
+ errRes.err = fmt.Errorf("Pool not found for host %v", job.hostname)
+ errRes.job = job
+ job.errorChan <- errRes
+ continue
+ }
+ conn, err := pool.Get()
+ if err != nil {
+ errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid}
+ errRes.err = fmt.Errorf("Unable to get connection from pool %v", err)
+ errRes.job = job
+ job.errorChan <- errRes
+ continue
+ }
+
+ res, err := conn.ObserveSeq(job.vb, job.vbuuid)
+ if err != nil {
+ errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid}
+ errRes.err = fmt.Errorf("Command failed %v", err)
+ errRes.job = job
+ job.errorChan <- errRes
+ continue
+
+ }
+ pool.Return(conn)
+ job.lastPersistedSeqNo = res.LastPersistedSeqNo
+ job.currentSeqNo = res.CurrentSeqNo
+ job.failover = res.Failover
+
+ job.resultChan <- job
+ case <-OPJobDone:
+ logging.Infof("Observe Persist Poller exitting")
+ ok = false
+ }
+ }
+ wg.Done()
+}
+
+func (b *Bucket) GetNodeList(vb uint16) []string {
+
+ vbm := b.VBServerMap()
+ if len(vbm.VBucketMap) < int(vb) {
+ logging.Infof("vbmap smaller than vblist")
+ return nil
+ }
+
+ nodes := make([]string, len(vbm.VBucketMap[vb]))
+ for i := 0; i < len(vbm.VBucketMap[vb]); i++ {
+ n := vbm.VBucketMap[vb][i]
+ if n < 0 {
+ continue
+ }
+
+ node := b.getMasterNode(n)
+ if len(node) > 1 {
+ nodes[i] = node
+ }
+ continue
+
+ }
+ return nodes
+}
+
+//pool of ObservePersist Jobs
+type OPpool struct {
+ pool chan *ObservePersistJob
+}
+
+// NewPool creates a new pool of jobs
+func NewPool(max int) *OPpool {
+ return &OPpool{
+ pool: make(chan *ObservePersistJob, max),
+ }
+}
+
+// Borrow a Client from the pool.
+func (p *OPpool) Get() *ObservePersistJob {
+ var o *ObservePersistJob
+ select {
+ case o = <-p.pool:
+ default:
+ o = &ObservePersistJob{}
+ }
+ return o
+}
+
+// Return returns a Client to the pool.
+func (p *OPpool) Put(o *ObservePersistJob) {
+ select {
+ case p.pool <- o:
+ default:
+ // let it go, let it go...
+ }
+}