You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

manager_leveldb.go 6.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. // Copyright 2020 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package nosql
  4. import (
  5. "fmt"
  6. "path"
  7. "runtime/pprof"
  8. "strconv"
  9. "strings"
  10. "code.gitea.io/gitea/modules/log"
  11. "github.com/syndtr/goleveldb/leveldb"
  12. "github.com/syndtr/goleveldb/leveldb/errors"
  13. "github.com/syndtr/goleveldb/leveldb/opt"
  14. )
  15. // CloseLevelDB closes a levelDB
  16. func (m *Manager) CloseLevelDB(connection string) error {
  17. m.mutex.Lock()
  18. defer m.mutex.Unlock()
  19. db, ok := m.LevelDBConnections[connection]
  20. if !ok {
  21. // Try the full URI
  22. uri := ToLevelDBURI(connection)
  23. db, ok = m.LevelDBConnections[uri.String()]
  24. if !ok {
  25. // Try the datadir directly
  26. dataDir := path.Join(uri.Host, uri.Path)
  27. db, ok = m.LevelDBConnections[dataDir]
  28. }
  29. }
  30. if !ok {
  31. return nil
  32. }
  33. db.count--
  34. if db.count > 0 {
  35. return nil
  36. }
  37. for _, name := range db.name {
  38. delete(m.LevelDBConnections, name)
  39. }
  40. return db.db.Close()
  41. }
  42. // GetLevelDB gets a levelDB for a particular connection
  43. func (m *Manager) GetLevelDB(connection string) (db *leveldb.DB, err error) {
  44. // Because we want associate any goroutines created by this call to the main nosqldb context we need to
  45. // wrap this in a goroutine labelled with the nosqldb context
  46. done := make(chan struct{})
  47. var recovered interface{}
  48. go func() {
  49. defer func() {
  50. recovered = recover()
  51. if recovered != nil {
  52. log.Critical("PANIC during GetLevelDB: %v\nStacktrace: %s", recovered, log.Stack(2))
  53. }
  54. close(done)
  55. }()
  56. pprof.SetGoroutineLabels(m.ctx)
  57. db, err = m.getLevelDB(connection)
  58. }()
  59. <-done
  60. if recovered != nil {
  61. panic(recovered)
  62. }
  63. return db, err
  64. }
  65. func (m *Manager) getLevelDB(connection string) (*leveldb.DB, error) {
  66. // Convert the provided connection description to the common format
  67. uri := ToLevelDBURI(connection)
  68. // Get the datadir
  69. dataDir := path.Join(uri.Host, uri.Path)
  70. m.mutex.Lock()
  71. defer m.mutex.Unlock()
  72. db, ok := m.LevelDBConnections[connection]
  73. if ok {
  74. db.count++
  75. return db.db, nil
  76. }
  77. db, ok = m.LevelDBConnections[uri.String()]
  78. if ok {
  79. db.count++
  80. return db.db, nil
  81. }
  82. // if there is already a connection to this leveldb reuse that
  83. // NOTE: if there differing options then only the first leveldb connection will be used
  84. db, ok = m.LevelDBConnections[dataDir]
  85. if ok {
  86. db.count++
  87. log.Warn("Duplicate connection to level db: %s with different connection strings. Initial connection: %s. This connection: %s", dataDir, db.name[0], connection)
  88. db.name = append(db.name, connection)
  89. m.LevelDBConnections[connection] = db
  90. return db.db, nil
  91. }
  92. db = &levelDBHolder{
  93. name: []string{connection, uri.String(), dataDir},
  94. }
  95. opts := &opt.Options{}
  96. for k, v := range uri.Query() {
  97. switch replacer.Replace(strings.ToLower(k)) {
  98. case "blockcachecapacity":
  99. opts.BlockCacheCapacity, _ = strconv.Atoi(v[0])
  100. case "blockcacheevictremoved":
  101. opts.BlockCacheEvictRemoved, _ = strconv.ParseBool(v[0])
  102. case "blockrestartinterval":
  103. opts.BlockRestartInterval, _ = strconv.Atoi(v[0])
  104. case "blocksize":
  105. opts.BlockSize, _ = strconv.Atoi(v[0])
  106. case "compactionexpandlimitfactor":
  107. opts.CompactionExpandLimitFactor, _ = strconv.Atoi(v[0])
  108. case "compactiongpoverlapsfactor":
  109. opts.CompactionGPOverlapsFactor, _ = strconv.Atoi(v[0])
  110. case "compactionl0trigger":
  111. opts.CompactionL0Trigger, _ = strconv.Atoi(v[0])
  112. case "compactionsourcelimitfactor":
  113. opts.CompactionSourceLimitFactor, _ = strconv.Atoi(v[0])
  114. case "compactiontablesize":
  115. opts.CompactionTableSize, _ = strconv.Atoi(v[0])
  116. case "compactiontablesizemultiplier":
  117. opts.CompactionTableSizeMultiplier, _ = strconv.ParseFloat(v[0], 64)
  118. case "compactiontablesizemultiplierperlevel":
  119. for _, val := range v {
  120. f, _ := strconv.ParseFloat(val, 64)
  121. opts.CompactionTableSizeMultiplierPerLevel = append(opts.CompactionTableSizeMultiplierPerLevel, f)
  122. }
  123. case "compactiontotalsize":
  124. opts.CompactionTotalSize, _ = strconv.Atoi(v[0])
  125. case "compactiontotalsizemultiplier":
  126. opts.CompactionTotalSizeMultiplier, _ = strconv.ParseFloat(v[0], 64)
  127. case "compactiontotalsizemultiplierperlevel":
  128. for _, val := range v {
  129. f, _ := strconv.ParseFloat(val, 64)
  130. opts.CompactionTotalSizeMultiplierPerLevel = append(opts.CompactionTotalSizeMultiplierPerLevel, f)
  131. }
  132. case "compression":
  133. val, _ := strconv.Atoi(v[0])
  134. opts.Compression = opt.Compression(val)
  135. case "disablebufferpool":
  136. opts.DisableBufferPool, _ = strconv.ParseBool(v[0])
  137. case "disableblockcache":
  138. opts.DisableBlockCache, _ = strconv.ParseBool(v[0])
  139. case "disablecompactionbackoff":
  140. opts.DisableCompactionBackoff, _ = strconv.ParseBool(v[0])
  141. case "disablelargebatchtransaction":
  142. opts.DisableLargeBatchTransaction, _ = strconv.ParseBool(v[0])
  143. case "errorifexist":
  144. opts.ErrorIfExist, _ = strconv.ParseBool(v[0])
  145. case "errorifmissing":
  146. opts.ErrorIfMissing, _ = strconv.ParseBool(v[0])
  147. case "iteratorsamplingrate":
  148. opts.IteratorSamplingRate, _ = strconv.Atoi(v[0])
  149. case "nosync":
  150. opts.NoSync, _ = strconv.ParseBool(v[0])
  151. case "nowritemerge":
  152. opts.NoWriteMerge, _ = strconv.ParseBool(v[0])
  153. case "openfilescachecapacity":
  154. opts.OpenFilesCacheCapacity, _ = strconv.Atoi(v[0])
  155. case "readonly":
  156. opts.ReadOnly, _ = strconv.ParseBool(v[0])
  157. case "strict":
  158. val, _ := strconv.Atoi(v[0])
  159. opts.Strict = opt.Strict(val)
  160. case "writebuffer":
  161. opts.WriteBuffer, _ = strconv.Atoi(v[0])
  162. case "writel0pausetrigger":
  163. opts.WriteL0PauseTrigger, _ = strconv.Atoi(v[0])
  164. case "writel0slowdowntrigger":
  165. opts.WriteL0SlowdownTrigger, _ = strconv.Atoi(v[0])
  166. case "clientname":
  167. db.name = append(db.name, v[0])
  168. }
  169. }
  170. var err error
  171. db.db, err = leveldb.OpenFile(dataDir, opts)
  172. if err != nil {
  173. if !errors.IsCorrupted(err) {
  174. if strings.Contains(err.Error(), "resource temporarily unavailable") {
  175. err = fmt.Errorf("unable to lock level db at %s: %w", dataDir, err)
  176. return nil, err
  177. }
  178. err = fmt.Errorf("unable to open level db at %s: %w", dataDir, err)
  179. return nil, err
  180. }
  181. db.db, err = leveldb.RecoverFile(dataDir, opts)
  182. }
  183. if err != nil {
  184. return nil, err
  185. }
  186. for _, name := range db.name {
  187. m.LevelDBConnections[name] = db
  188. }
  189. db.count++
  190. return db.db, nil
  191. }