source.dussan.org Git - gitea.git/commitdiff
Introduce globallock as distributed locks (#31908)
authorJason Song <i@wolfogre.com>
Mon, 26 Aug 2024 14:27:57 +0000 (22:27 +0800)
committerGitHub <noreply@github.com>
Mon, 26 Aug 2024 14:27:57 +0000 (22:27 +0800)
This PR helps with #31813 but does not replace it, since it only
introduces the new module and still misses some work:

- A new option in settings. #31813 has done it.
- Using the locks in business logic. #31813 has done it.

So I think the most efficient way is to merge this PR first (if it's
acceptable) and then finish #31813.

## Design principles

### Use spinlock even in memory implementation

In actual use cases, users may cancel requests. `sync.Mutex` blocks the
goroutine until the lock is acquired, even if the request has been
canceled. A spinlock is more suitable for this scenario, since it makes
it possible to give up the lock acquisition.

Although the spinlock consumes more CPU resources, I think it's
acceptable in most cases.

### Do not expose the mutex to callers

If we expose the mutex to callers, it's possible for callers to reuse
the mutex, which causes more complexity.

For example:
```go
lock := GetLocker(key)
lock.Lock()
// ...
// even if the lock is unlocked, we cannot GC the lock,
// since the caller may still use it again.
lock.Unlock()
lock.Lock()
// ...
lock.Unlock()

// callers have to GC the lock manually.
RemoveLocker(key)
```

That's why this design was chosen; see the discussion:
https://github.com/go-gitea/gitea/pull/31813#discussion_r1721200549

In this PR, we only expose `ReleaseFunc` to callers. So callers just
need to call `ReleaseFunc` to release the lock, and do not need to care
about the lock's lifecycle.
```go
_, release, err := locker.Lock(ctx, key)
if err != nil {
    return err
}
// ...
release()

// if callers want to lock again, they have to re-acquire the lock.
_, release, err = locker.Lock(ctx, key)
// ...
```

This also makes it much easier for the redis implementation to extend
the lock automatically, so that callers do not need to care about the
lock's lifecycle. See also
https://github.com/go-gitea/gitea/pull/31813#discussion_r1722659743

### Use "release" instead of "unlock"

"Unlock" carries the meaning of "unlock an acquired lock". So it's not
acceptable to call "unlock" when the lock was never acquired, or to
call "unlock" multiple times. That forces callers to work out whether
calling "unlock" is allowed, which adds complexity.

So we use "release" instead of "unlock" to make it clear. Whether the
lock is acquired or not, callers can always call "release", and it's
also safe to call "release" multiple times.

That said, the code still expects callers to call "release" after
acquiring the lock; forgetting to do so causes a resource leak. Making
"release" always safe to call without extra checks is precisely meant
to help callers avoid forgetting it.
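The "always safe to call" property can be implemented with `sync.Once`, which is how the lockers in this module guard their cleanup. A minimal sketch (the helper name is illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

// makeRelease wraps cleanup in sync.Once, so the returned function
// can be called any number of times but runs the cleanup at most once.
func makeRelease(cleanup func()) func() {
	var once sync.Once
	return func() { once.Do(cleanup) }
}

func main() {
	calls := 0
	release := makeRelease(func() { calls++ })

	release()
	release() // second call is a no-op

	fmt.Println(calls) // prints 1
}
```

The same trick covers the "failed to acquire" case: the locker can hand back a release function whose cleanup is a no-op, and callers never need to branch on whether the lock was actually held.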

### Acquired locks could be lost

Unlike `sync.Mutex` which will be locked forever once acquired until
calling `Unlock`, in the new module, the acquired lock could be lost.

For example, a caller has acquired the lock and holds it for a long
time, relying on the redis implementation's auto-extension. If it then
loses the connection to the redis server, it becomes impossible to
extend the lock, and the lock eventually expires.

If the caller doesn't stop what it's doing, another instance that can
connect to the redis server could acquire the lock and do the same
work, which could cause data inconsistency.

So the caller should know what happened. The solution is to return a new
context which will be canceled if the lock is lost or released:

```go
ctx, release, err := locker.Lock(ctx, key)
if err != nil {
    return err
}
defer release()
// ...
DoSomething(ctx)

// the lock is lost now, then ctx has been canceled.

// Failed, since ctx has been canceled.
DoSomethingElse(ctx)
```

### Multiple ways to use the lock

1. Regular way

```go
ctx, release, err := Lock(ctx, key)
if err != nil {
    return err
}
defer release()
// ...
```

2. Early release

```go
ctx, release, err := Lock(ctx, key)
if err != nil {
    return err
}
defer release()
// ...
// release the lock earlier and reset the context back
ctx = release()
// continue to do something else
// ...
```

3. Functional way

```go
if err := LockAndDo(ctx, key, func(ctx context.Context) error {
    // ...
    return nil
}); err != nil {
    return err
}
```

go.mod
go.sum
modules/globallock/globallock.go [new file with mode: 0644]
modules/globallock/globallock_test.go [new file with mode: 0644]
modules/globallock/locker.go [new file with mode: 0644]
modules/globallock/locker_test.go [new file with mode: 0644]
modules/globallock/memory_locker.go [new file with mode: 0644]
modules/globallock/redis_locker.go [new file with mode: 0644]

diff --git a/go.mod b/go.mod
index f5c189893f4d5b4aefda3df20130ee45655d5c15..69695fa178733045d8aa9c5943e0a921b76b5542 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -49,6 +49,7 @@ require (
        github.com/go-git/go-billy/v5 v5.5.0
        github.com/go-git/go-git/v5 v5.12.0
        github.com/go-ldap/ldap/v3 v3.4.6
+       github.com/go-redsync/redsync/v4 v4.13.0
        github.com/go-sql-driver/mysql v1.8.1
        github.com/go-swagger/go-swagger v0.31.0
        github.com/go-testfixtures/testfixtures/v3 v3.11.0
@@ -218,7 +219,9 @@ require (
        github.com/gorilla/mux v1.8.1 // indirect
        github.com/gorilla/securecookie v1.1.2 // indirect
        github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect
+       github.com/hashicorp/errwrap v1.1.0 // indirect
        github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
+       github.com/hashicorp/go-multierror v1.1.1 // indirect
        github.com/hashicorp/go-retryablehttp v0.7.7 // indirect
        github.com/hashicorp/hcl v1.0.0 // indirect
        github.com/imdario/mergo v0.3.16 // indirect
diff --git a/go.sum b/go.sum
index f1780fada798152dfff50efbf8a76d4cbb66f5f3..510ef8479de1811213173eb6ad374e5da0657c86 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -342,6 +342,14 @@ github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ
 github.com/go-openapi/validate v0.24.0 h1:LdfDKwNbpB6Vn40xhTdNZAnfLECL81w+VX3BumrGD58=
 github.com/go-openapi/validate v0.24.0/go.mod h1:iyeX1sEufmv3nPbBdX3ieNviWnOZaJ1+zquzJEf2BAQ=
 github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
+github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
+github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
+github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOrTI=
+github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
+github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
+github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
+github.com/go-redsync/redsync/v4 v4.13.0 h1:49X6GJfnbLGaIpBBREM/zA4uIMDXKAh1NDkvQ1EkZKA=
+github.com/go-redsync/redsync/v4 v4.13.0/go.mod h1:HMW4Q224GZQz6x1Xc7040Yfgacukdzu7ifTDAKiyErQ=
 github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
 github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
 github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
@@ -397,6 +405,8 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
 github.com/golang/snappy v0.0.2/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
 github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws=
+github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
 github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU=
 github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
@@ -449,10 +459,15 @@ github.com/h2non/gock v1.2.0 h1:K6ol8rfrRkUOefooBC8elXoaNGYkpp7y2qcxGG6BzUE=
 github.com/h2non/gock v1.2.0/go.mod h1:tNhoxHYW2W42cYkYb1WqzdbYIieALC99kpYr7rH/BQk=
 github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw=
 github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI=
+github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
+github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
+github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
 github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
 github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48=
 github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k=
 github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
+github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
+github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
 github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISHxT2Q8+VepXU=
 github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk=
 github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
@@ -674,6 +689,8 @@ github.com/quasoft/websspi v1.1.2/go.mod h1:HmVdl939dQ0WIXZhyik+ARdI03M6bQzaSEKc
 github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
 github.com/redis/go-redis/v9 v9.6.0 h1:NLck+Rab3AOTHw21CGRpvQpgTrAU4sgdCswqGtlhGRA=
 github.com/redis/go-redis/v9 v9.6.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
+github.com/redis/rueidis v1.0.19 h1:s65oWtotzlIFN8eMPhyYwxlwLR1lUdhza2KtWprKYSo=
+github.com/redis/rueidis v1.0.19/go.mod h1:8B+r5wdnjwK3lTFml5VtxjzGOQAC+5UmujoD12pDrEo=
 github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk=
 github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
 github.com/rhysd/actionlint v1.7.1 h1:WJaDzyT1StBWVKGSsZPYnbV0HF9Y9/vD6KFdZQL42qE=
@@ -765,6 +782,8 @@ github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
 github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
 github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
 github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM=
+github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8=
 github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
 github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
 github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE=
diff --git a/modules/globallock/globallock.go b/modules/globallock/globallock.go
new file mode 100644 (file)
index 0000000..707d169
--- /dev/null
@@ -0,0 +1,66 @@
+// Copyright 2024 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package globallock
+
+import (
+       "context"
+       "sync"
+)
+
+var (
+       defaultLocker Locker
+       initOnce      sync.Once
+       initFunc      = func() {
+               // TODO: read the setting and initialize the default locker.
+               //       Before implementing this, don't use it.
+       } // define initFunc as a variable to make it possible to change it in tests
+)
+
+// DefaultLocker returns the default locker.
+func DefaultLocker() Locker {
+       initOnce.Do(func() {
+               initFunc()
+       })
+       return defaultLocker
+}
+
+// Lock tries to acquire a lock for the given key, it uses the default locker.
+// Read the documentation of Locker.Lock for more information about the behavior.
+func Lock(ctx context.Context, key string) (context.Context, ReleaseFunc, error) {
+       return DefaultLocker().Lock(ctx, key)
+}
+
+// TryLock tries to acquire a lock for the given key, it uses the default locker.
+// Read the documentation of Locker.TryLock for more information about the behavior.
+func TryLock(ctx context.Context, key string) (bool, context.Context, ReleaseFunc, error) {
+       return DefaultLocker().TryLock(ctx, key)
+}
+
+// LockAndDo tries to acquire a lock for the given key and then calls the given function.
+// It uses the default locker, and it will return an error if failed to acquire the lock.
+func LockAndDo(ctx context.Context, key string, f func(context.Context) error) error {
+       ctx, release, err := Lock(ctx, key)
+       if err != nil {
+               return err
+       }
+       defer release()
+
+       return f(ctx)
+}
+
+// TryLockAndDo tries to acquire a lock for the given key and then calls the given function.
+// It uses the default locker, and it will return false if failed to acquire the lock.
+func TryLockAndDo(ctx context.Context, key string, f func(context.Context) error) (bool, error) {
+       ok, ctx, release, err := TryLock(ctx, key)
+       if err != nil {
+               return false, err
+       }
+       defer release()
+
+       if !ok {
+               return false, nil
+       }
+
+       return true, f(ctx)
+}
diff --git a/modules/globallock/globallock_test.go b/modules/globallock/globallock_test.go
new file mode 100644 (file)
index 0000000..88a555c
--- /dev/null
@@ -0,0 +1,96 @@
+// Copyright 2024 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package globallock
+
+import (
+       "context"
+       "os"
+       "sync"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func TestLockAndDo(t *testing.T) {
+       t.Run("redis", func(t *testing.T) {
+               url := "redis://127.0.0.1:6379/0"
+               if os.Getenv("CI") == "" {
+                       // Make it possible to run tests against a local redis instance
+                       url = os.Getenv("TEST_REDIS_URL")
+                       if url == "" {
+                               t.Skip("TEST_REDIS_URL not set and not running in CI")
+                               return
+                       }
+               }
+
+               oldDefaultLocker := defaultLocker
+               oldInitFunc := initFunc
+               defer func() {
+                       defaultLocker = oldDefaultLocker
+                       initFunc = oldInitFunc
+                       if defaultLocker == nil {
+                               initOnce = sync.Once{}
+                       }
+               }()
+
+               initOnce = sync.Once{}
+               initFunc = func() {
+                       defaultLocker = NewRedisLocker(url)
+               }
+
+               testLockAndDo(t)
+               require.NoError(t, defaultLocker.(*redisLocker).Close())
+       })
+       t.Run("memory", func(t *testing.T) {
+               oldDefaultLocker := defaultLocker
+               oldInitFunc := initFunc
+               defer func() {
+                       defaultLocker = oldDefaultLocker
+                       initFunc = oldInitFunc
+                       if defaultLocker == nil {
+                               initOnce = sync.Once{}
+                       }
+               }()
+
+               initOnce = sync.Once{}
+               initFunc = func() {
+                       defaultLocker = NewMemoryLocker()
+               }
+
+               testLockAndDo(t)
+       })
+}
+
+func testLockAndDo(t *testing.T) {
+       const concurrency = 1000
+
+       ctx := context.Background()
+       count := 0
+       wg := sync.WaitGroup{}
+       wg.Add(concurrency)
+       for i := 0; i < concurrency; i++ {
+               go func() {
+                       defer wg.Done()
+                       err := LockAndDo(ctx, "test", func(ctx context.Context) error {
+                               count++
+
+                               // It's impossible to acquire the lock inside the function
+                               ok, err := TryLockAndDo(ctx, "test", func(ctx context.Context) error {
+                                       assert.Fail(t, "should not acquire the lock")
+                                       return nil
+                               })
+                               assert.False(t, ok)
+                               assert.NoError(t, err)
+
+                               return nil
+                       })
+                       require.NoError(t, err)
+               }()
+       }
+
+       wg.Wait()
+
+       assert.Equal(t, concurrency, count)
+}
diff --git a/modules/globallock/locker.go b/modules/globallock/locker.go
new file mode 100644 (file)
index 0000000..b0764cd
--- /dev/null
@@ -0,0 +1,60 @@
+// Copyright 2024 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package globallock
+
+import (
+       "context"
+       "fmt"
+)
+
+type Locker interface {
+       // Lock tries to acquire a lock for the given key, it blocks until the lock is acquired or the context is canceled.
+       //
+       // Lock returns a new context which should be used in the following code.
+       // The new context will be canceled when the lock is released or lost - yes, it's possible to lose a lock.
+       // For example, it lost the connection to the redis server while holding the lock.
+       // If it fails to acquire the lock, the returned context will be the same as the input context.
+       //
+       // Lock returns a ReleaseFunc to release the lock, it cannot be nil.
+       // It's always safe to call this function even if it fails to acquire the lock, and it will do nothing in that case.
+       // And it's also safe to call it multiple times, but it will only release the lock once.
+       // That's why it's called ReleaseFunc, not UnlockFunc.
+       // But be aware that it's not safe to not call it at all; it could lead to a memory leak.
+       // So a recommended pattern is to use defer to call it:
+       //   ctx, release, err := locker.Lock(ctx, "key")
+       //   if err != nil {
+       //     return err
+       //   }
+       //   defer release()
+       // The ReleaseFunc will return the original context which was used to acquire the lock.
+       // It's useful when you want to continue to do something after releasing the lock.
+       // At that time, the ctx will be canceled, and you can use the returned context by the ReleaseFunc to continue:
+       //   ctx, release, err := locker.Lock(ctx, "key")
+       //   if err != nil {
+       //     return err
+       //   }
+       //   defer release()
+       //   doSomething(ctx)
+       //   ctx = release()
+       //   doSomethingElse(ctx)
+       // Please ignore it and use `defer release()` instead if you don't need this, to avoid forgetting to release the lock.
+       //
+       // Lock returns an error if failed to acquire the lock.
+       // Be aware that even if the context is not canceled, it's still possible to fail to acquire the lock.
+       // For example, redis is down, or it reached the maximum number of tries.
+       Lock(ctx context.Context, key string) (context.Context, ReleaseFunc, error)
+
+       // TryLock tries to acquire a lock for the given key, it returns immediately.
+       // It follows the same pattern as Lock, but it doesn't block.
+       // And if it fails to acquire the lock because it's already locked, not other reasons like redis is down,
+       // it will return false without any error.
+       TryLock(ctx context.Context, key string) (bool, context.Context, ReleaseFunc, error)
+}
+
+// ReleaseFunc is a function that releases a lock.
+// It returns the original context which was used to acquire the lock.
+type ReleaseFunc func() context.Context
+
+// ErrLockReleased is used as context cause when a lock is released
+var ErrLockReleased = fmt.Errorf("lock released")
diff --git a/modules/globallock/locker_test.go b/modules/globallock/locker_test.go
new file mode 100644 (file)
index 0000000..15a3c65
--- /dev/null
@@ -0,0 +1,211 @@
+// Copyright 2024 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package globallock
+
+import (
+       "context"
+       "os"
+       "sync"
+       "testing"
+       "time"
+
+       "github.com/go-redsync/redsync/v4"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func TestLocker(t *testing.T) {
+       t.Run("redis", func(t *testing.T) {
+               url := "redis://127.0.0.1:6379/0"
+               if os.Getenv("CI") == "" {
+                       // Make it possible to run tests against a local redis instance
+                       url = os.Getenv("TEST_REDIS_URL")
+                       if url == "" {
+                               t.Skip("TEST_REDIS_URL not set and not running in CI")
+                               return
+                       }
+               }
+               oldExpiry := redisLockExpiry
+               redisLockExpiry = 5 * time.Second // make it shorter for testing
+               defer func() {
+                       redisLockExpiry = oldExpiry
+               }()
+
+               locker := NewRedisLocker(url)
+               testLocker(t, locker)
+               testRedisLocker(t, locker.(*redisLocker))
+               require.NoError(t, locker.(*redisLocker).Close())
+       })
+       t.Run("memory", func(t *testing.T) {
+               locker := NewMemoryLocker()
+               testLocker(t, locker)
+               testMemoryLocker(t, locker.(*memoryLocker))
+       })
+}
+
+func testLocker(t *testing.T, locker Locker) {
+       t.Run("lock", func(t *testing.T) {
+               parentCtx := context.Background()
+               ctx, release, err := locker.Lock(parentCtx, "test")
+               defer release()
+
+               assert.NotEqual(t, parentCtx, ctx) // new context should be returned
+               assert.NoError(t, err)
+
+               func() {
+                       parentCtx, cancel := context.WithTimeout(context.Background(), time.Second)
+                       defer cancel()
+                       ctx, release, err := locker.Lock(parentCtx, "test")
+                       defer release()
+
+                       assert.Error(t, err)
+                       assert.Equal(t, parentCtx, ctx) // should return the same context
+               }()
+
+               release()
+               assert.Error(t, ctx.Err())
+
+               func() {
+                       _, release, err := locker.Lock(context.Background(), "test")
+                       defer release()
+
+                       assert.NoError(t, err)
+               }()
+       })
+
+       t.Run("try lock", func(t *testing.T) {
+               parentCtx := context.Background()
+               ok, ctx, release, err := locker.TryLock(parentCtx, "test")
+               defer release()
+
+               assert.True(t, ok)
+               assert.NotEqual(t, parentCtx, ctx) // new context should be returned
+               assert.NoError(t, err)
+
+               func() {
+                       parentCtx, cancel := context.WithTimeout(context.Background(), time.Second)
+                       defer cancel()
+                       ok, ctx, release, err := locker.TryLock(parentCtx, "test")
+                       defer release()
+
+                       assert.False(t, ok)
+                       assert.NoError(t, err)
+                       assert.Equal(t, parentCtx, ctx) // should return the same context
+               }()
+
+               release()
+               assert.Error(t, ctx.Err())
+
+               func() {
+                       ok, _, release, _ := locker.TryLock(context.Background(), "test")
+                       defer release()
+
+                       assert.True(t, ok)
+               }()
+       })
+
+       t.Run("wait and acquired", func(t *testing.T) {
+               ctx := context.Background()
+               _, release, err := locker.Lock(ctx, "test")
+               require.NoError(t, err)
+
+               wg := &sync.WaitGroup{}
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       started := time.Now()
+                       _, release, err := locker.Lock(context.Background(), "test") // should be blocked for seconds
+                       defer release()
+                       assert.Greater(t, time.Since(started), time.Second)
+                       assert.NoError(t, err)
+               }()
+
+               time.Sleep(2 * time.Second)
+               release()
+
+               wg.Wait()
+       })
+
+       t.Run("continue after release", func(t *testing.T) {
+               ctx := context.Background()
+
+               ctxBeforeLock := ctx
+               ctx, release, err := locker.Lock(ctx, "test")
+
+               require.NoError(t, err)
+               assert.NoError(t, ctx.Err())
+               assert.NotEqual(t, ctxBeforeLock, ctx)
+
+               ctxBeforeRelease := ctx
+               ctx = release()
+
+               assert.NoError(t, ctx.Err())
+               assert.Error(t, ctxBeforeRelease.Err())
+
+               // so it can continue with ctx to do more work
+       })
+
+       t.Run("multiple release", func(t *testing.T) {
+               ctx := context.Background()
+
+               _, release1, err := locker.Lock(ctx, "test")
+               require.NoError(t, err)
+
+               release1()
+
+               _, release2, err := locker.Lock(ctx, "test")
+               defer release2()
+               require.NoError(t, err)
+
+               // Call release1 again,
+               // it should not panic or block,
+               // and it shouldn't affect the other lock
+               release1()
+
+               ok, _, release3, err := locker.TryLock(ctx, "test")
+               defer release3()
+               require.NoError(t, err)
+               // It should NOT be able to acquire the lock;
+               // otherwise, it means the lock was released by release1
+               assert.False(t, ok)
+       })
+}
+
+// testMemoryLocker does specific tests for memoryLocker
+func testMemoryLocker(t *testing.T, locker *memoryLocker) {
+       // nothing to do
+}
+
+// testRedisLocker does specific tests for redisLocker
+func testRedisLocker(t *testing.T, locker *redisLocker) {
+       defer func() {
+               // This case should be tested at the end.
+               // Otherwise, it will affect other tests.
+               t.Run("close", func(t *testing.T) {
+                       assert.NoError(t, locker.Close())
+                       _, _, err := locker.Lock(context.Background(), "test")
+                       assert.Error(t, err)
+               })
+       }()
+
+       t.Run("failed extend", func(t *testing.T) {
+               ctx, release, err := locker.Lock(context.Background(), "test")
+               defer release()
+               require.NoError(t, err)
+
+               // It simulates that there are some problems with extending like network issues or redis server down.
+               v, ok := locker.mutexM.Load("test")
+               require.True(t, ok)
+               m := v.(*redisMutex)
+               _, _ = m.mutex.Unlock() // release it to make it impossible to extend
+
+               select {
+               case <-time.After(redisLockExpiry + time.Second):
+                       t.Errorf("lock should be expired")
+               case <-ctx.Done():
+                       var errTaken *redsync.ErrTaken
+                       assert.ErrorAs(t, context.Cause(ctx), &errTaken)
+               }
+       })
+}
diff --git a/modules/globallock/memory_locker.go b/modules/globallock/memory_locker.go
new file mode 100644 (file)
index 0000000..fb1fc79
--- /dev/null
@@ -0,0 +1,80 @@
+// Copyright 2024 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package globallock
+
+import (
+       "context"
+       "sync"
+       "time"
+)
+
+type memoryLocker struct {
+       locks sync.Map
+}
+
+var _ Locker = &memoryLocker{}
+
+func NewMemoryLocker() Locker {
+       return &memoryLocker{}
+}
+
+func (l *memoryLocker) Lock(ctx context.Context, key string) (context.Context, ReleaseFunc, error) {
+       originalCtx := ctx
+
+       if l.tryLock(key) {
+               ctx, cancel := context.WithCancelCause(ctx)
+               releaseOnce := sync.Once{}
+               return ctx, func() context.Context {
+                       releaseOnce.Do(func() {
+                               l.locks.Delete(key)
+                               cancel(ErrLockReleased)
+                       })
+                       return originalCtx
+               }, nil
+       }
+
+       ticker := time.NewTicker(time.Millisecond * 100)
+       defer ticker.Stop()
+       for {
+               select {
+               case <-ctx.Done():
+                       return ctx, func() context.Context { return originalCtx }, ctx.Err()
+               case <-ticker.C:
+                       if l.tryLock(key) {
+                               ctx, cancel := context.WithCancelCause(ctx)
+                               releaseOnce := sync.Once{}
+                               return ctx, func() context.Context {
+                                       releaseOnce.Do(func() {
+                                               l.locks.Delete(key)
+                                               cancel(ErrLockReleased)
+                                       })
+                                       return originalCtx
+                               }, nil
+                       }
+               }
+       }
+}
+
+func (l *memoryLocker) TryLock(ctx context.Context, key string) (bool, context.Context, ReleaseFunc, error) {
+       originalCtx := ctx
+
+       if l.tryLock(key) {
+               ctx, cancel := context.WithCancelCause(ctx)
+               releaseOnce := sync.Once{}
+               return true, ctx, func() context.Context {
+                       releaseOnce.Do(func() {
+                               cancel(ErrLockReleased)
+                               l.locks.Delete(key)
+                       })
+                       return originalCtx
+               }, nil
+       }
+
+       return false, ctx, func() context.Context { return originalCtx }, nil
+}
+
+func (l *memoryLocker) tryLock(key string) bool {
+       _, loaded := l.locks.LoadOrStore(key, struct{}{})
+       return !loaded
+}
diff --git a/modules/globallock/redis_locker.go b/modules/globallock/redis_locker.go
new file mode 100644 (file)
index 0000000..34b2fab
--- /dev/null
@@ -0,0 +1,154 @@
+// Copyright 2024 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package globallock
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "code.gitea.io/gitea/modules/nosql"
+
+       "github.com/go-redsync/redsync/v4"
+       "github.com/go-redsync/redsync/v4/redis/goredis/v9"
+)
+
+const redisLockKeyPrefix = "gitea:globallock:"
+
+// redisLockExpiry is the default expiry time for a lock.
+// Define it as a variable to make it possible to change it in tests.
+var redisLockExpiry = 30 * time.Second
+
+type redisLocker struct {
+       rs *redsync.Redsync
+
+       mutexM   sync.Map
+       closed   atomic.Bool
+       extendWg sync.WaitGroup
+}
+
+var _ Locker = &redisLocker{}
+
+func NewRedisLocker(connection string) Locker {
+       l := &redisLocker{
+               rs: redsync.New(
+                       goredis.NewPool(
+                               nosql.GetManager().GetRedisClient(connection),
+                       ),
+               ),
+       }
+
+       l.extendWg.Add(1)
+       l.startExtend()
+
+       return l
+}
+
+func (l *redisLocker) Lock(ctx context.Context, key string) (context.Context, ReleaseFunc, error) {
+       return l.lock(ctx, key, 0)
+}
+
+func (l *redisLocker) TryLock(ctx context.Context, key string) (bool, context.Context, ReleaseFunc, error) {
+       ctx, f, err := l.lock(ctx, key, 1)
+
+       var (
+               errTaken     *redsync.ErrTaken
+               errNodeTaken *redsync.ErrNodeTaken
+       )
+       if errors.As(err, &errTaken) || errors.As(err, &errNodeTaken) {
+               return false, ctx, f, nil
+       }
+       return err == nil, ctx, f, err
+}
+
+// Close closes the locker.
+// It stops extending the locks and refuses to acquire new ones.
+// Calling it is not necessary in normal use, but it is useful in tests to release resources.
+// It can take some time, since it waits for the extending goroutine to finish.
+func (l *redisLocker) Close() error {
+       l.closed.Store(true)
+       l.extendWg.Wait()
+       return nil
+}
+
+type redisMutex struct {
+       mutex  *redsync.Mutex
+       cancel context.CancelCauseFunc
+}
+
+func (l *redisLocker) lock(ctx context.Context, key string, tries int) (context.Context, ReleaseFunc, error) {
+       if l.closed.Load() {
+               return ctx, func() context.Context { return ctx }, errors.New("locker is closed")
+       }
+
+       originalCtx := ctx
+
+       options := []redsync.Option{
+               redsync.WithExpiry(redisLockExpiry),
+       }
+       if tries > 0 {
+               options = append(options, redsync.WithTries(tries))
+       }
+       mutex := l.rs.NewMutex(redisLockKeyPrefix+key, options...)
+       if err := mutex.LockContext(ctx); err != nil {
+               return ctx, func() context.Context { return originalCtx }, err
+       }
+
+       ctx, cancel := context.WithCancelCause(ctx)
+
+       l.mutexM.Store(key, &redisMutex{
+               mutex:  mutex,
+               cancel: cancel,
+       })
+
+       releaseOnce := sync.Once{}
+       return ctx, func() context.Context {
+               releaseOnce.Do(func() {
+                       l.mutexM.Delete(key)
+
+                       // It's safe to ignore the error here: if unlocking fails,
+                       // the lock will be released automatically once it expires.
+                       // Do not call mutex.UnlockContext(ctx) here, or releasing will fail when ctx has already timed out.
+                       _, _ = mutex.Unlock()
+
+                       cancel(ErrLockReleased)
+               })
+               return originalCtx
+       }, nil
+}
+
+func (l *redisLocker) startExtend() {
+       if l.closed.Load() {
+               l.extendWg.Done()
+               return
+       }
+
+       toExtend := make([]*redisMutex, 0)
+       l.mutexM.Range(func(_, value any) bool {
+               m := value.(*redisMutex)
+
+               // Extend the lock only if it has not expired.
+               // Although a mutex is removed from the map before it is released,
+               // it can still have expired because of a failed extension.
+               // In that case, the cancel function has already been called,
+               // so the mutex does not need to be extended anymore.
+               if time.Now().After(m.mutex.Until()) {
+                       return true
+               }
+
+               toExtend = append(toExtend, m)
+               return true
+       })
+       for _, v := range toExtend {
+               if ok, err := v.mutex.Extend(); !ok {
+                       v.cancel(err)
+               }
+       }
+
+       time.AfterFunc(redisLockExpiry/2, l.startExtend)
+}