diff options
author | techknowlogick <matti@mdranta.net> | 2019-02-05 11:52:51 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-02-05 11:52:51 -0500 |
commit | 9de871a0f8911030f8e06a881803cf722b8798ea (patch) | |
tree | 206400f0a5873d7d078fcdd004956036f07a1db5 /vendor/github.com/couchbase/gomemcached | |
parent | bf4badad1d68c18d7ffb92c69e09e4e8aa252935 (diff) | |
download | gitea-9de871a0f8911030f8e06a881803cf722b8798ea.tar.gz gitea-9de871a0f8911030f8e06a881803cf722b8798ea.zip |
add other session providers (#5963)
Diffstat (limited to 'vendor/github.com/couchbase/gomemcached')
-rw-r--r-- | vendor/github.com/couchbase/gomemcached/LICENSE | 19 | ||||
-rw-r--r-- | vendor/github.com/couchbase/gomemcached/client/mc.go | 1074 | ||||
-rw-r--r-- | vendor/github.com/couchbase/gomemcached/client/tap_feed.go | 333 | ||||
-rw-r--r-- | vendor/github.com/couchbase/gomemcached/client/transport.go | 67 | ||||
-rw-r--r-- | vendor/github.com/couchbase/gomemcached/client/upr_feed.go | 1005 | ||||
-rw-r--r-- | vendor/github.com/couchbase/gomemcached/mc_constants.go | 335 | ||||
-rw-r--r-- | vendor/github.com/couchbase/gomemcached/mc_req.go | 197 | ||||
-rw-r--r-- | vendor/github.com/couchbase/gomemcached/mc_res.go | 267 | ||||
-rw-r--r-- | vendor/github.com/couchbase/gomemcached/tap.go | 168 |
9 files changed, 3465 insertions, 0 deletions
diff --git a/vendor/github.com/couchbase/gomemcached/LICENSE b/vendor/github.com/couchbase/gomemcached/LICENSE new file mode 100644 index 0000000000..b01ef80261 --- /dev/null +++ b/vendor/github.com/couchbase/gomemcached/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2013 Dustin Sallings + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/vendor/github.com/couchbase/gomemcached/client/mc.go b/vendor/github.com/couchbase/gomemcached/client/mc.go new file mode 100644 index 0000000000..bd1433ba28 --- /dev/null +++ b/vendor/github.com/couchbase/gomemcached/client/mc.go @@ -0,0 +1,1074 @@ +// Package memcached provides a memcached binary protocol client. +package memcached + +import ( + "encoding/binary" + "fmt" + "github.com/couchbase/gomemcached" + "github.com/couchbase/goutils/logging" + "github.com/couchbase/goutils/scramsha" + "github.com/pkg/errors" + "io" + "math" + "net" + "strings" + "sync" + "sync/atomic" + "time" +) + +type ClientIface interface { + Add(vb uint16, key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error) + Append(vb uint16, key string, data []byte) (*gomemcached.MCResponse, error) + Auth(user, pass string) (*gomemcached.MCResponse, error) + AuthList() (*gomemcached.MCResponse, error) + AuthPlain(user, pass string) (*gomemcached.MCResponse, error) + AuthScramSha(user, pass string) (*gomemcached.MCResponse, error) + CASNext(vb uint16, k string, exp int, state *CASState) bool + CAS(vb uint16, k string, f CasFunc, initexp int) (*gomemcached.MCResponse, error) + Close() error + Decr(vb uint16, key string, amt, def uint64, exp int) (uint64, error) + Del(vb uint16, key string) (*gomemcached.MCResponse, error) + EnableMutationToken() (*gomemcached.MCResponse, error) + Get(vb uint16, key string) (*gomemcached.MCResponse, error) + GetSubdoc(vb uint16, key string, subPaths []string) (*gomemcached.MCResponse, error) + GetAndTouch(vb uint16, key string, exp int) (*gomemcached.MCResponse, error) + GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string) error + GetMeta(vb uint16, key string) (*gomemcached.MCResponse, error) + GetRandomDoc() (*gomemcached.MCResponse, error) + Hijack() io.ReadWriteCloser + Incr(vb uint16, key string, amt, def uint64, exp int) (uint64, error) + Observe(vb uint16, key string) (result ObserveResult, err error) + ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error) + Receive() (*gomemcached.MCResponse, error) + ReceiveWithDeadline(deadline time.Time) (*gomemcached.MCResponse, error) + Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error) + Set(vb uint16, key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error) + SetKeepAliveOptions(interval time.Duration) + SetReadDeadline(t time.Time) + SetDeadline(t time.Time) + SelectBucket(bucket string) (*gomemcached.MCResponse, error) + SetCas(vb uint16, key string, flags int, exp int, cas uint64, body []byte) (*gomemcached.MCResponse, error) + Stats(key string) ([]StatValue, error) + StatsMap(key string) (map[string]string, error) + StatsMapForSpecifiedStats(key string, statsMap map[string]string) error + Transmit(req *gomemcached.MCRequest) error + TransmitWithDeadline(req *gomemcached.MCRequest, deadline time.Time) error + TransmitResponse(res *gomemcached.MCResponse) error + + // UprFeed Related + NewUprFeed() (*UprFeed, error) + NewUprFeedIface() (UprFeedIface, error) + NewUprFeedWithConfig(ackByClient bool) (*UprFeed, error) + NewUprFeedWithConfigIface(ackByClient bool) (UprFeedIface, error) + UprGetFailoverLog(vb []uint16) (map[uint16]*FailoverLog, error) +} + +const bufsize = 1024 + +var UnHealthy uint32 = 0 +var Healthy uint32 = 1 + +type Features []Feature +type Feature uint16 + +const FeatureMutationToken = Feature(0x04) +const FeatureXattr = Feature(0x06) +const FeatureDataType = Feature(0x0b) + +// The Client itself. +type Client struct { + conn io.ReadWriteCloser + // use uint32 type so that it can be accessed through atomic APIs + healthy uint32 + opaque uint32 + + hdrBuf []byte +} + +var ( + DefaultDialTimeout = time.Duration(0) // No timeout + + DefaultWriteTimeout = time.Duration(0) // No timeout + + dialFun = func(prot, dest string) (net.Conn, error) { + return net.DialTimeout(prot, dest, DefaultDialTimeout) + } +) + +// Connect to a memcached server. +func Connect(prot, dest string) (rv *Client, err error) { + conn, err := dialFun(prot, dest) + if err != nil { + return nil, err + } + return Wrap(conn) +} + +func SetDefaultTimeouts(dial, read, write time.Duration) { + DefaultDialTimeout = dial + DefaultWriteTimeout = write +} + +func SetDefaultDialTimeout(dial time.Duration) { + DefaultDialTimeout = dial +} + +func (c *Client) SetKeepAliveOptions(interval time.Duration) { + c.conn.(*net.TCPConn).SetKeepAlive(true) + c.conn.(*net.TCPConn).SetKeepAlivePeriod(interval) +} + +func (c *Client) SetReadDeadline(t time.Time) { + c.conn.(*net.TCPConn).SetReadDeadline(t) +} + +func (c *Client) SetDeadline(t time.Time) { + c.conn.(*net.TCPConn).SetDeadline(t) +} + +// Wrap an existing transport. +func Wrap(rwc io.ReadWriteCloser) (rv *Client, err error) { + client := &Client{ + conn: rwc, + hdrBuf: make([]byte, gomemcached.HDR_LEN), + opaque: uint32(1), + } + client.setHealthy(true) + return client, nil +} + +// Close the connection when you're done. +func (c *Client) Close() error { + return c.conn.Close() +} + +// IsHealthy returns true unless the client is belived to have +// difficulty communicating to its server. +// +// This is useful for connection pools where we want to +// non-destructively determine that a connection may be reused. +func (c Client) IsHealthy() bool { + healthyState := atomic.LoadUint32(&c.healthy) + return healthyState == Healthy +} + +// Send a custom request and get the response. +func (c *Client) Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error) { + err = c.Transmit(req) + if err != nil { + return + } + resp, _, err := getResponse(c.conn, c.hdrBuf) + c.setHealthy(!gomemcached.IsFatal(err)) + return resp, err +} + +// Transmit send a request, but does not wait for a response. +func (c *Client) Transmit(req *gomemcached.MCRequest) error { + if DefaultWriteTimeout > 0 { + c.conn.(net.Conn).SetWriteDeadline(time.Now().Add(DefaultWriteTimeout)) + } + _, err := transmitRequest(c.conn, req) + // clear write deadline to avoid interference with future write operations + if DefaultWriteTimeout > 0 { + c.conn.(net.Conn).SetWriteDeadline(time.Time{}) + } + if err != nil { + c.setHealthy(false) + } + return err +} + +func (c *Client) TransmitWithDeadline(req *gomemcached.MCRequest, deadline time.Time) error { + c.conn.(net.Conn).SetWriteDeadline(deadline) + + _, err := transmitRequest(c.conn, req) + + // clear write deadline to avoid interference with future write operations + c.conn.(net.Conn).SetWriteDeadline(time.Time{}) + + if err != nil { + c.setHealthy(false) + } + return err +} + +// TransmitResponse send a response, does not wait. +func (c *Client) TransmitResponse(res *gomemcached.MCResponse) error { + if DefaultWriteTimeout > 0 { + c.conn.(net.Conn).SetWriteDeadline(time.Now().Add(DefaultWriteTimeout)) + } + _, err := transmitResponse(c.conn, res) + // clear write deadline to avoid interference with future write operations + if DefaultWriteTimeout > 0 { + c.conn.(net.Conn).SetWriteDeadline(time.Time{}) + } + if err != nil { + c.setHealthy(false) + } + return err +} + +// Receive a response +func (c *Client) Receive() (*gomemcached.MCResponse, error) { + resp, _, err := getResponse(c.conn, c.hdrBuf) + if err != nil && resp.Status != gomemcached.KEY_ENOENT && resp.Status != gomemcached.EBUSY { + c.setHealthy(false) + } + return resp, err +} + +func (c *Client) ReceiveWithDeadline(deadline time.Time) (*gomemcached.MCResponse, error) { + c.conn.(net.Conn).SetReadDeadline(deadline) + + resp, _, err := getResponse(c.conn, c.hdrBuf) + + // Clear read deadline to avoid interference with future read operations. + c.conn.(net.Conn).SetReadDeadline(time.Time{}) + + if err != nil && resp.Status != gomemcached.KEY_ENOENT && resp.Status != gomemcached.EBUSY { + c.setHealthy(false) + } + return resp, err +} + +func appendMutationToken(bytes []byte) []byte { + bytes = append(bytes, 0, 0) + binary.BigEndian.PutUint16(bytes[len(bytes)-2:], uint16(0x04)) + return bytes +} + +//Send a hello command to enable MutationTokens +func (c *Client) EnableMutationToken() (*gomemcached.MCResponse, error) { + var payload []byte + payload = appendMutationToken(payload) + + return c.Send(&gomemcached.MCRequest{ + Opcode: gomemcached.HELLO, + Key: []byte("GoMemcached"), + Body: payload, + }) + +} + +//Send a hello command to enable specific features +func (c *Client) EnableFeatures(features Features) (*gomemcached.MCResponse, error) { + var payload []byte + + for _, feature := range features { + payload = append(payload, 0, 0) + binary.BigEndian.PutUint16(payload[len(payload)-2:], uint16(feature)) + } + + return c.Send(&gomemcached.MCRequest{ + Opcode: gomemcached.HELLO, + Key: []byte("GoMemcached"), + Body: payload, + }) + +} + +// Get the value for a key. +func (c *Client) Get(vb uint16, key string) (*gomemcached.MCResponse, error) { + return c.Send(&gomemcached.MCRequest{ + Opcode: gomemcached.GET, + VBucket: vb, + Key: []byte(key), + }) +} + +// Get the xattrs, doc value for the input key +func (c *Client) GetSubdoc(vb uint16, key string, subPaths []string) (*gomemcached.MCResponse, error) { + + extraBuf, valueBuf := GetSubDocVal(subPaths) + res, err := c.Send(&gomemcached.MCRequest{ + Opcode: gomemcached.SUBDOC_MULTI_LOOKUP, + VBucket: vb, + Key: []byte(key), + Extras: extraBuf, + Body: valueBuf, + }) + + if err != nil && IfResStatusError(res) { + return res, err + } + return res, nil +} + +// Get the value for a key, and update expiry +func (c *Client) GetAndTouch(vb uint16, key string, exp int) (*gomemcached.MCResponse, error) { + extraBuf := make([]byte, 4) + binary.BigEndian.PutUint32(extraBuf[0:], uint32(exp)) + return c.Send(&gomemcached.MCRequest{ + Opcode: gomemcached.GAT, + VBucket: vb, + Key: []byte(key), + Extras: extraBuf, + }) +} + +// Get metadata for a key +func (c *Client) GetMeta(vb uint16, key string) (*gomemcached.MCResponse, error) { + return c.Send(&gomemcached.MCRequest{ + Opcode: gomemcached.GET_META, + VBucket: vb, + Key: []byte(key), + }) +} + +// Del deletes a key. +func (c *Client) Del(vb uint16, key string) (*gomemcached.MCResponse, error) { + return c.Send(&gomemcached.MCRequest{ + Opcode: gomemcached.DELETE, + VBucket: vb, + Key: []byte(key)}) +} + +// Get a random document +func (c *Client) GetRandomDoc() (*gomemcached.MCResponse, error) { + return c.Send(&gomemcached.MCRequest{ + Opcode: 0xB6, + }) +} + +// AuthList lists SASL auth mechanisms. +func (c *Client) AuthList() (*gomemcached.MCResponse, error) { + return c.Send(&gomemcached.MCRequest{ + Opcode: gomemcached.SASL_LIST_MECHS}) +} + +// Auth performs SASL PLAIN authentication against the server. +func (c *Client) Auth(user, pass string) (*gomemcached.MCResponse, error) { + res, err := c.AuthList() + + if err != nil { + return res, err + } + + authMech := string(res.Body) + if strings.Index(authMech, "PLAIN") != -1 { + return c.AuthPlain(user, pass) + } + return nil, fmt.Errorf("auth mechanism PLAIN not supported") +} + +// AuthScramSha performs SCRAM-SHA authentication against the server. +func (c *Client) AuthScramSha(user, pass string) (*gomemcached.MCResponse, error) { + res, err := c.AuthList() + if err != nil { + return nil, errors.Wrap(err, "Unable to obtain list of methods.") + } + + methods := string(res.Body) + method, err := scramsha.BestMethod(methods) + if err != nil { + return nil, errors.Wrap(err, + "Unable to select SCRAM-SHA method.") + } + + s, err := scramsha.NewScramSha(method) + if err != nil { + return nil, errors.Wrap(err, "Unable to initialize scramsha.") + } + + logging.Infof("Using %v authentication for user %v%v%v", method, gomemcached.UdTagBegin, user, gomemcached.UdTagEnd) + + message, err := s.GetStartRequest(user) + if err != nil { + return nil, errors.Wrapf(err, + "Error building start request for user %s.", user) + } + + startRequest := &gomemcached.MCRequest{ + Opcode: 0x21, + Key: []byte(method), + Body: []byte(message)} + + startResponse, err := c.Send(startRequest) + if err != nil { + return nil, errors.Wrap(err, "Error sending start request.") + } + + err = s.HandleStartResponse(string(startResponse.Body)) + if err != nil { + return nil, errors.Wrap(err, "Error handling start response.") + } + + message = s.GetFinalRequest(pass) + + // send step request + finalRequest := &gomemcached.MCRequest{ + Opcode: 0x22, + Key: []byte(method), + Body: []byte(message)} + finalResponse, err := c.Send(finalRequest) + if err != nil { + return nil, errors.Wrap(err, "Error sending final request.") + } + + err = s.HandleFinalResponse(string(finalResponse.Body)) + if err != nil { + return nil, errors.Wrap(err, "Error handling final response.") + } + + return finalResponse, nil +} + +func (c *Client) AuthPlain(user, pass string) (*gomemcached.MCResponse, error) { + logging.Infof("Using plain authentication for user %v%v%v", gomemcached.UdTagBegin, user, gomemcached.UdTagEnd) + return c.Send(&gomemcached.MCRequest{ + Opcode: gomemcached.SASL_AUTH, + Key: []byte("PLAIN"), + Body: []byte(fmt.Sprintf("\x00%s\x00%s", user, pass))}) +} + +// select bucket +func (c *Client) SelectBucket(bucket string) (*gomemcached.MCResponse, error) { + + return c.Send(&gomemcached.MCRequest{ + Opcode: gomemcached.SELECT_BUCKET, + Key: []byte(fmt.Sprintf("%s", bucket))}) +} + +func (c *Client) store(opcode gomemcached.CommandCode, vb uint16, + key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error) { + + req := &gomemcached.MCRequest{ + Opcode: opcode, + VBucket: vb, + Key: []byte(key), + Cas: 0, + Opaque: 0, + Extras: []byte{0, 0, 0, 0, 0, 0, 0, 0}, + Body: body} + + binary.BigEndian.PutUint64(req.Extras, uint64(flags)<<32|uint64(exp)) + return c.Send(req) +} + +func (c *Client) storeCas(opcode gomemcached.CommandCode, vb uint16, + key string, flags int, exp int, cas uint64, body []byte) (*gomemcached.MCResponse, error) { + + req := &gomemcached.MCRequest{ + Opcode: opcode, + VBucket: vb, + Key: []byte(key), + Cas: cas, + Opaque: 0, + Extras: []byte{0, 0, 0, 0, 0, 0, 0, 0}, + Body: body} + + binary.BigEndian.PutUint64(req.Extras, uint64(flags)<<32|uint64(exp)) + return c.Send(req) +} + +// Incr increments the value at the given key. +func (c *Client) Incr(vb uint16, key string, + amt, def uint64, exp int) (uint64, error) { + + req := &gomemcached.MCRequest{ + Opcode: gomemcached.INCREMENT, + VBucket: vb, + Key: []byte(key), + Extras: make([]byte, 8+8+4), + } + binary.BigEndian.PutUint64(req.Extras[:8], amt) + binary.BigEndian.PutUint64(req.Extras[8:16], def) + binary.BigEndian.PutUint32(req.Extras[16:20], uint32(exp)) + + resp, err := c.Send(req) + if err != nil { + return 0, err + } + + return binary.BigEndian.Uint64(resp.Body), nil +} + +// Decr decrements the value at the given key. +func (c *Client) Decr(vb uint16, key string, + amt, def uint64, exp int) (uint64, error) { + + req := &gomemcached.MCRequest{ + Opcode: gomemcached.DECREMENT, + VBucket: vb, + Key: []byte(key), + Extras: make([]byte, 8+8+4), + } + binary.BigEndian.PutUint64(req.Extras[:8], amt) + binary.BigEndian.PutUint64(req.Extras[8:16], def) + binary.BigEndian.PutUint32(req.Extras[16:20], uint32(exp)) + + resp, err := c.Send(req) + if err != nil { + return 0, err + } + + return binary.BigEndian.Uint64(resp.Body), nil +} + +// Add a value for a key (store if not exists). +func (c *Client) Add(vb uint16, key string, flags int, exp int, + body []byte) (*gomemcached.MCResponse, error) { + return c.store(gomemcached.ADD, vb, key, flags, exp, body) +} + +// Set the value for a key. +func (c *Client) Set(vb uint16, key string, flags int, exp int, + body []byte) (*gomemcached.MCResponse, error) { + return c.store(gomemcached.SET, vb, key, flags, exp, body) +} + +// SetCas set the value for a key with cas +func (c *Client) SetCas(vb uint16, key string, flags int, exp int, cas uint64, + body []byte) (*gomemcached.MCResponse, error) { + return c.storeCas(gomemcached.SET, vb, key, flags, exp, cas, body) +} + +// Append data to the value of a key. +func (c *Client) Append(vb uint16, key string, data []byte) (*gomemcached.MCResponse, error) { + req := &gomemcached.MCRequest{ + Opcode: gomemcached.APPEND, + VBucket: vb, + Key: []byte(key), + Cas: 0, + Opaque: 0, + Body: data} + + return c.Send(req) +} + +// GetBulk gets keys in bulk +func (c *Client) GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string) error { + stopch := make(chan bool) + var wg sync.WaitGroup + + defer func() { + close(stopch) + wg.Wait() + }() + + if (math.MaxInt32 - c.opaque) < (uint32(len(keys)) + 1) { + c.opaque = uint32(1) + } + + opStart := c.opaque + + errch := make(chan error, 2) + + wg.Add(1) + go func() { + defer func() { + if r := recover(); r != nil { + logging.Infof("Recovered in f %v", r) + } + errch <- nil + wg.Done() + }() + + ok := true + for ok { + + select { + case <-stopch: + return + default: + res, err := c.Receive() + + if err != nil && IfResStatusError(res) { + if res == nil || res.Status != gomemcached.KEY_ENOENT { + errch <- err + return + } + // continue receiving in case of KEY_ENOENT + } else if res.Opcode == gomemcached.GET || + res.Opcode == gomemcached.SUBDOC_GET || + res.Opcode == gomemcached.SUBDOC_MULTI_LOOKUP { + opaque := res.Opaque - opStart + if opaque < 0 || opaque >= uint32(len(keys)) { + // Every now and then we seem to be seeing an invalid opaque + // value returned from the server. When this happens log the error + // and the calling function will retry the bulkGet. MB-15140 + logging.Errorf(" Invalid opaque Value. Debug info : Res.opaque : %v(%v), Keys %v, Response received %v \n key list %v this key %v", res.Opaque, opaque, len(keys), res, keys, string(res.Body)) + errch <- fmt.Errorf("Out of Bounds error") + return + } + + rv[keys[opaque]] = res + } + + if res.Opcode == gomemcached.NOOP { + ok = false + } + } + } + }() + + memcachedReqPkt := &gomemcached.MCRequest{ + Opcode: gomemcached.GET, + VBucket: vb, + } + + if len(subPaths) > 0 { + extraBuf, valueBuf := GetSubDocVal(subPaths) + memcachedReqPkt.Opcode = gomemcached.SUBDOC_MULTI_LOOKUP + memcachedReqPkt.Extras = extraBuf + memcachedReqPkt.Body = valueBuf + } + + for _, k := range keys { // Start of Get request + memcachedReqPkt.Key = []byte(k) + memcachedReqPkt.Opaque = c.opaque + + err := c.Transmit(memcachedReqPkt) + if err != nil { + logging.Errorf(" Transmit failed in GetBulkAll %v", err) + return err + } + c.opaque++ + } // End of Get request + + // finally transmit a NOOP + err := c.Transmit(&gomemcached.MCRequest{ + Opcode: gomemcached.NOOP, + VBucket: vb, + Opaque: c.opaque, + }) + + if err != nil { + logging.Errorf(" Transmit of NOOP failed %v", err) + return err + } + c.opaque++ + + return <-errch +} + +func GetSubDocVal(subPaths []string) (extraBuf, valueBuf []byte) { + + var ops []string + totalBytesLen := 0 + num := 1 + + for _, v := range subPaths { + totalBytesLen = totalBytesLen + len([]byte(v)) + ops = append(ops, v) + num = num + 1 + } + + // Xattr retrieval - subdoc multi get + extraBuf = append(extraBuf, uint8(0x04)) + + valueBuf = make([]byte, num*4+totalBytesLen) + + //opcode for subdoc get + op := gomemcached.SUBDOC_GET + + // Calculate path total bytes + // There are 2 ops - get xattrs - both input and $document and get whole doc + valIter := 0 + + for _, v := range ops { + pathBytes := []byte(v) + valueBuf[valIter+0] = uint8(op) + + // SubdocFlagXattrPath indicates that the path refers to + // an Xattr rather than the document body. + valueBuf[valIter+1] = uint8(gomemcached.SUBDOC_FLAG_XATTR) + + // 2 byte key + binary.BigEndian.PutUint16(valueBuf[valIter+2:], uint16(len(pathBytes))) + + // Then n bytes path + copy(valueBuf[valIter+4:], pathBytes) + valIter = valIter + 4 + len(pathBytes) + } + + return +} + +// ObservedStatus is the type reported by the Observe method +type ObservedStatus uint8 + +// Observation status values. +const ( + ObservedNotPersisted = ObservedStatus(0x00) // found, not persisted + ObservedPersisted = ObservedStatus(0x01) // found, persisted + ObservedNotFound = ObservedStatus(0x80) // not found (or a persisted delete) + ObservedLogicallyDeleted = ObservedStatus(0x81) // pending deletion (not persisted yet) +) + +// ObserveResult represents the data obtained by an Observe call +type ObserveResult struct { + Status ObservedStatus // Whether the value has been persisted/deleted + Cas uint64 // Current value's CAS + PersistenceTime time.Duration // Node's average time to persist a value + ReplicationTime time.Duration // Node's average time to replicate a value +} + +// Observe gets the persistence/replication/CAS state of a key +func (c *Client) Observe(vb uint16, key string) (result ObserveResult, err error) { + // http://www.couchbase.com/wiki/display/couchbase/Observe + body := make([]byte, 4+len(key)) + binary.BigEndian.PutUint16(body[0:2], vb) + binary.BigEndian.PutUint16(body[2:4], uint16(len(key))) + copy(body[4:4+len(key)], key) + + res, err := c.Send(&gomemcached.MCRequest{ + Opcode: gomemcached.OBSERVE, + VBucket: vb, + Body: body, + }) + if err != nil { + return + } + + // Parse the response data from the body: + if len(res.Body) < 2+2+1 { + err = io.ErrUnexpectedEOF + return + } + outVb := binary.BigEndian.Uint16(res.Body[0:2]) + keyLen := binary.BigEndian.Uint16(res.Body[2:4]) + if len(res.Body) < 2+2+int(keyLen)+1+8 { + err = io.ErrUnexpectedEOF + return + } + outKey := string(res.Body[4 : 4+keyLen]) + if outVb != vb || outKey != key { + err = fmt.Errorf("observe returned wrong vbucket/key: %d/%q", outVb, outKey) + return + } + result.Status = ObservedStatus(res.Body[4+keyLen]) + result.Cas = binary.BigEndian.Uint64(res.Body[5+keyLen:]) + // The response reuses the Cas field to store time statistics: + result.PersistenceTime = time.Duration(res.Cas>>32) * time.Millisecond + result.ReplicationTime = time.Duration(res.Cas&math.MaxUint32) * time.Millisecond + return +} + +// CheckPersistence checks whether a stored value has been persisted to disk yet. +func (result ObserveResult) CheckPersistence(cas uint64, deletion bool) (persisted bool, overwritten bool) { + switch { + case result.Status == ObservedNotFound && deletion: + persisted = true + case result.Cas != cas: + overwritten = true + case result.Status == ObservedPersisted: + persisted = true + } + return +} + +// Sequence number based Observe Implementation +type ObserveSeqResult struct { + Failover uint8 // Set to 1 if a failover took place + VbId uint16 // vbucket id + Vbuuid uint64 // vucket uuid + LastPersistedSeqNo uint64 // last persisted sequence number + CurrentSeqNo uint64 // current sequence number + OldVbuuid uint64 // Old bucket vbuuid + LastSeqNo uint64 // last sequence number received before failover +} + +func (c *Client) ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error) { + // http://www.couchbase.com/wiki/display/couchbase/Observe + body := make([]byte, 8) + binary.BigEndian.PutUint64(body[0:8], vbuuid) + + res, err := c.Send(&gomemcached.MCRequest{ + Opcode: gomemcached.OBSERVE_SEQNO, + VBucket: vb, + Body: body, + Opaque: 0x01, + }) + if err != nil { + return + } + + if res.Status != gomemcached.SUCCESS { + return nil, fmt.Errorf(" Observe returned error %v", res.Status) + } + + // Parse the response data from the body: + if len(res.Body) < (1 + 2 + 8 + 8 + 8) { + err = io.ErrUnexpectedEOF + return + } + + result = &ObserveSeqResult{} + result.Failover = res.Body[0] + result.VbId = binary.BigEndian.Uint16(res.Body[1:3]) + result.Vbuuid = binary.BigEndian.Uint64(res.Body[3:11]) + result.LastPersistedSeqNo = binary.BigEndian.Uint64(res.Body[11:19]) + result.CurrentSeqNo = binary.BigEndian.Uint64(res.Body[19:27]) + + // in case of failover processing we can have old vbuuid and the last persisted seq number + if result.Failover == 1 && len(res.Body) >= (1+2+8+8+8+8+8) { + result.OldVbuuid = binary.BigEndian.Uint64(res.Body[27:35]) + result.LastSeqNo = binary.BigEndian.Uint64(res.Body[35:43]) + } + + return +} + +// CasOp is the type of operation to perform on this CAS loop. +type CasOp uint8 + +const ( + // CASStore instructs the server to store the new value normally + CASStore = CasOp(iota) + // CASQuit instructs the client to stop attempting to CAS, leaving value untouched + CASQuit + // CASDelete instructs the server to delete the current value + CASDelete +) + +// User specified termination is returned as an error. +func (c CasOp) Error() string { + switch c { + case CASStore: + return "CAS store" + case CASQuit: + return "CAS quit" + case CASDelete: + return "CAS delete" + } + panic("Unhandled value") +} + +//////// CAS TRANSFORM + +// CASState tracks the state of CAS over several operations. +// +// This is used directly by CASNext and indirectly by CAS +type CASState struct { + initialized bool // false on the first call to CASNext, then true + Value []byte // Current value of key; update in place to new value + Cas uint64 // Current CAS value of key + Exists bool // Does a value exist for the key? (If not, Value will be nil) + Err error // Error, if any, after CASNext returns false + resp *gomemcached.MCResponse +} + +// CASNext is a non-callback, loop-based version of CAS method. +// +// Usage is like this: +// +// var state memcached.CASState +// for client.CASNext(vb, key, exp, &state) { +// state.Value = some_mutation(state.Value) +// } +// if state.Err != nil { ... } +func (c *Client) CASNext(vb uint16, k string, exp int, state *CASState) bool { + if state.initialized { + if !state.Exists { + // Adding a new key: + if state.Value == nil { + state.Cas = 0 + return false // no-op (delete of non-existent value) + } + state.resp, state.Err = c.Add(vb, k, 0, exp, state.Value) + } else { + // Updating / deleting a key: + req := &gomemcached.MCRequest{ + Opcode: gomemcached.DELETE, + VBucket: vb, + Key: []byte(k), + Cas: state.Cas} + if state.Value != nil { + req.Opcode = gomemcached.SET + req.Opaque = 0 + req.Extras = []byte{0, 0, 0, 0, 0, 0, 0, 0} + req.Body = state.Value + + flags := 0 + binary.BigEndian.PutUint64(req.Extras, uint64(flags)<<32|uint64(exp)) + } + state.resp, state.Err = c.Send(req) + } + + // If the response status is KEY_EEXISTS or NOT_STORED there's a conflict and we'll need to + // get the new value (below). Otherwise, we're done (either success or failure) so return: + if !(state.resp != nil && (state.resp.Status == gomemcached.KEY_EEXISTS || + state.resp.Status == gomemcached.NOT_STORED)) { + state.Cas = state.resp.Cas + return false // either success or fatal error + } + } + + // Initial call, or after a conflict: GET the current value and CAS and return them: + state.initialized = true + if state.resp, state.Err = c.Get(vb, k); state.Err == nil { + state.Exists = true + state.Value = state.resp.Body + state.Cas = state.resp.Cas + } else if state.resp != nil && state.resp.Status == gomemcached.KEY_ENOENT { + state.Err = nil + state.Exists = false + state.Value = nil + state.Cas = 0 + } else { + return false // fatal error + } + return true // keep going... +} + +// CasFunc is type type of function to perform a CAS transform. +// +// Input is the current value, or nil if no value exists. +// The function should return the new value (if any) to set, and the store/quit/delete operation. +type CasFunc func(current []byte) ([]byte, CasOp) + +// CAS performs a CAS transform with the given function. +// +// If the value does not exist, a nil current value will be sent to f. +func (c *Client) CAS(vb uint16, k string, f CasFunc, + initexp int) (*gomemcached.MCResponse, error) { + var state CASState + for c.CASNext(vb, k, initexp, &state) { + newValue, operation := f(state.Value) + if operation == CASQuit || (operation == CASDelete && state.Value == nil) { + return nil, operation + } + state.Value = newValue + } + return state.resp, state.Err +} + +// StatValue is one of the stats returned from the Stats method. +type StatValue struct { + // The stat key + Key string + // The stat value + Val string +} + +// Stats requests server-side stats. +// +// Use "" as the stat key for toplevel stats. +func (c *Client) Stats(key string) ([]StatValue, error) { + rv := make([]StatValue, 0, 128) + + req := &gomemcached.MCRequest{ + Opcode: gomemcached.STAT, + Key: []byte(key), + Opaque: 918494, + } + + err := c.Transmit(req) + if err != nil { + return rv, err + } + + for { + res, _, err := getResponse(c.conn, c.hdrBuf) + if err != nil { + return rv, err + } + k := string(res.Key) + if k == "" { + break + } + rv = append(rv, StatValue{ + Key: k, + Val: string(res.Body), + }) + } + return rv, nil +} + +// StatsMap requests server-side stats similarly to Stats, but returns +// them as a map. +// +// Use "" as the stat key for toplevel stats. +func (c *Client) StatsMap(key string) (map[string]string, error) { + rv := make(map[string]string) + + req := &gomemcached.MCRequest{ + Opcode: gomemcached.STAT, + Key: []byte(key), + Opaque: 918494, + } + + err := c.Transmit(req) + if err != nil { + return rv, err + } + + for { + res, _, err := getResponse(c.conn, c.hdrBuf) + if err != nil { + return rv, err + } + k := string(res.Key) + if k == "" { + break + } + rv[k] = string(res.Body) + } + + return rv, nil +} + +// instead of returning a new statsMap, simply populate passed in statsMap, which contains all the keys +// for which stats needs to be retrieved +func (c *Client) StatsMapForSpecifiedStats(key string, statsMap map[string]string) error { + + // clear statsMap + for key, _ := range statsMap { + statsMap[key] = "" + } + + req := &gomemcached.MCRequest{ + Opcode: gomemcached.STAT, + Key: []byte(key), + Opaque: 918494, + } + + err := c.Transmit(req) + if err != nil { + return err + } + + for { + res, _, err := getResponse(c.conn, c.hdrBuf) + if err != nil { + return err + } + k := string(res.Key) + if k == "" { + break + } + if _, ok := statsMap[k]; ok { + statsMap[k] = string(res.Body) + } + } + + return nil +} + +// Hijack exposes the underlying connection from this client. +// +// It also marks the connection as unhealthy since the client will +// have lost control over the connection and can't otherwise verify +// things are in good shape for connection pools. +func (c *Client) Hijack() io.ReadWriteCloser { + c.setHealthy(false) + return c.conn +} + +func (c *Client) setHealthy(healthy bool) { + healthyState := UnHealthy + if healthy { + healthyState = Healthy + } + atomic.StoreUint32(&c.healthy, healthyState) +} + +func IfResStatusError(response *gomemcached.MCResponse) bool { + return response == nil || + (response.Status != gomemcached.SUBDOC_BAD_MULTI && + response.Status != gomemcached.SUBDOC_PATH_NOT_FOUND && + response.Status != gomemcached.SUBDOC_MULTI_PATH_FAILURE_DELETED) +} diff --git a/vendor/github.com/couchbase/gomemcached/client/tap_feed.go b/vendor/github.com/couchbase/gomemcached/client/tap_feed.go new file mode 100644 index 0000000000..fd628c5de2 --- /dev/null +++ b/vendor/github.com/couchbase/gomemcached/client/tap_feed.go @@ -0,0 +1,333 @@ +package memcached + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + "math" + + "github.com/couchbase/gomemcached" + "github.com/couchbase/goutils/logging" +) + +// TAP protocol docs: <http://www.couchbase.com/wiki/display/couchbase/TAP+Protocol> + +// TapOpcode is the tap operation type (found in TapEvent) +type TapOpcode uint8 + +// Tap opcode values. +const ( + TapBeginBackfill = TapOpcode(iota) + TapEndBackfill + TapMutation + TapDeletion + TapCheckpointStart + TapCheckpointEnd + tapEndStream +) + +const tapMutationExtraLen = 16 + +var tapOpcodeNames map[TapOpcode]string + +func init() { + tapOpcodeNames = map[TapOpcode]string{ + TapBeginBackfill: "BeginBackfill", + TapEndBackfill: "EndBackfill", + TapMutation: "Mutation", + TapDeletion: "Deletion", + TapCheckpointStart: "TapCheckpointStart", + TapCheckpointEnd: "TapCheckpointEnd", + tapEndStream: "EndStream", + } +} + +func (opcode TapOpcode) String() string { + name := tapOpcodeNames[opcode] + if name == "" { + name = fmt.Sprintf("#%d", opcode) + } + return name +} + +// TapEvent is a TAP notification of an operation on the server. +type TapEvent struct { + Opcode TapOpcode // Type of event + VBucket uint16 // VBucket this event applies to + Flags uint32 // Item flags + Expiry uint32 // Item expiration time + Key, Value []byte // Item key/value + Cas uint64 +} + +func makeTapEvent(req gomemcached.MCRequest) *TapEvent { + event := TapEvent{ + VBucket: req.VBucket, + } + switch req.Opcode { + case gomemcached.TAP_MUTATION: + event.Opcode = TapMutation + event.Key = req.Key + event.Value = req.Body + event.Cas = req.Cas + case gomemcached.TAP_DELETE: + event.Opcode = TapDeletion + event.Key = req.Key + event.Cas = req.Cas + case gomemcached.TAP_CHECKPOINT_START: + event.Opcode = TapCheckpointStart + case gomemcached.TAP_CHECKPOINT_END: + event.Opcode = TapCheckpointEnd + case gomemcached.TAP_OPAQUE: + if len(req.Extras) < 8+4 { + return nil + } + switch op := int(binary.BigEndian.Uint32(req.Extras[8:])); op { + case gomemcached.TAP_OPAQUE_INITIAL_VBUCKET_STREAM: + event.Opcode = TapBeginBackfill + case gomemcached.TAP_OPAQUE_CLOSE_BACKFILL: + event.Opcode = TapEndBackfill + case gomemcached.TAP_OPAQUE_CLOSE_TAP_STREAM: + event.Opcode = tapEndStream + case gomemcached.TAP_OPAQUE_ENABLE_AUTO_NACK: + return nil + case gomemcached.TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC: + return nil + default: + logging.Infof("TapFeed: Ignoring TAP_OPAQUE/%d", op) + return nil // unknown opaque event + } + case gomemcached.NOOP: + return nil // ignore + default: + logging.Infof("TapFeed: Ignoring %s", req.Opcode) + return nil // unknown event + } + + if len(req.Extras) >= tapMutationExtraLen && + (event.Opcode == TapMutation || event.Opcode == TapDeletion) { + + event.Flags = binary.BigEndian.Uint32(req.Extras[8:]) + event.Expiry = binary.BigEndian.Uint32(req.Extras[12:]) + } + + return &event +} + +func (event TapEvent) String() string { + switch event.Opcode { + case TapBeginBackfill, TapEndBackfill, TapCheckpointStart, TapCheckpointEnd: + return fmt.Sprintf("<TapEvent %s, vbucket=%d>", + event.Opcode, event.VBucket) + default: + return fmt.Sprintf("<TapEvent %s, key=%q (%d bytes) flags=%x, exp=%d>", + event.Opcode, event.Key, len(event.Value), + event.Flags, event.Expiry) + } +} + +// TapArguments are parameters for requesting a TAP feed. +// +// Call DefaultTapArguments to get a default one. +type TapArguments struct { + // Timestamp of oldest item to send. + // + // Use TapNoBackfill to suppress all past items. + Backfill uint64 + // If set, server will disconnect after sending existing items. + Dump bool + // The indices of the vbuckets to watch; empty/nil to watch all. + VBuckets []uint16 + // Transfers ownership of vbuckets during cluster rebalance. + Takeover bool + // If true, server will wait for client ACK after every notification. + SupportAck bool + // If true, client doesn't want values so server shouldn't send them. + KeysOnly bool + // If true, client wants the server to send checkpoint events. + Checkpoint bool + // Optional identifier to use for this client, to allow reconnects + ClientName string + // Registers this client (by name) till explicitly deregistered. + RegisteredClient bool +} + +// Value for TapArguments.Backfill denoting that no past events at all +// should be sent. +const TapNoBackfill = math.MaxUint64 + +// DefaultTapArguments returns a default set of parameter values to +// pass to StartTapFeed. +func DefaultTapArguments() TapArguments { + return TapArguments{ + Backfill: TapNoBackfill, + } +} + +func (args *TapArguments) flags() []byte { + var flags gomemcached.TapConnectFlag + if args.Backfill != 0 { + flags |= gomemcached.BACKFILL + } + if args.Dump { + flags |= gomemcached.DUMP + } + if len(args.VBuckets) > 0 { + flags |= gomemcached.LIST_VBUCKETS + } + if args.Takeover { + flags |= gomemcached.TAKEOVER_VBUCKETS + } + if args.SupportAck { + flags |= gomemcached.SUPPORT_ACK + } + if args.KeysOnly { + flags |= gomemcached.REQUEST_KEYS_ONLY + } + if args.Checkpoint { + flags |= gomemcached.CHECKPOINT + } + if args.RegisteredClient { + flags |= gomemcached.REGISTERED_CLIENT + } + encoded := make([]byte, 4) + binary.BigEndian.PutUint32(encoded, uint32(flags)) + return encoded +} + +func must(err error) { + if err != nil { + panic(err) + } +} + +func (args *TapArguments) bytes() (rv []byte) { + buf := bytes.NewBuffer([]byte{}) + + if args.Backfill > 0 { + must(binary.Write(buf, binary.BigEndian, uint64(args.Backfill))) + } + + if len(args.VBuckets) > 0 { + must(binary.Write(buf, binary.BigEndian, uint16(len(args.VBuckets)))) + for i := 0; i < len(args.VBuckets); i++ { + must(binary.Write(buf, binary.BigEndian, uint16(args.VBuckets[i]))) + } + } + return buf.Bytes() +} + +// TapFeed represents a stream of events from a server. +type TapFeed struct { + C <-chan TapEvent + Error error + closer chan bool +} + +// StartTapFeed starts a TAP feed on a client connection. +// +// The events can be read from the returned channel. The connection +// can no longer be used for other purposes; it's now reserved for +// receiving the TAP messages. To stop receiving events, close the +// client connection. +func (mc *Client) StartTapFeed(args TapArguments) (*TapFeed, error) { + rq := &gomemcached.MCRequest{ + Opcode: gomemcached.TAP_CONNECT, + Key: []byte(args.ClientName), + Extras: args.flags(), + Body: args.bytes()} + + err := mc.Transmit(rq) + if err != nil { + return nil, err + } + + ch := make(chan TapEvent) + feed := &TapFeed{ + C: ch, + closer: make(chan bool), + } + go mc.runFeed(ch, feed) + return feed, nil +} + +// TapRecvHook is called after every incoming tap packet is received. +var TapRecvHook func(*gomemcached.MCRequest, int, error) + +// Internal goroutine that reads from the socket and writes events to +// the channel +func (mc *Client) runFeed(ch chan TapEvent, feed *TapFeed) { + defer close(ch) + var headerBuf [gomemcached.HDR_LEN]byte +loop: + for { + // Read the next request from the server. + // + // (Can't call mc.Receive() because it reads a + // _response_ not a request.) + var pkt gomemcached.MCRequest + n, err := pkt.Receive(mc.conn, headerBuf[:]) + if TapRecvHook != nil { + TapRecvHook(&pkt, n, err) + } + + if err != nil { + if err != io.EOF { + feed.Error = err + } + break loop + } + + //logging.Infof("** TapFeed received %#v : %q", pkt, pkt.Body) + + if pkt.Opcode == gomemcached.TAP_CONNECT { + // This is not an event from the server; it's + // an error response to my connect request. + feed.Error = fmt.Errorf("tap connection failed: %s", pkt.Body) + break loop + } + + event := makeTapEvent(pkt) + if event != nil { + if event.Opcode == tapEndStream { + break loop + } + + select { + case ch <- *event: + case <-feed.closer: + break loop + } + } + + if len(pkt.Extras) >= 4 { + reqFlags := binary.BigEndian.Uint16(pkt.Extras[2:]) + if reqFlags&gomemcached.TAP_ACK != 0 { + if _, err := mc.sendAck(&pkt); err != nil { + feed.Error = err + break loop + } + } + } + } + if err := mc.Close(); err != nil { + logging.Errorf("Error closing memcached client: %v", err) + } +} + +func (mc *Client) sendAck(pkt *gomemcached.MCRequest) (int, error) { + res := gomemcached.MCResponse{ + Opcode: pkt.Opcode, + Opaque: pkt.Opaque, + Status: gomemcached.SUCCESS, + } + return res.Transmit(mc.conn) +} + +// Close terminates a TapFeed. +// +// Call this if you stop using a TapFeed before its channel ends. +func (feed *TapFeed) Close() { + close(feed.closer) +} diff --git a/vendor/github.com/couchbase/gomemcached/client/transport.go b/vendor/github.com/couchbase/gomemcached/client/transport.go new file mode 100644 index 0000000000..f4cea17fca --- /dev/null +++ b/vendor/github.com/couchbase/gomemcached/client/transport.go @@ -0,0 +1,67 @@ +package memcached + +import ( + "errors" + "io" + + "github.com/couchbase/gomemcached" +) + +var errNoConn = errors.New("no connection") + +// UnwrapMemcachedError converts memcached errors to normal responses. +// +// If the error is a memcached response, declare the error to be nil +// so a client can handle the status without worrying about whether it +// indicates success or failure. +func UnwrapMemcachedError(rv *gomemcached.MCResponse, + err error) (*gomemcached.MCResponse, error) { + + if rv == err { + return rv, nil + } + return rv, err +} + +// ReceiveHook is called after every packet is received (or attempted to be) +var ReceiveHook func(*gomemcached.MCResponse, int, error) + +func getResponse(s io.Reader, hdrBytes []byte) (rv *gomemcached.MCResponse, n int, err error) { + if s == nil { + return nil, 0, errNoConn + } + + rv = &gomemcached.MCResponse{} + n, err = rv.Receive(s, hdrBytes) + + if ReceiveHook != nil { + ReceiveHook(rv, n, err) + } + + if err == nil && (rv.Status != gomemcached.SUCCESS && rv.Status != gomemcached.AUTH_CONTINUE) { + err = rv + } + return rv, n, err +} + +// TransmitHook is called after each packet is transmitted. +var TransmitHook func(*gomemcached.MCRequest, int, error) + +func transmitRequest(o io.Writer, req *gomemcached.MCRequest) (int, error) { + if o == nil { + return 0, errNoConn + } + n, err := req.Transmit(o) + if TransmitHook != nil { + TransmitHook(req, n, err) + } + return n, err +} + +func transmitResponse(o io.Writer, res *gomemcached.MCResponse) (int, error) { + if o == nil { + return 0, errNoConn + } + n, err := res.Transmit(o) + return n, err +} diff --git a/vendor/github.com/couchbase/gomemcached/client/upr_feed.go b/vendor/github.com/couchbase/gomemcached/client/upr_feed.go new file mode 100644 index 0000000000..dc737e6cc0 --- /dev/null +++ b/vendor/github.com/couchbase/gomemcached/client/upr_feed.go @@ -0,0 +1,1005 @@ +// go implementation of upr client. +// See https://github.com/couchbaselabs/cbupr/blob/master/transport-spec.md +// TODO +// 1. Use a pool allocator to avoid garbage +package memcached + +import ( + "encoding/binary" + "errors" + "fmt" + "github.com/couchbase/gomemcached" + "github.com/couchbase/goutils/logging" + "strconv" + "sync" + "sync/atomic" +) + +const uprMutationExtraLen = 30 +const uprDeletetionExtraLen = 18 +const uprDeletetionWithDeletionTimeExtraLen = 21 +const uprSnapshotExtraLen = 20 +const bufferAckThreshold = 0.2 +const opaqueOpen = 0xBEAF0001 +const opaqueFailover = 0xDEADBEEF +const uprDefaultNoopInterval = 120 + +// Counter on top of opaqueOpen that others can draw from for open and control msgs +var opaqueOpenCtrlWell uint32 = opaqueOpen + +// UprEvent memcached events for UPR streams. +type UprEvent struct { + Opcode gomemcached.CommandCode // Type of event + Status gomemcached.Status // Response status + VBucket uint16 // VBucket this event applies to + DataType uint8 // data type + Opaque uint16 // 16 MSB of opaque + VBuuid uint64 // This field is set by downstream + Flags uint32 // Item flags + Expiry uint32 // Item expiration time + Key, Value []byte // Item key/value + OldValue []byte // TODO: TBD: old document value + Cas uint64 // CAS value of the item + Seqno uint64 // sequence number of the mutation + RevSeqno uint64 // rev sequence number : deletions + LockTime uint32 // Lock time + MetadataSize uint16 // Metadata size + SnapstartSeq uint64 // start sequence number of this snapshot + SnapendSeq uint64 // End sequence number of the snapshot + SnapshotType uint32 // 0: disk 1: memory + FailoverLog *FailoverLog // Failover log containing vvuid and sequnce number + Error error // Error value in case of a failure + ExtMeta []byte + AckSize uint32 // The number of bytes that can be Acked to DCP +} + +// UprStream is per stream data structure over an UPR Connection. +type UprStream struct { + Vbucket uint16 // Vbucket id + Vbuuid uint64 // vbucket uuid + StartSeq uint64 // start sequence number + EndSeq uint64 // end sequence number + connected bool +} + +const ( + CompressionTypeStartMarker = iota // also means invalid + CompressionTypeNone = iota + CompressionTypeSnappy = iota + CompressionTypeEndMarker = iota // also means invalid +) + +// kv_engine/include/mcbp/protocol/datatype.h +const ( + JSONDataType uint8 = 1 + SnappyDataType uint8 = 2 + XattrDataType uint8 = 4 +) + +type UprFeatures struct { + Xattribute bool + CompressionType int + IncludeDeletionTime bool +} + +/** + * Used to handle multiple concurrent calls UprRequestStream() by UprFeed clients + * It is expected that a client that calls UprRequestStream() more than once should issue + * different "opaque" (version) numbers + */ +type opaqueStreamMap map[uint16]*UprStream // opaque -> stream + +type vbStreamNegotiator struct { + vbHandshakeMap map[uint16]opaqueStreamMap // vbno -> opaqueStreamMap + mutex sync.RWMutex +} + +func (negotiator *vbStreamNegotiator) initialize() { + negotiator.mutex.Lock() + negotiator.vbHandshakeMap = make(map[uint16]opaqueStreamMap) + negotiator.mutex.Unlock() +} + +func (negotiator *vbStreamNegotiator) registerRequest(vbno, opaque uint16, vbuuid, startSequence, endSequence uint64) { + negotiator.mutex.Lock() + defer negotiator.mutex.Unlock() + + var osMap opaqueStreamMap + var ok bool + if osMap, ok = negotiator.vbHandshakeMap[vbno]; !ok { + osMap = make(opaqueStreamMap) + negotiator.vbHandshakeMap[vbno] = osMap + } + + if _, ok = osMap[opaque]; !ok { + osMap[opaque] = &UprStream{ + Vbucket: vbno, + Vbuuid: vbuuid, + StartSeq: startSequence, + EndSeq: endSequence, + } + } +} + +func (negotiator *vbStreamNegotiator) getStreamsCntFromMap(vbno uint16) int { + negotiator.mutex.RLock() + defer negotiator.mutex.RUnlock() + + osmap, ok := negotiator.vbHandshakeMap[vbno] + if !ok { + return 0 + } else { + return len(osmap) + } +} + +func (negotiator *vbStreamNegotiator) getStreamFromMap(vbno, opaque uint16) (*UprStream, error) { + negotiator.mutex.RLock() + defer negotiator.mutex.RUnlock() + + osmap, ok := negotiator.vbHandshakeMap[vbno] + if !ok { + return nil, fmt.Errorf("Error: stream for vb: %v does not exist", vbno) + } + + stream, ok := osmap[opaque] + if !ok { + return nil, fmt.Errorf("Error: stream for vb: %v opaque: %v does not exist", vbno, opaque) + } + return stream, nil +} + +func (negotiator *vbStreamNegotiator) deleteStreamFromMap(vbno, opaque uint16) { + negotiator.mutex.Lock() + defer negotiator.mutex.Unlock() + + osmap, ok := negotiator.vbHandshakeMap[vbno] + if !ok { + return + } + + delete(osmap, opaque) + if len(osmap) == 0 { + delete(negotiator.vbHandshakeMap, vbno) + } +} + +func (negotiator *vbStreamNegotiator) handleStreamRequest(feed *UprFeed, + headerBuf [gomemcached.HDR_LEN]byte, pktPtr *gomemcached.MCRequest, bytesReceivedFromDCP int, + response *gomemcached.MCResponse) (*UprEvent, error) { + var event *UprEvent + + if feed == nil || response == nil || pktPtr == nil { + return nil, errors.New("Invalid inputs") + } + + // Get Stream from negotiator map + vbno := vbOpaque(response.Opaque) + opaque := appOpaque(response.Opaque) + + stream, err := negotiator.getStreamFromMap(vbno, opaque) + if err != nil { + err = fmt.Errorf("Stream not found for vb %d appOpaque %v: %#v", vbno, appOpaque, *pktPtr) + logging.Errorf(err.Error()) + return nil, err + } + + status, rb, flog, err := handleStreamRequest(response, headerBuf[:]) + + if status == gomemcached.ROLLBACK { + event = makeUprEvent(*pktPtr, stream, bytesReceivedFromDCP) + event.Status = status + // rollback stream + logging.Infof("UPR_STREAMREQ with rollback %d for vb %d Failed: %v", rb, vbno, err) + negotiator.deleteStreamFromMap(vbno, opaque) + } else if status == gomemcached.SUCCESS { + event = makeUprEvent(*pktPtr, stream, bytesReceivedFromDCP) + event.Seqno = stream.StartSeq + event.FailoverLog = flog + event.Status = status + feed.activateStream(vbno, opaque, stream) + feed.negotiator.deleteStreamFromMap(vbno, opaque) + logging.Infof("UPR_STREAMREQ for vb %d successful", vbno) + + } else if err != nil { + logging.Errorf("UPR_STREAMREQ for vbucket %d erro %s", vbno, err.Error()) + event = &UprEvent{ + Opcode: gomemcached.UPR_STREAMREQ, + Status: status, + VBucket: vbno, + Error: err, + } + negotiator.deleteStreamFromMap(vbno, opaque) + } + return event, nil +} + +func (negotiator *vbStreamNegotiator) cleanUpVbStreams(vbno uint16) { + negotiator.mutex.Lock() + defer negotiator.mutex.Unlock() + + delete(negotiator.vbHandshakeMap, vbno) +} + +// UprFeed represents an UPR feed. A feed contains a connection to a single +// host and multiple vBuckets +type UprFeed struct { + // lock for feed.vbstreams + muVbstreams sync.RWMutex + // lock for feed.closed + muClosed sync.RWMutex + C <-chan *UprEvent // Exported channel for receiving UPR events + negotiator vbStreamNegotiator // Used for pre-vbstreams, concurrent vb stream negotiation + vbstreams map[uint16]*UprStream // official live vb->stream mapping + closer chan bool // closer + conn *Client // connection to UPR producer + Error error // error + bytesRead uint64 // total bytes read on this connection + toAckBytes uint32 // bytes client has read + maxAckBytes uint32 // Max buffer control ack bytes + stats UprStats // Stats for upr client + transmitCh chan *gomemcached.MCRequest // transmit command channel + transmitCl chan bool // closer channel for transmit go-routine + closed bool // flag indicating whether the feed has been closed + // flag indicating whether client of upr feed will send ack to upr feed + // if flag is true, upr feed will use ack from client to determine whether/when to send ack to DCP + // if flag is false, upr feed will track how many bytes it has sent to client + // and use that to determine whether/when to send ack to DCP + ackByClient bool +} + +// Exported interface - to allow for mocking +type UprFeedIface interface { + Close() + Closed() bool + CloseStream(vbno, opaqueMSB uint16) error + GetError() error + GetUprStats() *UprStats + ClientAck(event *UprEvent) error + GetUprEventCh() <-chan *UprEvent + StartFeed() error + StartFeedWithConfig(datachan_len int) error + UprOpen(name string, sequence uint32, bufSize uint32) error + UprOpenWithXATTR(name string, sequence uint32, bufSize uint32) error + UprOpenWithFeatures(name string, sequence uint32, bufSize uint32, features UprFeatures) (error, UprFeatures) + UprRequestStream(vbno, opaqueMSB uint16, flags uint32, vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error +} + +type UprStats struct { + TotalBytes uint64 + TotalMutation uint64 + TotalBufferAckSent uint64 + TotalSnapShot uint64 +} + +// FailoverLog containing vvuid and sequnce number +type FailoverLog [][2]uint64 + +// error codes +var ErrorInvalidLog = errors.New("couchbase.errorInvalidLog") + +func (flogp *FailoverLog) Latest() (vbuuid, seqno uint64, err error) { + if flogp != nil { + flog := *flogp + latest := flog[len(flog)-1] + return latest[0], latest[1], nil + } + return vbuuid, seqno, ErrorInvalidLog +} + +func makeUprEvent(rq gomemcached.MCRequest, stream *UprStream, bytesReceivedFromDCP int) *UprEvent { + event := &UprEvent{ + Opcode: rq.Opcode, + VBucket: stream.Vbucket, + VBuuid: stream.Vbuuid, + Key: rq.Key, + Value: rq.Body, + Cas: rq.Cas, + ExtMeta: rq.ExtMeta, + DataType: rq.DataType, + } + + // set AckSize for events that need to be acked to DCP, + // i.e., events with CommandCodes that need to be buffered in DCP + if _, ok := gomemcached.BufferedCommandCodeMap[rq.Opcode]; ok { + event.AckSize = uint32(bytesReceivedFromDCP) + } + + // 16 LSBits are used by client library to encode vbucket number. + // 16 MSBits are left for application to multiplex on opaque value. + event.Opaque = appOpaque(rq.Opaque) + + if len(rq.Extras) >= uprMutationExtraLen && + event.Opcode == gomemcached.UPR_MUTATION { + + event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8]) + event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16]) + event.Flags = binary.BigEndian.Uint32(rq.Extras[16:20]) + event.Expiry = binary.BigEndian.Uint32(rq.Extras[20:24]) + event.LockTime = binary.BigEndian.Uint32(rq.Extras[24:28]) + event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[28:30]) + + } else if len(rq.Extras) >= uprDeletetionWithDeletionTimeExtraLen && + event.Opcode == gomemcached.UPR_DELETION { + + event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8]) + event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16]) + event.Expiry = binary.BigEndian.Uint32(rq.Extras[16:20]) + + } else if len(rq.Extras) >= uprDeletetionExtraLen && + event.Opcode == gomemcached.UPR_DELETION || + event.Opcode == gomemcached.UPR_EXPIRATION { + + event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8]) + event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16]) + event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[16:18]) + + } else if len(rq.Extras) >= uprSnapshotExtraLen && + event.Opcode == gomemcached.UPR_SNAPSHOT { + + event.SnapstartSeq = binary.BigEndian.Uint64(rq.Extras[:8]) + event.SnapendSeq = binary.BigEndian.Uint64(rq.Extras[8:16]) + event.SnapshotType = binary.BigEndian.Uint32(rq.Extras[16:20]) + } + + return event +} + +func (event *UprEvent) String() string { + name := gomemcached.CommandNames[event.Opcode] + if name == "" { + name = fmt.Sprintf("#%d", event.Opcode) + } + return name +} + +func (event *UprEvent) IsSnappyDataType() bool { + return event.Opcode == gomemcached.UPR_MUTATION && (event.DataType&SnappyDataType > 0) +} + +func (feed *UprFeed) sendCommands(mc *Client) { + transmitCh := feed.transmitCh + transmitCl := feed.transmitCl +loop: + for { + select { + case command := <-transmitCh: + if err := mc.Transmit(command); err != nil { + logging.Errorf("Failed to transmit command %s. Error %s", command.Opcode.String(), err.Error()) + // get feed to close and runFeed routine to exit + feed.Close() + break loop + } + + case <-transmitCl: + break loop + } + } + + // After sendCommands exits, write to transmitCh will block forever + // when we write to transmitCh, e.g., at CloseStream(), we need to check feed closure to have an exit route + + logging.Infof("sendCommands exiting") +} + +// Sets the specified stream as the connected stream for this vbno, and also cleans up negotiator +func (feed *UprFeed) activateStream(vbno, opaque uint16, stream *UprStream) error { + feed.muVbstreams.Lock() + defer feed.muVbstreams.Unlock() + + // Set this stream as the officially connected stream for this vb + stream.connected = true + feed.vbstreams[vbno] = stream + return nil +} + +func (feed *UprFeed) cleanUpVbStream(vbno uint16) { + feed.muVbstreams.Lock() + defer feed.muVbstreams.Unlock() + + delete(feed.vbstreams, vbno) +} + +// NewUprFeed creates a new UPR Feed. +// TODO: Describe side-effects on bucket instance and its connection pool. +func (mc *Client) NewUprFeed() (*UprFeed, error) { + return mc.NewUprFeedWithConfig(false /*ackByClient*/) +} + +func (mc *Client) NewUprFeedWithConfig(ackByClient bool) (*UprFeed, error) { + + feed := &UprFeed{ + conn: mc, + closer: make(chan bool, 1), + vbstreams: make(map[uint16]*UprStream), + transmitCh: make(chan *gomemcached.MCRequest), + transmitCl: make(chan bool), + ackByClient: ackByClient, + } + + feed.negotiator.initialize() + + go feed.sendCommands(mc) + return feed, nil +} + +func (mc *Client) NewUprFeedIface() (UprFeedIface, error) { + return mc.NewUprFeed() +} + +func (mc *Client) NewUprFeedWithConfigIface(ackByClient bool) (UprFeedIface, error) { + return mc.NewUprFeedWithConfig(ackByClient) +} + +func doUprOpen(mc *Client, name string, sequence uint32, features UprFeatures) error { + rq := &gomemcached.MCRequest{ + Opcode: gomemcached.UPR_OPEN, + Key: []byte(name), + Opaque: getUprOpenCtrlOpaque(), + } + + rq.Extras = make([]byte, 8) + binary.BigEndian.PutUint32(rq.Extras[:4], sequence) + + // opens a producer type connection + flags := gomemcached.DCP_PRODUCER + if features.Xattribute { + flags = flags | gomemcached.DCP_OPEN_INCLUDE_XATTRS + } + if features.IncludeDeletionTime { + flags = flags | gomemcached.DCP_OPEN_INCLUDE_DELETE_TIMES + } + binary.BigEndian.PutUint32(rq.Extras[4:], flags) + + return sendMcRequestSync(mc, rq) +} + +// Synchronously send a memcached request and wait for the response +func sendMcRequestSync(mc *Client, req *gomemcached.MCRequest) error { + if err := mc.Transmit(req); err != nil { + return err + } + + if res, err := mc.Receive(); err != nil { + return err + } else if req.Opcode != res.Opcode { + return fmt.Errorf("unexpected #opcode sent %v received %v", req.Opcode, res.Opaque) + } else if req.Opaque != res.Opaque { + return fmt.Errorf("opaque mismatch, sent %v received %v", req.Opaque, res.Opaque) + } else if res.Status != gomemcached.SUCCESS { + return fmt.Errorf("error %v", res.Status) + } + return nil +} + +// UprOpen to connect with a UPR producer. +// Name: name of te UPR connection +// sequence: sequence number for the connection +// bufsize: max size of the application +func (feed *UprFeed) UprOpen(name string, sequence uint32, bufSize uint32) error { + var allFeaturesDisabled UprFeatures + err, _ := feed.uprOpen(name, sequence, bufSize, allFeaturesDisabled) + return err +} + +// UprOpen with XATTR enabled. +func (feed *UprFeed) UprOpenWithXATTR(name string, sequence uint32, bufSize uint32) error { + var onlyXattrEnabled UprFeatures + onlyXattrEnabled.Xattribute = true + err, _ := feed.uprOpen(name, sequence, bufSize, onlyXattrEnabled) + return err +} + +func (feed *UprFeed) UprOpenWithFeatures(name string, sequence uint32, bufSize uint32, features UprFeatures) (error, UprFeatures) { + return feed.uprOpen(name, sequence, bufSize, features) +} + +func (feed *UprFeed) uprOpen(name string, sequence uint32, bufSize uint32, features UprFeatures) (err error, activatedFeatures UprFeatures) { + mc := feed.conn + + // First set this to an invalid value to state that the method hasn't gotten to executing this control yet + activatedFeatures.CompressionType = CompressionTypeEndMarker + + if err = doUprOpen(mc, name, sequence, features); err != nil { + return + } + + activatedFeatures.Xattribute = features.Xattribute + + // send a UPR control message to set the window size for the this connection + if bufSize > 0 { + rq := &gomemcached.MCRequest{ + Opcode: gomemcached.UPR_CONTROL, + Key: []byte("connection_buffer_size"), + Body: []byte(strconv.Itoa(int(bufSize))), + Opaque: getUprOpenCtrlOpaque(), + } + err = sendMcRequestSync(feed.conn, rq) + if err != nil { + return + } + feed.maxAckBytes = uint32(bufferAckThreshold * float32(bufSize)) + } + + // enable noop and set noop interval + rq := &gomemcached.MCRequest{ + Opcode: gomemcached.UPR_CONTROL, + Key: []byte("enable_noop"), + Body: []byte("true"), + Opaque: getUprOpenCtrlOpaque(), + } + err = sendMcRequestSync(feed.conn, rq) + if err != nil { + return + } + + rq = &gomemcached.MCRequest{ + Opcode: gomemcached.UPR_CONTROL, + Key: []byte("set_noop_interval"), + Body: []byte(strconv.Itoa(int(uprDefaultNoopInterval))), + Opaque: getUprOpenCtrlOpaque(), + } + err = sendMcRequestSync(feed.conn, rq) + if err != nil { + return + } + + if features.CompressionType == CompressionTypeSnappy { + activatedFeatures.CompressionType = CompressionTypeNone + rq = &gomemcached.MCRequest{ + Opcode: gomemcached.UPR_CONTROL, + Key: []byte("force_value_compression"), + Body: []byte("true"), + Opaque: getUprOpenCtrlOpaque(), + } + err = sendMcRequestSync(feed.conn, rq) + } else if features.CompressionType == CompressionTypeEndMarker { + err = fmt.Errorf("UPR_CONTROL Failed - Invalid CompressionType: %v", features.CompressionType) + } + if err != nil { + return + } + activatedFeatures.CompressionType = features.CompressionType + + return +} + +// UprGetFailoverLog for given list of vbuckets. +func (mc *Client) UprGetFailoverLog( + vb []uint16) (map[uint16]*FailoverLog, error) { + + rq := &gomemcached.MCRequest{ + Opcode: gomemcached.UPR_FAILOVERLOG, + Opaque: opaqueFailover, + } + + var allFeaturesDisabled UprFeatures + if err := doUprOpen(mc, "FailoverLog", 0, allFeaturesDisabled); err != nil { + return nil, fmt.Errorf("UPR_OPEN Failed %s", err.Error()) + } + + failoverLogs := make(map[uint16]*FailoverLog) + for _, vBucket := range vb { + rq.VBucket = vBucket + if err := mc.Transmit(rq); err != nil { + return nil, err + } + res, err := mc.Receive() + + if err != nil { + return nil, fmt.Errorf("failed to receive %s", err.Error()) + } else if res.Opcode != gomemcached.UPR_FAILOVERLOG || res.Status != gomemcached.SUCCESS { + return nil, fmt.Errorf("unexpected #opcode %v", res.Opcode) + } + + flog, err := parseFailoverLog(res.Body) + if err != nil { + return nil, fmt.Errorf("unable to parse failover logs for vb %d", vb) + } + failoverLogs[vBucket] = flog + } + + return failoverLogs, nil +} + +// UprRequestStream for a single vbucket. +func (feed *UprFeed) UprRequestStream(vbno, opaqueMSB uint16, flags uint32, + vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error { + + rq := &gomemcached.MCRequest{ + Opcode: gomemcached.UPR_STREAMREQ, + VBucket: vbno, + Opaque: composeOpaque(vbno, opaqueMSB), + } + + rq.Extras = make([]byte, 48) // #Extras + binary.BigEndian.PutUint32(rq.Extras[:4], flags) + binary.BigEndian.PutUint32(rq.Extras[4:8], uint32(0)) + binary.BigEndian.PutUint64(rq.Extras[8:16], startSequence) + binary.BigEndian.PutUint64(rq.Extras[16:24], endSequence) + binary.BigEndian.PutUint64(rq.Extras[24:32], vuuid) + binary.BigEndian.PutUint64(rq.Extras[32:40], snapStart) + binary.BigEndian.PutUint64(rq.Extras[40:48], snapEnd) + + feed.negotiator.registerRequest(vbno, opaqueMSB, vuuid, startSequence, endSequence) + // Any client that has ever called this method, regardless of return code, + // should expect a potential UPR_CLOSESTREAM message due to this new map entry prior to Transmit. + + if err := feed.conn.Transmit(rq); err != nil { + logging.Errorf("Error in StreamRequest %s", err.Error()) + // If an error occurs during transmit, then the UPRFeed will keep the stream + // in the vbstreams map. This is to prevent nil lookup from any previously + // sent stream requests. + return err + } + + return nil +} + +// CloseStream for specified vbucket. +func (feed *UprFeed) CloseStream(vbno, opaqueMSB uint16) error { + + err := feed.validateCloseStream(vbno) + if err != nil { + logging.Infof("CloseStream for %v has been skipped because of error %v", vbno, err) + return err + } + + closeStream := &gomemcached.MCRequest{ + Opcode: gomemcached.UPR_CLOSESTREAM, + VBucket: vbno, + Opaque: composeOpaque(vbno, opaqueMSB), + } + + feed.writeToTransmitCh(closeStream) + + return nil +} + +func (feed *UprFeed) GetUprEventCh() <-chan *UprEvent { + return feed.C +} + +func (feed *UprFeed) GetError() error { + return feed.Error +} + +func (feed *UprFeed) validateCloseStream(vbno uint16) error { + feed.muVbstreams.RLock() + nilVbStream := feed.vbstreams[vbno] == nil + feed.muVbstreams.RUnlock() + + if nilVbStream && (feed.negotiator.getStreamsCntFromMap(vbno) == 0) { + return fmt.Errorf("Stream for vb %d has not been requested", vbno) + } + + return nil +} + +func (feed *UprFeed) writeToTransmitCh(rq *gomemcached.MCRequest) error { + // write to transmitCh may block forever if sendCommands has exited + // check for feed closure to have an exit route in this case + select { + case <-feed.closer: + errMsg := fmt.Sprintf("Abort sending request to transmitCh because feed has been closed. request=%v", rq) + logging.Infof(errMsg) + return errors.New(errMsg) + case feed.transmitCh <- rq: + } + return nil +} + +// StartFeed to start the upper feed. +func (feed *UprFeed) StartFeed() error { + return feed.StartFeedWithConfig(10) +} + +func (feed *UprFeed) StartFeedWithConfig(datachan_len int) error { + ch := make(chan *UprEvent, datachan_len) + feed.C = ch + go feed.runFeed(ch) + return nil +} + +func parseFailoverLog(body []byte) (*FailoverLog, error) { + + if len(body)%16 != 0 { + err := fmt.Errorf("invalid body length %v, in failover-log", len(body)) + return nil, err + } + log := make(FailoverLog, len(body)/16) + for i, j := 0, 0; i < len(body); i += 16 { + vuuid := binary.BigEndian.Uint64(body[i : i+8]) + seqno := binary.BigEndian.Uint64(body[i+8 : i+16]) + log[j] = [2]uint64{vuuid, seqno} + j++ + } + return &log, nil +} + +func handleStreamRequest( + res *gomemcached.MCResponse, + headerBuf []byte, +) (gomemcached.Status, uint64, *FailoverLog, error) { + + var rollback uint64 + var err error + + switch { + case res.Status == gomemcached.ROLLBACK: + logging.Infof("Rollback response. body=%v, headerBuf=%v\n", res.Body, headerBuf) + rollback = binary.BigEndian.Uint64(res.Body) + logging.Infof("Rollback seqno is %v for response with opaque %v\n", rollback, res.Opaque) + return res.Status, rollback, nil, nil + + case res.Status != gomemcached.SUCCESS: + err = fmt.Errorf("unexpected status %v for response with opaque %v", res.Status, res.Opaque) + return res.Status, 0, nil, err + } + + flog, err := parseFailoverLog(res.Body[:]) + return res.Status, rollback, flog, err +} + +// generate stream end responses for all active vb streams +func (feed *UprFeed) doStreamClose(ch chan *UprEvent) { + feed.muVbstreams.RLock() + + uprEvents := make([]*UprEvent, len(feed.vbstreams)) + index := 0 + for vbno, stream := range feed.vbstreams { + uprEvent := &UprEvent{ + VBucket: vbno, + VBuuid: stream.Vbuuid, + Opcode: gomemcached.UPR_STREAMEND, + } + uprEvents[index] = uprEvent + index++ + } + + // release the lock before sending uprEvents to ch, which may block + feed.muVbstreams.RUnlock() + +loop: + for _, uprEvent := range uprEvents { + select { + case ch <- uprEvent: + case <-feed.closer: + logging.Infof("Feed has been closed. Aborting doStreamClose.") + break loop + } + } +} + +func (feed *UprFeed) runFeed(ch chan *UprEvent) { + defer close(ch) + var headerBuf [gomemcached.HDR_LEN]byte + var pkt gomemcached.MCRequest + var event *UprEvent + + mc := feed.conn.Hijack() + uprStats := &feed.stats + +loop: + for { + select { + case <-feed.closer: + logging.Infof("Feed has been closed. Exiting.") + break loop + default: + bytes, err := pkt.Receive(mc, headerBuf[:]) + if err != nil { + logging.Errorf("Error in receive %s", err.Error()) + feed.Error = err + // send all the stream close messages to the client + feed.doStreamClose(ch) + break loop + } else { + event = nil + res := &gomemcached.MCResponse{ + Opcode: pkt.Opcode, + Cas: pkt.Cas, + Opaque: pkt.Opaque, + Status: gomemcached.Status(pkt.VBucket), + Extras: pkt.Extras, + Key: pkt.Key, + Body: pkt.Body, + } + + vb := vbOpaque(pkt.Opaque) + appOpaque := appOpaque(pkt.Opaque) + uprStats.TotalBytes = uint64(bytes) + + feed.muVbstreams.RLock() + stream := feed.vbstreams[vb] + feed.muVbstreams.RUnlock() + + switch pkt.Opcode { + case gomemcached.UPR_STREAMREQ: + event, err = feed.negotiator.handleStreamRequest(feed, headerBuf, &pkt, bytes, res) + if err != nil { + logging.Infof(err.Error()) + break loop + } + case gomemcached.UPR_MUTATION, + gomemcached.UPR_DELETION, + gomemcached.UPR_EXPIRATION: + if stream == nil { + logging.Infof("Stream not found for vb %d: %#v", vb, pkt) + break loop + } + event = makeUprEvent(pkt, stream, bytes) + uprStats.TotalMutation++ + + case gomemcached.UPR_STREAMEND: + if stream == nil { + logging.Infof("Stream not found for vb %d: %#v", vb, pkt) + break loop + } + //stream has ended + event = makeUprEvent(pkt, stream, bytes) + logging.Infof("Stream Ended for vb %d", vb) + + feed.negotiator.deleteStreamFromMap(vb, appOpaque) + feed.cleanUpVbStream(vb) + + case gomemcached.UPR_SNAPSHOT: + if stream == nil { + logging.Infof("Stream not found for vb %d: %#v", vb, pkt) + break loop + } + // snapshot marker + event = makeUprEvent(pkt, stream, bytes) + uprStats.TotalSnapShot++ + + case gomemcached.UPR_FLUSH: + if stream == nil { + logging.Infof("Stream not found for vb %d: %#v", vb, pkt) + break loop + } + // special processing for flush ? + event = makeUprEvent(pkt, stream, bytes) + + case gomemcached.UPR_CLOSESTREAM: + if stream == nil { + logging.Infof("Stream not found for vb %d: %#v", vb, pkt) + break loop + } + event = makeUprEvent(pkt, stream, bytes) + event.Opcode = gomemcached.UPR_STREAMEND // opcode re-write !! + logging.Infof("Stream Closed for vb %d StreamEnd simulated", vb) + + feed.negotiator.deleteStreamFromMap(vb, appOpaque) + feed.cleanUpVbStream(vb) + + case gomemcached.UPR_ADDSTREAM: + logging.Infof("Opcode %v not implemented", pkt.Opcode) + + case gomemcached.UPR_CONTROL, gomemcached.UPR_BUFFERACK: + if res.Status != gomemcached.SUCCESS { + logging.Infof("Opcode %v received status %d", pkt.Opcode.String(), res.Status) + } + + case gomemcached.UPR_NOOP: + // send a NOOP back + noop := &gomemcached.MCResponse{ + Opcode: gomemcached.UPR_NOOP, + Opaque: pkt.Opaque, + } + + if err := feed.conn.TransmitResponse(noop); err != nil { + logging.Warnf("failed to transmit command %s. Error %s", noop.Opcode.String(), err.Error()) + } + default: + logging.Infof("Recived an unknown response for vbucket %d", vb) + } + } + + if event != nil { + select { + case ch <- event: + case <-feed.closer: + logging.Infof("Feed has been closed. Skip sending events. Exiting.") + break loop + } + + feed.muVbstreams.RLock() + l := len(feed.vbstreams) + feed.muVbstreams.RUnlock() + + if event.Opcode == gomemcached.UPR_CLOSESTREAM && l == 0 { + logging.Infof("No more streams") + } + } + + if !feed.ackByClient { + // if client does not ack, do the ack check now + feed.sendBufferAckIfNeeded(event) + } + } + } + + // make sure that feed is closed before we signal transmitCl and exit runFeed + feed.Close() + + close(feed.transmitCl) + logging.Infof("runFeed exiting") +} + +// Client, after completing processing of an UprEvent, need to call this API to notify UprFeed, +// so that UprFeed can update its ack bytes stats and send ack to DCP if needed +// Client needs to set ackByClient flag to true in NewUprFeedWithConfig() call as a prerequisite for this call to work +// This API is not thread safe. Caller should NOT have more than one go rountine calling this API +func (feed *UprFeed) ClientAck(event *UprEvent) error { + if !feed.ackByClient { + return errors.New("Upr feed does not have ackByclient flag set") + } + feed.sendBufferAckIfNeeded(event) + return nil +} + +// increment ack bytes if the event needs to be acked to DCP +// send buffer ack if enough ack bytes have been accumulated +func (feed *UprFeed) sendBufferAckIfNeeded(event *UprEvent) { + if event == nil || event.AckSize == 0 { + // this indicates that there is no need to ack to DCP + return + } + + totalBytes := feed.toAckBytes + event.AckSize + if totalBytes > feed.maxAckBytes { + feed.toAckBytes = 0 + feed.sendBufferAck(totalBytes) + } else { + feed.toAckBytes = totalBytes + } +} + +// send buffer ack to dcp +func (feed *UprFeed) sendBufferAck(sendSize uint32) { + bufferAck := &gomemcached.MCRequest{ + Opcode: gomemcached.UPR_BUFFERACK, + } + bufferAck.Extras = make([]byte, 4) + binary.BigEndian.PutUint32(bufferAck.Extras[:4], uint32(sendSize)) + feed.writeToTransmitCh(bufferAck) + feed.stats.TotalBufferAckSent++ +} + +func (feed *UprFeed) GetUprStats() *UprStats { + return &feed.stats +} + +func composeOpaque(vbno, opaqueMSB uint16) uint32 { + return (uint32(opaqueMSB) << 16) | uint32(vbno) +} + +func getUprOpenCtrlOpaque() uint32 { + return atomic.AddUint32(&opaqueOpenCtrlWell, 1) +} + +func appOpaque(opq32 uint32) uint16 { + return uint16((opq32 & 0xFFFF0000) >> 16) +} + +func vbOpaque(opq32 uint32) uint16 { + return uint16(opq32 & 0xFFFF) +} + +// Close this UprFeed. +func (feed *UprFeed) Close() { + feed.muClosed.Lock() + defer feed.muClosed.Unlock() + if !feed.closed { + close(feed.closer) + feed.closed = true + feed.negotiator.initialize() + } +} + +// check if the UprFeed has been closed +func (feed *UprFeed) Closed() bool { + feed.muClosed.RLock() + defer feed.muClosed.RUnlock() + return feed.closed +} diff --git a/vendor/github.com/couchbase/gomemcached/mc_constants.go b/vendor/github.com/couchbase/gomemcached/mc_constants.go new file mode 100644 index 0000000000..1d5027d16c --- /dev/null +++ b/vendor/github.com/couchbase/gomemcached/mc_constants.go @@ -0,0 +1,335 @@ +// Package gomemcached is binary protocol packet formats and constants. +package gomemcached + +import ( + "fmt" +) + +const ( + REQ_MAGIC = 0x80 + RES_MAGIC = 0x81 +) + +// CommandCode for memcached packets. +type CommandCode uint8 + +const ( + GET = CommandCode(0x00) + SET = CommandCode(0x01) + ADD = CommandCode(0x02) + REPLACE = CommandCode(0x03) + DELETE = CommandCode(0x04) + INCREMENT = CommandCode(0x05) + DECREMENT = CommandCode(0x06) + QUIT = CommandCode(0x07) + FLUSH = CommandCode(0x08) + GETQ = CommandCode(0x09) + NOOP = CommandCode(0x0a) + VERSION = CommandCode(0x0b) + GETK = CommandCode(0x0c) + GETKQ = CommandCode(0x0d) + APPEND = CommandCode(0x0e) + PREPEND = CommandCode(0x0f) + STAT = CommandCode(0x10) + SETQ = CommandCode(0x11) + ADDQ = CommandCode(0x12) + REPLACEQ = CommandCode(0x13) + DELETEQ = CommandCode(0x14) + INCREMENTQ = CommandCode(0x15) + DECREMENTQ = CommandCode(0x16) + QUITQ = CommandCode(0x17) + FLUSHQ = CommandCode(0x18) + APPENDQ = CommandCode(0x19) + AUDIT = CommandCode(0x27) + PREPENDQ = CommandCode(0x1a) + GAT = CommandCode(0x1d) + HELLO = CommandCode(0x1f) + RGET = CommandCode(0x30) + RSET = CommandCode(0x31) + RSETQ = CommandCode(0x32) + RAPPEND = CommandCode(0x33) + RAPPENDQ = CommandCode(0x34) + RPREPEND = CommandCode(0x35) + RPREPENDQ = CommandCode(0x36) + RDELETE = CommandCode(0x37) + RDELETEQ = CommandCode(0x38) + RINCR = CommandCode(0x39) + RINCRQ = CommandCode(0x3a) + RDECR = CommandCode(0x3b) + RDECRQ = CommandCode(0x3c) + + SASL_LIST_MECHS = CommandCode(0x20) + SASL_AUTH = CommandCode(0x21) + SASL_STEP = CommandCode(0x22) + + SET_VBUCKET = CommandCode(0x3d) + + TAP_CONNECT = CommandCode(0x40) // Client-sent request to initiate Tap feed + TAP_MUTATION = CommandCode(0x41) // Notification of a SET/ADD/REPLACE/etc. on the server + TAP_DELETE = CommandCode(0x42) // Notification of a DELETE on the server + TAP_FLUSH = CommandCode(0x43) // Replicates a flush_all command + TAP_OPAQUE = CommandCode(0x44) // Opaque control data from the engine + TAP_VBUCKET_SET = CommandCode(0x45) // Sets state of vbucket in receiver (used in takeover) + TAP_CHECKPOINT_START = CommandCode(0x46) // Notifies start of new checkpoint + TAP_CHECKPOINT_END = CommandCode(0x47) // Notifies end of checkpoint + + UPR_OPEN = CommandCode(0x50) // Open a UPR connection with a name + UPR_ADDSTREAM = CommandCode(0x51) // Sent by ebucketMigrator to UPR Consumer + UPR_CLOSESTREAM = CommandCode(0x52) // Sent by eBucketMigrator to UPR Consumer + UPR_FAILOVERLOG = CommandCode(0x54) // Request failover logs + UPR_STREAMREQ = CommandCode(0x53) // Stream request from consumer to producer + UPR_STREAMEND = CommandCode(0x55) // Sent by producer when it has no more messages to stream + UPR_SNAPSHOT = CommandCode(0x56) // Start of a new snapshot + UPR_MUTATION = CommandCode(0x57) // Key mutation + UPR_DELETION = CommandCode(0x58) // Key deletion + UPR_EXPIRATION = CommandCode(0x59) // Key expiration + UPR_FLUSH = CommandCode(0x5a) // Delete all the data for a vbucket + UPR_NOOP = CommandCode(0x5c) // UPR NOOP + UPR_BUFFERACK = CommandCode(0x5d) // UPR Buffer Acknowledgement + UPR_CONTROL = CommandCode(0x5e) // Set flow control params + + SELECT_BUCKET = CommandCode(0x89) // Select bucket + + OBSERVE_SEQNO = CommandCode(0x91) // Sequence Number based Observe + OBSERVE = CommandCode(0x92) + + GET_META = CommandCode(0xA0) // Get meta. returns with expiry, flags, cas etc + SUBDOC_GET = CommandCode(0xc5) // Get subdoc. Returns with xattrs + SUBDOC_MULTI_LOOKUP = CommandCode(0xd0) // Multi lookup. Doc xattrs and meta. +) + +// command codes that are counted toward DCP control buffer +// when DCP clients receive DCP messages with these command codes, they need to provide acknowledgement +var BufferedCommandCodeMap = map[CommandCode]bool{ + SET_VBUCKET: true, + UPR_STREAMEND: true, + UPR_SNAPSHOT: true, + UPR_MUTATION: true, + UPR_DELETION: true, + UPR_EXPIRATION: true} + +// Status field for memcached response. +type Status uint16 + +// Matches with protocol_binary.h as source of truth +const ( + SUCCESS = Status(0x00) + KEY_ENOENT = Status(0x01) + KEY_EEXISTS = Status(0x02) + E2BIG = Status(0x03) + EINVAL = Status(0x04) + NOT_STORED = Status(0x05) + DELTA_BADVAL = Status(0x06) + NOT_MY_VBUCKET = Status(0x07) + NO_BUCKET = Status(0x08) + LOCKED = Status(0x09) + AUTH_STALE = Status(0x1f) + AUTH_ERROR = Status(0x20) + AUTH_CONTINUE = Status(0x21) + ERANGE = Status(0x22) + ROLLBACK = Status(0x23) + EACCESS = Status(0x24) + NOT_INITIALIZED = Status(0x25) + UNKNOWN_COMMAND = Status(0x81) + ENOMEM = Status(0x82) + NOT_SUPPORTED = Status(0x83) + EINTERNAL = Status(0x84) + EBUSY = Status(0x85) + TMPFAIL = Status(0x86) + + // SUBDOC + SUBDOC_PATH_NOT_FOUND = Status(0xc0) + SUBDOC_BAD_MULTI = Status(0xcc) + SUBDOC_MULTI_PATH_FAILURE_DELETED = Status(0xd3) +) + +// for log redaction +const ( + UdTagBegin = "<ud>" + UdTagEnd = "</ud>" +) + +var isFatal = map[Status]bool{ + DELTA_BADVAL: true, + NO_BUCKET: true, + AUTH_STALE: true, + AUTH_ERROR: true, + ERANGE: true, + ROLLBACK: true, + EACCESS: true, + ENOMEM: true, + NOT_SUPPORTED: true, +} + +// the producer/consumer bit in dcp flags +var DCP_PRODUCER uint32 = 0x01 + +// the include XATTRS bit in dcp flags +var DCP_OPEN_INCLUDE_XATTRS uint32 = 0x04 + +// the include deletion time bit in dcp flags +var DCP_OPEN_INCLUDE_DELETE_TIMES uint32 = 0x20 + +// Datatype to Include XATTRS in SUBDOC GET +var SUBDOC_FLAG_XATTR uint8 = 0x04 + +// MCItem is an internal representation of an item. +type MCItem struct { + Cas uint64 + Flags, Expiration uint32 + Data []byte +} + +// Number of bytes in a binary protocol header. +const HDR_LEN = 24 + +// Mapping of CommandCode -> name of command (not exhaustive) +var CommandNames map[CommandCode]string + +// StatusNames human readable names for memcached response. +var StatusNames map[Status]string + +func init() { + CommandNames = make(map[CommandCode]string) + CommandNames[GET] = "GET" + CommandNames[SET] = "SET" + CommandNames[ADD] = "ADD" + CommandNames[REPLACE] = "REPLACE" + CommandNames[DELETE] = "DELETE" + CommandNames[INCREMENT] = "INCREMENT" + CommandNames[DECREMENT] = "DECREMENT" + CommandNames[QUIT] = "QUIT" + CommandNames[FLUSH] = "FLUSH" + CommandNames[GETQ] = "GETQ" + CommandNames[NOOP] = "NOOP" + CommandNames[VERSION] = "VERSION" + CommandNames[GETK] = "GETK" + CommandNames[GETKQ] = "GETKQ" + CommandNames[APPEND] = "APPEND" + CommandNames[PREPEND] = "PREPEND" + CommandNames[STAT] = "STAT" + CommandNames[SETQ] = "SETQ" + CommandNames[ADDQ] = "ADDQ" + CommandNames[REPLACEQ] = "REPLACEQ" + CommandNames[DELETEQ] = "DELETEQ" + CommandNames[INCREMENTQ] = "INCREMENTQ" + CommandNames[DECREMENTQ] = "DECREMENTQ" + CommandNames[QUITQ] = "QUITQ" + CommandNames[FLUSHQ] = "FLUSHQ" + CommandNames[APPENDQ] = "APPENDQ" + CommandNames[PREPENDQ] = "PREPENDQ" + CommandNames[RGET] = "RGET" + CommandNames[RSET] = "RSET" + CommandNames[RSETQ] = "RSETQ" + CommandNames[RAPPEND] = "RAPPEND" + CommandNames[RAPPENDQ] = "RAPPENDQ" + CommandNames[RPREPEND] = "RPREPEND" + CommandNames[RPREPENDQ] = "RPREPENDQ" + CommandNames[RDELETE] = "RDELETE" + CommandNames[RDELETEQ] = "RDELETEQ" + CommandNames[RINCR] = "RINCR" + CommandNames[RINCRQ] = "RINCRQ" + CommandNames[RDECR] = "RDECR" + CommandNames[RDECRQ] = "RDECRQ" + + CommandNames[SASL_LIST_MECHS] = "SASL_LIST_MECHS" + CommandNames[SASL_AUTH] = "SASL_AUTH" + CommandNames[SASL_STEP] = "SASL_STEP" + + CommandNames[TAP_CONNECT] = "TAP_CONNECT" + CommandNames[TAP_MUTATION] = "TAP_MUTATION" + CommandNames[TAP_DELETE] = "TAP_DELETE" + CommandNames[TAP_FLUSH] = "TAP_FLUSH" + CommandNames[TAP_OPAQUE] = "TAP_OPAQUE" + CommandNames[TAP_VBUCKET_SET] = "TAP_VBUCKET_SET" + CommandNames[TAP_CHECKPOINT_START] = "TAP_CHECKPOINT_START" + CommandNames[TAP_CHECKPOINT_END] = "TAP_CHECKPOINT_END" + + CommandNames[UPR_OPEN] = "UPR_OPEN" + CommandNames[UPR_ADDSTREAM] = "UPR_ADDSTREAM" + CommandNames[UPR_CLOSESTREAM] = "UPR_CLOSESTREAM" + CommandNames[UPR_FAILOVERLOG] = "UPR_FAILOVERLOG" + CommandNames[UPR_STREAMREQ] = "UPR_STREAMREQ" + CommandNames[UPR_STREAMEND] = "UPR_STREAMEND" + CommandNames[UPR_SNAPSHOT] = "UPR_SNAPSHOT" + CommandNames[UPR_MUTATION] = "UPR_MUTATION" + CommandNames[UPR_DELETION] = "UPR_DELETION" + CommandNames[UPR_EXPIRATION] = "UPR_EXPIRATION" + CommandNames[UPR_FLUSH] = "UPR_FLUSH" + CommandNames[UPR_NOOP] = "UPR_NOOP" + CommandNames[UPR_BUFFERACK] = "UPR_BUFFERACK" + CommandNames[UPR_CONTROL] = "UPR_CONTROL" + CommandNames[SUBDOC_GET] = "SUBDOC_GET" + CommandNames[SUBDOC_MULTI_LOOKUP] = "SUBDOC_MULTI_LOOKUP" + + StatusNames = make(map[Status]string) + StatusNames[SUCCESS] = "SUCCESS" + StatusNames[KEY_ENOENT] = "KEY_ENOENT" + StatusNames[KEY_EEXISTS] = "KEY_EEXISTS" + StatusNames[E2BIG] = "E2BIG" + StatusNames[EINVAL] = "EINVAL" + StatusNames[NOT_STORED] = "NOT_STORED" + StatusNames[DELTA_BADVAL] = "DELTA_BADVAL" + StatusNames[NOT_MY_VBUCKET] = "NOT_MY_VBUCKET" + StatusNames[NO_BUCKET] = "NO_BUCKET" + StatusNames[AUTH_STALE] = "AUTH_STALE" + StatusNames[AUTH_ERROR] = "AUTH_ERROR" + StatusNames[AUTH_CONTINUE] = "AUTH_CONTINUE" + StatusNames[ERANGE] = "ERANGE" + StatusNames[ROLLBACK] = "ROLLBACK" + StatusNames[EACCESS] = "EACCESS" + StatusNames[NOT_INITIALIZED] = "NOT_INITIALIZED" + StatusNames[UNKNOWN_COMMAND] = "UNKNOWN_COMMAND" + StatusNames[ENOMEM] = "ENOMEM" + StatusNames[NOT_SUPPORTED] = "NOT_SUPPORTED" + StatusNames[EINTERNAL] = "EINTERNAL" + StatusNames[EBUSY] = "EBUSY" + StatusNames[TMPFAIL] = "TMPFAIL" + StatusNames[SUBDOC_PATH_NOT_FOUND] = "SUBDOC_PATH_NOT_FOUND" + StatusNames[SUBDOC_BAD_MULTI] = "SUBDOC_BAD_MULTI" + +} + +// String an op code. +func (o CommandCode) String() (rv string) { + rv = CommandNames[o] + if rv == "" { + rv = fmt.Sprintf("0x%02x", int(o)) + } + return rv +} + +// String an op code. +func (s Status) String() (rv string) { + rv = StatusNames[s] + if rv == "" { + rv = fmt.Sprintf("0x%02x", int(s)) + } + return rv +} + +// IsQuiet will return true if a command is a "quiet" command. +func (o CommandCode) IsQuiet() bool { + switch o { + case GETQ, + GETKQ, + SETQ, + ADDQ, + REPLACEQ, + DELETEQ, + INCREMENTQ, + DECREMENTQ, + QUITQ, + FLUSHQ, + APPENDQ, + PREPENDQ, + RSETQ, + RAPPENDQ, + RPREPENDQ, + RDELETEQ, + RINCRQ, + RDECRQ: + return true + } + return false +} diff --git a/vendor/github.com/couchbase/gomemcached/mc_req.go b/vendor/github.com/couchbase/gomemcached/mc_req.go new file mode 100644 index 0000000000..3ff67ab9a7 --- /dev/null +++ b/vendor/github.com/couchbase/gomemcached/mc_req.go @@ -0,0 +1,197 @@ +package gomemcached + +import ( + "encoding/binary" + "fmt" + "io" +) + +// The maximum reasonable body length to expect. +// Anything larger than this will result in an error. +// The current limit, 20MB, is the size limit supported by ep-engine. +var MaxBodyLen = int(20 * 1024 * 1024) + +// MCRequest is memcached Request +type MCRequest struct { + // The command being issued + Opcode CommandCode + // The CAS (if applicable, or 0) + Cas uint64 + // An opaque value to be returned with this request + Opaque uint32 + // The vbucket to which this command belongs + VBucket uint16 + // Command extras, key, and body + Extras, Key, Body, ExtMeta []byte + // Datatype identifier + DataType uint8 +} + +// Size gives the number of bytes this request requires. +func (req *MCRequest) Size() int { + return HDR_LEN + len(req.Extras) + len(req.Key) + len(req.Body) + len(req.ExtMeta) +} + +// A debugging string representation of this request +func (req MCRequest) String() string { + return fmt.Sprintf("{MCRequest opcode=%s, bodylen=%d, key='%s'}", + req.Opcode, len(req.Body), req.Key) +} + +func (req *MCRequest) fillHeaderBytes(data []byte) int { + + pos := 0 + data[pos] = REQ_MAGIC + pos++ + data[pos] = byte(req.Opcode) + pos++ + binary.BigEndian.PutUint16(data[pos:pos+2], + uint16(len(req.Key))) + pos += 2 + + // 4 + data[pos] = byte(len(req.Extras)) + pos++ + // Data type + if req.DataType != 0 { + data[pos] = byte(req.DataType) + } + pos++ + binary.BigEndian.PutUint16(data[pos:pos+2], req.VBucket) + pos += 2 + + // 8 + binary.BigEndian.PutUint32(data[pos:pos+4], + uint32(len(req.Body)+len(req.Key)+len(req.Extras)+len(req.ExtMeta))) + pos += 4 + + // 12 + binary.BigEndian.PutUint32(data[pos:pos+4], req.Opaque) + pos += 4 + + // 16 + if req.Cas != 0 { + binary.BigEndian.PutUint64(data[pos:pos+8], req.Cas) + } + pos += 8 + + if len(req.Extras) > 0 { + copy(data[pos:pos+len(req.Extras)], req.Extras) + pos += len(req.Extras) + } + + if len(req.Key) > 0 { + copy(data[pos:pos+len(req.Key)], req.Key) + pos += len(req.Key) + } + + return pos +} + +// HeaderBytes will return the wire representation of the request header +// (with the extras and key). +func (req *MCRequest) HeaderBytes() []byte { + data := make([]byte, HDR_LEN+len(req.Extras)+len(req.Key)) + + req.fillHeaderBytes(data) + + return data +} + +// Bytes will return the wire representation of this request. +func (req *MCRequest) Bytes() []byte { + data := make([]byte, req.Size()) + + pos := req.fillHeaderBytes(data) + + if len(req.Body) > 0 { + copy(data[pos:pos+len(req.Body)], req.Body) + } + + if len(req.ExtMeta) > 0 { + copy(data[pos+len(req.Body):pos+len(req.Body)+len(req.ExtMeta)], req.ExtMeta) + } + + return data +} + +// Transmit will send this request message across a writer. +func (req *MCRequest) Transmit(w io.Writer) (n int, err error) { + if len(req.Body) < 128 { + n, err = w.Write(req.Bytes()) + } else { + n, err = w.Write(req.HeaderBytes()) + if err == nil { + m := 0 + m, err = w.Write(req.Body) + n += m + } + } + return +} + +// Receive will fill this MCRequest with the data from a reader. +func (req *MCRequest) Receive(r io.Reader, hdrBytes []byte) (int, error) { + if len(hdrBytes) < HDR_LEN { + hdrBytes = []byte{ + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0} + } + n, err := io.ReadFull(r, hdrBytes) + if err != nil { + return n, err + } + + if hdrBytes[0] != RES_MAGIC && hdrBytes[0] != REQ_MAGIC { + return n, fmt.Errorf("bad magic: 0x%02x", hdrBytes[0]) + } + + klen := int(binary.BigEndian.Uint16(hdrBytes[2:])) + elen := int(hdrBytes[4]) + // Data type at 5 + req.DataType = uint8(hdrBytes[5]) + + req.Opcode = CommandCode(hdrBytes[1]) + // Vbucket at 6:7 + req.VBucket = binary.BigEndian.Uint16(hdrBytes[6:]) + totalBodyLen := int(binary.BigEndian.Uint32(hdrBytes[8:])) + + req.Opaque = binary.BigEndian.Uint32(hdrBytes[12:]) + req.Cas = binary.BigEndian.Uint64(hdrBytes[16:]) + + if totalBodyLen > 0 { + buf := make([]byte, totalBodyLen) + m, err := io.ReadFull(r, buf) + n += m + if err == nil { + if req.Opcode >= TAP_MUTATION && + req.Opcode <= TAP_CHECKPOINT_END && + len(buf) > 1 { + // In these commands there is "engine private" + // data at the end of the extras. The first 2 + // bytes of extra data give its length. + elen += int(binary.BigEndian.Uint16(buf)) + } + + req.Extras = buf[0:elen] + req.Key = buf[elen : klen+elen] + + // get the length of extended metadata + extMetaLen := 0 + if elen > 29 { + extMetaLen = int(binary.BigEndian.Uint16(req.Extras[28:30])) + } + + bodyLen := totalBodyLen - klen - elen - extMetaLen + if bodyLen > MaxBodyLen { + return n, fmt.Errorf("%d is too big (max %d)", + bodyLen, MaxBodyLen) + } + + req.Body = buf[klen+elen : klen+elen+bodyLen] + req.ExtMeta = buf[klen+elen+bodyLen:] + } + } + return n, err +} diff --git a/vendor/github.com/couchbase/gomemcached/mc_res.go b/vendor/github.com/couchbase/gomemcached/mc_res.go new file mode 100644 index 0000000000..2b4cfe1349 --- /dev/null +++ b/vendor/github.com/couchbase/gomemcached/mc_res.go @@ -0,0 +1,267 @@ +package gomemcached + +import ( + "encoding/binary" + "fmt" + "io" + "sync" +) + +// MCResponse is memcached response +type MCResponse struct { + // The command opcode of the command that sent the request + Opcode CommandCode + // The status of the response + Status Status + // The opaque sent in the request + Opaque uint32 + // The CAS identifier (if applicable) + Cas uint64 + // Extras, key, and body for this response + Extras, Key, Body []byte + // If true, this represents a fatal condition and we should hang up + Fatal bool + // Datatype identifier + DataType uint8 +} + +// A debugging string representation of this response +func (res MCResponse) String() string { + return fmt.Sprintf("{MCResponse status=%v keylen=%d, extralen=%d, bodylen=%d}", + res.Status, len(res.Key), len(res.Extras), len(res.Body)) +} + +// Response as an error. +func (res *MCResponse) Error() string { + return fmt.Sprintf("MCResponse status=%v, opcode=%v, opaque=%v, msg: %s", + res.Status, res.Opcode, res.Opaque, string(res.Body)) +} + +func errStatus(e error) Status { + status := Status(0xffff) + if res, ok := e.(*MCResponse); ok { + status = res.Status + } + return status +} + +// IsNotFound is true if this error represents a "not found" response. +func IsNotFound(e error) bool { + return errStatus(e) == KEY_ENOENT +} + +// IsFatal is false if this error isn't believed to be fatal to a connection. +func IsFatal(e error) bool { + if e == nil { + return false + } + _, ok := isFatal[errStatus(e)] + if ok { + return true + } + return false +} + +// Size is number of bytes this response consumes on the wire. +func (res *MCResponse) Size() int { + return HDR_LEN + len(res.Extras) + len(res.Key) + len(res.Body) +} + +func (res *MCResponse) fillHeaderBytes(data []byte) int { + pos := 0 + data[pos] = RES_MAGIC + pos++ + data[pos] = byte(res.Opcode) + pos++ + binary.BigEndian.PutUint16(data[pos:pos+2], + uint16(len(res.Key))) + pos += 2 + + // 4 + data[pos] = byte(len(res.Extras)) + pos++ + // Data type + if res.DataType != 0 { + data[pos] = byte(res.DataType) + } else { + data[pos] = 0 + } + pos++ + binary.BigEndian.PutUint16(data[pos:pos+2], uint16(res.Status)) + pos += 2 + + // 8 + binary.BigEndian.PutUint32(data[pos:pos+4], + uint32(len(res.Body)+len(res.Key)+len(res.Extras))) + pos += 4 + + // 12 + binary.BigEndian.PutUint32(data[pos:pos+4], res.Opaque) + pos += 4 + + // 16 + binary.BigEndian.PutUint64(data[pos:pos+8], res.Cas) + pos += 8 + + if len(res.Extras) > 0 { + copy(data[pos:pos+len(res.Extras)], res.Extras) + pos += len(res.Extras) + } + + if len(res.Key) > 0 { + copy(data[pos:pos+len(res.Key)], res.Key) + pos += len(res.Key) + } + + return pos +} + +// HeaderBytes will get just the header bytes for this response. +func (res *MCResponse) HeaderBytes() []byte { + data := make([]byte, HDR_LEN+len(res.Extras)+len(res.Key)) + + res.fillHeaderBytes(data) + + return data +} + +// Bytes will return the actual bytes transmitted for this response. +func (res *MCResponse) Bytes() []byte { + data := make([]byte, res.Size()) + + pos := res.fillHeaderBytes(data) + + copy(data[pos:pos+len(res.Body)], res.Body) + + return data +} + +// Transmit will send this response message across a writer. +func (res *MCResponse) Transmit(w io.Writer) (n int, err error) { + if len(res.Body) < 128 { + n, err = w.Write(res.Bytes()) + } else { + n, err = w.Write(res.HeaderBytes()) + if err == nil { + m := 0 + m, err = w.Write(res.Body) + m += n + } + } + return +} + +// Receive will fill this MCResponse with the data from this reader. +func (res *MCResponse) Receive(r io.Reader, hdrBytes []byte) (n int, err error) { + if len(hdrBytes) < HDR_LEN { + hdrBytes = []byte{ + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0} + } + n, err = io.ReadFull(r, hdrBytes) + if err != nil { + return n, err + } + + if hdrBytes[0] != RES_MAGIC && hdrBytes[0] != REQ_MAGIC { + return n, fmt.Errorf("bad magic: 0x%02x", hdrBytes[0]) + } + + klen := int(binary.BigEndian.Uint16(hdrBytes[2:4])) + elen := int(hdrBytes[4]) + + res.Opcode = CommandCode(hdrBytes[1]) + res.DataType = uint8(hdrBytes[5]) + res.Status = Status(binary.BigEndian.Uint16(hdrBytes[6:8])) + res.Opaque = binary.BigEndian.Uint32(hdrBytes[12:16]) + res.Cas = binary.BigEndian.Uint64(hdrBytes[16:24]) + + bodyLen := int(binary.BigEndian.Uint32(hdrBytes[8:12])) - (klen + elen) + + //defer function to debug the panic seen with MB-15557 + defer func() { + if e := recover(); e != nil { + err = fmt.Errorf(`Panic in Receive. Response %v \n + key len %v extra len %v bodylen %v`, res, klen, elen, bodyLen) + } + }() + + buf := make([]byte, klen+elen+bodyLen) + m, err := io.ReadFull(r, buf) + if err == nil { + res.Extras = buf[0:elen] + res.Key = buf[elen : klen+elen] + res.Body = buf[klen+elen:] + } + + return n + m, err +} + +type MCResponsePool struct { + pool *sync.Pool +} + +func NewMCResponsePool() *MCResponsePool { + rv := &MCResponsePool{ + pool: &sync.Pool{ + New: func() interface{} { + return &MCResponse{} + }, + }, + } + + return rv +} + +func (this *MCResponsePool) Get() *MCResponse { + return this.pool.Get().(*MCResponse) +} + +func (this *MCResponsePool) Put(r *MCResponse) { + if r == nil { + return + } + + r.Extras = nil + r.Key = nil + r.Body = nil + r.Fatal = false + + this.pool.Put(r) +} + +type StringMCResponsePool struct { + pool *sync.Pool + size int +} + +func NewStringMCResponsePool(size int) *StringMCResponsePool { + rv := &StringMCResponsePool{ + pool: &sync.Pool{ + New: func() interface{} { + return make(map[string]*MCResponse, size) + }, + }, + size: size, + } + + return rv +} + +func (this *StringMCResponsePool) Get() map[string]*MCResponse { + return this.pool.Get().(map[string]*MCResponse) +} + +func (this *StringMCResponsePool) Put(m map[string]*MCResponse) { + if m == nil || len(m) > 2*this.size { + return + } + + for k := range m { + m[k] = nil + delete(m, k) + } + + this.pool.Put(m) +} diff --git a/vendor/github.com/couchbase/gomemcached/tap.go b/vendor/github.com/couchbase/gomemcached/tap.go new file mode 100644 index 0000000000..e48623281b --- /dev/null +++ b/vendor/github.com/couchbase/gomemcached/tap.go @@ -0,0 +1,168 @@ +package gomemcached + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + "io/ioutil" + "strings" +) + +type TapConnectFlag uint32 + +// Tap connect option flags +const ( + BACKFILL = TapConnectFlag(0x01) + DUMP = TapConnectFlag(0x02) + LIST_VBUCKETS = TapConnectFlag(0x04) + TAKEOVER_VBUCKETS = TapConnectFlag(0x08) + SUPPORT_ACK = TapConnectFlag(0x10) + REQUEST_KEYS_ONLY = TapConnectFlag(0x20) + CHECKPOINT = TapConnectFlag(0x40) + REGISTERED_CLIENT = TapConnectFlag(0x80) + FIX_FLAG_BYTEORDER = TapConnectFlag(0x100) +) + +// Tap opaque event subtypes +const ( + TAP_OPAQUE_ENABLE_AUTO_NACK = 0 + TAP_OPAQUE_INITIAL_VBUCKET_STREAM = 1 + TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC = 2 + TAP_OPAQUE_CLOSE_TAP_STREAM = 7 + TAP_OPAQUE_CLOSE_BACKFILL = 8 +) + +// Tap item flags +const ( + TAP_ACK = 1 + TAP_NO_VALUE = 2 + TAP_FLAG_NETWORK_BYTE_ORDER = 4 +) + +// TapConnectFlagNames for TapConnectFlag +var TapConnectFlagNames = map[TapConnectFlag]string{ + BACKFILL: "BACKFILL", + DUMP: "DUMP", + LIST_VBUCKETS: "LIST_VBUCKETS", + TAKEOVER_VBUCKETS: "TAKEOVER_VBUCKETS", + SUPPORT_ACK: "SUPPORT_ACK", + REQUEST_KEYS_ONLY: "REQUEST_KEYS_ONLY", + CHECKPOINT: "CHECKPOINT", + REGISTERED_CLIENT: "REGISTERED_CLIENT", + FIX_FLAG_BYTEORDER: "FIX_FLAG_BYTEORDER", +} + +// TapItemParser is a function to parse a single tap extra. +type TapItemParser func(io.Reader) (interface{}, error) + +// TapParseUint64 is a function to parse a single tap uint64. +func TapParseUint64(r io.Reader) (interface{}, error) { + var rv uint64 + err := binary.Read(r, binary.BigEndian, &rv) + return rv, err +} + +// TapParseUint16 is a function to parse a single tap uint16. +func TapParseUint16(r io.Reader) (interface{}, error) { + var rv uint16 + err := binary.Read(r, binary.BigEndian, &rv) + return rv, err +} + +// TapParseBool is a function to parse a single tap boolean. +func TapParseBool(r io.Reader) (interface{}, error) { + return true, nil +} + +// TapParseVBList parses a list of vBucket numbers as []uint16. +func TapParseVBList(r io.Reader) (interface{}, error) { + num, err := TapParseUint16(r) + if err != nil { + return nil, err + } + n := int(num.(uint16)) + + rv := make([]uint16, n) + for i := 0; i < n; i++ { + x, err := TapParseUint16(r) + if err != nil { + return nil, err + } + rv[i] = x.(uint16) + } + + return rv, err +} + +// TapFlagParsers parser functions for TAP fields. +var TapFlagParsers = map[TapConnectFlag]TapItemParser{ + BACKFILL: TapParseUint64, + LIST_VBUCKETS: TapParseVBList, +} + +// SplitFlags will split the ORed flags into the individual bit flags. +func (f TapConnectFlag) SplitFlags() []TapConnectFlag { + rv := []TapConnectFlag{} + for i := uint32(1); f != 0; i = i << 1 { + if uint32(f)&i == i { + rv = append(rv, TapConnectFlag(i)) + } + f = TapConnectFlag(uint32(f) & (^i)) + } + return rv +} + +func (f TapConnectFlag) String() string { + parts := []string{} + for _, x := range f.SplitFlags() { + p := TapConnectFlagNames[x] + if p == "" { + p = fmt.Sprintf("0x%x", int(x)) + } + parts = append(parts, p) + } + return strings.Join(parts, "|") +} + +type TapConnect struct { + Flags map[TapConnectFlag]interface{} + RemainingBody []byte + Name string +} + +// ParseTapCommands parse the tap request into the interesting bits we may +// need to do something with. +func (req *MCRequest) ParseTapCommands() (TapConnect, error) { + rv := TapConnect{ + Flags: map[TapConnectFlag]interface{}{}, + Name: string(req.Key), + } + + if len(req.Extras) < 4 { + return rv, fmt.Errorf("not enough extra bytes: %x", req.Extras) + } + + flags := TapConnectFlag(binary.BigEndian.Uint32(req.Extras)) + + r := bytes.NewReader(req.Body) + + for _, f := range flags.SplitFlags() { + fun := TapFlagParsers[f] + if fun == nil { + fun = TapParseBool + } + + val, err := fun(r) + if err != nil { + return rv, err + } + + rv.Flags[f] = val + } + + var err error + rv.RemainingBody, err = ioutil.ReadAll(r) + + return rv, err +} |