You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

queue.go 3.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package setting
  4. import (
  5. "path/filepath"
  6. "runtime"
  7. "code.gitea.io/gitea/modules/json"
  8. "code.gitea.io/gitea/modules/log"
  9. )
  10. // QueueSettings represent the settings for a queue from the ini
  11. type QueueSettings struct {
  12. Name string // not an INI option, it is the name for [queue.the-name] section
  13. Type string
  14. Datadir string
  15. ConnStr string // for leveldb or redis
  16. Length int // max queue length before blocking
  17. QueueName, SetName string // the name suffix for storage (db key, redis key), "set" is for unique queue
  18. BatchLength int
  19. MaxWorkers int
  20. }
  21. func GetQueueSettings(rootCfg ConfigProvider, name string) (QueueSettings, error) {
  22. queueSettingsDefault := QueueSettings{
  23. Type: "level", // dummy, channel, level, redis
  24. Datadir: "queues/common", // relative to AppDataPath
  25. Length: 100000, // queue length before a channel queue will block
  26. QueueName: "_queue",
  27. SetName: "_unique",
  28. BatchLength: 20,
  29. MaxWorkers: runtime.NumCPU() / 2,
  30. }
  31. if queueSettingsDefault.MaxWorkers < 1 {
  32. queueSettingsDefault.MaxWorkers = 1
  33. }
  34. if queueSettingsDefault.MaxWorkers > 10 {
  35. queueSettingsDefault.MaxWorkers = 10
  36. }
  37. // deep copy default settings
  38. cfg := QueueSettings{}
  39. if cfgBs, err := json.Marshal(queueSettingsDefault); err != nil {
  40. return cfg, err
  41. } else if err = json.Unmarshal(cfgBs, &cfg); err != nil {
  42. return cfg, err
  43. }
  44. cfg.Name = name
  45. if sec, err := rootCfg.GetSection("queue"); err == nil {
  46. if err = sec.MapTo(&cfg); err != nil {
  47. log.Error("Failed to map queue common config for %q: %v", name, err)
  48. return cfg, nil
  49. }
  50. }
  51. if sec, err := rootCfg.GetSection("queue." + name); err == nil {
  52. if err = sec.MapTo(&cfg); err != nil {
  53. log.Error("Failed to map queue spec config for %q: %v", name, err)
  54. return cfg, nil
  55. }
  56. if sec.HasKey("CONN_STR") {
  57. cfg.ConnStr = sec.Key("CONN_STR").String()
  58. }
  59. }
  60. if cfg.Datadir == "" {
  61. cfg.Datadir = queueSettingsDefault.Datadir
  62. }
  63. if !filepath.IsAbs(cfg.Datadir) {
  64. cfg.Datadir = filepath.Join(AppDataPath, cfg.Datadir)
  65. }
  66. cfg.Datadir = filepath.ToSlash(cfg.Datadir)
  67. if cfg.Type == "redis" && cfg.ConnStr == "" {
  68. cfg.ConnStr = "redis://127.0.0.1:6379/0"
  69. }
  70. if cfg.Length <= 0 {
  71. cfg.Length = queueSettingsDefault.Length
  72. }
  73. if cfg.MaxWorkers <= 0 {
  74. cfg.MaxWorkers = queueSettingsDefault.MaxWorkers
  75. }
  76. if cfg.BatchLength <= 0 {
  77. cfg.BatchLength = queueSettingsDefault.BatchLength
  78. }
  79. return cfg, nil
  80. }
  81. func LoadQueueSettings() {
  82. loadQueueFrom(CfgProvider)
  83. }
  84. func loadQueueFrom(rootCfg ConfigProvider) {
  85. hasOld := false
  86. handleOldLengthConfiguration := func(rootCfg ConfigProvider, newQueueName, oldSection, oldKey string) {
  87. if rootCfg.Section(oldSection).HasKey(oldKey) {
  88. hasOld = true
  89. log.Error("Removed queue option: `[%s].%s`. Use new options in `[queue.%s]`", oldSection, oldKey, newQueueName)
  90. }
  91. }
  92. handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_TYPE")
  93. handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_BATCH_NUMBER")
  94. handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_DIR")
  95. handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_CONN_STR")
  96. handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "UPDATE_BUFFER_LEN")
  97. handleOldLengthConfiguration(rootCfg, "mailer", "mailer", "SEND_BUFFER_LEN")
  98. handleOldLengthConfiguration(rootCfg, "pr_patch_checker", "repository", "PULL_REQUEST_QUEUE_LENGTH")
  99. handleOldLengthConfiguration(rootCfg, "mirror", "repository", "MIRROR_QUEUE_LENGTH")
  100. if hasOld {
  101. log.Fatal("Please update your app.ini to remove deprecated config options")
  102. }
  103. }