123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
- // Copyright 2020 The Gitea Authors. All rights reserved.
- // SPDX-License-Identifier: MIT
-
- package nosql
-
- import (
- "fmt"
- "path"
- "runtime/pprof"
- "strconv"
- "strings"
-
- "code.gitea.io/gitea/modules/log"
-
- "github.com/syndtr/goleveldb/leveldb"
- "github.com/syndtr/goleveldb/leveldb/errors"
- "github.com/syndtr/goleveldb/leveldb/opt"
- )
-
- // CloseLevelDB closes a levelDB
- func (m *Manager) CloseLevelDB(connection string) error {
- m.mutex.Lock()
- defer m.mutex.Unlock()
- db, ok := m.LevelDBConnections[connection]
- if !ok {
- // Try the full URI
- uri := ToLevelDBURI(connection)
- db, ok = m.LevelDBConnections[uri.String()]
-
- if !ok {
- // Try the datadir directly
- dataDir := path.Join(uri.Host, uri.Path)
-
- db, ok = m.LevelDBConnections[dataDir]
- }
- }
- if !ok {
- return nil
- }
-
- db.count--
- if db.count > 0 {
- return nil
- }
-
- for _, name := range db.name {
- delete(m.LevelDBConnections, name)
- }
- return db.db.Close()
- }
-
- // GetLevelDB gets a levelDB for a particular connection
- func (m *Manager) GetLevelDB(connection string) (db *leveldb.DB, err error) {
- // Because we want associate any goroutines created by this call to the main nosqldb context we need to
- // wrap this in a goroutine labelled with the nosqldb context
- done := make(chan struct{})
- var recovered interface{}
- go func() {
- defer func() {
- recovered = recover()
- if recovered != nil {
- log.Critical("PANIC during GetLevelDB: %v\nStacktrace: %s", recovered, log.Stack(2))
- }
- close(done)
- }()
- pprof.SetGoroutineLabels(m.ctx)
-
- db, err = m.getLevelDB(connection)
- }()
- <-done
- if recovered != nil {
- panic(recovered)
- }
- return db, err
- }
-
- func (m *Manager) getLevelDB(connection string) (*leveldb.DB, error) {
- // Convert the provided connection description to the common format
- uri := ToLevelDBURI(connection)
-
- // Get the datadir
- dataDir := path.Join(uri.Host, uri.Path)
-
- m.mutex.Lock()
- defer m.mutex.Unlock()
- db, ok := m.LevelDBConnections[connection]
- if ok {
- db.count++
-
- return db.db, nil
- }
-
- db, ok = m.LevelDBConnections[uri.String()]
- if ok {
- db.count++
-
- return db.db, nil
- }
-
- // if there is already a connection to this leveldb reuse that
- // NOTE: if there differing options then only the first leveldb connection will be used
- db, ok = m.LevelDBConnections[dataDir]
- if ok {
- db.count++
- log.Warn("Duplicate connection to level db: %s with different connection strings. Initial connection: %s. This connection: %s", dataDir, db.name[0], connection)
- db.name = append(db.name, connection)
- m.LevelDBConnections[connection] = db
- return db.db, nil
- }
- db = &levelDBHolder{
- name: []string{connection, uri.String(), dataDir},
- }
-
- opts := &opt.Options{}
- for k, v := range uri.Query() {
- switch replacer.Replace(strings.ToLower(k)) {
- case "blockcachecapacity":
- opts.BlockCacheCapacity, _ = strconv.Atoi(v[0])
- case "blockcacheevictremoved":
- opts.BlockCacheEvictRemoved, _ = strconv.ParseBool(v[0])
- case "blockrestartinterval":
- opts.BlockRestartInterval, _ = strconv.Atoi(v[0])
- case "blocksize":
- opts.BlockSize, _ = strconv.Atoi(v[0])
- case "compactionexpandlimitfactor":
- opts.CompactionExpandLimitFactor, _ = strconv.Atoi(v[0])
- case "compactiongpoverlapsfactor":
- opts.CompactionGPOverlapsFactor, _ = strconv.Atoi(v[0])
- case "compactionl0trigger":
- opts.CompactionL0Trigger, _ = strconv.Atoi(v[0])
- case "compactionsourcelimitfactor":
- opts.CompactionSourceLimitFactor, _ = strconv.Atoi(v[0])
- case "compactiontablesize":
- opts.CompactionTableSize, _ = strconv.Atoi(v[0])
- case "compactiontablesizemultiplier":
- opts.CompactionTableSizeMultiplier, _ = strconv.ParseFloat(v[0], 64)
- case "compactiontablesizemultiplierperlevel":
- for _, val := range v {
- f, _ := strconv.ParseFloat(val, 64)
- opts.CompactionTableSizeMultiplierPerLevel = append(opts.CompactionTableSizeMultiplierPerLevel, f)
- }
- case "compactiontotalsize":
- opts.CompactionTotalSize, _ = strconv.Atoi(v[0])
- case "compactiontotalsizemultiplier":
- opts.CompactionTotalSizeMultiplier, _ = strconv.ParseFloat(v[0], 64)
- case "compactiontotalsizemultiplierperlevel":
- for _, val := range v {
- f, _ := strconv.ParseFloat(val, 64)
- opts.CompactionTotalSizeMultiplierPerLevel = append(opts.CompactionTotalSizeMultiplierPerLevel, f)
- }
- case "compression":
- val, _ := strconv.Atoi(v[0])
- opts.Compression = opt.Compression(val)
- case "disablebufferpool":
- opts.DisableBufferPool, _ = strconv.ParseBool(v[0])
- case "disableblockcache":
- opts.DisableBlockCache, _ = strconv.ParseBool(v[0])
- case "disablecompactionbackoff":
- opts.DisableCompactionBackoff, _ = strconv.ParseBool(v[0])
- case "disablelargebatchtransaction":
- opts.DisableLargeBatchTransaction, _ = strconv.ParseBool(v[0])
- case "errorifexist":
- opts.ErrorIfExist, _ = strconv.ParseBool(v[0])
- case "errorifmissing":
- opts.ErrorIfMissing, _ = strconv.ParseBool(v[0])
- case "iteratorsamplingrate":
- opts.IteratorSamplingRate, _ = strconv.Atoi(v[0])
- case "nosync":
- opts.NoSync, _ = strconv.ParseBool(v[0])
- case "nowritemerge":
- opts.NoWriteMerge, _ = strconv.ParseBool(v[0])
- case "openfilescachecapacity":
- opts.OpenFilesCacheCapacity, _ = strconv.Atoi(v[0])
- case "readonly":
- opts.ReadOnly, _ = strconv.ParseBool(v[0])
- case "strict":
- val, _ := strconv.Atoi(v[0])
- opts.Strict = opt.Strict(val)
- case "writebuffer":
- opts.WriteBuffer, _ = strconv.Atoi(v[0])
- case "writel0pausetrigger":
- opts.WriteL0PauseTrigger, _ = strconv.Atoi(v[0])
- case "writel0slowdowntrigger":
- opts.WriteL0SlowdownTrigger, _ = strconv.Atoi(v[0])
- case "clientname":
- db.name = append(db.name, v[0])
- }
- }
-
- var err error
- db.db, err = leveldb.OpenFile(dataDir, opts)
- if err != nil {
- if !errors.IsCorrupted(err) {
- if strings.Contains(err.Error(), "resource temporarily unavailable") {
- err = fmt.Errorf("unable to lock level db at %s: %w", dataDir, err)
- return nil, err
- }
-
- err = fmt.Errorf("unable to open level db at %s: %w", dataDir, err)
- return nil, err
- }
- db.db, err = leveldb.RecoverFile(dataDir, opts)
- }
-
- if err != nil {
- return nil, err
- }
-
- for _, name := range db.name {
- m.LevelDBConnections[name] = db
- }
- db.count++
- return db.db, nil
- }
|