aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2014-01-12 15:20:50 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2014-01-12 15:20:50 +0000
commit67b932ad9786743dc032ff2adc3788c5aadd3933 (patch)
tree084872b519dc54beef83cbdade01af8bb639eafd
parentd6b454eda97f6ffe8455ec83ab8da6293e7b9c07 (diff)
downloadrspamd-67b932ad9786743dc032ff2adc3788c5aadd3933.tar.gz
rspamd-67b932ad9786743dc032ff2adc3788c5aadd3933.zip
Start moving to HTTP world.
-rw-r--r--centos/rspamd.spec2
-rw-r--r--src/controller.c12
-rw-r--r--src/dkim.c4
-rw-r--r--src/http.c4
-rw-r--r--src/lua/lua_task.c5
-rw-r--r--src/main.h4
-rw-r--r--src/message.c2
-rw-r--r--src/plugins/fuzzy_check.c5
-rw-r--r--src/plugins/regexp.c2
-rw-r--r--src/smtp.c10
-rw-r--r--src/smtp_utils.c4
-rw-r--r--src/webui.c8
-rw-r--r--src/worker.c107
13 files changed, 132 insertions, 37 deletions
diff --git a/centos/rspamd.spec b/centos/rspamd.spec
index bf46c076a..595b8d7da 100644
--- a/centos/rspamd.spec
+++ b/centos/rspamd.spec
@@ -229,7 +229,7 @@ fi
%{rspamd_confdir}/lua/rspamd.classifiers.lua
%changelog
-* Fri Jan 10 2013 Vsevolod Stakhov <vsevolod-at-highsecure.ru> 0.6.7-1
+* Fri Jan 10 2014 Vsevolod Stakhov <vsevolod-at-highsecure.ru> 0.6.7-1
- Update to 0.6.7.
* Fri Dec 27 2013 Vsevolod Stakhov <vsevolod-at-highsecure.ru> 0.6.6-1
diff --git a/src/controller.c b/src/controller.c
index d3f0f2855..a43b14203 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -1411,9 +1411,7 @@ controller_read_socket (f_str_t * in, void *arg)
session->learn_buf = in;
task = construct_task (session->worker);
- task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
- task->msg->begin = in->begin;
- task->msg->len = in->len;
+ task->msg = g_string_new_len (in->begin, in->len);
task->ev_base = session->ev_base;
r = process_message (task);
@@ -1476,9 +1474,7 @@ controller_read_socket (f_str_t * in, void *arg)
session->learn_buf = in;
task = construct_task (session->worker);
- task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
- task->msg->begin = in->begin;
- task->msg->len = in->len;
+ task->msg = g_string_new_len (in->begin, in->len);
task->resolver = session->resolver;
task->ev_base = session->ev_base;
@@ -1538,9 +1534,7 @@ controller_read_socket (f_str_t * in, void *arg)
session->learn_buf = in;
task = construct_task (session->worker);
- task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
- task->msg->begin = in->begin;
- task->msg->len = in->len;
+ task->msg = g_string_new_len (in->begin, in->len);
task->ev_base = session->ev_base;
r = process_message (task);
diff --git a/src/dkim.c b/src/dkim.c
index bd57cd232..2cf685641 100644
--- a/src/dkim.c
+++ b/src/dkim.c
@@ -1356,9 +1356,9 @@ rspamd_dkim_check (rspamd_dkim_context_t *ctx, rspamd_dkim_key_t *key, struct wo
g_return_val_if_fail (task->msg != NULL, DKIM_ERROR);
/* First of all find place of body */
- p = task->msg->begin;
+ p = task->msg->str;
- end = task->msg->begin + task->msg->len;
+ end = task->msg->str + task->msg->len;
while (p <= end) {
/* Search for \r\n\r\n at the end of headers */
diff --git a/src/http.c b/src/http.c
index 721a74b13..2d76ed092 100644
--- a/src/http.c
+++ b/src/http.c
@@ -518,6 +518,7 @@ rspamd_http_write_helper (struct rspamd_http_connection *conn)
err = g_error_new (HTTP_ERROR, errno, "IO write error: %s", strerror (errno));
conn->error_handler (conn, err);
g_error_free (err);
+ return;
}
else {
priv->wr_pos += r;
@@ -550,6 +551,7 @@ rspamd_http_event_handler (int fd, short what, gpointer ud)
err = g_error_new (HTTP_ERROR, errno, "IO read error: %s", strerror (errno));
conn->error_handler (conn, err);
g_error_free (err);
+ return;
}
else {
buf->len = r;
@@ -558,6 +560,7 @@ rspamd_http_event_handler (int fd, short what, gpointer ud)
"HTTP parser error: %s", http_errno_description (priv->parser.http_errno));
conn->error_handler (conn, err);
g_error_free (err);
+ return;
}
}
}
@@ -566,6 +569,7 @@ rspamd_http_event_handler (int fd, short what, gpointer ud)
"IO timeout");
conn->error_handler (conn, err);
g_error_free (err);
+ return;
}
else if (what == EV_WRITE) {
rspamd_http_write_helper (conn);
diff --git a/src/lua/lua_task.c b/src/lua/lua_task.c
index e653e25d1..a9e0d5433 100644
--- a/src/lua/lua_task.c
+++ b/src/lua/lua_task.c
@@ -283,10 +283,7 @@ lua_task_create_from_buffer (lua_State *L)
ptask = lua_newuserdata (L, sizeof (gpointer));
lua_setclass (L, "rspamd{task}", -1);
*ptask = task;
- task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
- task->msg->begin = memory_pool_alloc (task->task_pool, len);
- memcpy (task->msg->begin, data, len);
- task->msg->len = len;
+ task->msg = g_string_new_len (data, len);
}
return 1;
}
diff --git a/src/main.h b/src/main.h
index ab8146cc4..81cdfb9f1 100644
--- a/src/main.h
+++ b/src/main.h
@@ -20,6 +20,7 @@
#include "util.h"
#include "logger.h"
#include "roll_history.h"
+#include "http.h"
/* Default values */
#define FIXED_CONFIG_FILE RSPAMD_CONFDIR "/rspamd.conf"
@@ -222,8 +223,9 @@ struct worker_task {
gchar *subject; /**< subject (for non-mime) */
gchar *hostname; /**< hostname reported by MTA */
gchar *statfile; /**< statfile for learning */
- f_str_t *msg; /**< message buffer */
+ GString *msg; /**< message buffer */
rspamd_io_dispatcher_t *dispatcher; /**< IO dispatcher object */
+ struct rspamd_http_connection *http_conn; /**< HTTP server connection */
struct rspamd_async_session* s; /**< async session object */
gint parts_count; /**< mime parts count */
GMimeMessage *message; /**< message, parsed with GMime */
diff --git a/src/message.c b/src/message.c
index 907893d91..6ad9610e4 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1048,7 +1048,7 @@ process_message (struct worker_task *task)
gint rc;
tmp = memory_pool_alloc (task->task_pool, sizeof (GByteArray));
- tmp->data = task->msg->begin;
+ tmp->data = task->msg->str;
tmp->len = task->msg->len;
stream = g_mime_stream_mem_new_with_byte_array (tmp);
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index ba07acd2f..7913a1ea6 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -1020,10 +1020,7 @@ fuzzy_process_handler (struct controller_session *session, f_str_t * in)
session->state = STATE_WAIT;
/* Allocate message from string */
- task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
- task->msg->begin = in->begin;
- task->msg->len = in->len;
-
+ task->msg = g_string_new_len (in->begin, in->len);
saved = memory_pool_alloc0 (session->session_pool, sizeof (gint));
err = memory_pool_alloc0 (session->session_pool, sizeof (GError *));
diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c
index e9789b566..ece8a53fc 100644
--- a/src/plugins/regexp.c
+++ b/src/plugins/regexp.c
@@ -878,7 +878,7 @@ process_regexp (struct rspamd_regexp *re, struct worker_task *task, const gchar
case REGEXP_MESSAGE:
debug_task ("checking message regexp: %s", re->regexp_text);
regexp = re->raw_regexp;
- ct = task->msg->begin;
+ ct = task->msg->str;
clen = task->msg->len;
if (regexp_module_ctx->max_size != 0 && clen > regexp_module_ctx->max_size) {
diff --git a/src/smtp.c b/src/smtp.c
index 2cfddfe19..8ef68a675 100644
--- a/src/smtp.c
+++ b/src/smtp.c
@@ -310,12 +310,12 @@ process_smtp_data (struct smtp_session *session)
session->task->resolver = session->resolver;
session->task->fin_callback = smtp_write_socket;
session->task->fin_arg = session;
- session->task->msg = memory_pool_alloc (session->pool, sizeof (f_str_t));
+ session->task->msg = memory_pool_alloc (session->pool, sizeof (GString));
session->task->s = session->s;
#ifdef HAVE_MMAP_NOCORE
- if ((session->task->msg->begin = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED | MAP_NOCORE, session->temp_fd, 0)) == MAP_FAILED) {
+ if ((session->task->msg->str = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED | MAP_NOCORE, session->temp_fd, 0)) == MAP_FAILED) {
#else
- if ((session->task->msg->begin = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED, session->temp_fd, 0)) == MAP_FAILED) {
+ if ((session->task->msg->str = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED, session->temp_fd, 0)) == MAP_FAILED) {
#endif
msg_err ("mmap failed: %s", strerror (errno));
goto err;
@@ -348,14 +348,14 @@ process_smtp_data (struct smtp_session *session)
if (process_message (session->task) == -1) {
msg_err ("cannot process message");
- munmap (session->task->msg->begin, st.st_size);
+ munmap (session->task->msg->str, st.st_size);
goto err;
}
if (session->task->cfg->pre_filters == NULL) {
r = process_filters (session->task);
if (r == -1) {
msg_err ("cannot process message");
- munmap (session->task->msg->begin, st.st_size);
+ munmap (session->task->msg->str, st.st_size);
goto err;
}
}
diff --git a/src/smtp_utils.c b/src/smtp_utils.c
index 13e1617ce..ed7dfeb7e 100644
--- a/src/smtp_utils.c
+++ b/src/smtp_utils.c
@@ -36,8 +36,8 @@ free_smtp_session (gpointer arg)
if (session) {
if (session->task) {
free_task (session->task, FALSE);
- if (session->task->msg->begin) {
- munmap (session->task->msg->begin, session->task->msg->len);
+ if (session->task->msg->str) {
+ munmap (session->task->msg->str, session->task->msg->len);
}
}
if (session->rcpt) {
diff --git a/src/webui.c b/src/webui.c
index 24271e0ab..3553523f8 100644
--- a/src/webui.c
+++ b/src/webui.c
@@ -462,9 +462,7 @@ http_prepare_scan (struct evhttp_request *req, struct rspamd_webui_worker_ctx *c
return NULL;
}
- task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
- task->msg->begin = EVBUFFER_DATA (in);
- task->msg->len = EVBUFFER_LENGTH (in);
+ task->msg = g_string_new_len (EVBUFFER_DATA (in), EVBUFFER_LENGTH (in));
task->resolver = ctx->resolver;
task->ev_base = ctx->ev_base;
@@ -628,9 +626,7 @@ http_prepare_learn (struct evhttp_request *req, struct rspamd_webui_worker_ctx *
return NULL;
}
- task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
- task->msg->begin = EVBUFFER_DATA (in);
- task->msg->len = EVBUFFER_LENGTH (in);
+ task->msg = g_string_new_len (EVBUFFER_DATA (in), EVBUFFER_LENGTH (in));
task->resolver = ctx->resolver;
task->ev_base = ctx->ev_base;
diff --git a/src/worker.c b/src/worker.c
index 3eb91a289..a36a0ed72 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -155,7 +155,7 @@ sigusr1_handler (gint fd, short what, void *arg)
return;
}
-
+# if 0
/*
* Callback that is called when there is data to read in buffer
*/
@@ -389,6 +389,7 @@ err_socket (GError * err, void *arg)
g_error_free (err);
destroy_session (task->s);
}
+#endif
/*
* Called if all filters are processed
@@ -489,6 +490,104 @@ reduce_tasks_count (gpointer arg)
(*tasks) --;
}
+static gboolean
+rspamd_worker_body_handler (struct rspamd_http_connection *conn,
+ struct rspamd_http_message *msg,
+ const gchar *chunk, gsize len)
+{
+ struct worker_task *task = (struct worker_task *) conn->ud;
+ struct rspamd_worker_ctx *ctx;
+ ssize_t r;
+ GError *err = NULL;
+
+ ctx = task->worker->ctx;
+
+ if (msg->body->len == 0) {
+ msg_err ("got zero length body, cannot continue");
+ return FALSE;
+ }
+
+ task->msg = msg->body;
+
+ debug_task ("got string of length %z", task->msg->len);
+
+ r = process_message (task);
+ if (r == -1) {
+ msg_warn ("processing of message failed");
+ task->last_error = "MIME processing error";
+ task->error_code = RSPAMD_FILTER_ERROR;
+ task->state = WRITE_ERROR;
+ return FALSE;
+ }
+ if (task->cmd == CMD_OTHER) {
+ /* Skip filters */
+ task->state = WRITE_REPLY;
+ return FALSE;
+ }
+ else if (task->cmd == CMD_LEARN) {
+ if (!learn_task (task->statfile, task, &err)) {
+ task->last_error = memory_pool_strdup (task->task_pool, err->message);
+ task->error_code = err->code;
+ g_error_free (err);
+ task->state = WRITE_ERROR;
+ }
+ else {
+ task->last_error = "learn ok";
+ task->error_code = 0;
+ task->state = WRITE_REPLY;
+ }
+ return FALSE;
+ }
+ else {
+ if (task->cfg->pre_filters == NULL) {
+ r = process_filters (task);
+ if (r == -1) {
+ task->last_error = "Filter processing error";
+ task->error_code = RSPAMD_FILTER_ERROR;
+ task->state = WRITE_ERROR;
+ return FALSE;
+ }
+ /* Add task to classify to classify pool */
+ if (!task->is_skipped && ctx->classify_pool) {
+ register_async_thread (task->s);
+ g_thread_pool_push (ctx->classify_pool, task, &err);
+ if (err != NULL) {
+ msg_err ("cannot pull task to the pool: %s", err->message);
+ remove_async_thread (task->s);
+ }
+ }
+ if (task->is_skipped) {
+ /* Call write_socket to write reply and exit */
+ return TRUE;
+ }
+ }
+ else {
+ lua_call_pre_filters (task);
+ /* We want fin_task after pre filters are processed */
+ task->s->wanna_die = TRUE;
+ task->state = WAIT_PRE_FILTER;
+ check_session_pending (task->s);
+ }
+ }
+ return TRUE;
+}
+
+static void
+rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err)
+{
+ struct worker_task *task = (struct worker_task *) conn->ud;
+
+ msg_info ("abnormally closing connection from: %s, error: %s", inet_ntoa (task->client_addr), err->message);
+ destroy_session (task->s);
+}
+
+static void
+rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
+ struct rspamd_http_message *msg)
+{
+
+}
+
/*
* Accept new connection and construct task
*/
@@ -550,11 +649,15 @@ accept_socket (gint fd, short what, void *arg)
new_task->resolver = ctx->resolver;
msec_to_tv (ctx->timeout, &ctx->io_tv);
+#if 0
/* Set up dispatcher */
new_task->dispatcher =
rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, read_socket, write_socket,
err_socket, &ctx->io_tv, (void *) new_task);
new_task->dispatcher->peer_addr = new_task->client_addr.s_addr;
+#endif
+ new_task->http_conn = rspamd_http_connection_new (rspamd_worker_body_handler,
+ rspamd_worker_error_handler, rspamd_worker_finish_handler, 0, RSPAMD_HTTP_SERVER);
new_task->ev_base = ctx->ev_base;
ctx->tasks ++;
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)reduce_tasks_count, &ctx->tasks);
@@ -562,6 +665,8 @@ accept_socket (gint fd, short what, void *arg)
/* Set up async session */
new_task->s =
new_async_session (new_task->task_pool, fin_task, restore_task, free_task_hard, new_task);
+
+ rspamd_http_connection_read_message (new_task->http_conn, new_task, nfd, &ctx->io_tv, ctx->ev_base);
}
gpointer