diff options
-rw-r--r-- | cfg_file.h | 2 | ||||
-rw-r--r-- | main.h | 5 | ||||
-rw-r--r-- | protocol.c | 218 | ||||
-rw-r--r-- | protocol.h | 5 | ||||
-rw-r--r-- | test/rspamd_test_suite.c | 4 | ||||
-rw-r--r-- | url.h | 2 | ||||
-rw-r--r-- | worker.c | 11 |
7 files changed, 240 insertions, 7 deletions
diff --git a/cfg_file.h b/cfg_file.h index 6eb441c71..5b4e2d84b 100644 --- a/cfg_file.h +++ b/cfg_file.h @@ -8,7 +8,7 @@ #include "config.h" #include <sys/types.h> -#ifndef OWN_QUEUE_H +#ifndef HAVE_OWN_QUEUE_H #include <sys/queue.h> #else #include "queue.h" @@ -128,8 +128,13 @@ struct worker_task { TAILQ_HEAD (uriq, uri) urls; /* Hash of metric result structures */ GHashTable *results; + /* Config file to write to */ struct config_file *cfg; + /* Save point for filters deferred processing */ struct save_point save; + /* Saved error message and code */ + char *last_error; + int error_code; /* Memory pool that is associated with this task */ memory_pool_t *task_pool; }; diff --git a/protocol.c b/protocol.c index 4a5ec5360..832a14281 100644 --- a/protocol.c +++ b/protocol.c @@ -4,6 +4,9 @@ #include <glib.h> #include "main.h" +#define CRLF "\r\n" +/* Max line size as it is defined in rfc2822 */ +#define OUTBUFSIZ 1000 /* * Just check if the passed message is spam or not and reply as * described below @@ -53,6 +56,15 @@ #define IP_ADDR_HEADER "IP" #define NRCPT_HEADER "Recipient-Number" #define RCPT_HEADER "Rcpt" +#define ERROR_HEADER "Error" +/* + * Reply messages + */ +#define RSPAMD_REPLY_BANNER "RSPAMD/1.0" +#define SPAMD_REPLY_BANNER "SPAMD/1.1" +#define SPAMD_OK "EX_OK" +/* XXX: try to convert rspamd errors to spamd errors */ +#define SPAMD_ERROR "EX_ERROR" static int parse_command (struct worker_task *task, char *line) @@ -252,3 +264,209 @@ read_rspamd_input_line (struct worker_task *task, char *line) break; } } + +static void +show_url_header (struct worker_task *task) +{ + int r = 0; + char outbuf[OUTBUFSIZ], c; + struct uri *url; + f_str_t host; + + r = snprintf (outbuf, sizeof (outbuf), "Urls: "); + TAILQ_FOREACH (url, &task->urls, next) { + host.begin = url->host; + host.len = url->hostlen; + /* Skip long hosts to avoid protocol coollisions */ + if (host.len > OUTBUFSIZ) { + continue; + } + /* Do header folding */ + if (host.len + r >= OUTBUFSIZ - 3) { + outbuf[r ++] = '\r'; outbuf[r ++] = '\n'; outbuf[r] = ' '; + bufferevent_write (task->bev, outbuf, r); + r = 0; + } + /* Write url host to buf */ + if (TAILQ_NEXT (url, next) != NULL) { + c = *(host.begin + host.len); + *(host.begin + host.len) = '\0'; + r += snprintf (outbuf, sizeof (outbuf) - r, "%s, ", host.begin); + *(host.begin + host.len) = c; + } + else { + c = *(host.begin + host.len); + *(host.begin + host.len) = '\0'; + r += snprintf (outbuf, sizeof (outbuf) - r, "%s" CRLF, host.begin); + *(host.begin + host.len) = c; + } + } + bufferevent_write (task->bev, outbuf, r); +} + +static void +show_metric_result (gpointer metric_name, gpointer metric_value, void *user_data) +{ + struct worker_task *task = (struct worker_task *)user_data; + int r; + char outbuf[OUTBUFSIZ]; + struct metric_result *metric_res = (struct metric_result *)metric_value; + int is_spam = 0; + + if (metric_res->score >= metric_res->metric->required_score) { + is_spam = 1; + } + if (task->proto == SPAMC_PROTO) { + r = snprintf (outbuf, sizeof (outbuf), "Spam: %s ; %.2f / %.2f" CRLF, + (is_spam) ? "True" : "False", metric_res->score, metric_res->metric->required_score); + } + else { + r = snprintf (outbuf, sizeof (outbuf), "%s: %s ; %.2f / %.2f" CRLF, (char *)metric_name, + (is_spam) ? "True" : "False", metric_res->score, metric_res->metric->required_score); + } + bufferevent_write (task->bev, outbuf, r); +} + +static int +write_check_reply (struct worker_task *task) +{ + int r; + char outbuf[OUTBUFSIZ]; + struct metric_result *metric_res; + + r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER); + bufferevent_write (task->bev, outbuf, r); + if (task->proto == SPAMC_PROTO) { + /* Ignore metrics, just write report for 'default' metric */ + metric_res = g_hash_table_lookup (task->results, "default"); + if (metric_res == NULL) { + return -1; + } + else { + show_metric_result ((gpointer)"default", (gpointer)metric_res, (void *)task); + } + } + else { + /* Write result for each metric separately */ + g_hash_table_foreach (task->results, show_metric_result, task); + /* URL stat */ + show_url_header (task); + } + bufferevent_write (task->bev, CRLF, sizeof (CRLF) - 1); + + return 0; +} + +static void +show_metric_symbols (gpointer metric_name, gpointer metric_value, void *user_data) +{ + struct worker_task *task = (struct worker_task *)user_data; + int r = 0; + char outbuf[OUTBUFSIZ]; + struct filter_result *result; + struct metric_result *metric_res = (struct metric_result *)metric_value; + + if (task->proto == RSPAMC_PROTO) { + r = snprintf (outbuf, sizeof (outbuf), "%s: ", (char *)metric_name); + } + + LIST_FOREACH (result, &metric_res->results, next) { + if (result->flag) { + if (LIST_NEXT (result, next) != NULL) { + r += snprintf (outbuf + r, sizeof (outbuf) - r, "%s,", result->symbol); + } + else { + r += snprintf (outbuf + r, sizeof (outbuf) - r, "%s", result->symbol); + } + } + } + outbuf[r++] = '\r'; outbuf[r] = '\n'; + bufferevent_write (task->bev, outbuf, r); +} + +static int +write_symbols_reply (struct worker_task *task) +{ + struct metric_result *metric_res; + + /* First of all write normal results by calling write_check_reply */ + if (write_check_reply (task) == -1) { + return -1; + } + /* Now write symbols */ + if (task->proto == SPAMC_PROTO) { + /* Ignore metrics, just write report for 'default' metric */ + metric_res = g_hash_table_lookup (task->results, "default"); + if (metric_res == NULL) { + return -1; + } + else { + show_metric_symbols ((gpointer)"default", (gpointer)metric_res, (void *)task); + } + } + else { + /* Write result for each metric separately */ + g_hash_table_foreach (task->results, show_metric_symbols, task); + } + return 0; +} + +static int +write_process_reply (struct worker_task *task) +{ + int r; + char outbuf[OUTBUFSIZ]; + + r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF "Content-Length: %zd" CRLF CRLF, + (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, task->msg->buf->len); + bufferevent_write (task->bev, outbuf, r); + bufferevent_write (task->bev, task->msg->buf->begin, task->msg->buf->len); + + return 0; +} + +int +write_reply (struct worker_task *task) +{ + int r; + char outbuf[OUTBUFSIZ]; + + if (task->error_code != 0) { + /* Write error message and error code to reply */ + if (task->proto == SPAMC_PROTO) { + r = snprintf (outbuf, sizeof (outbuf), "%s %d %s" CRLF CRLF, SPAMD_REPLY_BANNER, task->error_code, SPAMD_ERROR); + } + else { + r = snprintf (outbuf, sizeof (outbuf), "%s %d %s" CRLF "%s: %s" CRLF CRLF, RSPAMD_REPLY_BANNER, task->error_code, + SPAMD_ERROR, ERROR_HEADER, task->last_error); + } + /* Write to bufferevent error message */ + bufferevent_write (task->bev, outbuf, r); + } + else { + switch (task->cmd) { + case CMD_REPORT_IFSPAM: + case CMD_REPORT: + case CMD_CHECK: + return write_check_reply (task); + break; + case CMD_SYMBOLS: + return write_symbols_reply (task); + break; + case CMD_PROCESS: + return write_process_reply (task); + break; + case CMD_SKIP: + r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, + SPAMD_OK); + bufferevent_write (task->bev, outbuf, r); + break; + case CMD_PING: + r = snprintf (outbuf, sizeof (outbuf), "%s 0 PONG" CRLF CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER); + bufferevent_write (task->bev, outbuf, r); + break; + } + } + + return 0; +} diff --git a/protocol.h b/protocol.h index 17a7a9315..50b458de0 100644 --- a/protocol.h +++ b/protocol.h @@ -3,6 +3,10 @@ #include "config.h" +#define RSPAMD_FILTER_ERROR 1 +#define RSPAMD_NETWORK_ERROR 2 +#define RSPAMD_PROTOCOL_ERROR 3 + struct worker_task; enum rspamd_protocol { @@ -21,5 +25,6 @@ enum rspamd_command { }; int read_rspamd_input_line (struct worker_task *task, char *line); +int write_reply (struct worker_task *task); #endif diff --git a/test/rspamd_test_suite.c b/test/rspamd_test_suite.c index c7c239f87..90e64b112 100644 --- a/test/rspamd_test_suite.c +++ b/test/rspamd_test_suite.c @@ -13,6 +13,10 @@ #include "../cfg_file.h" #include "tests.h" +#ifdef HAVE_STRLCPY_H +#include "../strlcpy.c" +#endif + int main (int argc, char **argv) { @@ -4,7 +4,7 @@ #include <sys/types.h> #include <sys/socket.h> -#ifndef OWN_QUEUE_H +#ifndef HAVE_OWN_QUEUE_H #include <sys/queue.h> #else #include "queue.h" @@ -201,6 +201,8 @@ read_socket (struct bufferevent *bev, void *arg) case READ_HEADER: s = evbuffer_readline (EVBUFFER_INPUT (bev)); if (read_rspamd_input_line (task, s) != 0) { + task->last_error = "Read error"; + task->error_code = RSPAMD_NETWORK_ERROR; task->state = WRITE_ERROR; } free (s); @@ -213,6 +215,8 @@ read_socket (struct bufferevent *bev, void *arg) if (task->msg->free == 0) { r = process_message (task); if (r == -1) { + task->last_error = "Filter processing error"; + task->error_code = RSPAMD_FILTER_ERROR; task->state = WRITE_ERROR; } else if (r == 1) { @@ -229,15 +233,14 @@ read_socket (struct bufferevent *bev, void *arg) break; case WAIT_FILTER: bufferevent_disable (bev, EV_READ); - bufferevent_disable (bev, EV_READ); break; case WRITE_REPLY: - r = bufferevent_write (bev, "Ok\r\n", sizeof ("Ok\r\n") - 1); + write_reply (task); bufferevent_disable (bev, EV_READ); bufferevent_enable (bev, EV_WRITE); break; case WRITE_ERROR: - r = bufferevent_write (bev, "Error\r\n", sizeof ("Error\r\n") - 1); + write_reply (task); bufferevent_disable (bev, EV_READ); bufferevent_enable (bev, EV_WRITE); break; @@ -293,8 +296,6 @@ accept_socket (int fd, short what, void *arg) bzero (new_task, sizeof (struct worker_task)); new_task->worker = worker; new_task->state = READ_COMMAND; - new_task->content_length = 0; - new_task->parts_count = 0; new_task->cfg = worker->srv->cfg; TAILQ_INIT (&new_task->urls); TAILQ_INIT (&new_task->parts); |