You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

queue.go 6.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package setting
  5. import (
  6. "fmt"
  7. "path/filepath"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "code.gitea.io/gitea/modules/log"
  12. )
  // QueueSettings represents the settings for a single queue as read from the
  // ini configuration. The upper-case names in the field comments are the ini
  // keys each field is populated from.
  type QueueSettings struct {
  	// DataDir comes from DATADIR; relative values are resolved against
  	// AppDataPath when the settings are loaded.
  	DataDir string
  	// Length and BatchLength come from LENGTH and BATCH_LENGTH.
  	Length, BatchLength int
  	// ConnectionString comes from CONN_STR and Type from TYPE.
  	ConnectionString, Type string
  	// Network, Addresses and Password are the "network", "addrs" and
  	// "password" fields parsed out of ConnectionString.
  	Network, Addresses, Password string
  	// QueueName and SetName come from QUEUE_NAME and SET_NAME.
  	QueueName, SetName string
  	// DBIndex is the "db" field parsed out of ConnectionString.
  	DBIndex int
  	// WrapIfNecessary comes from WRAP_IF_NECESSARY.
  	WrapIfNecessary bool
  	// MaxAttempts comes from MAX_ATTEMPTS.
  	MaxAttempts int
  	// Timeout comes from TIMEOUT.
  	Timeout time.Duration
  	// Workers and MaxWorkers come from WORKERS and MAX_WORKERS.
  	Workers, MaxWorkers int
  	// BlockTimeout and BoostTimeout come from BLOCK_TIMEOUT and BOOST_TIMEOUT.
  	BlockTimeout, BoostTimeout time.Duration
  	// BoostWorkers comes from BOOST_WORKERS.
  	BoostWorkers int
  }
  35. // Queue settings
  36. var Queue = QueueSettings{}
  37. // GetQueueSettings returns the queue settings for the appropriately named queue
  38. func GetQueueSettings(name string) QueueSettings {
  39. q := QueueSettings{}
  40. sec := Cfg.Section("queue." + name)
  41. // DataDir is not directly inheritable
  42. q.DataDir = filepath.Join(Queue.DataDir, name)
  43. // QueueName is not directly inheritable either
  44. q.QueueName = name + Queue.QueueName
  45. for _, key := range sec.Keys() {
  46. switch key.Name() {
  47. case "DATADIR":
  48. q.DataDir = key.MustString(q.DataDir)
  49. case "QUEUE_NAME":
  50. q.QueueName = key.MustString(q.QueueName)
  51. case "SET_NAME":
  52. q.SetName = key.MustString(q.SetName)
  53. }
  54. }
  55. if len(q.SetName) == 0 && len(Queue.SetName) > 0 {
  56. q.SetName = q.QueueName + Queue.SetName
  57. }
  58. if !filepath.IsAbs(q.DataDir) {
  59. q.DataDir = filepath.Join(AppDataPath, q.DataDir)
  60. }
  61. _, _ = sec.NewKey("DATADIR", q.DataDir)
  62. // The rest are...
  63. q.Length = sec.Key("LENGTH").MustInt(Queue.Length)
  64. q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength)
  65. q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString)
  66. q.Type = sec.Key("TYPE").MustString(Queue.Type)
  67. q.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(Queue.WrapIfNecessary)
  68. q.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(Queue.MaxAttempts)
  69. q.Timeout = sec.Key("TIMEOUT").MustDuration(Queue.Timeout)
  70. q.Workers = sec.Key("WORKERS").MustInt(Queue.Workers)
  71. q.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(Queue.MaxWorkers)
  72. q.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(Queue.BlockTimeout)
  73. q.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(Queue.BoostTimeout)
  74. q.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(Queue.BoostWorkers)
  75. q.Network, q.Addresses, q.Password, q.DBIndex, _ = ParseQueueConnStr(q.ConnectionString)
  76. return q
  77. }
  78. // NewQueueService sets up the default settings for Queues
  79. // This is exported for tests to be able to use the queue
  80. func NewQueueService() {
  81. sec := Cfg.Section("queue")
  82. Queue.DataDir = sec.Key("DATADIR").MustString("queues/")
  83. if !filepath.IsAbs(Queue.DataDir) {
  84. Queue.DataDir = filepath.Join(AppDataPath, Queue.DataDir)
  85. }
  86. Queue.Length = sec.Key("LENGTH").MustInt(20)
  87. Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20)
  88. Queue.ConnectionString = sec.Key("CONN_STR").MustString("")
  89. Queue.Type = sec.Key("TYPE").MustString("persistable-channel")
  90. Queue.Network, Queue.Addresses, Queue.Password, Queue.DBIndex, _ = ParseQueueConnStr(Queue.ConnectionString)
  91. Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true)
  92. Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10)
  93. Queue.Timeout = sec.Key("TIMEOUT").MustDuration(GracefulHammerTime + 30*time.Second)
  94. Queue.Workers = sec.Key("WORKERS").MustInt(1)
  95. Queue.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(10)
  96. Queue.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(1 * time.Second)
  97. Queue.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(5 * time.Minute)
  98. Queue.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(5)
  99. Queue.QueueName = sec.Key("QUEUE_NAME").MustString("_queue")
  100. Queue.SetName = sec.Key("SET_NAME").MustString("")
  101. // Now handle the old issue_indexer configuration
  102. section := Cfg.Section("queue.issue_indexer")
  103. sectionMap := map[string]bool{}
  104. for _, key := range section.Keys() {
  105. sectionMap[key.Name()] = true
  106. }
  107. if _, ok := sectionMap["TYPE"]; !ok {
  108. switch Indexer.IssueQueueType {
  109. case LevelQueueType:
  110. _, _ = section.NewKey("TYPE", "level")
  111. case ChannelQueueType:
  112. _, _ = section.NewKey("TYPE", "persistable-channel")
  113. case RedisQueueType:
  114. _, _ = section.NewKey("TYPE", "redis")
  115. default:
  116. log.Fatal("Unsupported indexer queue type: %v",
  117. Indexer.IssueQueueType)
  118. }
  119. }
  120. if _, ok := sectionMap["LENGTH"]; !ok {
  121. _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Indexer.UpdateQueueLength))
  122. }
  123. if _, ok := sectionMap["BATCH_LENGTH"]; !ok {
  124. _, _ = section.NewKey("BATCH_LENGTH", fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber))
  125. }
  126. if _, ok := sectionMap["DATADIR"]; !ok {
  127. _, _ = section.NewKey("DATADIR", Indexer.IssueQueueDir)
  128. }
  129. if _, ok := sectionMap["CONN_STR"]; !ok {
  130. _, _ = section.NewKey("CONN_STR", Indexer.IssueQueueConnStr)
  131. }
  132. // Handle the old mailer configuration
  133. section = Cfg.Section("queue.mailer")
  134. sectionMap = map[string]bool{}
  135. for _, key := range section.Keys() {
  136. sectionMap[key.Name()] = true
  137. }
  138. if _, ok := sectionMap["LENGTH"]; !ok {
  139. _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100)))
  140. }
  141. // Handle the old test pull requests configuration
  142. // Please note this will be a unique queue
  143. section = Cfg.Section("queue.pr_patch_checker")
  144. sectionMap = map[string]bool{}
  145. for _, key := range section.Keys() {
  146. sectionMap[key.Name()] = true
  147. }
  148. if _, ok := sectionMap["LENGTH"]; !ok {
  149. _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.PullRequestQueueLength))
  150. }
  151. }
  // ParseQueueConnStr parses a queue connection string.
  //
  // The string is a whitespace-separated list of key=value fields. Keys are
  // matched case-insensitively; the recognised keys are "network", "addrs",
  // "password" and "db". Fields without an "=" and fields with any other key
  // are ignored, and when a key is repeated the last occurrence wins. Only the
  // first "=" splits a field, so values may themselves contain "=".
  //
  // If the "db" value is not a valid integer, parsing stops at that field and
  // a non-nil error wrapping the strconv error is returned. The other results
  // still carry whatever was parsed before the bad field (and dbIdx holds the
  // value strconv.Atoi returned), because callers in this file discard the
  // error and use them as-is.
  func ParseQueueConnStr(connStr string) (network, addrs, password string, dbIdx int, err error) {
  	for _, field := range strings.Fields(connStr) {
  		kv := strings.SplitN(field, "=", 2)
  		if len(kv) < 2 {
  			continue
  		}
  		key, value := kv[0], kv[1]
  		switch strings.ToLower(key) {
  		case "network":
  			network = value
  		case "addrs":
  			addrs = value
  		case "password":
  			password = value
  		case "db":
  			dbIdx, err = strconv.Atoi(value)
  			if err != nil {
  				// Say which field was at fault; the bare strconv error does not.
  				err = fmt.Errorf("invalid db index %q in queue connection string: %w", value, err)
  				return network, addrs, password, dbIdx, err
  			}
  		}
  	}
  	return network, addrs, password, dbIdx, nil
  }