summaryrefslogtreecommitdiffstats
path: root/services/archiver/archiver.go
blob: 359fc8b627d9164e135544fc7870feaff26ae2fe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
// Copyright 2020 The Gitea Authors.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package archiver

import (
	"io"
	"io/ioutil"
	"os"
	"path"
	"regexp"
	"strings"
	"sync"
	"time"

	"code.gitea.io/gitea/modules/base"
	"code.gitea.io/gitea/modules/context"
	"code.gitea.io/gitea/modules/git"
	"code.gitea.io/gitea/modules/graceful"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/setting"
	"code.gitea.io/gitea/modules/util"
)

// ArchiveRequest defines the parameters of an archive request, which notably
// includes the specific repository being archived as well as the commit, the
// name by which it was requested, and the kind of archive being requested.
// This is entirely opaque to external entities, though, and mostly used as a
// handle elsewhere.
type ArchiveRequest struct {
	uri             string
	repo            *git.Repository
	refName         string
	ext             string
	archivePath     string
	archiveType     git.ArchiveType
	archiveComplete bool
	commit          *git.Commit
	cchan           chan struct{}
}

var archiveInProgress []*ArchiveRequest
var archiveMutex sync.Mutex

// SHA1 hashes will only go up to 40 characters, but SHA256 hashes will go all
// the way to 64.
var shaRegex = regexp.MustCompile(`^[0-9a-f]{4,64}$`)

// These facilitate testing, by allowing the unit tests to control (to some extent)
// the goroutine used for processing the queue.
var archiveQueueMutex *sync.Mutex
var archiveQueueStartCond *sync.Cond
var archiveQueueReleaseCond *sync.Cond

// GetArchivePath returns the path from which we can serve this archive.
func (aReq *ArchiveRequest) GetArchivePath() string {
	return aReq.archivePath
}

// GetArchiveName returns the name of the caller, based on the ref used by the
// caller to create this request.
func (aReq *ArchiveRequest) GetArchiveName() string {
	return aReq.refName + aReq.ext
}

// IsComplete returns the completion status of this request.
func (aReq *ArchiveRequest) IsComplete() bool {
	return aReq.archiveComplete
}

// WaitForCompletion will wait for this request to complete, with no timeout.
// It returns whether the archive was actually completed, as the channel could
// have also been closed due to an error.
func (aReq *ArchiveRequest) WaitForCompletion(ctx *context.Context) bool {
	select {
	case <-aReq.cchan:
	case <-ctx.Req.Context().Done():
	}

	return aReq.IsComplete()
}

// TimedWaitForCompletion will wait for this request to complete, with timeout
// happening after the specified Duration.  It returns whether the archive is
// now complete and whether we hit the timeout or not.  The latter may not be
// useful if the request is complete or we started to shutdown.
func (aReq *ArchiveRequest) TimedWaitForCompletion(ctx *context.Context, dur time.Duration) (bool, bool) {
	timeout := false
	select {
	case <-time.After(dur):
		timeout = true
	case <-aReq.cchan:
	case <-ctx.Req.Context().Done():
	}

	return aReq.IsComplete(), timeout
}

// The caller must hold the archiveMutex across calls to getArchiveRequest.
func getArchiveRequest(repo *git.Repository, commit *git.Commit, archiveType git.ArchiveType) *ArchiveRequest {
	for _, r := range archiveInProgress {
		// Need to be referring to the same repository.
		if r.repo.Path == repo.Path && r.commit.ID == commit.ID && r.archiveType == archiveType {
			return r
		}
	}
	return nil
}

// DeriveRequestFrom creates an archival request, based on the URI.  The
// resulting ArchiveRequest is suitable for being passed to ArchiveRepository()
// if it's determined that the request still needs to be satisfied.
func DeriveRequestFrom(ctx *context.Context, uri string) *ArchiveRequest {
	if ctx.Repo == nil || ctx.Repo.GitRepo == nil {
		log.Trace("Repo not initialized")
		return nil
	}
	r := &ArchiveRequest{
		uri:  uri,
		repo: ctx.Repo.GitRepo,
	}

	switch {
	case strings.HasSuffix(uri, ".zip"):
		r.ext = ".zip"
		r.archivePath = path.Join(r.repo.Path, "archives/zip")
		r.archiveType = git.ZIP
	case strings.HasSuffix(uri, ".tar.gz"):
		r.ext = ".tar.gz"
		r.archivePath = path.Join(r.repo.Path, "archives/targz")
		r.archiveType = git.TARGZ
	default:
		log.Trace("Unknown format: %s", uri)
		return nil
	}

	r.refName = strings.TrimSuffix(r.uri, r.ext)
	isDir, err := util.IsDir(r.archivePath)
	if err != nil {
		ctx.ServerError("Download -> util.IsDir(archivePath)", err)
		return nil
	}
	if !isDir {
		if err := os.MkdirAll(r.archivePath, os.ModePerm); err != nil {
			ctx.ServerError("Download -> os.MkdirAll(archivePath)", err)
			return nil
		}
	}

	// Get corresponding commit.
	if r.repo.IsBranchExist(r.refName) {
		r.commit, err = r.repo.GetBranchCommit(r.refName)
		if err != nil {
			ctx.ServerError("GetBranchCommit", err)
			return nil
		}
	} else if r.repo.IsTagExist(r.refName) {
		r.commit, err = r.repo.GetTagCommit(r.refName)
		if err != nil {
			ctx.ServerError("GetTagCommit", err)
			return nil
		}
	} else if shaRegex.MatchString(r.refName) {
		r.commit, err = r.repo.GetCommit(r.refName)
		if err != nil {
			ctx.NotFound("GetCommit", nil)
			return nil
		}
	} else {
		ctx.NotFound("DeriveRequestFrom", nil)
		return nil
	}

	archiveMutex.Lock()
	defer archiveMutex.Unlock()
	if rExisting := getArchiveRequest(r.repo, r.commit, r.archiveType); rExisting != nil {
		return rExisting
	}

	r.archivePath = path.Join(r.archivePath, base.ShortSha(r.commit.ID.String())+r.ext)
	r.archiveComplete, err = util.IsFile(r.archivePath)
	if err != nil {
		ctx.ServerError("util.IsFile", err)
		return nil
	}
	return r
}

func doArchive(r *ArchiveRequest) {
	var (
		err         error
		tmpArchive  *os.File
		destArchive *os.File
	)

	// Close the channel to indicate to potential waiters that this request
	// has finished.
	defer close(r.cchan)

	// It could have happened that we enqueued two archival requests, due to
	// race conditions and difficulties in locking.  Do one last check that
	// the archive we're referring to doesn't already exist.  If it does exist,
	// then just mark the request as complete and move on.
	isFile, err := util.IsFile(r.archivePath)
	if err != nil {
		log.Error("Unable to check if %s util.IsFile: %v. Will ignore and recreate.", r.archivePath, err)
	}
	if isFile {
		r.archiveComplete = true
		return
	}

	// Create a temporary file to use while the archive is being built.  We
	// will then copy it into place (r.archivePath) once it's fully
	// constructed.
	tmpArchive, err = ioutil.TempFile("", "archive")
	if err != nil {
		log.Error("Unable to create a temporary archive file! Error: %v", err)
		return
	}
	defer func() {
		tmpArchive.Close()
		os.Remove(tmpArchive.Name())
	}()

	if err = r.commit.CreateArchive(graceful.GetManager().ShutdownContext(), tmpArchive.Name(), git.CreateArchiveOpts{
		Format: r.archiveType,
		Prefix: setting.Repository.PrefixArchiveFiles,
	}); err != nil {
		log.Error("Download -> CreateArchive "+tmpArchive.Name(), err)
		return
	}

	// Now we copy it into place
	if destArchive, err = os.Create(r.archivePath); err != nil {
		log.Error("Unable to open archive " + r.archivePath)
		return
	}
	_, err = io.Copy(destArchive, tmpArchive)
	destArchive.Close()
	if err != nil {
		log.Error("Unable to write archive " + r.archivePath)
		return
	}

	// Block any attempt to finalize creating a new request if we're marking
	r.archiveComplete = true
}

// ArchiveRepository satisfies the ArchiveRequest being passed in.  Processing
// will occur in a separate goroutine, as this phase may take a while to
// complete.  If the archive already exists, ArchiveRepository will not do
// anything.  In all cases, the caller should be examining the *ArchiveRequest
// being returned for completion, as it may be different than the one they passed
// in.
func ArchiveRepository(request *ArchiveRequest) *ArchiveRequest {
	// We'll return the request that's already been enqueued if it has been
	// enqueued, or we'll immediately enqueue it if it has not been enqueued
	// and it is not marked complete.
	archiveMutex.Lock()
	defer archiveMutex.Unlock()
	if rExisting := getArchiveRequest(request.repo, request.commit, request.archiveType); rExisting != nil {
		return rExisting
	}
	if request.archiveComplete {
		return request
	}

	request.cchan = make(chan struct{})
	archiveInProgress = append(archiveInProgress, request)
	go func() {
		// Wait to start, if we have the Cond for it.  This is currently only
		// useful for testing, so that the start and release of queued entries
		// can be controlled to examine the queue.
		if archiveQueueStartCond != nil {
			archiveQueueMutex.Lock()
			archiveQueueStartCond.Wait()
			archiveQueueMutex.Unlock()
		}

		// Drop the mutex while we process the request.  This may take a long
		// time, and it's not necessary now that we've added the reequest to
		// archiveInProgress.
		doArchive(request)

		if archiveQueueReleaseCond != nil {
			archiveQueueMutex.Lock()
			archiveQueueReleaseCond.Wait()
			archiveQueueMutex.Unlock()
		}

		// Purge this request from the list.  To do so, we'll just take the
		// index at which we ended up at and swap the final element into that
		// position, then chop off the now-redundant final element.  The slice
		// may have change in between these two segments and we may have moved,
		// so we search for it here.  We could perhaps avoid this search
		// entirely if len(archiveInProgress) == 1, but we should verify
		// correctness.
		archiveMutex.Lock()
		defer archiveMutex.Unlock()

		idx := -1
		for _idx, req := range archiveInProgress {
			if req == request {
				idx = _idx
				break
			}
		}
		if idx == -1 {
			log.Error("ArchiveRepository: Failed to find request for removal.")
			return
		}
		archiveInProgress = append(archiveInProgress[:idx], archiveInProgress[idx+1:]...)
	}()

	return request
}