You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

elastic_search.go 14KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512
  1. // Copyright 2020 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package code
  4. import (
  5. "bufio"
  6. "context"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "net"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "time"
  15. repo_model "code.gitea.io/gitea/models/repo"
  16. "code.gitea.io/gitea/modules/analyze"
  17. "code.gitea.io/gitea/modules/charset"
  18. "code.gitea.io/gitea/modules/git"
  19. "code.gitea.io/gitea/modules/graceful"
  20. "code.gitea.io/gitea/modules/json"
  21. "code.gitea.io/gitea/modules/log"
  22. "code.gitea.io/gitea/modules/setting"
  23. "code.gitea.io/gitea/modules/timeutil"
  24. "code.gitea.io/gitea/modules/typesniffer"
  25. "github.com/go-enry/go-enry/v2"
  26. "github.com/olivere/elastic/v7"
  27. )
  const (
	// esRepoIndexerLatestVersion is the version number appended to the alias
	// name to build the name of the real index (see realIndexerName).
	esRepoIndexerLatestVersion = 1
	// multi-match-types, currently only 2 types are used
	// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/7.0/query-dsl-multi-match-query.html#multi-match-types
	// esMultiMatchTypeBestFields is the default type used by Search.
	esMultiMatchTypeBestFields = "best_fields"
	// esMultiMatchTypePhrasePrefix is used by Search when isMatch is true.
	esMultiMatchTypePhrasePrefix = "phrase_prefix"
)
  // Compile-time check that *ElasticSearchIndexer satisfies Indexer.
var _ Indexer = &ElasticSearchIndexer{}

// ElasticSearchIndexer implements Indexer interface
type ElasticSearchIndexer struct {
	// client performs every request against Elasticsearch.
	client *elastic.Client
	// indexerAliasName is the alias that index, search and delete requests
	// are addressed to; the real index name is derived from it.
	indexerAliasName string
	// available is the last known availability; it is read and written
	// under lock.
	available bool
	// stopTimer is closed by Close to stop the availability ticker goroutine.
	stopTimer chan struct{}
	// lock guards available.
	lock sync.RWMutex
}
  44. // NewElasticSearchIndexer creates a new elasticsearch indexer
  45. func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, bool, error) {
  46. opts := []elastic.ClientOptionFunc{
  47. elastic.SetURL(url),
  48. elastic.SetSniff(false),
  49. elastic.SetHealthcheckInterval(10 * time.Second),
  50. elastic.SetGzip(false),
  51. }
  52. logger := log.GetLogger(log.DEFAULT)
  53. opts = append(opts, elastic.SetTraceLog(&log.PrintfLogger{Logf: logger.Trace}))
  54. opts = append(opts, elastic.SetInfoLog(&log.PrintfLogger{Logf: logger.Info}))
  55. opts = append(opts, elastic.SetErrorLog(&log.PrintfLogger{Logf: logger.Error}))
  56. client, err := elastic.NewClient(opts...)
  57. if err != nil {
  58. return nil, false, err
  59. }
  60. indexer := &ElasticSearchIndexer{
  61. client: client,
  62. indexerAliasName: indexerName,
  63. available: true,
  64. stopTimer: make(chan struct{}),
  65. }
  66. ticker := time.NewTicker(10 * time.Second)
  67. go func() {
  68. for {
  69. select {
  70. case <-ticker.C:
  71. indexer.checkAvailability()
  72. case <-indexer.stopTimer:
  73. ticker.Stop()
  74. return
  75. }
  76. }
  77. }()
  78. exists, err := indexer.init()
  79. if err != nil {
  80. indexer.Close()
  81. return nil, false, err
  82. }
  83. return indexer, !exists, err
  84. }
  const (
	// defaultMapping is the index body sent when the real index has to be
	// created (see init). It declares the fields of an indexed file:
	// repo_id, content, commit_id, language and updated_at.
	defaultMapping = `{
"mappings": {
"properties": {
"repo_id": {
"type": "long",
"index": true
},
"content": {
"type": "text",
"term_vector": "with_positions_offsets",
"index": true
},
"commit_id": {
"type": "keyword",
"index": true
},
"language": {
"type": "keyword",
"index": true
},
"updated_at": {
"type": "long",
"index": true
}
}
}
}`
)
  114. func (b *ElasticSearchIndexer) realIndexerName() string {
  115. return fmt.Sprintf("%s.v%d", b.indexerAliasName, esRepoIndexerLatestVersion)
  116. }
  117. // Init will initialize the indexer
  118. func (b *ElasticSearchIndexer) init() (bool, error) {
  119. ctx := graceful.GetManager().HammerContext()
  120. exists, err := b.client.IndexExists(b.realIndexerName()).Do(ctx)
  121. if err != nil {
  122. return false, b.checkError(err)
  123. }
  124. if !exists {
  125. mapping := defaultMapping
  126. createIndex, err := b.client.CreateIndex(b.realIndexerName()).BodyString(mapping).Do(ctx)
  127. if err != nil {
  128. return false, b.checkError(err)
  129. }
  130. if !createIndex.Acknowledged {
  131. return false, fmt.Errorf("create index %s with %s failed", b.realIndexerName(), mapping)
  132. }
  133. }
  134. // check version
  135. r, err := b.client.Aliases().Do(ctx)
  136. if err != nil {
  137. return false, b.checkError(err)
  138. }
  139. realIndexerNames := r.IndicesByAlias(b.indexerAliasName)
  140. if len(realIndexerNames) < 1 {
  141. res, err := b.client.Alias().
  142. Add(b.realIndexerName(), b.indexerAliasName).
  143. Do(ctx)
  144. if err != nil {
  145. return false, b.checkError(err)
  146. }
  147. if !res.Acknowledged {
  148. return false, fmt.Errorf("create alias %s to index %s failed", b.indexerAliasName, b.realIndexerName())
  149. }
  150. } else if len(realIndexerNames) >= 1 && realIndexerNames[0] < b.realIndexerName() {
  151. log.Warn("Found older gitea indexer named %s, but we will create a new one %s and keep the old NOT DELETED. You can delete the old version after the upgrade succeed.",
  152. realIndexerNames[0], b.realIndexerName())
  153. res, err := b.client.Alias().
  154. Remove(realIndexerNames[0], b.indexerAliasName).
  155. Add(b.realIndexerName(), b.indexerAliasName).
  156. Do(ctx)
  157. if err != nil {
  158. return false, b.checkError(err)
  159. }
  160. if !res.Acknowledged {
  161. return false, fmt.Errorf("change alias %s to index %s failed", b.indexerAliasName, b.realIndexerName())
  162. }
  163. }
  164. return exists, nil
  165. }
  166. // Ping checks if elastic is available
  167. func (b *ElasticSearchIndexer) Ping() bool {
  168. b.lock.RLock()
  169. defer b.lock.RUnlock()
  170. return b.available
  171. }
  172. func (b *ElasticSearchIndexer) addUpdate(ctx context.Context, batchWriter git.WriteCloserError, batchReader *bufio.Reader, sha string, update fileUpdate, repo *repo_model.Repository) ([]elastic.BulkableRequest, error) {
  173. // Ignore vendored files in code search
  174. if setting.Indexer.ExcludeVendored && analyze.IsVendor(update.Filename) {
  175. return nil, nil
  176. }
  177. size := update.Size
  178. var err error
  179. if !update.Sized {
  180. var stdout string
  181. stdout, _, err = git.NewCommand(ctx, "cat-file", "-s").AddDynamicArguments(update.BlobSha).RunStdString(&git.RunOpts{Dir: repo.RepoPath()})
  182. if err != nil {
  183. return nil, err
  184. }
  185. if size, err = strconv.ParseInt(strings.TrimSpace(stdout), 10, 64); err != nil {
  186. return nil, fmt.Errorf("misformatted git cat-file output: %w", err)
  187. }
  188. }
  189. if size > setting.Indexer.MaxIndexerFileSize {
  190. return []elastic.BulkableRequest{b.addDelete(update.Filename, repo)}, nil
  191. }
  192. if _, err := batchWriter.Write([]byte(update.BlobSha + "\n")); err != nil {
  193. return nil, err
  194. }
  195. _, _, size, err = git.ReadBatchLine(batchReader)
  196. if err != nil {
  197. return nil, err
  198. }
  199. fileContents, err := io.ReadAll(io.LimitReader(batchReader, size))
  200. if err != nil {
  201. return nil, err
  202. } else if !typesniffer.DetectContentType(fileContents).IsText() {
  203. // FIXME: UTF-16 files will probably fail here
  204. return nil, nil
  205. }
  206. if _, err = batchReader.Discard(1); err != nil {
  207. return nil, err
  208. }
  209. id := filenameIndexerID(repo.ID, update.Filename)
  210. return []elastic.BulkableRequest{
  211. elastic.NewBulkIndexRequest().
  212. Index(b.indexerAliasName).
  213. Id(id).
  214. Doc(map[string]interface{}{
  215. "repo_id": repo.ID,
  216. "content": string(charset.ToUTF8DropErrors(fileContents)),
  217. "commit_id": sha,
  218. "language": analyze.GetCodeLanguage(update.Filename, fileContents),
  219. "updated_at": timeutil.TimeStampNow(),
  220. }),
  221. }, nil
  222. }
  223. func (b *ElasticSearchIndexer) addDelete(filename string, repo *repo_model.Repository) elastic.BulkableRequest {
  224. id := filenameIndexerID(repo.ID, filename)
  225. return elastic.NewBulkDeleteRequest().
  226. Index(b.indexerAliasName).
  227. Id(id)
  228. }
  229. // Index will save the index data
  230. func (b *ElasticSearchIndexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error {
  231. reqs := make([]elastic.BulkableRequest, 0)
  232. if len(changes.Updates) > 0 {
  233. // Now because of some insanity with git cat-file not immediately failing if not run in a valid git directory we need to run git rev-parse first!
  234. if err := git.EnsureValidGitRepository(ctx, repo.RepoPath()); err != nil {
  235. log.Error("Unable to open git repo: %s for %-v: %v", repo.RepoPath(), repo, err)
  236. return err
  237. }
  238. batchWriter, batchReader, cancel := git.CatFileBatch(ctx, repo.RepoPath())
  239. defer cancel()
  240. for _, update := range changes.Updates {
  241. updateReqs, err := b.addUpdate(ctx, batchWriter, batchReader, sha, update, repo)
  242. if err != nil {
  243. return err
  244. }
  245. if len(updateReqs) > 0 {
  246. reqs = append(reqs, updateReqs...)
  247. }
  248. }
  249. cancel()
  250. }
  251. for _, filename := range changes.RemovedFilenames {
  252. reqs = append(reqs, b.addDelete(filename, repo))
  253. }
  254. if len(reqs) > 0 {
  255. _, err := b.client.Bulk().
  256. Index(b.indexerAliasName).
  257. Add(reqs...).
  258. Do(ctx)
  259. return b.checkError(err)
  260. }
  261. return nil
  262. }
  263. // Delete deletes indexes by ids
  264. func (b *ElasticSearchIndexer) Delete(repoID int64) error {
  265. _, err := b.client.DeleteByQuery(b.indexerAliasName).
  266. Query(elastic.NewTermsQuery("repo_id", repoID)).
  267. Do(graceful.GetManager().HammerContext())
  268. return b.checkError(err)
  269. }
  // indexPos locates the first occurrence of start in content and the first
// occurrence of end that follows it. It returns the byte offset at which start
// begins and the byte offset just past that end.
// If either string cannot be found, it returns -1, -1.
func indexPos(content, start, end string) (int, int) {
	begin := strings.Index(content, start)
	if begin >= 0 {
		// Only look for end behind the start marker.
		afterStart := begin + len(start)
		if rel := strings.Index(content[afterStart:], end); rel >= 0 {
			return begin, afterStart + rel + len(end)
		}
	}
	return -1, -1
}
  285. func convertResult(searchResult *elastic.SearchResult, kw string, pageSize int) (int64, []*SearchResult, []*SearchResultLanguages, error) {
  286. hits := make([]*SearchResult, 0, pageSize)
  287. for _, hit := range searchResult.Hits.Hits {
  288. // FIXME: There is no way to get the position the keyword on the content currently on the same request.
  289. // So we get it from content, this may made the query slower. See
  290. // https://discuss.elastic.co/t/fetching-position-of-keyword-in-matched-document/94291
  291. var startIndex, endIndex int
  292. c, ok := hit.Highlight["content"]
  293. if ok && len(c) > 0 {
  294. // FIXME: Since the highlighting content will include <em> and </em> for the keywords,
  295. // now we should find the positions. But how to avoid html content which contains the
  296. // <em> and </em> tags? If elastic search has handled that?
  297. startIndex, endIndex = indexPos(c[0], "<em>", "</em>")
  298. if startIndex == -1 {
  299. panic(fmt.Sprintf("1===%s,,,%#v,,,%s", kw, hit.Highlight, c[0]))
  300. }
  301. } else {
  302. panic(fmt.Sprintf("2===%#v", hit.Highlight))
  303. }
  304. repoID, fileName := parseIndexerID(hit.Id)
  305. res := make(map[string]interface{})
  306. if err := json.Unmarshal(hit.Source, &res); err != nil {
  307. return 0, nil, nil, err
  308. }
  309. language := res["language"].(string)
  310. hits = append(hits, &SearchResult{
  311. RepoID: repoID,
  312. Filename: fileName,
  313. CommitID: res["commit_id"].(string),
  314. Content: res["content"].(string),
  315. UpdatedUnix: timeutil.TimeStamp(res["updated_at"].(float64)),
  316. Language: language,
  317. StartIndex: startIndex,
  318. EndIndex: endIndex - 9, // remove the length <em></em> since we give Content the original data
  319. Color: enry.GetColor(language),
  320. })
  321. }
  322. return searchResult.TotalHits(), hits, extractAggs(searchResult), nil
  323. }
  324. func extractAggs(searchResult *elastic.SearchResult) []*SearchResultLanguages {
  325. var searchResultLanguages []*SearchResultLanguages
  326. agg, found := searchResult.Aggregations.Terms("language")
  327. if found {
  328. searchResultLanguages = make([]*SearchResultLanguages, 0, 10)
  329. for _, bucket := range agg.Buckets {
  330. searchResultLanguages = append(searchResultLanguages, &SearchResultLanguages{
  331. Language: bucket.Key.(string),
  332. Color: enry.GetColor(bucket.Key.(string)),
  333. Count: int(bucket.DocCount),
  334. })
  335. }
  336. }
  337. return searchResultLanguages
  338. }
  339. // Search searches for codes and language stats by given conditions.
  340. func (b *ElasticSearchIndexer) Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) {
  341. searchType := esMultiMatchTypeBestFields
  342. if isMatch {
  343. searchType = esMultiMatchTypePhrasePrefix
  344. }
  345. kwQuery := elastic.NewMultiMatchQuery(keyword, "content").Type(searchType)
  346. query := elastic.NewBoolQuery()
  347. query = query.Must(kwQuery)
  348. if len(repoIDs) > 0 {
  349. repoStrs := make([]interface{}, 0, len(repoIDs))
  350. for _, repoID := range repoIDs {
  351. repoStrs = append(repoStrs, repoID)
  352. }
  353. repoQuery := elastic.NewTermsQuery("repo_id", repoStrs...)
  354. query = query.Must(repoQuery)
  355. }
  356. var (
  357. start int
  358. kw = "<em>" + keyword + "</em>"
  359. aggregation = elastic.NewTermsAggregation().Field("language").Size(10).OrderByCountDesc()
  360. )
  361. if page > 0 {
  362. start = (page - 1) * pageSize
  363. }
  364. if len(language) == 0 {
  365. searchResult, err := b.client.Search().
  366. Index(b.indexerAliasName).
  367. Aggregation("language", aggregation).
  368. Query(query).
  369. Highlight(
  370. elastic.NewHighlight().
  371. Field("content").
  372. NumOfFragments(0). // return all highting content on fragments
  373. HighlighterType("fvh"),
  374. ).
  375. Sort("repo_id", true).
  376. From(start).Size(pageSize).
  377. Do(ctx)
  378. if err != nil {
  379. return 0, nil, nil, b.checkError(err)
  380. }
  381. return convertResult(searchResult, kw, pageSize)
  382. }
  383. langQuery := elastic.NewMatchQuery("language", language)
  384. countResult, err := b.client.Search().
  385. Index(b.indexerAliasName).
  386. Aggregation("language", aggregation).
  387. Query(query).
  388. Size(0). // We only needs stats information
  389. Do(ctx)
  390. if err != nil {
  391. return 0, nil, nil, b.checkError(err)
  392. }
  393. query = query.Must(langQuery)
  394. searchResult, err := b.client.Search().
  395. Index(b.indexerAliasName).
  396. Query(query).
  397. Highlight(
  398. elastic.NewHighlight().
  399. Field("content").
  400. NumOfFragments(0). // return all highting content on fragments
  401. HighlighterType("fvh"),
  402. ).
  403. Sort("repo_id", true).
  404. From(start).Size(pageSize).
  405. Do(ctx)
  406. if err != nil {
  407. return 0, nil, nil, b.checkError(err)
  408. }
  409. total, hits, _, err := convertResult(searchResult, kw, pageSize)
  410. return total, hits, extractAggs(countResult), err
  411. }
  412. // Close implements indexer
  413. func (b *ElasticSearchIndexer) Close() {
  414. select {
  415. case <-b.stopTimer:
  416. default:
  417. close(b.stopTimer)
  418. }
  419. }
  420. func (b *ElasticSearchIndexer) checkError(err error) error {
  421. var opErr *net.OpError
  422. if !(elastic.IsConnErr(err) || (errors.As(err, &opErr) && (opErr.Op == "dial" || opErr.Op == "read"))) {
  423. return err
  424. }
  425. b.setAvailability(false)
  426. return err
  427. }
  428. func (b *ElasticSearchIndexer) checkAvailability() {
  429. if b.Ping() {
  430. return
  431. }
  432. // Request cluster state to check if elastic is available again
  433. _, err := b.client.ClusterState().Do(graceful.GetManager().ShutdownContext())
  434. if err != nil {
  435. b.setAvailability(false)
  436. return
  437. }
  438. b.setAvailability(true)
  439. }
  440. func (b *ElasticSearchIndexer) setAvailability(available bool) {
  441. b.lock.Lock()
  442. defer b.lock.Unlock()
  443. if b.available == available {
  444. return
  445. }
  446. b.available = available
  447. }