Diffstat (limited to 'modules')
-rw-r--r--  modules/cache/cache.go                 1
-rw-r--r--  modules/cache/cache_redis.go         140
-rw-r--r--  modules/nosql/leveldb.go              25
-rw-r--r--  modules/nosql/manager.go              71
-rw-r--r--  modules/nosql/manager_leveldb.go     151
-rw-r--r--  modules/nosql/manager_redis.go       205
-rw-r--r--  modules/nosql/redis.go               102
-rw-r--r--  modules/nosql/redis_test.go           35
-rw-r--r--  modules/queue/queue_disk.go           31
-rw-r--r--  modules/queue/queue_redis.go          29
-rw-r--r--  modules/queue/unique_queue_disk.go    31
-rw-r--r--  modules/queue/unique_queue_redis.go    2
-rw-r--r--  modules/session/redis.go             216
-rw-r--r--  modules/session/virtual.go             3
14 files changed, 1000 insertions, 42 deletions
diff --git a/modules/cache/cache.go b/modules/cache/cache.go
index 859f4a4b47..60865d8335 100644
--- a/modules/cache/cache.go
+++ b/modules/cache/cache.go
@@ -13,7 +13,6 @@ import (
mc "gitea.com/macaron/cache"
_ "gitea.com/macaron/cache/memcache" // memcache plugin for cache
- _ "gitea.com/macaron/cache/redis"
)
var (
diff --git a/modules/cache/cache_redis.go b/modules/cache/cache_redis.go
new file mode 100644
index 0000000000..96e865a382
--- /dev/null
+++ b/modules/cache/cache_redis.go
@@ -0,0 +1,140 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package cache
+
+import (
+ "fmt"
+ "time"
+
+ "code.gitea.io/gitea/modules/nosql"
+
+ "gitea.com/macaron/cache"
+ "github.com/go-redis/redis/v7"
+ "github.com/unknwon/com"
+)
+
+// RedisCacher represents a redis cache adapter implementation.
+type RedisCacher struct {
+ c redis.UniversalClient
+ prefix string
+ hsetName string
+ occupyMode bool
+}
+
+// Put puts value into cache with key and expire time.
+// If expired is 0, it lives forever.
+func (c *RedisCacher) Put(key string, val interface{}, expire int64) error {
+ key = c.prefix + key
+ if expire == 0 {
+ if err := c.c.Set(key, com.ToStr(val), 0).Err(); err != nil {
+ return err
+ }
+ } else {
+ dur, err := time.ParseDuration(com.ToStr(expire) + "s")
+ if err != nil {
+ return err
+ }
+ if err = c.c.Set(key, com.ToStr(val), dur).Err(); err != nil {
+ return err
+ }
+ }
+
+ if c.occupyMode {
+ return nil
+ }
+ return c.c.HSet(c.hsetName, key, "0").Err()
+}
+
+// Get gets cached value by given key.
+func (c *RedisCacher) Get(key string) interface{} {
+ val, err := c.c.Get(c.prefix + key).Result()
+ if err != nil {
+ return nil
+ }
+ return val
+}
+
+// Delete deletes cached value by given key.
+func (c *RedisCacher) Delete(key string) error {
+ key = c.prefix + key
+ if err := c.c.Del(key).Err(); err != nil {
+ return err
+ }
+
+ if c.occupyMode {
+ return nil
+ }
+ return c.c.HDel(c.hsetName, key).Err()
+}
+
+// Incr increases cached int-type value by given key as a counter.
+func (c *RedisCacher) Incr(key string) error {
+ if !c.IsExist(key) {
+ return fmt.Errorf("key '%s' does not exist", key)
+ }
+ return c.c.Incr(c.prefix + key).Err()
+}
+
+// Decr decreases cached int-type value by given key as a counter.
+func (c *RedisCacher) Decr(key string) error {
+ if !c.IsExist(key) {
+ return fmt.Errorf("key '%s' does not exist", key)
+ }
+ return c.c.Decr(c.prefix + key).Err()
+}
+
+// IsExist returns true if cached value exists.
+func (c *RedisCacher) IsExist(key string) bool {
+ if c.c.Exists(c.prefix+key).Val() == 1 {
+ return true
+ }
+
+ if !c.occupyMode {
+ c.c.HDel(c.hsetName, c.prefix+key)
+ }
+ return false
+}
+
+// Flush deletes all cached data.
+func (c *RedisCacher) Flush() error {
+ if c.occupyMode {
+ return c.c.FlushDB().Err()
+ }
+
+ keys, err := c.c.HKeys(c.hsetName).Result()
+ if err != nil {
+ return err
+ }
+ if err = c.c.Del(keys...).Err(); err != nil {
+ return err
+ }
+ return c.c.Del(c.hsetName).Err()
+}
+
+// StartAndGC starts GC routine based on config string settings.
+// AdapterConfig: network=tcp,addr=:6379,password=macaron,db=0,pool_size=100,idle_timeout=180,hset_name=MacaronCache,prefix=cache:
+func (c *RedisCacher) StartAndGC(opts cache.Options) error {
+ c.hsetName = "MacaronCache"
+ c.occupyMode = opts.OccupyMode
+
+ uri := nosql.ToRedisURI(opts.AdapterConfig)
+
+ c.c = nosql.GetManager().GetRedisClient(uri.String())
+
+ for k, v := range uri.Query() {
+ switch k {
+ case "hset_name":
+ c.hsetName = v[0]
+ case "prefix":
+ c.prefix = v[0]
+ }
+ }
+
+ return c.c.Ping().Err()
+}
+
+func init() {
+ cache.Register("redis", &RedisCacher{})
+}
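As a quick illustration of the AdapterConfig handling above, here is a minimal sketch; the printed URI is an assumption based on the conversion rules added in modules/nosql/redis.go further down, and no running Redis is needed because only the URI is built:

package main

import (
	"fmt"

	"code.gitea.io/gitea/modules/nosql"
)

func main() {
	// Old macaron-style cache AdapterConfig, as in the StartAndGC doc comment above.
	cfg := "network=tcp,addr=127.0.0.1:6379,password=macaron,db=0,pool_size=100,idle_timeout=180,hset_name=MacaronCache,prefix=cache:"

	uri := nosql.ToRedisURI(cfg)
	fmt.Println(uri.String())
	// Expected form (query keys are sorted by url.Values.Encode):
	// redis://:macaron@127.0.0.1:6379/0?hset_name=MacaronCache&idle_timeout=180s&pool_size=100&prefix=cache%3A
}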
diff --git a/modules/nosql/leveldb.go b/modules/nosql/leveldb.go
new file mode 100644
index 0000000000..5da2291e03
--- /dev/null
+++ b/modules/nosql/leveldb.go
@@ -0,0 +1,25 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import "net/url"
+
+// ToLevelDBURI converts old style connections to a LevelDBURI
+//
+// A LevelDBURI matches the pattern:
+//
+//   leveldb://path[?[option=value]*]
+//
+// We have previously just provided the path, but this prevented passing other options
+func ToLevelDBURI(connection string) *url.URL {
+ uri, err := url.Parse(connection)
+ if err == nil && uri.Scheme == "leveldb" {
+ return uri
+ }
+ uri, _ = url.Parse("leveldb://common")
+ uri.Host = ""
+ uri.Path = connection
+ return uri
+}
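A minimal sketch of the conversion above; the paths are illustrative and the printed shapes are assumptions based on the function as written:

package main

import (
	"fmt"

	"code.gitea.io/gitea/modules/nosql"
)

func main() {
	// Old style: callers previously passed a bare directory path.
	uri := nosql.ToLevelDBURI("queues/common")
	fmt.Printf("%s %q %q\n", uri.Scheme, uri.Host, uri.Path) // leveldb "" "queues/common"

	// New style: a leveldb:// URI passes through untouched, so options survive.
	uri = nosql.ToLevelDBURI("leveldb://queues/common?write_buffer=16777216")
	fmt.Println(uri.Query().Get("write_buffer")) // 16777216
}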
diff --git a/modules/nosql/manager.go b/modules/nosql/manager.go
new file mode 100644
index 0000000000..ad61d6d18c
--- /dev/null
+++ b/modules/nosql/manager.go
@@ -0,0 +1,71 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+ "strconv"
+ "sync"
+ "time"
+
+ "github.com/go-redis/redis/v7"
+ "github.com/syndtr/goleveldb/leveldb"
+)
+
+var manager *Manager
+
+// Manager is the nosql connection manager
+type Manager struct {
+ mutex sync.Mutex
+
+ RedisConnections map[string]*redisClientHolder
+ LevelDBConnections map[string]*levelDBHolder
+}
+
+type redisClientHolder struct {
+ redis.UniversalClient
+ name []string
+ count int64
+}
+
+func (r *redisClientHolder) Close() error {
+ return manager.CloseRedisClient(r.name[0])
+}
+
+type levelDBHolder struct {
+ name []string
+ count int64
+ db *leveldb.DB
+}
+
+func init() {
+ _ = GetManager()
+}
+
+// GetManager returns a Manager and initializes one as singleton if there's none yet
+func GetManager() *Manager {
+ if manager == nil {
+ manager = &Manager{
+ RedisConnections: make(map[string]*redisClientHolder),
+ LevelDBConnections: make(map[string]*levelDBHolder),
+ }
+ }
+ return manager
+}
+
+func valToTimeDuration(vs []string) (result time.Duration) {
+ var err error
+ for _, v := range vs {
+ result, err = time.ParseDuration(v)
+ if err != nil {
+ var val int
+ val, err = strconv.Atoi(v)
+ result = time.Duration(val)
+ }
+ if err == nil {
+ return
+ }
+ }
+ return
+}
diff --git a/modules/nosql/manager_leveldb.go b/modules/nosql/manager_leveldb.go
new file mode 100644
index 0000000000..769d5002d0
--- /dev/null
+++ b/modules/nosql/manager_leveldb.go
@@ -0,0 +1,151 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+ "path"
+ "strconv"
+ "strings"
+
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/errors"
+ "github.com/syndtr/goleveldb/leveldb/opt"
+)
+
+// CloseLevelDB closes a levelDB
+func (m *Manager) CloseLevelDB(connection string) error {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ db, ok := m.LevelDBConnections[connection]
+ if !ok {
+ connection = ToLevelDBURI(connection).String()
+ db, ok = m.LevelDBConnections[connection]
+ }
+ if !ok {
+ return nil
+ }
+
+ db.count--
+ if db.count > 0 {
+ return nil
+ }
+
+ for _, name := range db.name {
+ delete(m.LevelDBConnections, name)
+ }
+ return db.db.Close()
+}
+
+// GetLevelDB gets a levelDB for a particular connection
+func (m *Manager) GetLevelDB(connection string) (*leveldb.DB, error) {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ db, ok := m.LevelDBConnections[connection]
+ if ok {
+ db.count++
+
+ return db.db, nil
+ }
+ dataDir := connection
+ uri := ToLevelDBURI(connection)
+ db = &levelDBHolder{
+ name: []string{connection, uri.String()},
+ }
+
+ dataDir = path.Join(uri.Host, uri.Path)
+ opts := &opt.Options{}
+ for k, v := range uri.Query() {
+ switch replacer.Replace(strings.ToLower(k)) {
+ case "blockcachecapacity":
+ opts.BlockCacheCapacity, _ = strconv.Atoi(v[0])
+ case "blockcacheevictremoved":
+ opts.BlockCacheEvictRemoved, _ = strconv.ParseBool(v[0])
+ case "blockrestartinterval":
+ opts.BlockRestartInterval, _ = strconv.Atoi(v[0])
+ case "blocksize":
+ opts.BlockSize, _ = strconv.Atoi(v[0])
+ case "compactionexpandlimitfactor":
+ opts.CompactionExpandLimitFactor, _ = strconv.Atoi(v[0])
+ case "compactiongpoverlapsfactor":
+ opts.CompactionGPOverlapsFactor, _ = strconv.Atoi(v[0])
+ case "compactionl0trigger":
+ opts.CompactionL0Trigger, _ = strconv.Atoi(v[0])
+ case "compactionsourcelimitfactor":
+ opts.CompactionSourceLimitFactor, _ = strconv.Atoi(v[0])
+ case "compactiontablesize":
+ opts.CompactionTableSize, _ = strconv.Atoi(v[0])
+ case "compactiontablesizemultiplier":
+ opts.CompactionTableSizeMultiplier, _ = strconv.ParseFloat(v[0], 64)
+ case "compactiontablesizemultiplierperlevel":
+ for _, val := range v {
+ f, _ := strconv.ParseFloat(val, 64)
+ opts.CompactionTableSizeMultiplierPerLevel = append(opts.CompactionTableSizeMultiplierPerLevel, f)
+ }
+ case "compactiontotalsize":
+ opts.CompactionTotalSize, _ = strconv.Atoi(v[0])
+ case "compactiontotalsizemultiplier":
+ opts.CompactionTotalSizeMultiplier, _ = strconv.ParseFloat(v[0], 64)
+ case "compactiontotalsizemultiplierperlevel":
+ for _, val := range v {
+ f, _ := strconv.ParseFloat(val, 64)
+ opts.CompactionTotalSizeMultiplierPerLevel = append(opts.CompactionTotalSizeMultiplierPerLevel, f)
+ }
+ case "compression":
+ val, _ := strconv.Atoi(v[0])
+ opts.Compression = opt.Compression(val)
+ case "disablebufferpool":
+ opts.DisableBufferPool, _ = strconv.ParseBool(v[0])
+ case "disableblockcache":
+ opts.DisableBlockCache, _ = strconv.ParseBool(v[0])
+ case "disablecompactionbackoff":
+ opts.DisableCompactionBackoff, _ = strconv.ParseBool(v[0])
+ case "disablelargebatchtransaction":
+ opts.DisableLargeBatchTransaction, _ = strconv.ParseBool(v[0])
+ case "errorifexist":
+ opts.ErrorIfExist, _ = strconv.ParseBool(v[0])
+ case "errorifmissing":
+ opts.ErrorIfMissing, _ = strconv.ParseBool(v[0])
+ case "iteratorsamplingrate":
+ opts.IteratorSamplingRate, _ = strconv.Atoi(v[0])
+ case "nosync":
+ opts.NoSync, _ = strconv.ParseBool(v[0])
+ case "nowritemerge":
+ opts.NoWriteMerge, _ = strconv.ParseBool(v[0])
+ case "openfilescachecapacity":
+ opts.OpenFilesCacheCapacity, _ = strconv.Atoi(v[0])
+ case "readonly":
+ opts.ReadOnly, _ = strconv.ParseBool(v[0])
+ case "strict":
+ val, _ := strconv.Atoi(v[0])
+ opts.Strict = opt.Strict(val)
+ case "writebuffer":
+ opts.WriteBuffer, _ = strconv.Atoi(v[0])
+ case "writel0pausetrigger":
+ opts.WriteL0PauseTrigger, _ = strconv.Atoi(v[0])
+ case "writel0slowdowntrigger":
+ opts.WriteL0SlowdownTrigger, _ = strconv.Atoi(v[0])
+ case "clientname":
+ db.name = append(db.name, v[0])
+ }
+ }
+
+ var err error
+ db.db, err = leveldb.OpenFile(dataDir, opts)
+ if err != nil {
+ if !errors.IsCorrupted(err) {
+ return nil, err
+ }
+ db.db, err = leveldb.RecoverFile(dataDir, opts)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ for _, name := range db.name {
+ m.LevelDBConnections[name] = db
+ }
+ db.count++
+ return db.db, nil
+}
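For context, a hedged sketch of obtaining a shared, tuned LevelDB through the manager; the path and option values are illustrative and assume a writable directory:

package main

import (
	"log"

	"code.gitea.io/gitea/modules/nosql"
)

func main() {
	conn := "leveldb:///tmp/gitea-queues/common?openfilescachecapacity=32&writebuffer=16777216"

	// The first caller opens the database; later callers passing the same
	// connection string share it and only bump the reference count.
	db, err := nosql.GetManager().GetLevelDB(conn)
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		// Decrements the reference count; the DB only closes when it reaches zero.
		_ = nosql.GetManager().CloseLevelDB(conn)
	}()

	_ = db // use the *leveldb.DB as usual
}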
diff --git a/modules/nosql/manager_redis.go b/modules/nosql/manager_redis.go
new file mode 100644
index 0000000000..7792a90112
--- /dev/null
+++ b/modules/nosql/manager_redis.go
@@ -0,0 +1,205 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+ "crypto/tls"
+ "path"
+ "strconv"
+ "strings"
+
+ "github.com/go-redis/redis/v7"
+)
+
+var replacer = strings.NewReplacer("_", "", "-", "")
+
+// CloseRedisClient closes a redis client
+func (m *Manager) CloseRedisClient(connection string) error {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ client, ok := m.RedisConnections[connection]
+ if !ok {
+ connection = ToRedisURI(connection).String()
+ client, ok = m.RedisConnections[connection]
+ }
+ if !ok {
+ return nil
+ }
+
+ client.count--
+ if client.count > 0 {
+ return nil
+ }
+
+ for _, name := range client.name {
+ delete(m.RedisConnections, name)
+ }
+ return client.UniversalClient.Close()
+}
+
+// GetRedisClient gets a redis client for a particular connection
+func (m *Manager) GetRedisClient(connection string) redis.UniversalClient {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ client, ok := m.RedisConnections[connection]
+ if ok {
+ client.count++
+ return client
+ }
+
+ uri := ToRedisURI(connection)
+ client, ok = m.RedisConnections[uri.String()]
+ if ok {
+ client.count++
+ return client
+ }
+ client = &redisClientHolder{
+ name: []string{connection, uri.String()},
+ }
+
+ opts := &redis.UniversalOptions{}
+ tlsConfig := &tls.Config{}
+
+ // Handle username/password
+ if password, ok := uri.User.Password(); ok {
+ opts.Password = password
+ // Username does not appear to be handled by redis.Options
+ opts.Username = uri.User.Username()
+ } else if uri.User.Username() != "" {
+ // assume this is the password
+ opts.Password = uri.User.Username()
+ }
+
+ // Now handle the uri query parameters
+ for k, v := range uri.Query() {
+ switch replacer.Replace(strings.ToLower(k)) {
+ case "addr":
+ opts.Addrs = append(opts.Addrs, v...)
+ case "addrs":
+ opts.Addrs = append(opts.Addrs, strings.Split(v[0], ",")...)
+ case "username":
+ opts.Username = v[0]
+ case "password":
+ opts.Password = v[0]
+ case "database":
+ fallthrough
+ case "db":
+ opts.DB, _ = strconv.Atoi(v[0])
+ case "maxretries":
+ opts.MaxRetries, _ = strconv.Atoi(v[0])
+ case "minretrybackoff":
+ opts.MinRetryBackoff = valToTimeDuration(v)
+ case "maxretrybackoff":
+ opts.MaxRetryBackoff = valToTimeDuration(v)
+ case "timeout":
+ timeout := valToTimeDuration(v)
+ if timeout != 0 {
+ if opts.DialTimeout == 0 {
+ opts.DialTimeout = timeout
+ }
+ if opts.ReadTimeout == 0 {
+ opts.ReadTimeout = timeout
+ }
+ }
+ case "dialtimeout":
+ opts.DialTimeout = valToTimeDuration(v)
+ case "readtimeout":
+ opts.ReadTimeout = valToTimeDuration(v)
+ case "writetimeout":
+ opts.WriteTimeout = valToTimeDuration(v)
+ case "poolsize":
+ opts.PoolSize, _ = strconv.Atoi(v[0])
+ case "minidleconns":
+ opts.MinIdleConns, _ = strconv.Atoi(v[0])
+ case "pooltimeout":
+ opts.PoolTimeout = valToTimeDuration(v)
+ case "idletimeout":
+ opts.IdleTimeout = valToTimeDuration(v)
+ case "idlecheckfrequency":
+ opts.IdleCheckFrequency = valToTimeDuration(v)
+ case "maxredirects":
+ opts.MaxRedirects, _ = strconv.Atoi(v[0])
+ case "readonly":
+ opts.ReadOnly, _ = strconv.ParseBool(v[0])
+ case "routebylatency":
+ opts.RouteByLatency, _ = strconv.ParseBool(v[0])
+ case "routerandomly":
+ opts.RouteRandomly, _ = strconv.ParseBool(v[0])
+ case "sentinelmasterid":
+ fallthrough
+ case "mastername":
+ opts.MasterName = v[0]
+ case "skipverify":
+ fallthrough
+ case "insecureskipverify":
+ insecureSkipVerify, _ := strconv.ParseBool(v[0])
+ tlsConfig.InsecureSkipVerify = insecureSkipVerify
+ case "clientname":
+ client.name = append(client.name, v[0])
+ }
+ }
+
+ switch uri.Scheme {
+ case "redis+sentinels":
+ fallthrough
+ case "rediss+sentinel":
+ opts.TLSConfig = tlsConfig
+ fallthrough
+ case "redis+sentinel":
+ if uri.Host != "" {
+ opts.Addrs = append(opts.Addrs, strings.Split(uri.Host, ",")...)
+ }
+ if uri.Path != "" {
+ if db, err := strconv.Atoi(uri.Path); err == nil {
+ opts.DB = db
+ }
+ }
+
+ client.UniversalClient = redis.NewFailoverClient(opts.Failover())
+ case "redis+clusters":
+ fallthrough
+ case "rediss+cluster":
+ opts.TLSConfig = tlsConfig
+ fallthrough
+ case "redis+cluster":
+ if uri.Host != "" {
+ opts.Addrs = append(opts.Addrs, strings.Split(uri.Host, ",")...)
+ }
+ if uri.Path != "" {
+ if db, err := strconv.Atoi(uri.Path); err == nil {
+ opts.DB = db
+ }
+ }
+ client.UniversalClient = redis.NewClusterClient(opts.Cluster())
+ case "redis+socket":
+ simpleOpts := opts.Simple()
+ simpleOpts.Network = "unix"
+ simpleOpts.Addr = path.Join(uri.Host, uri.Path)
+ client.UniversalClient = redis.NewClient(simpleOpts)
+ case "rediss":
+ opts.TLSConfig = tlsConfig
+ fallthrough
+ case "redis":
+ if uri.Host != "" {
+ opts.Addrs = append(opts.Addrs, strings.Split(uri.Host, ",")...)
+ }
+ if uri.Path != "" {
+ if db, err := strconv.Atoi(uri.Path); err == nil {
+ opts.DB = db
+ }
+ }
+ client.UniversalClient = redis.NewClient(opts.Simple())
+ default:
+ return nil
+ }
+
+ for _, name := range client.name {
+ m.RedisConnections[name] = client
+ }
+
+ client.count++
+
+ return client
+}
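A minimal usage sketch; it assumes a Redis server on 127.0.0.1:6379, and the sentinel/cluster strings are only illustrations of the schemes handled above, not tested endpoints:

package main

import (
	"fmt"

	"code.gitea.io/gitea/modules/nosql"
)

func main() {
	// An old-style string and its new-style equivalent resolve to one shared
	// client, because the holder is also registered under its normalised URI.
	a := nosql.GetManager().GetRedisClient("addrs=127.0.0.1:6379 db=0")
	b := nosql.GetManager().GetRedisClient("redis://127.0.0.1:6379/0")
	fmt.Println(a.Ping().Err(), b.Ping().Err())

	// Other deployment shapes use dedicated schemes, for example:
	//   redis+sentinel://:password@sentinel1:26379,sentinel2:26379/0?master_name=gitea
	//   rediss+cluster://node1:6379,node2:6379,node3:6379/0?skip_verify=true

	// Close decrements the shared reference count rather than closing immediately.
	_ = a.Close()
	_ = b.Close()
}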
diff --git a/modules/nosql/redis.go b/modules/nosql/redis.go
new file mode 100644
index 0000000000..528f5fc802
--- /dev/null
+++ b/modules/nosql/redis.go
@@ -0,0 +1,102 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+ "net/url"
+ "strconv"
+ "strings"
+)
+
+// This file contains common redis connection functions
+
+// ToRedisURI converts old style connections to a RedisURI
+//
+// A RedisURI matches the pattern:
+//
+//   redis://[username:password@]host[:port][/database][?[option=value]*]
+//   rediss://[username:password@]host[:port][/database][?[option=value]*]
+//   redis+socket://[username:password@]path[/database][?[option=value]*]
+//   redis+sentinel://[password@]host1[:port1][,host2[:port2]][,hostN[:portN]][/database][?[option=value]*]
+//   redis+cluster://[password@]host1[:port1][,host2[:port2]][,hostN[:portN]][/database][?[option=value]*]
+//
+// We have previously used a URI like:
+//   addrs=127.0.0.1:6379 db=0
+//   network=tcp,addr=127.0.0.1:6379,password=macaron,db=0,pool_size=100,idle_timeout=180
+//
+// We need to convert this old style to the new style
+func ToRedisURI(connection string) *url.URL {
+ uri, err := url.Parse(connection)
+ if err == nil && strings.HasPrefix(uri.Scheme, "redis") {
+ // OK we're going to assume that this is a reasonable redis URI
+ return uri
+ }
+
+ // Let's set a nice default
+ uri, _ = url.Parse("redis://127.0.0.1:6379/0")
+ network := "tcp"
+ query := uri.Query()
+
+ // OK so there are two types: Space delimited and Comma delimited
+ // Let's assume that we have a space delimited string - as this is the most common
+ fields := strings.Fields(connection)
+ if len(fields) == 1 {
+ // It's a comma delimited string, then...
+ fields = strings.Split(connection, ",")
+
+ }
+ for _, f := range fields {
+ items := strings.SplitN(f, "=", 2)
+ if len(items) < 2 {
+ continue
+ }
+ switch strings.ToLower(items[0]) {
+ case "network":
+ if items[1] == "unix" {
+ uri.Scheme = "redis+socket"
+ }
+ network = items[1]
+ case "addrs":
+ uri.Host = items[1]
+ // now we need to handle the clustering
+ if strings.Contains(items[1], ",") && network == "tcp" {
+ uri.Scheme = "redis+cluster"
+ }
+ case "addr":
+ uri.Host = items[1]
+ case "password":
+ uri.User = url.UserPassword(uri.User.Username(), items[1])
+ case "username":
+ password, set := uri.User.Password()
+ if !set {
+ uri.User = url.User(items[1])
+ } else {
+ uri.User = url.UserPassword(items[1], password)
+ }
+ case "db":
+ uri.Path = "/" + items[1]
+ case "idle_timeout":
+ _, err := strconv.Atoi(items[1])
+ if err == nil {
+ query.Add("idle_timeout", items[1]+"s")
+ } else {
+ query.Add("idle_timeout", items[1])
+ }
+ default:
+ // Other options become query params
+ query.Add(items[0], items[1])
+ }
+ }
+
+ // Finally we need to fix up the Host if we have a unix socket
+ if uri.Scheme == "redis+socket" {
+ query.Set("db", uri.Path)
+ uri.Path = uri.Host
+ uri.Host = ""
+ }
+ uri.RawQuery = query.Encode()
+
+ return uri
+}
diff --git a/modules/nosql/redis_test.go b/modules/nosql/redis_test.go
new file mode 100644
index 0000000000..c70d236bdc
--- /dev/null
+++ b/modules/nosql/redis_test.go
@@ -0,0 +1,35 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+ "testing"
+)
+
+func TestToRedisURI(t *testing.T) {
+ tests := []struct {
+ name string
+ connection string
+ want string
+ }{
+ {
+ name: "old_default",
+ connection: "addrs=127.0.0.1:6379 db=0",
+ want: "redis://127.0.0.1:6379/0",
+ },
+ {
+ name: "old_macaron_session_default",
+ connection: "network=tcp,addr=127.0.0.1:6379,password=macaron,db=0,pool_size=100,idle_timeout=180",
+ want: "redis://:macaron@127.0.0.1:6379/0?idle_timeout=180s&pool_size=100",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := ToRedisURI(tt.connection); got == nil || got.String() != tt.want {
+ t.Errorf(`ToRedisURI(%q) = %s, want %s`, tt.connection, got.String(), tt.want)
+ }
+ })
+ }
+}
diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go
index ff0876488b..88b8c414c0 100644
--- a/modules/queue/queue_disk.go
+++ b/modules/queue/queue_disk.go
@@ -5,6 +5,8 @@
package queue
import (
+ "code.gitea.io/gitea/modules/nosql"
+
"gitea.com/lunny/levelqueue"
)
@@ -14,7 +16,9 @@ const LevelQueueType Type = "level"
// LevelQueueConfiguration is the configuration for a LevelQueue
type LevelQueueConfiguration struct {
ByteFIFOQueueConfiguration
- DataDir string
+ DataDir string
+ ConnectionString string
+ QueueName string
}
// LevelQueue implements a disk library queue
@@ -30,7 +34,11 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
}
config := configInterface.(LevelQueueConfiguration)
- byteFIFO, err := NewLevelQueueByteFIFO(config.DataDir)
+ if len(config.ConnectionString) == 0 {
+ config.ConnectionString = config.DataDir
+ }
+
+ byteFIFO, err := NewLevelQueueByteFIFO(config.ConnectionString, config.QueueName)
if err != nil {
return nil, err
}
@@ -51,18 +59,25 @@ var _ (ByteFIFO) = &LevelQueueByteFIFO{}
// LevelQueueByteFIFO represents a ByteFIFO formed from a LevelQueue
type LevelQueueByteFIFO struct {
- internal *levelqueue.Queue
+ internal *levelqueue.Queue
+ connection string
}
// NewLevelQueueByteFIFO creates a ByteFIFO formed from a LevelQueue
-func NewLevelQueueByteFIFO(dataDir string) (*LevelQueueByteFIFO, error) {
- internal, err := levelqueue.Open(dataDir)
+func NewLevelQueueByteFIFO(connection, prefix string) (*LevelQueueByteFIFO, error) {
+ db, err := nosql.GetManager().GetLevelDB(connection)
+ if err != nil {
+ return nil, err
+ }
+
+ internal, err := levelqueue.NewQueue(db, []byte(prefix), false)
if err != nil {
return nil, err
}
return &LevelQueueByteFIFO{
- internal: internal,
+ connection: connection,
+ internal: internal,
}, nil
}
@@ -87,7 +102,9 @@ func (fifo *LevelQueueByteFIFO) Pop() ([]byte, error) {
// Close this fifo
func (fifo *LevelQueueByteFIFO) Close() error {
- return fifo.internal.Close()
+ err := fifo.internal.Close()
+ _ = nosql.GetManager().CloseLevelDB(fifo.connection)
+ return err
}
// Len returns the length of the fifo
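With connections now going through the nosql manager, two level queues can share one LevelDB by using the same connection string with different prefixes; a hedged sketch with illustrative names and a hypothetical helper:

package queuedemo

import (
	"code.gitea.io/gitea/modules/queue"
)

// openSharedFIFOs shows two FIFOs opening (or reusing) the same underlying
// LevelDB through the nosql manager; the prefix keeps their key spaces apart.
func openSharedFIFOs() (*queue.LevelQueueByteFIFO, *queue.LevelQueueByteFIFO, error) {
	taskFIFO, err := queue.NewLevelQueueByteFIFO("leveldb://queues/common", "task")
	if err != nil {
		return nil, nil, err
	}
	mailFIFO, err := queue.NewLevelQueueByteFIFO("leveldb://queues/common", "mail")
	if err != nil {
		_ = taskFIFO.Close() // also releases the shared DB reference
		return nil, nil, err
	}
	return taskFIFO, mailFIFO, nil
}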
diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go
index 4e05ddd17e..04e7b5d252 100644
--- a/modules/queue/queue_redis.go
+++ b/modules/queue/queue_redis.go
@@ -5,12 +5,10 @@
package queue
import (
- "errors"
- "strings"
-
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/nosql"
- "github.com/go-redis/redis"
+ "github.com/go-redis/redis/v7"
)
// RedisQueueType is the type for redis queue
@@ -75,11 +73,8 @@ type RedisByteFIFO struct {
// RedisByteFIFOConfiguration is the configuration for the RedisByteFIFO
type RedisByteFIFOConfiguration struct {
- Network string
- Addresses string
- Password string
- DBIndex int
- QueueName string
+ ConnectionString string
+ QueueName string
}
// NewRedisByteFIFO creates a ByteFIFO formed from a redisClient
@@ -87,21 +82,7 @@ func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error)
fifo := &RedisByteFIFO{
queueName: config.QueueName,
}
- dbs := strings.Split(config.Addresses, ",")
- if len(dbs) == 0 {
- return nil, errors.New("no redis host specified")
- } else if len(dbs) == 1 {
- fifo.client = redis.NewClient(&redis.Options{
- Network: config.Network,
- Addr: strings.TrimSpace(dbs[0]), // use default Addr
- Password: config.Password, // no password set
- DB: config.DBIndex, // use default DB
- })
- } else {
- fifo.client = redis.NewClusterClient(&redis.ClusterOptions{
- Addrs: dbs,
- })
- }
+ fifo.client = nosql.GetManager().GetRedisClient(config.ConnectionString)
if err := fifo.client.Ping().Err(); err != nil {
return nil, err
}
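A hedged sketch of the simplified configuration; the connection string and queue name are illustrative, and a reachable Redis is assumed because NewRedisByteFIFO pings it:

package queuedemo

import (
	"code.gitea.io/gitea/modules/queue"
)

// newTaskFIFO is a hypothetical helper: all connection details now live in a
// single connection string that the nosql manager understands.
func newTaskFIFO() (*queue.RedisByteFIFO, error) {
	return queue.NewRedisByteFIFO(queue.RedisByteFIFOConfiguration{
		ConnectionString: "redis://:macaron@127.0.0.1:6379/0?pool_size=100&idle_timeout=180s",
		QueueName:        "task_queue",
	})
}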
diff --git a/modules/queue/unique_queue_disk.go b/modules/queue/unique_queue_disk.go
index bfe7aeed83..dd6ac1a538 100644
--- a/modules/queue/unique_queue_disk.go
+++ b/modules/queue/unique_queue_disk.go
@@ -5,6 +5,8 @@
package queue
import (
+ "code.gitea.io/gitea/modules/nosql"
+
"gitea.com/lunny/levelqueue"
)
@@ -14,7 +16,9 @@ const LevelUniqueQueueType Type = "unique-level"
// LevelUniqueQueueConfiguration is the configuration for a LevelUniqueQueue
type LevelUniqueQueueConfiguration struct {
ByteFIFOQueueConfiguration
- DataDir string
+ DataDir string
+ ConnectionString string
+ QueueName string
}
// LevelUniqueQueue implements a disk library queue
@@ -34,7 +38,11 @@ func NewLevelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue,
}
config := configInterface.(LevelUniqueQueueConfiguration)
- byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.DataDir)
+ if len(config.ConnectionString) == 0 {
+ config.ConnectionString = config.DataDir
+ }
+
+ byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.ConnectionString, config.QueueName)
if err != nil {
return nil, err
}
@@ -55,18 +63,25 @@ var _ (UniqueByteFIFO) = &LevelUniqueQueueByteFIFO{}
// LevelUniqueQueueByteFIFO represents a ByteFIFO formed from a LevelUniqueQueue
type LevelUniqueQueueByteFIFO struct {
- internal *levelqueue.UniqueQueue
+ internal *levelqueue.UniqueQueue
+ connection string
}
// NewLevelUniqueQueueByteFIFO creates a new ByteFIFO formed from a LevelUniqueQueue
-func NewLevelUniqueQueueByteFIFO(dataDir string) (*LevelUniqueQueueByteFIFO, error) {
- internal, err := levelqueue.OpenUnique(dataDir)
+func NewLevelUniqueQueueByteFIFO(connection, prefix string) (*LevelUniqueQueueByteFIFO, error) {
+ db, err := nosql.GetManager().GetLevelDB(connection)
+ if err != nil {
+ return nil, err
+ }
+
+ internal, err := levelqueue.NewUniqueQueue(db, []byte(prefix), []byte(prefix+"-unique"), false)
if err != nil {
return nil, err
}
return &LevelUniqueQueueByteFIFO{
- internal: internal,
+ connection: connection,
+ internal: internal,
}, nil
}
@@ -96,7 +111,9 @@ func (fifo *LevelUniqueQueueByteFIFO) Has(data []byte) (bool, error) {
// Close this fifo
func (fifo *LevelUniqueQueueByteFIFO) Close() error {
- return fifo.internal.Close()
+ err := fifo.internal.Close()
+ _ = nosql.GetManager().CloseLevelDB(fifo.connection)
+ return err
}
func init() {
diff --git a/modules/queue/unique_queue_redis.go b/modules/queue/unique_queue_redis.go
index 9404369075..67efc66bc9 100644
--- a/modules/queue/unique_queue_redis.go
+++ b/modules/queue/unique_queue_redis.go
@@ -4,7 +4,7 @@
package queue
-import "github.com/go-redis/redis"
+import "github.com/go-redis/redis/v7"
// RedisUniqueQueueType is the type for redis queue
const RedisUniqueQueueType Type = "unique-redis"
diff --git a/modules/session/redis.go b/modules/session/redis.go
new file mode 100644
index 0000000000..c88ebd5769
--- /dev/null
+++ b/modules/session/redis.go
@@ -0,0 +1,216 @@
+// Copyright 2013 Beego Authors
+// Copyright 2014 The Macaron Authors
+// Copyright 2020 The Gitea Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"): you may
+// not use this file except in compliance with the License. You may obtain
+// a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations
+// under the License.
+
+package session
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "code.gitea.io/gitea/modules/nosql"
+
+ "gitea.com/macaron/session"
+ "github.com/go-redis/redis/v7"
+)
+
+// RedisStore represents a redis session store implementation.
+type RedisStore struct {
+ c redis.UniversalClient
+ prefix, sid string
+ duration time.Duration
+ lock sync.RWMutex
+ data map[interface{}]interface{}
+}
+
+// NewRedisStore creates and returns a redis session store.
+func NewRedisStore(c redis.UniversalClient, prefix, sid string, dur time.Duration, kv map[interface{}]interface{}) *RedisStore {
+ return &RedisStore{
+ c: c,
+ prefix: prefix,
+ sid: sid,
+ duration: dur,
+ data: kv,
+ }
+}
+
+// Set sets value to given key in session.
+func (s *RedisStore) Set(key, val interface{}) error {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ s.data[key] = val
+ return nil
+}
+
+// Get gets value by given key in session.
+func (s *RedisStore) Get(key interface{}) interface{} {
+ s.lock.RLock()
+ defer s.lock.RUnlock()
+
+ return s.data[key]
+}
+
+// Delete deletes a key from session.
+func (s *RedisStore) Delete(key interface{}) error {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ delete(s.data, key)
+ return nil
+}
+
+// ID returns current session ID.
+func (s *RedisStore) ID() string {
+ return s.sid
+}
+
+// Release releases resource and save data to provider.
+func (s *RedisStore) Release() error {
+ // Skip encoding if the data is empty
+ if len(s.data) == 0 {
+ return nil
+ }
+
+ data, err := session.EncodeGob(s.data)
+ if err != nil {
+ return err
+ }
+
+ return s.c.Set(s.prefix+s.sid, string(data), s.duration).Err()
+}
+
+// Flush deletes all session data.
+func (s *RedisStore) Flush() error {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ s.data = make(map[interface{}]interface{})
+ return nil
+}
+
+// RedisProvider represents a redis session provider implementation.
+type RedisProvider struct {
+ c redis.UniversalClient
+ duration time.Duration
+ prefix string
+}
+
+// Init initializes redis session provider.
+// configs: network=tcp,addr=:6379,password=macaron,db=0,pool_size=100,idle_timeout=180,prefix=session;
+func (p *RedisProvider) Init(maxlifetime int64, configs string) (err error) {
+ p.duration, err = time.ParseDuration(fmt.Sprintf("%ds", maxlifetime))
+ if err != nil {
+ return err
+ }
+
+ uri := nosql.ToRedisURI(configs)
+
+ for k, v := range uri.Query() {
+ switch k {
+ case "prefix":
+ p.prefix = v[0]
+ }
+ }
+
+ p.c = nosql.GetManager().GetRedisClient(uri.String())
+ return p.c.Ping().Err()
+}
+
+// Read returns raw session store by session ID.
+func (p *RedisProvider) Read(sid string) (session.RawStore, error) {
+ psid := p.prefix + sid
+ if !p.Exist(sid) {
+ if err := p.c.Set(psid, "", p.duration).Err(); err != nil {
+ return nil, err
+ }
+ }
+
+ var kv map[interface{}]interface{}
+ kvs, err := p.c.Get(psid).Result()
+ if err != nil {
+ return nil, err
+ }
+ if len(kvs) == 0 {
+ kv = make(map[interface{}]interface{})
+ } else {
+ kv, err = session.DecodeGob([]byte(kvs))
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ return NewRedisStore(p.c, p.prefix, sid, p.duration, kv), nil
+}
+
+// Exist returns true if session with given ID exists.
+func (p *RedisProvider) Exist(sid string) bool {
+ v, err := p.c.Exists(p.prefix + sid).Result()
+ return err == nil && v == 1
+}
+
+// Destroy deletes a session by session ID.
+func (p *RedisProvider) Destroy(sid string) error {
+ return p.c.Del(p.prefix + sid).Err()
+}
+
+// Regenerate regenerates a session store from old session ID to new one.
+func (p *RedisProvider) Regenerate(oldsid, sid string) (_ session.RawStore, err error) {
+ poldsid := p.prefix + oldsid
+ psid := p.prefix + sid
+
+ if p.Exist(sid) {
+ return nil, fmt.Errorf("new sid '%s' already exists", sid)
+ } else if !p.Exist(oldsid) {
+ // Make a fake old session.
+ if err = p.c.Set(poldsid, "", p.duration).Err(); err != nil {
+ return nil, err
+ }
+ }
+
+ if err = p.c.Rename(poldsid, psid).Err(); err != nil {
+ return nil, err
+ }
+
+ var kv map[interface{}]interface{}
+ kvs, err := p.c.Get(psid).Result()
+ if err != nil {
+ return nil, err
+ }
+
+ if len(kvs) == 0 {
+ kv = make(map[interface{}]interface{})
+ } else {
+ kv, err = session.DecodeGob([]byte(kvs))
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ return NewRedisStore(p.c, p.prefix, sid, p.duration, kv), nil
+}
+
+// Count counts and returns number of sessions.
+func (p *RedisProvider) Count() int {
+ return int(p.c.DBSize().Val())
+}
+
+// GC calls GC to clean expired sessions.
+func (*RedisProvider) GC() {}
+
+func init() {
+ session.Register("redis", &RedisProvider{})
+}
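A minimal sketch of driving the provider directly; the import alias, session ID and values are illustrative, a local Redis is assumed, and old macaron-style config strings should keep working because Init routes them through ToRedisURI:

package main

import (
	"log"

	gitea_session "code.gitea.io/gitea/modules/session"
)

func main() {
	p := &gitea_session.RedisProvider{}
	// 86400s lifetime; the prefix is picked out of the query string by Init.
	if err := p.Init(86400, "redis://:macaron@127.0.0.1:6379/0?prefix=session"); err != nil {
		log.Fatal(err)
	}

	store, err := p.Read("example-sid")
	if err != nil {
		log.Fatal(err)
	}
	_ = store.Set("lang", "en-US")
	if err := store.Release(); err != nil {
		log.Fatal(err)
	}
}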
diff --git a/modules/session/virtual.go b/modules/session/virtual.go
index c8e1e210cb..1139cfe89c 100644
--- a/modules/session/virtual.go
+++ b/modules/session/virtual.go
@@ -15,7 +15,6 @@ import (
mysql "gitea.com/macaron/session/mysql"
nodb "gitea.com/macaron/session/nodb"
postgres "gitea.com/macaron/session/postgres"
- redis "gitea.com/macaron/session/redis"
)
// VirtualSessionProvider represents a shadowed session provider implementation.
@@ -40,7 +39,7 @@ func (o *VirtualSessionProvider) Init(gclifetime int64, config string) error {
case "file":
o.provider = &session.FileProvider{}
case "redis":
- o.provider = &redis.RedisProvider{}
+ o.provider = &RedisProvider{}
case "mysql":
o.provider = &mysql.MysqlProvider{}
case "postgres":