diff options
author | techknowlogick <matti@mdranta.net> | 2019-02-05 11:52:51 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-02-05 11:52:51 -0500 |
commit | 9de871a0f8911030f8e06a881803cf722b8798ea (patch) | |
tree | 206400f0a5873d7d078fcdd004956036f07a1db5 /vendor/github.com/couchbaselabs/go-couchbase/pools.go | |
parent | bf4badad1d68c18d7ffb92c69e09e4e8aa252935 (diff) | |
download | gitea-9de871a0f8911030f8e06a881803cf722b8798ea.tar.gz gitea-9de871a0f8911030f8e06a881803cf722b8798ea.zip |
add other session providers (#5963)
Diffstat (limited to 'vendor/github.com/couchbaselabs/go-couchbase/pools.go')
-rw-r--r-- | vendor/github.com/couchbaselabs/go-couchbase/pools.go | 1282 |
1 files changed, 1282 insertions, 0 deletions
diff --git a/vendor/github.com/couchbaselabs/go-couchbase/pools.go b/vendor/github.com/couchbaselabs/go-couchbase/pools.go new file mode 100644 index 0000000000..5f3ff8c495 --- /dev/null +++ b/vendor/github.com/couchbaselabs/go-couchbase/pools.go @@ -0,0 +1,1282 @@ +package couchbase + +import ( + "bufio" + "bytes" + "crypto/tls" + "crypto/x509" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "math/rand" + "net/http" + "net/url" + "runtime" + "sort" + "strings" + "sync" + "unsafe" + + "github.com/couchbase/goutils/logging" + + "github.com/couchbase/gomemcached" // package name is 'gomemcached' + "github.com/couchbase/gomemcached/client" // package name is 'memcached' +) + +// HTTPClient to use for REST and view operations. +var MaxIdleConnsPerHost = 256 +var HTTPTransport = &http.Transport{MaxIdleConnsPerHost: MaxIdleConnsPerHost} +var HTTPClient = &http.Client{Transport: HTTPTransport} + +// PoolSize is the size of each connection pool (per host). +var PoolSize = 64 + +// PoolOverflow is the number of overflow connections allowed in a +// pool. +var PoolOverflow = 16 + +// AsynchronousCloser turns on asynchronous closing for overflow connections +var AsynchronousCloser = false + +// TCP KeepAlive enabled/disabled +var TCPKeepalive = false + +// Enable MutationToken +var EnableMutationToken = false + +// Enable Data Type response +var EnableDataType = false + +// Enable Xattr +var EnableXattr = false + +// TCP keepalive interval in seconds. Default 30 minutes +var TCPKeepaliveInterval = 30 * 60 + +// Used to decide whether to skip verification of certificates when +// connecting to an ssl port. +var skipVerify = true +var certFile = "" +var keyFile = "" +var rootFile = "" + +func SetSkipVerify(skip bool) { + skipVerify = skip +} + +func SetCertFile(cert string) { + certFile = cert +} + +func SetKeyFile(cert string) { + keyFile = cert +} + +func SetRootFile(cert string) { + rootFile = cert +} + +// Allow applications to speciify the Poolsize and Overflow +func SetConnectionPoolParams(size, overflow int) { + + if size > 0 { + PoolSize = size + } + + if overflow > 0 { + PoolOverflow = overflow + } +} + +// Turn off overflow connections +func DisableOverflowConnections() { + PoolOverflow = 0 +} + +// Toggle asynchronous overflow closer +func EnableAsynchronousCloser(closer bool) { + AsynchronousCloser = closer +} + +// Allow TCP keepalive parameters to be set by the application +func SetTcpKeepalive(enabled bool, interval int) { + + TCPKeepalive = enabled + + if interval > 0 { + TCPKeepaliveInterval = interval + } +} + +// AuthHandler is a callback that gets the auth username and password +// for the given bucket. +type AuthHandler interface { + GetCredentials() (string, string, string) +} + +// AuthHandler is a callback that gets the auth username and password +// for the given bucket and sasl for memcached. +type AuthWithSaslHandler interface { + AuthHandler + GetSaslCredentials() (string, string) +} + +// MultiBucketAuthHandler is kind of AuthHandler that may perform +// different auth for different buckets. +type MultiBucketAuthHandler interface { + AuthHandler + ForBucket(bucket string) AuthHandler +} + +// HTTPAuthHandler is kind of AuthHandler that performs more general +// for outgoing http requests than is possible via simple +// GetCredentials() call (i.e. digest auth or different auth per +// different destinations). +type HTTPAuthHandler interface { + AuthHandler + SetCredsForRequest(req *http.Request) error +} + +// RestPool represents a single pool returned from the pools REST API. +type RestPool struct { + Name string `json:"name"` + StreamingURI string `json:"streamingUri"` + URI string `json:"uri"` +} + +// Pools represents the collection of pools as returned from the REST API. +type Pools struct { + ComponentsVersion map[string]string `json:"componentsVersion,omitempty"` + ImplementationVersion string `json:"implementationVersion"` + IsAdmin bool `json:"isAdminCreds"` + UUID string `json:"uuid"` + Pools []RestPool `json:"pools"` +} + +// A Node is a computer in a cluster running the couchbase software. +type Node struct { + ClusterCompatibility int `json:"clusterCompatibility"` + ClusterMembership string `json:"clusterMembership"` + CouchAPIBase string `json:"couchApiBase"` + Hostname string `json:"hostname"` + InterestingStats map[string]float64 `json:"interestingStats,omitempty"` + MCDMemoryAllocated float64 `json:"mcdMemoryAllocated"` + MCDMemoryReserved float64 `json:"mcdMemoryReserved"` + MemoryFree float64 `json:"memoryFree"` + MemoryTotal float64 `json:"memoryTotal"` + OS string `json:"os"` + Ports map[string]int `json:"ports"` + Services []string `json:"services"` + Status string `json:"status"` + Uptime int `json:"uptime,string"` + Version string `json:"version"` + ThisNode bool `json:"thisNode,omitempty"` +} + +// A Pool of nodes and buckets. +type Pool struct { + BucketMap map[string]Bucket + Nodes []Node + + BucketURL map[string]string `json:"buckets"` + + client Client +} + +// VBucketServerMap is the a mapping of vbuckets to nodes. +type VBucketServerMap struct { + HashAlgorithm string `json:"hashAlgorithm"` + NumReplicas int `json:"numReplicas"` + ServerList []string `json:"serverList"` + VBucketMap [][]int `json:"vBucketMap"` +} + +type DurablitySettings struct { + Persist PersistTo + Observe ObserveTo +} + +// Bucket is the primary entry point for most data operations. +// Bucket is a locked data structure. All access to its fields should be done using read or write locking, +// as appropriate. +// +// Some access methods require locking, but rely on the caller to do so. These are appropriate +// for calls from methods that have already locked the structure. Methods like this +// take a boolean parameter "bucketLocked". +type Bucket struct { + sync.RWMutex + AuthType string `json:"authType"` + Capabilities []string `json:"bucketCapabilities"` + CapabilitiesVersion string `json:"bucketCapabilitiesVer"` + Type string `json:"bucketType"` + Name string `json:"name"` + NodeLocator string `json:"nodeLocator"` + Quota map[string]float64 `json:"quota,omitempty"` + Replicas int `json:"replicaNumber"` + Password string `json:"saslPassword"` + URI string `json:"uri"` + StreamingURI string `json:"streamingUri"` + LocalRandomKeyURI string `json:"localRandomKeyUri,omitempty"` + UUID string `json:"uuid"` + ConflictResolutionType string `json:"conflictResolutionType,omitempty"` + DDocs struct { + URI string `json:"uri"` + } `json:"ddocs,omitempty"` + BasicStats map[string]interface{} `json:"basicStats,omitempty"` + Controllers map[string]interface{} `json:"controllers,omitempty"` + + // These are used for JSON IO, but isn't used for processing + // since it needs to be swapped out safely. + VBSMJson VBucketServerMap `json:"vBucketServerMap"` + NodesJSON []Node `json:"nodes"` + + pool *Pool + connPools unsafe.Pointer // *[]*connectionPool + vBucketServerMap unsafe.Pointer // *VBucketServerMap + nodeList unsafe.Pointer // *[]Node + commonSufix string + ah AuthHandler // auth handler + ds *DurablitySettings // Durablity Settings for this bucket + Scopes Scopes +} + +// PoolServices is all the bucket-independent services in a pool +type PoolServices struct { + Rev int `json:"rev"` + NodesExt []NodeServices `json:"nodesExt"` +} + +// NodeServices is all the bucket-independent services running on +// a node (given by Hostname) +type NodeServices struct { + Services map[string]int `json:"services,omitempty"` + Hostname string `json:"hostname"` + ThisNode bool `json:"thisNode"` +} + +type BucketNotFoundError struct { + bucket string +} + +func (e *BucketNotFoundError) Error() string { + return fmt.Sprint("No bucket named " + e.bucket) +} + +type BucketAuth struct { + name string + saslPwd string + bucket string +} + +func newBucketAuth(name string, pass string, bucket string) *BucketAuth { + return &BucketAuth{name: name, saslPwd: pass, bucket: bucket} +} + +func (ba *BucketAuth) GetCredentials() (string, string, string) { + return ba.name, ba.saslPwd, ba.bucket +} + +// VBServerMap returns the current VBucketServerMap. +func (b *Bucket) VBServerMap() *VBucketServerMap { + b.RLock() + defer b.RUnlock() + ret := (*VBucketServerMap)(b.vBucketServerMap) + return ret +} + +func (b *Bucket) GetVBmap(addrs []string) (map[string][]uint16, error) { + vbmap := b.VBServerMap() + servers := vbmap.ServerList + if addrs == nil { + addrs = vbmap.ServerList + } + + m := make(map[string][]uint16) + for _, addr := range addrs { + m[addr] = make([]uint16, 0) + } + for vbno, idxs := range vbmap.VBucketMap { + if len(idxs) == 0 { + return nil, fmt.Errorf("vbmap: No KV node no for vb %d", vbno) + } else if idxs[0] < 0 || idxs[0] >= len(servers) { + return nil, fmt.Errorf("vbmap: Invalid KV node no %d for vb %d", idxs[0], vbno) + } + addr := servers[idxs[0]] + if _, ok := m[addr]; ok { + m[addr] = append(m[addr], uint16(vbno)) + } + } + return m, nil +} + +// true if node is not on the bucket VBmap +func (b *Bucket) checkVBmap(node string) bool { + vbmap := b.VBServerMap() + servers := vbmap.ServerList + + for _, idxs := range vbmap.VBucketMap { + if len(idxs) == 0 { + return true + } else if idxs[0] < 0 || idxs[0] >= len(servers) { + return true + } + if servers[idxs[0]] == node { + return false + } + } + return true +} + +func (b *Bucket) GetName() string { + b.RLock() + defer b.RUnlock() + ret := b.Name + return ret +} + +// Nodes returns teh current list of nodes servicing this bucket. +func (b *Bucket) Nodes() []Node { + b.RLock() + defer b.RUnlock() + ret := *(*[]Node)(b.nodeList) + return ret +} + +// return the list of healthy nodes +func (b *Bucket) HealthyNodes() []Node { + nodes := []Node{} + + for _, n := range b.Nodes() { + if n.Status == "healthy" && n.CouchAPIBase != "" { + nodes = append(nodes, n) + } + if n.Status != "healthy" { // log non-healthy node + logging.Infof("Non-healthy node; node details:") + logging.Infof("Hostname=%v, Status=%v, CouchAPIBase=%v, ThisNode=%v", n.Hostname, n.Status, n.CouchAPIBase, n.ThisNode) + } + } + + return nodes +} + +func (b *Bucket) getConnPools(bucketLocked bool) []*connectionPool { + if !bucketLocked { + b.RLock() + defer b.RUnlock() + } + if b.connPools != nil { + return *(*[]*connectionPool)(b.connPools) + } else { + return nil + } +} + +func (b *Bucket) replaceConnPools(with []*connectionPool) { + b.Lock() + defer b.Unlock() + + old := b.connPools + b.connPools = unsafe.Pointer(&with) + if old != nil { + for _, pool := range *(*[]*connectionPool)(old) { + if pool != nil { + pool.Close() + } + } + } + return +} + +func (b *Bucket) getConnPool(i int) *connectionPool { + + if i < 0 { + return nil + } + + p := b.getConnPools(false /* not already locked */) + if len(p) > i { + return p[i] + } + + return nil +} + +func (b *Bucket) getConnPoolByHost(host string, bucketLocked bool) *connectionPool { + pools := b.getConnPools(bucketLocked) + for _, p := range pools { + if p != nil && p.host == host { + return p + } + } + + return nil +} + +// Given a vbucket number, returns a memcached connection to it. +// The connection must be returned to its pool after use. +func (b *Bucket) getConnectionToVBucket(vb uint32) (*memcached.Client, *connectionPool, error) { + for { + vbm := b.VBServerMap() + if len(vbm.VBucketMap) < int(vb) { + return nil, nil, fmt.Errorf("go-couchbase: vbmap smaller than vbucket list: %v vs. %v", + vb, vbm.VBucketMap) + } + masterId := vbm.VBucketMap[vb][0] + if masterId < 0 { + return nil, nil, fmt.Errorf("go-couchbase: No master for vbucket %d", vb) + } + pool := b.getConnPool(masterId) + conn, err := pool.Get() + if err != errClosedPool { + return conn, pool, err + } + // If conn pool was closed, because another goroutine refreshed the vbucket map, retry... + } +} + +// To get random documents, we need to cover all the nodes, so select +// a connection at random. + +func (b *Bucket) getRandomConnection() (*memcached.Client, *connectionPool, error) { + for { + var currentPool = 0 + pools := b.getConnPools(false /* not already locked */) + if len(pools) == 0 { + return nil, nil, fmt.Errorf("No connection pool found") + } else if len(pools) > 1 { // choose a random connection + currentPool = rand.Intn(len(pools)) + } // if only one pool, currentPool defaults to 0, i.e., the only pool + + // get the pool + pool := pools[currentPool] + conn, err := pool.Get() + if err != errClosedPool { + return conn, pool, err + } + + // If conn pool was closed, because another goroutine refreshed the vbucket map, retry... + } +} + +// +// Get a random document from a bucket. Since the bucket may be distributed +// across nodes, we must first select a random connection, and then use the +// Client.GetRandomDoc() call to get a random document from that node. +// + +func (b *Bucket) GetRandomDoc() (*gomemcached.MCResponse, error) { + // get a connection from the pool + conn, pool, err := b.getRandomConnection() + + if err != nil { + return nil, err + } + + // get a randomm document from the connection + doc, err := conn.GetRandomDoc() + // need to return the connection to the pool + pool.Return(conn) + return doc, err +} + +func (b *Bucket) getMasterNode(i int) string { + p := b.getConnPools(false /* not already locked */) + if len(p) > i { + return p[i].host + } + return "" +} + +func (b *Bucket) authHandler(bucketLocked bool) (ah AuthHandler) { + if !bucketLocked { + b.RLock() + defer b.RUnlock() + } + pool := b.pool + name := b.Name + + if pool != nil { + ah = pool.client.ah + } + if mbah, ok := ah.(MultiBucketAuthHandler); ok { + return mbah.ForBucket(name) + } + if ah == nil { + ah = &basicAuth{name, ""} + } + return +} + +// NodeAddresses gets the (sorted) list of memcached node addresses +// (hostname:port). +func (b *Bucket) NodeAddresses() []string { + vsm := b.VBServerMap() + rv := make([]string, len(vsm.ServerList)) + copy(rv, vsm.ServerList) + sort.Strings(rv) + return rv +} + +// CommonAddressSuffix finds the longest common suffix of all +// host:port strings in the node list. +func (b *Bucket) CommonAddressSuffix() string { + input := []string{} + for _, n := range b.Nodes() { + input = append(input, n.Hostname) + } + return FindCommonSuffix(input) +} + +// A Client is the starting point for all services across all buckets +// in a Couchbase cluster. +type Client struct { + BaseURL *url.URL + ah AuthHandler + Info Pools +} + +func maybeAddAuth(req *http.Request, ah AuthHandler) error { + if hah, ok := ah.(HTTPAuthHandler); ok { + return hah.SetCredsForRequest(req) + } + if ah != nil { + user, pass, _ := ah.GetCredentials() + req.Header.Set("Authorization", "Basic "+ + base64.StdEncoding.EncodeToString([]byte(user+":"+pass))) + } + return nil +} + +// arbitary number, may need to be tuned #FIXME +const HTTP_MAX_RETRY = 5 + +// Someday golang network packages will implement standard +// error codes. Until then #sigh +func isHttpConnError(err error) bool { + + estr := err.Error() + return strings.Contains(estr, "broken pipe") || + strings.Contains(estr, "broken connection") || + strings.Contains(estr, "connection reset") +} + +var client *http.Client + +func ClientConfigForX509(certFile, keyFile, rootFile string) (*tls.Config, error) { + cfg := &tls.Config{} + + if certFile != "" && keyFile != "" { + tlsCert, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + return nil, err + } + cfg.Certificates = []tls.Certificate{tlsCert} + } else { + //error need to pass both certfile and keyfile + return nil, fmt.Errorf("N1QL: Need to pass both certfile and keyfile") + } + + var caCert []byte + var err1 error + + caCertPool := x509.NewCertPool() + if rootFile != "" { + // Read that value in + caCert, err1 = ioutil.ReadFile(rootFile) + if err1 != nil { + return nil, fmt.Errorf(" Error in reading cacert file, err: %v", err1) + } + caCertPool.AppendCertsFromPEM(caCert) + } + + cfg.RootCAs = caCertPool + return cfg, nil +} + +func doHTTPRequest(req *http.Request) (*http.Response, error) { + + var err error + var res *http.Response + + tr := &http.Transport{} + + // we need a client that ignores certificate errors, since we self-sign + // our certs + if client == nil && req.URL.Scheme == "https" { + if skipVerify { + tr = &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + } else { + // Handle cases with cert + + cfg, err := ClientConfigForX509(certFile, keyFile, rootFile) + if err != nil { + return nil, err + } + + tr = &http.Transport{ + TLSClientConfig: cfg, + } + } + + client = &http.Client{Transport: tr} + + } else if client == nil { + client = HTTPClient + } + + for i := 0; i < HTTP_MAX_RETRY; i++ { + res, err = client.Do(req) + if err != nil && isHttpConnError(err) { + continue + } + break + } + + if err != nil { + return nil, err + } + + return res, err +} + +func doPutAPI(baseURL *url.URL, path string, params map[string]interface{}, authHandler AuthHandler, out interface{}) error { + return doOutputAPI("PUT", baseURL, path, params, authHandler, out) +} + +func doPostAPI(baseURL *url.URL, path string, params map[string]interface{}, authHandler AuthHandler, out interface{}) error { + return doOutputAPI("POST", baseURL, path, params, authHandler, out) +} + +func doOutputAPI( + httpVerb string, + baseURL *url.URL, + path string, + params map[string]interface{}, + authHandler AuthHandler, + out interface{}) error { + + var requestUrl string + + if q := strings.Index(path, "?"); q > 0 { + requestUrl = baseURL.Scheme + "://" + baseURL.Host + path[:q] + "?" + path[q+1:] + } else { + requestUrl = baseURL.Scheme + "://" + baseURL.Host + path + } + + postData := url.Values{} + for k, v := range params { + postData.Set(k, fmt.Sprintf("%v", v)) + } + + req, err := http.NewRequest(httpVerb, requestUrl, bytes.NewBufferString(postData.Encode())) + if err != nil { + return err + } + + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + + err = maybeAddAuth(req, authHandler) + if err != nil { + return err + } + + res, err := doHTTPRequest(req) + if err != nil { + return err + } + + defer res.Body.Close() + if res.StatusCode != 200 { + bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512)) + return fmt.Errorf("HTTP error %v getting %q: %s", + res.Status, requestUrl, bod) + } + + d := json.NewDecoder(res.Body) + if err = d.Decode(&out); err != nil { + return err + } + return nil +} + +func queryRestAPI( + baseURL *url.URL, + path string, + authHandler AuthHandler, + out interface{}) error { + + var requestUrl string + + if q := strings.Index(path, "?"); q > 0 { + requestUrl = baseURL.Scheme + "://" + baseURL.Host + path[:q] + "?" + path[q+1:] + } else { + requestUrl = baseURL.Scheme + "://" + baseURL.Host + path + } + + req, err := http.NewRequest("GET", requestUrl, nil) + if err != nil { + return err + } + + err = maybeAddAuth(req, authHandler) + if err != nil { + return err + } + + res, err := doHTTPRequest(req) + if err != nil { + return err + } + + defer res.Body.Close() + if res.StatusCode != 200 { + bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512)) + return fmt.Errorf("HTTP error %v getting %q: %s", + res.Status, requestUrl, bod) + } + + d := json.NewDecoder(res.Body) + if err = d.Decode(&out); err != nil { + return err + } + return nil +} + +func (c *Client) ProcessStream(path string, callb func(interface{}) error, data interface{}) error { + return c.processStream(c.BaseURL, path, c.ah, callb, data) +} + +// Based on code in http://src.couchbase.org/source/xref/trunk/goproj/src/github.com/couchbase/indexing/secondary/dcp/pools.go#309 +func (c *Client) processStream(baseURL *url.URL, path string, authHandler AuthHandler, callb func(interface{}) error, data interface{}) error { + var requestUrl string + + if q := strings.Index(path, "?"); q > 0 { + requestUrl = baseURL.Scheme + "://" + baseURL.Host + path[:q] + "?" + path[q+1:] + } else { + requestUrl = baseURL.Scheme + "://" + baseURL.Host + path + } + + req, err := http.NewRequest("GET", requestUrl, nil) + if err != nil { + return err + } + + err = maybeAddAuth(req, authHandler) + if err != nil { + return err + } + + res, err := doHTTPRequest(req) + if err != nil { + return err + } + + defer res.Body.Close() + if res.StatusCode != 200 { + bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512)) + return fmt.Errorf("HTTP error %v getting %q: %s", + res.Status, requestUrl, bod) + } + + reader := bufio.NewReader(res.Body) + for { + bs, err := reader.ReadBytes('\n') + if err != nil { + return err + } + if len(bs) == 1 && bs[0] == '\n' { + continue + } + + err = json.Unmarshal(bs, data) + if err != nil { + return err + } + err = callb(data) + if err != nil { + return err + } + } + return nil + +} + +func (c *Client) parseURLResponse(path string, out interface{}) error { + return queryRestAPI(c.BaseURL, path, c.ah, out) +} + +func (c *Client) parsePostURLResponse(path string, params map[string]interface{}, out interface{}) error { + return doPostAPI(c.BaseURL, path, params, c.ah, out) +} + +func (c *Client) parsePutURLResponse(path string, params map[string]interface{}, out interface{}) error { + return doPutAPI(c.BaseURL, path, params, c.ah, out) +} + +func (b *Bucket) parseURLResponse(path string, out interface{}) error { + nodes := b.Nodes() + if len(nodes) == 0 { + return errors.New("no couch rest URLs") + } + + // Pick a random node to start querying. + startNode := rand.Intn(len(nodes)) + maxRetries := len(nodes) + for i := 0; i < maxRetries; i++ { + node := nodes[(startNode+i)%len(nodes)] // Wrap around the nodes list. + // Skip non-healthy nodes. + if node.Status != "healthy" || node.CouchAPIBase == "" { + continue + } + url := &url.URL{ + Host: node.Hostname, + Scheme: "http", + } + + // Lock here to avoid having pool closed under us. + b.RLock() + err := queryRestAPI(url, path, b.pool.client.ah, out) + b.RUnlock() + if err == nil { + return err + } + } + return errors.New("All nodes failed to respond or no healthy nodes for bucket found") +} + +func (b *Bucket) parseAPIResponse(path string, out interface{}) error { + nodes := b.Nodes() + if len(nodes) == 0 { + return errors.New("no couch rest URLs") + } + + var err error + var u *url.URL + + // Pick a random node to start querying. + startNode := rand.Intn(len(nodes)) + maxRetries := len(nodes) + for i := 0; i < maxRetries; i++ { + node := nodes[(startNode+i)%len(nodes)] // Wrap around the nodes list. + // Skip non-healthy nodes. + if node.Status != "healthy" || node.CouchAPIBase == "" { + continue + } + + u, err = ParseURL(node.CouchAPIBase) + // Lock here so pool does not get closed under us. + b.RLock() + if err != nil { + b.RUnlock() + return fmt.Errorf("config error: Bucket %q node #%d CouchAPIBase=%q: %v", + b.Name, i, node.CouchAPIBase, err) + } else if b.pool != nil { + u.User = b.pool.client.BaseURL.User + } + u.Path = path + + // generate the path so that the strings are properly escaped + // MB-13770 + requestPath := strings.Split(u.String(), u.Host)[1] + + err = queryRestAPI(u, requestPath, b.pool.client.ah, out) + b.RUnlock() + if err == nil { + return err + } + } + + var errStr string + if err != nil { + errStr = "Error " + err.Error() + } + + return errors.New("All nodes failed to respond or returned error or no healthy nodes for bucket found." + errStr) +} + +type basicAuth struct { + u, p string +} + +func (b basicAuth) GetCredentials() (string, string, string) { + return b.u, b.p, b.u +} + +func basicAuthFromURL(us string) (ah AuthHandler) { + u, err := ParseURL(us) + if err != nil { + return + } + if user := u.User; user != nil { + pw, _ := user.Password() + ah = basicAuth{user.Username(), pw} + } + return +} + +// ConnectWithAuth connects to a couchbase cluster with the given +// authentication handler. +func ConnectWithAuth(baseU string, ah AuthHandler) (c Client, err error) { + c.BaseURL, err = ParseURL(baseU) + if err != nil { + return + } + c.ah = ah + + return c, c.parseURLResponse("/pools", &c.Info) +} + +// ConnectWithAuthCreds connects to a couchbase cluster with the give +// authorization creds returned by cb_auth +func ConnectWithAuthCreds(baseU, username, password string) (c Client, err error) { + c.BaseURL, err = ParseURL(baseU) + if err != nil { + return + } + + c.ah = newBucketAuth(username, password, "") + return c, c.parseURLResponse("/pools", &c.Info) + +} + +// Connect to a couchbase cluster. An authentication handler will be +// created from the userinfo in the URL if provided. +func Connect(baseU string) (Client, error) { + return ConnectWithAuth(baseU, basicAuthFromURL(baseU)) +} + +type BucketInfo struct { + Name string // name of bucket + Password string // SASL password of bucket +} + +//Get SASL buckets +func GetBucketList(baseU string) (bInfo []BucketInfo, err error) { + + c := &Client{} + c.BaseURL, err = ParseURL(baseU) + if err != nil { + return + } + c.ah = basicAuthFromURL(baseU) + + var buckets []Bucket + err = c.parseURLResponse("/pools/default/buckets", &buckets) + if err != nil { + return + } + bInfo = make([]BucketInfo, 0) + for _, bucket := range buckets { + bucketInfo := BucketInfo{Name: bucket.Name, Password: bucket.Password} + bInfo = append(bInfo, bucketInfo) + } + return bInfo, err +} + +//Set viewUpdateDaemonOptions +func SetViewUpdateParams(baseU string, params map[string]interface{}) (viewOpts map[string]interface{}, err error) { + + c := &Client{} + c.BaseURL, err = ParseURL(baseU) + if err != nil { + return + } + c.ah = basicAuthFromURL(baseU) + + if len(params) < 1 { + return nil, fmt.Errorf("No params to set") + } + + err = c.parsePostURLResponse("/settings/viewUpdateDaemon", params, &viewOpts) + if err != nil { + return + } + return viewOpts, err +} + +// This API lets the caller know, if the list of nodes a bucket is +// connected to has gone through an edit (a rebalance operation) +// since the last update to the bucket, in which case a Refresh is +// advised. +func (b *Bucket) NodeListChanged() bool { + b.RLock() + pool := b.pool + uri := b.URI + b.RUnlock() + + tmpb := &Bucket{} + err := pool.client.parseURLResponse(uri, tmpb) + if err != nil { + return true + } + + bNodes := *(*[]Node)(b.nodeList) + if len(bNodes) != len(tmpb.NodesJSON) { + return true + } + + bucketHostnames := map[string]bool{} + for _, node := range bNodes { + bucketHostnames[node.Hostname] = true + } + + for _, node := range tmpb.NodesJSON { + if _, found := bucketHostnames[node.Hostname]; !found { + return true + } + } + + return false +} + +// Sample data for scopes and collections as returned from the +// /pooles/default/$BUCKET_NAME/collections API. +// {"myScope2":{"myCollectionC":{}},"myScope1":{"myCollectionB":{},"myCollectionA":{}},"_default":{"_default":{}}} + +// A Scopes holds the set of scopes in a bucket. +// The map key is the name of the scope. +type Scopes map[string]Collections + +// A Collections holds the set of collections in a scope. +// The map key is the name of the collection. +type Collections map[string]Collection + +// A Collection holds the information for a collection. +// It is currently returned empty. +type Collection struct{} + +func getScopesAndCollections(pool *Pool, bucketName string) (Scopes, error) { + scopes := make(Scopes) + // This URL is a bit of a hack. The "default" is the name of the pool, and should + // be a parameter. But the name does not appear to be available anywhere, + // and in any case we never use a pool other than "default". + err := pool.client.parseURLResponse(fmt.Sprintf("/pools/default/buckets/%s/collections", bucketName), &scopes) + if err != nil { + return nil, err + } + return scopes, nil +} + +func (b *Bucket) Refresh() error { + b.RLock() + pool := b.pool + uri := b.URI + name := b.Name + b.RUnlock() + + tmpb := &Bucket{} + err := pool.client.parseURLResponse(uri, tmpb) + if err != nil { + return err + } + + scopes, err := getScopesAndCollections(pool, name) + if err != nil { + return err + } + + pools := b.getConnPools(false /* bucket not already locked */) + + // We need this lock to ensure that bucket refreshes happening because + // of NMVb errors received during bulkGet do not end up over-writing + // pool.inUse. + b.Lock() + + for _, pool := range pools { + if pool != nil { + pool.inUse = false + } + } + + newcps := make([]*connectionPool, len(tmpb.VBSMJson.ServerList)) + for i := range newcps { + + pool := b.getConnPoolByHost(tmpb.VBSMJson.ServerList[i], true /* bucket already locked */) + if pool != nil && pool.inUse == false { + // if the hostname and index is unchanged then reuse this pool + newcps[i] = pool + pool.inUse = true + continue + } + + if b.ah != nil { + newcps[i] = newConnectionPool( + tmpb.VBSMJson.ServerList[i], + b.ah, AsynchronousCloser, PoolSize, PoolOverflow) + + } else { + newcps[i] = newConnectionPool( + tmpb.VBSMJson.ServerList[i], + b.authHandler(true /* bucket already locked */), + AsynchronousCloser, PoolSize, PoolOverflow) + } + } + b.replaceConnPools2(newcps, true /* bucket already locked */) + tmpb.ah = b.ah + b.vBucketServerMap = unsafe.Pointer(&tmpb.VBSMJson) + b.nodeList = unsafe.Pointer(&tmpb.NodesJSON) + b.Scopes = scopes + + b.Unlock() + return nil +} + +func (p *Pool) refresh() (err error) { + p.BucketMap = make(map[string]Bucket) + + buckets := []Bucket{} + err = p.client.parseURLResponse(p.BucketURL["uri"], &buckets) + if err != nil { + return err + } + for _, b := range buckets { + b.pool = p + b.nodeList = unsafe.Pointer(&b.NodesJSON) + b.replaceConnPools(make([]*connectionPool, len(b.VBSMJson.ServerList))) + + p.BucketMap[b.Name] = b + } + return nil +} + +// GetPool gets a pool from within the couchbase cluster (usually +// "default"). +func (c *Client) GetPool(name string) (p Pool, err error) { + var poolURI string + for _, p := range c.Info.Pools { + if p.Name == name { + poolURI = p.URI + } + } + if poolURI == "" { + return p, errors.New("No pool named " + name) + } + + err = c.parseURLResponse(poolURI, &p) + + p.client = *c + + err = p.refresh() + return +} + +// GetPoolServices returns all the bucket-independent services in a pool. +// (See "Exposing services outside of bucket context" in http://goo.gl/uuXRkV) +func (c *Client) GetPoolServices(name string) (ps PoolServices, err error) { + var poolName string + for _, p := range c.Info.Pools { + if p.Name == name { + poolName = p.Name + } + } + if poolName == "" { + return ps, errors.New("No pool named " + name) + } + + poolURI := "/pools/" + poolName + "/nodeServices" + err = c.parseURLResponse(poolURI, &ps) + + return +} + +// Close marks this bucket as no longer needed, closing connections it +// may have open. +func (b *Bucket) Close() { + b.Lock() + defer b.Unlock() + if b.connPools != nil { + for _, c := range b.getConnPools(true /* already locked */) { + if c != nil { + c.Close() + } + } + b.connPools = nil + } +} + +func bucketFinalizer(b *Bucket) { + if b.connPools != nil { + logging.Warnf("Finalizing a bucket with active connections.") + } +} + +// GetBucket gets a bucket from within this pool. +func (p *Pool) GetBucket(name string) (*Bucket, error) { + rv, ok := p.BucketMap[name] + if !ok { + return nil, &BucketNotFoundError{bucket: name} + } + runtime.SetFinalizer(&rv, bucketFinalizer) + err := rv.Refresh() + if err != nil { + return nil, err + } + return &rv, nil +} + +// GetBucket gets a bucket from within this pool. +func (p *Pool) GetBucketWithAuth(bucket, username, password string) (*Bucket, error) { + rv, ok := p.BucketMap[bucket] + if !ok { + return nil, &BucketNotFoundError{bucket: bucket} + } + runtime.SetFinalizer(&rv, bucketFinalizer) + rv.ah = newBucketAuth(username, password, bucket) + err := rv.Refresh() + if err != nil { + return nil, err + } + return &rv, nil +} + +// GetPool gets the pool to which this bucket belongs. +func (b *Bucket) GetPool() *Pool { + b.RLock() + defer b.RUnlock() + ret := b.pool + return ret +} + +// GetClient gets the client from which we got this pool. +func (p *Pool) GetClient() *Client { + return &p.client +} + +// GetBucket is a convenience function for getting a named bucket from +// a URL +func GetBucket(endpoint, poolname, bucketname string) (*Bucket, error) { + var err error + client, err := Connect(endpoint) + if err != nil { + return nil, err + } + + pool, err := client.GetPool(poolname) + if err != nil { + return nil, err + } + + return pool.GetBucket(bucketname) +} + +// ConnectWithAuthAndGetBucket is a convenience function for +// getting a named bucket from a given URL and an auth callback +func ConnectWithAuthAndGetBucket(endpoint, poolname, bucketname string, + ah AuthHandler) (*Bucket, error) { + client, err := ConnectWithAuth(endpoint, ah) + if err != nil { + return nil, err + } + + pool, err := client.GetPool(poolname) + if err != nil { + return nil, err + } + + return pool.GetBucket(bucketname) +} |