您最多可以选择 25 个主题。主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过 35 个字符。

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package setting
  5. import (
  6. "fmt"
  7. "path"
  8. "path/filepath"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "code.gitea.io/gitea/modules/log"
  13. )
  // QueueSettings holds the configuration of a single queue as read from the ini
  // configuration.
  type QueueSettings struct {
  	// DataDir is the queue's data directory (ini key DATADIR).
  	DataDir string
  	// Length is read from the ini key LENGTH.
  	Length int
  	// BatchLength is read from the ini key BATCH_LENGTH.
  	BatchLength int
  	// ConnectionString is the raw value of the ini key CONN_STR.
  	ConnectionString string
  	// Type is read from the ini key TYPE.
  	Type string
  	// Network is the "network=" token parsed out of ConnectionString.
  	Network string
  	// Addresses is the "addrs=" token parsed out of ConnectionString.
  	Addresses string
  	// Password is the "password=" token parsed out of ConnectionString.
  	Password string
  	// QueueName is read from the ini key QUEUE_NAME.
  	QueueName string
  	// DBIndex is the "db=" token parsed out of ConnectionString.
  	DBIndex int
  	// WrapIfNecessary is read from the ini key WRAP_IF_NECESSARY.
  	WrapIfNecessary bool
  	// MaxAttempts is read from the ini key MAX_ATTEMPTS.
  	MaxAttempts int
  	// Timeout is read from the ini key TIMEOUT.
  	Timeout time.Duration
  	// Workers is read from the ini key WORKERS.
  	Workers int
  	// MaxWorkers is read from the ini key MAX_WORKERS.
  	MaxWorkers int
  	// BlockTimeout is read from the ini key BLOCK_TIMEOUT.
  	BlockTimeout time.Duration
  	// BoostTimeout is read from the ini key BOOST_TIMEOUT.
  	BoostTimeout time.Duration
  	// BoostWorkers is read from the ini key BOOST_WORKERS.
  	BoostWorkers int
  }
  35. // Queue settings
  36. var Queue = QueueSettings{}
  37. // GetQueueSettings returns the queue settings for the appropriately named queue
  38. func GetQueueSettings(name string) QueueSettings {
  39. q := QueueSettings{}
  40. sec := Cfg.Section("queue." + name)
  41. // DataDir is not directly inheritable
  42. q.DataDir = filepath.Join(Queue.DataDir, name)
  43. // QueueName is not directly inheritable either
  44. q.QueueName = name + Queue.QueueName
  45. for _, key := range sec.Keys() {
  46. switch key.Name() {
  47. case "DATADIR":
  48. q.DataDir = key.MustString(q.DataDir)
  49. case "QUEUE_NAME":
  50. q.QueueName = key.MustString(q.QueueName)
  51. }
  52. }
  53. if !filepath.IsAbs(q.DataDir) {
  54. q.DataDir = filepath.Join(AppDataPath, q.DataDir)
  55. }
  56. sec.Key("DATADIR").SetValue(q.DataDir)
  57. // The rest are...
  58. q.Length = sec.Key("LENGTH").MustInt(Queue.Length)
  59. q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength)
  60. q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString)
  61. q.Type = sec.Key("TYPE").MustString(Queue.Type)
  62. q.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(Queue.WrapIfNecessary)
  63. q.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(Queue.MaxAttempts)
  64. q.Timeout = sec.Key("TIMEOUT").MustDuration(Queue.Timeout)
  65. q.Workers = sec.Key("WORKERS").MustInt(Queue.Workers)
  66. q.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(Queue.MaxWorkers)
  67. q.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(Queue.BlockTimeout)
  68. q.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(Queue.BoostTimeout)
  69. q.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(Queue.BoostWorkers)
  70. q.Network, q.Addresses, q.Password, q.DBIndex, _ = ParseQueueConnStr(q.ConnectionString)
  71. return q
  72. }
  73. // NewQueueService sets up the default settings for Queues
  74. // This is exported for tests to be able to use the queue
  75. func NewQueueService() {
  76. sec := Cfg.Section("queue")
  77. Queue.DataDir = sec.Key("DATADIR").MustString("queues/")
  78. if !filepath.IsAbs(Queue.DataDir) {
  79. Queue.DataDir = filepath.Join(AppDataPath, Queue.DataDir)
  80. }
  81. Queue.Length = sec.Key("LENGTH").MustInt(20)
  82. Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20)
  83. Queue.ConnectionString = sec.Key("CONN_STR").MustString(path.Join(AppDataPath, ""))
  84. Queue.Type = sec.Key("TYPE").MustString("")
  85. Queue.Network, Queue.Addresses, Queue.Password, Queue.DBIndex, _ = ParseQueueConnStr(Queue.ConnectionString)
  86. Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true)
  87. Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10)
  88. Queue.Timeout = sec.Key("TIMEOUT").MustDuration(GracefulHammerTime + 30*time.Second)
  89. Queue.Workers = sec.Key("WORKERS").MustInt(1)
  90. Queue.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(10)
  91. Queue.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(1 * time.Second)
  92. Queue.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(5 * time.Minute)
  93. Queue.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(5)
  94. Queue.QueueName = sec.Key("QUEUE_NAME").MustString("_queue")
  95. // Now handle the old issue_indexer configuration
  96. section := Cfg.Section("queue.issue_indexer")
  97. issueIndexerSectionMap := map[string]string{}
  98. for _, key := range section.Keys() {
  99. issueIndexerSectionMap[key.Name()] = key.Value()
  100. }
  101. if _, ok := issueIndexerSectionMap["TYPE"]; !ok {
  102. switch Indexer.IssueQueueType {
  103. case LevelQueueType:
  104. section.Key("TYPE").SetValue("level")
  105. case ChannelQueueType:
  106. section.Key("TYPE").SetValue("persistable-channel")
  107. case RedisQueueType:
  108. section.Key("TYPE").SetValue("redis")
  109. default:
  110. log.Fatal("Unsupported indexer queue type: %v",
  111. Indexer.IssueQueueType)
  112. }
  113. }
  114. if _, ok := issueIndexerSectionMap["LENGTH"]; !ok {
  115. section.Key("LENGTH").SetValue(fmt.Sprintf("%d", Indexer.UpdateQueueLength))
  116. }
  117. if _, ok := issueIndexerSectionMap["BATCH_LENGTH"]; !ok {
  118. section.Key("BATCH_LENGTH").SetValue(fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber))
  119. }
  120. if _, ok := issueIndexerSectionMap["DATADIR"]; !ok {
  121. section.Key("DATADIR").SetValue(Indexer.IssueQueueDir)
  122. }
  123. if _, ok := issueIndexerSectionMap["CONN_STR"]; !ok {
  124. section.Key("CONN_STR").SetValue(Indexer.IssueQueueConnStr)
  125. }
  126. }
  // ParseQueueConnStr parses a queue connection string.
  //
  // The string is split on whitespace into tokens of the form key=value. Keys are
  // matched case-insensitively and the recognised keys are "network", "addrs",
  // "password" and "db"; the value is everything after the first "=". Tokens
  // without "=" and tokens with an unrecognised key are ignored. When a key is
  // repeated, the last occurrence wins.
  //
  // The "db" value must be an integer. If any "db" value is invalid, the first
  // such failure is returned as err (wrapping the strconv error) and dbIdx is 0.
  // All other tokens are still parsed, so the remaining results do not depend on
  // where the invalid token sits in the string.
  func ParseQueueConnStr(connStr string) (network, addrs, password string, dbIdx int, err error) {
  	for _, field := range strings.Fields(connStr) {
  		items := strings.SplitN(field, "=", 2)
  		if len(items) < 2 {
  			// Not a key=value token; nothing to extract.
  			continue
  		}
  		key, value := items[0], items[1]
  		switch strings.ToLower(key) {
  		case "network":
  			network = value
  		case "addrs":
  			addrs = value
  		case "password":
  			password = value
  		case "db":
  			idx, convErr := strconv.Atoi(value)
  			if convErr != nil {
  				// Keep only the first failure, but carry on so later
  				// tokens are not silently dropped.
  				if err == nil {
  					err = fmt.Errorf("invalid db index %q in queue connection string: %w", value, convErr)
  				}
  				continue
  			}
  			dbIdx = idx
  		}
  	}
  	if err != nil {
  		// Never hand back a database index alongside a parse failure.
  		dbIdx = 0
  	}
  	return network, addrs, password, dbIdx, err
  }