diff options
author | zeripath <art27@cantab.net> | 2020-05-07 22:49:00 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-07 22:49:00 +0100 |
commit | 791353c03ba81d1c67393a04256a77293307ecad (patch) | |
tree | b0771f7e1683db318c5e5606a312319578392dcd /modules/eventsource | |
parent | 486e4c8087746ca91c05a693cadd563ac061a913 (diff) | |
download | gitea-791353c03ba81d1c67393a04256a77293307ecad.tar.gz gitea-791353c03ba81d1c67393a04256a77293307ecad.zip |
Add EventSource support (#11235)
If the browser supports EventSource switch to use this instead of
polling notifications.
Signed-off-by: Andrew Thornton art27@cantab.net
Diffstat (limited to 'modules/eventsource')
-rw-r--r-- | modules/eventsource/event.go | 119 | ||||
-rw-r--r-- | modules/eventsource/event_test.go | 54 | ||||
-rw-r--r-- | modules/eventsource/manager.go | 84 | ||||
-rw-r--r-- | modules/eventsource/manager_run.go | 50 | ||||
-rw-r--r-- | modules/eventsource/messenger.go | 78 |
5 files changed, 385 insertions, 0 deletions
diff --git a/modules/eventsource/event.go b/modules/eventsource/event.go new file mode 100644 index 0000000000..fd418c6f07 --- /dev/null +++ b/modules/eventsource/event.go @@ -0,0 +1,119 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package eventsource + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "strings" + "time" +) + +func wrapNewlines(w io.Writer, prefix []byte, value []byte) (sum int64, err error) { + if len(value) == 0 { + return + } + n := 0 + last := 0 + for j := bytes.IndexByte(value, '\n'); j > -1; j = bytes.IndexByte(value[last:], '\n') { + n, err = w.Write(prefix) + sum += int64(n) + if err != nil { + return + } + n, err = w.Write(value[last : last+j+1]) + sum += int64(n) + if err != nil { + return + } + last += j + 1 + } + n, err = w.Write(prefix) + sum += int64(n) + if err != nil { + return + } + n, err = w.Write(value[last:]) + sum += int64(n) + if err != nil { + return + } + n, err = w.Write([]byte("\n")) + sum += int64(n) + return +} + +// Event is an eventsource event, not all fields need to be set +type Event struct { + // Name represents the value of the event: tag in the stream + Name string + // Data is either JSONified []byte or interface{} that can be JSONd + Data interface{} + // ID represents the ID of an event + ID string + // Retry tells the receiver only to attempt to reconnect to the source after this time + Retry time.Duration +} + +// WriteTo writes data to w until there's no more data to write or when an error occurs. +// The return value n is the number of bytes written. Any error encountered during the write is also returned. +func (e *Event) WriteTo(w io.Writer) (int64, error) { + sum := int64(0) + nint := 0 + n, err := wrapNewlines(w, []byte("event: "), []byte(e.Name)) + sum += n + if err != nil { + return sum, err + } + + if e.Data != nil { + var data []byte + switch v := e.Data.(type) { + case []byte: + data = v + case string: + data = []byte(v) + default: + var err error + data, err = json.Marshal(e.Data) + if err != nil { + return sum, err + } + } + n, err := wrapNewlines(w, []byte("data: "), data) + sum += n + if err != nil { + return sum, err + } + + } + + n, err = wrapNewlines(w, []byte("id: "), []byte(e.ID)) + sum += n + if err != nil { + return sum, err + } + + if e.Retry != 0 { + nint, err = fmt.Fprintf(w, "retry: %d\n", int64(e.Retry/time.Millisecond)) + sum += int64(nint) + if err != nil { + return sum, err + } + } + + nint, err = w.Write([]byte("\n")) + sum += int64(nint) + + return sum, err +} + +func (e *Event) String() string { + buf := new(strings.Builder) + _, _ = e.WriteTo(buf) + return buf.String() +} diff --git a/modules/eventsource/event_test.go b/modules/eventsource/event_test.go new file mode 100644 index 0000000000..a80e062f0e --- /dev/null +++ b/modules/eventsource/event_test.go @@ -0,0 +1,54 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package eventsource + +import ( + "bytes" + "testing" +) + +func Test_wrapNewlines(t *testing.T) { + tests := []struct { + name string + prefix string + value string + output string + }{ + { + "check no new lines", + "prefix: ", + "value", + "prefix: value\n", + }, + { + "check simple newline", + "prefix: ", + "value1\nvalue2", + "prefix: value1\nprefix: value2\n", + }, + { + "check pathological newlines", + "p: ", + "\n1\n\n2\n3\n", + "p: \np: 1\np: \np: 2\np: 3\np: \n", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + w := &bytes.Buffer{} + gotSum, err := wrapNewlines(w, []byte(tt.prefix), []byte(tt.value)) + if err != nil { + t.Errorf("wrapNewlines() error = %v", err) + return + } + if gotSum != int64(len(tt.output)) { + t.Errorf("wrapNewlines() = %v, want %v", gotSum, int64(len(tt.output))) + } + if gotW := w.String(); gotW != tt.output { + t.Errorf("wrapNewlines() = %v, want %v", gotW, tt.output) + } + }) + } +} diff --git a/modules/eventsource/manager.go b/modules/eventsource/manager.go new file mode 100644 index 0000000000..212fe60569 --- /dev/null +++ b/modules/eventsource/manager.go @@ -0,0 +1,84 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package eventsource + +import ( + "sync" +) + +// Manager manages the eventsource Messengers +type Manager struct { + mutex sync.Mutex + + messengers map[int64]*Messenger +} + +var manager *Manager + +func init() { + manager = &Manager{ + messengers: make(map[int64]*Messenger), + } +} + +// GetManager returns a Manager and initializes one as singleton if there's none yet +func GetManager() *Manager { + return manager +} + +// Register message channel +func (m *Manager) Register(uid int64) <-chan *Event { + m.mutex.Lock() + messenger, ok := m.messengers[uid] + if !ok { + messenger = NewMessenger(uid) + m.messengers[uid] = messenger + } + m.mutex.Unlock() + return messenger.Register() +} + +// Unregister message channel +func (m *Manager) Unregister(uid int64, channel <-chan *Event) { + m.mutex.Lock() + defer m.mutex.Unlock() + messenger, ok := m.messengers[uid] + if !ok { + return + } + if messenger.Unregister(channel) { + delete(m.messengers, uid) + } +} + +// UnregisterAll message channels +func (m *Manager) UnregisterAll() { + m.mutex.Lock() + defer m.mutex.Unlock() + for _, messenger := range m.messengers { + messenger.UnregisterAll() + } + m.messengers = map[int64]*Messenger{} +} + +// SendMessage sends a message to a particular user +func (m *Manager) SendMessage(uid int64, message *Event) { + m.mutex.Lock() + messenger, ok := m.messengers[uid] + m.mutex.Unlock() + if ok { + messenger.SendMessage(message) + } +} + +// SendMessageBlocking sends a message to a particular user +func (m *Manager) SendMessageBlocking(uid int64, message *Event) { + m.mutex.Lock() + messenger, ok := m.messengers[uid] + m.mutex.Unlock() + if ok { + messenger.SendMessageBlocking(message) + } +} diff --git a/modules/eventsource/manager_run.go b/modules/eventsource/manager_run.go new file mode 100644 index 0000000000..75d3ee5b01 --- /dev/null +++ b/modules/eventsource/manager_run.go @@ -0,0 +1,50 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package eventsource + +import ( + "context" + "time" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/timeutil" +) + +// Init starts this eventsource +func (m *Manager) Init() { + go graceful.GetManager().RunWithShutdownContext(m.Run) +} + +// Run runs the manager within a provided context +func (m *Manager) Run(ctx context.Context) { + then := timeutil.TimeStampNow().Add(-2) + timer := time.NewTicker(setting.UI.Notification.EventSourceUpdateTime) +loop: + for { + select { + case <-ctx.Done(): + timer.Stop() + break loop + case <-timer.C: + now := timeutil.TimeStampNow().Add(-2) + + uidCounts, err := models.GetUIDsAndNotificationCounts(then, now) + if err != nil { + log.Error("Unable to get UIDcounts: %v", err) + } + for _, uidCount := range uidCounts { + m.SendMessage(uidCount.UserID, &Event{ + Name: "notification-count", + Data: uidCount, + }) + } + then = now + } + } + m.UnregisterAll() +} diff --git a/modules/eventsource/messenger.go b/modules/eventsource/messenger.go new file mode 100644 index 0000000000..091e1a5c1c --- /dev/null +++ b/modules/eventsource/messenger.go @@ -0,0 +1,78 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package eventsource + +import "sync" + +// Messenger is a per uid message store +type Messenger struct { + mutex sync.Mutex + uid int64 + channels []chan *Event +} + +// NewMessenger creates a messenger for a particular uid +func NewMessenger(uid int64) *Messenger { + return &Messenger{ + uid: uid, + channels: [](chan *Event){}, + } +} + +// Register returns a new chan []byte +func (m *Messenger) Register() <-chan *Event { + m.mutex.Lock() + // TODO: Limit the number of messengers per uid + channel := make(chan *Event, 1) + m.channels = append(m.channels, channel) + m.mutex.Unlock() + return channel +} + +// Unregister removes the provider chan []byte +func (m *Messenger) Unregister(channel <-chan *Event) bool { + m.mutex.Lock() + defer m.mutex.Unlock() + for i, toRemove := range m.channels { + if channel == toRemove { + m.channels = append(m.channels[:i], m.channels[i+1:]...) + close(toRemove) + break + } + } + return len(m.channels) == 0 +} + +// UnregisterAll removes all chan []byte +func (m *Messenger) UnregisterAll() { + m.mutex.Lock() + defer m.mutex.Unlock() + for _, channel := range m.channels { + close(channel) + } + m.channels = nil +} + +// SendMessage sends the message to all registered channels +func (m *Messenger) SendMessage(message *Event) { + m.mutex.Lock() + defer m.mutex.Unlock() + for i := range m.channels { + channel := m.channels[i] + select { + case channel <- message: + default: + } + } +} + +// SendMessageBlocking sends the message to all registered channels and ensures it gets sent +func (m *Messenger) SendMessageBlocking(message *Event) { + m.mutex.Lock() + defer m.mutex.Unlock() + for i := range m.channels { + m.channels[i] <- message + } +} |