summaryrefslogtreecommitdiffstats
path: root/modules/indexer/issues/queue_channel.go
blob: bd92f6b7b114c3f473f90bbd26bbcbc892f8b081 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// Copyright 2018 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package issues

import (
	"time"

	"code.gitea.io/gitea/modules/setting"
)

// ChannelQueue is a queue of issue indexer data that is held in memory in a
// Go channel. Items are added with Push and consumed by Run.
type ChannelQueue struct {
	// queue carries the items handed to Push until Run receives them.
	queue       chan *IndexerData
	// indexer receives the Index and Delete calls issued by Run.
	indexer     Indexer
	// batchNumber is the number of pending items at which Run passes the
	// accumulated batch to the indexer.
	batchNumber int
}

// NewChannelQueue builds an in-memory, channel-backed queue that feeds the
// given indexer. The channel capacity is taken from
// setting.Indexer.UpdateQueueLength, and batchNumber is the batch size used
// by Run when it hands items to the indexer.
func NewChannelQueue(indexer Indexer, batchNumber int) *ChannelQueue {
	q := new(ChannelQueue)
	q.queue = make(chan *IndexerData, setting.Indexer.UpdateQueueLength)
	q.indexer = indexer
	q.batchNumber = batchNumber
	return q
}

// Run starts the queue's processing loop. The loop has no exit condition, so
// Run blocks and does not return under normal operation; callers are expected
// to run it in its own goroutine.
//
// Index requests are accumulated into a pending batch. The batch is handed to
// the indexer when it reaches batchNumber items, or when three consecutive
// 100ms ticks pass without any new item arriving. Delete requests are applied
// as soon as they are received, but only after the pending batch has been
// flushed, so an index request queued before a delete is never applied after
// it. Nil items are skipped.
//
// NOTE(review): whatever the indexer's Index and Delete calls return is
// discarded here, as it was before — confirm whether failures should be
// logged or retried.
func (c *ChannelQueue) Run() error {
	const (
		idleTick     = 100 * time.Millisecond
		maxIdleTicks = 3
	)

	var idleTicks int
	pending := make([]*IndexerData, 0, c.batchNumber)

	// flush hands the pending batch to the indexer and starts a new one. A
	// fresh slice is allocated rather than truncating the old one because the
	// indexer may still hold a reference to the batch it was given.
	flush := func() {
		if len(pending) == 0 {
			return
		}
		c.indexer.Index(pending)
		// TODO: save the point
		pending = make([]*IndexerData, 0, c.batchNumber)
	}

	// A single ticker replaces a time.After call per loop iteration, which
	// would allocate a new timer for every received item.
	ticker := time.NewTicker(idleTick)
	defer ticker.Stop()

	for {
		select {
		case data := <-c.queue:
			// Any activity restarts the idle countdown.
			idleTicks = 0

			if data == nil {
				// A nil item carries no work; skipping it keeps the only
				// consumer goroutine from panicking.
				continue
			}

			if data.IsDelete {
				// Apply earlier index requests first so the delete is not
				// undone by a later flush of stale data.
				flush()
				c.indexer.Delete(data.IDs...)
				continue
			}

			pending = append(pending, data)
			if len(pending) >= c.batchNumber {
				flush()
			}
		case <-ticker.C:
			idleTicks++
			if idleTicks >= maxIdleTicks {
				flush()
				idleTicks = 0
			}
		}
	}
}

// Push enqueues data for processing by Run. The send blocks while the
// underlying channel has no free capacity. The returned error is always nil.
func (c *ChannelQueue) Push(data *IndexerData) error {
	pendingItems := c.queue
	pendingItems <- data
	return nil
}