aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/go-redis/redis/internal/pool/pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/go-redis/redis/internal/pool/pool.go')
-rw-r--r--vendor/github.com/go-redis/redis/internal/pool/pool.go476
1 files changed, 476 insertions, 0 deletions
diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool.go b/vendor/github.com/go-redis/redis/internal/pool/pool.go
new file mode 100644
index 0000000000..9cecee8ad4
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/pool/pool.go
@@ -0,0 +1,476 @@
+package pool
+
+import (
+ "errors"
+ "net"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/go-redis/redis/internal"
+)
+
+var ErrClosed = errors.New("redis: client is closed")
+var ErrPoolTimeout = errors.New("redis: connection pool timeout")
+
+var timers = sync.Pool{
+ New: func() interface{} {
+ t := time.NewTimer(time.Hour)
+ t.Stop()
+ return t
+ },
+}
+
+// Stats contains pool state information and accumulated stats.
+type Stats struct {
+ Hits uint32 // number of times free connection was found in the pool
+ Misses uint32 // number of times free connection was NOT found in the pool
+ Timeouts uint32 // number of times a wait timeout occurred
+
+ TotalConns uint32 // number of total connections in the pool
+ IdleConns uint32 // number of idle connections in the pool
+ StaleConns uint32 // number of stale connections removed from the pool
+}
+
+type Pooler interface {
+ NewConn() (*Conn, error)
+ CloseConn(*Conn) error
+
+ Get() (*Conn, error)
+ Put(*Conn)
+ Remove(*Conn)
+
+ Len() int
+ IdleLen() int
+ Stats() *Stats
+
+ Close() error
+}
+
+type Options struct {
+ Dialer func() (net.Conn, error)
+ OnClose func(*Conn) error
+
+ PoolSize int
+ MinIdleConns int
+ MaxConnAge time.Duration
+ PoolTimeout time.Duration
+ IdleTimeout time.Duration
+ IdleCheckFrequency time.Duration
+}
+
+type ConnPool struct {
+ opt *Options
+
+ dialErrorsNum uint32 // atomic
+
+ lastDialErrorMu sync.RWMutex
+ lastDialError error
+
+ queue chan struct{}
+
+ connsMu sync.Mutex
+ conns []*Conn
+ idleConns []*Conn
+ poolSize int
+ idleConnsLen int
+
+ stats Stats
+
+ _closed uint32 // atomic
+}
+
+var _ Pooler = (*ConnPool)(nil)
+
+func NewConnPool(opt *Options) *ConnPool {
+ p := &ConnPool{
+ opt: opt,
+
+ queue: make(chan struct{}, opt.PoolSize),
+ conns: make([]*Conn, 0, opt.PoolSize),
+ idleConns: make([]*Conn, 0, opt.PoolSize),
+ }
+
+ for i := 0; i < opt.MinIdleConns; i++ {
+ p.checkMinIdleConns()
+ }
+
+ if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
+ go p.reaper(opt.IdleCheckFrequency)
+ }
+
+ return p
+}
+
+func (p *ConnPool) checkMinIdleConns() {
+ if p.opt.MinIdleConns == 0 {
+ return
+ }
+ if p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns {
+ p.poolSize++
+ p.idleConnsLen++
+ go p.addIdleConn()
+ }
+}
+
+func (p *ConnPool) addIdleConn() {
+ cn, err := p.newConn(true)
+ if err != nil {
+ return
+ }
+
+ p.connsMu.Lock()
+ p.conns = append(p.conns, cn)
+ p.idleConns = append(p.idleConns, cn)
+ p.connsMu.Unlock()
+}
+
+func (p *ConnPool) NewConn() (*Conn, error) {
+ return p._NewConn(false)
+}
+
+func (p *ConnPool) _NewConn(pooled bool) (*Conn, error) {
+ cn, err := p.newConn(pooled)
+ if err != nil {
+ return nil, err
+ }
+
+ p.connsMu.Lock()
+ p.conns = append(p.conns, cn)
+ if pooled {
+ if p.poolSize < p.opt.PoolSize {
+ p.poolSize++
+ } else {
+ cn.pooled = false
+ }
+ }
+ p.connsMu.Unlock()
+ return cn, nil
+}
+
+func (p *ConnPool) newConn(pooled bool) (*Conn, error) {
+ if p.closed() {
+ return nil, ErrClosed
+ }
+
+ if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) {
+ return nil, p.getLastDialError()
+ }
+
+ netConn, err := p.opt.Dialer()
+ if err != nil {
+ p.setLastDialError(err)
+ if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) {
+ go p.tryDial()
+ }
+ return nil, err
+ }
+
+ cn := NewConn(netConn)
+ cn.pooled = pooled
+ return cn, nil
+}
+
+func (p *ConnPool) tryDial() {
+ for {
+ if p.closed() {
+ return
+ }
+
+ conn, err := p.opt.Dialer()
+ if err != nil {
+ p.setLastDialError(err)
+ time.Sleep(time.Second)
+ continue
+ }
+
+ atomic.StoreUint32(&p.dialErrorsNum, 0)
+ _ = conn.Close()
+ return
+ }
+}
+
+func (p *ConnPool) setLastDialError(err error) {
+ p.lastDialErrorMu.Lock()
+ p.lastDialError = err
+ p.lastDialErrorMu.Unlock()
+}
+
+func (p *ConnPool) getLastDialError() error {
+ p.lastDialErrorMu.RLock()
+ err := p.lastDialError
+ p.lastDialErrorMu.RUnlock()
+ return err
+}
+
+// Get returns existed connection from the pool or creates a new one.
+func (p *ConnPool) Get() (*Conn, error) {
+ if p.closed() {
+ return nil, ErrClosed
+ }
+
+ err := p.waitTurn()
+ if err != nil {
+ return nil, err
+ }
+
+ for {
+ p.connsMu.Lock()
+ cn := p.popIdle()
+ p.connsMu.Unlock()
+
+ if cn == nil {
+ break
+ }
+
+ if p.isStaleConn(cn) {
+ _ = p.CloseConn(cn)
+ continue
+ }
+
+ atomic.AddUint32(&p.stats.Hits, 1)
+ return cn, nil
+ }
+
+ atomic.AddUint32(&p.stats.Misses, 1)
+
+ newcn, err := p._NewConn(true)
+ if err != nil {
+ p.freeTurn()
+ return nil, err
+ }
+
+ return newcn, nil
+}
+
+func (p *ConnPool) getTurn() {
+ p.queue <- struct{}{}
+}
+
+func (p *ConnPool) waitTurn() error {
+ select {
+ case p.queue <- struct{}{}:
+ return nil
+ default:
+ timer := timers.Get().(*time.Timer)
+ timer.Reset(p.opt.PoolTimeout)
+
+ select {
+ case p.queue <- struct{}{}:
+ if !timer.Stop() {
+ <-timer.C
+ }
+ timers.Put(timer)
+ return nil
+ case <-timer.C:
+ timers.Put(timer)
+ atomic.AddUint32(&p.stats.Timeouts, 1)
+ return ErrPoolTimeout
+ }
+ }
+}
+
+func (p *ConnPool) freeTurn() {
+ <-p.queue
+}
+
+func (p *ConnPool) popIdle() *Conn {
+ if len(p.idleConns) == 0 {
+ return nil
+ }
+
+ idx := len(p.idleConns) - 1
+ cn := p.idleConns[idx]
+ p.idleConns = p.idleConns[:idx]
+ p.idleConnsLen--
+ p.checkMinIdleConns()
+ return cn
+}
+
+func (p *ConnPool) Put(cn *Conn) {
+ if !cn.pooled {
+ p.Remove(cn)
+ return
+ }
+
+ p.connsMu.Lock()
+ p.idleConns = append(p.idleConns, cn)
+ p.idleConnsLen++
+ p.connsMu.Unlock()
+ p.freeTurn()
+}
+
+func (p *ConnPool) Remove(cn *Conn) {
+ p.removeConn(cn)
+ p.freeTurn()
+ _ = p.closeConn(cn)
+}
+
+func (p *ConnPool) CloseConn(cn *Conn) error {
+ p.removeConn(cn)
+ return p.closeConn(cn)
+}
+
+func (p *ConnPool) removeConn(cn *Conn) {
+ p.connsMu.Lock()
+ for i, c := range p.conns {
+ if c == cn {
+ p.conns = append(p.conns[:i], p.conns[i+1:]...)
+ if cn.pooled {
+ p.poolSize--
+ p.checkMinIdleConns()
+ }
+ break
+ }
+ }
+ p.connsMu.Unlock()
+}
+
+func (p *ConnPool) closeConn(cn *Conn) error {
+ if p.opt.OnClose != nil {
+ _ = p.opt.OnClose(cn)
+ }
+ return cn.Close()
+}
+
+// Len returns total number of connections.
+func (p *ConnPool) Len() int {
+ p.connsMu.Lock()
+ n := len(p.conns)
+ p.connsMu.Unlock()
+ return n
+}
+
+// IdleLen returns number of idle connections.
+func (p *ConnPool) IdleLen() int {
+ p.connsMu.Lock()
+ n := p.idleConnsLen
+ p.connsMu.Unlock()
+ return n
+}
+
+func (p *ConnPool) Stats() *Stats {
+ idleLen := p.IdleLen()
+ return &Stats{
+ Hits: atomic.LoadUint32(&p.stats.Hits),
+ Misses: atomic.LoadUint32(&p.stats.Misses),
+ Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
+
+ TotalConns: uint32(p.Len()),
+ IdleConns: uint32(idleLen),
+ StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
+ }
+}
+
+func (p *ConnPool) closed() bool {
+ return atomic.LoadUint32(&p._closed) == 1
+}
+
+func (p *ConnPool) Filter(fn func(*Conn) bool) error {
+ var firstErr error
+ p.connsMu.Lock()
+ for _, cn := range p.conns {
+ if fn(cn) {
+ if err := p.closeConn(cn); err != nil && firstErr == nil {
+ firstErr = err
+ }
+ }
+ }
+ p.connsMu.Unlock()
+ return firstErr
+}
+
+func (p *ConnPool) Close() error {
+ if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) {
+ return ErrClosed
+ }
+
+ var firstErr error
+ p.connsMu.Lock()
+ for _, cn := range p.conns {
+ if err := p.closeConn(cn); err != nil && firstErr == nil {
+ firstErr = err
+ }
+ }
+ p.conns = nil
+ p.poolSize = 0
+ p.idleConns = nil
+ p.idleConnsLen = 0
+ p.connsMu.Unlock()
+
+ return firstErr
+}
+
+func (p *ConnPool) reapStaleConn() *Conn {
+ if len(p.idleConns) == 0 {
+ return nil
+ }
+
+ cn := p.idleConns[0]
+ if !p.isStaleConn(cn) {
+ return nil
+ }
+
+ p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
+ p.idleConnsLen--
+
+ return cn
+}
+
+func (p *ConnPool) ReapStaleConns() (int, error) {
+ var n int
+ for {
+ p.getTurn()
+
+ p.connsMu.Lock()
+ cn := p.reapStaleConn()
+ p.connsMu.Unlock()
+
+ if cn != nil {
+ p.removeConn(cn)
+ }
+
+ p.freeTurn()
+
+ if cn != nil {
+ p.closeConn(cn)
+ n++
+ } else {
+ break
+ }
+ }
+ return n, nil
+}
+
+func (p *ConnPool) reaper(frequency time.Duration) {
+ ticker := time.NewTicker(frequency)
+ defer ticker.Stop()
+
+ for range ticker.C {
+ if p.closed() {
+ break
+ }
+ n, err := p.ReapStaleConns()
+ if err != nil {
+ internal.Logf("ReapStaleConns failed: %s", err)
+ continue
+ }
+ atomic.AddUint32(&p.stats.StaleConns, uint32(n))
+ }
+}
+
+func (p *ConnPool) isStaleConn(cn *Conn) bool {
+ if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 {
+ return false
+ }
+
+ now := time.Now()
+ if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout {
+ return true
+ }
+ if p.opt.MaxConnAge > 0 && now.Sub(cn.InitedAt) >= p.opt.MaxConnAge {
+ return true
+ }
+
+ return false
+}