summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2020-01-16 17:55:36 +0000
committerAntoine GIRARD <sapk@users.noreply.github.com>2020-01-16 18:55:36 +0100
commitc76c70a16ce6c1d472059ea3e03206abb5ed884d (patch)
tree25902cbd062be6fdbefe9417ca33c96acab5809a
parent06cd3e03a24e23d2d838cda6eb1edd5e7474ad42 (diff)
downloadgitea-c76c70a16ce6c1d472059ea3e03206abb5ed884d.tar.gz
gitea-c76c70a16ce6c1d472059ea3e03206abb5ed884d.zip
Move mailer to use a queue (#9789)
* Move mailer to use a queue * Make sectionMap map[string]bool * Ensure that Message is json encodable
-rw-r--r--modules/setting/queue.go24
-rw-r--r--services/mailer/mail.go2
-rw-r--r--services/mailer/mail_test.go29
-rw-r--r--services/mailer/mailer.go90
4 files changed, 92 insertions, 53 deletions
diff --git a/modules/setting/queue.go b/modules/setting/queue.go
index c91ff55acd..8c07685855 100644
--- a/modules/setting/queue.go
+++ b/modules/setting/queue.go
@@ -103,11 +103,11 @@ func NewQueueService() {
// Now handle the old issue_indexer configuration
section := Cfg.Section("queue.issue_indexer")
- issueIndexerSectionMap := map[string]string{}
+ sectionMap := map[string]bool{}
for _, key := range section.Keys() {
- issueIndexerSectionMap[key.Name()] = key.Value()
+ sectionMap[key.Name()] = true
}
- if _, ok := issueIndexerSectionMap["TYPE"]; !ok {
+ if _, ok := sectionMap["TYPE"]; !ok {
switch Indexer.IssueQueueType {
case LevelQueueType:
section.Key("TYPE").SetValue("level")
@@ -120,18 +120,28 @@ func NewQueueService() {
Indexer.IssueQueueType)
}
}
- if _, ok := issueIndexerSectionMap["LENGTH"]; !ok {
+ if _, ok := sectionMap["LENGTH"]; !ok {
section.Key("LENGTH").SetValue(fmt.Sprintf("%d", Indexer.UpdateQueueLength))
}
- if _, ok := issueIndexerSectionMap["BATCH_LENGTH"]; !ok {
+ if _, ok := sectionMap["BATCH_LENGTH"]; !ok {
section.Key("BATCH_LENGTH").SetValue(fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber))
}
- if _, ok := issueIndexerSectionMap["DATADIR"]; !ok {
+ if _, ok := sectionMap["DATADIR"]; !ok {
section.Key("DATADIR").SetValue(Indexer.IssueQueueDir)
}
- if _, ok := issueIndexerSectionMap["CONN_STR"]; !ok {
+ if _, ok := sectionMap["CONN_STR"]; !ok {
section.Key("CONN_STR").SetValue(Indexer.IssueQueueConnStr)
}
+
+ // Handle the old mailer configuration
+ section = Cfg.Section("queue.mailer")
+ sectionMap = map[string]bool{}
+ for _, key := range section.Keys() {
+ sectionMap[key.Name()] = true
+ }
+ if _, ok := sectionMap["LENGTH"]; !ok {
+ section.Key("LENGTH").SetValue(fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100)))
+ }
}
// ParseQueueConnStr parses a queue connection string
diff --git a/services/mailer/mail.go b/services/mailer/mail.go
index 4b8e46715f..3241ae728d 100644
--- a/services/mailer/mail.go
+++ b/services/mailer/mail.go
@@ -51,7 +51,7 @@ func InitMailRender(subjectTpl *texttmpl.Template, bodyTpl *template.Template) {
// SendTestMail sends a test mail
func SendTestMail(email string) error {
- return gomail.Send(Sender, NewMessage([]string{email}, "Gitea Test Email!", "Gitea Test Email!").Message)
+ return gomail.Send(Sender, NewMessage([]string{email}, "Gitea Test Email!", "Gitea Test Email!").ToMessage())
}
// SendUserMail sends a mail to the user
diff --git a/services/mailer/mail_test.go b/services/mailer/mail_test.go
index 43e99c635e..d7d02d9dee 100644
--- a/services/mailer/mail_test.go
+++ b/services/mailer/mail_test.go
@@ -61,11 +61,11 @@ func TestComposeIssueCommentMessage(t *testing.T) {
msgs := composeIssueCommentMessages(&mailCommentContext{Issue: issue, Doer: doer, ActionType: models.ActionCommentIssue,
Content: "test body", Comment: comment}, tos, false, "issue comment")
assert.Len(t, msgs, 2)
-
- mailto := msgs[0].GetHeader("To")
- subject := msgs[0].GetHeader("Subject")
- inreplyTo := msgs[0].GetHeader("In-Reply-To")
- references := msgs[0].GetHeader("References")
+ gomailMsg := msgs[0].ToMessage()
+ mailto := gomailMsg.GetHeader("To")
+ subject := gomailMsg.GetHeader("Subject")
+ inreplyTo := gomailMsg.GetHeader("In-Reply-To")
+ references := gomailMsg.GetHeader("References")
assert.Len(t, mailto, 1, "exactly one recipient is expected in the To field")
assert.Equal(t, "Re: ", subject[0][:4], "Comment reply subject should contain Re:")
@@ -96,14 +96,15 @@ func TestComposeIssueMessage(t *testing.T) {
Content: "test body"}, tos, false, "issue create")
assert.Len(t, msgs, 2)
- mailto := msgs[0].GetHeader("To")
- subject := msgs[0].GetHeader("Subject")
- messageID := msgs[0].GetHeader("Message-ID")
+ gomailMsg := msgs[0].ToMessage()
+ mailto := gomailMsg.GetHeader("To")
+ subject := gomailMsg.GetHeader("Subject")
+ messageID := gomailMsg.GetHeader("Message-ID")
assert.Len(t, mailto, 1, "exactly one recipient is expected in the To field")
assert.Equal(t, "[user2/repo1] @user2 #1 - issue1", subject[0])
- assert.Nil(t, msgs[0].GetHeader("In-Reply-To"))
- assert.Nil(t, msgs[0].GetHeader("References"))
+ assert.Nil(t, gomailMsg.GetHeader("In-Reply-To"))
+ assert.Nil(t, gomailMsg.GetHeader("References"))
assert.Equal(t, messageID[0], "<user2/repo1/issues/1@localhost>", "Message-ID header doesn't match")
}
@@ -134,9 +135,9 @@ func TestTemplateSelection(t *testing.T) {
InitMailRender(stpl, btpl)
expect := func(t *testing.T, msg *Message, expSubject, expBody string) {
- subject := msg.GetHeader("Subject")
+ subject := msg.ToMessage().GetHeader("Subject")
msgbuf := new(bytes.Buffer)
- _, _ = msg.WriteTo(msgbuf)
+ _, _ = msg.ToMessage().WriteTo(msgbuf)
wholemsg := msgbuf.String()
assert.Equal(t, []string{expSubject}, subject)
assert.Contains(t, wholemsg, expBody)
@@ -188,9 +189,9 @@ func TestTemplateServices(t *testing.T) {
msg := testComposeIssueCommentMessage(t, &mailCommentContext{Issue: issue, Doer: doer, ActionType: actionType,
Content: "test body", Comment: comment}, tos, fromMention, "TestTemplateServices")
- subject := msg.GetHeader("Subject")
+ subject := msg.ToMessage().GetHeader("Subject")
msgbuf := new(bytes.Buffer)
- _, _ = msg.WriteTo(msgbuf)
+ _, _ = msg.ToMessage().WriteTo(msgbuf)
wholemsg := msgbuf.String()
assert.Equal(t, []string{expSubject}, subject)
diff --git a/services/mailer/mailer.go b/services/mailer/mailer.go
index 2e4aa8d71b..afcc7a7278 100644
--- a/services/mailer/mailer.go
+++ b/services/mailer/mailer.go
@@ -18,7 +18,9 @@ import (
"time"
"code.gitea.io/gitea/modules/base"
+ "code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
"github.com/jaytaylor/html2text"
@@ -27,38 +29,63 @@ import (
// Message mail body and log info
type Message struct {
- Info string // Message information for log purpose.
- *gomail.Message
+ Info string // Message information for log purpose.
+ FromAddress string
+ FromDisplayName string
+ To []string
+ Subject string
+ Date time.Time
+ Body string
+ Headers map[string][]string
}
-// NewMessageFrom creates new mail message object with custom From header.
-func NewMessageFrom(to []string, fromDisplayName, fromAddress, subject, body string) *Message {
- log.Trace("NewMessageFrom (body):\n%s", body)
-
+// ToMessage converts a Message to gomail.Message
+func (m *Message) ToMessage() *gomail.Message {
msg := gomail.NewMessage()
- msg.SetAddressHeader("From", fromAddress, fromDisplayName)
- msg.SetHeader("To", to...)
+ msg.SetAddressHeader("From", m.FromAddress, m.FromDisplayName)
+ msg.SetHeader("To", m.To...)
+ for header := range m.Headers {
+ msg.SetHeader(header, m.Headers[header]...)
+ }
+
if len(setting.MailService.SubjectPrefix) > 0 {
- msg.SetHeader("Subject", setting.MailService.SubjectPrefix+" "+subject)
+ msg.SetHeader("Subject", setting.MailService.SubjectPrefix+" "+m.Subject)
} else {
- msg.SetHeader("Subject", subject)
+ msg.SetHeader("Subject", m.Subject)
}
- msg.SetDateHeader("Date", time.Now())
+ msg.SetDateHeader("Date", m.Date)
msg.SetHeader("X-Auto-Response-Suppress", "All")
- plainBody, err := html2text.FromString(body)
+ plainBody, err := html2text.FromString(m.Body)
if err != nil || setting.MailService.SendAsPlainText {
- if strings.Contains(base.TruncateString(body, 100), "<html>") {
+ if strings.Contains(base.TruncateString(m.Body, 100), "<html>") {
log.Warn("Mail contains HTML but configured to send as plain text.")
}
msg.SetBody("text/plain", plainBody)
} else {
msg.SetBody("text/plain", plainBody)
- msg.AddAlternative("text/html", body)
+ msg.AddAlternative("text/html", m.Body)
}
+ return msg
+}
+
+// SetHeader adds additional headers to a message
+func (m *Message) SetHeader(field string, value ...string) {
+ m.Headers[field] = value
+}
+
+// NewMessageFrom creates new mail message object with custom From header.
+func NewMessageFrom(to []string, fromDisplayName, fromAddress, subject, body string) *Message {
+ log.Trace("NewMessageFrom (body):\n%s", body)
return &Message{
- Message: msg,
+ FromAddress: fromAddress,
+ FromDisplayName: fromDisplayName,
+ To: to,
+ Subject: subject,
+ Date: time.Now(),
+ Body: body,
+ Headers: map[string][]string{},
}
}
@@ -257,18 +284,7 @@ func (s *dummySender) Send(from string, to []string, msg io.WriterTo) error {
return nil
}
-func processMailQueue() {
- for msg := range mailQueue {
- log.Trace("New e-mail sending request %s: %s", msg.GetHeader("To"), msg.Info)
- if err := gomail.Send(Sender, msg.Message); err != nil {
- log.Error("Failed to send emails %s: %s - %v", msg.GetHeader("To"), msg.Info, err)
- } else {
- log.Trace("E-mails sent %s: %s", msg.GetHeader("To"), msg.Info)
- }
- }
-}
-
-var mailQueue chan *Message
+var mailQueue queue.Queue
// Sender sender for sending mail synchronously
var Sender gomail.Sender
@@ -291,14 +307,26 @@ func NewContext() {
Sender = &dummySender{}
}
- mailQueue = make(chan *Message, setting.MailService.QueueLength)
- go processMailQueue()
+ mailQueue = queue.CreateQueue("mail", func(data ...queue.Data) {
+ for _, datum := range data {
+ msg := datum.(*Message)
+ gomailMsg := msg.ToMessage()
+ log.Trace("New e-mail sending request %s: %s", gomailMsg.GetHeader("To"), msg.Info)
+ if err := gomail.Send(Sender, gomailMsg); err != nil {
+ log.Error("Failed to send emails %s: %s - %v", gomailMsg.GetHeader("To"), msg.Info, err)
+ } else {
+ log.Trace("E-mails sent %s: %s", gomailMsg.GetHeader("To"), msg.Info)
+ }
+ }
+ }, &Message{})
+
+ go graceful.GetManager().RunWithShutdownFns(mailQueue.Run)
}
// SendAsync send mail asynchronously
func SendAsync(msg *Message) {
go func() {
- mailQueue <- msg
+ _ = mailQueue.Push(msg)
}()
}
@@ -306,7 +334,7 @@ func SendAsync(msg *Message) {
func SendAsyncs(msgs []*Message) {
go func() {
for _, msg := range msgs {
- mailQueue <- msg
+ _ = mailQueue.Push(msg)
}
}()
}