summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/couchbaselabs/go-couchbase/conn_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/couchbaselabs/go-couchbase/conn_pool.go')
-rw-r--r--vendor/github.com/couchbaselabs/go-couchbase/conn_pool.go387
1 files changed, 387 insertions, 0 deletions
diff --git a/vendor/github.com/couchbaselabs/go-couchbase/conn_pool.go b/vendor/github.com/couchbaselabs/go-couchbase/conn_pool.go
new file mode 100644
index 0000000000..babd3adb6a
--- /dev/null
+++ b/vendor/github.com/couchbaselabs/go-couchbase/conn_pool.go
@@ -0,0 +1,387 @@
+package couchbase
+
+import (
+ "errors"
+ "sync/atomic"
+ "time"
+
+ "github.com/couchbase/gomemcached"
+ "github.com/couchbase/gomemcached/client"
+ "github.com/couchbase/goutils/logging"
+)
+
+// GenericMcdAuthHandler is a kind of AuthHandler that performs
+// special auth exchange (like non-standard auth, possibly followed by
+// select-bucket).
+type GenericMcdAuthHandler interface {
+ AuthHandler
+ AuthenticateMemcachedConn(host string, conn *memcached.Client) error
+}
+
+// Error raised when a connection can't be retrieved from a pool.
+var TimeoutError = errors.New("timeout waiting to build connection")
+var errClosedPool = errors.New("the connection pool is closed")
+var errNoPool = errors.New("no connection pool")
+
+// Default timeout for retrieving a connection from the pool.
+var ConnPoolTimeout = time.Hour * 24 * 30
+
+// overflow connection closer cycle time
+var ConnCloserInterval = time.Second * 30
+
+// ConnPoolAvailWaitTime is the amount of time to wait for an existing
+// connection from the pool before considering the creation of a new
+// one.
+var ConnPoolAvailWaitTime = time.Millisecond
+
+type connectionPool struct {
+ host string
+ mkConn func(host string, ah AuthHandler) (*memcached.Client, error)
+ auth AuthHandler
+ connections chan *memcached.Client
+ createsem chan bool
+ bailOut chan bool
+ poolSize int
+ connCount uint64
+ inUse bool
+}
+
+func newConnectionPool(host string, ah AuthHandler, closer bool, poolSize, poolOverflow int) *connectionPool {
+ connSize := poolSize
+ if closer {
+ connSize += poolOverflow
+ }
+ rv := &connectionPool{
+ host: host,
+ connections: make(chan *memcached.Client, connSize),
+ createsem: make(chan bool, poolSize+poolOverflow),
+ mkConn: defaultMkConn,
+ auth: ah,
+ poolSize: poolSize,
+ }
+ if closer {
+ rv.bailOut = make(chan bool, 1)
+ go rv.connCloser()
+ }
+ return rv
+}
+
+// ConnPoolTimeout is notified whenever connections are acquired from a pool.
+var ConnPoolCallback func(host string, source string, start time.Time, err error)
+
+func defaultMkConn(host string, ah AuthHandler) (*memcached.Client, error) {
+ var features memcached.Features
+
+ conn, err := memcached.Connect("tcp", host)
+ if err != nil {
+ return nil, err
+ }
+
+ if TCPKeepalive == true {
+ conn.SetKeepAliveOptions(time.Duration(TCPKeepaliveInterval) * time.Second)
+ }
+
+ if EnableMutationToken == true {
+ features = append(features, memcached.FeatureMutationToken)
+ }
+ if EnableDataType == true {
+ features = append(features, memcached.FeatureDataType)
+ }
+
+ if EnableXattr == true {
+ features = append(features, memcached.FeatureXattr)
+ }
+
+ if len(features) > 0 {
+ if DefaultTimeout > 0 {
+ conn.SetDeadline(getDeadline(noDeadline, DefaultTimeout))
+ }
+
+ res, err := conn.EnableFeatures(features)
+
+ if DefaultTimeout > 0 {
+ conn.SetDeadline(noDeadline)
+ }
+
+ if err != nil && isTimeoutError(err) {
+ conn.Close()
+ return nil, err
+ }
+
+ if err != nil || res.Status != gomemcached.SUCCESS {
+ logging.Warnf("Unable to enable features %v", err)
+ }
+ }
+
+ if gah, ok := ah.(GenericMcdAuthHandler); ok {
+ err = gah.AuthenticateMemcachedConn(host, conn)
+ if err != nil {
+ conn.Close()
+ return nil, err
+ }
+ return conn, nil
+ }
+ name, pass, bucket := ah.GetCredentials()
+ if name != "default" {
+ _, err = conn.Auth(name, pass)
+ if err != nil {
+ conn.Close()
+ return nil, err
+ }
+ // Select bucket (Required for cb_auth creds)
+ // Required when doing auth with _admin credentials
+ if bucket != "" && bucket != name {
+ _, err = conn.SelectBucket(bucket)
+ if err != nil {
+ conn.Close()
+ return nil, err
+ }
+ }
+ }
+ return conn, nil
+}
+
+func (cp *connectionPool) Close() (err error) {
+ defer func() {
+ if recover() != nil {
+ err = errors.New("connectionPool.Close error")
+ }
+ }()
+ if cp.bailOut != nil {
+
+ // defensively, we won't wait if the channel is full
+ select {
+ case cp.bailOut <- false:
+ default:
+ }
+ }
+ close(cp.connections)
+ for c := range cp.connections {
+ c.Close()
+ }
+ return
+}
+
+func (cp *connectionPool) Node() string {
+ return cp.host
+}
+
+func (cp *connectionPool) GetWithTimeout(d time.Duration) (rv *memcached.Client, err error) {
+ if cp == nil {
+ return nil, errNoPool
+ }
+
+ path := ""
+
+ if ConnPoolCallback != nil {
+ defer func(path *string, start time.Time) {
+ ConnPoolCallback(cp.host, *path, start, err)
+ }(&path, time.Now())
+ }
+
+ path = "short-circuit"
+
+ // short-circuit available connetions.
+ select {
+ case rv, isopen := <-cp.connections:
+ if !isopen {
+ return nil, errClosedPool
+ }
+ atomic.AddUint64(&cp.connCount, 1)
+ return rv, nil
+ default:
+ }
+
+ t := time.NewTimer(ConnPoolAvailWaitTime)
+ defer t.Stop()
+
+ // Try to grab an available connection within 1ms
+ select {
+ case rv, isopen := <-cp.connections:
+ path = "avail1"
+ if !isopen {
+ return nil, errClosedPool
+ }
+ atomic.AddUint64(&cp.connCount, 1)
+ return rv, nil
+ case <-t.C:
+ // No connection came around in time, let's see
+ // whether we can get one or build a new one first.
+ t.Reset(d) // Reuse the timer for the full timeout.
+ select {
+ case rv, isopen := <-cp.connections:
+ path = "avail2"
+ if !isopen {
+ return nil, errClosedPool
+ }
+ atomic.AddUint64(&cp.connCount, 1)
+ return rv, nil
+ case cp.createsem <- true:
+ path = "create"
+ // Build a connection if we can't get a real one.
+ // This can potentially be an overflow connection, or
+ // a pooled connection.
+ rv, err := cp.mkConn(cp.host, cp.auth)
+ if err != nil {
+ // On error, release our create hold
+ <-cp.createsem
+ } else {
+ atomic.AddUint64(&cp.connCount, 1)
+ }
+ return rv, err
+ case <-t.C:
+ return nil, ErrTimeout
+ }
+ }
+}
+
+func (cp *connectionPool) Get() (*memcached.Client, error) {
+ return cp.GetWithTimeout(ConnPoolTimeout)
+}
+
+func (cp *connectionPool) Return(c *memcached.Client) {
+ if c == nil {
+ return
+ }
+
+ if cp == nil {
+ c.Close()
+ }
+
+ if c.IsHealthy() {
+ defer func() {
+ if recover() != nil {
+ // This happens when the pool has already been
+ // closed and we're trying to return a
+ // connection to it anyway. Just close the
+ // connection.
+ c.Close()
+ }
+ }()
+
+ select {
+ case cp.connections <- c:
+ default:
+ <-cp.createsem
+ c.Close()
+ }
+ } else {
+ <-cp.createsem
+ c.Close()
+ }
+}
+
+// give the ability to discard a connection from a pool
+// useful for ditching connections to the wrong node after a rebalance
+func (cp *connectionPool) Discard(c *memcached.Client) {
+ <-cp.createsem
+ c.Close()
+}
+
+// asynchronous connection closer
+func (cp *connectionPool) connCloser() {
+ var connCount uint64
+
+ t := time.NewTimer(ConnCloserInterval)
+ defer t.Stop()
+
+ for {
+ connCount = cp.connCount
+
+ // we don't exist anymore! bail out!
+ select {
+ case <-cp.bailOut:
+ return
+ case <-t.C:
+ }
+ t.Reset(ConnCloserInterval)
+
+ // no overflow connections open or sustained requests for connections
+ // nothing to do until the next cycle
+ if len(cp.connections) <= cp.poolSize ||
+ ConnCloserInterval/ConnPoolAvailWaitTime < time.Duration(cp.connCount-connCount) {
+ continue
+ }
+
+ // close overflow connections now that they are not needed
+ for c := range cp.connections {
+ select {
+ case <-cp.bailOut:
+ return
+ default:
+ }
+
+ // bail out if close did not work out
+ if !cp.connCleanup(c) {
+ return
+ }
+ if len(cp.connections) <= cp.poolSize {
+ break
+ }
+ }
+ }
+}
+
+// close connection with recovery on error
+func (cp *connectionPool) connCleanup(c *memcached.Client) (rv bool) {
+
+ // just in case we are closing a connection after
+ // bailOut has been sent but we haven't yet read it
+ defer func() {
+ if recover() != nil {
+ rv = false
+ }
+ }()
+ rv = true
+
+ c.Close()
+ <-cp.createsem
+ return
+}
+
+func (cp *connectionPool) StartTapFeed(args *memcached.TapArguments) (*memcached.TapFeed, error) {
+ if cp == nil {
+ return nil, errNoPool
+ }
+ mc, err := cp.Get()
+ if err != nil {
+ return nil, err
+ }
+
+ // A connection can't be used after TAP; Dont' count it against the
+ // connection pool capacity
+ <-cp.createsem
+
+ return mc.StartTapFeed(*args)
+}
+
+const DEFAULT_WINDOW_SIZE = 20 * 1024 * 1024 // 20 Mb
+
+func (cp *connectionPool) StartUprFeed(name string, sequence uint32, dcp_buffer_size uint32, data_chan_size int) (*memcached.UprFeed, error) {
+ if cp == nil {
+ return nil, errNoPool
+ }
+ mc, err := cp.Get()
+ if err != nil {
+ return nil, err
+ }
+
+ // A connection can't be used after it has been allocated to UPR;
+ // Dont' count it against the connection pool capacity
+ <-cp.createsem
+
+ uf, err := mc.NewUprFeed()
+ if err != nil {
+ return nil, err
+ }
+
+ if err := uf.UprOpen(name, sequence, dcp_buffer_size); err != nil {
+ return nil, err
+ }
+
+ if err := uf.StartFeedWithConfig(data_chan_size); err != nil {
+ return nil, err
+ }
+
+ return uf, nil
+}