diff options
author | zeripath <art27@cantab.net> | 2020-01-29 01:01:06 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-28 20:01:06 -0500 |
commit | c01221e70fc71f5bcff5f699095fbcbfc1e2b4a3 (patch) | |
tree | 4017848a786da2080e9a003a77bd40bd81625680 /modules/queue/queue_disk.go | |
parent | 7c84dbca4f0f79dc90752105800a6964693283bd (diff) | |
download | gitea-c01221e70fc71f5bcff5f699095fbcbfc1e2b4a3.tar.gz gitea-c01221e70fc71f5bcff5f699095fbcbfc1e2b4a3.zip |
Queue: Make WorkerPools and Queues flushable (#10001)
* Make WorkerPools and Queues flushable
Adds Flush methods to Queues and the WorkerPool
Further abstracts the WorkerPool
Adds a final step to Flush the queues in the defer from PrintCurrentTest
Fixes an issue with Settings inheritance in queues
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Change to for loop
* Add IsEmpty and begin just making the queues composed WorkerPools
* subsume workerpool into the queues and create a flushable interface
* Add manager command
* Move flushall to queue.Manager and add to testlogger
* As per @guillep2k
* as per @guillep2k
* Just make queues all implement flushable and clean up the wrapped queue flushes
* cope with no timeout
Co-authored-by: Lauris BH <lauris@nix.lv>
Diffstat (limited to 'modules/queue/queue_disk.go')
-rw-r--r-- | modules/queue/queue_disk.go | 87 |
1 files changed, 36 insertions, 51 deletions
diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go index 98e7b24e42..ca3e230e3d 100644 --- a/modules/queue/queue_disk.go +++ b/modules/queue/queue_disk.go @@ -8,8 +8,8 @@ import ( "context" "encoding/json" "fmt" - "reflect" "sync" + "sync/atomic" "time" "code.gitea.io/gitea/modules/log" @@ -22,20 +22,15 @@ const LevelQueueType Type = "level" // LevelQueueConfiguration is the configuration for a LevelQueue type LevelQueueConfiguration struct { - DataDir string - QueueLength int - BatchLength int - Workers int - MaxWorkers int - BlockTimeout time.Duration - BoostTimeout time.Duration - BoostWorkers int - Name string + WorkerPoolConfiguration + DataDir string + Workers int + Name string } // LevelQueue implements a disk library queue type LevelQueue struct { - pool *WorkerPool + *WorkerPool queue *levelqueue.Queue closed chan struct{} terminated chan struct{} @@ -58,21 +53,8 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) return nil, err } - dataChan := make(chan Data, config.QueueLength) - ctx, cancel := context.WithCancel(context.Background()) - queue := &LevelQueue{ - pool: &WorkerPool{ - baseCtx: ctx, - cancel: cancel, - batchLength: config.BatchLength, - handle: handle, - dataChan: dataChan, - blockTimeout: config.BlockTimeout, - boostTimeout: config.BoostTimeout, - boostWorkers: config.BoostWorkers, - maxNumberOfWorkers: config.MaxWorkers, - }, + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), queue: internal, exemplar: exemplar, closed: make(chan struct{}), @@ -80,7 +62,7 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) workers: config.Workers, name: config.Name, } - queue.pool.qid = GetManager().Add(queue, LevelQueueType, config, exemplar, queue.pool) + queue.qid = GetManager().Add(queue, LevelQueueType, config, exemplar) return queue, nil } @@ -88,9 +70,10 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { atShutdown(context.Background(), l.Shutdown) atTerminate(context.Background(), l.Terminate) + log.Debug("LevelQueue: %s Starting", l.name) go func() { - _ = l.pool.AddWorkers(l.workers, 0) + _ = l.AddWorkers(l.workers, 0) }() go l.readToChan() @@ -99,12 +82,12 @@ func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) <-l.closed log.Trace("LevelQueue: %s Waiting til done", l.name) - l.pool.Wait() + l.Wait() log.Trace("LevelQueue: %s Waiting til cleaned", l.name) ctx, cancel := context.WithCancel(context.Background()) atTerminate(ctx, cancel) - l.pool.CleanUp(ctx) + l.CleanUp(ctx) cancel() log.Trace("LevelQueue: %s Cleaned", l.name) @@ -115,56 +98,45 @@ func (l *LevelQueue) readToChan() { select { case <-l.closed: // tell the pool to shutdown. - l.pool.cancel() + l.cancel() return default: + atomic.AddInt64(&l.numInQueue, 1) bs, err := l.queue.RPop() if err != nil { if err != levelqueue.ErrNotFound { log.Error("LevelQueue: %s Error on RPop: %v", l.name, err) } + atomic.AddInt64(&l.numInQueue, -1) time.Sleep(time.Millisecond * 100) continue } if len(bs) == 0 { + atomic.AddInt64(&l.numInQueue, -1) time.Sleep(time.Millisecond * 100) continue } - var data Data - if l.exemplar != nil { - t := reflect.TypeOf(l.exemplar) - n := reflect.New(t) - ne := n.Elem() - err = json.Unmarshal(bs, ne.Addr().Interface()) - data = ne.Interface().(Data) - } else { - err = json.Unmarshal(bs, &data) - } + data, err := unmarshalAs(bs, l.exemplar) if err != nil { log.Error("LevelQueue: %s Failed to unmarshal with error: %v", l.name, err) + atomic.AddInt64(&l.numInQueue, -1) time.Sleep(time.Millisecond * 100) continue } log.Trace("LevelQueue %s: Task found: %#v", l.name, data) - l.pool.Push(data) - + l.WorkerPool.Push(data) + atomic.AddInt64(&l.numInQueue, -1) } } } // Push will push the indexer data to queue func (l *LevelQueue) Push(data Data) error { - if l.exemplar != nil { - // Assert data is of same type as r.exemplar - value := reflect.ValueOf(data) - t := value.Type() - exemplarType := reflect.ValueOf(l.exemplar).Type() - if !t.AssignableTo(exemplarType) || data == nil { - return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, l.exemplar, l.name) - } + if !assignableTo(data, l.exemplar) { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, l.exemplar, l.name) } bs, err := json.Marshal(data) if err != nil { @@ -173,16 +145,25 @@ func (l *LevelQueue) Push(data Data) error { return l.queue.LPush(bs) } +// IsEmpty checks whether the queue is empty +func (l *LevelQueue) IsEmpty() bool { + if !l.WorkerPool.IsEmpty() { + return false + } + return l.queue.Len() == 0 +} + // Shutdown this queue and stop processing func (l *LevelQueue) Shutdown() { l.lock.Lock() defer l.lock.Unlock() - log.Trace("LevelQueue: %s Shutdown", l.name) + log.Trace("LevelQueue: %s Shutting down", l.name) select { case <-l.closed: default: close(l.closed) } + log.Debug("LevelQueue: %s Shutdown", l.name) } // Terminate this queue and close the queue @@ -196,11 +177,15 @@ func (l *LevelQueue) Terminate() { default: close(l.terminated) l.lock.Unlock() + if log.IsDebug() { + log.Debug("LevelQueue: %s Closing with %d tasks left in queue", l.name, l.queue.Len()) + } if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" { log.Error("Error whilst closing internal queue in %s: %v", l.name, err) } } + log.Debug("LevelQueue: %s Terminated", l.name) } // Name returns the name of this queue |