From a93dd7892f8ecf0a72a242d2d203575001ef635e Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Thu, 31 Mar 2011 17:35:35 +0400 Subject: [PATCH] * Preload statfiles at the early beginning in the main process * Add JSON output for rspamd worker * Add HTTP protocol support for rspamd worker --- src/main.c | 30 +++ src/main.h | 2 + src/protocol.c | 648 +++++++++++++++++++++++++++++++++++++------------ src/protocol.h | 2 +- src/worker.c | 15 +- 5 files changed, 539 insertions(+), 158 deletions(-) diff --git a/src/main.c b/src/main.c index cef64d8a1..d268bc3cb 100644 --- a/src/main.c +++ b/src/main.c @@ -680,6 +680,33 @@ convert_old_config (struct rspamd_main *rspamd) } #endif +static void +preload_statfiles (struct rspamd_main *rspamd) +{ + struct classifier_config *cf; + struct statfile *st; + stat_file_t *stf; + GList *cur_cl, *cur_st; + + cur_cl = rspamd->cfg->classifiers; + while (cur_cl) { + /* Open all statfiles */ + cf = cur_cl->data; + + cur_st = cf->statfiles; + while (cur_st) { + st = cur_st->data; + stf = statfile_pool_open (rspamd->statfile_pool, st->path, st->size, FALSE); + if (stf == NULL) { + msg_warn ("preload of %s from %s failed", st->symbol, st->path); + } + cur_st = g_list_next (cur_st); + } + + cur_cl = g_list_next (cur_cl); + } +} + static gboolean load_rspamd_config (struct config_file *cfg, gboolean init_modules) { @@ -990,6 +1017,9 @@ main (gint argc, gchar **argv, gchar **env) /* Flush log */ flush_log_buf (); + /* Preload all statfiles */ + preload_statfiles (rspamd); + /* Spawn workers */ rspamd->workers = g_hash_table_new (g_direct_hash, g_direct_equal); spawn_workers (rspamd); diff --git a/src/main.h b/src/main.h index aac10529e..78b3af14b 100644 --- a/src/main.h +++ b/src/main.h @@ -179,6 +179,8 @@ struct worker_task { struct custom_command *custom_cmd; /**< custom command if any */ gint sock; /**< socket descriptor */ gboolean is_mime; /**< if this task is mime task */ + gboolean is_json; /**< output is JSON */ + gboolean is_http; /**< output is HTTP */ gboolean is_skipped; /**< whether message was skipped by configuration */ gchar *helo; /**< helo header value */ gchar *from; /**< from header value */ diff --git a/src/protocol.c b/src/protocol.c index edeec92b9..8ffaddea1 100644 --- a/src/protocol.c +++ b/src/protocol.c @@ -85,6 +85,7 @@ #define ERROR_HEADER "Error" #define USER_HEADER "User" #define PASS_HEADER "Pass" +#define JSON_HEADER "Json" #define DELIVER_TO_HEADER "Deliver-To" static GList *custom_commands = NULL; @@ -137,19 +138,11 @@ separate_command (f_str_t * in, gchar c) return NULL; } -static gint -parse_command (struct worker_task *task, f_str_t * line) +static gboolean +parse_check_command (struct worker_task *task, gchar *token) { - gchar *token; - struct custom_command *cmd; GList *cur; - - task->proto_ver = 11; - token = separate_command (line, ' '); - if (line == NULL || token == NULL) { - debug_task ("bad command"); - return -1; - } + struct custom_command *cmd; switch (token[0]) { case 'c': @@ -160,7 +153,7 @@ parse_command (struct worker_task *task, f_str_t * line) } else { debug_task ("bad command: %s", token); - return -1; + return FALSE; } break; case 's': @@ -174,7 +167,7 @@ parse_command (struct worker_task *task, f_str_t * line) } else { debug_task ("bad command: %s", token); - return -1; + return FALSE; } break; case 'p': @@ -188,7 +181,7 @@ parse_command (struct worker_task *task, f_str_t * line) } else { debug_task ("bad command: %s", token); - return -1; + return FALSE; } break; case 'r': @@ -202,7 +195,7 @@ parse_command (struct worker_task *task, f_str_t * line) } else { debug_task ("bad command: %s", token); - return -1; + return FALSE; } break; default: @@ -219,11 +212,30 @@ parse_command (struct worker_task *task, f_str_t * line) if (cur == NULL) { debug_task ("bad command: %s", token); - return -1; + return FALSE; } break; } + return TRUE; +} + +static gboolean +parse_rspamc_command (struct worker_task *task, f_str_t * line) +{ + gchar *token; + + /* Separate line */ + token = separate_command (line, ' '); + if (line == NULL || token == NULL) { + debug_task ("bad command"); + return FALSE; + } + + if (!parse_check_command (task, token)) { + return FALSE; + } + if (g_ascii_strncasecmp (line->begin, RSPAMC_GREETING, sizeof (RSPAMC_GREETING) - 1) == 0) { task->proto = RSPAMC_PROTO; task->proto_ver = 10; @@ -246,15 +258,115 @@ parse_command (struct worker_task *task, f_str_t * line) task->proto_ver = 12; } else { - return -1; + return FALSE; } task->state = READ_HEADER; + + return TRUE; +} + +static gboolean +parse_http_command (struct worker_task *task, f_str_t * line) +{ + guint8 *p, *end, *c; + gint state = 0, next_state; + gchar *cmd; + + p = line->begin; + end = p + line->len; + task->proto = RSPAMC_PROTO; + + while (p < end) { + switch (state) { + case 0: + /* Expect GET or POST here */ + if ((end - p > 3 && + (*p == 'G' || *p == 'g') && + (p[1] == 'E' || p[1] == 'e') && + (p[2] == 'T' || p[2] == 't')) || + (end - p > 4 && + (*p == 'P' || *p == 'p') && + (p[1] == 'O' || p[1] == 'o') && + (p[2] == 'S' || p[2] == 's') && + (p[3] == 'T' || p[3] == 't'))) { + state = 99; + next_state = 1; + p += (*p == 'g' || *p == 'G') ? 3 : 4; + } + else { + msg_info ("invalid HTTP request: %V", line); + return FALSE; + } + break; + case 1: + /* Get command or path */ + if (!g_ascii_isspace (*p)) { + p ++; + } + else { + /* Copy command */ + cmd = memory_pool_alloc (task->task_pool, p - c); + rspamd_strlcpy (cmd, c, p - c); + /* Skip the first '/' */ + if (*cmd == '/') { + cmd ++; + } + if (!parse_check_command (task, cmd)) { + /* Assume that command is symbols */ + task->cmd = CMD_SYMBOLS; + } + state = 99; + next_state = 2; + } + break; + case 2: + /* Get HTTP/1.0 or HTTP/1.1 */ + if (p == end - 1) { + /* We are at the end */ + if (g_ascii_strncasecmp (c, "HTTP/1.0", sizeof ("HTTP/1.0") - 1) == 0 || + g_ascii_strncasecmp (c, "HTTP/1.1", sizeof ("HTTP/1.1") - 1) == 0) { + task->state = READ_HEADER; + return TRUE; + } + } + else { + p ++; + } + break; + case 99: + /* Skip spaces */ + if (g_ascii_isspace (*p)) { + p ++; + } + else { + state = next_state; + c = p; + } + break; + } + } + + return FALSE; +} + +static gboolean +parse_command (struct worker_task *task, f_str_t * line) +{ + task->proto_ver = 11; + + if (! task->is_http) { + return parse_rspamc_command (task, line); + } + else { + return parse_http_command (task, line); + } - return 0; + /* Unreached */ + return FALSE; } -static gint +static gboolean parse_header (struct worker_task *task, f_str_t * line) { gchar *headern, *err, *tmp; @@ -274,16 +386,16 @@ parse_header (struct worker_task *task, f_str_t * line) task->last_error = "Unknown content length"; task->error_code = RSPAMD_LENGTH_ERROR; task->state = WRITE_ERROR; - return -1; + return FALSE; } } - return 0; + return TRUE; } headern = separate_command (line, ':'); if (line == NULL || headern == NULL) { - return -1; + return FALSE; } /* Eat whitespaces */ g_strstrip (headern); @@ -302,7 +414,7 @@ parse_header (struct worker_task *task, f_str_t * line) } else { msg_info ("wrong header: %s", headern); - return -1; + return FALSE; } break; case 'd': @@ -314,7 +426,7 @@ parse_header (struct worker_task *task, f_str_t * line) } else { msg_info ("wrong header: %s", headern); - return -1; + return FALSE; } break; case 'h': @@ -326,7 +438,7 @@ parse_header (struct worker_task *task, f_str_t * line) } else { msg_info ("wrong header: %s", headern); - return -1; + return FALSE; } break; case 'f': @@ -338,7 +450,18 @@ parse_header (struct worker_task *task, f_str_t * line) } else { msg_info ("wrong header: %s", headern); - return -1; + return FALSE; + } + break; + case 'j': + case 'J': + /* json */ + if (g_ascii_strncasecmp (headern, JSON_HEADER, sizeof (JSON_HEADER) - 1) == 0) { + task->is_json = parse_flag (memory_pool_fstrdup (task->task_pool, line)); + } + else { + msg_info ("wrong header: %s", headern); + return FALSE; } break; case 'q': @@ -350,7 +473,7 @@ parse_header (struct worker_task *task, f_str_t * line) } else { msg_info ("wrong header: %s", headern); - return -1; + return FALSE; } break; case 'r': @@ -368,7 +491,7 @@ parse_header (struct worker_task *task, f_str_t * line) } else { msg_info ("wrong header: %s", headern); - return -1; + return FALSE; } break; case 'i': @@ -378,13 +501,13 @@ parse_header (struct worker_task *task, f_str_t * line) tmp = memory_pool_fstrdup (task->task_pool, line); if (!inet_aton (tmp, &task->from_addr)) { msg_info ("bad ip header: '%s'", tmp); - return -1; + return FALSE; } debug_task ("read IP header, value: %s", tmp); } else { msg_info ("wrong header: %s", headern); - return -1; + return FALSE; } break; case 'p': @@ -396,6 +519,9 @@ parse_header (struct worker_task *task, f_str_t * line) msg_info ("pass all filters"); } } + else { + return FALSE; + } break; case 's': case 'S': @@ -403,7 +529,7 @@ parse_header (struct worker_task *task, f_str_t * line) task->subject = memory_pool_fstrdup (task->task_pool, line); } else { - return -1; + return FALSE; } break; case 'u': @@ -413,18 +539,18 @@ parse_header (struct worker_task *task, f_str_t * line) task->user = memory_pool_fstrdup (task->task_pool, line); } else { - return -1; + return FALSE; } break; default: msg_info ("wrong header: %s", headern); - return -1; + return FALSE; } - return 0; + return TRUE; } -gint +gboolean read_rspamd_input_line (struct worker_task *task, f_str_t * line) { switch (task->state) { @@ -435,9 +561,9 @@ read_rspamd_input_line (struct worker_task *task, f_str_t * line) return parse_header (task, line); break; default: - return -1; + return FALSE; } - return -1; + return FALSE; } struct metric_callback_data { @@ -579,8 +705,11 @@ show_email_header (struct worker_task *task) return rspamd_dispatcher_write (task->dispatcher, outbuf, cb.off, FALSE, FALSE); } +/* + * Print a single symbol using rspamc protocol + */ static void -metric_symbols_callback (gpointer key, gpointer value, void *user_data) +metric_symbols_callback_rspamc (gpointer key, gpointer value, void *user_data) { struct metric_callback_data *cd = (struct metric_callback_data *)user_data; struct worker_task *task = cd->task; @@ -656,10 +785,10 @@ metric_symbols_callback (gpointer key, gpointer value, void *user_data) } static gboolean -show_metric_symbols (struct metric_result *metric_res, struct metric_callback_data *cd) +show_metric_symbols_rspamc (struct metric_result *metric_res, struct metric_callback_data *cd) { cd->cur_metric = metric_res->metric; - g_hash_table_foreach (metric_res->symbols, metric_symbols_callback, cd); + g_hash_table_foreach (metric_res->symbols, metric_symbols_callback_rspamc, cd); /* Remove last , from log buf */ if (cd->log_buf[cd->log_offset - 1] == ',') { cd->log_buf[--cd->log_offset] = '\0'; @@ -671,55 +800,227 @@ show_metric_symbols (struct metric_result *metric_res, struct metric_callback_da return TRUE; } +/* + * JSON symbols output + */ static void -show_metric_result (gpointer metric_name, gpointer metric_value, void *user_data) +metric_symbols_callback_json (gpointer key, gpointer value, void *user_data) { struct metric_callback_data *cd = (struct metric_callback_data *)user_data; - struct worker_task *task = cd->task; - gint r = 0; - gchar outbuf[OUTBUFSIZ]; - struct metric_result *metric_res = (struct metric_result *)metric_value; - struct metric *m; - gint is_spam = 0; - double ms = 0, rs = 0; - enum rspamd_metric_action action = METRIC_ACTION_NOACTION; + gchar *description; + struct symbol *s = (struct symbol *)value; + GList *cur; if (! cd->alive) { return; } - if (metric_name == NULL || metric_value == NULL) { - m = g_hash_table_lookup (task->cfg->metrics, DEFAULT_METRIC); - default_required_score = m->required_score; - default_score = 0; - if (!check_metric_settings (task, m, &ms, &rs)) { - ms = m->required_score; - rs = m->reject_score; + description = g_hash_table_lookup (cd->cur_metric->descriptions, key); + + cd->symbols_offset += rspamd_snprintf (cd->symbols_buf + cd->symbols_offset, cd->symbols_size - cd->symbols_offset, + CRLF " {" CRLF " \"name\": \"%s\"," CRLF + " \"weight\": %.2f," CRLF, + (gchar *)key, s->score); + if (description != NULL) { + cd->symbols_offset += rspamd_snprintf (cd->symbols_buf + cd->symbols_offset, cd->symbols_size - cd->symbols_offset, + " \"description\": \"%s\"" CRLF, description); + } + if (s->options) { + cd->symbols_offset += rspamd_snprintf (cd->symbols_buf + cd->symbols_offset, cd->symbols_size - cd->symbols_offset, + " \"options\": [" CRLF " "); + cur = s->options; + while (cur) { + if (g_list_next (cur)) { + cd->symbols_offset += rspamd_snprintf (cd->symbols_buf + cd->symbols_offset, cd->symbols_size - cd->symbols_offset, + "\"%s\", ", (gchar *)cur->data); + } + else { + cd->symbols_offset += rspamd_snprintf (cd->symbols_buf + cd->symbols_offset, cd->symbols_size - cd->symbols_offset, + "\"%s\"" CRLF " ]" CRLF, (gchar *)cur->data); + + } + cur = g_list_next (cur); } + } + cd->symbols_offset += rspamd_snprintf (cd->symbols_buf + cd->symbols_offset, cd->symbols_size - cd->symbols_offset, + " },"); +} +/* + * Print a single symbol using json protocol + */ +static gboolean +show_metric_symbols_json (struct metric_result *metric_res, struct metric_callback_data *cd) +{ + + cd->cur_metric = metric_res->metric; + cd->symbols_offset = 0; + cd->symbols_offset += rspamd_snprintf (cd->symbols_buf + cd->symbols_offset, cd->symbols_size - cd->symbols_offset, + " \"symbols\": ["); + + /* Print all symbols */ + g_hash_table_foreach (metric_res->symbols, metric_symbols_callback_json, cd); + /* Remove last ',' symbol */ + if (cd->symbols_buf[cd->symbols_offset - 1] == ',') { + cd->symbols_buf[--cd->symbols_offset] = '\0'; + } + cd->symbols_offset += rspamd_snprintf (cd->symbols_buf + cd->symbols_offset, cd->symbols_size - cd->symbols_offset, + CRLF " ]" CRLF " }" CRLF); + if (! rspamd_dispatcher_write (cd->task->dispatcher, cd->symbols_buf, cd->symbols_offset, FALSE, FALSE)) { + cd->alive = FALSE; + } + return cd->alive; +} + +/* Print a single metric line */ +static gint +print_metric_data_rspamc (struct worker_task *task, gchar *outbuf, gsize size, + struct metric_result *metric_res, + struct metric *m, double ms, double rs, + enum rspamd_metric_action action) +{ + gint r = 0; + gboolean is_spam = FALSE; + + if (metric_res == NULL) { if (task->proto == SPAMC_PROTO) { - r = rspamd_snprintf (outbuf, sizeof(outbuf), + r = rspamd_snprintf (outbuf, size, "Spam: False ; 0 / %.2f" CRLF, ms); } else { if (task->proto_ver >= 11) { if (!task->is_skipped) { - r = rspamd_snprintf (outbuf, sizeof(outbuf), + r = rspamd_snprintf (outbuf, size, "Metric: default; False; 0.00 / %.2f / %.2f" CRLF, ms, rs); } else { - r = rspamd_snprintf (outbuf, sizeof(outbuf), + r = rspamd_snprintf (outbuf, size, "Metric: default; Skip; 0.00 / %.2f / %.2f" CRLF, ms, rs); } } else { - r = rspamd_snprintf (outbuf, sizeof(outbuf), + r = rspamd_snprintf (outbuf, size, "Metric: default; False; 0.00 / %.2f" CRLF, ms); } - r += rspamd_snprintf (outbuf + r, sizeof(outbuf) - r, + r += rspamd_snprintf (outbuf + r, size - r, "Action: %s" CRLF, str_action_metric ( METRIC_ACTION_NOACTION)); } + } + else { + is_spam = metric_res->score >= ms; + + if (task->proto == SPAMC_PROTO) { + if (task->cmd != CMD_REPORT_IFSPAM || is_spam) { + r = rspamd_snprintf (outbuf, size, + "Spam: %s ; %.2f / %.2f" CRLF, (is_spam) ? "True" + : "False", metric_res->score, ms); + } + } + else { + if (task->proto_ver >= 11) { + if (!task->is_skipped) { + r = rspamd_snprintf (outbuf, size, + "Metric: %s; %s; %.2f / %.2f / %.2f" CRLF, + (gchar *) m->name, + (is_spam) ? "True" : "False", metric_res->score, + ms, rs); + } + else { + r = rspamd_snprintf (outbuf, size, + "Metric: %s; Skip; %.2f / %.2f / %.2f" CRLF, + (gchar *) m->name, metric_res->score, ms, rs); + } + } + else { + r = rspamd_snprintf (outbuf, size, + "Metric: %s; %s; %.2f / %.2f" CRLF, + (gchar *) m->name, (is_spam) ? "True" : "False", + metric_res->score, ms); + } + r += rspamd_snprintf (outbuf + r, size - r, + "Action: %s" CRLF, str_action_metric (action)); + } + } + + return r; +} + +/* Print a single metric line in json */ +static gint +print_metric_data_json (struct worker_task *task, gchar *outbuf, gsize size, + struct metric_result *metric_res, + struct metric *m, double ms, double rs, + enum rspamd_metric_action action) +{ + gint r = 0; + + + if (metric_res == NULL) { + r = rspamd_snprintf (outbuf, size, + " {" CRLF " \"name\": \"default\"," CRLF + " \"is_spam\": false," CRLF + " \"is_skipped\": %s," CRLF + " \"score\": 0.00," CRLF + " \"required_score\": %.2f," CRLF + " \"reject_score\": %.2f," CRLF + " \"action\": \"%s\"," CRLF, + task->is_skipped ? "true" : "false", ms, rs, + str_action_metric (METRIC_ACTION_NOACTION)); + } + else { + r = rspamd_snprintf (outbuf, size, + " {" CRLF " \"name\": \"default\"," CRLF + " \"is_spam\": %s," CRLF + " \"is_skipped\": %s," CRLF + " \"score\": %.2f," CRLF + " \"required_score\": %.2f," CRLF + " \"reject_score\": %.2f," CRLF + " \"action\": \"%s\"," CRLF, + metric_res->score >= ms ? "true" : "false", + metric_res->score, + task->is_skipped ? "true" : "false", ms, rs, + str_action_metric (action)); + } + + return r; +} + +static void +show_metric_result (gpointer metric_name, gpointer metric_value, void *user_data) +{ + struct metric_callback_data *cd = (struct metric_callback_data *)user_data; + struct worker_task *task = cd->task; + + gchar outbuf[OUTBUFSIZ]; + struct metric_result *metric_res = (struct metric_result *)metric_value; + struct metric *m; + gboolean is_spam; + gint r; + + double ms = 0, rs = 0; + enum rspamd_metric_action action = METRIC_ACTION_NOACTION; + + if (! cd->alive) { + return; + } + if (metric_name == NULL || metric_value == NULL) { + m = g_hash_table_lookup (task->cfg->metrics, DEFAULT_METRIC); + default_required_score = m->required_score; + default_score = 0; + if (!check_metric_settings (task, m, &ms, &rs)) { + ms = m->required_score; + rs = m->reject_score; + } + + if (!task->is_json) { + r = print_metric_data_rspamc (task, outbuf, sizeof (outbuf), NULL, m, ms, rs, action); + } + else { + r = print_metric_data_json (task, outbuf, sizeof (outbuf), NULL, m, ms, rs, action); + } + + /* Write log line */ if (!task->is_skipped) { cd->log_offset += rspamd_snprintf (cd->log_buf + cd->log_offset, cd->log_size - cd->log_offset, @@ -748,42 +1049,16 @@ show_metric_result (gpointer metric_name, gpointer metric_value, void *user_data metric_res->metric); } - if (metric_res->score >= ms) { - is_spam = 1; - } - - if (task->proto == SPAMC_PROTO) { - if (task->cmd != CMD_REPORT_IFSPAM || is_spam) { - r = rspamd_snprintf (outbuf, sizeof(outbuf), - "Spam: %s ; %.2f / %.2f" CRLF, (is_spam) ? "True" - : "False", metric_res->score, ms); - } + if (!task->is_json) { + r = print_metric_data_rspamc (task, outbuf, sizeof (outbuf), metric_res, metric_res->metric, ms, rs, action); } else { - if (task->proto_ver >= 11) { - if (!task->is_skipped) { - r = rspamd_snprintf (outbuf, sizeof(outbuf), - "Metric: %s; %s; %.2f / %.2f / %.2f" CRLF, - (gchar *) metric_name, - (is_spam) ? "True" : "False", metric_res->score, - ms, rs); - } - else { - r = rspamd_snprintf (outbuf, sizeof(outbuf), - "Metric: %s; Skip; %.2f / %.2f / %.2f" CRLF, - (gchar *) metric_name, metric_res->score, ms, rs); - } - } - else { - r = rspamd_snprintf (outbuf, sizeof(outbuf), - "Metric: %s; %s; %.2f / %.2f" CRLF, - (gchar *) metric_name, (is_spam) ? "True" : "False", - metric_res->score, ms); - } - r += rspamd_snprintf (outbuf + r, sizeof(outbuf) - r, - "Action: %s" CRLF, str_action_metric (action)); + r = print_metric_data_json (task, outbuf, sizeof (outbuf), metric_res, metric_res->metric, ms, rs, action); } + /* Write log line */ if (!task->is_skipped) { + is_spam = metric_res->score >= ms; + cd->log_offset += rspamd_snprintf (cd->log_buf + cd->log_offset, cd->log_size - cd->log_offset, "(%s: %c (%s): [%.2f/%.2f/%.2f] [", (gchar *) metric_name, @@ -799,12 +1074,16 @@ show_metric_result (gpointer metric_name, gpointer metric_value, void *user_data } } if (task->cmd == CMD_PROCESS) { + if (task->is_json) { + r += rspamd_snprintf (outbuf + r, sizeof (outbuf) - r, "}" CRLF); + } #ifndef GMIME24 g_mime_message_add_header (task->message, "X-Spam-Status", outbuf); #else g_mime_object_append_header (GMIME_OBJECT (task->message), "X-Spam-Status", outbuf); #endif + } else { if (!rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE)) { @@ -813,9 +1092,25 @@ show_metric_result (gpointer metric_name, gpointer metric_value, void *user_data } if (metric_value != NULL) { - if (!show_metric_symbols (metric_res, cd)) { - cd->alive = FALSE; - return; + if (task->is_json) { + if (!show_metric_symbols_json (metric_res, cd)) { + cd->alive = FALSE; + return; + } + } + else { + if (!show_metric_symbols_rspamc (metric_res, cd)) { + cd->alive = FALSE; + return; + } + } + } + else { + if (task->is_json) { + if (!rspamd_dispatcher_write (task->dispatcher, " }" CRLF, 5, FALSE, TRUE)) { + cd->alive = FALSE; + return; + } } } } @@ -860,10 +1155,19 @@ write_check_reply (struct worker_task *task) struct metric_result *metric_res; struct metric_callback_data cd; - r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s 0 %s" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, + /* Output the first line - check status */ + if (task->is_http) { + r = rspamd_snprintf (outbuf, sizeof (outbuf), "HTTP/1.0 200 OK" CRLF "Connection: close" CRLF CRLF); + if (! rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE, FALSE)) { + return FALSE; + } + } + else { + r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s 0 %s" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver), SPAMD_OK); - if (! rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE, FALSE)) { - return FALSE; + if (! rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE, FALSE)) { + return FALSE; + } } cd.task = task; @@ -883,7 +1187,7 @@ write_check_reply (struct worker_task *task) } cd.alive = TRUE; - if (task->proto == SPAMC_PROTO) { + if (task->proto == SPAMC_PROTO && !task->is_http) { /* Ignore metrics, just write report for 'default' metric */ metric_res = g_hash_table_lookup (task->results, "default"); @@ -903,6 +1207,11 @@ write_check_reply (struct worker_task *task) } else { /* Show default metric first */ + if (task->is_json) { + if (! rspamd_dispatcher_write (task->dispatcher, "[" CRLF, 3, TRUE, TRUE)) { + return FALSE; + } + } metric_res = g_hash_table_lookup (task->results, "default"); if (metric_res == NULL) { /* Implicit metric result */ @@ -924,61 +1233,74 @@ write_check_reply (struct worker_task *task) if (!cd.alive) { return FALSE; } - /* Messages */ - if (! show_messages (task)) { - return FALSE; - } - /* URL stat */ - if (! show_url_header (task)) { - return FALSE; + if (task->is_json) { + if (! rspamd_dispatcher_write (task->dispatcher, "]" CRLF, 3, FALSE, TRUE)) { + return FALSE; + } } - /* Emails stat */ - if (! show_email_header (task)) { - return FALSE; + else { + /* XXX: add this for JSON as well */ + /* Messages */ + if (! show_messages (task)) { + return FALSE; + } + /* URL stat */ + if (! show_url_header (task)) { + return FALSE; + } + /* Emails stat */ + if (! show_email_header (task)) { + return FALSE; + } } } write_hashes_to_log (task, logbuf, cd.log_offset, cd.log_size); msg_info ("%s", logbuf); - if (task->proto == RSPAMC_PROTO && task->proto_ver >= 12) { - r = rspamd_snprintf (outbuf, sizeof (outbuf), "Message-ID: %s" CRLF CRLF, task->message_id); - if (! rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE)) { - return FALSE; - } - } - else if (task->proto == SPAMC_PROTO && task->cmd == CMD_SYMBOLS) { - len = strlen (cd.symbols_buf); - r = rspamd_snprintf (outbuf, sizeof (outbuf), CONTENT_LENGTH_HEADER ": %d" CRLF CRLF "%s" CRLF, - len + sizeof (CRLF) - 1, cd.symbols_buf); - if (! rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE)) { - return FALSE; - } - } - - if (task->cmd == CMD_REPORT || task->cmd == CMD_REPORT_IFSPAM) { - if (default_score >= default_required_score) { - r = rspamd_snprintf (outbuf, sizeof (outbuf), CONTENT_LENGTH_HEADER ": %d" CRLF CRLF - "This message is likely spam" CRLF "%s", - sizeof ("This message is likely spam" CRLF) - 1 + cd.report_offset, - cd.report_buf); - } - else if (default_score > 0.01) { - r = rspamd_snprintf (outbuf, sizeof (outbuf), CONTENT_LENGTH_HEADER ": %d" CRLF CRLF - "This message is probably spam" CRLF "%s", - sizeof ("This message is probably spam" CRLF) - 1 + cd.report_offset, - cd.report_buf); + if (!task->is_json) { + /* Write message id */ + if (task->proto == RSPAMC_PROTO && task->proto_ver >= 12) { + r = rspamd_snprintf (outbuf, sizeof (outbuf), "Message-ID: %s" CRLF CRLF, task->message_id); + if (! rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE)) { + return FALSE; + } } - else { - r = rspamd_snprintf (outbuf, sizeof (outbuf), CONTENT_LENGTH_HEADER ": %d" CRLF CRLF - "This message is not spam" CRLF "%s", - sizeof ("This message is not spam" CRLF) - 1 + cd.report_offset, - cd.report_buf); + /* Write symbols for spamc proto */ + else if (task->proto == SPAMC_PROTO && task->cmd == CMD_SYMBOLS) { + len = strlen (cd.symbols_buf); + r = rspamd_snprintf (outbuf, sizeof (outbuf), CONTENT_LENGTH_HEADER ": %d" CRLF CRLF "%s" CRLF, + len + sizeof (CRLF) - 1, cd.symbols_buf); + if (! rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE)) { + return FALSE; + } } - if (! rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE)) { - return FALSE; + /* Write report, based on score of default metric */ + if (task->cmd == CMD_REPORT || task->cmd == CMD_REPORT_IFSPAM) { + if (default_score >= default_required_score) { + r = rspamd_snprintf (outbuf, sizeof (outbuf), CONTENT_LENGTH_HEADER ": %d" CRLF CRLF + "This message is likely spam" CRLF "%s", + sizeof ("This message is likely spam" CRLF) - 1 + cd.report_offset, + cd.report_buf); + } + else if (default_score > 0.01) { + r = rspamd_snprintf (outbuf, sizeof (outbuf), CONTENT_LENGTH_HEADER ": %d" CRLF CRLF + "This message is probably spam" CRLF "%s", + sizeof ("This message is probably spam" CRLF) - 1 + cd.report_offset, + cd.report_buf); + } + else { + r = rspamd_snprintf (outbuf, sizeof (outbuf), CONTENT_LENGTH_HEADER ": %d" CRLF CRLF + "This message is not spam" CRLF "%s", + sizeof ("This message is not spam" CRLF) - 1 + cd.report_offset, + cd.report_buf); + } + if (! rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE)) { + return FALSE; + } } } + /* Increase counters */ task->worker->srv->stat->messages_scanned++; if (default_score >= default_required_score) { task->worker->srv->stat->messages_spam ++; @@ -1000,9 +1322,20 @@ write_process_reply (struct worker_task *task) struct metric_result *metric_res; struct metric_callback_data cd; - r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s 0 %s" CRLF, - (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, - rspamc_proto_str (task->proto_ver), SPAMD_OK); + /* Output the first line - check status */ + if (task->is_http) { + r = rspamd_snprintf (outbuf, sizeof (outbuf), "HTTP/1.0 200 OK" CRLF "Connection: close" CRLF CRLF); + if (! rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE, FALSE)) { + return FALSE; + } + } + else { + r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s 0 %s" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, + rspamc_proto_str (task->proto_ver), SPAMD_OK); + if (! rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE, FALSE)) { + return FALSE; + } + } cd.task = task; cd.log_buf = logbuf; @@ -1062,6 +1395,7 @@ write_process_reply (struct worker_task *task) write_hashes_to_log (task, logbuf, cd.log_offset, cd.log_size); msg_info ("%s", logbuf); + /* Now prepare and write message itself */ outmsg = g_mime_object_to_string (GMIME_OBJECT (task->message)); memory_pool_add_destructor (task->task_pool, (pool_destruct_func) g_free, outmsg); @@ -1097,15 +1431,21 @@ write_reply (struct worker_task *task) debug_task ("writing reply to client"); if (task->error_code != 0) { /* Write error message and error code to reply */ - if (task->proto == SPAMC_PROTO) { - r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s %d %s" CRLF CRLF, - SPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver), task->error_code, SPAMD_ERROR); - debug_task ("writing error: %s", outbuf); + if (task->is_http) { + r = rspamd_snprintf (outbuf, sizeof (outbuf), "HTTP/1.0 400 Bad request" CRLF + "Connection: close" CRLF CRLF); } else { - r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s %d %s" CRLF "%s: %s" CRLF CRLF, - RSPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver), task->error_code, SPAMD_ERROR, ERROR_HEADER, task->last_error); - debug_task ("writing error: %s", outbuf); + if (task->proto == SPAMC_PROTO) { + r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s %d %s" CRLF CRLF, + SPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver), task->error_code, SPAMD_ERROR); + debug_task ("writing error: %s", outbuf); + } + else { + r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s/%s %d %s" CRLF "%s: %s" CRLF CRLF, + RSPAMD_REPLY_BANNER, rspamc_proto_str (task->proto_ver), task->error_code, SPAMD_ERROR, ERROR_HEADER, task->last_error); + debug_task ("writing error: %s", outbuf); + } } /* Write to bufferevent error message */ return rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE); diff --git a/src/protocol.h b/src/protocol.h index 0d2a0207b..a15530a7b 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -61,7 +61,7 @@ struct custom_command { * @param line line of user's input * @return 0 if line was successfully parsed and -1 if we have protocol error */ -gint read_rspamd_input_line (struct worker_task *task, f_str_t *line); +gboolean read_rspamd_input_line (struct worker_task *task, f_str_t *line); /** * Write reply for specified task command diff --git a/src/worker.c b/src/worker.c index f9a6f936e..57ad4ecf1 100644 --- a/src/worker.c +++ b/src/worker.c @@ -81,10 +81,14 @@ struct custom_filter { struct rspamd_worker_ctx { guint32 timeout; struct timeval io_tv; - /* Detect whether this worker is mime worker */ + /* Detect whether this worker is mime worker */ gboolean is_mime; - /* Detect whether this worker is mime worker */ + /* Detect whether this worker is custom worker */ gboolean is_custom; + /* HTTP worker */ + gboolean is_http; + /* JSON output */ + gboolean is_json; GList *custom_filters; /* DNS resolver */ struct rspamd_dns_resolver *resolver; @@ -327,7 +331,7 @@ read_socket (f_str_t * in, void *arg) } } else { - if (read_rspamd_input_line (task, in) != 0) { + if (!read_rspamd_input_line (task, in)) { task->last_error = "Read error"; task->error_code = RSPAMD_NETWORK_ERROR; task->state = WRITE_ERROR; @@ -603,6 +607,9 @@ accept_socket (gint fd, short what, void *arg) new_task->sock = nfd; new_task->is_mime = ctx->is_mime; + new_task->is_json = ctx->is_json; + new_task->is_http = ctx->is_http; + worker->srv->stat->connections_count++; new_task->resolver = ctx->resolver; msec_to_tv (ctx->timeout, &ctx->io_tv); @@ -741,6 +748,8 @@ init_worker (void) ctx->timeout = DEFAULT_WORKER_IO_TIMEOUT; register_worker_opt (TYPE_WORKER, "mime", xml_handle_boolean, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_mime)); + register_worker_opt (TYPE_WORKER, "http", xml_handle_boolean, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_http)); + register_worker_opt (TYPE_WORKER, "json", xml_handle_boolean, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_json)); register_worker_opt (TYPE_WORKER, "timeout", xml_handle_seconds, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, timeout)); return ctx; -- 2.39.5