diff options
author | zeripath <art27@cantab.net> | 2021-10-17 12:43:25 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-17 12:43:25 +0100 |
commit | 7117c7774ae1c1003c95d3df8dfaa5bd7270165d (patch) | |
tree | eab8f310ad065af318a5c4dedb7efdb6251436cb /modules/queue/unique_queue_channel.go | |
parent | b9a2f263b8a4c19b01f3440b52f0f90f3c1ee072 (diff) | |
download | gitea-7117c7774ae1c1003c95d3df8dfaa5bd7270165d.tar.gz gitea-7117c7774ae1c1003c95d3df8dfaa5bd7270165d.zip |
Make the Mirror Queue a queue (#17326)
Convert the old mirror syncing queue to the more modern queue format.
Fix a bug in the from the repo-archive queue PR - the assumption was made that uniqueness could be enforced with by checking equality in a map in channel unique queues - however this only works for primitive types - which was the initial intention but is an imperfect. This is fixed by marshalling the data and placing the martialled data in the unique map instead.
The documentation is also updated to add information about the deprecated configuration values.
Signed-off-by: Andrew Thornton <art27@cantab.net>
Diffstat (limited to 'modules/queue/unique_queue_channel.go')
-rw-r--r-- | modules/queue/unique_queue_channel.go | 29 |
1 files changed, 22 insertions, 7 deletions
diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index 5bec67c4d3..f617595c04 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -9,6 +9,7 @@ import ( "fmt" "sync" + "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/log" ) @@ -29,7 +30,7 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration type ChannelUniqueQueue struct { *WorkerPool lock sync.Mutex - table map[Data]bool + table map[string]bool shutdownCtx context.Context shutdownCtxCancel context.CancelFunc terminateCtx context.Context @@ -54,7 +55,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) queue := &ChannelUniqueQueue{ - table: map[Data]bool{}, + table: map[string]bool{}, shutdownCtx: shutdownCtx, shutdownCtxCancel: shutdownCtxCancel, terminateCtx: terminateCtx, @@ -65,9 +66,13 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue } queue.WorkerPool = NewWorkerPool(func(data ...Data) { for _, datum := range data { + // No error is possible here because PushFunc ensures that this can be marshalled + bs, _ := json.Marshal(datum) + queue.lock.Lock() - delete(queue.table, datum) + delete(queue.table, string(bs)) queue.lock.Unlock() + handle(datum) } }, config.WorkerPoolConfiguration) @@ -94,6 +99,11 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error { if !assignableTo(data, q.exemplar) { return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name) } + + bs, err := json.Marshal(data) + if err != nil { + return err + } q.lock.Lock() locked := true defer func() { @@ -101,16 +111,16 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error { q.lock.Unlock() } }() - if _, ok := q.table[data]; ok { + if _, ok := q.table[string(bs)]; ok { return ErrAlreadyInQueue } // FIXME: We probably need to implement some sort of limit here // If the downstream queue blocks this table will grow without limit - q.table[data] = true + q.table[string(bs)] = true if fn != nil { err := fn() if err != nil { - delete(q.table, data) + delete(q.table, string(bs)) return err } } @@ -122,9 +132,14 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error { // Has checks if the data is in the queue func (q *ChannelUniqueQueue) Has(data Data) (bool, error) { + bs, err := json.Marshal(data) + if err != nil { + return false, err + } + q.lock.Lock() defer q.lock.Unlock() - _, has := q.table[data] + _, has := q.table[string(bs)] return has, nil } |