aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--modules/queue/workerpool.go14
-rw-r--r--services/mirror/mirror.go28
2 files changed, 32 insertions, 10 deletions
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index 20108d3588..39ea59b7b1 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -115,6 +115,9 @@ func (p *WorkerPool) hasNoWorkerScaling() bool {
return p.numberOfWorkers == 0 && (p.boostTimeout == 0 || p.boostWorkers == 0 || p.maxNumberOfWorkers == 0)
}
+// zeroBoost will add a temporary boost worker for a no worker queue
+// p.lock must be locked at the start of this function BUT it will be unlocked by the end of this function
+// (This is because addWorkers has to be called whilst unlocked)
func (p *WorkerPool) zeroBoost() {
ctx, cancel := context.WithTimeout(p.baseCtx, p.boostTimeout)
mq := GetManager().GetManagedQueue(p.qid)
@@ -316,6 +319,17 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc,
}
p.pause()
}
+ select {
+ case <-p.baseCtx.Done():
+ // this worker queue is shut-down don't reboost
+ default:
+ if p.numberOfWorkers == 0 && atomic.LoadInt64(&p.numInQueue) > 0 {
+ // OK there are no workers but... there's still work to be done -> Reboost
+ p.zeroBoost()
+ // p.lock will be unlocked by zeroBoost
+ return
+ }
+ }
p.lock.Unlock()
}()
}
diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go
index 6f285ec467..5639a08f96 100644
--- a/services/mirror/mirror.go
+++ b/services/mirror/mirror.go
@@ -59,11 +59,13 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
handler := func(idx int, bean interface{}, limit int) error {
var item SyncRequest
+ var repo *repo_model.Repository
if m, ok := bean.(*repo_model.Mirror); ok {
if m.Repo == nil {
log.Error("Disconnected mirror found: %d", m.ID)
return nil
}
+ repo = m.Repo
item = SyncRequest{
Type: PullMirrorType,
RepoID: m.RepoID,
@@ -73,6 +75,7 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
log.Error("Disconnected push-mirror found: %d", m.ID)
return nil
}
+ repo = m.Repo
item = SyncRequest{
Type: PushMirrorType,
RepoID: m.RepoID,
@@ -89,17 +92,16 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
default:
}
- // Check if this request is already in the queue
- has, err := mirrorQueue.Has(&item)
- if err != nil {
- return err
- }
- if has {
- return nil
- }
-
// Push to the Queue
if err := mirrorQueue.Push(&item); err != nil {
+ if err == queue.ErrAlreadyInQueue {
+ if item.Type == PushMirrorType {
+ log.Trace("PushMirrors for %-v already queued for sync", repo)
+ } else {
+ log.Trace("PullMirrors for %-v already queued for sync", repo)
+ }
+ return nil
+ }
return err
}
@@ -110,23 +112,29 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
return nil
}
+ pullMirrorsRequested := 0
if pullLimit != 0 {
+ requested = 0
if err := repo_model.MirrorsIterate(func(idx int, bean interface{}) error {
return handler(idx, bean, pullLimit)
}); err != nil && err != errLimit {
log.Error("MirrorsIterate: %v", err)
return err
}
+ pullMirrorsRequested, requested = requested, 0
}
+ pushMirrorsRequested := 0
if pushLimit != 0 {
+ requested = 0
if err := repo_model.PushMirrorsIterate(func(idx int, bean interface{}) error {
return handler(idx, bean, pushLimit)
}); err != nil && err != errLimit {
log.Error("PushMirrorsIterate: %v", err)
return err
}
+ pushMirrorsRequested, requested = requested, 0
}
- log.Trace("Finished: Update")
+ log.Trace("Finished: Update: %d pull mirrors and %d push mirrors queued", pullMirrorsRequested, pushMirrorsRequested)
return nil
}