summaryrefslogtreecommitdiffstats
path: root/services/archiver/archiver_test.go
blob: 84bab148182a3760f00b846dbff065a6ea8edfc1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package archiver

import (
	"path/filepath"
	"sync"
	"testing"
	"time"

	"code.gitea.io/gitea/models"
	"code.gitea.io/gitea/modules/test"
	"code.gitea.io/gitea/modules/util"

	"github.com/stretchr/testify/assert"
)

// queueMutex is the lock that TestArchive_Basic installs as archiveQueueMutex
// and uses to back the start and release condition variables.  The helpers in
// this file hold it while signalling those condition variables.
var queueMutex sync.Mutex

// TestMain is the entry point for this package's tests.  It delegates to
// models.MainTest, handing it the relative path two directories above this
// package.
func TestMain(m *testing.M) {
	rootDir := filepath.Join("..", "..")
	models.MainTest(m, rootDir)
}

// waitForCount polls archiveInProgress until it holds exactly num entries or
// ten seconds have elapsed, then asserts that its length equals num.  A
// timeout therefore surfaces as a failed assertion rather than a hang.
//
// NOTE(review): archiveInProgress is read here without taking any lock;
// confirm against the archiver whether these reads need to be guarded.
func waitForCount(t *testing.T, num int) {
	// Report assertion failures at the caller's line, not inside this helper.
	t.Helper()

	// Wait for up to 10 seconds for the queue to be impacted.  Pause briefly
	// between polls so the loop does not spin a CPU core while waiting for the
	// queue length to change.
	deadline := time.Now().Add(10 * time.Second)
	for len(archiveInProgress) != num && !time.Now().After(deadline) {
		time.Sleep(10 * time.Millisecond)
	}

	assert.Equal(t, num, len(archiveInProgress))
}

// releaseOneEntry signals archiveQueueReleaseCond once while holding
// queueMutex, then polls archiveInProgress for up to ten seconds until its
// length changes.  It asserts that the length did change and that it dropped
// by exactly one.
//
// The inFlight parameter is currently unused; it is kept so existing callers
// continue to compile.
//
// NOTE(review): archiveInProgress is read here without taking any lock;
// confirm against the archiver whether these reads need to be guarded.
func releaseOneEntry(t *testing.T, inFlight []*ArchiveRequest) {
	// Report assertion failures at the caller's line, not inside this helper.
	t.Helper()

	numQueued := len(archiveInProgress)

	// Release one, then wait up to 10 seconds for it to complete.
	queueMutex.Lock()
	archiveQueueReleaseCond.Signal()
	queueMutex.Unlock()

	nowQueued := numQueued
	deadline := time.Now().Add(10 * time.Second)
	for {
		nowQueued = len(archiveInProgress)
		if nowQueued != numQueued || time.Now().After(deadline) {
			break
		}
		// Pause briefly between polls so the loop does not spin a CPU core
		// while waiting for the queue length to change.
		time.Sleep(10 * time.Millisecond)
	}

	// Make sure we didn't just timeout.
	assert.NotEqual(t, numQueued, nowQueued)

	// Also make sure that we released only one.
	assert.Equal(t, numQueued-1, nowQueued)
}

// TestArchive_Basic walks the archiver through a full request lifecycle using
// test-owned synchronization hooks:
//
//  1. DeriveRequestFrom is expected to return nil for a context without a
//     loaded repo/git repo and for malformed or unknown archive names.
//  2. Three valid requests are queued via ArchiveRepository and the size of
//     archiveInProgress is checked after each, including a duplicate submit.
//  3. The start condition is broadcast, every request is waited on, and the
//     resulting archive paths are checked for existence.
//  4. Entries are released from the queue one at a time via releaseOneEntry.
//  5. TimedWaitForCompletion is exercised for both its timeout and its
//     completion outcome.
//  6. A freshly derived request is compared against an already released one,
//     and archive names are compared across formats and commits.
func TestArchive_Basic(t *testing.T) {
	assert.NoError(t, models.PrepareTestDatabase())

	// Install the test-owned mutex and condition variables so this test
	// decides when requests are started and released; reset them to nil when
	// the test returns.
	archiveQueueMutex = &queueMutex
	archiveQueueStartCond = sync.NewCond(&queueMutex)
	archiveQueueReleaseCond = sync.NewCond(&queueMutex)
	defer func() {
		archiveQueueMutex = nil
		archiveQueueStartCond = nil
		archiveQueueReleaseCond = nil
	}()

	ctx := test.MockContext(t, "user27/repo49")
	firstCommit, secondCommit := "51f84af23134", "aacbdfe9e1c4"

	// Before test.LoadRepo has been called on ctx, an otherwise valid name is
	// expected to yield no request.
	bogusReq := DeriveRequestFrom(ctx, firstCommit+".zip")
	assert.Nil(t, bogusReq)

	// After test.LoadRepo but before test.LoadGitRepo, the same name is still
	// expected to yield no request.
	test.LoadRepo(t, ctx, 49)
	bogusReq = DeriveRequestFrom(ctx, firstCommit+".zip")
	assert.Nil(t, bogusReq)

	test.LoadGitRepo(t, ctx)
	defer ctx.Repo.GitRepo.Close()

	// Check a series of bogus requests.
	// Step 1, valid commit with a bad extension.
	bogusReq = DeriveRequestFrom(ctx, firstCommit+".dilbert")
	assert.Nil(t, bogusReq)

	// Step 2, missing commit.
	bogusReq = DeriveRequestFrom(ctx, "dbffff.zip")
	assert.Nil(t, bogusReq)

	// Step 3, doesn't look like branch/tag/commit.
	bogusReq = DeriveRequestFrom(ctx, "db.zip")
	assert.Nil(t, bogusReq)

	// Now three valid requests: firstCommit with two valid extensions, then
	// secondCommit as a zip.
	zipReq := DeriveRequestFrom(ctx, firstCommit+".zip")
	assert.NotNil(t, zipReq)

	tgzReq := DeriveRequestFrom(ctx, firstCommit+".tar.gz")
	assert.NotNil(t, tgzReq)

	secondReq := DeriveRequestFrom(ctx, secondCommit+".zip")
	assert.NotNil(t, secondReq)

	inFlight := make([]*ArchiveRequest, 3)
	inFlight[0] = zipReq
	inFlight[1] = tgzReq
	inFlight[2] = secondReq

	// Submit each request and confirm the in-progress count grows by one each
	// time.
	ArchiveRepository(zipReq)
	waitForCount(t, 1)
	ArchiveRepository(tgzReq)
	waitForCount(t, 2)
	ArchiveRepository(secondReq)
	waitForCount(t, 3)

	// Make sure sending an unprocessed request through doesn't affect the queue
	// count.
	ArchiveRepository(zipReq)

	// Sleep two seconds to make sure the queue doesn't change.
	time.Sleep(2 * time.Second)
	assert.Equal(t, 3, len(archiveInProgress))

	// Release them all, they'll then stall at the archiveQueueReleaseCond while
	// we examine the queue state.
	queueMutex.Lock()
	archiveQueueStartCond.Broadcast()
	queueMutex.Unlock()

	// Iterate through all of the in-flight requests and wait for their
	// completion.
	for _, req := range inFlight {
		req.WaitForCompletion(ctx)
	}

	// Every request should now report completion and have its archive path
	// present.
	for _, req := range inFlight {
		assert.True(t, req.IsComplete())
		exist, err := util.IsExist(req.GetArchivePath())
		assert.NoError(t, err)
		assert.True(t, exist)
	}

	arbitraryReq := inFlight[0]
	// Reopen the channel so we don't double-close, mark it incomplete.  We're
	// going to run it back through the archiver, and it should get marked
	// complete again.
	arbitraryReq.cchan = make(chan struct{})
	arbitraryReq.archiveComplete = false
	doArchive(arbitraryReq)
	assert.True(t, arbitraryReq.IsComplete())

	// Queues should not have drained yet, because we haven't released them.
	// The release itself happens further below via releaseOneEntry.
	assert.Equal(t, 3, len(archiveInProgress))

	zipReq2 := DeriveRequestFrom(ctx, firstCommit+".zip")
	// This zipReq should match what's sitting in the queue, as we haven't
	// let it release yet.  From the consumer's point of view, this looks like
	// a long-running archive task.
	assert.Equal(t, zipReq, zipReq2)

	// We still have the other three stalled at completion, waiting to remove
	// from archiveInProgress.  Try to submit this new one before its
	// predecessor has cleared out of the queue.
	ArchiveRepository(zipReq2)

	// Make sure the queue hasn't grown any.
	assert.Equal(t, 3, len(archiveInProgress))

	// Make sure the queue drains properly
	releaseOneEntry(t, inFlight)
	assert.Equal(t, 2, len(archiveInProgress))
	releaseOneEntry(t, inFlight)
	assert.Equal(t, 1, len(archiveInProgress))
	releaseOneEntry(t, inFlight)
	assert.Equal(t, 0, len(archiveInProgress))

	// Now we'll submit a request and TimedWaitForCompletion twice, before and
	// after we release it.  We should trigger both the timeout and non-timeout
	// cases.
	var completed, timedout bool
	timedReq := DeriveRequestFrom(ctx, secondCommit+".tar.gz")
	assert.NotNil(t, timedReq)
	ArchiveRepository(timedReq)

	// Guaranteed to timeout; we haven't signalled the request to start.
	completed, timedout = timedReq.TimedWaitForCompletion(ctx, 2*time.Second)
	assert.Equal(t, false, completed)
	assert.Equal(t, true, timedout)

	queueMutex.Lock()
	archiveQueueStartCond.Broadcast()
	queueMutex.Unlock()

	// Shouldn't timeout, we've now signalled it and it's a small request.
	completed, timedout = timedReq.TimedWaitForCompletion(ctx, 15*time.Second)
	assert.Equal(t, true, completed)
	assert.Equal(t, false, timedout)

	zipReq2 = DeriveRequestFrom(ctx, firstCommit+".zip")
	// Now, we're guaranteed to have released the original zipReq from the queue.
	// Ensure that we don't get handed back the released entry somehow, but they
	// should remain functionally equivalent in all fields.  The exception here
	// is zipReq.cchan, which will be non-nil because it's a completed request.
	// It's fine to go ahead and set it to nil now.
	zipReq.cchan = nil
	assert.Equal(t, zipReq, zipReq2)
	// Pointer comparison: the two must be distinct objects even though their
	// fields compare equal.
	assert.False(t, zipReq == zipReq2)

	// Same commit, different compression formats should have different names.
	// Ideally, the extension would match what we originally requested.
	assert.NotEqual(t, zipReq.GetArchiveName(), tgzReq.GetArchiveName())
	// Different commits in the same format should have different names too.
	assert.NotEqual(t, zipReq.GetArchiveName(), secondReq.GetArchiveName())
}