aboutsummaryrefslogtreecommitdiffstats
path: root/modules/queue
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue')
-rw-r--r--modules/queue/queue_disk.go31
-rw-r--r--modules/queue/queue_redis.go29
-rw-r--r--modules/queue/unique_queue_disk.go31
-rw-r--r--modules/queue/unique_queue_redis.go2
4 files changed, 54 insertions, 39 deletions
diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go
index ff0876488b..88b8c414c0 100644
--- a/modules/queue/queue_disk.go
+++ b/modules/queue/queue_disk.go
@@ -5,6 +5,8 @@
package queue
import (
+ "code.gitea.io/gitea/modules/nosql"
+
"gitea.com/lunny/levelqueue"
)
@@ -14,7 +16,9 @@ const LevelQueueType Type = "level"
// LevelQueueConfiguration is the configuration for a LevelQueue
type LevelQueueConfiguration struct {
ByteFIFOQueueConfiguration
- DataDir string
+ DataDir string
+ ConnectionString string
+ QueueName string
}
// LevelQueue implements a disk library queue
@@ -30,7 +34,11 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
}
config := configInterface.(LevelQueueConfiguration)
- byteFIFO, err := NewLevelQueueByteFIFO(config.DataDir)
+ if len(config.ConnectionString) == 0 {
+ config.ConnectionString = config.DataDir
+ }
+
+ byteFIFO, err := NewLevelQueueByteFIFO(config.ConnectionString, config.QueueName)
if err != nil {
return nil, err
}
@@ -51,18 +59,25 @@ var _ (ByteFIFO) = &LevelQueueByteFIFO{}
// LevelQueueByteFIFO represents a ByteFIFO formed from a LevelQueue
type LevelQueueByteFIFO struct {
- internal *levelqueue.Queue
+ internal *levelqueue.Queue
+ connection string
}
// NewLevelQueueByteFIFO creates a ByteFIFO formed from a LevelQueue
-func NewLevelQueueByteFIFO(dataDir string) (*LevelQueueByteFIFO, error) {
- internal, err := levelqueue.Open(dataDir)
+func NewLevelQueueByteFIFO(connection, prefix string) (*LevelQueueByteFIFO, error) {
+ db, err := nosql.GetManager().GetLevelDB(connection)
+ if err != nil {
+ return nil, err
+ }
+
+ internal, err := levelqueue.NewQueue(db, []byte(prefix), false)
if err != nil {
return nil, err
}
return &LevelQueueByteFIFO{
- internal: internal,
+ connection: connection,
+ internal: internal,
}, nil
}
@@ -87,7 +102,9 @@ func (fifo *LevelQueueByteFIFO) Pop() ([]byte, error) {
// Close this fifo
func (fifo *LevelQueueByteFIFO) Close() error {
- return fifo.internal.Close()
+ err := fifo.internal.Close()
+ _ = nosql.GetManager().CloseLevelDB(fifo.connection)
+ return err
}
// Len returns the length of the fifo
diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go
index 4e05ddd17e..04e7b5d252 100644
--- a/modules/queue/queue_redis.go
+++ b/modules/queue/queue_redis.go
@@ -5,12 +5,10 @@
package queue
import (
- "errors"
- "strings"
-
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/nosql"
- "github.com/go-redis/redis"
+ "github.com/go-redis/redis/v7"
)
// RedisQueueType is the type for redis queue
@@ -75,11 +73,8 @@ type RedisByteFIFO struct {
// RedisByteFIFOConfiguration is the configuration for the RedisByteFIFO
type RedisByteFIFOConfiguration struct {
- Network string
- Addresses string
- Password string
- DBIndex int
- QueueName string
+ ConnectionString string
+ QueueName string
}
// NewRedisByteFIFO creates a ByteFIFO formed from a redisClient
@@ -87,21 +82,7 @@ func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error)
fifo := &RedisByteFIFO{
queueName: config.QueueName,
}
- dbs := strings.Split(config.Addresses, ",")
- if len(dbs) == 0 {
- return nil, errors.New("no redis host specified")
- } else if len(dbs) == 1 {
- fifo.client = redis.NewClient(&redis.Options{
- Network: config.Network,
- Addr: strings.TrimSpace(dbs[0]), // use default Addr
- Password: config.Password, // no password set
- DB: config.DBIndex, // use default DB
- })
- } else {
- fifo.client = redis.NewClusterClient(&redis.ClusterOptions{
- Addrs: dbs,
- })
- }
+ fifo.client = nosql.GetManager().GetRedisClient(config.ConnectionString)
if err := fifo.client.Ping().Err(); err != nil {
return nil, err
}
diff --git a/modules/queue/unique_queue_disk.go b/modules/queue/unique_queue_disk.go
index bfe7aeed83..dd6ac1a538 100644
--- a/modules/queue/unique_queue_disk.go
+++ b/modules/queue/unique_queue_disk.go
@@ -5,6 +5,8 @@
package queue
import (
+ "code.gitea.io/gitea/modules/nosql"
+
"gitea.com/lunny/levelqueue"
)
@@ -14,7 +16,9 @@ const LevelUniqueQueueType Type = "unique-level"
// LevelUniqueQueueConfiguration is the configuration for a LevelUniqueQueue
type LevelUniqueQueueConfiguration struct {
ByteFIFOQueueConfiguration
- DataDir string
+ DataDir string
+ ConnectionString string
+ QueueName string
}
// LevelUniqueQueue implements a disk library queue
@@ -34,7 +38,11 @@ func NewLevelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue,
}
config := configInterface.(LevelUniqueQueueConfiguration)
- byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.DataDir)
+ if len(config.ConnectionString) == 0 {
+ config.ConnectionString = config.DataDir
+ }
+
+ byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.ConnectionString, config.QueueName)
if err != nil {
return nil, err
}
@@ -55,18 +63,25 @@ var _ (UniqueByteFIFO) = &LevelUniqueQueueByteFIFO{}
// LevelUniqueQueueByteFIFO represents a ByteFIFO formed from a LevelUniqueQueue
type LevelUniqueQueueByteFIFO struct {
- internal *levelqueue.UniqueQueue
+ internal *levelqueue.UniqueQueue
+ connection string
}
// NewLevelUniqueQueueByteFIFO creates a new ByteFIFO formed from a LevelUniqueQueue
-func NewLevelUniqueQueueByteFIFO(dataDir string) (*LevelUniqueQueueByteFIFO, error) {
- internal, err := levelqueue.OpenUnique(dataDir)
+func NewLevelUniqueQueueByteFIFO(connection, prefix string) (*LevelUniqueQueueByteFIFO, error) {
+ db, err := nosql.GetManager().GetLevelDB(connection)
+ if err != nil {
+ return nil, err
+ }
+
+ internal, err := levelqueue.NewUniqueQueue(db, []byte(prefix), []byte(prefix+"-unique"), false)
if err != nil {
return nil, err
}
return &LevelUniqueQueueByteFIFO{
- internal: internal,
+ connection: connection,
+ internal: internal,
}, nil
}
@@ -96,7 +111,9 @@ func (fifo *LevelUniqueQueueByteFIFO) Has(data []byte) (bool, error) {
// Close this fifo
func (fifo *LevelUniqueQueueByteFIFO) Close() error {
- return fifo.internal.Close()
+ err := fifo.internal.Close()
+ _ = nosql.GetManager().CloseLevelDB(fifo.connection)
+ return err
}
func init() {
diff --git a/modules/queue/unique_queue_redis.go b/modules/queue/unique_queue_redis.go
index 9404369075..67efc66bc9 100644
--- a/modules/queue/unique_queue_redis.go
+++ b/modules/queue/unique_queue_redis.go
@@ -4,7 +4,7 @@
package queue
-import "github.com/go-redis/redis"
+import "github.com/go-redis/redis/v7"
// RedisUniqueQueueType is the type for redis queue
const RedisUniqueQueueType Type = "unique-redis"