diff options
Diffstat (limited to 'vendor/github.com/go-redis/redis/pubsub.go')
-rw-r--r-- | vendor/github.com/go-redis/redis/pubsub.go | 473 |
1 files changed, 473 insertions, 0 deletions
diff --git a/vendor/github.com/go-redis/redis/pubsub.go b/vendor/github.com/go-redis/redis/pubsub.go new file mode 100644 index 0000000000..0afb47cda3 --- /dev/null +++ b/vendor/github.com/go-redis/redis/pubsub.go @@ -0,0 +1,473 @@ +package redis + +import ( + "errors" + "fmt" + "sync" + "time" + + "github.com/go-redis/redis/internal" + "github.com/go-redis/redis/internal/pool" + "github.com/go-redis/redis/internal/proto" +) + +var errPingTimeout = errors.New("redis: ping timeout") + +// PubSub implements Pub/Sub commands bas described in +// http://redis.io/topics/pubsub. Message receiving is NOT safe +// for concurrent use by multiple goroutines. +// +// PubSub automatically reconnects to Redis Server and resubscribes +// to the channels in case of network errors. +type PubSub struct { + opt *Options + + newConn func([]string) (*pool.Conn, error) + closeConn func(*pool.Conn) error + + mu sync.Mutex + cn *pool.Conn + channels map[string]struct{} + patterns map[string]struct{} + closed bool + exit chan struct{} + + cmd *Cmd + + chOnce sync.Once + ch chan *Message + ping chan struct{} +} + +func (c *PubSub) init() { + c.exit = make(chan struct{}) +} + +func (c *PubSub) conn() (*pool.Conn, error) { + c.mu.Lock() + cn, err := c._conn(nil) + c.mu.Unlock() + return cn, err +} + +func (c *PubSub) _conn(newChannels []string) (*pool.Conn, error) { + if c.closed { + return nil, pool.ErrClosed + } + if c.cn != nil { + return c.cn, nil + } + + channels := mapKeys(c.channels) + channels = append(channels, newChannels...) + + cn, err := c.newConn(channels) + if err != nil { + return nil, err + } + + if err := c.resubscribe(cn); err != nil { + _ = c.closeConn(cn) + return nil, err + } + + c.cn = cn + return cn, nil +} + +func (c *PubSub) writeCmd(cn *pool.Conn, cmd Cmder) error { + return cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return writeCmd(wr, cmd) + }) +} + +func (c *PubSub) resubscribe(cn *pool.Conn) error { + var firstErr error + + if len(c.channels) > 0 { + err := c._subscribe(cn, "subscribe", mapKeys(c.channels)) + if err != nil && firstErr == nil { + firstErr = err + } + } + + if len(c.patterns) > 0 { + err := c._subscribe(cn, "psubscribe", mapKeys(c.patterns)) + if err != nil && firstErr == nil { + firstErr = err + } + } + + return firstErr +} + +func mapKeys(m map[string]struct{}) []string { + s := make([]string, len(m)) + i := 0 + for k := range m { + s[i] = k + i++ + } + return s +} + +func (c *PubSub) _subscribe( + cn *pool.Conn, redisCmd string, channels []string, +) error { + args := make([]interface{}, 0, 1+len(channels)) + args = append(args, redisCmd) + for _, channel := range channels { + args = append(args, channel) + } + cmd := NewSliceCmd(args...) + return c.writeCmd(cn, cmd) +} + +func (c *PubSub) releaseConn(cn *pool.Conn, err error, allowTimeout bool) { + c.mu.Lock() + c._releaseConn(cn, err, allowTimeout) + c.mu.Unlock() +} + +func (c *PubSub) _releaseConn(cn *pool.Conn, err error, allowTimeout bool) { + if c.cn != cn { + return + } + if internal.IsBadConn(err, allowTimeout) { + c._reconnect(err) + } +} + +func (c *PubSub) _reconnect(reason error) { + _ = c._closeTheCn(reason) + _, _ = c._conn(nil) +} + +func (c *PubSub) _closeTheCn(reason error) error { + if c.cn == nil { + return nil + } + if !c.closed { + internal.Logf("redis: discarding bad PubSub connection: %s", reason) + } + err := c.closeConn(c.cn) + c.cn = nil + return err +} + +func (c *PubSub) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.closed { + return pool.ErrClosed + } + c.closed = true + close(c.exit) + + err := c._closeTheCn(pool.ErrClosed) + return err +} + +// Subscribe the client to the specified channels. It returns +// empty subscription if there are no channels. +func (c *PubSub) Subscribe(channels ...string) error { + c.mu.Lock() + defer c.mu.Unlock() + + err := c.subscribe("subscribe", channels...) + if c.channels == nil { + c.channels = make(map[string]struct{}) + } + for _, s := range channels { + c.channels[s] = struct{}{} + } + return err +} + +// PSubscribe the client to the given patterns. It returns +// empty subscription if there are no patterns. +func (c *PubSub) PSubscribe(patterns ...string) error { + c.mu.Lock() + defer c.mu.Unlock() + + err := c.subscribe("psubscribe", patterns...) + if c.patterns == nil { + c.patterns = make(map[string]struct{}) + } + for _, s := range patterns { + c.patterns[s] = struct{}{} + } + return err +} + +// Unsubscribe the client from the given channels, or from all of +// them if none is given. +func (c *PubSub) Unsubscribe(channels ...string) error { + c.mu.Lock() + defer c.mu.Unlock() + + for _, channel := range channels { + delete(c.channels, channel) + } + err := c.subscribe("unsubscribe", channels...) + return err +} + +// PUnsubscribe the client from the given patterns, or from all of +// them if none is given. +func (c *PubSub) PUnsubscribe(patterns ...string) error { + c.mu.Lock() + defer c.mu.Unlock() + + for _, pattern := range patterns { + delete(c.patterns, pattern) + } + err := c.subscribe("punsubscribe", patterns...) + return err +} + +func (c *PubSub) subscribe(redisCmd string, channels ...string) error { + cn, err := c._conn(channels) + if err != nil { + return err + } + + err = c._subscribe(cn, redisCmd, channels) + c._releaseConn(cn, err, false) + return err +} + +func (c *PubSub) Ping(payload ...string) error { + args := []interface{}{"ping"} + if len(payload) == 1 { + args = append(args, payload[0]) + } + cmd := NewCmd(args...) + + cn, err := c.conn() + if err != nil { + return err + } + + err = c.writeCmd(cn, cmd) + c.releaseConn(cn, err, false) + return err +} + +// Subscription received after a successful subscription to channel. +type Subscription struct { + // Can be "subscribe", "unsubscribe", "psubscribe" or "punsubscribe". + Kind string + // Channel name we have subscribed to. + Channel string + // Number of channels we are currently subscribed to. + Count int +} + +func (m *Subscription) String() string { + return fmt.Sprintf("%s: %s", m.Kind, m.Channel) +} + +// Message received as result of a PUBLISH command issued by another client. +type Message struct { + Channel string + Pattern string + Payload string +} + +func (m *Message) String() string { + return fmt.Sprintf("Message<%s: %s>", m.Channel, m.Payload) +} + +// Pong received as result of a PING command issued by another client. +type Pong struct { + Payload string +} + +func (p *Pong) String() string { + if p.Payload != "" { + return fmt.Sprintf("Pong<%s>", p.Payload) + } + return "Pong" +} + +func (c *PubSub) newMessage(reply interface{}) (interface{}, error) { + switch reply := reply.(type) { + case string: + return &Pong{ + Payload: reply, + }, nil + case []interface{}: + switch kind := reply[0].(string); kind { + case "subscribe", "unsubscribe", "psubscribe", "punsubscribe": + return &Subscription{ + Kind: kind, + Channel: reply[1].(string), + Count: int(reply[2].(int64)), + }, nil + case "message": + return &Message{ + Channel: reply[1].(string), + Payload: reply[2].(string), + }, nil + case "pmessage": + return &Message{ + Pattern: reply[1].(string), + Channel: reply[2].(string), + Payload: reply[3].(string), + }, nil + case "pong": + return &Pong{ + Payload: reply[1].(string), + }, nil + default: + return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind) + } + default: + return nil, fmt.Errorf("redis: unsupported pubsub message: %#v", reply) + } +} + +// ReceiveTimeout acts like Receive but returns an error if message +// is not received in time. This is low-level API and in most cases +// Channel should be used instead. +func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) { + if c.cmd == nil { + c.cmd = NewCmd() + } + + cn, err := c.conn() + if err != nil { + return nil, err + } + + err = cn.WithReader(timeout, func(rd *proto.Reader) error { + return c.cmd.readReply(rd) + }) + + c.releaseConn(cn, err, timeout > 0) + if err != nil { + return nil, err + } + + return c.newMessage(c.cmd.Val()) +} + +// Receive returns a message as a Subscription, Message, Pong or error. +// See PubSub example for details. This is low-level API and in most cases +// Channel should be used instead. +func (c *PubSub) Receive() (interface{}, error) { + return c.ReceiveTimeout(0) +} + +// ReceiveMessage returns a Message or error ignoring Subscription and Pong +// messages. This is low-level API and in most cases Channel should be used +// instead. +func (c *PubSub) ReceiveMessage() (*Message, error) { + for { + msg, err := c.Receive() + if err != nil { + return nil, err + } + + switch msg := msg.(type) { + case *Subscription: + // Ignore. + case *Pong: + // Ignore. + case *Message: + return msg, nil + default: + err := fmt.Errorf("redis: unknown message: %T", msg) + return nil, err + } + } +} + +// Channel returns a Go channel for concurrently receiving messages. +// It periodically sends Ping messages to test connection health. +// The channel is closed with PubSub. Receive* APIs can not be used +// after channel is created. +func (c *PubSub) Channel() <-chan *Message { + c.chOnce.Do(c.initChannel) + return c.ch +} + +func (c *PubSub) initChannel() { + c.ch = make(chan *Message, 100) + c.ping = make(chan struct{}, 10) + + go func() { + var errCount int + for { + msg, err := c.Receive() + if err != nil { + if err == pool.ErrClosed { + close(c.ch) + return + } + if errCount > 0 { + time.Sleep(c.retryBackoff(errCount)) + } + errCount++ + continue + } + errCount = 0 + + // Any message is as good as a ping. + select { + case c.ping <- struct{}{}: + default: + } + + switch msg := msg.(type) { + case *Subscription: + // Ignore. + case *Pong: + // Ignore. + case *Message: + c.ch <- msg + default: + internal.Logf("redis: unknown message: %T", msg) + } + } + }() + + go func() { + const timeout = 5 * time.Second + + timer := time.NewTimer(timeout) + timer.Stop() + + healthy := true + for { + timer.Reset(timeout) + select { + case <-c.ping: + healthy = true + if !timer.Stop() { + <-timer.C + } + case <-timer.C: + pingErr := c.Ping() + if healthy { + healthy = false + } else { + if pingErr == nil { + pingErr = errPingTimeout + } + c.mu.Lock() + c._reconnect(pingErr) + c.mu.Unlock() + } + case <-c.exit: + return + } + } + }() +} + +func (c *PubSub) retryBackoff(attempt int) time.Duration { + return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff) +} |