diff options
Diffstat (limited to 'src/protocol.c')
-rw-r--r-- | src/protocol.c | 50 |
1 files changed, 26 insertions, 24 deletions
diff --git a/src/protocol.c b/src/protocol.c index 92b331ac2..ab0db5f78 100644 --- a/src/protocol.c +++ b/src/protocol.c @@ -174,14 +174,17 @@ static int parse_header (struct worker_task *task, char *line) { char *headern, *err, *tmp; - + + msg_debug ("parse_header: got line from worker: %s", line); /* Check end of headers */ if (*line == '\0') { + msg_debug ("parse_header: got empty line, assume it as end of headers"); if (task->cmd == CMD_PING || task->cmd == CMD_SKIP) { task->state = WRITE_REPLY; } else { if (task->content_length > 0) { + rspamd_set_dispatcher_policy (task->dispatcher, BUFFER_CHARACTER, task->content_length); task->state = READ_MESSAGE; } else { @@ -209,14 +212,7 @@ parse_header (struct worker_task *task, char *line) if (strncasecmp (headern, CONTENT_LENGTH_HEADER, sizeof (CONTENT_LENGTH_HEADER) - 1) == 0) { if (task->content_length == 0) { task->content_length = strtoul (line, &err, 10); - task->msg = memory_pool_alloc0 (task->task_pool, sizeof (f_str_buf_t)); - task->msg->buf = fstralloc (task->task_pool, task->content_length); - if (task->msg->buf == NULL) { - msg_err ("read_socket: cannot allocate memory for message buffer"); - return -1; - } - task->msg->pos = task->msg->buf->begin; - update_buf_size (task->msg); + msg_debug ("parse_header: read Content-Length header, value: %lu", (unsigned long int)task->content_length); } } else { @@ -229,6 +225,7 @@ parse_header (struct worker_task *task, char *line) /* helo */ if (strncasecmp (headern, HELO_HEADER, sizeof (HELO_HEADER) - 1) == 0) { task->helo = memory_pool_strdup (task->task_pool, line); + msg_debug ("parse_header: read helo header, value: %s", task->helo); } else { msg_info ("parse_header: wrong header: %s", headern); @@ -240,6 +237,7 @@ parse_header (struct worker_task *task, char *line) /* from */ if (strncasecmp (headern, FROM_HEADER, sizeof (FROM_HEADER) - 1) == 0) { task->from = memory_pool_strdup (task->task_pool, line); + msg_debug ("parse_header: read from header, value: %s", task->from); } else { msg_info ("parse_header: wrong header: %s", headern); @@ -252,6 +250,7 @@ parse_header (struct worker_task *task, char *line) if (strncasecmp (headern, RCPT_HEADER, sizeof (RCPT_HEADER) - 1) == 0) { tmp = memory_pool_strdup (task->task_pool, line); task->rcpt = g_list_prepend (task->rcpt, tmp); + msg_debug ("parse_header: read rcpt header, value: %s", tmp); } else { msg_info ("parse_header: wrong header: %s", headern); @@ -263,6 +262,7 @@ parse_header (struct worker_task *task, char *line) /* nrcpt */ if (strncasecmp (headern, NRCPT_HEADER, sizeof (NRCPT_HEADER) - 1) == 0) { task->nrcpt = strtoul (line, &err, 10); + msg_debug ("parse_header: read rcpt header, value: %d", (int)task->nrcpt); } else { msg_info ("parse_header: wrong header: %s", headern); @@ -277,6 +277,7 @@ parse_header (struct worker_task *task, char *line) msg_info ("parse_header: bad ip header: '%s'", line); return -1; } + msg_debug ("parse_header: read IP header, value: %s", line); } else { msg_info ("parse_header: wrong header: %s", headern); @@ -292,14 +293,14 @@ parse_header (struct worker_task *task, char *line) } int -read_rspamd_input_line (struct worker_task *task, char *line) +read_rspamd_input_line (struct worker_task *task, f_str_t *line) { switch (task->state) { case READ_COMMAND: - return parse_command (task, line); + return parse_command (task, fstrcstr (line, task->task_pool)); break; case READ_HEADER: - return parse_header (task, line); + return parse_header (task, fstrcstr (line, task->task_pool)); break; } } @@ -323,7 +324,7 @@ show_url_header (struct worker_task *task) /* Do header folding */ if (host.len + r >= OUTBUFSIZ - 3) { outbuf[r ++] = '\r'; outbuf[r ++] = '\n'; outbuf[r] = ' '; - bufferevent_write (task->bev, outbuf, r); + rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE); r = 0; } /* Write url host to buf */ @@ -340,7 +341,7 @@ show_url_header (struct worker_task *task) *(host.begin + host.len) = c; } } - bufferevent_write (task->bev, outbuf, r); + rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE); } static void @@ -363,7 +364,7 @@ show_metric_result (gpointer metric_name, gpointer metric_value, void *user_data r = snprintf (outbuf, sizeof (outbuf), "%s: %s ; %.2f / %.2f" CRLF, (char *)metric_name, (is_spam) ? "True" : "False", metric_res->score, metric_res->metric->required_score); } - bufferevent_write (task->bev, outbuf, r); + rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE); } static int @@ -374,7 +375,7 @@ write_check_reply (struct worker_task *task) struct metric_result *metric_res; r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, "OK"); - bufferevent_write (task->bev, outbuf, r); + rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE); if (task->proto == SPAMC_PROTO) { /* Ignore metrics, just write report for 'default' metric */ metric_res = g_hash_table_lookup (task->results, "default"); @@ -391,7 +392,7 @@ write_check_reply (struct worker_task *task) /* URL stat */ show_url_header (task); } - bufferevent_write (task->bev, CRLF, sizeof (CRLF) - 1); + rspamd_dispatcher_write (task->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE); return 0; } @@ -423,7 +424,7 @@ show_metric_symbols (gpointer metric_name, gpointer metric_value, void *user_dat g_list_free (symbols); msg_debug ("show_metric_symbols: write symbols line: %s", outbuf); outbuf[r++] = '\r'; outbuf[r++] = '\n'; - bufferevent_write (task->bev, outbuf, r); + rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE); } static int @@ -450,6 +451,7 @@ write_symbols_reply (struct worker_task *task) /* Write result for each metric separately */ g_hash_table_foreach (task->results, show_metric_symbols, task); } + return 0; } @@ -460,9 +462,9 @@ write_process_reply (struct worker_task *task) char outbuf[OUTBUFSIZ]; r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF "Content-Length: %zd" CRLF CRLF, - (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, "OK", task->msg->buf->len); - bufferevent_write (task->bev, outbuf, r); - bufferevent_write (task->bev, task->msg->buf->begin, task->msg->buf->len); + (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, "OK", task->msg->len); + rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE); + rspamd_dispatcher_write (task->dispatcher, task->msg->begin, task->msg->len, FALSE); return 0; } @@ -486,7 +488,7 @@ write_reply (struct worker_task *task) msg_debug ("write_reply: writing error: %s", outbuf); } /* Write to bufferevent error message */ - bufferevent_write (task->bev, outbuf, r); + rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE); } else { switch (task->cmd) { @@ -504,11 +506,11 @@ write_reply (struct worker_task *task) case CMD_SKIP: r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, SPAMD_OK); - bufferevent_write (task->bev, outbuf, r); + rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE); break; case CMD_PING: r = snprintf (outbuf, sizeof (outbuf), "%s 0 PONG" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER); - bufferevent_write (task->bev, outbuf, r); + rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE); break; } } |