summaryrefslogtreecommitdiffstats
path: root/vendor/gopkg.in/redis.v2/pipeline.go
blob: 540d6c51d9bcb20c61ec9ffe1752d1070bee0421 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package redis

// Not thread-safe.
type Pipeline struct {
	*Client

	closed bool
}

func (c *Client) Pipeline() *Pipeline {
	return &Pipeline{
		Client: &Client{
			baseClient: &baseClient{
				opt:      c.opt,
				connPool: c.connPool,

				cmds: make([]Cmder, 0),
			},
		},
	}
}

func (c *Client) Pipelined(f func(*Pipeline) error) ([]Cmder, error) {
	pc := c.Pipeline()
	if err := f(pc); err != nil {
		return nil, err
	}
	cmds, err := pc.Exec()
	pc.Close()
	return cmds, err
}

func (c *Pipeline) Close() error {
	c.closed = true
	return nil
}

func (c *Pipeline) Discard() error {
	if c.closed {
		return errClosed
	}
	c.cmds = c.cmds[:0]
	return nil
}

// Exec always returns list of commands and error of the first failed
// command if any.
func (c *Pipeline) Exec() ([]Cmder, error) {
	if c.closed {
		return nil, errClosed
	}

	cmds := c.cmds
	c.cmds = make([]Cmder, 0)

	if len(cmds) == 0 {
		return []Cmder{}, nil
	}

	cn, err := c.conn()
	if err != nil {
		setCmdsErr(cmds, err)
		return cmds, err
	}

	if err := c.execCmds(cn, cmds); err != nil {
		c.freeConn(cn, err)
		return cmds, err
	}

	c.putConn(cn)
	return cmds, nil
}

func (c *Pipeline) execCmds(cn *conn, cmds []Cmder) error {
	if err := c.writeCmd(cn, cmds...); err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	var firstCmdErr error
	for _, cmd := range cmds {
		if err := cmd.parseReply(cn.rd); err != nil {
			if firstCmdErr == nil {
				firstCmdErr = err
			}
		}
	}

	return firstCmdErr
}