1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
package nodb
import (
"fmt"
"sync"
"time"
"gitea.com/lunny/nodb/config"
"gitea.com/lunny/nodb/store"
"gitea.com/lunny/log"
)
type Nodb struct {
cfg *config.Config
ldb *store.DB
dbs [MaxDBNumber]*DB
quit chan struct{}
jobs *sync.WaitGroup
binlog *BinLog
wLock sync.RWMutex //allow one write at same time
commitLock sync.Mutex //allow one write commit at same time
}
func Open(cfg *config.Config) (*Nodb, error) {
if len(cfg.DataDir) == 0 {
cfg.DataDir = config.DefaultDataDir
}
ldb, err := store.Open(cfg)
if err != nil {
return nil, err
}
l := new(Nodb)
l.quit = make(chan struct{})
l.jobs = new(sync.WaitGroup)
l.ldb = ldb
if cfg.BinLog.MaxFileNum > 0 && cfg.BinLog.MaxFileSize > 0 {
l.binlog, err = NewBinLog(cfg)
if err != nil {
return nil, err
}
} else {
l.binlog = nil
}
for i := uint8(0); i < MaxDBNumber; i++ {
l.dbs[i] = l.newDB(i)
}
l.activeExpireCycle()
return l, nil
}
func (l *Nodb) Close() {
close(l.quit)
l.jobs.Wait()
l.ldb.Close()
if l.binlog != nil {
l.binlog.Close()
l.binlog = nil
}
}
func (l *Nodb) Select(index int) (*DB, error) {
if index < 0 || index >= int(MaxDBNumber) {
return nil, fmt.Errorf("invalid db index %d", index)
}
return l.dbs[index], nil
}
func (l *Nodb) FlushAll() error {
for index, db := range l.dbs {
if _, err := db.FlushAll(); err != nil {
log.Errorf("flush db %d error %s", index, err.Error())
}
}
return nil
}
// very dangerous to use
func (l *Nodb) DataDB() *store.DB {
return l.ldb
}
func (l *Nodb) activeExpireCycle() {
var executors []*elimination = make([]*elimination, len(l.dbs))
for i, db := range l.dbs {
executors[i] = db.newEliminator()
}
l.jobs.Add(1)
go func() {
tick := time.NewTicker(1 * time.Second)
end := false
done := make(chan struct{})
for !end {
select {
case <-tick.C:
go func() {
for _, eli := range executors {
eli.active()
}
done <- struct{}{}
}()
<-done
case <-l.quit:
end = true
break
}
}
tick.Stop()
l.jobs.Done()
}()
}
|