summaryrefslogtreecommitdiffstats
path: root/modules/indexer/issues/queue_channel.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/indexer/issues/queue_channel.go')
-rw-r--r--modules/indexer/issues/queue_channel.go56
1 files changed, 56 insertions, 0 deletions
diff --git a/modules/indexer/issues/queue_channel.go b/modules/indexer/issues/queue_channel.go
new file mode 100644
index 0000000000..99a90ad499
--- /dev/null
+++ b/modules/indexer/issues/queue_channel.go
@@ -0,0 +1,56 @@
+// Copyright 2018 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package issues
+
+import (
+ "time"
+
+ "code.gitea.io/gitea/modules/setting"
+)
+
+// ChannelQueue implements
+type ChannelQueue struct {
+ queue chan *IndexerData
+ indexer Indexer
+ batchNumber int
+}
+
+// NewChannelQueue create a memory channel queue
+func NewChannelQueue(indexer Indexer, batchNumber int) *ChannelQueue {
+ return &ChannelQueue{
+ queue: make(chan *IndexerData, setting.Indexer.UpdateQueueLength),
+ indexer: indexer,
+ batchNumber: batchNumber,
+ }
+}
+
+// Run starts to run the queue
+func (c *ChannelQueue) Run() error {
+ var i int
+ var datas = make([]*IndexerData, 0, c.batchNumber)
+ for {
+ select {
+ case data := <-c.queue:
+ datas = append(datas, data)
+ if len(datas) >= c.batchNumber {
+ c.indexer.Index(datas)
+ // TODO: save the point
+ datas = make([]*IndexerData, 0, c.batchNumber)
+ }
+ case <-time.After(time.Millisecond * 100):
+ i++
+ if i >= 3 && len(datas) > 0 {
+ c.indexer.Index(datas)
+ // TODO: save the point
+ datas = make([]*IndexerData, 0, c.batchNumber)
+ }
+ }
+ }
+}
+
+// Push will push the indexer data to queue
+func (c *ChannelQueue) Push(data *IndexerData) {
+ c.queue <- data
+}