diff options
author | techknowlogick <techknowlogick@gitea.io> | 2022-01-14 18:16:05 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-14 18:16:05 -0500 |
commit | 84145e45c50130922fae9055535ab5ea0378e1d4 (patch) | |
tree | fce077a5ae462840bb876ace79aca42abab29ed7 /vendor/github.com/couchbase/gomemcached | |
parent | 2b16ca7c773de278ba01f122dc6f9f43d7534c52 (diff) | |
download | gitea-84145e45c50130922fae9055535ab5ea0378e1d4.tar.gz gitea-84145e45c50130922fae9055535ab5ea0378e1d4.zip |
Remove golang vendored directory (#18277)
* rm go vendor
* fix drone yaml
* add to gitignore
Diffstat (limited to 'vendor/github.com/couchbase/gomemcached')
15 files changed, 0 insertions, 5507 deletions
diff --git a/vendor/github.com/couchbase/gomemcached/.gitignore b/vendor/github.com/couchbase/gomemcached/.gitignore deleted file mode 100644 index cd8acba17e..0000000000 --- a/vendor/github.com/couchbase/gomemcached/.gitignore +++ /dev/null @@ -1,7 +0,0 @@ -#* -*.[68] -*~ -*.swp -/gocache/gocache -c.out -.idea
\ No newline at end of file diff --git a/vendor/github.com/couchbase/gomemcached/LICENSE b/vendor/github.com/couchbase/gomemcached/LICENSE deleted file mode 100644 index b01ef80261..0000000000 --- a/vendor/github.com/couchbase/gomemcached/LICENSE +++ /dev/null @@ -1,19 +0,0 @@ -Copyright (c) 2013 Dustin Sallings - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. diff --git a/vendor/github.com/couchbase/gomemcached/README.markdown b/vendor/github.com/couchbase/gomemcached/README.markdown deleted file mode 100644 index 5e9b2de5be..0000000000 --- a/vendor/github.com/couchbase/gomemcached/README.markdown +++ /dev/null @@ -1,32 +0,0 @@ -# gomemcached - -This is a memcached binary protocol toolkit in [go][go]. - -It provides client and server functionality as well as a little sample -server showing how I might make a server if I valued purity over -performance. - -## Server Design - -<div> - <img src="http://dustin.github.com/images/gomemcached.png" - alt="overview" style="float: right"/> -</div> - -The basic design can be seen in [gocache]. A [storage -server][storage] is run as a goroutine that receives a `MCRequest` on -a channel, and then issues an `MCResponse` to a channel contained -within the request. - -Each connection is a separate goroutine, of course, and is responsible -for all IO for that connection until the connection drops or the -`dataServer` decides it's stupid and sends a fatal response back over -the channel. - -There is currently no work at all in making the thing perform (there -are specific areas I know need work). This is just my attempt to -learn the language somewhat. - -[go]: http://golang.org/ -[gocache]: gomemcached/blob/master/gocache/gocache.go -[storage]: gomemcached/blob/master/gocache/mc_storage.go diff --git a/vendor/github.com/couchbase/gomemcached/client/collections_filter.go b/vendor/github.com/couchbase/gomemcached/client/collections_filter.go deleted file mode 100644 index a34d353fec..0000000000 --- a/vendor/github.com/couchbase/gomemcached/client/collections_filter.go +++ /dev/null @@ -1,130 +0,0 @@ -package memcached - -import ( - "encoding/json" - "fmt" -) - -// Collection based filter -type CollectionsFilter struct { - ManifestUid uint64 - UseManifestUid bool - StreamId uint16 - UseStreamId bool - - // Use either ScopeId OR CollectionsList, not both - CollectionsList []uint32 - ScopeId uint32 -} - -type nonStreamIdNonCollectionsMeta struct { - ManifestId string `json:"uid"` -} - -type nonStreamIdNonResumeCollectionsMeta struct { - CollectionsList []string `json:"collections"` -} - -type nonStreamIdCollectionsMeta struct { - ManifestId string `json:"uid"` - CollectionsList []string `json:"collections"` -} - -type streamIdNonResumeCollectionsMeta struct { - CollectionsList []string `json:"collections"` - StreamId uint16 `json:"sid"` -} - -type streamIdNonResumeScopeMeta struct { - ScopeId string `json:"scope"` - StreamId uint16 `json:"sid"` -} - -func (c *CollectionsFilter) IsValid() error { - if c.UseManifestUid && c.UseStreamId { - return fmt.Errorf("Not implemented yet") - } - - if len(c.CollectionsList) > 0 && c.ScopeId > 0 { - return fmt.Errorf("Collection list is specified but scope ID is also specified") - } - - return nil -} - -func (c *CollectionsFilter) outputCollectionsFilterColList() (outputList []string) { - for _, collectionUint := range c.CollectionsList { - outputList = append(outputList, fmt.Sprintf("%x", collectionUint)) - } - return -} - -func (c *CollectionsFilter) outputScopeId() string { - return fmt.Sprintf("%x", c.ScopeId) -} - -func (c *CollectionsFilter) ToStreamReqBody() ([]byte, error) { - if err := c.IsValid(); err != nil { - return nil, err - } - - var output interface{} - - switch c.UseStreamId { - case true: - switch c.UseManifestUid { - case true: - // TODO - return nil, fmt.Errorf("NotImplemented0") - case false: - switch len(c.CollectionsList) > 0 { - case true: - filter := &streamIdNonResumeCollectionsMeta{ - StreamId: c.StreamId, - CollectionsList: c.outputCollectionsFilterColList(), - } - output = *filter - case false: - filter := &streamIdNonResumeScopeMeta{ - StreamId: c.StreamId, - ScopeId: c.outputScopeId(), - } - output = *filter - } - } - case false: - switch c.UseManifestUid { - case true: - switch len(c.CollectionsList) > 0 { - case true: - filter := &nonStreamIdCollectionsMeta{ - ManifestId: fmt.Sprintf("%x", c.ManifestUid), - CollectionsList: c.outputCollectionsFilterColList(), - } - output = *filter - case false: - filter := &nonStreamIdNonCollectionsMeta{ - ManifestId: fmt.Sprintf("%x", c.ManifestUid), - } - output = *filter - } - case false: - switch len(c.CollectionsList) > 0 { - case true: - filter := &nonStreamIdNonResumeCollectionsMeta{ - CollectionsList: c.outputCollectionsFilterColList(), - } - output = *filter - case false: - return nil, fmt.Errorf("Specifying scopeID must require the use of streamId") - } - } - } - - data, err := json.Marshal(output) - if err != nil { - return nil, err - } else { - return data, nil - } -} diff --git a/vendor/github.com/couchbase/gomemcached/client/mc.go b/vendor/github.com/couchbase/gomemcached/client/mc.go deleted file mode 100644 index 3dc121da5d..0000000000 --- a/vendor/github.com/couchbase/gomemcached/client/mc.go +++ /dev/null @@ -1,1515 +0,0 @@ -// Package memcached provides a memcached binary protocol client. -package memcached - -import ( - "crypto/tls" - "encoding/binary" - "fmt" - "github.com/couchbase/gomemcached" - "github.com/couchbase/goutils/logging" - "github.com/couchbase/goutils/scramsha" - "github.com/pkg/errors" - "io" - "math" - "net" - "strings" - "sync" - "sync/atomic" - "time" -) - -type ClientIface interface { - Add(vb uint16, key string, flags int, exp int, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) - Append(vb uint16, key string, data []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) - Auth(user, pass string) (*gomemcached.MCResponse, error) - AuthList() (*gomemcached.MCResponse, error) - AuthPlain(user, pass string) (*gomemcached.MCResponse, error) - AuthScramSha(user, pass string) (*gomemcached.MCResponse, error) - CASNext(vb uint16, k string, exp int, state *CASState) bool - CAS(vb uint16, k string, f CasFunc, initexp int) (*gomemcached.MCResponse, error) - CollectionsGetCID(scope string, collection string) (*gomemcached.MCResponse, error) - CollectionEnabled() bool - Close() error - Decr(vb uint16, key string, amt, def uint64, exp int, context ...*ClientContext) (uint64, error) - Del(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error) - EnableMutationToken() (*gomemcached.MCResponse, error) - EnableFeatures(features Features) (*gomemcached.MCResponse, error) - Get(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error) - GetAllVbSeqnos(vbSeqnoMap map[uint16]uint64, context ...*ClientContext) (map[uint16]uint64, error) - GetAndTouch(vb uint16, key string, exp int, context ...*ClientContext) (*gomemcached.MCResponse, error) - GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string, context ...*ClientContext) error - GetCollectionsManifest() (*gomemcached.MCResponse, error) - GetMeta(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error) - GetRandomDoc(context ...*ClientContext) (*gomemcached.MCResponse, error) - GetSubdoc(vb uint16, key string, subPaths []string, context ...*ClientContext) (*gomemcached.MCResponse, error) - Hijack() io.ReadWriteCloser - Incr(vb uint16, key string, amt, def uint64, exp int, context ...*ClientContext) (uint64, error) - LastBucket() string - Observe(vb uint16, key string) (result ObserveResult, err error) - ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error) - Receive() (*gomemcached.MCResponse, error) - ReceiveWithDeadline(deadline time.Time) (*gomemcached.MCResponse, error) - Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error) - Set(vb uint16, key string, flags int, exp int, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) - SetKeepAliveOptions(interval time.Duration) - SetReadDeadline(t time.Time) - SetDeadline(t time.Time) - SelectBucket(bucket string) (*gomemcached.MCResponse, error) - SetCas(vb uint16, key string, flags int, exp int, cas uint64, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) - Stats(key string) ([]StatValue, error) - StatsFunc(key string, fn func(key, val []byte)) error - StatsMap(key string) (map[string]string, error) - StatsMapForSpecifiedStats(key string, statsMap map[string]string) error - Transmit(req *gomemcached.MCRequest) error - TransmitWithDeadline(req *gomemcached.MCRequest, deadline time.Time) error - TransmitResponse(res *gomemcached.MCResponse) error - UprGetFailoverLog(vb []uint16) (map[uint16]*FailoverLog, error) - - // UprFeed Related - NewUprFeed() (*UprFeed, error) - NewUprFeedIface() (UprFeedIface, error) - NewUprFeedWithConfig(ackByClient bool) (*UprFeed, error) - NewUprFeedWithConfigIface(ackByClient bool) (UprFeedIface, error) -} - -type ClientContext struct { - // Collection-based context - CollId uint32 - - // Impersonate context - User string - - // VB-state related context - // nil means not used in this context - VbState *VbStateType -} - -type VbStateType uint8 - -const ( - VbAlive VbStateType = 0x00 - VbActive VbStateType = 0x01 - VbReplica VbStateType = 0x02 - VbPending VbStateType = 0x03 - VbDead VbStateType = 0x04 -) - -func (context *ClientContext) InitExtras(req *gomemcached.MCRequest, client *Client) { - if req == nil || client == nil { - return - } - - var bytesToAllocate int - switch req.Opcode { - case gomemcached.GET_ALL_VB_SEQNOS: - if context.VbState != nil { - bytesToAllocate += 4 - } - if client.CollectionEnabled() { - if context.VbState == nil { - bytesToAllocate += 8 - } else { - bytesToAllocate += 4 - } - } - } - if bytesToAllocate > 0 { - req.Extras = make([]byte, bytesToAllocate) - } -} - -const bufsize = 1024 - -var UnHealthy uint32 = 0 -var Healthy uint32 = 1 - -type Features []Feature -type Feature uint16 - -const FeatureTcpNoDelay = Feature(0x03) -const FeatureMutationToken = Feature(0x04) // XATTR bit in data type field with dcp mutations -const FeatureXattr = Feature(0x06) -const FeatureXerror = Feature(0x07) -const FeatureCollections = Feature(0x12) -const FeatureSnappyCompression = Feature(0x0a) -const FeatureDataType = Feature(0x0b) - -type memcachedConnection interface { - io.ReadWriteCloser - - SetReadDeadline(time.Time) error - SetDeadline(time.Time) error -} - -// The Client itself. -type Client struct { - conn memcachedConnection - // use uint32 type so that it can be accessed through atomic APIs - healthy uint32 - opaque uint32 - - hdrBuf []byte - - collectionsEnabled uint32 - deadline time.Time - bucket string -} - -var ( - DefaultDialTimeout = time.Duration(0) // No timeout - - DefaultWriteTimeout = time.Duration(0) // No timeout - - dialFun = func(prot, dest string) (net.Conn, error) { - return net.DialTimeout(prot, dest, DefaultDialTimeout) - } -) - -// Connect to a memcached server. -func Connect(prot, dest string) (rv *Client, err error) { - conn, err := dialFun(prot, dest) - if err != nil { - return nil, err - } - return Wrap(conn) -} - -// Connect to a memcached server using TLS. -func ConnectTLS(prot, dest string, config *tls.Config) (rv *Client, err error) { - conn, err := tls.Dial(prot, dest, config) - if err != nil { - return nil, err - } - return Wrap(conn) -} - -func SetDefaultTimeouts(dial, read, write time.Duration) { - DefaultDialTimeout = dial - DefaultWriteTimeout = write -} - -func SetDefaultDialTimeout(dial time.Duration) { - DefaultDialTimeout = dial -} - -func (c *Client) SetKeepAliveOptions(interval time.Duration) { - tcpConn, ok := c.conn.(*net.TCPConn) - if ok { - tcpConn.SetKeepAlive(true) - tcpConn.SetKeepAlivePeriod(interval) - } -} - -func (c *Client) SetReadDeadline(t time.Time) { - c.conn.SetReadDeadline(t) -} - -func (c *Client) SetDeadline(t time.Time) { - if t.Equal(c.deadline) { - return - } - c.conn.SetDeadline(t) - c.deadline = t -} - -func (c *Client) getOpaque() uint32 { - if c.opaque >= math.MaxInt32 { - c.opaque = uint32(1) - } - return c.opaque + 1 -} - -// Wrap an existing transport. -func Wrap(conn memcachedConnection) (rv *Client, err error) { - client := &Client{ - conn: conn, - hdrBuf: make([]byte, gomemcached.HDR_LEN), - opaque: uint32(1), - } - client.setHealthy(true) - return client, nil -} - -// Close the connection when you're done. -func (c *Client) Close() error { - return c.conn.Close() -} - -// IsHealthy returns true unless the client is belived to have -// difficulty communicating to its server. -// -// This is useful for connection pools where we want to -// non-destructively determine that a connection may be reused. -func (c Client) IsHealthy() bool { - healthyState := atomic.LoadUint32(&c.healthy) - return healthyState == Healthy -} - -// Send a custom request and get the response. -func (c *Client) Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error) { - err = c.Transmit(req) - if err != nil { - return - } - resp, _, err := getResponse(c.conn, c.hdrBuf) - c.setHealthy(!gomemcached.IsFatal(err)) - return resp, err -} - -// Transmit send a request, but does not wait for a response. -func (c *Client) Transmit(req *gomemcached.MCRequest) error { - if DefaultWriteTimeout > 0 { - c.conn.(net.Conn).SetWriteDeadline(time.Now().Add(DefaultWriteTimeout)) - } - _, err := transmitRequest(c.conn, req) - // clear write deadline to avoid interference with future write operations - if DefaultWriteTimeout > 0 { - c.conn.(net.Conn).SetWriteDeadline(time.Time{}) - } - if err != nil { - c.setHealthy(false) - } - return err -} - -func (c *Client) TransmitWithDeadline(req *gomemcached.MCRequest, deadline time.Time) error { - c.conn.(net.Conn).SetWriteDeadline(deadline) - - _, err := transmitRequest(c.conn, req) - - // clear write deadline to avoid interference with future write operations - c.conn.(net.Conn).SetWriteDeadline(time.Time{}) - - if err != nil { - c.setHealthy(false) - } - return err -} - -// TransmitResponse send a response, does not wait. -func (c *Client) TransmitResponse(res *gomemcached.MCResponse) error { - if DefaultWriteTimeout > 0 { - c.conn.(net.Conn).SetWriteDeadline(time.Now().Add(DefaultWriteTimeout)) - } - _, err := transmitResponse(c.conn, res) - // clear write deadline to avoid interference with future write operations - if DefaultWriteTimeout > 0 { - c.conn.(net.Conn).SetWriteDeadline(time.Time{}) - } - if err != nil { - c.setHealthy(false) - } - return err -} - -// Receive a response -func (c *Client) Receive() (*gomemcached.MCResponse, error) { - resp, _, err := getResponse(c.conn, c.hdrBuf) - if err != nil && resp.Status != gomemcached.KEY_ENOENT && resp.Status != gomemcached.EBUSY { - c.setHealthy(false) - } - return resp, err -} - -func (c *Client) ReceiveWithDeadline(deadline time.Time) (*gomemcached.MCResponse, error) { - c.conn.(net.Conn).SetReadDeadline(deadline) - - resp, _, err := getResponse(c.conn, c.hdrBuf) - - // Clear read deadline to avoid interference with future read operations. - c.conn.(net.Conn).SetReadDeadline(time.Time{}) - - if err != nil && resp.Status != gomemcached.KEY_ENOENT && resp.Status != gomemcached.EBUSY { - c.setHealthy(false) - } - return resp, err -} - -func appendMutationToken(bytes []byte) []byte { - bytes = append(bytes, 0, 0) - binary.BigEndian.PutUint16(bytes[len(bytes)-2:], uint16(0x04)) - return bytes -} - -//Send a hello command to enable MutationTokens -func (c *Client) EnableMutationToken() (*gomemcached.MCResponse, error) { - var payload []byte - payload = appendMutationToken(payload) - - return c.Send(&gomemcached.MCRequest{ - Opcode: gomemcached.HELLO, - Key: []byte("GoMemcached"), - Body: payload, - }) - -} - -//Send a hello command to enable specific features -func (c *Client) EnableFeatures(features Features) (*gomemcached.MCResponse, error) { - var payload []byte - collectionsEnabled := 0 - - for _, feature := range features { - if feature == FeatureCollections { - collectionsEnabled = 1 - } - payload = append(payload, 0, 0) - binary.BigEndian.PutUint16(payload[len(payload)-2:], uint16(feature)) - } - - rv, err := c.Send(&gomemcached.MCRequest{ - Opcode: gomemcached.HELLO, - Key: []byte("GoMemcached"), - Body: payload, - }) - - if err == nil && collectionsEnabled != 0 { - atomic.StoreUint32(&c.collectionsEnabled, uint32(collectionsEnabled)) - } - return rv, err -} - -// Sets collection and user info for a request -func (c *Client) setContext(req *gomemcached.MCRequest, context ...*ClientContext) error { - req.CollIdLen = 0 - req.UserLen = 0 - collectionId := uint32(0) - if len(context) > 0 { - collectionId = context[0].CollId - uLen := len(context[0].User) - if uLen > 0 { - if uLen > gomemcached.MAX_USER_LEN { - uLen = gomemcached.MAX_USER_LEN - } - req.UserLen = uLen - copy(req.Username[:uLen], context[0].User) - } - } - - // if the optional collection is specified, it must be default for clients that haven't turned on collections - if atomic.LoadUint32(&c.collectionsEnabled) == 0 { - if collectionId != 0 { - return fmt.Errorf("Client does not use collections but a collection was specified") - } - } else { - req.CollIdLen = binary.PutUvarint(req.CollId[:], uint64(collectionId)) - } - return nil -} - -// Sets collection info in extras -func (c *Client) setExtrasContext(req *gomemcached.MCRequest, context ...*ClientContext) error { - collectionId := uint32(0) - req.UserLen = 0 - if len(context) > 0 { - collectionId = context[0].CollId - uLen := len(context[0].User) - if uLen > 0 { - req.UserLen = uLen - copy(req.Username[:], context[0].User) - } - } - - // if the optional collection is specified, it must be default for clients that haven't turned on collections - if atomic.LoadUint32(&c.collectionsEnabled) == 0 { - if collectionId != 0 { - return fmt.Errorf("Client does not use collections but a collection was specified") - } - } else { - req.Extras = make([]byte, 4) - binary.BigEndian.PutUint32(req.Extras, collectionId) - } - return nil -} - -func (c *Client) setVbSeqnoContext(req *gomemcached.MCRequest, context ...*ClientContext) error { - if len(context) == 0 || req == nil { - return nil - } - - switch req.Opcode { - case gomemcached.GET_ALL_VB_SEQNOS: - if len(context) == 0 { - return nil - } - - if len(req.Extras) == 0 { - context[0].InitExtras(req, c) - } - if context[0].VbState != nil { - binary.BigEndian.PutUint32(req.Extras, uint32(*(context[0].VbState))) - } - if c.CollectionEnabled() { - binary.BigEndian.PutUint32(req.Extras[4:8], context[0].CollId) - } - return nil - default: - return fmt.Errorf("setVbState Not supported for opcode: %v", req.Opcode.String()) - } -} - -// Get the value for a key. -func (c *Client) Get(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error) { - req := &gomemcached.MCRequest{ - Opcode: gomemcached.GET, - VBucket: vb, - Key: []byte(key), - Opaque: c.getOpaque(), - } - err := c.setContext(req, context...) - if err != nil { - return nil, err - } - return c.Send(req) -} - -// Get the xattrs, doc value for the input key -func (c *Client) GetSubdoc(vb uint16, key string, subPaths []string, context ...*ClientContext) (*gomemcached.MCResponse, error) { - extraBuf, valueBuf := GetSubDocVal(subPaths) - req := &gomemcached.MCRequest{ - Opcode: gomemcached.SUBDOC_MULTI_LOOKUP, - VBucket: vb, - Key: []byte(key), - Extras: extraBuf, - Body: valueBuf, - Opaque: c.getOpaque(), - } - err := c.setContext(req, context...) - if err != nil { - return nil, err - } - - res, err := c.Send(req) - - if err != nil && IfResStatusError(res) { - return res, err - } - return res, nil -} - -// Retrieve the collections manifest. -func (c *Client) GetCollectionsManifest() (*gomemcached.MCResponse, error) { - - res, err := c.Send(&gomemcached.MCRequest{ - Opcode: gomemcached.GET_COLLECTIONS_MANIFEST, - Opaque: c.getOpaque(), - }) - - if err != nil && IfResStatusError(res) { - return res, err - } - return res, nil -} - -// Retrieve the collections manifest. -func (c *Client) CollectionsGetCID(scope string, collection string) (*gomemcached.MCResponse, error) { - - res, err := c.Send(&gomemcached.MCRequest{ - Opcode: gomemcached.COLLECTIONS_GET_CID, - Key: []byte(scope + "." + collection), - Opaque: c.getOpaque(), - }) - - if err != nil && IfResStatusError(res) { - return res, err - } - return res, nil -} - -func (c *Client) CollectionEnabled() bool { - return atomic.LoadUint32(&c.collectionsEnabled) > 0 -} - -// Get the value for a key, and update expiry -func (c *Client) GetAndTouch(vb uint16, key string, exp int, context ...*ClientContext) (*gomemcached.MCResponse, error) { - extraBuf := make([]byte, 4) - binary.BigEndian.PutUint32(extraBuf[0:], uint32(exp)) - req := &gomemcached.MCRequest{ - Opcode: gomemcached.GAT, - VBucket: vb, - Key: []byte(key), - Extras: extraBuf, - Opaque: c.getOpaque(), - } - err := c.setContext(req, context...) - if err != nil { - return nil, err - } - return c.Send(req) -} - -// Get metadata for a key -func (c *Client) GetMeta(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error) { - req := &gomemcached.MCRequest{ - Opcode: gomemcached.GET_META, - VBucket: vb, - Key: []byte(key), - Opaque: c.getOpaque(), - } - err := c.setContext(req, context...) - if err != nil { - return nil, err - } - return c.Send(req) -} - -// Del deletes a key. -func (c *Client) Del(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error) { - req := &gomemcached.MCRequest{ - Opcode: gomemcached.DELETE, - VBucket: vb, - Key: []byte(key), - Opaque: c.getOpaque(), - } - err := c.setContext(req, context...) - if err != nil { - return nil, err - } - return c.Send(req) -} - -// Get a random document -func (c *Client) GetRandomDoc(context ...*ClientContext) (*gomemcached.MCResponse, error) { - req := &gomemcached.MCRequest{ - Opcode: 0xB6, - Opaque: c.getOpaque(), - } - err := c.setExtrasContext(req, context...) - if err != nil { - return nil, err - } - return c.Send(req) -} - -// AuthList lists SASL auth mechanisms. -func (c *Client) AuthList() (*gomemcached.MCResponse, error) { - return c.Send(&gomemcached.MCRequest{ - Opcode: gomemcached.SASL_LIST_MECHS}) -} - -// Auth performs SASL PLAIN authentication against the server. -func (c *Client) Auth(user, pass string) (*gomemcached.MCResponse, error) { - res, err := c.AuthList() - - if err != nil { - return res, err - } - - authMech := string(res.Body) - if strings.Index(authMech, "PLAIN") != -1 { - return c.AuthPlain(user, pass) - } - return nil, fmt.Errorf("auth mechanism PLAIN not supported") -} - -// AuthScramSha performs SCRAM-SHA authentication against the server. -func (c *Client) AuthScramSha(user, pass string) (*gomemcached.MCResponse, error) { - res, err := c.AuthList() - if err != nil { - return nil, errors.Wrap(err, "Unable to obtain list of methods.") - } - - methods := string(res.Body) - method, err := scramsha.BestMethod(methods) - if err != nil { - return nil, errors.Wrap(err, - "Unable to select SCRAM-SHA method.") - } - - s, err := scramsha.NewScramSha(method) - if err != nil { - return nil, errors.Wrap(err, "Unable to initialize scramsha.") - } - - logging.Infof("Using %v authentication for user %v%v%v", method, gomemcached.UdTagBegin, user, gomemcached.UdTagEnd) - - message, err := s.GetStartRequest(user) - if err != nil { - return nil, errors.Wrapf(err, - "Error building start request for user %s.", user) - } - - startRequest := &gomemcached.MCRequest{ - Opcode: 0x21, - Key: []byte(method), - Body: []byte(message)} - - startResponse, err := c.Send(startRequest) - if err != nil { - return nil, errors.Wrap(err, "Error sending start request.") - } - - err = s.HandleStartResponse(string(startResponse.Body)) - if err != nil { - return nil, errors.Wrap(err, "Error handling start response.") - } - - message = s.GetFinalRequest(pass) - - // send step request - finalRequest := &gomemcached.MCRequest{ - Opcode: 0x22, - Key: []byte(method), - Body: []byte(message)} - finalResponse, err := c.Send(finalRequest) - if err != nil { - return nil, errors.Wrap(err, "Error sending final request.") - } - - err = s.HandleFinalResponse(string(finalResponse.Body)) - if err != nil { - return nil, errors.Wrap(err, "Error handling final response.") - } - - return finalResponse, nil -} - -func (c *Client) AuthPlain(user, pass string) (*gomemcached.MCResponse, error) { - logging.Infof("Using plain authentication for user %v%v%v", gomemcached.UdTagBegin, user, gomemcached.UdTagEnd) - return c.Send(&gomemcached.MCRequest{ - Opcode: gomemcached.SASL_AUTH, - Key: []byte("PLAIN"), - Body: []byte(fmt.Sprintf("\x00%s\x00%s", user, pass))}) -} - -// select bucket -func (c *Client) SelectBucket(bucket string) (*gomemcached.MCResponse, error) { - res, err := c.Send(&gomemcached.MCRequest{ - Opcode: gomemcached.SELECT_BUCKET, - Key: []byte(bucket)}) - if res != nil { - c.bucket = bucket - } - return res, err -} - -func (c *Client) LastBucket() string { - return c.bucket -} - -func (c *Client) store(opcode gomemcached.CommandCode, vb uint16, - key string, flags int, exp int, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) { - req := &gomemcached.MCRequest{ - Opcode: opcode, - VBucket: vb, - Key: []byte(key), - Cas: 0, - Opaque: c.getOpaque(), - Extras: []byte{0, 0, 0, 0, 0, 0, 0, 0}, - Body: body} - - err := c.setContext(req, context...) - if err != nil { - return nil, err - } - binary.BigEndian.PutUint64(req.Extras, uint64(flags)<<32|uint64(exp)) - return c.Send(req) -} - -func (c *Client) storeCas(opcode gomemcached.CommandCode, vb uint16, - key string, flags int, exp int, cas uint64, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) { - req := &gomemcached.MCRequest{ - Opcode: opcode, - VBucket: vb, - Key: []byte(key), - Cas: cas, - Opaque: c.getOpaque(), - Extras: []byte{0, 0, 0, 0, 0, 0, 0, 0}, - Body: body} - - err := c.setContext(req, context...) - if err != nil { - return nil, err - } - - binary.BigEndian.PutUint64(req.Extras, uint64(flags)<<32|uint64(exp)) - return c.Send(req) -} - -// Incr increments the value at the given key. -func (c *Client) Incr(vb uint16, key string, - amt, def uint64, exp int, context ...*ClientContext) (uint64, error) { - req := &gomemcached.MCRequest{ - Opcode: gomemcached.INCREMENT, - VBucket: vb, - Key: []byte(key), - Extras: make([]byte, 8+8+4), - } - err := c.setContext(req, context...) - if err != nil { - return 0, err - } - - binary.BigEndian.PutUint64(req.Extras[:8], amt) - binary.BigEndian.PutUint64(req.Extras[8:16], def) - binary.BigEndian.PutUint32(req.Extras[16:20], uint32(exp)) - - resp, err := c.Send(req) - if err != nil { - return 0, err - } - - return binary.BigEndian.Uint64(resp.Body), nil -} - -// Decr decrements the value at the given key. -func (c *Client) Decr(vb uint16, key string, - amt, def uint64, exp int, context ...*ClientContext) (uint64, error) { - req := &gomemcached.MCRequest{ - Opcode: gomemcached.DECREMENT, - VBucket: vb, - Key: []byte(key), - Extras: make([]byte, 8+8+4), - } - err := c.setContext(req, context...) - if err != nil { - return 0, err - } - - binary.BigEndian.PutUint64(req.Extras[:8], amt) - binary.BigEndian.PutUint64(req.Extras[8:16], def) - binary.BigEndian.PutUint32(req.Extras[16:20], uint32(exp)) - - resp, err := c.Send(req) - if err != nil { - return 0, err - } - - return binary.BigEndian.Uint64(resp.Body), nil -} - -// Add a value for a key (store if not exists). -func (c *Client) Add(vb uint16, key string, flags int, exp int, - body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) { - return c.store(gomemcached.ADD, vb, key, flags, exp, body, context...) -} - -// Set the value for a key. -func (c *Client) Set(vb uint16, key string, flags int, exp int, - body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) { - return c.store(gomemcached.SET, vb, key, flags, exp, body, context...) -} - -// SetCas set the value for a key with cas -func (c *Client) SetCas(vb uint16, key string, flags int, exp int, cas uint64, - body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) { - return c.storeCas(gomemcached.SET, vb, key, flags, exp, cas, body, context...) -} - -// Append data to the value of a key. -func (c *Client) Append(vb uint16, key string, data []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) { - req := &gomemcached.MCRequest{ - Opcode: gomemcached.APPEND, - VBucket: vb, - Key: []byte(key), - Cas: 0, - Opaque: c.getOpaque(), - Body: data} - - err := c.setContext(req, context...) - if err != nil { - return nil, err - } - return c.Send(req) -} - -// GetBulk gets keys in bulk -func (c *Client) GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string, context ...*ClientContext) error { - stopch := make(chan bool) - var wg sync.WaitGroup - - defer func() { - close(stopch) - wg.Wait() - }() - - if (math.MaxInt32 - c.opaque) < (uint32(len(keys)) + 1) { - c.opaque = uint32(1) - } - - opStart := c.opaque - - errch := make(chan error, 2) - - wg.Add(1) - go func() { - defer func() { - if r := recover(); r != nil { - logging.Infof("Recovered in f %v", r) - } - errch <- nil - wg.Done() - }() - - ok := true - for ok { - - select { - case <-stopch: - return - default: - res, err := c.Receive() - - if err != nil && IfResStatusError(res) { - if res == nil || res.Status != gomemcached.KEY_ENOENT { - errch <- err - return - } - // continue receiving in case of KEY_ENOENT - } else if res.Opcode == gomemcached.GET || - res.Opcode == gomemcached.SUBDOC_GET || - res.Opcode == gomemcached.SUBDOC_MULTI_LOOKUP { - opaque := res.Opaque - opStart - if opaque < 0 || opaque >= uint32(len(keys)) { - // Every now and then we seem to be seeing an invalid opaque - // value returned from the server. When this happens log the error - // and the calling function will retry the bulkGet. MB-15140 - logging.Errorf(" Invalid opaque Value. Debug info : Res.opaque : %v(%v), Keys %v, Response received %v \n key list %v this key %v", res.Opaque, opaque, len(keys), res, keys, string(res.Body)) - errch <- fmt.Errorf("Out of Bounds error") - return - } - - rv[keys[opaque]] = res - } - - if res.Opcode == gomemcached.NOOP { - ok = false - } - } - } - }() - - memcachedReqPkt := &gomemcached.MCRequest{ - Opcode: gomemcached.GET, - VBucket: vb, - } - err := c.setContext(memcachedReqPkt, context...) - if err != nil { - return err - } - - if len(subPaths) > 0 { - extraBuf, valueBuf := GetSubDocVal(subPaths) - memcachedReqPkt.Opcode = gomemcached.SUBDOC_MULTI_LOOKUP - memcachedReqPkt.Extras = extraBuf - memcachedReqPkt.Body = valueBuf - } - - for _, k := range keys { // Start of Get request - memcachedReqPkt.Key = []byte(k) - memcachedReqPkt.Opaque = c.opaque - - err := c.Transmit(memcachedReqPkt) - if err != nil { - logging.Errorf(" Transmit failed in GetBulkAll %v", err) - return err - } - c.opaque++ - } // End of Get request - - // finally transmit a NOOP - err = c.Transmit(&gomemcached.MCRequest{ - Opcode: gomemcached.NOOP, - VBucket: vb, - Opaque: c.opaque, - }) - - if err != nil { - logging.Errorf(" Transmit of NOOP failed %v", err) - return err - } - c.opaque++ - - return <-errch -} - -func GetSubDocVal(subPaths []string) (extraBuf, valueBuf []byte) { - - var ops []string - totalBytesLen := 0 - num := 1 - - for _, v := range subPaths { - totalBytesLen = totalBytesLen + len([]byte(v)) - ops = append(ops, v) - num = num + 1 - } - - // Xattr retrieval - subdoc multi get - // Set deleted true only if it is not expiration - if len(subPaths) != 1 || subPaths[0] != "$document.exptime" { - extraBuf = append(extraBuf, uint8(0x04)) - } - - valueBuf = make([]byte, num*4+totalBytesLen) - - //opcode for subdoc get - op := gomemcached.SUBDOC_GET - - // Calculate path total bytes - // There are 2 ops - get xattrs - both input and $document and get whole doc - valIter := 0 - - for _, v := range ops { - pathBytes := []byte(v) - valueBuf[valIter+0] = uint8(op) - - // SubdocFlagXattrPath indicates that the path refers to - // an Xattr rather than the document body. - valueBuf[valIter+1] = uint8(gomemcached.SUBDOC_FLAG_XATTR) - - // 2 byte key - binary.BigEndian.PutUint16(valueBuf[valIter+2:], uint16(len(pathBytes))) - - // Then n bytes path - copy(valueBuf[valIter+4:], pathBytes) - valIter = valIter + 4 + len(pathBytes) - } - - return -} - -// ObservedStatus is the type reported by the Observe method -type ObservedStatus uint8 - -// Observation status values. -const ( - ObservedNotPersisted = ObservedStatus(0x00) // found, not persisted - ObservedPersisted = ObservedStatus(0x01) // found, persisted - ObservedNotFound = ObservedStatus(0x80) // not found (or a persisted delete) - ObservedLogicallyDeleted = ObservedStatus(0x81) // pending deletion (not persisted yet) -) - -// ObserveResult represents the data obtained by an Observe call -type ObserveResult struct { - Status ObservedStatus // Whether the value has been persisted/deleted - Cas uint64 // Current value's CAS - PersistenceTime time.Duration // Node's average time to persist a value - ReplicationTime time.Duration // Node's average time to replicate a value -} - -// Observe gets the persistence/replication/CAS state of a key -func (c *Client) Observe(vb uint16, key string) (result ObserveResult, err error) { - // http://www.couchbase.com/wiki/display/couchbase/Observe - body := make([]byte, 4+len(key)) - binary.BigEndian.PutUint16(body[0:2], vb) - binary.BigEndian.PutUint16(body[2:4], uint16(len(key))) - copy(body[4:4+len(key)], key) - - res, err := c.Send(&gomemcached.MCRequest{ - Opcode: gomemcached.OBSERVE, - VBucket: vb, - Body: body, - }) - if err != nil { - return - } - - // Parse the response data from the body: - if len(res.Body) < 2+2+1 { - err = io.ErrUnexpectedEOF - return - } - outVb := binary.BigEndian.Uint16(res.Body[0:2]) - keyLen := binary.BigEndian.Uint16(res.Body[2:4]) - if len(res.Body) < 2+2+int(keyLen)+1+8 { - err = io.ErrUnexpectedEOF - return - } - outKey := string(res.Body[4 : 4+keyLen]) - if outVb != vb || outKey != key { - err = fmt.Errorf("observe returned wrong vbucket/key: %d/%q", outVb, outKey) - return - } - result.Status = ObservedStatus(res.Body[4+keyLen]) - result.Cas = binary.BigEndian.Uint64(res.Body[5+keyLen:]) - // The response reuses the Cas field to store time statistics: - result.PersistenceTime = time.Duration(res.Cas>>32) * time.Millisecond - result.ReplicationTime = time.Duration(res.Cas&math.MaxUint32) * time.Millisecond - return -} - -// CheckPersistence checks whether a stored value has been persisted to disk yet. -func (result ObserveResult) CheckPersistence(cas uint64, deletion bool) (persisted bool, overwritten bool) { - switch { - case result.Status == ObservedNotFound && deletion: - persisted = true - case result.Cas != cas: - overwritten = true - case result.Status == ObservedPersisted: - persisted = true - } - return -} - -// Sequence number based Observe Implementation -type ObserveSeqResult struct { - Failover uint8 // Set to 1 if a failover took place - VbId uint16 // vbucket id - Vbuuid uint64 // vucket uuid - LastPersistedSeqNo uint64 // last persisted sequence number - CurrentSeqNo uint64 // current sequence number - OldVbuuid uint64 // Old bucket vbuuid - LastSeqNo uint64 // last sequence number received before failover -} - -func (c *Client) ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error) { - // http://www.couchbase.com/wiki/display/couchbase/Observe - body := make([]byte, 8) - binary.BigEndian.PutUint64(body[0:8], vbuuid) - - res, err := c.Send(&gomemcached.MCRequest{ - Opcode: gomemcached.OBSERVE_SEQNO, - VBucket: vb, - Body: body, - Opaque: 0x01, - }) - if err != nil { - return - } - - if res.Status != gomemcached.SUCCESS { - return nil, fmt.Errorf(" Observe returned error %v", res.Status) - } - - // Parse the response data from the body: - if len(res.Body) < (1 + 2 + 8 + 8 + 8) { - err = io.ErrUnexpectedEOF - return - } - - result = &ObserveSeqResult{} - result.Failover = res.Body[0] - result.VbId = binary.BigEndian.Uint16(res.Body[1:3]) - result.Vbuuid = binary.BigEndian.Uint64(res.Body[3:11]) - result.LastPersistedSeqNo = binary.BigEndian.Uint64(res.Body[11:19]) - result.CurrentSeqNo = binary.BigEndian.Uint64(res.Body[19:27]) - - // in case of failover processing we can have old vbuuid and the last persisted seq number - if result.Failover == 1 && len(res.Body) >= (1+2+8+8+8+8+8) { - result.OldVbuuid = binary.BigEndian.Uint64(res.Body[27:35]) - result.LastSeqNo = binary.BigEndian.Uint64(res.Body[35:43]) - } - - return -} - -// CasOp is the type of operation to perform on this CAS loop. -type CasOp uint8 - -const ( - // CASStore instructs the server to store the new value normally - CASStore = CasOp(iota) - // CASQuit instructs the client to stop attempting to CAS, leaving value untouched - CASQuit - // CASDelete instructs the server to delete the current value - CASDelete -) - -// User specified termination is returned as an error. -func (c CasOp) Error() string { - switch c { - case CASStore: - return "CAS store" - case CASQuit: - return "CAS quit" - case CASDelete: - return "CAS delete" - } - panic("Unhandled value") -} - -//////// CAS TRANSFORM - -// CASState tracks the state of CAS over several operations. -// -// This is used directly by CASNext and indirectly by CAS -type CASState struct { - initialized bool // false on the first call to CASNext, then true - Value []byte // Current value of key; update in place to new value - Cas uint64 // Current CAS value of key - Exists bool // Does a value exist for the key? (If not, Value will be nil) - Err error // Error, if any, after CASNext returns false - resp *gomemcached.MCResponse -} - -// CASNext is a non-callback, loop-based version of CAS method. -// -// Usage is like this: -// -// var state memcached.CASState -// for client.CASNext(vb, key, exp, &state) { -// state.Value = some_mutation(state.Value) -// } -// if state.Err != nil { ... } -func (c *Client) CASNext(vb uint16, k string, exp int, state *CASState) bool { - if state.initialized { - if !state.Exists { - // Adding a new key: - if state.Value == nil { - state.Cas = 0 - return false // no-op (delete of non-existent value) - } - state.resp, state.Err = c.Add(vb, k, 0, exp, state.Value) - } else { - // Updating / deleting a key: - req := &gomemcached.MCRequest{ - Opcode: gomemcached.DELETE, - VBucket: vb, - Key: []byte(k), - Cas: state.Cas} - if state.Value != nil { - req.Opcode = gomemcached.SET - req.Opaque = 0 - req.Extras = []byte{0, 0, 0, 0, 0, 0, 0, 0} - req.Body = state.Value - - flags := 0 - binary.BigEndian.PutUint64(req.Extras, uint64(flags)<<32|uint64(exp)) - } - state.resp, state.Err = c.Send(req) - } - - // If the response status is KEY_EEXISTS or NOT_STORED there's a conflict and we'll need to - // get the new value (below). Otherwise, we're done (either success or failure) so return: - if !(state.resp != nil && (state.resp.Status == gomemcached.KEY_EEXISTS || - state.resp.Status == gomemcached.NOT_STORED)) { - state.Cas = state.resp.Cas - return false // either success or fatal error - } - } - - // Initial call, or after a conflict: GET the current value and CAS and return them: - state.initialized = true - if state.resp, state.Err = c.Get(vb, k); state.Err == nil { - state.Exists = true - state.Value = state.resp.Body - state.Cas = state.resp.Cas - } else if state.resp != nil && state.resp.Status == gomemcached.KEY_ENOENT { - state.Err = nil - state.Exists = false - state.Value = nil - state.Cas = 0 - } else { - return false // fatal error - } - return true // keep going... -} - -// CasFunc is type type of function to perform a CAS transform. -// -// Input is the current value, or nil if no value exists. -// The function should return the new value (if any) to set, and the store/quit/delete operation. -type CasFunc func(current []byte) ([]byte, CasOp) - -// CAS performs a CAS transform with the given function. -// -// If the value does not exist, a nil current value will be sent to f. -func (c *Client) CAS(vb uint16, k string, f CasFunc, - initexp int) (*gomemcached.MCResponse, error) { - var state CASState - for c.CASNext(vb, k, initexp, &state) { - newValue, operation := f(state.Value) - if operation == CASQuit || (operation == CASDelete && state.Value == nil) { - return nil, operation - } - state.Value = newValue - } - return state.resp, state.Err -} - -// StatValue is one of the stats returned from the Stats method. -type StatValue struct { - // The stat key - Key string - // The stat value - Val string -} - -// Stats requests server-side stats. -// -// Use "" as the stat key for toplevel stats. -func (c *Client) Stats(key string) ([]StatValue, error) { - rv := make([]StatValue, 0, 128) - - req := &gomemcached.MCRequest{ - Opcode: gomemcached.STAT, - Key: []byte(key), - Opaque: 918494, - } - - err := c.Transmit(req) - if err != nil { - return rv, err - } - - for { - res, _, err := getResponse(c.conn, c.hdrBuf) - if err != nil { - return rv, err - } - k := string(res.Key) - if k == "" { - break - } - rv = append(rv, StatValue{ - Key: k, - Val: string(res.Body), - }) - } - return rv, nil -} - -// Stats requests server-side stats. -// -// Use "" as the stat key for toplevel stats. -func (c *Client) StatsFunc(key string, fn func(key, val []byte)) error { - req := &gomemcached.MCRequest{ - Opcode: gomemcached.STAT, - Key: []byte(key), - Opaque: 918494, - } - - err := c.Transmit(req) - if err != nil { - return err - } - - for { - res, _, err := getResponse(c.conn, c.hdrBuf) - if err != nil { - return err - } - if len(res.Key) == 0 { - break - } - fn(res.Key, res.Body) - } - return nil -} - -// StatsMap requests server-side stats similarly to Stats, but returns -// them as a map. -// -// Use "" as the stat key for toplevel stats. -func (c *Client) StatsMap(key string) (map[string]string, error) { - rv := make(map[string]string) - - req := &gomemcached.MCRequest{ - Opcode: gomemcached.STAT, - Key: []byte(key), - Opaque: 918494, - } - - err := c.Transmit(req) - if err != nil { - return rv, err - } - - for { - res, _, err := getResponse(c.conn, c.hdrBuf) - if err != nil { - return rv, err - } - k := string(res.Key) - if k == "" { - break - } - rv[k] = string(res.Body) - } - - return rv, nil -} - -// instead of returning a new statsMap, simply populate passed in statsMap, which contains all the keys -// for which stats needs to be retrieved -func (c *Client) StatsMapForSpecifiedStats(key string, statsMap map[string]string) error { - - // clear statsMap - for key, _ := range statsMap { - statsMap[key] = "" - } - - req := &gomemcached.MCRequest{ - Opcode: gomemcached.STAT, - Key: []byte(key), - Opaque: 918494, - } - - err := c.Transmit(req) - if err != nil { - return err - } - - for { - res, _, err := getResponse(c.conn, c.hdrBuf) - if err != nil { - return err - } - k := string(res.Key) - if k == "" { - break - } - if _, ok := statsMap[k]; ok { - statsMap[k] = string(res.Body) - } - } - - return nil -} - -// UprGetFailoverLog for given list of vbuckets. -func (mc *Client) UprGetFailoverLog(vb []uint16) (map[uint16]*FailoverLog, error) { - - rq := &gomemcached.MCRequest{ - Opcode: gomemcached.UPR_FAILOVERLOG, - Opaque: opaqueFailover, - } - - failoverLogs := make(map[uint16]*FailoverLog) - for _, vBucket := range vb { - rq.VBucket = vBucket - if err := mc.Transmit(rq); err != nil { - return nil, err - } - res, err := mc.Receive() - - if err != nil { - return nil, fmt.Errorf("failed to receive %s", err.Error()) - } else if res.Opcode != gomemcached.UPR_FAILOVERLOG || res.Status != gomemcached.SUCCESS { - return nil, fmt.Errorf("unexpected #opcode %v", res.Opcode) - } - - flog, err := parseFailoverLog(res.Body) - if err != nil { - return nil, fmt.Errorf("unable to parse failover logs for vb %d", vb) - } - failoverLogs[vBucket] = flog - } - - return failoverLogs, nil -} - -// Hijack exposes the underlying connection from this client. -// -// It also marks the connection as unhealthy since the client will -// have lost control over the connection and can't otherwise verify -// things are in good shape for connection pools. -func (c *Client) Hijack() io.ReadWriteCloser { - c.setHealthy(false) - return c.conn -} - -func (c *Client) setHealthy(healthy bool) { - healthyState := UnHealthy - if healthy { - healthyState = Healthy - } - atomic.StoreUint32(&c.healthy, healthyState) -} - -func IfResStatusError(response *gomemcached.MCResponse) bool { - return response == nil || - (response.Status != gomemcached.SUBDOC_BAD_MULTI && - response.Status != gomemcached.SUBDOC_PATH_NOT_FOUND && - response.Status != gomemcached.SUBDOC_MULTI_PATH_FAILURE_DELETED) -} - -func (c *Client) Conn() io.ReadWriteCloser { - return c.conn -} - -// Since the binary request supports only a single collection at a time, it is possible -// that this may be called multiple times in succession by callers to get vbSeqnos for -// multiple collections. Thus, caller could pass in a non-nil map so the gomemcached -// client won't need to allocate new map for each call to prevent too much GC -// NOTE: If collection is enabled and context is not given, KV will still return stats for default collection -func (c *Client) GetAllVbSeqnos(vbSeqnoMap map[uint16]uint64, context ...*ClientContext) (map[uint16]uint64, error) { - rq := &gomemcached.MCRequest{ - Opcode: gomemcached.GET_ALL_VB_SEQNOS, - Opaque: opaqueGetSeqno, - } - - err := c.setVbSeqnoContext(rq, context...) - if err != nil { - return vbSeqnoMap, err - } - - err = c.Transmit(rq) - if err != nil { - return vbSeqnoMap, err - } - - res, err := c.Receive() - if err != nil { - return vbSeqnoMap, fmt.Errorf("failed to receive: %v", err) - } - - vbSeqnosList, err := parseGetSeqnoResp(res.Body) - if err != nil { - logging.Errorf("Unable to parse : err: %v\n", err) - return vbSeqnoMap, err - } - - if vbSeqnoMap == nil { - vbSeqnoMap = make(map[uint16]uint64) - } - - combineMapWithReturnedList(vbSeqnoMap, vbSeqnosList) - return vbSeqnoMap, nil -} - -func combineMapWithReturnedList(vbSeqnoMap map[uint16]uint64, list *VBSeqnos) { - if list == nil { - return - } - - // If the map contains exactly the existing vbs in the list, no need to modify - needToCleanupMap := true - if len(vbSeqnoMap) == 0 { - needToCleanupMap = false - } else if len(vbSeqnoMap) == len(*list) { - needToCleanupMap = false - for _, pair := range *list { - _, vbExists := vbSeqnoMap[uint16(pair[0])] - if !vbExists { - needToCleanupMap = true - break - } - } - } - - if needToCleanupMap { - var vbsToDelete []uint16 - for vbInSeqnoMap, _ := range vbSeqnoMap { - // If a vb in the seqno map doesn't exist in the returned list, need to clean up - // to ensure returning an accurate result - found := false - var vbno uint16 - for _, pair := range *list { - vbno = uint16(pair[0]) - if vbno == vbInSeqnoMap { - found = true - break - } else if vbno > vbInSeqnoMap { - // definitely not in the list - break - } - } - if !found { - vbsToDelete = append(vbsToDelete, vbInSeqnoMap) - } - } - - for _, vbno := range vbsToDelete { - delete(vbSeqnoMap, vbno) - } - } - - // Set the map with data from the list - for _, pair := range *list { - vbno := uint16(pair[0]) - seqno := pair[1] - vbSeqnoMap[vbno] = seqno - } -} diff --git a/vendor/github.com/couchbase/gomemcached/client/tap_feed.go b/vendor/github.com/couchbase/gomemcached/client/tap_feed.go deleted file mode 100644 index fd628c5de2..0000000000 --- a/vendor/github.com/couchbase/gomemcached/client/tap_feed.go +++ /dev/null @@ -1,333 +0,0 @@ -package memcached - -import ( - "bytes" - "encoding/binary" - "fmt" - "io" - "math" - - "github.com/couchbase/gomemcached" - "github.com/couchbase/goutils/logging" -) - -// TAP protocol docs: <http://www.couchbase.com/wiki/display/couchbase/TAP+Protocol> - -// TapOpcode is the tap operation type (found in TapEvent) -type TapOpcode uint8 - -// Tap opcode values. -const ( - TapBeginBackfill = TapOpcode(iota) - TapEndBackfill - TapMutation - TapDeletion - TapCheckpointStart - TapCheckpointEnd - tapEndStream -) - -const tapMutationExtraLen = 16 - -var tapOpcodeNames map[TapOpcode]string - -func init() { - tapOpcodeNames = map[TapOpcode]string{ - TapBeginBackfill: "BeginBackfill", - TapEndBackfill: "EndBackfill", - TapMutation: "Mutation", - TapDeletion: "Deletion", - TapCheckpointStart: "TapCheckpointStart", - TapCheckpointEnd: "TapCheckpointEnd", - tapEndStream: "EndStream", - } -} - -func (opcode TapOpcode) String() string { - name := tapOpcodeNames[opcode] - if name == "" { - name = fmt.Sprintf("#%d", opcode) - } - return name -} - -// TapEvent is a TAP notification of an operation on the server. -type TapEvent struct { - Opcode TapOpcode // Type of event - VBucket uint16 // VBucket this event applies to - Flags uint32 // Item flags - Expiry uint32 // Item expiration time - Key, Value []byte // Item key/value - Cas uint64 -} - -func makeTapEvent(req gomemcached.MCRequest) *TapEvent { - event := TapEvent{ - VBucket: req.VBucket, - } - switch req.Opcode { - case gomemcached.TAP_MUTATION: - event.Opcode = TapMutation - event.Key = req.Key - event.Value = req.Body - event.Cas = req.Cas - case gomemcached.TAP_DELETE: - event.Opcode = TapDeletion - event.Key = req.Key - event.Cas = req.Cas - case gomemcached.TAP_CHECKPOINT_START: - event.Opcode = TapCheckpointStart - case gomemcached.TAP_CHECKPOINT_END: - event.Opcode = TapCheckpointEnd - case gomemcached.TAP_OPAQUE: - if len(req.Extras) < 8+4 { - return nil - } - switch op := int(binary.BigEndian.Uint32(req.Extras[8:])); op { - case gomemcached.TAP_OPAQUE_INITIAL_VBUCKET_STREAM: - event.Opcode = TapBeginBackfill - case gomemcached.TAP_OPAQUE_CLOSE_BACKFILL: - event.Opcode = TapEndBackfill - case gomemcached.TAP_OPAQUE_CLOSE_TAP_STREAM: - event.Opcode = tapEndStream - case gomemcached.TAP_OPAQUE_ENABLE_AUTO_NACK: - return nil - case gomemcached.TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC: - return nil - default: - logging.Infof("TapFeed: Ignoring TAP_OPAQUE/%d", op) - return nil // unknown opaque event - } - case gomemcached.NOOP: - return nil // ignore - default: - logging.Infof("TapFeed: Ignoring %s", req.Opcode) - return nil // unknown event - } - - if len(req.Extras) >= tapMutationExtraLen && - (event.Opcode == TapMutation || event.Opcode == TapDeletion) { - - event.Flags = binary.BigEndian.Uint32(req.Extras[8:]) - event.Expiry = binary.BigEndian.Uint32(req.Extras[12:]) - } - - return &event -} - -func (event TapEvent) String() string { - switch event.Opcode { - case TapBeginBackfill, TapEndBackfill, TapCheckpointStart, TapCheckpointEnd: - return fmt.Sprintf("<TapEvent %s, vbucket=%d>", - event.Opcode, event.VBucket) - default: - return fmt.Sprintf("<TapEvent %s, key=%q (%d bytes) flags=%x, exp=%d>", - event.Opcode, event.Key, len(event.Value), - event.Flags, event.Expiry) - } -} - -// TapArguments are parameters for requesting a TAP feed. -// -// Call DefaultTapArguments to get a default one. -type TapArguments struct { - // Timestamp of oldest item to send. - // - // Use TapNoBackfill to suppress all past items. - Backfill uint64 - // If set, server will disconnect after sending existing items. - Dump bool - // The indices of the vbuckets to watch; empty/nil to watch all. - VBuckets []uint16 - // Transfers ownership of vbuckets during cluster rebalance. - Takeover bool - // If true, server will wait for client ACK after every notification. - SupportAck bool - // If true, client doesn't want values so server shouldn't send them. - KeysOnly bool - // If true, client wants the server to send checkpoint events. - Checkpoint bool - // Optional identifier to use for this client, to allow reconnects - ClientName string - // Registers this client (by name) till explicitly deregistered. - RegisteredClient bool -} - -// Value for TapArguments.Backfill denoting that no past events at all -// should be sent. -const TapNoBackfill = math.MaxUint64 - -// DefaultTapArguments returns a default set of parameter values to -// pass to StartTapFeed. -func DefaultTapArguments() TapArguments { - return TapArguments{ - Backfill: TapNoBackfill, - } -} - -func (args *TapArguments) flags() []byte { - var flags gomemcached.TapConnectFlag - if args.Backfill != 0 { - flags |= gomemcached.BACKFILL - } - if args.Dump { - flags |= gomemcached.DUMP - } - if len(args.VBuckets) > 0 { - flags |= gomemcached.LIST_VBUCKETS - } - if args.Takeover { - flags |= gomemcached.TAKEOVER_VBUCKETS - } - if args.SupportAck { - flags |= gomemcached.SUPPORT_ACK - } - if args.KeysOnly { - flags |= gomemcached.REQUEST_KEYS_ONLY - } - if args.Checkpoint { - flags |= gomemcached.CHECKPOINT - } - if args.RegisteredClient { - flags |= gomemcached.REGISTERED_CLIENT - } - encoded := make([]byte, 4) - binary.BigEndian.PutUint32(encoded, uint32(flags)) - return encoded -} - -func must(err error) { - if err != nil { - panic(err) - } -} - -func (args *TapArguments) bytes() (rv []byte) { - buf := bytes.NewBuffer([]byte{}) - - if args.Backfill > 0 { - must(binary.Write(buf, binary.BigEndian, uint64(args.Backfill))) - } - - if len(args.VBuckets) > 0 { - must(binary.Write(buf, binary.BigEndian, uint16(len(args.VBuckets)))) - for i := 0; i < len(args.VBuckets); i++ { - must(binary.Write(buf, binary.BigEndian, uint16(args.VBuckets[i]))) - } - } - return buf.Bytes() -} - -// TapFeed represents a stream of events from a server. -type TapFeed struct { - C <-chan TapEvent - Error error - closer chan bool -} - -// StartTapFeed starts a TAP feed on a client connection. -// -// The events can be read from the returned channel. The connection -// can no longer be used for other purposes; it's now reserved for -// receiving the TAP messages. To stop receiving events, close the -// client connection. -func (mc *Client) StartTapFeed(args TapArguments) (*TapFeed, error) { - rq := &gomemcached.MCRequest{ - Opcode: gomemcached.TAP_CONNECT, - Key: []byte(args.ClientName), - Extras: args.flags(), - Body: args.bytes()} - - err := mc.Transmit(rq) - if err != nil { - return nil, err - } - - ch := make(chan TapEvent) - feed := &TapFeed{ - C: ch, - closer: make(chan bool), - } - go mc.runFeed(ch, feed) - return feed, nil -} - -// TapRecvHook is called after every incoming tap packet is received. -var TapRecvHook func(*gomemcached.MCRequest, int, error) - -// Internal goroutine that reads from the socket and writes events to -// the channel -func (mc *Client) runFeed(ch chan TapEvent, feed *TapFeed) { - defer close(ch) - var headerBuf [gomemcached.HDR_LEN]byte -loop: - for { - // Read the next request from the server. - // - // (Can't call mc.Receive() because it reads a - // _response_ not a request.) - var pkt gomemcached.MCRequest - n, err := pkt.Receive(mc.conn, headerBuf[:]) - if TapRecvHook != nil { - TapRecvHook(&pkt, n, err) - } - - if err != nil { - if err != io.EOF { - feed.Error = err - } - break loop - } - - //logging.Infof("** TapFeed received %#v : %q", pkt, pkt.Body) - - if pkt.Opcode == gomemcached.TAP_CONNECT { - // This is not an event from the server; it's - // an error response to my connect request. - feed.Error = fmt.Errorf("tap connection failed: %s", pkt.Body) - break loop - } - - event := makeTapEvent(pkt) - if event != nil { - if event.Opcode == tapEndStream { - break loop - } - - select { - case ch <- *event: - case <-feed.closer: - break loop - } - } - - if len(pkt.Extras) >= 4 { - reqFlags := binary.BigEndian.Uint16(pkt.Extras[2:]) - if reqFlags&gomemcached.TAP_ACK != 0 { - if _, err := mc.sendAck(&pkt); err != nil { - feed.Error = err - break loop - } - } - } - } - if err := mc.Close(); err != nil { - logging.Errorf("Error closing memcached client: %v", err) - } -} - -func (mc *Client) sendAck(pkt *gomemcached.MCRequest) (int, error) { - res := gomemcached.MCResponse{ - Opcode: pkt.Opcode, - Opaque: pkt.Opaque, - Status: gomemcached.SUCCESS, - } - return res.Transmit(mc.conn) -} - -// Close terminates a TapFeed. -// -// Call this if you stop using a TapFeed before its channel ends. -func (feed *TapFeed) Close() { - close(feed.closer) -} diff --git a/vendor/github.com/couchbase/gomemcached/client/transport.go b/vendor/github.com/couchbase/gomemcached/client/transport.go deleted file mode 100644 index f4cea17fca..0000000000 --- a/vendor/github.com/couchbase/gomemcached/client/transport.go +++ /dev/null @@ -1,67 +0,0 @@ -package memcached - -import ( - "errors" - "io" - - "github.com/couchbase/gomemcached" -) - -var errNoConn = errors.New("no connection") - -// UnwrapMemcachedError converts memcached errors to normal responses. -// -// If the error is a memcached response, declare the error to be nil -// so a client can handle the status without worrying about whether it -// indicates success or failure. -func UnwrapMemcachedError(rv *gomemcached.MCResponse, - err error) (*gomemcached.MCResponse, error) { - - if rv == err { - return rv, nil - } - return rv, err -} - -// ReceiveHook is called after every packet is received (or attempted to be) -var ReceiveHook func(*gomemcached.MCResponse, int, error) - -func getResponse(s io.Reader, hdrBytes []byte) (rv *gomemcached.MCResponse, n int, err error) { - if s == nil { - return nil, 0, errNoConn - } - - rv = &gomemcached.MCResponse{} - n, err = rv.Receive(s, hdrBytes) - - if ReceiveHook != nil { - ReceiveHook(rv, n, err) - } - - if err == nil && (rv.Status != gomemcached.SUCCESS && rv.Status != gomemcached.AUTH_CONTINUE) { - err = rv - } - return rv, n, err -} - -// TransmitHook is called after each packet is transmitted. -var TransmitHook func(*gomemcached.MCRequest, int, error) - -func transmitRequest(o io.Writer, req *gomemcached.MCRequest) (int, error) { - if o == nil { - return 0, errNoConn - } - n, err := req.Transmit(o) - if TransmitHook != nil { - TransmitHook(req, n, err) - } - return n, err -} - -func transmitResponse(o io.Writer, res *gomemcached.MCResponse) (int, error) { - if o == nil { - return 0, errNoConn - } - n, err := res.Transmit(o) - return n, err -} diff --git a/vendor/github.com/couchbase/gomemcached/client/upr_event.go b/vendor/github.com/couchbase/gomemcached/client/upr_event.go deleted file mode 100644 index 2d3454aecc..0000000000 --- a/vendor/github.com/couchbase/gomemcached/client/upr_event.go +++ /dev/null @@ -1,403 +0,0 @@ -package memcached - -import ( - "encoding/binary" - "fmt" - "github.com/couchbase/gomemcached" - "math" -) - -type SystemEventType int - -const InvalidSysEvent SystemEventType = -1 - -const ( - CollectionCreate SystemEventType = 0 - CollectionDrop SystemEventType = iota - CollectionFlush SystemEventType = iota // KV did not implement - ScopeCreate SystemEventType = iota - ScopeDrop SystemEventType = iota - CollectionChanged SystemEventType = iota -) - -type ScopeCreateEvent interface { - GetSystemEventName() (string, error) - GetScopeId() (uint32, error) - GetManifestId() (uint64, error) -} - -type CollectionCreateEvent interface { - GetSystemEventName() (string, error) - GetScopeId() (uint32, error) - GetCollectionId() (uint32, error) - GetManifestId() (uint64, error) - GetMaxTTL() (uint32, error) -} - -type CollectionDropEvent interface { - GetScopeId() (uint32, error) - GetCollectionId() (uint32, error) - GetManifestId() (uint64, error) -} - -type ScopeDropEvent interface { - GetScopeId() (uint32, error) - GetManifestId() (uint64, error) -} - -type CollectionChangedEvent interface { - GetCollectionId() (uint32, error) - GetManifestId() (uint64, error) - GetMaxTTL() (uint32, error) -} - -var ErrorInvalidOp error = fmt.Errorf("Invalid Operation") -var ErrorInvalidVersion error = fmt.Errorf("Invalid version for parsing") -var ErrorValueTooShort error = fmt.Errorf("Value length is too short") -var ErrorNoMaxTTL error = fmt.Errorf("This event has no max TTL") - -// UprEvent memcached events for UPR streams. -type UprEvent struct { - Opcode gomemcached.CommandCode // Type of event - Status gomemcached.Status // Response status - VBucket uint16 // VBucket this event applies to - DataType uint8 // data type - Opaque uint16 // 16 MSB of opaque - VBuuid uint64 // This field is set by downstream - Flags uint32 // Item flags - Expiry uint32 // Item expiration time - Key, Value []byte // Item key/value - OldValue []byte // TODO: TBD: old document value - Cas uint64 // CAS value of the item - Seqno uint64 // sequence number of the mutation - RevSeqno uint64 // rev sequence number : deletions - LockTime uint32 // Lock time - MetadataSize uint16 // Metadata size - SnapstartSeq uint64 // start sequence number of this snapshot - SnapendSeq uint64 // End sequence number of the snapshot - SnapshotType uint32 // 0: disk 1: memory - FailoverLog *FailoverLog // Failover log containing vvuid and sequnce number - Error error // Error value in case of a failure - ExtMeta []byte // Extended Metadata - AckSize uint32 // The number of bytes that can be Acked to DCP - SystemEvent SystemEventType // Only valid if IsSystemEvent() is true - SysEventVersion uint8 // Based on the version, the way Extra bytes is parsed is different - ValueLen int // Cache it to avoid len() calls for performance - CollectionId uint32 // Valid if Collection is in use - StreamId *uint16 // Nil if not in use -} - -// FailoverLog containing vvuid and sequnce number -type FailoverLog [][2]uint64 - -// Containing a pair of vbno and the high seqno -type VBSeqnos [][2]uint64 - -func makeUprEvent(rq gomemcached.MCRequest, stream *UprStream, bytesReceivedFromDCP int) *UprEvent { - event := &UprEvent{ - Opcode: rq.Opcode, - VBucket: stream.Vbucket, - VBuuid: stream.Vbuuid, - Value: rq.Body, - Cas: rq.Cas, - ExtMeta: rq.ExtMeta, - DataType: rq.DataType, - ValueLen: len(rq.Body), - SystemEvent: InvalidSysEvent, - CollectionId: math.MaxUint32, - } - - event.PopulateFieldsBasedOnStreamType(rq, stream.StreamType) - - // set AckSize for events that need to be acked to DCP, - // i.e., events with CommandCodes that need to be buffered in DCP - if _, ok := gomemcached.BufferedCommandCodeMap[rq.Opcode]; ok { - event.AckSize = uint32(bytesReceivedFromDCP) - } - - // 16 LSBits are used by client library to encode vbucket number. - // 16 MSBits are left for application to multiplex on opaque value. - event.Opaque = appOpaque(rq.Opaque) - - if len(rq.Extras) >= uprMutationExtraLen && - event.Opcode == gomemcached.UPR_MUTATION { - - event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8]) - event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16]) - event.Flags = binary.BigEndian.Uint32(rq.Extras[16:20]) - event.Expiry = binary.BigEndian.Uint32(rq.Extras[20:24]) - event.LockTime = binary.BigEndian.Uint32(rq.Extras[24:28]) - event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[28:30]) - - } else if len(rq.Extras) >= uprDeletetionWithDeletionTimeExtraLen && - event.Opcode == gomemcached.UPR_DELETION { - - event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8]) - event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16]) - event.Expiry = binary.BigEndian.Uint32(rq.Extras[16:20]) - - } else if len(rq.Extras) >= uprDeletetionExtraLen && - event.Opcode == gomemcached.UPR_DELETION || - event.Opcode == gomemcached.UPR_EXPIRATION { - - event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8]) - event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16]) - event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[16:18]) - - } else if len(rq.Extras) >= uprSnapshotExtraLen && - event.Opcode == gomemcached.UPR_SNAPSHOT { - - event.SnapstartSeq = binary.BigEndian.Uint64(rq.Extras[:8]) - event.SnapendSeq = binary.BigEndian.Uint64(rq.Extras[8:16]) - event.SnapshotType = binary.BigEndian.Uint32(rq.Extras[16:20]) - } else if event.IsSystemEvent() { - event.PopulateEvent(rq.Extras) - } else if event.IsSeqnoAdv() { - event.PopulateSeqnoAdv(rq.Extras) - } else if event.IsOsoSnapshot() { - event.PopulateOso(rq.Extras) - } - - return event -} - -func (event *UprEvent) PopulateFieldsBasedOnStreamType(rq gomemcached.MCRequest, streamType DcpStreamType) { - switch streamType { - case CollectionsStreamId: - for _, extra := range rq.FramingExtras { - streamId, streamIdErr := extra.GetStreamId() - if streamIdErr == nil { - event.StreamId = &streamId - } - } - // After parsing streamID, still need to populate regular collectionID - fallthrough - case CollectionsNonStreamId: - switch rq.Opcode { - // Only these will have CID encoded within the key - case gomemcached.UPR_MUTATION, - gomemcached.UPR_DELETION, - gomemcached.UPR_EXPIRATION: - uleb128 := Uleb128(rq.Key) - result, bytesShifted := uleb128.ToUint32(rq.Keylen) - event.CollectionId = result - event.Key = rq.Key[bytesShifted:] - default: - event.Key = rq.Key - } - case NonCollectionStream: - // Let default behavior be legacy stream type - fallthrough - default: - event.Key = rq.Key - } -} - -func (event *UprEvent) String() string { - name := gomemcached.CommandNames[event.Opcode] - if name == "" { - name = fmt.Sprintf("#%d", event.Opcode) - } - return name -} - -func (event *UprEvent) IsSnappyDataType() bool { - return event.Opcode == gomemcached.UPR_MUTATION && (event.DataType&SnappyDataType > 0) -} - -func (event *UprEvent) IsCollectionType() bool { - return event.IsSystemEvent() || event.CollectionId <= math.MaxUint32 -} - -func (event *UprEvent) IsSystemEvent() bool { - return event.Opcode == gomemcached.DCP_SYSTEM_EVENT -} - -func (event *UprEvent) IsSeqnoAdv() bool { - return event.Opcode == gomemcached.DCP_SEQNO_ADV -} - -func (event *UprEvent) IsOsoSnapshot() bool { - return event.Opcode == gomemcached.DCP_OSO_SNAPSHOT -} - -func (event *UprEvent) PopulateEvent(extras []byte) { - if len(extras) < dcpSystemEventExtraLen { - // Wrong length, don't parse - return - } - - event.Seqno = binary.BigEndian.Uint64(extras[:8]) - event.SystemEvent = SystemEventType(binary.BigEndian.Uint32(extras[8:12])) - var versionTemp uint16 = binary.BigEndian.Uint16(extras[12:14]) - event.SysEventVersion = uint8(versionTemp >> 8) -} - -func (event *UprEvent) PopulateSeqnoAdv(extras []byte) { - if len(extras) < dcpSeqnoAdvExtraLen { - // Wrong length, don't parse - return - } - - event.Seqno = binary.BigEndian.Uint64(extras[:8]) -} - -func (event *UprEvent) PopulateOso(extras []byte) { - if len(extras) < dcpOsoExtraLen { - // Wrong length, don't parse - return - } - event.Flags = binary.BigEndian.Uint32(extras[:4]) -} - -func (event *UprEvent) GetSystemEventName() (string, error) { - switch event.SystemEvent { - case CollectionCreate: - fallthrough - case ScopeCreate: - return string(event.Key), nil - default: - return "", ErrorInvalidOp - } -} - -func (event *UprEvent) GetManifestId() (uint64, error) { - switch event.SystemEvent { - // Version 0 only checks - case CollectionChanged: - fallthrough - case ScopeDrop: - fallthrough - case ScopeCreate: - fallthrough - case CollectionDrop: - if event.SysEventVersion > 0 { - return 0, ErrorInvalidVersion - } - fallthrough - case CollectionCreate: - // CollectionCreate supports version 1 - if event.SysEventVersion > 1 { - return 0, ErrorInvalidVersion - } - if event.ValueLen < 8 { - return 0, ErrorValueTooShort - } - return binary.BigEndian.Uint64(event.Value[0:8]), nil - default: - return 0, ErrorInvalidOp - } -} - -func (event *UprEvent) GetCollectionId() (uint32, error) { - switch event.SystemEvent { - case CollectionDrop: - if event.SysEventVersion > 0 { - return 0, ErrorInvalidVersion - } - fallthrough - case CollectionCreate: - if event.SysEventVersion > 1 { - return 0, ErrorInvalidVersion - } - if event.ValueLen < 16 { - return 0, ErrorValueTooShort - } - return binary.BigEndian.Uint32(event.Value[12:16]), nil - case CollectionChanged: - if event.SysEventVersion > 0 { - return 0, ErrorInvalidVersion - } - if event.ValueLen < 12 { - return 0, ErrorValueTooShort - } - return binary.BigEndian.Uint32(event.Value[8:12]), nil - default: - return 0, ErrorInvalidOp - } -} - -func (event *UprEvent) GetScopeId() (uint32, error) { - switch event.SystemEvent { - // version 0 checks - case ScopeCreate: - fallthrough - case ScopeDrop: - fallthrough - case CollectionDrop: - if event.SysEventVersion > 0 { - return 0, ErrorInvalidVersion - } - fallthrough - case CollectionCreate: - // CollectionCreate could be either 0 or 1 - if event.SysEventVersion > 1 { - return 0, ErrorInvalidVersion - } - if event.ValueLen < 12 { - return 0, ErrorValueTooShort - } - return binary.BigEndian.Uint32(event.Value[8:12]), nil - default: - return 0, ErrorInvalidOp - } -} - -func (event *UprEvent) GetMaxTTL() (uint32, error) { - switch event.SystemEvent { - case CollectionCreate: - if event.SysEventVersion < 1 { - return 0, ErrorNoMaxTTL - } - if event.ValueLen < 20 { - return 0, ErrorValueTooShort - } - return binary.BigEndian.Uint32(event.Value[16:20]), nil - case CollectionChanged: - if event.SysEventVersion > 0 { - return 0, ErrorInvalidVersion - } - if event.ValueLen < 16 { - return 0, ErrorValueTooShort - } - return binary.BigEndian.Uint32(event.Value[12:16]), nil - default: - return 0, ErrorInvalidOp - } -} - -// Only if error is nil: -// Returns true if event states oso begins -// Return false if event states oso ends -func (event *UprEvent) GetOsoBegin() (bool, error) { - if !event.IsOsoSnapshot() { - return false, ErrorInvalidOp - } - - if event.Flags == 1 { - return true, nil - } else if event.Flags == 2 { - return false, nil - } else { - return false, ErrorInvalidOp - } -} - -type Uleb128 []byte - -func (u Uleb128) ToUint32(cachedLen int) (result uint32, bytesShifted int) { - var shift uint = 0 - - for curByte := 0; curByte < cachedLen; curByte++ { - oneByte := u[curByte] - last7Bits := 0x7f & oneByte - result |= uint32(last7Bits) << shift - bytesShifted++ - if oneByte&0x80 == 0 { - break - } - shift += 7 - } - - return -} diff --git a/vendor/github.com/couchbase/gomemcached/client/upr_feed.go b/vendor/github.com/couchbase/gomemcached/client/upr_feed.go deleted file mode 100644 index cdbed16bd3..0000000000 --- a/vendor/github.com/couchbase/gomemcached/client/upr_feed.go +++ /dev/null @@ -1,1132 +0,0 @@ -// go implementation of upr client. -// See https://github.com/couchbaselabs/cbupr/blob/master/transport-spec.md -// TODO -// 1. Use a pool allocator to avoid garbage -package memcached - -import ( - "encoding/binary" - "errors" - "fmt" - "github.com/couchbase/gomemcached" - "github.com/couchbase/goutils/logging" - "strconv" - "sync" - "sync/atomic" -) - -const uprMutationExtraLen = 30 -const uprDeletetionExtraLen = 18 -const uprDeletetionWithDeletionTimeExtraLen = 21 -const uprSnapshotExtraLen = 20 -const dcpSystemEventExtraLen = 13 -const dcpSeqnoAdvExtraLen = 8 -const bufferAckThreshold = 0.2 -const opaqueOpen = 0xBEAF0001 -const opaqueFailover = 0xDEADBEEF -const opaqueGetSeqno = 0xDEADBEEF -const uprDefaultNoopInterval = 120 -const dcpOsoExtraLen = 4 - -// Counter on top of opaqueOpen that others can draw from for open and control msgs -var opaqueOpenCtrlWell uint32 = opaqueOpen - -type PriorityType string - -// high > medium > disabled > low -const ( - PriorityDisabled PriorityType = "" - PriorityLow PriorityType = "low" - PriorityMed PriorityType = "medium" - PriorityHigh PriorityType = "high" -) - -type DcpStreamType int32 - -var UninitializedStream DcpStreamType = -1 - -const ( - NonCollectionStream DcpStreamType = 0 - CollectionsNonStreamId DcpStreamType = iota - CollectionsStreamId DcpStreamType = iota -) - -func (t DcpStreamType) String() string { - switch t { - case UninitializedStream: - return "Un-Initialized Stream" - case NonCollectionStream: - return "Traditional Non-Collection Stream" - case CollectionsNonStreamId: - return "Collections Stream without StreamID" - case CollectionsStreamId: - return "Collection Stream with StreamID" - default: - return "Unknown Stream Type" - } -} - -// UprStream is per stream data structure over an UPR Connection. -type UprStream struct { - Vbucket uint16 // Vbucket id - Vbuuid uint64 // vbucket uuid - StartSeq uint64 // start sequence number - EndSeq uint64 // end sequence number - connected bool - StreamType DcpStreamType -} - -type FeedState int - -const ( - FeedStateInitial = iota - FeedStateOpened = iota - FeedStateClosed = iota -) - -func (fs FeedState) String() string { - switch fs { - case FeedStateInitial: - return "Initial" - case FeedStateOpened: - return "Opened" - case FeedStateClosed: - return "Closed" - default: - return "Unknown" - } -} - -const ( - CompressionTypeStartMarker = iota // also means invalid - CompressionTypeNone = iota - CompressionTypeSnappy = iota - CompressionTypeEndMarker = iota // also means invalid -) - -// kv_engine/include/mcbp/protocol/datatype.h -const ( - JSONDataType uint8 = 1 - SnappyDataType uint8 = 2 - XattrDataType uint8 = 4 -) - -type UprFeatures struct { - Xattribute bool - CompressionType int - IncludeDeletionTime bool - DcpPriority PriorityType - EnableExpiry bool - EnableStreamId bool - EnableOso bool -} - -/** - * Used to handle multiple concurrent calls UprRequestStream() by UprFeed clients - * It is expected that a client that calls UprRequestStream() more than once should issue - * different "opaque" (version) numbers - */ -type opaqueStreamMap map[uint16]*UprStream // opaque -> stream - -type vbStreamNegotiator struct { - vbHandshakeMap map[uint16]opaqueStreamMap // vbno -> opaqueStreamMap - mutex sync.RWMutex -} - -func (negotiator *vbStreamNegotiator) initialize() { - negotiator.mutex.Lock() - negotiator.vbHandshakeMap = make(map[uint16]opaqueStreamMap) - negotiator.mutex.Unlock() -} - -func (negotiator *vbStreamNegotiator) registerRequest(vbno, opaque uint16, vbuuid, startSequence, endSequence uint64) { - negotiator.mutex.Lock() - defer negotiator.mutex.Unlock() - - var osMap opaqueStreamMap - var ok bool - if osMap, ok = negotiator.vbHandshakeMap[vbno]; !ok { - osMap = make(opaqueStreamMap) - negotiator.vbHandshakeMap[vbno] = osMap - } - - if _, ok = osMap[opaque]; !ok { - osMap[opaque] = &UprStream{ - Vbucket: vbno, - Vbuuid: vbuuid, - StartSeq: startSequence, - EndSeq: endSequence, - } - } -} - -func (negotiator *vbStreamNegotiator) getStreamsCntFromMap(vbno uint16) int { - negotiator.mutex.RLock() - defer negotiator.mutex.RUnlock() - - osmap, ok := negotiator.vbHandshakeMap[vbno] - if !ok { - return 0 - } else { - return len(osmap) - } -} - -func (negotiator *vbStreamNegotiator) getStreamFromMap(vbno, opaque uint16) (*UprStream, error) { - negotiator.mutex.RLock() - defer negotiator.mutex.RUnlock() - - osmap, ok := negotiator.vbHandshakeMap[vbno] - if !ok { - return nil, fmt.Errorf("Error: stream for vb: %v does not exist", vbno) - } - - stream, ok := osmap[opaque] - if !ok { - return nil, fmt.Errorf("Error: stream for vb: %v opaque: %v does not exist", vbno, opaque) - } - return stream, nil -} - -func (negotiator *vbStreamNegotiator) deleteStreamFromMap(vbno, opaque uint16) { - negotiator.mutex.Lock() - defer negotiator.mutex.Unlock() - - osmap, ok := negotiator.vbHandshakeMap[vbno] - if !ok { - return - } - - delete(osmap, opaque) - if len(osmap) == 0 { - delete(negotiator.vbHandshakeMap, vbno) - } -} - -func (negotiator *vbStreamNegotiator) handleStreamRequest(feed *UprFeed, - headerBuf [gomemcached.HDR_LEN]byte, pktPtr *gomemcached.MCRequest, bytesReceivedFromDCP int, - response *gomemcached.MCResponse) (*UprEvent, error) { - var event *UprEvent - - if feed == nil || response == nil || pktPtr == nil { - return nil, errors.New("Invalid inputs") - } - - // Get Stream from negotiator map - vbno := vbOpaque(response.Opaque) - opaque := appOpaque(response.Opaque) - - stream, err := negotiator.getStreamFromMap(vbno, opaque) - if err != nil { - err = fmt.Errorf("Stream not found for vb %d: %#v", vbno, *pktPtr) - logging.Errorf(err.Error()) - return nil, err - } - - status, rb, flog, err := handleStreamRequest(response, headerBuf[:]) - - if status == gomemcached.ROLLBACK { - event = makeUprEvent(*pktPtr, stream, bytesReceivedFromDCP) - event.Status = status - // rollback stream - logging.Infof("UPR_STREAMREQ with rollback %d for vb %d Failed: %v", rb, vbno, err) - negotiator.deleteStreamFromMap(vbno, opaque) - } else if status == gomemcached.SUCCESS { - event = makeUprEvent(*pktPtr, stream, bytesReceivedFromDCP) - event.Seqno = stream.StartSeq - event.FailoverLog = flog - event.Status = status - feed.activateStream(vbno, opaque, stream) - feed.negotiator.deleteStreamFromMap(vbno, opaque) - logging.Infof("UPR_STREAMREQ for vb %d successful", vbno) - - } else if err != nil { - logging.Errorf("UPR_STREAMREQ for vbucket %d erro %s", vbno, err.Error()) - event = &UprEvent{ - Opcode: gomemcached.UPR_STREAMREQ, - Status: status, - VBucket: vbno, - Error: err, - } - negotiator.deleteStreamFromMap(vbno, opaque) - } - return event, nil -} - -func (negotiator *vbStreamNegotiator) cleanUpVbStreams(vbno uint16) { - negotiator.mutex.Lock() - defer negotiator.mutex.Unlock() - - delete(negotiator.vbHandshakeMap, vbno) -} - -// UprFeed represents an UPR feed. A feed contains a connection to a single -// host and multiple vBuckets -type UprFeed struct { - // lock for feed.vbstreams - muVbstreams sync.RWMutex - C <-chan *UprEvent // Exported channel for receiving UPR events - negotiator vbStreamNegotiator // Used for pre-vbstreams, concurrent vb stream negotiation - vbstreams map[uint16]*UprStream // official live vb->stream mapping - closer chan bool // closer - conn *Client // connection to UPR producer - Error error // error - bytesRead uint64 // total bytes read on this connection - toAckBytes uint32 // bytes client has read - maxAckBytes uint32 // Max buffer control ack bytes - stats UprStats // Stats for upr client - transmitCh chan *gomemcached.MCRequest // transmit command channel - transmitCl chan bool // closer channel for transmit go-routine - // if flag is true, upr feed will use ack from client to determine whether/when to send ack to DCP - // if flag is false, upr feed will track how many bytes it has sent to client - // and use that to determine whether/when to send ack to DCP - ackByClient bool - feedState FeedState - muFeedState sync.RWMutex - activatedFeatures UprFeatures - collectionEnabled bool // This is needed separately because parsing depends on this - // DCP StreamID allows multiple filtered collection streams to share a single DCP Stream - // It is not allowed once a regular/legacy stream was started originally - streamsType DcpStreamType - initStreamTypeOnce sync.Once -} - -// Exported interface - to allow for mocking -type UprFeedIface interface { - Close() - Closed() bool - CloseStream(vbno, opaqueMSB uint16) error - GetError() error - GetUprStats() *UprStats - ClientAck(event *UprEvent) error - GetUprEventCh() <-chan *UprEvent - StartFeed() error - StartFeedWithConfig(datachan_len int) error - UprOpen(name string, sequence uint32, bufSize uint32) error - UprOpenWithXATTR(name string, sequence uint32, bufSize uint32) error - UprOpenWithFeatures(name string, sequence uint32, bufSize uint32, features UprFeatures) (error, UprFeatures) - UprRequestStream(vbno, opaqueMSB uint16, flags uint32, vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error - // Set DCP priority on an existing DCP connection. The command is sent asynchronously without waiting for a response - SetPriorityAsync(p PriorityType) error - - // Various Collection-Type RequestStreams - UprRequestCollectionsStream(vbno, opaqueMSB uint16, flags uint32, vbuuid, startSeq, endSeq, snapStart, snapEnd uint64, filter *CollectionsFilter) error -} - -type UprStats struct { - TotalBytes uint64 - TotalMutation uint64 - TotalBufferAckSent uint64 - TotalSnapShot uint64 -} - -// error codes -var ErrorInvalidLog = errors.New("couchbase.errorInvalidLog") - -func (flogp *FailoverLog) Latest() (vbuuid, seqno uint64, err error) { - if flogp != nil { - flog := *flogp - latest := flog[len(flog)-1] - return latest[0], latest[1], nil - } - return vbuuid, seqno, ErrorInvalidLog -} - -func (feed *UprFeed) sendCommands(mc *Client) { - transmitCh := feed.transmitCh - transmitCl := feed.transmitCl -loop: - for { - select { - case command := <-transmitCh: - if err := mc.Transmit(command); err != nil { - logging.Errorf("Failed to transmit command %s. Error %s", command.Opcode.String(), err.Error()) - // get feed to close and runFeed routine to exit - feed.Close() - break loop - } - - case <-transmitCl: - break loop - } - } - - // After sendCommands exits, write to transmitCh will block forever - // when we write to transmitCh, e.g., at CloseStream(), we need to check feed closure to have an exit route - - logging.Infof("sendCommands exiting") -} - -// Sets the specified stream as the connected stream for this vbno, and also cleans up negotiator -func (feed *UprFeed) activateStream(vbno, opaque uint16, stream *UprStream) error { - feed.muVbstreams.Lock() - defer feed.muVbstreams.Unlock() - - if feed.collectionEnabled { - stream.StreamType = feed.streamsType - } - - // Set this stream as the officially connected stream for this vb - stream.connected = true - feed.vbstreams[vbno] = stream - return nil -} - -func (feed *UprFeed) cleanUpVbStream(vbno uint16) { - feed.muVbstreams.Lock() - defer feed.muVbstreams.Unlock() - - delete(feed.vbstreams, vbno) -} - -// NewUprFeed creates a new UPR Feed. -// TODO: Describe side-effects on bucket instance and its connection pool. -func (mc *Client) NewUprFeed() (*UprFeed, error) { - return mc.NewUprFeedWithConfig(false /*ackByClient*/) -} - -func (mc *Client) NewUprFeedWithConfig(ackByClient bool) (*UprFeed, error) { - feed := &UprFeed{ - conn: mc, - closer: make(chan bool, 1), - vbstreams: make(map[uint16]*UprStream), - transmitCh: make(chan *gomemcached.MCRequest), - transmitCl: make(chan bool), - ackByClient: ackByClient, - collectionEnabled: mc.CollectionEnabled(), - streamsType: UninitializedStream, - } - - feed.negotiator.initialize() - - go feed.sendCommands(mc) - return feed, nil -} - -func (mc *Client) NewUprFeedIface() (UprFeedIface, error) { - return mc.NewUprFeed() -} - -func (mc *Client) NewUprFeedWithConfigIface(ackByClient bool) (UprFeedIface, error) { - return mc.NewUprFeedWithConfig(ackByClient) -} - -func doUprOpen(mc *Client, name string, sequence uint32, features UprFeatures) error { - rq := &gomemcached.MCRequest{ - Opcode: gomemcached.UPR_OPEN, - Key: []byte(name), - Opaque: getUprOpenCtrlOpaque(), - } - - rq.Extras = make([]byte, 8) - binary.BigEndian.PutUint32(rq.Extras[:4], sequence) - - // opens a producer type connection - flags := gomemcached.DCP_PRODUCER - if features.Xattribute { - flags = flags | gomemcached.DCP_OPEN_INCLUDE_XATTRS - } - if features.IncludeDeletionTime { - flags = flags | gomemcached.DCP_OPEN_INCLUDE_DELETE_TIMES - } - binary.BigEndian.PutUint32(rq.Extras[4:], flags) - - return sendMcRequestSync(mc, rq) -} - -// Synchronously send a memcached request and wait for the response -func sendMcRequestSync(mc *Client, req *gomemcached.MCRequest) error { - if err := mc.Transmit(req); err != nil { - return err - } - - if res, err := mc.Receive(); err != nil { - return err - } else if req.Opcode != res.Opcode { - return fmt.Errorf("unexpected #opcode sent %v received %v", req.Opcode, res.Opaque) - } else if req.Opaque != res.Opaque { - return fmt.Errorf("opaque mismatch, sent %v received %v", req.Opaque, res.Opaque) - } else if res.Status != gomemcached.SUCCESS { - return fmt.Errorf("error %v", res.Status) - } - return nil -} - -// UprOpen to connect with a UPR producer. -// Name: name of te UPR connection -// sequence: sequence number for the connection -// bufsize: max size of the application -func (feed *UprFeed) UprOpen(name string, sequence uint32, bufSize uint32) error { - var allFeaturesDisabled UprFeatures - err, _ := feed.uprOpen(name, sequence, bufSize, allFeaturesDisabled) - return err -} - -// UprOpen with XATTR enabled. -func (feed *UprFeed) UprOpenWithXATTR(name string, sequence uint32, bufSize uint32) error { - var onlyXattrEnabled UprFeatures - onlyXattrEnabled.Xattribute = true - err, _ := feed.uprOpen(name, sequence, bufSize, onlyXattrEnabled) - return err -} - -func (feed *UprFeed) UprOpenWithFeatures(name string, sequence uint32, bufSize uint32, features UprFeatures) (error, UprFeatures) { - return feed.uprOpen(name, sequence, bufSize, features) -} - -func (feed *UprFeed) SetPriorityAsync(p PriorityType) error { - if !feed.isOpen() { - // do not send this command if upr feed is not yet open, otherwise it may interfere with - // feed start up process, which relies on synchronous message exchange with DCP. - return fmt.Errorf("Upr feed is not open. State=%v", feed.getState()) - } - - return feed.setPriority(p, false /*sync*/) -} - -func (feed *UprFeed) setPriority(p PriorityType, sync bool) error { - rq := &gomemcached.MCRequest{ - Opcode: gomemcached.UPR_CONTROL, - Key: []byte("set_priority"), - Body: []byte(p), - Opaque: getUprOpenCtrlOpaque(), - } - if sync { - return sendMcRequestSync(feed.conn, rq) - } else { - return feed.writeToTransmitCh(rq) - - } -} - -func (feed *UprFeed) uprOpen(name string, sequence uint32, bufSize uint32, features UprFeatures) (err error, activatedFeatures UprFeatures) { - mc := feed.conn - - // First set this to an invalid value to state that the method hasn't gotten to executing this control yet - activatedFeatures.CompressionType = CompressionTypeEndMarker - - if err = doUprOpen(mc, name, sequence, features); err != nil { - return - } - - activatedFeatures.Xattribute = features.Xattribute - - // send a UPR control message to set the window size for the this connection - if bufSize > 0 { - rq := &gomemcached.MCRequest{ - Opcode: gomemcached.UPR_CONTROL, - Key: []byte("connection_buffer_size"), - Body: []byte(strconv.Itoa(int(bufSize))), - Opaque: getUprOpenCtrlOpaque(), - } - err = sendMcRequestSync(feed.conn, rq) - if err != nil { - return - } - feed.maxAckBytes = uint32(bufferAckThreshold * float32(bufSize)) - } - - // enable noop and set noop interval - rq := &gomemcached.MCRequest{ - Opcode: gomemcached.UPR_CONTROL, - Key: []byte("enable_noop"), - Body: []byte("true"), - Opaque: getUprOpenCtrlOpaque(), - } - err = sendMcRequestSync(feed.conn, rq) - if err != nil { - return - } - - rq = &gomemcached.MCRequest{ - Opcode: gomemcached.UPR_CONTROL, - Key: []byte("set_noop_interval"), - Body: []byte(strconv.Itoa(int(uprDefaultNoopInterval))), - Opaque: getUprOpenCtrlOpaque(), - } - err = sendMcRequestSync(feed.conn, rq) - if err != nil { - return - } - - if features.CompressionType == CompressionTypeSnappy { - activatedFeatures.CompressionType = CompressionTypeNone - rq = &gomemcached.MCRequest{ - Opcode: gomemcached.UPR_CONTROL, - Key: []byte("force_value_compression"), - Body: []byte("true"), - Opaque: getUprOpenCtrlOpaque(), - } - err = sendMcRequestSync(feed.conn, rq) - } else if features.CompressionType == CompressionTypeEndMarker { - err = fmt.Errorf("UPR_CONTROL Failed - Invalid CompressionType: %v", features.CompressionType) - } - if err != nil { - return - } - activatedFeatures.CompressionType = features.CompressionType - - if features.DcpPriority != PriorityDisabled { - err = feed.setPriority(features.DcpPriority, true /*sync*/) - if err == nil { - activatedFeatures.DcpPriority = features.DcpPriority - } else { - return - } - } - - if features.EnableExpiry { - rq := &gomemcached.MCRequest{ - Opcode: gomemcached.UPR_CONTROL, - Key: []byte("enable_expiry_opcode"), - Body: []byte("true"), - Opaque: getUprOpenCtrlOpaque(), - } - err = sendMcRequestSync(feed.conn, rq) - if err != nil { - return - } - activatedFeatures.EnableExpiry = true - } - - if features.EnableStreamId { - rq := &gomemcached.MCRequest{ - Opcode: gomemcached.UPR_CONTROL, - Key: []byte("enable_stream_id"), - Body: []byte("true"), - Opaque: getUprOpenCtrlOpaque(), - } - err = sendMcRequestSync(feed.conn, rq) - if err != nil { - return - } - activatedFeatures.EnableStreamId = true - } - - if features.EnableOso { - rq := &gomemcached.MCRequest{ - Opcode: gomemcached.UPR_CONTROL, - Key: []byte("enable_out_of_order_snapshots"), - Body: []byte("true"), - Opaque: getUprOpenCtrlOpaque(), - } - err = sendMcRequestSync(feed.conn, rq) - if err != nil { - return - } - activatedFeatures.EnableOso = true - } - - // everything is ok so far, set upr feed to open state - feed.activatedFeatures = activatedFeatures - feed.setOpen() - return -} - -// UprRequestStream for a single vbucket. -func (feed *UprFeed) UprRequestStream(vbno, opaqueMSB uint16, flags uint32, - vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error { - - return feed.UprRequestCollectionsStream(vbno, opaqueMSB, flags, vuuid, startSequence, endSequence, snapStart, snapEnd, nil) -} - -func (feed *UprFeed) initStreamType(filter *CollectionsFilter) (err error) { - if filter != nil && filter.UseStreamId && !feed.activatedFeatures.EnableStreamId { - err = fmt.Errorf("Cannot use streamID based filter if the feed was not started with the streamID feature") - return - } - - streamInitFunc := func() { - if feed.streamsType != UninitializedStream { - // Shouldn't happen - err = fmt.Errorf("The current feed has already been started in %v mode", feed.streamsType.String()) - } else { - if !feed.collectionEnabled { - feed.streamsType = NonCollectionStream - } else { - if filter != nil && filter.UseStreamId { - feed.streamsType = CollectionsStreamId - } else { - feed.streamsType = CollectionsNonStreamId - } - } - } - } - feed.initStreamTypeOnce.Do(streamInitFunc) - return -} - -func (feed *UprFeed) UprRequestCollectionsStream(vbno, opaqueMSB uint16, flags uint32, - vbuuid, startSequence, endSequence, snapStart, snapEnd uint64, filter *CollectionsFilter) error { - - err := feed.initStreamType(filter) - if err != nil { - return err - } - - var mcRequestBody []byte - if filter != nil { - err = filter.IsValid() - if err != nil { - return err - } - mcRequestBody, err = filter.ToStreamReqBody() - if err != nil { - return err - } - } - - rq := &gomemcached.MCRequest{ - Opcode: gomemcached.UPR_STREAMREQ, - VBucket: vbno, - Opaque: composeOpaque(vbno, opaqueMSB), - Body: mcRequestBody, - } - - rq.Extras = make([]byte, 48) // #Extras - binary.BigEndian.PutUint32(rq.Extras[:4], flags) - binary.BigEndian.PutUint32(rq.Extras[4:8], uint32(0)) - binary.BigEndian.PutUint64(rq.Extras[8:16], startSequence) - binary.BigEndian.PutUint64(rq.Extras[16:24], endSequence) - binary.BigEndian.PutUint64(rq.Extras[24:32], vbuuid) - binary.BigEndian.PutUint64(rq.Extras[32:40], snapStart) - binary.BigEndian.PutUint64(rq.Extras[40:48], snapEnd) - - feed.negotiator.registerRequest(vbno, opaqueMSB, vbuuid, startSequence, endSequence) - // Any client that has ever called this method, regardless of return code, - // should expect a potential UPR_CLOSESTREAM message due to this new map entry prior to Transmit. - - if err = feed.conn.Transmit(rq); err != nil { - logging.Errorf("Error in StreamRequest %s", err.Error()) - // If an error occurs during transmit, then the UPRFeed will keep the stream - // in the vbstreams map. This is to prevent nil lookup from any previously - // sent stream requests. - return err - } - - return nil -} - -// CloseStream for specified vbucket. -func (feed *UprFeed) CloseStream(vbno, opaqueMSB uint16) error { - - err := feed.validateCloseStream(vbno) - if err != nil { - logging.Infof("CloseStream for %v has been skipped because of error %v", vbno, err) - return err - } - - closeStream := &gomemcached.MCRequest{ - Opcode: gomemcached.UPR_CLOSESTREAM, - VBucket: vbno, - Opaque: composeOpaque(vbno, opaqueMSB), - } - - feed.writeToTransmitCh(closeStream) - - return nil -} - -func (feed *UprFeed) GetUprEventCh() <-chan *UprEvent { - return feed.C -} - -func (feed *UprFeed) GetError() error { - return feed.Error -} - -func (feed *UprFeed) validateCloseStream(vbno uint16) error { - feed.muVbstreams.RLock() - nilVbStream := feed.vbstreams[vbno] == nil - feed.muVbstreams.RUnlock() - - if nilVbStream && (feed.negotiator.getStreamsCntFromMap(vbno) == 0) { - return fmt.Errorf("Stream for vb %d has not been requested", vbno) - } - - return nil -} - -func (feed *UprFeed) writeToTransmitCh(rq *gomemcached.MCRequest) error { - // write to transmitCh may block forever if sendCommands has exited - // check for feed closure to have an exit route in this case - select { - case <-feed.closer: - errMsg := fmt.Sprintf("Abort sending request to transmitCh because feed has been closed. request=%v", rq) - logging.Infof(errMsg) - return errors.New(errMsg) - case feed.transmitCh <- rq: - } - return nil -} - -// StartFeed to start the upper feed. -func (feed *UprFeed) StartFeed() error { - return feed.StartFeedWithConfig(10) -} - -func (feed *UprFeed) StartFeedWithConfig(datachan_len int) error { - ch := make(chan *UprEvent, datachan_len) - feed.C = ch - go feed.runFeed(ch) - return nil -} - -func parseFailoverLog(body []byte) (*FailoverLog, error) { - if len(body)%16 != 0 { - err := fmt.Errorf("invalid body length %v, in failover-log", len(body)) - return nil, err - } - log := make(FailoverLog, len(body)/16) - for i, j := 0, 0; i < len(body); i += 16 { - vuuid := binary.BigEndian.Uint64(body[i : i+8]) - seqno := binary.BigEndian.Uint64(body[i+8 : i+16]) - log[j] = [2]uint64{vuuid, seqno} - j++ - } - return &log, nil -} - -func parseGetSeqnoResp(body []byte) (*VBSeqnos, error) { - // vbno of 2 bytes + seqno of 8 bytes - var entryLen int = 10 - - if len(body)%entryLen != 0 { - err := fmt.Errorf("invalid body length %v, in getVbSeqno", len(body)) - return nil, err - } - vbSeqnos := make(VBSeqnos, len(body)/entryLen) - for i, j := 0, 0; i < len(body); i += entryLen { - vbno := binary.BigEndian.Uint16(body[i : i+2]) - seqno := binary.BigEndian.Uint64(body[i+2 : i+10]) - vbSeqnos[j] = [2]uint64{uint64(vbno), seqno} - j++ - } - return &vbSeqnos, nil -} - -func handleStreamRequest( - res *gomemcached.MCResponse, - headerBuf []byte, -) (gomemcached.Status, uint64, *FailoverLog, error) { - - var rollback uint64 - var err error - - switch { - case res.Status == gomemcached.ROLLBACK: - logging.Infof("Rollback response. body=%v, headerBuf=%v\n", res.Body, headerBuf) - rollback = binary.BigEndian.Uint64(res.Body) - logging.Infof("Rollback seqno is %v for response with opaque %v\n", rollback, res.Opaque) - return res.Status, rollback, nil, nil - - case res.Status != gomemcached.SUCCESS: - err = fmt.Errorf("unexpected status %v for response with opaque %v", res.Status, res.Opaque) - return res.Status, 0, nil, err - } - - flog, err := parseFailoverLog(res.Body[:]) - return res.Status, rollback, flog, err -} - -// generate stream end responses for all active vb streams -func (feed *UprFeed) doStreamClose(ch chan *UprEvent) { - feed.muVbstreams.RLock() - - uprEvents := make([]*UprEvent, len(feed.vbstreams)) - index := 0 - for vbno, stream := range feed.vbstreams { - uprEvent := &UprEvent{ - VBucket: vbno, - VBuuid: stream.Vbuuid, - Opcode: gomemcached.UPR_STREAMEND, - } - uprEvents[index] = uprEvent - index++ - } - - // release the lock before sending uprEvents to ch, which may block - feed.muVbstreams.RUnlock() - -loop: - for _, uprEvent := range uprEvents { - select { - case ch <- uprEvent: - case <-feed.closer: - logging.Infof("Feed has been closed. Aborting doStreamClose.") - break loop - } - } -} - -func (feed *UprFeed) runFeed(ch chan *UprEvent) { - defer close(ch) - var headerBuf [gomemcached.HDR_LEN]byte - var pkt gomemcached.MCRequest - var event *UprEvent - - mc := feed.conn.Hijack() - uprStats := &feed.stats - -loop: - for { - select { - case <-feed.closer: - logging.Infof("Feed has been closed. Exiting.") - break loop - default: - bytes, err := pkt.Receive(mc, headerBuf[:]) - if err != nil { - logging.Errorf("Error in receive %s", err.Error()) - feed.Error = err - // send all the stream close messages to the client - feed.doStreamClose(ch) - break loop - } else { - event = nil - res := &gomemcached.MCResponse{ - Opcode: pkt.Opcode, - Cas: pkt.Cas, - Opaque: pkt.Opaque, - Status: gomemcached.Status(pkt.VBucket), - Extras: pkt.Extras, - Key: pkt.Key, - Body: pkt.Body, - } - - vb := vbOpaque(pkt.Opaque) - appOpaque := appOpaque(pkt.Opaque) - uprStats.TotalBytes = uint64(bytes) - - feed.muVbstreams.RLock() - stream := feed.vbstreams[vb] - feed.muVbstreams.RUnlock() - - switch pkt.Opcode { - case gomemcached.UPR_STREAMREQ: - event, err = feed.negotiator.handleStreamRequest(feed, headerBuf, &pkt, bytes, res) - if err != nil { - logging.Infof(err.Error()) - break loop - } - case gomemcached.UPR_MUTATION, - gomemcached.UPR_DELETION, - gomemcached.UPR_EXPIRATION: - if stream == nil { - logging.Infof("Stream not found for vb %d: %#v", vb, pkt) - break loop - } - event = makeUprEvent(pkt, stream, bytes) - uprStats.TotalMutation++ - - case gomemcached.UPR_STREAMEND: - if stream == nil { - logging.Infof("Stream not found for vb %d: %#v", vb, pkt) - break loop - } - //stream has ended - event = makeUprEvent(pkt, stream, bytes) - logging.Infof("Stream Ended for vb %d", vb) - - feed.negotiator.deleteStreamFromMap(vb, appOpaque) - feed.cleanUpVbStream(vb) - - case gomemcached.UPR_SNAPSHOT: - if stream == nil { - logging.Infof("Stream not found for vb %d: %#v", vb, pkt) - break loop - } - // snapshot marker - event = makeUprEvent(pkt, stream, bytes) - uprStats.TotalSnapShot++ - - case gomemcached.UPR_FLUSH: - if stream == nil { - logging.Infof("Stream not found for vb %d: %#v", vb, pkt) - break loop - } - // special processing for flush ? - event = makeUprEvent(pkt, stream, bytes) - - case gomemcached.UPR_CLOSESTREAM: - if stream == nil { - logging.Infof("Stream not found for vb %d: %#v", vb, pkt) - break loop - } - event = makeUprEvent(pkt, stream, bytes) - event.Opcode = gomemcached.UPR_STREAMEND // opcode re-write !! - logging.Infof("Stream Closed for vb %d StreamEnd simulated", vb) - - feed.negotiator.deleteStreamFromMap(vb, appOpaque) - feed.cleanUpVbStream(vb) - - case gomemcached.UPR_ADDSTREAM: - logging.Infof("Opcode %v not implemented", pkt.Opcode) - - case gomemcached.UPR_CONTROL, gomemcached.UPR_BUFFERACK: - if res.Status != gomemcached.SUCCESS { - logging.Infof("Opcode %v received status %d", pkt.Opcode.String(), res.Status) - } - - case gomemcached.UPR_NOOP: - // send a NOOP back - noop := &gomemcached.MCResponse{ - Opcode: gomemcached.UPR_NOOP, - Opaque: pkt.Opaque, - } - - if err := feed.conn.TransmitResponse(noop); err != nil { - logging.Warnf("failed to transmit command %s. Error %s", noop.Opcode.String(), err.Error()) - } - case gomemcached.DCP_SYSTEM_EVENT: - if stream == nil { - logging.Infof("Stream not found for vb %d: %#v", vb, pkt) - break loop - } - event = makeUprEvent(pkt, stream, bytes) - case gomemcached.UPR_FAILOVERLOG: - logging.Infof("Failover log for vb %d received: %v", vb, pkt) - case gomemcached.DCP_SEQNO_ADV: - if stream == nil { - logging.Infof("Stream not found for vb %d: %#v", vb, pkt) - break loop - } - event = makeUprEvent(pkt, stream, bytes) - case gomemcached.DCP_OSO_SNAPSHOT: - if stream == nil { - logging.Infof("Stream not found for vb %d: %#v", vb, pkt) - break loop - } - event = makeUprEvent(pkt, stream, bytes) - default: - logging.Infof("Recived an unknown response for vbucket %d", vb) - } - } - - if event != nil { - select { - case ch <- event: - case <-feed.closer: - logging.Infof("Feed has been closed. Skip sending events. Exiting.") - break loop - } - - feed.muVbstreams.RLock() - l := len(feed.vbstreams) - feed.muVbstreams.RUnlock() - - if event.Opcode == gomemcached.UPR_CLOSESTREAM && l == 0 { - logging.Infof("No more streams") - } - } - - if !feed.ackByClient { - // if client does not ack, do the ack check now - feed.sendBufferAckIfNeeded(event) - } - } - } - - // make sure that feed is closed before we signal transmitCl and exit runFeed - feed.Close() - - close(feed.transmitCl) - logging.Infof("runFeed exiting") -} - -// Client, after completing processing of an UprEvent, need to call this API to notify UprFeed, -// so that UprFeed can update its ack bytes stats and send ack to DCP if needed -// Client needs to set ackByClient flag to true in NewUprFeedWithConfig() call as a prerequisite for this call to work -// This API is not thread safe. Caller should NOT have more than one go rountine calling this API -func (feed *UprFeed) ClientAck(event *UprEvent) error { - if !feed.ackByClient { - return errors.New("Upr feed does not have ackByclient flag set") - } - feed.sendBufferAckIfNeeded(event) - return nil -} - -// increment ack bytes if the event needs to be acked to DCP -// send buffer ack if enough ack bytes have been accumulated -func (feed *UprFeed) sendBufferAckIfNeeded(event *UprEvent) { - if event == nil || event.AckSize == 0 { - // this indicates that there is no need to ack to DCP - return - } - - totalBytes := feed.toAckBytes + event.AckSize - if totalBytes > feed.maxAckBytes { - feed.toAckBytes = 0 - feed.sendBufferAck(totalBytes) - } else { - feed.toAckBytes = totalBytes - } -} - -// send buffer ack to dcp -func (feed *UprFeed) sendBufferAck(sendSize uint32) { - bufferAck := &gomemcached.MCRequest{ - Opcode: gomemcached.UPR_BUFFERACK, - } - bufferAck.Extras = make([]byte, 4) - binary.BigEndian.PutUint32(bufferAck.Extras[:4], uint32(sendSize)) - feed.writeToTransmitCh(bufferAck) - feed.stats.TotalBufferAckSent++ -} - -func (feed *UprFeed) GetUprStats() *UprStats { - return &feed.stats -} - -func composeOpaque(vbno, opaqueMSB uint16) uint32 { - return (uint32(opaqueMSB) << 16) | uint32(vbno) -} - -func getUprOpenCtrlOpaque() uint32 { - return atomic.AddUint32(&opaqueOpenCtrlWell, 1) -} - -func appOpaque(opq32 uint32) uint16 { - return uint16((opq32 & 0xFFFF0000) >> 16) -} - -func vbOpaque(opq32 uint32) uint16 { - return uint16(opq32 & 0xFFFF) -} - -// Close this UprFeed. -func (feed *UprFeed) Close() { - feed.muFeedState.Lock() - defer feed.muFeedState.Unlock() - if feed.feedState != FeedStateClosed { - close(feed.closer) - feed.feedState = FeedStateClosed - feed.negotiator.initialize() - } -} - -// check if the UprFeed has been closed -func (feed *UprFeed) Closed() bool { - feed.muFeedState.RLock() - defer feed.muFeedState.RUnlock() - return feed.feedState == FeedStateClosed -} - -// set upr feed to opened state after initialization is done -func (feed *UprFeed) setOpen() { - feed.muFeedState.Lock() - defer feed.muFeedState.Unlock() - feed.feedState = FeedStateOpened -} - -func (feed *UprFeed) isOpen() bool { - feed.muFeedState.RLock() - defer feed.muFeedState.RUnlock() - return feed.feedState == FeedStateOpened -} - -func (feed *UprFeed) getState() FeedState { - feed.muFeedState.RLock() - defer feed.muFeedState.RUnlock() - return feed.feedState -} diff --git a/vendor/github.com/couchbase/gomemcached/flexibleFraming.go b/vendor/github.com/couchbase/gomemcached/flexibleFraming.go deleted file mode 100644 index a545885fd8..0000000000 --- a/vendor/github.com/couchbase/gomemcached/flexibleFraming.go +++ /dev/null @@ -1,398 +0,0 @@ -package gomemcached - -import ( - "encoding/binary" - "fmt" -) - -type FrameObjType int - -const ( - FrameBarrier FrameObjType = iota - FrameDurability FrameObjType = iota - FrameDcpStreamId FrameObjType = iota - FrameOpenTracing FrameObjType = iota - FrameImpersonate FrameObjType = iota -) - -const MAX_USER_LEN = 15 // TODO half byte shifting to be implemented -// it's not very efficient so we currently truncate user names -const FAST_USER_LEN = 15 - -type FrameInfo struct { - ObjId FrameObjType - ObjLen int - ObjData []byte -} - -var ErrorInvalidOp error = fmt.Errorf("Specified method is not applicable") -var ErrorObjLenNotMatch error = fmt.Errorf("Object length does not match data") - -func (f *FrameInfo) Validate() error { - switch f.ObjId { - case FrameBarrier: - if f.ObjLen != 0 { - return fmt.Errorf("Invalid FrameBarrier - length is %v\n", f.ObjLen) - } else if f.ObjLen != len(f.ObjData) { - return ErrorObjLenNotMatch - } - case FrameDurability: - if f.ObjLen != 1 && f.ObjLen != 3 { - return fmt.Errorf("Invalid FrameDurability - length is %v\n", f.ObjLen) - } else if f.ObjLen != len(f.ObjData) { - return ErrorObjLenNotMatch - } - case FrameDcpStreamId: - if f.ObjLen != 2 { - return fmt.Errorf("Invalid FrameDcpStreamId - length is %v\n", f.ObjLen) - } else if f.ObjLen != len(f.ObjData) { - return ErrorObjLenNotMatch - } - case FrameOpenTracing: - if f.ObjLen != 1 { - return fmt.Errorf("Invalid FrameImpersonate - length is %v\n", f.ObjLen) - } else if f.ObjLen != len(f.ObjData) { - return ErrorObjLenNotMatch - } - case FrameImpersonate: - default: - return fmt.Errorf("Unknown FrameInfo type") - } - return nil -} - -func (f *FrameInfo) GetStreamId() (uint16, error) { - if f.ObjId != FrameDcpStreamId { - return 0, ErrorInvalidOp - } - - var output uint16 - output = uint16(f.ObjData[0]) - output = output << 8 - output |= uint16(f.ObjData[1]) - return output, nil -} - -type DurabilityLvl uint8 - -const ( - DuraInvalid DurabilityLvl = iota // Not used (0x0) - DuraMajority DurabilityLvl = iota // (0x01) - DuraMajorityAndPersistOnMaster DurabilityLvl = iota // (0x02) - DuraPersistToMajority DurabilityLvl = iota // (0x03) -) - -func (f *FrameInfo) GetDurabilityRequirements() (lvl DurabilityLvl, timeoutProvided bool, timeoutMs uint16, err error) { - if f.ObjId != FrameDurability { - err = ErrorInvalidOp - return - } - if f.ObjLen != 1 && f.ObjLen != 3 { - err = ErrorObjLenNotMatch - return - } - - lvl = DurabilityLvl(uint8(f.ObjData[0])) - - if f.ObjLen == 3 { - timeoutProvided = true - timeoutMs = binary.BigEndian.Uint16(f.ObjData[1:2]) - } - - return -} - -func incrementMarker(bitsToBeIncremented, byteIncrementCnt *int, framingElen, curObjIdx int) (int, error) { - for *bitsToBeIncremented >= 8 { - *byteIncrementCnt++ - *bitsToBeIncremented -= 8 - } - marker := curObjIdx + *byteIncrementCnt - if marker > framingElen { - return -1, fmt.Errorf("Out of bounds") - } - return marker, nil -} - -func (f *FrameInfo) Bytes() ([]byte, bool) { - return obj2Bytes(f.ObjId, f.ObjLen, f.ObjData) -} - -// TODO implement half byte shifting for impersonate user names -// halfByteRemaining will always be false, because ObjID and Len haven't gotten that large yet -// and user names are truncated -func obj2Bytes(id FrameObjType, len int, data []byte) (output []byte, halfByteRemaining bool) { - if len < 16 { - - // ObjIdentifier - 4 bits + ObjLength - 4 bits - var idAndLen uint8 - idAndLen |= uint8(id) << 4 - idAndLen |= uint8(len) - output = append(output, byte(idAndLen)) - - // Rest is Data - output = append(output, data[:len]...) - - } else { - } - return -} - -func parseFrameInfoObjects(buf []byte, framingElen int) (objs []FrameInfo, err error, halfByteRemaining bool) { - var curObjIdx int - var byteIncrementCnt int - var bitsToBeIncremented int - var marker int - - // Parse frameInfo objects - for curObjIdx = 0; curObjIdx < framingElen; curObjIdx += byteIncrementCnt { - byteIncrementCnt = 0 - var oneFrameObj FrameInfo - - // First get the objId - // ------------------------- - var objId int - var objHeader uint8 = buf[curObjIdx] - var objIdentifierRaw uint8 - if bitsToBeIncremented == 0 { - // ObjHeader - // 0 1 2 3 4 5 6 7 - // ^-----^ - // ObjIdentifierRaw - objIdentifierRaw = (objHeader & 0xf0) >> 4 - } else { - // ObjHeader - // 0 1 2 3 4 5 6 7 - // ^-----^ - // ObjIdentifierRaw - objIdentifierRaw = (objHeader & 0x0f) - } - bitsToBeIncremented += 4 - - marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx) - if err != nil { - return - } - - // Value is 0-14 - objId = int(objIdentifierRaw & 0xe) - // If bit 15 is set, ID is 15 + value of next byte - if objIdentifierRaw&0x1 > 0 { - if bitsToBeIncremented > 0 { - // ObjHeader - // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 - // ^-----^ ^---------------^ - // ObjId1 Extension - // ^ marker - buffer := uint16(buf[marker]) - buffer = buffer << 8 - buffer |= uint16(buf[marker+1]) - var extension uint8 = uint8(buffer & 0xff0 >> 4) - objId += int(extension) - } else { - // ObjHeader - // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 - // ^-----^ ^-------------------^ - // ObjId1 extension - // ^ marker - var extension uint8 = uint8(buf[marker]) - objId += int(extension) - } - bitsToBeIncremented += 8 - } - - marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx) - if err != nil { - return - } - oneFrameObj.ObjId = FrameObjType(objId) - - // Then get the obj length - // ------------------------- - var objLenRaw uint8 - var objLen int - if bitsToBeIncremented > 0 { - // ObjHeader - // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 - // ^ ^---------^ - // marker objLen - objLenRaw = uint8(buf[marker]) & 0x0f - } else { - // ObjHeader - // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 - // ^--------^ - // objLen - // ^ marker - objLenRaw = uint8(buf[marker]) & 0xf0 >> 4 - } - bitsToBeIncremented += 4 - - marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx) - if err != nil { - return - } - - // Length is 0-14 - objLen = int(objLenRaw & 0xe) - // If bit 15 is set, lenghth is 15 + value of next byte - if objLenRaw&0x1 > 0 { - if bitsToBeIncremented == 0 { - // ObjHeader - // 12 13 14 15 16 17 18 19 20 21 22 23 - // ^---------^ ^--------------------^ - // objLen extension - // ^ marker - var extension uint8 = uint8(buf[marker]) - objLen += int(extension) - } else { - // ObjHeader - // 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 - // ^--------^ ^---------------------^ - // objLen extension - // ^ marker var buffer uint16 - buffer := uint16(buf[marker]) - buffer = buffer << 8 - buffer |= uint16(buf[marker+1]) - var extension uint8 = uint8(buffer & 0xff0 >> 4) - objLen += int(extension) - } - bitsToBeIncremented += 8 - } - - marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx) - if err != nil { - return - } - oneFrameObj.ObjLen = objLen - - // The rest is N-bytes of data based on the length - if bitsToBeIncremented == 0 { - // No weird alignment needed - oneFrameObj.ObjData = buf[marker : marker+objLen] - } else { - // 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 - // ^--------^ ^---------------------^ ^---------> - // objLen extension data - // ^ marker - oneFrameObj.ObjData = ShiftByteSliceLeft4Bits(buf[marker : marker+objLen+1]) - } - err = oneFrameObj.Validate() - if err != nil { - return - } - objs = append(objs, oneFrameObj) - - bitsToBeIncremented += 8 * objLen - marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx) - } - - if bitsToBeIncremented > 0 { - halfByteRemaining = true - } - return -} - -func ShiftByteSliceLeft4Bits(slice []byte) (replacement []byte) { - var buffer uint16 - var i int - sliceLen := len(slice) - - if sliceLen < 2 { - // Let's not shift less than 16 bits - return - } - - replacement = make([]byte, sliceLen, cap(slice)) - - for i = 0; i < sliceLen-1; i++ { - // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 - // ^-----^ ^---------------^ ^----------- - // garbage data byte 0 data byte 1 - buffer = uint16(slice[i]) - buffer = buffer << 8 - buffer |= uint16(slice[i+1]) - replacement[i] = uint8(buffer & 0xff0 >> 4) - } - - if i < sliceLen { - lastByte := slice[sliceLen-1] - lastByte = lastByte << 4 - replacement[i] = lastByte - } - return -} - -// The following is used to theoretically support frameInfo ObjID extensions -// for completeness, but they are not very efficient though -func ShiftByteSliceRight4Bits(slice []byte) (replacement []byte) { - var buffer uint16 - var i int - var leftovers uint8 // 4 bits only - var replacementUnit uint16 - var first bool = true - var firstLeftovers uint8 - var lastLeftovers uint8 - sliceLen := len(slice) - - if sliceLen < 2 { - // Let's not shift less than 16 bits - return - } - - if slice[sliceLen-1]&0xf == 0 { - replacement = make([]byte, sliceLen, cap(slice)) - } else { - replacement = make([]byte, sliceLen+1, cap(slice)+1) - } - - for i = 0; i < sliceLen-1; i++ { - buffer = binary.BigEndian.Uint16(slice[i : i+2]) - // (buffer) - // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 - // ^-------------^ ^-------------------^ - // data byte 0 data byte 1 - // - // into - // - // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 - // ^-----^ ^---------------^ ^--------------------^ ^----------^ - // zeroes data byte 0 data byte 1 zeroes - - if first { - // The leftover OR'ing will overwrite the first 4 bits of data byte 0. Save them - firstLeftovers = uint8(buffer & 0xf000 >> 12) - first = false - } - replacementUnit = 0 - replacementUnit |= uint16(leftovers) << 12 - replacementUnit |= (buffer & 0xff00) >> 4 // data byte 0 - replacementUnit |= buffer & 0xff >> 4 // data byte 1 first 4 bits - lastLeftovers = uint8(buffer&0xf) << 4 - - replacement[i+1] = byte(replacementUnit) - - leftovers = uint8((buffer & 0x000f) << 4) - } - - replacement[0] = byte(uint8(replacement[0]) | firstLeftovers) - if lastLeftovers > 0 { - replacement[sliceLen] = byte(lastLeftovers) - } - return -} - -func Merge2HalfByteSlices(src1, src2 []byte) (output []byte) { - src1Len := len(src1) - src2Len := len(src2) - output = make([]byte, src1Len+src2Len-1) - - var mergeByte uint8 = src1[src1Len-1] - mergeByte |= uint8(src2[0]) - - copy(output, src1) - copy(output[src1Len:], src2[1:]) - - output[src1Len-1] = byte(mergeByte) - - return -} diff --git a/vendor/github.com/couchbase/gomemcached/go.mod b/vendor/github.com/couchbase/gomemcached/go.mod deleted file mode 100644 index 3355d4ea75..0000000000 --- a/vendor/github.com/couchbase/gomemcached/go.mod +++ /dev/null @@ -1,3 +0,0 @@ -module github.com/couchbase/gomemcached - -go 1.13 diff --git a/vendor/github.com/couchbase/gomemcached/mc_constants.go b/vendor/github.com/couchbase/gomemcached/mc_constants.go deleted file mode 100644 index 19741f5a0d..0000000000 --- a/vendor/github.com/couchbase/gomemcached/mc_constants.go +++ /dev/null @@ -1,364 +0,0 @@ -// Package gomemcached is binary protocol packet formats and constants. -package gomemcached - -import ( - "fmt" -) - -const ( - REQ_MAGIC = 0x80 - RES_MAGIC = 0x81 - FLEX_MAGIC = 0x08 - FLEX_RES_MAGIC = 0x18 -) - -// CommandCode for memcached packets. -type CommandCode uint8 - -const ( - GET = CommandCode(0x00) - SET = CommandCode(0x01) - ADD = CommandCode(0x02) - REPLACE = CommandCode(0x03) - DELETE = CommandCode(0x04) - INCREMENT = CommandCode(0x05) - DECREMENT = CommandCode(0x06) - QUIT = CommandCode(0x07) - FLUSH = CommandCode(0x08) - GETQ = CommandCode(0x09) - NOOP = CommandCode(0x0a) - VERSION = CommandCode(0x0b) - GETK = CommandCode(0x0c) - GETKQ = CommandCode(0x0d) - APPEND = CommandCode(0x0e) - PREPEND = CommandCode(0x0f) - STAT = CommandCode(0x10) - SETQ = CommandCode(0x11) - ADDQ = CommandCode(0x12) - REPLACEQ = CommandCode(0x13) - DELETEQ = CommandCode(0x14) - INCREMENTQ = CommandCode(0x15) - DECREMENTQ = CommandCode(0x16) - QUITQ = CommandCode(0x17) - FLUSHQ = CommandCode(0x18) - APPENDQ = CommandCode(0x19) - AUDIT = CommandCode(0x27) - PREPENDQ = CommandCode(0x1a) - GAT = CommandCode(0x1d) - HELLO = CommandCode(0x1f) - RGET = CommandCode(0x30) - RSET = CommandCode(0x31) - RSETQ = CommandCode(0x32) - RAPPEND = CommandCode(0x33) - RAPPENDQ = CommandCode(0x34) - RPREPEND = CommandCode(0x35) - RPREPENDQ = CommandCode(0x36) - RDELETE = CommandCode(0x37) - RDELETEQ = CommandCode(0x38) - RINCR = CommandCode(0x39) - RINCRQ = CommandCode(0x3a) - RDECR = CommandCode(0x3b) - RDECRQ = CommandCode(0x3c) - - SASL_LIST_MECHS = CommandCode(0x20) - SASL_AUTH = CommandCode(0x21) - SASL_STEP = CommandCode(0x22) - - SET_VBUCKET = CommandCode(0x3d) - - TAP_CONNECT = CommandCode(0x40) // Client-sent request to initiate Tap feed - TAP_MUTATION = CommandCode(0x41) // Notification of a SET/ADD/REPLACE/etc. on the server - TAP_DELETE = CommandCode(0x42) // Notification of a DELETE on the server - TAP_FLUSH = CommandCode(0x43) // Replicates a flush_all command - TAP_OPAQUE = CommandCode(0x44) // Opaque control data from the engine - TAP_VBUCKET_SET = CommandCode(0x45) // Sets state of vbucket in receiver (used in takeover) - TAP_CHECKPOINT_START = CommandCode(0x46) // Notifies start of new checkpoint - TAP_CHECKPOINT_END = CommandCode(0x47) // Notifies end of checkpoint - GET_ALL_VB_SEQNOS = CommandCode(0x48) // Get current high sequence numbers from all vbuckets located on the server - - UPR_OPEN = CommandCode(0x50) // Open a UPR connection with a name - UPR_ADDSTREAM = CommandCode(0x51) // Sent by ebucketMigrator to UPR Consumer - UPR_CLOSESTREAM = CommandCode(0x52) // Sent by eBucketMigrator to UPR Consumer - UPR_FAILOVERLOG = CommandCode(0x54) // Request failover logs - UPR_STREAMREQ = CommandCode(0x53) // Stream request from consumer to producer - UPR_STREAMEND = CommandCode(0x55) // Sent by producer when it has no more messages to stream - UPR_SNAPSHOT = CommandCode(0x56) // Start of a new snapshot - UPR_MUTATION = CommandCode(0x57) // Key mutation - UPR_DELETION = CommandCode(0x58) // Key deletion - UPR_EXPIRATION = CommandCode(0x59) // Key expiration - UPR_FLUSH = CommandCode(0x5a) // Delete all the data for a vbucket - UPR_NOOP = CommandCode(0x5c) // UPR NOOP - UPR_BUFFERACK = CommandCode(0x5d) // UPR Buffer Acknowledgement - UPR_CONTROL = CommandCode(0x5e) // Set flow control params - - SELECT_BUCKET = CommandCode(0x89) // Select bucket - - OBSERVE_SEQNO = CommandCode(0x91) // Sequence Number based Observe - OBSERVE = CommandCode(0x92) - - GET_META = CommandCode(0xA0) // Get meta. returns with expiry, flags, cas etc - GET_COLLECTIONS_MANIFEST = CommandCode(0xba) // Get entire collections manifest. - COLLECTIONS_GET_CID = CommandCode(0xbb) // Get collection id. - SUBDOC_GET = CommandCode(0xc5) // Get subdoc. Returns with xattrs - SUBDOC_MULTI_LOOKUP = CommandCode(0xd0) // Multi lookup. Doc xattrs and meta. - - DCP_SYSTEM_EVENT = CommandCode(0x5f) // A system event has occurred - DCP_SEQNO_ADV = CommandCode(0x64) // Sent when the vb seqno has advanced due to an unsubscribed event - DCP_OSO_SNAPSHOT = CommandCode(0x65) // Marks the begin and end of out-of-sequence-number stream -) - -// command codes that are counted toward DCP control buffer -// when DCP clients receive DCP messages with these command codes, they need to provide acknowledgement -var BufferedCommandCodeMap = map[CommandCode]bool{ - SET_VBUCKET: true, - UPR_STREAMEND: true, - UPR_SNAPSHOT: true, - UPR_MUTATION: true, - UPR_DELETION: true, - UPR_EXPIRATION: true, - DCP_SYSTEM_EVENT: true, - DCP_SEQNO_ADV: true, - DCP_OSO_SNAPSHOT: true, -} - -// Status field for memcached response. -type Status uint16 - -// Matches with protocol_binary.h as source of truth -const ( - SUCCESS = Status(0x00) - KEY_ENOENT = Status(0x01) - KEY_EEXISTS = Status(0x02) - E2BIG = Status(0x03) - EINVAL = Status(0x04) - NOT_STORED = Status(0x05) - DELTA_BADVAL = Status(0x06) - NOT_MY_VBUCKET = Status(0x07) - NO_BUCKET = Status(0x08) - LOCKED = Status(0x09) - AUTH_STALE = Status(0x1f) - AUTH_ERROR = Status(0x20) - AUTH_CONTINUE = Status(0x21) - ERANGE = Status(0x22) - ROLLBACK = Status(0x23) - EACCESS = Status(0x24) - NOT_INITIALIZED = Status(0x25) - UNKNOWN_COMMAND = Status(0x81) - ENOMEM = Status(0x82) - NOT_SUPPORTED = Status(0x83) - EINTERNAL = Status(0x84) - EBUSY = Status(0x85) - TMPFAIL = Status(0x86) - UNKNOWN_COLLECTION = Status(0x88) - - SYNC_WRITE_IN_PROGRESS = Status(0xa2) - SYNC_WRITE_AMBIGUOUS = Status(0xa3) - - // SUBDOC - SUBDOC_PATH_NOT_FOUND = Status(0xc0) - SUBDOC_BAD_MULTI = Status(0xcc) - SUBDOC_MULTI_PATH_FAILURE_DELETED = Status(0xd3) - - // Not a Memcached status - UNKNOWN_STATUS = Status(0xffff) -) - -// for log redaction -const ( - UdTagBegin = "<ud>" - UdTagEnd = "</ud>" -) - -var isFatal = map[Status]bool{ - DELTA_BADVAL: true, - NO_BUCKET: true, - AUTH_STALE: true, - AUTH_ERROR: true, - ERANGE: true, - ROLLBACK: true, - EACCESS: true, - ENOMEM: true, - NOT_SUPPORTED: true, - - // consider statuses coming from outside couchbase (eg OS errors) as fatal for the connection - // as there might be unread data left over on the wire - UNKNOWN_STATUS: true, -} - -// the producer/consumer bit in dcp flags -var DCP_PRODUCER uint32 = 0x01 - -// the include XATTRS bit in dcp flags -var DCP_OPEN_INCLUDE_XATTRS uint32 = 0x04 - -// the include deletion time bit in dcp flags -var DCP_OPEN_INCLUDE_DELETE_TIMES uint32 = 0x20 - -// Datatype to Include XATTRS in SUBDOC GET -var SUBDOC_FLAG_XATTR uint8 = 0x04 - -// MCItem is an internal representation of an item. -type MCItem struct { - Cas uint64 - Flags, Expiration uint32 - Data []byte -} - -// Number of bytes in a binary protocol header. -const HDR_LEN = 24 - -// Mapping of CommandCode -> name of command (not exhaustive) -var CommandNames map[CommandCode]string - -// StatusNames human readable names for memcached response. -var StatusNames map[Status]string - -func init() { - CommandNames = make(map[CommandCode]string) - CommandNames[GET] = "GET" - CommandNames[SET] = "SET" - CommandNames[ADD] = "ADD" - CommandNames[REPLACE] = "REPLACE" - CommandNames[DELETE] = "DELETE" - CommandNames[INCREMENT] = "INCREMENT" - CommandNames[DECREMENT] = "DECREMENT" - CommandNames[QUIT] = "QUIT" - CommandNames[FLUSH] = "FLUSH" - CommandNames[GETQ] = "GETQ" - CommandNames[NOOP] = "NOOP" - CommandNames[VERSION] = "VERSION" - CommandNames[GETK] = "GETK" - CommandNames[GETKQ] = "GETKQ" - CommandNames[APPEND] = "APPEND" - CommandNames[PREPEND] = "PREPEND" - CommandNames[STAT] = "STAT" - CommandNames[SETQ] = "SETQ" - CommandNames[ADDQ] = "ADDQ" - CommandNames[REPLACEQ] = "REPLACEQ" - CommandNames[DELETEQ] = "DELETEQ" - CommandNames[INCREMENTQ] = "INCREMENTQ" - CommandNames[DECREMENTQ] = "DECREMENTQ" - CommandNames[QUITQ] = "QUITQ" - CommandNames[FLUSHQ] = "FLUSHQ" - CommandNames[APPENDQ] = "APPENDQ" - CommandNames[PREPENDQ] = "PREPENDQ" - CommandNames[RGET] = "RGET" - CommandNames[RSET] = "RSET" - CommandNames[RSETQ] = "RSETQ" - CommandNames[RAPPEND] = "RAPPEND" - CommandNames[RAPPENDQ] = "RAPPENDQ" - CommandNames[RPREPEND] = "RPREPEND" - CommandNames[RPREPENDQ] = "RPREPENDQ" - CommandNames[RDELETE] = "RDELETE" - CommandNames[RDELETEQ] = "RDELETEQ" - CommandNames[RINCR] = "RINCR" - CommandNames[RINCRQ] = "RINCRQ" - CommandNames[RDECR] = "RDECR" - CommandNames[RDECRQ] = "RDECRQ" - - CommandNames[SASL_LIST_MECHS] = "SASL_LIST_MECHS" - CommandNames[SASL_AUTH] = "SASL_AUTH" - CommandNames[SASL_STEP] = "SASL_STEP" - - CommandNames[TAP_CONNECT] = "TAP_CONNECT" - CommandNames[TAP_MUTATION] = "TAP_MUTATION" - CommandNames[TAP_DELETE] = "TAP_DELETE" - CommandNames[TAP_FLUSH] = "TAP_FLUSH" - CommandNames[TAP_OPAQUE] = "TAP_OPAQUE" - CommandNames[TAP_VBUCKET_SET] = "TAP_VBUCKET_SET" - CommandNames[TAP_CHECKPOINT_START] = "TAP_CHECKPOINT_START" - CommandNames[TAP_CHECKPOINT_END] = "TAP_CHECKPOINT_END" - - CommandNames[UPR_OPEN] = "UPR_OPEN" - CommandNames[UPR_ADDSTREAM] = "UPR_ADDSTREAM" - CommandNames[UPR_CLOSESTREAM] = "UPR_CLOSESTREAM" - CommandNames[UPR_FAILOVERLOG] = "UPR_FAILOVERLOG" - CommandNames[UPR_STREAMREQ] = "UPR_STREAMREQ" - CommandNames[UPR_STREAMEND] = "UPR_STREAMEND" - CommandNames[UPR_SNAPSHOT] = "UPR_SNAPSHOT" - CommandNames[UPR_MUTATION] = "UPR_MUTATION" - CommandNames[UPR_DELETION] = "UPR_DELETION" - CommandNames[UPR_EXPIRATION] = "UPR_EXPIRATION" - CommandNames[UPR_FLUSH] = "UPR_FLUSH" - CommandNames[UPR_NOOP] = "UPR_NOOP" - CommandNames[UPR_BUFFERACK] = "UPR_BUFFERACK" - CommandNames[UPR_CONTROL] = "UPR_CONTROL" - CommandNames[SUBDOC_GET] = "SUBDOC_GET" - CommandNames[SUBDOC_MULTI_LOOKUP] = "SUBDOC_MULTI_LOOKUP" - CommandNames[GET_COLLECTIONS_MANIFEST] = "GET_COLLECTIONS_MANIFEST" - CommandNames[COLLECTIONS_GET_CID] = "COLLECTIONS_GET_CID" - CommandNames[DCP_SYSTEM_EVENT] = "DCP_SYSTEM_EVENT" - CommandNames[DCP_SEQNO_ADV] = "DCP_SEQNO_ADV" - - StatusNames = make(map[Status]string) - StatusNames[SUCCESS] = "SUCCESS" - StatusNames[KEY_ENOENT] = "KEY_ENOENT" - StatusNames[KEY_EEXISTS] = "KEY_EEXISTS" - StatusNames[E2BIG] = "E2BIG" - StatusNames[EINVAL] = "EINVAL" - StatusNames[NOT_STORED] = "NOT_STORED" - StatusNames[DELTA_BADVAL] = "DELTA_BADVAL" - StatusNames[NOT_MY_VBUCKET] = "NOT_MY_VBUCKET" - StatusNames[NO_BUCKET] = "NO_BUCKET" - StatusNames[AUTH_STALE] = "AUTH_STALE" - StatusNames[AUTH_ERROR] = "AUTH_ERROR" - StatusNames[AUTH_CONTINUE] = "AUTH_CONTINUE" - StatusNames[ERANGE] = "ERANGE" - StatusNames[ROLLBACK] = "ROLLBACK" - StatusNames[EACCESS] = "EACCESS" - StatusNames[NOT_INITIALIZED] = "NOT_INITIALIZED" - StatusNames[UNKNOWN_COMMAND] = "UNKNOWN_COMMAND" - StatusNames[ENOMEM] = "ENOMEM" - StatusNames[NOT_SUPPORTED] = "NOT_SUPPORTED" - StatusNames[EINTERNAL] = "EINTERNAL" - StatusNames[EBUSY] = "EBUSY" - StatusNames[TMPFAIL] = "TMPFAIL" - StatusNames[UNKNOWN_COLLECTION] = "UNKNOWN_COLLECTION" - StatusNames[SUBDOC_PATH_NOT_FOUND] = "SUBDOC_PATH_NOT_FOUND" - StatusNames[SUBDOC_BAD_MULTI] = "SUBDOC_BAD_MULTI" - -} - -// String an op code. -func (o CommandCode) String() (rv string) { - rv = CommandNames[o] - if rv == "" { - rv = fmt.Sprintf("0x%02x", int(o)) - } - return rv -} - -// String an op code. -func (s Status) String() (rv string) { - rv = StatusNames[s] - if rv == "" { - rv = fmt.Sprintf("0x%02x", int(s)) - } - return rv -} - -// IsQuiet will return true if a command is a "quiet" command. -func (o CommandCode) IsQuiet() bool { - switch o { - case GETQ, - GETKQ, - SETQ, - ADDQ, - REPLACEQ, - DELETEQ, - INCREMENTQ, - DECREMENTQ, - QUITQ, - FLUSHQ, - APPENDQ, - PREPENDQ, - RSETQ, - RAPPENDQ, - RPREPENDQ, - RDELETEQ, - RINCRQ, - RDECRQ: - return true - } - return false -} diff --git a/vendor/github.com/couchbase/gomemcached/mc_req.go b/vendor/github.com/couchbase/gomemcached/mc_req.go deleted file mode 100644 index 9fbfde3457..0000000000 --- a/vendor/github.com/couchbase/gomemcached/mc_req.go +++ /dev/null @@ -1,656 +0,0 @@ -package gomemcached - -import ( - "encoding/binary" - "fmt" - "io" -) - -// The maximum reasonable body length to expect. -// Anything larger than this will result in an error. -// The current limit, 20MB, is the size limit supported by ep-engine. -var MaxBodyLen = int(20 * 1024 * 1024) - -const _BUFLEN = 256 - -// MCRequest is memcached Request -type MCRequest struct { - // The command being issued - Opcode CommandCode - // The CAS (if applicable, or 0) - Cas uint64 - // An opaque value to be returned with this request - Opaque uint32 - // The vbucket to which this command belongs - VBucket uint16 - // Command extras, key, and body - Extras, Key, Body, ExtMeta []byte - // Datatype identifier - DataType uint8 - // len() calls are expensive - cache this in case for collection - Keylen int - // Collection id for collection based operations - CollId [binary.MaxVarintLen32]byte - // Length of collection id - CollIdLen int - // Impersonate user name - could go in FramingExtras, but for efficiency - Username [MAX_USER_LEN]byte - // Length of Impersonate user name - UserLen int - // Flexible Framing Extras - FramingExtras []FrameInfo - // Stored length of incoming framing extras - FramingElen int -} - -// Size gives the number of bytes this request requires. -func (req *MCRequest) HdrSize() int { - rv := HDR_LEN + len(req.Extras) + req.CollIdLen + req.FramingElen + len(req.Key) - if req.UserLen != 0 { - rv += req.UserLen + 1 - - // half byte shifting required - if req.UserLen > FAST_USER_LEN { - rv++ - } - } - for _, e := range req.FramingExtras { - rv += e.ObjLen + 1 - - // half byte shifting required - if e.ObjLen > FAST_USER_LEN { - rv++ - } - } - return rv -} - -func (req *MCRequest) Size() int { - return req.HdrSize() + len(req.Body) + len(req.ExtMeta) -} - -// A debugging string representation of this request -func (req MCRequest) String() string { - return fmt.Sprintf("{MCRequest opcode=%s, bodylen=%d, key='%s'}", - req.Opcode, len(req.Body), req.Key) -} - -func (req *MCRequest) fillRegularHeaderBytes(data []byte) int { - // Byte/ 0 | 1 | 2 | 3 | - // / | | | | - // |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7| - // +---------------+---------------+---------------+---------------+ - // 0| Magic | Opcode | Key length | - // +---------------+---------------+---------------+---------------+ - // 4| Extras length | Data type | vbucket id | - // +---------------+---------------+---------------+---------------+ - // 8| Total body length | - // +---------------+---------------+---------------+---------------+ - // 12| Opaque | - // +---------------+---------------+---------------+---------------+ - // 16| CAS | - // | | - // +---------------+---------------+---------------+---------------+ - // Total 24 bytes - - pos := 0 - data[pos] = REQ_MAGIC - pos++ - data[pos] = byte(req.Opcode) - pos++ - binary.BigEndian.PutUint16(data[pos:pos+2], - uint16(req.CollIdLen+len(req.Key))) - pos += 2 - - // 4 - data[pos] = byte(len(req.Extras)) - pos++ - // Data type - if req.DataType != 0 { - data[pos] = byte(req.DataType) - } - pos++ - binary.BigEndian.PutUint16(data[pos:pos+2], req.VBucket) - pos += 2 - - // 8 - binary.BigEndian.PutUint32(data[pos:pos+4], - uint32(len(req.Body)+req.CollIdLen+len(req.Key)+len(req.Extras)+len(req.ExtMeta))) - pos += 4 - - // 12 - binary.BigEndian.PutUint32(data[pos:pos+4], req.Opaque) - pos += 4 - - // 16 - if req.Cas != 0 { - binary.BigEndian.PutUint64(data[pos:pos+8], req.Cas) - } - pos += 8 - - // 24 - extras - if len(req.Extras) > 0 { - copy(data[pos:pos+len(req.Extras)], req.Extras) - pos += len(req.Extras) - } - - if len(req.Key) > 0 { - if req.CollIdLen > 0 { - copy(data[pos:pos+req.CollIdLen], req.CollId[:]) - pos += req.CollIdLen - } - copy(data[pos:pos+len(req.Key)], req.Key) - pos += len(req.Key) - } - - return pos -} - -func (req *MCRequest) fillFastFlexHeaderBytes(data []byte) int { - // Byte/ 0 | 1 | 2 | 3 | - // / | | | | - // |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7| - // +---------------+---------------+---------------+---------------+ - // 0| Magic | Opcode | Framing extras| Key Length | - // +---------------+---------------+---------------+---------------+ - // 4| Extras length | Data type | vbucket id | - // +---------------+---------------+---------------+---------------+ - // 8| Total body length | - // +---------------+---------------+---------------+---------------+ - // 12| Opaque | - // +---------------+---------------+---------------+---------------+ - // 16| CAS | - // | | - // +---------------+---------------+---------------+---------------+ - // Total 24 bytes - - pos := 0 - data[pos] = FLEX_MAGIC - pos++ - data[pos] = byte(req.Opcode) - pos++ - data[pos] = byte(req.UserLen + 1) - pos++ - data[pos] = byte(len(req.Key) + req.CollIdLen) - pos++ - - // 4 - data[pos] = byte(len(req.Extras)) - pos++ - // Data type - if req.DataType != 0 { - data[pos] = byte(req.DataType) - } - pos++ - binary.BigEndian.PutUint16(data[pos:pos+2], req.VBucket) - pos += 2 - - // 8 - binary.BigEndian.PutUint32(data[pos:pos+4], - uint32(len(req.Body)+req.CollIdLen+len(req.Key)+(req.UserLen+1)+len(req.Extras)+len(req.ExtMeta))) - pos += 4 - - // 12 - binary.BigEndian.PutUint32(data[pos:pos+4], req.Opaque) - pos += 4 - - // 16 - if req.Cas != 0 { - binary.BigEndian.PutUint64(data[pos:pos+8], req.Cas) - } - pos += 8 - - // 24 Flexible extras - if req.UserLen > 0 { - data[pos] = byte((uint8(FrameImpersonate) << 4) | uint8(req.UserLen)) - pos++ - copy(data[pos:pos+req.UserLen], req.Username[:req.UserLen]) - pos += req.UserLen - } - - if len(req.Extras) > 0 { - copy(data[pos:pos+len(req.Extras)], req.Extras) - pos += len(req.Extras) - } - - if len(req.Key) > 0 { - if req.CollIdLen > 0 { - copy(data[pos:pos+req.CollIdLen], req.CollId[:]) - pos += req.CollIdLen - } - copy(data[pos:pos+len(req.Key)], req.Key) - pos += len(req.Key) - } - - return pos -} - -// Returns pos and if trailing by half byte -func (req *MCRequest) fillFlexHeaderBytes(data []byte) (int, bool) { - - // Byte/ 0 | 1 | 2 | 3 | - // / | | | | - // |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7| - // +---------------+---------------+---------------+---------------+ - // 0| Magic (0x08) | Opcode | Framing extras| Key Length | - // +---------------+---------------+---------------+---------------+ - // 4| Extras length | Data type | vbucket id | - // +---------------+---------------+---------------+---------------+ - // 8| Total body length | - // +---------------+---------------+---------------+---------------+ - // 12| Opaque | - // +---------------+---------------+---------------+---------------+ - // 16| CAS | - // | | - // +---------------+---------------+---------------+---------------+ - // Total 24 bytes - - data[0] = FLEX_MAGIC - data[1] = byte(req.Opcode) - data[3] = byte(len(req.Key) + req.CollIdLen) - elen := len(req.Extras) - data[4] = byte(elen) - if req.DataType != 0 { - data[5] = byte(req.DataType) - } - binary.BigEndian.PutUint16(data[6:8], req.VBucket) - binary.BigEndian.PutUint32(data[12:16], req.Opaque) - if req.Cas != 0 { - binary.BigEndian.PutUint64(data[16:24], req.Cas) - } - pos := HDR_LEN - - // Add framing infos - var framingExtras []byte - var outputBytes []byte - var mergeModeSrc []byte - var frameBytes int - var halfByteMode bool - var mergeMode bool - for _, frameInfo := range req.FramingExtras { - if !mergeMode { - outputBytes, halfByteMode = frameInfo.Bytes() - if !halfByteMode { - framingExtras = append(framingExtras, outputBytes...) - frameBytes += len(outputBytes) - } else { - mergeMode = true - mergeModeSrc = outputBytes - } - } else { - outputBytes, halfByteMode = frameInfo.Bytes() - outputBytes := ShiftByteSliceRight4Bits(outputBytes) - if halfByteMode { - // Previous halfbyte merge with this halfbyte will result in a complete byte - mergeMode = false - outputBytes = Merge2HalfByteSlices(mergeModeSrc, outputBytes) - framingExtras = append(framingExtras, outputBytes...) - frameBytes += len(outputBytes) - } else { - // Merge half byte with a non-half byte will result in a combined half-byte that will - // become the source for the next iteration - mergeModeSrc = Merge2HalfByteSlices(mergeModeSrc, outputBytes) - } - } - } - - // fast impersonate Flexible Extra - if req.UserLen > 0 { - if !mergeMode { - outputBytes, halfByteMode = obj2Bytes(FrameImpersonate, req.UserLen, req.Username[:req.UserLen]) - if !halfByteMode { - framingExtras = append(framingExtras, outputBytes...) - frameBytes += len(outputBytes) - } else { - mergeMode = true - mergeModeSrc = outputBytes - } - } else { - outputBytes, halfByteMode = obj2Bytes(FrameImpersonate, req.UserLen, req.Username[:req.UserLen]) - outputBytes := ShiftByteSliceRight4Bits(outputBytes) - if halfByteMode { - // Previous halfbyte merge with this halfbyte will result in a complete byte - mergeMode = false - outputBytes = Merge2HalfByteSlices(mergeModeSrc, outputBytes) - framingExtras = append(framingExtras, outputBytes...) - frameBytes += len(outputBytes) - } else { - // Merge half byte with a non-half byte will result in a combined half-byte that will - // become the source for the next iteration - mergeModeSrc = Merge2HalfByteSlices(mergeModeSrc, outputBytes) - } - } - } - - if mergeMode { - // Commit the temporary merge area into framingExtras - framingExtras = append(framingExtras, mergeModeSrc...) - frameBytes += len(mergeModeSrc) - } - - req.FramingElen = frameBytes - - // these have to be set after we have worked out the size of the Flexible Extras - data[2] = byte(req.FramingElen) - binary.BigEndian.PutUint32(data[8:12], - uint32(len(req.Body)+len(req.Key)+req.CollIdLen+elen+len(req.ExtMeta)+req.FramingElen)) - copy(data[pos:pos+frameBytes], framingExtras) - - pos += frameBytes - - // Add Extras - if len(req.Extras) > 0 { - if mergeMode { - outputBytes = ShiftByteSliceRight4Bits(req.Extras) - data = Merge2HalfByteSlices(data, outputBytes) - } else { - copy(data[pos:pos+elen], req.Extras) - } - pos += elen - } - - // Add keys - if len(req.Key) > 0 { - if mergeMode { - var key []byte - var keylen int - - if req.CollIdLen == 0 { - key = req.Key - keylen = len(req.Key) - } else { - key = append(key, req.CollId[:]...) - key = append(key, req.Key...) - keylen = len(req.Key) + req.CollIdLen - } - - outputBytes = ShiftByteSliceRight4Bits(key) - data = Merge2HalfByteSlices(data, outputBytes) - pos += keylen - } else { - if req.CollIdLen > 0 { - copy(data[pos:pos+req.CollIdLen], req.CollId[:]) - pos += req.CollIdLen - } - copy(data[pos:pos+len(req.Key)], req.Key) - pos += len(req.Key) - } - } - - return pos, mergeMode -} - -func (req *MCRequest) FillHeaderBytes(data []byte) (int, bool) { - if len(req.FramingExtras) > 0 || req.UserLen > FAST_USER_LEN { - return req.fillFlexHeaderBytes(data) - } else if req.UserLen > 0 { - return req.fillFastFlexHeaderBytes(data), false - } else { - return req.fillRegularHeaderBytes(data), false - } -} - -// HeaderBytes will return the wire representation of the request header -// (with the extras and key). -func (req *MCRequest) HeaderBytes() []byte { - data := make([]byte, req.HdrSize()) - - req.FillHeaderBytes(data) - - return data -} - -// Bytes will return the wire representation of this request. -func (req *MCRequest) Bytes() []byte { - data := make([]byte, req.Size()) - req.bytes(data) - return data -} - -func (req *MCRequest) bytes(data []byte) { - pos, halfByteMode := req.FillHeaderBytes(data) - // TODO - the halfByteMode should be revisited for a more efficient - // way of doing things - - if len(req.Body) > 0 { - if halfByteMode { - shifted := ShiftByteSliceRight4Bits(req.Body) - data = Merge2HalfByteSlices(data, shifted) - } else { - copy(data[pos:pos+len(req.Body)], req.Body) - } - } - - if len(req.ExtMeta) > 0 { - if halfByteMode { - shifted := ShiftByteSliceRight4Bits(req.ExtMeta) - data = Merge2HalfByteSlices(data, shifted) - } else { - copy(data[pos+len(req.Body):pos+len(req.Body)+len(req.ExtMeta)], req.ExtMeta) - } - } -} - -// Transmit will send this request message across a writer. -func (req *MCRequest) Transmit(w io.Writer) (n int, err error) { - l := req.Size() - if l < _BUFLEN { - data := make([]byte, l) - req.bytes(data) - n, err = w.Write(data) - } else { - data := make([]byte, req.HdrSize()) - req.FillHeaderBytes(data) - n, err = w.Write(data) - if err == nil { - m := 0 - m, err = w.Write(req.Body) - n += m - } - } - return -} - -func (req *MCRequest) receiveHeaderCommon(hdrBytes []byte) (elen, totalBodyLen int) { - elen = int(hdrBytes[4]) - // Data type at 5 - req.DataType = uint8(hdrBytes[5]) - - req.Opcode = CommandCode(hdrBytes[1]) - // Vbucket at 6:7 - req.VBucket = binary.BigEndian.Uint16(hdrBytes[6:]) - totalBodyLen = int(binary.BigEndian.Uint32(hdrBytes[8:])) - - req.Opaque = binary.BigEndian.Uint32(hdrBytes[12:]) - req.Cas = binary.BigEndian.Uint64(hdrBytes[16:]) - return -} - -func (req *MCRequest) receiveRegHeader(hdrBytes []byte) (elen, totalBodyLen int) { - elen, totalBodyLen = req.receiveHeaderCommon(hdrBytes) - req.Keylen = int(binary.BigEndian.Uint16(hdrBytes[2:])) - return -} - -func (req *MCRequest) receiveFlexibleFramingHeader(hdrBytes []byte) (elen, totalBodyLen, framingElen int) { - elen, totalBodyLen = req.receiveHeaderCommon(hdrBytes) - - // For flexible framing header, key length is a single byte at byte index 3 - req.Keylen = int(binary.BigEndian.Uint16(hdrBytes[2:]) & 0x0ff) - // Flexible framing lengh is a single byte at index 2 - framingElen = int(binary.BigEndian.Uint16(hdrBytes[2:]) >> 8) - req.FramingElen = framingElen - return -} - -func (req *MCRequest) populateRegularBody(r io.Reader, totalBodyLen, elen int) (int, error) { - var m int - var err error - if totalBodyLen > 0 { - buf := make([]byte, totalBodyLen) - m, err = io.ReadFull(r, buf) - if err == nil { - if req.Opcode >= TAP_MUTATION && - req.Opcode <= TAP_CHECKPOINT_END && - len(buf) > 1 { - // In these commands there is "engine private" - // data at the end of the extras. The first 2 - // bytes of extra data give its length. - elen += int(binary.BigEndian.Uint16(buf)) - } - - req.Extras = buf[0:elen] - req.Key = buf[elen : req.Keylen+elen] - - // get the length of extended metadata - extMetaLen := 0 - if elen > 29 { - extMetaLen = int(binary.BigEndian.Uint16(req.Extras[28:30])) - } - - bodyLen := totalBodyLen - req.Keylen - elen - extMetaLen - if bodyLen > MaxBodyLen { - return m, fmt.Errorf("%d is too big (max %d)", - bodyLen, MaxBodyLen) - } - - req.Body = buf[req.Keylen+elen : req.Keylen+elen+bodyLen] - req.ExtMeta = buf[req.Keylen+elen+bodyLen:] - } - } - return m, err -} - -func (req *MCRequest) populateFlexBody(r io.Reader, totalBodyLen, elen, framingElen int) (int, error) { - var m int - var err error - if totalBodyLen > 0 { - buf := make([]byte, totalBodyLen) - m, err = io.ReadFull(r, buf) - if err != nil { - return m, err - } - err = req.populateFlexBodyInternal(buf, totalBodyLen, elen, framingElen) - } - return m, err -} - -func (req *MCRequest) populateFlexBodyInternal(buf []byte, totalBodyLen, elen, framingElen int) error { - var halfByteOffset bool - var err error - if framingElen > 0 { - var objs []FrameInfo - objs, err, halfByteOffset = parseFrameInfoObjects(buf, framingElen) - if err != nil { - return err - } - req.FramingExtras = objs - } - - err = req.populateFlexBodyAfterFrames(buf, totalBodyLen, elen, framingElen, halfByteOffset) - if err != nil { - return err - } - - return nil -} - -func (req *MCRequest) populateFlexBodyAfterFrames(buf []byte, totalBodyLen, elen, framingElen int, halfByteOffset bool) error { - var idxCursor int = framingElen - if req.Opcode >= TAP_MUTATION && req.Opcode <= TAP_CHECKPOINT_END && len(buf[idxCursor:]) > 1 { - // In these commands there is "engine private" - // data at the end of the extras. The first 2 - // bytes of extra data give its length. - if !halfByteOffset { - elen += int(binary.BigEndian.Uint16(buf[idxCursor:])) - } else { - // 0 1 2 3 4 .... 19 20 21 22 ... 32 - // ^-----^ ^-------^ ^------------^ - // offset data do not care - var buffer uint32 = binary.BigEndian.Uint32(buf[idxCursor:]) - buffer &= 0xffff000 - elen += int(buffer >> 12) - } - } - - // Get the extras - if !halfByteOffset { - req.Extras = buf[idxCursor : idxCursor+elen] - } else { - preShift := buf[idxCursor : idxCursor+elen+1] - req.Extras = ShiftByteSliceLeft4Bits(preShift) - } - idxCursor += elen - - // Get the Key - if !halfByteOffset { - req.Key = buf[idxCursor : idxCursor+req.Keylen] - } else { - preShift := buf[idxCursor : idxCursor+req.Keylen+1] - req.Key = ShiftByteSliceLeft4Bits(preShift) - } - idxCursor += req.Keylen - - // get the length of extended metadata - extMetaLen := 0 - if elen > 29 { - extMetaLen = int(binary.BigEndian.Uint16(req.Extras[28:30])) - } - idxCursor += extMetaLen - - bodyLen := totalBodyLen - req.Keylen - elen - extMetaLen - framingElen - if bodyLen > MaxBodyLen { - return fmt.Errorf("%d is too big (max %d)", - bodyLen, MaxBodyLen) - } - - if !halfByteOffset { - req.Body = buf[idxCursor : idxCursor+bodyLen] - idxCursor += bodyLen - } else { - preShift := buf[idxCursor : idxCursor+bodyLen+1] - req.Body = ShiftByteSliceLeft4Bits(preShift) - idxCursor += bodyLen - } - - if extMetaLen > 0 { - if !halfByteOffset { - req.ExtMeta = buf[idxCursor:] - } else { - preShift := buf[idxCursor:] - req.ExtMeta = ShiftByteSliceLeft4Bits(preShift) - } - } - - return nil -} - -// Receive will fill this MCRequest with the data from a reader. -func (req *MCRequest) Receive(r io.Reader, hdrBytes []byte) (int, error) { - if len(hdrBytes) < HDR_LEN { - hdrBytes = []byte{ - 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0} - } - n, err := io.ReadFull(r, hdrBytes) - if err != nil { - fmt.Printf("Err %v\n", err) - return n, err - } - - switch hdrBytes[0] { - case RES_MAGIC: - fallthrough - case REQ_MAGIC: - elen, totalBodyLen := req.receiveRegHeader(hdrBytes) - bodyRead, err := req.populateRegularBody(r, totalBodyLen, elen) - return n + bodyRead, err - case FLEX_MAGIC: - elen, totalBodyLen, framingElen := req.receiveFlexibleFramingHeader(hdrBytes) - bodyRead, err := req.populateFlexBody(r, totalBodyLen, elen, framingElen) - return n + bodyRead, err - default: - return n, fmt.Errorf("bad magic: 0x%02x", hdrBytes[0]) - } -} diff --git a/vendor/github.com/couchbase/gomemcached/mc_res.go b/vendor/github.com/couchbase/gomemcached/mc_res.go deleted file mode 100644 index 1e89020de2..0000000000 --- a/vendor/github.com/couchbase/gomemcached/mc_res.go +++ /dev/null @@ -1,280 +0,0 @@ -package gomemcached - -import ( - "encoding/binary" - "fmt" - "io" - "sync" -) - -// MCResponse is memcached response -type MCResponse struct { - // The command opcode of the command that sent the request - Opcode CommandCode - // The status of the response - Status Status - // The opaque sent in the request - Opaque uint32 - // The CAS identifier (if applicable) - Cas uint64 - // Extras, key, and body for this response - Extras, Key, Body []byte - // If true, this represents a fatal condition and we should hang up - Fatal bool - // Datatype identifier - DataType uint8 -} - -// A debugging string representation of this response -func (res MCResponse) String() string { - return fmt.Sprintf("{MCResponse status=%v keylen=%d, extralen=%d, bodylen=%d}", - res.Status, len(res.Key), len(res.Extras), len(res.Body)) -} - -// Response as an error. -func (res *MCResponse) Error() string { - return fmt.Sprintf("MCResponse status=%v, opcode=%v, opaque=%v, msg: %s", - res.Status, res.Opcode, res.Opaque, string(res.Body)) -} - -func errStatus(e error) Status { - status := UNKNOWN_STATUS - if res, ok := e.(*MCResponse); ok { - status = res.Status - } - return status -} - -// IsNotFound is true if this error represents a "not found" response. -func IsNotFound(e error) bool { - return errStatus(e) == KEY_ENOENT -} - -// IsFatal is false if this error isn't believed to be fatal to a connection. -func IsFatal(e error) bool { - if e == nil { - return false - } - _, ok := isFatal[errStatus(e)] - if ok { - return true - } - return false -} - -// Size is number of bytes this response consumes on the wire. -func (res *MCResponse) Size() int { - return HDR_LEN + len(res.Extras) + len(res.Key) + len(res.Body) -} - -func (res *MCResponse) fillHeaderBytes(data []byte) int { - pos := 0 - data[pos] = RES_MAGIC - pos++ - data[pos] = byte(res.Opcode) - pos++ - binary.BigEndian.PutUint16(data[pos:pos+2], - uint16(len(res.Key))) - pos += 2 - - // 4 - data[pos] = byte(len(res.Extras)) - pos++ - // Data type - if res.DataType != 0 { - data[pos] = byte(res.DataType) - } else { - data[pos] = 0 - } - pos++ - binary.BigEndian.PutUint16(data[pos:pos+2], uint16(res.Status)) - pos += 2 - - // 8 - binary.BigEndian.PutUint32(data[pos:pos+4], - uint32(len(res.Body)+len(res.Key)+len(res.Extras))) - pos += 4 - - // 12 - binary.BigEndian.PutUint32(data[pos:pos+4], res.Opaque) - pos += 4 - - // 16 - binary.BigEndian.PutUint64(data[pos:pos+8], res.Cas) - pos += 8 - - if len(res.Extras) > 0 { - copy(data[pos:pos+len(res.Extras)], res.Extras) - pos += len(res.Extras) - } - - if len(res.Key) > 0 { - copy(data[pos:pos+len(res.Key)], res.Key) - pos += len(res.Key) - } - - return pos -} - -// HeaderBytes will get just the header bytes for this response. -func (res *MCResponse) HeaderBytes() []byte { - data := make([]byte, HDR_LEN+len(res.Extras)+len(res.Key)) - - res.fillHeaderBytes(data) - - return data -} - -// Bytes will return the actual bytes transmitted for this response. -func (res *MCResponse) Bytes() []byte { - data := make([]byte, res.Size()) - - pos := res.fillHeaderBytes(data) - - copy(data[pos:pos+len(res.Body)], res.Body) - - return data -} - -// Transmit will send this response message across a writer. -func (res *MCResponse) Transmit(w io.Writer) (n int, err error) { - if len(res.Body) < 128 { - n, err = w.Write(res.Bytes()) - } else { - n, err = w.Write(res.HeaderBytes()) - if err == nil { - m := 0 - m, err = w.Write(res.Body) - m += n - } - } - return -} - -// Receive will fill this MCResponse with the data from this reader. -func (res *MCResponse) Receive(r io.Reader, hdrBytes []byte) (n int, err error) { - return res.ReceiveWithBuf(r, hdrBytes, nil) -} - -// ReceiveWithBuf takes an optional pre-allocated []byte buf which -// will be used if its capacity is large enough, otherwise a new -// []byte slice is allocated. -func (res *MCResponse) ReceiveWithBuf(r io.Reader, hdrBytes, buf []byte) (n int, err error) { - if len(hdrBytes) < HDR_LEN { - hdrBytes = []byte{ - 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0} - } - n, err = io.ReadFull(r, hdrBytes) - if err != nil { - return n, err - } - - if hdrBytes[0] != RES_MAGIC && hdrBytes[0] != REQ_MAGIC { - return n, fmt.Errorf("bad magic: 0x%02x", hdrBytes[0]) - } - - klen := int(binary.BigEndian.Uint16(hdrBytes[2:4])) - elen := int(hdrBytes[4]) - - res.Opcode = CommandCode(hdrBytes[1]) - res.DataType = uint8(hdrBytes[5]) - res.Status = Status(binary.BigEndian.Uint16(hdrBytes[6:8])) - res.Opaque = binary.BigEndian.Uint32(hdrBytes[12:16]) - res.Cas = binary.BigEndian.Uint64(hdrBytes[16:24]) - - bodyLen := int(binary.BigEndian.Uint32(hdrBytes[8:12])) - (klen + elen) - - //defer function to debug the panic seen with MB-15557 - defer func() { - if e := recover(); e != nil { - err = fmt.Errorf(`Panic in Receive. Response %v \n - key len %v extra len %v bodylen %v`, res, klen, elen, bodyLen) - } - }() - - bufNeed := klen + elen + bodyLen - if buf != nil && cap(buf) >= bufNeed { - buf = buf[0:bufNeed] - } else { - buf = make([]byte, bufNeed) - } - - m, err := io.ReadFull(r, buf) - if err == nil { - res.Extras = buf[0:elen] - res.Key = buf[elen : klen+elen] - res.Body = buf[klen+elen:] - } - - return n + m, err -} - -type MCResponsePool struct { - pool *sync.Pool -} - -func NewMCResponsePool() *MCResponsePool { - rv := &MCResponsePool{ - pool: &sync.Pool{ - New: func() interface{} { - return &MCResponse{} - }, - }, - } - - return rv -} - -func (this *MCResponsePool) Get() *MCResponse { - return this.pool.Get().(*MCResponse) -} - -func (this *MCResponsePool) Put(r *MCResponse) { - if r == nil { - return - } - - r.Extras = nil - r.Key = nil - r.Body = nil - r.Fatal = false - - this.pool.Put(r) -} - -type StringMCResponsePool struct { - pool *sync.Pool - size int -} - -func NewStringMCResponsePool(size int) *StringMCResponsePool { - rv := &StringMCResponsePool{ - pool: &sync.Pool{ - New: func() interface{} { - return make(map[string]*MCResponse, size) - }, - }, - size: size, - } - - return rv -} - -func (this *StringMCResponsePool) Get() map[string]*MCResponse { - return this.pool.Get().(map[string]*MCResponse) -} - -func (this *StringMCResponsePool) Put(m map[string]*MCResponse) { - if m == nil || len(m) > 2*this.size { - return - } - - for k := range m { - m[k] = nil - delete(m, k) - } - - this.pool.Put(m) -} diff --git a/vendor/github.com/couchbase/gomemcached/tap.go b/vendor/github.com/couchbase/gomemcached/tap.go deleted file mode 100644 index e48623281b..0000000000 --- a/vendor/github.com/couchbase/gomemcached/tap.go +++ /dev/null @@ -1,168 +0,0 @@ -package gomemcached - -import ( - "bytes" - "encoding/binary" - "fmt" - "io" - "io/ioutil" - "strings" -) - -type TapConnectFlag uint32 - -// Tap connect option flags -const ( - BACKFILL = TapConnectFlag(0x01) - DUMP = TapConnectFlag(0x02) - LIST_VBUCKETS = TapConnectFlag(0x04) - TAKEOVER_VBUCKETS = TapConnectFlag(0x08) - SUPPORT_ACK = TapConnectFlag(0x10) - REQUEST_KEYS_ONLY = TapConnectFlag(0x20) - CHECKPOINT = TapConnectFlag(0x40) - REGISTERED_CLIENT = TapConnectFlag(0x80) - FIX_FLAG_BYTEORDER = TapConnectFlag(0x100) -) - -// Tap opaque event subtypes -const ( - TAP_OPAQUE_ENABLE_AUTO_NACK = 0 - TAP_OPAQUE_INITIAL_VBUCKET_STREAM = 1 - TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC = 2 - TAP_OPAQUE_CLOSE_TAP_STREAM = 7 - TAP_OPAQUE_CLOSE_BACKFILL = 8 -) - -// Tap item flags -const ( - TAP_ACK = 1 - TAP_NO_VALUE = 2 - TAP_FLAG_NETWORK_BYTE_ORDER = 4 -) - -// TapConnectFlagNames for TapConnectFlag -var TapConnectFlagNames = map[TapConnectFlag]string{ - BACKFILL: "BACKFILL", - DUMP: "DUMP", - LIST_VBUCKETS: "LIST_VBUCKETS", - TAKEOVER_VBUCKETS: "TAKEOVER_VBUCKETS", - SUPPORT_ACK: "SUPPORT_ACK", - REQUEST_KEYS_ONLY: "REQUEST_KEYS_ONLY", - CHECKPOINT: "CHECKPOINT", - REGISTERED_CLIENT: "REGISTERED_CLIENT", - FIX_FLAG_BYTEORDER: "FIX_FLAG_BYTEORDER", -} - -// TapItemParser is a function to parse a single tap extra. -type TapItemParser func(io.Reader) (interface{}, error) - -// TapParseUint64 is a function to parse a single tap uint64. -func TapParseUint64(r io.Reader) (interface{}, error) { - var rv uint64 - err := binary.Read(r, binary.BigEndian, &rv) - return rv, err -} - -// TapParseUint16 is a function to parse a single tap uint16. -func TapParseUint16(r io.Reader) (interface{}, error) { - var rv uint16 - err := binary.Read(r, binary.BigEndian, &rv) - return rv, err -} - -// TapParseBool is a function to parse a single tap boolean. -func TapParseBool(r io.Reader) (interface{}, error) { - return true, nil -} - -// TapParseVBList parses a list of vBucket numbers as []uint16. -func TapParseVBList(r io.Reader) (interface{}, error) { - num, err := TapParseUint16(r) - if err != nil { - return nil, err - } - n := int(num.(uint16)) - - rv := make([]uint16, n) - for i := 0; i < n; i++ { - x, err := TapParseUint16(r) - if err != nil { - return nil, err - } - rv[i] = x.(uint16) - } - - return rv, err -} - -// TapFlagParsers parser functions for TAP fields. -var TapFlagParsers = map[TapConnectFlag]TapItemParser{ - BACKFILL: TapParseUint64, - LIST_VBUCKETS: TapParseVBList, -} - -// SplitFlags will split the ORed flags into the individual bit flags. -func (f TapConnectFlag) SplitFlags() []TapConnectFlag { - rv := []TapConnectFlag{} - for i := uint32(1); f != 0; i = i << 1 { - if uint32(f)&i == i { - rv = append(rv, TapConnectFlag(i)) - } - f = TapConnectFlag(uint32(f) & (^i)) - } - return rv -} - -func (f TapConnectFlag) String() string { - parts := []string{} - for _, x := range f.SplitFlags() { - p := TapConnectFlagNames[x] - if p == "" { - p = fmt.Sprintf("0x%x", int(x)) - } - parts = append(parts, p) - } - return strings.Join(parts, "|") -} - -type TapConnect struct { - Flags map[TapConnectFlag]interface{} - RemainingBody []byte - Name string -} - -// ParseTapCommands parse the tap request into the interesting bits we may -// need to do something with. -func (req *MCRequest) ParseTapCommands() (TapConnect, error) { - rv := TapConnect{ - Flags: map[TapConnectFlag]interface{}{}, - Name: string(req.Key), - } - - if len(req.Extras) < 4 { - return rv, fmt.Errorf("not enough extra bytes: %x", req.Extras) - } - - flags := TapConnectFlag(binary.BigEndian.Uint32(req.Extras)) - - r := bytes.NewReader(req.Body) - - for _, f := range flags.SplitFlags() { - fun := TapFlagParsers[f] - if fun == nil { - fun = TapParseBool - } - - val, err := fun(r) - if err != nil { - return rv, err - } - - rv.Flags[f] = val - } - - var err error - rv.RemainingBody, err = ioutil.ReadAll(r) - - return rv, err -} |