summaryrefslogtreecommitdiffstats
path: root/modules
diff options
context:
space:
mode:
authorwxiaoguang <wxiaoguang@gmail.com>2023-05-11 15:45:47 +0800
committerGitHub <noreply@github.com>2023-05-11 07:45:47 +0000
commit58dfaf3a75a097088376a9c221784b3675ac9c48 (patch)
tree62779d06087d0707b15f13e503c73a82300ef217 /modules
parentb3af7484bc821d71cb20f6289f767119494bc81e (diff)
downloadgitea-58dfaf3a75a097088376a9c221784b3675ac9c48.tar.gz
gitea-58dfaf3a75a097088376a9c221784b3675ac9c48.zip
Improve queue & process & stacktrace (#24636)
Although some features are mixed together in this PR, this PR is not that large, and these features are all related. Actually there are more than 70 lines are for a toy "test queue", so this PR is quite simple. Major features: 1. Allow site admin to clear a queue (remove all items in a queue) * Because there is no transaction, the "unique queue" could be corrupted in rare cases, that's unfixable. * eg: the item is in the "set" but not in the "list", so the item would never be able to be pushed into the queue. * Now site admin could simply clear the queue, then everything becomes correct, the lost items could be re-pushed into queue by future operations. 3. Split the "admin/monitor" to separate pages 4. Allow to download diagnosis report * In history, there were many users reporting that Gitea queue gets stuck, or Gitea's CPU is 100% * With diagnosis report, maintainers could know what happens clearly The diagnosis report sample: [gitea-diagnosis-20230510-192913.zip](https://github.com/go-gitea/gitea/files/11441346/gitea-diagnosis-20230510-192913.zip) , use "go tool pprof profile.dat" to view the report. Screenshots: ![image](https://github.com/go-gitea/gitea/assets/2114189/320659b4-2eda-4def-8dc0-5ea08d578063) ![image](https://github.com/go-gitea/gitea/assets/2114189/c5c46fae-9dc0-44ca-8cd3-57beedc5035e) ![image](https://github.com/go-gitea/gitea/assets/2114189/6168a811-42a1-4e64-a263-0617a6c8c4fe) --------- Co-authored-by: Jason Song <i@wolfogre.com> Co-authored-by: Giteabot <teabot@gitea.io>
Diffstat (limited to 'modules')
-rw-r--r--modules/queue/base_channel.go12
-rw-r--r--modules/queue/base_levelqueue_unique.go13
-rw-r--r--modules/queue/base_redis.go3
-rw-r--r--modules/queue/manager.go3
-rw-r--r--modules/queue/workerqueue.go5
5 files changed, 29 insertions, 7 deletions
diff --git a/modules/queue/base_channel.go b/modules/queue/base_channel.go
index 27055faf4b..d03c72bdae 100644
--- a/modules/queue/base_channel.go
+++ b/modules/queue/base_channel.go
@@ -87,7 +87,9 @@ func (q *baseChannel) PopItem(ctx context.Context) ([]byte, error) {
func (q *baseChannel) HasItem(ctx context.Context, data []byte) (bool, error) {
q.mu.Lock()
defer q.mu.Unlock()
-
+ if !q.isUnique {
+ return false, nil
+ }
return q.set.Contains(string(data)), nil
}
@@ -107,7 +109,9 @@ func (q *baseChannel) Close() error {
defer q.mu.Unlock()
close(q.c)
- q.set = container.Set[string]{}
+ if q.isUnique {
+ q.set = container.Set[string]{}
+ }
return nil
}
@@ -119,5 +123,9 @@ func (q *baseChannel) RemoveAll(ctx context.Context) error {
for q.c != nil && len(q.c) > 0 {
<-q.c
}
+
+ if q.isUnique {
+ q.set = container.Set[string]{}
+ }
return nil
}
diff --git a/modules/queue/base_levelqueue_unique.go b/modules/queue/base_levelqueue_unique.go
index 7546221631..1acd504e32 100644
--- a/modules/queue/base_levelqueue_unique.go
+++ b/modules/queue/base_levelqueue_unique.go
@@ -77,6 +77,14 @@ func (q *baseLevelQueueUnique) RemoveAll(ctx context.Context) error {
}
lq := (*levelUniqueQueue)(unsafe.Pointer(q.internal))
+ for lq.q.Len() > 0 {
+ if _, err := lq.q.LPop(); err != nil {
+ return err
+ }
+ }
+
+ // the "set" must be cleared after the "list" because there is no transaction.
+ // it's better to have duplicate items than losing items.
members, err := lq.set.Members()
if err != nil {
return err // seriously corrupted
@@ -84,10 +92,5 @@ func (q *baseLevelQueueUnique) RemoveAll(ctx context.Context) error {
for _, v := range members {
_, _ = lq.set.Remove(v)
}
- for lq.q.Len() > 0 {
- if _, err = lq.q.LPop(); err != nil {
- return err
- }
- }
return nil
}
diff --git a/modules/queue/base_redis.go b/modules/queue/base_redis.go
index a294077cc6..a1e234943d 100644
--- a/modules/queue/base_redis.go
+++ b/modules/queue/base_redis.go
@@ -123,7 +123,10 @@ func (q *baseRedis) Close() error {
func (q *baseRedis) RemoveAll(ctx context.Context) error {
q.mu.Lock()
defer q.mu.Unlock()
+
c1 := q.client.Del(ctx, q.cfg.QueueFullName)
+ // the "set" must be cleared after the "list" because there is no transaction.
+ // it's better to have duplicate items than losing items.
c2 := q.client.Del(ctx, q.cfg.SetFullName)
if c1.Err() != nil {
return c1.Err()
diff --git a/modules/queue/manager.go b/modules/queue/manager.go
index 03dbc72da4..95b3bad57b 100644
--- a/modules/queue/manager.go
+++ b/modules/queue/manager.go
@@ -33,6 +33,9 @@ type ManagedWorkerPoolQueue interface {
// FlushWithContext tries to make the handler process all items in the queue synchronously.
// It is for testing purpose only. It's not designed to be used in a cluster.
FlushWithContext(ctx context.Context, timeout time.Duration) error
+
+ // RemoveAllItems removes all items in the base queue (on-the-fly items are not affected)
+ RemoveAllItems(ctx context.Context) error
}
var manager *Manager
diff --git a/modules/queue/workerqueue.go b/modules/queue/workerqueue.go
index 493bea17aa..de4485fa51 100644
--- a/modules/queue/workerqueue.go
+++ b/modules/queue/workerqueue.go
@@ -130,6 +130,11 @@ func (q *WorkerPoolQueue[T]) FlushWithContext(ctx context.Context, timeout time.
}
}
+// RemoveAllItems removes all items in the baes queue
+func (q *WorkerPoolQueue[T]) RemoveAllItems(ctx context.Context) error {
+ return q.baseQueue.RemoveAll(ctx)
+}
+
func (q *WorkerPoolQueue[T]) marshal(data T) []byte {
bs, err := json.Marshal(data)
if err != nil {