diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2014-01-18 18:20:54 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2014-01-18 18:20:54 +0000 |
commit | e2f2eed337ecdb17c897fe7e04626dfffe32f2a2 (patch) | |
tree | 25f39a0aef9ed9eacc117407cc3824b16acccb9c | |
parent | f5933d697d2cd9854afcbc7421efda353e165aea (diff) | |
download | rspamd-e2f2eed337ecdb17c897fe7e04626dfffe32f2a2.tar.gz rspamd-e2f2eed337ecdb17c897fe7e04626dfffe32f2a2.zip |
Parse HTTP requests, cleanup the code.
--HG--
extra : rebase_source : 6b35fbf55fc9fe65d7f033620670bb210928e9b4
-rw-r--r-- | src/cfg_utils.c | 2 | ||||
-rw-r--r-- | src/controller.c | 3 | ||||
-rw-r--r-- | src/main.h | 7 | ||||
-rw-r--r-- | src/plugins/surbl.c | 24 | ||||
-rw-r--r-- | src/protocol.c | 659 | ||||
-rw-r--r-- | src/protocol.h | 42 | ||||
-rw-r--r-- | src/util.c | 23 | ||||
-rw-r--r-- | src/util.h | 5 | ||||
-rw-r--r-- | src/worker.c | 288 |
9 files changed, 239 insertions, 814 deletions
diff --git a/src/cfg_utils.c b/src/cfg_utils.c index 9216fb6ef..3e1503db7 100644 --- a/src/cfg_utils.c +++ b/src/cfg_utils.c @@ -892,11 +892,13 @@ read_rspamd_config (struct config_file *cfg, const gchar *filename, rspamd_ucl_add_conf_macros (parser, cfg); if (!ucl_parser_add_chunk (parser, data, st.st_size)) { msg_err ("ucl parser error: %s", ucl_parser_get_error (parser)); + ucl_parser_free (parser); munmap (data, st.st_size); return FALSE; } munmap (data, st.st_size); cfg->rcl_obj = ucl_parser_get_object (parser); + ucl_parser_free (parser); res = TRUE; } diff --git a/src/controller.c b/src/controller.c index a43b14203..097ed0e7c 100644 --- a/src/controller.c +++ b/src/controller.c @@ -1226,7 +1226,8 @@ process_header (f_str_t *line, struct controller_session *session) struct rspamd_controller_ctx *ctx = session->worker->ctx; controller_func_t custom_handler; - headern = separate_command (line, ':'); + /* XXX: temporary workaround */ + headern = NULL; if (line == NULL || headern == NULL) { msg_warn ("bad header: %V", line); diff --git a/src/main.h b/src/main.h index 81cdfb9f1..ea1172468 100644 --- a/src/main.h +++ b/src/main.h @@ -187,15 +187,11 @@ struct worker_task { CLOSING_CONNECTION, WRITING_REPLY } state; /**< current session state */ - size_t content_length; /**< length of user's input */ - enum rspamd_protocol proto; /**< protocol (rspamc or spamc) */ - guint proto_ver; /**< protocol version */ enum rspamd_command cmd; /**< command */ struct custom_command *custom_cmd; /**< custom command if any */ gint sock; /**< socket descriptor */ gboolean is_mime; /**< if this task is mime task */ gboolean is_json; /**< output is JSON */ - gboolean is_http; /**< output is HTTP */ gboolean allow_learn; /**< allow learning */ gboolean is_skipped; /**< whether message was skipped by configuration */ @@ -203,7 +199,7 @@ struct worker_task { gchar *from; /**< from header value */ gchar *queue_id; /**< queue id if specified */ const gchar *message_id; /**< message id */ - GList *rcpt; /**< recipients list */ + GList *rcpt; /**< recipients list */ guint nrcpt; /**< number of recipients */ #ifdef HAVE_INET_PTON struct { @@ -222,7 +218,6 @@ struct worker_task { gchar *user; /**< user to deliver */ gchar *subject; /**< subject (for non-mime) */ gchar *hostname; /**< hostname reported by MTA */ - gchar *statfile; /**< statfile for learning */ GString *msg; /**< message buffer */ rspamd_io_dispatcher_t *dispatcher; /**< IO dispatcher object */ struct rspamd_http_connection *http_conn; /**< HTTP server connection */ diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c index 397a26ba4..b256c8e1e 100644 --- a/src/plugins/surbl.c +++ b/src/plugins/surbl.c @@ -57,11 +57,10 @@ static struct surbl_ctx *surbl_module_ctx = NULL; -static gint surbl_filter (struct worker_task *task); -static void surbl_test_url (struct worker_task *task, void *user_data); -static void dns_callback (struct rspamd_dns_reply *reply, gpointer arg); -static void process_dns_results (struct worker_task *task, struct suffix_item *suffix, gchar *url, guint32 addr); -static gint urls_command_handler (struct worker_task *task); +static void surbl_test_url (struct worker_task *task, void *user_data); +static void dns_callback (struct rspamd_dns_reply *reply, gpointer arg); +static void process_dns_results (struct worker_task *task, + struct suffix_item *suffix, gchar *url, guint32 addr); #define NO_REGEXP (gpointer)-1 @@ -213,7 +212,6 @@ surbl_module_init (struct config_file *cfg, struct module_ctx **ctx) { surbl_module_ctx = g_malloc (sizeof (struct surbl_ctx)); - surbl_module_ctx->filter = surbl_filter; surbl_module_ctx->use_redirector = 0; surbl_module_ctx->suffixes = NULL; surbl_module_ctx->surbl_pool = memory_pool_new (memory_pool_get_size ()); @@ -237,8 +235,6 @@ surbl_module_init (struct config_file *cfg, struct module_ctx **ctx) *ctx = (struct module_ctx *)surbl_module_ctx; - register_protocol_command ("urls", urls_command_handler); - return 0; } @@ -449,7 +445,6 @@ surbl_module_reconfig (struct config_file *cfg) /* Delete pool and objects */ memory_pool_delete (surbl_module_ctx->surbl_pool); /* Reinit module */ - surbl_module_ctx->filter = surbl_filter; surbl_module_ctx->use_redirector = 0; surbl_module_ctx->suffixes = NULL; surbl_module_ctx->surbl_pool = memory_pool_new (memory_pool_get_size ()); @@ -996,17 +991,10 @@ surbl_test_url (struct worker_task *task, void *user_data) param.suffix = suffix; g_tree_foreach (task->urls, surbl_tree_url_callback, ¶m); } - -static gint -surbl_filter (struct worker_task *task) -{ - /* XXX: remove this shit */ - return 0; -} - /* * Handlers of URLS command */ +#if 0 struct urls_tree_cb_data { gchar *buf; gsize len; @@ -1086,7 +1074,7 @@ urls_command_handler (struct worker_task *task) return TRUE; } - +#endif /* * vi:ts=4 */ diff --git a/src/protocol.c b/src/protocol.c index 02ba94e39..3351a5195 100644 --- a/src/protocol.c +++ b/src/protocol.c @@ -97,161 +97,109 @@ static GList *custom_commands = NULL; -/* XXX: remove this legacy sometimes */ -static const gchar * -str_action_metric_spamc (enum rspamd_metric_action action) -{ - switch (action) { - case METRIC_ACTION_REJECT: - return "reject"; - case METRIC_ACTION_SOFT_REJECT: - return "soft reject"; - case METRIC_ACTION_REWRITE_SUBJECT: - return "rewrite subject"; - case METRIC_ACTION_ADD_HEADER: - return "add header"; - case METRIC_ACTION_GREYLIST: - return "greylist"; - case METRIC_ACTION_NOACTION: - return "no action"; - case METRIC_ACTION_MAX: - return "invalid max action"; - } - return "unknown action"; -} - -static inline const gchar * -rspamc_proto_str (guint ver) +/* + * Remove <> from the fixed string and copy it to the pool + */ +static gchar * +rspamd_protocol_escape_braces (GString *in) { + gint len = 0; + gchar *orig, *p; - if (G_LIKELY (ver == 12)) { - return "1.2"; + orig = in->str; + while ((g_ascii_isspace (*orig) || *orig == '<') && orig - in->str < (gint)in->len) { + orig ++; } - else if (G_UNLIKELY (ver == 11)) { - return "1.1"; - } - else if (G_UNLIKELY (ver == 13)) { - return "1.3"; - } - else if (G_UNLIKELY (ver == 14)) { - return "1.4"; - } - else if (G_UNLIKELY (ver == 15)) { - return "1.5"; - } - else { - return "1.0"; - } -} -gchar * -separate_command (f_str_t * in, gchar c) -{ - guint r = 0; - gchar *p = in->begin, *b; - b = p; + g_string_erase (in, 0, orig - in->str); - while (r < in->len) { - if (*p == c) { - *p = '\0'; - in->begin = p + 1; - in->len -= r + 1; - return b; - } - else if (*p == '\0') { - /* Actually we cannot allow several \0 characters in string, so write to the log about it */ - msg_warn ("cannot separate command with \0 character, this can be an attack attempt"); - return NULL; - } - p++; - r++; + p = orig; + while ((!g_ascii_isspace (*p) && *p != '>') && p - in->str < (gint)in->len) { + p ++; + len ++; } - return NULL; + g_string_truncate (in, len); + + return in->str; } static gboolean -parse_check_command (struct worker_task *task, gchar *token) +rspamd_protocol_handle_url (struct worker_task *task, struct rspamd_http_message *msg) { GList *cur; struct custom_command *cmd; + const gchar *p; + + if (msg->url == NULL || msg->url->len == 0) { + task->last_error = "command is absent"; + task->error_code = 400; + return FALSE; + } + + if (msg->url->str[0] == '/') { + p = &msg->url->str[1]; + } + else { + p = msg->url->str; + } - switch (token[0]) { + switch (*p) { case 'c': case 'C': /* check */ - if (g_ascii_strcasecmp (token + 1, MSG_CMD_CHECK + 1) == 0) { + if (g_ascii_strcasecmp (p + 1, MSG_CMD_CHECK + 1) == 0) { task->cmd = CMD_CHECK; } else { - debug_task ("bad command: %s", token); - return FALSE; + goto err; } break; case 's': case 'S': /* symbols, skip */ - if (g_ascii_strcasecmp (token + 1, MSG_CMD_SYMBOLS + 1) == 0) { + if (g_ascii_strcasecmp (p + 1, MSG_CMD_SYMBOLS + 1) == 0) { task->cmd = CMD_SYMBOLS; } - else if (g_ascii_strcasecmp (token + 1, MSG_CMD_SKIP + 1) == 0) { + else if (g_ascii_strcasecmp (p + 1, MSG_CMD_SKIP + 1) == 0) { task->cmd = CMD_SKIP; } else { - debug_task ("bad command: %s", token); - return FALSE; + goto err; } break; case 'p': case 'P': /* ping, process */ - if (g_ascii_strcasecmp (token + 1, MSG_CMD_PING + 1) == 0) { + if (g_ascii_strcasecmp (p + 1, MSG_CMD_PING + 1) == 0) { task->cmd = CMD_PING; } - else if (g_ascii_strcasecmp (token + 1, MSG_CMD_PROCESS + 1) == 0) { + else if (g_ascii_strcasecmp (p + 1, MSG_CMD_PROCESS + 1) == 0) { task->cmd = CMD_PROCESS; } else { - debug_task ("bad command: %s", token); - return FALSE; + goto err; } break; case 'r': case 'R': /* report, report_ifspam */ - if (g_ascii_strcasecmp (token + 1, MSG_CMD_REPORT + 1) == 0) { + if (g_ascii_strcasecmp (p + 1, MSG_CMD_REPORT + 1) == 0) { task->cmd = CMD_REPORT; } - else if (g_ascii_strcasecmp (token + 1, MSG_CMD_REPORT_IFSPAM + 1) == 0) { + else if (g_ascii_strcasecmp (p + 1, MSG_CMD_REPORT_IFSPAM + 1) == 0) { task->cmd = CMD_REPORT_IFSPAM; } else { - debug_task ("bad command: %s", token); - return FALSE; - } - break; - case 'l': - case 'L': - if (g_ascii_strcasecmp (token + 1, MSG_CMD_LEARN + 1) == 0) { - if (task->allow_learn) { - task->cmd = CMD_LEARN; - } - else { - msg_info ("learning is disabled"); - return FALSE; - } - } - else { - debug_task ("bad command: %s", token); - return FALSE; + goto err; } break; default: cur = custom_commands; while (cur) { cmd = cur->data; - if (g_ascii_strcasecmp (token, cmd->name) == 0) { + if (g_ascii_strcasecmp (p, cmd->name) == 0) { task->cmd = CMD_OTHER; task->custom_cmd = cmd; break; @@ -260,402 +208,191 @@ parse_check_command (struct worker_task *task, gchar *token) } if (cur == NULL) { - debug_task ("bad command: %s", token); - return FALSE; + goto err; } break; } return TRUE; + +err: + debug_task ("bad command: %s", p); + task->last_error = "invalid command"; + task->error_code = 400; + return FALSE; } static gboolean -parse_rspamc_command (struct worker_task *task, f_str_t * line) +rspamd_protocol_handle_headers (struct worker_task *task, struct rspamd_http_message *msg) { - gchar *token; + gchar *headern, *err, *tmp; + gboolean res = TRUE; + struct rspamd_http_header *h; - /* Separate line */ - token = separate_command (line, ' '); - if (line == NULL || token == NULL) { - debug_task ("bad command"); - return FALSE; - } + LL_FOREACH (msg->headers, h) { + headern = h->name->str; - if (!parse_check_command (task, token)) { - return FALSE; - } - - if (g_ascii_strncasecmp (line->begin, RSPAMC_GREETING, sizeof (RSPAMC_GREETING) - 1) == 0) { - task->proto = RSPAMC_PROTO; - task->proto_ver = 10; - if (*(line->begin + sizeof (RSPAMC_GREETING) - 1) == '/') { - /* Extract protocol version */ - token = line->begin + sizeof (RSPAMC_GREETING); - if (strncmp (token, RSPAMC_PROTO_1_1, sizeof (RSPAMC_PROTO_1_1) - 1) == 0) { - task->proto_ver = 11; + switch (headern[0]) { + case 'd': + case 'D': + if (g_ascii_strcasecmp (headern, DELIVER_TO_HEADER) == 0) { + task->deliver_to = rspamd_protocol_escape_braces (h->value); + debug_task ("read deliver-to header, value: %s", task->deliver_to); } - else if (strncmp (token, RSPAMC_PROTO_1_2, sizeof (RSPAMC_PROTO_1_2) - 1) == 0) { - task->proto_ver = 12; + else { + debug_task ("wrong header: %s", headern); + res = FALSE; } - else if (strncmp (token, RSPAMC_PROTO_1_3, sizeof (RSPAMC_PROTO_1_3) - 1) == 0) { - task->proto_ver = 13; + break; + case 'h': + case 'H': + if (g_ascii_strcasecmp (headern, HELO_HEADER) == 0) { + task->helo = h->value->str; + debug_task ("read helo header, value: %s", task->helo); } - } - } - else if (g_ascii_strncasecmp (line->begin, SPAMC_GREETING, sizeof (SPAMC_GREETING) - 1) == 0) { - task->proto = SPAMC_PROTO; - task->proto_ver = 12; - } - else { - return FALSE; - } - - task->state = READ_HEADER; - - return TRUE; -} - -static gboolean -parse_http_command (struct worker_task *task, f_str_t * line) -{ - guint8 *p, *end, *c; - gint state = 0, next_state = 0; - gchar *cmd; - - p = line->begin; - c = p; - end = p + line->len; - task->proto = RSPAMC_PROTO; - - while (p < end) { - switch (state) { - case 0: - /* Expect GET or POST here */ - if ((end - p > 3 && - (*p == 'G' || *p == 'g') && - (p[1] == 'E' || p[1] == 'e') && - (p[2] == 'T' || p[2] == 't')) || - (end - p > 4 && - (*p == 'P' || *p == 'p') && - (p[1] == 'O' || p[1] == 'o') && - (p[2] == 'S' || p[2] == 's') && - (p[3] == 'T' || p[3] == 't'))) { - state = 99; - next_state = 1; - p += (*p == 'g' || *p == 'G') ? 3 : 4; + else if (g_ascii_strcasecmp (headern, HOSTNAME_HEADER) == 0) { + task->hostname = h->value->str; + debug_task ("read hostname header, value: %s", task->hostname); } else { - msg_info ("invalid HTTP request: %V", line); - return FALSE; + debug_task ("wrong header: %s", headern); + res = FALSE; } break; - case 1: - /* Get command or path */ - if (!g_ascii_isspace (*p)) { - p ++; + case 'f': + case 'F': + if (g_ascii_strcasecmp (headern, FROM_HEADER) == 0) { + task->from = rspamd_protocol_escape_braces (h->value); + debug_task ("read from header, value: %s", task->from); } else { - /* Copy command */ - cmd = memory_pool_alloc (task->task_pool, p - c + 1); - rspamd_strlcpy (cmd, c, p - c + 1); - /* Skip the first '/' */ - if (*cmd == '/') { - cmd ++; - } - if (!parse_check_command (task, cmd)) { - /* Assume that command is symbols */ - task->cmd = CMD_SYMBOLS; - } - state = 99; - next_state = 2; + debug_task ("wrong header: %s", headern); + res = FALSE; } break; - case 2: - /* Get HTTP/1.0 or HTTP/1.1 */ - if (p == end - 1) { - /* We are at the end */ - if (g_ascii_strncasecmp (c, "HTTP/1.0", sizeof ("HTTP/1.0") - 1) == 0 || - g_ascii_strncasecmp (c, "HTTP/1.1", sizeof ("HTTP/1.1") - 1) == 0) { - task->state = READ_HEADER; - return TRUE; - } + case 'j': + case 'J': + if (g_ascii_strcasecmp (headern, JSON_HEADER) == 0) { + task->is_json = parse_flag (h->value->str); } else { - p ++; + debug_task ("wrong header: %s", headern); + res = FALSE; } break; - case 99: - /* Skip spaces */ - if (g_ascii_isspace (*p)) { - p ++; + case 'q': + case 'Q': + if (g_ascii_strcasecmp (headern, QUEUE_ID_HEADER) == 0) { + task->queue_id = h->value->str; + debug_task ("read queue_id header, value: %s", task->queue_id); } else { - state = next_state; - c = p; + debug_task ("wrong header: %s", headern); + res = FALSE; } break; - } - } - - return FALSE; -} - -static gboolean -parse_command (struct worker_task *task, f_str_t * line) -{ - task->proto_ver = 11; - - if (! task->is_http) { - return parse_rspamc_command (task, line); - } - else { - return parse_http_command (task, line); - } - - /* Unreached */ - return FALSE; -} - -static gboolean -parse_header (struct worker_task *task, f_str_t * line) -{ - gchar *headern, *err, *tmp; - gboolean res = TRUE; - - /* Check end of headers */ - if (line->len == 0) { - debug_task ("got empty line, assume it as end of headers"); - if (task->cmd == CMD_PING || task->cmd == CMD_SKIP) { - task->state = WRITE_REPLY; - } - else { - if (task->content_length > 0) { - if (task->cmd == CMD_LEARN) { - if (task->statfile != NULL) { - rspamd_set_dispatcher_policy (task->dispatcher, BUFFER_CHARACTER, task->content_length); - task->state = READ_MESSAGE; - } - else { - task->last_error = "Unknown statfile"; - task->error_code = RSPAMD_STATFILE_ERROR; - task->state = WRITE_ERROR; - return FALSE; - } - } - else { - rspamd_set_dispatcher_policy (task->dispatcher, BUFFER_CHARACTER, task->content_length); - task->state = READ_MESSAGE; - task->msg = memory_pool_alloc0 (task->task_pool, sizeof (f_str_t)); - } + case 'r': + case 'R': + if (g_ascii_strcasecmp (headern, RCPT_HEADER) == 0) { + tmp = rspamd_protocol_escape_braces (h->value); + task->rcpt = g_list_prepend (task->rcpt, tmp); + debug_task ("read rcpt header, value: %s", tmp); } - else if (task->cmd != CMD_LEARN && task->cmd != CMD_OTHER) { - rspamd_set_dispatcher_policy (task->dispatcher, BUFFER_ANY, 0); - task->state = READ_MESSAGE; - task->msg = memory_pool_alloc0 (task->task_pool, sizeof (f_str_t)); + else if (g_ascii_strcasecmp (headern, NRCPT_HEADER) == 0) { + task->nrcpt = strtoul (h->value->str, &err, 10); + debug_task ("read rcpt header, value: %d", (gint)task->nrcpt); } else { - task->last_error = "Unknown content length"; - task->error_code = RSPAMD_LENGTH_ERROR; - task->state = WRITE_ERROR; - return FALSE; + msg_info ("wrong header: %s", headern); + res = FALSE; } - } - return TRUE; - } - - headern = separate_command (line, ':'); - - if (line == NULL || headern == NULL) { - return FALSE; - } - /* Eat whitespaces */ - g_strstrip (headern); - fstrstrip (line); - - switch (headern[0]) { - case 'c': - case 'C': - /* content-length */ - if (g_ascii_strncasecmp (headern, CONTENT_LENGTH_HEADER, sizeof (CONTENT_LENGTH_HEADER) - 1) == 0) { - if (task->content_length == 0) { - tmp = memory_pool_fstrdup (task->task_pool, line); - task->content_length = strtoul (tmp, &err, 10); - debug_task ("read Content-Length header, value: %ul", (guint32)task->content_length); - } - } - else { - msg_info ("wrong header: %s", headern); - res = FALSE; - } - break; - case 'd': - case 'D': - /* Deliver-To */ - if (g_ascii_strncasecmp (headern, DELIVER_TO_HEADER, sizeof (DELIVER_TO_HEADER) - 1) == 0) { - task->deliver_to = escape_braces_addr_fstr (task->task_pool, line); - debug_task ("read deliver-to header, value: %s", task->deliver_to); - } - else { - msg_info ("wrong header: %s", headern); - res = FALSE; - } - break; - case 'h': - case 'H': - /* helo */ - if (g_ascii_strncasecmp (headern, HELO_HEADER, sizeof (HELO_HEADER) - 1) == 0) { - task->helo = memory_pool_fstrdup (task->task_pool, line); - debug_task ("read helo header, value: %s", task->helo); - } - else if (g_ascii_strncasecmp (headern, HOSTNAME_HEADER, sizeof (HOSTNAME_HEADER) - 1) == 0) { - task->hostname = memory_pool_fstrdup (task->task_pool, line); - debug_task ("read hostname header, value: %s", task->hostname); - } - else { - msg_info ("wrong header: %s", headern); - res = FALSE; - } - break; - case 'f': - case 'F': - /* from */ - if (g_ascii_strncasecmp (headern, FROM_HEADER, sizeof (FROM_HEADER) - 1) == 0) { - task->from = escape_braces_addr_fstr (task->task_pool, line); - debug_task ("read from header, value: %s", task->from); - } - else { - msg_info ("wrong header: %s", headern); - res = FALSE; - } - break; - case 'j': - case 'J': - /* json */ - if (g_ascii_strncasecmp (headern, JSON_HEADER, sizeof (JSON_HEADER) - 1) == 0) { - task->is_json = parse_flag (memory_pool_fstrdup (task->task_pool, line)); - } - else { - msg_info ("wrong header: %s", headern); - res = FALSE; - } - break; - case 'q': - case 'Q': - /* Queue id */ - if (g_ascii_strncasecmp (headern, QUEUE_ID_HEADER, sizeof (QUEUE_ID_HEADER) - 1) == 0) { - task->queue_id = memory_pool_fstrdup (task->task_pool, line); - debug_task ("read queue_id header, value: %s", task->queue_id); - } - else { - msg_info ("wrong header: %s", headern); - res = FALSE; - } - break; - case 'r': - case 'R': - /* rcpt */ - if (g_ascii_strncasecmp (headern, RCPT_HEADER, sizeof (RCPT_HEADER) - 1) == 0) { - tmp = escape_braces_addr_fstr (task->task_pool, line); - task->rcpt = g_list_prepend (task->rcpt, tmp); - debug_task ("read rcpt header, value: %s", tmp); - } - else if (g_ascii_strncasecmp (headern, NRCPT_HEADER, sizeof (NRCPT_HEADER) - 1) == 0) { - tmp = memory_pool_fstrdup (task->task_pool, line); - task->nrcpt = strtoul (tmp, &err, 10); - debug_task ("read rcpt header, value: %d", (gint)task->nrcpt); - } - else { - msg_info ("wrong header: %s", headern); - res = FALSE; - } - break; - case 'i': - case 'I': - /* ip_addr */ - if (g_ascii_strncasecmp (headern, IP_ADDR_HEADER, sizeof (IP_ADDR_HEADER) - 1) == 0) { - tmp = memory_pool_fstrdup (task->task_pool, line); + break; + case 'i': + case 'I': + if (g_ascii_strcasecmp (headern, IP_ADDR_HEADER) == 0) { + tmp = h->value->str; #ifdef HAVE_INET_PTON - if (g_ascii_strncasecmp (tmp, "IPv6:", 5) == 0) { - if (inet_pton (AF_INET6, tmp + 6, &task->from_addr.d.in6) == 1) { - task->from_addr.ipv6 = TRUE; - } - else { - msg_err ("bad ip header: '%s'", tmp); - return FALSE; - } - task->from_addr.has_addr = TRUE; - } - else { - if (inet_pton (AF_INET, tmp, &task->from_addr.d.in4) != 1) { - /* Try ipv6 */ - if (inet_pton (AF_INET6, tmp, &task->from_addr.d.in6) == 1) { + if (g_ascii_strncasecmp (tmp, "IPv6:", 5) == 0) { + if (inet_pton (AF_INET6, tmp + 6, &task->from_addr.d.in6) == 1) { task->from_addr.ipv6 = TRUE; } else { msg_err ("bad ip header: '%s'", tmp); return FALSE; } + task->from_addr.has_addr = TRUE; } else { - task->from_addr.ipv6 = FALSE; + if (inet_pton (AF_INET, tmp, &task->from_addr.d.in4) != 1) { + /* Try ipv6 */ + if (inet_pton (AF_INET6, tmp, &task->from_addr.d.in6) == 1) { + task->from_addr.ipv6 = TRUE; + } + else { + msg_err ("bad ip header: '%s'", tmp); + return FALSE; + } + } + else { + task->from_addr.ipv6 = FALSE; + } + task->from_addr.has_addr = TRUE; } - task->from_addr.has_addr = TRUE; - } #else - if (!inet_aton (tmp, &task->from_addr)) { - msg_err ("bad ip header: '%s'", tmp); - return FALSE; - } + if (!inet_aton (tmp, &task->from_addr)) { + msg_err ("bad ip header: '%s'", tmp); + return FALSE; + } #endif - debug_task ("read IP header, value: %s", tmp); - } - else { - msg_info ("wrong header: %s", headern); - res = FALSE; - } - break; - case 'p': - case 'P': - /* Pass header */ - if (g_ascii_strncasecmp (headern, PASS_HEADER, sizeof (PASS_HEADER) - 1) == 0) { - if (line->len == sizeof ("all") - 1 && g_ascii_strncasecmp (line->begin, "all", sizeof ("all") - 1) == 0) { - task->pass_all_filters = TRUE; - msg_info ("pass all filters"); - } - } - else { - res = FALSE; - } - break; - case 's': - case 'S': - if (g_ascii_strncasecmp (headern, SUBJECT_HEADER, sizeof (SUBJECT_HEADER) - 1) == 0) { - task->subject = memory_pool_fstrdup (task->task_pool, line); - } - else if (g_ascii_strncasecmp (headern, STATFILE_HEADER, sizeof (STATFILE_HEADER) - 1) == 0) { - task->statfile = memory_pool_fstrdup (task->task_pool, line); - } - else { - res = FALSE; - } - break; - case 'u': - case 'U': - if (g_ascii_strncasecmp (headern, USER_HEADER, sizeof (USER_HEADER) - 1) == 0) { - task->user = memory_pool_fstrdup (task->task_pool, line); - } - else { + debug_task ("read IP header, value: %s", tmp); + } + else { + debug_task ("wrong header: %s", headern); + res = FALSE; + } + break; + case 'p': + case 'P': + if (g_ascii_strcasecmp (headern, PASS_HEADER) == 0) { + if (h->value->len == sizeof ("all") - 1 && + g_ascii_strcasecmp (h->value->str, "all") == 0) { + task->pass_all_filters = TRUE; + debug_task ("pass all filters"); + } + } + else { + res = FALSE; + } + break; + case 's': + case 'S': + if (g_ascii_strcasecmp (headern, SUBJECT_HEADER) == 0) { + task->subject = h->value->str; + } + else { + res = FALSE; + } + break; + case 'u': + case 'U': + if (g_ascii_strcasecmp (headern, USER_HEADER) == 0) { + task->user = h->value->str; + } + else { + res = FALSE; + } + break; + default: + debug_task ("wrong header: %s", headern); res = FALSE; + break; } - break; - default: - msg_info ("wrong header: %s", headern); - res = FALSE; - break; } if (!res && task->cfg->strict_protocol_headers) { msg_err ("deny processing of a request with incorrect or unknown headers"); + task->last_error = "invalid header"; + task->error_code = 400; return FALSE; } @@ -663,18 +400,13 @@ parse_header (struct worker_task *task, f_str_t * line) } gboolean -read_rspamd_input_line (struct worker_task *task, f_str_t * line) +rspamd_protocol_handle_request (struct worker_task *task, + struct rspamd_http_message *msg) { - switch (task->state) { - case READ_COMMAND: - return parse_command (task, line); - break; - case READ_HEADER: - return parse_header (task, line); - break; - default: - return FALSE; + if (rspamd_protocol_handle_url (task, msg)) { + return rspamd_protocol_handle_headers (task, msg); } + return FALSE; } @@ -878,7 +610,17 @@ rspamd_metric_result_ucl (struct worker_task *task, struct metric_result *mres, if (logbuf->str[logbuf->len - 1] == ',') { logbuf->len --; } - rspamd_printf_gstring (logbuf, "]), "); + +#ifdef HAVE_CLOCK_GETTIME + rspamd_printf_gstring (logbuf, "]), len: %z, time: %s, dns req: %d,", + task->msg->len, calculate_check_time (&task->tv, &task->ts, + task->cfg->clock_res, &task->scan_milliseconds), task->dns_requests); +#else + rspamd_printf_gstring (logbuf, "]), len: %z, time: %s, dns req: %d,", + task->msg->len, + calculate_check_time (&task->tv, task->cfg->clock_res, &task->scan_milliseconds), + task->dns_requests); +#endif return obj; } @@ -992,10 +734,12 @@ write_check_reply (struct rspamd_http_message *msg, struct worker_task *task) write_hashes_to_log (task, logbuf); msg_info ("%v", logbuf); + g_string_free (logbuf, TRUE); msg->body = g_string_sized_new (BUFSIZ); func.ud = msg->body; ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT, &func); + ucl_object_unref (top); /* Increase counters */ task->worker->srv->stat->messages_scanned++; @@ -1006,7 +750,7 @@ write_check_reply (struct rspamd_http_message *msg, struct worker_task *task) } gboolean -write_reply (struct worker_task *task) +rspamd_protocol_write_reply (struct worker_task *task) { struct rspamd_http_message *msg; @@ -1038,13 +782,6 @@ write_reply (struct worker_task *task) "text/plain", task, task->sock, &task->tv, task->ev_base); task->state = CLOSING_CONNECTION; break; - case CMD_LEARN: - msg->code = task->error_code; - rspamd_http_connection_write_message (task->http_conn, msg, NULL, - "text/plain", task, task->sock, &task->tv, task->ev_base); - task->state = CLOSING_CONNECTION; - return TRUE; - break; case CMD_OTHER: task->state = CLOSING_CONNECTION; return task->custom_cmd->func (task); diff --git a/src/protocol.h b/src/protocol.h index 72460940f..4b62fdfb2 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -8,6 +8,7 @@ #include "config.h" #include "filter.h" +#include "http.h" #define RSPAMD_FILTER_ERROR 1 #define RSPAMD_NETWORK_ERROR 2 @@ -15,27 +16,9 @@ #define RSPAMD_LENGTH_ERROR 4 #define RSPAMD_STATFILE_ERROR 5 -#define RSPAMC_PROTO_1_0 "1.0" -#define RSPAMC_PROTO_1_1 "1.1" -#define RSPAMC_PROTO_1_2 "1.2" -#define RSPAMC_PROTO_1_3 "1.3" - -/* - * Reply messages - */ -#define RSPAMD_REPLY_BANNER "RSPAMD" -#define SPAMD_REPLY_BANNER "SPAMD" -#define SPAMD_OK "EX_OK" -/* XXX: try to convert rspamd errors to spamd errors */ -#define SPAMD_ERROR "EX_ERROR" - struct worker_task; struct metric; -enum rspamd_protocol { - SPAMC_PROTO, - RSPAMC_PROTO, -}; enum rspamd_command { CMD_CHECK, @@ -45,8 +28,7 @@ enum rspamd_command { CMD_SKIP, CMD_PING, CMD_PROCESS, - CMD_LEARN, - CMD_OTHER, + CMD_OTHER }; @@ -58,27 +40,19 @@ struct custom_command { }; /** - * Find a character in command in and return pointer to the first part of the string, in is modified to point to the second part of string - * @param in f_str_t input - * @param c separator character - * @return pointer to the first part of string or NULL if there is no separator found - */ -gchar* separate_command (f_str_t * in, gchar c); - -/** - * Read one line of user's input for specified task - * @param task task object - * @param line line of user's input - * @return 0 if line was successfully parsed and -1 if we have protocol error + * Process HTTP request to the task structure + * @param task + * @param msg + * @return */ -gboolean read_rspamd_input_line (struct worker_task *task, f_str_t *line); +gboolean rspamd_protocol_handle_request (struct worker_task *task, struct rspamd_http_message *msg); /** * Write reply for specified task command * @param task task object * @return 0 if we wrote reply and -1 if there was some error */ -gboolean write_reply (struct worker_task *task) G_GNUC_WARN_UNUSED_RESULT; +gboolean rspamd_protocol_write_reply (struct worker_task *task); /** diff --git a/src/util.c b/src/util.c index 4a88bb1c9..65f3b106b 100644 --- a/src/util.c +++ b/src/util.c @@ -1409,29 +1409,6 @@ compare_url_func (gconstpointer a, gconstpointer b) return r; } -gchar * -escape_braces_addr_fstr (memory_pool_t *pool, f_str_t *in) -{ - gint len = 0; - gchar *res, *orig, *p; - - orig = in->begin; - while ((g_ascii_isspace (*orig) || *orig == '<') && orig - in->begin < (gint)in->len) { - orig ++; - } - - p = orig; - while ((!g_ascii_isspace (*p) && *p != '>') && p - in->begin < (gint)in->len) { - p ++; - len ++; - } - - res = memory_pool_alloc (pool, len + 1); - rspamd_strlcpy (res, orig, len + 1); - - return res; -} - /* * Find the first occurrence of find in s, ignore case. */ diff --git a/src/util.h b/src/util.h index 2951942e3..9b7891c05 100644 --- a/src/util.h +++ b/src/util.h @@ -219,11 +219,6 @@ gsize rspamd_strlcpy (gchar *dst, const gchar *src, gsize siz); gsize rspamd_strlcpy_tolower (gchar *dst, const gchar *src, gsize siz); /* - * Strip <> from email address - */ -gchar * escape_braces_addr_fstr (memory_pool_t *pool, f_str_t *in); - -/* * Convert milliseconds to timeval fields */ #define msec_to_tv(msec, tv) do { (tv)->tv_sec = (msec) / 1000; (tv)->tv_usec = ((msec) - (tv)->tv_sec * 1000) * 1000; } while(0) diff --git a/src/worker.c b/src/worker.c index f2c579fdc..4199a809f 100644 --- a/src/worker.c +++ b/src/worker.c @@ -89,8 +89,6 @@ struct rspamd_worker_ctx { struct event_base *ev_base; }; -static gboolean write_socket (void *arg); - static sig_atomic_t wanna_die = 0; #ifndef HAVE_SA_SIGINFO @@ -155,242 +153,6 @@ sigusr1_handler (gint fd, short what, void *arg) return; } -# if 0 -/* - * Callback that is called when there is data to read in buffer - */ -static gboolean -read_socket (f_str_t * in, void *arg) -{ - struct worker_task *task = (struct worker_task *) arg; - struct rspamd_worker_ctx *ctx; - ssize_t r; - GError *err = NULL; - - ctx = task->worker->ctx; - switch (task->state) { - case READ_COMMAND: - case READ_HEADER: - if (!read_rspamd_input_line (task, in)) { - if (!task->last_error) { - task->last_error = "Read error"; - task->error_code = RSPAMD_NETWORK_ERROR; - } - task->state = WRITE_ERROR; - } - if (task->state == WRITE_REPLY || task->state == WRITE_ERROR) { - return write_socket (task); - } - break; - case READ_MESSAGE: - /* Allow half-closed connections to be proceed */ - - debug_task ("got string of length %z", task->msg->len); - if (task->content_length > 0) { - task->msg->begin = in->begin; - task->msg->len = in->len; - task->state = WAIT_FILTER; - task->dispatcher->want_read = FALSE; - } - else { - task->dispatcher->want_read = FALSE; - if (in->len > 0) { - if (task->msg->begin == NULL) { - /* Allocate buf */ - task->msg->size = MAX (BUFSIZ, in->len); - task->msg->begin = g_malloc (task->msg->size); - memcpy (task->msg->begin, in->begin, in->len); - task->msg->len = in->len; - } - else if (task->msg->size >= task->msg->len + in->len) { - memcpy (task->msg->begin + task->msg->len, in->begin, in->len); - task->msg->len += in->len; - } - else { - /* Need to realloc */ - task->msg->size = MAX (task->msg->size * 2, task->msg->size + in->len); - task->msg->begin = g_realloc (task->msg->begin, task->msg->size); - memcpy (task->msg->begin + task->msg->len, in->begin, in->len); - task->msg->len += in->len; - } - /* Want more */ - return TRUE; - } - else if (task->msg->len > 0) { - memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_free, task->msg->begin); - } - else { - msg_warn ("empty message passed"); - task->last_error = "MIME processing error"; - task->error_code = RSPAMD_FILTER_ERROR; - task->state = WRITE_ERROR; - return write_socket (task); - } - } - - r = process_message (task); - if (r == -1) { - msg_warn ("processing of message failed"); - task->last_error = "MIME processing error"; - task->error_code = RSPAMD_FILTER_ERROR; - task->state = WRITE_ERROR; - return write_socket (task); - } - if (task->cmd == CMD_OTHER) { - /* Skip filters */ - task->state = WRITE_REPLY; - return write_socket (task); - } - else if (task->cmd == CMD_LEARN) { - if (!learn_task (task->statfile, task, &err)) { - task->last_error = memory_pool_strdup (task->task_pool, err->message); - task->error_code = err->code; - g_error_free (err); - task->state = WRITE_ERROR; - } - else { - task->last_error = "learn ok"; - task->error_code = 0; - task->state = WRITE_REPLY; - } - return write_socket (task); - } - else { - if (task->cfg->pre_filters == NULL) { - r = process_filters (task); - if (r == -1) { - task->last_error = "Filter processing error"; - task->error_code = RSPAMD_FILTER_ERROR; - task->state = WRITE_ERROR; - return write_socket (task); - } - /* Add task to classify to classify pool */ - if (!task->is_skipped && ctx->classify_pool) { - register_async_thread (task->s); - g_thread_pool_push (ctx->classify_pool, task, &err); - if (err != NULL) { - msg_err ("cannot pull task to the pool: %s", err->message); - remove_async_thread (task->s); - } - } - if (task->is_skipped) { - /* Call write_socket to write reply and exit */ - return write_socket (task); - } - } - else { - lua_call_pre_filters (task); - /* We want fin_task after pre filters are processed */ - task->s->wanna_die = TRUE; - task->state = WAIT_PRE_FILTER; - check_session_pending (task->s); - } - } - break; - case WRITE_REPLY: - case WRITE_ERROR: - return write_socket (task); - break; - case WAIT_FILTER: - case WAIT_POST_FILTER: - case WAIT_PRE_FILTER: - msg_info ("ignoring trailing garbadge of size %z", in->len); - break; - default: - debug_task ("invalid state on reading stage"); - break; - } - - return TRUE; -} - -/* - * Callback for socket writing - */ -static gboolean -write_socket (void *arg) -{ - struct worker_task *task = (struct worker_task *) arg; - struct rspamd_worker_ctx *ctx; - GError *err = NULL; - gint r; - - ctx = task->worker->ctx; - - switch (task->state) { - case WRITE_REPLY: - task->state = WRITING_REPLY; - if (!write_reply (task)) { - return FALSE; - } - destroy_session (task->s); - return FALSE; - break; - case WRITE_ERROR: - task->state = WRITING_REPLY; - if (!write_reply (task)) { - return FALSE; - } - destroy_session (task->s); - return FALSE; - break; - case CLOSING_CONNECTION: - debug_task ("normally closing connection"); - destroy_session (task->s); - return FALSE; - break; - case WRITING_REPLY: - case WAIT_FILTER: - case WAIT_POST_FILTER: - /* Do nothing here */ - break; - case WAIT_PRE_FILTER: - task->state = WAIT_FILTER; - r = process_filters (task); - if (r == -1) { - task->last_error = "Filter processing error"; - task->error_code = RSPAMD_FILTER_ERROR; - task->state = WRITE_ERROR; - return write_socket (task); - } - /* Add task to classify to classify pool */ - if (!task->is_skipped && ctx->classify_pool) { - register_async_thread (task->s); - g_thread_pool_push (ctx->classify_pool, task, &err); - if (err != NULL) { - msg_err ("cannot pull task to the pool: %s", err->message); - remove_async_thread (task->s); - } - } - if (task->is_skipped) { - /* Call write_socket again to write reply and exit */ - return write_socket (task); - } - break; - default: - msg_info ("abnormally closing connection at state: %d", task->state); - destroy_session (task->s); - return FALSE; - break; - } - return TRUE; -} - -/* - * Called if something goes wrong - */ -static void -err_socket (GError * err, void *arg) -{ - struct worker_task *task = (struct worker_task *) arg; - - msg_info ("abnormally closing connection from: %s, error: %s", inet_ntoa (task->client_addr), err->message); - /* Free buffers */ - g_error_free (err); - destroy_session (task->s); -} -#endif - /* * Called if all filters are processed */ @@ -410,7 +172,7 @@ fin_task (void *arg) task->fin_callback (task->fin_arg); } else { - write_reply (task); + rspamd_protocol_write_reply (task); } return TRUE; } @@ -443,7 +205,7 @@ fin_task (void *arg) task->fin_callback (task->fin_arg); } else { - write_reply (task); + rspamd_protocol_write_reply (task); } } else { @@ -455,7 +217,7 @@ fin_task (void *arg) task->fin_callback (task->fin_arg); } else { - write_reply (task); + rspamd_protocol_write_reply (task); } } else { @@ -465,7 +227,7 @@ fin_task (void *arg) task->last_error = "Filter processing error"; task->error_code = RSPAMD_FILTER_ERROR; task->state = WRITE_ERROR; - write_reply (task); + rspamd_protocol_write_reply (task); } /* Add task to classify to classify pool */ if (!task->is_skipped && ctx->classify_pool) { @@ -478,7 +240,7 @@ fin_task (void *arg) } } if (task->is_skipped) { - write_reply (task); + rspamd_protocol_write_reply (task); } } } @@ -510,7 +272,7 @@ reduce_tasks_count (gpointer arg) (*tasks) --; } -static gboolean +static gint rspamd_worker_body_handler (struct rspamd_http_connection *conn, struct rspamd_http_message *msg, const gchar *chunk, gsize len) @@ -524,7 +286,12 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, if (msg->body->len == 0) { msg_err ("got zero length body, cannot continue"); - return FALSE; + return 0; + } + + if (!rspamd_protocol_handle_request (task, msg)) { + task->state = WRITE_ERROR; + return 0; } task->msg = msg->body; @@ -537,26 +304,12 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, task->last_error = "MIME processing error"; task->error_code = RSPAMD_FILTER_ERROR; task->state = WRITE_ERROR; - return FALSE; + return 0; } if (task->cmd == CMD_OTHER) { /* Skip filters */ task->state = WRITE_REPLY; - return FALSE; - } - else if (task->cmd == CMD_LEARN) { - if (!learn_task (task->statfile, task, &err)) { - task->last_error = memory_pool_strdup (task->task_pool, err->message); - task->error_code = err->code; - g_error_free (err); - task->state = WRITE_ERROR; - } - else { - task->last_error = "learn ok"; - task->error_code = 0; - task->state = WRITE_REPLY; - } - return FALSE; + return 0; } else { if (task->cfg->pre_filters == NULL) { @@ -565,7 +318,7 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, task->last_error = "Filter processing error"; task->error_code = RSPAMD_FILTER_ERROR; task->state = WRITE_ERROR; - return FALSE; + return 0; } /* Add task to classify to classify pool */ if (!task->is_skipped && ctx->classify_pool) { @@ -578,7 +331,8 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, } if (task->is_skipped) { /* Call write_socket to write reply and exit */ - return TRUE; + task->state = WRITE_REPLY; + return 0; } } else { @@ -589,7 +343,7 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, check_session_pending (task->s); } } - return TRUE; + return 0; } static void @@ -611,6 +365,9 @@ rspamd_worker_finish_handler (struct rspamd_http_connection *conn, msg_debug ("normally closing connection from: %s", inet_ntoa (task->client_addr)); destroy_session (task->s); } + else { + check_session_pending (task->s); + } } /* @@ -667,7 +424,6 @@ accept_socket (gint fd, short what, void *arg) new_task->sock = nfd; new_task->is_mime = ctx->is_mime; new_task->is_json = ctx->is_json; - new_task->is_http = ctx->is_http; new_task->allow_learn = ctx->allow_learn; worker->srv->stat->connections_count++; @@ -779,7 +535,7 @@ start_worker (struct rspamd_worker *worker) event_base_loop (ctx->ev_base, 0); - + g_mime_shutdown (); close_log (rspamd_main->logger); exit (EXIT_SUCCESS); } |