You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

events.go 2.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. // Copyright 2020 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package events
  5. import (
  6. "net/http"
  7. "time"
  8. "code.gitea.io/gitea/modules/context"
  9. "code.gitea.io/gitea/modules/eventsource"
  10. "code.gitea.io/gitea/modules/graceful"
  11. "code.gitea.io/gitea/modules/log"
  12. "code.gitea.io/gitea/routers/web/auth"
  13. )
  14. // Events listens for events
  15. func Events(ctx *context.Context) {
  16. // FIXME: Need to check if resp is actually a http.Flusher! - how though?
  17. // Set the headers related to event streaming.
  18. ctx.Resp.Header().Set("Content-Type", "text/event-stream")
  19. ctx.Resp.Header().Set("Cache-Control", "no-cache")
  20. ctx.Resp.Header().Set("Connection", "keep-alive")
  21. ctx.Resp.Header().Set("X-Accel-Buffering", "no")
  22. ctx.Resp.WriteHeader(http.StatusOK)
  23. if !ctx.IsSigned {
  24. // Return unauthorized status event
  25. event := &eventsource.Event{
  26. Name: "close",
  27. Data: "unauthorized",
  28. }
  29. _, _ = event.WriteTo(ctx)
  30. ctx.Resp.Flush()
  31. return
  32. }
  33. // Listen to connection close and un-register messageChan
  34. notify := ctx.Done()
  35. ctx.Resp.Flush()
  36. shutdownCtx := graceful.GetManager().ShutdownContext()
  37. uid := ctx.Doer.ID
  38. messageChan := eventsource.GetManager().Register(uid)
  39. unregister := func() {
  40. eventsource.GetManager().Unregister(uid, messageChan)
  41. // ensure the messageChan is closed
  42. for {
  43. _, ok := <-messageChan
  44. if !ok {
  45. break
  46. }
  47. }
  48. }
  49. if _, err := ctx.Resp.Write([]byte("\n")); err != nil {
  50. log.Error("Unable to write to EventStream: %v", err)
  51. unregister()
  52. return
  53. }
  54. timer := time.NewTicker(30 * time.Second)
  55. loop:
  56. for {
  57. select {
  58. case <-timer.C:
  59. event := &eventsource.Event{
  60. Name: "ping",
  61. }
  62. _, err := event.WriteTo(ctx.Resp)
  63. if err != nil {
  64. log.Error("Unable to write to EventStream for user %s: %v", ctx.Doer.Name, err)
  65. go unregister()
  66. break loop
  67. }
  68. ctx.Resp.Flush()
  69. case <-notify:
  70. go unregister()
  71. break loop
  72. case <-shutdownCtx.Done():
  73. go unregister()
  74. break loop
  75. case event, ok := <-messageChan:
  76. if !ok {
  77. break loop
  78. }
  79. // Handle logout
  80. if event.Name == "logout" {
  81. if ctx.Session.ID() == event.Data {
  82. _, _ = (&eventsource.Event{
  83. Name: "logout",
  84. Data: "here",
  85. }).WriteTo(ctx.Resp)
  86. ctx.Resp.Flush()
  87. go unregister()
  88. auth.HandleSignOut(ctx)
  89. break loop
  90. }
  91. // Replace the event - we don't want to expose the session ID to the user
  92. event = &eventsource.Event{
  93. Name: "logout",
  94. Data: "elsewhere",
  95. }
  96. }
  97. _, err := event.WriteTo(ctx.Resp)
  98. if err != nil {
  99. log.Error("Unable to write to EventStream for user %s: %v", ctx.Doer.Name, err)
  100. go unregister()
  101. break loop
  102. }
  103. ctx.Resp.Flush()
  104. }
  105. }
  106. timer.Stop()
  107. }