summaryrefslogtreecommitdiffstats
path: root/modules/queue/unique_queue_channel.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue/unique_queue_channel.go')
-rw-r--r--modules/queue/unique_queue_channel.go29
1 files changed, 22 insertions, 7 deletions
diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go
index 5bec67c4d3..f617595c04 100644
--- a/modules/queue/unique_queue_channel.go
+++ b/modules/queue/unique_queue_channel.go
@@ -9,6 +9,7 @@ import (
"fmt"
"sync"
+ "code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
)
@@ -29,7 +30,7 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration
type ChannelUniqueQueue struct {
*WorkerPool
lock sync.Mutex
- table map[Data]bool
+ table map[string]bool
shutdownCtx context.Context
shutdownCtxCancel context.CancelFunc
terminateCtx context.Context
@@ -54,7 +55,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
queue := &ChannelUniqueQueue{
- table: map[Data]bool{},
+ table: map[string]bool{},
shutdownCtx: shutdownCtx,
shutdownCtxCancel: shutdownCtxCancel,
terminateCtx: terminateCtx,
@@ -65,9 +66,13 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
}
queue.WorkerPool = NewWorkerPool(func(data ...Data) {
for _, datum := range data {
+ // No error is possible here because PushFunc ensures that this can be marshalled
+ bs, _ := json.Marshal(datum)
+
queue.lock.Lock()
- delete(queue.table, datum)
+ delete(queue.table, string(bs))
queue.lock.Unlock()
+
handle(datum)
}
}, config.WorkerPoolConfiguration)
@@ -94,6 +99,11 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
if !assignableTo(data, q.exemplar) {
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
}
+
+ bs, err := json.Marshal(data)
+ if err != nil {
+ return err
+ }
q.lock.Lock()
locked := true
defer func() {
@@ -101,16 +111,16 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
q.lock.Unlock()
}
}()
- if _, ok := q.table[data]; ok {
+ if _, ok := q.table[string(bs)]; ok {
return ErrAlreadyInQueue
}
// FIXME: We probably need to implement some sort of limit here
// If the downstream queue blocks this table will grow without limit
- q.table[data] = true
+ q.table[string(bs)] = true
if fn != nil {
err := fn()
if err != nil {
- delete(q.table, data)
+ delete(q.table, string(bs))
return err
}
}
@@ -122,9 +132,14 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
// Has checks if the data is in the queue
func (q *ChannelUniqueQueue) Has(data Data) (bool, error) {
+ bs, err := json.Marshal(data)
+ if err != nil {
+ return false, err
+ }
+
q.lock.Lock()
defer q.lock.Unlock()
- _, has := q.table[data]
+ _, has := q.table[string(bs)]
return has, nil
}