]> source.dussan.org Git - gitea.git/commitdiff
Make WrappedQueues and PersistableChannelUniqueQueues Pausable (#18393)
authorzeripath <art27@cantab.net>
Mon, 24 Jan 2022 22:54:35 +0000 (22:54 +0000)
committerGitHub <noreply@github.com>
Mon, 24 Jan 2022 22:54:35 +0000 (22:54 +0000)
Implements the Pausable interface on WrappedQueues and PersistableChannelUniqueQueues

Reference #15928

Signed-off-by: Andrew Thornton art27@cantab.net
modules/queue/queue.go
modules/queue/queue_bytefifo.go
modules/queue/queue_channel.go
modules/queue/queue_wrapped.go
modules/queue/setting.go
modules/queue/unique_queue_channel.go
modules/queue/unique_queue_disk_channel.go
modules/queue/unique_queue_wrapped.go
modules/queue/workerpool.go

index 3a519651430e1ffb151d8ec669d42547f0dd3d15..a166a935a6777ea00e6b63878f9cf3218a7c2c51 100644 (file)
@@ -196,7 +196,7 @@ func RegisteredTypesAsString() []string {
 func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) {
        newFn, ok := queuesMap[queueType]
        if !ok {
-               return nil, fmt.Errorf("Unsupported queue type: %v", queueType)
+               return nil, fmt.Errorf("unsupported queue type: %v", queueType)
        }
        return newFn(handlerFunc, opts, exemplar)
 }
index 0380497ea675f3710e04f4a2d27ab379f607b912..7f2acf3debbd7542e72b618480a7b7377662f26a 100644 (file)
@@ -92,7 +92,7 @@ func (q *ByteFIFOQueue) Push(data Data) error {
 // PushBack pushes data to the fifo
 func (q *ByteFIFOQueue) PushBack(data Data) error {
        if !assignableTo(data, q.exemplar) {
-               return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
+               return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
        }
        bs, err := json.Marshal(data)
        if err != nil {
@@ -110,7 +110,7 @@ func (q *ByteFIFOQueue) PushBack(data Data) error {
 // PushFunc pushes data to the fifo
 func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
        if !assignableTo(data, q.exemplar) {
-               return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
+               return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
        }
        bs, err := json.Marshal(data)
        if err != nil {
@@ -398,7 +398,7 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun
 // Has checks if the provided data is in the queue
 func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) {
        if !assignableTo(data, q.exemplar) {
-               return false, fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
+               return false, fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
        }
        bs, err := json.Marshal(data)
        if err != nil {
index 7de9c17c862434518cfada08fbe2fc95a7d6c80d..105388f4214e9ac7e100a40f92530fada2f6f2b4 100644 (file)
@@ -93,7 +93,7 @@ func (q *ChannelQueue) Run(atShutdown, atTerminate func(func())) {
 // Push will push data into the queue
 func (q *ChannelQueue) Push(data Data) error {
        if !assignableTo(data, q.exemplar) {
-               return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
+               return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
        }
        q.WorkerPool.Push(data)
        return nil
index edb589338ae45a5e4d0e835b0dad5afd26a02ef3..02f7818aa891222151d6300f1bab4a37b4317d10 100644 (file)
@@ -59,7 +59,7 @@ func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc
                        if s, ok := cfg.([]byte); ok {
                                cfg = string(s)
                        }
-                       return fmt.Errorf("Timedout creating queue %v with cfg %#v in %s", q.underlying, cfg, q.name)
+                       return fmt.Errorf("timedout creating queue %v with cfg %#v in %s", q.underlying, cfg, q.name)
                default:
                        queue, err := NewQueue(q.underlying, handle, q.cfg, exemplar)
                        if err == nil {
@@ -76,9 +76,9 @@ func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc
                        i++
                        if q.maxAttempts > 0 && i > q.maxAttempts {
                                if bs, ok := q.cfg.([]byte); ok {
-                                       return fmt.Errorf("Unable to create queue %v for %s with cfg %s by max attempts: error: %v", q.underlying, q.name, string(bs), err)
+                                       return fmt.Errorf("unable to create queue %v for %s with cfg %s by max attempts: error: %v", q.underlying, q.name, string(bs), err)
                                }
-                               return fmt.Errorf("Unable to create queue %v for %s with cfg %#v by max attempts: error: %v", q.underlying, q.name, q.cfg, err)
+                               return fmt.Errorf("unable to create queue %v for %s with cfg %#v by max attempts: error: %v", q.underlying, q.name, q.cfg, err)
                        }
                        sleepTime := 100 * time.Millisecond
                        if q.timeout > 0 && q.maxAttempts > 0 {
@@ -271,6 +271,46 @@ func (q *WrappedQueue) Terminate() {
        log.Debug("WrappedQueue: %s Terminated", q.name)
 }
 
+// IsPaused will return if the pool or queue is paused
+func (q *WrappedQueue) IsPaused() bool {
+       q.lock.Lock()
+       defer q.lock.Unlock()
+       pausable, ok := q.internal.(Pausable)
+       return ok && pausable.IsPaused()
+}
+
+// Pause will pause the pool or queue
+func (q *WrappedQueue) Pause() {
+       q.lock.Lock()
+       defer q.lock.Unlock()
+       if pausable, ok := q.internal.(Pausable); ok {
+               pausable.Pause()
+       }
+}
+
+// Resume will resume the pool or queue
+func (q *WrappedQueue) Resume() {
+       q.lock.Lock()
+       defer q.lock.Unlock()
+       if pausable, ok := q.internal.(Pausable); ok {
+               pausable.Resume()
+       }
+}
+
+// IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed
+func (q *WrappedQueue) IsPausedIsResumed() (paused, resumed <-chan struct{}) {
+       q.lock.Lock()
+       defer q.lock.Unlock()
+       if pausable, ok := q.internal.(Pausable); ok {
+               return pausable.IsPausedIsResumed()
+       }
+       return context.Background().Done(), closedChan
+}
+
+var closedChan chan struct{}
+
 func init() {
        queuesMap[WrappedQueueType] = NewWrappedQueue
+       closedChan = make(chan struct{})
+       close(closedChan)
 }
index 61f156c377e264b9a79169617134418fa4c9a3e0..880770f0739571aaa419fe5118bd17a42ba86627 100644 (file)
@@ -22,7 +22,7 @@ func validType(t string) (Type, error) {
                        return typ, nil
                }
        }
-       return PersistableChannelQueueType, fmt.Errorf("Unknown queue type: %s defaulting to %s", t, string(PersistableChannelQueueType))
+       return PersistableChannelQueueType, fmt.Errorf("unknown queue type: %s defaulting to %s", t, string(PersistableChannelQueueType))
 }
 
 func getQueueSettings(name string) (setting.QueueSettings, []byte) {
index b6d2e770fce2d48fd6adf5a0bf600babffd9cae7..59210855a18935af8c69bb2ce081c46a1da77933 100644 (file)
@@ -111,7 +111,7 @@ func (q *ChannelUniqueQueue) Push(data Data) error {
 // PushFunc will push data into the queue
 func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
        if !assignableTo(data, q.exemplar) {
-               return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
+               return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
        }
 
        bs, err := json.Marshal(data)
index 7fc304b17e9bdf741fa63ec3b28f27b7c480b0db..ac7919926f22ec2c68446a12633ccbaa620eb324 100644 (file)
@@ -239,6 +239,26 @@ func (q *PersistableChannelUniqueQueue) IsEmpty() bool {
        return q.channelQueue.IsEmpty()
 }
 
+// IsPaused will return if the pool or queue is paused
+func (q *PersistableChannelUniqueQueue) IsPaused() bool {
+       return q.channelQueue.IsPaused()
+}
+
+// Pause will pause the pool or queue
+func (q *PersistableChannelUniqueQueue) Pause() {
+       q.channelQueue.Pause()
+}
+
+// Resume will resume the pool or queue
+func (q *PersistableChannelUniqueQueue) Resume() {
+       q.channelQueue.Resume()
+}
+
+// IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed
+func (q *PersistableChannelUniqueQueue) IsPausedIsResumed() (paused, resumed <-chan struct{}) {
+       return q.channelQueue.IsPausedIsResumed()
+}
+
 // Shutdown processing this queue
 func (q *PersistableChannelUniqueQueue) Shutdown() {
        log.Trace("PersistableChannelUniqueQueue: %s Shutting down", q.delayedStarter.name)
index 32fa9ed970dbc6bfb139a64787df696077c728d9..5245a35f77175a6147c9df6693ef73158128183c 100644 (file)
@@ -105,7 +105,7 @@ func (q *WrappedUniqueQueue) Push(data Data) error {
 // PushFunc will push the data to the internal channel checking it against the exemplar
 func (q *WrappedUniqueQueue) PushFunc(data Data, fn func() error) error {
        if !assignableTo(data, q.exemplar) {
-               return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
+               return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
        }
 
        q.tlock.Lock()
index da56216dcb12433c2271e669a58f93732ace3f02..30dc8073c90b7d33cd70bf72813e3a9d7f02820e 100644 (file)
@@ -57,14 +57,12 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo
        ctx, cancel := context.WithCancel(context.Background())
 
        dataChan := make(chan Data, config.QueueLength)
-       resumed := make(chan struct{})
-       close(resumed)
        pool := &WorkerPool{
                baseCtx:            ctx,
                baseCtxCancel:      cancel,
                batchLength:        config.BatchLength,
                dataChan:           dataChan,
-               resumed:            resumed,
+               resumed:            closedChan,
                paused:             make(chan struct{}),
                handle:             handle,
                blockTimeout:       config.BlockTimeout,