summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/go-redis/redis/v8/ring.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/go-redis/redis/v8/ring.go')
-rw-r--r--vendor/github.com/go-redis/redis/v8/ring.go731
1 files changed, 731 insertions, 0 deletions
diff --git a/vendor/github.com/go-redis/redis/v8/ring.go b/vendor/github.com/go-redis/redis/v8/ring.go
new file mode 100644
index 0000000000..34d05f35ae
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/v8/ring.go
@@ -0,0 +1,731 @@
+package redis
+
+import (
+ "context"
+ "crypto/tls"
+ "errors"
+ "fmt"
+ "net"
+ "strconv"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/cespare/xxhash/v2"
+ "github.com/dgryski/go-rendezvous"
+ "github.com/go-redis/redis/v8/internal"
+ "github.com/go-redis/redis/v8/internal/hashtag"
+ "github.com/go-redis/redis/v8/internal/pool"
+ "github.com/go-redis/redis/v8/internal/rand"
+)
+
+var errRingShardsDown = errors.New("redis: all ring shards are down")
+
+//------------------------------------------------------------------------------
+
+type ConsistentHash interface {
+ Get(string) string
+}
+
+type rendezvousWrapper struct {
+ *rendezvous.Rendezvous
+}
+
+func (w rendezvousWrapper) Get(key string) string {
+ return w.Lookup(key)
+}
+
+func newRendezvous(shards []string) ConsistentHash {
+ return rendezvousWrapper{rendezvous.New(shards, xxhash.Sum64String)}
+}
+
+//------------------------------------------------------------------------------
+
+// RingOptions are used to configure a ring client and should be
+// passed to NewRing.
+type RingOptions struct {
+ // Map of name => host:port addresses of ring shards.
+ Addrs map[string]string
+
+ // NewClient creates a shard client with provided name and options.
+ NewClient func(name string, opt *Options) *Client
+
+ // Frequency of PING commands sent to check shards availability.
+ // Shard is considered down after 3 subsequent failed checks.
+ HeartbeatFrequency time.Duration
+
+ // NewConsistentHash returns a consistent hash that is used
+ // to distribute keys across the shards.
+ //
+ // See https://medium.com/@dgryski/consistent-hashing-algorithmic-tradeoffs-ef6b8e2fcae8
+ // for consistent hashing algorithmic tradeoffs.
+ NewConsistentHash func(shards []string) ConsistentHash
+
+ // Following options are copied from Options struct.
+
+ Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
+ OnConnect func(ctx context.Context, cn *Conn) error
+
+ Username string
+ Password string
+ DB int
+
+ MaxRetries int
+ MinRetryBackoff time.Duration
+ MaxRetryBackoff time.Duration
+
+ DialTimeout time.Duration
+ ReadTimeout time.Duration
+ WriteTimeout time.Duration
+
+ PoolSize int
+ MinIdleConns int
+ MaxConnAge time.Duration
+ PoolTimeout time.Duration
+ IdleTimeout time.Duration
+ IdleCheckFrequency time.Duration
+
+ TLSConfig *tls.Config
+ Limiter Limiter
+}
+
+func (opt *RingOptions) init() {
+ if opt.NewClient == nil {
+ opt.NewClient = func(name string, opt *Options) *Client {
+ return NewClient(opt)
+ }
+ }
+
+ if opt.HeartbeatFrequency == 0 {
+ opt.HeartbeatFrequency = 500 * time.Millisecond
+ }
+
+ if opt.NewConsistentHash == nil {
+ opt.NewConsistentHash = newRendezvous
+ }
+
+ if opt.MaxRetries == -1 {
+ opt.MaxRetries = 0
+ } else if opt.MaxRetries == 0 {
+ opt.MaxRetries = 3
+ }
+ switch opt.MinRetryBackoff {
+ case -1:
+ opt.MinRetryBackoff = 0
+ case 0:
+ opt.MinRetryBackoff = 8 * time.Millisecond
+ }
+ switch opt.MaxRetryBackoff {
+ case -1:
+ opt.MaxRetryBackoff = 0
+ case 0:
+ opt.MaxRetryBackoff = 512 * time.Millisecond
+ }
+}
+
+func (opt *RingOptions) clientOptions() *Options {
+ return &Options{
+ Dialer: opt.Dialer,
+ OnConnect: opt.OnConnect,
+
+ Username: opt.Username,
+ Password: opt.Password,
+ DB: opt.DB,
+
+ MaxRetries: -1,
+
+ DialTimeout: opt.DialTimeout,
+ ReadTimeout: opt.ReadTimeout,
+ WriteTimeout: opt.WriteTimeout,
+
+ PoolSize: opt.PoolSize,
+ MinIdleConns: opt.MinIdleConns,
+ MaxConnAge: opt.MaxConnAge,
+ PoolTimeout: opt.PoolTimeout,
+ IdleTimeout: opt.IdleTimeout,
+ IdleCheckFrequency: opt.IdleCheckFrequency,
+
+ TLSConfig: opt.TLSConfig,
+ Limiter: opt.Limiter,
+ }
+}
+
+//------------------------------------------------------------------------------
+
+type ringShard struct {
+ Client *Client
+ down int32
+}
+
+func newRingShard(opt *RingOptions, name, addr string) *ringShard {
+ clopt := opt.clientOptions()
+ clopt.Addr = addr
+
+ return &ringShard{
+ Client: opt.NewClient(name, clopt),
+ }
+}
+
+func (shard *ringShard) String() string {
+ var state string
+ if shard.IsUp() {
+ state = "up"
+ } else {
+ state = "down"
+ }
+ return fmt.Sprintf("%s is %s", shard.Client, state)
+}
+
+func (shard *ringShard) IsDown() bool {
+ const threshold = 3
+ return atomic.LoadInt32(&shard.down) >= threshold
+}
+
+func (shard *ringShard) IsUp() bool {
+ return !shard.IsDown()
+}
+
+// Vote votes to set shard state and returns true if state was changed.
+func (shard *ringShard) Vote(up bool) bool {
+ if up {
+ changed := shard.IsDown()
+ atomic.StoreInt32(&shard.down, 0)
+ return changed
+ }
+
+ if shard.IsDown() {
+ return false
+ }
+
+ atomic.AddInt32(&shard.down, 1)
+ return shard.IsDown()
+}
+
+//------------------------------------------------------------------------------
+
+type ringShards struct {
+ opt *RingOptions
+
+ mu sync.RWMutex
+ hash ConsistentHash
+ shards map[string]*ringShard // read only
+ list []*ringShard // read only
+ numShard int
+ closed bool
+}
+
+func newRingShards(opt *RingOptions) *ringShards {
+ shards := make(map[string]*ringShard, len(opt.Addrs))
+ list := make([]*ringShard, 0, len(shards))
+
+ for name, addr := range opt.Addrs {
+ shard := newRingShard(opt, name, addr)
+ shards[name] = shard
+
+ list = append(list, shard)
+ }
+
+ c := &ringShards{
+ opt: opt,
+
+ shards: shards,
+ list: list,
+ }
+ c.rebalance()
+
+ return c
+}
+
+func (c *ringShards) List() []*ringShard {
+ var list []*ringShard
+
+ c.mu.RLock()
+ if !c.closed {
+ list = c.list
+ }
+ c.mu.RUnlock()
+
+ return list
+}
+
+func (c *ringShards) Hash(key string) string {
+ key = hashtag.Key(key)
+
+ var hash string
+
+ c.mu.RLock()
+ if c.numShard > 0 {
+ hash = c.hash.Get(key)
+ }
+ c.mu.RUnlock()
+
+ return hash
+}
+
+func (c *ringShards) GetByKey(key string) (*ringShard, error) {
+ key = hashtag.Key(key)
+
+ c.mu.RLock()
+
+ if c.closed {
+ c.mu.RUnlock()
+ return nil, pool.ErrClosed
+ }
+
+ if c.numShard == 0 {
+ c.mu.RUnlock()
+ return nil, errRingShardsDown
+ }
+
+ hash := c.hash.Get(key)
+ if hash == "" {
+ c.mu.RUnlock()
+ return nil, errRingShardsDown
+ }
+
+ shard := c.shards[hash]
+ c.mu.RUnlock()
+
+ return shard, nil
+}
+
+func (c *ringShards) GetByName(shardName string) (*ringShard, error) {
+ if shardName == "" {
+ return c.Random()
+ }
+
+ c.mu.RLock()
+ shard := c.shards[shardName]
+ c.mu.RUnlock()
+ return shard, nil
+}
+
+func (c *ringShards) Random() (*ringShard, error) {
+ return c.GetByKey(strconv.Itoa(rand.Int()))
+}
+
+// heartbeat monitors state of each shard in the ring.
+func (c *ringShards) Heartbeat(frequency time.Duration) {
+ ticker := time.NewTicker(frequency)
+ defer ticker.Stop()
+
+ ctx := context.Background()
+ for range ticker.C {
+ var rebalance bool
+
+ for _, shard := range c.List() {
+ err := shard.Client.Ping(ctx).Err()
+ isUp := err == nil || err == pool.ErrPoolTimeout
+ if shard.Vote(isUp) {
+ internal.Logger.Printf(context.Background(), "ring shard state changed: %s", shard)
+ rebalance = true
+ }
+ }
+
+ if rebalance {
+ c.rebalance()
+ }
+ }
+}
+
+// rebalance removes dead shards from the Ring.
+func (c *ringShards) rebalance() {
+ c.mu.RLock()
+ shards := c.shards
+ c.mu.RUnlock()
+
+ liveShards := make([]string, 0, len(shards))
+
+ for name, shard := range shards {
+ if shard.IsUp() {
+ liveShards = append(liveShards, name)
+ }
+ }
+
+ hash := c.opt.NewConsistentHash(liveShards)
+
+ c.mu.Lock()
+ c.hash = hash
+ c.numShard = len(liveShards)
+ c.mu.Unlock()
+}
+
+func (c *ringShards) Len() int {
+ c.mu.RLock()
+ l := c.numShard
+ c.mu.RUnlock()
+ return l
+}
+
+func (c *ringShards) Close() error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ if c.closed {
+ return nil
+ }
+ c.closed = true
+
+ var firstErr error
+ for _, shard := range c.shards {
+ if err := shard.Client.Close(); err != nil && firstErr == nil {
+ firstErr = err
+ }
+ }
+ c.hash = nil
+ c.shards = nil
+ c.list = nil
+
+ return firstErr
+}
+
+//------------------------------------------------------------------------------
+
+type ring struct {
+ opt *RingOptions
+ shards *ringShards
+ cmdsInfoCache *cmdsInfoCache //nolint:structcheck
+}
+
+// Ring is a Redis client that uses consistent hashing to distribute
+// keys across multiple Redis servers (shards). It's safe for
+// concurrent use by multiple goroutines.
+//
+// Ring monitors the state of each shard and removes dead shards from
+// the ring. When a shard comes online it is added back to the ring. This
+// gives you maximum availability and partition tolerance, but no
+// consistency between different shards or even clients. Each client
+// uses shards that are available to the client and does not do any
+// coordination when shard state is changed.
+//
+// Ring should be used when you need multiple Redis servers for caching
+// and can tolerate losing data when one of the servers dies.
+// Otherwise you should use Redis Cluster.
+type Ring struct {
+ *ring
+ cmdable
+ hooks
+ ctx context.Context
+}
+
+func NewRing(opt *RingOptions) *Ring {
+ opt.init()
+
+ ring := Ring{
+ ring: &ring{
+ opt: opt,
+ shards: newRingShards(opt),
+ },
+ ctx: context.Background(),
+ }
+
+ ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
+ ring.cmdable = ring.Process
+
+ go ring.shards.Heartbeat(opt.HeartbeatFrequency)
+
+ return &ring
+}
+
+func (c *Ring) Context() context.Context {
+ return c.ctx
+}
+
+func (c *Ring) WithContext(ctx context.Context) *Ring {
+ if ctx == nil {
+ panic("nil context")
+ }
+ clone := *c
+ clone.cmdable = clone.Process
+ clone.hooks.lock()
+ clone.ctx = ctx
+ return &clone
+}
+
+// Do creates a Cmd from the args and processes the cmd.
+func (c *Ring) Do(ctx context.Context, args ...interface{}) *Cmd {
+ cmd := NewCmd(ctx, args...)
+ _ = c.Process(ctx, cmd)
+ return cmd
+}
+
+func (c *Ring) Process(ctx context.Context, cmd Cmder) error {
+ return c.hooks.process(ctx, cmd, c.process)
+}
+
+// Options returns read-only Options that were used to create the client.
+func (c *Ring) Options() *RingOptions {
+ return c.opt
+}
+
+func (c *Ring) retryBackoff(attempt int) time.Duration {
+ return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
+}
+
+// PoolStats returns accumulated connection pool stats.
+func (c *Ring) PoolStats() *PoolStats {
+ shards := c.shards.List()
+ var acc PoolStats
+ for _, shard := range shards {
+ s := shard.Client.connPool.Stats()
+ acc.Hits += s.Hits
+ acc.Misses += s.Misses
+ acc.Timeouts += s.Timeouts
+ acc.TotalConns += s.TotalConns
+ acc.IdleConns += s.IdleConns
+ }
+ return &acc
+}
+
+// Len returns the current number of shards in the ring.
+func (c *Ring) Len() int {
+ return c.shards.Len()
+}
+
+// Subscribe subscribes the client to the specified channels.
+func (c *Ring) Subscribe(ctx context.Context, channels ...string) *PubSub {
+ if len(channels) == 0 {
+ panic("at least one channel is required")
+ }
+
+ shard, err := c.shards.GetByKey(channels[0])
+ if err != nil {
+ // TODO: return PubSub with sticky error
+ panic(err)
+ }
+ return shard.Client.Subscribe(ctx, channels...)
+}
+
+// PSubscribe subscribes the client to the given patterns.
+func (c *Ring) PSubscribe(ctx context.Context, channels ...string) *PubSub {
+ if len(channels) == 0 {
+ panic("at least one channel is required")
+ }
+
+ shard, err := c.shards.GetByKey(channels[0])
+ if err != nil {
+ // TODO: return PubSub with sticky error
+ panic(err)
+ }
+ return shard.Client.PSubscribe(ctx, channels...)
+}
+
+// ForEachShard concurrently calls the fn on each live shard in the ring.
+// It returns the first error if any.
+func (c *Ring) ForEachShard(
+ ctx context.Context,
+ fn func(ctx context.Context, client *Client) error,
+) error {
+ shards := c.shards.List()
+ var wg sync.WaitGroup
+ errCh := make(chan error, 1)
+ for _, shard := range shards {
+ if shard.IsDown() {
+ continue
+ }
+
+ wg.Add(1)
+ go func(shard *ringShard) {
+ defer wg.Done()
+ err := fn(ctx, shard.Client)
+ if err != nil {
+ select {
+ case errCh <- err:
+ default:
+ }
+ }
+ }(shard)
+ }
+ wg.Wait()
+
+ select {
+ case err := <-errCh:
+ return err
+ default:
+ return nil
+ }
+}
+
+func (c *Ring) cmdsInfo(ctx context.Context) (map[string]*CommandInfo, error) {
+ shards := c.shards.List()
+ var firstErr error
+ for _, shard := range shards {
+ cmdsInfo, err := shard.Client.Command(ctx).Result()
+ if err == nil {
+ return cmdsInfo, nil
+ }
+ if firstErr == nil {
+ firstErr = err
+ }
+ }
+ if firstErr == nil {
+ return nil, errRingShardsDown
+ }
+ return nil, firstErr
+}
+
+func (c *Ring) cmdInfo(ctx context.Context, name string) *CommandInfo {
+ cmdsInfo, err := c.cmdsInfoCache.Get(ctx)
+ if err != nil {
+ return nil
+ }
+ info := cmdsInfo[name]
+ if info == nil {
+ internal.Logger.Printf(c.Context(), "info for cmd=%s not found", name)
+ }
+ return info
+}
+
+func (c *Ring) cmdShard(ctx context.Context, cmd Cmder) (*ringShard, error) {
+ cmdInfo := c.cmdInfo(ctx, cmd.Name())
+ pos := cmdFirstKeyPos(cmd, cmdInfo)
+ if pos == 0 {
+ return c.shards.Random()
+ }
+ firstKey := cmd.stringArg(pos)
+ return c.shards.GetByKey(firstKey)
+}
+
+func (c *Ring) process(ctx context.Context, cmd Cmder) error {
+ var lastErr error
+ for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
+ if attempt > 0 {
+ if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
+ return err
+ }
+ }
+
+ shard, err := c.cmdShard(ctx, cmd)
+ if err != nil {
+ return err
+ }
+
+ lastErr = shard.Client.Process(ctx, cmd)
+ if lastErr == nil || !shouldRetry(lastErr, cmd.readTimeout() == nil) {
+ return lastErr
+ }
+ }
+ return lastErr
+}
+
+func (c *Ring) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
+ return c.Pipeline().Pipelined(ctx, fn)
+}
+
+func (c *Ring) Pipeline() Pipeliner {
+ pipe := Pipeline{
+ ctx: c.ctx,
+ exec: c.processPipeline,
+ }
+ pipe.init()
+ return &pipe
+}
+
+func (c *Ring) processPipeline(ctx context.Context, cmds []Cmder) error {
+ return c.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
+ return c.generalProcessPipeline(ctx, cmds, false)
+ })
+}
+
+func (c *Ring) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
+ return c.TxPipeline().Pipelined(ctx, fn)
+}
+
+func (c *Ring) TxPipeline() Pipeliner {
+ pipe := Pipeline{
+ ctx: c.ctx,
+ exec: c.processTxPipeline,
+ }
+ pipe.init()
+ return &pipe
+}
+
+func (c *Ring) processTxPipeline(ctx context.Context, cmds []Cmder) error {
+ return c.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
+ return c.generalProcessPipeline(ctx, cmds, true)
+ })
+}
+
+func (c *Ring) generalProcessPipeline(
+ ctx context.Context, cmds []Cmder, tx bool,
+) error {
+ cmdsMap := make(map[string][]Cmder)
+ for _, cmd := range cmds {
+ cmdInfo := c.cmdInfo(ctx, cmd.Name())
+ hash := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo))
+ if hash != "" {
+ hash = c.shards.Hash(hash)
+ }
+ cmdsMap[hash] = append(cmdsMap[hash], cmd)
+ }
+
+ var wg sync.WaitGroup
+ for hash, cmds := range cmdsMap {
+ wg.Add(1)
+ go func(hash string, cmds []Cmder) {
+ defer wg.Done()
+
+ _ = c.processShardPipeline(ctx, hash, cmds, tx)
+ }(hash, cmds)
+ }
+
+ wg.Wait()
+ return cmdsFirstErr(cmds)
+}
+
+func (c *Ring) processShardPipeline(
+ ctx context.Context, hash string, cmds []Cmder, tx bool,
+) error {
+ // TODO: retry?
+ shard, err := c.shards.GetByName(hash)
+ if err != nil {
+ setCmdsErr(cmds, err)
+ return err
+ }
+
+ if tx {
+ return shard.Client.processTxPipeline(ctx, cmds)
+ }
+ return shard.Client.processPipeline(ctx, cmds)
+}
+
+func (c *Ring) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error {
+ if len(keys) == 0 {
+ return fmt.Errorf("redis: Watch requires at least one key")
+ }
+
+ var shards []*ringShard
+ for _, key := range keys {
+ if key != "" {
+ shard, err := c.shards.GetByKey(hashtag.Key(key))
+ if err != nil {
+ return err
+ }
+
+ shards = append(shards, shard)
+ }
+ }
+
+ if len(shards) == 0 {
+ return fmt.Errorf("redis: Watch requires at least one shard")
+ }
+
+ if len(shards) > 1 {
+ for _, shard := range shards[1:] {
+ if shard.Client != shards[0].Client {
+ err := fmt.Errorf("redis: Watch requires all keys to be in the same shard")
+ return err
+ }
+ }
+ }
+
+ return shards[0].Client.Watch(ctx, fn, keys...)
+}
+
+// Close closes the ring client, releasing any open resources.
+//
+// It is rare to Close a Ring, as the Ring is meant to be long-lived
+// and shared between many goroutines.
+func (c *Ring) Close() error {
+ return c.shards.Close()
+}