diff options
Diffstat (limited to 'modules')
-rw-r--r-- | modules/queue/unique_queue_channel.go | 29 | ||||
-rw-r--r-- | modules/setting/mailer.go | 2 | ||||
-rw-r--r-- | modules/setting/queue.go | 64 | ||||
-rw-r--r-- | modules/setting/repository.go | 4 |
4 files changed, 61 insertions, 38 deletions
diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index 5bec67c4d3..f617595c04 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -9,6 +9,7 @@ import ( "fmt" "sync" + "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/log" ) @@ -29,7 +30,7 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration type ChannelUniqueQueue struct { *WorkerPool lock sync.Mutex - table map[Data]bool + table map[string]bool shutdownCtx context.Context shutdownCtxCancel context.CancelFunc terminateCtx context.Context @@ -54,7 +55,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) queue := &ChannelUniqueQueue{ - table: map[Data]bool{}, + table: map[string]bool{}, shutdownCtx: shutdownCtx, shutdownCtxCancel: shutdownCtxCancel, terminateCtx: terminateCtx, @@ -65,9 +66,13 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue } queue.WorkerPool = NewWorkerPool(func(data ...Data) { for _, datum := range data { + // No error is possible here because PushFunc ensures that this can be marshalled + bs, _ := json.Marshal(datum) + queue.lock.Lock() - delete(queue.table, datum) + delete(queue.table, string(bs)) queue.lock.Unlock() + handle(datum) } }, config.WorkerPoolConfiguration) @@ -94,6 +99,11 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error { if !assignableTo(data, q.exemplar) { return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name) } + + bs, err := json.Marshal(data) + if err != nil { + return err + } q.lock.Lock() locked := true defer func() { @@ -101,16 +111,16 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error { q.lock.Unlock() } }() - if _, ok := q.table[data]; ok { + if _, ok := q.table[string(bs)]; ok { return ErrAlreadyInQueue } // FIXME: We probably need to implement some sort of limit here // If the downstream queue blocks this table will grow without limit - q.table[data] = true + q.table[string(bs)] = true if fn != nil { err := fn() if err != nil { - delete(q.table, data) + delete(q.table, string(bs)) return err } } @@ -122,9 +132,14 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error { // Has checks if the data is in the queue func (q *ChannelUniqueQueue) Has(data Data) (bool, error) { + bs, err := json.Marshal(data) + if err != nil { + return false, err + } + q.lock.Lock() defer q.lock.Unlock() - _, has := q.table[data] + _, has := q.table[string(bs)] return has, nil } diff --git a/modules/setting/mailer.go b/modules/setting/mailer.go index a2228e938b..d2fac440ac 100644 --- a/modules/setting/mailer.go +++ b/modules/setting/mailer.go @@ -16,7 +16,6 @@ import ( // Mailer represents mail service. type Mailer struct { // Mailer - QueueLength int Name string From string FromName string @@ -54,7 +53,6 @@ func newMailService() { } MailService = &Mailer{ - QueueLength: sec.Key("SEND_BUFFER_LEN").MustInt(100), Name: sec.Key("NAME").MustString(AppName), SendAsPlainText: sec.Key("SEND_AS_PLAIN_TEXT").MustBool(false), MailerType: sec.Key("MAILER_TYPE").In("", []string{"smtp", "sendmail", "dummy"}), diff --git a/modules/setting/queue.go b/modules/setting/queue.go index 76b7dc1faf..1668cc63a3 100644 --- a/modules/setting/queue.go +++ b/modules/setting/queue.go @@ -5,11 +5,12 @@ package setting import ( - "fmt" "path/filepath" + "strconv" "time" "code.gitea.io/gitea/modules/log" + ini "gopkg.in/ini.v1" ) // QueueSettings represent the settings for a queue from the ini @@ -106,11 +107,8 @@ func NewQueueService() { // Now handle the old issue_indexer configuration section := Cfg.Section("queue.issue_indexer") - sectionMap := map[string]bool{} - for _, key := range section.Keys() { - sectionMap[key.Name()] = true - } - if _, ok := sectionMap["TYPE"]; !ok && defaultType == "" { + directlySet := toDirectlySetKeysMap(section) + if !directlySet["TYPE"] && defaultType == "" { switch Indexer.IssueQueueType { case LevelQueueType: _, _ = section.NewKey("TYPE", "level") @@ -125,37 +123,53 @@ func NewQueueService() { Indexer.IssueQueueType) } } - if _, ok := sectionMap["LENGTH"]; !ok && Indexer.UpdateQueueLength != 0 { - _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Indexer.UpdateQueueLength)) + if !directlySet["LENGTH"] && Indexer.UpdateQueueLength != 0 { + _, _ = section.NewKey("LENGTH", strconv.Itoa(Indexer.UpdateQueueLength)) } - if _, ok := sectionMap["BATCH_LENGTH"]; !ok && Indexer.IssueQueueBatchNumber != 0 { - _, _ = section.NewKey("BATCH_LENGTH", fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber)) + if !directlySet["BATCH_LENGTH"] && Indexer.IssueQueueBatchNumber != 0 { + _, _ = section.NewKey("BATCH_LENGTH", strconv.Itoa(Indexer.IssueQueueBatchNumber)) } - if _, ok := sectionMap["DATADIR"]; !ok && Indexer.IssueQueueDir != "" { + if !directlySet["DATADIR"] && Indexer.IssueQueueDir != "" { _, _ = section.NewKey("DATADIR", Indexer.IssueQueueDir) } - if _, ok := sectionMap["CONN_STR"]; !ok && Indexer.IssueQueueConnStr != "" { + if !directlySet["CONN_STR"] && Indexer.IssueQueueConnStr != "" { _, _ = section.NewKey("CONN_STR", Indexer.IssueQueueConnStr) } // Handle the old mailer configuration - section = Cfg.Section("queue.mailer") - sectionMap = map[string]bool{} - for _, key := range section.Keys() { - sectionMap[key.Name()] = true - } - if _, ok := sectionMap["LENGTH"]; !ok { - _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100))) - } + handleOldLengthConfiguration("mailer", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100)) // Handle the old test pull requests configuration // Please note this will be a unique queue - section = Cfg.Section("queue.pr_patch_checker") - sectionMap = map[string]bool{} + handleOldLengthConfiguration("pr_patch_checker", Cfg.Section("repository").Key("PULL_REQUEST_QUEUE_LENGTH").MustInt(1000)) + + // Handle the old mirror queue configuration + // Please note this will be a unique queue + handleOldLengthConfiguration("mirror", Cfg.Section("repository").Key("MIRROR_QUEUE_LENGTH").MustInt(1000)) +} + +// handleOldLengthConfiguration allows fallback to older configuration. `[queue.name]` `LENGTH` will override this configuration, but +// if that is left unset then we should fallback to the older configuration. (Except where the new length woul be <=0) +func handleOldLengthConfiguration(queueName string, value int) { + // Don't override with 0 + if value <= 0 { + return + } + + section := Cfg.Section("queue." + queueName) + directlySet := toDirectlySetKeysMap(section) + if !directlySet["LENGTH"] { + _, _ = section.NewKey("LENGTH", strconv.Itoa(value)) + } +} + +// toDirectlySetKeysMap returns a bool map of keys directly set by this section +// Note: we cannot use section.HasKey(...) as that will immediately set the Key if a parent section has the Key +// but this section does not. +func toDirectlySetKeysMap(section *ini.Section) map[string]bool { + sectionMap := map[string]bool{} for _, key := range section.Keys() { sectionMap[key.Name()] = true } - if _, ok := sectionMap["LENGTH"]; !ok { - _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.PullRequestQueueLength)) - } + return sectionMap } diff --git a/modules/setting/repository.go b/modules/setting/repository.go index de57eb9140..0791602efb 100644 --- a/modules/setting/repository.go +++ b/modules/setting/repository.go @@ -29,8 +29,6 @@ var ( DefaultPrivate string DefaultPushCreatePrivate bool MaxCreationLimit int - MirrorQueueLength int - PullRequestQueueLength int PreferredLicenses []string DisableHTTPGit bool AccessControlAllowOrigin string @@ -142,8 +140,6 @@ var ( DefaultPrivate: RepoCreatingLastUserVisibility, DefaultPushCreatePrivate: true, MaxCreationLimit: -1, - MirrorQueueLength: 1000, - PullRequestQueueLength: 1000, PreferredLicenses: []string{"Apache License 2.0", "MIT License"}, DisableHTTPGit: false, AccessControlAllowOrigin: "", |