summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/couchbaselabs
diff options
context:
space:
mode:
author6543 <6543@obermui.de>2020-11-03 07:04:09 +0100
committerGitHub <noreply@github.com>2020-11-03 08:04:09 +0200
commit70ea2300ca24311f033d85b41e938e86b1d50acd (patch)
treeeb25c089d5def4df2c036ab2820df7c895798572 /vendor/github.com/couchbaselabs
parentb687707014b31d0f388d1dfb60c09b5dcd48fc4c (diff)
downloadgitea-70ea2300ca24311f033d85b41e938e86b1d50acd.tar.gz
gitea-70ea2300ca24311f033d85b41e938e86b1d50acd.zip
[Vendor] update macaron related (#13409)
* Vendor: update gitea.com/macaron/session to a177a270 * make vendor * Vendor: update gitea.com/macaron/macaron to 0db5d458 * make vendor * Vendor: update gitea.com/macaron/cache to 905232fb * make vendor * Vendor: update gitea.com/macaron/i18n to 4ca3dd0c * make vendor * Vendor: update gitea.com/macaron/gzip to efa5e847 * make vendor * Vendor: update gitea.com/macaron/captcha to e8597820 * make vendor
Diffstat (limited to 'vendor/github.com/couchbaselabs')
-rw-r--r--vendor/github.com/couchbaselabs/go-couchbase/.gitignore14
-rw-r--r--vendor/github.com/couchbaselabs/go-couchbase/.travis.yml5
-rw-r--r--vendor/github.com/couchbaselabs/go-couchbase/LICENSE19
-rw-r--r--vendor/github.com/couchbaselabs/go-couchbase/README.markdown37
-rw-r--r--vendor/github.com/couchbaselabs/go-couchbase/audit.go32
-rw-r--r--vendor/github.com/couchbaselabs/go-couchbase/client.go1477
-rw-r--r--vendor/github.com/couchbaselabs/go-couchbase/conn_pool.go410
-rw-r--r--vendor/github.com/couchbaselabs/go-couchbase/ddocs.go288
-rw-r--r--vendor/github.com/couchbaselabs/go-couchbase/observe.go300
-rw-r--r--vendor/github.com/couchbaselabs/go-couchbase/pools.go1474
-rw-r--r--vendor/github.com/couchbaselabs/go-couchbase/port_map.go84
-rw-r--r--vendor/github.com/couchbaselabs/go-couchbase/streaming.go215
-rw-r--r--vendor/github.com/couchbaselabs/go-couchbase/tap.go143
-rw-r--r--vendor/github.com/couchbaselabs/go-couchbase/upr.go398
-rw-r--r--vendor/github.com/couchbaselabs/go-couchbase/users.go119
-rw-r--r--vendor/github.com/couchbaselabs/go-couchbase/util.go49
-rw-r--r--vendor/github.com/couchbaselabs/go-couchbase/vbmap.go77
-rw-r--r--vendor/github.com/couchbaselabs/go-couchbase/views.go231
18 files changed, 0 insertions, 5372 deletions
diff --git a/vendor/github.com/couchbaselabs/go-couchbase/.gitignore b/vendor/github.com/couchbaselabs/go-couchbase/.gitignore
deleted file mode 100644
index eda885ce8d..0000000000
--- a/vendor/github.com/couchbaselabs/go-couchbase/.gitignore
+++ /dev/null
@@ -1,14 +0,0 @@
-#*
-*.6
-*.a
-*~
-*.swp
-/examples/basic/basic
-/hello/hello
-/populate/populate
-/tools/view2go/view2go
-/tools/loadfile/loadfile
-gotags.files
-TAGS
-6.out
-_*
diff --git a/vendor/github.com/couchbaselabs/go-couchbase/.travis.yml b/vendor/github.com/couchbaselabs/go-couchbase/.travis.yml
deleted file mode 100644
index 4ecafb1894..0000000000
--- a/vendor/github.com/couchbaselabs/go-couchbase/.travis.yml
+++ /dev/null
@@ -1,5 +0,0 @@
-language: go
-install: go get -v -d ./... && go build -v ./...
-script: go test -v ./...
-
-go: 1.1.1
diff --git a/vendor/github.com/couchbaselabs/go-couchbase/LICENSE b/vendor/github.com/couchbaselabs/go-couchbase/LICENSE
deleted file mode 100644
index 0b23ef358e..0000000000
--- a/vendor/github.com/couchbaselabs/go-couchbase/LICENSE
+++ /dev/null
@@ -1,19 +0,0 @@
-Copyright (c) 2013 Couchbase, Inc.
-
-Permission is hereby granted, free of charge, to any person obtaining a copy of
-this software and associated documentation files (the "Software"), to deal in
-the Software without restriction, including without limitation the rights to
-use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
-of the Software, and to permit persons to whom the Software is furnished to do
-so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in all
-copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-SOFTWARE.
diff --git a/vendor/github.com/couchbaselabs/go-couchbase/README.markdown b/vendor/github.com/couchbaselabs/go-couchbase/README.markdown
deleted file mode 100644
index bf5fe49421..0000000000
--- a/vendor/github.com/couchbaselabs/go-couchbase/README.markdown
+++ /dev/null
@@ -1,37 +0,0 @@
-# A smart client for couchbase in go
-
-This is a *unoffical* version of a Couchbase Golang client. If you are
-looking for the *Offical* Couchbase Golang client please see
- [CB-go])[https://github.com/couchbaselabs/gocb].
-
-This is an evolving package, but does provide a useful interface to a
-[couchbase](http://www.couchbase.com/) server including all of the
-pool/bucket discovery features, compatible key distribution with other
-clients, and vbucket motion awareness so application can continue to
-operate during rebalances.
-
-It also supports view querying with source node randomization so you
-don't bang on all one node to do all the work.
-
-## Install
-
- go get github.com/couchbase/go-couchbase
-
-## Example
-
- c, err := couchbase.Connect("http://dev-couchbase.example.com:8091/")
- if err != nil {
- log.Fatalf("Error connecting: %v", err)
- }
-
- pool, err := c.GetPool("default")
- if err != nil {
- log.Fatalf("Error getting pool: %v", err)
- }
-
- bucket, err := pool.GetBucket("default")
- if err != nil {
- log.Fatalf("Error getting bucket: %v", err)
- }
-
- bucket.Set("someKey", 0, []string{"an", "example", "list"})
diff --git a/vendor/github.com/couchbaselabs/go-couchbase/audit.go b/vendor/github.com/couchbaselabs/go-couchbase/audit.go
deleted file mode 100644
index 3db7d9f9ff..0000000000
--- a/vendor/github.com/couchbaselabs/go-couchbase/audit.go
+++ /dev/null
@@ -1,32 +0,0 @@
-package couchbase
-
-import ()
-
-// Sample data:
-// {"disabled":["12333", "22244"],"uid":"132492431","auditdEnabled":true,
-// "disabledUsers":[{"name":"bill","domain":"local"},{"name":"bob","domain":"local"}],
-// "logPath":"/Users/johanlarson/Library/Application Support/Couchbase/var/lib/couchbase/logs",
-// "rotateInterval":86400,"rotateSize":20971520}
-type AuditSpec struct {
- Disabled []uint32 `json:"disabled"`
- Uid string `json:"uid"`
- AuditdEnabled bool `json:"auditdEnabled`
- DisabledUsers []AuditUser `json:"disabledUsers"`
- LogPath string `json:"logPath"`
- RotateInterval int64 `json:"rotateInterval"`
- RotateSize int64 `json:"rotateSize"`
-}
-
-type AuditUser struct {
- Name string `json:"name"`
- Domain string `json:"domain"`
-}
-
-func (c *Client) GetAuditSpec() (*AuditSpec, error) {
- ret := &AuditSpec{}
- err := c.parseURLResponse("/settings/audit", ret)
- if err != nil {
- return nil, err
- }
- return ret, nil
-}
diff --git a/vendor/github.com/couchbaselabs/go-couchbase/client.go b/vendor/github.com/couchbaselabs/go-couchbase/client.go
deleted file mode 100644
index 433b08ff02..0000000000
--- a/vendor/github.com/couchbaselabs/go-couchbase/client.go
+++ /dev/null
@@ -1,1477 +0,0 @@
-/*
-Package couchbase provides a smart client for go.
-
-Usage:
-
- client, err := couchbase.Connect("http://myserver:8091/")
- handleError(err)
- pool, err := client.GetPool("default")
- handleError(err)
- bucket, err := pool.GetBucket("MyAwesomeBucket")
- handleError(err)
- ...
-
-or a shortcut for the bucket directly
-
- bucket, err := couchbase.GetBucket("http://myserver:8091/", "default", "default")
-
-in any case, you can specify authentication credentials using
-standard URL userinfo syntax:
-
- b, err := couchbase.GetBucket("http://bucketname:bucketpass@myserver:8091/",
- "default", "bucket")
-*/
-package couchbase
-
-import (
- "encoding/binary"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "runtime"
- "strconv"
- "strings"
- "sync"
- "time"
- "unsafe"
-
- "github.com/couchbase/gomemcached"
- "github.com/couchbase/gomemcached/client" // package name is 'memcached'
- "github.com/couchbase/goutils/logging"
-)
-
-// Mutation Token
-type MutationToken struct {
- VBid uint16 // vbucket id
- Guard uint64 // vbuuid
- Value uint64 // sequence number
-}
-
-// Maximum number of times to retry a chunk of a bulk get on error.
-var MaxBulkRetries = 5000
-var backOffDuration time.Duration = 100 * time.Millisecond
-var MaxBackOffRetries = 25 // exponentail backOff result in over 30sec (25*13*0.1s)
-
-// If this is set to a nonzero duration, Do() and ViewCustom() will log a warning if the call
-// takes longer than that.
-var SlowServerCallWarningThreshold time.Duration
-
-func slowLog(startTime time.Time, format string, args ...interface{}) {
- if elapsed := time.Now().Sub(startTime); elapsed > SlowServerCallWarningThreshold {
- pc, _, _, _ := runtime.Caller(2)
- caller := runtime.FuncForPC(pc).Name()
- logging.Infof("go-couchbase: "+format+" in "+caller+" took "+elapsed.String(), args...)
- }
-}
-
-// Return true if error is KEY_ENOENT. Required by cbq-engine
-func IsKeyEExistsError(err error) bool {
-
- res, ok := err.(*gomemcached.MCResponse)
- if ok && res.Status == gomemcached.KEY_EEXISTS {
- return true
- }
-
- return false
-}
-
-// Return true if error is KEY_ENOENT. Required by cbq-engine
-func IsKeyNoEntError(err error) bool {
-
- res, ok := err.(*gomemcached.MCResponse)
- if ok && res.Status == gomemcached.KEY_ENOENT {
- return true
- }
-
- return false
-}
-
-// Return true if error suggests a bucket refresh is required. Required by cbq-engine
-func IsRefreshRequired(err error) bool {
-
- res, ok := err.(*gomemcached.MCResponse)
- if ok && (res.Status == gomemcached.NO_BUCKET || res.Status == gomemcached.NOT_MY_VBUCKET) {
- return true
- }
-
- return false
-}
-
-// ClientOpCallback is called for each invocation of Do.
-var ClientOpCallback func(opname, k string, start time.Time, err error)
-
-// Do executes a function on a memcached connection to the node owning key "k"
-//
-// Note that this automatically handles transient errors by replaying
-// your function on a "not-my-vbucket" error, so don't assume
-// your command will only be executed only once.
-func (b *Bucket) Do(k string, f func(mc *memcached.Client, vb uint16) error) (err error) {
- return b.Do2(k, f, true)
-}
-
-func (b *Bucket) Do2(k string, f func(mc *memcached.Client, vb uint16) error, deadline bool) (err error) {
- if SlowServerCallWarningThreshold > 0 {
- defer slowLog(time.Now(), "call to Do(%q)", k)
- }
-
- vb := b.VBHash(k)
- maxTries := len(b.Nodes()) * 2
- for i := 0; i < maxTries; i++ {
- conn, pool, err := b.getConnectionToVBucket(vb)
- if err != nil {
- if isConnError(err) && backOff(i, maxTries, backOffDuration, true) {
- b.Refresh()
- continue
- }
- return err
- }
-
- if deadline && DefaultTimeout > 0 {
- conn.SetDeadline(getDeadline(noDeadline, DefaultTimeout))
- err = f(conn, uint16(vb))
- conn.SetDeadline(noDeadline)
- } else {
- err = f(conn, uint16(vb))
- }
-
- var retry bool
- discard := isOutOfBoundsError(err)
-
- // MB-30967 / MB-31001 implement back off for transient errors
- if resp, ok := err.(*gomemcached.MCResponse); ok {
- switch resp.Status {
- case gomemcached.NOT_MY_VBUCKET:
- b.Refresh()
- // MB-28842: in case of NMVB, check if the node is still part of the map
- // and ditch the connection if it isn't.
- discard = b.checkVBmap(pool.Node())
- retry = true
- case gomemcached.NOT_SUPPORTED:
- discard = true
- retry = true
- case gomemcached.ENOMEM:
- fallthrough
- case gomemcached.TMPFAIL:
- retry = backOff(i, maxTries, backOffDuration, true)
- default:
- retry = false
- }
- } else if err != nil && isConnError(err) && backOff(i, maxTries, backOffDuration, true) {
- retry = true
- }
-
- if discard {
- pool.Discard(conn)
- } else {
- pool.Return(conn)
- }
-
- if !retry {
- return err
- }
- }
-
- return fmt.Errorf("unable to complete action after %v attemps", maxTries)
-}
-
-type GatheredStats struct {
- Server string
- Stats map[string]string
- Err error
-}
-
-func getStatsParallel(sn string, b *Bucket, offset int, which string,
- ch chan<- GatheredStats) {
- pool := b.getConnPool(offset)
- var gatheredStats GatheredStats
-
- conn, err := pool.Get()
- defer func() {
- pool.Return(conn)
- ch <- gatheredStats
- }()
-
- if err != nil {
- gatheredStats = GatheredStats{Server: sn, Err: err}
- } else {
- sm, err := conn.StatsMap(which)
- gatheredStats = GatheredStats{Server: sn, Stats: sm, Err: err}
- }
-}
-
-// GetStats gets a set of stats from all servers.
-//
-// Returns a map of server ID -> map of stat key to map value.
-func (b *Bucket) GetStats(which string) map[string]map[string]string {
- rv := map[string]map[string]string{}
- for server, gs := range b.GatherStats(which) {
- if len(gs.Stats) > 0 {
- rv[server] = gs.Stats
- }
- }
- return rv
-}
-
-// GatherStats returns a map of server ID -> GatheredStats from all servers.
-func (b *Bucket) GatherStats(which string) map[string]GatheredStats {
- vsm := b.VBServerMap()
- if vsm.ServerList == nil {
- return nil
- }
-
- // Go grab all the things at once.
- ch := make(chan GatheredStats, len(vsm.ServerList))
- for i, sn := range vsm.ServerList {
- go getStatsParallel(sn, b, i, which, ch)
- }
-
- // Gather the results
- rv := map[string]GatheredStats{}
- for range vsm.ServerList {
- gs := <-ch
- rv[gs.Server] = gs
- }
- return rv
-}
-
-// Get bucket count through the bucket stats
-func (b *Bucket) GetCount(refresh bool) (count int64, err error) {
- if refresh {
- b.Refresh()
- }
-
- var cnt int64
- for _, gs := range b.GatherStats("") {
- if len(gs.Stats) > 0 {
- cnt, err = strconv.ParseInt(gs.Stats["curr_items"], 10, 64)
- if err != nil {
- return 0, err
- }
- count += cnt
- }
- }
-
- return count, nil
-}
-
-// Get bucket document size through the bucket stats
-func (b *Bucket) GetSize(refresh bool) (size int64, err error) {
- if refresh {
- b.Refresh()
- }
-
- var sz int64
- for _, gs := range b.GatherStats("") {
- if len(gs.Stats) > 0 {
- sz, err = strconv.ParseInt(gs.Stats["ep_value_size"], 10, 64)
- if err != nil {
- return 0, err
- }
- size += sz
- }
- }
-
- return size, nil
-}
-
-func isAuthError(err error) bool {
- estr := err.Error()
- return strings.Contains(estr, "Auth failure")
-}
-
-func IsReadTimeOutError(err error) bool {
- estr := err.Error()
- return strings.Contains(estr, "read tcp") ||
- strings.Contains(estr, "i/o timeout")
-}
-
-func isTimeoutError(err error) bool {
- estr := err.Error()
- return strings.Contains(estr, "i/o timeout") ||
- strings.Contains(estr, "connection timed out") ||
- strings.Contains(estr, "no route to host")
-}
-
-// Errors that are not considered fatal for our fetch loop
-func isConnError(err error) bool {
- if err == io.EOF {
- return true
- }
- estr := err.Error()
- return strings.Contains(estr, "broken pipe") ||
- strings.Contains(estr, "connection reset") ||
- strings.Contains(estr, "connection refused") ||
- strings.Contains(estr, "connection pool is closed")
-}
-
-func isOutOfBoundsError(err error) bool {
- return err != nil && strings.Contains(err.Error(), "Out of Bounds error")
-
-}
-
-func getDeadline(reqDeadline time.Time, duration time.Duration) time.Time {
- if reqDeadline.IsZero() && duration > 0 {
- return time.Now().Add(duration)
- }
- return reqDeadline
-}
-
-func backOff(attempt, maxAttempts int, duration time.Duration, exponential bool) bool {
- if attempt < maxAttempts {
- // 0th attempt return immediately
- if attempt > 0 {
- if exponential {
- duration = time.Duration(attempt) * duration
- }
- time.Sleep(duration)
- }
- return true
- }
-
- return false
-}
-
-func (b *Bucket) doBulkGet(vb uint16, keys []string, reqDeadline time.Time,
- ch chan<- map[string]*gomemcached.MCResponse, ech chan<- error, subPaths []string,
- eStatus *errorStatus) {
- if SlowServerCallWarningThreshold > 0 {
- defer slowLog(time.Now(), "call to doBulkGet(%d, %d keys)", vb, len(keys))
- }
-
- rv := _STRING_MCRESPONSE_POOL.Get()
- attempts := 0
- backOffAttempts := 0
- done := false
- bname := b.Name
- for ; attempts < MaxBulkRetries && !done && !eStatus.errStatus; attempts++ {
-
- if len(b.VBServerMap().VBucketMap) < int(vb) {
- //fatal
- err := fmt.Errorf("vbmap smaller than requested for %v", bname)
- logging.Errorf("go-couchbase: %v vb %d vbmap len %d", err.Error(), vb, len(b.VBServerMap().VBucketMap))
- ech <- err
- return
- }
-
- masterID := b.VBServerMap().VBucketMap[vb][0]
-
- if masterID < 0 {
- // fatal
- err := fmt.Errorf("No master node available for %v vb %d", bname, vb)
- logging.Errorf("%v", err.Error())
- ech <- err
- return
- }
-
- // This stack frame exists to ensure we can clean up
- // connection at a reasonable time.
- err := func() error {
- pool := b.getConnPool(masterID)
- conn, err := pool.Get()
- if err != nil {
- if isAuthError(err) || isTimeoutError(err) {
- logging.Errorf("Fatal Error %v : %v", bname, err)
- ech <- err
- return err
- } else if isConnError(err) {
- if !backOff(backOffAttempts, MaxBackOffRetries, backOffDuration, true) {
- logging.Errorf("Connection Error %v : %v", bname, err)
- ech <- err
- return err
- }
- b.Refresh()
- backOffAttempts++
- }
- logging.Infof("Pool Get returned %v: %v", bname, err)
- // retry
- return nil
- }
-
- conn.SetDeadline(getDeadline(reqDeadline, DefaultTimeout))
- err = conn.GetBulk(vb, keys, rv, subPaths)
- conn.SetDeadline(noDeadline)
-
- discard := false
- defer func() {
- if discard {
- pool.Discard(conn)
- } else {
- pool.Return(conn)
- }
- }()
-
- switch err.(type) {
- case *gomemcached.MCResponse:
- notSMaxTries := len(b.Nodes()) * 2
- st := err.(*gomemcached.MCResponse).Status
- if st == gomemcached.NOT_MY_VBUCKET || (st == gomemcached.NOT_SUPPORTED && attempts < notSMaxTries) {
- b.Refresh()
- discard = b.checkVBmap(pool.Node())
- return nil // retry
- } else if st == gomemcached.EBUSY || st == gomemcached.LOCKED {
- if (attempts % (MaxBulkRetries / 100)) == 0 {
- logging.Infof("Retrying Memcached error (%v) FOR %v(vbid:%d, keys:<ud>%v</ud>)",
- err.Error(), bname, vb, keys)
- }
- return nil // retry
- } else if (st == gomemcached.ENOMEM || st == gomemcached.TMPFAIL) && backOff(backOffAttempts, MaxBackOffRetries, backOffDuration, true) {
- // MB-30967 / MB-31001 use backoff for TMPFAIL too
- backOffAttempts++
- logging.Infof("Retrying Memcached error (%v) FOR %v(vbid:%d, keys:<ud>%v</ud>)",
- err.Error(), bname, vb, keys)
- return nil // retry
- }
- ech <- err
- return err
- case error:
- if isOutOfBoundsError(err) {
- // We got an out of bound error, retry the operation
- discard = true
- return nil
- } else if isConnError(err) && backOff(backOffAttempts, MaxBackOffRetries, backOffDuration, true) {
- backOffAttempts++
- logging.Errorf("Connection Error: %s. Refreshing bucket %v (vbid:%v,keys:<ud>%v</ud>)",
- err.Error(), bname, vb, keys)
- discard = true
- b.Refresh()
- return nil // retry
- }
- ech <- err
- ch <- rv
- return err
- }
-
- done = true
- return nil
- }()
-
- if err != nil {
- return
- }
- }
-
- if attempts >= MaxBulkRetries {
- err := fmt.Errorf("bulkget exceeded MaxBulkRetries for %v(vbid:%d,keys:<ud>%v</ud>)", bname, vb, keys)
- logging.Errorf("%v", err.Error())
- ech <- err
- }
-
- ch <- rv
-}
-
-type errorStatus struct {
- errStatus bool
-}
-
-type vbBulkGet struct {
- b *Bucket
- ch chan<- map[string]*gomemcached.MCResponse
- ech chan<- error
- k uint16
- keys []string
- reqDeadline time.Time
- wg *sync.WaitGroup
- subPaths []string
- groupError *errorStatus
-}
-
-const _NUM_CHANNELS = 5
-
-var _NUM_CHANNEL_WORKERS = (runtime.NumCPU() + 1) / 2
-var DefaultDialTimeout = time.Duration(0)
-var DefaultTimeout = time.Duration(0)
-var noDeadline = time.Time{}
-
-// Buffer 4k requests per worker
-var _VB_BULK_GET_CHANNELS []chan *vbBulkGet
-
-func InitBulkGet() {
-
- DefaultDialTimeout = 20 * time.Second
- DefaultTimeout = 120 * time.Second
-
- memcached.SetDefaultDialTimeout(DefaultDialTimeout)
-
- _VB_BULK_GET_CHANNELS = make([]chan *vbBulkGet, _NUM_CHANNELS)
-
- for i := 0; i < _NUM_CHANNELS; i++ {
- channel := make(chan *vbBulkGet, 16*1024*_NUM_CHANNEL_WORKERS)
- _VB_BULK_GET_CHANNELS[i] = channel
-
- for j := 0; j < _NUM_CHANNEL_WORKERS; j++ {
- go vbBulkGetWorker(channel)
- }
- }
-}
-
-func vbBulkGetWorker(ch chan *vbBulkGet) {
- defer func() {
- // Workers cannot panic and die
- recover()
- go vbBulkGetWorker(ch)
- }()
-
- for vbg := range ch {
- vbDoBulkGet(vbg)
- }
-}
-
-func vbDoBulkGet(vbg *vbBulkGet) {
- defer vbg.wg.Done()
- defer func() {
- // Workers cannot panic and die
- recover()
- }()
- vbg.b.doBulkGet(vbg.k, vbg.keys, vbg.reqDeadline, vbg.ch, vbg.ech, vbg.subPaths, vbg.groupError)
-}
-
-var _ERR_CHAN_FULL = fmt.Errorf("Data request queue full, aborting query.")
-
-func (b *Bucket) processBulkGet(kdm map[uint16][]string, reqDeadline time.Time,
- ch chan<- map[string]*gomemcached.MCResponse, ech chan<- error, subPaths []string,
- eStatus *errorStatus) {
-
- defer close(ch)
- defer close(ech)
-
- wg := &sync.WaitGroup{}
-
- for k, keys := range kdm {
-
- // GetBulk() group has error donot Queue items for this group
- if eStatus.errStatus {
- break
- }
-
- vbg := &vbBulkGet{
- b: b,
- ch: ch,
- ech: ech,
- k: k,
- keys: keys,
- reqDeadline: reqDeadline,
- wg: wg,
- subPaths: subPaths,
- groupError: eStatus,
- }
-
- wg.Add(1)
-
- // Random int
- // Right shift to avoid 8-byte alignment, and take low bits
- c := (uintptr(unsafe.Pointer(vbg)) >> 4) % _NUM_CHANNELS
-
- select {
- case _VB_BULK_GET_CHANNELS[c] <- vbg:
- // No-op
- default:
- // Buffer full, abandon the bulk get
- ech <- _ERR_CHAN_FULL
- wg.Add(-1)
- }
- }
-
- // Wait for my vb bulk gets
- wg.Wait()
-}
-
-type multiError []error
-
-func (m multiError) Error() string {
- if len(m) == 0 {
- panic("Error of none")
- }
-
- return fmt.Sprintf("{%v errors, starting with %v}", len(m), m[0].Error())
-}
-
-// Convert a stream of errors from ech into a multiError (or nil) and
-// send down eout.
-//
-// At least one send is guaranteed on eout, but two is possible, so
-// buffer the out channel appropriately.
-func errorCollector(ech <-chan error, eout chan<- error, eStatus *errorStatus) {
- defer func() { eout <- nil }()
- var errs multiError
- for e := range ech {
- if !eStatus.errStatus && !IsKeyNoEntError(e) {
- eStatus.errStatus = true
- }
-
- errs = append(errs, e)
- }
-
- if len(errs) > 0 {
- eout <- errs
- }
-}
-
-// Fetches multiple keys concurrently, with []byte values
-//
-// This is a wrapper around GetBulk which converts all values returned
-// by GetBulk from raw memcached responses into []byte slices.
-// Returns one document for duplicate keys
-func (b *Bucket) GetBulkRaw(keys []string) (map[string][]byte, error) {
-
- resp, eout := b.getBulk(keys, noDeadline, nil)
-
- rv := make(map[string][]byte, len(keys))
- for k, av := range resp {
- rv[k] = av.Body
- }
-
- b.ReleaseGetBulkPools(resp)
- return rv, eout
-
-}
-
-// GetBulk fetches multiple keys concurrently.
-//
-// Unlike more convenient GETs, the entire response is returned in the
-// map array for each key. Keys that were not found will not be included in
-// the map.
-
-func (b *Bucket) GetBulk(keys []string, reqDeadline time.Time, subPaths []string) (map[string]*gomemcached.MCResponse, error) {
- return b.getBulk(keys, reqDeadline, subPaths)
-}
-
-func (b *Bucket) ReleaseGetBulkPools(rv map[string]*gomemcached.MCResponse) {
- _STRING_MCRESPONSE_POOL.Put(rv)
-}
-
-func (b *Bucket) getBulk(keys []string, reqDeadline time.Time, subPaths []string) (map[string]*gomemcached.MCResponse, error) {
- kdm := _VB_STRING_POOL.Get()
- defer _VB_STRING_POOL.Put(kdm)
- for _, k := range keys {
- if k != "" {
- vb := uint16(b.VBHash(k))
- a, ok1 := kdm[vb]
- if !ok1 {
- a = _STRING_POOL.Get()
- }
- kdm[vb] = append(a, k)
- }
- }
-
- eout := make(chan error, 2)
- groupErrorStatus := &errorStatus{}
-
- // processBulkGet will own both of these channels and
- // guarantee they're closed before it returns.
- ch := make(chan map[string]*gomemcached.MCResponse)
- ech := make(chan error)
-
- go errorCollector(ech, eout, groupErrorStatus)
- go b.processBulkGet(kdm, reqDeadline, ch, ech, subPaths, groupErrorStatus)
-
- var rv map[string]*gomemcached.MCResponse
-
- for m := range ch {
- if rv == nil {
- rv = m
- continue
- }
-
- for k, v := range m {
- rv[k] = v
- }
- _STRING_MCRESPONSE_POOL.Put(m)
- }
-
- return rv, <-eout
-}
-
-// WriteOptions is the set of option flags availble for the Write
-// method. They are ORed together to specify the desired request.
-type WriteOptions int
-
-const (
- // Raw specifies that the value is raw []byte or nil; don't
- // JSON-encode it.
- Raw = WriteOptions(1 << iota)
- // AddOnly indicates an item should only be written if it
- // doesn't exist, otherwise ErrKeyExists is returned.
- AddOnly
- // Persist causes the operation to block until the server
- // confirms the item is persisted.
- Persist
- // Indexable causes the operation to block until it's availble via the index.
- Indexable
- // Append indicates the given value should be appended to the
- // existing value for the given key.
- Append
-)
-
-var optNames = []struct {
- opt WriteOptions
- name string
-}{
- {Raw, "raw"},
- {AddOnly, "addonly"}, {Persist, "persist"},
- {Indexable, "indexable"}, {Append, "append"},
-}
-
-// String representation of WriteOptions
-func (w WriteOptions) String() string {
- f := []string{}
- for _, on := range optNames {
- if w&on.opt != 0 {
- f = append(f, on.name)
- w &= ^on.opt
- }
- }
- if len(f) == 0 || w != 0 {
- f = append(f, fmt.Sprintf("0x%x", int(w)))
- }
- return strings.Join(f, "|")
-}
-
-// Error returned from Write with AddOnly flag, when key already exists in the bucket.
-var ErrKeyExists = errors.New("key exists")
-
-// General-purpose value setter.
-//
-// The Set, Add and Delete methods are just wrappers around this. The
-// interpretation of `v` depends on whether the `Raw` option is
-// given. If it is, v must be a byte array or nil. (A nil value causes
-// a delete.) If `Raw` is not given, `v` will be marshaled as JSON
-// before being written. It must be JSON-marshalable and it must not
-// be nil.
-func (b *Bucket) Write(k string, flags, exp int, v interface{},
- opt WriteOptions) (err error) {
-
- if ClientOpCallback != nil {
- defer func(t time.Time) {
- ClientOpCallback(fmt.Sprintf("Write(%v)", opt), k, t, err)
- }(time.Now())
- }
-
- var data []byte
- if opt&Raw == 0 {
- data, err = json.Marshal(v)
- if err != nil {
- return err
- }
- } else if v != nil {
- data = v.([]byte)
- }
-
- var res *gomemcached.MCResponse
- err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
- if opt&AddOnly != 0 {
- res, err = memcached.UnwrapMemcachedError(
- mc.Add(vb, k, flags, exp, data))
- if err == nil && res.Status != gomemcached.SUCCESS {
- if res.Status == gomemcached.KEY_EEXISTS {
- err = ErrKeyExists
- } else {
- err = res
- }
- }
- } else if opt&Append != 0 {
- res, err = mc.Append(vb, k, data)
- } else if data == nil {
- res, err = mc.Del(vb, k)
- } else {
- res, err = mc.Set(vb, k, flags, exp, data)
- }
-
- return err
- })
-
- if err == nil && (opt&(Persist|Indexable) != 0) {
- err = b.WaitForPersistence(k, res.Cas, data == nil)
- }
-
- return err
-}
-
-func (b *Bucket) WriteWithMT(k string, flags, exp int, v interface{},
- opt WriteOptions) (mt *MutationToken, err error) {
-
- if ClientOpCallback != nil {
- defer func(t time.Time) {
- ClientOpCallback(fmt.Sprintf("WriteWithMT(%v)", opt), k, t, err)
- }(time.Now())
- }
-
- var data []byte
- if opt&Raw == 0 {
- data, err = json.Marshal(v)
- if err != nil {
- return nil, err
- }
- } else if v != nil {
- data = v.([]byte)
- }
-
- var res *gomemcached.MCResponse
- err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
- if opt&AddOnly != 0 {
- res, err = memcached.UnwrapMemcachedError(
- mc.Add(vb, k, flags, exp, data))
- if err == nil && res.Status != gomemcached.SUCCESS {
- if res.Status == gomemcached.KEY_EEXISTS {
- err = ErrKeyExists
- } else {
- err = res
- }
- }
- } else if opt&Append != 0 {
- res, err = mc.Append(vb, k, data)
- } else if data == nil {
- res, err = mc.Del(vb, k)
- } else {
- res, err = mc.Set(vb, k, flags, exp, data)
- }
-
- if len(res.Extras) >= 16 {
- vbuuid := uint64(binary.BigEndian.Uint64(res.Extras[0:8]))
- seqNo := uint64(binary.BigEndian.Uint64(res.Extras[8:16]))
- mt = &MutationToken{VBid: vb, Guard: vbuuid, Value: seqNo}
- }
-
- return err
- })
-
- if err == nil && (opt&(Persist|Indexable) != 0) {
- err = b.WaitForPersistence(k, res.Cas, data == nil)
- }
-
- return mt, err
-}
-
-// Set a value in this bucket with Cas and return the new Cas value
-func (b *Bucket) Cas(k string, exp int, cas uint64, v interface{}) (uint64, error) {
- return b.WriteCas(k, 0, exp, cas, v, 0)
-}
-
-// Set a value in this bucket with Cas without json encoding it
-func (b *Bucket) CasRaw(k string, exp int, cas uint64, v interface{}) (uint64, error) {
- return b.WriteCas(k, 0, exp, cas, v, Raw)
-}
-
-func (b *Bucket) WriteCas(k string, flags, exp int, cas uint64, v interface{},
- opt WriteOptions) (newCas uint64, err error) {
-
- if ClientOpCallback != nil {
- defer func(t time.Time) {
- ClientOpCallback(fmt.Sprintf("Write(%v)", opt), k, t, err)
- }(time.Now())
- }
-
- var data []byte
- if opt&Raw == 0 {
- data, err = json.Marshal(v)
- if err != nil {
- return 0, err
- }
- } else if v != nil {
- data = v.([]byte)
- }
-
- var res *gomemcached.MCResponse
- err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
- res, err = mc.SetCas(vb, k, flags, exp, cas, data)
- return err
- })
-
- if err == nil && (opt&(Persist|Indexable) != 0) {
- err = b.WaitForPersistence(k, res.Cas, data == nil)
- }
-
- return res.Cas, err
-}
-
-// Extended CAS operation. These functions will return the mutation token, i.e vbuuid & guard
-func (b *Bucket) CasWithMeta(k string, flags int, exp int, cas uint64, v interface{}) (uint64, *MutationToken, error) {
- return b.WriteCasWithMT(k, flags, exp, cas, v, 0)
-}
-
-func (b *Bucket) CasWithMetaRaw(k string, flags int, exp int, cas uint64, v interface{}) (uint64, *MutationToken, error) {
- return b.WriteCasWithMT(k, flags, exp, cas, v, Raw)
-}
-
-func (b *Bucket) WriteCasWithMT(k string, flags, exp int, cas uint64, v interface{},
- opt WriteOptions) (newCas uint64, mt *MutationToken, err error) {
-
- if ClientOpCallback != nil {
- defer func(t time.Time) {
- ClientOpCallback(fmt.Sprintf("Write(%v)", opt), k, t, err)
- }(time.Now())
- }
-
- var data []byte
- if opt&Raw == 0 {
- data, err = json.Marshal(v)
- if err != nil {
- return 0, nil, err
- }
- } else if v != nil {
- data = v.([]byte)
- }
-
- var res *gomemcached.MCResponse
- err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
- res, err = mc.SetCas(vb, k, flags, exp, cas, data)
- return err
- })
-
- if err != nil {
- return 0, nil, err
- }
-
- // check for extras
- if len(res.Extras) >= 16 {
- vbuuid := uint64(binary.BigEndian.Uint64(res.Extras[0:8]))
- seqNo := uint64(binary.BigEndian.Uint64(res.Extras[8:16]))
- vb := b.VBHash(k)
- mt = &MutationToken{VBid: uint16(vb), Guard: vbuuid, Value: seqNo}
- }
-
- if err == nil && (opt&(Persist|Indexable) != 0) {
- err = b.WaitForPersistence(k, res.Cas, data == nil)
- }
-
- return res.Cas, mt, err
-}
-
-// Set a value in this bucket.
-// The value will be serialized into a JSON document.
-func (b *Bucket) Set(k string, exp int, v interface{}) error {
- return b.Write(k, 0, exp, v, 0)
-}
-
-// Set a value in this bucket with with flags
-func (b *Bucket) SetWithMeta(k string, flags int, exp int, v interface{}) (*MutationToken, error) {
- return b.WriteWithMT(k, flags, exp, v, 0)
-}
-
-// SetRaw sets a value in this bucket without JSON encoding it.
-func (b *Bucket) SetRaw(k string, exp int, v []byte) error {
- return b.Write(k, 0, exp, v, Raw)
-}
-
-// Add adds a value to this bucket; like Set except that nothing
-// happens if the key exists. The value will be serialized into a
-// JSON document.
-func (b *Bucket) Add(k string, exp int, v interface{}) (added bool, err error) {
- err = b.Write(k, 0, exp, v, AddOnly)
- if err == ErrKeyExists {
- return false, nil
- }
- return (err == nil), err
-}
-
-// AddRaw adds a value to this bucket; like SetRaw except that nothing
-// happens if the key exists. The value will be stored as raw bytes.
-func (b *Bucket) AddRaw(k string, exp int, v []byte) (added bool, err error) {
- err = b.Write(k, 0, exp, v, AddOnly|Raw)
- if err == ErrKeyExists {
- return false, nil
- }
- return (err == nil), err
-}
-
-// Add adds a value to this bucket; like Set except that nothing
-// happens if the key exists. The value will be serialized into a
-// JSON document.
-func (b *Bucket) AddWithMT(k string, exp int, v interface{}) (added bool, mt *MutationToken, err error) {
- mt, err = b.WriteWithMT(k, 0, exp, v, AddOnly)
- if err == ErrKeyExists {
- return false, mt, nil
- }
- return (err == nil), mt, err
-}
-
-// AddRaw adds a value to this bucket; like SetRaw except that nothing
-// happens if the key exists. The value will be stored as raw bytes.
-func (b *Bucket) AddRawWithMT(k string, exp int, v []byte) (added bool, mt *MutationToken, err error) {
- mt, err = b.WriteWithMT(k, 0, exp, v, AddOnly|Raw)
- if err == ErrKeyExists {
- return false, mt, nil
- }
- return (err == nil), mt, err
-}
-
-// Append appends raw data to an existing item.
-func (b *Bucket) Append(k string, data []byte) error {
- return b.Write(k, 0, 0, data, Append|Raw)
-}
-
-func (b *Bucket) GetsMCFromCollection(collUid uint32, key string, reqDeadline time.Time) (*gomemcached.MCResponse, error) {
- var err error
- var response *gomemcached.MCResponse
-
- if key == "" {
- return nil, nil
- }
-
- if ClientOpCallback != nil {
- defer func(t time.Time) { ClientOpCallback("GetsMCFromCollection", key, t, err) }(time.Now())
- }
-
- err = b.Do2(key, func(mc *memcached.Client, vb uint16) error {
- var err1 error
-
- mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout))
- _, err1 = mc.SelectBucket(b.Name)
- if err1 != nil {
- mc.SetDeadline(noDeadline)
- return err1
- }
-
- mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout))
- response, err1 = mc.GetFromCollection(vb, collUid, key)
- if err1 != nil {
- mc.SetDeadline(noDeadline)
- return err1
- }
-
- return nil
- }, false)
-
- return response, err
-}
-
-// Returns collectionUid, manifestUid, error.
-func (b *Bucket) GetCollectionCID(scope string, collection string, reqDeadline time.Time) (uint32, uint32, error) {
- var err error
- var response *gomemcached.MCResponse
-
- if ClientOpCallback != nil {
- defer func(t time.Time) { ClientOpCallback("GetCollectionCID", scope+"."+collection, t, err) }(time.Now())
- }
-
- var key = "DUMMY" // Contact any server.
- var manifestUid uint32
- var collUid uint32
- err = b.Do2(key, func(mc *memcached.Client, vb uint16) error {
- var err1 error
-
- mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout))
- _, err1 = mc.SelectBucket(b.Name)
- if err1 != nil {
- mc.SetDeadline(noDeadline)
- return err1
- }
-
- response, err1 = mc.CollectionsGetCID(scope, collection)
- if err1 != nil {
- mc.SetDeadline(noDeadline)
- return err1
- }
-
- manifestUid = binary.BigEndian.Uint32(response.Extras[4:8])
- collUid = binary.BigEndian.Uint32(response.Extras[8:12])
-
- return nil
- }, false)
-
- return collUid, manifestUid, err
-}
-
-// Get a value straight from Memcached
-func (b *Bucket) GetsMC(key string, reqDeadline time.Time) (*gomemcached.MCResponse, error) {
- var err error
- var response *gomemcached.MCResponse
-
- if key == "" {
- return nil, nil
- }
-
- if ClientOpCallback != nil {
- defer func(t time.Time) { ClientOpCallback("GetsMC", key, t, err) }(time.Now())
- }
-
- err = b.Do2(key, func(mc *memcached.Client, vb uint16) error {
- var err1 error
-
- mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout))
- response, err1 = mc.Get(vb, key)
- mc.SetDeadline(noDeadline)
- if err1 != nil {
- return err1
- }
- return nil
- }, false)
- return response, err
-}
-
-// Get a value through the subdoc API
-func (b *Bucket) GetsSubDoc(key string, reqDeadline time.Time, subPaths []string) (*gomemcached.MCResponse, error) {
- var err error
- var response *gomemcached.MCResponse
-
- if key == "" {
- return nil, nil
- }
-
- if ClientOpCallback != nil {
- defer func(t time.Time) { ClientOpCallback("GetsSubDoc", key, t, err) }(time.Now())
- }
-
- err = b.Do2(key, func(mc *memcached.Client, vb uint16) error {
- var err1 error
-
- mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout))
- response, err1 = mc.GetSubdoc(vb, key, subPaths)
- mc.SetDeadline(noDeadline)
- if err1 != nil {
- return err1
- }
- return nil
- }, false)
- return response, err
-}
-
-// GetsRaw gets a raw value from this bucket including its CAS
-// counter and flags.
-func (b *Bucket) GetsRaw(k string) (data []byte, flags int,
- cas uint64, err error) {
-
- if ClientOpCallback != nil {
- defer func(t time.Time) { ClientOpCallback("GetsRaw", k, t, err) }(time.Now())
- }
-
- err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
- res, err := mc.Get(vb, k)
- if err != nil {
- return err
- }
- cas = res.Cas
- if len(res.Extras) >= 4 {
- flags = int(binary.BigEndian.Uint32(res.Extras))
- }
- data = res.Body
- return nil
- })
- return
-}
-
-// Gets gets a value from this bucket, including its CAS counter. The
-// value is expected to be a JSON stream and will be deserialized into
-// rv.
-func (b *Bucket) Gets(k string, rv interface{}, caso *uint64) error {
- data, _, cas, err := b.GetsRaw(k)
- if err != nil {
- return err
- }
- if caso != nil {
- *caso = cas
- }
- return json.Unmarshal(data, rv)
-}
-
-// Get a value from this bucket.
-// The value is expected to be a JSON stream and will be deserialized
-// into rv.
-func (b *Bucket) Get(k string, rv interface{}) error {
- return b.Gets(k, rv, nil)
-}
-
-// GetRaw gets a raw value from this bucket. No marshaling is performed.
-func (b *Bucket) GetRaw(k string) ([]byte, error) {
- d, _, _, err := b.GetsRaw(k)
- return d, err
-}
-
-// GetAndTouchRaw gets a raw value from this bucket including its CAS
-// counter and flags, and updates the expiry on the doc.
-func (b *Bucket) GetAndTouchRaw(k string, exp int) (data []byte,
- cas uint64, err error) {
-
- if ClientOpCallback != nil {
- defer func(t time.Time) { ClientOpCallback("GetsRaw", k, t, err) }(time.Now())
- }
-
- err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
- res, err := mc.GetAndTouch(vb, k, exp)
- if err != nil {
- return err
- }
- cas = res.Cas
- data = res.Body
- return nil
- })
- return data, cas, err
-}
-
-// GetMeta returns the meta values for a key
-func (b *Bucket) GetMeta(k string, flags *int, expiry *int, cas *uint64, seqNo *uint64) (err error) {
-
- if ClientOpCallback != nil {
- defer func(t time.Time) { ClientOpCallback("GetsMeta", k, t, err) }(time.Now())
- }
-
- err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
- res, err := mc.GetMeta(vb, k)
- if err != nil {
- return err
- }
-
- *cas = res.Cas
- if len(res.Extras) >= 8 {
- *flags = int(binary.BigEndian.Uint32(res.Extras[4:]))
- }
-
- if len(res.Extras) >= 12 {
- *expiry = int(binary.BigEndian.Uint32(res.Extras[8:]))
- }
-
- if len(res.Extras) >= 20 {
- *seqNo = uint64(binary.BigEndian.Uint64(res.Extras[12:]))
- }
-
- return nil
- })
-
- return err
-}
-
-// Delete a key from this bucket.
-func (b *Bucket) Delete(k string) error {
- return b.Write(k, 0, 0, nil, Raw)
-}
-
-// Incr increments the value at a given key by amt and defaults to def if no value present.
-func (b *Bucket) Incr(k string, amt, def uint64, exp int) (val uint64, err error) {
- if ClientOpCallback != nil {
- defer func(t time.Time) { ClientOpCallback("Incr", k, t, err) }(time.Now())
- }
-
- var rv uint64
- err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
- res, err := mc.Incr(vb, k, amt, def, exp)
- if err != nil {
- return err
- }
- rv = res
- return nil
- })
- return rv, err
-}
-
-// Decr decrements the value at a given key by amt and defaults to def if no value present
-func (b *Bucket) Decr(k string, amt, def uint64, exp int) (val uint64, err error) {
- if ClientOpCallback != nil {
- defer func(t time.Time) { ClientOpCallback("Decr", k, t, err) }(time.Now())
- }
-
- var rv uint64
- err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
- res, err := mc.Decr(vb, k, amt, def, exp)
- if err != nil {
- return err
- }
- rv = res
- return nil
- })
- return rv, err
-}
-
-// Wrapper around memcached.CASNext()
-func (b *Bucket) casNext(k string, exp int, state *memcached.CASState) bool {
- if ClientOpCallback != nil {
- defer func(t time.Time) {
- ClientOpCallback("casNext", k, t, state.Err)
- }(time.Now())
- }
-
- keepGoing := false
- state.Err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
- keepGoing = mc.CASNext(vb, k, exp, state)
- return state.Err
- })
- return keepGoing && state.Err == nil
-}
-
-// An UpdateFunc is a callback function to update a document
-type UpdateFunc func(current []byte) (updated []byte, err error)
-
-// Return this as the error from an UpdateFunc to cancel the Update
-// operation.
-const UpdateCancel = memcached.CASQuit
-
-// Update performs a Safe update of a document, avoiding conflicts by
-// using CAS.
-//
-// The callback function will be invoked with the current raw document
-// contents (or nil if the document doesn't exist); it should return
-// the updated raw contents (or nil to delete.) If it decides not to
-// change anything it can return UpdateCancel as the error.
-//
-// If another writer modifies the document between the get and the
-// set, the callback will be invoked again with the newer value.
-func (b *Bucket) Update(k string, exp int, callback UpdateFunc) error {
- _, err := b.update(k, exp, callback)
- return err
-}
-
-// internal version of Update that returns a CAS value
-func (b *Bucket) update(k string, exp int, callback UpdateFunc) (newCas uint64, err error) {
- var state memcached.CASState
- for b.casNext(k, exp, &state) {
- var err error
- if state.Value, err = callback(state.Value); err != nil {
- return 0, err
- }
- }
- return state.Cas, state.Err
-}
-
-// A WriteUpdateFunc is a callback function to update a document
-type WriteUpdateFunc func(current []byte) (updated []byte, opt WriteOptions, err error)
-
-// WriteUpdate performs a Safe update of a document, avoiding
-// conflicts by using CAS. WriteUpdate is like Update, except that
-// the callback can return a set of WriteOptions, of which Persist and
-// Indexable are recognized: these cause the call to wait until the
-// document update has been persisted to disk and/or become available
-// to index.
-func (b *Bucket) WriteUpdate(k string, exp int, callback WriteUpdateFunc) error {
- var writeOpts WriteOptions
- var deletion bool
- // Wrap the callback in an UpdateFunc we can pass to Update:
- updateCallback := func(current []byte) (updated []byte, err error) {
- update, opt, err := callback(current)
- writeOpts = opt
- deletion = (update == nil)
- return update, err
- }
- cas, err := b.update(k, exp, updateCallback)
- if err != nil {
- return err
- }
- // If callback asked, wait for persistence or indexability:
- if writeOpts&(Persist|Indexable) != 0 {
- err = b.WaitForPersistence(k, cas, deletion)
- }
- return err
-}
-
-// Observe observes the current state of a document.
-func (b *Bucket) Observe(k string) (result memcached.ObserveResult, err error) {
- if ClientOpCallback != nil {
- defer func(t time.Time) { ClientOpCallback("Observe", k, t, err) }(time.Now())
- }
-
- err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
- result, err = mc.Observe(vb, k)
- return err
- })
- return
-}
-
-// Returned from WaitForPersistence (or Write, if the Persistent or Indexable flag is used)
-// if the value has been overwritten by another before being persisted.
-var ErrOverwritten = errors.New("overwritten")
-
-// Returned from WaitForPersistence (or Write, if the Persistent or Indexable flag is used)
-// if the value hasn't been persisted by the timeout interval
-var ErrTimeout = errors.New("timeout")
-
-// WaitForPersistence waits for an item to be considered durable.
-//
-// Besides transport errors, ErrOverwritten may be returned if the
-// item is overwritten before it reaches durability. ErrTimeout may
-// occur if the item isn't found durable in a reasonable amount of
-// time.
-func (b *Bucket) WaitForPersistence(k string, cas uint64, deletion bool) error {
- timeout := 10 * time.Second
- sleepDelay := 5 * time.Millisecond
- start := time.Now()
- for {
- time.Sleep(sleepDelay)
- sleepDelay += sleepDelay / 2 // multiply delay by 1.5 every time
-
- result, err := b.Observe(k)
- if err != nil {
- return err
- }
- if persisted, overwritten := result.CheckPersistence(cas, deletion); overwritten {
- return ErrOverwritten
- } else if persisted {
- return nil
- }
-
- if result.PersistenceTime > 0 {
- timeout = 2 * result.PersistenceTime
- }
- if time.Since(start) >= timeout-sleepDelay {
- return ErrTimeout
- }
- }
-}
-
-var _STRING_MCRESPONSE_POOL = gomemcached.NewStringMCResponsePool(16)
-
-type stringPool struct {
- pool *sync.Pool
- size int
-}
-
-func newStringPool(size int) *stringPool {
- rv := &stringPool{
- pool: &sync.Pool{
- New: func() interface{} {
- return make([]string, 0, size)
- },
- },
- size: size,
- }
-
- return rv
-}
-
-func (this *stringPool) Get() []string {
- return this.pool.Get().([]string)
-}
-
-func (this *stringPool) Put(s []string) {
- if s == nil || cap(s) < this.size || cap(s) > 2*this.size {
- return
- }
-
- this.pool.Put(s[0:0])
-}
-
-var _STRING_POOL = newStringPool(16)
-
-type vbStringPool struct {
- pool *sync.Pool
- strPool *stringPool
-}
-
-func newVBStringPool(size int, sp *stringPool) *vbStringPool {
- rv := &vbStringPool{
- pool: &sync.Pool{
- New: func() interface{} {
- return make(map[uint16][]string, size)
- },
- },
- strPool: sp,
- }
-
- return rv
-}
-
-func (this *vbStringPool) Get() map[uint16][]string {
- return this.pool.Get().(map[uint16][]string)
-}
-
-func (this *vbStringPool) Put(s map[uint16][]string) {
- if s == nil {
- return
- }
-
- for k, v := range s {
- delete(s, k)
- this.strPool.Put(v)
- }
-
- this.pool.Put(s)
-}
-
-var _VB_STRING_POOL = newVBStringPool(16, _STRING_POOL)
diff --git a/vendor/github.com/couchbaselabs/go-couchbase/conn_pool.go b/vendor/github.com/couchbaselabs/go-couchbase/conn_pool.go
deleted file mode 100644
index 23102abd84..0000000000
--- a/vendor/github.com/couchbaselabs/go-couchbase/conn_pool.go
+++ /dev/null
@@ -1,410 +0,0 @@
-package couchbase
-
-import (
- "crypto/tls"
- "errors"
- "sync/atomic"
- "time"
-
- "github.com/couchbase/gomemcached"
- "github.com/couchbase/gomemcached/client"
- "github.com/couchbase/goutils/logging"
-)
-
-// GenericMcdAuthHandler is a kind of AuthHandler that performs
-// special auth exchange (like non-standard auth, possibly followed by
-// select-bucket).
-type GenericMcdAuthHandler interface {
- AuthHandler
- AuthenticateMemcachedConn(host string, conn *memcached.Client) error
-}
-
-// Error raised when a connection can't be retrieved from a pool.
-var TimeoutError = errors.New("timeout waiting to build connection")
-var errClosedPool = errors.New("the connection pool is closed")
-var errNoPool = errors.New("no connection pool")
-
-// Default timeout for retrieving a connection from the pool.
-var ConnPoolTimeout = time.Hour * 24 * 30
-
-// overflow connection closer cycle time
-var ConnCloserInterval = time.Second * 30
-
-// ConnPoolAvailWaitTime is the amount of time to wait for an existing
-// connection from the pool before considering the creation of a new
-// one.
-var ConnPoolAvailWaitTime = time.Millisecond
-
-type connectionPool struct {
- host string
- mkConn func(host string, ah AuthHandler, tlsConfig *tls.Config, bucketName string) (*memcached.Client, error)
- auth AuthHandler
- connections chan *memcached.Client
- createsem chan bool
- bailOut chan bool
- poolSize int
- connCount uint64
- inUse bool
- tlsConfig *tls.Config
- bucket string
-}
-
-func newConnectionPool(host string, ah AuthHandler, closer bool, poolSize, poolOverflow int, tlsConfig *tls.Config, bucket string) *connectionPool {
- connSize := poolSize
- if closer {
- connSize += poolOverflow
- }
- rv := &connectionPool{
- host: host,
- connections: make(chan *memcached.Client, connSize),
- createsem: make(chan bool, poolSize+poolOverflow),
- mkConn: defaultMkConn,
- auth: ah,
- poolSize: poolSize,
- tlsConfig: tlsConfig,
- bucket: bucket,
- }
- if closer {
- rv.bailOut = make(chan bool, 1)
- go rv.connCloser()
- }
- return rv
-}
-
-// ConnPoolTimeout is notified whenever connections are acquired from a pool.
-var ConnPoolCallback func(host string, source string, start time.Time, err error)
-
-// Use regular in-the-clear connection if tlsConfig is nil.
-// Use secure connection (TLS) if tlsConfig is set.
-func defaultMkConn(host string, ah AuthHandler, tlsConfig *tls.Config, bucketName string) (*memcached.Client, error) {
- var features memcached.Features
-
- var conn *memcached.Client
- var err error
- if tlsConfig == nil {
- conn, err = memcached.Connect("tcp", host)
- } else {
- conn, err = memcached.ConnectTLS("tcp", host, tlsConfig)
- }
-
- if err != nil {
- return nil, err
- }
-
- if TCPKeepalive == true {
- conn.SetKeepAliveOptions(time.Duration(TCPKeepaliveInterval) * time.Second)
- }
-
- if EnableMutationToken == true {
- features = append(features, memcached.FeatureMutationToken)
- }
- if EnableDataType == true {
- features = append(features, memcached.FeatureDataType)
- }
-
- if EnableXattr == true {
- features = append(features, memcached.FeatureXattr)
- }
-
- if EnableCollections {
- features = append(features, memcached.FeatureCollections)
- }
-
- if len(features) > 0 {
- if DefaultTimeout > 0 {
- conn.SetDeadline(getDeadline(noDeadline, DefaultTimeout))
- }
-
- res, err := conn.EnableFeatures(features)
-
- if DefaultTimeout > 0 {
- conn.SetDeadline(noDeadline)
- }
-
- if err != nil && isTimeoutError(err) {
- conn.Close()
- return nil, err
- }
-
- if err != nil || res.Status != gomemcached.SUCCESS {
- logging.Warnf("Unable to enable features %v", err)
- }
- }
-
- if gah, ok := ah.(GenericMcdAuthHandler); ok {
- err = gah.AuthenticateMemcachedConn(host, conn)
- if err != nil {
- conn.Close()
- return nil, err
- }
- return conn, nil
- }
- name, pass, bucket := ah.GetCredentials()
- if bucket == "" {
- // Authenticator does not know specific bucket.
- bucket = bucketName
- }
-
- if name != "default" {
- _, err = conn.Auth(name, pass)
- if err != nil {
- conn.Close()
- return nil, err
- }
- // Select bucket (Required for cb_auth creds)
- // Required when doing auth with _admin credentials
- if bucket != "" && bucket != name {
- _, err = conn.SelectBucket(bucket)
- if err != nil {
- conn.Close()
- return nil, err
- }
- }
- }
- return conn, nil
-}
-
-func (cp *connectionPool) Close() (err error) {
- defer func() {
- if recover() != nil {
- err = errors.New("connectionPool.Close error")
- }
- }()
- if cp.bailOut != nil {
-
- // defensively, we won't wait if the channel is full
- select {
- case cp.bailOut <- false:
- default:
- }
- }
- close(cp.connections)
- for c := range cp.connections {
- c.Close()
- }
- return
-}
-
-func (cp *connectionPool) Node() string {
- return cp.host
-}
-
-func (cp *connectionPool) GetWithTimeout(d time.Duration) (rv *memcached.Client, err error) {
- if cp == nil {
- return nil, errNoPool
- }
-
- path := ""
-
- if ConnPoolCallback != nil {
- defer func(path *string, start time.Time) {
- ConnPoolCallback(cp.host, *path, start, err)
- }(&path, time.Now())
- }
-
- path = "short-circuit"
-
- // short-circuit available connetions.
- select {
- case rv, isopen := <-cp.connections:
- if !isopen {
- return nil, errClosedPool
- }
- atomic.AddUint64(&cp.connCount, 1)
- return rv, nil
- default:
- }
-
- t := time.NewTimer(ConnPoolAvailWaitTime)
- defer t.Stop()
-
- // Try to grab an available connection within 1ms
- select {
- case rv, isopen := <-cp.connections:
- path = "avail1"
- if !isopen {
- return nil, errClosedPool
- }
- atomic.AddUint64(&cp.connCount, 1)
- return rv, nil
- case <-t.C:
- // No connection came around in time, let's see
- // whether we can get one or build a new one first.
- t.Reset(d) // Reuse the timer for the full timeout.
- select {
- case rv, isopen := <-cp.connections:
- path = "avail2"
- if !isopen {
- return nil, errClosedPool
- }
- atomic.AddUint64(&cp.connCount, 1)
- return rv, nil
- case cp.createsem <- true:
- path = "create"
- // Build a connection if we can't get a real one.
- // This can potentially be an overflow connection, or
- // a pooled connection.
- rv, err := cp.mkConn(cp.host, cp.auth, cp.tlsConfig, cp.bucket)
- if err != nil {
- // On error, release our create hold
- <-cp.createsem
- } else {
- atomic.AddUint64(&cp.connCount, 1)
- }
- return rv, err
- case <-t.C:
- return nil, ErrTimeout
- }
- }
-}
-
-func (cp *connectionPool) Get() (*memcached.Client, error) {
- return cp.GetWithTimeout(ConnPoolTimeout)
-}
-
-func (cp *connectionPool) Return(c *memcached.Client) {
- if c == nil {
- return
- }
-
- if cp == nil {
- c.Close()
- }
-
- if c.IsHealthy() {
- defer func() {
- if recover() != nil {
- // This happens when the pool has already been
- // closed and we're trying to return a
- // connection to it anyway. Just close the
- // connection.
- c.Close()
- }
- }()
-
- select {
- case cp.connections <- c:
- default:
- <-cp.createsem
- c.Close()
- }
- } else {
- <-cp.createsem
- c.Close()
- }
-}
-
-// give the ability to discard a connection from a pool
-// useful for ditching connections to the wrong node after a rebalance
-func (cp *connectionPool) Discard(c *memcached.Client) {
- <-cp.createsem
- c.Close()
-}
-
-// asynchronous connection closer
-func (cp *connectionPool) connCloser() {
- var connCount uint64
-
- t := time.NewTimer(ConnCloserInterval)
- defer t.Stop()
-
- for {
- connCount = cp.connCount
-
- // we don't exist anymore! bail out!
- select {
- case <-cp.bailOut:
- return
- case <-t.C:
- }
- t.Reset(ConnCloserInterval)
-
- // no overflow connections open or sustained requests for connections
- // nothing to do until the next cycle
- if len(cp.connections) <= cp.poolSize ||
- ConnCloserInterval/ConnPoolAvailWaitTime < time.Duration(cp.connCount-connCount) {
- continue
- }
-
- // close overflow connections now that they are not needed
- for c := range cp.connections {
- select {
- case <-cp.bailOut:
- return
- default:
- }
-
- // bail out if close did not work out
- if !cp.connCleanup(c) {
- return
- }
- if len(cp.connections) <= cp.poolSize {
- break
- }
- }
- }
-}
-
-// close connection with recovery on error
-func (cp *connectionPool) connCleanup(c *memcached.Client) (rv bool) {
-
- // just in case we are closing a connection after
- // bailOut has been sent but we haven't yet read it
- defer func() {
- if recover() != nil {
- rv = false
- }
- }()
- rv = true
-
- c.Close()
- <-cp.createsem
- return
-}
-
-func (cp *connectionPool) StartTapFeed(args *memcached.TapArguments) (*memcached.TapFeed, error) {
- if cp == nil {
- return nil, errNoPool
- }
- mc, err := cp.Get()
- if err != nil {
- return nil, err
- }
-
- // A connection can't be used after TAP; Dont' count it against the
- // connection pool capacity
- <-cp.createsem
-
- return mc.StartTapFeed(*args)
-}
-
-const DEFAULT_WINDOW_SIZE = 20 * 1024 * 1024 // 20 Mb
-
-func (cp *connectionPool) StartUprFeed(name string, sequence uint32, dcp_buffer_size uint32, data_chan_size int) (*memcached.UprFeed, error) {
- if cp == nil {
- return nil, errNoPool
- }
- mc, err := cp.Get()
- if err != nil {
- return nil, err
- }
-
- // A connection can't be used after it has been allocated to UPR;
- // Dont' count it against the connection pool capacity
- <-cp.createsem
-
- uf, err := mc.NewUprFeed()
- if err != nil {
- return nil, err
- }
-
- if err := uf.UprOpen(name, sequence, dcp_buffer_size); err != nil {
- return nil, err
- }
-
- if err := uf.StartFeedWithConfig(data_chan_size); err != nil {
- return nil, err
- }
-
- return uf, nil
-}
diff --git a/vendor/github.com/couchbaselabs/go-couchbase/ddocs.go b/vendor/github.com/couchbaselabs/go-couchbase/ddocs.go
deleted file mode 100644
index f9cc343aa8..0000000000
--- a/vendor/github.com/couchbaselabs/go-couchbase/ddocs.go
+++ /dev/null
@@ -1,288 +0,0 @@
-package couchbase
-
-import (
- "bytes"
- "encoding/json"
- "fmt"
- "github.com/couchbase/goutils/logging"
- "io/ioutil"
- "net/http"
-)
-
-// ViewDefinition represents a single view within a design document.
-type ViewDefinition struct {
- Map string `json:"map"`
- Reduce string `json:"reduce,omitempty"`
-}
-
-// DDoc is the document body of a design document specifying a view.
-type DDoc struct {
- Language string `json:"language,omitempty"`
- Views map[string]ViewDefinition `json:"views"`
-}
-
-// DDocsResult represents the result from listing the design
-// documents.
-type DDocsResult struct {
- Rows []struct {
- DDoc struct {
- Meta map[string]interface{}
- JSON DDoc
- } `json:"doc"`
- } `json:"rows"`
-}
-
-// GetDDocs lists all design documents
-func (b *Bucket) GetDDocs() (DDocsResult, error) {
- var ddocsResult DDocsResult
- b.RLock()
- pool := b.pool
- uri := b.DDocs.URI
- b.RUnlock()
-
- // MB-23555 ephemeral buckets have no ddocs
- if uri == "" {
- return DDocsResult{}, nil
- }
-
- err := pool.client.parseURLResponse(uri, &ddocsResult)
- if err != nil {
- return DDocsResult{}, err
- }
- return ddocsResult, nil
-}
-
-func (b *Bucket) GetDDocWithRetry(docname string, into interface{}) error {
- ddocURI := fmt.Sprintf("/%s/_design/%s", b.GetName(), docname)
- err := b.parseAPIResponse(ddocURI, &into)
- if err != nil {
- return err
- }
- return nil
-}
-
-func (b *Bucket) GetDDocsWithRetry() (DDocsResult, error) {
- var ddocsResult DDocsResult
- b.RLock()
- uri := b.DDocs.URI
- b.RUnlock()
-
- // MB-23555 ephemeral buckets have no ddocs
- if uri == "" {
- return DDocsResult{}, nil
- }
-
- err := b.parseURLResponse(uri, &ddocsResult)
- if err != nil {
- return DDocsResult{}, err
- }
- return ddocsResult, nil
-}
-
-func (b *Bucket) ddocURL(docname string) (string, error) {
- u, err := b.randomBaseURL()
- if err != nil {
- return "", err
- }
- u.Path = fmt.Sprintf("/%s/_design/%s", b.GetName(), docname)
- return u.String(), nil
-}
-
-func (b *Bucket) ddocURLNext(nodeId int, docname string) (string, int, error) {
- u, selected, err := b.randomNextURL(nodeId)
- if err != nil {
- return "", -1, err
- }
- u.Path = fmt.Sprintf("/%s/_design/%s", b.GetName(), docname)
- return u.String(), selected, nil
-}
-
-const ABS_MAX_RETRIES = 10
-const ABS_MIN_RETRIES = 3
-
-func (b *Bucket) getMaxRetries() (int, error) {
-
- maxRetries := len(b.Nodes())
-
- if maxRetries == 0 {
- return 0, fmt.Errorf("No available Couch rest URLs")
- }
-
- if maxRetries > ABS_MAX_RETRIES {
- maxRetries = ABS_MAX_RETRIES
- } else if maxRetries < ABS_MIN_RETRIES {
- maxRetries = ABS_MIN_RETRIES
- }
-
- return maxRetries, nil
-}
-
-// PutDDoc installs a design document.
-func (b *Bucket) PutDDoc(docname string, value interface{}) error {
-
- var Err error
-
- maxRetries, err := b.getMaxRetries()
- if err != nil {
- return err
- }
-
- lastNode := START_NODE_ID
-
- for retryCount := 0; retryCount < maxRetries; retryCount++ {
-
- Err = nil
-
- ddocU, selectedNode, err := b.ddocURLNext(lastNode, docname)
- if err != nil {
- return err
- }
-
- lastNode = selectedNode
-
- logging.Infof(" Trying with selected node %d", selectedNode)
- j, err := json.Marshal(value)
- if err != nil {
- return err
- }
-
- req, err := http.NewRequest("PUT", ddocU, bytes.NewReader(j))
- if err != nil {
- return err
- }
- req.Header.Set("Content-Type", "application/json")
- err = maybeAddAuth(req, b.authHandler(false /* bucket not yet locked */))
- if err != nil {
- return err
- }
-
- res, err := doHTTPRequest(req)
- if err != nil {
- return err
- }
-
- if res.StatusCode != 201 {
- body, _ := ioutil.ReadAll(res.Body)
- Err = fmt.Errorf("error installing view: %v / %s",
- res.Status, body)
- logging.Errorf(" Error in PutDDOC %v. Retrying...", Err)
- res.Body.Close()
- b.Refresh()
- continue
- }
-
- res.Body.Close()
- break
- }
-
- return Err
-}
-
-// GetDDoc retrieves a specific a design doc.
-func (b *Bucket) GetDDoc(docname string, into interface{}) error {
- var Err error
- var res *http.Response
-
- maxRetries, err := b.getMaxRetries()
- if err != nil {
- return err
- }
-
- lastNode := START_NODE_ID
- for retryCount := 0; retryCount < maxRetries; retryCount++ {
-
- Err = nil
- ddocU, selectedNode, err := b.ddocURLNext(lastNode, docname)
- if err != nil {
- return err
- }
-
- lastNode = selectedNode
- logging.Infof(" Trying with selected node %d", selectedNode)
-
- req, err := http.NewRequest("GET", ddocU, nil)
- if err != nil {
- return err
- }
- req.Header.Set("Content-Type", "application/json")
- err = maybeAddAuth(req, b.authHandler(false /* bucket not yet locked */))
- if err != nil {
- return err
- }
-
- res, err = doHTTPRequest(req)
- if err != nil {
- return err
- }
- if res.StatusCode != 200 {
- body, _ := ioutil.ReadAll(res.Body)
- Err = fmt.Errorf("error reading view: %v / %s",
- res.Status, body)
- logging.Errorf(" Error in GetDDOC %v Retrying...", Err)
- b.Refresh()
- res.Body.Close()
- continue
- }
- defer res.Body.Close()
- break
- }
-
- if Err != nil {
- return Err
- }
-
- d := json.NewDecoder(res.Body)
- return d.Decode(into)
-}
-
-// DeleteDDoc removes a design document.
-func (b *Bucket) DeleteDDoc(docname string) error {
-
- var Err error
-
- maxRetries, err := b.getMaxRetries()
- if err != nil {
- return err
- }
-
- lastNode := START_NODE_ID
-
- for retryCount := 0; retryCount < maxRetries; retryCount++ {
-
- Err = nil
- ddocU, selectedNode, err := b.ddocURLNext(lastNode, docname)
- if err != nil {
- return err
- }
-
- lastNode = selectedNode
- logging.Infof(" Trying with selected node %d", selectedNode)
-
- req, err := http.NewRequest("DELETE", ddocU, nil)
- if err != nil {
- return err
- }
- req.Header.Set("Content-Type", "application/json")
- err = maybeAddAuth(req, b.authHandler(false /* bucket not already locked */))
- if err != nil {
- return err
- }
-
- res, err := doHTTPRequest(req)
- if err != nil {
- return err
- }
- if res.StatusCode != 200 {
- body, _ := ioutil.ReadAll(res.Body)
- Err = fmt.Errorf("error deleting view : %v / %s", res.Status, body)
- logging.Errorf(" Error in DeleteDDOC %v. Retrying ... ", Err)
- b.Refresh()
- res.Body.Close()
- continue
- }
-
- res.Body.Close()
- break
- }
- return Err
-}
diff --git a/vendor/github.com/couchbaselabs/go-couchbase/observe.go b/vendor/github.com/couchbaselabs/go-couchbase/observe.go
deleted file mode 100644
index 6e746f5a16..0000000000
--- a/vendor/github.com/couchbaselabs/go-couchbase/observe.go
+++ /dev/null
@@ -1,300 +0,0 @@
-package couchbase
-
-import (
- "fmt"
- "github.com/couchbase/goutils/logging"
- "sync"
-)
-
-type PersistTo uint8
-
-const (
- PersistNone = PersistTo(0x00)
- PersistMaster = PersistTo(0x01)
- PersistOne = PersistTo(0x02)
- PersistTwo = PersistTo(0x03)
- PersistThree = PersistTo(0x04)
- PersistFour = PersistTo(0x05)
-)
-
-type ObserveTo uint8
-
-const (
- ObserveNone = ObserveTo(0x00)
- ObserveReplicateOne = ObserveTo(0x01)
- ObserveReplicateTwo = ObserveTo(0x02)
- ObserveReplicateThree = ObserveTo(0x03)
- ObserveReplicateFour = ObserveTo(0x04)
-)
-
-type JobType uint8
-
-const (
- OBSERVE = JobType(0x00)
- PERSIST = JobType(0x01)
-)
-
-type ObservePersistJob struct {
- vb uint16
- vbuuid uint64
- hostname string
- jobType JobType
- failover uint8
- lastPersistedSeqNo uint64
- currentSeqNo uint64
- resultChan chan *ObservePersistJob
- errorChan chan *OPErrResponse
-}
-
-type OPErrResponse struct {
- vb uint16
- vbuuid uint64
- err error
- job *ObservePersistJob
-}
-
-var ObservePersistPool = NewPool(1024)
-var OPJobChan = make(chan *ObservePersistJob, 1024)
-var OPJobDone = make(chan bool)
-
-var wg sync.WaitGroup
-
-func (b *Bucket) StartOPPollers(maxWorkers int) {
-
- for i := 0; i < maxWorkers; i++ {
- go b.OPJobPoll()
- wg.Add(1)
- }
- wg.Wait()
-}
-
-func (b *Bucket) SetObserveAndPersist(nPersist PersistTo, nObserve ObserveTo) (err error) {
-
- numNodes := len(b.Nodes())
- if int(nPersist) > numNodes || int(nObserve) > numNodes {
- return fmt.Errorf("Not enough healthy nodes in the cluster")
- }
-
- if int(nPersist) > (b.Replicas+1) || int(nObserve) > b.Replicas {
- return fmt.Errorf("Not enough replicas in the cluster")
- }
-
- if EnableMutationToken == false {
- return fmt.Errorf("Mutation Tokens not enabled ")
- }
-
- b.ds = &DurablitySettings{Persist: PersistTo(nPersist), Observe: ObserveTo(nObserve)}
- return
-}
-
-func (b *Bucket) ObserveAndPersistPoll(vb uint16, vbuuid uint64, seqNo uint64) (err error, failover bool) {
- b.RLock()
- ds := b.ds
- b.RUnlock()
-
- if ds == nil {
- return
- }
-
- nj := 0 // total number of jobs
- resultChan := make(chan *ObservePersistJob, 10)
- errChan := make(chan *OPErrResponse, 10)
-
- nodes := b.GetNodeList(vb)
- if int(ds.Observe) > len(nodes) || int(ds.Persist) > len(nodes) {
- return fmt.Errorf("Not enough healthy nodes in the cluster"), false
- }
-
- logging.Infof("Node list %v", nodes)
-
- if ds.Observe >= ObserveReplicateOne {
- // create a job for each host
- for i := ObserveReplicateOne; i < ds.Observe+1; i++ {
- opJob := ObservePersistPool.Get()
- opJob.vb = vb
- opJob.vbuuid = vbuuid
- opJob.jobType = OBSERVE
- opJob.hostname = nodes[i]
- opJob.resultChan = resultChan
- opJob.errorChan = errChan
-
- OPJobChan <- opJob
- nj++
-
- }
- }
-
- if ds.Persist >= PersistMaster {
- for i := PersistMaster; i < ds.Persist+1; i++ {
- opJob := ObservePersistPool.Get()
- opJob.vb = vb
- opJob.vbuuid = vbuuid
- opJob.jobType = PERSIST
- opJob.hostname = nodes[i]
- opJob.resultChan = resultChan
- opJob.errorChan = errChan
-
- OPJobChan <- opJob
- nj++
-
- }
- }
-
- ok := true
- for ok {
- select {
- case res := <-resultChan:
- jobDone := false
- if res.failover == 0 {
- // no failover
- if res.jobType == PERSIST {
- if res.lastPersistedSeqNo >= seqNo {
- jobDone = true
- }
-
- } else {
- if res.currentSeqNo >= seqNo {
- jobDone = true
- }
- }
-
- if jobDone == true {
- nj--
- ObservePersistPool.Put(res)
- } else {
- // requeue this job
- OPJobChan <- res
- }
-
- } else {
- // Not currently handling failover scenarios TODO
- nj--
- ObservePersistPool.Put(res)
- failover = true
- }
-
- if nj == 0 {
- // done with all the jobs
- ok = false
- close(resultChan)
- close(errChan)
- }
-
- case Err := <-errChan:
- logging.Errorf("Error in Observe/Persist %v", Err.err)
- err = fmt.Errorf("Error in Observe/Persist job %v", Err.err)
- nj--
- ObservePersistPool.Put(Err.job)
- if nj == 0 {
- close(resultChan)
- close(errChan)
- ok = false
- }
- }
- }
-
- return
-}
-
-func (b *Bucket) OPJobPoll() {
-
- ok := true
- for ok == true {
- select {
- case job := <-OPJobChan:
- pool := b.getConnPoolByHost(job.hostname, false /* bucket not already locked */)
- if pool == nil {
- errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid}
- errRes.err = fmt.Errorf("Pool not found for host %v", job.hostname)
- errRes.job = job
- job.errorChan <- errRes
- continue
- }
- conn, err := pool.Get()
- if err != nil {
- errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid}
- errRes.err = fmt.Errorf("Unable to get connection from pool %v", err)
- errRes.job = job
- job.errorChan <- errRes
- continue
- }
-
- res, err := conn.ObserveSeq(job.vb, job.vbuuid)
- if err != nil {
- errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid}
- errRes.err = fmt.Errorf("Command failed %v", err)
- errRes.job = job
- job.errorChan <- errRes
- continue
-
- }
- pool.Return(conn)
- job.lastPersistedSeqNo = res.LastPersistedSeqNo
- job.currentSeqNo = res.CurrentSeqNo
- job.failover = res.Failover
-
- job.resultChan <- job
- case <-OPJobDone:
- logging.Infof("Observe Persist Poller exitting")
- ok = false
- }
- }
- wg.Done()
-}
-
-func (b *Bucket) GetNodeList(vb uint16) []string {
-
- vbm := b.VBServerMap()
- if len(vbm.VBucketMap) < int(vb) {
- logging.Infof("vbmap smaller than vblist")
- return nil
- }
-
- nodes := make([]string, len(vbm.VBucketMap[vb]))
- for i := 0; i < len(vbm.VBucketMap[vb]); i++ {
- n := vbm.VBucketMap[vb][i]
- if n < 0 {
- continue
- }
-
- node := b.getMasterNode(n)
- if len(node) > 1 {
- nodes[i] = node
- }
- continue
-
- }
- return nodes
-}
-
-//pool of ObservePersist Jobs
-type OPpool struct {
- pool chan *ObservePersistJob
-}
-
-// NewPool creates a new pool of jobs
-func NewPool(max int) *OPpool {
- return &OPpool{
- pool: make(chan *ObservePersistJob, max),
- }
-}
-
-// Borrow a Client from the pool.
-func (p *OPpool) Get() *ObservePersistJob {
- var o *ObservePersistJob
- select {
- case o = <-p.pool:
- default:
- o = &ObservePersistJob{}
- }
- return o
-}
-
-// Return returns a Client to the pool.
-func (p *OPpool) Put(o *ObservePersistJob) {
- select {
- case p.pool <- o:
- default:
- // let it go, let it go...
- }
-}
diff --git a/vendor/github.com/couchbaselabs/go-couchbase/pools.go b/vendor/github.com/couchbaselabs/go-couchbase/pools.go
deleted file mode 100644
index 0e2379398a..0000000000
--- a/vendor/github.com/couchbaselabs/go-couchbase/pools.go
+++ /dev/null
@@ -1,1474 +0,0 @@
-package couchbase
-
-import (
- "bufio"
- "bytes"
- "crypto/tls"
- "crypto/x509"
- "encoding/base64"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "math/rand"
- "net/http"
- "net/url"
- "runtime"
- "sort"
- "strconv"
- "strings"
- "sync"
- "time"
- "unsafe"
-
- "github.com/couchbase/goutils/logging"
-
- "github.com/couchbase/gomemcached" // package name is 'gomemcached'
- "github.com/couchbase/gomemcached/client" // package name is 'memcached'
-)
-
-// HTTPClient to use for REST and view operations.
-var MaxIdleConnsPerHost = 256
-var ClientTimeOut = 10 * time.Second
-var HTTPTransport = &http.Transport{MaxIdleConnsPerHost: MaxIdleConnsPerHost}
-var HTTPClient = &http.Client{Transport: HTTPTransport, Timeout: ClientTimeOut}
-
-// PoolSize is the size of each connection pool (per host).
-var PoolSize = 64
-
-// PoolOverflow is the number of overflow connections allowed in a
-// pool.
-var PoolOverflow = 16
-
-// AsynchronousCloser turns on asynchronous closing for overflow connections
-var AsynchronousCloser = false
-
-// TCP KeepAlive enabled/disabled
-var TCPKeepalive = false
-
-// Enable MutationToken
-var EnableMutationToken = false
-
-// Enable Data Type response
-var EnableDataType = false
-
-// Enable Xattr
-var EnableXattr = false
-
-// Enable Collections
-var EnableCollections = false
-
-// TCP keepalive interval in seconds. Default 30 minutes
-var TCPKeepaliveInterval = 30 * 60
-
-// Used to decide whether to skip verification of certificates when
-// connecting to an ssl port.
-var skipVerify = true
-var certFile = ""
-var keyFile = ""
-var rootFile = ""
-
-func SetSkipVerify(skip bool) {
- skipVerify = skip
-}
-
-func SetCertFile(cert string) {
- certFile = cert
-}
-
-func SetKeyFile(cert string) {
- keyFile = cert
-}
-
-func SetRootFile(cert string) {
- rootFile = cert
-}
-
-// Allow applications to speciify the Poolsize and Overflow
-func SetConnectionPoolParams(size, overflow int) {
-
- if size > 0 {
- PoolSize = size
- }
-
- if overflow > 0 {
- PoolOverflow = overflow
- }
-}
-
-// Turn off overflow connections
-func DisableOverflowConnections() {
- PoolOverflow = 0
-}
-
-// Toggle asynchronous overflow closer
-func EnableAsynchronousCloser(closer bool) {
- AsynchronousCloser = closer
-}
-
-// Allow TCP keepalive parameters to be set by the application
-func SetTcpKeepalive(enabled bool, interval int) {
-
- TCPKeepalive = enabled
-
- if interval > 0 {
- TCPKeepaliveInterval = interval
- }
-}
-
-// AuthHandler is a callback that gets the auth username and password
-// for the given bucket.
-type AuthHandler interface {
- GetCredentials() (string, string, string)
-}
-
-// AuthHandler is a callback that gets the auth username and password
-// for the given bucket and sasl for memcached.
-type AuthWithSaslHandler interface {
- AuthHandler
- GetSaslCredentials() (string, string)
-}
-
-// MultiBucketAuthHandler is kind of AuthHandler that may perform
-// different auth for different buckets.
-type MultiBucketAuthHandler interface {
- AuthHandler
- ForBucket(bucket string) AuthHandler
-}
-
-// HTTPAuthHandler is kind of AuthHandler that performs more general
-// for outgoing http requests than is possible via simple
-// GetCredentials() call (i.e. digest auth or different auth per
-// different destinations).
-type HTTPAuthHandler interface {
- AuthHandler
- SetCredsForRequest(req *http.Request) error
-}
-
-// RestPool represents a single pool returned from the pools REST API.
-type RestPool struct {
- Name string `json:"name"`
- StreamingURI string `json:"streamingUri"`
- URI string `json:"uri"`
-}
-
-// Pools represents the collection of pools as returned from the REST API.
-type Pools struct {
- ComponentsVersion map[string]string `json:"componentsVersion,omitempty"`
- ImplementationVersion string `json:"implementationVersion"`
- IsAdmin bool `json:"isAdminCreds"`
- UUID string `json:"uuid"`
- Pools []RestPool `json:"pools"`
-}
-
-// A Node is a computer in a cluster running the couchbase software.
-type Node struct {
- ClusterCompatibility int `json:"clusterCompatibility"`
- ClusterMembership string `json:"clusterMembership"`
- CouchAPIBase string `json:"couchApiBase"`
- Hostname string `json:"hostname"`
- InterestingStats map[string]float64 `json:"interestingStats,omitempty"`
- MCDMemoryAllocated float64 `json:"mcdMemoryAllocated"`
- MCDMemoryReserved float64 `json:"mcdMemoryReserved"`
- MemoryFree float64 `json:"memoryFree"`
- MemoryTotal float64 `json:"memoryTotal"`
- OS string `json:"os"`
- Ports map[string]int `json:"ports"`
- Services []string `json:"services"`
- Status string `json:"status"`
- Uptime int `json:"uptime,string"`
- Version string `json:"version"`
- ThisNode bool `json:"thisNode,omitempty"`
-}
-
-// A Pool of nodes and buckets.
-type Pool struct {
- BucketMap map[string]*Bucket
- Nodes []Node
-
- BucketURL map[string]string `json:"buckets"`
-
- client *Client
-}
-
-// VBucketServerMap is the a mapping of vbuckets to nodes.
-type VBucketServerMap struct {
- HashAlgorithm string `json:"hashAlgorithm"`
- NumReplicas int `json:"numReplicas"`
- ServerList []string `json:"serverList"`
- VBucketMap [][]int `json:"vBucketMap"`
-}
-
-type DurablitySettings struct {
- Persist PersistTo
- Observe ObserveTo
-}
-
-// Bucket is the primary entry point for most data operations.
-// Bucket is a locked data structure. All access to its fields should be done using read or write locking,
-// as appropriate.
-//
-// Some access methods require locking, but rely on the caller to do so. These are appropriate
-// for calls from methods that have already locked the structure. Methods like this
-// take a boolean parameter "bucketLocked".
-type Bucket struct {
- sync.RWMutex
- AuthType string `json:"authType"`
- Capabilities []string `json:"bucketCapabilities"`
- CapabilitiesVersion string `json:"bucketCapabilitiesVer"`
- Type string `json:"bucketType"`
- Name string `json:"name"`
- NodeLocator string `json:"nodeLocator"`
- Quota map[string]float64 `json:"quota,omitempty"`
- Replicas int `json:"replicaNumber"`
- Password string `json:"saslPassword"`
- URI string `json:"uri"`
- StreamingURI string `json:"streamingUri"`
- LocalRandomKeyURI string `json:"localRandomKeyUri,omitempty"`
- UUID string `json:"uuid"`
- ConflictResolutionType string `json:"conflictResolutionType,omitempty"`
- DDocs struct {
- URI string `json:"uri"`
- } `json:"ddocs,omitempty"`
- BasicStats map[string]interface{} `json:"basicStats,omitempty"`
- Controllers map[string]interface{} `json:"controllers,omitempty"`
-
- // These are used for JSON IO, but isn't used for processing
- // since it needs to be swapped out safely.
- VBSMJson VBucketServerMap `json:"vBucketServerMap"`
- NodesJSON []Node `json:"nodes"`
-
- pool *Pool
- connPools unsafe.Pointer // *[]*connectionPool
- vBucketServerMap unsafe.Pointer // *VBucketServerMap
- nodeList unsafe.Pointer // *[]Node
- commonSufix string
- ah AuthHandler // auth handler
- ds *DurablitySettings // Durablity Settings for this bucket
- closed bool
-}
-
-// PoolServices is all the bucket-independent services in a pool
-type PoolServices struct {
- Rev int `json:"rev"`
- NodesExt []NodeServices `json:"nodesExt"`
- Capabilities json.RawMessage `json:"clusterCapabilities"`
-}
-
-// NodeServices is all the bucket-independent services running on
-// a node (given by Hostname)
-type NodeServices struct {
- Services map[string]int `json:"services,omitempty"`
- Hostname string `json:"hostname"`
- ThisNode bool `json:"thisNode"`
-}
-
-type BucketNotFoundError struct {
- bucket string
-}
-
-func (e *BucketNotFoundError) Error() string {
- return fmt.Sprint("No bucket named " + e.bucket)
-}
-
-type BucketAuth struct {
- name string
- saslPwd string
- bucket string
-}
-
-func newBucketAuth(name string, pass string, bucket string) *BucketAuth {
- return &BucketAuth{name: name, saslPwd: pass, bucket: bucket}
-}
-
-func (ba *BucketAuth) GetCredentials() (string, string, string) {
- return ba.name, ba.saslPwd, ba.bucket
-}
-
-// VBServerMap returns the current VBucketServerMap.
-func (b *Bucket) VBServerMap() *VBucketServerMap {
- b.RLock()
- defer b.RUnlock()
- ret := (*VBucketServerMap)(b.vBucketServerMap)
- return ret
-}
-
-func (b *Bucket) GetVBmap(addrs []string) (map[string][]uint16, error) {
- vbmap := b.VBServerMap()
- servers := vbmap.ServerList
- if addrs == nil {
- addrs = vbmap.ServerList
- }
-
- m := make(map[string][]uint16)
- for _, addr := range addrs {
- m[addr] = make([]uint16, 0)
- }
- for vbno, idxs := range vbmap.VBucketMap {
- if len(idxs) == 0 {
- return nil, fmt.Errorf("vbmap: No KV node no for vb %d", vbno)
- } else if idxs[0] < 0 || idxs[0] >= len(servers) {
- return nil, fmt.Errorf("vbmap: Invalid KV node no %d for vb %d", idxs[0], vbno)
- }
- addr := servers[idxs[0]]
- if _, ok := m[addr]; ok {
- m[addr] = append(m[addr], uint16(vbno))
- }
- }
- return m, nil
-}
-
-// true if node is not on the bucket VBmap
-func (b *Bucket) checkVBmap(node string) bool {
- vbmap := b.VBServerMap()
- servers := vbmap.ServerList
-
- for _, idxs := range vbmap.VBucketMap {
- if len(idxs) == 0 {
- return true
- } else if idxs[0] < 0 || idxs[0] >= len(servers) {
- return true
- }
- if servers[idxs[0]] == node {
- return false
- }
- }
- return true
-}
-
-func (b *Bucket) GetName() string {
- b.RLock()
- defer b.RUnlock()
- ret := b.Name
- return ret
-}
-
-// Nodes returns the current list of nodes servicing this bucket.
-func (b *Bucket) Nodes() []Node {
- b.RLock()
- defer b.RUnlock()
- ret := *(*[]Node)(b.nodeList)
- return ret
-}
-
-// return the list of healthy nodes
-func (b *Bucket) HealthyNodes() []Node {
- nodes := []Node{}
-
- for _, n := range b.Nodes() {
- if n.Status == "healthy" && n.CouchAPIBase != "" {
- nodes = append(nodes, n)
- }
- if n.Status != "healthy" { // log non-healthy node
- logging.Infof("Non-healthy node; node details:")
- logging.Infof("Hostname=%v, Status=%v, CouchAPIBase=%v, ThisNode=%v", n.Hostname, n.Status, n.CouchAPIBase, n.ThisNode)
- }
- }
-
- return nodes
-}
-
-func (b *Bucket) getConnPools(bucketLocked bool) []*connectionPool {
- if !bucketLocked {
- b.RLock()
- defer b.RUnlock()
- }
- if b.connPools != nil {
- return *(*[]*connectionPool)(b.connPools)
- } else {
- return nil
- }
-}
-
-func (b *Bucket) replaceConnPools(with []*connectionPool) {
- b.Lock()
- defer b.Unlock()
-
- old := b.connPools
- b.connPools = unsafe.Pointer(&with)
- if old != nil {
- for _, pool := range *(*[]*connectionPool)(old) {
- if pool != nil {
- pool.Close()
- }
- }
- }
- return
-}
-
-func (b *Bucket) getConnPool(i int) *connectionPool {
-
- if i < 0 {
- return nil
- }
-
- p := b.getConnPools(false /* not already locked */)
- if len(p) > i {
- return p[i]
- }
-
- return nil
-}
-
-func (b *Bucket) getConnPoolByHost(host string, bucketLocked bool) *connectionPool {
- pools := b.getConnPools(bucketLocked)
- for _, p := range pools {
- if p != nil && p.host == host {
- return p
- }
- }
-
- return nil
-}
-
-// Given a vbucket number, returns a memcached connection to it.
-// The connection must be returned to its pool after use.
-func (b *Bucket) getConnectionToVBucket(vb uint32) (*memcached.Client, *connectionPool, error) {
- for {
- vbm := b.VBServerMap()
- if len(vbm.VBucketMap) < int(vb) {
- return nil, nil, fmt.Errorf("go-couchbase: vbmap smaller than vbucket list: %v vs. %v",
- vb, vbm.VBucketMap)
- }
- masterId := vbm.VBucketMap[vb][0]
- if masterId < 0 {
- return nil, nil, fmt.Errorf("go-couchbase: No master for vbucket %d", vb)
- }
- pool := b.getConnPool(masterId)
- conn, err := pool.Get()
- if err != errClosedPool {
- return conn, pool, err
- }
- // If conn pool was closed, because another goroutine refreshed the vbucket map, retry...
- }
-}
-
-// To get random documents, we need to cover all the nodes, so select
-// a connection at random.
-
-func (b *Bucket) getRandomConnection() (*memcached.Client, *connectionPool, error) {
- for {
- var currentPool = 0
- pools := b.getConnPools(false /* not already locked */)
- if len(pools) == 0 {
- return nil, nil, fmt.Errorf("No connection pool found")
- } else if len(pools) > 1 { // choose a random connection
- currentPool = rand.Intn(len(pools))
- } // if only one pool, currentPool defaults to 0, i.e., the only pool
-
- // get the pool
- pool := pools[currentPool]
- conn, err := pool.Get()
- if err != errClosedPool {
- return conn, pool, err
- }
-
- // If conn pool was closed, because another goroutine refreshed the vbucket map, retry...
- }
-}
-
-//
-// Get a random document from a bucket. Since the bucket may be distributed
-// across nodes, we must first select a random connection, and then use the
-// Client.GetRandomDoc() call to get a random document from that node.
-//
-
-func (b *Bucket) GetRandomDoc() (*gomemcached.MCResponse, error) {
- // get a connection from the pool
- conn, pool, err := b.getRandomConnection()
-
- if err != nil {
- return nil, err
- }
-
- // We may need to select the bucket before GetRandomDoc()
- // will work. This is sometimes done at startup (see defaultMkConn())
- // but not always, depending on the auth type.
- _, err = conn.SelectBucket(b.Name)
- if err != nil {
- return nil, err
- }
-
- // get a randomm document from the connection
- doc, err := conn.GetRandomDoc()
- // need to return the connection to the pool
- pool.Return(conn)
- return doc, err
-}
-
-func (b *Bucket) getMasterNode(i int) string {
- p := b.getConnPools(false /* not already locked */)
- if len(p) > i {
- return p[i].host
- }
- return ""
-}
-
-func (b *Bucket) authHandler(bucketLocked bool) (ah AuthHandler) {
- if !bucketLocked {
- b.RLock()
- defer b.RUnlock()
- }
- pool := b.pool
- name := b.Name
-
- if pool != nil {
- ah = pool.client.ah
- }
- if mbah, ok := ah.(MultiBucketAuthHandler); ok {
- return mbah.ForBucket(name)
- }
- if ah == nil {
- ah = &basicAuth{name, ""}
- }
- return
-}
-
-// NodeAddresses gets the (sorted) list of memcached node addresses
-// (hostname:port).
-func (b *Bucket) NodeAddresses() []string {
- vsm := b.VBServerMap()
- rv := make([]string, len(vsm.ServerList))
- copy(rv, vsm.ServerList)
- sort.Strings(rv)
- return rv
-}
-
-// CommonAddressSuffix finds the longest common suffix of all
-// host:port strings in the node list.
-func (b *Bucket) CommonAddressSuffix() string {
- input := []string{}
- for _, n := range b.Nodes() {
- input = append(input, n.Hostname)
- }
- return FindCommonSuffix(input)
-}
-
-// A Client is the starting point for all services across all buckets
-// in a Couchbase cluster.
-type Client struct {
- BaseURL *url.URL
- ah AuthHandler
- Info Pools
- tlsConfig *tls.Config
-}
-
-func maybeAddAuth(req *http.Request, ah AuthHandler) error {
- if hah, ok := ah.(HTTPAuthHandler); ok {
- return hah.SetCredsForRequest(req)
- }
- if ah != nil {
- user, pass, _ := ah.GetCredentials()
- req.Header.Set("Authorization", "Basic "+
- base64.StdEncoding.EncodeToString([]byte(user+":"+pass)))
- }
- return nil
-}
-
-// arbitary number, may need to be tuned #FIXME
-const HTTP_MAX_RETRY = 5
-
-// Someday golang network packages will implement standard
-// error codes. Until then #sigh
-func isHttpConnError(err error) bool {
-
- estr := err.Error()
- return strings.Contains(estr, "broken pipe") ||
- strings.Contains(estr, "broken connection") ||
- strings.Contains(estr, "connection reset")
-}
-
-var client *http.Client
-
-func ClientConfigForX509(certFile, keyFile, rootFile string) (*tls.Config, error) {
- cfg := &tls.Config{}
-
- if certFile != "" && keyFile != "" {
- tlsCert, err := tls.LoadX509KeyPair(certFile, keyFile)
- if err != nil {
- return nil, err
- }
- cfg.Certificates = []tls.Certificate{tlsCert}
- } else {
- //error need to pass both certfile and keyfile
- return nil, fmt.Errorf("N1QL: Need to pass both certfile and keyfile")
- }
-
- var caCert []byte
- var err1 error
-
- caCertPool := x509.NewCertPool()
- if rootFile != "" {
- // Read that value in
- caCert, err1 = ioutil.ReadFile(rootFile)
- if err1 != nil {
- return nil, fmt.Errorf(" Error in reading cacert file, err: %v", err1)
- }
- caCertPool.AppendCertsFromPEM(caCert)
- }
-
- cfg.RootCAs = caCertPool
- return cfg, nil
-}
-
-func doHTTPRequest(req *http.Request) (*http.Response, error) {
-
- var err error
- var res *http.Response
-
- // we need a client that ignores certificate errors, since we self-sign
- // our certs
- if client == nil && req.URL.Scheme == "https" {
- var tr *http.Transport
-
- if skipVerify {
- tr = &http.Transport{
- TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
- }
- } else {
- // Handle cases with cert
-
- cfg, err := ClientConfigForX509(certFile, keyFile, rootFile)
- if err != nil {
- return nil, err
- }
-
- tr = &http.Transport{
- TLSClientConfig: cfg,
- }
- }
-
- client = &http.Client{Transport: tr}
-
- } else if client == nil {
- client = HTTPClient
- }
-
- for i := 0; i < HTTP_MAX_RETRY; i++ {
- res, err = client.Do(req)
- if err != nil && isHttpConnError(err) {
- continue
- }
- break
- }
-
- if err != nil {
- return nil, err
- }
-
- return res, err
-}
-
-func doPutAPI(baseURL *url.URL, path string, params map[string]interface{}, authHandler AuthHandler, out interface{}) error {
- return doOutputAPI("PUT", baseURL, path, params, authHandler, out)
-}
-
-func doPostAPI(baseURL *url.URL, path string, params map[string]interface{}, authHandler AuthHandler, out interface{}) error {
- return doOutputAPI("POST", baseURL, path, params, authHandler, out)
-}
-
-func doOutputAPI(
- httpVerb string,
- baseURL *url.URL,
- path string,
- params map[string]interface{},
- authHandler AuthHandler,
- out interface{}) error {
-
- var requestUrl string
-
- if q := strings.Index(path, "?"); q > 0 {
- requestUrl = baseURL.Scheme + "://" + baseURL.Host + path[:q] + "?" + path[q+1:]
- } else {
- requestUrl = baseURL.Scheme + "://" + baseURL.Host + path
- }
-
- postData := url.Values{}
- for k, v := range params {
- postData.Set(k, fmt.Sprintf("%v", v))
- }
-
- req, err := http.NewRequest(httpVerb, requestUrl, bytes.NewBufferString(postData.Encode()))
- if err != nil {
- return err
- }
-
- req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
-
- err = maybeAddAuth(req, authHandler)
- if err != nil {
- return err
- }
-
- res, err := doHTTPRequest(req)
- if err != nil {
- return err
- }
-
- defer res.Body.Close()
- if res.StatusCode != 200 {
- bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512))
- return fmt.Errorf("HTTP error %v getting %q: %s",
- res.Status, requestUrl, bod)
- }
-
- d := json.NewDecoder(res.Body)
- if err = d.Decode(&out); err != nil {
- return err
- }
- return nil
-}
-
-func queryRestAPI(
- baseURL *url.URL,
- path string,
- authHandler AuthHandler,
- out interface{}) error {
-
- var requestUrl string
-
- if q := strings.Index(path, "?"); q > 0 {
- requestUrl = baseURL.Scheme + "://" + baseURL.Host + path[:q] + "?" + path[q+1:]
- } else {
- requestUrl = baseURL.Scheme + "://" + baseURL.Host + path
- }
-
- req, err := http.NewRequest("GET", requestUrl, nil)
- if err != nil {
- return err
- }
-
- err = maybeAddAuth(req, authHandler)
- if err != nil {
- return err
- }
-
- res, err := doHTTPRequest(req)
- if err != nil {
- return err
- }
-
- defer res.Body.Close()
- if res.StatusCode != 200 {
- bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512))
- return fmt.Errorf("HTTP error %v getting %q: %s",
- res.Status, requestUrl, bod)
- }
-
- d := json.NewDecoder(res.Body)
- if err = d.Decode(&out); err != nil {
- return err
- }
- return nil
-}
-
-func (c *Client) ProcessStream(path string, callb func(interface{}) error, data interface{}) error {
- return c.processStream(c.BaseURL, path, c.ah, callb, data)
-}
-
-// Based on code in http://src.couchbase.org/source/xref/trunk/goproj/src/github.com/couchbase/indexing/secondary/dcp/pools.go#309
-func (c *Client) processStream(baseURL *url.URL, path string, authHandler AuthHandler, callb func(interface{}) error, data interface{}) error {
- var requestUrl string
-
- if q := strings.Index(path, "?"); q > 0 {
- requestUrl = baseURL.Scheme + "://" + baseURL.Host + path[:q] + "?" + path[q+1:]
- } else {
- requestUrl = baseURL.Scheme + "://" + baseURL.Host + path
- }
-
- req, err := http.NewRequest("GET", requestUrl, nil)
- if err != nil {
- return err
- }
-
- err = maybeAddAuth(req, authHandler)
- if err != nil {
- return err
- }
-
- res, err := doHTTPRequest(req)
- if err != nil {
- return err
- }
-
- defer res.Body.Close()
- if res.StatusCode != 200 {
- bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512))
- return fmt.Errorf("HTTP error %v getting %q: %s",
- res.Status, requestUrl, bod)
- }
-
- reader := bufio.NewReader(res.Body)
- for {
- bs, err := reader.ReadBytes('\n')
- if err != nil {
- return err
- }
- if len(bs) == 1 && bs[0] == '\n' {
- continue
- }
-
- err = json.Unmarshal(bs, data)
- if err != nil {
- return err
- }
- err = callb(data)
- if err != nil {
- return err
- }
- }
- return nil
-
-}
-
-func (c *Client) parseURLResponse(path string, out interface{}) error {
- return queryRestAPI(c.BaseURL, path, c.ah, out)
-}
-
-func (c *Client) parsePostURLResponse(path string, params map[string]interface{}, out interface{}) error {
- return doPostAPI(c.BaseURL, path, params, c.ah, out)
-}
-
-func (c *Client) parsePutURLResponse(path string, params map[string]interface{}, out interface{}) error {
- return doPutAPI(c.BaseURL, path, params, c.ah, out)
-}
-
-func (b *Bucket) parseURLResponse(path string, out interface{}) error {
- nodes := b.Nodes()
- if len(nodes) == 0 {
- return errors.New("no couch rest URLs")
- }
-
- // Pick a random node to start querying.
- startNode := rand.Intn(len(nodes))
- maxRetries := len(nodes)
- for i := 0; i < maxRetries; i++ {
- node := nodes[(startNode+i)%len(nodes)] // Wrap around the nodes list.
- // Skip non-healthy nodes.
- if node.Status != "healthy" || node.CouchAPIBase == "" {
- continue
- }
- url := &url.URL{
- Host: node.Hostname,
- Scheme: "http",
- }
-
- // Lock here to avoid having pool closed under us.
- b.RLock()
- err := queryRestAPI(url, path, b.pool.client.ah, out)
- b.RUnlock()
- if err == nil {
- return err
- }
- }
- return errors.New("All nodes failed to respond or no healthy nodes for bucket found")
-}
-
-func (b *Bucket) parseAPIResponse(path string, out interface{}) error {
- nodes := b.Nodes()
- if len(nodes) == 0 {
- return errors.New("no couch rest URLs")
- }
-
- var err error
- var u *url.URL
-
- // Pick a random node to start querying.
- startNode := rand.Intn(len(nodes))
- maxRetries := len(nodes)
- for i := 0; i < maxRetries; i++ {
- node := nodes[(startNode+i)%len(nodes)] // Wrap around the nodes list.
- // Skip non-healthy nodes.
- if node.Status != "healthy" || node.CouchAPIBase == "" {
- continue
- }
-
- u, err = ParseURL(node.CouchAPIBase)
- // Lock here so pool does not get closed under us.
- b.RLock()
- if err != nil {
- b.RUnlock()
- return fmt.Errorf("config error: Bucket %q node #%d CouchAPIBase=%q: %v",
- b.Name, i, node.CouchAPIBase, err)
- } else if b.pool != nil {
- u.User = b.pool.client.BaseURL.User
- }
- u.Path = path
-
- // generate the path so that the strings are properly escaped
- // MB-13770
- requestPath := strings.Split(u.String(), u.Host)[1]
-
- err = queryRestAPI(u, requestPath, b.pool.client.ah, out)
- b.RUnlock()
- if err == nil {
- return err
- }
- }
-
- var errStr string
- if err != nil {
- errStr = "Error " + err.Error()
- }
-
- return errors.New("All nodes failed to respond or returned error or no healthy nodes for bucket found." + errStr)
-}
-
-type basicAuth struct {
- u, p string
-}
-
-func (b basicAuth) GetCredentials() (string, string, string) {
- return b.u, b.p, b.u
-}
-
-func basicAuthFromURL(us string) (ah AuthHandler) {
- u, err := ParseURL(us)
- if err != nil {
- return
- }
- if user := u.User; user != nil {
- pw, _ := user.Password()
- ah = basicAuth{user.Username(), pw}
- }
- return
-}
-
-// ConnectWithAuth connects to a couchbase cluster with the given
-// authentication handler.
-func ConnectWithAuth(baseU string, ah AuthHandler) (c Client, err error) {
- c.BaseURL, err = ParseURL(baseU)
- if err != nil {
- return
- }
- c.ah = ah
-
- return c, c.parseURLResponse("/pools", &c.Info)
-}
-
-// Call this method with a TLS certificate file name to make communication
-// with the KV engine encrypted.
-//
-// This method should be called immediately after a Connect*() method.
-func (c *Client) InitTLS(certFile string) error {
- serverCert, err := ioutil.ReadFile(certFile)
- if err != nil {
- return err
- }
- CA_Pool := x509.NewCertPool()
- CA_Pool.AppendCertsFromPEM(serverCert)
- c.tlsConfig = &tls.Config{RootCAs: CA_Pool}
- return nil
-}
-
-func (c *Client) ClearTLS() {
- c.tlsConfig = nil
-}
-
-// ConnectWithAuthCreds connects to a couchbase cluster with the give
-// authorization creds returned by cb_auth
-func ConnectWithAuthCreds(baseU, username, password string) (c Client, err error) {
- c.BaseURL, err = ParseURL(baseU)
- if err != nil {
- return
- }
-
- c.ah = newBucketAuth(username, password, "")
- return c, c.parseURLResponse("/pools", &c.Info)
-}
-
-// Connect to a couchbase cluster. An authentication handler will be
-// created from the userinfo in the URL if provided.
-func Connect(baseU string) (Client, error) {
- return ConnectWithAuth(baseU, basicAuthFromURL(baseU))
-}
-
-type BucketInfo struct {
- Name string // name of bucket
- Password string // SASL password of bucket
-}
-
-//Get SASL buckets
-func GetBucketList(baseU string) (bInfo []BucketInfo, err error) {
-
- c := &Client{}
- c.BaseURL, err = ParseURL(baseU)
- if err != nil {
- return
- }
- c.ah = basicAuthFromURL(baseU)
-
- var buckets []Bucket
- err = c.parseURLResponse("/pools/default/buckets", &buckets)
- if err != nil {
- return
- }
- bInfo = make([]BucketInfo, 0)
- for _, bucket := range buckets {
- bucketInfo := BucketInfo{Name: bucket.Name, Password: bucket.Password}
- bInfo = append(bInfo, bucketInfo)
- }
- return bInfo, err
-}
-
-//Set viewUpdateDaemonOptions
-func SetViewUpdateParams(baseU string, params map[string]interface{}) (viewOpts map[string]interface{}, err error) {
-
- c := &Client{}
- c.BaseURL, err = ParseURL(baseU)
- if err != nil {
- return
- }
- c.ah = basicAuthFromURL(baseU)
-
- if len(params) < 1 {
- return nil, fmt.Errorf("No params to set")
- }
-
- err = c.parsePostURLResponse("/settings/viewUpdateDaemon", params, &viewOpts)
- if err != nil {
- return
- }
- return viewOpts, err
-}
-
-// This API lets the caller know, if the list of nodes a bucket is
-// connected to has gone through an edit (a rebalance operation)
-// since the last update to the bucket, in which case a Refresh is
-// advised.
-func (b *Bucket) NodeListChanged() bool {
- b.RLock()
- pool := b.pool
- uri := b.URI
- b.RUnlock()
-
- tmpb := &Bucket{}
- err := pool.client.parseURLResponse(uri, tmpb)
- if err != nil {
- return true
- }
-
- bNodes := *(*[]Node)(b.nodeList)
- if len(bNodes) != len(tmpb.NodesJSON) {
- return true
- }
-
- bucketHostnames := map[string]bool{}
- for _, node := range bNodes {
- bucketHostnames[node.Hostname] = true
- }
-
- for _, node := range tmpb.NodesJSON {
- if _, found := bucketHostnames[node.Hostname]; !found {
- return true
- }
- }
-
- return false
-}
-
-// Sample data for scopes and collections as returned from the
-// /pooles/default/$BUCKET_NAME/collections API.
-// {"myScope2":{"myCollectionC":{}},"myScope1":{"myCollectionB":{},"myCollectionA":{}},"_default":{"_default":{}}}
-
-// Structures for parsing collections manifest.
-// The map key is the name of the scope.
-// Example data:
-// {"uid":"b","scopes":[
-// {"name":"_default","uid":"0","collections":[
-// {"name":"_default","uid":"0"}]},
-// {"name":"myScope1","uid":"8","collections":[
-// {"name":"myCollectionB","uid":"c"},
-// {"name":"myCollectionA","uid":"b"}]},
-// {"name":"myScope2","uid":"9","collections":[
-// {"name":"myCollectionC","uid":"d"}]}]}
-type InputManifest struct {
- Uid string
- Scopes []InputScope
-}
-type InputScope struct {
- Name string
- Uid string
- Collections []InputCollection
-}
-type InputCollection struct {
- Name string
- Uid string
-}
-
-// Structures for storing collections information.
-type Manifest struct {
- Uid uint64
- Scopes map[string]*Scope // map by name
-}
-type Scope struct {
- Name string
- Uid uint64
- Collections map[string]*Collection // map by name
-}
-type Collection struct {
- Name string
- Uid uint64
-}
-
-var _EMPTY_MANIFEST *Manifest = &Manifest{Uid: 0, Scopes: map[string]*Scope{}}
-
-func parseCollectionsManifest(res *gomemcached.MCResponse) (*Manifest, error) {
- if !EnableCollections {
- return _EMPTY_MANIFEST, nil
- }
-
- var im InputManifest
- err := json.Unmarshal(res.Body, &im)
- if err != nil {
- return nil, err
- }
-
- uid, err := strconv.ParseUint(im.Uid, 16, 64)
- if err != nil {
- return nil, err
- }
- mani := &Manifest{Uid: uid, Scopes: make(map[string]*Scope, len(im.Scopes))}
- for _, iscope := range im.Scopes {
- scope_uid, err := strconv.ParseUint(iscope.Uid, 16, 64)
- if err != nil {
- return nil, err
- }
- scope := &Scope{Uid: scope_uid, Name: iscope.Name, Collections: make(map[string]*Collection, len(iscope.Collections))}
- mani.Scopes[iscope.Name] = scope
- for _, icoll := range iscope.Collections {
- coll_uid, err := strconv.ParseUint(icoll.Uid, 16, 64)
- if err != nil {
- return nil, err
- }
- coll := &Collection{Uid: coll_uid, Name: icoll.Name}
- scope.Collections[icoll.Name] = coll
- }
- }
-
- return mani, nil
-}
-
-// This function assumes the bucket is locked.
-func (b *Bucket) GetCollectionsManifest() (*Manifest, error) {
- // Collections not used?
- if !EnableCollections {
- return nil, fmt.Errorf("Collections not enabled.")
- }
-
- b.RLock()
- pools := b.getConnPools(true /* already locked */)
- pool := pools[0] // Any pool will do, so use the first one.
- b.RUnlock()
- client, err := pool.Get()
- if err != nil {
- return nil, fmt.Errorf("Unable to get connection to retrieve collections manifest: %v. No collections access to bucket %s.", err, b.Name)
- }
-
- // We need to select the bucket before GetCollectionsManifest()
- // will work. This is sometimes done at startup (see defaultMkConn())
- // but not always, depending on the auth type.
- // Doing this is safe because we collect the the connections
- // by bucket, so the bucket being selected will never change.
- _, err = client.SelectBucket(b.Name)
- if err != nil {
- pool.Return(client)
- return nil, fmt.Errorf("Unable to select bucket %s: %v. No collections access to bucket %s.", err, b.Name, b.Name)
- }
-
- res, err := client.GetCollectionsManifest()
- if err != nil {
- pool.Return(client)
- return nil, fmt.Errorf("Unable to retrieve collections manifest: %v. No collections access to bucket %s.", err, b.Name)
- }
- mani, err := parseCollectionsManifest(res)
- if err != nil {
- pool.Return(client)
- return nil, fmt.Errorf("Unable to parse collections manifest: %v. No collections access to bucket %s.", err, b.Name)
- }
-
- pool.Return(client)
- return mani, nil
-}
-
-func (b *Bucket) RefreshFully() error {
- return b.refresh(false)
-}
-
-func (b *Bucket) Refresh() error {
- return b.refresh(true)
-}
-
-func (b *Bucket) refresh(preserveConnections bool) error {
- b.RLock()
- pool := b.pool
- uri := b.URI
- client := pool.client
- b.RUnlock()
- tlsConfig := client.tlsConfig
-
- var poolServices PoolServices
- var err error
- if tlsConfig != nil {
- poolServices, err = client.GetPoolServices("default")
- if err != nil {
- return err
- }
- }
-
- tmpb := &Bucket{}
- err = pool.client.parseURLResponse(uri, tmpb)
- if err != nil {
- return err
- }
-
- pools := b.getConnPools(false /* bucket not already locked */)
-
- // We need this lock to ensure that bucket refreshes happening because
- // of NMVb errors received during bulkGet do not end up over-writing
- // pool.inUse.
- b.Lock()
-
- for _, pool := range pools {
- if pool != nil {
- pool.inUse = false
- }
- }
-
- newcps := make([]*connectionPool, len(tmpb.VBSMJson.ServerList))
- for i := range newcps {
-
- if preserveConnections {
- pool := b.getConnPoolByHost(tmpb.VBSMJson.ServerList[i], true /* bucket already locked */)
- if pool != nil && pool.inUse == false {
- // if the hostname and index is unchanged then reuse this pool
- newcps[i] = pool
- pool.inUse = true
- continue
- }
- }
-
- hostport := tmpb.VBSMJson.ServerList[i]
- if tlsConfig != nil {
- hostport, err = MapKVtoSSL(hostport, &poolServices)
- if err != nil {
- b.Unlock()
- return err
- }
- }
-
- if b.ah != nil {
- newcps[i] = newConnectionPool(hostport,
- b.ah, AsynchronousCloser, PoolSize, PoolOverflow, tlsConfig, b.Name)
-
- } else {
- newcps[i] = newConnectionPool(hostport,
- b.authHandler(true /* bucket already locked */),
- AsynchronousCloser, PoolSize, PoolOverflow, tlsConfig, b.Name)
- }
- }
- b.replaceConnPools2(newcps, true /* bucket already locked */)
- tmpb.ah = b.ah
- b.vBucketServerMap = unsafe.Pointer(&tmpb.VBSMJson)
- b.nodeList = unsafe.Pointer(&tmpb.NodesJSON)
-
- b.Unlock()
- return nil
-}
-
-func (p *Pool) refresh() (err error) {
- p.BucketMap = make(map[string]*Bucket)
-
- buckets := []Bucket{}
- err = p.client.parseURLResponse(p.BucketURL["uri"], &buckets)
- if err != nil {
- return err
- }
- for i, _ := range buckets {
- b := new(Bucket)
- *b = buckets[i]
- b.pool = p
- b.nodeList = unsafe.Pointer(&b.NodesJSON)
-
- // MB-33185 this is merely defensive, just in case
- // refresh() gets called on a perfectly node pool
- ob, ok := p.BucketMap[b.Name]
- if ok && ob.connPools != nil {
- ob.Close()
- }
- b.replaceConnPools(make([]*connectionPool, len(b.VBSMJson.ServerList)))
- p.BucketMap[b.Name] = b
- runtime.SetFinalizer(b, bucketFinalizer)
- }
- return nil
-}
-
-// GetPool gets a pool from within the couchbase cluster (usually
-// "default").
-func (c *Client) GetPool(name string) (p Pool, err error) {
- var poolURI string
-
- for _, p := range c.Info.Pools {
- if p.Name == name {
- poolURI = p.URI
- break
- }
- }
- if poolURI == "" {
- return p, errors.New("No pool named " + name)
- }
-
- err = c.parseURLResponse(poolURI, &p)
-
- p.client = c
-
- err = p.refresh()
- return
-}
-
-// GetPoolServices returns all the bucket-independent services in a pool.
-// (See "Exposing services outside of bucket context" in http://goo.gl/uuXRkV)
-func (c *Client) GetPoolServices(name string) (ps PoolServices, err error) {
- var poolName string
- for _, p := range c.Info.Pools {
- if p.Name == name {
- poolName = p.Name
- }
- }
- if poolName == "" {
- return ps, errors.New("No pool named " + name)
- }
-
- poolURI := "/pools/" + poolName + "/nodeServices"
- err = c.parseURLResponse(poolURI, &ps)
-
- return
-}
-
-func (b *Bucket) GetPoolServices(name string) (*PoolServices, error) {
- b.RLock()
- pool := b.pool
- b.RUnlock()
-
- ps, err := pool.client.GetPoolServices(name)
- if err != nil {
- return nil, err
- }
-
- return &ps, nil
-}
-
-// Close marks this bucket as no longer needed, closing connections it
-// may have open.
-func (b *Bucket) Close() {
- b.Lock()
- defer b.Unlock()
- if b.connPools != nil {
- for _, c := range b.getConnPools(true /* already locked */) {
- if c != nil {
- c.Close()
- }
- }
- b.connPools = nil
- }
-}
-
-func bucketFinalizer(b *Bucket) {
- if b.connPools != nil {
- if !b.closed {
- logging.Warnf("Finalizing a bucket with active connections.")
- }
-
- // MB-33185 do not leak connection pools
- b.Close()
- }
-}
-
-// GetBucket gets a bucket from within this pool.
-func (p *Pool) GetBucket(name string) (*Bucket, error) {
- rv, ok := p.BucketMap[name]
- if !ok {
- return nil, &BucketNotFoundError{bucket: name}
- }
- err := rv.Refresh()
- if err != nil {
- return nil, err
- }
- return rv, nil
-}
-
-// GetBucket gets a bucket from within this pool.
-func (p *Pool) GetBucketWithAuth(bucket, username, password string) (*Bucket, error) {
- rv, ok := p.BucketMap[bucket]
- if !ok {
- return nil, &BucketNotFoundError{bucket: bucket}
- }
- rv.ah = newBucketAuth(username, password, bucket)
- err := rv.Refresh()
- if err != nil {
- return nil, err
- }
- return rv, nil
-}
-
-// GetPool gets the pool to which this bucket belongs.
-func (b *Bucket) GetPool() *Pool {
- b.RLock()
- defer b.RUnlock()
- ret := b.pool
- return ret
-}
-
-// GetClient gets the client from which we got this pool.
-func (p *Pool) GetClient() *Client {
- return p.client
-}
-
-// Release bucket connections when the pool is no longer in use
-func (p *Pool) Close() {
- // fine to loop through the buckets unlocked
- // locking happens at the bucket level
- for b, _ := range p.BucketMap {
-
- // MB-33208 defer closing connection pools until the bucket is no longer used
- bucket := p.BucketMap[b]
- bucket.Lock()
- bucket.closed = true
- bucket.Unlock()
- }
-}
-
-// GetBucket is a convenience function for getting a named bucket from
-// a URL
-func GetBucket(endpoint, poolname, bucketname string) (*Bucket, error) {
- var err error
- client, err := Connect(endpoint)
- if err != nil {
- return nil, err
- }
-
- pool, err := client.GetPool(poolname)
- if err != nil {
- return nil, err
- }
-
- return pool.GetBucket(bucketname)
-}
-
-// ConnectWithAuthAndGetBucket is a convenience function for
-// getting a named bucket from a given URL and an auth callback
-func ConnectWithAuthAndGetBucket(endpoint, poolname, bucketname string,
- ah AuthHandler) (*Bucket, error) {
- client, err := ConnectWithAuth(endpoint, ah)
- if err != nil {
- return nil, err
- }
-
- pool, err := client.GetPool(poolname)
- if err != nil {
- return nil, err
- }
-
- return pool.GetBucket(bucketname)
-}
diff --git a/vendor/github.com/couchbaselabs/go-couchbase/port_map.go b/vendor/github.com/couchbaselabs/go-couchbase/port_map.go
deleted file mode 100644
index 24c9f105db..0000000000
--- a/vendor/github.com/couchbaselabs/go-couchbase/port_map.go
+++ /dev/null
@@ -1,84 +0,0 @@
-package couchbase
-
-/*
-
-The goal here is to map a hostname:port combination to another hostname:port
-combination. The original hostname:port gives the name and regular KV port
-of a couchbase server. We want to determine the corresponding SSL KV port.
-
-To do this, we have a pool services structure, as obtained from
-the /pools/default/nodeServices API.
-
-For a fully configured two-node system, the structure may look like this:
-{"rev":32,"nodesExt":[
- {"services":{"mgmt":8091,"mgmtSSL":18091,"fts":8094,"ftsSSL":18094,"indexAdmin":9100,"indexScan":9101,"indexHttp":9102,"indexStreamInit":9103,"indexStreamCatchup":9104,"indexStreamMaint":9105,"indexHttps":19102,"capiSSL":18092,"capi":8092,"kvSSL":11207,"projector":9999,"kv":11210,"moxi":11211},"hostname":"172.23.123.101"},
- {"services":{"mgmt":8091,"mgmtSSL":18091,"indexAdmin":9100,"indexScan":9101,"indexHttp":9102,"indexStreamInit":9103,"indexStreamCatchup":9104,"indexStreamMaint":9105,"indexHttps":19102,"capiSSL":18092,"capi":8092,"kvSSL":11207,"projector":9999,"kv":11210,"moxi":11211,"n1ql":8093,"n1qlSSL":18093},"thisNode":true,"hostname":"172.23.123.102"}]}
-
-In this case, note the "hostname" fields, and the "kv" and "kvSSL" fields.
-
-For a single-node system, perhaps brought up for testing, the structure may look like this:
-{"rev":66,"nodesExt":[
- {"services":{"mgmt":8091,"mgmtSSL":18091,"indexAdmin":9100,"indexScan":9101,"indexHttp":9102,"indexStreamInit":9103,"indexStreamCatchup":9104,"indexStreamMaint":9105,"indexHttps":19102,"kv":11210,"kvSSL":11207,"capi":8092,"capiSSL":18092,"projector":9999,"n1ql":8093,"n1qlSSL":18093},"thisNode":true}],"clusterCapabilitiesVer":[1,0],"clusterCapabilities":{"n1ql":["enhancedPreparedStatements"]}}
-
-Here, note that there is only a single entry in the "nodeExt" array and that it does not have a "hostname" field.
-We will assume that either hostname fields are present, or there is only a single node.
-*/
-
-import (
- "encoding/json"
- "fmt"
- "strconv"
- "strings"
-)
-
-func ParsePoolServices(jsonInput string) (*PoolServices, error) {
- ps := &PoolServices{}
- err := json.Unmarshal([]byte(jsonInput), ps)
- return ps, err
-}
-
-func MapKVtoSSL(hostport string, ps *PoolServices) (string, error) {
- colonIndex := strings.LastIndex(hostport, ":")
- if colonIndex < 0 {
- return "", fmt.Errorf("Unable to find host/port separator in %s", hostport)
- }
- host := hostport[0:colonIndex]
- port := hostport[colonIndex+1:]
- portInt, err := strconv.Atoi(port)
- if err != nil {
- return "", fmt.Errorf("Unable to parse host/port combination %s: %v", hostport, err)
- }
-
- var ns *NodeServices
- if len(ps.NodesExt) == 1 {
- ns = &(ps.NodesExt[0])
- } else {
- for i := range ps.NodesExt {
- hostname := ps.NodesExt[i].Hostname
- if len(hostname) == 0 {
- // in case of missing hostname, check for 127.0.0.1
- hostname = "127.0.0.1"
- }
- if hostname == host {
- ns = &(ps.NodesExt[i])
- break
- }
- }
- }
-
- if ns == nil {
- return "", fmt.Errorf("Unable to parse host/port combination %s: no matching node found among %d", hostport, len(ps.NodesExt))
- }
- kv, found := ns.Services["kv"]
- if !found {
- return "", fmt.Errorf("Unable to map host/port combination %s: target host has no kv port listed", hostport)
- }
- kvSSL, found := ns.Services["kvSSL"]
- if !found {
- return "", fmt.Errorf("Unable to map host/port combination %s: target host has no kvSSL port listed", hostport)
- }
- if portInt != kv {
- return "", fmt.Errorf("Unable to map hostport combination %s: expected port %d but found %d", hostport, portInt, kv)
- }
- return fmt.Sprintf("%s:%d", host, kvSSL), nil
-}
diff --git a/vendor/github.com/couchbaselabs/go-couchbase/streaming.go b/vendor/github.com/couchbaselabs/go-couchbase/streaming.go
deleted file mode 100644
index 6d8f7dfd53..0000000000
--- a/vendor/github.com/couchbaselabs/go-couchbase/streaming.go
+++ /dev/null
@@ -1,215 +0,0 @@
-package couchbase
-
-import (
- "encoding/json"
- "fmt"
- "github.com/couchbase/goutils/logging"
- "io"
- "io/ioutil"
- "math/rand"
- "net"
- "net/http"
- "time"
- "unsafe"
-)
-
-// Bucket auto-updater gets the latest version of the bucket config from
-// the server. If the configuration has changed then updated the local
-// bucket information. If the bucket has been deleted then notify anyone
-// who is holding a reference to this bucket
-
-const MAX_RETRY_COUNT = 5
-const DISCONNECT_PERIOD = 120 * time.Second
-
-type NotifyFn func(bucket string, err error)
-
-// Use TCP keepalive to detect half close sockets
-var updaterTransport http.RoundTripper = &http.Transport{
- Proxy: http.ProxyFromEnvironment,
- Dial: (&net.Dialer{
- Timeout: 30 * time.Second,
- KeepAlive: 30 * time.Second,
- }).Dial,
-}
-
-var updaterHTTPClient = &http.Client{Transport: updaterTransport}
-
-func doHTTPRequestForUpdate(req *http.Request) (*http.Response, error) {
-
- var err error
- var res *http.Response
-
- for i := 0; i < HTTP_MAX_RETRY; i++ {
- res, err = updaterHTTPClient.Do(req)
- if err != nil && isHttpConnError(err) {
- continue
- }
- break
- }
-
- if err != nil {
- return nil, err
- }
-
- return res, err
-}
-
-func (b *Bucket) RunBucketUpdater(notify NotifyFn) {
- go func() {
- err := b.UpdateBucket()
- if err != nil {
- if notify != nil {
- notify(b.GetName(), err)
- }
- logging.Errorf(" Bucket Updater exited with err %v", err)
- }
- }()
-}
-
-func (b *Bucket) replaceConnPools2(with []*connectionPool, bucketLocked bool) {
- if !bucketLocked {
- b.Lock()
- defer b.Unlock()
- }
- old := b.connPools
- b.connPools = unsafe.Pointer(&with)
- if old != nil {
- for _, pool := range *(*[]*connectionPool)(old) {
- if pool != nil && pool.inUse == false {
- pool.Close()
- }
- }
- }
- return
-}
-
-func (b *Bucket) UpdateBucket() error {
-
- var failures int
- var returnErr error
-
- var poolServices PoolServices
- var err error
- tlsConfig := b.pool.client.tlsConfig
- if tlsConfig != nil {
- poolServices, err = b.pool.client.GetPoolServices("default")
- if err != nil {
- return err
- }
- }
-
- for {
-
- if failures == MAX_RETRY_COUNT {
- logging.Errorf(" Maximum failures reached. Exiting loop...")
- return fmt.Errorf("Max failures reached. Last Error %v", returnErr)
- }
-
- nodes := b.Nodes()
- if len(nodes) < 1 {
- return fmt.Errorf("No healthy nodes found")
- }
-
- startNode := rand.Intn(len(nodes))
- node := nodes[(startNode)%len(nodes)]
-
- streamUrl := fmt.Sprintf("http://%s/pools/default/bucketsStreaming/%s", node.Hostname, b.GetName())
- logging.Infof(" Trying with %s", streamUrl)
- req, err := http.NewRequest("GET", streamUrl, nil)
- if err != nil {
- return err
- }
-
- // Lock here to avoid having pool closed under us.
- b.RLock()
- err = maybeAddAuth(req, b.pool.client.ah)
- b.RUnlock()
- if err != nil {
- return err
- }
-
- res, err := doHTTPRequestForUpdate(req)
- if err != nil {
- return err
- }
-
- if res.StatusCode != 200 {
- bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512))
- logging.Errorf("Failed to connect to host, unexpected status code: %v. Body %s", res.StatusCode, bod)
- res.Body.Close()
- returnErr = fmt.Errorf("Failed to connect to host. Status %v Body %s", res.StatusCode, bod)
- failures++
- continue
- }
-
- dec := json.NewDecoder(res.Body)
-
- tmpb := &Bucket{}
- for {
-
- err := dec.Decode(&tmpb)
- if err != nil {
- returnErr = err
- res.Body.Close()
- break
- }
-
- // if we got here, reset failure count
- failures = 0
- b.Lock()
-
- // mark all the old connection pools for deletion
- pools := b.getConnPools(true /* already locked */)
- for _, pool := range pools {
- if pool != nil {
- pool.inUse = false
- }
- }
-
- newcps := make([]*connectionPool, len(tmpb.VBSMJson.ServerList))
- for i := range newcps {
- // get the old connection pool and check if it is still valid
- pool := b.getConnPoolByHost(tmpb.VBSMJson.ServerList[i], true /* bucket already locked */)
- if pool != nil && pool.inUse == false {
- // if the hostname and index is unchanged then reuse this pool
- newcps[i] = pool
- pool.inUse = true
- continue
- }
- // else create a new pool
- hostport := tmpb.VBSMJson.ServerList[i]
- if tlsConfig != nil {
- hostport, err = MapKVtoSSL(hostport, &poolServices)
- if err != nil {
- b.Unlock()
- return err
- }
- }
- if b.ah != nil {
- newcps[i] = newConnectionPool(hostport,
- b.ah, false, PoolSize, PoolOverflow, b.pool.client.tlsConfig, b.Name)
-
- } else {
- newcps[i] = newConnectionPool(hostport,
- b.authHandler(true /* bucket already locked */),
- false, PoolSize, PoolOverflow, b.pool.client.tlsConfig, b.Name)
- }
- }
-
- b.replaceConnPools2(newcps, true /* bucket already locked */)
-
- tmpb.ah = b.ah
- b.vBucketServerMap = unsafe.Pointer(&tmpb.VBSMJson)
- b.nodeList = unsafe.Pointer(&tmpb.NodesJSON)
- b.Unlock()
-
- logging.Infof("Got new configuration for bucket %s", b.GetName())
-
- }
- // we are here because of an error
- failures++
- continue
-
- }
- return nil
-}
diff --git a/vendor/github.com/couchbaselabs/go-couchbase/tap.go b/vendor/github.com/couchbaselabs/go-couchbase/tap.go
deleted file mode 100644
index 86edd30554..0000000000
--- a/vendor/github.com/couchbaselabs/go-couchbase/tap.go
+++ /dev/null
@@ -1,143 +0,0 @@
-package couchbase
-
-import (
- "github.com/couchbase/gomemcached/client"
- "github.com/couchbase/goutils/logging"
- "sync"
- "time"
-)
-
-const initialRetryInterval = 1 * time.Second
-const maximumRetryInterval = 30 * time.Second
-
-// A TapFeed streams mutation events from a bucket.
-//
-// Events from the bucket can be read from the channel 'C'. Remember
-// to call Close() on it when you're done, unless its channel has
-// closed itself already.
-type TapFeed struct {
- C <-chan memcached.TapEvent
-
- bucket *Bucket
- args *memcached.TapArguments
- nodeFeeds []*memcached.TapFeed // The TAP feeds of the individual nodes
- output chan memcached.TapEvent // Same as C but writeably-typed
- wg sync.WaitGroup
- quit chan bool
-}
-
-// StartTapFeed creates and starts a new Tap feed
-func (b *Bucket) StartTapFeed(args *memcached.TapArguments) (*TapFeed, error) {
- if args == nil {
- defaultArgs := memcached.DefaultTapArguments()
- args = &defaultArgs
- }
-
- feed := &TapFeed{
- bucket: b,
- args: args,
- output: make(chan memcached.TapEvent, 10),
- quit: make(chan bool),
- }
-
- go feed.run()
-
- feed.C = feed.output
- return feed, nil
-}
-
-// Goroutine that runs the feed
-func (feed *TapFeed) run() {
- retryInterval := initialRetryInterval
- bucketOK := true
- for {
- // Connect to the TAP feed of each server node:
- if bucketOK {
- killSwitch, err := feed.connectToNodes()
- if err == nil {
- // Run until one of the sub-feeds fails:
- select {
- case <-killSwitch:
- case <-feed.quit:
- return
- }
- feed.closeNodeFeeds()
- retryInterval = initialRetryInterval
- }
- }
-
- // On error, try to refresh the bucket in case the list of nodes changed:
- logging.Infof("go-couchbase: TAP connection lost; reconnecting to bucket %q in %v",
- feed.bucket.Name, retryInterval)
- err := feed.bucket.Refresh()
- bucketOK = err == nil
-
- select {
- case <-time.After(retryInterval):
- case <-feed.quit:
- return
- }
- if retryInterval *= 2; retryInterval > maximumRetryInterval {
- retryInterval = maximumRetryInterval
- }
- }
-}
-
-func (feed *TapFeed) connectToNodes() (killSwitch chan bool, err error) {
- killSwitch = make(chan bool)
- for _, serverConn := range feed.bucket.getConnPools(false /* not already locked */) {
- var singleFeed *memcached.TapFeed
- singleFeed, err = serverConn.StartTapFeed(feed.args)
- if err != nil {
- logging.Errorf("go-couchbase: Error connecting to tap feed of %s: %v", serverConn.host, err)
- feed.closeNodeFeeds()
- return
- }
- feed.nodeFeeds = append(feed.nodeFeeds, singleFeed)
- go feed.forwardTapEvents(singleFeed, killSwitch, serverConn.host)
- feed.wg.Add(1)
- }
- return
-}
-
-// Goroutine that forwards Tap events from a single node's feed to the aggregate feed.
-func (feed *TapFeed) forwardTapEvents(singleFeed *memcached.TapFeed, killSwitch chan bool, host string) {
- defer feed.wg.Done()
- for {
- select {
- case event, ok := <-singleFeed.C:
- if !ok {
- if singleFeed.Error != nil {
- logging.Errorf("go-couchbase: Tap feed from %s failed: %v", host, singleFeed.Error)
- }
- killSwitch <- true
- return
- }
- feed.output <- event
- case <-feed.quit:
- return
- }
- }
-}
-
-func (feed *TapFeed) closeNodeFeeds() {
- for _, f := range feed.nodeFeeds {
- f.Close()
- }
- feed.nodeFeeds = nil
-}
-
-// Close a Tap feed.
-func (feed *TapFeed) Close() error {
- select {
- case <-feed.quit:
- return nil
- default:
- }
-
- feed.closeNodeFeeds()
- close(feed.quit)
- feed.wg.Wait()
- close(feed.output)
- return nil
-}
diff --git a/vendor/github.com/couchbaselabs/go-couchbase/upr.go b/vendor/github.com/couchbaselabs/go-couchbase/upr.go
deleted file mode 100644
index bf1b209b7e..0000000000
--- a/vendor/github.com/couchbaselabs/go-couchbase/upr.go
+++ /dev/null
@@ -1,398 +0,0 @@
-package couchbase
-
-import (
- "log"
- "sync"
- "time"
-
- "fmt"
- "github.com/couchbase/gomemcached"
- "github.com/couchbase/gomemcached/client"
- "github.com/couchbase/goutils/logging"
-)
-
-// A UprFeed streams mutation events from a bucket.
-//
-// Events from the bucket can be read from the channel 'C'. Remember
-// to call Close() on it when you're done, unless its channel has
-// closed itself already.
-type UprFeed struct {
- C <-chan *memcached.UprEvent
-
- bucket *Bucket
- nodeFeeds map[string]*FeedInfo // The UPR feeds of the individual nodes
- output chan *memcached.UprEvent // Same as C but writeably-typed
- outputClosed bool
- quit chan bool
- name string // name of this UPR feed
- sequence uint32 // sequence number for this feed
- connected bool
- killSwitch chan bool
- closing bool
- wg sync.WaitGroup
- dcp_buffer_size uint32
- data_chan_size int
-}
-
-// UprFeed from a single connection
-type FeedInfo struct {
- uprFeed *memcached.UprFeed // UPR feed handle
- host string // hostname
- connected bool // connected
- quit chan bool // quit channel
-}
-
-type FailoverLog map[uint16]memcached.FailoverLog
-
-// GetFailoverLogs, get the failover logs for a set of vbucket ids
-func (b *Bucket) GetFailoverLogs(vBuckets []uint16) (FailoverLog, error) {
-
- // map vbids to their corresponding hosts
- vbHostList := make(map[string][]uint16)
- vbm := b.VBServerMap()
- if len(vbm.VBucketMap) < len(vBuckets) {
- return nil, fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v",
- vbm.VBucketMap, vBuckets)
- }
-
- for _, vb := range vBuckets {
- masterID := vbm.VBucketMap[vb][0]
- master := b.getMasterNode(masterID)
- if master == "" {
- return nil, fmt.Errorf("No master found for vb %d", vb)
- }
-
- vbList := vbHostList[master]
- if vbList == nil {
- vbList = make([]uint16, 0)
- }
- vbList = append(vbList, vb)
- vbHostList[master] = vbList
- }
-
- failoverLogMap := make(FailoverLog)
- for _, serverConn := range b.getConnPools(false /* not already locked */) {
-
- vbList := vbHostList[serverConn.host]
- if vbList == nil {
- continue
- }
-
- mc, err := serverConn.Get()
- if err != nil {
- logging.Infof("No Free connections for vblist %v", vbList)
- return nil, fmt.Errorf("No Free connections for host %s",
- serverConn.host)
-
- }
- // close the connection so that it doesn't get reused for upr data
- // connection
- defer mc.Close()
- failoverlogs, err := mc.UprGetFailoverLog(vbList)
- if err != nil {
- return nil, fmt.Errorf("Error getting failover log %s host %s",
- err.Error(), serverConn.host)
-
- }
-
- for vb, log := range failoverlogs {
- failoverLogMap[vb] = *log
- }
- }
-
- return failoverLogMap, nil
-}
-
-func (b *Bucket) StartUprFeed(name string, sequence uint32) (*UprFeed, error) {
- return b.StartUprFeedWithConfig(name, sequence, 10, DEFAULT_WINDOW_SIZE)
-}
-
-// StartUprFeed creates and starts a new Upr feed
-// No data will be sent on the channel unless vbuckets streams are requested
-func (b *Bucket) StartUprFeedWithConfig(name string, sequence uint32, data_chan_size int, dcp_buffer_size uint32) (*UprFeed, error) {
-
- feed := &UprFeed{
- bucket: b,
- output: make(chan *memcached.UprEvent, data_chan_size),
- quit: make(chan bool),
- nodeFeeds: make(map[string]*FeedInfo, 0),
- name: name,
- sequence: sequence,
- killSwitch: make(chan bool),
- dcp_buffer_size: dcp_buffer_size,
- data_chan_size: data_chan_size,
- }
-
- err := feed.connectToNodes()
- if err != nil {
- return nil, fmt.Errorf("Cannot connect to bucket %s", err.Error())
- }
- feed.connected = true
- go feed.run()
-
- feed.C = feed.output
- return feed, nil
-}
-
-// UprRequestStream starts a stream for a vb on a feed
-func (feed *UprFeed) UprRequestStream(vb uint16, opaque uint16, flags uint32,
- vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error {
-
- defer func() {
- if r := recover(); r != nil {
- log.Panicf("Panic in UprRequestStream. Feed %v Bucket %v", feed, feed.bucket)
- }
- }()
-
- vbm := feed.bucket.VBServerMap()
- if len(vbm.VBucketMap) < int(vb) {
- return fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v",
- vb, vbm.VBucketMap)
- }
-
- if int(vb) >= len(vbm.VBucketMap) {
- return fmt.Errorf("Invalid vbucket id %d", vb)
- }
-
- masterID := vbm.VBucketMap[vb][0]
- master := feed.bucket.getMasterNode(masterID)
- if master == "" {
- return fmt.Errorf("Master node not found for vbucket %d", vb)
- }
- singleFeed := feed.nodeFeeds[master]
- if singleFeed == nil {
- return fmt.Errorf("UprFeed for this host not found")
- }
-
- if err := singleFeed.uprFeed.UprRequestStream(vb, opaque, flags,
- vuuid, startSequence, endSequence, snapStart, snapEnd); err != nil {
- return err
- }
-
- return nil
-}
-
-// UprCloseStream ends a vbucket stream.
-func (feed *UprFeed) UprCloseStream(vb, opaqueMSB uint16) error {
-
- defer func() {
- if r := recover(); r != nil {
- log.Panicf("Panic in UprCloseStream. Feed %v Bucket %v ", feed, feed.bucket)
- }
- }()
-
- vbm := feed.bucket.VBServerMap()
- if len(vbm.VBucketMap) < int(vb) {
- return fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v",
- vb, vbm.VBucketMap)
- }
-
- if int(vb) >= len(vbm.VBucketMap) {
- return fmt.Errorf("Invalid vbucket id %d", vb)
- }
-
- masterID := vbm.VBucketMap[vb][0]
- master := feed.bucket.getMasterNode(masterID)
- if master == "" {
- return fmt.Errorf("Master node not found for vbucket %d", vb)
- }
- singleFeed := feed.nodeFeeds[master]
- if singleFeed == nil {
- return fmt.Errorf("UprFeed for this host not found")
- }
-
- if err := singleFeed.uprFeed.CloseStream(vb, opaqueMSB); err != nil {
- return err
- }
- return nil
-}
-
-// Goroutine that runs the feed
-func (feed *UprFeed) run() {
- retryInterval := initialRetryInterval
- bucketOK := true
- for {
- // Connect to the UPR feed of each server node:
- if bucketOK {
- // Run until one of the sub-feeds fails:
- select {
- case <-feed.killSwitch:
- case <-feed.quit:
- return
- }
- //feed.closeNodeFeeds()
- retryInterval = initialRetryInterval
- }
-
- if feed.closing == true {
- // we have been asked to shut down
- return
- }
-
- // On error, try to refresh the bucket in case the list of nodes changed:
- logging.Infof("go-couchbase: UPR connection lost; reconnecting to bucket %q in %v",
- feed.bucket.Name, retryInterval)
-
- if err := feed.bucket.Refresh(); err != nil {
- // if we fail to refresh the bucket, exit the feed
- // MB-14917
- logging.Infof("Unable to refresh bucket %s ", err.Error())
- close(feed.output)
- feed.outputClosed = true
- feed.closeNodeFeeds()
- return
- }
-
- // this will only connect to nodes that are not connected or changed
- // user will have to reconnect the stream
- err := feed.connectToNodes()
- if err != nil {
- logging.Infof("Unable to connect to nodes..exit ")
- close(feed.output)
- feed.outputClosed = true
- feed.closeNodeFeeds()
- return
- }
- bucketOK = err == nil
-
- select {
- case <-time.After(retryInterval):
- case <-feed.quit:
- return
- }
- if retryInterval *= 2; retryInterval > maximumRetryInterval {
- retryInterval = maximumRetryInterval
- }
- }
-}
-
-func (feed *UprFeed) connectToNodes() (err error) {
- nodeCount := 0
- for _, serverConn := range feed.bucket.getConnPools(false /* not already locked */) {
-
- // this maybe a reconnection, so check if the connection to the node
- // already exists. Connect only if the node is not found in the list
- // or connected == false
- nodeFeed := feed.nodeFeeds[serverConn.host]
-
- if nodeFeed != nil && nodeFeed.connected == true {
- continue
- }
-
- var singleFeed *memcached.UprFeed
- var name string
- if feed.name == "" {
- name = "DefaultUprClient"
- } else {
- name = feed.name
- }
- singleFeed, err = serverConn.StartUprFeed(name, feed.sequence, feed.dcp_buffer_size, feed.data_chan_size)
- if err != nil {
- logging.Errorf("go-couchbase: Error connecting to upr feed of %s: %v", serverConn.host, err)
- feed.closeNodeFeeds()
- return
- }
- // add the node to the connection map
- feedInfo := &FeedInfo{
- uprFeed: singleFeed,
- connected: true,
- host: serverConn.host,
- quit: make(chan bool),
- }
- feed.nodeFeeds[serverConn.host] = feedInfo
- go feed.forwardUprEvents(feedInfo, feed.killSwitch, serverConn.host)
- feed.wg.Add(1)
- nodeCount++
- }
- if nodeCount == 0 {
- return fmt.Errorf("No connection to bucket")
- }
-
- return nil
-}
-
-// Goroutine that forwards Upr events from a single node's feed to the aggregate feed.
-func (feed *UprFeed) forwardUprEvents(nodeFeed *FeedInfo, killSwitch chan bool, host string) {
- singleFeed := nodeFeed.uprFeed
-
- defer func() {
- feed.wg.Done()
- if r := recover(); r != nil {
- //if feed is not closing, re-throw the panic
- if feed.outputClosed != true && feed.closing != true {
- panic(r)
- } else {
- logging.Errorf("Panic is recovered. Since feed is closed, exit gracefully")
-
- }
- }
- }()
-
- for {
- select {
- case <-nodeFeed.quit:
- nodeFeed.connected = false
- return
-
- case event, ok := <-singleFeed.C:
- if !ok {
- if singleFeed.Error != nil {
- logging.Errorf("go-couchbase: Upr feed from %s failed: %v", host, singleFeed.Error)
- }
- killSwitch <- true
- return
- }
- if feed.outputClosed == true {
- // someone closed the node feed
- logging.Infof("Node need closed, returning from forwardUprEvent")
- return
- }
- feed.output <- event
- if event.Status == gomemcached.NOT_MY_VBUCKET {
- logging.Infof(" Got a not my vbucket error !! ")
- if err := feed.bucket.Refresh(); err != nil {
- logging.Errorf("Unable to refresh bucket %s ", err.Error())
- feed.closeNodeFeeds()
- return
- }
- // this will only connect to nodes that are not connected or changed
- // user will have to reconnect the stream
- if err := feed.connectToNodes(); err != nil {
- logging.Errorf("Unable to connect to nodes %s", err.Error())
- return
- }
-
- }
- }
- }
-}
-
-func (feed *UprFeed) closeNodeFeeds() {
- for _, f := range feed.nodeFeeds {
- logging.Infof(" Sending close to forwardUprEvent ")
- close(f.quit)
- f.uprFeed.Close()
- }
- feed.nodeFeeds = nil
-}
-
-// Close a Upr feed.
-func (feed *UprFeed) Close() error {
- select {
- case <-feed.quit:
- return nil
- default:
- }
-
- feed.closing = true
- feed.closeNodeFeeds()
- close(feed.quit)
-
- feed.wg.Wait()
- if feed.outputClosed == false {
- feed.outputClosed = true
- close(feed.output)
- }
-
- return nil
-}
diff --git a/vendor/github.com/couchbaselabs/go-couchbase/users.go b/vendor/github.com/couchbaselabs/go-couchbase/users.go
deleted file mode 100644
index 47d4861522..0000000000
--- a/vendor/github.com/couchbaselabs/go-couchbase/users.go
+++ /dev/null
@@ -1,119 +0,0 @@
-package couchbase
-
-import (
- "bytes"
- "fmt"
-)
-
-type User struct {
- Name string
- Id string
- Domain string
- Roles []Role
-}
-
-type Role struct {
- Role string
- BucketName string `json:"bucket_name"`
-}
-
-// Sample:
-// {"role":"admin","name":"Admin","desc":"Can manage ALL cluster features including security.","ce":true}
-// {"role":"query_select","bucket_name":"*","name":"Query Select","desc":"Can execute SELECT statement on bucket to retrieve data"}
-type RoleDescription struct {
- Role string
- Name string
- Desc string
- Ce bool
- BucketName string `json:"bucket_name"`
-}
-
-// Return user-role data, as parsed JSON.
-// Sample:
-// [{"id":"ivanivanov","name":"Ivan Ivanov","roles":[{"role":"cluster_admin"},{"bucket_name":"default","role":"bucket_admin"}]},
-// {"id":"petrpetrov","name":"Petr Petrov","roles":[{"role":"replication_admin"}]}]
-func (c *Client) GetUserRoles() ([]interface{}, error) {
- ret := make([]interface{}, 0, 1)
- err := c.parseURLResponse("/settings/rbac/users", &ret)
- if err != nil {
- return nil, err
- }
-
- // Get the configured administrator.
- // Expected result: {"port":8091,"username":"Administrator"}
- adminInfo := make(map[string]interface{}, 2)
- err = c.parseURLResponse("/settings/web", &adminInfo)
- if err != nil {
- return nil, err
- }
-
- // Create a special entry for the configured administrator.
- adminResult := map[string]interface{}{
- "name": adminInfo["username"],
- "id": adminInfo["username"],
- "domain": "ns_server",
- "roles": []interface{}{
- map[string]interface{}{
- "role": "admin",
- },
- },
- }
-
- // Add the configured administrator to the list of results.
- ret = append(ret, adminResult)
-
- return ret, nil
-}
-
-func (c *Client) GetUserInfoAll() ([]User, error) {
- ret := make([]User, 0, 16)
- err := c.parseURLResponse("/settings/rbac/users", &ret)
- if err != nil {
- return nil, err
- }
- return ret, nil
-}
-
-func rolesToParamFormat(roles []Role) string {
- var buffer bytes.Buffer
- for i, role := range roles {
- if i > 0 {
- buffer.WriteString(",")
- }
- buffer.WriteString(role.Role)
- if role.BucketName != "" {
- buffer.WriteString("[")
- buffer.WriteString(role.BucketName)
- buffer.WriteString("]")
- }
- }
- return buffer.String()
-}
-
-func (c *Client) PutUserInfo(u *User) error {
- params := map[string]interface{}{
- "name": u.Name,
- "roles": rolesToParamFormat(u.Roles),
- }
- var target string
- switch u.Domain {
- case "external":
- target = "/settings/rbac/users/" + u.Id
- case "local":
- target = "/settings/rbac/users/local/" + u.Id
- default:
- return fmt.Errorf("Unknown user type: %s", u.Domain)
- }
- var ret string // PUT returns an empty string. We ignore it.
- err := c.parsePutURLResponse(target, params, &ret)
- return err
-}
-
-func (c *Client) GetRolesAll() ([]RoleDescription, error) {
- ret := make([]RoleDescription, 0, 32)
- err := c.parseURLResponse("/settings/rbac/roles", &ret)
- if err != nil {
- return nil, err
- }
- return ret, nil
-}
diff --git a/vendor/github.com/couchbaselabs/go-couchbase/util.go b/vendor/github.com/couchbaselabs/go-couchbase/util.go
deleted file mode 100644
index 4d286a3271..0000000000
--- a/vendor/github.com/couchbaselabs/go-couchbase/util.go
+++ /dev/null
@@ -1,49 +0,0 @@
-package couchbase
-
-import (
- "fmt"
- "net/url"
- "strings"
-)
-
-// CleanupHost returns the hostname with the given suffix removed.
-func CleanupHost(h, commonSuffix string) string {
- if strings.HasSuffix(h, commonSuffix) {
- return h[:len(h)-len(commonSuffix)]
- }
- return h
-}
-
-// FindCommonSuffix returns the longest common suffix from the given
-// strings.
-func FindCommonSuffix(input []string) string {
- rv := ""
- if len(input) < 2 {
- return ""
- }
- from := input
- for i := len(input[0]); i > 0; i-- {
- common := true
- suffix := input[0][i:]
- for _, s := range from {
- if !strings.HasSuffix(s, suffix) {
- common = false
- break
- }
- }
- if common {
- rv = suffix
- }
- }
- return rv
-}
-
-// ParseURL is a wrapper around url.Parse with some sanity-checking
-func ParseURL(urlStr string) (result *url.URL, err error) {
- result, err = url.Parse(urlStr)
- if result != nil && result.Scheme == "" {
- result = nil
- err = fmt.Errorf("invalid URL <%s>", urlStr)
- }
- return
-}
diff --git a/vendor/github.com/couchbaselabs/go-couchbase/vbmap.go b/vendor/github.com/couchbaselabs/go-couchbase/vbmap.go
deleted file mode 100644
index b96a18ed57..0000000000
--- a/vendor/github.com/couchbaselabs/go-couchbase/vbmap.go
+++ /dev/null
@@ -1,77 +0,0 @@
-package couchbase
-
-var crc32tab = []uint32{
- 0x00000000, 0x77073096, 0xee0e612c, 0x990951ba,
- 0x076dc419, 0x706af48f, 0xe963a535, 0x9e6495a3,
- 0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988,
- 0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91,
- 0x1db71064, 0x6ab020f2, 0xf3b97148, 0x84be41de,
- 0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7,
- 0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec,
- 0x14015c4f, 0x63066cd9, 0xfa0f3d63, 0x8d080df5,
- 0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172,
- 0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b,
- 0x35b5a8fa, 0x42b2986c, 0xdbbbc9d6, 0xacbcf940,
- 0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59,
- 0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116,
- 0x21b4f4b5, 0x56b3c423, 0xcfba9599, 0xb8bda50f,
- 0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924,
- 0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d,
- 0x76dc4190, 0x01db7106, 0x98d220bc, 0xefd5102a,
- 0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433,
- 0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818,
- 0x7f6a0dbb, 0x086d3d2d, 0x91646c97, 0xe6635c01,
- 0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e,
- 0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457,
- 0x65b0d9c6, 0x12b7e950, 0x8bbeb8ea, 0xfcb9887c,
- 0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65,
- 0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2,
- 0x4adfa541, 0x3dd895d7, 0xa4d1c46d, 0xd3d6f4fb,
- 0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0,
- 0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9,
- 0x5005713c, 0x270241aa, 0xbe0b1010, 0xc90c2086,
- 0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f,
- 0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4,
- 0x59b33d17, 0x2eb40d81, 0xb7bd5c3b, 0xc0ba6cad,
- 0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a,
- 0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683,
- 0xe3630b12, 0x94643b84, 0x0d6d6a3e, 0x7a6a5aa8,
- 0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1,
- 0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe,
- 0xf762575d, 0x806567cb, 0x196c3671, 0x6e6b06e7,
- 0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc,
- 0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5,
- 0xd6d6a3e8, 0xa1d1937e, 0x38d8c2c4, 0x4fdff252,
- 0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b,
- 0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60,
- 0xdf60efc3, 0xa867df55, 0x316e8eef, 0x4669be79,
- 0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236,
- 0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f,
- 0xc5ba3bbe, 0xb2bd0b28, 0x2bb45a92, 0x5cb36a04,
- 0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d,
- 0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a,
- 0x9c0906a9, 0xeb0e363f, 0x72076785, 0x05005713,
- 0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38,
- 0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21,
- 0x86d3d2d4, 0xf1d4e242, 0x68ddb3f8, 0x1fda836e,
- 0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777,
- 0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c,
- 0x8f659eff, 0xf862ae69, 0x616bffd3, 0x166ccf45,
- 0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2,
- 0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db,
- 0xaed16a4a, 0xd9d65adc, 0x40df0b66, 0x37d83bf0,
- 0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9,
- 0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6,
- 0xbad03605, 0xcdd70693, 0x54de5729, 0x23d967bf,
- 0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94,
- 0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d}
-
-// VBHash finds the vbucket for the given key.
-func (b *Bucket) VBHash(key string) uint32 {
- crc := uint32(0xffffffff)
- for x := 0; x < len(key); x++ {
- crc = (crc >> 8) ^ crc32tab[(uint64(crc)^uint64(key[x]))&0xff]
- }
- vbm := b.VBServerMap()
- return ((^crc) >> 16) & 0x7fff & (uint32(len(vbm.VBucketMap)) - 1)
-}
diff --git a/vendor/github.com/couchbaselabs/go-couchbase/views.go b/vendor/github.com/couchbaselabs/go-couchbase/views.go
deleted file mode 100644
index 2f68642f5a..0000000000
--- a/vendor/github.com/couchbaselabs/go-couchbase/views.go
+++ /dev/null
@@ -1,231 +0,0 @@
-package couchbase
-
-import (
- "encoding/json"
- "errors"
- "fmt"
- "io/ioutil"
- "math/rand"
- "net/http"
- "net/url"
- "time"
-)
-
-// ViewRow represents a single result from a view.
-//
-// Doc is present only if include_docs was set on the request.
-type ViewRow struct {
- ID string
- Key interface{}
- Value interface{}
- Doc *interface{}
-}
-
-// A ViewError is a node-specific error indicating a partial failure
-// within a view result.
-type ViewError struct {
- From string
- Reason string
-}
-
-func (ve ViewError) Error() string {
- return "Node: " + ve.From + ", reason: " + ve.Reason
-}
-
-// ViewResult holds the entire result set from a view request,
-// including the rows and the errors.
-type ViewResult struct {
- TotalRows int `json:"total_rows"`
- Rows []ViewRow
- Errors []ViewError
-}
-
-func (b *Bucket) randomBaseURL() (*url.URL, error) {
- nodes := b.HealthyNodes()
- if len(nodes) == 0 {
- return nil, errors.New("no available couch rest URLs")
- }
- nodeNo := rand.Intn(len(nodes))
- node := nodes[nodeNo]
-
- b.RLock()
- name := b.Name
- pool := b.pool
- b.RUnlock()
-
- u, err := ParseURL(node.CouchAPIBase)
- if err != nil {
- return nil, fmt.Errorf("config error: Bucket %q node #%d CouchAPIBase=%q: %v",
- name, nodeNo, node.CouchAPIBase, err)
- } else if pool != nil {
- u.User = pool.client.BaseURL.User
- }
- return u, err
-}
-
-const START_NODE_ID = -1
-
-func (b *Bucket) randomNextURL(lastNode int) (*url.URL, int, error) {
- nodes := b.HealthyNodes()
- if len(nodes) == 0 {
- return nil, -1, errors.New("no available couch rest URLs")
- }
-
- var nodeNo int
- if lastNode == START_NODE_ID || lastNode >= len(nodes) {
- // randomly select a node if the value of lastNode is invalid
- nodeNo = rand.Intn(len(nodes))
- } else {
- // wrap around the node list
- nodeNo = (lastNode + 1) % len(nodes)
- }
-
- b.RLock()
- name := b.Name
- pool := b.pool
- b.RUnlock()
-
- node := nodes[nodeNo]
- u, err := ParseURL(node.CouchAPIBase)
- if err != nil {
- return nil, -1, fmt.Errorf("config error: Bucket %q node #%d CouchAPIBase=%q: %v",
- name, nodeNo, node.CouchAPIBase, err)
- } else if pool != nil {
- u.User = pool.client.BaseURL.User
- }
- return u, nodeNo, err
-}
-
-// DocID is the document ID type for the startkey_docid parameter in
-// views.
-type DocID string
-
-func qParam(k, v string) string {
- format := `"%s"`
- switch k {
- case "startkey_docid", "endkey_docid", "stale":
- format = "%s"
- }
- return fmt.Sprintf(format, v)
-}
-
-// ViewURL constructs a URL for a view with the given ddoc, view name,
-// and parameters.
-func (b *Bucket) ViewURL(ddoc, name string,
- params map[string]interface{}) (string, error) {
- u, err := b.randomBaseURL()
- if err != nil {
- return "", err
- }
-
- values := url.Values{}
- for k, v := range params {
- switch t := v.(type) {
- case DocID:
- values[k] = []string{string(t)}
- case string:
- values[k] = []string{qParam(k, t)}
- case int:
- values[k] = []string{fmt.Sprintf(`%d`, t)}
- case bool:
- values[k] = []string{fmt.Sprintf(`%v`, t)}
- default:
- b, err := json.Marshal(v)
- if err != nil {
- return "", fmt.Errorf("unsupported value-type %T in Query, "+
- "json encoder said %v", t, err)
- }
- values[k] = []string{fmt.Sprintf(`%v`, string(b))}
- }
- }
-
- if ddoc == "" && name == "_all_docs" {
- u.Path = fmt.Sprintf("/%s/_all_docs", b.GetName())
- } else {
- u.Path = fmt.Sprintf("/%s/_design/%s/_view/%s", b.GetName(), ddoc, name)
- }
- u.RawQuery = values.Encode()
-
- return u.String(), nil
-}
-
-// ViewCallback is called for each view invocation.
-var ViewCallback func(ddoc, name string, start time.Time, err error)
-
-// ViewCustom performs a view request that can map row values to a
-// custom type.
-//
-// See the source to View for an example usage.
-func (b *Bucket) ViewCustom(ddoc, name string, params map[string]interface{},
- vres interface{}) (err error) {
- if SlowServerCallWarningThreshold > 0 {
- defer slowLog(time.Now(), "call to ViewCustom(%q, %q)", ddoc, name)
- }
-
- if ViewCallback != nil {
- defer func(t time.Time) { ViewCallback(ddoc, name, t, err) }(time.Now())
- }
-
- u, err := b.ViewURL(ddoc, name, params)
- if err != nil {
- return err
- }
-
- req, err := http.NewRequest("GET", u, nil)
- if err != nil {
- return err
- }
-
- ah := b.authHandler(false /* bucket not yet locked */)
- maybeAddAuth(req, ah)
-
- res, err := doHTTPRequest(req)
- if err != nil {
- return fmt.Errorf("error starting view req at %v: %v", u, err)
- }
- defer res.Body.Close()
-
- if res.StatusCode != 200 {
- bod := make([]byte, 512)
- l, _ := res.Body.Read(bod)
- return fmt.Errorf("error executing view req at %v: %v - %s",
- u, res.Status, bod[:l])
- }
-
- body, err := ioutil.ReadAll(res.Body)
- if err := json.Unmarshal(body, vres); err != nil {
- return nil
- }
-
- return nil
-}
-
-// View executes a view.
-//
-// The ddoc parameter is just the bare name of your design doc without
-// the "_design/" prefix.
-//
-// Parameters are string keys with values that correspond to couchbase
-// view parameters. Primitive should work fairly naturally (booleans,
-// ints, strings, etc...) and other values will attempt to be JSON
-// marshaled (useful for array indexing on on view keys, for example).
-//
-// Example:
-//
-// res, err := couchbase.View("myddoc", "myview", map[string]interface{}{
-// "group_level": 2,
-// "startkey_docid": []interface{}{"thing"},
-// "endkey_docid": []interface{}{"thing", map[string]string{}},
-// "stale": false,
-// })
-func (b *Bucket) View(ddoc, name string, params map[string]interface{}) (ViewResult, error) {
- vres := ViewResult{}
-
- if err := b.ViewCustom(ddoc, name, params, &vres); err != nil {
- //error in accessing views. Retry once after a bucket refresh
- b.Refresh()
- return vres, b.ViewCustom(ddoc, name, params, &vres)
- } else {
- return vres, nil
- }
-}