You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

queue.go 6.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package setting
  5. import (
  6. "path/filepath"
  7. "strconv"
  8. "time"
  9. "code.gitea.io/gitea/modules/log"
  10. ini "gopkg.in/ini.v1"
  11. )
  12. // QueueSettings represent the settings for a queue from the ini
  13. type QueueSettings struct {
  14. Name string
  15. DataDir string
  16. QueueLength int `ini:"LENGTH"`
  17. BatchLength int
  18. ConnectionString string
  19. Type string
  20. QueueName string
  21. SetName string
  22. WrapIfNecessary bool
  23. MaxAttempts int
  24. Timeout time.Duration
  25. Workers int
  26. MaxWorkers int
  27. BlockTimeout time.Duration
  28. BoostTimeout time.Duration
  29. BoostWorkers int
  30. }
  31. // Queue settings
  32. var Queue = QueueSettings{}
  33. // GetQueueSettings returns the queue settings for the appropriately named queue
  34. func GetQueueSettings(name string) QueueSettings {
  35. q := QueueSettings{}
  36. sec := Cfg.Section("queue." + name)
  37. q.Name = name
  38. // DataDir is not directly inheritable
  39. q.DataDir = filepath.ToSlash(filepath.Join(Queue.DataDir, "common"))
  40. // QueueName is not directly inheritable either
  41. q.QueueName = name + Queue.QueueName
  42. for _, key := range sec.Keys() {
  43. switch key.Name() {
  44. case "DATADIR":
  45. q.DataDir = key.MustString(q.DataDir)
  46. case "QUEUE_NAME":
  47. q.QueueName = key.MustString(q.QueueName)
  48. case "SET_NAME":
  49. q.SetName = key.MustString(q.SetName)
  50. }
  51. }
  52. if len(q.SetName) == 0 && len(Queue.SetName) > 0 {
  53. q.SetName = q.QueueName + Queue.SetName
  54. }
  55. if !filepath.IsAbs(q.DataDir) {
  56. q.DataDir = filepath.ToSlash(filepath.Join(AppDataPath, q.DataDir))
  57. }
  58. _, _ = sec.NewKey("DATADIR", q.DataDir)
  59. // The rest are...
  60. q.QueueLength = sec.Key("LENGTH").MustInt(Queue.QueueLength)
  61. q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength)
  62. q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString)
  63. q.Type = sec.Key("TYPE").MustString(Queue.Type)
  64. q.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(Queue.WrapIfNecessary)
  65. q.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(Queue.MaxAttempts)
  66. q.Timeout = sec.Key("TIMEOUT").MustDuration(Queue.Timeout)
  67. q.Workers = sec.Key("WORKERS").MustInt(Queue.Workers)
  68. q.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(Queue.MaxWorkers)
  69. q.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(Queue.BlockTimeout)
  70. q.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(Queue.BoostTimeout)
  71. q.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(Queue.BoostWorkers)
  72. return q
  73. }
  74. // NewQueueService sets up the default settings for Queues
  75. // This is exported for tests to be able to use the queue
  76. func NewQueueService() {
  77. sec := Cfg.Section("queue")
  78. Queue.DataDir = filepath.ToSlash(sec.Key("DATADIR").MustString("queues/"))
  79. if !filepath.IsAbs(Queue.DataDir) {
  80. Queue.DataDir = filepath.ToSlash(filepath.Join(AppDataPath, Queue.DataDir))
  81. }
  82. Queue.QueueLength = sec.Key("LENGTH").MustInt(20)
  83. Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20)
  84. Queue.ConnectionString = sec.Key("CONN_STR").MustString("")
  85. defaultType := sec.Key("TYPE").String()
  86. Queue.Type = sec.Key("TYPE").MustString("persistable-channel")
  87. Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true)
  88. Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10)
  89. Queue.Timeout = sec.Key("TIMEOUT").MustDuration(GracefulHammerTime + 30*time.Second)
  90. Queue.Workers = sec.Key("WORKERS").MustInt(0)
  91. Queue.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(10)
  92. Queue.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(1 * time.Second)
  93. Queue.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(5 * time.Minute)
  94. Queue.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(1)
  95. Queue.QueueName = sec.Key("QUEUE_NAME").MustString("_queue")
  96. Queue.SetName = sec.Key("SET_NAME").MustString("")
  97. // Now handle the old issue_indexer configuration
  98. section := Cfg.Section("queue.issue_indexer")
  99. directlySet := toDirectlySetKeysMap(section)
  100. if !directlySet["TYPE"] && defaultType == "" {
  101. switch Indexer.IssueQueueType {
  102. case LevelQueueType:
  103. _, _ = section.NewKey("TYPE", "level")
  104. case ChannelQueueType:
  105. _, _ = section.NewKey("TYPE", "persistable-channel")
  106. case RedisQueueType:
  107. _, _ = section.NewKey("TYPE", "redis")
  108. case "":
  109. _, _ = section.NewKey("TYPE", "level")
  110. default:
  111. log.Fatal("Unsupported indexer queue type: %v",
  112. Indexer.IssueQueueType)
  113. }
  114. }
  115. if !directlySet["LENGTH"] && Indexer.UpdateQueueLength != 0 {
  116. _, _ = section.NewKey("LENGTH", strconv.Itoa(Indexer.UpdateQueueLength))
  117. }
  118. if !directlySet["BATCH_LENGTH"] && Indexer.IssueQueueBatchNumber != 0 {
  119. _, _ = section.NewKey("BATCH_LENGTH", strconv.Itoa(Indexer.IssueQueueBatchNumber))
  120. }
  121. if !directlySet["DATADIR"] && Indexer.IssueQueueDir != "" {
  122. _, _ = section.NewKey("DATADIR", Indexer.IssueQueueDir)
  123. }
  124. if !directlySet["CONN_STR"] && Indexer.IssueQueueConnStr != "" {
  125. _, _ = section.NewKey("CONN_STR", Indexer.IssueQueueConnStr)
  126. }
  127. // Handle the old mailer configuration
  128. handleOldLengthConfiguration("mailer", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100))
  129. // Handle the old test pull requests configuration
  130. // Please note this will be a unique queue
  131. handleOldLengthConfiguration("pr_patch_checker", Cfg.Section("repository").Key("PULL_REQUEST_QUEUE_LENGTH").MustInt(1000))
  132. // Handle the old mirror queue configuration
  133. // Please note this will be a unique queue
  134. handleOldLengthConfiguration("mirror", Cfg.Section("repository").Key("MIRROR_QUEUE_LENGTH").MustInt(1000))
  135. }
  136. // handleOldLengthConfiguration allows fallback to older configuration. `[queue.name]` `LENGTH` will override this configuration, but
  137. // if that is left unset then we should fallback to the older configuration. (Except where the new length woul be <=0)
  138. func handleOldLengthConfiguration(queueName string, value int) {
  139. // Don't override with 0
  140. if value <= 0 {
  141. return
  142. }
  143. section := Cfg.Section("queue." + queueName)
  144. directlySet := toDirectlySetKeysMap(section)
  145. if !directlySet["LENGTH"] {
  146. _, _ = section.NewKey("LENGTH", strconv.Itoa(value))
  147. }
  148. }
  149. // toDirectlySetKeysMap returns a bool map of keys directly set by this section
  150. // Note: we cannot use section.HasKey(...) as that will immediately set the Key if a parent section has the Key
  151. // but this section does not.
  152. func toDirectlySetKeysMap(section *ini.Section) map[string]bool {
  153. sectionMap := map[string]bool{}
  154. for _, key := range section.Keys() {
  155. sectionMap[key.Name()] = true
  156. }
  157. return sectionMap
  158. }