]> source.dussan.org Git - gitea.git/commitdiff
Queue: Make WorkerPools and Queues flushable (#10001)
authorzeripath <art27@cantab.net>
Wed, 29 Jan 2020 01:01:06 +0000 (01:01 +0000)
committerGitHub <noreply@github.com>
Wed, 29 Jan 2020 01:01:06 +0000 (20:01 -0500)
* Make WorkerPools and Queues flushable

Adds Flush methods to Queues and the WorkerPool
Further abstracts the WorkerPool
Adds a final step to Flush the queues in the defer from PrintCurrentTest
Fixes an issue with Settings inheritance in queues

Signed-off-by: Andrew Thornton <art27@cantab.net>
* Change to for loop

* Add IsEmpty and begin just making the queues composed WorkerPools

* subsume workerpool into the queues and create a flushable interface

* Add manager command

* Move flushall to queue.Manager and add to testlogger

* As per @guillep2k

* as per @guillep2k

* Just make queues all implement flushable and clean up the wrapped queue flushes

* cope with no timeout

Co-authored-by: Lauris BH <lauris@nix.lv>
27 files changed:
cmd/manager.go [new file with mode: 0644]
integrations/testlogger.go
main.go
modules/graceful/manager_unix.go
modules/graceful/manager_windows.go
modules/private/manager.go [new file with mode: 0644]
modules/queue/helper.go [new file with mode: 0644]
modules/queue/manager.go
modules/queue/queue.go
modules/queue/queue_channel.go
modules/queue/queue_channel_test.go
modules/queue/queue_disk.go
modules/queue/queue_disk_channel.go
modules/queue/queue_disk_test.go
modules/queue/queue_redis.go
modules/queue/queue_wrapped.go
modules/queue/setting.go
modules/queue/workerpool.go
modules/setting/queue.go
options/locale/locale_en-US.ini
routers/admin/admin.go
routers/private/internal.go
routers/private/manager.go [new file with mode: 0644]
routers/private/manager_unix.go [new file with mode: 0644]
routers/private/manager_windows.go [new file with mode: 0644]
routers/routes/routes.go
templates/admin/queue.tmpl

diff --git a/cmd/manager.go b/cmd/manager.go
new file mode 100644 (file)
index 0000000..eed0a9e
--- /dev/null
@@ -0,0 +1,92 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package cmd
+
+import (
+       "fmt"
+       "net/http"
+       "os"
+       "time"
+
+       "code.gitea.io/gitea/modules/private"
+
+       "github.com/urfave/cli"
+)
+
+var (
+       // CmdManager represents the manager command
+       CmdManager = cli.Command{
+               Name:        "manager",
+               Usage:       "Manage the running gitea process",
+               Description: "This is a command for managing the running gitea process",
+               Subcommands: []cli.Command{
+                       subcmdShutdown,
+                       subcmdRestart,
+                       subcmdFlushQueues,
+               },
+       }
+       subcmdShutdown = cli.Command{
+               Name:   "shutdown",
+               Usage:  "Gracefully shutdown the running process",
+               Action: runShutdown,
+       }
+       subcmdRestart = cli.Command{
+               Name:   "restart",
+               Usage:  "Gracefully restart the running process - (not implemented for windows servers)",
+               Action: runRestart,
+       }
+       subcmdFlushQueues = cli.Command{
+               Name:   "flush-queues",
+               Usage:  "Flush queues in the running process",
+               Action: runFlushQueues,
+               Flags: []cli.Flag{
+                       cli.DurationFlag{
+                               Name:  "timeout",
+                               Value: 60 * time.Second,
+                               Usage: "Timeout for the flushing process",
+                       },
+                       cli.BoolFlag{
+                               Name:  "non-blocking",
+                               Usage: "Set to true to not wait for flush to complete before returning",
+                       },
+               },
+       }
+)
+
+func runShutdown(c *cli.Context) error {
+       setup("manager", false)
+       statusCode, msg := private.Shutdown()
+       switch statusCode {
+       case http.StatusInternalServerError:
+               fail("InternalServerError", msg)
+       }
+
+       fmt.Fprintln(os.Stdout, msg)
+       return nil
+}
+
+func runRestart(c *cli.Context) error {
+       setup("manager", false)
+       statusCode, msg := private.Restart()
+       switch statusCode {
+       case http.StatusInternalServerError:
+               fail("InternalServerError", msg)
+       }
+
+       fmt.Fprintln(os.Stdout, msg)
+       return nil
+}
+
+func runFlushQueues(c *cli.Context) error {
+       setup("manager", false)
+       statusCode, msg := private.FlushQueues(c.Duration("timeout"), c.Bool("non-blocking"))
+       switch statusCode {
+       case http.StatusInternalServerError:
+               fail("InternalServerError", msg)
+       }
+
+       fmt.Fprintln(os.Stdout, msg)
+       return nil
+}
index 624abf3f811a1f16dd0341a9761dbb5649eded30..b2ad257a9b86e35523125b30783044027ba93251 100644 (file)
@@ -5,6 +5,7 @@
 package integrations
 
 import (
+       "context"
        "encoding/json"
        "fmt"
        "os"
@@ -12,8 +13,10 @@ import (
        "strings"
        "sync"
        "testing"
+       "time"
 
        "code.gitea.io/gitea/modules/log"
+       "code.gitea.io/gitea/modules/queue"
 )
 
 var prefix string
@@ -98,6 +101,9 @@ func PrintCurrentTest(t testing.TB, skip ...int) func() {
        }
        writerCloser.setT(&t)
        return func() {
+               if err := queue.GetManager().FlushAll(context.Background(), 20*time.Second); err != nil {
+                       t.Errorf("Flushing queues failed with error %v", err)
+               }
                _ = writerCloser.Close()
        }
 }
diff --git a/main.go b/main.go
index 6ec272c9af9c2c97c9ea7477b785ff1b23410b1a..c97513293aa4bdcf4da38265bbe4c44602f7a791 100644 (file)
--- a/main.go
+++ b/main.go
@@ -69,6 +69,7 @@ arguments - which can alternatively be run by running the subcommand web.`
                cmd.CmdKeys,
                cmd.CmdConvert,
                cmd.CmdDoctor,
+               cmd.CmdManager,
        }
        // Now adjust these commands to add our global configuration options
 
index 323c6a4111da2b85be44036d9f07226037d44286..68aa7242646d2b807e5c5a217849daec7e83da97 100644 (file)
@@ -110,28 +110,19 @@ func (g *Manager) handleSignals(ctx context.Context) {
                case sig := <-signalChannel:
                        switch sig {
                        case syscall.SIGHUP:
-                               if setting.GracefulRestartable {
-                                       log.Info("PID: %d. Received SIGHUP. Forking...", pid)
-                                       err := g.doFork()
-                                       if err != nil && err.Error() != "another process already forked. Ignoring this one" {
-                                               log.Error("Error whilst forking from PID: %d : %v", pid, err)
-                                       }
-                               } else {
-                                       log.Info("PID: %d. Received SIGHUP. Not set restartable. Shutting down...", pid)
-
-                                       g.doShutdown()
-                               }
+                               log.Info("PID: %d. Received SIGHUP. Attempting GracefulShutdown...", pid)
+                               g.DoGracefulShutdown()
                        case syscall.SIGUSR1:
                                log.Info("PID %d. Received SIGUSR1.", pid)
                        case syscall.SIGUSR2:
                                log.Warn("PID %d. Received SIGUSR2. Hammering...", pid)
-                               g.doHammerTime(0 * time.Second)
+                               g.DoImmediateHammer()
                        case syscall.SIGINT:
                                log.Warn("PID %d. Received SIGINT. Shutting down...", pid)
-                               g.doShutdown()
+                               g.DoGracefulShutdown()
                        case syscall.SIGTERM:
                                log.Warn("PID %d. Received SIGTERM. Shutting down...", pid)
-                               g.doShutdown()
+                               g.DoGracefulShutdown()
                        case syscall.SIGTSTP:
                                log.Info("PID %d. Received SIGTSTP.", pid)
                        default:
@@ -139,7 +130,7 @@ func (g *Manager) handleSignals(ctx context.Context) {
                        }
                case <-ctx.Done():
                        log.Warn("PID: %d. Background context for manager closed - %v - Shutting down...", pid, ctx.Err())
-                       g.doShutdown()
+                       g.DoGracefulShutdown()
                }
        }
 }
@@ -160,6 +151,31 @@ func (g *Manager) doFork() error {
        return err
 }
 
+// DoGracefulRestart causes a graceful restart
+func (g *Manager) DoGracefulRestart() {
+       if setting.GracefulRestartable {
+               log.Info("PID: %d. Forking...", os.Getpid())
+               err := g.doFork()
+               if err != nil && err.Error() != "another process already forked. Ignoring this one" {
+                       log.Error("Error whilst forking from PID: %d : %v", os.Getpid(), err)
+               }
+       } else {
+               log.Info("PID: %d. Not set restartable. Shutting down...", os.Getpid())
+
+               g.doShutdown()
+       }
+}
+
+// DoImmediateHammer causes an immediate hammer
+func (g *Manager) DoImmediateHammer() {
+       g.doHammerTime(0 * time.Second)
+}
+
+// DoGracefulShutdown causes a graceful shutdown
+func (g *Manager) DoGracefulShutdown() {
+       g.doShutdown()
+}
+
 // RegisterServer registers the running of a listening server, in the case of unix this means that the parent process can now die.
 // Any call to RegisterServer must be matched by a call to ServerDone
 func (g *Manager) RegisterServer() {
index 526fc0bd24c7543a79c3b576852b3cf196063cc6..cb4475ebb28d8aba2a6bf931f7221285e054763b 100644 (file)
@@ -43,6 +43,7 @@ type Manager struct {
        runningServerWaitGroup sync.WaitGroup
        createServerWaitGroup  sync.WaitGroup
        terminateWaitGroup     sync.WaitGroup
+       shutdownRequested          chan struct{}
 }
 
 func newGracefulManager(ctx context.Context) *Manager {
@@ -62,6 +63,7 @@ func (g *Manager) start() {
        g.shutdown = make(chan struct{})
        g.hammer = make(chan struct{})
        g.done = make(chan struct{})
+       g.shutdownRequested = make(chan struct{})
 
        // Set the running state
        g.setState(stateRunning)
@@ -107,7 +109,10 @@ loop:
        for {
                select {
                case <-g.ctx.Done():
-                       g.doShutdown()
+                       g.DoGracefulShutdown()
+                       waitTime += setting.GracefulHammerTime
+                       break loop
+               case <-g.shutdownRequested:
                        waitTime += setting.GracefulHammerTime
                        break loop
                case change := <-changes:
@@ -115,12 +120,12 @@ loop:
                        case svc.Interrogate:
                                status <- change.CurrentStatus
                        case svc.Stop, svc.Shutdown:
-                               g.doShutdown()
+                               g.DoGracefulShutdown()
                                waitTime += setting.GracefulHammerTime
                                break loop
                        case hammerCode:
-                               g.doShutdown()
-                               g.doHammerTime(0 * time.Second)
+                               g.DoGracefulShutdown()
+                               g.DoImmediateHammer()
                                break loop
                        default:
                                log.Debug("Unexpected control request: %v", change.Cmd)
@@ -140,7 +145,7 @@ hammerLoop:
                        case svc.Interrogate:
                                status <- change.CurrentStatus
                        case svc.Stop, svc.Shutdown, hammerCmd:
-                               g.doHammerTime(0 * time.Second)
+                               g.DoImmediateHammer()
                                break hammerLoop
                        default:
                                log.Debug("Unexpected control request: %v", change.Cmd)
@@ -152,6 +157,24 @@ hammerLoop:
        return false, 0
 }
 
+// DoImmediateHammer causes an immediate hammer
+func (g *Manager) DoImmediateHammer() {
+       g.doHammerTime(0 * time.Second)
+}
+
+// DoGracefulShutdown causes a graceful shutdown
+func (g *Manager) DoGracefulShutdown() {
+       g.lock.Lock()
+       select {
+       case <-g.shutdownRequested:
+               g.lock.Unlock()
+       default:
+               close(g.shutdownRequested)
+               g.lock.Unlock()
+               g.doShutdown()
+       }
+}
+
 // RegisterServer registers the running of a listening server.
 // Any call to RegisterServer must be matched by a call to ServerDone
 func (g *Manager) RegisterServer() {
diff --git a/modules/private/manager.go b/modules/private/manager.go
new file mode 100644 (file)
index 0000000..de46c7a
--- /dev/null
@@ -0,0 +1,83 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package private
+
+import (
+       "encoding/json"
+       "fmt"
+       "net/http"
+       "time"
+
+       "code.gitea.io/gitea/modules/setting"
+)
+
+// Shutdown calls the internal shutdown function
+func Shutdown() (int, string) {
+       reqURL := setting.LocalURL + fmt.Sprintf("api/internal/manager/shutdown")
+
+       req := newInternalRequest(reqURL, "POST")
+       resp, err := req.Response()
+       if err != nil {
+               return http.StatusInternalServerError, fmt.Sprintf("Unable to contact gitea: %v", err.Error())
+       }
+       defer resp.Body.Close()
+
+       if resp.StatusCode != http.StatusOK {
+               return resp.StatusCode, decodeJSONError(resp).Err
+       }
+
+       return http.StatusOK, "Shutting down"
+}
+
+// Restart calls the internal restart function
+func Restart() (int, string) {
+       reqURL := setting.LocalURL + fmt.Sprintf("api/internal/manager/restart")
+
+       req := newInternalRequest(reqURL, "POST")
+       resp, err := req.Response()
+       if err != nil {
+               return http.StatusInternalServerError, fmt.Sprintf("Unable to contact gitea: %v", err.Error())
+       }
+       defer resp.Body.Close()
+
+       if resp.StatusCode != http.StatusOK {
+               return resp.StatusCode, decodeJSONError(resp).Err
+       }
+
+       return http.StatusOK, "Restarting"
+}
+
+// FlushOptions represents the options for the flush call
+type FlushOptions struct {
+       Timeout     time.Duration
+       NonBlocking bool
+}
+
+// FlushQueues calls the internal flush-queues function
+func FlushQueues(timeout time.Duration, nonBlocking bool) (int, string) {
+       reqURL := setting.LocalURL + fmt.Sprintf("api/internal/manager/flush-queues")
+
+       req := newInternalRequest(reqURL, "POST")
+       if timeout > 0 {
+               req.SetTimeout(timeout+10*time.Second, timeout+10*time.Second)
+       }
+       req = req.Header("Content-Type", "application/json")
+       jsonBytes, _ := json.Marshal(FlushOptions{
+               Timeout:     timeout,
+               NonBlocking: nonBlocking,
+       })
+       req.Body(jsonBytes)
+       resp, err := req.Response()
+       if err != nil {
+               return http.StatusInternalServerError, fmt.Sprintf("Unable to contact gitea: %v", err.Error())
+       }
+       defer resp.Body.Close()
+
+       if resp.StatusCode != http.StatusOK {
+               return resp.StatusCode, decodeJSONError(resp).Err
+       }
+
+       return http.StatusOK, "Flushed"
+}
diff --git a/modules/queue/helper.go b/modules/queue/helper.go
new file mode 100644 (file)
index 0000000..e6fb1b9
--- /dev/null
@@ -0,0 +1,63 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+       "encoding/json"
+       "reflect"
+)
+
+// toConfig will attempt to convert a given configuration cfg into the provided exemplar type.
+//
+// It will tolerate the cfg being passed as a []byte or string of a json representation of the
+// exemplar or the correct type of the exemplar itself
+func toConfig(exemplar, cfg interface{}) (interface{}, error) {
+       if reflect.TypeOf(cfg).AssignableTo(reflect.TypeOf(exemplar)) {
+               return cfg, nil
+       }
+
+       configBytes, ok := cfg.([]byte)
+       if !ok {
+               configStr, ok := cfg.(string)
+               if !ok {
+                       return nil, ErrInvalidConfiguration{cfg: cfg}
+               }
+               configBytes = []byte(configStr)
+       }
+       newVal := reflect.New(reflect.TypeOf(exemplar))
+       if err := json.Unmarshal(configBytes, newVal.Interface()); err != nil {
+               return nil, ErrInvalidConfiguration{cfg: cfg, err: err}
+       }
+       return newVal.Elem().Interface(), nil
+}
+
+// unmarshalAs will attempt to unmarshal provided bytes as the provided exemplar
+func unmarshalAs(bs []byte, exemplar interface{}) (data Data, err error) {
+       if exemplar != nil {
+               t := reflect.TypeOf(exemplar)
+               n := reflect.New(t)
+               ne := n.Elem()
+               err = json.Unmarshal(bs, ne.Addr().Interface())
+               data = ne.Interface().(Data)
+       } else {
+               err = json.Unmarshal(bs, &data)
+       }
+
+       return
+}
+
+// assignableTo will check if provided data is assignable to the same type as the exemplar
+// if the provided exemplar is nil then it will always return true
+func assignableTo(data Data, exemplar interface{}) bool {
+       if exemplar == nil {
+               return true
+       }
+
+       // Assert data is of same type as exemplar
+       t := reflect.TypeOf(data)
+       exemplarType := reflect.TypeOf(exemplar)
+
+       return t.AssignableTo(exemplarType) && data != nil
+}
index 88b2644848671549b5a5547c42bfd4e931f50f99..a6734787a981b85e8d00b7975b4060c3bd7705a2 100644 (file)
@@ -26,36 +26,57 @@ type Manager struct {
        Queues  map[int64]*ManagedQueue
 }
 
-// ManagedQueue represents a working queue inheriting from Gitea.
+// ManagedQueue represents a working queue with a Pool of workers.
+//
+// Although a ManagedQueue should really represent a Queue this does not
+// necessarily have to be the case. This could be used to describe any queue.WorkerPool.
 type ManagedQueue struct {
        mutex         sync.Mutex
        QID           int64
-       Queue         Queue
        Type          Type
        Name          string
        Configuration interface{}
        ExemplarType  string
-       Pool          ManagedPool
+       Managed       interface{}
        counter       int64
        PoolWorkers   map[int64]*PoolWorkers
 }
 
+// Flushable represents a pool or queue that is flushable
+type Flushable interface {
+       // Flush will add a flush worker to the pool - the worker should be autoregistered with the manager
+       Flush(time.Duration) error
+       // FlushWithContext is very similar to Flush
+       // NB: The worker will not be registered with the manager.
+       FlushWithContext(ctx context.Context) error
+       // IsEmpty will return if the managed pool is empty and has no work
+       IsEmpty() bool
+}
+
 // ManagedPool is a simple interface to get certain details from a worker pool
 type ManagedPool interface {
+       // AddWorkers adds a number of worker as group to the pool with the provided timeout. A CancelFunc is provided to cancel the group
        AddWorkers(number int, timeout time.Duration) context.CancelFunc
+       // NumberOfWorkers returns the total number of workers in the pool
        NumberOfWorkers() int
+       // MaxNumberOfWorkers returns the maximum number of workers the pool can dynamically grow to
        MaxNumberOfWorkers() int
+       // SetMaxNumberOfWorkers sets the maximum number of workers the pool can dynamically grow to
        SetMaxNumberOfWorkers(int)
+       // BoostTimeout returns the current timeout for worker groups created during a boost
        BoostTimeout() time.Duration
+       // BlockTimeout returns the timeout the internal channel can block for before a boost would occur
        BlockTimeout() time.Duration
+       // BoostWorkers sets the number of workers to be created during a boost
        BoostWorkers() int
-       SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
+       // SetPoolSettings sets the user updatable settings for the pool
+       SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
 }
 
 // ManagedQueueList implements the sort.Interface
 type ManagedQueueList []*ManagedQueue
 
-// PoolWorkers represents a working queue inheriting from Gitea.
+// PoolWorkers represents a group of workers working on a queue
 type PoolWorkers struct {
        PID        int64
        Workers    int
@@ -63,9 +84,10 @@ type PoolWorkers struct {
        Timeout    time.Time
        HasTimeout bool
        Cancel     context.CancelFunc
+       IsFlusher  bool
 }
 
-// PoolWorkersList implements the sort.Interface
+// PoolWorkersList implements the sort.Interface for PoolWorkers
 type PoolWorkersList []*PoolWorkers
 
 func init() {
@@ -83,27 +105,28 @@ func GetManager() *Manager {
 }
 
 // Add adds a queue to this manager
-func (m *Manager) Add(queue Queue,
+func (m *Manager) Add(managed interface{},
        t Type,
        configuration,
-       exemplar interface{},
-       pool ManagedPool) int64 {
+       exemplar interface{}) int64 {
 
        cfg, _ := json.Marshal(configuration)
        mq := &ManagedQueue{
-               Queue:         queue,
                Type:          t,
                Configuration: string(cfg),
                ExemplarType:  reflect.TypeOf(exemplar).String(),
                PoolWorkers:   make(map[int64]*PoolWorkers),
-               Pool:          pool,
+               Managed:       managed,
        }
        m.mutex.Lock()
        m.counter++
        mq.QID = m.counter
        mq.Name = fmt.Sprintf("queue-%d", mq.QID)
-       if named, ok := queue.(Named); ok {
-               mq.Name = named.Name()
+       if named, ok := managed.(Named); ok {
+               name := named.Name()
+               if len(name) > 0 {
+                       mq.Name = name
+               }
        }
        m.Queues[mq.QID] = mq
        m.mutex.Unlock()
@@ -127,6 +150,64 @@ func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue {
        return m.Queues[qid]
 }
 
+// FlushAll flushes all the flushable queues attached to this manager
+func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error {
+       var ctx context.Context
+       var cancel context.CancelFunc
+       start := time.Now()
+       end := start
+       hasTimeout := false
+       if timeout > 0 {
+               ctx, cancel = context.WithTimeout(baseCtx, timeout)
+               end = start.Add(timeout)
+               hasTimeout = true
+       } else {
+               ctx, cancel = context.WithCancel(baseCtx)
+       }
+       defer cancel()
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               default:
+               }
+               mqs := m.ManagedQueues()
+               wg := sync.WaitGroup{}
+               wg.Add(len(mqs))
+               allEmpty := true
+               for _, mq := range mqs {
+                       if mq.IsEmpty() {
+                               wg.Done()
+                               continue
+                       }
+                       allEmpty = false
+                       if flushable, ok := mq.Managed.(Flushable); ok {
+                               go func() {
+                                       localCtx, localCancel := context.WithCancel(ctx)
+                                       pid := mq.RegisterWorkers(1, start, hasTimeout, end, localCancel, true)
+                                       err := flushable.FlushWithContext(localCtx)
+                                       if err != nil && err != ctx.Err() {
+                                               cancel()
+                                       }
+                                       mq.CancelWorkers(pid)
+                                       localCancel()
+                                       wg.Done()
+                               }()
+                       } else {
+                               wg.Done()
+                       }
+
+               }
+               if allEmpty {
+                       break
+               }
+               wg.Wait()
+       }
+       return nil
+
+}
+
 // ManagedQueues returns the managed queues
 func (m *Manager) ManagedQueues() []*ManagedQueue {
        m.mutex.Lock()
@@ -152,7 +233,7 @@ func (q *ManagedQueue) Workers() []*PoolWorkers {
 }
 
 // RegisterWorkers registers workers to this queue
-func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc) int64 {
+func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc, isFlusher bool) int64 {
        q.mutex.Lock()
        defer q.mutex.Unlock()
        q.counter++
@@ -163,6 +244,7 @@ func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout b
                Timeout:    timeout,
                HasTimeout: hasTimeout,
                Cancel:     cancel,
+               IsFlusher:  isFlusher,
        }
        return q.counter
 }
@@ -191,57 +273,74 @@ func (q *ManagedQueue) RemoveWorkers(pid int64) {
 
 // AddWorkers adds workers to the queue if it has registered an add worker function
 func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
-       if q.Pool != nil {
+       if pool, ok := q.Managed.(ManagedPool); ok {
                // the cancel will be added to the pool workers description above
-               return q.Pool.AddWorkers(number, timeout)
+               return pool.AddWorkers(number, timeout)
        }
        return nil
 }
 
+// Flush flushes the queue with a timeout
+func (q *ManagedQueue) Flush(timeout time.Duration) error {
+       if flushable, ok := q.Managed.(Flushable); ok {
+               // the cancel will be added to the pool workers description above
+               return flushable.Flush(timeout)
+       }
+       return nil
+}
+
+// IsEmpty returns if the queue is empty
+func (q *ManagedQueue) IsEmpty() bool {
+       if flushable, ok := q.Managed.(Flushable); ok {
+               return flushable.IsEmpty()
+       }
+       return true
+}
+
 // NumberOfWorkers returns the number of workers in the queue
 func (q *ManagedQueue) NumberOfWorkers() int {
-       if q.Pool != nil {
-               return q.Pool.NumberOfWorkers()
+       if pool, ok := q.Managed.(ManagedPool); ok {
+               return pool.NumberOfWorkers()
        }
        return -1
 }
 
 // MaxNumberOfWorkers returns the maximum number of workers for the pool
 func (q *ManagedQueue) MaxNumberOfWorkers() int {
-       if q.Pool != nil {
-               return q.Pool.MaxNumberOfWorkers()
+       if pool, ok := q.Managed.(ManagedPool); ok {
+               return pool.MaxNumberOfWorkers()
        }
        return 0
 }
 
 // BoostWorkers returns the number of workers for a boost
 func (q *ManagedQueue) BoostWorkers() int {
-       if q.Pool != nil {
-               return q.Pool.BoostWorkers()
+       if pool, ok := q.Managed.(ManagedPool); ok {
+               return pool.BoostWorkers()
        }
        return -1
 }
 
 // BoostTimeout returns the timeout of the next boost
 func (q *ManagedQueue) BoostTimeout() time.Duration {
-       if q.Pool != nil {
-               return q.Pool.BoostTimeout()
+       if pool, ok := q.Managed.(ManagedPool); ok {
+               return pool.BoostTimeout()
        }
        return 0
 }
 
 // BlockTimeout returns the timeout til the next boost
 func (q *ManagedQueue) BlockTimeout() time.Duration {
-       if q.Pool != nil {
-               return q.Pool.BlockTimeout()
+       if pool, ok := q.Managed.(ManagedPool); ok {
+               return pool.BlockTimeout()
        }
        return 0
 }
 
-// SetSettings sets the setable boost values
-func (q *ManagedQueue) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
-       if q.Pool != nil {
-               q.Pool.SetSettings(maxNumberOfWorkers, boostWorkers, timeout)
+// SetPoolSettings sets the setable boost values
+func (q *ManagedQueue) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
+       if pool, ok := q.Managed.(ManagedPool); ok {
+               pool.SetPoolSettings(maxNumberOfWorkers, boostWorkers, timeout)
        }
 }
 
index d458a7d50627eb4a5937535f57d4f1cdf59c24bc..094699d4afde1513601eae07fb91a5452e7574b7 100644 (file)
@@ -6,9 +6,8 @@ package queue
 
 import (
        "context"
-       "encoding/json"
        "fmt"
-       "reflect"
+       "time"
 )
 
 // ErrInvalidConfiguration is called when there is invalid configuration for a queue
@@ -53,8 +52,11 @@ type Named interface {
        Name() string
 }
 
-// Queue defines an interface to save an issue indexer queue
+// Queue defines an interface of a queue-like item
+//
+// Queues will handle their own contents in the Run method
 type Queue interface {
+       Flushable
        Run(atShutdown, atTerminate func(context.Context, func()))
        Push(Data) error
 }
@@ -71,32 +73,27 @@ func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, erro
 type DummyQueue struct {
 }
 
-// Run starts to run the queue
+// Run does nothing
 func (b *DummyQueue) Run(_, _ func(context.Context, func())) {}
 
-// Push pushes data to the queue
+// Push fakes a push of data to the queue
 func (b *DummyQueue) Push(Data) error {
        return nil
 }
 
-func toConfig(exemplar, cfg interface{}) (interface{}, error) {
-       if reflect.TypeOf(cfg).AssignableTo(reflect.TypeOf(exemplar)) {
-               return cfg, nil
-       }
+// Flush always returns nil
+func (b *DummyQueue) Flush(time.Duration) error {
+       return nil
+}
 
-       configBytes, ok := cfg.([]byte)
-       if !ok {
-               configStr, ok := cfg.(string)
-               if !ok {
-                       return nil, ErrInvalidConfiguration{cfg: cfg}
-               }
-               configBytes = []byte(configStr)
-       }
-       newVal := reflect.New(reflect.TypeOf(exemplar))
-       if err := json.Unmarshal(configBytes, newVal.Interface()); err != nil {
-               return nil, ErrInvalidConfiguration{cfg: cfg, err: err}
-       }
-       return newVal.Elem().Interface(), nil
+// FlushWithContext always returns nil
+func (b *DummyQueue) FlushWithContext(context.Context) error {
+       return nil
+}
+
+// IsEmpty asserts that the queue is empty
+func (b *DummyQueue) IsEmpty() bool {
+       return true
 }
 
 var queuesMap = map[Type]NewQueueFunc{DummyQueueType: NewDummyQueue}
@@ -123,7 +120,7 @@ func RegisteredTypesAsString() []string {
        return types
 }
 
-// NewQueue takes a queue Type and HandlerFunc some options and possibly an exemplar and returns a Queue or an error
+// NewQueue takes a queue Type, HandlerFunc, some options and possibly an exemplar and returns a Queue or an error
 func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) {
        newFn, ok := queuesMap[queueType]
        if !ok {
index c8f8a53804e7ba53fc44b98cd1dcbf0273c92d87..45df8a443e4a4009952d031f4ebf5a1c99accb0b 100644 (file)
@@ -7,8 +7,6 @@ package queue
 import (
        "context"
        "fmt"
-       "reflect"
-       "time"
 
        "code.gitea.io/gitea/modules/log"
 )
@@ -18,25 +16,23 @@ const ChannelQueueType Type = "channel"
 
 // ChannelQueueConfiguration is the configuration for a ChannelQueue
 type ChannelQueueConfiguration struct {
-       QueueLength  int
-       BatchLength  int
-       Workers      int
-       MaxWorkers   int
-       BlockTimeout time.Duration
-       BoostTimeout time.Duration
-       BoostWorkers int
-       Name         string
+       WorkerPoolConfiguration
+       Workers int
+       Name    string
 }
 
-// ChannelQueue implements
+// ChannelQueue implements Queue
+//
+// A channel queue is not persistable and does not shutdown or terminate cleanly
+// It is basically a very thin wrapper around a WorkerPool
 type ChannelQueue struct {
-       pool     *WorkerPool
+       *WorkerPool
        exemplar interface{}
        workers  int
        name     string
 }
 
-// NewChannelQueue create a memory channel queue
+// NewChannelQueue creates a memory channel queue
 func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
        configInterface, err := toConfig(ChannelQueueConfiguration{}, cfg)
        if err != nil {
@@ -46,26 +42,13 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
        if config.BatchLength == 0 {
                config.BatchLength = 1
        }
-       dataChan := make(chan Data, config.QueueLength)
-
-       ctx, cancel := context.WithCancel(context.Background())
        queue := &ChannelQueue{
-               pool: &WorkerPool{
-                       baseCtx:            ctx,
-                       cancel:             cancel,
-                       batchLength:        config.BatchLength,
-                       handle:             handle,
-                       dataChan:           dataChan,
-                       blockTimeout:       config.BlockTimeout,
-                       boostTimeout:       config.BoostTimeout,
-                       boostWorkers:       config.BoostWorkers,
-                       maxNumberOfWorkers: config.MaxWorkers,
-               },
-               exemplar: exemplar,
-               workers:  config.Workers,
-               name:     config.Name,
+               WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
+               exemplar:   exemplar,
+               workers:    config.Workers,
+               name:       config.Name,
        }
-       queue.pool.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar, queue.pool)
+       queue.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar)
        return queue, nil
 }
 
@@ -77,22 +60,18 @@ func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())
        atTerminate(context.Background(), func() {
                log.Warn("ChannelQueue: %s is not terminatable!", c.name)
        })
+       log.Debug("ChannelQueue: %s Starting", c.name)
        go func() {
-               _ = c.pool.AddWorkers(c.workers, 0)
+               _ = c.AddWorkers(c.workers, 0)
        }()
 }
 
 // Push will push data into the queue
 func (c *ChannelQueue) Push(data Data) error {
-       if c.exemplar != nil {
-               // Assert data is of same type as r.exemplar
-               t := reflect.TypeOf(data)
-               exemplarType := reflect.TypeOf(c.exemplar)
-               if !t.AssignableTo(exemplarType) || data == nil {
-                       return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name)
-               }
+       if !assignableTo(data, c.exemplar) {
+               return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name)
        }
-       c.pool.Push(data)
+       c.WorkerPool.Push(data)
        return nil
 }
 
index fafc1e3303eb88f73668f5c0235884489b023cb7..8234b0f6f2a6c175c20266031892de75c636c0a6 100644 (file)
@@ -25,12 +25,14 @@ func TestChannelQueue(t *testing.T) {
 
        queue, err := NewChannelQueue(handle,
                ChannelQueueConfiguration{
-                       QueueLength:  20,
-                       Workers:      1,
-                       MaxWorkers:   10,
-                       BlockTimeout: 1 * time.Second,
-                       BoostTimeout: 5 * time.Minute,
-                       BoostWorkers: 5,
+                       WorkerPoolConfiguration: WorkerPoolConfiguration{
+                               QueueLength:  20,
+                               MaxWorkers:   10,
+                               BlockTimeout: 1 * time.Second,
+                               BoostTimeout: 5 * time.Minute,
+                               BoostWorkers: 5,
+                       },
+                       Workers: 1,
                }, &testData{})
        assert.NoError(t, err)
 
@@ -60,13 +62,15 @@ func TestChannelQueue_Batch(t *testing.T) {
 
        queue, err := NewChannelQueue(handle,
                ChannelQueueConfiguration{
-                       QueueLength:  20,
-                       BatchLength:  2,
-                       Workers:      1,
-                       MaxWorkers:   10,
-                       BlockTimeout: 1 * time.Second,
-                       BoostTimeout: 5 * time.Minute,
-                       BoostWorkers: 5,
+                       WorkerPoolConfiguration: WorkerPoolConfiguration{
+                               QueueLength:  20,
+                               BatchLength:  2,
+                               BlockTimeout: 1 * time.Second,
+                               BoostTimeout: 5 * time.Minute,
+                               BoostWorkers: 5,
+                               MaxWorkers:   10,
+                       },
+                       Workers: 1,
                }, &testData{})
        assert.NoError(t, err)
 
index 98e7b24e42fd5a9f0b2314723ae00164630c1e88..ca3e230e3d2d173aea5e2a691353f9840eca0a7a 100644 (file)
@@ -8,8 +8,8 @@ import (
        "context"
        "encoding/json"
        "fmt"
-       "reflect"
        "sync"
+       "sync/atomic"
        "time"
 
        "code.gitea.io/gitea/modules/log"
@@ -22,20 +22,15 @@ const LevelQueueType Type = "level"
 
 // LevelQueueConfiguration is the configuration for a LevelQueue
 type LevelQueueConfiguration struct {
-       DataDir      string
-       QueueLength  int
-       BatchLength  int
-       Workers      int
-       MaxWorkers   int
-       BlockTimeout time.Duration
-       BoostTimeout time.Duration
-       BoostWorkers int
-       Name         string
+       WorkerPoolConfiguration
+       DataDir string
+       Workers int
+       Name    string
 }
 
 // LevelQueue implements a disk library queue
 type LevelQueue struct {
-       pool       *WorkerPool
+       *WorkerPool
        queue      *levelqueue.Queue
        closed     chan struct{}
        terminated chan struct{}
@@ -58,21 +53,8 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
                return nil, err
        }
 
-       dataChan := make(chan Data, config.QueueLength)
-       ctx, cancel := context.WithCancel(context.Background())
-
        queue := &LevelQueue{
-               pool: &WorkerPool{
-                       baseCtx:            ctx,
-                       cancel:             cancel,
-                       batchLength:        config.BatchLength,
-                       handle:             handle,
-                       dataChan:           dataChan,
-                       blockTimeout:       config.BlockTimeout,
-                       boostTimeout:       config.BoostTimeout,
-                       boostWorkers:       config.BoostWorkers,
-                       maxNumberOfWorkers: config.MaxWorkers,
-               },
+               WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
                queue:      internal,
                exemplar:   exemplar,
                closed:     make(chan struct{}),
@@ -80,7 +62,7 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
                workers:    config.Workers,
                name:       config.Name,
        }
-       queue.pool.qid = GetManager().Add(queue, LevelQueueType, config, exemplar, queue.pool)
+       queue.qid = GetManager().Add(queue, LevelQueueType, config, exemplar)
        return queue, nil
 }
 
@@ -88,9 +70,10 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
 func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
        atShutdown(context.Background(), l.Shutdown)
        atTerminate(context.Background(), l.Terminate)
+       log.Debug("LevelQueue: %s Starting", l.name)
 
        go func() {
-               _ = l.pool.AddWorkers(l.workers, 0)
+               _ = l.AddWorkers(l.workers, 0)
        }()
 
        go l.readToChan()
@@ -99,12 +82,12 @@ func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func()))
        <-l.closed
 
        log.Trace("LevelQueue: %s Waiting til done", l.name)
-       l.pool.Wait()
+       l.Wait()
 
        log.Trace("LevelQueue: %s Waiting til cleaned", l.name)
        ctx, cancel := context.WithCancel(context.Background())
        atTerminate(ctx, cancel)
-       l.pool.CleanUp(ctx)
+       l.CleanUp(ctx)
        cancel()
        log.Trace("LevelQueue: %s Cleaned", l.name)
 
@@ -115,56 +98,45 @@ func (l *LevelQueue) readToChan() {
                select {
                case <-l.closed:
                        // tell the pool to shutdown.
-                       l.pool.cancel()
+                       l.cancel()
                        return
                default:
+                       atomic.AddInt64(&l.numInQueue, 1)
                        bs, err := l.queue.RPop()
                        if err != nil {
                                if err != levelqueue.ErrNotFound {
                                        log.Error("LevelQueue: %s Error on RPop: %v", l.name, err)
                                }
+                               atomic.AddInt64(&l.numInQueue, -1)
                                time.Sleep(time.Millisecond * 100)
                                continue
                        }
 
                        if len(bs) == 0 {
+                               atomic.AddInt64(&l.numInQueue, -1)
                                time.Sleep(time.Millisecond * 100)
                                continue
                        }
 
-                       var data Data
-                       if l.exemplar != nil {
-                               t := reflect.TypeOf(l.exemplar)
-                               n := reflect.New(t)
-                               ne := n.Elem()
-                               err = json.Unmarshal(bs, ne.Addr().Interface())
-                               data = ne.Interface().(Data)
-                       } else {
-                               err = json.Unmarshal(bs, &data)
-                       }
+                       data, err := unmarshalAs(bs, l.exemplar)
                        if err != nil {
                                log.Error("LevelQueue: %s Failed to unmarshal with error: %v", l.name, err)
+                               atomic.AddInt64(&l.numInQueue, -1)
                                time.Sleep(time.Millisecond * 100)
                                continue
                        }
 
                        log.Trace("LevelQueue %s: Task found: %#v", l.name, data)
-                       l.pool.Push(data)
-
+                       l.WorkerPool.Push(data)
+                       atomic.AddInt64(&l.numInQueue, -1)
                }
        }
 }
 
 // Push will push the indexer data to queue
 func (l *LevelQueue) Push(data Data) error {
-       if l.exemplar != nil {
-               // Assert data is of same type as r.exemplar
-               value := reflect.ValueOf(data)
-               t := value.Type()
-               exemplarType := reflect.ValueOf(l.exemplar).Type()
-               if !t.AssignableTo(exemplarType) || data == nil {
-                       return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, l.exemplar, l.name)
-               }
+       if !assignableTo(data, l.exemplar) {
+               return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, l.exemplar, l.name)
        }
        bs, err := json.Marshal(data)
        if err != nil {
@@ -173,16 +145,25 @@ func (l *LevelQueue) Push(data Data) error {
        return l.queue.LPush(bs)
 }
 
+// IsEmpty checks whether the queue is empty
+func (l *LevelQueue) IsEmpty() bool {
+       if !l.WorkerPool.IsEmpty() {
+               return false
+       }
+       return l.queue.Len() == 0
+}
+
 // Shutdown this queue and stop processing
 func (l *LevelQueue) Shutdown() {
        l.lock.Lock()
        defer l.lock.Unlock()
-       log.Trace("LevelQueue: %s Shutdown", l.name)
+       log.Trace("LevelQueue: %s Shutting down", l.name)
        select {
        case <-l.closed:
        default:
                close(l.closed)
        }
+       log.Debug("LevelQueue: %s Shutdown", l.name)
 }
 
 // Terminate this queue and close the queue
@@ -196,11 +177,15 @@ func (l *LevelQueue) Terminate() {
        default:
                close(l.terminated)
                l.lock.Unlock()
+               if log.IsDebug() {
+                       log.Debug("LevelQueue: %s Closing with %d tasks left in queue", l.name, l.queue.Len())
+               }
                if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" {
                        log.Error("Error whilst closing internal queue in %s: %v", l.name, err)
                }
 
        }
+       log.Debug("LevelQueue: %s Terminated", l.name)
 }
 
 // Name returns the name of this queue
index 6bb5a1be979826afdce8857e113182973f26ac35..961187ab0d7a5e43b87fd342960affd7299826bb 100644 (file)
@@ -6,7 +6,9 @@ package queue
 
 import (
        "context"
+       "fmt"
        "sync"
+       "sync/atomic"
        "time"
 
        "code.gitea.io/gitea/modules/log"
@@ -31,8 +33,10 @@ type PersistableChannelQueueConfiguration struct {
 }
 
 // PersistableChannelQueue wraps a channel queue and level queue together
+// The disk level queue will be used to store data at shutdown and terminate - and will be restored
+// on start up.
 type PersistableChannelQueue struct {
-       *ChannelQueue
+       channelQueue *ChannelQueue
        delayedStarter
        lock   sync.Mutex
        closed chan struct{}
@@ -48,14 +52,16 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
        config := configInterface.(PersistableChannelQueueConfiguration)
 
        channelQueue, err := NewChannelQueue(handle, ChannelQueueConfiguration{
-               QueueLength:  config.QueueLength,
-               BatchLength:  config.BatchLength,
-               Workers:      config.Workers,
-               MaxWorkers:   config.MaxWorkers,
-               BlockTimeout: config.BlockTimeout,
-               BoostTimeout: config.BoostTimeout,
-               BoostWorkers: config.BoostWorkers,
-               Name:         config.Name + "-channel",
+               WorkerPoolConfiguration: WorkerPoolConfiguration{
+                       QueueLength:  config.QueueLength,
+                       BatchLength:  config.BatchLength,
+                       BlockTimeout: config.BlockTimeout,
+                       BoostTimeout: config.BoostTimeout,
+                       BoostWorkers: config.BoostWorkers,
+                       MaxWorkers:   config.MaxWorkers,
+               },
+               Workers: config.Workers,
+               Name:    config.Name + "-channel",
        }, exemplar)
        if err != nil {
                return nil, err
@@ -63,28 +69,30 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
 
        // the level backend only needs temporary workers to catch up with the previously dropped work
        levelCfg := LevelQueueConfiguration{
-               DataDir:      config.DataDir,
-               QueueLength:  config.QueueLength,
-               BatchLength:  config.BatchLength,
-               Workers:      1,
-               MaxWorkers:   6,
-               BlockTimeout: 1 * time.Second,
-               BoostTimeout: 5 * time.Minute,
-               BoostWorkers: 5,
-               Name:         config.Name + "-level",
+               WorkerPoolConfiguration: WorkerPoolConfiguration{
+                       QueueLength:  config.QueueLength,
+                       BatchLength:  config.BatchLength,
+                       BlockTimeout: 1 * time.Second,
+                       BoostTimeout: 5 * time.Minute,
+                       BoostWorkers: 5,
+                       MaxWorkers:   6,
+               },
+               DataDir: config.DataDir,
+               Workers: 1,
+               Name:    config.Name + "-level",
        }
 
        levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar)
        if err == nil {
                queue := &PersistableChannelQueue{
-                       ChannelQueue: channelQueue.(*ChannelQueue),
+                       channelQueue: channelQueue.(*ChannelQueue),
                        delayedStarter: delayedStarter{
                                internal: levelQueue.(*LevelQueue),
                                name:     config.Name,
                        },
                        closed: make(chan struct{}),
                }
-               _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil)
+               _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar)
                return queue, nil
        }
        if IsErrInvalidConfiguration(err) {
@@ -93,7 +101,7 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
        }
 
        queue := &PersistableChannelQueue{
-               ChannelQueue: channelQueue.(*ChannelQueue),
+               channelQueue: channelQueue.(*ChannelQueue),
                delayedStarter: delayedStarter{
                        cfg:         levelCfg,
                        underlying:  LevelQueueType,
@@ -103,7 +111,7 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
                },
                closed: make(chan struct{}),
        }
-       _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil)
+       _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar)
        return queue, nil
 }
 
@@ -118,15 +126,17 @@ func (p *PersistableChannelQueue) Push(data Data) error {
        case <-p.closed:
                return p.internal.Push(data)
        default:
-               return p.ChannelQueue.Push(data)
+               return p.channelQueue.Push(data)
        }
 }
 
 // Run starts to run the queue
 func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+       log.Debug("PersistableChannelQueue: %s Starting", p.delayedStarter.name)
+
        p.lock.Lock()
        if p.internal == nil {
-               err := p.setInternal(atShutdown, p.ChannelQueue.pool.handle, p.exemplar)
+               err := p.setInternal(atShutdown, p.channelQueue.handle, p.channelQueue.exemplar)
                p.lock.Unlock()
                if err != nil {
                        log.Fatal("Unable to create internal queue for %s Error: %v", p.Name(), err)
@@ -142,31 +152,83 @@ func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte
        go p.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
 
        go func() {
-               _ = p.ChannelQueue.pool.AddWorkers(p.workers, 0)
+               _ = p.channelQueue.AddWorkers(p.channelQueue.workers, 0)
        }()
 
        log.Trace("PersistableChannelQueue: %s Waiting til closed", p.delayedStarter.name)
        <-p.closed
        log.Trace("PersistableChannelQueue: %s Cancelling pools", p.delayedStarter.name)
-       p.ChannelQueue.pool.cancel()
-       p.internal.(*LevelQueue).pool.cancel()
+       p.channelQueue.cancel()
+       p.internal.(*LevelQueue).cancel()
        log.Trace("PersistableChannelQueue: %s Waiting til done", p.delayedStarter.name)
-       p.ChannelQueue.pool.Wait()
-       p.internal.(*LevelQueue).pool.Wait()
+       p.channelQueue.Wait()
+       p.internal.(*LevelQueue).Wait()
        // Redirect all remaining data in the chan to the internal channel
        go func() {
                log.Trace("PersistableChannelQueue: %s Redirecting remaining data", p.delayedStarter.name)
-               for data := range p.ChannelQueue.pool.dataChan {
+               for data := range p.channelQueue.dataChan {
                        _ = p.internal.Push(data)
+                       atomic.AddInt64(&p.channelQueue.numInQueue, -1)
                }
                log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", p.delayedStarter.name)
        }()
        log.Trace("PersistableChannelQueue: %s Done main loop", p.delayedStarter.name)
 }
 
+// Flush flushes the queue and blocks till the queue is empty
+func (p *PersistableChannelQueue) Flush(timeout time.Duration) error {
+       var ctx context.Context
+       var cancel context.CancelFunc
+       if timeout > 0 {
+               ctx, cancel = context.WithTimeout(context.Background(), timeout)
+       } else {
+               ctx, cancel = context.WithCancel(context.Background())
+       }
+       defer cancel()
+       return p.FlushWithContext(ctx)
+}
+
+// FlushWithContext flushes the queue and blocks till the queue is empty
+func (p *PersistableChannelQueue) FlushWithContext(ctx context.Context) error {
+       errChan := make(chan error, 1)
+       go func() {
+               errChan <- p.channelQueue.FlushWithContext(ctx)
+       }()
+       go func() {
+               p.lock.Lock()
+               if p.internal == nil {
+                       p.lock.Unlock()
+                       errChan <- fmt.Errorf("not ready to flush internal queue %s yet", p.Name())
+                       return
+               }
+               p.lock.Unlock()
+               errChan <- p.internal.FlushWithContext(ctx)
+       }()
+       err1 := <-errChan
+       err2 := <-errChan
+
+       if err1 != nil {
+               return err1
+       }
+       return err2
+}
+
+// IsEmpty checks if a queue is empty
+func (p *PersistableChannelQueue) IsEmpty() bool {
+       if !p.channelQueue.IsEmpty() {
+               return false
+       }
+       p.lock.Lock()
+       defer p.lock.Unlock()
+       if p.internal == nil {
+               return false
+       }
+       return p.internal.IsEmpty()
+}
+
 // Shutdown processing this queue
 func (p *PersistableChannelQueue) Shutdown() {
-       log.Trace("PersistableChannelQueue: %s Shutdown", p.delayedStarter.name)
+       log.Trace("PersistableChannelQueue: %s Shutting down", p.delayedStarter.name)
        select {
        case <-p.closed:
        default:
@@ -177,6 +239,7 @@ func (p *PersistableChannelQueue) Shutdown() {
                }
                close(p.closed)
        }
+       log.Debug("PersistableChannelQueue: %s Shutdown", p.delayedStarter.name)
 }
 
 // Terminate this queue and close the queue
@@ -188,6 +251,7 @@ func (p *PersistableChannelQueue) Terminate() {
        if p.internal != nil {
                p.internal.(*LevelQueue).Terminate()
        }
+       log.Debug("PersistableChannelQueue: %s Terminated", p.delayedStarter.name)
 }
 
 func init() {
index c5959d606fdd293dd2f89f722f99e96d0437ba26..8600b8d8687ccab41177a539b0aa88f5a86f7b4d 100644 (file)
@@ -32,14 +32,16 @@ func TestLevelQueue(t *testing.T) {
        defer os.RemoveAll(tmpDir)
 
        queue, err := NewLevelQueue(handle, LevelQueueConfiguration{
-               DataDir:      tmpDir,
-               BatchLength:  2,
-               Workers:      1,
-               MaxWorkers:   10,
-               QueueLength:  20,
-               BlockTimeout: 1 * time.Second,
-               BoostTimeout: 5 * time.Minute,
-               BoostWorkers: 5,
+               WorkerPoolConfiguration: WorkerPoolConfiguration{
+                       QueueLength:  20,
+                       BatchLength:  2,
+                       BlockTimeout: 1 * time.Second,
+                       BoostTimeout: 5 * time.Minute,
+                       BoostWorkers: 5,
+                       MaxWorkers:   10,
+               },
+               DataDir: tmpDir,
+               Workers: 1,
        }, &testData{})
        assert.NoError(t, err)
 
@@ -92,14 +94,16 @@ func TestLevelQueue(t *testing.T) {
                WrappedQueueConfiguration{
                        Underlying: LevelQueueType,
                        Config: LevelQueueConfiguration{
-                               DataDir:      tmpDir,
-                               BatchLength:  2,
-                               Workers:      1,
-                               MaxWorkers:   10,
-                               QueueLength:  20,
-                               BlockTimeout: 1 * time.Second,
-                               BoostTimeout: 5 * time.Minute,
-                               BoostWorkers: 5,
+                               WorkerPoolConfiguration: WorkerPoolConfiguration{
+                                       QueueLength:  20,
+                                       BatchLength:  2,
+                                       BlockTimeout: 1 * time.Second,
+                                       BoostTimeout: 5 * time.Minute,
+                                       BoostWorkers: 5,
+                                       MaxWorkers:   10,
+                               },
+                               DataDir: tmpDir,
+                               Workers: 1,
                        },
                }, &testData{})
        assert.NoError(t, err)
index 7d3efb9cff7be7871bde913e87f5221dc5badd68..0167c1ec497cfa15c48a6f397bd5a28fdee78d1f 100644 (file)
@@ -9,9 +9,9 @@ import (
        "encoding/json"
        "errors"
        "fmt"
-       "reflect"
        "strings"
        "sync"
+       "sync/atomic"
        "time"
 
        "code.gitea.io/gitea/modules/log"
@@ -25,13 +25,14 @@ const RedisQueueType Type = "redis"
 type redisClient interface {
        RPush(key string, args ...interface{}) *redis.IntCmd
        LPop(key string) *redis.StringCmd
+       LLen(key string) *redis.IntCmd
        Ping() *redis.StatusCmd
        Close() error
 }
 
 // RedisQueue redis queue
 type RedisQueue struct {
-       pool       *WorkerPool
+       *WorkerPool
        client     redisClient
        queueName  string
        closed     chan struct{}
@@ -44,19 +45,14 @@ type RedisQueue struct {
 
 // RedisQueueConfiguration is the configuration for the redis queue
 type RedisQueueConfiguration struct {
-       Network      string
-       Addresses    string
-       Password     string
-       DBIndex      int
-       BatchLength  int
-       QueueLength  int
-       QueueName    string
-       Workers      int
-       MaxWorkers   int
-       BlockTimeout time.Duration
-       BoostTimeout time.Duration
-       BoostWorkers int
-       Name         string
+       WorkerPoolConfiguration
+       Network   string
+       Addresses string
+       Password  string
+       DBIndex   int
+       QueueName string
+       Workers   int
+       Name      string
 }
 
 // NewRedisQueue creates single redis or cluster redis queue
@@ -69,21 +65,8 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
 
        dbs := strings.Split(config.Addresses, ",")
 
-       dataChan := make(chan Data, config.QueueLength)
-       ctx, cancel := context.WithCancel(context.Background())
-
        var queue = &RedisQueue{
-               pool: &WorkerPool{
-                       baseCtx:            ctx,
-                       cancel:             cancel,
-                       batchLength:        config.BatchLength,
-                       handle:             handle,
-                       dataChan:           dataChan,
-                       blockTimeout:       config.BlockTimeout,
-                       boostTimeout:       config.BoostTimeout,
-                       boostWorkers:       config.BoostWorkers,
-                       maxNumberOfWorkers: config.MaxWorkers,
-               },
+               WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
                queueName:  config.QueueName,
                exemplar:   exemplar,
                closed:     make(chan struct{}),
@@ -108,7 +91,7 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
        if err := queue.client.Ping().Err(); err != nil {
                return nil, err
        }
-       queue.pool.qid = GetManager().Add(queue, RedisQueueType, config, exemplar, queue.pool)
+       queue.qid = GetManager().Add(queue, RedisQueueType, config, exemplar)
 
        return queue, nil
 }
@@ -117,9 +100,10 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
 func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
        atShutdown(context.Background(), r.Shutdown)
        atTerminate(context.Background(), r.Terminate)
+       log.Debug("RedisQueue: %s Starting", r.name)
 
        go func() {
-               _ = r.pool.AddWorkers(r.workers, 0)
+               _ = r.AddWorkers(r.workers, 0)
        }()
 
        go r.readToChan()
@@ -127,12 +111,12 @@ func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func()))
        log.Trace("RedisQueue: %s Waiting til closed", r.name)
        <-r.closed
        log.Trace("RedisQueue: %s Waiting til done", r.name)
-       r.pool.Wait()
+       r.Wait()
 
        log.Trace("RedisQueue: %s Waiting til cleaned", r.name)
        ctx, cancel := context.WithCancel(context.Background())
        atTerminate(ctx, cancel)
-       r.pool.CleanUp(ctx)
+       r.CleanUp(ctx)
        cancel()
 }
 
@@ -141,53 +125,43 @@ func (r *RedisQueue) readToChan() {
                select {
                case <-r.closed:
                        // tell the pool to shutdown
-                       r.pool.cancel()
+                       r.cancel()
                        return
                default:
+                       atomic.AddInt64(&r.numInQueue, 1)
                        bs, err := r.client.LPop(r.queueName).Bytes()
                        if err != nil && err != redis.Nil {
                                log.Error("RedisQueue: %s Error on LPop: %v", r.name, err)
+                               atomic.AddInt64(&r.numInQueue, -1)
                                time.Sleep(time.Millisecond * 100)
                                continue
                        }
 
                        if len(bs) == 0 {
+                               atomic.AddInt64(&r.numInQueue, -1)
                                time.Sleep(time.Millisecond * 100)
                                continue
                        }
 
-                       var data Data
-                       if r.exemplar != nil {
-                               t := reflect.TypeOf(r.exemplar)
-                               n := reflect.New(t)
-                               ne := n.Elem()
-                               err = json.Unmarshal(bs, ne.Addr().Interface())
-                               data = ne.Interface().(Data)
-                       } else {
-                               err = json.Unmarshal(bs, &data)
-                       }
+                       data, err := unmarshalAs(bs, r.exemplar)
                        if err != nil {
                                log.Error("RedisQueue: %s Error on Unmarshal: %v", r.name, err)
+                               atomic.AddInt64(&r.numInQueue, -1)
                                time.Sleep(time.Millisecond * 100)
                                continue
                        }
 
                        log.Trace("RedisQueue: %s Task found: %#v", r.name, data)
-                       r.pool.Push(data)
+                       r.WorkerPool.Push(data)
+                       atomic.AddInt64(&r.numInQueue, -1)
                }
        }
 }
 
 // Push implements Queue
 func (r *RedisQueue) Push(data Data) error {
-       if r.exemplar != nil {
-               // Assert data is of same type as r.exemplar
-               value := reflect.ValueOf(data)
-               t := value.Type()
-               exemplarType := reflect.ValueOf(r.exemplar).Type()
-               if !t.AssignableTo(exemplarType) || data == nil {
-                       return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, r.exemplar, r.name)
-               }
+       if !assignableTo(data, r.exemplar) {
+               return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, r.exemplar, r.name)
        }
        bs, err := json.Marshal(data)
        if err != nil {
@@ -196,9 +170,22 @@ func (r *RedisQueue) Push(data Data) error {
        return r.client.RPush(r.queueName, bs).Err()
 }
 
+// IsEmpty checks if the queue is empty
+func (r *RedisQueue) IsEmpty() bool {
+       if !r.WorkerPool.IsEmpty() {
+               return false
+       }
+       length, err := r.client.LLen(r.queueName).Result()
+       if err != nil {
+               log.Error("Error whilst getting queue length for %s: Error: %v", r.name, err)
+               return false
+       }
+       return length == 0
+}
+
 // Shutdown processing from this queue
 func (r *RedisQueue) Shutdown() {
-       log.Trace("Shutdown: %s", r.name)
+       log.Trace("RedisQueue: %s Shutting down", r.name)
        r.lock.Lock()
        select {
        case <-r.closed:
@@ -206,11 +193,12 @@ func (r *RedisQueue) Shutdown() {
                close(r.closed)
        }
        r.lock.Unlock()
+       log.Debug("RedisQueue: %s Shutdown", r.name)
 }
 
 // Terminate this queue and close the queue
 func (r *RedisQueue) Terminate() {
-       log.Trace("Terminating: %s", r.name)
+       log.Trace("RedisQueue: %s Terminating", r.name)
        r.Shutdown()
        r.lock.Lock()
        select {
@@ -219,10 +207,14 @@ func (r *RedisQueue) Terminate() {
        default:
                close(r.terminated)
                r.lock.Unlock()
+               if log.IsDebug() {
+                       log.Debug("RedisQueue: %s Closing with %d tasks left in queue", r.name, r.client.LLen(r.queueName))
+               }
                if err := r.client.Close(); err != nil {
                        log.Error("Error whilst closing internal redis client in %s: %v", r.name, err)
                }
        }
+       log.Debug("RedisQueue: %s Terminated", r.name)
 }
 
 // Name returns the name of this queue
index c52e6e467360e041c414fad41de47077392ab071..ef90d1860823300bb6ce64c5e921b8a0ad71bd79 100644 (file)
@@ -7,8 +7,8 @@ package queue
 import (
        "context"
        "fmt"
-       "reflect"
        "sync"
+       "sync/atomic"
        "time"
 
        "code.gitea.io/gitea/modules/log"
@@ -56,7 +56,7 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h
        for q.internal == nil {
                select {
                case <-ctx.Done():
-                       return fmt.Errorf("Timedout creating queue %v with cfg %v in %s", q.underlying, q.cfg, q.name)
+                       return fmt.Errorf("Timedout creating queue %v with cfg %s in %s", q.underlying, q.cfg, q.name)
                default:
                        queue, err := NewQueue(q.underlying, handle, q.cfg, exemplar)
                        if err == nil {
@@ -64,11 +64,11 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h
                                break
                        }
                        if err.Error() != "resource temporarily unavailable" {
-                               log.Warn("[Attempt: %d] Failed to create queue: %v for %s cfg: %v error: %v", i, q.underlying, q.name, q.cfg, err)
+                               log.Warn("[Attempt: %d] Failed to create queue: %v for %s cfg: %s error: %v", i, q.underlying, q.name, q.cfg, err)
                        }
                        i++
                        if q.maxAttempts > 0 && i > q.maxAttempts {
-                               return fmt.Errorf("Unable to create queue %v for %s with cfg %v by max attempts: error: %v", q.underlying, q.name, q.cfg, err)
+                               return fmt.Errorf("Unable to create queue %v for %s with cfg %s by max attempts: error: %v", q.underlying, q.name, q.cfg, err)
                        }
                        sleepTime := 100 * time.Millisecond
                        if q.timeout > 0 && q.maxAttempts > 0 {
@@ -88,10 +88,11 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h
 // WrappedQueue wraps a delayed starting queue
 type WrappedQueue struct {
        delayedStarter
-       lock     sync.Mutex
-       handle   HandlerFunc
-       exemplar interface{}
-       channel  chan Data
+       lock       sync.Mutex
+       handle     HandlerFunc
+       exemplar   interface{}
+       channel    chan Data
+       numInQueue int64
 }
 
 // NewWrappedQueue will attempt to create a queue of the provided type,
@@ -127,7 +128,7 @@ func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
                        name:        config.Name,
                },
        }
-       _ = GetManager().Add(queue, WrappedQueueType, config, exemplar, nil)
+       _ = GetManager().Add(queue, WrappedQueueType, config, exemplar)
        return queue, nil
 }
 
@@ -138,21 +139,78 @@ func (q *WrappedQueue) Name() string {
 
 // Push will push the data to the internal channel checking it against the exemplar
 func (q *WrappedQueue) Push(data Data) error {
-       if q.exemplar != nil {
-               // Assert data is of same type as r.exemplar
-               value := reflect.ValueOf(data)
-               t := value.Type()
-               exemplarType := reflect.ValueOf(q.exemplar).Type()
-               if !t.AssignableTo(exemplarType) || data == nil {
-                       return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
-               }
+       if !assignableTo(data, q.exemplar) {
+               return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
        }
+       atomic.AddInt64(&q.numInQueue, 1)
        q.channel <- data
        return nil
 }
 
+func (q *WrappedQueue) flushInternalWithContext(ctx context.Context) error {
+       q.lock.Lock()
+       if q.internal == nil {
+               q.lock.Unlock()
+               return fmt.Errorf("not ready to flush wrapped queue %s yet", q.Name())
+       }
+       q.lock.Unlock()
+       select {
+       case <-ctx.Done():
+               return ctx.Err()
+       default:
+       }
+       return q.internal.FlushWithContext(ctx)
+}
+
+// Flush flushes the queue and blocks till the queue is empty
+func (q *WrappedQueue) Flush(timeout time.Duration) error {
+       var ctx context.Context
+       var cancel context.CancelFunc
+       if timeout > 0 {
+               ctx, cancel = context.WithTimeout(context.Background(), timeout)
+       } else {
+               ctx, cancel = context.WithCancel(context.Background())
+       }
+       defer cancel()
+       return q.FlushWithContext(ctx)
+}
+
+// FlushWithContext implements the final part of Flushable
+func (q *WrappedQueue) FlushWithContext(ctx context.Context) error {
+       log.Trace("WrappedQueue: %s FlushWithContext", q.Name())
+       errChan := make(chan error, 1)
+       go func() {
+               errChan <- q.flushInternalWithContext(ctx)
+               close(errChan)
+       }()
+
+       select {
+       case err := <-errChan:
+               return err
+       case <-ctx.Done():
+               go func() {
+                       <-errChan
+               }()
+               return ctx.Err()
+       }
+}
+
+// IsEmpty checks whether the queue is empty
+func (q *WrappedQueue) IsEmpty() bool {
+       if atomic.LoadInt64(&q.numInQueue) != 0 {
+               return false
+       }
+       q.lock.Lock()
+       defer q.lock.Unlock()
+       if q.internal == nil {
+               return false
+       }
+       return q.internal.IsEmpty()
+}
+
 // Run starts to run the queue and attempts to create the internal queue
 func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+       log.Debug("WrappedQueue: %s Starting", q.name)
        q.lock.Lock()
        if q.internal == nil {
                err := q.setInternal(atShutdown, q.handle, q.exemplar)
@@ -164,6 +222,7 @@ func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())
                go func() {
                        for data := range q.channel {
                                _ = q.internal.Push(data)
+                               atomic.AddInt64(&q.numInQueue, -1)
                        }
                }()
        } else {
@@ -176,7 +235,7 @@ func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())
 
 // Shutdown this queue and stop processing
 func (q *WrappedQueue) Shutdown() {
-       log.Trace("WrappedQueue: %s Shutdown", q.name)
+       log.Trace("WrappedQueue: %s Shutting down", q.name)
        q.lock.Lock()
        defer q.lock.Unlock()
        if q.internal == nil {
@@ -185,6 +244,7 @@ func (q *WrappedQueue) Shutdown() {
        if shutdownable, ok := q.internal.(Shutdownable); ok {
                shutdownable.Shutdown()
        }
+       log.Debug("WrappedQueue: %s Shutdown", q.name)
 }
 
 // Terminate this queue and close the queue
@@ -198,6 +258,7 @@ func (q *WrappedQueue) Terminate() {
        if shutdownable, ok := q.internal.(Shutdownable); ok {
                shutdownable.Terminate()
        }
+       log.Debug("WrappedQueue: %s Terminated", q.name)
 }
 
 func init() {
index d5a6b41882a05f71a2b84f5d428aed073725e2de..8760c09ae88a22ded407db6a202f2a7df254f4e6 100644 (file)
@@ -24,8 +24,7 @@ func validType(t string) (Type, error) {
        return PersistableChannelQueueType, fmt.Errorf("Unknown queue type: %s defaulting to %s", t, string(PersistableChannelQueueType))
 }
 
-// CreateQueue for name with provided handler and exemplar
-func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue {
+func getQueueSettings(name string) (setting.QueueSettings, []byte) {
        q := setting.GetQueueSettings(name)
        opts := make(map[string]interface{})
        opts["Name"] = name
@@ -43,24 +42,33 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue {
        opts["BoostTimeout"] = q.BoostTimeout
        opts["BoostWorkers"] = q.BoostWorkers
 
-       typ, err := validType(q.Type)
-       if err != nil {
-               log.Error("Invalid type %s provided for queue named %s defaulting to %s", q.Type, name, string(typ))
-       }
-
        cfg, err := json.Marshal(opts)
        if err != nil {
                log.Error("Unable to marshall generic options: %v Error: %v", opts, err)
                log.Error("Unable to create queue for %s", name, err)
+               return q, []byte{}
+       }
+       return q, cfg
+}
+
+// CreateQueue for name with provided handler and exemplar
+func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue {
+       q, cfg := getQueueSettings(name)
+       if len(cfg) == 0 {
                return nil
        }
 
+       typ, err := validType(q.Type)
+       if err != nil {
+               log.Error("Invalid type %s provided for queue named %s defaulting to %s", q.Type, name, string(typ))
+       }
+
        returnable, err := NewQueue(typ, handle, cfg, exemplar)
        if q.WrapIfNecessary && err != nil {
                log.Warn("Unable to create queue for %s: %v", name, err)
                log.Warn("Attempting to create wrapped queue")
                returnable, err = NewQueue(WrappedQueueType, handle, WrappedQueueConfiguration{
-                       Underlying:  Type(q.Type),
+                       Underlying:  typ,
                        Timeout:     q.Timeout,
                        MaxAttempts: q.MaxAttempts,
                        Config:      cfg,
index 25fc7dd644254075e847e0650b1d464372866ff8..63ec897481a1c5b93d46b6e8405c393fcdad58f9 100644 (file)
@@ -7,12 +7,16 @@ package queue
 import (
        "context"
        "sync"
+       "sync/atomic"
        "time"
 
        "code.gitea.io/gitea/modules/log"
 )
 
-// WorkerPool takes
+// WorkerPool represent a dynamically growable worker pool for a
+// provided handler function. They have an internal channel which
+// they use to detect if there is a block and will grow and shrink in
+// response to demand as per configuration.
 type WorkerPool struct {
        lock               sync.Mutex
        baseCtx            context.Context
@@ -27,10 +31,42 @@ type WorkerPool struct {
        blockTimeout       time.Duration
        boostTimeout       time.Duration
        boostWorkers       int
+       numInQueue         int64
+}
+
+// WorkerPoolConfiguration is the basic configuration for a WorkerPool
+type WorkerPoolConfiguration struct {
+       QueueLength  int
+       BatchLength  int
+       BlockTimeout time.Duration
+       BoostTimeout time.Duration
+       BoostWorkers int
+       MaxWorkers   int
+}
+
+// NewWorkerPool creates a new worker pool
+func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPool {
+       ctx, cancel := context.WithCancel(context.Background())
+
+       dataChan := make(chan Data, config.QueueLength)
+       pool := &WorkerPool{
+               baseCtx:            ctx,
+               cancel:             cancel,
+               batchLength:        config.BatchLength,
+               dataChan:           dataChan,
+               handle:             handle,
+               blockTimeout:       config.BlockTimeout,
+               boostTimeout:       config.BoostTimeout,
+               boostWorkers:       config.BoostWorkers,
+               maxNumberOfWorkers: config.MaxWorkers,
+       }
+
+       return pool
 }
 
 // Push pushes the data to the internal channel
 func (p *WorkerPool) Push(data Data) {
+       atomic.AddInt64(&p.numInQueue, 1)
        p.lock.Lock()
        if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) {
                p.lock.Unlock()
@@ -80,7 +116,7 @@ func (p *WorkerPool) pushBoost(data Data) {
                                log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout)
 
                                start := time.Now()
-                               pid := mq.RegisterWorkers(boost, start, false, start, cancel)
+                               pid := mq.RegisterWorkers(boost, start, false, start, cancel, false)
                                go func() {
                                        <-ctx.Done()
                                        mq.RemoveWorkers(pid)
@@ -138,8 +174,8 @@ func (p *WorkerPool) BlockTimeout() time.Duration {
        return p.blockTimeout
 }
 
-// SetSettings sets the setable boost values
-func (p *WorkerPool) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
+// SetPoolSettings sets the setable boost values
+func (p *WorkerPool) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
        p.lock.Lock()
        defer p.lock.Unlock()
        p.maxNumberOfWorkers = maxNumberOfWorkers
@@ -156,8 +192,7 @@ func (p *WorkerPool) SetMaxNumberOfWorkers(newMax int) {
        p.maxNumberOfWorkers = newMax
 }
 
-// AddWorkers adds workers to the pool - this allows the number of workers to go above the limit
-func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
+func (p *WorkerPool) commonRegisterWorkers(number int, timeout time.Duration, isFlusher bool) (context.Context, context.CancelFunc) {
        var ctx context.Context
        var cancel context.CancelFunc
        start := time.Now()
@@ -173,7 +208,7 @@ func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.Cance
 
        mq := GetManager().GetManagedQueue(p.qid)
        if mq != nil {
-               pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel)
+               pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel, isFlusher)
                go func() {
                        <-ctx.Done()
                        mq.RemoveWorkers(pid)
@@ -184,6 +219,12 @@ func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.Cance
                log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number)
 
        }
+       return ctx, cancel
+}
+
+// AddWorkers adds workers to the pool - this allows the number of workers to go above the limit
+func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
+       ctx, cancel := p.commonRegisterWorkers(number, timeout, false)
        p.addWorkers(ctx, number)
        return cancel
 }
@@ -235,6 +276,7 @@ func (p *WorkerPool) CleanUp(ctx context.Context) {
        close(p.dataChan)
        for data := range p.dataChan {
                p.handle(data)
+               atomic.AddInt64(&p.numInQueue, -1)
                select {
                case <-ctx.Done():
                        log.Warn("WorkerPool: %d Cleanup context closed before finishing clean-up", p.qid)
@@ -245,6 +287,37 @@ func (p *WorkerPool) CleanUp(ctx context.Context) {
        log.Trace("WorkerPool: %d CleanUp Done", p.qid)
 }
 
+// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager
+func (p *WorkerPool) Flush(timeout time.Duration) error {
+       ctx, cancel := p.commonRegisterWorkers(1, timeout, true)
+       defer cancel()
+       return p.FlushWithContext(ctx)
+}
+
+// IsEmpty returns if true if the worker queue is empty
+func (p *WorkerPool) IsEmpty() bool {
+       return atomic.LoadInt64(&p.numInQueue) == 0
+}
+
+// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
+// NB: The worker will not be registered with the manager.
+func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
+       log.Trace("WorkerPool: %d Flush", p.qid)
+       for {
+               select {
+               case data := <-p.dataChan:
+                       p.handle(data)
+                       atomic.AddInt64(&p.numInQueue, -1)
+               case <-p.baseCtx.Done():
+                       return p.baseCtx.Err()
+               case <-ctx.Done():
+                       return ctx.Err()
+               default:
+                       return nil
+               }
+       }
+}
+
 func (p *WorkerPool) doWork(ctx context.Context) {
        delay := time.Millisecond * 300
        var data = make([]Data, 0, p.batchLength)
@@ -254,6 +327,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
                        if len(data) > 0 {
                                log.Trace("Handling: %d data, %v", len(data), data)
                                p.handle(data...)
+                               atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
                        }
                        log.Trace("Worker shutting down")
                        return
@@ -263,6 +337,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
                                if len(data) > 0 {
                                        log.Trace("Handling: %d data, %v", len(data), data)
                                        p.handle(data...)
+                                       atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
                                }
                                log.Trace("Worker shutting down")
                                return
@@ -271,6 +346,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
                        if len(data) >= p.batchLength {
                                log.Trace("Handling: %d data, %v", len(data), data)
                                p.handle(data...)
+                               atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
                                data = make([]Data, 0, p.batchLength)
                        }
                default:
@@ -286,6 +362,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
                                if len(data) > 0 {
                                        log.Trace("Handling: %d data, %v", len(data), data)
                                        p.handle(data...)
+                                       atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
                                }
                                log.Trace("Worker shutting down")
                                return
@@ -301,6 +378,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
                                        if len(data) > 0 {
                                                log.Trace("Handling: %d data, %v", len(data), data)
                                                p.handle(data...)
+                                               atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
                                        }
                                        log.Trace("Worker shutting down")
                                        return
@@ -309,6 +387,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
                                if len(data) >= p.batchLength {
                                        log.Trace("Handling: %d data, %v", len(data), data)
                                        p.handle(data...)
+                                       atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
                                        data = make([]Data, 0, p.batchLength)
                                }
                        case <-timer.C:
@@ -316,6 +395,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
                                if len(data) > 0 {
                                        log.Trace("Handling: %d data, %v", len(data), data)
                                        p.handle(data...)
+                                       atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
                                        data = make([]Data, 0, p.batchLength)
                                }
 
index 8c0768585519f22fcc9cd5274b18d8de6df60246..934c5a8108dc42b0de5a0ce79a3931b97835faa0 100644 (file)
@@ -59,7 +59,7 @@ func GetQueueSettings(name string) QueueSettings {
        if !filepath.IsAbs(q.DataDir) {
                q.DataDir = filepath.Join(AppDataPath, q.DataDir)
        }
-       sec.Key("DATADIR").SetValue(q.DataDir)
+       _, _ = sec.NewKey("DATADIR", q.DataDir)
        // The rest are...
        q.Length = sec.Key("LENGTH").MustInt(Queue.Length)
        q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength)
@@ -89,7 +89,7 @@ func NewQueueService() {
        Queue.Length = sec.Key("LENGTH").MustInt(20)
        Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20)
        Queue.ConnectionString = sec.Key("CONN_STR").MustString(path.Join(AppDataPath, ""))
-       Queue.Type = sec.Key("TYPE").MustString("")
+       Queue.Type = sec.Key("TYPE").MustString("persistable-channel")
        Queue.Network, Queue.Addresses, Queue.Password, Queue.DBIndex, _ = ParseQueueConnStr(Queue.ConnectionString)
        Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true)
        Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10)
@@ -110,27 +110,27 @@ func NewQueueService() {
        if _, ok := sectionMap["TYPE"]; !ok {
                switch Indexer.IssueQueueType {
                case LevelQueueType:
-                       section.Key("TYPE").SetValue("level")
+                       _, _ = section.NewKey("TYPE", "level")
                case ChannelQueueType:
-                       section.Key("TYPE").SetValue("persistable-channel")
+                       _, _ = section.NewKey("TYPE", "persistable-channel")
                case RedisQueueType:
-                       section.Key("TYPE").SetValue("redis")
+                       _, _ = section.NewKey("TYPE", "redis")
                default:
                        log.Fatal("Unsupported indexer queue type: %v",
                                Indexer.IssueQueueType)
                }
        }
        if _, ok := sectionMap["LENGTH"]; !ok {
-               section.Key("LENGTH").SetValue(fmt.Sprintf("%d", Indexer.UpdateQueueLength))
+               _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Indexer.UpdateQueueLength))
        }
        if _, ok := sectionMap["BATCH_LENGTH"]; !ok {
-               section.Key("BATCH_LENGTH").SetValue(fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber))
+               _, _ = section.NewKey("BATCH_LENGTH", fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber))
        }
        if _, ok := sectionMap["DATADIR"]; !ok {
-               section.Key("DATADIR").SetValue(Indexer.IssueQueueDir)
+               _, _ = section.NewKey("DATADIR", Indexer.IssueQueueDir)
        }
        if _, ok := sectionMap["CONN_STR"]; !ok {
-               section.Key("CONN_STR").SetValue(Indexer.IssueQueueConnStr)
+               _, _ = section.NewKey("CONN_STR", Indexer.IssueQueueConnStr)
        }
 
        // Handle the old mailer configuration
@@ -140,7 +140,7 @@ func NewQueueService() {
                sectionMap[key.Name()] = true
        }
        if _, ok := sectionMap["LENGTH"]; !ok {
-               section.Key("LENGTH").SetValue(fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100)))
+               _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100)))
        }
 }
 
index a7d8b97d3de5a4f755e5b1bb2900c0cc1a292b04..7eb86b140e3d19f43a36443529b780afc00abbaa 100644 (file)
@@ -2083,6 +2083,10 @@ monitor.queue.pool.addworkers.numberworkers.placeholder = Number of Workers
 monitor.queue.pool.addworkers.timeout.placeholder = Set to 0 for no timeout
 monitor.queue.pool.addworkers.mustnumbergreaterzero = Number of Workers to add must be greater than zero
 monitor.queue.pool.addworkers.musttimeoutduration = Timeout must be a golang duration eg. 5m or be 0
+monitor.queue.pool.flush.title = Flush Queue
+monitor.queue.pool.flush.desc = Flush will add a worker that will terminate once the queue is empty, or it times out.
+monitor.queue.pool.flush.submit = Add Flush Worker
+monitor.queue.pool.flush.added = Flush Worker added for %[1]s
 
 monitor.queue.settings.title = Pool Settings
 monitor.queue.settings.desc = Pools dynamically grow with a boost in response to their worker queue blocking. These changes will not affect current worker groups.
index 71a22e1f9eca73e94b7a619876d6ebd90e6d0cbc..56e5d321b307614b57fddffe9b296c2c116bb7f9 100644 (file)
@@ -404,6 +404,28 @@ func WorkerCancel(ctx *context.Context) {
        })
 }
 
+// Flush flushes a queue
+func Flush(ctx *context.Context) {
+       qid := ctx.ParamsInt64("qid")
+       mq := queue.GetManager().GetManagedQueue(qid)
+       if mq == nil {
+               ctx.Status(404)
+               return
+       }
+       timeout, err := time.ParseDuration(ctx.Query("timeout"))
+       if err != nil {
+               timeout = -1
+       }
+       ctx.Flash.Info(ctx.Tr("admin.monitor.queue.pool.flush.added", mq.Name))
+       go func() {
+               err := mq.Flush(timeout)
+               if err != nil {
+                       log.Error("Flushing failure for %s: Error %v", mq.Name, err)
+               }
+       }()
+       ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
+}
+
 // AddWorkers adds workers to a worker group
 func AddWorkers(ctx *context.Context) {
        qid := ctx.ParamsInt64("qid")
@@ -424,7 +446,7 @@ func AddWorkers(ctx *context.Context) {
                ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
                return
        }
-       if mq.Pool == nil {
+       if _, ok := mq.Managed.(queue.ManagedPool); !ok {
                ctx.Flash.Error(ctx.Tr("admin.monitor.queue.pool.none"))
                ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
                return
@@ -442,7 +464,7 @@ func SetQueueSettings(ctx *context.Context) {
                ctx.Status(404)
                return
        }
-       if mq.Pool == nil {
+       if _, ok := mq.Managed.(queue.ManagedPool); !ok {
                ctx.Flash.Error(ctx.Tr("admin.monitor.queue.pool.none"))
                ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
                return
@@ -488,10 +510,10 @@ func SetQueueSettings(ctx *context.Context) {
                        return
                }
        } else {
-               timeout = mq.Pool.BoostTimeout()
+               timeout = mq.BoostTimeout()
        }
 
-       mq.SetSettings(maxNumber, number, timeout)
+       mq.SetPoolSettings(maxNumber, number, timeout)
        ctx.Flash.Success(ctx.Tr("admin.monitor.queue.settings.changed"))
        ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
 }
index 913a52e404278d7afd0907b691bce8a35420a19a..b68a96c6412d59ad9c054f522ce00ec87f192cfb 100644 (file)
@@ -89,5 +89,9 @@ func RegisterRoutes(m *macaron.Macaron) {
                m.Post("/hook/set-default-branch/:owner/:repo/:branch", SetDefaultBranch)
                m.Get("/serv/none/:keyid", ServNoCommand)
                m.Get("/serv/command/:keyid/:owner/:repo", ServCommand)
+               m.Post("/manager/shutdown", Shutdown)
+               m.Post("/manager/restart", Restart)
+               m.Post("/manager/flush-queues", bind(private.FlushOptions{}), FlushQueues)
+
        }, CheckInternalToken)
 }
diff --git a/routers/private/manager.go b/routers/private/manager.go
new file mode 100644 (file)
index 0000000..1238ff2
--- /dev/null
@@ -0,0 +1,41 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package private
+
+import (
+       "net/http"
+
+       "code.gitea.io/gitea/modules/graceful"
+       "code.gitea.io/gitea/modules/log"
+       "code.gitea.io/gitea/modules/private"
+       "code.gitea.io/gitea/modules/queue"
+
+       "gitea.com/macaron/macaron"
+)
+
+// FlushQueues flushes all the Queues
+func FlushQueues(ctx *macaron.Context, opts private.FlushOptions) {
+       if opts.NonBlocking {
+               // Save the hammer ctx here - as a new one is created each time you call this.
+               baseCtx := graceful.GetManager().HammerContext()
+               go func() {
+                       err := queue.GetManager().FlushAll(baseCtx, opts.Timeout)
+                       if err != nil {
+                               log.Error("Flushing request timed-out with error: %v", err)
+                       }
+               }()
+               ctx.JSON(http.StatusAccepted, map[string]interface{}{
+                       "err": "Flushing",
+               })
+               return
+       }
+       err := queue.GetManager().FlushAll(ctx.Req.Request.Context(), opts.Timeout)
+       if err != nil {
+               ctx.JSON(http.StatusRequestTimeout, map[string]interface{}{
+                       "err": err,
+               })
+       }
+       ctx.PlainText(http.StatusOK, []byte("success"))
+}
diff --git a/routers/private/manager_unix.go b/routers/private/manager_unix.go
new file mode 100644 (file)
index 0000000..ec5e976
--- /dev/null
@@ -0,0 +1,28 @@
+// +build !windows
+
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package private
+
+import (
+       "net/http"
+
+       "code.gitea.io/gitea/modules/graceful"
+
+       "gitea.com/macaron/macaron"
+)
+
+// Restart causes the server to perform a graceful restart
+func Restart(ctx *macaron.Context) {
+       graceful.GetManager().DoGracefulRestart()
+       ctx.PlainText(http.StatusOK, []byte("success"))
+
+}
+
+// Shutdown causes the server to perform a graceful shutdown
+func Shutdown(ctx *macaron.Context) {
+       graceful.GetManager().DoGracefulShutdown()
+       ctx.PlainText(http.StatusOK, []byte("success"))
+}
diff --git a/routers/private/manager_windows.go b/routers/private/manager_windows.go
new file mode 100644 (file)
index 0000000..ac840a9
--- /dev/null
@@ -0,0 +1,28 @@
+// +build windows
+
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package private
+
+import (
+       "net/http"
+
+       "code.gitea.io/gitea/modules/graceful"
+
+       "gitea.com/macaron/macaron"
+)
+
+// Restart is not implemented for Windows based servers as they can't fork
+func Restart(ctx *macaron.Context) {
+       ctx.JSON(http.StatusNotImplemented, map[string]interface{}{
+               "err": "windows servers cannot be gracefully restarted - shutdown and restart manually",
+       })
+}
+
+// Shutdown causes the server to perform a graceful shutdown
+func Shutdown(ctx *macaron.Context) {
+       graceful.GetManager().DoGracefulShutdown()
+       ctx.PlainText(http.StatusOK, []byte("success"))
+}
index 815d78bb2f5441ae4be7c312cffdee3ce7a95938..693f33fddb81db69f2762ecf893619e41226c0d3 100644 (file)
@@ -423,6 +423,7 @@ func RegisterRoutes(m *macaron.Macaron) {
                                m.Post("/set", admin.SetQueueSettings)
                                m.Post("/add", admin.AddWorkers)
                                m.Post("/cancel/:pid", admin.WorkerCancel)
+                               m.Post("/flush", admin.Flush)
                        })
                })
 
index 4f422210e756ef81657a544c4a852cc47d49cc71..567e72e03c6254d3a5a7cec7da2d09dffb05737f 100644 (file)
                                </div>
                        </form>
                </div>
+               <h4 class="ui top attached header">
+                       {{.i18n.Tr "admin.monitor.queue.pool.flush.title"}}
+               </h4>
+               <div class="ui attached segment">
+                       <p>{{.i18n.Tr "admin.monitor.queue.pool.flush.desc"}}</p>
+                       <form method="POST" action="{{.Link}}/flush">
+                               {{$.CsrfTokenHtml}}
+                               <div class="ui form">
+                                       <div class="fields">
+                                               <div class="field">
+                                                       <label>{{.i18n.Tr "admin.monitor.queue.pool.timeout"}}</label>
+                                                       <input name="timeout" type="text" placeholder="{{.i18n.Tr "admin.monitor.queue.pool.addworkers.timeout.placeholder"}}">
+                                               </div>
+                                       </div>
+                                       <button class="ui submit button">{{.i18n.Tr "admin.monitor.queue.pool.flush.submit"}}</button>
+                               </div>
+                       </form>
+               </div>
                <h4 class="ui top attached header">
                        {{.i18n.Tr "admin.monitor.queue.pool.workers.title"}}
                </h4>
                                <tbody>
                                        {{range .Queue.Workers}}
                                        <tr>
-                                               <td>{{.Workers}}</td>
+                                               <td>{{.Workers}}{{if .IsFlusher}}<i class="icon sync-alternate" title="{{.i18n.Tr "admin.monitor.queue.flush"}}"></i>{{end}}</td>
                                                <td>{{DateFmtLong .Start}}</td>
                                                <td>{{if .HasTimeout}}{{DateFmtLong .Timeout}}{{else}}-{{end}}</td>
                                                <td>