You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

deliver.go 9.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package webhook
  4. import (
  5. "context"
  6. "crypto/hmac"
  7. "crypto/sha1"
  8. "crypto/tls"
  9. "encoding/hex"
  10. "fmt"
  11. "io"
  12. "net/http"
  13. "net/url"
  14. "strings"
  15. "sync"
  16. "time"
  17. webhook_model "code.gitea.io/gitea/models/webhook"
  18. "code.gitea.io/gitea/modules/graceful"
  19. "code.gitea.io/gitea/modules/hostmatcher"
  20. "code.gitea.io/gitea/modules/log"
  21. "code.gitea.io/gitea/modules/process"
  22. "code.gitea.io/gitea/modules/proxy"
  23. "code.gitea.io/gitea/modules/queue"
  24. "code.gitea.io/gitea/modules/setting"
  25. "code.gitea.io/gitea/modules/timeutil"
  26. webhook_module "code.gitea.io/gitea/modules/webhook"
  27. "github.com/gobwas/glob"
  28. "github.com/minio/sha256-simd"
  29. )
  30. // Deliver deliver hook task
  31. func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
  32. w, err := webhook_model.GetWebhookByID(t.HookID)
  33. if err != nil {
  34. return err
  35. }
  36. defer func() {
  37. err := recover()
  38. if err == nil {
  39. return
  40. }
  41. // There was a panic whilst delivering a hook...
  42. log.Error("PANIC whilst trying to deliver webhook task[%d] to webhook %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2))
  43. }()
  44. t.IsDelivered = true
  45. var req *http.Request
  46. switch w.HTTPMethod {
  47. case "":
  48. log.Info("HTTP Method for webhook %s empty, setting to POST as default", w.URL)
  49. fallthrough
  50. case http.MethodPost:
  51. switch w.ContentType {
  52. case webhook_model.ContentTypeJSON:
  53. req, err = http.NewRequest("POST", w.URL, strings.NewReader(t.PayloadContent))
  54. if err != nil {
  55. return err
  56. }
  57. req.Header.Set("Content-Type", "application/json")
  58. case webhook_model.ContentTypeForm:
  59. forms := url.Values{
  60. "payload": []string{t.PayloadContent},
  61. }
  62. req, err = http.NewRequest("POST", w.URL, strings.NewReader(forms.Encode()))
  63. if err != nil {
  64. return err
  65. }
  66. req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
  67. }
  68. case http.MethodGet:
  69. u, err := url.Parse(w.URL)
  70. if err != nil {
  71. return fmt.Errorf("unable to deliver webhook task[%d] as cannot parse webhook url %s: %w", t.ID, w.URL, err)
  72. }
  73. vals := u.Query()
  74. vals["payload"] = []string{t.PayloadContent}
  75. u.RawQuery = vals.Encode()
  76. req, err = http.NewRequest("GET", u.String(), nil)
  77. if err != nil {
  78. return fmt.Errorf("unable to deliver webhook task[%d] as unable to create HTTP request for webhook url %s: %w", t.ID, w.URL, err)
  79. }
  80. case http.MethodPut:
  81. switch w.Type {
  82. case webhook_module.MATRIX:
  83. txnID, err := getMatrixTxnID([]byte(t.PayloadContent))
  84. if err != nil {
  85. return err
  86. }
  87. url := fmt.Sprintf("%s/%s", w.URL, url.PathEscape(txnID))
  88. req, err = http.NewRequest("PUT", url, strings.NewReader(t.PayloadContent))
  89. if err != nil {
  90. return fmt.Errorf("unable to deliver webhook task[%d] as cannot create matrix request for webhook url %s: %w", t.ID, w.URL, err)
  91. }
  92. default:
  93. return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod)
  94. }
  95. default:
  96. return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod)
  97. }
  98. var signatureSHA1 string
  99. var signatureSHA256 string
  100. if len(w.Secret) > 0 {
  101. sig1 := hmac.New(sha1.New, []byte(w.Secret))
  102. sig256 := hmac.New(sha256.New, []byte(w.Secret))
  103. _, err = io.MultiWriter(sig1, sig256).Write([]byte(t.PayloadContent))
  104. if err != nil {
  105. log.Error("prepareWebhooks.sigWrite: %v", err)
  106. }
  107. signatureSHA1 = hex.EncodeToString(sig1.Sum(nil))
  108. signatureSHA256 = hex.EncodeToString(sig256.Sum(nil))
  109. }
  110. event := t.EventType.Event()
  111. eventType := string(t.EventType)
  112. req.Header.Add("X-Gitea-Delivery", t.UUID)
  113. req.Header.Add("X-Gitea-Event", event)
  114. req.Header.Add("X-Gitea-Event-Type", eventType)
  115. req.Header.Add("X-Gitea-Signature", signatureSHA256)
  116. req.Header.Add("X-Gogs-Delivery", t.UUID)
  117. req.Header.Add("X-Gogs-Event", event)
  118. req.Header.Add("X-Gogs-Event-Type", eventType)
  119. req.Header.Add("X-Gogs-Signature", signatureSHA256)
  120. req.Header.Add("X-Hub-Signature", "sha1="+signatureSHA1)
  121. req.Header.Add("X-Hub-Signature-256", "sha256="+signatureSHA256)
  122. req.Header["X-GitHub-Delivery"] = []string{t.UUID}
  123. req.Header["X-GitHub-Event"] = []string{event}
  124. req.Header["X-GitHub-Event-Type"] = []string{eventType}
  125. // Add Authorization Header
  126. authorization, err := w.HeaderAuthorization()
  127. if err != nil {
  128. log.Error("Webhook could not get Authorization header [%d]: %v", w.ID, err)
  129. return err
  130. }
  131. if authorization != "" {
  132. req.Header["Authorization"] = []string{authorization}
  133. }
  134. // Record delivery information.
  135. t.RequestInfo = &webhook_model.HookRequest{
  136. URL: req.URL.String(),
  137. HTTPMethod: req.Method,
  138. Headers: map[string]string{},
  139. }
  140. for k, vals := range req.Header {
  141. t.RequestInfo.Headers[k] = strings.Join(vals, ",")
  142. }
  143. t.ResponseInfo = &webhook_model.HookResponse{
  144. Headers: map[string]string{},
  145. }
  146. // OK We're now ready to attempt to deliver the task - we must double check that it
  147. // has not been delivered in the meantime
  148. updated, err := webhook_model.MarkTaskDelivered(ctx, t)
  149. if err != nil {
  150. log.Error("MarkTaskDelivered[%d]: %v", t.ID, err)
  151. return fmt.Errorf("unable to mark task[%d] delivered in the db: %w", t.ID, err)
  152. }
  153. if !updated {
  154. // This webhook task has already been attempted to be delivered or is in the process of being delivered
  155. log.Trace("Webhook Task[%d] already delivered", t.ID)
  156. return nil
  157. }
  158. // All code from this point will update the hook task
  159. defer func() {
  160. t.Delivered = timeutil.TimeStampNanoNow()
  161. if t.IsSucceed {
  162. log.Trace("Hook delivered: %s", t.UUID)
  163. } else if !w.IsActive {
  164. log.Trace("Hook delivery skipped as webhook is inactive: %s", t.UUID)
  165. } else {
  166. log.Trace("Hook delivery failed: %s", t.UUID)
  167. }
  168. if err := webhook_model.UpdateHookTask(t); err != nil {
  169. log.Error("UpdateHookTask [%d]: %v", t.ID, err)
  170. }
  171. // Update webhook last delivery status.
  172. if t.IsSucceed {
  173. w.LastStatus = webhook_module.HookStatusSucceed
  174. } else {
  175. w.LastStatus = webhook_module.HookStatusFail
  176. }
  177. if err = webhook_model.UpdateWebhookLastStatus(w); err != nil {
  178. log.Error("UpdateWebhookLastStatus: %v", err)
  179. return
  180. }
  181. }()
  182. if setting.DisableWebhooks {
  183. return fmt.Errorf("webhook task skipped (webhooks disabled): [%d]", t.ID)
  184. }
  185. if !w.IsActive {
  186. log.Trace("Webhook %s in Webhook Task[%d] is not active", w.URL, t.ID)
  187. return nil
  188. }
  189. resp, err := webhookHTTPClient.Do(req.WithContext(ctx))
  190. if err != nil {
  191. t.ResponseInfo.Body = fmt.Sprintf("Delivery: %v", err)
  192. return fmt.Errorf("unable to deliver webhook task[%d] in %s due to error in http client: %w", t.ID, w.URL, err)
  193. }
  194. defer resp.Body.Close()
  195. // Status code is 20x can be seen as succeed.
  196. t.IsSucceed = resp.StatusCode/100 == 2
  197. t.ResponseInfo.Status = resp.StatusCode
  198. for k, vals := range resp.Header {
  199. t.ResponseInfo.Headers[k] = strings.Join(vals, ",")
  200. }
  201. p, err := io.ReadAll(resp.Body)
  202. if err != nil {
  203. t.ResponseInfo.Body = fmt.Sprintf("read body: %s", err)
  204. return fmt.Errorf("unable to deliver webhook task[%d] in %s as unable to read response body: %w", t.ID, w.URL, err)
  205. }
  206. t.ResponseInfo.Body = string(p)
  207. return nil
  208. }
  209. var (
  210. webhookHTTPClient *http.Client
  211. once sync.Once
  212. hostMatchers []glob.Glob
  213. )
  214. func webhookProxy() func(req *http.Request) (*url.URL, error) {
  215. if setting.Webhook.ProxyURL == "" {
  216. return proxy.Proxy()
  217. }
  218. once.Do(func() {
  219. for _, h := range setting.Webhook.ProxyHosts {
  220. if g, err := glob.Compile(h); err == nil {
  221. hostMatchers = append(hostMatchers, g)
  222. } else {
  223. log.Error("glob.Compile %s failed: %v", h, err)
  224. }
  225. }
  226. })
  227. return func(req *http.Request) (*url.URL, error) {
  228. for _, v := range hostMatchers {
  229. if v.Match(req.URL.Host) {
  230. return http.ProxyURL(setting.Webhook.ProxyURLFixed)(req)
  231. }
  232. }
  233. return http.ProxyFromEnvironment(req)
  234. }
  235. }
  236. // Init starts the hooks delivery thread
  237. func Init() error {
  238. timeout := time.Duration(setting.Webhook.DeliverTimeout) * time.Second
  239. allowedHostListValue := setting.Webhook.AllowedHostList
  240. if allowedHostListValue == "" {
  241. allowedHostListValue = hostmatcher.MatchBuiltinExternal
  242. }
  243. allowedHostMatcher := hostmatcher.ParseHostMatchList("webhook.ALLOWED_HOST_LIST", allowedHostListValue)
  244. webhookHTTPClient = &http.Client{
  245. Timeout: timeout,
  246. Transport: &http.Transport{
  247. TLSClientConfig: &tls.Config{InsecureSkipVerify: setting.Webhook.SkipTLSVerify},
  248. Proxy: webhookProxy(),
  249. DialContext: hostmatcher.NewDialContext("webhook", allowedHostMatcher, nil),
  250. },
  251. }
  252. hookQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "webhook_sender", handler)
  253. if hookQueue == nil {
  254. return fmt.Errorf("unable to create webhook_sender queue")
  255. }
  256. go graceful.GetManager().RunWithCancel(hookQueue)
  257. go graceful.GetManager().RunWithShutdownContext(populateWebhookSendingQueue)
  258. return nil
  259. }
  260. func populateWebhookSendingQueue(ctx context.Context) {
  261. ctx, _, finished := process.GetManager().AddContext(ctx, "Webhook: Populate sending queue")
  262. defer finished()
  263. lowerID := int64(0)
  264. for {
  265. taskIDs, err := webhook_model.FindUndeliveredHookTaskIDs(ctx, lowerID)
  266. if err != nil {
  267. log.Error("Unable to populate webhook queue as FindUndeliveredHookTaskIDs failed: %v", err)
  268. return
  269. }
  270. if len(taskIDs) == 0 {
  271. return
  272. }
  273. lowerID = taskIDs[len(taskIDs)-1]
  274. for _, taskID := range taskIDs {
  275. select {
  276. case <-ctx.Done():
  277. log.Warn("Shutdown before Webhook Sending queue finishing being populated")
  278. return
  279. default:
  280. }
  281. if err := enqueueHookTask(taskID); err != nil {
  282. log.Error("Unable to push HookTask[%d] to the Webhook Sending queue: %v", taskID, err)
  283. }
  284. }
  285. }
  286. }