aboutsummaryrefslogtreecommitdiffstats
path: root/modules/indexer/code/wrapped.go
blob: ba58236fbad13592e24e28a5dcb45a48e7b684a7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package code

import (
	"context"
	"fmt"
	"sync"

	repo_model "code.gitea.io/gitea/models/repo"
	"code.gitea.io/gitea/modules/log"
)

var indexer = newWrappedIndexer()

// ErrWrappedIndexerClosed is the error returned if the indexer was closed before it was ready
var ErrWrappedIndexerClosed = fmt.Errorf("Indexer closed before ready")

type wrappedIndexer struct {
	internal Indexer
	lock     sync.RWMutex
	cond     *sync.Cond
	closed   bool
}

func newWrappedIndexer() *wrappedIndexer {
	w := &wrappedIndexer{}
	w.cond = sync.NewCond(w.lock.RLocker())
	return w
}

func (w *wrappedIndexer) set(indexer Indexer) {
	w.lock.Lock()
	defer w.lock.Unlock()
	if w.closed {
		// Too late!
		indexer.Close()
	}
	w.internal = indexer
	w.cond.Broadcast()
}

func (w *wrappedIndexer) get() (Indexer, error) {
	w.lock.RLock()
	defer w.lock.RUnlock()
	if w.internal == nil {
		if w.closed {
			return nil, ErrWrappedIndexerClosed
		}
		w.cond.Wait()
		if w.closed {
			return nil, ErrWrappedIndexerClosed
		}
	}
	return w.internal, nil
}

// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
func (w *wrappedIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
	indexer, err := w.get()
	if err != nil {
		log.Error("Failed to get indexer: %v", err)
		return
	}
	indexer.SetAvailabilityChangeCallback(callback)
}

// Ping checks if elastic is available
func (w *wrappedIndexer) Ping() bool {
	indexer, err := w.get()
	if err != nil {
		log.Warn("Failed to get indexer: %v", err)
		return false
	}
	return indexer.Ping()
}

func (w *wrappedIndexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error {
	indexer, err := w.get()
	if err != nil {
		return err
	}
	return indexer.Index(ctx, repo, sha, changes)
}

func (w *wrappedIndexer) Delete(repoID int64) error {
	indexer, err := w.get()
	if err != nil {
		return err
	}
	return indexer.Delete(repoID)
}

func (w *wrappedIndexer) Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) {
	indexer, err := w.get()
	if err != nil {
		return 0, nil, nil, err
	}
	return indexer.Search(ctx, repoIDs, language, keyword, page, pageSize, isMatch)
}

func (w *wrappedIndexer) Close() {
	w.lock.Lock()
	defer w.lock.Unlock()
	if w.closed {
		return
	}
	w.closed = true
	w.cond.Broadcast()
	if w.internal != nil {
		w.internal.Close()
	}
}