You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

elastic_search.go 13KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. // Copyright 2020 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package code
  5. import (
  6. "bufio"
  7. "context"
  8. "fmt"
  9. "io"
  10. "strconv"
  11. "strings"
  12. "time"
  13. repo_model "code.gitea.io/gitea/models/repo"
  14. "code.gitea.io/gitea/modules/analyze"
  15. "code.gitea.io/gitea/modules/charset"
  16. "code.gitea.io/gitea/modules/git"
  17. "code.gitea.io/gitea/modules/json"
  18. "code.gitea.io/gitea/modules/log"
  19. "code.gitea.io/gitea/modules/setting"
  20. "code.gitea.io/gitea/modules/timeutil"
  21. "code.gitea.io/gitea/modules/typesniffer"
  22. "github.com/go-enry/go-enry/v2"
  23. "github.com/olivere/elastic/v7"
  24. )
  // esRepoIndexerLatestVersion is the version number that realIndexerName
  // appends (as ".v<N>") to the alias name to form the concrete index name.
  const esRepoIndexerLatestVersion = 1

  // Multi-match query types; currently only these 2 types are used.
  // Reference: https://www.elastic.co/guide/en/elasticsearch/reference/7.0/query-dsl-multi-match-query.html#multi-match-types
  const (
  	esMultiMatchTypeBestFields   = "best_fields"
  	esMultiMatchTypePhrasePrefix = "phrase_prefix"
  )
  32. var (
  33. _ Indexer = &ElasticSearchIndexer{}
  34. )
  // ElasticSearchIndexer implements Indexer interface
  type ElasticSearchIndexer struct {
  	// client is the elastic client every request of this indexer goes through.
  	client *elastic.Client
  	// indexerAliasName is the alias used for indexing, searching and deleting;
  	// the concrete index behind it is named by realIndexerName.
  	indexerAliasName string
  }
  // elasticLogger wraps a log.Logger and, through its Printf method, is what
  // NewElasticSearchIndexer hands to the elastic client's log options.
  type elasticLogger struct {
  	log.Logger
  }
  43. func (l elasticLogger) Printf(format string, args ...interface{}) {
  44. _ = l.Logger.Log(2, l.Logger.GetLevel(), format, args...)
  45. }
  46. // NewElasticSearchIndexer creates a new elasticsearch indexer
  47. func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, bool, error) {
  48. opts := []elastic.ClientOptionFunc{
  49. elastic.SetURL(url),
  50. elastic.SetSniff(false),
  51. elastic.SetHealthcheckInterval(10 * time.Second),
  52. elastic.SetGzip(false),
  53. }
  54. logger := elasticLogger{log.GetLogger(log.DEFAULT)}
  55. if logger.GetLevel() == log.TRACE || logger.GetLevel() == log.DEBUG {
  56. opts = append(opts, elastic.SetTraceLog(logger))
  57. } else if logger.GetLevel() == log.ERROR || logger.GetLevel() == log.CRITICAL || logger.GetLevel() == log.FATAL {
  58. opts = append(opts, elastic.SetErrorLog(logger))
  59. } else if logger.GetLevel() == log.INFO || logger.GetLevel() == log.WARN {
  60. opts = append(opts, elastic.SetInfoLog(logger))
  61. }
  62. client, err := elastic.NewClient(opts...)
  63. if err != nil {
  64. return nil, false, err
  65. }
  66. indexer := &ElasticSearchIndexer{
  67. client: client,
  68. indexerAliasName: indexerName,
  69. }
  70. exists, err := indexer.init()
  71. if err != nil {
  72. indexer.Close()
  73. return nil, false, err
  74. }
  75. return indexer, !exists, err
  76. }
  const (
  	// defaultMapping is the request body init sends when it has to create the
  	// index. It declares the five document fields written by addUpdate.
  	// "content" stores term vectors with positions and offsets; Search asks
  	// for the "fvh" highlighter on that field, which is presumably why they
  	// are stored — confirm against the Elasticsearch highlighting docs.
  	defaultMapping = `{
  		"mappings": {
  			"properties": {
  				"repo_id": {
  					"type": "long",
  					"index": true
  				},
  				"content": {
  					"type": "text",
  					"term_vector": "with_positions_offsets",
  					"index": true
  				},
  				"commit_id": {
  					"type": "keyword",
  					"index": true
  				},
  				"language": {
  					"type": "keyword",
  					"index": true
  				},
  				"updated_at": {
  					"type": "long",
  					"index": true
  				}
  			}
  		}
  	}`
  )
  106. func (b *ElasticSearchIndexer) realIndexerName() string {
  107. return fmt.Sprintf("%s.v%d", b.indexerAliasName, esRepoIndexerLatestVersion)
  108. }
  109. // Init will initialize the indexer
  110. func (b *ElasticSearchIndexer) init() (bool, error) {
  111. ctx := context.Background()
  112. exists, err := b.client.IndexExists(b.realIndexerName()).Do(ctx)
  113. if err != nil {
  114. return false, err
  115. }
  116. if !exists {
  117. var mapping = defaultMapping
  118. createIndex, err := b.client.CreateIndex(b.realIndexerName()).BodyString(mapping).Do(ctx)
  119. if err != nil {
  120. return false, err
  121. }
  122. if !createIndex.Acknowledged {
  123. return false, fmt.Errorf("create index %s with %s failed", b.realIndexerName(), mapping)
  124. }
  125. }
  126. // check version
  127. r, err := b.client.Aliases().Do(ctx)
  128. if err != nil {
  129. return false, err
  130. }
  131. realIndexerNames := r.IndicesByAlias(b.indexerAliasName)
  132. if len(realIndexerNames) < 1 {
  133. res, err := b.client.Alias().
  134. Add(b.realIndexerName(), b.indexerAliasName).
  135. Do(ctx)
  136. if err != nil {
  137. return false, err
  138. }
  139. if !res.Acknowledged {
  140. return false, fmt.Errorf("")
  141. }
  142. } else if len(realIndexerNames) >= 1 && realIndexerNames[0] < b.realIndexerName() {
  143. log.Warn("Found older gitea indexer named %s, but we will create a new one %s and keep the old NOT DELETED. You can delete the old version after the upgrade succeed.",
  144. realIndexerNames[0], b.realIndexerName())
  145. res, err := b.client.Alias().
  146. Remove(realIndexerNames[0], b.indexerAliasName).
  147. Add(b.realIndexerName(), b.indexerAliasName).
  148. Do(ctx)
  149. if err != nil {
  150. return false, err
  151. }
  152. if !res.Acknowledged {
  153. return false, fmt.Errorf("")
  154. }
  155. }
  156. return exists, nil
  157. }
  158. func (b *ElasticSearchIndexer) addUpdate(batchWriter git.WriteCloserError, batchReader *bufio.Reader, sha string, update fileUpdate, repo *repo_model.Repository) ([]elastic.BulkableRequest, error) {
  159. // Ignore vendored files in code search
  160. if setting.Indexer.ExcludeVendored && analyze.IsVendor(update.Filename) {
  161. return nil, nil
  162. }
  163. size := update.Size
  164. if !update.Sized {
  165. stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha).
  166. RunInDir(repo.RepoPath())
  167. if err != nil {
  168. return nil, err
  169. }
  170. if size, err = strconv.ParseInt(strings.TrimSpace(stdout), 10, 64); err != nil {
  171. return nil, fmt.Errorf("Misformatted git cat-file output: %v", err)
  172. }
  173. }
  174. if size > setting.Indexer.MaxIndexerFileSize {
  175. return []elastic.BulkableRequest{b.addDelete(update.Filename, repo)}, nil
  176. }
  177. if _, err := batchWriter.Write([]byte(update.BlobSha + "\n")); err != nil {
  178. return nil, err
  179. }
  180. _, _, size, err := git.ReadBatchLine(batchReader)
  181. if err != nil {
  182. return nil, err
  183. }
  184. fileContents, err := io.ReadAll(io.LimitReader(batchReader, size))
  185. if err != nil {
  186. return nil, err
  187. } else if !typesniffer.DetectContentType(fileContents).IsText() {
  188. // FIXME: UTF-16 files will probably fail here
  189. return nil, nil
  190. }
  191. if _, err = batchReader.Discard(1); err != nil {
  192. return nil, err
  193. }
  194. id := filenameIndexerID(repo.ID, update.Filename)
  195. return []elastic.BulkableRequest{
  196. elastic.NewBulkIndexRequest().
  197. Index(b.indexerAliasName).
  198. Id(id).
  199. Doc(map[string]interface{}{
  200. "repo_id": repo.ID,
  201. "content": string(charset.ToUTF8DropErrors(fileContents)),
  202. "commit_id": sha,
  203. "language": analyze.GetCodeLanguage(update.Filename, fileContents),
  204. "updated_at": timeutil.TimeStampNow(),
  205. }),
  206. }, nil
  207. }
  208. func (b *ElasticSearchIndexer) addDelete(filename string, repo *repo_model.Repository) elastic.BulkableRequest {
  209. id := filenameIndexerID(repo.ID, filename)
  210. return elastic.NewBulkDeleteRequest().
  211. Index(b.indexerAliasName).
  212. Id(id)
  213. }
  214. // Index will save the index data
  215. func (b *ElasticSearchIndexer) Index(repo *repo_model.Repository, sha string, changes *repoChanges) error {
  216. reqs := make([]elastic.BulkableRequest, 0)
  217. if len(changes.Updates) > 0 {
  218. // Now because of some insanity with git cat-file not immediately failing if not run in a valid git directory we need to run git rev-parse first!
  219. if err := git.EnsureValidGitRepository(git.DefaultContext, repo.RepoPath()); err != nil {
  220. log.Error("Unable to open git repo: %s for %-v: %v", repo.RepoPath(), repo, err)
  221. return err
  222. }
  223. batchWriter, batchReader, cancel := git.CatFileBatch(git.DefaultContext, repo.RepoPath())
  224. defer cancel()
  225. for _, update := range changes.Updates {
  226. updateReqs, err := b.addUpdate(batchWriter, batchReader, sha, update, repo)
  227. if err != nil {
  228. return err
  229. }
  230. if len(updateReqs) > 0 {
  231. reqs = append(reqs, updateReqs...)
  232. }
  233. }
  234. cancel()
  235. }
  236. for _, filename := range changes.RemovedFilenames {
  237. reqs = append(reqs, b.addDelete(filename, repo))
  238. }
  239. if len(reqs) > 0 {
  240. _, err := b.client.Bulk().
  241. Index(b.indexerAliasName).
  242. Add(reqs...).
  243. Do(context.Background())
  244. return err
  245. }
  246. return nil
  247. }
  248. // Delete deletes indexes by ids
  249. func (b *ElasticSearchIndexer) Delete(repoID int64) error {
  250. _, err := b.client.DeleteByQuery(b.indexerAliasName).
  251. Query(elastic.NewTermsQuery("repo_id", repoID)).
  252. Do(context.Background())
  253. return err
  254. }
  // indexPos locates the first occurrence of start in content and the first
  // occurrence of end that follows it. It returns the byte offset at which
  // start begins and the byte offset just past that end.
  // If either marker cannot be found it returns -1, -1.
  func indexPos(content, start, end string) (int, int) {
  	begin := strings.Index(content, start)
  	if begin >= 0 {
  		afterStart := begin + len(start)
  		if rel := strings.Index(content[afterStart:], end); rel >= 0 {
  			return begin, afterStart + rel + len(end)
  		}
  	}
  	return -1, -1
  }
  270. func convertResult(searchResult *elastic.SearchResult, kw string, pageSize int) (int64, []*SearchResult, []*SearchResultLanguages, error) {
  271. hits := make([]*SearchResult, 0, pageSize)
  272. for _, hit := range searchResult.Hits.Hits {
  273. // FIXME: There is no way to get the position the keyword on the content currently on the same request.
  274. // So we get it from content, this may made the query slower. See
  275. // https://discuss.elastic.co/t/fetching-position-of-keyword-in-matched-document/94291
  276. var startIndex, endIndex int = -1, -1
  277. c, ok := hit.Highlight["content"]
  278. if ok && len(c) > 0 {
  279. // FIXME: Since the highlighting content will include <em> and </em> for the keywords,
  280. // now we should find the positions. But how to avoid html content which contains the
  281. // <em> and </em> tags? If elastic search has handled that?
  282. startIndex, endIndex = indexPos(c[0], "<em>", "</em>")
  283. if startIndex == -1 {
  284. panic(fmt.Sprintf("1===%s,,,%#v,,,%s", kw, hit.Highlight, c[0]))
  285. }
  286. } else {
  287. panic(fmt.Sprintf("2===%#v", hit.Highlight))
  288. }
  289. repoID, fileName := parseIndexerID(hit.Id)
  290. var res = make(map[string]interface{})
  291. if err := json.Unmarshal(hit.Source, &res); err != nil {
  292. return 0, nil, nil, err
  293. }
  294. language := res["language"].(string)
  295. hits = append(hits, &SearchResult{
  296. RepoID: repoID,
  297. Filename: fileName,
  298. CommitID: res["commit_id"].(string),
  299. Content: res["content"].(string),
  300. UpdatedUnix: timeutil.TimeStamp(res["updated_at"].(float64)),
  301. Language: language,
  302. StartIndex: startIndex,
  303. EndIndex: endIndex - 9, // remove the length <em></em> since we give Content the original data
  304. Color: enry.GetColor(language),
  305. })
  306. }
  307. return searchResult.TotalHits(), hits, extractAggs(searchResult), nil
  308. }
  309. func extractAggs(searchResult *elastic.SearchResult) []*SearchResultLanguages {
  310. var searchResultLanguages []*SearchResultLanguages
  311. agg, found := searchResult.Aggregations.Terms("language")
  312. if found {
  313. searchResultLanguages = make([]*SearchResultLanguages, 0, 10)
  314. for _, bucket := range agg.Buckets {
  315. searchResultLanguages = append(searchResultLanguages, &SearchResultLanguages{
  316. Language: bucket.Key.(string),
  317. Color: enry.GetColor(bucket.Key.(string)),
  318. Count: int(bucket.DocCount),
  319. })
  320. }
  321. }
  322. return searchResultLanguages
  323. }
  324. // Search searches for codes and language stats by given conditions.
  325. func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) {
  326. searchType := esMultiMatchTypeBestFields
  327. if isMatch {
  328. searchType = esMultiMatchTypePhrasePrefix
  329. }
  330. kwQuery := elastic.NewMultiMatchQuery(keyword, "content").Type(searchType)
  331. query := elastic.NewBoolQuery()
  332. query = query.Must(kwQuery)
  333. if len(repoIDs) > 0 {
  334. var repoStrs = make([]interface{}, 0, len(repoIDs))
  335. for _, repoID := range repoIDs {
  336. repoStrs = append(repoStrs, repoID)
  337. }
  338. repoQuery := elastic.NewTermsQuery("repo_id", repoStrs...)
  339. query = query.Must(repoQuery)
  340. }
  341. var (
  342. start int
  343. kw = "<em>" + keyword + "</em>"
  344. aggregation = elastic.NewTermsAggregation().Field("language").Size(10).OrderByCountDesc()
  345. )
  346. if page > 0 {
  347. start = (page - 1) * pageSize
  348. }
  349. if len(language) == 0 {
  350. searchResult, err := b.client.Search().
  351. Index(b.indexerAliasName).
  352. Aggregation("language", aggregation).
  353. Query(query).
  354. Highlight(
  355. elastic.NewHighlight().
  356. Field("content").
  357. NumOfFragments(0). // return all highting content on fragments
  358. HighlighterType("fvh"),
  359. ).
  360. Sort("repo_id", true).
  361. From(start).Size(pageSize).
  362. Do(context.Background())
  363. if err != nil {
  364. return 0, nil, nil, err
  365. }
  366. return convertResult(searchResult, kw, pageSize)
  367. }
  368. langQuery := elastic.NewMatchQuery("language", language)
  369. countResult, err := b.client.Search().
  370. Index(b.indexerAliasName).
  371. Aggregation("language", aggregation).
  372. Query(query).
  373. Size(0). // We only needs stats information
  374. Do(context.Background())
  375. if err != nil {
  376. return 0, nil, nil, err
  377. }
  378. query = query.Must(langQuery)
  379. searchResult, err := b.client.Search().
  380. Index(b.indexerAliasName).
  381. Query(query).
  382. Highlight(
  383. elastic.NewHighlight().
  384. Field("content").
  385. NumOfFragments(0). // return all highting content on fragments
  386. HighlighterType("fvh"),
  387. ).
  388. Sort("repo_id", true).
  389. From(start).Size(pageSize).
  390. Do(context.Background())
  391. if err != nil {
  392. return 0, nil, nil, err
  393. }
  394. total, hits, _, err := convertResult(searchResult, kw, pageSize)
  395. return total, hits, extractAggs(countResult), err
  396. }
  397. // Close implements indexer
  398. func (b *ElasticSearchIndexer) Close() {}