]> source.dussan.org Git - rspamd.git/commitdiff
Start moving to HTTP world.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sun, 12 Jan 2014 15:20:50 +0000 (15:20 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sun, 12 Jan 2014 15:20:50 +0000 (15:20 +0000)
13 files changed:
centos/rspamd.spec
src/controller.c
src/dkim.c
src/http.c
src/lua/lua_task.c
src/main.h
src/message.c
src/plugins/fuzzy_check.c
src/plugins/regexp.c
src/smtp.c
src/smtp_utils.c
src/webui.c
src/worker.c

index bf46c076ae3005f82593b92997cef88d525a139b..595b8d7dadf9a3866ccf8b0c041743f6077b8f90 100644 (file)
@@ -229,7 +229,7 @@ fi
 %{rspamd_confdir}/lua/rspamd.classifiers.lua
 
 %changelog
-* Fri Jan 10 2013 Vsevolod Stakhov <vsevolod-at-highsecure.ru> 0.6.7-1
+* Fri Jan 10 2014 Vsevolod Stakhov <vsevolod-at-highsecure.ru> 0.6.7-1
 - Update to 0.6.7.
 
 * Fri Dec 27 2013 Vsevolod Stakhov <vsevolod-at-highsecure.ru> 0.6.6-1
index d3f0f285553e255fb8ff6382c08009f56737be1f..a43b1420326a254b7e7a6c6c146cc5a837d8e7ec 100644 (file)
@@ -1411,9 +1411,7 @@ controller_read_socket (f_str_t * in, void *arg)
                session->learn_buf = in;
                task = construct_task (session->worker);
 
-               task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
-               task->msg->begin = in->begin;
-               task->msg->len = in->len;
+               task->msg = g_string_new_len (in->begin, in->len);
                task->ev_base = session->ev_base;
 
                r = process_message (task);
@@ -1476,9 +1474,7 @@ controller_read_socket (f_str_t * in, void *arg)
                session->learn_buf = in;
                task = construct_task (session->worker);
 
-               task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
-               task->msg->begin = in->begin;
-               task->msg->len = in->len;
+               task->msg = g_string_new_len (in->begin, in->len);
 
                task->resolver = session->resolver;
                task->ev_base = session->ev_base;
@@ -1538,9 +1534,7 @@ controller_read_socket (f_str_t * in, void *arg)
                session->learn_buf = in;
                task = construct_task (session->worker);
 
-               task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
-               task->msg->begin = in->begin;
-               task->msg->len = in->len;
+               task->msg = g_string_new_len (in->begin, in->len);
                task->ev_base = session->ev_base;
 
                r = process_message (task);
index bd57cd2321c563c0063c2ebf481cf934c0d0b958..2cf6856414bdbcacb647c5a2ffe5f4e9c30c8b29 100644 (file)
@@ -1356,9 +1356,9 @@ rspamd_dkim_check (rspamd_dkim_context_t *ctx, rspamd_dkim_key_t *key, struct wo
        g_return_val_if_fail (task->msg != NULL, DKIM_ERROR);
 
        /* First of all find place of body */
-       p = task->msg->begin;
+       p = task->msg->str;
 
-       end = task->msg->begin + task->msg->len;
+       end = task->msg->str + task->msg->len;
 
        while (p <= end) {
                /* Search for \r\n\r\n at the end of headers */
index 721a74b137a664cb43afadd83e1a12aa8fd8a61d..2d76ed0922b88444ddc76797e9270511923828e2 100644 (file)
@@ -518,6 +518,7 @@ rspamd_http_write_helper (struct rspamd_http_connection *conn)
                err = g_error_new (HTTP_ERROR, errno, "IO write error: %s", strerror (errno));
                conn->error_handler (conn, err);
                g_error_free (err);
+               return;
        }
        else {
                priv->wr_pos += r;
@@ -550,6 +551,7 @@ rspamd_http_event_handler (int fd, short what, gpointer ud)
                        err = g_error_new (HTTP_ERROR, errno, "IO read error: %s", strerror (errno));
                        conn->error_handler (conn, err);
                        g_error_free (err);
+                       return;
                }
                else {
                        buf->len = r;
@@ -558,6 +560,7 @@ rspamd_http_event_handler (int fd, short what, gpointer ud)
                                                "HTTP parser error: %s", http_errno_description (priv->parser.http_errno));
                                conn->error_handler (conn, err);
                                g_error_free (err);
+                               return;
                        }
                }
        }
@@ -566,6 +569,7 @@ rspamd_http_event_handler (int fd, short what, gpointer ud)
                                "IO timeout");
                conn->error_handler (conn, err);
                g_error_free (err);
+               return;
        }
        else if (what == EV_WRITE) {
                rspamd_http_write_helper (conn);
index e653e25d1ce093ba6d4ddd4e0586d1d1f082854d..a9e0d543364edcd572b1e3a2125d0a7e394d6f86 100644 (file)
@@ -283,10 +283,7 @@ lua_task_create_from_buffer (lua_State *L)
                ptask = lua_newuserdata (L, sizeof (gpointer));
                lua_setclass (L, "rspamd{task}", -1);
                *ptask = task;
-               task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
-               task->msg->begin = memory_pool_alloc (task->task_pool, len);
-               memcpy (task->msg->begin, data, len);
-               task->msg->len = len;
+               task->msg = g_string_new_len (data, len);
        }
        return 1;
 }
index ab8146cc41c20f50fd89b71ae9e491282a5d51b5..81cdfb9f1739cde76697814ede89b49f27a5b063 100644 (file)
@@ -20,6 +20,7 @@
 #include "util.h"
 #include "logger.h"
 #include "roll_history.h"
+#include "http.h"
 
 /* Default values */
 #define FIXED_CONFIG_FILE RSPAMD_CONFDIR "/rspamd.conf"
@@ -222,8 +223,9 @@ struct worker_task {
        gchar *subject;                                                                                         /**< subject (for non-mime)                                                     */
        gchar *hostname;                                                                                        /**< hostname reported by MTA                                           */
        gchar *statfile;                                                                                        /**< statfile for learning                                                      */
-       f_str_t *msg;                                                                                           /**< message buffer                                                                     */
+       GString *msg;                                                                                           /**< message buffer                                                                     */
        rspamd_io_dispatcher_t *dispatcher;                                                     /**< IO dispatcher object                                                       */
+       struct rspamd_http_connection *http_conn;                                       /**< HTTP server connection                                                     */
        struct rspamd_async_session* s;                                                         /**< async session object                                                       */
        gint parts_count;                                                                                       /**< mime parts count                                                           */
        GMimeMessage *message;                                                                          /**< message, parsed with GMime                                         */
index 907893d91d0e20ff20ad6517f2332e33caea1273..6ad9610e41c32490e97d805cd3e07e4f874fead0 100644 (file)
@@ -1048,7 +1048,7 @@ process_message (struct worker_task *task)
        gint                            rc;
 
        tmp = memory_pool_alloc (task->task_pool, sizeof (GByteArray));
-       tmp->data = task->msg->begin;
+       tmp->data = task->msg->str;
        tmp->len = task->msg->len;
 
        stream = g_mime_stream_mem_new_with_byte_array (tmp);
index ba07acd2f37a0592bc80a4528cb6de701146aa29..7913a1ea6433eeffd01db95b627cf91523b84122 100644 (file)
@@ -1020,10 +1020,7 @@ fuzzy_process_handler (struct controller_session *session, f_str_t * in)
        session->state = STATE_WAIT;
 
        /* Allocate message from string */
-       task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
-       task->msg->begin = in->begin;
-       task->msg->len = in->len;
-
+       task->msg = g_string_new_len (in->begin, in->len);
 
        saved = memory_pool_alloc0 (session->session_pool, sizeof (gint));
        err = memory_pool_alloc0 (session->session_pool, sizeof (GError *));
index e9789b5662a10d4b875daa6359c8b0213f453881..ece8a53fc420bd5d888facb246e327e08b518867 100644 (file)
@@ -878,7 +878,7 @@ process_regexp (struct rspamd_regexp *re, struct worker_task *task, const gchar
        case REGEXP_MESSAGE:
                debug_task ("checking message regexp: %s", re->regexp_text);
                regexp = re->raw_regexp;
-               ct = task->msg->begin;
+               ct = task->msg->str;
                clen = task->msg->len;
 
                if (regexp_module_ctx->max_size != 0 && clen > regexp_module_ctx->max_size) {
index 2cfddfe19f6600da3cb64c7e7f6f7ecb604d58b0..8ef68a675e85dde4cdc685df981bc8aeeb9cfcbf 100644 (file)
@@ -310,12 +310,12 @@ process_smtp_data (struct smtp_session *session)
                session->task->resolver = session->resolver;
                session->task->fin_callback = smtp_write_socket;
                session->task->fin_arg = session;
-               session->task->msg = memory_pool_alloc (session->pool, sizeof (f_str_t));
+               session->task->msg = memory_pool_alloc (session->pool, sizeof (GString));
                session->task->s = session->s;
 #ifdef HAVE_MMAP_NOCORE
-               if ((session->task->msg->begin = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED | MAP_NOCORE, session->temp_fd, 0)) == MAP_FAILED) {
+               if ((session->task->msg->str = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED | MAP_NOCORE, session->temp_fd, 0)) == MAP_FAILED) {
 #else
-               if ((session->task->msg->begin = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED, session->temp_fd, 0)) == MAP_FAILED) {
+               if ((session->task->msg->str = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED, session->temp_fd, 0)) == MAP_FAILED) {
 #endif
                        msg_err ("mmap failed: %s", strerror (errno));
                        goto err;
@@ -348,14 +348,14 @@ process_smtp_data (struct smtp_session *session)
 
                if (process_message (session->task) == -1) {
                        msg_err ("cannot process message");
-                       munmap (session->task->msg->begin, st.st_size);
+                       munmap (session->task->msg->str, st.st_size);
                        goto err;
                }
                if (session->task->cfg->pre_filters == NULL) {
                        r = process_filters (session->task);
                        if (r == -1) {
                                msg_err ("cannot process message");
-                               munmap (session->task->msg->begin, st.st_size);
+                               munmap (session->task->msg->str, st.st_size);
                                goto err;
                        }
                }
index 13e1617ce5c5f97d8637caa0c4df51974fce36d8..ed7dfeb7e4468b96297fe7e14fb408644bb32d67 100644 (file)
@@ -36,8 +36,8 @@ free_smtp_session (gpointer arg)
        if (session) {
                if (session->task) {
                        free_task (session->task, FALSE);
-                       if (session->task->msg->begin) {
-                               munmap (session->task->msg->begin, session->task->msg->len);
+                       if (session->task->msg->str) {
+                               munmap (session->task->msg->str, session->task->msg->len);
                        }
                }
                if (session->rcpt) {
index 24271e0abfb6c1b475eea2d2289e44b0ba5dc161..3553523f8d8cff87c776143d7d0d85e87e60eb23 100644 (file)
@@ -462,9 +462,7 @@ http_prepare_scan (struct evhttp_request *req, struct rspamd_webui_worker_ctx *c
                return NULL;
        }
 
-       task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
-       task->msg->begin = EVBUFFER_DATA (in);
-       task->msg->len = EVBUFFER_LENGTH (in);
+       task->msg = g_string_new_len (EVBUFFER_DATA (in), EVBUFFER_LENGTH (in));
 
        task->resolver = ctx->resolver;
        task->ev_base = ctx->ev_base;
@@ -628,9 +626,7 @@ http_prepare_learn (struct evhttp_request *req, struct rspamd_webui_worker_ctx *
                return NULL;
        }
 
-       task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
-       task->msg->begin = EVBUFFER_DATA (in);
-       task->msg->len = EVBUFFER_LENGTH (in);
+       task->msg = g_string_new_len (EVBUFFER_DATA (in), EVBUFFER_LENGTH (in));
 
        task->resolver = ctx->resolver;
        task->ev_base = ctx->ev_base;
index 3eb91a289b305605f30bcce945e0ff56f52220b9..a36a0ed72468faf96eb8a967c30f937f93ed461a 100644 (file)
@@ -155,7 +155,7 @@ sigusr1_handler (gint fd, short what, void *arg)
        return;
 }
 
-
+# if 0
 /*
  * Callback that is called when there is data to read in buffer
  */
@@ -389,6 +389,7 @@ err_socket (GError * err, void *arg)
        g_error_free (err);
        destroy_session (task->s);
 }
+#endif
 
 /*
  * Called if all filters are processed
@@ -489,6 +490,104 @@ reduce_tasks_count (gpointer arg)
        (*tasks) --;
 }
 
+static gboolean
+rspamd_worker_body_handler (struct rspamd_http_connection *conn,
+               struct rspamd_http_message *msg,
+               const gchar *chunk, gsize len)
+{
+       struct worker_task             *task = (struct worker_task *) conn->ud;
+       struct rspamd_worker_ctx       *ctx;
+       ssize_t                         r;
+       GError                         *err = NULL;
+
+       ctx = task->worker->ctx;
+
+       if (msg->body->len == 0) {
+               msg_err ("got zero length body, cannot continue");
+               return FALSE;
+       }
+
+       task->msg = msg->body;
+
+       debug_task ("got string of length %z", task->msg->len);
+
+       r = process_message (task);
+       if (r == -1) {
+               msg_warn ("processing of message failed");
+               task->last_error = "MIME processing error";
+               task->error_code = RSPAMD_FILTER_ERROR;
+               task->state = WRITE_ERROR;
+               return FALSE;
+       }
+       if (task->cmd == CMD_OTHER) {
+               /* Skip filters */
+               task->state = WRITE_REPLY;
+               return FALSE;
+       }
+       else if (task->cmd == CMD_LEARN) {
+               if (!learn_task (task->statfile, task, &err)) {
+                       task->last_error = memory_pool_strdup (task->task_pool, err->message);
+                       task->error_code = err->code;
+                       g_error_free (err);
+                       task->state = WRITE_ERROR;
+               }
+               else {
+                       task->last_error = "learn ok";
+                       task->error_code = 0;
+                       task->state = WRITE_REPLY;
+               }
+               return FALSE;
+       }
+       else {
+               if (task->cfg->pre_filters == NULL) {
+                       r = process_filters (task);
+                       if (r == -1) {
+                               task->last_error = "Filter processing error";
+                               task->error_code = RSPAMD_FILTER_ERROR;
+                               task->state = WRITE_ERROR;
+                               return FALSE;
+                       }
+                       /* Add task to classify to classify pool */
+                       if (!task->is_skipped && ctx->classify_pool) {
+                               register_async_thread (task->s);
+                               g_thread_pool_push (ctx->classify_pool, task, &err);
+                               if (err != NULL) {
+                                       msg_err ("cannot pull task to the pool: %s", err->message);
+                                       remove_async_thread (task->s);
+                               }
+                       }
+                       if (task->is_skipped) {
+                               /* Call write_socket to write reply and exit */
+                               return TRUE;
+                       }
+               }
+               else {
+                       lua_call_pre_filters (task);
+                       /* We want fin_task after pre filters are processed */
+                       task->s->wanna_die = TRUE;
+                       task->state = WAIT_PRE_FILTER;
+                       check_session_pending (task->s);
+               }
+       }
+       return TRUE;
+}
+
+static void
+rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err)
+{
+       struct worker_task             *task = (struct worker_task *) conn->ud;
+
+       msg_info ("abnormally closing connection from: %s, error: %s", inet_ntoa (task->client_addr), err->message);
+       destroy_session (task->s);
+}
+
+static void
+rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
+               struct rspamd_http_message *msg)
+{
+
+}
+
 /*
  * Accept new connection and construct task
  */
@@ -550,11 +649,15 @@ accept_socket (gint fd, short what, void *arg)
        new_task->resolver = ctx->resolver;
        msec_to_tv (ctx->timeout, &ctx->io_tv);
 
+#if 0
        /* Set up dispatcher */
        new_task->dispatcher =
                        rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, read_socket, write_socket,
                                        err_socket, &ctx->io_tv, (void *) new_task);
        new_task->dispatcher->peer_addr = new_task->client_addr.s_addr;
+#endif
+       new_task->http_conn = rspamd_http_connection_new (rspamd_worker_body_handler,
+                       rspamd_worker_error_handler, rspamd_worker_finish_handler, 0, RSPAMD_HTTP_SERVER);
        new_task->ev_base = ctx->ev_base;
        ctx->tasks ++;
        memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)reduce_tasks_count, &ctx->tasks);
@@ -562,6 +665,8 @@ accept_socket (gint fd, short what, void *arg)
        /* Set up async session */
        new_task->s =
                                new_async_session (new_task->task_pool, fin_task, restore_task, free_task_hard, new_task);
+
+       rspamd_http_connection_read_message (new_task->http_conn, new_task, nfd, &ctx->io_tv, ctx->ev_base);
 }
 
 gpointer