diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2014-01-12 15:20:50 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2014-01-12 15:20:50 +0000 |
commit | 67b932ad9786743dc032ff2adc3788c5aadd3933 (patch) | |
tree | 084872b519dc54beef83cbdade01af8bb639eafd | |
parent | d6b454eda97f6ffe8455ec83ab8da6293e7b9c07 (diff) | |
download | rspamd-67b932ad9786743dc032ff2adc3788c5aadd3933.tar.gz rspamd-67b932ad9786743dc032ff2adc3788c5aadd3933.zip |
Start moving to HTTP world.
-rw-r--r-- | centos/rspamd.spec | 2 | ||||
-rw-r--r-- | src/controller.c | 12 | ||||
-rw-r--r-- | src/dkim.c | 4 | ||||
-rw-r--r-- | src/http.c | 4 | ||||
-rw-r--r-- | src/lua/lua_task.c | 5 | ||||
-rw-r--r-- | src/main.h | 4 | ||||
-rw-r--r-- | src/message.c | 2 | ||||
-rw-r--r-- | src/plugins/fuzzy_check.c | 5 | ||||
-rw-r--r-- | src/plugins/regexp.c | 2 | ||||
-rw-r--r-- | src/smtp.c | 10 | ||||
-rw-r--r-- | src/smtp_utils.c | 4 | ||||
-rw-r--r-- | src/webui.c | 8 | ||||
-rw-r--r-- | src/worker.c | 107 |
13 files changed, 132 insertions, 37 deletions
diff --git a/centos/rspamd.spec b/centos/rspamd.spec index bf46c076a..595b8d7da 100644 --- a/centos/rspamd.spec +++ b/centos/rspamd.spec @@ -229,7 +229,7 @@ fi %{rspamd_confdir}/lua/rspamd.classifiers.lua %changelog -* Fri Jan 10 2013 Vsevolod Stakhov <vsevolod-at-highsecure.ru> 0.6.7-1 +* Fri Jan 10 2014 Vsevolod Stakhov <vsevolod-at-highsecure.ru> 0.6.7-1 - Update to 0.6.7. * Fri Dec 27 2013 Vsevolod Stakhov <vsevolod-at-highsecure.ru> 0.6.6-1 diff --git a/src/controller.c b/src/controller.c index d3f0f2855..a43b14203 100644 --- a/src/controller.c +++ b/src/controller.c @@ -1411,9 +1411,7 @@ controller_read_socket (f_str_t * in, void *arg) session->learn_buf = in; task = construct_task (session->worker); - task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t)); - task->msg->begin = in->begin; - task->msg->len = in->len; + task->msg = g_string_new_len (in->begin, in->len); task->ev_base = session->ev_base; r = process_message (task); @@ -1476,9 +1474,7 @@ controller_read_socket (f_str_t * in, void *arg) session->learn_buf = in; task = construct_task (session->worker); - task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t)); - task->msg->begin = in->begin; - task->msg->len = in->len; + task->msg = g_string_new_len (in->begin, in->len); task->resolver = session->resolver; task->ev_base = session->ev_base; @@ -1538,9 +1534,7 @@ controller_read_socket (f_str_t * in, void *arg) session->learn_buf = in; task = construct_task (session->worker); - task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t)); - task->msg->begin = in->begin; - task->msg->len = in->len; + task->msg = g_string_new_len (in->begin, in->len); task->ev_base = session->ev_base; r = process_message (task); diff --git a/src/dkim.c b/src/dkim.c index bd57cd232..2cf685641 100644 --- a/src/dkim.c +++ b/src/dkim.c @@ -1356,9 +1356,9 @@ rspamd_dkim_check (rspamd_dkim_context_t *ctx, rspamd_dkim_key_t *key, struct wo g_return_val_if_fail (task->msg != NULL, DKIM_ERROR); /* First of all find place of body */ - p = task->msg->begin; + p = task->msg->str; - end = task->msg->begin + task->msg->len; + end = task->msg->str + task->msg->len; while (p <= end) { /* Search for \r\n\r\n at the end of headers */ diff --git a/src/http.c b/src/http.c index 721a74b13..2d76ed092 100644 --- a/src/http.c +++ b/src/http.c @@ -518,6 +518,7 @@ rspamd_http_write_helper (struct rspamd_http_connection *conn) err = g_error_new (HTTP_ERROR, errno, "IO write error: %s", strerror (errno)); conn->error_handler (conn, err); g_error_free (err); + return; } else { priv->wr_pos += r; @@ -550,6 +551,7 @@ rspamd_http_event_handler (int fd, short what, gpointer ud) err = g_error_new (HTTP_ERROR, errno, "IO read error: %s", strerror (errno)); conn->error_handler (conn, err); g_error_free (err); + return; } else { buf->len = r; @@ -558,6 +560,7 @@ rspamd_http_event_handler (int fd, short what, gpointer ud) "HTTP parser error: %s", http_errno_description (priv->parser.http_errno)); conn->error_handler (conn, err); g_error_free (err); + return; } } } @@ -566,6 +569,7 @@ rspamd_http_event_handler (int fd, short what, gpointer ud) "IO timeout"); conn->error_handler (conn, err); g_error_free (err); + return; } else if (what == EV_WRITE) { rspamd_http_write_helper (conn); diff --git a/src/lua/lua_task.c b/src/lua/lua_task.c index e653e25d1..a9e0d5433 100644 --- a/src/lua/lua_task.c +++ b/src/lua/lua_task.c @@ -283,10 +283,7 @@ lua_task_create_from_buffer (lua_State *L) ptask = lua_newuserdata (L, sizeof (gpointer)); lua_setclass (L, "rspamd{task}", -1); *ptask = task; - task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t)); - task->msg->begin = memory_pool_alloc (task->task_pool, len); - memcpy (task->msg->begin, data, len); - task->msg->len = len; + task->msg = g_string_new_len (data, len); } return 1; } diff --git a/src/main.h b/src/main.h index ab8146cc4..81cdfb9f1 100644 --- a/src/main.h +++ b/src/main.h @@ -20,6 +20,7 @@ #include "util.h" #include "logger.h" #include "roll_history.h" +#include "http.h" /* Default values */ #define FIXED_CONFIG_FILE RSPAMD_CONFDIR "/rspamd.conf" @@ -222,8 +223,9 @@ struct worker_task { gchar *subject; /**< subject (for non-mime) */ gchar *hostname; /**< hostname reported by MTA */ gchar *statfile; /**< statfile for learning */ - f_str_t *msg; /**< message buffer */ + GString *msg; /**< message buffer */ rspamd_io_dispatcher_t *dispatcher; /**< IO dispatcher object */ + struct rspamd_http_connection *http_conn; /**< HTTP server connection */ struct rspamd_async_session* s; /**< async session object */ gint parts_count; /**< mime parts count */ GMimeMessage *message; /**< message, parsed with GMime */ diff --git a/src/message.c b/src/message.c index 907893d91..6ad9610e4 100644 --- a/src/message.c +++ b/src/message.c @@ -1048,7 +1048,7 @@ process_message (struct worker_task *task) gint rc; tmp = memory_pool_alloc (task->task_pool, sizeof (GByteArray)); - tmp->data = task->msg->begin; + tmp->data = task->msg->str; tmp->len = task->msg->len; stream = g_mime_stream_mem_new_with_byte_array (tmp); diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index ba07acd2f..7913a1ea6 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -1020,10 +1020,7 @@ fuzzy_process_handler (struct controller_session *session, f_str_t * in) session->state = STATE_WAIT; /* Allocate message from string */ - task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t)); - task->msg->begin = in->begin; - task->msg->len = in->len; - + task->msg = g_string_new_len (in->begin, in->len); saved = memory_pool_alloc0 (session->session_pool, sizeof (gint)); err = memory_pool_alloc0 (session->session_pool, sizeof (GError *)); diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c index e9789b566..ece8a53fc 100644 --- a/src/plugins/regexp.c +++ b/src/plugins/regexp.c @@ -878,7 +878,7 @@ process_regexp (struct rspamd_regexp *re, struct worker_task *task, const gchar case REGEXP_MESSAGE: debug_task ("checking message regexp: %s", re->regexp_text); regexp = re->raw_regexp; - ct = task->msg->begin; + ct = task->msg->str; clen = task->msg->len; if (regexp_module_ctx->max_size != 0 && clen > regexp_module_ctx->max_size) { diff --git a/src/smtp.c b/src/smtp.c index 2cfddfe19..8ef68a675 100644 --- a/src/smtp.c +++ b/src/smtp.c @@ -310,12 +310,12 @@ process_smtp_data (struct smtp_session *session) session->task->resolver = session->resolver; session->task->fin_callback = smtp_write_socket; session->task->fin_arg = session; - session->task->msg = memory_pool_alloc (session->pool, sizeof (f_str_t)); + session->task->msg = memory_pool_alloc (session->pool, sizeof (GString)); session->task->s = session->s; #ifdef HAVE_MMAP_NOCORE - if ((session->task->msg->begin = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED | MAP_NOCORE, session->temp_fd, 0)) == MAP_FAILED) { + if ((session->task->msg->str = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED | MAP_NOCORE, session->temp_fd, 0)) == MAP_FAILED) { #else - if ((session->task->msg->begin = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED, session->temp_fd, 0)) == MAP_FAILED) { + if ((session->task->msg->str = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED, session->temp_fd, 0)) == MAP_FAILED) { #endif msg_err ("mmap failed: %s", strerror (errno)); goto err; @@ -348,14 +348,14 @@ process_smtp_data (struct smtp_session *session) if (process_message (session->task) == -1) { msg_err ("cannot process message"); - munmap (session->task->msg->begin, st.st_size); + munmap (session->task->msg->str, st.st_size); goto err; } if (session->task->cfg->pre_filters == NULL) { r = process_filters (session->task); if (r == -1) { msg_err ("cannot process message"); - munmap (session->task->msg->begin, st.st_size); + munmap (session->task->msg->str, st.st_size); goto err; } } diff --git a/src/smtp_utils.c b/src/smtp_utils.c index 13e1617ce..ed7dfeb7e 100644 --- a/src/smtp_utils.c +++ b/src/smtp_utils.c @@ -36,8 +36,8 @@ free_smtp_session (gpointer arg) if (session) { if (session->task) { free_task (session->task, FALSE); - if (session->task->msg->begin) { - munmap (session->task->msg->begin, session->task->msg->len); + if (session->task->msg->str) { + munmap (session->task->msg->str, session->task->msg->len); } } if (session->rcpt) { diff --git a/src/webui.c b/src/webui.c index 24271e0ab..3553523f8 100644 --- a/src/webui.c +++ b/src/webui.c @@ -462,9 +462,7 @@ http_prepare_scan (struct evhttp_request *req, struct rspamd_webui_worker_ctx *c return NULL; } - task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t)); - task->msg->begin = EVBUFFER_DATA (in); - task->msg->len = EVBUFFER_LENGTH (in); + task->msg = g_string_new_len (EVBUFFER_DATA (in), EVBUFFER_LENGTH (in)); task->resolver = ctx->resolver; task->ev_base = ctx->ev_base; @@ -628,9 +626,7 @@ http_prepare_learn (struct evhttp_request *req, struct rspamd_webui_worker_ctx * return NULL; } - task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t)); - task->msg->begin = EVBUFFER_DATA (in); - task->msg->len = EVBUFFER_LENGTH (in); + task->msg = g_string_new_len (EVBUFFER_DATA (in), EVBUFFER_LENGTH (in)); task->resolver = ctx->resolver; task->ev_base = ctx->ev_base; diff --git a/src/worker.c b/src/worker.c index 3eb91a289..a36a0ed72 100644 --- a/src/worker.c +++ b/src/worker.c @@ -155,7 +155,7 @@ sigusr1_handler (gint fd, short what, void *arg) return; } - +# if 0 /* * Callback that is called when there is data to read in buffer */ @@ -389,6 +389,7 @@ err_socket (GError * err, void *arg) g_error_free (err); destroy_session (task->s); } +#endif /* * Called if all filters are processed @@ -489,6 +490,104 @@ reduce_tasks_count (gpointer arg) (*tasks) --; } +static gboolean +rspamd_worker_body_handler (struct rspamd_http_connection *conn, + struct rspamd_http_message *msg, + const gchar *chunk, gsize len) +{ + struct worker_task *task = (struct worker_task *) conn->ud; + struct rspamd_worker_ctx *ctx; + ssize_t r; + GError *err = NULL; + + ctx = task->worker->ctx; + + if (msg->body->len == 0) { + msg_err ("got zero length body, cannot continue"); + return FALSE; + } + + task->msg = msg->body; + + debug_task ("got string of length %z", task->msg->len); + + r = process_message (task); + if (r == -1) { + msg_warn ("processing of message failed"); + task->last_error = "MIME processing error"; + task->error_code = RSPAMD_FILTER_ERROR; + task->state = WRITE_ERROR; + return FALSE; + } + if (task->cmd == CMD_OTHER) { + /* Skip filters */ + task->state = WRITE_REPLY; + return FALSE; + } + else if (task->cmd == CMD_LEARN) { + if (!learn_task (task->statfile, task, &err)) { + task->last_error = memory_pool_strdup (task->task_pool, err->message); + task->error_code = err->code; + g_error_free (err); + task->state = WRITE_ERROR; + } + else { + task->last_error = "learn ok"; + task->error_code = 0; + task->state = WRITE_REPLY; + } + return FALSE; + } + else { + if (task->cfg->pre_filters == NULL) { + r = process_filters (task); + if (r == -1) { + task->last_error = "Filter processing error"; + task->error_code = RSPAMD_FILTER_ERROR; + task->state = WRITE_ERROR; + return FALSE; + } + /* Add task to classify to classify pool */ + if (!task->is_skipped && ctx->classify_pool) { + register_async_thread (task->s); + g_thread_pool_push (ctx->classify_pool, task, &err); + if (err != NULL) { + msg_err ("cannot pull task to the pool: %s", err->message); + remove_async_thread (task->s); + } + } + if (task->is_skipped) { + /* Call write_socket to write reply and exit */ + return TRUE; + } + } + else { + lua_call_pre_filters (task); + /* We want fin_task after pre filters are processed */ + task->s->wanna_die = TRUE; + task->state = WAIT_PRE_FILTER; + check_session_pending (task->s); + } + } + return TRUE; +} + +static void +rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err) +{ + struct worker_task *task = (struct worker_task *) conn->ud; + + msg_info ("abnormally closing connection from: %s, error: %s", inet_ntoa (task->client_addr), err->message); + destroy_session (task->s); +} + +static void +rspamd_worker_finish_handler (struct rspamd_http_connection *conn, + struct rspamd_http_message *msg) +{ + +} + /* * Accept new connection and construct task */ @@ -550,11 +649,15 @@ accept_socket (gint fd, short what, void *arg) new_task->resolver = ctx->resolver; msec_to_tv (ctx->timeout, &ctx->io_tv); +#if 0 /* Set up dispatcher */ new_task->dispatcher = rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, read_socket, write_socket, err_socket, &ctx->io_tv, (void *) new_task); new_task->dispatcher->peer_addr = new_task->client_addr.s_addr; +#endif + new_task->http_conn = rspamd_http_connection_new (rspamd_worker_body_handler, + rspamd_worker_error_handler, rspamd_worker_finish_handler, 0, RSPAMD_HTTP_SERVER); new_task->ev_base = ctx->ev_base; ctx->tasks ++; memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)reduce_tasks_count, &ctx->tasks); @@ -562,6 +665,8 @@ accept_socket (gint fd, short what, void *arg) /* Set up async session */ new_task->s = new_async_session (new_task->task_pool, fin_task, restore_task, free_task_hard, new_task); + + rspamd_http_connection_read_message (new_task->http_conn, new_task, nfd, &ctx->io_tv, ctx->ev_base); } gpointer |