summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/couchbase/gomemcached/client/tap_feed.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/couchbase/gomemcached/client/tap_feed.go')
-rw-r--r--vendor/github.com/couchbase/gomemcached/client/tap_feed.go333
1 files changed, 333 insertions, 0 deletions
diff --git a/vendor/github.com/couchbase/gomemcached/client/tap_feed.go b/vendor/github.com/couchbase/gomemcached/client/tap_feed.go
new file mode 100644
index 0000000000..fd628c5de2
--- /dev/null
+++ b/vendor/github.com/couchbase/gomemcached/client/tap_feed.go
@@ -0,0 +1,333 @@
+package memcached
+
+import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "io"
+ "math"
+
+ "github.com/couchbase/gomemcached"
+ "github.com/couchbase/goutils/logging"
+)
+
+// TAP protocol docs: <http://www.couchbase.com/wiki/display/couchbase/TAP+Protocol>
+
+// TapOpcode is the tap operation type (found in TapEvent)
+type TapOpcode uint8
+
+// Tap opcode values.
+const (
+ TapBeginBackfill = TapOpcode(iota)
+ TapEndBackfill
+ TapMutation
+ TapDeletion
+ TapCheckpointStart
+ TapCheckpointEnd
+ tapEndStream
+)
+
+const tapMutationExtraLen = 16
+
+var tapOpcodeNames map[TapOpcode]string
+
+func init() {
+ tapOpcodeNames = map[TapOpcode]string{
+ TapBeginBackfill: "BeginBackfill",
+ TapEndBackfill: "EndBackfill",
+ TapMutation: "Mutation",
+ TapDeletion: "Deletion",
+ TapCheckpointStart: "TapCheckpointStart",
+ TapCheckpointEnd: "TapCheckpointEnd",
+ tapEndStream: "EndStream",
+ }
+}
+
+func (opcode TapOpcode) String() string {
+ name := tapOpcodeNames[opcode]
+ if name == "" {
+ name = fmt.Sprintf("#%d", opcode)
+ }
+ return name
+}
+
+// TapEvent is a TAP notification of an operation on the server.
+type TapEvent struct {
+ Opcode TapOpcode // Type of event
+ VBucket uint16 // VBucket this event applies to
+ Flags uint32 // Item flags
+ Expiry uint32 // Item expiration time
+ Key, Value []byte // Item key/value
+ Cas uint64
+}
+
+func makeTapEvent(req gomemcached.MCRequest) *TapEvent {
+ event := TapEvent{
+ VBucket: req.VBucket,
+ }
+ switch req.Opcode {
+ case gomemcached.TAP_MUTATION:
+ event.Opcode = TapMutation
+ event.Key = req.Key
+ event.Value = req.Body
+ event.Cas = req.Cas
+ case gomemcached.TAP_DELETE:
+ event.Opcode = TapDeletion
+ event.Key = req.Key
+ event.Cas = req.Cas
+ case gomemcached.TAP_CHECKPOINT_START:
+ event.Opcode = TapCheckpointStart
+ case gomemcached.TAP_CHECKPOINT_END:
+ event.Opcode = TapCheckpointEnd
+ case gomemcached.TAP_OPAQUE:
+ if len(req.Extras) < 8+4 {
+ return nil
+ }
+ switch op := int(binary.BigEndian.Uint32(req.Extras[8:])); op {
+ case gomemcached.TAP_OPAQUE_INITIAL_VBUCKET_STREAM:
+ event.Opcode = TapBeginBackfill
+ case gomemcached.TAP_OPAQUE_CLOSE_BACKFILL:
+ event.Opcode = TapEndBackfill
+ case gomemcached.TAP_OPAQUE_CLOSE_TAP_STREAM:
+ event.Opcode = tapEndStream
+ case gomemcached.TAP_OPAQUE_ENABLE_AUTO_NACK:
+ return nil
+ case gomemcached.TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC:
+ return nil
+ default:
+ logging.Infof("TapFeed: Ignoring TAP_OPAQUE/%d", op)
+ return nil // unknown opaque event
+ }
+ case gomemcached.NOOP:
+ return nil // ignore
+ default:
+ logging.Infof("TapFeed: Ignoring %s", req.Opcode)
+ return nil // unknown event
+ }
+
+ if len(req.Extras) >= tapMutationExtraLen &&
+ (event.Opcode == TapMutation || event.Opcode == TapDeletion) {
+
+ event.Flags = binary.BigEndian.Uint32(req.Extras[8:])
+ event.Expiry = binary.BigEndian.Uint32(req.Extras[12:])
+ }
+
+ return &event
+}
+
+func (event TapEvent) String() string {
+ switch event.Opcode {
+ case TapBeginBackfill, TapEndBackfill, TapCheckpointStart, TapCheckpointEnd:
+ return fmt.Sprintf("<TapEvent %s, vbucket=%d>",
+ event.Opcode, event.VBucket)
+ default:
+ return fmt.Sprintf("<TapEvent %s, key=%q (%d bytes) flags=%x, exp=%d>",
+ event.Opcode, event.Key, len(event.Value),
+ event.Flags, event.Expiry)
+ }
+}
+
+// TapArguments are parameters for requesting a TAP feed.
+//
+// Call DefaultTapArguments to get a default one.
+type TapArguments struct {
+ // Timestamp of oldest item to send.
+ //
+ // Use TapNoBackfill to suppress all past items.
+ Backfill uint64
+ // If set, server will disconnect after sending existing items.
+ Dump bool
+ // The indices of the vbuckets to watch; empty/nil to watch all.
+ VBuckets []uint16
+ // Transfers ownership of vbuckets during cluster rebalance.
+ Takeover bool
+ // If true, server will wait for client ACK after every notification.
+ SupportAck bool
+ // If true, client doesn't want values so server shouldn't send them.
+ KeysOnly bool
+ // If true, client wants the server to send checkpoint events.
+ Checkpoint bool
+ // Optional identifier to use for this client, to allow reconnects
+ ClientName string
+ // Registers this client (by name) till explicitly deregistered.
+ RegisteredClient bool
+}
+
+// Value for TapArguments.Backfill denoting that no past events at all
+// should be sent.
+const TapNoBackfill = math.MaxUint64
+
+// DefaultTapArguments returns a default set of parameter values to
+// pass to StartTapFeed.
+func DefaultTapArguments() TapArguments {
+ return TapArguments{
+ Backfill: TapNoBackfill,
+ }
+}
+
+func (args *TapArguments) flags() []byte {
+ var flags gomemcached.TapConnectFlag
+ if args.Backfill != 0 {
+ flags |= gomemcached.BACKFILL
+ }
+ if args.Dump {
+ flags |= gomemcached.DUMP
+ }
+ if len(args.VBuckets) > 0 {
+ flags |= gomemcached.LIST_VBUCKETS
+ }
+ if args.Takeover {
+ flags |= gomemcached.TAKEOVER_VBUCKETS
+ }
+ if args.SupportAck {
+ flags |= gomemcached.SUPPORT_ACK
+ }
+ if args.KeysOnly {
+ flags |= gomemcached.REQUEST_KEYS_ONLY
+ }
+ if args.Checkpoint {
+ flags |= gomemcached.CHECKPOINT
+ }
+ if args.RegisteredClient {
+ flags |= gomemcached.REGISTERED_CLIENT
+ }
+ encoded := make([]byte, 4)
+ binary.BigEndian.PutUint32(encoded, uint32(flags))
+ return encoded
+}
+
+func must(err error) {
+ if err != nil {
+ panic(err)
+ }
+}
+
+func (args *TapArguments) bytes() (rv []byte) {
+ buf := bytes.NewBuffer([]byte{})
+
+ if args.Backfill > 0 {
+ must(binary.Write(buf, binary.BigEndian, uint64(args.Backfill)))
+ }
+
+ if len(args.VBuckets) > 0 {
+ must(binary.Write(buf, binary.BigEndian, uint16(len(args.VBuckets))))
+ for i := 0; i < len(args.VBuckets); i++ {
+ must(binary.Write(buf, binary.BigEndian, uint16(args.VBuckets[i])))
+ }
+ }
+ return buf.Bytes()
+}
+
+// TapFeed represents a stream of events from a server.
+type TapFeed struct {
+ C <-chan TapEvent
+ Error error
+ closer chan bool
+}
+
+// StartTapFeed starts a TAP feed on a client connection.
+//
+// The events can be read from the returned channel. The connection
+// can no longer be used for other purposes; it's now reserved for
+// receiving the TAP messages. To stop receiving events, close the
+// client connection.
+func (mc *Client) StartTapFeed(args TapArguments) (*TapFeed, error) {
+ rq := &gomemcached.MCRequest{
+ Opcode: gomemcached.TAP_CONNECT,
+ Key: []byte(args.ClientName),
+ Extras: args.flags(),
+ Body: args.bytes()}
+
+ err := mc.Transmit(rq)
+ if err != nil {
+ return nil, err
+ }
+
+ ch := make(chan TapEvent)
+ feed := &TapFeed{
+ C: ch,
+ closer: make(chan bool),
+ }
+ go mc.runFeed(ch, feed)
+ return feed, nil
+}
+
+// TapRecvHook is called after every incoming tap packet is received.
+var TapRecvHook func(*gomemcached.MCRequest, int, error)
+
+// Internal goroutine that reads from the socket and writes events to
+// the channel
+func (mc *Client) runFeed(ch chan TapEvent, feed *TapFeed) {
+ defer close(ch)
+ var headerBuf [gomemcached.HDR_LEN]byte
+loop:
+ for {
+ // Read the next request from the server.
+ //
+ // (Can't call mc.Receive() because it reads a
+ // _response_ not a request.)
+ var pkt gomemcached.MCRequest
+ n, err := pkt.Receive(mc.conn, headerBuf[:])
+ if TapRecvHook != nil {
+ TapRecvHook(&pkt, n, err)
+ }
+
+ if err != nil {
+ if err != io.EOF {
+ feed.Error = err
+ }
+ break loop
+ }
+
+ //logging.Infof("** TapFeed received %#v : %q", pkt, pkt.Body)
+
+ if pkt.Opcode == gomemcached.TAP_CONNECT {
+ // This is not an event from the server; it's
+ // an error response to my connect request.
+ feed.Error = fmt.Errorf("tap connection failed: %s", pkt.Body)
+ break loop
+ }
+
+ event := makeTapEvent(pkt)
+ if event != nil {
+ if event.Opcode == tapEndStream {
+ break loop
+ }
+
+ select {
+ case ch <- *event:
+ case <-feed.closer:
+ break loop
+ }
+ }
+
+ if len(pkt.Extras) >= 4 {
+ reqFlags := binary.BigEndian.Uint16(pkt.Extras[2:])
+ if reqFlags&gomemcached.TAP_ACK != 0 {
+ if _, err := mc.sendAck(&pkt); err != nil {
+ feed.Error = err
+ break loop
+ }
+ }
+ }
+ }
+ if err := mc.Close(); err != nil {
+ logging.Errorf("Error closing memcached client: %v", err)
+ }
+}
+
+func (mc *Client) sendAck(pkt *gomemcached.MCRequest) (int, error) {
+ res := gomemcached.MCResponse{
+ Opcode: pkt.Opcode,
+ Opaque: pkt.Opaque,
+ Status: gomemcached.SUCCESS,
+ }
+ return res.Transmit(mc.conn)
+}
+
+// Close terminates a TapFeed.
+//
+// Call this if you stop using a TapFeed before its channel ends.
+func (feed *TapFeed) Close() {
+ close(feed.closer)
+}