diff options
Diffstat (limited to 'modules')
-rw-r--r-- | modules/cache/cache.go | 1 | ||||
-rw-r--r-- | modules/cache/cache_redis.go | 140 | ||||
-rw-r--r-- | modules/nosql/leveldb.go | 25 | ||||
-rw-r--r-- | modules/nosql/manager.go | 71 | ||||
-rw-r--r-- | modules/nosql/manager_leveldb.go | 151 | ||||
-rw-r--r-- | modules/nosql/manager_redis.go | 205 | ||||
-rw-r--r-- | modules/nosql/redis.go | 102 | ||||
-rw-r--r-- | modules/nosql/redis_test.go | 35 | ||||
-rw-r--r-- | modules/queue/queue_disk.go | 31 | ||||
-rw-r--r-- | modules/queue/queue_redis.go | 29 | ||||
-rw-r--r-- | modules/queue/unique_queue_disk.go | 31 | ||||
-rw-r--r-- | modules/queue/unique_queue_redis.go | 2 | ||||
-rw-r--r-- | modules/session/redis.go | 216 | ||||
-rw-r--r-- | modules/session/virtual.go | 3 |
14 files changed, 1000 insertions, 42 deletions
diff --git a/modules/cache/cache.go b/modules/cache/cache.go index 859f4a4b47..60865d8335 100644 --- a/modules/cache/cache.go +++ b/modules/cache/cache.go @@ -13,7 +13,6 @@ import ( mc "gitea.com/macaron/cache" _ "gitea.com/macaron/cache/memcache" // memcache plugin for cache - _ "gitea.com/macaron/cache/redis" ) var ( diff --git a/modules/cache/cache_redis.go b/modules/cache/cache_redis.go new file mode 100644 index 0000000000..96e865a382 --- /dev/null +++ b/modules/cache/cache_redis.go @@ -0,0 +1,140 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package cache + +import ( + "fmt" + "time" + + "code.gitea.io/gitea/modules/nosql" + + "gitea.com/macaron/cache" + "github.com/go-redis/redis/v7" + "github.com/unknwon/com" +) + +// RedisCacher represents a redis cache adapter implementation. +type RedisCacher struct { + c redis.UniversalClient + prefix string + hsetName string + occupyMode bool +} + +// Put puts value into cache with key and expire time. +// If expired is 0, it lives forever. +func (c *RedisCacher) Put(key string, val interface{}, expire int64) error { + key = c.prefix + key + if expire == 0 { + if err := c.c.Set(key, com.ToStr(val), 0).Err(); err != nil { + return err + } + } else { + dur, err := time.ParseDuration(com.ToStr(expire) + "s") + if err != nil { + return err + } + if err = c.c.Set(key, com.ToStr(val), dur).Err(); err != nil { + return err + } + } + + if c.occupyMode { + return nil + } + return c.c.HSet(c.hsetName, key, "0").Err() +} + +// Get gets cached value by given key. +func (c *RedisCacher) Get(key string) interface{} { + val, err := c.c.Get(c.prefix + key).Result() + if err != nil { + return nil + } + return val +} + +// Delete deletes cached value by given key. +func (c *RedisCacher) Delete(key string) error { + key = c.prefix + key + if err := c.c.Del(key).Err(); err != nil { + return err + } + + if c.occupyMode { + return nil + } + return c.c.HDel(c.hsetName, key).Err() +} + +// Incr increases cached int-type value by given key as a counter. +func (c *RedisCacher) Incr(key string) error { + if !c.IsExist(key) { + return fmt.Errorf("key '%s' not exist", key) + } + return c.c.Incr(c.prefix + key).Err() +} + +// Decr decreases cached int-type value by given key as a counter. +func (c *RedisCacher) Decr(key string) error { + if !c.IsExist(key) { + return fmt.Errorf("key '%s' not exist", key) + } + return c.c.Decr(c.prefix + key).Err() +} + +// IsExist returns true if cached value exists. +func (c *RedisCacher) IsExist(key string) bool { + if c.c.Exists(c.prefix+key).Val() == 1 { + return true + } + + if !c.occupyMode { + c.c.HDel(c.hsetName, c.prefix+key) + } + return false +} + +// Flush deletes all cached data. +func (c *RedisCacher) Flush() error { + if c.occupyMode { + return c.c.FlushDB().Err() + } + + keys, err := c.c.HKeys(c.hsetName).Result() + if err != nil { + return err + } + if err = c.c.Del(keys...).Err(); err != nil { + return err + } + return c.c.Del(c.hsetName).Err() +} + +// StartAndGC starts GC routine based on config string settings. +// AdapterConfig: network=tcp,addr=:6379,password=macaron,db=0,pool_size=100,idle_timeout=180,hset_name=MacaronCache,prefix=cache: +func (c *RedisCacher) StartAndGC(opts cache.Options) error { + c.hsetName = "MacaronCache" + c.occupyMode = opts.OccupyMode + + uri := nosql.ToRedisURI(opts.AdapterConfig) + + c.c = nosql.GetManager().GetRedisClient(uri.String()) + + for k, v := range uri.Query() { + switch k { + case "hset_name": + c.hsetName = v[0] + case "prefix": + c.prefix = v[0] + } + } + + return c.c.Ping().Err() +} + +func init() { + cache.Register("redis", &RedisCacher{}) +} diff --git a/modules/nosql/leveldb.go b/modules/nosql/leveldb.go new file mode 100644 index 0000000000..5da2291e03 --- /dev/null +++ b/modules/nosql/leveldb.go @@ -0,0 +1,25 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package nosql + +import "net/url" + +// ToLevelDBURI converts old style connections to a LevelDBURI +// +// A LevelDBURI matches the pattern: +// +// leveldb://path[?[option=value]*] +// +// We have previously just provided the path but this prevent other options +func ToLevelDBURI(connection string) *url.URL { + uri, err := url.Parse(connection) + if err == nil && uri.Scheme == "leveldb" { + return uri + } + uri, _ = url.Parse("leveldb://common") + uri.Host = "" + uri.Path = connection + return uri +} diff --git a/modules/nosql/manager.go b/modules/nosql/manager.go new file mode 100644 index 0000000000..ad61d6d18c --- /dev/null +++ b/modules/nosql/manager.go @@ -0,0 +1,71 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package nosql + +import ( + "strconv" + "sync" + "time" + + "github.com/go-redis/redis/v7" + "github.com/syndtr/goleveldb/leveldb" +) + +var manager *Manager + +// Manager is the nosql connection manager +type Manager struct { + mutex sync.Mutex + + RedisConnections map[string]*redisClientHolder + LevelDBConnections map[string]*levelDBHolder +} + +type redisClientHolder struct { + redis.UniversalClient + name []string + count int64 +} + +func (r *redisClientHolder) Close() error { + return manager.CloseRedisClient(r.name[0]) +} + +type levelDBHolder struct { + name []string + count int64 + db *leveldb.DB +} + +func init() { + _ = GetManager() +} + +// GetManager returns a Manager and initializes one as singleton is there's none yet +func GetManager() *Manager { + if manager == nil { + manager = &Manager{ + RedisConnections: make(map[string]*redisClientHolder), + LevelDBConnections: make(map[string]*levelDBHolder), + } + } + return manager +} + +func valToTimeDuration(vs []string) (result time.Duration) { + var err error + for _, v := range vs { + result, err = time.ParseDuration(v) + if err != nil { + var val int + val, err = strconv.Atoi(v) + result = time.Duration(val) + } + if err == nil { + return + } + } + return +} diff --git a/modules/nosql/manager_leveldb.go b/modules/nosql/manager_leveldb.go new file mode 100644 index 0000000000..769d5002d0 --- /dev/null +++ b/modules/nosql/manager_leveldb.go @@ -0,0 +1,151 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package nosql + +import ( + "path" + "strconv" + "strings" + + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/errors" + "github.com/syndtr/goleveldb/leveldb/opt" +) + +// CloseLevelDB closes a levelDB +func (m *Manager) CloseLevelDB(connection string) error { + m.mutex.Lock() + defer m.mutex.Unlock() + db, ok := m.LevelDBConnections[connection] + if !ok { + connection = ToLevelDBURI(connection).String() + db, ok = m.LevelDBConnections[connection] + } + if !ok { + return nil + } + + db.count-- + if db.count > 0 { + return nil + } + + for _, name := range db.name { + delete(m.LevelDBConnections, name) + } + return db.db.Close() +} + +// GetLevelDB gets a levelDB for a particular connection +func (m *Manager) GetLevelDB(connection string) (*leveldb.DB, error) { + m.mutex.Lock() + defer m.mutex.Unlock() + db, ok := m.LevelDBConnections[connection] + if ok { + db.count++ + + return db.db, nil + } + dataDir := connection + uri := ToLevelDBURI(connection) + db = &levelDBHolder{ + name: []string{connection, uri.String()}, + } + + dataDir = path.Join(uri.Host, uri.Path) + opts := &opt.Options{} + for k, v := range uri.Query() { + switch replacer.Replace(strings.ToLower(k)) { + case "blockcachecapacity": + opts.BlockCacheCapacity, _ = strconv.Atoi(v[0]) + case "blockcacheevictremoved": + opts.BlockCacheEvictRemoved, _ = strconv.ParseBool(v[0]) + case "blockrestartinterval": + opts.BlockRestartInterval, _ = strconv.Atoi(v[0]) + case "blocksize": + opts.BlockSize, _ = strconv.Atoi(v[0]) + case "compactionexpandlimitfactor": + opts.CompactionExpandLimitFactor, _ = strconv.Atoi(v[0]) + case "compactiongpoverlapsfactor": + opts.CompactionGPOverlapsFactor, _ = strconv.Atoi(v[0]) + case "compactionl0trigger": + opts.CompactionL0Trigger, _ = strconv.Atoi(v[0]) + case "compactionsourcelimitfactor": + opts.CompactionSourceLimitFactor, _ = strconv.Atoi(v[0]) + case "compactiontablesize": + opts.CompactionTableSize, _ = strconv.Atoi(v[0]) + case "compactiontablesizemultiplier": + opts.CompactionTableSizeMultiplier, _ = strconv.ParseFloat(v[0], 64) + case "compactiontablesizemultiplierperlevel": + for _, val := range v { + f, _ := strconv.ParseFloat(val, 64) + opts.CompactionTableSizeMultiplierPerLevel = append(opts.CompactionTableSizeMultiplierPerLevel, f) + } + case "compactiontotalsize": + opts.CompactionTotalSize, _ = strconv.Atoi(v[0]) + case "compactiontotalsizemultiplier": + opts.CompactionTotalSizeMultiplier, _ = strconv.ParseFloat(v[0], 64) + case "compactiontotalsizemultiplierperlevel": + for _, val := range v { + f, _ := strconv.ParseFloat(val, 64) + opts.CompactionTotalSizeMultiplierPerLevel = append(opts.CompactionTotalSizeMultiplierPerLevel, f) + } + case "compression": + val, _ := strconv.Atoi(v[0]) + opts.Compression = opt.Compression(val) + case "disablebufferpool": + opts.DisableBufferPool, _ = strconv.ParseBool(v[0]) + case "disableblockcache": + opts.DisableBlockCache, _ = strconv.ParseBool(v[0]) + case "disablecompactionbackoff": + opts.DisableCompactionBackoff, _ = strconv.ParseBool(v[0]) + case "disablelargebatchtransaction": + opts.DisableLargeBatchTransaction, _ = strconv.ParseBool(v[0]) + case "errorifexist": + opts.ErrorIfExist, _ = strconv.ParseBool(v[0]) + case "errorifmissing": + opts.ErrorIfMissing, _ = strconv.ParseBool(v[0]) + case "iteratorsamplingrate": + opts.IteratorSamplingRate, _ = strconv.Atoi(v[0]) + case "nosync": + opts.NoSync, _ = strconv.ParseBool(v[0]) + case "nowritemerge": + opts.NoWriteMerge, _ = strconv.ParseBool(v[0]) + case "openfilescachecapacity": + opts.OpenFilesCacheCapacity, _ = strconv.Atoi(v[0]) + case "readonly": + opts.ReadOnly, _ = strconv.ParseBool(v[0]) + case "strict": + val, _ := strconv.Atoi(v[0]) + opts.Strict = opt.Strict(val) + case "writebuffer": + opts.WriteBuffer, _ = strconv.Atoi(v[0]) + case "writel0pausetrigger": + opts.WriteL0PauseTrigger, _ = strconv.Atoi(v[0]) + case "writel0slowdowntrigger": + opts.WriteL0SlowdownTrigger, _ = strconv.Atoi(v[0]) + case "clientname": + db.name = append(db.name, v[0]) + } + } + + var err error + db.db, err = leveldb.OpenFile(dataDir, opts) + if err != nil { + if !errors.IsCorrupted(err) { + return nil, err + } + db.db, err = leveldb.RecoverFile(dataDir, opts) + if err != nil { + return nil, err + } + } + + for _, name := range db.name { + m.LevelDBConnections[name] = db + } + db.count++ + return db.db, nil +} diff --git a/modules/nosql/manager_redis.go b/modules/nosql/manager_redis.go new file mode 100644 index 0000000000..7792a90112 --- /dev/null +++ b/modules/nosql/manager_redis.go @@ -0,0 +1,205 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package nosql + +import ( + "crypto/tls" + "path" + "strconv" + "strings" + + "github.com/go-redis/redis/v7" +) + +var replacer = strings.NewReplacer("_", "", "-", "") + +// CloseRedisClient closes a redis client +func (m *Manager) CloseRedisClient(connection string) error { + m.mutex.Lock() + defer m.mutex.Unlock() + client, ok := m.RedisConnections[connection] + if !ok { + connection = ToRedisURI(connection).String() + client, ok = m.RedisConnections[connection] + } + if !ok { + return nil + } + + client.count-- + if client.count > 0 { + return nil + } + + for _, name := range client.name { + delete(m.RedisConnections, name) + } + return client.UniversalClient.Close() +} + +// GetRedisClient gets a redis client for a particular connection +func (m *Manager) GetRedisClient(connection string) redis.UniversalClient { + m.mutex.Lock() + defer m.mutex.Unlock() + client, ok := m.RedisConnections[connection] + if ok { + client.count++ + return client + } + + uri := ToRedisURI(connection) + client, ok = m.RedisConnections[uri.String()] + if ok { + client.count++ + return client + } + client = &redisClientHolder{ + name: []string{connection, uri.String()}, + } + + opts := &redis.UniversalOptions{} + tlsConfig := &tls.Config{} + + // Handle username/password + if password, ok := uri.User.Password(); ok { + opts.Password = password + // Username does not appear to be handled by redis.Options + opts.Username = uri.User.Username() + } else if uri.User.Username() != "" { + // assume this is the password + opts.Password = uri.User.Username() + } + + // Now handle the uri query sets + for k, v := range uri.Query() { + switch replacer.Replace(strings.ToLower(k)) { + case "addr": + opts.Addrs = append(opts.Addrs, v...) + case "addrs": + opts.Addrs = append(opts.Addrs, strings.Split(v[0], ",")...) + case "username": + opts.Username = v[0] + case "password": + opts.Password = v[0] + case "database": + fallthrough + case "db": + opts.DB, _ = strconv.Atoi(v[0]) + case "maxretries": + opts.MaxRetries, _ = strconv.Atoi(v[0]) + case "minretrybackoff": + opts.MinRetryBackoff = valToTimeDuration(v) + case "maxretrybackoff": + opts.MaxRetryBackoff = valToTimeDuration(v) + case "timeout": + timeout := valToTimeDuration(v) + if timeout != 0 { + if opts.DialTimeout == 0 { + opts.DialTimeout = timeout + } + if opts.ReadTimeout == 0 { + opts.ReadTimeout = timeout + } + } + case "dialtimeout": + opts.DialTimeout = valToTimeDuration(v) + case "readtimeout": + opts.ReadTimeout = valToTimeDuration(v) + case "writetimeout": + opts.WriteTimeout = valToTimeDuration(v) + case "poolsize": + opts.PoolSize, _ = strconv.Atoi(v[0]) + case "minidleconns": + opts.MinIdleConns, _ = strconv.Atoi(v[0]) + case "pooltimeout": + opts.PoolTimeout = valToTimeDuration(v) + case "idletimeout": + opts.IdleTimeout = valToTimeDuration(v) + case "idlecheckfrequency": + opts.IdleCheckFrequency = valToTimeDuration(v) + case "maxredirects": + opts.MaxRedirects, _ = strconv.Atoi(v[0]) + case "readonly": + opts.ReadOnly, _ = strconv.ParseBool(v[0]) + case "routebylatency": + opts.RouteByLatency, _ = strconv.ParseBool(v[0]) + case "routerandomly": + opts.RouteRandomly, _ = strconv.ParseBool(v[0]) + case "sentinelmasterid": + fallthrough + case "mastername": + opts.MasterName = v[0] + case "skipverify": + fallthrough + case "insecureskipverify": + insecureSkipVerify, _ := strconv.ParseBool(v[0]) + tlsConfig.InsecureSkipVerify = insecureSkipVerify + case "clientname": + client.name = append(client.name, v[0]) + } + } + + switch uri.Scheme { + case "redis+sentinels": + fallthrough + case "rediss+sentinel": + opts.TLSConfig = tlsConfig + fallthrough + case "redis+sentinel": + if uri.Host != "" { + opts.Addrs = append(opts.Addrs, strings.Split(uri.Host, ",")...) + } + if uri.Path != "" { + if db, err := strconv.Atoi(uri.Path); err == nil { + opts.DB = db + } + } + + client.UniversalClient = redis.NewFailoverClient(opts.Failover()) + case "redis+clusters": + fallthrough + case "rediss+cluster": + opts.TLSConfig = tlsConfig + fallthrough + case "redis+cluster": + if uri.Host != "" { + opts.Addrs = append(opts.Addrs, strings.Split(uri.Host, ",")...) + } + if uri.Path != "" { + if db, err := strconv.Atoi(uri.Path); err == nil { + opts.DB = db + } + } + client.UniversalClient = redis.NewClusterClient(opts.Cluster()) + case "redis+socket": + simpleOpts := opts.Simple() + simpleOpts.Network = "unix" + simpleOpts.Addr = path.Join(uri.Host, uri.Path) + client.UniversalClient = redis.NewClient(simpleOpts) + case "rediss": + opts.TLSConfig = tlsConfig + fallthrough + case "redis": + if uri.Host != "" { + opts.Addrs = append(opts.Addrs, strings.Split(uri.Host, ",")...) + } + if uri.Path != "" { + if db, err := strconv.Atoi(uri.Path); err == nil { + opts.DB = db + } + } + client.UniversalClient = redis.NewClient(opts.Simple()) + default: + return nil + } + + for _, name := range client.name { + m.RedisConnections[name] = client + } + + client.count++ + + return client +} diff --git a/modules/nosql/redis.go b/modules/nosql/redis.go new file mode 100644 index 0000000000..528f5fc802 --- /dev/null +++ b/modules/nosql/redis.go @@ -0,0 +1,102 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package nosql + +import ( + "net/url" + "strconv" + "strings" +) + +// The file contains common redis connection functions + +// ToRedisURI converts old style connections to a RedisURI +// +// A RedisURI matches the pattern: +// +// redis://[username:password@]host[:port][/database][?[option=value]*] +// rediss://[username:password@]host[:port][/database][?[option=value]*] +// redis+socket://[username:password@]path[/database][?[option=value]*] +// redis+sentinel://[password@]host1 [: port1][, host2 [:port2]][, hostN [:portN]][/ database][?[option=value]*] +// redis+cluster://[password@]host1 [: port1][, host2 [:port2]][, hostN [:portN]][/ database][?[option=value]*] +// +// We have previously used a URI like: +// addrs=127.0.0.1:6379 db=0 +// network=tcp,addr=127.0.0.1:6379,password=macaron,db=0,pool_size=100,idle_timeout=180 +// +// We need to convert this old style to the new style +func ToRedisURI(connection string) *url.URL { + uri, err := url.Parse(connection) + if err == nil && strings.HasPrefix(uri.Scheme, "redis") { + // OK we're going to assume that this is a reasonable redis URI + return uri + } + + // Let's set a nice default + uri, _ = url.Parse("redis://127.0.0.1:6379/0") + network := "tcp" + query := uri.Query() + + // OK so there are two types: Space delimited and Comma delimited + // Let's assume that we have a space delimited string - as this is the most common + fields := strings.Fields(connection) + if len(fields) == 1 { + // It's a comma delimited string, then... + fields = strings.Split(connection, ",") + + } + for _, f := range fields { + items := strings.SplitN(f, "=", 2) + if len(items) < 2 { + continue + } + switch strings.ToLower(items[0]) { + case "network": + if items[1] == "unix" { + uri.Scheme = "redis+socket" + } + network = items[1] + case "addrs": + uri.Host = items[1] + // now we need to handle the clustering + if strings.Contains(items[1], ",") && network == "tcp" { + uri.Scheme = "redis+cluster" + } + case "addr": + uri.Host = items[1] + case "password": + uri.User = url.UserPassword(uri.User.Username(), items[1]) + case "username": + password, set := uri.User.Password() + if !set { + uri.User = url.User(items[1]) + } else { + uri.User = url.UserPassword(items[1], password) + } + case "db": + uri.Path = "/" + items[1] + case "idle_timeout": + _, err := strconv.Atoi(items[1]) + if err == nil { + query.Add("idle_timeout", items[1]+"s") + } else { + query.Add("idle_timeout", items[1]) + } + default: + // Other options become query params + query.Add(items[0], items[1]) + } + } + + // Finally we need to fix up the Host if we have a unix port + if uri.Scheme == "redis+socket" { + query.Set("db", uri.Path) + uri.Path = uri.Host + uri.Host = "" + } + uri.RawQuery = query.Encode() + + return uri +} diff --git a/modules/nosql/redis_test.go b/modules/nosql/redis_test.go new file mode 100644 index 0000000000..c70d236bdc --- /dev/null +++ b/modules/nosql/redis_test.go @@ -0,0 +1,35 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package nosql + +import ( + "testing" +) + +func TestToRedisURI(t *testing.T) { + tests := []struct { + name string + connection string + want string + }{ + { + name: "old_default", + connection: "addrs=127.0.0.1:6379 db=0", + want: "redis://127.0.0.1:6379/0", + }, + { + name: "old_macaron_session_default", + connection: "network=tcp,addr=127.0.0.1:6379,password=macaron,db=0,pool_size=100,idle_timeout=180", + want: "redis://:macaron@127.0.0.1:6379/0?idle_timeout=180s&pool_size=100", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := ToRedisURI(tt.connection); got == nil || got.String() != tt.want { + t.Errorf(`ToRedisURI(%q) = %s, want %s`, tt.connection, got.String(), tt.want) + } + }) + } +} diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go index ff0876488b..88b8c414c0 100644 --- a/modules/queue/queue_disk.go +++ b/modules/queue/queue_disk.go @@ -5,6 +5,8 @@ package queue import ( + "code.gitea.io/gitea/modules/nosql" + "gitea.com/lunny/levelqueue" ) @@ -14,7 +16,9 @@ const LevelQueueType Type = "level" // LevelQueueConfiguration is the configuration for a LevelQueue type LevelQueueConfiguration struct { ByteFIFOQueueConfiguration - DataDir string + DataDir string + ConnectionString string + QueueName string } // LevelQueue implements a disk library queue @@ -30,7 +34,11 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) } config := configInterface.(LevelQueueConfiguration) - byteFIFO, err := NewLevelQueueByteFIFO(config.DataDir) + if len(config.ConnectionString) == 0 { + config.ConnectionString = config.DataDir + } + + byteFIFO, err := NewLevelQueueByteFIFO(config.ConnectionString, config.QueueName) if err != nil { return nil, err } @@ -51,18 +59,25 @@ var _ (ByteFIFO) = &LevelQueueByteFIFO{} // LevelQueueByteFIFO represents a ByteFIFO formed from a LevelQueue type LevelQueueByteFIFO struct { - internal *levelqueue.Queue + internal *levelqueue.Queue + connection string } // NewLevelQueueByteFIFO creates a ByteFIFO formed from a LevelQueue -func NewLevelQueueByteFIFO(dataDir string) (*LevelQueueByteFIFO, error) { - internal, err := levelqueue.Open(dataDir) +func NewLevelQueueByteFIFO(connection, prefix string) (*LevelQueueByteFIFO, error) { + db, err := nosql.GetManager().GetLevelDB(connection) + if err != nil { + return nil, err + } + + internal, err := levelqueue.NewQueue(db, []byte(prefix), false) if err != nil { return nil, err } return &LevelQueueByteFIFO{ - internal: internal, + connection: connection, + internal: internal, }, nil } @@ -87,7 +102,9 @@ func (fifo *LevelQueueByteFIFO) Pop() ([]byte, error) { // Close this fifo func (fifo *LevelQueueByteFIFO) Close() error { - return fifo.internal.Close() + err := fifo.internal.Close() + _ = nosql.GetManager().CloseLevelDB(fifo.connection) + return err } // Len returns the length of the fifo diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go index 4e05ddd17e..04e7b5d252 100644 --- a/modules/queue/queue_redis.go +++ b/modules/queue/queue_redis.go @@ -5,12 +5,10 @@ package queue import ( - "errors" - "strings" - "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/nosql" - "github.com/go-redis/redis" + "github.com/go-redis/redis/v7" ) // RedisQueueType is the type for redis queue @@ -75,11 +73,8 @@ type RedisByteFIFO struct { // RedisByteFIFOConfiguration is the configuration for the RedisByteFIFO type RedisByteFIFOConfiguration struct { - Network string - Addresses string - Password string - DBIndex int - QueueName string + ConnectionString string + QueueName string } // NewRedisByteFIFO creates a ByteFIFO formed from a redisClient @@ -87,21 +82,7 @@ func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error) fifo := &RedisByteFIFO{ queueName: config.QueueName, } - dbs := strings.Split(config.Addresses, ",") - if len(dbs) == 0 { - return nil, errors.New("no redis host specified") - } else if len(dbs) == 1 { - fifo.client = redis.NewClient(&redis.Options{ - Network: config.Network, - Addr: strings.TrimSpace(dbs[0]), // use default Addr - Password: config.Password, // no password set - DB: config.DBIndex, // use default DB - }) - } else { - fifo.client = redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: dbs, - }) - } + fifo.client = nosql.GetManager().GetRedisClient(config.ConnectionString) if err := fifo.client.Ping().Err(); err != nil { return nil, err } diff --git a/modules/queue/unique_queue_disk.go b/modules/queue/unique_queue_disk.go index bfe7aeed83..dd6ac1a538 100644 --- a/modules/queue/unique_queue_disk.go +++ b/modules/queue/unique_queue_disk.go @@ -5,6 +5,8 @@ package queue import ( + "code.gitea.io/gitea/modules/nosql" + "gitea.com/lunny/levelqueue" ) @@ -14,7 +16,9 @@ const LevelUniqueQueueType Type = "unique-level" // LevelUniqueQueueConfiguration is the configuration for a LevelUniqueQueue type LevelUniqueQueueConfiguration struct { ByteFIFOQueueConfiguration - DataDir string + DataDir string + ConnectionString string + QueueName string } // LevelUniqueQueue implements a disk library queue @@ -34,7 +38,11 @@ func NewLevelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, } config := configInterface.(LevelUniqueQueueConfiguration) - byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.DataDir) + if len(config.ConnectionString) == 0 { + config.ConnectionString = config.DataDir + } + + byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.ConnectionString, config.QueueName) if err != nil { return nil, err } @@ -55,18 +63,25 @@ var _ (UniqueByteFIFO) = &LevelUniqueQueueByteFIFO{} // LevelUniqueQueueByteFIFO represents a ByteFIFO formed from a LevelUniqueQueue type LevelUniqueQueueByteFIFO struct { - internal *levelqueue.UniqueQueue + internal *levelqueue.UniqueQueue + connection string } // NewLevelUniqueQueueByteFIFO creates a new ByteFIFO formed from a LevelUniqueQueue -func NewLevelUniqueQueueByteFIFO(dataDir string) (*LevelUniqueQueueByteFIFO, error) { - internal, err := levelqueue.OpenUnique(dataDir) +func NewLevelUniqueQueueByteFIFO(connection, prefix string) (*LevelUniqueQueueByteFIFO, error) { + db, err := nosql.GetManager().GetLevelDB(connection) + if err != nil { + return nil, err + } + + internal, err := levelqueue.NewUniqueQueue(db, []byte(prefix), []byte(prefix+"-unique"), false) if err != nil { return nil, err } return &LevelUniqueQueueByteFIFO{ - internal: internal, + connection: connection, + internal: internal, }, nil } @@ -96,7 +111,9 @@ func (fifo *LevelUniqueQueueByteFIFO) Has(data []byte) (bool, error) { // Close this fifo func (fifo *LevelUniqueQueueByteFIFO) Close() error { - return fifo.internal.Close() + err := fifo.internal.Close() + _ = nosql.GetManager().CloseLevelDB(fifo.connection) + return err } func init() { diff --git a/modules/queue/unique_queue_redis.go b/modules/queue/unique_queue_redis.go index 9404369075..67efc66bc9 100644 --- a/modules/queue/unique_queue_redis.go +++ b/modules/queue/unique_queue_redis.go @@ -4,7 +4,7 @@ package queue -import "github.com/go-redis/redis" +import "github.com/go-redis/redis/v7" // RedisUniqueQueueType is the type for redis queue const RedisUniqueQueueType Type = "unique-redis" diff --git a/modules/session/redis.go b/modules/session/redis.go new file mode 100644 index 0000000000..c88ebd5769 --- /dev/null +++ b/modules/session/redis.go @@ -0,0 +1,216 @@ +// Copyright 2013 Beego Authors +// Copyright 2014 The Macaron Authors +// Copyright 2020 The Gitea Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package session + +import ( + "fmt" + "sync" + "time" + + "code.gitea.io/gitea/modules/nosql" + + "gitea.com/macaron/session" + "github.com/go-redis/redis/v7" +) + +// RedisStore represents a redis session store implementation. +type RedisStore struct { + c redis.UniversalClient + prefix, sid string + duration time.Duration + lock sync.RWMutex + data map[interface{}]interface{} +} + +// NewRedisStore creates and returns a redis session store. +func NewRedisStore(c redis.UniversalClient, prefix, sid string, dur time.Duration, kv map[interface{}]interface{}) *RedisStore { + return &RedisStore{ + c: c, + prefix: prefix, + sid: sid, + duration: dur, + data: kv, + } +} + +// Set sets value to given key in session. +func (s *RedisStore) Set(key, val interface{}) error { + s.lock.Lock() + defer s.lock.Unlock() + + s.data[key] = val + return nil +} + +// Get gets value by given key in session. +func (s *RedisStore) Get(key interface{}) interface{} { + s.lock.RLock() + defer s.lock.RUnlock() + + return s.data[key] +} + +// Delete delete a key from session. +func (s *RedisStore) Delete(key interface{}) error { + s.lock.Lock() + defer s.lock.Unlock() + + delete(s.data, key) + return nil +} + +// ID returns current session ID. +func (s *RedisStore) ID() string { + return s.sid +} + +// Release releases resource and save data to provider. +func (s *RedisStore) Release() error { + // Skip encoding if the data is empty + if len(s.data) == 0 { + return nil + } + + data, err := session.EncodeGob(s.data) + if err != nil { + return err + } + + return s.c.Set(s.prefix+s.sid, string(data), s.duration).Err() +} + +// Flush deletes all session data. +func (s *RedisStore) Flush() error { + s.lock.Lock() + defer s.lock.Unlock() + + s.data = make(map[interface{}]interface{}) + return nil +} + +// RedisProvider represents a redis session provider implementation. +type RedisProvider struct { + c redis.UniversalClient + duration time.Duration + prefix string +} + +// Init initializes redis session provider. +// configs: network=tcp,addr=:6379,password=macaron,db=0,pool_size=100,idle_timeout=180,prefix=session; +func (p *RedisProvider) Init(maxlifetime int64, configs string) (err error) { + p.duration, err = time.ParseDuration(fmt.Sprintf("%ds", maxlifetime)) + if err != nil { + return err + } + + uri := nosql.ToRedisURI(configs) + + for k, v := range uri.Query() { + switch k { + case "prefix": + p.prefix = v[0] + } + } + + p.c = nosql.GetManager().GetRedisClient(uri.String()) + return p.c.Ping().Err() +} + +// Read returns raw session store by session ID. +func (p *RedisProvider) Read(sid string) (session.RawStore, error) { + psid := p.prefix + sid + if !p.Exist(sid) { + if err := p.c.Set(psid, "", p.duration).Err(); err != nil { + return nil, err + } + } + + var kv map[interface{}]interface{} + kvs, err := p.c.Get(psid).Result() + if err != nil { + return nil, err + } + if len(kvs) == 0 { + kv = make(map[interface{}]interface{}) + } else { + kv, err = session.DecodeGob([]byte(kvs)) + if err != nil { + return nil, err + } + } + + return NewRedisStore(p.c, p.prefix, sid, p.duration, kv), nil +} + +// Exist returns true if session with given ID exists. +func (p *RedisProvider) Exist(sid string) bool { + v, err := p.c.Exists(p.prefix + sid).Result() + return err == nil && v == 1 +} + +// Destroy deletes a session by session ID. +func (p *RedisProvider) Destroy(sid string) error { + return p.c.Del(p.prefix + sid).Err() +} + +// Regenerate regenerates a session store from old session ID to new one. +func (p *RedisProvider) Regenerate(oldsid, sid string) (_ session.RawStore, err error) { + poldsid := p.prefix + oldsid + psid := p.prefix + sid + + if p.Exist(sid) { + return nil, fmt.Errorf("new sid '%s' already exists", sid) + } else if !p.Exist(oldsid) { + // Make a fake old session. + if err = p.c.Set(poldsid, "", p.duration).Err(); err != nil { + return nil, err + } + } + + if err = p.c.Rename(poldsid, psid).Err(); err != nil { + return nil, err + } + + var kv map[interface{}]interface{} + kvs, err := p.c.Get(psid).Result() + if err != nil { + return nil, err + } + + if len(kvs) == 0 { + kv = make(map[interface{}]interface{}) + } else { + kv, err = session.DecodeGob([]byte(kvs)) + if err != nil { + return nil, err + } + } + + return NewRedisStore(p.c, p.prefix, sid, p.duration, kv), nil +} + +// Count counts and returns number of sessions. +func (p *RedisProvider) Count() int { + return int(p.c.DBSize().Val()) +} + +// GC calls GC to clean expired sessions. +func (*RedisProvider) GC() {} + +func init() { + session.Register("redis", &RedisProvider{}) +} diff --git a/modules/session/virtual.go b/modules/session/virtual.go index c8e1e210cb..1139cfe89c 100644 --- a/modules/session/virtual.go +++ b/modules/session/virtual.go @@ -15,7 +15,6 @@ import ( mysql "gitea.com/macaron/session/mysql" nodb "gitea.com/macaron/session/nodb" postgres "gitea.com/macaron/session/postgres" - redis "gitea.com/macaron/session/redis" ) // VirtualSessionProvider represents a shadowed session provider implementation. @@ -40,7 +39,7 @@ func (o *VirtualSessionProvider) Init(gclifetime int64, config string) error { case "file": o.provider = &session.FileProvider{} case "redis": - o.provider = &redis.RedisProvider{} + o.provider = &RedisProvider{} case "mysql": o.provider = &mysql.MysqlProvider{} case "postgres": |