summaryrefslogtreecommitdiffstats
path: root/modules/nosql
diff options
context:
space:
mode:
Diffstat (limited to 'modules/nosql')
-rw-r--r--modules/nosql/leveldb.go25
-rw-r--r--modules/nosql/manager.go71
-rw-r--r--modules/nosql/manager_leveldb.go151
-rw-r--r--modules/nosql/manager_redis.go205
-rw-r--r--modules/nosql/redis.go102
-rw-r--r--modules/nosql/redis_test.go35
6 files changed, 589 insertions, 0 deletions
diff --git a/modules/nosql/leveldb.go b/modules/nosql/leveldb.go
new file mode 100644
index 0000000000..5da2291e03
--- /dev/null
+++ b/modules/nosql/leveldb.go
@@ -0,0 +1,25 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import "net/url"
+
+// ToLevelDBURI converts old style connections to a LevelDBURI
+//
+// A LevelDBURI matches the pattern:
+//
+// leveldb://path[?[option=value]*]
+//
+// We have previously just provided the path but this prevent other options
+func ToLevelDBURI(connection string) *url.URL {
+ uri, err := url.Parse(connection)
+ if err == nil && uri.Scheme == "leveldb" {
+ return uri
+ }
+ uri, _ = url.Parse("leveldb://common")
+ uri.Host = ""
+ uri.Path = connection
+ return uri
+}
diff --git a/modules/nosql/manager.go b/modules/nosql/manager.go
new file mode 100644
index 0000000000..ad61d6d18c
--- /dev/null
+++ b/modules/nosql/manager.go
@@ -0,0 +1,71 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+ "strconv"
+ "sync"
+ "time"
+
+ "github.com/go-redis/redis/v7"
+ "github.com/syndtr/goleveldb/leveldb"
+)
+
+var manager *Manager
+
+// Manager is the nosql connection manager
+type Manager struct {
+ mutex sync.Mutex
+
+ RedisConnections map[string]*redisClientHolder
+ LevelDBConnections map[string]*levelDBHolder
+}
+
+type redisClientHolder struct {
+ redis.UniversalClient
+ name []string
+ count int64
+}
+
+func (r *redisClientHolder) Close() error {
+ return manager.CloseRedisClient(r.name[0])
+}
+
+type levelDBHolder struct {
+ name []string
+ count int64
+ db *leveldb.DB
+}
+
+func init() {
+ _ = GetManager()
+}
+
+// GetManager returns a Manager and initializes one as singleton is there's none yet
+func GetManager() *Manager {
+ if manager == nil {
+ manager = &Manager{
+ RedisConnections: make(map[string]*redisClientHolder),
+ LevelDBConnections: make(map[string]*levelDBHolder),
+ }
+ }
+ return manager
+}
+
+func valToTimeDuration(vs []string) (result time.Duration) {
+ var err error
+ for _, v := range vs {
+ result, err = time.ParseDuration(v)
+ if err != nil {
+ var val int
+ val, err = strconv.Atoi(v)
+ result = time.Duration(val)
+ }
+ if err == nil {
+ return
+ }
+ }
+ return
+}
diff --git a/modules/nosql/manager_leveldb.go b/modules/nosql/manager_leveldb.go
new file mode 100644
index 0000000000..769d5002d0
--- /dev/null
+++ b/modules/nosql/manager_leveldb.go
@@ -0,0 +1,151 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+ "path"
+ "strconv"
+ "strings"
+
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/errors"
+ "github.com/syndtr/goleveldb/leveldb/opt"
+)
+
+// CloseLevelDB closes a levelDB
+func (m *Manager) CloseLevelDB(connection string) error {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ db, ok := m.LevelDBConnections[connection]
+ if !ok {
+ connection = ToLevelDBURI(connection).String()
+ db, ok = m.LevelDBConnections[connection]
+ }
+ if !ok {
+ return nil
+ }
+
+ db.count--
+ if db.count > 0 {
+ return nil
+ }
+
+ for _, name := range db.name {
+ delete(m.LevelDBConnections, name)
+ }
+ return db.db.Close()
+}
+
+// GetLevelDB gets a levelDB for a particular connection
+func (m *Manager) GetLevelDB(connection string) (*leveldb.DB, error) {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ db, ok := m.LevelDBConnections[connection]
+ if ok {
+ db.count++
+
+ return db.db, nil
+ }
+ dataDir := connection
+ uri := ToLevelDBURI(connection)
+ db = &levelDBHolder{
+ name: []string{connection, uri.String()},
+ }
+
+ dataDir = path.Join(uri.Host, uri.Path)
+ opts := &opt.Options{}
+ for k, v := range uri.Query() {
+ switch replacer.Replace(strings.ToLower(k)) {
+ case "blockcachecapacity":
+ opts.BlockCacheCapacity, _ = strconv.Atoi(v[0])
+ case "blockcacheevictremoved":
+ opts.BlockCacheEvictRemoved, _ = strconv.ParseBool(v[0])
+ case "blockrestartinterval":
+ opts.BlockRestartInterval, _ = strconv.Atoi(v[0])
+ case "blocksize":
+ opts.BlockSize, _ = strconv.Atoi(v[0])
+ case "compactionexpandlimitfactor":
+ opts.CompactionExpandLimitFactor, _ = strconv.Atoi(v[0])
+ case "compactiongpoverlapsfactor":
+ opts.CompactionGPOverlapsFactor, _ = strconv.Atoi(v[0])
+ case "compactionl0trigger":
+ opts.CompactionL0Trigger, _ = strconv.Atoi(v[0])
+ case "compactionsourcelimitfactor":
+ opts.CompactionSourceLimitFactor, _ = strconv.Atoi(v[0])
+ case "compactiontablesize":
+ opts.CompactionTableSize, _ = strconv.Atoi(v[0])
+ case "compactiontablesizemultiplier":
+ opts.CompactionTableSizeMultiplier, _ = strconv.ParseFloat(v[0], 64)
+ case "compactiontablesizemultiplierperlevel":
+ for _, val := range v {
+ f, _ := strconv.ParseFloat(val, 64)
+ opts.CompactionTableSizeMultiplierPerLevel = append(opts.CompactionTableSizeMultiplierPerLevel, f)
+ }
+ case "compactiontotalsize":
+ opts.CompactionTotalSize, _ = strconv.Atoi(v[0])
+ case "compactiontotalsizemultiplier":
+ opts.CompactionTotalSizeMultiplier, _ = strconv.ParseFloat(v[0], 64)
+ case "compactiontotalsizemultiplierperlevel":
+ for _, val := range v {
+ f, _ := strconv.ParseFloat(val, 64)
+ opts.CompactionTotalSizeMultiplierPerLevel = append(opts.CompactionTotalSizeMultiplierPerLevel, f)
+ }
+ case "compression":
+ val, _ := strconv.Atoi(v[0])
+ opts.Compression = opt.Compression(val)
+ case "disablebufferpool":
+ opts.DisableBufferPool, _ = strconv.ParseBool(v[0])
+ case "disableblockcache":
+ opts.DisableBlockCache, _ = strconv.ParseBool(v[0])
+ case "disablecompactionbackoff":
+ opts.DisableCompactionBackoff, _ = strconv.ParseBool(v[0])
+ case "disablelargebatchtransaction":
+ opts.DisableLargeBatchTransaction, _ = strconv.ParseBool(v[0])
+ case "errorifexist":
+ opts.ErrorIfExist, _ = strconv.ParseBool(v[0])
+ case "errorifmissing":
+ opts.ErrorIfMissing, _ = strconv.ParseBool(v[0])
+ case "iteratorsamplingrate":
+ opts.IteratorSamplingRate, _ = strconv.Atoi(v[0])
+ case "nosync":
+ opts.NoSync, _ = strconv.ParseBool(v[0])
+ case "nowritemerge":
+ opts.NoWriteMerge, _ = strconv.ParseBool(v[0])
+ case "openfilescachecapacity":
+ opts.OpenFilesCacheCapacity, _ = strconv.Atoi(v[0])
+ case "readonly":
+ opts.ReadOnly, _ = strconv.ParseBool(v[0])
+ case "strict":
+ val, _ := strconv.Atoi(v[0])
+ opts.Strict = opt.Strict(val)
+ case "writebuffer":
+ opts.WriteBuffer, _ = strconv.Atoi(v[0])
+ case "writel0pausetrigger":
+ opts.WriteL0PauseTrigger, _ = strconv.Atoi(v[0])
+ case "writel0slowdowntrigger":
+ opts.WriteL0SlowdownTrigger, _ = strconv.Atoi(v[0])
+ case "clientname":
+ db.name = append(db.name, v[0])
+ }
+ }
+
+ var err error
+ db.db, err = leveldb.OpenFile(dataDir, opts)
+ if err != nil {
+ if !errors.IsCorrupted(err) {
+ return nil, err
+ }
+ db.db, err = leveldb.RecoverFile(dataDir, opts)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ for _, name := range db.name {
+ m.LevelDBConnections[name] = db
+ }
+ db.count++
+ return db.db, nil
+}
diff --git a/modules/nosql/manager_redis.go b/modules/nosql/manager_redis.go
new file mode 100644
index 0000000000..7792a90112
--- /dev/null
+++ b/modules/nosql/manager_redis.go
@@ -0,0 +1,205 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+ "crypto/tls"
+ "path"
+ "strconv"
+ "strings"
+
+ "github.com/go-redis/redis/v7"
+)
+
+var replacer = strings.NewReplacer("_", "", "-", "")
+
+// CloseRedisClient closes a redis client
+func (m *Manager) CloseRedisClient(connection string) error {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ client, ok := m.RedisConnections[connection]
+ if !ok {
+ connection = ToRedisURI(connection).String()
+ client, ok = m.RedisConnections[connection]
+ }
+ if !ok {
+ return nil
+ }
+
+ client.count--
+ if client.count > 0 {
+ return nil
+ }
+
+ for _, name := range client.name {
+ delete(m.RedisConnections, name)
+ }
+ return client.UniversalClient.Close()
+}
+
+// GetRedisClient gets a redis client for a particular connection
+func (m *Manager) GetRedisClient(connection string) redis.UniversalClient {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ client, ok := m.RedisConnections[connection]
+ if ok {
+ client.count++
+ return client
+ }
+
+ uri := ToRedisURI(connection)
+ client, ok = m.RedisConnections[uri.String()]
+ if ok {
+ client.count++
+ return client
+ }
+ client = &redisClientHolder{
+ name: []string{connection, uri.String()},
+ }
+
+ opts := &redis.UniversalOptions{}
+ tlsConfig := &tls.Config{}
+
+ // Handle username/password
+ if password, ok := uri.User.Password(); ok {
+ opts.Password = password
+ // Username does not appear to be handled by redis.Options
+ opts.Username = uri.User.Username()
+ } else if uri.User.Username() != "" {
+ // assume this is the password
+ opts.Password = uri.User.Username()
+ }
+
+ // Now handle the uri query sets
+ for k, v := range uri.Query() {
+ switch replacer.Replace(strings.ToLower(k)) {
+ case "addr":
+ opts.Addrs = append(opts.Addrs, v...)
+ case "addrs":
+ opts.Addrs = append(opts.Addrs, strings.Split(v[0], ",")...)
+ case "username":
+ opts.Username = v[0]
+ case "password":
+ opts.Password = v[0]
+ case "database":
+ fallthrough
+ case "db":
+ opts.DB, _ = strconv.Atoi(v[0])
+ case "maxretries":
+ opts.MaxRetries, _ = strconv.Atoi(v[0])
+ case "minretrybackoff":
+ opts.MinRetryBackoff = valToTimeDuration(v)
+ case "maxretrybackoff":
+ opts.MaxRetryBackoff = valToTimeDuration(v)
+ case "timeout":
+ timeout := valToTimeDuration(v)
+ if timeout != 0 {
+ if opts.DialTimeout == 0 {
+ opts.DialTimeout = timeout
+ }
+ if opts.ReadTimeout == 0 {
+ opts.ReadTimeout = timeout
+ }
+ }
+ case "dialtimeout":
+ opts.DialTimeout = valToTimeDuration(v)
+ case "readtimeout":
+ opts.ReadTimeout = valToTimeDuration(v)
+ case "writetimeout":
+ opts.WriteTimeout = valToTimeDuration(v)
+ case "poolsize":
+ opts.PoolSize, _ = strconv.Atoi(v[0])
+ case "minidleconns":
+ opts.MinIdleConns, _ = strconv.Atoi(v[0])
+ case "pooltimeout":
+ opts.PoolTimeout = valToTimeDuration(v)
+ case "idletimeout":
+ opts.IdleTimeout = valToTimeDuration(v)
+ case "idlecheckfrequency":
+ opts.IdleCheckFrequency = valToTimeDuration(v)
+ case "maxredirects":
+ opts.MaxRedirects, _ = strconv.Atoi(v[0])
+ case "readonly":
+ opts.ReadOnly, _ = strconv.ParseBool(v[0])
+ case "routebylatency":
+ opts.RouteByLatency, _ = strconv.ParseBool(v[0])
+ case "routerandomly":
+ opts.RouteRandomly, _ = strconv.ParseBool(v[0])
+ case "sentinelmasterid":
+ fallthrough
+ case "mastername":
+ opts.MasterName = v[0]
+ case "skipverify":
+ fallthrough
+ case "insecureskipverify":
+ insecureSkipVerify, _ := strconv.ParseBool(v[0])
+ tlsConfig.InsecureSkipVerify = insecureSkipVerify
+ case "clientname":
+ client.name = append(client.name, v[0])
+ }
+ }
+
+ switch uri.Scheme {
+ case "redis+sentinels":
+ fallthrough
+ case "rediss+sentinel":
+ opts.TLSConfig = tlsConfig
+ fallthrough
+ case "redis+sentinel":
+ if uri.Host != "" {
+ opts.Addrs = append(opts.Addrs, strings.Split(uri.Host, ",")...)
+ }
+ if uri.Path != "" {
+ if db, err := strconv.Atoi(uri.Path); err == nil {
+ opts.DB = db
+ }
+ }
+
+ client.UniversalClient = redis.NewFailoverClient(opts.Failover())
+ case "redis+clusters":
+ fallthrough
+ case "rediss+cluster":
+ opts.TLSConfig = tlsConfig
+ fallthrough
+ case "redis+cluster":
+ if uri.Host != "" {
+ opts.Addrs = append(opts.Addrs, strings.Split(uri.Host, ",")...)
+ }
+ if uri.Path != "" {
+ if db, err := strconv.Atoi(uri.Path); err == nil {
+ opts.DB = db
+ }
+ }
+ client.UniversalClient = redis.NewClusterClient(opts.Cluster())
+ case "redis+socket":
+ simpleOpts := opts.Simple()
+ simpleOpts.Network = "unix"
+ simpleOpts.Addr = path.Join(uri.Host, uri.Path)
+ client.UniversalClient = redis.NewClient(simpleOpts)
+ case "rediss":
+ opts.TLSConfig = tlsConfig
+ fallthrough
+ case "redis":
+ if uri.Host != "" {
+ opts.Addrs = append(opts.Addrs, strings.Split(uri.Host, ",")...)
+ }
+ if uri.Path != "" {
+ if db, err := strconv.Atoi(uri.Path); err == nil {
+ opts.DB = db
+ }
+ }
+ client.UniversalClient = redis.NewClient(opts.Simple())
+ default:
+ return nil
+ }
+
+ for _, name := range client.name {
+ m.RedisConnections[name] = client
+ }
+
+ client.count++
+
+ return client
+}
diff --git a/modules/nosql/redis.go b/modules/nosql/redis.go
new file mode 100644
index 0000000000..528f5fc802
--- /dev/null
+++ b/modules/nosql/redis.go
@@ -0,0 +1,102 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+ "net/url"
+ "strconv"
+ "strings"
+)
+
+// The file contains common redis connection functions
+
+// ToRedisURI converts old style connections to a RedisURI
+//
+// A RedisURI matches the pattern:
+//
+// redis://[username:password@]host[:port][/database][?[option=value]*]
+// rediss://[username:password@]host[:port][/database][?[option=value]*]
+// redis+socket://[username:password@]path[/database][?[option=value]*]
+// redis+sentinel://[password@]host1 [: port1][, host2 [:port2]][, hostN [:portN]][/ database][?[option=value]*]
+// redis+cluster://[password@]host1 [: port1][, host2 [:port2]][, hostN [:portN]][/ database][?[option=value]*]
+//
+// We have previously used a URI like:
+// addrs=127.0.0.1:6379 db=0
+// network=tcp,addr=127.0.0.1:6379,password=macaron,db=0,pool_size=100,idle_timeout=180
+//
+// We need to convert this old style to the new style
+func ToRedisURI(connection string) *url.URL {
+ uri, err := url.Parse(connection)
+ if err == nil && strings.HasPrefix(uri.Scheme, "redis") {
+ // OK we're going to assume that this is a reasonable redis URI
+ return uri
+ }
+
+ // Let's set a nice default
+ uri, _ = url.Parse("redis://127.0.0.1:6379/0")
+ network := "tcp"
+ query := uri.Query()
+
+ // OK so there are two types: Space delimited and Comma delimited
+ // Let's assume that we have a space delimited string - as this is the most common
+ fields := strings.Fields(connection)
+ if len(fields) == 1 {
+ // It's a comma delimited string, then...
+ fields = strings.Split(connection, ",")
+
+ }
+ for _, f := range fields {
+ items := strings.SplitN(f, "=", 2)
+ if len(items) < 2 {
+ continue
+ }
+ switch strings.ToLower(items[0]) {
+ case "network":
+ if items[1] == "unix" {
+ uri.Scheme = "redis+socket"
+ }
+ network = items[1]
+ case "addrs":
+ uri.Host = items[1]
+ // now we need to handle the clustering
+ if strings.Contains(items[1], ",") && network == "tcp" {
+ uri.Scheme = "redis+cluster"
+ }
+ case "addr":
+ uri.Host = items[1]
+ case "password":
+ uri.User = url.UserPassword(uri.User.Username(), items[1])
+ case "username":
+ password, set := uri.User.Password()
+ if !set {
+ uri.User = url.User(items[1])
+ } else {
+ uri.User = url.UserPassword(items[1], password)
+ }
+ case "db":
+ uri.Path = "/" + items[1]
+ case "idle_timeout":
+ _, err := strconv.Atoi(items[1])
+ if err == nil {
+ query.Add("idle_timeout", items[1]+"s")
+ } else {
+ query.Add("idle_timeout", items[1])
+ }
+ default:
+ // Other options become query params
+ query.Add(items[0], items[1])
+ }
+ }
+
+ // Finally we need to fix up the Host if we have a unix port
+ if uri.Scheme == "redis+socket" {
+ query.Set("db", uri.Path)
+ uri.Path = uri.Host
+ uri.Host = ""
+ }
+ uri.RawQuery = query.Encode()
+
+ return uri
+}
diff --git a/modules/nosql/redis_test.go b/modules/nosql/redis_test.go
new file mode 100644
index 0000000000..c70d236bdc
--- /dev/null
+++ b/modules/nosql/redis_test.go
@@ -0,0 +1,35 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+ "testing"
+)
+
+func TestToRedisURI(t *testing.T) {
+ tests := []struct {
+ name string
+ connection string
+ want string
+ }{
+ {
+ name: "old_default",
+ connection: "addrs=127.0.0.1:6379 db=0",
+ want: "redis://127.0.0.1:6379/0",
+ },
+ {
+ name: "old_macaron_session_default",
+ connection: "network=tcp,addr=127.0.0.1:6379,password=macaron,db=0,pool_size=100,idle_timeout=180",
+ want: "redis://:macaron@127.0.0.1:6379/0?idle_timeout=180s&pool_size=100",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := ToRedisURI(tt.connection); got == nil || got.String() != tt.want {
+ t.Errorf(`ToRedisURI(%q) = %s, want %s`, tt.connection, got.String(), tt.want)
+ }
+ })
+ }
+}