123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- // Copyright 2019 The Gitea Authors. All rights reserved.
- // SPDX-License-Identifier: MIT
-
- package queue
-
import (
	"context"
	"errors"
	"sync"
	"time"

	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/setting"
)
-
- // Manager is a manager for the queues created by "CreateXxxQueue" functions, these queues are called "managed queues".
- type Manager struct {
- mu sync.Mutex
-
- qidCounter int64
- Queues map[int64]ManagedWorkerPoolQueue
- }
-
// ManagedWorkerPoolQueue is the view of a worker pool queue that the Manager stores and exposes.
type ManagedWorkerPoolQueue interface {
	GetName() string
	GetType() string
	GetItemTypeName() string
	GetWorkerNumber() int
	GetWorkerActiveNumber() int
	GetWorkerMaxNumber() int
	SetWorkerMaxNumber(num int)
	GetQueueItemNumber() int

	// FlushWithContext tries to have the handler process every item in the queue synchronously.
	// It exists for testing only and is not designed to be used in a cluster.
	FlushWithContext(ctx context.Context, timeout time.Duration) error

	// RemoveAllItems removes every item from the base queue; on-the-fly items are not affected.
	RemoveAllItems(ctx context.Context) error
}
-
- var manager *Manager
-
- func init() {
- manager = &Manager{
- Queues: make(map[int64]ManagedWorkerPoolQueue),
- }
- }
-
- func GetManager() *Manager {
- return manager
- }
-
- func (m *Manager) AddManagedQueue(managed ManagedWorkerPoolQueue) {
- m.mu.Lock()
- defer m.mu.Unlock()
- m.qidCounter++
- m.Queues[m.qidCounter] = managed
- }
-
- func (m *Manager) GetManagedQueue(qid int64) ManagedWorkerPoolQueue {
- m.mu.Lock()
- defer m.mu.Unlock()
- return m.Queues[qid]
- }
-
- func (m *Manager) ManagedQueues() map[int64]ManagedWorkerPoolQueue {
- m.mu.Lock()
- defer m.mu.Unlock()
-
- queues := make(map[int64]ManagedWorkerPoolQueue, len(m.Queues))
- for k, v := range m.Queues {
- queues[k] = v
- }
- return queues
- }
-
- // FlushAll tries to make all managed queues process all items synchronously, until timeout or the queue is empty.
- // It is for testing purpose only. It's not designed to be used in a cluster.
- func (m *Manager) FlushAll(ctx context.Context, timeout time.Duration) error {
- var finalErr error
- qs := m.ManagedQueues()
- for _, q := range qs {
- if err := q.FlushWithContext(ctx, timeout); err != nil {
- finalErr = err // TODO: in Go 1.20: errors.Join
- }
- }
- return finalErr
- }
-
- // CreateSimpleQueue creates a simple queue from global setting config provider by name
- func CreateSimpleQueue[T any](ctx context.Context, name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
- return createWorkerPoolQueue(ctx, name, setting.CfgProvider, handler, false)
- }
-
- // CreateUniqueQueue creates a unique queue from global setting config provider by name
- func CreateUniqueQueue[T any](ctx context.Context, name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
- return createWorkerPoolQueue(ctx, name, setting.CfgProvider, handler, true)
- }
-
- func createWorkerPoolQueue[T any](ctx context.Context, name string, cfgProvider setting.ConfigProvider, handler HandlerFuncT[T], unique bool) *WorkerPoolQueue[T] {
- queueSetting, err := setting.GetQueueSettings(cfgProvider, name)
- if err != nil {
- log.Error("Failed to get queue settings for %q: %v", name, err)
- return nil
- }
- w, err := NewWorkerPoolQueueWithContext(ctx, name, queueSetting, handler, unique)
- if err != nil {
- log.Error("Failed to create queue %q: %v", name, err)
- return nil
- }
- GetManager().AddManagedQueue(w)
- return w
- }
|