]> source.dussan.org Git - rspamd.git/commitdiff
Parse HTTP requests, cleanup the code.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 18 Jan 2014 18:20:54 +0000 (18:20 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 18 Jan 2014 18:20:54 +0000 (18:20 +0000)
--HG--
extra : rebase_source : 6b35fbf55fc9fe65d7f033620670bb210928e9b4

src/cfg_utils.c
src/controller.c
src/main.h
src/plugins/surbl.c
src/protocol.c
src/protocol.h
src/util.c
src/util.h
src/worker.c

index 9216fb6ef3f2940b12863bc919c0d4361704ab95..3e1503db7feaa118a3e928dd7e6a6866b56059dd 100644 (file)
@@ -892,11 +892,13 @@ read_rspamd_config (struct config_file *cfg, const gchar *filename,
                rspamd_ucl_add_conf_macros (parser, cfg);
                if (!ucl_parser_add_chunk (parser, data, st.st_size)) {
                        msg_err ("ucl parser error: %s", ucl_parser_get_error (parser));
+                       ucl_parser_free (parser);
                        munmap (data, st.st_size);
                        return FALSE;
                }
                munmap (data, st.st_size);
                cfg->rcl_obj = ucl_parser_get_object (parser);
+               ucl_parser_free (parser);
                res = TRUE;
        }
 
index a43b1420326a254b7e7a6c6c146cc5a837d8e7ec..097ed0e7c69dc1ffc40e81038468bb53c071cd46 100644 (file)
@@ -1226,7 +1226,8 @@ process_header (f_str_t *line, struct controller_session *session)
        struct rspamd_controller_ctx    *ctx = session->worker->ctx;
        controller_func_t                                custom_handler;
 
-       headern = separate_command (line, ':');
+       /* XXX: temporary workaround */
+       headern = NULL;
 
        if (line == NULL || headern == NULL) {
                msg_warn ("bad header: %V", line);
index 81cdfb9f1739cde76697814ede89b49f27a5b063..ea1172468c4ba2d4363fc9e8275d47fc8fc6caa3 100644 (file)
@@ -187,15 +187,11 @@ struct worker_task {
                CLOSING_CONNECTION,
                WRITING_REPLY
        } state;                                                                                                        /**< current session state                                                      */
-       size_t content_length;                                                                          /**< length of user's input                                                     */
-       enum rspamd_protocol proto;                                                                     /**< protocol (rspamc or spamc)                                         */
-       guint proto_ver;                                                                                        /**< protocol version                                                           */
        enum rspamd_command cmd;                                                                        /**< command                                                                            */
        struct custom_command *custom_cmd;                                                      /**< custom command if any                                                      */      
        gint sock;                                                                                                      /**< socket descriptor                                                          */
        gboolean is_mime;                                           /**< if this task is mime task                      */
        gboolean is_json;                                                                                       /**< output is JSON                                                                     */
-       gboolean is_http;                                                                                       /**< output is HTTP                                                                     */
        gboolean allow_learn;                                                                           /**< allow learning                                                                     */
        gboolean is_skipped;                                        /**< whether message was skipped by configuration   */
 
@@ -203,7 +199,7 @@ struct worker_task {
        gchar *from;                                                                                                    /**< from header value                                                          */
        gchar *queue_id;                                                                                                /**< queue id if specified                                                      */
        const gchar *message_id;                                                                                /**< message id                                                                         */
-       GList *rcpt;                                                                                            /**< recipients list                                                            */
+       GList *rcpt;                                                                                                    /**< recipients list                                                            */
        guint nrcpt;                                                                                    /**< number of recipients                                                       */
 #ifdef HAVE_INET_PTON
        struct {
@@ -222,7 +218,6 @@ struct worker_task {
        gchar *user;                                                                                                    /**< user to deliver                                                            */
        gchar *subject;                                                                                         /**< subject (for non-mime)                                                     */
        gchar *hostname;                                                                                        /**< hostname reported by MTA                                           */
-       gchar *statfile;                                                                                        /**< statfile for learning                                                      */
        GString *msg;                                                                                           /**< message buffer                                                                     */
        rspamd_io_dispatcher_t *dispatcher;                                                     /**< IO dispatcher object                                                       */
        struct rspamd_http_connection *http_conn;                                       /**< HTTP server connection                                                     */
index 397a26ba4d79f8e95b27b707e44c8c8ddc581f0e..b256c8e1e1c3f4d946fe60d41287ed5f029d45a9 100644 (file)
 
 static struct surbl_ctx        *surbl_module_ctx = NULL;
 
-static gint                     surbl_filter (struct worker_task *task);
-static void                     surbl_test_url (struct worker_task *task, void *user_data);
-static void                     dns_callback (struct rspamd_dns_reply *reply, gpointer arg);
-static void                     process_dns_results (struct worker_task *task, struct suffix_item *suffix, gchar *url, guint32 addr);
-static gint                     urls_command_handler (struct worker_task *task);
+static void surbl_test_url (struct worker_task *task, void *user_data);
+static void dns_callback (struct rspamd_dns_reply *reply, gpointer arg);
+static void process_dns_results (struct worker_task *task,
+               struct suffix_item *suffix, gchar *url, guint32 addr);
 
 
 #define NO_REGEXP (gpointer)-1
@@ -213,7 +212,6 @@ surbl_module_init (struct config_file *cfg, struct module_ctx **ctx)
 {
        surbl_module_ctx = g_malloc (sizeof (struct surbl_ctx));
 
-       surbl_module_ctx->filter = surbl_filter;
        surbl_module_ctx->use_redirector = 0;
        surbl_module_ctx->suffixes = NULL;
        surbl_module_ctx->surbl_pool = memory_pool_new (memory_pool_get_size ());
@@ -237,8 +235,6 @@ surbl_module_init (struct config_file *cfg, struct module_ctx **ctx)
 
        *ctx = (struct module_ctx *)surbl_module_ctx;
 
-       register_protocol_command ("urls", urls_command_handler);
-
        return 0;
 }
 
@@ -449,7 +445,6 @@ surbl_module_reconfig (struct config_file *cfg)
        /* Delete pool and objects */
        memory_pool_delete (surbl_module_ctx->surbl_pool);
        /* Reinit module */
-       surbl_module_ctx->filter = surbl_filter;
        surbl_module_ctx->use_redirector = 0;
        surbl_module_ctx->suffixes = NULL;
        surbl_module_ctx->surbl_pool = memory_pool_new (memory_pool_get_size ());
@@ -996,17 +991,10 @@ surbl_test_url (struct worker_task *task, void *user_data)
        param.suffix = suffix;
        g_tree_foreach (task->urls, surbl_tree_url_callback, &param);
 }
-
-static gint
-surbl_filter (struct worker_task *task)
-{
-       /* XXX: remove this shit */
-       return 0;
-}
-
 /*
  * Handlers of URLS command
  */
+#if 0
 struct urls_tree_cb_data {
        gchar                          *buf;
        gsize                           len;
@@ -1086,7 +1074,7 @@ urls_command_handler (struct worker_task *task)
 
        return TRUE;
 }
-
+#endif
 /*
  * vi:ts=4 
  */
index 02ba94e394074a90333e1923b39a20bee00329e2..3351a51951f83f086654512a1c22acc80602081c 100644 (file)
 
 static GList                   *custom_commands = NULL;
 
-/* XXX: remove this legacy sometimes */
-static const gchar *
-str_action_metric_spamc (enum rspamd_metric_action action)
-{
-       switch (action) {
-       case METRIC_ACTION_REJECT:
-               return "reject";
-       case METRIC_ACTION_SOFT_REJECT:
-               return "soft reject";
-       case METRIC_ACTION_REWRITE_SUBJECT:
-               return "rewrite subject";
-       case METRIC_ACTION_ADD_HEADER:
-               return "add header";
-       case METRIC_ACTION_GREYLIST:
-               return "greylist";
-       case METRIC_ACTION_NOACTION:
-               return "no action";
-       case METRIC_ACTION_MAX:
-               return "invalid max action";
-       }
 
-       return "unknown action";
-}
-
-static inline const gchar *
-rspamc_proto_str (guint ver)
+/*
+ * Remove <> from the fixed string and copy it to the pool
+ */
+static gchar *
+rspamd_protocol_escape_braces (GString *in)
 {
+       gint                          len = 0;
+       gchar                        *orig, *p;
 
-       if (G_LIKELY (ver == 12)) {
-               return "1.2";
+       orig = in->str;
+       while ((g_ascii_isspace (*orig) || *orig == '<') && orig - in->str < (gint)in->len) {
+               orig ++;
        }
-       else if (G_UNLIKELY (ver == 11)) {
-               return "1.1";
-       }
-       else if (G_UNLIKELY (ver == 13)) {
-               return "1.3";
-       }
-       else if (G_UNLIKELY (ver == 14)) {
-               return "1.4";
-       }
-       else if (G_UNLIKELY (ver == 15)) {
-               return "1.5";
-       }
-       else {
-               return "1.0";
-       }
-}
 
-gchar                    *
-separate_command (f_str_t * in, gchar c)
-{
-       guint                            r = 0;
-       gchar                           *p = in->begin, *b;
-       b = p;
+       g_string_erase (in, 0, orig - in->str);
 
-       while (r < in->len) {
-               if (*p == c) {
-                       *p = '\0';
-                       in->begin = p + 1;
-                       in->len -= r + 1;
-                       return b;
-               }
-               else if (*p == '\0') {
-                       /* Actually we cannot allow several \0 characters in string, so write to the log about it */
-                       msg_warn ("cannot separate command with \0 character, this can be an attack attempt");
-                       return NULL;
-               }
-               p++;
-               r++;
+       p = orig;
+       while ((!g_ascii_isspace (*p) && *p != '>') && p - in->str < (gint)in->len) {
+               p ++;
+               len ++;
        }
 
-       return NULL;
+       g_string_truncate (in, len);
+
+       return in->str;
 }
 
 static gboolean
-parse_check_command (struct worker_task *task, gchar *token)
+rspamd_protocol_handle_url (struct worker_task *task, struct rspamd_http_message *msg)
 {
        GList                          *cur;
        struct custom_command          *cmd;
+       const gchar *p;
+
+       if (msg->url == NULL || msg->url->len == 0) {
+               task->last_error = "command is absent";
+               task->error_code = 400;
+               return FALSE;
+       }
+
+       if (msg->url->str[0] == '/') {
+               p = &msg->url->str[1];
+       }
+       else {
+               p = msg->url->str;
+       }
 
-       switch (token[0]) {
+       switch (*p) {
        case 'c':
        case 'C':
                /* check */
-               if (g_ascii_strcasecmp (token + 1, MSG_CMD_CHECK + 1) == 0) {
+               if (g_ascii_strcasecmp (p + 1, MSG_CMD_CHECK + 1) == 0) {
                        task->cmd = CMD_CHECK;
                }
                else {
-                       debug_task ("bad command: %s", token);
-                       return FALSE;
+                       goto err;
                }
                break;
        case 's':
        case 'S':
                /* symbols, skip */
-               if (g_ascii_strcasecmp (token + 1, MSG_CMD_SYMBOLS + 1) == 0) {
+               if (g_ascii_strcasecmp (p + 1, MSG_CMD_SYMBOLS + 1) == 0) {
                        task->cmd = CMD_SYMBOLS;
                }
-               else if (g_ascii_strcasecmp (token + 1, MSG_CMD_SKIP + 1) == 0) {
+               else if (g_ascii_strcasecmp (p + 1, MSG_CMD_SKIP + 1) == 0) {
                        task->cmd = CMD_SKIP;
                }
                else {
-                       debug_task ("bad command: %s", token);
-                       return FALSE;
+                       goto err;
                }
                break;
        case 'p':
        case 'P':
                /* ping, process */
-               if (g_ascii_strcasecmp (token + 1, MSG_CMD_PING + 1) == 0) {
+               if (g_ascii_strcasecmp (p + 1, MSG_CMD_PING + 1) == 0) {
                        task->cmd = CMD_PING;
                }
-               else if (g_ascii_strcasecmp (token + 1, MSG_CMD_PROCESS + 1) == 0) {
+               else if (g_ascii_strcasecmp (p + 1, MSG_CMD_PROCESS + 1) == 0) {
                        task->cmd = CMD_PROCESS;
                }
                else {
-                       debug_task ("bad command: %s", token);
-                       return FALSE;
+                       goto err;
                }
                break;
        case 'r':
        case 'R':
                /* report, report_ifspam */
-               if (g_ascii_strcasecmp (token + 1, MSG_CMD_REPORT + 1) == 0) {
+               if (g_ascii_strcasecmp (p + 1, MSG_CMD_REPORT + 1) == 0) {
                        task->cmd = CMD_REPORT;
                }
-               else if (g_ascii_strcasecmp (token + 1, MSG_CMD_REPORT_IFSPAM + 1) == 0) {
+               else if (g_ascii_strcasecmp (p + 1, MSG_CMD_REPORT_IFSPAM + 1) == 0) {
                        task->cmd = CMD_REPORT_IFSPAM;
                }
                else {
-                       debug_task ("bad command: %s", token);
-                       return FALSE;
-               }
-               break;
-       case 'l':
-       case 'L':
-               if (g_ascii_strcasecmp (token + 1, MSG_CMD_LEARN + 1) == 0) {
-                       if (task->allow_learn) {
-                               task->cmd = CMD_LEARN;
-                       }
-                       else {
-                               msg_info ("learning is disabled");
-                               return FALSE;
-                       }
-               }
-               else {
-                       debug_task ("bad command: %s", token);
-                       return FALSE;
+                       goto err;
                }
                break;
        default:
                cur = custom_commands;
                while (cur) {
                        cmd = cur->data;
-                       if (g_ascii_strcasecmp (token, cmd->name) == 0) {
+                       if (g_ascii_strcasecmp (p, cmd->name) == 0) {
                                task->cmd = CMD_OTHER;
                                task->custom_cmd = cmd;
                                break;
@@ -260,402 +208,191 @@ parse_check_command (struct worker_task *task, gchar *token)
                }
 
                if (cur == NULL) {
-                       debug_task ("bad command: %s", token);
-                       return FALSE;
+                       goto err;
                }
                break;
        }
 
        return TRUE;
+
+err:
+       debug_task ("bad command: %s", p);
+       task->last_error = "invalid command";
+       task->error_code = 400;
+       return FALSE;
 }
 
 static gboolean
-parse_rspamc_command (struct worker_task *task, f_str_t * line)
+rspamd_protocol_handle_headers (struct worker_task *task, struct rspamd_http_message *msg)
 {
-       gchar                          *token;
+       gchar                           *headern, *err, *tmp;
+       gboolean                         res = TRUE;
+       struct rspamd_http_header      *h;
 
-       /* Separate line */
-       token = separate_command (line, ' ');
-       if (line == NULL || token == NULL) {
-               debug_task ("bad command");
-               return FALSE;
-       }
+       LL_FOREACH (msg->headers, h) {
+               headern = h->name->str;
 
-       if (!parse_check_command (task, token)) {
-               return FALSE;
-       }
-
-       if (g_ascii_strncasecmp (line->begin, RSPAMC_GREETING, sizeof (RSPAMC_GREETING) - 1) == 0) {
-               task->proto = RSPAMC_PROTO;
-               task->proto_ver = 10;
-               if (*(line->begin + sizeof (RSPAMC_GREETING) - 1) == '/') {
-                       /* Extract protocol version */
-                       token = line->begin + sizeof (RSPAMC_GREETING);
-                       if (strncmp (token, RSPAMC_PROTO_1_1, sizeof (RSPAMC_PROTO_1_1) - 1) == 0) {
-                               task->proto_ver = 11;
+               switch (headern[0]) {
+               case 'd':
+               case 'D':
+                       if (g_ascii_strcasecmp (headern, DELIVER_TO_HEADER) == 0) {
+                               task->deliver_to = rspamd_protocol_escape_braces (h->value);
+                               debug_task ("read deliver-to header, value: %s", task->deliver_to);
                        }
-                       else if (strncmp (token, RSPAMC_PROTO_1_2, sizeof (RSPAMC_PROTO_1_2) - 1) == 0) {
-                               task->proto_ver = 12;
+                       else {
+                               debug_task ("wrong header: %s", headern);
+                               res = FALSE;
                        }
-                       else if (strncmp (token, RSPAMC_PROTO_1_3, sizeof (RSPAMC_PROTO_1_3) - 1) == 0) {
-                               task->proto_ver = 13;
+                       break;
+               case 'h':
+               case 'H':
+                       if (g_ascii_strcasecmp (headern, HELO_HEADER) == 0) {
+                               task->helo = h->value->str;
+                               debug_task ("read helo header, value: %s", task->helo);
                        }
-               }
-       }
-       else if (g_ascii_strncasecmp (line->begin, SPAMC_GREETING, sizeof (SPAMC_GREETING) - 1) == 0) {
-               task->proto = SPAMC_PROTO;
-               task->proto_ver = 12;
-       }
-       else {
-               return FALSE;
-       }
-
-       task->state = READ_HEADER;
-
-       return TRUE;
-}
-
-static gboolean
-parse_http_command (struct worker_task *task, f_str_t * line)
-{
-       guint8                         *p, *end, *c;
-       gint                            state = 0, next_state = 0;
-       gchar                          *cmd;
-
-       p = line->begin;
-       c = p;
-       end = p + line->len;
-       task->proto = RSPAMC_PROTO;
-
-       while (p < end) {
-               switch (state) {
-               case 0:
-                       /* Expect GET or POST here */
-                       if ((end - p > 3 &&
-                                       (*p == 'G' || *p == 'g') &&
-                                       (p[1] == 'E' || p[1] == 'e') &&
-                                       (p[2] == 'T' || p[2] == 't')) ||
-                                       (end - p > 4 &&
-                                       (*p == 'P' || *p == 'p') &&
-                                       (p[1] == 'O' || p[1] == 'o') &&
-                                       (p[2] == 'S' || p[2] == 's') &&
-                                       (p[3] == 'T' || p[3] == 't'))) {
-                               state = 99;
-                               next_state = 1;
-                               p += (*p == 'g' || *p == 'G') ? 3 : 4;
+                       else if (g_ascii_strcasecmp (headern, HOSTNAME_HEADER) == 0) {
+                               task->hostname = h->value->str;
+                               debug_task ("read hostname header, value: %s", task->hostname);
                        }
                        else {
-                               msg_info ("invalid HTTP request: %V", line);
-                               return FALSE;
+                               debug_task ("wrong header: %s", headern);
+                               res = FALSE;
                        }
                        break;
-               case 1:
-                       /* Get command or path */
-                       if (!g_ascii_isspace (*p)) {
-                               p ++;
+               case 'f':
+               case 'F':
+                       if (g_ascii_strcasecmp (headern, FROM_HEADER) == 0) {
+                               task->from = rspamd_protocol_escape_braces (h->value);
+                               debug_task ("read from header, value: %s", task->from);
                        }
                        else {
-                               /* Copy command */
-                               cmd = memory_pool_alloc (task->task_pool, p - c + 1);
-                               rspamd_strlcpy (cmd, c, p - c + 1);
-                               /* Skip the first '/' */
-                               if (*cmd == '/') {
-                                       cmd ++;
-                               }
-                               if (!parse_check_command (task, cmd)) {
-                                       /* Assume that command is symbols */
-                                       task->cmd = CMD_SYMBOLS;
-                               }
-                               state = 99;
-                               next_state = 2;
+                               debug_task ("wrong header: %s", headern);
+                               res = FALSE;
                        }
                        break;
-               case 2:
-                       /* Get HTTP/1.0 or HTTP/1.1 */
-                       if (p == end - 1) {
-                               /* We are at the end */
-                               if (g_ascii_strncasecmp (c, "HTTP/1.0", sizeof ("HTTP/1.0") - 1) == 0 ||
-                                               g_ascii_strncasecmp (c, "HTTP/1.1", sizeof ("HTTP/1.1") - 1) == 0) {
-                                       task->state = READ_HEADER;
-                                       return TRUE;
-                               }
+               case 'j':
+               case 'J':
+                       if (g_ascii_strcasecmp (headern, JSON_HEADER) == 0) {
+                               task->is_json = parse_flag (h->value->str);
                        }
                        else {
-                               p ++;
+                               debug_task ("wrong header: %s", headern);
+                               res = FALSE;
                        }
                        break;
-               case 99:
-                       /* Skip spaces */
-                       if (g_ascii_isspace (*p)) {
-                               p ++;
+               case 'q':
+               case 'Q':
+                       if (g_ascii_strcasecmp (headern, QUEUE_ID_HEADER) == 0) {
+                               task->queue_id = h->value->str;
+                               debug_task ("read queue_id header, value: %s", task->queue_id);
                        }
                        else {
-                               state = next_state;
-                               c = p;
+                               debug_task ("wrong header: %s", headern);
+                               res = FALSE;
                        }
                        break;
-               }
-       }
-
-       return FALSE;
-}
-
-static gboolean
-parse_command (struct worker_task *task, f_str_t * line)
-{
-       task->proto_ver = 11;
-
-       if (! task->is_http) {
-               return parse_rspamc_command (task, line);
-       }
-       else {
-               return parse_http_command (task, line);
-       }
-       
-       /* Unreached */
-       return FALSE;
-}
-
-static gboolean
-parse_header (struct worker_task *task, f_str_t * line)
-{
-       gchar                           *headern, *err, *tmp;
-       gboolean                         res = TRUE;
-
-       /* Check end of headers */
-       if (line->len == 0) {
-               debug_task ("got empty line, assume it as end of headers");
-               if (task->cmd == CMD_PING || task->cmd == CMD_SKIP) {
-                       task->state = WRITE_REPLY;
-               }
-               else {
-                       if (task->content_length > 0) {
-                               if (task->cmd == CMD_LEARN) {
-                                       if (task->statfile != NULL) {
-                                               rspamd_set_dispatcher_policy (task->dispatcher, BUFFER_CHARACTER, task->content_length);
-                                               task->state = READ_MESSAGE;
-                                       }
-                                       else {
-                                               task->last_error = "Unknown statfile";
-                                               task->error_code = RSPAMD_STATFILE_ERROR;
-                                               task->state = WRITE_ERROR;
-                                               return FALSE;
-                                       }
-                               }
-                               else {
-                                       rspamd_set_dispatcher_policy (task->dispatcher, BUFFER_CHARACTER, task->content_length);
-                                       task->state = READ_MESSAGE;
-                                       task->msg = memory_pool_alloc0 (task->task_pool, sizeof (f_str_t));
-                               }
+               case 'r':
+               case 'R':
+                       if (g_ascii_strcasecmp (headern, RCPT_HEADER) == 0) {
+                               tmp = rspamd_protocol_escape_braces (h->value);
+                               task->rcpt = g_list_prepend (task->rcpt, tmp);
+                               debug_task ("read rcpt header, value: %s", tmp);
                        }
-                       else if (task->cmd != CMD_LEARN && task->cmd != CMD_OTHER) {
-                               rspamd_set_dispatcher_policy (task->dispatcher, BUFFER_ANY, 0);
-                               task->state = READ_MESSAGE;
-                               task->msg = memory_pool_alloc0 (task->task_pool, sizeof (f_str_t));
+                       else if (g_ascii_strcasecmp (headern, NRCPT_HEADER) == 0) {
+                               task->nrcpt = strtoul (h->value->str, &err, 10);
+                               debug_task ("read rcpt header, value: %d", (gint)task->nrcpt);
                        }
                        else {
-                               task->last_error = "Unknown content length";
-                               task->error_code = RSPAMD_LENGTH_ERROR;
-                               task->state = WRITE_ERROR;
-                               return FALSE;
+                               msg_info ("wrong header: %s", headern);
+                               res = FALSE;
                        }
-               }
-               return TRUE;
-       }
-
-       headern = separate_command (line, ':');
-
-       if (line == NULL || headern == NULL) {
-               return FALSE;
-       }
-       /* Eat whitespaces */
-       g_strstrip (headern);
-       fstrstrip (line);
-
-       switch (headern[0]) {
-       case 'c':
-       case 'C':
-               /* content-length */
-               if (g_ascii_strncasecmp (headern, CONTENT_LENGTH_HEADER, sizeof (CONTENT_LENGTH_HEADER) - 1) == 0) {
-                       if (task->content_length == 0) {
-                               tmp = memory_pool_fstrdup (task->task_pool, line);
-                               task->content_length = strtoul (tmp, &err, 10);
-                               debug_task ("read Content-Length header, value: %ul", (guint32)task->content_length);
-                       }
-               }
-               else {
-                       msg_info ("wrong header: %s", headern);
-                       res = FALSE;
-               }
-               break;
-       case 'd':
-       case 'D':
-               /* Deliver-To */
-               if (g_ascii_strncasecmp (headern, DELIVER_TO_HEADER, sizeof (DELIVER_TO_HEADER) - 1) == 0) {
-                       task->deliver_to = escape_braces_addr_fstr (task->task_pool, line);
-                       debug_task ("read deliver-to header, value: %s", task->deliver_to);
-               }
-               else {
-                       msg_info ("wrong header: %s", headern);
-                       res = FALSE;
-               }
-               break;
-       case 'h':
-       case 'H':
-               /* helo */
-               if (g_ascii_strncasecmp (headern, HELO_HEADER, sizeof (HELO_HEADER) - 1) == 0) {
-                       task->helo = memory_pool_fstrdup (task->task_pool, line);
-                       debug_task ("read helo header, value: %s", task->helo);
-               }
-               else if (g_ascii_strncasecmp (headern, HOSTNAME_HEADER, sizeof (HOSTNAME_HEADER) - 1) == 0) {
-                       task->hostname = memory_pool_fstrdup (task->task_pool, line);
-                       debug_task ("read hostname header, value: %s", task->hostname);
-               }
-               else {
-                       msg_info ("wrong header: %s", headern);
-                       res = FALSE;
-               }
-               break;
-       case 'f':
-       case 'F':
-               /* from */
-               if (g_ascii_strncasecmp (headern, FROM_HEADER, sizeof (FROM_HEADER) - 1) == 0) {
-                       task->from = escape_braces_addr_fstr (task->task_pool, line);
-                       debug_task ("read from header, value: %s", task->from);
-               }
-               else {
-                       msg_info ("wrong header: %s", headern);
-                       res = FALSE;
-               }
-               break;
-       case 'j':
-       case 'J':
-               /* json */
-               if (g_ascii_strncasecmp (headern, JSON_HEADER, sizeof (JSON_HEADER) - 1) == 0) {
-                       task->is_json = parse_flag (memory_pool_fstrdup (task->task_pool, line));
-               }
-               else {
-                       msg_info ("wrong header: %s", headern);
-                       res = FALSE;
-               }
-               break;
-       case 'q':
-       case 'Q':
-               /* Queue id */
-               if (g_ascii_strncasecmp (headern, QUEUE_ID_HEADER, sizeof (QUEUE_ID_HEADER) - 1) == 0) {
-                       task->queue_id = memory_pool_fstrdup (task->task_pool, line);
-                       debug_task ("read queue_id header, value: %s", task->queue_id);
-               }
-               else {
-                       msg_info ("wrong header: %s", headern);
-                       res = FALSE;
-               }
-               break;
-       case 'r':
-       case 'R':
-               /* rcpt */
-               if (g_ascii_strncasecmp (headern, RCPT_HEADER, sizeof (RCPT_HEADER) - 1) == 0) {
-                       tmp = escape_braces_addr_fstr (task->task_pool, line);
-                       task->rcpt = g_list_prepend (task->rcpt, tmp);
-                       debug_task ("read rcpt header, value: %s", tmp);
-               }
-               else if (g_ascii_strncasecmp (headern, NRCPT_HEADER, sizeof (NRCPT_HEADER) - 1) == 0) {
-                       tmp = memory_pool_fstrdup (task->task_pool, line);
-                       task->nrcpt = strtoul (tmp, &err, 10);
-                       debug_task ("read rcpt header, value: %d", (gint)task->nrcpt);
-               }
-               else {
-                       msg_info ("wrong header: %s", headern);
-                       res = FALSE;
-               }
-               break;
-       case 'i':
-       case 'I':
-               /* ip_addr */
-               if (g_ascii_strncasecmp (headern, IP_ADDR_HEADER, sizeof (IP_ADDR_HEADER) - 1) == 0) {
-                       tmp = memory_pool_fstrdup (task->task_pool, line);
+                       break;
+               case 'i':
+               case 'I':
+                       if (g_ascii_strcasecmp (headern, IP_ADDR_HEADER) == 0) {
+                               tmp = h->value->str;
 #ifdef HAVE_INET_PTON
-                       if (g_ascii_strncasecmp (tmp, "IPv6:", 5) == 0) {
-                               if (inet_pton (AF_INET6, tmp + 6, &task->from_addr.d.in6) == 1) {
-                                       task->from_addr.ipv6 = TRUE;
-                               }
-                               else {
-                                       msg_err ("bad ip header: '%s'", tmp);
-                                       return FALSE;
-                               }
-                               task->from_addr.has_addr = TRUE;
-                       }
-                       else {
-                               if (inet_pton (AF_INET, tmp, &task->from_addr.d.in4) != 1) {
-                                       /* Try ipv6 */
-                                       if (inet_pton (AF_INET6, tmp, &task->from_addr.d.in6) == 1) {
+                               if (g_ascii_strncasecmp (tmp, "IPv6:", 5) == 0) {
+                                       if (inet_pton (AF_INET6, tmp + 6, &task->from_addr.d.in6) == 1) {
                                                task->from_addr.ipv6 = TRUE;
                                        }
                                        else {
                                                msg_err ("bad ip header: '%s'", tmp);
                                                return FALSE;
                                        }
+                                       task->from_addr.has_addr = TRUE;
                                }
                                else {
-                                       task->from_addr.ipv6 = FALSE;
+                                       if (inet_pton (AF_INET, tmp, &task->from_addr.d.in4) != 1) {
+                                               /* Try ipv6 */
+                                               if (inet_pton (AF_INET6, tmp, &task->from_addr.d.in6) == 1) {
+                                                       task->from_addr.ipv6 = TRUE;
+                                               }
+                                               else {
+                                                       msg_err ("bad ip header: '%s'", tmp);
+                                                       return FALSE;
+                                               }
+                                       }
+                                       else {
+                                               task->from_addr.ipv6 = FALSE;
+                                       }
+                                       task->from_addr.has_addr = TRUE;
                                }
-                               task->from_addr.has_addr = TRUE;
-                       }
 #else
-                       if (!inet_aton (tmp, &task->from_addr)) {
-                               msg_err ("bad ip header: '%s'", tmp);
-                               return FALSE;
-                       }
+                               if (!inet_aton (tmp, &task->from_addr)) {
+                                       msg_err ("bad ip header: '%s'", tmp);
+                                       return FALSE;
+                               }
 #endif
-                       debug_task ("read IP header, value: %s", tmp);
-               }
-               else {
-                       msg_info ("wrong header: %s", headern);
-                       res = FALSE;
-               }
-               break;
-       case 'p':
-       case 'P':
-               /* Pass header */
-               if (g_ascii_strncasecmp (headern, PASS_HEADER, sizeof (PASS_HEADER) - 1) == 0) {
-                       if (line->len == sizeof ("all") - 1 && g_ascii_strncasecmp (line->begin, "all", sizeof ("all") - 1) == 0) {
-                               task->pass_all_filters = TRUE;
-                               msg_info ("pass all filters");
-                       } 
-               }
-               else {
-                       res = FALSE;
-               }
-               break;
-       case 's':
-       case 'S':
-               if (g_ascii_strncasecmp (headern, SUBJECT_HEADER, sizeof (SUBJECT_HEADER) - 1) == 0) {
-                       task->subject = memory_pool_fstrdup (task->task_pool, line);
-               }
-               else if (g_ascii_strncasecmp (headern, STATFILE_HEADER, sizeof (STATFILE_HEADER) - 1) == 0) {
-                       task->statfile = memory_pool_fstrdup (task->task_pool, line);
-               }
-               else {
-                       res = FALSE;
-               }
-               break;
-       case 'u':
-       case 'U':
-               if (g_ascii_strncasecmp (headern, USER_HEADER, sizeof (USER_HEADER) - 1) == 0) {
-                       task->user = memory_pool_fstrdup (task->task_pool, line);
-               }
-               else {
+                               debug_task ("read IP header, value: %s", tmp);
+                       }
+                       else {
+                               debug_task ("wrong header: %s", headern);
+                               res = FALSE;
+                       }
+                       break;
+               case 'p':
+               case 'P':
+                       if (g_ascii_strcasecmp (headern, PASS_HEADER) == 0) {
+                               if (h->value->len == sizeof ("all") - 1 &&
+                                               g_ascii_strcasecmp (h->value->str, "all") == 0) {
+                                       task->pass_all_filters = TRUE;
+                                       debug_task ("pass all filters");
+                               }
+                       }
+                       else {
+                               res = FALSE;
+                       }
+                       break;
+               case 's':
+               case 'S':
+                       if (g_ascii_strcasecmp (headern, SUBJECT_HEADER) == 0) {
+                               task->subject = h->value->str;
+                       }
+                       else {
+                               res = FALSE;
+                       }
+                       break;
+               case 'u':
+               case 'U':
+                       if (g_ascii_strcasecmp (headern, USER_HEADER) == 0) {
+                               task->user = h->value->str;
+                       }
+                       else {
+                               res = FALSE;
+                       }
+                       break;
+               default:
+                       debug_task ("wrong header: %s", headern);
                        res = FALSE;
+                       break;
                }
-               break;
-       default:
-               msg_info ("wrong header: %s", headern);
-               res = FALSE;
-               break;
        }
 
        if (!res && task->cfg->strict_protocol_headers) {
                msg_err ("deny processing of a request with incorrect or unknown headers");
+               task->last_error = "invalid header";
+               task->error_code = 400;
                return FALSE;
        }
 
@@ -663,18 +400,13 @@ parse_header (struct worker_task *task, f_str_t * line)
 }
 
 gboolean
-read_rspamd_input_line (struct worker_task *task, f_str_t * line)
+rspamd_protocol_handle_request (struct worker_task *task,
+               struct rspamd_http_message *msg)
 {
-       switch (task->state) {
-       case READ_COMMAND:
-               return parse_command (task, line);
-               break;
-       case READ_HEADER:
-               return parse_header (task, line);
-               break;
-       default:
-               return FALSE;
+       if (rspamd_protocol_handle_url (task, msg)) {
+               return rspamd_protocol_handle_headers (task, msg);
        }
+
        return FALSE;
 }
 
@@ -878,7 +610,17 @@ rspamd_metric_result_ucl (struct worker_task *task, struct metric_result *mres,
        if (logbuf->str[logbuf->len - 1] == ',') {
                logbuf->len --;
        }
-       rspamd_printf_gstring (logbuf, "]), ");
+
+#ifdef HAVE_CLOCK_GETTIME
+       rspamd_printf_gstring (logbuf, "]), len: %z, time: %s, dns req: %d,",
+                       task->msg->len, calculate_check_time (&task->tv, &task->ts,
+                       task->cfg->clock_res, &task->scan_milliseconds), task->dns_requests);
+#else
+       rspamd_printf_gstring (logbuf, "]), len: %z, time: %s, dns req: %d,",
+                       task->msg->len,
+                       calculate_check_time (&task->tv, task->cfg->clock_res, &task->scan_milliseconds),
+                       task->dns_requests);
+#endif
 
        return obj;
 }
@@ -992,10 +734,12 @@ write_check_reply (struct rspamd_http_message *msg, struct worker_task *task)
 
        write_hashes_to_log (task, logbuf);
        msg_info ("%v", logbuf);
+       g_string_free (logbuf, TRUE);
 
        msg->body = g_string_sized_new (BUFSIZ);
        func.ud = msg->body;
        ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT, &func);
+       ucl_object_unref (top);
 
        /* Increase counters */
        task->worker->srv->stat->messages_scanned++;
@@ -1006,7 +750,7 @@ write_check_reply (struct rspamd_http_message *msg, struct worker_task *task)
 }
 
 gboolean
-write_reply (struct worker_task *task)
+rspamd_protocol_write_reply (struct worker_task *task)
 {
        struct rspamd_http_message    *msg;
 
@@ -1038,13 +782,6 @@ write_reply (struct worker_task *task)
                                                "text/plain", task, task->sock, &task->tv, task->ev_base);
                        task->state = CLOSING_CONNECTION;
                        break;
-               case CMD_LEARN:
-                       msg->code = task->error_code;
-                       rspamd_http_connection_write_message (task->http_conn, msg, NULL,
-                                               "text/plain", task, task->sock, &task->tv, task->ev_base);
-                       task->state = CLOSING_CONNECTION;
-                       return TRUE;
-                       break;
                case CMD_OTHER:
                        task->state = CLOSING_CONNECTION;
                        return task->custom_cmd->func (task);
index 72460940f9788697832bde424a7120f69bb09d1d..4b62fdfb219f37f3a9585a832d79e59677b8d73e 100644 (file)
@@ -8,6 +8,7 @@
 
 #include "config.h"
 #include "filter.h"
+#include "http.h"
 
 #define RSPAMD_FILTER_ERROR 1
 #define RSPAMD_NETWORK_ERROR 2
 #define RSPAMD_LENGTH_ERROR 4
 #define RSPAMD_STATFILE_ERROR 5
 
-#define RSPAMC_PROTO_1_0 "1.0"
-#define RSPAMC_PROTO_1_1 "1.1"
-#define RSPAMC_PROTO_1_2 "1.2"
-#define RSPAMC_PROTO_1_3 "1.3"
-
-/*
- * Reply messages
- */
-#define RSPAMD_REPLY_BANNER "RSPAMD"
-#define SPAMD_REPLY_BANNER "SPAMD"
-#define SPAMD_OK "EX_OK"
-/* XXX: try to convert rspamd errors to spamd errors */
-#define SPAMD_ERROR "EX_ERROR"
-
 struct worker_task;
 struct metric;
 
-enum rspamd_protocol {
-       SPAMC_PROTO,
-       RSPAMC_PROTO,
-};
 
 enum rspamd_command {
        CMD_CHECK,
@@ -45,8 +28,7 @@ enum rspamd_command {
        CMD_SKIP,
        CMD_PING,
        CMD_PROCESS,
-       CMD_LEARN,
-       CMD_OTHER,
+       CMD_OTHER
 };
 
 
@@ -58,27 +40,19 @@ struct custom_command {
 };
 
 /**
- * Find a character in command in and return pointer to the first part of the string, in is modified to point to the second part of string
- * @param in f_str_t input
- * @param c separator character
- * @return pointer to the first part of string or NULL if there is no separator found
- */
-gchar* separate_command (f_str_t * in, gchar c);
-
-/**
- * Read one line of user's input for specified task
- * @param task task object
- * @param line line of user's input
- * @return 0 if line was successfully parsed and -1 if we have protocol error
+ * Process HTTP request to the task structure
+ * @param task
+ * @param msg
+ * @return
  */
-gboolean read_rspamd_input_line (struct worker_task *task, f_str_t *line);
+gboolean rspamd_protocol_handle_request (struct worker_task *task, struct rspamd_http_message *msg);
 
 /**
  * Write reply for specified task command
  * @param task task object
  * @return 0 if we wrote reply and -1 if there was some error
  */
-gboolean write_reply (struct worker_task *task) G_GNUC_WARN_UNUSED_RESULT;
+gboolean rspamd_protocol_write_reply (struct worker_task *task);
 
 
 /**
index 4a88bb1c92f3aa75b2c7d8573aa50bc27fdc7414..65f3b106b6b6a3416b8a0e3631cec0a2ec9b7eff 100644 (file)
@@ -1409,29 +1409,6 @@ compare_url_func (gconstpointer a, gconstpointer b)
        return r;
 }
 
-gchar *
-escape_braces_addr_fstr (memory_pool_t *pool, f_str_t *in)
-{
-       gint                          len = 0;
-       gchar                        *res, *orig, *p;
-
-       orig = in->begin;
-       while ((g_ascii_isspace (*orig) || *orig == '<') && orig - in->begin < (gint)in->len) {
-               orig ++;
-       }
-
-       p = orig;
-       while ((!g_ascii_isspace (*p) && *p != '>') && p - in->begin < (gint)in->len) {
-               p ++;
-               len ++;
-       }
-
-       res = memory_pool_alloc (pool, len + 1);
-       rspamd_strlcpy (res, orig, len + 1);
-
-       return res;
-}
-
 /*
  * Find the first occurrence of find in s, ignore case.
  */
index 2951942e37a35cfb9293fc175464c9772461e5fd..9b7891c05d6202606a1e89d97595af4d512e84c0 100644 (file)
@@ -218,11 +218,6 @@ gsize rspamd_strlcpy (gchar *dst, const gchar *src, gsize siz);
  */
 gsize rspamd_strlcpy_tolower (gchar *dst, const gchar *src, gsize siz);
 
-/*
- * Strip <> from email address
- */
-gchar * escape_braces_addr_fstr (memory_pool_t *pool, f_str_t *in);
-
 /*
  * Convert milliseconds to timeval fields
  */
index f2c579fdca70a1da45337bbd88b82bcb4edc630c..4199a809f1aa2d664fd7609d66a7b673322d24bd 100644 (file)
@@ -89,8 +89,6 @@ struct rspamd_worker_ctx {
        struct event_base              *ev_base;
 };
 
-static gboolean                 write_socket (void *arg);
-
 static sig_atomic_t             wanna_die = 0;
 
 #ifndef HAVE_SA_SIGINFO
@@ -155,242 +153,6 @@ sigusr1_handler (gint fd, short what, void *arg)
        return;
 }
 
-# if 0
-/*
- * Callback that is called when there is data to read in buffer
- */
-static                          gboolean
-read_socket (f_str_t * in, void *arg)
-{
-       struct worker_task             *task = (struct worker_task *) arg;
-       struct rspamd_worker_ctx       *ctx;
-       ssize_t                         r;
-       GError                         *err = NULL;
-
-       ctx = task->worker->ctx;
-       switch (task->state) {
-       case READ_COMMAND:
-       case READ_HEADER:
-               if (!read_rspamd_input_line (task, in)) {
-                       if (!task->last_error) {
-                               task->last_error = "Read error";
-                               task->error_code = RSPAMD_NETWORK_ERROR;
-                       }
-                       task->state = WRITE_ERROR;
-               }
-               if (task->state == WRITE_REPLY || task->state == WRITE_ERROR) {
-                       return write_socket (task);
-               }
-               break;
-       case READ_MESSAGE:
-               /* Allow half-closed connections to be proceed */
-
-               debug_task ("got string of length %z", task->msg->len);
-               if (task->content_length > 0) {
-                       task->msg->begin = in->begin;
-                       task->msg->len = in->len;
-                       task->state = WAIT_FILTER;
-                       task->dispatcher->want_read = FALSE;
-               }
-               else {
-                       task->dispatcher->want_read = FALSE;
-                       if (in->len > 0) {
-                               if (task->msg->begin == NULL) {
-                                       /* Allocate buf */
-                                       task->msg->size = MAX (BUFSIZ, in->len);
-                                       task->msg->begin = g_malloc (task->msg->size);
-                                       memcpy (task->msg->begin, in->begin, in->len);
-                                       task->msg->len = in->len;
-                               }
-                               else if (task->msg->size >= task->msg->len + in->len) {
-                                       memcpy (task->msg->begin + task->msg->len, in->begin, in->len);
-                                       task->msg->len += in->len;
-                               }
-                               else {
-                                       /* Need to realloc */
-                                       task->msg->size = MAX (task->msg->size * 2, task->msg->size + in->len);
-                                       task->msg->begin = g_realloc (task->msg->begin, task->msg->size);
-                                       memcpy (task->msg->begin + task->msg->len, in->begin, in->len);
-                                       task->msg->len += in->len;
-                               }
-                               /* Want more */
-                               return TRUE;
-                       }
-                       else if (task->msg->len > 0) {
-                               memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_free, task->msg->begin);
-                       }
-                       else {
-                               msg_warn ("empty message passed");
-                               task->last_error = "MIME processing error";
-                               task->error_code = RSPAMD_FILTER_ERROR;
-                               task->state = WRITE_ERROR;
-                               return write_socket (task);
-                       }
-               }
-
-               r = process_message (task);
-               if (r == -1) {
-                       msg_warn ("processing of message failed");
-                       task->last_error = "MIME processing error";
-                       task->error_code = RSPAMD_FILTER_ERROR;
-                       task->state = WRITE_ERROR;
-                       return write_socket (task);
-               }
-               if (task->cmd == CMD_OTHER) {
-                       /* Skip filters */
-                       task->state = WRITE_REPLY;
-                       return write_socket (task);
-               }
-               else if (task->cmd == CMD_LEARN) {
-                       if (!learn_task (task->statfile, task, &err)) {
-                               task->last_error = memory_pool_strdup (task->task_pool, err->message);
-                               task->error_code = err->code;
-                               g_error_free (err);
-                               task->state = WRITE_ERROR;
-                       }
-                       else {
-                               task->last_error = "learn ok";
-                               task->error_code = 0;
-                               task->state = WRITE_REPLY;
-                       }
-                       return write_socket (task);
-               }
-               else {
-                       if (task->cfg->pre_filters == NULL) {
-                               r = process_filters (task);
-                               if (r == -1) {
-                                       task->last_error = "Filter processing error";
-                                       task->error_code = RSPAMD_FILTER_ERROR;
-                                       task->state = WRITE_ERROR;
-                                       return write_socket (task);
-                               }
-                               /* Add task to classify to classify pool */
-                               if (!task->is_skipped && ctx->classify_pool) {
-                                       register_async_thread (task->s);
-                                       g_thread_pool_push (ctx->classify_pool, task, &err);
-                                       if (err != NULL) {
-                                               msg_err ("cannot pull task to the pool: %s", err->message);
-                                               remove_async_thread (task->s);
-                                       }
-                               }
-                               if (task->is_skipped) {
-                                       /* Call write_socket to write reply and exit */
-                                       return write_socket (task);
-                               }
-                       }
-                       else {
-                               lua_call_pre_filters (task);
-                               /* We want fin_task after pre filters are processed */
-                               task->s->wanna_die = TRUE;
-                               task->state = WAIT_PRE_FILTER;
-                               check_session_pending (task->s);
-                       }
-               }
-               break;
-       case WRITE_REPLY:
-       case WRITE_ERROR:
-               return write_socket (task);
-               break;
-       case WAIT_FILTER:
-       case WAIT_POST_FILTER:
-       case WAIT_PRE_FILTER:
-               msg_info ("ignoring trailing garbadge of size %z", in->len);
-               break;
-       default:
-               debug_task ("invalid state on reading stage");
-               break;
-       }
-
-       return TRUE;
-}
-
-/*
- * Callback for socket writing
- */
-static                          gboolean
-write_socket (void *arg)
-{
-       struct worker_task             *task = (struct worker_task *) arg;
-       struct rspamd_worker_ctx       *ctx;
-       GError                                                  *err = NULL;
-       gint                                                     r;
-
-       ctx = task->worker->ctx;
-
-       switch (task->state) {
-       case WRITE_REPLY:
-               task->state = WRITING_REPLY;
-               if (!write_reply (task)) {
-                       return FALSE;
-               }
-               destroy_session (task->s);
-               return FALSE;
-               break;
-       case WRITE_ERROR:
-               task->state = WRITING_REPLY;
-               if (!write_reply (task)) {
-                       return FALSE;
-               }
-               destroy_session (task->s);
-               return FALSE;
-               break;
-       case CLOSING_CONNECTION:
-               debug_task ("normally closing connection");
-               destroy_session (task->s);
-               return FALSE;
-               break;
-       case WRITING_REPLY:
-       case WAIT_FILTER:
-       case WAIT_POST_FILTER:
-               /* Do nothing here */
-               break;
-       case WAIT_PRE_FILTER:
-               task->state = WAIT_FILTER;
-               r = process_filters (task);
-               if (r == -1) {
-                       task->last_error = "Filter processing error";
-                       task->error_code = RSPAMD_FILTER_ERROR;
-                       task->state = WRITE_ERROR;
-                       return write_socket (task);
-               }
-               /* Add task to classify to classify pool */
-               if (!task->is_skipped && ctx->classify_pool) {
-                       register_async_thread (task->s);
-                       g_thread_pool_push (ctx->classify_pool, task, &err);
-                       if (err != NULL) {
-                               msg_err ("cannot pull task to the pool: %s", err->message);
-                               remove_async_thread (task->s);
-                       }
-               }
-               if (task->is_skipped) {
-                       /* Call write_socket again to write reply and exit */
-                       return write_socket (task);
-               }
-               break;
-       default:
-               msg_info ("abnormally closing connection at state: %d", task->state);
-               destroy_session (task->s);
-               return FALSE;
-               break;
-       }
-       return TRUE;
-}
-
-/*
- * Called if something goes wrong
- */
-static void
-err_socket (GError * err, void *arg)
-{
-       struct worker_task             *task = (struct worker_task *) arg;
-
-       msg_info ("abnormally closing connection from: %s, error: %s", inet_ntoa (task->client_addr), err->message);
-       /* Free buffers */
-       g_error_free (err);
-       destroy_session (task->s);
-}
-#endif
-
 /*
  * Called if all filters are processed
  */
@@ -410,7 +172,7 @@ fin_task (void *arg)
                        task->fin_callback (task->fin_arg);
                }
                else {
-                       write_reply (task);
+                       rspamd_protocol_write_reply (task);
                }
                return TRUE;
        }
@@ -443,7 +205,7 @@ fin_task (void *arg)
                        task->fin_callback (task->fin_arg);
                }
                else {
-                       write_reply (task);
+                       rspamd_protocol_write_reply (task);
                }
        }
        else {
@@ -455,7 +217,7 @@ fin_task (void *arg)
                                task->fin_callback (task->fin_arg);
                        }
                        else {
-                               write_reply (task);
+                               rspamd_protocol_write_reply (task);
                        }
                }
                else {
@@ -465,7 +227,7 @@ fin_task (void *arg)
                                task->last_error = "Filter processing error";
                                task->error_code = RSPAMD_FILTER_ERROR;
                                task->state = WRITE_ERROR;
-                               write_reply (task);
+                               rspamd_protocol_write_reply (task);
                        }
                        /* Add task to classify to classify pool */
                        if (!task->is_skipped && ctx->classify_pool) {
@@ -478,7 +240,7 @@ fin_task (void *arg)
                                }
                        }
                        if (task->is_skipped) {
-                               write_reply (task);
+                               rspamd_protocol_write_reply (task);
                        }
                }
        }
@@ -510,7 +272,7 @@ reduce_tasks_count (gpointer arg)
        (*tasks) --;
 }
 
-static gboolean
+static gint
 rspamd_worker_body_handler (struct rspamd_http_connection *conn,
                struct rspamd_http_message *msg,
                const gchar *chunk, gsize len)
@@ -524,7 +286,12 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
 
        if (msg->body->len == 0) {
                msg_err ("got zero length body, cannot continue");
-               return FALSE;
+               return 0;
+       }
+
+       if (!rspamd_protocol_handle_request (task, msg)) {
+               task->state = WRITE_ERROR;
+               return 0;
        }
 
        task->msg = msg->body;
@@ -537,26 +304,12 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
                task->last_error = "MIME processing error";
                task->error_code = RSPAMD_FILTER_ERROR;
                task->state = WRITE_ERROR;
-               return FALSE;
+               return 0;
        }
        if (task->cmd == CMD_OTHER) {
                /* Skip filters */
                task->state = WRITE_REPLY;
-               return FALSE;
-       }
-       else if (task->cmd == CMD_LEARN) {
-               if (!learn_task (task->statfile, task, &err)) {
-                       task->last_error = memory_pool_strdup (task->task_pool, err->message);
-                       task->error_code = err->code;
-                       g_error_free (err);
-                       task->state = WRITE_ERROR;
-               }
-               else {
-                       task->last_error = "learn ok";
-                       task->error_code = 0;
-                       task->state = WRITE_REPLY;
-               }
-               return FALSE;
+               return 0;
        }
        else {
                if (task->cfg->pre_filters == NULL) {
@@ -565,7 +318,7 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
                                task->last_error = "Filter processing error";
                                task->error_code = RSPAMD_FILTER_ERROR;
                                task->state = WRITE_ERROR;
-                               return FALSE;
+                               return 0;
                        }
                        /* Add task to classify to classify pool */
                        if (!task->is_skipped && ctx->classify_pool) {
@@ -578,7 +331,8 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
                        }
                        if (task->is_skipped) {
                                /* Call write_socket to write reply and exit */
-                               return TRUE;
+                               task->state = WRITE_REPLY;
+                               return 0;
                        }
                }
                else {
@@ -589,7 +343,7 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
                        check_session_pending (task->s);
                }
        }
-       return TRUE;
+       return 0;
 }
 
 static void
@@ -611,6 +365,9 @@ rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
                msg_debug ("normally closing connection from: %s", inet_ntoa (task->client_addr));
                destroy_session (task->s);
        }
+       else {
+               check_session_pending (task->s);
+       }
 }
 
 /*
@@ -667,7 +424,6 @@ accept_socket (gint fd, short what, void *arg)
        new_task->sock = nfd;
        new_task->is_mime = ctx->is_mime;
        new_task->is_json = ctx->is_json;
-       new_task->is_http = ctx->is_http;
        new_task->allow_learn = ctx->allow_learn;
 
        worker->srv->stat->connections_count++;
@@ -779,7 +535,7 @@ start_worker (struct rspamd_worker *worker)
 
        event_base_loop (ctx->ev_base, 0);
 
-
+       g_mime_shutdown ();
        close_log (rspamd_main->logger);
        exit (EXIT_SUCCESS);
 }