diff options
author | 6543 <6543@obermui.de> | 2020-11-03 07:04:09 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-03 08:04:09 +0200 |
commit | 70ea2300ca24311f033d85b41e938e86b1d50acd (patch) | |
tree | eb25c089d5def4df2c036ab2820df7c895798572 /vendor/github.com/couchbaselabs | |
parent | b687707014b31d0f388d1dfb60c09b5dcd48fc4c (diff) | |
download | gitea-70ea2300ca24311f033d85b41e938e86b1d50acd.tar.gz gitea-70ea2300ca24311f033d85b41e938e86b1d50acd.zip |
[Vendor] update macaron related (#13409)
* Vendor: update gitea.com/macaron/session to a177a270
* make vendor
* Vendor: update gitea.com/macaron/macaron to 0db5d458
* make vendor
* Vendor: update gitea.com/macaron/cache to 905232fb
* make vendor
* Vendor: update gitea.com/macaron/i18n to 4ca3dd0c
* make vendor
* Vendor: update gitea.com/macaron/gzip to efa5e847
* make vendor
* Vendor: update gitea.com/macaron/captcha to e8597820
* make vendor
Diffstat (limited to 'vendor/github.com/couchbaselabs')
18 files changed, 0 insertions, 5372 deletions
diff --git a/vendor/github.com/couchbaselabs/go-couchbase/.gitignore b/vendor/github.com/couchbaselabs/go-couchbase/.gitignore deleted file mode 100644 index eda885ce8d..0000000000 --- a/vendor/github.com/couchbaselabs/go-couchbase/.gitignore +++ /dev/null @@ -1,14 +0,0 @@ -#* -*.6 -*.a -*~ -*.swp -/examples/basic/basic -/hello/hello -/populate/populate -/tools/view2go/view2go -/tools/loadfile/loadfile -gotags.files -TAGS -6.out -_* diff --git a/vendor/github.com/couchbaselabs/go-couchbase/.travis.yml b/vendor/github.com/couchbaselabs/go-couchbase/.travis.yml deleted file mode 100644 index 4ecafb1894..0000000000 --- a/vendor/github.com/couchbaselabs/go-couchbase/.travis.yml +++ /dev/null @@ -1,5 +0,0 @@ -language: go -install: go get -v -d ./... && go build -v ./... -script: go test -v ./... - -go: 1.1.1 diff --git a/vendor/github.com/couchbaselabs/go-couchbase/LICENSE b/vendor/github.com/couchbaselabs/go-couchbase/LICENSE deleted file mode 100644 index 0b23ef358e..0000000000 --- a/vendor/github.com/couchbaselabs/go-couchbase/LICENSE +++ /dev/null @@ -1,19 +0,0 @@ -Copyright (c) 2013 Couchbase, Inc. - -Permission is hereby granted, free of charge, to any person obtaining a copy of -this software and associated documentation files (the "Software"), to deal in -the Software without restriction, including without limitation the rights to -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies -of the Software, and to permit persons to whom the Software is furnished to do -so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/vendor/github.com/couchbaselabs/go-couchbase/README.markdown b/vendor/github.com/couchbaselabs/go-couchbase/README.markdown deleted file mode 100644 index bf5fe49421..0000000000 --- a/vendor/github.com/couchbaselabs/go-couchbase/README.markdown +++ /dev/null @@ -1,37 +0,0 @@ -# A smart client for couchbase in go - -This is a *unoffical* version of a Couchbase Golang client. If you are -looking for the *Offical* Couchbase Golang client please see - [CB-go])[https://github.com/couchbaselabs/gocb]. - -This is an evolving package, but does provide a useful interface to a -[couchbase](http://www.couchbase.com/) server including all of the -pool/bucket discovery features, compatible key distribution with other -clients, and vbucket motion awareness so application can continue to -operate during rebalances. - -It also supports view querying with source node randomization so you -don't bang on all one node to do all the work. - -## Install - - go get github.com/couchbase/go-couchbase - -## Example - - c, err := couchbase.Connect("http://dev-couchbase.example.com:8091/") - if err != nil { - log.Fatalf("Error connecting: %v", err) - } - - pool, err := c.GetPool("default") - if err != nil { - log.Fatalf("Error getting pool: %v", err) - } - - bucket, err := pool.GetBucket("default") - if err != nil { - log.Fatalf("Error getting bucket: %v", err) - } - - bucket.Set("someKey", 0, []string{"an", "example", "list"}) diff --git a/vendor/github.com/couchbaselabs/go-couchbase/audit.go b/vendor/github.com/couchbaselabs/go-couchbase/audit.go deleted file mode 100644 index 3db7d9f9ff..0000000000 --- a/vendor/github.com/couchbaselabs/go-couchbase/audit.go +++ /dev/null @@ -1,32 +0,0 @@ -package couchbase - -import () - -// Sample data: -// {"disabled":["12333", "22244"],"uid":"132492431","auditdEnabled":true, -// "disabledUsers":[{"name":"bill","domain":"local"},{"name":"bob","domain":"local"}], -// "logPath":"/Users/johanlarson/Library/Application Support/Couchbase/var/lib/couchbase/logs", -// "rotateInterval":86400,"rotateSize":20971520} -type AuditSpec struct { - Disabled []uint32 `json:"disabled"` - Uid string `json:"uid"` - AuditdEnabled bool `json:"auditdEnabled` - DisabledUsers []AuditUser `json:"disabledUsers"` - LogPath string `json:"logPath"` - RotateInterval int64 `json:"rotateInterval"` - RotateSize int64 `json:"rotateSize"` -} - -type AuditUser struct { - Name string `json:"name"` - Domain string `json:"domain"` -} - -func (c *Client) GetAuditSpec() (*AuditSpec, error) { - ret := &AuditSpec{} - err := c.parseURLResponse("/settings/audit", ret) - if err != nil { - return nil, err - } - return ret, nil -} diff --git a/vendor/github.com/couchbaselabs/go-couchbase/client.go b/vendor/github.com/couchbaselabs/go-couchbase/client.go deleted file mode 100644 index 433b08ff02..0000000000 --- a/vendor/github.com/couchbaselabs/go-couchbase/client.go +++ /dev/null @@ -1,1477 +0,0 @@ -/* -Package couchbase provides a smart client for go. - -Usage: - - client, err := couchbase.Connect("http://myserver:8091/") - handleError(err) - pool, err := client.GetPool("default") - handleError(err) - bucket, err := pool.GetBucket("MyAwesomeBucket") - handleError(err) - ... - -or a shortcut for the bucket directly - - bucket, err := couchbase.GetBucket("http://myserver:8091/", "default", "default") - -in any case, you can specify authentication credentials using -standard URL userinfo syntax: - - b, err := couchbase.GetBucket("http://bucketname:bucketpass@myserver:8091/", - "default", "bucket") -*/ -package couchbase - -import ( - "encoding/binary" - "encoding/json" - "errors" - "fmt" - "io" - "runtime" - "strconv" - "strings" - "sync" - "time" - "unsafe" - - "github.com/couchbase/gomemcached" - "github.com/couchbase/gomemcached/client" // package name is 'memcached' - "github.com/couchbase/goutils/logging" -) - -// Mutation Token -type MutationToken struct { - VBid uint16 // vbucket id - Guard uint64 // vbuuid - Value uint64 // sequence number -} - -// Maximum number of times to retry a chunk of a bulk get on error. -var MaxBulkRetries = 5000 -var backOffDuration time.Duration = 100 * time.Millisecond -var MaxBackOffRetries = 25 // exponentail backOff result in over 30sec (25*13*0.1s) - -// If this is set to a nonzero duration, Do() and ViewCustom() will log a warning if the call -// takes longer than that. -var SlowServerCallWarningThreshold time.Duration - -func slowLog(startTime time.Time, format string, args ...interface{}) { - if elapsed := time.Now().Sub(startTime); elapsed > SlowServerCallWarningThreshold { - pc, _, _, _ := runtime.Caller(2) - caller := runtime.FuncForPC(pc).Name() - logging.Infof("go-couchbase: "+format+" in "+caller+" took "+elapsed.String(), args...) - } -} - -// Return true if error is KEY_ENOENT. Required by cbq-engine -func IsKeyEExistsError(err error) bool { - - res, ok := err.(*gomemcached.MCResponse) - if ok && res.Status == gomemcached.KEY_EEXISTS { - return true - } - - return false -} - -// Return true if error is KEY_ENOENT. Required by cbq-engine -func IsKeyNoEntError(err error) bool { - - res, ok := err.(*gomemcached.MCResponse) - if ok && res.Status == gomemcached.KEY_ENOENT { - return true - } - - return false -} - -// Return true if error suggests a bucket refresh is required. Required by cbq-engine -func IsRefreshRequired(err error) bool { - - res, ok := err.(*gomemcached.MCResponse) - if ok && (res.Status == gomemcached.NO_BUCKET || res.Status == gomemcached.NOT_MY_VBUCKET) { - return true - } - - return false -} - -// ClientOpCallback is called for each invocation of Do. -var ClientOpCallback func(opname, k string, start time.Time, err error) - -// Do executes a function on a memcached connection to the node owning key "k" -// -// Note that this automatically handles transient errors by replaying -// your function on a "not-my-vbucket" error, so don't assume -// your command will only be executed only once. -func (b *Bucket) Do(k string, f func(mc *memcached.Client, vb uint16) error) (err error) { - return b.Do2(k, f, true) -} - -func (b *Bucket) Do2(k string, f func(mc *memcached.Client, vb uint16) error, deadline bool) (err error) { - if SlowServerCallWarningThreshold > 0 { - defer slowLog(time.Now(), "call to Do(%q)", k) - } - - vb := b.VBHash(k) - maxTries := len(b.Nodes()) * 2 - for i := 0; i < maxTries; i++ { - conn, pool, err := b.getConnectionToVBucket(vb) - if err != nil { - if isConnError(err) && backOff(i, maxTries, backOffDuration, true) { - b.Refresh() - continue - } - return err - } - - if deadline && DefaultTimeout > 0 { - conn.SetDeadline(getDeadline(noDeadline, DefaultTimeout)) - err = f(conn, uint16(vb)) - conn.SetDeadline(noDeadline) - } else { - err = f(conn, uint16(vb)) - } - - var retry bool - discard := isOutOfBoundsError(err) - - // MB-30967 / MB-31001 implement back off for transient errors - if resp, ok := err.(*gomemcached.MCResponse); ok { - switch resp.Status { - case gomemcached.NOT_MY_VBUCKET: - b.Refresh() - // MB-28842: in case of NMVB, check if the node is still part of the map - // and ditch the connection if it isn't. - discard = b.checkVBmap(pool.Node()) - retry = true - case gomemcached.NOT_SUPPORTED: - discard = true - retry = true - case gomemcached.ENOMEM: - fallthrough - case gomemcached.TMPFAIL: - retry = backOff(i, maxTries, backOffDuration, true) - default: - retry = false - } - } else if err != nil && isConnError(err) && backOff(i, maxTries, backOffDuration, true) { - retry = true - } - - if discard { - pool.Discard(conn) - } else { - pool.Return(conn) - } - - if !retry { - return err - } - } - - return fmt.Errorf("unable to complete action after %v attemps", maxTries) -} - -type GatheredStats struct { - Server string - Stats map[string]string - Err error -} - -func getStatsParallel(sn string, b *Bucket, offset int, which string, - ch chan<- GatheredStats) { - pool := b.getConnPool(offset) - var gatheredStats GatheredStats - - conn, err := pool.Get() - defer func() { - pool.Return(conn) - ch <- gatheredStats - }() - - if err != nil { - gatheredStats = GatheredStats{Server: sn, Err: err} - } else { - sm, err := conn.StatsMap(which) - gatheredStats = GatheredStats{Server: sn, Stats: sm, Err: err} - } -} - -// GetStats gets a set of stats from all servers. -// -// Returns a map of server ID -> map of stat key to map value. -func (b *Bucket) GetStats(which string) map[string]map[string]string { - rv := map[string]map[string]string{} - for server, gs := range b.GatherStats(which) { - if len(gs.Stats) > 0 { - rv[server] = gs.Stats - } - } - return rv -} - -// GatherStats returns a map of server ID -> GatheredStats from all servers. -func (b *Bucket) GatherStats(which string) map[string]GatheredStats { - vsm := b.VBServerMap() - if vsm.ServerList == nil { - return nil - } - - // Go grab all the things at once. - ch := make(chan GatheredStats, len(vsm.ServerList)) - for i, sn := range vsm.ServerList { - go getStatsParallel(sn, b, i, which, ch) - } - - // Gather the results - rv := map[string]GatheredStats{} - for range vsm.ServerList { - gs := <-ch - rv[gs.Server] = gs - } - return rv -} - -// Get bucket count through the bucket stats -func (b *Bucket) GetCount(refresh bool) (count int64, err error) { - if refresh { - b.Refresh() - } - - var cnt int64 - for _, gs := range b.GatherStats("") { - if len(gs.Stats) > 0 { - cnt, err = strconv.ParseInt(gs.Stats["curr_items"], 10, 64) - if err != nil { - return 0, err - } - count += cnt - } - } - - return count, nil -} - -// Get bucket document size through the bucket stats -func (b *Bucket) GetSize(refresh bool) (size int64, err error) { - if refresh { - b.Refresh() - } - - var sz int64 - for _, gs := range b.GatherStats("") { - if len(gs.Stats) > 0 { - sz, err = strconv.ParseInt(gs.Stats["ep_value_size"], 10, 64) - if err != nil { - return 0, err - } - size += sz - } - } - - return size, nil -} - -func isAuthError(err error) bool { - estr := err.Error() - return strings.Contains(estr, "Auth failure") -} - -func IsReadTimeOutError(err error) bool { - estr := err.Error() - return strings.Contains(estr, "read tcp") || - strings.Contains(estr, "i/o timeout") -} - -func isTimeoutError(err error) bool { - estr := err.Error() - return strings.Contains(estr, "i/o timeout") || - strings.Contains(estr, "connection timed out") || - strings.Contains(estr, "no route to host") -} - -// Errors that are not considered fatal for our fetch loop -func isConnError(err error) bool { - if err == io.EOF { - return true - } - estr := err.Error() - return strings.Contains(estr, "broken pipe") || - strings.Contains(estr, "connection reset") || - strings.Contains(estr, "connection refused") || - strings.Contains(estr, "connection pool is closed") -} - -func isOutOfBoundsError(err error) bool { - return err != nil && strings.Contains(err.Error(), "Out of Bounds error") - -} - -func getDeadline(reqDeadline time.Time, duration time.Duration) time.Time { - if reqDeadline.IsZero() && duration > 0 { - return time.Now().Add(duration) - } - return reqDeadline -} - -func backOff(attempt, maxAttempts int, duration time.Duration, exponential bool) bool { - if attempt < maxAttempts { - // 0th attempt return immediately - if attempt > 0 { - if exponential { - duration = time.Duration(attempt) * duration - } - time.Sleep(duration) - } - return true - } - - return false -} - -func (b *Bucket) doBulkGet(vb uint16, keys []string, reqDeadline time.Time, - ch chan<- map[string]*gomemcached.MCResponse, ech chan<- error, subPaths []string, - eStatus *errorStatus) { - if SlowServerCallWarningThreshold > 0 { - defer slowLog(time.Now(), "call to doBulkGet(%d, %d keys)", vb, len(keys)) - } - - rv := _STRING_MCRESPONSE_POOL.Get() - attempts := 0 - backOffAttempts := 0 - done := false - bname := b.Name - for ; attempts < MaxBulkRetries && !done && !eStatus.errStatus; attempts++ { - - if len(b.VBServerMap().VBucketMap) < int(vb) { - //fatal - err := fmt.Errorf("vbmap smaller than requested for %v", bname) - logging.Errorf("go-couchbase: %v vb %d vbmap len %d", err.Error(), vb, len(b.VBServerMap().VBucketMap)) - ech <- err - return - } - - masterID := b.VBServerMap().VBucketMap[vb][0] - - if masterID < 0 { - // fatal - err := fmt.Errorf("No master node available for %v vb %d", bname, vb) - logging.Errorf("%v", err.Error()) - ech <- err - return - } - - // This stack frame exists to ensure we can clean up - // connection at a reasonable time. - err := func() error { - pool := b.getConnPool(masterID) - conn, err := pool.Get() - if err != nil { - if isAuthError(err) || isTimeoutError(err) { - logging.Errorf("Fatal Error %v : %v", bname, err) - ech <- err - return err - } else if isConnError(err) { - if !backOff(backOffAttempts, MaxBackOffRetries, backOffDuration, true) { - logging.Errorf("Connection Error %v : %v", bname, err) - ech <- err - return err - } - b.Refresh() - backOffAttempts++ - } - logging.Infof("Pool Get returned %v: %v", bname, err) - // retry - return nil - } - - conn.SetDeadline(getDeadline(reqDeadline, DefaultTimeout)) - err = conn.GetBulk(vb, keys, rv, subPaths) - conn.SetDeadline(noDeadline) - - discard := false - defer func() { - if discard { - pool.Discard(conn) - } else { - pool.Return(conn) - } - }() - - switch err.(type) { - case *gomemcached.MCResponse: - notSMaxTries := len(b.Nodes()) * 2 - st := err.(*gomemcached.MCResponse).Status - if st == gomemcached.NOT_MY_VBUCKET || (st == gomemcached.NOT_SUPPORTED && attempts < notSMaxTries) { - b.Refresh() - discard = b.checkVBmap(pool.Node()) - return nil // retry - } else if st == gomemcached.EBUSY || st == gomemcached.LOCKED { - if (attempts % (MaxBulkRetries / 100)) == 0 { - logging.Infof("Retrying Memcached error (%v) FOR %v(vbid:%d, keys:<ud>%v</ud>)", - err.Error(), bname, vb, keys) - } - return nil // retry - } else if (st == gomemcached.ENOMEM || st == gomemcached.TMPFAIL) && backOff(backOffAttempts, MaxBackOffRetries, backOffDuration, true) { - // MB-30967 / MB-31001 use backoff for TMPFAIL too - backOffAttempts++ - logging.Infof("Retrying Memcached error (%v) FOR %v(vbid:%d, keys:<ud>%v</ud>)", - err.Error(), bname, vb, keys) - return nil // retry - } - ech <- err - return err - case error: - if isOutOfBoundsError(err) { - // We got an out of bound error, retry the operation - discard = true - return nil - } else if isConnError(err) && backOff(backOffAttempts, MaxBackOffRetries, backOffDuration, true) { - backOffAttempts++ - logging.Errorf("Connection Error: %s. Refreshing bucket %v (vbid:%v,keys:<ud>%v</ud>)", - err.Error(), bname, vb, keys) - discard = true - b.Refresh() - return nil // retry - } - ech <- err - ch <- rv - return err - } - - done = true - return nil - }() - - if err != nil { - return - } - } - - if attempts >= MaxBulkRetries { - err := fmt.Errorf("bulkget exceeded MaxBulkRetries for %v(vbid:%d,keys:<ud>%v</ud>)", bname, vb, keys) - logging.Errorf("%v", err.Error()) - ech <- err - } - - ch <- rv -} - -type errorStatus struct { - errStatus bool -} - -type vbBulkGet struct { - b *Bucket - ch chan<- map[string]*gomemcached.MCResponse - ech chan<- error - k uint16 - keys []string - reqDeadline time.Time - wg *sync.WaitGroup - subPaths []string - groupError *errorStatus -} - -const _NUM_CHANNELS = 5 - -var _NUM_CHANNEL_WORKERS = (runtime.NumCPU() + 1) / 2 -var DefaultDialTimeout = time.Duration(0) -var DefaultTimeout = time.Duration(0) -var noDeadline = time.Time{} - -// Buffer 4k requests per worker -var _VB_BULK_GET_CHANNELS []chan *vbBulkGet - -func InitBulkGet() { - - DefaultDialTimeout = 20 * time.Second - DefaultTimeout = 120 * time.Second - - memcached.SetDefaultDialTimeout(DefaultDialTimeout) - - _VB_BULK_GET_CHANNELS = make([]chan *vbBulkGet, _NUM_CHANNELS) - - for i := 0; i < _NUM_CHANNELS; i++ { - channel := make(chan *vbBulkGet, 16*1024*_NUM_CHANNEL_WORKERS) - _VB_BULK_GET_CHANNELS[i] = channel - - for j := 0; j < _NUM_CHANNEL_WORKERS; j++ { - go vbBulkGetWorker(channel) - } - } -} - -func vbBulkGetWorker(ch chan *vbBulkGet) { - defer func() { - // Workers cannot panic and die - recover() - go vbBulkGetWorker(ch) - }() - - for vbg := range ch { - vbDoBulkGet(vbg) - } -} - -func vbDoBulkGet(vbg *vbBulkGet) { - defer vbg.wg.Done() - defer func() { - // Workers cannot panic and die - recover() - }() - vbg.b.doBulkGet(vbg.k, vbg.keys, vbg.reqDeadline, vbg.ch, vbg.ech, vbg.subPaths, vbg.groupError) -} - -var _ERR_CHAN_FULL = fmt.Errorf("Data request queue full, aborting query.") - -func (b *Bucket) processBulkGet(kdm map[uint16][]string, reqDeadline time.Time, - ch chan<- map[string]*gomemcached.MCResponse, ech chan<- error, subPaths []string, - eStatus *errorStatus) { - - defer close(ch) - defer close(ech) - - wg := &sync.WaitGroup{} - - for k, keys := range kdm { - - // GetBulk() group has error donot Queue items for this group - if eStatus.errStatus { - break - } - - vbg := &vbBulkGet{ - b: b, - ch: ch, - ech: ech, - k: k, - keys: keys, - reqDeadline: reqDeadline, - wg: wg, - subPaths: subPaths, - groupError: eStatus, - } - - wg.Add(1) - - // Random int - // Right shift to avoid 8-byte alignment, and take low bits - c := (uintptr(unsafe.Pointer(vbg)) >> 4) % _NUM_CHANNELS - - select { - case _VB_BULK_GET_CHANNELS[c] <- vbg: - // No-op - default: - // Buffer full, abandon the bulk get - ech <- _ERR_CHAN_FULL - wg.Add(-1) - } - } - - // Wait for my vb bulk gets - wg.Wait() -} - -type multiError []error - -func (m multiError) Error() string { - if len(m) == 0 { - panic("Error of none") - } - - return fmt.Sprintf("{%v errors, starting with %v}", len(m), m[0].Error()) -} - -// Convert a stream of errors from ech into a multiError (or nil) and -// send down eout. -// -// At least one send is guaranteed on eout, but two is possible, so -// buffer the out channel appropriately. -func errorCollector(ech <-chan error, eout chan<- error, eStatus *errorStatus) { - defer func() { eout <- nil }() - var errs multiError - for e := range ech { - if !eStatus.errStatus && !IsKeyNoEntError(e) { - eStatus.errStatus = true - } - - errs = append(errs, e) - } - - if len(errs) > 0 { - eout <- errs - } -} - -// Fetches multiple keys concurrently, with []byte values -// -// This is a wrapper around GetBulk which converts all values returned -// by GetBulk from raw memcached responses into []byte slices. -// Returns one document for duplicate keys -func (b *Bucket) GetBulkRaw(keys []string) (map[string][]byte, error) { - - resp, eout := b.getBulk(keys, noDeadline, nil) - - rv := make(map[string][]byte, len(keys)) - for k, av := range resp { - rv[k] = av.Body - } - - b.ReleaseGetBulkPools(resp) - return rv, eout - -} - -// GetBulk fetches multiple keys concurrently. -// -// Unlike more convenient GETs, the entire response is returned in the -// map array for each key. Keys that were not found will not be included in -// the map. - -func (b *Bucket) GetBulk(keys []string, reqDeadline time.Time, subPaths []string) (map[string]*gomemcached.MCResponse, error) { - return b.getBulk(keys, reqDeadline, subPaths) -} - -func (b *Bucket) ReleaseGetBulkPools(rv map[string]*gomemcached.MCResponse) { - _STRING_MCRESPONSE_POOL.Put(rv) -} - -func (b *Bucket) getBulk(keys []string, reqDeadline time.Time, subPaths []string) (map[string]*gomemcached.MCResponse, error) { - kdm := _VB_STRING_POOL.Get() - defer _VB_STRING_POOL.Put(kdm) - for _, k := range keys { - if k != "" { - vb := uint16(b.VBHash(k)) - a, ok1 := kdm[vb] - if !ok1 { - a = _STRING_POOL.Get() - } - kdm[vb] = append(a, k) - } - } - - eout := make(chan error, 2) - groupErrorStatus := &errorStatus{} - - // processBulkGet will own both of these channels and - // guarantee they're closed before it returns. - ch := make(chan map[string]*gomemcached.MCResponse) - ech := make(chan error) - - go errorCollector(ech, eout, groupErrorStatus) - go b.processBulkGet(kdm, reqDeadline, ch, ech, subPaths, groupErrorStatus) - - var rv map[string]*gomemcached.MCResponse - - for m := range ch { - if rv == nil { - rv = m - continue - } - - for k, v := range m { - rv[k] = v - } - _STRING_MCRESPONSE_POOL.Put(m) - } - - return rv, <-eout -} - -// WriteOptions is the set of option flags availble for the Write -// method. They are ORed together to specify the desired request. -type WriteOptions int - -const ( - // Raw specifies that the value is raw []byte or nil; don't - // JSON-encode it. - Raw = WriteOptions(1 << iota) - // AddOnly indicates an item should only be written if it - // doesn't exist, otherwise ErrKeyExists is returned. - AddOnly - // Persist causes the operation to block until the server - // confirms the item is persisted. - Persist - // Indexable causes the operation to block until it's availble via the index. - Indexable - // Append indicates the given value should be appended to the - // existing value for the given key. - Append -) - -var optNames = []struct { - opt WriteOptions - name string -}{ - {Raw, "raw"}, - {AddOnly, "addonly"}, {Persist, "persist"}, - {Indexable, "indexable"}, {Append, "append"}, -} - -// String representation of WriteOptions -func (w WriteOptions) String() string { - f := []string{} - for _, on := range optNames { - if w&on.opt != 0 { - f = append(f, on.name) - w &= ^on.opt - } - } - if len(f) == 0 || w != 0 { - f = append(f, fmt.Sprintf("0x%x", int(w))) - } - return strings.Join(f, "|") -} - -// Error returned from Write with AddOnly flag, when key already exists in the bucket. -var ErrKeyExists = errors.New("key exists") - -// General-purpose value setter. -// -// The Set, Add and Delete methods are just wrappers around this. The -// interpretation of `v` depends on whether the `Raw` option is -// given. If it is, v must be a byte array or nil. (A nil value causes -// a delete.) If `Raw` is not given, `v` will be marshaled as JSON -// before being written. It must be JSON-marshalable and it must not -// be nil. -func (b *Bucket) Write(k string, flags, exp int, v interface{}, - opt WriteOptions) (err error) { - - if ClientOpCallback != nil { - defer func(t time.Time) { - ClientOpCallback(fmt.Sprintf("Write(%v)", opt), k, t, err) - }(time.Now()) - } - - var data []byte - if opt&Raw == 0 { - data, err = json.Marshal(v) - if err != nil { - return err - } - } else if v != nil { - data = v.([]byte) - } - - var res *gomemcached.MCResponse - err = b.Do(k, func(mc *memcached.Client, vb uint16) error { - if opt&AddOnly != 0 { - res, err = memcached.UnwrapMemcachedError( - mc.Add(vb, k, flags, exp, data)) - if err == nil && res.Status != gomemcached.SUCCESS { - if res.Status == gomemcached.KEY_EEXISTS { - err = ErrKeyExists - } else { - err = res - } - } - } else if opt&Append != 0 { - res, err = mc.Append(vb, k, data) - } else if data == nil { - res, err = mc.Del(vb, k) - } else { - res, err = mc.Set(vb, k, flags, exp, data) - } - - return err - }) - - if err == nil && (opt&(Persist|Indexable) != 0) { - err = b.WaitForPersistence(k, res.Cas, data == nil) - } - - return err -} - -func (b *Bucket) WriteWithMT(k string, flags, exp int, v interface{}, - opt WriteOptions) (mt *MutationToken, err error) { - - if ClientOpCallback != nil { - defer func(t time.Time) { - ClientOpCallback(fmt.Sprintf("WriteWithMT(%v)", opt), k, t, err) - }(time.Now()) - } - - var data []byte - if opt&Raw == 0 { - data, err = json.Marshal(v) - if err != nil { - return nil, err - } - } else if v != nil { - data = v.([]byte) - } - - var res *gomemcached.MCResponse - err = b.Do(k, func(mc *memcached.Client, vb uint16) error { - if opt&AddOnly != 0 { - res, err = memcached.UnwrapMemcachedError( - mc.Add(vb, k, flags, exp, data)) - if err == nil && res.Status != gomemcached.SUCCESS { - if res.Status == gomemcached.KEY_EEXISTS { - err = ErrKeyExists - } else { - err = res - } - } - } else if opt&Append != 0 { - res, err = mc.Append(vb, k, data) - } else if data == nil { - res, err = mc.Del(vb, k) - } else { - res, err = mc.Set(vb, k, flags, exp, data) - } - - if len(res.Extras) >= 16 { - vbuuid := uint64(binary.BigEndian.Uint64(res.Extras[0:8])) - seqNo := uint64(binary.BigEndian.Uint64(res.Extras[8:16])) - mt = &MutationToken{VBid: vb, Guard: vbuuid, Value: seqNo} - } - - return err - }) - - if err == nil && (opt&(Persist|Indexable) != 0) { - err = b.WaitForPersistence(k, res.Cas, data == nil) - } - - return mt, err -} - -// Set a value in this bucket with Cas and return the new Cas value -func (b *Bucket) Cas(k string, exp int, cas uint64, v interface{}) (uint64, error) { - return b.WriteCas(k, 0, exp, cas, v, 0) -} - -// Set a value in this bucket with Cas without json encoding it -func (b *Bucket) CasRaw(k string, exp int, cas uint64, v interface{}) (uint64, error) { - return b.WriteCas(k, 0, exp, cas, v, Raw) -} - -func (b *Bucket) WriteCas(k string, flags, exp int, cas uint64, v interface{}, - opt WriteOptions) (newCas uint64, err error) { - - if ClientOpCallback != nil { - defer func(t time.Time) { - ClientOpCallback(fmt.Sprintf("Write(%v)", opt), k, t, err) - }(time.Now()) - } - - var data []byte - if opt&Raw == 0 { - data, err = json.Marshal(v) - if err != nil { - return 0, err - } - } else if v != nil { - data = v.([]byte) - } - - var res *gomemcached.MCResponse - err = b.Do(k, func(mc *memcached.Client, vb uint16) error { - res, err = mc.SetCas(vb, k, flags, exp, cas, data) - return err - }) - - if err == nil && (opt&(Persist|Indexable) != 0) { - err = b.WaitForPersistence(k, res.Cas, data == nil) - } - - return res.Cas, err -} - -// Extended CAS operation. These functions will return the mutation token, i.e vbuuid & guard -func (b *Bucket) CasWithMeta(k string, flags int, exp int, cas uint64, v interface{}) (uint64, *MutationToken, error) { - return b.WriteCasWithMT(k, flags, exp, cas, v, 0) -} - -func (b *Bucket) CasWithMetaRaw(k string, flags int, exp int, cas uint64, v interface{}) (uint64, *MutationToken, error) { - return b.WriteCasWithMT(k, flags, exp, cas, v, Raw) -} - -func (b *Bucket) WriteCasWithMT(k string, flags, exp int, cas uint64, v interface{}, - opt WriteOptions) (newCas uint64, mt *MutationToken, err error) { - - if ClientOpCallback != nil { - defer func(t time.Time) { - ClientOpCallback(fmt.Sprintf("Write(%v)", opt), k, t, err) - }(time.Now()) - } - - var data []byte - if opt&Raw == 0 { - data, err = json.Marshal(v) - if err != nil { - return 0, nil, err - } - } else if v != nil { - data = v.([]byte) - } - - var res *gomemcached.MCResponse - err = b.Do(k, func(mc *memcached.Client, vb uint16) error { - res, err = mc.SetCas(vb, k, flags, exp, cas, data) - return err - }) - - if err != nil { - return 0, nil, err - } - - // check for extras - if len(res.Extras) >= 16 { - vbuuid := uint64(binary.BigEndian.Uint64(res.Extras[0:8])) - seqNo := uint64(binary.BigEndian.Uint64(res.Extras[8:16])) - vb := b.VBHash(k) - mt = &MutationToken{VBid: uint16(vb), Guard: vbuuid, Value: seqNo} - } - - if err == nil && (opt&(Persist|Indexable) != 0) { - err = b.WaitForPersistence(k, res.Cas, data == nil) - } - - return res.Cas, mt, err -} - -// Set a value in this bucket. -// The value will be serialized into a JSON document. -func (b *Bucket) Set(k string, exp int, v interface{}) error { - return b.Write(k, 0, exp, v, 0) -} - -// Set a value in this bucket with with flags -func (b *Bucket) SetWithMeta(k string, flags int, exp int, v interface{}) (*MutationToken, error) { - return b.WriteWithMT(k, flags, exp, v, 0) -} - -// SetRaw sets a value in this bucket without JSON encoding it. -func (b *Bucket) SetRaw(k string, exp int, v []byte) error { - return b.Write(k, 0, exp, v, Raw) -} - -// Add adds a value to this bucket; like Set except that nothing -// happens if the key exists. The value will be serialized into a -// JSON document. -func (b *Bucket) Add(k string, exp int, v interface{}) (added bool, err error) { - err = b.Write(k, 0, exp, v, AddOnly) - if err == ErrKeyExists { - return false, nil - } - return (err == nil), err -} - -// AddRaw adds a value to this bucket; like SetRaw except that nothing -// happens if the key exists. The value will be stored as raw bytes. -func (b *Bucket) AddRaw(k string, exp int, v []byte) (added bool, err error) { - err = b.Write(k, 0, exp, v, AddOnly|Raw) - if err == ErrKeyExists { - return false, nil - } - return (err == nil), err -} - -// Add adds a value to this bucket; like Set except that nothing -// happens if the key exists. The value will be serialized into a -// JSON document. -func (b *Bucket) AddWithMT(k string, exp int, v interface{}) (added bool, mt *MutationToken, err error) { - mt, err = b.WriteWithMT(k, 0, exp, v, AddOnly) - if err == ErrKeyExists { - return false, mt, nil - } - return (err == nil), mt, err -} - -// AddRaw adds a value to this bucket; like SetRaw except that nothing -// happens if the key exists. The value will be stored as raw bytes. -func (b *Bucket) AddRawWithMT(k string, exp int, v []byte) (added bool, mt *MutationToken, err error) { - mt, err = b.WriteWithMT(k, 0, exp, v, AddOnly|Raw) - if err == ErrKeyExists { - return false, mt, nil - } - return (err == nil), mt, err -} - -// Append appends raw data to an existing item. -func (b *Bucket) Append(k string, data []byte) error { - return b.Write(k, 0, 0, data, Append|Raw) -} - -func (b *Bucket) GetsMCFromCollection(collUid uint32, key string, reqDeadline time.Time) (*gomemcached.MCResponse, error) { - var err error - var response *gomemcached.MCResponse - - if key == "" { - return nil, nil - } - - if ClientOpCallback != nil { - defer func(t time.Time) { ClientOpCallback("GetsMCFromCollection", key, t, err) }(time.Now()) - } - - err = b.Do2(key, func(mc *memcached.Client, vb uint16) error { - var err1 error - - mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout)) - _, err1 = mc.SelectBucket(b.Name) - if err1 != nil { - mc.SetDeadline(noDeadline) - return err1 - } - - mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout)) - response, err1 = mc.GetFromCollection(vb, collUid, key) - if err1 != nil { - mc.SetDeadline(noDeadline) - return err1 - } - - return nil - }, false) - - return response, err -} - -// Returns collectionUid, manifestUid, error. -func (b *Bucket) GetCollectionCID(scope string, collection string, reqDeadline time.Time) (uint32, uint32, error) { - var err error - var response *gomemcached.MCResponse - - if ClientOpCallback != nil { - defer func(t time.Time) { ClientOpCallback("GetCollectionCID", scope+"."+collection, t, err) }(time.Now()) - } - - var key = "DUMMY" // Contact any server. - var manifestUid uint32 - var collUid uint32 - err = b.Do2(key, func(mc *memcached.Client, vb uint16) error { - var err1 error - - mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout)) - _, err1 = mc.SelectBucket(b.Name) - if err1 != nil { - mc.SetDeadline(noDeadline) - return err1 - } - - response, err1 = mc.CollectionsGetCID(scope, collection) - if err1 != nil { - mc.SetDeadline(noDeadline) - return err1 - } - - manifestUid = binary.BigEndian.Uint32(response.Extras[4:8]) - collUid = binary.BigEndian.Uint32(response.Extras[8:12]) - - return nil - }, false) - - return collUid, manifestUid, err -} - -// Get a value straight from Memcached -func (b *Bucket) GetsMC(key string, reqDeadline time.Time) (*gomemcached.MCResponse, error) { - var err error - var response *gomemcached.MCResponse - - if key == "" { - return nil, nil - } - - if ClientOpCallback != nil { - defer func(t time.Time) { ClientOpCallback("GetsMC", key, t, err) }(time.Now()) - } - - err = b.Do2(key, func(mc *memcached.Client, vb uint16) error { - var err1 error - - mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout)) - response, err1 = mc.Get(vb, key) - mc.SetDeadline(noDeadline) - if err1 != nil { - return err1 - } - return nil - }, false) - return response, err -} - -// Get a value through the subdoc API -func (b *Bucket) GetsSubDoc(key string, reqDeadline time.Time, subPaths []string) (*gomemcached.MCResponse, error) { - var err error - var response *gomemcached.MCResponse - - if key == "" { - return nil, nil - } - - if ClientOpCallback != nil { - defer func(t time.Time) { ClientOpCallback("GetsSubDoc", key, t, err) }(time.Now()) - } - - err = b.Do2(key, func(mc *memcached.Client, vb uint16) error { - var err1 error - - mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout)) - response, err1 = mc.GetSubdoc(vb, key, subPaths) - mc.SetDeadline(noDeadline) - if err1 != nil { - return err1 - } - return nil - }, false) - return response, err -} - -// GetsRaw gets a raw value from this bucket including its CAS -// counter and flags. -func (b *Bucket) GetsRaw(k string) (data []byte, flags int, - cas uint64, err error) { - - if ClientOpCallback != nil { - defer func(t time.Time) { ClientOpCallback("GetsRaw", k, t, err) }(time.Now()) - } - - err = b.Do(k, func(mc *memcached.Client, vb uint16) error { - res, err := mc.Get(vb, k) - if err != nil { - return err - } - cas = res.Cas - if len(res.Extras) >= 4 { - flags = int(binary.BigEndian.Uint32(res.Extras)) - } - data = res.Body - return nil - }) - return -} - -// Gets gets a value from this bucket, including its CAS counter. The -// value is expected to be a JSON stream and will be deserialized into -// rv. -func (b *Bucket) Gets(k string, rv interface{}, caso *uint64) error { - data, _, cas, err := b.GetsRaw(k) - if err != nil { - return err - } - if caso != nil { - *caso = cas - } - return json.Unmarshal(data, rv) -} - -// Get a value from this bucket. -// The value is expected to be a JSON stream and will be deserialized -// into rv. -func (b *Bucket) Get(k string, rv interface{}) error { - return b.Gets(k, rv, nil) -} - -// GetRaw gets a raw value from this bucket. No marshaling is performed. -func (b *Bucket) GetRaw(k string) ([]byte, error) { - d, _, _, err := b.GetsRaw(k) - return d, err -} - -// GetAndTouchRaw gets a raw value from this bucket including its CAS -// counter and flags, and updates the expiry on the doc. -func (b *Bucket) GetAndTouchRaw(k string, exp int) (data []byte, - cas uint64, err error) { - - if ClientOpCallback != nil { - defer func(t time.Time) { ClientOpCallback("GetsRaw", k, t, err) }(time.Now()) - } - - err = b.Do(k, func(mc *memcached.Client, vb uint16) error { - res, err := mc.GetAndTouch(vb, k, exp) - if err != nil { - return err - } - cas = res.Cas - data = res.Body - return nil - }) - return data, cas, err -} - -// GetMeta returns the meta values for a key -func (b *Bucket) GetMeta(k string, flags *int, expiry *int, cas *uint64, seqNo *uint64) (err error) { - - if ClientOpCallback != nil { - defer func(t time.Time) { ClientOpCallback("GetsMeta", k, t, err) }(time.Now()) - } - - err = b.Do(k, func(mc *memcached.Client, vb uint16) error { - res, err := mc.GetMeta(vb, k) - if err != nil { - return err - } - - *cas = res.Cas - if len(res.Extras) >= 8 { - *flags = int(binary.BigEndian.Uint32(res.Extras[4:])) - } - - if len(res.Extras) >= 12 { - *expiry = int(binary.BigEndian.Uint32(res.Extras[8:])) - } - - if len(res.Extras) >= 20 { - *seqNo = uint64(binary.BigEndian.Uint64(res.Extras[12:])) - } - - return nil - }) - - return err -} - -// Delete a key from this bucket. -func (b *Bucket) Delete(k string) error { - return b.Write(k, 0, 0, nil, Raw) -} - -// Incr increments the value at a given key by amt and defaults to def if no value present. -func (b *Bucket) Incr(k string, amt, def uint64, exp int) (val uint64, err error) { - if ClientOpCallback != nil { - defer func(t time.Time) { ClientOpCallback("Incr", k, t, err) }(time.Now()) - } - - var rv uint64 - err = b.Do(k, func(mc *memcached.Client, vb uint16) error { - res, err := mc.Incr(vb, k, amt, def, exp) - if err != nil { - return err - } - rv = res - return nil - }) - return rv, err -} - -// Decr decrements the value at a given key by amt and defaults to def if no value present -func (b *Bucket) Decr(k string, amt, def uint64, exp int) (val uint64, err error) { - if ClientOpCallback != nil { - defer func(t time.Time) { ClientOpCallback("Decr", k, t, err) }(time.Now()) - } - - var rv uint64 - err = b.Do(k, func(mc *memcached.Client, vb uint16) error { - res, err := mc.Decr(vb, k, amt, def, exp) - if err != nil { - return err - } - rv = res - return nil - }) - return rv, err -} - -// Wrapper around memcached.CASNext() -func (b *Bucket) casNext(k string, exp int, state *memcached.CASState) bool { - if ClientOpCallback != nil { - defer func(t time.Time) { - ClientOpCallback("casNext", k, t, state.Err) - }(time.Now()) - } - - keepGoing := false - state.Err = b.Do(k, func(mc *memcached.Client, vb uint16) error { - keepGoing = mc.CASNext(vb, k, exp, state) - return state.Err - }) - return keepGoing && state.Err == nil -} - -// An UpdateFunc is a callback function to update a document -type UpdateFunc func(current []byte) (updated []byte, err error) - -// Return this as the error from an UpdateFunc to cancel the Update -// operation. -const UpdateCancel = memcached.CASQuit - -// Update performs a Safe update of a document, avoiding conflicts by -// using CAS. -// -// The callback function will be invoked with the current raw document -// contents (or nil if the document doesn't exist); it should return -// the updated raw contents (or nil to delete.) If it decides not to -// change anything it can return UpdateCancel as the error. -// -// If another writer modifies the document between the get and the -// set, the callback will be invoked again with the newer value. -func (b *Bucket) Update(k string, exp int, callback UpdateFunc) error { - _, err := b.update(k, exp, callback) - return err -} - -// internal version of Update that returns a CAS value -func (b *Bucket) update(k string, exp int, callback UpdateFunc) (newCas uint64, err error) { - var state memcached.CASState - for b.casNext(k, exp, &state) { - var err error - if state.Value, err = callback(state.Value); err != nil { - return 0, err - } - } - return state.Cas, state.Err -} - -// A WriteUpdateFunc is a callback function to update a document -type WriteUpdateFunc func(current []byte) (updated []byte, opt WriteOptions, err error) - -// WriteUpdate performs a Safe update of a document, avoiding -// conflicts by using CAS. WriteUpdate is like Update, except that -// the callback can return a set of WriteOptions, of which Persist and -// Indexable are recognized: these cause the call to wait until the -// document update has been persisted to disk and/or become available -// to index. -func (b *Bucket) WriteUpdate(k string, exp int, callback WriteUpdateFunc) error { - var writeOpts WriteOptions - var deletion bool - // Wrap the callback in an UpdateFunc we can pass to Update: - updateCallback := func(current []byte) (updated []byte, err error) { - update, opt, err := callback(current) - writeOpts = opt - deletion = (update == nil) - return update, err - } - cas, err := b.update(k, exp, updateCallback) - if err != nil { - return err - } - // If callback asked, wait for persistence or indexability: - if writeOpts&(Persist|Indexable) != 0 { - err = b.WaitForPersistence(k, cas, deletion) - } - return err -} - -// Observe observes the current state of a document. -func (b *Bucket) Observe(k string) (result memcached.ObserveResult, err error) { - if ClientOpCallback != nil { - defer func(t time.Time) { ClientOpCallback("Observe", k, t, err) }(time.Now()) - } - - err = b.Do(k, func(mc *memcached.Client, vb uint16) error { - result, err = mc.Observe(vb, k) - return err - }) - return -} - -// Returned from WaitForPersistence (or Write, if the Persistent or Indexable flag is used) -// if the value has been overwritten by another before being persisted. -var ErrOverwritten = errors.New("overwritten") - -// Returned from WaitForPersistence (or Write, if the Persistent or Indexable flag is used) -// if the value hasn't been persisted by the timeout interval -var ErrTimeout = errors.New("timeout") - -// WaitForPersistence waits for an item to be considered durable. -// -// Besides transport errors, ErrOverwritten may be returned if the -// item is overwritten before it reaches durability. ErrTimeout may -// occur if the item isn't found durable in a reasonable amount of -// time. -func (b *Bucket) WaitForPersistence(k string, cas uint64, deletion bool) error { - timeout := 10 * time.Second - sleepDelay := 5 * time.Millisecond - start := time.Now() - for { - time.Sleep(sleepDelay) - sleepDelay += sleepDelay / 2 // multiply delay by 1.5 every time - - result, err := b.Observe(k) - if err != nil { - return err - } - if persisted, overwritten := result.CheckPersistence(cas, deletion); overwritten { - return ErrOverwritten - } else if persisted { - return nil - } - - if result.PersistenceTime > 0 { - timeout = 2 * result.PersistenceTime - } - if time.Since(start) >= timeout-sleepDelay { - return ErrTimeout - } - } -} - -var _STRING_MCRESPONSE_POOL = gomemcached.NewStringMCResponsePool(16) - -type stringPool struct { - pool *sync.Pool - size int -} - -func newStringPool(size int) *stringPool { - rv := &stringPool{ - pool: &sync.Pool{ - New: func() interface{} { - return make([]string, 0, size) - }, - }, - size: size, - } - - return rv -} - -func (this *stringPool) Get() []string { - return this.pool.Get().([]string) -} - -func (this *stringPool) Put(s []string) { - if s == nil || cap(s) < this.size || cap(s) > 2*this.size { - return - } - - this.pool.Put(s[0:0]) -} - -var _STRING_POOL = newStringPool(16) - -type vbStringPool struct { - pool *sync.Pool - strPool *stringPool -} - -func newVBStringPool(size int, sp *stringPool) *vbStringPool { - rv := &vbStringPool{ - pool: &sync.Pool{ - New: func() interface{} { - return make(map[uint16][]string, size) - }, - }, - strPool: sp, - } - - return rv -} - -func (this *vbStringPool) Get() map[uint16][]string { - return this.pool.Get().(map[uint16][]string) -} - -func (this *vbStringPool) Put(s map[uint16][]string) { - if s == nil { - return - } - - for k, v := range s { - delete(s, k) - this.strPool.Put(v) - } - - this.pool.Put(s) -} - -var _VB_STRING_POOL = newVBStringPool(16, _STRING_POOL) diff --git a/vendor/github.com/couchbaselabs/go-couchbase/conn_pool.go b/vendor/github.com/couchbaselabs/go-couchbase/conn_pool.go deleted file mode 100644 index 23102abd84..0000000000 --- a/vendor/github.com/couchbaselabs/go-couchbase/conn_pool.go +++ /dev/null @@ -1,410 +0,0 @@ -package couchbase - -import ( - "crypto/tls" - "errors" - "sync/atomic" - "time" - - "github.com/couchbase/gomemcached" - "github.com/couchbase/gomemcached/client" - "github.com/couchbase/goutils/logging" -) - -// GenericMcdAuthHandler is a kind of AuthHandler that performs -// special auth exchange (like non-standard auth, possibly followed by -// select-bucket). -type GenericMcdAuthHandler interface { - AuthHandler - AuthenticateMemcachedConn(host string, conn *memcached.Client) error -} - -// Error raised when a connection can't be retrieved from a pool. -var TimeoutError = errors.New("timeout waiting to build connection") -var errClosedPool = errors.New("the connection pool is closed") -var errNoPool = errors.New("no connection pool") - -// Default timeout for retrieving a connection from the pool. -var ConnPoolTimeout = time.Hour * 24 * 30 - -// overflow connection closer cycle time -var ConnCloserInterval = time.Second * 30 - -// ConnPoolAvailWaitTime is the amount of time to wait for an existing -// connection from the pool before considering the creation of a new -// one. -var ConnPoolAvailWaitTime = time.Millisecond - -type connectionPool struct { - host string - mkConn func(host string, ah AuthHandler, tlsConfig *tls.Config, bucketName string) (*memcached.Client, error) - auth AuthHandler - connections chan *memcached.Client - createsem chan bool - bailOut chan bool - poolSize int - connCount uint64 - inUse bool - tlsConfig *tls.Config - bucket string -} - -func newConnectionPool(host string, ah AuthHandler, closer bool, poolSize, poolOverflow int, tlsConfig *tls.Config, bucket string) *connectionPool { - connSize := poolSize - if closer { - connSize += poolOverflow - } - rv := &connectionPool{ - host: host, - connections: make(chan *memcached.Client, connSize), - createsem: make(chan bool, poolSize+poolOverflow), - mkConn: defaultMkConn, - auth: ah, - poolSize: poolSize, - tlsConfig: tlsConfig, - bucket: bucket, - } - if closer { - rv.bailOut = make(chan bool, 1) - go rv.connCloser() - } - return rv -} - -// ConnPoolTimeout is notified whenever connections are acquired from a pool. -var ConnPoolCallback func(host string, source string, start time.Time, err error) - -// Use regular in-the-clear connection if tlsConfig is nil. -// Use secure connection (TLS) if tlsConfig is set. -func defaultMkConn(host string, ah AuthHandler, tlsConfig *tls.Config, bucketName string) (*memcached.Client, error) { - var features memcached.Features - - var conn *memcached.Client - var err error - if tlsConfig == nil { - conn, err = memcached.Connect("tcp", host) - } else { - conn, err = memcached.ConnectTLS("tcp", host, tlsConfig) - } - - if err != nil { - return nil, err - } - - if TCPKeepalive == true { - conn.SetKeepAliveOptions(time.Duration(TCPKeepaliveInterval) * time.Second) - } - - if EnableMutationToken == true { - features = append(features, memcached.FeatureMutationToken) - } - if EnableDataType == true { - features = append(features, memcached.FeatureDataType) - } - - if EnableXattr == true { - features = append(features, memcached.FeatureXattr) - } - - if EnableCollections { - features = append(features, memcached.FeatureCollections) - } - - if len(features) > 0 { - if DefaultTimeout > 0 { - conn.SetDeadline(getDeadline(noDeadline, DefaultTimeout)) - } - - res, err := conn.EnableFeatures(features) - - if DefaultTimeout > 0 { - conn.SetDeadline(noDeadline) - } - - if err != nil && isTimeoutError(err) { - conn.Close() - return nil, err - } - - if err != nil || res.Status != gomemcached.SUCCESS { - logging.Warnf("Unable to enable features %v", err) - } - } - - if gah, ok := ah.(GenericMcdAuthHandler); ok { - err = gah.AuthenticateMemcachedConn(host, conn) - if err != nil { - conn.Close() - return nil, err - } - return conn, nil - } - name, pass, bucket := ah.GetCredentials() - if bucket == "" { - // Authenticator does not know specific bucket. - bucket = bucketName - } - - if name != "default" { - _, err = conn.Auth(name, pass) - if err != nil { - conn.Close() - return nil, err - } - // Select bucket (Required for cb_auth creds) - // Required when doing auth with _admin credentials - if bucket != "" && bucket != name { - _, err = conn.SelectBucket(bucket) - if err != nil { - conn.Close() - return nil, err - } - } - } - return conn, nil -} - -func (cp *connectionPool) Close() (err error) { - defer func() { - if recover() != nil { - err = errors.New("connectionPool.Close error") - } - }() - if cp.bailOut != nil { - - // defensively, we won't wait if the channel is full - select { - case cp.bailOut <- false: - default: - } - } - close(cp.connections) - for c := range cp.connections { - c.Close() - } - return -} - -func (cp *connectionPool) Node() string { - return cp.host -} - -func (cp *connectionPool) GetWithTimeout(d time.Duration) (rv *memcached.Client, err error) { - if cp == nil { - return nil, errNoPool - } - - path := "" - - if ConnPoolCallback != nil { - defer func(path *string, start time.Time) { - ConnPoolCallback(cp.host, *path, start, err) - }(&path, time.Now()) - } - - path = "short-circuit" - - // short-circuit available connetions. - select { - case rv, isopen := <-cp.connections: - if !isopen { - return nil, errClosedPool - } - atomic.AddUint64(&cp.connCount, 1) - return rv, nil - default: - } - - t := time.NewTimer(ConnPoolAvailWaitTime) - defer t.Stop() - - // Try to grab an available connection within 1ms - select { - case rv, isopen := <-cp.connections: - path = "avail1" - if !isopen { - return nil, errClosedPool - } - atomic.AddUint64(&cp.connCount, 1) - return rv, nil - case <-t.C: - // No connection came around in time, let's see - // whether we can get one or build a new one first. - t.Reset(d) // Reuse the timer for the full timeout. - select { - case rv, isopen := <-cp.connections: - path = "avail2" - if !isopen { - return nil, errClosedPool - } - atomic.AddUint64(&cp.connCount, 1) - return rv, nil - case cp.createsem <- true: - path = "create" - // Build a connection if we can't get a real one. - // This can potentially be an overflow connection, or - // a pooled connection. - rv, err := cp.mkConn(cp.host, cp.auth, cp.tlsConfig, cp.bucket) - if err != nil { - // On error, release our create hold - <-cp.createsem - } else { - atomic.AddUint64(&cp.connCount, 1) - } - return rv, err - case <-t.C: - return nil, ErrTimeout - } - } -} - -func (cp *connectionPool) Get() (*memcached.Client, error) { - return cp.GetWithTimeout(ConnPoolTimeout) -} - -func (cp *connectionPool) Return(c *memcached.Client) { - if c == nil { - return - } - - if cp == nil { - c.Close() - } - - if c.IsHealthy() { - defer func() { - if recover() != nil { - // This happens when the pool has already been - // closed and we're trying to return a - // connection to it anyway. Just close the - // connection. - c.Close() - } - }() - - select { - case cp.connections <- c: - default: - <-cp.createsem - c.Close() - } - } else { - <-cp.createsem - c.Close() - } -} - -// give the ability to discard a connection from a pool -// useful for ditching connections to the wrong node after a rebalance -func (cp *connectionPool) Discard(c *memcached.Client) { - <-cp.createsem - c.Close() -} - -// asynchronous connection closer -func (cp *connectionPool) connCloser() { - var connCount uint64 - - t := time.NewTimer(ConnCloserInterval) - defer t.Stop() - - for { - connCount = cp.connCount - - // we don't exist anymore! bail out! - select { - case <-cp.bailOut: - return - case <-t.C: - } - t.Reset(ConnCloserInterval) - - // no overflow connections open or sustained requests for connections - // nothing to do until the next cycle - if len(cp.connections) <= cp.poolSize || - ConnCloserInterval/ConnPoolAvailWaitTime < time.Duration(cp.connCount-connCount) { - continue - } - - // close overflow connections now that they are not needed - for c := range cp.connections { - select { - case <-cp.bailOut: - return - default: - } - - // bail out if close did not work out - if !cp.connCleanup(c) { - return - } - if len(cp.connections) <= cp.poolSize { - break - } - } - } -} - -// close connection with recovery on error -func (cp *connectionPool) connCleanup(c *memcached.Client) (rv bool) { - - // just in case we are closing a connection after - // bailOut has been sent but we haven't yet read it - defer func() { - if recover() != nil { - rv = false - } - }() - rv = true - - c.Close() - <-cp.createsem - return -} - -func (cp *connectionPool) StartTapFeed(args *memcached.TapArguments) (*memcached.TapFeed, error) { - if cp == nil { - return nil, errNoPool - } - mc, err := cp.Get() - if err != nil { - return nil, err - } - - // A connection can't be used after TAP; Dont' count it against the - // connection pool capacity - <-cp.createsem - - return mc.StartTapFeed(*args) -} - -const DEFAULT_WINDOW_SIZE = 20 * 1024 * 1024 // 20 Mb - -func (cp *connectionPool) StartUprFeed(name string, sequence uint32, dcp_buffer_size uint32, data_chan_size int) (*memcached.UprFeed, error) { - if cp == nil { - return nil, errNoPool - } - mc, err := cp.Get() - if err != nil { - return nil, err - } - - // A connection can't be used after it has been allocated to UPR; - // Dont' count it against the connection pool capacity - <-cp.createsem - - uf, err := mc.NewUprFeed() - if err != nil { - return nil, err - } - - if err := uf.UprOpen(name, sequence, dcp_buffer_size); err != nil { - return nil, err - } - - if err := uf.StartFeedWithConfig(data_chan_size); err != nil { - return nil, err - } - - return uf, nil -} diff --git a/vendor/github.com/couchbaselabs/go-couchbase/ddocs.go b/vendor/github.com/couchbaselabs/go-couchbase/ddocs.go deleted file mode 100644 index f9cc343aa8..0000000000 --- a/vendor/github.com/couchbaselabs/go-couchbase/ddocs.go +++ /dev/null @@ -1,288 +0,0 @@ -package couchbase - -import ( - "bytes" - "encoding/json" - "fmt" - "github.com/couchbase/goutils/logging" - "io/ioutil" - "net/http" -) - -// ViewDefinition represents a single view within a design document. -type ViewDefinition struct { - Map string `json:"map"` - Reduce string `json:"reduce,omitempty"` -} - -// DDoc is the document body of a design document specifying a view. -type DDoc struct { - Language string `json:"language,omitempty"` - Views map[string]ViewDefinition `json:"views"` -} - -// DDocsResult represents the result from listing the design -// documents. -type DDocsResult struct { - Rows []struct { - DDoc struct { - Meta map[string]interface{} - JSON DDoc - } `json:"doc"` - } `json:"rows"` -} - -// GetDDocs lists all design documents -func (b *Bucket) GetDDocs() (DDocsResult, error) { - var ddocsResult DDocsResult - b.RLock() - pool := b.pool - uri := b.DDocs.URI - b.RUnlock() - - // MB-23555 ephemeral buckets have no ddocs - if uri == "" { - return DDocsResult{}, nil - } - - err := pool.client.parseURLResponse(uri, &ddocsResult) - if err != nil { - return DDocsResult{}, err - } - return ddocsResult, nil -} - -func (b *Bucket) GetDDocWithRetry(docname string, into interface{}) error { - ddocURI := fmt.Sprintf("/%s/_design/%s", b.GetName(), docname) - err := b.parseAPIResponse(ddocURI, &into) - if err != nil { - return err - } - return nil -} - -func (b *Bucket) GetDDocsWithRetry() (DDocsResult, error) { - var ddocsResult DDocsResult - b.RLock() - uri := b.DDocs.URI - b.RUnlock() - - // MB-23555 ephemeral buckets have no ddocs - if uri == "" { - return DDocsResult{}, nil - } - - err := b.parseURLResponse(uri, &ddocsResult) - if err != nil { - return DDocsResult{}, err - } - return ddocsResult, nil -} - -func (b *Bucket) ddocURL(docname string) (string, error) { - u, err := b.randomBaseURL() - if err != nil { - return "", err - } - u.Path = fmt.Sprintf("/%s/_design/%s", b.GetName(), docname) - return u.String(), nil -} - -func (b *Bucket) ddocURLNext(nodeId int, docname string) (string, int, error) { - u, selected, err := b.randomNextURL(nodeId) - if err != nil { - return "", -1, err - } - u.Path = fmt.Sprintf("/%s/_design/%s", b.GetName(), docname) - return u.String(), selected, nil -} - -const ABS_MAX_RETRIES = 10 -const ABS_MIN_RETRIES = 3 - -func (b *Bucket) getMaxRetries() (int, error) { - - maxRetries := len(b.Nodes()) - - if maxRetries == 0 { - return 0, fmt.Errorf("No available Couch rest URLs") - } - - if maxRetries > ABS_MAX_RETRIES { - maxRetries = ABS_MAX_RETRIES - } else if maxRetries < ABS_MIN_RETRIES { - maxRetries = ABS_MIN_RETRIES - } - - return maxRetries, nil -} - -// PutDDoc installs a design document. -func (b *Bucket) PutDDoc(docname string, value interface{}) error { - - var Err error - - maxRetries, err := b.getMaxRetries() - if err != nil { - return err - } - - lastNode := START_NODE_ID - - for retryCount := 0; retryCount < maxRetries; retryCount++ { - - Err = nil - - ddocU, selectedNode, err := b.ddocURLNext(lastNode, docname) - if err != nil { - return err - } - - lastNode = selectedNode - - logging.Infof(" Trying with selected node %d", selectedNode) - j, err := json.Marshal(value) - if err != nil { - return err - } - - req, err := http.NewRequest("PUT", ddocU, bytes.NewReader(j)) - if err != nil { - return err - } - req.Header.Set("Content-Type", "application/json") - err = maybeAddAuth(req, b.authHandler(false /* bucket not yet locked */)) - if err != nil { - return err - } - - res, err := doHTTPRequest(req) - if err != nil { - return err - } - - if res.StatusCode != 201 { - body, _ := ioutil.ReadAll(res.Body) - Err = fmt.Errorf("error installing view: %v / %s", - res.Status, body) - logging.Errorf(" Error in PutDDOC %v. Retrying...", Err) - res.Body.Close() - b.Refresh() - continue - } - - res.Body.Close() - break - } - - return Err -} - -// GetDDoc retrieves a specific a design doc. -func (b *Bucket) GetDDoc(docname string, into interface{}) error { - var Err error - var res *http.Response - - maxRetries, err := b.getMaxRetries() - if err != nil { - return err - } - - lastNode := START_NODE_ID - for retryCount := 0; retryCount < maxRetries; retryCount++ { - - Err = nil - ddocU, selectedNode, err := b.ddocURLNext(lastNode, docname) - if err != nil { - return err - } - - lastNode = selectedNode - logging.Infof(" Trying with selected node %d", selectedNode) - - req, err := http.NewRequest("GET", ddocU, nil) - if err != nil { - return err - } - req.Header.Set("Content-Type", "application/json") - err = maybeAddAuth(req, b.authHandler(false /* bucket not yet locked */)) - if err != nil { - return err - } - - res, err = doHTTPRequest(req) - if err != nil { - return err - } - if res.StatusCode != 200 { - body, _ := ioutil.ReadAll(res.Body) - Err = fmt.Errorf("error reading view: %v / %s", - res.Status, body) - logging.Errorf(" Error in GetDDOC %v Retrying...", Err) - b.Refresh() - res.Body.Close() - continue - } - defer res.Body.Close() - break - } - - if Err != nil { - return Err - } - - d := json.NewDecoder(res.Body) - return d.Decode(into) -} - -// DeleteDDoc removes a design document. -func (b *Bucket) DeleteDDoc(docname string) error { - - var Err error - - maxRetries, err := b.getMaxRetries() - if err != nil { - return err - } - - lastNode := START_NODE_ID - - for retryCount := 0; retryCount < maxRetries; retryCount++ { - - Err = nil - ddocU, selectedNode, err := b.ddocURLNext(lastNode, docname) - if err != nil { - return err - } - - lastNode = selectedNode - logging.Infof(" Trying with selected node %d", selectedNode) - - req, err := http.NewRequest("DELETE", ddocU, nil) - if err != nil { - return err - } - req.Header.Set("Content-Type", "application/json") - err = maybeAddAuth(req, b.authHandler(false /* bucket not already locked */)) - if err != nil { - return err - } - - res, err := doHTTPRequest(req) - if err != nil { - return err - } - if res.StatusCode != 200 { - body, _ := ioutil.ReadAll(res.Body) - Err = fmt.Errorf("error deleting view : %v / %s", res.Status, body) - logging.Errorf(" Error in DeleteDDOC %v. Retrying ... ", Err) - b.Refresh() - res.Body.Close() - continue - } - - res.Body.Close() - break - } - return Err -} diff --git a/vendor/github.com/couchbaselabs/go-couchbase/observe.go b/vendor/github.com/couchbaselabs/go-couchbase/observe.go deleted file mode 100644 index 6e746f5a16..0000000000 --- a/vendor/github.com/couchbaselabs/go-couchbase/observe.go +++ /dev/null @@ -1,300 +0,0 @@ -package couchbase - -import ( - "fmt" - "github.com/couchbase/goutils/logging" - "sync" -) - -type PersistTo uint8 - -const ( - PersistNone = PersistTo(0x00) - PersistMaster = PersistTo(0x01) - PersistOne = PersistTo(0x02) - PersistTwo = PersistTo(0x03) - PersistThree = PersistTo(0x04) - PersistFour = PersistTo(0x05) -) - -type ObserveTo uint8 - -const ( - ObserveNone = ObserveTo(0x00) - ObserveReplicateOne = ObserveTo(0x01) - ObserveReplicateTwo = ObserveTo(0x02) - ObserveReplicateThree = ObserveTo(0x03) - ObserveReplicateFour = ObserveTo(0x04) -) - -type JobType uint8 - -const ( - OBSERVE = JobType(0x00) - PERSIST = JobType(0x01) -) - -type ObservePersistJob struct { - vb uint16 - vbuuid uint64 - hostname string - jobType JobType - failover uint8 - lastPersistedSeqNo uint64 - currentSeqNo uint64 - resultChan chan *ObservePersistJob - errorChan chan *OPErrResponse -} - -type OPErrResponse struct { - vb uint16 - vbuuid uint64 - err error - job *ObservePersistJob -} - -var ObservePersistPool = NewPool(1024) -var OPJobChan = make(chan *ObservePersistJob, 1024) -var OPJobDone = make(chan bool) - -var wg sync.WaitGroup - -func (b *Bucket) StartOPPollers(maxWorkers int) { - - for i := 0; i < maxWorkers; i++ { - go b.OPJobPoll() - wg.Add(1) - } - wg.Wait() -} - -func (b *Bucket) SetObserveAndPersist(nPersist PersistTo, nObserve ObserveTo) (err error) { - - numNodes := len(b.Nodes()) - if int(nPersist) > numNodes || int(nObserve) > numNodes { - return fmt.Errorf("Not enough healthy nodes in the cluster") - } - - if int(nPersist) > (b.Replicas+1) || int(nObserve) > b.Replicas { - return fmt.Errorf("Not enough replicas in the cluster") - } - - if EnableMutationToken == false { - return fmt.Errorf("Mutation Tokens not enabled ") - } - - b.ds = &DurablitySettings{Persist: PersistTo(nPersist), Observe: ObserveTo(nObserve)} - return -} - -func (b *Bucket) ObserveAndPersistPoll(vb uint16, vbuuid uint64, seqNo uint64) (err error, failover bool) { - b.RLock() - ds := b.ds - b.RUnlock() - - if ds == nil { - return - } - - nj := 0 // total number of jobs - resultChan := make(chan *ObservePersistJob, 10) - errChan := make(chan *OPErrResponse, 10) - - nodes := b.GetNodeList(vb) - if int(ds.Observe) > len(nodes) || int(ds.Persist) > len(nodes) { - return fmt.Errorf("Not enough healthy nodes in the cluster"), false - } - - logging.Infof("Node list %v", nodes) - - if ds.Observe >= ObserveReplicateOne { - // create a job for each host - for i := ObserveReplicateOne; i < ds.Observe+1; i++ { - opJob := ObservePersistPool.Get() - opJob.vb = vb - opJob.vbuuid = vbuuid - opJob.jobType = OBSERVE - opJob.hostname = nodes[i] - opJob.resultChan = resultChan - opJob.errorChan = errChan - - OPJobChan <- opJob - nj++ - - } - } - - if ds.Persist >= PersistMaster { - for i := PersistMaster; i < ds.Persist+1; i++ { - opJob := ObservePersistPool.Get() - opJob.vb = vb - opJob.vbuuid = vbuuid - opJob.jobType = PERSIST - opJob.hostname = nodes[i] - opJob.resultChan = resultChan - opJob.errorChan = errChan - - OPJobChan <- opJob - nj++ - - } - } - - ok := true - for ok { - select { - case res := <-resultChan: - jobDone := false - if res.failover == 0 { - // no failover - if res.jobType == PERSIST { - if res.lastPersistedSeqNo >= seqNo { - jobDone = true - } - - } else { - if res.currentSeqNo >= seqNo { - jobDone = true - } - } - - if jobDone == true { - nj-- - ObservePersistPool.Put(res) - } else { - // requeue this job - OPJobChan <- res - } - - } else { - // Not currently handling failover scenarios TODO - nj-- - ObservePersistPool.Put(res) - failover = true - } - - if nj == 0 { - // done with all the jobs - ok = false - close(resultChan) - close(errChan) - } - - case Err := <-errChan: - logging.Errorf("Error in Observe/Persist %v", Err.err) - err = fmt.Errorf("Error in Observe/Persist job %v", Err.err) - nj-- - ObservePersistPool.Put(Err.job) - if nj == 0 { - close(resultChan) - close(errChan) - ok = false - } - } - } - - return -} - -func (b *Bucket) OPJobPoll() { - - ok := true - for ok == true { - select { - case job := <-OPJobChan: - pool := b.getConnPoolByHost(job.hostname, false /* bucket not already locked */) - if pool == nil { - errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid} - errRes.err = fmt.Errorf("Pool not found for host %v", job.hostname) - errRes.job = job - job.errorChan <- errRes - continue - } - conn, err := pool.Get() - if err != nil { - errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid} - errRes.err = fmt.Errorf("Unable to get connection from pool %v", err) - errRes.job = job - job.errorChan <- errRes - continue - } - - res, err := conn.ObserveSeq(job.vb, job.vbuuid) - if err != nil { - errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid} - errRes.err = fmt.Errorf("Command failed %v", err) - errRes.job = job - job.errorChan <- errRes - continue - - } - pool.Return(conn) - job.lastPersistedSeqNo = res.LastPersistedSeqNo - job.currentSeqNo = res.CurrentSeqNo - job.failover = res.Failover - - job.resultChan <- job - case <-OPJobDone: - logging.Infof("Observe Persist Poller exitting") - ok = false - } - } - wg.Done() -} - -func (b *Bucket) GetNodeList(vb uint16) []string { - - vbm := b.VBServerMap() - if len(vbm.VBucketMap) < int(vb) { - logging.Infof("vbmap smaller than vblist") - return nil - } - - nodes := make([]string, len(vbm.VBucketMap[vb])) - for i := 0; i < len(vbm.VBucketMap[vb]); i++ { - n := vbm.VBucketMap[vb][i] - if n < 0 { - continue - } - - node := b.getMasterNode(n) - if len(node) > 1 { - nodes[i] = node - } - continue - - } - return nodes -} - -//pool of ObservePersist Jobs -type OPpool struct { - pool chan *ObservePersistJob -} - -// NewPool creates a new pool of jobs -func NewPool(max int) *OPpool { - return &OPpool{ - pool: make(chan *ObservePersistJob, max), - } -} - -// Borrow a Client from the pool. -func (p *OPpool) Get() *ObservePersistJob { - var o *ObservePersistJob - select { - case o = <-p.pool: - default: - o = &ObservePersistJob{} - } - return o -} - -// Return returns a Client to the pool. -func (p *OPpool) Put(o *ObservePersistJob) { - select { - case p.pool <- o: - default: - // let it go, let it go... - } -} diff --git a/vendor/github.com/couchbaselabs/go-couchbase/pools.go b/vendor/github.com/couchbaselabs/go-couchbase/pools.go deleted file mode 100644 index 0e2379398a..0000000000 --- a/vendor/github.com/couchbaselabs/go-couchbase/pools.go +++ /dev/null @@ -1,1474 +0,0 @@ -package couchbase - -import ( - "bufio" - "bytes" - "crypto/tls" - "crypto/x509" - "encoding/base64" - "encoding/json" - "errors" - "fmt" - "io" - "io/ioutil" - "math/rand" - "net/http" - "net/url" - "runtime" - "sort" - "strconv" - "strings" - "sync" - "time" - "unsafe" - - "github.com/couchbase/goutils/logging" - - "github.com/couchbase/gomemcached" // package name is 'gomemcached' - "github.com/couchbase/gomemcached/client" // package name is 'memcached' -) - -// HTTPClient to use for REST and view operations. -var MaxIdleConnsPerHost = 256 -var ClientTimeOut = 10 * time.Second -var HTTPTransport = &http.Transport{MaxIdleConnsPerHost: MaxIdleConnsPerHost} -var HTTPClient = &http.Client{Transport: HTTPTransport, Timeout: ClientTimeOut} - -// PoolSize is the size of each connection pool (per host). -var PoolSize = 64 - -// PoolOverflow is the number of overflow connections allowed in a -// pool. -var PoolOverflow = 16 - -// AsynchronousCloser turns on asynchronous closing for overflow connections -var AsynchronousCloser = false - -// TCP KeepAlive enabled/disabled -var TCPKeepalive = false - -// Enable MutationToken -var EnableMutationToken = false - -// Enable Data Type response -var EnableDataType = false - -// Enable Xattr -var EnableXattr = false - -// Enable Collections -var EnableCollections = false - -// TCP keepalive interval in seconds. Default 30 minutes -var TCPKeepaliveInterval = 30 * 60 - -// Used to decide whether to skip verification of certificates when -// connecting to an ssl port. -var skipVerify = true -var certFile = "" -var keyFile = "" -var rootFile = "" - -func SetSkipVerify(skip bool) { - skipVerify = skip -} - -func SetCertFile(cert string) { - certFile = cert -} - -func SetKeyFile(cert string) { - keyFile = cert -} - -func SetRootFile(cert string) { - rootFile = cert -} - -// Allow applications to speciify the Poolsize and Overflow -func SetConnectionPoolParams(size, overflow int) { - - if size > 0 { - PoolSize = size - } - - if overflow > 0 { - PoolOverflow = overflow - } -} - -// Turn off overflow connections -func DisableOverflowConnections() { - PoolOverflow = 0 -} - -// Toggle asynchronous overflow closer -func EnableAsynchronousCloser(closer bool) { - AsynchronousCloser = closer -} - -// Allow TCP keepalive parameters to be set by the application -func SetTcpKeepalive(enabled bool, interval int) { - - TCPKeepalive = enabled - - if interval > 0 { - TCPKeepaliveInterval = interval - } -} - -// AuthHandler is a callback that gets the auth username and password -// for the given bucket. -type AuthHandler interface { - GetCredentials() (string, string, string) -} - -// AuthHandler is a callback that gets the auth username and password -// for the given bucket and sasl for memcached. -type AuthWithSaslHandler interface { - AuthHandler - GetSaslCredentials() (string, string) -} - -// MultiBucketAuthHandler is kind of AuthHandler that may perform -// different auth for different buckets. -type MultiBucketAuthHandler interface { - AuthHandler - ForBucket(bucket string) AuthHandler -} - -// HTTPAuthHandler is kind of AuthHandler that performs more general -// for outgoing http requests than is possible via simple -// GetCredentials() call (i.e. digest auth or different auth per -// different destinations). -type HTTPAuthHandler interface { - AuthHandler - SetCredsForRequest(req *http.Request) error -} - -// RestPool represents a single pool returned from the pools REST API. -type RestPool struct { - Name string `json:"name"` - StreamingURI string `json:"streamingUri"` - URI string `json:"uri"` -} - -// Pools represents the collection of pools as returned from the REST API. -type Pools struct { - ComponentsVersion map[string]string `json:"componentsVersion,omitempty"` - ImplementationVersion string `json:"implementationVersion"` - IsAdmin bool `json:"isAdminCreds"` - UUID string `json:"uuid"` - Pools []RestPool `json:"pools"` -} - -// A Node is a computer in a cluster running the couchbase software. -type Node struct { - ClusterCompatibility int `json:"clusterCompatibility"` - ClusterMembership string `json:"clusterMembership"` - CouchAPIBase string `json:"couchApiBase"` - Hostname string `json:"hostname"` - InterestingStats map[string]float64 `json:"interestingStats,omitempty"` - MCDMemoryAllocated float64 `json:"mcdMemoryAllocated"` - MCDMemoryReserved float64 `json:"mcdMemoryReserved"` - MemoryFree float64 `json:"memoryFree"` - MemoryTotal float64 `json:"memoryTotal"` - OS string `json:"os"` - Ports map[string]int `json:"ports"` - Services []string `json:"services"` - Status string `json:"status"` - Uptime int `json:"uptime,string"` - Version string `json:"version"` - ThisNode bool `json:"thisNode,omitempty"` -} - -// A Pool of nodes and buckets. -type Pool struct { - BucketMap map[string]*Bucket - Nodes []Node - - BucketURL map[string]string `json:"buckets"` - - client *Client -} - -// VBucketServerMap is the a mapping of vbuckets to nodes. -type VBucketServerMap struct { - HashAlgorithm string `json:"hashAlgorithm"` - NumReplicas int `json:"numReplicas"` - ServerList []string `json:"serverList"` - VBucketMap [][]int `json:"vBucketMap"` -} - -type DurablitySettings struct { - Persist PersistTo - Observe ObserveTo -} - -// Bucket is the primary entry point for most data operations. -// Bucket is a locked data structure. All access to its fields should be done using read or write locking, -// as appropriate. -// -// Some access methods require locking, but rely on the caller to do so. These are appropriate -// for calls from methods that have already locked the structure. Methods like this -// take a boolean parameter "bucketLocked". -type Bucket struct { - sync.RWMutex - AuthType string `json:"authType"` - Capabilities []string `json:"bucketCapabilities"` - CapabilitiesVersion string `json:"bucketCapabilitiesVer"` - Type string `json:"bucketType"` - Name string `json:"name"` - NodeLocator string `json:"nodeLocator"` - Quota map[string]float64 `json:"quota,omitempty"` - Replicas int `json:"replicaNumber"` - Password string `json:"saslPassword"` - URI string `json:"uri"` - StreamingURI string `json:"streamingUri"` - LocalRandomKeyURI string `json:"localRandomKeyUri,omitempty"` - UUID string `json:"uuid"` - ConflictResolutionType string `json:"conflictResolutionType,omitempty"` - DDocs struct { - URI string `json:"uri"` - } `json:"ddocs,omitempty"` - BasicStats map[string]interface{} `json:"basicStats,omitempty"` - Controllers map[string]interface{} `json:"controllers,omitempty"` - - // These are used for JSON IO, but isn't used for processing - // since it needs to be swapped out safely. - VBSMJson VBucketServerMap `json:"vBucketServerMap"` - NodesJSON []Node `json:"nodes"` - - pool *Pool - connPools unsafe.Pointer // *[]*connectionPool - vBucketServerMap unsafe.Pointer // *VBucketServerMap - nodeList unsafe.Pointer // *[]Node - commonSufix string - ah AuthHandler // auth handler - ds *DurablitySettings // Durablity Settings for this bucket - closed bool -} - -// PoolServices is all the bucket-independent services in a pool -type PoolServices struct { - Rev int `json:"rev"` - NodesExt []NodeServices `json:"nodesExt"` - Capabilities json.RawMessage `json:"clusterCapabilities"` -} - -// NodeServices is all the bucket-independent services running on -// a node (given by Hostname) -type NodeServices struct { - Services map[string]int `json:"services,omitempty"` - Hostname string `json:"hostname"` - ThisNode bool `json:"thisNode"` -} - -type BucketNotFoundError struct { - bucket string -} - -func (e *BucketNotFoundError) Error() string { - return fmt.Sprint("No bucket named " + e.bucket) -} - -type BucketAuth struct { - name string - saslPwd string - bucket string -} - -func newBucketAuth(name string, pass string, bucket string) *BucketAuth { - return &BucketAuth{name: name, saslPwd: pass, bucket: bucket} -} - -func (ba *BucketAuth) GetCredentials() (string, string, string) { - return ba.name, ba.saslPwd, ba.bucket -} - -// VBServerMap returns the current VBucketServerMap. -func (b *Bucket) VBServerMap() *VBucketServerMap { - b.RLock() - defer b.RUnlock() - ret := (*VBucketServerMap)(b.vBucketServerMap) - return ret -} - -func (b *Bucket) GetVBmap(addrs []string) (map[string][]uint16, error) { - vbmap := b.VBServerMap() - servers := vbmap.ServerList - if addrs == nil { - addrs = vbmap.ServerList - } - - m := make(map[string][]uint16) - for _, addr := range addrs { - m[addr] = make([]uint16, 0) - } - for vbno, idxs := range vbmap.VBucketMap { - if len(idxs) == 0 { - return nil, fmt.Errorf("vbmap: No KV node no for vb %d", vbno) - } else if idxs[0] < 0 || idxs[0] >= len(servers) { - return nil, fmt.Errorf("vbmap: Invalid KV node no %d for vb %d", idxs[0], vbno) - } - addr := servers[idxs[0]] - if _, ok := m[addr]; ok { - m[addr] = append(m[addr], uint16(vbno)) - } - } - return m, nil -} - -// true if node is not on the bucket VBmap -func (b *Bucket) checkVBmap(node string) bool { - vbmap := b.VBServerMap() - servers := vbmap.ServerList - - for _, idxs := range vbmap.VBucketMap { - if len(idxs) == 0 { - return true - } else if idxs[0] < 0 || idxs[0] >= len(servers) { - return true - } - if servers[idxs[0]] == node { - return false - } - } - return true -} - -func (b *Bucket) GetName() string { - b.RLock() - defer b.RUnlock() - ret := b.Name - return ret -} - -// Nodes returns the current list of nodes servicing this bucket. -func (b *Bucket) Nodes() []Node { - b.RLock() - defer b.RUnlock() - ret := *(*[]Node)(b.nodeList) - return ret -} - -// return the list of healthy nodes -func (b *Bucket) HealthyNodes() []Node { - nodes := []Node{} - - for _, n := range b.Nodes() { - if n.Status == "healthy" && n.CouchAPIBase != "" { - nodes = append(nodes, n) - } - if n.Status != "healthy" { // log non-healthy node - logging.Infof("Non-healthy node; node details:") - logging.Infof("Hostname=%v, Status=%v, CouchAPIBase=%v, ThisNode=%v", n.Hostname, n.Status, n.CouchAPIBase, n.ThisNode) - } - } - - return nodes -} - -func (b *Bucket) getConnPools(bucketLocked bool) []*connectionPool { - if !bucketLocked { - b.RLock() - defer b.RUnlock() - } - if b.connPools != nil { - return *(*[]*connectionPool)(b.connPools) - } else { - return nil - } -} - -func (b *Bucket) replaceConnPools(with []*connectionPool) { - b.Lock() - defer b.Unlock() - - old := b.connPools - b.connPools = unsafe.Pointer(&with) - if old != nil { - for _, pool := range *(*[]*connectionPool)(old) { - if pool != nil { - pool.Close() - } - } - } - return -} - -func (b *Bucket) getConnPool(i int) *connectionPool { - - if i < 0 { - return nil - } - - p := b.getConnPools(false /* not already locked */) - if len(p) > i { - return p[i] - } - - return nil -} - -func (b *Bucket) getConnPoolByHost(host string, bucketLocked bool) *connectionPool { - pools := b.getConnPools(bucketLocked) - for _, p := range pools { - if p != nil && p.host == host { - return p - } - } - - return nil -} - -// Given a vbucket number, returns a memcached connection to it. -// The connection must be returned to its pool after use. -func (b *Bucket) getConnectionToVBucket(vb uint32) (*memcached.Client, *connectionPool, error) { - for { - vbm := b.VBServerMap() - if len(vbm.VBucketMap) < int(vb) { - return nil, nil, fmt.Errorf("go-couchbase: vbmap smaller than vbucket list: %v vs. %v", - vb, vbm.VBucketMap) - } - masterId := vbm.VBucketMap[vb][0] - if masterId < 0 { - return nil, nil, fmt.Errorf("go-couchbase: No master for vbucket %d", vb) - } - pool := b.getConnPool(masterId) - conn, err := pool.Get() - if err != errClosedPool { - return conn, pool, err - } - // If conn pool was closed, because another goroutine refreshed the vbucket map, retry... - } -} - -// To get random documents, we need to cover all the nodes, so select -// a connection at random. - -func (b *Bucket) getRandomConnection() (*memcached.Client, *connectionPool, error) { - for { - var currentPool = 0 - pools := b.getConnPools(false /* not already locked */) - if len(pools) == 0 { - return nil, nil, fmt.Errorf("No connection pool found") - } else if len(pools) > 1 { // choose a random connection - currentPool = rand.Intn(len(pools)) - } // if only one pool, currentPool defaults to 0, i.e., the only pool - - // get the pool - pool := pools[currentPool] - conn, err := pool.Get() - if err != errClosedPool { - return conn, pool, err - } - - // If conn pool was closed, because another goroutine refreshed the vbucket map, retry... - } -} - -// -// Get a random document from a bucket. Since the bucket may be distributed -// across nodes, we must first select a random connection, and then use the -// Client.GetRandomDoc() call to get a random document from that node. -// - -func (b *Bucket) GetRandomDoc() (*gomemcached.MCResponse, error) { - // get a connection from the pool - conn, pool, err := b.getRandomConnection() - - if err != nil { - return nil, err - } - - // We may need to select the bucket before GetRandomDoc() - // will work. This is sometimes done at startup (see defaultMkConn()) - // but not always, depending on the auth type. - _, err = conn.SelectBucket(b.Name) - if err != nil { - return nil, err - } - - // get a randomm document from the connection - doc, err := conn.GetRandomDoc() - // need to return the connection to the pool - pool.Return(conn) - return doc, err -} - -func (b *Bucket) getMasterNode(i int) string { - p := b.getConnPools(false /* not already locked */) - if len(p) > i { - return p[i].host - } - return "" -} - -func (b *Bucket) authHandler(bucketLocked bool) (ah AuthHandler) { - if !bucketLocked { - b.RLock() - defer b.RUnlock() - } - pool := b.pool - name := b.Name - - if pool != nil { - ah = pool.client.ah - } - if mbah, ok := ah.(MultiBucketAuthHandler); ok { - return mbah.ForBucket(name) - } - if ah == nil { - ah = &basicAuth{name, ""} - } - return -} - -// NodeAddresses gets the (sorted) list of memcached node addresses -// (hostname:port). -func (b *Bucket) NodeAddresses() []string { - vsm := b.VBServerMap() - rv := make([]string, len(vsm.ServerList)) - copy(rv, vsm.ServerList) - sort.Strings(rv) - return rv -} - -// CommonAddressSuffix finds the longest common suffix of all -// host:port strings in the node list. -func (b *Bucket) CommonAddressSuffix() string { - input := []string{} - for _, n := range b.Nodes() { - input = append(input, n.Hostname) - } - return FindCommonSuffix(input) -} - -// A Client is the starting point for all services across all buckets -// in a Couchbase cluster. -type Client struct { - BaseURL *url.URL - ah AuthHandler - Info Pools - tlsConfig *tls.Config -} - -func maybeAddAuth(req *http.Request, ah AuthHandler) error { - if hah, ok := ah.(HTTPAuthHandler); ok { - return hah.SetCredsForRequest(req) - } - if ah != nil { - user, pass, _ := ah.GetCredentials() - req.Header.Set("Authorization", "Basic "+ - base64.StdEncoding.EncodeToString([]byte(user+":"+pass))) - } - return nil -} - -// arbitary number, may need to be tuned #FIXME -const HTTP_MAX_RETRY = 5 - -// Someday golang network packages will implement standard -// error codes. Until then #sigh -func isHttpConnError(err error) bool { - - estr := err.Error() - return strings.Contains(estr, "broken pipe") || - strings.Contains(estr, "broken connection") || - strings.Contains(estr, "connection reset") -} - -var client *http.Client - -func ClientConfigForX509(certFile, keyFile, rootFile string) (*tls.Config, error) { - cfg := &tls.Config{} - - if certFile != "" && keyFile != "" { - tlsCert, err := tls.LoadX509KeyPair(certFile, keyFile) - if err != nil { - return nil, err - } - cfg.Certificates = []tls.Certificate{tlsCert} - } else { - //error need to pass both certfile and keyfile - return nil, fmt.Errorf("N1QL: Need to pass both certfile and keyfile") - } - - var caCert []byte - var err1 error - - caCertPool := x509.NewCertPool() - if rootFile != "" { - // Read that value in - caCert, err1 = ioutil.ReadFile(rootFile) - if err1 != nil { - return nil, fmt.Errorf(" Error in reading cacert file, err: %v", err1) - } - caCertPool.AppendCertsFromPEM(caCert) - } - - cfg.RootCAs = caCertPool - return cfg, nil -} - -func doHTTPRequest(req *http.Request) (*http.Response, error) { - - var err error - var res *http.Response - - // we need a client that ignores certificate errors, since we self-sign - // our certs - if client == nil && req.URL.Scheme == "https" { - var tr *http.Transport - - if skipVerify { - tr = &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - } - } else { - // Handle cases with cert - - cfg, err := ClientConfigForX509(certFile, keyFile, rootFile) - if err != nil { - return nil, err - } - - tr = &http.Transport{ - TLSClientConfig: cfg, - } - } - - client = &http.Client{Transport: tr} - - } else if client == nil { - client = HTTPClient - } - - for i := 0; i < HTTP_MAX_RETRY; i++ { - res, err = client.Do(req) - if err != nil && isHttpConnError(err) { - continue - } - break - } - - if err != nil { - return nil, err - } - - return res, err -} - -func doPutAPI(baseURL *url.URL, path string, params map[string]interface{}, authHandler AuthHandler, out interface{}) error { - return doOutputAPI("PUT", baseURL, path, params, authHandler, out) -} - -func doPostAPI(baseURL *url.URL, path string, params map[string]interface{}, authHandler AuthHandler, out interface{}) error { - return doOutputAPI("POST", baseURL, path, params, authHandler, out) -} - -func doOutputAPI( - httpVerb string, - baseURL *url.URL, - path string, - params map[string]interface{}, - authHandler AuthHandler, - out interface{}) error { - - var requestUrl string - - if q := strings.Index(path, "?"); q > 0 { - requestUrl = baseURL.Scheme + "://" + baseURL.Host + path[:q] + "?" + path[q+1:] - } else { - requestUrl = baseURL.Scheme + "://" + baseURL.Host + path - } - - postData := url.Values{} - for k, v := range params { - postData.Set(k, fmt.Sprintf("%v", v)) - } - - req, err := http.NewRequest(httpVerb, requestUrl, bytes.NewBufferString(postData.Encode())) - if err != nil { - return err - } - - req.Header.Add("Content-Type", "application/x-www-form-urlencoded") - - err = maybeAddAuth(req, authHandler) - if err != nil { - return err - } - - res, err := doHTTPRequest(req) - if err != nil { - return err - } - - defer res.Body.Close() - if res.StatusCode != 200 { - bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512)) - return fmt.Errorf("HTTP error %v getting %q: %s", - res.Status, requestUrl, bod) - } - - d := json.NewDecoder(res.Body) - if err = d.Decode(&out); err != nil { - return err - } - return nil -} - -func queryRestAPI( - baseURL *url.URL, - path string, - authHandler AuthHandler, - out interface{}) error { - - var requestUrl string - - if q := strings.Index(path, "?"); q > 0 { - requestUrl = baseURL.Scheme + "://" + baseURL.Host + path[:q] + "?" + path[q+1:] - } else { - requestUrl = baseURL.Scheme + "://" + baseURL.Host + path - } - - req, err := http.NewRequest("GET", requestUrl, nil) - if err != nil { - return err - } - - err = maybeAddAuth(req, authHandler) - if err != nil { - return err - } - - res, err := doHTTPRequest(req) - if err != nil { - return err - } - - defer res.Body.Close() - if res.StatusCode != 200 { - bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512)) - return fmt.Errorf("HTTP error %v getting %q: %s", - res.Status, requestUrl, bod) - } - - d := json.NewDecoder(res.Body) - if err = d.Decode(&out); err != nil { - return err - } - return nil -} - -func (c *Client) ProcessStream(path string, callb func(interface{}) error, data interface{}) error { - return c.processStream(c.BaseURL, path, c.ah, callb, data) -} - -// Based on code in http://src.couchbase.org/source/xref/trunk/goproj/src/github.com/couchbase/indexing/secondary/dcp/pools.go#309 -func (c *Client) processStream(baseURL *url.URL, path string, authHandler AuthHandler, callb func(interface{}) error, data interface{}) error { - var requestUrl string - - if q := strings.Index(path, "?"); q > 0 { - requestUrl = baseURL.Scheme + "://" + baseURL.Host + path[:q] + "?" + path[q+1:] - } else { - requestUrl = baseURL.Scheme + "://" + baseURL.Host + path - } - - req, err := http.NewRequest("GET", requestUrl, nil) - if err != nil { - return err - } - - err = maybeAddAuth(req, authHandler) - if err != nil { - return err - } - - res, err := doHTTPRequest(req) - if err != nil { - return err - } - - defer res.Body.Close() - if res.StatusCode != 200 { - bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512)) - return fmt.Errorf("HTTP error %v getting %q: %s", - res.Status, requestUrl, bod) - } - - reader := bufio.NewReader(res.Body) - for { - bs, err := reader.ReadBytes('\n') - if err != nil { - return err - } - if len(bs) == 1 && bs[0] == '\n' { - continue - } - - err = json.Unmarshal(bs, data) - if err != nil { - return err - } - err = callb(data) - if err != nil { - return err - } - } - return nil - -} - -func (c *Client) parseURLResponse(path string, out interface{}) error { - return queryRestAPI(c.BaseURL, path, c.ah, out) -} - -func (c *Client) parsePostURLResponse(path string, params map[string]interface{}, out interface{}) error { - return doPostAPI(c.BaseURL, path, params, c.ah, out) -} - -func (c *Client) parsePutURLResponse(path string, params map[string]interface{}, out interface{}) error { - return doPutAPI(c.BaseURL, path, params, c.ah, out) -} - -func (b *Bucket) parseURLResponse(path string, out interface{}) error { - nodes := b.Nodes() - if len(nodes) == 0 { - return errors.New("no couch rest URLs") - } - - // Pick a random node to start querying. - startNode := rand.Intn(len(nodes)) - maxRetries := len(nodes) - for i := 0; i < maxRetries; i++ { - node := nodes[(startNode+i)%len(nodes)] // Wrap around the nodes list. - // Skip non-healthy nodes. - if node.Status != "healthy" || node.CouchAPIBase == "" { - continue - } - url := &url.URL{ - Host: node.Hostname, - Scheme: "http", - } - - // Lock here to avoid having pool closed under us. - b.RLock() - err := queryRestAPI(url, path, b.pool.client.ah, out) - b.RUnlock() - if err == nil { - return err - } - } - return errors.New("All nodes failed to respond or no healthy nodes for bucket found") -} - -func (b *Bucket) parseAPIResponse(path string, out interface{}) error { - nodes := b.Nodes() - if len(nodes) == 0 { - return errors.New("no couch rest URLs") - } - - var err error - var u *url.URL - - // Pick a random node to start querying. - startNode := rand.Intn(len(nodes)) - maxRetries := len(nodes) - for i := 0; i < maxRetries; i++ { - node := nodes[(startNode+i)%len(nodes)] // Wrap around the nodes list. - // Skip non-healthy nodes. - if node.Status != "healthy" || node.CouchAPIBase == "" { - continue - } - - u, err = ParseURL(node.CouchAPIBase) - // Lock here so pool does not get closed under us. - b.RLock() - if err != nil { - b.RUnlock() - return fmt.Errorf("config error: Bucket %q node #%d CouchAPIBase=%q: %v", - b.Name, i, node.CouchAPIBase, err) - } else if b.pool != nil { - u.User = b.pool.client.BaseURL.User - } - u.Path = path - - // generate the path so that the strings are properly escaped - // MB-13770 - requestPath := strings.Split(u.String(), u.Host)[1] - - err = queryRestAPI(u, requestPath, b.pool.client.ah, out) - b.RUnlock() - if err == nil { - return err - } - } - - var errStr string - if err != nil { - errStr = "Error " + err.Error() - } - - return errors.New("All nodes failed to respond or returned error or no healthy nodes for bucket found." + errStr) -} - -type basicAuth struct { - u, p string -} - -func (b basicAuth) GetCredentials() (string, string, string) { - return b.u, b.p, b.u -} - -func basicAuthFromURL(us string) (ah AuthHandler) { - u, err := ParseURL(us) - if err != nil { - return - } - if user := u.User; user != nil { - pw, _ := user.Password() - ah = basicAuth{user.Username(), pw} - } - return -} - -// ConnectWithAuth connects to a couchbase cluster with the given -// authentication handler. -func ConnectWithAuth(baseU string, ah AuthHandler) (c Client, err error) { - c.BaseURL, err = ParseURL(baseU) - if err != nil { - return - } - c.ah = ah - - return c, c.parseURLResponse("/pools", &c.Info) -} - -// Call this method with a TLS certificate file name to make communication -// with the KV engine encrypted. -// -// This method should be called immediately after a Connect*() method. -func (c *Client) InitTLS(certFile string) error { - serverCert, err := ioutil.ReadFile(certFile) - if err != nil { - return err - } - CA_Pool := x509.NewCertPool() - CA_Pool.AppendCertsFromPEM(serverCert) - c.tlsConfig = &tls.Config{RootCAs: CA_Pool} - return nil -} - -func (c *Client) ClearTLS() { - c.tlsConfig = nil -} - -// ConnectWithAuthCreds connects to a couchbase cluster with the give -// authorization creds returned by cb_auth -func ConnectWithAuthCreds(baseU, username, password string) (c Client, err error) { - c.BaseURL, err = ParseURL(baseU) - if err != nil { - return - } - - c.ah = newBucketAuth(username, password, "") - return c, c.parseURLResponse("/pools", &c.Info) -} - -// Connect to a couchbase cluster. An authentication handler will be -// created from the userinfo in the URL if provided. -func Connect(baseU string) (Client, error) { - return ConnectWithAuth(baseU, basicAuthFromURL(baseU)) -} - -type BucketInfo struct { - Name string // name of bucket - Password string // SASL password of bucket -} - -//Get SASL buckets -func GetBucketList(baseU string) (bInfo []BucketInfo, err error) { - - c := &Client{} - c.BaseURL, err = ParseURL(baseU) - if err != nil { - return - } - c.ah = basicAuthFromURL(baseU) - - var buckets []Bucket - err = c.parseURLResponse("/pools/default/buckets", &buckets) - if err != nil { - return - } - bInfo = make([]BucketInfo, 0) - for _, bucket := range buckets { - bucketInfo := BucketInfo{Name: bucket.Name, Password: bucket.Password} - bInfo = append(bInfo, bucketInfo) - } - return bInfo, err -} - -//Set viewUpdateDaemonOptions -func SetViewUpdateParams(baseU string, params map[string]interface{}) (viewOpts map[string]interface{}, err error) { - - c := &Client{} - c.BaseURL, err = ParseURL(baseU) - if err != nil { - return - } - c.ah = basicAuthFromURL(baseU) - - if len(params) < 1 { - return nil, fmt.Errorf("No params to set") - } - - err = c.parsePostURLResponse("/settings/viewUpdateDaemon", params, &viewOpts) - if err != nil { - return - } - return viewOpts, err -} - -// This API lets the caller know, if the list of nodes a bucket is -// connected to has gone through an edit (a rebalance operation) -// since the last update to the bucket, in which case a Refresh is -// advised. -func (b *Bucket) NodeListChanged() bool { - b.RLock() - pool := b.pool - uri := b.URI - b.RUnlock() - - tmpb := &Bucket{} - err := pool.client.parseURLResponse(uri, tmpb) - if err != nil { - return true - } - - bNodes := *(*[]Node)(b.nodeList) - if len(bNodes) != len(tmpb.NodesJSON) { - return true - } - - bucketHostnames := map[string]bool{} - for _, node := range bNodes { - bucketHostnames[node.Hostname] = true - } - - for _, node := range tmpb.NodesJSON { - if _, found := bucketHostnames[node.Hostname]; !found { - return true - } - } - - return false -} - -// Sample data for scopes and collections as returned from the -// /pooles/default/$BUCKET_NAME/collections API. -// {"myScope2":{"myCollectionC":{}},"myScope1":{"myCollectionB":{},"myCollectionA":{}},"_default":{"_default":{}}} - -// Structures for parsing collections manifest. -// The map key is the name of the scope. -// Example data: -// {"uid":"b","scopes":[ -// {"name":"_default","uid":"0","collections":[ -// {"name":"_default","uid":"0"}]}, -// {"name":"myScope1","uid":"8","collections":[ -// {"name":"myCollectionB","uid":"c"}, -// {"name":"myCollectionA","uid":"b"}]}, -// {"name":"myScope2","uid":"9","collections":[ -// {"name":"myCollectionC","uid":"d"}]}]} -type InputManifest struct { - Uid string - Scopes []InputScope -} -type InputScope struct { - Name string - Uid string - Collections []InputCollection -} -type InputCollection struct { - Name string - Uid string -} - -// Structures for storing collections information. -type Manifest struct { - Uid uint64 - Scopes map[string]*Scope // map by name -} -type Scope struct { - Name string - Uid uint64 - Collections map[string]*Collection // map by name -} -type Collection struct { - Name string - Uid uint64 -} - -var _EMPTY_MANIFEST *Manifest = &Manifest{Uid: 0, Scopes: map[string]*Scope{}} - -func parseCollectionsManifest(res *gomemcached.MCResponse) (*Manifest, error) { - if !EnableCollections { - return _EMPTY_MANIFEST, nil - } - - var im InputManifest - err := json.Unmarshal(res.Body, &im) - if err != nil { - return nil, err - } - - uid, err := strconv.ParseUint(im.Uid, 16, 64) - if err != nil { - return nil, err - } - mani := &Manifest{Uid: uid, Scopes: make(map[string]*Scope, len(im.Scopes))} - for _, iscope := range im.Scopes { - scope_uid, err := strconv.ParseUint(iscope.Uid, 16, 64) - if err != nil { - return nil, err - } - scope := &Scope{Uid: scope_uid, Name: iscope.Name, Collections: make(map[string]*Collection, len(iscope.Collections))} - mani.Scopes[iscope.Name] = scope - for _, icoll := range iscope.Collections { - coll_uid, err := strconv.ParseUint(icoll.Uid, 16, 64) - if err != nil { - return nil, err - } - coll := &Collection{Uid: coll_uid, Name: icoll.Name} - scope.Collections[icoll.Name] = coll - } - } - - return mani, nil -} - -// This function assumes the bucket is locked. -func (b *Bucket) GetCollectionsManifest() (*Manifest, error) { - // Collections not used? - if !EnableCollections { - return nil, fmt.Errorf("Collections not enabled.") - } - - b.RLock() - pools := b.getConnPools(true /* already locked */) - pool := pools[0] // Any pool will do, so use the first one. - b.RUnlock() - client, err := pool.Get() - if err != nil { - return nil, fmt.Errorf("Unable to get connection to retrieve collections manifest: %v. No collections access to bucket %s.", err, b.Name) - } - - // We need to select the bucket before GetCollectionsManifest() - // will work. This is sometimes done at startup (see defaultMkConn()) - // but not always, depending on the auth type. - // Doing this is safe because we collect the the connections - // by bucket, so the bucket being selected will never change. - _, err = client.SelectBucket(b.Name) - if err != nil { - pool.Return(client) - return nil, fmt.Errorf("Unable to select bucket %s: %v. No collections access to bucket %s.", err, b.Name, b.Name) - } - - res, err := client.GetCollectionsManifest() - if err != nil { - pool.Return(client) - return nil, fmt.Errorf("Unable to retrieve collections manifest: %v. No collections access to bucket %s.", err, b.Name) - } - mani, err := parseCollectionsManifest(res) - if err != nil { - pool.Return(client) - return nil, fmt.Errorf("Unable to parse collections manifest: %v. No collections access to bucket %s.", err, b.Name) - } - - pool.Return(client) - return mani, nil -} - -func (b *Bucket) RefreshFully() error { - return b.refresh(false) -} - -func (b *Bucket) Refresh() error { - return b.refresh(true) -} - -func (b *Bucket) refresh(preserveConnections bool) error { - b.RLock() - pool := b.pool - uri := b.URI - client := pool.client - b.RUnlock() - tlsConfig := client.tlsConfig - - var poolServices PoolServices - var err error - if tlsConfig != nil { - poolServices, err = client.GetPoolServices("default") - if err != nil { - return err - } - } - - tmpb := &Bucket{} - err = pool.client.parseURLResponse(uri, tmpb) - if err != nil { - return err - } - - pools := b.getConnPools(false /* bucket not already locked */) - - // We need this lock to ensure that bucket refreshes happening because - // of NMVb errors received during bulkGet do not end up over-writing - // pool.inUse. - b.Lock() - - for _, pool := range pools { - if pool != nil { - pool.inUse = false - } - } - - newcps := make([]*connectionPool, len(tmpb.VBSMJson.ServerList)) - for i := range newcps { - - if preserveConnections { - pool := b.getConnPoolByHost(tmpb.VBSMJson.ServerList[i], true /* bucket already locked */) - if pool != nil && pool.inUse == false { - // if the hostname and index is unchanged then reuse this pool - newcps[i] = pool - pool.inUse = true - continue - } - } - - hostport := tmpb.VBSMJson.ServerList[i] - if tlsConfig != nil { - hostport, err = MapKVtoSSL(hostport, &poolServices) - if err != nil { - b.Unlock() - return err - } - } - - if b.ah != nil { - newcps[i] = newConnectionPool(hostport, - b.ah, AsynchronousCloser, PoolSize, PoolOverflow, tlsConfig, b.Name) - - } else { - newcps[i] = newConnectionPool(hostport, - b.authHandler(true /* bucket already locked */), - AsynchronousCloser, PoolSize, PoolOverflow, tlsConfig, b.Name) - } - } - b.replaceConnPools2(newcps, true /* bucket already locked */) - tmpb.ah = b.ah - b.vBucketServerMap = unsafe.Pointer(&tmpb.VBSMJson) - b.nodeList = unsafe.Pointer(&tmpb.NodesJSON) - - b.Unlock() - return nil -} - -func (p *Pool) refresh() (err error) { - p.BucketMap = make(map[string]*Bucket) - - buckets := []Bucket{} - err = p.client.parseURLResponse(p.BucketURL["uri"], &buckets) - if err != nil { - return err - } - for i, _ := range buckets { - b := new(Bucket) - *b = buckets[i] - b.pool = p - b.nodeList = unsafe.Pointer(&b.NodesJSON) - - // MB-33185 this is merely defensive, just in case - // refresh() gets called on a perfectly node pool - ob, ok := p.BucketMap[b.Name] - if ok && ob.connPools != nil { - ob.Close() - } - b.replaceConnPools(make([]*connectionPool, len(b.VBSMJson.ServerList))) - p.BucketMap[b.Name] = b - runtime.SetFinalizer(b, bucketFinalizer) - } - return nil -} - -// GetPool gets a pool from within the couchbase cluster (usually -// "default"). -func (c *Client) GetPool(name string) (p Pool, err error) { - var poolURI string - - for _, p := range c.Info.Pools { - if p.Name == name { - poolURI = p.URI - break - } - } - if poolURI == "" { - return p, errors.New("No pool named " + name) - } - - err = c.parseURLResponse(poolURI, &p) - - p.client = c - - err = p.refresh() - return -} - -// GetPoolServices returns all the bucket-independent services in a pool. -// (See "Exposing services outside of bucket context" in http://goo.gl/uuXRkV) -func (c *Client) GetPoolServices(name string) (ps PoolServices, err error) { - var poolName string - for _, p := range c.Info.Pools { - if p.Name == name { - poolName = p.Name - } - } - if poolName == "" { - return ps, errors.New("No pool named " + name) - } - - poolURI := "/pools/" + poolName + "/nodeServices" - err = c.parseURLResponse(poolURI, &ps) - - return -} - -func (b *Bucket) GetPoolServices(name string) (*PoolServices, error) { - b.RLock() - pool := b.pool - b.RUnlock() - - ps, err := pool.client.GetPoolServices(name) - if err != nil { - return nil, err - } - - return &ps, nil -} - -// Close marks this bucket as no longer needed, closing connections it -// may have open. -func (b *Bucket) Close() { - b.Lock() - defer b.Unlock() - if b.connPools != nil { - for _, c := range b.getConnPools(true /* already locked */) { - if c != nil { - c.Close() - } - } - b.connPools = nil - } -} - -func bucketFinalizer(b *Bucket) { - if b.connPools != nil { - if !b.closed { - logging.Warnf("Finalizing a bucket with active connections.") - } - - // MB-33185 do not leak connection pools - b.Close() - } -} - -// GetBucket gets a bucket from within this pool. -func (p *Pool) GetBucket(name string) (*Bucket, error) { - rv, ok := p.BucketMap[name] - if !ok { - return nil, &BucketNotFoundError{bucket: name} - } - err := rv.Refresh() - if err != nil { - return nil, err - } - return rv, nil -} - -// GetBucket gets a bucket from within this pool. -func (p *Pool) GetBucketWithAuth(bucket, username, password string) (*Bucket, error) { - rv, ok := p.BucketMap[bucket] - if !ok { - return nil, &BucketNotFoundError{bucket: bucket} - } - rv.ah = newBucketAuth(username, password, bucket) - err := rv.Refresh() - if err != nil { - return nil, err - } - return rv, nil -} - -// GetPool gets the pool to which this bucket belongs. -func (b *Bucket) GetPool() *Pool { - b.RLock() - defer b.RUnlock() - ret := b.pool - return ret -} - -// GetClient gets the client from which we got this pool. -func (p *Pool) GetClient() *Client { - return p.client -} - -// Release bucket connections when the pool is no longer in use -func (p *Pool) Close() { - // fine to loop through the buckets unlocked - // locking happens at the bucket level - for b, _ := range p.BucketMap { - - // MB-33208 defer closing connection pools until the bucket is no longer used - bucket := p.BucketMap[b] - bucket.Lock() - bucket.closed = true - bucket.Unlock() - } -} - -// GetBucket is a convenience function for getting a named bucket from -// a URL -func GetBucket(endpoint, poolname, bucketname string) (*Bucket, error) { - var err error - client, err := Connect(endpoint) - if err != nil { - return nil, err - } - - pool, err := client.GetPool(poolname) - if err != nil { - return nil, err - } - - return pool.GetBucket(bucketname) -} - -// ConnectWithAuthAndGetBucket is a convenience function for -// getting a named bucket from a given URL and an auth callback -func ConnectWithAuthAndGetBucket(endpoint, poolname, bucketname string, - ah AuthHandler) (*Bucket, error) { - client, err := ConnectWithAuth(endpoint, ah) - if err != nil { - return nil, err - } - - pool, err := client.GetPool(poolname) - if err != nil { - return nil, err - } - - return pool.GetBucket(bucketname) -} diff --git a/vendor/github.com/couchbaselabs/go-couchbase/port_map.go b/vendor/github.com/couchbaselabs/go-couchbase/port_map.go deleted file mode 100644 index 24c9f105db..0000000000 --- a/vendor/github.com/couchbaselabs/go-couchbase/port_map.go +++ /dev/null @@ -1,84 +0,0 @@ -package couchbase - -/* - -The goal here is to map a hostname:port combination to another hostname:port -combination. The original hostname:port gives the name and regular KV port -of a couchbase server. We want to determine the corresponding SSL KV port. - -To do this, we have a pool services structure, as obtained from -the /pools/default/nodeServices API. - -For a fully configured two-node system, the structure may look like this: -{"rev":32,"nodesExt":[ - {"services":{"mgmt":8091,"mgmtSSL":18091,"fts":8094,"ftsSSL":18094,"indexAdmin":9100,"indexScan":9101,"indexHttp":9102,"indexStreamInit":9103,"indexStreamCatchup":9104,"indexStreamMaint":9105,"indexHttps":19102,"capiSSL":18092,"capi":8092,"kvSSL":11207,"projector":9999,"kv":11210,"moxi":11211},"hostname":"172.23.123.101"}, - {"services":{"mgmt":8091,"mgmtSSL":18091,"indexAdmin":9100,"indexScan":9101,"indexHttp":9102,"indexStreamInit":9103,"indexStreamCatchup":9104,"indexStreamMaint":9105,"indexHttps":19102,"capiSSL":18092,"capi":8092,"kvSSL":11207,"projector":9999,"kv":11210,"moxi":11211,"n1ql":8093,"n1qlSSL":18093},"thisNode":true,"hostname":"172.23.123.102"}]} - -In this case, note the "hostname" fields, and the "kv" and "kvSSL" fields. - -For a single-node system, perhaps brought up for testing, the structure may look like this: -{"rev":66,"nodesExt":[ - {"services":{"mgmt":8091,"mgmtSSL":18091,"indexAdmin":9100,"indexScan":9101,"indexHttp":9102,"indexStreamInit":9103,"indexStreamCatchup":9104,"indexStreamMaint":9105,"indexHttps":19102,"kv":11210,"kvSSL":11207,"capi":8092,"capiSSL":18092,"projector":9999,"n1ql":8093,"n1qlSSL":18093},"thisNode":true}],"clusterCapabilitiesVer":[1,0],"clusterCapabilities":{"n1ql":["enhancedPreparedStatements"]}} - -Here, note that there is only a single entry in the "nodeExt" array and that it does not have a "hostname" field. -We will assume that either hostname fields are present, or there is only a single node. -*/ - -import ( - "encoding/json" - "fmt" - "strconv" - "strings" -) - -func ParsePoolServices(jsonInput string) (*PoolServices, error) { - ps := &PoolServices{} - err := json.Unmarshal([]byte(jsonInput), ps) - return ps, err -} - -func MapKVtoSSL(hostport string, ps *PoolServices) (string, error) { - colonIndex := strings.LastIndex(hostport, ":") - if colonIndex < 0 { - return "", fmt.Errorf("Unable to find host/port separator in %s", hostport) - } - host := hostport[0:colonIndex] - port := hostport[colonIndex+1:] - portInt, err := strconv.Atoi(port) - if err != nil { - return "", fmt.Errorf("Unable to parse host/port combination %s: %v", hostport, err) - } - - var ns *NodeServices - if len(ps.NodesExt) == 1 { - ns = &(ps.NodesExt[0]) - } else { - for i := range ps.NodesExt { - hostname := ps.NodesExt[i].Hostname - if len(hostname) == 0 { - // in case of missing hostname, check for 127.0.0.1 - hostname = "127.0.0.1" - } - if hostname == host { - ns = &(ps.NodesExt[i]) - break - } - } - } - - if ns == nil { - return "", fmt.Errorf("Unable to parse host/port combination %s: no matching node found among %d", hostport, len(ps.NodesExt)) - } - kv, found := ns.Services["kv"] - if !found { - return "", fmt.Errorf("Unable to map host/port combination %s: target host has no kv port listed", hostport) - } - kvSSL, found := ns.Services["kvSSL"] - if !found { - return "", fmt.Errorf("Unable to map host/port combination %s: target host has no kvSSL port listed", hostport) - } - if portInt != kv { - return "", fmt.Errorf("Unable to map hostport combination %s: expected port %d but found %d", hostport, portInt, kv) - } - return fmt.Sprintf("%s:%d", host, kvSSL), nil -} diff --git a/vendor/github.com/couchbaselabs/go-couchbase/streaming.go b/vendor/github.com/couchbaselabs/go-couchbase/streaming.go deleted file mode 100644 index 6d8f7dfd53..0000000000 --- a/vendor/github.com/couchbaselabs/go-couchbase/streaming.go +++ /dev/null @@ -1,215 +0,0 @@ -package couchbase - -import ( - "encoding/json" - "fmt" - "github.com/couchbase/goutils/logging" - "io" - "io/ioutil" - "math/rand" - "net" - "net/http" - "time" - "unsafe" -) - -// Bucket auto-updater gets the latest version of the bucket config from -// the server. If the configuration has changed then updated the local -// bucket information. If the bucket has been deleted then notify anyone -// who is holding a reference to this bucket - -const MAX_RETRY_COUNT = 5 -const DISCONNECT_PERIOD = 120 * time.Second - -type NotifyFn func(bucket string, err error) - -// Use TCP keepalive to detect half close sockets -var updaterTransport http.RoundTripper = &http.Transport{ - Proxy: http.ProxyFromEnvironment, - Dial: (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 30 * time.Second, - }).Dial, -} - -var updaterHTTPClient = &http.Client{Transport: updaterTransport} - -func doHTTPRequestForUpdate(req *http.Request) (*http.Response, error) { - - var err error - var res *http.Response - - for i := 0; i < HTTP_MAX_RETRY; i++ { - res, err = updaterHTTPClient.Do(req) - if err != nil && isHttpConnError(err) { - continue - } - break - } - - if err != nil { - return nil, err - } - - return res, err -} - -func (b *Bucket) RunBucketUpdater(notify NotifyFn) { - go func() { - err := b.UpdateBucket() - if err != nil { - if notify != nil { - notify(b.GetName(), err) - } - logging.Errorf(" Bucket Updater exited with err %v", err) - } - }() -} - -func (b *Bucket) replaceConnPools2(with []*connectionPool, bucketLocked bool) { - if !bucketLocked { - b.Lock() - defer b.Unlock() - } - old := b.connPools - b.connPools = unsafe.Pointer(&with) - if old != nil { - for _, pool := range *(*[]*connectionPool)(old) { - if pool != nil && pool.inUse == false { - pool.Close() - } - } - } - return -} - -func (b *Bucket) UpdateBucket() error { - - var failures int - var returnErr error - - var poolServices PoolServices - var err error - tlsConfig := b.pool.client.tlsConfig - if tlsConfig != nil { - poolServices, err = b.pool.client.GetPoolServices("default") - if err != nil { - return err - } - } - - for { - - if failures == MAX_RETRY_COUNT { - logging.Errorf(" Maximum failures reached. Exiting loop...") - return fmt.Errorf("Max failures reached. Last Error %v", returnErr) - } - - nodes := b.Nodes() - if len(nodes) < 1 { - return fmt.Errorf("No healthy nodes found") - } - - startNode := rand.Intn(len(nodes)) - node := nodes[(startNode)%len(nodes)] - - streamUrl := fmt.Sprintf("http://%s/pools/default/bucketsStreaming/%s", node.Hostname, b.GetName()) - logging.Infof(" Trying with %s", streamUrl) - req, err := http.NewRequest("GET", streamUrl, nil) - if err != nil { - return err - } - - // Lock here to avoid having pool closed under us. - b.RLock() - err = maybeAddAuth(req, b.pool.client.ah) - b.RUnlock() - if err != nil { - return err - } - - res, err := doHTTPRequestForUpdate(req) - if err != nil { - return err - } - - if res.StatusCode != 200 { - bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512)) - logging.Errorf("Failed to connect to host, unexpected status code: %v. Body %s", res.StatusCode, bod) - res.Body.Close() - returnErr = fmt.Errorf("Failed to connect to host. Status %v Body %s", res.StatusCode, bod) - failures++ - continue - } - - dec := json.NewDecoder(res.Body) - - tmpb := &Bucket{} - for { - - err := dec.Decode(&tmpb) - if err != nil { - returnErr = err - res.Body.Close() - break - } - - // if we got here, reset failure count - failures = 0 - b.Lock() - - // mark all the old connection pools for deletion - pools := b.getConnPools(true /* already locked */) - for _, pool := range pools { - if pool != nil { - pool.inUse = false - } - } - - newcps := make([]*connectionPool, len(tmpb.VBSMJson.ServerList)) - for i := range newcps { - // get the old connection pool and check if it is still valid - pool := b.getConnPoolByHost(tmpb.VBSMJson.ServerList[i], true /* bucket already locked */) - if pool != nil && pool.inUse == false { - // if the hostname and index is unchanged then reuse this pool - newcps[i] = pool - pool.inUse = true - continue - } - // else create a new pool - hostport := tmpb.VBSMJson.ServerList[i] - if tlsConfig != nil { - hostport, err = MapKVtoSSL(hostport, &poolServices) - if err != nil { - b.Unlock() - return err - } - } - if b.ah != nil { - newcps[i] = newConnectionPool(hostport, - b.ah, false, PoolSize, PoolOverflow, b.pool.client.tlsConfig, b.Name) - - } else { - newcps[i] = newConnectionPool(hostport, - b.authHandler(true /* bucket already locked */), - false, PoolSize, PoolOverflow, b.pool.client.tlsConfig, b.Name) - } - } - - b.replaceConnPools2(newcps, true /* bucket already locked */) - - tmpb.ah = b.ah - b.vBucketServerMap = unsafe.Pointer(&tmpb.VBSMJson) - b.nodeList = unsafe.Pointer(&tmpb.NodesJSON) - b.Unlock() - - logging.Infof("Got new configuration for bucket %s", b.GetName()) - - } - // we are here because of an error - failures++ - continue - - } - return nil -} diff --git a/vendor/github.com/couchbaselabs/go-couchbase/tap.go b/vendor/github.com/couchbaselabs/go-couchbase/tap.go deleted file mode 100644 index 86edd30554..0000000000 --- a/vendor/github.com/couchbaselabs/go-couchbase/tap.go +++ /dev/null @@ -1,143 +0,0 @@ -package couchbase - -import ( - "github.com/couchbase/gomemcached/client" - "github.com/couchbase/goutils/logging" - "sync" - "time" -) - -const initialRetryInterval = 1 * time.Second -const maximumRetryInterval = 30 * time.Second - -// A TapFeed streams mutation events from a bucket. -// -// Events from the bucket can be read from the channel 'C'. Remember -// to call Close() on it when you're done, unless its channel has -// closed itself already. -type TapFeed struct { - C <-chan memcached.TapEvent - - bucket *Bucket - args *memcached.TapArguments - nodeFeeds []*memcached.TapFeed // The TAP feeds of the individual nodes - output chan memcached.TapEvent // Same as C but writeably-typed - wg sync.WaitGroup - quit chan bool -} - -// StartTapFeed creates and starts a new Tap feed -func (b *Bucket) StartTapFeed(args *memcached.TapArguments) (*TapFeed, error) { - if args == nil { - defaultArgs := memcached.DefaultTapArguments() - args = &defaultArgs - } - - feed := &TapFeed{ - bucket: b, - args: args, - output: make(chan memcached.TapEvent, 10), - quit: make(chan bool), - } - - go feed.run() - - feed.C = feed.output - return feed, nil -} - -// Goroutine that runs the feed -func (feed *TapFeed) run() { - retryInterval := initialRetryInterval - bucketOK := true - for { - // Connect to the TAP feed of each server node: - if bucketOK { - killSwitch, err := feed.connectToNodes() - if err == nil { - // Run until one of the sub-feeds fails: - select { - case <-killSwitch: - case <-feed.quit: - return - } - feed.closeNodeFeeds() - retryInterval = initialRetryInterval - } - } - - // On error, try to refresh the bucket in case the list of nodes changed: - logging.Infof("go-couchbase: TAP connection lost; reconnecting to bucket %q in %v", - feed.bucket.Name, retryInterval) - err := feed.bucket.Refresh() - bucketOK = err == nil - - select { - case <-time.After(retryInterval): - case <-feed.quit: - return - } - if retryInterval *= 2; retryInterval > maximumRetryInterval { - retryInterval = maximumRetryInterval - } - } -} - -func (feed *TapFeed) connectToNodes() (killSwitch chan bool, err error) { - killSwitch = make(chan bool) - for _, serverConn := range feed.bucket.getConnPools(false /* not already locked */) { - var singleFeed *memcached.TapFeed - singleFeed, err = serverConn.StartTapFeed(feed.args) - if err != nil { - logging.Errorf("go-couchbase: Error connecting to tap feed of %s: %v", serverConn.host, err) - feed.closeNodeFeeds() - return - } - feed.nodeFeeds = append(feed.nodeFeeds, singleFeed) - go feed.forwardTapEvents(singleFeed, killSwitch, serverConn.host) - feed.wg.Add(1) - } - return -} - -// Goroutine that forwards Tap events from a single node's feed to the aggregate feed. -func (feed *TapFeed) forwardTapEvents(singleFeed *memcached.TapFeed, killSwitch chan bool, host string) { - defer feed.wg.Done() - for { - select { - case event, ok := <-singleFeed.C: - if !ok { - if singleFeed.Error != nil { - logging.Errorf("go-couchbase: Tap feed from %s failed: %v", host, singleFeed.Error) - } - killSwitch <- true - return - } - feed.output <- event - case <-feed.quit: - return - } - } -} - -func (feed *TapFeed) closeNodeFeeds() { - for _, f := range feed.nodeFeeds { - f.Close() - } - feed.nodeFeeds = nil -} - -// Close a Tap feed. -func (feed *TapFeed) Close() error { - select { - case <-feed.quit: - return nil - default: - } - - feed.closeNodeFeeds() - close(feed.quit) - feed.wg.Wait() - close(feed.output) - return nil -} diff --git a/vendor/github.com/couchbaselabs/go-couchbase/upr.go b/vendor/github.com/couchbaselabs/go-couchbase/upr.go deleted file mode 100644 index bf1b209b7e..0000000000 --- a/vendor/github.com/couchbaselabs/go-couchbase/upr.go +++ /dev/null @@ -1,398 +0,0 @@ -package couchbase - -import ( - "log" - "sync" - "time" - - "fmt" - "github.com/couchbase/gomemcached" - "github.com/couchbase/gomemcached/client" - "github.com/couchbase/goutils/logging" -) - -// A UprFeed streams mutation events from a bucket. -// -// Events from the bucket can be read from the channel 'C'. Remember -// to call Close() on it when you're done, unless its channel has -// closed itself already. -type UprFeed struct { - C <-chan *memcached.UprEvent - - bucket *Bucket - nodeFeeds map[string]*FeedInfo // The UPR feeds of the individual nodes - output chan *memcached.UprEvent // Same as C but writeably-typed - outputClosed bool - quit chan bool - name string // name of this UPR feed - sequence uint32 // sequence number for this feed - connected bool - killSwitch chan bool - closing bool - wg sync.WaitGroup - dcp_buffer_size uint32 - data_chan_size int -} - -// UprFeed from a single connection -type FeedInfo struct { - uprFeed *memcached.UprFeed // UPR feed handle - host string // hostname - connected bool // connected - quit chan bool // quit channel -} - -type FailoverLog map[uint16]memcached.FailoverLog - -// GetFailoverLogs, get the failover logs for a set of vbucket ids -func (b *Bucket) GetFailoverLogs(vBuckets []uint16) (FailoverLog, error) { - - // map vbids to their corresponding hosts - vbHostList := make(map[string][]uint16) - vbm := b.VBServerMap() - if len(vbm.VBucketMap) < len(vBuckets) { - return nil, fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v", - vbm.VBucketMap, vBuckets) - } - - for _, vb := range vBuckets { - masterID := vbm.VBucketMap[vb][0] - master := b.getMasterNode(masterID) - if master == "" { - return nil, fmt.Errorf("No master found for vb %d", vb) - } - - vbList := vbHostList[master] - if vbList == nil { - vbList = make([]uint16, 0) - } - vbList = append(vbList, vb) - vbHostList[master] = vbList - } - - failoverLogMap := make(FailoverLog) - for _, serverConn := range b.getConnPools(false /* not already locked */) { - - vbList := vbHostList[serverConn.host] - if vbList == nil { - continue - } - - mc, err := serverConn.Get() - if err != nil { - logging.Infof("No Free connections for vblist %v", vbList) - return nil, fmt.Errorf("No Free connections for host %s", - serverConn.host) - - } - // close the connection so that it doesn't get reused for upr data - // connection - defer mc.Close() - failoverlogs, err := mc.UprGetFailoverLog(vbList) - if err != nil { - return nil, fmt.Errorf("Error getting failover log %s host %s", - err.Error(), serverConn.host) - - } - - for vb, log := range failoverlogs { - failoverLogMap[vb] = *log - } - } - - return failoverLogMap, nil -} - -func (b *Bucket) StartUprFeed(name string, sequence uint32) (*UprFeed, error) { - return b.StartUprFeedWithConfig(name, sequence, 10, DEFAULT_WINDOW_SIZE) -} - -// StartUprFeed creates and starts a new Upr feed -// No data will be sent on the channel unless vbuckets streams are requested -func (b *Bucket) StartUprFeedWithConfig(name string, sequence uint32, data_chan_size int, dcp_buffer_size uint32) (*UprFeed, error) { - - feed := &UprFeed{ - bucket: b, - output: make(chan *memcached.UprEvent, data_chan_size), - quit: make(chan bool), - nodeFeeds: make(map[string]*FeedInfo, 0), - name: name, - sequence: sequence, - killSwitch: make(chan bool), - dcp_buffer_size: dcp_buffer_size, - data_chan_size: data_chan_size, - } - - err := feed.connectToNodes() - if err != nil { - return nil, fmt.Errorf("Cannot connect to bucket %s", err.Error()) - } - feed.connected = true - go feed.run() - - feed.C = feed.output - return feed, nil -} - -// UprRequestStream starts a stream for a vb on a feed -func (feed *UprFeed) UprRequestStream(vb uint16, opaque uint16, flags uint32, - vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error { - - defer func() { - if r := recover(); r != nil { - log.Panicf("Panic in UprRequestStream. Feed %v Bucket %v", feed, feed.bucket) - } - }() - - vbm := feed.bucket.VBServerMap() - if len(vbm.VBucketMap) < int(vb) { - return fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v", - vb, vbm.VBucketMap) - } - - if int(vb) >= len(vbm.VBucketMap) { - return fmt.Errorf("Invalid vbucket id %d", vb) - } - - masterID := vbm.VBucketMap[vb][0] - master := feed.bucket.getMasterNode(masterID) - if master == "" { - return fmt.Errorf("Master node not found for vbucket %d", vb) - } - singleFeed := feed.nodeFeeds[master] - if singleFeed == nil { - return fmt.Errorf("UprFeed for this host not found") - } - - if err := singleFeed.uprFeed.UprRequestStream(vb, opaque, flags, - vuuid, startSequence, endSequence, snapStart, snapEnd); err != nil { - return err - } - - return nil -} - -// UprCloseStream ends a vbucket stream. -func (feed *UprFeed) UprCloseStream(vb, opaqueMSB uint16) error { - - defer func() { - if r := recover(); r != nil { - log.Panicf("Panic in UprCloseStream. Feed %v Bucket %v ", feed, feed.bucket) - } - }() - - vbm := feed.bucket.VBServerMap() - if len(vbm.VBucketMap) < int(vb) { - return fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v", - vb, vbm.VBucketMap) - } - - if int(vb) >= len(vbm.VBucketMap) { - return fmt.Errorf("Invalid vbucket id %d", vb) - } - - masterID := vbm.VBucketMap[vb][0] - master := feed.bucket.getMasterNode(masterID) - if master == "" { - return fmt.Errorf("Master node not found for vbucket %d", vb) - } - singleFeed := feed.nodeFeeds[master] - if singleFeed == nil { - return fmt.Errorf("UprFeed for this host not found") - } - - if err := singleFeed.uprFeed.CloseStream(vb, opaqueMSB); err != nil { - return err - } - return nil -} - -// Goroutine that runs the feed -func (feed *UprFeed) run() { - retryInterval := initialRetryInterval - bucketOK := true - for { - // Connect to the UPR feed of each server node: - if bucketOK { - // Run until one of the sub-feeds fails: - select { - case <-feed.killSwitch: - case <-feed.quit: - return - } - //feed.closeNodeFeeds() - retryInterval = initialRetryInterval - } - - if feed.closing == true { - // we have been asked to shut down - return - } - - // On error, try to refresh the bucket in case the list of nodes changed: - logging.Infof("go-couchbase: UPR connection lost; reconnecting to bucket %q in %v", - feed.bucket.Name, retryInterval) - - if err := feed.bucket.Refresh(); err != nil { - // if we fail to refresh the bucket, exit the feed - // MB-14917 - logging.Infof("Unable to refresh bucket %s ", err.Error()) - close(feed.output) - feed.outputClosed = true - feed.closeNodeFeeds() - return - } - - // this will only connect to nodes that are not connected or changed - // user will have to reconnect the stream - err := feed.connectToNodes() - if err != nil { - logging.Infof("Unable to connect to nodes..exit ") - close(feed.output) - feed.outputClosed = true - feed.closeNodeFeeds() - return - } - bucketOK = err == nil - - select { - case <-time.After(retryInterval): - case <-feed.quit: - return - } - if retryInterval *= 2; retryInterval > maximumRetryInterval { - retryInterval = maximumRetryInterval - } - } -} - -func (feed *UprFeed) connectToNodes() (err error) { - nodeCount := 0 - for _, serverConn := range feed.bucket.getConnPools(false /* not already locked */) { - - // this maybe a reconnection, so check if the connection to the node - // already exists. Connect only if the node is not found in the list - // or connected == false - nodeFeed := feed.nodeFeeds[serverConn.host] - - if nodeFeed != nil && nodeFeed.connected == true { - continue - } - - var singleFeed *memcached.UprFeed - var name string - if feed.name == "" { - name = "DefaultUprClient" - } else { - name = feed.name - } - singleFeed, err = serverConn.StartUprFeed(name, feed.sequence, feed.dcp_buffer_size, feed.data_chan_size) - if err != nil { - logging.Errorf("go-couchbase: Error connecting to upr feed of %s: %v", serverConn.host, err) - feed.closeNodeFeeds() - return - } - // add the node to the connection map - feedInfo := &FeedInfo{ - uprFeed: singleFeed, - connected: true, - host: serverConn.host, - quit: make(chan bool), - } - feed.nodeFeeds[serverConn.host] = feedInfo - go feed.forwardUprEvents(feedInfo, feed.killSwitch, serverConn.host) - feed.wg.Add(1) - nodeCount++ - } - if nodeCount == 0 { - return fmt.Errorf("No connection to bucket") - } - - return nil -} - -// Goroutine that forwards Upr events from a single node's feed to the aggregate feed. -func (feed *UprFeed) forwardUprEvents(nodeFeed *FeedInfo, killSwitch chan bool, host string) { - singleFeed := nodeFeed.uprFeed - - defer func() { - feed.wg.Done() - if r := recover(); r != nil { - //if feed is not closing, re-throw the panic - if feed.outputClosed != true && feed.closing != true { - panic(r) - } else { - logging.Errorf("Panic is recovered. Since feed is closed, exit gracefully") - - } - } - }() - - for { - select { - case <-nodeFeed.quit: - nodeFeed.connected = false - return - - case event, ok := <-singleFeed.C: - if !ok { - if singleFeed.Error != nil { - logging.Errorf("go-couchbase: Upr feed from %s failed: %v", host, singleFeed.Error) - } - killSwitch <- true - return - } - if feed.outputClosed == true { - // someone closed the node feed - logging.Infof("Node need closed, returning from forwardUprEvent") - return - } - feed.output <- event - if event.Status == gomemcached.NOT_MY_VBUCKET { - logging.Infof(" Got a not my vbucket error !! ") - if err := feed.bucket.Refresh(); err != nil { - logging.Errorf("Unable to refresh bucket %s ", err.Error()) - feed.closeNodeFeeds() - return - } - // this will only connect to nodes that are not connected or changed - // user will have to reconnect the stream - if err := feed.connectToNodes(); err != nil { - logging.Errorf("Unable to connect to nodes %s", err.Error()) - return - } - - } - } - } -} - -func (feed *UprFeed) closeNodeFeeds() { - for _, f := range feed.nodeFeeds { - logging.Infof(" Sending close to forwardUprEvent ") - close(f.quit) - f.uprFeed.Close() - } - feed.nodeFeeds = nil -} - -// Close a Upr feed. -func (feed *UprFeed) Close() error { - select { - case <-feed.quit: - return nil - default: - } - - feed.closing = true - feed.closeNodeFeeds() - close(feed.quit) - - feed.wg.Wait() - if feed.outputClosed == false { - feed.outputClosed = true - close(feed.output) - } - - return nil -} diff --git a/vendor/github.com/couchbaselabs/go-couchbase/users.go b/vendor/github.com/couchbaselabs/go-couchbase/users.go deleted file mode 100644 index 47d4861522..0000000000 --- a/vendor/github.com/couchbaselabs/go-couchbase/users.go +++ /dev/null @@ -1,119 +0,0 @@ -package couchbase - -import ( - "bytes" - "fmt" -) - -type User struct { - Name string - Id string - Domain string - Roles []Role -} - -type Role struct { - Role string - BucketName string `json:"bucket_name"` -} - -// Sample: -// {"role":"admin","name":"Admin","desc":"Can manage ALL cluster features including security.","ce":true} -// {"role":"query_select","bucket_name":"*","name":"Query Select","desc":"Can execute SELECT statement on bucket to retrieve data"} -type RoleDescription struct { - Role string - Name string - Desc string - Ce bool - BucketName string `json:"bucket_name"` -} - -// Return user-role data, as parsed JSON. -// Sample: -// [{"id":"ivanivanov","name":"Ivan Ivanov","roles":[{"role":"cluster_admin"},{"bucket_name":"default","role":"bucket_admin"}]}, -// {"id":"petrpetrov","name":"Petr Petrov","roles":[{"role":"replication_admin"}]}] -func (c *Client) GetUserRoles() ([]interface{}, error) { - ret := make([]interface{}, 0, 1) - err := c.parseURLResponse("/settings/rbac/users", &ret) - if err != nil { - return nil, err - } - - // Get the configured administrator. - // Expected result: {"port":8091,"username":"Administrator"} - adminInfo := make(map[string]interface{}, 2) - err = c.parseURLResponse("/settings/web", &adminInfo) - if err != nil { - return nil, err - } - - // Create a special entry for the configured administrator. - adminResult := map[string]interface{}{ - "name": adminInfo["username"], - "id": adminInfo["username"], - "domain": "ns_server", - "roles": []interface{}{ - map[string]interface{}{ - "role": "admin", - }, - }, - } - - // Add the configured administrator to the list of results. - ret = append(ret, adminResult) - - return ret, nil -} - -func (c *Client) GetUserInfoAll() ([]User, error) { - ret := make([]User, 0, 16) - err := c.parseURLResponse("/settings/rbac/users", &ret) - if err != nil { - return nil, err - } - return ret, nil -} - -func rolesToParamFormat(roles []Role) string { - var buffer bytes.Buffer - for i, role := range roles { - if i > 0 { - buffer.WriteString(",") - } - buffer.WriteString(role.Role) - if role.BucketName != "" { - buffer.WriteString("[") - buffer.WriteString(role.BucketName) - buffer.WriteString("]") - } - } - return buffer.String() -} - -func (c *Client) PutUserInfo(u *User) error { - params := map[string]interface{}{ - "name": u.Name, - "roles": rolesToParamFormat(u.Roles), - } - var target string - switch u.Domain { - case "external": - target = "/settings/rbac/users/" + u.Id - case "local": - target = "/settings/rbac/users/local/" + u.Id - default: - return fmt.Errorf("Unknown user type: %s", u.Domain) - } - var ret string // PUT returns an empty string. We ignore it. - err := c.parsePutURLResponse(target, params, &ret) - return err -} - -func (c *Client) GetRolesAll() ([]RoleDescription, error) { - ret := make([]RoleDescription, 0, 32) - err := c.parseURLResponse("/settings/rbac/roles", &ret) - if err != nil { - return nil, err - } - return ret, nil -} diff --git a/vendor/github.com/couchbaselabs/go-couchbase/util.go b/vendor/github.com/couchbaselabs/go-couchbase/util.go deleted file mode 100644 index 4d286a3271..0000000000 --- a/vendor/github.com/couchbaselabs/go-couchbase/util.go +++ /dev/null @@ -1,49 +0,0 @@ -package couchbase - -import ( - "fmt" - "net/url" - "strings" -) - -// CleanupHost returns the hostname with the given suffix removed. -func CleanupHost(h, commonSuffix string) string { - if strings.HasSuffix(h, commonSuffix) { - return h[:len(h)-len(commonSuffix)] - } - return h -} - -// FindCommonSuffix returns the longest common suffix from the given -// strings. -func FindCommonSuffix(input []string) string { - rv := "" - if len(input) < 2 { - return "" - } - from := input - for i := len(input[0]); i > 0; i-- { - common := true - suffix := input[0][i:] - for _, s := range from { - if !strings.HasSuffix(s, suffix) { - common = false - break - } - } - if common { - rv = suffix - } - } - return rv -} - -// ParseURL is a wrapper around url.Parse with some sanity-checking -func ParseURL(urlStr string) (result *url.URL, err error) { - result, err = url.Parse(urlStr) - if result != nil && result.Scheme == "" { - result = nil - err = fmt.Errorf("invalid URL <%s>", urlStr) - } - return -} diff --git a/vendor/github.com/couchbaselabs/go-couchbase/vbmap.go b/vendor/github.com/couchbaselabs/go-couchbase/vbmap.go deleted file mode 100644 index b96a18ed57..0000000000 --- a/vendor/github.com/couchbaselabs/go-couchbase/vbmap.go +++ /dev/null @@ -1,77 +0,0 @@ -package couchbase - -var crc32tab = []uint32{ - 0x00000000, 0x77073096, 0xee0e612c, 0x990951ba, - 0x076dc419, 0x706af48f, 0xe963a535, 0x9e6495a3, - 0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988, - 0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91, - 0x1db71064, 0x6ab020f2, 0xf3b97148, 0x84be41de, - 0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7, - 0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec, - 0x14015c4f, 0x63066cd9, 0xfa0f3d63, 0x8d080df5, - 0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172, - 0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b, - 0x35b5a8fa, 0x42b2986c, 0xdbbbc9d6, 0xacbcf940, - 0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59, - 0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116, - 0x21b4f4b5, 0x56b3c423, 0xcfba9599, 0xb8bda50f, - 0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924, - 0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d, - 0x76dc4190, 0x01db7106, 0x98d220bc, 0xefd5102a, - 0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433, - 0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818, - 0x7f6a0dbb, 0x086d3d2d, 0x91646c97, 0xe6635c01, - 0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e, - 0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457, - 0x65b0d9c6, 0x12b7e950, 0x8bbeb8ea, 0xfcb9887c, - 0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65, - 0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2, - 0x4adfa541, 0x3dd895d7, 0xa4d1c46d, 0xd3d6f4fb, - 0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0, - 0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9, - 0x5005713c, 0x270241aa, 0xbe0b1010, 0xc90c2086, - 0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f, - 0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4, - 0x59b33d17, 0x2eb40d81, 0xb7bd5c3b, 0xc0ba6cad, - 0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a, - 0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683, - 0xe3630b12, 0x94643b84, 0x0d6d6a3e, 0x7a6a5aa8, - 0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1, - 0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe, - 0xf762575d, 0x806567cb, 0x196c3671, 0x6e6b06e7, - 0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc, - 0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5, - 0xd6d6a3e8, 0xa1d1937e, 0x38d8c2c4, 0x4fdff252, - 0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b, - 0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60, - 0xdf60efc3, 0xa867df55, 0x316e8eef, 0x4669be79, - 0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236, - 0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f, - 0xc5ba3bbe, 0xb2bd0b28, 0x2bb45a92, 0x5cb36a04, - 0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d, - 0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a, - 0x9c0906a9, 0xeb0e363f, 0x72076785, 0x05005713, - 0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38, - 0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21, - 0x86d3d2d4, 0xf1d4e242, 0x68ddb3f8, 0x1fda836e, - 0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777, - 0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c, - 0x8f659eff, 0xf862ae69, 0x616bffd3, 0x166ccf45, - 0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2, - 0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db, - 0xaed16a4a, 0xd9d65adc, 0x40df0b66, 0x37d83bf0, - 0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9, - 0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6, - 0xbad03605, 0xcdd70693, 0x54de5729, 0x23d967bf, - 0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94, - 0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d} - -// VBHash finds the vbucket for the given key. -func (b *Bucket) VBHash(key string) uint32 { - crc := uint32(0xffffffff) - for x := 0; x < len(key); x++ { - crc = (crc >> 8) ^ crc32tab[(uint64(crc)^uint64(key[x]))&0xff] - } - vbm := b.VBServerMap() - return ((^crc) >> 16) & 0x7fff & (uint32(len(vbm.VBucketMap)) - 1) -} diff --git a/vendor/github.com/couchbaselabs/go-couchbase/views.go b/vendor/github.com/couchbaselabs/go-couchbase/views.go deleted file mode 100644 index 2f68642f5a..0000000000 --- a/vendor/github.com/couchbaselabs/go-couchbase/views.go +++ /dev/null @@ -1,231 +0,0 @@ -package couchbase - -import ( - "encoding/json" - "errors" - "fmt" - "io/ioutil" - "math/rand" - "net/http" - "net/url" - "time" -) - -// ViewRow represents a single result from a view. -// -// Doc is present only if include_docs was set on the request. -type ViewRow struct { - ID string - Key interface{} - Value interface{} - Doc *interface{} -} - -// A ViewError is a node-specific error indicating a partial failure -// within a view result. -type ViewError struct { - From string - Reason string -} - -func (ve ViewError) Error() string { - return "Node: " + ve.From + ", reason: " + ve.Reason -} - -// ViewResult holds the entire result set from a view request, -// including the rows and the errors. -type ViewResult struct { - TotalRows int `json:"total_rows"` - Rows []ViewRow - Errors []ViewError -} - -func (b *Bucket) randomBaseURL() (*url.URL, error) { - nodes := b.HealthyNodes() - if len(nodes) == 0 { - return nil, errors.New("no available couch rest URLs") - } - nodeNo := rand.Intn(len(nodes)) - node := nodes[nodeNo] - - b.RLock() - name := b.Name - pool := b.pool - b.RUnlock() - - u, err := ParseURL(node.CouchAPIBase) - if err != nil { - return nil, fmt.Errorf("config error: Bucket %q node #%d CouchAPIBase=%q: %v", - name, nodeNo, node.CouchAPIBase, err) - } else if pool != nil { - u.User = pool.client.BaseURL.User - } - return u, err -} - -const START_NODE_ID = -1 - -func (b *Bucket) randomNextURL(lastNode int) (*url.URL, int, error) { - nodes := b.HealthyNodes() - if len(nodes) == 0 { - return nil, -1, errors.New("no available couch rest URLs") - } - - var nodeNo int - if lastNode == START_NODE_ID || lastNode >= len(nodes) { - // randomly select a node if the value of lastNode is invalid - nodeNo = rand.Intn(len(nodes)) - } else { - // wrap around the node list - nodeNo = (lastNode + 1) % len(nodes) - } - - b.RLock() - name := b.Name - pool := b.pool - b.RUnlock() - - node := nodes[nodeNo] - u, err := ParseURL(node.CouchAPIBase) - if err != nil { - return nil, -1, fmt.Errorf("config error: Bucket %q node #%d CouchAPIBase=%q: %v", - name, nodeNo, node.CouchAPIBase, err) - } else if pool != nil { - u.User = pool.client.BaseURL.User - } - return u, nodeNo, err -} - -// DocID is the document ID type for the startkey_docid parameter in -// views. -type DocID string - -func qParam(k, v string) string { - format := `"%s"` - switch k { - case "startkey_docid", "endkey_docid", "stale": - format = "%s" - } - return fmt.Sprintf(format, v) -} - -// ViewURL constructs a URL for a view with the given ddoc, view name, -// and parameters. -func (b *Bucket) ViewURL(ddoc, name string, - params map[string]interface{}) (string, error) { - u, err := b.randomBaseURL() - if err != nil { - return "", err - } - - values := url.Values{} - for k, v := range params { - switch t := v.(type) { - case DocID: - values[k] = []string{string(t)} - case string: - values[k] = []string{qParam(k, t)} - case int: - values[k] = []string{fmt.Sprintf(`%d`, t)} - case bool: - values[k] = []string{fmt.Sprintf(`%v`, t)} - default: - b, err := json.Marshal(v) - if err != nil { - return "", fmt.Errorf("unsupported value-type %T in Query, "+ - "json encoder said %v", t, err) - } - values[k] = []string{fmt.Sprintf(`%v`, string(b))} - } - } - - if ddoc == "" && name == "_all_docs" { - u.Path = fmt.Sprintf("/%s/_all_docs", b.GetName()) - } else { - u.Path = fmt.Sprintf("/%s/_design/%s/_view/%s", b.GetName(), ddoc, name) - } - u.RawQuery = values.Encode() - - return u.String(), nil -} - -// ViewCallback is called for each view invocation. -var ViewCallback func(ddoc, name string, start time.Time, err error) - -// ViewCustom performs a view request that can map row values to a -// custom type. -// -// See the source to View for an example usage. -func (b *Bucket) ViewCustom(ddoc, name string, params map[string]interface{}, - vres interface{}) (err error) { - if SlowServerCallWarningThreshold > 0 { - defer slowLog(time.Now(), "call to ViewCustom(%q, %q)", ddoc, name) - } - - if ViewCallback != nil { - defer func(t time.Time) { ViewCallback(ddoc, name, t, err) }(time.Now()) - } - - u, err := b.ViewURL(ddoc, name, params) - if err != nil { - return err - } - - req, err := http.NewRequest("GET", u, nil) - if err != nil { - return err - } - - ah := b.authHandler(false /* bucket not yet locked */) - maybeAddAuth(req, ah) - - res, err := doHTTPRequest(req) - if err != nil { - return fmt.Errorf("error starting view req at %v: %v", u, err) - } - defer res.Body.Close() - - if res.StatusCode != 200 { - bod := make([]byte, 512) - l, _ := res.Body.Read(bod) - return fmt.Errorf("error executing view req at %v: %v - %s", - u, res.Status, bod[:l]) - } - - body, err := ioutil.ReadAll(res.Body) - if err := json.Unmarshal(body, vres); err != nil { - return nil - } - - return nil -} - -// View executes a view. -// -// The ddoc parameter is just the bare name of your design doc without -// the "_design/" prefix. -// -// Parameters are string keys with values that correspond to couchbase -// view parameters. Primitive should work fairly naturally (booleans, -// ints, strings, etc...) and other values will attempt to be JSON -// marshaled (useful for array indexing on on view keys, for example). -// -// Example: -// -// res, err := couchbase.View("myddoc", "myview", map[string]interface{}{ -// "group_level": 2, -// "startkey_docid": []interface{}{"thing"}, -// "endkey_docid": []interface{}{"thing", map[string]string{}}, -// "stale": false, -// }) -func (b *Bucket) View(ddoc, name string, params map[string]interface{}) (ViewResult, error) { - vres := ViewResult{} - - if err := b.ViewCustom(ddoc, name, params, &vres); err != nil { - //error in accessing views. Retry once after a bucket refresh - b.Refresh() - return vres, b.ViewCustom(ddoc, name, params, &vres) - } else { - return vres, nil - } -} |