var archiveInProgress []*ArchiveRequest
var archiveMutex sync.Mutex
+// These facilitate testing, by allowing the unit tests to control (to some extent)
+// the goroutine used for processing the queue.
+var archiveQueueMutex *sync.Mutex
+var archiveQueueStartCond *sync.Cond
+var archiveQueueReleaseCond *sync.Cond
+
// GetArchivePath returns the path from which we can serve this archive.
func (aReq *ArchiveRequest) GetArchivePath() string {
return aReq.archivePath
archiveInProgress = append(archiveInProgress, request)
archiveMutex.Unlock()
+ // Wait to start, if we have the Cond for it. This is currently only
+ // useful for testing, so that the start and release of queued entries
+ // can be controlled to examine the queue.
+ if archiveQueueStartCond != nil {
+ archiveQueueMutex.Lock()
+ archiveQueueStartCond.Wait()
+ archiveQueueMutex.Unlock()
+ }
+
// Drop the mutex while we process the request. This may take a long
// time, and it's not necessary now that we've added the reequest to
// archiveInProgress.
doArchive(request)
+ if archiveQueueReleaseCond != nil {
+ archiveQueueMutex.Lock()
+ archiveQueueReleaseCond.Wait()
+ archiveQueueMutex.Unlock()
+ }
+
// Purge this request from the list. To do so, we'll just take the
// index at which we ended up at and swap the final element into that
// position, then chop off the now-redundant final element. The slice
import (
"path/filepath"
+ "sync"
"testing"
"time"
"github.com/unknwon/com"
)
+var queueMutex sync.Mutex
+
func TestMain(m *testing.M) {
models.MainTest(m, filepath.Join("..", ".."))
+
+ archiveQueueMutex = &queueMutex
+ archiveQueueStartCond = sync.NewCond(&queueMutex)
+ archiveQueueReleaseCond = sync.NewCond(&queueMutex)
+}
+
+func allComplete(inFlight []*ArchiveRequest) bool {
+ for _, req := range inFlight {
+ if !req.IsComplete() {
+ return false
+ }
+ }
+
+ return true
+}
+
+func releaseOneEntry(t *testing.T, inFlight []*ArchiveRequest) {
+ var nowQueued, numQueued int
+
+ numQueued = len(archiveInProgress)
+
+ // Release one, then wait up to 3 seconds for it to complete.
+ archiveQueueReleaseCond.Signal()
+ timeout := time.Now().Add(3 * time.Second)
+ for {
+ nowQueued = len(archiveInProgress)
+ if nowQueued != numQueued || time.Now().After(timeout) {
+ break
+ }
+ }
+
+ // Make sure we didn't just timeout.
+ assert.NotEqual(t, nowQueued, numQueued)
+
+ // Also make sure that we released only one.
+ assert.Equal(t, nowQueued, numQueued + 1)
}
func TestArchive_Basic(t *testing.T) {
secondReq := DeriveRequestFrom(ctx, secondCommit+".zip")
assert.NotNil(t, secondReq)
+ inFlight := make([]*ArchiveRequest, 3)
+ inFlight[0] = zipReq
+ inFlight[1] = tgzReq
+ inFlight[2] = secondReq
+
ArchiveRepository(zipReq)
+ assert.Equal(t, len(archiveInProgress), 1)
ArchiveRepository(tgzReq)
+ assert.Equal(t, len(archiveInProgress), 2)
ArchiveRepository(secondReq)
+ assert.Equal(t, len(archiveInProgress), 3)
+
+ // Make sure sending an unprocessed request through doesn't affect the queue
+ // count.
+ ArchiveRepository(zipReq)
+ assert.Equal(t, len(archiveInProgress), 3)
+
+ // Release them all, they'll then stall at the archiveQueueReleaseCond while
+ // we examine the queue state.
+ archiveQueueStartCond.Broadcast()
- // Wait for those requests to complete, time out after 8 seconds.
+ // 8 second timeout for them all to complete.
timeout := time.Now().Add(8 * time.Second)
for {
- if zipReq.IsComplete() && tgzReq.IsComplete() && secondReq.IsComplete() {
+ if allComplete(inFlight) {
break
} else if time.Now().After(timeout) {
break
assert.True(t, com.IsExist(tgzReq.GetArchivePath()))
assert.True(t, com.IsExist(secondReq.GetArchivePath()))
- // The queue should also be drained, if all requests have completed.
- assert.Equal(t, len(archiveInProgress), 0)
+ // Queues should not have drained yet, because we haven't released them.
+ // Do so now.
+ assert.Equal(t, len(archiveInProgress), 3)
zipReq2 := DeriveRequestFrom(ctx, firstCommit+".zip")
// After completion, zipReq should have dropped out of the queue. Make sure
assert.Equal(t, zipReq, zipReq2)
assert.False(t, zipReq == zipReq2)
- // Make sure we can submit this follow-up request with no side-effects, to
- // the extent that we can.
+ // We still have the other three stalled at completion, waiting to remove
+ // from archiveInProgress. Try to submit this new one before its
+ // predecessor has cleared out of the queue.
ArchiveRepository(zipReq2)
- assert.Equal(t, zipReq, zipReq2)
+
+ // Make sure we didn't enqueue anything from this new one, and that the
+ // queue hasn't changed.
+ assert.Equal(t, len(archiveInProgress), 3)
+
+ for _, req := range archiveInProgress {
+ assert.False(t, req == zipReq2)
+ }
+
+ // Make sure the queue drains properly
+ releaseOneEntry(t, inFlight)
+ assert.Equal(t, len(archiveInProgress), 2)
+ releaseOneEntry(t, inFlight)
+ assert.Equal(t, len(archiveInProgress), 1)
+ releaseOneEntry(t, inFlight)
assert.Equal(t, len(archiveInProgress), 0)
// Same commit, different compression formats should have different names.