diff options
author | zeripath <art27@cantab.net> | 2020-02-02 23:19:58 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-02 23:19:58 +0000 |
commit | 2c903383b5154795b90e4b4ed8eaadc6fac17a13 (patch) | |
tree | d5ca361d9597e027ad92f1e02a841be1d266b554 /modules/queue/queue_disk.go | |
parent | b4914249ee389a733e7dcfd2df20708ab3215827 (diff) | |
download | gitea-2c903383b5154795b90e4b4ed8eaadc6fac17a13.tar.gz gitea-2c903383b5154795b90e4b4ed8eaadc6fac17a13.zip |
Add Unique Queue infrastructure and move TestPullRequests to this (#9856)
* Upgrade levelqueue to version 0.2.0
This adds functionality for Unique Queues
* Add UniqueQueue interface and functions to create them
* Add UniqueQueue implementations
* Move TestPullRequests over to use UniqueQueue
* Reduce code duplication
* Add bytefifos
* Ensure invalid types are logged
* Fix close race in PersistableChannelQueue Shutdown
Diffstat (limited to 'modules/queue/queue_disk.go')
-rw-r--r-- | modules/queue/queue_disk.go | 178 |
1 files changed, 40 insertions, 138 deletions
diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go index ca3e230e3d..ff0876488b 100644 --- a/modules/queue/queue_disk.go +++ b/modules/queue/queue_disk.go @@ -5,15 +5,6 @@ package queue import ( - "context" - "encoding/json" - "fmt" - "sync" - "sync/atomic" - "time" - - "code.gitea.io/gitea/modules/log" - "gitea.com/lunny/levelqueue" ) @@ -22,22 +13,13 @@ const LevelQueueType Type = "level" // LevelQueueConfiguration is the configuration for a LevelQueue type LevelQueueConfiguration struct { - WorkerPoolConfiguration + ByteFIFOQueueConfiguration DataDir string - Workers int - Name string } // LevelQueue implements a disk library queue type LevelQueue struct { - *WorkerPool - queue *levelqueue.Queue - closed chan struct{} - terminated chan struct{} - lock sync.Mutex - exemplar interface{} - workers int - name string + *ByteFIFOQueue } // NewLevelQueue creates a ledis local queue @@ -48,149 +30,69 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) } config := configInterface.(LevelQueueConfiguration) - internal, err := levelqueue.Open(config.DataDir) + byteFIFO, err := NewLevelQueueByteFIFO(config.DataDir) + if err != nil { + return nil, err + } + + byteFIFOQueue, err := NewByteFIFOQueue(LevelQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar) if err != nil { return nil, err } queue := &LevelQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), - queue: internal, - exemplar: exemplar, - closed: make(chan struct{}), - terminated: make(chan struct{}), - workers: config.Workers, - name: config.Name, + ByteFIFOQueue: byteFIFOQueue, } queue.qid = GetManager().Add(queue, LevelQueueType, config, exemplar) return queue, nil } -// Run starts to run the queue -func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { - atShutdown(context.Background(), l.Shutdown) - atTerminate(context.Background(), l.Terminate) - log.Debug("LevelQueue: %s Starting", l.name) - - go func() { - _ = l.AddWorkers(l.workers, 0) - }() - - go l.readToChan() - - log.Trace("LevelQueue: %s Waiting til closed", l.name) - <-l.closed - - log.Trace("LevelQueue: %s Waiting til done", l.name) - l.Wait() - - log.Trace("LevelQueue: %s Waiting til cleaned", l.name) - ctx, cancel := context.WithCancel(context.Background()) - atTerminate(ctx, cancel) - l.CleanUp(ctx) - cancel() - log.Trace("LevelQueue: %s Cleaned", l.name) - -} +var _ (ByteFIFO) = &LevelQueueByteFIFO{} -func (l *LevelQueue) readToChan() { - for { - select { - case <-l.closed: - // tell the pool to shutdown. - l.cancel() - return - default: - atomic.AddInt64(&l.numInQueue, 1) - bs, err := l.queue.RPop() - if err != nil { - if err != levelqueue.ErrNotFound { - log.Error("LevelQueue: %s Error on RPop: %v", l.name, err) - } - atomic.AddInt64(&l.numInQueue, -1) - time.Sleep(time.Millisecond * 100) - continue - } - - if len(bs) == 0 { - atomic.AddInt64(&l.numInQueue, -1) - time.Sleep(time.Millisecond * 100) - continue - } - - data, err := unmarshalAs(bs, l.exemplar) - if err != nil { - log.Error("LevelQueue: %s Failed to unmarshal with error: %v", l.name, err) - atomic.AddInt64(&l.numInQueue, -1) - time.Sleep(time.Millisecond * 100) - continue - } - - log.Trace("LevelQueue %s: Task found: %#v", l.name, data) - l.WorkerPool.Push(data) - atomic.AddInt64(&l.numInQueue, -1) - } - } +// LevelQueueByteFIFO represents a ByteFIFO formed from a LevelQueue +type LevelQueueByteFIFO struct { + internal *levelqueue.Queue } -// Push will push the indexer data to queue -func (l *LevelQueue) Push(data Data) error { - if !assignableTo(data, l.exemplar) { - return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, l.exemplar, l.name) - } - bs, err := json.Marshal(data) +// NewLevelQueueByteFIFO creates a ByteFIFO formed from a LevelQueue +func NewLevelQueueByteFIFO(dataDir string) (*LevelQueueByteFIFO, error) { + internal, err := levelqueue.Open(dataDir) if err != nil { - return err + return nil, err } - return l.queue.LPush(bs) + + return &LevelQueueByteFIFO{ + internal: internal, + }, nil } -// IsEmpty checks whether the queue is empty -func (l *LevelQueue) IsEmpty() bool { - if !l.WorkerPool.IsEmpty() { - return false +// PushFunc will push data into the fifo +func (fifo *LevelQueueByteFIFO) PushFunc(data []byte, fn func() error) error { + if fn != nil { + if err := fn(); err != nil { + return err + } } - return l.queue.Len() == 0 + return fifo.internal.LPush(data) } -// Shutdown this queue and stop processing -func (l *LevelQueue) Shutdown() { - l.lock.Lock() - defer l.lock.Unlock() - log.Trace("LevelQueue: %s Shutting down", l.name) - select { - case <-l.closed: - default: - close(l.closed) +// Pop pops data from the start of the fifo +func (fifo *LevelQueueByteFIFO) Pop() ([]byte, error) { + data, err := fifo.internal.RPop() + if err != nil && err != levelqueue.ErrNotFound { + return nil, err } - log.Debug("LevelQueue: %s Shutdown", l.name) + return data, nil } -// Terminate this queue and close the queue -func (l *LevelQueue) Terminate() { - log.Trace("LevelQueue: %s Terminating", l.name) - l.Shutdown() - l.lock.Lock() - select { - case <-l.terminated: - l.lock.Unlock() - default: - close(l.terminated) - l.lock.Unlock() - if log.IsDebug() { - log.Debug("LevelQueue: %s Closing with %d tasks left in queue", l.name, l.queue.Len()) - } - if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" { - log.Error("Error whilst closing internal queue in %s: %v", l.name, err) - } - - } - log.Debug("LevelQueue: %s Terminated", l.name) +// Close this fifo +func (fifo *LevelQueueByteFIFO) Close() error { + return fifo.internal.Close() } -// Name returns the name of this queue -func (l *LevelQueue) Name() string { - return l.name +// Len returns the length of the fifo +func (fifo *LevelQueueByteFIFO) Len() int64 { + return fifo.internal.Len() } func init() { |