123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- // Copyright 2019 The Gitea Authors. All rights reserved.
- // SPDX-License-Identifier: MIT
-
- package queue
-
- import (
- "context"
- "fmt"
- "time"
- )
-
- // ErrInvalidConfiguration is called when there is invalid configuration for a queue
- type ErrInvalidConfiguration struct {
- cfg interface{}
- err error
- }
-
- func (err ErrInvalidConfiguration) Error() string {
- if err.err != nil {
- return fmt.Sprintf("Invalid Configuration Argument: %v: Error: %v", err.cfg, err.err)
- }
- return fmt.Sprintf("Invalid Configuration Argument: %v", err.cfg)
- }
-
- // IsErrInvalidConfiguration checks if an error is an ErrInvalidConfiguration
- func IsErrInvalidConfiguration(err error) bool {
- _, ok := err.(ErrInvalidConfiguration)
- return ok
- }
-
- // Type is a type of Queue
- type Type string
-
- // Data defines an type of queuable data
- type Data interface{}
-
- // HandlerFunc is a function that takes a variable amount of data and processes it
- type HandlerFunc func(...Data) (unhandled []Data)
-
- // NewQueueFunc is a function that creates a queue
- type NewQueueFunc func(handler HandlerFunc, config, exemplar interface{}) (Queue, error)
-
- // Shutdownable represents a queue that can be shutdown
- type Shutdownable interface {
- Shutdown()
- Terminate()
- }
-
- // Named represents a queue with a name
- type Named interface {
- Name() string
- }
-
- // Queue defines an interface of a queue-like item
- //
- // Queues will handle their own contents in the Run method
- type Queue interface {
- Flushable
- Run(atShutdown, atTerminate func(func()))
- Push(Data) error
- }
-
- // PushBackable queues can be pushed back to
- type PushBackable interface {
- // PushBack pushes data back to the top of the fifo
- PushBack(Data) error
- }
-
- // DummyQueueType is the type for the dummy queue
- const DummyQueueType Type = "dummy"
-
- // NewDummyQueue creates a new DummyQueue
- func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, error) {
- return &DummyQueue{}, nil
- }
-
- // DummyQueue represents an empty queue
- type DummyQueue struct{}
-
- // Run does nothing
- func (*DummyQueue) Run(_, _ func(func())) {}
-
- // Push fakes a push of data to the queue
- func (*DummyQueue) Push(Data) error {
- return nil
- }
-
- // PushFunc fakes a push of data to the queue with a function. The function is never run.
- func (*DummyQueue) PushFunc(Data, func() error) error {
- return nil
- }
-
- // Has always returns false as this queue never does anything
- func (*DummyQueue) Has(Data) (bool, error) {
- return false, nil
- }
-
- // Flush always returns nil
- func (*DummyQueue) Flush(time.Duration) error {
- return nil
- }
-
- // FlushWithContext always returns nil
- func (*DummyQueue) FlushWithContext(context.Context) error {
- return nil
- }
-
- // IsEmpty asserts that the queue is empty
- func (*DummyQueue) IsEmpty() bool {
- return true
- }
-
- // ImmediateType is the type to execute the function when push
- const ImmediateType Type = "immediate"
-
- // NewImmediate creates a new false queue to execute the function when push
- func NewImmediate(handler HandlerFunc, opts, exemplar interface{}) (Queue, error) {
- return &Immediate{
- handler: handler,
- }, nil
- }
-
- // Immediate represents an direct execution queue
- type Immediate struct {
- handler HandlerFunc
- }
-
- // Run does nothing
- func (*Immediate) Run(_, _ func(func())) {}
-
- // Push fakes a push of data to the queue
- func (q *Immediate) Push(data Data) error {
- return q.PushFunc(data, nil)
- }
-
- // PushFunc fakes a push of data to the queue with a function. The function is never run.
- func (q *Immediate) PushFunc(data Data, f func() error) error {
- if f != nil {
- if err := f(); err != nil {
- return err
- }
- }
- q.handler(data)
- return nil
- }
-
- // Has always returns false as this queue never does anything
- func (*Immediate) Has(Data) (bool, error) {
- return false, nil
- }
-
- // Flush always returns nil
- func (*Immediate) Flush(time.Duration) error {
- return nil
- }
-
- // FlushWithContext always returns nil
- func (*Immediate) FlushWithContext(context.Context) error {
- return nil
- }
-
- // IsEmpty asserts that the queue is empty
- func (*Immediate) IsEmpty() bool {
- return true
- }
-
- var queuesMap = map[Type]NewQueueFunc{
- DummyQueueType: NewDummyQueue,
- ImmediateType: NewImmediate,
- }
-
- // RegisteredTypes provides the list of requested types of queues
- func RegisteredTypes() []Type {
- types := make([]Type, len(queuesMap))
- i := 0
- for key := range queuesMap {
- types[i] = key
- i++
- }
- return types
- }
-
- // RegisteredTypesAsString provides the list of requested types of queues
- func RegisteredTypesAsString() []string {
- types := make([]string, len(queuesMap))
- i := 0
- for key := range queuesMap {
- types[i] = string(key)
- i++
- }
- return types
- }
-
- // NewQueue takes a queue Type, HandlerFunc, some options and possibly an exemplar and returns a Queue or an error
- func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) {
- newFn, ok := queuesMap[queueType]
- if !ok {
- return nil, fmt.Errorf("unsupported queue type: %v", queueType)
- }
- return newFn(handlerFunc, opts, exemplar)
- }
|