diff options
author | Lunny Xiao <xiaolunwen@gmail.com> | 2019-04-08 17:05:15 +0800 |
---|---|---|
committer | Lauris BH <lauris@nix.lv> | 2019-04-08 12:05:15 +0300 |
commit | e7d7dcb0901b32bed90061e7bc8ae1d2e3c75654 (patch) | |
tree | 1287587835fadda43f03f03ebb8b873091f356c8 /vendor/github.com | |
parent | 6e4af4985e60de2f7946be6b0fa5a5d24cdad78a (diff) | |
download | gitea-e7d7dcb0901b32bed90061e7bc8ae1d2e3c75654.tar.gz gitea-e7d7dcb0901b32bed90061e7bc8ae1d2e3c75654.zip |
Issue indexer queue redis support (#6218)
* add redis queue
* finished indexer redis queue
* add redis vendor
* fix vet
* Update docs/content/doc/advanced/config-cheat-sheet.en-us.md
Co-Authored-By: lunny <xiaolunwen@gmail.com>
* switch to go mod
* Update required changes for new logging func signatures
Diffstat (limited to 'vendor/github.com')
39 files changed, 11198 insertions, 0 deletions
diff --git a/vendor/github.com/go-redis/redis/.gitignore b/vendor/github.com/go-redis/redis/.gitignore new file mode 100644 index 0000000000..ebfe903bcd --- /dev/null +++ b/vendor/github.com/go-redis/redis/.gitignore @@ -0,0 +1,2 @@ +*.rdb +testdata/*/ diff --git a/vendor/github.com/go-redis/redis/.travis.yml b/vendor/github.com/go-redis/redis/.travis.yml new file mode 100644 index 0000000000..6b110b4cbb --- /dev/null +++ b/vendor/github.com/go-redis/redis/.travis.yml @@ -0,0 +1,19 @@ +sudo: false +language: go + +services: + - redis-server + +go: + - 1.9.x + - 1.10.x + - 1.11.x + - tip + +matrix: + allow_failures: + - go: tip + +install: + - go get github.com/onsi/ginkgo + - go get github.com/onsi/gomega diff --git a/vendor/github.com/go-redis/redis/CHANGELOG.md b/vendor/github.com/go-redis/redis/CHANGELOG.md new file mode 100644 index 0000000000..19645661a4 --- /dev/null +++ b/vendor/github.com/go-redis/redis/CHANGELOG.md @@ -0,0 +1,25 @@ +# Changelog + +## Unreleased + +- Cluster and Ring pipelines process commands for each node in its own goroutine. + +## 6.14 + +- Added Options.MinIdleConns. +- Added Options.MaxConnAge. +- PoolStats.FreeConns is renamed to PoolStats.IdleConns. +- Add Client.Do to simplify creating custom commands. +- Add Cmd.String, Cmd.Int, Cmd.Int64, Cmd.Uint64, Cmd.Float64, and Cmd.Bool helpers. +- Lower memory usage. + +## v6.13 + +- Ring got new options called `HashReplicas` and `Hash`. It is recommended to set `HashReplicas = 1000` for better keys distribution between shards. +- Cluster client was optimized to use much less memory when reloading cluster state. +- PubSub.ReceiveMessage is re-worked to not use ReceiveTimeout so it does not lose data when timeout occurres. In most cases it is recommended to use PubSub.Channel instead. +- Dialer.KeepAlive is set to 5 minutes by default. + +## v6.12 + +- ClusterClient got new option called `ClusterSlots` which allows to build cluster of normal Redis Servers that don't have cluster mode enabled. See https://godoc.org/github.com/go-redis/redis#example-NewClusterClient--ManualSetup diff --git a/vendor/github.com/go-redis/redis/LICENSE b/vendor/github.com/go-redis/redis/LICENSE new file mode 100644 index 0000000000..298bed9bea --- /dev/null +++ b/vendor/github.com/go-redis/redis/LICENSE @@ -0,0 +1,25 @@ +Copyright (c) 2013 The github.com/go-redis/redis Authors. +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/go-redis/redis/Makefile b/vendor/github.com/go-redis/redis/Makefile new file mode 100644 index 0000000000..fa3b4e004f --- /dev/null +++ b/vendor/github.com/go-redis/redis/Makefile @@ -0,0 +1,22 @@ +all: testdeps + go test ./... + go test ./... -short -race + env GOOS=linux GOARCH=386 go test ./... + go vet + go get github.com/gordonklaus/ineffassign + ineffassign . + +testdeps: testdata/redis/src/redis-server + +bench: testdeps + go test ./... -test.run=NONE -test.bench=. -test.benchmem + +.PHONY: all test testdeps bench + +testdata/redis: + mkdir -p $@ + wget -qO- https://github.com/antirez/redis/archive/5.0.tar.gz | tar xvz --strip-components=1 -C $@ + +testdata/redis/src/redis-server: testdata/redis + sed -i.bak 's/libjemalloc.a/libjemalloc.a -lrt/g' $</src/Makefile + cd $< && make all diff --git a/vendor/github.com/go-redis/redis/README.md b/vendor/github.com/go-redis/redis/README.md new file mode 100644 index 0000000000..7d05b44661 --- /dev/null +++ b/vendor/github.com/go-redis/redis/README.md @@ -0,0 +1,146 @@ +# Redis client for Golang + +[![Build Status](https://travis-ci.org/go-redis/redis.png?branch=master)](https://travis-ci.org/go-redis/redis) +[![GoDoc](https://godoc.org/github.com/go-redis/redis?status.svg)](https://godoc.org/github.com/go-redis/redis) +[![Airbrake](https://img.shields.io/badge/kudos-airbrake.io-orange.svg)](https://airbrake.io) + +Supports: + +- Redis 3 commands except QUIT, MONITOR, SLOWLOG and SYNC. +- Automatic connection pooling with [circuit breaker](https://en.wikipedia.org/wiki/Circuit_breaker_design_pattern) support. +- [Pub/Sub](https://godoc.org/github.com/go-redis/redis#PubSub). +- [Transactions](https://godoc.org/github.com/go-redis/redis#Multi). +- [Pipeline](https://godoc.org/github.com/go-redis/redis#example-Client-Pipeline) and [TxPipeline](https://godoc.org/github.com/go-redis/redis#example-Client-TxPipeline). +- [Scripting](https://godoc.org/github.com/go-redis/redis#Script). +- [Timeouts](https://godoc.org/github.com/go-redis/redis#Options). +- [Redis Sentinel](https://godoc.org/github.com/go-redis/redis#NewFailoverClient). +- [Redis Cluster](https://godoc.org/github.com/go-redis/redis#NewClusterClient). +- [Cluster of Redis Servers](https://godoc.org/github.com/go-redis/redis#example-NewClusterClient--ManualSetup) without using cluster mode and Redis Sentinel. +- [Ring](https://godoc.org/github.com/go-redis/redis#NewRing). +- [Instrumentation](https://godoc.org/github.com/go-redis/redis#ex-package--Instrumentation). +- [Cache friendly](https://github.com/go-redis/cache). +- [Rate limiting](https://github.com/go-redis/redis_rate). +- [Distributed Locks](https://github.com/bsm/redis-lock). + +API docs: https://godoc.org/github.com/go-redis/redis. +Examples: https://godoc.org/github.com/go-redis/redis#pkg-examples. + +## Installation + +Install: + +```shell +go get -u github.com/go-redis/redis +``` + +Import: + +```go +import "github.com/go-redis/redis" +``` + +## Quickstart + +```go +func ExampleNewClient() { + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + Password: "", // no password set + DB: 0, // use default DB + }) + + pong, err := client.Ping().Result() + fmt.Println(pong, err) + // Output: PONG <nil> +} + +func ExampleClient() { + err := client.Set("key", "value", 0).Err() + if err != nil { + panic(err) + } + + val, err := client.Get("key").Result() + if err != nil { + panic(err) + } + fmt.Println("key", val) + + val2, err := client.Get("key2").Result() + if err == redis.Nil { + fmt.Println("key2 does not exist") + } else if err != nil { + panic(err) + } else { + fmt.Println("key2", val2) + } + // Output: key value + // key2 does not exist +} +``` + +## Howto + +Please go through [examples](https://godoc.org/github.com/go-redis/redis#pkg-examples) to get an idea how to use this package. + +## Look and feel + +Some corner cases: + +```go +// SET key value EX 10 NX +set, err := client.SetNX("key", "value", 10*time.Second).Result() + +// SORT list LIMIT 0 2 ASC +vals, err := client.Sort("list", redis.Sort{Offset: 0, Count: 2, Order: "ASC"}).Result() + +// ZRANGEBYSCORE zset -inf +inf WITHSCORES LIMIT 0 2 +vals, err := client.ZRangeByScoreWithScores("zset", redis.ZRangeBy{ + Min: "-inf", + Max: "+inf", + Offset: 0, + Count: 2, +}).Result() + +// ZINTERSTORE out 2 zset1 zset2 WEIGHTS 2 3 AGGREGATE SUM +vals, err := client.ZInterStore("out", redis.ZStore{Weights: []int64{2, 3}}, "zset1", "zset2").Result() + +// EVAL "return {KEYS[1],ARGV[1]}" 1 "key" "hello" +vals, err := client.Eval("return {KEYS[1],ARGV[1]}", []string{"key"}, "hello").Result() +``` + +## Benchmark + +go-redis vs redigo: + +``` +BenchmarkSetGoRedis10Conns64Bytes-4 200000 7621 ns/op 210 B/op 6 allocs/op +BenchmarkSetGoRedis100Conns64Bytes-4 200000 7554 ns/op 210 B/op 6 allocs/op +BenchmarkSetGoRedis10Conns1KB-4 200000 7697 ns/op 210 B/op 6 allocs/op +BenchmarkSetGoRedis100Conns1KB-4 200000 7688 ns/op 210 B/op 6 allocs/op +BenchmarkSetGoRedis10Conns10KB-4 200000 9214 ns/op 210 B/op 6 allocs/op +BenchmarkSetGoRedis100Conns10KB-4 200000 9181 ns/op 210 B/op 6 allocs/op +BenchmarkSetGoRedis10Conns1MB-4 2000 583242 ns/op 2337 B/op 6 allocs/op +BenchmarkSetGoRedis100Conns1MB-4 2000 583089 ns/op 2338 B/op 6 allocs/op +BenchmarkSetRedigo10Conns64Bytes-4 200000 7576 ns/op 208 B/op 7 allocs/op +BenchmarkSetRedigo100Conns64Bytes-4 200000 7782 ns/op 208 B/op 7 allocs/op +BenchmarkSetRedigo10Conns1KB-4 200000 7958 ns/op 208 B/op 7 allocs/op +BenchmarkSetRedigo100Conns1KB-4 200000 7725 ns/op 208 B/op 7 allocs/op +BenchmarkSetRedigo10Conns10KB-4 100000 18442 ns/op 208 B/op 7 allocs/op +BenchmarkSetRedigo100Conns10KB-4 100000 18818 ns/op 208 B/op 7 allocs/op +BenchmarkSetRedigo10Conns1MB-4 2000 668829 ns/op 226 B/op 7 allocs/op +BenchmarkSetRedigo100Conns1MB-4 2000 679542 ns/op 226 B/op 7 allocs/op +``` + +Redis Cluster: + +``` +BenchmarkRedisPing-4 200000 6983 ns/op 116 B/op 4 allocs/op +BenchmarkRedisClusterPing-4 100000 11535 ns/op 117 B/op 4 allocs/op +``` + +## See also + +- [Golang PostgreSQL ORM](https://github.com/go-pg/pg) +- [Golang msgpack](https://github.com/vmihailenco/msgpack) +- [Golang message task queue](https://github.com/go-msgqueue/msgqueue) diff --git a/vendor/github.com/go-redis/redis/cluster.go b/vendor/github.com/go-redis/redis/cluster.go new file mode 100644 index 0000000000..0cecc62c4b --- /dev/null +++ b/vendor/github.com/go-redis/redis/cluster.go @@ -0,0 +1,1621 @@ +package redis + +import ( + "context" + "crypto/tls" + "fmt" + "math" + "math/rand" + "net" + "runtime" + "sort" + "sync" + "sync/atomic" + "time" + + "github.com/go-redis/redis/internal" + "github.com/go-redis/redis/internal/hashtag" + "github.com/go-redis/redis/internal/pool" + "github.com/go-redis/redis/internal/proto" +) + +var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes") + +// ClusterOptions are used to configure a cluster client and should be +// passed to NewClusterClient. +type ClusterOptions struct { + // A seed list of host:port addresses of cluster nodes. + Addrs []string + + // The maximum number of retries before giving up. Command is retried + // on network errors and MOVED/ASK redirects. + // Default is 8 retries. + MaxRedirects int + + // Enables read-only commands on slave nodes. + ReadOnly bool + // Allows routing read-only commands to the closest master or slave node. + // It automatically enables ReadOnly. + RouteByLatency bool + // Allows routing read-only commands to the random master or slave node. + // It automatically enables ReadOnly. + RouteRandomly bool + + // Optional function that returns cluster slots information. + // It is useful to manually create cluster of standalone Redis servers + // and load-balance read/write operations between master and slaves. + // It can use service like ZooKeeper to maintain configuration information + // and Cluster.ReloadState to manually trigger state reloading. + ClusterSlots func() ([]ClusterSlot, error) + + // Optional hook that is called when a new node is created. + OnNewNode func(*Client) + + // Following options are copied from Options struct. + + OnConnect func(*Conn) error + + Password string + + MaxRetries int + MinRetryBackoff time.Duration + MaxRetryBackoff time.Duration + + DialTimeout time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration + + // PoolSize applies per cluster node and not for the whole cluster. + PoolSize int + MinIdleConns int + MaxConnAge time.Duration + PoolTimeout time.Duration + IdleTimeout time.Duration + IdleCheckFrequency time.Duration + + TLSConfig *tls.Config +} + +func (opt *ClusterOptions) init() { + if opt.MaxRedirects == -1 { + opt.MaxRedirects = 0 + } else if opt.MaxRedirects == 0 { + opt.MaxRedirects = 8 + } + + if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil { + opt.ReadOnly = true + } + + if opt.PoolSize == 0 { + opt.PoolSize = 5 * runtime.NumCPU() + } + + switch opt.ReadTimeout { + case -1: + opt.ReadTimeout = 0 + case 0: + opt.ReadTimeout = 3 * time.Second + } + switch opt.WriteTimeout { + case -1: + opt.WriteTimeout = 0 + case 0: + opt.WriteTimeout = opt.ReadTimeout + } + + switch opt.MinRetryBackoff { + case -1: + opt.MinRetryBackoff = 0 + case 0: + opt.MinRetryBackoff = 8 * time.Millisecond + } + switch opt.MaxRetryBackoff { + case -1: + opt.MaxRetryBackoff = 0 + case 0: + opt.MaxRetryBackoff = 512 * time.Millisecond + } +} + +func (opt *ClusterOptions) clientOptions() *Options { + const disableIdleCheck = -1 + + return &Options{ + OnConnect: opt.OnConnect, + + MaxRetries: opt.MaxRetries, + MinRetryBackoff: opt.MinRetryBackoff, + MaxRetryBackoff: opt.MaxRetryBackoff, + Password: opt.Password, + readOnly: opt.ReadOnly, + + DialTimeout: opt.DialTimeout, + ReadTimeout: opt.ReadTimeout, + WriteTimeout: opt.WriteTimeout, + + PoolSize: opt.PoolSize, + MinIdleConns: opt.MinIdleConns, + MaxConnAge: opt.MaxConnAge, + PoolTimeout: opt.PoolTimeout, + IdleTimeout: opt.IdleTimeout, + IdleCheckFrequency: disableIdleCheck, + + TLSConfig: opt.TLSConfig, + } +} + +//------------------------------------------------------------------------------ + +type clusterNode struct { + Client *Client + + latency uint32 // atomic + generation uint32 // atomic + loading uint32 // atomic +} + +func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode { + opt := clOpt.clientOptions() + opt.Addr = addr + node := clusterNode{ + Client: NewClient(opt), + } + + node.latency = math.MaxUint32 + if clOpt.RouteByLatency { + go node.updateLatency() + } + + if clOpt.OnNewNode != nil { + clOpt.OnNewNode(node.Client) + } + + return &node +} + +func (n *clusterNode) String() string { + return n.Client.String() +} + +func (n *clusterNode) Close() error { + return n.Client.Close() +} + +func (n *clusterNode) updateLatency() { + const probes = 10 + + var latency uint32 + for i := 0; i < probes; i++ { + start := time.Now() + n.Client.Ping() + probe := uint32(time.Since(start) / time.Microsecond) + latency = (latency + probe) / 2 + } + atomic.StoreUint32(&n.latency, latency) +} + +func (n *clusterNode) Latency() time.Duration { + latency := atomic.LoadUint32(&n.latency) + return time.Duration(latency) * time.Microsecond +} + +func (n *clusterNode) MarkAsLoading() { + atomic.StoreUint32(&n.loading, uint32(time.Now().Unix())) +} + +func (n *clusterNode) Loading() bool { + const minute = int64(time.Minute / time.Second) + + loading := atomic.LoadUint32(&n.loading) + if loading == 0 { + return false + } + if time.Now().Unix()-int64(loading) < minute { + return true + } + atomic.StoreUint32(&n.loading, 0) + return false +} + +func (n *clusterNode) Generation() uint32 { + return atomic.LoadUint32(&n.generation) +} + +func (n *clusterNode) SetGeneration(gen uint32) { + for { + v := atomic.LoadUint32(&n.generation) + if gen < v || atomic.CompareAndSwapUint32(&n.generation, v, gen) { + break + } + } +} + +//------------------------------------------------------------------------------ + +type clusterNodes struct { + opt *ClusterOptions + + mu sync.RWMutex + allAddrs []string + allNodes map[string]*clusterNode + clusterAddrs []string + closed bool + + _generation uint32 // atomic +} + +func newClusterNodes(opt *ClusterOptions) *clusterNodes { + return &clusterNodes{ + opt: opt, + + allAddrs: opt.Addrs, + allNodes: make(map[string]*clusterNode), + } +} + +func (c *clusterNodes) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.closed { + return nil + } + c.closed = true + + var firstErr error + for _, node := range c.allNodes { + if err := node.Client.Close(); err != nil && firstErr == nil { + firstErr = err + } + } + + c.allNodes = nil + c.clusterAddrs = nil + + return firstErr +} + +func (c *clusterNodes) Addrs() ([]string, error) { + var addrs []string + c.mu.RLock() + closed := c.closed + if !closed { + if len(c.clusterAddrs) > 0 { + addrs = c.clusterAddrs + } else { + addrs = c.allAddrs + } + } + c.mu.RUnlock() + + if closed { + return nil, pool.ErrClosed + } + if len(addrs) == 0 { + return nil, errClusterNoNodes + } + return addrs, nil +} + +func (c *clusterNodes) NextGeneration() uint32 { + return atomic.AddUint32(&c._generation, 1) +} + +// GC removes unused nodes. +func (c *clusterNodes) GC(generation uint32) { + var collected []*clusterNode + c.mu.Lock() + for addr, node := range c.allNodes { + if node.Generation() >= generation { + continue + } + + c.clusterAddrs = remove(c.clusterAddrs, addr) + delete(c.allNodes, addr) + collected = append(collected, node) + } + c.mu.Unlock() + + for _, node := range collected { + _ = node.Client.Close() + } +} + +func (c *clusterNodes) Get(addr string) (*clusterNode, error) { + var node *clusterNode + var err error + c.mu.RLock() + if c.closed { + err = pool.ErrClosed + } else { + node = c.allNodes[addr] + } + c.mu.RUnlock() + return node, err +} + +func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) { + node, err := c.Get(addr) + if err != nil { + return nil, err + } + if node != nil { + return node, nil + } + + c.mu.Lock() + defer c.mu.Unlock() + + if c.closed { + return nil, pool.ErrClosed + } + + node, ok := c.allNodes[addr] + if ok { + return node, err + } + + node = newClusterNode(c.opt, addr) + + c.allAddrs = appendIfNotExists(c.allAddrs, addr) + c.clusterAddrs = append(c.clusterAddrs, addr) + c.allNodes[addr] = node + + return node, err +} + +func (c *clusterNodes) All() ([]*clusterNode, error) { + c.mu.RLock() + defer c.mu.RUnlock() + + if c.closed { + return nil, pool.ErrClosed + } + + cp := make([]*clusterNode, 0, len(c.allNodes)) + for _, node := range c.allNodes { + cp = append(cp, node) + } + return cp, nil +} + +func (c *clusterNodes) Random() (*clusterNode, error) { + addrs, err := c.Addrs() + if err != nil { + return nil, err + } + + n := rand.Intn(len(addrs)) + return c.GetOrCreate(addrs[n]) +} + +//------------------------------------------------------------------------------ + +type clusterSlot struct { + start, end int + nodes []*clusterNode +} + +type clusterSlotSlice []*clusterSlot + +func (p clusterSlotSlice) Len() int { + return len(p) +} + +func (p clusterSlotSlice) Less(i, j int) bool { + return p[i].start < p[j].start +} + +func (p clusterSlotSlice) Swap(i, j int) { + p[i], p[j] = p[j], p[i] +} + +type clusterState struct { + nodes *clusterNodes + Masters []*clusterNode + Slaves []*clusterNode + + slots []*clusterSlot + + generation uint32 + createdAt time.Time +} + +func newClusterState( + nodes *clusterNodes, slots []ClusterSlot, origin string, +) (*clusterState, error) { + c := clusterState{ + nodes: nodes, + + slots: make([]*clusterSlot, 0, len(slots)), + + generation: nodes.NextGeneration(), + createdAt: time.Now(), + } + + originHost, _, _ := net.SplitHostPort(origin) + isLoopbackOrigin := isLoopback(originHost) + + for _, slot := range slots { + var nodes []*clusterNode + for i, slotNode := range slot.Nodes { + addr := slotNode.Addr + if !isLoopbackOrigin { + addr = replaceLoopbackHost(addr, originHost) + } + + node, err := c.nodes.GetOrCreate(addr) + if err != nil { + return nil, err + } + + node.SetGeneration(c.generation) + nodes = append(nodes, node) + + if i == 0 { + c.Masters = appendUniqueNode(c.Masters, node) + } else { + c.Slaves = appendUniqueNode(c.Slaves, node) + } + } + + c.slots = append(c.slots, &clusterSlot{ + start: slot.Start, + end: slot.End, + nodes: nodes, + }) + } + + sort.Sort(clusterSlotSlice(c.slots)) + + time.AfterFunc(time.Minute, func() { + nodes.GC(c.generation) + }) + + return &c, nil +} + +func replaceLoopbackHost(nodeAddr, originHost string) string { + nodeHost, nodePort, err := net.SplitHostPort(nodeAddr) + if err != nil { + return nodeAddr + } + + nodeIP := net.ParseIP(nodeHost) + if nodeIP == nil { + return nodeAddr + } + + if !nodeIP.IsLoopback() { + return nodeAddr + } + + // Use origin host which is not loopback and node port. + return net.JoinHostPort(originHost, nodePort) +} + +func isLoopback(host string) bool { + ip := net.ParseIP(host) + if ip == nil { + return true + } + return ip.IsLoopback() +} + +func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) { + nodes := c.slotNodes(slot) + if len(nodes) > 0 { + return nodes[0], nil + } + return c.nodes.Random() +} + +func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) { + nodes := c.slotNodes(slot) + switch len(nodes) { + case 0: + return c.nodes.Random() + case 1: + return nodes[0], nil + case 2: + if slave := nodes[1]; !slave.Loading() { + return slave, nil + } + return nodes[0], nil + default: + var slave *clusterNode + for i := 0; i < 10; i++ { + n := rand.Intn(len(nodes)-1) + 1 + slave = nodes[n] + if !slave.Loading() { + return slave, nil + } + } + + // All slaves are loading - use master. + return nodes[0], nil + } +} + +func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) { + const threshold = time.Millisecond + + nodes := c.slotNodes(slot) + if len(nodes) == 0 { + return c.nodes.Random() + } + + var node *clusterNode + for _, n := range nodes { + if n.Loading() { + continue + } + if node == nil || node.Latency()-n.Latency() > threshold { + node = n + } + } + return node, nil +} + +func (c *clusterState) slotRandomNode(slot int) *clusterNode { + nodes := c.slotNodes(slot) + n := rand.Intn(len(nodes)) + return nodes[n] +} + +func (c *clusterState) slotNodes(slot int) []*clusterNode { + i := sort.Search(len(c.slots), func(i int) bool { + return c.slots[i].end >= slot + }) + if i >= len(c.slots) { + return nil + } + x := c.slots[i] + if slot >= x.start && slot <= x.end { + return x.nodes + } + return nil +} + +//------------------------------------------------------------------------------ + +type clusterStateHolder struct { + load func() (*clusterState, error) + + state atomic.Value + reloading uint32 // atomic +} + +func newClusterStateHolder(fn func() (*clusterState, error)) *clusterStateHolder { + return &clusterStateHolder{ + load: fn, + } +} + +func (c *clusterStateHolder) Reload() (*clusterState, error) { + state, err := c.load() + if err != nil { + return nil, err + } + c.state.Store(state) + return state, nil +} + +func (c *clusterStateHolder) LazyReload() { + if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) { + return + } + go func() { + defer atomic.StoreUint32(&c.reloading, 0) + + _, err := c.Reload() + if err != nil { + return + } + time.Sleep(100 * time.Millisecond) + }() +} + +func (c *clusterStateHolder) Get() (*clusterState, error) { + v := c.state.Load() + if v != nil { + state := v.(*clusterState) + if time.Since(state.createdAt) > time.Minute { + c.LazyReload() + } + return state, nil + } + return c.Reload() +} + +func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) { + state, err := c.Reload() + if err == nil { + return state, nil + } + return c.Get() +} + +//------------------------------------------------------------------------------ + +// ClusterClient is a Redis Cluster client representing a pool of zero +// or more underlying connections. It's safe for concurrent use by +// multiple goroutines. +type ClusterClient struct { + cmdable + + ctx context.Context + + opt *ClusterOptions + nodes *clusterNodes + state *clusterStateHolder + cmdsInfoCache *cmdsInfoCache + + process func(Cmder) error + processPipeline func([]Cmder) error + processTxPipeline func([]Cmder) error +} + +// NewClusterClient returns a Redis Cluster client as described in +// http://redis.io/topics/cluster-spec. +func NewClusterClient(opt *ClusterOptions) *ClusterClient { + opt.init() + + c := &ClusterClient{ + opt: opt, + nodes: newClusterNodes(opt), + } + c.state = newClusterStateHolder(c.loadState) + c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo) + + c.process = c.defaultProcess + c.processPipeline = c.defaultProcessPipeline + c.processTxPipeline = c.defaultProcessTxPipeline + + c.init() + if opt.IdleCheckFrequency > 0 { + go c.reaper(opt.IdleCheckFrequency) + } + + return c +} + +func (c *ClusterClient) init() { + c.cmdable.setProcessor(c.Process) +} + +// ReloadState reloads cluster state. If available it calls ClusterSlots func +// to get cluster slots information. +func (c *ClusterClient) ReloadState() error { + _, err := c.state.Reload() + return err +} + +func (c *ClusterClient) Context() context.Context { + if c.ctx != nil { + return c.ctx + } + return context.Background() +} + +func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient { + if ctx == nil { + panic("nil context") + } + c2 := c.copy() + c2.ctx = ctx + return c2 +} + +func (c *ClusterClient) copy() *ClusterClient { + cp := *c + cp.init() + return &cp +} + +// Options returns read-only Options that were used to create the client. +func (c *ClusterClient) Options() *ClusterOptions { + return c.opt +} + +func (c *ClusterClient) retryBackoff(attempt int) time.Duration { + return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff) +} + +func (c *ClusterClient) cmdsInfo() (map[string]*CommandInfo, error) { + addrs, err := c.nodes.Addrs() + if err != nil { + return nil, err + } + + var firstErr error + for _, addr := range addrs { + node, err := c.nodes.Get(addr) + if err != nil { + return nil, err + } + if node == nil { + continue + } + + info, err := node.Client.Command().Result() + if err == nil { + return info, nil + } + if firstErr == nil { + firstErr = err + } + } + return nil, firstErr +} + +func (c *ClusterClient) cmdInfo(name string) *CommandInfo { + cmdsInfo, err := c.cmdsInfoCache.Get() + if err != nil { + return nil + } + + info := cmdsInfo[name] + if info == nil { + internal.Logf("info for cmd=%s not found", name) + } + return info +} + +func cmdSlot(cmd Cmder, pos int) int { + if pos == 0 { + return hashtag.RandomSlot() + } + firstKey := cmd.stringArg(pos) + return hashtag.Slot(firstKey) +} + +func (c *ClusterClient) cmdSlot(cmd Cmder) int { + args := cmd.Args() + if args[0] == "cluster" && args[1] == "getkeysinslot" { + return args[2].(int) + } + + cmdInfo := c.cmdInfo(cmd.Name()) + return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo)) +} + +func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) { + state, err := c.state.Get() + if err != nil { + return 0, nil, err + } + + cmdInfo := c.cmdInfo(cmd.Name()) + slot := c.cmdSlot(cmd) + + if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly { + if c.opt.RouteByLatency { + node, err := state.slotClosestNode(slot) + return slot, node, err + } + + if c.opt.RouteRandomly { + node := state.slotRandomNode(slot) + return slot, node, nil + } + + node, err := state.slotSlaveNode(slot) + return slot, node, err + } + + node, err := state.slotMasterNode(slot) + return slot, node, err +} + +func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) { + state, err := c.state.Get() + if err != nil { + return nil, err + } + + nodes := state.slotNodes(slot) + if len(nodes) > 0 { + return nodes[0], nil + } + return c.nodes.Random() +} + +func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { + if len(keys) == 0 { + return fmt.Errorf("redis: Watch requires at least one key") + } + + slot := hashtag.Slot(keys[0]) + for _, key := range keys[1:] { + if hashtag.Slot(key) != slot { + err := fmt.Errorf("redis: Watch requires all keys to be in the same slot") + return err + } + } + + node, err := c.slotMasterNode(slot) + if err != nil { + return err + } + + for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + + err = node.Client.Watch(fn, keys...) + if err == nil { + break + } + if err != Nil { + c.state.LazyReload() + } + + moved, ask, addr := internal.IsMovedError(err) + if moved || ask { + node, err = c.nodes.GetOrCreate(addr) + if err != nil { + return err + } + continue + } + + if err == pool.ErrClosed || internal.IsReadOnlyError(err) { + node, err = c.slotMasterNode(slot) + if err != nil { + return err + } + continue + } + + if internal.IsRetryableError(err, true) { + continue + } + + return err + } + + return err +} + +// Close closes the cluster client, releasing any open resources. +// +// It is rare to Close a ClusterClient, as the ClusterClient is meant +// to be long-lived and shared between many goroutines. +func (c *ClusterClient) Close() error { + return c.nodes.Close() +} + +// Do creates a Cmd from the args and processes the cmd. +func (c *ClusterClient) Do(args ...interface{}) *Cmd { + cmd := NewCmd(args...) + c.Process(cmd) + return cmd +} + +func (c *ClusterClient) WrapProcess( + fn func(oldProcess func(Cmder) error) func(Cmder) error, +) { + c.process = fn(c.process) +} + +func (c *ClusterClient) Process(cmd Cmder) error { + return c.process(cmd) +} + +func (c *ClusterClient) defaultProcess(cmd Cmder) error { + var node *clusterNode + var ask bool + for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + + if node == nil { + var err error + _, node, err = c.cmdSlotAndNode(cmd) + if err != nil { + cmd.setErr(err) + break + } + } + + var err error + if ask { + pipe := node.Client.Pipeline() + _ = pipe.Process(NewCmd("ASKING")) + _ = pipe.Process(cmd) + _, err = pipe.Exec() + _ = pipe.Close() + ask = false + } else { + err = node.Client.Process(cmd) + } + + // If there is no error - we are done. + if err == nil { + break + } + if err != Nil { + c.state.LazyReload() + } + + // If slave is loading - pick another node. + if c.opt.ReadOnly && internal.IsLoadingError(err) { + node.MarkAsLoading() + node = nil + continue + } + + var moved bool + var addr string + moved, ask, addr = internal.IsMovedError(err) + if moved || ask { + node, err = c.nodes.GetOrCreate(addr) + if err != nil { + break + } + continue + } + + if err == pool.ErrClosed || internal.IsReadOnlyError(err) { + node = nil + continue + } + + if internal.IsRetryableError(err, true) { + // First retry the same node. + if attempt == 0 { + continue + } + + // Second try random node. + node, err = c.nodes.Random() + if err != nil { + break + } + continue + } + + break + } + + return cmd.Err() +} + +// ForEachMaster concurrently calls the fn on each master node in the cluster. +// It returns the first error if any. +func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { + state, err := c.state.ReloadOrGet() + if err != nil { + return err + } + + var wg sync.WaitGroup + errCh := make(chan error, 1) + for _, master := range state.Masters { + wg.Add(1) + go func(node *clusterNode) { + defer wg.Done() + err := fn(node.Client) + if err != nil { + select { + case errCh <- err: + default: + } + } + }(master) + } + wg.Wait() + + select { + case err := <-errCh: + return err + default: + return nil + } +} + +// ForEachSlave concurrently calls the fn on each slave node in the cluster. +// It returns the first error if any. +func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error { + state, err := c.state.ReloadOrGet() + if err != nil { + return err + } + + var wg sync.WaitGroup + errCh := make(chan error, 1) + for _, slave := range state.Slaves { + wg.Add(1) + go func(node *clusterNode) { + defer wg.Done() + err := fn(node.Client) + if err != nil { + select { + case errCh <- err: + default: + } + } + }(slave) + } + wg.Wait() + + select { + case err := <-errCh: + return err + default: + return nil + } +} + +// ForEachNode concurrently calls the fn on each known node in the cluster. +// It returns the first error if any. +func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error { + state, err := c.state.ReloadOrGet() + if err != nil { + return err + } + + var wg sync.WaitGroup + errCh := make(chan error, 1) + worker := func(node *clusterNode) { + defer wg.Done() + err := fn(node.Client) + if err != nil { + select { + case errCh <- err: + default: + } + } + } + + for _, node := range state.Masters { + wg.Add(1) + go worker(node) + } + for _, node := range state.Slaves { + wg.Add(1) + go worker(node) + } + + wg.Wait() + select { + case err := <-errCh: + return err + default: + return nil + } +} + +// PoolStats returns accumulated connection pool stats. +func (c *ClusterClient) PoolStats() *PoolStats { + var acc PoolStats + + state, _ := c.state.Get() + if state == nil { + return &acc + } + + for _, node := range state.Masters { + s := node.Client.connPool.Stats() + acc.Hits += s.Hits + acc.Misses += s.Misses + acc.Timeouts += s.Timeouts + + acc.TotalConns += s.TotalConns + acc.IdleConns += s.IdleConns + acc.StaleConns += s.StaleConns + } + + for _, node := range state.Slaves { + s := node.Client.connPool.Stats() + acc.Hits += s.Hits + acc.Misses += s.Misses + acc.Timeouts += s.Timeouts + + acc.TotalConns += s.TotalConns + acc.IdleConns += s.IdleConns + acc.StaleConns += s.StaleConns + } + + return &acc +} + +func (c *ClusterClient) loadState() (*clusterState, error) { + if c.opt.ClusterSlots != nil { + slots, err := c.opt.ClusterSlots() + if err != nil { + return nil, err + } + return newClusterState(c.nodes, slots, "") + } + + addrs, err := c.nodes.Addrs() + if err != nil { + return nil, err + } + + var firstErr error + for _, addr := range addrs { + node, err := c.nodes.GetOrCreate(addr) + if err != nil { + if firstErr == nil { + firstErr = err + } + continue + } + + slots, err := node.Client.ClusterSlots().Result() + if err != nil { + if firstErr == nil { + firstErr = err + } + continue + } + + return newClusterState(c.nodes, slots, node.Client.opt.Addr) + } + + return nil, firstErr +} + +// reaper closes idle connections to the cluster. +func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) { + ticker := time.NewTicker(idleCheckFrequency) + defer ticker.Stop() + + for range ticker.C { + nodes, err := c.nodes.All() + if err != nil { + break + } + + for _, node := range nodes { + _, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns() + if err != nil { + internal.Logf("ReapStaleConns failed: %s", err) + } + } + } +} + +func (c *ClusterClient) Pipeline() Pipeliner { + pipe := Pipeline{ + exec: c.processPipeline, + } + pipe.statefulCmdable.setProcessor(pipe.Process) + return &pipe +} + +func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { + return c.Pipeline().Pipelined(fn) +} + +func (c *ClusterClient) WrapProcessPipeline( + fn func(oldProcess func([]Cmder) error) func([]Cmder) error, +) { + c.processPipeline = fn(c.processPipeline) +} + +func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error { + cmdsMap := newCmdsMap() + err := c.mapCmdsByNode(cmds, cmdsMap) + if err != nil { + setCmdsErr(cmds, err) + return err + } + + for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + + failedCmds := newCmdsMap() + var wg sync.WaitGroup + + for node, cmds := range cmdsMap.m { + wg.Add(1) + go func(node *clusterNode, cmds []Cmder) { + defer wg.Done() + + cn, err := node.Client.getConn() + if err != nil { + if err == pool.ErrClosed { + c.mapCmdsByNode(cmds, failedCmds) + } else { + setCmdsErr(cmds, err) + } + return + } + + err = c.pipelineProcessCmds(node, cn, cmds, failedCmds) + node.Client.releaseConnStrict(cn, err) + }(node, cmds) + } + + wg.Wait() + if len(failedCmds.m) == 0 { + break + } + cmdsMap = failedCmds + } + + return cmdsFirstErr(cmds) +} + +type cmdsMap struct { + mu sync.Mutex + m map[*clusterNode][]Cmder +} + +func newCmdsMap() *cmdsMap { + return &cmdsMap{ + m: make(map[*clusterNode][]Cmder), + } +} + +func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error { + state, err := c.state.Get() + if err != nil { + setCmdsErr(cmds, err) + return err + } + + cmdsAreReadOnly := c.cmdsAreReadOnly(cmds) + for _, cmd := range cmds { + var node *clusterNode + var err error + if cmdsAreReadOnly { + _, node, err = c.cmdSlotAndNode(cmd) + } else { + slot := c.cmdSlot(cmd) + node, err = state.slotMasterNode(slot) + } + if err != nil { + return err + } + cmdsMap.mu.Lock() + cmdsMap.m[node] = append(cmdsMap.m[node], cmd) + cmdsMap.mu.Unlock() + } + return nil +} + +func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool { + for _, cmd := range cmds { + cmdInfo := c.cmdInfo(cmd.Name()) + if cmdInfo == nil || !cmdInfo.ReadOnly { + return false + } + } + return true +} + +func (c *ClusterClient) pipelineProcessCmds( + node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap, +) error { + err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return writeCmd(wr, cmds...) + }) + if err != nil { + setCmdsErr(cmds, err) + failedCmds.mu.Lock() + failedCmds.m[node] = cmds + failedCmds.mu.Unlock() + return err + } + + err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error { + return c.pipelineReadCmds(node, rd, cmds, failedCmds) + }) + return err +} + +func (c *ClusterClient) pipelineReadCmds( + node *clusterNode, rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap, +) error { + var firstErr error + for _, cmd := range cmds { + err := cmd.readReply(rd) + if err == nil { + continue + } + + if c.checkMovedErr(cmd, err, failedCmds) { + continue + } + + if internal.IsRedisError(err) { + continue + } + + failedCmds.mu.Lock() + failedCmds.m[node] = append(failedCmds.m[node], cmd) + failedCmds.mu.Unlock() + if firstErr == nil { + firstErr = err + } + } + return firstErr +} + +func (c *ClusterClient) checkMovedErr( + cmd Cmder, err error, failedCmds *cmdsMap, +) bool { + moved, ask, addr := internal.IsMovedError(err) + + if moved { + c.state.LazyReload() + + node, err := c.nodes.GetOrCreate(addr) + if err != nil { + return false + } + + failedCmds.mu.Lock() + failedCmds.m[node] = append(failedCmds.m[node], cmd) + failedCmds.mu.Unlock() + return true + } + + if ask { + node, err := c.nodes.GetOrCreate(addr) + if err != nil { + return false + } + + failedCmds.mu.Lock() + failedCmds.m[node] = append(failedCmds.m[node], NewCmd("ASKING"), cmd) + failedCmds.mu.Unlock() + return true + } + + return false +} + +// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. +func (c *ClusterClient) TxPipeline() Pipeliner { + pipe := Pipeline{ + exec: c.processTxPipeline, + } + pipe.statefulCmdable.setProcessor(pipe.Process) + return &pipe +} + +func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { + return c.TxPipeline().Pipelined(fn) +} + +func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error { + state, err := c.state.Get() + if err != nil { + return err + } + + cmdsMap := c.mapCmdsBySlot(cmds) + for slot, cmds := range cmdsMap { + node, err := state.slotMasterNode(slot) + if err != nil { + setCmdsErr(cmds, err) + continue + } + cmdsMap := map[*clusterNode][]Cmder{node: cmds} + + for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + + failedCmds := newCmdsMap() + var wg sync.WaitGroup + + for node, cmds := range cmdsMap { + wg.Add(1) + go func(node *clusterNode, cmds []Cmder) { + defer wg.Done() + + cn, err := node.Client.getConn() + if err != nil { + if err == pool.ErrClosed { + c.mapCmdsByNode(cmds, failedCmds) + } else { + setCmdsErr(cmds, err) + } + return + } + + err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds) + node.Client.releaseConnStrict(cn, err) + }(node, cmds) + } + + wg.Wait() + if len(failedCmds.m) == 0 { + break + } + cmdsMap = failedCmds.m + } + } + + return cmdsFirstErr(cmds) +} + +func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { + cmdsMap := make(map[int][]Cmder) + for _, cmd := range cmds { + slot := c.cmdSlot(cmd) + cmdsMap[slot] = append(cmdsMap[slot], cmd) + } + return cmdsMap +} + +func (c *ClusterClient) txPipelineProcessCmds( + node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap, +) error { + err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return txPipelineWriteMulti(wr, cmds) + }) + if err != nil { + setCmdsErr(cmds, err) + failedCmds.mu.Lock() + failedCmds.m[node] = cmds + failedCmds.mu.Unlock() + return err + } + + err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error { + err := c.txPipelineReadQueued(rd, cmds, failedCmds) + if err != nil { + setCmdsErr(cmds, err) + return err + } + return pipelineReadCmds(rd, cmds) + }) + return err +} + +func (c *ClusterClient) txPipelineReadQueued( + rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap, +) error { + // Parse queued replies. + var statusCmd StatusCmd + if err := statusCmd.readReply(rd); err != nil { + return err + } + + for _, cmd := range cmds { + err := statusCmd.readReply(rd) + if err == nil { + continue + } + + if c.checkMovedErr(cmd, err, failedCmds) || internal.IsRedisError(err) { + continue + } + + return err + } + + // Parse number of replies. + line, err := rd.ReadLine() + if err != nil { + if err == Nil { + err = TxFailedErr + } + return err + } + + switch line[0] { + case proto.ErrorReply: + err := proto.ParseErrorReply(line) + for _, cmd := range cmds { + if !c.checkMovedErr(cmd, err, failedCmds) { + break + } + } + return err + case proto.ArrayReply: + // ok + default: + err := fmt.Errorf("redis: expected '*', but got line %q", line) + return err + } + + return nil +} + +func (c *ClusterClient) pubSub() *PubSub { + var node *clusterNode + pubsub := &PubSub{ + opt: c.opt.clientOptions(), + + newConn: func(channels []string) (*pool.Conn, error) { + if node != nil { + panic("node != nil") + } + + slot := hashtag.Slot(channels[0]) + + var err error + node, err = c.slotMasterNode(slot) + if err != nil { + return nil, err + } + + cn, err := node.Client.newConn() + if err != nil { + return nil, err + } + + return cn, nil + }, + closeConn: func(cn *pool.Conn) error { + err := node.Client.connPool.CloseConn(cn) + node = nil + return err + }, + } + pubsub.init() + + return pubsub +} + +// Subscribe subscribes the client to the specified channels. +// Channels can be omitted to create empty subscription. +func (c *ClusterClient) Subscribe(channels ...string) *PubSub { + pubsub := c.pubSub() + if len(channels) > 0 { + _ = pubsub.Subscribe(channels...) + } + return pubsub +} + +// PSubscribe subscribes the client to the given patterns. +// Patterns can be omitted to create empty subscription. +func (c *ClusterClient) PSubscribe(channels ...string) *PubSub { + pubsub := c.pubSub() + if len(channels) > 0 { + _ = pubsub.PSubscribe(channels...) + } + return pubsub +} + +func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode { + for _, n := range nodes { + if n == node { + return nodes + } + } + return append(nodes, node) +} + +func appendIfNotExists(ss []string, es ...string) []string { +loop: + for _, e := range es { + for _, s := range ss { + if s == e { + continue loop + } + } + ss = append(ss, e) + } + return ss +} + +func remove(ss []string, es ...string) []string { + if len(es) == 0 { + return ss[:0] + } + for _, e := range es { + for i, s := range ss { + if s == e { + ss = append(ss[:i], ss[i+1:]...) + break + } + } + } + return ss +} diff --git a/vendor/github.com/go-redis/redis/cluster_commands.go b/vendor/github.com/go-redis/redis/cluster_commands.go new file mode 100644 index 0000000000..dff62c902d --- /dev/null +++ b/vendor/github.com/go-redis/redis/cluster_commands.go @@ -0,0 +1,22 @@ +package redis + +import "sync/atomic" + +func (c *ClusterClient) DBSize() *IntCmd { + cmd := NewIntCmd("dbsize") + var size int64 + err := c.ForEachMaster(func(master *Client) error { + n, err := master.DBSize().Result() + if err != nil { + return err + } + atomic.AddInt64(&size, n) + return nil + }) + if err != nil { + cmd.setErr(err) + return cmd + } + cmd.val = size + return cmd +} diff --git a/vendor/github.com/go-redis/redis/command.go b/vendor/github.com/go-redis/redis/command.go new file mode 100644 index 0000000000..cb4f94b122 --- /dev/null +++ b/vendor/github.com/go-redis/redis/command.go @@ -0,0 +1,1936 @@ +package redis + +import ( + "fmt" + "net" + "strconv" + "strings" + "time" + + "github.com/go-redis/redis/internal" + "github.com/go-redis/redis/internal/proto" +) + +type Cmder interface { + Name() string + Args() []interface{} + stringArg(int) string + + readReply(rd *proto.Reader) error + setErr(error) + + readTimeout() *time.Duration + + Err() error +} + +func setCmdsErr(cmds []Cmder, e error) { + for _, cmd := range cmds { + if cmd.Err() == nil { + cmd.setErr(e) + } + } +} + +func cmdsFirstErr(cmds []Cmder) error { + for _, cmd := range cmds { + if err := cmd.Err(); err != nil { + return err + } + } + return nil +} + +func writeCmd(wr *proto.Writer, cmds ...Cmder) error { + for _, cmd := range cmds { + err := wr.WriteArgs(cmd.Args()) + if err != nil { + return err + } + } + return nil +} + +func cmdString(cmd Cmder, val interface{}) string { + var ss []string + for _, arg := range cmd.Args() { + ss = append(ss, fmt.Sprint(arg)) + } + s := strings.Join(ss, " ") + if err := cmd.Err(); err != nil { + return s + ": " + err.Error() + } + if val != nil { + switch vv := val.(type) { + case []byte: + return s + ": " + string(vv) + default: + return s + ": " + fmt.Sprint(val) + } + } + return s + +} + +func cmdFirstKeyPos(cmd Cmder, info *CommandInfo) int { + switch cmd.Name() { + case "eval", "evalsha": + if cmd.stringArg(2) != "0" { + return 3 + } + + return 0 + case "publish": + return 1 + } + if info == nil { + return 0 + } + return int(info.FirstKeyPos) +} + +//------------------------------------------------------------------------------ + +type baseCmd struct { + _args []interface{} + err error + + _readTimeout *time.Duration +} + +var _ Cmder = (*Cmd)(nil) + +func (cmd *baseCmd) Err() error { + return cmd.err +} + +func (cmd *baseCmd) Args() []interface{} { + return cmd._args +} + +func (cmd *baseCmd) stringArg(pos int) string { + if pos < 0 || pos >= len(cmd._args) { + return "" + } + s, _ := cmd._args[pos].(string) + return s +} + +func (cmd *baseCmd) Name() string { + if len(cmd._args) > 0 { + // Cmd name must be lower cased. + s := internal.ToLower(cmd.stringArg(0)) + cmd._args[0] = s + return s + } + return "" +} + +func (cmd *baseCmd) readTimeout() *time.Duration { + return cmd._readTimeout +} + +func (cmd *baseCmd) setReadTimeout(d time.Duration) { + cmd._readTimeout = &d +} + +func (cmd *baseCmd) setErr(e error) { + cmd.err = e +} + +//------------------------------------------------------------------------------ + +type Cmd struct { + baseCmd + + val interface{} +} + +func NewCmd(args ...interface{}) *Cmd { + return &Cmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *Cmd) Val() interface{} { + return cmd.val +} + +func (cmd *Cmd) Result() (interface{}, error) { + return cmd.val, cmd.err +} + +func (cmd *Cmd) String() (string, error) { + if cmd.err != nil { + return "", cmd.err + } + switch val := cmd.val.(type) { + case string: + return val, nil + default: + err := fmt.Errorf("redis: unexpected type=%T for String", val) + return "", err + } +} + +func (cmd *Cmd) Int() (int, error) { + if cmd.err != nil { + return 0, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return int(val), nil + case string: + return strconv.Atoi(val) + default: + err := fmt.Errorf("redis: unexpected type=%T for Int", val) + return 0, err + } +} + +func (cmd *Cmd) Int64() (int64, error) { + if cmd.err != nil { + return 0, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return val, nil + case string: + return strconv.ParseInt(val, 10, 64) + default: + err := fmt.Errorf("redis: unexpected type=%T for Int64", val) + return 0, err + } +} + +func (cmd *Cmd) Uint64() (uint64, error) { + if cmd.err != nil { + return 0, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return uint64(val), nil + case string: + return strconv.ParseUint(val, 10, 64) + default: + err := fmt.Errorf("redis: unexpected type=%T for Uint64", val) + return 0, err + } +} + +func (cmd *Cmd) Float64() (float64, error) { + if cmd.err != nil { + return 0, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return float64(val), nil + case string: + return strconv.ParseFloat(val, 64) + default: + err := fmt.Errorf("redis: unexpected type=%T for Float64", val) + return 0, err + } +} + +func (cmd *Cmd) Bool() (bool, error) { + if cmd.err != nil { + return false, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return val != 0, nil + case string: + return strconv.ParseBool(val) + default: + err := fmt.Errorf("redis: unexpected type=%T for Bool", val) + return false, err + } +} + +func (cmd *Cmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadReply(sliceParser) + return cmd.err +} + +// Implements proto.MultiBulkParse +func sliceParser(rd *proto.Reader, n int64) (interface{}, error) { + vals := make([]interface{}, 0, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadReply(sliceParser) + if err != nil { + if err == Nil { + vals = append(vals, nil) + continue + } + if err, ok := err.(proto.RedisError); ok { + vals = append(vals, err) + continue + } + return nil, err + } + + switch v := v.(type) { + case string: + vals = append(vals, v) + default: + vals = append(vals, v) + } + } + return vals, nil +} + +//------------------------------------------------------------------------------ + +type SliceCmd struct { + baseCmd + + val []interface{} +} + +var _ Cmder = (*SliceCmd)(nil) + +func NewSliceCmd(args ...interface{}) *SliceCmd { + return &SliceCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *SliceCmd) Val() []interface{} { + return cmd.val +} + +func (cmd *SliceCmd) Result() ([]interface{}, error) { + return cmd.val, cmd.err +} + +func (cmd *SliceCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *SliceCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadArrayReply(sliceParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.([]interface{}) + return nil +} + +//------------------------------------------------------------------------------ + +type StatusCmd struct { + baseCmd + + val string +} + +var _ Cmder = (*StatusCmd)(nil) + +func NewStatusCmd(args ...interface{}) *StatusCmd { + return &StatusCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *StatusCmd) Val() string { + return cmd.val +} + +func (cmd *StatusCmd) Result() (string, error) { + return cmd.val, cmd.err +} + +func (cmd *StatusCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *StatusCmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadString() + return cmd.err +} + +//------------------------------------------------------------------------------ + +type IntCmd struct { + baseCmd + + val int64 +} + +var _ Cmder = (*IntCmd)(nil) + +func NewIntCmd(args ...interface{}) *IntCmd { + return &IntCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *IntCmd) Val() int64 { + return cmd.val +} + +func (cmd *IntCmd) Result() (int64, error) { + return cmd.val, cmd.err +} + +func (cmd *IntCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *IntCmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadIntReply() + return cmd.err +} + +//------------------------------------------------------------------------------ + +type DurationCmd struct { + baseCmd + + val time.Duration + precision time.Duration +} + +var _ Cmder = (*DurationCmd)(nil) + +func NewDurationCmd(precision time.Duration, args ...interface{}) *DurationCmd { + return &DurationCmd{ + baseCmd: baseCmd{_args: args}, + precision: precision, + } +} + +func (cmd *DurationCmd) Val() time.Duration { + return cmd.val +} + +func (cmd *DurationCmd) Result() (time.Duration, error) { + return cmd.val, cmd.err +} + +func (cmd *DurationCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *DurationCmd) readReply(rd *proto.Reader) error { + var n int64 + n, cmd.err = rd.ReadIntReply() + if cmd.err != nil { + return cmd.err + } + cmd.val = time.Duration(n) * cmd.precision + return nil +} + +//------------------------------------------------------------------------------ + +type TimeCmd struct { + baseCmd + + val time.Time +} + +var _ Cmder = (*TimeCmd)(nil) + +func NewTimeCmd(args ...interface{}) *TimeCmd { + return &TimeCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *TimeCmd) Val() time.Time { + return cmd.val +} + +func (cmd *TimeCmd) Result() (time.Time, error) { + return cmd.val, cmd.err +} + +func (cmd *TimeCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *TimeCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadArrayReply(timeParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.(time.Time) + return nil +} + +// Implements proto.MultiBulkParse +func timeParser(rd *proto.Reader, n int64) (interface{}, error) { + if n != 2 { + return nil, fmt.Errorf("got %d elements, expected 2", n) + } + + sec, err := rd.ReadInt() + if err != nil { + return nil, err + } + + microsec, err := rd.ReadInt() + if err != nil { + return nil, err + } + + return time.Unix(sec, microsec*1000), nil +} + +//------------------------------------------------------------------------------ + +type BoolCmd struct { + baseCmd + + val bool +} + +var _ Cmder = (*BoolCmd)(nil) + +func NewBoolCmd(args ...interface{}) *BoolCmd { + return &BoolCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *BoolCmd) Val() bool { + return cmd.val +} + +func (cmd *BoolCmd) Result() (bool, error) { + return cmd.val, cmd.err +} + +func (cmd *BoolCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *BoolCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadReply(nil) + // `SET key value NX` returns nil when key already exists. But + // `SETNX key value` returns bool (0/1). So convert nil to bool. + // TODO: is this okay? + if cmd.err == Nil { + cmd.val = false + cmd.err = nil + return nil + } + if cmd.err != nil { + return cmd.err + } + switch v := v.(type) { + case int64: + cmd.val = v == 1 + return nil + case string: + cmd.val = v == "OK" + return nil + default: + cmd.err = fmt.Errorf("got %T, wanted int64 or string", v) + return cmd.err + } +} + +//------------------------------------------------------------------------------ + +type StringCmd struct { + baseCmd + + val string +} + +var _ Cmder = (*StringCmd)(nil) + +func NewStringCmd(args ...interface{}) *StringCmd { + return &StringCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *StringCmd) Val() string { + return cmd.val +} + +func (cmd *StringCmd) Result() (string, error) { + return cmd.Val(), cmd.err +} + +func (cmd *StringCmd) Bytes() ([]byte, error) { + return []byte(cmd.val), cmd.err +} + +func (cmd *StringCmd) Int() (int, error) { + if cmd.err != nil { + return 0, cmd.err + } + return strconv.Atoi(cmd.Val()) +} + +func (cmd *StringCmd) Int64() (int64, error) { + if cmd.err != nil { + return 0, cmd.err + } + return strconv.ParseInt(cmd.Val(), 10, 64) +} + +func (cmd *StringCmd) Uint64() (uint64, error) { + if cmd.err != nil { + return 0, cmd.err + } + return strconv.ParseUint(cmd.Val(), 10, 64) +} + +func (cmd *StringCmd) Float64() (float64, error) { + if cmd.err != nil { + return 0, cmd.err + } + return strconv.ParseFloat(cmd.Val(), 64) +} + +func (cmd *StringCmd) Scan(val interface{}) error { + if cmd.err != nil { + return cmd.err + } + return proto.Scan([]byte(cmd.val), val) +} + +func (cmd *StringCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *StringCmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadString() + return cmd.err +} + +//------------------------------------------------------------------------------ + +type FloatCmd struct { + baseCmd + + val float64 +} + +var _ Cmder = (*FloatCmd)(nil) + +func NewFloatCmd(args ...interface{}) *FloatCmd { + return &FloatCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *FloatCmd) Val() float64 { + return cmd.val +} + +func (cmd *FloatCmd) Result() (float64, error) { + return cmd.Val(), cmd.Err() +} + +func (cmd *FloatCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *FloatCmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadFloatReply() + return cmd.err +} + +//------------------------------------------------------------------------------ + +type StringSliceCmd struct { + baseCmd + + val []string +} + +var _ Cmder = (*StringSliceCmd)(nil) + +func NewStringSliceCmd(args ...interface{}) *StringSliceCmd { + return &StringSliceCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *StringSliceCmd) Val() []string { + return cmd.val +} + +func (cmd *StringSliceCmd) Result() ([]string, error) { + return cmd.Val(), cmd.Err() +} + +func (cmd *StringSliceCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *StringSliceCmd) ScanSlice(container interface{}) error { + return proto.ScanSlice(cmd.Val(), container) +} + +func (cmd *StringSliceCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadArrayReply(stringSliceParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.([]string) + return nil +} + +// Implements proto.MultiBulkParse +func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + ss := make([]string, 0, n) + for i := int64(0); i < n; i++ { + s, err := rd.ReadString() + if err == Nil { + ss = append(ss, "") + } else if err != nil { + return nil, err + } else { + ss = append(ss, s) + } + } + return ss, nil +} + +//------------------------------------------------------------------------------ + +type BoolSliceCmd struct { + baseCmd + + val []bool +} + +var _ Cmder = (*BoolSliceCmd)(nil) + +func NewBoolSliceCmd(args ...interface{}) *BoolSliceCmd { + return &BoolSliceCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *BoolSliceCmd) Val() []bool { + return cmd.val +} + +func (cmd *BoolSliceCmd) Result() ([]bool, error) { + return cmd.val, cmd.err +} + +func (cmd *BoolSliceCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *BoolSliceCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadArrayReply(boolSliceParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.([]bool) + return nil +} + +// Implements proto.MultiBulkParse +func boolSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + bools := make([]bool, 0, n) + for i := int64(0); i < n; i++ { + n, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + bools = append(bools, n == 1) + } + return bools, nil +} + +//------------------------------------------------------------------------------ + +type StringStringMapCmd struct { + baseCmd + + val map[string]string +} + +var _ Cmder = (*StringStringMapCmd)(nil) + +func NewStringStringMapCmd(args ...interface{}) *StringStringMapCmd { + return &StringStringMapCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *StringStringMapCmd) Val() map[string]string { + return cmd.val +} + +func (cmd *StringStringMapCmd) Result() (map[string]string, error) { + return cmd.val, cmd.err +} + +func (cmd *StringStringMapCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *StringStringMapCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadArrayReply(stringStringMapParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.(map[string]string) + return nil +} + +// Implements proto.MultiBulkParse +func stringStringMapParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]string, n/2) + for i := int64(0); i < n; i += 2 { + key, err := rd.ReadString() + if err != nil { + return nil, err + } + + value, err := rd.ReadString() + if err != nil { + return nil, err + } + + m[key] = value + } + return m, nil +} + +//------------------------------------------------------------------------------ + +type StringIntMapCmd struct { + baseCmd + + val map[string]int64 +} + +var _ Cmder = (*StringIntMapCmd)(nil) + +func NewStringIntMapCmd(args ...interface{}) *StringIntMapCmd { + return &StringIntMapCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *StringIntMapCmd) Val() map[string]int64 { + return cmd.val +} + +func (cmd *StringIntMapCmd) Result() (map[string]int64, error) { + return cmd.val, cmd.err +} + +func (cmd *StringIntMapCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *StringIntMapCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadArrayReply(stringIntMapParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.(map[string]int64) + return nil +} + +// Implements proto.MultiBulkParse +func stringIntMapParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]int64, n/2) + for i := int64(0); i < n; i += 2 { + key, err := rd.ReadString() + if err != nil { + return nil, err + } + + n, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + + m[key] = n + } + return m, nil +} + +//------------------------------------------------------------------------------ + +type StringStructMapCmd struct { + baseCmd + + val map[string]struct{} +} + +var _ Cmder = (*StringStructMapCmd)(nil) + +func NewStringStructMapCmd(args ...interface{}) *StringStructMapCmd { + return &StringStructMapCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *StringStructMapCmd) Val() map[string]struct{} { + return cmd.val +} + +func (cmd *StringStructMapCmd) Result() (map[string]struct{}, error) { + return cmd.val, cmd.err +} + +func (cmd *StringStructMapCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *StringStructMapCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadArrayReply(stringStructMapParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.(map[string]struct{}) + return nil +} + +// Implements proto.MultiBulkParse +func stringStructMapParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]struct{}, n) + for i := int64(0); i < n; i++ { + key, err := rd.ReadString() + if err != nil { + return nil, err + } + + m[key] = struct{}{} + } + return m, nil +} + +//------------------------------------------------------------------------------ + +type XMessage struct { + ID string + Values map[string]interface{} +} + +type XMessageSliceCmd struct { + baseCmd + + val []XMessage +} + +var _ Cmder = (*XMessageSliceCmd)(nil) + +func NewXMessageSliceCmd(args ...interface{}) *XMessageSliceCmd { + return &XMessageSliceCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *XMessageSliceCmd) Val() []XMessage { + return cmd.val +} + +func (cmd *XMessageSliceCmd) Result() ([]XMessage, error) { + return cmd.val, cmd.err +} + +func (cmd *XMessageSliceCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XMessageSliceCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadArrayReply(xMessageSliceParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.([]XMessage) + return nil +} + +// Implements proto.MultiBulkParse +func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + msgs := make([]XMessage, 0, n) + for i := int64(0); i < n; i++ { + _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + id, err := rd.ReadString() + if err != nil { + return nil, err + } + + v, err := rd.ReadArrayReply(stringInterfaceMapParser) + if err != nil { + return nil, err + } + + msgs = append(msgs, XMessage{ + ID: id, + Values: v.(map[string]interface{}), + }) + return nil, nil + }) + if err != nil { + return nil, err + } + } + return msgs, nil +} + +// Implements proto.MultiBulkParse +func stringInterfaceMapParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]interface{}, n/2) + for i := int64(0); i < n; i += 2 { + key, err := rd.ReadString() + if err != nil { + return nil, err + } + + value, err := rd.ReadString() + if err != nil { + return nil, err + } + + m[key] = value + } + return m, nil +} + +//------------------------------------------------------------------------------ + +type XStream struct { + Stream string + Messages []XMessage +} + +type XStreamSliceCmd struct { + baseCmd + + val []XStream +} + +var _ Cmder = (*XStreamSliceCmd)(nil) + +func NewXStreamSliceCmd(args ...interface{}) *XStreamSliceCmd { + return &XStreamSliceCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *XStreamSliceCmd) Val() []XStream { + return cmd.val +} + +func (cmd *XStreamSliceCmd) Result() ([]XStream, error) { + return cmd.val, cmd.err +} + +func (cmd *XStreamSliceCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XStreamSliceCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadArrayReply(xStreamSliceParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.([]XStream) + return nil +} + +// Implements proto.MultiBulkParse +func xStreamSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + ret := make([]XStream, 0, n) + for i := int64(0); i < n; i++ { + _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + if n != 2 { + return nil, fmt.Errorf("got %d, wanted 2", n) + } + + stream, err := rd.ReadString() + if err != nil { + return nil, err + } + + v, err := rd.ReadArrayReply(xMessageSliceParser) + if err != nil { + return nil, err + } + + ret = append(ret, XStream{ + Stream: stream, + Messages: v.([]XMessage), + }) + return nil, nil + }) + if err != nil { + return nil, err + } + } + return ret, nil +} + +//------------------------------------------------------------------------------ + +type XPending struct { + Count int64 + Lower string + Higher string + Consumers map[string]int64 +} + +type XPendingCmd struct { + baseCmd + val *XPending +} + +var _ Cmder = (*XPendingCmd)(nil) + +func NewXPendingCmd(args ...interface{}) *XPendingCmd { + return &XPendingCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *XPendingCmd) Val() *XPending { + return cmd.val +} + +func (cmd *XPendingCmd) Result() (*XPending, error) { + return cmd.val, cmd.err +} + +func (cmd *XPendingCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XPendingCmd) readReply(rd *proto.Reader) error { + var info interface{} + info, cmd.err = rd.ReadArrayReply(xPendingParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = info.(*XPending) + return nil +} + +func xPendingParser(rd *proto.Reader, n int64) (interface{}, error) { + if n != 4 { + return nil, fmt.Errorf("got %d, wanted 4", n) + } + + count, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + + lower, err := rd.ReadString() + if err != nil && err != Nil { + return nil, err + } + + higher, err := rd.ReadString() + if err != nil && err != Nil { + return nil, err + } + + pending := &XPending{ + Count: count, + Lower: lower, + Higher: higher, + } + _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + for i := int64(0); i < n; i++ { + _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + if n != 2 { + return nil, fmt.Errorf("got %d, wanted 2", n) + } + + consumerName, err := rd.ReadString() + if err != nil { + return nil, err + } + + consumerPending, err := rd.ReadInt() + if err != nil { + return nil, err + } + + if pending.Consumers == nil { + pending.Consumers = make(map[string]int64) + } + pending.Consumers[consumerName] = consumerPending + + return nil, nil + }) + if err != nil { + return nil, err + } + } + return nil, nil + }) + if err != nil && err != Nil { + return nil, err + } + + return pending, nil +} + +//------------------------------------------------------------------------------ + +type XPendingExt struct { + Id string + Consumer string + Idle time.Duration + RetryCount int64 +} + +type XPendingExtCmd struct { + baseCmd + val []XPendingExt +} + +var _ Cmder = (*XPendingExtCmd)(nil) + +func NewXPendingExtCmd(args ...interface{}) *XPendingExtCmd { + return &XPendingExtCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *XPendingExtCmd) Val() []XPendingExt { + return cmd.val +} + +func (cmd *XPendingExtCmd) Result() ([]XPendingExt, error) { + return cmd.val, cmd.err +} + +func (cmd *XPendingExtCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XPendingExtCmd) readReply(rd *proto.Reader) error { + var info interface{} + info, cmd.err = rd.ReadArrayReply(xPendingExtSliceParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = info.([]XPendingExt) + return nil +} + +func xPendingExtSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + ret := make([]XPendingExt, 0, n) + for i := int64(0); i < n; i++ { + _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + if n != 4 { + return nil, fmt.Errorf("got %d, wanted 4", n) + } + + id, err := rd.ReadString() + if err != nil { + return nil, err + } + + consumer, err := rd.ReadString() + if err != nil && err != Nil { + return nil, err + } + + idle, err := rd.ReadIntReply() + if err != nil && err != Nil { + return nil, err + } + + retryCount, err := rd.ReadIntReply() + if err != nil && err != Nil { + return nil, err + } + + ret = append(ret, XPendingExt{ + Id: id, + Consumer: consumer, + Idle: time.Duration(idle) * time.Millisecond, + RetryCount: retryCount, + }) + return nil, nil + }) + if err != nil { + return nil, err + } + } + return ret, nil +} + +//------------------------------------------------------------------------------ + +//------------------------------------------------------------------------------ + +type ZSliceCmd struct { + baseCmd + + val []Z +} + +var _ Cmder = (*ZSliceCmd)(nil) + +func NewZSliceCmd(args ...interface{}) *ZSliceCmd { + return &ZSliceCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *ZSliceCmd) Val() []Z { + return cmd.val +} + +func (cmd *ZSliceCmd) Result() ([]Z, error) { + return cmd.val, cmd.err +} + +func (cmd *ZSliceCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *ZSliceCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadArrayReply(zSliceParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.([]Z) + return nil +} + +// Implements proto.MultiBulkParse +func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + zz := make([]Z, n/2) + for i := int64(0); i < n; i += 2 { + var err error + + z := &zz[i/2] + + z.Member, err = rd.ReadString() + if err != nil { + return nil, err + } + + z.Score, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + } + return zz, nil +} + +//------------------------------------------------------------------------------ + +type ZWithKeyCmd struct { + baseCmd + + val ZWithKey +} + +var _ Cmder = (*ZWithKeyCmd)(nil) + +func NewZWithKeyCmd(args ...interface{}) *ZWithKeyCmd { + return &ZWithKeyCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *ZWithKeyCmd) Val() ZWithKey { + return cmd.val +} + +func (cmd *ZWithKeyCmd) Result() (ZWithKey, error) { + return cmd.Val(), cmd.Err() +} + +func (cmd *ZWithKeyCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *ZWithKeyCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadArrayReply(zWithKeyParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.(ZWithKey) + return nil +} + +// Implements proto.MultiBulkParse +func zWithKeyParser(rd *proto.Reader, n int64) (interface{}, error) { + if n != 3 { + return nil, fmt.Errorf("got %d elements, expected 3", n) + } + + var z ZWithKey + var err error + + z.Key, err = rd.ReadString() + if err != nil { + return nil, err + } + z.Member, err = rd.ReadString() + if err != nil { + return nil, err + } + z.Score, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + return z, nil +} + +//------------------------------------------------------------------------------ + +type ScanCmd struct { + baseCmd + + page []string + cursor uint64 + + process func(cmd Cmder) error +} + +var _ Cmder = (*ScanCmd)(nil) + +func NewScanCmd(process func(cmd Cmder) error, args ...interface{}) *ScanCmd { + return &ScanCmd{ + baseCmd: baseCmd{_args: args}, + process: process, + } +} + +func (cmd *ScanCmd) Val() (keys []string, cursor uint64) { + return cmd.page, cmd.cursor +} + +func (cmd *ScanCmd) Result() (keys []string, cursor uint64, err error) { + return cmd.page, cmd.cursor, cmd.err +} + +func (cmd *ScanCmd) String() string { + return cmdString(cmd, cmd.page) +} + +func (cmd *ScanCmd) readReply(rd *proto.Reader) error { + cmd.page, cmd.cursor, cmd.err = rd.ReadScanReply() + return cmd.err +} + +// Iterator creates a new ScanIterator. +func (cmd *ScanCmd) Iterator() *ScanIterator { + return &ScanIterator{ + cmd: cmd, + } +} + +//------------------------------------------------------------------------------ + +type ClusterNode struct { + Id string + Addr string +} + +type ClusterSlot struct { + Start int + End int + Nodes []ClusterNode +} + +type ClusterSlotsCmd struct { + baseCmd + + val []ClusterSlot +} + +var _ Cmder = (*ClusterSlotsCmd)(nil) + +func NewClusterSlotsCmd(args ...interface{}) *ClusterSlotsCmd { + return &ClusterSlotsCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *ClusterSlotsCmd) Val() []ClusterSlot { + return cmd.val +} + +func (cmd *ClusterSlotsCmd) Result() ([]ClusterSlot, error) { + return cmd.Val(), cmd.Err() +} + +func (cmd *ClusterSlotsCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *ClusterSlotsCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadArrayReply(clusterSlotsParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.([]ClusterSlot) + return nil +} + +// Implements proto.MultiBulkParse +func clusterSlotsParser(rd *proto.Reader, n int64) (interface{}, error) { + slots := make([]ClusterSlot, n) + for i := 0; i < len(slots); i++ { + n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if n < 2 { + err := fmt.Errorf("redis: got %d elements in cluster info, expected at least 2", n) + return nil, err + } + + start, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + + end, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + + nodes := make([]ClusterNode, n-2) + for j := 0; j < len(nodes); j++ { + n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if n != 2 && n != 3 { + err := fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", n) + return nil, err + } + + ip, err := rd.ReadString() + if err != nil { + return nil, err + } + + port, err := rd.ReadString() + if err != nil { + return nil, err + } + + nodes[j].Addr = net.JoinHostPort(ip, port) + + if n == 3 { + id, err := rd.ReadString() + if err != nil { + return nil, err + } + nodes[j].Id = id + } + } + + slots[i] = ClusterSlot{ + Start: int(start), + End: int(end), + Nodes: nodes, + } + } + return slots, nil +} + +//------------------------------------------------------------------------------ + +// GeoLocation is used with GeoAdd to add geospatial location. +type GeoLocation struct { + Name string + Longitude, Latitude, Dist float64 + GeoHash int64 +} + +// GeoRadiusQuery is used with GeoRadius to query geospatial index. +type GeoRadiusQuery struct { + Radius float64 + // Can be m, km, ft, or mi. Default is km. + Unit string + WithCoord bool + WithDist bool + WithGeoHash bool + Count int + // Can be ASC or DESC. Default is no sort order. + Sort string + Store string + StoreDist string +} + +type GeoLocationCmd struct { + baseCmd + + q *GeoRadiusQuery + locations []GeoLocation +} + +var _ Cmder = (*GeoLocationCmd)(nil) + +func NewGeoLocationCmd(q *GeoRadiusQuery, args ...interface{}) *GeoLocationCmd { + args = append(args, q.Radius) + if q.Unit != "" { + args = append(args, q.Unit) + } else { + args = append(args, "km") + } + if q.WithCoord { + args = append(args, "withcoord") + } + if q.WithDist { + args = append(args, "withdist") + } + if q.WithGeoHash { + args = append(args, "withhash") + } + if q.Count > 0 { + args = append(args, "count", q.Count) + } + if q.Sort != "" { + args = append(args, q.Sort) + } + if q.Store != "" { + args = append(args, "store") + args = append(args, q.Store) + } + if q.StoreDist != "" { + args = append(args, "storedist") + args = append(args, q.StoreDist) + } + return &GeoLocationCmd{ + baseCmd: baseCmd{_args: args}, + q: q, + } +} + +func (cmd *GeoLocationCmd) Val() []GeoLocation { + return cmd.locations +} + +func (cmd *GeoLocationCmd) Result() ([]GeoLocation, error) { + return cmd.locations, cmd.err +} + +func (cmd *GeoLocationCmd) String() string { + return cmdString(cmd, cmd.locations) +} + +func (cmd *GeoLocationCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q)) + if cmd.err != nil { + return cmd.err + } + cmd.locations = v.([]GeoLocation) + return nil +} + +func newGeoLocationParser(q *GeoRadiusQuery) proto.MultiBulkParse { + return func(rd *proto.Reader, n int64) (interface{}, error) { + var loc GeoLocation + var err error + + loc.Name, err = rd.ReadString() + if err != nil { + return nil, err + } + if q.WithDist { + loc.Dist, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + } + if q.WithGeoHash { + loc.GeoHash, err = rd.ReadIntReply() + if err != nil { + return nil, err + } + } + if q.WithCoord { + n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if n != 2 { + return nil, fmt.Errorf("got %d coordinates, expected 2", n) + } + + loc.Longitude, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + loc.Latitude, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + } + + return &loc, nil + } +} + +func newGeoLocationSliceParser(q *GeoRadiusQuery) proto.MultiBulkParse { + return func(rd *proto.Reader, n int64) (interface{}, error) { + locs := make([]GeoLocation, 0, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadReply(newGeoLocationParser(q)) + if err != nil { + return nil, err + } + switch vv := v.(type) { + case string: + locs = append(locs, GeoLocation{ + Name: vv, + }) + case *GeoLocation: + locs = append(locs, *vv) + default: + return nil, fmt.Errorf("got %T, expected string or *GeoLocation", v) + } + } + return locs, nil + } +} + +//------------------------------------------------------------------------------ + +type GeoPos struct { + Longitude, Latitude float64 +} + +type GeoPosCmd struct { + baseCmd + + positions []*GeoPos +} + +var _ Cmder = (*GeoPosCmd)(nil) + +func NewGeoPosCmd(args ...interface{}) *GeoPosCmd { + return &GeoPosCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *GeoPosCmd) Val() []*GeoPos { + return cmd.positions +} + +func (cmd *GeoPosCmd) Result() ([]*GeoPos, error) { + return cmd.Val(), cmd.Err() +} + +func (cmd *GeoPosCmd) String() string { + return cmdString(cmd, cmd.positions) +} + +func (cmd *GeoPosCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadArrayReply(geoPosSliceParser) + if cmd.err != nil { + return cmd.err + } + cmd.positions = v.([]*GeoPos) + return nil +} + +func geoPosSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + positions := make([]*GeoPos, 0, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadReply(geoPosParser) + if err != nil { + if err == Nil { + positions = append(positions, nil) + continue + } + return nil, err + } + switch v := v.(type) { + case *GeoPos: + positions = append(positions, v) + default: + return nil, fmt.Errorf("got %T, expected *GeoPos", v) + } + } + return positions, nil +} + +func geoPosParser(rd *proto.Reader, n int64) (interface{}, error) { + var pos GeoPos + var err error + + pos.Longitude, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + + pos.Latitude, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + + return &pos, nil +} + +//------------------------------------------------------------------------------ + +type CommandInfo struct { + Name string + Arity int8 + Flags []string + FirstKeyPos int8 + LastKeyPos int8 + StepCount int8 + ReadOnly bool +} + +type CommandsInfoCmd struct { + baseCmd + + val map[string]*CommandInfo +} + +var _ Cmder = (*CommandsInfoCmd)(nil) + +func NewCommandsInfoCmd(args ...interface{}) *CommandsInfoCmd { + return &CommandsInfoCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *CommandsInfoCmd) Val() map[string]*CommandInfo { + return cmd.val +} + +func (cmd *CommandsInfoCmd) Result() (map[string]*CommandInfo, error) { + return cmd.Val(), cmd.Err() +} + +func (cmd *CommandsInfoCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadArrayReply(commandInfoSliceParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.(map[string]*CommandInfo) + return nil +} + +// Implements proto.MultiBulkParse +func commandInfoSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]*CommandInfo, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadReply(commandInfoParser) + if err != nil { + return nil, err + } + vv := v.(*CommandInfo) + m[vv.Name] = vv + + } + return m, nil +} + +func commandInfoParser(rd *proto.Reader, n int64) (interface{}, error) { + var cmd CommandInfo + var err error + + if n != 6 { + return nil, fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6", n) + } + + cmd.Name, err = rd.ReadString() + if err != nil { + return nil, err + } + + arity, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + cmd.Arity = int8(arity) + + flags, err := rd.ReadReply(stringSliceParser) + if err != nil { + return nil, err + } + cmd.Flags = flags.([]string) + + firstKeyPos, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + cmd.FirstKeyPos = int8(firstKeyPos) + + lastKeyPos, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + cmd.LastKeyPos = int8(lastKeyPos) + + stepCount, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + cmd.StepCount = int8(stepCount) + + for _, flag := range cmd.Flags { + if flag == "readonly" { + cmd.ReadOnly = true + break + } + } + + return &cmd, nil +} + +//------------------------------------------------------------------------------ + +type cmdsInfoCache struct { + fn func() (map[string]*CommandInfo, error) + + once internal.Once + cmds map[string]*CommandInfo +} + +func newCmdsInfoCache(fn func() (map[string]*CommandInfo, error)) *cmdsInfoCache { + return &cmdsInfoCache{ + fn: fn, + } +} + +func (c *cmdsInfoCache) Get() (map[string]*CommandInfo, error) { + err := c.once.Do(func() error { + cmds, err := c.fn() + if err != nil { + return err + } + c.cmds = cmds + return nil + }) + return c.cmds, err +} diff --git a/vendor/github.com/go-redis/redis/commands.go b/vendor/github.com/go-redis/redis/commands.go new file mode 100644 index 0000000000..653e4abe96 --- /dev/null +++ b/vendor/github.com/go-redis/redis/commands.go @@ -0,0 +1,2583 @@ +package redis + +import ( + "errors" + "io" + "time" + + "github.com/go-redis/redis/internal" +) + +func usePrecise(dur time.Duration) bool { + return dur < time.Second || dur%time.Second != 0 +} + +func formatMs(dur time.Duration) int64 { + if dur > 0 && dur < time.Millisecond { + internal.Logf( + "specified duration is %s, but minimal supported value is %s", + dur, time.Millisecond, + ) + } + return int64(dur / time.Millisecond) +} + +func formatSec(dur time.Duration) int64 { + if dur > 0 && dur < time.Second { + internal.Logf( + "specified duration is %s, but minimal supported value is %s", + dur, time.Second, + ) + } + return int64(dur / time.Second) +} + +func appendArgs(dst, src []interface{}) []interface{} { + if len(src) == 1 { + if ss, ok := src[0].([]string); ok { + for _, s := range ss { + dst = append(dst, s) + } + return dst + } + } + + for _, v := range src { + dst = append(dst, v) + } + return dst +} + +type Cmdable interface { + Pipeline() Pipeliner + Pipelined(fn func(Pipeliner) error) ([]Cmder, error) + + TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) + TxPipeline() Pipeliner + + Command() *CommandsInfoCmd + ClientGetName() *StringCmd + Echo(message interface{}) *StringCmd + Ping() *StatusCmd + Quit() *StatusCmd + Del(keys ...string) *IntCmd + Unlink(keys ...string) *IntCmd + Dump(key string) *StringCmd + Exists(keys ...string) *IntCmd + Expire(key string, expiration time.Duration) *BoolCmd + ExpireAt(key string, tm time.Time) *BoolCmd + Keys(pattern string) *StringSliceCmd + Migrate(host, port, key string, db int64, timeout time.Duration) *StatusCmd + Move(key string, db int64) *BoolCmd + ObjectRefCount(key string) *IntCmd + ObjectEncoding(key string) *StringCmd + ObjectIdleTime(key string) *DurationCmd + Persist(key string) *BoolCmd + PExpire(key string, expiration time.Duration) *BoolCmd + PExpireAt(key string, tm time.Time) *BoolCmd + PTTL(key string) *DurationCmd + RandomKey() *StringCmd + Rename(key, newkey string) *StatusCmd + RenameNX(key, newkey string) *BoolCmd + Restore(key string, ttl time.Duration, value string) *StatusCmd + RestoreReplace(key string, ttl time.Duration, value string) *StatusCmd + Sort(key string, sort *Sort) *StringSliceCmd + SortStore(key, store string, sort *Sort) *IntCmd + SortInterfaces(key string, sort *Sort) *SliceCmd + Touch(keys ...string) *IntCmd + TTL(key string) *DurationCmd + Type(key string) *StatusCmd + Scan(cursor uint64, match string, count int64) *ScanCmd + SScan(key string, cursor uint64, match string, count int64) *ScanCmd + HScan(key string, cursor uint64, match string, count int64) *ScanCmd + ZScan(key string, cursor uint64, match string, count int64) *ScanCmd + Append(key, value string) *IntCmd + BitCount(key string, bitCount *BitCount) *IntCmd + BitOpAnd(destKey string, keys ...string) *IntCmd + BitOpOr(destKey string, keys ...string) *IntCmd + BitOpXor(destKey string, keys ...string) *IntCmd + BitOpNot(destKey string, key string) *IntCmd + BitPos(key string, bit int64, pos ...int64) *IntCmd + Decr(key string) *IntCmd + DecrBy(key string, decrement int64) *IntCmd + Get(key string) *StringCmd + GetBit(key string, offset int64) *IntCmd + GetRange(key string, start, end int64) *StringCmd + GetSet(key string, value interface{}) *StringCmd + Incr(key string) *IntCmd + IncrBy(key string, value int64) *IntCmd + IncrByFloat(key string, value float64) *FloatCmd + MGet(keys ...string) *SliceCmd + MSet(pairs ...interface{}) *StatusCmd + MSetNX(pairs ...interface{}) *BoolCmd + Set(key string, value interface{}, expiration time.Duration) *StatusCmd + SetBit(key string, offset int64, value int) *IntCmd + SetNX(key string, value interface{}, expiration time.Duration) *BoolCmd + SetXX(key string, value interface{}, expiration time.Duration) *BoolCmd + SetRange(key string, offset int64, value string) *IntCmd + StrLen(key string) *IntCmd + HDel(key string, fields ...string) *IntCmd + HExists(key, field string) *BoolCmd + HGet(key, field string) *StringCmd + HGetAll(key string) *StringStringMapCmd + HIncrBy(key, field string, incr int64) *IntCmd + HIncrByFloat(key, field string, incr float64) *FloatCmd + HKeys(key string) *StringSliceCmd + HLen(key string) *IntCmd + HMGet(key string, fields ...string) *SliceCmd + HMSet(key string, fields map[string]interface{}) *StatusCmd + HSet(key, field string, value interface{}) *BoolCmd + HSetNX(key, field string, value interface{}) *BoolCmd + HVals(key string) *StringSliceCmd + BLPop(timeout time.Duration, keys ...string) *StringSliceCmd + BRPop(timeout time.Duration, keys ...string) *StringSliceCmd + BRPopLPush(source, destination string, timeout time.Duration) *StringCmd + LIndex(key string, index int64) *StringCmd + LInsert(key, op string, pivot, value interface{}) *IntCmd + LInsertBefore(key string, pivot, value interface{}) *IntCmd + LInsertAfter(key string, pivot, value interface{}) *IntCmd + LLen(key string) *IntCmd + LPop(key string) *StringCmd + LPush(key string, values ...interface{}) *IntCmd + LPushX(key string, value interface{}) *IntCmd + LRange(key string, start, stop int64) *StringSliceCmd + LRem(key string, count int64, value interface{}) *IntCmd + LSet(key string, index int64, value interface{}) *StatusCmd + LTrim(key string, start, stop int64) *StatusCmd + RPop(key string) *StringCmd + RPopLPush(source, destination string) *StringCmd + RPush(key string, values ...interface{}) *IntCmd + RPushX(key string, value interface{}) *IntCmd + SAdd(key string, members ...interface{}) *IntCmd + SCard(key string) *IntCmd + SDiff(keys ...string) *StringSliceCmd + SDiffStore(destination string, keys ...string) *IntCmd + SInter(keys ...string) *StringSliceCmd + SInterStore(destination string, keys ...string) *IntCmd + SIsMember(key string, member interface{}) *BoolCmd + SMembers(key string) *StringSliceCmd + SMembersMap(key string) *StringStructMapCmd + SMove(source, destination string, member interface{}) *BoolCmd + SPop(key string) *StringCmd + SPopN(key string, count int64) *StringSliceCmd + SRandMember(key string) *StringCmd + SRandMemberN(key string, count int64) *StringSliceCmd + SRem(key string, members ...interface{}) *IntCmd + SUnion(keys ...string) *StringSliceCmd + SUnionStore(destination string, keys ...string) *IntCmd + XAdd(a *XAddArgs) *StringCmd + XDel(stream string, ids ...string) *IntCmd + XLen(stream string) *IntCmd + XRange(stream, start, stop string) *XMessageSliceCmd + XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd + XRevRange(stream string, start, stop string) *XMessageSliceCmd + XRevRangeN(stream string, start, stop string, count int64) *XMessageSliceCmd + XRead(a *XReadArgs) *XStreamSliceCmd + XReadStreams(streams ...string) *XStreamSliceCmd + XGroupCreate(stream, group, start string) *StatusCmd + XGroupCreateMkStream(stream, group, start string) *StatusCmd + XGroupSetID(stream, group, start string) *StatusCmd + XGroupDestroy(stream, group string) *IntCmd + XGroupDelConsumer(stream, group, consumer string) *IntCmd + XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd + XAck(stream, group string, ids ...string) *IntCmd + XPending(stream, group string) *XPendingCmd + XPendingExt(a *XPendingExtArgs) *XPendingExtCmd + XClaim(a *XClaimArgs) *XMessageSliceCmd + XClaimJustID(a *XClaimArgs) *StringSliceCmd + XTrim(key string, maxLen int64) *IntCmd + XTrimApprox(key string, maxLen int64) *IntCmd + BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd + BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd + ZAdd(key string, members ...Z) *IntCmd + ZAddNX(key string, members ...Z) *IntCmd + ZAddXX(key string, members ...Z) *IntCmd + ZAddCh(key string, members ...Z) *IntCmd + ZAddNXCh(key string, members ...Z) *IntCmd + ZAddXXCh(key string, members ...Z) *IntCmd + ZIncr(key string, member Z) *FloatCmd + ZIncrNX(key string, member Z) *FloatCmd + ZIncrXX(key string, member Z) *FloatCmd + ZCard(key string) *IntCmd + ZCount(key, min, max string) *IntCmd + ZLexCount(key, min, max string) *IntCmd + ZIncrBy(key string, increment float64, member string) *FloatCmd + ZInterStore(destination string, store ZStore, keys ...string) *IntCmd + ZPopMax(key string, count ...int64) *ZSliceCmd + ZPopMin(key string, count ...int64) *ZSliceCmd + ZRange(key string, start, stop int64) *StringSliceCmd + ZRangeWithScores(key string, start, stop int64) *ZSliceCmd + ZRangeByScore(key string, opt ZRangeBy) *StringSliceCmd + ZRangeByLex(key string, opt ZRangeBy) *StringSliceCmd + ZRangeByScoreWithScores(key string, opt ZRangeBy) *ZSliceCmd + ZRank(key, member string) *IntCmd + ZRem(key string, members ...interface{}) *IntCmd + ZRemRangeByRank(key string, start, stop int64) *IntCmd + ZRemRangeByScore(key, min, max string) *IntCmd + ZRemRangeByLex(key, min, max string) *IntCmd + ZRevRange(key string, start, stop int64) *StringSliceCmd + ZRevRangeWithScores(key string, start, stop int64) *ZSliceCmd + ZRevRangeByScore(key string, opt ZRangeBy) *StringSliceCmd + ZRevRangeByLex(key string, opt ZRangeBy) *StringSliceCmd + ZRevRangeByScoreWithScores(key string, opt ZRangeBy) *ZSliceCmd + ZRevRank(key, member string) *IntCmd + ZScore(key, member string) *FloatCmd + ZUnionStore(dest string, store ZStore, keys ...string) *IntCmd + PFAdd(key string, els ...interface{}) *IntCmd + PFCount(keys ...string) *IntCmd + PFMerge(dest string, keys ...string) *StatusCmd + BgRewriteAOF() *StatusCmd + BgSave() *StatusCmd + ClientKill(ipPort string) *StatusCmd + ClientKillByFilter(keys ...string) *IntCmd + ClientList() *StringCmd + ClientPause(dur time.Duration) *BoolCmd + ClientID() *IntCmd + ConfigGet(parameter string) *SliceCmd + ConfigResetStat() *StatusCmd + ConfigSet(parameter, value string) *StatusCmd + ConfigRewrite() *StatusCmd + DBSize() *IntCmd + FlushAll() *StatusCmd + FlushAllAsync() *StatusCmd + FlushDB() *StatusCmd + FlushDBAsync() *StatusCmd + Info(section ...string) *StringCmd + LastSave() *IntCmd + Save() *StatusCmd + Shutdown() *StatusCmd + ShutdownSave() *StatusCmd + ShutdownNoSave() *StatusCmd + SlaveOf(host, port string) *StatusCmd + Time() *TimeCmd + Eval(script string, keys []string, args ...interface{}) *Cmd + EvalSha(sha1 string, keys []string, args ...interface{}) *Cmd + ScriptExists(hashes ...string) *BoolSliceCmd + ScriptFlush() *StatusCmd + ScriptKill() *StatusCmd + ScriptLoad(script string) *StringCmd + DebugObject(key string) *StringCmd + Publish(channel string, message interface{}) *IntCmd + PubSubChannels(pattern string) *StringSliceCmd + PubSubNumSub(channels ...string) *StringIntMapCmd + PubSubNumPat() *IntCmd + ClusterSlots() *ClusterSlotsCmd + ClusterNodes() *StringCmd + ClusterMeet(host, port string) *StatusCmd + ClusterForget(nodeID string) *StatusCmd + ClusterReplicate(nodeID string) *StatusCmd + ClusterResetSoft() *StatusCmd + ClusterResetHard() *StatusCmd + ClusterInfo() *StringCmd + ClusterKeySlot(key string) *IntCmd + ClusterGetKeysInSlot(slot int, count int) *StringSliceCmd + ClusterCountFailureReports(nodeID string) *IntCmd + ClusterCountKeysInSlot(slot int) *IntCmd + ClusterDelSlots(slots ...int) *StatusCmd + ClusterDelSlotsRange(min, max int) *StatusCmd + ClusterSaveConfig() *StatusCmd + ClusterSlaves(nodeID string) *StringSliceCmd + ClusterFailover() *StatusCmd + ClusterAddSlots(slots ...int) *StatusCmd + ClusterAddSlotsRange(min, max int) *StatusCmd + GeoAdd(key string, geoLocation ...*GeoLocation) *IntCmd + GeoPos(key string, members ...string) *GeoPosCmd + GeoRadius(key string, longitude, latitude float64, query *GeoRadiusQuery) *GeoLocationCmd + GeoRadiusRO(key string, longitude, latitude float64, query *GeoRadiusQuery) *GeoLocationCmd + GeoRadiusByMember(key, member string, query *GeoRadiusQuery) *GeoLocationCmd + GeoRadiusByMemberRO(key, member string, query *GeoRadiusQuery) *GeoLocationCmd + GeoDist(key string, member1, member2, unit string) *FloatCmd + GeoHash(key string, members ...string) *StringSliceCmd + ReadOnly() *StatusCmd + ReadWrite() *StatusCmd + MemoryUsage(key string, samples ...int) *IntCmd +} + +type StatefulCmdable interface { + Cmdable + Auth(password string) *StatusCmd + Select(index int) *StatusCmd + SwapDB(index1, index2 int) *StatusCmd + ClientSetName(name string) *BoolCmd +} + +var _ Cmdable = (*Client)(nil) +var _ Cmdable = (*Tx)(nil) +var _ Cmdable = (*Ring)(nil) +var _ Cmdable = (*ClusterClient)(nil) + +type cmdable struct { + process func(cmd Cmder) error +} + +func (c *cmdable) setProcessor(fn func(Cmder) error) { + c.process = fn +} + +type statefulCmdable struct { + cmdable + process func(cmd Cmder) error +} + +func (c *statefulCmdable) setProcessor(fn func(Cmder) error) { + c.process = fn + c.cmdable.setProcessor(fn) +} + +//------------------------------------------------------------------------------ + +func (c *statefulCmdable) Auth(password string) *StatusCmd { + cmd := NewStatusCmd("auth", password) + c.process(cmd) + return cmd +} + +func (c *cmdable) Echo(message interface{}) *StringCmd { + cmd := NewStringCmd("echo", message) + c.process(cmd) + return cmd +} + +func (c *cmdable) Ping() *StatusCmd { + cmd := NewStatusCmd("ping") + c.process(cmd) + return cmd +} + +func (c *cmdable) Wait(numSlaves int, timeout time.Duration) *IntCmd { + cmd := NewIntCmd("wait", numSlaves, int(timeout/time.Millisecond)) + c.process(cmd) + return cmd +} + +func (c *cmdable) Quit() *StatusCmd { + panic("not implemented") +} + +func (c *statefulCmdable) Select(index int) *StatusCmd { + cmd := NewStatusCmd("select", index) + c.process(cmd) + return cmd +} + +func (c *statefulCmdable) SwapDB(index1, index2 int) *StatusCmd { + cmd := NewStatusCmd("swapdb", index1, index2) + c.process(cmd) + return cmd +} + +//------------------------------------------------------------------------------ + +func (c *cmdable) Command() *CommandsInfoCmd { + cmd := NewCommandsInfoCmd("command") + c.process(cmd) + return cmd +} + +func (c *cmdable) Del(keys ...string) *IntCmd { + args := make([]interface{}, 1+len(keys)) + args[0] = "del" + for i, key := range keys { + args[1+i] = key + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) Unlink(keys ...string) *IntCmd { + args := make([]interface{}, 1+len(keys)) + args[0] = "unlink" + for i, key := range keys { + args[1+i] = key + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) Dump(key string) *StringCmd { + cmd := NewStringCmd("dump", key) + c.process(cmd) + return cmd +} + +func (c *cmdable) Exists(keys ...string) *IntCmd { + args := make([]interface{}, 1+len(keys)) + args[0] = "exists" + for i, key := range keys { + args[1+i] = key + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) Expire(key string, expiration time.Duration) *BoolCmd { + cmd := NewBoolCmd("expire", key, formatSec(expiration)) + c.process(cmd) + return cmd +} + +func (c *cmdable) ExpireAt(key string, tm time.Time) *BoolCmd { + cmd := NewBoolCmd("expireat", key, tm.Unix()) + c.process(cmd) + return cmd +} + +func (c *cmdable) Keys(pattern string) *StringSliceCmd { + cmd := NewStringSliceCmd("keys", pattern) + c.process(cmd) + return cmd +} + +func (c *cmdable) Migrate(host, port, key string, db int64, timeout time.Duration) *StatusCmd { + cmd := NewStatusCmd( + "migrate", + host, + port, + key, + db, + formatMs(timeout), + ) + cmd.setReadTimeout(timeout) + c.process(cmd) + return cmd +} + +func (c *cmdable) Move(key string, db int64) *BoolCmd { + cmd := NewBoolCmd("move", key, db) + c.process(cmd) + return cmd +} + +func (c *cmdable) ObjectRefCount(key string) *IntCmd { + cmd := NewIntCmd("object", "refcount", key) + c.process(cmd) + return cmd +} + +func (c *cmdable) ObjectEncoding(key string) *StringCmd { + cmd := NewStringCmd("object", "encoding", key) + c.process(cmd) + return cmd +} + +func (c *cmdable) ObjectIdleTime(key string) *DurationCmd { + cmd := NewDurationCmd(time.Second, "object", "idletime", key) + c.process(cmd) + return cmd +} + +func (c *cmdable) Persist(key string) *BoolCmd { + cmd := NewBoolCmd("persist", key) + c.process(cmd) + return cmd +} + +func (c *cmdable) PExpire(key string, expiration time.Duration) *BoolCmd { + cmd := NewBoolCmd("pexpire", key, formatMs(expiration)) + c.process(cmd) + return cmd +} + +func (c *cmdable) PExpireAt(key string, tm time.Time) *BoolCmd { + cmd := NewBoolCmd( + "pexpireat", + key, + tm.UnixNano()/int64(time.Millisecond), + ) + c.process(cmd) + return cmd +} + +func (c *cmdable) PTTL(key string) *DurationCmd { + cmd := NewDurationCmd(time.Millisecond, "pttl", key) + c.process(cmd) + return cmd +} + +func (c *cmdable) RandomKey() *StringCmd { + cmd := NewStringCmd("randomkey") + c.process(cmd) + return cmd +} + +func (c *cmdable) Rename(key, newkey string) *StatusCmd { + cmd := NewStatusCmd("rename", key, newkey) + c.process(cmd) + return cmd +} + +func (c *cmdable) RenameNX(key, newkey string) *BoolCmd { + cmd := NewBoolCmd("renamenx", key, newkey) + c.process(cmd) + return cmd +} + +func (c *cmdable) Restore(key string, ttl time.Duration, value string) *StatusCmd { + cmd := NewStatusCmd( + "restore", + key, + formatMs(ttl), + value, + ) + c.process(cmd) + return cmd +} + +func (c *cmdable) RestoreReplace(key string, ttl time.Duration, value string) *StatusCmd { + cmd := NewStatusCmd( + "restore", + key, + formatMs(ttl), + value, + "replace", + ) + c.process(cmd) + return cmd +} + +type Sort struct { + By string + Offset, Count int64 + Get []string + Order string + Alpha bool +} + +func (sort *Sort) args(key string) []interface{} { + args := []interface{}{"sort", key} + if sort.By != "" { + args = append(args, "by", sort.By) + } + if sort.Offset != 0 || sort.Count != 0 { + args = append(args, "limit", sort.Offset, sort.Count) + } + for _, get := range sort.Get { + args = append(args, "get", get) + } + if sort.Order != "" { + args = append(args, sort.Order) + } + if sort.Alpha { + args = append(args, "alpha") + } + return args +} + +func (c *cmdable) Sort(key string, sort *Sort) *StringSliceCmd { + cmd := NewStringSliceCmd(sort.args(key)...) + c.process(cmd) + return cmd +} + +func (c *cmdable) SortStore(key, store string, sort *Sort) *IntCmd { + args := sort.args(key) + if store != "" { + args = append(args, "store", store) + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) SortInterfaces(key string, sort *Sort) *SliceCmd { + cmd := NewSliceCmd(sort.args(key)...) + c.process(cmd) + return cmd +} + +func (c *cmdable) Touch(keys ...string) *IntCmd { + args := make([]interface{}, len(keys)+1) + args[0] = "touch" + for i, key := range keys { + args[i+1] = key + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) TTL(key string) *DurationCmd { + cmd := NewDurationCmd(time.Second, "ttl", key) + c.process(cmd) + return cmd +} + +func (c *cmdable) Type(key string) *StatusCmd { + cmd := NewStatusCmd("type", key) + c.process(cmd) + return cmd +} + +func (c *cmdable) Scan(cursor uint64, match string, count int64) *ScanCmd { + args := []interface{}{"scan", cursor} + if match != "" { + args = append(args, "match", match) + } + if count > 0 { + args = append(args, "count", count) + } + cmd := NewScanCmd(c.process, args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) SScan(key string, cursor uint64, match string, count int64) *ScanCmd { + args := []interface{}{"sscan", key, cursor} + if match != "" { + args = append(args, "match", match) + } + if count > 0 { + args = append(args, "count", count) + } + cmd := NewScanCmd(c.process, args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) HScan(key string, cursor uint64, match string, count int64) *ScanCmd { + args := []interface{}{"hscan", key, cursor} + if match != "" { + args = append(args, "match", match) + } + if count > 0 { + args = append(args, "count", count) + } + cmd := NewScanCmd(c.process, args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) ZScan(key string, cursor uint64, match string, count int64) *ScanCmd { + args := []interface{}{"zscan", key, cursor} + if match != "" { + args = append(args, "match", match) + } + if count > 0 { + args = append(args, "count", count) + } + cmd := NewScanCmd(c.process, args...) + c.process(cmd) + return cmd +} + +//------------------------------------------------------------------------------ + +func (c *cmdable) Append(key, value string) *IntCmd { + cmd := NewIntCmd("append", key, value) + c.process(cmd) + return cmd +} + +type BitCount struct { + Start, End int64 +} + +func (c *cmdable) BitCount(key string, bitCount *BitCount) *IntCmd { + args := []interface{}{"bitcount", key} + if bitCount != nil { + args = append( + args, + bitCount.Start, + bitCount.End, + ) + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) bitOp(op, destKey string, keys ...string) *IntCmd { + args := make([]interface{}, 3+len(keys)) + args[0] = "bitop" + args[1] = op + args[2] = destKey + for i, key := range keys { + args[3+i] = key + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) BitOpAnd(destKey string, keys ...string) *IntCmd { + return c.bitOp("and", destKey, keys...) +} + +func (c *cmdable) BitOpOr(destKey string, keys ...string) *IntCmd { + return c.bitOp("or", destKey, keys...) +} + +func (c *cmdable) BitOpXor(destKey string, keys ...string) *IntCmd { + return c.bitOp("xor", destKey, keys...) +} + +func (c *cmdable) BitOpNot(destKey string, key string) *IntCmd { + return c.bitOp("not", destKey, key) +} + +func (c *cmdable) BitPos(key string, bit int64, pos ...int64) *IntCmd { + args := make([]interface{}, 3+len(pos)) + args[0] = "bitpos" + args[1] = key + args[2] = bit + switch len(pos) { + case 0: + case 1: + args[3] = pos[0] + case 2: + args[3] = pos[0] + args[4] = pos[1] + default: + panic("too many arguments") + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) Decr(key string) *IntCmd { + cmd := NewIntCmd("decr", key) + c.process(cmd) + return cmd +} + +func (c *cmdable) DecrBy(key string, decrement int64) *IntCmd { + cmd := NewIntCmd("decrby", key, decrement) + c.process(cmd) + return cmd +} + +// Redis `GET key` command. It returns redis.Nil error when key does not exist. +func (c *cmdable) Get(key string) *StringCmd { + cmd := NewStringCmd("get", key) + c.process(cmd) + return cmd +} + +func (c *cmdable) GetBit(key string, offset int64) *IntCmd { + cmd := NewIntCmd("getbit", key, offset) + c.process(cmd) + return cmd +} + +func (c *cmdable) GetRange(key string, start, end int64) *StringCmd { + cmd := NewStringCmd("getrange", key, start, end) + c.process(cmd) + return cmd +} + +func (c *cmdable) GetSet(key string, value interface{}) *StringCmd { + cmd := NewStringCmd("getset", key, value) + c.process(cmd) + return cmd +} + +func (c *cmdable) Incr(key string) *IntCmd { + cmd := NewIntCmd("incr", key) + c.process(cmd) + return cmd +} + +func (c *cmdable) IncrBy(key string, value int64) *IntCmd { + cmd := NewIntCmd("incrby", key, value) + c.process(cmd) + return cmd +} + +func (c *cmdable) IncrByFloat(key string, value float64) *FloatCmd { + cmd := NewFloatCmd("incrbyfloat", key, value) + c.process(cmd) + return cmd +} + +func (c *cmdable) MGet(keys ...string) *SliceCmd { + args := make([]interface{}, 1+len(keys)) + args[0] = "mget" + for i, key := range keys { + args[1+i] = key + } + cmd := NewSliceCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) MSet(pairs ...interface{}) *StatusCmd { + args := make([]interface{}, 1, 1+len(pairs)) + args[0] = "mset" + args = appendArgs(args, pairs) + cmd := NewStatusCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) MSetNX(pairs ...interface{}) *BoolCmd { + args := make([]interface{}, 1, 1+len(pairs)) + args[0] = "msetnx" + args = appendArgs(args, pairs) + cmd := NewBoolCmd(args...) + c.process(cmd) + return cmd +} + +// Redis `SET key value [expiration]` command. +// +// Use expiration for `SETEX`-like behavior. +// Zero expiration means the key has no expiration time. +func (c *cmdable) Set(key string, value interface{}, expiration time.Duration) *StatusCmd { + args := make([]interface{}, 3, 4) + args[0] = "set" + args[1] = key + args[2] = value + if expiration > 0 { + if usePrecise(expiration) { + args = append(args, "px", formatMs(expiration)) + } else { + args = append(args, "ex", formatSec(expiration)) + } + } + cmd := NewStatusCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) SetBit(key string, offset int64, value int) *IntCmd { + cmd := NewIntCmd( + "setbit", + key, + offset, + value, + ) + c.process(cmd) + return cmd +} + +// Redis `SET key value [expiration] NX` command. +// +// Zero expiration means the key has no expiration time. +func (c *cmdable) SetNX(key string, value interface{}, expiration time.Duration) *BoolCmd { + var cmd *BoolCmd + if expiration == 0 { + // Use old `SETNX` to support old Redis versions. + cmd = NewBoolCmd("setnx", key, value) + } else { + if usePrecise(expiration) { + cmd = NewBoolCmd("set", key, value, "px", formatMs(expiration), "nx") + } else { + cmd = NewBoolCmd("set", key, value, "ex", formatSec(expiration), "nx") + } + } + c.process(cmd) + return cmd +} + +// Redis `SET key value [expiration] XX` command. +// +// Zero expiration means the key has no expiration time. +func (c *cmdable) SetXX(key string, value interface{}, expiration time.Duration) *BoolCmd { + var cmd *BoolCmd + if expiration == 0 { + cmd = NewBoolCmd("set", key, value, "xx") + } else { + if usePrecise(expiration) { + cmd = NewBoolCmd("set", key, value, "px", formatMs(expiration), "xx") + } else { + cmd = NewBoolCmd("set", key, value, "ex", formatSec(expiration), "xx") + } + } + c.process(cmd) + return cmd +} + +func (c *cmdable) SetRange(key string, offset int64, value string) *IntCmd { + cmd := NewIntCmd("setrange", key, offset, value) + c.process(cmd) + return cmd +} + +func (c *cmdable) StrLen(key string) *IntCmd { + cmd := NewIntCmd("strlen", key) + c.process(cmd) + return cmd +} + +//------------------------------------------------------------------------------ + +func (c *cmdable) HDel(key string, fields ...string) *IntCmd { + args := make([]interface{}, 2+len(fields)) + args[0] = "hdel" + args[1] = key + for i, field := range fields { + args[2+i] = field + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) HExists(key, field string) *BoolCmd { + cmd := NewBoolCmd("hexists", key, field) + c.process(cmd) + return cmd +} + +func (c *cmdable) HGet(key, field string) *StringCmd { + cmd := NewStringCmd("hget", key, field) + c.process(cmd) + return cmd +} + +func (c *cmdable) HGetAll(key string) *StringStringMapCmd { + cmd := NewStringStringMapCmd("hgetall", key) + c.process(cmd) + return cmd +} + +func (c *cmdable) HIncrBy(key, field string, incr int64) *IntCmd { + cmd := NewIntCmd("hincrby", key, field, incr) + c.process(cmd) + return cmd +} + +func (c *cmdable) HIncrByFloat(key, field string, incr float64) *FloatCmd { + cmd := NewFloatCmd("hincrbyfloat", key, field, incr) + c.process(cmd) + return cmd +} + +func (c *cmdable) HKeys(key string) *StringSliceCmd { + cmd := NewStringSliceCmd("hkeys", key) + c.process(cmd) + return cmd +} + +func (c *cmdable) HLen(key string) *IntCmd { + cmd := NewIntCmd("hlen", key) + c.process(cmd) + return cmd +} + +func (c *cmdable) HMGet(key string, fields ...string) *SliceCmd { + args := make([]interface{}, 2+len(fields)) + args[0] = "hmget" + args[1] = key + for i, field := range fields { + args[2+i] = field + } + cmd := NewSliceCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) HMSet(key string, fields map[string]interface{}) *StatusCmd { + args := make([]interface{}, 2+len(fields)*2) + args[0] = "hmset" + args[1] = key + i := 2 + for k, v := range fields { + args[i] = k + args[i+1] = v + i += 2 + } + cmd := NewStatusCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) HSet(key, field string, value interface{}) *BoolCmd { + cmd := NewBoolCmd("hset", key, field, value) + c.process(cmd) + return cmd +} + +func (c *cmdable) HSetNX(key, field string, value interface{}) *BoolCmd { + cmd := NewBoolCmd("hsetnx", key, field, value) + c.process(cmd) + return cmd +} + +func (c *cmdable) HVals(key string) *StringSliceCmd { + cmd := NewStringSliceCmd("hvals", key) + c.process(cmd) + return cmd +} + +//------------------------------------------------------------------------------ + +func (c *cmdable) BLPop(timeout time.Duration, keys ...string) *StringSliceCmd { + args := make([]interface{}, 1+len(keys)+1) + args[0] = "blpop" + for i, key := range keys { + args[1+i] = key + } + args[len(args)-1] = formatSec(timeout) + cmd := NewStringSliceCmd(args...) + cmd.setReadTimeout(timeout) + c.process(cmd) + return cmd +} + +func (c *cmdable) BRPop(timeout time.Duration, keys ...string) *StringSliceCmd { + args := make([]interface{}, 1+len(keys)+1) + args[0] = "brpop" + for i, key := range keys { + args[1+i] = key + } + args[len(keys)+1] = formatSec(timeout) + cmd := NewStringSliceCmd(args...) + cmd.setReadTimeout(timeout) + c.process(cmd) + return cmd +} + +func (c *cmdable) BRPopLPush(source, destination string, timeout time.Duration) *StringCmd { + cmd := NewStringCmd( + "brpoplpush", + source, + destination, + formatSec(timeout), + ) + cmd.setReadTimeout(timeout) + c.process(cmd) + return cmd +} + +func (c *cmdable) LIndex(key string, index int64) *StringCmd { + cmd := NewStringCmd("lindex", key, index) + c.process(cmd) + return cmd +} + +func (c *cmdable) LInsert(key, op string, pivot, value interface{}) *IntCmd { + cmd := NewIntCmd("linsert", key, op, pivot, value) + c.process(cmd) + return cmd +} + +func (c *cmdable) LInsertBefore(key string, pivot, value interface{}) *IntCmd { + cmd := NewIntCmd("linsert", key, "before", pivot, value) + c.process(cmd) + return cmd +} + +func (c *cmdable) LInsertAfter(key string, pivot, value interface{}) *IntCmd { + cmd := NewIntCmd("linsert", key, "after", pivot, value) + c.process(cmd) + return cmd +} + +func (c *cmdable) LLen(key string) *IntCmd { + cmd := NewIntCmd("llen", key) + c.process(cmd) + return cmd +} + +func (c *cmdable) LPop(key string) *StringCmd { + cmd := NewStringCmd("lpop", key) + c.process(cmd) + return cmd +} + +func (c *cmdable) LPush(key string, values ...interface{}) *IntCmd { + args := make([]interface{}, 2, 2+len(values)) + args[0] = "lpush" + args[1] = key + args = appendArgs(args, values) + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) LPushX(key string, value interface{}) *IntCmd { + cmd := NewIntCmd("lpushx", key, value) + c.process(cmd) + return cmd +} + +func (c *cmdable) LRange(key string, start, stop int64) *StringSliceCmd { + cmd := NewStringSliceCmd( + "lrange", + key, + start, + stop, + ) + c.process(cmd) + return cmd +} + +func (c *cmdable) LRem(key string, count int64, value interface{}) *IntCmd { + cmd := NewIntCmd("lrem", key, count, value) + c.process(cmd) + return cmd +} + +func (c *cmdable) LSet(key string, index int64, value interface{}) *StatusCmd { + cmd := NewStatusCmd("lset", key, index, value) + c.process(cmd) + return cmd +} + +func (c *cmdable) LTrim(key string, start, stop int64) *StatusCmd { + cmd := NewStatusCmd( + "ltrim", + key, + start, + stop, + ) + c.process(cmd) + return cmd +} + +func (c *cmdable) RPop(key string) *StringCmd { + cmd := NewStringCmd("rpop", key) + c.process(cmd) + return cmd +} + +func (c *cmdable) RPopLPush(source, destination string) *StringCmd { + cmd := NewStringCmd("rpoplpush", source, destination) + c.process(cmd) + return cmd +} + +func (c *cmdable) RPush(key string, values ...interface{}) *IntCmd { + args := make([]interface{}, 2, 2+len(values)) + args[0] = "rpush" + args[1] = key + args = appendArgs(args, values) + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) RPushX(key string, value interface{}) *IntCmd { + cmd := NewIntCmd("rpushx", key, value) + c.process(cmd) + return cmd +} + +//------------------------------------------------------------------------------ + +func (c *cmdable) SAdd(key string, members ...interface{}) *IntCmd { + args := make([]interface{}, 2, 2+len(members)) + args[0] = "sadd" + args[1] = key + args = appendArgs(args, members) + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) SCard(key string) *IntCmd { + cmd := NewIntCmd("scard", key) + c.process(cmd) + return cmd +} + +func (c *cmdable) SDiff(keys ...string) *StringSliceCmd { + args := make([]interface{}, 1+len(keys)) + args[0] = "sdiff" + for i, key := range keys { + args[1+i] = key + } + cmd := NewStringSliceCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) SDiffStore(destination string, keys ...string) *IntCmd { + args := make([]interface{}, 2+len(keys)) + args[0] = "sdiffstore" + args[1] = destination + for i, key := range keys { + args[2+i] = key + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) SInter(keys ...string) *StringSliceCmd { + args := make([]interface{}, 1+len(keys)) + args[0] = "sinter" + for i, key := range keys { + args[1+i] = key + } + cmd := NewStringSliceCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) SInterStore(destination string, keys ...string) *IntCmd { + args := make([]interface{}, 2+len(keys)) + args[0] = "sinterstore" + args[1] = destination + for i, key := range keys { + args[2+i] = key + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) SIsMember(key string, member interface{}) *BoolCmd { + cmd := NewBoolCmd("sismember", key, member) + c.process(cmd) + return cmd +} + +// Redis `SMEMBERS key` command output as a slice +func (c *cmdable) SMembers(key string) *StringSliceCmd { + cmd := NewStringSliceCmd("smembers", key) + c.process(cmd) + return cmd +} + +// Redis `SMEMBERS key` command output as a map +func (c *cmdable) SMembersMap(key string) *StringStructMapCmd { + cmd := NewStringStructMapCmd("smembers", key) + c.process(cmd) + return cmd +} + +func (c *cmdable) SMove(source, destination string, member interface{}) *BoolCmd { + cmd := NewBoolCmd("smove", source, destination, member) + c.process(cmd) + return cmd +} + +// Redis `SPOP key` command. +func (c *cmdable) SPop(key string) *StringCmd { + cmd := NewStringCmd("spop", key) + c.process(cmd) + return cmd +} + +// Redis `SPOP key count` command. +func (c *cmdable) SPopN(key string, count int64) *StringSliceCmd { + cmd := NewStringSliceCmd("spop", key, count) + c.process(cmd) + return cmd +} + +// Redis `SRANDMEMBER key` command. +func (c *cmdable) SRandMember(key string) *StringCmd { + cmd := NewStringCmd("srandmember", key) + c.process(cmd) + return cmd +} + +// Redis `SRANDMEMBER key count` command. +func (c *cmdable) SRandMemberN(key string, count int64) *StringSliceCmd { + cmd := NewStringSliceCmd("srandmember", key, count) + c.process(cmd) + return cmd +} + +func (c *cmdable) SRem(key string, members ...interface{}) *IntCmd { + args := make([]interface{}, 2, 2+len(members)) + args[0] = "srem" + args[1] = key + args = appendArgs(args, members) + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) SUnion(keys ...string) *StringSliceCmd { + args := make([]interface{}, 1+len(keys)) + args[0] = "sunion" + for i, key := range keys { + args[1+i] = key + } + cmd := NewStringSliceCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) SUnionStore(destination string, keys ...string) *IntCmd { + args := make([]interface{}, 2+len(keys)) + args[0] = "sunionstore" + args[1] = destination + for i, key := range keys { + args[2+i] = key + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +//------------------------------------------------------------------------------ + +type XAddArgs struct { + Stream string + MaxLen int64 // MAXLEN N + MaxLenApprox int64 // MAXLEN ~ N + ID string + Values map[string]interface{} +} + +func (c *cmdable) XAdd(a *XAddArgs) *StringCmd { + args := make([]interface{}, 0, 6+len(a.Values)*2) + args = append(args, "xadd") + args = append(args, a.Stream) + if a.MaxLen > 0 { + args = append(args, "maxlen", a.MaxLen) + } else if a.MaxLenApprox > 0 { + args = append(args, "maxlen", "~", a.MaxLenApprox) + } + if a.ID != "" { + args = append(args, a.ID) + } else { + args = append(args, "*") + } + for k, v := range a.Values { + args = append(args, k) + args = append(args, v) + } + + cmd := NewStringCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) XDel(stream string, ids ...string) *IntCmd { + args := []interface{}{"xdel", stream} + for _, id := range ids { + args = append(args, id) + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) XLen(stream string) *IntCmd { + cmd := NewIntCmd("xlen", stream) + c.process(cmd) + return cmd +} + +func (c *cmdable) XRange(stream, start, stop string) *XMessageSliceCmd { + cmd := NewXMessageSliceCmd("xrange", stream, start, stop) + c.process(cmd) + return cmd +} + +func (c *cmdable) XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd { + cmd := NewXMessageSliceCmd("xrange", stream, start, stop, "count", count) + c.process(cmd) + return cmd +} + +func (c *cmdable) XRevRange(stream, start, stop string) *XMessageSliceCmd { + cmd := NewXMessageSliceCmd("xrevrange", stream, start, stop) + c.process(cmd) + return cmd +} + +func (c *cmdable) XRevRangeN(stream, start, stop string, count int64) *XMessageSliceCmd { + cmd := NewXMessageSliceCmd("xrevrange", stream, start, stop, "count", count) + c.process(cmd) + return cmd +} + +type XReadArgs struct { + Streams []string + Count int64 + Block time.Duration +} + +func (c *cmdable) XRead(a *XReadArgs) *XStreamSliceCmd { + args := make([]interface{}, 0, 5+len(a.Streams)) + args = append(args, "xread") + if a.Count > 0 { + args = append(args, "count") + args = append(args, a.Count) + } + if a.Block >= 0 { + args = append(args, "block") + args = append(args, int64(a.Block/time.Millisecond)) + } + args = append(args, "streams") + for _, s := range a.Streams { + args = append(args, s) + } + + cmd := NewXStreamSliceCmd(args...) + if a.Block >= 0 { + cmd.setReadTimeout(a.Block) + } + c.process(cmd) + return cmd +} + +func (c *cmdable) XReadStreams(streams ...string) *XStreamSliceCmd { + return c.XRead(&XReadArgs{ + Streams: streams, + Block: -1, + }) +} + +func (c *cmdable) XGroupCreate(stream, group, start string) *StatusCmd { + cmd := NewStatusCmd("xgroup", "create", stream, group, start) + c.process(cmd) + return cmd +} + +func (c *cmdable) XGroupCreateMkStream(stream, group, start string) *StatusCmd { + cmd := NewStatusCmd("xgroup", "create", stream, group, start, "mkstream") + c.process(cmd) + return cmd +} + +func (c *cmdable) XGroupSetID(stream, group, start string) *StatusCmd { + cmd := NewStatusCmd("xgroup", "setid", stream, group, start) + c.process(cmd) + return cmd +} + +func (c *cmdable) XGroupDestroy(stream, group string) *IntCmd { + cmd := NewIntCmd("xgroup", "destroy", stream, group) + c.process(cmd) + return cmd +} + +func (c *cmdable) XGroupDelConsumer(stream, group, consumer string) *IntCmd { + cmd := NewIntCmd("xgroup", "delconsumer", stream, group, consumer) + c.process(cmd) + return cmd +} + +type XReadGroupArgs struct { + Group string + Consumer string + // List of streams and ids. + Streams []string + Count int64 + Block time.Duration + NoAck bool +} + +func (c *cmdable) XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd { + args := make([]interface{}, 0, 8+len(a.Streams)) + args = append(args, "xreadgroup", "group", a.Group, a.Consumer) + if a.Count > 0 { + args = append(args, "count", a.Count) + } + if a.Block >= 0 { + args = append(args, "block", int64(a.Block/time.Millisecond)) + } + if a.NoAck { + args = append(args, "noack") + } + args = append(args, "streams") + for _, s := range a.Streams { + args = append(args, s) + } + + cmd := NewXStreamSliceCmd(args...) + if a.Block >= 0 { + cmd.setReadTimeout(a.Block) + } + c.process(cmd) + return cmd +} + +func (c *cmdable) XAck(stream, group string, ids ...string) *IntCmd { + args := []interface{}{"xack", stream, group} + for _, id := range ids { + args = append(args, id) + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) XPending(stream, group string) *XPendingCmd { + cmd := NewXPendingCmd("xpending", stream, group) + c.process(cmd) + return cmd +} + +type XPendingExtArgs struct { + Stream string + Group string + Start string + End string + Count int64 + Consumer string +} + +func (c *cmdable) XPendingExt(a *XPendingExtArgs) *XPendingExtCmd { + args := make([]interface{}, 0, 7) + args = append(args, "xpending", a.Stream, a.Group, a.Start, a.End, a.Count) + if a.Consumer != "" { + args = append(args, a.Consumer) + } + cmd := NewXPendingExtCmd(args...) + c.process(cmd) + return cmd +} + +type XClaimArgs struct { + Stream string + Group string + Consumer string + MinIdle time.Duration + Messages []string +} + +func (c *cmdable) XClaim(a *XClaimArgs) *XMessageSliceCmd { + args := xClaimArgs(a) + cmd := NewXMessageSliceCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) XClaimJustID(a *XClaimArgs) *StringSliceCmd { + args := xClaimArgs(a) + args = append(args, "justid") + cmd := NewStringSliceCmd(args...) + c.process(cmd) + return cmd +} + +func xClaimArgs(a *XClaimArgs) []interface{} { + args := make([]interface{}, 0, 4+len(a.Messages)) + args = append(args, + "xclaim", + a.Stream, + a.Group, a.Consumer, + int64(a.MinIdle/time.Millisecond)) + for _, id := range a.Messages { + args = append(args, id) + } + return args +} + +func (c *cmdable) XTrim(key string, maxLen int64) *IntCmd { + cmd := NewIntCmd("xtrim", key, "maxlen", maxLen) + c.process(cmd) + return cmd +} + +func (c *cmdable) XTrimApprox(key string, maxLen int64) *IntCmd { + cmd := NewIntCmd("xtrim", key, "maxlen", "~", maxLen) + c.process(cmd) + return cmd +} + +//------------------------------------------------------------------------------ + +// Z represents sorted set member. +type Z struct { + Score float64 + Member interface{} +} + +// ZWithKey represents sorted set member including the name of the key where it was popped. +type ZWithKey struct { + Z + Key string +} + +// ZStore is used as an arg to ZInterStore and ZUnionStore. +type ZStore struct { + Weights []float64 + // Can be SUM, MIN or MAX. + Aggregate string +} + +// Redis `BZPOPMAX key [key ...] timeout` command. +func (c *cmdable) BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd { + args := make([]interface{}, 1+len(keys)+1) + args[0] = "bzpopmax" + for i, key := range keys { + args[1+i] = key + } + args[len(args)-1] = formatSec(timeout) + cmd := NewZWithKeyCmd(args...) + cmd.setReadTimeout(timeout) + c.process(cmd) + return cmd +} + +// Redis `BZPOPMIN key [key ...] timeout` command. +func (c *cmdable) BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd { + args := make([]interface{}, 1+len(keys)+1) + args[0] = "bzpopmin" + for i, key := range keys { + args[1+i] = key + } + args[len(args)-1] = formatSec(timeout) + cmd := NewZWithKeyCmd(args...) + cmd.setReadTimeout(timeout) + c.process(cmd) + return cmd +} + +func (c *cmdable) zAdd(a []interface{}, n int, members ...Z) *IntCmd { + for i, m := range members { + a[n+2*i] = m.Score + a[n+2*i+1] = m.Member + } + cmd := NewIntCmd(a...) + c.process(cmd) + return cmd +} + +// Redis `ZADD key score member [score member ...]` command. +func (c *cmdable) ZAdd(key string, members ...Z) *IntCmd { + const n = 2 + a := make([]interface{}, n+2*len(members)) + a[0], a[1] = "zadd", key + return c.zAdd(a, n, members...) +} + +// Redis `ZADD key NX score member [score member ...]` command. +func (c *cmdable) ZAddNX(key string, members ...Z) *IntCmd { + const n = 3 + a := make([]interface{}, n+2*len(members)) + a[0], a[1], a[2] = "zadd", key, "nx" + return c.zAdd(a, n, members...) +} + +// Redis `ZADD key XX score member [score member ...]` command. +func (c *cmdable) ZAddXX(key string, members ...Z) *IntCmd { + const n = 3 + a := make([]interface{}, n+2*len(members)) + a[0], a[1], a[2] = "zadd", key, "xx" + return c.zAdd(a, n, members...) +} + +// Redis `ZADD key CH score member [score member ...]` command. +func (c *cmdable) ZAddCh(key string, members ...Z) *IntCmd { + const n = 3 + a := make([]interface{}, n+2*len(members)) + a[0], a[1], a[2] = "zadd", key, "ch" + return c.zAdd(a, n, members...) +} + +// Redis `ZADD key NX CH score member [score member ...]` command. +func (c *cmdable) ZAddNXCh(key string, members ...Z) *IntCmd { + const n = 4 + a := make([]interface{}, n+2*len(members)) + a[0], a[1], a[2], a[3] = "zadd", key, "nx", "ch" + return c.zAdd(a, n, members...) +} + +// Redis `ZADD key XX CH score member [score member ...]` command. +func (c *cmdable) ZAddXXCh(key string, members ...Z) *IntCmd { + const n = 4 + a := make([]interface{}, n+2*len(members)) + a[0], a[1], a[2], a[3] = "zadd", key, "xx", "ch" + return c.zAdd(a, n, members...) +} + +func (c *cmdable) zIncr(a []interface{}, n int, members ...Z) *FloatCmd { + for i, m := range members { + a[n+2*i] = m.Score + a[n+2*i+1] = m.Member + } + cmd := NewFloatCmd(a...) + c.process(cmd) + return cmd +} + +// Redis `ZADD key INCR score member` command. +func (c *cmdable) ZIncr(key string, member Z) *FloatCmd { + const n = 3 + a := make([]interface{}, n+2) + a[0], a[1], a[2] = "zadd", key, "incr" + return c.zIncr(a, n, member) +} + +// Redis `ZADD key NX INCR score member` command. +func (c *cmdable) ZIncrNX(key string, member Z) *FloatCmd { + const n = 4 + a := make([]interface{}, n+2) + a[0], a[1], a[2], a[3] = "zadd", key, "incr", "nx" + return c.zIncr(a, n, member) +} + +// Redis `ZADD key XX INCR score member` command. +func (c *cmdable) ZIncrXX(key string, member Z) *FloatCmd { + const n = 4 + a := make([]interface{}, n+2) + a[0], a[1], a[2], a[3] = "zadd", key, "incr", "xx" + return c.zIncr(a, n, member) +} + +func (c *cmdable) ZCard(key string) *IntCmd { + cmd := NewIntCmd("zcard", key) + c.process(cmd) + return cmd +} + +func (c *cmdable) ZCount(key, min, max string) *IntCmd { + cmd := NewIntCmd("zcount", key, min, max) + c.process(cmd) + return cmd +} + +func (c *cmdable) ZLexCount(key, min, max string) *IntCmd { + cmd := NewIntCmd("zlexcount", key, min, max) + c.process(cmd) + return cmd +} + +func (c *cmdable) ZIncrBy(key string, increment float64, member string) *FloatCmd { + cmd := NewFloatCmd("zincrby", key, increment, member) + c.process(cmd) + return cmd +} + +func (c *cmdable) ZInterStore(destination string, store ZStore, keys ...string) *IntCmd { + args := make([]interface{}, 3+len(keys)) + args[0] = "zinterstore" + args[1] = destination + args[2] = len(keys) + for i, key := range keys { + args[3+i] = key + } + if len(store.Weights) > 0 { + args = append(args, "weights") + for _, weight := range store.Weights { + args = append(args, weight) + } + } + if store.Aggregate != "" { + args = append(args, "aggregate", store.Aggregate) + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) ZPopMax(key string, count ...int64) *ZSliceCmd { + args := []interface{}{ + "zpopmax", + key, + } + + switch len(count) { + case 0: + break + case 1: + args = append(args, count[0]) + default: + panic("too many arguments") + } + + cmd := NewZSliceCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) ZPopMin(key string, count ...int64) *ZSliceCmd { + args := []interface{}{ + "zpopmin", + key, + } + + switch len(count) { + case 0: + break + case 1: + args = append(args, count[0]) + default: + panic("too many arguments") + } + + cmd := NewZSliceCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) zRange(key string, start, stop int64, withScores bool) *StringSliceCmd { + args := []interface{}{ + "zrange", + key, + start, + stop, + } + if withScores { + args = append(args, "withscores") + } + cmd := NewStringSliceCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) ZRange(key string, start, stop int64) *StringSliceCmd { + return c.zRange(key, start, stop, false) +} + +func (c *cmdable) ZRangeWithScores(key string, start, stop int64) *ZSliceCmd { + cmd := NewZSliceCmd("zrange", key, start, stop, "withscores") + c.process(cmd) + return cmd +} + +type ZRangeBy struct { + Min, Max string + Offset, Count int64 +} + +func (c *cmdable) zRangeBy(zcmd, key string, opt ZRangeBy, withScores bool) *StringSliceCmd { + args := []interface{}{zcmd, key, opt.Min, opt.Max} + if withScores { + args = append(args, "withscores") + } + if opt.Offset != 0 || opt.Count != 0 { + args = append( + args, + "limit", + opt.Offset, + opt.Count, + ) + } + cmd := NewStringSliceCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) ZRangeByScore(key string, opt ZRangeBy) *StringSliceCmd { + return c.zRangeBy("zrangebyscore", key, opt, false) +} + +func (c *cmdable) ZRangeByLex(key string, opt ZRangeBy) *StringSliceCmd { + return c.zRangeBy("zrangebylex", key, opt, false) +} + +func (c *cmdable) ZRangeByScoreWithScores(key string, opt ZRangeBy) *ZSliceCmd { + args := []interface{}{"zrangebyscore", key, opt.Min, opt.Max, "withscores"} + if opt.Offset != 0 || opt.Count != 0 { + args = append( + args, + "limit", + opt.Offset, + opt.Count, + ) + } + cmd := NewZSliceCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) ZRank(key, member string) *IntCmd { + cmd := NewIntCmd("zrank", key, member) + c.process(cmd) + return cmd +} + +func (c *cmdable) ZRem(key string, members ...interface{}) *IntCmd { + args := make([]interface{}, 2, 2+len(members)) + args[0] = "zrem" + args[1] = key + args = appendArgs(args, members) + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) ZRemRangeByRank(key string, start, stop int64) *IntCmd { + cmd := NewIntCmd( + "zremrangebyrank", + key, + start, + stop, + ) + c.process(cmd) + return cmd +} + +func (c *cmdable) ZRemRangeByScore(key, min, max string) *IntCmd { + cmd := NewIntCmd("zremrangebyscore", key, min, max) + c.process(cmd) + return cmd +} + +func (c *cmdable) ZRemRangeByLex(key, min, max string) *IntCmd { + cmd := NewIntCmd("zremrangebylex", key, min, max) + c.process(cmd) + return cmd +} + +func (c *cmdable) ZRevRange(key string, start, stop int64) *StringSliceCmd { + cmd := NewStringSliceCmd("zrevrange", key, start, stop) + c.process(cmd) + return cmd +} + +func (c *cmdable) ZRevRangeWithScores(key string, start, stop int64) *ZSliceCmd { + cmd := NewZSliceCmd("zrevrange", key, start, stop, "withscores") + c.process(cmd) + return cmd +} + +func (c *cmdable) zRevRangeBy(zcmd, key string, opt ZRangeBy) *StringSliceCmd { + args := []interface{}{zcmd, key, opt.Max, opt.Min} + if opt.Offset != 0 || opt.Count != 0 { + args = append( + args, + "limit", + opt.Offset, + opt.Count, + ) + } + cmd := NewStringSliceCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) ZRevRangeByScore(key string, opt ZRangeBy) *StringSliceCmd { + return c.zRevRangeBy("zrevrangebyscore", key, opt) +} + +func (c *cmdable) ZRevRangeByLex(key string, opt ZRangeBy) *StringSliceCmd { + return c.zRevRangeBy("zrevrangebylex", key, opt) +} + +func (c *cmdable) ZRevRangeByScoreWithScores(key string, opt ZRangeBy) *ZSliceCmd { + args := []interface{}{"zrevrangebyscore", key, opt.Max, opt.Min, "withscores"} + if opt.Offset != 0 || opt.Count != 0 { + args = append( + args, + "limit", + opt.Offset, + opt.Count, + ) + } + cmd := NewZSliceCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) ZRevRank(key, member string) *IntCmd { + cmd := NewIntCmd("zrevrank", key, member) + c.process(cmd) + return cmd +} + +func (c *cmdable) ZScore(key, member string) *FloatCmd { + cmd := NewFloatCmd("zscore", key, member) + c.process(cmd) + return cmd +} + +func (c *cmdable) ZUnionStore(dest string, store ZStore, keys ...string) *IntCmd { + args := make([]interface{}, 3+len(keys)) + args[0] = "zunionstore" + args[1] = dest + args[2] = len(keys) + for i, key := range keys { + args[3+i] = key + } + if len(store.Weights) > 0 { + args = append(args, "weights") + for _, weight := range store.Weights { + args = append(args, weight) + } + } + if store.Aggregate != "" { + args = append(args, "aggregate", store.Aggregate) + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +//------------------------------------------------------------------------------ + +func (c *cmdable) PFAdd(key string, els ...interface{}) *IntCmd { + args := make([]interface{}, 2, 2+len(els)) + args[0] = "pfadd" + args[1] = key + args = appendArgs(args, els) + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) PFCount(keys ...string) *IntCmd { + args := make([]interface{}, 1+len(keys)) + args[0] = "pfcount" + for i, key := range keys { + args[1+i] = key + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) PFMerge(dest string, keys ...string) *StatusCmd { + args := make([]interface{}, 2+len(keys)) + args[0] = "pfmerge" + args[1] = dest + for i, key := range keys { + args[2+i] = key + } + cmd := NewStatusCmd(args...) + c.process(cmd) + return cmd +} + +//------------------------------------------------------------------------------ + +func (c *cmdable) BgRewriteAOF() *StatusCmd { + cmd := NewStatusCmd("bgrewriteaof") + c.process(cmd) + return cmd +} + +func (c *cmdable) BgSave() *StatusCmd { + cmd := NewStatusCmd("bgsave") + c.process(cmd) + return cmd +} + +func (c *cmdable) ClientKill(ipPort string) *StatusCmd { + cmd := NewStatusCmd("client", "kill", ipPort) + c.process(cmd) + return cmd +} + +// ClientKillByFilter is new style synx, while the ClientKill is old +// CLIENT KILL <option> [value] ... <option> [value] +func (c *cmdable) ClientKillByFilter(keys ...string) *IntCmd { + args := make([]interface{}, 2+len(keys)) + args[0] = "client" + args[1] = "kill" + for i, key := range keys { + args[2+i] = key + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) ClientList() *StringCmd { + cmd := NewStringCmd("client", "list") + c.process(cmd) + return cmd +} + +func (c *cmdable) ClientPause(dur time.Duration) *BoolCmd { + cmd := NewBoolCmd("client", "pause", formatMs(dur)) + c.process(cmd) + return cmd +} + +func (c *cmdable) ClientID() *IntCmd { + cmd := NewIntCmd("client", "id") + c.process(cmd) + return cmd +} + +func (c *cmdable) ClientUnblock(id int64) *IntCmd { + cmd := NewIntCmd("client", "unblock", id) + c.process(cmd) + return cmd +} + +func (c *cmdable) ClientUnblockWithError(id int64) *IntCmd { + cmd := NewIntCmd("client", "unblock", id, "error") + c.process(cmd) + return cmd +} + +// ClientSetName assigns a name to the connection. +func (c *statefulCmdable) ClientSetName(name string) *BoolCmd { + cmd := NewBoolCmd("client", "setname", name) + c.process(cmd) + return cmd +} + +// ClientGetName returns the name of the connection. +func (c *cmdable) ClientGetName() *StringCmd { + cmd := NewStringCmd("client", "getname") + c.process(cmd) + return cmd +} + +func (c *cmdable) ConfigGet(parameter string) *SliceCmd { + cmd := NewSliceCmd("config", "get", parameter) + c.process(cmd) + return cmd +} + +func (c *cmdable) ConfigResetStat() *StatusCmd { + cmd := NewStatusCmd("config", "resetstat") + c.process(cmd) + return cmd +} + +func (c *cmdable) ConfigSet(parameter, value string) *StatusCmd { + cmd := NewStatusCmd("config", "set", parameter, value) + c.process(cmd) + return cmd +} + +func (c *cmdable) ConfigRewrite() *StatusCmd { + cmd := NewStatusCmd("config", "rewrite") + c.process(cmd) + return cmd +} + +// Deperecated. Use DBSize instead. +func (c *cmdable) DbSize() *IntCmd { + return c.DBSize() +} + +func (c *cmdable) DBSize() *IntCmd { + cmd := NewIntCmd("dbsize") + c.process(cmd) + return cmd +} + +func (c *cmdable) FlushAll() *StatusCmd { + cmd := NewStatusCmd("flushall") + c.process(cmd) + return cmd +} + +func (c *cmdable) FlushAllAsync() *StatusCmd { + cmd := NewStatusCmd("flushall", "async") + c.process(cmd) + return cmd +} + +// Deprecated. Use FlushDB instead. +func (c *cmdable) FlushDb() *StatusCmd { + return c.FlushDB() +} + +func (c *cmdable) FlushDB() *StatusCmd { + cmd := NewStatusCmd("flushdb") + c.process(cmd) + return cmd +} + +func (c *cmdable) FlushDBAsync() *StatusCmd { + cmd := NewStatusCmd("flushdb", "async") + c.process(cmd) + return cmd +} + +func (c *cmdable) Info(section ...string) *StringCmd { + args := []interface{}{"info"} + if len(section) > 0 { + args = append(args, section[0]) + } + cmd := NewStringCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) LastSave() *IntCmd { + cmd := NewIntCmd("lastsave") + c.process(cmd) + return cmd +} + +func (c *cmdable) Save() *StatusCmd { + cmd := NewStatusCmd("save") + c.process(cmd) + return cmd +} + +func (c *cmdable) shutdown(modifier string) *StatusCmd { + var args []interface{} + if modifier == "" { + args = []interface{}{"shutdown"} + } else { + args = []interface{}{"shutdown", modifier} + } + cmd := NewStatusCmd(args...) + c.process(cmd) + if err := cmd.Err(); err != nil { + if err == io.EOF { + // Server quit as expected. + cmd.err = nil + } + } else { + // Server did not quit. String reply contains the reason. + cmd.err = errors.New(cmd.val) + cmd.val = "" + } + return cmd +} + +func (c *cmdable) Shutdown() *StatusCmd { + return c.shutdown("") +} + +func (c *cmdable) ShutdownSave() *StatusCmd { + return c.shutdown("save") +} + +func (c *cmdable) ShutdownNoSave() *StatusCmd { + return c.shutdown("nosave") +} + +func (c *cmdable) SlaveOf(host, port string) *StatusCmd { + cmd := NewStatusCmd("slaveof", host, port) + c.process(cmd) + return cmd +} + +func (c *cmdable) SlowLog() { + panic("not implemented") +} + +func (c *cmdable) Sync() { + panic("not implemented") +} + +func (c *cmdable) Time() *TimeCmd { + cmd := NewTimeCmd("time") + c.process(cmd) + return cmd +} + +//------------------------------------------------------------------------------ + +func (c *cmdable) Eval(script string, keys []string, args ...interface{}) *Cmd { + cmdArgs := make([]interface{}, 3+len(keys), 3+len(keys)+len(args)) + cmdArgs[0] = "eval" + cmdArgs[1] = script + cmdArgs[2] = len(keys) + for i, key := range keys { + cmdArgs[3+i] = key + } + cmdArgs = appendArgs(cmdArgs, args) + cmd := NewCmd(cmdArgs...) + c.process(cmd) + return cmd +} + +func (c *cmdable) EvalSha(sha1 string, keys []string, args ...interface{}) *Cmd { + cmdArgs := make([]interface{}, 3+len(keys), 3+len(keys)+len(args)) + cmdArgs[0] = "evalsha" + cmdArgs[1] = sha1 + cmdArgs[2] = len(keys) + for i, key := range keys { + cmdArgs[3+i] = key + } + cmdArgs = appendArgs(cmdArgs, args) + cmd := NewCmd(cmdArgs...) + c.process(cmd) + return cmd +} + +func (c *cmdable) ScriptExists(hashes ...string) *BoolSliceCmd { + args := make([]interface{}, 2+len(hashes)) + args[0] = "script" + args[1] = "exists" + for i, hash := range hashes { + args[2+i] = hash + } + cmd := NewBoolSliceCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) ScriptFlush() *StatusCmd { + cmd := NewStatusCmd("script", "flush") + c.process(cmd) + return cmd +} + +func (c *cmdable) ScriptKill() *StatusCmd { + cmd := NewStatusCmd("script", "kill") + c.process(cmd) + return cmd +} + +func (c *cmdable) ScriptLoad(script string) *StringCmd { + cmd := NewStringCmd("script", "load", script) + c.process(cmd) + return cmd +} + +//------------------------------------------------------------------------------ + +func (c *cmdable) DebugObject(key string) *StringCmd { + cmd := NewStringCmd("debug", "object", key) + c.process(cmd) + return cmd +} + +//------------------------------------------------------------------------------ + +// Publish posts the message to the channel. +func (c *cmdable) Publish(channel string, message interface{}) *IntCmd { + cmd := NewIntCmd("publish", channel, message) + c.process(cmd) + return cmd +} + +func (c *cmdable) PubSubChannels(pattern string) *StringSliceCmd { + args := []interface{}{"pubsub", "channels"} + if pattern != "*" { + args = append(args, pattern) + } + cmd := NewStringSliceCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) PubSubNumSub(channels ...string) *StringIntMapCmd { + args := make([]interface{}, 2+len(channels)) + args[0] = "pubsub" + args[1] = "numsub" + for i, channel := range channels { + args[2+i] = channel + } + cmd := NewStringIntMapCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) PubSubNumPat() *IntCmd { + cmd := NewIntCmd("pubsub", "numpat") + c.process(cmd) + return cmd +} + +//------------------------------------------------------------------------------ + +func (c *cmdable) ClusterSlots() *ClusterSlotsCmd { + cmd := NewClusterSlotsCmd("cluster", "slots") + c.process(cmd) + return cmd +} + +func (c *cmdable) ClusterNodes() *StringCmd { + cmd := NewStringCmd("cluster", "nodes") + c.process(cmd) + return cmd +} + +func (c *cmdable) ClusterMeet(host, port string) *StatusCmd { + cmd := NewStatusCmd("cluster", "meet", host, port) + c.process(cmd) + return cmd +} + +func (c *cmdable) ClusterForget(nodeID string) *StatusCmd { + cmd := NewStatusCmd("cluster", "forget", nodeID) + c.process(cmd) + return cmd +} + +func (c *cmdable) ClusterReplicate(nodeID string) *StatusCmd { + cmd := NewStatusCmd("cluster", "replicate", nodeID) + c.process(cmd) + return cmd +} + +func (c *cmdable) ClusterResetSoft() *StatusCmd { + cmd := NewStatusCmd("cluster", "reset", "soft") + c.process(cmd) + return cmd +} + +func (c *cmdable) ClusterResetHard() *StatusCmd { + cmd := NewStatusCmd("cluster", "reset", "hard") + c.process(cmd) + return cmd +} + +func (c *cmdable) ClusterInfo() *StringCmd { + cmd := NewStringCmd("cluster", "info") + c.process(cmd) + return cmd +} + +func (c *cmdable) ClusterKeySlot(key string) *IntCmd { + cmd := NewIntCmd("cluster", "keyslot", key) + c.process(cmd) + return cmd +} + +func (c *cmdable) ClusterGetKeysInSlot(slot int, count int) *StringSliceCmd { + cmd := NewStringSliceCmd("cluster", "getkeysinslot", slot, count) + c.process(cmd) + return cmd +} + +func (c *cmdable) ClusterCountFailureReports(nodeID string) *IntCmd { + cmd := NewIntCmd("cluster", "count-failure-reports", nodeID) + c.process(cmd) + return cmd +} + +func (c *cmdable) ClusterCountKeysInSlot(slot int) *IntCmd { + cmd := NewIntCmd("cluster", "countkeysinslot", slot) + c.process(cmd) + return cmd +} + +func (c *cmdable) ClusterDelSlots(slots ...int) *StatusCmd { + args := make([]interface{}, 2+len(slots)) + args[0] = "cluster" + args[1] = "delslots" + for i, slot := range slots { + args[2+i] = slot + } + cmd := NewStatusCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) ClusterDelSlotsRange(min, max int) *StatusCmd { + size := max - min + 1 + slots := make([]int, size) + for i := 0; i < size; i++ { + slots[i] = min + i + } + return c.ClusterDelSlots(slots...) +} + +func (c *cmdable) ClusterSaveConfig() *StatusCmd { + cmd := NewStatusCmd("cluster", "saveconfig") + c.process(cmd) + return cmd +} + +func (c *cmdable) ClusterSlaves(nodeID string) *StringSliceCmd { + cmd := NewStringSliceCmd("cluster", "slaves", nodeID) + c.process(cmd) + return cmd +} + +func (c *cmdable) ReadOnly() *StatusCmd { + cmd := NewStatusCmd("readonly") + c.process(cmd) + return cmd +} + +func (c *cmdable) ReadWrite() *StatusCmd { + cmd := NewStatusCmd("readwrite") + c.process(cmd) + return cmd +} + +func (c *cmdable) ClusterFailover() *StatusCmd { + cmd := NewStatusCmd("cluster", "failover") + c.process(cmd) + return cmd +} + +func (c *cmdable) ClusterAddSlots(slots ...int) *StatusCmd { + args := make([]interface{}, 2+len(slots)) + args[0] = "cluster" + args[1] = "addslots" + for i, num := range slots { + args[2+i] = num + } + cmd := NewStatusCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) ClusterAddSlotsRange(min, max int) *StatusCmd { + size := max - min + 1 + slots := make([]int, size) + for i := 0; i < size; i++ { + slots[i] = min + i + } + return c.ClusterAddSlots(slots...) +} + +//------------------------------------------------------------------------------ + +func (c *cmdable) GeoAdd(key string, geoLocation ...*GeoLocation) *IntCmd { + args := make([]interface{}, 2+3*len(geoLocation)) + args[0] = "geoadd" + args[1] = key + for i, eachLoc := range geoLocation { + args[2+3*i] = eachLoc.Longitude + args[2+3*i+1] = eachLoc.Latitude + args[2+3*i+2] = eachLoc.Name + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) GeoRadius(key string, longitude, latitude float64, query *GeoRadiusQuery) *GeoLocationCmd { + cmd := NewGeoLocationCmd(query, "georadius", key, longitude, latitude) + c.process(cmd) + return cmd +} + +func (c *cmdable) GeoRadiusRO(key string, longitude, latitude float64, query *GeoRadiusQuery) *GeoLocationCmd { + cmd := NewGeoLocationCmd(query, "georadius_ro", key, longitude, latitude) + c.process(cmd) + return cmd +} + +func (c *cmdable) GeoRadiusByMember(key, member string, query *GeoRadiusQuery) *GeoLocationCmd { + cmd := NewGeoLocationCmd(query, "georadiusbymember", key, member) + c.process(cmd) + return cmd +} + +func (c *cmdable) GeoRadiusByMemberRO(key, member string, query *GeoRadiusQuery) *GeoLocationCmd { + cmd := NewGeoLocationCmd(query, "georadiusbymember_ro", key, member) + c.process(cmd) + return cmd +} + +func (c *cmdable) GeoDist(key string, member1, member2, unit string) *FloatCmd { + if unit == "" { + unit = "km" + } + cmd := NewFloatCmd("geodist", key, member1, member2, unit) + c.process(cmd) + return cmd +} + +func (c *cmdable) GeoHash(key string, members ...string) *StringSliceCmd { + args := make([]interface{}, 2+len(members)) + args[0] = "geohash" + args[1] = key + for i, member := range members { + args[2+i] = member + } + cmd := NewStringSliceCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) GeoPos(key string, members ...string) *GeoPosCmd { + args := make([]interface{}, 2+len(members)) + args[0] = "geopos" + args[1] = key + for i, member := range members { + args[2+i] = member + } + cmd := NewGeoPosCmd(args...) + c.process(cmd) + return cmd +} + +//------------------------------------------------------------------------------ + +func (c *cmdable) MemoryUsage(key string, samples ...int) *IntCmd { + args := []interface{}{"memory", "usage", key} + if len(samples) > 0 { + if len(samples) != 1 { + panic("MemoryUsage expects single sample count") + } + args = append(args, "SAMPLES", samples[0]) + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} diff --git a/vendor/github.com/go-redis/redis/doc.go b/vendor/github.com/go-redis/redis/doc.go new file mode 100644 index 0000000000..55262533a6 --- /dev/null +++ b/vendor/github.com/go-redis/redis/doc.go @@ -0,0 +1,4 @@ +/* +Package redis implements a Redis client. +*/ +package redis diff --git a/vendor/github.com/go-redis/redis/internal/consistenthash/consistenthash.go b/vendor/github.com/go-redis/redis/internal/consistenthash/consistenthash.go new file mode 100644 index 0000000000..a9c56f0762 --- /dev/null +++ b/vendor/github.com/go-redis/redis/internal/consistenthash/consistenthash.go @@ -0,0 +1,81 @@ +/* +Copyright 2013 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package consistenthash provides an implementation of a ring hash. +package consistenthash + +import ( + "hash/crc32" + "sort" + "strconv" +) + +type Hash func(data []byte) uint32 + +type Map struct { + hash Hash + replicas int + keys []int // Sorted + hashMap map[int]string +} + +func New(replicas int, fn Hash) *Map { + m := &Map{ + replicas: replicas, + hash: fn, + hashMap: make(map[int]string), + } + if m.hash == nil { + m.hash = crc32.ChecksumIEEE + } + return m +} + +// Returns true if there are no items available. +func (m *Map) IsEmpty() bool { + return len(m.keys) == 0 +} + +// Adds some keys to the hash. +func (m *Map) Add(keys ...string) { + for _, key := range keys { + for i := 0; i < m.replicas; i++ { + hash := int(m.hash([]byte(strconv.Itoa(i) + key))) + m.keys = append(m.keys, hash) + m.hashMap[hash] = key + } + } + sort.Ints(m.keys) +} + +// Gets the closest item in the hash to the provided key. +func (m *Map) Get(key string) string { + if m.IsEmpty() { + return "" + } + + hash := int(m.hash([]byte(key))) + + // Binary search for appropriate replica. + idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash }) + + // Means we have cycled back to the first replica. + if idx == len(m.keys) { + idx = 0 + } + + return m.hashMap[m.keys[idx]] +} diff --git a/vendor/github.com/go-redis/redis/internal/error.go b/vendor/github.com/go-redis/redis/internal/error.go new file mode 100644 index 0000000000..34f6bd4dc7 --- /dev/null +++ b/vendor/github.com/go-redis/redis/internal/error.go @@ -0,0 +1,89 @@ +package internal + +import ( + "io" + "net" + "strings" + + "github.com/go-redis/redis/internal/proto" +) + +func IsRetryableError(err error, retryTimeout bool) bool { + if err == nil { + return false + } + if err == io.EOF { + return true + } + if netErr, ok := err.(net.Error); ok { + if netErr.Timeout() { + return retryTimeout + } + return true + } + s := err.Error() + if s == "ERR max number of clients reached" { + return true + } + if strings.HasPrefix(s, "LOADING ") { + return true + } + if strings.HasPrefix(s, "READONLY ") { + return true + } + if strings.HasPrefix(s, "CLUSTERDOWN ") { + return true + } + return false +} + +func IsRedisError(err error) bool { + _, ok := err.(proto.RedisError) + return ok +} + +func IsBadConn(err error, allowTimeout bool) bool { + if err == nil { + return false + } + if IsRedisError(err) { + // #790 + return IsReadOnlyError(err) + } + if allowTimeout { + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + return false + } + } + return true +} + +func IsMovedError(err error) (moved bool, ask bool, addr string) { + if !IsRedisError(err) { + return + } + + s := err.Error() + if strings.HasPrefix(s, "MOVED ") { + moved = true + } else if strings.HasPrefix(s, "ASK ") { + ask = true + } else { + return + } + + ind := strings.LastIndex(s, " ") + if ind == -1 { + return false, false, "" + } + addr = s[ind+1:] + return +} + +func IsLoadingError(err error) bool { + return strings.HasPrefix(err.Error(), "LOADING ") +} + +func IsReadOnlyError(err error) bool { + return strings.HasPrefix(err.Error(), "READONLY ") +} diff --git a/vendor/github.com/go-redis/redis/internal/hashtag/hashtag.go b/vendor/github.com/go-redis/redis/internal/hashtag/hashtag.go new file mode 100644 index 0000000000..22f5b3981b --- /dev/null +++ b/vendor/github.com/go-redis/redis/internal/hashtag/hashtag.go @@ -0,0 +1,77 @@ +package hashtag + +import ( + "math/rand" + "strings" +) + +const slotNumber = 16384 + +// CRC16 implementation according to CCITT standards. +// Copyright 2001-2010 Georges Menie (www.menie.org) +// Copyright 2013 The Go Authors. All rights reserved. +// http://redis.io/topics/cluster-spec#appendix-a-crc16-reference-implementation-in-ansi-c +var crc16tab = [256]uint16{ + 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7, + 0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef, + 0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6, + 0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de, + 0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485, + 0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d, + 0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4, + 0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc, + 0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823, + 0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b, + 0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12, + 0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a, + 0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41, + 0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49, + 0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70, + 0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78, + 0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f, + 0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067, + 0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e, + 0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256, + 0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d, + 0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405, + 0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c, + 0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634, + 0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab, + 0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3, + 0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a, + 0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92, + 0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9, + 0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1, + 0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8, + 0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0, +} + +func Key(key string) string { + if s := strings.IndexByte(key, '{'); s > -1 { + if e := strings.IndexByte(key[s+1:], '}'); e > 0 { + return key[s+1 : s+e+1] + } + } + return key +} + +func RandomSlot() int { + return rand.Intn(slotNumber) +} + +// hashSlot returns a consistent slot number between 0 and 16383 +// for any given string key. +func Slot(key string) int { + if key == "" { + return RandomSlot() + } + key = Key(key) + return int(crc16sum(key)) % slotNumber +} + +func crc16sum(key string) (crc uint16) { + for i := 0; i < len(key); i++ { + crc = (crc << 8) ^ crc16tab[(byte(crc>>8)^key[i])&0x00ff] + } + return +} diff --git a/vendor/github.com/go-redis/redis/internal/internal.go b/vendor/github.com/go-redis/redis/internal/internal.go new file mode 100644 index 0000000000..ad3fc3c9ff --- /dev/null +++ b/vendor/github.com/go-redis/redis/internal/internal.go @@ -0,0 +1,24 @@ +package internal + +import ( + "math/rand" + "time" +) + +// Retry backoff with jitter sleep to prevent overloaded conditions during intervals +// https://www.awsarchitectureblog.com/2015/03/backoff.html +func RetryBackoff(retry int, minBackoff, maxBackoff time.Duration) time.Duration { + if retry < 0 { + retry = 0 + } + + backoff := minBackoff << uint(retry) + if backoff > maxBackoff || backoff < minBackoff { + backoff = maxBackoff + } + + if backoff == 0 { + return 0 + } + return time.Duration(rand.Int63n(int64(backoff))) +} diff --git a/vendor/github.com/go-redis/redis/internal/log.go b/vendor/github.com/go-redis/redis/internal/log.go new file mode 100644 index 0000000000..fd14222eee --- /dev/null +++ b/vendor/github.com/go-redis/redis/internal/log.go @@ -0,0 +1,15 @@ +package internal + +import ( + "fmt" + "log" +) + +var Logger *log.Logger + +func Logf(s string, args ...interface{}) { + if Logger == nil { + return + } + Logger.Output(2, fmt.Sprintf(s, args...)) +} diff --git a/vendor/github.com/go-redis/redis/internal/once.go b/vendor/github.com/go-redis/redis/internal/once.go new file mode 100644 index 0000000000..64f46272ae --- /dev/null +++ b/vendor/github.com/go-redis/redis/internal/once.go @@ -0,0 +1,60 @@ +/* +Copyright 2014 The Camlistore Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internal + +import ( + "sync" + "sync/atomic" +) + +// A Once will perform a successful action exactly once. +// +// Unlike a sync.Once, this Once's func returns an error +// and is re-armed on failure. +type Once struct { + m sync.Mutex + done uint32 +} + +// Do calls the function f if and only if Do has not been invoked +// without error for this instance of Once. In other words, given +// var once Once +// if once.Do(f) is called multiple times, only the first call will +// invoke f, even if f has a different value in each invocation unless +// f returns an error. A new instance of Once is required for each +// function to execute. +// +// Do is intended for initialization that must be run exactly once. Since f +// is niladic, it may be necessary to use a function literal to capture the +// arguments to a function to be invoked by Do: +// err := config.once.Do(func() error { return config.init(filename) }) +func (o *Once) Do(f func() error) error { + if atomic.LoadUint32(&o.done) == 1 { + return nil + } + // Slow-path. + o.m.Lock() + defer o.m.Unlock() + var err error + if o.done == 0 { + err = f() + if err == nil { + atomic.StoreUint32(&o.done, 1) + } + } + return err +} diff --git a/vendor/github.com/go-redis/redis/internal/pool/conn.go b/vendor/github.com/go-redis/redis/internal/pool/conn.go new file mode 100644 index 0000000000..1095bfe59b --- /dev/null +++ b/vendor/github.com/go-redis/redis/internal/pool/conn.go @@ -0,0 +1,93 @@ +package pool + +import ( + "net" + "sync/atomic" + "time" + + "github.com/go-redis/redis/internal/proto" +) + +var noDeadline = time.Time{} + +type Conn struct { + netConn net.Conn + + rd *proto.Reader + rdLocked bool + wr *proto.Writer + + InitedAt time.Time + pooled bool + usedAt atomic.Value +} + +func NewConn(netConn net.Conn) *Conn { + cn := &Conn{ + netConn: netConn, + } + cn.rd = proto.NewReader(netConn) + cn.wr = proto.NewWriter(netConn) + cn.SetUsedAt(time.Now()) + return cn +} + +func (cn *Conn) UsedAt() time.Time { + return cn.usedAt.Load().(time.Time) +} + +func (cn *Conn) SetUsedAt(tm time.Time) { + cn.usedAt.Store(tm) +} + +func (cn *Conn) SetNetConn(netConn net.Conn) { + cn.netConn = netConn + cn.rd.Reset(netConn) + cn.wr.Reset(netConn) +} + +func (cn *Conn) setReadTimeout(timeout time.Duration) error { + now := time.Now() + cn.SetUsedAt(now) + if timeout > 0 { + return cn.netConn.SetReadDeadline(now.Add(timeout)) + } + return cn.netConn.SetReadDeadline(noDeadline) +} + +func (cn *Conn) setWriteTimeout(timeout time.Duration) error { + now := time.Now() + cn.SetUsedAt(now) + if timeout > 0 { + return cn.netConn.SetWriteDeadline(now.Add(timeout)) + } + return cn.netConn.SetWriteDeadline(noDeadline) +} + +func (cn *Conn) Write(b []byte) (int, error) { + return cn.netConn.Write(b) +} + +func (cn *Conn) RemoteAddr() net.Addr { + return cn.netConn.RemoteAddr() +} + +func (cn *Conn) WithReader(timeout time.Duration, fn func(rd *proto.Reader) error) error { + _ = cn.setReadTimeout(timeout) + return fn(cn.rd) +} + +func (cn *Conn) WithWriter(timeout time.Duration, fn func(wr *proto.Writer) error) error { + _ = cn.setWriteTimeout(timeout) + + firstErr := fn(cn.wr) + err := cn.wr.Flush() + if err != nil && firstErr == nil { + firstErr = err + } + return firstErr +} + +func (cn *Conn) Close() error { + return cn.netConn.Close() +} diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool.go b/vendor/github.com/go-redis/redis/internal/pool/pool.go new file mode 100644 index 0000000000..9cecee8ad4 --- /dev/null +++ b/vendor/github.com/go-redis/redis/internal/pool/pool.go @@ -0,0 +1,476 @@ +package pool + +import ( + "errors" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/go-redis/redis/internal" +) + +var ErrClosed = errors.New("redis: client is closed") +var ErrPoolTimeout = errors.New("redis: connection pool timeout") + +var timers = sync.Pool{ + New: func() interface{} { + t := time.NewTimer(time.Hour) + t.Stop() + return t + }, +} + +// Stats contains pool state information and accumulated stats. +type Stats struct { + Hits uint32 // number of times free connection was found in the pool + Misses uint32 // number of times free connection was NOT found in the pool + Timeouts uint32 // number of times a wait timeout occurred + + TotalConns uint32 // number of total connections in the pool + IdleConns uint32 // number of idle connections in the pool + StaleConns uint32 // number of stale connections removed from the pool +} + +type Pooler interface { + NewConn() (*Conn, error) + CloseConn(*Conn) error + + Get() (*Conn, error) + Put(*Conn) + Remove(*Conn) + + Len() int + IdleLen() int + Stats() *Stats + + Close() error +} + +type Options struct { + Dialer func() (net.Conn, error) + OnClose func(*Conn) error + + PoolSize int + MinIdleConns int + MaxConnAge time.Duration + PoolTimeout time.Duration + IdleTimeout time.Duration + IdleCheckFrequency time.Duration +} + +type ConnPool struct { + opt *Options + + dialErrorsNum uint32 // atomic + + lastDialErrorMu sync.RWMutex + lastDialError error + + queue chan struct{} + + connsMu sync.Mutex + conns []*Conn + idleConns []*Conn + poolSize int + idleConnsLen int + + stats Stats + + _closed uint32 // atomic +} + +var _ Pooler = (*ConnPool)(nil) + +func NewConnPool(opt *Options) *ConnPool { + p := &ConnPool{ + opt: opt, + + queue: make(chan struct{}, opt.PoolSize), + conns: make([]*Conn, 0, opt.PoolSize), + idleConns: make([]*Conn, 0, opt.PoolSize), + } + + for i := 0; i < opt.MinIdleConns; i++ { + p.checkMinIdleConns() + } + + if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 { + go p.reaper(opt.IdleCheckFrequency) + } + + return p +} + +func (p *ConnPool) checkMinIdleConns() { + if p.opt.MinIdleConns == 0 { + return + } + if p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns { + p.poolSize++ + p.idleConnsLen++ + go p.addIdleConn() + } +} + +func (p *ConnPool) addIdleConn() { + cn, err := p.newConn(true) + if err != nil { + return + } + + p.connsMu.Lock() + p.conns = append(p.conns, cn) + p.idleConns = append(p.idleConns, cn) + p.connsMu.Unlock() +} + +func (p *ConnPool) NewConn() (*Conn, error) { + return p._NewConn(false) +} + +func (p *ConnPool) _NewConn(pooled bool) (*Conn, error) { + cn, err := p.newConn(pooled) + if err != nil { + return nil, err + } + + p.connsMu.Lock() + p.conns = append(p.conns, cn) + if pooled { + if p.poolSize < p.opt.PoolSize { + p.poolSize++ + } else { + cn.pooled = false + } + } + p.connsMu.Unlock() + return cn, nil +} + +func (p *ConnPool) newConn(pooled bool) (*Conn, error) { + if p.closed() { + return nil, ErrClosed + } + + if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) { + return nil, p.getLastDialError() + } + + netConn, err := p.opt.Dialer() + if err != nil { + p.setLastDialError(err) + if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) { + go p.tryDial() + } + return nil, err + } + + cn := NewConn(netConn) + cn.pooled = pooled + return cn, nil +} + +func (p *ConnPool) tryDial() { + for { + if p.closed() { + return + } + + conn, err := p.opt.Dialer() + if err != nil { + p.setLastDialError(err) + time.Sleep(time.Second) + continue + } + + atomic.StoreUint32(&p.dialErrorsNum, 0) + _ = conn.Close() + return + } +} + +func (p *ConnPool) setLastDialError(err error) { + p.lastDialErrorMu.Lock() + p.lastDialError = err + p.lastDialErrorMu.Unlock() +} + +func (p *ConnPool) getLastDialError() error { + p.lastDialErrorMu.RLock() + err := p.lastDialError + p.lastDialErrorMu.RUnlock() + return err +} + +// Get returns existed connection from the pool or creates a new one. +func (p *ConnPool) Get() (*Conn, error) { + if p.closed() { + return nil, ErrClosed + } + + err := p.waitTurn() + if err != nil { + return nil, err + } + + for { + p.connsMu.Lock() + cn := p.popIdle() + p.connsMu.Unlock() + + if cn == nil { + break + } + + if p.isStaleConn(cn) { + _ = p.CloseConn(cn) + continue + } + + atomic.AddUint32(&p.stats.Hits, 1) + return cn, nil + } + + atomic.AddUint32(&p.stats.Misses, 1) + + newcn, err := p._NewConn(true) + if err != nil { + p.freeTurn() + return nil, err + } + + return newcn, nil +} + +func (p *ConnPool) getTurn() { + p.queue <- struct{}{} +} + +func (p *ConnPool) waitTurn() error { + select { + case p.queue <- struct{}{}: + return nil + default: + timer := timers.Get().(*time.Timer) + timer.Reset(p.opt.PoolTimeout) + + select { + case p.queue <- struct{}{}: + if !timer.Stop() { + <-timer.C + } + timers.Put(timer) + return nil + case <-timer.C: + timers.Put(timer) + atomic.AddUint32(&p.stats.Timeouts, 1) + return ErrPoolTimeout + } + } +} + +func (p *ConnPool) freeTurn() { + <-p.queue +} + +func (p *ConnPool) popIdle() *Conn { + if len(p.idleConns) == 0 { + return nil + } + + idx := len(p.idleConns) - 1 + cn := p.idleConns[idx] + p.idleConns = p.idleConns[:idx] + p.idleConnsLen-- + p.checkMinIdleConns() + return cn +} + +func (p *ConnPool) Put(cn *Conn) { + if !cn.pooled { + p.Remove(cn) + return + } + + p.connsMu.Lock() + p.idleConns = append(p.idleConns, cn) + p.idleConnsLen++ + p.connsMu.Unlock() + p.freeTurn() +} + +func (p *ConnPool) Remove(cn *Conn) { + p.removeConn(cn) + p.freeTurn() + _ = p.closeConn(cn) +} + +func (p *ConnPool) CloseConn(cn *Conn) error { + p.removeConn(cn) + return p.closeConn(cn) +} + +func (p *ConnPool) removeConn(cn *Conn) { + p.connsMu.Lock() + for i, c := range p.conns { + if c == cn { + p.conns = append(p.conns[:i], p.conns[i+1:]...) + if cn.pooled { + p.poolSize-- + p.checkMinIdleConns() + } + break + } + } + p.connsMu.Unlock() +} + +func (p *ConnPool) closeConn(cn *Conn) error { + if p.opt.OnClose != nil { + _ = p.opt.OnClose(cn) + } + return cn.Close() +} + +// Len returns total number of connections. +func (p *ConnPool) Len() int { + p.connsMu.Lock() + n := len(p.conns) + p.connsMu.Unlock() + return n +} + +// IdleLen returns number of idle connections. +func (p *ConnPool) IdleLen() int { + p.connsMu.Lock() + n := p.idleConnsLen + p.connsMu.Unlock() + return n +} + +func (p *ConnPool) Stats() *Stats { + idleLen := p.IdleLen() + return &Stats{ + Hits: atomic.LoadUint32(&p.stats.Hits), + Misses: atomic.LoadUint32(&p.stats.Misses), + Timeouts: atomic.LoadUint32(&p.stats.Timeouts), + + TotalConns: uint32(p.Len()), + IdleConns: uint32(idleLen), + StaleConns: atomic.LoadUint32(&p.stats.StaleConns), + } +} + +func (p *ConnPool) closed() bool { + return atomic.LoadUint32(&p._closed) == 1 +} + +func (p *ConnPool) Filter(fn func(*Conn) bool) error { + var firstErr error + p.connsMu.Lock() + for _, cn := range p.conns { + if fn(cn) { + if err := p.closeConn(cn); err != nil && firstErr == nil { + firstErr = err + } + } + } + p.connsMu.Unlock() + return firstErr +} + +func (p *ConnPool) Close() error { + if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) { + return ErrClosed + } + + var firstErr error + p.connsMu.Lock() + for _, cn := range p.conns { + if err := p.closeConn(cn); err != nil && firstErr == nil { + firstErr = err + } + } + p.conns = nil + p.poolSize = 0 + p.idleConns = nil + p.idleConnsLen = 0 + p.connsMu.Unlock() + + return firstErr +} + +func (p *ConnPool) reapStaleConn() *Conn { + if len(p.idleConns) == 0 { + return nil + } + + cn := p.idleConns[0] + if !p.isStaleConn(cn) { + return nil + } + + p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...) + p.idleConnsLen-- + + return cn +} + +func (p *ConnPool) ReapStaleConns() (int, error) { + var n int + for { + p.getTurn() + + p.connsMu.Lock() + cn := p.reapStaleConn() + p.connsMu.Unlock() + + if cn != nil { + p.removeConn(cn) + } + + p.freeTurn() + + if cn != nil { + p.closeConn(cn) + n++ + } else { + break + } + } + return n, nil +} + +func (p *ConnPool) reaper(frequency time.Duration) { + ticker := time.NewTicker(frequency) + defer ticker.Stop() + + for range ticker.C { + if p.closed() { + break + } + n, err := p.ReapStaleConns() + if err != nil { + internal.Logf("ReapStaleConns failed: %s", err) + continue + } + atomic.AddUint32(&p.stats.StaleConns, uint32(n)) + } +} + +func (p *ConnPool) isStaleConn(cn *Conn) bool { + if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 { + return false + } + + now := time.Now() + if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout { + return true + } + if p.opt.MaxConnAge > 0 && now.Sub(cn.InitedAt) >= p.opt.MaxConnAge { + return true + } + + return false +} diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool_single.go b/vendor/github.com/go-redis/redis/internal/pool/pool_single.go new file mode 100644 index 0000000000..b35b78afbd --- /dev/null +++ b/vendor/github.com/go-redis/redis/internal/pool/pool_single.go @@ -0,0 +1,53 @@ +package pool + +type SingleConnPool struct { + cn *Conn +} + +var _ Pooler = (*SingleConnPool)(nil) + +func NewSingleConnPool(cn *Conn) *SingleConnPool { + return &SingleConnPool{ + cn: cn, + } +} + +func (p *SingleConnPool) NewConn() (*Conn, error) { + panic("not implemented") +} + +func (p *SingleConnPool) CloseConn(*Conn) error { + panic("not implemented") +} + +func (p *SingleConnPool) Get() (*Conn, error) { + return p.cn, nil +} + +func (p *SingleConnPool) Put(cn *Conn) { + if p.cn != cn { + panic("p.cn != cn") + } +} + +func (p *SingleConnPool) Remove(cn *Conn) { + if p.cn != cn { + panic("p.cn != cn") + } +} + +func (p *SingleConnPool) Len() int { + return 1 +} + +func (p *SingleConnPool) IdleLen() int { + return 0 +} + +func (p *SingleConnPool) Stats() *Stats { + return nil +} + +func (p *SingleConnPool) Close() error { + return nil +} diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go b/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go new file mode 100644 index 0000000000..91bd913330 --- /dev/null +++ b/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go @@ -0,0 +1,109 @@ +package pool + +import "sync" + +type StickyConnPool struct { + pool *ConnPool + reusable bool + + cn *Conn + closed bool + mu sync.Mutex +} + +var _ Pooler = (*StickyConnPool)(nil) + +func NewStickyConnPool(pool *ConnPool, reusable bool) *StickyConnPool { + return &StickyConnPool{ + pool: pool, + reusable: reusable, + } +} + +func (p *StickyConnPool) NewConn() (*Conn, error) { + panic("not implemented") +} + +func (p *StickyConnPool) CloseConn(*Conn) error { + panic("not implemented") +} + +func (p *StickyConnPool) Get() (*Conn, error) { + p.mu.Lock() + defer p.mu.Unlock() + + if p.closed { + return nil, ErrClosed + } + if p.cn != nil { + return p.cn, nil + } + + cn, err := p.pool.Get() + if err != nil { + return nil, err + } + + p.cn = cn + return cn, nil +} + +func (p *StickyConnPool) putUpstream() { + p.pool.Put(p.cn) + p.cn = nil +} + +func (p *StickyConnPool) Put(cn *Conn) {} + +func (p *StickyConnPool) removeUpstream() { + p.pool.Remove(p.cn) + p.cn = nil +} + +func (p *StickyConnPool) Remove(cn *Conn) { + p.removeUpstream() +} + +func (p *StickyConnPool) Len() int { + p.mu.Lock() + defer p.mu.Unlock() + + if p.cn == nil { + return 0 + } + return 1 +} + +func (p *StickyConnPool) IdleLen() int { + p.mu.Lock() + defer p.mu.Unlock() + + if p.cn == nil { + return 1 + } + return 0 +} + +func (p *StickyConnPool) Stats() *Stats { + return nil +} + +func (p *StickyConnPool) Close() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.closed { + return ErrClosed + } + p.closed = true + + if p.cn != nil { + if p.reusable { + p.putUpstream() + } else { + p.removeUpstream() + } + } + + return nil +} diff --git a/vendor/github.com/go-redis/redis/internal/proto/reader.go b/vendor/github.com/go-redis/redis/internal/proto/reader.go new file mode 100644 index 0000000000..896b6f654c --- /dev/null +++ b/vendor/github.com/go-redis/redis/internal/proto/reader.go @@ -0,0 +1,290 @@ +package proto + +import ( + "bufio" + "fmt" + "io" + "strconv" + + "github.com/go-redis/redis/internal/util" +) + +const ( + ErrorReply = '-' + StatusReply = '+' + IntReply = ':' + StringReply = '$' + ArrayReply = '*' +) + +//------------------------------------------------------------------------------ + +const Nil = RedisError("redis: nil") + +type RedisError string + +func (e RedisError) Error() string { return string(e) } + +//------------------------------------------------------------------------------ + +type MultiBulkParse func(*Reader, int64) (interface{}, error) + +type Reader struct { + rd *bufio.Reader + _buf []byte +} + +func NewReader(rd io.Reader) *Reader { + return &Reader{ + rd: bufio.NewReader(rd), + _buf: make([]byte, 64), + } +} + +func (r *Reader) Reset(rd io.Reader) { + r.rd.Reset(rd) +} + +func (r *Reader) ReadLine() ([]byte, error) { + line, isPrefix, err := r.rd.ReadLine() + if err != nil { + return nil, err + } + if isPrefix { + return nil, bufio.ErrBufferFull + } + if len(line) == 0 { + return nil, fmt.Errorf("redis: reply is empty") + } + if isNilReply(line) { + return nil, Nil + } + return line, nil +} + +func (r *Reader) ReadReply(m MultiBulkParse) (interface{}, error) { + line, err := r.ReadLine() + if err != nil { + return nil, err + } + + switch line[0] { + case ErrorReply: + return nil, ParseErrorReply(line) + case StatusReply: + return string(line[1:]), nil + case IntReply: + return util.ParseInt(line[1:], 10, 64) + case StringReply: + return r.readStringReply(line) + case ArrayReply: + n, err := parseArrayLen(line) + if err != nil { + return nil, err + } + return m(r, n) + } + return nil, fmt.Errorf("redis: can't parse %.100q", line) +} + +func (r *Reader) ReadIntReply() (int64, error) { + line, err := r.ReadLine() + if err != nil { + return 0, err + } + switch line[0] { + case ErrorReply: + return 0, ParseErrorReply(line) + case IntReply: + return util.ParseInt(line[1:], 10, 64) + default: + return 0, fmt.Errorf("redis: can't parse int reply: %.100q", line) + } +} + +func (r *Reader) ReadString() (string, error) { + line, err := r.ReadLine() + if err != nil { + return "", err + } + switch line[0] { + case ErrorReply: + return "", ParseErrorReply(line) + case StringReply: + return r.readStringReply(line) + case StatusReply: + return string(line[1:]), nil + case IntReply: + return string(line[1:]), nil + default: + return "", fmt.Errorf("redis: can't parse reply=%.100q reading string", line) + } +} + +func (r *Reader) readStringReply(line []byte) (string, error) { + if isNilReply(line) { + return "", Nil + } + + replyLen, err := strconv.Atoi(string(line[1:])) + if err != nil { + return "", err + } + + b := make([]byte, replyLen+2) + _, err = io.ReadFull(r.rd, b) + if err != nil { + return "", err + } + + return util.BytesToString(b[:replyLen]), nil +} + +func (r *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) { + line, err := r.ReadLine() + if err != nil { + return nil, err + } + switch line[0] { + case ErrorReply: + return nil, ParseErrorReply(line) + case ArrayReply: + n, err := parseArrayLen(line) + if err != nil { + return nil, err + } + return m(r, n) + default: + return nil, fmt.Errorf("redis: can't parse array reply: %.100q", line) + } +} + +func (r *Reader) ReadArrayLen() (int64, error) { + line, err := r.ReadLine() + if err != nil { + return 0, err + } + switch line[0] { + case ErrorReply: + return 0, ParseErrorReply(line) + case ArrayReply: + return parseArrayLen(line) + default: + return 0, fmt.Errorf("redis: can't parse array reply: %.100q", line) + } +} + +func (r *Reader) ReadScanReply() ([]string, uint64, error) { + n, err := r.ReadArrayLen() + if err != nil { + return nil, 0, err + } + if n != 2 { + return nil, 0, fmt.Errorf("redis: got %d elements in scan reply, expected 2", n) + } + + cursor, err := r.ReadUint() + if err != nil { + return nil, 0, err + } + + n, err = r.ReadArrayLen() + if err != nil { + return nil, 0, err + } + + keys := make([]string, n) + for i := int64(0); i < n; i++ { + key, err := r.ReadString() + if err != nil { + return nil, 0, err + } + keys[i] = key + } + + return keys, cursor, err +} + +func (r *Reader) ReadInt() (int64, error) { + b, err := r.readTmpBytesReply() + if err != nil { + return 0, err + } + return util.ParseInt(b, 10, 64) +} + +func (r *Reader) ReadUint() (uint64, error) { + b, err := r.readTmpBytesReply() + if err != nil { + return 0, err + } + return util.ParseUint(b, 10, 64) +} + +func (r *Reader) ReadFloatReply() (float64, error) { + b, err := r.readTmpBytesReply() + if err != nil { + return 0, err + } + return util.ParseFloat(b, 64) +} + +func (r *Reader) readTmpBytesReply() ([]byte, error) { + line, err := r.ReadLine() + if err != nil { + return nil, err + } + switch line[0] { + case ErrorReply: + return nil, ParseErrorReply(line) + case StringReply: + return r._readTmpBytesReply(line) + case StatusReply: + return line[1:], nil + default: + return nil, fmt.Errorf("redis: can't parse string reply: %.100q", line) + } +} + +func (r *Reader) _readTmpBytesReply(line []byte) ([]byte, error) { + if isNilReply(line) { + return nil, Nil + } + + replyLen, err := strconv.Atoi(string(line[1:])) + if err != nil { + return nil, err + } + + buf := r.buf(replyLen + 2) + _, err = io.ReadFull(r.rd, buf) + if err != nil { + return nil, err + } + + return buf[:replyLen], nil +} + +func (r *Reader) buf(n int) []byte { + if d := n - cap(r._buf); d > 0 { + r._buf = append(r._buf, make([]byte, d)...) + } + return r._buf[:n] +} + +func isNilReply(b []byte) bool { + return len(b) == 3 && + (b[0] == StringReply || b[0] == ArrayReply) && + b[1] == '-' && b[2] == '1' +} + +func ParseErrorReply(line []byte) error { + return RedisError(string(line[1:])) +} + +func parseArrayLen(line []byte) (int64, error) { + if isNilReply(line) { + return 0, Nil + } + return util.ParseInt(line[1:], 10, 64) +} diff --git a/vendor/github.com/go-redis/redis/internal/proto/scan.go b/vendor/github.com/go-redis/redis/internal/proto/scan.go new file mode 100644 index 0000000000..3bdb33f9db --- /dev/null +++ b/vendor/github.com/go-redis/redis/internal/proto/scan.go @@ -0,0 +1,166 @@ +package proto + +import ( + "encoding" + "fmt" + "reflect" + + "github.com/go-redis/redis/internal/util" +) + +func Scan(b []byte, v interface{}) error { + switch v := v.(type) { + case nil: + return fmt.Errorf("redis: Scan(nil)") + case *string: + *v = util.BytesToString(b) + return nil + case *[]byte: + *v = b + return nil + case *int: + var err error + *v, err = util.Atoi(b) + return err + case *int8: + n, err := util.ParseInt(b, 10, 8) + if err != nil { + return err + } + *v = int8(n) + return nil + case *int16: + n, err := util.ParseInt(b, 10, 16) + if err != nil { + return err + } + *v = int16(n) + return nil + case *int32: + n, err := util.ParseInt(b, 10, 32) + if err != nil { + return err + } + *v = int32(n) + return nil + case *int64: + n, err := util.ParseInt(b, 10, 64) + if err != nil { + return err + } + *v = n + return nil + case *uint: + n, err := util.ParseUint(b, 10, 64) + if err != nil { + return err + } + *v = uint(n) + return nil + case *uint8: + n, err := util.ParseUint(b, 10, 8) + if err != nil { + return err + } + *v = uint8(n) + return nil + case *uint16: + n, err := util.ParseUint(b, 10, 16) + if err != nil { + return err + } + *v = uint16(n) + return nil + case *uint32: + n, err := util.ParseUint(b, 10, 32) + if err != nil { + return err + } + *v = uint32(n) + return nil + case *uint64: + n, err := util.ParseUint(b, 10, 64) + if err != nil { + return err + } + *v = n + return nil + case *float32: + n, err := util.ParseFloat(b, 32) + if err != nil { + return err + } + *v = float32(n) + return err + case *float64: + var err error + *v, err = util.ParseFloat(b, 64) + return err + case *bool: + *v = len(b) == 1 && b[0] == '1' + return nil + case encoding.BinaryUnmarshaler: + return v.UnmarshalBinary(b) + default: + return fmt.Errorf( + "redis: can't unmarshal %T (consider implementing BinaryUnmarshaler)", v) + } +} + +func ScanSlice(data []string, slice interface{}) error { + v := reflect.ValueOf(slice) + if !v.IsValid() { + return fmt.Errorf("redis: ScanSlice(nil)") + } + if v.Kind() != reflect.Ptr { + return fmt.Errorf("redis: ScanSlice(non-pointer %T)", slice) + } + v = v.Elem() + if v.Kind() != reflect.Slice { + return fmt.Errorf("redis: ScanSlice(non-slice %T)", slice) + } + + next := makeSliceNextElemFunc(v) + for i, s := range data { + elem := next() + if err := Scan([]byte(s), elem.Addr().Interface()); err != nil { + err = fmt.Errorf("redis: ScanSlice index=%d value=%q failed: %s", i, s, err) + return err + } + } + + return nil +} + +func makeSliceNextElemFunc(v reflect.Value) func() reflect.Value { + elemType := v.Type().Elem() + + if elemType.Kind() == reflect.Ptr { + elemType = elemType.Elem() + return func() reflect.Value { + if v.Len() < v.Cap() { + v.Set(v.Slice(0, v.Len()+1)) + elem := v.Index(v.Len() - 1) + if elem.IsNil() { + elem.Set(reflect.New(elemType)) + } + return elem.Elem() + } + + elem := reflect.New(elemType) + v.Set(reflect.Append(v, elem)) + return elem.Elem() + } + } + + zero := reflect.Zero(elemType) + return func() reflect.Value { + if v.Len() < v.Cap() { + v.Set(v.Slice(0, v.Len()+1)) + return v.Index(v.Len() - 1) + } + + v.Set(reflect.Append(v, zero)) + return v.Index(v.Len() - 1) + } +} diff --git a/vendor/github.com/go-redis/redis/internal/proto/writer.go b/vendor/github.com/go-redis/redis/internal/proto/writer.go new file mode 100644 index 0000000000..d106ce0ee6 --- /dev/null +++ b/vendor/github.com/go-redis/redis/internal/proto/writer.go @@ -0,0 +1,159 @@ +package proto + +import ( + "bufio" + "encoding" + "fmt" + "io" + "strconv" + + "github.com/go-redis/redis/internal/util" +) + +type Writer struct { + wr *bufio.Writer + + lenBuf []byte + numBuf []byte +} + +func NewWriter(wr io.Writer) *Writer { + return &Writer{ + wr: bufio.NewWriter(wr), + + lenBuf: make([]byte, 64), + numBuf: make([]byte, 64), + } +} + +func (w *Writer) WriteArgs(args []interface{}) error { + err := w.wr.WriteByte(ArrayReply) + if err != nil { + return err + } + + err = w.writeLen(len(args)) + if err != nil { + return err + } + + for _, arg := range args { + err := w.writeArg(arg) + if err != nil { + return err + } + } + + return nil +} + +func (w *Writer) writeLen(n int) error { + w.lenBuf = strconv.AppendUint(w.lenBuf[:0], uint64(n), 10) + w.lenBuf = append(w.lenBuf, '\r', '\n') + _, err := w.wr.Write(w.lenBuf) + return err +} + +func (w *Writer) writeArg(v interface{}) error { + switch v := v.(type) { + case nil: + return w.string("") + case string: + return w.string(v) + case []byte: + return w.bytes(v) + case int: + return w.int(int64(v)) + case int8: + return w.int(int64(v)) + case int16: + return w.int(int64(v)) + case int32: + return w.int(int64(v)) + case int64: + return w.int(v) + case uint: + return w.uint(uint64(v)) + case uint8: + return w.uint(uint64(v)) + case uint16: + return w.uint(uint64(v)) + case uint32: + return w.uint(uint64(v)) + case uint64: + return w.uint(v) + case float32: + return w.float(float64(v)) + case float64: + return w.float(v) + case bool: + if v { + return w.int(1) + } else { + return w.int(0) + } + case encoding.BinaryMarshaler: + b, err := v.MarshalBinary() + if err != nil { + return err + } + return w.bytes(b) + default: + return fmt.Errorf( + "redis: can't marshal %T (implement encoding.BinaryMarshaler)", v) + } +} + +func (w *Writer) bytes(b []byte) error { + err := w.wr.WriteByte(StringReply) + if err != nil { + return err + } + + err = w.writeLen(len(b)) + if err != nil { + return err + } + + _, err = w.wr.Write(b) + if err != nil { + return err + } + + return w.crlf() +} + +func (w *Writer) string(s string) error { + return w.bytes(util.StringToBytes(s)) +} + +func (w *Writer) uint(n uint64) error { + w.numBuf = strconv.AppendUint(w.numBuf[:0], n, 10) + return w.bytes(w.numBuf) +} + +func (w *Writer) int(n int64) error { + w.numBuf = strconv.AppendInt(w.numBuf[:0], n, 10) + return w.bytes(w.numBuf) +} + +func (w *Writer) float(f float64) error { + w.numBuf = strconv.AppendFloat(w.numBuf[:0], f, 'f', -1, 64) + return w.bytes(w.numBuf) +} + +func (w *Writer) crlf() error { + err := w.wr.WriteByte('\r') + if err != nil { + return err + } + return w.wr.WriteByte('\n') +} + +func (w *Writer) Reset(wr io.Writer) { + w.wr.Reset(wr) +} + +func (w *Writer) Flush() error { + return w.wr.Flush() +} diff --git a/vendor/github.com/go-redis/redis/internal/util.go b/vendor/github.com/go-redis/redis/internal/util.go new file mode 100644 index 0000000000..ffd2353e0e --- /dev/null +++ b/vendor/github.com/go-redis/redis/internal/util.go @@ -0,0 +1,29 @@ +package internal + +import "github.com/go-redis/redis/internal/util" + +func ToLower(s string) string { + if isLower(s) { + return s + } + + b := make([]byte, len(s)) + for i := range b { + c := s[i] + if c >= 'A' && c <= 'Z' { + c += 'a' - 'A' + } + b[i] = c + } + return util.BytesToString(b) +} + +func isLower(s string) bool { + for i := 0; i < len(s); i++ { + c := s[i] + if c >= 'A' && c <= 'Z' { + return false + } + } + return true +} diff --git a/vendor/github.com/go-redis/redis/internal/util/safe.go b/vendor/github.com/go-redis/redis/internal/util/safe.go new file mode 100644 index 0000000000..1b3060ebc2 --- /dev/null +++ b/vendor/github.com/go-redis/redis/internal/util/safe.go @@ -0,0 +1,11 @@ +// +build appengine + +package util + +func BytesToString(b []byte) string { + return string(b) +} + +func StringToBytes(s string) []byte { + return []byte(s) +} diff --git a/vendor/github.com/go-redis/redis/internal/util/strconv.go b/vendor/github.com/go-redis/redis/internal/util/strconv.go new file mode 100644 index 0000000000..db5033802a --- /dev/null +++ b/vendor/github.com/go-redis/redis/internal/util/strconv.go @@ -0,0 +1,19 @@ +package util + +import "strconv" + +func Atoi(b []byte) (int, error) { + return strconv.Atoi(BytesToString(b)) +} + +func ParseInt(b []byte, base int, bitSize int) (int64, error) { + return strconv.ParseInt(BytesToString(b), base, bitSize) +} + +func ParseUint(b []byte, base int, bitSize int) (uint64, error) { + return strconv.ParseUint(BytesToString(b), base, bitSize) +} + +func ParseFloat(b []byte, bitSize int) (float64, error) { + return strconv.ParseFloat(BytesToString(b), bitSize) +} diff --git a/vendor/github.com/go-redis/redis/internal/util/unsafe.go b/vendor/github.com/go-redis/redis/internal/util/unsafe.go new file mode 100644 index 0000000000..c9868aac2b --- /dev/null +++ b/vendor/github.com/go-redis/redis/internal/util/unsafe.go @@ -0,0 +1,22 @@ +// +build !appengine + +package util + +import ( + "unsafe" +) + +// BytesToString converts byte slice to string. +func BytesToString(b []byte) string { + return *(*string)(unsafe.Pointer(&b)) +} + +// StringToBytes converts string to byte slice. +func StringToBytes(s string) []byte { + return *(*[]byte)(unsafe.Pointer( + &struct { + string + Cap int + }{s, len(s)}, + )) +} diff --git a/vendor/github.com/go-redis/redis/iterator.go b/vendor/github.com/go-redis/redis/iterator.go new file mode 100644 index 0000000000..5d4bedfe5d --- /dev/null +++ b/vendor/github.com/go-redis/redis/iterator.go @@ -0,0 +1,73 @@ +package redis + +import "sync" + +// ScanIterator is used to incrementally iterate over a collection of elements. +// It's safe for concurrent use by multiple goroutines. +type ScanIterator struct { + mu sync.Mutex // protects Scanner and pos + cmd *ScanCmd + pos int +} + +// Err returns the last iterator error, if any. +func (it *ScanIterator) Err() error { + it.mu.Lock() + err := it.cmd.Err() + it.mu.Unlock() + return err +} + +// Next advances the cursor and returns true if more values can be read. +func (it *ScanIterator) Next() bool { + it.mu.Lock() + defer it.mu.Unlock() + + // Instantly return on errors. + if it.cmd.Err() != nil { + return false + } + + // Advance cursor, check if we are still within range. + if it.pos < len(it.cmd.page) { + it.pos++ + return true + } + + for { + // Return if there is no more data to fetch. + if it.cmd.cursor == 0 { + return false + } + + // Fetch next page. + if it.cmd._args[0] == "scan" { + it.cmd._args[1] = it.cmd.cursor + } else { + it.cmd._args[2] = it.cmd.cursor + } + + err := it.cmd.process(it.cmd) + if err != nil { + return false + } + + it.pos = 1 + + // Redis can occasionally return empty page. + if len(it.cmd.page) > 0 { + return true + } + } +} + +// Val returns the key/field at the current cursor position. +func (it *ScanIterator) Val() string { + var v string + it.mu.Lock() + if it.cmd.Err() == nil && it.pos > 0 && it.pos <= len(it.cmd.page) { + v = it.cmd.page[it.pos-1] + } + it.mu.Unlock() + return v +} diff --git a/vendor/github.com/go-redis/redis/options.go b/vendor/github.com/go-redis/redis/options.go new file mode 100644 index 0000000000..b6fabf3f24 --- /dev/null +++ b/vendor/github.com/go-redis/redis/options.go @@ -0,0 +1,226 @@ +package redis + +import ( + "crypto/tls" + "errors" + "fmt" + "net" + "net/url" + "runtime" + "strconv" + "strings" + "time" + + "github.com/go-redis/redis/internal/pool" +) + +// Limiter is the interface of a rate limiter or a circuit breaker. +type Limiter interface { + // Allow returns a nil if operation is allowed or an error otherwise. + // If operation is allowed client must report the result of operation + // whether is a success or a failure. + Allow() error + // ReportResult reports the result of previously allowed operation. + // nil indicates a success, non-nil error indicates a failure. + ReportResult(result error) +} + +type Options struct { + // The network type, either tcp or unix. + // Default is tcp. + Network string + // host:port address. + Addr string + + // Dialer creates new network connection and has priority over + // Network and Addr options. + Dialer func() (net.Conn, error) + + // Hook that is called when new connection is established. + OnConnect func(*Conn) error + + // Optional password. Must match the password specified in the + // requirepass server configuration option. + Password string + // Database to be selected after connecting to the server. + DB int + + // Maximum number of retries before giving up. + // Default is to not retry failed commands. + MaxRetries int + // Minimum backoff between each retry. + // Default is 8 milliseconds; -1 disables backoff. + MinRetryBackoff time.Duration + // Maximum backoff between each retry. + // Default is 512 milliseconds; -1 disables backoff. + MaxRetryBackoff time.Duration + + // Dial timeout for establishing new connections. + // Default is 5 seconds. + DialTimeout time.Duration + // Timeout for socket reads. If reached, commands will fail + // with a timeout instead of blocking. Use value -1 for no timeout and 0 for default. + // Default is 3 seconds. + ReadTimeout time.Duration + // Timeout for socket writes. If reached, commands will fail + // with a timeout instead of blocking. + // Default is ReadTimeout. + WriteTimeout time.Duration + + // Maximum number of socket connections. + // Default is 10 connections per every CPU as reported by runtime.NumCPU. + PoolSize int + // Minimum number of idle connections which is useful when establishing + // new connection is slow. + MinIdleConns int + // Connection age at which client retires (closes) the connection. + // Default is to not close aged connections. + MaxConnAge time.Duration + // Amount of time client waits for connection if all connections + // are busy before returning an error. + // Default is ReadTimeout + 1 second. + PoolTimeout time.Duration + // Amount of time after which client closes idle connections. + // Should be less than server's timeout. + // Default is 5 minutes. -1 disables idle timeout check. + IdleTimeout time.Duration + // Frequency of idle checks made by idle connections reaper. + // Default is 1 minute. -1 disables idle connections reaper, + // but idle connections are still discarded by the client + // if IdleTimeout is set. + IdleCheckFrequency time.Duration + + // Enables read only queries on slave nodes. + readOnly bool + + // TLS Config to use. When set TLS will be negotiated. + TLSConfig *tls.Config +} + +func (opt *Options) init() { + if opt.Network == "" { + opt.Network = "tcp" + } + if opt.Addr == "" { + opt.Addr = "localhost:6379" + } + if opt.Dialer == nil { + opt.Dialer = func() (net.Conn, error) { + netDialer := &net.Dialer{ + Timeout: opt.DialTimeout, + KeepAlive: 5 * time.Minute, + } + if opt.TLSConfig == nil { + return netDialer.Dial(opt.Network, opt.Addr) + } else { + return tls.DialWithDialer(netDialer, opt.Network, opt.Addr, opt.TLSConfig) + } + } + } + if opt.PoolSize == 0 { + opt.PoolSize = 10 * runtime.NumCPU() + } + if opt.DialTimeout == 0 { + opt.DialTimeout = 5 * time.Second + } + switch opt.ReadTimeout { + case -1: + opt.ReadTimeout = 0 + case 0: + opt.ReadTimeout = 3 * time.Second + } + switch opt.WriteTimeout { + case -1: + opt.WriteTimeout = 0 + case 0: + opt.WriteTimeout = opt.ReadTimeout + } + if opt.PoolTimeout == 0 { + opt.PoolTimeout = opt.ReadTimeout + time.Second + } + if opt.IdleTimeout == 0 { + opt.IdleTimeout = 5 * time.Minute + } + if opt.IdleCheckFrequency == 0 { + opt.IdleCheckFrequency = time.Minute + } + + switch opt.MinRetryBackoff { + case -1: + opt.MinRetryBackoff = 0 + case 0: + opt.MinRetryBackoff = 8 * time.Millisecond + } + switch opt.MaxRetryBackoff { + case -1: + opt.MaxRetryBackoff = 0 + case 0: + opt.MaxRetryBackoff = 512 * time.Millisecond + } +} + +// ParseURL parses an URL into Options that can be used to connect to Redis. +func ParseURL(redisURL string) (*Options, error) { + o := &Options{Network: "tcp"} + u, err := url.Parse(redisURL) + if err != nil { + return nil, err + } + + if u.Scheme != "redis" && u.Scheme != "rediss" { + return nil, errors.New("invalid redis URL scheme: " + u.Scheme) + } + + if u.User != nil { + if p, ok := u.User.Password(); ok { + o.Password = p + } + } + + if len(u.Query()) > 0 { + return nil, errors.New("no options supported") + } + + h, p, err := net.SplitHostPort(u.Host) + if err != nil { + h = u.Host + } + if h == "" { + h = "localhost" + } + if p == "" { + p = "6379" + } + o.Addr = net.JoinHostPort(h, p) + + f := strings.FieldsFunc(u.Path, func(r rune) bool { + return r == '/' + }) + switch len(f) { + case 0: + o.DB = 0 + case 1: + if o.DB, err = strconv.Atoi(f[0]); err != nil { + return nil, fmt.Errorf("invalid redis database number: %q", f[0]) + } + default: + return nil, errors.New("invalid redis URL path: " + u.Path) + } + + if u.Scheme == "rediss" { + o.TLSConfig = &tls.Config{ServerName: h} + } + return o, nil +} + +func newConnPool(opt *Options) *pool.ConnPool { + return pool.NewConnPool(&pool.Options{ + Dialer: opt.Dialer, + PoolSize: opt.PoolSize, + MinIdleConns: opt.MinIdleConns, + MaxConnAge: opt.MaxConnAge, + PoolTimeout: opt.PoolTimeout, + IdleTimeout: opt.IdleTimeout, + IdleCheckFrequency: opt.IdleCheckFrequency, + }) +} diff --git a/vendor/github.com/go-redis/redis/pipeline.go b/vendor/github.com/go-redis/redis/pipeline.go new file mode 100644 index 0000000000..b3a8844af0 --- /dev/null +++ b/vendor/github.com/go-redis/redis/pipeline.go @@ -0,0 +1,120 @@ +package redis + +import ( + "sync" + + "github.com/go-redis/redis/internal/pool" +) + +type pipelineExecer func([]Cmder) error + +type Pipeliner interface { + StatefulCmdable + Do(args ...interface{}) *Cmd + Process(cmd Cmder) error + Close() error + Discard() error + Exec() ([]Cmder, error) +} + +var _ Pipeliner = (*Pipeline)(nil) + +// Pipeline implements pipelining as described in +// http://redis.io/topics/pipelining. It's safe for concurrent use +// by multiple goroutines. +type Pipeline struct { + statefulCmdable + + exec pipelineExecer + + mu sync.Mutex + cmds []Cmder + closed bool +} + +func (c *Pipeline) Do(args ...interface{}) *Cmd { + cmd := NewCmd(args...) + _ = c.Process(cmd) + return cmd +} + +// Process queues the cmd for later execution. +func (c *Pipeline) Process(cmd Cmder) error { + c.mu.Lock() + c.cmds = append(c.cmds, cmd) + c.mu.Unlock() + return nil +} + +// Close closes the pipeline, releasing any open resources. +func (c *Pipeline) Close() error { + c.mu.Lock() + c.discard() + c.closed = true + c.mu.Unlock() + return nil +} + +// Discard resets the pipeline and discards queued commands. +func (c *Pipeline) Discard() error { + c.mu.Lock() + err := c.discard() + c.mu.Unlock() + return err +} + +func (c *Pipeline) discard() error { + if c.closed { + return pool.ErrClosed + } + c.cmds = c.cmds[:0] + return nil +} + +// Exec executes all previously queued commands using one +// client-server roundtrip. +// +// Exec always returns list of commands and error of the first failed +// command if any. +func (c *Pipeline) Exec() ([]Cmder, error) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.closed { + return nil, pool.ErrClosed + } + + if len(c.cmds) == 0 { + return nil, nil + } + + cmds := c.cmds + c.cmds = nil + + return cmds, c.exec(cmds) +} + +func (c *Pipeline) pipelined(fn func(Pipeliner) error) ([]Cmder, error) { + if err := fn(c); err != nil { + return nil, err + } + cmds, err := c.Exec() + _ = c.Close() + return cmds, err +} + +func (c *Pipeline) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { + return c.pipelined(fn) +} + +func (c *Pipeline) Pipeline() Pipeliner { + return c +} + +func (c *Pipeline) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { + return c.pipelined(fn) +} + +func (c *Pipeline) TxPipeline() Pipeliner { + return c +} diff --git a/vendor/github.com/go-redis/redis/pubsub.go b/vendor/github.com/go-redis/redis/pubsub.go new file mode 100644 index 0000000000..0afb47cda3 --- /dev/null +++ b/vendor/github.com/go-redis/redis/pubsub.go @@ -0,0 +1,473 @@ +package redis + +import ( + "errors" + "fmt" + "sync" + "time" + + "github.com/go-redis/redis/internal" + "github.com/go-redis/redis/internal/pool" + "github.com/go-redis/redis/internal/proto" +) + +var errPingTimeout = errors.New("redis: ping timeout") + +// PubSub implements Pub/Sub commands bas described in +// http://redis.io/topics/pubsub. Message receiving is NOT safe +// for concurrent use by multiple goroutines. +// +// PubSub automatically reconnects to Redis Server and resubscribes +// to the channels in case of network errors. +type PubSub struct { + opt *Options + + newConn func([]string) (*pool.Conn, error) + closeConn func(*pool.Conn) error + + mu sync.Mutex + cn *pool.Conn + channels map[string]struct{} + patterns map[string]struct{} + closed bool + exit chan struct{} + + cmd *Cmd + + chOnce sync.Once + ch chan *Message + ping chan struct{} +} + +func (c *PubSub) init() { + c.exit = make(chan struct{}) +} + +func (c *PubSub) conn() (*pool.Conn, error) { + c.mu.Lock() + cn, err := c._conn(nil) + c.mu.Unlock() + return cn, err +} + +func (c *PubSub) _conn(newChannels []string) (*pool.Conn, error) { + if c.closed { + return nil, pool.ErrClosed + } + if c.cn != nil { + return c.cn, nil + } + + channels := mapKeys(c.channels) + channels = append(channels, newChannels...) + + cn, err := c.newConn(channels) + if err != nil { + return nil, err + } + + if err := c.resubscribe(cn); err != nil { + _ = c.closeConn(cn) + return nil, err + } + + c.cn = cn + return cn, nil +} + +func (c *PubSub) writeCmd(cn *pool.Conn, cmd Cmder) error { + return cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return writeCmd(wr, cmd) + }) +} + +func (c *PubSub) resubscribe(cn *pool.Conn) error { + var firstErr error + + if len(c.channels) > 0 { + err := c._subscribe(cn, "subscribe", mapKeys(c.channels)) + if err != nil && firstErr == nil { + firstErr = err + } + } + + if len(c.patterns) > 0 { + err := c._subscribe(cn, "psubscribe", mapKeys(c.patterns)) + if err != nil && firstErr == nil { + firstErr = err + } + } + + return firstErr +} + +func mapKeys(m map[string]struct{}) []string { + s := make([]string, len(m)) + i := 0 + for k := range m { + s[i] = k + i++ + } + return s +} + +func (c *PubSub) _subscribe( + cn *pool.Conn, redisCmd string, channels []string, +) error { + args := make([]interface{}, 0, 1+len(channels)) + args = append(args, redisCmd) + for _, channel := range channels { + args = append(args, channel) + } + cmd := NewSliceCmd(args...) + return c.writeCmd(cn, cmd) +} + +func (c *PubSub) releaseConn(cn *pool.Conn, err error, allowTimeout bool) { + c.mu.Lock() + c._releaseConn(cn, err, allowTimeout) + c.mu.Unlock() +} + +func (c *PubSub) _releaseConn(cn *pool.Conn, err error, allowTimeout bool) { + if c.cn != cn { + return + } + if internal.IsBadConn(err, allowTimeout) { + c._reconnect(err) + } +} + +func (c *PubSub) _reconnect(reason error) { + _ = c._closeTheCn(reason) + _, _ = c._conn(nil) +} + +func (c *PubSub) _closeTheCn(reason error) error { + if c.cn == nil { + return nil + } + if !c.closed { + internal.Logf("redis: discarding bad PubSub connection: %s", reason) + } + err := c.closeConn(c.cn) + c.cn = nil + return err +} + +func (c *PubSub) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.closed { + return pool.ErrClosed + } + c.closed = true + close(c.exit) + + err := c._closeTheCn(pool.ErrClosed) + return err +} + +// Subscribe the client to the specified channels. It returns +// empty subscription if there are no channels. +func (c *PubSub) Subscribe(channels ...string) error { + c.mu.Lock() + defer c.mu.Unlock() + + err := c.subscribe("subscribe", channels...) + if c.channels == nil { + c.channels = make(map[string]struct{}) + } + for _, s := range channels { + c.channels[s] = struct{}{} + } + return err +} + +// PSubscribe the client to the given patterns. It returns +// empty subscription if there are no patterns. +func (c *PubSub) PSubscribe(patterns ...string) error { + c.mu.Lock() + defer c.mu.Unlock() + + err := c.subscribe("psubscribe", patterns...) + if c.patterns == nil { + c.patterns = make(map[string]struct{}) + } + for _, s := range patterns { + c.patterns[s] = struct{}{} + } + return err +} + +// Unsubscribe the client from the given channels, or from all of +// them if none is given. +func (c *PubSub) Unsubscribe(channels ...string) error { + c.mu.Lock() + defer c.mu.Unlock() + + for _, channel := range channels { + delete(c.channels, channel) + } + err := c.subscribe("unsubscribe", channels...) + return err +} + +// PUnsubscribe the client from the given patterns, or from all of +// them if none is given. +func (c *PubSub) PUnsubscribe(patterns ...string) error { + c.mu.Lock() + defer c.mu.Unlock() + + for _, pattern := range patterns { + delete(c.patterns, pattern) + } + err := c.subscribe("punsubscribe", patterns...) + return err +} + +func (c *PubSub) subscribe(redisCmd string, channels ...string) error { + cn, err := c._conn(channels) + if err != nil { + return err + } + + err = c._subscribe(cn, redisCmd, channels) + c._releaseConn(cn, err, false) + return err +} + +func (c *PubSub) Ping(payload ...string) error { + args := []interface{}{"ping"} + if len(payload) == 1 { + args = append(args, payload[0]) + } + cmd := NewCmd(args...) + + cn, err := c.conn() + if err != nil { + return err + } + + err = c.writeCmd(cn, cmd) + c.releaseConn(cn, err, false) + return err +} + +// Subscription received after a successful subscription to channel. +type Subscription struct { + // Can be "subscribe", "unsubscribe", "psubscribe" or "punsubscribe". + Kind string + // Channel name we have subscribed to. + Channel string + // Number of channels we are currently subscribed to. + Count int +} + +func (m *Subscription) String() string { + return fmt.Sprintf("%s: %s", m.Kind, m.Channel) +} + +// Message received as result of a PUBLISH command issued by another client. +type Message struct { + Channel string + Pattern string + Payload string +} + +func (m *Message) String() string { + return fmt.Sprintf("Message<%s: %s>", m.Channel, m.Payload) +} + +// Pong received as result of a PING command issued by another client. +type Pong struct { + Payload string +} + +func (p *Pong) String() string { + if p.Payload != "" { + return fmt.Sprintf("Pong<%s>", p.Payload) + } + return "Pong" +} + +func (c *PubSub) newMessage(reply interface{}) (interface{}, error) { + switch reply := reply.(type) { + case string: + return &Pong{ + Payload: reply, + }, nil + case []interface{}: + switch kind := reply[0].(string); kind { + case "subscribe", "unsubscribe", "psubscribe", "punsubscribe": + return &Subscription{ + Kind: kind, + Channel: reply[1].(string), + Count: int(reply[2].(int64)), + }, nil + case "message": + return &Message{ + Channel: reply[1].(string), + Payload: reply[2].(string), + }, nil + case "pmessage": + return &Message{ + Pattern: reply[1].(string), + Channel: reply[2].(string), + Payload: reply[3].(string), + }, nil + case "pong": + return &Pong{ + Payload: reply[1].(string), + }, nil + default: + return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind) + } + default: + return nil, fmt.Errorf("redis: unsupported pubsub message: %#v", reply) + } +} + +// ReceiveTimeout acts like Receive but returns an error if message +// is not received in time. This is low-level API and in most cases +// Channel should be used instead. +func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) { + if c.cmd == nil { + c.cmd = NewCmd() + } + + cn, err := c.conn() + if err != nil { + return nil, err + } + + err = cn.WithReader(timeout, func(rd *proto.Reader) error { + return c.cmd.readReply(rd) + }) + + c.releaseConn(cn, err, timeout > 0) + if err != nil { + return nil, err + } + + return c.newMessage(c.cmd.Val()) +} + +// Receive returns a message as a Subscription, Message, Pong or error. +// See PubSub example for details. This is low-level API and in most cases +// Channel should be used instead. +func (c *PubSub) Receive() (interface{}, error) { + return c.ReceiveTimeout(0) +} + +// ReceiveMessage returns a Message or error ignoring Subscription and Pong +// messages. This is low-level API and in most cases Channel should be used +// instead. +func (c *PubSub) ReceiveMessage() (*Message, error) { + for { + msg, err := c.Receive() + if err != nil { + return nil, err + } + + switch msg := msg.(type) { + case *Subscription: + // Ignore. + case *Pong: + // Ignore. + case *Message: + return msg, nil + default: + err := fmt.Errorf("redis: unknown message: %T", msg) + return nil, err + } + } +} + +// Channel returns a Go channel for concurrently receiving messages. +// It periodically sends Ping messages to test connection health. +// The channel is closed with PubSub. Receive* APIs can not be used +// after channel is created. +func (c *PubSub) Channel() <-chan *Message { + c.chOnce.Do(c.initChannel) + return c.ch +} + +func (c *PubSub) initChannel() { + c.ch = make(chan *Message, 100) + c.ping = make(chan struct{}, 10) + + go func() { + var errCount int + for { + msg, err := c.Receive() + if err != nil { + if err == pool.ErrClosed { + close(c.ch) + return + } + if errCount > 0 { + time.Sleep(c.retryBackoff(errCount)) + } + errCount++ + continue + } + errCount = 0 + + // Any message is as good as a ping. + select { + case c.ping <- struct{}{}: + default: + } + + switch msg := msg.(type) { + case *Subscription: + // Ignore. + case *Pong: + // Ignore. + case *Message: + c.ch <- msg + default: + internal.Logf("redis: unknown message: %T", msg) + } + } + }() + + go func() { + const timeout = 5 * time.Second + + timer := time.NewTimer(timeout) + timer.Stop() + + healthy := true + for { + timer.Reset(timeout) + select { + case <-c.ping: + healthy = true + if !timer.Stop() { + <-timer.C + } + case <-timer.C: + pingErr := c.Ping() + if healthy { + healthy = false + } else { + if pingErr == nil { + pingErr = errPingTimeout + } + c.mu.Lock() + c._reconnect(pingErr) + c.mu.Unlock() + } + case <-c.exit: + return + } + } + }() +} + +func (c *PubSub) retryBackoff(attempt int) time.Duration { + return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff) +} diff --git a/vendor/github.com/go-redis/redis/redis.go b/vendor/github.com/go-redis/redis/redis.go new file mode 100644 index 0000000000..aca30648f5 --- /dev/null +++ b/vendor/github.com/go-redis/redis/redis.go @@ -0,0 +1,580 @@ +package redis + +import ( + "context" + "fmt" + "log" + "os" + "time" + + "github.com/go-redis/redis/internal" + "github.com/go-redis/redis/internal/pool" + "github.com/go-redis/redis/internal/proto" +) + +// Nil reply Redis returns when key does not exist. +const Nil = proto.Nil + +func init() { + SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile)) +} + +func SetLogger(logger *log.Logger) { + internal.Logger = logger +} + +type baseClient struct { + opt *Options + connPool pool.Pooler + limiter Limiter + + process func(Cmder) error + processPipeline func([]Cmder) error + processTxPipeline func([]Cmder) error + + onClose func() error // hook called when client is closed +} + +func (c *baseClient) init() { + c.process = c.defaultProcess + c.processPipeline = c.defaultProcessPipeline + c.processTxPipeline = c.defaultProcessTxPipeline +} + +func (c *baseClient) String() string { + return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB) +} + +func (c *baseClient) newConn() (*pool.Conn, error) { + cn, err := c.connPool.NewConn() + if err != nil { + return nil, err + } + + if cn.InitedAt.IsZero() { + if err := c.initConn(cn); err != nil { + _ = c.connPool.CloseConn(cn) + return nil, err + } + } + + return cn, nil +} + +func (c *baseClient) getConn() (*pool.Conn, error) { + if c.limiter != nil { + err := c.limiter.Allow() + if err != nil { + return nil, err + } + } + + cn, err := c._getConn() + if err != nil { + if c.limiter != nil { + c.limiter.ReportResult(err) + } + return nil, err + } + return cn, nil +} + +func (c *baseClient) _getConn() (*pool.Conn, error) { + cn, err := c.connPool.Get() + if err != nil { + return nil, err + } + + if cn.InitedAt.IsZero() { + err := c.initConn(cn) + if err != nil { + c.connPool.Remove(cn) + return nil, err + } + } + + return cn, nil +} + +func (c *baseClient) releaseConn(cn *pool.Conn, err error) { + if c.limiter != nil { + c.limiter.ReportResult(err) + } + + if internal.IsBadConn(err, false) { + c.connPool.Remove(cn) + } else { + c.connPool.Put(cn) + } +} + +func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) { + if c.limiter != nil { + c.limiter.ReportResult(err) + } + + if err == nil || internal.IsRedisError(err) { + c.connPool.Put(cn) + } else { + c.connPool.Remove(cn) + } +} + +func (c *baseClient) initConn(cn *pool.Conn) error { + cn.InitedAt = time.Now() + + if c.opt.Password == "" && + c.opt.DB == 0 && + !c.opt.readOnly && + c.opt.OnConnect == nil { + return nil + } + + conn := newConn(c.opt, cn) + _, err := conn.Pipelined(func(pipe Pipeliner) error { + if c.opt.Password != "" { + pipe.Auth(c.opt.Password) + } + + if c.opt.DB > 0 { + pipe.Select(c.opt.DB) + } + + if c.opt.readOnly { + pipe.ReadOnly() + } + + return nil + }) + if err != nil { + return err + } + + if c.opt.OnConnect != nil { + return c.opt.OnConnect(conn) + } + return nil +} + +// Do creates a Cmd from the args and processes the cmd. +func (c *baseClient) Do(args ...interface{}) *Cmd { + cmd := NewCmd(args...) + _ = c.Process(cmd) + return cmd +} + +// WrapProcess wraps function that processes Redis commands. +func (c *baseClient) WrapProcess( + fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error, +) { + c.process = fn(c.process) +} + +func (c *baseClient) Process(cmd Cmder) error { + return c.process(cmd) +} + +func (c *baseClient) defaultProcess(cmd Cmder) error { + for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + + cn, err := c.getConn() + if err != nil { + cmd.setErr(err) + if internal.IsRetryableError(err, true) { + continue + } + return err + } + + err = cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return writeCmd(wr, cmd) + }) + if err != nil { + c.releaseConn(cn, err) + cmd.setErr(err) + if internal.IsRetryableError(err, true) { + continue + } + return err + } + + err = cn.WithReader(c.cmdTimeout(cmd), func(rd *proto.Reader) error { + return cmd.readReply(rd) + }) + c.releaseConn(cn, err) + if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) { + continue + } + + return err + } + + return cmd.Err() +} + +func (c *baseClient) retryBackoff(attempt int) time.Duration { + return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff) +} + +func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration { + if timeout := cmd.readTimeout(); timeout != nil { + t := *timeout + if t == 0 { + return 0 + } + return t + 10*time.Second + } + return c.opt.ReadTimeout +} + +// Close closes the client, releasing any open resources. +// +// It is rare to Close a Client, as the Client is meant to be +// long-lived and shared between many goroutines. +func (c *baseClient) Close() error { + var firstErr error + if c.onClose != nil { + if err := c.onClose(); err != nil && firstErr == nil { + firstErr = err + } + } + if err := c.connPool.Close(); err != nil && firstErr == nil { + firstErr = err + } + return firstErr +} + +func (c *baseClient) getAddr() string { + return c.opt.Addr +} + +func (c *baseClient) WrapProcessPipeline( + fn func(oldProcess func([]Cmder) error) func([]Cmder) error, +) { + c.processPipeline = fn(c.processPipeline) + c.processTxPipeline = fn(c.processTxPipeline) +} + +func (c *baseClient) defaultProcessPipeline(cmds []Cmder) error { + return c.generalProcessPipeline(cmds, c.pipelineProcessCmds) +} + +func (c *baseClient) defaultProcessTxPipeline(cmds []Cmder) error { + return c.generalProcessPipeline(cmds, c.txPipelineProcessCmds) +} + +type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error) + +func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) error { + for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + + cn, err := c.getConn() + if err != nil { + setCmdsErr(cmds, err) + return err + } + + canRetry, err := p(cn, cmds) + c.releaseConnStrict(cn, err) + + if !canRetry || !internal.IsRetryableError(err, true) { + break + } + } + return cmdsFirstErr(cmds) +} + +func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) { + err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return writeCmd(wr, cmds...) + }) + if err != nil { + setCmdsErr(cmds, err) + return true, err + } + + err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error { + return pipelineReadCmds(rd, cmds) + }) + return true, err +} + +func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error { + for _, cmd := range cmds { + err := cmd.readReply(rd) + if err != nil && !internal.IsRedisError(err) { + return err + } + } + return nil +} + +func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) { + err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return txPipelineWriteMulti(wr, cmds) + }) + if err != nil { + setCmdsErr(cmds, err) + return true, err + } + + err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error { + err := txPipelineReadQueued(rd, cmds) + if err != nil { + setCmdsErr(cmds, err) + return err + } + return pipelineReadCmds(rd, cmds) + }) + return false, err +} + +func txPipelineWriteMulti(wr *proto.Writer, cmds []Cmder) error { + multiExec := make([]Cmder, 0, len(cmds)+2) + multiExec = append(multiExec, NewStatusCmd("MULTI")) + multiExec = append(multiExec, cmds...) + multiExec = append(multiExec, NewSliceCmd("EXEC")) + return writeCmd(wr, multiExec...) +} + +func txPipelineReadQueued(rd *proto.Reader, cmds []Cmder) error { + // Parse queued replies. + var statusCmd StatusCmd + err := statusCmd.readReply(rd) + if err != nil { + return err + } + + for range cmds { + err = statusCmd.readReply(rd) + if err != nil && !internal.IsRedisError(err) { + return err + } + } + + // Parse number of replies. + line, err := rd.ReadLine() + if err != nil { + if err == Nil { + err = TxFailedErr + } + return err + } + + switch line[0] { + case proto.ErrorReply: + return proto.ParseErrorReply(line) + case proto.ArrayReply: + // ok + default: + err := fmt.Errorf("redis: expected '*', but got line %q", line) + return err + } + + return nil +} + +//------------------------------------------------------------------------------ + +// Client is a Redis client representing a pool of zero or more +// underlying connections. It's safe for concurrent use by multiple +// goroutines. +type Client struct { + baseClient + cmdable + + ctx context.Context +} + +// NewClient returns a client to the Redis Server specified by Options. +func NewClient(opt *Options) *Client { + opt.init() + + c := Client{ + baseClient: baseClient{ + opt: opt, + connPool: newConnPool(opt), + }, + } + c.baseClient.init() + c.init() + + return &c +} + +func (c *Client) init() { + c.cmdable.setProcessor(c.Process) +} + +func (c *Client) Context() context.Context { + if c.ctx != nil { + return c.ctx + } + return context.Background() +} + +func (c *Client) WithContext(ctx context.Context) *Client { + if ctx == nil { + panic("nil context") + } + c2 := c.clone() + c2.ctx = ctx + return c2 +} + +func (c *Client) clone() *Client { + cp := *c + cp.init() + return &cp +} + +// Options returns read-only Options that were used to create the client. +func (c *Client) Options() *Options { + return c.opt +} + +func (c *Client) SetLimiter(l Limiter) *Client { + c.limiter = l + return c +} + +type PoolStats pool.Stats + +// PoolStats returns connection pool stats. +func (c *Client) PoolStats() *PoolStats { + stats := c.connPool.Stats() + return (*PoolStats)(stats) +} + +func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { + return c.Pipeline().Pipelined(fn) +} + +func (c *Client) Pipeline() Pipeliner { + pipe := Pipeline{ + exec: c.processPipeline, + } + pipe.statefulCmdable.setProcessor(pipe.Process) + return &pipe +} + +func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { + return c.TxPipeline().Pipelined(fn) +} + +// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. +func (c *Client) TxPipeline() Pipeliner { + pipe := Pipeline{ + exec: c.processTxPipeline, + } + pipe.statefulCmdable.setProcessor(pipe.Process) + return &pipe +} + +func (c *Client) pubSub() *PubSub { + pubsub := &PubSub{ + opt: c.opt, + + newConn: func(channels []string) (*pool.Conn, error) { + return c.newConn() + }, + closeConn: c.connPool.CloseConn, + } + pubsub.init() + return pubsub +} + +// Subscribe subscribes the client to the specified channels. +// Channels can be omitted to create empty subscription. +// Note that this method does not wait on a response from Redis, so the +// subscription may not be active immediately. To force the connection to wait, +// you may call the Receive() method on the returned *PubSub like so: +// +// sub := client.Subscribe(queryResp) +// iface, err := sub.Receive() +// if err != nil { +// // handle error +// } +// +// // Should be *Subscription, but others are possible if other actions have been +// // taken on sub since it was created. +// switch iface.(type) { +// case *Subscription: +// // subscribe succeeded +// case *Message: +// // received first message +// case *Pong: +// // pong received +// default: +// // handle error +// } +// +// ch := sub.Channel() +func (c *Client) Subscribe(channels ...string) *PubSub { + pubsub := c.pubSub() + if len(channels) > 0 { + _ = pubsub.Subscribe(channels...) + } + return pubsub +} + +// PSubscribe subscribes the client to the given patterns. +// Patterns can be omitted to create empty subscription. +func (c *Client) PSubscribe(channels ...string) *PubSub { + pubsub := c.pubSub() + if len(channels) > 0 { + _ = pubsub.PSubscribe(channels...) + } + return pubsub +} + +//------------------------------------------------------------------------------ + +// Conn is like Client, but its pool contains single connection. +type Conn struct { + baseClient + statefulCmdable +} + +func newConn(opt *Options, cn *pool.Conn) *Conn { + c := Conn{ + baseClient: baseClient{ + opt: opt, + connPool: pool.NewSingleConnPool(cn), + }, + } + c.baseClient.init() + c.statefulCmdable.setProcessor(c.Process) + return &c +} + +func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { + return c.Pipeline().Pipelined(fn) +} + +func (c *Conn) Pipeline() Pipeliner { + pipe := Pipeline{ + exec: c.processPipeline, + } + pipe.statefulCmdable.setProcessor(pipe.Process) + return &pipe +} + +func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { + return c.TxPipeline().Pipelined(fn) +} + +// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. +func (c *Conn) TxPipeline() Pipeliner { + pipe := Pipeline{ + exec: c.processTxPipeline, + } + pipe.statefulCmdable.setProcessor(pipe.Process) + return &pipe +} diff --git a/vendor/github.com/go-redis/redis/result.go b/vendor/github.com/go-redis/redis/result.go new file mode 100644 index 0000000000..e438f260b4 --- /dev/null +++ b/vendor/github.com/go-redis/redis/result.go @@ -0,0 +1,140 @@ +package redis + +import "time" + +// NewCmdResult returns a Cmd initialised with val and err for testing +func NewCmdResult(val interface{}, err error) *Cmd { + var cmd Cmd + cmd.val = val + cmd.setErr(err) + return &cmd +} + +// NewSliceResult returns a SliceCmd initialised with val and err for testing +func NewSliceResult(val []interface{}, err error) *SliceCmd { + var cmd SliceCmd + cmd.val = val + cmd.setErr(err) + return &cmd +} + +// NewStatusResult returns a StatusCmd initialised with val and err for testing +func NewStatusResult(val string, err error) *StatusCmd { + var cmd StatusCmd + cmd.val = val + cmd.setErr(err) + return &cmd +} + +// NewIntResult returns an IntCmd initialised with val and err for testing +func NewIntResult(val int64, err error) *IntCmd { + var cmd IntCmd + cmd.val = val + cmd.setErr(err) + return &cmd +} + +// NewDurationResult returns a DurationCmd initialised with val and err for testing +func NewDurationResult(val time.Duration, err error) *DurationCmd { + var cmd DurationCmd + cmd.val = val + cmd.setErr(err) + return &cmd +} + +// NewBoolResult returns a BoolCmd initialised with val and err for testing +func NewBoolResult(val bool, err error) *BoolCmd { + var cmd BoolCmd + cmd.val = val + cmd.setErr(err) + return &cmd +} + +// NewStringResult returns a StringCmd initialised with val and err for testing +func NewStringResult(val string, err error) *StringCmd { + var cmd StringCmd + cmd.val = val + cmd.setErr(err) + return &cmd +} + +// NewFloatResult returns a FloatCmd initialised with val and err for testing +func NewFloatResult(val float64, err error) *FloatCmd { + var cmd FloatCmd + cmd.val = val + cmd.setErr(err) + return &cmd +} + +// NewStringSliceResult returns a StringSliceCmd initialised with val and err for testing +func NewStringSliceResult(val []string, err error) *StringSliceCmd { + var cmd StringSliceCmd + cmd.val = val + cmd.setErr(err) + return &cmd +} + +// NewBoolSliceResult returns a BoolSliceCmd initialised with val and err for testing +func NewBoolSliceResult(val []bool, err error) *BoolSliceCmd { + var cmd BoolSliceCmd + cmd.val = val + cmd.setErr(err) + return &cmd +} + +// NewStringStringMapResult returns a StringStringMapCmd initialised with val and err for testing +func NewStringStringMapResult(val map[string]string, err error) *StringStringMapCmd { + var cmd StringStringMapCmd + cmd.val = val + cmd.setErr(err) + return &cmd +} + +// NewStringIntMapCmdResult returns a StringIntMapCmd initialised with val and err for testing +func NewStringIntMapCmdResult(val map[string]int64, err error) *StringIntMapCmd { + var cmd StringIntMapCmd + cmd.val = val + cmd.setErr(err) + return &cmd +} + +// NewZSliceCmdResult returns a ZSliceCmd initialised with val and err for testing +func NewZSliceCmdResult(val []Z, err error) *ZSliceCmd { + var cmd ZSliceCmd + cmd.val = val + cmd.setErr(err) + return &cmd +} + +// NewScanCmdResult returns a ScanCmd initialised with val and err for testing +func NewScanCmdResult(keys []string, cursor uint64, err error) *ScanCmd { + var cmd ScanCmd + cmd.page = keys + cmd.cursor = cursor + cmd.setErr(err) + return &cmd +} + +// NewClusterSlotsCmdResult returns a ClusterSlotsCmd initialised with val and err for testing +func NewClusterSlotsCmdResult(val []ClusterSlot, err error) *ClusterSlotsCmd { + var cmd ClusterSlotsCmd + cmd.val = val + cmd.setErr(err) + return &cmd +} + +// NewGeoLocationCmdResult returns a GeoLocationCmd initialised with val and err for testing +func NewGeoLocationCmdResult(val []GeoLocation, err error) *GeoLocationCmd { + var cmd GeoLocationCmd + cmd.locations = val + cmd.setErr(err) + return &cmd +} + +// NewCommandsInfoCmdResult returns a CommandsInfoCmd initialised with val and err for testing +func NewCommandsInfoCmdResult(val map[string]*CommandInfo, err error) *CommandsInfoCmd { + var cmd CommandsInfoCmd + cmd.val = val + cmd.setErr(err) + return &cmd +} diff --git a/vendor/github.com/go-redis/redis/ring.go b/vendor/github.com/go-redis/redis/ring.go new file mode 100644 index 0000000000..250e5f640c --- /dev/null +++ b/vendor/github.com/go-redis/redis/ring.go @@ -0,0 +1,658 @@ +package redis + +import ( + "context" + "errors" + "fmt" + "math/rand" + "strconv" + "sync" + "sync/atomic" + "time" + + "github.com/go-redis/redis/internal" + "github.com/go-redis/redis/internal/consistenthash" + "github.com/go-redis/redis/internal/hashtag" + "github.com/go-redis/redis/internal/pool" +) + +// Hash is type of hash function used in consistent hash. +type Hash consistenthash.Hash + +var errRingShardsDown = errors.New("redis: all ring shards are down") + +// RingOptions are used to configure a ring client and should be +// passed to NewRing. +type RingOptions struct { + // Map of name => host:port addresses of ring shards. + Addrs map[string]string + + // Frequency of PING commands sent to check shards availability. + // Shard is considered down after 3 subsequent failed checks. + HeartbeatFrequency time.Duration + + // Hash function used in consistent hash. + // Default is crc32.ChecksumIEEE. + Hash Hash + + // Number of replicas in consistent hash. + // Default is 100 replicas. + // + // Higher number of replicas will provide less deviation, that is keys will be + // distributed to nodes more evenly. + // + // Following is deviation for common nreplicas: + // -------------------------------------------------------- + // | nreplicas | standard error | 99% confidence interval | + // | 10 | 0.3152 | (0.37, 1.98) | + // | 100 | 0.0997 | (0.76, 1.28) | + // | 1000 | 0.0316 | (0.92, 1.09) | + // -------------------------------------------------------- + // + // See https://arxiv.org/abs/1406.2294 for reference + HashReplicas int + + // Following options are copied from Options struct. + + OnConnect func(*Conn) error + + DB int + Password string + + MaxRetries int + MinRetryBackoff time.Duration + MaxRetryBackoff time.Duration + + DialTimeout time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration + + PoolSize int + MinIdleConns int + MaxConnAge time.Duration + PoolTimeout time.Duration + IdleTimeout time.Duration + IdleCheckFrequency time.Duration +} + +func (opt *RingOptions) init() { + if opt.HeartbeatFrequency == 0 { + opt.HeartbeatFrequency = 500 * time.Millisecond + } + + if opt.HashReplicas == 0 { + opt.HashReplicas = 100 + } + + switch opt.MinRetryBackoff { + case -1: + opt.MinRetryBackoff = 0 + case 0: + opt.MinRetryBackoff = 8 * time.Millisecond + } + switch opt.MaxRetryBackoff { + case -1: + opt.MaxRetryBackoff = 0 + case 0: + opt.MaxRetryBackoff = 512 * time.Millisecond + } +} + +func (opt *RingOptions) clientOptions() *Options { + return &Options{ + OnConnect: opt.OnConnect, + + DB: opt.DB, + Password: opt.Password, + + DialTimeout: opt.DialTimeout, + ReadTimeout: opt.ReadTimeout, + WriteTimeout: opt.WriteTimeout, + + PoolSize: opt.PoolSize, + MinIdleConns: opt.MinIdleConns, + MaxConnAge: opt.MaxConnAge, + PoolTimeout: opt.PoolTimeout, + IdleTimeout: opt.IdleTimeout, + IdleCheckFrequency: opt.IdleCheckFrequency, + } +} + +//------------------------------------------------------------------------------ + +type ringShard struct { + Client *Client + down int32 +} + +func (shard *ringShard) String() string { + var state string + if shard.IsUp() { + state = "up" + } else { + state = "down" + } + return fmt.Sprintf("%s is %s", shard.Client, state) +} + +func (shard *ringShard) IsDown() bool { + const threshold = 3 + return atomic.LoadInt32(&shard.down) >= threshold +} + +func (shard *ringShard) IsUp() bool { + return !shard.IsDown() +} + +// Vote votes to set shard state and returns true if state was changed. +func (shard *ringShard) Vote(up bool) bool { + if up { + changed := shard.IsDown() + atomic.StoreInt32(&shard.down, 0) + return changed + } + + if shard.IsDown() { + return false + } + + atomic.AddInt32(&shard.down, 1) + return shard.IsDown() +} + +//------------------------------------------------------------------------------ + +type ringShards struct { + opt *RingOptions + + mu sync.RWMutex + hash *consistenthash.Map + shards map[string]*ringShard // read only + list []*ringShard // read only + len int + closed bool +} + +func newRingShards(opt *RingOptions) *ringShards { + return &ringShards{ + opt: opt, + + hash: newConsistentHash(opt), + shards: make(map[string]*ringShard), + } +} + +func (c *ringShards) Add(name string, cl *Client) { + shard := &ringShard{Client: cl} + c.hash.Add(name) + c.shards[name] = shard + c.list = append(c.list, shard) +} + +func (c *ringShards) List() []*ringShard { + c.mu.RLock() + list := c.list + c.mu.RUnlock() + return list +} + +func (c *ringShards) Hash(key string) string { + c.mu.RLock() + hash := c.hash.Get(key) + c.mu.RUnlock() + return hash +} + +func (c *ringShards) GetByKey(key string) (*ringShard, error) { + key = hashtag.Key(key) + + c.mu.RLock() + + if c.closed { + c.mu.RUnlock() + return nil, pool.ErrClosed + } + + hash := c.hash.Get(key) + if hash == "" { + c.mu.RUnlock() + return nil, errRingShardsDown + } + + shard := c.shards[hash] + c.mu.RUnlock() + + return shard, nil +} + +func (c *ringShards) GetByHash(name string) (*ringShard, error) { + if name == "" { + return c.Random() + } + + c.mu.RLock() + shard := c.shards[name] + c.mu.RUnlock() + return shard, nil +} + +func (c *ringShards) Random() (*ringShard, error) { + return c.GetByKey(strconv.Itoa(rand.Int())) +} + +// heartbeat monitors state of each shard in the ring. +func (c *ringShards) Heartbeat(frequency time.Duration) { + ticker := time.NewTicker(frequency) + defer ticker.Stop() + for range ticker.C { + var rebalance bool + + c.mu.RLock() + + if c.closed { + c.mu.RUnlock() + break + } + + shards := c.list + c.mu.RUnlock() + + for _, shard := range shards { + err := shard.Client.Ping().Err() + if shard.Vote(err == nil || err == pool.ErrPoolTimeout) { + internal.Logf("ring shard state changed: %s", shard) + rebalance = true + } + } + + if rebalance { + c.rebalance() + } + } +} + +// rebalance removes dead shards from the Ring. +func (c *ringShards) rebalance() { + hash := newConsistentHash(c.opt) + var shardsNum int + for name, shard := range c.shards { + if shard.IsUp() { + hash.Add(name) + shardsNum++ + } + } + + c.mu.Lock() + c.hash = hash + c.len = shardsNum + c.mu.Unlock() +} + +func (c *ringShards) Len() int { + c.mu.RLock() + l := c.len + c.mu.RUnlock() + return l +} + +func (c *ringShards) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.closed { + return nil + } + c.closed = true + + var firstErr error + for _, shard := range c.shards { + if err := shard.Client.Close(); err != nil && firstErr == nil { + firstErr = err + } + } + c.hash = nil + c.shards = nil + c.list = nil + + return firstErr +} + +//------------------------------------------------------------------------------ + +// Ring is a Redis client that uses consistent hashing to distribute +// keys across multiple Redis servers (shards). It's safe for +// concurrent use by multiple goroutines. +// +// Ring monitors the state of each shard and removes dead shards from +// the ring. When a shard comes online it is added back to the ring. This +// gives you maximum availability and partition tolerance, but no +// consistency between different shards or even clients. Each client +// uses shards that are available to the client and does not do any +// coordination when shard state is changed. +// +// Ring should be used when you need multiple Redis servers for caching +// and can tolerate losing data when one of the servers dies. +// Otherwise you should use Redis Cluster. +type Ring struct { + cmdable + + ctx context.Context + + opt *RingOptions + shards *ringShards + cmdsInfoCache *cmdsInfoCache + + process func(Cmder) error + processPipeline func([]Cmder) error +} + +func NewRing(opt *RingOptions) *Ring { + opt.init() + + ring := &Ring{ + opt: opt, + shards: newRingShards(opt), + } + ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo) + + ring.process = ring.defaultProcess + ring.processPipeline = ring.defaultProcessPipeline + ring.cmdable.setProcessor(ring.Process) + + for name, addr := range opt.Addrs { + clopt := opt.clientOptions() + clopt.Addr = addr + ring.shards.Add(name, NewClient(clopt)) + } + + go ring.shards.Heartbeat(opt.HeartbeatFrequency) + + return ring +} + +func (c *Ring) Context() context.Context { + if c.ctx != nil { + return c.ctx + } + return context.Background() +} + +func (c *Ring) WithContext(ctx context.Context) *Ring { + if ctx == nil { + panic("nil context") + } + c2 := c.copy() + c2.ctx = ctx + return c2 +} + +func (c *Ring) copy() *Ring { + cp := *c + return &cp +} + +// Options returns read-only Options that were used to create the client. +func (c *Ring) Options() *RingOptions { + return c.opt +} + +func (c *Ring) retryBackoff(attempt int) time.Duration { + return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff) +} + +// PoolStats returns accumulated connection pool stats. +func (c *Ring) PoolStats() *PoolStats { + shards := c.shards.List() + var acc PoolStats + for _, shard := range shards { + s := shard.Client.connPool.Stats() + acc.Hits += s.Hits + acc.Misses += s.Misses + acc.Timeouts += s.Timeouts + acc.TotalConns += s.TotalConns + acc.IdleConns += s.IdleConns + } + return &acc +} + +// Len returns the current number of shards in the ring. +func (c *Ring) Len() int { + return c.shards.Len() +} + +// Subscribe subscribes the client to the specified channels. +func (c *Ring) Subscribe(channels ...string) *PubSub { + if len(channels) == 0 { + panic("at least one channel is required") + } + + shard, err := c.shards.GetByKey(channels[0]) + if err != nil { + // TODO: return PubSub with sticky error + panic(err) + } + return shard.Client.Subscribe(channels...) +} + +// PSubscribe subscribes the client to the given patterns. +func (c *Ring) PSubscribe(channels ...string) *PubSub { + if len(channels) == 0 { + panic("at least one channel is required") + } + + shard, err := c.shards.GetByKey(channels[0]) + if err != nil { + // TODO: return PubSub with sticky error + panic(err) + } + return shard.Client.PSubscribe(channels...) +} + +// ForEachShard concurrently calls the fn on each live shard in the ring. +// It returns the first error if any. +func (c *Ring) ForEachShard(fn func(client *Client) error) error { + shards := c.shards.List() + var wg sync.WaitGroup + errCh := make(chan error, 1) + for _, shard := range shards { + if shard.IsDown() { + continue + } + + wg.Add(1) + go func(shard *ringShard) { + defer wg.Done() + err := fn(shard.Client) + if err != nil { + select { + case errCh <- err: + default: + } + } + }(shard) + } + wg.Wait() + + select { + case err := <-errCh: + return err + default: + return nil + } +} + +func (c *Ring) cmdsInfo() (map[string]*CommandInfo, error) { + shards := c.shards.List() + firstErr := errRingShardsDown + for _, shard := range shards { + cmdsInfo, err := shard.Client.Command().Result() + if err == nil { + return cmdsInfo, nil + } + if firstErr == nil { + firstErr = err + } + } + return nil, firstErr +} + +func (c *Ring) cmdInfo(name string) *CommandInfo { + cmdsInfo, err := c.cmdsInfoCache.Get() + if err != nil { + return nil + } + info := cmdsInfo[name] + if info == nil { + internal.Logf("info for cmd=%s not found", name) + } + return info +} + +func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) { + cmdInfo := c.cmdInfo(cmd.Name()) + pos := cmdFirstKeyPos(cmd, cmdInfo) + if pos == 0 { + return c.shards.Random() + } + firstKey := cmd.stringArg(pos) + return c.shards.GetByKey(firstKey) +} + +// Do creates a Cmd from the args and processes the cmd. +func (c *Ring) Do(args ...interface{}) *Cmd { + cmd := NewCmd(args...) + c.Process(cmd) + return cmd +} + +func (c *Ring) WrapProcess( + fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error, +) { + c.process = fn(c.process) +} + +func (c *Ring) Process(cmd Cmder) error { + return c.process(cmd) +} + +func (c *Ring) defaultProcess(cmd Cmder) error { + for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + + shard, err := c.cmdShard(cmd) + if err != nil { + cmd.setErr(err) + return err + } + + err = shard.Client.Process(cmd) + if err == nil { + return nil + } + if !internal.IsRetryableError(err, cmd.readTimeout() == nil) { + return err + } + } + return cmd.Err() +} + +func (c *Ring) Pipeline() Pipeliner { + pipe := Pipeline{ + exec: c.processPipeline, + } + pipe.cmdable.setProcessor(pipe.Process) + return &pipe +} + +func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { + return c.Pipeline().Pipelined(fn) +} + +func (c *Ring) WrapProcessPipeline( + fn func(oldProcess func([]Cmder) error) func([]Cmder) error, +) { + c.processPipeline = fn(c.processPipeline) +} + +func (c *Ring) defaultProcessPipeline(cmds []Cmder) error { + cmdsMap := make(map[string][]Cmder) + for _, cmd := range cmds { + cmdInfo := c.cmdInfo(cmd.Name()) + hash := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo)) + if hash != "" { + hash = c.shards.Hash(hashtag.Key(hash)) + } + cmdsMap[hash] = append(cmdsMap[hash], cmd) + } + + for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + + var mu sync.Mutex + var failedCmdsMap map[string][]Cmder + var wg sync.WaitGroup + + for hash, cmds := range cmdsMap { + wg.Add(1) + go func(hash string, cmds []Cmder) { + defer wg.Done() + + shard, err := c.shards.GetByHash(hash) + if err != nil { + setCmdsErr(cmds, err) + return + } + + cn, err := shard.Client.getConn() + if err != nil { + setCmdsErr(cmds, err) + return + } + + canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds) + shard.Client.releaseConnStrict(cn, err) + + if canRetry && internal.IsRetryableError(err, true) { + mu.Lock() + if failedCmdsMap == nil { + failedCmdsMap = make(map[string][]Cmder) + } + failedCmdsMap[hash] = cmds + mu.Unlock() + } + }(hash, cmds) + } + + wg.Wait() + if len(failedCmdsMap) == 0 { + break + } + cmdsMap = failedCmdsMap + } + + return cmdsFirstErr(cmds) +} + +func (c *Ring) TxPipeline() Pipeliner { + panic("not implemented") +} + +func (c *Ring) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { + panic("not implemented") +} + +// Close closes the ring client, releasing any open resources. +// +// It is rare to Close a Ring, as the Ring is meant to be long-lived +// and shared between many goroutines. +func (c *Ring) Close() error { + return c.shards.Close() +} + +func newConsistentHash(opt *RingOptions) *consistenthash.Map { + return consistenthash.New(opt.HashReplicas, consistenthash.Hash(opt.Hash)) +} diff --git a/vendor/github.com/go-redis/redis/script.go b/vendor/github.com/go-redis/redis/script.go new file mode 100644 index 0000000000..09f36d9320 --- /dev/null +++ b/vendor/github.com/go-redis/redis/script.go @@ -0,0 +1,62 @@ +package redis + +import ( + "crypto/sha1" + "encoding/hex" + "io" + "strings" +) + +type scripter interface { + Eval(script string, keys []string, args ...interface{}) *Cmd + EvalSha(sha1 string, keys []string, args ...interface{}) *Cmd + ScriptExists(hashes ...string) *BoolSliceCmd + ScriptLoad(script string) *StringCmd +} + +var _ scripter = (*Client)(nil) +var _ scripter = (*Ring)(nil) +var _ scripter = (*ClusterClient)(nil) + +type Script struct { + src, hash string +} + +func NewScript(src string) *Script { + h := sha1.New() + io.WriteString(h, src) + return &Script{ + src: src, + hash: hex.EncodeToString(h.Sum(nil)), + } +} + +func (s *Script) Hash() string { + return s.hash +} + +func (s *Script) Load(c scripter) *StringCmd { + return c.ScriptLoad(s.src) +} + +func (s *Script) Exists(c scripter) *BoolSliceCmd { + return c.ScriptExists(s.hash) +} + +func (s *Script) Eval(c scripter, keys []string, args ...interface{}) *Cmd { + return c.Eval(s.src, keys, args...) +} + +func (s *Script) EvalSha(c scripter, keys []string, args ...interface{}) *Cmd { + return c.EvalSha(s.hash, keys, args...) +} + +// Run optimistically uses EVALSHA to run the script. If script does not exist +// it is retried using EVAL. +func (s *Script) Run(c scripter, keys []string, args ...interface{}) *Cmd { + r := s.EvalSha(c, keys, args...) + if err := r.Err(); err != nil && strings.HasPrefix(err.Error(), "NOSCRIPT ") { + return s.Eval(c, keys, args...) + } + return r +} diff --git a/vendor/github.com/go-redis/redis/sentinel.go b/vendor/github.com/go-redis/redis/sentinel.go new file mode 100644 index 0000000000..7cbb90bdb4 --- /dev/null +++ b/vendor/github.com/go-redis/redis/sentinel.go @@ -0,0 +1,399 @@ +package redis + +import ( + "crypto/tls" + "errors" + "net" + "strings" + "sync" + "time" + + "github.com/go-redis/redis/internal" + "github.com/go-redis/redis/internal/pool" +) + +//------------------------------------------------------------------------------ + +// FailoverOptions are used to configure a failover client and should +// be passed to NewFailoverClient. +type FailoverOptions struct { + // The master name. + MasterName string + // A seed list of host:port addresses of sentinel nodes. + SentinelAddrs []string + + // Following options are copied from Options struct. + + OnConnect func(*Conn) error + + Password string + DB int + + MaxRetries int + MinRetryBackoff time.Duration + MaxRetryBackoff time.Duration + + DialTimeout time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration + + PoolSize int + MinIdleConns int + MaxConnAge time.Duration + PoolTimeout time.Duration + IdleTimeout time.Duration + IdleCheckFrequency time.Duration + + TLSConfig *tls.Config +} + +func (opt *FailoverOptions) options() *Options { + return &Options{ + Addr: "FailoverClient", + + OnConnect: opt.OnConnect, + + DB: opt.DB, + Password: opt.Password, + + MaxRetries: opt.MaxRetries, + + DialTimeout: opt.DialTimeout, + ReadTimeout: opt.ReadTimeout, + WriteTimeout: opt.WriteTimeout, + + PoolSize: opt.PoolSize, + PoolTimeout: opt.PoolTimeout, + IdleTimeout: opt.IdleTimeout, + IdleCheckFrequency: opt.IdleCheckFrequency, + + TLSConfig: opt.TLSConfig, + } +} + +// NewFailoverClient returns a Redis client that uses Redis Sentinel +// for automatic failover. It's safe for concurrent use by multiple +// goroutines. +func NewFailoverClient(failoverOpt *FailoverOptions) *Client { + opt := failoverOpt.options() + opt.init() + + failover := &sentinelFailover{ + masterName: failoverOpt.MasterName, + sentinelAddrs: failoverOpt.SentinelAddrs, + + opt: opt, + } + + c := Client{ + baseClient: baseClient{ + opt: opt, + connPool: failover.Pool(), + + onClose: func() error { + return failover.Close() + }, + }, + } + c.baseClient.init() + c.cmdable.setProcessor(c.Process) + + return &c +} + +//------------------------------------------------------------------------------ + +type SentinelClient struct { + baseClient +} + +func NewSentinelClient(opt *Options) *SentinelClient { + opt.init() + c := &SentinelClient{ + baseClient: baseClient{ + opt: opt, + connPool: newConnPool(opt), + }, + } + c.baseClient.init() + return c +} + +func (c *SentinelClient) pubSub() *PubSub { + pubsub := &PubSub{ + opt: c.opt, + + newConn: func(channels []string) (*pool.Conn, error) { + return c.newConn() + }, + closeConn: c.connPool.CloseConn, + } + pubsub.init() + return pubsub +} + +// Subscribe subscribes the client to the specified channels. +// Channels can be omitted to create empty subscription. +func (c *SentinelClient) Subscribe(channels ...string) *PubSub { + pubsub := c.pubSub() + if len(channels) > 0 { + _ = pubsub.Subscribe(channels...) + } + return pubsub +} + +// PSubscribe subscribes the client to the given patterns. +// Patterns can be omitted to create empty subscription. +func (c *SentinelClient) PSubscribe(channels ...string) *PubSub { + pubsub := c.pubSub() + if len(channels) > 0 { + _ = pubsub.PSubscribe(channels...) + } + return pubsub +} + +func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd { + cmd := NewStringSliceCmd("sentinel", "get-master-addr-by-name", name) + c.Process(cmd) + return cmd +} + +func (c *SentinelClient) Sentinels(name string) *SliceCmd { + cmd := NewSliceCmd("sentinel", "sentinels", name) + c.Process(cmd) + return cmd +} + +// Failover forces a failover as if the master was not reachable, and without +// asking for agreement to other Sentinels. +func (c *SentinelClient) Failover(name string) *StatusCmd { + cmd := NewStatusCmd("sentinel", "failover", name) + c.Process(cmd) + return cmd +} + +// Reset resets all the masters with matching name. The pattern argument is a +// glob-style pattern. The reset process clears any previous state in a master +// (including a failover in progress), and removes every slave and sentinel +// already discovered and associated with the master. +func (c *SentinelClient) Reset(pattern string) *IntCmd { + cmd := NewIntCmd("sentinel", "reset", pattern) + c.Process(cmd) + return cmd +} + +type sentinelFailover struct { + sentinelAddrs []string + + opt *Options + + pool *pool.ConnPool + poolOnce sync.Once + + mu sync.RWMutex + masterName string + _masterAddr string + sentinel *SentinelClient + pubsub *PubSub +} + +func (c *sentinelFailover) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + if c.sentinel != nil { + return c.closeSentinel() + } + return nil +} + +func (c *sentinelFailover) Pool() *pool.ConnPool { + c.poolOnce.Do(func() { + c.opt.Dialer = c.dial + c.pool = newConnPool(c.opt) + }) + return c.pool +} + +func (c *sentinelFailover) dial() (net.Conn, error) { + addr, err := c.MasterAddr() + if err != nil { + return nil, err + } + return net.DialTimeout("tcp", addr, c.opt.DialTimeout) +} + +func (c *sentinelFailover) MasterAddr() (string, error) { + addr, err := c.masterAddr() + if err != nil { + return "", err + } + c.switchMaster(addr) + return addr, nil +} + +func (c *sentinelFailover) masterAddr() (string, error) { + addr := c.getMasterAddr() + if addr != "" { + return addr, nil + } + + c.mu.Lock() + defer c.mu.Unlock() + + for i, sentinelAddr := range c.sentinelAddrs { + sentinel := NewSentinelClient(&Options{ + Addr: sentinelAddr, + + MaxRetries: c.opt.MaxRetries, + + DialTimeout: c.opt.DialTimeout, + ReadTimeout: c.opt.ReadTimeout, + WriteTimeout: c.opt.WriteTimeout, + + PoolSize: c.opt.PoolSize, + PoolTimeout: c.opt.PoolTimeout, + IdleTimeout: c.opt.IdleTimeout, + IdleCheckFrequency: c.opt.IdleCheckFrequency, + + TLSConfig: c.opt.TLSConfig, + }) + + masterAddr, err := sentinel.GetMasterAddrByName(c.masterName).Result() + if err != nil { + internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s", + c.masterName, err) + _ = sentinel.Close() + continue + } + + // Push working sentinel to the top. + c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0] + c.setSentinel(sentinel) + + addr := net.JoinHostPort(masterAddr[0], masterAddr[1]) + return addr, nil + } + + return "", errors.New("redis: all sentinels are unreachable") +} + +func (c *sentinelFailover) getMasterAddr() string { + c.mu.RLock() + sentinel := c.sentinel + c.mu.RUnlock() + + if sentinel == nil { + return "" + } + + addr, err := sentinel.GetMasterAddrByName(c.masterName).Result() + if err != nil { + internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s", + c.masterName, err) + c.mu.Lock() + if c.sentinel == sentinel { + c.closeSentinel() + } + c.mu.Unlock() + return "" + } + + return net.JoinHostPort(addr[0], addr[1]) +} + +func (c *sentinelFailover) switchMaster(addr string) { + c.mu.RLock() + masterAddr := c._masterAddr + c.mu.RUnlock() + if masterAddr == addr { + return + } + + c.mu.Lock() + defer c.mu.Unlock() + + internal.Logf("sentinel: new master=%q addr=%q", + c.masterName, addr) + _ = c.Pool().Filter(func(cn *pool.Conn) bool { + return cn.RemoteAddr().String() != addr + }) + c._masterAddr = addr +} + +func (c *sentinelFailover) setSentinel(sentinel *SentinelClient) { + c.discoverSentinels(sentinel) + c.sentinel = sentinel + + c.pubsub = sentinel.Subscribe("+switch-master") + go c.listen(c.pubsub) +} + +func (c *sentinelFailover) closeSentinel() error { + var firstErr error + + err := c.pubsub.Close() + if err != nil && firstErr == err { + firstErr = err + } + c.pubsub = nil + + err = c.sentinel.Close() + if err != nil && firstErr == err { + firstErr = err + } + c.sentinel = nil + + return firstErr +} + +func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) { + sentinels, err := sentinel.Sentinels(c.masterName).Result() + if err != nil { + internal.Logf("sentinel: Sentinels master=%q failed: %s", c.masterName, err) + return + } + for _, sentinel := range sentinels { + vals := sentinel.([]interface{}) + for i := 0; i < len(vals); i += 2 { + key := vals[i].(string) + if key == "name" { + sentinelAddr := vals[i+1].(string) + if !contains(c.sentinelAddrs, sentinelAddr) { + internal.Logf("sentinel: discovered new sentinel=%q for master=%q", + sentinelAddr, c.masterName) + c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr) + } + } + } + } +} + +func (c *sentinelFailover) listen(pubsub *PubSub) { + ch := pubsub.Channel() + for { + msg, ok := <-ch + if !ok { + break + } + + switch msg.Channel { + case "+switch-master": + parts := strings.Split(msg.Payload, " ") + if parts[0] != c.masterName { + internal.Logf("sentinel: ignore addr for master=%q", parts[0]) + continue + } + addr := net.JoinHostPort(parts[3], parts[4]) + c.switchMaster(addr) + } + } +} + +func contains(slice []string, str string) bool { + for _, s := range slice { + if s == str { + return true + } + } + return false +} diff --git a/vendor/github.com/go-redis/redis/tx.go b/vendor/github.com/go-redis/redis/tx.go new file mode 100644 index 0000000000..fb3e633102 --- /dev/null +++ b/vendor/github.com/go-redis/redis/tx.go @@ -0,0 +1,110 @@ +package redis + +import ( + "github.com/go-redis/redis/internal/pool" + "github.com/go-redis/redis/internal/proto" +) + +// TxFailedErr transaction redis failed. +const TxFailedErr = proto.RedisError("redis: transaction failed") + +// Tx implements Redis transactions as described in +// http://redis.io/topics/transactions. It's NOT safe for concurrent use +// by multiple goroutines, because Exec resets list of watched keys. +// If you don't need WATCH it is better to use Pipeline. +type Tx struct { + statefulCmdable + baseClient +} + +func (c *Client) newTx() *Tx { + tx := Tx{ + baseClient: baseClient{ + opt: c.opt, + connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), true), + }, + } + tx.baseClient.init() + tx.statefulCmdable.setProcessor(tx.Process) + return &tx +} + +// Watch prepares a transaction and marks the keys to be watched +// for conditional execution if there are any keys. +// +// The transaction is automatically closed when fn exits. +func (c *Client) Watch(fn func(*Tx) error, keys ...string) error { + tx := c.newTx() + if len(keys) > 0 { + if err := tx.Watch(keys...).Err(); err != nil { + _ = tx.Close() + return err + } + } + + err := fn(tx) + _ = tx.Close() + return err +} + +// Close closes the transaction, releasing any open resources. +func (c *Tx) Close() error { + _ = c.Unwatch().Err() + return c.baseClient.Close() +} + +// Watch marks the keys to be watched for conditional execution +// of a transaction. +func (c *Tx) Watch(keys ...string) *StatusCmd { + args := make([]interface{}, 1+len(keys)) + args[0] = "watch" + for i, key := range keys { + args[1+i] = key + } + cmd := NewStatusCmd(args...) + c.Process(cmd) + return cmd +} + +// Unwatch flushes all the previously watched keys for a transaction. +func (c *Tx) Unwatch(keys ...string) *StatusCmd { + args := make([]interface{}, 1+len(keys)) + args[0] = "unwatch" + for i, key := range keys { + args[1+i] = key + } + cmd := NewStatusCmd(args...) + c.Process(cmd) + return cmd +} + +// Pipeline creates a new pipeline. It is more convenient to use Pipelined. +func (c *Tx) Pipeline() Pipeliner { + pipe := Pipeline{ + exec: c.processTxPipeline, + } + pipe.statefulCmdable.setProcessor(pipe.Process) + return &pipe +} + +// Pipelined executes commands queued in the fn in a transaction. +// +// When using WATCH, EXEC will execute commands only if the watched keys +// were not modified, allowing for a check-and-set mechanism. +// +// Exec always returns list of commands. If transaction fails +// TxFailedErr is returned. Otherwise Exec returns an error of the first +// failed command or nil. +func (c *Tx) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { + return c.Pipeline().Pipelined(fn) +} + +// TxPipelined is an alias for Pipelined. +func (c *Tx) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { + return c.Pipelined(fn) +} + +// TxPipeline is an alias for Pipeline. +func (c *Tx) TxPipeline() Pipeliner { + return c.Pipeline() +} diff --git a/vendor/github.com/go-redis/redis/universal.go b/vendor/github.com/go-redis/redis/universal.go new file mode 100644 index 0000000000..a607562464 --- /dev/null +++ b/vendor/github.com/go-redis/redis/universal.go @@ -0,0 +1,179 @@ +package redis + +import ( + "crypto/tls" + "time" +) + +// UniversalOptions information is required by UniversalClient to establish +// connections. +type UniversalOptions struct { + // Either a single address or a seed list of host:port addresses + // of cluster/sentinel nodes. + Addrs []string + + // Database to be selected after connecting to the server. + // Only single-node and failover clients. + DB int + + // Common options. + + OnConnect func(*Conn) error + Password string + MaxRetries int + MinRetryBackoff time.Duration + MaxRetryBackoff time.Duration + DialTimeout time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration + PoolSize int + MinIdleConns int + MaxConnAge time.Duration + PoolTimeout time.Duration + IdleTimeout time.Duration + IdleCheckFrequency time.Duration + TLSConfig *tls.Config + + // Only cluster clients. + + MaxRedirects int + ReadOnly bool + RouteByLatency bool + RouteRandomly bool + + // The sentinel master name. + // Only failover clients. + MasterName string +} + +func (o *UniversalOptions) cluster() *ClusterOptions { + if len(o.Addrs) == 0 { + o.Addrs = []string{"127.0.0.1:6379"} + } + + return &ClusterOptions{ + Addrs: o.Addrs, + OnConnect: o.OnConnect, + + Password: o.Password, + + MaxRedirects: o.MaxRedirects, + ReadOnly: o.ReadOnly, + RouteByLatency: o.RouteByLatency, + RouteRandomly: o.RouteRandomly, + + MaxRetries: o.MaxRetries, + MinRetryBackoff: o.MinRetryBackoff, + MaxRetryBackoff: o.MaxRetryBackoff, + + DialTimeout: o.DialTimeout, + ReadTimeout: o.ReadTimeout, + WriteTimeout: o.WriteTimeout, + PoolSize: o.PoolSize, + MinIdleConns: o.MinIdleConns, + MaxConnAge: o.MaxConnAge, + PoolTimeout: o.PoolTimeout, + IdleTimeout: o.IdleTimeout, + IdleCheckFrequency: o.IdleCheckFrequency, + + TLSConfig: o.TLSConfig, + } +} + +func (o *UniversalOptions) failover() *FailoverOptions { + if len(o.Addrs) == 0 { + o.Addrs = []string{"127.0.0.1:26379"} + } + + return &FailoverOptions{ + SentinelAddrs: o.Addrs, + MasterName: o.MasterName, + OnConnect: o.OnConnect, + + DB: o.DB, + Password: o.Password, + + MaxRetries: o.MaxRetries, + MinRetryBackoff: o.MinRetryBackoff, + MaxRetryBackoff: o.MaxRetryBackoff, + + DialTimeout: o.DialTimeout, + ReadTimeout: o.ReadTimeout, + WriteTimeout: o.WriteTimeout, + + PoolSize: o.PoolSize, + MinIdleConns: o.MinIdleConns, + MaxConnAge: o.MaxConnAge, + PoolTimeout: o.PoolTimeout, + IdleTimeout: o.IdleTimeout, + IdleCheckFrequency: o.IdleCheckFrequency, + + TLSConfig: o.TLSConfig, + } +} + +func (o *UniversalOptions) simple() *Options { + addr := "127.0.0.1:6379" + if len(o.Addrs) > 0 { + addr = o.Addrs[0] + } + + return &Options{ + Addr: addr, + OnConnect: o.OnConnect, + + DB: o.DB, + Password: o.Password, + + MaxRetries: o.MaxRetries, + MinRetryBackoff: o.MinRetryBackoff, + MaxRetryBackoff: o.MaxRetryBackoff, + + DialTimeout: o.DialTimeout, + ReadTimeout: o.ReadTimeout, + WriteTimeout: o.WriteTimeout, + + PoolSize: o.PoolSize, + MinIdleConns: o.MinIdleConns, + MaxConnAge: o.MaxConnAge, + PoolTimeout: o.PoolTimeout, + IdleTimeout: o.IdleTimeout, + IdleCheckFrequency: o.IdleCheckFrequency, + + TLSConfig: o.TLSConfig, + } +} + +// -------------------------------------------------------------------- + +// UniversalClient is an abstract client which - based on the provided options - +// can connect to either clusters, or sentinel-backed failover instances or simple +// single-instance servers. This can be useful for testing cluster-specific +// applications locally. +type UniversalClient interface { + Cmdable + Watch(fn func(*Tx) error, keys ...string) error + Process(cmd Cmder) error + WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) + Subscribe(channels ...string) *PubSub + PSubscribe(channels ...string) *PubSub + Close() error +} + +var _ UniversalClient = (*Client)(nil) +var _ UniversalClient = (*ClusterClient)(nil) + +// NewUniversalClient returns a new multi client. The type of client returned depends +// on the following three conditions: +// +// 1. if a MasterName is passed a sentinel-backed FailoverClient will be returned +// 2. if the number of Addrs is two or more, a ClusterClient will be returned +// 3. otherwise, a single-node redis Client will be returned. +func NewUniversalClient(opts *UniversalOptions) UniversalClient { + if opts.MasterName != "" { + return NewFailoverClient(opts.failover()) + } else if len(opts.Addrs) > 1 { + return NewClusterClient(opts.cluster()) + } + return NewClient(opts.simple()) +} |