@@ -229,7 +229,7 @@ fi | |||
%{rspamd_confdir}/lua/rspamd.classifiers.lua | |||
%changelog | |||
* Fri Jan 10 2013 Vsevolod Stakhov <vsevolod-at-highsecure.ru> 0.6.7-1 | |||
* Fri Jan 10 2014 Vsevolod Stakhov <vsevolod-at-highsecure.ru> 0.6.7-1 | |||
- Update to 0.6.7. | |||
* Fri Dec 27 2013 Vsevolod Stakhov <vsevolod-at-highsecure.ru> 0.6.6-1 |
@@ -1411,9 +1411,7 @@ controller_read_socket (f_str_t * in, void *arg) | |||
session->learn_buf = in; | |||
task = construct_task (session->worker); | |||
task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t)); | |||
task->msg->begin = in->begin; | |||
task->msg->len = in->len; | |||
task->msg = g_string_new_len (in->begin, in->len); | |||
task->ev_base = session->ev_base; | |||
r = process_message (task); | |||
@@ -1476,9 +1474,7 @@ controller_read_socket (f_str_t * in, void *arg) | |||
session->learn_buf = in; | |||
task = construct_task (session->worker); | |||
task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t)); | |||
task->msg->begin = in->begin; | |||
task->msg->len = in->len; | |||
task->msg = g_string_new_len (in->begin, in->len); | |||
task->resolver = session->resolver; | |||
task->ev_base = session->ev_base; | |||
@@ -1538,9 +1534,7 @@ controller_read_socket (f_str_t * in, void *arg) | |||
session->learn_buf = in; | |||
task = construct_task (session->worker); | |||
task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t)); | |||
task->msg->begin = in->begin; | |||
task->msg->len = in->len; | |||
task->msg = g_string_new_len (in->begin, in->len); | |||
task->ev_base = session->ev_base; | |||
r = process_message (task); |
@@ -1356,9 +1356,9 @@ rspamd_dkim_check (rspamd_dkim_context_t *ctx, rspamd_dkim_key_t *key, struct wo | |||
g_return_val_if_fail (task->msg != NULL, DKIM_ERROR); | |||
/* First of all find place of body */ | |||
p = task->msg->begin; | |||
p = task->msg->str; | |||
end = task->msg->begin + task->msg->len; | |||
end = task->msg->str + task->msg->len; | |||
while (p <= end) { | |||
/* Search for \r\n\r\n at the end of headers */ |
@@ -518,6 +518,7 @@ rspamd_http_write_helper (struct rspamd_http_connection *conn) | |||
err = g_error_new (HTTP_ERROR, errno, "IO write error: %s", strerror (errno)); | |||
conn->error_handler (conn, err); | |||
g_error_free (err); | |||
return; | |||
} | |||
else { | |||
priv->wr_pos += r; | |||
@@ -550,6 +551,7 @@ rspamd_http_event_handler (int fd, short what, gpointer ud) | |||
err = g_error_new (HTTP_ERROR, errno, "IO read error: %s", strerror (errno)); | |||
conn->error_handler (conn, err); | |||
g_error_free (err); | |||
return; | |||
} | |||
else { | |||
buf->len = r; | |||
@@ -558,6 +560,7 @@ rspamd_http_event_handler (int fd, short what, gpointer ud) | |||
"HTTP parser error: %s", http_errno_description (priv->parser.http_errno)); | |||
conn->error_handler (conn, err); | |||
g_error_free (err); | |||
return; | |||
} | |||
} | |||
} | |||
@@ -566,6 +569,7 @@ rspamd_http_event_handler (int fd, short what, gpointer ud) | |||
"IO timeout"); | |||
conn->error_handler (conn, err); | |||
g_error_free (err); | |||
return; | |||
} | |||
else if (what == EV_WRITE) { | |||
rspamd_http_write_helper (conn); |
@@ -283,10 +283,7 @@ lua_task_create_from_buffer (lua_State *L) | |||
ptask = lua_newuserdata (L, sizeof (gpointer)); | |||
lua_setclass (L, "rspamd{task}", -1); | |||
*ptask = task; | |||
task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t)); | |||
task->msg->begin = memory_pool_alloc (task->task_pool, len); | |||
memcpy (task->msg->begin, data, len); | |||
task->msg->len = len; | |||
task->msg = g_string_new_len (data, len); | |||
} | |||
return 1; | |||
} |
@@ -20,6 +20,7 @@ | |||
#include "util.h" | |||
#include "logger.h" | |||
#include "roll_history.h" | |||
#include "http.h" | |||
/* Default values */ | |||
#define FIXED_CONFIG_FILE RSPAMD_CONFDIR "/rspamd.conf" | |||
@@ -222,8 +223,9 @@ struct worker_task { | |||
gchar *subject; /**< subject (for non-mime) */ | |||
gchar *hostname; /**< hostname reported by MTA */ | |||
gchar *statfile; /**< statfile for learning */ | |||
f_str_t *msg; /**< message buffer */ | |||
GString *msg; /**< message buffer */ | |||
rspamd_io_dispatcher_t *dispatcher; /**< IO dispatcher object */ | |||
struct rspamd_http_connection *http_conn; /**< HTTP server connection */ | |||
struct rspamd_async_session* s; /**< async session object */ | |||
gint parts_count; /**< mime parts count */ | |||
GMimeMessage *message; /**< message, parsed with GMime */ |
@@ -1048,7 +1048,7 @@ process_message (struct worker_task *task) | |||
gint rc; | |||
tmp = memory_pool_alloc (task->task_pool, sizeof (GByteArray)); | |||
tmp->data = task->msg->begin; | |||
tmp->data = task->msg->str; | |||
tmp->len = task->msg->len; | |||
stream = g_mime_stream_mem_new_with_byte_array (tmp); |
@@ -1020,10 +1020,7 @@ fuzzy_process_handler (struct controller_session *session, f_str_t * in) | |||
session->state = STATE_WAIT; | |||
/* Allocate message from string */ | |||
task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t)); | |||
task->msg->begin = in->begin; | |||
task->msg->len = in->len; | |||
task->msg = g_string_new_len (in->begin, in->len); | |||
saved = memory_pool_alloc0 (session->session_pool, sizeof (gint)); | |||
err = memory_pool_alloc0 (session->session_pool, sizeof (GError *)); |
@@ -878,7 +878,7 @@ process_regexp (struct rspamd_regexp *re, struct worker_task *task, const gchar | |||
case REGEXP_MESSAGE: | |||
debug_task ("checking message regexp: %s", re->regexp_text); | |||
regexp = re->raw_regexp; | |||
ct = task->msg->begin; | |||
ct = task->msg->str; | |||
clen = task->msg->len; | |||
if (regexp_module_ctx->max_size != 0 && clen > regexp_module_ctx->max_size) { |
@@ -310,12 +310,12 @@ process_smtp_data (struct smtp_session *session) | |||
session->task->resolver = session->resolver; | |||
session->task->fin_callback = smtp_write_socket; | |||
session->task->fin_arg = session; | |||
session->task->msg = memory_pool_alloc (session->pool, sizeof (f_str_t)); | |||
session->task->msg = memory_pool_alloc (session->pool, sizeof (GString)); | |||
session->task->s = session->s; | |||
#ifdef HAVE_MMAP_NOCORE | |||
if ((session->task->msg->begin = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED | MAP_NOCORE, session->temp_fd, 0)) == MAP_FAILED) { | |||
if ((session->task->msg->str = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED | MAP_NOCORE, session->temp_fd, 0)) == MAP_FAILED) { | |||
#else | |||
if ((session->task->msg->begin = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED, session->temp_fd, 0)) == MAP_FAILED) { | |||
if ((session->task->msg->str = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED, session->temp_fd, 0)) == MAP_FAILED) { | |||
#endif | |||
msg_err ("mmap failed: %s", strerror (errno)); | |||
goto err; | |||
@@ -348,14 +348,14 @@ process_smtp_data (struct smtp_session *session) | |||
if (process_message (session->task) == -1) { | |||
msg_err ("cannot process message"); | |||
munmap (session->task->msg->begin, st.st_size); | |||
munmap (session->task->msg->str, st.st_size); | |||
goto err; | |||
} | |||
if (session->task->cfg->pre_filters == NULL) { | |||
r = process_filters (session->task); | |||
if (r == -1) { | |||
msg_err ("cannot process message"); | |||
munmap (session->task->msg->begin, st.st_size); | |||
munmap (session->task->msg->str, st.st_size); | |||
goto err; | |||
} | |||
} |
@@ -36,8 +36,8 @@ free_smtp_session (gpointer arg) | |||
if (session) { | |||
if (session->task) { | |||
free_task (session->task, FALSE); | |||
if (session->task->msg->begin) { | |||
munmap (session->task->msg->begin, session->task->msg->len); | |||
if (session->task->msg->str) { | |||
munmap (session->task->msg->str, session->task->msg->len); | |||
} | |||
} | |||
if (session->rcpt) { |
@@ -462,9 +462,7 @@ http_prepare_scan (struct evhttp_request *req, struct rspamd_webui_worker_ctx *c | |||
return NULL; | |||
} | |||
task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t)); | |||
task->msg->begin = EVBUFFER_DATA (in); | |||
task->msg->len = EVBUFFER_LENGTH (in); | |||
task->msg = g_string_new_len (EVBUFFER_DATA (in), EVBUFFER_LENGTH (in)); | |||
task->resolver = ctx->resolver; | |||
task->ev_base = ctx->ev_base; | |||
@@ -628,9 +626,7 @@ http_prepare_learn (struct evhttp_request *req, struct rspamd_webui_worker_ctx * | |||
return NULL; | |||
} | |||
task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t)); | |||
task->msg->begin = EVBUFFER_DATA (in); | |||
task->msg->len = EVBUFFER_LENGTH (in); | |||
task->msg = g_string_new_len (EVBUFFER_DATA (in), EVBUFFER_LENGTH (in)); | |||
task->resolver = ctx->resolver; | |||
task->ev_base = ctx->ev_base; |
@@ -155,7 +155,7 @@ sigusr1_handler (gint fd, short what, void *arg) | |||
return; | |||
} | |||
# if 0 | |||
/* | |||
* Callback that is called when there is data to read in buffer | |||
*/ | |||
@@ -389,6 +389,7 @@ err_socket (GError * err, void *arg) | |||
g_error_free (err); | |||
destroy_session (task->s); | |||
} | |||
#endif | |||
/* | |||
* Called if all filters are processed | |||
@@ -489,6 +490,104 @@ reduce_tasks_count (gpointer arg) | |||
(*tasks) --; | |||
} | |||
static gboolean | |||
rspamd_worker_body_handler (struct rspamd_http_connection *conn, | |||
struct rspamd_http_message *msg, | |||
const gchar *chunk, gsize len) | |||
{ | |||
struct worker_task *task = (struct worker_task *) conn->ud; | |||
struct rspamd_worker_ctx *ctx; | |||
ssize_t r; | |||
GError *err = NULL; | |||
ctx = task->worker->ctx; | |||
if (msg->body->len == 0) { | |||
msg_err ("got zero length body, cannot continue"); | |||
return FALSE; | |||
} | |||
task->msg = msg->body; | |||
debug_task ("got string of length %z", task->msg->len); | |||
r = process_message (task); | |||
if (r == -1) { | |||
msg_warn ("processing of message failed"); | |||
task->last_error = "MIME processing error"; | |||
task->error_code = RSPAMD_FILTER_ERROR; | |||
task->state = WRITE_ERROR; | |||
return FALSE; | |||
} | |||
if (task->cmd == CMD_OTHER) { | |||
/* Skip filters */ | |||
task->state = WRITE_REPLY; | |||
return FALSE; | |||
} | |||
else if (task->cmd == CMD_LEARN) { | |||
if (!learn_task (task->statfile, task, &err)) { | |||
task->last_error = memory_pool_strdup (task->task_pool, err->message); | |||
task->error_code = err->code; | |||
g_error_free (err); | |||
task->state = WRITE_ERROR; | |||
} | |||
else { | |||
task->last_error = "learn ok"; | |||
task->error_code = 0; | |||
task->state = WRITE_REPLY; | |||
} | |||
return FALSE; | |||
} | |||
else { | |||
if (task->cfg->pre_filters == NULL) { | |||
r = process_filters (task); | |||
if (r == -1) { | |||
task->last_error = "Filter processing error"; | |||
task->error_code = RSPAMD_FILTER_ERROR; | |||
task->state = WRITE_ERROR; | |||
return FALSE; | |||
} | |||
/* Add task to classify to classify pool */ | |||
if (!task->is_skipped && ctx->classify_pool) { | |||
register_async_thread (task->s); | |||
g_thread_pool_push (ctx->classify_pool, task, &err); | |||
if (err != NULL) { | |||
msg_err ("cannot pull task to the pool: %s", err->message); | |||
remove_async_thread (task->s); | |||
} | |||
} | |||
if (task->is_skipped) { | |||
/* Call write_socket to write reply and exit */ | |||
return TRUE; | |||
} | |||
} | |||
else { | |||
lua_call_pre_filters (task); | |||
/* We want fin_task after pre filters are processed */ | |||
task->s->wanna_die = TRUE; | |||
task->state = WAIT_PRE_FILTER; | |||
check_session_pending (task->s); | |||
} | |||
} | |||
return TRUE; | |||
} | |||
static void | |||
rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err) | |||
{ | |||
struct worker_task *task = (struct worker_task *) conn->ud; | |||
msg_info ("abnormally closing connection from: %s, error: %s", inet_ntoa (task->client_addr), err->message); | |||
destroy_session (task->s); | |||
} | |||
static void | |||
rspamd_worker_finish_handler (struct rspamd_http_connection *conn, | |||
struct rspamd_http_message *msg) | |||
{ | |||
} | |||
/* | |||
* Accept new connection and construct task | |||
*/ | |||
@@ -550,11 +649,15 @@ accept_socket (gint fd, short what, void *arg) | |||
new_task->resolver = ctx->resolver; | |||
msec_to_tv (ctx->timeout, &ctx->io_tv); | |||
#if 0 | |||
/* Set up dispatcher */ | |||
new_task->dispatcher = | |||
rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, read_socket, write_socket, | |||
err_socket, &ctx->io_tv, (void *) new_task); | |||
new_task->dispatcher->peer_addr = new_task->client_addr.s_addr; | |||
#endif | |||
new_task->http_conn = rspamd_http_connection_new (rspamd_worker_body_handler, | |||
rspamd_worker_error_handler, rspamd_worker_finish_handler, 0, RSPAMD_HTTP_SERVER); | |||
new_task->ev_base = ctx->ev_base; | |||
ctx->tasks ++; | |||
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)reduce_tasks_count, &ctx->tasks); | |||
@@ -562,6 +665,8 @@ accept_socket (gint fd, short what, void *arg) | |||
/* Set up async session */ | |||
new_task->s = | |||
new_async_session (new_task->task_pool, fin_task, restore_task, free_task_hard, new_task); | |||
rspamd_http_connection_read_message (new_task->http_conn, new_task, nfd, &ctx->io_tv, ctx->ev_base); | |||
} | |||
gpointer |