From ac97ea573c1b10d03e72775e8f74b9fe5453bfc8 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Wed, 10 Feb 2021 22:28:32 +0100 Subject: [Vendor] Update go-redis to v8.5.0 (#13749) * Update go-redis to v8.4.0 * github.com/go-redis/redis/v8 v8.4.0 -> v8.5.0 * Apply suggestions from code review Co-authored-by: zeripath * TODO * Use the Queue termination channel as the default context for pushes Signed-off-by: Andrew Thornton * missed one Signed-off-by: Andrew Thornton Co-authored-by: zeripath --- vendor/github.com/go-redis/redis/v7/.gitignore | 2 - vendor/github.com/go-redis/redis/v7/.golangci.yml | 15 - vendor/github.com/go-redis/redis/v7/.travis.yml | 22 - vendor/github.com/go-redis/redis/v7/CHANGELOG.md | 46 - vendor/github.com/go-redis/redis/v7/LICENSE | 25 - vendor/github.com/go-redis/redis/v7/Makefile | 20 - vendor/github.com/go-redis/redis/v7/README.md | 128 - vendor/github.com/go-redis/redis/v7/cluster.go | 1669 ------------ .../go-redis/redis/v7/cluster_commands.go | 22 - vendor/github.com/go-redis/redis/v7/command.go | 2064 --------------- vendor/github.com/go-redis/redis/v7/commands.go | 2643 -------------------- vendor/github.com/go-redis/redis/v7/doc.go | 4 - vendor/github.com/go-redis/redis/v7/error.go | 108 - vendor/github.com/go-redis/redis/v7/go.mod | 15 - vendor/github.com/go-redis/redis/v7/go.sum | 47 - .../v7/internal/consistenthash/consistenthash.go | 81 - .../go-redis/redis/v7/internal/hashtag/hashtag.go | 77 - .../go-redis/redis/v7/internal/internal.go | 24 - .../github.com/go-redis/redis/v7/internal/log.go | 8 - .../github.com/go-redis/redis/v7/internal/once.go | 60 - .../go-redis/redis/v7/internal/pool/conn.go | 118 - .../go-redis/redis/v7/internal/pool/pool.go | 517 ---- .../go-redis/redis/v7/internal/pool/pool_single.go | 208 -- .../go-redis/redis/v7/internal/pool/pool_sticky.go | 112 - .../go-redis/redis/v7/internal/proto/reader.go | 314 --- .../go-redis/redis/v7/internal/proto/scan.go | 166 -- .../go-redis/redis/v7/internal/proto/writer.go | 165 -- .../github.com/go-redis/redis/v7/internal/util.go | 56 - .../go-redis/redis/v7/internal/util/safe.go | 11 - .../go-redis/redis/v7/internal/util/strconv.go | 19 - .../go-redis/redis/v7/internal/util/unsafe.go | 22 - vendor/github.com/go-redis/redis/v7/iterator.go | 75 - vendor/github.com/go-redis/redis/v7/options.go | 249 -- vendor/github.com/go-redis/redis/v7/pipeline.go | 142 -- vendor/github.com/go-redis/redis/v7/pubsub.go | 595 ----- vendor/github.com/go-redis/redis/v7/redis.go | 758 ------ vendor/github.com/go-redis/redis/v7/result.go | 180 -- vendor/github.com/go-redis/redis/v7/ring.go | 726 ------ vendor/github.com/go-redis/redis/v7/script.go | 62 - vendor/github.com/go-redis/redis/v7/sentinel.go | 509 ---- vendor/github.com/go-redis/redis/v7/tx.go | 159 -- vendor/github.com/go-redis/redis/v7/universal.go | 198 -- 42 files changed, 12441 deletions(-) delete mode 100644 vendor/github.com/go-redis/redis/v7/.gitignore delete mode 100644 vendor/github.com/go-redis/redis/v7/.golangci.yml delete mode 100644 vendor/github.com/go-redis/redis/v7/.travis.yml delete mode 100644 vendor/github.com/go-redis/redis/v7/CHANGELOG.md delete mode 100644 vendor/github.com/go-redis/redis/v7/LICENSE delete mode 100644 vendor/github.com/go-redis/redis/v7/Makefile delete mode 100644 vendor/github.com/go-redis/redis/v7/README.md delete mode 100644 vendor/github.com/go-redis/redis/v7/cluster.go delete mode 100644 vendor/github.com/go-redis/redis/v7/cluster_commands.go delete mode 100644 vendor/github.com/go-redis/redis/v7/command.go delete mode 100644 vendor/github.com/go-redis/redis/v7/commands.go delete mode 100644 vendor/github.com/go-redis/redis/v7/doc.go delete mode 100644 vendor/github.com/go-redis/redis/v7/error.go delete mode 100644 vendor/github.com/go-redis/redis/v7/go.mod delete mode 100644 vendor/github.com/go-redis/redis/v7/go.sum delete mode 100644 vendor/github.com/go-redis/redis/v7/internal/consistenthash/consistenthash.go delete mode 100644 vendor/github.com/go-redis/redis/v7/internal/hashtag/hashtag.go delete mode 100644 vendor/github.com/go-redis/redis/v7/internal/internal.go delete mode 100644 vendor/github.com/go-redis/redis/v7/internal/log.go delete mode 100644 vendor/github.com/go-redis/redis/v7/internal/once.go delete mode 100644 vendor/github.com/go-redis/redis/v7/internal/pool/conn.go delete mode 100644 vendor/github.com/go-redis/redis/v7/internal/pool/pool.go delete mode 100644 vendor/github.com/go-redis/redis/v7/internal/pool/pool_single.go delete mode 100644 vendor/github.com/go-redis/redis/v7/internal/pool/pool_sticky.go delete mode 100644 vendor/github.com/go-redis/redis/v7/internal/proto/reader.go delete mode 100644 vendor/github.com/go-redis/redis/v7/internal/proto/scan.go delete mode 100644 vendor/github.com/go-redis/redis/v7/internal/proto/writer.go delete mode 100644 vendor/github.com/go-redis/redis/v7/internal/util.go delete mode 100644 vendor/github.com/go-redis/redis/v7/internal/util/safe.go delete mode 100644 vendor/github.com/go-redis/redis/v7/internal/util/strconv.go delete mode 100644 vendor/github.com/go-redis/redis/v7/internal/util/unsafe.go delete mode 100644 vendor/github.com/go-redis/redis/v7/iterator.go delete mode 100644 vendor/github.com/go-redis/redis/v7/options.go delete mode 100644 vendor/github.com/go-redis/redis/v7/pipeline.go delete mode 100644 vendor/github.com/go-redis/redis/v7/pubsub.go delete mode 100644 vendor/github.com/go-redis/redis/v7/redis.go delete mode 100644 vendor/github.com/go-redis/redis/v7/result.go delete mode 100644 vendor/github.com/go-redis/redis/v7/ring.go delete mode 100644 vendor/github.com/go-redis/redis/v7/script.go delete mode 100644 vendor/github.com/go-redis/redis/v7/sentinel.go delete mode 100644 vendor/github.com/go-redis/redis/v7/tx.go delete mode 100644 vendor/github.com/go-redis/redis/v7/universal.go (limited to 'vendor/github.com/go-redis/redis/v7') diff --git a/vendor/github.com/go-redis/redis/v7/.gitignore b/vendor/github.com/go-redis/redis/v7/.gitignore deleted file mode 100644 index ebfe903bcd..0000000000 --- a/vendor/github.com/go-redis/redis/v7/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -*.rdb -testdata/*/ diff --git a/vendor/github.com/go-redis/redis/v7/.golangci.yml b/vendor/github.com/go-redis/redis/v7/.golangci.yml deleted file mode 100644 index 912dab1ef3..0000000000 --- a/vendor/github.com/go-redis/redis/v7/.golangci.yml +++ /dev/null @@ -1,15 +0,0 @@ -run: - concurrency: 8 - deadline: 5m - tests: false -linters: - enable-all: true - disable: - - funlen - - gochecknoglobals - - gocognit - - goconst - - godox - - gosec - - maligned - - wsl diff --git a/vendor/github.com/go-redis/redis/v7/.travis.yml b/vendor/github.com/go-redis/redis/v7/.travis.yml deleted file mode 100644 index 3f93932bc8..0000000000 --- a/vendor/github.com/go-redis/redis/v7/.travis.yml +++ /dev/null @@ -1,22 +0,0 @@ -dist: xenial -language: go - -services: - - redis-server - -go: - - 1.12.x - - 1.13.x - - tip - -matrix: - allow_failures: - - go: tip - -env: - - GO111MODULE=on - -go_import_path: github.com/go-redis/redis - -before_install: - - curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(go env GOPATH)/bin v1.21.0 diff --git a/vendor/github.com/go-redis/redis/v7/CHANGELOG.md b/vendor/github.com/go-redis/redis/v7/CHANGELOG.md deleted file mode 100644 index bd4eccff24..0000000000 --- a/vendor/github.com/go-redis/redis/v7/CHANGELOG.md +++ /dev/null @@ -1,46 +0,0 @@ -# Changelog - -## v7.2 - -- Existing `HMSet` is renamed to `HSet` and old deprecated `HMSet` is restored for Redis 3 users. - -## v7.1 - -- Existing `Cmd.String` is renamed to `Cmd.Text`. New `Cmd.String` implements `fmt.Stringer` interface. - -## v7 - -- *Important*. Tx.Pipeline now returns a non-transactional pipeline. Use Tx.TxPipeline for a transactional pipeline. -- WrapProcess is replaced with more convenient AddHook that has access to context.Context. -- WithContext now can not be used to create a shallow copy of the client. -- New methods ProcessContext, DoContext, and ExecContext. -- Client respects Context.Deadline when setting net.Conn deadline. -- Client listens on Context.Done while waiting for a connection from the pool and returns an error when context context is cancelled. -- Add PubSub.ChannelWithSubscriptions that sends `*Subscription` in addition to `*Message` to allow detecting reconnections. -- `time.Time` is now marshalled in RFC3339 format. `rdb.Get("foo").Time()` helper is added to parse the time. -- `SetLimiter` is removed and added `Options.Limiter` instead. -- `HMSet` is deprecated as of Redis v4. - -## v6.15 - -- Cluster and Ring pipelines process commands for each node in its own goroutine. - -## 6.14 - -- Added Options.MinIdleConns. -- Added Options.MaxConnAge. -- PoolStats.FreeConns is renamed to PoolStats.IdleConns. -- Add Client.Do to simplify creating custom commands. -- Add Cmd.String, Cmd.Int, Cmd.Int64, Cmd.Uint64, Cmd.Float64, and Cmd.Bool helpers. -- Lower memory usage. - -## v6.13 - -- Ring got new options called `HashReplicas` and `Hash`. It is recommended to set `HashReplicas = 1000` for better keys distribution between shards. -- Cluster client was optimized to use much less memory when reloading cluster state. -- PubSub.ReceiveMessage is re-worked to not use ReceiveTimeout so it does not lose data when timeout occurres. In most cases it is recommended to use PubSub.Channel instead. -- Dialer.KeepAlive is set to 5 minutes by default. - -## v6.12 - -- ClusterClient got new option called `ClusterSlots` which allows to build cluster of normal Redis Servers that don't have cluster mode enabled. See https://godoc.org/github.com/go-redis/redis#example-NewClusterClient--ManualSetup diff --git a/vendor/github.com/go-redis/redis/v7/LICENSE b/vendor/github.com/go-redis/redis/v7/LICENSE deleted file mode 100644 index 298bed9bea..0000000000 --- a/vendor/github.com/go-redis/redis/v7/LICENSE +++ /dev/null @@ -1,25 +0,0 @@ -Copyright (c) 2013 The github.com/go-redis/redis Authors. -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are -met: - - * Redistributions of source code must retain the above copyright -notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above -copyright notice, this list of conditions and the following disclaimer -in the documentation and/or other materials provided with the -distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/go-redis/redis/v7/Makefile b/vendor/github.com/go-redis/redis/v7/Makefile deleted file mode 100644 index 86609c6e07..0000000000 --- a/vendor/github.com/go-redis/redis/v7/Makefile +++ /dev/null @@ -1,20 +0,0 @@ -all: testdeps - go test ./... - go test ./... -short -race - go test ./... -run=NONE -bench=. -benchmem - env GOOS=linux GOARCH=386 go test ./... - golangci-lint run - -testdeps: testdata/redis/src/redis-server - -bench: testdeps - go test ./... -test.run=NONE -test.bench=. -test.benchmem - -.PHONY: all test testdeps bench - -testdata/redis: - mkdir -p $@ - wget -qO- http://download.redis.io/redis-stable.tar.gz | tar xvz --strip-components=1 -C $@ - -testdata/redis/src/redis-server: testdata/redis - cd $< && make all diff --git a/vendor/github.com/go-redis/redis/v7/README.md b/vendor/github.com/go-redis/redis/v7/README.md deleted file mode 100644 index 0fbb506ead..0000000000 --- a/vendor/github.com/go-redis/redis/v7/README.md +++ /dev/null @@ -1,128 +0,0 @@ -# Redis client for Golang - -[![Build Status](https://travis-ci.org/go-redis/redis.png?branch=master)](https://travis-ci.org/go-redis/redis) -[![GoDoc](https://godoc.org/github.com/go-redis/redis?status.svg)](https://godoc.org/github.com/go-redis/redis) -[![Airbrake](https://img.shields.io/badge/kudos-airbrake.io-orange.svg)](https://airbrake.io) - -Supports: - -- Redis 3 commands except QUIT, MONITOR, SLOWLOG and SYNC. -- Automatic connection pooling with [circuit breaker](https://en.wikipedia.org/wiki/Circuit_breaker_design_pattern) support. -- [Pub/Sub](https://godoc.org/github.com/go-redis/redis#PubSub). -- [Transactions](https://godoc.org/github.com/go-redis/redis#example-Client-TxPipeline). -- [Pipeline](https://godoc.org/github.com/go-redis/redis#example-Client-Pipeline) and [TxPipeline](https://godoc.org/github.com/go-redis/redis#example-Client-TxPipeline). -- [Scripting](https://godoc.org/github.com/go-redis/redis#Script). -- [Timeouts](https://godoc.org/github.com/go-redis/redis#Options). -- [Redis Sentinel](https://godoc.org/github.com/go-redis/redis#NewFailoverClient). -- [Redis Cluster](https://godoc.org/github.com/go-redis/redis#NewClusterClient). -- [Cluster of Redis Servers](https://godoc.org/github.com/go-redis/redis#example-NewClusterClient--ManualSetup) without using cluster mode and Redis Sentinel. -- [Ring](https://godoc.org/github.com/go-redis/redis#NewRing). -- [Instrumentation](https://godoc.org/github.com/go-redis/redis#ex-package--Instrumentation). -- [Cache friendly](https://github.com/go-redis/cache). -- [Rate limiting](https://github.com/go-redis/redis_rate). -- [Distributed Locks](https://github.com/bsm/redislock). - -API docs: https://godoc.org/github.com/go-redis/redis. -Examples: https://godoc.org/github.com/go-redis/redis#pkg-examples. - -## Installation - -go-redis requires a Go version with [Modules](https://github.com/golang/go/wiki/Modules) support and uses import versioning. So please make sure to initialize a Go module before installing go-redis: - -``` shell -go mod init github.com/my/repo -go get github.com/go-redis/redis/v7 -``` - -Import: - -``` go -import "github.com/go-redis/redis/v7" -``` - -## Quickstart - -``` go -func ExampleNewClient() { - client := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - Password: "", // no password set - DB: 0, // use default DB - }) - - pong, err := client.Ping().Result() - fmt.Println(pong, err) - // Output: PONG -} - -func ExampleClient() { - client := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - Password: "", // no password set - DB: 0, // use default DB - }) - err := client.Set("key", "value", 0).Err() - if err != nil { - panic(err) - } - - val, err := client.Get("key").Result() - if err != nil { - panic(err) - } - fmt.Println("key", val) - - val2, err := client.Get("key2").Result() - if err == redis.Nil { - fmt.Println("key2 does not exist") - } else if err != nil { - panic(err) - } else { - fmt.Println("key2", val2) - } - // Output: key value - // key2 does not exist -} -``` - -## Howto - -Please go through [examples](https://godoc.org/github.com/go-redis/redis#pkg-examples) to get an idea how to use this package. - -## Look and feel - -Some corner cases: - -``` go -// SET key value EX 10 NX -set, err := client.SetNX("key", "value", 10*time.Second).Result() - -// SORT list LIMIT 0 2 ASC -vals, err := client.Sort("list", &redis.Sort{Offset: 0, Count: 2, Order: "ASC"}).Result() - -// ZRANGEBYSCORE zset -inf +inf WITHSCORES LIMIT 0 2 -vals, err := client.ZRangeByScoreWithScores("zset", &redis.ZRangeBy{ - Min: "-inf", - Max: "+inf", - Offset: 0, - Count: 2, -}).Result() - -// ZINTERSTORE out 2 zset1 zset2 WEIGHTS 2 3 AGGREGATE SUM -vals, err := client.ZInterStore("out", &redis.ZStore{ - Keys: []string{"zset1", "zset2"}, - Weights: []int64{2, 3} -}).Result() - -// EVAL "return {KEYS[1],ARGV[1]}" 1 "key" "hello" -vals, err := client.Eval("return {KEYS[1],ARGV[1]}", []string{"key"}, "hello").Result() - -// custom command -res, err := client.Do("set", "key", "value").Result() -``` - -## See also - -- [Golang PostgreSQL ORM](https://github.com/go-pg/pg) -- [Golang msgpack](https://github.com/vmihailenco/msgpack) -- [Golang message task queue](https://github.com/vmihailenco/taskq) diff --git a/vendor/github.com/go-redis/redis/v7/cluster.go b/vendor/github.com/go-redis/redis/v7/cluster.go deleted file mode 100644 index 1907de6c41..0000000000 --- a/vendor/github.com/go-redis/redis/v7/cluster.go +++ /dev/null @@ -1,1669 +0,0 @@ -package redis - -import ( - "context" - "crypto/tls" - "fmt" - "math" - "math/rand" - "net" - "runtime" - "sort" - "sync" - "sync/atomic" - "time" - - "github.com/go-redis/redis/v7/internal" - "github.com/go-redis/redis/v7/internal/hashtag" - "github.com/go-redis/redis/v7/internal/pool" - "github.com/go-redis/redis/v7/internal/proto" -) - -var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes") - -// ClusterOptions are used to configure a cluster client and should be -// passed to NewClusterClient. -type ClusterOptions struct { - // A seed list of host:port addresses of cluster nodes. - Addrs []string - - // The maximum number of retries before giving up. Command is retried - // on network errors and MOVED/ASK redirects. - // Default is 8 retries. - MaxRedirects int - - // Enables read-only commands on slave nodes. - ReadOnly bool - // Allows routing read-only commands to the closest master or slave node. - // It automatically enables ReadOnly. - RouteByLatency bool - // Allows routing read-only commands to the random master or slave node. - // It automatically enables ReadOnly. - RouteRandomly bool - - // Optional function that returns cluster slots information. - // It is useful to manually create cluster of standalone Redis servers - // and load-balance read/write operations between master and slaves. - // It can use service like ZooKeeper to maintain configuration information - // and Cluster.ReloadState to manually trigger state reloading. - ClusterSlots func() ([]ClusterSlot, error) - - // Optional hook that is called when a new node is created. - OnNewNode func(*Client) - - // Following options are copied from Options struct. - - Dialer func(ctx context.Context, network, addr string) (net.Conn, error) - - OnConnect func(*Conn) error - - Username string - Password string - - MaxRetries int - MinRetryBackoff time.Duration - MaxRetryBackoff time.Duration - - DialTimeout time.Duration - ReadTimeout time.Duration - WriteTimeout time.Duration - - // NewClient creates a cluster node client with provided name and options. - NewClient func(opt *Options) *Client - - // PoolSize applies per cluster node and not for the whole cluster. - PoolSize int - MinIdleConns int - MaxConnAge time.Duration - PoolTimeout time.Duration - IdleTimeout time.Duration - IdleCheckFrequency time.Duration - - TLSConfig *tls.Config -} - -func (opt *ClusterOptions) init() { - if opt.MaxRedirects == -1 { - opt.MaxRedirects = 0 - } else if opt.MaxRedirects == 0 { - opt.MaxRedirects = 8 - } - - if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil { - opt.ReadOnly = true - } - - if opt.PoolSize == 0 { - opt.PoolSize = 5 * runtime.NumCPU() - } - - switch opt.ReadTimeout { - case -1: - opt.ReadTimeout = 0 - case 0: - opt.ReadTimeout = 3 * time.Second - } - switch opt.WriteTimeout { - case -1: - opt.WriteTimeout = 0 - case 0: - opt.WriteTimeout = opt.ReadTimeout - } - - switch opt.MinRetryBackoff { - case -1: - opt.MinRetryBackoff = 0 - case 0: - opt.MinRetryBackoff = 8 * time.Millisecond - } - switch opt.MaxRetryBackoff { - case -1: - opt.MaxRetryBackoff = 0 - case 0: - opt.MaxRetryBackoff = 512 * time.Millisecond - } - - if opt.NewClient == nil { - opt.NewClient = NewClient - } -} - -func (opt *ClusterOptions) clientOptions() *Options { - const disableIdleCheck = -1 - - return &Options{ - Dialer: opt.Dialer, - OnConnect: opt.OnConnect, - - MaxRetries: opt.MaxRetries, - MinRetryBackoff: opt.MinRetryBackoff, - MaxRetryBackoff: opt.MaxRetryBackoff, - Username: opt.Username, - Password: opt.Password, - readOnly: opt.ReadOnly, - - DialTimeout: opt.DialTimeout, - ReadTimeout: opt.ReadTimeout, - WriteTimeout: opt.WriteTimeout, - - PoolSize: opt.PoolSize, - MinIdleConns: opt.MinIdleConns, - MaxConnAge: opt.MaxConnAge, - PoolTimeout: opt.PoolTimeout, - IdleTimeout: opt.IdleTimeout, - IdleCheckFrequency: disableIdleCheck, - - TLSConfig: opt.TLSConfig, - } -} - -//------------------------------------------------------------------------------ - -type clusterNode struct { - Client *Client - - latency uint32 // atomic - generation uint32 // atomic - failing uint32 // atomic -} - -func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode { - opt := clOpt.clientOptions() - opt.Addr = addr - node := clusterNode{ - Client: clOpt.NewClient(opt), - } - - node.latency = math.MaxUint32 - if clOpt.RouteByLatency { - go node.updateLatency() - } - - if clOpt.OnNewNode != nil { - clOpt.OnNewNode(node.Client) - } - - return &node -} - -func (n *clusterNode) String() string { - return n.Client.String() -} - -func (n *clusterNode) Close() error { - return n.Client.Close() -} - -func (n *clusterNode) updateLatency() { - const probes = 10 - - var latency uint32 - for i := 0; i < probes; i++ { - start := time.Now() - n.Client.Ping() - probe := uint32(time.Since(start) / time.Microsecond) - latency = (latency + probe) / 2 - } - atomic.StoreUint32(&n.latency, latency) -} - -func (n *clusterNode) Latency() time.Duration { - latency := atomic.LoadUint32(&n.latency) - return time.Duration(latency) * time.Microsecond -} - -func (n *clusterNode) MarkAsFailing() { - atomic.StoreUint32(&n.failing, uint32(time.Now().Unix())) -} - -func (n *clusterNode) Failing() bool { - const timeout = 15 // 15 seconds - - failing := atomic.LoadUint32(&n.failing) - if failing == 0 { - return false - } - if time.Now().Unix()-int64(failing) < timeout { - return true - } - atomic.StoreUint32(&n.failing, 0) - return false -} - -func (n *clusterNode) Generation() uint32 { - return atomic.LoadUint32(&n.generation) -} - -func (n *clusterNode) SetGeneration(gen uint32) { - for { - v := atomic.LoadUint32(&n.generation) - if gen < v || atomic.CompareAndSwapUint32(&n.generation, v, gen) { - break - } - } -} - -//------------------------------------------------------------------------------ - -type clusterNodes struct { - opt *ClusterOptions - - mu sync.RWMutex - allAddrs []string - allNodes map[string]*clusterNode - clusterAddrs []string - closed bool - - _generation uint32 // atomic -} - -func newClusterNodes(opt *ClusterOptions) *clusterNodes { - return &clusterNodes{ - opt: opt, - - allAddrs: opt.Addrs, - allNodes: make(map[string]*clusterNode), - } -} - -func (c *clusterNodes) Close() error { - c.mu.Lock() - defer c.mu.Unlock() - - if c.closed { - return nil - } - c.closed = true - - var firstErr error - for _, node := range c.allNodes { - if err := node.Client.Close(); err != nil && firstErr == nil { - firstErr = err - } - } - - c.allNodes = nil - c.clusterAddrs = nil - - return firstErr -} - -func (c *clusterNodes) Addrs() ([]string, error) { - var addrs []string - c.mu.RLock() - closed := c.closed - if !closed { - if len(c.clusterAddrs) > 0 { - addrs = c.clusterAddrs - } else { - addrs = c.allAddrs - } - } - c.mu.RUnlock() - - if closed { - return nil, pool.ErrClosed - } - if len(addrs) == 0 { - return nil, errClusterNoNodes - } - return addrs, nil -} - -func (c *clusterNodes) NextGeneration() uint32 { - return atomic.AddUint32(&c._generation, 1) -} - -// GC removes unused nodes. -func (c *clusterNodes) GC(generation uint32) { - //nolint:prealloc - var collected []*clusterNode - c.mu.Lock() - for addr, node := range c.allNodes { - if node.Generation() >= generation { - continue - } - - c.clusterAddrs = remove(c.clusterAddrs, addr) - delete(c.allNodes, addr) - collected = append(collected, node) - } - c.mu.Unlock() - - for _, node := range collected { - _ = node.Client.Close() - } -} - -func (c *clusterNodes) Get(addr string) (*clusterNode, error) { - node, err := c.get(addr) - if err != nil { - return nil, err - } - if node != nil { - return node, nil - } - - c.mu.Lock() - defer c.mu.Unlock() - - if c.closed { - return nil, pool.ErrClosed - } - - node, ok := c.allNodes[addr] - if ok { - return node, err - } - - node = newClusterNode(c.opt, addr) - - c.allAddrs = appendIfNotExists(c.allAddrs, addr) - c.clusterAddrs = append(c.clusterAddrs, addr) - c.allNodes[addr] = node - - return node, err -} - -func (c *clusterNodes) get(addr string) (*clusterNode, error) { - var node *clusterNode - var err error - c.mu.RLock() - if c.closed { - err = pool.ErrClosed - } else { - node = c.allNodes[addr] - } - c.mu.RUnlock() - return node, err -} - -func (c *clusterNodes) All() ([]*clusterNode, error) { - c.mu.RLock() - defer c.mu.RUnlock() - - if c.closed { - return nil, pool.ErrClosed - } - - cp := make([]*clusterNode, 0, len(c.allNodes)) - for _, node := range c.allNodes { - cp = append(cp, node) - } - return cp, nil -} - -func (c *clusterNodes) Random() (*clusterNode, error) { - addrs, err := c.Addrs() - if err != nil { - return nil, err - } - - n := rand.Intn(len(addrs)) - return c.Get(addrs[n]) -} - -//------------------------------------------------------------------------------ - -type clusterSlot struct { - start, end int - nodes []*clusterNode -} - -type clusterSlotSlice []*clusterSlot - -func (p clusterSlotSlice) Len() int { - return len(p) -} - -func (p clusterSlotSlice) Less(i, j int) bool { - return p[i].start < p[j].start -} - -func (p clusterSlotSlice) Swap(i, j int) { - p[i], p[j] = p[j], p[i] -} - -type clusterState struct { - nodes *clusterNodes - Masters []*clusterNode - Slaves []*clusterNode - - slots []*clusterSlot - - generation uint32 - createdAt time.Time -} - -func newClusterState( - nodes *clusterNodes, slots []ClusterSlot, origin string, -) (*clusterState, error) { - c := clusterState{ - nodes: nodes, - - slots: make([]*clusterSlot, 0, len(slots)), - - generation: nodes.NextGeneration(), - createdAt: time.Now(), - } - - originHost, _, _ := net.SplitHostPort(origin) - isLoopbackOrigin := isLoopback(originHost) - - for _, slot := range slots { - var nodes []*clusterNode - for i, slotNode := range slot.Nodes { - addr := slotNode.Addr - if !isLoopbackOrigin { - addr = replaceLoopbackHost(addr, originHost) - } - - node, err := c.nodes.Get(addr) - if err != nil { - return nil, err - } - - node.SetGeneration(c.generation) - nodes = append(nodes, node) - - if i == 0 { - c.Masters = appendUniqueNode(c.Masters, node) - } else { - c.Slaves = appendUniqueNode(c.Slaves, node) - } - } - - c.slots = append(c.slots, &clusterSlot{ - start: slot.Start, - end: slot.End, - nodes: nodes, - }) - } - - sort.Sort(clusterSlotSlice(c.slots)) - - time.AfterFunc(time.Minute, func() { - nodes.GC(c.generation) - }) - - return &c, nil -} - -func replaceLoopbackHost(nodeAddr, originHost string) string { - nodeHost, nodePort, err := net.SplitHostPort(nodeAddr) - if err != nil { - return nodeAddr - } - - nodeIP := net.ParseIP(nodeHost) - if nodeIP == nil { - return nodeAddr - } - - if !nodeIP.IsLoopback() { - return nodeAddr - } - - // Use origin host which is not loopback and node port. - return net.JoinHostPort(originHost, nodePort) -} - -func isLoopback(host string) bool { - ip := net.ParseIP(host) - if ip == nil { - return true - } - return ip.IsLoopback() -} - -func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) { - nodes := c.slotNodes(slot) - if len(nodes) > 0 { - return nodes[0], nil - } - return c.nodes.Random() -} - -func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) { - nodes := c.slotNodes(slot) - switch len(nodes) { - case 0: - return c.nodes.Random() - case 1: - return nodes[0], nil - case 2: - if slave := nodes[1]; !slave.Failing() { - return slave, nil - } - return nodes[0], nil - default: - var slave *clusterNode - for i := 0; i < 10; i++ { - n := rand.Intn(len(nodes)-1) + 1 - slave = nodes[n] - if !slave.Failing() { - return slave, nil - } - } - - // All slaves are loading - use master. - return nodes[0], nil - } -} - -func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) { - const threshold = time.Millisecond - - nodes := c.slotNodes(slot) - if len(nodes) == 0 { - return c.nodes.Random() - } - - var node *clusterNode - for _, n := range nodes { - if n.Failing() { - continue - } - if node == nil || node.Latency()-n.Latency() > threshold { - node = n - } - } - return node, nil -} - -func (c *clusterState) slotRandomNode(slot int) (*clusterNode, error) { - nodes := c.slotNodes(slot) - if len(nodes) == 0 { - return c.nodes.Random() - } - n := rand.Intn(len(nodes)) - return nodes[n], nil -} - -func (c *clusterState) slotNodes(slot int) []*clusterNode { - i := sort.Search(len(c.slots), func(i int) bool { - return c.slots[i].end >= slot - }) - if i >= len(c.slots) { - return nil - } - x := c.slots[i] - if slot >= x.start && slot <= x.end { - return x.nodes - } - return nil -} - -//------------------------------------------------------------------------------ - -type clusterStateHolder struct { - load func() (*clusterState, error) - - state atomic.Value - reloading uint32 // atomic -} - -func newClusterStateHolder(fn func() (*clusterState, error)) *clusterStateHolder { - return &clusterStateHolder{ - load: fn, - } -} - -func (c *clusterStateHolder) Reload() (*clusterState, error) { - state, err := c.load() - if err != nil { - return nil, err - } - c.state.Store(state) - return state, nil -} - -func (c *clusterStateHolder) LazyReload() { - if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) { - return - } - go func() { - defer atomic.StoreUint32(&c.reloading, 0) - - _, err := c.Reload() - if err != nil { - return - } - time.Sleep(100 * time.Millisecond) - }() -} - -func (c *clusterStateHolder) Get() (*clusterState, error) { - v := c.state.Load() - if v != nil { - state := v.(*clusterState) - if time.Since(state.createdAt) > time.Minute { - c.LazyReload() - } - return state, nil - } - return c.Reload() -} - -func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) { - state, err := c.Reload() - if err == nil { - return state, nil - } - return c.Get() -} - -//------------------------------------------------------------------------------ - -type clusterClient struct { - opt *ClusterOptions - nodes *clusterNodes - state *clusterStateHolder //nolint:structcheck - cmdsInfoCache *cmdsInfoCache //nolint:structcheck -} - -// ClusterClient is a Redis Cluster client representing a pool of zero -// or more underlying connections. It's safe for concurrent use by -// multiple goroutines. -type ClusterClient struct { - *clusterClient - cmdable - hooks - ctx context.Context -} - -// NewClusterClient returns a Redis Cluster client as described in -// http://redis.io/topics/cluster-spec. -func NewClusterClient(opt *ClusterOptions) *ClusterClient { - opt.init() - - c := &ClusterClient{ - clusterClient: &clusterClient{ - opt: opt, - nodes: newClusterNodes(opt), - }, - ctx: context.Background(), - } - c.state = newClusterStateHolder(c.loadState) - c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo) - c.cmdable = c.Process - - if opt.IdleCheckFrequency > 0 { - go c.reaper(opt.IdleCheckFrequency) - } - - return c -} - -func (c *ClusterClient) Context() context.Context { - return c.ctx -} - -func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient { - if ctx == nil { - panic("nil context") - } - clone := *c - clone.cmdable = clone.Process - clone.hooks.lock() - clone.ctx = ctx - return &clone -} - -// Options returns read-only Options that were used to create the client. -func (c *ClusterClient) Options() *ClusterOptions { - return c.opt -} - -// ReloadState reloads cluster state. If available it calls ClusterSlots func -// to get cluster slots information. -func (c *ClusterClient) ReloadState() error { - _, err := c.state.Reload() - return err -} - -// Close closes the cluster client, releasing any open resources. -// -// It is rare to Close a ClusterClient, as the ClusterClient is meant -// to be long-lived and shared between many goroutines. -func (c *ClusterClient) Close() error { - return c.nodes.Close() -} - -// Do creates a Cmd from the args and processes the cmd. -func (c *ClusterClient) Do(args ...interface{}) *Cmd { - return c.DoContext(c.ctx, args...) -} - -func (c *ClusterClient) DoContext(ctx context.Context, args ...interface{}) *Cmd { - cmd := NewCmd(args...) - _ = c.ProcessContext(ctx, cmd) - return cmd -} - -func (c *ClusterClient) Process(cmd Cmder) error { - return c.ProcessContext(c.ctx, cmd) -} - -func (c *ClusterClient) ProcessContext(ctx context.Context, cmd Cmder) error { - return c.hooks.process(ctx, cmd, c.process) -} - -func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error { - err := c._process(ctx, cmd) - if err != nil { - cmd.SetErr(err) - return err - } - return nil -} - -func (c *ClusterClient) _process(ctx context.Context, cmd Cmder) error { - cmdInfo := c.cmdInfo(cmd.Name()) - slot := c.cmdSlot(cmd) - - var node *clusterNode - var ask bool - var lastErr error - for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { - if attempt > 0 { - if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { - return err - } - } - - if node == nil { - var err error - node, err = c.cmdNode(cmdInfo, slot) - if err != nil { - return err - } - } - - if ask { - pipe := node.Client.Pipeline() - _ = pipe.Process(NewCmd("asking")) - _ = pipe.Process(cmd) - _, lastErr = pipe.ExecContext(ctx) - _ = pipe.Close() - ask = false - } else { - lastErr = node.Client.ProcessContext(ctx, cmd) - } - - // If there is no error - we are done. - if lastErr == nil { - return nil - } - if lastErr != Nil { - c.state.LazyReload() - } - if lastErr == pool.ErrClosed || isReadOnlyError(lastErr) { - node = nil - continue - } - - // If slave is loading - pick another node. - if c.opt.ReadOnly && isLoadingError(lastErr) { - node.MarkAsFailing() - node = nil - continue - } - - var moved bool - var addr string - moved, ask, addr = isMovedError(lastErr) - if moved || ask { - var err error - node, err = c.nodes.Get(addr) - if err != nil { - return err - } - continue - } - - if isRetryableError(lastErr, cmd.readTimeout() == nil) { - // First retry the same node. - if attempt == 0 { - continue - } - - // Second try another node. - node.MarkAsFailing() - node = nil - continue - } - - return lastErr - } - return lastErr -} - -// ForEachMaster concurrently calls the fn on each master node in the cluster. -// It returns the first error if any. -func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { - state, err := c.state.ReloadOrGet() - if err != nil { - return err - } - - var wg sync.WaitGroup - errCh := make(chan error, 1) - - for _, master := range state.Masters { - wg.Add(1) - go func(node *clusterNode) { - defer wg.Done() - err := fn(node.Client) - if err != nil { - select { - case errCh <- err: - default: - } - } - }(master) - } - - wg.Wait() - - select { - case err := <-errCh: - return err - default: - return nil - } -} - -// ForEachSlave concurrently calls the fn on each slave node in the cluster. -// It returns the first error if any. -func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error { - state, err := c.state.ReloadOrGet() - if err != nil { - return err - } - - var wg sync.WaitGroup - errCh := make(chan error, 1) - - for _, slave := range state.Slaves { - wg.Add(1) - go func(node *clusterNode) { - defer wg.Done() - err := fn(node.Client) - if err != nil { - select { - case errCh <- err: - default: - } - } - }(slave) - } - - wg.Wait() - - select { - case err := <-errCh: - return err - default: - return nil - } -} - -// ForEachNode concurrently calls the fn on each known node in the cluster. -// It returns the first error if any. -func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error { - state, err := c.state.ReloadOrGet() - if err != nil { - return err - } - - var wg sync.WaitGroup - errCh := make(chan error, 1) - - worker := func(node *clusterNode) { - defer wg.Done() - err := fn(node.Client) - if err != nil { - select { - case errCh <- err: - default: - } - } - } - - for _, node := range state.Masters { - wg.Add(1) - go worker(node) - } - for _, node := range state.Slaves { - wg.Add(1) - go worker(node) - } - - wg.Wait() - - select { - case err := <-errCh: - return err - default: - return nil - } -} - -// PoolStats returns accumulated connection pool stats. -func (c *ClusterClient) PoolStats() *PoolStats { - var acc PoolStats - - state, _ := c.state.Get() - if state == nil { - return &acc - } - - for _, node := range state.Masters { - s := node.Client.connPool.Stats() - acc.Hits += s.Hits - acc.Misses += s.Misses - acc.Timeouts += s.Timeouts - - acc.TotalConns += s.TotalConns - acc.IdleConns += s.IdleConns - acc.StaleConns += s.StaleConns - } - - for _, node := range state.Slaves { - s := node.Client.connPool.Stats() - acc.Hits += s.Hits - acc.Misses += s.Misses - acc.Timeouts += s.Timeouts - - acc.TotalConns += s.TotalConns - acc.IdleConns += s.IdleConns - acc.StaleConns += s.StaleConns - } - - return &acc -} - -func (c *ClusterClient) loadState() (*clusterState, error) { - if c.opt.ClusterSlots != nil { - slots, err := c.opt.ClusterSlots() - if err != nil { - return nil, err - } - return newClusterState(c.nodes, slots, "") - } - - addrs, err := c.nodes.Addrs() - if err != nil { - return nil, err - } - - var firstErr error - for _, addr := range addrs { - node, err := c.nodes.Get(addr) - if err != nil { - if firstErr == nil { - firstErr = err - } - continue - } - - slots, err := node.Client.ClusterSlots().Result() - if err != nil { - if firstErr == nil { - firstErr = err - } - continue - } - - return newClusterState(c.nodes, slots, node.Client.opt.Addr) - } - - return nil, firstErr -} - -// reaper closes idle connections to the cluster. -func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) { - ticker := time.NewTicker(idleCheckFrequency) - defer ticker.Stop() - - for range ticker.C { - nodes, err := c.nodes.All() - if err != nil { - break - } - - for _, node := range nodes { - _, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns() - if err != nil { - internal.Logger.Printf("ReapStaleConns failed: %s", err) - } - } - } -} - -func (c *ClusterClient) Pipeline() Pipeliner { - pipe := Pipeline{ - ctx: c.ctx, - exec: c.processPipeline, - } - pipe.init() - return &pipe -} - -func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { - return c.Pipeline().Pipelined(fn) -} - -func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error { - return c.hooks.processPipeline(ctx, cmds, c._processPipeline) -} - -func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) error { - cmdsMap := newCmdsMap() - err := c.mapCmdsByNode(cmdsMap, cmds) - if err != nil { - setCmdsErr(cmds, err) - return err - } - - for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { - if attempt > 0 { - if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { - setCmdsErr(cmds, err) - return err - } - } - - failedCmds := newCmdsMap() - var wg sync.WaitGroup - - for node, cmds := range cmdsMap.m { - wg.Add(1) - go func(node *clusterNode, cmds []Cmder) { - defer wg.Done() - - err := c._processPipelineNode(ctx, node, cmds, failedCmds) - if err == nil { - return - } - if attempt < c.opt.MaxRedirects { - if err := c.mapCmdsByNode(failedCmds, cmds); err != nil { - setCmdsErr(cmds, err) - } - } else { - setCmdsErr(cmds, err) - } - }(node, cmds) - } - - wg.Wait() - if len(failedCmds.m) == 0 { - break - } - cmdsMap = failedCmds - } - - return cmdsFirstErr(cmds) -} - -func (c *ClusterClient) mapCmdsByNode(cmdsMap *cmdsMap, cmds []Cmder) error { - state, err := c.state.Get() - if err != nil { - return err - } - - if c.opt.ReadOnly && c.cmdsAreReadOnly(cmds) { - for _, cmd := range cmds { - slot := c.cmdSlot(cmd) - node, err := c.slotReadOnlyNode(state, slot) - if err != nil { - return err - } - cmdsMap.Add(node, cmd) - } - return nil - } - - for _, cmd := range cmds { - slot := c.cmdSlot(cmd) - node, err := state.slotMasterNode(slot) - if err != nil { - return err - } - cmdsMap.Add(node, cmd) - } - return nil -} - -func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool { - for _, cmd := range cmds { - cmdInfo := c.cmdInfo(cmd.Name()) - if cmdInfo == nil || !cmdInfo.ReadOnly { - return false - } - } - return true -} - -func (c *ClusterClient) _processPipelineNode( - ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap, -) error { - return node.Client.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error { - return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { - err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error { - return writeCmds(wr, cmds) - }) - if err != nil { - return err - } - - return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error { - return c.pipelineReadCmds(node, rd, cmds, failedCmds) - }) - }) - }) -} - -func (c *ClusterClient) pipelineReadCmds( - node *clusterNode, rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap, -) error { - for _, cmd := range cmds { - err := cmd.readReply(rd) - if err == nil { - continue - } - if c.checkMovedErr(cmd, err, failedCmds) { - continue - } - - if c.opt.ReadOnly && isLoadingError(err) { - node.MarkAsFailing() - return err - } - if isRedisError(err) { - continue - } - return err - } - return nil -} - -func (c *ClusterClient) checkMovedErr( - cmd Cmder, err error, failedCmds *cmdsMap, -) bool { - moved, ask, addr := isMovedError(err) - if !moved && !ask { - return false - } - - node, err := c.nodes.Get(addr) - if err != nil { - return false - } - - if moved { - c.state.LazyReload() - failedCmds.Add(node, cmd) - return true - } - - if ask { - failedCmds.Add(node, NewCmd("asking"), cmd) - return true - } - - panic("not reached") -} - -// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. -func (c *ClusterClient) TxPipeline() Pipeliner { - pipe := Pipeline{ - ctx: c.ctx, - exec: c.processTxPipeline, - } - pipe.init() - return &pipe -} - -func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { - return c.TxPipeline().Pipelined(fn) -} - -func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) error { - return c.hooks.processPipeline(ctx, cmds, c._processTxPipeline) -} - -func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) error { - state, err := c.state.Get() - if err != nil { - setCmdsErr(cmds, err) - return err - } - - cmdsMap := c.mapCmdsBySlot(cmds) - for slot, cmds := range cmdsMap { - node, err := state.slotMasterNode(slot) - if err != nil { - setCmdsErr(cmds, err) - continue - } - - cmdsMap := map[*clusterNode][]Cmder{node: cmds} - for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { - if attempt > 0 { - if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { - setCmdsErr(cmds, err) - return err - } - } - - failedCmds := newCmdsMap() - var wg sync.WaitGroup - - for node, cmds := range cmdsMap { - wg.Add(1) - go func(node *clusterNode, cmds []Cmder) { - defer wg.Done() - - err := c._processTxPipelineNode(ctx, node, cmds, failedCmds) - if err == nil { - return - } - if attempt < c.opt.MaxRedirects { - if err := c.mapCmdsByNode(failedCmds, cmds); err != nil { - setCmdsErr(cmds, err) - } - } else { - setCmdsErr(cmds, err) - } - }(node, cmds) - } - - wg.Wait() - if len(failedCmds.m) == 0 { - break - } - cmdsMap = failedCmds.m - } - } - - return cmdsFirstErr(cmds) -} - -func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { - cmdsMap := make(map[int][]Cmder) - for _, cmd := range cmds { - slot := c.cmdSlot(cmd) - cmdsMap[slot] = append(cmdsMap[slot], cmd) - } - return cmdsMap -} - -func (c *ClusterClient) _processTxPipelineNode( - ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap, -) error { - return node.Client.hooks.processTxPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error { - return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { - err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error { - return writeCmds(wr, cmds) - }) - if err != nil { - return err - } - - return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error { - statusCmd := cmds[0].(*StatusCmd) - // Trim multi and exec. - cmds = cmds[1 : len(cmds)-1] - - err := c.txPipelineReadQueued(rd, statusCmd, cmds, failedCmds) - if err != nil { - moved, ask, addr := isMovedError(err) - if moved || ask { - return c.cmdsMoved(cmds, moved, ask, addr, failedCmds) - } - return err - } - - return pipelineReadCmds(rd, cmds) - }) - }) - }) -} - -func (c *ClusterClient) txPipelineReadQueued( - rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder, failedCmds *cmdsMap, -) error { - // Parse queued replies. - if err := statusCmd.readReply(rd); err != nil { - return err - } - - for _, cmd := range cmds { - err := statusCmd.readReply(rd) - if err == nil || c.checkMovedErr(cmd, err, failedCmds) || isRedisError(err) { - continue - } - return err - } - - // Parse number of replies. - line, err := rd.ReadLine() - if err != nil { - if err == Nil { - err = TxFailedErr - } - return err - } - - switch line[0] { - case proto.ErrorReply: - return proto.ParseErrorReply(line) - case proto.ArrayReply: - // ok - default: - return fmt.Errorf("redis: expected '*', but got line %q", line) - } - - return nil -} - -func (c *ClusterClient) cmdsMoved( - cmds []Cmder, moved, ask bool, addr string, failedCmds *cmdsMap, -) error { - node, err := c.nodes.Get(addr) - if err != nil { - return err - } - - if moved { - c.state.LazyReload() - for _, cmd := range cmds { - failedCmds.Add(node, cmd) - } - return nil - } - - if ask { - for _, cmd := range cmds { - failedCmds.Add(node, NewCmd("asking"), cmd) - } - return nil - } - - return nil -} - -func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { - return c.WatchContext(c.ctx, fn, keys...) -} - -func (c *ClusterClient) WatchContext(ctx context.Context, fn func(*Tx) error, keys ...string) error { - if len(keys) == 0 { - return fmt.Errorf("redis: Watch requires at least one key") - } - - slot := hashtag.Slot(keys[0]) - for _, key := range keys[1:] { - if hashtag.Slot(key) != slot { - err := fmt.Errorf("redis: Watch requires all keys to be in the same slot") - return err - } - } - - node, err := c.slotMasterNode(slot) - if err != nil { - return err - } - - for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { - if attempt > 0 { - if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { - return err - } - } - - err = node.Client.WatchContext(ctx, fn, keys...) - if err == nil { - break - } - if err != Nil { - c.state.LazyReload() - } - - moved, ask, addr := isMovedError(err) - if moved || ask { - node, err = c.nodes.Get(addr) - if err != nil { - return err - } - continue - } - - if err == pool.ErrClosed || isReadOnlyError(err) { - node, err = c.slotMasterNode(slot) - if err != nil { - return err - } - continue - } - - if isRetryableError(err, true) { - continue - } - - return err - } - - return err -} - -func (c *ClusterClient) pubSub() *PubSub { - var node *clusterNode - pubsub := &PubSub{ - opt: c.opt.clientOptions(), - - newConn: func(channels []string) (*pool.Conn, error) { - if node != nil { - panic("node != nil") - } - - var err error - if len(channels) > 0 { - slot := hashtag.Slot(channels[0]) - node, err = c.slotMasterNode(slot) - } else { - node, err = c.nodes.Random() - } - if err != nil { - return nil, err - } - - cn, err := node.Client.newConn(context.TODO()) - if err != nil { - node = nil - - return nil, err - } - - return cn, nil - }, - closeConn: func(cn *pool.Conn) error { - err := node.Client.connPool.CloseConn(cn) - node = nil - return err - }, - } - pubsub.init() - - return pubsub -} - -// Subscribe subscribes the client to the specified channels. -// Channels can be omitted to create empty subscription. -func (c *ClusterClient) Subscribe(channels ...string) *PubSub { - pubsub := c.pubSub() - if len(channels) > 0 { - _ = pubsub.Subscribe(channels...) - } - return pubsub -} - -// PSubscribe subscribes the client to the given patterns. -// Patterns can be omitted to create empty subscription. -func (c *ClusterClient) PSubscribe(channels ...string) *PubSub { - pubsub := c.pubSub() - if len(channels) > 0 { - _ = pubsub.PSubscribe(channels...) - } - return pubsub -} - -func (c *ClusterClient) retryBackoff(attempt int) time.Duration { - return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff) -} - -func (c *ClusterClient) cmdsInfo() (map[string]*CommandInfo, error) { - addrs, err := c.nodes.Addrs() - if err != nil { - return nil, err - } - - var firstErr error - for _, addr := range addrs { - node, err := c.nodes.Get(addr) - if err != nil { - return nil, err - } - if node == nil { - continue - } - - info, err := node.Client.Command().Result() - if err == nil { - return info, nil - } - if firstErr == nil { - firstErr = err - } - } - return nil, firstErr -} - -func (c *ClusterClient) cmdInfo(name string) *CommandInfo { - cmdsInfo, err := c.cmdsInfoCache.Get() - if err != nil { - return nil - } - - info := cmdsInfo[name] - if info == nil { - internal.Logger.Printf("info for cmd=%s not found", name) - } - return info -} - -func (c *ClusterClient) cmdSlot(cmd Cmder) int { - args := cmd.Args() - if args[0] == "cluster" && args[1] == "getkeysinslot" { - return args[2].(int) - } - - cmdInfo := c.cmdInfo(cmd.Name()) - return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo)) -} - -func cmdSlot(cmd Cmder, pos int) int { - if pos == 0 { - return hashtag.RandomSlot() - } - firstKey := cmd.stringArg(pos) - return hashtag.Slot(firstKey) -} - -func (c *ClusterClient) cmdNode(cmdInfo *CommandInfo, slot int) (*clusterNode, error) { - state, err := c.state.Get() - if err != nil { - return nil, err - } - - if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly { - return c.slotReadOnlyNode(state, slot) - } - return state.slotMasterNode(slot) -} - -func (c *clusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) { - if c.opt.RouteByLatency { - return state.slotClosestNode(slot) - } - if c.opt.RouteRandomly { - return state.slotRandomNode(slot) - } - return state.slotSlaveNode(slot) -} - -func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) { - state, err := c.state.Get() - if err != nil { - return nil, err - } - return state.slotMasterNode(slot) -} - -func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode { - for _, n := range nodes { - if n == node { - return nodes - } - } - return append(nodes, node) -} - -func appendIfNotExists(ss []string, es ...string) []string { -loop: - for _, e := range es { - for _, s := range ss { - if s == e { - continue loop - } - } - ss = append(ss, e) - } - return ss -} - -func remove(ss []string, es ...string) []string { - if len(es) == 0 { - return ss[:0] - } - for _, e := range es { - for i, s := range ss { - if s == e { - ss = append(ss[:i], ss[i+1:]...) - break - } - } - } - return ss -} - -//------------------------------------------------------------------------------ - -type cmdsMap struct { - mu sync.Mutex - m map[*clusterNode][]Cmder -} - -func newCmdsMap() *cmdsMap { - return &cmdsMap{ - m: make(map[*clusterNode][]Cmder), - } -} - -func (m *cmdsMap) Add(node *clusterNode, cmds ...Cmder) { - m.mu.Lock() - m.m[node] = append(m.m[node], cmds...) - m.mu.Unlock() -} diff --git a/vendor/github.com/go-redis/redis/v7/cluster_commands.go b/vendor/github.com/go-redis/redis/v7/cluster_commands.go deleted file mode 100644 index c9b9b9de24..0000000000 --- a/vendor/github.com/go-redis/redis/v7/cluster_commands.go +++ /dev/null @@ -1,22 +0,0 @@ -package redis - -import "sync/atomic" - -func (c *ClusterClient) DBSize() *IntCmd { - cmd := NewIntCmd("dbsize") - var size int64 - err := c.ForEachMaster(func(master *Client) error { - n, err := master.DBSize().Result() - if err != nil { - return err - } - atomic.AddInt64(&size, n) - return nil - }) - if err != nil { - cmd.SetErr(err) - return cmd - } - cmd.val = size - return cmd -} diff --git a/vendor/github.com/go-redis/redis/v7/command.go b/vendor/github.com/go-redis/redis/v7/command.go deleted file mode 100644 index dd7fe4a91e..0000000000 --- a/vendor/github.com/go-redis/redis/v7/command.go +++ /dev/null @@ -1,2064 +0,0 @@ -package redis - -import ( - "fmt" - "net" - "strconv" - "strings" - "time" - - "github.com/go-redis/redis/v7/internal" - "github.com/go-redis/redis/v7/internal/proto" - "github.com/go-redis/redis/v7/internal/util" -) - -type Cmder interface { - Name() string - Args() []interface{} - String() string - stringArg(int) string - - readTimeout() *time.Duration - readReply(rd *proto.Reader) error - - SetErr(error) - Err() error -} - -func setCmdsErr(cmds []Cmder, e error) { - for _, cmd := range cmds { - if cmd.Err() == nil { - cmd.SetErr(e) - } - } -} - -func cmdsFirstErr(cmds []Cmder) error { - for _, cmd := range cmds { - if err := cmd.Err(); err != nil { - return err - } - } - return nil -} - -func writeCmds(wr *proto.Writer, cmds []Cmder) error { - for _, cmd := range cmds { - if err := writeCmd(wr, cmd); err != nil { - return err - } - } - return nil -} - -func writeCmd(wr *proto.Writer, cmd Cmder) error { - return wr.WriteArgs(cmd.Args()) -} - -func cmdString(cmd Cmder, val interface{}) string { - ss := make([]string, 0, len(cmd.Args())) - for _, arg := range cmd.Args() { - ss = append(ss, fmt.Sprint(arg)) - } - s := strings.Join(ss, " ") - if err := cmd.Err(); err != nil { - return s + ": " + err.Error() - } - if val != nil { - switch vv := val.(type) { - case []byte: - return s + ": " + string(vv) - default: - return s + ": " + fmt.Sprint(val) - } - } - return s -} - -func cmdFirstKeyPos(cmd Cmder, info *CommandInfo) int { - switch cmd.Name() { - case "eval", "evalsha": - if cmd.stringArg(2) != "0" { - return 3 - } - - return 0 - case "publish": - return 1 - } - if info == nil { - return 0 - } - return int(info.FirstKeyPos) -} - -//------------------------------------------------------------------------------ - -type baseCmd struct { - args []interface{} - err error - - _readTimeout *time.Duration -} - -var _ Cmder = (*Cmd)(nil) - -func (cmd *baseCmd) Name() string { - if len(cmd.args) == 0 { - return "" - } - // Cmd name must be lower cased. - return internal.ToLower(cmd.stringArg(0)) -} - -func (cmd *baseCmd) Args() []interface{} { - return cmd.args -} - -func (cmd *baseCmd) stringArg(pos int) string { - if pos < 0 || pos >= len(cmd.args) { - return "" - } - s, _ := cmd.args[pos].(string) - return s -} - -func (cmd *baseCmd) SetErr(e error) { - cmd.err = e -} - -func (cmd *baseCmd) Err() error { - return cmd.err -} - -func (cmd *baseCmd) readTimeout() *time.Duration { - return cmd._readTimeout -} - -func (cmd *baseCmd) setReadTimeout(d time.Duration) { - cmd._readTimeout = &d -} - -//------------------------------------------------------------------------------ - -type Cmd struct { - baseCmd - - val interface{} -} - -func NewCmd(args ...interface{}) *Cmd { - return &Cmd{ - baseCmd: baseCmd{args: args}, - } -} - -func (cmd *Cmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *Cmd) Val() interface{} { - return cmd.val -} - -func (cmd *Cmd) Result() (interface{}, error) { - return cmd.val, cmd.err -} - -func (cmd *Cmd) Text() (string, error) { - if cmd.err != nil { - return "", cmd.err - } - switch val := cmd.val.(type) { - case string: - return val, nil - default: - err := fmt.Errorf("redis: unexpected type=%T for String", val) - return "", err - } -} - -func (cmd *Cmd) Int() (int, error) { - if cmd.err != nil { - return 0, cmd.err - } - switch val := cmd.val.(type) { - case int64: - return int(val), nil - case string: - return strconv.Atoi(val) - default: - err := fmt.Errorf("redis: unexpected type=%T for Int", val) - return 0, err - } -} - -func (cmd *Cmd) Int64() (int64, error) { - if cmd.err != nil { - return 0, cmd.err - } - switch val := cmd.val.(type) { - case int64: - return val, nil - case string: - return strconv.ParseInt(val, 10, 64) - default: - err := fmt.Errorf("redis: unexpected type=%T for Int64", val) - return 0, err - } -} - -func (cmd *Cmd) Uint64() (uint64, error) { - if cmd.err != nil { - return 0, cmd.err - } - switch val := cmd.val.(type) { - case int64: - return uint64(val), nil - case string: - return strconv.ParseUint(val, 10, 64) - default: - err := fmt.Errorf("redis: unexpected type=%T for Uint64", val) - return 0, err - } -} - -func (cmd *Cmd) Float32() (float32, error) { - if cmd.err != nil { - return 0, cmd.err - } - switch val := cmd.val.(type) { - case int64: - return float32(val), nil - case string: - f, err := strconv.ParseFloat(val, 32) - if err != nil { - return 0, err - } - return float32(f), nil - default: - err := fmt.Errorf("redis: unexpected type=%T for Float32", val) - return 0, err - } -} - -func (cmd *Cmd) Float64() (float64, error) { - if cmd.err != nil { - return 0, cmd.err - } - switch val := cmd.val.(type) { - case int64: - return float64(val), nil - case string: - return strconv.ParseFloat(val, 64) - default: - err := fmt.Errorf("redis: unexpected type=%T for Float64", val) - return 0, err - } -} - -func (cmd *Cmd) Bool() (bool, error) { - if cmd.err != nil { - return false, cmd.err - } - switch val := cmd.val.(type) { - case int64: - return val != 0, nil - case string: - return strconv.ParseBool(val) - default: - err := fmt.Errorf("redis: unexpected type=%T for Bool", val) - return false, err - } -} - -func (cmd *Cmd) readReply(rd *proto.Reader) error { - cmd.val, cmd.err = rd.ReadReply(sliceParser) - return cmd.err -} - -// Implements proto.MultiBulkParse -func sliceParser(rd *proto.Reader, n int64) (interface{}, error) { - vals := make([]interface{}, n) - for i := 0; i < len(vals); i++ { - v, err := rd.ReadReply(sliceParser) - if err != nil { - if err == Nil { - vals[i] = nil - continue - } - if err, ok := err.(proto.RedisError); ok { - vals[i] = err - continue - } - return nil, err - } - vals[i] = v - } - return vals, nil -} - -//------------------------------------------------------------------------------ - -type SliceCmd struct { - baseCmd - - val []interface{} -} - -var _ Cmder = (*SliceCmd)(nil) - -func NewSliceCmd(args ...interface{}) *SliceCmd { - return &SliceCmd{ - baseCmd: baseCmd{args: args}, - } -} - -func (cmd *SliceCmd) Val() []interface{} { - return cmd.val -} - -func (cmd *SliceCmd) Result() ([]interface{}, error) { - return cmd.val, cmd.err -} - -func (cmd *SliceCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *SliceCmd) readReply(rd *proto.Reader) error { - var v interface{} - v, cmd.err = rd.ReadArrayReply(sliceParser) - if cmd.err != nil { - return cmd.err - } - cmd.val = v.([]interface{}) - return nil -} - -//------------------------------------------------------------------------------ - -type StatusCmd struct { - baseCmd - - val string -} - -var _ Cmder = (*StatusCmd)(nil) - -func NewStatusCmd(args ...interface{}) *StatusCmd { - return &StatusCmd{ - baseCmd: baseCmd{args: args}, - } -} - -func (cmd *StatusCmd) Val() string { - return cmd.val -} - -func (cmd *StatusCmd) Result() (string, error) { - return cmd.val, cmd.err -} - -func (cmd *StatusCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *StatusCmd) readReply(rd *proto.Reader) error { - cmd.val, cmd.err = rd.ReadString() - return cmd.err -} - -//------------------------------------------------------------------------------ - -type IntCmd struct { - baseCmd - - val int64 -} - -var _ Cmder = (*IntCmd)(nil) - -func NewIntCmd(args ...interface{}) *IntCmd { - return &IntCmd{ - baseCmd: baseCmd{args: args}, - } -} - -func (cmd *IntCmd) Val() int64 { - return cmd.val -} - -func (cmd *IntCmd) Result() (int64, error) { - return cmd.val, cmd.err -} - -func (cmd *IntCmd) Uint64() (uint64, error) { - return uint64(cmd.val), cmd.err -} - -func (cmd *IntCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *IntCmd) readReply(rd *proto.Reader) error { - cmd.val, cmd.err = rd.ReadIntReply() - return cmd.err -} - -//------------------------------------------------------------------------------ - -type IntSliceCmd struct { - baseCmd - - val []int64 -} - -var _ Cmder = (*IntSliceCmd)(nil) - -func NewIntSliceCmd(args ...interface{}) *IntSliceCmd { - return &IntSliceCmd{ - baseCmd: baseCmd{args: args}, - } -} - -func (cmd *IntSliceCmd) Val() []int64 { - return cmd.val -} - -func (cmd *IntSliceCmd) Result() ([]int64, error) { - return cmd.val, cmd.err -} - -func (cmd *IntSliceCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *IntSliceCmd) readReply(rd *proto.Reader) error { - _, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make([]int64, n) - for i := 0; i < len(cmd.val); i++ { - num, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - cmd.val[i] = num - } - return nil, nil - }) - return cmd.err -} - -//------------------------------------------------------------------------------ - -type DurationCmd struct { - baseCmd - - val time.Duration - precision time.Duration -} - -var _ Cmder = (*DurationCmd)(nil) - -func NewDurationCmd(precision time.Duration, args ...interface{}) *DurationCmd { - return &DurationCmd{ - baseCmd: baseCmd{args: args}, - precision: precision, - } -} - -func (cmd *DurationCmd) Val() time.Duration { - return cmd.val -} - -func (cmd *DurationCmd) Result() (time.Duration, error) { - return cmd.val, cmd.err -} - -func (cmd *DurationCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *DurationCmd) readReply(rd *proto.Reader) error { - var n int64 - n, cmd.err = rd.ReadIntReply() - if cmd.err != nil { - return cmd.err - } - switch n { - // -2 if the key does not exist - // -1 if the key exists but has no associated expire - case -2, -1: - cmd.val = time.Duration(n) - default: - cmd.val = time.Duration(n) * cmd.precision - } - return nil -} - -//------------------------------------------------------------------------------ - -type TimeCmd struct { - baseCmd - - val time.Time -} - -var _ Cmder = (*TimeCmd)(nil) - -func NewTimeCmd(args ...interface{}) *TimeCmd { - return &TimeCmd{ - baseCmd: baseCmd{args: args}, - } -} - -func (cmd *TimeCmd) Val() time.Time { - return cmd.val -} - -func (cmd *TimeCmd) Result() (time.Time, error) { - return cmd.val, cmd.err -} - -func (cmd *TimeCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *TimeCmd) readReply(rd *proto.Reader) error { - _, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - if n != 2 { - return nil, fmt.Errorf("got %d elements, expected 2", n) - } - - sec, err := rd.ReadInt() - if err != nil { - return nil, err - } - - microsec, err := rd.ReadInt() - if err != nil { - return nil, err - } - - cmd.val = time.Unix(sec, microsec*1000) - return nil, nil - }) - return cmd.err -} - -//------------------------------------------------------------------------------ - -type BoolCmd struct { - baseCmd - - val bool -} - -var _ Cmder = (*BoolCmd)(nil) - -func NewBoolCmd(args ...interface{}) *BoolCmd { - return &BoolCmd{ - baseCmd: baseCmd{args: args}, - } -} - -func (cmd *BoolCmd) Val() bool { - return cmd.val -} - -func (cmd *BoolCmd) Result() (bool, error) { - return cmd.val, cmd.err -} - -func (cmd *BoolCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *BoolCmd) readReply(rd *proto.Reader) error { - var v interface{} - v, cmd.err = rd.ReadReply(nil) - // `SET key value NX` returns nil when key already exists. But - // `SETNX key value` returns bool (0/1). So convert nil to bool. - if cmd.err == Nil { - cmd.val = false - cmd.err = nil - return nil - } - if cmd.err != nil { - return cmd.err - } - switch v := v.(type) { - case int64: - cmd.val = v == 1 - return nil - case string: - cmd.val = v == "OK" - return nil - default: - cmd.err = fmt.Errorf("got %T, wanted int64 or string", v) - return cmd.err - } -} - -//------------------------------------------------------------------------------ - -type StringCmd struct { - baseCmd - - val string -} - -var _ Cmder = (*StringCmd)(nil) - -func NewStringCmd(args ...interface{}) *StringCmd { - return &StringCmd{ - baseCmd: baseCmd{args: args}, - } -} - -func (cmd *StringCmd) Val() string { - return cmd.val -} - -func (cmd *StringCmd) Result() (string, error) { - return cmd.Val(), cmd.err -} - -func (cmd *StringCmd) Bytes() ([]byte, error) { - return util.StringToBytes(cmd.val), cmd.err -} - -func (cmd *StringCmd) Int() (int, error) { - if cmd.err != nil { - return 0, cmd.err - } - return strconv.Atoi(cmd.Val()) -} - -func (cmd *StringCmd) Int64() (int64, error) { - if cmd.err != nil { - return 0, cmd.err - } - return strconv.ParseInt(cmd.Val(), 10, 64) -} - -func (cmd *StringCmd) Uint64() (uint64, error) { - if cmd.err != nil { - return 0, cmd.err - } - return strconv.ParseUint(cmd.Val(), 10, 64) -} - -func (cmd *StringCmd) Float32() (float32, error) { - if cmd.err != nil { - return 0, cmd.err - } - f, err := strconv.ParseFloat(cmd.Val(), 32) - if err != nil { - return 0, err - } - return float32(f), nil -} - -func (cmd *StringCmd) Float64() (float64, error) { - if cmd.err != nil { - return 0, cmd.err - } - return strconv.ParseFloat(cmd.Val(), 64) -} - -func (cmd *StringCmd) Time() (time.Time, error) { - if cmd.err != nil { - return time.Time{}, cmd.err - } - return time.Parse(time.RFC3339Nano, cmd.Val()) -} - -func (cmd *StringCmd) Scan(val interface{}) error { - if cmd.err != nil { - return cmd.err - } - return proto.Scan([]byte(cmd.val), val) -} - -func (cmd *StringCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *StringCmd) readReply(rd *proto.Reader) error { - cmd.val, cmd.err = rd.ReadString() - return cmd.err -} - -//------------------------------------------------------------------------------ - -type FloatCmd struct { - baseCmd - - val float64 -} - -var _ Cmder = (*FloatCmd)(nil) - -func NewFloatCmd(args ...interface{}) *FloatCmd { - return &FloatCmd{ - baseCmd: baseCmd{args: args}, - } -} - -func (cmd *FloatCmd) Val() float64 { - return cmd.val -} - -func (cmd *FloatCmd) Result() (float64, error) { - return cmd.Val(), cmd.Err() -} - -func (cmd *FloatCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *FloatCmd) readReply(rd *proto.Reader) error { - cmd.val, cmd.err = rd.ReadFloatReply() - return cmd.err -} - -//------------------------------------------------------------------------------ - -type StringSliceCmd struct { - baseCmd - - val []string -} - -var _ Cmder = (*StringSliceCmd)(nil) - -func NewStringSliceCmd(args ...interface{}) *StringSliceCmd { - return &StringSliceCmd{ - baseCmd: baseCmd{args: args}, - } -} - -func (cmd *StringSliceCmd) Val() []string { - return cmd.val -} - -func (cmd *StringSliceCmd) Result() ([]string, error) { - return cmd.Val(), cmd.Err() -} - -func (cmd *StringSliceCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *StringSliceCmd) ScanSlice(container interface{}) error { - return proto.ScanSlice(cmd.Val(), container) -} - -func (cmd *StringSliceCmd) readReply(rd *proto.Reader) error { - _, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make([]string, n) - for i := 0; i < len(cmd.val); i++ { - switch s, err := rd.ReadString(); { - case err == Nil: - cmd.val[i] = "" - case err != nil: - return nil, err - default: - cmd.val[i] = s - } - } - return nil, nil - }) - return cmd.err -} - -//------------------------------------------------------------------------------ - -type BoolSliceCmd struct { - baseCmd - - val []bool -} - -var _ Cmder = (*BoolSliceCmd)(nil) - -func NewBoolSliceCmd(args ...interface{}) *BoolSliceCmd { - return &BoolSliceCmd{ - baseCmd: baseCmd{args: args}, - } -} - -func (cmd *BoolSliceCmd) Val() []bool { - return cmd.val -} - -func (cmd *BoolSliceCmd) Result() ([]bool, error) { - return cmd.val, cmd.err -} - -func (cmd *BoolSliceCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *BoolSliceCmd) readReply(rd *proto.Reader) error { - _, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make([]bool, n) - for i := 0; i < len(cmd.val); i++ { - n, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - cmd.val[i] = n == 1 - } - return nil, nil - }) - return cmd.err -} - -//------------------------------------------------------------------------------ - -type StringStringMapCmd struct { - baseCmd - - val map[string]string -} - -var _ Cmder = (*StringStringMapCmd)(nil) - -func NewStringStringMapCmd(args ...interface{}) *StringStringMapCmd { - return &StringStringMapCmd{ - baseCmd: baseCmd{args: args}, - } -} - -func (cmd *StringStringMapCmd) Val() map[string]string { - return cmd.val -} - -func (cmd *StringStringMapCmd) Result() (map[string]string, error) { - return cmd.val, cmd.err -} - -func (cmd *StringStringMapCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *StringStringMapCmd) readReply(rd *proto.Reader) error { - _, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make(map[string]string, n/2) - for i := int64(0); i < n; i += 2 { - key, err := rd.ReadString() - if err != nil { - return nil, err - } - - value, err := rd.ReadString() - if err != nil { - return nil, err - } - - cmd.val[key] = value - } - return nil, nil - }) - return cmd.err -} - -//------------------------------------------------------------------------------ - -type StringIntMapCmd struct { - baseCmd - - val map[string]int64 -} - -var _ Cmder = (*StringIntMapCmd)(nil) - -func NewStringIntMapCmd(args ...interface{}) *StringIntMapCmd { - return &StringIntMapCmd{ - baseCmd: baseCmd{args: args}, - } -} - -func (cmd *StringIntMapCmd) Val() map[string]int64 { - return cmd.val -} - -func (cmd *StringIntMapCmd) Result() (map[string]int64, error) { - return cmd.val, cmd.err -} - -func (cmd *StringIntMapCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *StringIntMapCmd) readReply(rd *proto.Reader) error { - _, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make(map[string]int64, n/2) - for i := int64(0); i < n; i += 2 { - key, err := rd.ReadString() - if err != nil { - return nil, err - } - - n, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - - cmd.val[key] = n - } - return nil, nil - }) - return cmd.err -} - -//------------------------------------------------------------------------------ - -type StringStructMapCmd struct { - baseCmd - - val map[string]struct{} -} - -var _ Cmder = (*StringStructMapCmd)(nil) - -func NewStringStructMapCmd(args ...interface{}) *StringStructMapCmd { - return &StringStructMapCmd{ - baseCmd: baseCmd{args: args}, - } -} - -func (cmd *StringStructMapCmd) Val() map[string]struct{} { - return cmd.val -} - -func (cmd *StringStructMapCmd) Result() (map[string]struct{}, error) { - return cmd.val, cmd.err -} - -func (cmd *StringStructMapCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *StringStructMapCmd) readReply(rd *proto.Reader) error { - _, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make(map[string]struct{}, n) - for i := int64(0); i < n; i++ { - key, err := rd.ReadString() - if err != nil { - return nil, err - } - cmd.val[key] = struct{}{} - } - return nil, nil - }) - return cmd.err -} - -//------------------------------------------------------------------------------ - -type XMessage struct { - ID string - Values map[string]interface{} -} - -type XMessageSliceCmd struct { - baseCmd - - val []XMessage -} - -var _ Cmder = (*XMessageSliceCmd)(nil) - -func NewXMessageSliceCmd(args ...interface{}) *XMessageSliceCmd { - return &XMessageSliceCmd{ - baseCmd: baseCmd{args: args}, - } -} - -func (cmd *XMessageSliceCmd) Val() []XMessage { - return cmd.val -} - -func (cmd *XMessageSliceCmd) Result() ([]XMessage, error) { - return cmd.val, cmd.err -} - -func (cmd *XMessageSliceCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *XMessageSliceCmd) readReply(rd *proto.Reader) error { - var v interface{} - v, cmd.err = rd.ReadArrayReply(xMessageSliceParser) - if cmd.err != nil { - return cmd.err - } - cmd.val = v.([]XMessage) - return nil -} - -// Implements proto.MultiBulkParse -func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) { - msgs := make([]XMessage, n) - for i := 0; i < len(msgs); i++ { - i := i - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - id, err := rd.ReadString() - if err != nil { - return nil, err - } - - var values map[string]interface{} - - v, err := rd.ReadArrayReply(stringInterfaceMapParser) - if err != nil { - if err != proto.Nil { - return nil, err - } - } else { - values = v.(map[string]interface{}) - } - - msgs[i] = XMessage{ - ID: id, - Values: values, - } - return nil, nil - }) - if err != nil { - return nil, err - } - } - return msgs, nil -} - -// Implements proto.MultiBulkParse -func stringInterfaceMapParser(rd *proto.Reader, n int64) (interface{}, error) { - m := make(map[string]interface{}, n/2) - for i := int64(0); i < n; i += 2 { - key, err := rd.ReadString() - if err != nil { - return nil, err - } - - value, err := rd.ReadString() - if err != nil { - return nil, err - } - - m[key] = value - } - return m, nil -} - -//------------------------------------------------------------------------------ - -type XStream struct { - Stream string - Messages []XMessage -} - -type XStreamSliceCmd struct { - baseCmd - - val []XStream -} - -var _ Cmder = (*XStreamSliceCmd)(nil) - -func NewXStreamSliceCmd(args ...interface{}) *XStreamSliceCmd { - return &XStreamSliceCmd{ - baseCmd: baseCmd{args: args}, - } -} - -func (cmd *XStreamSliceCmd) Val() []XStream { - return cmd.val -} - -func (cmd *XStreamSliceCmd) Result() ([]XStream, error) { - return cmd.val, cmd.err -} - -func (cmd *XStreamSliceCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *XStreamSliceCmd) readReply(rd *proto.Reader) error { - _, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make([]XStream, n) - for i := 0; i < len(cmd.val); i++ { - i := i - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - if n != 2 { - return nil, fmt.Errorf("got %d, wanted 2", n) - } - - stream, err := rd.ReadString() - if err != nil { - return nil, err - } - - v, err := rd.ReadArrayReply(xMessageSliceParser) - if err != nil { - return nil, err - } - - cmd.val[i] = XStream{ - Stream: stream, - Messages: v.([]XMessage), - } - return nil, nil - }) - if err != nil { - return nil, err - } - } - return nil, nil - }) - return cmd.err -} - -//------------------------------------------------------------------------------ - -type XPending struct { - Count int64 - Lower string - Higher string - Consumers map[string]int64 -} - -type XPendingCmd struct { - baseCmd - val *XPending -} - -var _ Cmder = (*XPendingCmd)(nil) - -func NewXPendingCmd(args ...interface{}) *XPendingCmd { - return &XPendingCmd{ - baseCmd: baseCmd{args: args}, - } -} - -func (cmd *XPendingCmd) Val() *XPending { - return cmd.val -} - -func (cmd *XPendingCmd) Result() (*XPending, error) { - return cmd.val, cmd.err -} - -func (cmd *XPendingCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *XPendingCmd) readReply(rd *proto.Reader) error { - _, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - if n != 4 { - return nil, fmt.Errorf("got %d, wanted 4", n) - } - - count, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - - lower, err := rd.ReadString() - if err != nil && err != Nil { - return nil, err - } - - higher, err := rd.ReadString() - if err != nil && err != Nil { - return nil, err - } - - cmd.val = &XPending{ - Count: count, - Lower: lower, - Higher: higher, - } - _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - for i := int64(0); i < n; i++ { - _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - if n != 2 { - return nil, fmt.Errorf("got %d, wanted 2", n) - } - - consumerName, err := rd.ReadString() - if err != nil { - return nil, err - } - - consumerPending, err := rd.ReadInt() - if err != nil { - return nil, err - } - - if cmd.val.Consumers == nil { - cmd.val.Consumers = make(map[string]int64) - } - cmd.val.Consumers[consumerName] = consumerPending - - return nil, nil - }) - if err != nil { - return nil, err - } - } - return nil, nil - }) - if err != nil && err != Nil { - return nil, err - } - - return nil, nil - }) - return cmd.err -} - -//------------------------------------------------------------------------------ - -type XPendingExt struct { - ID string - Consumer string - Idle time.Duration - RetryCount int64 -} - -type XPendingExtCmd struct { - baseCmd - val []XPendingExt -} - -var _ Cmder = (*XPendingExtCmd)(nil) - -func NewXPendingExtCmd(args ...interface{}) *XPendingExtCmd { - return &XPendingExtCmd{ - baseCmd: baseCmd{args: args}, - } -} - -func (cmd *XPendingExtCmd) Val() []XPendingExt { - return cmd.val -} - -func (cmd *XPendingExtCmd) Result() ([]XPendingExt, error) { - return cmd.val, cmd.err -} - -func (cmd *XPendingExtCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *XPendingExtCmd) readReply(rd *proto.Reader) error { - _, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make([]XPendingExt, 0, n) - for i := int64(0); i < n; i++ { - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - if n != 4 { - return nil, fmt.Errorf("got %d, wanted 4", n) - } - - id, err := rd.ReadString() - if err != nil { - return nil, err - } - - consumer, err := rd.ReadString() - if err != nil && err != Nil { - return nil, err - } - - idle, err := rd.ReadIntReply() - if err != nil && err != Nil { - return nil, err - } - - retryCount, err := rd.ReadIntReply() - if err != nil && err != Nil { - return nil, err - } - - cmd.val = append(cmd.val, XPendingExt{ - ID: id, - Consumer: consumer, - Idle: time.Duration(idle) * time.Millisecond, - RetryCount: retryCount, - }) - return nil, nil - }) - if err != nil { - return nil, err - } - } - return nil, nil - }) - return cmd.err -} - -//------------------------------------------------------------------------------ - -type XInfoGroupsCmd struct { - baseCmd - val []XInfoGroups -} - -type XInfoGroups struct { - Name string - Consumers int64 - Pending int64 - LastDeliveredID string -} - -var _ Cmder = (*XInfoGroupsCmd)(nil) - -func NewXInfoGroupsCmd(stream string) *XInfoGroupsCmd { - return &XInfoGroupsCmd{ - baseCmd: baseCmd{args: []interface{}{"xinfo", "groups", stream}}, - } -} - -func (cmd *XInfoGroupsCmd) Val() []XInfoGroups { - return cmd.val -} - -func (cmd *XInfoGroupsCmd) Result() ([]XInfoGroups, error) { - return cmd.val, cmd.err -} - -func (cmd *XInfoGroupsCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *XInfoGroupsCmd) readReply(rd *proto.Reader) error { - _, cmd.err = rd.ReadArrayReply( - func(rd *proto.Reader, n int64) (interface{}, error) { - for i := int64(0); i < n; i++ { - v, err := rd.ReadReply(xGroupInfoParser) - if err != nil { - return nil, err - } - cmd.val = append(cmd.val, v.(XInfoGroups)) - } - return nil, nil - }) - return nil -} - -func xGroupInfoParser(rd *proto.Reader, n int64) (interface{}, error) { - if n != 8 { - return nil, fmt.Errorf("redis: got %d elements in XINFO GROUPS reply,"+ - "wanted 8", n) - } - var ( - err error - grp XInfoGroups - key string - val string - ) - - for i := 0; i < 4; i++ { - key, err = rd.ReadString() - if err != nil { - return nil, err - } - val, err = rd.ReadString() - if err != nil { - return nil, err - } - switch key { - case "name": - grp.Name = val - case "consumers": - grp.Consumers, err = strconv.ParseInt(val, 0, 64) - case "pending": - grp.Pending, err = strconv.ParseInt(val, 0, 64) - case "last-delivered-id": - grp.LastDeliveredID = val - default: - return nil, fmt.Errorf("redis: unexpected content %s "+ - "in XINFO GROUPS reply", key) - } - if err != nil { - return nil, err - } - } - return grp, err -} - -//------------------------------------------------------------------------------ - -type ZSliceCmd struct { - baseCmd - - val []Z -} - -var _ Cmder = (*ZSliceCmd)(nil) - -func NewZSliceCmd(args ...interface{}) *ZSliceCmd { - return &ZSliceCmd{ - baseCmd: baseCmd{args: args}, - } -} - -func (cmd *ZSliceCmd) Val() []Z { - return cmd.val -} - -func (cmd *ZSliceCmd) Result() ([]Z, error) { - return cmd.val, cmd.err -} - -func (cmd *ZSliceCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *ZSliceCmd) readReply(rd *proto.Reader) error { - _, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make([]Z, n/2) - for i := 0; i < len(cmd.val); i++ { - member, err := rd.ReadString() - if err != nil { - return nil, err - } - - score, err := rd.ReadFloatReply() - if err != nil { - return nil, err - } - - cmd.val[i] = Z{ - Member: member, - Score: score, - } - } - return nil, nil - }) - return cmd.err -} - -//------------------------------------------------------------------------------ - -type ZWithKeyCmd struct { - baseCmd - - val *ZWithKey -} - -var _ Cmder = (*ZWithKeyCmd)(nil) - -func NewZWithKeyCmd(args ...interface{}) *ZWithKeyCmd { - return &ZWithKeyCmd{ - baseCmd: baseCmd{args: args}, - } -} - -func (cmd *ZWithKeyCmd) Val() *ZWithKey { - return cmd.val -} - -func (cmd *ZWithKeyCmd) Result() (*ZWithKey, error) { - return cmd.Val(), cmd.Err() -} - -func (cmd *ZWithKeyCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *ZWithKeyCmd) readReply(rd *proto.Reader) error { - _, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - if n != 3 { - return nil, fmt.Errorf("got %d elements, expected 3", n) - } - - cmd.val = &ZWithKey{} - var err error - - cmd.val.Key, err = rd.ReadString() - if err != nil { - return nil, err - } - - cmd.val.Member, err = rd.ReadString() - if err != nil { - return nil, err - } - - cmd.val.Score, err = rd.ReadFloatReply() - if err != nil { - return nil, err - } - - return nil, nil - }) - return cmd.err -} - -//------------------------------------------------------------------------------ - -type ScanCmd struct { - baseCmd - - page []string - cursor uint64 - - process func(cmd Cmder) error -} - -var _ Cmder = (*ScanCmd)(nil) - -func NewScanCmd(process func(cmd Cmder) error, args ...interface{}) *ScanCmd { - return &ScanCmd{ - baseCmd: baseCmd{args: args}, - process: process, - } -} - -func (cmd *ScanCmd) Val() (keys []string, cursor uint64) { - return cmd.page, cmd.cursor -} - -func (cmd *ScanCmd) Result() (keys []string, cursor uint64, err error) { - return cmd.page, cmd.cursor, cmd.err -} - -func (cmd *ScanCmd) String() string { - return cmdString(cmd, cmd.page) -} - -func (cmd *ScanCmd) readReply(rd *proto.Reader) error { - cmd.page, cmd.cursor, cmd.err = rd.ReadScanReply() - return cmd.err -} - -// Iterator creates a new ScanIterator. -func (cmd *ScanCmd) Iterator() *ScanIterator { - return &ScanIterator{ - cmd: cmd, - } -} - -//------------------------------------------------------------------------------ - -type ClusterNode struct { - ID string - Addr string -} - -type ClusterSlot struct { - Start int - End int - Nodes []ClusterNode -} - -type ClusterSlotsCmd struct { - baseCmd - - val []ClusterSlot -} - -var _ Cmder = (*ClusterSlotsCmd)(nil) - -func NewClusterSlotsCmd(args ...interface{}) *ClusterSlotsCmd { - return &ClusterSlotsCmd{ - baseCmd: baseCmd{args: args}, - } -} - -func (cmd *ClusterSlotsCmd) Val() []ClusterSlot { - return cmd.val -} - -func (cmd *ClusterSlotsCmd) Result() ([]ClusterSlot, error) { - return cmd.Val(), cmd.Err() -} - -func (cmd *ClusterSlotsCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *ClusterSlotsCmd) readReply(rd *proto.Reader) error { - _, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make([]ClusterSlot, n) - for i := 0; i < len(cmd.val); i++ { - n, err := rd.ReadArrayLen() - if err != nil { - return nil, err - } - if n < 2 { - err := fmt.Errorf("redis: got %d elements in cluster info, expected at least 2", n) - return nil, err - } - - start, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - - end, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - - nodes := make([]ClusterNode, n-2) - for j := 0; j < len(nodes); j++ { - n, err := rd.ReadArrayLen() - if err != nil { - return nil, err - } - if n != 2 && n != 3 { - err := fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", n) - return nil, err - } - - ip, err := rd.ReadString() - if err != nil { - return nil, err - } - - port, err := rd.ReadString() - if err != nil { - return nil, err - } - - nodes[j].Addr = net.JoinHostPort(ip, port) - - if n == 3 { - id, err := rd.ReadString() - if err != nil { - return nil, err - } - nodes[j].ID = id - } - } - - cmd.val[i] = ClusterSlot{ - Start: int(start), - End: int(end), - Nodes: nodes, - } - } - return nil, nil - }) - return cmd.err -} - -//------------------------------------------------------------------------------ - -// GeoLocation is used with GeoAdd to add geospatial location. -type GeoLocation struct { - Name string - Longitude, Latitude, Dist float64 - GeoHash int64 -} - -// GeoRadiusQuery is used with GeoRadius to query geospatial index. -type GeoRadiusQuery struct { - Radius float64 - // Can be m, km, ft, or mi. Default is km. - Unit string - WithCoord bool - WithDist bool - WithGeoHash bool - Count int - // Can be ASC or DESC. Default is no sort order. - Sort string - Store string - StoreDist string -} - -type GeoLocationCmd struct { - baseCmd - - q *GeoRadiusQuery - locations []GeoLocation -} - -var _ Cmder = (*GeoLocationCmd)(nil) - -func NewGeoLocationCmd(q *GeoRadiusQuery, args ...interface{}) *GeoLocationCmd { - return &GeoLocationCmd{ - baseCmd: baseCmd{args: geoLocationArgs(q, args...)}, - q: q, - } -} - -func geoLocationArgs(q *GeoRadiusQuery, args ...interface{}) []interface{} { - args = append(args, q.Radius) - if q.Unit != "" { - args = append(args, q.Unit) - } else { - args = append(args, "km") - } - if q.WithCoord { - args = append(args, "withcoord") - } - if q.WithDist { - args = append(args, "withdist") - } - if q.WithGeoHash { - args = append(args, "withhash") - } - if q.Count > 0 { - args = append(args, "count", q.Count) - } - if q.Sort != "" { - args = append(args, q.Sort) - } - if q.Store != "" { - args = append(args, "store") - args = append(args, q.Store) - } - if q.StoreDist != "" { - args = append(args, "storedist") - args = append(args, q.StoreDist) - } - return args -} - -func (cmd *GeoLocationCmd) Val() []GeoLocation { - return cmd.locations -} - -func (cmd *GeoLocationCmd) Result() ([]GeoLocation, error) { - return cmd.locations, cmd.err -} - -func (cmd *GeoLocationCmd) String() string { - return cmdString(cmd, cmd.locations) -} - -func (cmd *GeoLocationCmd) readReply(rd *proto.Reader) error { - var v interface{} - v, cmd.err = rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q)) - if cmd.err != nil { - return cmd.err - } - cmd.locations = v.([]GeoLocation) - return nil -} - -func newGeoLocationSliceParser(q *GeoRadiusQuery) proto.MultiBulkParse { - return func(rd *proto.Reader, n int64) (interface{}, error) { - locs := make([]GeoLocation, 0, n) - for i := int64(0); i < n; i++ { - v, err := rd.ReadReply(newGeoLocationParser(q)) - if err != nil { - return nil, err - } - switch vv := v.(type) { - case string: - locs = append(locs, GeoLocation{ - Name: vv, - }) - case *GeoLocation: - //TODO: avoid copying - locs = append(locs, *vv) - default: - return nil, fmt.Errorf("got %T, expected string or *GeoLocation", v) - } - } - return locs, nil - } -} - -func newGeoLocationParser(q *GeoRadiusQuery) proto.MultiBulkParse { - return func(rd *proto.Reader, n int64) (interface{}, error) { - var loc GeoLocation - var err error - - loc.Name, err = rd.ReadString() - if err != nil { - return nil, err - } - if q.WithDist { - loc.Dist, err = rd.ReadFloatReply() - if err != nil { - return nil, err - } - } - if q.WithGeoHash { - loc.GeoHash, err = rd.ReadIntReply() - if err != nil { - return nil, err - } - } - if q.WithCoord { - n, err := rd.ReadArrayLen() - if err != nil { - return nil, err - } - if n != 2 { - return nil, fmt.Errorf("got %d coordinates, expected 2", n) - } - - loc.Longitude, err = rd.ReadFloatReply() - if err != nil { - return nil, err - } - loc.Latitude, err = rd.ReadFloatReply() - if err != nil { - return nil, err - } - } - - return &loc, nil - } -} - -//------------------------------------------------------------------------------ - -type GeoPos struct { - Longitude, Latitude float64 -} - -type GeoPosCmd struct { - baseCmd - - val []*GeoPos -} - -var _ Cmder = (*GeoPosCmd)(nil) - -func NewGeoPosCmd(args ...interface{}) *GeoPosCmd { - return &GeoPosCmd{ - baseCmd: baseCmd{args: args}, - } -} - -func (cmd *GeoPosCmd) Val() []*GeoPos { - return cmd.val -} - -func (cmd *GeoPosCmd) Result() ([]*GeoPos, error) { - return cmd.Val(), cmd.Err() -} - -func (cmd *GeoPosCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *GeoPosCmd) readReply(rd *proto.Reader) error { - _, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make([]*GeoPos, n) - for i := 0; i < len(cmd.val); i++ { - i := i - _, err := rd.ReadReply(func(rd *proto.Reader, n int64) (interface{}, error) { - longitude, err := rd.ReadFloatReply() - if err != nil { - return nil, err - } - - latitude, err := rd.ReadFloatReply() - if err != nil { - return nil, err - } - - cmd.val[i] = &GeoPos{ - Longitude: longitude, - Latitude: latitude, - } - return nil, nil - }) - if err != nil { - if err == Nil { - cmd.val[i] = nil - continue - } - return nil, err - } - } - return nil, nil - }) - return cmd.err -} - -//------------------------------------------------------------------------------ - -type CommandInfo struct { - Name string - Arity int8 - Flags []string - ACLFlags []string - FirstKeyPos int8 - LastKeyPos int8 - StepCount int8 - ReadOnly bool -} - -type CommandsInfoCmd struct { - baseCmd - - val map[string]*CommandInfo -} - -var _ Cmder = (*CommandsInfoCmd)(nil) - -func NewCommandsInfoCmd(args ...interface{}) *CommandsInfoCmd { - return &CommandsInfoCmd{ - baseCmd: baseCmd{args: args}, - } -} - -func (cmd *CommandsInfoCmd) Val() map[string]*CommandInfo { - return cmd.val -} - -func (cmd *CommandsInfoCmd) Result() (map[string]*CommandInfo, error) { - return cmd.Val(), cmd.Err() -} - -func (cmd *CommandsInfoCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error { - _, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make(map[string]*CommandInfo, n) - for i := int64(0); i < n; i++ { - v, err := rd.ReadReply(commandInfoParser) - if err != nil { - return nil, err - } - vv := v.(*CommandInfo) - cmd.val[vv.Name] = vv - } - return nil, nil - }) - return cmd.err -} - -func commandInfoParser(rd *proto.Reader, n int64) (interface{}, error) { - const numArgRedis5 = 6 - const numArgRedis6 = 7 - - switch n { - case numArgRedis5, numArgRedis6: - // continue - default: - return nil, fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 7", n) - } - - var cmd CommandInfo - var err error - - cmd.Name, err = rd.ReadString() - if err != nil { - return nil, err - } - - arity, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - cmd.Arity = int8(arity) - - _, err = rd.ReadReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.Flags = make([]string, n) - for i := 0; i < len(cmd.Flags); i++ { - switch s, err := rd.ReadString(); { - case err == Nil: - cmd.Flags[i] = "" - case err != nil: - return nil, err - default: - cmd.Flags[i] = s - } - } - return nil, nil - }) - if err != nil { - return nil, err - } - - firstKeyPos, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - cmd.FirstKeyPos = int8(firstKeyPos) - - lastKeyPos, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - cmd.LastKeyPos = int8(lastKeyPos) - - stepCount, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - cmd.StepCount = int8(stepCount) - - for _, flag := range cmd.Flags { - if flag == "readonly" { - cmd.ReadOnly = true - break - } - } - - if n == numArgRedis5 { - return &cmd, nil - } - - _, err = rd.ReadReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.ACLFlags = make([]string, n) - for i := 0; i < len(cmd.ACLFlags); i++ { - switch s, err := rd.ReadString(); { - case err == Nil: - cmd.ACLFlags[i] = "" - case err != nil: - return nil, err - default: - cmd.ACLFlags[i] = s - } - } - return nil, nil - }) - if err != nil { - return nil, err - } - - return &cmd, nil -} - -//------------------------------------------------------------------------------ - -type cmdsInfoCache struct { - fn func() (map[string]*CommandInfo, error) - - once internal.Once - cmds map[string]*CommandInfo -} - -func newCmdsInfoCache(fn func() (map[string]*CommandInfo, error)) *cmdsInfoCache { - return &cmdsInfoCache{ - fn: fn, - } -} - -func (c *cmdsInfoCache) Get() (map[string]*CommandInfo, error) { - err := c.once.Do(func() error { - cmds, err := c.fn() - if err != nil { - return err - } - - // Extensions have cmd names in upper case. Convert them to lower case. - for k, v := range cmds { - lower := internal.ToLower(k) - if lower != k { - cmds[lower] = v - } - } - - c.cmds = cmds - return nil - }) - return c.cmds, err -} diff --git a/vendor/github.com/go-redis/redis/v7/commands.go b/vendor/github.com/go-redis/redis/v7/commands.go deleted file mode 100644 index da5ceda13e..0000000000 --- a/vendor/github.com/go-redis/redis/v7/commands.go +++ /dev/null @@ -1,2643 +0,0 @@ -package redis - -import ( - "errors" - "io" - "time" - - "github.com/go-redis/redis/v7/internal" -) - -func usePrecise(dur time.Duration) bool { - return dur < time.Second || dur%time.Second != 0 -} - -func formatMs(dur time.Duration) int64 { - if dur > 0 && dur < time.Millisecond { - internal.Logger.Printf( - "specified duration is %s, but minimal supported value is %s", - dur, time.Millisecond, - ) - } - return int64(dur / time.Millisecond) -} - -func formatSec(dur time.Duration) int64 { - if dur > 0 && dur < time.Second { - internal.Logger.Printf( - "specified duration is %s, but minimal supported value is %s", - dur, time.Second, - ) - } - return int64(dur / time.Second) -} - -func appendArgs(dst, src []interface{}) []interface{} { - if len(src) == 1 { - switch v := src[0].(type) { - case []string: - for _, s := range v { - dst = append(dst, s) - } - return dst - case map[string]interface{}: - for k, v := range v { - dst = append(dst, k, v) - } - return dst - } - } - - dst = append(dst, src...) - return dst -} - -type Cmdable interface { - Pipeline() Pipeliner - Pipelined(fn func(Pipeliner) error) ([]Cmder, error) - - TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) - TxPipeline() Pipeliner - - Command() *CommandsInfoCmd - ClientGetName() *StringCmd - Echo(message interface{}) *StringCmd - Ping() *StatusCmd - Quit() *StatusCmd - Del(keys ...string) *IntCmd - Unlink(keys ...string) *IntCmd - Dump(key string) *StringCmd - Exists(keys ...string) *IntCmd - Expire(key string, expiration time.Duration) *BoolCmd - ExpireAt(key string, tm time.Time) *BoolCmd - Keys(pattern string) *StringSliceCmd - Migrate(host, port, key string, db int, timeout time.Duration) *StatusCmd - Move(key string, db int) *BoolCmd - ObjectRefCount(key string) *IntCmd - ObjectEncoding(key string) *StringCmd - ObjectIdleTime(key string) *DurationCmd - Persist(key string) *BoolCmd - PExpire(key string, expiration time.Duration) *BoolCmd - PExpireAt(key string, tm time.Time) *BoolCmd - PTTL(key string) *DurationCmd - RandomKey() *StringCmd - Rename(key, newkey string) *StatusCmd - RenameNX(key, newkey string) *BoolCmd - Restore(key string, ttl time.Duration, value string) *StatusCmd - RestoreReplace(key string, ttl time.Duration, value string) *StatusCmd - Sort(key string, sort *Sort) *StringSliceCmd - SortStore(key, store string, sort *Sort) *IntCmd - SortInterfaces(key string, sort *Sort) *SliceCmd - Touch(keys ...string) *IntCmd - TTL(key string) *DurationCmd - Type(key string) *StatusCmd - Scan(cursor uint64, match string, count int64) *ScanCmd - SScan(key string, cursor uint64, match string, count int64) *ScanCmd - HScan(key string, cursor uint64, match string, count int64) *ScanCmd - ZScan(key string, cursor uint64, match string, count int64) *ScanCmd - Append(key, value string) *IntCmd - BitCount(key string, bitCount *BitCount) *IntCmd - BitOpAnd(destKey string, keys ...string) *IntCmd - BitOpOr(destKey string, keys ...string) *IntCmd - BitOpXor(destKey string, keys ...string) *IntCmd - BitOpNot(destKey string, key string) *IntCmd - BitPos(key string, bit int64, pos ...int64) *IntCmd - BitField(key string, args ...interface{}) *IntSliceCmd - Decr(key string) *IntCmd - DecrBy(key string, decrement int64) *IntCmd - Get(key string) *StringCmd - GetBit(key string, offset int64) *IntCmd - GetRange(key string, start, end int64) *StringCmd - GetSet(key string, value interface{}) *StringCmd - Incr(key string) *IntCmd - IncrBy(key string, value int64) *IntCmd - IncrByFloat(key string, value float64) *FloatCmd - MGet(keys ...string) *SliceCmd - MSet(values ...interface{}) *StatusCmd - MSetNX(values ...interface{}) *BoolCmd - Set(key string, value interface{}, expiration time.Duration) *StatusCmd - SetBit(key string, offset int64, value int) *IntCmd - SetNX(key string, value interface{}, expiration time.Duration) *BoolCmd - SetXX(key string, value interface{}, expiration time.Duration) *BoolCmd - SetRange(key string, offset int64, value string) *IntCmd - StrLen(key string) *IntCmd - HDel(key string, fields ...string) *IntCmd - HExists(key, field string) *BoolCmd - HGet(key, field string) *StringCmd - HGetAll(key string) *StringStringMapCmd - HIncrBy(key, field string, incr int64) *IntCmd - HIncrByFloat(key, field string, incr float64) *FloatCmd - HKeys(key string) *StringSliceCmd - HLen(key string) *IntCmd - HMGet(key string, fields ...string) *SliceCmd - HSet(key string, values ...interface{}) *IntCmd - HMSet(key string, values ...interface{}) *BoolCmd - HSetNX(key, field string, value interface{}) *BoolCmd - HVals(key string) *StringSliceCmd - BLPop(timeout time.Duration, keys ...string) *StringSliceCmd - BRPop(timeout time.Duration, keys ...string) *StringSliceCmd - BRPopLPush(source, destination string, timeout time.Duration) *StringCmd - LIndex(key string, index int64) *StringCmd - LInsert(key, op string, pivot, value interface{}) *IntCmd - LInsertBefore(key string, pivot, value interface{}) *IntCmd - LInsertAfter(key string, pivot, value interface{}) *IntCmd - LLen(key string) *IntCmd - LPop(key string) *StringCmd - LPush(key string, values ...interface{}) *IntCmd - LPushX(key string, values ...interface{}) *IntCmd - LRange(key string, start, stop int64) *StringSliceCmd - LRem(key string, count int64, value interface{}) *IntCmd - LSet(key string, index int64, value interface{}) *StatusCmd - LTrim(key string, start, stop int64) *StatusCmd - RPop(key string) *StringCmd - RPopLPush(source, destination string) *StringCmd - RPush(key string, values ...interface{}) *IntCmd - RPushX(key string, values ...interface{}) *IntCmd - SAdd(key string, members ...interface{}) *IntCmd - SCard(key string) *IntCmd - SDiff(keys ...string) *StringSliceCmd - SDiffStore(destination string, keys ...string) *IntCmd - SInter(keys ...string) *StringSliceCmd - SInterStore(destination string, keys ...string) *IntCmd - SIsMember(key string, member interface{}) *BoolCmd - SMembers(key string) *StringSliceCmd - SMembersMap(key string) *StringStructMapCmd - SMove(source, destination string, member interface{}) *BoolCmd - SPop(key string) *StringCmd - SPopN(key string, count int64) *StringSliceCmd - SRandMember(key string) *StringCmd - SRandMemberN(key string, count int64) *StringSliceCmd - SRem(key string, members ...interface{}) *IntCmd - SUnion(keys ...string) *StringSliceCmd - SUnionStore(destination string, keys ...string) *IntCmd - XAdd(a *XAddArgs) *StringCmd - XDel(stream string, ids ...string) *IntCmd - XLen(stream string) *IntCmd - XRange(stream, start, stop string) *XMessageSliceCmd - XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd - XRevRange(stream string, start, stop string) *XMessageSliceCmd - XRevRangeN(stream string, start, stop string, count int64) *XMessageSliceCmd - XRead(a *XReadArgs) *XStreamSliceCmd - XReadStreams(streams ...string) *XStreamSliceCmd - XGroupCreate(stream, group, start string) *StatusCmd - XGroupCreateMkStream(stream, group, start string) *StatusCmd - XGroupSetID(stream, group, start string) *StatusCmd - XGroupDestroy(stream, group string) *IntCmd - XGroupDelConsumer(stream, group, consumer string) *IntCmd - XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd - XAck(stream, group string, ids ...string) *IntCmd - XPending(stream, group string) *XPendingCmd - XPendingExt(a *XPendingExtArgs) *XPendingExtCmd - XClaim(a *XClaimArgs) *XMessageSliceCmd - XClaimJustID(a *XClaimArgs) *StringSliceCmd - XTrim(key string, maxLen int64) *IntCmd - XTrimApprox(key string, maxLen int64) *IntCmd - XInfoGroups(key string) *XInfoGroupsCmd - BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd - BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd - ZAdd(key string, members ...*Z) *IntCmd - ZAddNX(key string, members ...*Z) *IntCmd - ZAddXX(key string, members ...*Z) *IntCmd - ZAddCh(key string, members ...*Z) *IntCmd - ZAddNXCh(key string, members ...*Z) *IntCmd - ZAddXXCh(key string, members ...*Z) *IntCmd - ZIncr(key string, member *Z) *FloatCmd - ZIncrNX(key string, member *Z) *FloatCmd - ZIncrXX(key string, member *Z) *FloatCmd - ZCard(key string) *IntCmd - ZCount(key, min, max string) *IntCmd - ZLexCount(key, min, max string) *IntCmd - ZIncrBy(key string, increment float64, member string) *FloatCmd - ZInterStore(destination string, store *ZStore) *IntCmd - ZPopMax(key string, count ...int64) *ZSliceCmd - ZPopMin(key string, count ...int64) *ZSliceCmd - ZRange(key string, start, stop int64) *StringSliceCmd - ZRangeWithScores(key string, start, stop int64) *ZSliceCmd - ZRangeByScore(key string, opt *ZRangeBy) *StringSliceCmd - ZRangeByLex(key string, opt *ZRangeBy) *StringSliceCmd - ZRangeByScoreWithScores(key string, opt *ZRangeBy) *ZSliceCmd - ZRank(key, member string) *IntCmd - ZRem(key string, members ...interface{}) *IntCmd - ZRemRangeByRank(key string, start, stop int64) *IntCmd - ZRemRangeByScore(key, min, max string) *IntCmd - ZRemRangeByLex(key, min, max string) *IntCmd - ZRevRange(key string, start, stop int64) *StringSliceCmd - ZRevRangeWithScores(key string, start, stop int64) *ZSliceCmd - ZRevRangeByScore(key string, opt *ZRangeBy) *StringSliceCmd - ZRevRangeByLex(key string, opt *ZRangeBy) *StringSliceCmd - ZRevRangeByScoreWithScores(key string, opt *ZRangeBy) *ZSliceCmd - ZRevRank(key, member string) *IntCmd - ZScore(key, member string) *FloatCmd - ZUnionStore(dest string, store *ZStore) *IntCmd - PFAdd(key string, els ...interface{}) *IntCmd - PFCount(keys ...string) *IntCmd - PFMerge(dest string, keys ...string) *StatusCmd - BgRewriteAOF() *StatusCmd - BgSave() *StatusCmd - ClientKill(ipPort string) *StatusCmd - ClientKillByFilter(keys ...string) *IntCmd - ClientList() *StringCmd - ClientPause(dur time.Duration) *BoolCmd - ClientID() *IntCmd - ConfigGet(parameter string) *SliceCmd - ConfigResetStat() *StatusCmd - ConfigSet(parameter, value string) *StatusCmd - ConfigRewrite() *StatusCmd - DBSize() *IntCmd - FlushAll() *StatusCmd - FlushAllAsync() *StatusCmd - FlushDB() *StatusCmd - FlushDBAsync() *StatusCmd - Info(section ...string) *StringCmd - LastSave() *IntCmd - Save() *StatusCmd - Shutdown() *StatusCmd - ShutdownSave() *StatusCmd - ShutdownNoSave() *StatusCmd - SlaveOf(host, port string) *StatusCmd - Time() *TimeCmd - Eval(script string, keys []string, args ...interface{}) *Cmd - EvalSha(sha1 string, keys []string, args ...interface{}) *Cmd - ScriptExists(hashes ...string) *BoolSliceCmd - ScriptFlush() *StatusCmd - ScriptKill() *StatusCmd - ScriptLoad(script string) *StringCmd - DebugObject(key string) *StringCmd - Publish(channel string, message interface{}) *IntCmd - PubSubChannels(pattern string) *StringSliceCmd - PubSubNumSub(channels ...string) *StringIntMapCmd - PubSubNumPat() *IntCmd - ClusterSlots() *ClusterSlotsCmd - ClusterNodes() *StringCmd - ClusterMeet(host, port string) *StatusCmd - ClusterForget(nodeID string) *StatusCmd - ClusterReplicate(nodeID string) *StatusCmd - ClusterResetSoft() *StatusCmd - ClusterResetHard() *StatusCmd - ClusterInfo() *StringCmd - ClusterKeySlot(key string) *IntCmd - ClusterGetKeysInSlot(slot int, count int) *StringSliceCmd - ClusterCountFailureReports(nodeID string) *IntCmd - ClusterCountKeysInSlot(slot int) *IntCmd - ClusterDelSlots(slots ...int) *StatusCmd - ClusterDelSlotsRange(min, max int) *StatusCmd - ClusterSaveConfig() *StatusCmd - ClusterSlaves(nodeID string) *StringSliceCmd - ClusterFailover() *StatusCmd - ClusterAddSlots(slots ...int) *StatusCmd - ClusterAddSlotsRange(min, max int) *StatusCmd - GeoAdd(key string, geoLocation ...*GeoLocation) *IntCmd - GeoPos(key string, members ...string) *GeoPosCmd - GeoRadius(key string, longitude, latitude float64, query *GeoRadiusQuery) *GeoLocationCmd - GeoRadiusStore(key string, longitude, latitude float64, query *GeoRadiusQuery) *IntCmd - GeoRadiusByMember(key, member string, query *GeoRadiusQuery) *GeoLocationCmd - GeoRadiusByMemberStore(key, member string, query *GeoRadiusQuery) *IntCmd - GeoDist(key string, member1, member2, unit string) *FloatCmd - GeoHash(key string, members ...string) *StringSliceCmd - ReadOnly() *StatusCmd - ReadWrite() *StatusCmd - MemoryUsage(key string, samples ...int) *IntCmd -} - -type StatefulCmdable interface { - Cmdable - Auth(password string) *StatusCmd - AuthACL(username, password string) *StatusCmd - Select(index int) *StatusCmd - SwapDB(index1, index2 int) *StatusCmd - ClientSetName(name string) *BoolCmd -} - -var _ Cmdable = (*Client)(nil) -var _ Cmdable = (*Tx)(nil) -var _ Cmdable = (*Ring)(nil) -var _ Cmdable = (*ClusterClient)(nil) - -type cmdable func(cmd Cmder) error - -type statefulCmdable func(cmd Cmder) error - -//------------------------------------------------------------------------------ - -func (c statefulCmdable) Auth(password string) *StatusCmd { - cmd := NewStatusCmd("auth", password) - _ = c(cmd) - return cmd -} - -// Perform an AUTH command, using the given user and pass. -// Should be used to authenticate the current connection with one of the connections defined in the ACL list -// when connecting to a Redis 6.0 instance, or greater, that is using the Redis ACL system. -func (c statefulCmdable) AuthACL(username, password string) *StatusCmd { - cmd := NewStatusCmd("auth", username, password) - _ = c(cmd) - return cmd -} - -func (c cmdable) Echo(message interface{}) *StringCmd { - cmd := NewStringCmd("echo", message) - _ = c(cmd) - return cmd -} - -func (c cmdable) Ping() *StatusCmd { - cmd := NewStatusCmd("ping") - _ = c(cmd) - return cmd -} - -func (c cmdable) Wait(numSlaves int, timeout time.Duration) *IntCmd { - cmd := NewIntCmd("wait", numSlaves, int(timeout/time.Millisecond)) - _ = c(cmd) - return cmd -} - -func (c cmdable) Quit() *StatusCmd { - panic("not implemented") -} - -func (c statefulCmdable) Select(index int) *StatusCmd { - cmd := NewStatusCmd("select", index) - _ = c(cmd) - return cmd -} - -func (c statefulCmdable) SwapDB(index1, index2 int) *StatusCmd { - cmd := NewStatusCmd("swapdb", index1, index2) - _ = c(cmd) - return cmd -} - -//------------------------------------------------------------------------------ - -func (c cmdable) Command() *CommandsInfoCmd { - cmd := NewCommandsInfoCmd("command") - _ = c(cmd) - return cmd -} - -func (c cmdable) Del(keys ...string) *IntCmd { - args := make([]interface{}, 1+len(keys)) - args[0] = "del" - for i, key := range keys { - args[1+i] = key - } - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) Unlink(keys ...string) *IntCmd { - args := make([]interface{}, 1+len(keys)) - args[0] = "unlink" - for i, key := range keys { - args[1+i] = key - } - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) Dump(key string) *StringCmd { - cmd := NewStringCmd("dump", key) - _ = c(cmd) - return cmd -} - -func (c cmdable) Exists(keys ...string) *IntCmd { - args := make([]interface{}, 1+len(keys)) - args[0] = "exists" - for i, key := range keys { - args[1+i] = key - } - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) Expire(key string, expiration time.Duration) *BoolCmd { - cmd := NewBoolCmd("expire", key, formatSec(expiration)) - _ = c(cmd) - return cmd -} - -func (c cmdable) ExpireAt(key string, tm time.Time) *BoolCmd { - cmd := NewBoolCmd("expireat", key, tm.Unix()) - _ = c(cmd) - return cmd -} - -func (c cmdable) Keys(pattern string) *StringSliceCmd { - cmd := NewStringSliceCmd("keys", pattern) - _ = c(cmd) - return cmd -} - -func (c cmdable) Migrate(host, port, key string, db int, timeout time.Duration) *StatusCmd { - cmd := NewStatusCmd( - "migrate", - host, - port, - key, - db, - formatMs(timeout), - ) - cmd.setReadTimeout(timeout) - _ = c(cmd) - return cmd -} - -func (c cmdable) Move(key string, db int) *BoolCmd { - cmd := NewBoolCmd("move", key, db) - _ = c(cmd) - return cmd -} - -func (c cmdable) ObjectRefCount(key string) *IntCmd { - cmd := NewIntCmd("object", "refcount", key) - _ = c(cmd) - return cmd -} - -func (c cmdable) ObjectEncoding(key string) *StringCmd { - cmd := NewStringCmd("object", "encoding", key) - _ = c(cmd) - return cmd -} - -func (c cmdable) ObjectIdleTime(key string) *DurationCmd { - cmd := NewDurationCmd(time.Second, "object", "idletime", key) - _ = c(cmd) - return cmd -} - -func (c cmdable) Persist(key string) *BoolCmd { - cmd := NewBoolCmd("persist", key) - _ = c(cmd) - return cmd -} - -func (c cmdable) PExpire(key string, expiration time.Duration) *BoolCmd { - cmd := NewBoolCmd("pexpire", key, formatMs(expiration)) - _ = c(cmd) - return cmd -} - -func (c cmdable) PExpireAt(key string, tm time.Time) *BoolCmd { - cmd := NewBoolCmd( - "pexpireat", - key, - tm.UnixNano()/int64(time.Millisecond), - ) - _ = c(cmd) - return cmd -} - -func (c cmdable) PTTL(key string) *DurationCmd { - cmd := NewDurationCmd(time.Millisecond, "pttl", key) - _ = c(cmd) - return cmd -} - -func (c cmdable) RandomKey() *StringCmd { - cmd := NewStringCmd("randomkey") - _ = c(cmd) - return cmd -} - -func (c cmdable) Rename(key, newkey string) *StatusCmd { - cmd := NewStatusCmd("rename", key, newkey) - _ = c(cmd) - return cmd -} - -func (c cmdable) RenameNX(key, newkey string) *BoolCmd { - cmd := NewBoolCmd("renamenx", key, newkey) - _ = c(cmd) - return cmd -} - -func (c cmdable) Restore(key string, ttl time.Duration, value string) *StatusCmd { - cmd := NewStatusCmd( - "restore", - key, - formatMs(ttl), - value, - ) - _ = c(cmd) - return cmd -} - -func (c cmdable) RestoreReplace(key string, ttl time.Duration, value string) *StatusCmd { - cmd := NewStatusCmd( - "restore", - key, - formatMs(ttl), - value, - "replace", - ) - _ = c(cmd) - return cmd -} - -type Sort struct { - By string - Offset, Count int64 - Get []string - Order string - Alpha bool -} - -func (sort *Sort) args(key string) []interface{} { - args := []interface{}{"sort", key} - if sort.By != "" { - args = append(args, "by", sort.By) - } - if sort.Offset != 0 || sort.Count != 0 { - args = append(args, "limit", sort.Offset, sort.Count) - } - for _, get := range sort.Get { - args = append(args, "get", get) - } - if sort.Order != "" { - args = append(args, sort.Order) - } - if sort.Alpha { - args = append(args, "alpha") - } - return args -} - -func (c cmdable) Sort(key string, sort *Sort) *StringSliceCmd { - cmd := NewStringSliceCmd(sort.args(key)...) - _ = c(cmd) - return cmd -} - -func (c cmdable) SortStore(key, store string, sort *Sort) *IntCmd { - args := sort.args(key) - if store != "" { - args = append(args, "store", store) - } - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) SortInterfaces(key string, sort *Sort) *SliceCmd { - cmd := NewSliceCmd(sort.args(key)...) - _ = c(cmd) - return cmd -} - -func (c cmdable) Touch(keys ...string) *IntCmd { - args := make([]interface{}, len(keys)+1) - args[0] = "touch" - for i, key := range keys { - args[i+1] = key - } - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) TTL(key string) *DurationCmd { - cmd := NewDurationCmd(time.Second, "ttl", key) - _ = c(cmd) - return cmd -} - -func (c cmdable) Type(key string) *StatusCmd { - cmd := NewStatusCmd("type", key) - _ = c(cmd) - return cmd -} - -func (c cmdable) Scan(cursor uint64, match string, count int64) *ScanCmd { - args := []interface{}{"scan", cursor} - if match != "" { - args = append(args, "match", match) - } - if count > 0 { - args = append(args, "count", count) - } - cmd := NewScanCmd(c, args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) SScan(key string, cursor uint64, match string, count int64) *ScanCmd { - args := []interface{}{"sscan", key, cursor} - if match != "" { - args = append(args, "match", match) - } - if count > 0 { - args = append(args, "count", count) - } - cmd := NewScanCmd(c, args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) HScan(key string, cursor uint64, match string, count int64) *ScanCmd { - args := []interface{}{"hscan", key, cursor} - if match != "" { - args = append(args, "match", match) - } - if count > 0 { - args = append(args, "count", count) - } - cmd := NewScanCmd(c, args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) ZScan(key string, cursor uint64, match string, count int64) *ScanCmd { - args := []interface{}{"zscan", key, cursor} - if match != "" { - args = append(args, "match", match) - } - if count > 0 { - args = append(args, "count", count) - } - cmd := NewScanCmd(c, args...) - _ = c(cmd) - return cmd -} - -//------------------------------------------------------------------------------ - -func (c cmdable) Append(key, value string) *IntCmd { - cmd := NewIntCmd("append", key, value) - _ = c(cmd) - return cmd -} - -type BitCount struct { - Start, End int64 -} - -func (c cmdable) BitCount(key string, bitCount *BitCount) *IntCmd { - args := []interface{}{"bitcount", key} - if bitCount != nil { - args = append( - args, - bitCount.Start, - bitCount.End, - ) - } - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) bitOp(op, destKey string, keys ...string) *IntCmd { - args := make([]interface{}, 3+len(keys)) - args[0] = "bitop" - args[1] = op - args[2] = destKey - for i, key := range keys { - args[3+i] = key - } - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) BitOpAnd(destKey string, keys ...string) *IntCmd { - return c.bitOp("and", destKey, keys...) -} - -func (c cmdable) BitOpOr(destKey string, keys ...string) *IntCmd { - return c.bitOp("or", destKey, keys...) -} - -func (c cmdable) BitOpXor(destKey string, keys ...string) *IntCmd { - return c.bitOp("xor", destKey, keys...) -} - -func (c cmdable) BitOpNot(destKey string, key string) *IntCmd { - return c.bitOp("not", destKey, key) -} - -func (c cmdable) BitPos(key string, bit int64, pos ...int64) *IntCmd { - args := make([]interface{}, 3+len(pos)) - args[0] = "bitpos" - args[1] = key - args[2] = bit - switch len(pos) { - case 0: - case 1: - args[3] = pos[0] - case 2: - args[3] = pos[0] - args[4] = pos[1] - default: - panic("too many arguments") - } - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) BitField(key string, args ...interface{}) *IntSliceCmd { - a := make([]interface{}, 0, 2+len(args)) - a = append(a, "bitfield") - a = append(a, key) - a = append(a, args...) - cmd := NewIntSliceCmd(a...) - _ = c(cmd) - return cmd -} - -func (c cmdable) Decr(key string) *IntCmd { - cmd := NewIntCmd("decr", key) - _ = c(cmd) - return cmd -} - -func (c cmdable) DecrBy(key string, decrement int64) *IntCmd { - cmd := NewIntCmd("decrby", key, decrement) - _ = c(cmd) - return cmd -} - -// Redis `GET key` command. It returns redis.Nil error when key does not exist. -func (c cmdable) Get(key string) *StringCmd { - cmd := NewStringCmd("get", key) - _ = c(cmd) - return cmd -} - -func (c cmdable) GetBit(key string, offset int64) *IntCmd { - cmd := NewIntCmd("getbit", key, offset) - _ = c(cmd) - return cmd -} - -func (c cmdable) GetRange(key string, start, end int64) *StringCmd { - cmd := NewStringCmd("getrange", key, start, end) - _ = c(cmd) - return cmd -} - -func (c cmdable) GetSet(key string, value interface{}) *StringCmd { - cmd := NewStringCmd("getset", key, value) - _ = c(cmd) - return cmd -} - -func (c cmdable) Incr(key string) *IntCmd { - cmd := NewIntCmd("incr", key) - _ = c(cmd) - return cmd -} - -func (c cmdable) IncrBy(key string, value int64) *IntCmd { - cmd := NewIntCmd("incrby", key, value) - _ = c(cmd) - return cmd -} - -func (c cmdable) IncrByFloat(key string, value float64) *FloatCmd { - cmd := NewFloatCmd("incrbyfloat", key, value) - _ = c(cmd) - return cmd -} - -func (c cmdable) MGet(keys ...string) *SliceCmd { - args := make([]interface{}, 1+len(keys)) - args[0] = "mget" - for i, key := range keys { - args[1+i] = key - } - cmd := NewSliceCmd(args...) - _ = c(cmd) - return cmd -} - -// MSet is like Set but accepts multiple values: -// - MSet("key1", "value1", "key2", "value2") -// - MSet([]string{"key1", "value1", "key2", "value2"}) -// - MSet(map[string]interface{}{"key1": "value1", "key2": "value2"}) -func (c cmdable) MSet(values ...interface{}) *StatusCmd { - args := make([]interface{}, 1, 1+len(values)) - args[0] = "mset" - args = appendArgs(args, values) - cmd := NewStatusCmd(args...) - _ = c(cmd) - return cmd -} - -// MSetNX is like SetNX but accepts multiple values: -// - MSetNX("key1", "value1", "key2", "value2") -// - MSetNX([]string{"key1", "value1", "key2", "value2"}) -// - MSetNX(map[string]interface{}{"key1": "value1", "key2": "value2"}) -func (c cmdable) MSetNX(values ...interface{}) *BoolCmd { - args := make([]interface{}, 1, 1+len(values)) - args[0] = "msetnx" - args = appendArgs(args, values) - cmd := NewBoolCmd(args...) - _ = c(cmd) - return cmd -} - -// Redis `SET key value [expiration]` command. -// -// Use expiration for `SETEX`-like behavior. -// Zero expiration means the key has no expiration time. -func (c cmdable) Set(key string, value interface{}, expiration time.Duration) *StatusCmd { - args := make([]interface{}, 3, 5) - args[0] = "set" - args[1] = key - args[2] = value - if expiration > 0 { - if usePrecise(expiration) { - args = append(args, "px", formatMs(expiration)) - } else { - args = append(args, "ex", formatSec(expiration)) - } - } - cmd := NewStatusCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) SetBit(key string, offset int64, value int) *IntCmd { - cmd := NewIntCmd( - "setbit", - key, - offset, - value, - ) - _ = c(cmd) - return cmd -} - -// Redis `SET key value [expiration] NX` command. -// -// Zero expiration means the key has no expiration time. -func (c cmdable) SetNX(key string, value interface{}, expiration time.Duration) *BoolCmd { - var cmd *BoolCmd - if expiration == 0 { - // Use old `SETNX` to support old Redis versions. - cmd = NewBoolCmd("setnx", key, value) - } else { - if usePrecise(expiration) { - cmd = NewBoolCmd("set", key, value, "px", formatMs(expiration), "nx") - } else { - cmd = NewBoolCmd("set", key, value, "ex", formatSec(expiration), "nx") - } - } - _ = c(cmd) - return cmd -} - -// Redis `SET key value [expiration] XX` command. -// -// Zero expiration means the key has no expiration time. -func (c cmdable) SetXX(key string, value interface{}, expiration time.Duration) *BoolCmd { - var cmd *BoolCmd - if expiration == 0 { - cmd = NewBoolCmd("set", key, value, "xx") - } else { - if usePrecise(expiration) { - cmd = NewBoolCmd("set", key, value, "px", formatMs(expiration), "xx") - } else { - cmd = NewBoolCmd("set", key, value, "ex", formatSec(expiration), "xx") - } - } - _ = c(cmd) - return cmd -} - -func (c cmdable) SetRange(key string, offset int64, value string) *IntCmd { - cmd := NewIntCmd("setrange", key, offset, value) - _ = c(cmd) - return cmd -} - -func (c cmdable) StrLen(key string) *IntCmd { - cmd := NewIntCmd("strlen", key) - _ = c(cmd) - return cmd -} - -//------------------------------------------------------------------------------ - -func (c cmdable) HDel(key string, fields ...string) *IntCmd { - args := make([]interface{}, 2+len(fields)) - args[0] = "hdel" - args[1] = key - for i, field := range fields { - args[2+i] = field - } - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) HExists(key, field string) *BoolCmd { - cmd := NewBoolCmd("hexists", key, field) - _ = c(cmd) - return cmd -} - -func (c cmdable) HGet(key, field string) *StringCmd { - cmd := NewStringCmd("hget", key, field) - _ = c(cmd) - return cmd -} - -func (c cmdable) HGetAll(key string) *StringStringMapCmd { - cmd := NewStringStringMapCmd("hgetall", key) - _ = c(cmd) - return cmd -} - -func (c cmdable) HIncrBy(key, field string, incr int64) *IntCmd { - cmd := NewIntCmd("hincrby", key, field, incr) - _ = c(cmd) - return cmd -} - -func (c cmdable) HIncrByFloat(key, field string, incr float64) *FloatCmd { - cmd := NewFloatCmd("hincrbyfloat", key, field, incr) - _ = c(cmd) - return cmd -} - -func (c cmdable) HKeys(key string) *StringSliceCmd { - cmd := NewStringSliceCmd("hkeys", key) - _ = c(cmd) - return cmd -} - -func (c cmdable) HLen(key string) *IntCmd { - cmd := NewIntCmd("hlen", key) - _ = c(cmd) - return cmd -} - -// HMGet returns the values for the specified fields in the hash stored at key. -// It returns an interface{} to distinguish between empty string and nil value. -func (c cmdable) HMGet(key string, fields ...string) *SliceCmd { - args := make([]interface{}, 2+len(fields)) - args[0] = "hmget" - args[1] = key - for i, field := range fields { - args[2+i] = field - } - cmd := NewSliceCmd(args...) - _ = c(cmd) - return cmd -} - -// HSet accepts values in following formats: -// - HMSet("myhash", "key1", "value1", "key2", "value2") -// - HMSet("myhash", []string{"key1", "value1", "key2", "value2"}) -// - HMSet("myhash", map[string]interface{}{"key1": "value1", "key2": "value2"}) -// -// Note that it requires Redis v4 for multiple field/value pairs support. -func (c cmdable) HSet(key string, values ...interface{}) *IntCmd { - args := make([]interface{}, 2, 2+len(values)) - args[0] = "hset" - args[1] = key - args = appendArgs(args, values) - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -// HMSet is a deprecated version of HSet left for compatibility with Redis 3. -func (c cmdable) HMSet(key string, values ...interface{}) *BoolCmd { - args := make([]interface{}, 2, 2+len(values)) - args[0] = "hmset" - args[1] = key - args = appendArgs(args, values) - cmd := NewBoolCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) HSetNX(key, field string, value interface{}) *BoolCmd { - cmd := NewBoolCmd("hsetnx", key, field, value) - _ = c(cmd) - return cmd -} - -func (c cmdable) HVals(key string) *StringSliceCmd { - cmd := NewStringSliceCmd("hvals", key) - _ = c(cmd) - return cmd -} - -//------------------------------------------------------------------------------ - -func (c cmdable) BLPop(timeout time.Duration, keys ...string) *StringSliceCmd { - args := make([]interface{}, 1+len(keys)+1) - args[0] = "blpop" - for i, key := range keys { - args[1+i] = key - } - args[len(args)-1] = formatSec(timeout) - cmd := NewStringSliceCmd(args...) - cmd.setReadTimeout(timeout) - _ = c(cmd) - return cmd -} - -func (c cmdable) BRPop(timeout time.Duration, keys ...string) *StringSliceCmd { - args := make([]interface{}, 1+len(keys)+1) - args[0] = "brpop" - for i, key := range keys { - args[1+i] = key - } - args[len(keys)+1] = formatSec(timeout) - cmd := NewStringSliceCmd(args...) - cmd.setReadTimeout(timeout) - _ = c(cmd) - return cmd -} - -func (c cmdable) BRPopLPush(source, destination string, timeout time.Duration) *StringCmd { - cmd := NewStringCmd( - "brpoplpush", - source, - destination, - formatSec(timeout), - ) - cmd.setReadTimeout(timeout) - _ = c(cmd) - return cmd -} - -func (c cmdable) LIndex(key string, index int64) *StringCmd { - cmd := NewStringCmd("lindex", key, index) - _ = c(cmd) - return cmd -} - -func (c cmdable) LInsert(key, op string, pivot, value interface{}) *IntCmd { - cmd := NewIntCmd("linsert", key, op, pivot, value) - _ = c(cmd) - return cmd -} - -func (c cmdable) LInsertBefore(key string, pivot, value interface{}) *IntCmd { - cmd := NewIntCmd("linsert", key, "before", pivot, value) - _ = c(cmd) - return cmd -} - -func (c cmdable) LInsertAfter(key string, pivot, value interface{}) *IntCmd { - cmd := NewIntCmd("linsert", key, "after", pivot, value) - _ = c(cmd) - return cmd -} - -func (c cmdable) LLen(key string) *IntCmd { - cmd := NewIntCmd("llen", key) - _ = c(cmd) - return cmd -} - -func (c cmdable) LPop(key string) *StringCmd { - cmd := NewStringCmd("lpop", key) - _ = c(cmd) - return cmd -} - -func (c cmdable) LPush(key string, values ...interface{}) *IntCmd { - args := make([]interface{}, 2, 2+len(values)) - args[0] = "lpush" - args[1] = key - args = appendArgs(args, values) - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) LPushX(key string, values ...interface{}) *IntCmd { - args := make([]interface{}, 2, 2+len(values)) - args[0] = "lpushx" - args[1] = key - args = appendArgs(args, values) - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) LRange(key string, start, stop int64) *StringSliceCmd { - cmd := NewStringSliceCmd( - "lrange", - key, - start, - stop, - ) - _ = c(cmd) - return cmd -} - -func (c cmdable) LRem(key string, count int64, value interface{}) *IntCmd { - cmd := NewIntCmd("lrem", key, count, value) - _ = c(cmd) - return cmd -} - -func (c cmdable) LSet(key string, index int64, value interface{}) *StatusCmd { - cmd := NewStatusCmd("lset", key, index, value) - _ = c(cmd) - return cmd -} - -func (c cmdable) LTrim(key string, start, stop int64) *StatusCmd { - cmd := NewStatusCmd( - "ltrim", - key, - start, - stop, - ) - _ = c(cmd) - return cmd -} - -func (c cmdable) RPop(key string) *StringCmd { - cmd := NewStringCmd("rpop", key) - _ = c(cmd) - return cmd -} - -func (c cmdable) RPopLPush(source, destination string) *StringCmd { - cmd := NewStringCmd("rpoplpush", source, destination) - _ = c(cmd) - return cmd -} - -func (c cmdable) RPush(key string, values ...interface{}) *IntCmd { - args := make([]interface{}, 2, 2+len(values)) - args[0] = "rpush" - args[1] = key - args = appendArgs(args, values) - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) RPushX(key string, values ...interface{}) *IntCmd { - args := make([]interface{}, 2, 2+len(values)) - args[0] = "rpushx" - args[1] = key - args = appendArgs(args, values) - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -//------------------------------------------------------------------------------ - -func (c cmdable) SAdd(key string, members ...interface{}) *IntCmd { - args := make([]interface{}, 2, 2+len(members)) - args[0] = "sadd" - args[1] = key - args = appendArgs(args, members) - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) SCard(key string) *IntCmd { - cmd := NewIntCmd("scard", key) - _ = c(cmd) - return cmd -} - -func (c cmdable) SDiff(keys ...string) *StringSliceCmd { - args := make([]interface{}, 1+len(keys)) - args[0] = "sdiff" - for i, key := range keys { - args[1+i] = key - } - cmd := NewStringSliceCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) SDiffStore(destination string, keys ...string) *IntCmd { - args := make([]interface{}, 2+len(keys)) - args[0] = "sdiffstore" - args[1] = destination - for i, key := range keys { - args[2+i] = key - } - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) SInter(keys ...string) *StringSliceCmd { - args := make([]interface{}, 1+len(keys)) - args[0] = "sinter" - for i, key := range keys { - args[1+i] = key - } - cmd := NewStringSliceCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) SInterStore(destination string, keys ...string) *IntCmd { - args := make([]interface{}, 2+len(keys)) - args[0] = "sinterstore" - args[1] = destination - for i, key := range keys { - args[2+i] = key - } - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) SIsMember(key string, member interface{}) *BoolCmd { - cmd := NewBoolCmd("sismember", key, member) - _ = c(cmd) - return cmd -} - -// Redis `SMEMBERS key` command output as a slice -func (c cmdable) SMembers(key string) *StringSliceCmd { - cmd := NewStringSliceCmd("smembers", key) - _ = c(cmd) - return cmd -} - -// Redis `SMEMBERS key` command output as a map -func (c cmdable) SMembersMap(key string) *StringStructMapCmd { - cmd := NewStringStructMapCmd("smembers", key) - _ = c(cmd) - return cmd -} - -func (c cmdable) SMove(source, destination string, member interface{}) *BoolCmd { - cmd := NewBoolCmd("smove", source, destination, member) - _ = c(cmd) - return cmd -} - -// Redis `SPOP key` command. -func (c cmdable) SPop(key string) *StringCmd { - cmd := NewStringCmd("spop", key) - _ = c(cmd) - return cmd -} - -// Redis `SPOP key count` command. -func (c cmdable) SPopN(key string, count int64) *StringSliceCmd { - cmd := NewStringSliceCmd("spop", key, count) - _ = c(cmd) - return cmd -} - -// Redis `SRANDMEMBER key` command. -func (c cmdable) SRandMember(key string) *StringCmd { - cmd := NewStringCmd("srandmember", key) - _ = c(cmd) - return cmd -} - -// Redis `SRANDMEMBER key count` command. -func (c cmdable) SRandMemberN(key string, count int64) *StringSliceCmd { - cmd := NewStringSliceCmd("srandmember", key, count) - _ = c(cmd) - return cmd -} - -func (c cmdable) SRem(key string, members ...interface{}) *IntCmd { - args := make([]interface{}, 2, 2+len(members)) - args[0] = "srem" - args[1] = key - args = appendArgs(args, members) - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) SUnion(keys ...string) *StringSliceCmd { - args := make([]interface{}, 1+len(keys)) - args[0] = "sunion" - for i, key := range keys { - args[1+i] = key - } - cmd := NewStringSliceCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) SUnionStore(destination string, keys ...string) *IntCmd { - args := make([]interface{}, 2+len(keys)) - args[0] = "sunionstore" - args[1] = destination - for i, key := range keys { - args[2+i] = key - } - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -//------------------------------------------------------------------------------ - -type XAddArgs struct { - Stream string - MaxLen int64 // MAXLEN N - MaxLenApprox int64 // MAXLEN ~ N - ID string - Values map[string]interface{} -} - -func (c cmdable) XAdd(a *XAddArgs) *StringCmd { - args := make([]interface{}, 0, 6+len(a.Values)*2) - args = append(args, "xadd") - args = append(args, a.Stream) - if a.MaxLen > 0 { - args = append(args, "maxlen", a.MaxLen) - } else if a.MaxLenApprox > 0 { - args = append(args, "maxlen", "~", a.MaxLenApprox) - } - if a.ID != "" { - args = append(args, a.ID) - } else { - args = append(args, "*") - } - for k, v := range a.Values { - args = append(args, k) - args = append(args, v) - } - - cmd := NewStringCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) XDel(stream string, ids ...string) *IntCmd { - args := []interface{}{"xdel", stream} - for _, id := range ids { - args = append(args, id) - } - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) XLen(stream string) *IntCmd { - cmd := NewIntCmd("xlen", stream) - _ = c(cmd) - return cmd -} - -func (c cmdable) XRange(stream, start, stop string) *XMessageSliceCmd { - cmd := NewXMessageSliceCmd("xrange", stream, start, stop) - _ = c(cmd) - return cmd -} - -func (c cmdable) XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd { - cmd := NewXMessageSliceCmd("xrange", stream, start, stop, "count", count) - _ = c(cmd) - return cmd -} - -func (c cmdable) XRevRange(stream, start, stop string) *XMessageSliceCmd { - cmd := NewXMessageSliceCmd("xrevrange", stream, start, stop) - _ = c(cmd) - return cmd -} - -func (c cmdable) XRevRangeN(stream, start, stop string, count int64) *XMessageSliceCmd { - cmd := NewXMessageSliceCmd("xrevrange", stream, start, stop, "count", count) - _ = c(cmd) - return cmd -} - -type XReadArgs struct { - Streams []string // list of streams and ids, e.g. stream1 stream2 id1 id2 - Count int64 - Block time.Duration -} - -func (c cmdable) XRead(a *XReadArgs) *XStreamSliceCmd { - args := make([]interface{}, 0, 5+len(a.Streams)) - args = append(args, "xread") - if a.Count > 0 { - args = append(args, "count") - args = append(args, a.Count) - } - if a.Block >= 0 { - args = append(args, "block") - args = append(args, int64(a.Block/time.Millisecond)) - } - - args = append(args, "streams") - for _, s := range a.Streams { - args = append(args, s) - } - - cmd := NewXStreamSliceCmd(args...) - if a.Block >= 0 { - cmd.setReadTimeout(a.Block) - } - _ = c(cmd) - return cmd -} - -func (c cmdable) XReadStreams(streams ...string) *XStreamSliceCmd { - return c.XRead(&XReadArgs{ - Streams: streams, - Block: -1, - }) -} - -func (c cmdable) XGroupCreate(stream, group, start string) *StatusCmd { - cmd := NewStatusCmd("xgroup", "create", stream, group, start) - _ = c(cmd) - return cmd -} - -func (c cmdable) XGroupCreateMkStream(stream, group, start string) *StatusCmd { - cmd := NewStatusCmd("xgroup", "create", stream, group, start, "mkstream") - _ = c(cmd) - return cmd -} - -func (c cmdable) XGroupSetID(stream, group, start string) *StatusCmd { - cmd := NewStatusCmd("xgroup", "setid", stream, group, start) - _ = c(cmd) - return cmd -} - -func (c cmdable) XGroupDestroy(stream, group string) *IntCmd { - cmd := NewIntCmd("xgroup", "destroy", stream, group) - _ = c(cmd) - return cmd -} - -func (c cmdable) XGroupDelConsumer(stream, group, consumer string) *IntCmd { - cmd := NewIntCmd("xgroup", "delconsumer", stream, group, consumer) - _ = c(cmd) - return cmd -} - -type XReadGroupArgs struct { - Group string - Consumer string - Streams []string // list of streams and ids, e.g. stream1 stream2 id1 id2 - Count int64 - Block time.Duration - NoAck bool -} - -func (c cmdable) XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd { - args := make([]interface{}, 0, 8+len(a.Streams)) - args = append(args, "xreadgroup", "group", a.Group, a.Consumer) - if a.Count > 0 { - args = append(args, "count", a.Count) - } - if a.Block >= 0 { - args = append(args, "block", int64(a.Block/time.Millisecond)) - } - if a.NoAck { - args = append(args, "noack") - } - args = append(args, "streams") - for _, s := range a.Streams { - args = append(args, s) - } - - cmd := NewXStreamSliceCmd(args...) - if a.Block >= 0 { - cmd.setReadTimeout(a.Block) - } - _ = c(cmd) - return cmd -} - -func (c cmdable) XAck(stream, group string, ids ...string) *IntCmd { - args := []interface{}{"xack", stream, group} - for _, id := range ids { - args = append(args, id) - } - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) XPending(stream, group string) *XPendingCmd { - cmd := NewXPendingCmd("xpending", stream, group) - _ = c(cmd) - return cmd -} - -type XPendingExtArgs struct { - Stream string - Group string - Start string - End string - Count int64 - Consumer string -} - -func (c cmdable) XPendingExt(a *XPendingExtArgs) *XPendingExtCmd { - args := make([]interface{}, 0, 7) - args = append(args, "xpending", a.Stream, a.Group, a.Start, a.End, a.Count) - if a.Consumer != "" { - args = append(args, a.Consumer) - } - cmd := NewXPendingExtCmd(args...) - _ = c(cmd) - return cmd -} - -type XClaimArgs struct { - Stream string - Group string - Consumer string - MinIdle time.Duration - Messages []string -} - -func (c cmdable) XClaim(a *XClaimArgs) *XMessageSliceCmd { - args := xClaimArgs(a) - cmd := NewXMessageSliceCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) XClaimJustID(a *XClaimArgs) *StringSliceCmd { - args := xClaimArgs(a) - args = append(args, "justid") - cmd := NewStringSliceCmd(args...) - _ = c(cmd) - return cmd -} - -func xClaimArgs(a *XClaimArgs) []interface{} { - args := make([]interface{}, 0, 4+len(a.Messages)) - args = append(args, - "xclaim", - a.Stream, - a.Group, a.Consumer, - int64(a.MinIdle/time.Millisecond)) - for _, id := range a.Messages { - args = append(args, id) - } - return args -} - -func (c cmdable) XTrim(key string, maxLen int64) *IntCmd { - cmd := NewIntCmd("xtrim", key, "maxlen", maxLen) - _ = c(cmd) - return cmd -} - -func (c cmdable) XTrimApprox(key string, maxLen int64) *IntCmd { - cmd := NewIntCmd("xtrim", key, "maxlen", "~", maxLen) - _ = c(cmd) - return cmd -} - -func (c cmdable) XInfoGroups(key string) *XInfoGroupsCmd { - cmd := NewXInfoGroupsCmd(key) - _ = c(cmd) - return cmd -} - -//------------------------------------------------------------------------------ - -// Z represents sorted set member. -type Z struct { - Score float64 - Member interface{} -} - -// ZWithKey represents sorted set member including the name of the key where it was popped. -type ZWithKey struct { - Z - Key string -} - -// ZStore is used as an arg to ZInterStore and ZUnionStore. -type ZStore struct { - Keys []string - Weights []float64 - // Can be SUM, MIN or MAX. - Aggregate string -} - -// Redis `BZPOPMAX key [key ...] timeout` command. -func (c cmdable) BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd { - args := make([]interface{}, 1+len(keys)+1) - args[0] = "bzpopmax" - for i, key := range keys { - args[1+i] = key - } - args[len(args)-1] = formatSec(timeout) - cmd := NewZWithKeyCmd(args...) - cmd.setReadTimeout(timeout) - _ = c(cmd) - return cmd -} - -// Redis `BZPOPMIN key [key ...] timeout` command. -func (c cmdable) BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd { - args := make([]interface{}, 1+len(keys)+1) - args[0] = "bzpopmin" - for i, key := range keys { - args[1+i] = key - } - args[len(args)-1] = formatSec(timeout) - cmd := NewZWithKeyCmd(args...) - cmd.setReadTimeout(timeout) - _ = c(cmd) - return cmd -} - -func (c cmdable) zAdd(a []interface{}, n int, members ...*Z) *IntCmd { - for i, m := range members { - a[n+2*i] = m.Score - a[n+2*i+1] = m.Member - } - cmd := NewIntCmd(a...) - _ = c(cmd) - return cmd -} - -// Redis `ZADD key score member [score member ...]` command. -func (c cmdable) ZAdd(key string, members ...*Z) *IntCmd { - const n = 2 - a := make([]interface{}, n+2*len(members)) - a[0], a[1] = "zadd", key - return c.zAdd(a, n, members...) -} - -// Redis `ZADD key NX score member [score member ...]` command. -func (c cmdable) ZAddNX(key string, members ...*Z) *IntCmd { - const n = 3 - a := make([]interface{}, n+2*len(members)) - a[0], a[1], a[2] = "zadd", key, "nx" - return c.zAdd(a, n, members...) -} - -// Redis `ZADD key XX score member [score member ...]` command. -func (c cmdable) ZAddXX(key string, members ...*Z) *IntCmd { - const n = 3 - a := make([]interface{}, n+2*len(members)) - a[0], a[1], a[2] = "zadd", key, "xx" - return c.zAdd(a, n, members...) -} - -// Redis `ZADD key CH score member [score member ...]` command. -func (c cmdable) ZAddCh(key string, members ...*Z) *IntCmd { - const n = 3 - a := make([]interface{}, n+2*len(members)) - a[0], a[1], a[2] = "zadd", key, "ch" - return c.zAdd(a, n, members...) -} - -// Redis `ZADD key NX CH score member [score member ...]` command. -func (c cmdable) ZAddNXCh(key string, members ...*Z) *IntCmd { - const n = 4 - a := make([]interface{}, n+2*len(members)) - a[0], a[1], a[2], a[3] = "zadd", key, "nx", "ch" - return c.zAdd(a, n, members...) -} - -// Redis `ZADD key XX CH score member [score member ...]` command. -func (c cmdable) ZAddXXCh(key string, members ...*Z) *IntCmd { - const n = 4 - a := make([]interface{}, n+2*len(members)) - a[0], a[1], a[2], a[3] = "zadd", key, "xx", "ch" - return c.zAdd(a, n, members...) -} - -func (c cmdable) zIncr(a []interface{}, n int, members ...*Z) *FloatCmd { - for i, m := range members { - a[n+2*i] = m.Score - a[n+2*i+1] = m.Member - } - cmd := NewFloatCmd(a...) - _ = c(cmd) - return cmd -} - -// Redis `ZADD key INCR score member` command. -func (c cmdable) ZIncr(key string, member *Z) *FloatCmd { - const n = 3 - a := make([]interface{}, n+2) - a[0], a[1], a[2] = "zadd", key, "incr" - return c.zIncr(a, n, member) -} - -// Redis `ZADD key NX INCR score member` command. -func (c cmdable) ZIncrNX(key string, member *Z) *FloatCmd { - const n = 4 - a := make([]interface{}, n+2) - a[0], a[1], a[2], a[3] = "zadd", key, "incr", "nx" - return c.zIncr(a, n, member) -} - -// Redis `ZADD key XX INCR score member` command. -func (c cmdable) ZIncrXX(key string, member *Z) *FloatCmd { - const n = 4 - a := make([]interface{}, n+2) - a[0], a[1], a[2], a[3] = "zadd", key, "incr", "xx" - return c.zIncr(a, n, member) -} - -func (c cmdable) ZCard(key string) *IntCmd { - cmd := NewIntCmd("zcard", key) - _ = c(cmd) - return cmd -} - -func (c cmdable) ZCount(key, min, max string) *IntCmd { - cmd := NewIntCmd("zcount", key, min, max) - _ = c(cmd) - return cmd -} - -func (c cmdable) ZLexCount(key, min, max string) *IntCmd { - cmd := NewIntCmd("zlexcount", key, min, max) - _ = c(cmd) - return cmd -} - -func (c cmdable) ZIncrBy(key string, increment float64, member string) *FloatCmd { - cmd := NewFloatCmd("zincrby", key, increment, member) - _ = c(cmd) - return cmd -} - -func (c cmdable) ZInterStore(destination string, store *ZStore) *IntCmd { - args := make([]interface{}, 3+len(store.Keys)) - args[0] = "zinterstore" - args[1] = destination - args[2] = len(store.Keys) - for i, key := range store.Keys { - args[3+i] = key - } - if len(store.Weights) > 0 { - args = append(args, "weights") - for _, weight := range store.Weights { - args = append(args, weight) - } - } - if store.Aggregate != "" { - args = append(args, "aggregate", store.Aggregate) - } - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) ZPopMax(key string, count ...int64) *ZSliceCmd { - args := []interface{}{ - "zpopmax", - key, - } - - switch len(count) { - case 0: - break - case 1: - args = append(args, count[0]) - default: - panic("too many arguments") - } - - cmd := NewZSliceCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) ZPopMin(key string, count ...int64) *ZSliceCmd { - args := []interface{}{ - "zpopmin", - key, - } - - switch len(count) { - case 0: - break - case 1: - args = append(args, count[0]) - default: - panic("too many arguments") - } - - cmd := NewZSliceCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) zRange(key string, start, stop int64, withScores bool) *StringSliceCmd { - args := []interface{}{ - "zrange", - key, - start, - stop, - } - if withScores { - args = append(args, "withscores") - } - cmd := NewStringSliceCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) ZRange(key string, start, stop int64) *StringSliceCmd { - return c.zRange(key, start, stop, false) -} - -func (c cmdable) ZRangeWithScores(key string, start, stop int64) *ZSliceCmd { - cmd := NewZSliceCmd("zrange", key, start, stop, "withscores") - _ = c(cmd) - return cmd -} - -type ZRangeBy struct { - Min, Max string - Offset, Count int64 -} - -func (c cmdable) zRangeBy(zcmd, key string, opt *ZRangeBy, withScores bool) *StringSliceCmd { - args := []interface{}{zcmd, key, opt.Min, opt.Max} - if withScores { - args = append(args, "withscores") - } - if opt.Offset != 0 || opt.Count != 0 { - args = append( - args, - "limit", - opt.Offset, - opt.Count, - ) - } - cmd := NewStringSliceCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) ZRangeByScore(key string, opt *ZRangeBy) *StringSliceCmd { - return c.zRangeBy("zrangebyscore", key, opt, false) -} - -func (c cmdable) ZRangeByLex(key string, opt *ZRangeBy) *StringSliceCmd { - return c.zRangeBy("zrangebylex", key, opt, false) -} - -func (c cmdable) ZRangeByScoreWithScores(key string, opt *ZRangeBy) *ZSliceCmd { - args := []interface{}{"zrangebyscore", key, opt.Min, opt.Max, "withscores"} - if opt.Offset != 0 || opt.Count != 0 { - args = append( - args, - "limit", - opt.Offset, - opt.Count, - ) - } - cmd := NewZSliceCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) ZRank(key, member string) *IntCmd { - cmd := NewIntCmd("zrank", key, member) - _ = c(cmd) - return cmd -} - -func (c cmdable) ZRem(key string, members ...interface{}) *IntCmd { - args := make([]interface{}, 2, 2+len(members)) - args[0] = "zrem" - args[1] = key - args = appendArgs(args, members) - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) ZRemRangeByRank(key string, start, stop int64) *IntCmd { - cmd := NewIntCmd( - "zremrangebyrank", - key, - start, - stop, - ) - _ = c(cmd) - return cmd -} - -func (c cmdable) ZRemRangeByScore(key, min, max string) *IntCmd { - cmd := NewIntCmd("zremrangebyscore", key, min, max) - _ = c(cmd) - return cmd -} - -func (c cmdable) ZRemRangeByLex(key, min, max string) *IntCmd { - cmd := NewIntCmd("zremrangebylex", key, min, max) - _ = c(cmd) - return cmd -} - -func (c cmdable) ZRevRange(key string, start, stop int64) *StringSliceCmd { - cmd := NewStringSliceCmd("zrevrange", key, start, stop) - _ = c(cmd) - return cmd -} - -func (c cmdable) ZRevRangeWithScores(key string, start, stop int64) *ZSliceCmd { - cmd := NewZSliceCmd("zrevrange", key, start, stop, "withscores") - _ = c(cmd) - return cmd -} - -func (c cmdable) zRevRangeBy(zcmd, key string, opt *ZRangeBy) *StringSliceCmd { - args := []interface{}{zcmd, key, opt.Max, opt.Min} - if opt.Offset != 0 || opt.Count != 0 { - args = append( - args, - "limit", - opt.Offset, - opt.Count, - ) - } - cmd := NewStringSliceCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) ZRevRangeByScore(key string, opt *ZRangeBy) *StringSliceCmd { - return c.zRevRangeBy("zrevrangebyscore", key, opt) -} - -func (c cmdable) ZRevRangeByLex(key string, opt *ZRangeBy) *StringSliceCmd { - return c.zRevRangeBy("zrevrangebylex", key, opt) -} - -func (c cmdable) ZRevRangeByScoreWithScores(key string, opt *ZRangeBy) *ZSliceCmd { - args := []interface{}{"zrevrangebyscore", key, opt.Max, opt.Min, "withscores"} - if opt.Offset != 0 || opt.Count != 0 { - args = append( - args, - "limit", - opt.Offset, - opt.Count, - ) - } - cmd := NewZSliceCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) ZRevRank(key, member string) *IntCmd { - cmd := NewIntCmd("zrevrank", key, member) - _ = c(cmd) - return cmd -} - -func (c cmdable) ZScore(key, member string) *FloatCmd { - cmd := NewFloatCmd("zscore", key, member) - _ = c(cmd) - return cmd -} - -func (c cmdable) ZUnionStore(dest string, store *ZStore) *IntCmd { - args := make([]interface{}, 3+len(store.Keys)) - args[0] = "zunionstore" - args[1] = dest - args[2] = len(store.Keys) - for i, key := range store.Keys { - args[3+i] = key - } - if len(store.Weights) > 0 { - args = append(args, "weights") - for _, weight := range store.Weights { - args = append(args, weight) - } - } - if store.Aggregate != "" { - args = append(args, "aggregate", store.Aggregate) - } - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -//------------------------------------------------------------------------------ - -func (c cmdable) PFAdd(key string, els ...interface{}) *IntCmd { - args := make([]interface{}, 2, 2+len(els)) - args[0] = "pfadd" - args[1] = key - args = appendArgs(args, els) - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) PFCount(keys ...string) *IntCmd { - args := make([]interface{}, 1+len(keys)) - args[0] = "pfcount" - for i, key := range keys { - args[1+i] = key - } - cmd := NewIntCmd(args...) - _ = c(cmd) - return cmd -} - -func (c cmdable) PFMerge(dest string, keys ...string) *StatusCmd { - args := make([]interface{}, 2+len(keys)) - args[0] = "pfmerge" - args[1] = dest - for i, key := range keys { - args[2+i] = key - } - cmd := NewStatusCmd(args...) - _ = c(cmd) - return cmd -} - -//------------------------------------------------------------------------------ - -func (c cmdable) BgRewriteAOF() *StatusCmd { - cmd := NewStatusCmd("bgrewriteaof") - _ = c(cmd) - return cmd -} - -func (c cmdable) BgSave() *StatusCmd { - cmd := NewStatusCmd("bgsave") - _ = c(cmd) - return cmd -} - -func (c cmdable) ClientKill(ipPort string) *StatusCmd { - cmd := NewStatusCmd("client", "kill", ipPort) - _ = c(cmd) - return cmd -} - -// ClientKillByFilter is new style synx, while the ClientKill is old -// CLIENT KILL