From c01221e70fc71f5bcff5f699095fbcbfc1e2b4a3 Mon Sep 17 00:00:00 2001 From: zeripath Date: Wed, 29 Jan 2020 01:01:06 +0000 Subject: Queue: Make WorkerPools and Queues flushable (#10001) * Make WorkerPools and Queues flushable Adds Flush methods to Queues and the WorkerPool Further abstracts the WorkerPool Adds a final step to Flush the queues in the defer from PrintCurrentTest Fixes an issue with Settings inheritance in queues Signed-off-by: Andrew Thornton * Change to for loop * Add IsEmpty and begin just making the queues composed WorkerPools * subsume workerpool into the queues and create a flushable interface * Add manager command * Move flushall to queue.Manager and add to testlogger * As per @guillep2k * as per @guillep2k * Just make queues all implement flushable and clean up the wrapped queue flushes * cope with no timeout Co-authored-by: Lauris BH --- modules/setting/queue.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) (limited to 'modules/setting') diff --git a/modules/setting/queue.go b/modules/setting/queue.go index 8c07685855..934c5a8108 100644 --- a/modules/setting/queue.go +++ b/modules/setting/queue.go @@ -59,7 +59,7 @@ func GetQueueSettings(name string) QueueSettings { if !filepath.IsAbs(q.DataDir) { q.DataDir = filepath.Join(AppDataPath, q.DataDir) } - sec.Key("DATADIR").SetValue(q.DataDir) + _, _ = sec.NewKey("DATADIR", q.DataDir) // The rest are... q.Length = sec.Key("LENGTH").MustInt(Queue.Length) q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength) @@ -89,7 +89,7 @@ func NewQueueService() { Queue.Length = sec.Key("LENGTH").MustInt(20) Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20) Queue.ConnectionString = sec.Key("CONN_STR").MustString(path.Join(AppDataPath, "")) - Queue.Type = sec.Key("TYPE").MustString("") + Queue.Type = sec.Key("TYPE").MustString("persistable-channel") Queue.Network, Queue.Addresses, Queue.Password, Queue.DBIndex, _ = ParseQueueConnStr(Queue.ConnectionString) Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true) Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10) @@ -110,27 +110,27 @@ func NewQueueService() { if _, ok := sectionMap["TYPE"]; !ok { switch Indexer.IssueQueueType { case LevelQueueType: - section.Key("TYPE").SetValue("level") + _, _ = section.NewKey("TYPE", "level") case ChannelQueueType: - section.Key("TYPE").SetValue("persistable-channel") + _, _ = section.NewKey("TYPE", "persistable-channel") case RedisQueueType: - section.Key("TYPE").SetValue("redis") + _, _ = section.NewKey("TYPE", "redis") default: log.Fatal("Unsupported indexer queue type: %v", Indexer.IssueQueueType) } } if _, ok := sectionMap["LENGTH"]; !ok { - section.Key("LENGTH").SetValue(fmt.Sprintf("%d", Indexer.UpdateQueueLength)) + _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Indexer.UpdateQueueLength)) } if _, ok := sectionMap["BATCH_LENGTH"]; !ok { - section.Key("BATCH_LENGTH").SetValue(fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber)) + _, _ = section.NewKey("BATCH_LENGTH", fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber)) } if _, ok := sectionMap["DATADIR"]; !ok { - section.Key("DATADIR").SetValue(Indexer.IssueQueueDir) + _, _ = section.NewKey("DATADIR", Indexer.IssueQueueDir) } if _, ok := sectionMap["CONN_STR"]; !ok { - section.Key("CONN_STR").SetValue(Indexer.IssueQueueConnStr) + _, _ = section.NewKey("CONN_STR", Indexer.IssueQueueConnStr) } // Handle the old mailer configuration @@ -140,7 +140,7 @@ func NewQueueService() { sectionMap[key.Name()] = true } if _, ok := sectionMap["LENGTH"]; !ok { - section.Key("LENGTH").SetValue(fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100))) + _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100))) } } -- cgit v1.2.3