You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

ring.go 16KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731
  1. package redis
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "strconv"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "github.com/cespare/xxhash/v2"
  13. "github.com/dgryski/go-rendezvous"
  14. "github.com/go-redis/redis/v8/internal"
  15. "github.com/go-redis/redis/v8/internal/hashtag"
  16. "github.com/go-redis/redis/v8/internal/pool"
  17. "github.com/go-redis/redis/v8/internal/rand"
  18. )
  var (
	// errRingShardsDown is returned when the ring has no live shard to route
	// a key to, or when no shard could be asked for command info.
	errRingShardsDown = errors.New("redis: all ring shards are down")
)
  20. //------------------------------------------------------------------------------
  // ConsistentHash maps a key to the name of the shard that should own it.
// An implementation is built by RingOptions.NewConsistentHash from the names
// of the shards that are currently up.
type ConsistentHash interface {
	// Get returns the shard name for key.
	Get(key string) string
}
  24. type rendezvousWrapper struct {
  25. *rendezvous.Rendezvous
  26. }
  27. func (w rendezvousWrapper) Get(key string) string {
  28. return w.Lookup(key)
  29. }
  30. func newRendezvous(shards []string) ConsistentHash {
  31. return rendezvousWrapper{rendezvous.New(shards, xxhash.Sum64String)}
  32. }
  33. //------------------------------------------------------------------------------
  34. // RingOptions are used to configure a ring client and should be
  35. // passed to NewRing.
  36. type RingOptions struct {
  37. // Map of name => host:port addresses of ring shards.
  38. Addrs map[string]string
  39. // NewClient creates a shard client with provided name and options.
  40. NewClient func(name string, opt *Options) *Client
  41. // Frequency of PING commands sent to check shards availability.
  42. // Shard is considered down after 3 subsequent failed checks.
  43. HeartbeatFrequency time.Duration
  44. // NewConsistentHash returns a consistent hash that is used
  45. // to distribute keys across the shards.
  46. //
  47. // See https://medium.com/@dgryski/consistent-hashing-algorithmic-tradeoffs-ef6b8e2fcae8
  48. // for consistent hashing algorithmic tradeoffs.
  49. NewConsistentHash func(shards []string) ConsistentHash
  50. // Following options are copied from Options struct.
  51. Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
  52. OnConnect func(ctx context.Context, cn *Conn) error
  53. Username string
  54. Password string
  55. DB int
  56. MaxRetries int
  57. MinRetryBackoff time.Duration
  58. MaxRetryBackoff time.Duration
  59. DialTimeout time.Duration
  60. ReadTimeout time.Duration
  61. WriteTimeout time.Duration
  62. PoolSize int
  63. MinIdleConns int
  64. MaxConnAge time.Duration
  65. PoolTimeout time.Duration
  66. IdleTimeout time.Duration
  67. IdleCheckFrequency time.Duration
  68. TLSConfig *tls.Config
  69. Limiter Limiter
  70. }
  71. func (opt *RingOptions) init() {
  72. if opt.NewClient == nil {
  73. opt.NewClient = func(name string, opt *Options) *Client {
  74. return NewClient(opt)
  75. }
  76. }
  77. if opt.HeartbeatFrequency == 0 {
  78. opt.HeartbeatFrequency = 500 * time.Millisecond
  79. }
  80. if opt.NewConsistentHash == nil {
  81. opt.NewConsistentHash = newRendezvous
  82. }
  83. if opt.MaxRetries == -1 {
  84. opt.MaxRetries = 0
  85. } else if opt.MaxRetries == 0 {
  86. opt.MaxRetries = 3
  87. }
  88. switch opt.MinRetryBackoff {
  89. case -1:
  90. opt.MinRetryBackoff = 0
  91. case 0:
  92. opt.MinRetryBackoff = 8 * time.Millisecond
  93. }
  94. switch opt.MaxRetryBackoff {
  95. case -1:
  96. opt.MaxRetryBackoff = 0
  97. case 0:
  98. opt.MaxRetryBackoff = 512 * time.Millisecond
  99. }
  100. }
  101. func (opt *RingOptions) clientOptions() *Options {
  102. return &Options{
  103. Dialer: opt.Dialer,
  104. OnConnect: opt.OnConnect,
  105. Username: opt.Username,
  106. Password: opt.Password,
  107. DB: opt.DB,
  108. MaxRetries: -1,
  109. DialTimeout: opt.DialTimeout,
  110. ReadTimeout: opt.ReadTimeout,
  111. WriteTimeout: opt.WriteTimeout,
  112. PoolSize: opt.PoolSize,
  113. MinIdleConns: opt.MinIdleConns,
  114. MaxConnAge: opt.MaxConnAge,
  115. PoolTimeout: opt.PoolTimeout,
  116. IdleTimeout: opt.IdleTimeout,
  117. IdleCheckFrequency: opt.IdleCheckFrequency,
  118. TLSConfig: opt.TLSConfig,
  119. Limiter: opt.Limiter,
  120. }
  121. }
  122. //------------------------------------------------------------------------------
  123. type ringShard struct {
  124. Client *Client
  125. down int32
  126. }
  127. func newRingShard(opt *RingOptions, name, addr string) *ringShard {
  128. clopt := opt.clientOptions()
  129. clopt.Addr = addr
  130. return &ringShard{
  131. Client: opt.NewClient(name, clopt),
  132. }
  133. }
  134. func (shard *ringShard) String() string {
  135. var state string
  136. if shard.IsUp() {
  137. state = "up"
  138. } else {
  139. state = "down"
  140. }
  141. return fmt.Sprintf("%s is %s", shard.Client, state)
  142. }
  143. func (shard *ringShard) IsDown() bool {
  144. const threshold = 3
  145. return atomic.LoadInt32(&shard.down) >= threshold
  146. }
  147. func (shard *ringShard) IsUp() bool {
  148. return !shard.IsDown()
  149. }
  150. // Vote votes to set shard state and returns true if state was changed.
  151. func (shard *ringShard) Vote(up bool) bool {
  152. if up {
  153. changed := shard.IsDown()
  154. atomic.StoreInt32(&shard.down, 0)
  155. return changed
  156. }
  157. if shard.IsDown() {
  158. return false
  159. }
  160. atomic.AddInt32(&shard.down, 1)
  161. return shard.IsDown()
  162. }
  163. //------------------------------------------------------------------------------
  164. type ringShards struct {
  165. opt *RingOptions
  166. mu sync.RWMutex
  167. hash ConsistentHash
  168. shards map[string]*ringShard // read only
  169. list []*ringShard // read only
  170. numShard int
  171. closed bool
  172. }
  173. func newRingShards(opt *RingOptions) *ringShards {
  174. shards := make(map[string]*ringShard, len(opt.Addrs))
  175. list := make([]*ringShard, 0, len(shards))
  176. for name, addr := range opt.Addrs {
  177. shard := newRingShard(opt, name, addr)
  178. shards[name] = shard
  179. list = append(list, shard)
  180. }
  181. c := &ringShards{
  182. opt: opt,
  183. shards: shards,
  184. list: list,
  185. }
  186. c.rebalance()
  187. return c
  188. }
  189. func (c *ringShards) List() []*ringShard {
  190. var list []*ringShard
  191. c.mu.RLock()
  192. if !c.closed {
  193. list = c.list
  194. }
  195. c.mu.RUnlock()
  196. return list
  197. }
  198. func (c *ringShards) Hash(key string) string {
  199. key = hashtag.Key(key)
  200. var hash string
  201. c.mu.RLock()
  202. if c.numShard > 0 {
  203. hash = c.hash.Get(key)
  204. }
  205. c.mu.RUnlock()
  206. return hash
  207. }
  208. func (c *ringShards) GetByKey(key string) (*ringShard, error) {
  209. key = hashtag.Key(key)
  210. c.mu.RLock()
  211. if c.closed {
  212. c.mu.RUnlock()
  213. return nil, pool.ErrClosed
  214. }
  215. if c.numShard == 0 {
  216. c.mu.RUnlock()
  217. return nil, errRingShardsDown
  218. }
  219. hash := c.hash.Get(key)
  220. if hash == "" {
  221. c.mu.RUnlock()
  222. return nil, errRingShardsDown
  223. }
  224. shard := c.shards[hash]
  225. c.mu.RUnlock()
  226. return shard, nil
  227. }
  228. func (c *ringShards) GetByName(shardName string) (*ringShard, error) {
  229. if shardName == "" {
  230. return c.Random()
  231. }
  232. c.mu.RLock()
  233. shard := c.shards[shardName]
  234. c.mu.RUnlock()
  235. return shard, nil
  236. }
  237. func (c *ringShards) Random() (*ringShard, error) {
  238. return c.GetByKey(strconv.Itoa(rand.Int()))
  239. }
  240. // heartbeat monitors state of each shard in the ring.
  241. func (c *ringShards) Heartbeat(frequency time.Duration) {
  242. ticker := time.NewTicker(frequency)
  243. defer ticker.Stop()
  244. ctx := context.Background()
  245. for range ticker.C {
  246. var rebalance bool
  247. for _, shard := range c.List() {
  248. err := shard.Client.Ping(ctx).Err()
  249. isUp := err == nil || err == pool.ErrPoolTimeout
  250. if shard.Vote(isUp) {
  251. internal.Logger.Printf(context.Background(), "ring shard state changed: %s", shard)
  252. rebalance = true
  253. }
  254. }
  255. if rebalance {
  256. c.rebalance()
  257. }
  258. }
  259. }
  260. // rebalance removes dead shards from the Ring.
  261. func (c *ringShards) rebalance() {
  262. c.mu.RLock()
  263. shards := c.shards
  264. c.mu.RUnlock()
  265. liveShards := make([]string, 0, len(shards))
  266. for name, shard := range shards {
  267. if shard.IsUp() {
  268. liveShards = append(liveShards, name)
  269. }
  270. }
  271. hash := c.opt.NewConsistentHash(liveShards)
  272. c.mu.Lock()
  273. c.hash = hash
  274. c.numShard = len(liveShards)
  275. c.mu.Unlock()
  276. }
  277. func (c *ringShards) Len() int {
  278. c.mu.RLock()
  279. l := c.numShard
  280. c.mu.RUnlock()
  281. return l
  282. }
  283. func (c *ringShards) Close() error {
  284. c.mu.Lock()
  285. defer c.mu.Unlock()
  286. if c.closed {
  287. return nil
  288. }
  289. c.closed = true
  290. var firstErr error
  291. for _, shard := range c.shards {
  292. if err := shard.Client.Close(); err != nil && firstErr == nil {
  293. firstErr = err
  294. }
  295. }
  296. c.hash = nil
  297. c.shards = nil
  298. c.list = nil
  299. return firstErr
  300. }
  301. //------------------------------------------------------------------------------
  302. type ring struct {
  303. opt *RingOptions
  304. shards *ringShards
  305. cmdsInfoCache *cmdsInfoCache //nolint:structcheck
  306. }
  307. // Ring is a Redis client that uses consistent hashing to distribute
  308. // keys across multiple Redis servers (shards). It's safe for
  309. // concurrent use by multiple goroutines.
  310. //
  311. // Ring monitors the state of each shard and removes dead shards from
  312. // the ring. When a shard comes online it is added back to the ring. This
  313. // gives you maximum availability and partition tolerance, but no
  314. // consistency between different shards or even clients. Each client
  315. // uses shards that are available to the client and does not do any
  316. // coordination when shard state is changed.
  317. //
  318. // Ring should be used when you need multiple Redis servers for caching
  319. // and can tolerate losing data when one of the servers dies.
  320. // Otherwise you should use Redis Cluster.
  321. type Ring struct {
  322. *ring
  323. cmdable
  324. hooks
  325. ctx context.Context
  326. }
  327. func NewRing(opt *RingOptions) *Ring {
  328. opt.init()
  329. ring := Ring{
  330. ring: &ring{
  331. opt: opt,
  332. shards: newRingShards(opt),
  333. },
  334. ctx: context.Background(),
  335. }
  336. ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
  337. ring.cmdable = ring.Process
  338. go ring.shards.Heartbeat(opt.HeartbeatFrequency)
  339. return &ring
  340. }
  341. func (c *Ring) Context() context.Context {
  342. return c.ctx
  343. }
  344. func (c *Ring) WithContext(ctx context.Context) *Ring {
  345. if ctx == nil {
  346. panic("nil context")
  347. }
  348. clone := *c
  349. clone.cmdable = clone.Process
  350. clone.hooks.lock()
  351. clone.ctx = ctx
  352. return &clone
  353. }
  354. // Do creates a Cmd from the args and processes the cmd.
  355. func (c *Ring) Do(ctx context.Context, args ...interface{}) *Cmd {
  356. cmd := NewCmd(ctx, args...)
  357. _ = c.Process(ctx, cmd)
  358. return cmd
  359. }
  360. func (c *Ring) Process(ctx context.Context, cmd Cmder) error {
  361. return c.hooks.process(ctx, cmd, c.process)
  362. }
  363. // Options returns read-only Options that were used to create the client.
  364. func (c *Ring) Options() *RingOptions {
  365. return c.opt
  366. }
  367. func (c *Ring) retryBackoff(attempt int) time.Duration {
  368. return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
  369. }
  370. // PoolStats returns accumulated connection pool stats.
  371. func (c *Ring) PoolStats() *PoolStats {
  372. shards := c.shards.List()
  373. var acc PoolStats
  374. for _, shard := range shards {
  375. s := shard.Client.connPool.Stats()
  376. acc.Hits += s.Hits
  377. acc.Misses += s.Misses
  378. acc.Timeouts += s.Timeouts
  379. acc.TotalConns += s.TotalConns
  380. acc.IdleConns += s.IdleConns
  381. }
  382. return &acc
  383. }
  384. // Len returns the current number of shards in the ring.
  385. func (c *Ring) Len() int {
  386. return c.shards.Len()
  387. }
  388. // Subscribe subscribes the client to the specified channels.
  389. func (c *Ring) Subscribe(ctx context.Context, channels ...string) *PubSub {
  390. if len(channels) == 0 {
  391. panic("at least one channel is required")
  392. }
  393. shard, err := c.shards.GetByKey(channels[0])
  394. if err != nil {
  395. // TODO: return PubSub with sticky error
  396. panic(err)
  397. }
  398. return shard.Client.Subscribe(ctx, channels...)
  399. }
  400. // PSubscribe subscribes the client to the given patterns.
  401. func (c *Ring) PSubscribe(ctx context.Context, channels ...string) *PubSub {
  402. if len(channels) == 0 {
  403. panic("at least one channel is required")
  404. }
  405. shard, err := c.shards.GetByKey(channels[0])
  406. if err != nil {
  407. // TODO: return PubSub with sticky error
  408. panic(err)
  409. }
  410. return shard.Client.PSubscribe(ctx, channels...)
  411. }
  412. // ForEachShard concurrently calls the fn on each live shard in the ring.
  413. // It returns the first error if any.
  414. func (c *Ring) ForEachShard(
  415. ctx context.Context,
  416. fn func(ctx context.Context, client *Client) error,
  417. ) error {
  418. shards := c.shards.List()
  419. var wg sync.WaitGroup
  420. errCh := make(chan error, 1)
  421. for _, shard := range shards {
  422. if shard.IsDown() {
  423. continue
  424. }
  425. wg.Add(1)
  426. go func(shard *ringShard) {
  427. defer wg.Done()
  428. err := fn(ctx, shard.Client)
  429. if err != nil {
  430. select {
  431. case errCh <- err:
  432. default:
  433. }
  434. }
  435. }(shard)
  436. }
  437. wg.Wait()
  438. select {
  439. case err := <-errCh:
  440. return err
  441. default:
  442. return nil
  443. }
  444. }
  445. func (c *Ring) cmdsInfo(ctx context.Context) (map[string]*CommandInfo, error) {
  446. shards := c.shards.List()
  447. var firstErr error
  448. for _, shard := range shards {
  449. cmdsInfo, err := shard.Client.Command(ctx).Result()
  450. if err == nil {
  451. return cmdsInfo, nil
  452. }
  453. if firstErr == nil {
  454. firstErr = err
  455. }
  456. }
  457. if firstErr == nil {
  458. return nil, errRingShardsDown
  459. }
  460. return nil, firstErr
  461. }
  462. func (c *Ring) cmdInfo(ctx context.Context, name string) *CommandInfo {
  463. cmdsInfo, err := c.cmdsInfoCache.Get(ctx)
  464. if err != nil {
  465. return nil
  466. }
  467. info := cmdsInfo[name]
  468. if info == nil {
  469. internal.Logger.Printf(c.Context(), "info for cmd=%s not found", name)
  470. }
  471. return info
  472. }
  473. func (c *Ring) cmdShard(ctx context.Context, cmd Cmder) (*ringShard, error) {
  474. cmdInfo := c.cmdInfo(ctx, cmd.Name())
  475. pos := cmdFirstKeyPos(cmd, cmdInfo)
  476. if pos == 0 {
  477. return c.shards.Random()
  478. }
  479. firstKey := cmd.stringArg(pos)
  480. return c.shards.GetByKey(firstKey)
  481. }
  482. func (c *Ring) process(ctx context.Context, cmd Cmder) error {
  483. var lastErr error
  484. for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
  485. if attempt > 0 {
  486. if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
  487. return err
  488. }
  489. }
  490. shard, err := c.cmdShard(ctx, cmd)
  491. if err != nil {
  492. return err
  493. }
  494. lastErr = shard.Client.Process(ctx, cmd)
  495. if lastErr == nil || !shouldRetry(lastErr, cmd.readTimeout() == nil) {
  496. return lastErr
  497. }
  498. }
  499. return lastErr
  500. }
  501. func (c *Ring) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
  502. return c.Pipeline().Pipelined(ctx, fn)
  503. }
  504. func (c *Ring) Pipeline() Pipeliner {
  505. pipe := Pipeline{
  506. ctx: c.ctx,
  507. exec: c.processPipeline,
  508. }
  509. pipe.init()
  510. return &pipe
  511. }
  512. func (c *Ring) processPipeline(ctx context.Context, cmds []Cmder) error {
  513. return c.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
  514. return c.generalProcessPipeline(ctx, cmds, false)
  515. })
  516. }
  517. func (c *Ring) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
  518. return c.TxPipeline().Pipelined(ctx, fn)
  519. }
  520. func (c *Ring) TxPipeline() Pipeliner {
  521. pipe := Pipeline{
  522. ctx: c.ctx,
  523. exec: c.processTxPipeline,
  524. }
  525. pipe.init()
  526. return &pipe
  527. }
  528. func (c *Ring) processTxPipeline(ctx context.Context, cmds []Cmder) error {
  529. return c.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
  530. return c.generalProcessPipeline(ctx, cmds, true)
  531. })
  532. }
  533. func (c *Ring) generalProcessPipeline(
  534. ctx context.Context, cmds []Cmder, tx bool,
  535. ) error {
  536. cmdsMap := make(map[string][]Cmder)
  537. for _, cmd := range cmds {
  538. cmdInfo := c.cmdInfo(ctx, cmd.Name())
  539. hash := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo))
  540. if hash != "" {
  541. hash = c.shards.Hash(hash)
  542. }
  543. cmdsMap[hash] = append(cmdsMap[hash], cmd)
  544. }
  545. var wg sync.WaitGroup
  546. for hash, cmds := range cmdsMap {
  547. wg.Add(1)
  548. go func(hash string, cmds []Cmder) {
  549. defer wg.Done()
  550. _ = c.processShardPipeline(ctx, hash, cmds, tx)
  551. }(hash, cmds)
  552. }
  553. wg.Wait()
  554. return cmdsFirstErr(cmds)
  555. }
  556. func (c *Ring) processShardPipeline(
  557. ctx context.Context, hash string, cmds []Cmder, tx bool,
  558. ) error {
  559. // TODO: retry?
  560. shard, err := c.shards.GetByName(hash)
  561. if err != nil {
  562. setCmdsErr(cmds, err)
  563. return err
  564. }
  565. if tx {
  566. return shard.Client.processTxPipeline(ctx, cmds)
  567. }
  568. return shard.Client.processPipeline(ctx, cmds)
  569. }
  570. func (c *Ring) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error {
  571. if len(keys) == 0 {
  572. return fmt.Errorf("redis: Watch requires at least one key")
  573. }
  574. var shards []*ringShard
  575. for _, key := range keys {
  576. if key != "" {
  577. shard, err := c.shards.GetByKey(hashtag.Key(key))
  578. if err != nil {
  579. return err
  580. }
  581. shards = append(shards, shard)
  582. }
  583. }
  584. if len(shards) == 0 {
  585. return fmt.Errorf("redis: Watch requires at least one shard")
  586. }
  587. if len(shards) > 1 {
  588. for _, shard := range shards[1:] {
  589. if shard.Client != shards[0].Client {
  590. err := fmt.Errorf("redis: Watch requires all keys to be in the same shard")
  591. return err
  592. }
  593. }
  594. }
  595. return shards[0].Client.Watch(ctx, fn, keys...)
  596. }
  597. // Close closes the ring client, releasing any open resources.
  598. //
  599. // It is rare to Close a Ring, as the Ring is meant to be long-lived
  600. // and shared between many goroutines.
  601. func (c *Ring) Close() error {
  602. return c.shards.Close()
  603. }