aboutsummaryrefslogtreecommitdiffstats
path: root/modules/queue/unique_queue_channel.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue/unique_queue_channel.go')
-rw-r--r--modules/queue/unique_queue_channel.go15
1 files changed, 7 insertions, 8 deletions
diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go
index 6e8d37a20c..d1bf7239eb 100644
--- a/modules/queue/unique_queue_channel.go
+++ b/modules/queue/unique_queue_channel.go
@@ -12,6 +12,7 @@ import (
"sync/atomic"
"time"
+ "code.gitea.io/gitea/modules/container"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
)
@@ -33,7 +34,7 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration
type ChannelUniqueQueue struct {
*WorkerPool
lock sync.Mutex
- table map[string]bool
+ table container.Set[string]
shutdownCtx context.Context
shutdownCtxCancel context.CancelFunc
terminateCtx context.Context
@@ -58,7 +59,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
queue := &ChannelUniqueQueue{
- table: map[string]bool{},
+ table: make(container.Set[string]),
shutdownCtx: shutdownCtx,
shutdownCtxCancel: shutdownCtxCancel,
terminateCtx: terminateCtx,
@@ -73,7 +74,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
bs, _ := json.Marshal(datum)
queue.lock.Lock()
- delete(queue.table, string(bs))
+ queue.table.Remove(string(bs))
queue.lock.Unlock()
if u := handle(datum); u != nil {
@@ -127,16 +128,15 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
q.lock.Unlock()
}
}()
- if _, ok := q.table[string(bs)]; ok {
+ if !q.table.Add(string(bs)) {
return ErrAlreadyInQueue
}
// FIXME: We probably need to implement some sort of limit here
// If the downstream queue blocks this table will grow without limit
- q.table[string(bs)] = true
if fn != nil {
err := fn()
if err != nil {
- delete(q.table, string(bs))
+ q.table.Remove(string(bs))
return err
}
}
@@ -155,8 +155,7 @@ func (q *ChannelUniqueQueue) Has(data Data) (bool, error) {
q.lock.Lock()
defer q.lock.Unlock()
- _, has := q.table[string(bs)]
- return has, nil
+ return q.table.Contains(string(bs)), nil
}
// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager