You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

unique_queue.go 2.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. // Copyright 2016 The Gogs Authors. All rights reserved.
  2. // Copyright 2019 The Gitea Authors. All rights reserved.
  3. // Use of this source code is governed by a MIT-style
  4. // license that can be found in the LICENSE file.
  5. package sync
  6. // UniqueQueue is a queue which guarantees only one instance of same
  7. // identity is in the line. Instances with same identity will be
  8. // discarded if there is already one in the line.
  9. //
  10. // This queue is particularly useful for preventing duplicated task
  11. // of same purpose.
  12. type UniqueQueue struct {
  13. table *StatusTable
  14. queue chan string
  15. closed chan struct{}
  16. }
  17. // NewUniqueQueue initializes and returns a new UniqueQueue object.
  18. func NewUniqueQueue(queueLength int) *UniqueQueue {
  19. if queueLength <= 0 {
  20. queueLength = 100
  21. }
  22. return &UniqueQueue{
  23. table: NewStatusTable(),
  24. queue: make(chan string, queueLength),
  25. closed: make(chan struct{}),
  26. }
  27. }
  28. // Close closes this queue
  29. func (q *UniqueQueue) Close() {
  30. select {
  31. case <-q.closed:
  32. default:
  33. q.table.lock.Lock()
  34. select {
  35. case <-q.closed:
  36. default:
  37. close(q.closed)
  38. }
  39. q.table.lock.Unlock()
  40. }
  41. }
  42. // IsClosed returns a channel that is closed when this Queue is closed
  43. func (q *UniqueQueue) IsClosed() <-chan struct{} {
  44. return q.closed
  45. }
  46. // IDs returns the current ids in the pool
  47. func (q *UniqueQueue) IDs() []string {
  48. q.table.lock.Lock()
  49. defer q.table.lock.Unlock()
  50. ids := make([]string, 0, len(q.table.pool))
  51. for id := range q.table.pool {
  52. ids = append(ids, id)
  53. }
  54. return ids
  55. }
  56. // Queue returns channel of queue for retrieving instances.
  57. func (q *UniqueQueue) Queue() <-chan string {
  58. return q.queue
  59. }
  60. // Exist returns true if there is an instance with given identity
  61. // exists in the queue.
  62. func (q *UniqueQueue) Exist(id string) bool {
  63. return q.table.IsRunning(id)
  64. }
  65. // AddFunc adds new instance to the queue with a custom runnable function,
  66. // the queue is blocked until the function exits.
  67. func (q *UniqueQueue) AddFunc(id string, fn func()) {
  68. q.table.lock.Lock()
  69. if _, ok := q.table.pool[id]; ok {
  70. q.table.lock.Unlock()
  71. return
  72. }
  73. q.table.pool[id] = struct{}{}
  74. if fn != nil {
  75. fn()
  76. }
  77. q.table.lock.Unlock()
  78. select {
  79. case <-q.closed:
  80. return
  81. case q.queue <- id:
  82. return
  83. }
  84. }
  85. // Add adds new instance to the queue.
  86. func (q *UniqueQueue) Add(id string) {
  87. q.AddFunc(id, nil)
  88. }
  89. // Remove removes instance from the queue.
  90. func (q *UniqueQueue) Remove(id string) {
  91. q.table.Stop(id)
  92. }