diff options
Diffstat (limited to 'vendor/github.com/go-redis/redis/v8/ring.go')
-rw-r--r-- | vendor/github.com/go-redis/redis/v8/ring.go | 731 |
1 files changed, 731 insertions, 0 deletions
diff --git a/vendor/github.com/go-redis/redis/v8/ring.go b/vendor/github.com/go-redis/redis/v8/ring.go new file mode 100644 index 0000000000..34d05f35ae --- /dev/null +++ b/vendor/github.com/go-redis/redis/v8/ring.go @@ -0,0 +1,731 @@ +package redis + +import ( + "context" + "crypto/tls" + "errors" + "fmt" + "net" + "strconv" + "sync" + "sync/atomic" + "time" + + "github.com/cespare/xxhash/v2" + "github.com/dgryski/go-rendezvous" + "github.com/go-redis/redis/v8/internal" + "github.com/go-redis/redis/v8/internal/hashtag" + "github.com/go-redis/redis/v8/internal/pool" + "github.com/go-redis/redis/v8/internal/rand" +) + +var errRingShardsDown = errors.New("redis: all ring shards are down") + +//------------------------------------------------------------------------------ + +type ConsistentHash interface { + Get(string) string +} + +type rendezvousWrapper struct { + *rendezvous.Rendezvous +} + +func (w rendezvousWrapper) Get(key string) string { + return w.Lookup(key) +} + +func newRendezvous(shards []string) ConsistentHash { + return rendezvousWrapper{rendezvous.New(shards, xxhash.Sum64String)} +} + +//------------------------------------------------------------------------------ + +// RingOptions are used to configure a ring client and should be +// passed to NewRing. +type RingOptions struct { + // Map of name => host:port addresses of ring shards. + Addrs map[string]string + + // NewClient creates a shard client with provided name and options. + NewClient func(name string, opt *Options) *Client + + // Frequency of PING commands sent to check shards availability. + // Shard is considered down after 3 subsequent failed checks. + HeartbeatFrequency time.Duration + + // NewConsistentHash returns a consistent hash that is used + // to distribute keys across the shards. + // + // See https://medium.com/@dgryski/consistent-hashing-algorithmic-tradeoffs-ef6b8e2fcae8 + // for consistent hashing algorithmic tradeoffs. + NewConsistentHash func(shards []string) ConsistentHash + + // Following options are copied from Options struct. + + Dialer func(ctx context.Context, network, addr string) (net.Conn, error) + OnConnect func(ctx context.Context, cn *Conn) error + + Username string + Password string + DB int + + MaxRetries int + MinRetryBackoff time.Duration + MaxRetryBackoff time.Duration + + DialTimeout time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration + + PoolSize int + MinIdleConns int + MaxConnAge time.Duration + PoolTimeout time.Duration + IdleTimeout time.Duration + IdleCheckFrequency time.Duration + + TLSConfig *tls.Config + Limiter Limiter +} + +func (opt *RingOptions) init() { + if opt.NewClient == nil { + opt.NewClient = func(name string, opt *Options) *Client { + return NewClient(opt) + } + } + + if opt.HeartbeatFrequency == 0 { + opt.HeartbeatFrequency = 500 * time.Millisecond + } + + if opt.NewConsistentHash == nil { + opt.NewConsistentHash = newRendezvous + } + + if opt.MaxRetries == -1 { + opt.MaxRetries = 0 + } else if opt.MaxRetries == 0 { + opt.MaxRetries = 3 + } + switch opt.MinRetryBackoff { + case -1: + opt.MinRetryBackoff = 0 + case 0: + opt.MinRetryBackoff = 8 * time.Millisecond + } + switch opt.MaxRetryBackoff { + case -1: + opt.MaxRetryBackoff = 0 + case 0: + opt.MaxRetryBackoff = 512 * time.Millisecond + } +} + +func (opt *RingOptions) clientOptions() *Options { + return &Options{ + Dialer: opt.Dialer, + OnConnect: opt.OnConnect, + + Username: opt.Username, + Password: opt.Password, + DB: opt.DB, + + MaxRetries: -1, + + DialTimeout: opt.DialTimeout, + ReadTimeout: opt.ReadTimeout, + WriteTimeout: opt.WriteTimeout, + + PoolSize: opt.PoolSize, + MinIdleConns: opt.MinIdleConns, + MaxConnAge: opt.MaxConnAge, + PoolTimeout: opt.PoolTimeout, + IdleTimeout: opt.IdleTimeout, + IdleCheckFrequency: opt.IdleCheckFrequency, + + TLSConfig: opt.TLSConfig, + Limiter: opt.Limiter, + } +} + +//------------------------------------------------------------------------------ + +type ringShard struct { + Client *Client + down int32 +} + +func newRingShard(opt *RingOptions, name, addr string) *ringShard { + clopt := opt.clientOptions() + clopt.Addr = addr + + return &ringShard{ + Client: opt.NewClient(name, clopt), + } +} + +func (shard *ringShard) String() string { + var state string + if shard.IsUp() { + state = "up" + } else { + state = "down" + } + return fmt.Sprintf("%s is %s", shard.Client, state) +} + +func (shard *ringShard) IsDown() bool { + const threshold = 3 + return atomic.LoadInt32(&shard.down) >= threshold +} + +func (shard *ringShard) IsUp() bool { + return !shard.IsDown() +} + +// Vote votes to set shard state and returns true if state was changed. +func (shard *ringShard) Vote(up bool) bool { + if up { + changed := shard.IsDown() + atomic.StoreInt32(&shard.down, 0) + return changed + } + + if shard.IsDown() { + return false + } + + atomic.AddInt32(&shard.down, 1) + return shard.IsDown() +} + +//------------------------------------------------------------------------------ + +type ringShards struct { + opt *RingOptions + + mu sync.RWMutex + hash ConsistentHash + shards map[string]*ringShard // read only + list []*ringShard // read only + numShard int + closed bool +} + +func newRingShards(opt *RingOptions) *ringShards { + shards := make(map[string]*ringShard, len(opt.Addrs)) + list := make([]*ringShard, 0, len(shards)) + + for name, addr := range opt.Addrs { + shard := newRingShard(opt, name, addr) + shards[name] = shard + + list = append(list, shard) + } + + c := &ringShards{ + opt: opt, + + shards: shards, + list: list, + } + c.rebalance() + + return c +} + +func (c *ringShards) List() []*ringShard { + var list []*ringShard + + c.mu.RLock() + if !c.closed { + list = c.list + } + c.mu.RUnlock() + + return list +} + +func (c *ringShards) Hash(key string) string { + key = hashtag.Key(key) + + var hash string + + c.mu.RLock() + if c.numShard > 0 { + hash = c.hash.Get(key) + } + c.mu.RUnlock() + + return hash +} + +func (c *ringShards) GetByKey(key string) (*ringShard, error) { + key = hashtag.Key(key) + + c.mu.RLock() + + if c.closed { + c.mu.RUnlock() + return nil, pool.ErrClosed + } + + if c.numShard == 0 { + c.mu.RUnlock() + return nil, errRingShardsDown + } + + hash := c.hash.Get(key) + if hash == "" { + c.mu.RUnlock() + return nil, errRingShardsDown + } + + shard := c.shards[hash] + c.mu.RUnlock() + + return shard, nil +} + +func (c *ringShards) GetByName(shardName string) (*ringShard, error) { + if shardName == "" { + return c.Random() + } + + c.mu.RLock() + shard := c.shards[shardName] + c.mu.RUnlock() + return shard, nil +} + +func (c *ringShards) Random() (*ringShard, error) { + return c.GetByKey(strconv.Itoa(rand.Int())) +} + +// heartbeat monitors state of each shard in the ring. +func (c *ringShards) Heartbeat(frequency time.Duration) { + ticker := time.NewTicker(frequency) + defer ticker.Stop() + + ctx := context.Background() + for range ticker.C { + var rebalance bool + + for _, shard := range c.List() { + err := shard.Client.Ping(ctx).Err() + isUp := err == nil || err == pool.ErrPoolTimeout + if shard.Vote(isUp) { + internal.Logger.Printf(context.Background(), "ring shard state changed: %s", shard) + rebalance = true + } + } + + if rebalance { + c.rebalance() + } + } +} + +// rebalance removes dead shards from the Ring. +func (c *ringShards) rebalance() { + c.mu.RLock() + shards := c.shards + c.mu.RUnlock() + + liveShards := make([]string, 0, len(shards)) + + for name, shard := range shards { + if shard.IsUp() { + liveShards = append(liveShards, name) + } + } + + hash := c.opt.NewConsistentHash(liveShards) + + c.mu.Lock() + c.hash = hash + c.numShard = len(liveShards) + c.mu.Unlock() +} + +func (c *ringShards) Len() int { + c.mu.RLock() + l := c.numShard + c.mu.RUnlock() + return l +} + +func (c *ringShards) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.closed { + return nil + } + c.closed = true + + var firstErr error + for _, shard := range c.shards { + if err := shard.Client.Close(); err != nil && firstErr == nil { + firstErr = err + } + } + c.hash = nil + c.shards = nil + c.list = nil + + return firstErr +} + +//------------------------------------------------------------------------------ + +type ring struct { + opt *RingOptions + shards *ringShards + cmdsInfoCache *cmdsInfoCache //nolint:structcheck +} + +// Ring is a Redis client that uses consistent hashing to distribute +// keys across multiple Redis servers (shards). It's safe for +// concurrent use by multiple goroutines. +// +// Ring monitors the state of each shard and removes dead shards from +// the ring. When a shard comes online it is added back to the ring. This +// gives you maximum availability and partition tolerance, but no +// consistency between different shards or even clients. Each client +// uses shards that are available to the client and does not do any +// coordination when shard state is changed. +// +// Ring should be used when you need multiple Redis servers for caching +// and can tolerate losing data when one of the servers dies. +// Otherwise you should use Redis Cluster. +type Ring struct { + *ring + cmdable + hooks + ctx context.Context +} + +func NewRing(opt *RingOptions) *Ring { + opt.init() + + ring := Ring{ + ring: &ring{ + opt: opt, + shards: newRingShards(opt), + }, + ctx: context.Background(), + } + + ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo) + ring.cmdable = ring.Process + + go ring.shards.Heartbeat(opt.HeartbeatFrequency) + + return &ring +} + +func (c *Ring) Context() context.Context { + return c.ctx +} + +func (c *Ring) WithContext(ctx context.Context) *Ring { + if ctx == nil { + panic("nil context") + } + clone := *c + clone.cmdable = clone.Process + clone.hooks.lock() + clone.ctx = ctx + return &clone +} + +// Do creates a Cmd from the args and processes the cmd. +func (c *Ring) Do(ctx context.Context, args ...interface{}) *Cmd { + cmd := NewCmd(ctx, args...) + _ = c.Process(ctx, cmd) + return cmd +} + +func (c *Ring) Process(ctx context.Context, cmd Cmder) error { + return c.hooks.process(ctx, cmd, c.process) +} + +// Options returns read-only Options that were used to create the client. +func (c *Ring) Options() *RingOptions { + return c.opt +} + +func (c *Ring) retryBackoff(attempt int) time.Duration { + return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff) +} + +// PoolStats returns accumulated connection pool stats. +func (c *Ring) PoolStats() *PoolStats { + shards := c.shards.List() + var acc PoolStats + for _, shard := range shards { + s := shard.Client.connPool.Stats() + acc.Hits += s.Hits + acc.Misses += s.Misses + acc.Timeouts += s.Timeouts + acc.TotalConns += s.TotalConns + acc.IdleConns += s.IdleConns + } + return &acc +} + +// Len returns the current number of shards in the ring. +func (c *Ring) Len() int { + return c.shards.Len() +} + +// Subscribe subscribes the client to the specified channels. +func (c *Ring) Subscribe(ctx context.Context, channels ...string) *PubSub { + if len(channels) == 0 { + panic("at least one channel is required") + } + + shard, err := c.shards.GetByKey(channels[0]) + if err != nil { + // TODO: return PubSub with sticky error + panic(err) + } + return shard.Client.Subscribe(ctx, channels...) +} + +// PSubscribe subscribes the client to the given patterns. +func (c *Ring) PSubscribe(ctx context.Context, channels ...string) *PubSub { + if len(channels) == 0 { + panic("at least one channel is required") + } + + shard, err := c.shards.GetByKey(channels[0]) + if err != nil { + // TODO: return PubSub with sticky error + panic(err) + } + return shard.Client.PSubscribe(ctx, channels...) +} + +// ForEachShard concurrently calls the fn on each live shard in the ring. +// It returns the first error if any. +func (c *Ring) ForEachShard( + ctx context.Context, + fn func(ctx context.Context, client *Client) error, +) error { + shards := c.shards.List() + var wg sync.WaitGroup + errCh := make(chan error, 1) + for _, shard := range shards { + if shard.IsDown() { + continue + } + + wg.Add(1) + go func(shard *ringShard) { + defer wg.Done() + err := fn(ctx, shard.Client) + if err != nil { + select { + case errCh <- err: + default: + } + } + }(shard) + } + wg.Wait() + + select { + case err := <-errCh: + return err + default: + return nil + } +} + +func (c *Ring) cmdsInfo(ctx context.Context) (map[string]*CommandInfo, error) { + shards := c.shards.List() + var firstErr error + for _, shard := range shards { + cmdsInfo, err := shard.Client.Command(ctx).Result() + if err == nil { + return cmdsInfo, nil + } + if firstErr == nil { + firstErr = err + } + } + if firstErr == nil { + return nil, errRingShardsDown + } + return nil, firstErr +} + +func (c *Ring) cmdInfo(ctx context.Context, name string) *CommandInfo { + cmdsInfo, err := c.cmdsInfoCache.Get(ctx) + if err != nil { + return nil + } + info := cmdsInfo[name] + if info == nil { + internal.Logger.Printf(c.Context(), "info for cmd=%s not found", name) + } + return info +} + +func (c *Ring) cmdShard(ctx context.Context, cmd Cmder) (*ringShard, error) { + cmdInfo := c.cmdInfo(ctx, cmd.Name()) + pos := cmdFirstKeyPos(cmd, cmdInfo) + if pos == 0 { + return c.shards.Random() + } + firstKey := cmd.stringArg(pos) + return c.shards.GetByKey(firstKey) +} + +func (c *Ring) process(ctx context.Context, cmd Cmder) error { + var lastErr error + for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { + if attempt > 0 { + if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { + return err + } + } + + shard, err := c.cmdShard(ctx, cmd) + if err != nil { + return err + } + + lastErr = shard.Client.Process(ctx, cmd) + if lastErr == nil || !shouldRetry(lastErr, cmd.readTimeout() == nil) { + return lastErr + } + } + return lastErr +} + +func (c *Ring) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) { + return c.Pipeline().Pipelined(ctx, fn) +} + +func (c *Ring) Pipeline() Pipeliner { + pipe := Pipeline{ + ctx: c.ctx, + exec: c.processPipeline, + } + pipe.init() + return &pipe +} + +func (c *Ring) processPipeline(ctx context.Context, cmds []Cmder) error { + return c.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error { + return c.generalProcessPipeline(ctx, cmds, false) + }) +} + +func (c *Ring) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) { + return c.TxPipeline().Pipelined(ctx, fn) +} + +func (c *Ring) TxPipeline() Pipeliner { + pipe := Pipeline{ + ctx: c.ctx, + exec: c.processTxPipeline, + } + pipe.init() + return &pipe +} + +func (c *Ring) processTxPipeline(ctx context.Context, cmds []Cmder) error { + return c.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error { + return c.generalProcessPipeline(ctx, cmds, true) + }) +} + +func (c *Ring) generalProcessPipeline( + ctx context.Context, cmds []Cmder, tx bool, +) error { + cmdsMap := make(map[string][]Cmder) + for _, cmd := range cmds { + cmdInfo := c.cmdInfo(ctx, cmd.Name()) + hash := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo)) + if hash != "" { + hash = c.shards.Hash(hash) + } + cmdsMap[hash] = append(cmdsMap[hash], cmd) + } + + var wg sync.WaitGroup + for hash, cmds := range cmdsMap { + wg.Add(1) + go func(hash string, cmds []Cmder) { + defer wg.Done() + + _ = c.processShardPipeline(ctx, hash, cmds, tx) + }(hash, cmds) + } + + wg.Wait() + return cmdsFirstErr(cmds) +} + +func (c *Ring) processShardPipeline( + ctx context.Context, hash string, cmds []Cmder, tx bool, +) error { + // TODO: retry? + shard, err := c.shards.GetByName(hash) + if err != nil { + setCmdsErr(cmds, err) + return err + } + + if tx { + return shard.Client.processTxPipeline(ctx, cmds) + } + return shard.Client.processPipeline(ctx, cmds) +} + +func (c *Ring) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error { + if len(keys) == 0 { + return fmt.Errorf("redis: Watch requires at least one key") + } + + var shards []*ringShard + for _, key := range keys { + if key != "" { + shard, err := c.shards.GetByKey(hashtag.Key(key)) + if err != nil { + return err + } + + shards = append(shards, shard) + } + } + + if len(shards) == 0 { + return fmt.Errorf("redis: Watch requires at least one shard") + } + + if len(shards) > 1 { + for _, shard := range shards[1:] { + if shard.Client != shards[0].Client { + err := fmt.Errorf("redis: Watch requires all keys to be in the same shard") + return err + } + } + } + + return shards[0].Client.Watch(ctx, fn, keys...) +} + +// Close closes the ring client, releasing any open resources. +// +// It is rare to Close a Ring, as the Ring is meant to be long-lived +// and shared between many goroutines. +func (c *Ring) Close() error { + return c.shards.Close() +} |