You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

events.go 3.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. // Copyright 2020 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package events
  5. import (
  6. "net/http"
  7. "time"
  8. "code.gitea.io/gitea/models"
  9. "code.gitea.io/gitea/models/db"
  10. "code.gitea.io/gitea/modules/context"
  11. "code.gitea.io/gitea/modules/convert"
  12. "code.gitea.io/gitea/modules/eventsource"
  13. "code.gitea.io/gitea/modules/graceful"
  14. "code.gitea.io/gitea/modules/json"
  15. "code.gitea.io/gitea/modules/log"
  16. "code.gitea.io/gitea/modules/setting"
  17. "code.gitea.io/gitea/routers/web/auth"
  18. )
  19. // Events listens for events
  20. func Events(ctx *context.Context) {
  21. // FIXME: Need to check if resp is actually a http.Flusher! - how though?
  22. // Set the headers related to event streaming.
  23. ctx.Resp.Header().Set("Content-Type", "text/event-stream")
  24. ctx.Resp.Header().Set("Cache-Control", "no-cache")
  25. ctx.Resp.Header().Set("Connection", "keep-alive")
  26. ctx.Resp.Header().Set("X-Accel-Buffering", "no")
  27. ctx.Resp.WriteHeader(http.StatusOK)
  28. if !ctx.IsSigned {
  29. // Return unauthorized status event
  30. event := &eventsource.Event{
  31. Name: "close",
  32. Data: "unauthorized",
  33. }
  34. _, _ = event.WriteTo(ctx)
  35. ctx.Resp.Flush()
  36. return
  37. }
  38. // Listen to connection close and un-register messageChan
  39. notify := ctx.Done()
  40. ctx.Resp.Flush()
  41. shutdownCtx := graceful.GetManager().ShutdownContext()
  42. uid := ctx.User.ID
  43. messageChan := eventsource.GetManager().Register(uid)
  44. unregister := func() {
  45. eventsource.GetManager().Unregister(uid, messageChan)
  46. // ensure the messageChan is closed
  47. for {
  48. _, ok := <-messageChan
  49. if !ok {
  50. break
  51. }
  52. }
  53. }
  54. if _, err := ctx.Resp.Write([]byte("\n")); err != nil {
  55. log.Error("Unable to write to EventStream: %v", err)
  56. unregister()
  57. return
  58. }
  59. timer := time.NewTicker(30 * time.Second)
  60. stopwatchTimer := time.NewTicker(setting.UI.Notification.MinTimeout)
  61. loop:
  62. for {
  63. select {
  64. case <-timer.C:
  65. event := &eventsource.Event{
  66. Name: "ping",
  67. }
  68. _, err := event.WriteTo(ctx.Resp)
  69. if err != nil {
  70. log.Error("Unable to write to EventStream for user %s: %v", ctx.User.Name, err)
  71. go unregister()
  72. break loop
  73. }
  74. ctx.Resp.Flush()
  75. case <-notify:
  76. go unregister()
  77. break loop
  78. case <-shutdownCtx.Done():
  79. go unregister()
  80. break loop
  81. case <-stopwatchTimer.C:
  82. sws, err := models.GetUserStopwatches(ctx.User.ID, db.ListOptions{})
  83. if err != nil {
  84. log.Error("Unable to GetUserStopwatches: %v", err)
  85. continue
  86. }
  87. apiSWs, err := convert.ToStopWatches(sws)
  88. if err != nil {
  89. log.Error("Unable to APIFormat stopwatches: %v", err)
  90. continue
  91. }
  92. dataBs, err := json.Marshal(apiSWs)
  93. if err != nil {
  94. log.Error("Unable to marshal stopwatches: %v", err)
  95. continue
  96. }
  97. _, err = (&eventsource.Event{
  98. Name: "stopwatches",
  99. Data: string(dataBs),
  100. }).WriteTo(ctx.Resp)
  101. if err != nil {
  102. log.Error("Unable to write to EventStream for user %s: %v", ctx.User.Name, err)
  103. go unregister()
  104. break loop
  105. }
  106. ctx.Resp.Flush()
  107. case event, ok := <-messageChan:
  108. if !ok {
  109. break loop
  110. }
  111. // Handle logout
  112. if event.Name == "logout" {
  113. if ctx.Session.ID() == event.Data {
  114. _, _ = (&eventsource.Event{
  115. Name: "logout",
  116. Data: "here",
  117. }).WriteTo(ctx.Resp)
  118. ctx.Resp.Flush()
  119. go unregister()
  120. auth.HandleSignOut(ctx)
  121. break loop
  122. }
  123. // Replace the event - we don't want to expose the session ID to the user
  124. event = &eventsource.Event{
  125. Name: "logout",
  126. Data: "elsewhere",
  127. }
  128. }
  129. _, err := event.WriteTo(ctx.Resp)
  130. if err != nil {
  131. log.Error("Unable to write to EventStream for user %s: %v", ctx.User.Name, err)
  132. go unregister()
  133. break loop
  134. }
  135. ctx.Resp.Flush()
  136. }
  137. }
  138. timer.Stop()
  139. }