return p.numberOfWorkers == 0 && (p.boostTimeout == 0 || p.boostWorkers == 0 || p.maxNumberOfWorkers == 0)
}
+// zeroBoost will add a temporary boost worker for a no worker queue
+// p.lock must be locked at the start of this function BUT it will be unlocked by the end of this function
+// (This is because addWorkers has to be called whilst unlocked)
func (p *WorkerPool) zeroBoost() {
ctx, cancel := context.WithTimeout(p.baseCtx, p.boostTimeout)
mq := GetManager().GetManagedQueue(p.qid)
}
p.pause()
}
+ select {
+ case <-p.baseCtx.Done():
+ // this worker queue is shut-down don't reboost
+ default:
+ if p.numberOfWorkers == 0 && atomic.LoadInt64(&p.numInQueue) > 0 {
+ // OK there are no workers but... there's still work to be done -> Reboost
+ p.zeroBoost()
+ // p.lock will be unlocked by zeroBoost
+ return
+ }
+ }
p.lock.Unlock()
}()
}
handler := func(idx int, bean interface{}, limit int) error {
var item SyncRequest
+ var repo *repo_model.Repository
if m, ok := bean.(*repo_model.Mirror); ok {
if m.Repo == nil {
log.Error("Disconnected mirror found: %d", m.ID)
return nil
}
+ repo = m.Repo
item = SyncRequest{
Type: PullMirrorType,
RepoID: m.RepoID,
log.Error("Disconnected push-mirror found: %d", m.ID)
return nil
}
+ repo = m.Repo
item = SyncRequest{
Type: PushMirrorType,
RepoID: m.RepoID,
default:
}
- // Check if this request is already in the queue
- has, err := mirrorQueue.Has(&item)
- if err != nil {
- return err
- }
- if has {
- return nil
- }
-
// Push to the Queue
if err := mirrorQueue.Push(&item); err != nil {
+ if err == queue.ErrAlreadyInQueue {
+ if item.Type == PushMirrorType {
+ log.Trace("PushMirrors for %-v already queued for sync", repo)
+ } else {
+ log.Trace("PullMirrors for %-v already queued for sync", repo)
+ }
+ return nil
+ }
return err
}
return nil
}
+ pullMirrorsRequested := 0
if pullLimit != 0 {
+ requested = 0
if err := repo_model.MirrorsIterate(func(idx int, bean interface{}) error {
return handler(idx, bean, pullLimit)
}); err != nil && err != errLimit {
log.Error("MirrorsIterate: %v", err)
return err
}
+ pullMirrorsRequested, requested = requested, 0
}
+ pushMirrorsRequested := 0
if pushLimit != 0 {
+ requested = 0
if err := repo_model.PushMirrorsIterate(func(idx int, bean interface{}) error {
return handler(idx, bean, pushLimit)
}); err != nil && err != errLimit {
log.Error("PushMirrorsIterate: %v", err)
return err
}
+ pushMirrorsRequested, requested = requested, 0
}
- log.Trace("Finished: Update")
+ log.Trace("Finished: Update: %d pull mirrors and %d push mirrors queued", pullMirrorsRequested, pushMirrorsRequested)
return nil
}