You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

manager_run.go 3.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. // Copyright 2020 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package eventsource
  4. import (
  5. "context"
  6. "time"
  7. activities_model "code.gitea.io/gitea/models/activities"
  8. issues_model "code.gitea.io/gitea/models/issues"
  9. "code.gitea.io/gitea/modules/graceful"
  10. "code.gitea.io/gitea/modules/json"
  11. "code.gitea.io/gitea/modules/log"
  12. "code.gitea.io/gitea/modules/process"
  13. "code.gitea.io/gitea/modules/setting"
  14. "code.gitea.io/gitea/modules/timeutil"
  15. "code.gitea.io/gitea/services/convert"
  16. )
  17. // Init starts this eventsource
  18. func (m *Manager) Init() {
  19. if setting.UI.Notification.EventSourceUpdateTime <= 0 {
  20. return
  21. }
  22. go graceful.GetManager().RunWithShutdownContext(m.Run)
  23. }
  24. // Run runs the manager within a provided context
  25. func (m *Manager) Run(ctx context.Context) {
  26. ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: EventSource", process.SystemProcessType, true)
  27. defer finished()
  28. then := timeutil.TimeStampNow().Add(-2)
  29. timer := time.NewTicker(setting.UI.Notification.EventSourceUpdateTime)
  30. loop:
  31. for {
  32. select {
  33. case <-ctx.Done():
  34. timer.Stop()
  35. break loop
  36. case <-timer.C:
  37. m.mutex.Lock()
  38. connectionCount := len(m.messengers)
  39. if connectionCount == 0 {
  40. log.Trace("Event source has no listeners")
  41. // empty the connection channel
  42. select {
  43. case <-m.connection:
  44. default:
  45. }
  46. }
  47. m.mutex.Unlock()
  48. if connectionCount == 0 {
  49. // No listeners so the source can be paused
  50. log.Trace("Pausing the eventsource")
  51. select {
  52. case <-ctx.Done():
  53. break loop
  54. case <-m.connection:
  55. log.Trace("Connection detected - restarting the eventsource")
  56. // OK we're back so lets reset the timer and start again
  57. // We won't change the "then" time because there could be concurrency issues
  58. select {
  59. case <-timer.C:
  60. default:
  61. }
  62. continue
  63. }
  64. }
  65. now := timeutil.TimeStampNow().Add(-2)
  66. uidCounts, err := activities_model.GetUIDsAndNotificationCounts(ctx, then, now)
  67. if err != nil {
  68. log.Error("Unable to get UIDcounts: %v", err)
  69. }
  70. for _, uidCount := range uidCounts {
  71. m.SendMessage(uidCount.UserID, &Event{
  72. Name: "notification-count",
  73. Data: uidCount,
  74. })
  75. }
  76. then = now
  77. if setting.Service.EnableTimetracking {
  78. usersStopwatches, err := issues_model.GetUIDsAndStopwatch(ctx)
  79. if err != nil {
  80. log.Error("Unable to get GetUIDsAndStopwatch: %v", err)
  81. return
  82. }
  83. for _, userStopwatches := range usersStopwatches {
  84. apiSWs, err := convert.ToStopWatches(userStopwatches.StopWatches)
  85. if err != nil {
  86. if !issues_model.IsErrIssueNotExist(err) {
  87. log.Error("Unable to APIFormat stopwatches: %v", err)
  88. }
  89. continue
  90. }
  91. dataBs, err := json.Marshal(apiSWs)
  92. if err != nil {
  93. log.Error("Unable to marshal stopwatches: %v", err)
  94. continue
  95. }
  96. m.SendMessage(userStopwatches.UserID, &Event{
  97. Name: "stopwatches",
  98. Data: string(dataBs),
  99. })
  100. }
  101. }
  102. }
  103. }
  104. m.UnregisterAll()
  105. }