summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/couchbase/gomemcached
diff options
context:
space:
mode:
authortechknowlogick <matti@mdranta.net>2019-02-05 11:52:51 -0500
committerGitHub <noreply@github.com>2019-02-05 11:52:51 -0500
commit9de871a0f8911030f8e06a881803cf722b8798ea (patch)
tree206400f0a5873d7d078fcdd004956036f07a1db5 /vendor/github.com/couchbase/gomemcached
parentbf4badad1d68c18d7ffb92c69e09e4e8aa252935 (diff)
downloadgitea-9de871a0f8911030f8e06a881803cf722b8798ea.tar.gz
gitea-9de871a0f8911030f8e06a881803cf722b8798ea.zip
add other session providers (#5963)
Diffstat (limited to 'vendor/github.com/couchbase/gomemcached')
-rw-r--r--vendor/github.com/couchbase/gomemcached/LICENSE19
-rw-r--r--vendor/github.com/couchbase/gomemcached/client/mc.go1074
-rw-r--r--vendor/github.com/couchbase/gomemcached/client/tap_feed.go333
-rw-r--r--vendor/github.com/couchbase/gomemcached/client/transport.go67
-rw-r--r--vendor/github.com/couchbase/gomemcached/client/upr_feed.go1005
-rw-r--r--vendor/github.com/couchbase/gomemcached/mc_constants.go335
-rw-r--r--vendor/github.com/couchbase/gomemcached/mc_req.go197
-rw-r--r--vendor/github.com/couchbase/gomemcached/mc_res.go267
-rw-r--r--vendor/github.com/couchbase/gomemcached/tap.go168
9 files changed, 3465 insertions, 0 deletions
diff --git a/vendor/github.com/couchbase/gomemcached/LICENSE b/vendor/github.com/couchbase/gomemcached/LICENSE
new file mode 100644
index 0000000000..b01ef80261
--- /dev/null
+++ b/vendor/github.com/couchbase/gomemcached/LICENSE
@@ -0,0 +1,19 @@
+Copyright (c) 2013 Dustin Sallings
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/vendor/github.com/couchbase/gomemcached/client/mc.go b/vendor/github.com/couchbase/gomemcached/client/mc.go
new file mode 100644
index 0000000000..bd1433ba28
--- /dev/null
+++ b/vendor/github.com/couchbase/gomemcached/client/mc.go
@@ -0,0 +1,1074 @@
+// Package memcached provides a memcached binary protocol client.
+package memcached
+
+import (
+ "encoding/binary"
+ "fmt"
+ "github.com/couchbase/gomemcached"
+ "github.com/couchbase/goutils/logging"
+ "github.com/couchbase/goutils/scramsha"
+ "github.com/pkg/errors"
+ "io"
+ "math"
+ "net"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+type ClientIface interface {
+ Add(vb uint16, key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error)
+ Append(vb uint16, key string, data []byte) (*gomemcached.MCResponse, error)
+ Auth(user, pass string) (*gomemcached.MCResponse, error)
+ AuthList() (*gomemcached.MCResponse, error)
+ AuthPlain(user, pass string) (*gomemcached.MCResponse, error)
+ AuthScramSha(user, pass string) (*gomemcached.MCResponse, error)
+ CASNext(vb uint16, k string, exp int, state *CASState) bool
+ CAS(vb uint16, k string, f CasFunc, initexp int) (*gomemcached.MCResponse, error)
+ Close() error
+ Decr(vb uint16, key string, amt, def uint64, exp int) (uint64, error)
+ Del(vb uint16, key string) (*gomemcached.MCResponse, error)
+ EnableMutationToken() (*gomemcached.MCResponse, error)
+ Get(vb uint16, key string) (*gomemcached.MCResponse, error)
+ GetSubdoc(vb uint16, key string, subPaths []string) (*gomemcached.MCResponse, error)
+ GetAndTouch(vb uint16, key string, exp int) (*gomemcached.MCResponse, error)
+ GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string) error
+ GetMeta(vb uint16, key string) (*gomemcached.MCResponse, error)
+ GetRandomDoc() (*gomemcached.MCResponse, error)
+ Hijack() io.ReadWriteCloser
+ Incr(vb uint16, key string, amt, def uint64, exp int) (uint64, error)
+ Observe(vb uint16, key string) (result ObserveResult, err error)
+ ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error)
+ Receive() (*gomemcached.MCResponse, error)
+ ReceiveWithDeadline(deadline time.Time) (*gomemcached.MCResponse, error)
+ Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error)
+ Set(vb uint16, key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error)
+ SetKeepAliveOptions(interval time.Duration)
+ SetReadDeadline(t time.Time)
+ SetDeadline(t time.Time)
+ SelectBucket(bucket string) (*gomemcached.MCResponse, error)
+ SetCas(vb uint16, key string, flags int, exp int, cas uint64, body []byte) (*gomemcached.MCResponse, error)
+ Stats(key string) ([]StatValue, error)
+ StatsMap(key string) (map[string]string, error)
+ StatsMapForSpecifiedStats(key string, statsMap map[string]string) error
+ Transmit(req *gomemcached.MCRequest) error
+ TransmitWithDeadline(req *gomemcached.MCRequest, deadline time.Time) error
+ TransmitResponse(res *gomemcached.MCResponse) error
+
+ // UprFeed Related
+ NewUprFeed() (*UprFeed, error)
+ NewUprFeedIface() (UprFeedIface, error)
+ NewUprFeedWithConfig(ackByClient bool) (*UprFeed, error)
+ NewUprFeedWithConfigIface(ackByClient bool) (UprFeedIface, error)
+ UprGetFailoverLog(vb []uint16) (map[uint16]*FailoverLog, error)
+}
+
+const bufsize = 1024
+
+var UnHealthy uint32 = 0
+var Healthy uint32 = 1
+
+type Features []Feature
+type Feature uint16
+
+const FeatureMutationToken = Feature(0x04)
+const FeatureXattr = Feature(0x06)
+const FeatureDataType = Feature(0x0b)
+
+// The Client itself.
+type Client struct {
+ conn io.ReadWriteCloser
+ // use uint32 type so that it can be accessed through atomic APIs
+ healthy uint32
+ opaque uint32
+
+ hdrBuf []byte
+}
+
+var (
+ DefaultDialTimeout = time.Duration(0) // No timeout
+
+ DefaultWriteTimeout = time.Duration(0) // No timeout
+
+ dialFun = func(prot, dest string) (net.Conn, error) {
+ return net.DialTimeout(prot, dest, DefaultDialTimeout)
+ }
+)
+
+// Connect to a memcached server.
+func Connect(prot, dest string) (rv *Client, err error) {
+ conn, err := dialFun(prot, dest)
+ if err != nil {
+ return nil, err
+ }
+ return Wrap(conn)
+}
+
+func SetDefaultTimeouts(dial, read, write time.Duration) {
+ DefaultDialTimeout = dial
+ DefaultWriteTimeout = write
+}
+
+func SetDefaultDialTimeout(dial time.Duration) {
+ DefaultDialTimeout = dial
+}
+
+func (c *Client) SetKeepAliveOptions(interval time.Duration) {
+ c.conn.(*net.TCPConn).SetKeepAlive(true)
+ c.conn.(*net.TCPConn).SetKeepAlivePeriod(interval)
+}
+
+func (c *Client) SetReadDeadline(t time.Time) {
+ c.conn.(*net.TCPConn).SetReadDeadline(t)
+}
+
+func (c *Client) SetDeadline(t time.Time) {
+ c.conn.(*net.TCPConn).SetDeadline(t)
+}
+
+// Wrap an existing transport.
+func Wrap(rwc io.ReadWriteCloser) (rv *Client, err error) {
+ client := &Client{
+ conn: rwc,
+ hdrBuf: make([]byte, gomemcached.HDR_LEN),
+ opaque: uint32(1),
+ }
+ client.setHealthy(true)
+ return client, nil
+}
+
+// Close the connection when you're done.
+func (c *Client) Close() error {
+ return c.conn.Close()
+}
+
+// IsHealthy returns true unless the client is belived to have
+// difficulty communicating to its server.
+//
+// This is useful for connection pools where we want to
+// non-destructively determine that a connection may be reused.
+func (c Client) IsHealthy() bool {
+ healthyState := atomic.LoadUint32(&c.healthy)
+ return healthyState == Healthy
+}
+
+// Send a custom request and get the response.
+func (c *Client) Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error) {
+ err = c.Transmit(req)
+ if err != nil {
+ return
+ }
+ resp, _, err := getResponse(c.conn, c.hdrBuf)
+ c.setHealthy(!gomemcached.IsFatal(err))
+ return resp, err
+}
+
+// Transmit send a request, but does not wait for a response.
+func (c *Client) Transmit(req *gomemcached.MCRequest) error {
+ if DefaultWriteTimeout > 0 {
+ c.conn.(net.Conn).SetWriteDeadline(time.Now().Add(DefaultWriteTimeout))
+ }
+ _, err := transmitRequest(c.conn, req)
+ // clear write deadline to avoid interference with future write operations
+ if DefaultWriteTimeout > 0 {
+ c.conn.(net.Conn).SetWriteDeadline(time.Time{})
+ }
+ if err != nil {
+ c.setHealthy(false)
+ }
+ return err
+}
+
+func (c *Client) TransmitWithDeadline(req *gomemcached.MCRequest, deadline time.Time) error {
+ c.conn.(net.Conn).SetWriteDeadline(deadline)
+
+ _, err := transmitRequest(c.conn, req)
+
+ // clear write deadline to avoid interference with future write operations
+ c.conn.(net.Conn).SetWriteDeadline(time.Time{})
+
+ if err != nil {
+ c.setHealthy(false)
+ }
+ return err
+}
+
+// TransmitResponse send a response, does not wait.
+func (c *Client) TransmitResponse(res *gomemcached.MCResponse) error {
+ if DefaultWriteTimeout > 0 {
+ c.conn.(net.Conn).SetWriteDeadline(time.Now().Add(DefaultWriteTimeout))
+ }
+ _, err := transmitResponse(c.conn, res)
+ // clear write deadline to avoid interference with future write operations
+ if DefaultWriteTimeout > 0 {
+ c.conn.(net.Conn).SetWriteDeadline(time.Time{})
+ }
+ if err != nil {
+ c.setHealthy(false)
+ }
+ return err
+}
+
+// Receive a response
+func (c *Client) Receive() (*gomemcached.MCResponse, error) {
+ resp, _, err := getResponse(c.conn, c.hdrBuf)
+ if err != nil && resp.Status != gomemcached.KEY_ENOENT && resp.Status != gomemcached.EBUSY {
+ c.setHealthy(false)
+ }
+ return resp, err
+}
+
+func (c *Client) ReceiveWithDeadline(deadline time.Time) (*gomemcached.MCResponse, error) {
+ c.conn.(net.Conn).SetReadDeadline(deadline)
+
+ resp, _, err := getResponse(c.conn, c.hdrBuf)
+
+ // Clear read deadline to avoid interference with future read operations.
+ c.conn.(net.Conn).SetReadDeadline(time.Time{})
+
+ if err != nil && resp.Status != gomemcached.KEY_ENOENT && resp.Status != gomemcached.EBUSY {
+ c.setHealthy(false)
+ }
+ return resp, err
+}
+
+func appendMutationToken(bytes []byte) []byte {
+ bytes = append(bytes, 0, 0)
+ binary.BigEndian.PutUint16(bytes[len(bytes)-2:], uint16(0x04))
+ return bytes
+}
+
+//Send a hello command to enable MutationTokens
+func (c *Client) EnableMutationToken() (*gomemcached.MCResponse, error) {
+ var payload []byte
+ payload = appendMutationToken(payload)
+
+ return c.Send(&gomemcached.MCRequest{
+ Opcode: gomemcached.HELLO,
+ Key: []byte("GoMemcached"),
+ Body: payload,
+ })
+
+}
+
+//Send a hello command to enable specific features
+func (c *Client) EnableFeatures(features Features) (*gomemcached.MCResponse, error) {
+ var payload []byte
+
+ for _, feature := range features {
+ payload = append(payload, 0, 0)
+ binary.BigEndian.PutUint16(payload[len(payload)-2:], uint16(feature))
+ }
+
+ return c.Send(&gomemcached.MCRequest{
+ Opcode: gomemcached.HELLO,
+ Key: []byte("GoMemcached"),
+ Body: payload,
+ })
+
+}
+
+// Get the value for a key.
+func (c *Client) Get(vb uint16, key string) (*gomemcached.MCResponse, error) {
+ return c.Send(&gomemcached.MCRequest{
+ Opcode: gomemcached.GET,
+ VBucket: vb,
+ Key: []byte(key),
+ })
+}
+
+// Get the xattrs, doc value for the input key
+func (c *Client) GetSubdoc(vb uint16, key string, subPaths []string) (*gomemcached.MCResponse, error) {
+
+ extraBuf, valueBuf := GetSubDocVal(subPaths)
+ res, err := c.Send(&gomemcached.MCRequest{
+ Opcode: gomemcached.SUBDOC_MULTI_LOOKUP,
+ VBucket: vb,
+ Key: []byte(key),
+ Extras: extraBuf,
+ Body: valueBuf,
+ })
+
+ if err != nil && IfResStatusError(res) {
+ return res, err
+ }
+ return res, nil
+}
+
+// Get the value for a key, and update expiry
+func (c *Client) GetAndTouch(vb uint16, key string, exp int) (*gomemcached.MCResponse, error) {
+ extraBuf := make([]byte, 4)
+ binary.BigEndian.PutUint32(extraBuf[0:], uint32(exp))
+ return c.Send(&gomemcached.MCRequest{
+ Opcode: gomemcached.GAT,
+ VBucket: vb,
+ Key: []byte(key),
+ Extras: extraBuf,
+ })
+}
+
+// Get metadata for a key
+func (c *Client) GetMeta(vb uint16, key string) (*gomemcached.MCResponse, error) {
+ return c.Send(&gomemcached.MCRequest{
+ Opcode: gomemcached.GET_META,
+ VBucket: vb,
+ Key: []byte(key),
+ })
+}
+
+// Del deletes a key.
+func (c *Client) Del(vb uint16, key string) (*gomemcached.MCResponse, error) {
+ return c.Send(&gomemcached.MCRequest{
+ Opcode: gomemcached.DELETE,
+ VBucket: vb,
+ Key: []byte(key)})
+}
+
+// Get a random document
+func (c *Client) GetRandomDoc() (*gomemcached.MCResponse, error) {
+ return c.Send(&gomemcached.MCRequest{
+ Opcode: 0xB6,
+ })
+}
+
+// AuthList lists SASL auth mechanisms.
+func (c *Client) AuthList() (*gomemcached.MCResponse, error) {
+ return c.Send(&gomemcached.MCRequest{
+ Opcode: gomemcached.SASL_LIST_MECHS})
+}
+
+// Auth performs SASL PLAIN authentication against the server.
+func (c *Client) Auth(user, pass string) (*gomemcached.MCResponse, error) {
+ res, err := c.AuthList()
+
+ if err != nil {
+ return res, err
+ }
+
+ authMech := string(res.Body)
+ if strings.Index(authMech, "PLAIN") != -1 {
+ return c.AuthPlain(user, pass)
+ }
+ return nil, fmt.Errorf("auth mechanism PLAIN not supported")
+}
+
+// AuthScramSha performs SCRAM-SHA authentication against the server.
+func (c *Client) AuthScramSha(user, pass string) (*gomemcached.MCResponse, error) {
+ res, err := c.AuthList()
+ if err != nil {
+ return nil, errors.Wrap(err, "Unable to obtain list of methods.")
+ }
+
+ methods := string(res.Body)
+ method, err := scramsha.BestMethod(methods)
+ if err != nil {
+ return nil, errors.Wrap(err,
+ "Unable to select SCRAM-SHA method.")
+ }
+
+ s, err := scramsha.NewScramSha(method)
+ if err != nil {
+ return nil, errors.Wrap(err, "Unable to initialize scramsha.")
+ }
+
+ logging.Infof("Using %v authentication for user %v%v%v", method, gomemcached.UdTagBegin, user, gomemcached.UdTagEnd)
+
+ message, err := s.GetStartRequest(user)
+ if err != nil {
+ return nil, errors.Wrapf(err,
+ "Error building start request for user %s.", user)
+ }
+
+ startRequest := &gomemcached.MCRequest{
+ Opcode: 0x21,
+ Key: []byte(method),
+ Body: []byte(message)}
+
+ startResponse, err := c.Send(startRequest)
+ if err != nil {
+ return nil, errors.Wrap(err, "Error sending start request.")
+ }
+
+ err = s.HandleStartResponse(string(startResponse.Body))
+ if err != nil {
+ return nil, errors.Wrap(err, "Error handling start response.")
+ }
+
+ message = s.GetFinalRequest(pass)
+
+ // send step request
+ finalRequest := &gomemcached.MCRequest{
+ Opcode: 0x22,
+ Key: []byte(method),
+ Body: []byte(message)}
+ finalResponse, err := c.Send(finalRequest)
+ if err != nil {
+ return nil, errors.Wrap(err, "Error sending final request.")
+ }
+
+ err = s.HandleFinalResponse(string(finalResponse.Body))
+ if err != nil {
+ return nil, errors.Wrap(err, "Error handling final response.")
+ }
+
+ return finalResponse, nil
+}
+
+func (c *Client) AuthPlain(user, pass string) (*gomemcached.MCResponse, error) {
+ logging.Infof("Using plain authentication for user %v%v%v", gomemcached.UdTagBegin, user, gomemcached.UdTagEnd)
+ return c.Send(&gomemcached.MCRequest{
+ Opcode: gomemcached.SASL_AUTH,
+ Key: []byte("PLAIN"),
+ Body: []byte(fmt.Sprintf("\x00%s\x00%s", user, pass))})
+}
+
+// select bucket
+func (c *Client) SelectBucket(bucket string) (*gomemcached.MCResponse, error) {
+
+ return c.Send(&gomemcached.MCRequest{
+ Opcode: gomemcached.SELECT_BUCKET,
+ Key: []byte(fmt.Sprintf("%s", bucket))})
+}
+
+func (c *Client) store(opcode gomemcached.CommandCode, vb uint16,
+ key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error) {
+
+ req := &gomemcached.MCRequest{
+ Opcode: opcode,
+ VBucket: vb,
+ Key: []byte(key),
+ Cas: 0,
+ Opaque: 0,
+ Extras: []byte{0, 0, 0, 0, 0, 0, 0, 0},
+ Body: body}
+
+ binary.BigEndian.PutUint64(req.Extras, uint64(flags)<<32|uint64(exp))
+ return c.Send(req)
+}
+
+func (c *Client) storeCas(opcode gomemcached.CommandCode, vb uint16,
+ key string, flags int, exp int, cas uint64, body []byte) (*gomemcached.MCResponse, error) {
+
+ req := &gomemcached.MCRequest{
+ Opcode: opcode,
+ VBucket: vb,
+ Key: []byte(key),
+ Cas: cas,
+ Opaque: 0,
+ Extras: []byte{0, 0, 0, 0, 0, 0, 0, 0},
+ Body: body}
+
+ binary.BigEndian.PutUint64(req.Extras, uint64(flags)<<32|uint64(exp))
+ return c.Send(req)
+}
+
+// Incr increments the value at the given key.
+func (c *Client) Incr(vb uint16, key string,
+ amt, def uint64, exp int) (uint64, error) {
+
+ req := &gomemcached.MCRequest{
+ Opcode: gomemcached.INCREMENT,
+ VBucket: vb,
+ Key: []byte(key),
+ Extras: make([]byte, 8+8+4),
+ }
+ binary.BigEndian.PutUint64(req.Extras[:8], amt)
+ binary.BigEndian.PutUint64(req.Extras[8:16], def)
+ binary.BigEndian.PutUint32(req.Extras[16:20], uint32(exp))
+
+ resp, err := c.Send(req)
+ if err != nil {
+ return 0, err
+ }
+
+ return binary.BigEndian.Uint64(resp.Body), nil
+}
+
+// Decr decrements the value at the given key.
+func (c *Client) Decr(vb uint16, key string,
+ amt, def uint64, exp int) (uint64, error) {
+
+ req := &gomemcached.MCRequest{
+ Opcode: gomemcached.DECREMENT,
+ VBucket: vb,
+ Key: []byte(key),
+ Extras: make([]byte, 8+8+4),
+ }
+ binary.BigEndian.PutUint64(req.Extras[:8], amt)
+ binary.BigEndian.PutUint64(req.Extras[8:16], def)
+ binary.BigEndian.PutUint32(req.Extras[16:20], uint32(exp))
+
+ resp, err := c.Send(req)
+ if err != nil {
+ return 0, err
+ }
+
+ return binary.BigEndian.Uint64(resp.Body), nil
+}
+
+// Add a value for a key (store if not exists).
+func (c *Client) Add(vb uint16, key string, flags int, exp int,
+ body []byte) (*gomemcached.MCResponse, error) {
+ return c.store(gomemcached.ADD, vb, key, flags, exp, body)
+}
+
+// Set the value for a key.
+func (c *Client) Set(vb uint16, key string, flags int, exp int,
+ body []byte) (*gomemcached.MCResponse, error) {
+ return c.store(gomemcached.SET, vb, key, flags, exp, body)
+}
+
+// SetCas set the value for a key with cas
+func (c *Client) SetCas(vb uint16, key string, flags int, exp int, cas uint64,
+ body []byte) (*gomemcached.MCResponse, error) {
+ return c.storeCas(gomemcached.SET, vb, key, flags, exp, cas, body)
+}
+
+// Append data to the value of a key.
+func (c *Client) Append(vb uint16, key string, data []byte) (*gomemcached.MCResponse, error) {
+ req := &gomemcached.MCRequest{
+ Opcode: gomemcached.APPEND,
+ VBucket: vb,
+ Key: []byte(key),
+ Cas: 0,
+ Opaque: 0,
+ Body: data}
+
+ return c.Send(req)
+}
+
+// GetBulk gets keys in bulk
+func (c *Client) GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string) error {
+ stopch := make(chan bool)
+ var wg sync.WaitGroup
+
+ defer func() {
+ close(stopch)
+ wg.Wait()
+ }()
+
+ if (math.MaxInt32 - c.opaque) < (uint32(len(keys)) + 1) {
+ c.opaque = uint32(1)
+ }
+
+ opStart := c.opaque
+
+ errch := make(chan error, 2)
+
+ wg.Add(1)
+ go func() {
+ defer func() {
+ if r := recover(); r != nil {
+ logging.Infof("Recovered in f %v", r)
+ }
+ errch <- nil
+ wg.Done()
+ }()
+
+ ok := true
+ for ok {
+
+ select {
+ case <-stopch:
+ return
+ default:
+ res, err := c.Receive()
+
+ if err != nil && IfResStatusError(res) {
+ if res == nil || res.Status != gomemcached.KEY_ENOENT {
+ errch <- err
+ return
+ }
+ // continue receiving in case of KEY_ENOENT
+ } else if res.Opcode == gomemcached.GET ||
+ res.Opcode == gomemcached.SUBDOC_GET ||
+ res.Opcode == gomemcached.SUBDOC_MULTI_LOOKUP {
+ opaque := res.Opaque - opStart
+ if opaque < 0 || opaque >= uint32(len(keys)) {
+ // Every now and then we seem to be seeing an invalid opaque
+ // value returned from the server. When this happens log the error
+ // and the calling function will retry the bulkGet. MB-15140
+ logging.Errorf(" Invalid opaque Value. Debug info : Res.opaque : %v(%v), Keys %v, Response received %v \n key list %v this key %v", res.Opaque, opaque, len(keys), res, keys, string(res.Body))
+ errch <- fmt.Errorf("Out of Bounds error")
+ return
+ }
+
+ rv[keys[opaque]] = res
+ }
+
+ if res.Opcode == gomemcached.NOOP {
+ ok = false
+ }
+ }
+ }
+ }()
+
+ memcachedReqPkt := &gomemcached.MCRequest{
+ Opcode: gomemcached.GET,
+ VBucket: vb,
+ }
+
+ if len(subPaths) > 0 {
+ extraBuf, valueBuf := GetSubDocVal(subPaths)
+ memcachedReqPkt.Opcode = gomemcached.SUBDOC_MULTI_LOOKUP
+ memcachedReqPkt.Extras = extraBuf
+ memcachedReqPkt.Body = valueBuf
+ }
+
+ for _, k := range keys { // Start of Get request
+ memcachedReqPkt.Key = []byte(k)
+ memcachedReqPkt.Opaque = c.opaque
+
+ err := c.Transmit(memcachedReqPkt)
+ if err != nil {
+ logging.Errorf(" Transmit failed in GetBulkAll %v", err)
+ return err
+ }
+ c.opaque++
+ } // End of Get request
+
+ // finally transmit a NOOP
+ err := c.Transmit(&gomemcached.MCRequest{
+ Opcode: gomemcached.NOOP,
+ VBucket: vb,
+ Opaque: c.opaque,
+ })
+
+ if err != nil {
+ logging.Errorf(" Transmit of NOOP failed %v", err)
+ return err
+ }
+ c.opaque++
+
+ return <-errch
+}
+
+func GetSubDocVal(subPaths []string) (extraBuf, valueBuf []byte) {
+
+ var ops []string
+ totalBytesLen := 0
+ num := 1
+
+ for _, v := range subPaths {
+ totalBytesLen = totalBytesLen + len([]byte(v))
+ ops = append(ops, v)
+ num = num + 1
+ }
+
+ // Xattr retrieval - subdoc multi get
+ extraBuf = append(extraBuf, uint8(0x04))
+
+ valueBuf = make([]byte, num*4+totalBytesLen)
+
+ //opcode for subdoc get
+ op := gomemcached.SUBDOC_GET
+
+ // Calculate path total bytes
+ // There are 2 ops - get xattrs - both input and $document and get whole doc
+ valIter := 0
+
+ for _, v := range ops {
+ pathBytes := []byte(v)
+ valueBuf[valIter+0] = uint8(op)
+
+ // SubdocFlagXattrPath indicates that the path refers to
+ // an Xattr rather than the document body.
+ valueBuf[valIter+1] = uint8(gomemcached.SUBDOC_FLAG_XATTR)
+
+ // 2 byte key
+ binary.BigEndian.PutUint16(valueBuf[valIter+2:], uint16(len(pathBytes)))
+
+ // Then n bytes path
+ copy(valueBuf[valIter+4:], pathBytes)
+ valIter = valIter + 4 + len(pathBytes)
+ }
+
+ return
+}
+
+// ObservedStatus is the type reported by the Observe method
+type ObservedStatus uint8
+
+// Observation status values.
+const (
+ ObservedNotPersisted = ObservedStatus(0x00) // found, not persisted
+ ObservedPersisted = ObservedStatus(0x01) // found, persisted
+ ObservedNotFound = ObservedStatus(0x80) // not found (or a persisted delete)
+ ObservedLogicallyDeleted = ObservedStatus(0x81) // pending deletion (not persisted yet)
+)
+
+// ObserveResult represents the data obtained by an Observe call
+type ObserveResult struct {
+ Status ObservedStatus // Whether the value has been persisted/deleted
+ Cas uint64 // Current value's CAS
+ PersistenceTime time.Duration // Node's average time to persist a value
+ ReplicationTime time.Duration // Node's average time to replicate a value
+}
+
+// Observe gets the persistence/replication/CAS state of a key
+func (c *Client) Observe(vb uint16, key string) (result ObserveResult, err error) {
+ // http://www.couchbase.com/wiki/display/couchbase/Observe
+ body := make([]byte, 4+len(key))
+ binary.BigEndian.PutUint16(body[0:2], vb)
+ binary.BigEndian.PutUint16(body[2:4], uint16(len(key)))
+ copy(body[4:4+len(key)], key)
+
+ res, err := c.Send(&gomemcached.MCRequest{
+ Opcode: gomemcached.OBSERVE,
+ VBucket: vb,
+ Body: body,
+ })
+ if err != nil {
+ return
+ }
+
+ // Parse the response data from the body:
+ if len(res.Body) < 2+2+1 {
+ err = io.ErrUnexpectedEOF
+ return
+ }
+ outVb := binary.BigEndian.Uint16(res.Body[0:2])
+ keyLen := binary.BigEndian.Uint16(res.Body[2:4])
+ if len(res.Body) < 2+2+int(keyLen)+1+8 {
+ err = io.ErrUnexpectedEOF
+ return
+ }
+ outKey := string(res.Body[4 : 4+keyLen])
+ if outVb != vb || outKey != key {
+ err = fmt.Errorf("observe returned wrong vbucket/key: %d/%q", outVb, outKey)
+ return
+ }
+ result.Status = ObservedStatus(res.Body[4+keyLen])
+ result.Cas = binary.BigEndian.Uint64(res.Body[5+keyLen:])
+ // The response reuses the Cas field to store time statistics:
+ result.PersistenceTime = time.Duration(res.Cas>>32) * time.Millisecond
+ result.ReplicationTime = time.Duration(res.Cas&math.MaxUint32) * time.Millisecond
+ return
+}
+
+// CheckPersistence checks whether a stored value has been persisted to disk yet.
+func (result ObserveResult) CheckPersistence(cas uint64, deletion bool) (persisted bool, overwritten bool) {
+ switch {
+ case result.Status == ObservedNotFound && deletion:
+ persisted = true
+ case result.Cas != cas:
+ overwritten = true
+ case result.Status == ObservedPersisted:
+ persisted = true
+ }
+ return
+}
+
+// Sequence number based Observe Implementation
+type ObserveSeqResult struct {
+ Failover uint8 // Set to 1 if a failover took place
+ VbId uint16 // vbucket id
+ Vbuuid uint64 // vucket uuid
+ LastPersistedSeqNo uint64 // last persisted sequence number
+ CurrentSeqNo uint64 // current sequence number
+ OldVbuuid uint64 // Old bucket vbuuid
+ LastSeqNo uint64 // last sequence number received before failover
+}
+
+func (c *Client) ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error) {
+ // http://www.couchbase.com/wiki/display/couchbase/Observe
+ body := make([]byte, 8)
+ binary.BigEndian.PutUint64(body[0:8], vbuuid)
+
+ res, err := c.Send(&gomemcached.MCRequest{
+ Opcode: gomemcached.OBSERVE_SEQNO,
+ VBucket: vb,
+ Body: body,
+ Opaque: 0x01,
+ })
+ if err != nil {
+ return
+ }
+
+ if res.Status != gomemcached.SUCCESS {
+ return nil, fmt.Errorf(" Observe returned error %v", res.Status)
+ }
+
+ // Parse the response data from the body:
+ if len(res.Body) < (1 + 2 + 8 + 8 + 8) {
+ err = io.ErrUnexpectedEOF
+ return
+ }
+
+ result = &ObserveSeqResult{}
+ result.Failover = res.Body[0]
+ result.VbId = binary.BigEndian.Uint16(res.Body[1:3])
+ result.Vbuuid = binary.BigEndian.Uint64(res.Body[3:11])
+ result.LastPersistedSeqNo = binary.BigEndian.Uint64(res.Body[11:19])
+ result.CurrentSeqNo = binary.BigEndian.Uint64(res.Body[19:27])
+
+ // in case of failover processing we can have old vbuuid and the last persisted seq number
+ if result.Failover == 1 && len(res.Body) >= (1+2+8+8+8+8+8) {
+ result.OldVbuuid = binary.BigEndian.Uint64(res.Body[27:35])
+ result.LastSeqNo = binary.BigEndian.Uint64(res.Body[35:43])
+ }
+
+ return
+}
+
+// CasOp is the type of operation to perform on this CAS loop.
+type CasOp uint8
+
+const (
+ // CASStore instructs the server to store the new value normally
+ CASStore = CasOp(iota)
+ // CASQuit instructs the client to stop attempting to CAS, leaving value untouched
+ CASQuit
+ // CASDelete instructs the server to delete the current value
+ CASDelete
+)
+
+// User specified termination is returned as an error.
+func (c CasOp) Error() string {
+ switch c {
+ case CASStore:
+ return "CAS store"
+ case CASQuit:
+ return "CAS quit"
+ case CASDelete:
+ return "CAS delete"
+ }
+ panic("Unhandled value")
+}
+
+//////// CAS TRANSFORM
+
+// CASState tracks the state of CAS over several operations.
+//
+// This is used directly by CASNext and indirectly by CAS
+type CASState struct {
+ initialized bool // false on the first call to CASNext, then true
+ Value []byte // Current value of key; update in place to new value
+ Cas uint64 // Current CAS value of key
+ Exists bool // Does a value exist for the key? (If not, Value will be nil)
+ Err error // Error, if any, after CASNext returns false
+ resp *gomemcached.MCResponse
+}
+
+// CASNext is a non-callback, loop-based version of CAS method.
+//
+// Usage is like this:
+//
+// var state memcached.CASState
+// for client.CASNext(vb, key, exp, &state) {
+// state.Value = some_mutation(state.Value)
+// }
+// if state.Err != nil { ... }
+func (c *Client) CASNext(vb uint16, k string, exp int, state *CASState) bool {
+ if state.initialized {
+ if !state.Exists {
+ // Adding a new key:
+ if state.Value == nil {
+ state.Cas = 0
+ return false // no-op (delete of non-existent value)
+ }
+ state.resp, state.Err = c.Add(vb, k, 0, exp, state.Value)
+ } else {
+ // Updating / deleting a key:
+ req := &gomemcached.MCRequest{
+ Opcode: gomemcached.DELETE,
+ VBucket: vb,
+ Key: []byte(k),
+ Cas: state.Cas}
+ if state.Value != nil {
+ req.Opcode = gomemcached.SET
+ req.Opaque = 0
+ req.Extras = []byte{0, 0, 0, 0, 0, 0, 0, 0}
+ req.Body = state.Value
+
+ flags := 0
+ binary.BigEndian.PutUint64(req.Extras, uint64(flags)<<32|uint64(exp))
+ }
+ state.resp, state.Err = c.Send(req)
+ }
+
+ // If the response status is KEY_EEXISTS or NOT_STORED there's a conflict and we'll need to
+ // get the new value (below). Otherwise, we're done (either success or failure) so return:
+ if !(state.resp != nil && (state.resp.Status == gomemcached.KEY_EEXISTS ||
+ state.resp.Status == gomemcached.NOT_STORED)) {
+ state.Cas = state.resp.Cas
+ return false // either success or fatal error
+ }
+ }
+
+ // Initial call, or after a conflict: GET the current value and CAS and return them:
+ state.initialized = true
+ if state.resp, state.Err = c.Get(vb, k); state.Err == nil {
+ state.Exists = true
+ state.Value = state.resp.Body
+ state.Cas = state.resp.Cas
+ } else if state.resp != nil && state.resp.Status == gomemcached.KEY_ENOENT {
+ state.Err = nil
+ state.Exists = false
+ state.Value = nil
+ state.Cas = 0
+ } else {
+ return false // fatal error
+ }
+ return true // keep going...
+}
+
+// CasFunc is type type of function to perform a CAS transform.
+//
+// Input is the current value, or nil if no value exists.
+// The function should return the new value (if any) to set, and the store/quit/delete operation.
+type CasFunc func(current []byte) ([]byte, CasOp)
+
+// CAS performs a CAS transform with the given function.
+//
+// If the value does not exist, a nil current value will be sent to f.
+func (c *Client) CAS(vb uint16, k string, f CasFunc,
+ initexp int) (*gomemcached.MCResponse, error) {
+ var state CASState
+ for c.CASNext(vb, k, initexp, &state) {
+ newValue, operation := f(state.Value)
+ if operation == CASQuit || (operation == CASDelete && state.Value == nil) {
+ return nil, operation
+ }
+ state.Value = newValue
+ }
+ return state.resp, state.Err
+}
+
+// StatValue is one of the stats returned from the Stats method.
+type StatValue struct {
+ // The stat key
+ Key string
+ // The stat value
+ Val string
+}
+
+// Stats requests server-side stats.
+//
+// Use "" as the stat key for toplevel stats.
+func (c *Client) Stats(key string) ([]StatValue, error) {
+ rv := make([]StatValue, 0, 128)
+
+ req := &gomemcached.MCRequest{
+ Opcode: gomemcached.STAT,
+ Key: []byte(key),
+ Opaque: 918494,
+ }
+
+ err := c.Transmit(req)
+ if err != nil {
+ return rv, err
+ }
+
+ for {
+ res, _, err := getResponse(c.conn, c.hdrBuf)
+ if err != nil {
+ return rv, err
+ }
+ k := string(res.Key)
+ if k == "" {
+ break
+ }
+ rv = append(rv, StatValue{
+ Key: k,
+ Val: string(res.Body),
+ })
+ }
+ return rv, nil
+}
+
+// StatsMap requests server-side stats similarly to Stats, but returns
+// them as a map.
+//
+// Use "" as the stat key for toplevel stats.
+func (c *Client) StatsMap(key string) (map[string]string, error) {
+ rv := make(map[string]string)
+
+ req := &gomemcached.MCRequest{
+ Opcode: gomemcached.STAT,
+ Key: []byte(key),
+ Opaque: 918494,
+ }
+
+ err := c.Transmit(req)
+ if err != nil {
+ return rv, err
+ }
+
+ for {
+ res, _, err := getResponse(c.conn, c.hdrBuf)
+ if err != nil {
+ return rv, err
+ }
+ k := string(res.Key)
+ if k == "" {
+ break
+ }
+ rv[k] = string(res.Body)
+ }
+
+ return rv, nil
+}
+
+// instead of returning a new statsMap, simply populate passed in statsMap, which contains all the keys
+// for which stats needs to be retrieved
+func (c *Client) StatsMapForSpecifiedStats(key string, statsMap map[string]string) error {
+
+ // clear statsMap
+ for key, _ := range statsMap {
+ statsMap[key] = ""
+ }
+
+ req := &gomemcached.MCRequest{
+ Opcode: gomemcached.STAT,
+ Key: []byte(key),
+ Opaque: 918494,
+ }
+
+ err := c.Transmit(req)
+ if err != nil {
+ return err
+ }
+
+ for {
+ res, _, err := getResponse(c.conn, c.hdrBuf)
+ if err != nil {
+ return err
+ }
+ k := string(res.Key)
+ if k == "" {
+ break
+ }
+ if _, ok := statsMap[k]; ok {
+ statsMap[k] = string(res.Body)
+ }
+ }
+
+ return nil
+}
+
+// Hijack exposes the underlying connection from this client.
+//
+// It also marks the connection as unhealthy since the client will
+// have lost control over the connection and can't otherwise verify
+// things are in good shape for connection pools.
+func (c *Client) Hijack() io.ReadWriteCloser {
+ c.setHealthy(false)
+ return c.conn
+}
+
+func (c *Client) setHealthy(healthy bool) {
+ healthyState := UnHealthy
+ if healthy {
+ healthyState = Healthy
+ }
+ atomic.StoreUint32(&c.healthy, healthyState)
+}
+
+func IfResStatusError(response *gomemcached.MCResponse) bool {
+ return response == nil ||
+ (response.Status != gomemcached.SUBDOC_BAD_MULTI &&
+ response.Status != gomemcached.SUBDOC_PATH_NOT_FOUND &&
+ response.Status != gomemcached.SUBDOC_MULTI_PATH_FAILURE_DELETED)
+}
diff --git a/vendor/github.com/couchbase/gomemcached/client/tap_feed.go b/vendor/github.com/couchbase/gomemcached/client/tap_feed.go
new file mode 100644
index 0000000000..fd628c5de2
--- /dev/null
+++ b/vendor/github.com/couchbase/gomemcached/client/tap_feed.go
@@ -0,0 +1,333 @@
+package memcached
+
+import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "io"
+ "math"
+
+ "github.com/couchbase/gomemcached"
+ "github.com/couchbase/goutils/logging"
+)
+
+// TAP protocol docs: <http://www.couchbase.com/wiki/display/couchbase/TAP+Protocol>
+
+// TapOpcode is the tap operation type (found in TapEvent)
+type TapOpcode uint8
+
+// Tap opcode values.
+const (
+ TapBeginBackfill = TapOpcode(iota)
+ TapEndBackfill
+ TapMutation
+ TapDeletion
+ TapCheckpointStart
+ TapCheckpointEnd
+ tapEndStream
+)
+
+const tapMutationExtraLen = 16
+
+var tapOpcodeNames map[TapOpcode]string
+
+func init() {
+ tapOpcodeNames = map[TapOpcode]string{
+ TapBeginBackfill: "BeginBackfill",
+ TapEndBackfill: "EndBackfill",
+ TapMutation: "Mutation",
+ TapDeletion: "Deletion",
+ TapCheckpointStart: "TapCheckpointStart",
+ TapCheckpointEnd: "TapCheckpointEnd",
+ tapEndStream: "EndStream",
+ }
+}
+
+func (opcode TapOpcode) String() string {
+ name := tapOpcodeNames[opcode]
+ if name == "" {
+ name = fmt.Sprintf("#%d", opcode)
+ }
+ return name
+}
+
+// TapEvent is a TAP notification of an operation on the server.
+type TapEvent struct {
+ Opcode TapOpcode // Type of event
+ VBucket uint16 // VBucket this event applies to
+ Flags uint32 // Item flags
+ Expiry uint32 // Item expiration time
+ Key, Value []byte // Item key/value
+ Cas uint64
+}
+
+func makeTapEvent(req gomemcached.MCRequest) *TapEvent {
+ event := TapEvent{
+ VBucket: req.VBucket,
+ }
+ switch req.Opcode {
+ case gomemcached.TAP_MUTATION:
+ event.Opcode = TapMutation
+ event.Key = req.Key
+ event.Value = req.Body
+ event.Cas = req.Cas
+ case gomemcached.TAP_DELETE:
+ event.Opcode = TapDeletion
+ event.Key = req.Key
+ event.Cas = req.Cas
+ case gomemcached.TAP_CHECKPOINT_START:
+ event.Opcode = TapCheckpointStart
+ case gomemcached.TAP_CHECKPOINT_END:
+ event.Opcode = TapCheckpointEnd
+ case gomemcached.TAP_OPAQUE:
+ if len(req.Extras) < 8+4 {
+ return nil
+ }
+ switch op := int(binary.BigEndian.Uint32(req.Extras[8:])); op {
+ case gomemcached.TAP_OPAQUE_INITIAL_VBUCKET_STREAM:
+ event.Opcode = TapBeginBackfill
+ case gomemcached.TAP_OPAQUE_CLOSE_BACKFILL:
+ event.Opcode = TapEndBackfill
+ case gomemcached.TAP_OPAQUE_CLOSE_TAP_STREAM:
+ event.Opcode = tapEndStream
+ case gomemcached.TAP_OPAQUE_ENABLE_AUTO_NACK:
+ return nil
+ case gomemcached.TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC:
+ return nil
+ default:
+ logging.Infof("TapFeed: Ignoring TAP_OPAQUE/%d", op)
+ return nil // unknown opaque event
+ }
+ case gomemcached.NOOP:
+ return nil // ignore
+ default:
+ logging.Infof("TapFeed: Ignoring %s", req.Opcode)
+ return nil // unknown event
+ }
+
+ if len(req.Extras) >= tapMutationExtraLen &&
+ (event.Opcode == TapMutation || event.Opcode == TapDeletion) {
+
+ event.Flags = binary.BigEndian.Uint32(req.Extras[8:])
+ event.Expiry = binary.BigEndian.Uint32(req.Extras[12:])
+ }
+
+ return &event
+}
+
+func (event TapEvent) String() string {
+ switch event.Opcode {
+ case TapBeginBackfill, TapEndBackfill, TapCheckpointStart, TapCheckpointEnd:
+ return fmt.Sprintf("<TapEvent %s, vbucket=%d>",
+ event.Opcode, event.VBucket)
+ default:
+ return fmt.Sprintf("<TapEvent %s, key=%q (%d bytes) flags=%x, exp=%d>",
+ event.Opcode, event.Key, len(event.Value),
+ event.Flags, event.Expiry)
+ }
+}
+
+// TapArguments are parameters for requesting a TAP feed.
+//
+// Call DefaultTapArguments to get a default one.
+type TapArguments struct {
+ // Timestamp of oldest item to send.
+ //
+ // Use TapNoBackfill to suppress all past items.
+ Backfill uint64
+ // If set, server will disconnect after sending existing items.
+ Dump bool
+ // The indices of the vbuckets to watch; empty/nil to watch all.
+ VBuckets []uint16
+ // Transfers ownership of vbuckets during cluster rebalance.
+ Takeover bool
+ // If true, server will wait for client ACK after every notification.
+ SupportAck bool
+ // If true, client doesn't want values so server shouldn't send them.
+ KeysOnly bool
+ // If true, client wants the server to send checkpoint events.
+ Checkpoint bool
+ // Optional identifier to use for this client, to allow reconnects
+ ClientName string
+ // Registers this client (by name) till explicitly deregistered.
+ RegisteredClient bool
+}
+
+// Value for TapArguments.Backfill denoting that no past events at all
+// should be sent.
+const TapNoBackfill = math.MaxUint64
+
+// DefaultTapArguments returns a default set of parameter values to
+// pass to StartTapFeed.
+func DefaultTapArguments() TapArguments {
+ return TapArguments{
+ Backfill: TapNoBackfill,
+ }
+}
+
+func (args *TapArguments) flags() []byte {
+ var flags gomemcached.TapConnectFlag
+ if args.Backfill != 0 {
+ flags |= gomemcached.BACKFILL
+ }
+ if args.Dump {
+ flags |= gomemcached.DUMP
+ }
+ if len(args.VBuckets) > 0 {
+ flags |= gomemcached.LIST_VBUCKETS
+ }
+ if args.Takeover {
+ flags |= gomemcached.TAKEOVER_VBUCKETS
+ }
+ if args.SupportAck {
+ flags |= gomemcached.SUPPORT_ACK
+ }
+ if args.KeysOnly {
+ flags |= gomemcached.REQUEST_KEYS_ONLY
+ }
+ if args.Checkpoint {
+ flags |= gomemcached.CHECKPOINT
+ }
+ if args.RegisteredClient {
+ flags |= gomemcached.REGISTERED_CLIENT
+ }
+ encoded := make([]byte, 4)
+ binary.BigEndian.PutUint32(encoded, uint32(flags))
+ return encoded
+}
+
+func must(err error) {
+ if err != nil {
+ panic(err)
+ }
+}
+
+func (args *TapArguments) bytes() (rv []byte) {
+ buf := bytes.NewBuffer([]byte{})
+
+ if args.Backfill > 0 {
+ must(binary.Write(buf, binary.BigEndian, uint64(args.Backfill)))
+ }
+
+ if len(args.VBuckets) > 0 {
+ must(binary.Write(buf, binary.BigEndian, uint16(len(args.VBuckets))))
+ for i := 0; i < len(args.VBuckets); i++ {
+ must(binary.Write(buf, binary.BigEndian, uint16(args.VBuckets[i])))
+ }
+ }
+ return buf.Bytes()
+}
+
+// TapFeed represents a stream of events from a server.
+type TapFeed struct {
+ C <-chan TapEvent
+ Error error
+ closer chan bool
+}
+
+// StartTapFeed starts a TAP feed on a client connection.
+//
+// The events can be read from the returned channel. The connection
+// can no longer be used for other purposes; it's now reserved for
+// receiving the TAP messages. To stop receiving events, close the
+// client connection.
+func (mc *Client) StartTapFeed(args TapArguments) (*TapFeed, error) {
+ rq := &gomemcached.MCRequest{
+ Opcode: gomemcached.TAP_CONNECT,
+ Key: []byte(args.ClientName),
+ Extras: args.flags(),
+ Body: args.bytes()}
+
+ err := mc.Transmit(rq)
+ if err != nil {
+ return nil, err
+ }
+
+ ch := make(chan TapEvent)
+ feed := &TapFeed{
+ C: ch,
+ closer: make(chan bool),
+ }
+ go mc.runFeed(ch, feed)
+ return feed, nil
+}
+
+// TapRecvHook is called after every incoming tap packet is received.
+var TapRecvHook func(*gomemcached.MCRequest, int, error)
+
+// Internal goroutine that reads from the socket and writes events to
+// the channel
+func (mc *Client) runFeed(ch chan TapEvent, feed *TapFeed) {
+ defer close(ch)
+ var headerBuf [gomemcached.HDR_LEN]byte
+loop:
+ for {
+ // Read the next request from the server.
+ //
+ // (Can't call mc.Receive() because it reads a
+ // _response_ not a request.)
+ var pkt gomemcached.MCRequest
+ n, err := pkt.Receive(mc.conn, headerBuf[:])
+ if TapRecvHook != nil {
+ TapRecvHook(&pkt, n, err)
+ }
+
+ if err != nil {
+ if err != io.EOF {
+ feed.Error = err
+ }
+ break loop
+ }
+
+ //logging.Infof("** TapFeed received %#v : %q", pkt, pkt.Body)
+
+ if pkt.Opcode == gomemcached.TAP_CONNECT {
+ // This is not an event from the server; it's
+ // an error response to my connect request.
+ feed.Error = fmt.Errorf("tap connection failed: %s", pkt.Body)
+ break loop
+ }
+
+ event := makeTapEvent(pkt)
+ if event != nil {
+ if event.Opcode == tapEndStream {
+ break loop
+ }
+
+ select {
+ case ch <- *event:
+ case <-feed.closer:
+ break loop
+ }
+ }
+
+ if len(pkt.Extras) >= 4 {
+ reqFlags := binary.BigEndian.Uint16(pkt.Extras[2:])
+ if reqFlags&gomemcached.TAP_ACK != 0 {
+ if _, err := mc.sendAck(&pkt); err != nil {
+ feed.Error = err
+ break loop
+ }
+ }
+ }
+ }
+ if err := mc.Close(); err != nil {
+ logging.Errorf("Error closing memcached client: %v", err)
+ }
+}
+
+func (mc *Client) sendAck(pkt *gomemcached.MCRequest) (int, error) {
+ res := gomemcached.MCResponse{
+ Opcode: pkt.Opcode,
+ Opaque: pkt.Opaque,
+ Status: gomemcached.SUCCESS,
+ }
+ return res.Transmit(mc.conn)
+}
+
+// Close terminates a TapFeed.
+//
+// Call this if you stop using a TapFeed before its channel ends.
+func (feed *TapFeed) Close() {
+ close(feed.closer)
+}
diff --git a/vendor/github.com/couchbase/gomemcached/client/transport.go b/vendor/github.com/couchbase/gomemcached/client/transport.go
new file mode 100644
index 0000000000..f4cea17fca
--- /dev/null
+++ b/vendor/github.com/couchbase/gomemcached/client/transport.go
@@ -0,0 +1,67 @@
+package memcached
+
+import (
+ "errors"
+ "io"
+
+ "github.com/couchbase/gomemcached"
+)
+
+var errNoConn = errors.New("no connection")
+
+// UnwrapMemcachedError converts memcached errors to normal responses.
+//
+// If the error is a memcached response, declare the error to be nil
+// so a client can handle the status without worrying about whether it
+// indicates success or failure.
+func UnwrapMemcachedError(rv *gomemcached.MCResponse,
+ err error) (*gomemcached.MCResponse, error) {
+
+ if rv == err {
+ return rv, nil
+ }
+ return rv, err
+}
+
+// ReceiveHook is called after every packet is received (or attempted to be)
+var ReceiveHook func(*gomemcached.MCResponse, int, error)
+
+func getResponse(s io.Reader, hdrBytes []byte) (rv *gomemcached.MCResponse, n int, err error) {
+ if s == nil {
+ return nil, 0, errNoConn
+ }
+
+ rv = &gomemcached.MCResponse{}
+ n, err = rv.Receive(s, hdrBytes)
+
+ if ReceiveHook != nil {
+ ReceiveHook(rv, n, err)
+ }
+
+ if err == nil && (rv.Status != gomemcached.SUCCESS && rv.Status != gomemcached.AUTH_CONTINUE) {
+ err = rv
+ }
+ return rv, n, err
+}
+
+// TransmitHook is called after each packet is transmitted.
+var TransmitHook func(*gomemcached.MCRequest, int, error)
+
+func transmitRequest(o io.Writer, req *gomemcached.MCRequest) (int, error) {
+ if o == nil {
+ return 0, errNoConn
+ }
+ n, err := req.Transmit(o)
+ if TransmitHook != nil {
+ TransmitHook(req, n, err)
+ }
+ return n, err
+}
+
+func transmitResponse(o io.Writer, res *gomemcached.MCResponse) (int, error) {
+ if o == nil {
+ return 0, errNoConn
+ }
+ n, err := res.Transmit(o)
+ return n, err
+}
diff --git a/vendor/github.com/couchbase/gomemcached/client/upr_feed.go b/vendor/github.com/couchbase/gomemcached/client/upr_feed.go
new file mode 100644
index 0000000000..dc737e6cc0
--- /dev/null
+++ b/vendor/github.com/couchbase/gomemcached/client/upr_feed.go
@@ -0,0 +1,1005 @@
+// go implementation of upr client.
+// See https://github.com/couchbaselabs/cbupr/blob/master/transport-spec.md
+// TODO
+// 1. Use a pool allocator to avoid garbage
+package memcached
+
+import (
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "github.com/couchbase/gomemcached"
+ "github.com/couchbase/goutils/logging"
+ "strconv"
+ "sync"
+ "sync/atomic"
+)
+
+const uprMutationExtraLen = 30
+const uprDeletetionExtraLen = 18
+const uprDeletetionWithDeletionTimeExtraLen = 21
+const uprSnapshotExtraLen = 20
+const bufferAckThreshold = 0.2
+const opaqueOpen = 0xBEAF0001
+const opaqueFailover = 0xDEADBEEF
+const uprDefaultNoopInterval = 120
+
+// Counter on top of opaqueOpen that others can draw from for open and control msgs
+var opaqueOpenCtrlWell uint32 = opaqueOpen
+
+// UprEvent memcached events for UPR streams.
+type UprEvent struct {
+ Opcode gomemcached.CommandCode // Type of event
+ Status gomemcached.Status // Response status
+ VBucket uint16 // VBucket this event applies to
+ DataType uint8 // data type
+ Opaque uint16 // 16 MSB of opaque
+ VBuuid uint64 // This field is set by downstream
+ Flags uint32 // Item flags
+ Expiry uint32 // Item expiration time
+ Key, Value []byte // Item key/value
+ OldValue []byte // TODO: TBD: old document value
+ Cas uint64 // CAS value of the item
+ Seqno uint64 // sequence number of the mutation
+ RevSeqno uint64 // rev sequence number : deletions
+ LockTime uint32 // Lock time
+ MetadataSize uint16 // Metadata size
+ SnapstartSeq uint64 // start sequence number of this snapshot
+ SnapendSeq uint64 // End sequence number of the snapshot
+ SnapshotType uint32 // 0: disk 1: memory
+ FailoverLog *FailoverLog // Failover log containing vvuid and sequnce number
+ Error error // Error value in case of a failure
+ ExtMeta []byte
+ AckSize uint32 // The number of bytes that can be Acked to DCP
+}
+
+// UprStream is per stream data structure over an UPR Connection.
+type UprStream struct {
+ Vbucket uint16 // Vbucket id
+ Vbuuid uint64 // vbucket uuid
+ StartSeq uint64 // start sequence number
+ EndSeq uint64 // end sequence number
+ connected bool
+}
+
+const (
+ CompressionTypeStartMarker = iota // also means invalid
+ CompressionTypeNone = iota
+ CompressionTypeSnappy = iota
+ CompressionTypeEndMarker = iota // also means invalid
+)
+
+// kv_engine/include/mcbp/protocol/datatype.h
+const (
+ JSONDataType uint8 = 1
+ SnappyDataType uint8 = 2
+ XattrDataType uint8 = 4
+)
+
+type UprFeatures struct {
+ Xattribute bool
+ CompressionType int
+ IncludeDeletionTime bool
+}
+
+/**
+ * Used to handle multiple concurrent calls UprRequestStream() by UprFeed clients
+ * It is expected that a client that calls UprRequestStream() more than once should issue
+ * different "opaque" (version) numbers
+ */
+type opaqueStreamMap map[uint16]*UprStream // opaque -> stream
+
+type vbStreamNegotiator struct {
+ vbHandshakeMap map[uint16]opaqueStreamMap // vbno -> opaqueStreamMap
+ mutex sync.RWMutex
+}
+
+func (negotiator *vbStreamNegotiator) initialize() {
+ negotiator.mutex.Lock()
+ negotiator.vbHandshakeMap = make(map[uint16]opaqueStreamMap)
+ negotiator.mutex.Unlock()
+}
+
+func (negotiator *vbStreamNegotiator) registerRequest(vbno, opaque uint16, vbuuid, startSequence, endSequence uint64) {
+ negotiator.mutex.Lock()
+ defer negotiator.mutex.Unlock()
+
+ var osMap opaqueStreamMap
+ var ok bool
+ if osMap, ok = negotiator.vbHandshakeMap[vbno]; !ok {
+ osMap = make(opaqueStreamMap)
+ negotiator.vbHandshakeMap[vbno] = osMap
+ }
+
+ if _, ok = osMap[opaque]; !ok {
+ osMap[opaque] = &UprStream{
+ Vbucket: vbno,
+ Vbuuid: vbuuid,
+ StartSeq: startSequence,
+ EndSeq: endSequence,
+ }
+ }
+}
+
+func (negotiator *vbStreamNegotiator) getStreamsCntFromMap(vbno uint16) int {
+ negotiator.mutex.RLock()
+ defer negotiator.mutex.RUnlock()
+
+ osmap, ok := negotiator.vbHandshakeMap[vbno]
+ if !ok {
+ return 0
+ } else {
+ return len(osmap)
+ }
+}
+
+func (negotiator *vbStreamNegotiator) getStreamFromMap(vbno, opaque uint16) (*UprStream, error) {
+ negotiator.mutex.RLock()
+ defer negotiator.mutex.RUnlock()
+
+ osmap, ok := negotiator.vbHandshakeMap[vbno]
+ if !ok {
+ return nil, fmt.Errorf("Error: stream for vb: %v does not exist", vbno)
+ }
+
+ stream, ok := osmap[opaque]
+ if !ok {
+ return nil, fmt.Errorf("Error: stream for vb: %v opaque: %v does not exist", vbno, opaque)
+ }
+ return stream, nil
+}
+
+func (negotiator *vbStreamNegotiator) deleteStreamFromMap(vbno, opaque uint16) {
+ negotiator.mutex.Lock()
+ defer negotiator.mutex.Unlock()
+
+ osmap, ok := negotiator.vbHandshakeMap[vbno]
+ if !ok {
+ return
+ }
+
+ delete(osmap, opaque)
+ if len(osmap) == 0 {
+ delete(negotiator.vbHandshakeMap, vbno)
+ }
+}
+
+func (negotiator *vbStreamNegotiator) handleStreamRequest(feed *UprFeed,
+ headerBuf [gomemcached.HDR_LEN]byte, pktPtr *gomemcached.MCRequest, bytesReceivedFromDCP int,
+ response *gomemcached.MCResponse) (*UprEvent, error) {
+ var event *UprEvent
+
+ if feed == nil || response == nil || pktPtr == nil {
+ return nil, errors.New("Invalid inputs")
+ }
+
+ // Get Stream from negotiator map
+ vbno := vbOpaque(response.Opaque)
+ opaque := appOpaque(response.Opaque)
+
+ stream, err := negotiator.getStreamFromMap(vbno, opaque)
+ if err != nil {
+ err = fmt.Errorf("Stream not found for vb %d appOpaque %v: %#v", vbno, appOpaque, *pktPtr)
+ logging.Errorf(err.Error())
+ return nil, err
+ }
+
+ status, rb, flog, err := handleStreamRequest(response, headerBuf[:])
+
+ if status == gomemcached.ROLLBACK {
+ event = makeUprEvent(*pktPtr, stream, bytesReceivedFromDCP)
+ event.Status = status
+ // rollback stream
+ logging.Infof("UPR_STREAMREQ with rollback %d for vb %d Failed: %v", rb, vbno, err)
+ negotiator.deleteStreamFromMap(vbno, opaque)
+ } else if status == gomemcached.SUCCESS {
+ event = makeUprEvent(*pktPtr, stream, bytesReceivedFromDCP)
+ event.Seqno = stream.StartSeq
+ event.FailoverLog = flog
+ event.Status = status
+ feed.activateStream(vbno, opaque, stream)
+ feed.negotiator.deleteStreamFromMap(vbno, opaque)
+ logging.Infof("UPR_STREAMREQ for vb %d successful", vbno)
+
+ } else if err != nil {
+ logging.Errorf("UPR_STREAMREQ for vbucket %d erro %s", vbno, err.Error())
+ event = &UprEvent{
+ Opcode: gomemcached.UPR_STREAMREQ,
+ Status: status,
+ VBucket: vbno,
+ Error: err,
+ }
+ negotiator.deleteStreamFromMap(vbno, opaque)
+ }
+ return event, nil
+}
+
+func (negotiator *vbStreamNegotiator) cleanUpVbStreams(vbno uint16) {
+ negotiator.mutex.Lock()
+ defer negotiator.mutex.Unlock()
+
+ delete(negotiator.vbHandshakeMap, vbno)
+}
+
+// UprFeed represents an UPR feed. A feed contains a connection to a single
+// host and multiple vBuckets
+type UprFeed struct {
+ // lock for feed.vbstreams
+ muVbstreams sync.RWMutex
+ // lock for feed.closed
+ muClosed sync.RWMutex
+ C <-chan *UprEvent // Exported channel for receiving UPR events
+ negotiator vbStreamNegotiator // Used for pre-vbstreams, concurrent vb stream negotiation
+ vbstreams map[uint16]*UprStream // official live vb->stream mapping
+ closer chan bool // closer
+ conn *Client // connection to UPR producer
+ Error error // error
+ bytesRead uint64 // total bytes read on this connection
+ toAckBytes uint32 // bytes client has read
+ maxAckBytes uint32 // Max buffer control ack bytes
+ stats UprStats // Stats for upr client
+ transmitCh chan *gomemcached.MCRequest // transmit command channel
+ transmitCl chan bool // closer channel for transmit go-routine
+ closed bool // flag indicating whether the feed has been closed
+ // flag indicating whether client of upr feed will send ack to upr feed
+ // if flag is true, upr feed will use ack from client to determine whether/when to send ack to DCP
+ // if flag is false, upr feed will track how many bytes it has sent to client
+ // and use that to determine whether/when to send ack to DCP
+ ackByClient bool
+}
+
+// Exported interface - to allow for mocking
+type UprFeedIface interface {
+ Close()
+ Closed() bool
+ CloseStream(vbno, opaqueMSB uint16) error
+ GetError() error
+ GetUprStats() *UprStats
+ ClientAck(event *UprEvent) error
+ GetUprEventCh() <-chan *UprEvent
+ StartFeed() error
+ StartFeedWithConfig(datachan_len int) error
+ UprOpen(name string, sequence uint32, bufSize uint32) error
+ UprOpenWithXATTR(name string, sequence uint32, bufSize uint32) error
+ UprOpenWithFeatures(name string, sequence uint32, bufSize uint32, features UprFeatures) (error, UprFeatures)
+ UprRequestStream(vbno, opaqueMSB uint16, flags uint32, vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error
+}
+
+type UprStats struct {
+ TotalBytes uint64
+ TotalMutation uint64
+ TotalBufferAckSent uint64
+ TotalSnapShot uint64
+}
+
+// FailoverLog containing vvuid and sequnce number
+type FailoverLog [][2]uint64
+
+// error codes
+var ErrorInvalidLog = errors.New("couchbase.errorInvalidLog")
+
+func (flogp *FailoverLog) Latest() (vbuuid, seqno uint64, err error) {
+ if flogp != nil {
+ flog := *flogp
+ latest := flog[len(flog)-1]
+ return latest[0], latest[1], nil
+ }
+ return vbuuid, seqno, ErrorInvalidLog
+}
+
+func makeUprEvent(rq gomemcached.MCRequest, stream *UprStream, bytesReceivedFromDCP int) *UprEvent {
+ event := &UprEvent{
+ Opcode: rq.Opcode,
+ VBucket: stream.Vbucket,
+ VBuuid: stream.Vbuuid,
+ Key: rq.Key,
+ Value: rq.Body,
+ Cas: rq.Cas,
+ ExtMeta: rq.ExtMeta,
+ DataType: rq.DataType,
+ }
+
+ // set AckSize for events that need to be acked to DCP,
+ // i.e., events with CommandCodes that need to be buffered in DCP
+ if _, ok := gomemcached.BufferedCommandCodeMap[rq.Opcode]; ok {
+ event.AckSize = uint32(bytesReceivedFromDCP)
+ }
+
+ // 16 LSBits are used by client library to encode vbucket number.
+ // 16 MSBits are left for application to multiplex on opaque value.
+ event.Opaque = appOpaque(rq.Opaque)
+
+ if len(rq.Extras) >= uprMutationExtraLen &&
+ event.Opcode == gomemcached.UPR_MUTATION {
+
+ event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
+ event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
+ event.Flags = binary.BigEndian.Uint32(rq.Extras[16:20])
+ event.Expiry = binary.BigEndian.Uint32(rq.Extras[20:24])
+ event.LockTime = binary.BigEndian.Uint32(rq.Extras[24:28])
+ event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[28:30])
+
+ } else if len(rq.Extras) >= uprDeletetionWithDeletionTimeExtraLen &&
+ event.Opcode == gomemcached.UPR_DELETION {
+
+ event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
+ event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
+ event.Expiry = binary.BigEndian.Uint32(rq.Extras[16:20])
+
+ } else if len(rq.Extras) >= uprDeletetionExtraLen &&
+ event.Opcode == gomemcached.UPR_DELETION ||
+ event.Opcode == gomemcached.UPR_EXPIRATION {
+
+ event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
+ event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
+ event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[16:18])
+
+ } else if len(rq.Extras) >= uprSnapshotExtraLen &&
+ event.Opcode == gomemcached.UPR_SNAPSHOT {
+
+ event.SnapstartSeq = binary.BigEndian.Uint64(rq.Extras[:8])
+ event.SnapendSeq = binary.BigEndian.Uint64(rq.Extras[8:16])
+ event.SnapshotType = binary.BigEndian.Uint32(rq.Extras[16:20])
+ }
+
+ return event
+}
+
+func (event *UprEvent) String() string {
+ name := gomemcached.CommandNames[event.Opcode]
+ if name == "" {
+ name = fmt.Sprintf("#%d", event.Opcode)
+ }
+ return name
+}
+
+func (event *UprEvent) IsSnappyDataType() bool {
+ return event.Opcode == gomemcached.UPR_MUTATION && (event.DataType&SnappyDataType > 0)
+}
+
+func (feed *UprFeed) sendCommands(mc *Client) {
+ transmitCh := feed.transmitCh
+ transmitCl := feed.transmitCl
+loop:
+ for {
+ select {
+ case command := <-transmitCh:
+ if err := mc.Transmit(command); err != nil {
+ logging.Errorf("Failed to transmit command %s. Error %s", command.Opcode.String(), err.Error())
+ // get feed to close and runFeed routine to exit
+ feed.Close()
+ break loop
+ }
+
+ case <-transmitCl:
+ break loop
+ }
+ }
+
+ // After sendCommands exits, write to transmitCh will block forever
+ // when we write to transmitCh, e.g., at CloseStream(), we need to check feed closure to have an exit route
+
+ logging.Infof("sendCommands exiting")
+}
+
+// Sets the specified stream as the connected stream for this vbno, and also cleans up negotiator
+func (feed *UprFeed) activateStream(vbno, opaque uint16, stream *UprStream) error {
+ feed.muVbstreams.Lock()
+ defer feed.muVbstreams.Unlock()
+
+ // Set this stream as the officially connected stream for this vb
+ stream.connected = true
+ feed.vbstreams[vbno] = stream
+ return nil
+}
+
+func (feed *UprFeed) cleanUpVbStream(vbno uint16) {
+ feed.muVbstreams.Lock()
+ defer feed.muVbstreams.Unlock()
+
+ delete(feed.vbstreams, vbno)
+}
+
+// NewUprFeed creates a new UPR Feed.
+// TODO: Describe side-effects on bucket instance and its connection pool.
+func (mc *Client) NewUprFeed() (*UprFeed, error) {
+ return mc.NewUprFeedWithConfig(false /*ackByClient*/)
+}
+
+func (mc *Client) NewUprFeedWithConfig(ackByClient bool) (*UprFeed, error) {
+
+ feed := &UprFeed{
+ conn: mc,
+ closer: make(chan bool, 1),
+ vbstreams: make(map[uint16]*UprStream),
+ transmitCh: make(chan *gomemcached.MCRequest),
+ transmitCl: make(chan bool),
+ ackByClient: ackByClient,
+ }
+
+ feed.negotiator.initialize()
+
+ go feed.sendCommands(mc)
+ return feed, nil
+}
+
+func (mc *Client) NewUprFeedIface() (UprFeedIface, error) {
+ return mc.NewUprFeed()
+}
+
+func (mc *Client) NewUprFeedWithConfigIface(ackByClient bool) (UprFeedIface, error) {
+ return mc.NewUprFeedWithConfig(ackByClient)
+}
+
+func doUprOpen(mc *Client, name string, sequence uint32, features UprFeatures) error {
+ rq := &gomemcached.MCRequest{
+ Opcode: gomemcached.UPR_OPEN,
+ Key: []byte(name),
+ Opaque: getUprOpenCtrlOpaque(),
+ }
+
+ rq.Extras = make([]byte, 8)
+ binary.BigEndian.PutUint32(rq.Extras[:4], sequence)
+
+ // opens a producer type connection
+ flags := gomemcached.DCP_PRODUCER
+ if features.Xattribute {
+ flags = flags | gomemcached.DCP_OPEN_INCLUDE_XATTRS
+ }
+ if features.IncludeDeletionTime {
+ flags = flags | gomemcached.DCP_OPEN_INCLUDE_DELETE_TIMES
+ }
+ binary.BigEndian.PutUint32(rq.Extras[4:], flags)
+
+ return sendMcRequestSync(mc, rq)
+}
+
+// Synchronously send a memcached request and wait for the response
+func sendMcRequestSync(mc *Client, req *gomemcached.MCRequest) error {
+ if err := mc.Transmit(req); err != nil {
+ return err
+ }
+
+ if res, err := mc.Receive(); err != nil {
+ return err
+ } else if req.Opcode != res.Opcode {
+ return fmt.Errorf("unexpected #opcode sent %v received %v", req.Opcode, res.Opaque)
+ } else if req.Opaque != res.Opaque {
+ return fmt.Errorf("opaque mismatch, sent %v received %v", req.Opaque, res.Opaque)
+ } else if res.Status != gomemcached.SUCCESS {
+ return fmt.Errorf("error %v", res.Status)
+ }
+ return nil
+}
+
+// UprOpen to connect with a UPR producer.
+// Name: name of te UPR connection
+// sequence: sequence number for the connection
+// bufsize: max size of the application
+func (feed *UprFeed) UprOpen(name string, sequence uint32, bufSize uint32) error {
+ var allFeaturesDisabled UprFeatures
+ err, _ := feed.uprOpen(name, sequence, bufSize, allFeaturesDisabled)
+ return err
+}
+
+// UprOpen with XATTR enabled.
+func (feed *UprFeed) UprOpenWithXATTR(name string, sequence uint32, bufSize uint32) error {
+ var onlyXattrEnabled UprFeatures
+ onlyXattrEnabled.Xattribute = true
+ err, _ := feed.uprOpen(name, sequence, bufSize, onlyXattrEnabled)
+ return err
+}
+
+func (feed *UprFeed) UprOpenWithFeatures(name string, sequence uint32, bufSize uint32, features UprFeatures) (error, UprFeatures) {
+ return feed.uprOpen(name, sequence, bufSize, features)
+}
+
+func (feed *UprFeed) uprOpen(name string, sequence uint32, bufSize uint32, features UprFeatures) (err error, activatedFeatures UprFeatures) {
+ mc := feed.conn
+
+ // First set this to an invalid value to state that the method hasn't gotten to executing this control yet
+ activatedFeatures.CompressionType = CompressionTypeEndMarker
+
+ if err = doUprOpen(mc, name, sequence, features); err != nil {
+ return
+ }
+
+ activatedFeatures.Xattribute = features.Xattribute
+
+ // send a UPR control message to set the window size for the this connection
+ if bufSize > 0 {
+ rq := &gomemcached.MCRequest{
+ Opcode: gomemcached.UPR_CONTROL,
+ Key: []byte("connection_buffer_size"),
+ Body: []byte(strconv.Itoa(int(bufSize))),
+ Opaque: getUprOpenCtrlOpaque(),
+ }
+ err = sendMcRequestSync(feed.conn, rq)
+ if err != nil {
+ return
+ }
+ feed.maxAckBytes = uint32(bufferAckThreshold * float32(bufSize))
+ }
+
+ // enable noop and set noop interval
+ rq := &gomemcached.MCRequest{
+ Opcode: gomemcached.UPR_CONTROL,
+ Key: []byte("enable_noop"),
+ Body: []byte("true"),
+ Opaque: getUprOpenCtrlOpaque(),
+ }
+ err = sendMcRequestSync(feed.conn, rq)
+ if err != nil {
+ return
+ }
+
+ rq = &gomemcached.MCRequest{
+ Opcode: gomemcached.UPR_CONTROL,
+ Key: []byte("set_noop_interval"),
+ Body: []byte(strconv.Itoa(int(uprDefaultNoopInterval))),
+ Opaque: getUprOpenCtrlOpaque(),
+ }
+ err = sendMcRequestSync(feed.conn, rq)
+ if err != nil {
+ return
+ }
+
+ if features.CompressionType == CompressionTypeSnappy {
+ activatedFeatures.CompressionType = CompressionTypeNone
+ rq = &gomemcached.MCRequest{
+ Opcode: gomemcached.UPR_CONTROL,
+ Key: []byte("force_value_compression"),
+ Body: []byte("true"),
+ Opaque: getUprOpenCtrlOpaque(),
+ }
+ err = sendMcRequestSync(feed.conn, rq)
+ } else if features.CompressionType == CompressionTypeEndMarker {
+ err = fmt.Errorf("UPR_CONTROL Failed - Invalid CompressionType: %v", features.CompressionType)
+ }
+ if err != nil {
+ return
+ }
+ activatedFeatures.CompressionType = features.CompressionType
+
+ return
+}
+
+// UprGetFailoverLog for given list of vbuckets.
+func (mc *Client) UprGetFailoverLog(
+ vb []uint16) (map[uint16]*FailoverLog, error) {
+
+ rq := &gomemcached.MCRequest{
+ Opcode: gomemcached.UPR_FAILOVERLOG,
+ Opaque: opaqueFailover,
+ }
+
+ var allFeaturesDisabled UprFeatures
+ if err := doUprOpen(mc, "FailoverLog", 0, allFeaturesDisabled); err != nil {
+ return nil, fmt.Errorf("UPR_OPEN Failed %s", err.Error())
+ }
+
+ failoverLogs := make(map[uint16]*FailoverLog)
+ for _, vBucket := range vb {
+ rq.VBucket = vBucket
+ if err := mc.Transmit(rq); err != nil {
+ return nil, err
+ }
+ res, err := mc.Receive()
+
+ if err != nil {
+ return nil, fmt.Errorf("failed to receive %s", err.Error())
+ } else if res.Opcode != gomemcached.UPR_FAILOVERLOG || res.Status != gomemcached.SUCCESS {
+ return nil, fmt.Errorf("unexpected #opcode %v", res.Opcode)
+ }
+
+ flog, err := parseFailoverLog(res.Body)
+ if err != nil {
+ return nil, fmt.Errorf("unable to parse failover logs for vb %d", vb)
+ }
+ failoverLogs[vBucket] = flog
+ }
+
+ return failoverLogs, nil
+}
+
+// UprRequestStream for a single vbucket.
+func (feed *UprFeed) UprRequestStream(vbno, opaqueMSB uint16, flags uint32,
+ vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error {
+
+ rq := &gomemcached.MCRequest{
+ Opcode: gomemcached.UPR_STREAMREQ,
+ VBucket: vbno,
+ Opaque: composeOpaque(vbno, opaqueMSB),
+ }
+
+ rq.Extras = make([]byte, 48) // #Extras
+ binary.BigEndian.PutUint32(rq.Extras[:4], flags)
+ binary.BigEndian.PutUint32(rq.Extras[4:8], uint32(0))
+ binary.BigEndian.PutUint64(rq.Extras[8:16], startSequence)
+ binary.BigEndian.PutUint64(rq.Extras[16:24], endSequence)
+ binary.BigEndian.PutUint64(rq.Extras[24:32], vuuid)
+ binary.BigEndian.PutUint64(rq.Extras[32:40], snapStart)
+ binary.BigEndian.PutUint64(rq.Extras[40:48], snapEnd)
+
+ feed.negotiator.registerRequest(vbno, opaqueMSB, vuuid, startSequence, endSequence)
+ // Any client that has ever called this method, regardless of return code,
+ // should expect a potential UPR_CLOSESTREAM message due to this new map entry prior to Transmit.
+
+ if err := feed.conn.Transmit(rq); err != nil {
+ logging.Errorf("Error in StreamRequest %s", err.Error())
+ // If an error occurs during transmit, then the UPRFeed will keep the stream
+ // in the vbstreams map. This is to prevent nil lookup from any previously
+ // sent stream requests.
+ return err
+ }
+
+ return nil
+}
+
+// CloseStream for specified vbucket.
+func (feed *UprFeed) CloseStream(vbno, opaqueMSB uint16) error {
+
+ err := feed.validateCloseStream(vbno)
+ if err != nil {
+ logging.Infof("CloseStream for %v has been skipped because of error %v", vbno, err)
+ return err
+ }
+
+ closeStream := &gomemcached.MCRequest{
+ Opcode: gomemcached.UPR_CLOSESTREAM,
+ VBucket: vbno,
+ Opaque: composeOpaque(vbno, opaqueMSB),
+ }
+
+ feed.writeToTransmitCh(closeStream)
+
+ return nil
+}
+
+func (feed *UprFeed) GetUprEventCh() <-chan *UprEvent {
+ return feed.C
+}
+
+func (feed *UprFeed) GetError() error {
+ return feed.Error
+}
+
+func (feed *UprFeed) validateCloseStream(vbno uint16) error {
+ feed.muVbstreams.RLock()
+ nilVbStream := feed.vbstreams[vbno] == nil
+ feed.muVbstreams.RUnlock()
+
+ if nilVbStream && (feed.negotiator.getStreamsCntFromMap(vbno) == 0) {
+ return fmt.Errorf("Stream for vb %d has not been requested", vbno)
+ }
+
+ return nil
+}
+
+func (feed *UprFeed) writeToTransmitCh(rq *gomemcached.MCRequest) error {
+ // write to transmitCh may block forever if sendCommands has exited
+ // check for feed closure to have an exit route in this case
+ select {
+ case <-feed.closer:
+ errMsg := fmt.Sprintf("Abort sending request to transmitCh because feed has been closed. request=%v", rq)
+ logging.Infof(errMsg)
+ return errors.New(errMsg)
+ case feed.transmitCh <- rq:
+ }
+ return nil
+}
+
+// StartFeed to start the upper feed.
+func (feed *UprFeed) StartFeed() error {
+ return feed.StartFeedWithConfig(10)
+}
+
+func (feed *UprFeed) StartFeedWithConfig(datachan_len int) error {
+ ch := make(chan *UprEvent, datachan_len)
+ feed.C = ch
+ go feed.runFeed(ch)
+ return nil
+}
+
+func parseFailoverLog(body []byte) (*FailoverLog, error) {
+
+ if len(body)%16 != 0 {
+ err := fmt.Errorf("invalid body length %v, in failover-log", len(body))
+ return nil, err
+ }
+ log := make(FailoverLog, len(body)/16)
+ for i, j := 0, 0; i < len(body); i += 16 {
+ vuuid := binary.BigEndian.Uint64(body[i : i+8])
+ seqno := binary.BigEndian.Uint64(body[i+8 : i+16])
+ log[j] = [2]uint64{vuuid, seqno}
+ j++
+ }
+ return &log, nil
+}
+
+func handleStreamRequest(
+ res *gomemcached.MCResponse,
+ headerBuf []byte,
+) (gomemcached.Status, uint64, *FailoverLog, error) {
+
+ var rollback uint64
+ var err error
+
+ switch {
+ case res.Status == gomemcached.ROLLBACK:
+ logging.Infof("Rollback response. body=%v, headerBuf=%v\n", res.Body, headerBuf)
+ rollback = binary.BigEndian.Uint64(res.Body)
+ logging.Infof("Rollback seqno is %v for response with opaque %v\n", rollback, res.Opaque)
+ return res.Status, rollback, nil, nil
+
+ case res.Status != gomemcached.SUCCESS:
+ err = fmt.Errorf("unexpected status %v for response with opaque %v", res.Status, res.Opaque)
+ return res.Status, 0, nil, err
+ }
+
+ flog, err := parseFailoverLog(res.Body[:])
+ return res.Status, rollback, flog, err
+}
+
+// generate stream end responses for all active vb streams
+func (feed *UprFeed) doStreamClose(ch chan *UprEvent) {
+ feed.muVbstreams.RLock()
+
+ uprEvents := make([]*UprEvent, len(feed.vbstreams))
+ index := 0
+ for vbno, stream := range feed.vbstreams {
+ uprEvent := &UprEvent{
+ VBucket: vbno,
+ VBuuid: stream.Vbuuid,
+ Opcode: gomemcached.UPR_STREAMEND,
+ }
+ uprEvents[index] = uprEvent
+ index++
+ }
+
+ // release the lock before sending uprEvents to ch, which may block
+ feed.muVbstreams.RUnlock()
+
+loop:
+ for _, uprEvent := range uprEvents {
+ select {
+ case ch <- uprEvent:
+ case <-feed.closer:
+ logging.Infof("Feed has been closed. Aborting doStreamClose.")
+ break loop
+ }
+ }
+}
+
+func (feed *UprFeed) runFeed(ch chan *UprEvent) {
+ defer close(ch)
+ var headerBuf [gomemcached.HDR_LEN]byte
+ var pkt gomemcached.MCRequest
+ var event *UprEvent
+
+ mc := feed.conn.Hijack()
+ uprStats := &feed.stats
+
+loop:
+ for {
+ select {
+ case <-feed.closer:
+ logging.Infof("Feed has been closed. Exiting.")
+ break loop
+ default:
+ bytes, err := pkt.Receive(mc, headerBuf[:])
+ if err != nil {
+ logging.Errorf("Error in receive %s", err.Error())
+ feed.Error = err
+ // send all the stream close messages to the client
+ feed.doStreamClose(ch)
+ break loop
+ } else {
+ event = nil
+ res := &gomemcached.MCResponse{
+ Opcode: pkt.Opcode,
+ Cas: pkt.Cas,
+ Opaque: pkt.Opaque,
+ Status: gomemcached.Status(pkt.VBucket),
+ Extras: pkt.Extras,
+ Key: pkt.Key,
+ Body: pkt.Body,
+ }
+
+ vb := vbOpaque(pkt.Opaque)
+ appOpaque := appOpaque(pkt.Opaque)
+ uprStats.TotalBytes = uint64(bytes)
+
+ feed.muVbstreams.RLock()
+ stream := feed.vbstreams[vb]
+ feed.muVbstreams.RUnlock()
+
+ switch pkt.Opcode {
+ case gomemcached.UPR_STREAMREQ:
+ event, err = feed.negotiator.handleStreamRequest(feed, headerBuf, &pkt, bytes, res)
+ if err != nil {
+ logging.Infof(err.Error())
+ break loop
+ }
+ case gomemcached.UPR_MUTATION,
+ gomemcached.UPR_DELETION,
+ gomemcached.UPR_EXPIRATION:
+ if stream == nil {
+ logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
+ break loop
+ }
+ event = makeUprEvent(pkt, stream, bytes)
+ uprStats.TotalMutation++
+
+ case gomemcached.UPR_STREAMEND:
+ if stream == nil {
+ logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
+ break loop
+ }
+ //stream has ended
+ event = makeUprEvent(pkt, stream, bytes)
+ logging.Infof("Stream Ended for vb %d", vb)
+
+ feed.negotiator.deleteStreamFromMap(vb, appOpaque)
+ feed.cleanUpVbStream(vb)
+
+ case gomemcached.UPR_SNAPSHOT:
+ if stream == nil {
+ logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
+ break loop
+ }
+ // snapshot marker
+ event = makeUprEvent(pkt, stream, bytes)
+ uprStats.TotalSnapShot++
+
+ case gomemcached.UPR_FLUSH:
+ if stream == nil {
+ logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
+ break loop
+ }
+ // special processing for flush ?
+ event = makeUprEvent(pkt, stream, bytes)
+
+ case gomemcached.UPR_CLOSESTREAM:
+ if stream == nil {
+ logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
+ break loop
+ }
+ event = makeUprEvent(pkt, stream, bytes)
+ event.Opcode = gomemcached.UPR_STREAMEND // opcode re-write !!
+ logging.Infof("Stream Closed for vb %d StreamEnd simulated", vb)
+
+ feed.negotiator.deleteStreamFromMap(vb, appOpaque)
+ feed.cleanUpVbStream(vb)
+
+ case gomemcached.UPR_ADDSTREAM:
+ logging.Infof("Opcode %v not implemented", pkt.Opcode)
+
+ case gomemcached.UPR_CONTROL, gomemcached.UPR_BUFFERACK:
+ if res.Status != gomemcached.SUCCESS {
+ logging.Infof("Opcode %v received status %d", pkt.Opcode.String(), res.Status)
+ }
+
+ case gomemcached.UPR_NOOP:
+ // send a NOOP back
+ noop := &gomemcached.MCResponse{
+ Opcode: gomemcached.UPR_NOOP,
+ Opaque: pkt.Opaque,
+ }
+
+ if err := feed.conn.TransmitResponse(noop); err != nil {
+ logging.Warnf("failed to transmit command %s. Error %s", noop.Opcode.String(), err.Error())
+ }
+ default:
+ logging.Infof("Recived an unknown response for vbucket %d", vb)
+ }
+ }
+
+ if event != nil {
+ select {
+ case ch <- event:
+ case <-feed.closer:
+ logging.Infof("Feed has been closed. Skip sending events. Exiting.")
+ break loop
+ }
+
+ feed.muVbstreams.RLock()
+ l := len(feed.vbstreams)
+ feed.muVbstreams.RUnlock()
+
+ if event.Opcode == gomemcached.UPR_CLOSESTREAM && l == 0 {
+ logging.Infof("No more streams")
+ }
+ }
+
+ if !feed.ackByClient {
+ // if client does not ack, do the ack check now
+ feed.sendBufferAckIfNeeded(event)
+ }
+ }
+ }
+
+ // make sure that feed is closed before we signal transmitCl and exit runFeed
+ feed.Close()
+
+ close(feed.transmitCl)
+ logging.Infof("runFeed exiting")
+}
+
+// Client, after completing processing of an UprEvent, need to call this API to notify UprFeed,
+// so that UprFeed can update its ack bytes stats and send ack to DCP if needed
+// Client needs to set ackByClient flag to true in NewUprFeedWithConfig() call as a prerequisite for this call to work
+// This API is not thread safe. Caller should NOT have more than one go rountine calling this API
+func (feed *UprFeed) ClientAck(event *UprEvent) error {
+ if !feed.ackByClient {
+ return errors.New("Upr feed does not have ackByclient flag set")
+ }
+ feed.sendBufferAckIfNeeded(event)
+ return nil
+}
+
+// increment ack bytes if the event needs to be acked to DCP
+// send buffer ack if enough ack bytes have been accumulated
+func (feed *UprFeed) sendBufferAckIfNeeded(event *UprEvent) {
+ if event == nil || event.AckSize == 0 {
+ // this indicates that there is no need to ack to DCP
+ return
+ }
+
+ totalBytes := feed.toAckBytes + event.AckSize
+ if totalBytes > feed.maxAckBytes {
+ feed.toAckBytes = 0
+ feed.sendBufferAck(totalBytes)
+ } else {
+ feed.toAckBytes = totalBytes
+ }
+}
+
+// send buffer ack to dcp
+func (feed *UprFeed) sendBufferAck(sendSize uint32) {
+ bufferAck := &gomemcached.MCRequest{
+ Opcode: gomemcached.UPR_BUFFERACK,
+ }
+ bufferAck.Extras = make([]byte, 4)
+ binary.BigEndian.PutUint32(bufferAck.Extras[:4], uint32(sendSize))
+ feed.writeToTransmitCh(bufferAck)
+ feed.stats.TotalBufferAckSent++
+}
+
+func (feed *UprFeed) GetUprStats() *UprStats {
+ return &feed.stats
+}
+
+func composeOpaque(vbno, opaqueMSB uint16) uint32 {
+ return (uint32(opaqueMSB) << 16) | uint32(vbno)
+}
+
+func getUprOpenCtrlOpaque() uint32 {
+ return atomic.AddUint32(&opaqueOpenCtrlWell, 1)
+}
+
+func appOpaque(opq32 uint32) uint16 {
+ return uint16((opq32 & 0xFFFF0000) >> 16)
+}
+
+func vbOpaque(opq32 uint32) uint16 {
+ return uint16(opq32 & 0xFFFF)
+}
+
+// Close this UprFeed.
+func (feed *UprFeed) Close() {
+ feed.muClosed.Lock()
+ defer feed.muClosed.Unlock()
+ if !feed.closed {
+ close(feed.closer)
+ feed.closed = true
+ feed.negotiator.initialize()
+ }
+}
+
+// check if the UprFeed has been closed
+func (feed *UprFeed) Closed() bool {
+ feed.muClosed.RLock()
+ defer feed.muClosed.RUnlock()
+ return feed.closed
+}
diff --git a/vendor/github.com/couchbase/gomemcached/mc_constants.go b/vendor/github.com/couchbase/gomemcached/mc_constants.go
new file mode 100644
index 0000000000..1d5027d16c
--- /dev/null
+++ b/vendor/github.com/couchbase/gomemcached/mc_constants.go
@@ -0,0 +1,335 @@
+// Package gomemcached is binary protocol packet formats and constants.
+package gomemcached
+
+import (
+ "fmt"
+)
+
+const (
+ REQ_MAGIC = 0x80
+ RES_MAGIC = 0x81
+)
+
+// CommandCode for memcached packets.
+type CommandCode uint8
+
+const (
+ GET = CommandCode(0x00)
+ SET = CommandCode(0x01)
+ ADD = CommandCode(0x02)
+ REPLACE = CommandCode(0x03)
+ DELETE = CommandCode(0x04)
+ INCREMENT = CommandCode(0x05)
+ DECREMENT = CommandCode(0x06)
+ QUIT = CommandCode(0x07)
+ FLUSH = CommandCode(0x08)
+ GETQ = CommandCode(0x09)
+ NOOP = CommandCode(0x0a)
+ VERSION = CommandCode(0x0b)
+ GETK = CommandCode(0x0c)
+ GETKQ = CommandCode(0x0d)
+ APPEND = CommandCode(0x0e)
+ PREPEND = CommandCode(0x0f)
+ STAT = CommandCode(0x10)
+ SETQ = CommandCode(0x11)
+ ADDQ = CommandCode(0x12)
+ REPLACEQ = CommandCode(0x13)
+ DELETEQ = CommandCode(0x14)
+ INCREMENTQ = CommandCode(0x15)
+ DECREMENTQ = CommandCode(0x16)
+ QUITQ = CommandCode(0x17)
+ FLUSHQ = CommandCode(0x18)
+ APPENDQ = CommandCode(0x19)
+ AUDIT = CommandCode(0x27)
+ PREPENDQ = CommandCode(0x1a)
+ GAT = CommandCode(0x1d)
+ HELLO = CommandCode(0x1f)
+ RGET = CommandCode(0x30)
+ RSET = CommandCode(0x31)
+ RSETQ = CommandCode(0x32)
+ RAPPEND = CommandCode(0x33)
+ RAPPENDQ = CommandCode(0x34)
+ RPREPEND = CommandCode(0x35)
+ RPREPENDQ = CommandCode(0x36)
+ RDELETE = CommandCode(0x37)
+ RDELETEQ = CommandCode(0x38)
+ RINCR = CommandCode(0x39)
+ RINCRQ = CommandCode(0x3a)
+ RDECR = CommandCode(0x3b)
+ RDECRQ = CommandCode(0x3c)
+
+ SASL_LIST_MECHS = CommandCode(0x20)
+ SASL_AUTH = CommandCode(0x21)
+ SASL_STEP = CommandCode(0x22)
+
+ SET_VBUCKET = CommandCode(0x3d)
+
+ TAP_CONNECT = CommandCode(0x40) // Client-sent request to initiate Tap feed
+ TAP_MUTATION = CommandCode(0x41) // Notification of a SET/ADD/REPLACE/etc. on the server
+ TAP_DELETE = CommandCode(0x42) // Notification of a DELETE on the server
+ TAP_FLUSH = CommandCode(0x43) // Replicates a flush_all command
+ TAP_OPAQUE = CommandCode(0x44) // Opaque control data from the engine
+ TAP_VBUCKET_SET = CommandCode(0x45) // Sets state of vbucket in receiver (used in takeover)
+ TAP_CHECKPOINT_START = CommandCode(0x46) // Notifies start of new checkpoint
+ TAP_CHECKPOINT_END = CommandCode(0x47) // Notifies end of checkpoint
+
+ UPR_OPEN = CommandCode(0x50) // Open a UPR connection with a name
+ UPR_ADDSTREAM = CommandCode(0x51) // Sent by ebucketMigrator to UPR Consumer
+ UPR_CLOSESTREAM = CommandCode(0x52) // Sent by eBucketMigrator to UPR Consumer
+ UPR_FAILOVERLOG = CommandCode(0x54) // Request failover logs
+ UPR_STREAMREQ = CommandCode(0x53) // Stream request from consumer to producer
+ UPR_STREAMEND = CommandCode(0x55) // Sent by producer when it has no more messages to stream
+ UPR_SNAPSHOT = CommandCode(0x56) // Start of a new snapshot
+ UPR_MUTATION = CommandCode(0x57) // Key mutation
+ UPR_DELETION = CommandCode(0x58) // Key deletion
+ UPR_EXPIRATION = CommandCode(0x59) // Key expiration
+ UPR_FLUSH = CommandCode(0x5a) // Delete all the data for a vbucket
+ UPR_NOOP = CommandCode(0x5c) // UPR NOOP
+ UPR_BUFFERACK = CommandCode(0x5d) // UPR Buffer Acknowledgement
+ UPR_CONTROL = CommandCode(0x5e) // Set flow control params
+
+ SELECT_BUCKET = CommandCode(0x89) // Select bucket
+
+ OBSERVE_SEQNO = CommandCode(0x91) // Sequence Number based Observe
+ OBSERVE = CommandCode(0x92)
+
+ GET_META = CommandCode(0xA0) // Get meta. returns with expiry, flags, cas etc
+ SUBDOC_GET = CommandCode(0xc5) // Get subdoc. Returns with xattrs
+ SUBDOC_MULTI_LOOKUP = CommandCode(0xd0) // Multi lookup. Doc xattrs and meta.
+)
+
+// command codes that are counted toward DCP control buffer
+// when DCP clients receive DCP messages with these command codes, they need to provide acknowledgement
+var BufferedCommandCodeMap = map[CommandCode]bool{
+ SET_VBUCKET: true,
+ UPR_STREAMEND: true,
+ UPR_SNAPSHOT: true,
+ UPR_MUTATION: true,
+ UPR_DELETION: true,
+ UPR_EXPIRATION: true}
+
+// Status field for memcached response.
+type Status uint16
+
+// Matches with protocol_binary.h as source of truth
+const (
+ SUCCESS = Status(0x00)
+ KEY_ENOENT = Status(0x01)
+ KEY_EEXISTS = Status(0x02)
+ E2BIG = Status(0x03)
+ EINVAL = Status(0x04)
+ NOT_STORED = Status(0x05)
+ DELTA_BADVAL = Status(0x06)
+ NOT_MY_VBUCKET = Status(0x07)
+ NO_BUCKET = Status(0x08)
+ LOCKED = Status(0x09)
+ AUTH_STALE = Status(0x1f)
+ AUTH_ERROR = Status(0x20)
+ AUTH_CONTINUE = Status(0x21)
+ ERANGE = Status(0x22)
+ ROLLBACK = Status(0x23)
+ EACCESS = Status(0x24)
+ NOT_INITIALIZED = Status(0x25)
+ UNKNOWN_COMMAND = Status(0x81)
+ ENOMEM = Status(0x82)
+ NOT_SUPPORTED = Status(0x83)
+ EINTERNAL = Status(0x84)
+ EBUSY = Status(0x85)
+ TMPFAIL = Status(0x86)
+
+ // SUBDOC
+ SUBDOC_PATH_NOT_FOUND = Status(0xc0)
+ SUBDOC_BAD_MULTI = Status(0xcc)
+ SUBDOC_MULTI_PATH_FAILURE_DELETED = Status(0xd3)
+)
+
+// for log redaction
+const (
+ UdTagBegin = "<ud>"
+ UdTagEnd = "</ud>"
+)
+
+var isFatal = map[Status]bool{
+ DELTA_BADVAL: true,
+ NO_BUCKET: true,
+ AUTH_STALE: true,
+ AUTH_ERROR: true,
+ ERANGE: true,
+ ROLLBACK: true,
+ EACCESS: true,
+ ENOMEM: true,
+ NOT_SUPPORTED: true,
+}
+
+// the producer/consumer bit in dcp flags
+var DCP_PRODUCER uint32 = 0x01
+
+// the include XATTRS bit in dcp flags
+var DCP_OPEN_INCLUDE_XATTRS uint32 = 0x04
+
+// the include deletion time bit in dcp flags
+var DCP_OPEN_INCLUDE_DELETE_TIMES uint32 = 0x20
+
+// Datatype to Include XATTRS in SUBDOC GET
+var SUBDOC_FLAG_XATTR uint8 = 0x04
+
+// MCItem is an internal representation of an item.
+type MCItem struct {
+ Cas uint64
+ Flags, Expiration uint32
+ Data []byte
+}
+
+// Number of bytes in a binary protocol header.
+const HDR_LEN = 24
+
+// Mapping of CommandCode -> name of command (not exhaustive)
+var CommandNames map[CommandCode]string
+
+// StatusNames human readable names for memcached response.
+var StatusNames map[Status]string
+
+func init() {
+ CommandNames = make(map[CommandCode]string)
+ CommandNames[GET] = "GET"
+ CommandNames[SET] = "SET"
+ CommandNames[ADD] = "ADD"
+ CommandNames[REPLACE] = "REPLACE"
+ CommandNames[DELETE] = "DELETE"
+ CommandNames[INCREMENT] = "INCREMENT"
+ CommandNames[DECREMENT] = "DECREMENT"
+ CommandNames[QUIT] = "QUIT"
+ CommandNames[FLUSH] = "FLUSH"
+ CommandNames[GETQ] = "GETQ"
+ CommandNames[NOOP] = "NOOP"
+ CommandNames[VERSION] = "VERSION"
+ CommandNames[GETK] = "GETK"
+ CommandNames[GETKQ] = "GETKQ"
+ CommandNames[APPEND] = "APPEND"
+ CommandNames[PREPEND] = "PREPEND"
+ CommandNames[STAT] = "STAT"
+ CommandNames[SETQ] = "SETQ"
+ CommandNames[ADDQ] = "ADDQ"
+ CommandNames[REPLACEQ] = "REPLACEQ"
+ CommandNames[DELETEQ] = "DELETEQ"
+ CommandNames[INCREMENTQ] = "INCREMENTQ"
+ CommandNames[DECREMENTQ] = "DECREMENTQ"
+ CommandNames[QUITQ] = "QUITQ"
+ CommandNames[FLUSHQ] = "FLUSHQ"
+ CommandNames[APPENDQ] = "APPENDQ"
+ CommandNames[PREPENDQ] = "PREPENDQ"
+ CommandNames[RGET] = "RGET"
+ CommandNames[RSET] = "RSET"
+ CommandNames[RSETQ] = "RSETQ"
+ CommandNames[RAPPEND] = "RAPPEND"
+ CommandNames[RAPPENDQ] = "RAPPENDQ"
+ CommandNames[RPREPEND] = "RPREPEND"
+ CommandNames[RPREPENDQ] = "RPREPENDQ"
+ CommandNames[RDELETE] = "RDELETE"
+ CommandNames[RDELETEQ] = "RDELETEQ"
+ CommandNames[RINCR] = "RINCR"
+ CommandNames[RINCRQ] = "RINCRQ"
+ CommandNames[RDECR] = "RDECR"
+ CommandNames[RDECRQ] = "RDECRQ"
+
+ CommandNames[SASL_LIST_MECHS] = "SASL_LIST_MECHS"
+ CommandNames[SASL_AUTH] = "SASL_AUTH"
+ CommandNames[SASL_STEP] = "SASL_STEP"
+
+ CommandNames[TAP_CONNECT] = "TAP_CONNECT"
+ CommandNames[TAP_MUTATION] = "TAP_MUTATION"
+ CommandNames[TAP_DELETE] = "TAP_DELETE"
+ CommandNames[TAP_FLUSH] = "TAP_FLUSH"
+ CommandNames[TAP_OPAQUE] = "TAP_OPAQUE"
+ CommandNames[TAP_VBUCKET_SET] = "TAP_VBUCKET_SET"
+ CommandNames[TAP_CHECKPOINT_START] = "TAP_CHECKPOINT_START"
+ CommandNames[TAP_CHECKPOINT_END] = "TAP_CHECKPOINT_END"
+
+ CommandNames[UPR_OPEN] = "UPR_OPEN"
+ CommandNames[UPR_ADDSTREAM] = "UPR_ADDSTREAM"
+ CommandNames[UPR_CLOSESTREAM] = "UPR_CLOSESTREAM"
+ CommandNames[UPR_FAILOVERLOG] = "UPR_FAILOVERLOG"
+ CommandNames[UPR_STREAMREQ] = "UPR_STREAMREQ"
+ CommandNames[UPR_STREAMEND] = "UPR_STREAMEND"
+ CommandNames[UPR_SNAPSHOT] = "UPR_SNAPSHOT"
+ CommandNames[UPR_MUTATION] = "UPR_MUTATION"
+ CommandNames[UPR_DELETION] = "UPR_DELETION"
+ CommandNames[UPR_EXPIRATION] = "UPR_EXPIRATION"
+ CommandNames[UPR_FLUSH] = "UPR_FLUSH"
+ CommandNames[UPR_NOOP] = "UPR_NOOP"
+ CommandNames[UPR_BUFFERACK] = "UPR_BUFFERACK"
+ CommandNames[UPR_CONTROL] = "UPR_CONTROL"
+ CommandNames[SUBDOC_GET] = "SUBDOC_GET"
+ CommandNames[SUBDOC_MULTI_LOOKUP] = "SUBDOC_MULTI_LOOKUP"
+
+ StatusNames = make(map[Status]string)
+ StatusNames[SUCCESS] = "SUCCESS"
+ StatusNames[KEY_ENOENT] = "KEY_ENOENT"
+ StatusNames[KEY_EEXISTS] = "KEY_EEXISTS"
+ StatusNames[E2BIG] = "E2BIG"
+ StatusNames[EINVAL] = "EINVAL"
+ StatusNames[NOT_STORED] = "NOT_STORED"
+ StatusNames[DELTA_BADVAL] = "DELTA_BADVAL"
+ StatusNames[NOT_MY_VBUCKET] = "NOT_MY_VBUCKET"
+ StatusNames[NO_BUCKET] = "NO_BUCKET"
+ StatusNames[AUTH_STALE] = "AUTH_STALE"
+ StatusNames[AUTH_ERROR] = "AUTH_ERROR"
+ StatusNames[AUTH_CONTINUE] = "AUTH_CONTINUE"
+ StatusNames[ERANGE] = "ERANGE"
+ StatusNames[ROLLBACK] = "ROLLBACK"
+ StatusNames[EACCESS] = "EACCESS"
+ StatusNames[NOT_INITIALIZED] = "NOT_INITIALIZED"
+ StatusNames[UNKNOWN_COMMAND] = "UNKNOWN_COMMAND"
+ StatusNames[ENOMEM] = "ENOMEM"
+ StatusNames[NOT_SUPPORTED] = "NOT_SUPPORTED"
+ StatusNames[EINTERNAL] = "EINTERNAL"
+ StatusNames[EBUSY] = "EBUSY"
+ StatusNames[TMPFAIL] = "TMPFAIL"
+ StatusNames[SUBDOC_PATH_NOT_FOUND] = "SUBDOC_PATH_NOT_FOUND"
+ StatusNames[SUBDOC_BAD_MULTI] = "SUBDOC_BAD_MULTI"
+
+}
+
+// String an op code.
+func (o CommandCode) String() (rv string) {
+ rv = CommandNames[o]
+ if rv == "" {
+ rv = fmt.Sprintf("0x%02x", int(o))
+ }
+ return rv
+}
+
+// String an op code.
+func (s Status) String() (rv string) {
+ rv = StatusNames[s]
+ if rv == "" {
+ rv = fmt.Sprintf("0x%02x", int(s))
+ }
+ return rv
+}
+
+// IsQuiet will return true if a command is a "quiet" command.
+func (o CommandCode) IsQuiet() bool {
+ switch o {
+ case GETQ,
+ GETKQ,
+ SETQ,
+ ADDQ,
+ REPLACEQ,
+ DELETEQ,
+ INCREMENTQ,
+ DECREMENTQ,
+ QUITQ,
+ FLUSHQ,
+ APPENDQ,
+ PREPENDQ,
+ RSETQ,
+ RAPPENDQ,
+ RPREPENDQ,
+ RDELETEQ,
+ RINCRQ,
+ RDECRQ:
+ return true
+ }
+ return false
+}
diff --git a/vendor/github.com/couchbase/gomemcached/mc_req.go b/vendor/github.com/couchbase/gomemcached/mc_req.go
new file mode 100644
index 0000000000..3ff67ab9a7
--- /dev/null
+++ b/vendor/github.com/couchbase/gomemcached/mc_req.go
@@ -0,0 +1,197 @@
+package gomemcached
+
+import (
+ "encoding/binary"
+ "fmt"
+ "io"
+)
+
+// The maximum reasonable body length to expect.
+// Anything larger than this will result in an error.
+// The current limit, 20MB, is the size limit supported by ep-engine.
+var MaxBodyLen = int(20 * 1024 * 1024)
+
+// MCRequest is memcached Request
+type MCRequest struct {
+ // The command being issued
+ Opcode CommandCode
+ // The CAS (if applicable, or 0)
+ Cas uint64
+ // An opaque value to be returned with this request
+ Opaque uint32
+ // The vbucket to which this command belongs
+ VBucket uint16
+ // Command extras, key, and body
+ Extras, Key, Body, ExtMeta []byte
+ // Datatype identifier
+ DataType uint8
+}
+
+// Size gives the number of bytes this request requires.
+func (req *MCRequest) Size() int {
+ return HDR_LEN + len(req.Extras) + len(req.Key) + len(req.Body) + len(req.ExtMeta)
+}
+
+// A debugging string representation of this request
+func (req MCRequest) String() string {
+ return fmt.Sprintf("{MCRequest opcode=%s, bodylen=%d, key='%s'}",
+ req.Opcode, len(req.Body), req.Key)
+}
+
+func (req *MCRequest) fillHeaderBytes(data []byte) int {
+
+ pos := 0
+ data[pos] = REQ_MAGIC
+ pos++
+ data[pos] = byte(req.Opcode)
+ pos++
+ binary.BigEndian.PutUint16(data[pos:pos+2],
+ uint16(len(req.Key)))
+ pos += 2
+
+ // 4
+ data[pos] = byte(len(req.Extras))
+ pos++
+ // Data type
+ if req.DataType != 0 {
+ data[pos] = byte(req.DataType)
+ }
+ pos++
+ binary.BigEndian.PutUint16(data[pos:pos+2], req.VBucket)
+ pos += 2
+
+ // 8
+ binary.BigEndian.PutUint32(data[pos:pos+4],
+ uint32(len(req.Body)+len(req.Key)+len(req.Extras)+len(req.ExtMeta)))
+ pos += 4
+
+ // 12
+ binary.BigEndian.PutUint32(data[pos:pos+4], req.Opaque)
+ pos += 4
+
+ // 16
+ if req.Cas != 0 {
+ binary.BigEndian.PutUint64(data[pos:pos+8], req.Cas)
+ }
+ pos += 8
+
+ if len(req.Extras) > 0 {
+ copy(data[pos:pos+len(req.Extras)], req.Extras)
+ pos += len(req.Extras)
+ }
+
+ if len(req.Key) > 0 {
+ copy(data[pos:pos+len(req.Key)], req.Key)
+ pos += len(req.Key)
+ }
+
+ return pos
+}
+
+// HeaderBytes will return the wire representation of the request header
+// (with the extras and key).
+func (req *MCRequest) HeaderBytes() []byte {
+ data := make([]byte, HDR_LEN+len(req.Extras)+len(req.Key))
+
+ req.fillHeaderBytes(data)
+
+ return data
+}
+
+// Bytes will return the wire representation of this request.
+func (req *MCRequest) Bytes() []byte {
+ data := make([]byte, req.Size())
+
+ pos := req.fillHeaderBytes(data)
+
+ if len(req.Body) > 0 {
+ copy(data[pos:pos+len(req.Body)], req.Body)
+ }
+
+ if len(req.ExtMeta) > 0 {
+ copy(data[pos+len(req.Body):pos+len(req.Body)+len(req.ExtMeta)], req.ExtMeta)
+ }
+
+ return data
+}
+
+// Transmit will send this request message across a writer.
+func (req *MCRequest) Transmit(w io.Writer) (n int, err error) {
+ if len(req.Body) < 128 {
+ n, err = w.Write(req.Bytes())
+ } else {
+ n, err = w.Write(req.HeaderBytes())
+ if err == nil {
+ m := 0
+ m, err = w.Write(req.Body)
+ n += m
+ }
+ }
+ return
+}
+
+// Receive will fill this MCRequest with the data from a reader.
+func (req *MCRequest) Receive(r io.Reader, hdrBytes []byte) (int, error) {
+ if len(hdrBytes) < HDR_LEN {
+ hdrBytes = []byte{
+ 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0}
+ }
+ n, err := io.ReadFull(r, hdrBytes)
+ if err != nil {
+ return n, err
+ }
+
+ if hdrBytes[0] != RES_MAGIC && hdrBytes[0] != REQ_MAGIC {
+ return n, fmt.Errorf("bad magic: 0x%02x", hdrBytes[0])
+ }
+
+ klen := int(binary.BigEndian.Uint16(hdrBytes[2:]))
+ elen := int(hdrBytes[4])
+ // Data type at 5
+ req.DataType = uint8(hdrBytes[5])
+
+ req.Opcode = CommandCode(hdrBytes[1])
+ // Vbucket at 6:7
+ req.VBucket = binary.BigEndian.Uint16(hdrBytes[6:])
+ totalBodyLen := int(binary.BigEndian.Uint32(hdrBytes[8:]))
+
+ req.Opaque = binary.BigEndian.Uint32(hdrBytes[12:])
+ req.Cas = binary.BigEndian.Uint64(hdrBytes[16:])
+
+ if totalBodyLen > 0 {
+ buf := make([]byte, totalBodyLen)
+ m, err := io.ReadFull(r, buf)
+ n += m
+ if err == nil {
+ if req.Opcode >= TAP_MUTATION &&
+ req.Opcode <= TAP_CHECKPOINT_END &&
+ len(buf) > 1 {
+ // In these commands there is "engine private"
+ // data at the end of the extras. The first 2
+ // bytes of extra data give its length.
+ elen += int(binary.BigEndian.Uint16(buf))
+ }
+
+ req.Extras = buf[0:elen]
+ req.Key = buf[elen : klen+elen]
+
+ // get the length of extended metadata
+ extMetaLen := 0
+ if elen > 29 {
+ extMetaLen = int(binary.BigEndian.Uint16(req.Extras[28:30]))
+ }
+
+ bodyLen := totalBodyLen - klen - elen - extMetaLen
+ if bodyLen > MaxBodyLen {
+ return n, fmt.Errorf("%d is too big (max %d)",
+ bodyLen, MaxBodyLen)
+ }
+
+ req.Body = buf[klen+elen : klen+elen+bodyLen]
+ req.ExtMeta = buf[klen+elen+bodyLen:]
+ }
+ }
+ return n, err
+}
diff --git a/vendor/github.com/couchbase/gomemcached/mc_res.go b/vendor/github.com/couchbase/gomemcached/mc_res.go
new file mode 100644
index 0000000000..2b4cfe1349
--- /dev/null
+++ b/vendor/github.com/couchbase/gomemcached/mc_res.go
@@ -0,0 +1,267 @@
+package gomemcached
+
+import (
+ "encoding/binary"
+ "fmt"
+ "io"
+ "sync"
+)
+
+// MCResponse is memcached response
+type MCResponse struct {
+ // The command opcode of the command that sent the request
+ Opcode CommandCode
+ // The status of the response
+ Status Status
+ // The opaque sent in the request
+ Opaque uint32
+ // The CAS identifier (if applicable)
+ Cas uint64
+ // Extras, key, and body for this response
+ Extras, Key, Body []byte
+ // If true, this represents a fatal condition and we should hang up
+ Fatal bool
+ // Datatype identifier
+ DataType uint8
+}
+
+// A debugging string representation of this response
+func (res MCResponse) String() string {
+ return fmt.Sprintf("{MCResponse status=%v keylen=%d, extralen=%d, bodylen=%d}",
+ res.Status, len(res.Key), len(res.Extras), len(res.Body))
+}
+
+// Response as an error.
+func (res *MCResponse) Error() string {
+ return fmt.Sprintf("MCResponse status=%v, opcode=%v, opaque=%v, msg: %s",
+ res.Status, res.Opcode, res.Opaque, string(res.Body))
+}
+
+func errStatus(e error) Status {
+ status := Status(0xffff)
+ if res, ok := e.(*MCResponse); ok {
+ status = res.Status
+ }
+ return status
+}
+
+// IsNotFound is true if this error represents a "not found" response.
+func IsNotFound(e error) bool {
+ return errStatus(e) == KEY_ENOENT
+}
+
+// IsFatal is false if this error isn't believed to be fatal to a connection.
+func IsFatal(e error) bool {
+ if e == nil {
+ return false
+ }
+ _, ok := isFatal[errStatus(e)]
+ if ok {
+ return true
+ }
+ return false
+}
+
+// Size is number of bytes this response consumes on the wire.
+func (res *MCResponse) Size() int {
+ return HDR_LEN + len(res.Extras) + len(res.Key) + len(res.Body)
+}
+
+func (res *MCResponse) fillHeaderBytes(data []byte) int {
+ pos := 0
+ data[pos] = RES_MAGIC
+ pos++
+ data[pos] = byte(res.Opcode)
+ pos++
+ binary.BigEndian.PutUint16(data[pos:pos+2],
+ uint16(len(res.Key)))
+ pos += 2
+
+ // 4
+ data[pos] = byte(len(res.Extras))
+ pos++
+ // Data type
+ if res.DataType != 0 {
+ data[pos] = byte(res.DataType)
+ } else {
+ data[pos] = 0
+ }
+ pos++
+ binary.BigEndian.PutUint16(data[pos:pos+2], uint16(res.Status))
+ pos += 2
+
+ // 8
+ binary.BigEndian.PutUint32(data[pos:pos+4],
+ uint32(len(res.Body)+len(res.Key)+len(res.Extras)))
+ pos += 4
+
+ // 12
+ binary.BigEndian.PutUint32(data[pos:pos+4], res.Opaque)
+ pos += 4
+
+ // 16
+ binary.BigEndian.PutUint64(data[pos:pos+8], res.Cas)
+ pos += 8
+
+ if len(res.Extras) > 0 {
+ copy(data[pos:pos+len(res.Extras)], res.Extras)
+ pos += len(res.Extras)
+ }
+
+ if len(res.Key) > 0 {
+ copy(data[pos:pos+len(res.Key)], res.Key)
+ pos += len(res.Key)
+ }
+
+ return pos
+}
+
+// HeaderBytes will get just the header bytes for this response.
+func (res *MCResponse) HeaderBytes() []byte {
+ data := make([]byte, HDR_LEN+len(res.Extras)+len(res.Key))
+
+ res.fillHeaderBytes(data)
+
+ return data
+}
+
+// Bytes will return the actual bytes transmitted for this response.
+func (res *MCResponse) Bytes() []byte {
+ data := make([]byte, res.Size())
+
+ pos := res.fillHeaderBytes(data)
+
+ copy(data[pos:pos+len(res.Body)], res.Body)
+
+ return data
+}
+
+// Transmit will send this response message across a writer.
+func (res *MCResponse) Transmit(w io.Writer) (n int, err error) {
+ if len(res.Body) < 128 {
+ n, err = w.Write(res.Bytes())
+ } else {
+ n, err = w.Write(res.HeaderBytes())
+ if err == nil {
+ m := 0
+ m, err = w.Write(res.Body)
+ m += n
+ }
+ }
+ return
+}
+
+// Receive will fill this MCResponse with the data from this reader.
+func (res *MCResponse) Receive(r io.Reader, hdrBytes []byte) (n int, err error) {
+ if len(hdrBytes) < HDR_LEN {
+ hdrBytes = []byte{
+ 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0}
+ }
+ n, err = io.ReadFull(r, hdrBytes)
+ if err != nil {
+ return n, err
+ }
+
+ if hdrBytes[0] != RES_MAGIC && hdrBytes[0] != REQ_MAGIC {
+ return n, fmt.Errorf("bad magic: 0x%02x", hdrBytes[0])
+ }
+
+ klen := int(binary.BigEndian.Uint16(hdrBytes[2:4]))
+ elen := int(hdrBytes[4])
+
+ res.Opcode = CommandCode(hdrBytes[1])
+ res.DataType = uint8(hdrBytes[5])
+ res.Status = Status(binary.BigEndian.Uint16(hdrBytes[6:8]))
+ res.Opaque = binary.BigEndian.Uint32(hdrBytes[12:16])
+ res.Cas = binary.BigEndian.Uint64(hdrBytes[16:24])
+
+ bodyLen := int(binary.BigEndian.Uint32(hdrBytes[8:12])) - (klen + elen)
+
+ //defer function to debug the panic seen with MB-15557
+ defer func() {
+ if e := recover(); e != nil {
+ err = fmt.Errorf(`Panic in Receive. Response %v \n
+ key len %v extra len %v bodylen %v`, res, klen, elen, bodyLen)
+ }
+ }()
+
+ buf := make([]byte, klen+elen+bodyLen)
+ m, err := io.ReadFull(r, buf)
+ if err == nil {
+ res.Extras = buf[0:elen]
+ res.Key = buf[elen : klen+elen]
+ res.Body = buf[klen+elen:]
+ }
+
+ return n + m, err
+}
+
+type MCResponsePool struct {
+ pool *sync.Pool
+}
+
+func NewMCResponsePool() *MCResponsePool {
+ rv := &MCResponsePool{
+ pool: &sync.Pool{
+ New: func() interface{} {
+ return &MCResponse{}
+ },
+ },
+ }
+
+ return rv
+}
+
+func (this *MCResponsePool) Get() *MCResponse {
+ return this.pool.Get().(*MCResponse)
+}
+
+func (this *MCResponsePool) Put(r *MCResponse) {
+ if r == nil {
+ return
+ }
+
+ r.Extras = nil
+ r.Key = nil
+ r.Body = nil
+ r.Fatal = false
+
+ this.pool.Put(r)
+}
+
+type StringMCResponsePool struct {
+ pool *sync.Pool
+ size int
+}
+
+func NewStringMCResponsePool(size int) *StringMCResponsePool {
+ rv := &StringMCResponsePool{
+ pool: &sync.Pool{
+ New: func() interface{} {
+ return make(map[string]*MCResponse, size)
+ },
+ },
+ size: size,
+ }
+
+ return rv
+}
+
+func (this *StringMCResponsePool) Get() map[string]*MCResponse {
+ return this.pool.Get().(map[string]*MCResponse)
+}
+
+func (this *StringMCResponsePool) Put(m map[string]*MCResponse) {
+ if m == nil || len(m) > 2*this.size {
+ return
+ }
+
+ for k := range m {
+ m[k] = nil
+ delete(m, k)
+ }
+
+ this.pool.Put(m)
+}
diff --git a/vendor/github.com/couchbase/gomemcached/tap.go b/vendor/github.com/couchbase/gomemcached/tap.go
new file mode 100644
index 0000000000..e48623281b
--- /dev/null
+++ b/vendor/github.com/couchbase/gomemcached/tap.go
@@ -0,0 +1,168 @@
+package gomemcached
+
+import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "strings"
+)
+
+type TapConnectFlag uint32
+
+// Tap connect option flags
+const (
+ BACKFILL = TapConnectFlag(0x01)
+ DUMP = TapConnectFlag(0x02)
+ LIST_VBUCKETS = TapConnectFlag(0x04)
+ TAKEOVER_VBUCKETS = TapConnectFlag(0x08)
+ SUPPORT_ACK = TapConnectFlag(0x10)
+ REQUEST_KEYS_ONLY = TapConnectFlag(0x20)
+ CHECKPOINT = TapConnectFlag(0x40)
+ REGISTERED_CLIENT = TapConnectFlag(0x80)
+ FIX_FLAG_BYTEORDER = TapConnectFlag(0x100)
+)
+
+// Tap opaque event subtypes
+const (
+ TAP_OPAQUE_ENABLE_AUTO_NACK = 0
+ TAP_OPAQUE_INITIAL_VBUCKET_STREAM = 1
+ TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC = 2
+ TAP_OPAQUE_CLOSE_TAP_STREAM = 7
+ TAP_OPAQUE_CLOSE_BACKFILL = 8
+)
+
+// Tap item flags
+const (
+ TAP_ACK = 1
+ TAP_NO_VALUE = 2
+ TAP_FLAG_NETWORK_BYTE_ORDER = 4
+)
+
+// TapConnectFlagNames for TapConnectFlag
+var TapConnectFlagNames = map[TapConnectFlag]string{
+ BACKFILL: "BACKFILL",
+ DUMP: "DUMP",
+ LIST_VBUCKETS: "LIST_VBUCKETS",
+ TAKEOVER_VBUCKETS: "TAKEOVER_VBUCKETS",
+ SUPPORT_ACK: "SUPPORT_ACK",
+ REQUEST_KEYS_ONLY: "REQUEST_KEYS_ONLY",
+ CHECKPOINT: "CHECKPOINT",
+ REGISTERED_CLIENT: "REGISTERED_CLIENT",
+ FIX_FLAG_BYTEORDER: "FIX_FLAG_BYTEORDER",
+}
+
+// TapItemParser is a function to parse a single tap extra.
+type TapItemParser func(io.Reader) (interface{}, error)
+
+// TapParseUint64 is a function to parse a single tap uint64.
+func TapParseUint64(r io.Reader) (interface{}, error) {
+ var rv uint64
+ err := binary.Read(r, binary.BigEndian, &rv)
+ return rv, err
+}
+
+// TapParseUint16 is a function to parse a single tap uint16.
+func TapParseUint16(r io.Reader) (interface{}, error) {
+ var rv uint16
+ err := binary.Read(r, binary.BigEndian, &rv)
+ return rv, err
+}
+
+// TapParseBool is a function to parse a single tap boolean.
+func TapParseBool(r io.Reader) (interface{}, error) {
+ return true, nil
+}
+
+// TapParseVBList parses a list of vBucket numbers as []uint16.
+func TapParseVBList(r io.Reader) (interface{}, error) {
+ num, err := TapParseUint16(r)
+ if err != nil {
+ return nil, err
+ }
+ n := int(num.(uint16))
+
+ rv := make([]uint16, n)
+ for i := 0; i < n; i++ {
+ x, err := TapParseUint16(r)
+ if err != nil {
+ return nil, err
+ }
+ rv[i] = x.(uint16)
+ }
+
+ return rv, err
+}
+
+// TapFlagParsers parser functions for TAP fields.
+var TapFlagParsers = map[TapConnectFlag]TapItemParser{
+ BACKFILL: TapParseUint64,
+ LIST_VBUCKETS: TapParseVBList,
+}
+
+// SplitFlags will split the ORed flags into the individual bit flags.
+func (f TapConnectFlag) SplitFlags() []TapConnectFlag {
+ rv := []TapConnectFlag{}
+ for i := uint32(1); f != 0; i = i << 1 {
+ if uint32(f)&i == i {
+ rv = append(rv, TapConnectFlag(i))
+ }
+ f = TapConnectFlag(uint32(f) & (^i))
+ }
+ return rv
+}
+
+func (f TapConnectFlag) String() string {
+ parts := []string{}
+ for _, x := range f.SplitFlags() {
+ p := TapConnectFlagNames[x]
+ if p == "" {
+ p = fmt.Sprintf("0x%x", int(x))
+ }
+ parts = append(parts, p)
+ }
+ return strings.Join(parts, "|")
+}
+
+type TapConnect struct {
+ Flags map[TapConnectFlag]interface{}
+ RemainingBody []byte
+ Name string
+}
+
+// ParseTapCommands parse the tap request into the interesting bits we may
+// need to do something with.
+func (req *MCRequest) ParseTapCommands() (TapConnect, error) {
+ rv := TapConnect{
+ Flags: map[TapConnectFlag]interface{}{},
+ Name: string(req.Key),
+ }
+
+ if len(req.Extras) < 4 {
+ return rv, fmt.Errorf("not enough extra bytes: %x", req.Extras)
+ }
+
+ flags := TapConnectFlag(binary.BigEndian.Uint32(req.Extras))
+
+ r := bytes.NewReader(req.Body)
+
+ for _, f := range flags.SplitFlags() {
+ fun := TapFlagParsers[f]
+ if fun == nil {
+ fun = TapParseBool
+ }
+
+ val, err := fun(r)
+ if err != nil {
+ return rv, err
+ }
+
+ rv.Flags[f] = val
+ }
+
+ var err error
+ rv.RemainingBody, err = ioutil.ReadAll(r)
+
+ return rv, err
+}