You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

queue_disk_test.go 3.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package queue
  import (
  	"context"
  	"io/ioutil"
  	"os"
  	"sync"
  	"testing"
  	"time"

  	"github.com/stretchr/testify/assert"
  )
  13. func TestLevelQueue(t *testing.T) {
  14. handleChan := make(chan *testData)
  15. handle := func(data ...Data) {
  16. assert.True(t, len(data) == 2)
  17. for _, datum := range data {
  18. testDatum := datum.(*testData)
  19. handleChan <- testDatum
  20. }
  21. }
  22. queueShutdown := []func(){}
  23. queueTerminate := []func(){}
  24. tmpDir, err := ioutil.TempDir("", "level-queue-test-data")
  25. assert.NoError(t, err)
  26. defer os.RemoveAll(tmpDir)
  27. queue, err := NewLevelQueue(handle, LevelQueueConfiguration{
  28. DataDir: tmpDir,
  29. BatchLength: 2,
  30. Workers: 1,
  31. MaxWorkers: 10,
  32. QueueLength: 20,
  33. BlockTimeout: 1 * time.Second,
  34. BoostTimeout: 5 * time.Minute,
  35. BoostWorkers: 5,
  36. }, &testData{})
  37. assert.NoError(t, err)
  38. go queue.Run(func(_ context.Context, shutdown func()) {
  39. queueShutdown = append(queueShutdown, shutdown)
  40. }, func(_ context.Context, terminate func()) {
  41. queueTerminate = append(queueTerminate, terminate)
  42. })
  43. test1 := testData{"A", 1}
  44. test2 := testData{"B", 2}
  45. err = queue.Push(&test1)
  46. assert.NoError(t, err)
  47. go func() {
  48. err = queue.Push(&test2)
  49. assert.NoError(t, err)
  50. }()
  51. result1 := <-handleChan
  52. assert.Equal(t, test1.TestString, result1.TestString)
  53. assert.Equal(t, test1.TestInt, result1.TestInt)
  54. result2 := <-handleChan
  55. assert.Equal(t, test2.TestString, result2.TestString)
  56. assert.Equal(t, test2.TestInt, result2.TestInt)
  57. err = queue.Push(test1)
  58. assert.Error(t, err)
  59. for _, callback := range queueShutdown {
  60. callback()
  61. }
  62. time.Sleep(200 * time.Millisecond)
  63. err = queue.Push(&test1)
  64. assert.NoError(t, err)
  65. err = queue.Push(&test2)
  66. assert.NoError(t, err)
  67. select {
  68. case <-handleChan:
  69. assert.Fail(t, "Handler processing should have stopped")
  70. default:
  71. }
  72. for _, callback := range queueTerminate {
  73. callback()
  74. }
  75. // Reopen queue
  76. queue, err = NewWrappedQueue(handle,
  77. WrappedQueueConfiguration{
  78. Underlying: LevelQueueType,
  79. Config: LevelQueueConfiguration{
  80. DataDir: tmpDir,
  81. BatchLength: 2,
  82. Workers: 1,
  83. MaxWorkers: 10,
  84. QueueLength: 20,
  85. BlockTimeout: 1 * time.Second,
  86. BoostTimeout: 5 * time.Minute,
  87. BoostWorkers: 5,
  88. },
  89. }, &testData{})
  90. assert.NoError(t, err)
  91. go queue.Run(func(_ context.Context, shutdown func()) {
  92. queueShutdown = append(queueShutdown, shutdown)
  93. }, func(_ context.Context, terminate func()) {
  94. queueTerminate = append(queueTerminate, terminate)
  95. })
  96. result3 := <-handleChan
  97. assert.Equal(t, test1.TestString, result3.TestString)
  98. assert.Equal(t, test1.TestInt, result3.TestInt)
  99. result4 := <-handleChan
  100. assert.Equal(t, test2.TestString, result4.TestString)
  101. assert.Equal(t, test2.TestInt, result4.TestInt)
  102. for _, callback := range queueShutdown {
  103. callback()
  104. }
  105. for _, callback := range queueTerminate {
  106. callback()
  107. }
  108. }