summaryrefslogtreecommitdiffstats
path: root/modules/queue/queue_disk.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue/queue_disk.go')
-rw-r--r--modules/queue/queue_disk.go31
1 files changed, 24 insertions, 7 deletions
diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go
index ff0876488b..88b8c414c0 100644
--- a/modules/queue/queue_disk.go
+++ b/modules/queue/queue_disk.go
@@ -5,6 +5,8 @@
package queue
import (
+ "code.gitea.io/gitea/modules/nosql"
+
"gitea.com/lunny/levelqueue"
)
@@ -14,7 +16,9 @@ const LevelQueueType Type = "level"
// LevelQueueConfiguration is the configuration for a LevelQueue
type LevelQueueConfiguration struct {
ByteFIFOQueueConfiguration
- DataDir string
+ DataDir string
+ ConnectionString string
+ QueueName string
}
// LevelQueue implements a disk library queue
@@ -30,7 +34,11 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
}
config := configInterface.(LevelQueueConfiguration)
- byteFIFO, err := NewLevelQueueByteFIFO(config.DataDir)
+ if len(config.ConnectionString) == 0 {
+ config.ConnectionString = config.DataDir
+ }
+
+ byteFIFO, err := NewLevelQueueByteFIFO(config.ConnectionString, config.QueueName)
if err != nil {
return nil, err
}
@@ -51,18 +59,25 @@ var _ (ByteFIFO) = &LevelQueueByteFIFO{}
// LevelQueueByteFIFO represents a ByteFIFO formed from a LevelQueue
type LevelQueueByteFIFO struct {
- internal *levelqueue.Queue
+ internal *levelqueue.Queue
+ connection string
}
// NewLevelQueueByteFIFO creates a ByteFIFO formed from a LevelQueue
-func NewLevelQueueByteFIFO(dataDir string) (*LevelQueueByteFIFO, error) {
- internal, err := levelqueue.Open(dataDir)
+func NewLevelQueueByteFIFO(connection, prefix string) (*LevelQueueByteFIFO, error) {
+ db, err := nosql.GetManager().GetLevelDB(connection)
+ if err != nil {
+ return nil, err
+ }
+
+ internal, err := levelqueue.NewQueue(db, []byte(prefix), false)
if err != nil {
return nil, err
}
return &LevelQueueByteFIFO{
- internal: internal,
+ connection: connection,
+ internal: internal,
}, nil
}
@@ -87,7 +102,9 @@ func (fifo *LevelQueueByteFIFO) Pop() ([]byte, error) {
// Close this fifo
func (fifo *LevelQueueByteFIFO) Close() error {
- return fifo.internal.Close()
+ err := fifo.internal.Close()
+ _ = nosql.GetManager().CloseLevelDB(fifo.connection)
+ return err
}
// Len returns the length of the fifo