You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

queue.go 6.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package setting
  5. import (
  6. "fmt"
  7. "path"
  8. "path/filepath"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "code.gitea.io/gitea/modules/log"
  13. )
  // QueueSettings holds the configuration of a single queue as read from the
  // ini file. The package-level Queue value carries the defaults from the
  // [queue] section; GetQueueSettings returns a per-queue copy.
  type QueueSettings struct {
  	// DataDir is the queue's data directory (DATADIR key).
  	DataDir string
  	// Length is read from the LENGTH key.
  	Length int
  	// BatchLength is read from the BATCH_LENGTH key.
  	BatchLength int
  	// ConnectionString is the raw CONN_STR value; Network, Addresses,
  	// Password and DBIndex are filled by passing it to ParseQueueConnStr.
  	ConnectionString string
  	// Type is read from the TYPE key.
  	Type string

  	// Network is the "network" field of ConnectionString.
  	Network string
  	// Addresses is the "addrs" field of ConnectionString.
  	Addresses string
  	// Password is the "password" field of ConnectionString.
  	Password string

  	// QueueName is read from the QUEUE_NAME key.
  	QueueName string
  	// SetName is read from the SET_NAME key.
  	SetName string
  	// DBIndex is the "db" field of ConnectionString.
  	DBIndex int

  	// WrapIfNecessary is read from the WRAP_IF_NECESSARY key.
  	WrapIfNecessary bool
  	// MaxAttempts is read from the MAX_ATTEMPTS key.
  	MaxAttempts int
  	// Timeout is read from the TIMEOUT key.
  	Timeout time.Duration

  	// Workers is read from the WORKERS key.
  	Workers int
  	// MaxWorkers is read from the MAX_WORKERS key.
  	MaxWorkers int
  	// BlockTimeout is read from the BLOCK_TIMEOUT key.
  	BlockTimeout time.Duration
  	// BoostTimeout is read from the BOOST_TIMEOUT key.
  	BoostTimeout time.Duration
  	// BoostWorkers is read from the BOOST_WORKERS key.
  	BoostWorkers int
  }
  36. // Queue settings
  37. var Queue = QueueSettings{}
  38. // GetQueueSettings returns the queue settings for the appropriately named queue
  39. func GetQueueSettings(name string) QueueSettings {
  40. q := QueueSettings{}
  41. sec := Cfg.Section("queue." + name)
  42. // DataDir is not directly inheritable
  43. q.DataDir = filepath.Join(Queue.DataDir, name)
  44. // QueueName is not directly inheritable either
  45. q.QueueName = name + Queue.QueueName
  46. for _, key := range sec.Keys() {
  47. switch key.Name() {
  48. case "DATADIR":
  49. q.DataDir = key.MustString(q.DataDir)
  50. case "QUEUE_NAME":
  51. q.QueueName = key.MustString(q.QueueName)
  52. case "SET_NAME":
  53. q.SetName = key.MustString(q.SetName)
  54. }
  55. }
  56. if len(q.SetName) == 0 && len(Queue.SetName) > 0 {
  57. q.SetName = q.QueueName + Queue.SetName
  58. }
  59. if !filepath.IsAbs(q.DataDir) {
  60. q.DataDir = filepath.Join(AppDataPath, q.DataDir)
  61. }
  62. _, _ = sec.NewKey("DATADIR", q.DataDir)
  63. // The rest are...
  64. q.Length = sec.Key("LENGTH").MustInt(Queue.Length)
  65. q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength)
  66. q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString)
  67. q.Type = sec.Key("TYPE").MustString(Queue.Type)
  68. q.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(Queue.WrapIfNecessary)
  69. q.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(Queue.MaxAttempts)
  70. q.Timeout = sec.Key("TIMEOUT").MustDuration(Queue.Timeout)
  71. q.Workers = sec.Key("WORKERS").MustInt(Queue.Workers)
  72. q.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(Queue.MaxWorkers)
  73. q.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(Queue.BlockTimeout)
  74. q.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(Queue.BoostTimeout)
  75. q.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(Queue.BoostWorkers)
  76. q.Network, q.Addresses, q.Password, q.DBIndex, _ = ParseQueueConnStr(q.ConnectionString)
  77. return q
  78. }
  79. // NewQueueService sets up the default settings for Queues
  80. // This is exported for tests to be able to use the queue
  81. func NewQueueService() {
  82. sec := Cfg.Section("queue")
  83. Queue.DataDir = sec.Key("DATADIR").MustString("queues/")
  84. if !filepath.IsAbs(Queue.DataDir) {
  85. Queue.DataDir = filepath.Join(AppDataPath, Queue.DataDir)
  86. }
  87. Queue.Length = sec.Key("LENGTH").MustInt(20)
  88. Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20)
  89. Queue.ConnectionString = sec.Key("CONN_STR").MustString(path.Join(AppDataPath, ""))
  90. Queue.Type = sec.Key("TYPE").MustString("persistable-channel")
  91. Queue.Network, Queue.Addresses, Queue.Password, Queue.DBIndex, _ = ParseQueueConnStr(Queue.ConnectionString)
  92. Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true)
  93. Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10)
  94. Queue.Timeout = sec.Key("TIMEOUT").MustDuration(GracefulHammerTime + 30*time.Second)
  95. Queue.Workers = sec.Key("WORKERS").MustInt(1)
  96. Queue.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(10)
  97. Queue.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(1 * time.Second)
  98. Queue.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(5 * time.Minute)
  99. Queue.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(5)
  100. Queue.QueueName = sec.Key("QUEUE_NAME").MustString("_queue")
  101. Queue.SetName = sec.Key("SET_NAME").MustString("")
  102. // Now handle the old issue_indexer configuration
  103. section := Cfg.Section("queue.issue_indexer")
  104. sectionMap := map[string]bool{}
  105. for _, key := range section.Keys() {
  106. sectionMap[key.Name()] = true
  107. }
  108. if _, ok := sectionMap["TYPE"]; !ok {
  109. switch Indexer.IssueQueueType {
  110. case LevelQueueType:
  111. _, _ = section.NewKey("TYPE", "level")
  112. case ChannelQueueType:
  113. _, _ = section.NewKey("TYPE", "persistable-channel")
  114. case RedisQueueType:
  115. _, _ = section.NewKey("TYPE", "redis")
  116. default:
  117. log.Fatal("Unsupported indexer queue type: %v",
  118. Indexer.IssueQueueType)
  119. }
  120. }
  121. if _, ok := sectionMap["LENGTH"]; !ok {
  122. _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Indexer.UpdateQueueLength))
  123. }
  124. if _, ok := sectionMap["BATCH_LENGTH"]; !ok {
  125. _, _ = section.NewKey("BATCH_LENGTH", fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber))
  126. }
  127. if _, ok := sectionMap["DATADIR"]; !ok {
  128. _, _ = section.NewKey("DATADIR", Indexer.IssueQueueDir)
  129. }
  130. if _, ok := sectionMap["CONN_STR"]; !ok {
  131. _, _ = section.NewKey("CONN_STR", Indexer.IssueQueueConnStr)
  132. }
  133. // Handle the old mailer configuration
  134. section = Cfg.Section("queue.mailer")
  135. sectionMap = map[string]bool{}
  136. for _, key := range section.Keys() {
  137. sectionMap[key.Name()] = true
  138. }
  139. if _, ok := sectionMap["LENGTH"]; !ok {
  140. _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100)))
  141. }
  142. // Handle the old test pull requests configuration
  143. // Please note this will be a unique queue
  144. section = Cfg.Section("queue.pr_patch_checker")
  145. sectionMap = map[string]bool{}
  146. for _, key := range section.Keys() {
  147. sectionMap[key.Name()] = true
  148. }
  149. if _, ok := sectionMap["LENGTH"]; !ok {
  150. _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.PullRequestQueueLength))
  151. }
  152. }
  // ParseQueueConnStr parses a queue connection string made of
  // whitespace-separated key=value fields. The keys "network", "addrs",
  // "password" and "db" are recognised case-insensitively; fields without an
  // "=" and fields with any other key are skipped. Only the first "=" in a
  // field separates key from value.
  //
  // If the "db" value is not an integer, parsing stops and the strconv error
  // is returned together with the values collected up to that point.
  func ParseQueueConnStr(connStr string) (network, addrs, password string, dbIdx int, err error) {
  	for _, field := range strings.Fields(connStr) {
  		sep := strings.Index(field, "=")
  		if sep < 0 {
  			// Not a key=value pair.
  			continue
  		}
  		key, value := field[:sep], field[sep+1:]

  		switch strings.ToLower(key) {
  		case "network":
  			network = value
  		case "addrs":
  			addrs = value
  		case "password":
  			password = value
  		case "db":
  			if dbIdx, err = strconv.Atoi(value); err != nil {
  				return network, addrs, password, dbIdx, err
  			}
  		}
  	}
  	return network, addrs, password, dbIdx, err
  }