aboutsummaryrefslogtreecommitdiffstats
path: root/modules/queue/manager.go
blob: 079e2bee7a7b949906516f1d966e9eae6d3f6961 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// Copyright 2019 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package queue

import (
	"context"
	"errors"
	"sync"
	"time"

	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/setting"
)

// Manager is a manager for the queues created by "CreateXxxQueue" functions, these queues are called "managed queues".
type Manager struct {
	mu sync.Mutex

	qidCounter int64
	Queues     map[int64]ManagedWorkerPoolQueue
}

type ManagedWorkerPoolQueue interface {
	GetName() string
	GetType() string
	GetItemTypeName() string
	GetWorkerNumber() int
	GetWorkerActiveNumber() int
	GetWorkerMaxNumber() int
	SetWorkerMaxNumber(num int)
	GetQueueItemNumber() int

	// FlushWithContext tries to make the handler process all items in the queue synchronously.
	// It is for testing purpose only. It's not designed to be used in a cluster.
	// Negative timeout means discarding all items in the queue.
	FlushWithContext(ctx context.Context, timeout time.Duration) error

	// RemoveAllItems removes all items in the base queue (on-the-fly items are not affected)
	RemoveAllItems(ctx context.Context) error
}

var manager *Manager

func init() {
	manager = &Manager{
		Queues: make(map[int64]ManagedWorkerPoolQueue),
	}
}

func GetManager() *Manager {
	return manager
}

func (m *Manager) AddManagedQueue(managed ManagedWorkerPoolQueue) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.qidCounter++
	m.Queues[m.qidCounter] = managed
}

func (m *Manager) GetManagedQueue(qid int64) ManagedWorkerPoolQueue {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.Queues[qid]
}

func (m *Manager) ManagedQueues() map[int64]ManagedWorkerPoolQueue {
	m.mu.Lock()
	defer m.mu.Unlock()

	queues := make(map[int64]ManagedWorkerPoolQueue, len(m.Queues))
	for k, v := range m.Queues {
		queues[k] = v
	}
	return queues
}

// FlushAll tries to make all managed queues process all items synchronously, until timeout or the queue is empty.
// It is for testing purpose only. It's not designed to be used in a cluster.
// Negative timeout means discarding all items in the queue.
func (m *Manager) FlushAll(ctx context.Context, timeout time.Duration) error {
	var finalErrors []error
	qs := m.ManagedQueues()
	for _, q := range qs {
		if err := q.FlushWithContext(ctx, timeout); err != nil {
			finalErrors = append(finalErrors, err)
		}
	}
	return errors.Join(finalErrors...)
}

// CreateSimpleQueue creates a simple queue from global setting config provider by name
func CreateSimpleQueue[T any](ctx context.Context, name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
	return createWorkerPoolQueue(ctx, name, setting.CfgProvider, handler, false)
}

// CreateUniqueQueue creates a unique queue from global setting config provider by name
func CreateUniqueQueue[T any](ctx context.Context, name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
	return createWorkerPoolQueue(ctx, name, setting.CfgProvider, handler, true)
}

func createWorkerPoolQueue[T any](ctx context.Context, name string, cfgProvider setting.ConfigProvider, handler HandlerFuncT[T], unique bool) *WorkerPoolQueue[T] {
	queueSetting, err := setting.GetQueueSettings(cfgProvider, name)
	if err != nil {
		log.Error("Failed to get queue settings for %q: %v", name, err)
		return nil
	}
	w, err := NewWorkerPoolQueueWithContext(ctx, name, queueSetting, handler, unique)
	if err != nil {
		log.Error("Failed to create queue %q: %v", name, err)
		return nil
	}
	GetManager().AddManagedQueue(w)
	return w
}