aboutsummaryrefslogtreecommitdiffstats
path: root/modules
diff options
context:
space:
mode:
Diffstat (limited to 'modules')
-rw-r--r--modules/queue/unique_queue_channel.go29
-rw-r--r--modules/setting/mailer.go2
-rw-r--r--modules/setting/queue.go64
-rw-r--r--modules/setting/repository.go4
4 files changed, 61 insertions, 38 deletions
diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go
index 5bec67c4d3..f617595c04 100644
--- a/modules/queue/unique_queue_channel.go
+++ b/modules/queue/unique_queue_channel.go
@@ -9,6 +9,7 @@ import (
"fmt"
"sync"
+ "code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
)
@@ -29,7 +30,7 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration
type ChannelUniqueQueue struct {
*WorkerPool
lock sync.Mutex
- table map[Data]bool
+ table map[string]bool
shutdownCtx context.Context
shutdownCtxCancel context.CancelFunc
terminateCtx context.Context
@@ -54,7 +55,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
queue := &ChannelUniqueQueue{
- table: map[Data]bool{},
+ table: map[string]bool{},
shutdownCtx: shutdownCtx,
shutdownCtxCancel: shutdownCtxCancel,
terminateCtx: terminateCtx,
@@ -65,9 +66,13 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
}
queue.WorkerPool = NewWorkerPool(func(data ...Data) {
for _, datum := range data {
+ // No error is possible here because PushFunc ensures that this can be marshalled
+ bs, _ := json.Marshal(datum)
+
queue.lock.Lock()
- delete(queue.table, datum)
+ delete(queue.table, string(bs))
queue.lock.Unlock()
+
handle(datum)
}
}, config.WorkerPoolConfiguration)
@@ -94,6 +99,11 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
if !assignableTo(data, q.exemplar) {
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
}
+
+ bs, err := json.Marshal(data)
+ if err != nil {
+ return err
+ }
q.lock.Lock()
locked := true
defer func() {
@@ -101,16 +111,16 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
q.lock.Unlock()
}
}()
- if _, ok := q.table[data]; ok {
+ if _, ok := q.table[string(bs)]; ok {
return ErrAlreadyInQueue
}
// FIXME: We probably need to implement some sort of limit here
// If the downstream queue blocks this table will grow without limit
- q.table[data] = true
+ q.table[string(bs)] = true
if fn != nil {
err := fn()
if err != nil {
- delete(q.table, data)
+ delete(q.table, string(bs))
return err
}
}
@@ -122,9 +132,14 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
// Has checks if the data is in the queue
func (q *ChannelUniqueQueue) Has(data Data) (bool, error) {
+ bs, err := json.Marshal(data)
+ if err != nil {
+ return false, err
+ }
+
q.lock.Lock()
defer q.lock.Unlock()
- _, has := q.table[data]
+ _, has := q.table[string(bs)]
return has, nil
}
diff --git a/modules/setting/mailer.go b/modules/setting/mailer.go
index a2228e938b..d2fac440ac 100644
--- a/modules/setting/mailer.go
+++ b/modules/setting/mailer.go
@@ -16,7 +16,6 @@ import (
// Mailer represents mail service.
type Mailer struct {
// Mailer
- QueueLength int
Name string
From string
FromName string
@@ -54,7 +53,6 @@ func newMailService() {
}
MailService = &Mailer{
- QueueLength: sec.Key("SEND_BUFFER_LEN").MustInt(100),
Name: sec.Key("NAME").MustString(AppName),
SendAsPlainText: sec.Key("SEND_AS_PLAIN_TEXT").MustBool(false),
MailerType: sec.Key("MAILER_TYPE").In("", []string{"smtp", "sendmail", "dummy"}),
diff --git a/modules/setting/queue.go b/modules/setting/queue.go
index 76b7dc1faf..1668cc63a3 100644
--- a/modules/setting/queue.go
+++ b/modules/setting/queue.go
@@ -5,11 +5,12 @@
package setting
import (
- "fmt"
"path/filepath"
+ "strconv"
"time"
"code.gitea.io/gitea/modules/log"
+ ini "gopkg.in/ini.v1"
)
// QueueSettings represent the settings for a queue from the ini
@@ -106,11 +107,8 @@ func NewQueueService() {
// Now handle the old issue_indexer configuration
section := Cfg.Section("queue.issue_indexer")
- sectionMap := map[string]bool{}
- for _, key := range section.Keys() {
- sectionMap[key.Name()] = true
- }
- if _, ok := sectionMap["TYPE"]; !ok && defaultType == "" {
+ directlySet := toDirectlySetKeysMap(section)
+ if !directlySet["TYPE"] && defaultType == "" {
switch Indexer.IssueQueueType {
case LevelQueueType:
_, _ = section.NewKey("TYPE", "level")
@@ -125,37 +123,53 @@ func NewQueueService() {
Indexer.IssueQueueType)
}
}
- if _, ok := sectionMap["LENGTH"]; !ok && Indexer.UpdateQueueLength != 0 {
- _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Indexer.UpdateQueueLength))
+ if !directlySet["LENGTH"] && Indexer.UpdateQueueLength != 0 {
+ _, _ = section.NewKey("LENGTH", strconv.Itoa(Indexer.UpdateQueueLength))
}
- if _, ok := sectionMap["BATCH_LENGTH"]; !ok && Indexer.IssueQueueBatchNumber != 0 {
- _, _ = section.NewKey("BATCH_LENGTH", fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber))
+ if !directlySet["BATCH_LENGTH"] && Indexer.IssueQueueBatchNumber != 0 {
+ _, _ = section.NewKey("BATCH_LENGTH", strconv.Itoa(Indexer.IssueQueueBatchNumber))
}
- if _, ok := sectionMap["DATADIR"]; !ok && Indexer.IssueQueueDir != "" {
+ if !directlySet["DATADIR"] && Indexer.IssueQueueDir != "" {
_, _ = section.NewKey("DATADIR", Indexer.IssueQueueDir)
}
- if _, ok := sectionMap["CONN_STR"]; !ok && Indexer.IssueQueueConnStr != "" {
+ if !directlySet["CONN_STR"] && Indexer.IssueQueueConnStr != "" {
_, _ = section.NewKey("CONN_STR", Indexer.IssueQueueConnStr)
}
// Handle the old mailer configuration
- section = Cfg.Section("queue.mailer")
- sectionMap = map[string]bool{}
- for _, key := range section.Keys() {
- sectionMap[key.Name()] = true
- }
- if _, ok := sectionMap["LENGTH"]; !ok {
- _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100)))
- }
+ handleOldLengthConfiguration("mailer", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100))
// Handle the old test pull requests configuration
// Please note this will be a unique queue
- section = Cfg.Section("queue.pr_patch_checker")
- sectionMap = map[string]bool{}
+ handleOldLengthConfiguration("pr_patch_checker", Cfg.Section("repository").Key("PULL_REQUEST_QUEUE_LENGTH").MustInt(1000))
+
+ // Handle the old mirror queue configuration
+ // Please note this will be a unique queue
+ handleOldLengthConfiguration("mirror", Cfg.Section("repository").Key("MIRROR_QUEUE_LENGTH").MustInt(1000))
+}
+
+// handleOldLengthConfiguration allows fallback to older configuration. `[queue.name]` `LENGTH` will override this configuration, but
+// if that is left unset then we should fallback to the older configuration. (Except where the new length woul be <=0)
+func handleOldLengthConfiguration(queueName string, value int) {
+ // Don't override with 0
+ if value <= 0 {
+ return
+ }
+
+ section := Cfg.Section("queue." + queueName)
+ directlySet := toDirectlySetKeysMap(section)
+ if !directlySet["LENGTH"] {
+ _, _ = section.NewKey("LENGTH", strconv.Itoa(value))
+ }
+}
+
+// toDirectlySetKeysMap returns a bool map of keys directly set by this section
+// Note: we cannot use section.HasKey(...) as that will immediately set the Key if a parent section has the Key
+// but this section does not.
+func toDirectlySetKeysMap(section *ini.Section) map[string]bool {
+ sectionMap := map[string]bool{}
for _, key := range section.Keys() {
sectionMap[key.Name()] = true
}
- if _, ok := sectionMap["LENGTH"]; !ok {
- _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.PullRequestQueueLength))
- }
+ return sectionMap
}
diff --git a/modules/setting/repository.go b/modules/setting/repository.go
index de57eb9140..0791602efb 100644
--- a/modules/setting/repository.go
+++ b/modules/setting/repository.go
@@ -29,8 +29,6 @@ var (
DefaultPrivate string
DefaultPushCreatePrivate bool
MaxCreationLimit int
- MirrorQueueLength int
- PullRequestQueueLength int
PreferredLicenses []string
DisableHTTPGit bool
AccessControlAllowOrigin string
@@ -142,8 +140,6 @@ var (
DefaultPrivate: RepoCreatingLastUserVisibility,
DefaultPushCreatePrivate: true,
MaxCreationLimit: -1,
- MirrorQueueLength: 1000,
- PullRequestQueueLength: 1000,
PreferredLicenses: []string{"Apache License 2.0", "MIT License"},
DisableHTTPGit: false,
AccessControlAllowOrigin: "",