diff options
Diffstat (limited to 'vendor/github.com/couchbaselabs/go-couchbase')
-rw-r--r-- | vendor/github.com/couchbaselabs/go-couchbase/LICENSE | 19 | ||||
-rw-r--r-- | vendor/github.com/couchbaselabs/go-couchbase/audit.go | 32 | ||||
-rw-r--r-- | vendor/github.com/couchbaselabs/go-couchbase/client.go | 1385 | ||||
-rw-r--r-- | vendor/github.com/couchbaselabs/go-couchbase/conn_pool.go | 387 | ||||
-rw-r--r-- | vendor/github.com/couchbaselabs/go-couchbase/ddocs.go | 288 | ||||
-rw-r--r-- | vendor/github.com/couchbaselabs/go-couchbase/observe.go | 300 | ||||
-rw-r--r-- | vendor/github.com/couchbaselabs/go-couchbase/pools.go | 1282 | ||||
-rw-r--r-- | vendor/github.com/couchbaselabs/go-couchbase/streaming.go | 209 | ||||
-rw-r--r-- | vendor/github.com/couchbaselabs/go-couchbase/tap.go | 143 | ||||
-rw-r--r-- | vendor/github.com/couchbaselabs/go-couchbase/upr.go | 398 | ||||
-rw-r--r-- | vendor/github.com/couchbaselabs/go-couchbase/users.go | 119 | ||||
-rw-r--r-- | vendor/github.com/couchbaselabs/go-couchbase/util.go | 49 | ||||
-rw-r--r-- | vendor/github.com/couchbaselabs/go-couchbase/vbmap.go | 77 | ||||
-rw-r--r-- | vendor/github.com/couchbaselabs/go-couchbase/views.go | 231 |
14 files changed, 4919 insertions, 0 deletions
diff --git a/vendor/github.com/couchbaselabs/go-couchbase/LICENSE b/vendor/github.com/couchbaselabs/go-couchbase/LICENSE new file mode 100644 index 0000000000..0b23ef358e --- /dev/null +++ b/vendor/github.com/couchbaselabs/go-couchbase/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2013 Couchbase, Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +of the Software, and to permit persons to whom the Software is furnished to do +so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/couchbaselabs/go-couchbase/audit.go b/vendor/github.com/couchbaselabs/go-couchbase/audit.go new file mode 100644 index 0000000000..3db7d9f9ff --- /dev/null +++ b/vendor/github.com/couchbaselabs/go-couchbase/audit.go @@ -0,0 +1,32 @@ +package couchbase + +import () + +// Sample data: +// {"disabled":["12333", "22244"],"uid":"132492431","auditdEnabled":true, +// "disabledUsers":[{"name":"bill","domain":"local"},{"name":"bob","domain":"local"}], +// "logPath":"/Users/johanlarson/Library/Application Support/Couchbase/var/lib/couchbase/logs", +// "rotateInterval":86400,"rotateSize":20971520} +type AuditSpec struct { + Disabled []uint32 `json:"disabled"` + Uid string `json:"uid"` + AuditdEnabled bool `json:"auditdEnabled` + DisabledUsers []AuditUser `json:"disabledUsers"` + LogPath string `json:"logPath"` + RotateInterval int64 `json:"rotateInterval"` + RotateSize int64 `json:"rotateSize"` +} + +type AuditUser struct { + Name string `json:"name"` + Domain string `json:"domain"` +} + +func (c *Client) GetAuditSpec() (*AuditSpec, error) { + ret := &AuditSpec{} + err := c.parseURLResponse("/settings/audit", ret) + if err != nil { + return nil, err + } + return ret, nil +} diff --git a/vendor/github.com/couchbaselabs/go-couchbase/client.go b/vendor/github.com/couchbaselabs/go-couchbase/client.go new file mode 100644 index 0000000000..43c3382960 --- /dev/null +++ b/vendor/github.com/couchbaselabs/go-couchbase/client.go @@ -0,0 +1,1385 @@ +/* +Package couchbase provides a smart client for go. + +Usage: + + client, err := couchbase.Connect("http://myserver:8091/") + handleError(err) + pool, err := client.GetPool("default") + handleError(err) + bucket, err := pool.GetBucket("MyAwesomeBucket") + handleError(err) + ... + +or a shortcut for the bucket directly + + bucket, err := couchbase.GetBucket("http://myserver:8091/", "default", "default") + +in any case, you can specify authentication credentials using +standard URL userinfo syntax: + + b, err := couchbase.GetBucket("http://bucketname:bucketpass@myserver:8091/", + "default", "bucket") +*/ +package couchbase + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "runtime" + "strconv" + "strings" + "sync" + "time" + "unsafe" + + "github.com/couchbase/gomemcached" + "github.com/couchbase/gomemcached/client" // package name is 'memcached' + "github.com/couchbase/goutils/logging" +) + +// Mutation Token +type MutationToken struct { + VBid uint16 // vbucket id + Guard uint64 // vbuuid + Value uint64 // sequence number +} + +// Maximum number of times to retry a chunk of a bulk get on error. +var MaxBulkRetries = 5000 +var backOffDuration time.Duration = 100 * time.Millisecond +var MaxBackOffRetries = 25 // exponentail backOff result in over 30sec (25*13*0.1s) + +// If this is set to a nonzero duration, Do() and ViewCustom() will log a warning if the call +// takes longer than that. +var SlowServerCallWarningThreshold time.Duration + +func slowLog(startTime time.Time, format string, args ...interface{}) { + if elapsed := time.Now().Sub(startTime); elapsed > SlowServerCallWarningThreshold { + pc, _, _, _ := runtime.Caller(2) + caller := runtime.FuncForPC(pc).Name() + logging.Infof("go-couchbase: "+format+" in "+caller+" took "+elapsed.String(), args...) + } +} + +// Return true if error is KEY_ENOENT. Required by cbq-engine +func IsKeyEExistsError(err error) bool { + + res, ok := err.(*gomemcached.MCResponse) + if ok && res.Status == gomemcached.KEY_EEXISTS { + return true + } + + return false +} + +// Return true if error is KEY_ENOENT. Required by cbq-engine +func IsKeyNoEntError(err error) bool { + + res, ok := err.(*gomemcached.MCResponse) + if ok && res.Status == gomemcached.KEY_ENOENT { + return true + } + + return false +} + +// Return true if error suggests a bucket refresh is required. Required by cbq-engine +func IsRefreshRequired(err error) bool { + + res, ok := err.(*gomemcached.MCResponse) + if ok && (res.Status == gomemcached.NO_BUCKET || res.Status == gomemcached.NOT_MY_VBUCKET) { + return true + } + + return false +} + +// ClientOpCallback is called for each invocation of Do. +var ClientOpCallback func(opname, k string, start time.Time, err error) + +// Do executes a function on a memcached connection to the node owning key "k" +// +// Note that this automatically handles transient errors by replaying +// your function on a "not-my-vbucket" error, so don't assume +// your command will only be executed only once. +func (b *Bucket) Do(k string, f func(mc *memcached.Client, vb uint16) error) (err error) { + return b.Do2(k, f, true) +} + +func (b *Bucket) Do2(k string, f func(mc *memcached.Client, vb uint16) error, deadline bool) (err error) { + if SlowServerCallWarningThreshold > 0 { + defer slowLog(time.Now(), "call to Do(%q)", k) + } + + vb := b.VBHash(k) + maxTries := len(b.Nodes()) * 2 + for i := 0; i < maxTries; i++ { + conn, pool, err := b.getConnectionToVBucket(vb) + if err != nil { + if isConnError(err) && backOff(i, maxTries, backOffDuration, true) { + b.Refresh() + continue + } + return err + } + + if deadline && DefaultTimeout > 0 { + conn.SetDeadline(getDeadline(noDeadline, DefaultTimeout)) + err = f(conn, uint16(vb)) + conn.SetDeadline(noDeadline) + } else { + err = f(conn, uint16(vb)) + } + + var retry bool + discard := isOutOfBoundsError(err) + + // MB-30967 / MB-31001 implement back off for transient errors + if resp, ok := err.(*gomemcached.MCResponse); ok { + switch resp.Status { + case gomemcached.NOT_MY_VBUCKET: + b.Refresh() + // MB-28842: in case of NMVB, check if the node is still part of the map + // and ditch the connection if it isn't. + discard = b.checkVBmap(pool.Node()) + retry = true + case gomemcached.NOT_SUPPORTED: + discard = true + retry = true + case gomemcached.ENOMEM: + fallthrough + case gomemcached.TMPFAIL: + retry = backOff(i, maxTries, backOffDuration, true) + default: + retry = false + } + } else if err != nil && isConnError(err) && backOff(i, maxTries, backOffDuration, true) { + retry = true + } + + if discard { + pool.Discard(conn) + } else { + pool.Return(conn) + } + + if !retry { + return err + } + } + + return fmt.Errorf("unable to complete action after %v attemps", maxTries) +} + +type GatheredStats struct { + Server string + Stats map[string]string + Err error +} + +func getStatsParallel(sn string, b *Bucket, offset int, which string, + ch chan<- GatheredStats) { + pool := b.getConnPool(offset) + var gatheredStats GatheredStats + + conn, err := pool.Get() + defer func() { + pool.Return(conn) + ch <- gatheredStats + }() + + if err != nil { + gatheredStats = GatheredStats{Server: sn, Err: err} + } else { + sm, err := conn.StatsMap(which) + gatheredStats = GatheredStats{Server: sn, Stats: sm, Err: err} + } +} + +// GetStats gets a set of stats from all servers. +// +// Returns a map of server ID -> map of stat key to map value. +func (b *Bucket) GetStats(which string) map[string]map[string]string { + rv := map[string]map[string]string{} + for server, gs := range b.GatherStats(which) { + if len(gs.Stats) > 0 { + rv[server] = gs.Stats + } + } + return rv +} + +// GatherStats returns a map of server ID -> GatheredStats from all servers. +func (b *Bucket) GatherStats(which string) map[string]GatheredStats { + vsm := b.VBServerMap() + if vsm.ServerList == nil { + return nil + } + + // Go grab all the things at once. + ch := make(chan GatheredStats, len(vsm.ServerList)) + for i, sn := range vsm.ServerList { + go getStatsParallel(sn, b, i, which, ch) + } + + // Gather the results + rv := map[string]GatheredStats{} + for range vsm.ServerList { + gs := <-ch + rv[gs.Server] = gs + } + return rv +} + +// Get bucket count through the bucket stats +func (b *Bucket) GetCount(refresh bool) (count int64, err error) { + if refresh { + b.Refresh() + } + + var cnt int64 + for _, gs := range b.GatherStats("") { + if len(gs.Stats) > 0 { + cnt, err = strconv.ParseInt(gs.Stats["curr_items"], 10, 64) + if err != nil { + return 0, err + } + count += cnt + } + } + + return count, nil +} + +func isAuthError(err error) bool { + estr := err.Error() + return strings.Contains(estr, "Auth failure") +} + +func IsReadTimeOutError(err error) bool { + estr := err.Error() + return strings.Contains(estr, "read tcp") || + strings.Contains(estr, "i/o timeout") +} + +func isTimeoutError(err error) bool { + estr := err.Error() + return strings.Contains(estr, "i/o timeout") || + strings.Contains(estr, "connection timed out") || + strings.Contains(estr, "no route to host") +} + +// Errors that are not considered fatal for our fetch loop +func isConnError(err error) bool { + if err == io.EOF { + return true + } + estr := err.Error() + return strings.Contains(estr, "broken pipe") || + strings.Contains(estr, "connection reset") || + strings.Contains(estr, "connection refused") || + strings.Contains(estr, "connection pool is closed") +} + +func isOutOfBoundsError(err error) bool { + return err != nil && strings.Contains(err.Error(), "Out of Bounds error") + +} + +func getDeadline(reqDeadline time.Time, duration time.Duration) time.Time { + if reqDeadline.IsZero() && duration > 0 { + return time.Now().Add(duration) + } + return reqDeadline +} + +func backOff(attempt, maxAttempts int, duration time.Duration, exponential bool) bool { + if attempt < maxAttempts { + // 0th attempt return immediately + if attempt > 0 { + if exponential { + duration = time.Duration(attempt) * duration + } + time.Sleep(duration) + } + return true + } + + return false +} + +func (b *Bucket) doBulkGet(vb uint16, keys []string, reqDeadline time.Time, + ch chan<- map[string]*gomemcached.MCResponse, ech chan<- error, subPaths []string, + eStatus *errorStatus) { + if SlowServerCallWarningThreshold > 0 { + defer slowLog(time.Now(), "call to doBulkGet(%d, %d keys)", vb, len(keys)) + } + + rv := _STRING_MCRESPONSE_POOL.Get() + attempts := 0 + backOffAttempts := 0 + done := false + bname := b.Name + for ; attempts < MaxBulkRetries && !done && !eStatus.errStatus; attempts++ { + + if len(b.VBServerMap().VBucketMap) < int(vb) { + //fatal + err := fmt.Errorf("vbmap smaller than requested for %v", bname) + logging.Errorf("go-couchbase: %v vb %d vbmap len %d", err.Error(), vb, len(b.VBServerMap().VBucketMap)) + ech <- err + return + } + + masterID := b.VBServerMap().VBucketMap[vb][0] + + if masterID < 0 { + // fatal + err := fmt.Errorf("No master node available for %v vb %d", bname, vb) + logging.Errorf("%v", err.Error()) + ech <- err + return + } + + // This stack frame exists to ensure we can clean up + // connection at a reasonable time. + err := func() error { + pool := b.getConnPool(masterID) + conn, err := pool.Get() + if err != nil { + if isAuthError(err) || isTimeoutError(err) { + logging.Errorf("Fatal Error %v : %v", bname, err) + ech <- err + return err + } else if isConnError(err) { + if !backOff(backOffAttempts, MaxBackOffRetries, backOffDuration, true) { + logging.Errorf("Connection Error %v : %v", bname, err) + ech <- err + return err + } + b.Refresh() + backOffAttempts++ + } + logging.Infof("Pool Get returned %v: %v", bname, err) + // retry + return nil + } + + conn.SetDeadline(getDeadline(reqDeadline, DefaultTimeout)) + err = conn.GetBulk(vb, keys, rv, subPaths) + conn.SetDeadline(noDeadline) + + discard := false + defer func() { + if discard { + pool.Discard(conn) + } else { + pool.Return(conn) + } + }() + + switch err.(type) { + case *gomemcached.MCResponse: + notSMaxTries := len(b.Nodes()) * 2 + st := err.(*gomemcached.MCResponse).Status + if st == gomemcached.NOT_MY_VBUCKET || (st == gomemcached.NOT_SUPPORTED && attempts < notSMaxTries) { + b.Refresh() + discard = b.checkVBmap(pool.Node()) + return nil // retry + } else if st == gomemcached.EBUSY || st == gomemcached.LOCKED { + if (attempts % (MaxBulkRetries / 100)) == 0 { + logging.Infof("Retrying Memcached error (%v) FOR %v(vbid:%d, keys:<ud>%v</ud>)", + err.Error(), bname, vb, keys) + } + return nil // retry + } else if (st == gomemcached.ENOMEM || st == gomemcached.TMPFAIL) && backOff(backOffAttempts, MaxBackOffRetries, backOffDuration, true) { + // MB-30967 / MB-31001 use backoff for TMPFAIL too + backOffAttempts++ + logging.Infof("Retrying Memcached error (%v) FOR %v(vbid:%d, keys:<ud>%v</ud>)", + err.Error(), bname, vb, keys) + return nil // retry + } + ech <- err + return err + case error: + if isOutOfBoundsError(err) { + // We got an out of bound error, retry the operation + discard = true + return nil + } else if isConnError(err) && backOff(backOffAttempts, MaxBackOffRetries, backOffDuration, true) { + backOffAttempts++ + logging.Errorf("Connection Error: %s. Refreshing bucket %v (vbid:%v,keys:<ud>%v</ud>)", + err.Error(), bname, vb, keys) + discard = true + b.Refresh() + return nil // retry + } + ech <- err + ch <- rv + return err + } + + done = true + return nil + }() + + if err != nil { + return + } + } + + if attempts >= MaxBulkRetries { + err := fmt.Errorf("bulkget exceeded MaxBulkRetries for %v(vbid:%d,keys:<ud>%v</ud>)", bname, vb, keys) + logging.Errorf("%v", err.Error()) + ech <- err + } + + ch <- rv +} + +type errorStatus struct { + errStatus bool +} + +type vbBulkGet struct { + b *Bucket + ch chan<- map[string]*gomemcached.MCResponse + ech chan<- error + k uint16 + keys []string + reqDeadline time.Time + wg *sync.WaitGroup + subPaths []string + groupError *errorStatus +} + +const _NUM_CHANNELS = 5 + +var _NUM_CHANNEL_WORKERS = (runtime.NumCPU() + 1) / 2 +var DefaultDialTimeout = time.Duration(0) +var DefaultTimeout = time.Duration(0) +var noDeadline = time.Time{} + +// Buffer 4k requests per worker +var _VB_BULK_GET_CHANNELS []chan *vbBulkGet + +func InitBulkGet() { + + DefaultDialTimeout = 20 * time.Second + DefaultTimeout = 120 * time.Second + + memcached.SetDefaultDialTimeout(DefaultDialTimeout) + + _VB_BULK_GET_CHANNELS = make([]chan *vbBulkGet, _NUM_CHANNELS) + + for i := 0; i < _NUM_CHANNELS; i++ { + channel := make(chan *vbBulkGet, 16*1024*_NUM_CHANNEL_WORKERS) + _VB_BULK_GET_CHANNELS[i] = channel + + for j := 0; j < _NUM_CHANNEL_WORKERS; j++ { + go vbBulkGetWorker(channel) + } + } +} + +func vbBulkGetWorker(ch chan *vbBulkGet) { + defer func() { + // Workers cannot panic and die + recover() + go vbBulkGetWorker(ch) + }() + + for vbg := range ch { + vbDoBulkGet(vbg) + } +} + +func vbDoBulkGet(vbg *vbBulkGet) { + defer vbg.wg.Done() + defer func() { + // Workers cannot panic and die + recover() + }() + vbg.b.doBulkGet(vbg.k, vbg.keys, vbg.reqDeadline, vbg.ch, vbg.ech, vbg.subPaths, vbg.groupError) +} + +var _ERR_CHAN_FULL = fmt.Errorf("Data request queue full, aborting query.") + +func (b *Bucket) processBulkGet(kdm map[uint16][]string, reqDeadline time.Time, + ch chan<- map[string]*gomemcached.MCResponse, ech chan<- error, subPaths []string, + eStatus *errorStatus) { + + defer close(ch) + defer close(ech) + + wg := &sync.WaitGroup{} + + for k, keys := range kdm { + + // GetBulk() group has error donot Queue items for this group + if eStatus.errStatus { + break + } + + vbg := &vbBulkGet{ + b: b, + ch: ch, + ech: ech, + k: k, + keys: keys, + reqDeadline: reqDeadline, + wg: wg, + subPaths: subPaths, + groupError: eStatus, + } + + wg.Add(1) + + // Random int + // Right shift to avoid 8-byte alignment, and take low bits + c := (uintptr(unsafe.Pointer(vbg)) >> 4) % _NUM_CHANNELS + + select { + case _VB_BULK_GET_CHANNELS[c] <- vbg: + // No-op + default: + // Buffer full, abandon the bulk get + ech <- _ERR_CHAN_FULL + wg.Add(-1) + } + } + + // Wait for my vb bulk gets + wg.Wait() +} + +type multiError []error + +func (m multiError) Error() string { + if len(m) == 0 { + panic("Error of none") + } + + return fmt.Sprintf("{%v errors, starting with %v}", len(m), m[0].Error()) +} + +// Convert a stream of errors from ech into a multiError (or nil) and +// send down eout. +// +// At least one send is guaranteed on eout, but two is possible, so +// buffer the out channel appropriately. +func errorCollector(ech <-chan error, eout chan<- error, eStatus *errorStatus) { + defer func() { eout <- nil }() + var errs multiError + for e := range ech { + if !eStatus.errStatus && !IsKeyNoEntError(e) { + eStatus.errStatus = true + } + + errs = append(errs, e) + } + + if len(errs) > 0 { + eout <- errs + } +} + +// Fetches multiple keys concurrently, with []byte values +// +// This is a wrapper around GetBulk which converts all values returned +// by GetBulk from raw memcached responses into []byte slices. +// Returns one document for duplicate keys +func (b *Bucket) GetBulkRaw(keys []string) (map[string][]byte, error) { + + resp, eout := b.getBulk(keys, noDeadline, nil) + + rv := make(map[string][]byte, len(keys)) + for k, av := range resp { + rv[k] = av.Body + } + + b.ReleaseGetBulkPools(resp) + return rv, eout + +} + +// GetBulk fetches multiple keys concurrently. +// +// Unlike more convenient GETs, the entire response is returned in the +// map array for each key. Keys that were not found will not be included in +// the map. + +func (b *Bucket) GetBulk(keys []string, reqDeadline time.Time, subPaths []string) (map[string]*gomemcached.MCResponse, error) { + return b.getBulk(keys, reqDeadline, subPaths) +} + +func (b *Bucket) ReleaseGetBulkPools(rv map[string]*gomemcached.MCResponse) { + _STRING_MCRESPONSE_POOL.Put(rv) +} + +func (b *Bucket) getBulk(keys []string, reqDeadline time.Time, subPaths []string) (map[string]*gomemcached.MCResponse, error) { + kdm := _VB_STRING_POOL.Get() + defer _VB_STRING_POOL.Put(kdm) + for _, k := range keys { + if k != "" { + vb := uint16(b.VBHash(k)) + a, ok1 := kdm[vb] + if !ok1 { + a = _STRING_POOL.Get() + } + kdm[vb] = append(a, k) + } + } + + eout := make(chan error, 2) + groupErrorStatus := &errorStatus{} + + // processBulkGet will own both of these channels and + // guarantee they're closed before it returns. + ch := make(chan map[string]*gomemcached.MCResponse) + ech := make(chan error) + + go errorCollector(ech, eout, groupErrorStatus) + go b.processBulkGet(kdm, reqDeadline, ch, ech, subPaths, groupErrorStatus) + + var rv map[string]*gomemcached.MCResponse + + for m := range ch { + if rv == nil { + rv = m + continue + } + + for k, v := range m { + rv[k] = v + } + _STRING_MCRESPONSE_POOL.Put(m) + } + + return rv, <-eout +} + +// WriteOptions is the set of option flags availble for the Write +// method. They are ORed together to specify the desired request. +type WriteOptions int + +const ( + // Raw specifies that the value is raw []byte or nil; don't + // JSON-encode it. + Raw = WriteOptions(1 << iota) + // AddOnly indicates an item should only be written if it + // doesn't exist, otherwise ErrKeyExists is returned. + AddOnly + // Persist causes the operation to block until the server + // confirms the item is persisted. + Persist + // Indexable causes the operation to block until it's availble via the index. + Indexable + // Append indicates the given value should be appended to the + // existing value for the given key. + Append +) + +var optNames = []struct { + opt WriteOptions + name string +}{ + {Raw, "raw"}, + {AddOnly, "addonly"}, {Persist, "persist"}, + {Indexable, "indexable"}, {Append, "append"}, +} + +// String representation of WriteOptions +func (w WriteOptions) String() string { + f := []string{} + for _, on := range optNames { + if w&on.opt != 0 { + f = append(f, on.name) + w &= ^on.opt + } + } + if len(f) == 0 || w != 0 { + f = append(f, fmt.Sprintf("0x%x", int(w))) + } + return strings.Join(f, "|") +} + +// Error returned from Write with AddOnly flag, when key already exists in the bucket. +var ErrKeyExists = errors.New("key exists") + +// General-purpose value setter. +// +// The Set, Add and Delete methods are just wrappers around this. The +// interpretation of `v` depends on whether the `Raw` option is +// given. If it is, v must be a byte array or nil. (A nil value causes +// a delete.) If `Raw` is not given, `v` will be marshaled as JSON +// before being written. It must be JSON-marshalable and it must not +// be nil. +func (b *Bucket) Write(k string, flags, exp int, v interface{}, + opt WriteOptions) (err error) { + + if ClientOpCallback != nil { + defer func(t time.Time) { + ClientOpCallback(fmt.Sprintf("Write(%v)", opt), k, t, err) + }(time.Now()) + } + + var data []byte + if opt&Raw == 0 { + data, err = json.Marshal(v) + if err != nil { + return err + } + } else if v != nil { + data = v.([]byte) + } + + var res *gomemcached.MCResponse + err = b.Do(k, func(mc *memcached.Client, vb uint16) error { + if opt&AddOnly != 0 { + res, err = memcached.UnwrapMemcachedError( + mc.Add(vb, k, flags, exp, data)) + if err == nil && res.Status != gomemcached.SUCCESS { + if res.Status == gomemcached.KEY_EEXISTS { + err = ErrKeyExists + } else { + err = res + } + } + } else if opt&Append != 0 { + res, err = mc.Append(vb, k, data) + } else if data == nil { + res, err = mc.Del(vb, k) + } else { + res, err = mc.Set(vb, k, flags, exp, data) + } + + return err + }) + + if err == nil && (opt&(Persist|Indexable) != 0) { + err = b.WaitForPersistence(k, res.Cas, data == nil) + } + + return err +} + +func (b *Bucket) WriteWithMT(k string, flags, exp int, v interface{}, + opt WriteOptions) (mt *MutationToken, err error) { + + if ClientOpCallback != nil { + defer func(t time.Time) { + ClientOpCallback(fmt.Sprintf("WriteWithMT(%v)", opt), k, t, err) + }(time.Now()) + } + + var data []byte + if opt&Raw == 0 { + data, err = json.Marshal(v) + if err != nil { + return nil, err + } + } else if v != nil { + data = v.([]byte) + } + + var res *gomemcached.MCResponse + err = b.Do(k, func(mc *memcached.Client, vb uint16) error { + if opt&AddOnly != 0 { + res, err = memcached.UnwrapMemcachedError( + mc.Add(vb, k, flags, exp, data)) + if err == nil && res.Status != gomemcached.SUCCESS { + if res.Status == gomemcached.KEY_EEXISTS { + err = ErrKeyExists + } else { + err = res + } + } + } else if opt&Append != 0 { + res, err = mc.Append(vb, k, data) + } else if data == nil { + res, err = mc.Del(vb, k) + } else { + res, err = mc.Set(vb, k, flags, exp, data) + } + + if len(res.Extras) >= 16 { + vbuuid := uint64(binary.BigEndian.Uint64(res.Extras[0:8])) + seqNo := uint64(binary.BigEndian.Uint64(res.Extras[8:16])) + mt = &MutationToken{VBid: vb, Guard: vbuuid, Value: seqNo} + } + + return err + }) + + if err == nil && (opt&(Persist|Indexable) != 0) { + err = b.WaitForPersistence(k, res.Cas, data == nil) + } + + return mt, err +} + +// Set a value in this bucket with Cas and return the new Cas value +func (b *Bucket) Cas(k string, exp int, cas uint64, v interface{}) (uint64, error) { + return b.WriteCas(k, 0, exp, cas, v, 0) +} + +// Set a value in this bucket with Cas without json encoding it +func (b *Bucket) CasRaw(k string, exp int, cas uint64, v interface{}) (uint64, error) { + return b.WriteCas(k, 0, exp, cas, v, Raw) +} + +func (b *Bucket) WriteCas(k string, flags, exp int, cas uint64, v interface{}, + opt WriteOptions) (newCas uint64, err error) { + + if ClientOpCallback != nil { + defer func(t time.Time) { + ClientOpCallback(fmt.Sprintf("Write(%v)", opt), k, t, err) + }(time.Now()) + } + + var data []byte + if opt&Raw == 0 { + data, err = json.Marshal(v) + if err != nil { + return 0, err + } + } else if v != nil { + data = v.([]byte) + } + + var res *gomemcached.MCResponse + err = b.Do(k, func(mc *memcached.Client, vb uint16) error { + res, err = mc.SetCas(vb, k, flags, exp, cas, data) + return err + }) + + if err == nil && (opt&(Persist|Indexable) != 0) { + err = b.WaitForPersistence(k, res.Cas, data == nil) + } + + return res.Cas, err +} + +// Extended CAS operation. These functions will return the mutation token, i.e vbuuid & guard +func (b *Bucket) CasWithMeta(k string, flags int, exp int, cas uint64, v interface{}) (uint64, *MutationToken, error) { + return b.WriteCasWithMT(k, flags, exp, cas, v, 0) +} + +func (b *Bucket) CasWithMetaRaw(k string, flags int, exp int, cas uint64, v interface{}) (uint64, *MutationToken, error) { + return b.WriteCasWithMT(k, flags, exp, cas, v, Raw) +} + +func (b *Bucket) WriteCasWithMT(k string, flags, exp int, cas uint64, v interface{}, + opt WriteOptions) (newCas uint64, mt *MutationToken, err error) { + + if ClientOpCallback != nil { + defer func(t time.Time) { + ClientOpCallback(fmt.Sprintf("Write(%v)", opt), k, t, err) + }(time.Now()) + } + + var data []byte + if opt&Raw == 0 { + data, err = json.Marshal(v) + if err != nil { + return 0, nil, err + } + } else if v != nil { + data = v.([]byte) + } + + var res *gomemcached.MCResponse + err = b.Do(k, func(mc *memcached.Client, vb uint16) error { + res, err = mc.SetCas(vb, k, flags, exp, cas, data) + return err + }) + + if err != nil { + return 0, nil, err + } + + // check for extras + if len(res.Extras) >= 16 { + vbuuid := uint64(binary.BigEndian.Uint64(res.Extras[0:8])) + seqNo := uint64(binary.BigEndian.Uint64(res.Extras[8:16])) + vb := b.VBHash(k) + mt = &MutationToken{VBid: uint16(vb), Guard: vbuuid, Value: seqNo} + } + + if err == nil && (opt&(Persist|Indexable) != 0) { + err = b.WaitForPersistence(k, res.Cas, data == nil) + } + + return res.Cas, mt, err +} + +// Set a value in this bucket. +// The value will be serialized into a JSON document. +func (b *Bucket) Set(k string, exp int, v interface{}) error { + return b.Write(k, 0, exp, v, 0) +} + +// Set a value in this bucket with with flags +func (b *Bucket) SetWithMeta(k string, flags int, exp int, v interface{}) (*MutationToken, error) { + return b.WriteWithMT(k, flags, exp, v, 0) +} + +// SetRaw sets a value in this bucket without JSON encoding it. +func (b *Bucket) SetRaw(k string, exp int, v []byte) error { + return b.Write(k, 0, exp, v, Raw) +} + +// Add adds a value to this bucket; like Set except that nothing +// happens if the key exists. The value will be serialized into a +// JSON document. +func (b *Bucket) Add(k string, exp int, v interface{}) (added bool, err error) { + err = b.Write(k, 0, exp, v, AddOnly) + if err == ErrKeyExists { + return false, nil + } + return (err == nil), err +} + +// AddRaw adds a value to this bucket; like SetRaw except that nothing +// happens if the key exists. The value will be stored as raw bytes. +func (b *Bucket) AddRaw(k string, exp int, v []byte) (added bool, err error) { + err = b.Write(k, 0, exp, v, AddOnly|Raw) + if err == ErrKeyExists { + return false, nil + } + return (err == nil), err +} + +// Add adds a value to this bucket; like Set except that nothing +// happens if the key exists. The value will be serialized into a +// JSON document. +func (b *Bucket) AddWithMT(k string, exp int, v interface{}) (added bool, mt *MutationToken, err error) { + mt, err = b.WriteWithMT(k, 0, exp, v, AddOnly) + if err == ErrKeyExists { + return false, mt, nil + } + return (err == nil), mt, err +} + +// AddRaw adds a value to this bucket; like SetRaw except that nothing +// happens if the key exists. The value will be stored as raw bytes. +func (b *Bucket) AddRawWithMT(k string, exp int, v []byte) (added bool, mt *MutationToken, err error) { + mt, err = b.WriteWithMT(k, 0, exp, v, AddOnly|Raw) + if err == ErrKeyExists { + return false, mt, nil + } + return (err == nil), mt, err +} + +// Append appends raw data to an existing item. +func (b *Bucket) Append(k string, data []byte) error { + return b.Write(k, 0, 0, data, Append|Raw) +} + +// Get a value straight from Memcached +func (b *Bucket) GetsMC(key string, reqDeadline time.Time) (*gomemcached.MCResponse, error) { + var err error + var response *gomemcached.MCResponse + + if key == "" { + return nil, nil + } + + if ClientOpCallback != nil { + defer func(t time.Time) { ClientOpCallback("GetsMC", key, t, err) }(time.Now()) + } + + err = b.Do2(key, func(mc *memcached.Client, vb uint16) error { + var err1 error + + mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout)) + response, err1 = mc.Get(vb, key) + mc.SetDeadline(noDeadline) + if err1 != nil { + return err1 + } + return nil + }, false) + return response, err +} + +// Get a value through the subdoc API +func (b *Bucket) GetsSubDoc(key string, reqDeadline time.Time, subPaths []string) (*gomemcached.MCResponse, error) { + var err error + var response *gomemcached.MCResponse + + if key == "" { + return nil, nil + } + + if ClientOpCallback != nil { + defer func(t time.Time) { ClientOpCallback("GetsSubDoc", key, t, err) }(time.Now()) + } + + err = b.Do2(key, func(mc *memcached.Client, vb uint16) error { + var err1 error + + mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout)) + response, err1 = mc.GetSubdoc(vb, key, subPaths) + mc.SetDeadline(noDeadline) + if err1 != nil { + return err1 + } + return nil + }, false) + return response, err +} + +// GetsRaw gets a raw value from this bucket including its CAS +// counter and flags. +func (b *Bucket) GetsRaw(k string) (data []byte, flags int, + cas uint64, err error) { + + if ClientOpCallback != nil { + defer func(t time.Time) { ClientOpCallback("GetsRaw", k, t, err) }(time.Now()) + } + + err = b.Do(k, func(mc *memcached.Client, vb uint16) error { + res, err := mc.Get(vb, k) + if err != nil { + return err + } + cas = res.Cas + if len(res.Extras) >= 4 { + flags = int(binary.BigEndian.Uint32(res.Extras)) + } + data = res.Body + return nil + }) + return +} + +// Gets gets a value from this bucket, including its CAS counter. The +// value is expected to be a JSON stream and will be deserialized into +// rv. +func (b *Bucket) Gets(k string, rv interface{}, caso *uint64) error { + data, _, cas, err := b.GetsRaw(k) + if err != nil { + return err + } + if caso != nil { + *caso = cas + } + return json.Unmarshal(data, rv) +} + +// Get a value from this bucket. +// The value is expected to be a JSON stream and will be deserialized +// into rv. +func (b *Bucket) Get(k string, rv interface{}) error { + return b.Gets(k, rv, nil) +} + +// GetRaw gets a raw value from this bucket. No marshaling is performed. +func (b *Bucket) GetRaw(k string) ([]byte, error) { + d, _, _, err := b.GetsRaw(k) + return d, err +} + +// GetAndTouchRaw gets a raw value from this bucket including its CAS +// counter and flags, and updates the expiry on the doc. +func (b *Bucket) GetAndTouchRaw(k string, exp int) (data []byte, + cas uint64, err error) { + + if ClientOpCallback != nil { + defer func(t time.Time) { ClientOpCallback("GetsRaw", k, t, err) }(time.Now()) + } + + err = b.Do(k, func(mc *memcached.Client, vb uint16) error { + res, err := mc.GetAndTouch(vb, k, exp) + if err != nil { + return err + } + cas = res.Cas + data = res.Body + return nil + }) + return data, cas, err +} + +// GetMeta returns the meta values for a key +func (b *Bucket) GetMeta(k string, flags *int, expiry *int, cas *uint64, seqNo *uint64) (err error) { + + if ClientOpCallback != nil { + defer func(t time.Time) { ClientOpCallback("GetsMeta", k, t, err) }(time.Now()) + } + + err = b.Do(k, func(mc *memcached.Client, vb uint16) error { + res, err := mc.GetMeta(vb, k) + if err != nil { + return err + } + + *cas = res.Cas + if len(res.Extras) >= 8 { + *flags = int(binary.BigEndian.Uint32(res.Extras[4:])) + } + + if len(res.Extras) >= 12 { + *expiry = int(binary.BigEndian.Uint32(res.Extras[8:])) + } + + if len(res.Extras) >= 20 { + *seqNo = uint64(binary.BigEndian.Uint64(res.Extras[12:])) + } + + return nil + }) + + return err +} + +// Delete a key from this bucket. +func (b *Bucket) Delete(k string) error { + return b.Write(k, 0, 0, nil, Raw) +} + +// Incr increments the value at a given key by amt and defaults to def if no value present. +func (b *Bucket) Incr(k string, amt, def uint64, exp int) (val uint64, err error) { + if ClientOpCallback != nil { + defer func(t time.Time) { ClientOpCallback("Incr", k, t, err) }(time.Now()) + } + + var rv uint64 + err = b.Do(k, func(mc *memcached.Client, vb uint16) error { + res, err := mc.Incr(vb, k, amt, def, exp) + if err != nil { + return err + } + rv = res + return nil + }) + return rv, err +} + +// Decr decrements the value at a given key by amt and defaults to def if no value present +func (b *Bucket) Decr(k string, amt, def uint64, exp int) (val uint64, err error) { + if ClientOpCallback != nil { + defer func(t time.Time) { ClientOpCallback("Decr", k, t, err) }(time.Now()) + } + + var rv uint64 + err = b.Do(k, func(mc *memcached.Client, vb uint16) error { + res, err := mc.Decr(vb, k, amt, def, exp) + if err != nil { + return err + } + rv = res + return nil + }) + return rv, err +} + +// Wrapper around memcached.CASNext() +func (b *Bucket) casNext(k string, exp int, state *memcached.CASState) bool { + if ClientOpCallback != nil { + defer func(t time.Time) { + ClientOpCallback("casNext", k, t, state.Err) + }(time.Now()) + } + + keepGoing := false + state.Err = b.Do(k, func(mc *memcached.Client, vb uint16) error { + keepGoing = mc.CASNext(vb, k, exp, state) + return state.Err + }) + return keepGoing && state.Err == nil +} + +// An UpdateFunc is a callback function to update a document +type UpdateFunc func(current []byte) (updated []byte, err error) + +// Return this as the error from an UpdateFunc to cancel the Update +// operation. +const UpdateCancel = memcached.CASQuit + +// Update performs a Safe update of a document, avoiding conflicts by +// using CAS. +// +// The callback function will be invoked with the current raw document +// contents (or nil if the document doesn't exist); it should return +// the updated raw contents (or nil to delete.) If it decides not to +// change anything it can return UpdateCancel as the error. +// +// If another writer modifies the document between the get and the +// set, the callback will be invoked again with the newer value. +func (b *Bucket) Update(k string, exp int, callback UpdateFunc) error { + _, err := b.update(k, exp, callback) + return err +} + +// internal version of Update that returns a CAS value +func (b *Bucket) update(k string, exp int, callback UpdateFunc) (newCas uint64, err error) { + var state memcached.CASState + for b.casNext(k, exp, &state) { + var err error + if state.Value, err = callback(state.Value); err != nil { + return 0, err + } + } + return state.Cas, state.Err +} + +// A WriteUpdateFunc is a callback function to update a document +type WriteUpdateFunc func(current []byte) (updated []byte, opt WriteOptions, err error) + +// WriteUpdate performs a Safe update of a document, avoiding +// conflicts by using CAS. WriteUpdate is like Update, except that +// the callback can return a set of WriteOptions, of which Persist and +// Indexable are recognized: these cause the call to wait until the +// document update has been persisted to disk and/or become available +// to index. +func (b *Bucket) WriteUpdate(k string, exp int, callback WriteUpdateFunc) error { + var writeOpts WriteOptions + var deletion bool + // Wrap the callback in an UpdateFunc we can pass to Update: + updateCallback := func(current []byte) (updated []byte, err error) { + update, opt, err := callback(current) + writeOpts = opt + deletion = (update == nil) + return update, err + } + cas, err := b.update(k, exp, updateCallback) + if err != nil { + return err + } + // If callback asked, wait for persistence or indexability: + if writeOpts&(Persist|Indexable) != 0 { + err = b.WaitForPersistence(k, cas, deletion) + } + return err +} + +// Observe observes the current state of a document. +func (b *Bucket) Observe(k string) (result memcached.ObserveResult, err error) { + if ClientOpCallback != nil { + defer func(t time.Time) { ClientOpCallback("Observe", k, t, err) }(time.Now()) + } + + err = b.Do(k, func(mc *memcached.Client, vb uint16) error { + result, err = mc.Observe(vb, k) + return err + }) + return +} + +// Returned from WaitForPersistence (or Write, if the Persistent or Indexable flag is used) +// if the value has been overwritten by another before being persisted. +var ErrOverwritten = errors.New("overwritten") + +// Returned from WaitForPersistence (or Write, if the Persistent or Indexable flag is used) +// if the value hasn't been persisted by the timeout interval +var ErrTimeout = errors.New("timeout") + +// WaitForPersistence waits for an item to be considered durable. +// +// Besides transport errors, ErrOverwritten may be returned if the +// item is overwritten before it reaches durability. ErrTimeout may +// occur if the item isn't found durable in a reasonable amount of +// time. +func (b *Bucket) WaitForPersistence(k string, cas uint64, deletion bool) error { + timeout := 10 * time.Second + sleepDelay := 5 * time.Millisecond + start := time.Now() + for { + time.Sleep(sleepDelay) + sleepDelay += sleepDelay / 2 // multiply delay by 1.5 every time + + result, err := b.Observe(k) + if err != nil { + return err + } + if persisted, overwritten := result.CheckPersistence(cas, deletion); overwritten { + return ErrOverwritten + } else if persisted { + return nil + } + + if result.PersistenceTime > 0 { + timeout = 2 * result.PersistenceTime + } + if time.Since(start) >= timeout-sleepDelay { + return ErrTimeout + } + } +} + +var _STRING_MCRESPONSE_POOL = gomemcached.NewStringMCResponsePool(16) + +type stringPool struct { + pool *sync.Pool + size int +} + +func newStringPool(size int) *stringPool { + rv := &stringPool{ + pool: &sync.Pool{ + New: func() interface{} { + return make([]string, 0, size) + }, + }, + size: size, + } + + return rv +} + +func (this *stringPool) Get() []string { + return this.pool.Get().([]string) +} + +func (this *stringPool) Put(s []string) { + if s == nil || cap(s) < this.size || cap(s) > 2*this.size { + return + } + + this.pool.Put(s[0:0]) +} + +var _STRING_POOL = newStringPool(16) + +type vbStringPool struct { + pool *sync.Pool + strPool *stringPool +} + +func newVBStringPool(size int, sp *stringPool) *vbStringPool { + rv := &vbStringPool{ + pool: &sync.Pool{ + New: func() interface{} { + return make(map[uint16][]string, size) + }, + }, + strPool: sp, + } + + return rv +} + +func (this *vbStringPool) Get() map[uint16][]string { + return this.pool.Get().(map[uint16][]string) +} + +func (this *vbStringPool) Put(s map[uint16][]string) { + if s == nil { + return + } + + for k, v := range s { + delete(s, k) + this.strPool.Put(v) + } + + this.pool.Put(s) +} + +var _VB_STRING_POOL = newVBStringPool(16, _STRING_POOL) diff --git a/vendor/github.com/couchbaselabs/go-couchbase/conn_pool.go b/vendor/github.com/couchbaselabs/go-couchbase/conn_pool.go new file mode 100644 index 0000000000..babd3adb6a --- /dev/null +++ b/vendor/github.com/couchbaselabs/go-couchbase/conn_pool.go @@ -0,0 +1,387 @@ +package couchbase + +import ( + "errors" + "sync/atomic" + "time" + + "github.com/couchbase/gomemcached" + "github.com/couchbase/gomemcached/client" + "github.com/couchbase/goutils/logging" +) + +// GenericMcdAuthHandler is a kind of AuthHandler that performs +// special auth exchange (like non-standard auth, possibly followed by +// select-bucket). +type GenericMcdAuthHandler interface { + AuthHandler + AuthenticateMemcachedConn(host string, conn *memcached.Client) error +} + +// Error raised when a connection can't be retrieved from a pool. +var TimeoutError = errors.New("timeout waiting to build connection") +var errClosedPool = errors.New("the connection pool is closed") +var errNoPool = errors.New("no connection pool") + +// Default timeout for retrieving a connection from the pool. +var ConnPoolTimeout = time.Hour * 24 * 30 + +// overflow connection closer cycle time +var ConnCloserInterval = time.Second * 30 + +// ConnPoolAvailWaitTime is the amount of time to wait for an existing +// connection from the pool before considering the creation of a new +// one. +var ConnPoolAvailWaitTime = time.Millisecond + +type connectionPool struct { + host string + mkConn func(host string, ah AuthHandler) (*memcached.Client, error) + auth AuthHandler + connections chan *memcached.Client + createsem chan bool + bailOut chan bool + poolSize int + connCount uint64 + inUse bool +} + +func newConnectionPool(host string, ah AuthHandler, closer bool, poolSize, poolOverflow int) *connectionPool { + connSize := poolSize + if closer { + connSize += poolOverflow + } + rv := &connectionPool{ + host: host, + connections: make(chan *memcached.Client, connSize), + createsem: make(chan bool, poolSize+poolOverflow), + mkConn: defaultMkConn, + auth: ah, + poolSize: poolSize, + } + if closer { + rv.bailOut = make(chan bool, 1) + go rv.connCloser() + } + return rv +} + +// ConnPoolTimeout is notified whenever connections are acquired from a pool. +var ConnPoolCallback func(host string, source string, start time.Time, err error) + +func defaultMkConn(host string, ah AuthHandler) (*memcached.Client, error) { + var features memcached.Features + + conn, err := memcached.Connect("tcp", host) + if err != nil { + return nil, err + } + + if TCPKeepalive == true { + conn.SetKeepAliveOptions(time.Duration(TCPKeepaliveInterval) * time.Second) + } + + if EnableMutationToken == true { + features = append(features, memcached.FeatureMutationToken) + } + if EnableDataType == true { + features = append(features, memcached.FeatureDataType) + } + + if EnableXattr == true { + features = append(features, memcached.FeatureXattr) + } + + if len(features) > 0 { + if DefaultTimeout > 0 { + conn.SetDeadline(getDeadline(noDeadline, DefaultTimeout)) + } + + res, err := conn.EnableFeatures(features) + + if DefaultTimeout > 0 { + conn.SetDeadline(noDeadline) + } + + if err != nil && isTimeoutError(err) { + conn.Close() + return nil, err + } + + if err != nil || res.Status != gomemcached.SUCCESS { + logging.Warnf("Unable to enable features %v", err) + } + } + + if gah, ok := ah.(GenericMcdAuthHandler); ok { + err = gah.AuthenticateMemcachedConn(host, conn) + if err != nil { + conn.Close() + return nil, err + } + return conn, nil + } + name, pass, bucket := ah.GetCredentials() + if name != "default" { + _, err = conn.Auth(name, pass) + if err != nil { + conn.Close() + return nil, err + } + // Select bucket (Required for cb_auth creds) + // Required when doing auth with _admin credentials + if bucket != "" && bucket != name { + _, err = conn.SelectBucket(bucket) + if err != nil { + conn.Close() + return nil, err + } + } + } + return conn, nil +} + +func (cp *connectionPool) Close() (err error) { + defer func() { + if recover() != nil { + err = errors.New("connectionPool.Close error") + } + }() + if cp.bailOut != nil { + + // defensively, we won't wait if the channel is full + select { + case cp.bailOut <- false: + default: + } + } + close(cp.connections) + for c := range cp.connections { + c.Close() + } + return +} + +func (cp *connectionPool) Node() string { + return cp.host +} + +func (cp *connectionPool) GetWithTimeout(d time.Duration) (rv *memcached.Client, err error) { + if cp == nil { + return nil, errNoPool + } + + path := "" + + if ConnPoolCallback != nil { + defer func(path *string, start time.Time) { + ConnPoolCallback(cp.host, *path, start, err) + }(&path, time.Now()) + } + + path = "short-circuit" + + // short-circuit available connetions. + select { + case rv, isopen := <-cp.connections: + if !isopen { + return nil, errClosedPool + } + atomic.AddUint64(&cp.connCount, 1) + return rv, nil + default: + } + + t := time.NewTimer(ConnPoolAvailWaitTime) + defer t.Stop() + + // Try to grab an available connection within 1ms + select { + case rv, isopen := <-cp.connections: + path = "avail1" + if !isopen { + return nil, errClosedPool + } + atomic.AddUint64(&cp.connCount, 1) + return rv, nil + case <-t.C: + // No connection came around in time, let's see + // whether we can get one or build a new one first. + t.Reset(d) // Reuse the timer for the full timeout. + select { + case rv, isopen := <-cp.connections: + path = "avail2" + if !isopen { + return nil, errClosedPool + } + atomic.AddUint64(&cp.connCount, 1) + return rv, nil + case cp.createsem <- true: + path = "create" + // Build a connection if we can't get a real one. + // This can potentially be an overflow connection, or + // a pooled connection. + rv, err := cp.mkConn(cp.host, cp.auth) + if err != nil { + // On error, release our create hold + <-cp.createsem + } else { + atomic.AddUint64(&cp.connCount, 1) + } + return rv, err + case <-t.C: + return nil, ErrTimeout + } + } +} + +func (cp *connectionPool) Get() (*memcached.Client, error) { + return cp.GetWithTimeout(ConnPoolTimeout) +} + +func (cp *connectionPool) Return(c *memcached.Client) { + if c == nil { + return + } + + if cp == nil { + c.Close() + } + + if c.IsHealthy() { + defer func() { + if recover() != nil { + // This happens when the pool has already been + // closed and we're trying to return a + // connection to it anyway. Just close the + // connection. + c.Close() + } + }() + + select { + case cp.connections <- c: + default: + <-cp.createsem + c.Close() + } + } else { + <-cp.createsem + c.Close() + } +} + +// give the ability to discard a connection from a pool +// useful for ditching connections to the wrong node after a rebalance +func (cp *connectionPool) Discard(c *memcached.Client) { + <-cp.createsem + c.Close() +} + +// asynchronous connection closer +func (cp *connectionPool) connCloser() { + var connCount uint64 + + t := time.NewTimer(ConnCloserInterval) + defer t.Stop() + + for { + connCount = cp.connCount + + // we don't exist anymore! bail out! + select { + case <-cp.bailOut: + return + case <-t.C: + } + t.Reset(ConnCloserInterval) + + // no overflow connections open or sustained requests for connections + // nothing to do until the next cycle + if len(cp.connections) <= cp.poolSize || + ConnCloserInterval/ConnPoolAvailWaitTime < time.Duration(cp.connCount-connCount) { + continue + } + + // close overflow connections now that they are not needed + for c := range cp.connections { + select { + case <-cp.bailOut: + return + default: + } + + // bail out if close did not work out + if !cp.connCleanup(c) { + return + } + if len(cp.connections) <= cp.poolSize { + break + } + } + } +} + +// close connection with recovery on error +func (cp *connectionPool) connCleanup(c *memcached.Client) (rv bool) { + + // just in case we are closing a connection after + // bailOut has been sent but we haven't yet read it + defer func() { + if recover() != nil { + rv = false + } + }() + rv = true + + c.Close() + <-cp.createsem + return +} + +func (cp *connectionPool) StartTapFeed(args *memcached.TapArguments) (*memcached.TapFeed, error) { + if cp == nil { + return nil, errNoPool + } + mc, err := cp.Get() + if err != nil { + return nil, err + } + + // A connection can't be used after TAP; Dont' count it against the + // connection pool capacity + <-cp.createsem + + return mc.StartTapFeed(*args) +} + +const DEFAULT_WINDOW_SIZE = 20 * 1024 * 1024 // 20 Mb + +func (cp *connectionPool) StartUprFeed(name string, sequence uint32, dcp_buffer_size uint32, data_chan_size int) (*memcached.UprFeed, error) { + if cp == nil { + return nil, errNoPool + } + mc, err := cp.Get() + if err != nil { + return nil, err + } + + // A connection can't be used after it has been allocated to UPR; + // Dont' count it against the connection pool capacity + <-cp.createsem + + uf, err := mc.NewUprFeed() + if err != nil { + return nil, err + } + + if err := uf.UprOpen(name, sequence, dcp_buffer_size); err != nil { + return nil, err + } + + if err := uf.StartFeedWithConfig(data_chan_size); err != nil { + return nil, err + } + + return uf, nil +} diff --git a/vendor/github.com/couchbaselabs/go-couchbase/ddocs.go b/vendor/github.com/couchbaselabs/go-couchbase/ddocs.go new file mode 100644 index 0000000000..f9cc343aa8 --- /dev/null +++ b/vendor/github.com/couchbaselabs/go-couchbase/ddocs.go @@ -0,0 +1,288 @@ +package couchbase + +import ( + "bytes" + "encoding/json" + "fmt" + "github.com/couchbase/goutils/logging" + "io/ioutil" + "net/http" +) + +// ViewDefinition represents a single view within a design document. +type ViewDefinition struct { + Map string `json:"map"` + Reduce string `json:"reduce,omitempty"` +} + +// DDoc is the document body of a design document specifying a view. +type DDoc struct { + Language string `json:"language,omitempty"` + Views map[string]ViewDefinition `json:"views"` +} + +// DDocsResult represents the result from listing the design +// documents. +type DDocsResult struct { + Rows []struct { + DDoc struct { + Meta map[string]interface{} + JSON DDoc + } `json:"doc"` + } `json:"rows"` +} + +// GetDDocs lists all design documents +func (b *Bucket) GetDDocs() (DDocsResult, error) { + var ddocsResult DDocsResult + b.RLock() + pool := b.pool + uri := b.DDocs.URI + b.RUnlock() + + // MB-23555 ephemeral buckets have no ddocs + if uri == "" { + return DDocsResult{}, nil + } + + err := pool.client.parseURLResponse(uri, &ddocsResult) + if err != nil { + return DDocsResult{}, err + } + return ddocsResult, nil +} + +func (b *Bucket) GetDDocWithRetry(docname string, into interface{}) error { + ddocURI := fmt.Sprintf("/%s/_design/%s", b.GetName(), docname) + err := b.parseAPIResponse(ddocURI, &into) + if err != nil { + return err + } + return nil +} + +func (b *Bucket) GetDDocsWithRetry() (DDocsResult, error) { + var ddocsResult DDocsResult + b.RLock() + uri := b.DDocs.URI + b.RUnlock() + + // MB-23555 ephemeral buckets have no ddocs + if uri == "" { + return DDocsResult{}, nil + } + + err := b.parseURLResponse(uri, &ddocsResult) + if err != nil { + return DDocsResult{}, err + } + return ddocsResult, nil +} + +func (b *Bucket) ddocURL(docname string) (string, error) { + u, err := b.randomBaseURL() + if err != nil { + return "", err + } + u.Path = fmt.Sprintf("/%s/_design/%s", b.GetName(), docname) + return u.String(), nil +} + +func (b *Bucket) ddocURLNext(nodeId int, docname string) (string, int, error) { + u, selected, err := b.randomNextURL(nodeId) + if err != nil { + return "", -1, err + } + u.Path = fmt.Sprintf("/%s/_design/%s", b.GetName(), docname) + return u.String(), selected, nil +} + +const ABS_MAX_RETRIES = 10 +const ABS_MIN_RETRIES = 3 + +func (b *Bucket) getMaxRetries() (int, error) { + + maxRetries := len(b.Nodes()) + + if maxRetries == 0 { + return 0, fmt.Errorf("No available Couch rest URLs") + } + + if maxRetries > ABS_MAX_RETRIES { + maxRetries = ABS_MAX_RETRIES + } else if maxRetries < ABS_MIN_RETRIES { + maxRetries = ABS_MIN_RETRIES + } + + return maxRetries, nil +} + +// PutDDoc installs a design document. +func (b *Bucket) PutDDoc(docname string, value interface{}) error { + + var Err error + + maxRetries, err := b.getMaxRetries() + if err != nil { + return err + } + + lastNode := START_NODE_ID + + for retryCount := 0; retryCount < maxRetries; retryCount++ { + + Err = nil + + ddocU, selectedNode, err := b.ddocURLNext(lastNode, docname) + if err != nil { + return err + } + + lastNode = selectedNode + + logging.Infof(" Trying with selected node %d", selectedNode) + j, err := json.Marshal(value) + if err != nil { + return err + } + + req, err := http.NewRequest("PUT", ddocU, bytes.NewReader(j)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + err = maybeAddAuth(req, b.authHandler(false /* bucket not yet locked */)) + if err != nil { + return err + } + + res, err := doHTTPRequest(req) + if err != nil { + return err + } + + if res.StatusCode != 201 { + body, _ := ioutil.ReadAll(res.Body) + Err = fmt.Errorf("error installing view: %v / %s", + res.Status, body) + logging.Errorf(" Error in PutDDOC %v. Retrying...", Err) + res.Body.Close() + b.Refresh() + continue + } + + res.Body.Close() + break + } + + return Err +} + +// GetDDoc retrieves a specific a design doc. +func (b *Bucket) GetDDoc(docname string, into interface{}) error { + var Err error + var res *http.Response + + maxRetries, err := b.getMaxRetries() + if err != nil { + return err + } + + lastNode := START_NODE_ID + for retryCount := 0; retryCount < maxRetries; retryCount++ { + + Err = nil + ddocU, selectedNode, err := b.ddocURLNext(lastNode, docname) + if err != nil { + return err + } + + lastNode = selectedNode + logging.Infof(" Trying with selected node %d", selectedNode) + + req, err := http.NewRequest("GET", ddocU, nil) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + err = maybeAddAuth(req, b.authHandler(false /* bucket not yet locked */)) + if err != nil { + return err + } + + res, err = doHTTPRequest(req) + if err != nil { + return err + } + if res.StatusCode != 200 { + body, _ := ioutil.ReadAll(res.Body) + Err = fmt.Errorf("error reading view: %v / %s", + res.Status, body) + logging.Errorf(" Error in GetDDOC %v Retrying...", Err) + b.Refresh() + res.Body.Close() + continue + } + defer res.Body.Close() + break + } + + if Err != nil { + return Err + } + + d := json.NewDecoder(res.Body) + return d.Decode(into) +} + +// DeleteDDoc removes a design document. +func (b *Bucket) DeleteDDoc(docname string) error { + + var Err error + + maxRetries, err := b.getMaxRetries() + if err != nil { + return err + } + + lastNode := START_NODE_ID + + for retryCount := 0; retryCount < maxRetries; retryCount++ { + + Err = nil + ddocU, selectedNode, err := b.ddocURLNext(lastNode, docname) + if err != nil { + return err + } + + lastNode = selectedNode + logging.Infof(" Trying with selected node %d", selectedNode) + + req, err := http.NewRequest("DELETE", ddocU, nil) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + err = maybeAddAuth(req, b.authHandler(false /* bucket not already locked */)) + if err != nil { + return err + } + + res, err := doHTTPRequest(req) + if err != nil { + return err + } + if res.StatusCode != 200 { + body, _ := ioutil.ReadAll(res.Body) + Err = fmt.Errorf("error deleting view : %v / %s", res.Status, body) + logging.Errorf(" Error in DeleteDDOC %v. Retrying ... ", Err) + b.Refresh() + res.Body.Close() + continue + } + + res.Body.Close() + break + } + return Err +} diff --git a/vendor/github.com/couchbaselabs/go-couchbase/observe.go b/vendor/github.com/couchbaselabs/go-couchbase/observe.go new file mode 100644 index 0000000000..6e746f5a16 --- /dev/null +++ b/vendor/github.com/couchbaselabs/go-couchbase/observe.go @@ -0,0 +1,300 @@ +package couchbase + +import ( + "fmt" + "github.com/couchbase/goutils/logging" + "sync" +) + +type PersistTo uint8 + +const ( + PersistNone = PersistTo(0x00) + PersistMaster = PersistTo(0x01) + PersistOne = PersistTo(0x02) + PersistTwo = PersistTo(0x03) + PersistThree = PersistTo(0x04) + PersistFour = PersistTo(0x05) +) + +type ObserveTo uint8 + +const ( + ObserveNone = ObserveTo(0x00) + ObserveReplicateOne = ObserveTo(0x01) + ObserveReplicateTwo = ObserveTo(0x02) + ObserveReplicateThree = ObserveTo(0x03) + ObserveReplicateFour = ObserveTo(0x04) +) + +type JobType uint8 + +const ( + OBSERVE = JobType(0x00) + PERSIST = JobType(0x01) +) + +type ObservePersistJob struct { + vb uint16 + vbuuid uint64 + hostname string + jobType JobType + failover uint8 + lastPersistedSeqNo uint64 + currentSeqNo uint64 + resultChan chan *ObservePersistJob + errorChan chan *OPErrResponse +} + +type OPErrResponse struct { + vb uint16 + vbuuid uint64 + err error + job *ObservePersistJob +} + +var ObservePersistPool = NewPool(1024) +var OPJobChan = make(chan *ObservePersistJob, 1024) +var OPJobDone = make(chan bool) + +var wg sync.WaitGroup + +func (b *Bucket) StartOPPollers(maxWorkers int) { + + for i := 0; i < maxWorkers; i++ { + go b.OPJobPoll() + wg.Add(1) + } + wg.Wait() +} + +func (b *Bucket) SetObserveAndPersist(nPersist PersistTo, nObserve ObserveTo) (err error) { + + numNodes := len(b.Nodes()) + if int(nPersist) > numNodes || int(nObserve) > numNodes { + return fmt.Errorf("Not enough healthy nodes in the cluster") + } + + if int(nPersist) > (b.Replicas+1) || int(nObserve) > b.Replicas { + return fmt.Errorf("Not enough replicas in the cluster") + } + + if EnableMutationToken == false { + return fmt.Errorf("Mutation Tokens not enabled ") + } + + b.ds = &DurablitySettings{Persist: PersistTo(nPersist), Observe: ObserveTo(nObserve)} + return +} + +func (b *Bucket) ObserveAndPersistPoll(vb uint16, vbuuid uint64, seqNo uint64) (err error, failover bool) { + b.RLock() + ds := b.ds + b.RUnlock() + + if ds == nil { + return + } + + nj := 0 // total number of jobs + resultChan := make(chan *ObservePersistJob, 10) + errChan := make(chan *OPErrResponse, 10) + + nodes := b.GetNodeList(vb) + if int(ds.Observe) > len(nodes) || int(ds.Persist) > len(nodes) { + return fmt.Errorf("Not enough healthy nodes in the cluster"), false + } + + logging.Infof("Node list %v", nodes) + + if ds.Observe >= ObserveReplicateOne { + // create a job for each host + for i := ObserveReplicateOne; i < ds.Observe+1; i++ { + opJob := ObservePersistPool.Get() + opJob.vb = vb + opJob.vbuuid = vbuuid + opJob.jobType = OBSERVE + opJob.hostname = nodes[i] + opJob.resultChan = resultChan + opJob.errorChan = errChan + + OPJobChan <- opJob + nj++ + + } + } + + if ds.Persist >= PersistMaster { + for i := PersistMaster; i < ds.Persist+1; i++ { + opJob := ObservePersistPool.Get() + opJob.vb = vb + opJob.vbuuid = vbuuid + opJob.jobType = PERSIST + opJob.hostname = nodes[i] + opJob.resultChan = resultChan + opJob.errorChan = errChan + + OPJobChan <- opJob + nj++ + + } + } + + ok := true + for ok { + select { + case res := <-resultChan: + jobDone := false + if res.failover == 0 { + // no failover + if res.jobType == PERSIST { + if res.lastPersistedSeqNo >= seqNo { + jobDone = true + } + + } else { + if res.currentSeqNo >= seqNo { + jobDone = true + } + } + + if jobDone == true { + nj-- + ObservePersistPool.Put(res) + } else { + // requeue this job + OPJobChan <- res + } + + } else { + // Not currently handling failover scenarios TODO + nj-- + ObservePersistPool.Put(res) + failover = true + } + + if nj == 0 { + // done with all the jobs + ok = false + close(resultChan) + close(errChan) + } + + case Err := <-errChan: + logging.Errorf("Error in Observe/Persist %v", Err.err) + err = fmt.Errorf("Error in Observe/Persist job %v", Err.err) + nj-- + ObservePersistPool.Put(Err.job) + if nj == 0 { + close(resultChan) + close(errChan) + ok = false + } + } + } + + return +} + +func (b *Bucket) OPJobPoll() { + + ok := true + for ok == true { + select { + case job := <-OPJobChan: + pool := b.getConnPoolByHost(job.hostname, false /* bucket not already locked */) + if pool == nil { + errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid} + errRes.err = fmt.Errorf("Pool not found for host %v", job.hostname) + errRes.job = job + job.errorChan <- errRes + continue + } + conn, err := pool.Get() + if err != nil { + errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid} + errRes.err = fmt.Errorf("Unable to get connection from pool %v", err) + errRes.job = job + job.errorChan <- errRes + continue + } + + res, err := conn.ObserveSeq(job.vb, job.vbuuid) + if err != nil { + errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid} + errRes.err = fmt.Errorf("Command failed %v", err) + errRes.job = job + job.errorChan <- errRes + continue + + } + pool.Return(conn) + job.lastPersistedSeqNo = res.LastPersistedSeqNo + job.currentSeqNo = res.CurrentSeqNo + job.failover = res.Failover + + job.resultChan <- job + case <-OPJobDone: + logging.Infof("Observe Persist Poller exitting") + ok = false + } + } + wg.Done() +} + +func (b *Bucket) GetNodeList(vb uint16) []string { + + vbm := b.VBServerMap() + if len(vbm.VBucketMap) < int(vb) { + logging.Infof("vbmap smaller than vblist") + return nil + } + + nodes := make([]string, len(vbm.VBucketMap[vb])) + for i := 0; i < len(vbm.VBucketMap[vb]); i++ { + n := vbm.VBucketMap[vb][i] + if n < 0 { + continue + } + + node := b.getMasterNode(n) + if len(node) > 1 { + nodes[i] = node + } + continue + + } + return nodes +} + +//pool of ObservePersist Jobs +type OPpool struct { + pool chan *ObservePersistJob +} + +// NewPool creates a new pool of jobs +func NewPool(max int) *OPpool { + return &OPpool{ + pool: make(chan *ObservePersistJob, max), + } +} + +// Borrow a Client from the pool. +func (p *OPpool) Get() *ObservePersistJob { + var o *ObservePersistJob + select { + case o = <-p.pool: + default: + o = &ObservePersistJob{} + } + return o +} + +// Return returns a Client to the pool. +func (p *OPpool) Put(o *ObservePersistJob) { + select { + case p.pool <- o: + default: + // let it go, let it go... + } +} diff --git a/vendor/github.com/couchbaselabs/go-couchbase/pools.go b/vendor/github.com/couchbaselabs/go-couchbase/pools.go new file mode 100644 index 0000000000..5f3ff8c495 --- /dev/null +++ b/vendor/github.com/couchbaselabs/go-couchbase/pools.go @@ -0,0 +1,1282 @@ +package couchbase + +import ( + "bufio" + "bytes" + "crypto/tls" + "crypto/x509" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "math/rand" + "net/http" + "net/url" + "runtime" + "sort" + "strings" + "sync" + "unsafe" + + "github.com/couchbase/goutils/logging" + + "github.com/couchbase/gomemcached" // package name is 'gomemcached' + "github.com/couchbase/gomemcached/client" // package name is 'memcached' +) + +// HTTPClient to use for REST and view operations. +var MaxIdleConnsPerHost = 256 +var HTTPTransport = &http.Transport{MaxIdleConnsPerHost: MaxIdleConnsPerHost} +var HTTPClient = &http.Client{Transport: HTTPTransport} + +// PoolSize is the size of each connection pool (per host). +var PoolSize = 64 + +// PoolOverflow is the number of overflow connections allowed in a +// pool. +var PoolOverflow = 16 + +// AsynchronousCloser turns on asynchronous closing for overflow connections +var AsynchronousCloser = false + +// TCP KeepAlive enabled/disabled +var TCPKeepalive = false + +// Enable MutationToken +var EnableMutationToken = false + +// Enable Data Type response +var EnableDataType = false + +// Enable Xattr +var EnableXattr = false + +// TCP keepalive interval in seconds. Default 30 minutes +var TCPKeepaliveInterval = 30 * 60 + +// Used to decide whether to skip verification of certificates when +// connecting to an ssl port. +var skipVerify = true +var certFile = "" +var keyFile = "" +var rootFile = "" + +func SetSkipVerify(skip bool) { + skipVerify = skip +} + +func SetCertFile(cert string) { + certFile = cert +} + +func SetKeyFile(cert string) { + keyFile = cert +} + +func SetRootFile(cert string) { + rootFile = cert +} + +// Allow applications to speciify the Poolsize and Overflow +func SetConnectionPoolParams(size, overflow int) { + + if size > 0 { + PoolSize = size + } + + if overflow > 0 { + PoolOverflow = overflow + } +} + +// Turn off overflow connections +func DisableOverflowConnections() { + PoolOverflow = 0 +} + +// Toggle asynchronous overflow closer +func EnableAsynchronousCloser(closer bool) { + AsynchronousCloser = closer +} + +// Allow TCP keepalive parameters to be set by the application +func SetTcpKeepalive(enabled bool, interval int) { + + TCPKeepalive = enabled + + if interval > 0 { + TCPKeepaliveInterval = interval + } +} + +// AuthHandler is a callback that gets the auth username and password +// for the given bucket. +type AuthHandler interface { + GetCredentials() (string, string, string) +} + +// AuthHandler is a callback that gets the auth username and password +// for the given bucket and sasl for memcached. +type AuthWithSaslHandler interface { + AuthHandler + GetSaslCredentials() (string, string) +} + +// MultiBucketAuthHandler is kind of AuthHandler that may perform +// different auth for different buckets. +type MultiBucketAuthHandler interface { + AuthHandler + ForBucket(bucket string) AuthHandler +} + +// HTTPAuthHandler is kind of AuthHandler that performs more general +// for outgoing http requests than is possible via simple +// GetCredentials() call (i.e. digest auth or different auth per +// different destinations). +type HTTPAuthHandler interface { + AuthHandler + SetCredsForRequest(req *http.Request) error +} + +// RestPool represents a single pool returned from the pools REST API. +type RestPool struct { + Name string `json:"name"` + StreamingURI string `json:"streamingUri"` + URI string `json:"uri"` +} + +// Pools represents the collection of pools as returned from the REST API. +type Pools struct { + ComponentsVersion map[string]string `json:"componentsVersion,omitempty"` + ImplementationVersion string `json:"implementationVersion"` + IsAdmin bool `json:"isAdminCreds"` + UUID string `json:"uuid"` + Pools []RestPool `json:"pools"` +} + +// A Node is a computer in a cluster running the couchbase software. +type Node struct { + ClusterCompatibility int `json:"clusterCompatibility"` + ClusterMembership string `json:"clusterMembership"` + CouchAPIBase string `json:"couchApiBase"` + Hostname string `json:"hostname"` + InterestingStats map[string]float64 `json:"interestingStats,omitempty"` + MCDMemoryAllocated float64 `json:"mcdMemoryAllocated"` + MCDMemoryReserved float64 `json:"mcdMemoryReserved"` + MemoryFree float64 `json:"memoryFree"` + MemoryTotal float64 `json:"memoryTotal"` + OS string `json:"os"` + Ports map[string]int `json:"ports"` + Services []string `json:"services"` + Status string `json:"status"` + Uptime int `json:"uptime,string"` + Version string `json:"version"` + ThisNode bool `json:"thisNode,omitempty"` +} + +// A Pool of nodes and buckets. +type Pool struct { + BucketMap map[string]Bucket + Nodes []Node + + BucketURL map[string]string `json:"buckets"` + + client Client +} + +// VBucketServerMap is the a mapping of vbuckets to nodes. +type VBucketServerMap struct { + HashAlgorithm string `json:"hashAlgorithm"` + NumReplicas int `json:"numReplicas"` + ServerList []string `json:"serverList"` + VBucketMap [][]int `json:"vBucketMap"` +} + +type DurablitySettings struct { + Persist PersistTo + Observe ObserveTo +} + +// Bucket is the primary entry point for most data operations. +// Bucket is a locked data structure. All access to its fields should be done using read or write locking, +// as appropriate. +// +// Some access methods require locking, but rely on the caller to do so. These are appropriate +// for calls from methods that have already locked the structure. Methods like this +// take a boolean parameter "bucketLocked". +type Bucket struct { + sync.RWMutex + AuthType string `json:"authType"` + Capabilities []string `json:"bucketCapabilities"` + CapabilitiesVersion string `json:"bucketCapabilitiesVer"` + Type string `json:"bucketType"` + Name string `json:"name"` + NodeLocator string `json:"nodeLocator"` + Quota map[string]float64 `json:"quota,omitempty"` + Replicas int `json:"replicaNumber"` + Password string `json:"saslPassword"` + URI string `json:"uri"` + StreamingURI string `json:"streamingUri"` + LocalRandomKeyURI string `json:"localRandomKeyUri,omitempty"` + UUID string `json:"uuid"` + ConflictResolutionType string `json:"conflictResolutionType,omitempty"` + DDocs struct { + URI string `json:"uri"` + } `json:"ddocs,omitempty"` + BasicStats map[string]interface{} `json:"basicStats,omitempty"` + Controllers map[string]interface{} `json:"controllers,omitempty"` + + // These are used for JSON IO, but isn't used for processing + // since it needs to be swapped out safely. + VBSMJson VBucketServerMap `json:"vBucketServerMap"` + NodesJSON []Node `json:"nodes"` + + pool *Pool + connPools unsafe.Pointer // *[]*connectionPool + vBucketServerMap unsafe.Pointer // *VBucketServerMap + nodeList unsafe.Pointer // *[]Node + commonSufix string + ah AuthHandler // auth handler + ds *DurablitySettings // Durablity Settings for this bucket + Scopes Scopes +} + +// PoolServices is all the bucket-independent services in a pool +type PoolServices struct { + Rev int `json:"rev"` + NodesExt []NodeServices `json:"nodesExt"` +} + +// NodeServices is all the bucket-independent services running on +// a node (given by Hostname) +type NodeServices struct { + Services map[string]int `json:"services,omitempty"` + Hostname string `json:"hostname"` + ThisNode bool `json:"thisNode"` +} + +type BucketNotFoundError struct { + bucket string +} + +func (e *BucketNotFoundError) Error() string { + return fmt.Sprint("No bucket named " + e.bucket) +} + +type BucketAuth struct { + name string + saslPwd string + bucket string +} + +func newBucketAuth(name string, pass string, bucket string) *BucketAuth { + return &BucketAuth{name: name, saslPwd: pass, bucket: bucket} +} + +func (ba *BucketAuth) GetCredentials() (string, string, string) { + return ba.name, ba.saslPwd, ba.bucket +} + +// VBServerMap returns the current VBucketServerMap. +func (b *Bucket) VBServerMap() *VBucketServerMap { + b.RLock() + defer b.RUnlock() + ret := (*VBucketServerMap)(b.vBucketServerMap) + return ret +} + +func (b *Bucket) GetVBmap(addrs []string) (map[string][]uint16, error) { + vbmap := b.VBServerMap() + servers := vbmap.ServerList + if addrs == nil { + addrs = vbmap.ServerList + } + + m := make(map[string][]uint16) + for _, addr := range addrs { + m[addr] = make([]uint16, 0) + } + for vbno, idxs := range vbmap.VBucketMap { + if len(idxs) == 0 { + return nil, fmt.Errorf("vbmap: No KV node no for vb %d", vbno) + } else if idxs[0] < 0 || idxs[0] >= len(servers) { + return nil, fmt.Errorf("vbmap: Invalid KV node no %d for vb %d", idxs[0], vbno) + } + addr := servers[idxs[0]] + if _, ok := m[addr]; ok { + m[addr] = append(m[addr], uint16(vbno)) + } + } + return m, nil +} + +// true if node is not on the bucket VBmap +func (b *Bucket) checkVBmap(node string) bool { + vbmap := b.VBServerMap() + servers := vbmap.ServerList + + for _, idxs := range vbmap.VBucketMap { + if len(idxs) == 0 { + return true + } else if idxs[0] < 0 || idxs[0] >= len(servers) { + return true + } + if servers[idxs[0]] == node { + return false + } + } + return true +} + +func (b *Bucket) GetName() string { + b.RLock() + defer b.RUnlock() + ret := b.Name + return ret +} + +// Nodes returns teh current list of nodes servicing this bucket. +func (b *Bucket) Nodes() []Node { + b.RLock() + defer b.RUnlock() + ret := *(*[]Node)(b.nodeList) + return ret +} + +// return the list of healthy nodes +func (b *Bucket) HealthyNodes() []Node { + nodes := []Node{} + + for _, n := range b.Nodes() { + if n.Status == "healthy" && n.CouchAPIBase != "" { + nodes = append(nodes, n) + } + if n.Status != "healthy" { // log non-healthy node + logging.Infof("Non-healthy node; node details:") + logging.Infof("Hostname=%v, Status=%v, CouchAPIBase=%v, ThisNode=%v", n.Hostname, n.Status, n.CouchAPIBase, n.ThisNode) + } + } + + return nodes +} + +func (b *Bucket) getConnPools(bucketLocked bool) []*connectionPool { + if !bucketLocked { + b.RLock() + defer b.RUnlock() + } + if b.connPools != nil { + return *(*[]*connectionPool)(b.connPools) + } else { + return nil + } +} + +func (b *Bucket) replaceConnPools(with []*connectionPool) { + b.Lock() + defer b.Unlock() + + old := b.connPools + b.connPools = unsafe.Pointer(&with) + if old != nil { + for _, pool := range *(*[]*connectionPool)(old) { + if pool != nil { + pool.Close() + } + } + } + return +} + +func (b *Bucket) getConnPool(i int) *connectionPool { + + if i < 0 { + return nil + } + + p := b.getConnPools(false /* not already locked */) + if len(p) > i { + return p[i] + } + + return nil +} + +func (b *Bucket) getConnPoolByHost(host string, bucketLocked bool) *connectionPool { + pools := b.getConnPools(bucketLocked) + for _, p := range pools { + if p != nil && p.host == host { + return p + } + } + + return nil +} + +// Given a vbucket number, returns a memcached connection to it. +// The connection must be returned to its pool after use. +func (b *Bucket) getConnectionToVBucket(vb uint32) (*memcached.Client, *connectionPool, error) { + for { + vbm := b.VBServerMap() + if len(vbm.VBucketMap) < int(vb) { + return nil, nil, fmt.Errorf("go-couchbase: vbmap smaller than vbucket list: %v vs. %v", + vb, vbm.VBucketMap) + } + masterId := vbm.VBucketMap[vb][0] + if masterId < 0 { + return nil, nil, fmt.Errorf("go-couchbase: No master for vbucket %d", vb) + } + pool := b.getConnPool(masterId) + conn, err := pool.Get() + if err != errClosedPool { + return conn, pool, err + } + // If conn pool was closed, because another goroutine refreshed the vbucket map, retry... + } +} + +// To get random documents, we need to cover all the nodes, so select +// a connection at random. + +func (b *Bucket) getRandomConnection() (*memcached.Client, *connectionPool, error) { + for { + var currentPool = 0 + pools := b.getConnPools(false /* not already locked */) + if len(pools) == 0 { + return nil, nil, fmt.Errorf("No connection pool found") + } else if len(pools) > 1 { // choose a random connection + currentPool = rand.Intn(len(pools)) + } // if only one pool, currentPool defaults to 0, i.e., the only pool + + // get the pool + pool := pools[currentPool] + conn, err := pool.Get() + if err != errClosedPool { + return conn, pool, err + } + + // If conn pool was closed, because another goroutine refreshed the vbucket map, retry... + } +} + +// +// Get a random document from a bucket. Since the bucket may be distributed +// across nodes, we must first select a random connection, and then use the +// Client.GetRandomDoc() call to get a random document from that node. +// + +func (b *Bucket) GetRandomDoc() (*gomemcached.MCResponse, error) { + // get a connection from the pool + conn, pool, err := b.getRandomConnection() + + if err != nil { + return nil, err + } + + // get a randomm document from the connection + doc, err := conn.GetRandomDoc() + // need to return the connection to the pool + pool.Return(conn) + return doc, err +} + +func (b *Bucket) getMasterNode(i int) string { + p := b.getConnPools(false /* not already locked */) + if len(p) > i { + return p[i].host + } + return "" +} + +func (b *Bucket) authHandler(bucketLocked bool) (ah AuthHandler) { + if !bucketLocked { + b.RLock() + defer b.RUnlock() + } + pool := b.pool + name := b.Name + + if pool != nil { + ah = pool.client.ah + } + if mbah, ok := ah.(MultiBucketAuthHandler); ok { + return mbah.ForBucket(name) + } + if ah == nil { + ah = &basicAuth{name, ""} + } + return +} + +// NodeAddresses gets the (sorted) list of memcached node addresses +// (hostname:port). +func (b *Bucket) NodeAddresses() []string { + vsm := b.VBServerMap() + rv := make([]string, len(vsm.ServerList)) + copy(rv, vsm.ServerList) + sort.Strings(rv) + return rv +} + +// CommonAddressSuffix finds the longest common suffix of all +// host:port strings in the node list. +func (b *Bucket) CommonAddressSuffix() string { + input := []string{} + for _, n := range b.Nodes() { + input = append(input, n.Hostname) + } + return FindCommonSuffix(input) +} + +// A Client is the starting point for all services across all buckets +// in a Couchbase cluster. +type Client struct { + BaseURL *url.URL + ah AuthHandler + Info Pools +} + +func maybeAddAuth(req *http.Request, ah AuthHandler) error { + if hah, ok := ah.(HTTPAuthHandler); ok { + return hah.SetCredsForRequest(req) + } + if ah != nil { + user, pass, _ := ah.GetCredentials() + req.Header.Set("Authorization", "Basic "+ + base64.StdEncoding.EncodeToString([]byte(user+":"+pass))) + } + return nil +} + +// arbitary number, may need to be tuned #FIXME +const HTTP_MAX_RETRY = 5 + +// Someday golang network packages will implement standard +// error codes. Until then #sigh +func isHttpConnError(err error) bool { + + estr := err.Error() + return strings.Contains(estr, "broken pipe") || + strings.Contains(estr, "broken connection") || + strings.Contains(estr, "connection reset") +} + +var client *http.Client + +func ClientConfigForX509(certFile, keyFile, rootFile string) (*tls.Config, error) { + cfg := &tls.Config{} + + if certFile != "" && keyFile != "" { + tlsCert, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + return nil, err + } + cfg.Certificates = []tls.Certificate{tlsCert} + } else { + //error need to pass both certfile and keyfile + return nil, fmt.Errorf("N1QL: Need to pass both certfile and keyfile") + } + + var caCert []byte + var err1 error + + caCertPool := x509.NewCertPool() + if rootFile != "" { + // Read that value in + caCert, err1 = ioutil.ReadFile(rootFile) + if err1 != nil { + return nil, fmt.Errorf(" Error in reading cacert file, err: %v", err1) + } + caCertPool.AppendCertsFromPEM(caCert) + } + + cfg.RootCAs = caCertPool + return cfg, nil +} + +func doHTTPRequest(req *http.Request) (*http.Response, error) { + + var err error + var res *http.Response + + tr := &http.Transport{} + + // we need a client that ignores certificate errors, since we self-sign + // our certs + if client == nil && req.URL.Scheme == "https" { + if skipVerify { + tr = &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + } else { + // Handle cases with cert + + cfg, err := ClientConfigForX509(certFile, keyFile, rootFile) + if err != nil { + return nil, err + } + + tr = &http.Transport{ + TLSClientConfig: cfg, + } + } + + client = &http.Client{Transport: tr} + + } else if client == nil { + client = HTTPClient + } + + for i := 0; i < HTTP_MAX_RETRY; i++ { + res, err = client.Do(req) + if err != nil && isHttpConnError(err) { + continue + } + break + } + + if err != nil { + return nil, err + } + + return res, err +} + +func doPutAPI(baseURL *url.URL, path string, params map[string]interface{}, authHandler AuthHandler, out interface{}) error { + return doOutputAPI("PUT", baseURL, path, params, authHandler, out) +} + +func doPostAPI(baseURL *url.URL, path string, params map[string]interface{}, authHandler AuthHandler, out interface{}) error { + return doOutputAPI("POST", baseURL, path, params, authHandler, out) +} + +func doOutputAPI( + httpVerb string, + baseURL *url.URL, + path string, + params map[string]interface{}, + authHandler AuthHandler, + out interface{}) error { + + var requestUrl string + + if q := strings.Index(path, "?"); q > 0 { + requestUrl = baseURL.Scheme + "://" + baseURL.Host + path[:q] + "?" + path[q+1:] + } else { + requestUrl = baseURL.Scheme + "://" + baseURL.Host + path + } + + postData := url.Values{} + for k, v := range params { + postData.Set(k, fmt.Sprintf("%v", v)) + } + + req, err := http.NewRequest(httpVerb, requestUrl, bytes.NewBufferString(postData.Encode())) + if err != nil { + return err + } + + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + + err = maybeAddAuth(req, authHandler) + if err != nil { + return err + } + + res, err := doHTTPRequest(req) + if err != nil { + return err + } + + defer res.Body.Close() + if res.StatusCode != 200 { + bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512)) + return fmt.Errorf("HTTP error %v getting %q: %s", + res.Status, requestUrl, bod) + } + + d := json.NewDecoder(res.Body) + if err = d.Decode(&out); err != nil { + return err + } + return nil +} + +func queryRestAPI( + baseURL *url.URL, + path string, + authHandler AuthHandler, + out interface{}) error { + + var requestUrl string + + if q := strings.Index(path, "?"); q > 0 { + requestUrl = baseURL.Scheme + "://" + baseURL.Host + path[:q] + "?" + path[q+1:] + } else { + requestUrl = baseURL.Scheme + "://" + baseURL.Host + path + } + + req, err := http.NewRequest("GET", requestUrl, nil) + if err != nil { + return err + } + + err = maybeAddAuth(req, authHandler) + if err != nil { + return err + } + + res, err := doHTTPRequest(req) + if err != nil { + return err + } + + defer res.Body.Close() + if res.StatusCode != 200 { + bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512)) + return fmt.Errorf("HTTP error %v getting %q: %s", + res.Status, requestUrl, bod) + } + + d := json.NewDecoder(res.Body) + if err = d.Decode(&out); err != nil { + return err + } + return nil +} + +func (c *Client) ProcessStream(path string, callb func(interface{}) error, data interface{}) error { + return c.processStream(c.BaseURL, path, c.ah, callb, data) +} + +// Based on code in http://src.couchbase.org/source/xref/trunk/goproj/src/github.com/couchbase/indexing/secondary/dcp/pools.go#309 +func (c *Client) processStream(baseURL *url.URL, path string, authHandler AuthHandler, callb func(interface{}) error, data interface{}) error { + var requestUrl string + + if q := strings.Index(path, "?"); q > 0 { + requestUrl = baseURL.Scheme + "://" + baseURL.Host + path[:q] + "?" + path[q+1:] + } else { + requestUrl = baseURL.Scheme + "://" + baseURL.Host + path + } + + req, err := http.NewRequest("GET", requestUrl, nil) + if err != nil { + return err + } + + err = maybeAddAuth(req, authHandler) + if err != nil { + return err + } + + res, err := doHTTPRequest(req) + if err != nil { + return err + } + + defer res.Body.Close() + if res.StatusCode != 200 { + bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512)) + return fmt.Errorf("HTTP error %v getting %q: %s", + res.Status, requestUrl, bod) + } + + reader := bufio.NewReader(res.Body) + for { + bs, err := reader.ReadBytes('\n') + if err != nil { + return err + } + if len(bs) == 1 && bs[0] == '\n' { + continue + } + + err = json.Unmarshal(bs, data) + if err != nil { + return err + } + err = callb(data) + if err != nil { + return err + } + } + return nil + +} + +func (c *Client) parseURLResponse(path string, out interface{}) error { + return queryRestAPI(c.BaseURL, path, c.ah, out) +} + +func (c *Client) parsePostURLResponse(path string, params map[string]interface{}, out interface{}) error { + return doPostAPI(c.BaseURL, path, params, c.ah, out) +} + +func (c *Client) parsePutURLResponse(path string, params map[string]interface{}, out interface{}) error { + return doPutAPI(c.BaseURL, path, params, c.ah, out) +} + +func (b *Bucket) parseURLResponse(path string, out interface{}) error { + nodes := b.Nodes() + if len(nodes) == 0 { + return errors.New("no couch rest URLs") + } + + // Pick a random node to start querying. + startNode := rand.Intn(len(nodes)) + maxRetries := len(nodes) + for i := 0; i < maxRetries; i++ { + node := nodes[(startNode+i)%len(nodes)] // Wrap around the nodes list. + // Skip non-healthy nodes. + if node.Status != "healthy" || node.CouchAPIBase == "" { + continue + } + url := &url.URL{ + Host: node.Hostname, + Scheme: "http", + } + + // Lock here to avoid having pool closed under us. + b.RLock() + err := queryRestAPI(url, path, b.pool.client.ah, out) + b.RUnlock() + if err == nil { + return err + } + } + return errors.New("All nodes failed to respond or no healthy nodes for bucket found") +} + +func (b *Bucket) parseAPIResponse(path string, out interface{}) error { + nodes := b.Nodes() + if len(nodes) == 0 { + return errors.New("no couch rest URLs") + } + + var err error + var u *url.URL + + // Pick a random node to start querying. + startNode := rand.Intn(len(nodes)) + maxRetries := len(nodes) + for i := 0; i < maxRetries; i++ { + node := nodes[(startNode+i)%len(nodes)] // Wrap around the nodes list. + // Skip non-healthy nodes. + if node.Status != "healthy" || node.CouchAPIBase == "" { + continue + } + + u, err = ParseURL(node.CouchAPIBase) + // Lock here so pool does not get closed under us. + b.RLock() + if err != nil { + b.RUnlock() + return fmt.Errorf("config error: Bucket %q node #%d CouchAPIBase=%q: %v", + b.Name, i, node.CouchAPIBase, err) + } else if b.pool != nil { + u.User = b.pool.client.BaseURL.User + } + u.Path = path + + // generate the path so that the strings are properly escaped + // MB-13770 + requestPath := strings.Split(u.String(), u.Host)[1] + + err = queryRestAPI(u, requestPath, b.pool.client.ah, out) + b.RUnlock() + if err == nil { + return err + } + } + + var errStr string + if err != nil { + errStr = "Error " + err.Error() + } + + return errors.New("All nodes failed to respond or returned error or no healthy nodes for bucket found." + errStr) +} + +type basicAuth struct { + u, p string +} + +func (b basicAuth) GetCredentials() (string, string, string) { + return b.u, b.p, b.u +} + +func basicAuthFromURL(us string) (ah AuthHandler) { + u, err := ParseURL(us) + if err != nil { + return + } + if user := u.User; user != nil { + pw, _ := user.Password() + ah = basicAuth{user.Username(), pw} + } + return +} + +// ConnectWithAuth connects to a couchbase cluster with the given +// authentication handler. +func ConnectWithAuth(baseU string, ah AuthHandler) (c Client, err error) { + c.BaseURL, err = ParseURL(baseU) + if err != nil { + return + } + c.ah = ah + + return c, c.parseURLResponse("/pools", &c.Info) +} + +// ConnectWithAuthCreds connects to a couchbase cluster with the give +// authorization creds returned by cb_auth +func ConnectWithAuthCreds(baseU, username, password string) (c Client, err error) { + c.BaseURL, err = ParseURL(baseU) + if err != nil { + return + } + + c.ah = newBucketAuth(username, password, "") + return c, c.parseURLResponse("/pools", &c.Info) + +} + +// Connect to a couchbase cluster. An authentication handler will be +// created from the userinfo in the URL if provided. +func Connect(baseU string) (Client, error) { + return ConnectWithAuth(baseU, basicAuthFromURL(baseU)) +} + +type BucketInfo struct { + Name string // name of bucket + Password string // SASL password of bucket +} + +//Get SASL buckets +func GetBucketList(baseU string) (bInfo []BucketInfo, err error) { + + c := &Client{} + c.BaseURL, err = ParseURL(baseU) + if err != nil { + return + } + c.ah = basicAuthFromURL(baseU) + + var buckets []Bucket + err = c.parseURLResponse("/pools/default/buckets", &buckets) + if err != nil { + return + } + bInfo = make([]BucketInfo, 0) + for _, bucket := range buckets { + bucketInfo := BucketInfo{Name: bucket.Name, Password: bucket.Password} + bInfo = append(bInfo, bucketInfo) + } + return bInfo, err +} + +//Set viewUpdateDaemonOptions +func SetViewUpdateParams(baseU string, params map[string]interface{}) (viewOpts map[string]interface{}, err error) { + + c := &Client{} + c.BaseURL, err = ParseURL(baseU) + if err != nil { + return + } + c.ah = basicAuthFromURL(baseU) + + if len(params) < 1 { + return nil, fmt.Errorf("No params to set") + } + + err = c.parsePostURLResponse("/settings/viewUpdateDaemon", params, &viewOpts) + if err != nil { + return + } + return viewOpts, err +} + +// This API lets the caller know, if the list of nodes a bucket is +// connected to has gone through an edit (a rebalance operation) +// since the last update to the bucket, in which case a Refresh is +// advised. +func (b *Bucket) NodeListChanged() bool { + b.RLock() + pool := b.pool + uri := b.URI + b.RUnlock() + + tmpb := &Bucket{} + err := pool.client.parseURLResponse(uri, tmpb) + if err != nil { + return true + } + + bNodes := *(*[]Node)(b.nodeList) + if len(bNodes) != len(tmpb.NodesJSON) { + return true + } + + bucketHostnames := map[string]bool{} + for _, node := range bNodes { + bucketHostnames[node.Hostname] = true + } + + for _, node := range tmpb.NodesJSON { + if _, found := bucketHostnames[node.Hostname]; !found { + return true + } + } + + return false +} + +// Sample data for scopes and collections as returned from the +// /pooles/default/$BUCKET_NAME/collections API. +// {"myScope2":{"myCollectionC":{}},"myScope1":{"myCollectionB":{},"myCollectionA":{}},"_default":{"_default":{}}} + +// A Scopes holds the set of scopes in a bucket. +// The map key is the name of the scope. +type Scopes map[string]Collections + +// A Collections holds the set of collections in a scope. +// The map key is the name of the collection. +type Collections map[string]Collection + +// A Collection holds the information for a collection. +// It is currently returned empty. +type Collection struct{} + +func getScopesAndCollections(pool *Pool, bucketName string) (Scopes, error) { + scopes := make(Scopes) + // This URL is a bit of a hack. The "default" is the name of the pool, and should + // be a parameter. But the name does not appear to be available anywhere, + // and in any case we never use a pool other than "default". + err := pool.client.parseURLResponse(fmt.Sprintf("/pools/default/buckets/%s/collections", bucketName), &scopes) + if err != nil { + return nil, err + } + return scopes, nil +} + +func (b *Bucket) Refresh() error { + b.RLock() + pool := b.pool + uri := b.URI + name := b.Name + b.RUnlock() + + tmpb := &Bucket{} + err := pool.client.parseURLResponse(uri, tmpb) + if err != nil { + return err + } + + scopes, err := getScopesAndCollections(pool, name) + if err != nil { + return err + } + + pools := b.getConnPools(false /* bucket not already locked */) + + // We need this lock to ensure that bucket refreshes happening because + // of NMVb errors received during bulkGet do not end up over-writing + // pool.inUse. + b.Lock() + + for _, pool := range pools { + if pool != nil { + pool.inUse = false + } + } + + newcps := make([]*connectionPool, len(tmpb.VBSMJson.ServerList)) + for i := range newcps { + + pool := b.getConnPoolByHost(tmpb.VBSMJson.ServerList[i], true /* bucket already locked */) + if pool != nil && pool.inUse == false { + // if the hostname and index is unchanged then reuse this pool + newcps[i] = pool + pool.inUse = true + continue + } + + if b.ah != nil { + newcps[i] = newConnectionPool( + tmpb.VBSMJson.ServerList[i], + b.ah, AsynchronousCloser, PoolSize, PoolOverflow) + + } else { + newcps[i] = newConnectionPool( + tmpb.VBSMJson.ServerList[i], + b.authHandler(true /* bucket already locked */), + AsynchronousCloser, PoolSize, PoolOverflow) + } + } + b.replaceConnPools2(newcps, true /* bucket already locked */) + tmpb.ah = b.ah + b.vBucketServerMap = unsafe.Pointer(&tmpb.VBSMJson) + b.nodeList = unsafe.Pointer(&tmpb.NodesJSON) + b.Scopes = scopes + + b.Unlock() + return nil +} + +func (p *Pool) refresh() (err error) { + p.BucketMap = make(map[string]Bucket) + + buckets := []Bucket{} + err = p.client.parseURLResponse(p.BucketURL["uri"], &buckets) + if err != nil { + return err + } + for _, b := range buckets { + b.pool = p + b.nodeList = unsafe.Pointer(&b.NodesJSON) + b.replaceConnPools(make([]*connectionPool, len(b.VBSMJson.ServerList))) + + p.BucketMap[b.Name] = b + } + return nil +} + +// GetPool gets a pool from within the couchbase cluster (usually +// "default"). +func (c *Client) GetPool(name string) (p Pool, err error) { + var poolURI string + for _, p := range c.Info.Pools { + if p.Name == name { + poolURI = p.URI + } + } + if poolURI == "" { + return p, errors.New("No pool named " + name) + } + + err = c.parseURLResponse(poolURI, &p) + + p.client = *c + + err = p.refresh() + return +} + +// GetPoolServices returns all the bucket-independent services in a pool. +// (See "Exposing services outside of bucket context" in http://goo.gl/uuXRkV) +func (c *Client) GetPoolServices(name string) (ps PoolServices, err error) { + var poolName string + for _, p := range c.Info.Pools { + if p.Name == name { + poolName = p.Name + } + } + if poolName == "" { + return ps, errors.New("No pool named " + name) + } + + poolURI := "/pools/" + poolName + "/nodeServices" + err = c.parseURLResponse(poolURI, &ps) + + return +} + +// Close marks this bucket as no longer needed, closing connections it +// may have open. +func (b *Bucket) Close() { + b.Lock() + defer b.Unlock() + if b.connPools != nil { + for _, c := range b.getConnPools(true /* already locked */) { + if c != nil { + c.Close() + } + } + b.connPools = nil + } +} + +func bucketFinalizer(b *Bucket) { + if b.connPools != nil { + logging.Warnf("Finalizing a bucket with active connections.") + } +} + +// GetBucket gets a bucket from within this pool. +func (p *Pool) GetBucket(name string) (*Bucket, error) { + rv, ok := p.BucketMap[name] + if !ok { + return nil, &BucketNotFoundError{bucket: name} + } + runtime.SetFinalizer(&rv, bucketFinalizer) + err := rv.Refresh() + if err != nil { + return nil, err + } + return &rv, nil +} + +// GetBucket gets a bucket from within this pool. +func (p *Pool) GetBucketWithAuth(bucket, username, password string) (*Bucket, error) { + rv, ok := p.BucketMap[bucket] + if !ok { + return nil, &BucketNotFoundError{bucket: bucket} + } + runtime.SetFinalizer(&rv, bucketFinalizer) + rv.ah = newBucketAuth(username, password, bucket) + err := rv.Refresh() + if err != nil { + return nil, err + } + return &rv, nil +} + +// GetPool gets the pool to which this bucket belongs. +func (b *Bucket) GetPool() *Pool { + b.RLock() + defer b.RUnlock() + ret := b.pool + return ret +} + +// GetClient gets the client from which we got this pool. +func (p *Pool) GetClient() *Client { + return &p.client +} + +// GetBucket is a convenience function for getting a named bucket from +// a URL +func GetBucket(endpoint, poolname, bucketname string) (*Bucket, error) { + var err error + client, err := Connect(endpoint) + if err != nil { + return nil, err + } + + pool, err := client.GetPool(poolname) + if err != nil { + return nil, err + } + + return pool.GetBucket(bucketname) +} + +// ConnectWithAuthAndGetBucket is a convenience function for +// getting a named bucket from a given URL and an auth callback +func ConnectWithAuthAndGetBucket(endpoint, poolname, bucketname string, + ah AuthHandler) (*Bucket, error) { + client, err := ConnectWithAuth(endpoint, ah) + if err != nil { + return nil, err + } + + pool, err := client.GetPool(poolname) + if err != nil { + return nil, err + } + + return pool.GetBucket(bucketname) +} diff --git a/vendor/github.com/couchbaselabs/go-couchbase/streaming.go b/vendor/github.com/couchbaselabs/go-couchbase/streaming.go new file mode 100644 index 0000000000..6467635371 --- /dev/null +++ b/vendor/github.com/couchbaselabs/go-couchbase/streaming.go @@ -0,0 +1,209 @@ +package couchbase + +import ( + "encoding/json" + "fmt" + "github.com/couchbase/goutils/logging" + "io" + "io/ioutil" + "math/rand" + "net" + "net/http" + "time" + "unsafe" +) + +// Bucket auto-updater gets the latest version of the bucket config from +// the server. If the configuration has changed then updated the local +// bucket information. If the bucket has been deleted then notify anyone +// who is holding a reference to this bucket + +const MAX_RETRY_COUNT = 5 +const DISCONNECT_PERIOD = 120 * time.Second + +type NotifyFn func(bucket string, err error) + +// Use TCP keepalive to detect half close sockets +var updaterTransport http.RoundTripper = &http.Transport{ + Proxy: http.ProxyFromEnvironment, + Dial: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).Dial, +} + +var updaterHTTPClient = &http.Client{Transport: updaterTransport} + +func doHTTPRequestForUpdate(req *http.Request) (*http.Response, error) { + + var err error + var res *http.Response + + for i := 0; i < HTTP_MAX_RETRY; i++ { + res, err = updaterHTTPClient.Do(req) + if err != nil && isHttpConnError(err) { + continue + } + break + } + + if err != nil { + return nil, err + } + + return res, err +} + +func (b *Bucket) RunBucketUpdater(notify NotifyFn) { + go func() { + err := b.UpdateBucket() + if err != nil { + if notify != nil { + notify(b.GetName(), err) + } + logging.Errorf(" Bucket Updater exited with err %v", err) + } + }() +} + +func (b *Bucket) replaceConnPools2(with []*connectionPool, bucketLocked bool) { + if !bucketLocked { + b.Lock() + defer b.Unlock() + } + old := b.connPools + b.connPools = unsafe.Pointer(&with) + if old != nil { + for _, pool := range *(*[]*connectionPool)(old) { + if pool != nil && pool.inUse == false { + pool.Close() + } + } + } + return +} + +func (b *Bucket) UpdateBucket() error { + + var failures int + var returnErr error + + for { + + if failures == MAX_RETRY_COUNT { + logging.Errorf(" Maximum failures reached. Exiting loop...") + return fmt.Errorf("Max failures reached. Last Error %v", returnErr) + } + + nodes := b.Nodes() + if len(nodes) < 1 { + return fmt.Errorf("No healthy nodes found") + } + + startNode := rand.Intn(len(nodes)) + node := nodes[(startNode)%len(nodes)] + + streamUrl := fmt.Sprintf("http://%s/pools/default/bucketsStreaming/%s", node.Hostname, b.GetName()) + logging.Infof(" Trying with %s", streamUrl) + req, err := http.NewRequest("GET", streamUrl, nil) + if err != nil { + return err + } + + b.RLock() + pool := b.pool + bucketName := b.Name + b.RUnlock() + scopes, err := getScopesAndCollections(pool, bucketName) + if err != nil { + return err + } + + // Lock here to avoid having pool closed under us. + b.RLock() + err = maybeAddAuth(req, b.pool.client.ah) + b.RUnlock() + if err != nil { + return err + } + + res, err := doHTTPRequestForUpdate(req) + if err != nil { + return err + } + + if res.StatusCode != 200 { + bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512)) + logging.Errorf("Failed to connect to host, unexpected status code: %v. Body %s", res.StatusCode, bod) + res.Body.Close() + returnErr = fmt.Errorf("Failed to connect to host. Status %v Body %s", res.StatusCode, bod) + failures++ + continue + } + + dec := json.NewDecoder(res.Body) + + tmpb := &Bucket{} + for { + + err := dec.Decode(&tmpb) + if err != nil { + returnErr = err + res.Body.Close() + break + } + + // if we got here, reset failure count + failures = 0 + b.Lock() + + // mark all the old connection pools for deletion + pools := b.getConnPools(true /* already locked */) + for _, pool := range pools { + if pool != nil { + pool.inUse = false + } + } + + newcps := make([]*connectionPool, len(tmpb.VBSMJson.ServerList)) + for i := range newcps { + // get the old connection pool and check if it is still valid + pool := b.getConnPoolByHost(tmpb.VBSMJson.ServerList[i], true /* bucket already locked */) + if pool != nil && pool.inUse == false { + // if the hostname and index is unchanged then reuse this pool + newcps[i] = pool + pool.inUse = true + continue + } + // else create a new pool + if b.ah != nil { + newcps[i] = newConnectionPool( + tmpb.VBSMJson.ServerList[i], + b.ah, false, PoolSize, PoolOverflow) + + } else { + newcps[i] = newConnectionPool( + tmpb.VBSMJson.ServerList[i], + b.authHandler(true /* bucket already locked */), + false, PoolSize, PoolOverflow) + } + } + + b.replaceConnPools2(newcps, true /* bucket already locked */) + + tmpb.ah = b.ah + b.vBucketServerMap = unsafe.Pointer(&tmpb.VBSMJson) + b.nodeList = unsafe.Pointer(&tmpb.NodesJSON) + b.Scopes = scopes + b.Unlock() + + logging.Infof("Got new configuration for bucket %s", b.GetName()) + + } + // we are here because of an error + failures++ + continue + + } + return nil +} diff --git a/vendor/github.com/couchbaselabs/go-couchbase/tap.go b/vendor/github.com/couchbaselabs/go-couchbase/tap.go new file mode 100644 index 0000000000..86edd30554 --- /dev/null +++ b/vendor/github.com/couchbaselabs/go-couchbase/tap.go @@ -0,0 +1,143 @@ +package couchbase + +import ( + "github.com/couchbase/gomemcached/client" + "github.com/couchbase/goutils/logging" + "sync" + "time" +) + +const initialRetryInterval = 1 * time.Second +const maximumRetryInterval = 30 * time.Second + +// A TapFeed streams mutation events from a bucket. +// +// Events from the bucket can be read from the channel 'C'. Remember +// to call Close() on it when you're done, unless its channel has +// closed itself already. +type TapFeed struct { + C <-chan memcached.TapEvent + + bucket *Bucket + args *memcached.TapArguments + nodeFeeds []*memcached.TapFeed // The TAP feeds of the individual nodes + output chan memcached.TapEvent // Same as C but writeably-typed + wg sync.WaitGroup + quit chan bool +} + +// StartTapFeed creates and starts a new Tap feed +func (b *Bucket) StartTapFeed(args *memcached.TapArguments) (*TapFeed, error) { + if args == nil { + defaultArgs := memcached.DefaultTapArguments() + args = &defaultArgs + } + + feed := &TapFeed{ + bucket: b, + args: args, + output: make(chan memcached.TapEvent, 10), + quit: make(chan bool), + } + + go feed.run() + + feed.C = feed.output + return feed, nil +} + +// Goroutine that runs the feed +func (feed *TapFeed) run() { + retryInterval := initialRetryInterval + bucketOK := true + for { + // Connect to the TAP feed of each server node: + if bucketOK { + killSwitch, err := feed.connectToNodes() + if err == nil { + // Run until one of the sub-feeds fails: + select { + case <-killSwitch: + case <-feed.quit: + return + } + feed.closeNodeFeeds() + retryInterval = initialRetryInterval + } + } + + // On error, try to refresh the bucket in case the list of nodes changed: + logging.Infof("go-couchbase: TAP connection lost; reconnecting to bucket %q in %v", + feed.bucket.Name, retryInterval) + err := feed.bucket.Refresh() + bucketOK = err == nil + + select { + case <-time.After(retryInterval): + case <-feed.quit: + return + } + if retryInterval *= 2; retryInterval > maximumRetryInterval { + retryInterval = maximumRetryInterval + } + } +} + +func (feed *TapFeed) connectToNodes() (killSwitch chan bool, err error) { + killSwitch = make(chan bool) + for _, serverConn := range feed.bucket.getConnPools(false /* not already locked */) { + var singleFeed *memcached.TapFeed + singleFeed, err = serverConn.StartTapFeed(feed.args) + if err != nil { + logging.Errorf("go-couchbase: Error connecting to tap feed of %s: %v", serverConn.host, err) + feed.closeNodeFeeds() + return + } + feed.nodeFeeds = append(feed.nodeFeeds, singleFeed) + go feed.forwardTapEvents(singleFeed, killSwitch, serverConn.host) + feed.wg.Add(1) + } + return +} + +// Goroutine that forwards Tap events from a single node's feed to the aggregate feed. +func (feed *TapFeed) forwardTapEvents(singleFeed *memcached.TapFeed, killSwitch chan bool, host string) { + defer feed.wg.Done() + for { + select { + case event, ok := <-singleFeed.C: + if !ok { + if singleFeed.Error != nil { + logging.Errorf("go-couchbase: Tap feed from %s failed: %v", host, singleFeed.Error) + } + killSwitch <- true + return + } + feed.output <- event + case <-feed.quit: + return + } + } +} + +func (feed *TapFeed) closeNodeFeeds() { + for _, f := range feed.nodeFeeds { + f.Close() + } + feed.nodeFeeds = nil +} + +// Close a Tap feed. +func (feed *TapFeed) Close() error { + select { + case <-feed.quit: + return nil + default: + } + + feed.closeNodeFeeds() + close(feed.quit) + feed.wg.Wait() + close(feed.output) + return nil +} diff --git a/vendor/github.com/couchbaselabs/go-couchbase/upr.go b/vendor/github.com/couchbaselabs/go-couchbase/upr.go new file mode 100644 index 0000000000..bf1b209b7e --- /dev/null +++ b/vendor/github.com/couchbaselabs/go-couchbase/upr.go @@ -0,0 +1,398 @@ +package couchbase + +import ( + "log" + "sync" + "time" + + "fmt" + "github.com/couchbase/gomemcached" + "github.com/couchbase/gomemcached/client" + "github.com/couchbase/goutils/logging" +) + +// A UprFeed streams mutation events from a bucket. +// +// Events from the bucket can be read from the channel 'C'. Remember +// to call Close() on it when you're done, unless its channel has +// closed itself already. +type UprFeed struct { + C <-chan *memcached.UprEvent + + bucket *Bucket + nodeFeeds map[string]*FeedInfo // The UPR feeds of the individual nodes + output chan *memcached.UprEvent // Same as C but writeably-typed + outputClosed bool + quit chan bool + name string // name of this UPR feed + sequence uint32 // sequence number for this feed + connected bool + killSwitch chan bool + closing bool + wg sync.WaitGroup + dcp_buffer_size uint32 + data_chan_size int +} + +// UprFeed from a single connection +type FeedInfo struct { + uprFeed *memcached.UprFeed // UPR feed handle + host string // hostname + connected bool // connected + quit chan bool // quit channel +} + +type FailoverLog map[uint16]memcached.FailoverLog + +// GetFailoverLogs, get the failover logs for a set of vbucket ids +func (b *Bucket) GetFailoverLogs(vBuckets []uint16) (FailoverLog, error) { + + // map vbids to their corresponding hosts + vbHostList := make(map[string][]uint16) + vbm := b.VBServerMap() + if len(vbm.VBucketMap) < len(vBuckets) { + return nil, fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v", + vbm.VBucketMap, vBuckets) + } + + for _, vb := range vBuckets { + masterID := vbm.VBucketMap[vb][0] + master := b.getMasterNode(masterID) + if master == "" { + return nil, fmt.Errorf("No master found for vb %d", vb) + } + + vbList := vbHostList[master] + if vbList == nil { + vbList = make([]uint16, 0) + } + vbList = append(vbList, vb) + vbHostList[master] = vbList + } + + failoverLogMap := make(FailoverLog) + for _, serverConn := range b.getConnPools(false /* not already locked */) { + + vbList := vbHostList[serverConn.host] + if vbList == nil { + continue + } + + mc, err := serverConn.Get() + if err != nil { + logging.Infof("No Free connections for vblist %v", vbList) + return nil, fmt.Errorf("No Free connections for host %s", + serverConn.host) + + } + // close the connection so that it doesn't get reused for upr data + // connection + defer mc.Close() + failoverlogs, err := mc.UprGetFailoverLog(vbList) + if err != nil { + return nil, fmt.Errorf("Error getting failover log %s host %s", + err.Error(), serverConn.host) + + } + + for vb, log := range failoverlogs { + failoverLogMap[vb] = *log + } + } + + return failoverLogMap, nil +} + +func (b *Bucket) StartUprFeed(name string, sequence uint32) (*UprFeed, error) { + return b.StartUprFeedWithConfig(name, sequence, 10, DEFAULT_WINDOW_SIZE) +} + +// StartUprFeed creates and starts a new Upr feed +// No data will be sent on the channel unless vbuckets streams are requested +func (b *Bucket) StartUprFeedWithConfig(name string, sequence uint32, data_chan_size int, dcp_buffer_size uint32) (*UprFeed, error) { + + feed := &UprFeed{ + bucket: b, + output: make(chan *memcached.UprEvent, data_chan_size), + quit: make(chan bool), + nodeFeeds: make(map[string]*FeedInfo, 0), + name: name, + sequence: sequence, + killSwitch: make(chan bool), + dcp_buffer_size: dcp_buffer_size, + data_chan_size: data_chan_size, + } + + err := feed.connectToNodes() + if err != nil { + return nil, fmt.Errorf("Cannot connect to bucket %s", err.Error()) + } + feed.connected = true + go feed.run() + + feed.C = feed.output + return feed, nil +} + +// UprRequestStream starts a stream for a vb on a feed +func (feed *UprFeed) UprRequestStream(vb uint16, opaque uint16, flags uint32, + vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error { + + defer func() { + if r := recover(); r != nil { + log.Panicf("Panic in UprRequestStream. Feed %v Bucket %v", feed, feed.bucket) + } + }() + + vbm := feed.bucket.VBServerMap() + if len(vbm.VBucketMap) < int(vb) { + return fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v", + vb, vbm.VBucketMap) + } + + if int(vb) >= len(vbm.VBucketMap) { + return fmt.Errorf("Invalid vbucket id %d", vb) + } + + masterID := vbm.VBucketMap[vb][0] + master := feed.bucket.getMasterNode(masterID) + if master == "" { + return fmt.Errorf("Master node not found for vbucket %d", vb) + } + singleFeed := feed.nodeFeeds[master] + if singleFeed == nil { + return fmt.Errorf("UprFeed for this host not found") + } + + if err := singleFeed.uprFeed.UprRequestStream(vb, opaque, flags, + vuuid, startSequence, endSequence, snapStart, snapEnd); err != nil { + return err + } + + return nil +} + +// UprCloseStream ends a vbucket stream. +func (feed *UprFeed) UprCloseStream(vb, opaqueMSB uint16) error { + + defer func() { + if r := recover(); r != nil { + log.Panicf("Panic in UprCloseStream. Feed %v Bucket %v ", feed, feed.bucket) + } + }() + + vbm := feed.bucket.VBServerMap() + if len(vbm.VBucketMap) < int(vb) { + return fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v", + vb, vbm.VBucketMap) + } + + if int(vb) >= len(vbm.VBucketMap) { + return fmt.Errorf("Invalid vbucket id %d", vb) + } + + masterID := vbm.VBucketMap[vb][0] + master := feed.bucket.getMasterNode(masterID) + if master == "" { + return fmt.Errorf("Master node not found for vbucket %d", vb) + } + singleFeed := feed.nodeFeeds[master] + if singleFeed == nil { + return fmt.Errorf("UprFeed for this host not found") + } + + if err := singleFeed.uprFeed.CloseStream(vb, opaqueMSB); err != nil { + return err + } + return nil +} + +// Goroutine that runs the feed +func (feed *UprFeed) run() { + retryInterval := initialRetryInterval + bucketOK := true + for { + // Connect to the UPR feed of each server node: + if bucketOK { + // Run until one of the sub-feeds fails: + select { + case <-feed.killSwitch: + case <-feed.quit: + return + } + //feed.closeNodeFeeds() + retryInterval = initialRetryInterval + } + + if feed.closing == true { + // we have been asked to shut down + return + } + + // On error, try to refresh the bucket in case the list of nodes changed: + logging.Infof("go-couchbase: UPR connection lost; reconnecting to bucket %q in %v", + feed.bucket.Name, retryInterval) + + if err := feed.bucket.Refresh(); err != nil { + // if we fail to refresh the bucket, exit the feed + // MB-14917 + logging.Infof("Unable to refresh bucket %s ", err.Error()) + close(feed.output) + feed.outputClosed = true + feed.closeNodeFeeds() + return + } + + // this will only connect to nodes that are not connected or changed + // user will have to reconnect the stream + err := feed.connectToNodes() + if err != nil { + logging.Infof("Unable to connect to nodes..exit ") + close(feed.output) + feed.outputClosed = true + feed.closeNodeFeeds() + return + } + bucketOK = err == nil + + select { + case <-time.After(retryInterval): + case <-feed.quit: + return + } + if retryInterval *= 2; retryInterval > maximumRetryInterval { + retryInterval = maximumRetryInterval + } + } +} + +func (feed *UprFeed) connectToNodes() (err error) { + nodeCount := 0 + for _, serverConn := range feed.bucket.getConnPools(false /* not already locked */) { + + // this maybe a reconnection, so check if the connection to the node + // already exists. Connect only if the node is not found in the list + // or connected == false + nodeFeed := feed.nodeFeeds[serverConn.host] + + if nodeFeed != nil && nodeFeed.connected == true { + continue + } + + var singleFeed *memcached.UprFeed + var name string + if feed.name == "" { + name = "DefaultUprClient" + } else { + name = feed.name + } + singleFeed, err = serverConn.StartUprFeed(name, feed.sequence, feed.dcp_buffer_size, feed.data_chan_size) + if err != nil { + logging.Errorf("go-couchbase: Error connecting to upr feed of %s: %v", serverConn.host, err) + feed.closeNodeFeeds() + return + } + // add the node to the connection map + feedInfo := &FeedInfo{ + uprFeed: singleFeed, + connected: true, + host: serverConn.host, + quit: make(chan bool), + } + feed.nodeFeeds[serverConn.host] = feedInfo + go feed.forwardUprEvents(feedInfo, feed.killSwitch, serverConn.host) + feed.wg.Add(1) + nodeCount++ + } + if nodeCount == 0 { + return fmt.Errorf("No connection to bucket") + } + + return nil +} + +// Goroutine that forwards Upr events from a single node's feed to the aggregate feed. +func (feed *UprFeed) forwardUprEvents(nodeFeed *FeedInfo, killSwitch chan bool, host string) { + singleFeed := nodeFeed.uprFeed + + defer func() { + feed.wg.Done() + if r := recover(); r != nil { + //if feed is not closing, re-throw the panic + if feed.outputClosed != true && feed.closing != true { + panic(r) + } else { + logging.Errorf("Panic is recovered. Since feed is closed, exit gracefully") + + } + } + }() + + for { + select { + case <-nodeFeed.quit: + nodeFeed.connected = false + return + + case event, ok := <-singleFeed.C: + if !ok { + if singleFeed.Error != nil { + logging.Errorf("go-couchbase: Upr feed from %s failed: %v", host, singleFeed.Error) + } + killSwitch <- true + return + } + if feed.outputClosed == true { + // someone closed the node feed + logging.Infof("Node need closed, returning from forwardUprEvent") + return + } + feed.output <- event + if event.Status == gomemcached.NOT_MY_VBUCKET { + logging.Infof(" Got a not my vbucket error !! ") + if err := feed.bucket.Refresh(); err != nil { + logging.Errorf("Unable to refresh bucket %s ", err.Error()) + feed.closeNodeFeeds() + return + } + // this will only connect to nodes that are not connected or changed + // user will have to reconnect the stream + if err := feed.connectToNodes(); err != nil { + logging.Errorf("Unable to connect to nodes %s", err.Error()) + return + } + + } + } + } +} + +func (feed *UprFeed) closeNodeFeeds() { + for _, f := range feed.nodeFeeds { + logging.Infof(" Sending close to forwardUprEvent ") + close(f.quit) + f.uprFeed.Close() + } + feed.nodeFeeds = nil +} + +// Close a Upr feed. +func (feed *UprFeed) Close() error { + select { + case <-feed.quit: + return nil + default: + } + + feed.closing = true + feed.closeNodeFeeds() + close(feed.quit) + + feed.wg.Wait() + if feed.outputClosed == false { + feed.outputClosed = true + close(feed.output) + } + + return nil +} diff --git a/vendor/github.com/couchbaselabs/go-couchbase/users.go b/vendor/github.com/couchbaselabs/go-couchbase/users.go new file mode 100644 index 0000000000..47d4861522 --- /dev/null +++ b/vendor/github.com/couchbaselabs/go-couchbase/users.go @@ -0,0 +1,119 @@ +package couchbase + +import ( + "bytes" + "fmt" +) + +type User struct { + Name string + Id string + Domain string + Roles []Role +} + +type Role struct { + Role string + BucketName string `json:"bucket_name"` +} + +// Sample: +// {"role":"admin","name":"Admin","desc":"Can manage ALL cluster features including security.","ce":true} +// {"role":"query_select","bucket_name":"*","name":"Query Select","desc":"Can execute SELECT statement on bucket to retrieve data"} +type RoleDescription struct { + Role string + Name string + Desc string + Ce bool + BucketName string `json:"bucket_name"` +} + +// Return user-role data, as parsed JSON. +// Sample: +// [{"id":"ivanivanov","name":"Ivan Ivanov","roles":[{"role":"cluster_admin"},{"bucket_name":"default","role":"bucket_admin"}]}, +// {"id":"petrpetrov","name":"Petr Petrov","roles":[{"role":"replication_admin"}]}] +func (c *Client) GetUserRoles() ([]interface{}, error) { + ret := make([]interface{}, 0, 1) + err := c.parseURLResponse("/settings/rbac/users", &ret) + if err != nil { + return nil, err + } + + // Get the configured administrator. + // Expected result: {"port":8091,"username":"Administrator"} + adminInfo := make(map[string]interface{}, 2) + err = c.parseURLResponse("/settings/web", &adminInfo) + if err != nil { + return nil, err + } + + // Create a special entry for the configured administrator. + adminResult := map[string]interface{}{ + "name": adminInfo["username"], + "id": adminInfo["username"], + "domain": "ns_server", + "roles": []interface{}{ + map[string]interface{}{ + "role": "admin", + }, + }, + } + + // Add the configured administrator to the list of results. + ret = append(ret, adminResult) + + return ret, nil +} + +func (c *Client) GetUserInfoAll() ([]User, error) { + ret := make([]User, 0, 16) + err := c.parseURLResponse("/settings/rbac/users", &ret) + if err != nil { + return nil, err + } + return ret, nil +} + +func rolesToParamFormat(roles []Role) string { + var buffer bytes.Buffer + for i, role := range roles { + if i > 0 { + buffer.WriteString(",") + } + buffer.WriteString(role.Role) + if role.BucketName != "" { + buffer.WriteString("[") + buffer.WriteString(role.BucketName) + buffer.WriteString("]") + } + } + return buffer.String() +} + +func (c *Client) PutUserInfo(u *User) error { + params := map[string]interface{}{ + "name": u.Name, + "roles": rolesToParamFormat(u.Roles), + } + var target string + switch u.Domain { + case "external": + target = "/settings/rbac/users/" + u.Id + case "local": + target = "/settings/rbac/users/local/" + u.Id + default: + return fmt.Errorf("Unknown user type: %s", u.Domain) + } + var ret string // PUT returns an empty string. We ignore it. + err := c.parsePutURLResponse(target, params, &ret) + return err +} + +func (c *Client) GetRolesAll() ([]RoleDescription, error) { + ret := make([]RoleDescription, 0, 32) + err := c.parseURLResponse("/settings/rbac/roles", &ret) + if err != nil { + return nil, err + } + return ret, nil +} diff --git a/vendor/github.com/couchbaselabs/go-couchbase/util.go b/vendor/github.com/couchbaselabs/go-couchbase/util.go new file mode 100644 index 0000000000..4d286a3271 --- /dev/null +++ b/vendor/github.com/couchbaselabs/go-couchbase/util.go @@ -0,0 +1,49 @@ +package couchbase + +import ( + "fmt" + "net/url" + "strings" +) + +// CleanupHost returns the hostname with the given suffix removed. +func CleanupHost(h, commonSuffix string) string { + if strings.HasSuffix(h, commonSuffix) { + return h[:len(h)-len(commonSuffix)] + } + return h +} + +// FindCommonSuffix returns the longest common suffix from the given +// strings. +func FindCommonSuffix(input []string) string { + rv := "" + if len(input) < 2 { + return "" + } + from := input + for i := len(input[0]); i > 0; i-- { + common := true + suffix := input[0][i:] + for _, s := range from { + if !strings.HasSuffix(s, suffix) { + common = false + break + } + } + if common { + rv = suffix + } + } + return rv +} + +// ParseURL is a wrapper around url.Parse with some sanity-checking +func ParseURL(urlStr string) (result *url.URL, err error) { + result, err = url.Parse(urlStr) + if result != nil && result.Scheme == "" { + result = nil + err = fmt.Errorf("invalid URL <%s>", urlStr) + } + return +} diff --git a/vendor/github.com/couchbaselabs/go-couchbase/vbmap.go b/vendor/github.com/couchbaselabs/go-couchbase/vbmap.go new file mode 100644 index 0000000000..b96a18ed57 --- /dev/null +++ b/vendor/github.com/couchbaselabs/go-couchbase/vbmap.go @@ -0,0 +1,77 @@ +package couchbase + +var crc32tab = []uint32{ + 0x00000000, 0x77073096, 0xee0e612c, 0x990951ba, + 0x076dc419, 0x706af48f, 0xe963a535, 0x9e6495a3, + 0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988, + 0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91, + 0x1db71064, 0x6ab020f2, 0xf3b97148, 0x84be41de, + 0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7, + 0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec, + 0x14015c4f, 0x63066cd9, 0xfa0f3d63, 0x8d080df5, + 0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172, + 0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b, + 0x35b5a8fa, 0x42b2986c, 0xdbbbc9d6, 0xacbcf940, + 0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59, + 0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116, + 0x21b4f4b5, 0x56b3c423, 0xcfba9599, 0xb8bda50f, + 0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924, + 0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d, + 0x76dc4190, 0x01db7106, 0x98d220bc, 0xefd5102a, + 0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433, + 0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818, + 0x7f6a0dbb, 0x086d3d2d, 0x91646c97, 0xe6635c01, + 0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e, + 0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457, + 0x65b0d9c6, 0x12b7e950, 0x8bbeb8ea, 0xfcb9887c, + 0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65, + 0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2, + 0x4adfa541, 0x3dd895d7, 0xa4d1c46d, 0xd3d6f4fb, + 0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0, + 0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9, + 0x5005713c, 0x270241aa, 0xbe0b1010, 0xc90c2086, + 0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f, + 0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4, + 0x59b33d17, 0x2eb40d81, 0xb7bd5c3b, 0xc0ba6cad, + 0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a, + 0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683, + 0xe3630b12, 0x94643b84, 0x0d6d6a3e, 0x7a6a5aa8, + 0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1, + 0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe, + 0xf762575d, 0x806567cb, 0x196c3671, 0x6e6b06e7, + 0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc, + 0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5, + 0xd6d6a3e8, 0xa1d1937e, 0x38d8c2c4, 0x4fdff252, + 0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b, + 0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60, + 0xdf60efc3, 0xa867df55, 0x316e8eef, 0x4669be79, + 0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236, + 0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f, + 0xc5ba3bbe, 0xb2bd0b28, 0x2bb45a92, 0x5cb36a04, + 0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d, + 0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a, + 0x9c0906a9, 0xeb0e363f, 0x72076785, 0x05005713, + 0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38, + 0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21, + 0x86d3d2d4, 0xf1d4e242, 0x68ddb3f8, 0x1fda836e, + 0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777, + 0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c, + 0x8f659eff, 0xf862ae69, 0x616bffd3, 0x166ccf45, + 0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2, + 0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db, + 0xaed16a4a, 0xd9d65adc, 0x40df0b66, 0x37d83bf0, + 0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9, + 0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6, + 0xbad03605, 0xcdd70693, 0x54de5729, 0x23d967bf, + 0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94, + 0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d} + +// VBHash finds the vbucket for the given key. +func (b *Bucket) VBHash(key string) uint32 { + crc := uint32(0xffffffff) + for x := 0; x < len(key); x++ { + crc = (crc >> 8) ^ crc32tab[(uint64(crc)^uint64(key[x]))&0xff] + } + vbm := b.VBServerMap() + return ((^crc) >> 16) & 0x7fff & (uint32(len(vbm.VBucketMap)) - 1) +} diff --git a/vendor/github.com/couchbaselabs/go-couchbase/views.go b/vendor/github.com/couchbaselabs/go-couchbase/views.go new file mode 100644 index 0000000000..2f68642f5a --- /dev/null +++ b/vendor/github.com/couchbaselabs/go-couchbase/views.go @@ -0,0 +1,231 @@ +package couchbase + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "math/rand" + "net/http" + "net/url" + "time" +) + +// ViewRow represents a single result from a view. +// +// Doc is present only if include_docs was set on the request. +type ViewRow struct { + ID string + Key interface{} + Value interface{} + Doc *interface{} +} + +// A ViewError is a node-specific error indicating a partial failure +// within a view result. +type ViewError struct { + From string + Reason string +} + +func (ve ViewError) Error() string { + return "Node: " + ve.From + ", reason: " + ve.Reason +} + +// ViewResult holds the entire result set from a view request, +// including the rows and the errors. +type ViewResult struct { + TotalRows int `json:"total_rows"` + Rows []ViewRow + Errors []ViewError +} + +func (b *Bucket) randomBaseURL() (*url.URL, error) { + nodes := b.HealthyNodes() + if len(nodes) == 0 { + return nil, errors.New("no available couch rest URLs") + } + nodeNo := rand.Intn(len(nodes)) + node := nodes[nodeNo] + + b.RLock() + name := b.Name + pool := b.pool + b.RUnlock() + + u, err := ParseURL(node.CouchAPIBase) + if err != nil { + return nil, fmt.Errorf("config error: Bucket %q node #%d CouchAPIBase=%q: %v", + name, nodeNo, node.CouchAPIBase, err) + } else if pool != nil { + u.User = pool.client.BaseURL.User + } + return u, err +} + +const START_NODE_ID = -1 + +func (b *Bucket) randomNextURL(lastNode int) (*url.URL, int, error) { + nodes := b.HealthyNodes() + if len(nodes) == 0 { + return nil, -1, errors.New("no available couch rest URLs") + } + + var nodeNo int + if lastNode == START_NODE_ID || lastNode >= len(nodes) { + // randomly select a node if the value of lastNode is invalid + nodeNo = rand.Intn(len(nodes)) + } else { + // wrap around the node list + nodeNo = (lastNode + 1) % len(nodes) + } + + b.RLock() + name := b.Name + pool := b.pool + b.RUnlock() + + node := nodes[nodeNo] + u, err := ParseURL(node.CouchAPIBase) + if err != nil { + return nil, -1, fmt.Errorf("config error: Bucket %q node #%d CouchAPIBase=%q: %v", + name, nodeNo, node.CouchAPIBase, err) + } else if pool != nil { + u.User = pool.client.BaseURL.User + } + return u, nodeNo, err +} + +// DocID is the document ID type for the startkey_docid parameter in +// views. +type DocID string + +func qParam(k, v string) string { + format := `"%s"` + switch k { + case "startkey_docid", "endkey_docid", "stale": + format = "%s" + } + return fmt.Sprintf(format, v) +} + +// ViewURL constructs a URL for a view with the given ddoc, view name, +// and parameters. +func (b *Bucket) ViewURL(ddoc, name string, + params map[string]interface{}) (string, error) { + u, err := b.randomBaseURL() + if err != nil { + return "", err + } + + values := url.Values{} + for k, v := range params { + switch t := v.(type) { + case DocID: + values[k] = []string{string(t)} + case string: + values[k] = []string{qParam(k, t)} + case int: + values[k] = []string{fmt.Sprintf(`%d`, t)} + case bool: + values[k] = []string{fmt.Sprintf(`%v`, t)} + default: + b, err := json.Marshal(v) + if err != nil { + return "", fmt.Errorf("unsupported value-type %T in Query, "+ + "json encoder said %v", t, err) + } + values[k] = []string{fmt.Sprintf(`%v`, string(b))} + } + } + + if ddoc == "" && name == "_all_docs" { + u.Path = fmt.Sprintf("/%s/_all_docs", b.GetName()) + } else { + u.Path = fmt.Sprintf("/%s/_design/%s/_view/%s", b.GetName(), ddoc, name) + } + u.RawQuery = values.Encode() + + return u.String(), nil +} + +// ViewCallback is called for each view invocation. +var ViewCallback func(ddoc, name string, start time.Time, err error) + +// ViewCustom performs a view request that can map row values to a +// custom type. +// +// See the source to View for an example usage. +func (b *Bucket) ViewCustom(ddoc, name string, params map[string]interface{}, + vres interface{}) (err error) { + if SlowServerCallWarningThreshold > 0 { + defer slowLog(time.Now(), "call to ViewCustom(%q, %q)", ddoc, name) + } + + if ViewCallback != nil { + defer func(t time.Time) { ViewCallback(ddoc, name, t, err) }(time.Now()) + } + + u, err := b.ViewURL(ddoc, name, params) + if err != nil { + return err + } + + req, err := http.NewRequest("GET", u, nil) + if err != nil { + return err + } + + ah := b.authHandler(false /* bucket not yet locked */) + maybeAddAuth(req, ah) + + res, err := doHTTPRequest(req) + if err != nil { + return fmt.Errorf("error starting view req at %v: %v", u, err) + } + defer res.Body.Close() + + if res.StatusCode != 200 { + bod := make([]byte, 512) + l, _ := res.Body.Read(bod) + return fmt.Errorf("error executing view req at %v: %v - %s", + u, res.Status, bod[:l]) + } + + body, err := ioutil.ReadAll(res.Body) + if err := json.Unmarshal(body, vres); err != nil { + return nil + } + + return nil +} + +// View executes a view. +// +// The ddoc parameter is just the bare name of your design doc without +// the "_design/" prefix. +// +// Parameters are string keys with values that correspond to couchbase +// view parameters. Primitive should work fairly naturally (booleans, +// ints, strings, etc...) and other values will attempt to be JSON +// marshaled (useful for array indexing on on view keys, for example). +// +// Example: +// +// res, err := couchbase.View("myddoc", "myview", map[string]interface{}{ +// "group_level": 2, +// "startkey_docid": []interface{}{"thing"}, +// "endkey_docid": []interface{}{"thing", map[string]string{}}, +// "stale": false, +// }) +func (b *Bucket) View(ddoc, name string, params map[string]interface{}) (ViewResult, error) { + vres := ViewResult{} + + if err := b.ViewCustom(ddoc, name, params, &vres); err != nil { + //error in accessing views. Retry once after a bucket refresh + b.Refresh() + return vres, b.ViewCustom(ddoc, name, params, &vres) + } else { + return vres, nil + } +} |