diff options
author | Tamal Saha <tamal@appscode.com> | 2019-08-23 09:40:30 -0700 |
---|---|---|
committer | techknowlogick <techknowlogick@gitea.io> | 2019-08-23 12:40:29 -0400 |
commit | 171b3598778a1ecd0a921c71ed6755bfef68f7f0 (patch) | |
tree | 02857629ef9e8e26ee0ee559153f803f77b588b7 /vendor/github.com/couchbase | |
parent | ca6fb004ac50fc924861112403895d637c6a2d1d (diff) | |
download | gitea-171b3598778a1ecd0a921c71ed6755bfef68f7f0.tar.gz gitea-171b3598778a1ecd0a921c71ed6755bfef68f7f0.zip |
Use gitea forked macaron (#7933)
Signed-off-by: Tamal Saha <tamal@appscode.com>
Diffstat (limited to 'vendor/github.com/couchbase')
5 files changed, 276 insertions, 50 deletions
diff --git a/vendor/github.com/couchbase/gomemcached/client/mc.go b/vendor/github.com/couchbase/gomemcached/client/mc.go index bd1433ba28..0f1d61e512 100644 --- a/vendor/github.com/couchbase/gomemcached/client/mc.go +++ b/vendor/github.com/couchbase/gomemcached/client/mc.go @@ -2,6 +2,7 @@ package memcached import ( + "crypto/tls" "encoding/binary" "fmt" "github.com/couchbase/gomemcached" @@ -26,11 +27,14 @@ type ClientIface interface { AuthScramSha(user, pass string) (*gomemcached.MCResponse, error) CASNext(vb uint16, k string, exp int, state *CASState) bool CAS(vb uint16, k string, f CasFunc, initexp int) (*gomemcached.MCResponse, error) + CollectionsGetCID(scope string, collection string) (*gomemcached.MCResponse, error) Close() error Decr(vb uint16, key string, amt, def uint64, exp int) (uint64, error) Del(vb uint16, key string) (*gomemcached.MCResponse, error) EnableMutationToken() (*gomemcached.MCResponse, error) Get(vb uint16, key string) (*gomemcached.MCResponse, error) + GetCollectionsManifest() (*gomemcached.MCResponse, error) + GetFromCollection(vb uint16, cid uint32, key string) (*gomemcached.MCResponse, error) GetSubdoc(vb uint16, key string, subPaths []string) (*gomemcached.MCResponse, error) GetAndTouch(vb uint16, key string, exp int) (*gomemcached.MCResponse, error) GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string) error @@ -74,11 +78,19 @@ type Feature uint16 const FeatureMutationToken = Feature(0x04) const FeatureXattr = Feature(0x06) +const FeatureCollections = Feature(0x12) const FeatureDataType = Feature(0x0b) +type memcachedConnection interface { + io.ReadWriteCloser + + SetReadDeadline(time.Time) error + SetDeadline(time.Time) error +} + // The Client itself. type Client struct { - conn io.ReadWriteCloser + conn memcachedConnection // use uint32 type so that it can be accessed through atomic APIs healthy uint32 opaque uint32 @@ -105,6 +117,15 @@ func Connect(prot, dest string) (rv *Client, err error) { return Wrap(conn) } +// Connect to a memcached server using TLS. +func ConnectTLS(prot, dest string, config *tls.Config) (rv *Client, err error) { + conn, err := tls.Dial(prot, dest, config) + if err != nil { + return nil, err + } + return Wrap(conn) +} + func SetDefaultTimeouts(dial, read, write time.Duration) { DefaultDialTimeout = dial DefaultWriteTimeout = write @@ -115,22 +136,25 @@ func SetDefaultDialTimeout(dial time.Duration) { } func (c *Client) SetKeepAliveOptions(interval time.Duration) { - c.conn.(*net.TCPConn).SetKeepAlive(true) - c.conn.(*net.TCPConn).SetKeepAlivePeriod(interval) + tcpConn, ok := c.conn.(*net.TCPConn) + if ok { + tcpConn.SetKeepAlive(true) + tcpConn.SetKeepAlivePeriod(interval) + } } func (c *Client) SetReadDeadline(t time.Time) { - c.conn.(*net.TCPConn).SetReadDeadline(t) + c.conn.SetReadDeadline(t) } func (c *Client) SetDeadline(t time.Time) { - c.conn.(*net.TCPConn).SetDeadline(t) + c.conn.SetDeadline(t) } // Wrap an existing transport. -func Wrap(rwc io.ReadWriteCloser) (rv *Client, err error) { +func Wrap(conn memcachedConnection) (rv *Client, err error) { client := &Client{ - conn: rwc, + conn: conn, hdrBuf: make([]byte, gomemcached.HDR_LEN), opaque: uint32(1), } @@ -278,6 +302,22 @@ func (c *Client) Get(vb uint16, key string) (*gomemcached.MCResponse, error) { }) } +// Get the value for a key from a collection, identified by collection id. +func (c *Client) GetFromCollection(vb uint16, cid uint32, key string) (*gomemcached.MCResponse, error) { + keyBytes := []byte(key) + encodedCid := make([]byte, binary.MaxVarintLen32) + lenEncodedCid := binary.PutUvarint(encodedCid, uint64(cid)) + encodedKey := make([]byte, 0, lenEncodedCid+len(keyBytes)) + encodedKey = append(encodedKey, encodedCid[0:lenEncodedCid]...) + encodedKey = append(encodedKey, keyBytes...) + + return c.Send(&gomemcached.MCRequest{ + Opcode: gomemcached.GET, + VBucket: vb, + Key: encodedKey, + }) +} + // Get the xattrs, doc value for the input key func (c *Client) GetSubdoc(vb uint16, key string, subPaths []string) (*gomemcached.MCResponse, error) { @@ -296,6 +336,33 @@ func (c *Client) GetSubdoc(vb uint16, key string, subPaths []string) (*gomemcach return res, nil } +// Retrieve the collections manifest. +func (c *Client) GetCollectionsManifest() (*gomemcached.MCResponse, error) { + + res, err := c.Send(&gomemcached.MCRequest{ + Opcode: gomemcached.GET_COLLECTIONS_MANIFEST, + }) + + if err != nil && IfResStatusError(res) { + return res, err + } + return res, nil +} + +// Retrieve the collections manifest. +func (c *Client) CollectionsGetCID(scope string, collection string) (*gomemcached.MCResponse, error) { + + res, err := c.Send(&gomemcached.MCRequest{ + Opcode: gomemcached.COLLECTIONS_GET_CID, + Key: []byte(scope + "." + collection), + }) + + if err != nil && IfResStatusError(res) { + return res, err + } + return res, nil +} + // Get the value for a key, and update expiry func (c *Client) GetAndTouch(vb uint16, key string, exp int) (*gomemcached.MCResponse, error) { extraBuf := make([]byte, 4) @@ -425,10 +492,9 @@ func (c *Client) AuthPlain(user, pass string) (*gomemcached.MCResponse, error) { // select bucket func (c *Client) SelectBucket(bucket string) (*gomemcached.MCResponse, error) { - return c.Send(&gomemcached.MCRequest{ Opcode: gomemcached.SELECT_BUCKET, - Key: []byte(fmt.Sprintf("%s", bucket))}) + Key: []byte(bucket)}) } func (c *Client) store(opcode gomemcached.CommandCode, vb uint16, diff --git a/vendor/github.com/couchbase/gomemcached/client/upr_feed.go b/vendor/github.com/couchbase/gomemcached/client/upr_feed.go index dc737e6cc0..95fa12577f 100644 --- a/vendor/github.com/couchbase/gomemcached/client/upr_feed.go +++ b/vendor/github.com/couchbase/gomemcached/client/upr_feed.go @@ -53,6 +53,16 @@ type UprEvent struct { AckSize uint32 // The number of bytes that can be Acked to DCP } +type PriorityType string + +// high > medium > disabled > low +const ( + PriorityDisabled PriorityType = "" + PriorityLow PriorityType = "low" + PriorityMed PriorityType = "medium" + PriorityHigh PriorityType = "high" +) + // UprStream is per stream data structure over an UPR Connection. type UprStream struct { Vbucket uint16 // Vbucket id @@ -62,6 +72,27 @@ type UprStream struct { connected bool } +type FeedState int + +const ( + FeedStateInitial = iota + FeedStateOpened = iota + FeedStateClosed = iota +) + +func (fs FeedState) String() string { + switch fs { + case FeedStateInitial: + return "Initial" + case FeedStateOpened: + return "Opened" + case FeedStateClosed: + return "Closed" + default: + return "Unknown" + } +} + const ( CompressionTypeStartMarker = iota // also means invalid CompressionTypeNone = iota @@ -80,6 +111,8 @@ type UprFeatures struct { Xattribute bool CompressionType int IncludeDeletionTime bool + DcpPriority PriorityType + EnableExpiry bool } /** @@ -179,7 +212,7 @@ func (negotiator *vbStreamNegotiator) handleStreamRequest(feed *UprFeed, stream, err := negotiator.getStreamFromMap(vbno, opaque) if err != nil { - err = fmt.Errorf("Stream not found for vb %d appOpaque %v: %#v", vbno, appOpaque, *pktPtr) + err = fmt.Errorf("Stream not found for vb %d: %#v", vbno, *pktPtr) logging.Errorf(err.Error()) return nil, err } @@ -226,8 +259,6 @@ func (negotiator *vbStreamNegotiator) cleanUpVbStreams(vbno uint16) { type UprFeed struct { // lock for feed.vbstreams muVbstreams sync.RWMutex - // lock for feed.closed - muClosed sync.RWMutex C <-chan *UprEvent // Exported channel for receiving UPR events negotiator vbStreamNegotiator // Used for pre-vbstreams, concurrent vb stream negotiation vbstreams map[uint16]*UprStream // official live vb->stream mapping @@ -240,12 +271,12 @@ type UprFeed struct { stats UprStats // Stats for upr client transmitCh chan *gomemcached.MCRequest // transmit command channel transmitCl chan bool // closer channel for transmit go-routine - closed bool // flag indicating whether the feed has been closed - // flag indicating whether client of upr feed will send ack to upr feed // if flag is true, upr feed will use ack from client to determine whether/when to send ack to DCP // if flag is false, upr feed will track how many bytes it has sent to client // and use that to determine whether/when to send ack to DCP ackByClient bool + feedState FeedState + muFeedState sync.RWMutex } // Exported interface - to allow for mocking @@ -263,6 +294,8 @@ type UprFeedIface interface { UprOpenWithXATTR(name string, sequence uint32, bufSize uint32) error UprOpenWithFeatures(name string, sequence uint32, bufSize uint32, features UprFeatures) (error, UprFeatures) UprRequestStream(vbno, opaqueMSB uint16, flags uint32, vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error + // Set DCP priority on an existing DCP connection. The command is sent asynchronously without waiting for a response + SetPriorityAsync(p PriorityType) error } type UprStats struct { @@ -494,6 +527,31 @@ func (feed *UprFeed) UprOpenWithFeatures(name string, sequence uint32, bufSize u return feed.uprOpen(name, sequence, bufSize, features) } +func (feed *UprFeed) SetPriorityAsync(p PriorityType) error { + if !feed.isOpen() { + // do not send this command if upr feed is not yet open, otherwise it may interfere with + // feed start up process, which relies on synchronous message exchange with DCP. + return fmt.Errorf("Upr feed is not open. State=%v", feed.getState()) + } + + return feed.setPriority(p, false /*sync*/) +} + +func (feed *UprFeed) setPriority(p PriorityType, sync bool) error { + rq := &gomemcached.MCRequest{ + Opcode: gomemcached.UPR_CONTROL, + Key: []byte("set_priority"), + Body: []byte(p), + Opaque: getUprOpenCtrlOpaque(), + } + if sync { + return sendMcRequestSync(feed.conn, rq) + } else { + return feed.writeToTransmitCh(rq) + + } +} + func (feed *UprFeed) uprOpen(name string, sequence uint32, bufSize uint32, features UprFeatures) (err error, activatedFeatures UprFeatures) { mc := feed.conn @@ -561,6 +619,31 @@ func (feed *UprFeed) uprOpen(name string, sequence uint32, bufSize uint32, featu } activatedFeatures.CompressionType = features.CompressionType + if features.DcpPriority != PriorityDisabled { + err = feed.setPriority(features.DcpPriority, true /*sync*/) + if err == nil { + activatedFeatures.DcpPriority = features.DcpPriority + } else { + return + } + } + + if features.EnableExpiry { + rq := &gomemcached.MCRequest{ + Opcode: gomemcached.UPR_CONTROL, + Key: []byte("enable_expiry_opcode"), + Body: []byte("true"), + Opaque: getUprOpenCtrlOpaque(), + } + err = sendMcRequestSync(feed.conn, rq) + if err != nil { + return + } + activatedFeatures.EnableExpiry = true + } + + // everything is ok so far, set upr feed to open state + feed.setOpen() return } @@ -988,18 +1071,37 @@ func vbOpaque(opq32 uint32) uint16 { // Close this UprFeed. func (feed *UprFeed) Close() { - feed.muClosed.Lock() - defer feed.muClosed.Unlock() - if !feed.closed { + feed.muFeedState.Lock() + defer feed.muFeedState.Unlock() + if feed.feedState != FeedStateClosed { close(feed.closer) - feed.closed = true + feed.feedState = FeedStateClosed feed.negotiator.initialize() } } // check if the UprFeed has been closed func (feed *UprFeed) Closed() bool { - feed.muClosed.RLock() - defer feed.muClosed.RUnlock() - return feed.closed + feed.muFeedState.RLock() + defer feed.muFeedState.RUnlock() + return feed.feedState == FeedStateClosed +} + +// set upr feed to opened state after initialization is done +func (feed *UprFeed) setOpen() { + feed.muFeedState.Lock() + defer feed.muFeedState.Unlock() + feed.feedState = FeedStateOpened +} + +func (feed *UprFeed) isOpen() bool { + feed.muFeedState.RLock() + defer feed.muFeedState.RUnlock() + return feed.feedState == FeedStateOpened +} + +func (feed *UprFeed) getState() FeedState { + feed.muFeedState.RLock() + defer feed.muFeedState.RUnlock() + return feed.feedState } diff --git a/vendor/github.com/couchbase/gomemcached/mc_constants.go b/vendor/github.com/couchbase/gomemcached/mc_constants.go index 1d5027d16c..32f4f51852 100644 --- a/vendor/github.com/couchbase/gomemcached/mc_constants.go +++ b/vendor/github.com/couchbase/gomemcached/mc_constants.go @@ -93,9 +93,12 @@ const ( OBSERVE_SEQNO = CommandCode(0x91) // Sequence Number based Observe OBSERVE = CommandCode(0x92) - GET_META = CommandCode(0xA0) // Get meta. returns with expiry, flags, cas etc - SUBDOC_GET = CommandCode(0xc5) // Get subdoc. Returns with xattrs - SUBDOC_MULTI_LOOKUP = CommandCode(0xd0) // Multi lookup. Doc xattrs and meta. + GET_META = CommandCode(0xA0) // Get meta. returns with expiry, flags, cas etc + GET_COLLECTIONS_MANIFEST = CommandCode(0xba) // Get entire collections manifest. + COLLECTIONS_GET_CID = CommandCode(0xbb) // Get collection id. + SUBDOC_GET = CommandCode(0xc5) // Get subdoc. Returns with xattrs + SUBDOC_MULTI_LOOKUP = CommandCode(0xd0) // Multi lookup. Doc xattrs and meta. + ) // command codes that are counted toward DCP control buffer @@ -113,29 +116,33 @@ type Status uint16 // Matches with protocol_binary.h as source of truth const ( - SUCCESS = Status(0x00) - KEY_ENOENT = Status(0x01) - KEY_EEXISTS = Status(0x02) - E2BIG = Status(0x03) - EINVAL = Status(0x04) - NOT_STORED = Status(0x05) - DELTA_BADVAL = Status(0x06) - NOT_MY_VBUCKET = Status(0x07) - NO_BUCKET = Status(0x08) - LOCKED = Status(0x09) - AUTH_STALE = Status(0x1f) - AUTH_ERROR = Status(0x20) - AUTH_CONTINUE = Status(0x21) - ERANGE = Status(0x22) - ROLLBACK = Status(0x23) - EACCESS = Status(0x24) - NOT_INITIALIZED = Status(0x25) - UNKNOWN_COMMAND = Status(0x81) - ENOMEM = Status(0x82) - NOT_SUPPORTED = Status(0x83) - EINTERNAL = Status(0x84) - EBUSY = Status(0x85) - TMPFAIL = Status(0x86) + SUCCESS = Status(0x00) + KEY_ENOENT = Status(0x01) + KEY_EEXISTS = Status(0x02) + E2BIG = Status(0x03) + EINVAL = Status(0x04) + NOT_STORED = Status(0x05) + DELTA_BADVAL = Status(0x06) + NOT_MY_VBUCKET = Status(0x07) + NO_BUCKET = Status(0x08) + LOCKED = Status(0x09) + AUTH_STALE = Status(0x1f) + AUTH_ERROR = Status(0x20) + AUTH_CONTINUE = Status(0x21) + ERANGE = Status(0x22) + ROLLBACK = Status(0x23) + EACCESS = Status(0x24) + NOT_INITIALIZED = Status(0x25) + UNKNOWN_COMMAND = Status(0x81) + ENOMEM = Status(0x82) + NOT_SUPPORTED = Status(0x83) + EINTERNAL = Status(0x84) + EBUSY = Status(0x85) + TMPFAIL = Status(0x86) + UNKNOWN_COLLECTION = Status(0x88) + + SYNC_WRITE_IN_PROGRESS = Status(0xa2) + SYNC_WRITE_AMBIGUOUS = Status(0xa3) // SUBDOC SUBDOC_PATH_NOT_FOUND = Status(0xc0) @@ -261,6 +268,8 @@ func init() { CommandNames[UPR_CONTROL] = "UPR_CONTROL" CommandNames[SUBDOC_GET] = "SUBDOC_GET" CommandNames[SUBDOC_MULTI_LOOKUP] = "SUBDOC_MULTI_LOOKUP" + CommandNames[GET_COLLECTIONS_MANIFEST] = "GET_COLLECTIONS_MANIFEST" + CommandNames[COLLECTIONS_GET_CID] = "COLLECTIONS_GET_CID" StatusNames = make(map[Status]string) StatusNames[SUCCESS] = "SUCCESS" @@ -285,6 +294,7 @@ func init() { StatusNames[EINTERNAL] = "EINTERNAL" StatusNames[EBUSY] = "EBUSY" StatusNames[TMPFAIL] = "TMPFAIL" + StatusNames[UNKNOWN_COLLECTION] = "UNKNOWN_COLLECTION" StatusNames[SUBDOC_PATH_NOT_FOUND] = "SUBDOC_PATH_NOT_FOUND" StatusNames[SUBDOC_BAD_MULTI] = "SUBDOC_BAD_MULTI" diff --git a/vendor/github.com/couchbase/goutils/logging/logger.go b/vendor/github.com/couchbase/goutils/logging/logger.go index b9948f9b2e..ab854635bf 100644 --- a/vendor/github.com/couchbase/goutils/logging/logger.go +++ b/vendor/github.com/couchbase/goutils/logging/logger.go @@ -36,6 +36,7 @@ const ( TEXTFORMATTER = LogEntryFormatter(iota) JSONFORMATTER KVFORMATTER + UNIFORMFORMATTER ) func (level Level) String() string { @@ -476,6 +477,6 @@ func Stackf(level Level, fmt string, args ...interface{}) { } func init() { - logger = NewLogger(os.Stderr, INFO, TEXTFORMATTER) + logger := NewLogger(os.Stderr, INFO, TEXTFORMATTER) SetLogger(logger) } diff --git a/vendor/github.com/couchbase/goutils/logging/logger_golog.go b/vendor/github.com/couchbase/goutils/logging/logger_golog.go index eec432a513..14fd3c391d 100644 --- a/vendor/github.com/couchbase/goutils/logging/logger_golog.go +++ b/vendor/github.com/couchbase/goutils/logging/logger_golog.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016 Couchbase, Inc. +// Copyright (c) 2016-2019 Couchbase, Inc. // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file // except in compliance with the License. You may obtain a copy of the License at // http://www.apache.org/licenses/LICENSE-2.0 @@ -31,7 +31,7 @@ const ( _RLEVEL = "_rlevel" ) -func NewLogger(out io.Writer, lvl Level, fmtLogging LogEntryFormatter) *goLogger { +func NewLogger(out io.Writer, lvl Level, fmtLogging LogEntryFormatter, fmtArgs ...interface{}) *goLogger { logger := &goLogger{ logger: log.New(out, "", 0), level: lvl, @@ -40,6 +40,10 @@ func NewLogger(out io.Writer, lvl Level, fmtLogging LogEntryFormatter) *goLogger logger.entryFormatter = &jsonFormatter{} } else if fmtLogging == KVFORMATTER { logger.entryFormatter = &keyvalueFormatter{} + } else if fmtLogging == UNIFORMFORMATTER { + logger.entryFormatter = &uniformFormatter{ + callback: fmtArgs[0].(ComponentCallback), + } } else { logger.entryFormatter = &textFormatter{} } @@ -316,3 +320,46 @@ func (*jsonFormatter) format(newEntry *logEntry) string { s := bytes.NewBuffer(append(serialized, '\n')) return s.String() } + +type ComponentCallback func() string + +type uniformFormatter struct { + callback ComponentCallback +} + +// ex. 2019-03-15T11:28:07.652-04:00 DEBU COMPONENT.subcomponent This is a message from test in uniform format + +var _LEVEL_UNIFORM = []string{ + DEBUG: "DEBU", + TRACE: "TRAC", + REQUEST: "REQU", + INFO: "INFO", + WARN: "WARN", + ERROR: "ERRO", + SEVERE: "SEVE", + FATAL: "FATA", + NONE: "NONE", +} + +func (level Level) UniformString() string { + return _LEVEL_UNIFORM[level] +} + +func (uf *uniformFormatter) format(newEntry *logEntry) string { + b := &bytes.Buffer{} + appendValue(b, newEntry.Time) + component := uf.callback() + if newEntry.Rlevel != NONE { + // not really any accommodation for a composite level in the uniform standard; just output as abbr,abbr + fmt.Fprintf(b, "%s,%s %s ", newEntry.Level.UniformString(), newEntry.Rlevel.UniformString(), component) + } else { + fmt.Fprintf(b, "%s %s ", newEntry.Level.UniformString(), component) + } + appendValue(b, newEntry.Message) + for key, value := range newEntry.Data { + appendKeyValue(b, key, value) + } + b.WriteByte('\n') + s := bytes.NewBuffer(b.Bytes()) + return s.String() +} |