summaryrefslogtreecommitdiffstats
path: root/modules/queue
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue')
-rw-r--r--modules/queue/manager.go6
-rw-r--r--modules/queue/queue.go5
-rw-r--r--modules/queue/queue_bytefifo.go8
-rw-r--r--modules/queue/queue_disk_channel.go1
-rw-r--r--modules/queue/queue_disk_channel_test.go1
-rw-r--r--modules/queue/queue_wrapped.go2
-rw-r--r--modules/queue/unique_queue_disk_channel.go1
-rw-r--r--modules/queue/workerpool.go2
8 files changed, 11 insertions, 15 deletions
diff --git a/modules/queue/manager.go b/modules/queue/manager.go
index 23e96155a9..e0384d15a3 100644
--- a/modules/queue/manager.go
+++ b/modules/queue/manager.go
@@ -109,8 +109,8 @@ func GetManager() *Manager {
func (m *Manager) Add(managed interface{},
t Type,
configuration,
- exemplar interface{}) int64 {
-
+ exemplar interface{},
+) int64 {
cfg, _ := json.Marshal(configuration)
mq := &ManagedQueue{
Type: t,
@@ -141,7 +141,6 @@ func (m *Manager) Remove(qid int64) {
delete(m.Queues, qid)
m.mutex.Unlock()
log.Trace("Queue Manager removed: QID: %d", qid)
-
}
// GetManagedQueue by qid
@@ -225,7 +224,6 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
wg.Wait()
}
return nil
-
}
// ManagedQueues returns the managed queues
diff --git a/modules/queue/queue.go b/modules/queue/queue.go
index 7159048c11..80a9f1f2c7 100644
--- a/modules/queue/queue.go
+++ b/modules/queue/queue.go
@@ -39,7 +39,7 @@ type Data interface{}
type HandlerFunc func(...Data)
// NewQueueFunc is a function that creates a queue
-type NewQueueFunc func(handler HandlerFunc, config interface{}, exemplar interface{}) (Queue, error)
+type NewQueueFunc func(handler HandlerFunc, config, exemplar interface{}) (Queue, error)
// Shutdownable represents a queue that can be shutdown
type Shutdownable interface {
@@ -70,8 +70,7 @@ func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, erro
}
// DummyQueue represents an empty queue
-type DummyQueue struct {
-}
+type DummyQueue struct{}
// Run does nothing
func (*DummyQueue) Run(_, _ func(func())) {}
diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go
index edde47a62d..c4d5d20a89 100644
--- a/modules/queue/queue_bytefifo.go
+++ b/modules/queue/queue_bytefifo.go
@@ -195,9 +195,11 @@ loop:
}
}
-var errQueueEmpty = fmt.Errorf("empty queue")
-var errEmptyBytes = fmt.Errorf("empty bytes")
-var errUnmarshal = fmt.Errorf("failed to unmarshal")
+var (
+ errQueueEmpty = fmt.Errorf("empty queue")
+ errEmptyBytes = fmt.Errorf("empty bytes")
+ errUnmarshal = fmt.Errorf("failed to unmarshal")
+)
func (q *ByteFIFOQueue) doPop() error {
q.lock.Lock()
diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go
index c3a1c5781e..f3cd132d7d 100644
--- a/modules/queue/queue_disk_channel.go
+++ b/modules/queue/queue_disk_channel.go
@@ -173,7 +173,6 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
q.internal.(*LevelQueue).Shutdown()
GetManager().Remove(q.internal.(*LevelQueue).qid)
}
-
}
// Flush flushes the queue and blocks till the queue is empty
diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go
index c90d715a73..db12d9575c 100644
--- a/modules/queue/queue_disk_channel_test.go
+++ b/modules/queue/queue_disk_channel_test.go
@@ -188,5 +188,4 @@ func TestPersistableChannelQueue(t *testing.T) {
for _, callback := range callbacks {
callback()
}
-
}
diff --git a/modules/queue/queue_wrapped.go b/modules/queue/queue_wrapped.go
index ec30ab0281..edb589338a 100644
--- a/modules/queue/queue_wrapped.go
+++ b/modules/queue/queue_wrapped.go
@@ -55,7 +55,7 @@ func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc
for q.internal == nil {
select {
case <-ctx.Done():
- var cfg = q.cfg
+ cfg := q.cfg
if s, ok := cfg.([]byte); ok {
cfg = string(s)
}
diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go
index af42c0913d..d71f5e2b04 100644
--- a/modules/queue/unique_queue_disk_channel.go
+++ b/modules/queue/unique_queue_disk_channel.go
@@ -197,7 +197,6 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())
q.internal.(*LevelUniqueQueue).Shutdown()
GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
}
-
}
// Flush flushes the queue
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index 0176e2e0b2..653d0558c8 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -341,7 +341,7 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
func (p *WorkerPool) doWork(ctx context.Context) {
delay := time.Millisecond * 300
- var data = make([]Data, 0, p.batchLength)
+ data := make([]Data, 0, p.batchLength)
for {
select {
case <-ctx.Done():