summaryrefslogtreecommitdiffstats
path: root/src/protocol.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol.c')
-rw-r--r--src/protocol.c50
1 files changed, 26 insertions, 24 deletions
diff --git a/src/protocol.c b/src/protocol.c
index 92b331ac2..ab0db5f78 100644
--- a/src/protocol.c
+++ b/src/protocol.c
@@ -174,14 +174,17 @@ static int
parse_header (struct worker_task *task, char *line)
{
char *headern, *err, *tmp;
-
+
+ msg_debug ("parse_header: got line from worker: %s", line);
/* Check end of headers */
if (*line == '\0') {
+ msg_debug ("parse_header: got empty line, assume it as end of headers");
if (task->cmd == CMD_PING || task->cmd == CMD_SKIP) {
task->state = WRITE_REPLY;
}
else {
if (task->content_length > 0) {
+ rspamd_set_dispatcher_policy (task->dispatcher, BUFFER_CHARACTER, task->content_length);
task->state = READ_MESSAGE;
}
else {
@@ -209,14 +212,7 @@ parse_header (struct worker_task *task, char *line)
if (strncasecmp (headern, CONTENT_LENGTH_HEADER, sizeof (CONTENT_LENGTH_HEADER) - 1) == 0) {
if (task->content_length == 0) {
task->content_length = strtoul (line, &err, 10);
- task->msg = memory_pool_alloc0 (task->task_pool, sizeof (f_str_buf_t));
- task->msg->buf = fstralloc (task->task_pool, task->content_length);
- if (task->msg->buf == NULL) {
- msg_err ("read_socket: cannot allocate memory for message buffer");
- return -1;
- }
- task->msg->pos = task->msg->buf->begin;
- update_buf_size (task->msg);
+ msg_debug ("parse_header: read Content-Length header, value: %lu", (unsigned long int)task->content_length);
}
}
else {
@@ -229,6 +225,7 @@ parse_header (struct worker_task *task, char *line)
/* helo */
if (strncasecmp (headern, HELO_HEADER, sizeof (HELO_HEADER) - 1) == 0) {
task->helo = memory_pool_strdup (task->task_pool, line);
+ msg_debug ("parse_header: read helo header, value: %s", task->helo);
}
else {
msg_info ("parse_header: wrong header: %s", headern);
@@ -240,6 +237,7 @@ parse_header (struct worker_task *task, char *line)
/* from */
if (strncasecmp (headern, FROM_HEADER, sizeof (FROM_HEADER) - 1) == 0) {
task->from = memory_pool_strdup (task->task_pool, line);
+ msg_debug ("parse_header: read from header, value: %s", task->from);
}
else {
msg_info ("parse_header: wrong header: %s", headern);
@@ -252,6 +250,7 @@ parse_header (struct worker_task *task, char *line)
if (strncasecmp (headern, RCPT_HEADER, sizeof (RCPT_HEADER) - 1) == 0) {
tmp = memory_pool_strdup (task->task_pool, line);
task->rcpt = g_list_prepend (task->rcpt, tmp);
+ msg_debug ("parse_header: read rcpt header, value: %s", tmp);
}
else {
msg_info ("parse_header: wrong header: %s", headern);
@@ -263,6 +262,7 @@ parse_header (struct worker_task *task, char *line)
/* nrcpt */
if (strncasecmp (headern, NRCPT_HEADER, sizeof (NRCPT_HEADER) - 1) == 0) {
task->nrcpt = strtoul (line, &err, 10);
+ msg_debug ("parse_header: read rcpt header, value: %d", (int)task->nrcpt);
}
else {
msg_info ("parse_header: wrong header: %s", headern);
@@ -277,6 +277,7 @@ parse_header (struct worker_task *task, char *line)
msg_info ("parse_header: bad ip header: '%s'", line);
return -1;
}
+ msg_debug ("parse_header: read IP header, value: %s", line);
}
else {
msg_info ("parse_header: wrong header: %s", headern);
@@ -292,14 +293,14 @@ parse_header (struct worker_task *task, char *line)
}
int
-read_rspamd_input_line (struct worker_task *task, char *line)
+read_rspamd_input_line (struct worker_task *task, f_str_t *line)
{
switch (task->state) {
case READ_COMMAND:
- return parse_command (task, line);
+ return parse_command (task, fstrcstr (line, task->task_pool));
break;
case READ_HEADER:
- return parse_header (task, line);
+ return parse_header (task, fstrcstr (line, task->task_pool));
break;
}
}
@@ -323,7 +324,7 @@ show_url_header (struct worker_task *task)
/* Do header folding */
if (host.len + r >= OUTBUFSIZ - 3) {
outbuf[r ++] = '\r'; outbuf[r ++] = '\n'; outbuf[r] = ' ';
- bufferevent_write (task->bev, outbuf, r);
+ rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE);
r = 0;
}
/* Write url host to buf */
@@ -340,7 +341,7 @@ show_url_header (struct worker_task *task)
*(host.begin + host.len) = c;
}
}
- bufferevent_write (task->bev, outbuf, r);
+ rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE);
}
static void
@@ -363,7 +364,7 @@ show_metric_result (gpointer metric_name, gpointer metric_value, void *user_data
r = snprintf (outbuf, sizeof (outbuf), "%s: %s ; %.2f / %.2f" CRLF, (char *)metric_name,
(is_spam) ? "True" : "False", metric_res->score, metric_res->metric->required_score);
}
- bufferevent_write (task->bev, outbuf, r);
+ rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE);
}
static int
@@ -374,7 +375,7 @@ write_check_reply (struct worker_task *task)
struct metric_result *metric_res;
r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, "OK");
- bufferevent_write (task->bev, outbuf, r);
+ rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE);
if (task->proto == SPAMC_PROTO) {
/* Ignore metrics, just write report for 'default' metric */
metric_res = g_hash_table_lookup (task->results, "default");
@@ -391,7 +392,7 @@ write_check_reply (struct worker_task *task)
/* URL stat */
show_url_header (task);
}
- bufferevent_write (task->bev, CRLF, sizeof (CRLF) - 1);
+ rspamd_dispatcher_write (task->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE);
return 0;
}
@@ -423,7 +424,7 @@ show_metric_symbols (gpointer metric_name, gpointer metric_value, void *user_dat
g_list_free (symbols);
msg_debug ("show_metric_symbols: write symbols line: %s", outbuf);
outbuf[r++] = '\r'; outbuf[r++] = '\n';
- bufferevent_write (task->bev, outbuf, r);
+ rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE);
}
static int
@@ -450,6 +451,7 @@ write_symbols_reply (struct worker_task *task)
/* Write result for each metric separately */
g_hash_table_foreach (task->results, show_metric_symbols, task);
}
+
return 0;
}
@@ -460,9 +462,9 @@ write_process_reply (struct worker_task *task)
char outbuf[OUTBUFSIZ];
r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF "Content-Length: %zd" CRLF CRLF,
- (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, "OK", task->msg->buf->len);
- bufferevent_write (task->bev, outbuf, r);
- bufferevent_write (task->bev, task->msg->buf->begin, task->msg->buf->len);
+ (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, "OK", task->msg->len);
+ rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE);
+ rspamd_dispatcher_write (task->dispatcher, task->msg->begin, task->msg->len, FALSE);
return 0;
}
@@ -486,7 +488,7 @@ write_reply (struct worker_task *task)
msg_debug ("write_reply: writing error: %s", outbuf);
}
/* Write to bufferevent error message */
- bufferevent_write (task->bev, outbuf, r);
+ rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE);
}
else {
switch (task->cmd) {
@@ -504,11 +506,11 @@ write_reply (struct worker_task *task)
case CMD_SKIP:
r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER,
SPAMD_OK);
- bufferevent_write (task->bev, outbuf, r);
+ rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE);
break;
case CMD_PING:
r = snprintf (outbuf, sizeof (outbuf), "%s 0 PONG" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER);
- bufferevent_write (task->bev, outbuf, r);
+ rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE);
break;
}
}