summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--custom/conf/app.example.ini4
-rw-r--r--docs/content/doc/advanced/config-cheat-sheet.en-us.md39
-rw-r--r--modules/queue/unique_queue_channel.go29
-rw-r--r--modules/setting/mailer.go2
-rw-r--r--modules/setting/queue.go64
-rw-r--r--modules/setting/repository.go4
-rw-r--r--services/mirror/mirror.go98
7 files changed, 164 insertions, 76 deletions
diff --git a/custom/conf/app.example.ini b/custom/conf/app.example.ini
index bdc42480e4..1753ed2330 100644
--- a/custom/conf/app.example.ini
+++ b/custom/conf/app.example.ini
@@ -769,10 +769,10 @@ PATH =
;; Global limit of repositories per user, applied at creation time. -1 means no limit
;MAX_CREATION_LIMIT = -1
;;
-;; Mirror sync queue length, increase if mirror syncing starts hanging
+;; Mirror sync queue length, increase if mirror syncing starts hanging (DEPRECATED: please use [queue.mirror] LENGTH instead)
;MIRROR_QUEUE_LENGTH = 1000
;;
-;; Patch test queue length, increase if pull request patch testing starts hanging
+;; Patch test queue length, increase if pull request patch testing starts hanging (DEPRECATED: please use [queue.pr_patch_checker] LENGTH instead)
;PULL_REQUEST_QUEUE_LENGTH = 1000
;;
;; Preferred Licenses to place at the top of the List
diff --git a/docs/content/doc/advanced/config-cheat-sheet.en-us.md b/docs/content/doc/advanced/config-cheat-sheet.en-us.md
index 251f6bd51a..91c62dbec3 100644
--- a/docs/content/doc/advanced/config-cheat-sheet.en-us.md
+++ b/docs/content/doc/advanced/config-cheat-sheet.en-us.md
@@ -54,10 +54,10 @@ Values containing `#` or `;` must be quoted using `` ` `` or `"""`.
- `DEFAULT_PUSH_CREATE_PRIVATE`: **true**: Default private when creating a new repository with push-to-create.
- `MAX_CREATION_LIMIT`: **-1**: Global maximum creation limit of repositories per user,
`-1` means no limit.
-- `PULL_REQUEST_QUEUE_LENGTH`: **1000**: Length of pull request patch test queue, make it
+- `PULL_REQUEST_QUEUE_LENGTH`: **1000**: Length of pull request patch test queue, make it. **DEPRECATED** use `LENGTH` in `[queue.pr_patch_checker]`.
as large as possible. Use caution when editing this value.
- `MIRROR_QUEUE_LENGTH`: **1000**: Patch test queue length, increase if pull request patch
- testing starts hanging.
+ testing starts hanging. **DEPRECATED** use `LENGTH` in `[queue.mirror]`.
- `PREFERRED_LICENSES`: **Apache License 2.0,MIT License**: Preferred Licenses to place at
the top of the list. Name must match file name in options/license or custom/options/license.
- `DISABLE_HTTP_GIT`: **false**: Disable the ability to interact with repositories over the
@@ -382,6 +382,8 @@ relation to port exhaustion.
## Queue (`queue` and `queue.*`)
+Configuration at `[queue]` will set defaults for queues with overrides for individual queues at `[queue.*]`. (However see below.)
+
- `TYPE`: **persistable-channel**: General queue type, currently support: `persistable-channel` (uses a LevelDB internally), `channel`, `level`, `redis`, `dummy`
- `DATADIR`: **queues/**: Base DataDir for storing persistent and level queues. `DATADIR` for individual queues can be set in `queue.name` sections but will default to `DATADIR/`**`common`**. (Previously each queue would default to `DATADIR/`**`name`**.)
- `LENGTH`: **20**: Maximal queue size before channel queues block
@@ -400,6 +402,37 @@ relation to port exhaustion.
- `BOOST_TIMEOUT`: **5m**: Boost workers will timeout after this long.
- `BOOST_WORKERS`: **1** (v1.14 and before: **5**): This many workers will be added to the worker pool if there is a boost.
+Gitea creates the following non-unique queues:
+
+- `code_indexer`
+- `issue_indexer`
+- `notification-service`
+- `task`
+- `mail`
+- `push_update`
+
+And the following unique queues:
+
+- `repo_stats_update`
+- `repo-archive`
+- `mirror`
+- `pr_patch_checker`
+
+Certain queues have defaults that override the defaults set in `[queue]` (this occurs mostly to support older configuration):
+
+- `[queue.issue_indexer]`
+ - `TYPE` this will default to `[queue]` `TYPE` if it is set but if not it will appropriately convert `[indexer]` `ISSUE_INDEXER_QUEUE_TYPE` if that is set.
+ - `LENGTH` will default to `[indexer]` `UPDATE_BUFFER_LEN` if that is set.
+ - `BATCH_LENGTH` will default to `[indexer]` `ISSUE_INDEXER_QUEUE_BATCH_NUMBER` if that is set.
+ - `DATADIR` will default to `[indexer]` `ISSUE_INDEXER_QUEUE_DIR` if that is set.
+ - `CONN_STR` will default to `[indexer]` `ISSUE_INDEXER_QUEUE_CONN_STR` if that is set.
+- `[queue.mailer]`
+ - `LENGTH` will default to **100** or whatever `[mailer]` `SEND_BUFFER_LEN` is.
+- `[queue.pr_patch_checker]`
+ - `LENGTH` will default to **1000** or whatever `[repository]` `PULL_REQUEST_QUEUE_LENGTH` is.
+- `[queue.mirror]`
+ - `LENGTH` will default to **1000** or whatever `[repository]` `MIRROR_QUEUE_LENGTH` is.
+
## Admin (`admin`)
- `DEFAULT_EMAIL_NOTIFICATIONS`: **enabled**: Default configuration for email notifications for users (user configurable). Options: enabled, onmention, disabled
@@ -588,7 +621,7 @@ Define allowed algorithms and their minimum key length (use -1 to disable a type
command or full path).
- `SENDMAIL_ARGS`: **_empty_**: Specify any extra sendmail arguments.
- `SENDMAIL_TIMEOUT`: **5m**: default timeout for sending email through sendmail
-- `SEND_BUFFER_LEN`: **100**: Buffer length of mailing queue.
+- `SEND_BUFFER_LEN`: **100**: Buffer length of mailing queue. **DEPRECATED** use `LENGTH` in `[queue.mailer]`
## Cache (`cache`)
diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go
index 5bec67c4d3..f617595c04 100644
--- a/modules/queue/unique_queue_channel.go
+++ b/modules/queue/unique_queue_channel.go
@@ -9,6 +9,7 @@ import (
"fmt"
"sync"
+ "code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
)
@@ -29,7 +30,7 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration
type ChannelUniqueQueue struct {
*WorkerPool
lock sync.Mutex
- table map[Data]bool
+ table map[string]bool
shutdownCtx context.Context
shutdownCtxCancel context.CancelFunc
terminateCtx context.Context
@@ -54,7 +55,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
queue := &ChannelUniqueQueue{
- table: map[Data]bool{},
+ table: map[string]bool{},
shutdownCtx: shutdownCtx,
shutdownCtxCancel: shutdownCtxCancel,
terminateCtx: terminateCtx,
@@ -65,9 +66,13 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
}
queue.WorkerPool = NewWorkerPool(func(data ...Data) {
for _, datum := range data {
+ // No error is possible here because PushFunc ensures that this can be marshalled
+ bs, _ := json.Marshal(datum)
+
queue.lock.Lock()
- delete(queue.table, datum)
+ delete(queue.table, string(bs))
queue.lock.Unlock()
+
handle(datum)
}
}, config.WorkerPoolConfiguration)
@@ -94,6 +99,11 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
if !assignableTo(data, q.exemplar) {
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
}
+
+ bs, err := json.Marshal(data)
+ if err != nil {
+ return err
+ }
q.lock.Lock()
locked := true
defer func() {
@@ -101,16 +111,16 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
q.lock.Unlock()
}
}()
- if _, ok := q.table[data]; ok {
+ if _, ok := q.table[string(bs)]; ok {
return ErrAlreadyInQueue
}
// FIXME: We probably need to implement some sort of limit here
// If the downstream queue blocks this table will grow without limit
- q.table[data] = true
+ q.table[string(bs)] = true
if fn != nil {
err := fn()
if err != nil {
- delete(q.table, data)
+ delete(q.table, string(bs))
return err
}
}
@@ -122,9 +132,14 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
// Has checks if the data is in the queue
func (q *ChannelUniqueQueue) Has(data Data) (bool, error) {
+ bs, err := json.Marshal(data)
+ if err != nil {
+ return false, err
+ }
+
q.lock.Lock()
defer q.lock.Unlock()
- _, has := q.table[data]
+ _, has := q.table[string(bs)]
return has, nil
}
diff --git a/modules/setting/mailer.go b/modules/setting/mailer.go
index a2228e938b..d2fac440ac 100644
--- a/modules/setting/mailer.go
+++ b/modules/setting/mailer.go
@@ -16,7 +16,6 @@ import (
// Mailer represents mail service.
type Mailer struct {
// Mailer
- QueueLength int
Name string
From string
FromName string
@@ -54,7 +53,6 @@ func newMailService() {
}
MailService = &Mailer{
- QueueLength: sec.Key("SEND_BUFFER_LEN").MustInt(100),
Name: sec.Key("NAME").MustString(AppName),
SendAsPlainText: sec.Key("SEND_AS_PLAIN_TEXT").MustBool(false),
MailerType: sec.Key("MAILER_TYPE").In("", []string{"smtp", "sendmail", "dummy"}),
diff --git a/modules/setting/queue.go b/modules/setting/queue.go
index 76b7dc1faf..1668cc63a3 100644
--- a/modules/setting/queue.go
+++ b/modules/setting/queue.go
@@ -5,11 +5,12 @@
package setting
import (
- "fmt"
"path/filepath"
+ "strconv"
"time"
"code.gitea.io/gitea/modules/log"
+ ini "gopkg.in/ini.v1"
)
// QueueSettings represent the settings for a queue from the ini
@@ -106,11 +107,8 @@ func NewQueueService() {
// Now handle the old issue_indexer configuration
section := Cfg.Section("queue.issue_indexer")
- sectionMap := map[string]bool{}
- for _, key := range section.Keys() {
- sectionMap[key.Name()] = true
- }
- if _, ok := sectionMap["TYPE"]; !ok && defaultType == "" {
+ directlySet := toDirectlySetKeysMap(section)
+ if !directlySet["TYPE"] && defaultType == "" {
switch Indexer.IssueQueueType {
case LevelQueueType:
_, _ = section.NewKey("TYPE", "level")
@@ -125,37 +123,53 @@ func NewQueueService() {
Indexer.IssueQueueType)
}
}
- if _, ok := sectionMap["LENGTH"]; !ok && Indexer.UpdateQueueLength != 0 {
- _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Indexer.UpdateQueueLength))
+ if !directlySet["LENGTH"] && Indexer.UpdateQueueLength != 0 {
+ _, _ = section.NewKey("LENGTH", strconv.Itoa(Indexer.UpdateQueueLength))
}
- if _, ok := sectionMap["BATCH_LENGTH"]; !ok && Indexer.IssueQueueBatchNumber != 0 {
- _, _ = section.NewKey("BATCH_LENGTH", fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber))
+ if !directlySet["BATCH_LENGTH"] && Indexer.IssueQueueBatchNumber != 0 {
+ _, _ = section.NewKey("BATCH_LENGTH", strconv.Itoa(Indexer.IssueQueueBatchNumber))
}
- if _, ok := sectionMap["DATADIR"]; !ok && Indexer.IssueQueueDir != "" {
+ if !directlySet["DATADIR"] && Indexer.IssueQueueDir != "" {
_, _ = section.NewKey("DATADIR", Indexer.IssueQueueDir)
}
- if _, ok := sectionMap["CONN_STR"]; !ok && Indexer.IssueQueueConnStr != "" {
+ if !directlySet["CONN_STR"] && Indexer.IssueQueueConnStr != "" {
_, _ = section.NewKey("CONN_STR", Indexer.IssueQueueConnStr)
}
// Handle the old mailer configuration
- section = Cfg.Section("queue.mailer")
- sectionMap = map[string]bool{}
- for _, key := range section.Keys() {
- sectionMap[key.Name()] = true
- }
- if _, ok := sectionMap["LENGTH"]; !ok {
- _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100)))
- }
+ handleOldLengthConfiguration("mailer", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100))
// Handle the old test pull requests configuration
// Please note this will be a unique queue
- section = Cfg.Section("queue.pr_patch_checker")
- sectionMap = map[string]bool{}
+ handleOldLengthConfiguration("pr_patch_checker", Cfg.Section("repository").Key("PULL_REQUEST_QUEUE_LENGTH").MustInt(1000))
+
+ // Handle the old mirror queue configuration
+ // Please note this will be a unique queue
+ handleOldLengthConfiguration("mirror", Cfg.Section("repository").Key("MIRROR_QUEUE_LENGTH").MustInt(1000))
+}
+
+// handleOldLengthConfiguration allows fallback to older configuration. `[queue.name]` `LENGTH` will override this configuration, but
+// if that is left unset then we should fallback to the older configuration. (Except where the new length woul be <=0)
+func handleOldLengthConfiguration(queueName string, value int) {
+ // Don't override with 0
+ if value <= 0 {
+ return
+ }
+
+ section := Cfg.Section("queue." + queueName)
+ directlySet := toDirectlySetKeysMap(section)
+ if !directlySet["LENGTH"] {
+ _, _ = section.NewKey("LENGTH", strconv.Itoa(value))
+ }
+}
+
+// toDirectlySetKeysMap returns a bool map of keys directly set by this section
+// Note: we cannot use section.HasKey(...) as that will immediately set the Key if a parent section has the Key
+// but this section does not.
+func toDirectlySetKeysMap(section *ini.Section) map[string]bool {
+ sectionMap := map[string]bool{}
for _, key := range section.Keys() {
sectionMap[key.Name()] = true
}
- if _, ok := sectionMap["LENGTH"]; !ok {
- _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.PullRequestQueueLength))
- }
+ return sectionMap
}
diff --git a/modules/setting/repository.go b/modules/setting/repository.go
index de57eb9140..0791602efb 100644
--- a/modules/setting/repository.go
+++ b/modules/setting/repository.go
@@ -29,8 +29,6 @@ var (
DefaultPrivate string
DefaultPushCreatePrivate bool
MaxCreationLimit int
- MirrorQueueLength int
- PullRequestQueueLength int
PreferredLicenses []string
DisableHTTPGit bool
AccessControlAllowOrigin string
@@ -142,8 +140,6 @@ var (
DefaultPrivate: RepoCreatingLastUserVisibility,
DefaultPushCreatePrivate: true,
MaxCreationLimit: -1,
- MirrorQueueLength: 1000,
- PullRequestQueueLength: 1000,
PreferredLicenses: []string{"Apache License 2.0", "MIT License"},
DisableHTTPGit: false,
AccessControlAllowOrigin: "",
diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go
index 7a3e37d993..eb37639bef 100644
--- a/services/mirror/mirror.go
+++ b/services/mirror/mirror.go
@@ -7,18 +7,43 @@ package mirror
import (
"context"
"fmt"
- "strconv"
- "strings"
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
- "code.gitea.io/gitea/modules/sync"
)
-// mirrorQueue holds an UniqueQueue object of the mirror
-var mirrorQueue = sync.NewUniqueQueue(setting.Repository.MirrorQueueLength)
+var mirrorQueue queue.UniqueQueue
+
+// SyncType type of sync request
+type SyncType int
+
+const (
+ // PullMirrorType for pull mirrors
+ PullMirrorType SyncType = iota
+ // PushMirrorType for push mirrors
+ PushMirrorType
+)
+
+// SyncRequest for the mirror queue
+type SyncRequest struct {
+ Type SyncType
+ RepoID int64
+}
+
+// doMirrorSync causes this request to mirror itself
+func doMirrorSync(ctx context.Context, req *SyncRequest) {
+ switch req.Type {
+ case PushMirrorType:
+ _ = SyncPushMirror(ctx, req.RepoID)
+ case PullMirrorType:
+ _ = SyncPullMirror(ctx, req.RepoID)
+ default:
+ log.Error("Unknown Request type in queue: %v for RepoID[%d]", req.Type, req.RepoID)
+ }
+}
// Update checks and updates mirror repositories.
func Update(ctx context.Context) error {
@@ -29,19 +54,25 @@ func Update(ctx context.Context) error {
log.Trace("Doing: Update")
handler := func(idx int, bean interface{}) error {
- var item string
+ var item SyncRequest
if m, ok := bean.(*models.Mirror); ok {
if m.Repo == nil {
log.Error("Disconnected mirror found: %d", m.ID)
return nil
}
- item = fmt.Sprintf("pull %d", m.RepoID)
+ item = SyncRequest{
+ Type: PullMirrorType,
+ RepoID: m.RepoID,
+ }
} else if m, ok := bean.(*models.PushMirror); ok {
if m.Repo == nil {
log.Error("Disconnected push-mirror found: %d", m.ID)
return nil
}
- item = fmt.Sprintf("push %d", m.ID)
+ item = SyncRequest{
+ Type: PushMirrorType,
+ RepoID: m.RepoID,
+ }
} else {
log.Error("Unknown bean: %v", bean)
return nil
@@ -51,8 +82,7 @@ func Update(ctx context.Context) error {
case <-ctx.Done():
return fmt.Errorf("Aborted")
default:
- mirrorQueue.Add(item)
- return nil
+ return mirrorQueue.Push(&item)
}
}
@@ -68,26 +98,10 @@ func Update(ctx context.Context) error {
return nil
}
-// syncMirrors checks and syncs mirrors.
-// FIXME: graceful: this should be a persistable queue
-func syncMirrors(ctx context.Context) {
- // Start listening on new sync requests.
- for {
- select {
- case <-ctx.Done():
- mirrorQueue.Close()
- return
- case item := <-mirrorQueue.Queue():
- id, _ := strconv.ParseInt(item[5:], 10, 64)
- if strings.HasPrefix(item, "pull") {
- _ = SyncPullMirror(ctx, id)
- } else if strings.HasPrefix(item, "push") {
- _ = SyncPushMirror(ctx, id)
- } else {
- log.Error("Unknown item in queue: %v", item)
- }
- mirrorQueue.Remove(item)
- }
+func queueHandle(data ...queue.Data) {
+ for _, datum := range data {
+ req := datum.(*SyncRequest)
+ doMirrorSync(graceful.GetManager().ShutdownContext(), req)
}
}
@@ -96,7 +110,9 @@ func InitSyncMirrors() {
if !setting.Mirror.Enabled {
return
}
- go graceful.GetManager().RunWithShutdownContext(syncMirrors)
+ mirrorQueue = queue.CreateUniqueQueue("mirror", queueHandle, new(SyncRequest))
+
+ go graceful.GetManager().RunWithShutdownFns(mirrorQueue.Run)
}
// StartToMirror adds repoID to mirror queue
@@ -104,7 +120,15 @@ func StartToMirror(repoID int64) {
if !setting.Mirror.Enabled {
return
}
- go mirrorQueue.Add(fmt.Sprintf("pull %d", repoID))
+ go func() {
+ err := mirrorQueue.Push(&SyncRequest{
+ Type: PullMirrorType,
+ RepoID: repoID,
+ })
+ if err != nil {
+ log.Error("Unable to push sync request for to the queue for push mirror repo[%d]: Error: %v", repoID, err)
+ }
+ }()
}
// AddPushMirrorToQueue adds the push mirror to the queue
@@ -112,5 +136,13 @@ func AddPushMirrorToQueue(mirrorID int64) {
if !setting.Mirror.Enabled {
return
}
- go mirrorQueue.Add(fmt.Sprintf("push %d", mirrorID))
+ go func() {
+ err := mirrorQueue.Push(&SyncRequest{
+ Type: PushMirrorType,
+ RepoID: mirrorID,
+ })
+ if err != nil {
+ log.Error("Unable to push sync request to the queue for pull mirror repo[%d]: Error: %v", mirrorID, err)
+ }
+ }()
}