var archiveInProgress []*ArchiveRequest
var archiveMutex sync.Mutex
+var archiveCond *sync.Cond
// These facilitate testing, by allowing the unit tests to control (to some extent)
// the goroutine used for processing the queue.
return
}
+ // Block any attempt to finalize creating a new request if we're marking
r.archiveComplete = true
}
// ArchiveRepository satisfies the ArchiveRequest being passed in. Processing
// will occur in a separate goroutine, as this phase may take a while to
// complete. If the archive already exists, ArchiveRepository will not do
-// anything.
-func ArchiveRepository(request *ArchiveRequest) {
- if request.archiveComplete {
- return
+// anything. In all cases, the caller should be examining the *ArchiveRequest
+// being returned for completion, as it may be different than the one they passed
+// in.
+func ArchiveRepository(request *ArchiveRequest) *ArchiveRequest {
+ // We'll return the request that's already been enqueued if it has been
+ // enqueued, or we'll immediately enqueue it if it has not been enqueued
+ // and it is not marked complete.
+ archiveMutex.Lock()
+ if rExisting := getArchiveRequest(request.repo, request.commit, request.archiveType); rExisting != nil {
+ archiveMutex.Unlock()
+ return rExisting
}
- go func() {
- // We'll take some liberties here, in that the caller may not assume that the
- // specific request they submitted is the one getting enqueued. We'll just drop
- // it if it turns out we've already enqueued an identical request, as they'll keep
- // checking back for the status anyways.
- archiveMutex.Lock()
- if rExisting := getArchiveRequest(request.repo, request.commit, request.archiveType); rExisting != nil {
- archiveMutex.Unlock()
- return
- }
- archiveInProgress = append(archiveInProgress, request)
+ if request.archiveComplete {
archiveMutex.Unlock()
+ return request
+ }
+ archiveInProgress = append(archiveInProgress, request)
+ archiveMutex.Unlock()
+ go func() {
// Wait to start, if we have the Cond for it. This is currently only
// useful for testing, so that the start and release of queued entries
// can be controlled to examine the queue.
// correctness.
archiveMutex.Lock()
defer archiveMutex.Unlock()
+ // Wake up all other goroutines that may be waiting on a request to
+ // complete. They should all wake up, see if that particular request
+ // is complete, then return to waiting if it is not.
+ archiveCond.Broadcast()
+
idx := -1
for _idx, req := range archiveInProgress {
if req == request {
}
archiveInProgress = archiveInProgress[:lastidx]
}()
+
+ return request
+}
+
+// LockQueue will obtain the archiveMutex for the caller. This allows the
+// underlying locking mechanism to remain opaque.
+func LockQueue() {
+ archiveMutex.Lock()
+}
+
+// UnlockQueue will release the archiveMutex for the caller, again allowing the
+// underlying locking mechanism to remain opaque.
+func UnlockQueue() {
+ archiveMutex.Unlock()
+}
+
+// WaitForCompletion should be called with the queue locked (LockQueue), and will
+// return with the queue lock held when a single archive request has finished.
+// There is currently no API for getting notified of a particular request being
+// completed.
+func WaitForCompletion() {
+ archiveCond.Wait()
+}
+
+// NewContext will initialize local state, e.g. primitives needed to be able to
+// synchronize with the lock queue and allow callers to wait for an archive to
+// finish.
+func NewContext() {
+ archiveCond = sync.NewCond(&archiveMutex)
}