You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. // Copyright 2020 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package events
  5. import (
  6. "net/http"
  7. "time"
  8. "code.gitea.io/gitea/modules/context"
  9. "code.gitea.io/gitea/modules/eventsource"
  10. "code.gitea.io/gitea/modules/graceful"
  11. "code.gitea.io/gitea/modules/log"
  12. "code.gitea.io/gitea/routers/user"
  13. )
  14. // Events listens for events
  15. func Events(ctx *context.Context) {
  16. // FIXME: Need to check if resp is actually a http.Flusher! - how though?
  17. // Set the headers related to event streaming.
  18. ctx.Resp.Header().Set("Content-Type", "text/event-stream")
  19. ctx.Resp.Header().Set("Cache-Control", "no-cache")
  20. ctx.Resp.Header().Set("Connection", "keep-alive")
  21. ctx.Resp.Header().Set("X-Accel-Buffering", "no")
  22. ctx.Resp.WriteHeader(http.StatusOK)
  23. // Listen to connection close and un-register messageChan
  24. notify := ctx.Req.Context().Done()
  25. ctx.Resp.Flush()
  26. shutdownCtx := graceful.GetManager().ShutdownContext()
  27. uid := ctx.User.ID
  28. messageChan := eventsource.GetManager().Register(uid)
  29. unregister := func() {
  30. eventsource.GetManager().Unregister(uid, messageChan)
  31. // ensure the messageChan is closed
  32. for {
  33. _, ok := <-messageChan
  34. if !ok {
  35. break
  36. }
  37. }
  38. }
  39. if _, err := ctx.Resp.Write([]byte("\n")); err != nil {
  40. log.Error("Unable to write to EventStream: %v", err)
  41. unregister()
  42. return
  43. }
  44. timer := time.NewTicker(30 * time.Second)
  45. loop:
  46. for {
  47. select {
  48. case <-timer.C:
  49. event := &eventsource.Event{
  50. Name: "ping",
  51. }
  52. _, err := event.WriteTo(ctx.Resp)
  53. if err != nil {
  54. log.Error("Unable to write to EventStream for user %s: %v", ctx.User.Name, err)
  55. go unregister()
  56. break loop
  57. }
  58. ctx.Resp.Flush()
  59. case <-notify:
  60. go unregister()
  61. break loop
  62. case <-shutdownCtx.Done():
  63. go unregister()
  64. break loop
  65. case event, ok := <-messageChan:
  66. if !ok {
  67. break loop
  68. }
  69. // Handle logout
  70. if event.Name == "logout" {
  71. if ctx.Session.ID() == event.Data {
  72. _, _ = (&eventsource.Event{
  73. Name: "logout",
  74. Data: "here",
  75. }).WriteTo(ctx.Resp)
  76. ctx.Resp.Flush()
  77. go unregister()
  78. user.HandleSignOut(ctx)
  79. break loop
  80. }
  81. // Replace the event - we don't want to expose the session ID to the user
  82. event = (&eventsource.Event{
  83. Name: "logout",
  84. Data: "elsewhere",
  85. })
  86. }
  87. _, err := event.WriteTo(ctx.Resp)
  88. if err != nil {
  89. log.Error("Unable to write to EventStream for user %s: %v", ctx.User.Name, err)
  90. go unregister()
  91. break loop
  92. }
  93. ctx.Resp.Flush()
  94. }
  95. }
  96. timer.Stop()
  97. }