diff options
author | zeripath <art27@cantab.net> | 2020-09-27 22:09:46 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-28 00:09:46 +0300 |
commit | 7f8e3192cd941f008a3a2413ca0e9ff90c02fd88 (patch) | |
tree | a22a8feb40925b85fca67fff316ba45eef49155f /modules/queue | |
parent | f404bdde9bec5fb7badf3a5ca1c503a2a884f315 (diff) | |
download | gitea-7f8e3192cd941f008a3a2413ca0e9ff90c02fd88.tar.gz gitea-7f8e3192cd941f008a3a2413ca0e9ff90c02fd88.zip |
Allow common redis and leveldb connections (#12385)
* Allow common redis and leveldb connections
Prevents multiple reopening of redis and leveldb connections to the same
place by sharing connections.
Further allows for more configurable redis connection type using the
redisURI and a leveldbURI scheme.
Signed-off-by: Andrew Thornton <art27@cantab.net>
* add unit-test
Signed-off-by: Andrew Thornton <art27@cantab.net>
* as per @lunny
Signed-off-by: Andrew Thornton <art27@cantab.net>
* add test
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Update modules/cache/cache_redis.go
* Update modules/queue/queue_disk.go
* Update modules/cache/cache_redis.go
* Update modules/cache/cache_redis.go
* Update modules/queue/unique_queue_disk.go
* Update modules/queue/queue_disk.go
* Update modules/queue/unique_queue_disk.go
* Update modules/session/redis.go
Co-authored-by: techknowlogick <techknowlogick@gitea.io>
Co-authored-by: Lauris BH <lauris@nix.lv>
Diffstat (limited to 'modules/queue')
-rw-r--r-- | modules/queue/queue_disk.go | 31 | ||||
-rw-r--r-- | modules/queue/queue_redis.go | 29 | ||||
-rw-r--r-- | modules/queue/unique_queue_disk.go | 31 | ||||
-rw-r--r-- | modules/queue/unique_queue_redis.go | 2 |
4 files changed, 54 insertions, 39 deletions
diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go index ff0876488b..88b8c414c0 100644 --- a/modules/queue/queue_disk.go +++ b/modules/queue/queue_disk.go @@ -5,6 +5,8 @@ package queue import ( + "code.gitea.io/gitea/modules/nosql" + "gitea.com/lunny/levelqueue" ) @@ -14,7 +16,9 @@ const LevelQueueType Type = "level" // LevelQueueConfiguration is the configuration for a LevelQueue type LevelQueueConfiguration struct { ByteFIFOQueueConfiguration - DataDir string + DataDir string + ConnectionString string + QueueName string } // LevelQueue implements a disk library queue @@ -30,7 +34,11 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) } config := configInterface.(LevelQueueConfiguration) - byteFIFO, err := NewLevelQueueByteFIFO(config.DataDir) + if len(config.ConnectionString) == 0 { + config.ConnectionString = config.DataDir + } + + byteFIFO, err := NewLevelQueueByteFIFO(config.ConnectionString, config.QueueName) if err != nil { return nil, err } @@ -51,18 +59,25 @@ var _ (ByteFIFO) = &LevelQueueByteFIFO{} // LevelQueueByteFIFO represents a ByteFIFO formed from a LevelQueue type LevelQueueByteFIFO struct { - internal *levelqueue.Queue + internal *levelqueue.Queue + connection string } // NewLevelQueueByteFIFO creates a ByteFIFO formed from a LevelQueue -func NewLevelQueueByteFIFO(dataDir string) (*LevelQueueByteFIFO, error) { - internal, err := levelqueue.Open(dataDir) +func NewLevelQueueByteFIFO(connection, prefix string) (*LevelQueueByteFIFO, error) { + db, err := nosql.GetManager().GetLevelDB(connection) + if err != nil { + return nil, err + } + + internal, err := levelqueue.NewQueue(db, []byte(prefix), false) if err != nil { return nil, err } return &LevelQueueByteFIFO{ - internal: internal, + connection: connection, + internal: internal, }, nil } @@ -87,7 +102,9 @@ func (fifo *LevelQueueByteFIFO) Pop() ([]byte, error) { // Close this fifo func (fifo *LevelQueueByteFIFO) Close() error { - return fifo.internal.Close() + err := fifo.internal.Close() + _ = nosql.GetManager().CloseLevelDB(fifo.connection) + return err } // Len returns the length of the fifo diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go index 4e05ddd17e..04e7b5d252 100644 --- a/modules/queue/queue_redis.go +++ b/modules/queue/queue_redis.go @@ -5,12 +5,10 @@ package queue import ( - "errors" - "strings" - "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/nosql" - "github.com/go-redis/redis" + "github.com/go-redis/redis/v7" ) // RedisQueueType is the type for redis queue @@ -75,11 +73,8 @@ type RedisByteFIFO struct { // RedisByteFIFOConfiguration is the configuration for the RedisByteFIFO type RedisByteFIFOConfiguration struct { - Network string - Addresses string - Password string - DBIndex int - QueueName string + ConnectionString string + QueueName string } // NewRedisByteFIFO creates a ByteFIFO formed from a redisClient @@ -87,21 +82,7 @@ func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error) fifo := &RedisByteFIFO{ queueName: config.QueueName, } - dbs := strings.Split(config.Addresses, ",") - if len(dbs) == 0 { - return nil, errors.New("no redis host specified") - } else if len(dbs) == 1 { - fifo.client = redis.NewClient(&redis.Options{ - Network: config.Network, - Addr: strings.TrimSpace(dbs[0]), // use default Addr - Password: config.Password, // no password set - DB: config.DBIndex, // use default DB - }) - } else { - fifo.client = redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: dbs, - }) - } + fifo.client = nosql.GetManager().GetRedisClient(config.ConnectionString) if err := fifo.client.Ping().Err(); err != nil { return nil, err } diff --git a/modules/queue/unique_queue_disk.go b/modules/queue/unique_queue_disk.go index bfe7aeed83..dd6ac1a538 100644 --- a/modules/queue/unique_queue_disk.go +++ b/modules/queue/unique_queue_disk.go @@ -5,6 +5,8 @@ package queue import ( + "code.gitea.io/gitea/modules/nosql" + "gitea.com/lunny/levelqueue" ) @@ -14,7 +16,9 @@ const LevelUniqueQueueType Type = "unique-level" // LevelUniqueQueueConfiguration is the configuration for a LevelUniqueQueue type LevelUniqueQueueConfiguration struct { ByteFIFOQueueConfiguration - DataDir string + DataDir string + ConnectionString string + QueueName string } // LevelUniqueQueue implements a disk library queue @@ -34,7 +38,11 @@ func NewLevelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, } config := configInterface.(LevelUniqueQueueConfiguration) - byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.DataDir) + if len(config.ConnectionString) == 0 { + config.ConnectionString = config.DataDir + } + + byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.ConnectionString, config.QueueName) if err != nil { return nil, err } @@ -55,18 +63,25 @@ var _ (UniqueByteFIFO) = &LevelUniqueQueueByteFIFO{} // LevelUniqueQueueByteFIFO represents a ByteFIFO formed from a LevelUniqueQueue type LevelUniqueQueueByteFIFO struct { - internal *levelqueue.UniqueQueue + internal *levelqueue.UniqueQueue + connection string } // NewLevelUniqueQueueByteFIFO creates a new ByteFIFO formed from a LevelUniqueQueue -func NewLevelUniqueQueueByteFIFO(dataDir string) (*LevelUniqueQueueByteFIFO, error) { - internal, err := levelqueue.OpenUnique(dataDir) +func NewLevelUniqueQueueByteFIFO(connection, prefix string) (*LevelUniqueQueueByteFIFO, error) { + db, err := nosql.GetManager().GetLevelDB(connection) + if err != nil { + return nil, err + } + + internal, err := levelqueue.NewUniqueQueue(db, []byte(prefix), []byte(prefix+"-unique"), false) if err != nil { return nil, err } return &LevelUniqueQueueByteFIFO{ - internal: internal, + connection: connection, + internal: internal, }, nil } @@ -96,7 +111,9 @@ func (fifo *LevelUniqueQueueByteFIFO) Has(data []byte) (bool, error) { // Close this fifo func (fifo *LevelUniqueQueueByteFIFO) Close() error { - return fifo.internal.Close() + err := fifo.internal.Close() + _ = nosql.GetManager().CloseLevelDB(fifo.connection) + return err } func init() { diff --git a/modules/queue/unique_queue_redis.go b/modules/queue/unique_queue_redis.go index 9404369075..67efc66bc9 100644 --- a/modules/queue/unique_queue_redis.go +++ b/modules/queue/unique_queue_redis.go @@ -4,7 +4,7 @@ package queue -import "github.com/go-redis/redis" +import "github.com/go-redis/redis/v7" // RedisUniqueQueueType is the type for redis queue const RedisUniqueQueueType Type = "unique-redis" |