aboutsummaryrefslogtreecommitdiffstats
path: root/modules/setting/queue.go
blob: 8673537b525693daa1e68b8f7459cf1227fda7c4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// Copyright 2019 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package setting

import (
	"path/filepath"

	"code.gitea.io/gitea/modules/json"
	"code.gitea.io/gitea/modules/log"
)

// QueueSettings holds the configuration of a single queue as read from the
// ini file.
type QueueSettings struct {
	// Name is not an INI option: it is the "the-name" part of the
	// [queue.the-name] section the settings belong to.
	Name string

	// Type is the kind of queue to use.
	Type string
	// Datadir is the storage directory of the queue.
	Datadir string
	// ConnStr is the connection string, used for leveldb or redis.
	ConnStr string
	// Length is the max queue length before blocking.
	Length int

	// QueueName is the name suffix for storage (db key, redis key).
	QueueName string
	// SetName is the name suffix for storage of the "set", which is used
	// for a unique queue.
	SetName string

	// BatchLength and MaxWorkers fall back to the defaults in
	// GetQueueSettings when they are not positive.
	BatchLength int
	MaxWorkers  int
}

// queueSettingsDefault is the baseline that GetQueueSettings copies before
// applying the ini sections, and the source of its fallback values.
var queueSettingsDefault = QueueSettings{
	// one of: dummy, channel, level, redis
	Type: "level",
	// relative to AppDataPath
	Datadir: "queues/common",
	// queue length before a channel queue will block
	Length: 100,

	BatchLength: 20,
	MaxWorkers:  10,

	QueueName: "_queue",
	SetName:   "_unique",
}

// GetQueueSettings builds the effective settings for the queue called name.
//
// It starts from a copy of queueSettingsDefault, overlays the shared [queue]
// section and then the queue-specific [queue.<name>] section of rootCfg, and
// finally normalizes the result (see normalizeQueueSettings).
//
// A section that fails to map is logged and not reported through the error
// return; overlaying stops at that point, but the settings gathered so far
// are still normalized so callers never receive a relative Datadir or a
// non-positive Length, MaxWorkers or BatchLength together with a nil error.
// The returned error is non-nil only when copying the defaults fails.
func GetQueueSettings(rootCfg ConfigProvider, name string) (QueueSettings, error) {
	// deep copy default settings
	cfg := QueueSettings{}
	if cfgBs, err := json.Marshal(queueSettingsDefault); err != nil {
		return cfg, err
	} else if err = json.Unmarshal(cfgBs, &cfg); err != nil {
		return cfg, err
	}

	cfg.Name = name
	overlayQueueSections(rootCfg, name, &cfg)
	normalizeQueueSettings(&cfg)
	return cfg, nil
}

// overlayQueueSections maps the [queue] section and then the [queue.<name>]
// section onto cfg. A missing section is skipped; the first section that
// fails to map is logged and ends the overlay.
func overlayQueueSections(rootCfg ConfigProvider, name string, cfg *QueueSettings) {
	if sec, err := rootCfg.GetSection("queue"); err == nil {
		if err = sec.MapTo(cfg); err != nil {
			log.Error("Failed to map queue common config for %q: %v", name, err)
			return
		}
	}
	if sec, err := rootCfg.GetSection("queue." + name); err == nil {
		if err = sec.MapTo(cfg); err != nil {
			log.Error("Failed to map queue spec config for %q: %v", name, err)
			return
		}
		// An explicitly present CONN_STR in the specific section always wins.
		// NOTE(review): presumably this lets an empty value override one
		// inherited from [queue] — confirm against MapTo's handling of
		// empty keys.
		if sec.HasKey("CONN_STR") {
			cfg.ConnStr = sec.Key("CONN_STR").String()
		}
	}
}

// normalizeQueueSettings fills in fallbacks and canonicalizes cfg in place:
// Datadir becomes an absolute, slash-separated path (relative paths are
// resolved against AppDataPath), a redis queue without a connection string
// gets the local default, and non-positive Length, MaxWorkers and BatchLength
// are replaced by the values from queueSettingsDefault.
func normalizeQueueSettings(cfg *QueueSettings) {
	if cfg.Datadir == "" {
		cfg.Datadir = queueSettingsDefault.Datadir
	}
	if !filepath.IsAbs(cfg.Datadir) {
		cfg.Datadir = filepath.Join(AppDataPath, cfg.Datadir)
	}
	cfg.Datadir = filepath.ToSlash(cfg.Datadir)

	if cfg.Type == "redis" && cfg.ConnStr == "" {
		cfg.ConnStr = "redis://127.0.0.1:6379/0"
	}

	if cfg.Length <= 0 {
		cfg.Length = queueSettingsDefault.Length
	}
	if cfg.MaxWorkers <= 0 {
		cfg.MaxWorkers = queueSettingsDefault.MaxWorkers
	}
	if cfg.BatchLength <= 0 {
		cfg.BatchLength = queueSettingsDefault.BatchLength
	}
}

// LoadQueueSettings runs the queue configuration checks of loadQueueFrom
// against the package-level CfgProvider.
func LoadQueueSettings() {
	loadQueueFrom(CfgProvider)
}

// loadQueueFrom scans rootCfg for queue options that have been removed.
// Every removed option still present is reported with log.Error, naming the
// [queue.<name>] section that replaces it; if at least one was found,
// log.Fatal is called afterwards asking for app.ini to be updated.
func loadQueueFrom(rootCfg ConfigProvider) {
	// Each entry is a removed option (section + key) together with the name
	// of the queue whose [queue.<name>] section supersedes it.
	removedOptions := []struct {
		queue, section, key string
	}{
		{"issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_TYPE"},
		{"issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_BATCH_NUMBER"},
		{"issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_DIR"},
		{"issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_CONN_STR"},
		{"issue_indexer", "indexer", "UPDATE_BUFFER_LEN"},
		{"mailer", "mailer", "SEND_BUFFER_LEN"},
		{"pr_patch_checker", "repository", "PULL_REQUEST_QUEUE_LENGTH"},
		{"mirror", "repository", "MIRROR_QUEUE_LENGTH"},
	}

	foundRemoved := false
	for _, opt := range removedOptions {
		if !rootCfg.Section(opt.section).HasKey(opt.key) {
			continue
		}
		foundRemoved = true
		log.Error("Removed queue option: `[%s].%s`. Use new options in `[queue.%s]`", opt.section, opt.key, opt.queue)
	}

	// Report every offending option first, then fail once.
	if foundRemoved {
		log.Fatal("Please update your app.ini to remove deprecated config options")
	}
}