aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/couchbase/go-couchbase
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/couchbase/go-couchbase')
-rw-r--r--vendor/github.com/couchbase/go-couchbase/.gitignore14
-rw-r--r--vendor/github.com/couchbase/go-couchbase/.travis.yml5
-rw-r--r--vendor/github.com/couchbase/go-couchbase/LICENSE19
-rw-r--r--vendor/github.com/couchbase/go-couchbase/README.markdown37
-rw-r--r--vendor/github.com/couchbase/go-couchbase/audit.go32
-rw-r--r--vendor/github.com/couchbase/go-couchbase/client.go1513
-rw-r--r--vendor/github.com/couchbase/go-couchbase/conn_pool.go421
-rw-r--r--vendor/github.com/couchbase/go-couchbase/ddocs.go288
-rw-r--r--vendor/github.com/couchbase/go-couchbase/go.mod3
-rw-r--r--vendor/github.com/couchbase/go-couchbase/observe.go300
-rw-r--r--vendor/github.com/couchbase/go-couchbase/pools.go1746
-rw-r--r--vendor/github.com/couchbase/go-couchbase/port_map.go106
-rw-r--r--vendor/github.com/couchbase/go-couchbase/streaming.go228
-rw-r--r--vendor/github.com/couchbase/go-couchbase/tap.go143
-rw-r--r--vendor/github.com/couchbase/go-couchbase/upr.go399
-rw-r--r--vendor/github.com/couchbase/go-couchbase/users.go121
-rw-r--r--vendor/github.com/couchbase/go-couchbase/util.go49
-rw-r--r--vendor/github.com/couchbase/go-couchbase/vbmap.go77
-rw-r--r--vendor/github.com/couchbase/go-couchbase/views.go231
19 files changed, 5732 insertions, 0 deletions
diff --git a/vendor/github.com/couchbase/go-couchbase/.gitignore b/vendor/github.com/couchbase/go-couchbase/.gitignore
new file mode 100644
index 0000000000..eda885ce8d
--- /dev/null
+++ b/vendor/github.com/couchbase/go-couchbase/.gitignore
@@ -0,0 +1,14 @@
+#*
+*.6
+*.a
+*~
+*.swp
+/examples/basic/basic
+/hello/hello
+/populate/populate
+/tools/view2go/view2go
+/tools/loadfile/loadfile
+gotags.files
+TAGS
+6.out
+_*
diff --git a/vendor/github.com/couchbase/go-couchbase/.travis.yml b/vendor/github.com/couchbase/go-couchbase/.travis.yml
new file mode 100644
index 0000000000..4ecafb1894
--- /dev/null
+++ b/vendor/github.com/couchbase/go-couchbase/.travis.yml
@@ -0,0 +1,5 @@
+language: go
+install: go get -v -d ./... && go build -v ./...
+script: go test -v ./...
+
+go: 1.1.1
diff --git a/vendor/github.com/couchbase/go-couchbase/LICENSE b/vendor/github.com/couchbase/go-couchbase/LICENSE
new file mode 100644
index 0000000000..0b23ef358e
--- /dev/null
+++ b/vendor/github.com/couchbase/go-couchbase/LICENSE
@@ -0,0 +1,19 @@
+Copyright (c) 2013 Couchbase, Inc.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
+of the Software, and to permit persons to whom the Software is furnished to do
+so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/vendor/github.com/couchbase/go-couchbase/README.markdown b/vendor/github.com/couchbase/go-couchbase/README.markdown
new file mode 100644
index 0000000000..bf5fe49421
--- /dev/null
+++ b/vendor/github.com/couchbase/go-couchbase/README.markdown
@@ -0,0 +1,37 @@
+# A smart client for couchbase in go
+
+This is a *unoffical* version of a Couchbase Golang client. If you are
+looking for the *Offical* Couchbase Golang client please see
+ [CB-go])[https://github.com/couchbaselabs/gocb].
+
+This is an evolving package, but does provide a useful interface to a
+[couchbase](http://www.couchbase.com/) server including all of the
+pool/bucket discovery features, compatible key distribution with other
+clients, and vbucket motion awareness so application can continue to
+operate during rebalances.
+
+It also supports view querying with source node randomization so you
+don't bang on all one node to do all the work.
+
+## Install
+
+ go get github.com/couchbase/go-couchbase
+
+## Example
+
+ c, err := couchbase.Connect("http://dev-couchbase.example.com:8091/")
+ if err != nil {
+ log.Fatalf("Error connecting: %v", err)
+ }
+
+ pool, err := c.GetPool("default")
+ if err != nil {
+ log.Fatalf("Error getting pool: %v", err)
+ }
+
+ bucket, err := pool.GetBucket("default")
+ if err != nil {
+ log.Fatalf("Error getting bucket: %v", err)
+ }
+
+ bucket.Set("someKey", 0, []string{"an", "example", "list"})
diff --git a/vendor/github.com/couchbase/go-couchbase/audit.go b/vendor/github.com/couchbase/go-couchbase/audit.go
new file mode 100644
index 0000000000..3db7d9f9ff
--- /dev/null
+++ b/vendor/github.com/couchbase/go-couchbase/audit.go
@@ -0,0 +1,32 @@
+package couchbase
+
+import ()
+
+// Sample data:
+// {"disabled":["12333", "22244"],"uid":"132492431","auditdEnabled":true,
+// "disabledUsers":[{"name":"bill","domain":"local"},{"name":"bob","domain":"local"}],
+// "logPath":"/Users/johanlarson/Library/Application Support/Couchbase/var/lib/couchbase/logs",
+// "rotateInterval":86400,"rotateSize":20971520}
+type AuditSpec struct {
+ Disabled []uint32 `json:"disabled"`
+ Uid string `json:"uid"`
+ AuditdEnabled bool `json:"auditdEnabled`
+ DisabledUsers []AuditUser `json:"disabledUsers"`
+ LogPath string `json:"logPath"`
+ RotateInterval int64 `json:"rotateInterval"`
+ RotateSize int64 `json:"rotateSize"`
+}
+
+type AuditUser struct {
+ Name string `json:"name"`
+ Domain string `json:"domain"`
+}
+
+func (c *Client) GetAuditSpec() (*AuditSpec, error) {
+ ret := &AuditSpec{}
+ err := c.parseURLResponse("/settings/audit", ret)
+ if err != nil {
+ return nil, err
+ }
+ return ret, nil
+}
diff --git a/vendor/github.com/couchbase/go-couchbase/client.go b/vendor/github.com/couchbase/go-couchbase/client.go
new file mode 100644
index 0000000000..63d125dade
--- /dev/null
+++ b/vendor/github.com/couchbase/go-couchbase/client.go
@@ -0,0 +1,1513 @@
+/*
+Package couchbase provides a smart client for go.
+
+Usage:
+
+ client, err := couchbase.Connect("http://myserver:8091/")
+ handleError(err)
+ pool, err := client.GetPool("default")
+ handleError(err)
+ bucket, err := pool.GetBucket("MyAwesomeBucket")
+ handleError(err)
+ ...
+
+or a shortcut for the bucket directly
+
+ bucket, err := couchbase.GetBucket("http://myserver:8091/", "default", "default")
+
+in any case, you can specify authentication credentials using
+standard URL userinfo syntax:
+
+ b, err := couchbase.GetBucket("http://bucketname:bucketpass@myserver:8091/",
+ "default", "bucket")
+*/
+package couchbase
+
+import (
+ "encoding/binary"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "runtime"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+ "unsafe"
+
+ "github.com/couchbase/gomemcached"
+ "github.com/couchbase/gomemcached/client" // package name is 'memcached'
+ "github.com/couchbase/goutils/logging"
+)
+
+// Mutation Token
+type MutationToken struct {
+ VBid uint16 // vbucket id
+ Guard uint64 // vbuuid
+ Value uint64 // sequence number
+}
+
+// Maximum number of times to retry a chunk of a bulk get on error.
+var MaxBulkRetries = 5000
+var backOffDuration time.Duration = 100 * time.Millisecond
+var MaxBackOffRetries = 25 // exponentail backOff result in over 30sec (25*13*0.1s)
+
+// If this is set to a nonzero duration, Do() and ViewCustom() will log a warning if the call
+// takes longer than that.
+var SlowServerCallWarningThreshold time.Duration
+
+func slowLog(startTime time.Time, format string, args ...interface{}) {
+ if elapsed := time.Now().Sub(startTime); elapsed > SlowServerCallWarningThreshold {
+ pc, _, _, _ := runtime.Caller(2)
+ caller := runtime.FuncForPC(pc).Name()
+ logging.Infof("go-couchbase: "+format+" in "+caller+" took "+elapsed.String(), args...)
+ }
+}
+
+// Return true if error is KEY_ENOENT. Required by cbq-engine
+func IsKeyEExistsError(err error) bool {
+
+ res, ok := err.(*gomemcached.MCResponse)
+ if ok && res.Status == gomemcached.KEY_EEXISTS {
+ return true
+ }
+
+ return false
+}
+
+// Return true if error is KEY_ENOENT. Required by cbq-engine
+func IsKeyNoEntError(err error) bool {
+
+ res, ok := err.(*gomemcached.MCResponse)
+ if ok && res.Status == gomemcached.KEY_ENOENT {
+ return true
+ }
+
+ return false
+}
+
+// Return true if error suggests a bucket refresh is required. Required by cbq-engine
+func IsRefreshRequired(err error) bool {
+
+ res, ok := err.(*gomemcached.MCResponse)
+ if ok && (res.Status == gomemcached.NO_BUCKET || res.Status == gomemcached.NOT_MY_VBUCKET) {
+ return true
+ }
+
+ return false
+}
+
+// Return true if a collection is not known. Required by cbq-engine
+func IsUnknownCollection(err error) bool {
+
+ res, ok := err.(*gomemcached.MCResponse)
+ if ok && (res.Status == gomemcached.UNKNOWN_COLLECTION) {
+ return true
+ }
+
+ return false
+}
+
+// ClientOpCallback is called for each invocation of Do.
+var ClientOpCallback func(opname, k string, start time.Time, err error)
+
+// Do executes a function on a memcached connection to the node owning key "k"
+//
+// Note that this automatically handles transient errors by replaying
+// your function on a "not-my-vbucket" error, so don't assume
+// your command will only be executed only once.
+func (b *Bucket) Do(k string, f func(mc *memcached.Client, vb uint16) error) (err error) {
+ return b.Do2(k, f, true)
+}
+
+func (b *Bucket) Do2(k string, f func(mc *memcached.Client, vb uint16) error, deadline bool) (err error) {
+ if SlowServerCallWarningThreshold > 0 {
+ defer slowLog(time.Now(), "call to Do(%q)", k)
+ }
+
+ vb := b.VBHash(k)
+ maxTries := len(b.Nodes()) * 2
+ for i := 0; i < maxTries; i++ {
+ conn, pool, err := b.getConnectionToVBucket(vb)
+ if err != nil {
+ if isConnError(err) && backOff(i, maxTries, backOffDuration, true) {
+ b.Refresh()
+ continue
+ }
+ return err
+ }
+
+ if deadline && DefaultTimeout > 0 {
+ conn.SetDeadline(getDeadline(noDeadline, DefaultTimeout))
+ } else {
+ conn.SetDeadline(noDeadline)
+ }
+ err = f(conn, uint16(vb))
+
+ var retry bool
+ discard := isOutOfBoundsError(err)
+
+ // MB-30967 / MB-31001 implement back off for transient errors
+ if resp, ok := err.(*gomemcached.MCResponse); ok {
+ switch resp.Status {
+ case gomemcached.NOT_MY_VBUCKET:
+ b.Refresh()
+ // MB-28842: in case of NMVB, check if the node is still part of the map
+ // and ditch the connection if it isn't.
+ discard = b.checkVBmap(pool.Node())
+ retry = true
+ case gomemcached.NOT_SUPPORTED:
+ discard = true
+ retry = true
+ case gomemcached.ENOMEM:
+ fallthrough
+ case gomemcached.TMPFAIL:
+ retry = backOff(i, maxTries, backOffDuration, true)
+ default:
+ retry = false
+ }
+ } else if err != nil && isConnError(err) && backOff(i, maxTries, backOffDuration, true) {
+ retry = true
+ }
+
+ if discard {
+ pool.Discard(conn)
+ } else {
+ pool.Return(conn)
+ }
+
+ if !retry {
+ return err
+ }
+ }
+
+ return fmt.Errorf("unable to complete action after %v attemps", maxTries)
+}
+
+type GatheredStats struct {
+ Server string
+ Stats map[string]string
+ Err error
+}
+
+func getStatsParallel(sn string, b *Bucket, offset int, which string,
+ ch chan<- GatheredStats) {
+ pool := b.getConnPool(offset)
+ var gatheredStats GatheredStats
+
+ conn, err := pool.Get()
+ defer func() {
+ pool.Return(conn)
+ ch <- gatheredStats
+ }()
+
+ if err != nil {
+ gatheredStats = GatheredStats{Server: sn, Err: err}
+ } else {
+ conn.SetDeadline(getDeadline(time.Time{}, DefaultTimeout))
+ sm, err := conn.StatsMap(which)
+ gatheredStats = GatheredStats{Server: sn, Stats: sm, Err: err}
+ }
+}
+
+// GetStats gets a set of stats from all servers.
+//
+// Returns a map of server ID -> map of stat key to map value.
+func (b *Bucket) GetStats(which string) map[string]map[string]string {
+ rv := map[string]map[string]string{}
+ for server, gs := range b.GatherStats(which) {
+ if len(gs.Stats) > 0 {
+ rv[server] = gs.Stats
+ }
+ }
+ return rv
+}
+
+// GatherStats returns a map of server ID -> GatheredStats from all servers.
+func (b *Bucket) GatherStats(which string) map[string]GatheredStats {
+ vsm := b.VBServerMap()
+ if vsm.ServerList == nil {
+ return nil
+ }
+
+ // Go grab all the things at once.
+ ch := make(chan GatheredStats, len(vsm.ServerList))
+ for i, sn := range vsm.ServerList {
+ go getStatsParallel(sn, b, i, which, ch)
+ }
+
+ // Gather the results
+ rv := map[string]GatheredStats{}
+ for range vsm.ServerList {
+ gs := <-ch
+ rv[gs.Server] = gs
+ }
+ return rv
+}
+
+// Get bucket count through the bucket stats
+func (b *Bucket) GetCount(refresh bool, context ...*memcached.ClientContext) (count int64, err error) {
+ if refresh {
+ b.Refresh()
+ }
+
+ var cnt int64
+ if len(context) > 0 {
+ key := fmt.Sprintf("collections-byid 0x%x", context[0].CollId)
+ resKey := ""
+ for _, gs := range b.GatherStats(key) {
+ if len(gs.Stats) > 0 {
+
+ // the key encodes the scope and collection id
+ // we don't have the scope id, so we have to find it...
+ if resKey == "" {
+ for k, _ := range gs.Stats {
+ resKey = strings.TrimRightFunc(k, func(r rune) bool {
+ return r != ':'
+ }) + "items"
+ break
+ }
+ }
+ cnt, err = strconv.ParseInt(gs.Stats[resKey], 10, 64)
+ if err != nil {
+ return 0, err
+ }
+ count += cnt
+ } else if gs.Err != nil {
+ return 0, gs.Err
+ }
+ }
+ } else {
+ for _, gs := range b.GatherStats("") {
+ if len(gs.Stats) > 0 {
+ cnt, err = strconv.ParseInt(gs.Stats["curr_items"], 10, 64)
+ if err != nil {
+ return 0, err
+ }
+ count += cnt
+ } else if gs.Err != nil {
+ return 0, gs.Err
+ }
+ }
+ }
+
+ return count, nil
+}
+
+// Get bucket document size through the bucket stats
+func (b *Bucket) GetSize(refresh bool, context ...*memcached.ClientContext) (size int64, err error) {
+
+ if refresh {
+ b.Refresh()
+ }
+
+ var sz int64
+ if len(context) > 0 {
+ key := fmt.Sprintf("collections-byid 0x%x", context[0].CollId)
+ resKey := ""
+ for _, gs := range b.GatherStats(key) {
+ if len(gs.Stats) > 0 {
+
+ // the key encodes the scope and collection id
+ // we don't have the scope id, so we have to find it...
+ if resKey == "" {
+ for k, _ := range gs.Stats {
+ resKey = strings.TrimRightFunc(k, func(r rune) bool {
+ return r != ':'
+ }) + "disk_size"
+ break
+ }
+ }
+ sz, err = strconv.ParseInt(gs.Stats[resKey], 10, 64)
+ if err != nil {
+ return 0, err
+ }
+ size += sz
+ } else if gs.Err != nil {
+ return 0, gs.Err
+ }
+ }
+ } else {
+ for _, gs := range b.GatherStats("") {
+ if len(gs.Stats) > 0 {
+ sz, err = strconv.ParseInt(gs.Stats["ep_value_size"], 10, 64)
+ if err != nil {
+ return 0, err
+ }
+ size += sz
+ } else if gs.Err != nil {
+ return 0, gs.Err
+ }
+ }
+ }
+
+ return size, nil
+}
+
+func isAuthError(err error) bool {
+ estr := err.Error()
+ return strings.Contains(estr, "Auth failure")
+}
+
+func IsReadTimeOutError(err error) bool {
+ estr := err.Error()
+ return strings.Contains(estr, "read tcp") ||
+ strings.Contains(estr, "i/o timeout")
+}
+
+func isTimeoutError(err error) bool {
+ estr := err.Error()
+ return strings.Contains(estr, "i/o timeout") ||
+ strings.Contains(estr, "connection timed out") ||
+ strings.Contains(estr, "no route to host")
+}
+
+// Errors that are not considered fatal for our fetch loop
+func isConnError(err error) bool {
+ if err == io.EOF {
+ return true
+ }
+ estr := err.Error()
+ return strings.Contains(estr, "broken pipe") ||
+ strings.Contains(estr, "connection reset") ||
+ strings.Contains(estr, "connection refused") ||
+ strings.Contains(estr, "connection pool is closed")
+}
+
+func isOutOfBoundsError(err error) bool {
+ return err != nil && strings.Contains(err.Error(), "Out of Bounds error")
+
+}
+
+func getDeadline(reqDeadline time.Time, duration time.Duration) time.Time {
+ if reqDeadline.IsZero() {
+ if duration > 0 {
+ return time.Unix(time.Now().Unix(), 0).Add(duration)
+ } else {
+ return noDeadline
+ }
+ }
+ return reqDeadline
+}
+
+func backOff(attempt, maxAttempts int, duration time.Duration, exponential bool) bool {
+ if attempt < maxAttempts {
+ // 0th attempt return immediately
+ if attempt > 0 {
+ if exponential {
+ duration = time.Duration(attempt) * duration
+ }
+ time.Sleep(duration)
+ }
+ return true
+ }
+
+ return false
+}
+
+func (b *Bucket) doBulkGet(vb uint16, keys []string, reqDeadline time.Time,
+ ch chan<- map[string]*gomemcached.MCResponse, ech chan<- error, subPaths []string,
+ eStatus *errorStatus, context ...*memcached.ClientContext) {
+ if SlowServerCallWarningThreshold > 0 {
+ defer slowLog(time.Now(), "call to doBulkGet(%d, %d keys)", vb, len(keys))
+ }
+
+ rv := _STRING_MCRESPONSE_POOL.Get()
+ attempts := 0
+ backOffAttempts := 0
+ done := false
+ bname := b.Name
+ for ; attempts < MaxBulkRetries && !done && !eStatus.errStatus; attempts++ {
+
+ if len(b.VBServerMap().VBucketMap) < int(vb) {
+ //fatal
+ err := fmt.Errorf("vbmap smaller than requested for %v", bname)
+ logging.Errorf("go-couchbase: %v vb %d vbmap len %d", err.Error(), vb, len(b.VBServerMap().VBucketMap))
+ ech <- err
+ return
+ }
+
+ masterID := b.VBServerMap().VBucketMap[vb][0]
+
+ if masterID < 0 {
+ // fatal
+ err := fmt.Errorf("No master node available for %v vb %d", bname, vb)
+ logging.Errorf("%v", err.Error())
+ ech <- err
+ return
+ }
+
+ // This stack frame exists to ensure we can clean up
+ // connection at a reasonable time.
+ err := func() error {
+ pool := b.getConnPool(masterID)
+ conn, err := pool.Get()
+ if err != nil {
+ if isAuthError(err) || isTimeoutError(err) {
+ logging.Errorf("Fatal Error %v : %v", bname, err)
+ ech <- err
+ return err
+ } else if isConnError(err) {
+ if !backOff(backOffAttempts, MaxBackOffRetries, backOffDuration, true) {
+ logging.Errorf("Connection Error %v : %v", bname, err)
+ ech <- err
+ return err
+ }
+ b.Refresh()
+ backOffAttempts++
+ }
+ logging.Infof("Pool Get returned %v: %v", bname, err)
+ // retry
+ return nil
+ }
+
+ conn.SetDeadline(getDeadline(reqDeadline, DefaultTimeout))
+ err = conn.GetBulk(vb, keys, rv, subPaths, context...)
+
+ discard := false
+ defer func() {
+ if discard {
+ pool.Discard(conn)
+ } else {
+ pool.Return(conn)
+ }
+ }()
+
+ switch err.(type) {
+ case *gomemcached.MCResponse:
+ notSMaxTries := len(b.Nodes()) * 2
+ st := err.(*gomemcached.MCResponse).Status
+ if st == gomemcached.NOT_MY_VBUCKET || (st == gomemcached.NOT_SUPPORTED && attempts < notSMaxTries) {
+ b.Refresh()
+ discard = b.checkVBmap(pool.Node())
+ return nil // retry
+ } else if st == gomemcached.EBUSY || st == gomemcached.LOCKED {
+ if (attempts % (MaxBulkRetries / 100)) == 0 {
+ logging.Infof("Retrying Memcached error (%v) FOR %v(vbid:%d, keys:<ud>%v</ud>)",
+ err.Error(), bname, vb, keys)
+ }
+ return nil // retry
+ } else if (st == gomemcached.ENOMEM || st == gomemcached.TMPFAIL) && backOff(backOffAttempts, MaxBackOffRetries, backOffDuration, true) {
+ // MB-30967 / MB-31001 use backoff for TMPFAIL too
+ backOffAttempts++
+ logging.Infof("Retrying Memcached error (%v) FOR %v(vbid:%d, keys:<ud>%v</ud>)",
+ err.Error(), bname, vb, keys)
+ return nil // retry
+ }
+ ech <- err
+ return err
+ case error:
+ if isOutOfBoundsError(err) {
+ // We got an out of bound error, retry the operation
+ discard = true
+ return nil
+ } else if isConnError(err) && backOff(backOffAttempts, MaxBackOffRetries, backOffDuration, true) {
+ backOffAttempts++
+ logging.Errorf("Connection Error: %s. Refreshing bucket %v (vbid:%v,keys:<ud>%v</ud>)",
+ err.Error(), bname, vb, keys)
+ discard = true
+ b.Refresh()
+ return nil // retry
+ }
+ ech <- err
+ ch <- rv
+ return err
+ }
+
+ done = true
+ return nil
+ }()
+
+ if err != nil {
+ return
+ }
+ }
+
+ if attempts >= MaxBulkRetries {
+ err := fmt.Errorf("bulkget exceeded MaxBulkRetries for %v(vbid:%d,keys:<ud>%v</ud>)", bname, vb, keys)
+ logging.Errorf("%v", err.Error())
+ ech <- err
+ }
+
+ ch <- rv
+}
+
+type errorStatus struct {
+ errStatus bool
+}
+
+type vbBulkGet struct {
+ b *Bucket
+ ch chan<- map[string]*gomemcached.MCResponse
+ ech chan<- error
+ k uint16
+ keys []string
+ reqDeadline time.Time
+ wg *sync.WaitGroup
+ subPaths []string
+ groupError *errorStatus
+ context []*memcached.ClientContext
+}
+
+const _NUM_CHANNELS = 5
+
+var _NUM_CHANNEL_WORKERS = (runtime.NumCPU() + 1) / 2
+var DefaultDialTimeout = time.Duration(0)
+var DefaultTimeout = time.Duration(0)
+var noDeadline = time.Time{}
+
+// Buffer 4k requests per worker
+var _VB_BULK_GET_CHANNELS []chan *vbBulkGet
+
+func InitBulkGet() {
+
+ DefaultDialTimeout = 20 * time.Second
+ DefaultTimeout = 120 * time.Second
+
+ memcached.SetDefaultDialTimeout(DefaultDialTimeout)
+
+ _VB_BULK_GET_CHANNELS = make([]chan *vbBulkGet, _NUM_CHANNELS)
+
+ for i := 0; i < _NUM_CHANNELS; i++ {
+ channel := make(chan *vbBulkGet, 16*1024*_NUM_CHANNEL_WORKERS)
+ _VB_BULK_GET_CHANNELS[i] = channel
+
+ for j := 0; j < _NUM_CHANNEL_WORKERS; j++ {
+ go vbBulkGetWorker(channel)
+ }
+ }
+}
+
+func vbBulkGetWorker(ch chan *vbBulkGet) {
+ defer func() {
+ // Workers cannot panic and die
+ recover()
+ go vbBulkGetWorker(ch)
+ }()
+
+ for vbg := range ch {
+ vbDoBulkGet(vbg)
+ }
+}
+
+func vbDoBulkGet(vbg *vbBulkGet) {
+ defer vbg.wg.Done()
+ defer func() {
+ // Workers cannot panic and die
+ recover()
+ }()
+ vbg.b.doBulkGet(vbg.k, vbg.keys, vbg.reqDeadline, vbg.ch, vbg.ech, vbg.subPaths, vbg.groupError, vbg.context...)
+}
+
+var _ERR_CHAN_FULL = fmt.Errorf("Data request queue full, aborting query.")
+
+func (b *Bucket) processBulkGet(kdm map[uint16][]string, reqDeadline time.Time,
+ ch chan<- map[string]*gomemcached.MCResponse, ech chan<- error, subPaths []string,
+ eStatus *errorStatus, context ...*memcached.ClientContext) {
+
+ defer close(ch)
+ defer close(ech)
+
+ wg := &sync.WaitGroup{}
+
+ for k, keys := range kdm {
+
+ // GetBulk() group has error donot Queue items for this group
+ if eStatus.errStatus {
+ break
+ }
+
+ vbg := &vbBulkGet{
+ b: b,
+ ch: ch,
+ ech: ech,
+ k: k,
+ keys: keys,
+ reqDeadline: reqDeadline,
+ wg: wg,
+ subPaths: subPaths,
+ groupError: eStatus,
+ context: context,
+ }
+
+ wg.Add(1)
+
+ // Random int
+ // Right shift to avoid 8-byte alignment, and take low bits
+ c := (uintptr(unsafe.Pointer(vbg)) >> 4) % _NUM_CHANNELS
+
+ select {
+ case _VB_BULK_GET_CHANNELS[c] <- vbg:
+ // No-op
+ default:
+ // Buffer full, abandon the bulk get
+ ech <- _ERR_CHAN_FULL
+ wg.Add(-1)
+ }
+ }
+
+ // Wait for my vb bulk gets
+ wg.Wait()
+}
+
+type multiError []error
+
+func (m multiError) Error() string {
+ if len(m) == 0 {
+ panic("Error of none")
+ }
+
+ return fmt.Sprintf("{%v errors, starting with %v}", len(m), m[0].Error())
+}
+
+// Convert a stream of errors from ech into a multiError (or nil) and
+// send down eout.
+//
+// At least one send is guaranteed on eout, but two is possible, so
+// buffer the out channel appropriately.
+func errorCollector(ech <-chan error, eout chan<- error, eStatus *errorStatus) {
+ defer func() { eout <- nil }()
+ var errs multiError
+ for e := range ech {
+ if !eStatus.errStatus && !IsKeyNoEntError(e) {
+ eStatus.errStatus = true
+ }
+
+ errs = append(errs, e)
+ }
+
+ if len(errs) > 0 {
+ eout <- errs
+ }
+}
+
+// Fetches multiple keys concurrently, with []byte values
+//
+// This is a wrapper around GetBulk which converts all values returned
+// by GetBulk from raw memcached responses into []byte slices.
+// Returns one document for duplicate keys
+func (b *Bucket) GetBulkRaw(keys []string, context ...*memcached.ClientContext) (map[string][]byte, error) {
+
+ resp, eout := b.getBulk(keys, noDeadline, nil, context...)
+
+ rv := make(map[string][]byte, len(keys))
+ for k, av := range resp {
+ rv[k] = av.Body
+ }
+
+ b.ReleaseGetBulkPools(resp)
+ return rv, eout
+
+}
+
+// GetBulk fetches multiple keys concurrently.
+//
+// Unlike more convenient GETs, the entire response is returned in the
+// map array for each key. Keys that were not found will not be included in
+// the map.
+
+func (b *Bucket) GetBulk(keys []string, reqDeadline time.Time, subPaths []string, context ...*memcached.ClientContext) (map[string]*gomemcached.MCResponse, error) {
+ return b.getBulk(keys, reqDeadline, subPaths, context...)
+}
+
+func (b *Bucket) ReleaseGetBulkPools(rv map[string]*gomemcached.MCResponse) {
+ _STRING_MCRESPONSE_POOL.Put(rv)
+}
+
+func (b *Bucket) getBulk(keys []string, reqDeadline time.Time, subPaths []string, context ...*memcached.ClientContext) (map[string]*gomemcached.MCResponse, error) {
+ kdm := _VB_STRING_POOL.Get()
+ defer _VB_STRING_POOL.Put(kdm)
+ for _, k := range keys {
+ if k != "" {
+ vb := uint16(b.VBHash(k))
+ a, ok1 := kdm[vb]
+ if !ok1 {
+ a = _STRING_POOL.Get()
+ }
+ kdm[vb] = append(a, k)
+ }
+ }
+
+ eout := make(chan error, 2)
+ groupErrorStatus := &errorStatus{}
+
+ // processBulkGet will own both of these channels and
+ // guarantee they're closed before it returns.
+ ch := make(chan map[string]*gomemcached.MCResponse)
+ ech := make(chan error)
+
+ go errorCollector(ech, eout, groupErrorStatus)
+ go b.processBulkGet(kdm, reqDeadline, ch, ech, subPaths, groupErrorStatus, context...)
+
+ var rv map[string]*gomemcached.MCResponse
+
+ for m := range ch {
+ if rv == nil {
+ rv = m
+ continue
+ }
+
+ for k, v := range m {
+ rv[k] = v
+ }
+ _STRING_MCRESPONSE_POOL.Put(m)
+ }
+
+ return rv, <-eout
+}
+
+// WriteOptions is the set of option flags availble for the Write
+// method. They are ORed together to specify the desired request.
+type WriteOptions int
+
+const (
+ // Raw specifies that the value is raw []byte or nil; don't
+ // JSON-encode it.
+ Raw = WriteOptions(1 << iota)
+ // AddOnly indicates an item should only be written if it
+ // doesn't exist, otherwise ErrKeyExists is returned.
+ AddOnly
+ // Persist causes the operation to block until the server
+ // confirms the item is persisted.
+ Persist
+ // Indexable causes the operation to block until it's availble via the index.
+ Indexable
+ // Append indicates the given value should be appended to the
+ // existing value for the given key.
+ Append
+)
+
+var optNames = []struct {
+ opt WriteOptions
+ name string
+}{
+ {Raw, "raw"},
+ {AddOnly, "addonly"}, {Persist, "persist"},
+ {Indexable, "indexable"}, {Append, "append"},
+}
+
+// String representation of WriteOptions
+func (w WriteOptions) String() string {
+ f := []string{}
+ for _, on := range optNames {
+ if w&on.opt != 0 {
+ f = append(f, on.name)
+ w &= ^on.opt
+ }
+ }
+ if len(f) == 0 || w != 0 {
+ f = append(f, fmt.Sprintf("0x%x", int(w)))
+ }
+ return strings.Join(f, "|")
+}
+
+// Error returned from Write with AddOnly flag, when key already exists in the bucket.
+var ErrKeyExists = errors.New("key exists")
+
+// General-purpose value setter.
+//
+// The Set, Add and Delete methods are just wrappers around this. The
+// interpretation of `v` depends on whether the `Raw` option is
+// given. If it is, v must be a byte array or nil. (A nil value causes
+// a delete.) If `Raw` is not given, `v` will be marshaled as JSON
+// before being written. It must be JSON-marshalable and it must not
+// be nil.
+func (b *Bucket) Write(k string, flags, exp int, v interface{},
+ opt WriteOptions, context ...*memcached.ClientContext) (err error) {
+
+ if ClientOpCallback != nil {
+ defer func(t time.Time) {
+ ClientOpCallback(fmt.Sprintf("Write(%v)", opt), k, t, err)
+ }(time.Now())
+ }
+
+ var data []byte
+ if opt&Raw == 0 {
+ data, err = json.Marshal(v)
+ if err != nil {
+ return err
+ }
+ } else if v != nil {
+ data = v.([]byte)
+ }
+
+ var res *gomemcached.MCResponse
+ err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
+ if opt&AddOnly != 0 {
+ res, err = memcached.UnwrapMemcachedError(
+ mc.Add(vb, k, flags, exp, data, context...))
+ if err == nil && res.Status != gomemcached.SUCCESS {
+ if res.Status == gomemcached.KEY_EEXISTS {
+ err = ErrKeyExists
+ } else {
+ err = res
+ }
+ }
+ } else if opt&Append != 0 {
+ res, err = mc.Append(vb, k, data, context...)
+ } else if data == nil {
+ res, err = mc.Del(vb, k, context...)
+ } else {
+ res, err = mc.Set(vb, k, flags, exp, data, context...)
+ }
+
+ return err
+ })
+
+ if err == nil && (opt&(Persist|Indexable) != 0) {
+ err = b.WaitForPersistence(k, res.Cas, data == nil)
+ }
+
+ return err
+}
+
+func (b *Bucket) WriteWithMT(k string, flags, exp int, v interface{},
+ opt WriteOptions, context ...*memcached.ClientContext) (mt *MutationToken, err error) {
+
+ if ClientOpCallback != nil {
+ defer func(t time.Time) {
+ ClientOpCallback(fmt.Sprintf("WriteWithMT(%v)", opt), k, t, err)
+ }(time.Now())
+ }
+
+ var data []byte
+ if opt&Raw == 0 {
+ data, err = json.Marshal(v)
+ if err != nil {
+ return nil, err
+ }
+ } else if v != nil {
+ data = v.([]byte)
+ }
+
+ var res *gomemcached.MCResponse
+ err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
+ if opt&AddOnly != 0 {
+ res, err = memcached.UnwrapMemcachedError(
+ mc.Add(vb, k, flags, exp, data, context...))
+ if err == nil && res.Status != gomemcached.SUCCESS {
+ if res.Status == gomemcached.KEY_EEXISTS {
+ err = ErrKeyExists
+ } else {
+ err = res
+ }
+ }
+ } else if opt&Append != 0 {
+ res, err = mc.Append(vb, k, data, context...)
+ } else if data == nil {
+ res, err = mc.Del(vb, k, context...)
+ } else {
+ res, err = mc.Set(vb, k, flags, exp, data, context...)
+ }
+
+ if len(res.Extras) >= 16 {
+ vbuuid := uint64(binary.BigEndian.Uint64(res.Extras[0:8]))
+ seqNo := uint64(binary.BigEndian.Uint64(res.Extras[8:16]))
+ mt = &MutationToken{VBid: vb, Guard: vbuuid, Value: seqNo}
+ }
+
+ return err
+ })
+
+ if err == nil && (opt&(Persist|Indexable) != 0) {
+ err = b.WaitForPersistence(k, res.Cas, data == nil)
+ }
+
+ return mt, err
+}
+
+// Set a value in this bucket with Cas and return the new Cas value
+func (b *Bucket) Cas(k string, exp int, cas uint64, v interface{}, context ...*memcached.ClientContext) (uint64, error) {
+ return b.WriteCas(k, 0, exp, cas, v, 0, context...)
+}
+
+// Set a value in this bucket with Cas without json encoding it
+func (b *Bucket) CasRaw(k string, exp int, cas uint64, v interface{}, context ...*memcached.ClientContext) (uint64, error) {
+ return b.WriteCas(k, 0, exp, cas, v, Raw, context...)
+}
+
+func (b *Bucket) WriteCas(k string, flags, exp int, cas uint64, v interface{},
+ opt WriteOptions, context ...*memcached.ClientContext) (newCas uint64, err error) {
+
+ if ClientOpCallback != nil {
+ defer func(t time.Time) {
+ ClientOpCallback(fmt.Sprintf("Write(%v)", opt), k, t, err)
+ }(time.Now())
+ }
+
+ var data []byte
+ if opt&Raw == 0 {
+ data, err = json.Marshal(v)
+ if err != nil {
+ return 0, err
+ }
+ } else if v != nil {
+ data = v.([]byte)
+ }
+
+ var res *gomemcached.MCResponse
+ err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
+ res, err = mc.SetCas(vb, k, flags, exp, cas, data, context...)
+ return err
+ })
+
+ if err == nil && (opt&(Persist|Indexable) != 0) {
+ err = b.WaitForPersistence(k, res.Cas, data == nil)
+ }
+
+ return res.Cas, err
+}
+
+// Extended CAS operation. These functions will return the mutation token, i.e vbuuid & guard
+func (b *Bucket) CasWithMeta(k string, flags int, exp int, cas uint64, v interface{}, context ...*memcached.ClientContext) (uint64, *MutationToken, error) {
+ return b.WriteCasWithMT(k, flags, exp, cas, v, 0, context...)
+}
+
+func (b *Bucket) CasWithMetaRaw(k string, flags int, exp int, cas uint64, v interface{}, context ...*memcached.ClientContext) (uint64, *MutationToken, error) {
+ return b.WriteCasWithMT(k, flags, exp, cas, v, Raw, context...)
+}
+
+func (b *Bucket) WriteCasWithMT(k string, flags, exp int, cas uint64, v interface{},
+ opt WriteOptions, context ...*memcached.ClientContext) (newCas uint64, mt *MutationToken, err error) {
+
+ if ClientOpCallback != nil {
+ defer func(t time.Time) {
+ ClientOpCallback(fmt.Sprintf("Write(%v)", opt), k, t, err)
+ }(time.Now())
+ }
+
+ var data []byte
+ if opt&Raw == 0 {
+ data, err = json.Marshal(v)
+ if err != nil {
+ return 0, nil, err
+ }
+ } else if v != nil {
+ data = v.([]byte)
+ }
+
+ var res *gomemcached.MCResponse
+ err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
+ res, err = mc.SetCas(vb, k, flags, exp, cas, data, context...)
+ return err
+ })
+
+ if err != nil {
+ return 0, nil, err
+ }
+
+ // check for extras
+ if len(res.Extras) >= 16 {
+ vbuuid := uint64(binary.BigEndian.Uint64(res.Extras[0:8]))
+ seqNo := uint64(binary.BigEndian.Uint64(res.Extras[8:16]))
+ vb := b.VBHash(k)
+ mt = &MutationToken{VBid: uint16(vb), Guard: vbuuid, Value: seqNo}
+ }
+
+ if err == nil && (opt&(Persist|Indexable) != 0) {
+ err = b.WaitForPersistence(k, res.Cas, data == nil)
+ }
+
+ return res.Cas, mt, err
+}
+
+// Set a value in this bucket.
+// The value will be serialized into a JSON document.
+func (b *Bucket) Set(k string, exp int, v interface{}, context ...*memcached.ClientContext) error {
+ return b.Write(k, 0, exp, v, 0, context...)
+}
+
+// Set a value in this bucket with with flags
+func (b *Bucket) SetWithMeta(k string, flags int, exp int, v interface{}, context ...*memcached.ClientContext) (*MutationToken, error) {
+ return b.WriteWithMT(k, flags, exp, v, 0, context...)
+}
+
+// SetRaw sets a value in this bucket without JSON encoding it.
+func (b *Bucket) SetRaw(k string, exp int, v []byte, context ...*memcached.ClientContext) error {
+ return b.Write(k, 0, exp, v, Raw, context...)
+}
+
+// Add adds a value to this bucket; like Set except that nothing
+// happens if the key exists. The value will be serialized into a
+// JSON document.
+func (b *Bucket) Add(k string, exp int, v interface{}, context ...*memcached.ClientContext) (added bool, err error) {
+ err = b.Write(k, 0, exp, v, AddOnly, context...)
+ if err == ErrKeyExists {
+ return false, nil
+ }
+ return (err == nil), err
+}
+
+// AddRaw adds a value to this bucket; like SetRaw except that nothing
+// happens if the key exists. The value will be stored as raw bytes.
+func (b *Bucket) AddRaw(k string, exp int, v []byte, context ...*memcached.ClientContext) (added bool, err error) {
+ err = b.Write(k, 0, exp, v, AddOnly|Raw, context...)
+ if err == ErrKeyExists {
+ return false, nil
+ }
+ return (err == nil), err
+}
+
+// Add adds a value to this bucket; like Set except that nothing
+// happens if the key exists. The value will be serialized into a
+// JSON document.
+func (b *Bucket) AddWithMT(k string, exp int, v interface{}, context ...*memcached.ClientContext) (added bool, mt *MutationToken, err error) {
+ mt, err = b.WriteWithMT(k, 0, exp, v, AddOnly, context...)
+ if err == ErrKeyExists {
+ return false, mt, nil
+ }
+ return (err == nil), mt, err
+}
+
+// AddRaw adds a value to this bucket; like SetRaw except that nothing
+// happens if the key exists. The value will be stored as raw bytes.
+func (b *Bucket) AddRawWithMT(k string, exp int, v []byte, context ...*memcached.ClientContext) (added bool, mt *MutationToken, err error) {
+ mt, err = b.WriteWithMT(k, 0, exp, v, AddOnly|Raw, context...)
+ if err == ErrKeyExists {
+ return false, mt, nil
+ }
+ return (err == nil), mt, err
+}
+
+// Append appends raw data to an existing item.
+func (b *Bucket) Append(k string, data []byte, context ...*memcached.ClientContext) error {
+ return b.Write(k, 0, 0, data, Append|Raw, context...)
+}
+
+// Returns collectionUid, manifestUid, error.
+func (b *Bucket) GetCollectionCID(scope string, collection string, reqDeadline time.Time) (uint32, uint32, error) {
+ var err error
+ var response *gomemcached.MCResponse
+
+ if ClientOpCallback != nil {
+ defer func(t time.Time) { ClientOpCallback("GetCollectionCID", scope+"."+collection, t, err) }(time.Now())
+ }
+
+ var key = "DUMMY" // Contact any server.
+ var manifestUid uint32
+ var collUid uint32
+ err = b.Do2(key, func(mc *memcached.Client, vb uint16) error {
+ var err1 error
+
+ mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout))
+ _, err1 = mc.SelectBucket(b.Name)
+ if err1 != nil {
+ return err1
+ }
+
+ response, err1 = mc.CollectionsGetCID(scope, collection)
+ if err1 != nil {
+ return err1
+ }
+
+ manifestUid = binary.BigEndian.Uint32(response.Extras[4:8])
+ collUid = binary.BigEndian.Uint32(response.Extras[8:12])
+
+ return nil
+ }, false)
+
+ return collUid, manifestUid, err
+}
+
+// Get a value straight from Memcached
+func (b *Bucket) GetsMC(key string, reqDeadline time.Time, context ...*memcached.ClientContext) (*gomemcached.MCResponse, error) {
+ var err error
+ var response *gomemcached.MCResponse
+
+ if key == "" {
+ return nil, nil
+ }
+
+ if ClientOpCallback != nil {
+ defer func(t time.Time) { ClientOpCallback("GetsMC", key, t, err) }(time.Now())
+ }
+
+ err = b.Do2(key, func(mc *memcached.Client, vb uint16) error {
+ var err1 error
+
+ mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout))
+ response, err1 = mc.Get(vb, key, context...)
+ if err1 != nil {
+ return err1
+ }
+ return nil
+ }, false)
+ return response, err
+}
+
+// Get a value through the subdoc API
+func (b *Bucket) GetsSubDoc(key string, reqDeadline time.Time, subPaths []string, context ...*memcached.ClientContext) (*gomemcached.MCResponse, error) {
+ var err error
+ var response *gomemcached.MCResponse
+
+ if key == "" {
+ return nil, nil
+ }
+
+ if ClientOpCallback != nil {
+ defer func(t time.Time) { ClientOpCallback("GetsSubDoc", key, t, err) }(time.Now())
+ }
+
+ err = b.Do2(key, func(mc *memcached.Client, vb uint16) error {
+ var err1 error
+
+ mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout))
+ response, err1 = mc.GetSubdoc(vb, key, subPaths, context...)
+ if err1 != nil {
+ return err1
+ }
+ return nil
+ }, false)
+ return response, err
+}
+
+// GetsRaw gets a raw value from this bucket including its CAS
+// counter and flags.
+func (b *Bucket) GetsRaw(k string, context ...*memcached.ClientContext) (data []byte, flags int,
+ cas uint64, err error) {
+
+ if ClientOpCallback != nil {
+ defer func(t time.Time) { ClientOpCallback("GetsRaw", k, t, err) }(time.Now())
+ }
+
+ err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
+ res, err := mc.Get(vb, k, context...)
+ if err != nil {
+ return err
+ }
+ cas = res.Cas
+ if len(res.Extras) >= 4 {
+ flags = int(binary.BigEndian.Uint32(res.Extras))
+ }
+ data = res.Body
+ return nil
+ })
+ return
+}
+
+// Gets gets a value from this bucket, including its CAS counter. The
+// value is expected to be a JSON stream and will be deserialized into
+// rv.
+func (b *Bucket) Gets(k string, rv interface{}, caso *uint64, context ...*memcached.ClientContext) error {
+ data, _, cas, err := b.GetsRaw(k, context...)
+ if err != nil {
+ return err
+ }
+ if caso != nil {
+ *caso = cas
+ }
+ return json.Unmarshal(data, rv)
+}
+
+// Get a value from this bucket.
+// The value is expected to be a JSON stream and will be deserialized
+// into rv.
+func (b *Bucket) Get(k string, rv interface{}, context ...*memcached.ClientContext) error {
+ return b.Gets(k, rv, nil, context...)
+}
+
+// GetRaw gets a raw value from this bucket. No marshaling is performed.
+func (b *Bucket) GetRaw(k string, context ...*memcached.ClientContext) ([]byte, error) {
+ d, _, _, err := b.GetsRaw(k, context...)
+ return d, err
+}
+
+// GetAndTouchRaw gets a raw value from this bucket including its CAS
+// counter and flags, and updates the expiry on the doc.
+func (b *Bucket) GetAndTouchRaw(k string, exp int, context ...*memcached.ClientContext) (data []byte,
+ cas uint64, err error) {
+
+ if ClientOpCallback != nil {
+ defer func(t time.Time) { ClientOpCallback("GetsRaw", k, t, err) }(time.Now())
+ }
+
+ err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
+ res, err := mc.GetAndTouch(vb, k, exp, context...)
+ if err != nil {
+ return err
+ }
+ cas = res.Cas
+ data = res.Body
+ return nil
+ })
+ return data, cas, err
+}
+
+// GetMeta returns the meta values for a key
+func (b *Bucket) GetMeta(k string, flags *int, expiry *int, cas *uint64, seqNo *uint64, context ...*memcached.ClientContext) (err error) {
+
+ if ClientOpCallback != nil {
+ defer func(t time.Time) { ClientOpCallback("GetsMeta", k, t, err) }(time.Now())
+ }
+
+ err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
+ res, err := mc.GetMeta(vb, k, context...)
+ if err != nil {
+ return err
+ }
+
+ *cas = res.Cas
+ if len(res.Extras) >= 8 {
+ *flags = int(binary.BigEndian.Uint32(res.Extras[4:]))
+ }
+
+ if len(res.Extras) >= 12 {
+ *expiry = int(binary.BigEndian.Uint32(res.Extras[8:]))
+ }
+
+ if len(res.Extras) >= 20 {
+ *seqNo = uint64(binary.BigEndian.Uint64(res.Extras[12:]))
+ }
+
+ return nil
+ })
+
+ return err
+}
+
+// Delete a key from this bucket.
+func (b *Bucket) Delete(k string, context ...*memcached.ClientContext) error {
+ return b.Write(k, 0, 0, nil, Raw, context...)
+}
+
+// Incr increments the value at a given key by amt and defaults to def if no value present.
+func (b *Bucket) Incr(k string, amt, def uint64, exp int, context ...*memcached.ClientContext) (val uint64, err error) {
+ if ClientOpCallback != nil {
+ defer func(t time.Time) { ClientOpCallback("Incr", k, t, err) }(time.Now())
+ }
+
+ var rv uint64
+ err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
+ res, err := mc.Incr(vb, k, amt, def, exp, context...)
+ if err != nil {
+ return err
+ }
+ rv = res
+ return nil
+ })
+ return rv, err
+}
+
+// Decr decrements the value at a given key by amt and defaults to def if no value present
+func (b *Bucket) Decr(k string, amt, def uint64, exp int, context ...*memcached.ClientContext) (val uint64, err error) {
+ if ClientOpCallback != nil {
+ defer func(t time.Time) { ClientOpCallback("Decr", k, t, err) }(time.Now())
+ }
+
+ var rv uint64
+ err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
+ res, err := mc.Decr(vb, k, amt, def, exp, context...)
+ if err != nil {
+ return err
+ }
+ rv = res
+ return nil
+ })
+ return rv, err
+}
+
+// Wrapper around memcached.CASNext()
+func (b *Bucket) casNext(k string, exp int, state *memcached.CASState) bool {
+ if ClientOpCallback != nil {
+ defer func(t time.Time) {
+ ClientOpCallback("casNext", k, t, state.Err)
+ }(time.Now())
+ }
+
+ keepGoing := false
+ state.Err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
+ keepGoing = mc.CASNext(vb, k, exp, state)
+ return state.Err
+ })
+ return keepGoing && state.Err == nil
+}
+
+// An UpdateFunc is a callback function to update a document
+type UpdateFunc func(current []byte) (updated []byte, err error)
+
+// Return this as the error from an UpdateFunc to cancel the Update
+// operation.
+const UpdateCancel = memcached.CASQuit
+
+// Update performs a Safe update of a document, avoiding conflicts by
+// using CAS.
+//
+// The callback function will be invoked with the current raw document
+// contents (or nil if the document doesn't exist); it should return
+// the updated raw contents (or nil to delete.) If it decides not to
+// change anything it can return UpdateCancel as the error.
+//
+// If another writer modifies the document between the get and the
+// set, the callback will be invoked again with the newer value.
+func (b *Bucket) Update(k string, exp int, callback UpdateFunc) error {
+ _, err := b.update(k, exp, callback)
+ return err
+}
+
+// internal version of Update that returns a CAS value
+func (b *Bucket) update(k string, exp int, callback UpdateFunc) (newCas uint64, err error) {
+ var state memcached.CASState
+ for b.casNext(k, exp, &state) {
+ var err error
+ if state.Value, err = callback(state.Value); err != nil {
+ return 0, err
+ }
+ }
+ return state.Cas, state.Err
+}
+
+// A WriteUpdateFunc is a callback function to update a document
+type WriteUpdateFunc func(current []byte) (updated []byte, opt WriteOptions, err error)
+
+// WriteUpdate performs a Safe update of a document, avoiding
+// conflicts by using CAS. WriteUpdate is like Update, except that
+// the callback can return a set of WriteOptions, of which Persist and
+// Indexable are recognized: these cause the call to wait until the
+// document update has been persisted to disk and/or become available
+// to index.
+func (b *Bucket) WriteUpdate(k string, exp int, callback WriteUpdateFunc) error {
+ var writeOpts WriteOptions
+ var deletion bool
+ // Wrap the callback in an UpdateFunc we can pass to Update:
+ updateCallback := func(current []byte) (updated []byte, err error) {
+ update, opt, err := callback(current)
+ writeOpts = opt
+ deletion = (update == nil)
+ return update, err
+ }
+ cas, err := b.update(k, exp, updateCallback)
+ if err != nil {
+ return err
+ }
+ // If callback asked, wait for persistence or indexability:
+ if writeOpts&(Persist|Indexable) != 0 {
+ err = b.WaitForPersistence(k, cas, deletion)
+ }
+ return err
+}
+
+// Observe observes the current state of a document.
+func (b *Bucket) Observe(k string) (result memcached.ObserveResult, err error) {
+ if ClientOpCallback != nil {
+ defer func(t time.Time) { ClientOpCallback("Observe", k, t, err) }(time.Now())
+ }
+
+ err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
+ result, err = mc.Observe(vb, k)
+ return err
+ })
+ return
+}
+
+// Returned from WaitForPersistence (or Write, if the Persistent or Indexable flag is used)
+// if the value has been overwritten by another before being persisted.
+var ErrOverwritten = errors.New("overwritten")
+
+// Returned from WaitForPersistence (or Write, if the Persistent or Indexable flag is used)
+// if the value hasn't been persisted by the timeout interval
+var ErrTimeout = errors.New("timeout")
+
+// WaitForPersistence waits for an item to be considered durable.
+//
+// Besides transport errors, ErrOverwritten may be returned if the
+// item is overwritten before it reaches durability. ErrTimeout may
+// occur if the item isn't found durable in a reasonable amount of
+// time.
+func (b *Bucket) WaitForPersistence(k string, cas uint64, deletion bool) error {
+ timeout := 10 * time.Second
+ sleepDelay := 5 * time.Millisecond
+ start := time.Now()
+ for {
+ time.Sleep(sleepDelay)
+ sleepDelay += sleepDelay / 2 // multiply delay by 1.5 every time
+
+ result, err := b.Observe(k)
+ if err != nil {
+ return err
+ }
+ if persisted, overwritten := result.CheckPersistence(cas, deletion); overwritten {
+ return ErrOverwritten
+ } else if persisted {
+ return nil
+ }
+
+ if result.PersistenceTime > 0 {
+ timeout = 2 * result.PersistenceTime
+ }
+ if time.Since(start) >= timeout-sleepDelay {
+ return ErrTimeout
+ }
+ }
+}
+
+var _STRING_MCRESPONSE_POOL = gomemcached.NewStringMCResponsePool(16)
+
+type stringPool struct {
+ pool *sync.Pool
+ size int
+}
+
+func newStringPool(size int) *stringPool {
+ rv := &stringPool{
+ pool: &sync.Pool{
+ New: func() interface{} {
+ return make([]string, 0, size)
+ },
+ },
+ size: size,
+ }
+
+ return rv
+}
+
+func (this *stringPool) Get() []string {
+ return this.pool.Get().([]string)
+}
+
+func (this *stringPool) Put(s []string) {
+ if s == nil || cap(s) < this.size || cap(s) > 2*this.size {
+ return
+ }
+
+ this.pool.Put(s[0:0])
+}
+
+var _STRING_POOL = newStringPool(16)
+
+type vbStringPool struct {
+ pool *sync.Pool
+ strPool *stringPool
+}
+
+func newVBStringPool(size int, sp *stringPool) *vbStringPool {
+ rv := &vbStringPool{
+ pool: &sync.Pool{
+ New: func() interface{} {
+ return make(map[uint16][]string, size)
+ },
+ },
+ strPool: sp,
+ }
+
+ return rv
+}
+
+func (this *vbStringPool) Get() map[uint16][]string {
+ return this.pool.Get().(map[uint16][]string)
+}
+
+func (this *vbStringPool) Put(s map[uint16][]string) {
+ if s == nil {
+ return
+ }
+
+ for k, v := range s {
+ delete(s, k)
+ this.strPool.Put(v)
+ }
+
+ this.pool.Put(s)
+}
+
+var _VB_STRING_POOL = newVBStringPool(16, _STRING_POOL)
diff --git a/vendor/github.com/couchbase/go-couchbase/conn_pool.go b/vendor/github.com/couchbase/go-couchbase/conn_pool.go
new file mode 100644
index 0000000000..47854c09f1
--- /dev/null
+++ b/vendor/github.com/couchbase/go-couchbase/conn_pool.go
@@ -0,0 +1,421 @@
+package couchbase
+
+import (
+ "crypto/tls"
+ "errors"
+ "sync/atomic"
+ "time"
+
+ "github.com/couchbase/gomemcached"
+ "github.com/couchbase/gomemcached/client"
+ "github.com/couchbase/goutils/logging"
+)
+
+// GenericMcdAuthHandler is a kind of AuthHandler that performs
+// special auth exchange (like non-standard auth, possibly followed by
+// select-bucket).
+type GenericMcdAuthHandler interface {
+ AuthHandler
+ AuthenticateMemcachedConn(host string, conn *memcached.Client) error
+}
+
+// Error raised when a connection can't be retrieved from a pool.
+var TimeoutError = errors.New("timeout waiting to build connection")
+var errClosedPool = errors.New("the connection pool is closed")
+var errNoPool = errors.New("no connection pool")
+
+// Default timeout for retrieving a connection from the pool.
+var ConnPoolTimeout = time.Hour * 24 * 30
+
+// overflow connection closer cycle time
+var ConnCloserInterval = time.Second * 30
+
+// ConnPoolAvailWaitTime is the amount of time to wait for an existing
+// connection from the pool before considering the creation of a new
+// one.
+var ConnPoolAvailWaitTime = time.Millisecond
+
+type connectionPool struct {
+ host string
+ mkConn func(host string, ah AuthHandler, tlsConfig *tls.Config, bucketName string) (*memcached.Client, error)
+ auth AuthHandler
+ connections chan *memcached.Client
+ createsem chan bool
+ bailOut chan bool
+ poolSize int
+ connCount uint64
+ inUse bool
+ encrypted bool
+ tlsConfig *tls.Config
+ bucket string
+}
+
+func newConnectionPool(host string, ah AuthHandler, closer bool, poolSize, poolOverflow int, tlsConfig *tls.Config, bucket string, encrypted bool) *connectionPool {
+ connSize := poolSize
+ if closer {
+ connSize += poolOverflow
+ }
+ rv := &connectionPool{
+ host: host,
+ connections: make(chan *memcached.Client, connSize),
+ createsem: make(chan bool, poolSize+poolOverflow),
+ mkConn: defaultMkConn,
+ auth: ah,
+ poolSize: poolSize,
+ bucket: bucket,
+ encrypted: encrypted,
+ }
+
+ if encrypted {
+ rv.tlsConfig = tlsConfig
+ }
+
+ if closer {
+ rv.bailOut = make(chan bool, 1)
+ go rv.connCloser()
+ }
+ return rv
+}
+
+// ConnPoolTimeout is notified whenever connections are acquired from a pool.
+var ConnPoolCallback func(host string, source string, start time.Time, err error)
+
+// Use regular in-the-clear connection if tlsConfig is nil.
+// Use secure connection (TLS) if tlsConfig is set.
+func defaultMkConn(host string, ah AuthHandler, tlsConfig *tls.Config, bucketName string) (*memcached.Client, error) {
+ var features memcached.Features
+
+ var conn *memcached.Client
+ var err error
+ if tlsConfig == nil {
+ conn, err = memcached.Connect("tcp", host)
+ } else {
+ conn, err = memcached.ConnectTLS("tcp", host, tlsConfig)
+ }
+
+ if err != nil {
+ return nil, err
+ }
+
+ if DefaultTimeout > 0 {
+ conn.SetDeadline(getDeadline(noDeadline, DefaultTimeout))
+ }
+
+ if TCPKeepalive == true {
+ conn.SetKeepAliveOptions(time.Duration(TCPKeepaliveInterval) * time.Second)
+ }
+
+ if EnableMutationToken == true {
+ features = append(features, memcached.FeatureMutationToken)
+ }
+ if EnableDataType == true {
+ features = append(features, memcached.FeatureDataType)
+ }
+
+ if EnableXattr == true {
+ features = append(features, memcached.FeatureXattr)
+ }
+
+ if EnableCollections {
+ features = append(features, memcached.FeatureCollections)
+ }
+
+ if len(features) > 0 {
+ res, err := conn.EnableFeatures(features)
+ if err != nil && isTimeoutError(err) {
+ conn.Close()
+ return nil, err
+ }
+
+ if err != nil || res.Status != gomemcached.SUCCESS {
+ logging.Warnf("Unable to enable features %v", err)
+ }
+ }
+
+ if gah, ok := ah.(GenericMcdAuthHandler); ok {
+ err = gah.AuthenticateMemcachedConn(host, conn)
+ if err != nil {
+ conn.Close()
+ return nil, err
+ }
+
+ if DefaultTimeout > 0 {
+ conn.SetDeadline(noDeadline)
+ }
+
+ return conn, nil
+ }
+ name, pass, bucket := ah.GetCredentials()
+ if bucket == "" {
+ // Authenticator does not know specific bucket.
+ bucket = bucketName
+ }
+
+ if name != "default" {
+ _, err = conn.Auth(name, pass)
+ if err != nil {
+ conn.Close()
+ return nil, err
+ }
+ // Select bucket (Required for cb_auth creds)
+ // Required when doing auth with _admin credentials
+ if bucket != "" && bucket != name {
+ _, err = conn.SelectBucket(bucket)
+ if err != nil {
+ conn.Close()
+ return nil, err
+ }
+ }
+ }
+
+ if DefaultTimeout > 0 {
+ conn.SetDeadline(noDeadline)
+ }
+
+ return conn, nil
+}
+
+func (cp *connectionPool) Close() (err error) {
+ defer func() {
+ if recover() != nil {
+ err = errors.New("connectionPool.Close error")
+ }
+ }()
+ if cp.bailOut != nil {
+
+ // defensively, we won't wait if the channel is full
+ select {
+ case cp.bailOut <- false:
+ default:
+ }
+ }
+ close(cp.connections)
+ for c := range cp.connections {
+ c.Close()
+ }
+ return
+}
+
+func (cp *connectionPool) Node() string {
+ return cp.host
+}
+
+func (cp *connectionPool) GetWithTimeout(d time.Duration) (rv *memcached.Client, err error) {
+ if cp == nil {
+ return nil, errNoPool
+ }
+
+ path := ""
+
+ if ConnPoolCallback != nil {
+ defer func(path *string, start time.Time) {
+ ConnPoolCallback(cp.host, *path, start, err)
+ }(&path, time.Now())
+ }
+
+ path = "short-circuit"
+
+ // short-circuit available connetions.
+ select {
+ case rv, isopen := <-cp.connections:
+ if !isopen {
+ return nil, errClosedPool
+ }
+ atomic.AddUint64(&cp.connCount, 1)
+ return rv, nil
+ default:
+ }
+
+ t := time.NewTimer(ConnPoolAvailWaitTime)
+ defer t.Stop()
+
+ // Try to grab an available connection within 1ms
+ select {
+ case rv, isopen := <-cp.connections:
+ path = "avail1"
+ if !isopen {
+ return nil, errClosedPool
+ }
+ atomic.AddUint64(&cp.connCount, 1)
+ return rv, nil
+ case <-t.C:
+ // No connection came around in time, let's see
+ // whether we can get one or build a new one first.
+ t.Reset(d) // Reuse the timer for the full timeout.
+ select {
+ case rv, isopen := <-cp.connections:
+ path = "avail2"
+ if !isopen {
+ return nil, errClosedPool
+ }
+ atomic.AddUint64(&cp.connCount, 1)
+ return rv, nil
+ case cp.createsem <- true:
+ path = "create"
+ // Build a connection if we can't get a real one.
+ // This can potentially be an overflow connection, or
+ // a pooled connection.
+ rv, err := cp.mkConn(cp.host, cp.auth, cp.tlsConfig, cp.bucket)
+ if err != nil {
+ // On error, release our create hold
+ <-cp.createsem
+ } else {
+ atomic.AddUint64(&cp.connCount, 1)
+ }
+ return rv, err
+ case <-t.C:
+ return nil, ErrTimeout
+ }
+ }
+}
+
+func (cp *connectionPool) Get() (*memcached.Client, error) {
+ return cp.GetWithTimeout(ConnPoolTimeout)
+}
+
+func (cp *connectionPool) Return(c *memcached.Client) {
+ if c == nil {
+ return
+ }
+
+ if cp == nil {
+ c.Close()
+ }
+
+ if c.IsHealthy() {
+ defer func() {
+ if recover() != nil {
+ // This happens when the pool has already been
+ // closed and we're trying to return a
+ // connection to it anyway. Just close the
+ // connection.
+ c.Close()
+ }
+ }()
+
+ select {
+ case cp.connections <- c:
+ default:
+ <-cp.createsem
+ c.Close()
+ }
+ } else {
+ <-cp.createsem
+ c.Close()
+ }
+}
+
+// give the ability to discard a connection from a pool
+// useful for ditching connections to the wrong node after a rebalance
+func (cp *connectionPool) Discard(c *memcached.Client) {
+ <-cp.createsem
+ c.Close()
+}
+
+// asynchronous connection closer
+func (cp *connectionPool) connCloser() {
+ var connCount uint64
+
+ t := time.NewTimer(ConnCloserInterval)
+ defer t.Stop()
+
+ for {
+ connCount = cp.connCount
+
+ // we don't exist anymore! bail out!
+ select {
+ case <-cp.bailOut:
+ return
+ case <-t.C:
+ }
+ t.Reset(ConnCloserInterval)
+
+ // no overflow connections open or sustained requests for connections
+ // nothing to do until the next cycle
+ if len(cp.connections) <= cp.poolSize ||
+ ConnCloserInterval/ConnPoolAvailWaitTime < time.Duration(cp.connCount-connCount) {
+ continue
+ }
+
+ // close overflow connections now that they are not needed
+ for c := range cp.connections {
+ select {
+ case <-cp.bailOut:
+ return
+ default:
+ }
+
+ // bail out if close did not work out
+ if !cp.connCleanup(c) {
+ return
+ }
+ if len(cp.connections) <= cp.poolSize {
+ break
+ }
+ }
+ }
+}
+
+// close connection with recovery on error
+func (cp *connectionPool) connCleanup(c *memcached.Client) (rv bool) {
+
+ // just in case we are closing a connection after
+ // bailOut has been sent but we haven't yet read it
+ defer func() {
+ if recover() != nil {
+ rv = false
+ }
+ }()
+ rv = true
+
+ c.Close()
+ <-cp.createsem
+ return
+}
+
+func (cp *connectionPool) StartTapFeed(args *memcached.TapArguments) (*memcached.TapFeed, error) {
+ if cp == nil {
+ return nil, errNoPool
+ }
+ mc, err := cp.Get()
+ if err != nil {
+ return nil, err
+ }
+
+ // A connection can't be used after TAP; Dont' count it against the
+ // connection pool capacity
+ <-cp.createsem
+
+ return mc.StartTapFeed(*args)
+}
+
+const DEFAULT_WINDOW_SIZE = 20 * 1024 * 1024 // 20 Mb
+
+func (cp *connectionPool) StartUprFeed(name string, sequence uint32, dcp_buffer_size uint32, data_chan_size int) (*memcached.UprFeed, error) {
+ if cp == nil {
+ return nil, errNoPool
+ }
+ mc, err := cp.Get()
+ if err != nil {
+ return nil, err
+ }
+
+ // A connection can't be used after it has been allocated to UPR;
+ // Dont' count it against the connection pool capacity
+ <-cp.createsem
+
+ uf, err := mc.NewUprFeed()
+ if err != nil {
+ return nil, err
+ }
+
+ if err := uf.UprOpen(name, sequence, dcp_buffer_size); err != nil {
+ return nil, err
+ }
+
+ if err := uf.StartFeedWithConfig(data_chan_size); err != nil {
+ return nil, err
+ }
+
+ return uf, nil
+}
diff --git a/vendor/github.com/couchbase/go-couchbase/ddocs.go b/vendor/github.com/couchbase/go-couchbase/ddocs.go
new file mode 100644
index 0000000000..f9cc343aa8
--- /dev/null
+++ b/vendor/github.com/couchbase/go-couchbase/ddocs.go
@@ -0,0 +1,288 @@
+package couchbase
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "github.com/couchbase/goutils/logging"
+ "io/ioutil"
+ "net/http"
+)
+
+// ViewDefinition represents a single view within a design document.
+type ViewDefinition struct {
+ Map string `json:"map"`
+ Reduce string `json:"reduce,omitempty"`
+}
+
+// DDoc is the document body of a design document specifying a view.
+type DDoc struct {
+ Language string `json:"language,omitempty"`
+ Views map[string]ViewDefinition `json:"views"`
+}
+
+// DDocsResult represents the result from listing the design
+// documents.
+type DDocsResult struct {
+ Rows []struct {
+ DDoc struct {
+ Meta map[string]interface{}
+ JSON DDoc
+ } `json:"doc"`
+ } `json:"rows"`
+}
+
+// GetDDocs lists all design documents
+func (b *Bucket) GetDDocs() (DDocsResult, error) {
+ var ddocsResult DDocsResult
+ b.RLock()
+ pool := b.pool
+ uri := b.DDocs.URI
+ b.RUnlock()
+
+ // MB-23555 ephemeral buckets have no ddocs
+ if uri == "" {
+ return DDocsResult{}, nil
+ }
+
+ err := pool.client.parseURLResponse(uri, &ddocsResult)
+ if err != nil {
+ return DDocsResult{}, err
+ }
+ return ddocsResult, nil
+}
+
+func (b *Bucket) GetDDocWithRetry(docname string, into interface{}) error {
+ ddocURI := fmt.Sprintf("/%s/_design/%s", b.GetName(), docname)
+ err := b.parseAPIResponse(ddocURI, &into)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (b *Bucket) GetDDocsWithRetry() (DDocsResult, error) {
+ var ddocsResult DDocsResult
+ b.RLock()
+ uri := b.DDocs.URI
+ b.RUnlock()
+
+ // MB-23555 ephemeral buckets have no ddocs
+ if uri == "" {
+ return DDocsResult{}, nil
+ }
+
+ err := b.parseURLResponse(uri, &ddocsResult)
+ if err != nil {
+ return DDocsResult{}, err
+ }
+ return ddocsResult, nil
+}
+
+func (b *Bucket) ddocURL(docname string) (string, error) {
+ u, err := b.randomBaseURL()
+ if err != nil {
+ return "", err
+ }
+ u.Path = fmt.Sprintf("/%s/_design/%s", b.GetName(), docname)
+ return u.String(), nil
+}
+
+func (b *Bucket) ddocURLNext(nodeId int, docname string) (string, int, error) {
+ u, selected, err := b.randomNextURL(nodeId)
+ if err != nil {
+ return "", -1, err
+ }
+ u.Path = fmt.Sprintf("/%s/_design/%s", b.GetName(), docname)
+ return u.String(), selected, nil
+}
+
+const ABS_MAX_RETRIES = 10
+const ABS_MIN_RETRIES = 3
+
+func (b *Bucket) getMaxRetries() (int, error) {
+
+ maxRetries := len(b.Nodes())
+
+ if maxRetries == 0 {
+ return 0, fmt.Errorf("No available Couch rest URLs")
+ }
+
+ if maxRetries > ABS_MAX_RETRIES {
+ maxRetries = ABS_MAX_RETRIES
+ } else if maxRetries < ABS_MIN_RETRIES {
+ maxRetries = ABS_MIN_RETRIES
+ }
+
+ return maxRetries, nil
+}
+
+// PutDDoc installs a design document.
+func (b *Bucket) PutDDoc(docname string, value interface{}) error {
+
+ var Err error
+
+ maxRetries, err := b.getMaxRetries()
+ if err != nil {
+ return err
+ }
+
+ lastNode := START_NODE_ID
+
+ for retryCount := 0; retryCount < maxRetries; retryCount++ {
+
+ Err = nil
+
+ ddocU, selectedNode, err := b.ddocURLNext(lastNode, docname)
+ if err != nil {
+ return err
+ }
+
+ lastNode = selectedNode
+
+ logging.Infof(" Trying with selected node %d", selectedNode)
+ j, err := json.Marshal(value)
+ if err != nil {
+ return err
+ }
+
+ req, err := http.NewRequest("PUT", ddocU, bytes.NewReader(j))
+ if err != nil {
+ return err
+ }
+ req.Header.Set("Content-Type", "application/json")
+ err = maybeAddAuth(req, b.authHandler(false /* bucket not yet locked */))
+ if err != nil {
+ return err
+ }
+
+ res, err := doHTTPRequest(req)
+ if err != nil {
+ return err
+ }
+
+ if res.StatusCode != 201 {
+ body, _ := ioutil.ReadAll(res.Body)
+ Err = fmt.Errorf("error installing view: %v / %s",
+ res.Status, body)
+ logging.Errorf(" Error in PutDDOC %v. Retrying...", Err)
+ res.Body.Close()
+ b.Refresh()
+ continue
+ }
+
+ res.Body.Close()
+ break
+ }
+
+ return Err
+}
+
+// GetDDoc retrieves a specific a design doc.
+func (b *Bucket) GetDDoc(docname string, into interface{}) error {
+ var Err error
+ var res *http.Response
+
+ maxRetries, err := b.getMaxRetries()
+ if err != nil {
+ return err
+ }
+
+ lastNode := START_NODE_ID
+ for retryCount := 0; retryCount < maxRetries; retryCount++ {
+
+ Err = nil
+ ddocU, selectedNode, err := b.ddocURLNext(lastNode, docname)
+ if err != nil {
+ return err
+ }
+
+ lastNode = selectedNode
+ logging.Infof(" Trying with selected node %d", selectedNode)
+
+ req, err := http.NewRequest("GET", ddocU, nil)
+ if err != nil {
+ return err
+ }
+ req.Header.Set("Content-Type", "application/json")
+ err = maybeAddAuth(req, b.authHandler(false /* bucket not yet locked */))
+ if err != nil {
+ return err
+ }
+
+ res, err = doHTTPRequest(req)
+ if err != nil {
+ return err
+ }
+ if res.StatusCode != 200 {
+ body, _ := ioutil.ReadAll(res.Body)
+ Err = fmt.Errorf("error reading view: %v / %s",
+ res.Status, body)
+ logging.Errorf(" Error in GetDDOC %v Retrying...", Err)
+ b.Refresh()
+ res.Body.Close()
+ continue
+ }
+ defer res.Body.Close()
+ break
+ }
+
+ if Err != nil {
+ return Err
+ }
+
+ d := json.NewDecoder(res.Body)
+ return d.Decode(into)
+}
+
+// DeleteDDoc removes a design document.
+func (b *Bucket) DeleteDDoc(docname string) error {
+
+ var Err error
+
+ maxRetries, err := b.getMaxRetries()
+ if err != nil {
+ return err
+ }
+
+ lastNode := START_NODE_ID
+
+ for retryCount := 0; retryCount < maxRetries; retryCount++ {
+
+ Err = nil
+ ddocU, selectedNode, err := b.ddocURLNext(lastNode, docname)
+ if err != nil {
+ return err
+ }
+
+ lastNode = selectedNode
+ logging.Infof(" Trying with selected node %d", selectedNode)
+
+ req, err := http.NewRequest("DELETE", ddocU, nil)
+ if err != nil {
+ return err
+ }
+ req.Header.Set("Content-Type", "application/json")
+ err = maybeAddAuth(req, b.authHandler(false /* bucket not already locked */))
+ if err != nil {
+ return err
+ }
+
+ res, err := doHTTPRequest(req)
+ if err != nil {
+ return err
+ }
+ if res.StatusCode != 200 {
+ body, _ := ioutil.ReadAll(res.Body)
+ Err = fmt.Errorf("error deleting view : %v / %s", res.Status, body)
+ logging.Errorf(" Error in DeleteDDOC %v. Retrying ... ", Err)
+ b.Refresh()
+ res.Body.Close()
+ continue
+ }
+
+ res.Body.Close()
+ break
+ }
+ return Err
+}
diff --git a/vendor/github.com/couchbase/go-couchbase/go.mod b/vendor/github.com/couchbase/go-couchbase/go.mod
new file mode 100644
index 0000000000..4d4ed0a714
--- /dev/null
+++ b/vendor/github.com/couchbase/go-couchbase/go.mod
@@ -0,0 +1,3 @@
+module github.com/couchbase/go-couchbase
+
+go 1.13
diff --git a/vendor/github.com/couchbase/go-couchbase/observe.go b/vendor/github.com/couchbase/go-couchbase/observe.go
new file mode 100644
index 0000000000..6e746f5a16
--- /dev/null
+++ b/vendor/github.com/couchbase/go-couchbase/observe.go
@@ -0,0 +1,300 @@
+package couchbase
+
+import (
+ "fmt"
+ "github.com/couchbase/goutils/logging"
+ "sync"
+)
+
+type PersistTo uint8
+
+const (
+ PersistNone = PersistTo(0x00)
+ PersistMaster = PersistTo(0x01)
+ PersistOne = PersistTo(0x02)
+ PersistTwo = PersistTo(0x03)
+ PersistThree = PersistTo(0x04)
+ PersistFour = PersistTo(0x05)
+)
+
+type ObserveTo uint8
+
+const (
+ ObserveNone = ObserveTo(0x00)
+ ObserveReplicateOne = ObserveTo(0x01)
+ ObserveReplicateTwo = ObserveTo(0x02)
+ ObserveReplicateThree = ObserveTo(0x03)
+ ObserveReplicateFour = ObserveTo(0x04)
+)
+
+type JobType uint8
+
+const (
+ OBSERVE = JobType(0x00)
+ PERSIST = JobType(0x01)
+)
+
+type ObservePersistJob struct {
+ vb uint16
+ vbuuid uint64
+ hostname string
+ jobType JobType
+ failover uint8
+ lastPersistedSeqNo uint64
+ currentSeqNo uint64
+ resultChan chan *ObservePersistJob
+ errorChan chan *OPErrResponse
+}
+
+type OPErrResponse struct {
+ vb uint16
+ vbuuid uint64
+ err error
+ job *ObservePersistJob
+}
+
+var ObservePersistPool = NewPool(1024)
+var OPJobChan = make(chan *ObservePersistJob, 1024)
+var OPJobDone = make(chan bool)
+
+var wg sync.WaitGroup
+
+func (b *Bucket) StartOPPollers(maxWorkers int) {
+
+ for i := 0; i < maxWorkers; i++ {
+ go b.OPJobPoll()
+ wg.Add(1)
+ }
+ wg.Wait()
+}
+
+func (b *Bucket) SetObserveAndPersist(nPersist PersistTo, nObserve ObserveTo) (err error) {
+
+ numNodes := len(b.Nodes())
+ if int(nPersist) > numNodes || int(nObserve) > numNodes {
+ return fmt.Errorf("Not enough healthy nodes in the cluster")
+ }
+
+ if int(nPersist) > (b.Replicas+1) || int(nObserve) > b.Replicas {
+ return fmt.Errorf("Not enough replicas in the cluster")
+ }
+
+ if EnableMutationToken == false {
+ return fmt.Errorf("Mutation Tokens not enabled ")
+ }
+
+ b.ds = &DurablitySettings{Persist: PersistTo(nPersist), Observe: ObserveTo(nObserve)}
+ return
+}
+
+func (b *Bucket) ObserveAndPersistPoll(vb uint16, vbuuid uint64, seqNo uint64) (err error, failover bool) {
+ b.RLock()
+ ds := b.ds
+ b.RUnlock()
+
+ if ds == nil {
+ return
+ }
+
+ nj := 0 // total number of jobs
+ resultChan := make(chan *ObservePersistJob, 10)
+ errChan := make(chan *OPErrResponse, 10)
+
+ nodes := b.GetNodeList(vb)
+ if int(ds.Observe) > len(nodes) || int(ds.Persist) > len(nodes) {
+ return fmt.Errorf("Not enough healthy nodes in the cluster"), false
+ }
+
+ logging.Infof("Node list %v", nodes)
+
+ if ds.Observe >= ObserveReplicateOne {
+ // create a job for each host
+ for i := ObserveReplicateOne; i < ds.Observe+1; i++ {
+ opJob := ObservePersistPool.Get()
+ opJob.vb = vb
+ opJob.vbuuid = vbuuid
+ opJob.jobType = OBSERVE
+ opJob.hostname = nodes[i]
+ opJob.resultChan = resultChan
+ opJob.errorChan = errChan
+
+ OPJobChan <- opJob
+ nj++
+
+ }
+ }
+
+ if ds.Persist >= PersistMaster {
+ for i := PersistMaster; i < ds.Persist+1; i++ {
+ opJob := ObservePersistPool.Get()
+ opJob.vb = vb
+ opJob.vbuuid = vbuuid
+ opJob.jobType = PERSIST
+ opJob.hostname = nodes[i]
+ opJob.resultChan = resultChan
+ opJob.errorChan = errChan
+
+ OPJobChan <- opJob
+ nj++
+
+ }
+ }
+
+ ok := true
+ for ok {
+ select {
+ case res := <-resultChan:
+ jobDone := false
+ if res.failover == 0 {
+ // no failover
+ if res.jobType == PERSIST {
+ if res.lastPersistedSeqNo >= seqNo {
+ jobDone = true
+ }
+
+ } else {
+ if res.currentSeqNo >= seqNo {
+ jobDone = true
+ }
+ }
+
+ if jobDone == true {
+ nj--
+ ObservePersistPool.Put(res)
+ } else {
+ // requeue this job
+ OPJobChan <- res
+ }
+
+ } else {
+ // Not currently handling failover scenarios TODO
+ nj--
+ ObservePersistPool.Put(res)
+ failover = true
+ }
+
+ if nj == 0 {
+ // done with all the jobs
+ ok = false
+ close(resultChan)
+ close(errChan)
+ }
+
+ case Err := <-errChan:
+ logging.Errorf("Error in Observe/Persist %v", Err.err)
+ err = fmt.Errorf("Error in Observe/Persist job %v", Err.err)
+ nj--
+ ObservePersistPool.Put(Err.job)
+ if nj == 0 {
+ close(resultChan)
+ close(errChan)
+ ok = false
+ }
+ }
+ }
+
+ return
+}
+
+func (b *Bucket) OPJobPoll() {
+
+ ok := true
+ for ok == true {
+ select {
+ case job := <-OPJobChan:
+ pool := b.getConnPoolByHost(job.hostname, false /* bucket not already locked */)
+ if pool == nil {
+ errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid}
+ errRes.err = fmt.Errorf("Pool not found for host %v", job.hostname)
+ errRes.job = job
+ job.errorChan <- errRes
+ continue
+ }
+ conn, err := pool.Get()
+ if err != nil {
+ errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid}
+ errRes.err = fmt.Errorf("Unable to get connection from pool %v", err)
+ errRes.job = job
+ job.errorChan <- errRes
+ continue
+ }
+
+ res, err := conn.ObserveSeq(job.vb, job.vbuuid)
+ if err != nil {
+ errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid}
+ errRes.err = fmt.Errorf("Command failed %v", err)
+ errRes.job = job
+ job.errorChan <- errRes
+ continue
+
+ }
+ pool.Return(conn)
+ job.lastPersistedSeqNo = res.LastPersistedSeqNo
+ job.currentSeqNo = res.CurrentSeqNo
+ job.failover = res.Failover
+
+ job.resultChan <- job
+ case <-OPJobDone:
+ logging.Infof("Observe Persist Poller exitting")
+ ok = false
+ }
+ }
+ wg.Done()
+}
+
+func (b *Bucket) GetNodeList(vb uint16) []string {
+
+ vbm := b.VBServerMap()
+ if len(vbm.VBucketMap) < int(vb) {
+ logging.Infof("vbmap smaller than vblist")
+ return nil
+ }
+
+ nodes := make([]string, len(vbm.VBucketMap[vb]))
+ for i := 0; i < len(vbm.VBucketMap[vb]); i++ {
+ n := vbm.VBucketMap[vb][i]
+ if n < 0 {
+ continue
+ }
+
+ node := b.getMasterNode(n)
+ if len(node) > 1 {
+ nodes[i] = node
+ }
+ continue
+
+ }
+ return nodes
+}
+
+//pool of ObservePersist Jobs
+type OPpool struct {
+ pool chan *ObservePersistJob
+}
+
+// NewPool creates a new pool of jobs
+func NewPool(max int) *OPpool {
+ return &OPpool{
+ pool: make(chan *ObservePersistJob, max),
+ }
+}
+
+// Borrow a Client from the pool.
+func (p *OPpool) Get() *ObservePersistJob {
+ var o *ObservePersistJob
+ select {
+ case o = <-p.pool:
+ default:
+ o = &ObservePersistJob{}
+ }
+ return o
+}
+
+// Return returns a Client to the pool.
+func (p *OPpool) Put(o *ObservePersistJob) {
+ select {
+ case p.pool <- o:
+ default:
+ // let it go, let it go...
+ }
+}
diff --git a/vendor/github.com/couchbase/go-couchbase/pools.go b/vendor/github.com/couchbase/go-couchbase/pools.go
new file mode 100644
index 0000000000..39db2ddbd9
--- /dev/null
+++ b/vendor/github.com/couchbase/go-couchbase/pools.go
@@ -0,0 +1,1746 @@
+package couchbase
+
+import (
+ "bufio"
+ "bytes"
+ "crypto/tls"
+ "crypto/x509"
+ "encoding/base64"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "math/rand"
+ "net/http"
+ "net/url"
+ "runtime"
+ "sort"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+ "unsafe"
+
+ "github.com/couchbase/goutils/logging"
+
+ "github.com/couchbase/gomemcached" // package name is 'gomemcached'
+ "github.com/couchbase/gomemcached/client" // package name is 'memcached'
+)
+
+// HTTPClient to use for REST and view operations.
+var MaxIdleConnsPerHost = 256
+var ClientTimeOut = 10 * time.Second
+var HTTPTransport = &http.Transport{MaxIdleConnsPerHost: MaxIdleConnsPerHost}
+var HTTPClient = &http.Client{Transport: HTTPTransport, Timeout: ClientTimeOut}
+
+// Use this client for reading from streams that should be open for an extended duration.
+var HTTPClientForStreaming = &http.Client{Transport: HTTPTransport, Timeout: 0}
+
+// PoolSize is the size of each connection pool (per host).
+var PoolSize = 64
+
+// PoolOverflow is the number of overflow connections allowed in a
+// pool.
+var PoolOverflow = 16
+
+// AsynchronousCloser turns on asynchronous closing for overflow connections
+var AsynchronousCloser = false
+
+// TCP KeepAlive enabled/disabled
+var TCPKeepalive = false
+
+// Enable MutationToken
+var EnableMutationToken = false
+
+// Enable Data Type response
+var EnableDataType = false
+
+// Enable Xattr
+var EnableXattr = false
+
+// Enable Collections
+var EnableCollections = false
+
+// TCP keepalive interval in seconds. Default 30 minutes
+var TCPKeepaliveInterval = 30 * 60
+
+// Used to decide whether to skip verification of certificates when
+// connecting to an ssl port.
+var skipVerify = true
+var certFile = ""
+var keyFile = ""
+var rootFile = ""
+
+func SetSkipVerify(skip bool) {
+ skipVerify = skip
+}
+
+func SetCertFile(cert string) {
+ certFile = cert
+}
+
+func SetKeyFile(cert string) {
+ keyFile = cert
+}
+
+func SetRootFile(cert string) {
+ rootFile = cert
+}
+
+// Allow applications to speciify the Poolsize and Overflow
+func SetConnectionPoolParams(size, overflow int) {
+
+ if size > 0 {
+ PoolSize = size
+ }
+
+ if overflow > 0 {
+ PoolOverflow = overflow
+ }
+}
+
+// Turn off overflow connections
+func DisableOverflowConnections() {
+ PoolOverflow = 0
+}
+
+// Toggle asynchronous overflow closer
+func EnableAsynchronousCloser(closer bool) {
+ AsynchronousCloser = closer
+}
+
+// Allow TCP keepalive parameters to be set by the application
+func SetTcpKeepalive(enabled bool, interval int) {
+
+ TCPKeepalive = enabled
+
+ if interval > 0 {
+ TCPKeepaliveInterval = interval
+ }
+}
+
+// AuthHandler is a callback that gets the auth username and password
+// for the given bucket.
+type AuthHandler interface {
+ GetCredentials() (string, string, string)
+}
+
+// AuthHandler is a callback that gets the auth username and password
+// for the given bucket and sasl for memcached.
+type AuthWithSaslHandler interface {
+ AuthHandler
+ GetSaslCredentials() (string, string)
+}
+
+// MultiBucketAuthHandler is kind of AuthHandler that may perform
+// different auth for different buckets.
+type MultiBucketAuthHandler interface {
+ AuthHandler
+ ForBucket(bucket string) AuthHandler
+}
+
+// HTTPAuthHandler is kind of AuthHandler that performs more general
+// for outgoing http requests than is possible via simple
+// GetCredentials() call (i.e. digest auth or different auth per
+// different destinations).
+type HTTPAuthHandler interface {
+ AuthHandler
+ SetCredsForRequest(req *http.Request) error
+}
+
+// RestPool represents a single pool returned from the pools REST API.
+type RestPool struct {
+ Name string `json:"name"`
+ StreamingURI string `json:"streamingUri"`
+ URI string `json:"uri"`
+}
+
+// Pools represents the collection of pools as returned from the REST API.
+type Pools struct {
+ ComponentsVersion map[string]string `json:"componentsVersion,omitempty"`
+ ImplementationVersion string `json:"implementationVersion"`
+ IsAdmin bool `json:"isAdminCreds"`
+ UUID string `json:"uuid"`
+ Pools []RestPool `json:"pools"`
+}
+
+// A Node is a computer in a cluster running the couchbase software.
+type Node struct {
+ ClusterCompatibility int `json:"clusterCompatibility"`
+ ClusterMembership string `json:"clusterMembership"`
+ CouchAPIBase string `json:"couchApiBase"`
+ Hostname string `json:"hostname"`
+ AlternateNames map[string]NodeAlternateNames `json:"alternateAddresses"`
+ InterestingStats map[string]float64 `json:"interestingStats,omitempty"`
+ MCDMemoryAllocated float64 `json:"mcdMemoryAllocated"`
+ MCDMemoryReserved float64 `json:"mcdMemoryReserved"`
+ MemoryFree float64 `json:"memoryFree"`
+ MemoryTotal float64 `json:"memoryTotal"`
+ OS string `json:"os"`
+ Ports map[string]int `json:"ports"`
+ Services []string `json:"services"`
+ Status string `json:"status"`
+ Uptime int `json:"uptime,string"`
+ Version string `json:"version"`
+ ThisNode bool `json:"thisNode,omitempty"`
+}
+
+// A Pool of nodes and buckets.
+type Pool struct {
+ BucketMap map[string]*Bucket
+ Nodes []Node
+
+ BucketURL map[string]string `json:"buckets"`
+
+ MemoryQuota float64 `json:"memoryQuota"`
+ CbasMemoryQuota float64 `json:"cbasMemoryQuota"`
+ EventingMemoryQuota float64 `json:"eventingMemoryQuota"`
+ FtsMemoryQuota float64 `json:"ftsMemoryQuota"`
+ IndexMemoryQuota float64 `json:"indexMemoryQuota"`
+
+ client *Client
+}
+
+// VBucketServerMap is the a mapping of vbuckets to nodes.
+type VBucketServerMap struct {
+ HashAlgorithm string `json:"hashAlgorithm"`
+ NumReplicas int `json:"numReplicas"`
+ ServerList []string `json:"serverList"`
+ VBucketMap [][]int `json:"vBucketMap"`
+}
+
+type DurablitySettings struct {
+ Persist PersistTo
+ Observe ObserveTo
+}
+
+// Bucket is the primary entry point for most data operations.
+// Bucket is a locked data structure. All access to its fields should be done using read or write locking,
+// as appropriate.
+//
+// Some access methods require locking, but rely on the caller to do so. These are appropriate
+// for calls from methods that have already locked the structure. Methods like this
+// take a boolean parameter "bucketLocked".
+type Bucket struct {
+ sync.RWMutex
+ AuthType string `json:"authType"`
+ Capabilities []string `json:"bucketCapabilities"`
+ CapabilitiesVersion string `json:"bucketCapabilitiesVer"`
+ CollectionsManifestUid string `json:"collectionsManifestUid"`
+ Type string `json:"bucketType"`
+ Name string `json:"name"`
+ NodeLocator string `json:"nodeLocator"`
+ Quota map[string]float64 `json:"quota,omitempty"`
+ Replicas int `json:"replicaNumber"`
+ Password string `json:"saslPassword"`
+ URI string `json:"uri"`
+ StreamingURI string `json:"streamingUri"`
+ LocalRandomKeyURI string `json:"localRandomKeyUri,omitempty"`
+ UUID string `json:"uuid"`
+ ConflictResolutionType string `json:"conflictResolutionType,omitempty"`
+ DDocs struct {
+ URI string `json:"uri"`
+ } `json:"ddocs,omitempty"`
+ BasicStats map[string]interface{} `json:"basicStats,omitempty"`
+ Controllers map[string]interface{} `json:"controllers,omitempty"`
+
+ // These are used for JSON IO, but isn't used for processing
+ // since it needs to be swapped out safely.
+ VBSMJson VBucketServerMap `json:"vBucketServerMap"`
+ NodesJSON []Node `json:"nodes"`
+
+ pool *Pool
+ connPools unsafe.Pointer // *[]*connectionPool
+ vBucketServerMap unsafe.Pointer // *VBucketServerMap
+ nodeList unsafe.Pointer // *[]Node
+ commonSufix string
+ ah AuthHandler // auth handler
+ ds *DurablitySettings // Durablity Settings for this bucket
+ closed bool
+}
+
+// PoolServices is all the bucket-independent services in a pool
+type PoolServices struct {
+ Rev int `json:"rev"`
+ NodesExt []NodeServices `json:"nodesExt"`
+ Capabilities json.RawMessage `json:"clusterCapabilities"`
+}
+
+// NodeServices is all the bucket-independent services running on
+// a node (given by Hostname)
+type NodeServices struct {
+ Services map[string]int `json:"services,omitempty"`
+ Hostname string `json:"hostname"`
+ ThisNode bool `json:"thisNode"`
+ AlternateNames map[string]NodeAlternateNames `json:"alternateAddresses"`
+}
+
+type NodeAlternateNames struct {
+ Hostname string `json:"hostname"`
+ Ports map[string]int `json:"ports"`
+}
+
+type BucketNotFoundError struct {
+ bucket string
+}
+
+func (e *BucketNotFoundError) Error() string {
+ return fmt.Sprint("No bucket named " + e.bucket)
+}
+
+type BucketAuth struct {
+ name string
+ saslPwd string
+ bucket string
+}
+
+func newBucketAuth(name string, pass string, bucket string) *BucketAuth {
+ return &BucketAuth{name: name, saslPwd: pass, bucket: bucket}
+}
+
+func (ba *BucketAuth) GetCredentials() (string, string, string) {
+ return ba.name, ba.saslPwd, ba.bucket
+}
+
+// VBServerMap returns the current VBucketServerMap.
+func (b *Bucket) VBServerMap() *VBucketServerMap {
+ b.RLock()
+ defer b.RUnlock()
+ ret := (*VBucketServerMap)(b.vBucketServerMap)
+ return ret
+}
+
+func (b *Bucket) GetVBmap(addrs []string) (map[string][]uint16, error) {
+ vbmap := b.VBServerMap()
+ servers := vbmap.ServerList
+ if addrs == nil {
+ addrs = vbmap.ServerList
+ }
+
+ m := make(map[string][]uint16)
+ for _, addr := range addrs {
+ m[addr] = make([]uint16, 0)
+ }
+ for vbno, idxs := range vbmap.VBucketMap {
+ if len(idxs) == 0 {
+ return nil, fmt.Errorf("vbmap: No KV node no for vb %d", vbno)
+ } else if idxs[0] < 0 || idxs[0] >= len(servers) {
+ return nil, fmt.Errorf("vbmap: Invalid KV node no %d for vb %d", idxs[0], vbno)
+ }
+ addr := servers[idxs[0]]
+ if _, ok := m[addr]; ok {
+ m[addr] = append(m[addr], uint16(vbno))
+ }
+ }
+ return m, nil
+}
+
+// true if node is not on the bucket VBmap
+func (b *Bucket) checkVBmap(node string) bool {
+ vbmap := b.VBServerMap()
+ servers := vbmap.ServerList
+
+ for _, idxs := range vbmap.VBucketMap {
+ if len(idxs) == 0 {
+ return true
+ } else if idxs[0] < 0 || idxs[0] >= len(servers) {
+ return true
+ }
+ if servers[idxs[0]] == node {
+ return false
+ }
+ }
+ return true
+}
+
+func (b *Bucket) GetName() string {
+ b.RLock()
+ defer b.RUnlock()
+ ret := b.Name
+ return ret
+}
+
+func (b *Bucket) GetUUID() string {
+ b.RLock()
+ defer b.RUnlock()
+ ret := b.UUID
+ return ret
+}
+
+// Nodes returns the current list of nodes servicing this bucket.
+func (b *Bucket) Nodes() []Node {
+ b.RLock()
+ defer b.RUnlock()
+ ret := *(*[]Node)(b.nodeList)
+ return ret
+}
+
+// return the list of healthy nodes
+func (b *Bucket) HealthyNodes() []Node {
+ nodes := []Node{}
+
+ for _, n := range b.Nodes() {
+ if n.Status == "healthy" && n.CouchAPIBase != "" {
+ nodes = append(nodes, n)
+ }
+ if n.Status != "healthy" { // log non-healthy node
+ logging.Infof("Non-healthy node; node details:")
+ logging.Infof("Hostname=%v, Status=%v, CouchAPIBase=%v, ThisNode=%v", n.Hostname, n.Status, n.CouchAPIBase, n.ThisNode)
+ }
+ }
+
+ return nodes
+}
+
+func (b *Bucket) getConnPools(bucketLocked bool) []*connectionPool {
+ if !bucketLocked {
+ b.RLock()
+ defer b.RUnlock()
+ }
+ if b.connPools != nil {
+ return *(*[]*connectionPool)(b.connPools)
+ } else {
+ return nil
+ }
+}
+
+func (b *Bucket) replaceConnPools(with []*connectionPool) {
+ b.Lock()
+ defer b.Unlock()
+
+ old := b.connPools
+ b.connPools = unsafe.Pointer(&with)
+ if old != nil {
+ for _, pool := range *(*[]*connectionPool)(old) {
+ if pool != nil {
+ pool.Close()
+ }
+ }
+ }
+ return
+}
+
+func (b *Bucket) getConnPool(i int) *connectionPool {
+
+ if i < 0 {
+ return nil
+ }
+
+ p := b.getConnPools(false /* not already locked */)
+ if len(p) > i {
+ return p[i]
+ }
+
+ return nil
+}
+
+func (b *Bucket) getConnPoolByHost(host string, bucketLocked bool) *connectionPool {
+ pools := b.getConnPools(bucketLocked)
+ for _, p := range pools {
+ if p != nil && p.host == host {
+ return p
+ }
+ }
+
+ return nil
+}
+
+// Given a vbucket number, returns a memcached connection to it.
+// The connection must be returned to its pool after use.
+func (b *Bucket) getConnectionToVBucket(vb uint32) (*memcached.Client, *connectionPool, error) {
+ for {
+ vbm := b.VBServerMap()
+ if len(vbm.VBucketMap) < int(vb) {
+ return nil, nil, fmt.Errorf("go-couchbase: vbmap smaller than vbucket list: %v vs. %v",
+ vb, vbm.VBucketMap)
+ }
+ masterId := vbm.VBucketMap[vb][0]
+ if masterId < 0 {
+ return nil, nil, fmt.Errorf("go-couchbase: No master for vbucket %d", vb)
+ }
+ pool := b.getConnPool(masterId)
+ conn, err := pool.Get()
+ if err != errClosedPool {
+ return conn, pool, err
+ }
+ // If conn pool was closed, because another goroutine refreshed the vbucket map, retry...
+ }
+}
+
+// To get random documents, we need to cover all the nodes, so select
+// a connection at random.
+
+func (b *Bucket) getRandomConnection() (*memcached.Client, *connectionPool, error) {
+ for {
+ var currentPool = 0
+ pools := b.getConnPools(false /* not already locked */)
+ if len(pools) == 0 {
+ return nil, nil, fmt.Errorf("No connection pool found")
+ } else if len(pools) > 1 { // choose a random connection
+ currentPool = rand.Intn(len(pools))
+ } // if only one pool, currentPool defaults to 0, i.e., the only pool
+
+ // get the pool
+ pool := pools[currentPool]
+ conn, err := pool.Get()
+ if err != errClosedPool {
+ return conn, pool, err
+ }
+
+ // If conn pool was closed, because another goroutine refreshed the vbucket map, retry...
+ }
+}
+
+//
+// Get a random document from a bucket. Since the bucket may be distributed
+// across nodes, we must first select a random connection, and then use the
+// Client.GetRandomDoc() call to get a random document from that node.
+//
+
+func (b *Bucket) GetRandomDoc(context ...*memcached.ClientContext) (*gomemcached.MCResponse, error) {
+ // get a connection from the pool
+ conn, pool, err := b.getRandomConnection()
+
+ if err != nil {
+ return nil, err
+ }
+ conn.SetDeadline(getDeadline(time.Time{}, DefaultTimeout))
+
+ // We may need to select the bucket before GetRandomDoc()
+ // will work. This is sometimes done at startup (see defaultMkConn())
+ // but not always, depending on the auth type.
+ _, err = conn.SelectBucket(b.Name)
+ if err != nil {
+ return nil, err
+ }
+
+ // get a randomm document from the connection
+ doc, err := conn.GetRandomDoc(context...)
+ // need to return the connection to the pool
+ pool.Return(conn)
+ return doc, err
+}
+
+// Bucket DDL
+func uriAdj(s string) string {
+ return strings.Replace(s, "%", "%25", -1)
+}
+
+func (b *Bucket) CreateScope(scope string) error {
+ b.RLock()
+ pool := b.pool
+ client := pool.client
+ b.RUnlock()
+ args := map[string]interface{}{"name": scope}
+ return client.parsePostURLResponseTerse("/pools/default/buckets/"+uriAdj(b.Name)+"/collections", args, nil)
+}
+
+func (b *Bucket) DropScope(scope string) error {
+ b.RLock()
+ pool := b.pool
+ client := pool.client
+ b.RUnlock()
+ return client.parseDeleteURLResponseTerse("/pools/default/buckets/"+uriAdj(b.Name)+"/collections/"+uriAdj(scope), nil, nil)
+}
+
+func (b *Bucket) CreateCollection(scope string, collection string) error {
+ b.RLock()
+ pool := b.pool
+ client := pool.client
+ b.RUnlock()
+ args := map[string]interface{}{"name": collection}
+ return client.parsePostURLResponseTerse("/pools/default/buckets/"+uriAdj(b.Name)+"/collections/"+uriAdj(scope), args, nil)
+}
+
+func (b *Bucket) DropCollection(scope string, collection string) error {
+ b.RLock()
+ pool := b.pool
+ client := pool.client
+ b.RUnlock()
+ return client.parseDeleteURLResponseTerse("/pools/default/buckets/"+uriAdj(b.Name)+"/collections/"+uriAdj(scope)+"/"+uriAdj(collection), nil, nil)
+}
+
+func (b *Bucket) FlushCollection(scope string, collection string) error {
+ b.RLock()
+ pool := b.pool
+ client := pool.client
+ b.RUnlock()
+ args := map[string]interface{}{"name": collection, "scope": scope}
+ return client.parsePostURLResponseTerse("/pools/default/buckets/"+uriAdj(b.Name)+"/collections-flush", args, nil)
+}
+
+func (b *Bucket) getMasterNode(i int) string {
+ p := b.getConnPools(false /* not already locked */)
+ if len(p) > i {
+ return p[i].host
+ }
+ return ""
+}
+
+func (b *Bucket) authHandler(bucketLocked bool) (ah AuthHandler) {
+ if !bucketLocked {
+ b.RLock()
+ defer b.RUnlock()
+ }
+ pool := b.pool
+ name := b.Name
+
+ if pool != nil {
+ ah = pool.client.ah
+ }
+ if mbah, ok := ah.(MultiBucketAuthHandler); ok {
+ return mbah.ForBucket(name)
+ }
+ if ah == nil {
+ ah = &basicAuth{name, ""}
+ }
+ return
+}
+
+// NodeAddresses gets the (sorted) list of memcached node addresses
+// (hostname:port).
+func (b *Bucket) NodeAddresses() []string {
+ vsm := b.VBServerMap()
+ rv := make([]string, len(vsm.ServerList))
+ copy(rv, vsm.ServerList)
+ sort.Strings(rv)
+ return rv
+}
+
+// CommonAddressSuffix finds the longest common suffix of all
+// host:port strings in the node list.
+func (b *Bucket) CommonAddressSuffix() string {
+ input := []string{}
+ for _, n := range b.Nodes() {
+ input = append(input, n.Hostname)
+ }
+ return FindCommonSuffix(input)
+}
+
+// A Client is the starting point for all services across all buckets
+// in a Couchbase cluster.
+type Client struct {
+ BaseURL *url.URL
+ ah AuthHandler
+ Info Pools
+ tlsConfig *tls.Config
+}
+
+func maybeAddAuth(req *http.Request, ah AuthHandler) error {
+ if hah, ok := ah.(HTTPAuthHandler); ok {
+ return hah.SetCredsForRequest(req)
+ }
+ if ah != nil {
+ user, pass, _ := ah.GetCredentials()
+ req.Header.Set("Authorization", "Basic "+
+ base64.StdEncoding.EncodeToString([]byte(user+":"+pass)))
+ }
+ return nil
+}
+
+// arbitary number, may need to be tuned #FIXME
+const HTTP_MAX_RETRY = 5
+
+// Someday golang network packages will implement standard
+// error codes. Until then #sigh
+func isHttpConnError(err error) bool {
+
+ estr := err.Error()
+ return strings.Contains(estr, "broken pipe") ||
+ strings.Contains(estr, "broken connection") ||
+ strings.Contains(estr, "connection reset")
+}
+
+var client *http.Client
+var clientForStreaming *http.Client
+
+func ClientConfigForX509(certFile, keyFile, rootFile string) (*tls.Config, error) {
+ cfg := &tls.Config{}
+
+ if certFile != "" && keyFile != "" {
+ tlsCert, err := tls.LoadX509KeyPair(certFile, keyFile)
+ if err != nil {
+ return nil, err
+ }
+ cfg.Certificates = []tls.Certificate{tlsCert}
+ } else {
+ //error need to pass both certfile and keyfile
+ return nil, fmt.Errorf("N1QL: Need to pass both certfile and keyfile")
+ }
+
+ var caCert []byte
+ var err1 error
+
+ caCertPool := x509.NewCertPool()
+ if rootFile != "" {
+ // Read that value in
+ caCert, err1 = ioutil.ReadFile(rootFile)
+ if err1 != nil {
+ return nil, fmt.Errorf(" Error in reading cacert file, err: %v", err1)
+ }
+ caCertPool.AppendCertsFromPEM(caCert)
+ }
+
+ cfg.RootCAs = caCertPool
+ return cfg, nil
+}
+
+// This version of doHTTPRequest is for requests where the response connection is held open
+// for an extended duration since line is a new and significant output.
+//
+// The ordinary version of this method expects the results to arrive promptly, and
+// therefore use an HTTP client with a timeout. This client is not suitable
+// for streaming use.
+func doHTTPRequestForStreaming(req *http.Request) (*http.Response, error) {
+ var err error
+ var res *http.Response
+
+ // we need a client that ignores certificate errors, since we self-sign
+ // our certs
+ if clientForStreaming == nil && req.URL.Scheme == "https" {
+ var tr *http.Transport
+
+ if skipVerify {
+ tr = &http.Transport{
+ TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+ }
+ } else {
+ // Handle cases with cert
+
+ cfg, err := ClientConfigForX509(certFile, keyFile, rootFile)
+ if err != nil {
+ return nil, err
+ }
+
+ tr = &http.Transport{
+ TLSClientConfig: cfg,
+ }
+ }
+
+ clientForStreaming = &http.Client{Transport: tr, Timeout: 0}
+
+ } else if clientForStreaming == nil {
+ clientForStreaming = HTTPClientForStreaming
+ }
+
+ for i := 0; i < HTTP_MAX_RETRY; i++ {
+ res, err = clientForStreaming.Do(req)
+ if err != nil && isHttpConnError(err) {
+ continue
+ }
+ break
+ }
+
+ if err != nil {
+ return nil, err
+ }
+
+ return res, err
+}
+
+func doHTTPRequest(req *http.Request) (*http.Response, error) {
+
+ var err error
+ var res *http.Response
+
+ // we need a client that ignores certificate errors, since we self-sign
+ // our certs
+ if client == nil && req.URL.Scheme == "https" {
+ var tr *http.Transport
+
+ if skipVerify {
+ tr = &http.Transport{
+ TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+ }
+ } else {
+ // Handle cases with cert
+
+ cfg, err := ClientConfigForX509(certFile, keyFile, rootFile)
+ if err != nil {
+ return nil, err
+ }
+
+ tr = &http.Transport{
+ TLSClientConfig: cfg,
+ }
+ }
+
+ client = &http.Client{Transport: tr}
+
+ } else if client == nil {
+ client = HTTPClient
+ }
+
+ for i := 0; i < HTTP_MAX_RETRY; i++ {
+ res, err = client.Do(req)
+ if err != nil && isHttpConnError(err) {
+ continue
+ }
+ break
+ }
+
+ if err != nil {
+ return nil, err
+ }
+
+ return res, err
+}
+
+func doPutAPI(baseURL *url.URL, path string, params map[string]interface{}, authHandler AuthHandler, out interface{}, terse bool) error {
+ return doOutputAPI("PUT", baseURL, path, params, authHandler, out, terse)
+}
+
+func doPostAPI(baseURL *url.URL, path string, params map[string]interface{}, authHandler AuthHandler, out interface{}, terse bool) error {
+ return doOutputAPI("POST", baseURL, path, params, authHandler, out, terse)
+}
+
+func doDeleteAPI(baseURL *url.URL, path string, params map[string]interface{}, authHandler AuthHandler, out interface{}, terse bool) error {
+ return doOutputAPI("DELETE", baseURL, path, params, authHandler, out, terse)
+}
+
+func doOutputAPI(
+ httpVerb string,
+ baseURL *url.URL,
+ path string,
+ params map[string]interface{},
+ authHandler AuthHandler,
+ out interface{},
+ terse bool) error {
+
+ var requestUrl string
+
+ if q := strings.Index(path, "?"); q > 0 {
+ requestUrl = baseURL.Scheme + "://" + baseURL.Host + path[:q] + "?" + path[q+1:]
+ } else {
+ requestUrl = baseURL.Scheme + "://" + baseURL.Host + path
+ }
+
+ postData := url.Values{}
+ for k, v := range params {
+ postData.Set(k, fmt.Sprintf("%v", v))
+ }
+
+ req, err := http.NewRequest(httpVerb, requestUrl, bytes.NewBufferString(postData.Encode()))
+ if err != nil {
+ return err
+ }
+
+ req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+
+ err = maybeAddAuth(req, authHandler)
+ if err != nil {
+ return err
+ }
+
+ res, err := doHTTPRequest(req)
+ if err != nil {
+ return err
+ }
+
+ defer res.Body.Close()
+ // 200 - ok, 202 - accepted (asynchronously)
+ if res.StatusCode != 200 && res.StatusCode != 202 {
+ bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512))
+ if terse {
+ var outBuf interface{}
+
+ err := json.Unmarshal(bod, &outBuf)
+ if err == nil && outBuf != nil {
+ switch errText := outBuf.(type) {
+ case string:
+ return fmt.Errorf("%s", errText)
+ case map[string]interface{}:
+ errField := errText["errors"]
+ if errField != nil {
+
+ // remove annoying 'map' prefix
+ return fmt.Errorf("%s", strings.TrimPrefix(fmt.Sprintf("%v", errField), "map"))
+ }
+ }
+ }
+ return fmt.Errorf("%s", string(bod))
+ }
+ return fmt.Errorf("HTTP error %v getting %q: %s",
+ res.Status, requestUrl, bod)
+ }
+
+ d := json.NewDecoder(res.Body)
+ // PUT/POST/DELETE request may not have a response body
+ if d.More() {
+ if err = d.Decode(&out); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func queryRestAPI(
+ baseURL *url.URL,
+ path string,
+ authHandler AuthHandler,
+ out interface{},
+ terse bool) error {
+
+ var requestUrl string
+
+ if q := strings.Index(path, "?"); q > 0 {
+ requestUrl = baseURL.Scheme + "://" + baseURL.Host + path[:q] + "?" + path[q+1:]
+ } else {
+ requestUrl = baseURL.Scheme + "://" + baseURL.Host + path
+ }
+
+ req, err := http.NewRequest("GET", requestUrl, nil)
+ if err != nil {
+ return err
+ }
+
+ err = maybeAddAuth(req, authHandler)
+ if err != nil {
+ return err
+ }
+
+ res, err := doHTTPRequest(req)
+ if err != nil {
+ return err
+ }
+
+ defer res.Body.Close()
+ if res.StatusCode != 200 {
+ bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512))
+ if terse {
+ var outBuf interface{}
+
+ err := json.Unmarshal(bod, &outBuf)
+ if err == nil && outBuf != nil {
+ errText, ok := outBuf.(string)
+ if ok {
+ return fmt.Errorf(errText)
+ }
+ }
+ return fmt.Errorf(string(bod))
+ }
+ return fmt.Errorf("HTTP error %v getting %q: %s",
+ res.Status, requestUrl, bod)
+ }
+
+ d := json.NewDecoder(res.Body)
+ // GET request should have a response body
+ if err = d.Decode(&out); err != nil {
+ return fmt.Errorf("json decode err: %#v, for requestUrl: %s",
+ err, requestUrl)
+ }
+ return nil
+}
+
+func (c *Client) ProcessStream(path string, callb func(interface{}) error, data interface{}) error {
+ return c.processStream(c.BaseURL, path, c.ah, callb, data)
+}
+
+// Based on code in http://src.couchbase.org/source/xref/trunk/goproj/src/github.com/couchbase/indexing/secondary/dcp/pools.go#309
+func (c *Client) processStream(baseURL *url.URL, path string, authHandler AuthHandler, callb func(interface{}) error, data interface{}) error {
+ var requestUrl string
+
+ if q := strings.Index(path, "?"); q > 0 {
+ requestUrl = baseURL.Scheme + "://" + baseURL.Host + path[:q] + "?" + path[q+1:]
+ } else {
+ requestUrl = baseURL.Scheme + "://" + baseURL.Host + path
+ }
+
+ req, err := http.NewRequest("GET", requestUrl, nil)
+ if err != nil {
+ return err
+ }
+
+ err = maybeAddAuth(req, authHandler)
+ if err != nil {
+ return err
+ }
+
+ res, err := doHTTPRequestForStreaming(req)
+ if err != nil {
+ return err
+ }
+
+ defer res.Body.Close()
+ if res.StatusCode != 200 {
+ bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512))
+ return fmt.Errorf("HTTP error %v getting %q: %s",
+ res.Status, requestUrl, bod)
+ }
+
+ reader := bufio.NewReader(res.Body)
+ for {
+ bs, err := reader.ReadBytes('\n')
+ if err != nil {
+ return err
+ }
+ if len(bs) == 1 && bs[0] == '\n' {
+ continue
+ }
+
+ err = json.Unmarshal(bs, data)
+ if err != nil {
+ return err
+ }
+ err = callb(data)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+
+}
+
+func (c *Client) parseURLResponse(path string, out interface{}) error {
+ return queryRestAPI(c.BaseURL, path, c.ah, out, false)
+}
+
+func (c *Client) parsePostURLResponse(path string, params map[string]interface{}, out interface{}) error {
+ return doPostAPI(c.BaseURL, path, params, c.ah, out, false)
+}
+
+func (c *Client) parsePostURLResponseTerse(path string, params map[string]interface{}, out interface{}) error {
+ return doPostAPI(c.BaseURL, path, params, c.ah, out, true)
+}
+
+func (c *Client) parseDeleteURLResponse(path string, params map[string]interface{}, out interface{}) error {
+ return doDeleteAPI(c.BaseURL, path, params, c.ah, out, false)
+}
+
+func (c *Client) parseDeleteURLResponseTerse(path string, params map[string]interface{}, out interface{}) error {
+ return doDeleteAPI(c.BaseURL, path, params, c.ah, out, true)
+}
+
+func (c *Client) parsePutURLResponse(path string, params map[string]interface{}, out interface{}) error {
+ return doPutAPI(c.BaseURL, path, params, c.ah, out, false)
+}
+
+func (c *Client) parsePutURLResponseTerse(path string, params map[string]interface{}, out interface{}) error {
+ return doPutAPI(c.BaseURL, path, params, c.ah, out, true)
+}
+
+func (b *Bucket) parseURLResponse(path string, out interface{}) error {
+ nodes := b.Nodes()
+ if len(nodes) == 0 {
+ return errors.New("no couch rest URLs")
+ }
+
+ // Pick a random node to start querying.
+ startNode := rand.Intn(len(nodes))
+ maxRetries := len(nodes)
+ for i := 0; i < maxRetries; i++ {
+ node := nodes[(startNode+i)%len(nodes)] // Wrap around the nodes list.
+ // Skip non-healthy nodes.
+ if node.Status != "healthy" || node.CouchAPIBase == "" {
+ continue
+ }
+ url := &url.URL{
+ Host: node.Hostname,
+ Scheme: "http",
+ }
+
+ // Lock here to avoid having pool closed under us.
+ b.RLock()
+ err := queryRestAPI(url, path, b.pool.client.ah, out, false)
+ b.RUnlock()
+ if err == nil {
+ return err
+ }
+ }
+ return errors.New("All nodes failed to respond or no healthy nodes for bucket found")
+}
+
+func (b *Bucket) parseAPIResponse(path string, out interface{}) error {
+ nodes := b.Nodes()
+ if len(nodes) == 0 {
+ return errors.New("no couch rest URLs")
+ }
+
+ var err error
+ var u *url.URL
+
+ // Pick a random node to start querying.
+ startNode := rand.Intn(len(nodes))
+ maxRetries := len(nodes)
+ for i := 0; i < maxRetries; i++ {
+ node := nodes[(startNode+i)%len(nodes)] // Wrap around the nodes list.
+ // Skip non-healthy nodes.
+ if node.Status != "healthy" || node.CouchAPIBase == "" {
+ continue
+ }
+
+ u, err = ParseURL(node.CouchAPIBase)
+ // Lock here so pool does not get closed under us.
+ b.RLock()
+ if err != nil {
+ b.RUnlock()
+ return fmt.Errorf("config error: Bucket %q node #%d CouchAPIBase=%q: %v",
+ b.Name, i, node.CouchAPIBase, err)
+ } else if b.pool != nil {
+ u.User = b.pool.client.BaseURL.User
+ }
+ u.Path = path
+
+ // generate the path so that the strings are properly escaped
+ // MB-13770
+ requestPath := strings.Split(u.String(), u.Host)[1]
+
+ err = queryRestAPI(u, requestPath, b.pool.client.ah, out, false)
+ b.RUnlock()
+ if err == nil {
+ return err
+ }
+ }
+
+ var errStr string
+ if err != nil {
+ errStr = "Error " + err.Error()
+ }
+
+ return errors.New("All nodes failed to respond or returned error or no healthy nodes for bucket found." + errStr)
+}
+
+type basicAuth struct {
+ u, p string
+}
+
+func (b basicAuth) GetCredentials() (string, string, string) {
+ return b.u, b.p, b.u
+}
+
+func basicAuthFromURL(us string) (ah AuthHandler) {
+ u, err := ParseURL(us)
+ if err != nil {
+ return
+ }
+ if user := u.User; user != nil {
+ pw, _ := user.Password()
+ ah = basicAuth{user.Username(), pw}
+ }
+ return
+}
+
+// ConnectWithAuth connects to a couchbase cluster with the given
+// authentication handler.
+func ConnectWithAuth(baseU string, ah AuthHandler) (c Client, err error) {
+ c.BaseURL, err = ParseURL(baseU)
+ if err != nil {
+ return
+ }
+ c.ah = ah
+
+ return c, c.parseURLResponse("/pools", &c.Info)
+}
+
+// Call this method with a TLS certificate file name to make communication
+// with the KV engine encrypted.
+//
+// This method should be called immediately after a Connect*() method.
+func (c *Client) InitTLS(certFile string) error {
+ serverCert, err := ioutil.ReadFile(certFile)
+ if err != nil {
+ return err
+ }
+ CA_Pool := x509.NewCertPool()
+ CA_Pool.AppendCertsFromPEM(serverCert)
+ c.tlsConfig = &tls.Config{RootCAs: CA_Pool}
+ return nil
+}
+
+func (c *Client) ClearTLS() {
+ c.tlsConfig = nil
+}
+
+// ConnectWithAuthCreds connects to a couchbase cluster with the give
+// authorization creds returned by cb_auth
+func ConnectWithAuthCreds(baseU, username, password string) (c Client, err error) {
+ c.BaseURL, err = ParseURL(baseU)
+ if err != nil {
+ return
+ }
+
+ c.ah = newBucketAuth(username, password, "")
+ return c, c.parseURLResponse("/pools", &c.Info)
+}
+
+// Connect to a couchbase cluster. An authentication handler will be
+// created from the userinfo in the URL if provided.
+func Connect(baseU string) (Client, error) {
+ return ConnectWithAuth(baseU, basicAuthFromURL(baseU))
+}
+
+type BucketInfo struct {
+ Name string // name of bucket
+ Password string // SASL password of bucket
+}
+
+//Get SASL buckets
+func GetBucketList(baseU string) (bInfo []BucketInfo, err error) {
+
+ c := &Client{}
+ c.BaseURL, err = ParseURL(baseU)
+ if err != nil {
+ return
+ }
+ c.ah = basicAuthFromURL(baseU)
+
+ var buckets []Bucket
+ err = c.parseURLResponse("/pools/default/buckets", &buckets)
+ if err != nil {
+ return
+ }
+ bInfo = make([]BucketInfo, 0)
+ for _, bucket := range buckets {
+ bucketInfo := BucketInfo{Name: bucket.Name, Password: bucket.Password}
+ bInfo = append(bInfo, bucketInfo)
+ }
+ return bInfo, err
+}
+
+//Set viewUpdateDaemonOptions
+func SetViewUpdateParams(baseU string, params map[string]interface{}) (viewOpts map[string]interface{}, err error) {
+
+ c := &Client{}
+ c.BaseURL, err = ParseURL(baseU)
+ if err != nil {
+ return
+ }
+ c.ah = basicAuthFromURL(baseU)
+
+ if len(params) < 1 {
+ return nil, fmt.Errorf("No params to set")
+ }
+
+ err = c.parsePostURLResponse("/settings/viewUpdateDaemon", params, &viewOpts)
+ if err != nil {
+ return
+ }
+ return viewOpts, err
+}
+
+// This API lets the caller know, if the list of nodes a bucket is
+// connected to has gone through an edit (a rebalance operation)
+// since the last update to the bucket, in which case a Refresh is
+// advised.
+func (b *Bucket) NodeListChanged() bool {
+ b.RLock()
+ pool := b.pool
+ uri := b.URI
+ b.RUnlock()
+
+ tmpb := &Bucket{}
+ err := pool.client.parseURLResponse(uri, tmpb)
+ if err != nil {
+ return true
+ }
+
+ bNodes := *(*[]Node)(b.nodeList)
+ if len(bNodes) != len(tmpb.NodesJSON) {
+ return true
+ }
+
+ bucketHostnames := map[string]bool{}
+ for _, node := range bNodes {
+ bucketHostnames[node.Hostname] = true
+ }
+
+ for _, node := range tmpb.NodesJSON {
+ if _, found := bucketHostnames[node.Hostname]; !found {
+ return true
+ }
+ }
+
+ return false
+}
+
+// Sample data for scopes and collections as returned from the
+// /pooles/default/$BUCKET_NAME/collections API.
+// {"myScope2":{"myCollectionC":{}},"myScope1":{"myCollectionB":{},"myCollectionA":{}},"_default":{"_default":{}}}
+
+// Structures for parsing collections manifest.
+// The map key is the name of the scope.
+// Example data:
+// {"uid":"b","scopes":[
+// {"name":"_default","uid":"0","collections":[
+// {"name":"_default","uid":"0"}]},
+// {"name":"myScope1","uid":"8","collections":[
+// {"name":"myCollectionB","uid":"c"},
+// {"name":"myCollectionA","uid":"b"}]},
+// {"name":"myScope2","uid":"9","collections":[
+// {"name":"myCollectionC","uid":"d"}]}]}
+type InputManifest struct {
+ Uid string
+ Scopes []InputScope
+}
+type InputScope struct {
+ Name string
+ Uid string
+ Collections []InputCollection
+}
+type InputCollection struct {
+ Name string
+ Uid string
+}
+
+// Structures for storing collections information.
+type Manifest struct {
+ Uid uint64
+ Scopes map[string]*Scope // map by name
+}
+type Scope struct {
+ Name string
+ Uid uint64
+ Collections map[string]*Collection // map by name
+}
+type Collection struct {
+ Name string
+ Uid uint64
+}
+
+var _EMPTY_MANIFEST *Manifest = &Manifest{Uid: 0, Scopes: map[string]*Scope{}}
+
+func parseCollectionsManifest(res *gomemcached.MCResponse) (*Manifest, error) {
+ if !EnableCollections {
+ return _EMPTY_MANIFEST, nil
+ }
+
+ var im InputManifest
+ err := json.Unmarshal(res.Body, &im)
+ if err != nil {
+ return nil, err
+ }
+
+ uid, err := strconv.ParseUint(im.Uid, 16, 64)
+ if err != nil {
+ return nil, err
+ }
+ mani := &Manifest{Uid: uid, Scopes: make(map[string]*Scope, len(im.Scopes))}
+ for _, iscope := range im.Scopes {
+ scope_uid, err := strconv.ParseUint(iscope.Uid, 16, 64)
+ if err != nil {
+ return nil, err
+ }
+ scope := &Scope{Uid: scope_uid, Name: iscope.Name, Collections: make(map[string]*Collection, len(iscope.Collections))}
+ mani.Scopes[iscope.Name] = scope
+ for _, icoll := range iscope.Collections {
+ coll_uid, err := strconv.ParseUint(icoll.Uid, 16, 64)
+ if err != nil {
+ return nil, err
+ }
+ coll := &Collection{Uid: coll_uid, Name: icoll.Name}
+ scope.Collections[icoll.Name] = coll
+ }
+ }
+
+ return mani, nil
+}
+
+// This function assumes the bucket is locked.
+func (b *Bucket) GetCollectionsManifest() (*Manifest, error) {
+ // Collections not used?
+ if !EnableCollections {
+ return nil, fmt.Errorf("Collections not enabled.")
+ }
+
+ b.RLock()
+ pools := b.getConnPools(true /* already locked */)
+ pool := pools[0] // Any pool will do, so use the first one.
+ b.RUnlock()
+ client, err := pool.Get()
+ if err != nil {
+ return nil, fmt.Errorf("Unable to get connection to retrieve collections manifest: %v. No collections access to bucket %s.", err, b.Name)
+ }
+ client.SetDeadline(getDeadline(time.Time{}, DefaultTimeout))
+
+ // We need to select the bucket before GetCollectionsManifest()
+ // will work. This is sometimes done at startup (see defaultMkConn())
+ // but not always, depending on the auth type.
+ // Doing this is safe because we collect the the connections
+ // by bucket, so the bucket being selected will never change.
+ _, err = client.SelectBucket(b.Name)
+ if err != nil {
+ pool.Return(client)
+ return nil, fmt.Errorf("Unable to select bucket %s: %v. No collections access to bucket %s.", err, b.Name, b.Name)
+ }
+
+ res, err := client.GetCollectionsManifest()
+ if err != nil {
+ pool.Return(client)
+ return nil, fmt.Errorf("Unable to retrieve collections manifest: %v. No collections access to bucket %s.", err, b.Name)
+ }
+ mani, err := parseCollectionsManifest(res)
+ if err != nil {
+ pool.Return(client)
+ return nil, fmt.Errorf("Unable to parse collections manifest: %v. No collections access to bucket %s.", err, b.Name)
+ }
+
+ pool.Return(client)
+ return mani, nil
+}
+
+func (b *Bucket) RefreshFully() error {
+ return b.refresh(false)
+}
+
+func (b *Bucket) Refresh() error {
+ return b.refresh(true)
+}
+
+func (b *Bucket) refresh(preserveConnections bool) error {
+ b.RLock()
+ pool := b.pool
+ uri := b.URI
+ client := pool.client
+ b.RUnlock()
+
+ var poolServices PoolServices
+ var err error
+ if client.tlsConfig != nil {
+ poolServices, err = client.GetPoolServices("default")
+ if err != nil {
+ return err
+ }
+ }
+
+ tmpb := &Bucket{}
+ err = pool.client.parseURLResponse(uri, tmpb)
+ if err != nil {
+ return err
+ }
+
+ pools := b.getConnPools(false /* bucket not already locked */)
+
+ // We need this lock to ensure that bucket refreshes happening because
+ // of NMVb errors received during bulkGet do not end up over-writing
+ // pool.inUse.
+ b.Lock()
+
+ for _, pool := range pools {
+ if pool != nil {
+ pool.inUse = false
+ }
+ }
+
+ newcps := make([]*connectionPool, len(tmpb.VBSMJson.ServerList))
+ for i := range newcps {
+ hostport := tmpb.VBSMJson.ServerList[i]
+ if preserveConnections {
+ pool := b.getConnPoolByHost(hostport, true /* bucket already locked */)
+ if pool != nil && pool.inUse == false && (!pool.encrypted || pool.tlsConfig == client.tlsConfig) {
+ // if the hostname and index is unchanged then reuse this pool
+ newcps[i] = pool
+ pool.inUse = true
+ continue
+ }
+ }
+
+ var encrypted bool
+ if client.tlsConfig != nil {
+ hostport, encrypted, err = MapKVtoSSL(hostport, &poolServices)
+ if err != nil {
+ b.Unlock()
+ return err
+ }
+ }
+
+ if b.ah != nil {
+ newcps[i] = newConnectionPool(hostport,
+ b.ah, AsynchronousCloser, PoolSize, PoolOverflow, client.tlsConfig, b.Name, encrypted)
+
+ } else {
+ newcps[i] = newConnectionPool(hostport,
+ b.authHandler(true /* bucket already locked */),
+ AsynchronousCloser, PoolSize, PoolOverflow, client.tlsConfig, b.Name, encrypted)
+ }
+ }
+ b.replaceConnPools2(newcps, true /* bucket already locked */)
+ tmpb.ah = b.ah
+ b.vBucketServerMap = unsafe.Pointer(&tmpb.VBSMJson)
+ b.nodeList = unsafe.Pointer(&tmpb.NodesJSON)
+
+ b.Unlock()
+ return nil
+}
+
+func (p *Pool) refresh() (err error) {
+ p.BucketMap = make(map[string]*Bucket)
+
+ buckets := []Bucket{}
+ err = p.client.parseURLResponse(p.BucketURL["uri"], &buckets)
+ if err != nil {
+ return err
+ }
+ for i, _ := range buckets {
+ b := new(Bucket)
+ *b = buckets[i]
+ b.pool = p
+ b.nodeList = unsafe.Pointer(&b.NodesJSON)
+
+ // MB-33185 this is merely defensive, just in case
+ // refresh() gets called on a perfectly node pool
+ ob, ok := p.BucketMap[b.Name]
+ if ok && ob.connPools != nil {
+ ob.Close()
+ }
+ b.replaceConnPools(make([]*connectionPool, len(b.VBSMJson.ServerList)))
+ p.BucketMap[b.Name] = b
+ runtime.SetFinalizer(b, bucketFinalizer)
+ }
+ buckets = nil
+ return nil
+}
+
+// GetPool gets a pool from within the couchbase cluster (usually
+// "default").
+func (c *Client) GetPool(name string) (p Pool, err error) {
+ var poolURI string
+
+ for _, p := range c.Info.Pools {
+ if p.Name == name {
+ poolURI = p.URI
+ break
+ }
+ }
+ if poolURI == "" {
+ return p, errors.New("No pool named " + name)
+ }
+
+ err = c.parseURLResponse(poolURI, &p)
+ if err != nil {
+ return p, err
+ }
+
+ p.client = c
+
+ err = p.refresh()
+ return
+}
+
+// GetPoolServices returns all the bucket-independent services in a pool.
+// (See "Exposing services outside of bucket context" in http://goo.gl/uuXRkV)
+func (c *Client) GetPoolServices(name string) (ps PoolServices, err error) {
+ var poolName string
+ for _, p := range c.Info.Pools {
+ if p.Name == name {
+ poolName = p.Name
+ }
+ }
+ if poolName == "" {
+ return ps, errors.New("No pool named " + name)
+ }
+
+ poolURI := "/pools/" + poolName + "/nodeServices"
+ err = c.parseURLResponse(poolURI, &ps)
+
+ return
+}
+
+func (b *Bucket) GetPoolServices(name string) (*PoolServices, error) {
+ b.RLock()
+ pool := b.pool
+ b.RUnlock()
+
+ ps, err := pool.client.GetPoolServices(name)
+ if err != nil {
+ return nil, err
+ }
+
+ return &ps, nil
+}
+
+// Close marks this bucket as no longer needed, closing connections it
+// may have open.
+func (b *Bucket) Close() {
+ b.Lock()
+ defer b.Unlock()
+ if b.connPools != nil {
+ for _, c := range b.getConnPools(true /* already locked */) {
+ if c != nil {
+ c.Close()
+ }
+ }
+ b.connPools = nil
+ }
+}
+
+func bucketFinalizer(b *Bucket) {
+ if b.connPools != nil {
+ if !b.closed {
+ logging.Warnf("Finalizing a bucket with active connections.")
+ }
+
+ // MB-33185 do not leak connection pools
+ b.Close()
+ }
+}
+
+// GetBucket gets a bucket from within this pool.
+func (p *Pool) GetBucket(name string) (*Bucket, error) {
+ rv, ok := p.BucketMap[name]
+ if !ok {
+ return nil, &BucketNotFoundError{bucket: name}
+ }
+ err := rv.Refresh()
+ if err != nil {
+ return nil, err
+ }
+ return rv, nil
+}
+
+// GetBucket gets a bucket from within this pool.
+func (p *Pool) GetBucketWithAuth(bucket, username, password string) (*Bucket, error) {
+ rv, ok := p.BucketMap[bucket]
+ if !ok {
+ return nil, &BucketNotFoundError{bucket: bucket}
+ }
+ rv.ah = newBucketAuth(username, password, bucket)
+ err := rv.Refresh()
+ if err != nil {
+ return nil, err
+ }
+ return rv, nil
+}
+
+// GetPool gets the pool to which this bucket belongs.
+func (b *Bucket) GetPool() *Pool {
+ b.RLock()
+ defer b.RUnlock()
+ ret := b.pool
+ return ret
+}
+
+// GetClient gets the client from which we got this pool.
+func (p *Pool) GetClient() *Client {
+ return p.client
+}
+
+// Release bucket connections when the pool is no longer in use
+func (p *Pool) Close() {
+
+ // MB-36186 make the bucket map inaccessible
+ bucketMap := p.BucketMap
+ p.BucketMap = nil
+
+ // fine to loop through the buckets unlocked
+ // locking happens at the bucket level
+ for b, _ := range bucketMap {
+
+ // MB-36186 make the bucket unreachable and avoid concurrent read/write map panics
+ bucket := bucketMap[b]
+ bucketMap[b] = nil
+
+ bucket.Lock()
+
+ // MB-33208 defer closing connection pools until the bucket is no longer used
+ // MB-36186 if the bucket is unused make it unreachable straight away
+ needClose := bucket.connPools == nil && !bucket.closed
+ if needClose {
+ runtime.SetFinalizer(&bucket, nil)
+ }
+ bucket.closed = true
+ bucket.Unlock()
+ if needClose {
+ bucket.Close()
+ }
+ }
+}
+
+// GetBucket is a convenience function for getting a named bucket from
+// a URL
+func GetBucket(endpoint, poolname, bucketname string) (*Bucket, error) {
+ var err error
+ client, err := Connect(endpoint)
+ if err != nil {
+ return nil, err
+ }
+
+ pool, err := client.GetPool(poolname)
+ if err != nil {
+ return nil, err
+ }
+
+ return pool.GetBucket(bucketname)
+}
+
+// ConnectWithAuthAndGetBucket is a convenience function for
+// getting a named bucket from a given URL and an auth callback
+func ConnectWithAuthAndGetBucket(endpoint, poolname, bucketname string,
+ ah AuthHandler) (*Bucket, error) {
+ client, err := ConnectWithAuth(endpoint, ah)
+ if err != nil {
+ return nil, err
+ }
+
+ pool, err := client.GetPool(poolname)
+ if err != nil {
+ return nil, err
+ }
+
+ return pool.GetBucket(bucketname)
+}
+
+func GetSystemBucket(c *Client, p *Pool, name string) (*Bucket, error) {
+ bucket, err := p.GetBucket(name)
+ if err != nil {
+ if _, ok := err.(*BucketNotFoundError); !ok {
+ return nil, err
+ }
+
+ // create the bucket if not found
+ args := map[string]interface{}{
+ "authType": "sasl",
+ "bucketType": "couchbase",
+ "name": name,
+ "ramQuotaMB": 100,
+ "saslPassword": "donotuse",
+ }
+ var ret interface{}
+ // allow "bucket already exists" error in case duplicate create
+ // (e.g. two query nodes starting at same time)
+ err = c.parsePostURLResponseTerse("/pools/default/buckets", args, &ret)
+ if err != nil && !AlreadyExistsError(err) {
+ return nil, err
+ }
+
+ // bucket created asynchronously, try to get the bucket
+ maxRetry := 8
+ interval := 100 * time.Millisecond
+ for i := 0; i < maxRetry; i++ {
+ time.Sleep(interval)
+ interval *= 2
+ err = p.refresh()
+ if err != nil {
+ return nil, err
+ }
+ bucket, err = p.GetBucket(name)
+ if bucket != nil {
+ bucket.RLock()
+ ok := !bucket.closed && len(bucket.getConnPools(true /* already locked */)) > 0
+ bucket.RUnlock()
+ if ok {
+ break
+ }
+ } else if err != nil {
+ if _, ok := err.(*BucketNotFoundError); !ok {
+ break
+ }
+ }
+ }
+ }
+
+ return bucket, err
+}
+
+func DropSystemBucket(c *Client, name string) error {
+ err := c.parseDeleteURLResponseTerse("/pools/default/buckets/"+name, nil, nil)
+ return err
+}
+
+func AlreadyExistsError(err error) bool {
+ // Bucket error: Bucket with given name already exists
+ // Scope error: Scope with this name already exists
+ // Collection error: Collection with this name already exists
+ return strings.Contains(err.Error(), " name already exists")
+}
diff --git a/vendor/github.com/couchbase/go-couchbase/port_map.go b/vendor/github.com/couchbase/go-couchbase/port_map.go
new file mode 100644
index 0000000000..864bd4aedb
--- /dev/null
+++ b/vendor/github.com/couchbase/go-couchbase/port_map.go
@@ -0,0 +1,106 @@
+package couchbase
+
+/*
+
+The goal here is to map a hostname:port combination to another hostname:port
+combination. The original hostname:port gives the name and regular KV port
+of a couchbase server. We want to determine the corresponding SSL KV port.
+
+To do this, we have a pool services structure, as obtained from
+the /pools/default/nodeServices API.
+
+For a fully configured two-node system, the structure may look like this:
+{"rev":32,"nodesExt":[
+ {"services":{"mgmt":8091,"mgmtSSL":18091,"fts":8094,"ftsSSL":18094,"indexAdmin":9100,"indexScan":9101,"indexHttp":9102,"indexStreamInit":9103,"indexStreamCatchup":9104,"indexStreamMaint":9105,"indexHttps":19102,"capiSSL":18092,"capi":8092,"kvSSL":11207,"projector":9999,"kv":11210,"moxi":11211},"hostname":"172.23.123.101"},
+ {"services":{"mgmt":8091,"mgmtSSL":18091,"indexAdmin":9100,"indexScan":9101,"indexHttp":9102,"indexStreamInit":9103,"indexStreamCatchup":9104,"indexStreamMaint":9105,"indexHttps":19102,"capiSSL":18092,"capi":8092,"kvSSL":11207,"projector":9999,"kv":11210,"moxi":11211,"n1ql":8093,"n1qlSSL":18093},"thisNode":true,"hostname":"172.23.123.102"}]}
+
+In this case, note the "hostname" fields, and the "kv" and "kvSSL" fields.
+
+For a single-node system, perhaps brought up for testing, the structure may look like this:
+{"rev":66,"nodesExt":[
+ {"services":{"mgmt":8091,"mgmtSSL":18091,"indexAdmin":9100,"indexScan":9101,"indexHttp":9102,"indexStreamInit":9103,"indexStreamCatchup":9104,"indexStreamMaint":9105,"indexHttps":19102,"kv":11210,"kvSSL":11207,"capi":8092,"capiSSL":18092,"projector":9999,"n1ql":8093,"n1qlSSL":18093},"thisNode":true}],"clusterCapabilitiesVer":[1,0],"clusterCapabilities":{"n1ql":["enhancedPreparedStatements"]}}
+
+Here, note that there is only a single entry in the "nodeExt" array and that it does not have a "hostname" field.
+We will assume that either hostname fields are present, or there is only a single node.
+*/
+
+import (
+ "encoding/json"
+ "fmt"
+ "net"
+ "strconv"
+)
+
+func ParsePoolServices(jsonInput string) (*PoolServices, error) {
+ ps := &PoolServices{}
+ err := json.Unmarshal([]byte(jsonInput), ps)
+ return ps, err
+}
+
+// Accepts a "host:port" string representing the KV TCP port and the pools
+// nodeServices payload and returns a host:port string representing the KV
+// TLS port on the same node as the KV TCP port.
+// Returns the original host:port if in case of local communication (services
+// on the same node as source)
+func MapKVtoSSL(hostport string, ps *PoolServices) (string, bool, error) {
+ return MapKVtoSSLExt(hostport, ps, false)
+}
+
+func MapKVtoSSLExt(hostport string, ps *PoolServices, force bool) (string, bool, error) {
+ host, port, err := net.SplitHostPort(hostport)
+ if err != nil {
+ return "", false, fmt.Errorf("Unable to split hostport %s: %v", hostport, err)
+ }
+
+ portInt, err := strconv.Atoi(port)
+ if err != nil {
+ return "", false, fmt.Errorf("Unable to parse host/port combination %s: %v", hostport, err)
+ }
+
+ var ns *NodeServices
+ for i := range ps.NodesExt {
+ hostname := ps.NodesExt[i].Hostname
+ if len(hostname) != 0 && hostname != host {
+ /* If the hostname is the empty string, it means the node (and by extension
+ the cluster) is configured on the loopback. Further, it means that the client
+ should use whatever hostname it used to get the nodeServices information in
+ the first place to access the cluster. Thus, when the hostname is empty in
+ the nodeService entry we can assume that client will use the hostname it used
+ to access the KV TCP endpoint - and thus that it automatically "matches".
+ If hostname is not empty and doesn't match then we move to the next entry.
+ */
+ continue
+ }
+ kvPort, found := ps.NodesExt[i].Services["kv"]
+ if !found {
+ /* not a node with a KV service */
+ continue
+ }
+ if kvPort == portInt {
+ ns = &(ps.NodesExt[i])
+ break
+ }
+ }
+
+ if ns == nil {
+ return "", false, fmt.Errorf("Unable to parse host/port combination %s: no matching node found among %d", hostport, len(ps.NodesExt))
+ }
+ kvSSL, found := ns.Services["kvSSL"]
+ if !found {
+ return "", false, fmt.Errorf("Unable to map host/port combination %s: target host has no kvSSL port listed", hostport)
+ }
+
+ //Don't encrypt for communication between local nodes
+ if !force && (len(ns.Hostname) == 0 || ns.ThisNode) {
+ return hostport, false, nil
+ }
+
+ ip := net.ParseIP(host)
+ if ip != nil && ip.To4() == nil && ip.To16() != nil { // IPv6 and not a FQDN
+ // Prefix and suffix square brackets as SplitHostPort removes them,
+ // see: https://golang.org/pkg/net/#SplitHostPort
+ host = "[" + host + "]"
+ }
+
+ return fmt.Sprintf("%s:%d", host, kvSSL), true, nil
+}
diff --git a/vendor/github.com/couchbase/go-couchbase/streaming.go b/vendor/github.com/couchbase/go-couchbase/streaming.go
new file mode 100644
index 0000000000..ecf5be9932
--- /dev/null
+++ b/vendor/github.com/couchbase/go-couchbase/streaming.go
@@ -0,0 +1,228 @@
+package couchbase
+
+import (
+ "encoding/json"
+ "fmt"
+ "github.com/couchbase/goutils/logging"
+ "io"
+ "io/ioutil"
+ "math/rand"
+ "net"
+ "net/http"
+ "time"
+ "unsafe"
+)
+
+// Bucket auto-updater gets the latest version of the bucket config from
+// the server. If the configuration has changed then updated the local
+// bucket information. If the bucket has been deleted then notify anyone
+// who is holding a reference to this bucket
+
+const MAX_RETRY_COUNT = 5
+const DISCONNECT_PERIOD = 120 * time.Second
+
+type NotifyFn func(bucket string, err error)
+type StreamingFn func(bucket *Bucket)
+
+// Use TCP keepalive to detect half close sockets
+var updaterTransport http.RoundTripper = &http.Transport{
+ Proxy: http.ProxyFromEnvironment,
+ Dial: (&net.Dialer{
+ Timeout: 30 * time.Second,
+ KeepAlive: 30 * time.Second,
+ }).Dial,
+}
+
+var updaterHTTPClient = &http.Client{Transport: updaterTransport}
+
+func doHTTPRequestForUpdate(req *http.Request) (*http.Response, error) {
+
+ var err error
+ var res *http.Response
+
+ for i := 0; i < HTTP_MAX_RETRY; i++ {
+ res, err = updaterHTTPClient.Do(req)
+ if err != nil && isHttpConnError(err) {
+ continue
+ }
+ break
+ }
+
+ if err != nil {
+ return nil, err
+ }
+
+ return res, err
+}
+
+func (b *Bucket) RunBucketUpdater(notify NotifyFn) {
+ b.RunBucketUpdater2(nil, notify)
+}
+
+func (b *Bucket) RunBucketUpdater2(streamingFn StreamingFn, notify NotifyFn) {
+ go func() {
+ err := b.UpdateBucket2(streamingFn)
+ if err != nil {
+ if notify != nil {
+ notify(b.GetName(), err)
+ }
+ logging.Errorf(" Bucket Updater exited with err %v", err)
+ }
+ }()
+}
+
+func (b *Bucket) replaceConnPools2(with []*connectionPool, bucketLocked bool) {
+ if !bucketLocked {
+ b.Lock()
+ defer b.Unlock()
+ }
+ old := b.connPools
+ b.connPools = unsafe.Pointer(&with)
+ if old != nil {
+ for _, pool := range *(*[]*connectionPool)(old) {
+ if pool != nil && pool.inUse == false {
+ pool.Close()
+ }
+ }
+ }
+ return
+}
+
+func (b *Bucket) UpdateBucket() error {
+ return b.UpdateBucket2(nil)
+}
+
+func (b *Bucket) UpdateBucket2(streamingFn StreamingFn) error {
+ var failures int
+ var returnErr error
+ var poolServices PoolServices
+
+ for {
+
+ if failures == MAX_RETRY_COUNT {
+ logging.Errorf(" Maximum failures reached. Exiting loop...")
+ return fmt.Errorf("Max failures reached. Last Error %v", returnErr)
+ }
+
+ nodes := b.Nodes()
+ if len(nodes) < 1 {
+ return fmt.Errorf("No healthy nodes found")
+ }
+
+ startNode := rand.Intn(len(nodes))
+ node := nodes[(startNode)%len(nodes)]
+
+ streamUrl := fmt.Sprintf("http://%s/pools/default/bucketsStreaming/%s", node.Hostname, uriAdj(b.GetName()))
+ logging.Infof(" Trying with %s", streamUrl)
+ req, err := http.NewRequest("GET", streamUrl, nil)
+ if err != nil {
+ return err
+ }
+
+ // Lock here to avoid having pool closed under us.
+ b.RLock()
+ err = maybeAddAuth(req, b.pool.client.ah)
+ b.RUnlock()
+ if err != nil {
+ return err
+ }
+
+ res, err := doHTTPRequestForUpdate(req)
+ if err != nil {
+ return err
+ }
+
+ if res.StatusCode != 200 {
+ bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512))
+ logging.Errorf("Failed to connect to host, unexpected status code: %v. Body %s", res.StatusCode, bod)
+ res.Body.Close()
+ returnErr = fmt.Errorf("Failed to connect to host. Status %v Body %s", res.StatusCode, bod)
+ failures++
+ continue
+ }
+
+ dec := json.NewDecoder(res.Body)
+
+ tmpb := &Bucket{}
+ for {
+
+ err := dec.Decode(&tmpb)
+ if err != nil {
+ returnErr = err
+ res.Body.Close()
+ break
+ }
+
+ // if we got here, reset failure count
+ failures = 0
+
+ if b.pool.client.tlsConfig != nil {
+ poolServices, err = b.pool.client.GetPoolServices("default")
+ if err != nil {
+ returnErr = err
+ res.Body.Close()
+ break
+ }
+ }
+
+ b.Lock()
+
+ // mark all the old connection pools for deletion
+ pools := b.getConnPools(true /* already locked */)
+ for _, pool := range pools {
+ if pool != nil {
+ pool.inUse = false
+ }
+ }
+
+ newcps := make([]*connectionPool, len(tmpb.VBSMJson.ServerList))
+ for i := range newcps {
+ // get the old connection pool and check if it is still valid
+ pool := b.getConnPoolByHost(tmpb.VBSMJson.ServerList[i], true /* bucket already locked */)
+ if pool != nil && pool.inUse == false && pool.tlsConfig == b.pool.client.tlsConfig {
+ // if the hostname and index is unchanged then reuse this pool
+ newcps[i] = pool
+ pool.inUse = true
+ continue
+ }
+ // else create a new pool
+ var encrypted bool
+ hostport := tmpb.VBSMJson.ServerList[i]
+ if b.pool.client.tlsConfig != nil {
+ hostport, encrypted, err = MapKVtoSSL(hostport, &poolServices)
+ if err != nil {
+ b.Unlock()
+ return err
+ }
+ }
+ if b.ah != nil {
+ newcps[i] = newConnectionPool(hostport,
+ b.ah, false, PoolSize, PoolOverflow, b.pool.client.tlsConfig, b.Name, encrypted)
+
+ } else {
+ newcps[i] = newConnectionPool(hostport,
+ b.authHandler(true /* bucket already locked */),
+ false, PoolSize, PoolOverflow, b.pool.client.tlsConfig, b.Name, encrypted)
+ }
+ }
+
+ b.replaceConnPools2(newcps, true /* bucket already locked */)
+
+ tmpb.ah = b.ah
+ b.vBucketServerMap = unsafe.Pointer(&tmpb.VBSMJson)
+ b.nodeList = unsafe.Pointer(&tmpb.NodesJSON)
+ b.Unlock()
+
+ if streamingFn != nil {
+ streamingFn(tmpb)
+ }
+ logging.Debugf("Got new configuration for bucket %s", b.GetName())
+
+ }
+ // we are here because of an error
+ failures++
+ continue
+
+ }
+ return nil
+}
diff --git a/vendor/github.com/couchbase/go-couchbase/tap.go b/vendor/github.com/couchbase/go-couchbase/tap.go
new file mode 100644
index 0000000000..86edd30554
--- /dev/null
+++ b/vendor/github.com/couchbase/go-couchbase/tap.go
@@ -0,0 +1,143 @@
+package couchbase
+
+import (
+ "github.com/couchbase/gomemcached/client"
+ "github.com/couchbase/goutils/logging"
+ "sync"
+ "time"
+)
+
+const initialRetryInterval = 1 * time.Second
+const maximumRetryInterval = 30 * time.Second
+
+// A TapFeed streams mutation events from a bucket.
+//
+// Events from the bucket can be read from the channel 'C'. Remember
+// to call Close() on it when you're done, unless its channel has
+// closed itself already.
+type TapFeed struct {
+ C <-chan memcached.TapEvent
+
+ bucket *Bucket
+ args *memcached.TapArguments
+ nodeFeeds []*memcached.TapFeed // The TAP feeds of the individual nodes
+ output chan memcached.TapEvent // Same as C but writeably-typed
+ wg sync.WaitGroup
+ quit chan bool
+}
+
+// StartTapFeed creates and starts a new Tap feed
+func (b *Bucket) StartTapFeed(args *memcached.TapArguments) (*TapFeed, error) {
+ if args == nil {
+ defaultArgs := memcached.DefaultTapArguments()
+ args = &defaultArgs
+ }
+
+ feed := &TapFeed{
+ bucket: b,
+ args: args,
+ output: make(chan memcached.TapEvent, 10),
+ quit: make(chan bool),
+ }
+
+ go feed.run()
+
+ feed.C = feed.output
+ return feed, nil
+}
+
+// Goroutine that runs the feed
+func (feed *TapFeed) run() {
+ retryInterval := initialRetryInterval
+ bucketOK := true
+ for {
+ // Connect to the TAP feed of each server node:
+ if bucketOK {
+ killSwitch, err := feed.connectToNodes()
+ if err == nil {
+ // Run until one of the sub-feeds fails:
+ select {
+ case <-killSwitch:
+ case <-feed.quit:
+ return
+ }
+ feed.closeNodeFeeds()
+ retryInterval = initialRetryInterval
+ }
+ }
+
+ // On error, try to refresh the bucket in case the list of nodes changed:
+ logging.Infof("go-couchbase: TAP connection lost; reconnecting to bucket %q in %v",
+ feed.bucket.Name, retryInterval)
+ err := feed.bucket.Refresh()
+ bucketOK = err == nil
+
+ select {
+ case <-time.After(retryInterval):
+ case <-feed.quit:
+ return
+ }
+ if retryInterval *= 2; retryInterval > maximumRetryInterval {
+ retryInterval = maximumRetryInterval
+ }
+ }
+}
+
+func (feed *TapFeed) connectToNodes() (killSwitch chan bool, err error) {
+ killSwitch = make(chan bool)
+ for _, serverConn := range feed.bucket.getConnPools(false /* not already locked */) {
+ var singleFeed *memcached.TapFeed
+ singleFeed, err = serverConn.StartTapFeed(feed.args)
+ if err != nil {
+ logging.Errorf("go-couchbase: Error connecting to tap feed of %s: %v", serverConn.host, err)
+ feed.closeNodeFeeds()
+ return
+ }
+ feed.nodeFeeds = append(feed.nodeFeeds, singleFeed)
+ go feed.forwardTapEvents(singleFeed, killSwitch, serverConn.host)
+ feed.wg.Add(1)
+ }
+ return
+}
+
+// Goroutine that forwards Tap events from a single node's feed to the aggregate feed.
+func (feed *TapFeed) forwardTapEvents(singleFeed *memcached.TapFeed, killSwitch chan bool, host string) {
+ defer feed.wg.Done()
+ for {
+ select {
+ case event, ok := <-singleFeed.C:
+ if !ok {
+ if singleFeed.Error != nil {
+ logging.Errorf("go-couchbase: Tap feed from %s failed: %v", host, singleFeed.Error)
+ }
+ killSwitch <- true
+ return
+ }
+ feed.output <- event
+ case <-feed.quit:
+ return
+ }
+ }
+}
+
+func (feed *TapFeed) closeNodeFeeds() {
+ for _, f := range feed.nodeFeeds {
+ f.Close()
+ }
+ feed.nodeFeeds = nil
+}
+
+// Close a Tap feed.
+func (feed *TapFeed) Close() error {
+ select {
+ case <-feed.quit:
+ return nil
+ default:
+ }
+
+ feed.closeNodeFeeds()
+ close(feed.quit)
+ feed.wg.Wait()
+ close(feed.output)
+ return nil
+}
diff --git a/vendor/github.com/couchbase/go-couchbase/upr.go b/vendor/github.com/couchbase/go-couchbase/upr.go
new file mode 100644
index 0000000000..844bf91510
--- /dev/null
+++ b/vendor/github.com/couchbase/go-couchbase/upr.go
@@ -0,0 +1,399 @@
+package couchbase
+
+import (
+ "log"
+ "sync"
+ "time"
+
+ "fmt"
+ "github.com/couchbase/gomemcached"
+ "github.com/couchbase/gomemcached/client"
+ "github.com/couchbase/goutils/logging"
+)
+
+// A UprFeed streams mutation events from a bucket.
+//
+// Events from the bucket can be read from the channel 'C'. Remember
+// to call Close() on it when you're done, unless its channel has
+// closed itself already.
+type UprFeed struct {
+ C <-chan *memcached.UprEvent
+
+ bucket *Bucket
+ nodeFeeds map[string]*FeedInfo // The UPR feeds of the individual nodes
+ output chan *memcached.UprEvent // Same as C but writeably-typed
+ outputClosed bool
+ quit chan bool
+ name string // name of this UPR feed
+ sequence uint32 // sequence number for this feed
+ connected bool
+ killSwitch chan bool
+ closing bool
+ wg sync.WaitGroup
+ dcp_buffer_size uint32
+ data_chan_size int
+}
+
+// UprFeed from a single connection
+type FeedInfo struct {
+ uprFeed *memcached.UprFeed // UPR feed handle
+ host string // hostname
+ connected bool // connected
+ quit chan bool // quit channel
+}
+
+type FailoverLog map[uint16]memcached.FailoverLog
+
+// GetFailoverLogs, get the failover logs for a set of vbucket ids
+func (b *Bucket) GetFailoverLogs(vBuckets []uint16) (FailoverLog, error) {
+
+ // map vbids to their corresponding hosts
+ vbHostList := make(map[string][]uint16)
+ vbm := b.VBServerMap()
+ if len(vbm.VBucketMap) < len(vBuckets) {
+ return nil, fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v",
+ vbm.VBucketMap, vBuckets)
+ }
+
+ for _, vb := range vBuckets {
+ masterID := vbm.VBucketMap[vb][0]
+ master := b.getMasterNode(masterID)
+ if master == "" {
+ return nil, fmt.Errorf("No master found for vb %d", vb)
+ }
+
+ vbList := vbHostList[master]
+ if vbList == nil {
+ vbList = make([]uint16, 0)
+ }
+ vbList = append(vbList, vb)
+ vbHostList[master] = vbList
+ }
+
+ failoverLogMap := make(FailoverLog)
+ for _, serverConn := range b.getConnPools(false /* not already locked */) {
+
+ vbList := vbHostList[serverConn.host]
+ if vbList == nil {
+ continue
+ }
+
+ mc, err := serverConn.Get()
+ if err != nil {
+ logging.Infof("No Free connections for vblist %v", vbList)
+ return nil, fmt.Errorf("No Free connections for host %s",
+ serverConn.host)
+
+ }
+ // close the connection so that it doesn't get reused for upr data
+ // connection
+ defer mc.Close()
+ mc.SetDeadline(getDeadline(time.Time{}, DefaultTimeout))
+ failoverlogs, err := mc.UprGetFailoverLog(vbList)
+ if err != nil {
+ return nil, fmt.Errorf("Error getting failover log %s host %s",
+ err.Error(), serverConn.host)
+
+ }
+
+ for vb, log := range failoverlogs {
+ failoverLogMap[vb] = *log
+ }
+ }
+
+ return failoverLogMap, nil
+}
+
+func (b *Bucket) StartUprFeed(name string, sequence uint32) (*UprFeed, error) {
+ return b.StartUprFeedWithConfig(name, sequence, 10, DEFAULT_WINDOW_SIZE)
+}
+
+// StartUprFeed creates and starts a new Upr feed
+// No data will be sent on the channel unless vbuckets streams are requested
+func (b *Bucket) StartUprFeedWithConfig(name string, sequence uint32, data_chan_size int, dcp_buffer_size uint32) (*UprFeed, error) {
+
+ feed := &UprFeed{
+ bucket: b,
+ output: make(chan *memcached.UprEvent, data_chan_size),
+ quit: make(chan bool),
+ nodeFeeds: make(map[string]*FeedInfo, 0),
+ name: name,
+ sequence: sequence,
+ killSwitch: make(chan bool),
+ dcp_buffer_size: dcp_buffer_size,
+ data_chan_size: data_chan_size,
+ }
+
+ err := feed.connectToNodes()
+ if err != nil {
+ return nil, fmt.Errorf("Cannot connect to bucket %s", err.Error())
+ }
+ feed.connected = true
+ go feed.run()
+
+ feed.C = feed.output
+ return feed, nil
+}
+
+// UprRequestStream starts a stream for a vb on a feed
+func (feed *UprFeed) UprRequestStream(vb uint16, opaque uint16, flags uint32,
+ vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error {
+
+ defer func() {
+ if r := recover(); r != nil {
+ log.Panicf("Panic in UprRequestStream. Feed %v Bucket %v", feed, feed.bucket)
+ }
+ }()
+
+ vbm := feed.bucket.VBServerMap()
+ if len(vbm.VBucketMap) < int(vb) {
+ return fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v",
+ vb, vbm.VBucketMap)
+ }
+
+ if int(vb) >= len(vbm.VBucketMap) {
+ return fmt.Errorf("Invalid vbucket id %d", vb)
+ }
+
+ masterID := vbm.VBucketMap[vb][0]
+ master := feed.bucket.getMasterNode(masterID)
+ if master == "" {
+ return fmt.Errorf("Master node not found for vbucket %d", vb)
+ }
+ singleFeed := feed.nodeFeeds[master]
+ if singleFeed == nil {
+ return fmt.Errorf("UprFeed for this host not found")
+ }
+
+ if err := singleFeed.uprFeed.UprRequestStream(vb, opaque, flags,
+ vuuid, startSequence, endSequence, snapStart, snapEnd); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// UprCloseStream ends a vbucket stream.
+func (feed *UprFeed) UprCloseStream(vb, opaqueMSB uint16) error {
+
+ defer func() {
+ if r := recover(); r != nil {
+ log.Panicf("Panic in UprCloseStream. Feed %v Bucket %v ", feed, feed.bucket)
+ }
+ }()
+
+ vbm := feed.bucket.VBServerMap()
+ if len(vbm.VBucketMap) < int(vb) {
+ return fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v",
+ vb, vbm.VBucketMap)
+ }
+
+ if int(vb) >= len(vbm.VBucketMap) {
+ return fmt.Errorf("Invalid vbucket id %d", vb)
+ }
+
+ masterID := vbm.VBucketMap[vb][0]
+ master := feed.bucket.getMasterNode(masterID)
+ if master == "" {
+ return fmt.Errorf("Master node not found for vbucket %d", vb)
+ }
+ singleFeed := feed.nodeFeeds[master]
+ if singleFeed == nil {
+ return fmt.Errorf("UprFeed for this host not found")
+ }
+
+ if err := singleFeed.uprFeed.CloseStream(vb, opaqueMSB); err != nil {
+ return err
+ }
+ return nil
+}
+
+// Goroutine that runs the feed
+func (feed *UprFeed) run() {
+ retryInterval := initialRetryInterval
+ bucketOK := true
+ for {
+ // Connect to the UPR feed of each server node:
+ if bucketOK {
+ // Run until one of the sub-feeds fails:
+ select {
+ case <-feed.killSwitch:
+ case <-feed.quit:
+ return
+ }
+ //feed.closeNodeFeeds()
+ retryInterval = initialRetryInterval
+ }
+
+ if feed.closing == true {
+ // we have been asked to shut down
+ return
+ }
+
+ // On error, try to refresh the bucket in case the list of nodes changed:
+ logging.Infof("go-couchbase: UPR connection lost; reconnecting to bucket %q in %v",
+ feed.bucket.Name, retryInterval)
+
+ if err := feed.bucket.Refresh(); err != nil {
+ // if we fail to refresh the bucket, exit the feed
+ // MB-14917
+ logging.Infof("Unable to refresh bucket %s ", err.Error())
+ close(feed.output)
+ feed.outputClosed = true
+ feed.closeNodeFeeds()
+ return
+ }
+
+ // this will only connect to nodes that are not connected or changed
+ // user will have to reconnect the stream
+ err := feed.connectToNodes()
+ if err != nil {
+ logging.Infof("Unable to connect to nodes..exit ")
+ close(feed.output)
+ feed.outputClosed = true
+ feed.closeNodeFeeds()
+ return
+ }
+ bucketOK = err == nil
+
+ select {
+ case <-time.After(retryInterval):
+ case <-feed.quit:
+ return
+ }
+ if retryInterval *= 2; retryInterval > maximumRetryInterval {
+ retryInterval = maximumRetryInterval
+ }
+ }
+}
+
+func (feed *UprFeed) connectToNodes() (err error) {
+ nodeCount := 0
+ for _, serverConn := range feed.bucket.getConnPools(false /* not already locked */) {
+
+ // this maybe a reconnection, so check if the connection to the node
+ // already exists. Connect only if the node is not found in the list
+ // or connected == false
+ nodeFeed := feed.nodeFeeds[serverConn.host]
+
+ if nodeFeed != nil && nodeFeed.connected == true {
+ continue
+ }
+
+ var singleFeed *memcached.UprFeed
+ var name string
+ if feed.name == "" {
+ name = "DefaultUprClient"
+ } else {
+ name = feed.name
+ }
+ singleFeed, err = serverConn.StartUprFeed(name, feed.sequence, feed.dcp_buffer_size, feed.data_chan_size)
+ if err != nil {
+ logging.Errorf("go-couchbase: Error connecting to upr feed of %s: %v", serverConn.host, err)
+ feed.closeNodeFeeds()
+ return
+ }
+ // add the node to the connection map
+ feedInfo := &FeedInfo{
+ uprFeed: singleFeed,
+ connected: true,
+ host: serverConn.host,
+ quit: make(chan bool),
+ }
+ feed.nodeFeeds[serverConn.host] = feedInfo
+ go feed.forwardUprEvents(feedInfo, feed.killSwitch, serverConn.host)
+ feed.wg.Add(1)
+ nodeCount++
+ }
+ if nodeCount == 0 {
+ return fmt.Errorf("No connection to bucket")
+ }
+
+ return nil
+}
+
+// Goroutine that forwards Upr events from a single node's feed to the aggregate feed.
+func (feed *UprFeed) forwardUprEvents(nodeFeed *FeedInfo, killSwitch chan bool, host string) {
+ singleFeed := nodeFeed.uprFeed
+
+ defer func() {
+ feed.wg.Done()
+ if r := recover(); r != nil {
+ //if feed is not closing, re-throw the panic
+ if feed.outputClosed != true && feed.closing != true {
+ panic(r)
+ } else {
+ logging.Errorf("Panic is recovered. Since feed is closed, exit gracefully")
+
+ }
+ }
+ }()
+
+ for {
+ select {
+ case <-nodeFeed.quit:
+ nodeFeed.connected = false
+ return
+
+ case event, ok := <-singleFeed.C:
+ if !ok {
+ if singleFeed.Error != nil {
+ logging.Errorf("go-couchbase: Upr feed from %s failed: %v", host, singleFeed.Error)
+ }
+ killSwitch <- true
+ return
+ }
+ if feed.outputClosed == true {
+ // someone closed the node feed
+ logging.Infof("Node need closed, returning from forwardUprEvent")
+ return
+ }
+ feed.output <- event
+ if event.Status == gomemcached.NOT_MY_VBUCKET {
+ logging.Infof(" Got a not my vbucket error !! ")
+ if err := feed.bucket.Refresh(); err != nil {
+ logging.Errorf("Unable to refresh bucket %s ", err.Error())
+ feed.closeNodeFeeds()
+ return
+ }
+ // this will only connect to nodes that are not connected or changed
+ // user will have to reconnect the stream
+ if err := feed.connectToNodes(); err != nil {
+ logging.Errorf("Unable to connect to nodes %s", err.Error())
+ return
+ }
+
+ }
+ }
+ }
+}
+
+func (feed *UprFeed) closeNodeFeeds() {
+ for _, f := range feed.nodeFeeds {
+ logging.Infof(" Sending close to forwardUprEvent ")
+ close(f.quit)
+ f.uprFeed.Close()
+ }
+ feed.nodeFeeds = nil
+}
+
+// Close a Upr feed.
+func (feed *UprFeed) Close() error {
+ select {
+ case <-feed.quit:
+ return nil
+ default:
+ }
+
+ feed.closing = true
+ feed.closeNodeFeeds()
+ close(feed.quit)
+
+ feed.wg.Wait()
+ if feed.outputClosed == false {
+ feed.outputClosed = true
+ close(feed.output)
+ }
+
+ return nil
+}
diff --git a/vendor/github.com/couchbase/go-couchbase/users.go b/vendor/github.com/couchbase/go-couchbase/users.go
new file mode 100644
index 0000000000..4e8f962908
--- /dev/null
+++ b/vendor/github.com/couchbase/go-couchbase/users.go
@@ -0,0 +1,121 @@
+package couchbase
+
+import (
+ "bytes"
+ "fmt"
+)
+
+type User struct {
+ Name string
+ Id string
+ Domain string
+ Roles []Role
+}
+
+type Role struct {
+ Role string
+ BucketName string `json:"bucket_name"`
+ ScopeName string `json:"scope_name"`
+ CollectionName string `json:"collection_name"`
+}
+
+// Sample:
+// {"role":"admin","name":"Admin","desc":"Can manage ALL cluster features including security.","ce":true}
+// {"role":"query_select","bucket_name":"*","name":"Query Select","desc":"Can execute SELECT statement on bucket to retrieve data"}
+type RoleDescription struct {
+ Role string
+ Name string
+ Desc string
+ Ce bool
+ BucketName string `json:"bucket_name"`
+}
+
+// Return user-role data, as parsed JSON.
+// Sample:
+// [{"id":"ivanivanov","name":"Ivan Ivanov","roles":[{"role":"cluster_admin"},{"bucket_name":"default","role":"bucket_admin"}]},
+// {"id":"petrpetrov","name":"Petr Petrov","roles":[{"role":"replication_admin"}]}]
+func (c *Client) GetUserRoles() ([]interface{}, error) {
+ ret := make([]interface{}, 0, 1)
+ err := c.parseURLResponse("/settings/rbac/users", &ret)
+ if err != nil {
+ return nil, err
+ }
+
+ // Get the configured administrator.
+ // Expected result: {"port":8091,"username":"Administrator"}
+ adminInfo := make(map[string]interface{}, 2)
+ err = c.parseURLResponse("/settings/web", &adminInfo)
+ if err != nil {
+ return nil, err
+ }
+
+ // Create a special entry for the configured administrator.
+ adminResult := map[string]interface{}{
+ "name": adminInfo["username"],
+ "id": adminInfo["username"],
+ "domain": "ns_server",
+ "roles": []interface{}{
+ map[string]interface{}{
+ "role": "admin",
+ },
+ },
+ }
+
+ // Add the configured administrator to the list of results.
+ ret = append(ret, adminResult)
+
+ return ret, nil
+}
+
+func (c *Client) GetUserInfoAll() ([]User, error) {
+ ret := make([]User, 0, 16)
+ err := c.parseURLResponse("/settings/rbac/users", &ret)
+ if err != nil {
+ return nil, err
+ }
+ return ret, nil
+}
+
+func rolesToParamFormat(roles []Role) string {
+ var buffer bytes.Buffer
+ for i, role := range roles {
+ if i > 0 {
+ buffer.WriteString(",")
+ }
+ buffer.WriteString(role.Role)
+ if role.BucketName != "" {
+ buffer.WriteString("[")
+ buffer.WriteString(role.BucketName)
+ buffer.WriteString("]")
+ }
+ }
+ return buffer.String()
+}
+
+func (c *Client) PutUserInfo(u *User) error {
+ params := map[string]interface{}{
+ "name": u.Name,
+ "roles": rolesToParamFormat(u.Roles),
+ }
+ var target string
+ switch u.Domain {
+ case "external":
+ target = "/settings/rbac/users/" + u.Id
+ case "local":
+ target = "/settings/rbac/users/local/" + u.Id
+ default:
+ return fmt.Errorf("Unknown user type: %s", u.Domain)
+ }
+ var ret string // PUT returns an empty string. We ignore it.
+ err := c.parsePutURLResponse(target, params, &ret)
+ return err
+}
+
+func (c *Client) GetRolesAll() ([]RoleDescription, error) {
+ ret := make([]RoleDescription, 0, 32)
+ err := c.parseURLResponse("/settings/rbac/roles", &ret)
+ if err != nil {
+ return nil, err
+ }
+ return ret, nil
+}
diff --git a/vendor/github.com/couchbase/go-couchbase/util.go b/vendor/github.com/couchbase/go-couchbase/util.go
new file mode 100644
index 0000000000..4d286a3271
--- /dev/null
+++ b/vendor/github.com/couchbase/go-couchbase/util.go
@@ -0,0 +1,49 @@
+package couchbase
+
+import (
+ "fmt"
+ "net/url"
+ "strings"
+)
+
+// CleanupHost returns the hostname with the given suffix removed.
+func CleanupHost(h, commonSuffix string) string {
+ if strings.HasSuffix(h, commonSuffix) {
+ return h[:len(h)-len(commonSuffix)]
+ }
+ return h
+}
+
+// FindCommonSuffix returns the longest common suffix from the given
+// strings.
+func FindCommonSuffix(input []string) string {
+ rv := ""
+ if len(input) < 2 {
+ return ""
+ }
+ from := input
+ for i := len(input[0]); i > 0; i-- {
+ common := true
+ suffix := input[0][i:]
+ for _, s := range from {
+ if !strings.HasSuffix(s, suffix) {
+ common = false
+ break
+ }
+ }
+ if common {
+ rv = suffix
+ }
+ }
+ return rv
+}
+
+// ParseURL is a wrapper around url.Parse with some sanity-checking
+func ParseURL(urlStr string) (result *url.URL, err error) {
+ result, err = url.Parse(urlStr)
+ if result != nil && result.Scheme == "" {
+ result = nil
+ err = fmt.Errorf("invalid URL <%s>", urlStr)
+ }
+ return
+}
diff --git a/vendor/github.com/couchbase/go-couchbase/vbmap.go b/vendor/github.com/couchbase/go-couchbase/vbmap.go
new file mode 100644
index 0000000000..b96a18ed57
--- /dev/null
+++ b/vendor/github.com/couchbase/go-couchbase/vbmap.go
@@ -0,0 +1,77 @@
+package couchbase
+
+var crc32tab = []uint32{
+ 0x00000000, 0x77073096, 0xee0e612c, 0x990951ba,
+ 0x076dc419, 0x706af48f, 0xe963a535, 0x9e6495a3,
+ 0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988,
+ 0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91,
+ 0x1db71064, 0x6ab020f2, 0xf3b97148, 0x84be41de,
+ 0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7,
+ 0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec,
+ 0x14015c4f, 0x63066cd9, 0xfa0f3d63, 0x8d080df5,
+ 0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172,
+ 0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b,
+ 0x35b5a8fa, 0x42b2986c, 0xdbbbc9d6, 0xacbcf940,
+ 0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59,
+ 0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116,
+ 0x21b4f4b5, 0x56b3c423, 0xcfba9599, 0xb8bda50f,
+ 0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924,
+ 0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d,
+ 0x76dc4190, 0x01db7106, 0x98d220bc, 0xefd5102a,
+ 0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433,
+ 0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818,
+ 0x7f6a0dbb, 0x086d3d2d, 0x91646c97, 0xe6635c01,
+ 0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e,
+ 0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457,
+ 0x65b0d9c6, 0x12b7e950, 0x8bbeb8ea, 0xfcb9887c,
+ 0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65,
+ 0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2,
+ 0x4adfa541, 0x3dd895d7, 0xa4d1c46d, 0xd3d6f4fb,
+ 0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0,
+ 0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9,
+ 0x5005713c, 0x270241aa, 0xbe0b1010, 0xc90c2086,
+ 0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f,
+ 0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4,
+ 0x59b33d17, 0x2eb40d81, 0xb7bd5c3b, 0xc0ba6cad,
+ 0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a,
+ 0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683,
+ 0xe3630b12, 0x94643b84, 0x0d6d6a3e, 0x7a6a5aa8,
+ 0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1,
+ 0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe,
+ 0xf762575d, 0x806567cb, 0x196c3671, 0x6e6b06e7,
+ 0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc,
+ 0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5,
+ 0xd6d6a3e8, 0xa1d1937e, 0x38d8c2c4, 0x4fdff252,
+ 0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b,
+ 0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60,
+ 0xdf60efc3, 0xa867df55, 0x316e8eef, 0x4669be79,
+ 0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236,
+ 0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f,
+ 0xc5ba3bbe, 0xb2bd0b28, 0x2bb45a92, 0x5cb36a04,
+ 0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d,
+ 0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a,
+ 0x9c0906a9, 0xeb0e363f, 0x72076785, 0x05005713,
+ 0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38,
+ 0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21,
+ 0x86d3d2d4, 0xf1d4e242, 0x68ddb3f8, 0x1fda836e,
+ 0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777,
+ 0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c,
+ 0x8f659eff, 0xf862ae69, 0x616bffd3, 0x166ccf45,
+ 0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2,
+ 0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db,
+ 0xaed16a4a, 0xd9d65adc, 0x40df0b66, 0x37d83bf0,
+ 0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9,
+ 0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6,
+ 0xbad03605, 0xcdd70693, 0x54de5729, 0x23d967bf,
+ 0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94,
+ 0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d}
+
+// VBHash finds the vbucket for the given key.
+func (b *Bucket) VBHash(key string) uint32 {
+ crc := uint32(0xffffffff)
+ for x := 0; x < len(key); x++ {
+ crc = (crc >> 8) ^ crc32tab[(uint64(crc)^uint64(key[x]))&0xff]
+ }
+ vbm := b.VBServerMap()
+ return ((^crc) >> 16) & 0x7fff & (uint32(len(vbm.VBucketMap)) - 1)
+}
diff --git a/vendor/github.com/couchbase/go-couchbase/views.go b/vendor/github.com/couchbase/go-couchbase/views.go
new file mode 100644
index 0000000000..2f68642f5a
--- /dev/null
+++ b/vendor/github.com/couchbase/go-couchbase/views.go
@@ -0,0 +1,231 @@
+package couchbase
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "math/rand"
+ "net/http"
+ "net/url"
+ "time"
+)
+
+// ViewRow represents a single result from a view.
+//
+// Doc is present only if include_docs was set on the request.
+type ViewRow struct {
+ ID string
+ Key interface{}
+ Value interface{}
+ Doc *interface{}
+}
+
+// A ViewError is a node-specific error indicating a partial failure
+// within a view result.
+type ViewError struct {
+ From string
+ Reason string
+}
+
+func (ve ViewError) Error() string {
+ return "Node: " + ve.From + ", reason: " + ve.Reason
+}
+
+// ViewResult holds the entire result set from a view request,
+// including the rows and the errors.
+type ViewResult struct {
+ TotalRows int `json:"total_rows"`
+ Rows []ViewRow
+ Errors []ViewError
+}
+
+func (b *Bucket) randomBaseURL() (*url.URL, error) {
+ nodes := b.HealthyNodes()
+ if len(nodes) == 0 {
+ return nil, errors.New("no available couch rest URLs")
+ }
+ nodeNo := rand.Intn(len(nodes))
+ node := nodes[nodeNo]
+
+ b.RLock()
+ name := b.Name
+ pool := b.pool
+ b.RUnlock()
+
+ u, err := ParseURL(node.CouchAPIBase)
+ if err != nil {
+ return nil, fmt.Errorf("config error: Bucket %q node #%d CouchAPIBase=%q: %v",
+ name, nodeNo, node.CouchAPIBase, err)
+ } else if pool != nil {
+ u.User = pool.client.BaseURL.User
+ }
+ return u, err
+}
+
+const START_NODE_ID = -1
+
+func (b *Bucket) randomNextURL(lastNode int) (*url.URL, int, error) {
+ nodes := b.HealthyNodes()
+ if len(nodes) == 0 {
+ return nil, -1, errors.New("no available couch rest URLs")
+ }
+
+ var nodeNo int
+ if lastNode == START_NODE_ID || lastNode >= len(nodes) {
+ // randomly select a node if the value of lastNode is invalid
+ nodeNo = rand.Intn(len(nodes))
+ } else {
+ // wrap around the node list
+ nodeNo = (lastNode + 1) % len(nodes)
+ }
+
+ b.RLock()
+ name := b.Name
+ pool := b.pool
+ b.RUnlock()
+
+ node := nodes[nodeNo]
+ u, err := ParseURL(node.CouchAPIBase)
+ if err != nil {
+ return nil, -1, fmt.Errorf("config error: Bucket %q node #%d CouchAPIBase=%q: %v",
+ name, nodeNo, node.CouchAPIBase, err)
+ } else if pool != nil {
+ u.User = pool.client.BaseURL.User
+ }
+ return u, nodeNo, err
+}
+
+// DocID is the document ID type for the startkey_docid parameter in
+// views.
+type DocID string
+
+func qParam(k, v string) string {
+ format := `"%s"`
+ switch k {
+ case "startkey_docid", "endkey_docid", "stale":
+ format = "%s"
+ }
+ return fmt.Sprintf(format, v)
+}
+
+// ViewURL constructs a URL for a view with the given ddoc, view name,
+// and parameters.
+func (b *Bucket) ViewURL(ddoc, name string,
+ params map[string]interface{}) (string, error) {
+ u, err := b.randomBaseURL()
+ if err != nil {
+ return "", err
+ }
+
+ values := url.Values{}
+ for k, v := range params {
+ switch t := v.(type) {
+ case DocID:
+ values[k] = []string{string(t)}
+ case string:
+ values[k] = []string{qParam(k, t)}
+ case int:
+ values[k] = []string{fmt.Sprintf(`%d`, t)}
+ case bool:
+ values[k] = []string{fmt.Sprintf(`%v`, t)}
+ default:
+ b, err := json.Marshal(v)
+ if err != nil {
+ return "", fmt.Errorf("unsupported value-type %T in Query, "+
+ "json encoder said %v", t, err)
+ }
+ values[k] = []string{fmt.Sprintf(`%v`, string(b))}
+ }
+ }
+
+ if ddoc == "" && name == "_all_docs" {
+ u.Path = fmt.Sprintf("/%s/_all_docs", b.GetName())
+ } else {
+ u.Path = fmt.Sprintf("/%s/_design/%s/_view/%s", b.GetName(), ddoc, name)
+ }
+ u.RawQuery = values.Encode()
+
+ return u.String(), nil
+}
+
+// ViewCallback is called for each view invocation.
+var ViewCallback func(ddoc, name string, start time.Time, err error)
+
+// ViewCustom performs a view request that can map row values to a
+// custom type.
+//
+// See the source to View for an example usage.
+func (b *Bucket) ViewCustom(ddoc, name string, params map[string]interface{},
+ vres interface{}) (err error) {
+ if SlowServerCallWarningThreshold > 0 {
+ defer slowLog(time.Now(), "call to ViewCustom(%q, %q)", ddoc, name)
+ }
+
+ if ViewCallback != nil {
+ defer func(t time.Time) { ViewCallback(ddoc, name, t, err) }(time.Now())
+ }
+
+ u, err := b.ViewURL(ddoc, name, params)
+ if err != nil {
+ return err
+ }
+
+ req, err := http.NewRequest("GET", u, nil)
+ if err != nil {
+ return err
+ }
+
+ ah := b.authHandler(false /* bucket not yet locked */)
+ maybeAddAuth(req, ah)
+
+ res, err := doHTTPRequest(req)
+ if err != nil {
+ return fmt.Errorf("error starting view req at %v: %v", u, err)
+ }
+ defer res.Body.Close()
+
+ if res.StatusCode != 200 {
+ bod := make([]byte, 512)
+ l, _ := res.Body.Read(bod)
+ return fmt.Errorf("error executing view req at %v: %v - %s",
+ u, res.Status, bod[:l])
+ }
+
+ body, err := ioutil.ReadAll(res.Body)
+ if err := json.Unmarshal(body, vres); err != nil {
+ return nil
+ }
+
+ return nil
+}
+
+// View executes a view.
+//
+// The ddoc parameter is just the bare name of your design doc without
+// the "_design/" prefix.
+//
+// Parameters are string keys with values that correspond to couchbase
+// view parameters. Primitive should work fairly naturally (booleans,
+// ints, strings, etc...) and other values will attempt to be JSON
+// marshaled (useful for array indexing on on view keys, for example).
+//
+// Example:
+//
+// res, err := couchbase.View("myddoc", "myview", map[string]interface{}{
+// "group_level": 2,
+// "startkey_docid": []interface{}{"thing"},
+// "endkey_docid": []interface{}{"thing", map[string]string{}},
+// "stale": false,
+// })
+func (b *Bucket) View(ddoc, name string, params map[string]interface{}) (ViewResult, error) {
+ vres := ViewResult{}
+
+ if err := b.ViewCustom(ddoc, name, params, &vres); err != nil {
+ //error in accessing views. Retry once after a bucket refresh
+ b.Refresh()
+ return vres, b.ViewCustom(ddoc, name, params, &vres)
+ } else {
+ return vres, nil
+ }
+}