aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLunny Xiao <xiaolunwen@gmail.com>2019-02-19 22:39:39 +0800
committertechknowlogick <matti@mdranta.net>2019-02-19 09:39:39 -0500
commit830ae614560b0c504c00d693b63d9889bac1a2d8 (patch)
tree5fd933f8124f4dd30d0215def2a7bcc0181573be
parent094263db4d9f1b53c4b4c021005eec07baddd253 (diff)
downloadgitea-830ae614560b0c504c00d693b63d9889bac1a2d8.tar.gz
gitea-830ae614560b0c504c00d693b63d9889bac1a2d8.zip
Refactor issue indexer (#5363)
-rw-r--r--Gopkg.lock9
-rw-r--r--models/issue.go18
-rw-r--r--models/issue_comment.go2
-rw-r--r--models/issue_indexer.go166
-rw-r--r--models/issue_list.go16
-rw-r--r--models/models.go14
-rw-r--r--models/unit_tests.go4
-rw-r--r--modules/indexer/issues/bleve.go250
-rw-r--r--modules/indexer/issues/bleve_test.go88
-rw-r--r--modules/indexer/issues/indexer.go36
-rw-r--r--modules/indexer/issues/queue.go11
-rw-r--r--modules/indexer/issues/queue_channel.go56
-rw-r--r--modules/indexer/issues/queue_disk.go104
-rw-r--r--modules/notification/indexer/indexer.go62
-rw-r--r--modules/setting/indexer.go55
-rw-r--r--modules/setting/setting.go11
-rw-r--r--routers/api/v1/repo/issue.go3
-rw-r--r--routers/init.go4
-rw-r--r--routers/repo/issue.go7
-rw-r--r--vendor/github.com/lunny/levelqueue/LICENSE19
-rw-r--r--vendor/github.com/lunny/levelqueue/error.go12
-rw-r--r--vendor/github.com/lunny/levelqueue/queue.go214
22 files changed, 1045 insertions, 116 deletions
diff --git a/Gopkg.lock b/Gopkg.lock
index fa2a58a1a3..3ef6f552ed 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -617,6 +617,14 @@
revision = "e3534c89ef969912856dfa39e56b09e58c5f5daf"
[[projects]]
+ branch = "master"
+ digest = "1:3ea59a5ada4bbac04da58e6177ca63da8c377a3143b48fca584408bf415fdafb"
+ name = "github.com/lunny/levelqueue"
+ packages = ["."]
+ pruneopts = "NUT"
+ revision = "02b525a4418e684a7786215296984e364746806f"
+
+[[projects]]
digest = "1:1e6a29ed1f189354030e3371f63ec58aacbc2bf232fd104c6e0d41174ac5af48"
name = "github.com/lunny/log"
packages = ["."]
@@ -1270,6 +1278,7 @@
"github.com/lafriks/xormstore",
"github.com/lib/pq",
"github.com/lunny/dingtalk_webhook",
+ "github.com/lunny/levelqueue",
"github.com/markbates/goth",
"github.com/markbates/goth/gothic",
"github.com/markbates/goth/providers/bitbucket",
diff --git a/models/issue.go b/models/issue.go
index 8ce8658fee..835c6cf9fc 100644
--- a/models/issue.go
+++ b/models/issue.go
@@ -183,12 +183,21 @@ func (issue *Issue) LoadPullRequest() error {
}
func (issue *Issue) loadComments(e Engine) (err error) {
+ return issue.loadCommentsByType(e, CommentTypeUnknown)
+}
+
+// LoadDiscussComments loads discuss comments
+func (issue *Issue) LoadDiscussComments() error {
+ return issue.loadCommentsByType(x, CommentTypeComment)
+}
+
+func (issue *Issue) loadCommentsByType(e Engine, tp CommentType) (err error) {
if issue.Comments != nil {
return nil
}
issue.Comments, err = findComments(e, FindCommentsOptions{
IssueID: issue.ID,
- Type: CommentTypeUnknown,
+ Type: tp,
})
return err
}
@@ -681,7 +690,6 @@ func updateIssueCols(e Engine, issue *Issue, cols ...string) error {
if _, err := e.ID(issue.ID).Cols(cols...).Update(issue); err != nil {
return err
}
- UpdateIssueIndexerCols(issue.ID, cols...)
return nil
}
@@ -1217,6 +1225,12 @@ func getIssuesByIDs(e Engine, issueIDs []int64) ([]*Issue, error) {
return issues, e.In("id", issueIDs).Find(&issues)
}
+func getIssueIDsByRepoID(e Engine, repoID int64) ([]int64, error) {
+ var ids = make([]int64, 0, 10)
+ err := e.Table("issue").Where("repo_id = ?", repoID).Find(&ids)
+ return ids, err
+}
+
// GetIssuesByIDs return issues with the given IDs.
func GetIssuesByIDs(issueIDs []int64) ([]*Issue, error) {
return getIssuesByIDs(x, issueIDs)
diff --git a/models/issue_comment.go b/models/issue_comment.go
index 1b02918cb7..c3654460ff 100644
--- a/models/issue_comment.go
+++ b/models/issue_comment.go
@@ -1035,6 +1035,7 @@ func UpdateComment(doer *User, c *Comment, oldContent string) error {
if err := c.LoadIssue(); err != nil {
return err
}
+
if err := c.Issue.LoadAttributes(); err != nil {
return err
}
@@ -1093,6 +1094,7 @@ func DeleteComment(doer *User, comment *Comment) error {
if err := comment.LoadIssue(); err != nil {
return err
}
+
if err := comment.Issue.LoadAttributes(); err != nil {
return err
}
diff --git a/models/issue_indexer.go b/models/issue_indexer.go
index 48c0b9f246..d02b7164da 100644
--- a/models/issue_indexer.go
+++ b/models/issue_indexer.go
@@ -7,25 +7,60 @@ package models
import (
"fmt"
- "code.gitea.io/gitea/modules/indexer"
+ "code.gitea.io/gitea/modules/indexer/issues"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/util"
)
-// issueIndexerUpdateQueue queue of issue ids to be updated
-var issueIndexerUpdateQueue chan int64
+var (
+ // issueIndexerUpdateQueue queue of issue ids to be updated
+ issueIndexerUpdateQueue issues.Queue
+ issueIndexer issues.Indexer
+)
// InitIssueIndexer initialize issue indexer
-func InitIssueIndexer() {
- indexer.InitIssueIndexer(populateIssueIndexer)
- issueIndexerUpdateQueue = make(chan int64, setting.Indexer.UpdateQueueLength)
- go processIssueIndexerUpdateQueue()
+func InitIssueIndexer() error {
+ var populate bool
+ switch setting.Indexer.IssueType {
+ case "bleve":
+ issueIndexer = issues.NewBleveIndexer(setting.Indexer.IssuePath)
+ exist, err := issueIndexer.Init()
+ if err != nil {
+ return err
+ }
+ populate = !exist
+ default:
+ return fmt.Errorf("unknow issue indexer type: %s", setting.Indexer.IssueType)
+ }
+
+ var err error
+ switch setting.Indexer.IssueIndexerQueueType {
+ case setting.LevelQueueType:
+ issueIndexerUpdateQueue, err = issues.NewLevelQueue(
+ issueIndexer,
+ setting.Indexer.IssueIndexerQueueDir,
+ setting.Indexer.IssueIndexerQueueBatchNumber)
+ if err != nil {
+ return err
+ }
+ case setting.ChannelQueueType:
+ issueIndexerUpdateQueue = issues.NewChannelQueue(issueIndexer, setting.Indexer.IssueIndexerQueueBatchNumber)
+ default:
+ return fmt.Errorf("Unsupported indexer queue type: %v", setting.Indexer.IssueIndexerQueueType)
+ }
+
+ go issueIndexerUpdateQueue.Run()
+
+ if populate {
+ go populateIssueIndexer()
+ }
+
+ return nil
}
// populateIssueIndexer populate the issue indexer with issue data
-func populateIssueIndexer() error {
- batch := indexer.IssueIndexerBatch()
+func populateIssueIndexer() {
for page := 1; ; page++ {
repos, _, err := SearchRepositoryByName(&SearchRepoOptions{
Page: page,
@@ -35,98 +70,79 @@ func populateIssueIndexer() error {
Collaborate: util.OptionalBoolFalse,
})
if err != nil {
- return fmt.Errorf("Repositories: %v", err)
+ log.Error(4, "SearchRepositoryByName: %v", err)
+ continue
}
if len(repos) == 0 {
- return batch.Flush()
+ return
}
+
for _, repo := range repos {
- issues, err := Issues(&IssuesOptions{
+ is, err := Issues(&IssuesOptions{
RepoIDs: []int64{repo.ID},
IsClosed: util.OptionalBoolNone,
IsPull: util.OptionalBoolNone,
})
if err != nil {
- return err
+ log.Error(4, "Issues: %v", err)
+ continue
}
- if err = IssueList(issues).LoadComments(); err != nil {
- return err
+ if err = IssueList(is).LoadDiscussComments(); err != nil {
+ log.Error(4, "LoadComments: %v", err)
+ continue
}
- for _, issue := range issues {
- if err := issue.update().AddToFlushingBatch(batch); err != nil {
- return err
- }
+ for _, issue := range is {
+ UpdateIssueIndexer(issue)
}
}
}
}
-func processIssueIndexerUpdateQueue() {
- batch := indexer.IssueIndexerBatch()
- for {
- var issueID int64
- select {
- case issueID = <-issueIndexerUpdateQueue:
- default:
- // flush whatever updates we currently have, since we
- // might have to wait a while
- if err := batch.Flush(); err != nil {
- log.Error(4, "IssueIndexer: %v", err)
- }
- issueID = <-issueIndexerUpdateQueue
- }
- issue, err := GetIssueByID(issueID)
- if err != nil {
- log.Error(4, "GetIssueByID: %v", err)
- } else if err = issue.update().AddToFlushingBatch(batch); err != nil {
- log.Error(4, "IssueIndexer: %v", err)
- }
- }
-}
-
-func (issue *Issue) update() indexer.IssueIndexerUpdate {
- comments := make([]string, 0, 5)
+// UpdateIssueIndexer add/update an issue to the issue indexer
+func UpdateIssueIndexer(issue *Issue) {
+ var comments []string
for _, comment := range issue.Comments {
if comment.Type == CommentTypeComment {
comments = append(comments, comment.Content)
}
}
- return indexer.IssueIndexerUpdate{
- IssueID: issue.ID,
- Data: &indexer.IssueIndexerData{
- RepoID: issue.RepoID,
- Title: issue.Title,
- Content: issue.Content,
- Comments: comments,
- },
- }
+ issueIndexerUpdateQueue.Push(&issues.IndexerData{
+ ID: issue.ID,
+ RepoID: issue.RepoID,
+ Title: issue.Title,
+ Content: issue.Content,
+ Comments: comments,
+ })
}
-// updateNeededCols whether a change to the specified columns requires updating
-// the issue indexer
-func updateNeededCols(cols []string) bool {
- for _, col := range cols {
- switch col {
- case "name", "content":
- return true
- }
+// DeleteRepoIssueIndexer deletes repo's all issues indexes
+func DeleteRepoIssueIndexer(repo *Repository) {
+ var ids []int64
+ ids, err := getIssueIDsByRepoID(x, repo.ID)
+ if err != nil {
+ log.Error(4, "getIssueIDsByRepoID failed: %v", err)
+ return
+ }
+
+ if len(ids) <= 0 {
+ return
}
- return false
-}
-// UpdateIssueIndexerCols update an issue in the issue indexer, given changes
-// to the specified columns
-func UpdateIssueIndexerCols(issueID int64, cols ...string) {
- updateNeededCols(cols)
+ issueIndexerUpdateQueue.Push(&issues.IndexerData{
+ IDs: ids,
+ IsDelete: true,
+ })
}
-// UpdateIssueIndexer add/update an issue to the issue indexer
-func UpdateIssueIndexer(issueID int64) {
- select {
- case issueIndexerUpdateQueue <- issueID:
- default:
- go func() {
- issueIndexerUpdateQueue <- issueID
- }()
+// SearchIssuesByKeyword search issue ids by keywords and repo id
+func SearchIssuesByKeyword(repoID int64, keyword string) ([]int64, error) {
+ var issueIDs []int64
+ res, err := issueIndexer.Search(keyword, repoID, 1000, 0)
+ if err != nil {
+ return nil, err
+ }
+ for _, r := range res.Hits {
+ issueIDs = append(issueIDs, r.ID)
}
+ return issueIDs, nil
}
diff --git a/models/issue_list.go b/models/issue_list.go
index 7e4c264643..a1aab488fc 100644
--- a/models/issue_list.go
+++ b/models/issue_list.go
@@ -4,7 +4,11 @@
package models
-import "fmt"
+import (
+ "fmt"
+
+ "github.com/go-xorm/builder"
+)
// IssueList defines a list of issues
type IssueList []*Issue
@@ -338,7 +342,7 @@ func (issues IssueList) loadAttachments(e Engine) (err error) {
return nil
}
-func (issues IssueList) loadComments(e Engine) (err error) {
+func (issues IssueList) loadComments(e Engine, cond builder.Cond) (err error) {
if len(issues) == 0 {
return nil
}
@@ -354,6 +358,7 @@ func (issues IssueList) loadComments(e Engine) (err error) {
rows, err := e.Table("comment").
Join("INNER", "issue", "issue.id = comment.issue_id").
In("issue.id", issuesIDs[:limit]).
+ Where(cond).
Rows(new(Comment))
if err != nil {
return err
@@ -479,5 +484,10 @@ func (issues IssueList) LoadAttachments() error {
// LoadComments loads comments
func (issues IssueList) LoadComments() error {
- return issues.loadComments(x)
+ return issues.loadComments(x, builder.NewCond())
+}
+
+// LoadDiscussComments loads discuss comments
+func (issues IssueList) LoadDiscussComments() error {
+ return issues.loadComments(x, builder.Eq{"comment.type": CommentTypeComment})
}
diff --git a/models/models.go b/models/models.go
index daef7c07e8..b8fe588b5a 100644
--- a/models/models.go
+++ b/models/models.go
@@ -12,7 +12,6 @@ import (
"net/url"
"os"
"path"
- "path/filepath"
"strings"
"code.gitea.io/gitea/modules/log"
@@ -158,19 +157,6 @@ func LoadConfigs() {
DbCfg.SSLMode = sec.Key("SSL_MODE").MustString("disable")
DbCfg.Path = sec.Key("PATH").MustString("data/gitea.db")
DbCfg.Timeout = sec.Key("SQLITE_TIMEOUT").MustInt(500)
-
- sec = setting.Cfg.Section("indexer")
- setting.Indexer.IssuePath = sec.Key("ISSUE_INDEXER_PATH").MustString(path.Join(setting.AppDataPath, "indexers/issues.bleve"))
- if !filepath.IsAbs(setting.Indexer.IssuePath) {
- setting.Indexer.IssuePath = path.Join(setting.AppWorkPath, setting.Indexer.IssuePath)
- }
- setting.Indexer.RepoIndexerEnabled = sec.Key("REPO_INDEXER_ENABLED").MustBool(false)
- setting.Indexer.RepoPath = sec.Key("REPO_INDEXER_PATH").MustString(path.Join(setting.AppDataPath, "indexers/repos.bleve"))
- if !filepath.IsAbs(setting.Indexer.RepoPath) {
- setting.Indexer.RepoPath = path.Join(setting.AppWorkPath, setting.Indexer.RepoPath)
- }
- setting.Indexer.UpdateQueueLength = sec.Key("UPDATE_BUFFER_LEN").MustInt(20)
- setting.Indexer.MaxIndexerFileSize = sec.Key("MAX_FILE_SIZE").MustInt64(1024 * 1024)
}
// parsePostgreSQLHostPort parses given input in various forms defined in
diff --git a/models/unit_tests.go b/models/unit_tests.go
index 28cd91215e..f87dd7ee96 100644
--- a/models/unit_tests.go
+++ b/models/unit_tests.go
@@ -44,6 +44,10 @@ func MainTest(m *testing.M, pathToGiteaRoot string) {
fatalTestError("Error creating test engine: %v\n", err)
}
+ if err = InitIssueIndexer(); err != nil {
+ fatalTestError("Error InitIssueIndexer: %v\n", err)
+ }
+
setting.AppURL = "https://try.gitea.io/"
setting.RunUser = "runuser"
setting.SSH.Port = 3000
diff --git a/modules/indexer/issues/bleve.go b/modules/indexer/issues/bleve.go
new file mode 100644
index 0000000000..36279198b8
--- /dev/null
+++ b/modules/indexer/issues/bleve.go
@@ -0,0 +1,250 @@
+// Copyright 2018 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package issues
+
+import (
+ "fmt"
+ "os"
+ "strconv"
+
+ "github.com/blevesearch/bleve"
+ "github.com/blevesearch/bleve/analysis/analyzer/custom"
+ "github.com/blevesearch/bleve/analysis/token/lowercase"
+ "github.com/blevesearch/bleve/analysis/token/unicodenorm"
+ "github.com/blevesearch/bleve/analysis/tokenizer/unicode"
+ "github.com/blevesearch/bleve/index/upsidedown"
+ "github.com/blevesearch/bleve/mapping"
+ "github.com/blevesearch/bleve/search/query"
+ "github.com/ethantkoenig/rupture"
+)
+
+const (
+ issueIndexerAnalyzer = "issueIndexer"
+ issueIndexerDocType = "issueIndexerDocType"
+ issueIndexerLatestVersion = 1
+)
+
+// indexerID a bleve-compatible unique identifier for an integer id
+func indexerID(id int64) string {
+ return strconv.FormatInt(id, 36)
+}
+
+// idOfIndexerID the integer id associated with an indexer id
+func idOfIndexerID(indexerID string) (int64, error) {
+ id, err := strconv.ParseInt(indexerID, 36, 64)
+ if err != nil {
+ return 0, fmt.Errorf("Unexpected indexer ID %s: %v", indexerID, err)
+ }
+ return id, nil
+}
+
+// numericEqualityQuery a numeric equality query for the given value and field
+func numericEqualityQuery(value int64, field string) *query.NumericRangeQuery {
+ f := float64(value)
+ tru := true
+ q := bleve.NewNumericRangeInclusiveQuery(&f, &f, &tru, &tru)
+ q.SetField(field)
+ return q
+}
+
+func newMatchPhraseQuery(matchPhrase, field, analyzer string) *query.MatchPhraseQuery {
+ q := bleve.NewMatchPhraseQuery(matchPhrase)
+ q.FieldVal = field
+ q.Analyzer = analyzer
+ return q
+}
+
+const unicodeNormalizeName = "unicodeNormalize"
+
+func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error {
+ return m.AddCustomTokenFilter(unicodeNormalizeName, map[string]interface{}{
+ "type": unicodenorm.Name,
+ "form": unicodenorm.NFC,
+ })
+}
+
+const maxBatchSize = 16
+
+// openIndexer open the index at the specified path, checking for metadata
+// updates and bleve version updates. If index needs to be created (or
+// re-created), returns (nil, nil)
+func openIndexer(path string, latestVersion int) (bleve.Index, error) {
+ _, err := os.Stat(path)
+ if err != nil && os.IsNotExist(err) {
+ return nil, nil
+ } else if err != nil {
+ return nil, err
+ }
+
+ metadata, err := rupture.ReadIndexMetadata(path)
+ if err != nil {
+ return nil, err
+ }
+ if metadata.Version < latestVersion {
+ // the indexer is using a previous version, so we should delete it and
+ // re-populate
+ return nil, os.RemoveAll(path)
+ }
+
+ index, err := bleve.Open(path)
+ if err != nil && err == upsidedown.IncompatibleVersion {
+ // the indexer was built with a previous version of bleve, so we should
+ // delete it and re-populate
+ return nil, os.RemoveAll(path)
+ } else if err != nil {
+ return nil, err
+ }
+
+ return index, nil
+}
+
+// BleveIndexerData an update to the issue indexer
+type BleveIndexerData IndexerData
+
+// Type returns the document type, for bleve's mapping.Classifier interface.
+func (i *BleveIndexerData) Type() string {
+ return issueIndexerDocType
+}
+
+// createIssueIndexer create an issue indexer if one does not already exist
+func createIssueIndexer(path string, latestVersion int) (bleve.Index, error) {
+ mapping := bleve.NewIndexMapping()
+ docMapping := bleve.NewDocumentMapping()
+
+ numericFieldMapping := bleve.NewNumericFieldMapping()
+ numericFieldMapping.IncludeInAll = false
+ docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping)
+
+ textFieldMapping := bleve.NewTextFieldMapping()
+ textFieldMapping.Store = false
+ textFieldMapping.IncludeInAll = false
+ docMapping.AddFieldMappingsAt("Title", textFieldMapping)
+ docMapping.AddFieldMappingsAt("Content", textFieldMapping)
+ docMapping.AddFieldMappingsAt("Comments", textFieldMapping)
+
+ if err := addUnicodeNormalizeTokenFilter(mapping); err != nil {
+ return nil, err
+ } else if err = mapping.AddCustomAnalyzer(issueIndexerAnalyzer, map[string]interface{}{
+ "type": custom.Name,
+ "char_filters": []string{},
+ "tokenizer": unicode.Name,
+ "token_filters": []string{unicodeNormalizeName, lowercase.Name},
+ }); err != nil {
+ return nil, err
+ }
+
+ mapping.DefaultAnalyzer = issueIndexerAnalyzer
+ mapping.AddDocumentMapping(issueIndexerDocType, docMapping)
+ mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping())
+
+ index, err := bleve.New(path, mapping)
+ if err != nil {
+ return nil, err
+ }
+
+ if err = rupture.WriteIndexMetadata(path, &rupture.IndexMetadata{
+ Version: latestVersion,
+ }); err != nil {
+ return nil, err
+ }
+ return index, nil
+}
+
+var (
+ _ Indexer = &BleveIndexer{}
+)
+
+// BleveIndexer implements Indexer interface
+type BleveIndexer struct {
+ indexDir string
+ indexer bleve.Index
+}
+
+// NewBleveIndexer creates a new bleve local indexer
+func NewBleveIndexer(indexDir string) *BleveIndexer {
+ return &BleveIndexer{
+ indexDir: indexDir,
+ }
+}
+
+// Init will initial the indexer
+func (b *BleveIndexer) Init() (bool, error) {
+ var err error
+ b.indexer, err = openIndexer(b.indexDir, issueIndexerLatestVersion)
+ if err != nil {
+ return false, err
+ }
+ if b.indexer != nil {
+ return true, nil
+ }
+
+ b.indexer, err = createIssueIndexer(b.indexDir, issueIndexerLatestVersion)
+ return false, err
+}
+
+// Index will save the index data
+func (b *BleveIndexer) Index(issues []*IndexerData) error {
+ batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
+ for _, issue := range issues {
+ if err := batch.Index(indexerID(issue.ID), struct {
+ RepoID int64
+ Title string
+ Content string
+ Comments []string
+ }{
+ RepoID: issue.RepoID,
+ Title: issue.Title,
+ Content: issue.Content,
+ Comments: issue.Comments,
+ }); err != nil {
+ return err
+ }
+ }
+ return batch.Flush()
+}
+
+// Delete deletes indexes by ids
+func (b *BleveIndexer) Delete(ids ...int64) error {
+ batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
+ for _, id := range ids {
+ if err := batch.Delete(indexerID(id)); err != nil {
+ return err
+ }
+ }
+ return batch.Flush()
+}
+
+// Search searches for issues by given conditions.
+// Returns the matching issue IDs
+func (b *BleveIndexer) Search(keyword string, repoID int64, limit, start int) (*SearchResult, error) {
+ indexerQuery := bleve.NewConjunctionQuery(
+ numericEqualityQuery(repoID, "RepoID"),
+ bleve.NewDisjunctionQuery(
+ newMatchPhraseQuery(keyword, "Title", issueIndexerAnalyzer),
+ newMatchPhraseQuery(keyword, "Content", issueIndexerAnalyzer),
+ newMatchPhraseQuery(keyword, "Comments", issueIndexerAnalyzer),
+ ))
+ search := bleve.NewSearchRequestOptions(indexerQuery, limit, start, false)
+
+ result, err := b.indexer.Search(search)
+ if err != nil {
+ return nil, err
+ }
+
+ var ret = SearchResult{
+ Hits: make([]Match, 0, len(result.Hits)),
+ }
+ for _, hit := range result.Hits {
+ id, err := idOfIndexerID(hit.ID)
+ if err != nil {
+ return nil, err
+ }
+ ret.Hits = append(ret.Hits, Match{
+ ID: id,
+ RepoID: repoID,
+ })
+ }
+ return &ret, nil
+}
diff --git a/modules/indexer/issues/bleve_test.go b/modules/indexer/issues/bleve_test.go
new file mode 100644
index 0000000000..720266e3b5
--- /dev/null
+++ b/modules/indexer/issues/bleve_test.go
@@ -0,0 +1,88 @@
+// Copyright 2018 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package issues
+
+import (
+ "os"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestIndexAndSearch(t *testing.T) {
+ dir := "./bleve.index"
+ indexer := NewBleveIndexer(dir)
+ defer os.RemoveAll(dir)
+
+ _, err := indexer.Init()
+ assert.NoError(t, err)
+
+ err = indexer.Index([]*IndexerData{
+ {
+ ID: 1,
+ RepoID: 2,
+ Title: "Issue search should support Chinese",
+ Content: "As title",
+ Comments: []string{
+ "test1",
+ "test2",
+ },
+ },
+ {
+ ID: 2,
+ RepoID: 2,
+ Title: "CJK support could be optional",
+ Content: "Chinese Korean and Japanese should be supported but I would like it's not enabled by default",
+ Comments: []string{
+ "LGTM",
+ "Good idea",
+ },
+ },
+ })
+ assert.NoError(t, err)
+
+ var (
+ keywords = []struct {
+ Keyword string
+ IDs []int64
+ }{
+ {
+ Keyword: "search",
+ IDs: []int64{1},
+ },
+ {
+ Keyword: "test1",
+ IDs: []int64{1},
+ },
+ {
+ Keyword: "test2",
+ IDs: []int64{1},
+ },
+ {
+ Keyword: "support",
+ IDs: []int64{1, 2},
+ },
+ {
+ Keyword: "chinese",
+ IDs: []int64{1, 2},
+ },
+ {
+ Keyword: "help",
+ IDs: []int64{},
+ },
+ }
+ )
+
+ for _, kw := range keywords {
+ res, err := indexer.Search(kw.Keyword, 2, 10, 0)
+ assert.NoError(t, err)
+
+ var ids = make([]int64, 0, len(res.Hits))
+ for _, hit := range res.Hits {
+ ids = append(ids, hit.ID)
+ }
+ assert.EqualValues(t, kw.IDs, ids)
+ }
+}
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go
new file mode 100644
index 0000000000..c31006d0dd
--- /dev/null
+++ b/modules/indexer/issues/indexer.go
@@ -0,0 +1,36 @@
+// Copyright 2018 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package issues
+
+// IndexerData data stored in the issue indexer
+type IndexerData struct {
+ ID int64
+ RepoID int64
+ Title string
+ Content string
+ Comments []string
+ IsDelete bool
+ IDs []int64
+}
+
+// Match represents on search result
+type Match struct {
+ ID int64 `json:"id"`
+ RepoID int64 `json:"repo_id"`
+ Score float64 `json:"score"`
+}
+
+// SearchResult represents search results
+type SearchResult struct {
+ Hits []Match
+}
+
+// Indexer defines an inteface to indexer issues contents
+type Indexer interface {
+ Init() (bool, error)
+ Index(issue []*IndexerData) error
+ Delete(ids ...int64) error
+ Search(kw string, repoID int64, limit, start int) (*SearchResult, error)
+}
diff --git a/modules/indexer/issues/queue.go b/modules/indexer/issues/queue.go
new file mode 100644
index 0000000000..6f4ee4c13a
--- /dev/null
+++ b/modules/indexer/issues/queue.go
@@ -0,0 +1,11 @@
+// Copyright 2018 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package issues
+
+// Queue defines an interface to save an issue indexer queue
+type Queue interface {
+ Run() error
+ Push(*IndexerData)
+}
diff --git a/modules/indexer/issues/queue_channel.go b/modules/indexer/issues/queue_channel.go
new file mode 100644
index 0000000000..99a90ad499
--- /dev/null
+++ b/modules/indexer/issues/queue_channel.go
@@ -0,0 +1,56 @@
+// Copyright 2018 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package issues
+
+import (
+ "time"
+
+ "code.gitea.io/gitea/modules/setting"
+)
+
+// ChannelQueue implements
+type ChannelQueue struct {
+ queue chan *IndexerData
+ indexer Indexer
+ batchNumber int
+}
+
+// NewChannelQueue create a memory channel queue
+func NewChannelQueue(indexer Indexer, batchNumber int) *ChannelQueue {
+ return &ChannelQueue{
+ queue: make(chan *IndexerData, setting.Indexer.UpdateQueueLength),
+ indexer: indexer,
+ batchNumber: batchNumber,
+ }
+}
+
+// Run starts to run the queue
+func (c *ChannelQueue) Run() error {
+ var i int
+ var datas = make([]*IndexerData, 0, c.batchNumber)
+ for {
+ select {
+ case data := <-c.queue:
+ datas = append(datas, data)
+ if len(datas) >= c.batchNumber {
+ c.indexer.Index(datas)
+ // TODO: save the point
+ datas = make([]*IndexerData, 0, c.batchNumber)
+ }
+ case <-time.After(time.Millisecond * 100):
+ i++
+ if i >= 3 && len(datas) > 0 {
+ c.indexer.Index(datas)
+ // TODO: save the point
+ datas = make([]*IndexerData, 0, c.batchNumber)
+ }
+ }
+ }
+}
+
+// Push will push the indexer data to queue
+func (c *ChannelQueue) Push(data *IndexerData) {
+ c.queue <- data
+}
diff --git a/modules/indexer/issues/queue_disk.go b/modules/indexer/issues/queue_disk.go
new file mode 100644
index 0000000000..97e9a3d965
--- /dev/null
+++ b/modules/indexer/issues/queue_disk.go
@@ -0,0 +1,104 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package issues
+
+import (
+ "encoding/json"
+ "time"
+
+ "code.gitea.io/gitea/modules/log"
+ "github.com/lunny/levelqueue"
+)
+
+var (
+ _ Queue = &LevelQueue{}
+)
+
+// LevelQueue implements a disk library queue
+type LevelQueue struct {
+ indexer Indexer
+ queue *levelqueue.Queue
+ batchNumber int
+}
+
+// NewLevelQueue creates a ledis local queue
+func NewLevelQueue(indexer Indexer, dataDir string, batchNumber int) (*LevelQueue, error) {
+ queue, err := levelqueue.Open(dataDir)
+ if err != nil {
+ return nil, err
+ }
+
+ return &LevelQueue{
+ indexer: indexer,
+ queue: queue,
+ batchNumber: batchNumber,
+ }, nil
+}
+
+// Run starts to run the queue
+func (l *LevelQueue) Run() error {
+ var i int
+ var datas = make([]*IndexerData, 0, l.batchNumber)
+ for {
+ bs, err := l.queue.RPop()
+ if err != nil {
+ log.Error(4, "RPop: %v", err)
+ time.Sleep(time.Millisecond * 100)
+ continue
+ }
+
+ i++
+ if len(datas) > l.batchNumber || (len(datas) > 0 && i > 3) {
+ l.indexer.Index(datas)
+ datas = make([]*IndexerData, 0, l.batchNumber)
+ i = 0
+ }
+
+ if len(bs) <= 0 {
+ time.Sleep(time.Millisecond * 100)
+ continue
+ }
+
+ var data IndexerData
+ err = json.Unmarshal(bs, &data)
+ if err != nil {
+ log.Error(4, "Unmarshal: %v", err)
+ time.Sleep(time.Millisecond * 100)
+ continue
+ }
+
+ log.Trace("LedisLocalQueue: task found: %#v", data)
+
+ if data.IsDelete {
+ if data.ID > 0 {
+ if err = l.indexer.Delete(data.ID); err != nil {
+ log.Error(4, "indexer.Delete: %v", err)
+ }
+ } else if len(data.IDs) > 0 {
+ if err = l.indexer.Delete(data.IDs...); err != nil {
+ log.Error(4, "indexer.Delete: %v", err)
+ }
+ }
+ time.Sleep(time.Millisecond * 10)
+ continue
+ }
+
+ datas = append(datas, &data)
+ time.Sleep(time.Millisecond * 10)
+ }
+}
+
+// Push will push the indexer data to queue
+func (l *LevelQueue) Push(data *IndexerData) {
+ bs, err := json.Marshal(data)
+ if err != nil {
+ log.Error(4, "Marshal: %v", err)
+ return
+ }
+ err = l.queue.LPush(bs)
+ if err != nil {
+ log.Error(4, "LPush: %v", err)
+ }
+}
diff --git a/modules/notification/indexer/indexer.go b/modules/notification/indexer/indexer.go
index 3fd3352188..66d483c017 100644
--- a/modules/notification/indexer/indexer.go
+++ b/modules/notification/indexer/indexer.go
@@ -6,6 +6,7 @@ package indexer
import (
"code.gitea.io/gitea/models"
+ "code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/notification/base"
)
@@ -25,38 +26,83 @@ func NewNotifier() base.Notifier {
func (r *indexerNotifier) NotifyCreateIssueComment(doer *models.User, repo *models.Repository,
issue *models.Issue, comment *models.Comment) {
if comment.Type == models.CommentTypeComment {
- models.UpdateIssueIndexer(issue.ID)
+ if issue.Comments == nil {
+ if err := issue.LoadDiscussComments(); err != nil {
+ log.Error(4, "LoadComments failed: %v", err)
+ return
+ }
+ } else {
+ issue.Comments = append(issue.Comments, comment)
+ }
+
+ models.UpdateIssueIndexer(issue)
}
}
func (r *indexerNotifier) NotifyNewIssue(issue *models.Issue) {
- models.UpdateIssueIndexer(issue.ID)
+ models.UpdateIssueIndexer(issue)
}
func (r *indexerNotifier) NotifyNewPullRequest(pr *models.PullRequest) {
- models.UpdateIssueIndexer(pr.Issue.ID)
+ models.UpdateIssueIndexer(pr.Issue)
}
func (r *indexerNotifier) NotifyUpdateComment(doer *models.User, c *models.Comment, oldContent string) {
if c.Type == models.CommentTypeComment {
- models.UpdateIssueIndexer(c.IssueID)
+ var found bool
+ if c.Issue.Comments != nil {
+ for i := 0; i < len(c.Issue.Comments); i++ {
+ if c.Issue.Comments[i].ID == c.ID {
+ c.Issue.Comments[i] = c
+ found = true
+ break
+ }
+ }
+ }
+
+ if !found {
+ if err := c.Issue.LoadDiscussComments(); err != nil {
+ log.Error(4, "LoadComments failed: %v", err)
+ return
+ }
+ }
+
+ models.UpdateIssueIndexer(c.Issue)
}
}
func (r *indexerNotifier) NotifyDeleteComment(doer *models.User, comment *models.Comment) {
if comment.Type == models.CommentTypeComment {
- models.UpdateIssueIndexer(comment.IssueID)
+ var found bool
+ if comment.Issue.Comments != nil {
+ for i := 0; i < len(comment.Issue.Comments); i++ {
+ if comment.Issue.Comments[i].ID == comment.ID {
+ comment.Issue.Comments = append(comment.Issue.Comments[:i], comment.Issue.Comments[i+1:]...)
+ found = true
+ break
+ }
+ }
+ }
+
+ if !found {
+ if err := comment.Issue.LoadDiscussComments(); err != nil {
+ log.Error(4, "LoadComments failed: %v", err)
+ return
+ }
+ }
+ // reload comments to delete the old comment
+ models.UpdateIssueIndexer(comment.Issue)
}
}
func (r *indexerNotifier) NotifyDeleteRepository(doer *models.User, repo *models.Repository) {
- models.DeleteRepoFromIndexer(repo)
+ models.DeleteRepoIssueIndexer(repo)
}
func (r *indexerNotifier) NotifyIssueChangeContent(doer *models.User, issue *models.Issue, oldContent string) {
- models.UpdateIssueIndexer(issue.ID)
+ models.UpdateIssueIndexer(issue)
}
func (r *indexerNotifier) NotifyIssueChangeTitle(doer *models.User, issue *models.Issue, oldTitle string) {
- models.UpdateIssueIndexer(issue.ID)
+ models.UpdateIssueIndexer(issue)
}
diff --git a/modules/setting/indexer.go b/modules/setting/indexer.go
new file mode 100644
index 0000000000..245ebb0496
--- /dev/null
+++ b/modules/setting/indexer.go
@@ -0,0 +1,55 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package setting
+
+import (
+ "path"
+ "path/filepath"
+)
+
+// enumerates all the indexer queue types
+const (
+ LevelQueueType = "levelqueue"
+ ChannelQueueType = "channel"
+)
+
+var (
+ // Indexer settings
+ Indexer = struct {
+ IssueType string
+ IssuePath string
+ RepoIndexerEnabled bool
+ RepoPath string
+ UpdateQueueLength int
+ MaxIndexerFileSize int64
+ IssueIndexerQueueType string
+ IssueIndexerQueueDir string
+ IssueIndexerQueueBatchNumber int
+ }{
+ IssueType: "bleve",
+ IssuePath: "indexers/issues.bleve",
+ IssueIndexerQueueType: LevelQueueType,
+ IssueIndexerQueueDir: "indexers/issues.queue",
+ IssueIndexerQueueBatchNumber: 20,
+ }
+)
+
+func newIndexerService() {
+ sec := Cfg.Section("indexer")
+ Indexer.IssuePath = sec.Key("ISSUE_INDEXER_PATH").MustString(path.Join(AppDataPath, "indexers/issues.bleve"))
+ if !filepath.IsAbs(Indexer.IssuePath) {
+ Indexer.IssuePath = path.Join(AppWorkPath, Indexer.IssuePath)
+ }
+ Indexer.RepoIndexerEnabled = sec.Key("REPO_INDEXER_ENABLED").MustBool(false)
+ Indexer.RepoPath = sec.Key("REPO_INDEXER_PATH").MustString(path.Join(AppDataPath, "indexers/repos.bleve"))
+ if !filepath.IsAbs(Indexer.RepoPath) {
+ Indexer.RepoPath = path.Join(AppWorkPath, Indexer.RepoPath)
+ }
+ Indexer.UpdateQueueLength = sec.Key("UPDATE_BUFFER_LEN").MustInt(20)
+ Indexer.MaxIndexerFileSize = sec.Key("MAX_FILE_SIZE").MustInt64(1024 * 1024)
+ Indexer.IssueIndexerQueueType = sec.Key("ISSUE_INDEXER_QUEUE_TYPE").MustString(LevelQueueType)
+ Indexer.IssueIndexerQueueDir = sec.Key("ISSUE_INDEXER_QUEUE_DIR").MustString(path.Join(AppDataPath, "indexers/issues.queue"))
+ Indexer.IssueIndexerQueueBatchNumber = sec.Key("ISSUE_INDEXER_QUEUE_BATCH_NUMBER").MustInt(20)
+}
diff --git a/modules/setting/setting.go b/modules/setting/setting.go
index 5f65570540..4c016f3489 100644
--- a/modules/setting/setting.go
+++ b/modules/setting/setting.go
@@ -179,15 +179,6 @@ var (
DBConnectRetries int
DBConnectBackoff time.Duration
- // Indexer settings
- Indexer struct {
- IssuePath string
- RepoIndexerEnabled bool
- RepoPath string
- UpdateQueueLength int
- MaxIndexerFileSize int64
- }
-
// Repository settings
Repository = struct {
AnsiCharset string
@@ -1214,6 +1205,7 @@ func NewContext() {
IsInputFile: sec.Key("IS_INPUT_FILE").MustBool(false),
})
}
+
sec = Cfg.Section("U2F")
U2F.TrustedFacets, _ = shellquote.Split(sec.Key("TRUSTED_FACETS").MustString(strings.TrimRight(AppURL, "/")))
U2F.AppID = sec.Key("APP_ID").MustString(strings.TrimRight(AppURL, "/"))
@@ -1240,4 +1232,5 @@ func NewServices() {
newRegisterMailService()
newNotifyMailService()
newWebhookService()
+ newIndexerService()
}
diff --git a/routers/api/v1/repo/issue.go b/routers/api/v1/repo/issue.go
index d339d8f0b7..b13af33548 100644
--- a/routers/api/v1/repo/issue.go
+++ b/routers/api/v1/repo/issue.go
@@ -13,7 +13,6 @@ import (
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/context"
- "code.gitea.io/gitea/modules/indexer"
"code.gitea.io/gitea/modules/notification"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/util"
@@ -78,7 +77,7 @@ func ListIssues(ctx *context.APIContext) {
var labelIDs []int64
var err error
if len(keyword) > 0 {
- issueIDs, err = indexer.SearchIssuesByKeyword(ctx.Repo.Repository.ID, keyword)
+ issueIDs, err = models.SearchIssuesByKeyword(ctx.Repo.Repository.ID, keyword)
}
if splitted := strings.Split(ctx.Query("labels"), ","); len(splitted) > 0 {
diff --git a/routers/init.go b/routers/init.go
index 4da786cc00..1da21a351b 100644
--- a/routers/init.go
+++ b/routers/init.go
@@ -90,7 +90,9 @@ func GlobalInit() {
// Booting long running goroutines.
cron.NewContext()
- models.InitIssueIndexer()
+ if err := models.InitIssueIndexer(); err != nil {
+ log.Fatal(4, "Failed to initialize issue indexer: %v", err)
+ }
models.InitRepoIndexer()
models.InitSyncMirrors()
models.InitDeliverHooks()
diff --git a/routers/repo/issue.go b/routers/repo/issue.go
index 6783d279b5..1843e00144 100644
--- a/routers/repo/issue.go
+++ b/routers/repo/issue.go
@@ -23,7 +23,6 @@ import (
"code.gitea.io/gitea/modules/auth"
"code.gitea.io/gitea/modules/base"
"code.gitea.io/gitea/modules/context"
- "code.gitea.io/gitea/modules/indexer"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/markup/markdown"
"code.gitea.io/gitea/modules/notification"
@@ -147,7 +146,11 @@ func issues(ctx *context.Context, milestoneID int64, isPullOption util.OptionalB
var issueIDs []int64
if len(keyword) > 0 {
- issueIDs, err = indexer.SearchIssuesByKeyword(repo.ID, keyword)
+ issueIDs, err = models.SearchIssuesByKeyword(repo.ID, keyword)
+ if err != nil {
+ ctx.ServerError("issueIndexer.Search", err)
+ return
+ }
if len(issueIDs) == 0 {
forceEmpty = true
}
diff --git a/vendor/github.com/lunny/levelqueue/LICENSE b/vendor/github.com/lunny/levelqueue/LICENSE
new file mode 100644
index 0000000000..4a5a4ea0ff
--- /dev/null
+++ b/vendor/github.com/lunny/levelqueue/LICENSE
@@ -0,0 +1,19 @@
+Copyright (c) 2019 Lunny Xiao
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/vendor/github.com/lunny/levelqueue/error.go b/vendor/github.com/lunny/levelqueue/error.go
new file mode 100644
index 0000000000..d639c5d496
--- /dev/null
+++ b/vendor/github.com/lunny/levelqueue/error.go
@@ -0,0 +1,12 @@
+// Copyright 2019 Lunny Xiao. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package levelqueue
+
+import "errors"
+
+var (
+ // ErrNotFound means no element in queue
+ ErrNotFound = errors.New("no key found")
+)
diff --git a/vendor/github.com/lunny/levelqueue/queue.go b/vendor/github.com/lunny/levelqueue/queue.go
new file mode 100644
index 0000000000..0b2bef6c84
--- /dev/null
+++ b/vendor/github.com/lunny/levelqueue/queue.go
@@ -0,0 +1,214 @@
+// Copyright 2019 Lunny Xiao. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package levelqueue
+
+import (
+ "bytes"
+ "encoding/binary"
+ "sync"
+
+ "github.com/syndtr/goleveldb/leveldb"
+)
+
+// Queue defines a queue struct
+type Queue struct {
+ db *leveldb.DB
+ highLock sync.Mutex
+ lowLock sync.Mutex
+ low int64
+ high int64
+}
+
+// Open opens a queue object or create it if not exist
+func Open(dataDir string) (*Queue, error) {
+ db, err := leveldb.OpenFile(dataDir, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ var queue = &Queue{
+ db: db,
+ }
+ queue.low, err = queue.readID(lowKey)
+ if err == leveldb.ErrNotFound {
+ queue.low = 1
+ err = db.Put(lowKey, id2bytes(1), nil)
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ queue.high, err = queue.readID(highKey)
+ if err == leveldb.ErrNotFound {
+ err = db.Put(highKey, id2bytes(0), nil)
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ return queue, nil
+}
+
+func (queue *Queue) readID(key []byte) (int64, error) {
+ bs, err := queue.db.Get(key, nil)
+ if err != nil {
+ return 0, err
+ }
+ return bytes2id(bs)
+}
+
+var (
+ lowKey = []byte("low")
+ highKey = []byte("high")
+)
+
+func (queue *Queue) highincrement() (int64, error) {
+ id := queue.high + 1
+ queue.high = id
+ err := queue.db.Put(highKey, id2bytes(queue.high), nil)
+ if err != nil {
+ queue.high = queue.high - 1
+ return 0, err
+ }
+ return id, nil
+}
+
+func (queue *Queue) highdecrement() (int64, error) {
+ queue.high = queue.high - 1
+ err := queue.db.Put(highKey, id2bytes(queue.high), nil)
+ if err != nil {
+ queue.high = queue.high + 1
+ return 0, err
+ }
+ return queue.high, nil
+}
+
+func (queue *Queue) lowincrement() (int64, error) {
+ queue.low = queue.low + 1
+ err := queue.db.Put(lowKey, id2bytes(queue.low), nil)
+ if err != nil {
+ queue.low = queue.low - 1
+ return 0, err
+ }
+ return queue.low, nil
+}
+
+func (queue *Queue) lowdecrement() (int64, error) {
+ queue.low = queue.low - 1
+ err := queue.db.Put(lowKey, id2bytes(queue.low), nil)
+ if err != nil {
+ queue.low = queue.low + 1
+ return 0, err
+ }
+ return queue.low, nil
+}
+
+// Len returns the length of the queue
+func (queue *Queue) Len() int64 {
+ queue.lowLock.Lock()
+ queue.highLock.Lock()
+ l := queue.high - queue.low + 1
+ queue.highLock.Unlock()
+ queue.lowLock.Unlock()
+ return l
+}
+
+func id2bytes(id int64) []byte {
+ var buf = make([]byte, 8)
+ binary.PutVarint(buf, id)
+ return buf
+}
+
+func bytes2id(b []byte) (int64, error) {
+ return binary.ReadVarint(bytes.NewReader(b))
+}
+
+// RPush pushes a data from right of queue
+func (queue *Queue) RPush(data []byte) error {
+ queue.highLock.Lock()
+ id, err := queue.highincrement()
+ if err != nil {
+ queue.highLock.Unlock()
+ return err
+ }
+ err = queue.db.Put(id2bytes(id), data, nil)
+ queue.highLock.Unlock()
+ return err
+}
+
+// LPush pushes a data from left of queue
+func (queue *Queue) LPush(data []byte) error {
+ queue.highLock.Lock()
+ id, err := queue.lowdecrement()
+ if err != nil {
+ queue.highLock.Unlock()
+ return err
+ }
+ err = queue.db.Put(id2bytes(id), data, nil)
+ queue.highLock.Unlock()
+ return err
+}
+
+// RPop pop a data from right of queue
+func (queue *Queue) RPop() ([]byte, error) {
+ queue.highLock.Lock()
+ currentID := queue.high
+
+ res, err := queue.db.Get(id2bytes(currentID), nil)
+ if err != nil {
+ queue.highLock.Unlock()
+ if err == leveldb.ErrNotFound {
+ return nil, ErrNotFound
+ }
+ return nil, err
+ }
+
+ _, err = queue.highdecrement()
+ if err != nil {
+ queue.highLock.Unlock()
+ return nil, err
+ }
+
+ err = queue.db.Delete(id2bytes(currentID), nil)
+ queue.highLock.Unlock()
+ if err != nil {
+ return nil, err
+ }
+ return res, nil
+}
+
+// LPop pop a data from left of queue
+func (queue *Queue) LPop() ([]byte, error) {
+ queue.lowLock.Lock()
+ currentID := queue.low
+
+ res, err := queue.db.Get(id2bytes(currentID), nil)
+ if err != nil {
+ queue.lowLock.Unlock()
+ if err == leveldb.ErrNotFound {
+ return nil, ErrNotFound
+ }
+ return nil, err
+ }
+
+ _, err = queue.lowincrement()
+ if err != nil {
+ return nil, err
+ }
+
+ err = queue.db.Delete(id2bytes(currentID), nil)
+ queue.lowLock.Unlock()
+ if err != nil {
+ return nil, err
+ }
+ return res, nil
+}
+
+// Close closes the queue
+func (queue *Queue) Close() error {
+ err := queue.db.Close()
+ queue.db = nil
+ return err
+}