diff options
Diffstat (limited to 'vendor/github.com/couchbase/go-couchbase/observe.go')
-rw-r--r-- | vendor/github.com/couchbase/go-couchbase/observe.go | 300 |
1 files changed, 300 insertions, 0 deletions
diff --git a/vendor/github.com/couchbase/go-couchbase/observe.go b/vendor/github.com/couchbase/go-couchbase/observe.go new file mode 100644 index 0000000000..6e746f5a16 --- /dev/null +++ b/vendor/github.com/couchbase/go-couchbase/observe.go @@ -0,0 +1,300 @@ +package couchbase + +import ( + "fmt" + "github.com/couchbase/goutils/logging" + "sync" +) + +type PersistTo uint8 + +const ( + PersistNone = PersistTo(0x00) + PersistMaster = PersistTo(0x01) + PersistOne = PersistTo(0x02) + PersistTwo = PersistTo(0x03) + PersistThree = PersistTo(0x04) + PersistFour = PersistTo(0x05) +) + +type ObserveTo uint8 + +const ( + ObserveNone = ObserveTo(0x00) + ObserveReplicateOne = ObserveTo(0x01) + ObserveReplicateTwo = ObserveTo(0x02) + ObserveReplicateThree = ObserveTo(0x03) + ObserveReplicateFour = ObserveTo(0x04) +) + +type JobType uint8 + +const ( + OBSERVE = JobType(0x00) + PERSIST = JobType(0x01) +) + +type ObservePersistJob struct { + vb uint16 + vbuuid uint64 + hostname string + jobType JobType + failover uint8 + lastPersistedSeqNo uint64 + currentSeqNo uint64 + resultChan chan *ObservePersistJob + errorChan chan *OPErrResponse +} + +type OPErrResponse struct { + vb uint16 + vbuuid uint64 + err error + job *ObservePersistJob +} + +var ObservePersistPool = NewPool(1024) +var OPJobChan = make(chan *ObservePersistJob, 1024) +var OPJobDone = make(chan bool) + +var wg sync.WaitGroup + +func (b *Bucket) StartOPPollers(maxWorkers int) { + + for i := 0; i < maxWorkers; i++ { + go b.OPJobPoll() + wg.Add(1) + } + wg.Wait() +} + +func (b *Bucket) SetObserveAndPersist(nPersist PersistTo, nObserve ObserveTo) (err error) { + + numNodes := len(b.Nodes()) + if int(nPersist) > numNodes || int(nObserve) > numNodes { + return fmt.Errorf("Not enough healthy nodes in the cluster") + } + + if int(nPersist) > (b.Replicas+1) || int(nObserve) > b.Replicas { + return fmt.Errorf("Not enough replicas in the cluster") + } + + if EnableMutationToken == false { + return fmt.Errorf("Mutation Tokens not enabled ") + } + + b.ds = &DurablitySettings{Persist: PersistTo(nPersist), Observe: ObserveTo(nObserve)} + return +} + +func (b *Bucket) ObserveAndPersistPoll(vb uint16, vbuuid uint64, seqNo uint64) (err error, failover bool) { + b.RLock() + ds := b.ds + b.RUnlock() + + if ds == nil { + return + } + + nj := 0 // total number of jobs + resultChan := make(chan *ObservePersistJob, 10) + errChan := make(chan *OPErrResponse, 10) + + nodes := b.GetNodeList(vb) + if int(ds.Observe) > len(nodes) || int(ds.Persist) > len(nodes) { + return fmt.Errorf("Not enough healthy nodes in the cluster"), false + } + + logging.Infof("Node list %v", nodes) + + if ds.Observe >= ObserveReplicateOne { + // create a job for each host + for i := ObserveReplicateOne; i < ds.Observe+1; i++ { + opJob := ObservePersistPool.Get() + opJob.vb = vb + opJob.vbuuid = vbuuid + opJob.jobType = OBSERVE + opJob.hostname = nodes[i] + opJob.resultChan = resultChan + opJob.errorChan = errChan + + OPJobChan <- opJob + nj++ + + } + } + + if ds.Persist >= PersistMaster { + for i := PersistMaster; i < ds.Persist+1; i++ { + opJob := ObservePersistPool.Get() + opJob.vb = vb + opJob.vbuuid = vbuuid + opJob.jobType = PERSIST + opJob.hostname = nodes[i] + opJob.resultChan = resultChan + opJob.errorChan = errChan + + OPJobChan <- opJob + nj++ + + } + } + + ok := true + for ok { + select { + case res := <-resultChan: + jobDone := false + if res.failover == 0 { + // no failover + if res.jobType == PERSIST { + if res.lastPersistedSeqNo >= seqNo { + jobDone = true + } + + } else { + if res.currentSeqNo >= seqNo { + jobDone = true + } + } + + if jobDone == true { + nj-- + ObservePersistPool.Put(res) + } else { + // requeue this job + OPJobChan <- res + } + + } else { + // Not currently handling failover scenarios TODO + nj-- + ObservePersistPool.Put(res) + failover = true + } + + if nj == 0 { + // done with all the jobs + ok = false + close(resultChan) + close(errChan) + } + + case Err := <-errChan: + logging.Errorf("Error in Observe/Persist %v", Err.err) + err = fmt.Errorf("Error in Observe/Persist job %v", Err.err) + nj-- + ObservePersistPool.Put(Err.job) + if nj == 0 { + close(resultChan) + close(errChan) + ok = false + } + } + } + + return +} + +func (b *Bucket) OPJobPoll() { + + ok := true + for ok == true { + select { + case job := <-OPJobChan: + pool := b.getConnPoolByHost(job.hostname, false /* bucket not already locked */) + if pool == nil { + errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid} + errRes.err = fmt.Errorf("Pool not found for host %v", job.hostname) + errRes.job = job + job.errorChan <- errRes + continue + } + conn, err := pool.Get() + if err != nil { + errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid} + errRes.err = fmt.Errorf("Unable to get connection from pool %v", err) + errRes.job = job + job.errorChan <- errRes + continue + } + + res, err := conn.ObserveSeq(job.vb, job.vbuuid) + if err != nil { + errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid} + errRes.err = fmt.Errorf("Command failed %v", err) + errRes.job = job + job.errorChan <- errRes + continue + + } + pool.Return(conn) + job.lastPersistedSeqNo = res.LastPersistedSeqNo + job.currentSeqNo = res.CurrentSeqNo + job.failover = res.Failover + + job.resultChan <- job + case <-OPJobDone: + logging.Infof("Observe Persist Poller exitting") + ok = false + } + } + wg.Done() +} + +func (b *Bucket) GetNodeList(vb uint16) []string { + + vbm := b.VBServerMap() + if len(vbm.VBucketMap) < int(vb) { + logging.Infof("vbmap smaller than vblist") + return nil + } + + nodes := make([]string, len(vbm.VBucketMap[vb])) + for i := 0; i < len(vbm.VBucketMap[vb]); i++ { + n := vbm.VBucketMap[vb][i] + if n < 0 { + continue + } + + node := b.getMasterNode(n) + if len(node) > 1 { + nodes[i] = node + } + continue + + } + return nodes +} + +//pool of ObservePersist Jobs +type OPpool struct { + pool chan *ObservePersistJob +} + +// NewPool creates a new pool of jobs +func NewPool(max int) *OPpool { + return &OPpool{ + pool: make(chan *ObservePersistJob, max), + } +} + +// Borrow a Client from the pool. +func (p *OPpool) Get() *ObservePersistJob { + var o *ObservePersistJob + select { + case o = <-p.pool: + default: + o = &ObservePersistJob{} + } + return o +} + +// Return returns a Client to the pool. +func (p *OPpool) Put(o *ObservePersistJob) { + select { + case p.pool <- o: + default: + // let it go, let it go... + } +} |