You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

queue.go 4.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package queue
import (
	"context"
	"errors"
	"fmt"
	"time"
)
  9. // ErrInvalidConfiguration is called when there is invalid configuration for a queue
  10. type ErrInvalidConfiguration struct {
  11. cfg interface{}
  12. err error
  13. }
  14. func (err ErrInvalidConfiguration) Error() string {
  15. if err.err != nil {
  16. return fmt.Sprintf("Invalid Configuration Argument: %v: Error: %v", err.cfg, err.err)
  17. }
  18. return fmt.Sprintf("Invalid Configuration Argument: %v", err.cfg)
  19. }
  20. // IsErrInvalidConfiguration checks if an error is an ErrInvalidConfiguration
  21. func IsErrInvalidConfiguration(err error) bool {
  22. _, ok := err.(ErrInvalidConfiguration)
  23. return ok
  24. }
// Type names a kind of queue. Values of this type are the keys under which
// queue constructors are stored in queuesMap.
type Type string
// Data defines a type of queueable data. It is the empty interface, so any
// value can be used as Data.
type Data interface{}
// HandlerFunc is a function that takes a variable amount of data and processes
// it. Its result is named unhandled; presumably it carries back the items the
// handler did not process — confirm against the queue implementations that
// call it.
type HandlerFunc func(...Data) (unhandled []Data)
// NewQueueFunc is a function that creates a queue. It receives the handler, a
// configuration value and an exemplar value, and returns the new Queue or an
// error. It is the value type of queuesMap.
// NOTE(review): exemplar is presumably a sample of the data the queue will
// carry — confirm against a queue type that actually uses it.
type NewQueueFunc func(handler HandlerFunc, config, exemplar interface{}) (Queue, error)
// Shutdownable represents a queue that can be shut down.
type Shutdownable interface {
	// Shutdown and Terminate are the two stop hooks. Their exact semantics are
	// set by the implementations; presumably Shutdown is the earlier, graceful
	// stage and Terminate the final one — confirm.
	Shutdown()
	Terminate()
}
// Named represents a queue with a name.
type Named interface {
	// Name returns the queue's name.
	Name() string
}
// Queue defines the interface of a queue-like item.
//
// Queues handle their own contents in the Run method.
type Queue interface {
	// Flushable is embedded; its declaration is not in this part of the file.
	Flushable
	// Run takes two arguments, atShutdown and atTerminate, each of which
	// accepts a callback. Presumably the queue uses them to register its own
	// shutdown and terminate hooks — confirm against the callers.
	Run(atShutdown, atTerminate func(func()))
	// Push adds a single item of Data to the queue.
	Push(Data) error
}
// PushBackable is implemented by queues that can have data pushed back onto
// them.
type PushBackable interface {
	// PushBack pushes data back to the top of the fifo.
	PushBack(Data) error
}
// DummyQueueType is the Type of the dummy queue; queuesMap maps it to
// NewDummyQueue.
const DummyQueueType Type = "dummy"
  57. // NewDummyQueue creates a new DummyQueue
  58. func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, error) {
  59. return &DummyQueue{}, nil
  60. }
  61. // DummyQueue represents an empty queue
  62. type DummyQueue struct{}
  63. // Run does nothing
  64. func (*DummyQueue) Run(_, _ func(func())) {}
  65. // Push fakes a push of data to the queue
  66. func (*DummyQueue) Push(Data) error {
  67. return nil
  68. }
  69. // PushFunc fakes a push of data to the queue with a function. The function is never run.
  70. func (*DummyQueue) PushFunc(Data, func() error) error {
  71. return nil
  72. }
  73. // Has always returns false as this queue never does anything
  74. func (*DummyQueue) Has(Data) (bool, error) {
  75. return false, nil
  76. }
  77. // Flush always returns nil
  78. func (*DummyQueue) Flush(time.Duration) error {
  79. return nil
  80. }
  81. // FlushWithContext always returns nil
  82. func (*DummyQueue) FlushWithContext(context.Context) error {
  83. return nil
  84. }
  85. // IsEmpty asserts that the queue is empty
  86. func (*DummyQueue) IsEmpty() bool {
  87. return true
  88. }
// ImmediateType is the Type of the queue that executes the handler at push
// time; queuesMap maps it to NewImmediate.
const ImmediateType Type = "immediate"
  91. // NewImmediate creates a new false queue to execute the function when push
  92. func NewImmediate(handler HandlerFunc, opts, exemplar interface{}) (Queue, error) {
  93. return &Immediate{
  94. handler: handler,
  95. }, nil
  96. }
  97. // Immediate represents an direct execution queue
  98. type Immediate struct {
  99. handler HandlerFunc
  100. }
  101. // Run does nothing
  102. func (*Immediate) Run(_, _ func(func())) {}
  103. // Push fakes a push of data to the queue
  104. func (q *Immediate) Push(data Data) error {
  105. return q.PushFunc(data, nil)
  106. }
  107. // PushFunc fakes a push of data to the queue with a function. The function is never run.
  108. func (q *Immediate) PushFunc(data Data, f func() error) error {
  109. if f != nil {
  110. if err := f(); err != nil {
  111. return err
  112. }
  113. }
  114. q.handler(data)
  115. return nil
  116. }
  117. // Has always returns false as this queue never does anything
  118. func (*Immediate) Has(Data) (bool, error) {
  119. return false, nil
  120. }
  121. // Flush always returns nil
  122. func (*Immediate) Flush(time.Duration) error {
  123. return nil
  124. }
  125. // FlushWithContext always returns nil
  126. func (*Immediate) FlushWithContext(context.Context) error {
  127. return nil
  128. }
  129. // IsEmpty asserts that the queue is empty
  130. func (*Immediate) IsEmpty() bool {
  131. return true
  132. }
// queuesMap is the registry of queue constructors, keyed by queue Type. It is
// read by RegisteredTypes, RegisteredTypesAsString and NewQueue.
// NOTE(review): only the dummy and immediate queues are listed here; other
// types are presumably added from elsewhere in the package — confirm.
var queuesMap = map[Type]NewQueueFunc{
	DummyQueueType: NewDummyQueue,
	ImmediateType:  NewImmediate,
}
  137. // RegisteredTypes provides the list of requested types of queues
  138. func RegisteredTypes() []Type {
  139. types := make([]Type, len(queuesMap))
  140. i := 0
  141. for key := range queuesMap {
  142. types[i] = key
  143. i++
  144. }
  145. return types
  146. }
  147. // RegisteredTypesAsString provides the list of requested types of queues
  148. func RegisteredTypesAsString() []string {
  149. types := make([]string, len(queuesMap))
  150. i := 0
  151. for key := range queuesMap {
  152. types[i] = string(key)
  153. i++
  154. }
  155. return types
  156. }
  157. // NewQueue takes a queue Type, HandlerFunc, some options and possibly an exemplar and returns a Queue or an error
  158. func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) {
  159. newFn, ok := queuesMap[queueType]
  160. if !ok {
  161. return nil, fmt.Errorf("unsupported queue type: %v", queueType)
  162. }
  163. return newFn(handlerFunc, opts, exemplar)
  164. }