diff options
Diffstat (limited to 'modules/queue')
-rw-r--r-- | modules/queue/manager.go | 6 | ||||
-rw-r--r-- | modules/queue/queue.go | 5 | ||||
-rw-r--r-- | modules/queue/queue_bytefifo.go | 8 | ||||
-rw-r--r-- | modules/queue/queue_disk_channel.go | 1 | ||||
-rw-r--r-- | modules/queue/queue_disk_channel_test.go | 1 | ||||
-rw-r--r-- | modules/queue/queue_wrapped.go | 2 | ||||
-rw-r--r-- | modules/queue/unique_queue_disk_channel.go | 1 | ||||
-rw-r--r-- | modules/queue/workerpool.go | 2 |
8 files changed, 11 insertions, 15 deletions
diff --git a/modules/queue/manager.go b/modules/queue/manager.go index 23e96155a9..e0384d15a3 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -109,8 +109,8 @@ func GetManager() *Manager { func (m *Manager) Add(managed interface{}, t Type, configuration, - exemplar interface{}) int64 { - + exemplar interface{}, +) int64 { cfg, _ := json.Marshal(configuration) mq := &ManagedQueue{ Type: t, @@ -141,7 +141,6 @@ func (m *Manager) Remove(qid int64) { delete(m.Queues, qid) m.mutex.Unlock() log.Trace("Queue Manager removed: QID: %d", qid) - } // GetManagedQueue by qid @@ -225,7 +224,6 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error wg.Wait() } return nil - } // ManagedQueues returns the managed queues diff --git a/modules/queue/queue.go b/modules/queue/queue.go index 7159048c11..80a9f1f2c7 100644 --- a/modules/queue/queue.go +++ b/modules/queue/queue.go @@ -39,7 +39,7 @@ type Data interface{} type HandlerFunc func(...Data) // NewQueueFunc is a function that creates a queue -type NewQueueFunc func(handler HandlerFunc, config interface{}, exemplar interface{}) (Queue, error) +type NewQueueFunc func(handler HandlerFunc, config, exemplar interface{}) (Queue, error) // Shutdownable represents a queue that can be shutdown type Shutdownable interface { @@ -70,8 +70,7 @@ func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, erro } // DummyQueue represents an empty queue -type DummyQueue struct { -} +type DummyQueue struct{} // Run does nothing func (*DummyQueue) Run(_, _ func(func())) {} diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index edde47a62d..c4d5d20a89 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -195,9 +195,11 @@ loop: } } -var errQueueEmpty = fmt.Errorf("empty queue") -var errEmptyBytes = fmt.Errorf("empty bytes") -var errUnmarshal = fmt.Errorf("failed to unmarshal") +var ( + errQueueEmpty = fmt.Errorf("empty queue") + errEmptyBytes = fmt.Errorf("empty bytes") + errUnmarshal = fmt.Errorf("failed to unmarshal") +) func (q *ByteFIFOQueue) doPop() error { q.lock.Lock() diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index c3a1c5781e..f3cd132d7d 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -173,7 +173,6 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) { q.internal.(*LevelQueue).Shutdown() GetManager().Remove(q.internal.(*LevelQueue).qid) } - } // Flush flushes the queue and blocks till the queue is empty diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index c90d715a73..db12d9575c 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -188,5 +188,4 @@ func TestPersistableChannelQueue(t *testing.T) { for _, callback := range callbacks { callback() } - } diff --git a/modules/queue/queue_wrapped.go b/modules/queue/queue_wrapped.go index ec30ab0281..edb589338a 100644 --- a/modules/queue/queue_wrapped.go +++ b/modules/queue/queue_wrapped.go @@ -55,7 +55,7 @@ func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc for q.internal == nil { select { case <-ctx.Done(): - var cfg = q.cfg + cfg := q.cfg if s, ok := cfg.([]byte); ok { cfg = string(s) } diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index af42c0913d..d71f5e2b04 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -197,7 +197,6 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func()) q.internal.(*LevelUniqueQueue).Shutdown() GetManager().Remove(q.internal.(*LevelUniqueQueue).qid) } - } // Flush flushes the queue diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 0176e2e0b2..653d0558c8 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -341,7 +341,7 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error { func (p *WorkerPool) doWork(ctx context.Context) { delay := time.Millisecond * 300 - var data = make([]Data, 0, p.batchLength) + data := make([]Data, 0, p.batchLength) for { select { case <-ctx.Done(): |