You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

unique_queue.go 2.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. // Copyright 2016 The Gogs Authors. All rights reserved.
  2. // Copyright 2019 The Gitea Authors. All rights reserved.
  3. // Use of this source code is governed by a MIT-style
  4. // license that can be found in the LICENSE file.
  5. package sync
  6. import (
  7. "github.com/unknwon/com"
  8. )
  9. // UniqueQueue is a queue which guarantees only one instance of same
  10. // identity is in the line. Instances with same identity will be
  11. // discarded if there is already one in the line.
  12. //
  13. // This queue is particularly useful for preventing duplicated task
  14. // of same purpose.
  15. type UniqueQueue struct {
  16. table *StatusTable
  17. queue chan string
  18. closed chan struct{}
  19. }
  20. // NewUniqueQueue initializes and returns a new UniqueQueue object.
  21. func NewUniqueQueue(queueLength int) *UniqueQueue {
  22. if queueLength <= 0 {
  23. queueLength = 100
  24. }
  25. return &UniqueQueue{
  26. table: NewStatusTable(),
  27. queue: make(chan string, queueLength),
  28. closed: make(chan struct{}),
  29. }
  30. }
  31. // Close closes this queue
  32. func (q *UniqueQueue) Close() {
  33. select {
  34. case <-q.closed:
  35. default:
  36. q.table.lock.Lock()
  37. select {
  38. case <-q.closed:
  39. default:
  40. close(q.closed)
  41. }
  42. q.table.lock.Unlock()
  43. }
  44. }
  45. // IsClosed returns a channel that is closed when this Queue is closed
  46. func (q *UniqueQueue) IsClosed() <-chan struct{} {
  47. return q.closed
  48. }
  49. // IDs returns the current ids in the pool
  50. func (q *UniqueQueue) IDs() []interface{} {
  51. q.table.lock.Lock()
  52. defer q.table.lock.Unlock()
  53. ids := make([]interface{}, 0, len(q.table.pool))
  54. for id := range q.table.pool {
  55. ids = append(ids, id)
  56. }
  57. return ids
  58. }
  59. // Queue returns channel of queue for retrieving instances.
  60. func (q *UniqueQueue) Queue() <-chan string {
  61. return q.queue
  62. }
  63. // Exist returns true if there is an instance with given identity
  64. // exists in the queue.
  65. func (q *UniqueQueue) Exist(id interface{}) bool {
  66. return q.table.IsRunning(com.ToStr(id))
  67. }
  68. // AddFunc adds new instance to the queue with a custom runnable function,
  69. // the queue is blocked until the function exits.
  70. func (q *UniqueQueue) AddFunc(id interface{}, fn func()) {
  71. idStr := com.ToStr(id)
  72. q.table.lock.Lock()
  73. if _, ok := q.table.pool[idStr]; ok {
  74. q.table.lock.Unlock()
  75. return
  76. }
  77. q.table.pool[idStr] = struct{}{}
  78. if fn != nil {
  79. fn()
  80. }
  81. q.table.lock.Unlock()
  82. select {
  83. case <-q.closed:
  84. return
  85. case q.queue <- idStr:
  86. return
  87. }
  88. }
  89. // Add adds new instance to the queue.
  90. func (q *UniqueQueue) Add(id interface{}) {
  91. q.AddFunc(id, nil)
  92. }
  93. // Remove removes instance from the queue.
  94. func (q *UniqueQueue) Remove(id interface{}) {
  95. q.table.Stop(com.ToStr(id))
  96. }