Browse Source

* Check return value of each rspamd_dispatcher_write as in case of write errors sessions can be destroyed early

tags/0.3.1
Vsevolod Stakhov 14 years ago
parent
commit
9727678e70
12 changed files with 428 additions and 160 deletions
  1. 2
    2
      src/buffer.h
  2. 105
    46
      src/controller.c
  3. 4
    5
      src/lmtp.c
  4. 55
    22
      src/lmtp_proto.c
  5. 3
    1
      src/plugins/emails.c
  6. 27
    9
      src/plugins/fuzzy_check.c
  7. 4
    2
      src/plugins/surbl.c
  8. 95
    29
      src/protocol.c
  9. 1
    1
      src/protocol.h
  10. 34
    12
      src/smtp.c
  11. 84
    27
      src/smtp_proto.c
  12. 14
    4
      src/worker.c

+ 2
- 2
src/buffer.h View File

@@ -93,7 +93,7 @@ void rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d,
*/
gboolean rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
void *data,
size_t len, gboolean delayed, gboolean allocated);
size_t len, gboolean delayed, gboolean allocated) G_GNUC_WARN_UNUSED_RESULT;

/**
* Send specified descriptor to dispatcher
@@ -101,7 +101,7 @@ gboolean rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
* @param fd descriptor of file
* @param len length of data
*/
gboolean rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, int fd, size_t len);
gboolean rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, int fd, size_t len) G_GNUC_WARN_UNUSED_RESULT;

/**
* Pause IO events on dispatcher

+ 105
- 46
src/controller.c View File

@@ -161,7 +161,9 @@ check_auth (struct controller_command *cmd, struct controller_session *session)

if (cmd->privilleged && !session->authorized) {
r = snprintf (out_buf, sizeof (out_buf), "not authorized" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return 0;
}
return 0;
}

@@ -178,7 +180,9 @@ counter_write_callback (gpointer key, gpointer value, void *data)
int r;

r = snprintf (out_buf, sizeof (out_buf), "%s: %llu" CRLF, name, (unsigned long long int)cd->value);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, TRUE, FALSE);
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, TRUE, FALSE)) {
msg_warn ("cannot write to socket");
}
}

static gboolean
@@ -223,7 +227,9 @@ write_whole_statfile (struct controller_session *session, char *symbol, struct c
}

i = rspamd_snprintf (out_buf, sizeof (out_buf), "%uL %uL %uL" CRLF, rev, ti, pos);
rspamd_dispatcher_write (session->dispatcher, out_buf, i, TRUE, FALSE);
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, i, TRUE, FALSE)) {
return FALSE;
}

if (!rspamd_dispatcher_write (session->dispatcher, out, pos, TRUE, TRUE)) {
return FALSE;
@@ -299,7 +305,12 @@ process_sync_command (struct controller_session *session, char **args)
while (binlog_sync (binlog, rev, &time, &data)) {
r = snprintf (out_buf, sizeof (out_buf), "%lu %lu %lu" CRLF, (long unsigned)rev, (long unsigned)time, (long unsigned)data->len);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, TRUE, FALSE);
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, TRUE, FALSE)) {
if (data != NULL) {
g_free (data);
}
return FALSE;
}
if (!rspamd_dispatcher_write (session->dispatcher, data->data, data->len, TRUE, FALSE)) {
if (data != NULL) {
g_free (data);
@@ -388,7 +399,7 @@ process_stat_command (struct controller_session *session)
return rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
}

static void
static gboolean
process_command (struct controller_command *cmd, char **cmd_args, struct controller_session *session)
{
char out_buf[BUFSIZ], *arg, *err_str;
@@ -404,23 +415,31 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
if (!arg || *arg == '\0') {
msg_debug ("empty password passed");
r = snprintf (out_buf, sizeof (out_buf), "password command requires one argument" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
return;
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return FALSE;
}
return TRUE;
}
if (password == NULL) {
r = snprintf (out_buf, sizeof (out_buf), "password command disabled in config, authorized access unallowed" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
return;
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return FALSE;
}
return TRUE;
}
if (strncmp (arg, password, strlen (arg)) == 0) {
session->authorized = 1;
r = snprintf (out_buf, sizeof (out_buf), "password accepted" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return FALSE;
}
}
else {
session->authorized = 0;
r = snprintf (out_buf, sizeof (out_buf), "password NOT accepted" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return FALSE;
}
}
break;
case COMMAND_QUIT:
@@ -429,19 +448,23 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
case COMMAND_RELOAD:
if (check_auth (cmd, session)) {
r = snprintf (out_buf, sizeof (out_buf), "reload request sent" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return FALSE;
}
kill (getppid (), SIGHUP);
}
break;
case COMMAND_STAT:
if (check_auth (cmd, session)) {
(void)process_stat_command (session);
return process_stat_command (session);
}
break;
case COMMAND_SHUTDOWN:
if (check_auth (cmd, session)) {
r = snprintf (out_buf, sizeof (out_buf), "shutdown request sent" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return FALSE;
}
kill (getppid (), SIGTERM);
}
break;
@@ -466,7 +489,9 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
uptime -= hours * 3600 + minutes * 60;
r = snprintf (out_buf, sizeof (out_buf), "%d hour%s %d minute%s %d second%s" CRLF, hours, hours > 1 ? "s" : " ", minutes, minutes > 1 ? "s" : " ", (int)uptime, uptime > 1 ? "s" : " ");
}
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return FALSE;
}
}
break;
case COMMAND_LEARN:
@@ -475,30 +500,38 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
if (!arg || *arg == '\0') {
msg_debug ("no statfile specified in learn command");
r = snprintf (out_buf, sizeof (out_buf), "learn command requires at least two arguments: stat filename and its size" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
return;
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return FALSE;
}
return TRUE;
}
arg = *(cmd_args + 1);
if (arg == NULL || *arg == '\0') {
msg_debug ("no statfile size specified in learn command");
r = snprintf (out_buf, sizeof (out_buf), "learn command requires at least two arguments: stat filename and its size" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
return;
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return FALSE;
}
return TRUE;
}
size = strtoul (arg, &err_str, 10);
if (err_str && *err_str != '\0') {
msg_debug ("message size is invalid: %s", arg);
r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
return;
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return FALSE;
}
return TRUE;
}

session->learn_symbol = memory_pool_strdup (session->session_pool, *cmd_args);
cl = g_hash_table_lookup (session->cfg->classifiers_symbols, *cmd_args);
if (cl == NULL) {
r = snprintf (out_buf, sizeof (out_buf), "statfile %s is not defined" CRLF, *cmd_args);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
return;
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return FALSE;
}
return TRUE;

}
session->learn_classifier = cl;
@@ -515,8 +548,9 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
arg = *(cmd_args + 1);
if (!arg || *arg == '\0') {
r = snprintf (out_buf, sizeof (out_buf), "recipient is not defined" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
return;
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return FALSE;
}
}
session->learn_rcpt = memory_pool_strdup (session->session_pool, arg);
break;
@@ -524,8 +558,9 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
arg = *(cmd_args + 1);
if (!arg || *arg == '\0') {
r = snprintf (out_buf, sizeof (out_buf), "from is not defined" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
return;
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return FALSE;
}
}
session->learn_from = memory_pool_strdup (session->session_pool, arg);
break;
@@ -536,15 +571,18 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
arg = *(cmd_args + 1);
if (!arg || *arg == '\0') {
r = snprintf (out_buf, sizeof (out_buf), "multiplier is not defined" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
return;
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return FALSE;
}
}
session->learn_multiplier = strtod (arg, NULL);
break;
default:
r = snprintf (out_buf, sizeof (out_buf), "tokenizer is not defined" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
return;
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return FALSE;
}
return TRUE;
}
}
}
@@ -558,29 +596,37 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
if (!arg || *arg == '\0') {
msg_debug ("no statfile specified in weights command");
r = snprintf (out_buf, sizeof (out_buf), "weights command requires two arguments: statfile and message size" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
return;
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return FALSE;
}
return TRUE;
}
arg = *(cmd_args + 1);
if (arg == NULL || *arg == '\0') {
msg_debug ("no message size specified in weights command");
r = snprintf (out_buf, sizeof (out_buf), "weights command requires two arguments: statfile and message size" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
return;
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return FALSE;
}
return TRUE;
}
size = strtoul (arg, &err_str, 10);
if (err_str && *err_str != '\0') {
msg_debug ("message size is invalid: %s", arg);
r = snprintf (out_buf, sizeof (out_buf), "message size is invalid" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
return;
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return FALSE;
}
return TRUE;
}

cl = g_hash_table_lookup (session->cfg->classifiers_symbols, *cmd_args);
if (cl == NULL) {
r = snprintf (out_buf, sizeof (out_buf), "statfile %s is not defined" CRLF, *cmd_args);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
return;
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return FALSE;
}
return TRUE;

}
session->learn_classifier = cl;
@@ -591,8 +637,10 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
case COMMAND_SYNC:
if (!process_sync_command (session, cmd_args)) {
r = snprintf (out_buf, sizeof (out_buf), "FAIL" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
return;
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return FALSE;
}
return TRUE;
}
break;
case COMMAND_HELP:
@@ -607,12 +655,15 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
" stat - show different rspamd stat" CRLF
" counters - show rspamd counters" CRLF
" uptime - rspamd uptime" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return FALSE;
}
break;
case COMMAND_COUNTERS:
rspamd_hash_foreach (counters, counter_write_callback, session);
break;
}
return TRUE;
}

static gboolean
@@ -664,7 +715,9 @@ controller_read_socket (f_str_t * in, void *arg)
comp_list = g_completion_complete (comp, cmd, NULL);
switch (g_list_length (comp_list)) {
case 1:
process_command ((struct controller_command *)comp_list->data, &params[1], session);
if (! process_command ((struct controller_command *)comp_list->data, &params[1], session)) {
return FALSE;
}
break;
case 0:
if (!process_custom_command (cmd, &params[1], session)) {
@@ -708,7 +761,9 @@ controller_read_socket (f_str_t * in, void *arg)
free_task (task, FALSE);
session->state = STATE_REPLY;
r = snprintf (out_buf, sizeof (out_buf), "cannot process message" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return FALSE;
}
return FALSE;
}
if ((s = g_hash_table_lookup (session->learn_classifier->opts, "header")) != NULL) {
@@ -805,7 +860,9 @@ controller_read_socket (f_str_t * in, void *arg)
free_task (task, FALSE);
session->state = STATE_REPLY;
r = snprintf (out_buf, sizeof (out_buf), "cannot process message" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return FALSE;
}
return FALSE;
}

@@ -951,7 +1008,9 @@ accept_socket (int fd, short what, void *arg)
new_session->s = new_async_session (new_session->session_pool, free_session, new_session);

new_session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, controller_read_socket, controller_write_socket, controller_err_socket, io_tv, (void *)new_session);
rspamd_dispatcher_write (new_session->dispatcher, greetingbuf, strlen (greetingbuf), FALSE, FALSE);
if (! rspamd_dispatcher_write (new_session->dispatcher, greetingbuf, strlen (greetingbuf), FALSE, FALSE)) {
msg_warn ("cannot write greeting");
}
}

void

+ 4
- 5
src/lmtp.c View File

@@ -147,10 +147,7 @@ lmtp_read_socket (f_str_t * in, void *arg)
r = process_message (lmtp->task);
r = process_filters (lmtp->task);
if (r == -1) {
task->last_error = "Filter processing error";
task->error_code = LMTP_FAILURE;
task->state = WRITE_ERROR;
lmtp_write_socket (lmtp);
return FALSE;
}
else if (r == 0) {
task->state = WAIT_FILTER;
@@ -262,7 +259,9 @@ accept_socket (int fd, short what, void *arg)
/* Set up dispatcher */
new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, lmtp_read_socket, lmtp_write_socket, lmtp_err_socket, &io_tv, (void *)lmtp);
new_task->dispatcher->peer_addr = new_task->client_addr.s_addr;
rspamd_dispatcher_write (lmtp->task->dispatcher, greetingbuf, strlen (greetingbuf), FALSE, FALSE);
if (! rspamd_dispatcher_write (lmtp->task->dispatcher, greetingbuf, strlen (greetingbuf), FALSE, FALSE)) {
msg_warn ("cannot write greeting");
}
}

/*

+ 55
- 22
src/lmtp_proto.c View File

@@ -88,7 +88,7 @@ extract_mail (memory_pool_t * pool, f_str_t * line)
return match;
}

static void
static gboolean
out_lmtp_reply (struct worker_task *task, int code, char *rcode, char *msg)
{
char outbuf[OUTBUFSIZ];
@@ -100,7 +100,10 @@ out_lmtp_reply (struct worker_task *task, int code, char *rcode, char *msg)
else {
r = snprintf (outbuf, OUTBUFSIZ, "%d %s %s\r\n", code, rcode, msg);
}
rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE)) {
return FALSE;
}
return TRUE;
}

int
@@ -115,7 +118,7 @@ read_lmtp_input_line (struct rspamd_lmtp_proto *lmtp, f_str_t * line)
/* Search LHLO line */
if ((i = fstrstri (line, &lhlo_command)) == -1) {
msg_info ("LHLO expected but not found");
out_lmtp_reply (lmtp->task, LMTP_BAD_CMD, "5.0.0", "Need LHLO here");
(void)out_lmtp_reply (lmtp->task, LMTP_BAD_CMD, "5.0.0", "Need LHLO here");
return -1;
}
else {
@@ -130,7 +133,9 @@ read_lmtp_input_line (struct rspamd_lmtp_proto *lmtp, f_str_t * line)
/* Strlcpy makes string null terminated by design */
g_strlcpy (lmtp->task->helo, c, line->len - i + 1);
lmtp->state = LMTP_READ_FROM;
out_lmtp_reply (lmtp->task, LMTP_OK, "", "Ok");
if (! out_lmtp_reply (lmtp->task, LMTP_OK, "", "Ok")) {
return -1;
}
return 0;
}
break;
@@ -138,7 +143,7 @@ read_lmtp_input_line (struct rspamd_lmtp_proto *lmtp, f_str_t * line)
/* Search MAIL FROM: line */
if ((i = fstrstri (line, &mail_command)) == -1) {
msg_info ("MAIL expected but not found");
out_lmtp_reply (lmtp->task, LMTP_BAD_CMD, "5.0.0", "Need MAIL here");
(void)out_lmtp_reply (lmtp->task, LMTP_BAD_CMD, "5.0.0", "Need MAIL here");
return -1;
}
else {
@@ -148,7 +153,9 @@ read_lmtp_input_line (struct rspamd_lmtp_proto *lmtp, f_str_t * line)
fstr.len = line->len - i;
lmtp->task->from = extract_mail (lmtp->task->task_pool, &fstr);
lmtp->state = LMTP_READ_RCPT;
out_lmtp_reply (lmtp->task, LMTP_OK, "2.1.0", "Sender ok");
if (! out_lmtp_reply (lmtp->task, LMTP_OK, "2.1.0", "Sender ok")) {
return -1;
}
return 0;
}
break;
@@ -156,7 +163,7 @@ read_lmtp_input_line (struct rspamd_lmtp_proto *lmtp, f_str_t * line)
/* Search RCPT_TO: line */
if ((i = fstrstri (line, &rcpt_command)) == -1) {
msg_info ("RCPT expected but not found");
out_lmtp_reply (lmtp->task, LMTP_NO_RCPT, "5.5.4", "Need RCPT here");
(void)out_lmtp_reply (lmtp->task, LMTP_NO_RCPT, "5.5.4", "Need RCPT here");
return -1;
}
else {
@@ -168,13 +175,15 @@ read_lmtp_input_line (struct rspamd_lmtp_proto *lmtp, f_str_t * line)
if (*rcpt == '<' && *(rcpt + 1) == '>') {
/* Invalid or empty rcpt not allowed */
msg_info ("bad recipient");
out_lmtp_reply (lmtp->task, LMTP_NO_RCPT, "5.5.4", "Bad recipient");
(void)out_lmtp_reply (lmtp->task, LMTP_NO_RCPT, "5.5.4", "Bad recipient");
return -1;
}
/* Strlcpy makes string null terminated by design */
lmtp->task->rcpt = g_list_prepend (lmtp->task->rcpt, rcpt);
lmtp->state = LMTP_READ_DATA;
out_lmtp_reply (lmtp->task, LMTP_OK, "2.1.0", "Recipient ok");
if (! out_lmtp_reply (lmtp->task, LMTP_OK, "2.1.0", "Recipient ok")) {
return -1;
}
return 0;
}
break;
@@ -182,7 +191,7 @@ read_lmtp_input_line (struct rspamd_lmtp_proto *lmtp, f_str_t * line)
/* Search DATA line */
if ((i = fstrstri (line, &data_command)) == -1) {
msg_info ("DATA expected but not found");
out_lmtp_reply (lmtp->task, LMTP_BAD_CMD, "5.0.0", "Need DATA here");
(void)out_lmtp_reply (lmtp->task, LMTP_BAD_CMD, "5.0.0", "Need DATA here");
return -1;
}
else {
@@ -197,7 +206,9 @@ read_lmtp_input_line (struct rspamd_lmtp_proto *lmtp, f_str_t * line)
g_strlcpy (rcpt, c, line->len - i + 1);
lmtp->task->rcpt = g_list_prepend (lmtp->task->rcpt, rcpt);
lmtp->state = LMTP_READ_MESSAGE;
out_lmtp_reply (lmtp->task, LMTP_DATA, "", "Enter message, ending with \".\" on a line by itself");
if (! out_lmtp_reply (lmtp->task, LMTP_DATA, "", "Enter message, ending with \".\" on a line by itself")) {
return -1;
}
lmtp->task->msg = fstralloc (lmtp->task->task_pool, BUFSIZ);
return 0;
}
@@ -230,7 +241,9 @@ read_lmtp_input_line (struct rspamd_lmtp_proto *lmtp, f_str_t * line)
/* We have some input after reading dot, close connection as we have no currently support of multiply
* messages per session
*/
out_lmtp_reply (lmtp->task, LMTP_QUIT, "", "Bye");
if (! out_lmtp_reply (lmtp->task, LMTP_QUIT, "", "Bye")) {
return -1;
}
return 0;
break;
}
@@ -288,10 +301,14 @@ close_mta_connection (struct mta_callback_data *cd, gboolean is_success)
{
cd->task->state = CLOSING_CONNECTION;
if (is_success) {
out_lmtp_reply (cd->task, LMTP_OK, "", "Delivery completed");
if (! out_lmtp_reply (cd->task, LMTP_OK, "", "Delivery completed")) {
return;
}
}
else {
out_lmtp_reply (cd->task, LMTP_FAILURE, "", "Delivery failure");
if (! out_lmtp_reply (cd->task, LMTP_FAILURE, "", "Delivery failure")) {
return;
}
}
rspamd_remove_dispatcher (cd->dispatcher);
}
@@ -335,7 +352,9 @@ mta_read_socket (f_str_t * in, void *arg)
else {
r = snprintf (outbuf, sizeof (outbuf), "HELO %s" CRLF, hostbuf);
}
rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, FALSE)) {
return FALSE;
}
cd->state = LMTP_WANT_MAIL;
break;
case LMTP_WANT_MAIL:
@@ -345,7 +364,9 @@ mta_read_socket (f_str_t * in, void *arg)
return FALSE;
}
r = snprintf (outbuf, sizeof (outbuf), "MAIL FROM: <%s>" CRLF, cd->task->from);
rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, FALSE)) {
return FALSE;
}
cd->state = LMTP_WANT_RCPT;
break;
case LMTP_WANT_RCPT:
@@ -361,7 +382,9 @@ mta_read_socket (f_str_t * in, void *arg)
cur = g_list_next (cur);
}

rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, FALSE)) {
return FALSE;
}
cd->state = LMTP_WANT_DATA;
break;
case LMTP_WANT_DATA:
@@ -371,7 +394,9 @@ mta_read_socket (f_str_t * in, void *arg)
return FALSE;
}
r = snprintf (outbuf, sizeof (outbuf), "DATA" CRLF);
rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, FALSE)) {
return FALSE;
}
cd->state = LMTP_WANT_DOT;
break;
case LMTP_WANT_DOT:
@@ -382,10 +407,14 @@ mta_read_socket (f_str_t * in, void *arg)
}
c = g_mime_object_to_string ((GMimeObject *) cd->task->message);
r = strlen (c);
rspamd_dispatcher_write (cd->task->dispatcher, c, r, TRUE, TRUE);
if (! rspamd_dispatcher_write (cd->task->dispatcher, c, r, TRUE, TRUE)) {
return FALSE;
}
memory_pool_add_destructor (cd->task->task_pool, (pool_destruct_func) g_free, c);
r = snprintf (outbuf, sizeof (outbuf), CRLF "." CRLF);
rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, FALSE)) {
return FALSE;
}
cd->state = LMTP_WANT_CLOSING;
case LMTP_WANT_CLOSING:
if (!parse_mta_str (in, cd)) {
@@ -641,7 +670,9 @@ write_lmtp_reply (struct rspamd_lmtp_proto *lmtp)

debug_task ("writing reply to client");
if (lmtp->task->error_code != 0) {
out_lmtp_reply (lmtp->task, lmtp->task->error_code, "", lmtp->task->last_error);
if (! out_lmtp_reply (lmtp->task, lmtp->task->error_code, "", lmtp->task->last_error)) {
return -1;
}
}
else {
/* Do delivery */
@@ -650,7 +681,9 @@ write_lmtp_reply (struct rspamd_lmtp_proto *lmtp)
return -1;
}
else if (r == 0) {
out_lmtp_reply (lmtp->task, LMTP_OK, "", "Delivery completed");
if (! out_lmtp_reply (lmtp->task, LMTP_OK, "", "Delivery completed")) {
return -1;
}
}
else {
return 1;

+ 3
- 1
src/plugins/emails.c View File

@@ -192,7 +192,9 @@ emails_command_handler (struct worker_task *task)
outbuf[r++] = '\r';
outbuf[r++] = '\n';

rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE)) {
return -1;
}
msg_info ("msg ok, id: <%s>, %d emails extracted", task->message_id, num);

return 0;

+ 27
- 9
src/plugins/fuzzy_check.c View File

@@ -446,12 +446,16 @@ fuzzy_learn_callback (int fd, short what, void *arg)
}
else if (buf[0] == 'O' && buf[1] == 'K') {
r = snprintf (buf, sizeof (buf), "OK" CRLF);
rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE)) {
return;
}
goto ok;
}
else {
r = snprintf (buf, sizeof (buf), "ERR" CRLF);
rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE)) {
return;
}
goto ok;
}
}
@@ -465,7 +469,9 @@ fuzzy_learn_callback (int fd, short what, void *arg)
err:
msg_err ("got error in IO with server %s:%d, %d, %s", session->server->name, session->server->port, errno, strerror (errno));
r = snprintf (buf, sizeof (buf), "Error" CRLF);
rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE)) {
return;
}
ok:
remove_normal_event (session->session->s, fuzzy_learn_fin, session);
}
@@ -576,7 +582,9 @@ fuzzy_process_handler (struct controller_session *session, f_str_t * in)
free_task (task, FALSE);
session->state = STATE_REPLY;
r = snprintf (out_buf, sizeof (out_buf), "cannot process message" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
msg_warn ("write error");
}
return;
}
else {
@@ -605,7 +613,9 @@ fuzzy_process_handler (struct controller_session *session, f_str_t * in)
msg_warn ("cannot connect to %s, %d, %s", selected->name, errno, strerror (errno));
session->state = STATE_REPLY;
r = snprintf (out_buf, sizeof (out_buf), "no hashes written" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return;
}
free_task (task, FALSE);
return;
}
@@ -634,7 +644,9 @@ fuzzy_process_handler (struct controller_session *session, f_str_t * in)
/* Cannot write hash */
session->state = STATE_REPLY;
r = snprintf (out_buf, sizeof (out_buf), "cannot write fuzzy hash" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return;
}
free_task (task, FALSE);
return;
}
@@ -646,7 +658,9 @@ fuzzy_process_handler (struct controller_session *session, f_str_t * in)
if (*saved == 0) {
session->state = STATE_REPLY;
r = snprintf (out_buf, sizeof (out_buf), "no hashes written" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return;
}
}
}

@@ -662,7 +676,9 @@ fuzzy_controller_handler (char **args, struct controller_session *session, int c
if (!arg || *arg == '\0') {
msg_info ("empty content length");
r = snprintf (out_buf, sizeof (out_buf), "fuzzy command requires length as argument" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return;
}
session->state = STATE_REPLY;
return;
}
@@ -670,7 +686,9 @@ fuzzy_controller_handler (char **args, struct controller_session *session, int c
size = strtoul (arg, &err_str, 10);
if (errno != 0 || (err_str && *err_str != '\0')) {
r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return;
}
session->state = STATE_REPLY;
return;
}

+ 4
- 2
src/plugins/surbl.c View File

@@ -867,9 +867,11 @@ urls_command_handler (struct worker_task *task)
outbuf[r++] = '\r';
outbuf[r++] = '\n';

rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, TRUE);
msg_info ("msg ok, id: <%s>, %d urls extracted", task->message_id, num);
g_tree_destroy (url_tree);
if (! rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, TRUE)) {
return -1;
}
msg_info ("msg ok, id: <%s>, %d urls extracted", task->message_id, num);

return 0;
}

+ 95
- 29
src/protocol.c View File

@@ -414,6 +414,7 @@ struct metric_callback_data {
char *log_buf;
int log_offset;
int log_size;
gboolean alive;
};

static void
@@ -438,7 +439,7 @@ write_hashes_to_log (struct worker_task *task, char *logbuf, int offset, int siz
}
}

static void
static gboolean
show_url_header (struct worker_task *task)
{
int r = 0;
@@ -467,7 +468,9 @@ show_url_header (struct worker_task *task)
outbuf[r++] = '\r';
outbuf[r++] = '\n';
outbuf[r] = ' ';
rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE, FALSE);
if (! rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE, FALSE)) {
return FALSE;
}
r = 0;
}
/* Write url host to buf */
@@ -487,7 +490,7 @@ show_url_header (struct worker_task *task)
}
cur = g_list_next (cur);
}
rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
}

static void
@@ -500,6 +503,10 @@ metric_symbols_callback (gpointer key, gpointer value, void *user_data)
struct symbol *s = (struct symbol *)value;
GList *cur;

if (! cd->alive) {
return;
}

if (s->options) {
r = snprintf (outbuf, OUTBUFSIZ, "Symbol: %s; ", (char *)key);
cur = s->options;
@@ -523,10 +530,12 @@ metric_symbols_callback (gpointer key, gpointer value, void *user_data)
}
cd->log_offset += snprintf (cd->log_buf + cd->log_offset, cd->log_size - cd->log_offset, "%s,", (char *)key);

rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE)) {
cd->alive = FALSE;
}
}

static void
static gboolean
show_metric_symbols (struct metric_result *metric_res, struct metric_callback_data *cd)
{
int r = 0;
@@ -546,7 +555,9 @@ show_metric_symbols (struct metric_result *metric_res, struct metric_callback_da
cur = g_list_next (cur);
}
g_list_free (symbols);
rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, FALSE)) {
return FALSE;
}
}
else {
g_hash_table_foreach (metric_res->symbols, metric_symbols_callback, cd);
@@ -555,6 +566,8 @@ show_metric_symbols (struct metric_result *metric_res, struct metric_callback_da
cd->log_buf[--cd->log_offset] = '\0';
}
}

return TRUE;
}


@@ -570,7 +583,9 @@ show_metric_result (gpointer metric_name, gpointer metric_value, void *user_data
int is_spam = 0;
double ms = 0, rs = 0;


if (! cd->alive) {
return;
}
if (metric_name == NULL || metric_value == NULL) {
m = g_hash_table_lookup (task->cfg->metrics, DEFAULT_METRIC);
default_required_score = m->required_score;
@@ -653,10 +668,14 @@ show_metric_result (gpointer metric_name, gpointer metric_value, void *user_data
#endif
}
else {
rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE)) {
cd->alive = FALSE;
}

if (task->cmd == CMD_SYMBOLS && metric_value != NULL) {
show_metric_symbols (metric_res, cd);
if (! show_metric_symbols (metric_res, cd)) {
cd->alive = FALSE;
}
}
}
#ifdef HAVE_CLOCK_GETTIME
@@ -668,7 +687,7 @@ show_metric_result (gpointer metric_name, gpointer metric_value, void *user_data
#endif
}

static void
static gboolean
show_messages (struct worker_task *task)
{
int r = 0;
@@ -681,10 +700,10 @@ show_messages (struct worker_task *task)
cur = g_list_next (cur);
}

rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
}

static int
static gboolean
write_check_reply (struct worker_task *task)
{
int r;
@@ -694,12 +713,15 @@ write_check_reply (struct worker_task *task)

r = snprintf (outbuf, sizeof (outbuf), "%s/%s 0 %s" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER,
task->proto_ver, "OK");
rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE, FALSE);
if (! rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE, FALSE)) {
return FALSE;
}

cd.task = task;
cd.log_buf = logbuf;
cd.log_offset = snprintf (logbuf, sizeof (logbuf), "msg ok, id: <%s>, ", task->message_id);
cd.log_size = sizeof (logbuf);
cd.alive = TRUE;

if (task->proto == SPAMC_PROTO) {
/* Ignore metrics, just write report for 'default' metric */
@@ -707,9 +729,15 @@ write_check_reply (struct worker_task *task)
if (metric_res == NULL) {
/* Implicit metric result */
show_metric_result (NULL, NULL, (void *)&cd);
if (! cd.alive) {
return FALSE;
}
}
else {
show_metric_result ((gpointer) "default", (gpointer) metric_res, (void *)&cd);
if (! cd.alive) {
return FALSE;
}
}
}
else {
@@ -718,23 +746,38 @@ write_check_reply (struct worker_task *task)
if (metric_res == NULL) {
/* Implicit metric result */
show_metric_result (NULL, NULL, (void *)&cd);
if (! cd.alive) {
return FALSE;
}
}
else {
show_metric_result ((gpointer) "default", (gpointer) metric_res, (void *)&cd);
if (! cd.alive) {
return FALSE;
}
}
g_hash_table_remove (task->results, "default");

/* Write result for each metric separately */
g_hash_table_foreach (task->results, show_metric_result, &cd);
if (!cd.alive) {
return FALSE;
}
/* Messages */
show_messages (task);
if (! show_messages (task)) {
return FALSE;
}
/* URL stat */
show_url_header (task);
if (! show_url_header (task)) {
return FALSE;
}
}
write_hashes_to_log (task, logbuf, cd.log_offset, cd.log_size);
msg_info ("%s", logbuf);
rspamd_dispatcher_write (task->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
if (! rspamd_dispatcher_write (task->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE)) {
return FALSE;
}

task->worker->srv->stat->messages_scanned++;
if (default_score >= default_required_score) {
@@ -744,10 +787,10 @@ write_check_reply (struct worker_task *task)
task->worker->srv->stat->messages_ham ++;
}

return 0;
return TRUE;
}

static int
static gboolean
write_process_reply (struct worker_task *task)
{
int r;
@@ -764,6 +807,7 @@ write_process_reply (struct worker_task *task)
cd.log_buf = logbuf;
cd.log_offset = snprintf (logbuf, sizeof (logbuf), "msg ok, id: <%s>, ", task->message_id);
cd.log_size = sizeof (logbuf);
cd.alive = TRUE;

if (task->proto == SPAMC_PROTO) {
/* Ignore metrics, just write report for 'default' metric */
@@ -771,9 +815,15 @@ write_process_reply (struct worker_task *task)
if (metric_res == NULL) {
/* Implicit metric result */
show_metric_result (NULL, NULL, (void *)&cd);
if (! cd.alive) {
return FALSE;
}
}
else {
show_metric_result ((gpointer) "default", (gpointer) metric_res, (void *)&cd);
if (! cd.alive) {
return FALSE;
}
}
}
else {
@@ -782,24 +832,40 @@ write_process_reply (struct worker_task *task)
if (metric_res == NULL) {
/* Implicit metric result */
show_metric_result (NULL, NULL, (void *)&cd);
if (! cd.alive) {
return FALSE;
}
}
else {
show_metric_result ((gpointer) "default", (gpointer) metric_res, (void *)&cd);
if (! cd.alive) {
return FALSE;
}
}
g_hash_table_remove (task->results, "default");

/* Write result for each metric separately */
g_hash_table_foreach (task->results, show_metric_result, &cd);
if (! cd.alive) {
return FALSE;
}
/* Messages */
show_messages (task);
if (! show_messages (task)) {
return FALSE;
}
}
write_hashes_to_log (task, logbuf, cd.log_offset, cd.log_size);
msg_info ("%s", logbuf);

outmsg = g_mime_object_to_string (GMIME_OBJECT (task->message));
memory_pool_add_destructor (task->task_pool, (pool_destruct_func) g_free, outmsg);

rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE, FALSE);
rspamd_dispatcher_write (task->dispatcher, outmsg, strlen (outmsg), FALSE, TRUE);
if (! rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE, FALSE)) {
return FALSE;
}
if (! rspamd_dispatcher_write (task->dispatcher, outmsg, strlen (outmsg), FALSE, TRUE)) {
return FALSE;
}

task->worker->srv->stat->messages_scanned++;
if (default_score >= default_required_score) {
@@ -809,12 +875,10 @@ write_process_reply (struct worker_task *task)
task->worker->srv->stat->messages_ham ++;
}

memory_pool_add_destructor (task->task_pool, (pool_destruct_func) g_free, outmsg);

return 0;
return TRUE;
}

int
gboolean
write_reply (struct worker_task *task)
{
int r;
@@ -834,7 +898,9 @@ write_reply (struct worker_task *task)
debug_task ("writing error: %s", outbuf);
}
/* Write to bufferevent error message */
rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE)) {
return FALSE;
}
}
else {
switch (task->cmd) {
@@ -850,19 +916,19 @@ write_reply (struct worker_task *task)
case CMD_SKIP:
r = snprintf (outbuf, sizeof (outbuf), "%s/%s 0 %s" CRLF,
(task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, task->proto_ver, SPAMD_OK);
rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
break;
case CMD_PING:
r = snprintf (outbuf, sizeof (outbuf), "%s/%s 0 PONG" CRLF,
(task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, task->proto_ver);
rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE);
break;
case CMD_OTHER:
return task->custom_cmd->func (task);
}
}

return 0;
return FALSE;
}

void

+ 1
- 1
src/protocol.h View File

@@ -64,7 +64,7 @@ int read_rspamd_input_line (struct worker_task *task, f_str_t *line);
* @param task task object
* @return 0 if we wrote reply and -1 if there was some error
*/
int write_reply (struct worker_task *task);
gboolean write_reply (struct worker_task *task) G_GNUC_WARN_UNUSED_RESULT;


/**

+ 34
- 12
src/smtp.c View File

@@ -316,7 +316,9 @@ smtp_send_upstream_message (struct smtp_session *session)
err:
session->error = SMTP_ERROR_FILE;
session->state = SMTP_STATE_CRITICAL_ERROR;
rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
if (! rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE)) {
return FALSE;
}
destroy_session (session->s);
return FALSE;
}
@@ -405,7 +407,9 @@ process_smtp_data (struct smtp_session *session)
err:
session->error = SMTP_ERROR_FILE;
session->state = SMTP_STATE_CRITICAL_ERROR;
rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
if (! rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE)) {
return FALSE;
}
destroy_session (session->s);
return FALSE;
}
@@ -435,7 +439,9 @@ smtp_read_socket (f_str_t * in, void *arg)
if (session->errors > session->ctx->max_errors) {
session->error = SMTP_ERROR_LIMIT;
session->state = SMTP_STATE_CRITICAL_ERROR;
rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
if (! rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE)) {
return FALSE;
}
destroy_session (session->s);
return FALSE;
}
@@ -454,7 +460,9 @@ smtp_read_socket (f_str_t * in, void *arg)
msg_err ("cannot write to temp file: %s", strerror (errno));
session->error = SMTP_ERROR_FILE;
session->state = SMTP_STATE_CRITICAL_ERROR;
rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
if (! rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE)) {
return FALSE;
}
destroy_session (session->s);
return FALSE;
}
@@ -496,7 +504,9 @@ smtp_write_socket (void *arg)

if (session->state == SMTP_STATE_CRITICAL_ERROR) {
if (session->error != NULL) {
rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
if (! rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE)) {
return FALSE;
}
}
destroy_session (session->s);
return FALSE;
@@ -540,8 +550,12 @@ smtp_write_socket (void *arg)
msg_info ("%s", logbuf);

if (is_spam) {
rspamd_dispatcher_write (session->dispatcher, session->ctx->reject_message, 0, FALSE, TRUE);
rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
if (! rspamd_dispatcher_write (session->dispatcher, session->ctx->reject_message, 0, FALSE, TRUE)) {
return FALSE;
}
if (! rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE)) {
return FALSE;
}
destroy_session (session->s);
return FALSE;
}
@@ -550,13 +564,17 @@ smtp_write_socket (void *arg)
}
else {
if (session->error != NULL) {
rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
if (! rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE)) {
return FALSE;
}
}
}
}
else {
if (session->error != NULL) {
rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
if (! rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE)) {
return FALSE;
}
}
}
@@ -579,12 +597,16 @@ smtp_err_socket (GError * err, void *arg)
/*
* Write greeting to client
*/
static void
static gboolean
write_smtp_greeting (struct smtp_session *session)
{
if (session->ctx->smtp_banner) {
rspamd_dispatcher_write (session->dispatcher, session->ctx->smtp_banner, 0, FALSE, TRUE);
if (! rspamd_dispatcher_write (session->dispatcher, session->ctx->smtp_banner, 0, FALSE, TRUE)) {
return FALSE;
}
}

return TRUE;
}

/*
@@ -601,7 +623,7 @@ smtp_delay_handler (int fd, short what, void *arg)
}
else {
session->state = SMTP_STATE_CRITICAL_ERROR;
smtp_write_socket (session);
(void)smtp_write_socket (session);
}
}


+ 84
- 27
src/smtp_proto.c View File

@@ -402,8 +402,12 @@ smtp_upstream_read_socket (f_str_t * in, void *arg)
/* XXX: assume upstream errors as critical errors */
session->state = SMTP_STATE_CRITICAL_ERROR;
rspamd_dispatcher_restore (session->dispatcher);
rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE);
rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
if (! rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE)) {
goto err;
}
if (! rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE)) {
goto err;
}
destroy_session (session->s);
return FALSE;
}
@@ -437,8 +441,12 @@ smtp_upstream_read_socket (f_str_t * in, void *arg)
/* XXX: assume upstream errors as critical errors */
session->state = SMTP_STATE_CRITICAL_ERROR;
rspamd_dispatcher_restore (session->dispatcher);
rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE);
rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
if (! rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE)) {
goto err;
}
if (! rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE)) {
goto err;
}
destroy_session (session->s);
return FALSE;
}
@@ -463,8 +471,12 @@ smtp_upstream_read_socket (f_str_t * in, void *arg)
/* XXX: assume upstream errors as critical errors */
session->state = SMTP_STATE_CRITICAL_ERROR;
rspamd_dispatcher_restore (session->dispatcher);
rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE);
rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
if (! rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE)) {
goto err;
}
if (! rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE)) {
goto err;
}
destroy_session (session->s);
return FALSE;
}
@@ -483,8 +495,12 @@ smtp_upstream_read_socket (f_str_t * in, void *arg)
/* XXX: assume upstream errors as critical errors */
session->state = SMTP_STATE_CRITICAL_ERROR;
rspamd_dispatcher_restore (session->dispatcher);
rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE);
rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
if (! rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE)) {
goto err;
}
if (! rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE)) {
goto err;
}
destroy_session (session->s);
return FALSE;
}
@@ -503,8 +519,12 @@ smtp_upstream_read_socket (f_str_t * in, void *arg)
session->error = memory_pool_alloc (session->pool, in->len + 1);
g_strlcpy (session->error, in->begin, in->len + 1);
rspamd_dispatcher_restore (session->dispatcher);
rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE);
rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
if (! rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE)) {
goto err;
}
if (! rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE)) {
goto err;
}
if (session->cur_rcpt) {
session->rcpt = g_list_delete_link (session->rcpt, session->cur_rcpt);
}
@@ -520,7 +540,9 @@ smtp_upstream_read_socket (f_str_t * in, void *arg)
r = snprintf (outbuf, sizeof (outbuf), "RCPT TO: ");
r += smtp_upstream_write_list (session->cur_rcpt, outbuf + r, sizeof (outbuf) - r);
session->cur_rcpt = g_list_next (session->cur_rcpt);
rspamd_dispatcher_write (session->upstream_dispatcher, outbuf, r, FALSE, FALSE);
if (! rspamd_dispatcher_write (session->upstream_dispatcher, outbuf, r, FALSE, FALSE)) {
goto err;
}
}
else {
session->upstream_state = SMTP_STATE_DATA;
@@ -529,8 +551,12 @@ smtp_upstream_read_socket (f_str_t * in, void *arg)
session->error = memory_pool_alloc (session->pool, in->len + 1);
g_strlcpy (session->error, in->begin, in->len + 1);
/* Write to client */
rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE);
rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
if (! rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE)) {
goto err;
}
if (! rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE)) {
goto err;
}
if (session->state == SMTP_STATE_WAIT_UPSTREAM) {
rspamd_dispatcher_restore (session->dispatcher);
session->state = SMTP_STATE_RCPT;
@@ -545,8 +571,12 @@ smtp_upstream_read_socket (f_str_t * in, void *arg)
/* XXX: assume upstream errors as critical errors */
session->state = SMTP_STATE_CRITICAL_ERROR;
rspamd_dispatcher_restore (session->dispatcher);
rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
if (! rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE)) {
goto err;
}
if (! rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE)) {
goto err;
}
destroy_session (session->s);
return FALSE;
}
@@ -565,14 +595,18 @@ smtp_upstream_read_socket (f_str_t * in, void *arg)
session->error = SMTP_ERROR_FILE;
session->state = SMTP_STATE_CRITICAL_ERROR;
rspamd_dispatcher_restore (session->dispatcher);
rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
if (! rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE)) {
goto err;
}
destroy_session (session->s);
return FALSE;
}
session->state = SMTP_STATE_AFTER_DATA;
session->error = SMTP_ERROR_DATA_OK;
rspamd_dispatcher_restore (session->dispatcher);
rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
if (! rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE)) {
goto err;
}
rspamd_dispatcher_pause (session->upstream_dispatcher);
rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_LINE, 0);
session->dispatcher->strip_eol = FALSE;
@@ -584,9 +618,15 @@ smtp_upstream_read_socket (f_str_t * in, void *arg)
g_strlcpy (session->error, in->begin, in->len + 1);
session->state = SMTP_STATE_DATA;
rspamd_dispatcher_restore (session->dispatcher);
rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
rspamd_dispatcher_write (session->upstream_dispatcher, "QUIT" CRLF, sizeof ("QUIT" CRLF) - 1, FALSE, TRUE);
if (! rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE)) {
goto err;
}
if (! rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE)) {
goto err;
}
if (! rspamd_dispatcher_write (session->upstream_dispatcher, "QUIT" CRLF, sizeof ("QUIT" CRLF) - 1, FALSE, TRUE)) {
goto err;
}
session->upstream_state = SMTP_STATE_END;
return TRUE;
break;
@@ -598,8 +638,12 @@ smtp_upstream_read_socket (f_str_t * in, void *arg)
/* XXX: assume upstream errors as critical errors */
session->state = SMTP_STATE_CRITICAL_ERROR;
rspamd_dispatcher_restore (session->dispatcher);
rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
if (!rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE)) {
goto err;
}
if (! rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE)) {
goto err;
}
destroy_session (session->s);
return FALSE;
}
@@ -612,13 +656,20 @@ smtp_upstream_read_socket (f_str_t * in, void *arg)
msg_err ("got upstream reply at unexpected state: %d, reply: %V", session->upstream_state, in);
session->state = SMTP_STATE_CRITICAL_ERROR;
rspamd_dispatcher_restore (session->dispatcher);
rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
if (! rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE)) {
goto err;
}
if (! rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE)) {
goto err;
}
destroy_session (session->s);
return FALSE;
}

return TRUE;
err:
msg_warn ("write error occured");
return FALSE;
}

void
@@ -631,8 +682,12 @@ smtp_upstream_err_socket (GError *err, void *arg)
session->state = SMTP_STATE_CRITICAL_ERROR;
/* XXX: assume upstream errors as critical errors */
rspamd_dispatcher_restore (session->dispatcher);
rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
if (! rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE)) {
return;
}
if (! rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE)) {
return;
}
upstream_fail (&session->upstream->up, session->session_time);
destroy_session (session->s);
}
@@ -643,7 +698,9 @@ smtp_upstream_finalize_connection (gpointer data)
struct smtp_session *session = data;
if (session->state != SMTP_STATE_CRITICAL_ERROR) {
rspamd_dispatcher_write (session->upstream_dispatcher, "QUIT" CRLF, 0, FALSE, TRUE);
if (! rspamd_dispatcher_write (session->upstream_dispatcher, "QUIT" CRLF, 0, FALSE, TRUE)) {
msg_warn ("cannot send correctly closing message to upstream");
}
}
rspamd_remove_dispatcher (session->upstream_dispatcher);
session->upstream_dispatcher = NULL;

+ 14
- 4
src/worker.c View File

@@ -158,7 +158,10 @@ fin_custom_filters (struct worker_task *task)
if (filt->after_connect) {
filt->after_connect (&output, &log, curd->data);
if (output != NULL) {
rspamd_dispatcher_write (task->dispatcher, output, strlen (output), FALSE, FALSE);
if (! rspamd_dispatcher_write (task->dispatcher, output, strlen (output), FALSE, FALSE)) {
g_free (output);
return;
}
g_free (output);
}
if (log != NULL) {
@@ -190,7 +193,10 @@ parse_line_custom (struct worker_task *task, f_str_t *in)
res = FALSE;
}
if (output != NULL) {
rspamd_dispatcher_write (task->dispatcher, output, strlen (output), FALSE, FALSE);
if (! rspamd_dispatcher_write (task->dispatcher, output, strlen (output), FALSE, FALSE)) {
g_free (output);
return FALSE;
}
g_free (output);
}
if (curd->next) {
@@ -349,7 +355,9 @@ write_socket (void *arg)

switch (task->state) {
case WRITE_REPLY:
write_reply (task);
if (! write_reply (task)) {
return FALSE;
}
if (is_custom) {
fin_custom_filters (task);
}
@@ -357,7 +365,9 @@ write_socket (void *arg)
return FALSE;
break;
case WRITE_ERROR:
write_reply (task);
if (! write_reply (task)) {
return FALSE;
}
if (is_custom) {
fin_custom_filters (task);
}

Loading…
Cancel
Save