You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

queue_disk_test.go 3.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package queue
  5. import (
  6. "context"
  7. "io/ioutil"
  8. "os"
  9. "sync"
  10. "testing"
  11. "time"
  12. "github.com/stretchr/testify/assert"
  13. )
  14. func TestLevelQueue(t *testing.T) {
  15. handleChan := make(chan *testData)
  16. handle := func(data ...Data) {
  17. assert.True(t, len(data) == 2)
  18. for _, datum := range data {
  19. testDatum := datum.(*testData)
  20. handleChan <- testDatum
  21. }
  22. }
  23. var lock sync.Mutex
  24. queueShutdown := []func(){}
  25. queueTerminate := []func(){}
  26. tmpDir, err := ioutil.TempDir("", "level-queue-test-data")
  27. assert.NoError(t, err)
  28. defer os.RemoveAll(tmpDir)
  29. queue, err := NewLevelQueue(handle, LevelQueueConfiguration{
  30. ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{
  31. WorkerPoolConfiguration: WorkerPoolConfiguration{
  32. QueueLength: 20,
  33. BatchLength: 2,
  34. BlockTimeout: 1 * time.Second,
  35. BoostTimeout: 5 * time.Minute,
  36. BoostWorkers: 5,
  37. MaxWorkers: 10,
  38. },
  39. Workers: 1,
  40. },
  41. DataDir: tmpDir,
  42. }, &testData{})
  43. assert.NoError(t, err)
  44. go queue.Run(func(_ context.Context, shutdown func()) {
  45. lock.Lock()
  46. queueShutdown = append(queueShutdown, shutdown)
  47. lock.Unlock()
  48. }, func(_ context.Context, terminate func()) {
  49. lock.Lock()
  50. queueTerminate = append(queueTerminate, terminate)
  51. lock.Unlock()
  52. })
  53. test1 := testData{"A", 1}
  54. test2 := testData{"B", 2}
  55. err = queue.Push(&test1)
  56. assert.NoError(t, err)
  57. go func() {
  58. err = queue.Push(&test2)
  59. assert.NoError(t, err)
  60. }()
  61. result1 := <-handleChan
  62. assert.Equal(t, test1.TestString, result1.TestString)
  63. assert.Equal(t, test1.TestInt, result1.TestInt)
  64. result2 := <-handleChan
  65. assert.Equal(t, test2.TestString, result2.TestString)
  66. assert.Equal(t, test2.TestInt, result2.TestInt)
  67. err = queue.Push(test1)
  68. assert.Error(t, err)
  69. lock.Lock()
  70. for _, callback := range queueShutdown {
  71. callback()
  72. }
  73. lock.Unlock()
  74. time.Sleep(200 * time.Millisecond)
  75. err = queue.Push(&test1)
  76. assert.NoError(t, err)
  77. err = queue.Push(&test2)
  78. assert.NoError(t, err)
  79. select {
  80. case <-handleChan:
  81. assert.Fail(t, "Handler processing should have stopped")
  82. default:
  83. }
  84. lock.Lock()
  85. for _, callback := range queueTerminate {
  86. callback()
  87. }
  88. lock.Unlock()
  89. // Reopen queue
  90. queue, err = NewWrappedQueue(handle,
  91. WrappedQueueConfiguration{
  92. Underlying: LevelQueueType,
  93. Config: LevelQueueConfiguration{
  94. ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{
  95. WorkerPoolConfiguration: WorkerPoolConfiguration{
  96. QueueLength: 20,
  97. BatchLength: 2,
  98. BlockTimeout: 1 * time.Second,
  99. BoostTimeout: 5 * time.Minute,
  100. BoostWorkers: 5,
  101. MaxWorkers: 10,
  102. },
  103. Workers: 1,
  104. },
  105. DataDir: tmpDir,
  106. },
  107. }, &testData{})
  108. assert.NoError(t, err)
  109. go queue.Run(func(_ context.Context, shutdown func()) {
  110. lock.Lock()
  111. queueShutdown = append(queueShutdown, shutdown)
  112. lock.Unlock()
  113. }, func(_ context.Context, terminate func()) {
  114. lock.Lock()
  115. queueTerminate = append(queueTerminate, terminate)
  116. lock.Unlock()
  117. })
  118. result3 := <-handleChan
  119. assert.Equal(t, test1.TestString, result3.TestString)
  120. assert.Equal(t, test1.TestInt, result3.TestInt)
  121. result4 := <-handleChan
  122. assert.Equal(t, test2.TestString, result4.TestString)
  123. assert.Equal(t, test2.TestInt, result4.TestInt)
  124. lock.Lock()
  125. for _, callback := range queueShutdown {
  126. callback()
  127. }
  128. for _, callback := range queueTerminate {
  129. callback()
  130. }
  131. lock.Unlock()
  132. }