You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

queue.go 3.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package queue
  5. import (
  6. "context"
  7. "fmt"
  8. "time"
  9. )
  10. // ErrInvalidConfiguration is called when there is invalid configuration for a queue
  11. type ErrInvalidConfiguration struct {
  12. cfg interface{}
  13. err error
  14. }
  15. func (err ErrInvalidConfiguration) Error() string {
  16. if err.err != nil {
  17. return fmt.Sprintf("Invalid Configuration Argument: %v: Error: %v", err.cfg, err.err)
  18. }
  19. return fmt.Sprintf("Invalid Configuration Argument: %v", err.cfg)
  20. }
  21. // IsErrInvalidConfiguration checks if an error is an ErrInvalidConfiguration
  22. func IsErrInvalidConfiguration(err error) bool {
  23. _, ok := err.(ErrInvalidConfiguration)
  24. return ok
  25. }
  26. // Type is a type of Queue
  27. type Type string
  28. // Data defines an type of queuable data
  29. type Data interface{}
  30. // HandlerFunc is a function that takes a variable amount of data and processes it
  31. type HandlerFunc func(...Data)
  32. // NewQueueFunc is a function that creates a queue
  33. type NewQueueFunc func(handler HandlerFunc, config interface{}, exemplar interface{}) (Queue, error)
  34. // Shutdownable represents a queue that can be shutdown
  35. type Shutdownable interface {
  36. Shutdown()
  37. Terminate()
  38. }
  39. // Named represents a queue with a name
  40. type Named interface {
  41. Name() string
  42. }
  43. // Queue defines an interface of a queue-like item
  44. //
  45. // Queues will handle their own contents in the Run method
  46. type Queue interface {
  47. Flushable
  48. Run(atShutdown, atTerminate func(context.Context, func()))
  49. Push(Data) error
  50. }
  51. // DummyQueueType is the type for the dummy queue
  52. const DummyQueueType Type = "dummy"
  53. // NewDummyQueue creates a new DummyQueue
  54. func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, error) {
  55. return &DummyQueue{}, nil
  56. }
  57. // DummyQueue represents an empty queue
  58. type DummyQueue struct {
  59. }
  60. // Run does nothing
  61. func (*DummyQueue) Run(_, _ func(context.Context, func())) {}
  62. // Push fakes a push of data to the queue
  63. func (*DummyQueue) Push(Data) error {
  64. return nil
  65. }
  66. // PushFunc fakes a push of data to the queue with a function. The function is never run.
  67. func (*DummyQueue) PushFunc(Data, func() error) error {
  68. return nil
  69. }
  70. // Has always returns false as this queue never does anything
  71. func (*DummyQueue) Has(Data) (bool, error) {
  72. return false, nil
  73. }
  74. // Flush always returns nil
  75. func (*DummyQueue) Flush(time.Duration) error {
  76. return nil
  77. }
  78. // FlushWithContext always returns nil
  79. func (*DummyQueue) FlushWithContext(context.Context) error {
  80. return nil
  81. }
  82. // IsEmpty asserts that the queue is empty
  83. func (*DummyQueue) IsEmpty() bool {
  84. return true
  85. }
  86. var queuesMap = map[Type]NewQueueFunc{DummyQueueType: NewDummyQueue}
  87. // RegisteredTypes provides the list of requested types of queues
  88. func RegisteredTypes() []Type {
  89. types := make([]Type, len(queuesMap))
  90. i := 0
  91. for key := range queuesMap {
  92. types[i] = key
  93. i++
  94. }
  95. return types
  96. }
  97. // RegisteredTypesAsString provides the list of requested types of queues
  98. func RegisteredTypesAsString() []string {
  99. types := make([]string, len(queuesMap))
  100. i := 0
  101. for key := range queuesMap {
  102. types[i] = string(key)
  103. i++
  104. }
  105. return types
  106. }
  107. // NewQueue takes a queue Type, HandlerFunc, some options and possibly an exemplar and returns a Queue or an error
  108. func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) {
  109. newFn, ok := queuesMap[queueType]
  110. if !ok {
  111. return nil, fmt.Errorf("Unsupported queue type: %v", queueType)
  112. }
  113. return newFn(handlerFunc, opts, exemplar)
  114. }