summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/couchbase/gomemcached
diff options
context:
space:
mode:
authortechknowlogick <techknowlogick@gitea.io>2022-01-14 18:16:05 -0500
committerGitHub <noreply@github.com>2022-01-14 18:16:05 -0500
commit84145e45c50130922fae9055535ab5ea0378e1d4 (patch)
treefce077a5ae462840bb876ace79aca42abab29ed7 /vendor/github.com/couchbase/gomemcached
parent2b16ca7c773de278ba01f122dc6f9f43d7534c52 (diff)
downloadgitea-84145e45c50130922fae9055535ab5ea0378e1d4.tar.gz
gitea-84145e45c50130922fae9055535ab5ea0378e1d4.zip
Remove golang vendored directory (#18277)
* rm go vendor * fix drone yaml * add to gitignore
Diffstat (limited to 'vendor/github.com/couchbase/gomemcached')
-rw-r--r--vendor/github.com/couchbase/gomemcached/.gitignore7
-rw-r--r--vendor/github.com/couchbase/gomemcached/LICENSE19
-rw-r--r--vendor/github.com/couchbase/gomemcached/README.markdown32
-rw-r--r--vendor/github.com/couchbase/gomemcached/client/collections_filter.go130
-rw-r--r--vendor/github.com/couchbase/gomemcached/client/mc.go1515
-rw-r--r--vendor/github.com/couchbase/gomemcached/client/tap_feed.go333
-rw-r--r--vendor/github.com/couchbase/gomemcached/client/transport.go67
-rw-r--r--vendor/github.com/couchbase/gomemcached/client/upr_event.go403
-rw-r--r--vendor/github.com/couchbase/gomemcached/client/upr_feed.go1132
-rw-r--r--vendor/github.com/couchbase/gomemcached/flexibleFraming.go398
-rw-r--r--vendor/github.com/couchbase/gomemcached/go.mod3
-rw-r--r--vendor/github.com/couchbase/gomemcached/mc_constants.go364
-rw-r--r--vendor/github.com/couchbase/gomemcached/mc_req.go656
-rw-r--r--vendor/github.com/couchbase/gomemcached/mc_res.go280
-rw-r--r--vendor/github.com/couchbase/gomemcached/tap.go168
15 files changed, 0 insertions, 5507 deletions
diff --git a/vendor/github.com/couchbase/gomemcached/.gitignore b/vendor/github.com/couchbase/gomemcached/.gitignore
deleted file mode 100644
index cd8acba17e..0000000000
--- a/vendor/github.com/couchbase/gomemcached/.gitignore
+++ /dev/null
@@ -1,7 +0,0 @@
-#*
-*.[68]
-*~
-*.swp
-/gocache/gocache
-c.out
-.idea \ No newline at end of file
diff --git a/vendor/github.com/couchbase/gomemcached/LICENSE b/vendor/github.com/couchbase/gomemcached/LICENSE
deleted file mode 100644
index b01ef80261..0000000000
--- a/vendor/github.com/couchbase/gomemcached/LICENSE
+++ /dev/null
@@ -1,19 +0,0 @@
-Copyright (c) 2013 Dustin Sallings
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in
-all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-THE SOFTWARE.
diff --git a/vendor/github.com/couchbase/gomemcached/README.markdown b/vendor/github.com/couchbase/gomemcached/README.markdown
deleted file mode 100644
index 5e9b2de5be..0000000000
--- a/vendor/github.com/couchbase/gomemcached/README.markdown
+++ /dev/null
@@ -1,32 +0,0 @@
-# gomemcached
-
-This is a memcached binary protocol toolkit in [go][go].
-
-It provides client and server functionality as well as a little sample
-server showing how I might make a server if I valued purity over
-performance.
-
-## Server Design
-
-<div>
- <img src="http://dustin.github.com/images/gomemcached.png"
- alt="overview" style="float: right"/>
-</div>
-
-The basic design can be seen in [gocache]. A [storage
-server][storage] is run as a goroutine that receives a `MCRequest` on
-a channel, and then issues an `MCResponse` to a channel contained
-within the request.
-
-Each connection is a separate goroutine, of course, and is responsible
-for all IO for that connection until the connection drops or the
-`dataServer` decides it's stupid and sends a fatal response back over
-the channel.
-
-There is currently no work at all in making the thing perform (there
-are specific areas I know need work). This is just my attempt to
-learn the language somewhat.
-
-[go]: http://golang.org/
-[gocache]: gomemcached/blob/master/gocache/gocache.go
-[storage]: gomemcached/blob/master/gocache/mc_storage.go
diff --git a/vendor/github.com/couchbase/gomemcached/client/collections_filter.go b/vendor/github.com/couchbase/gomemcached/client/collections_filter.go
deleted file mode 100644
index a34d353fec..0000000000
--- a/vendor/github.com/couchbase/gomemcached/client/collections_filter.go
+++ /dev/null
@@ -1,130 +0,0 @@
-package memcached
-
-import (
- "encoding/json"
- "fmt"
-)
-
-// Collection based filter
-type CollectionsFilter struct {
- ManifestUid uint64
- UseManifestUid bool
- StreamId uint16
- UseStreamId bool
-
- // Use either ScopeId OR CollectionsList, not both
- CollectionsList []uint32
- ScopeId uint32
-}
-
-type nonStreamIdNonCollectionsMeta struct {
- ManifestId string `json:"uid"`
-}
-
-type nonStreamIdNonResumeCollectionsMeta struct {
- CollectionsList []string `json:"collections"`
-}
-
-type nonStreamIdCollectionsMeta struct {
- ManifestId string `json:"uid"`
- CollectionsList []string `json:"collections"`
-}
-
-type streamIdNonResumeCollectionsMeta struct {
- CollectionsList []string `json:"collections"`
- StreamId uint16 `json:"sid"`
-}
-
-type streamIdNonResumeScopeMeta struct {
- ScopeId string `json:"scope"`
- StreamId uint16 `json:"sid"`
-}
-
-func (c *CollectionsFilter) IsValid() error {
- if c.UseManifestUid && c.UseStreamId {
- return fmt.Errorf("Not implemented yet")
- }
-
- if len(c.CollectionsList) > 0 && c.ScopeId > 0 {
- return fmt.Errorf("Collection list is specified but scope ID is also specified")
- }
-
- return nil
-}
-
-func (c *CollectionsFilter) outputCollectionsFilterColList() (outputList []string) {
- for _, collectionUint := range c.CollectionsList {
- outputList = append(outputList, fmt.Sprintf("%x", collectionUint))
- }
- return
-}
-
-func (c *CollectionsFilter) outputScopeId() string {
- return fmt.Sprintf("%x", c.ScopeId)
-}
-
-func (c *CollectionsFilter) ToStreamReqBody() ([]byte, error) {
- if err := c.IsValid(); err != nil {
- return nil, err
- }
-
- var output interface{}
-
- switch c.UseStreamId {
- case true:
- switch c.UseManifestUid {
- case true:
- // TODO
- return nil, fmt.Errorf("NotImplemented0")
- case false:
- switch len(c.CollectionsList) > 0 {
- case true:
- filter := &streamIdNonResumeCollectionsMeta{
- StreamId: c.StreamId,
- CollectionsList: c.outputCollectionsFilterColList(),
- }
- output = *filter
- case false:
- filter := &streamIdNonResumeScopeMeta{
- StreamId: c.StreamId,
- ScopeId: c.outputScopeId(),
- }
- output = *filter
- }
- }
- case false:
- switch c.UseManifestUid {
- case true:
- switch len(c.CollectionsList) > 0 {
- case true:
- filter := &nonStreamIdCollectionsMeta{
- ManifestId: fmt.Sprintf("%x", c.ManifestUid),
- CollectionsList: c.outputCollectionsFilterColList(),
- }
- output = *filter
- case false:
- filter := &nonStreamIdNonCollectionsMeta{
- ManifestId: fmt.Sprintf("%x", c.ManifestUid),
- }
- output = *filter
- }
- case false:
- switch len(c.CollectionsList) > 0 {
- case true:
- filter := &nonStreamIdNonResumeCollectionsMeta{
- CollectionsList: c.outputCollectionsFilterColList(),
- }
- output = *filter
- case false:
- return nil, fmt.Errorf("Specifying scopeID must require the use of streamId")
- }
- }
- }
-
- data, err := json.Marshal(output)
- if err != nil {
- return nil, err
- } else {
- return data, nil
- }
-}
diff --git a/vendor/github.com/couchbase/gomemcached/client/mc.go b/vendor/github.com/couchbase/gomemcached/client/mc.go
deleted file mode 100644
index 3dc121da5d..0000000000
--- a/vendor/github.com/couchbase/gomemcached/client/mc.go
+++ /dev/null
@@ -1,1515 +0,0 @@
-// Package memcached provides a memcached binary protocol client.
-package memcached
-
-import (
- "crypto/tls"
- "encoding/binary"
- "fmt"
- "github.com/couchbase/gomemcached"
- "github.com/couchbase/goutils/logging"
- "github.com/couchbase/goutils/scramsha"
- "github.com/pkg/errors"
- "io"
- "math"
- "net"
- "strings"
- "sync"
- "sync/atomic"
- "time"
-)
-
-type ClientIface interface {
- Add(vb uint16, key string, flags int, exp int, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error)
- Append(vb uint16, key string, data []byte, context ...*ClientContext) (*gomemcached.MCResponse, error)
- Auth(user, pass string) (*gomemcached.MCResponse, error)
- AuthList() (*gomemcached.MCResponse, error)
- AuthPlain(user, pass string) (*gomemcached.MCResponse, error)
- AuthScramSha(user, pass string) (*gomemcached.MCResponse, error)
- CASNext(vb uint16, k string, exp int, state *CASState) bool
- CAS(vb uint16, k string, f CasFunc, initexp int) (*gomemcached.MCResponse, error)
- CollectionsGetCID(scope string, collection string) (*gomemcached.MCResponse, error)
- CollectionEnabled() bool
- Close() error
- Decr(vb uint16, key string, amt, def uint64, exp int, context ...*ClientContext) (uint64, error)
- Del(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error)
- EnableMutationToken() (*gomemcached.MCResponse, error)
- EnableFeatures(features Features) (*gomemcached.MCResponse, error)
- Get(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error)
- GetAllVbSeqnos(vbSeqnoMap map[uint16]uint64, context ...*ClientContext) (map[uint16]uint64, error)
- GetAndTouch(vb uint16, key string, exp int, context ...*ClientContext) (*gomemcached.MCResponse, error)
- GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string, context ...*ClientContext) error
- GetCollectionsManifest() (*gomemcached.MCResponse, error)
- GetMeta(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error)
- GetRandomDoc(context ...*ClientContext) (*gomemcached.MCResponse, error)
- GetSubdoc(vb uint16, key string, subPaths []string, context ...*ClientContext) (*gomemcached.MCResponse, error)
- Hijack() io.ReadWriteCloser
- Incr(vb uint16, key string, amt, def uint64, exp int, context ...*ClientContext) (uint64, error)
- LastBucket() string
- Observe(vb uint16, key string) (result ObserveResult, err error)
- ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error)
- Receive() (*gomemcached.MCResponse, error)
- ReceiveWithDeadline(deadline time.Time) (*gomemcached.MCResponse, error)
- Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error)
- Set(vb uint16, key string, flags int, exp int, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error)
- SetKeepAliveOptions(interval time.Duration)
- SetReadDeadline(t time.Time)
- SetDeadline(t time.Time)
- SelectBucket(bucket string) (*gomemcached.MCResponse, error)
- SetCas(vb uint16, key string, flags int, exp int, cas uint64, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error)
- Stats(key string) ([]StatValue, error)
- StatsFunc(key string, fn func(key, val []byte)) error
- StatsMap(key string) (map[string]string, error)
- StatsMapForSpecifiedStats(key string, statsMap map[string]string) error
- Transmit(req *gomemcached.MCRequest) error
- TransmitWithDeadline(req *gomemcached.MCRequest, deadline time.Time) error
- TransmitResponse(res *gomemcached.MCResponse) error
- UprGetFailoverLog(vb []uint16) (map[uint16]*FailoverLog, error)
-
- // UprFeed Related
- NewUprFeed() (*UprFeed, error)
- NewUprFeedIface() (UprFeedIface, error)
- NewUprFeedWithConfig(ackByClient bool) (*UprFeed, error)
- NewUprFeedWithConfigIface(ackByClient bool) (UprFeedIface, error)
-}
-
-type ClientContext struct {
- // Collection-based context
- CollId uint32
-
- // Impersonate context
- User string
-
- // VB-state related context
- // nil means not used in this context
- VbState *VbStateType
-}
-
-type VbStateType uint8
-
-const (
- VbAlive VbStateType = 0x00
- VbActive VbStateType = 0x01
- VbReplica VbStateType = 0x02
- VbPending VbStateType = 0x03
- VbDead VbStateType = 0x04
-)
-
-func (context *ClientContext) InitExtras(req *gomemcached.MCRequest, client *Client) {
- if req == nil || client == nil {
- return
- }
-
- var bytesToAllocate int
- switch req.Opcode {
- case gomemcached.GET_ALL_VB_SEQNOS:
- if context.VbState != nil {
- bytesToAllocate += 4
- }
- if client.CollectionEnabled() {
- if context.VbState == nil {
- bytesToAllocate += 8
- } else {
- bytesToAllocate += 4
- }
- }
- }
- if bytesToAllocate > 0 {
- req.Extras = make([]byte, bytesToAllocate)
- }
-}
-
-const bufsize = 1024
-
-var UnHealthy uint32 = 0
-var Healthy uint32 = 1
-
-type Features []Feature
-type Feature uint16
-
-const FeatureTcpNoDelay = Feature(0x03)
-const FeatureMutationToken = Feature(0x04) // XATTR bit in data type field with dcp mutations
-const FeatureXattr = Feature(0x06)
-const FeatureXerror = Feature(0x07)
-const FeatureCollections = Feature(0x12)
-const FeatureSnappyCompression = Feature(0x0a)
-const FeatureDataType = Feature(0x0b)
-
-type memcachedConnection interface {
- io.ReadWriteCloser
-
- SetReadDeadline(time.Time) error
- SetDeadline(time.Time) error
-}
-
-// The Client itself.
-type Client struct {
- conn memcachedConnection
- // use uint32 type so that it can be accessed through atomic APIs
- healthy uint32
- opaque uint32
-
- hdrBuf []byte
-
- collectionsEnabled uint32
- deadline time.Time
- bucket string
-}
-
-var (
- DefaultDialTimeout = time.Duration(0) // No timeout
-
- DefaultWriteTimeout = time.Duration(0) // No timeout
-
- dialFun = func(prot, dest string) (net.Conn, error) {
- return net.DialTimeout(prot, dest, DefaultDialTimeout)
- }
-)
-
-// Connect to a memcached server.
-func Connect(prot, dest string) (rv *Client, err error) {
- conn, err := dialFun(prot, dest)
- if err != nil {
- return nil, err
- }
- return Wrap(conn)
-}
-
-// Connect to a memcached server using TLS.
-func ConnectTLS(prot, dest string, config *tls.Config) (rv *Client, err error) {
- conn, err := tls.Dial(prot, dest, config)
- if err != nil {
- return nil, err
- }
- return Wrap(conn)
-}
-
-func SetDefaultTimeouts(dial, read, write time.Duration) {
- DefaultDialTimeout = dial
- DefaultWriteTimeout = write
-}
-
-func SetDefaultDialTimeout(dial time.Duration) {
- DefaultDialTimeout = dial
-}
-
-func (c *Client) SetKeepAliveOptions(interval time.Duration) {
- tcpConn, ok := c.conn.(*net.TCPConn)
- if ok {
- tcpConn.SetKeepAlive(true)
- tcpConn.SetKeepAlivePeriod(interval)
- }
-}
-
-func (c *Client) SetReadDeadline(t time.Time) {
- c.conn.SetReadDeadline(t)
-}
-
-func (c *Client) SetDeadline(t time.Time) {
- if t.Equal(c.deadline) {
- return
- }
- c.conn.SetDeadline(t)
- c.deadline = t
-}
-
-func (c *Client) getOpaque() uint32 {
- if c.opaque >= math.MaxInt32 {
- c.opaque = uint32(1)
- }
- return c.opaque + 1
-}
-
-// Wrap an existing transport.
-func Wrap(conn memcachedConnection) (rv *Client, err error) {
- client := &Client{
- conn: conn,
- hdrBuf: make([]byte, gomemcached.HDR_LEN),
- opaque: uint32(1),
- }
- client.setHealthy(true)
- return client, nil
-}
-
-// Close the connection when you're done.
-func (c *Client) Close() error {
- return c.conn.Close()
-}
-
-// IsHealthy returns true unless the client is belived to have
-// difficulty communicating to its server.
-//
-// This is useful for connection pools where we want to
-// non-destructively determine that a connection may be reused.
-func (c Client) IsHealthy() bool {
- healthyState := atomic.LoadUint32(&c.healthy)
- return healthyState == Healthy
-}
-
-// Send a custom request and get the response.
-func (c *Client) Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error) {
- err = c.Transmit(req)
- if err != nil {
- return
- }
- resp, _, err := getResponse(c.conn, c.hdrBuf)
- c.setHealthy(!gomemcached.IsFatal(err))
- return resp, err
-}
-
-// Transmit send a request, but does not wait for a response.
-func (c *Client) Transmit(req *gomemcached.MCRequest) error {
- if DefaultWriteTimeout > 0 {
- c.conn.(net.Conn).SetWriteDeadline(time.Now().Add(DefaultWriteTimeout))
- }
- _, err := transmitRequest(c.conn, req)
- // clear write deadline to avoid interference with future write operations
- if DefaultWriteTimeout > 0 {
- c.conn.(net.Conn).SetWriteDeadline(time.Time{})
- }
- if err != nil {
- c.setHealthy(false)
- }
- return err
-}
-
-func (c *Client) TransmitWithDeadline(req *gomemcached.MCRequest, deadline time.Time) error {
- c.conn.(net.Conn).SetWriteDeadline(deadline)
-
- _, err := transmitRequest(c.conn, req)
-
- // clear write deadline to avoid interference with future write operations
- c.conn.(net.Conn).SetWriteDeadline(time.Time{})
-
- if err != nil {
- c.setHealthy(false)
- }
- return err
-}
-
-// TransmitResponse send a response, does not wait.
-func (c *Client) TransmitResponse(res *gomemcached.MCResponse) error {
- if DefaultWriteTimeout > 0 {
- c.conn.(net.Conn).SetWriteDeadline(time.Now().Add(DefaultWriteTimeout))
- }
- _, err := transmitResponse(c.conn, res)
- // clear write deadline to avoid interference with future write operations
- if DefaultWriteTimeout > 0 {
- c.conn.(net.Conn).SetWriteDeadline(time.Time{})
- }
- if err != nil {
- c.setHealthy(false)
- }
- return err
-}
-
-// Receive a response
-func (c *Client) Receive() (*gomemcached.MCResponse, error) {
- resp, _, err := getResponse(c.conn, c.hdrBuf)
- if err != nil && resp.Status != gomemcached.KEY_ENOENT && resp.Status != gomemcached.EBUSY {
- c.setHealthy(false)
- }
- return resp, err
-}
-
-func (c *Client) ReceiveWithDeadline(deadline time.Time) (*gomemcached.MCResponse, error) {
- c.conn.(net.Conn).SetReadDeadline(deadline)
-
- resp, _, err := getResponse(c.conn, c.hdrBuf)
-
- // Clear read deadline to avoid interference with future read operations.
- c.conn.(net.Conn).SetReadDeadline(time.Time{})
-
- if err != nil && resp.Status != gomemcached.KEY_ENOENT && resp.Status != gomemcached.EBUSY {
- c.setHealthy(false)
- }
- return resp, err
-}
-
-func appendMutationToken(bytes []byte) []byte {
- bytes = append(bytes, 0, 0)
- binary.BigEndian.PutUint16(bytes[len(bytes)-2:], uint16(0x04))
- return bytes
-}
-
-//Send a hello command to enable MutationTokens
-func (c *Client) EnableMutationToken() (*gomemcached.MCResponse, error) {
- var payload []byte
- payload = appendMutationToken(payload)
-
- return c.Send(&gomemcached.MCRequest{
- Opcode: gomemcached.HELLO,
- Key: []byte("GoMemcached"),
- Body: payload,
- })
-
-}
-
-//Send a hello command to enable specific features
-func (c *Client) EnableFeatures(features Features) (*gomemcached.MCResponse, error) {
- var payload []byte
- collectionsEnabled := 0
-
- for _, feature := range features {
- if feature == FeatureCollections {
- collectionsEnabled = 1
- }
- payload = append(payload, 0, 0)
- binary.BigEndian.PutUint16(payload[len(payload)-2:], uint16(feature))
- }
-
- rv, err := c.Send(&gomemcached.MCRequest{
- Opcode: gomemcached.HELLO,
- Key: []byte("GoMemcached"),
- Body: payload,
- })
-
- if err == nil && collectionsEnabled != 0 {
- atomic.StoreUint32(&c.collectionsEnabled, uint32(collectionsEnabled))
- }
- return rv, err
-}
-
-// Sets collection and user info for a request
-func (c *Client) setContext(req *gomemcached.MCRequest, context ...*ClientContext) error {
- req.CollIdLen = 0
- req.UserLen = 0
- collectionId := uint32(0)
- if len(context) > 0 {
- collectionId = context[0].CollId
- uLen := len(context[0].User)
- if uLen > 0 {
- if uLen > gomemcached.MAX_USER_LEN {
- uLen = gomemcached.MAX_USER_LEN
- }
- req.UserLen = uLen
- copy(req.Username[:uLen], context[0].User)
- }
- }
-
- // if the optional collection is specified, it must be default for clients that haven't turned on collections
- if atomic.LoadUint32(&c.collectionsEnabled) == 0 {
- if collectionId != 0 {
- return fmt.Errorf("Client does not use collections but a collection was specified")
- }
- } else {
- req.CollIdLen = binary.PutUvarint(req.CollId[:], uint64(collectionId))
- }
- return nil
-}
-
-// Sets collection info in extras
-func (c *Client) setExtrasContext(req *gomemcached.MCRequest, context ...*ClientContext) error {
- collectionId := uint32(0)
- req.UserLen = 0
- if len(context) > 0 {
- collectionId = context[0].CollId
- uLen := len(context[0].User)
- if uLen > 0 {
- req.UserLen = uLen
- copy(req.Username[:], context[0].User)
- }
- }
-
- // if the optional collection is specified, it must be default for clients that haven't turned on collections
- if atomic.LoadUint32(&c.collectionsEnabled) == 0 {
- if collectionId != 0 {
- return fmt.Errorf("Client does not use collections but a collection was specified")
- }
- } else {
- req.Extras = make([]byte, 4)
- binary.BigEndian.PutUint32(req.Extras, collectionId)
- }
- return nil
-}
-
-func (c *Client) setVbSeqnoContext(req *gomemcached.MCRequest, context ...*ClientContext) error {
- if len(context) == 0 || req == nil {
- return nil
- }
-
- switch req.Opcode {
- case gomemcached.GET_ALL_VB_SEQNOS:
- if len(context) == 0 {
- return nil
- }
-
- if len(req.Extras) == 0 {
- context[0].InitExtras(req, c)
- }
- if context[0].VbState != nil {
- binary.BigEndian.PutUint32(req.Extras, uint32(*(context[0].VbState)))
- }
- if c.CollectionEnabled() {
- binary.BigEndian.PutUint32(req.Extras[4:8], context[0].CollId)
- }
- return nil
- default:
- return fmt.Errorf("setVbState Not supported for opcode: %v", req.Opcode.String())
- }
-}
-
-// Get the value for a key.
-func (c *Client) Get(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error) {
- req := &gomemcached.MCRequest{
- Opcode: gomemcached.GET,
- VBucket: vb,
- Key: []byte(key),
- Opaque: c.getOpaque(),
- }
- err := c.setContext(req, context...)
- if err != nil {
- return nil, err
- }
- return c.Send(req)
-}
-
-// Get the xattrs, doc value for the input key
-func (c *Client) GetSubdoc(vb uint16, key string, subPaths []string, context ...*ClientContext) (*gomemcached.MCResponse, error) {
- extraBuf, valueBuf := GetSubDocVal(subPaths)
- req := &gomemcached.MCRequest{
- Opcode: gomemcached.SUBDOC_MULTI_LOOKUP,
- VBucket: vb,
- Key: []byte(key),
- Extras: extraBuf,
- Body: valueBuf,
- Opaque: c.getOpaque(),
- }
- err := c.setContext(req, context...)
- if err != nil {
- return nil, err
- }
-
- res, err := c.Send(req)
-
- if err != nil && IfResStatusError(res) {
- return res, err
- }
- return res, nil
-}
-
-// Retrieve the collections manifest.
-func (c *Client) GetCollectionsManifest() (*gomemcached.MCResponse, error) {
-
- res, err := c.Send(&gomemcached.MCRequest{
- Opcode: gomemcached.GET_COLLECTIONS_MANIFEST,
- Opaque: c.getOpaque(),
- })
-
- if err != nil && IfResStatusError(res) {
- return res, err
- }
- return res, nil
-}
-
-// Retrieve the collections manifest.
-func (c *Client) CollectionsGetCID(scope string, collection string) (*gomemcached.MCResponse, error) {
-
- res, err := c.Send(&gomemcached.MCRequest{
- Opcode: gomemcached.COLLECTIONS_GET_CID,
- Key: []byte(scope + "." + collection),
- Opaque: c.getOpaque(),
- })
-
- if err != nil && IfResStatusError(res) {
- return res, err
- }
- return res, nil
-}
-
-func (c *Client) CollectionEnabled() bool {
- return atomic.LoadUint32(&c.collectionsEnabled) > 0
-}
-
-// Get the value for a key, and update expiry
-func (c *Client) GetAndTouch(vb uint16, key string, exp int, context ...*ClientContext) (*gomemcached.MCResponse, error) {
- extraBuf := make([]byte, 4)
- binary.BigEndian.PutUint32(extraBuf[0:], uint32(exp))
- req := &gomemcached.MCRequest{
- Opcode: gomemcached.GAT,
- VBucket: vb,
- Key: []byte(key),
- Extras: extraBuf,
- Opaque: c.getOpaque(),
- }
- err := c.setContext(req, context...)
- if err != nil {
- return nil, err
- }
- return c.Send(req)
-}
-
-// Get metadata for a key
-func (c *Client) GetMeta(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error) {
- req := &gomemcached.MCRequest{
- Opcode: gomemcached.GET_META,
- VBucket: vb,
- Key: []byte(key),
- Opaque: c.getOpaque(),
- }
- err := c.setContext(req, context...)
- if err != nil {
- return nil, err
- }
- return c.Send(req)
-}
-
-// Del deletes a key.
-func (c *Client) Del(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error) {
- req := &gomemcached.MCRequest{
- Opcode: gomemcached.DELETE,
- VBucket: vb,
- Key: []byte(key),
- Opaque: c.getOpaque(),
- }
- err := c.setContext(req, context...)
- if err != nil {
- return nil, err
- }
- return c.Send(req)
-}
-
-// Get a random document
-func (c *Client) GetRandomDoc(context ...*ClientContext) (*gomemcached.MCResponse, error) {
- req := &gomemcached.MCRequest{
- Opcode: 0xB6,
- Opaque: c.getOpaque(),
- }
- err := c.setExtrasContext(req, context...)
- if err != nil {
- return nil, err
- }
- return c.Send(req)
-}
-
-// AuthList lists SASL auth mechanisms.
-func (c *Client) AuthList() (*gomemcached.MCResponse, error) {
- return c.Send(&gomemcached.MCRequest{
- Opcode: gomemcached.SASL_LIST_MECHS})
-}
-
-// Auth performs SASL PLAIN authentication against the server.
-func (c *Client) Auth(user, pass string) (*gomemcached.MCResponse, error) {
- res, err := c.AuthList()
-
- if err != nil {
- return res, err
- }
-
- authMech := string(res.Body)
- if strings.Index(authMech, "PLAIN") != -1 {
- return c.AuthPlain(user, pass)
- }
- return nil, fmt.Errorf("auth mechanism PLAIN not supported")
-}
-
-// AuthScramSha performs SCRAM-SHA authentication against the server.
-func (c *Client) AuthScramSha(user, pass string) (*gomemcached.MCResponse, error) {
- res, err := c.AuthList()
- if err != nil {
- return nil, errors.Wrap(err, "Unable to obtain list of methods.")
- }
-
- methods := string(res.Body)
- method, err := scramsha.BestMethod(methods)
- if err != nil {
- return nil, errors.Wrap(err,
- "Unable to select SCRAM-SHA method.")
- }
-
- s, err := scramsha.NewScramSha(method)
- if err != nil {
- return nil, errors.Wrap(err, "Unable to initialize scramsha.")
- }
-
- logging.Infof("Using %v authentication for user %v%v%v", method, gomemcached.UdTagBegin, user, gomemcached.UdTagEnd)
-
- message, err := s.GetStartRequest(user)
- if err != nil {
- return nil, errors.Wrapf(err,
- "Error building start request for user %s.", user)
- }
-
- startRequest := &gomemcached.MCRequest{
- Opcode: 0x21,
- Key: []byte(method),
- Body: []byte(message)}
-
- startResponse, err := c.Send(startRequest)
- if err != nil {
- return nil, errors.Wrap(err, "Error sending start request.")
- }
-
- err = s.HandleStartResponse(string(startResponse.Body))
- if err != nil {
- return nil, errors.Wrap(err, "Error handling start response.")
- }
-
- message = s.GetFinalRequest(pass)
-
- // send step request
- finalRequest := &gomemcached.MCRequest{
- Opcode: 0x22,
- Key: []byte(method),
- Body: []byte(message)}
- finalResponse, err := c.Send(finalRequest)
- if err != nil {
- return nil, errors.Wrap(err, "Error sending final request.")
- }
-
- err = s.HandleFinalResponse(string(finalResponse.Body))
- if err != nil {
- return nil, errors.Wrap(err, "Error handling final response.")
- }
-
- return finalResponse, nil
-}
-
-func (c *Client) AuthPlain(user, pass string) (*gomemcached.MCResponse, error) {
- logging.Infof("Using plain authentication for user %v%v%v", gomemcached.UdTagBegin, user, gomemcached.UdTagEnd)
- return c.Send(&gomemcached.MCRequest{
- Opcode: gomemcached.SASL_AUTH,
- Key: []byte("PLAIN"),
- Body: []byte(fmt.Sprintf("\x00%s\x00%s", user, pass))})
-}
-
-// select bucket
-func (c *Client) SelectBucket(bucket string) (*gomemcached.MCResponse, error) {
- res, err := c.Send(&gomemcached.MCRequest{
- Opcode: gomemcached.SELECT_BUCKET,
- Key: []byte(bucket)})
- if res != nil {
- c.bucket = bucket
- }
- return res, err
-}
-
-func (c *Client) LastBucket() string {
- return c.bucket
-}
-
-func (c *Client) store(opcode gomemcached.CommandCode, vb uint16,
- key string, flags int, exp int, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) {
- req := &gomemcached.MCRequest{
- Opcode: opcode,
- VBucket: vb,
- Key: []byte(key),
- Cas: 0,
- Opaque: c.getOpaque(),
- Extras: []byte{0, 0, 0, 0, 0, 0, 0, 0},
- Body: body}
-
- err := c.setContext(req, context...)
- if err != nil {
- return nil, err
- }
- binary.BigEndian.PutUint64(req.Extras, uint64(flags)<<32|uint64(exp))
- return c.Send(req)
-}
-
-func (c *Client) storeCas(opcode gomemcached.CommandCode, vb uint16,
- key string, flags int, exp int, cas uint64, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) {
- req := &gomemcached.MCRequest{
- Opcode: opcode,
- VBucket: vb,
- Key: []byte(key),
- Cas: cas,
- Opaque: c.getOpaque(),
- Extras: []byte{0, 0, 0, 0, 0, 0, 0, 0},
- Body: body}
-
- err := c.setContext(req, context...)
- if err != nil {
- return nil, err
- }
-
- binary.BigEndian.PutUint64(req.Extras, uint64(flags)<<32|uint64(exp))
- return c.Send(req)
-}
-
-// Incr increments the value at the given key.
-func (c *Client) Incr(vb uint16, key string,
- amt, def uint64, exp int, context ...*ClientContext) (uint64, error) {
- req := &gomemcached.MCRequest{
- Opcode: gomemcached.INCREMENT,
- VBucket: vb,
- Key: []byte(key),
- Extras: make([]byte, 8+8+4),
- }
- err := c.setContext(req, context...)
- if err != nil {
- return 0, err
- }
-
- binary.BigEndian.PutUint64(req.Extras[:8], amt)
- binary.BigEndian.PutUint64(req.Extras[8:16], def)
- binary.BigEndian.PutUint32(req.Extras[16:20], uint32(exp))
-
- resp, err := c.Send(req)
- if err != nil {
- return 0, err
- }
-
- return binary.BigEndian.Uint64(resp.Body), nil
-}
-
-// Decr decrements the value at the given key.
-func (c *Client) Decr(vb uint16, key string,
- amt, def uint64, exp int, context ...*ClientContext) (uint64, error) {
- req := &gomemcached.MCRequest{
- Opcode: gomemcached.DECREMENT,
- VBucket: vb,
- Key: []byte(key),
- Extras: make([]byte, 8+8+4),
- }
- err := c.setContext(req, context...)
- if err != nil {
- return 0, err
- }
-
- binary.BigEndian.PutUint64(req.Extras[:8], amt)
- binary.BigEndian.PutUint64(req.Extras[8:16], def)
- binary.BigEndian.PutUint32(req.Extras[16:20], uint32(exp))
-
- resp, err := c.Send(req)
- if err != nil {
- return 0, err
- }
-
- return binary.BigEndian.Uint64(resp.Body), nil
-}
-
-// Add a value for a key (store if not exists).
-func (c *Client) Add(vb uint16, key string, flags int, exp int,
- body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) {
- return c.store(gomemcached.ADD, vb, key, flags, exp, body, context...)
-}
-
-// Set the value for a key.
-func (c *Client) Set(vb uint16, key string, flags int, exp int,
- body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) {
- return c.store(gomemcached.SET, vb, key, flags, exp, body, context...)
-}
-
-// SetCas set the value for a key with cas
-func (c *Client) SetCas(vb uint16, key string, flags int, exp int, cas uint64,
- body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) {
- return c.storeCas(gomemcached.SET, vb, key, flags, exp, cas, body, context...)
-}
-
-// Append data to the value of a key.
-func (c *Client) Append(vb uint16, key string, data []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) {
- req := &gomemcached.MCRequest{
- Opcode: gomemcached.APPEND,
- VBucket: vb,
- Key: []byte(key),
- Cas: 0,
- Opaque: c.getOpaque(),
- Body: data}
-
- err := c.setContext(req, context...)
- if err != nil {
- return nil, err
- }
- return c.Send(req)
-}
-
-// GetBulk gets keys in bulk
-func (c *Client) GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string, context ...*ClientContext) error {
- stopch := make(chan bool)
- var wg sync.WaitGroup
-
- defer func() {
- close(stopch)
- wg.Wait()
- }()
-
- if (math.MaxInt32 - c.opaque) < (uint32(len(keys)) + 1) {
- c.opaque = uint32(1)
- }
-
- opStart := c.opaque
-
- errch := make(chan error, 2)
-
- wg.Add(1)
- go func() {
- defer func() {
- if r := recover(); r != nil {
- logging.Infof("Recovered in f %v", r)
- }
- errch <- nil
- wg.Done()
- }()
-
- ok := true
- for ok {
-
- select {
- case <-stopch:
- return
- default:
- res, err := c.Receive()
-
- if err != nil && IfResStatusError(res) {
- if res == nil || res.Status != gomemcached.KEY_ENOENT {
- errch <- err
- return
- }
- // continue receiving in case of KEY_ENOENT
- } else if res.Opcode == gomemcached.GET ||
- res.Opcode == gomemcached.SUBDOC_GET ||
- res.Opcode == gomemcached.SUBDOC_MULTI_LOOKUP {
- opaque := res.Opaque - opStart
- if opaque < 0 || opaque >= uint32(len(keys)) {
- // Every now and then we seem to be seeing an invalid opaque
- // value returned from the server. When this happens log the error
- // and the calling function will retry the bulkGet. MB-15140
- logging.Errorf(" Invalid opaque Value. Debug info : Res.opaque : %v(%v), Keys %v, Response received %v \n key list %v this key %v", res.Opaque, opaque, len(keys), res, keys, string(res.Body))
- errch <- fmt.Errorf("Out of Bounds error")
- return
- }
-
- rv[keys[opaque]] = res
- }
-
- if res.Opcode == gomemcached.NOOP {
- ok = false
- }
- }
- }
- }()
-
- memcachedReqPkt := &gomemcached.MCRequest{
- Opcode: gomemcached.GET,
- VBucket: vb,
- }
- err := c.setContext(memcachedReqPkt, context...)
- if err != nil {
- return err
- }
-
- if len(subPaths) > 0 {
- extraBuf, valueBuf := GetSubDocVal(subPaths)
- memcachedReqPkt.Opcode = gomemcached.SUBDOC_MULTI_LOOKUP
- memcachedReqPkt.Extras = extraBuf
- memcachedReqPkt.Body = valueBuf
- }
-
- for _, k := range keys { // Start of Get request
- memcachedReqPkt.Key = []byte(k)
- memcachedReqPkt.Opaque = c.opaque
-
- err := c.Transmit(memcachedReqPkt)
- if err != nil {
- logging.Errorf(" Transmit failed in GetBulkAll %v", err)
- return err
- }
- c.opaque++
- } // End of Get request
-
- // finally transmit a NOOP
- err = c.Transmit(&gomemcached.MCRequest{
- Opcode: gomemcached.NOOP,
- VBucket: vb,
- Opaque: c.opaque,
- })
-
- if err != nil {
- logging.Errorf(" Transmit of NOOP failed %v", err)
- return err
- }
- c.opaque++
-
- return <-errch
-}
-
-func GetSubDocVal(subPaths []string) (extraBuf, valueBuf []byte) {
-
- var ops []string
- totalBytesLen := 0
- num := 1
-
- for _, v := range subPaths {
- totalBytesLen = totalBytesLen + len([]byte(v))
- ops = append(ops, v)
- num = num + 1
- }
-
- // Xattr retrieval - subdoc multi get
- // Set deleted true only if it is not expiration
- if len(subPaths) != 1 || subPaths[0] != "$document.exptime" {
- extraBuf = append(extraBuf, uint8(0x04))
- }
-
- valueBuf = make([]byte, num*4+totalBytesLen)
-
- //opcode for subdoc get
- op := gomemcached.SUBDOC_GET
-
- // Calculate path total bytes
- // There are 2 ops - get xattrs - both input and $document and get whole doc
- valIter := 0
-
- for _, v := range ops {
- pathBytes := []byte(v)
- valueBuf[valIter+0] = uint8(op)
-
- // SubdocFlagXattrPath indicates that the path refers to
- // an Xattr rather than the document body.
- valueBuf[valIter+1] = uint8(gomemcached.SUBDOC_FLAG_XATTR)
-
- // 2 byte key
- binary.BigEndian.PutUint16(valueBuf[valIter+2:], uint16(len(pathBytes)))
-
- // Then n bytes path
- copy(valueBuf[valIter+4:], pathBytes)
- valIter = valIter + 4 + len(pathBytes)
- }
-
- return
-}
-
-// ObservedStatus is the type reported by the Observe method
-type ObservedStatus uint8
-
-// Observation status values.
-const (
- ObservedNotPersisted = ObservedStatus(0x00) // found, not persisted
- ObservedPersisted = ObservedStatus(0x01) // found, persisted
- ObservedNotFound = ObservedStatus(0x80) // not found (or a persisted delete)
- ObservedLogicallyDeleted = ObservedStatus(0x81) // pending deletion (not persisted yet)
-)
-
-// ObserveResult represents the data obtained by an Observe call
-type ObserveResult struct {
- Status ObservedStatus // Whether the value has been persisted/deleted
- Cas uint64 // Current value's CAS
- PersistenceTime time.Duration // Node's average time to persist a value
- ReplicationTime time.Duration // Node's average time to replicate a value
-}
-
-// Observe gets the persistence/replication/CAS state of a key
-func (c *Client) Observe(vb uint16, key string) (result ObserveResult, err error) {
- // http://www.couchbase.com/wiki/display/couchbase/Observe
- body := make([]byte, 4+len(key))
- binary.BigEndian.PutUint16(body[0:2], vb)
- binary.BigEndian.PutUint16(body[2:4], uint16(len(key)))
- copy(body[4:4+len(key)], key)
-
- res, err := c.Send(&gomemcached.MCRequest{
- Opcode: gomemcached.OBSERVE,
- VBucket: vb,
- Body: body,
- })
- if err != nil {
- return
- }
-
- // Parse the response data from the body:
- if len(res.Body) < 2+2+1 {
- err = io.ErrUnexpectedEOF
- return
- }
- outVb := binary.BigEndian.Uint16(res.Body[0:2])
- keyLen := binary.BigEndian.Uint16(res.Body[2:4])
- if len(res.Body) < 2+2+int(keyLen)+1+8 {
- err = io.ErrUnexpectedEOF
- return
- }
- outKey := string(res.Body[4 : 4+keyLen])
- if outVb != vb || outKey != key {
- err = fmt.Errorf("observe returned wrong vbucket/key: %d/%q", outVb, outKey)
- return
- }
- result.Status = ObservedStatus(res.Body[4+keyLen])
- result.Cas = binary.BigEndian.Uint64(res.Body[5+keyLen:])
- // The response reuses the Cas field to store time statistics:
- result.PersistenceTime = time.Duration(res.Cas>>32) * time.Millisecond
- result.ReplicationTime = time.Duration(res.Cas&math.MaxUint32) * time.Millisecond
- return
-}
-
-// CheckPersistence checks whether a stored value has been persisted to disk yet.
-func (result ObserveResult) CheckPersistence(cas uint64, deletion bool) (persisted bool, overwritten bool) {
- switch {
- case result.Status == ObservedNotFound && deletion:
- persisted = true
- case result.Cas != cas:
- overwritten = true
- case result.Status == ObservedPersisted:
- persisted = true
- }
- return
-}
-
-// Sequence number based Observe Implementation
-type ObserveSeqResult struct {
- Failover uint8 // Set to 1 if a failover took place
- VbId uint16 // vbucket id
- Vbuuid uint64 // vucket uuid
- LastPersistedSeqNo uint64 // last persisted sequence number
- CurrentSeqNo uint64 // current sequence number
- OldVbuuid uint64 // Old bucket vbuuid
- LastSeqNo uint64 // last sequence number received before failover
-}
-
-func (c *Client) ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error) {
- // http://www.couchbase.com/wiki/display/couchbase/Observe
- body := make([]byte, 8)
- binary.BigEndian.PutUint64(body[0:8], vbuuid)
-
- res, err := c.Send(&gomemcached.MCRequest{
- Opcode: gomemcached.OBSERVE_SEQNO,
- VBucket: vb,
- Body: body,
- Opaque: 0x01,
- })
- if err != nil {
- return
- }
-
- if res.Status != gomemcached.SUCCESS {
- return nil, fmt.Errorf(" Observe returned error %v", res.Status)
- }
-
- // Parse the response data from the body:
- if len(res.Body) < (1 + 2 + 8 + 8 + 8) {
- err = io.ErrUnexpectedEOF
- return
- }
-
- result = &ObserveSeqResult{}
- result.Failover = res.Body[0]
- result.VbId = binary.BigEndian.Uint16(res.Body[1:3])
- result.Vbuuid = binary.BigEndian.Uint64(res.Body[3:11])
- result.LastPersistedSeqNo = binary.BigEndian.Uint64(res.Body[11:19])
- result.CurrentSeqNo = binary.BigEndian.Uint64(res.Body[19:27])
-
- // in case of failover processing we can have old vbuuid and the last persisted seq number
- if result.Failover == 1 && len(res.Body) >= (1+2+8+8+8+8+8) {
- result.OldVbuuid = binary.BigEndian.Uint64(res.Body[27:35])
- result.LastSeqNo = binary.BigEndian.Uint64(res.Body[35:43])
- }
-
- return
-}
-
-// CasOp is the type of operation to perform on this CAS loop.
-type CasOp uint8
-
-const (
- // CASStore instructs the server to store the new value normally
- CASStore = CasOp(iota)
- // CASQuit instructs the client to stop attempting to CAS, leaving value untouched
- CASQuit
- // CASDelete instructs the server to delete the current value
- CASDelete
-)
-
-// User specified termination is returned as an error.
-func (c CasOp) Error() string {
- switch c {
- case CASStore:
- return "CAS store"
- case CASQuit:
- return "CAS quit"
- case CASDelete:
- return "CAS delete"
- }
- panic("Unhandled value")
-}
-
-//////// CAS TRANSFORM
-
-// CASState tracks the state of CAS over several operations.
-//
-// This is used directly by CASNext and indirectly by CAS
-type CASState struct {
- initialized bool // false on the first call to CASNext, then true
- Value []byte // Current value of key; update in place to new value
- Cas uint64 // Current CAS value of key
- Exists bool // Does a value exist for the key? (If not, Value will be nil)
- Err error // Error, if any, after CASNext returns false
- resp *gomemcached.MCResponse
-}
-
-// CASNext is a non-callback, loop-based version of CAS method.
-//
-// Usage is like this:
-//
-// var state memcached.CASState
-// for client.CASNext(vb, key, exp, &state) {
-// state.Value = some_mutation(state.Value)
-// }
-// if state.Err != nil { ... }
-func (c *Client) CASNext(vb uint16, k string, exp int, state *CASState) bool {
- if state.initialized {
- if !state.Exists {
- // Adding a new key:
- if state.Value == nil {
- state.Cas = 0
- return false // no-op (delete of non-existent value)
- }
- state.resp, state.Err = c.Add(vb, k, 0, exp, state.Value)
- } else {
- // Updating / deleting a key:
- req := &gomemcached.MCRequest{
- Opcode: gomemcached.DELETE,
- VBucket: vb,
- Key: []byte(k),
- Cas: state.Cas}
- if state.Value != nil {
- req.Opcode = gomemcached.SET
- req.Opaque = 0
- req.Extras = []byte{0, 0, 0, 0, 0, 0, 0, 0}
- req.Body = state.Value
-
- flags := 0
- binary.BigEndian.PutUint64(req.Extras, uint64(flags)<<32|uint64(exp))
- }
- state.resp, state.Err = c.Send(req)
- }
-
- // If the response status is KEY_EEXISTS or NOT_STORED there's a conflict and we'll need to
- // get the new value (below). Otherwise, we're done (either success or failure) so return:
- if !(state.resp != nil && (state.resp.Status == gomemcached.KEY_EEXISTS ||
- state.resp.Status == gomemcached.NOT_STORED)) {
- state.Cas = state.resp.Cas
- return false // either success or fatal error
- }
- }
-
- // Initial call, or after a conflict: GET the current value and CAS and return them:
- state.initialized = true
- if state.resp, state.Err = c.Get(vb, k); state.Err == nil {
- state.Exists = true
- state.Value = state.resp.Body
- state.Cas = state.resp.Cas
- } else if state.resp != nil && state.resp.Status == gomemcached.KEY_ENOENT {
- state.Err = nil
- state.Exists = false
- state.Value = nil
- state.Cas = 0
- } else {
- return false // fatal error
- }
- return true // keep going...
-}
-
-// CasFunc is type type of function to perform a CAS transform.
-//
-// Input is the current value, or nil if no value exists.
-// The function should return the new value (if any) to set, and the store/quit/delete operation.
-type CasFunc func(current []byte) ([]byte, CasOp)
-
-// CAS performs a CAS transform with the given function.
-//
-// If the value does not exist, a nil current value will be sent to f.
-func (c *Client) CAS(vb uint16, k string, f CasFunc,
- initexp int) (*gomemcached.MCResponse, error) {
- var state CASState
- for c.CASNext(vb, k, initexp, &state) {
- newValue, operation := f(state.Value)
- if operation == CASQuit || (operation == CASDelete && state.Value == nil) {
- return nil, operation
- }
- state.Value = newValue
- }
- return state.resp, state.Err
-}
-
-// StatValue is one of the stats returned from the Stats method.
-type StatValue struct {
- // The stat key
- Key string
- // The stat value
- Val string
-}
-
-// Stats requests server-side stats.
-//
-// Use "" as the stat key for toplevel stats.
-func (c *Client) Stats(key string) ([]StatValue, error) {
- rv := make([]StatValue, 0, 128)
-
- req := &gomemcached.MCRequest{
- Opcode: gomemcached.STAT,
- Key: []byte(key),
- Opaque: 918494,
- }
-
- err := c.Transmit(req)
- if err != nil {
- return rv, err
- }
-
- for {
- res, _, err := getResponse(c.conn, c.hdrBuf)
- if err != nil {
- return rv, err
- }
- k := string(res.Key)
- if k == "" {
- break
- }
- rv = append(rv, StatValue{
- Key: k,
- Val: string(res.Body),
- })
- }
- return rv, nil
-}
-
-// Stats requests server-side stats.
-//
-// Use "" as the stat key for toplevel stats.
-func (c *Client) StatsFunc(key string, fn func(key, val []byte)) error {
- req := &gomemcached.MCRequest{
- Opcode: gomemcached.STAT,
- Key: []byte(key),
- Opaque: 918494,
- }
-
- err := c.Transmit(req)
- if err != nil {
- return err
- }
-
- for {
- res, _, err := getResponse(c.conn, c.hdrBuf)
- if err != nil {
- return err
- }
- if len(res.Key) == 0 {
- break
- }
- fn(res.Key, res.Body)
- }
- return nil
-}
-
-// StatsMap requests server-side stats similarly to Stats, but returns
-// them as a map.
-//
-// Use "" as the stat key for toplevel stats.
-func (c *Client) StatsMap(key string) (map[string]string, error) {
- rv := make(map[string]string)
-
- req := &gomemcached.MCRequest{
- Opcode: gomemcached.STAT,
- Key: []byte(key),
- Opaque: 918494,
- }
-
- err := c.Transmit(req)
- if err != nil {
- return rv, err
- }
-
- for {
- res, _, err := getResponse(c.conn, c.hdrBuf)
- if err != nil {
- return rv, err
- }
- k := string(res.Key)
- if k == "" {
- break
- }
- rv[k] = string(res.Body)
- }
-
- return rv, nil
-}
-
-// instead of returning a new statsMap, simply populate passed in statsMap, which contains all the keys
-// for which stats needs to be retrieved
-func (c *Client) StatsMapForSpecifiedStats(key string, statsMap map[string]string) error {
-
- // clear statsMap
- for key, _ := range statsMap {
- statsMap[key] = ""
- }
-
- req := &gomemcached.MCRequest{
- Opcode: gomemcached.STAT,
- Key: []byte(key),
- Opaque: 918494,
- }
-
- err := c.Transmit(req)
- if err != nil {
- return err
- }
-
- for {
- res, _, err := getResponse(c.conn, c.hdrBuf)
- if err != nil {
- return err
- }
- k := string(res.Key)
- if k == "" {
- break
- }
- if _, ok := statsMap[k]; ok {
- statsMap[k] = string(res.Body)
- }
- }
-
- return nil
-}
-
-// UprGetFailoverLog for given list of vbuckets.
-func (mc *Client) UprGetFailoverLog(vb []uint16) (map[uint16]*FailoverLog, error) {
-
- rq := &gomemcached.MCRequest{
- Opcode: gomemcached.UPR_FAILOVERLOG,
- Opaque: opaqueFailover,
- }
-
- failoverLogs := make(map[uint16]*FailoverLog)
- for _, vBucket := range vb {
- rq.VBucket = vBucket
- if err := mc.Transmit(rq); err != nil {
- return nil, err
- }
- res, err := mc.Receive()
-
- if err != nil {
- return nil, fmt.Errorf("failed to receive %s", err.Error())
- } else if res.Opcode != gomemcached.UPR_FAILOVERLOG || res.Status != gomemcached.SUCCESS {
- return nil, fmt.Errorf("unexpected #opcode %v", res.Opcode)
- }
-
- flog, err := parseFailoverLog(res.Body)
- if err != nil {
- return nil, fmt.Errorf("unable to parse failover logs for vb %d", vb)
- }
- failoverLogs[vBucket] = flog
- }
-
- return failoverLogs, nil
-}
-
-// Hijack exposes the underlying connection from this client.
-//
-// It also marks the connection as unhealthy since the client will
-// have lost control over the connection and can't otherwise verify
-// things are in good shape for connection pools.
-func (c *Client) Hijack() io.ReadWriteCloser {
- c.setHealthy(false)
- return c.conn
-}
-
-func (c *Client) setHealthy(healthy bool) {
- healthyState := UnHealthy
- if healthy {
- healthyState = Healthy
- }
- atomic.StoreUint32(&c.healthy, healthyState)
-}
-
-func IfResStatusError(response *gomemcached.MCResponse) bool {
- return response == nil ||
- (response.Status != gomemcached.SUBDOC_BAD_MULTI &&
- response.Status != gomemcached.SUBDOC_PATH_NOT_FOUND &&
- response.Status != gomemcached.SUBDOC_MULTI_PATH_FAILURE_DELETED)
-}
-
-func (c *Client) Conn() io.ReadWriteCloser {
- return c.conn
-}
-
-// Since the binary request supports only a single collection at a time, it is possible
-// that this may be called multiple times in succession by callers to get vbSeqnos for
-// multiple collections. Thus, caller could pass in a non-nil map so the gomemcached
-// client won't need to allocate new map for each call to prevent too much GC
-// NOTE: If collection is enabled and context is not given, KV will still return stats for default collection
-func (c *Client) GetAllVbSeqnos(vbSeqnoMap map[uint16]uint64, context ...*ClientContext) (map[uint16]uint64, error) {
- rq := &gomemcached.MCRequest{
- Opcode: gomemcached.GET_ALL_VB_SEQNOS,
- Opaque: opaqueGetSeqno,
- }
-
- err := c.setVbSeqnoContext(rq, context...)
- if err != nil {
- return vbSeqnoMap, err
- }
-
- err = c.Transmit(rq)
- if err != nil {
- return vbSeqnoMap, err
- }
-
- res, err := c.Receive()
- if err != nil {
- return vbSeqnoMap, fmt.Errorf("failed to receive: %v", err)
- }
-
- vbSeqnosList, err := parseGetSeqnoResp(res.Body)
- if err != nil {
- logging.Errorf("Unable to parse : err: %v\n", err)
- return vbSeqnoMap, err
- }
-
- if vbSeqnoMap == nil {
- vbSeqnoMap = make(map[uint16]uint64)
- }
-
- combineMapWithReturnedList(vbSeqnoMap, vbSeqnosList)
- return vbSeqnoMap, nil
-}
-
-func combineMapWithReturnedList(vbSeqnoMap map[uint16]uint64, list *VBSeqnos) {
- if list == nil {
- return
- }
-
- // If the map contains exactly the existing vbs in the list, no need to modify
- needToCleanupMap := true
- if len(vbSeqnoMap) == 0 {
- needToCleanupMap = false
- } else if len(vbSeqnoMap) == len(*list) {
- needToCleanupMap = false
- for _, pair := range *list {
- _, vbExists := vbSeqnoMap[uint16(pair[0])]
- if !vbExists {
- needToCleanupMap = true
- break
- }
- }
- }
-
- if needToCleanupMap {
- var vbsToDelete []uint16
- for vbInSeqnoMap, _ := range vbSeqnoMap {
- // If a vb in the seqno map doesn't exist in the returned list, need to clean up
- // to ensure returning an accurate result
- found := false
- var vbno uint16
- for _, pair := range *list {
- vbno = uint16(pair[0])
- if vbno == vbInSeqnoMap {
- found = true
- break
- } else if vbno > vbInSeqnoMap {
- // definitely not in the list
- break
- }
- }
- if !found {
- vbsToDelete = append(vbsToDelete, vbInSeqnoMap)
- }
- }
-
- for _, vbno := range vbsToDelete {
- delete(vbSeqnoMap, vbno)
- }
- }
-
- // Set the map with data from the list
- for _, pair := range *list {
- vbno := uint16(pair[0])
- seqno := pair[1]
- vbSeqnoMap[vbno] = seqno
- }
-}
diff --git a/vendor/github.com/couchbase/gomemcached/client/tap_feed.go b/vendor/github.com/couchbase/gomemcached/client/tap_feed.go
deleted file mode 100644
index fd628c5de2..0000000000
--- a/vendor/github.com/couchbase/gomemcached/client/tap_feed.go
+++ /dev/null
@@ -1,333 +0,0 @@
-package memcached
-
-import (
- "bytes"
- "encoding/binary"
- "fmt"
- "io"
- "math"
-
- "github.com/couchbase/gomemcached"
- "github.com/couchbase/goutils/logging"
-)
-
-// TAP protocol docs: <http://www.couchbase.com/wiki/display/couchbase/TAP+Protocol>
-
-// TapOpcode is the tap operation type (found in TapEvent)
-type TapOpcode uint8
-
-// Tap opcode values.
-const (
- TapBeginBackfill = TapOpcode(iota)
- TapEndBackfill
- TapMutation
- TapDeletion
- TapCheckpointStart
- TapCheckpointEnd
- tapEndStream
-)
-
-const tapMutationExtraLen = 16
-
-var tapOpcodeNames map[TapOpcode]string
-
-func init() {
- tapOpcodeNames = map[TapOpcode]string{
- TapBeginBackfill: "BeginBackfill",
- TapEndBackfill: "EndBackfill",
- TapMutation: "Mutation",
- TapDeletion: "Deletion",
- TapCheckpointStart: "TapCheckpointStart",
- TapCheckpointEnd: "TapCheckpointEnd",
- tapEndStream: "EndStream",
- }
-}
-
-func (opcode TapOpcode) String() string {
- name := tapOpcodeNames[opcode]
- if name == "" {
- name = fmt.Sprintf("#%d", opcode)
- }
- return name
-}
-
-// TapEvent is a TAP notification of an operation on the server.
-type TapEvent struct {
- Opcode TapOpcode // Type of event
- VBucket uint16 // VBucket this event applies to
- Flags uint32 // Item flags
- Expiry uint32 // Item expiration time
- Key, Value []byte // Item key/value
- Cas uint64
-}
-
-func makeTapEvent(req gomemcached.MCRequest) *TapEvent {
- event := TapEvent{
- VBucket: req.VBucket,
- }
- switch req.Opcode {
- case gomemcached.TAP_MUTATION:
- event.Opcode = TapMutation
- event.Key = req.Key
- event.Value = req.Body
- event.Cas = req.Cas
- case gomemcached.TAP_DELETE:
- event.Opcode = TapDeletion
- event.Key = req.Key
- event.Cas = req.Cas
- case gomemcached.TAP_CHECKPOINT_START:
- event.Opcode = TapCheckpointStart
- case gomemcached.TAP_CHECKPOINT_END:
- event.Opcode = TapCheckpointEnd
- case gomemcached.TAP_OPAQUE:
- if len(req.Extras) < 8+4 {
- return nil
- }
- switch op := int(binary.BigEndian.Uint32(req.Extras[8:])); op {
- case gomemcached.TAP_OPAQUE_INITIAL_VBUCKET_STREAM:
- event.Opcode = TapBeginBackfill
- case gomemcached.TAP_OPAQUE_CLOSE_BACKFILL:
- event.Opcode = TapEndBackfill
- case gomemcached.TAP_OPAQUE_CLOSE_TAP_STREAM:
- event.Opcode = tapEndStream
- case gomemcached.TAP_OPAQUE_ENABLE_AUTO_NACK:
- return nil
- case gomemcached.TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC:
- return nil
- default:
- logging.Infof("TapFeed: Ignoring TAP_OPAQUE/%d", op)
- return nil // unknown opaque event
- }
- case gomemcached.NOOP:
- return nil // ignore
- default:
- logging.Infof("TapFeed: Ignoring %s", req.Opcode)
- return nil // unknown event
- }
-
- if len(req.Extras) >= tapMutationExtraLen &&
- (event.Opcode == TapMutation || event.Opcode == TapDeletion) {
-
- event.Flags = binary.BigEndian.Uint32(req.Extras[8:])
- event.Expiry = binary.BigEndian.Uint32(req.Extras[12:])
- }
-
- return &event
-}
-
-func (event TapEvent) String() string {
- switch event.Opcode {
- case TapBeginBackfill, TapEndBackfill, TapCheckpointStart, TapCheckpointEnd:
- return fmt.Sprintf("<TapEvent %s, vbucket=%d>",
- event.Opcode, event.VBucket)
- default:
- return fmt.Sprintf("<TapEvent %s, key=%q (%d bytes) flags=%x, exp=%d>",
- event.Opcode, event.Key, len(event.Value),
- event.Flags, event.Expiry)
- }
-}
-
-// TapArguments are parameters for requesting a TAP feed.
-//
-// Call DefaultTapArguments to get a default one.
-type TapArguments struct {
- // Timestamp of oldest item to send.
- //
- // Use TapNoBackfill to suppress all past items.
- Backfill uint64
- // If set, server will disconnect after sending existing items.
- Dump bool
- // The indices of the vbuckets to watch; empty/nil to watch all.
- VBuckets []uint16
- // Transfers ownership of vbuckets during cluster rebalance.
- Takeover bool
- // If true, server will wait for client ACK after every notification.
- SupportAck bool
- // If true, client doesn't want values so server shouldn't send them.
- KeysOnly bool
- // If true, client wants the server to send checkpoint events.
- Checkpoint bool
- // Optional identifier to use for this client, to allow reconnects
- ClientName string
- // Registers this client (by name) till explicitly deregistered.
- RegisteredClient bool
-}
-
-// Value for TapArguments.Backfill denoting that no past events at all
-// should be sent.
-const TapNoBackfill = math.MaxUint64
-
-// DefaultTapArguments returns a default set of parameter values to
-// pass to StartTapFeed.
-func DefaultTapArguments() TapArguments {
- return TapArguments{
- Backfill: TapNoBackfill,
- }
-}
-
-func (args *TapArguments) flags() []byte {
- var flags gomemcached.TapConnectFlag
- if args.Backfill != 0 {
- flags |= gomemcached.BACKFILL
- }
- if args.Dump {
- flags |= gomemcached.DUMP
- }
- if len(args.VBuckets) > 0 {
- flags |= gomemcached.LIST_VBUCKETS
- }
- if args.Takeover {
- flags |= gomemcached.TAKEOVER_VBUCKETS
- }
- if args.SupportAck {
- flags |= gomemcached.SUPPORT_ACK
- }
- if args.KeysOnly {
- flags |= gomemcached.REQUEST_KEYS_ONLY
- }
- if args.Checkpoint {
- flags |= gomemcached.CHECKPOINT
- }
- if args.RegisteredClient {
- flags |= gomemcached.REGISTERED_CLIENT
- }
- encoded := make([]byte, 4)
- binary.BigEndian.PutUint32(encoded, uint32(flags))
- return encoded
-}
-
-func must(err error) {
- if err != nil {
- panic(err)
- }
-}
-
-func (args *TapArguments) bytes() (rv []byte) {
- buf := bytes.NewBuffer([]byte{})
-
- if args.Backfill > 0 {
- must(binary.Write(buf, binary.BigEndian, uint64(args.Backfill)))
- }
-
- if len(args.VBuckets) > 0 {
- must(binary.Write(buf, binary.BigEndian, uint16(len(args.VBuckets))))
- for i := 0; i < len(args.VBuckets); i++ {
- must(binary.Write(buf, binary.BigEndian, uint16(args.VBuckets[i])))
- }
- }
- return buf.Bytes()
-}
-
-// TapFeed represents a stream of events from a server.
-type TapFeed struct {
- C <-chan TapEvent
- Error error
- closer chan bool
-}
-
-// StartTapFeed starts a TAP feed on a client connection.
-//
-// The events can be read from the returned channel. The connection
-// can no longer be used for other purposes; it's now reserved for
-// receiving the TAP messages. To stop receiving events, close the
-// client connection.
-func (mc *Client) StartTapFeed(args TapArguments) (*TapFeed, error) {
- rq := &gomemcached.MCRequest{
- Opcode: gomemcached.TAP_CONNECT,
- Key: []byte(args.ClientName),
- Extras: args.flags(),
- Body: args.bytes()}
-
- err := mc.Transmit(rq)
- if err != nil {
- return nil, err
- }
-
- ch := make(chan TapEvent)
- feed := &TapFeed{
- C: ch,
- closer: make(chan bool),
- }
- go mc.runFeed(ch, feed)
- return feed, nil
-}
-
-// TapRecvHook is called after every incoming tap packet is received.
-var TapRecvHook func(*gomemcached.MCRequest, int, error)
-
-// Internal goroutine that reads from the socket and writes events to
-// the channel
-func (mc *Client) runFeed(ch chan TapEvent, feed *TapFeed) {
- defer close(ch)
- var headerBuf [gomemcached.HDR_LEN]byte
-loop:
- for {
- // Read the next request from the server.
- //
- // (Can't call mc.Receive() because it reads a
- // _response_ not a request.)
- var pkt gomemcached.MCRequest
- n, err := pkt.Receive(mc.conn, headerBuf[:])
- if TapRecvHook != nil {
- TapRecvHook(&pkt, n, err)
- }
-
- if err != nil {
- if err != io.EOF {
- feed.Error = err
- }
- break loop
- }
-
- //logging.Infof("** TapFeed received %#v : %q", pkt, pkt.Body)
-
- if pkt.Opcode == gomemcached.TAP_CONNECT {
- // This is not an event from the server; it's
- // an error response to my connect request.
- feed.Error = fmt.Errorf("tap connection failed: %s", pkt.Body)
- break loop
- }
-
- event := makeTapEvent(pkt)
- if event != nil {
- if event.Opcode == tapEndStream {
- break loop
- }
-
- select {
- case ch <- *event:
- case <-feed.closer:
- break loop
- }
- }
-
- if len(pkt.Extras) >= 4 {
- reqFlags := binary.BigEndian.Uint16(pkt.Extras[2:])
- if reqFlags&gomemcached.TAP_ACK != 0 {
- if _, err := mc.sendAck(&pkt); err != nil {
- feed.Error = err
- break loop
- }
- }
- }
- }
- if err := mc.Close(); err != nil {
- logging.Errorf("Error closing memcached client: %v", err)
- }
-}
-
-func (mc *Client) sendAck(pkt *gomemcached.MCRequest) (int, error) {
- res := gomemcached.MCResponse{
- Opcode: pkt.Opcode,
- Opaque: pkt.Opaque,
- Status: gomemcached.SUCCESS,
- }
- return res.Transmit(mc.conn)
-}
-
-// Close terminates a TapFeed.
-//
-// Call this if you stop using a TapFeed before its channel ends.
-func (feed *TapFeed) Close() {
- close(feed.closer)
-}
diff --git a/vendor/github.com/couchbase/gomemcached/client/transport.go b/vendor/github.com/couchbase/gomemcached/client/transport.go
deleted file mode 100644
index f4cea17fca..0000000000
--- a/vendor/github.com/couchbase/gomemcached/client/transport.go
+++ /dev/null
@@ -1,67 +0,0 @@
-package memcached
-
-import (
- "errors"
- "io"
-
- "github.com/couchbase/gomemcached"
-)
-
-var errNoConn = errors.New("no connection")
-
-// UnwrapMemcachedError converts memcached errors to normal responses.
-//
-// If the error is a memcached response, declare the error to be nil
-// so a client can handle the status without worrying about whether it
-// indicates success or failure.
-func UnwrapMemcachedError(rv *gomemcached.MCResponse,
- err error) (*gomemcached.MCResponse, error) {
-
- if rv == err {
- return rv, nil
- }
- return rv, err
-}
-
-// ReceiveHook is called after every packet is received (or attempted to be)
-var ReceiveHook func(*gomemcached.MCResponse, int, error)
-
-func getResponse(s io.Reader, hdrBytes []byte) (rv *gomemcached.MCResponse, n int, err error) {
- if s == nil {
- return nil, 0, errNoConn
- }
-
- rv = &gomemcached.MCResponse{}
- n, err = rv.Receive(s, hdrBytes)
-
- if ReceiveHook != nil {
- ReceiveHook(rv, n, err)
- }
-
- if err == nil && (rv.Status != gomemcached.SUCCESS && rv.Status != gomemcached.AUTH_CONTINUE) {
- err = rv
- }
- return rv, n, err
-}
-
-// TransmitHook is called after each packet is transmitted.
-var TransmitHook func(*gomemcached.MCRequest, int, error)
-
-func transmitRequest(o io.Writer, req *gomemcached.MCRequest) (int, error) {
- if o == nil {
- return 0, errNoConn
- }
- n, err := req.Transmit(o)
- if TransmitHook != nil {
- TransmitHook(req, n, err)
- }
- return n, err
-}
-
-func transmitResponse(o io.Writer, res *gomemcached.MCResponse) (int, error) {
- if o == nil {
- return 0, errNoConn
- }
- n, err := res.Transmit(o)
- return n, err
-}
diff --git a/vendor/github.com/couchbase/gomemcached/client/upr_event.go b/vendor/github.com/couchbase/gomemcached/client/upr_event.go
deleted file mode 100644
index 2d3454aecc..0000000000
--- a/vendor/github.com/couchbase/gomemcached/client/upr_event.go
+++ /dev/null
@@ -1,403 +0,0 @@
-package memcached
-
-import (
- "encoding/binary"
- "fmt"
- "github.com/couchbase/gomemcached"
- "math"
-)
-
-type SystemEventType int
-
-const InvalidSysEvent SystemEventType = -1
-
-const (
- CollectionCreate SystemEventType = 0
- CollectionDrop SystemEventType = iota
- CollectionFlush SystemEventType = iota // KV did not implement
- ScopeCreate SystemEventType = iota
- ScopeDrop SystemEventType = iota
- CollectionChanged SystemEventType = iota
-)
-
-type ScopeCreateEvent interface {
- GetSystemEventName() (string, error)
- GetScopeId() (uint32, error)
- GetManifestId() (uint64, error)
-}
-
-type CollectionCreateEvent interface {
- GetSystemEventName() (string, error)
- GetScopeId() (uint32, error)
- GetCollectionId() (uint32, error)
- GetManifestId() (uint64, error)
- GetMaxTTL() (uint32, error)
-}
-
-type CollectionDropEvent interface {
- GetScopeId() (uint32, error)
- GetCollectionId() (uint32, error)
- GetManifestId() (uint64, error)
-}
-
-type ScopeDropEvent interface {
- GetScopeId() (uint32, error)
- GetManifestId() (uint64, error)
-}
-
-type CollectionChangedEvent interface {
- GetCollectionId() (uint32, error)
- GetManifestId() (uint64, error)
- GetMaxTTL() (uint32, error)
-}
-
-var ErrorInvalidOp error = fmt.Errorf("Invalid Operation")
-var ErrorInvalidVersion error = fmt.Errorf("Invalid version for parsing")
-var ErrorValueTooShort error = fmt.Errorf("Value length is too short")
-var ErrorNoMaxTTL error = fmt.Errorf("This event has no max TTL")
-
-// UprEvent memcached events for UPR streams.
-type UprEvent struct {
- Opcode gomemcached.CommandCode // Type of event
- Status gomemcached.Status // Response status
- VBucket uint16 // VBucket this event applies to
- DataType uint8 // data type
- Opaque uint16 // 16 MSB of opaque
- VBuuid uint64 // This field is set by downstream
- Flags uint32 // Item flags
- Expiry uint32 // Item expiration time
- Key, Value []byte // Item key/value
- OldValue []byte // TODO: TBD: old document value
- Cas uint64 // CAS value of the item
- Seqno uint64 // sequence number of the mutation
- RevSeqno uint64 // rev sequence number : deletions
- LockTime uint32 // Lock time
- MetadataSize uint16 // Metadata size
- SnapstartSeq uint64 // start sequence number of this snapshot
- SnapendSeq uint64 // End sequence number of the snapshot
- SnapshotType uint32 // 0: disk 1: memory
- FailoverLog *FailoverLog // Failover log containing vvuid and sequnce number
- Error error // Error value in case of a failure
- ExtMeta []byte // Extended Metadata
- AckSize uint32 // The number of bytes that can be Acked to DCP
- SystemEvent SystemEventType // Only valid if IsSystemEvent() is true
- SysEventVersion uint8 // Based on the version, the way Extra bytes is parsed is different
- ValueLen int // Cache it to avoid len() calls for performance
- CollectionId uint32 // Valid if Collection is in use
- StreamId *uint16 // Nil if not in use
-}
-
-// FailoverLog containing vvuid and sequnce number
-type FailoverLog [][2]uint64
-
-// Containing a pair of vbno and the high seqno
-type VBSeqnos [][2]uint64
-
-func makeUprEvent(rq gomemcached.MCRequest, stream *UprStream, bytesReceivedFromDCP int) *UprEvent {
- event := &UprEvent{
- Opcode: rq.Opcode,
- VBucket: stream.Vbucket,
- VBuuid: stream.Vbuuid,
- Value: rq.Body,
- Cas: rq.Cas,
- ExtMeta: rq.ExtMeta,
- DataType: rq.DataType,
- ValueLen: len(rq.Body),
- SystemEvent: InvalidSysEvent,
- CollectionId: math.MaxUint32,
- }
-
- event.PopulateFieldsBasedOnStreamType(rq, stream.StreamType)
-
- // set AckSize for events that need to be acked to DCP,
- // i.e., events with CommandCodes that need to be buffered in DCP
- if _, ok := gomemcached.BufferedCommandCodeMap[rq.Opcode]; ok {
- event.AckSize = uint32(bytesReceivedFromDCP)
- }
-
- // 16 LSBits are used by client library to encode vbucket number.
- // 16 MSBits are left for application to multiplex on opaque value.
- event.Opaque = appOpaque(rq.Opaque)
-
- if len(rq.Extras) >= uprMutationExtraLen &&
- event.Opcode == gomemcached.UPR_MUTATION {
-
- event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
- event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
- event.Flags = binary.BigEndian.Uint32(rq.Extras[16:20])
- event.Expiry = binary.BigEndian.Uint32(rq.Extras[20:24])
- event.LockTime = binary.BigEndian.Uint32(rq.Extras[24:28])
- event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[28:30])
-
- } else if len(rq.Extras) >= uprDeletetionWithDeletionTimeExtraLen &&
- event.Opcode == gomemcached.UPR_DELETION {
-
- event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
- event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
- event.Expiry = binary.BigEndian.Uint32(rq.Extras[16:20])
-
- } else if len(rq.Extras) >= uprDeletetionExtraLen &&
- event.Opcode == gomemcached.UPR_DELETION ||
- event.Opcode == gomemcached.UPR_EXPIRATION {
-
- event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
- event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
- event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[16:18])
-
- } else if len(rq.Extras) >= uprSnapshotExtraLen &&
- event.Opcode == gomemcached.UPR_SNAPSHOT {
-
- event.SnapstartSeq = binary.BigEndian.Uint64(rq.Extras[:8])
- event.SnapendSeq = binary.BigEndian.Uint64(rq.Extras[8:16])
- event.SnapshotType = binary.BigEndian.Uint32(rq.Extras[16:20])
- } else if event.IsSystemEvent() {
- event.PopulateEvent(rq.Extras)
- } else if event.IsSeqnoAdv() {
- event.PopulateSeqnoAdv(rq.Extras)
- } else if event.IsOsoSnapshot() {
- event.PopulateOso(rq.Extras)
- }
-
- return event
-}
-
-func (event *UprEvent) PopulateFieldsBasedOnStreamType(rq gomemcached.MCRequest, streamType DcpStreamType) {
- switch streamType {
- case CollectionsStreamId:
- for _, extra := range rq.FramingExtras {
- streamId, streamIdErr := extra.GetStreamId()
- if streamIdErr == nil {
- event.StreamId = &streamId
- }
- }
- // After parsing streamID, still need to populate regular collectionID
- fallthrough
- case CollectionsNonStreamId:
- switch rq.Opcode {
- // Only these will have CID encoded within the key
- case gomemcached.UPR_MUTATION,
- gomemcached.UPR_DELETION,
- gomemcached.UPR_EXPIRATION:
- uleb128 := Uleb128(rq.Key)
- result, bytesShifted := uleb128.ToUint32(rq.Keylen)
- event.CollectionId = result
- event.Key = rq.Key[bytesShifted:]
- default:
- event.Key = rq.Key
- }
- case NonCollectionStream:
- // Let default behavior be legacy stream type
- fallthrough
- default:
- event.Key = rq.Key
- }
-}
-
-func (event *UprEvent) String() string {
- name := gomemcached.CommandNames[event.Opcode]
- if name == "" {
- name = fmt.Sprintf("#%d", event.Opcode)
- }
- return name
-}
-
-func (event *UprEvent) IsSnappyDataType() bool {
- return event.Opcode == gomemcached.UPR_MUTATION && (event.DataType&SnappyDataType > 0)
-}
-
-func (event *UprEvent) IsCollectionType() bool {
- return event.IsSystemEvent() || event.CollectionId <= math.MaxUint32
-}
-
-func (event *UprEvent) IsSystemEvent() bool {
- return event.Opcode == gomemcached.DCP_SYSTEM_EVENT
-}
-
-func (event *UprEvent) IsSeqnoAdv() bool {
- return event.Opcode == gomemcached.DCP_SEQNO_ADV
-}
-
-func (event *UprEvent) IsOsoSnapshot() bool {
- return event.Opcode == gomemcached.DCP_OSO_SNAPSHOT
-}
-
-func (event *UprEvent) PopulateEvent(extras []byte) {
- if len(extras) < dcpSystemEventExtraLen {
- // Wrong length, don't parse
- return
- }
-
- event.Seqno = binary.BigEndian.Uint64(extras[:8])
- event.SystemEvent = SystemEventType(binary.BigEndian.Uint32(extras[8:12]))
- var versionTemp uint16 = binary.BigEndian.Uint16(extras[12:14])
- event.SysEventVersion = uint8(versionTemp >> 8)
-}
-
-func (event *UprEvent) PopulateSeqnoAdv(extras []byte) {
- if len(extras) < dcpSeqnoAdvExtraLen {
- // Wrong length, don't parse
- return
- }
-
- event.Seqno = binary.BigEndian.Uint64(extras[:8])
-}
-
-func (event *UprEvent) PopulateOso(extras []byte) {
- if len(extras) < dcpOsoExtraLen {
- // Wrong length, don't parse
- return
- }
- event.Flags = binary.BigEndian.Uint32(extras[:4])
-}
-
-func (event *UprEvent) GetSystemEventName() (string, error) {
- switch event.SystemEvent {
- case CollectionCreate:
- fallthrough
- case ScopeCreate:
- return string(event.Key), nil
- default:
- return "", ErrorInvalidOp
- }
-}
-
-func (event *UprEvent) GetManifestId() (uint64, error) {
- switch event.SystemEvent {
- // Version 0 only checks
- case CollectionChanged:
- fallthrough
- case ScopeDrop:
- fallthrough
- case ScopeCreate:
- fallthrough
- case CollectionDrop:
- if event.SysEventVersion > 0 {
- return 0, ErrorInvalidVersion
- }
- fallthrough
- case CollectionCreate:
- // CollectionCreate supports version 1
- if event.SysEventVersion > 1 {
- return 0, ErrorInvalidVersion
- }
- if event.ValueLen < 8 {
- return 0, ErrorValueTooShort
- }
- return binary.BigEndian.Uint64(event.Value[0:8]), nil
- default:
- return 0, ErrorInvalidOp
- }
-}
-
-func (event *UprEvent) GetCollectionId() (uint32, error) {
- switch event.SystemEvent {
- case CollectionDrop:
- if event.SysEventVersion > 0 {
- return 0, ErrorInvalidVersion
- }
- fallthrough
- case CollectionCreate:
- if event.SysEventVersion > 1 {
- return 0, ErrorInvalidVersion
- }
- if event.ValueLen < 16 {
- return 0, ErrorValueTooShort
- }
- return binary.BigEndian.Uint32(event.Value[12:16]), nil
- case CollectionChanged:
- if event.SysEventVersion > 0 {
- return 0, ErrorInvalidVersion
- }
- if event.ValueLen < 12 {
- return 0, ErrorValueTooShort
- }
- return binary.BigEndian.Uint32(event.Value[8:12]), nil
- default:
- return 0, ErrorInvalidOp
- }
-}
-
-func (event *UprEvent) GetScopeId() (uint32, error) {
- switch event.SystemEvent {
- // version 0 checks
- case ScopeCreate:
- fallthrough
- case ScopeDrop:
- fallthrough
- case CollectionDrop:
- if event.SysEventVersion > 0 {
- return 0, ErrorInvalidVersion
- }
- fallthrough
- case CollectionCreate:
- // CollectionCreate could be either 0 or 1
- if event.SysEventVersion > 1 {
- return 0, ErrorInvalidVersion
- }
- if event.ValueLen < 12 {
- return 0, ErrorValueTooShort
- }
- return binary.BigEndian.Uint32(event.Value[8:12]), nil
- default:
- return 0, ErrorInvalidOp
- }
-}
-
-func (event *UprEvent) GetMaxTTL() (uint32, error) {
- switch event.SystemEvent {
- case CollectionCreate:
- if event.SysEventVersion < 1 {
- return 0, ErrorNoMaxTTL
- }
- if event.ValueLen < 20 {
- return 0, ErrorValueTooShort
- }
- return binary.BigEndian.Uint32(event.Value[16:20]), nil
- case CollectionChanged:
- if event.SysEventVersion > 0 {
- return 0, ErrorInvalidVersion
- }
- if event.ValueLen < 16 {
- return 0, ErrorValueTooShort
- }
- return binary.BigEndian.Uint32(event.Value[12:16]), nil
- default:
- return 0, ErrorInvalidOp
- }
-}
-
-// Only if error is nil:
-// Returns true if event states oso begins
-// Return false if event states oso ends
-func (event *UprEvent) GetOsoBegin() (bool, error) {
- if !event.IsOsoSnapshot() {
- return false, ErrorInvalidOp
- }
-
- if event.Flags == 1 {
- return true, nil
- } else if event.Flags == 2 {
- return false, nil
- } else {
- return false, ErrorInvalidOp
- }
-}
-
-type Uleb128 []byte
-
-func (u Uleb128) ToUint32(cachedLen int) (result uint32, bytesShifted int) {
- var shift uint = 0
-
- for curByte := 0; curByte < cachedLen; curByte++ {
- oneByte := u[curByte]
- last7Bits := 0x7f & oneByte
- result |= uint32(last7Bits) << shift
- bytesShifted++
- if oneByte&0x80 == 0 {
- break
- }
- shift += 7
- }
-
- return
-}
diff --git a/vendor/github.com/couchbase/gomemcached/client/upr_feed.go b/vendor/github.com/couchbase/gomemcached/client/upr_feed.go
deleted file mode 100644
index cdbed16bd3..0000000000
--- a/vendor/github.com/couchbase/gomemcached/client/upr_feed.go
+++ /dev/null
@@ -1,1132 +0,0 @@
-// go implementation of upr client.
-// See https://github.com/couchbaselabs/cbupr/blob/master/transport-spec.md
-// TODO
-// 1. Use a pool allocator to avoid garbage
-package memcached
-
-import (
- "encoding/binary"
- "errors"
- "fmt"
- "github.com/couchbase/gomemcached"
- "github.com/couchbase/goutils/logging"
- "strconv"
- "sync"
- "sync/atomic"
-)
-
-const uprMutationExtraLen = 30
-const uprDeletetionExtraLen = 18
-const uprDeletetionWithDeletionTimeExtraLen = 21
-const uprSnapshotExtraLen = 20
-const dcpSystemEventExtraLen = 13
-const dcpSeqnoAdvExtraLen = 8
-const bufferAckThreshold = 0.2
-const opaqueOpen = 0xBEAF0001
-const opaqueFailover = 0xDEADBEEF
-const opaqueGetSeqno = 0xDEADBEEF
-const uprDefaultNoopInterval = 120
-const dcpOsoExtraLen = 4
-
-// Counter on top of opaqueOpen that others can draw from for open and control msgs
-var opaqueOpenCtrlWell uint32 = opaqueOpen
-
-type PriorityType string
-
-// high > medium > disabled > low
-const (
- PriorityDisabled PriorityType = ""
- PriorityLow PriorityType = "low"
- PriorityMed PriorityType = "medium"
- PriorityHigh PriorityType = "high"
-)
-
-type DcpStreamType int32
-
-var UninitializedStream DcpStreamType = -1
-
-const (
- NonCollectionStream DcpStreamType = 0
- CollectionsNonStreamId DcpStreamType = iota
- CollectionsStreamId DcpStreamType = iota
-)
-
-func (t DcpStreamType) String() string {
- switch t {
- case UninitializedStream:
- return "Un-Initialized Stream"
- case NonCollectionStream:
- return "Traditional Non-Collection Stream"
- case CollectionsNonStreamId:
- return "Collections Stream without StreamID"
- case CollectionsStreamId:
- return "Collection Stream with StreamID"
- default:
- return "Unknown Stream Type"
- }
-}
-
-// UprStream is per stream data structure over an UPR Connection.
-type UprStream struct {
- Vbucket uint16 // Vbucket id
- Vbuuid uint64 // vbucket uuid
- StartSeq uint64 // start sequence number
- EndSeq uint64 // end sequence number
- connected bool
- StreamType DcpStreamType
-}
-
-type FeedState int
-
-const (
- FeedStateInitial = iota
- FeedStateOpened = iota
- FeedStateClosed = iota
-)
-
-func (fs FeedState) String() string {
- switch fs {
- case FeedStateInitial:
- return "Initial"
- case FeedStateOpened:
- return "Opened"
- case FeedStateClosed:
- return "Closed"
- default:
- return "Unknown"
- }
-}
-
-const (
- CompressionTypeStartMarker = iota // also means invalid
- CompressionTypeNone = iota
- CompressionTypeSnappy = iota
- CompressionTypeEndMarker = iota // also means invalid
-)
-
-// kv_engine/include/mcbp/protocol/datatype.h
-const (
- JSONDataType uint8 = 1
- SnappyDataType uint8 = 2
- XattrDataType uint8 = 4
-)
-
-type UprFeatures struct {
- Xattribute bool
- CompressionType int
- IncludeDeletionTime bool
- DcpPriority PriorityType
- EnableExpiry bool
- EnableStreamId bool
- EnableOso bool
-}
-
-/**
- * Used to handle multiple concurrent calls UprRequestStream() by UprFeed clients
- * It is expected that a client that calls UprRequestStream() more than once should issue
- * different "opaque" (version) numbers
- */
-type opaqueStreamMap map[uint16]*UprStream // opaque -> stream
-
-type vbStreamNegotiator struct {
- vbHandshakeMap map[uint16]opaqueStreamMap // vbno -> opaqueStreamMap
- mutex sync.RWMutex
-}
-
-func (negotiator *vbStreamNegotiator) initialize() {
- negotiator.mutex.Lock()
- negotiator.vbHandshakeMap = make(map[uint16]opaqueStreamMap)
- negotiator.mutex.Unlock()
-}
-
-func (negotiator *vbStreamNegotiator) registerRequest(vbno, opaque uint16, vbuuid, startSequence, endSequence uint64) {
- negotiator.mutex.Lock()
- defer negotiator.mutex.Unlock()
-
- var osMap opaqueStreamMap
- var ok bool
- if osMap, ok = negotiator.vbHandshakeMap[vbno]; !ok {
- osMap = make(opaqueStreamMap)
- negotiator.vbHandshakeMap[vbno] = osMap
- }
-
- if _, ok = osMap[opaque]; !ok {
- osMap[opaque] = &UprStream{
- Vbucket: vbno,
- Vbuuid: vbuuid,
- StartSeq: startSequence,
- EndSeq: endSequence,
- }
- }
-}
-
-func (negotiator *vbStreamNegotiator) getStreamsCntFromMap(vbno uint16) int {
- negotiator.mutex.RLock()
- defer negotiator.mutex.RUnlock()
-
- osmap, ok := negotiator.vbHandshakeMap[vbno]
- if !ok {
- return 0
- } else {
- return len(osmap)
- }
-}
-
-func (negotiator *vbStreamNegotiator) getStreamFromMap(vbno, opaque uint16) (*UprStream, error) {
- negotiator.mutex.RLock()
- defer negotiator.mutex.RUnlock()
-
- osmap, ok := negotiator.vbHandshakeMap[vbno]
- if !ok {
- return nil, fmt.Errorf("Error: stream for vb: %v does not exist", vbno)
- }
-
- stream, ok := osmap[opaque]
- if !ok {
- return nil, fmt.Errorf("Error: stream for vb: %v opaque: %v does not exist", vbno, opaque)
- }
- return stream, nil
-}
-
-func (negotiator *vbStreamNegotiator) deleteStreamFromMap(vbno, opaque uint16) {
- negotiator.mutex.Lock()
- defer negotiator.mutex.Unlock()
-
- osmap, ok := negotiator.vbHandshakeMap[vbno]
- if !ok {
- return
- }
-
- delete(osmap, opaque)
- if len(osmap) == 0 {
- delete(negotiator.vbHandshakeMap, vbno)
- }
-}
-
-func (negotiator *vbStreamNegotiator) handleStreamRequest(feed *UprFeed,
- headerBuf [gomemcached.HDR_LEN]byte, pktPtr *gomemcached.MCRequest, bytesReceivedFromDCP int,
- response *gomemcached.MCResponse) (*UprEvent, error) {
- var event *UprEvent
-
- if feed == nil || response == nil || pktPtr == nil {
- return nil, errors.New("Invalid inputs")
- }
-
- // Get Stream from negotiator map
- vbno := vbOpaque(response.Opaque)
- opaque := appOpaque(response.Opaque)
-
- stream, err := negotiator.getStreamFromMap(vbno, opaque)
- if err != nil {
- err = fmt.Errorf("Stream not found for vb %d: %#v", vbno, *pktPtr)
- logging.Errorf(err.Error())
- return nil, err
- }
-
- status, rb, flog, err := handleStreamRequest(response, headerBuf[:])
-
- if status == gomemcached.ROLLBACK {
- event = makeUprEvent(*pktPtr, stream, bytesReceivedFromDCP)
- event.Status = status
- // rollback stream
- logging.Infof("UPR_STREAMREQ with rollback %d for vb %d Failed: %v", rb, vbno, err)
- negotiator.deleteStreamFromMap(vbno, opaque)
- } else if status == gomemcached.SUCCESS {
- event = makeUprEvent(*pktPtr, stream, bytesReceivedFromDCP)
- event.Seqno = stream.StartSeq
- event.FailoverLog = flog
- event.Status = status
- feed.activateStream(vbno, opaque, stream)
- feed.negotiator.deleteStreamFromMap(vbno, opaque)
- logging.Infof("UPR_STREAMREQ for vb %d successful", vbno)
-
- } else if err != nil {
- logging.Errorf("UPR_STREAMREQ for vbucket %d erro %s", vbno, err.Error())
- event = &UprEvent{
- Opcode: gomemcached.UPR_STREAMREQ,
- Status: status,
- VBucket: vbno,
- Error: err,
- }
- negotiator.deleteStreamFromMap(vbno, opaque)
- }
- return event, nil
-}
-
-func (negotiator *vbStreamNegotiator) cleanUpVbStreams(vbno uint16) {
- negotiator.mutex.Lock()
- defer negotiator.mutex.Unlock()
-
- delete(negotiator.vbHandshakeMap, vbno)
-}
-
-// UprFeed represents an UPR feed. A feed contains a connection to a single
-// host and multiple vBuckets
-type UprFeed struct {
- // lock for feed.vbstreams
- muVbstreams sync.RWMutex
- C <-chan *UprEvent // Exported channel for receiving UPR events
- negotiator vbStreamNegotiator // Used for pre-vbstreams, concurrent vb stream negotiation
- vbstreams map[uint16]*UprStream // official live vb->stream mapping
- closer chan bool // closer
- conn *Client // connection to UPR producer
- Error error // error
- bytesRead uint64 // total bytes read on this connection
- toAckBytes uint32 // bytes client has read
- maxAckBytes uint32 // Max buffer control ack bytes
- stats UprStats // Stats for upr client
- transmitCh chan *gomemcached.MCRequest // transmit command channel
- transmitCl chan bool // closer channel for transmit go-routine
- // if flag is true, upr feed will use ack from client to determine whether/when to send ack to DCP
- // if flag is false, upr feed will track how many bytes it has sent to client
- // and use that to determine whether/when to send ack to DCP
- ackByClient bool
- feedState FeedState
- muFeedState sync.RWMutex
- activatedFeatures UprFeatures
- collectionEnabled bool // This is needed separately because parsing depends on this
- // DCP StreamID allows multiple filtered collection streams to share a single DCP Stream
- // It is not allowed once a regular/legacy stream was started originally
- streamsType DcpStreamType
- initStreamTypeOnce sync.Once
-}
-
-// Exported interface - to allow for mocking
-type UprFeedIface interface {
- Close()
- Closed() bool
- CloseStream(vbno, opaqueMSB uint16) error
- GetError() error
- GetUprStats() *UprStats
- ClientAck(event *UprEvent) error
- GetUprEventCh() <-chan *UprEvent
- StartFeed() error
- StartFeedWithConfig(datachan_len int) error
- UprOpen(name string, sequence uint32, bufSize uint32) error
- UprOpenWithXATTR(name string, sequence uint32, bufSize uint32) error
- UprOpenWithFeatures(name string, sequence uint32, bufSize uint32, features UprFeatures) (error, UprFeatures)
- UprRequestStream(vbno, opaqueMSB uint16, flags uint32, vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error
- // Set DCP priority on an existing DCP connection. The command is sent asynchronously without waiting for a response
- SetPriorityAsync(p PriorityType) error
-
- // Various Collection-Type RequestStreams
- UprRequestCollectionsStream(vbno, opaqueMSB uint16, flags uint32, vbuuid, startSeq, endSeq, snapStart, snapEnd uint64, filter *CollectionsFilter) error
-}
-
-type UprStats struct {
- TotalBytes uint64
- TotalMutation uint64
- TotalBufferAckSent uint64
- TotalSnapShot uint64
-}
-
-// error codes
-var ErrorInvalidLog = errors.New("couchbase.errorInvalidLog")
-
-func (flogp *FailoverLog) Latest() (vbuuid, seqno uint64, err error) {
- if flogp != nil {
- flog := *flogp
- latest := flog[len(flog)-1]
- return latest[0], latest[1], nil
- }
- return vbuuid, seqno, ErrorInvalidLog
-}
-
-func (feed *UprFeed) sendCommands(mc *Client) {
- transmitCh := feed.transmitCh
- transmitCl := feed.transmitCl
-loop:
- for {
- select {
- case command := <-transmitCh:
- if err := mc.Transmit(command); err != nil {
- logging.Errorf("Failed to transmit command %s. Error %s", command.Opcode.String(), err.Error())
- // get feed to close and runFeed routine to exit
- feed.Close()
- break loop
- }
-
- case <-transmitCl:
- break loop
- }
- }
-
- // After sendCommands exits, write to transmitCh will block forever
- // when we write to transmitCh, e.g., at CloseStream(), we need to check feed closure to have an exit route
-
- logging.Infof("sendCommands exiting")
-}
-
-// Sets the specified stream as the connected stream for this vbno, and also cleans up negotiator
-func (feed *UprFeed) activateStream(vbno, opaque uint16, stream *UprStream) error {
- feed.muVbstreams.Lock()
- defer feed.muVbstreams.Unlock()
-
- if feed.collectionEnabled {
- stream.StreamType = feed.streamsType
- }
-
- // Set this stream as the officially connected stream for this vb
- stream.connected = true
- feed.vbstreams[vbno] = stream
- return nil
-}
-
-func (feed *UprFeed) cleanUpVbStream(vbno uint16) {
- feed.muVbstreams.Lock()
- defer feed.muVbstreams.Unlock()
-
- delete(feed.vbstreams, vbno)
-}
-
-// NewUprFeed creates a new UPR Feed.
-// TODO: Describe side-effects on bucket instance and its connection pool.
-func (mc *Client) NewUprFeed() (*UprFeed, error) {
- return mc.NewUprFeedWithConfig(false /*ackByClient*/)
-}
-
-func (mc *Client) NewUprFeedWithConfig(ackByClient bool) (*UprFeed, error) {
- feed := &UprFeed{
- conn: mc,
- closer: make(chan bool, 1),
- vbstreams: make(map[uint16]*UprStream),
- transmitCh: make(chan *gomemcached.MCRequest),
- transmitCl: make(chan bool),
- ackByClient: ackByClient,
- collectionEnabled: mc.CollectionEnabled(),
- streamsType: UninitializedStream,
- }
-
- feed.negotiator.initialize()
-
- go feed.sendCommands(mc)
- return feed, nil
-}
-
-func (mc *Client) NewUprFeedIface() (UprFeedIface, error) {
- return mc.NewUprFeed()
-}
-
-func (mc *Client) NewUprFeedWithConfigIface(ackByClient bool) (UprFeedIface, error) {
- return mc.NewUprFeedWithConfig(ackByClient)
-}
-
-func doUprOpen(mc *Client, name string, sequence uint32, features UprFeatures) error {
- rq := &gomemcached.MCRequest{
- Opcode: gomemcached.UPR_OPEN,
- Key: []byte(name),
- Opaque: getUprOpenCtrlOpaque(),
- }
-
- rq.Extras = make([]byte, 8)
- binary.BigEndian.PutUint32(rq.Extras[:4], sequence)
-
- // opens a producer type connection
- flags := gomemcached.DCP_PRODUCER
- if features.Xattribute {
- flags = flags | gomemcached.DCP_OPEN_INCLUDE_XATTRS
- }
- if features.IncludeDeletionTime {
- flags = flags | gomemcached.DCP_OPEN_INCLUDE_DELETE_TIMES
- }
- binary.BigEndian.PutUint32(rq.Extras[4:], flags)
-
- return sendMcRequestSync(mc, rq)
-}
-
-// Synchronously send a memcached request and wait for the response
-func sendMcRequestSync(mc *Client, req *gomemcached.MCRequest) error {
- if err := mc.Transmit(req); err != nil {
- return err
- }
-
- if res, err := mc.Receive(); err != nil {
- return err
- } else if req.Opcode != res.Opcode {
- return fmt.Errorf("unexpected #opcode sent %v received %v", req.Opcode, res.Opaque)
- } else if req.Opaque != res.Opaque {
- return fmt.Errorf("opaque mismatch, sent %v received %v", req.Opaque, res.Opaque)
- } else if res.Status != gomemcached.SUCCESS {
- return fmt.Errorf("error %v", res.Status)
- }
- return nil
-}
-
-// UprOpen to connect with a UPR producer.
-// Name: name of te UPR connection
-// sequence: sequence number for the connection
-// bufsize: max size of the application
-func (feed *UprFeed) UprOpen(name string, sequence uint32, bufSize uint32) error {
- var allFeaturesDisabled UprFeatures
- err, _ := feed.uprOpen(name, sequence, bufSize, allFeaturesDisabled)
- return err
-}
-
-// UprOpen with XATTR enabled.
-func (feed *UprFeed) UprOpenWithXATTR(name string, sequence uint32, bufSize uint32) error {
- var onlyXattrEnabled UprFeatures
- onlyXattrEnabled.Xattribute = true
- err, _ := feed.uprOpen(name, sequence, bufSize, onlyXattrEnabled)
- return err
-}
-
-func (feed *UprFeed) UprOpenWithFeatures(name string, sequence uint32, bufSize uint32, features UprFeatures) (error, UprFeatures) {
- return feed.uprOpen(name, sequence, bufSize, features)
-}
-
-func (feed *UprFeed) SetPriorityAsync(p PriorityType) error {
- if !feed.isOpen() {
- // do not send this command if upr feed is not yet open, otherwise it may interfere with
- // feed start up process, which relies on synchronous message exchange with DCP.
- return fmt.Errorf("Upr feed is not open. State=%v", feed.getState())
- }
-
- return feed.setPriority(p, false /*sync*/)
-}
-
-func (feed *UprFeed) setPriority(p PriorityType, sync bool) error {
- rq := &gomemcached.MCRequest{
- Opcode: gomemcached.UPR_CONTROL,
- Key: []byte("set_priority"),
- Body: []byte(p),
- Opaque: getUprOpenCtrlOpaque(),
- }
- if sync {
- return sendMcRequestSync(feed.conn, rq)
- } else {
- return feed.writeToTransmitCh(rq)
-
- }
-}
-
-func (feed *UprFeed) uprOpen(name string, sequence uint32, bufSize uint32, features UprFeatures) (err error, activatedFeatures UprFeatures) {
- mc := feed.conn
-
- // First set this to an invalid value to state that the method hasn't gotten to executing this control yet
- activatedFeatures.CompressionType = CompressionTypeEndMarker
-
- if err = doUprOpen(mc, name, sequence, features); err != nil {
- return
- }
-
- activatedFeatures.Xattribute = features.Xattribute
-
- // send a UPR control message to set the window size for the this connection
- if bufSize > 0 {
- rq := &gomemcached.MCRequest{
- Opcode: gomemcached.UPR_CONTROL,
- Key: []byte("connection_buffer_size"),
- Body: []byte(strconv.Itoa(int(bufSize))),
- Opaque: getUprOpenCtrlOpaque(),
- }
- err = sendMcRequestSync(feed.conn, rq)
- if err != nil {
- return
- }
- feed.maxAckBytes = uint32(bufferAckThreshold * float32(bufSize))
- }
-
- // enable noop and set noop interval
- rq := &gomemcached.MCRequest{
- Opcode: gomemcached.UPR_CONTROL,
- Key: []byte("enable_noop"),
- Body: []byte("true"),
- Opaque: getUprOpenCtrlOpaque(),
- }
- err = sendMcRequestSync(feed.conn, rq)
- if err != nil {
- return
- }
-
- rq = &gomemcached.MCRequest{
- Opcode: gomemcached.UPR_CONTROL,
- Key: []byte("set_noop_interval"),
- Body: []byte(strconv.Itoa(int(uprDefaultNoopInterval))),
- Opaque: getUprOpenCtrlOpaque(),
- }
- err = sendMcRequestSync(feed.conn, rq)
- if err != nil {
- return
- }
-
- if features.CompressionType == CompressionTypeSnappy {
- activatedFeatures.CompressionType = CompressionTypeNone
- rq = &gomemcached.MCRequest{
- Opcode: gomemcached.UPR_CONTROL,
- Key: []byte("force_value_compression"),
- Body: []byte("true"),
- Opaque: getUprOpenCtrlOpaque(),
- }
- err = sendMcRequestSync(feed.conn, rq)
- } else if features.CompressionType == CompressionTypeEndMarker {
- err = fmt.Errorf("UPR_CONTROL Failed - Invalid CompressionType: %v", features.CompressionType)
- }
- if err != nil {
- return
- }
- activatedFeatures.CompressionType = features.CompressionType
-
- if features.DcpPriority != PriorityDisabled {
- err = feed.setPriority(features.DcpPriority, true /*sync*/)
- if err == nil {
- activatedFeatures.DcpPriority = features.DcpPriority
- } else {
- return
- }
- }
-
- if features.EnableExpiry {
- rq := &gomemcached.MCRequest{
- Opcode: gomemcached.UPR_CONTROL,
- Key: []byte("enable_expiry_opcode"),
- Body: []byte("true"),
- Opaque: getUprOpenCtrlOpaque(),
- }
- err = sendMcRequestSync(feed.conn, rq)
- if err != nil {
- return
- }
- activatedFeatures.EnableExpiry = true
- }
-
- if features.EnableStreamId {
- rq := &gomemcached.MCRequest{
- Opcode: gomemcached.UPR_CONTROL,
- Key: []byte("enable_stream_id"),
- Body: []byte("true"),
- Opaque: getUprOpenCtrlOpaque(),
- }
- err = sendMcRequestSync(feed.conn, rq)
- if err != nil {
- return
- }
- activatedFeatures.EnableStreamId = true
- }
-
- if features.EnableOso {
- rq := &gomemcached.MCRequest{
- Opcode: gomemcached.UPR_CONTROL,
- Key: []byte("enable_out_of_order_snapshots"),
- Body: []byte("true"),
- Opaque: getUprOpenCtrlOpaque(),
- }
- err = sendMcRequestSync(feed.conn, rq)
- if err != nil {
- return
- }
- activatedFeatures.EnableOso = true
- }
-
- // everything is ok so far, set upr feed to open state
- feed.activatedFeatures = activatedFeatures
- feed.setOpen()
- return
-}
-
-// UprRequestStream for a single vbucket.
-func (feed *UprFeed) UprRequestStream(vbno, opaqueMSB uint16, flags uint32,
- vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error {
-
- return feed.UprRequestCollectionsStream(vbno, opaqueMSB, flags, vuuid, startSequence, endSequence, snapStart, snapEnd, nil)
-}
-
-func (feed *UprFeed) initStreamType(filter *CollectionsFilter) (err error) {
- if filter != nil && filter.UseStreamId && !feed.activatedFeatures.EnableStreamId {
- err = fmt.Errorf("Cannot use streamID based filter if the feed was not started with the streamID feature")
- return
- }
-
- streamInitFunc := func() {
- if feed.streamsType != UninitializedStream {
- // Shouldn't happen
- err = fmt.Errorf("The current feed has already been started in %v mode", feed.streamsType.String())
- } else {
- if !feed.collectionEnabled {
- feed.streamsType = NonCollectionStream
- } else {
- if filter != nil && filter.UseStreamId {
- feed.streamsType = CollectionsStreamId
- } else {
- feed.streamsType = CollectionsNonStreamId
- }
- }
- }
- }
- feed.initStreamTypeOnce.Do(streamInitFunc)
- return
-}
-
-func (feed *UprFeed) UprRequestCollectionsStream(vbno, opaqueMSB uint16, flags uint32,
- vbuuid, startSequence, endSequence, snapStart, snapEnd uint64, filter *CollectionsFilter) error {
-
- err := feed.initStreamType(filter)
- if err != nil {
- return err
- }
-
- var mcRequestBody []byte
- if filter != nil {
- err = filter.IsValid()
- if err != nil {
- return err
- }
- mcRequestBody, err = filter.ToStreamReqBody()
- if err != nil {
- return err
- }
- }
-
- rq := &gomemcached.MCRequest{
- Opcode: gomemcached.UPR_STREAMREQ,
- VBucket: vbno,
- Opaque: composeOpaque(vbno, opaqueMSB),
- Body: mcRequestBody,
- }
-
- rq.Extras = make([]byte, 48) // #Extras
- binary.BigEndian.PutUint32(rq.Extras[:4], flags)
- binary.BigEndian.PutUint32(rq.Extras[4:8], uint32(0))
- binary.BigEndian.PutUint64(rq.Extras[8:16], startSequence)
- binary.BigEndian.PutUint64(rq.Extras[16:24], endSequence)
- binary.BigEndian.PutUint64(rq.Extras[24:32], vbuuid)
- binary.BigEndian.PutUint64(rq.Extras[32:40], snapStart)
- binary.BigEndian.PutUint64(rq.Extras[40:48], snapEnd)
-
- feed.negotiator.registerRequest(vbno, opaqueMSB, vbuuid, startSequence, endSequence)
- // Any client that has ever called this method, regardless of return code,
- // should expect a potential UPR_CLOSESTREAM message due to this new map entry prior to Transmit.
-
- if err = feed.conn.Transmit(rq); err != nil {
- logging.Errorf("Error in StreamRequest %s", err.Error())
- // If an error occurs during transmit, then the UPRFeed will keep the stream
- // in the vbstreams map. This is to prevent nil lookup from any previously
- // sent stream requests.
- return err
- }
-
- return nil
-}
-
-// CloseStream for specified vbucket.
-func (feed *UprFeed) CloseStream(vbno, opaqueMSB uint16) error {
-
- err := feed.validateCloseStream(vbno)
- if err != nil {
- logging.Infof("CloseStream for %v has been skipped because of error %v", vbno, err)
- return err
- }
-
- closeStream := &gomemcached.MCRequest{
- Opcode: gomemcached.UPR_CLOSESTREAM,
- VBucket: vbno,
- Opaque: composeOpaque(vbno, opaqueMSB),
- }
-
- feed.writeToTransmitCh(closeStream)
-
- return nil
-}
-
-func (feed *UprFeed) GetUprEventCh() <-chan *UprEvent {
- return feed.C
-}
-
-func (feed *UprFeed) GetError() error {
- return feed.Error
-}
-
-func (feed *UprFeed) validateCloseStream(vbno uint16) error {
- feed.muVbstreams.RLock()
- nilVbStream := feed.vbstreams[vbno] == nil
- feed.muVbstreams.RUnlock()
-
- if nilVbStream && (feed.negotiator.getStreamsCntFromMap(vbno) == 0) {
- return fmt.Errorf("Stream for vb %d has not been requested", vbno)
- }
-
- return nil
-}
-
-func (feed *UprFeed) writeToTransmitCh(rq *gomemcached.MCRequest) error {
- // write to transmitCh may block forever if sendCommands has exited
- // check for feed closure to have an exit route in this case
- select {
- case <-feed.closer:
- errMsg := fmt.Sprintf("Abort sending request to transmitCh because feed has been closed. request=%v", rq)
- logging.Infof(errMsg)
- return errors.New(errMsg)
- case feed.transmitCh <- rq:
- }
- return nil
-}
-
-// StartFeed to start the upper feed.
-func (feed *UprFeed) StartFeed() error {
- return feed.StartFeedWithConfig(10)
-}
-
-func (feed *UprFeed) StartFeedWithConfig(datachan_len int) error {
- ch := make(chan *UprEvent, datachan_len)
- feed.C = ch
- go feed.runFeed(ch)
- return nil
-}
-
-func parseFailoverLog(body []byte) (*FailoverLog, error) {
- if len(body)%16 != 0 {
- err := fmt.Errorf("invalid body length %v, in failover-log", len(body))
- return nil, err
- }
- log := make(FailoverLog, len(body)/16)
- for i, j := 0, 0; i < len(body); i += 16 {
- vuuid := binary.BigEndian.Uint64(body[i : i+8])
- seqno := binary.BigEndian.Uint64(body[i+8 : i+16])
- log[j] = [2]uint64{vuuid, seqno}
- j++
- }
- return &log, nil
-}
-
-func parseGetSeqnoResp(body []byte) (*VBSeqnos, error) {
- // vbno of 2 bytes + seqno of 8 bytes
- var entryLen int = 10
-
- if len(body)%entryLen != 0 {
- err := fmt.Errorf("invalid body length %v, in getVbSeqno", len(body))
- return nil, err
- }
- vbSeqnos := make(VBSeqnos, len(body)/entryLen)
- for i, j := 0, 0; i < len(body); i += entryLen {
- vbno := binary.BigEndian.Uint16(body[i : i+2])
- seqno := binary.BigEndian.Uint64(body[i+2 : i+10])
- vbSeqnos[j] = [2]uint64{uint64(vbno), seqno}
- j++
- }
- return &vbSeqnos, nil
-}
-
-func handleStreamRequest(
- res *gomemcached.MCResponse,
- headerBuf []byte,
-) (gomemcached.Status, uint64, *FailoverLog, error) {
-
- var rollback uint64
- var err error
-
- switch {
- case res.Status == gomemcached.ROLLBACK:
- logging.Infof("Rollback response. body=%v, headerBuf=%v\n", res.Body, headerBuf)
- rollback = binary.BigEndian.Uint64(res.Body)
- logging.Infof("Rollback seqno is %v for response with opaque %v\n", rollback, res.Opaque)
- return res.Status, rollback, nil, nil
-
- case res.Status != gomemcached.SUCCESS:
- err = fmt.Errorf("unexpected status %v for response with opaque %v", res.Status, res.Opaque)
- return res.Status, 0, nil, err
- }
-
- flog, err := parseFailoverLog(res.Body[:])
- return res.Status, rollback, flog, err
-}
-
-// generate stream end responses for all active vb streams
-func (feed *UprFeed) doStreamClose(ch chan *UprEvent) {
- feed.muVbstreams.RLock()
-
- uprEvents := make([]*UprEvent, len(feed.vbstreams))
- index := 0
- for vbno, stream := range feed.vbstreams {
- uprEvent := &UprEvent{
- VBucket: vbno,
- VBuuid: stream.Vbuuid,
- Opcode: gomemcached.UPR_STREAMEND,
- }
- uprEvents[index] = uprEvent
- index++
- }
-
- // release the lock before sending uprEvents to ch, which may block
- feed.muVbstreams.RUnlock()
-
-loop:
- for _, uprEvent := range uprEvents {
- select {
- case ch <- uprEvent:
- case <-feed.closer:
- logging.Infof("Feed has been closed. Aborting doStreamClose.")
- break loop
- }
- }
-}
-
-func (feed *UprFeed) runFeed(ch chan *UprEvent) {
- defer close(ch)
- var headerBuf [gomemcached.HDR_LEN]byte
- var pkt gomemcached.MCRequest
- var event *UprEvent
-
- mc := feed.conn.Hijack()
- uprStats := &feed.stats
-
-loop:
- for {
- select {
- case <-feed.closer:
- logging.Infof("Feed has been closed. Exiting.")
- break loop
- default:
- bytes, err := pkt.Receive(mc, headerBuf[:])
- if err != nil {
- logging.Errorf("Error in receive %s", err.Error())
- feed.Error = err
- // send all the stream close messages to the client
- feed.doStreamClose(ch)
- break loop
- } else {
- event = nil
- res := &gomemcached.MCResponse{
- Opcode: pkt.Opcode,
- Cas: pkt.Cas,
- Opaque: pkt.Opaque,
- Status: gomemcached.Status(pkt.VBucket),
- Extras: pkt.Extras,
- Key: pkt.Key,
- Body: pkt.Body,
- }
-
- vb := vbOpaque(pkt.Opaque)
- appOpaque := appOpaque(pkt.Opaque)
- uprStats.TotalBytes = uint64(bytes)
-
- feed.muVbstreams.RLock()
- stream := feed.vbstreams[vb]
- feed.muVbstreams.RUnlock()
-
- switch pkt.Opcode {
- case gomemcached.UPR_STREAMREQ:
- event, err = feed.negotiator.handleStreamRequest(feed, headerBuf, &pkt, bytes, res)
- if err != nil {
- logging.Infof(err.Error())
- break loop
- }
- case gomemcached.UPR_MUTATION,
- gomemcached.UPR_DELETION,
- gomemcached.UPR_EXPIRATION:
- if stream == nil {
- logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
- break loop
- }
- event = makeUprEvent(pkt, stream, bytes)
- uprStats.TotalMutation++
-
- case gomemcached.UPR_STREAMEND:
- if stream == nil {
- logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
- break loop
- }
- //stream has ended
- event = makeUprEvent(pkt, stream, bytes)
- logging.Infof("Stream Ended for vb %d", vb)
-
- feed.negotiator.deleteStreamFromMap(vb, appOpaque)
- feed.cleanUpVbStream(vb)
-
- case gomemcached.UPR_SNAPSHOT:
- if stream == nil {
- logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
- break loop
- }
- // snapshot marker
- event = makeUprEvent(pkt, stream, bytes)
- uprStats.TotalSnapShot++
-
- case gomemcached.UPR_FLUSH:
- if stream == nil {
- logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
- break loop
- }
- // special processing for flush ?
- event = makeUprEvent(pkt, stream, bytes)
-
- case gomemcached.UPR_CLOSESTREAM:
- if stream == nil {
- logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
- break loop
- }
- event = makeUprEvent(pkt, stream, bytes)
- event.Opcode = gomemcached.UPR_STREAMEND // opcode re-write !!
- logging.Infof("Stream Closed for vb %d StreamEnd simulated", vb)
-
- feed.negotiator.deleteStreamFromMap(vb, appOpaque)
- feed.cleanUpVbStream(vb)
-
- case gomemcached.UPR_ADDSTREAM:
- logging.Infof("Opcode %v not implemented", pkt.Opcode)
-
- case gomemcached.UPR_CONTROL, gomemcached.UPR_BUFFERACK:
- if res.Status != gomemcached.SUCCESS {
- logging.Infof("Opcode %v received status %d", pkt.Opcode.String(), res.Status)
- }
-
- case gomemcached.UPR_NOOP:
- // send a NOOP back
- noop := &gomemcached.MCResponse{
- Opcode: gomemcached.UPR_NOOP,
- Opaque: pkt.Opaque,
- }
-
- if err := feed.conn.TransmitResponse(noop); err != nil {
- logging.Warnf("failed to transmit command %s. Error %s", noop.Opcode.String(), err.Error())
- }
- case gomemcached.DCP_SYSTEM_EVENT:
- if stream == nil {
- logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
- break loop
- }
- event = makeUprEvent(pkt, stream, bytes)
- case gomemcached.UPR_FAILOVERLOG:
- logging.Infof("Failover log for vb %d received: %v", vb, pkt)
- case gomemcached.DCP_SEQNO_ADV:
- if stream == nil {
- logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
- break loop
- }
- event = makeUprEvent(pkt, stream, bytes)
- case gomemcached.DCP_OSO_SNAPSHOT:
- if stream == nil {
- logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
- break loop
- }
- event = makeUprEvent(pkt, stream, bytes)
- default:
- logging.Infof("Recived an unknown response for vbucket %d", vb)
- }
- }
-
- if event != nil {
- select {
- case ch <- event:
- case <-feed.closer:
- logging.Infof("Feed has been closed. Skip sending events. Exiting.")
- break loop
- }
-
- feed.muVbstreams.RLock()
- l := len(feed.vbstreams)
- feed.muVbstreams.RUnlock()
-
- if event.Opcode == gomemcached.UPR_CLOSESTREAM && l == 0 {
- logging.Infof("No more streams")
- }
- }
-
- if !feed.ackByClient {
- // if client does not ack, do the ack check now
- feed.sendBufferAckIfNeeded(event)
- }
- }
- }
-
- // make sure that feed is closed before we signal transmitCl and exit runFeed
- feed.Close()
-
- close(feed.transmitCl)
- logging.Infof("runFeed exiting")
-}
-
-// Client, after completing processing of an UprEvent, need to call this API to notify UprFeed,
-// so that UprFeed can update its ack bytes stats and send ack to DCP if needed
-// Client needs to set ackByClient flag to true in NewUprFeedWithConfig() call as a prerequisite for this call to work
-// This API is not thread safe. Caller should NOT have more than one go rountine calling this API
-func (feed *UprFeed) ClientAck(event *UprEvent) error {
- if !feed.ackByClient {
- return errors.New("Upr feed does not have ackByclient flag set")
- }
- feed.sendBufferAckIfNeeded(event)
- return nil
-}
-
-// increment ack bytes if the event needs to be acked to DCP
-// send buffer ack if enough ack bytes have been accumulated
-func (feed *UprFeed) sendBufferAckIfNeeded(event *UprEvent) {
- if event == nil || event.AckSize == 0 {
- // this indicates that there is no need to ack to DCP
- return
- }
-
- totalBytes := feed.toAckBytes + event.AckSize
- if totalBytes > feed.maxAckBytes {
- feed.toAckBytes = 0
- feed.sendBufferAck(totalBytes)
- } else {
- feed.toAckBytes = totalBytes
- }
-}
-
-// send buffer ack to dcp
-func (feed *UprFeed) sendBufferAck(sendSize uint32) {
- bufferAck := &gomemcached.MCRequest{
- Opcode: gomemcached.UPR_BUFFERACK,
- }
- bufferAck.Extras = make([]byte, 4)
- binary.BigEndian.PutUint32(bufferAck.Extras[:4], uint32(sendSize))
- feed.writeToTransmitCh(bufferAck)
- feed.stats.TotalBufferAckSent++
-}
-
-func (feed *UprFeed) GetUprStats() *UprStats {
- return &feed.stats
-}
-
-func composeOpaque(vbno, opaqueMSB uint16) uint32 {
- return (uint32(opaqueMSB) << 16) | uint32(vbno)
-}
-
-func getUprOpenCtrlOpaque() uint32 {
- return atomic.AddUint32(&opaqueOpenCtrlWell, 1)
-}
-
-func appOpaque(opq32 uint32) uint16 {
- return uint16((opq32 & 0xFFFF0000) >> 16)
-}
-
-func vbOpaque(opq32 uint32) uint16 {
- return uint16(opq32 & 0xFFFF)
-}
-
-// Close this UprFeed.
-func (feed *UprFeed) Close() {
- feed.muFeedState.Lock()
- defer feed.muFeedState.Unlock()
- if feed.feedState != FeedStateClosed {
- close(feed.closer)
- feed.feedState = FeedStateClosed
- feed.negotiator.initialize()
- }
-}
-
-// check if the UprFeed has been closed
-func (feed *UprFeed) Closed() bool {
- feed.muFeedState.RLock()
- defer feed.muFeedState.RUnlock()
- return feed.feedState == FeedStateClosed
-}
-
-// set upr feed to opened state after initialization is done
-func (feed *UprFeed) setOpen() {
- feed.muFeedState.Lock()
- defer feed.muFeedState.Unlock()
- feed.feedState = FeedStateOpened
-}
-
-func (feed *UprFeed) isOpen() bool {
- feed.muFeedState.RLock()
- defer feed.muFeedState.RUnlock()
- return feed.feedState == FeedStateOpened
-}
-
-func (feed *UprFeed) getState() FeedState {
- feed.muFeedState.RLock()
- defer feed.muFeedState.RUnlock()
- return feed.feedState
-}
diff --git a/vendor/github.com/couchbase/gomemcached/flexibleFraming.go b/vendor/github.com/couchbase/gomemcached/flexibleFraming.go
deleted file mode 100644
index a545885fd8..0000000000
--- a/vendor/github.com/couchbase/gomemcached/flexibleFraming.go
+++ /dev/null
@@ -1,398 +0,0 @@
-package gomemcached
-
-import (
- "encoding/binary"
- "fmt"
-)
-
-type FrameObjType int
-
-const (
- FrameBarrier FrameObjType = iota
- FrameDurability FrameObjType = iota
- FrameDcpStreamId FrameObjType = iota
- FrameOpenTracing FrameObjType = iota
- FrameImpersonate FrameObjType = iota
-)
-
-const MAX_USER_LEN = 15 // TODO half byte shifting to be implemented
-// it's not very efficient so we currently truncate user names
-const FAST_USER_LEN = 15
-
-type FrameInfo struct {
- ObjId FrameObjType
- ObjLen int
- ObjData []byte
-}
-
-var ErrorInvalidOp error = fmt.Errorf("Specified method is not applicable")
-var ErrorObjLenNotMatch error = fmt.Errorf("Object length does not match data")
-
-func (f *FrameInfo) Validate() error {
- switch f.ObjId {
- case FrameBarrier:
- if f.ObjLen != 0 {
- return fmt.Errorf("Invalid FrameBarrier - length is %v\n", f.ObjLen)
- } else if f.ObjLen != len(f.ObjData) {
- return ErrorObjLenNotMatch
- }
- case FrameDurability:
- if f.ObjLen != 1 && f.ObjLen != 3 {
- return fmt.Errorf("Invalid FrameDurability - length is %v\n", f.ObjLen)
- } else if f.ObjLen != len(f.ObjData) {
- return ErrorObjLenNotMatch
- }
- case FrameDcpStreamId:
- if f.ObjLen != 2 {
- return fmt.Errorf("Invalid FrameDcpStreamId - length is %v\n", f.ObjLen)
- } else if f.ObjLen != len(f.ObjData) {
- return ErrorObjLenNotMatch
- }
- case FrameOpenTracing:
- if f.ObjLen != 1 {
- return fmt.Errorf("Invalid FrameImpersonate - length is %v\n", f.ObjLen)
- } else if f.ObjLen != len(f.ObjData) {
- return ErrorObjLenNotMatch
- }
- case FrameImpersonate:
- default:
- return fmt.Errorf("Unknown FrameInfo type")
- }
- return nil
-}
-
-func (f *FrameInfo) GetStreamId() (uint16, error) {
- if f.ObjId != FrameDcpStreamId {
- return 0, ErrorInvalidOp
- }
-
- var output uint16
- output = uint16(f.ObjData[0])
- output = output << 8
- output |= uint16(f.ObjData[1])
- return output, nil
-}
-
-type DurabilityLvl uint8
-
-const (
- DuraInvalid DurabilityLvl = iota // Not used (0x0)
- DuraMajority DurabilityLvl = iota // (0x01)
- DuraMajorityAndPersistOnMaster DurabilityLvl = iota // (0x02)
- DuraPersistToMajority DurabilityLvl = iota // (0x03)
-)
-
-func (f *FrameInfo) GetDurabilityRequirements() (lvl DurabilityLvl, timeoutProvided bool, timeoutMs uint16, err error) {
- if f.ObjId != FrameDurability {
- err = ErrorInvalidOp
- return
- }
- if f.ObjLen != 1 && f.ObjLen != 3 {
- err = ErrorObjLenNotMatch
- return
- }
-
- lvl = DurabilityLvl(uint8(f.ObjData[0]))
-
- if f.ObjLen == 3 {
- timeoutProvided = true
- timeoutMs = binary.BigEndian.Uint16(f.ObjData[1:2])
- }
-
- return
-}
-
-func incrementMarker(bitsToBeIncremented, byteIncrementCnt *int, framingElen, curObjIdx int) (int, error) {
- for *bitsToBeIncremented >= 8 {
- *byteIncrementCnt++
- *bitsToBeIncremented -= 8
- }
- marker := curObjIdx + *byteIncrementCnt
- if marker > framingElen {
- return -1, fmt.Errorf("Out of bounds")
- }
- return marker, nil
-}
-
-func (f *FrameInfo) Bytes() ([]byte, bool) {
- return obj2Bytes(f.ObjId, f.ObjLen, f.ObjData)
-}
-
-// TODO implement half byte shifting for impersonate user names
-// halfByteRemaining will always be false, because ObjID and Len haven't gotten that large yet
-// and user names are truncated
-func obj2Bytes(id FrameObjType, len int, data []byte) (output []byte, halfByteRemaining bool) {
- if len < 16 {
-
- // ObjIdentifier - 4 bits + ObjLength - 4 bits
- var idAndLen uint8
- idAndLen |= uint8(id) << 4
- idAndLen |= uint8(len)
- output = append(output, byte(idAndLen))
-
- // Rest is Data
- output = append(output, data[:len]...)
-
- } else {
- }
- return
-}
-
-func parseFrameInfoObjects(buf []byte, framingElen int) (objs []FrameInfo, err error, halfByteRemaining bool) {
- var curObjIdx int
- var byteIncrementCnt int
- var bitsToBeIncremented int
- var marker int
-
- // Parse frameInfo objects
- for curObjIdx = 0; curObjIdx < framingElen; curObjIdx += byteIncrementCnt {
- byteIncrementCnt = 0
- var oneFrameObj FrameInfo
-
- // First get the objId
- // -------------------------
- var objId int
- var objHeader uint8 = buf[curObjIdx]
- var objIdentifierRaw uint8
- if bitsToBeIncremented == 0 {
- // ObjHeader
- // 0 1 2 3 4 5 6 7
- // ^-----^
- // ObjIdentifierRaw
- objIdentifierRaw = (objHeader & 0xf0) >> 4
- } else {
- // ObjHeader
- // 0 1 2 3 4 5 6 7
- // ^-----^
- // ObjIdentifierRaw
- objIdentifierRaw = (objHeader & 0x0f)
- }
- bitsToBeIncremented += 4
-
- marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx)
- if err != nil {
- return
- }
-
- // Value is 0-14
- objId = int(objIdentifierRaw & 0xe)
- // If bit 15 is set, ID is 15 + value of next byte
- if objIdentifierRaw&0x1 > 0 {
- if bitsToBeIncremented > 0 {
- // ObjHeader
- // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
- // ^-----^ ^---------------^
- // ObjId1 Extension
- // ^ marker
- buffer := uint16(buf[marker])
- buffer = buffer << 8
- buffer |= uint16(buf[marker+1])
- var extension uint8 = uint8(buffer & 0xff0 >> 4)
- objId += int(extension)
- } else {
- // ObjHeader
- // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
- // ^-----^ ^-------------------^
- // ObjId1 extension
- // ^ marker
- var extension uint8 = uint8(buf[marker])
- objId += int(extension)
- }
- bitsToBeIncremented += 8
- }
-
- marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx)
- if err != nil {
- return
- }
- oneFrameObj.ObjId = FrameObjType(objId)
-
- // Then get the obj length
- // -------------------------
- var objLenRaw uint8
- var objLen int
- if bitsToBeIncremented > 0 {
- // ObjHeader
- // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
- // ^ ^---------^
- // marker objLen
- objLenRaw = uint8(buf[marker]) & 0x0f
- } else {
- // ObjHeader
- // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
- // ^--------^
- // objLen
- // ^ marker
- objLenRaw = uint8(buf[marker]) & 0xf0 >> 4
- }
- bitsToBeIncremented += 4
-
- marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx)
- if err != nil {
- return
- }
-
- // Length is 0-14
- objLen = int(objLenRaw & 0xe)
- // If bit 15 is set, lenghth is 15 + value of next byte
- if objLenRaw&0x1 > 0 {
- if bitsToBeIncremented == 0 {
- // ObjHeader
- // 12 13 14 15 16 17 18 19 20 21 22 23
- // ^---------^ ^--------------------^
- // objLen extension
- // ^ marker
- var extension uint8 = uint8(buf[marker])
- objLen += int(extension)
- } else {
- // ObjHeader
- // 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
- // ^--------^ ^---------------------^
- // objLen extension
- // ^ marker var buffer uint16
- buffer := uint16(buf[marker])
- buffer = buffer << 8
- buffer |= uint16(buf[marker+1])
- var extension uint8 = uint8(buffer & 0xff0 >> 4)
- objLen += int(extension)
- }
- bitsToBeIncremented += 8
- }
-
- marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx)
- if err != nil {
- return
- }
- oneFrameObj.ObjLen = objLen
-
- // The rest is N-bytes of data based on the length
- if bitsToBeIncremented == 0 {
- // No weird alignment needed
- oneFrameObj.ObjData = buf[marker : marker+objLen]
- } else {
- // 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
- // ^--------^ ^---------------------^ ^--------->
- // objLen extension data
- // ^ marker
- oneFrameObj.ObjData = ShiftByteSliceLeft4Bits(buf[marker : marker+objLen+1])
- }
- err = oneFrameObj.Validate()
- if err != nil {
- return
- }
- objs = append(objs, oneFrameObj)
-
- bitsToBeIncremented += 8 * objLen
- marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx)
- }
-
- if bitsToBeIncremented > 0 {
- halfByteRemaining = true
- }
- return
-}
-
-func ShiftByteSliceLeft4Bits(slice []byte) (replacement []byte) {
- var buffer uint16
- var i int
- sliceLen := len(slice)
-
- if sliceLen < 2 {
- // Let's not shift less than 16 bits
- return
- }
-
- replacement = make([]byte, sliceLen, cap(slice))
-
- for i = 0; i < sliceLen-1; i++ {
- // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
- // ^-----^ ^---------------^ ^-----------
- // garbage data byte 0 data byte 1
- buffer = uint16(slice[i])
- buffer = buffer << 8
- buffer |= uint16(slice[i+1])
- replacement[i] = uint8(buffer & 0xff0 >> 4)
- }
-
- if i < sliceLen {
- lastByte := slice[sliceLen-1]
- lastByte = lastByte << 4
- replacement[i] = lastByte
- }
- return
-}
-
-// The following is used to theoretically support frameInfo ObjID extensions
-// for completeness, but they are not very efficient though
-func ShiftByteSliceRight4Bits(slice []byte) (replacement []byte) {
- var buffer uint16
- var i int
- var leftovers uint8 // 4 bits only
- var replacementUnit uint16
- var first bool = true
- var firstLeftovers uint8
- var lastLeftovers uint8
- sliceLen := len(slice)
-
- if sliceLen < 2 {
- // Let's not shift less than 16 bits
- return
- }
-
- if slice[sliceLen-1]&0xf == 0 {
- replacement = make([]byte, sliceLen, cap(slice))
- } else {
- replacement = make([]byte, sliceLen+1, cap(slice)+1)
- }
-
- for i = 0; i < sliceLen-1; i++ {
- buffer = binary.BigEndian.Uint16(slice[i : i+2])
- // (buffer)
- // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
- // ^-------------^ ^-------------------^
- // data byte 0 data byte 1
- //
- // into
- //
- // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
- // ^-----^ ^---------------^ ^--------------------^ ^----------^
- // zeroes data byte 0 data byte 1 zeroes
-
- if first {
- // The leftover OR'ing will overwrite the first 4 bits of data byte 0. Save them
- firstLeftovers = uint8(buffer & 0xf000 >> 12)
- first = false
- }
- replacementUnit = 0
- replacementUnit |= uint16(leftovers) << 12
- replacementUnit |= (buffer & 0xff00) >> 4 // data byte 0
- replacementUnit |= buffer & 0xff >> 4 // data byte 1 first 4 bits
- lastLeftovers = uint8(buffer&0xf) << 4
-
- replacement[i+1] = byte(replacementUnit)
-
- leftovers = uint8((buffer & 0x000f) << 4)
- }
-
- replacement[0] = byte(uint8(replacement[0]) | firstLeftovers)
- if lastLeftovers > 0 {
- replacement[sliceLen] = byte(lastLeftovers)
- }
- return
-}
-
-func Merge2HalfByteSlices(src1, src2 []byte) (output []byte) {
- src1Len := len(src1)
- src2Len := len(src2)
- output = make([]byte, src1Len+src2Len-1)
-
- var mergeByte uint8 = src1[src1Len-1]
- mergeByte |= uint8(src2[0])
-
- copy(output, src1)
- copy(output[src1Len:], src2[1:])
-
- output[src1Len-1] = byte(mergeByte)
-
- return
-}
diff --git a/vendor/github.com/couchbase/gomemcached/go.mod b/vendor/github.com/couchbase/gomemcached/go.mod
deleted file mode 100644
index 3355d4ea75..0000000000
--- a/vendor/github.com/couchbase/gomemcached/go.mod
+++ /dev/null
@@ -1,3 +0,0 @@
-module github.com/couchbase/gomemcached
-
-go 1.13
diff --git a/vendor/github.com/couchbase/gomemcached/mc_constants.go b/vendor/github.com/couchbase/gomemcached/mc_constants.go
deleted file mode 100644
index 19741f5a0d..0000000000
--- a/vendor/github.com/couchbase/gomemcached/mc_constants.go
+++ /dev/null
@@ -1,364 +0,0 @@
-// Package gomemcached is binary protocol packet formats and constants.
-package gomemcached
-
-import (
- "fmt"
-)
-
-const (
- REQ_MAGIC = 0x80
- RES_MAGIC = 0x81
- FLEX_MAGIC = 0x08
- FLEX_RES_MAGIC = 0x18
-)
-
-// CommandCode for memcached packets.
-type CommandCode uint8
-
-const (
- GET = CommandCode(0x00)
- SET = CommandCode(0x01)
- ADD = CommandCode(0x02)
- REPLACE = CommandCode(0x03)
- DELETE = CommandCode(0x04)
- INCREMENT = CommandCode(0x05)
- DECREMENT = CommandCode(0x06)
- QUIT = CommandCode(0x07)
- FLUSH = CommandCode(0x08)
- GETQ = CommandCode(0x09)
- NOOP = CommandCode(0x0a)
- VERSION = CommandCode(0x0b)
- GETK = CommandCode(0x0c)
- GETKQ = CommandCode(0x0d)
- APPEND = CommandCode(0x0e)
- PREPEND = CommandCode(0x0f)
- STAT = CommandCode(0x10)
- SETQ = CommandCode(0x11)
- ADDQ = CommandCode(0x12)
- REPLACEQ = CommandCode(0x13)
- DELETEQ = CommandCode(0x14)
- INCREMENTQ = CommandCode(0x15)
- DECREMENTQ = CommandCode(0x16)
- QUITQ = CommandCode(0x17)
- FLUSHQ = CommandCode(0x18)
- APPENDQ = CommandCode(0x19)
- AUDIT = CommandCode(0x27)
- PREPENDQ = CommandCode(0x1a)
- GAT = CommandCode(0x1d)
- HELLO = CommandCode(0x1f)
- RGET = CommandCode(0x30)
- RSET = CommandCode(0x31)
- RSETQ = CommandCode(0x32)
- RAPPEND = CommandCode(0x33)
- RAPPENDQ = CommandCode(0x34)
- RPREPEND = CommandCode(0x35)
- RPREPENDQ = CommandCode(0x36)
- RDELETE = CommandCode(0x37)
- RDELETEQ = CommandCode(0x38)
- RINCR = CommandCode(0x39)
- RINCRQ = CommandCode(0x3a)
- RDECR = CommandCode(0x3b)
- RDECRQ = CommandCode(0x3c)
-
- SASL_LIST_MECHS = CommandCode(0x20)
- SASL_AUTH = CommandCode(0x21)
- SASL_STEP = CommandCode(0x22)
-
- SET_VBUCKET = CommandCode(0x3d)
-
- TAP_CONNECT = CommandCode(0x40) // Client-sent request to initiate Tap feed
- TAP_MUTATION = CommandCode(0x41) // Notification of a SET/ADD/REPLACE/etc. on the server
- TAP_DELETE = CommandCode(0x42) // Notification of a DELETE on the server
- TAP_FLUSH = CommandCode(0x43) // Replicates a flush_all command
- TAP_OPAQUE = CommandCode(0x44) // Opaque control data from the engine
- TAP_VBUCKET_SET = CommandCode(0x45) // Sets state of vbucket in receiver (used in takeover)
- TAP_CHECKPOINT_START = CommandCode(0x46) // Notifies start of new checkpoint
- TAP_CHECKPOINT_END = CommandCode(0x47) // Notifies end of checkpoint
- GET_ALL_VB_SEQNOS = CommandCode(0x48) // Get current high sequence numbers from all vbuckets located on the server
-
- UPR_OPEN = CommandCode(0x50) // Open a UPR connection with a name
- UPR_ADDSTREAM = CommandCode(0x51) // Sent by ebucketMigrator to UPR Consumer
- UPR_CLOSESTREAM = CommandCode(0x52) // Sent by eBucketMigrator to UPR Consumer
- UPR_FAILOVERLOG = CommandCode(0x54) // Request failover logs
- UPR_STREAMREQ = CommandCode(0x53) // Stream request from consumer to producer
- UPR_STREAMEND = CommandCode(0x55) // Sent by producer when it has no more messages to stream
- UPR_SNAPSHOT = CommandCode(0x56) // Start of a new snapshot
- UPR_MUTATION = CommandCode(0x57) // Key mutation
- UPR_DELETION = CommandCode(0x58) // Key deletion
- UPR_EXPIRATION = CommandCode(0x59) // Key expiration
- UPR_FLUSH = CommandCode(0x5a) // Delete all the data for a vbucket
- UPR_NOOP = CommandCode(0x5c) // UPR NOOP
- UPR_BUFFERACK = CommandCode(0x5d) // UPR Buffer Acknowledgement
- UPR_CONTROL = CommandCode(0x5e) // Set flow control params
-
- SELECT_BUCKET = CommandCode(0x89) // Select bucket
-
- OBSERVE_SEQNO = CommandCode(0x91) // Sequence Number based Observe
- OBSERVE = CommandCode(0x92)
-
- GET_META = CommandCode(0xA0) // Get meta. returns with expiry, flags, cas etc
- GET_COLLECTIONS_MANIFEST = CommandCode(0xba) // Get entire collections manifest.
- COLLECTIONS_GET_CID = CommandCode(0xbb) // Get collection id.
- SUBDOC_GET = CommandCode(0xc5) // Get subdoc. Returns with xattrs
- SUBDOC_MULTI_LOOKUP = CommandCode(0xd0) // Multi lookup. Doc xattrs and meta.
-
- DCP_SYSTEM_EVENT = CommandCode(0x5f) // A system event has occurred
- DCP_SEQNO_ADV = CommandCode(0x64) // Sent when the vb seqno has advanced due to an unsubscribed event
- DCP_OSO_SNAPSHOT = CommandCode(0x65) // Marks the begin and end of out-of-sequence-number stream
-)
-
-// command codes that are counted toward DCP control buffer
-// when DCP clients receive DCP messages with these command codes, they need to provide acknowledgement
-var BufferedCommandCodeMap = map[CommandCode]bool{
- SET_VBUCKET: true,
- UPR_STREAMEND: true,
- UPR_SNAPSHOT: true,
- UPR_MUTATION: true,
- UPR_DELETION: true,
- UPR_EXPIRATION: true,
- DCP_SYSTEM_EVENT: true,
- DCP_SEQNO_ADV: true,
- DCP_OSO_SNAPSHOT: true,
-}
-
-// Status field for memcached response.
-type Status uint16
-
-// Matches with protocol_binary.h as source of truth
-const (
- SUCCESS = Status(0x00)
- KEY_ENOENT = Status(0x01)
- KEY_EEXISTS = Status(0x02)
- E2BIG = Status(0x03)
- EINVAL = Status(0x04)
- NOT_STORED = Status(0x05)
- DELTA_BADVAL = Status(0x06)
- NOT_MY_VBUCKET = Status(0x07)
- NO_BUCKET = Status(0x08)
- LOCKED = Status(0x09)
- AUTH_STALE = Status(0x1f)
- AUTH_ERROR = Status(0x20)
- AUTH_CONTINUE = Status(0x21)
- ERANGE = Status(0x22)
- ROLLBACK = Status(0x23)
- EACCESS = Status(0x24)
- NOT_INITIALIZED = Status(0x25)
- UNKNOWN_COMMAND = Status(0x81)
- ENOMEM = Status(0x82)
- NOT_SUPPORTED = Status(0x83)
- EINTERNAL = Status(0x84)
- EBUSY = Status(0x85)
- TMPFAIL = Status(0x86)
- UNKNOWN_COLLECTION = Status(0x88)
-
- SYNC_WRITE_IN_PROGRESS = Status(0xa2)
- SYNC_WRITE_AMBIGUOUS = Status(0xa3)
-
- // SUBDOC
- SUBDOC_PATH_NOT_FOUND = Status(0xc0)
- SUBDOC_BAD_MULTI = Status(0xcc)
- SUBDOC_MULTI_PATH_FAILURE_DELETED = Status(0xd3)
-
- // Not a Memcached status
- UNKNOWN_STATUS = Status(0xffff)
-)
-
-// for log redaction
-const (
- UdTagBegin = "<ud>"
- UdTagEnd = "</ud>"
-)
-
-var isFatal = map[Status]bool{
- DELTA_BADVAL: true,
- NO_BUCKET: true,
- AUTH_STALE: true,
- AUTH_ERROR: true,
- ERANGE: true,
- ROLLBACK: true,
- EACCESS: true,
- ENOMEM: true,
- NOT_SUPPORTED: true,
-
- // consider statuses coming from outside couchbase (eg OS errors) as fatal for the connection
- // as there might be unread data left over on the wire
- UNKNOWN_STATUS: true,
-}
-
-// the producer/consumer bit in dcp flags
-var DCP_PRODUCER uint32 = 0x01
-
-// the include XATTRS bit in dcp flags
-var DCP_OPEN_INCLUDE_XATTRS uint32 = 0x04
-
-// the include deletion time bit in dcp flags
-var DCP_OPEN_INCLUDE_DELETE_TIMES uint32 = 0x20
-
-// Datatype to Include XATTRS in SUBDOC GET
-var SUBDOC_FLAG_XATTR uint8 = 0x04
-
-// MCItem is an internal representation of an item.
-type MCItem struct {
- Cas uint64
- Flags, Expiration uint32
- Data []byte
-}
-
-// Number of bytes in a binary protocol header.
-const HDR_LEN = 24
-
-// Mapping of CommandCode -> name of command (not exhaustive)
-var CommandNames map[CommandCode]string
-
-// StatusNames human readable names for memcached response.
-var StatusNames map[Status]string
-
-func init() {
- CommandNames = make(map[CommandCode]string)
- CommandNames[GET] = "GET"
- CommandNames[SET] = "SET"
- CommandNames[ADD] = "ADD"
- CommandNames[REPLACE] = "REPLACE"
- CommandNames[DELETE] = "DELETE"
- CommandNames[INCREMENT] = "INCREMENT"
- CommandNames[DECREMENT] = "DECREMENT"
- CommandNames[QUIT] = "QUIT"
- CommandNames[FLUSH] = "FLUSH"
- CommandNames[GETQ] = "GETQ"
- CommandNames[NOOP] = "NOOP"
- CommandNames[VERSION] = "VERSION"
- CommandNames[GETK] = "GETK"
- CommandNames[GETKQ] = "GETKQ"
- CommandNames[APPEND] = "APPEND"
- CommandNames[PREPEND] = "PREPEND"
- CommandNames[STAT] = "STAT"
- CommandNames[SETQ] = "SETQ"
- CommandNames[ADDQ] = "ADDQ"
- CommandNames[REPLACEQ] = "REPLACEQ"
- CommandNames[DELETEQ] = "DELETEQ"
- CommandNames[INCREMENTQ] = "INCREMENTQ"
- CommandNames[DECREMENTQ] = "DECREMENTQ"
- CommandNames[QUITQ] = "QUITQ"
- CommandNames[FLUSHQ] = "FLUSHQ"
- CommandNames[APPENDQ] = "APPENDQ"
- CommandNames[PREPENDQ] = "PREPENDQ"
- CommandNames[RGET] = "RGET"
- CommandNames[RSET] = "RSET"
- CommandNames[RSETQ] = "RSETQ"
- CommandNames[RAPPEND] = "RAPPEND"
- CommandNames[RAPPENDQ] = "RAPPENDQ"
- CommandNames[RPREPEND] = "RPREPEND"
- CommandNames[RPREPENDQ] = "RPREPENDQ"
- CommandNames[RDELETE] = "RDELETE"
- CommandNames[RDELETEQ] = "RDELETEQ"
- CommandNames[RINCR] = "RINCR"
- CommandNames[RINCRQ] = "RINCRQ"
- CommandNames[RDECR] = "RDECR"
- CommandNames[RDECRQ] = "RDECRQ"
-
- CommandNames[SASL_LIST_MECHS] = "SASL_LIST_MECHS"
- CommandNames[SASL_AUTH] = "SASL_AUTH"
- CommandNames[SASL_STEP] = "SASL_STEP"
-
- CommandNames[TAP_CONNECT] = "TAP_CONNECT"
- CommandNames[TAP_MUTATION] = "TAP_MUTATION"
- CommandNames[TAP_DELETE] = "TAP_DELETE"
- CommandNames[TAP_FLUSH] = "TAP_FLUSH"
- CommandNames[TAP_OPAQUE] = "TAP_OPAQUE"
- CommandNames[TAP_VBUCKET_SET] = "TAP_VBUCKET_SET"
- CommandNames[TAP_CHECKPOINT_START] = "TAP_CHECKPOINT_START"
- CommandNames[TAP_CHECKPOINT_END] = "TAP_CHECKPOINT_END"
-
- CommandNames[UPR_OPEN] = "UPR_OPEN"
- CommandNames[UPR_ADDSTREAM] = "UPR_ADDSTREAM"
- CommandNames[UPR_CLOSESTREAM] = "UPR_CLOSESTREAM"
- CommandNames[UPR_FAILOVERLOG] = "UPR_FAILOVERLOG"
- CommandNames[UPR_STREAMREQ] = "UPR_STREAMREQ"
- CommandNames[UPR_STREAMEND] = "UPR_STREAMEND"
- CommandNames[UPR_SNAPSHOT] = "UPR_SNAPSHOT"
- CommandNames[UPR_MUTATION] = "UPR_MUTATION"
- CommandNames[UPR_DELETION] = "UPR_DELETION"
- CommandNames[UPR_EXPIRATION] = "UPR_EXPIRATION"
- CommandNames[UPR_FLUSH] = "UPR_FLUSH"
- CommandNames[UPR_NOOP] = "UPR_NOOP"
- CommandNames[UPR_BUFFERACK] = "UPR_BUFFERACK"
- CommandNames[UPR_CONTROL] = "UPR_CONTROL"
- CommandNames[SUBDOC_GET] = "SUBDOC_GET"
- CommandNames[SUBDOC_MULTI_LOOKUP] = "SUBDOC_MULTI_LOOKUP"
- CommandNames[GET_COLLECTIONS_MANIFEST] = "GET_COLLECTIONS_MANIFEST"
- CommandNames[COLLECTIONS_GET_CID] = "COLLECTIONS_GET_CID"
- CommandNames[DCP_SYSTEM_EVENT] = "DCP_SYSTEM_EVENT"
- CommandNames[DCP_SEQNO_ADV] = "DCP_SEQNO_ADV"
-
- StatusNames = make(map[Status]string)
- StatusNames[SUCCESS] = "SUCCESS"
- StatusNames[KEY_ENOENT] = "KEY_ENOENT"
- StatusNames[KEY_EEXISTS] = "KEY_EEXISTS"
- StatusNames[E2BIG] = "E2BIG"
- StatusNames[EINVAL] = "EINVAL"
- StatusNames[NOT_STORED] = "NOT_STORED"
- StatusNames[DELTA_BADVAL] = "DELTA_BADVAL"
- StatusNames[NOT_MY_VBUCKET] = "NOT_MY_VBUCKET"
- StatusNames[NO_BUCKET] = "NO_BUCKET"
- StatusNames[AUTH_STALE] = "AUTH_STALE"
- StatusNames[AUTH_ERROR] = "AUTH_ERROR"
- StatusNames[AUTH_CONTINUE] = "AUTH_CONTINUE"
- StatusNames[ERANGE] = "ERANGE"
- StatusNames[ROLLBACK] = "ROLLBACK"
- StatusNames[EACCESS] = "EACCESS"
- StatusNames[NOT_INITIALIZED] = "NOT_INITIALIZED"
- StatusNames[UNKNOWN_COMMAND] = "UNKNOWN_COMMAND"
- StatusNames[ENOMEM] = "ENOMEM"
- StatusNames[NOT_SUPPORTED] = "NOT_SUPPORTED"
- StatusNames[EINTERNAL] = "EINTERNAL"
- StatusNames[EBUSY] = "EBUSY"
- StatusNames[TMPFAIL] = "TMPFAIL"
- StatusNames[UNKNOWN_COLLECTION] = "UNKNOWN_COLLECTION"
- StatusNames[SUBDOC_PATH_NOT_FOUND] = "SUBDOC_PATH_NOT_FOUND"
- StatusNames[SUBDOC_BAD_MULTI] = "SUBDOC_BAD_MULTI"
-
-}
-
-// String an op code.
-func (o CommandCode) String() (rv string) {
- rv = CommandNames[o]
- if rv == "" {
- rv = fmt.Sprintf("0x%02x", int(o))
- }
- return rv
-}
-
-// String an op code.
-func (s Status) String() (rv string) {
- rv = StatusNames[s]
- if rv == "" {
- rv = fmt.Sprintf("0x%02x", int(s))
- }
- return rv
-}
-
-// IsQuiet will return true if a command is a "quiet" command.
-func (o CommandCode) IsQuiet() bool {
- switch o {
- case GETQ,
- GETKQ,
- SETQ,
- ADDQ,
- REPLACEQ,
- DELETEQ,
- INCREMENTQ,
- DECREMENTQ,
- QUITQ,
- FLUSHQ,
- APPENDQ,
- PREPENDQ,
- RSETQ,
- RAPPENDQ,
- RPREPENDQ,
- RDELETEQ,
- RINCRQ,
- RDECRQ:
- return true
- }
- return false
-}
diff --git a/vendor/github.com/couchbase/gomemcached/mc_req.go b/vendor/github.com/couchbase/gomemcached/mc_req.go
deleted file mode 100644
index 9fbfde3457..0000000000
--- a/vendor/github.com/couchbase/gomemcached/mc_req.go
+++ /dev/null
@@ -1,656 +0,0 @@
-package gomemcached
-
-import (
- "encoding/binary"
- "fmt"
- "io"
-)
-
-// The maximum reasonable body length to expect.
-// Anything larger than this will result in an error.
-// The current limit, 20MB, is the size limit supported by ep-engine.
-var MaxBodyLen = int(20 * 1024 * 1024)
-
-const _BUFLEN = 256
-
-// MCRequest is memcached Request
-type MCRequest struct {
- // The command being issued
- Opcode CommandCode
- // The CAS (if applicable, or 0)
- Cas uint64
- // An opaque value to be returned with this request
- Opaque uint32
- // The vbucket to which this command belongs
- VBucket uint16
- // Command extras, key, and body
- Extras, Key, Body, ExtMeta []byte
- // Datatype identifier
- DataType uint8
- // len() calls are expensive - cache this in case for collection
- Keylen int
- // Collection id for collection based operations
- CollId [binary.MaxVarintLen32]byte
- // Length of collection id
- CollIdLen int
- // Impersonate user name - could go in FramingExtras, but for efficiency
- Username [MAX_USER_LEN]byte
- // Length of Impersonate user name
- UserLen int
- // Flexible Framing Extras
- FramingExtras []FrameInfo
- // Stored length of incoming framing extras
- FramingElen int
-}
-
-// Size gives the number of bytes this request requires.
-func (req *MCRequest) HdrSize() int {
- rv := HDR_LEN + len(req.Extras) + req.CollIdLen + req.FramingElen + len(req.Key)
- if req.UserLen != 0 {
- rv += req.UserLen + 1
-
- // half byte shifting required
- if req.UserLen > FAST_USER_LEN {
- rv++
- }
- }
- for _, e := range req.FramingExtras {
- rv += e.ObjLen + 1
-
- // half byte shifting required
- if e.ObjLen > FAST_USER_LEN {
- rv++
- }
- }
- return rv
-}
-
-func (req *MCRequest) Size() int {
- return req.HdrSize() + len(req.Body) + len(req.ExtMeta)
-}
-
-// A debugging string representation of this request
-func (req MCRequest) String() string {
- return fmt.Sprintf("{MCRequest opcode=%s, bodylen=%d, key='%s'}",
- req.Opcode, len(req.Body), req.Key)
-}
-
-func (req *MCRequest) fillRegularHeaderBytes(data []byte) int {
- // Byte/ 0 | 1 | 2 | 3 |
- // / | | | |
- // |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
- // +---------------+---------------+---------------+---------------+
- // 0| Magic | Opcode | Key length |
- // +---------------+---------------+---------------+---------------+
- // 4| Extras length | Data type | vbucket id |
- // +---------------+---------------+---------------+---------------+
- // 8| Total body length |
- // +---------------+---------------+---------------+---------------+
- // 12| Opaque |
- // +---------------+---------------+---------------+---------------+
- // 16| CAS |
- // | |
- // +---------------+---------------+---------------+---------------+
- // Total 24 bytes
-
- pos := 0
- data[pos] = REQ_MAGIC
- pos++
- data[pos] = byte(req.Opcode)
- pos++
- binary.BigEndian.PutUint16(data[pos:pos+2],
- uint16(req.CollIdLen+len(req.Key)))
- pos += 2
-
- // 4
- data[pos] = byte(len(req.Extras))
- pos++
- // Data type
- if req.DataType != 0 {
- data[pos] = byte(req.DataType)
- }
- pos++
- binary.BigEndian.PutUint16(data[pos:pos+2], req.VBucket)
- pos += 2
-
- // 8
- binary.BigEndian.PutUint32(data[pos:pos+4],
- uint32(len(req.Body)+req.CollIdLen+len(req.Key)+len(req.Extras)+len(req.ExtMeta)))
- pos += 4
-
- // 12
- binary.BigEndian.PutUint32(data[pos:pos+4], req.Opaque)
- pos += 4
-
- // 16
- if req.Cas != 0 {
- binary.BigEndian.PutUint64(data[pos:pos+8], req.Cas)
- }
- pos += 8
-
- // 24 - extras
- if len(req.Extras) > 0 {
- copy(data[pos:pos+len(req.Extras)], req.Extras)
- pos += len(req.Extras)
- }
-
- if len(req.Key) > 0 {
- if req.CollIdLen > 0 {
- copy(data[pos:pos+req.CollIdLen], req.CollId[:])
- pos += req.CollIdLen
- }
- copy(data[pos:pos+len(req.Key)], req.Key)
- pos += len(req.Key)
- }
-
- return pos
-}
-
-func (req *MCRequest) fillFastFlexHeaderBytes(data []byte) int {
- // Byte/ 0 | 1 | 2 | 3 |
- // / | | | |
- // |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
- // +---------------+---------------+---------------+---------------+
- // 0| Magic | Opcode | Framing extras| Key Length |
- // +---------------+---------------+---------------+---------------+
- // 4| Extras length | Data type | vbucket id |
- // +---------------+---------------+---------------+---------------+
- // 8| Total body length |
- // +---------------+---------------+---------------+---------------+
- // 12| Opaque |
- // +---------------+---------------+---------------+---------------+
- // 16| CAS |
- // | |
- // +---------------+---------------+---------------+---------------+
- // Total 24 bytes
-
- pos := 0
- data[pos] = FLEX_MAGIC
- pos++
- data[pos] = byte(req.Opcode)
- pos++
- data[pos] = byte(req.UserLen + 1)
- pos++
- data[pos] = byte(len(req.Key) + req.CollIdLen)
- pos++
-
- // 4
- data[pos] = byte(len(req.Extras))
- pos++
- // Data type
- if req.DataType != 0 {
- data[pos] = byte(req.DataType)
- }
- pos++
- binary.BigEndian.PutUint16(data[pos:pos+2], req.VBucket)
- pos += 2
-
- // 8
- binary.BigEndian.PutUint32(data[pos:pos+4],
- uint32(len(req.Body)+req.CollIdLen+len(req.Key)+(req.UserLen+1)+len(req.Extras)+len(req.ExtMeta)))
- pos += 4
-
- // 12
- binary.BigEndian.PutUint32(data[pos:pos+4], req.Opaque)
- pos += 4
-
- // 16
- if req.Cas != 0 {
- binary.BigEndian.PutUint64(data[pos:pos+8], req.Cas)
- }
- pos += 8
-
- // 24 Flexible extras
- if req.UserLen > 0 {
- data[pos] = byte((uint8(FrameImpersonate) << 4) | uint8(req.UserLen))
- pos++
- copy(data[pos:pos+req.UserLen], req.Username[:req.UserLen])
- pos += req.UserLen
- }
-
- if len(req.Extras) > 0 {
- copy(data[pos:pos+len(req.Extras)], req.Extras)
- pos += len(req.Extras)
- }
-
- if len(req.Key) > 0 {
- if req.CollIdLen > 0 {
- copy(data[pos:pos+req.CollIdLen], req.CollId[:])
- pos += req.CollIdLen
- }
- copy(data[pos:pos+len(req.Key)], req.Key)
- pos += len(req.Key)
- }
-
- return pos
-}
-
-// Returns pos and if trailing by half byte
-func (req *MCRequest) fillFlexHeaderBytes(data []byte) (int, bool) {
-
- // Byte/ 0 | 1 | 2 | 3 |
- // / | | | |
- // |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
- // +---------------+---------------+---------------+---------------+
- // 0| Magic (0x08) | Opcode | Framing extras| Key Length |
- // +---------------+---------------+---------------+---------------+
- // 4| Extras length | Data type | vbucket id |
- // +---------------+---------------+---------------+---------------+
- // 8| Total body length |
- // +---------------+---------------+---------------+---------------+
- // 12| Opaque |
- // +---------------+---------------+---------------+---------------+
- // 16| CAS |
- // | |
- // +---------------+---------------+---------------+---------------+
- // Total 24 bytes
-
- data[0] = FLEX_MAGIC
- data[1] = byte(req.Opcode)
- data[3] = byte(len(req.Key) + req.CollIdLen)
- elen := len(req.Extras)
- data[4] = byte(elen)
- if req.DataType != 0 {
- data[5] = byte(req.DataType)
- }
- binary.BigEndian.PutUint16(data[6:8], req.VBucket)
- binary.BigEndian.PutUint32(data[12:16], req.Opaque)
- if req.Cas != 0 {
- binary.BigEndian.PutUint64(data[16:24], req.Cas)
- }
- pos := HDR_LEN
-
- // Add framing infos
- var framingExtras []byte
- var outputBytes []byte
- var mergeModeSrc []byte
- var frameBytes int
- var halfByteMode bool
- var mergeMode bool
- for _, frameInfo := range req.FramingExtras {
- if !mergeMode {
- outputBytes, halfByteMode = frameInfo.Bytes()
- if !halfByteMode {
- framingExtras = append(framingExtras, outputBytes...)
- frameBytes += len(outputBytes)
- } else {
- mergeMode = true
- mergeModeSrc = outputBytes
- }
- } else {
- outputBytes, halfByteMode = frameInfo.Bytes()
- outputBytes := ShiftByteSliceRight4Bits(outputBytes)
- if halfByteMode {
- // Previous halfbyte merge with this halfbyte will result in a complete byte
- mergeMode = false
- outputBytes = Merge2HalfByteSlices(mergeModeSrc, outputBytes)
- framingExtras = append(framingExtras, outputBytes...)
- frameBytes += len(outputBytes)
- } else {
- // Merge half byte with a non-half byte will result in a combined half-byte that will
- // become the source for the next iteration
- mergeModeSrc = Merge2HalfByteSlices(mergeModeSrc, outputBytes)
- }
- }
- }
-
- // fast impersonate Flexible Extra
- if req.UserLen > 0 {
- if !mergeMode {
- outputBytes, halfByteMode = obj2Bytes(FrameImpersonate, req.UserLen, req.Username[:req.UserLen])
- if !halfByteMode {
- framingExtras = append(framingExtras, outputBytes...)
- frameBytes += len(outputBytes)
- } else {
- mergeMode = true
- mergeModeSrc = outputBytes
- }
- } else {
- outputBytes, halfByteMode = obj2Bytes(FrameImpersonate, req.UserLen, req.Username[:req.UserLen])
- outputBytes := ShiftByteSliceRight4Bits(outputBytes)
- if halfByteMode {
- // Previous halfbyte merge with this halfbyte will result in a complete byte
- mergeMode = false
- outputBytes = Merge2HalfByteSlices(mergeModeSrc, outputBytes)
- framingExtras = append(framingExtras, outputBytes...)
- frameBytes += len(outputBytes)
- } else {
- // Merge half byte with a non-half byte will result in a combined half-byte that will
- // become the source for the next iteration
- mergeModeSrc = Merge2HalfByteSlices(mergeModeSrc, outputBytes)
- }
- }
- }
-
- if mergeMode {
- // Commit the temporary merge area into framingExtras
- framingExtras = append(framingExtras, mergeModeSrc...)
- frameBytes += len(mergeModeSrc)
- }
-
- req.FramingElen = frameBytes
-
- // these have to be set after we have worked out the size of the Flexible Extras
- data[2] = byte(req.FramingElen)
- binary.BigEndian.PutUint32(data[8:12],
- uint32(len(req.Body)+len(req.Key)+req.CollIdLen+elen+len(req.ExtMeta)+req.FramingElen))
- copy(data[pos:pos+frameBytes], framingExtras)
-
- pos += frameBytes
-
- // Add Extras
- if len(req.Extras) > 0 {
- if mergeMode {
- outputBytes = ShiftByteSliceRight4Bits(req.Extras)
- data = Merge2HalfByteSlices(data, outputBytes)
- } else {
- copy(data[pos:pos+elen], req.Extras)
- }
- pos += elen
- }
-
- // Add keys
- if len(req.Key) > 0 {
- if mergeMode {
- var key []byte
- var keylen int
-
- if req.CollIdLen == 0 {
- key = req.Key
- keylen = len(req.Key)
- } else {
- key = append(key, req.CollId[:]...)
- key = append(key, req.Key...)
- keylen = len(req.Key) + req.CollIdLen
- }
-
- outputBytes = ShiftByteSliceRight4Bits(key)
- data = Merge2HalfByteSlices(data, outputBytes)
- pos += keylen
- } else {
- if req.CollIdLen > 0 {
- copy(data[pos:pos+req.CollIdLen], req.CollId[:])
- pos += req.CollIdLen
- }
- copy(data[pos:pos+len(req.Key)], req.Key)
- pos += len(req.Key)
- }
- }
-
- return pos, mergeMode
-}
-
-func (req *MCRequest) FillHeaderBytes(data []byte) (int, bool) {
- if len(req.FramingExtras) > 0 || req.UserLen > FAST_USER_LEN {
- return req.fillFlexHeaderBytes(data)
- } else if req.UserLen > 0 {
- return req.fillFastFlexHeaderBytes(data), false
- } else {
- return req.fillRegularHeaderBytes(data), false
- }
-}
-
-// HeaderBytes will return the wire representation of the request header
-// (with the extras and key).
-func (req *MCRequest) HeaderBytes() []byte {
- data := make([]byte, req.HdrSize())
-
- req.FillHeaderBytes(data)
-
- return data
-}
-
-// Bytes will return the wire representation of this request.
-func (req *MCRequest) Bytes() []byte {
- data := make([]byte, req.Size())
- req.bytes(data)
- return data
-}
-
-func (req *MCRequest) bytes(data []byte) {
- pos, halfByteMode := req.FillHeaderBytes(data)
- // TODO - the halfByteMode should be revisited for a more efficient
- // way of doing things
-
- if len(req.Body) > 0 {
- if halfByteMode {
- shifted := ShiftByteSliceRight4Bits(req.Body)
- data = Merge2HalfByteSlices(data, shifted)
- } else {
- copy(data[pos:pos+len(req.Body)], req.Body)
- }
- }
-
- if len(req.ExtMeta) > 0 {
- if halfByteMode {
- shifted := ShiftByteSliceRight4Bits(req.ExtMeta)
- data = Merge2HalfByteSlices(data, shifted)
- } else {
- copy(data[pos+len(req.Body):pos+len(req.Body)+len(req.ExtMeta)], req.ExtMeta)
- }
- }
-}
-
-// Transmit will send this request message across a writer.
-func (req *MCRequest) Transmit(w io.Writer) (n int, err error) {
- l := req.Size()
- if l < _BUFLEN {
- data := make([]byte, l)
- req.bytes(data)
- n, err = w.Write(data)
- } else {
- data := make([]byte, req.HdrSize())
- req.FillHeaderBytes(data)
- n, err = w.Write(data)
- if err == nil {
- m := 0
- m, err = w.Write(req.Body)
- n += m
- }
- }
- return
-}
-
-func (req *MCRequest) receiveHeaderCommon(hdrBytes []byte) (elen, totalBodyLen int) {
- elen = int(hdrBytes[4])
- // Data type at 5
- req.DataType = uint8(hdrBytes[5])
-
- req.Opcode = CommandCode(hdrBytes[1])
- // Vbucket at 6:7
- req.VBucket = binary.BigEndian.Uint16(hdrBytes[6:])
- totalBodyLen = int(binary.BigEndian.Uint32(hdrBytes[8:]))
-
- req.Opaque = binary.BigEndian.Uint32(hdrBytes[12:])
- req.Cas = binary.BigEndian.Uint64(hdrBytes[16:])
- return
-}
-
-func (req *MCRequest) receiveRegHeader(hdrBytes []byte) (elen, totalBodyLen int) {
- elen, totalBodyLen = req.receiveHeaderCommon(hdrBytes)
- req.Keylen = int(binary.BigEndian.Uint16(hdrBytes[2:]))
- return
-}
-
-func (req *MCRequest) receiveFlexibleFramingHeader(hdrBytes []byte) (elen, totalBodyLen, framingElen int) {
- elen, totalBodyLen = req.receiveHeaderCommon(hdrBytes)
-
- // For flexible framing header, key length is a single byte at byte index 3
- req.Keylen = int(binary.BigEndian.Uint16(hdrBytes[2:]) & 0x0ff)
- // Flexible framing lengh is a single byte at index 2
- framingElen = int(binary.BigEndian.Uint16(hdrBytes[2:]) >> 8)
- req.FramingElen = framingElen
- return
-}
-
-func (req *MCRequest) populateRegularBody(r io.Reader, totalBodyLen, elen int) (int, error) {
- var m int
- var err error
- if totalBodyLen > 0 {
- buf := make([]byte, totalBodyLen)
- m, err = io.ReadFull(r, buf)
- if err == nil {
- if req.Opcode >= TAP_MUTATION &&
- req.Opcode <= TAP_CHECKPOINT_END &&
- len(buf) > 1 {
- // In these commands there is "engine private"
- // data at the end of the extras. The first 2
- // bytes of extra data give its length.
- elen += int(binary.BigEndian.Uint16(buf))
- }
-
- req.Extras = buf[0:elen]
- req.Key = buf[elen : req.Keylen+elen]
-
- // get the length of extended metadata
- extMetaLen := 0
- if elen > 29 {
- extMetaLen = int(binary.BigEndian.Uint16(req.Extras[28:30]))
- }
-
- bodyLen := totalBodyLen - req.Keylen - elen - extMetaLen
- if bodyLen > MaxBodyLen {
- return m, fmt.Errorf("%d is too big (max %d)",
- bodyLen, MaxBodyLen)
- }
-
- req.Body = buf[req.Keylen+elen : req.Keylen+elen+bodyLen]
- req.ExtMeta = buf[req.Keylen+elen+bodyLen:]
- }
- }
- return m, err
-}
-
-func (req *MCRequest) populateFlexBody(r io.Reader, totalBodyLen, elen, framingElen int) (int, error) {
- var m int
- var err error
- if totalBodyLen > 0 {
- buf := make([]byte, totalBodyLen)
- m, err = io.ReadFull(r, buf)
- if err != nil {
- return m, err
- }
- err = req.populateFlexBodyInternal(buf, totalBodyLen, elen, framingElen)
- }
- return m, err
-}
-
-func (req *MCRequest) populateFlexBodyInternal(buf []byte, totalBodyLen, elen, framingElen int) error {
- var halfByteOffset bool
- var err error
- if framingElen > 0 {
- var objs []FrameInfo
- objs, err, halfByteOffset = parseFrameInfoObjects(buf, framingElen)
- if err != nil {
- return err
- }
- req.FramingExtras = objs
- }
-
- err = req.populateFlexBodyAfterFrames(buf, totalBodyLen, elen, framingElen, halfByteOffset)
- if err != nil {
- return err
- }
-
- return nil
-}
-
-func (req *MCRequest) populateFlexBodyAfterFrames(buf []byte, totalBodyLen, elen, framingElen int, halfByteOffset bool) error {
- var idxCursor int = framingElen
- if req.Opcode >= TAP_MUTATION && req.Opcode <= TAP_CHECKPOINT_END && len(buf[idxCursor:]) > 1 {
- // In these commands there is "engine private"
- // data at the end of the extras. The first 2
- // bytes of extra data give its length.
- if !halfByteOffset {
- elen += int(binary.BigEndian.Uint16(buf[idxCursor:]))
- } else {
- // 0 1 2 3 4 .... 19 20 21 22 ... 32
- // ^-----^ ^-------^ ^------------^
- // offset data do not care
- var buffer uint32 = binary.BigEndian.Uint32(buf[idxCursor:])
- buffer &= 0xffff000
- elen += int(buffer >> 12)
- }
- }
-
- // Get the extras
- if !halfByteOffset {
- req.Extras = buf[idxCursor : idxCursor+elen]
- } else {
- preShift := buf[idxCursor : idxCursor+elen+1]
- req.Extras = ShiftByteSliceLeft4Bits(preShift)
- }
- idxCursor += elen
-
- // Get the Key
- if !halfByteOffset {
- req.Key = buf[idxCursor : idxCursor+req.Keylen]
- } else {
- preShift := buf[idxCursor : idxCursor+req.Keylen+1]
- req.Key = ShiftByteSliceLeft4Bits(preShift)
- }
- idxCursor += req.Keylen
-
- // get the length of extended metadata
- extMetaLen := 0
- if elen > 29 {
- extMetaLen = int(binary.BigEndian.Uint16(req.Extras[28:30]))
- }
- idxCursor += extMetaLen
-
- bodyLen := totalBodyLen - req.Keylen - elen - extMetaLen - framingElen
- if bodyLen > MaxBodyLen {
- return fmt.Errorf("%d is too big (max %d)",
- bodyLen, MaxBodyLen)
- }
-
- if !halfByteOffset {
- req.Body = buf[idxCursor : idxCursor+bodyLen]
- idxCursor += bodyLen
- } else {
- preShift := buf[idxCursor : idxCursor+bodyLen+1]
- req.Body = ShiftByteSliceLeft4Bits(preShift)
- idxCursor += bodyLen
- }
-
- if extMetaLen > 0 {
- if !halfByteOffset {
- req.ExtMeta = buf[idxCursor:]
- } else {
- preShift := buf[idxCursor:]
- req.ExtMeta = ShiftByteSliceLeft4Bits(preShift)
- }
- }
-
- return nil
-}
-
-// Receive will fill this MCRequest with the data from a reader.
-func (req *MCRequest) Receive(r io.Reader, hdrBytes []byte) (int, error) {
- if len(hdrBytes) < HDR_LEN {
- hdrBytes = []byte{
- 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 0, 0}
- }
- n, err := io.ReadFull(r, hdrBytes)
- if err != nil {
- fmt.Printf("Err %v\n", err)
- return n, err
- }
-
- switch hdrBytes[0] {
- case RES_MAGIC:
- fallthrough
- case REQ_MAGIC:
- elen, totalBodyLen := req.receiveRegHeader(hdrBytes)
- bodyRead, err := req.populateRegularBody(r, totalBodyLen, elen)
- return n + bodyRead, err
- case FLEX_MAGIC:
- elen, totalBodyLen, framingElen := req.receiveFlexibleFramingHeader(hdrBytes)
- bodyRead, err := req.populateFlexBody(r, totalBodyLen, elen, framingElen)
- return n + bodyRead, err
- default:
- return n, fmt.Errorf("bad magic: 0x%02x", hdrBytes[0])
- }
-}
diff --git a/vendor/github.com/couchbase/gomemcached/mc_res.go b/vendor/github.com/couchbase/gomemcached/mc_res.go
deleted file mode 100644
index 1e89020de2..0000000000
--- a/vendor/github.com/couchbase/gomemcached/mc_res.go
+++ /dev/null
@@ -1,280 +0,0 @@
-package gomemcached
-
-import (
- "encoding/binary"
- "fmt"
- "io"
- "sync"
-)
-
-// MCResponse is memcached response
-type MCResponse struct {
- // The command opcode of the command that sent the request
- Opcode CommandCode
- // The status of the response
- Status Status
- // The opaque sent in the request
- Opaque uint32
- // The CAS identifier (if applicable)
- Cas uint64
- // Extras, key, and body for this response
- Extras, Key, Body []byte
- // If true, this represents a fatal condition and we should hang up
- Fatal bool
- // Datatype identifier
- DataType uint8
-}
-
-// A debugging string representation of this response
-func (res MCResponse) String() string {
- return fmt.Sprintf("{MCResponse status=%v keylen=%d, extralen=%d, bodylen=%d}",
- res.Status, len(res.Key), len(res.Extras), len(res.Body))
-}
-
-// Response as an error.
-func (res *MCResponse) Error() string {
- return fmt.Sprintf("MCResponse status=%v, opcode=%v, opaque=%v, msg: %s",
- res.Status, res.Opcode, res.Opaque, string(res.Body))
-}
-
-func errStatus(e error) Status {
- status := UNKNOWN_STATUS
- if res, ok := e.(*MCResponse); ok {
- status = res.Status
- }
- return status
-}
-
-// IsNotFound is true if this error represents a "not found" response.
-func IsNotFound(e error) bool {
- return errStatus(e) == KEY_ENOENT
-}
-
-// IsFatal is false if this error isn't believed to be fatal to a connection.
-func IsFatal(e error) bool {
- if e == nil {
- return false
- }
- _, ok := isFatal[errStatus(e)]
- if ok {
- return true
- }
- return false
-}
-
-// Size is number of bytes this response consumes on the wire.
-func (res *MCResponse) Size() int {
- return HDR_LEN + len(res.Extras) + len(res.Key) + len(res.Body)
-}
-
-func (res *MCResponse) fillHeaderBytes(data []byte) int {
- pos := 0
- data[pos] = RES_MAGIC
- pos++
- data[pos] = byte(res.Opcode)
- pos++
- binary.BigEndian.PutUint16(data[pos:pos+2],
- uint16(len(res.Key)))
- pos += 2
-
- // 4
- data[pos] = byte(len(res.Extras))
- pos++
- // Data type
- if res.DataType != 0 {
- data[pos] = byte(res.DataType)
- } else {
- data[pos] = 0
- }
- pos++
- binary.BigEndian.PutUint16(data[pos:pos+2], uint16(res.Status))
- pos += 2
-
- // 8
- binary.BigEndian.PutUint32(data[pos:pos+4],
- uint32(len(res.Body)+len(res.Key)+len(res.Extras)))
- pos += 4
-
- // 12
- binary.BigEndian.PutUint32(data[pos:pos+4], res.Opaque)
- pos += 4
-
- // 16
- binary.BigEndian.PutUint64(data[pos:pos+8], res.Cas)
- pos += 8
-
- if len(res.Extras) > 0 {
- copy(data[pos:pos+len(res.Extras)], res.Extras)
- pos += len(res.Extras)
- }
-
- if len(res.Key) > 0 {
- copy(data[pos:pos+len(res.Key)], res.Key)
- pos += len(res.Key)
- }
-
- return pos
-}
-
-// HeaderBytes will get just the header bytes for this response.
-func (res *MCResponse) HeaderBytes() []byte {
- data := make([]byte, HDR_LEN+len(res.Extras)+len(res.Key))
-
- res.fillHeaderBytes(data)
-
- return data
-}
-
-// Bytes will return the actual bytes transmitted for this response.
-func (res *MCResponse) Bytes() []byte {
- data := make([]byte, res.Size())
-
- pos := res.fillHeaderBytes(data)
-
- copy(data[pos:pos+len(res.Body)], res.Body)
-
- return data
-}
-
-// Transmit will send this response message across a writer.
-func (res *MCResponse) Transmit(w io.Writer) (n int, err error) {
- if len(res.Body) < 128 {
- n, err = w.Write(res.Bytes())
- } else {
- n, err = w.Write(res.HeaderBytes())
- if err == nil {
- m := 0
- m, err = w.Write(res.Body)
- m += n
- }
- }
- return
-}
-
-// Receive will fill this MCResponse with the data from this reader.
-func (res *MCResponse) Receive(r io.Reader, hdrBytes []byte) (n int, err error) {
- return res.ReceiveWithBuf(r, hdrBytes, nil)
-}
-
-// ReceiveWithBuf takes an optional pre-allocated []byte buf which
-// will be used if its capacity is large enough, otherwise a new
-// []byte slice is allocated.
-func (res *MCResponse) ReceiveWithBuf(r io.Reader, hdrBytes, buf []byte) (n int, err error) {
- if len(hdrBytes) < HDR_LEN {
- hdrBytes = []byte{
- 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 0, 0}
- }
- n, err = io.ReadFull(r, hdrBytes)
- if err != nil {
- return n, err
- }
-
- if hdrBytes[0] != RES_MAGIC && hdrBytes[0] != REQ_MAGIC {
- return n, fmt.Errorf("bad magic: 0x%02x", hdrBytes[0])
- }
-
- klen := int(binary.BigEndian.Uint16(hdrBytes[2:4]))
- elen := int(hdrBytes[4])
-
- res.Opcode = CommandCode(hdrBytes[1])
- res.DataType = uint8(hdrBytes[5])
- res.Status = Status(binary.BigEndian.Uint16(hdrBytes[6:8]))
- res.Opaque = binary.BigEndian.Uint32(hdrBytes[12:16])
- res.Cas = binary.BigEndian.Uint64(hdrBytes[16:24])
-
- bodyLen := int(binary.BigEndian.Uint32(hdrBytes[8:12])) - (klen + elen)
-
- //defer function to debug the panic seen with MB-15557
- defer func() {
- if e := recover(); e != nil {
- err = fmt.Errorf(`Panic in Receive. Response %v \n
- key len %v extra len %v bodylen %v`, res, klen, elen, bodyLen)
- }
- }()
-
- bufNeed := klen + elen + bodyLen
- if buf != nil && cap(buf) >= bufNeed {
- buf = buf[0:bufNeed]
- } else {
- buf = make([]byte, bufNeed)
- }
-
- m, err := io.ReadFull(r, buf)
- if err == nil {
- res.Extras = buf[0:elen]
- res.Key = buf[elen : klen+elen]
- res.Body = buf[klen+elen:]
- }
-
- return n + m, err
-}
-
-type MCResponsePool struct {
- pool *sync.Pool
-}
-
-func NewMCResponsePool() *MCResponsePool {
- rv := &MCResponsePool{
- pool: &sync.Pool{
- New: func() interface{} {
- return &MCResponse{}
- },
- },
- }
-
- return rv
-}
-
-func (this *MCResponsePool) Get() *MCResponse {
- return this.pool.Get().(*MCResponse)
-}
-
-func (this *MCResponsePool) Put(r *MCResponse) {
- if r == nil {
- return
- }
-
- r.Extras = nil
- r.Key = nil
- r.Body = nil
- r.Fatal = false
-
- this.pool.Put(r)
-}
-
-type StringMCResponsePool struct {
- pool *sync.Pool
- size int
-}
-
-func NewStringMCResponsePool(size int) *StringMCResponsePool {
- rv := &StringMCResponsePool{
- pool: &sync.Pool{
- New: func() interface{} {
- return make(map[string]*MCResponse, size)
- },
- },
- size: size,
- }
-
- return rv
-}
-
-func (this *StringMCResponsePool) Get() map[string]*MCResponse {
- return this.pool.Get().(map[string]*MCResponse)
-}
-
-func (this *StringMCResponsePool) Put(m map[string]*MCResponse) {
- if m == nil || len(m) > 2*this.size {
- return
- }
-
- for k := range m {
- m[k] = nil
- delete(m, k)
- }
-
- this.pool.Put(m)
-}
diff --git a/vendor/github.com/couchbase/gomemcached/tap.go b/vendor/github.com/couchbase/gomemcached/tap.go
deleted file mode 100644
index e48623281b..0000000000
--- a/vendor/github.com/couchbase/gomemcached/tap.go
+++ /dev/null
@@ -1,168 +0,0 @@
-package gomemcached
-
-import (
- "bytes"
- "encoding/binary"
- "fmt"
- "io"
- "io/ioutil"
- "strings"
-)
-
-type TapConnectFlag uint32
-
-// Tap connect option flags
-const (
- BACKFILL = TapConnectFlag(0x01)
- DUMP = TapConnectFlag(0x02)
- LIST_VBUCKETS = TapConnectFlag(0x04)
- TAKEOVER_VBUCKETS = TapConnectFlag(0x08)
- SUPPORT_ACK = TapConnectFlag(0x10)
- REQUEST_KEYS_ONLY = TapConnectFlag(0x20)
- CHECKPOINT = TapConnectFlag(0x40)
- REGISTERED_CLIENT = TapConnectFlag(0x80)
- FIX_FLAG_BYTEORDER = TapConnectFlag(0x100)
-)
-
-// Tap opaque event subtypes
-const (
- TAP_OPAQUE_ENABLE_AUTO_NACK = 0
- TAP_OPAQUE_INITIAL_VBUCKET_STREAM = 1
- TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC = 2
- TAP_OPAQUE_CLOSE_TAP_STREAM = 7
- TAP_OPAQUE_CLOSE_BACKFILL = 8
-)
-
-// Tap item flags
-const (
- TAP_ACK = 1
- TAP_NO_VALUE = 2
- TAP_FLAG_NETWORK_BYTE_ORDER = 4
-)
-
-// TapConnectFlagNames for TapConnectFlag
-var TapConnectFlagNames = map[TapConnectFlag]string{
- BACKFILL: "BACKFILL",
- DUMP: "DUMP",
- LIST_VBUCKETS: "LIST_VBUCKETS",
- TAKEOVER_VBUCKETS: "TAKEOVER_VBUCKETS",
- SUPPORT_ACK: "SUPPORT_ACK",
- REQUEST_KEYS_ONLY: "REQUEST_KEYS_ONLY",
- CHECKPOINT: "CHECKPOINT",
- REGISTERED_CLIENT: "REGISTERED_CLIENT",
- FIX_FLAG_BYTEORDER: "FIX_FLAG_BYTEORDER",
-}
-
-// TapItemParser is a function to parse a single tap extra.
-type TapItemParser func(io.Reader) (interface{}, error)
-
-// TapParseUint64 is a function to parse a single tap uint64.
-func TapParseUint64(r io.Reader) (interface{}, error) {
- var rv uint64
- err := binary.Read(r, binary.BigEndian, &rv)
- return rv, err
-}
-
-// TapParseUint16 is a function to parse a single tap uint16.
-func TapParseUint16(r io.Reader) (interface{}, error) {
- var rv uint16
- err := binary.Read(r, binary.BigEndian, &rv)
- return rv, err
-}
-
-// TapParseBool is a function to parse a single tap boolean.
-func TapParseBool(r io.Reader) (interface{}, error) {
- return true, nil
-}
-
-// TapParseVBList parses a list of vBucket numbers as []uint16.
-func TapParseVBList(r io.Reader) (interface{}, error) {
- num, err := TapParseUint16(r)
- if err != nil {
- return nil, err
- }
- n := int(num.(uint16))
-
- rv := make([]uint16, n)
- for i := 0; i < n; i++ {
- x, err := TapParseUint16(r)
- if err != nil {
- return nil, err
- }
- rv[i] = x.(uint16)
- }
-
- return rv, err
-}
-
-// TapFlagParsers parser functions for TAP fields.
-var TapFlagParsers = map[TapConnectFlag]TapItemParser{
- BACKFILL: TapParseUint64,
- LIST_VBUCKETS: TapParseVBList,
-}
-
-// SplitFlags will split the ORed flags into the individual bit flags.
-func (f TapConnectFlag) SplitFlags() []TapConnectFlag {
- rv := []TapConnectFlag{}
- for i := uint32(1); f != 0; i = i << 1 {
- if uint32(f)&i == i {
- rv = append(rv, TapConnectFlag(i))
- }
- f = TapConnectFlag(uint32(f) & (^i))
- }
- return rv
-}
-
-func (f TapConnectFlag) String() string {
- parts := []string{}
- for _, x := range f.SplitFlags() {
- p := TapConnectFlagNames[x]
- if p == "" {
- p = fmt.Sprintf("0x%x", int(x))
- }
- parts = append(parts, p)
- }
- return strings.Join(parts, "|")
-}
-
-type TapConnect struct {
- Flags map[TapConnectFlag]interface{}
- RemainingBody []byte
- Name string
-}
-
-// ParseTapCommands parse the tap request into the interesting bits we may
-// need to do something with.
-func (req *MCRequest) ParseTapCommands() (TapConnect, error) {
- rv := TapConnect{
- Flags: map[TapConnectFlag]interface{}{},
- Name: string(req.Key),
- }
-
- if len(req.Extras) < 4 {
- return rv, fmt.Errorf("not enough extra bytes: %x", req.Extras)
- }
-
- flags := TapConnectFlag(binary.BigEndian.Uint32(req.Extras))
-
- r := bytes.NewReader(req.Body)
-
- for _, f := range flags.SplitFlags() {
- fun := TapFlagParsers[f]
- if fun == nil {
- fun = TapParseBool
- }
-
- val, err := fun(r)
- if err != nil {
- return rv, err
- }
-
- rv.Flags[f] = val
- }
-
- var err error
- rv.RemainingBody, err = ioutil.ReadAll(r)
-
- return rv, err
-}