You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

queue.go 7.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package setting
  4. import (
  5. "path/filepath"
  6. "strconv"
  7. "time"
  8. "code.gitea.io/gitea/modules/container"
  9. "code.gitea.io/gitea/modules/log"
  10. ini "gopkg.in/ini.v1"
  11. )
  // QueueSettings holds the configuration of a single queue as read from the ini
  // file. The ini key each field is loaded from is noted next to the field.
  type QueueSettings struct {
  	// Identity and storage location of the queue.
  	Name             string // name the settings were requested for
  	DataDir          string // DATADIR
  	QueueLength      int    `ini:"LENGTH"`
  	BatchLength      int    // BATCH_LENGTH
  	ConnectionString string // CONN_STR
  	Type             string // TYPE
  	QueueName        string // QUEUE_NAME
  	SetName          string // SET_NAME
  	WrapIfNecessary  bool   // WRAP_IF_NECESSARY
  	MaxAttempts      int    // MAX_ATTEMPTS

  	// Worker pool and timing options.
  	Timeout      time.Duration // TIMEOUT
  	Workers      int           // WORKERS
  	MaxWorkers   int           // MAX_WORKERS
  	BlockTimeout time.Duration // BLOCK_TIMEOUT
  	BoostTimeout time.Duration // BOOST_TIMEOUT
  	BoostWorkers int           // BOOST_WORKERS
  }
  31. // Queue settings
  32. var Queue = QueueSettings{}
  33. // GetQueueSettings returns the queue settings for the appropriately named queue
  34. func GetQueueSettings(name string) QueueSettings {
  35. q := QueueSettings{}
  36. sec := Cfg.Section("queue." + name)
  37. q.Name = name
  38. // DataDir is not directly inheritable
  39. q.DataDir = filepath.ToSlash(filepath.Join(Queue.DataDir, "common"))
  40. // QueueName is not directly inheritable either
  41. q.QueueName = name + Queue.QueueName
  42. for _, key := range sec.Keys() {
  43. switch key.Name() {
  44. case "DATADIR":
  45. q.DataDir = key.MustString(q.DataDir)
  46. case "QUEUE_NAME":
  47. q.QueueName = key.MustString(q.QueueName)
  48. case "SET_NAME":
  49. q.SetName = key.MustString(q.SetName)
  50. }
  51. }
  52. if len(q.SetName) == 0 && len(Queue.SetName) > 0 {
  53. q.SetName = q.QueueName + Queue.SetName
  54. }
  55. if !filepath.IsAbs(q.DataDir) {
  56. q.DataDir = filepath.ToSlash(filepath.Join(AppDataPath, q.DataDir))
  57. }
  58. _, _ = sec.NewKey("DATADIR", q.DataDir)
  59. // The rest are...
  60. q.QueueLength = sec.Key("LENGTH").MustInt(Queue.QueueLength)
  61. q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength)
  62. q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString)
  63. q.Type = sec.Key("TYPE").MustString(Queue.Type)
  64. q.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(Queue.WrapIfNecessary)
  65. q.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(Queue.MaxAttempts)
  66. q.Timeout = sec.Key("TIMEOUT").MustDuration(Queue.Timeout)
  67. q.Workers = sec.Key("WORKERS").MustInt(Queue.Workers)
  68. q.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(Queue.MaxWorkers)
  69. q.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(Queue.BlockTimeout)
  70. q.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(Queue.BoostTimeout)
  71. q.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(Queue.BoostWorkers)
  72. return q
  73. }
  74. // NewQueueService sets up the default settings for Queues
  75. // This is exported for tests to be able to use the queue
  76. func NewQueueService() {
  77. sec := Cfg.Section("queue")
  78. Queue.DataDir = filepath.ToSlash(sec.Key("DATADIR").MustString("queues/"))
  79. if !filepath.IsAbs(Queue.DataDir) {
  80. Queue.DataDir = filepath.ToSlash(filepath.Join(AppDataPath, Queue.DataDir))
  81. }
  82. Queue.QueueLength = sec.Key("LENGTH").MustInt(20)
  83. Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20)
  84. Queue.ConnectionString = sec.Key("CONN_STR").MustString("")
  85. defaultType := sec.Key("TYPE").String()
  86. Queue.Type = sec.Key("TYPE").MustString("persistable-channel")
  87. Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true)
  88. Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10)
  89. Queue.Timeout = sec.Key("TIMEOUT").MustDuration(GracefulHammerTime + 30*time.Second)
  90. Queue.Workers = sec.Key("WORKERS").MustInt(0)
  91. Queue.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(10)
  92. Queue.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(1 * time.Second)
  93. Queue.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(5 * time.Minute)
  94. Queue.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(1)
  95. Queue.QueueName = sec.Key("QUEUE_NAME").MustString("_queue")
  96. Queue.SetName = sec.Key("SET_NAME").MustString("")
  97. // Now handle the old issue_indexer configuration
  98. // FIXME: DEPRECATED to be removed in v1.18.0
  99. section := Cfg.Section("queue.issue_indexer")
  100. directlySet := toDirectlySetKeysSet(section)
  101. if !directlySet.Contains("TYPE") && defaultType == "" {
  102. switch typ := Cfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_TYPE").MustString(""); typ {
  103. case "levelqueue":
  104. _, _ = section.NewKey("TYPE", "level")
  105. case "channel":
  106. _, _ = section.NewKey("TYPE", "persistable-channel")
  107. case "redis":
  108. _, _ = section.NewKey("TYPE", "redis")
  109. case "":
  110. _, _ = section.NewKey("TYPE", "level")
  111. default:
  112. log.Fatal("Unsupported indexer queue type: %v", typ)
  113. }
  114. }
  115. if !directlySet.Contains("LENGTH") {
  116. length := Cfg.Section("indexer").Key("UPDATE_BUFFER_LEN").MustInt(0)
  117. if length != 0 {
  118. _, _ = section.NewKey("LENGTH", strconv.Itoa(length))
  119. }
  120. }
  121. if !directlySet.Contains("BATCH_LENGTH") {
  122. fallback := Cfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_BATCH_NUMBER").MustInt(0)
  123. if fallback != 0 {
  124. _, _ = section.NewKey("BATCH_LENGTH", strconv.Itoa(fallback))
  125. }
  126. }
  127. if !directlySet.Contains("DATADIR") {
  128. queueDir := filepath.ToSlash(Cfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_DIR").MustString(""))
  129. if queueDir != "" {
  130. _, _ = section.NewKey("DATADIR", queueDir)
  131. }
  132. }
  133. if !directlySet.Contains("CONN_STR") {
  134. connStr := Cfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_CONN_STR").MustString("")
  135. if connStr != "" {
  136. _, _ = section.NewKey("CONN_STR", connStr)
  137. }
  138. }
  139. // FIXME: DEPRECATED to be removed in v1.18.0
  140. // - will need to set default for [queue.*)] LENGTH appropriately though though
  141. // Handle the old mailer configuration
  142. handleOldLengthConfiguration("mailer", "mailer", "SEND_BUFFER_LEN", 100)
  143. // Handle the old test pull requests configuration
  144. // Please note this will be a unique queue
  145. handleOldLengthConfiguration("pr_patch_checker", "repository", "PULL_REQUEST_QUEUE_LENGTH", 1000)
  146. // Handle the old mirror queue configuration
  147. // Please note this will be a unique queue
  148. handleOldLengthConfiguration("mirror", "repository", "MIRROR_QUEUE_LENGTH", 1000)
  149. }
  150. // handleOldLengthConfiguration allows fallback to older configuration. `[queue.name]` `LENGTH` will override this configuration, but
  151. // if that is left unset then we should fallback to the older configuration. (Except where the new length woul be <=0)
  152. func handleOldLengthConfiguration(queueName, oldSection, oldKey string, defaultValue int) {
  153. if Cfg.Section(oldSection).HasKey(oldKey) {
  154. log.Error("Deprecated fallback for %s queue length `[%s]` `%s` present. Use `[queue.%s]` `LENGTH`. This will be removed in v1.18.0", queueName, queueName, oldSection, oldKey)
  155. }
  156. value := Cfg.Section(oldSection).Key(oldKey).MustInt(defaultValue)
  157. // Don't override with 0
  158. if value <= 0 {
  159. return
  160. }
  161. section := Cfg.Section("queue." + queueName)
  162. directlySet := toDirectlySetKeysSet(section)
  163. if !directlySet.Contains("LENGTH") {
  164. _, _ = section.NewKey("LENGTH", strconv.Itoa(value))
  165. }
  166. }
  167. // toDirectlySetKeysSet returns a set of keys directly set by this section
  168. // Note: we cannot use section.HasKey(...) as that will immediately set the Key if a parent section has the Key
  169. // but this section does not.
  170. func toDirectlySetKeysSet(section *ini.Section) container.Set[string] {
  171. sections := make(container.Set[string])
  172. for _, key := range section.Keys() {
  173. sections.Add(key.Name())
  174. }
  175. return sections
  176. }