github.com/blevesearch/go-porterstemmer v1.0.2 // indirect
github.com/blevesearch/segment v0.0.0-20160915185041-762005e7a34f // indirect
github.com/boombuler/barcode v0.0.0-20161226211916-fe0f26ff6d26 // indirect
+ github.com/couchbase/gomemcached v0.0.0-20191004160342-7b5da2ec40b2 // indirect
+ github.com/couchbase/goutils v0.0.0-20191018232750-b49639060d85 // indirect
github.com/couchbase/vellum v0.0.0-20190829182332-ef2e028c01fd // indirect
github.com/cznic/b v0.0.0-20181122101859-a26611c4d92d // indirect
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/couchbase/gomemcached v0.0.0-20190515232915-c4b4ca0eb21d h1:XMf4E1U+b9E3ElF0mjvfXZdflBRZz4gLp16nQ/QSHQM=
github.com/couchbase/gomemcached v0.0.0-20190515232915-c4b4ca0eb21d/go.mod h1:srVSlQLB8iXBVXHgnqemxUXqN6FCvClgCMPCsjBDR7c=
+github.com/couchbase/gomemcached v0.0.0-20191004160342-7b5da2ec40b2 h1:vZryARwW4PSFXd9arwegEywvMTvPuXL3/oa+4L5NTe8=
+github.com/couchbase/gomemcached v0.0.0-20191004160342-7b5da2ec40b2/go.mod h1:srVSlQLB8iXBVXHgnqemxUXqN6FCvClgCMPCsjBDR7c=
github.com/couchbase/goutils v0.0.0-20190315194238-f9d42b11473b h1:bZ9rKU2/V8sY+NulSfxDOnXTWcs1rySqdF1sVepihvo=
github.com/couchbase/goutils v0.0.0-20190315194238-f9d42b11473b/go.mod h1:BQwMFlJzDjFDG3DJUdU0KORxn88UlsOULuxLExMh3Hs=
+github.com/couchbase/goutils v0.0.0-20191018232750-b49639060d85 h1:0WMIDtuXCKEm4wtAJgAAXa/qtM5O9MariLwgHaRlYmk=
+github.com/couchbase/goutils v0.0.0-20191018232750-b49639060d85/go.mod h1:BQwMFlJzDjFDG3DJUdU0KORxn88UlsOULuxLExMh3Hs=
github.com/couchbase/vellum v0.0.0-20190829182332-ef2e028c01fd h1:zeuJhcG3f8eePshH3KxkNE+Xtl53pVln9MOUPMyr/1w=
github.com/couchbase/vellum v0.0.0-20190829182332-ef2e028c01fd/go.mod h1:xbc8Ff/oG7h2ejd7AlwOpfd+6QZntc92ygpAOfGwcKY=
github.com/couchbaselabs/go-couchbase v0.0.0-20190708161019-23e7ca2ce2b7 h1:1XjEY/gnjQ+AfXef2U6dxCquhiRzkEpxZuWqs+QxTL8=
--- /dev/null
+package memcached
+
+import (
+ "encoding/json"
+ "fmt"
+)
+
+// Collection based filter
+type CollectionsFilter struct {
+ ManifestUid uint64
+ UseManifestUid bool
+ StreamId uint16
+ UseStreamId bool
+
+ // Use either ScopeId OR CollectionsList, not both
+ CollectionsList []uint32
+ ScopeId uint32
+}
+
+type nonStreamIdNonResumeScopeMeta struct {
+ ScopeId string `json:"scope"`
+}
+
+type nonStreamIdResumeScopeMeta struct {
+ ManifestId string `json:"uid"`
+}
+
+type nonStreamIdNonResumeCollectionsMeta struct {
+ CollectionsList []string `json:"collections"`
+}
+
+type nonStreamIdResumeCollectionsMeta struct {
+ ManifestId string `json:"uid"`
+ CollectionsList []string `json:"collections"`
+}
+
+type streamIdNonResumeCollectionsMeta struct {
+ CollectionsList []string `json:"collections"`
+ StreamId uint16 `json:"sid"`
+}
+
+type streamIdNonResumeScopeMeta struct {
+ ScopeId string `json:"scope"`
+ StreamId uint16 `json:"sid"`
+}
+
+func (c *CollectionsFilter) IsValid() error {
+ if c.UseManifestUid {
+ return fmt.Errorf("Not implemented yet")
+ }
+
+ if len(c.CollectionsList) > 0 && c.ScopeId > 0 {
+ return fmt.Errorf("Collection list is specified but scope ID is also specified")
+ }
+
+ return nil
+}
+
+func (c *CollectionsFilter) outputCollectionsFilterColList() (outputList []string) {
+ for _, collectionUint := range c.CollectionsList {
+ outputList = append(outputList, fmt.Sprintf("%x", collectionUint))
+ }
+ return
+}
+
+func (c *CollectionsFilter) outputScopeId() string {
+ return fmt.Sprintf("%x", c.ScopeId)
+}
+
+func (c *CollectionsFilter) ToStreamReqBody() ([]byte, error) {
+ if err := c.IsValid(); err != nil {
+ return nil, err
+ }
+
+ var output interface{}
+
+ switch c.UseStreamId {
+ case true:
+ switch c.UseManifestUid {
+ case true:
+ // TODO
+ return nil, fmt.Errorf("NotImplemented0")
+ case false:
+ switch len(c.CollectionsList) > 0 {
+ case true:
+ filter := &streamIdNonResumeCollectionsMeta{
+ StreamId: c.StreamId,
+ CollectionsList: c.outputCollectionsFilterColList(),
+ }
+ output = *filter
+ case false:
+ filter := &streamIdNonResumeScopeMeta{
+ StreamId: c.StreamId,
+ ScopeId: c.outputScopeId(),
+ }
+ output = *filter
+ }
+ }
+ case false:
+ switch c.UseManifestUid {
+ case true:
+ // TODO
+ return nil, fmt.Errorf("NotImplemented1")
+ case false:
+ switch len(c.CollectionsList) > 0 {
+ case true:
+ filter := &nonStreamIdNonResumeCollectionsMeta{
+ CollectionsList: c.outputCollectionsFilterColList(),
+ }
+ output = *filter
+ case false:
+ output = nonStreamIdNonResumeScopeMeta{ScopeId: c.outputScopeId()}
+ }
+ }
+ }
+
+ data, err := json.Marshal(output)
+ if err != nil {
+ return nil, err
+ } else {
+ return data, nil
+ }
+}
CASNext(vb uint16, k string, exp int, state *CASState) bool
CAS(vb uint16, k string, f CasFunc, initexp int) (*gomemcached.MCResponse, error)
CollectionsGetCID(scope string, collection string) (*gomemcached.MCResponse, error)
+ CollectionEnabled() bool
Close() error
Decr(vb uint16, key string, amt, def uint64, exp int) (uint64, error)
Del(vb uint16, key string) (*gomemcached.MCResponse, error)
EnableMutationToken() (*gomemcached.MCResponse, error)
+ EnableFeatures(features Features) (*gomemcached.MCResponse, error)
Get(vb uint16, key string) (*gomemcached.MCResponse, error)
GetCollectionsManifest() (*gomemcached.MCResponse, error)
GetFromCollection(vb uint16, cid uint32, key string) (*gomemcached.MCResponse, error)
type Features []Feature
type Feature uint16
-const FeatureMutationToken = Feature(0x04)
+const FeatureTcpNoDelay = Feature(0x03)
+const FeatureMutationToken = Feature(0x04) // XATTR bit in data type field with dcp mutations
const FeatureXattr = Feature(0x06)
+const FeatureXerror = Feature(0x07)
const FeatureCollections = Feature(0x12)
+const FeatureSnappyCompression = Feature(0x0a)
const FeatureDataType = Feature(0x0b)
type memcachedConnection interface {
opaque uint32
hdrBuf []byte
+
+ featureMtx sync.RWMutex
+ sentHeloFeatures Features
}
var (
binary.BigEndian.PutUint16(payload[len(payload)-2:], uint16(feature))
}
+ c.featureMtx.Lock()
+ c.sentHeloFeatures = features
+ c.featureMtx.Unlock()
+
return c.Send(&gomemcached.MCRequest{
Opcode: gomemcached.HELLO,
Key: []byte("GoMemcached"),
return res, nil
}
+func (c *Client) CollectionEnabled() bool {
+ c.featureMtx.RLock()
+ defer c.featureMtx.RUnlock()
+
+ for _, feature := range c.sentHeloFeatures {
+ if feature == FeatureCollections {
+ return true
+ }
+ }
+ return false
+}
+
// Get the value for a key, and update expiry
func (c *Client) GetAndTouch(vb uint16, key string, exp int) (*gomemcached.MCResponse, error) {
extraBuf := make([]byte, 4)
response.Status != gomemcached.SUBDOC_PATH_NOT_FOUND &&
response.Status != gomemcached.SUBDOC_MULTI_PATH_FAILURE_DELETED)
}
+
+func (c *Client) Conn() io.ReadWriteCloser {
+ return c.conn
+}
--- /dev/null
+package memcached
+
+import (
+ "encoding/binary"
+ "fmt"
+ "github.com/couchbase/gomemcached"
+ "math"
+)
+
+type SystemEventType int
+
+const InvalidSysEvent SystemEventType = -1
+
+const (
+ CollectionCreate SystemEventType = 0
+ CollectionDrop SystemEventType = iota
+ CollectionFlush SystemEventType = iota // KV did not implement
+ ScopeCreate SystemEventType = iota
+ ScopeDrop SystemEventType = iota
+ CollectionChanged SystemEventType = iota
+)
+
+type ScopeCreateEvent interface {
+ GetSystemEventName() (string, error)
+ GetScopeId() (uint32, error)
+ GetManifestId() (uint64, error)
+}
+
+type CollectionCreateEvent interface {
+ GetSystemEventName() (string, error)
+ GetScopeId() (uint32, error)
+ GetCollectionId() (uint32, error)
+ GetManifestId() (uint64, error)
+ GetMaxTTL() (uint32, error)
+}
+
+type CollectionDropEvent interface {
+ GetScopeId() (uint32, error)
+ GetCollectionId() (uint32, error)
+ GetManifestId() (uint64, error)
+}
+
+type ScopeDropEvent interface {
+ GetScopeId() (uint32, error)
+ GetManifestId() (uint64, error)
+}
+
+type CollectionChangedEvent interface {
+ GetCollectionId() (uint32, error)
+ GetManifestId() (uint64, error)
+ GetMaxTTL() (uint32, error)
+}
+
+var ErrorInvalidOp error = fmt.Errorf("Invalid Operation")
+var ErrorInvalidVersion error = fmt.Errorf("Invalid version for parsing")
+var ErrorValueTooShort error = fmt.Errorf("Value length is too short")
+var ErrorNoMaxTTL error = fmt.Errorf("This event has no max TTL")
+
+// UprEvent memcached events for UPR streams.
+type UprEvent struct {
+ Opcode gomemcached.CommandCode // Type of event
+ Status gomemcached.Status // Response status
+ VBucket uint16 // VBucket this event applies to
+ DataType uint8 // data type
+ Opaque uint16 // 16 MSB of opaque
+ VBuuid uint64 // This field is set by downstream
+ Flags uint32 // Item flags
+ Expiry uint32 // Item expiration time
+ Key, Value []byte // Item key/value
+ OldValue []byte // TODO: TBD: old document value
+ Cas uint64 // CAS value of the item
+ Seqno uint64 // sequence number of the mutation
+ RevSeqno uint64 // rev sequence number : deletions
+ LockTime uint32 // Lock time
+ MetadataSize uint16 // Metadata size
+ SnapstartSeq uint64 // start sequence number of this snapshot
+ SnapendSeq uint64 // End sequence number of the snapshot
+ SnapshotType uint32 // 0: disk 1: memory
+ FailoverLog *FailoverLog // Failover log containing vvuid and sequnce number
+ Error error // Error value in case of a failure
+ ExtMeta []byte // Extended Metadata
+ AckSize uint32 // The number of bytes that can be Acked to DCP
+ SystemEvent SystemEventType // Only valid if IsSystemEvent() is true
+ SysEventVersion uint8 // Based on the version, the way Extra bytes is parsed is different
+ ValueLen int // Cache it to avoid len() calls for performance
+ CollectionId uint64 // Valid if Collection is in use
+}
+
+// FailoverLog containing vvuid and sequnce number
+type FailoverLog [][2]uint64
+
+func makeUprEvent(rq gomemcached.MCRequest, stream *UprStream, bytesReceivedFromDCP int) *UprEvent {
+ event := &UprEvent{
+ Opcode: rq.Opcode,
+ VBucket: stream.Vbucket,
+ VBuuid: stream.Vbuuid,
+ Value: rq.Body,
+ Cas: rq.Cas,
+ ExtMeta: rq.ExtMeta,
+ DataType: rq.DataType,
+ ValueLen: len(rq.Body),
+ SystemEvent: InvalidSysEvent,
+ CollectionId: math.MaxUint64,
+ }
+
+ event.PopulateFieldsBasedOnStreamType(rq, stream.StreamType)
+
+ // set AckSize for events that need to be acked to DCP,
+ // i.e., events with CommandCodes that need to be buffered in DCP
+ if _, ok := gomemcached.BufferedCommandCodeMap[rq.Opcode]; ok {
+ event.AckSize = uint32(bytesReceivedFromDCP)
+ }
+
+ // 16 LSBits are used by client library to encode vbucket number.
+ // 16 MSBits are left for application to multiplex on opaque value.
+ event.Opaque = appOpaque(rq.Opaque)
+
+ if len(rq.Extras) >= uprMutationExtraLen &&
+ event.Opcode == gomemcached.UPR_MUTATION {
+
+ event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
+ event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
+ event.Flags = binary.BigEndian.Uint32(rq.Extras[16:20])
+ event.Expiry = binary.BigEndian.Uint32(rq.Extras[20:24])
+ event.LockTime = binary.BigEndian.Uint32(rq.Extras[24:28])
+ event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[28:30])
+
+ } else if len(rq.Extras) >= uprDeletetionWithDeletionTimeExtraLen &&
+ event.Opcode == gomemcached.UPR_DELETION {
+
+ event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
+ event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
+ event.Expiry = binary.BigEndian.Uint32(rq.Extras[16:20])
+
+ } else if len(rq.Extras) >= uprDeletetionExtraLen &&
+ event.Opcode == gomemcached.UPR_DELETION ||
+ event.Opcode == gomemcached.UPR_EXPIRATION {
+
+ event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
+ event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
+ event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[16:18])
+
+ } else if len(rq.Extras) >= uprSnapshotExtraLen &&
+ event.Opcode == gomemcached.UPR_SNAPSHOT {
+
+ event.SnapstartSeq = binary.BigEndian.Uint64(rq.Extras[:8])
+ event.SnapendSeq = binary.BigEndian.Uint64(rq.Extras[8:16])
+ event.SnapshotType = binary.BigEndian.Uint32(rq.Extras[16:20])
+ } else if event.IsSystemEvent() {
+ event.PopulateEvent(rq.Extras)
+ }
+
+ return event
+}
+
+func (event *UprEvent) PopulateFieldsBasedOnStreamType(rq gomemcached.MCRequest, streamType DcpStreamType) {
+ switch streamType {
+ case CollectionsNonStreamId:
+ switch rq.Opcode {
+ // Only these will have CID encoded within the key
+ case gomemcached.UPR_MUTATION,
+ gomemcached.UPR_DELETION,
+ gomemcached.UPR_EXPIRATION:
+ uleb128 := Uleb128(rq.Key)
+ result, bytesShifted := uleb128.ToUint64(rq.Keylen)
+ event.CollectionId = result
+ event.Key = rq.Key[bytesShifted:]
+ default:
+ event.Key = rq.Key
+ }
+ case CollectionsStreamId:
+ // TODO - not implemented
+ fallthrough
+ case NonCollectionStream:
+ // Let default behavior be legacy stream type
+ fallthrough
+ default:
+ event.Key = rq.Key
+ }
+}
+
+func (event *UprEvent) String() string {
+ name := gomemcached.CommandNames[event.Opcode]
+ if name == "" {
+ name = fmt.Sprintf("#%d", event.Opcode)
+ }
+ return name
+}
+
+func (event *UprEvent) IsSnappyDataType() bool {
+ return event.Opcode == gomemcached.UPR_MUTATION && (event.DataType&SnappyDataType > 0)
+}
+
+func (event *UprEvent) IsCollectionType() bool {
+ return event.IsSystemEvent() || event.CollectionId <= math.MaxUint32
+}
+
+func (event *UprEvent) IsSystemEvent() bool {
+ return event.Opcode == gomemcached.DCP_SYSTEM_EVENT
+}
+
+func (event *UprEvent) PopulateEvent(extras []byte) {
+ if len(extras) < dcpSystemEventExtraLen {
+ // Wrong length, don't parse
+ return
+ }
+ event.Seqno = binary.BigEndian.Uint64(extras[:8])
+ event.SystemEvent = SystemEventType(binary.BigEndian.Uint32(extras[8:12]))
+ var versionTemp uint16 = binary.BigEndian.Uint16(extras[12:14])
+ event.SysEventVersion = uint8(versionTemp >> 8)
+}
+
+func (event *UprEvent) GetSystemEventName() (string, error) {
+ switch event.SystemEvent {
+ case CollectionCreate:
+ fallthrough
+ case ScopeCreate:
+ return string(event.Key), nil
+ default:
+ return "", ErrorInvalidOp
+ }
+}
+
+func (event *UprEvent) GetManifestId() (uint64, error) {
+ switch event.SystemEvent {
+ // Version 0 only checks
+ case CollectionChanged:
+ fallthrough
+ case ScopeDrop:
+ fallthrough
+ case ScopeCreate:
+ fallthrough
+ case CollectionDrop:
+ if event.SysEventVersion > 0 {
+ return 0, ErrorInvalidVersion
+ }
+ fallthrough
+ case CollectionCreate:
+ // CollectionCreate supports version 1
+ if event.SysEventVersion > 1 {
+ return 0, ErrorInvalidVersion
+ }
+ if event.ValueLen < 8 {
+ return 0, ErrorValueTooShort
+ }
+ return binary.BigEndian.Uint64(event.Value[0:8]), nil
+ default:
+ return 0, ErrorInvalidOp
+ }
+}
+
+func (event *UprEvent) GetCollectionId() (uint32, error) {
+ switch event.SystemEvent {
+ case CollectionDrop:
+ if event.SysEventVersion > 0 {
+ return 0, ErrorInvalidVersion
+ }
+ fallthrough
+ case CollectionCreate:
+ if event.SysEventVersion > 1 {
+ return 0, ErrorInvalidVersion
+ }
+ if event.ValueLen < 16 {
+ return 0, ErrorValueTooShort
+ }
+ return binary.BigEndian.Uint32(event.Value[12:16]), nil
+ case CollectionChanged:
+ if event.SysEventVersion > 0 {
+ return 0, ErrorInvalidVersion
+ }
+ if event.ValueLen < 12 {
+ return 0, ErrorValueTooShort
+ }
+ return binary.BigEndian.Uint32(event.Value[8:12]), nil
+ default:
+ return 0, ErrorInvalidOp
+ }
+}
+
+func (event *UprEvent) GetScopeId() (uint32, error) {
+ switch event.SystemEvent {
+ // version 0 checks
+ case ScopeCreate:
+ fallthrough
+ case ScopeDrop:
+ fallthrough
+ case CollectionDrop:
+ if event.SysEventVersion > 0 {
+ return 0, ErrorInvalidVersion
+ }
+ fallthrough
+ case CollectionCreate:
+ // CollectionCreate could be either 0 or 1
+ if event.SysEventVersion > 1 {
+ return 0, ErrorInvalidVersion
+ }
+ if event.ValueLen < 12 {
+ return 0, ErrorValueTooShort
+ }
+ return binary.BigEndian.Uint32(event.Value[8:12]), nil
+ default:
+ return 0, ErrorInvalidOp
+ }
+}
+
+func (event *UprEvent) GetMaxTTL() (uint32, error) {
+ switch event.SystemEvent {
+ case CollectionCreate:
+ if event.SysEventVersion < 1 {
+ return 0, ErrorNoMaxTTL
+ }
+ if event.ValueLen < 20 {
+ return 0, ErrorValueTooShort
+ }
+ return binary.BigEndian.Uint32(event.Value[16:20]), nil
+ case CollectionChanged:
+ if event.SysEventVersion > 0 {
+ return 0, ErrorInvalidVersion
+ }
+ if event.ValueLen < 16 {
+ return 0, ErrorValueTooShort
+ }
+ return binary.BigEndian.Uint32(event.Value[12:16]), nil
+ default:
+ return 0, ErrorInvalidOp
+ }
+}
+
+type Uleb128 []byte
+
+func (u Uleb128) ToUint64(cachedLen int) (result uint64, bytesShifted int) {
+ var shift uint = 0
+
+ for curByte := 0; curByte < cachedLen; curByte++ {
+ oneByte := u[curByte]
+ last7Bits := 0x7f & oneByte
+ result |= uint64(last7Bits) << shift
+ bytesShifted++
+ if oneByte&0x80 == 0 {
+ break
+ }
+ shift += 7
+ }
+
+ return
+}
const uprDeletetionExtraLen = 18
const uprDeletetionWithDeletionTimeExtraLen = 21
const uprSnapshotExtraLen = 20
+const dcpSystemEventExtraLen = 13
const bufferAckThreshold = 0.2
const opaqueOpen = 0xBEAF0001
const opaqueFailover = 0xDEADBEEF
// Counter on top of opaqueOpen that others can draw from for open and control msgs
var opaqueOpenCtrlWell uint32 = opaqueOpen
-// UprEvent memcached events for UPR streams.
-type UprEvent struct {
- Opcode gomemcached.CommandCode // Type of event
- Status gomemcached.Status // Response status
- VBucket uint16 // VBucket this event applies to
- DataType uint8 // data type
- Opaque uint16 // 16 MSB of opaque
- VBuuid uint64 // This field is set by downstream
- Flags uint32 // Item flags
- Expiry uint32 // Item expiration time
- Key, Value []byte // Item key/value
- OldValue []byte // TODO: TBD: old document value
- Cas uint64 // CAS value of the item
- Seqno uint64 // sequence number of the mutation
- RevSeqno uint64 // rev sequence number : deletions
- LockTime uint32 // Lock time
- MetadataSize uint16 // Metadata size
- SnapstartSeq uint64 // start sequence number of this snapshot
- SnapendSeq uint64 // End sequence number of the snapshot
- SnapshotType uint32 // 0: disk 1: memory
- FailoverLog *FailoverLog // Failover log containing vvuid and sequnce number
- Error error // Error value in case of a failure
- ExtMeta []byte
- AckSize uint32 // The number of bytes that can be Acked to DCP
-}
-
type PriorityType string
// high > medium > disabled > low
PriorityHigh PriorityType = "high"
)
+type DcpStreamType int32
+
+var UninitializedStream DcpStreamType = -1
+
+const (
+ NonCollectionStream DcpStreamType = 0
+ CollectionsNonStreamId DcpStreamType = iota
+ CollectionsStreamId DcpStreamType = iota
+)
+
+func (t DcpStreamType) String() string {
+ switch t {
+ case UninitializedStream:
+ return "Un-Initialized Stream"
+ case NonCollectionStream:
+ return "Traditional Non-Collection Stream"
+ case CollectionsNonStreamId:
+ return "Collections Stream without StreamID"
+ case CollectionsStreamId:
+ return "Collection Stream with StreamID"
+ default:
+ return "Unknown Stream Type"
+ }
+}
+
// UprStream is per stream data structure over an UPR Connection.
type UprStream struct {
- Vbucket uint16 // Vbucket id
- Vbuuid uint64 // vbucket uuid
- StartSeq uint64 // start sequence number
- EndSeq uint64 // end sequence number
- connected bool
+ Vbucket uint16 // Vbucket id
+ Vbuuid uint64 // vbucket uuid
+ StartSeq uint64 // start sequence number
+ EndSeq uint64 // end sequence number
+ connected bool
+ StreamType DcpStreamType
}
type FeedState int
IncludeDeletionTime bool
DcpPriority PriorityType
EnableExpiry bool
+ EnableStreamId bool
}
/**
// if flag is true, upr feed will use ack from client to determine whether/when to send ack to DCP
// if flag is false, upr feed will track how many bytes it has sent to client
// and use that to determine whether/when to send ack to DCP
- ackByClient bool
- feedState FeedState
- muFeedState sync.RWMutex
+ ackByClient bool
+ feedState FeedState
+ muFeedState sync.RWMutex
+ activatedFeatures UprFeatures
+ collectionEnabled bool // This is needed separately because parsing depends on this
+ // DCP StreamID allows multiple filtered collection streams to share a single DCP Stream
+ // It is not allowed once a regular/legacy stream was started originally
+ streamsType DcpStreamType
+ initStreamTypeOnce sync.Once
}
// Exported interface - to allow for mocking
UprRequestStream(vbno, opaqueMSB uint16, flags uint32, vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error
// Set DCP priority on an existing DCP connection. The command is sent asynchronously without waiting for a response
SetPriorityAsync(p PriorityType) error
+
+ // Various Collection-Type RequestStreams
+ UprRequestCollectionsStream(vbno, opaqueMSB uint16, flags uint32, vbuuid, startSeq, endSeq, snapStart, snapEnd uint64, filter *CollectionsFilter) error
}
type UprStats struct {
TotalSnapShot uint64
}
-// FailoverLog containing vvuid and sequnce number
-type FailoverLog [][2]uint64
-
// error codes
var ErrorInvalidLog = errors.New("couchbase.errorInvalidLog")
return vbuuid, seqno, ErrorInvalidLog
}
-func makeUprEvent(rq gomemcached.MCRequest, stream *UprStream, bytesReceivedFromDCP int) *UprEvent {
- event := &UprEvent{
- Opcode: rq.Opcode,
- VBucket: stream.Vbucket,
- VBuuid: stream.Vbuuid,
- Key: rq.Key,
- Value: rq.Body,
- Cas: rq.Cas,
- ExtMeta: rq.ExtMeta,
- DataType: rq.DataType,
- }
-
- // set AckSize for events that need to be acked to DCP,
- // i.e., events with CommandCodes that need to be buffered in DCP
- if _, ok := gomemcached.BufferedCommandCodeMap[rq.Opcode]; ok {
- event.AckSize = uint32(bytesReceivedFromDCP)
- }
-
- // 16 LSBits are used by client library to encode vbucket number.
- // 16 MSBits are left for application to multiplex on opaque value.
- event.Opaque = appOpaque(rq.Opaque)
-
- if len(rq.Extras) >= uprMutationExtraLen &&
- event.Opcode == gomemcached.UPR_MUTATION {
-
- event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
- event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
- event.Flags = binary.BigEndian.Uint32(rq.Extras[16:20])
- event.Expiry = binary.BigEndian.Uint32(rq.Extras[20:24])
- event.LockTime = binary.BigEndian.Uint32(rq.Extras[24:28])
- event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[28:30])
-
- } else if len(rq.Extras) >= uprDeletetionWithDeletionTimeExtraLen &&
- event.Opcode == gomemcached.UPR_DELETION {
-
- event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
- event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
- event.Expiry = binary.BigEndian.Uint32(rq.Extras[16:20])
-
- } else if len(rq.Extras) >= uprDeletetionExtraLen &&
- event.Opcode == gomemcached.UPR_DELETION ||
- event.Opcode == gomemcached.UPR_EXPIRATION {
-
- event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
- event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
- event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[16:18])
-
- } else if len(rq.Extras) >= uprSnapshotExtraLen &&
- event.Opcode == gomemcached.UPR_SNAPSHOT {
-
- event.SnapstartSeq = binary.BigEndian.Uint64(rq.Extras[:8])
- event.SnapendSeq = binary.BigEndian.Uint64(rq.Extras[8:16])
- event.SnapshotType = binary.BigEndian.Uint32(rq.Extras[16:20])
- }
-
- return event
-}
-
-func (event *UprEvent) String() string {
- name := gomemcached.CommandNames[event.Opcode]
- if name == "" {
- name = fmt.Sprintf("#%d", event.Opcode)
- }
- return name
-}
-
-func (event *UprEvent) IsSnappyDataType() bool {
- return event.Opcode == gomemcached.UPR_MUTATION && (event.DataType&SnappyDataType > 0)
-}
-
func (feed *UprFeed) sendCommands(mc *Client) {
transmitCh := feed.transmitCh
transmitCl := feed.transmitCl
feed.muVbstreams.Lock()
defer feed.muVbstreams.Unlock()
+ if feed.collectionEnabled {
+ stream.StreamType = feed.streamsType
+ }
+
// Set this stream as the officially connected stream for this vb
stream.connected = true
feed.vbstreams[vbno] = stream
}
func (mc *Client) NewUprFeedWithConfig(ackByClient bool) (*UprFeed, error) {
-
feed := &UprFeed{
- conn: mc,
- closer: make(chan bool, 1),
- vbstreams: make(map[uint16]*UprStream),
- transmitCh: make(chan *gomemcached.MCRequest),
- transmitCl: make(chan bool),
- ackByClient: ackByClient,
+ conn: mc,
+ closer: make(chan bool, 1),
+ vbstreams: make(map[uint16]*UprStream),
+ transmitCh: make(chan *gomemcached.MCRequest),
+ transmitCl: make(chan bool),
+ ackByClient: ackByClient,
+ collectionEnabled: mc.CollectionEnabled(),
+ streamsType: UninitializedStream,
}
feed.negotiator.initialize()
activatedFeatures.EnableExpiry = true
}
+ if features.EnableStreamId {
+ rq := &gomemcached.MCRequest{
+ Opcode: gomemcached.UPR_CONTROL,
+ Key: []byte("enable_stream_id"),
+ Body: []byte("true"),
+ Opaque: getUprOpenCtrlOpaque(),
+ }
+ err = sendMcRequestSync(feed.conn, rq)
+ if err != nil {
+ return
+ }
+ activatedFeatures.EnableStreamId = true
+ }
+
// everything is ok so far, set upr feed to open state
+ feed.activatedFeatures = activatedFeatures
feed.setOpen()
return
}
func (feed *UprFeed) UprRequestStream(vbno, opaqueMSB uint16, flags uint32,
vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error {
+ return feed.UprRequestCollectionsStream(vbno, opaqueMSB, flags, vuuid, startSequence, endSequence, snapStart, snapEnd, nil)
+}
+
+func (feed *UprFeed) initStreamType(filter *CollectionsFilter) (err error) {
+ if filter != nil && filter.UseStreamId && !feed.activatedFeatures.EnableStreamId {
+ err = fmt.Errorf("Cannot use streamID based filter if the feed was not started with the streamID feature")
+ return
+ }
+
+ streamInitFunc := func() {
+ if feed.streamsType != UninitializedStream {
+ // Shouldn't happen
+ err = fmt.Errorf("The current feed has already been started in %v mode", feed.streamsType.String())
+ } else {
+ if !feed.collectionEnabled {
+ feed.streamsType = NonCollectionStream
+ } else {
+ if filter != nil && filter.UseStreamId {
+ feed.streamsType = CollectionsStreamId
+ } else {
+ feed.streamsType = CollectionsNonStreamId
+ }
+ }
+ }
+ }
+ feed.initStreamTypeOnce.Do(streamInitFunc)
+ return
+}
+
+func (feed *UprFeed) UprRequestCollectionsStream(vbno, opaqueMSB uint16, flags uint32,
+ vbuuid, startSequence, endSequence, snapStart, snapEnd uint64, filter *CollectionsFilter) error {
+
+ err := feed.initStreamType(filter)
+ if err != nil {
+ return err
+ }
+
+ var mcRequestBody []byte
+ if filter != nil {
+ err = filter.IsValid()
+ if err != nil {
+ return err
+ }
+ mcRequestBody, err = filter.ToStreamReqBody()
+ if err != nil {
+ return err
+ }
+ }
+
rq := &gomemcached.MCRequest{
Opcode: gomemcached.UPR_STREAMREQ,
VBucket: vbno,
Opaque: composeOpaque(vbno, opaqueMSB),
+ Body: mcRequestBody,
}
rq.Extras = make([]byte, 48) // #Extras
binary.BigEndian.PutUint32(rq.Extras[4:8], uint32(0))
binary.BigEndian.PutUint64(rq.Extras[8:16], startSequence)
binary.BigEndian.PutUint64(rq.Extras[16:24], endSequence)
- binary.BigEndian.PutUint64(rq.Extras[24:32], vuuid)
+ binary.BigEndian.PutUint64(rq.Extras[24:32], vbuuid)
binary.BigEndian.PutUint64(rq.Extras[32:40], snapStart)
binary.BigEndian.PutUint64(rq.Extras[40:48], snapEnd)
- feed.negotiator.registerRequest(vbno, opaqueMSB, vuuid, startSequence, endSequence)
+ feed.negotiator.registerRequest(vbno, opaqueMSB, vbuuid, startSequence, endSequence)
// Any client that has ever called this method, regardless of return code,
// should expect a potential UPR_CLOSESTREAM message due to this new map entry prior to Transmit.
- if err := feed.conn.Transmit(rq); err != nil {
+ if err = feed.conn.Transmit(rq); err != nil {
logging.Errorf("Error in StreamRequest %s", err.Error())
// If an error occurs during transmit, then the UPRFeed will keep the stream
// in the vbstreams map. This is to prevent nil lookup from any previously
if err := feed.conn.TransmitResponse(noop); err != nil {
logging.Warnf("failed to transmit command %s. Error %s", noop.Opcode.String(), err.Error())
}
+ case gomemcached.DCP_SYSTEM_EVENT:
+ if stream == nil {
+ logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
+ break loop
+ }
+ event = makeUprEvent(pkt, stream, bytes)
default:
logging.Infof("Recived an unknown response for vbucket %d", vb)
}
--- /dev/null
+package gomemcached
+
+import (
+ "encoding/binary"
+ "fmt"
+)
+
+type FrameObjType int
+
+const (
+ FrameBarrier FrameObjType = iota
+ FrameDurability FrameObjType = iota
+ FrameDcpStreamId FrameObjType = iota
+ FrameOpenTracing FrameObjType = iota
+)
+
+type FrameInfo struct {
+ ObjId FrameObjType
+ ObjLen int
+ ObjData []byte
+}
+
+var ErrorInvalidOp error = fmt.Errorf("Specified method is not applicable")
+var ErrorObjLenNotMatch error = fmt.Errorf("Object length does not match data")
+
+func (f *FrameInfo) Validate() error {
+ switch f.ObjId {
+ case FrameBarrier:
+ if f.ObjLen != 0 {
+ return fmt.Errorf("Invalid FrameBarrier - length is %v\n", f.ObjLen)
+ } else if f.ObjLen != len(f.ObjData) {
+ return ErrorObjLenNotMatch
+ }
+ case FrameDurability:
+ if f.ObjLen != 1 && f.ObjLen != 3 {
+ return fmt.Errorf("Invalid FrameDurability - length is %v\n", f.ObjLen)
+ } else if f.ObjLen != len(f.ObjData) {
+ return ErrorObjLenNotMatch
+ }
+ case FrameDcpStreamId:
+ if f.ObjLen != 2 {
+ return fmt.Errorf("Invalid FrameDcpStreamId - length is %v\n", f.ObjLen)
+ } else if f.ObjLen != len(f.ObjData) {
+ return ErrorObjLenNotMatch
+ }
+ case FrameOpenTracing:
+ if f.ObjLen == 0 {
+ return fmt.Errorf("Invalid FrameOpenTracing - length must be > 0")
+ } else if f.ObjLen != len(f.ObjData) {
+ return ErrorObjLenNotMatch
+ }
+ default:
+ return fmt.Errorf("Unknown FrameInfo type")
+ }
+ return nil
+}
+
+func (f *FrameInfo) GetStreamId() (uint16, error) {
+ if f.ObjId != FrameDcpStreamId {
+ return 0, ErrorInvalidOp
+ }
+
+ var output uint16
+ output = uint16(f.ObjData[0])
+ output = output << 8
+ output |= uint16(f.ObjData[1])
+ return output, nil
+}
+
+type DurabilityLvl uint8
+
+const (
+ DuraInvalid DurabilityLvl = iota // Not used (0x0)
+ DuraMajority DurabilityLvl = iota // (0x01)
+ DuraMajorityAndPersistOnMaster DurabilityLvl = iota // (0x02)
+ DuraPersistToMajority DurabilityLvl = iota // (0x03)
+)
+
+func (f *FrameInfo) GetDurabilityRequirements() (lvl DurabilityLvl, timeoutProvided bool, timeoutMs uint16, err error) {
+ if f.ObjId != FrameDurability {
+ err = ErrorInvalidOp
+ return
+ }
+ if f.ObjLen != 1 && f.ObjLen != 3 {
+ err = ErrorObjLenNotMatch
+ return
+ }
+
+ lvl = DurabilityLvl(uint8(f.ObjData[0]))
+
+ if f.ObjLen == 3 {
+ timeoutProvided = true
+ timeoutMs = binary.BigEndian.Uint16(f.ObjData[1:2])
+ }
+
+ return
+}
+
+func incrementMarker(bitsToBeIncremented, byteIncrementCnt *int, framingElen, curObjIdx int) (int, error) {
+ for *bitsToBeIncremented >= 8 {
+ *byteIncrementCnt++
+ *bitsToBeIncremented -= 8
+ }
+ marker := curObjIdx + *byteIncrementCnt
+ if marker > framingElen {
+ return -1, fmt.Errorf("Out of bounds")
+ }
+ return marker, nil
+}
+
+// Right now, halfByteRemaining will always be false, because ObjID and Len haven't gotten that large yet
+func (f *FrameInfo) Bytes() (output []byte, halfByteRemaining bool) {
+ // ObjIdentifier - 4 bits + ObjLength - 4 bits
+ var idAndLen uint8
+ idAndLen |= uint8(f.ObjId) << 4
+ idAndLen |= uint8(f.ObjLen)
+ output = append(output, byte(idAndLen))
+
+ // Rest is Data
+ output = append(output, f.ObjData...)
+ return
+}
+
+func parseFrameInfoObjects(buf []byte, framingElen int) (objs []FrameInfo, err error, halfByteRemaining bool) {
+ var curObjIdx int
+ var byteIncrementCnt int
+ var bitsToBeIncremented int
+ var marker int
+
+ // Parse frameInfo objects
+ for curObjIdx = 0; curObjIdx < framingElen; curObjIdx += byteIncrementCnt {
+ byteIncrementCnt = 0
+ var oneFrameObj FrameInfo
+
+ // First get the objId
+ // -------------------------
+ var objId int
+ var objHeader uint8 = buf[curObjIdx]
+ var objIdentifierRaw uint8
+ if bitsToBeIncremented == 0 {
+ // ObjHeader
+ // 0 1 2 3 4 5 6 7
+ // ^-----^
+ // ObjIdentifierRaw
+ objIdentifierRaw = (objHeader & 0xf0) >> 4
+ } else {
+ // ObjHeader
+ // 0 1 2 3 4 5 6 7
+ // ^-----^
+ // ObjIdentifierRaw
+ objIdentifierRaw = (objHeader & 0x0f)
+ }
+ bitsToBeIncremented += 4
+
+ marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx)
+ if err != nil {
+ return
+ }
+
+ // Value is 0-14
+ objId = int(objIdentifierRaw & 0xe)
+ // If bit 15 is set, ID is 15 + value of next byte
+ if objIdentifierRaw&0x1 > 0 {
+ if bitsToBeIncremented > 0 {
+ // ObjHeader
+ // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
+ // ^-----^ ^---------------^
+ // ObjId1 Extension
+ // ^ marker
+ buffer := uint16(buf[marker])
+ buffer = buffer << 8
+ buffer |= uint16(buf[marker+1])
+ var extension uint8 = uint8(buffer & 0xff0 >> 4)
+ objId += int(extension)
+ } else {
+ // ObjHeader
+ // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
+ // ^-----^ ^-------------------^
+ // ObjId1 extension
+ // ^ marker
+ var extension uint8 = uint8(buf[marker])
+ objId += int(extension)
+ }
+ bitsToBeIncremented += 8
+ }
+
+ marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx)
+ if err != nil {
+ return
+ }
+ oneFrameObj.ObjId = FrameObjType(objId)
+
+ // Then get the obj length
+ // -------------------------
+ var objLenRaw uint8
+ var objLen int
+ if bitsToBeIncremented > 0 {
+ // ObjHeader
+ // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
+ // ^ ^---------^
+ // marker objLen
+ objLenRaw = uint8(buf[marker]) & 0x0f
+ } else {
+ // ObjHeader
+ // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
+ // ^--------^
+ // objLen
+ // ^ marker
+ objLenRaw = uint8(buf[marker]) & 0xf0 >> 4
+ }
+ bitsToBeIncremented += 4
+
+ marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx)
+ if err != nil {
+ return
+ }
+
+ // Length is 0-14
+ objLen = int(objLenRaw & 0xe)
+ // If bit 15 is set, lenghth is 15 + value of next byte
+ if objLenRaw&0x1 > 0 {
+ if bitsToBeIncremented == 0 {
+ // ObjHeader
+ // 12 13 14 15 16 17 18 19 20 21 22 23
+ // ^---------^ ^--------------------^
+ // objLen extension
+ // ^ marker
+ var extension uint8 = uint8(buf[marker])
+ objLen += int(extension)
+ } else {
+ // ObjHeader
+ // 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
+ // ^--------^ ^---------------------^
+ // objLen extension
+ // ^ marker var buffer uint16
+ buffer := uint16(buf[marker])
+ buffer = buffer << 8
+ buffer |= uint16(buf[marker+1])
+ var extension uint8 = uint8(buffer & 0xff0 >> 4)
+ objLen += int(extension)
+ }
+ bitsToBeIncremented += 8
+ }
+
+ marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx)
+ if err != nil {
+ return
+ }
+ oneFrameObj.ObjLen = objLen
+
+ // The rest is N-bytes of data based on the length
+ if bitsToBeIncremented == 0 {
+ // No weird alignment needed
+ oneFrameObj.ObjData = buf[marker : marker+objLen]
+ } else {
+ // 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
+ // ^--------^ ^---------------------^ ^--------->
+ // objLen extension data
+ // ^ marker
+ oneFrameObj.ObjData = ShiftByteSliceLeft4Bits(buf[marker : marker+objLen+1])
+ }
+ err = oneFrameObj.Validate()
+ if err != nil {
+ return
+ }
+ objs = append(objs, oneFrameObj)
+
+ bitsToBeIncremented += 8 * objLen
+ marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx)
+ }
+
+ if bitsToBeIncremented > 0 {
+ halfByteRemaining = true
+ }
+ return
+}
+
+func ShiftByteSliceLeft4Bits(slice []byte) (replacement []byte) {
+ var buffer uint16
+ var i int
+ sliceLen := len(slice)
+
+ if sliceLen < 2 {
+ // Let's not shift less than 16 bits
+ return
+ }
+
+ replacement = make([]byte, sliceLen, cap(slice))
+
+ for i = 0; i < sliceLen-1; i++ {
+ // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
+ // ^-----^ ^---------------^ ^-----------
+ // garbage data byte 0 data byte 1
+ buffer = uint16(slice[i])
+ buffer = buffer << 8
+ buffer |= uint16(slice[i+1])
+ replacement[i] = uint8(buffer & 0xff0 >> 4)
+ }
+
+ if i < sliceLen {
+ lastByte := slice[sliceLen-1]
+ lastByte = lastByte << 4
+ replacement[i] = lastByte
+ }
+ return
+}
+
+// The following is used to theoretically support frameInfo ObjID extensions
+// for completeness, but they are not very efficient though
+func ShiftByteSliceRight4Bits(slice []byte) (replacement []byte) {
+ var buffer uint16
+ var i int
+ var leftovers uint8 // 4 bits only
+ var replacementUnit uint16
+ var first bool = true
+ var firstLeftovers uint8
+ var lastLeftovers uint8
+ sliceLen := len(slice)
+
+ if sliceLen < 2 {
+ // Let's not shift less than 16 bits
+ return
+ }
+
+ if slice[sliceLen-1]&0xf == 0 {
+ replacement = make([]byte, sliceLen, cap(slice))
+ } else {
+ replacement = make([]byte, sliceLen+1, cap(slice)+1)
+ }
+
+ for i = 0; i < sliceLen-1; i++ {
+ buffer = binary.BigEndian.Uint16(slice[i : i+2])
+ // (buffer)
+ // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
+ // ^-------------^ ^-------------------^
+ // data byte 0 data byte 1
+ //
+ // into
+ //
+ // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
+ // ^-----^ ^---------------^ ^--------------------^ ^----------^
+ // zeroes data byte 0 data byte 1 zeroes
+
+ if first {
+ // The leftover OR'ing will overwrite the first 4 bits of data byte 0. Save them
+ firstLeftovers = uint8(buffer & 0xf000 >> 12)
+ first = false
+ }
+ replacementUnit = 0
+ replacementUnit |= uint16(leftovers) << 12
+ replacementUnit |= (buffer & 0xff00) >> 4 // data byte 0
+ replacementUnit |= buffer & 0xff >> 4 // data byte 1 first 4 bits
+ lastLeftovers = uint8(buffer&0xf) << 4
+
+ replacement[i+1] = byte(replacementUnit)
+
+ leftovers = uint8((buffer & 0x000f) << 4)
+ }
+
+ replacement[0] = byte(uint8(replacement[0]) | firstLeftovers)
+ if lastLeftovers > 0 {
+ replacement[sliceLen] = byte(lastLeftovers)
+ }
+ return
+}
+
+func Merge2HalfByteSlices(src1, src2 []byte) (output []byte) {
+ src1Len := len(src1)
+ src2Len := len(src2)
+ output = make([]byte, src1Len+src2Len-1)
+
+ var mergeByte uint8 = src1[src1Len-1]
+ mergeByte |= uint8(src2[0])
+
+ copy(output, src1)
+ copy(output[src1Len:], src2[1:])
+
+ output[src1Len-1] = byte(mergeByte)
+
+ return
+}
)
const (
- REQ_MAGIC = 0x80
- RES_MAGIC = 0x81
+ REQ_MAGIC = 0x80
+ RES_MAGIC = 0x81
+ FLEX_MAGIC = 0x08
+ FLEX_RES_MAGIC = 0x18
)
// CommandCode for memcached packets.
SUBDOC_GET = CommandCode(0xc5) // Get subdoc. Returns with xattrs
SUBDOC_MULTI_LOOKUP = CommandCode(0xd0) // Multi lookup. Doc xattrs and meta.
+ DCP_SYSTEM_EVENT = CommandCode(0x5f) // A system event has occurred
+
)
// command codes that are counted toward DCP control buffer
Extras, Key, Body, ExtMeta []byte
// Datatype identifier
DataType uint8
+ // len() calls are expensive - cache this in case for collection
+ Keylen int
+ // Flexible Framing Extras
+ FramingExtras []FrameInfo
+ // Stored length of incoming framing extras
+ FramingElen int
}
// Size gives the number of bytes this request requires.
func (req *MCRequest) Size() int {
- return HDR_LEN + len(req.Extras) + len(req.Key) + len(req.Body) + len(req.ExtMeta)
+ return HDR_LEN + len(req.Extras) + len(req.Key) + len(req.Body) + len(req.ExtMeta) + req.FramingElen
}
// A debugging string representation of this request
req.Opcode, len(req.Body), req.Key)
}
-func (req *MCRequest) fillHeaderBytes(data []byte) int {
+func (req *MCRequest) fillRegularHeaderBytes(data []byte) int {
+ // Byte/ 0 | 1 | 2 | 3 |
+ // / | | | |
+ // |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
+ // +---------------+---------------+---------------+---------------+
+ // 0| Magic | Opcode | Key length |
+ // +---------------+---------------+---------------+---------------+
+ // 4| Extras length | Data type | vbucket id |
+ // +---------------+---------------+---------------+---------------+
+ // 8| Total body length |
+ // +---------------+---------------+---------------+---------------+
+ // 12| Opaque |
+ // +---------------+---------------+---------------+---------------+
+ // 16| CAS |
+ // | |
+ // +---------------+---------------+---------------+---------------+
+ // Total 24 bytes
pos := 0
data[pos] = REQ_MAGIC
copy(data[pos:pos+len(req.Key)], req.Key)
pos += len(req.Key)
}
-
return pos
}
+// Returns pos and if trailing by half byte
+func (req *MCRequest) fillFlexHeaderBytes(data []byte) (int, bool) {
+
+ // Byte/ 0 | 1 | 2 | 3 |
+ // / | | | |
+ // |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
+ // +---------------+---------------+---------------+---------------+
+ // 0| Magic (0x08) | Opcode | Framing extras| Key Length |
+ // +---------------+---------------+---------------+---------------+
+ // 4| Extras length | Data type | vbucket id |
+ // +---------------+---------------+---------------+---------------+
+ // 8| Total body length |
+ // +---------------+---------------+---------------+---------------+
+ // 12| Opaque |
+ // +---------------+---------------+---------------+---------------+
+ // 16| CAS |
+ // | |
+ // +---------------+---------------+---------------+---------------+
+ // Total 24 bytes
+
+ data[0] = FLEX_MAGIC
+ data[1] = byte(req.Opcode)
+ data[2] = byte(req.FramingElen)
+ data[3] = byte(req.Keylen)
+ elen := len(req.Extras)
+ data[4] = byte(elen)
+ if req.DataType != 0 {
+ data[5] = byte(req.DataType)
+ }
+ binary.BigEndian.PutUint16(data[6:8], req.VBucket)
+ binary.BigEndian.PutUint32(data[8:12],
+ uint32(len(req.Body)+req.Keylen+elen+len(req.ExtMeta)+req.FramingElen))
+ binary.BigEndian.PutUint32(data[12:16], req.Opaque)
+ if req.Cas != 0 {
+ binary.BigEndian.PutUint64(data[16:24], req.Cas)
+ }
+ pos := HDR_LEN
+
+ // Add framing infos
+ var framingExtras []byte
+ var outputBytes []byte
+ var mergeModeSrc []byte
+ var frameBytes int
+ var halfByteMode bool
+ var mergeMode bool
+ for _, frameInfo := range req.FramingExtras {
+ if !mergeMode {
+ outputBytes, halfByteMode = frameInfo.Bytes()
+ if !halfByteMode {
+ framingExtras = append(framingExtras, outputBytes...)
+ frameBytes += len(outputBytes)
+ } else {
+ mergeMode = true
+ mergeModeSrc = outputBytes
+ }
+ } else {
+ outputBytes, halfByteMode = frameInfo.Bytes()
+ outputBytes := ShiftByteSliceRight4Bits(outputBytes)
+ if halfByteMode {
+ // Previous halfbyte merge with this halfbyte will result in a complete byte
+ mergeMode = false
+ outputBytes = Merge2HalfByteSlices(mergeModeSrc, outputBytes)
+ framingExtras = append(framingExtras, outputBytes...)
+ frameBytes += len(outputBytes)
+ } else {
+ // Merge half byte with a non-half byte will result in a combined half-byte that will
+ // become the source for the next iteration
+ mergeModeSrc = Merge2HalfByteSlices(mergeModeSrc, outputBytes)
+ }
+ }
+ }
+
+ if mergeMode {
+ // Commit the temporary merge area into framingExtras
+ framingExtras = append(framingExtras, mergeModeSrc...)
+ frameBytes += len(mergeModeSrc)
+ }
+
+ copy(data[pos:pos+frameBytes], framingExtras)
+
+ pos += frameBytes
+
+ // Add Extras
+ if len(req.Extras) > 0 {
+ if mergeMode {
+ outputBytes = ShiftByteSliceRight4Bits(req.Extras)
+ data = Merge2HalfByteSlices(data, outputBytes)
+ } else {
+ copy(data[pos:pos+elen], req.Extras)
+ }
+ pos += elen
+ }
+
+ // Add keys
+ if req.Keylen > 0 {
+ if mergeMode {
+ outputBytes = ShiftByteSliceRight4Bits(req.Key)
+ data = Merge2HalfByteSlices(data, outputBytes)
+ } else {
+ copy(data[pos:pos+req.Keylen], req.Key)
+ }
+ pos += req.Keylen
+ }
+
+ return pos, mergeMode
+}
+
+func (req *MCRequest) FillHeaderBytes(data []byte) (int, bool) {
+ if req.FramingElen == 0 {
+ return req.fillRegularHeaderBytes(data), false
+ } else {
+ return req.fillFlexHeaderBytes(data)
+ }
+}
+
// HeaderBytes will return the wire representation of the request header
// (with the extras and key).
func (req *MCRequest) HeaderBytes() []byte {
- data := make([]byte, HDR_LEN+len(req.Extras)+len(req.Key))
+ data := make([]byte, HDR_LEN+len(req.Extras)+len(req.Key)+req.FramingElen)
- req.fillHeaderBytes(data)
+ req.FillHeaderBytes(data)
return data
}
func (req *MCRequest) Bytes() []byte {
data := make([]byte, req.Size())
- pos := req.fillHeaderBytes(data)
+ pos, halfByteMode := req.FillHeaderBytes(data)
+ // TODO - the halfByteMode should be revisited for a more efficient
+ // way of doing things
if len(req.Body) > 0 {
- copy(data[pos:pos+len(req.Body)], req.Body)
+ if halfByteMode {
+ shifted := ShiftByteSliceRight4Bits(req.Body)
+ data = Merge2HalfByteSlices(data, shifted)
+ } else {
+ copy(data[pos:pos+len(req.Body)], req.Body)
+ }
}
if len(req.ExtMeta) > 0 {
- copy(data[pos+len(req.Body):pos+len(req.Body)+len(req.ExtMeta)], req.ExtMeta)
+ if halfByteMode {
+ shifted := ShiftByteSliceRight4Bits(req.ExtMeta)
+ data = Merge2HalfByteSlices(data, shifted)
+ } else {
+ copy(data[pos+len(req.Body):pos+len(req.Body)+len(req.ExtMeta)], req.ExtMeta)
+ }
}
-
return data
}
return
}
-// Receive will fill this MCRequest with the data from a reader.
-func (req *MCRequest) Receive(r io.Reader, hdrBytes []byte) (int, error) {
- if len(hdrBytes) < HDR_LEN {
- hdrBytes = []byte{
- 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 0, 0}
- }
- n, err := io.ReadFull(r, hdrBytes)
- if err != nil {
- return n, err
- }
-
- if hdrBytes[0] != RES_MAGIC && hdrBytes[0] != REQ_MAGIC {
- return n, fmt.Errorf("bad magic: 0x%02x", hdrBytes[0])
- }
-
- klen := int(binary.BigEndian.Uint16(hdrBytes[2:]))
- elen := int(hdrBytes[4])
+func (req *MCRequest) receiveHeaderCommon(hdrBytes []byte) (elen, totalBodyLen int) {
+ elen = int(hdrBytes[4])
// Data type at 5
req.DataType = uint8(hdrBytes[5])
req.Opcode = CommandCode(hdrBytes[1])
// Vbucket at 6:7
req.VBucket = binary.BigEndian.Uint16(hdrBytes[6:])
- totalBodyLen := int(binary.BigEndian.Uint32(hdrBytes[8:]))
+ totalBodyLen = int(binary.BigEndian.Uint32(hdrBytes[8:]))
req.Opaque = binary.BigEndian.Uint32(hdrBytes[12:])
req.Cas = binary.BigEndian.Uint64(hdrBytes[16:])
+ return
+}
+func (req *MCRequest) receiveRegHeader(hdrBytes []byte) (elen, totalBodyLen int) {
+ elen, totalBodyLen = req.receiveHeaderCommon(hdrBytes)
+ req.Keylen = int(binary.BigEndian.Uint16(hdrBytes[2:]))
+ return
+}
+
+func (req *MCRequest) receiveFlexibleFramingHeader(hdrBytes []byte) (elen, totalBodyLen, framingElen int) {
+ elen, totalBodyLen = req.receiveHeaderCommon(hdrBytes)
+
+ // For flexible framing header, key length is a single byte at byte index 3
+ req.Keylen = int(binary.BigEndian.Uint16(hdrBytes[2:]) & 0x0ff)
+ // Flexible framing lengh is a single byte at index 2
+ framingElen = int(binary.BigEndian.Uint16(hdrBytes[2:]) >> 8)
+ req.FramingElen = framingElen
+ return
+}
+
+func (req *MCRequest) populateRegularBody(r io.Reader, totalBodyLen, elen int) (int, error) {
+ var m int
+ var err error
if totalBodyLen > 0 {
buf := make([]byte, totalBodyLen)
- m, err := io.ReadFull(r, buf)
- n += m
+ m, err = io.ReadFull(r, buf)
if err == nil {
if req.Opcode >= TAP_MUTATION &&
req.Opcode <= TAP_CHECKPOINT_END &&
}
req.Extras = buf[0:elen]
- req.Key = buf[elen : klen+elen]
+ req.Key = buf[elen : req.Keylen+elen]
// get the length of extended metadata
extMetaLen := 0
extMetaLen = int(binary.BigEndian.Uint16(req.Extras[28:30]))
}
- bodyLen := totalBodyLen - klen - elen - extMetaLen
+ bodyLen := totalBodyLen - req.Keylen - elen - extMetaLen
if bodyLen > MaxBodyLen {
- return n, fmt.Errorf("%d is too big (max %d)",
+ return m, fmt.Errorf("%d is too big (max %d)",
bodyLen, MaxBodyLen)
}
- req.Body = buf[klen+elen : klen+elen+bodyLen]
- req.ExtMeta = buf[klen+elen+bodyLen:]
+ req.Body = buf[req.Keylen+elen : req.Keylen+elen+bodyLen]
+ req.ExtMeta = buf[req.Keylen+elen+bodyLen:]
+ }
+ }
+ return m, err
+}
+
+func (req *MCRequest) populateFlexBody(r io.Reader, totalBodyLen, elen, framingElen int) (int, error) {
+ var m int
+ var err error
+ if totalBodyLen > 0 {
+ buf := make([]byte, totalBodyLen)
+ m, err = io.ReadFull(r, buf)
+ if err != nil {
+ return m, err
+ }
+ err = req.populateFlexBodyInternal(buf, totalBodyLen, elen, framingElen)
+ }
+ return m, err
+}
+
+func (req *MCRequest) populateFlexBodyInternal(buf []byte, totalBodyLen, elen, framingElen int) error {
+ var halfByteOffset bool
+ var err error
+ if framingElen > 0 {
+ var objs []FrameInfo
+ objs, err, halfByteOffset = parseFrameInfoObjects(buf, framingElen)
+ if err != nil {
+ return err
}
+ req.FramingExtras = objs
+ }
+
+ err = req.populateFlexBodyAfterFrames(buf, totalBodyLen, elen, framingElen, halfByteOffset)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (req *MCRequest) populateFlexBodyAfterFrames(buf []byte, totalBodyLen, elen, framingElen int, halfByteOffset bool) error {
+ var idxCursor int = framingElen
+ if req.Opcode >= TAP_MUTATION && req.Opcode <= TAP_CHECKPOINT_END && len(buf[idxCursor:]) > 1 {
+ // In these commands there is "engine private"
+ // data at the end of the extras. The first 2
+ // bytes of extra data give its length.
+ if !halfByteOffset {
+ elen += int(binary.BigEndian.Uint16(buf[idxCursor:]))
+ } else {
+ // 0 1 2 3 4 .... 19 20 21 22 ... 32
+ // ^-----^ ^-------^ ^------------^
+ // offset data do not care
+ var buffer uint32 = binary.BigEndian.Uint32(buf[idxCursor:])
+ buffer &= 0xffff000
+ elen += int(buffer >> 12)
+ }
+ }
+
+ // Get the extras
+ if !halfByteOffset {
+ req.Extras = buf[idxCursor : idxCursor+elen]
+ } else {
+ preShift := buf[idxCursor : idxCursor+elen+1]
+ req.Extras = ShiftByteSliceLeft4Bits(preShift)
+ }
+ idxCursor += elen
+
+ // Get the Key
+ if !halfByteOffset {
+ req.Key = buf[idxCursor : idxCursor+req.Keylen]
+ } else {
+ preShift := buf[idxCursor : idxCursor+req.Keylen+1]
+ req.Key = ShiftByteSliceLeft4Bits(preShift)
+ }
+ idxCursor += req.Keylen
+
+ // get the length of extended metadata
+ extMetaLen := 0
+ if elen > 29 {
+ extMetaLen = int(binary.BigEndian.Uint16(req.Extras[28:30]))
+ }
+ idxCursor += extMetaLen
+
+ bodyLen := totalBodyLen - req.Keylen - elen - extMetaLen - framingElen
+ if bodyLen > MaxBodyLen {
+ return fmt.Errorf("%d is too big (max %d)",
+ bodyLen, MaxBodyLen)
+ }
+
+ if !halfByteOffset {
+ req.Body = buf[idxCursor : idxCursor+bodyLen]
+ idxCursor += bodyLen
+ } else {
+ preShift := buf[idxCursor : idxCursor+bodyLen+1]
+ req.Body = ShiftByteSliceLeft4Bits(preShift)
+ idxCursor += bodyLen
+ }
+
+ if extMetaLen > 0 {
+ if !halfByteOffset {
+ req.ExtMeta = buf[idxCursor:]
+ } else {
+ preShift := buf[idxCursor:]
+ req.ExtMeta = ShiftByteSliceLeft4Bits(preShift)
+ }
+ }
+
+ return nil
+}
+
+// Receive will fill this MCRequest with the data from a reader.
+func (req *MCRequest) Receive(r io.Reader, hdrBytes []byte) (int, error) {
+ if len(hdrBytes) < HDR_LEN {
+ hdrBytes = []byte{
+ 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0}
+ }
+ n, err := io.ReadFull(r, hdrBytes)
+ if err != nil {
+ fmt.Printf("Err %v\n", err)
+ return n, err
+ }
+
+ switch hdrBytes[0] {
+ case RES_MAGIC:
+ fallthrough
+ case REQ_MAGIC:
+ elen, totalBodyLen := req.receiveRegHeader(hdrBytes)
+ bodyRead, err := req.populateRegularBody(r, totalBodyLen, elen)
+ return n + bodyRead, err
+ case FLEX_MAGIC:
+ elen, totalBodyLen, framingElen := req.receiveFlexibleFramingHeader(hdrBytes)
+ bodyRead, err := req.populateFlexBody(r, totalBodyLen, elen, framingElen)
+ return n + bodyRead, err
+ default:
+ return n, fmt.Errorf("bad magic: 0x%02x", hdrBytes[0])
}
- return n, err
}
// Receive will fill this MCResponse with the data from this reader.
func (res *MCResponse) Receive(r io.Reader, hdrBytes []byte) (n int, err error) {
+ return res.ReceiveWithBuf(r, hdrBytes, nil)
+}
+
+// ReceiveWithBuf takes an optional pre-allocated []byte buf which
+// will be used if its capacity is large enough, otherwise a new
+// []byte slice is allocated.
+func (res *MCResponse) ReceiveWithBuf(r io.Reader, hdrBytes, buf []byte) (n int, err error) {
if len(hdrBytes) < HDR_LEN {
hdrBytes = []byte{
0, 0, 0, 0, 0, 0, 0, 0,
}
}()
- buf := make([]byte, klen+elen+bodyLen)
+ bufNeed := klen + elen + bodyLen
+ if buf != nil && cap(buf) >= bufNeed {
+ buf = buf[0:bufNeed]
+ } else {
+ buf = make([]byte, bufNeed)
+ }
+
m, err := io.ReadFull(r, buf)
if err == nil {
res.Extras = buf[0:elen]
-COUCHBASE INC. COMMUNITY EDITION LICENSE AGREEMENT
-
-IMPORTANT-READ CAREFULLY: BY CLICKING THE "I ACCEPT" BOX OR INSTALLING,
-DOWNLOADING OR OTHERWISE USING THIS SOFTWARE AND ANY ASSOCIATED
-DOCUMENTATION, YOU, ON BEHALF OF YOURSELF OR AS AN AUTHORIZED
-REPRESENTATIVE ON BEHALF OF AN ENTITY ("LICENSEE") AGREE TO ALL THE
-TERMS OF THIS COMMUNITY EDITION LICENSE AGREEMENT (THE "AGREEMENT")
-REGARDING YOUR USE OF THE SOFTWARE. YOU REPRESENT AND WARRANT THAT YOU
-HAVE FULL LEGAL AUTHORITY TO BIND THE LICENSEE TO THIS AGREEMENT. IF YOU
-DO NOT AGREE WITH ALL OF THESE TERMS, DO NOT SELECT THE "I ACCEPT" BOX
-AND DO NOT INSTALL, DOWNLOAD OR OTHERWISE USE THE SOFTWARE. THE
-EFFECTIVE DATE OF THIS AGREEMENT IS THE DATE ON WHICH YOU CLICK "I
-ACCEPT" OR OTHERWISE INSTALL, DOWNLOAD OR USE THE SOFTWARE.
-
-1. License Grant. Couchbase Inc. hereby grants Licensee, free of charge,
-the non-exclusive right to use, copy, merge, publish, distribute,
-sublicense, and/or sell copies of the Software, and to permit persons to
-whom the Software is furnished to do so, subject to Licensee including
-the following copyright notice in all copies or substantial portions of
-the Software:
-
-Couchbase (r) http://www.Couchbase.com Copyright 2016 Couchbase, Inc.
-
-As used in this Agreement, "Software" means the object code version of
-the applicable elastic data management server software provided by
-Couchbase Inc.
-
-2. Restrictions. Licensee will not reverse engineer, disassemble, or
-decompile the Software (except to the extent such restrictions are
-prohibited by law).
-
-3. Support. Couchbase, Inc. will provide Licensee with access to, and
-use of, the Couchbase, Inc. support forum available at the following
-URL: http://www.couchbase.org/forums/. Couchbase, Inc. may, at its
-discretion, modify, suspend or terminate support at any time upon notice
-to Licensee.
-
-4. Warranty Disclaimer and Limitation of Liability. THE SOFTWARE IS
-PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
-INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
-FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
-COUCHBASE INC. OR THE AUTHORS OR COPYRIGHT HOLDERS IN THE SOFTWARE BE
-LIABLE FOR ANY CLAIM, DAMAGES (IINCLUDING, WITHOUT LIMITATION, DIRECT,
-INDIRECT OR CONSEQUENTIAL DAMAGES) OR OTHER LIABILITY, WHETHER IN AN
-ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
-CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-SOFTWARE.
+Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "{}"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright {yyyy} {name of copyright owner}
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
github.com/boombuler/barcode/utils
# github.com/bradfitz/gomemcache v0.0.0-20190329173943-551aad21a668
github.com/bradfitz/gomemcache/memcache
-# github.com/couchbase/gomemcached v0.0.0-20190515232915-c4b4ca0eb21d
+# github.com/couchbase/gomemcached v0.0.0-20191004160342-7b5da2ec40b2
github.com/couchbase/gomemcached
github.com/couchbase/gomemcached/client
-# github.com/couchbase/goutils v0.0.0-20190315194238-f9d42b11473b
+# github.com/couchbase/goutils v0.0.0-20191018232750-b49639060d85
github.com/couchbase/goutils/logging
github.com/couchbase/goutils/scramsha
# github.com/couchbase/vellum v0.0.0-20190829182332-ef2e028c01fd