diff options
Diffstat (limited to 'modules/queue/unique_queue_channel.go')
-rw-r--r-- | modules/queue/unique_queue_channel.go | 15 |
1 files changed, 7 insertions, 8 deletions
diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index 6e8d37a20c..d1bf7239eb 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -12,6 +12,7 @@ import ( "sync/atomic" "time" + "code.gitea.io/gitea/modules/container" "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/log" ) @@ -33,7 +34,7 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration type ChannelUniqueQueue struct { *WorkerPool lock sync.Mutex - table map[string]bool + table container.Set[string] shutdownCtx context.Context shutdownCtxCancel context.CancelFunc terminateCtx context.Context @@ -58,7 +59,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) queue := &ChannelUniqueQueue{ - table: map[string]bool{}, + table: make(container.Set[string]), shutdownCtx: shutdownCtx, shutdownCtxCancel: shutdownCtxCancel, terminateCtx: terminateCtx, @@ -73,7 +74,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue bs, _ := json.Marshal(datum) queue.lock.Lock() - delete(queue.table, string(bs)) + queue.table.Remove(string(bs)) queue.lock.Unlock() if u := handle(datum); u != nil { @@ -127,16 +128,15 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error { q.lock.Unlock() } }() - if _, ok := q.table[string(bs)]; ok { + if !q.table.Add(string(bs)) { return ErrAlreadyInQueue } // FIXME: We probably need to implement some sort of limit here // If the downstream queue blocks this table will grow without limit - q.table[string(bs)] = true if fn != nil { err := fn() if err != nil { - delete(q.table, string(bs)) + q.table.Remove(string(bs)) return err } } @@ -155,8 +155,7 @@ func (q *ChannelUniqueQueue) Has(data Data) (bool, error) { q.lock.Lock() defer q.lock.Unlock() - _, has := q.table[string(bs)] - return has, nil + return q.table.Contains(string(bs)), nil } // Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager |