]> source.dussan.org Git - gitea.git/commitdiff
chore: update github.com/couchbase/gomemcached and github.com/couchbase/go-couchbase...
authorAntoine GIRARD <sapk@users.noreply.github.com>
Thu, 19 Dec 2019 02:03:26 +0000 (03:03 +0100)
committertechknowlogick <techknowlogick@gitea.io>
Thu, 19 Dec 2019 02:03:26 +0000 (21:03 -0500)
12 files changed:
go.mod
go.sum
vendor/github.com/couchbase/gomemcached/client/collections_filter.go [new file with mode: 0644]
vendor/github.com/couchbase/gomemcached/client/mc.go
vendor/github.com/couchbase/gomemcached/client/upr_event.go [new file with mode: 0644]
vendor/github.com/couchbase/gomemcached/client/upr_feed.go
vendor/github.com/couchbase/gomemcached/flexibleFraming.go [new file with mode: 0644]
vendor/github.com/couchbase/gomemcached/mc_constants.go
vendor/github.com/couchbase/gomemcached/mc_req.go
vendor/github.com/couchbase/gomemcached/mc_res.go
vendor/github.com/couchbase/goutils/LICENSE.md
vendor/modules.txt

diff --git a/go.mod b/go.mod
index 89f92e574ed74ba6c60760e80b2f691fe7d4dfd7..21778e9b6780dcaec7cef61f5732b71ec5b77c95 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -24,6 +24,8 @@ require (
        github.com/blevesearch/go-porterstemmer v1.0.2 // indirect
        github.com/blevesearch/segment v0.0.0-20160915185041-762005e7a34f // indirect
        github.com/boombuler/barcode v0.0.0-20161226211916-fe0f26ff6d26 // indirect
+       github.com/couchbase/gomemcached v0.0.0-20191004160342-7b5da2ec40b2 // indirect
+       github.com/couchbase/goutils v0.0.0-20191018232750-b49639060d85 // indirect
        github.com/couchbase/vellum v0.0.0-20190829182332-ef2e028c01fd // indirect
        github.com/cznic/b v0.0.0-20181122101859-a26611c4d92d // indirect
        github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect
diff --git a/go.sum b/go.sum
index 5fb1dff7c4544319d1882a312953d4e3e116b006..5563e2d61942bfe433eb306e2ed2c1ca6e1cd2da 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -97,8 +97,12 @@ github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7
 github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
 github.com/couchbase/gomemcached v0.0.0-20190515232915-c4b4ca0eb21d h1:XMf4E1U+b9E3ElF0mjvfXZdflBRZz4gLp16nQ/QSHQM=
 github.com/couchbase/gomemcached v0.0.0-20190515232915-c4b4ca0eb21d/go.mod h1:srVSlQLB8iXBVXHgnqemxUXqN6FCvClgCMPCsjBDR7c=
+github.com/couchbase/gomemcached v0.0.0-20191004160342-7b5da2ec40b2 h1:vZryARwW4PSFXd9arwegEywvMTvPuXL3/oa+4L5NTe8=
+github.com/couchbase/gomemcached v0.0.0-20191004160342-7b5da2ec40b2/go.mod h1:srVSlQLB8iXBVXHgnqemxUXqN6FCvClgCMPCsjBDR7c=
 github.com/couchbase/goutils v0.0.0-20190315194238-f9d42b11473b h1:bZ9rKU2/V8sY+NulSfxDOnXTWcs1rySqdF1sVepihvo=
 github.com/couchbase/goutils v0.0.0-20190315194238-f9d42b11473b/go.mod h1:BQwMFlJzDjFDG3DJUdU0KORxn88UlsOULuxLExMh3Hs=
+github.com/couchbase/goutils v0.0.0-20191018232750-b49639060d85 h1:0WMIDtuXCKEm4wtAJgAAXa/qtM5O9MariLwgHaRlYmk=
+github.com/couchbase/goutils v0.0.0-20191018232750-b49639060d85/go.mod h1:BQwMFlJzDjFDG3DJUdU0KORxn88UlsOULuxLExMh3Hs=
 github.com/couchbase/vellum v0.0.0-20190829182332-ef2e028c01fd h1:zeuJhcG3f8eePshH3KxkNE+Xtl53pVln9MOUPMyr/1w=
 github.com/couchbase/vellum v0.0.0-20190829182332-ef2e028c01fd/go.mod h1:xbc8Ff/oG7h2ejd7AlwOpfd+6QZntc92ygpAOfGwcKY=
 github.com/couchbaselabs/go-couchbase v0.0.0-20190708161019-23e7ca2ce2b7 h1:1XjEY/gnjQ+AfXef2U6dxCquhiRzkEpxZuWqs+QxTL8=
diff --git a/vendor/github.com/couchbase/gomemcached/client/collections_filter.go b/vendor/github.com/couchbase/gomemcached/client/collections_filter.go
new file mode 100644 (file)
index 0000000..0bedae1
--- /dev/null
@@ -0,0 +1,123 @@
+package memcached
+
+import (
+       "encoding/json"
+       "fmt"
+)
+
+// Collection based filter
+type CollectionsFilter struct {
+       ManifestUid    uint64
+       UseManifestUid bool
+       StreamId       uint16
+       UseStreamId    bool
+
+       // Use either ScopeId OR CollectionsList, not both
+       CollectionsList []uint32
+       ScopeId         uint32
+}
+
+type nonStreamIdNonResumeScopeMeta struct {
+       ScopeId string `json:"scope"`
+}
+
+type nonStreamIdResumeScopeMeta struct {
+       ManifestId string `json:"uid"`
+}
+
+type nonStreamIdNonResumeCollectionsMeta struct {
+       CollectionsList []string `json:"collections"`
+}
+
+type nonStreamIdResumeCollectionsMeta struct {
+       ManifestId      string   `json:"uid"`
+       CollectionsList []string `json:"collections"`
+}
+
+type streamIdNonResumeCollectionsMeta struct {
+       CollectionsList []string `json:"collections"`
+       StreamId        uint16   `json:"sid"`
+}
+
+type streamIdNonResumeScopeMeta struct {
+       ScopeId  string `json:"scope"`
+       StreamId uint16 `json:"sid"`
+}
+
+func (c *CollectionsFilter) IsValid() error {
+       if c.UseManifestUid {
+               return fmt.Errorf("Not implemented yet")
+       }
+
+       if len(c.CollectionsList) > 0 && c.ScopeId > 0 {
+               return fmt.Errorf("Collection list is specified but scope ID is also specified")
+       }
+
+       return nil
+}
+
+func (c *CollectionsFilter) outputCollectionsFilterColList() (outputList []string) {
+       for _, collectionUint := range c.CollectionsList {
+               outputList = append(outputList, fmt.Sprintf("%x", collectionUint))
+       }
+       return
+}
+
+func (c *CollectionsFilter) outputScopeId() string {
+       return fmt.Sprintf("%x", c.ScopeId)
+}
+
+func (c *CollectionsFilter) ToStreamReqBody() ([]byte, error) {
+       if err := c.IsValid(); err != nil {
+               return nil, err
+       }
+
+       var output interface{}
+
+       switch c.UseStreamId {
+       case true:
+               switch c.UseManifestUid {
+               case true:
+                       // TODO
+                       return nil, fmt.Errorf("NotImplemented0")
+               case false:
+                       switch len(c.CollectionsList) > 0 {
+                       case true:
+                               filter := &streamIdNonResumeCollectionsMeta{
+                                       StreamId:        c.StreamId,
+                                       CollectionsList: c.outputCollectionsFilterColList(),
+                               }
+                               output = *filter
+                       case false:
+                               filter := &streamIdNonResumeScopeMeta{
+                                       StreamId: c.StreamId,
+                                       ScopeId:  c.outputScopeId(),
+                               }
+                               output = *filter
+                       }
+               }
+       case false:
+               switch c.UseManifestUid {
+               case true:
+                       // TODO
+                       return nil, fmt.Errorf("NotImplemented1")
+               case false:
+                       switch len(c.CollectionsList) > 0 {
+                       case true:
+                               filter := &nonStreamIdNonResumeCollectionsMeta{
+                                       CollectionsList: c.outputCollectionsFilterColList(),
+                               }
+                               output = *filter
+                       case false:
+                               output = nonStreamIdNonResumeScopeMeta{ScopeId: c.outputScopeId()}
+                       }
+               }
+       }
+
+       data, err := json.Marshal(output)
+       if err != nil {
+               return nil, err
+       } else {
+               return data, nil
+       }
+}
index 0f1d61e5129c14229fb7a3ae63e07b8aa85d4901..66c897c5d6b14bb7e7d0020ab074ebed0ea3b64a 100644 (file)
@@ -28,10 +28,12 @@ type ClientIface interface {
        CASNext(vb uint16, k string, exp int, state *CASState) bool
        CAS(vb uint16, k string, f CasFunc, initexp int) (*gomemcached.MCResponse, error)
        CollectionsGetCID(scope string, collection string) (*gomemcached.MCResponse, error)
+       CollectionEnabled() bool
        Close() error
        Decr(vb uint16, key string, amt, def uint64, exp int) (uint64, error)
        Del(vb uint16, key string) (*gomemcached.MCResponse, error)
        EnableMutationToken() (*gomemcached.MCResponse, error)
+       EnableFeatures(features Features) (*gomemcached.MCResponse, error)
        Get(vb uint16, key string) (*gomemcached.MCResponse, error)
        GetCollectionsManifest() (*gomemcached.MCResponse, error)
        GetFromCollection(vb uint16, cid uint32, key string) (*gomemcached.MCResponse, error)
@@ -76,9 +78,12 @@ var Healthy uint32 = 1
 type Features []Feature
 type Feature uint16
 
-const FeatureMutationToken = Feature(0x04)
+const FeatureTcpNoDelay = Feature(0x03)
+const FeatureMutationToken = Feature(0x04) // XATTR bit in data type field with dcp mutations
 const FeatureXattr = Feature(0x06)
+const FeatureXerror = Feature(0x07)
 const FeatureCollections = Feature(0x12)
+const FeatureSnappyCompression = Feature(0x0a)
 const FeatureDataType = Feature(0x0b)
 
 type memcachedConnection interface {
@@ -96,6 +101,9 @@ type Client struct {
        opaque  uint32
 
        hdrBuf []byte
+
+       featureMtx       sync.RWMutex
+       sentHeloFeatures Features
 }
 
 var (
@@ -285,6 +293,10 @@ func (c *Client) EnableFeatures(features Features) (*gomemcached.MCResponse, err
                binary.BigEndian.PutUint16(payload[len(payload)-2:], uint16(feature))
        }
 
+       c.featureMtx.Lock()
+       c.sentHeloFeatures = features
+       c.featureMtx.Unlock()
+
        return c.Send(&gomemcached.MCRequest{
                Opcode: gomemcached.HELLO,
                Key:    []byte("GoMemcached"),
@@ -363,6 +375,18 @@ func (c *Client) CollectionsGetCID(scope string, collection string) (*gomemcache
        return res, nil
 }
 
+func (c *Client) CollectionEnabled() bool {
+       c.featureMtx.RLock()
+       defer c.featureMtx.RUnlock()
+
+       for _, feature := range c.sentHeloFeatures {
+               if feature == FeatureCollections {
+                       return true
+               }
+       }
+       return false
+}
+
 // Get the value for a key, and update expiry
 func (c *Client) GetAndTouch(vb uint16, key string, exp int) (*gomemcached.MCResponse, error) {
        extraBuf := make([]byte, 4)
@@ -1138,3 +1162,7 @@ func IfResStatusError(response *gomemcached.MCResponse) bool {
                        response.Status != gomemcached.SUBDOC_PATH_NOT_FOUND &&
                        response.Status != gomemcached.SUBDOC_MULTI_PATH_FAILURE_DELETED)
 }
+
+func (c *Client) Conn() io.ReadWriteCloser {
+       return c.conn
+}
diff --git a/vendor/github.com/couchbase/gomemcached/client/upr_event.go b/vendor/github.com/couchbase/gomemcached/client/upr_event.go
new file mode 100644 (file)
index 0000000..31e0abf
--- /dev/null
@@ -0,0 +1,346 @@
+package memcached
+
+import (
+       "encoding/binary"
+       "fmt"
+       "github.com/couchbase/gomemcached"
+       "math"
+)
+
+type SystemEventType int
+
+const InvalidSysEvent SystemEventType = -1
+
+const (
+       CollectionCreate  SystemEventType = 0
+       CollectionDrop    SystemEventType = iota
+       CollectionFlush   SystemEventType = iota // KV did not implement
+       ScopeCreate       SystemEventType = iota
+       ScopeDrop         SystemEventType = iota
+       CollectionChanged SystemEventType = iota
+)
+
+type ScopeCreateEvent interface {
+       GetSystemEventName() (string, error)
+       GetScopeId() (uint32, error)
+       GetManifestId() (uint64, error)
+}
+
+type CollectionCreateEvent interface {
+       GetSystemEventName() (string, error)
+       GetScopeId() (uint32, error)
+       GetCollectionId() (uint32, error)
+       GetManifestId() (uint64, error)
+       GetMaxTTL() (uint32, error)
+}
+
+type CollectionDropEvent interface {
+       GetScopeId() (uint32, error)
+       GetCollectionId() (uint32, error)
+       GetManifestId() (uint64, error)
+}
+
+type ScopeDropEvent interface {
+       GetScopeId() (uint32, error)
+       GetManifestId() (uint64, error)
+}
+
+type CollectionChangedEvent interface {
+       GetCollectionId() (uint32, error)
+       GetManifestId() (uint64, error)
+       GetMaxTTL() (uint32, error)
+}
+
+var ErrorInvalidOp error = fmt.Errorf("Invalid Operation")
+var ErrorInvalidVersion error = fmt.Errorf("Invalid version for parsing")
+var ErrorValueTooShort error = fmt.Errorf("Value length is too short")
+var ErrorNoMaxTTL error = fmt.Errorf("This event has no max TTL")
+
+// UprEvent memcached events for UPR streams.
+type UprEvent struct {
+       Opcode          gomemcached.CommandCode // Type of event
+       Status          gomemcached.Status      // Response status
+       VBucket         uint16                  // VBucket this event applies to
+       DataType        uint8                   // data type
+       Opaque          uint16                  // 16 MSB of opaque
+       VBuuid          uint64                  // This field is set by downstream
+       Flags           uint32                  // Item flags
+       Expiry          uint32                  // Item expiration time
+       Key, Value      []byte                  // Item key/value
+       OldValue        []byte                  // TODO: TBD: old document value
+       Cas             uint64                  // CAS value of the item
+       Seqno           uint64                  // sequence number of the mutation
+       RevSeqno        uint64                  // rev sequence number : deletions
+       LockTime        uint32                  // Lock time
+       MetadataSize    uint16                  // Metadata size
+       SnapstartSeq    uint64                  // start sequence number of this snapshot
+       SnapendSeq      uint64                  // End sequence number of the snapshot
+       SnapshotType    uint32                  // 0: disk 1: memory
+       FailoverLog     *FailoverLog            // Failover log containing vvuid and sequnce number
+       Error           error                   // Error value in case of a failure
+       ExtMeta         []byte                  // Extended Metadata
+       AckSize         uint32                  // The number of bytes that can be Acked to DCP
+       SystemEvent     SystemEventType         // Only valid if IsSystemEvent() is true
+       SysEventVersion uint8                   // Based on the version, the way Extra bytes is parsed is different
+       ValueLen        int                     // Cache it to avoid len() calls for performance
+       CollectionId    uint64                  // Valid if Collection is in use
+}
+
+// FailoverLog containing vvuid and sequnce number
+type FailoverLog [][2]uint64
+
+func makeUprEvent(rq gomemcached.MCRequest, stream *UprStream, bytesReceivedFromDCP int) *UprEvent {
+       event := &UprEvent{
+               Opcode:       rq.Opcode,
+               VBucket:      stream.Vbucket,
+               VBuuid:       stream.Vbuuid,
+               Value:        rq.Body,
+               Cas:          rq.Cas,
+               ExtMeta:      rq.ExtMeta,
+               DataType:     rq.DataType,
+               ValueLen:     len(rq.Body),
+               SystemEvent:  InvalidSysEvent,
+               CollectionId: math.MaxUint64,
+       }
+
+       event.PopulateFieldsBasedOnStreamType(rq, stream.StreamType)
+
+       // set AckSize for events that need to be acked to DCP,
+       // i.e., events with CommandCodes that need to be buffered in DCP
+       if _, ok := gomemcached.BufferedCommandCodeMap[rq.Opcode]; ok {
+               event.AckSize = uint32(bytesReceivedFromDCP)
+       }
+
+       // 16 LSBits are used by client library to encode vbucket number.
+       // 16 MSBits are left for application to multiplex on opaque value.
+       event.Opaque = appOpaque(rq.Opaque)
+
+       if len(rq.Extras) >= uprMutationExtraLen &&
+               event.Opcode == gomemcached.UPR_MUTATION {
+
+               event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
+               event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
+               event.Flags = binary.BigEndian.Uint32(rq.Extras[16:20])
+               event.Expiry = binary.BigEndian.Uint32(rq.Extras[20:24])
+               event.LockTime = binary.BigEndian.Uint32(rq.Extras[24:28])
+               event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[28:30])
+
+       } else if len(rq.Extras) >= uprDeletetionWithDeletionTimeExtraLen &&
+               event.Opcode == gomemcached.UPR_DELETION {
+
+               event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
+               event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
+               event.Expiry = binary.BigEndian.Uint32(rq.Extras[16:20])
+
+       } else if len(rq.Extras) >= uprDeletetionExtraLen &&
+               event.Opcode == gomemcached.UPR_DELETION ||
+               event.Opcode == gomemcached.UPR_EXPIRATION {
+
+               event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
+               event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
+               event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[16:18])
+
+       } else if len(rq.Extras) >= uprSnapshotExtraLen &&
+               event.Opcode == gomemcached.UPR_SNAPSHOT {
+
+               event.SnapstartSeq = binary.BigEndian.Uint64(rq.Extras[:8])
+               event.SnapendSeq = binary.BigEndian.Uint64(rq.Extras[8:16])
+               event.SnapshotType = binary.BigEndian.Uint32(rq.Extras[16:20])
+       } else if event.IsSystemEvent() {
+               event.PopulateEvent(rq.Extras)
+       }
+
+       return event
+}
+
+func (event *UprEvent) PopulateFieldsBasedOnStreamType(rq gomemcached.MCRequest, streamType DcpStreamType) {
+       switch streamType {
+       case CollectionsNonStreamId:
+               switch rq.Opcode {
+               // Only these will have CID encoded within the key
+               case gomemcached.UPR_MUTATION,
+                       gomemcached.UPR_DELETION,
+                       gomemcached.UPR_EXPIRATION:
+                       uleb128 := Uleb128(rq.Key)
+                       result, bytesShifted := uleb128.ToUint64(rq.Keylen)
+                       event.CollectionId = result
+                       event.Key = rq.Key[bytesShifted:]
+               default:
+                       event.Key = rq.Key
+               }
+       case CollectionsStreamId:
+               // TODO - not implemented
+               fallthrough
+       case NonCollectionStream:
+               // Let default behavior be legacy stream type
+               fallthrough
+       default:
+               event.Key = rq.Key
+       }
+}
+
+func (event *UprEvent) String() string {
+       name := gomemcached.CommandNames[event.Opcode]
+       if name == "" {
+               name = fmt.Sprintf("#%d", event.Opcode)
+       }
+       return name
+}
+
+func (event *UprEvent) IsSnappyDataType() bool {
+       return event.Opcode == gomemcached.UPR_MUTATION && (event.DataType&SnappyDataType > 0)
+}
+
+func (event *UprEvent) IsCollectionType() bool {
+       return event.IsSystemEvent() || event.CollectionId <= math.MaxUint32
+}
+
+func (event *UprEvent) IsSystemEvent() bool {
+       return event.Opcode == gomemcached.DCP_SYSTEM_EVENT
+}
+
+func (event *UprEvent) PopulateEvent(extras []byte) {
+       if len(extras) < dcpSystemEventExtraLen {
+               // Wrong length, don't parse
+               return
+       }
+       event.Seqno = binary.BigEndian.Uint64(extras[:8])
+       event.SystemEvent = SystemEventType(binary.BigEndian.Uint32(extras[8:12]))
+       var versionTemp uint16 = binary.BigEndian.Uint16(extras[12:14])
+       event.SysEventVersion = uint8(versionTemp >> 8)
+}
+
+func (event *UprEvent) GetSystemEventName() (string, error) {
+       switch event.SystemEvent {
+       case CollectionCreate:
+               fallthrough
+       case ScopeCreate:
+               return string(event.Key), nil
+       default:
+               return "", ErrorInvalidOp
+       }
+}
+
+func (event *UprEvent) GetManifestId() (uint64, error) {
+       switch event.SystemEvent {
+       // Version 0 only checks
+       case CollectionChanged:
+               fallthrough
+       case ScopeDrop:
+               fallthrough
+       case ScopeCreate:
+               fallthrough
+       case CollectionDrop:
+               if event.SysEventVersion > 0 {
+                       return 0, ErrorInvalidVersion
+               }
+               fallthrough
+       case CollectionCreate:
+               // CollectionCreate supports version 1
+               if event.SysEventVersion > 1 {
+                       return 0, ErrorInvalidVersion
+               }
+               if event.ValueLen < 8 {
+                       return 0, ErrorValueTooShort
+               }
+               return binary.BigEndian.Uint64(event.Value[0:8]), nil
+       default:
+               return 0, ErrorInvalidOp
+       }
+}
+
+func (event *UprEvent) GetCollectionId() (uint32, error) {
+       switch event.SystemEvent {
+       case CollectionDrop:
+               if event.SysEventVersion > 0 {
+                       return 0, ErrorInvalidVersion
+               }
+               fallthrough
+       case CollectionCreate:
+               if event.SysEventVersion > 1 {
+                       return 0, ErrorInvalidVersion
+               }
+               if event.ValueLen < 16 {
+                       return 0, ErrorValueTooShort
+               }
+               return binary.BigEndian.Uint32(event.Value[12:16]), nil
+       case CollectionChanged:
+               if event.SysEventVersion > 0 {
+                       return 0, ErrorInvalidVersion
+               }
+               if event.ValueLen < 12 {
+                       return 0, ErrorValueTooShort
+               }
+               return binary.BigEndian.Uint32(event.Value[8:12]), nil
+       default:
+               return 0, ErrorInvalidOp
+       }
+}
+
+func (event *UprEvent) GetScopeId() (uint32, error) {
+       switch event.SystemEvent {
+       // version 0 checks
+       case ScopeCreate:
+               fallthrough
+       case ScopeDrop:
+               fallthrough
+       case CollectionDrop:
+               if event.SysEventVersion > 0 {
+                       return 0, ErrorInvalidVersion
+               }
+               fallthrough
+       case CollectionCreate:
+               // CollectionCreate could be either 0 or 1
+               if event.SysEventVersion > 1 {
+                       return 0, ErrorInvalidVersion
+               }
+               if event.ValueLen < 12 {
+                       return 0, ErrorValueTooShort
+               }
+               return binary.BigEndian.Uint32(event.Value[8:12]), nil
+       default:
+               return 0, ErrorInvalidOp
+       }
+}
+
+func (event *UprEvent) GetMaxTTL() (uint32, error) {
+       switch event.SystemEvent {
+       case CollectionCreate:
+               if event.SysEventVersion < 1 {
+                       return 0, ErrorNoMaxTTL
+               }
+               if event.ValueLen < 20 {
+                       return 0, ErrorValueTooShort
+               }
+               return binary.BigEndian.Uint32(event.Value[16:20]), nil
+       case CollectionChanged:
+               if event.SysEventVersion > 0 {
+                       return 0, ErrorInvalidVersion
+               }
+               if event.ValueLen < 16 {
+                       return 0, ErrorValueTooShort
+               }
+               return binary.BigEndian.Uint32(event.Value[12:16]), nil
+       default:
+               return 0, ErrorInvalidOp
+       }
+}
+
+type Uleb128 []byte
+
+func (u Uleb128) ToUint64(cachedLen int) (result uint64, bytesShifted int) {
+       var shift uint = 0
+
+       for curByte := 0; curByte < cachedLen; curByte++ {
+               oneByte := u[curByte]
+               last7Bits := 0x7f & oneByte
+               result |= uint64(last7Bits) << shift
+               bytesShifted++
+               if oneByte&0x80 == 0 {
+                       break
+               }
+               shift += 7
+       }
+
+       return
+}
index 95fa12577f315a284ff427e776f45f7d80f70ae2..085b03c14582152354918c80632bcbbacecc6aaf 100644 (file)
@@ -19,6 +19,7 @@ const uprMutationExtraLen = 30
 const uprDeletetionExtraLen = 18
 const uprDeletetionWithDeletionTimeExtraLen = 21
 const uprSnapshotExtraLen = 20
+const dcpSystemEventExtraLen = 13
 const bufferAckThreshold = 0.2
 const opaqueOpen = 0xBEAF0001
 const opaqueFailover = 0xDEADBEEF
@@ -27,32 +28,6 @@ const uprDefaultNoopInterval = 120
 // Counter on top of opaqueOpen that others can draw from for open and control msgs
 var opaqueOpenCtrlWell uint32 = opaqueOpen
 
-// UprEvent memcached events for UPR streams.
-type UprEvent struct {
-       Opcode       gomemcached.CommandCode // Type of event
-       Status       gomemcached.Status      // Response status
-       VBucket      uint16                  // VBucket this event applies to
-       DataType     uint8                   // data type
-       Opaque       uint16                  // 16 MSB of opaque
-       VBuuid       uint64                  // This field is set by downstream
-       Flags        uint32                  // Item flags
-       Expiry       uint32                  // Item expiration time
-       Key, Value   []byte                  // Item key/value
-       OldValue     []byte                  // TODO: TBD: old document value
-       Cas          uint64                  // CAS value of the item
-       Seqno        uint64                  // sequence number of the mutation
-       RevSeqno     uint64                  // rev sequence number : deletions
-       LockTime     uint32                  // Lock time
-       MetadataSize uint16                  // Metadata size
-       SnapstartSeq uint64                  // start sequence number of this snapshot
-       SnapendSeq   uint64                  // End sequence number of the snapshot
-       SnapshotType uint32                  // 0: disk 1: memory
-       FailoverLog  *FailoverLog            // Failover log containing vvuid and sequnce number
-       Error        error                   // Error value in case of a failure
-       ExtMeta      []byte
-       AckSize      uint32 // The number of bytes that can be Acked to DCP
-}
-
 type PriorityType string
 
 // high > medium > disabled > low
@@ -63,13 +38,39 @@ const (
        PriorityHigh     PriorityType = "high"
 )
 
+type DcpStreamType int32
+
+var UninitializedStream DcpStreamType = -1
+
+const (
+       NonCollectionStream    DcpStreamType = 0
+       CollectionsNonStreamId DcpStreamType = iota
+       CollectionsStreamId    DcpStreamType = iota
+)
+
+func (t DcpStreamType) String() string {
+       switch t {
+       case UninitializedStream:
+               return "Un-Initialized Stream"
+       case NonCollectionStream:
+               return "Traditional Non-Collection Stream"
+       case CollectionsNonStreamId:
+               return "Collections Stream without StreamID"
+       case CollectionsStreamId:
+               return "Collection Stream with StreamID"
+       default:
+               return "Unknown Stream Type"
+       }
+}
+
 // UprStream is per stream data structure over an UPR Connection.
 type UprStream struct {
-       Vbucket   uint16 // Vbucket id
-       Vbuuid    uint64 // vbucket uuid
-       StartSeq  uint64 // start sequence number
-       EndSeq    uint64 // end sequence number
-       connected bool
+       Vbucket    uint16 // Vbucket id
+       Vbuuid     uint64 // vbucket uuid
+       StartSeq   uint64 // start sequence number
+       EndSeq     uint64 // end sequence number
+       connected  bool
+       StreamType DcpStreamType
 }
 
 type FeedState int
@@ -113,6 +114,7 @@ type UprFeatures struct {
        IncludeDeletionTime bool
        DcpPriority         PriorityType
        EnableExpiry        bool
+       EnableStreamId      bool
 }
 
 /**
@@ -274,9 +276,15 @@ type UprFeed struct {
        // if flag is true, upr feed will use ack from client to determine whether/when to send ack to DCP
        // if flag is false, upr feed will track how many bytes it has sent to client
        // and use that to determine whether/when to send ack to DCP
-       ackByClient bool
-       feedState   FeedState
-       muFeedState sync.RWMutex
+       ackByClient       bool
+       feedState         FeedState
+       muFeedState       sync.RWMutex
+       activatedFeatures UprFeatures
+       collectionEnabled bool // This is needed separately because parsing depends on this
+       // DCP StreamID allows multiple filtered collection streams to share a single DCP Stream
+       // It is not allowed once a regular/legacy stream was started originally
+       streamsType        DcpStreamType
+       initStreamTypeOnce sync.Once
 }
 
 // Exported interface - to allow for mocking
@@ -296,6 +304,9 @@ type UprFeedIface interface {
        UprRequestStream(vbno, opaqueMSB uint16, flags uint32, vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error
        // Set DCP priority on an existing DCP connection. The command is sent asynchronously without waiting for a response
        SetPriorityAsync(p PriorityType) error
+
+       // Various Collection-Type RequestStreams
+       UprRequestCollectionsStream(vbno, opaqueMSB uint16, flags uint32, vbuuid, startSeq, endSeq, snapStart, snapEnd uint64, filter *CollectionsFilter) error
 }
 
 type UprStats struct {
@@ -305,9 +316,6 @@ type UprStats struct {
        TotalSnapShot      uint64
 }
 
-// FailoverLog containing vvuid and sequnce number
-type FailoverLog [][2]uint64
-
 // error codes
 var ErrorInvalidLog = errors.New("couchbase.errorInvalidLog")
 
@@ -320,76 +328,6 @@ func (flogp *FailoverLog) Latest() (vbuuid, seqno uint64, err error) {
        return vbuuid, seqno, ErrorInvalidLog
 }
 
-func makeUprEvent(rq gomemcached.MCRequest, stream *UprStream, bytesReceivedFromDCP int) *UprEvent {
-       event := &UprEvent{
-               Opcode:   rq.Opcode,
-               VBucket:  stream.Vbucket,
-               VBuuid:   stream.Vbuuid,
-               Key:      rq.Key,
-               Value:    rq.Body,
-               Cas:      rq.Cas,
-               ExtMeta:  rq.ExtMeta,
-               DataType: rq.DataType,
-       }
-
-       // set AckSize for events that need to be acked to DCP,
-       // i.e., events with CommandCodes that need to be buffered in DCP
-       if _, ok := gomemcached.BufferedCommandCodeMap[rq.Opcode]; ok {
-               event.AckSize = uint32(bytesReceivedFromDCP)
-       }
-
-       // 16 LSBits are used by client library to encode vbucket number.
-       // 16 MSBits are left for application to multiplex on opaque value.
-       event.Opaque = appOpaque(rq.Opaque)
-
-       if len(rq.Extras) >= uprMutationExtraLen &&
-               event.Opcode == gomemcached.UPR_MUTATION {
-
-               event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
-               event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
-               event.Flags = binary.BigEndian.Uint32(rq.Extras[16:20])
-               event.Expiry = binary.BigEndian.Uint32(rq.Extras[20:24])
-               event.LockTime = binary.BigEndian.Uint32(rq.Extras[24:28])
-               event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[28:30])
-
-       } else if len(rq.Extras) >= uprDeletetionWithDeletionTimeExtraLen &&
-               event.Opcode == gomemcached.UPR_DELETION {
-
-               event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
-               event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
-               event.Expiry = binary.BigEndian.Uint32(rq.Extras[16:20])
-
-       } else if len(rq.Extras) >= uprDeletetionExtraLen &&
-               event.Opcode == gomemcached.UPR_DELETION ||
-               event.Opcode == gomemcached.UPR_EXPIRATION {
-
-               event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
-               event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
-               event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[16:18])
-
-       } else if len(rq.Extras) >= uprSnapshotExtraLen &&
-               event.Opcode == gomemcached.UPR_SNAPSHOT {
-
-               event.SnapstartSeq = binary.BigEndian.Uint64(rq.Extras[:8])
-               event.SnapendSeq = binary.BigEndian.Uint64(rq.Extras[8:16])
-               event.SnapshotType = binary.BigEndian.Uint32(rq.Extras[16:20])
-       }
-
-       return event
-}
-
-func (event *UprEvent) String() string {
-       name := gomemcached.CommandNames[event.Opcode]
-       if name == "" {
-               name = fmt.Sprintf("#%d", event.Opcode)
-       }
-       return name
-}
-
-func (event *UprEvent) IsSnappyDataType() bool {
-       return event.Opcode == gomemcached.UPR_MUTATION && (event.DataType&SnappyDataType > 0)
-}
-
 func (feed *UprFeed) sendCommands(mc *Client) {
        transmitCh := feed.transmitCh
        transmitCl := feed.transmitCl
@@ -420,6 +358,10 @@ func (feed *UprFeed) activateStream(vbno, opaque uint16, stream *UprStream) erro
        feed.muVbstreams.Lock()
        defer feed.muVbstreams.Unlock()
 
+       if feed.collectionEnabled {
+               stream.StreamType = feed.streamsType
+       }
+
        // Set this stream as the officially connected stream for this vb
        stream.connected = true
        feed.vbstreams[vbno] = stream
@@ -440,14 +382,15 @@ func (mc *Client) NewUprFeed() (*UprFeed, error) {
 }
 
 func (mc *Client) NewUprFeedWithConfig(ackByClient bool) (*UprFeed, error) {
-
        feed := &UprFeed{
-               conn:        mc,
-               closer:      make(chan bool, 1),
-               vbstreams:   make(map[uint16]*UprStream),
-               transmitCh:  make(chan *gomemcached.MCRequest),
-               transmitCl:  make(chan bool),
-               ackByClient: ackByClient,
+               conn:              mc,
+               closer:            make(chan bool, 1),
+               vbstreams:         make(map[uint16]*UprStream),
+               transmitCh:        make(chan *gomemcached.MCRequest),
+               transmitCl:        make(chan bool),
+               ackByClient:       ackByClient,
+               collectionEnabled: mc.CollectionEnabled(),
+               streamsType:       UninitializedStream,
        }
 
        feed.negotiator.initialize()
@@ -642,7 +585,22 @@ func (feed *UprFeed) uprOpen(name string, sequence uint32, bufSize uint32, featu
                activatedFeatures.EnableExpiry = true
        }
 
+       if features.EnableStreamId {
+               rq := &gomemcached.MCRequest{
+                       Opcode: gomemcached.UPR_CONTROL,
+                       Key:    []byte("enable_stream_id"),
+                       Body:   []byte("true"),
+                       Opaque: getUprOpenCtrlOpaque(),
+               }
+               err = sendMcRequestSync(feed.conn, rq)
+               if err != nil {
+                       return
+               }
+               activatedFeatures.EnableStreamId = true
+       }
+
        // everything is ok so far, set upr feed to open state
+       feed.activatedFeatures = activatedFeatures
        feed.setOpen()
        return
 }
@@ -689,10 +647,60 @@ func (mc *Client) UprGetFailoverLog(
 func (feed *UprFeed) UprRequestStream(vbno, opaqueMSB uint16, flags uint32,
        vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error {
 
+       return feed.UprRequestCollectionsStream(vbno, opaqueMSB, flags, vuuid, startSequence, endSequence, snapStart, snapEnd, nil)
+}
+
+func (feed *UprFeed) initStreamType(filter *CollectionsFilter) (err error) {
+       if filter != nil && filter.UseStreamId && !feed.activatedFeatures.EnableStreamId {
+               err = fmt.Errorf("Cannot use streamID based filter if the feed was not started with the streamID feature")
+               return
+       }
+
+       streamInitFunc := func() {
+               if feed.streamsType != UninitializedStream {
+                       // Shouldn't happen
+                       err = fmt.Errorf("The current feed has already been started in %v mode", feed.streamsType.String())
+               } else {
+                       if !feed.collectionEnabled {
+                               feed.streamsType = NonCollectionStream
+                       } else {
+                               if filter != nil && filter.UseStreamId {
+                                       feed.streamsType = CollectionsStreamId
+                               } else {
+                                       feed.streamsType = CollectionsNonStreamId
+                               }
+                       }
+               }
+       }
+       feed.initStreamTypeOnce.Do(streamInitFunc)
+       return
+}
+
+func (feed *UprFeed) UprRequestCollectionsStream(vbno, opaqueMSB uint16, flags uint32,
+       vbuuid, startSequence, endSequence, snapStart, snapEnd uint64, filter *CollectionsFilter) error {
+
+       err := feed.initStreamType(filter)
+       if err != nil {
+               return err
+       }
+
+       var mcRequestBody []byte
+       if filter != nil {
+               err = filter.IsValid()
+               if err != nil {
+                       return err
+               }
+               mcRequestBody, err = filter.ToStreamReqBody()
+               if err != nil {
+                       return err
+               }
+       }
+
        rq := &gomemcached.MCRequest{
                Opcode:  gomemcached.UPR_STREAMREQ,
                VBucket: vbno,
                Opaque:  composeOpaque(vbno, opaqueMSB),
+               Body:    mcRequestBody,
        }
 
        rq.Extras = make([]byte, 48) // #Extras
@@ -700,15 +708,15 @@ func (feed *UprFeed) UprRequestStream(vbno, opaqueMSB uint16, flags uint32,
        binary.BigEndian.PutUint32(rq.Extras[4:8], uint32(0))
        binary.BigEndian.PutUint64(rq.Extras[8:16], startSequence)
        binary.BigEndian.PutUint64(rq.Extras[16:24], endSequence)
-       binary.BigEndian.PutUint64(rq.Extras[24:32], vuuid)
+       binary.BigEndian.PutUint64(rq.Extras[24:32], vbuuid)
        binary.BigEndian.PutUint64(rq.Extras[32:40], snapStart)
        binary.BigEndian.PutUint64(rq.Extras[40:48], snapEnd)
 
-       feed.negotiator.registerRequest(vbno, opaqueMSB, vuuid, startSequence, endSequence)
+       feed.negotiator.registerRequest(vbno, opaqueMSB, vbuuid, startSequence, endSequence)
        // Any client that has ever called this method, regardless of return code,
        // should expect a potential UPR_CLOSESTREAM message due to this new map entry prior to Transmit.
 
-       if err := feed.conn.Transmit(rq); err != nil {
+       if err = feed.conn.Transmit(rq); err != nil {
                logging.Errorf("Error in StreamRequest %s", err.Error())
                // If an error occurs during transmit, then the UPRFeed will keep the stream
                // in the vbstreams map. This is to prevent nil lookup from any previously
@@ -973,6 +981,12 @@ loop:
                                        if err := feed.conn.TransmitResponse(noop); err != nil {
                                                logging.Warnf("failed to transmit command %s. Error %s", noop.Opcode.String(), err.Error())
                                        }
+                               case gomemcached.DCP_SYSTEM_EVENT:
+                                       if stream == nil {
+                                               logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
+                                               break loop
+                                       }
+                                       event = makeUprEvent(pkt, stream, bytes)
                                default:
                                        logging.Infof("Recived an unknown response for vbucket %d", vb)
                                }
diff --git a/vendor/github.com/couchbase/gomemcached/flexibleFraming.go b/vendor/github.com/couchbase/gomemcached/flexibleFraming.go
new file mode 100644 (file)
index 0000000..6f75403
--- /dev/null
@@ -0,0 +1,381 @@
+package gomemcached
+
+import (
+       "encoding/binary"
+       "fmt"
+)
+
+type FrameObjType int
+
+const (
+       FrameBarrier     FrameObjType = iota
+       FrameDurability  FrameObjType = iota
+       FrameDcpStreamId FrameObjType = iota
+       FrameOpenTracing FrameObjType = iota
+)
+
+type FrameInfo struct {
+       ObjId   FrameObjType
+       ObjLen  int
+       ObjData []byte
+}
+
+var ErrorInvalidOp error = fmt.Errorf("Specified method is not applicable")
+var ErrorObjLenNotMatch error = fmt.Errorf("Object length does not match data")
+
+func (f *FrameInfo) Validate() error {
+       switch f.ObjId {
+       case FrameBarrier:
+               if f.ObjLen != 0 {
+                       return fmt.Errorf("Invalid FrameBarrier - length is %v\n", f.ObjLen)
+               } else if f.ObjLen != len(f.ObjData) {
+                       return ErrorObjLenNotMatch
+               }
+       case FrameDurability:
+               if f.ObjLen != 1 && f.ObjLen != 3 {
+                       return fmt.Errorf("Invalid FrameDurability - length is %v\n", f.ObjLen)
+               } else if f.ObjLen != len(f.ObjData) {
+                       return ErrorObjLenNotMatch
+               }
+       case FrameDcpStreamId:
+               if f.ObjLen != 2 {
+                       return fmt.Errorf("Invalid FrameDcpStreamId - length is %v\n", f.ObjLen)
+               } else if f.ObjLen != len(f.ObjData) {
+                       return ErrorObjLenNotMatch
+               }
+       case FrameOpenTracing:
+               if f.ObjLen == 0 {
+                       return fmt.Errorf("Invalid FrameOpenTracing - length must be > 0")
+               } else if f.ObjLen != len(f.ObjData) {
+                       return ErrorObjLenNotMatch
+               }
+       default:
+               return fmt.Errorf("Unknown FrameInfo type")
+       }
+       return nil
+}
+
+func (f *FrameInfo) GetStreamId() (uint16, error) {
+       if f.ObjId != FrameDcpStreamId {
+               return 0, ErrorInvalidOp
+       }
+
+       var output uint16
+       output = uint16(f.ObjData[0])
+       output = output << 8
+       output |= uint16(f.ObjData[1])
+       return output, nil
+}
+
+type DurabilityLvl uint8
+
+const (
+       DuraInvalid                    DurabilityLvl = iota // Not used (0x0)
+       DuraMajority                   DurabilityLvl = iota // (0x01)
+       DuraMajorityAndPersistOnMaster DurabilityLvl = iota // (0x02)
+       DuraPersistToMajority          DurabilityLvl = iota // (0x03)
+)
+
+func (f *FrameInfo) GetDurabilityRequirements() (lvl DurabilityLvl, timeoutProvided bool, timeoutMs uint16, err error) {
+       if f.ObjId != FrameDurability {
+               err = ErrorInvalidOp
+               return
+       }
+       if f.ObjLen != 1 && f.ObjLen != 3 {
+               err = ErrorObjLenNotMatch
+               return
+       }
+
+       lvl = DurabilityLvl(uint8(f.ObjData[0]))
+
+       if f.ObjLen == 3 {
+               timeoutProvided = true
+               timeoutMs = binary.BigEndian.Uint16(f.ObjData[1:2])
+       }
+
+       return
+}
+
+func incrementMarker(bitsToBeIncremented, byteIncrementCnt *int, framingElen, curObjIdx int) (int, error) {
+       for *bitsToBeIncremented >= 8 {
+               *byteIncrementCnt++
+               *bitsToBeIncremented -= 8
+       }
+       marker := curObjIdx + *byteIncrementCnt
+       if marker > framingElen {
+               return -1, fmt.Errorf("Out of bounds")
+       }
+       return marker, nil
+}
+
+// Right now, halfByteRemaining will always be false, because ObjID and Len haven't gotten that large yet
+func (f *FrameInfo) Bytes() (output []byte, halfByteRemaining bool) {
+       // ObjIdentifier - 4 bits + ObjLength - 4 bits
+       var idAndLen uint8
+       idAndLen |= uint8(f.ObjId) << 4
+       idAndLen |= uint8(f.ObjLen)
+       output = append(output, byte(idAndLen))
+
+       // Rest is Data
+       output = append(output, f.ObjData...)
+       return
+}
+
+func parseFrameInfoObjects(buf []byte, framingElen int) (objs []FrameInfo, err error, halfByteRemaining bool) {
+       var curObjIdx int
+       var byteIncrementCnt int
+       var bitsToBeIncremented int
+       var marker int
+
+       // Parse frameInfo objects
+       for curObjIdx = 0; curObjIdx < framingElen; curObjIdx += byteIncrementCnt {
+               byteIncrementCnt = 0
+               var oneFrameObj FrameInfo
+
+               // First get the objId
+               // -------------------------
+               var objId int
+               var objHeader uint8 = buf[curObjIdx]
+               var objIdentifierRaw uint8
+               if bitsToBeIncremented == 0 {
+                       // ObjHeader
+                       // 0 1 2 3 4 5 6 7
+                       // ^-----^
+                       // ObjIdentifierRaw
+                       objIdentifierRaw = (objHeader & 0xf0) >> 4
+               } else {
+                       // ObjHeader
+                       // 0 1 2 3 4 5 6 7
+                       //         ^-----^
+                       //           ObjIdentifierRaw
+                       objIdentifierRaw = (objHeader & 0x0f)
+               }
+               bitsToBeIncremented += 4
+
+               marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx)
+               if err != nil {
+                       return
+               }
+
+               // Value is 0-14
+               objId = int(objIdentifierRaw & 0xe)
+               // If bit 15 is set, ID is 15 + value of next byte
+               if objIdentifierRaw&0x1 > 0 {
+                       if bitsToBeIncremented > 0 {
+                               // ObjHeader
+                               // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
+                               // ^-----^ ^---------------^
+                               // ObjId1    Extension
+                               // ^ marker
+                               buffer := uint16(buf[marker])
+                               buffer = buffer << 8
+                               buffer |= uint16(buf[marker+1])
+                               var extension uint8 = uint8(buffer & 0xff0 >> 4)
+                               objId += int(extension)
+                       } else {
+                               // ObjHeader
+                               // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
+                               //         ^-----^ ^-------------------^
+                               //          ObjId1    extension
+                               //                 ^ marker
+                               var extension uint8 = uint8(buf[marker])
+                               objId += int(extension)
+                       }
+                       bitsToBeIncremented += 8
+               }
+
+               marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx)
+               if err != nil {
+                       return
+               }
+               oneFrameObj.ObjId = FrameObjType(objId)
+
+               // Then get the obj length
+               // -------------------------
+               var objLenRaw uint8
+               var objLen int
+               if bitsToBeIncremented > 0 {
+                       // ObjHeader
+                       // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
+                       //                 ^         ^---------^
+                       //                 marker       objLen
+                       objLenRaw = uint8(buf[marker]) & 0x0f
+               } else {
+                       // ObjHeader
+                       // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
+                       //                                        ^--------^
+                       //                                          objLen
+                       //                                        ^ marker
+                       objLenRaw = uint8(buf[marker]) & 0xf0 >> 4
+               }
+               bitsToBeIncremented += 4
+
+               marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx)
+               if err != nil {
+                       return
+               }
+
+               // Length is 0-14
+               objLen = int(objLenRaw & 0xe)
+               // If bit 15 is set, lenghth is 15 + value of next byte
+               if objLenRaw&0x1 > 0 {
+                       if bitsToBeIncremented == 0 {
+                               // ObjHeader
+                               // 12 13 14 15 16 17 18 19 20 21 22 23
+                               // ^---------^ ^--------------------^
+                               //   objLen        extension
+                               //             ^ marker
+                               var extension uint8 = uint8(buf[marker])
+                               objLen += int(extension)
+                       } else {
+                               // ObjHeader
+                               // 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
+                               // ^--------^  ^---------------------^
+                               //  objLen          extension
+                               // ^ marker                             var buffer uint16
+                               buffer := uint16(buf[marker])
+                               buffer = buffer << 8
+                               buffer |= uint16(buf[marker+1])
+                               var extension uint8 = uint8(buffer & 0xff0 >> 4)
+                               objLen += int(extension)
+                       }
+                       bitsToBeIncremented += 8
+               }
+
+               marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx)
+               if err != nil {
+                       return
+               }
+               oneFrameObj.ObjLen = objLen
+
+               // The rest is N-bytes of data based on the length
+               if bitsToBeIncremented == 0 {
+                       // No weird alignment needed
+                       oneFrameObj.ObjData = buf[marker : marker+objLen]
+               } else {
+                       // 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
+                       // ^--------^  ^---------------------^ ^--------->
+                       //  objLen          extension            data
+                       //                          ^ marker
+                       oneFrameObj.ObjData = ShiftByteSliceLeft4Bits(buf[marker : marker+objLen+1])
+               }
+               err = oneFrameObj.Validate()
+               if err != nil {
+                       return
+               }
+               objs = append(objs, oneFrameObj)
+
+               bitsToBeIncremented += 8 * objLen
+               marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx)
+       }
+
+       if bitsToBeIncremented > 0 {
+               halfByteRemaining = true
+       }
+       return
+}
+
+func ShiftByteSliceLeft4Bits(slice []byte) (replacement []byte) {
+       var buffer uint16
+       var i int
+       sliceLen := len(slice)
+
+       if sliceLen < 2 {
+               // Let's not shift less than 16 bits
+               return
+       }
+
+       replacement = make([]byte, sliceLen, cap(slice))
+
+       for i = 0; i < sliceLen-1; i++ {
+               // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
+               // ^-----^ ^---------------^ ^-----------
+               // garbage   data byte 0       data byte 1
+               buffer = uint16(slice[i])
+               buffer = buffer << 8
+               buffer |= uint16(slice[i+1])
+               replacement[i] = uint8(buffer & 0xff0 >> 4)
+       }
+
+       if i < sliceLen {
+               lastByte := slice[sliceLen-1]
+               lastByte = lastByte << 4
+               replacement[i] = lastByte
+       }
+       return
+}
+
+// The following is used to theoretically support frameInfo ObjID extensions
+// for completeness, but they are not very efficient though
+func ShiftByteSliceRight4Bits(slice []byte) (replacement []byte) {
+       var buffer uint16
+       var i int
+       var leftovers uint8 // 4 bits only
+       var replacementUnit uint16
+       var first bool = true
+       var firstLeftovers uint8
+       var lastLeftovers uint8
+       sliceLen := len(slice)
+
+       if sliceLen < 2 {
+               // Let's not shift less than 16 bits
+               return
+       }
+
+       if slice[sliceLen-1]&0xf == 0 {
+               replacement = make([]byte, sliceLen, cap(slice))
+       } else {
+               replacement = make([]byte, sliceLen+1, cap(slice)+1)
+       }
+
+       for i = 0; i < sliceLen-1; i++ {
+               buffer = binary.BigEndian.Uint16(slice[i : i+2])
+               // (buffer)
+               // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
+               // ^-------------^ ^-------------------^
+               //     data byte 0        data byte 1
+               //
+               // into
+               //
+               // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
+               // ^-----^ ^---------------^ ^--------------------^ ^----------^
+               // zeroes   data byte 0      data byte 1              zeroes
+
+               if first {
+                       // The leftover OR'ing will overwrite the first 4 bits of data byte 0. Save them
+                       firstLeftovers = uint8(buffer & 0xf000 >> 12)
+                       first = false
+               }
+               replacementUnit = 0
+               replacementUnit |= uint16(leftovers) << 12
+               replacementUnit |= (buffer & 0xff00) >> 4 // data byte 0
+               replacementUnit |= buffer & 0xff >> 4     // data byte 1 first 4 bits
+               lastLeftovers = uint8(buffer&0xf) << 4
+
+               replacement[i+1] = byte(replacementUnit)
+
+               leftovers = uint8((buffer & 0x000f) << 4)
+       }
+
+       replacement[0] = byte(uint8(replacement[0]) | firstLeftovers)
+       if lastLeftovers > 0 {
+               replacement[sliceLen] = byte(lastLeftovers)
+       }
+       return
+}
+
+func Merge2HalfByteSlices(src1, src2 []byte) (output []byte) {
+       src1Len := len(src1)
+       src2Len := len(src2)
+       output = make([]byte, src1Len+src2Len-1)
+
+       var mergeByte uint8 = src1[src1Len-1]
+       mergeByte |= uint8(src2[0])
+
+       copy(output, src1)
+       copy(output[src1Len:], src2[1:])
+
+       output[src1Len-1] = byte(mergeByte)
+
+       return
+}
index 32f4f51852fd1c73ddb8153acdbf02c0fdf74534..11f383b8ff8c867f6f01ef9df25a6d114e5e1cb9 100644 (file)
@@ -6,8 +6,10 @@ import (
 )
 
 const (
-       REQ_MAGIC = 0x80
-       RES_MAGIC = 0x81
+       REQ_MAGIC      = 0x80
+       RES_MAGIC      = 0x81
+       FLEX_MAGIC     = 0x08
+       FLEX_RES_MAGIC = 0x18
 )
 
 // CommandCode for memcached packets.
@@ -99,6 +101,8 @@ const (
        SUBDOC_GET               = CommandCode(0xc5) // Get subdoc. Returns with xattrs
        SUBDOC_MULTI_LOOKUP      = CommandCode(0xd0) // Multi lookup. Doc xattrs and meta.
 
+       DCP_SYSTEM_EVENT = CommandCode(0x5f) // A system event has occurred
+
 )
 
 // command codes that are counted toward DCP control buffer
index 3ff67ab9a7a704deb75e3cff613d6ef517703fba..35d0fe2dafc7cb02c28e7a4c8b4a71e055352a8d 100644 (file)
@@ -25,11 +25,17 @@ type MCRequest struct {
        Extras, Key, Body, ExtMeta []byte
        // Datatype identifier
        DataType uint8
+       // len() calls are expensive - cache this in case for collection
+       Keylen int
+       // Flexible Framing Extras
+       FramingExtras []FrameInfo
+       // Stored length of incoming framing extras
+       FramingElen int
 }
 
 // Size gives the number of bytes this request requires.
 func (req *MCRequest) Size() int {
-       return HDR_LEN + len(req.Extras) + len(req.Key) + len(req.Body) + len(req.ExtMeta)
+       return HDR_LEN + len(req.Extras) + len(req.Key) + len(req.Body) + len(req.ExtMeta) + req.FramingElen
 }
 
 // A debugging string representation of this request
@@ -38,7 +44,23 @@ func (req MCRequest) String() string {
                req.Opcode, len(req.Body), req.Key)
 }
 
-func (req *MCRequest) fillHeaderBytes(data []byte) int {
+func (req *MCRequest) fillRegularHeaderBytes(data []byte) int {
+       //  Byte/     0       |       1       |       2       |       3       |
+       //     /              |               |               |               |
+       //    |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
+       //    +---------------+---------------+---------------+---------------+
+       //   0| Magic         | Opcode        | Key length                    |
+       //    +---------------+---------------+---------------+---------------+
+       //   4| Extras length | Data type     | vbucket id                    |
+       //    +---------------+---------------+---------------+---------------+
+       //   8| Total body length                                             |
+       //    +---------------+---------------+---------------+---------------+
+       //  12| Opaque                                                        |
+       //    +---------------+---------------+---------------+---------------+
+       //  16| CAS                                                           |
+       //    |                                                               |
+       //    +---------------+---------------+---------------+---------------+
+       //    Total 24 bytes
 
        pos := 0
        data[pos] = REQ_MAGIC
@@ -84,16 +106,130 @@ func (req *MCRequest) fillHeaderBytes(data []byte) int {
                copy(data[pos:pos+len(req.Key)], req.Key)
                pos += len(req.Key)
        }
-
        return pos
 }
 
+// Returns pos and if trailing by half byte
+func (req *MCRequest) fillFlexHeaderBytes(data []byte) (int, bool) {
+
+       //  Byte/     0       |       1       |       2       |       3       |
+       //     /              |               |               |               |
+       //    |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
+       //    +---------------+---------------+---------------+---------------+
+       //   0| Magic (0x08)  | Opcode        | Framing extras| Key Length    |
+       //    +---------------+---------------+---------------+---------------+
+       //   4| Extras length | Data type     | vbucket id                    |
+       //    +---------------+---------------+---------------+---------------+
+       //   8| Total body length                                             |
+       //    +---------------+---------------+---------------+---------------+
+       //  12| Opaque                                                        |
+       //    +---------------+---------------+---------------+---------------+
+       //  16| CAS                                                           |
+       //    |                                                               |
+       //    +---------------+---------------+---------------+---------------+
+       //    Total 24 bytes
+
+       data[0] = FLEX_MAGIC
+       data[1] = byte(req.Opcode)
+       data[2] = byte(req.FramingElen)
+       data[3] = byte(req.Keylen)
+       elen := len(req.Extras)
+       data[4] = byte(elen)
+       if req.DataType != 0 {
+               data[5] = byte(req.DataType)
+       }
+       binary.BigEndian.PutUint16(data[6:8], req.VBucket)
+       binary.BigEndian.PutUint32(data[8:12],
+               uint32(len(req.Body)+req.Keylen+elen+len(req.ExtMeta)+req.FramingElen))
+       binary.BigEndian.PutUint32(data[12:16], req.Opaque)
+       if req.Cas != 0 {
+               binary.BigEndian.PutUint64(data[16:24], req.Cas)
+       }
+       pos := HDR_LEN
+
+       // Add framing infos
+       var framingExtras []byte
+       var outputBytes []byte
+       var mergeModeSrc []byte
+       var frameBytes int
+       var halfByteMode bool
+       var mergeMode bool
+       for _, frameInfo := range req.FramingExtras {
+               if !mergeMode {
+                       outputBytes, halfByteMode = frameInfo.Bytes()
+                       if !halfByteMode {
+                               framingExtras = append(framingExtras, outputBytes...)
+                               frameBytes += len(outputBytes)
+                       } else {
+                               mergeMode = true
+                               mergeModeSrc = outputBytes
+                       }
+               } else {
+                       outputBytes, halfByteMode = frameInfo.Bytes()
+                       outputBytes := ShiftByteSliceRight4Bits(outputBytes)
+                       if halfByteMode {
+                               // Previous halfbyte merge with this halfbyte will result in a complete byte
+                               mergeMode = false
+                               outputBytes = Merge2HalfByteSlices(mergeModeSrc, outputBytes)
+                               framingExtras = append(framingExtras, outputBytes...)
+                               frameBytes += len(outputBytes)
+                       } else {
+                               // Merge half byte with a non-half byte will result in a combined half-byte that will
+                               // become the source for the next iteration
+                               mergeModeSrc = Merge2HalfByteSlices(mergeModeSrc, outputBytes)
+                       }
+               }
+       }
+
+       if mergeMode {
+               // Commit the temporary merge area into framingExtras
+               framingExtras = append(framingExtras, mergeModeSrc...)
+               frameBytes += len(mergeModeSrc)
+       }
+
+       copy(data[pos:pos+frameBytes], framingExtras)
+
+       pos += frameBytes
+
+       // Add Extras
+       if len(req.Extras) > 0 {
+               if mergeMode {
+                       outputBytes = ShiftByteSliceRight4Bits(req.Extras)
+                       data = Merge2HalfByteSlices(data, outputBytes)
+               } else {
+                       copy(data[pos:pos+elen], req.Extras)
+               }
+               pos += elen
+       }
+
+       // Add keys
+       if req.Keylen > 0 {
+               if mergeMode {
+                       outputBytes = ShiftByteSliceRight4Bits(req.Key)
+                       data = Merge2HalfByteSlices(data, outputBytes)
+               } else {
+                       copy(data[pos:pos+req.Keylen], req.Key)
+               }
+               pos += req.Keylen
+       }
+
+       return pos, mergeMode
+}
+
+func (req *MCRequest) FillHeaderBytes(data []byte) (int, bool) {
+       if req.FramingElen == 0 {
+               return req.fillRegularHeaderBytes(data), false
+       } else {
+               return req.fillFlexHeaderBytes(data)
+       }
+}
+
 // HeaderBytes will return the wire representation of the request header
 // (with the extras and key).
 func (req *MCRequest) HeaderBytes() []byte {
-       data := make([]byte, HDR_LEN+len(req.Extras)+len(req.Key))
+       data := make([]byte, HDR_LEN+len(req.Extras)+len(req.Key)+req.FramingElen)
 
-       req.fillHeaderBytes(data)
+       req.FillHeaderBytes(data)
 
        return data
 }
@@ -102,16 +238,27 @@ func (req *MCRequest) HeaderBytes() []byte {
 func (req *MCRequest) Bytes() []byte {
        data := make([]byte, req.Size())
 
-       pos := req.fillHeaderBytes(data)
+       pos, halfByteMode := req.FillHeaderBytes(data)
+       // TODO - the halfByteMode should be revisited for a more efficient
+       // way of doing things
 
        if len(req.Body) > 0 {
-               copy(data[pos:pos+len(req.Body)], req.Body)
+               if halfByteMode {
+                       shifted := ShiftByteSliceRight4Bits(req.Body)
+                       data = Merge2HalfByteSlices(data, shifted)
+               } else {
+                       copy(data[pos:pos+len(req.Body)], req.Body)
+               }
        }
 
        if len(req.ExtMeta) > 0 {
-               copy(data[pos+len(req.Body):pos+len(req.Body)+len(req.ExtMeta)], req.ExtMeta)
+               if halfByteMode {
+                       shifted := ShiftByteSliceRight4Bits(req.ExtMeta)
+                       data = Merge2HalfByteSlices(data, shifted)
+               } else {
+                       copy(data[pos+len(req.Body):pos+len(req.Body)+len(req.ExtMeta)], req.ExtMeta)
+               }
        }
-
        return data
 }
 
@@ -130,40 +277,44 @@ func (req *MCRequest) Transmit(w io.Writer) (n int, err error) {
        return
 }
 
-// Receive will fill this MCRequest with the data from a reader.
-func (req *MCRequest) Receive(r io.Reader, hdrBytes []byte) (int, error) {
-       if len(hdrBytes) < HDR_LEN {
-               hdrBytes = []byte{
-                       0, 0, 0, 0, 0, 0, 0, 0,
-                       0, 0, 0, 0, 0, 0, 0, 0,
-                       0, 0, 0, 0, 0, 0, 0, 0}
-       }
-       n, err := io.ReadFull(r, hdrBytes)
-       if err != nil {
-               return n, err
-       }
-
-       if hdrBytes[0] != RES_MAGIC && hdrBytes[0] != REQ_MAGIC {
-               return n, fmt.Errorf("bad magic: 0x%02x", hdrBytes[0])
-       }
-
-       klen := int(binary.BigEndian.Uint16(hdrBytes[2:]))
-       elen := int(hdrBytes[4])
+func (req *MCRequest) receiveHeaderCommon(hdrBytes []byte) (elen, totalBodyLen int) {
+       elen = int(hdrBytes[4])
        // Data type at 5
        req.DataType = uint8(hdrBytes[5])
 
        req.Opcode = CommandCode(hdrBytes[1])
        // Vbucket at 6:7
        req.VBucket = binary.BigEndian.Uint16(hdrBytes[6:])
-       totalBodyLen := int(binary.BigEndian.Uint32(hdrBytes[8:]))
+       totalBodyLen = int(binary.BigEndian.Uint32(hdrBytes[8:]))
 
        req.Opaque = binary.BigEndian.Uint32(hdrBytes[12:])
        req.Cas = binary.BigEndian.Uint64(hdrBytes[16:])
+       return
+}
 
+func (req *MCRequest) receiveRegHeader(hdrBytes []byte) (elen, totalBodyLen int) {
+       elen, totalBodyLen = req.receiveHeaderCommon(hdrBytes)
+       req.Keylen = int(binary.BigEndian.Uint16(hdrBytes[2:]))
+       return
+}
+
+func (req *MCRequest) receiveFlexibleFramingHeader(hdrBytes []byte) (elen, totalBodyLen, framingElen int) {
+       elen, totalBodyLen = req.receiveHeaderCommon(hdrBytes)
+
+       //       For flexible framing header, key length is a single byte at byte index 3
+       req.Keylen = int(binary.BigEndian.Uint16(hdrBytes[2:]) & 0x0ff)
+       // Flexible framing lengh is a single byte at index 2
+       framingElen = int(binary.BigEndian.Uint16(hdrBytes[2:]) >> 8)
+       req.FramingElen = framingElen
+       return
+}
+
+func (req *MCRequest) populateRegularBody(r io.Reader, totalBodyLen, elen int) (int, error) {
+       var m int
+       var err error
        if totalBodyLen > 0 {
                buf := make([]byte, totalBodyLen)
-               m, err := io.ReadFull(r, buf)
-               n += m
+               m, err = io.ReadFull(r, buf)
                if err == nil {
                        if req.Opcode >= TAP_MUTATION &&
                                req.Opcode <= TAP_CHECKPOINT_END &&
@@ -175,7 +326,7 @@ func (req *MCRequest) Receive(r io.Reader, hdrBytes []byte) (int, error) {
                        }
 
                        req.Extras = buf[0:elen]
-                       req.Key = buf[elen : klen+elen]
+                       req.Key = buf[elen : req.Keylen+elen]
 
                        // get the length of extended metadata
                        extMetaLen := 0
@@ -183,15 +334,149 @@ func (req *MCRequest) Receive(r io.Reader, hdrBytes []byte) (int, error) {
                                extMetaLen = int(binary.BigEndian.Uint16(req.Extras[28:30]))
                        }
 
-                       bodyLen := totalBodyLen - klen - elen - extMetaLen
+                       bodyLen := totalBodyLen - req.Keylen - elen - extMetaLen
                        if bodyLen > MaxBodyLen {
-                               return n, fmt.Errorf("%d is too big (max %d)",
+                               return m, fmt.Errorf("%d is too big (max %d)",
                                        bodyLen, MaxBodyLen)
                        }
 
-                       req.Body = buf[klen+elen : klen+elen+bodyLen]
-                       req.ExtMeta = buf[klen+elen+bodyLen:]
+                       req.Body = buf[req.Keylen+elen : req.Keylen+elen+bodyLen]
+                       req.ExtMeta = buf[req.Keylen+elen+bodyLen:]
+               }
+       }
+       return m, err
+}
+
+func (req *MCRequest) populateFlexBody(r io.Reader, totalBodyLen, elen, framingElen int) (int, error) {
+       var m int
+       var err error
+       if totalBodyLen > 0 {
+               buf := make([]byte, totalBodyLen)
+               m, err = io.ReadFull(r, buf)
+               if err != nil {
+                       return m, err
+               }
+               err = req.populateFlexBodyInternal(buf, totalBodyLen, elen, framingElen)
+       }
+       return m, err
+}
+
+func (req *MCRequest) populateFlexBodyInternal(buf []byte, totalBodyLen, elen, framingElen int) error {
+       var halfByteOffset bool
+       var err error
+       if framingElen > 0 {
+               var objs []FrameInfo
+               objs, err, halfByteOffset = parseFrameInfoObjects(buf, framingElen)
+               if err != nil {
+                       return err
                }
+               req.FramingExtras = objs
+       }
+
+       err = req.populateFlexBodyAfterFrames(buf, totalBodyLen, elen, framingElen, halfByteOffset)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+func (req *MCRequest) populateFlexBodyAfterFrames(buf []byte, totalBodyLen, elen, framingElen int, halfByteOffset bool) error {
+       var idxCursor int = framingElen
+       if req.Opcode >= TAP_MUTATION && req.Opcode <= TAP_CHECKPOINT_END && len(buf[idxCursor:]) > 1 {
+               // In these commands there is "engine private"
+               // data at the end of the extras.  The first 2
+               // bytes of extra data give its length.
+               if !halfByteOffset {
+                       elen += int(binary.BigEndian.Uint16(buf[idxCursor:]))
+               } else {
+                       // 0 1 2 3 4 .... 19 20 21 22 ... 32
+                       // ^-----^ ^-------^  ^------------^
+                       //  offset   data       do not care
+                       var buffer uint32 = binary.BigEndian.Uint32(buf[idxCursor:])
+                       buffer &= 0xffff000
+                       elen += int(buffer >> 12)
+               }
+       }
+
+       // Get the extras
+       if !halfByteOffset {
+               req.Extras = buf[idxCursor : idxCursor+elen]
+       } else {
+               preShift := buf[idxCursor : idxCursor+elen+1]
+               req.Extras = ShiftByteSliceLeft4Bits(preShift)
+       }
+       idxCursor += elen
+
+       // Get the Key
+       if !halfByteOffset {
+               req.Key = buf[idxCursor : idxCursor+req.Keylen]
+       } else {
+               preShift := buf[idxCursor : idxCursor+req.Keylen+1]
+               req.Key = ShiftByteSliceLeft4Bits(preShift)
+       }
+       idxCursor += req.Keylen
+
+       // get the length of extended metadata
+       extMetaLen := 0
+       if elen > 29 {
+               extMetaLen = int(binary.BigEndian.Uint16(req.Extras[28:30]))
+       }
+       idxCursor += extMetaLen
+
+       bodyLen := totalBodyLen - req.Keylen - elen - extMetaLen - framingElen
+       if bodyLen > MaxBodyLen {
+               return fmt.Errorf("%d is too big (max %d)",
+                       bodyLen, MaxBodyLen)
+       }
+
+       if !halfByteOffset {
+               req.Body = buf[idxCursor : idxCursor+bodyLen]
+               idxCursor += bodyLen
+       } else {
+               preShift := buf[idxCursor : idxCursor+bodyLen+1]
+               req.Body = ShiftByteSliceLeft4Bits(preShift)
+               idxCursor += bodyLen
+       }
+
+       if extMetaLen > 0 {
+               if !halfByteOffset {
+                       req.ExtMeta = buf[idxCursor:]
+               } else {
+                       preShift := buf[idxCursor:]
+                       req.ExtMeta = ShiftByteSliceLeft4Bits(preShift)
+               }
+       }
+
+       return nil
+}
+
+// Receive will fill this MCRequest with the data from a reader.
+func (req *MCRequest) Receive(r io.Reader, hdrBytes []byte) (int, error) {
+       if len(hdrBytes) < HDR_LEN {
+               hdrBytes = []byte{
+                       0, 0, 0, 0, 0, 0, 0, 0,
+                       0, 0, 0, 0, 0, 0, 0, 0,
+                       0, 0, 0, 0, 0, 0, 0, 0}
+       }
+       n, err := io.ReadFull(r, hdrBytes)
+       if err != nil {
+               fmt.Printf("Err %v\n", err)
+               return n, err
+       }
+
+       switch hdrBytes[0] {
+       case RES_MAGIC:
+               fallthrough
+       case REQ_MAGIC:
+               elen, totalBodyLen := req.receiveRegHeader(hdrBytes)
+               bodyRead, err := req.populateRegularBody(r, totalBodyLen, elen)
+               return n + bodyRead, err
+       case FLEX_MAGIC:
+               elen, totalBodyLen, framingElen := req.receiveFlexibleFramingHeader(hdrBytes)
+               bodyRead, err := req.populateFlexBody(r, totalBodyLen, elen, framingElen)
+               return n + bodyRead, err
+       default:
+               return n, fmt.Errorf("bad magic: 0x%02x", hdrBytes[0])
        }
-       return n, err
 }
index 2b4cfe13495eff5685cefc00d03a2fa961d02c04..f6be989847e7086b6b29bf58835fdc44fd0937a5 100644 (file)
@@ -153,6 +153,13 @@ func (res *MCResponse) Transmit(w io.Writer) (n int, err error) {
 
 // Receive will fill this MCResponse with the data from this reader.
 func (res *MCResponse) Receive(r io.Reader, hdrBytes []byte) (n int, err error) {
+       return res.ReceiveWithBuf(r, hdrBytes, nil)
+}
+
+// ReceiveWithBuf takes an optional pre-allocated []byte buf which
+// will be used if its capacity is large enough, otherwise a new
+// []byte slice is allocated.
+func (res *MCResponse) ReceiveWithBuf(r io.Reader, hdrBytes, buf []byte) (n int, err error) {
        if len(hdrBytes) < HDR_LEN {
                hdrBytes = []byte{
                        0, 0, 0, 0, 0, 0, 0, 0,
@@ -187,7 +194,13 @@ func (res *MCResponse) Receive(r io.Reader, hdrBytes []byte) (n int, err error)
                }
        }()
 
-       buf := make([]byte, klen+elen+bodyLen)
+       bufNeed := klen + elen + bodyLen
+       if buf != nil && cap(buf) >= bufNeed {
+               buf = buf[0:bufNeed]
+       } else {
+               buf = make([]byte, bufNeed)
+       }
+
        m, err := io.ReadFull(r, buf)
        if err == nil {
                res.Extras = buf[0:elen]
index a572e246e6351c1a052602fd78ba58d152172568..e06d2081865a766a8668acc12878f98b27fc9ea0 100644 (file)
-COUCHBASE INC. COMMUNITY EDITION LICENSE AGREEMENT
-
-IMPORTANT-READ CAREFULLY: BY CLICKING THE "I ACCEPT" BOX OR INSTALLING,
-DOWNLOADING OR OTHERWISE USING THIS SOFTWARE AND ANY ASSOCIATED
-DOCUMENTATION, YOU, ON BEHALF OF YOURSELF OR AS AN AUTHORIZED
-REPRESENTATIVE ON BEHALF OF AN ENTITY ("LICENSEE") AGREE TO ALL THE
-TERMS OF THIS COMMUNITY EDITION LICENSE AGREEMENT (THE "AGREEMENT")
-REGARDING YOUR USE OF THE SOFTWARE.  YOU REPRESENT AND WARRANT THAT YOU
-HAVE FULL LEGAL AUTHORITY TO BIND THE LICENSEE TO THIS AGREEMENT. IF YOU
-DO NOT AGREE WITH ALL OF THESE TERMS, DO NOT SELECT THE "I ACCEPT" BOX
-AND DO NOT INSTALL, DOWNLOAD OR OTHERWISE USE THE SOFTWARE. THE
-EFFECTIVE DATE OF THIS AGREEMENT IS THE DATE ON WHICH YOU CLICK "I
-ACCEPT" OR OTHERWISE INSTALL, DOWNLOAD OR USE THE SOFTWARE.
-
-1. License Grant. Couchbase Inc. hereby grants Licensee, free of charge,
-the non-exclusive right to use, copy, merge, publish, distribute,
-sublicense, and/or sell copies of the Software, and to permit persons to
-whom the Software is furnished to do so, subject to Licensee including
-the following copyright notice in all copies or substantial portions of
-the Software:
-
-Couchbase (r) http://www.Couchbase.com Copyright 2016 Couchbase, Inc.
-
-As used in this Agreement, "Software" means the object code version of
-the applicable elastic data management server software provided by
-Couchbase Inc.
-
-2. Restrictions. Licensee will not reverse engineer, disassemble, or
-decompile the Software (except to the extent such restrictions are
-prohibited by law).
-
-3. Support. Couchbase, Inc. will provide Licensee with access to, and
-use of, the Couchbase, Inc. support forum available at the following
-URL: http://www.couchbase.org/forums/. Couchbase, Inc. may, at its
-discretion, modify, suspend or terminate support at any time upon notice
-to Licensee.
-
-4. Warranty Disclaimer and Limitation of Liability. THE SOFTWARE IS
-PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
-INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
-FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
-COUCHBASE INC. OR THE AUTHORS OR COPYRIGHT HOLDERS IN THE SOFTWARE BE
-LIABLE FOR ANY CLAIM, DAMAGES (IINCLUDING, WITHOUT LIMITATION, DIRECT,
-INDIRECT OR CONSEQUENTIAL DAMAGES) OR OTHER LIABILITY, WHETHER IN AN
-ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
-CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-SOFTWARE.
+Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "{}"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright {yyyy} {name of copyright owner}
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
index 92657bb6b5fe8a8c56183c7110420bc7cb73116d..920dbf864047b724f43a3bd4b3cc19afdcbf6759 100644 (file)
@@ -99,10 +99,10 @@ github.com/boombuler/barcode/qr
 github.com/boombuler/barcode/utils
 # github.com/bradfitz/gomemcache v0.0.0-20190329173943-551aad21a668
 github.com/bradfitz/gomemcache/memcache
-# github.com/couchbase/gomemcached v0.0.0-20190515232915-c4b4ca0eb21d
+# github.com/couchbase/gomemcached v0.0.0-20191004160342-7b5da2ec40b2
 github.com/couchbase/gomemcached
 github.com/couchbase/gomemcached/client
-# github.com/couchbase/goutils v0.0.0-20190315194238-f9d42b11473b
+# github.com/couchbase/goutils v0.0.0-20191018232750-b49639060d85
 github.com/couchbase/goutils/logging
 github.com/couchbase/goutils/scramsha
 # github.com/couchbase/vellum v0.0.0-20190829182332-ef2e028c01fd