diff options
Diffstat (limited to 'modules/queue')
-rw-r--r-- | modules/queue/queue_disk.go | 31 | ||||
-rw-r--r-- | modules/queue/queue_redis.go | 29 | ||||
-rw-r--r-- | modules/queue/unique_queue_disk.go | 31 | ||||
-rw-r--r-- | modules/queue/unique_queue_redis.go | 2 |
4 files changed, 54 insertions, 39 deletions
diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go index ff0876488b..88b8c414c0 100644 --- a/modules/queue/queue_disk.go +++ b/modules/queue/queue_disk.go @@ -5,6 +5,8 @@ package queue import ( + "code.gitea.io/gitea/modules/nosql" + "gitea.com/lunny/levelqueue" ) @@ -14,7 +16,9 @@ const LevelQueueType Type = "level" // LevelQueueConfiguration is the configuration for a LevelQueue type LevelQueueConfiguration struct { ByteFIFOQueueConfiguration - DataDir string + DataDir string + ConnectionString string + QueueName string } // LevelQueue implements a disk library queue @@ -30,7 +34,11 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) } config := configInterface.(LevelQueueConfiguration) - byteFIFO, err := NewLevelQueueByteFIFO(config.DataDir) + if len(config.ConnectionString) == 0 { + config.ConnectionString = config.DataDir + } + + byteFIFO, err := NewLevelQueueByteFIFO(config.ConnectionString, config.QueueName) if err != nil { return nil, err } @@ -51,18 +59,25 @@ var _ (ByteFIFO) = &LevelQueueByteFIFO{} // LevelQueueByteFIFO represents a ByteFIFO formed from a LevelQueue type LevelQueueByteFIFO struct { - internal *levelqueue.Queue + internal *levelqueue.Queue + connection string } // NewLevelQueueByteFIFO creates a ByteFIFO formed from a LevelQueue -func NewLevelQueueByteFIFO(dataDir string) (*LevelQueueByteFIFO, error) { - internal, err := levelqueue.Open(dataDir) +func NewLevelQueueByteFIFO(connection, prefix string) (*LevelQueueByteFIFO, error) { + db, err := nosql.GetManager().GetLevelDB(connection) + if err != nil { + return nil, err + } + + internal, err := levelqueue.NewQueue(db, []byte(prefix), false) if err != nil { return nil, err } return &LevelQueueByteFIFO{ - internal: internal, + connection: connection, + internal: internal, }, nil } @@ -87,7 +102,9 @@ func (fifo *LevelQueueByteFIFO) Pop() ([]byte, error) { // Close this fifo func (fifo *LevelQueueByteFIFO) Close() error { - return fifo.internal.Close() + err := fifo.internal.Close() + _ = nosql.GetManager().CloseLevelDB(fifo.connection) + return err } // Len returns the length of the fifo diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go index 4e05ddd17e..04e7b5d252 100644 --- a/modules/queue/queue_redis.go +++ b/modules/queue/queue_redis.go @@ -5,12 +5,10 @@ package queue import ( - "errors" - "strings" - "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/nosql" - "github.com/go-redis/redis" + "github.com/go-redis/redis/v7" ) // RedisQueueType is the type for redis queue @@ -75,11 +73,8 @@ type RedisByteFIFO struct { // RedisByteFIFOConfiguration is the configuration for the RedisByteFIFO type RedisByteFIFOConfiguration struct { - Network string - Addresses string - Password string - DBIndex int - QueueName string + ConnectionString string + QueueName string } // NewRedisByteFIFO creates a ByteFIFO formed from a redisClient @@ -87,21 +82,7 @@ func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error) fifo := &RedisByteFIFO{ queueName: config.QueueName, } - dbs := strings.Split(config.Addresses, ",") - if len(dbs) == 0 { - return nil, errors.New("no redis host specified") - } else if len(dbs) == 1 { - fifo.client = redis.NewClient(&redis.Options{ - Network: config.Network, - Addr: strings.TrimSpace(dbs[0]), // use default Addr - Password: config.Password, // no password set - DB: config.DBIndex, // use default DB - }) - } else { - fifo.client = redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: dbs, - }) - } + fifo.client = nosql.GetManager().GetRedisClient(config.ConnectionString) if err := fifo.client.Ping().Err(); err != nil { return nil, err } diff --git a/modules/queue/unique_queue_disk.go b/modules/queue/unique_queue_disk.go index bfe7aeed83..dd6ac1a538 100644 --- a/modules/queue/unique_queue_disk.go +++ b/modules/queue/unique_queue_disk.go @@ -5,6 +5,8 @@ package queue import ( + "code.gitea.io/gitea/modules/nosql" + "gitea.com/lunny/levelqueue" ) @@ -14,7 +16,9 @@ const LevelUniqueQueueType Type = "unique-level" // LevelUniqueQueueConfiguration is the configuration for a LevelUniqueQueue type LevelUniqueQueueConfiguration struct { ByteFIFOQueueConfiguration - DataDir string + DataDir string + ConnectionString string + QueueName string } // LevelUniqueQueue implements a disk library queue @@ -34,7 +38,11 @@ func NewLevelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, } config := configInterface.(LevelUniqueQueueConfiguration) - byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.DataDir) + if len(config.ConnectionString) == 0 { + config.ConnectionString = config.DataDir + } + + byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.ConnectionString, config.QueueName) if err != nil { return nil, err } @@ -55,18 +63,25 @@ var _ (UniqueByteFIFO) = &LevelUniqueQueueByteFIFO{} // LevelUniqueQueueByteFIFO represents a ByteFIFO formed from a LevelUniqueQueue type LevelUniqueQueueByteFIFO struct { - internal *levelqueue.UniqueQueue + internal *levelqueue.UniqueQueue + connection string } // NewLevelUniqueQueueByteFIFO creates a new ByteFIFO formed from a LevelUniqueQueue -func NewLevelUniqueQueueByteFIFO(dataDir string) (*LevelUniqueQueueByteFIFO, error) { - internal, err := levelqueue.OpenUnique(dataDir) +func NewLevelUniqueQueueByteFIFO(connection, prefix string) (*LevelUniqueQueueByteFIFO, error) { + db, err := nosql.GetManager().GetLevelDB(connection) + if err != nil { + return nil, err + } + + internal, err := levelqueue.NewUniqueQueue(db, []byte(prefix), []byte(prefix+"-unique"), false) if err != nil { return nil, err } return &LevelUniqueQueueByteFIFO{ - internal: internal, + connection: connection, + internal: internal, }, nil } @@ -96,7 +111,9 @@ func (fifo *LevelUniqueQueueByteFIFO) Has(data []byte) (bool, error) { // Close this fifo func (fifo *LevelUniqueQueueByteFIFO) Close() error { - return fifo.internal.Close() + err := fifo.internal.Close() + _ = nosql.GetManager().CloseLevelDB(fifo.connection) + return err } func init() { diff --git a/modules/queue/unique_queue_redis.go b/modules/queue/unique_queue_redis.go index 9404369075..67efc66bc9 100644 --- a/modules/queue/unique_queue_redis.go +++ b/modules/queue/unique_queue_redis.go @@ -4,7 +4,7 @@ package queue -import "github.com/go-redis/redis" +import "github.com/go-redis/redis/v7" // RedisUniqueQueueType is the type for redis queue const RedisUniqueQueueType Type = "unique-redis" |