summaryrefslogtreecommitdiffstats
path: root/modules/eventsource
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2020-05-07 22:49:00 +0100
committerGitHub <noreply@github.com>2020-05-07 22:49:00 +0100
commit791353c03ba81d1c67393a04256a77293307ecad (patch)
treeb0771f7e1683db318c5e5606a312319578392dcd /modules/eventsource
parent486e4c8087746ca91c05a693cadd563ac061a913 (diff)
downloadgitea-791353c03ba81d1c67393a04256a77293307ecad.tar.gz
gitea-791353c03ba81d1c67393a04256a77293307ecad.zip
Add EventSource support (#11235)
If the browser supports EventSource switch to use this instead of polling notifications. Signed-off-by: Andrew Thornton art27@cantab.net
Diffstat (limited to 'modules/eventsource')
-rw-r--r--modules/eventsource/event.go119
-rw-r--r--modules/eventsource/event_test.go54
-rw-r--r--modules/eventsource/manager.go84
-rw-r--r--modules/eventsource/manager_run.go50
-rw-r--r--modules/eventsource/messenger.go78
5 files changed, 385 insertions, 0 deletions
diff --git a/modules/eventsource/event.go b/modules/eventsource/event.go
new file mode 100644
index 0000000000..fd418c6f07
--- /dev/null
+++ b/modules/eventsource/event.go
@@ -0,0 +1,119 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package eventsource
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "io"
+ "strings"
+ "time"
+)
+
+func wrapNewlines(w io.Writer, prefix []byte, value []byte) (sum int64, err error) {
+ if len(value) == 0 {
+ return
+ }
+ n := 0
+ last := 0
+ for j := bytes.IndexByte(value, '\n'); j > -1; j = bytes.IndexByte(value[last:], '\n') {
+ n, err = w.Write(prefix)
+ sum += int64(n)
+ if err != nil {
+ return
+ }
+ n, err = w.Write(value[last : last+j+1])
+ sum += int64(n)
+ if err != nil {
+ return
+ }
+ last += j + 1
+ }
+ n, err = w.Write(prefix)
+ sum += int64(n)
+ if err != nil {
+ return
+ }
+ n, err = w.Write(value[last:])
+ sum += int64(n)
+ if err != nil {
+ return
+ }
+ n, err = w.Write([]byte("\n"))
+ sum += int64(n)
+ return
+}
+
+// Event is an eventsource event, not all fields need to be set
+type Event struct {
+ // Name represents the value of the event: tag in the stream
+ Name string
+ // Data is either JSONified []byte or interface{} that can be JSONd
+ Data interface{}
+ // ID represents the ID of an event
+ ID string
+ // Retry tells the receiver only to attempt to reconnect to the source after this time
+ Retry time.Duration
+}
+
+// WriteTo writes data to w until there's no more data to write or when an error occurs.
+// The return value n is the number of bytes written. Any error encountered during the write is also returned.
+func (e *Event) WriteTo(w io.Writer) (int64, error) {
+ sum := int64(0)
+ nint := 0
+ n, err := wrapNewlines(w, []byte("event: "), []byte(e.Name))
+ sum += n
+ if err != nil {
+ return sum, err
+ }
+
+ if e.Data != nil {
+ var data []byte
+ switch v := e.Data.(type) {
+ case []byte:
+ data = v
+ case string:
+ data = []byte(v)
+ default:
+ var err error
+ data, err = json.Marshal(e.Data)
+ if err != nil {
+ return sum, err
+ }
+ }
+ n, err := wrapNewlines(w, []byte("data: "), data)
+ sum += n
+ if err != nil {
+ return sum, err
+ }
+
+ }
+
+ n, err = wrapNewlines(w, []byte("id: "), []byte(e.ID))
+ sum += n
+ if err != nil {
+ return sum, err
+ }
+
+ if e.Retry != 0 {
+ nint, err = fmt.Fprintf(w, "retry: %d\n", int64(e.Retry/time.Millisecond))
+ sum += int64(nint)
+ if err != nil {
+ return sum, err
+ }
+ }
+
+ nint, err = w.Write([]byte("\n"))
+ sum += int64(nint)
+
+ return sum, err
+}
+
+func (e *Event) String() string {
+ buf := new(strings.Builder)
+ _, _ = e.WriteTo(buf)
+ return buf.String()
+}
diff --git a/modules/eventsource/event_test.go b/modules/eventsource/event_test.go
new file mode 100644
index 0000000000..a80e062f0e
--- /dev/null
+++ b/modules/eventsource/event_test.go
@@ -0,0 +1,54 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package eventsource
+
+import (
+ "bytes"
+ "testing"
+)
+
+func Test_wrapNewlines(t *testing.T) {
+ tests := []struct {
+ name string
+ prefix string
+ value string
+ output string
+ }{
+ {
+ "check no new lines",
+ "prefix: ",
+ "value",
+ "prefix: value\n",
+ },
+ {
+ "check simple newline",
+ "prefix: ",
+ "value1\nvalue2",
+ "prefix: value1\nprefix: value2\n",
+ },
+ {
+ "check pathological newlines",
+ "p: ",
+ "\n1\n\n2\n3\n",
+ "p: \np: 1\np: \np: 2\np: 3\np: \n",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ w := &bytes.Buffer{}
+ gotSum, err := wrapNewlines(w, []byte(tt.prefix), []byte(tt.value))
+ if err != nil {
+ t.Errorf("wrapNewlines() error = %v", err)
+ return
+ }
+ if gotSum != int64(len(tt.output)) {
+ t.Errorf("wrapNewlines() = %v, want %v", gotSum, int64(len(tt.output)))
+ }
+ if gotW := w.String(); gotW != tt.output {
+ t.Errorf("wrapNewlines() = %v, want %v", gotW, tt.output)
+ }
+ })
+ }
+}
diff --git a/modules/eventsource/manager.go b/modules/eventsource/manager.go
new file mode 100644
index 0000000000..212fe60569
--- /dev/null
+++ b/modules/eventsource/manager.go
@@ -0,0 +1,84 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package eventsource
+
+import (
+ "sync"
+)
+
+// Manager manages the eventsource Messengers
+type Manager struct {
+ mutex sync.Mutex
+
+ messengers map[int64]*Messenger
+}
+
+var manager *Manager
+
+func init() {
+ manager = &Manager{
+ messengers: make(map[int64]*Messenger),
+ }
+}
+
+// GetManager returns a Manager and initializes one as singleton if there's none yet
+func GetManager() *Manager {
+ return manager
+}
+
+// Register message channel
+func (m *Manager) Register(uid int64) <-chan *Event {
+ m.mutex.Lock()
+ messenger, ok := m.messengers[uid]
+ if !ok {
+ messenger = NewMessenger(uid)
+ m.messengers[uid] = messenger
+ }
+ m.mutex.Unlock()
+ return messenger.Register()
+}
+
+// Unregister message channel
+func (m *Manager) Unregister(uid int64, channel <-chan *Event) {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ messenger, ok := m.messengers[uid]
+ if !ok {
+ return
+ }
+ if messenger.Unregister(channel) {
+ delete(m.messengers, uid)
+ }
+}
+
+// UnregisterAll message channels
+func (m *Manager) UnregisterAll() {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ for _, messenger := range m.messengers {
+ messenger.UnregisterAll()
+ }
+ m.messengers = map[int64]*Messenger{}
+}
+
+// SendMessage sends a message to a particular user
+func (m *Manager) SendMessage(uid int64, message *Event) {
+ m.mutex.Lock()
+ messenger, ok := m.messengers[uid]
+ m.mutex.Unlock()
+ if ok {
+ messenger.SendMessage(message)
+ }
+}
+
+// SendMessageBlocking sends a message to a particular user
+func (m *Manager) SendMessageBlocking(uid int64, message *Event) {
+ m.mutex.Lock()
+ messenger, ok := m.messengers[uid]
+ m.mutex.Unlock()
+ if ok {
+ messenger.SendMessageBlocking(message)
+ }
+}
diff --git a/modules/eventsource/manager_run.go b/modules/eventsource/manager_run.go
new file mode 100644
index 0000000000..75d3ee5b01
--- /dev/null
+++ b/modules/eventsource/manager_run.go
@@ -0,0 +1,50 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package eventsource
+
+import (
+ "context"
+ "time"
+
+ "code.gitea.io/gitea/models"
+ "code.gitea.io/gitea/modules/graceful"
+ "code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/setting"
+ "code.gitea.io/gitea/modules/timeutil"
+)
+
+// Init starts this eventsource
+func (m *Manager) Init() {
+ go graceful.GetManager().RunWithShutdownContext(m.Run)
+}
+
+// Run runs the manager within a provided context
+func (m *Manager) Run(ctx context.Context) {
+ then := timeutil.TimeStampNow().Add(-2)
+ timer := time.NewTicker(setting.UI.Notification.EventSourceUpdateTime)
+loop:
+ for {
+ select {
+ case <-ctx.Done():
+ timer.Stop()
+ break loop
+ case <-timer.C:
+ now := timeutil.TimeStampNow().Add(-2)
+
+ uidCounts, err := models.GetUIDsAndNotificationCounts(then, now)
+ if err != nil {
+ log.Error("Unable to get UIDcounts: %v", err)
+ }
+ for _, uidCount := range uidCounts {
+ m.SendMessage(uidCount.UserID, &Event{
+ Name: "notification-count",
+ Data: uidCount,
+ })
+ }
+ then = now
+ }
+ }
+ m.UnregisterAll()
+}
diff --git a/modules/eventsource/messenger.go b/modules/eventsource/messenger.go
new file mode 100644
index 0000000000..091e1a5c1c
--- /dev/null
+++ b/modules/eventsource/messenger.go
@@ -0,0 +1,78 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package eventsource
+
+import "sync"
+
+// Messenger is a per uid message store
+type Messenger struct {
+ mutex sync.Mutex
+ uid int64
+ channels []chan *Event
+}
+
+// NewMessenger creates a messenger for a particular uid
+func NewMessenger(uid int64) *Messenger {
+ return &Messenger{
+ uid: uid,
+ channels: [](chan *Event){},
+ }
+}
+
+// Register returns a new chan []byte
+func (m *Messenger) Register() <-chan *Event {
+ m.mutex.Lock()
+ // TODO: Limit the number of messengers per uid
+ channel := make(chan *Event, 1)
+ m.channels = append(m.channels, channel)
+ m.mutex.Unlock()
+ return channel
+}
+
+// Unregister removes the provider chan []byte
+func (m *Messenger) Unregister(channel <-chan *Event) bool {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ for i, toRemove := range m.channels {
+ if channel == toRemove {
+ m.channels = append(m.channels[:i], m.channels[i+1:]...)
+ close(toRemove)
+ break
+ }
+ }
+ return len(m.channels) == 0
+}
+
+// UnregisterAll removes all chan []byte
+func (m *Messenger) UnregisterAll() {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ for _, channel := range m.channels {
+ close(channel)
+ }
+ m.channels = nil
+}
+
+// SendMessage sends the message to all registered channels
+func (m *Messenger) SendMessage(message *Event) {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ for i := range m.channels {
+ channel := m.channels[i]
+ select {
+ case channel <- message:
+ default:
+ }
+ }
+}
+
+// SendMessageBlocking sends the message to all registered channels and ensures it gets sent
+func (m *Messenger) SendMessageBlocking(message *Event) {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ for i := range m.channels {
+ m.channels[i] <- message
+ }
+}