diff options
Diffstat (limited to 'modules/eventsource/messenger.go')
-rw-r--r-- | modules/eventsource/messenger.go | 78 |
1 files changed, 78 insertions, 0 deletions
diff --git a/modules/eventsource/messenger.go b/modules/eventsource/messenger.go new file mode 100644 index 0000000000..091e1a5c1c --- /dev/null +++ b/modules/eventsource/messenger.go @@ -0,0 +1,78 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package eventsource + +import "sync" + +// Messenger is a per uid message store +type Messenger struct { + mutex sync.Mutex + uid int64 + channels []chan *Event +} + +// NewMessenger creates a messenger for a particular uid +func NewMessenger(uid int64) *Messenger { + return &Messenger{ + uid: uid, + channels: [](chan *Event){}, + } +} + +// Register returns a new chan []byte +func (m *Messenger) Register() <-chan *Event { + m.mutex.Lock() + // TODO: Limit the number of messengers per uid + channel := make(chan *Event, 1) + m.channels = append(m.channels, channel) + m.mutex.Unlock() + return channel +} + +// Unregister removes the provider chan []byte +func (m *Messenger) Unregister(channel <-chan *Event) bool { + m.mutex.Lock() + defer m.mutex.Unlock() + for i, toRemove := range m.channels { + if channel == toRemove { + m.channels = append(m.channels[:i], m.channels[i+1:]...) + close(toRemove) + break + } + } + return len(m.channels) == 0 +} + +// UnregisterAll removes all chan []byte +func (m *Messenger) UnregisterAll() { + m.mutex.Lock() + defer m.mutex.Unlock() + for _, channel := range m.channels { + close(channel) + } + m.channels = nil +} + +// SendMessage sends the message to all registered channels +func (m *Messenger) SendMessage(message *Event) { + m.mutex.Lock() + defer m.mutex.Unlock() + for i := range m.channels { + channel := m.channels[i] + select { + case channel <- message: + default: + } + } +} + +// SendMessageBlocking sends the message to all registered channels and ensures it gets sent +func (m *Messenger) SendMessageBlocking(message *Event) { + m.mutex.Lock() + defer m.mutex.Unlock() + for i := range m.channels { + m.channels[i] <- message + } +} |