You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

upr_feed.go 32KB


  1. // go implementation of upr client.
  2. // See https://github.com/couchbaselabs/cbupr/blob/master/transport-spec.md
  3. // TODO
  4. // 1. Use a pool allocator to avoid garbage
  5. package memcached
  6. import (
  7. "encoding/binary"
  8. "errors"
  9. "fmt"
  10. "github.com/couchbase/gomemcached"
  11. "github.com/couchbase/goutils/logging"
  12. "strconv"
  13. "sync"
  14. "sync/atomic"
  15. )
  16. const uprMutationExtraLen = 30
  17. const uprDeletetionExtraLen = 18
  18. const uprDeletetionWithDeletionTimeExtraLen = 21
  19. const uprSnapshotExtraLen = 20
  20. const bufferAckThreshold = 0.2
  21. const opaqueOpen = 0xBEAF0001
  22. const opaqueFailover = 0xDEADBEEF
  23. const uprDefaultNoopInterval = 120
  24. // Counter on top of opaqueOpen that others can draw from for open and control msgs
  25. var opaqueOpenCtrlWell uint32 = opaqueOpen
  26. // UprEvent memcached events for UPR streams.
  27. type UprEvent struct {
  28. Opcode gomemcached.CommandCode // Type of event
  29. Status gomemcached.Status // Response status
  30. VBucket uint16 // VBucket this event applies to
  31. DataType uint8 // data type
  32. Opaque uint16 // 16 MSB of opaque
  33. VBuuid uint64 // This field is set by downstream
  34. Flags uint32 // Item flags
  35. Expiry uint32 // Item expiration time
  36. Key, Value []byte // Item key/value
  37. OldValue []byte // TODO: TBD: old document value
  38. Cas uint64 // CAS value of the item
  39. Seqno uint64 // sequence number of the mutation
  40. RevSeqno uint64 // rev sequence number : deletions
  41. LockTime uint32 // Lock time
  42. MetadataSize uint16 // Metadata size
  43. SnapstartSeq uint64 // start sequence number of this snapshot
  44. SnapendSeq uint64 // End sequence number of the snapshot
  45. SnapshotType uint32 // 0: disk 1: memory
  46. FailoverLog *FailoverLog // Failover log containing vvuid and sequnce number
  47. Error error // Error value in case of a failure
  48. ExtMeta []byte
  49. AckSize uint32 // The number of bytes that can be Acked to DCP
  50. }
  51. type PriorityType string
  52. // high > medium > disabled > low
  53. const (
  54. PriorityDisabled PriorityType = ""
  55. PriorityLow PriorityType = "low"
  56. PriorityMed PriorityType = "medium"
  57. PriorityHigh PriorityType = "high"
  58. )
  59. // UprStream is per stream data structure over an UPR Connection.
  60. type UprStream struct {
  61. Vbucket uint16 // Vbucket id
  62. Vbuuid uint64 // vbucket uuid
  63. StartSeq uint64 // start sequence number
  64. EndSeq uint64 // end sequence number
  65. connected bool
  66. }
  67. type FeedState int
  68. const (
  69. FeedStateInitial = iota
  70. FeedStateOpened = iota
  71. FeedStateClosed = iota
  72. )
  73. func (fs FeedState) String() string {
  74. switch fs {
  75. case FeedStateInitial:
  76. return "Initial"
  77. case FeedStateOpened:
  78. return "Opened"
  79. case FeedStateClosed:
  80. return "Closed"
  81. default:
  82. return "Unknown"
  83. }
  84. }
  85. const (
  86. CompressionTypeStartMarker = iota // also means invalid
  87. CompressionTypeNone = iota
  88. CompressionTypeSnappy = iota
  89. CompressionTypeEndMarker = iota // also means invalid
  90. )
  91. // kv_engine/include/mcbp/protocol/datatype.h
  92. const (
  93. JSONDataType uint8 = 1
  94. SnappyDataType uint8 = 2
  95. XattrDataType uint8 = 4
  96. )
  97. type UprFeatures struct {
  98. Xattribute bool
  99. CompressionType int
  100. IncludeDeletionTime bool
  101. DcpPriority PriorityType
  102. EnableExpiry bool
  103. }
  104. /**
  105. * Used to handle multiple concurrent calls UprRequestStream() by UprFeed clients
  106. * It is expected that a client that calls UprRequestStream() more than once should issue
  107. * different "opaque" (version) numbers
  108. */
  109. type opaqueStreamMap map[uint16]*UprStream // opaque -> stream
  110. type vbStreamNegotiator struct {
  111. vbHandshakeMap map[uint16]opaqueStreamMap // vbno -> opaqueStreamMap
  112. mutex sync.RWMutex
  113. }
  114. func (negotiator *vbStreamNegotiator) initialize() {
  115. negotiator.mutex.Lock()
  116. negotiator.vbHandshakeMap = make(map[uint16]opaqueStreamMap)
  117. negotiator.mutex.Unlock()
  118. }
  119. func (negotiator *vbStreamNegotiator) registerRequest(vbno, opaque uint16, vbuuid, startSequence, endSequence uint64) {
  120. negotiator.mutex.Lock()
  121. defer negotiator.mutex.Unlock()
  122. var osMap opaqueStreamMap
  123. var ok bool
  124. if osMap, ok = negotiator.vbHandshakeMap[vbno]; !ok {
  125. osMap = make(opaqueStreamMap)
  126. negotiator.vbHandshakeMap[vbno] = osMap
  127. }
  128. if _, ok = osMap[opaque]; !ok {
  129. osMap[opaque] = &UprStream{
  130. Vbucket: vbno,
  131. Vbuuid: vbuuid,
  132. StartSeq: startSequence,
  133. EndSeq: endSequence,
  134. }
  135. }
  136. }
  137. func (negotiator *vbStreamNegotiator) getStreamsCntFromMap(vbno uint16) int {
  138. negotiator.mutex.RLock()
  139. defer negotiator.mutex.RUnlock()
  140. osmap, ok := negotiator.vbHandshakeMap[vbno]
  141. if !ok {
  142. return 0
  143. } else {
  144. return len(osmap)
  145. }
  146. }
  147. func (negotiator *vbStreamNegotiator) getStreamFromMap(vbno, opaque uint16) (*UprStream, error) {
  148. negotiator.mutex.RLock()
  149. defer negotiator.mutex.RUnlock()
  150. osmap, ok := negotiator.vbHandshakeMap[vbno]
  151. if !ok {
  152. return nil, fmt.Errorf("Error: stream for vb: %v does not exist", vbno)
  153. }
  154. stream, ok := osmap[opaque]
  155. if !ok {
  156. return nil, fmt.Errorf("Error: stream for vb: %v opaque: %v does not exist", vbno, opaque)
  157. }
  158. return stream, nil
  159. }
  160. func (negotiator *vbStreamNegotiator) deleteStreamFromMap(vbno, opaque uint16) {
  161. negotiator.mutex.Lock()
  162. defer negotiator.mutex.Unlock()
  163. osmap, ok := negotiator.vbHandshakeMap[vbno]
  164. if !ok {
  165. return
  166. }
  167. delete(osmap, opaque)
  168. if len(osmap) == 0 {
  169. delete(negotiator.vbHandshakeMap, vbno)
  170. }
  171. }
  172. func (negotiator *vbStreamNegotiator) handleStreamRequest(feed *UprFeed,
  173. headerBuf [gomemcached.HDR_LEN]byte, pktPtr *gomemcached.MCRequest, bytesReceivedFromDCP int,
  174. response *gomemcached.MCResponse) (*UprEvent, error) {
  175. var event *UprEvent
  176. if feed == nil || response == nil || pktPtr == nil {
  177. return nil, errors.New("Invalid inputs")
  178. }
  179. // Get Stream from negotiator map
  180. vbno := vbOpaque(response.Opaque)
  181. opaque := appOpaque(response.Opaque)
  182. stream, err := negotiator.getStreamFromMap(vbno, opaque)
  183. if err != nil {
  184. err = fmt.Errorf("Stream not found for vb %d: %#v", vbno, *pktPtr)
  185. logging.Errorf(err.Error())
  186. return nil, err
  187. }
  188. status, rb, flog, err := handleStreamRequest(response, headerBuf[:])
  189. if status == gomemcached.ROLLBACK {
  190. event = makeUprEvent(*pktPtr, stream, bytesReceivedFromDCP)
  191. event.Status = status
  192. // rollback stream
  193. logging.Infof("UPR_STREAMREQ with rollback %d for vb %d Failed: %v", rb, vbno, err)
  194. negotiator.deleteStreamFromMap(vbno, opaque)
  195. } else if status == gomemcached.SUCCESS {
  196. event = makeUprEvent(*pktPtr, stream, bytesReceivedFromDCP)
  197. event.Seqno = stream.StartSeq
  198. event.FailoverLog = flog
  199. event.Status = status
  200. feed.activateStream(vbno, opaque, stream)
  201. feed.negotiator.deleteStreamFromMap(vbno, opaque)
  202. logging.Infof("UPR_STREAMREQ for vb %d successful", vbno)
  203. } else if err != nil {
  204. logging.Errorf("UPR_STREAMREQ for vbucket %d erro %s", vbno, err.Error())
  205. event = &UprEvent{
  206. Opcode: gomemcached.UPR_STREAMREQ,
  207. Status: status,
  208. VBucket: vbno,
  209. Error: err,
  210. }
  211. negotiator.deleteStreamFromMap(vbno, opaque)
  212. }
  213. return event, nil
  214. }
  215. func (negotiator *vbStreamNegotiator) cleanUpVbStreams(vbno uint16) {
  216. negotiator.mutex.Lock()
  217. defer negotiator.mutex.Unlock()
  218. delete(negotiator.vbHandshakeMap, vbno)
  219. }
  220. // UprFeed represents an UPR feed. A feed contains a connection to a single
  221. // host and multiple vBuckets
  222. type UprFeed struct {
  223. // lock for feed.vbstreams
  224. muVbstreams sync.RWMutex
  225. C <-chan *UprEvent // Exported channel for receiving UPR events
  226. negotiator vbStreamNegotiator // Used for pre-vbstreams, concurrent vb stream negotiation
  227. vbstreams map[uint16]*UprStream // official live vb->stream mapping
  228. closer chan bool // closer
  229. conn *Client // connection to UPR producer
  230. Error error // error
  231. bytesRead uint64 // total bytes read on this connection
  232. toAckBytes uint32 // bytes client has read
  233. maxAckBytes uint32 // Max buffer control ack bytes
  234. stats UprStats // Stats for upr client
  235. transmitCh chan *gomemcached.MCRequest // transmit command channel
  236. transmitCl chan bool // closer channel for transmit go-routine
  237. // if flag is true, upr feed will use ack from client to determine whether/when to send ack to DCP
  238. // if flag is false, upr feed will track how many bytes it has sent to client
  239. // and use that to determine whether/when to send ack to DCP
  240. ackByClient bool
  241. feedState FeedState
  242. muFeedState sync.RWMutex
  243. }
  244. // Exported interface - to allow for mocking
  245. type UprFeedIface interface {
  246. Close()
  247. Closed() bool
  248. CloseStream(vbno, opaqueMSB uint16) error
  249. GetError() error
  250. GetUprStats() *UprStats
  251. ClientAck(event *UprEvent) error
  252. GetUprEventCh() <-chan *UprEvent
  253. StartFeed() error
  254. StartFeedWithConfig(datachan_len int) error
  255. UprOpen(name string, sequence uint32, bufSize uint32) error
  256. UprOpenWithXATTR(name string, sequence uint32, bufSize uint32) error
  257. UprOpenWithFeatures(name string, sequence uint32, bufSize uint32, features UprFeatures) (error, UprFeatures)
  258. UprRequestStream(vbno, opaqueMSB uint16, flags uint32, vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error
  259. // Set DCP priority on an existing DCP connection. The command is sent asynchronously without waiting for a response
  260. SetPriorityAsync(p PriorityType) error
  261. }
  262. type UprStats struct {
  263. TotalBytes uint64
  264. TotalMutation uint64
  265. TotalBufferAckSent uint64
  266. TotalSnapShot uint64
  267. }
  268. // FailoverLog containing vvuid and sequnce number
  269. type FailoverLog [][2]uint64
  270. // error codes
  271. var ErrorInvalidLog = errors.New("couchbase.errorInvalidLog")
  272. func (flogp *FailoverLog) Latest() (vbuuid, seqno uint64, err error) {
  273. if flogp != nil {
  274. flog := *flogp
  275. latest := flog[len(flog)-1]
  276. return latest[0], latest[1], nil
  277. }
  278. return vbuuid, seqno, ErrorInvalidLog
  279. }
  280. func makeUprEvent(rq gomemcached.MCRequest, stream *UprStream, bytesReceivedFromDCP int) *UprEvent {
  281. event := &UprEvent{
  282. Opcode: rq.Opcode,
  283. VBucket: stream.Vbucket,
  284. VBuuid: stream.Vbuuid,
  285. Key: rq.Key,
  286. Value: rq.Body,
  287. Cas: rq.Cas,
  288. ExtMeta: rq.ExtMeta,
  289. DataType: rq.DataType,
  290. }
  291. // set AckSize for events that need to be acked to DCP,
  292. // i.e., events with CommandCodes that need to be buffered in DCP
  293. if _, ok := gomemcached.BufferedCommandCodeMap[rq.Opcode]; ok {
  294. event.AckSize = uint32(bytesReceivedFromDCP)
  295. }
  296. // 16 LSBits are used by client library to encode vbucket number.
  297. // 16 MSBits are left for application to multiplex on opaque value.
  298. event.Opaque = appOpaque(rq.Opaque)
  299. if len(rq.Extras) >= uprMutationExtraLen &&
  300. event.Opcode == gomemcached.UPR_MUTATION {
  301. event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
  302. event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
  303. event.Flags = binary.BigEndian.Uint32(rq.Extras[16:20])
  304. event.Expiry = binary.BigEndian.Uint32(rq.Extras[20:24])
  305. event.LockTime = binary.BigEndian.Uint32(rq.Extras[24:28])
  306. event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[28:30])
  307. } else if len(rq.Extras) >= uprDeletetionWithDeletionTimeExtraLen &&
  308. event.Opcode == gomemcached.UPR_DELETION {
  309. event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
  310. event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
  311. event.Expiry = binary.BigEndian.Uint32(rq.Extras[16:20])
  312. } else if len(rq.Extras) >= uprDeletetionExtraLen &&
  313. event.Opcode == gomemcached.UPR_DELETION ||
  314. event.Opcode == gomemcached.UPR_EXPIRATION {
  315. event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
  316. event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
  317. event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[16:18])
  318. } else if len(rq.Extras) >= uprSnapshotExtraLen &&
  319. event.Opcode == gomemcached.UPR_SNAPSHOT {
  320. event.SnapstartSeq = binary.BigEndian.Uint64(rq.Extras[:8])
  321. event.SnapendSeq = binary.BigEndian.Uint64(rq.Extras[8:16])
  322. event.SnapshotType = binary.BigEndian.Uint32(rq.Extras[16:20])
  323. }
  324. return event
  325. }
  326. func (event *UprEvent) String() string {
  327. name := gomemcached.CommandNames[event.Opcode]
  328. if name == "" {
  329. name = fmt.Sprintf("#%d", event.Opcode)
  330. }
  331. return name
  332. }
  333. func (event *UprEvent) IsSnappyDataType() bool {
  334. return event.Opcode == gomemcached.UPR_MUTATION && (event.DataType&SnappyDataType > 0)
  335. }
  336. func (feed *UprFeed) sendCommands(mc *Client) {
  337. transmitCh := feed.transmitCh
  338. transmitCl := feed.transmitCl
  339. loop:
  340. for {
  341. select {
  342. case command := <-transmitCh:
  343. if err := mc.Transmit(command); err != nil {
  344. logging.Errorf("Failed to transmit command %s. Error %s", command.Opcode.String(), err.Error())
  345. // get feed to close and runFeed routine to exit
  346. feed.Close()
  347. break loop
  348. }
  349. case <-transmitCl:
  350. break loop
  351. }
  352. }
  353. // After sendCommands exits, write to transmitCh will block forever
  354. // when we write to transmitCh, e.g., at CloseStream(), we need to check feed closure to have an exit route
  355. logging.Infof("sendCommands exiting")
  356. }
  357. // Sets the specified stream as the connected stream for this vbno, and also cleans up negotiator
  358. func (feed *UprFeed) activateStream(vbno, opaque uint16, stream *UprStream) error {
  359. feed.muVbstreams.Lock()
  360. defer feed.muVbstreams.Unlock()
  361. // Set this stream as the officially connected stream for this vb
  362. stream.connected = true
  363. feed.vbstreams[vbno] = stream
  364. return nil
  365. }
  366. func (feed *UprFeed) cleanUpVbStream(vbno uint16) {
  367. feed.muVbstreams.Lock()
  368. defer feed.muVbstreams.Unlock()
  369. delete(feed.vbstreams, vbno)
  370. }
  371. // NewUprFeed creates a new UPR Feed.
  372. // TODO: Describe side-effects on bucket instance and its connection pool.
  373. func (mc *Client) NewUprFeed() (*UprFeed, error) {
  374. return mc.NewUprFeedWithConfig(false /*ackByClient*/)
  375. }
  376. func (mc *Client) NewUprFeedWithConfig(ackByClient bool) (*UprFeed, error) {
  377. feed := &UprFeed{
  378. conn: mc,
  379. closer: make(chan bool, 1),
  380. vbstreams: make(map[uint16]*UprStream),
  381. transmitCh: make(chan *gomemcached.MCRequest),
  382. transmitCl: make(chan bool),
  383. ackByClient: ackByClient,
  384. }
  385. feed.negotiator.initialize()
  386. go feed.sendCommands(mc)
  387. return feed, nil
  388. }
  389. func (mc *Client) NewUprFeedIface() (UprFeedIface, error) {
  390. return mc.NewUprFeed()
  391. }
  392. func (mc *Client) NewUprFeedWithConfigIface(ackByClient bool) (UprFeedIface, error) {
  393. return mc.NewUprFeedWithConfig(ackByClient)
  394. }
  395. func doUprOpen(mc *Client, name string, sequence uint32, features UprFeatures) error {
  396. rq := &gomemcached.MCRequest{
  397. Opcode: gomemcached.UPR_OPEN,
  398. Key: []byte(name),
  399. Opaque: getUprOpenCtrlOpaque(),
  400. }
  401. rq.Extras = make([]byte, 8)
  402. binary.BigEndian.PutUint32(rq.Extras[:4], sequence)
  403. // opens a producer type connection
  404. flags := gomemcached.DCP_PRODUCER
  405. if features.Xattribute {
  406. flags = flags | gomemcached.DCP_OPEN_INCLUDE_XATTRS
  407. }
  408. if features.IncludeDeletionTime {
  409. flags = flags | gomemcached.DCP_OPEN_INCLUDE_DELETE_TIMES
  410. }
  411. binary.BigEndian.PutUint32(rq.Extras[4:], flags)
  412. return sendMcRequestSync(mc, rq)
  413. }
  414. // Synchronously send a memcached request and wait for the response
  415. func sendMcRequestSync(mc *Client, req *gomemcached.MCRequest) error {
  416. if err := mc.Transmit(req); err != nil {
  417. return err
  418. }
  419. if res, err := mc.Receive(); err != nil {
  420. return err
  421. } else if req.Opcode != res.Opcode {
  422. return fmt.Errorf("unexpected #opcode sent %v received %v", req.Opcode, res.Opaque)
  423. } else if req.Opaque != res.Opaque {
  424. return fmt.Errorf("opaque mismatch, sent %v received %v", req.Opaque, res.Opaque)
  425. } else if res.Status != gomemcached.SUCCESS {
  426. return fmt.Errorf("error %v", res.Status)
  427. }
  428. return nil
  429. }
  430. // UprOpen to connect with a UPR producer.
  431. // Name: name of te UPR connection
  432. // sequence: sequence number for the connection
  433. // bufsize: max size of the application
  434. func (feed *UprFeed) UprOpen(name string, sequence uint32, bufSize uint32) error {
  435. var allFeaturesDisabled UprFeatures
  436. err, _ := feed.uprOpen(name, sequence, bufSize, allFeaturesDisabled)
  437. return err
  438. }
  439. // UprOpen with XATTR enabled.
  440. func (feed *UprFeed) UprOpenWithXATTR(name string, sequence uint32, bufSize uint32) error {
  441. var onlyXattrEnabled UprFeatures
  442. onlyXattrEnabled.Xattribute = true
  443. err, _ := feed.uprOpen(name, sequence, bufSize, onlyXattrEnabled)
  444. return err
  445. }
  446. func (feed *UprFeed) UprOpenWithFeatures(name string, sequence uint32, bufSize uint32, features UprFeatures) (error, UprFeatures) {
  447. return feed.uprOpen(name, sequence, bufSize, features)
  448. }
  449. func (feed *UprFeed) SetPriorityAsync(p PriorityType) error {
  450. if !feed.isOpen() {
  451. // do not send this command if upr feed is not yet open, otherwise it may interfere with
  452. // feed start up process, which relies on synchronous message exchange with DCP.
  453. return fmt.Errorf("Upr feed is not open. State=%v", feed.getState())
  454. }
  455. return feed.setPriority(p, false /*sync*/)
  456. }
  457. func (feed *UprFeed) setPriority(p PriorityType, sync bool) error {
  458. rq := &gomemcached.MCRequest{
  459. Opcode: gomemcached.UPR_CONTROL,
  460. Key: []byte("set_priority"),
  461. Body: []byte(p),
  462. Opaque: getUprOpenCtrlOpaque(),
  463. }
  464. if sync {
  465. return sendMcRequestSync(feed.conn, rq)
  466. } else {
  467. return feed.writeToTransmitCh(rq)
  468. }
  469. }
  470. func (feed *UprFeed) uprOpen(name string, sequence uint32, bufSize uint32, features UprFeatures) (err error, activatedFeatures UprFeatures) {
  471. mc := feed.conn
  472. // First set this to an invalid value to state that the method hasn't gotten to executing this control yet
  473. activatedFeatures.CompressionType = CompressionTypeEndMarker
  474. if err = doUprOpen(mc, name, sequence, features); err != nil {
  475. return
  476. }
  477. activatedFeatures.Xattribute = features.Xattribute
  478. // send a UPR control message to set the window size for the this connection
  479. if bufSize > 0 {
  480. rq := &gomemcached.MCRequest{
  481. Opcode: gomemcached.UPR_CONTROL,
  482. Key: []byte("connection_buffer_size"),
  483. Body: []byte(strconv.Itoa(int(bufSize))),
  484. Opaque: getUprOpenCtrlOpaque(),
  485. }
  486. err = sendMcRequestSync(feed.conn, rq)
  487. if err != nil {
  488. return
  489. }
  490. feed.maxAckBytes = uint32(bufferAckThreshold * float32(bufSize))
  491. }
  492. // enable noop and set noop interval
  493. rq := &gomemcached.MCRequest{
  494. Opcode: gomemcached.UPR_CONTROL,
  495. Key: []byte("enable_noop"),
  496. Body: []byte("true"),
  497. Opaque: getUprOpenCtrlOpaque(),
  498. }
  499. err = sendMcRequestSync(feed.conn, rq)
  500. if err != nil {
  501. return
  502. }
  503. rq = &gomemcached.MCRequest{
  504. Opcode: gomemcached.UPR_CONTROL,
  505. Key: []byte("set_noop_interval"),
  506. Body: []byte(strconv.Itoa(int(uprDefaultNoopInterval))),
  507. Opaque: getUprOpenCtrlOpaque(),
  508. }
  509. err = sendMcRequestSync(feed.conn, rq)
  510. if err != nil {
  511. return
  512. }
  513. if features.CompressionType == CompressionTypeSnappy {
  514. activatedFeatures.CompressionType = CompressionTypeNone
  515. rq = &gomemcached.MCRequest{
  516. Opcode: gomemcached.UPR_CONTROL,
  517. Key: []byte("force_value_compression"),
  518. Body: []byte("true"),
  519. Opaque: getUprOpenCtrlOpaque(),
  520. }
  521. err = sendMcRequestSync(feed.conn, rq)
  522. } else if features.CompressionType == CompressionTypeEndMarker {
  523. err = fmt.Errorf("UPR_CONTROL Failed - Invalid CompressionType: %v", features.CompressionType)
  524. }
  525. if err != nil {
  526. return
  527. }
  528. activatedFeatures.CompressionType = features.CompressionType
  529. if features.DcpPriority != PriorityDisabled {
  530. err = feed.setPriority(features.DcpPriority, true /*sync*/)
  531. if err == nil {
  532. activatedFeatures.DcpPriority = features.DcpPriority
  533. } else {
  534. return
  535. }
  536. }
  537. if features.EnableExpiry {
  538. rq := &gomemcached.MCRequest{
  539. Opcode: gomemcached.UPR_CONTROL,
  540. Key: []byte("enable_expiry_opcode"),
  541. Body: []byte("true"),
  542. Opaque: getUprOpenCtrlOpaque(),
  543. }
  544. err = sendMcRequestSync(feed.conn, rq)
  545. if err != nil {
  546. return
  547. }
  548. activatedFeatures.EnableExpiry = true
  549. }
  550. // everything is ok so far, set upr feed to open state
  551. feed.setOpen()
  552. return
  553. }
  554. // UprGetFailoverLog for given list of vbuckets.
  555. func (mc *Client) UprGetFailoverLog(
  556. vb []uint16) (map[uint16]*FailoverLog, error) {
  557. rq := &gomemcached.MCRequest{
  558. Opcode: gomemcached.UPR_FAILOVERLOG,
  559. Opaque: opaqueFailover,
  560. }
  561. var allFeaturesDisabled UprFeatures
  562. if err := doUprOpen(mc, "FailoverLog", 0, allFeaturesDisabled); err != nil {
  563. return nil, fmt.Errorf("UPR_OPEN Failed %s", err.Error())
  564. }
  565. failoverLogs := make(map[uint16]*FailoverLog)
  566. for _, vBucket := range vb {
  567. rq.VBucket = vBucket
  568. if err := mc.Transmit(rq); err != nil {
  569. return nil, err
  570. }
  571. res, err := mc.Receive()
  572. if err != nil {
  573. return nil, fmt.Errorf("failed to receive %s", err.Error())
  574. } else if res.Opcode != gomemcached.UPR_FAILOVERLOG || res.Status != gomemcached.SUCCESS {
  575. return nil, fmt.Errorf("unexpected #opcode %v", res.Opcode)
  576. }
  577. flog, err := parseFailoverLog(res.Body)
  578. if err != nil {
  579. return nil, fmt.Errorf("unable to parse failover logs for vb %d", vb)
  580. }
  581. failoverLogs[vBucket] = flog
  582. }
  583. return failoverLogs, nil
  584. }
  585. // UprRequestStream for a single vbucket.
  586. func (feed *UprFeed) UprRequestStream(vbno, opaqueMSB uint16, flags uint32,
  587. vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error {
  588. rq := &gomemcached.MCRequest{
  589. Opcode: gomemcached.UPR_STREAMREQ,
  590. VBucket: vbno,
  591. Opaque: composeOpaque(vbno, opaqueMSB),
  592. }
  593. rq.Extras = make([]byte, 48) // #Extras
  594. binary.BigEndian.PutUint32(rq.Extras[:4], flags)
  595. binary.BigEndian.PutUint32(rq.Extras[4:8], uint32(0))
  596. binary.BigEndian.PutUint64(rq.Extras[8:16], startSequence)
  597. binary.BigEndian.PutUint64(rq.Extras[16:24], endSequence)
  598. binary.BigEndian.PutUint64(rq.Extras[24:32], vuuid)
  599. binary.BigEndian.PutUint64(rq.Extras[32:40], snapStart)
  600. binary.BigEndian.PutUint64(rq.Extras[40:48], snapEnd)
  601. feed.negotiator.registerRequest(vbno, opaqueMSB, vuuid, startSequence, endSequence)
  602. // Any client that has ever called this method, regardless of return code,
  603. // should expect a potential UPR_CLOSESTREAM message due to this new map entry prior to Transmit.
  604. if err := feed.conn.Transmit(rq); err != nil {
  605. logging.Errorf("Error in StreamRequest %s", err.Error())
  606. // If an error occurs during transmit, then the UPRFeed will keep the stream
  607. // in the vbstreams map. This is to prevent nil lookup from any previously
  608. // sent stream requests.
  609. return err
  610. }
  611. return nil
  612. }
  613. // CloseStream for specified vbucket.
  614. func (feed *UprFeed) CloseStream(vbno, opaqueMSB uint16) error {
  615. err := feed.validateCloseStream(vbno)
  616. if err != nil {
  617. logging.Infof("CloseStream for %v has been skipped because of error %v", vbno, err)
  618. return err
  619. }
  620. closeStream := &gomemcached.MCRequest{
  621. Opcode: gomemcached.UPR_CLOSESTREAM,
  622. VBucket: vbno,
  623. Opaque: composeOpaque(vbno, opaqueMSB),
  624. }
  625. feed.writeToTransmitCh(closeStream)
  626. return nil
  627. }
  628. func (feed *UprFeed) GetUprEventCh() <-chan *UprEvent {
  629. return feed.C
  630. }
  631. func (feed *UprFeed) GetError() error {
  632. return feed.Error
  633. }
  634. func (feed *UprFeed) validateCloseStream(vbno uint16) error {
  635. feed.muVbstreams.RLock()
  636. nilVbStream := feed.vbstreams[vbno] == nil
  637. feed.muVbstreams.RUnlock()
  638. if nilVbStream && (feed.negotiator.getStreamsCntFromMap(vbno) == 0) {
  639. return fmt.Errorf("Stream for vb %d has not been requested", vbno)
  640. }
  641. return nil
  642. }
  643. func (feed *UprFeed) writeToTransmitCh(rq *gomemcached.MCRequest) error {
  644. // write to transmitCh may block forever if sendCommands has exited
  645. // check for feed closure to have an exit route in this case
  646. select {
  647. case <-feed.closer:
  648. errMsg := fmt.Sprintf("Abort sending request to transmitCh because feed has been closed. request=%v", rq)
  649. logging.Infof(errMsg)
  650. return errors.New(errMsg)
  651. case feed.transmitCh <- rq:
  652. }
  653. return nil
  654. }
  655. // StartFeed to start the upper feed.
  656. func (feed *UprFeed) StartFeed() error {
  657. return feed.StartFeedWithConfig(10)
  658. }
  659. func (feed *UprFeed) StartFeedWithConfig(datachan_len int) error {
  660. ch := make(chan *UprEvent, datachan_len)
  661. feed.C = ch
  662. go feed.runFeed(ch)
  663. return nil
  664. }
  665. func parseFailoverLog(body []byte) (*FailoverLog, error) {
  666. if len(body)%16 != 0 {
  667. err := fmt.Errorf("invalid body length %v, in failover-log", len(body))
  668. return nil, err
  669. }
  670. log := make(FailoverLog, len(body)/16)
  671. for i, j := 0, 0; i < len(body); i += 16 {
  672. vuuid := binary.BigEndian.Uint64(body[i : i+8])
  673. seqno := binary.BigEndian.Uint64(body[i+8 : i+16])
  674. log[j] = [2]uint64{vuuid, seqno}
  675. j++
  676. }
  677. return &log, nil
  678. }
  679. func handleStreamRequest(
  680. res *gomemcached.MCResponse,
  681. headerBuf []byte,
  682. ) (gomemcached.Status, uint64, *FailoverLog, error) {
  683. var rollback uint64
  684. var err error
  685. switch {
  686. case res.Status == gomemcached.ROLLBACK:
  687. logging.Infof("Rollback response. body=%v, headerBuf=%v\n", res.Body, headerBuf)
  688. rollback = binary.BigEndian.Uint64(res.Body)
  689. logging.Infof("Rollback seqno is %v for response with opaque %v\n", rollback, res.Opaque)
  690. return res.Status, rollback, nil, nil
  691. case res.Status != gomemcached.SUCCESS:
  692. err = fmt.Errorf("unexpected status %v for response with opaque %v", res.Status, res.Opaque)
  693. return res.Status, 0, nil, err
  694. }
  695. flog, err := parseFailoverLog(res.Body[:])
  696. return res.Status, rollback, flog, err
  697. }
  698. // generate stream end responses for all active vb streams
  699. func (feed *UprFeed) doStreamClose(ch chan *UprEvent) {
  700. feed.muVbstreams.RLock()
  701. uprEvents := make([]*UprEvent, len(feed.vbstreams))
  702. index := 0
  703. for vbno, stream := range feed.vbstreams {
  704. uprEvent := &UprEvent{
  705. VBucket: vbno,
  706. VBuuid: stream.Vbuuid,
  707. Opcode: gomemcached.UPR_STREAMEND,
  708. }
  709. uprEvents[index] = uprEvent
  710. index++
  711. }
  712. // release the lock before sending uprEvents to ch, which may block
  713. feed.muVbstreams.RUnlock()
  714. loop:
  715. for _, uprEvent := range uprEvents {
  716. select {
  717. case ch <- uprEvent:
  718. case <-feed.closer:
  719. logging.Infof("Feed has been closed. Aborting doStreamClose.")
  720. break loop
  721. }
  722. }
  723. }
  724. func (feed *UprFeed) runFeed(ch chan *UprEvent) {
  725. defer close(ch)
  726. var headerBuf [gomemcached.HDR_LEN]byte
  727. var pkt gomemcached.MCRequest
  728. var event *UprEvent
  729. mc := feed.conn.Hijack()
  730. uprStats := &feed.stats
  731. loop:
  732. for {
  733. select {
  734. case <-feed.closer:
  735. logging.Infof("Feed has been closed. Exiting.")
  736. break loop
  737. default:
  738. bytes, err := pkt.Receive(mc, headerBuf[:])
  739. if err != nil {
  740. logging.Errorf("Error in receive %s", err.Error())
  741. feed.Error = err
  742. // send all the stream close messages to the client
  743. feed.doStreamClose(ch)
  744. break loop
  745. } else {
  746. event = nil
  747. res := &gomemcached.MCResponse{
  748. Opcode: pkt.Opcode,
  749. Cas: pkt.Cas,
  750. Opaque: pkt.Opaque,
  751. Status: gomemcached.Status(pkt.VBucket),
  752. Extras: pkt.Extras,
  753. Key: pkt.Key,
  754. Body: pkt.Body,
  755. }
  756. vb := vbOpaque(pkt.Opaque)
  757. appOpaque := appOpaque(pkt.Opaque)
  758. uprStats.TotalBytes = uint64(bytes)
  759. feed.muVbstreams.RLock()
  760. stream := feed.vbstreams[vb]
  761. feed.muVbstreams.RUnlock()
  762. switch pkt.Opcode {
  763. case gomemcached.UPR_STREAMREQ:
  764. event, err = feed.negotiator.handleStreamRequest(feed, headerBuf, &pkt, bytes, res)
  765. if err != nil {
  766. logging.Infof(err.Error())
  767. break loop
  768. }
  769. case gomemcached.UPR_MUTATION,
  770. gomemcached.UPR_DELETION,
  771. gomemcached.UPR_EXPIRATION:
  772. if stream == nil {
  773. logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
  774. break loop
  775. }
  776. event = makeUprEvent(pkt, stream, bytes)
  777. uprStats.TotalMutation++
  778. case gomemcached.UPR_STREAMEND:
  779. if stream == nil {
  780. logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
  781. break loop
  782. }
  783. //stream has ended
  784. event = makeUprEvent(pkt, stream, bytes)
  785. logging.Infof("Stream Ended for vb %d", vb)
  786. feed.negotiator.deleteStreamFromMap(vb, appOpaque)
  787. feed.cleanUpVbStream(vb)
  788. case gomemcached.UPR_SNAPSHOT:
  789. if stream == nil {
  790. logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
  791. break loop
  792. }
  793. // snapshot marker
  794. event = makeUprEvent(pkt, stream, bytes)
  795. uprStats.TotalSnapShot++
  796. case gomemcached.UPR_FLUSH:
  797. if stream == nil {
  798. logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
  799. break loop
  800. }
  801. // special processing for flush ?
  802. event = makeUprEvent(pkt, stream, bytes)
  803. case gomemcached.UPR_CLOSESTREAM:
  804. if stream == nil {
  805. logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
  806. break loop
  807. }
  808. event = makeUprEvent(pkt, stream, bytes)
  809. event.Opcode = gomemcached.UPR_STREAMEND // opcode re-write !!
  810. logging.Infof("Stream Closed for vb %d StreamEnd simulated", vb)
  811. feed.negotiator.deleteStreamFromMap(vb, appOpaque)
  812. feed.cleanUpVbStream(vb)
  813. case gomemcached.UPR_ADDSTREAM:
  814. logging.Infof("Opcode %v not implemented", pkt.Opcode)
  815. case gomemcached.UPR_CONTROL, gomemcached.UPR_BUFFERACK:
  816. if res.Status != gomemcached.SUCCESS {
  817. logging.Infof("Opcode %v received status %d", pkt.Opcode.String(), res.Status)
  818. }
  819. case gomemcached.UPR_NOOP:
  820. // send a NOOP back
  821. noop := &gomemcached.MCResponse{
  822. Opcode: gomemcached.UPR_NOOP,
  823. Opaque: pkt.Opaque,
  824. }
  825. if err := feed.conn.TransmitResponse(noop); err != nil {
  826. logging.Warnf("failed to transmit command %s. Error %s", noop.Opcode.String(), err.Error())
  827. }
  828. default:
  829. logging.Infof("Recived an unknown response for vbucket %d", vb)
  830. }
  831. }
  832. if event != nil {
  833. select {
  834. case ch <- event:
  835. case <-feed.closer:
  836. logging.Infof("Feed has been closed. Skip sending events. Exiting.")
  837. break loop
  838. }
  839. feed.muVbstreams.RLock()
  840. l := len(feed.vbstreams)
  841. feed.muVbstreams.RUnlock()
  842. if event.Opcode == gomemcached.UPR_CLOSESTREAM && l == 0 {
  843. logging.Infof("No more streams")
  844. }
  845. }
  846. if !feed.ackByClient {
  847. // if client does not ack, do the ack check now
  848. feed.sendBufferAckIfNeeded(event)
  849. }
  850. }
  851. }
  852. // make sure that feed is closed before we signal transmitCl and exit runFeed
  853. feed.Close()
  854. close(feed.transmitCl)
  855. logging.Infof("runFeed exiting")
  856. }
  857. // Client, after completing processing of an UprEvent, need to call this API to notify UprFeed,
  858. // so that UprFeed can update its ack bytes stats and send ack to DCP if needed
  859. // Client needs to set ackByClient flag to true in NewUprFeedWithConfig() call as a prerequisite for this call to work
  860. // This API is not thread safe. Caller should NOT have more than one go rountine calling this API
  861. func (feed *UprFeed) ClientAck(event *UprEvent) error {
  862. if !feed.ackByClient {
  863. return errors.New("Upr feed does not have ackByclient flag set")
  864. }
  865. feed.sendBufferAckIfNeeded(event)
  866. return nil
  867. }
  868. // increment ack bytes if the event needs to be acked to DCP
  869. // send buffer ack if enough ack bytes have been accumulated
  870. func (feed *UprFeed) sendBufferAckIfNeeded(event *UprEvent) {
  871. if event == nil || event.AckSize == 0 {
  872. // this indicates that there is no need to ack to DCP
  873. return
  874. }
  875. totalBytes := feed.toAckBytes + event.AckSize
  876. if totalBytes > feed.maxAckBytes {
  877. feed.toAckBytes = 0
  878. feed.sendBufferAck(totalBytes)
  879. } else {
  880. feed.toAckBytes = totalBytes
  881. }
  882. }
  883. // send buffer ack to dcp
  884. func (feed *UprFeed) sendBufferAck(sendSize uint32) {
  885. bufferAck := &gomemcached.MCRequest{
  886. Opcode: gomemcached.UPR_BUFFERACK,
  887. }
  888. bufferAck.Extras = make([]byte, 4)
  889. binary.BigEndian.PutUint32(bufferAck.Extras[:4], uint32(sendSize))
  890. feed.writeToTransmitCh(bufferAck)
  891. feed.stats.TotalBufferAckSent++
  892. }
  893. func (feed *UprFeed) GetUprStats() *UprStats {
  894. return &feed.stats
  895. }
  896. func composeOpaque(vbno, opaqueMSB uint16) uint32 {
  897. return (uint32(opaqueMSB) << 16) | uint32(vbno)
  898. }
  899. func getUprOpenCtrlOpaque() uint32 {
  900. return atomic.AddUint32(&opaqueOpenCtrlWell, 1)
  901. }
  902. func appOpaque(opq32 uint32) uint16 {
  903. return uint16((opq32 & 0xFFFF0000) >> 16)
  904. }
  905. func vbOpaque(opq32 uint32) uint16 {
  906. return uint16(opq32 & 0xFFFF)
  907. }
  908. // Close this UprFeed.
  909. func (feed *UprFeed) Close() {
  910. feed.muFeedState.Lock()
  911. defer feed.muFeedState.Unlock()
  912. if feed.feedState != FeedStateClosed {
  913. close(feed.closer)
  914. feed.feedState = FeedStateClosed
  915. feed.negotiator.initialize()
  916. }
  917. }
  918. // check if the UprFeed has been closed
  919. func (feed *UprFeed) Closed() bool {
  920. feed.muFeedState.RLock()
  921. defer feed.muFeedState.RUnlock()
  922. return feed.feedState == FeedStateClosed
  923. }
  924. // set upr feed to opened state after initialization is done
  925. func (feed *UprFeed) setOpen() {
  926. feed.muFeedState.Lock()
  927. defer feed.muFeedState.Unlock()
  928. feed.feedState = FeedStateOpened
  929. }
  930. func (feed *UprFeed) isOpen() bool {
  931. feed.muFeedState.RLock()
  932. defer feed.muFeedState.RUnlock()
  933. return feed.feedState == FeedStateOpened
  934. }
  935. func (feed *UprFeed) getState() FeedState {
  936. feed.muFeedState.RLock()
  937. defer feed.muFeedState.RUnlock()
  938. return feed.feedState
  939. }