diff options
author | wxiaoguang <wxiaoguang@gmail.com> | 2023-05-11 15:45:47 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-11 07:45:47 +0000 |
commit | 58dfaf3a75a097088376a9c221784b3675ac9c48 (patch) | |
tree | 62779d06087d0707b15f13e503c73a82300ef217 /modules | |
parent | b3af7484bc821d71cb20f6289f767119494bc81e (diff) | |
download | gitea-58dfaf3a75a097088376a9c221784b3675ac9c48.tar.gz gitea-58dfaf3a75a097088376a9c221784b3675ac9c48.zip |
Improve queue & process & stacktrace (#24636)
Although some features are mixed together in this PR, this PR is not
that large, and these features are all related.
Actually there are more than 70 lines are for a toy "test queue", so
this PR is quite simple.
Major features:
1. Allow site admin to clear a queue (remove all items in a queue)
* Because there is no transaction, the "unique queue" could be corrupted
in rare cases, that's unfixable.
* eg: the item is in the "set" but not in the "list", so the item would
never be able to be pushed into the queue.
* Now site admin could simply clear the queue, then everything becomes
correct, the lost items could be re-pushed into queue by future
operations.
3. Split the "admin/monitor" to separate pages
4. Allow to download diagnosis report
* In history, there were many users reporting that Gitea queue gets
stuck, or Gitea's CPU is 100%
* With diagnosis report, maintainers could know what happens clearly
The diagnosis report sample:
[gitea-diagnosis-20230510-192913.zip](https://github.com/go-gitea/gitea/files/11441346/gitea-diagnosis-20230510-192913.zip)
, use "go tool pprof profile.dat" to view the report.
Screenshots:
![image](https://github.com/go-gitea/gitea/assets/2114189/320659b4-2eda-4def-8dc0-5ea08d578063)
![image](https://github.com/go-gitea/gitea/assets/2114189/c5c46fae-9dc0-44ca-8cd3-57beedc5035e)
![image](https://github.com/go-gitea/gitea/assets/2114189/6168a811-42a1-4e64-a263-0617a6c8c4fe)
---------
Co-authored-by: Jason Song <i@wolfogre.com>
Co-authored-by: Giteabot <teabot@gitea.io>
Diffstat (limited to 'modules')
-rw-r--r-- | modules/queue/base_channel.go | 12 | ||||
-rw-r--r-- | modules/queue/base_levelqueue_unique.go | 13 | ||||
-rw-r--r-- | modules/queue/base_redis.go | 3 | ||||
-rw-r--r-- | modules/queue/manager.go | 3 | ||||
-rw-r--r-- | modules/queue/workerqueue.go | 5 |
5 files changed, 29 insertions, 7 deletions
diff --git a/modules/queue/base_channel.go b/modules/queue/base_channel.go index 27055faf4b..d03c72bdae 100644 --- a/modules/queue/base_channel.go +++ b/modules/queue/base_channel.go @@ -87,7 +87,9 @@ func (q *baseChannel) PopItem(ctx context.Context) ([]byte, error) { func (q *baseChannel) HasItem(ctx context.Context, data []byte) (bool, error) { q.mu.Lock() defer q.mu.Unlock() - + if !q.isUnique { + return false, nil + } return q.set.Contains(string(data)), nil } @@ -107,7 +109,9 @@ func (q *baseChannel) Close() error { defer q.mu.Unlock() close(q.c) - q.set = container.Set[string]{} + if q.isUnique { + q.set = container.Set[string]{} + } return nil } @@ -119,5 +123,9 @@ func (q *baseChannel) RemoveAll(ctx context.Context) error { for q.c != nil && len(q.c) > 0 { <-q.c } + + if q.isUnique { + q.set = container.Set[string]{} + } return nil } diff --git a/modules/queue/base_levelqueue_unique.go b/modules/queue/base_levelqueue_unique.go index 7546221631..1acd504e32 100644 --- a/modules/queue/base_levelqueue_unique.go +++ b/modules/queue/base_levelqueue_unique.go @@ -77,6 +77,14 @@ func (q *baseLevelQueueUnique) RemoveAll(ctx context.Context) error { } lq := (*levelUniqueQueue)(unsafe.Pointer(q.internal)) + for lq.q.Len() > 0 { + if _, err := lq.q.LPop(); err != nil { + return err + } + } + + // the "set" must be cleared after the "list" because there is no transaction. + // it's better to have duplicate items than losing items. members, err := lq.set.Members() if err != nil { return err // seriously corrupted @@ -84,10 +92,5 @@ func (q *baseLevelQueueUnique) RemoveAll(ctx context.Context) error { for _, v := range members { _, _ = lq.set.Remove(v) } - for lq.q.Len() > 0 { - if _, err = lq.q.LPop(); err != nil { - return err - } - } return nil } diff --git a/modules/queue/base_redis.go b/modules/queue/base_redis.go index a294077cc6..a1e234943d 100644 --- a/modules/queue/base_redis.go +++ b/modules/queue/base_redis.go @@ -123,7 +123,10 @@ func (q *baseRedis) Close() error { func (q *baseRedis) RemoveAll(ctx context.Context) error { q.mu.Lock() defer q.mu.Unlock() + c1 := q.client.Del(ctx, q.cfg.QueueFullName) + // the "set" must be cleared after the "list" because there is no transaction. + // it's better to have duplicate items than losing items. c2 := q.client.Del(ctx, q.cfg.SetFullName) if c1.Err() != nil { return c1.Err() diff --git a/modules/queue/manager.go b/modules/queue/manager.go index 03dbc72da4..95b3bad57b 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -33,6 +33,9 @@ type ManagedWorkerPoolQueue interface { // FlushWithContext tries to make the handler process all items in the queue synchronously. // It is for testing purpose only. It's not designed to be used in a cluster. FlushWithContext(ctx context.Context, timeout time.Duration) error + + // RemoveAllItems removes all items in the base queue (on-the-fly items are not affected) + RemoveAllItems(ctx context.Context) error } var manager *Manager diff --git a/modules/queue/workerqueue.go b/modules/queue/workerqueue.go index 493bea17aa..de4485fa51 100644 --- a/modules/queue/workerqueue.go +++ b/modules/queue/workerqueue.go @@ -130,6 +130,11 @@ func (q *WorkerPoolQueue[T]) FlushWithContext(ctx context.Context, timeout time. } } +// RemoveAllItems removes all items in the baes queue +func (q *WorkerPoolQueue[T]) RemoveAllItems(ctx context.Context) error { + return q.baseQueue.RemoveAll(ctx) +} + func (q *WorkerPoolQueue[T]) marshal(data T) []byte { bs, err := json.Marshal(data) if err != nil { |