aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt1
-rw-r--r--src/buffer.c71
-rw-r--r--src/buffer.h6
-rw-r--r--src/controller.c60
-rw-r--r--src/events.c173
-rw-r--r--src/events.h39
-rw-r--r--src/lmtp.c11
-rw-r--r--src/lmtp_proto.c20
-rw-r--r--src/main.h3
-rw-r--r--src/plugins/fuzzy_check.c58
-rw-r--r--src/plugins/surbl.c62
-rw-r--r--src/worker.c34
12 files changed, 389 insertions, 149 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 390b1bbfc..f43f8bab5 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -360,6 +360,7 @@ SET(RSPAMDSRC src/modules.c
src/controller.c
src/cfg_utils.c
src/buffer.c
+ src/events.c
src/html.c
src/lmtp.c
src/lmtp_proto.c
diff --git a/src/buffer.c b/src/buffer.c
index 7c52da10d..33d0904f4 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -38,13 +38,12 @@ dispatcher_error_quark (void)
#define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin)
-static void
-write_buffers (int fd, rspamd_io_dispatcher_t *d)
+static gboolean
+write_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean is_delayed)
{
GList *cur;
GError *err;
rspamd_buffer_t *buf;
- struct timeval *ntv;
ssize_t r;
/* Fix order */
@@ -64,7 +63,7 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d)
if (d->err_callback) {
err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
d->err_callback (err, d->user_data);
- return;
+ return FALSE;
}
}
else if (r > 0) {
@@ -80,7 +79,7 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d)
if (d->err_callback) {
err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
d->err_callback (err, d->user_data);
- return;
+ return FALSE;
}
}
else if (r == -1 && errno == EAGAIN) {
@@ -88,10 +87,8 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d)
/* Wait for other event */
event_del (d->ev);
event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
- ntv = memory_pool_alloc (d->pool, sizeof (struct timeval));
- memcpy (ntv, d->tv, sizeof (struct timeval));
- event_add (d->ev, ntv);
- return;
+ event_add (d->ev, d->tv);
+ return TRUE;
}
cur = g_list_next (cur);
}
@@ -103,29 +100,25 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d)
msg_debug ("write_buffers: all buffers were written successfully");
- if (d->write_callback) {
- d->write_callback (d->user_data);
- if (d->wanna_die) {
+ if (is_delayed && d->write_callback) {
+ if (!d->write_callback (d->user_data)) {
msg_debug ("write_buffers: callback set wanna_die flag, terminating");
- rspamd_remove_dispatcher (d);
- return;
+ return FALSE;
}
}
event_del (d->ev);
event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
- ntv = memory_pool_alloc (d->pool, sizeof (struct timeval));
- memcpy (ntv, d->tv, sizeof (struct timeval));
- event_add (d->ev, ntv);
+ event_add (d->ev, d->tv);
}
else {
/* Plan other write event */
event_del (d->ev);
event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
- ntv = memory_pool_alloc (d->pool, sizeof (struct timeval));
- memcpy (ntv, d->tv, sizeof (struct timeval));
- event_add (d->ev, ntv);
+ event_add (d->ev, d->tv);
}
+
+ return TRUE;
}
static void
@@ -138,7 +131,11 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
char **pos;
size_t *len;
enum io_policy saved_policy;
-
+
+ if (d->wanna_die) {
+ rspamd_remove_dispatcher (d);
+ return;
+ }
if (d->in_buf == NULL) {
d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
@@ -208,10 +205,7 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
res.len --;
}
if (d->read_callback) {
- d->read_callback (&res, d->user_data);
- if (d->wanna_die) {
- msg_debug ("read_buffers: callback set wanna_die flag, terminating");
- rspamd_remove_dispatcher (d);
+ if (!d->read_callback (&res, d->user_data)) {
return;
}
/* Move remaining string to begin of buffer (draining) */
@@ -239,7 +233,9 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
res.len = r;
c = b + r;
if (d->read_callback) {
- d->read_callback (&res, d->user_data);
+ if (!d->read_callback (&res, d->user_data)) {
+ return;
+ }
/* Move remaining string to begin of buffer (draining) */
memmove (d->in_buf->data->begin, c, *len - r);
b = d->in_buf->data->begin;
@@ -264,7 +260,6 @@ dispatcher_cb (int fd, short what, void *arg)
{
rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *)arg;
GError *err;
- struct timeval *ntv;
msg_debug ("dispatcher_cb: in dispatcher callback, what: %d, fd: %d", (int)what, fd);
@@ -280,12 +275,11 @@ dispatcher_cb (int fd, short what, void *arg)
if (d->out_buffers == NULL) {
event_del (d->ev);
event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
- ntv = memory_pool_alloc (d->pool, sizeof (struct timeval));
- memcpy (ntv, d->tv, sizeof (struct timeval));
- event_add (d->ev, ntv);
+ event_add (d->ev, d->tv);
}
else {
- write_buffers (fd, d);
+ /* Delayed write */
+ write_buffers (fd, d, TRUE);
}
break;
case EV_READ:
@@ -303,7 +297,6 @@ rspamd_create_dispatcher (int fd, enum io_policy policy,
struct timeval *tv, void *user_data)
{
rspamd_io_dispatcher_t *new;
- struct timeval *ntv;
if (fd == -1) {
return NULL;
@@ -331,9 +324,7 @@ rspamd_create_dispatcher (int fd, enum io_policy policy,
new->fd = fd;
event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new);
- ntv = memory_pool_alloc (new->pool, sizeof (struct timeval));
- memcpy (ntv, new->tv, sizeof (struct timeval));
- event_add (new->ev, ntv);
+ event_add (new->ev, new->tv);
return new;
}
@@ -388,13 +379,12 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d,
msg_debug ("rspamd_set_dispatcher_policy: new input length watermark is %ld", (long int)d->nchars);
}
-void
+gboolean
rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
void *data,
size_t len, gboolean delayed, gboolean allocated)
{
rspamd_buffer_t *newbuf;
- struct timeval *ntv;
newbuf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
if (!allocated) {
@@ -416,12 +406,9 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
if (!delayed) {
msg_debug ("rspamd_dispatcher_write: plan write event");
- event_del (d->ev);
- event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
- ntv = memory_pool_alloc (d->pool, sizeof (struct timeval));
- memcpy (ntv, d->tv, sizeof (struct timeval));
- event_add (d->ev, ntv);
+ return write_buffers (d->fd, d, FALSE);
}
+ return TRUE;
}
void
diff --git a/src/buffer.h b/src/buffer.h
index 458ea32de..d3410ebb8 100644
--- a/src/buffer.h
+++ b/src/buffer.h
@@ -10,8 +10,8 @@
#include "mem_pool.h"
#include "fstring.h"
-typedef void (*dispatcher_read_callback_t)(f_str_t *in, void *user_data);
-typedef void (*dispatcher_write_callback_t)(void *user_data);
+typedef gboolean (*dispatcher_read_callback_t)(f_str_t *in, void *user_data);
+typedef gboolean (*dispatcher_write_callback_t)(void *user_data);
typedef void (*dispatcher_err_callback_t)(GError *err, void *user_data);
/**
@@ -81,7 +81,7 @@ void rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d,
* @param data data to write
* @param len length of data
*/
-void rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
+gboolean rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
void *data,
size_t len, gboolean delayed, gboolean allocated);
diff --git a/src/controller.c b/src/controller.c
index 0aaa8bd99..dd1f4a91b 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -84,6 +84,8 @@ static time_t start_time;
static char greetingbuf[1024];
extern rspamd_hash_t *counters;
+static gboolean controller_write_socket (void *arg);
+
static
void sig_handler (int signo)
{
@@ -119,12 +121,13 @@ completion_func (gpointer elem)
}
static void
-free_session (struct controller_session *session, gboolean is_soft)
+free_session (void *ud)
{
GList *part;
struct mime_part *p;
+ struct controller_session *session = ud;
- msg_debug ("free_session: freeing session %p", session);
+ msg_info ("free_session: freeing session %p", session);
while ((part = g_list_first (session->parts))) {
session->parts = g_list_remove_link (session->parts, part);
@@ -132,13 +135,7 @@ free_session (struct controller_session *session, gboolean is_soft)
g_byte_array_free (p->content, FALSE);
g_list_free_1 (part);
}
- if (is_soft) {
- /* Plan dispatcher shutdown */
- session->dispatcher->wanna_die = 1;
- }
- else {
- rspamd_remove_dispatcher (session->dispatcher);
- }
+ rspamd_remove_dispatcher (session->dispatcher);
close (session->sock);
@@ -397,7 +394,7 @@ process_custom_command (char *line, char **cmd_args, struct controller_session *
return FALSE;
}
-static void
+static gboolean
controller_read_socket (f_str_t *in, void *arg)
{
struct controller_session *session = (struct controller_session *)arg;
@@ -426,13 +423,17 @@ controller_read_socket (f_str_t *in, void *arg)
if (!process_custom_command (cmd, &params[1], session)) {
msg_debug ("Unknown command: '%s'", cmd);
i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF);
- rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE);
+ if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
+ return FALSE;
+ }
}
break;
default:
msg_debug ("Ambigious command: '%s'", cmd);
i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF);
- rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE);
+ if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
+ return FALSE;
+ }
break;
}
}
@@ -440,7 +441,9 @@ controller_read_socket (f_str_t *in, void *arg)
session->state = STATE_REPLY;
}
if (session->state != STATE_LEARN && session->state != STATE_OTHER) {
- rspamd_dispatcher_write (session->dispatcher, END, sizeof (END) - 1, FALSE, TRUE);
+ if (!rspamd_dispatcher_write (session->dispatcher, END, sizeof (END) - 1, FALSE, TRUE)) {
+ return FALSE;
+ }
}
g_strfreev (params);
@@ -454,17 +457,17 @@ controller_read_socket (f_str_t *in, void *arg)
if (!session->learn_classifier->tokenizer->tokenize_func (session->learn_classifier->tokenizer,
session->session_pool, &c, &tokens)) {
i = snprintf (out_buf, sizeof (out_buf), "learn fail, tokenizer error" CRLF);
- rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE);
+ if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
+ return FALSE;
+ }
session->state = STATE_REPLY;
- return;
+ return TRUE;
}
}
cls_ctx = session->learn_classifier->classifier->init_func (session->session_pool, session->learn_classifier);
session->learn_classifier->classifier->learn_func (cls_ctx, session->worker->srv->statfile_pool,
session->learn_symbol, tokens, session->in_class);
session->worker->srv->stat->messages_learned ++;
- i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF);
- rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE);
/* Clean learned parts */
while ((cur = g_list_first (session->parts))) {
@@ -474,6 +477,11 @@ controller_read_socket (f_str_t *in, void *arg)
g_list_free_1 (cur);
}
+ i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF);
+ if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
+ return FALSE;
+ }
+
session->state = STATE_REPLY;
break;
case STATE_OTHER:
@@ -489,22 +497,29 @@ controller_read_socket (f_str_t *in, void *arg)
msg_debug ("controller_read_socket: unknown state while reading %d", session->state);
break;
}
+
+ if (session->state == STATE_REPLY || session->state == STATE_QUIT) {
+ (void)controller_write_socket (session);
+ }
+
+ return TRUE;
}
-static void
+static gboolean
controller_write_socket (void *arg)
{
struct controller_session *session = (struct controller_session *)arg;
if (session->state == STATE_QUIT) {
/* Free buffers */
- free_session (session, TRUE);
- return;
+ destroy_session (session->s);
+ return FALSE;
}
else if (session->state == STATE_REPLY) {
session->state = STATE_COMMAND;
rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_LINE, BUFSIZ);
}
+ return TRUE;
}
static void
@@ -515,8 +530,9 @@ controller_err_socket (GError *err, void *arg)
if (err->code != EOF) {
msg_info ("controller_err_socket: abnormally closing control connection, error: %s", err->message);
}
+
/* Free buffers */
- free_session (session, FALSE);
+ destroy_session (session->s);
}
static void
@@ -552,6 +568,8 @@ accept_socket (int fd, short what, void *arg)
io_tv->tv_sec = CONTROLLER_IO_TIMEOUT;
io_tv->tv_usec = 0;
+ new_session->s = new_async_session (new_session->session_pool, free_session, new_session);
+
new_session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, controller_read_socket,
controller_write_socket, controller_err_socket, io_tv,
(void *)new_session);
diff --git a/src/events.c b/src/events.c
new file mode 100644
index 000000000..9092e85c9
--- /dev/null
+++ b/src/events.c
@@ -0,0 +1,173 @@
+/*
+ * Copyright (c) 2009, Rambler media
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "main.h"
+#include "events.h"
+
+
+struct rspamd_async_session*
+new_async_session (memory_pool_t *pool, event_finalizer_t fin, void *user_data)
+{
+ struct rspamd_async_session *new;
+
+ new = memory_pool_alloc (pool, sizeof (struct rspamd_async_session));
+ new->pool = pool;
+ new->fin = fin;
+ new->user_data = user_data;
+ new->wanna_die = FALSE;
+ new->events = g_queue_new ();
+
+ memory_pool_add_destructor (pool, (pool_destruct_func)g_queue_free, new->events);
+
+ return new;
+}
+
+void
+register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, gboolean forced)
+{
+ struct rspamd_async_event *new, *ev;
+ GList *cur;
+
+ if (session == NULL) {
+ msg_info ("register_async_event: session is NULL");
+ return;
+ }
+
+ if (forced) {
+ /* For forced events try first to increase its reference */
+ cur = session->events->head;
+ while (cur) {
+ ev = cur->data;
+ if (ev->forced && ev->fin == fin) {
+ ev->ref ++;
+ return;
+ }
+ cur = g_list_next (cur);
+ }
+ }
+
+ new = memory_pool_alloc (session->pool, sizeof (struct rspamd_async_event));
+ new->fin = fin;
+ new->user_data = user_data;
+ new->forced = forced;
+ new->ref = 1;
+ g_queue_push_head (session->events, new);
+}
+
+void
+remove_forced_event (struct rspamd_async_session *session, event_finalizer_t fin)
+{
+ struct rspamd_async_event *ev;
+ GList *cur;
+
+ if (session == NULL) {
+ msg_info ("remove_forced_event: session is NULL");
+ return;
+ }
+
+ cur = session->events->head;
+ while (cur) {
+ ev = cur->data;
+ if (ev->forced && ev->fin == fin) {
+ ev->ref --;
+ if (ev->ref == 0) {
+ g_queue_delete_link (session->events, cur);
+ }
+ break;
+ }
+ cur = g_list_next (cur);
+ }
+
+ if (session->wanna_die && session->fin != NULL && g_queue_get_length (session->events) == 0) {
+ /* Call session destroy after all forced events are ready */
+ session->fin (session->user_data);
+ }
+}
+
+void
+remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud)
+{
+ struct rspamd_async_event *ev;
+ GList *cur;
+
+ if (session == NULL) {
+ msg_info ("remove_forced_event: session is NULL");
+ return;
+ }
+
+ cur = session->events->head;
+ while (cur) {
+ ev = cur->data;
+ if (ev->fin == fin && ev->user_data == ud && !ev->forced) {
+ g_queue_delete_link (session->events, cur);
+ if (ev->fin) {
+ ev->fin (ev->user_data);
+ }
+ break;
+ }
+ cur = g_list_next (cur);
+ }
+}
+
+gboolean
+destroy_session (struct rspamd_async_session *session)
+{
+ struct rspamd_async_event *ev;
+ GList *cur, *tmp;
+
+ if (session == NULL) {
+ msg_info ("destroy_session: session is NULL");
+ return FALSE;
+ }
+
+ session->wanna_die = TRUE;
+
+ cur = session->events->head;
+
+ while (cur) {
+ ev = cur->data;
+ if (!ev->forced) {
+ if (ev->fin != NULL) {
+ ev->fin (ev->user_data);
+ }
+ tmp = cur;
+ cur = g_list_next (cur);
+ g_queue_delete_link (session->events, tmp);
+ }
+ else {
+ /* Do nothing with forced callbacks */
+ cur = g_list_next (cur);
+ }
+ }
+
+ if (g_queue_get_length (session->events) == 0) {
+ if (session->fin != NULL) {
+ session->fin (session->user_data);
+ }
+ return TRUE;
+ }
+
+ return FALSE;
+}
diff --git a/src/events.h b/src/events.h
new file mode 100644
index 000000000..3715b4d66
--- /dev/null
+++ b/src/events.h
@@ -0,0 +1,39 @@
+#ifndef RSPAMD_EVENTS_H
+#define RSPAMD_EVENTS_H
+
+#include "config.h"
+
+struct rspamd_async_event;
+
+typedef void (*event_finalizer_t)(void *user_data);
+
+struct rspamd_async_event {
+ event_finalizer_t fin;
+ void *user_data;
+ gboolean forced;
+ guint ref;
+};
+
+struct rspamd_async_session {
+ event_finalizer_t fin;
+ GQueue *events;
+ void *user_data;
+ memory_pool_t *pool;
+ gboolean wanna_die;
+};
+
+/* Makes new async session */
+struct rspamd_async_session *new_async_session (memory_pool_t *pool, event_finalizer_t fin, void *user_data);
+/* Insert event into session */
+void register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, gboolean forced);
+/* Must be called by forced events to call session destructor properly */
+void remove_forced_event (struct rspamd_async_session *session, event_finalizer_t fin);
+void remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud);
+
+/**
+ * Must be called at the end of session, it calls fin functions for all non-forced callbacks
+ * @return true if the whole session was destroyed and false if there are forced events
+ */
+gboolean destroy_session (struct rspamd_async_session *session);
+
+#endif /* RSPAMD_EVENTS_H */
diff --git a/src/lmtp.c b/src/lmtp.c
index c3d5ab344..3becb1dfe 100644
--- a/src/lmtp.c
+++ b/src/lmtp.c
@@ -36,7 +36,7 @@
static char greetingbuf[1024];
static struct timeval io_tv;
-static void lmtp_write_socket (void *arg);
+static gboolean lmtp_write_socket (void *arg);
static
void sig_handler (int signo)
@@ -121,7 +121,7 @@ free_lmtp_task (struct rspamd_lmtp_proto *lmtp, gboolean is_soft)
/*
* Callback that is called when there is data to read in buffer
*/
-static void
+static gboolean
lmtp_read_socket (f_str_t *in, void *arg)
{
struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg;
@@ -163,12 +163,14 @@ lmtp_read_socket (f_str_t *in, void *arg)
msg_debug ("lmtp_read_socket: invalid state while reading from socket %d", lmtp->task->state);
break;
}
+
+ return TRUE;
}
/*
* Callback for socket writing
*/
-static void
+static gboolean
lmtp_write_socket (void *arg)
{
struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg;
@@ -189,11 +191,14 @@ lmtp_write_socket (void *arg)
case CLOSING_CONNECTION:
msg_debug ("lmtp_write_socket: normally closing connection");
free_lmtp_task (lmtp, TRUE);
+ return FALSE;
break;
default:
msg_debug ("lmtp_write_socket: invalid state while writing to socket %d", lmtp->task->state);
break;
}
+
+ return TRUE;
}
/*
diff --git a/src/lmtp_proto.c b/src/lmtp_proto.c
index 62967c03b..7fbc38684 100644
--- a/src/lmtp_proto.c
+++ b/src/lmtp_proto.c
@@ -293,13 +293,13 @@ close_mta_connection (struct mta_callback_data *cd, gboolean is_success)
else {
out_lmtp_reply (cd->task, LMTP_FAILURE, "", "Delivery failure");
}
- cd->dispatcher->wanna_die = TRUE;
+ rspamd_remove_dispatcher (cd->dispatcher);
}
/*
* Callback that is called when there is data to read in buffer
*/
-static void
+static gboolean
mta_read_socket (f_str_t *in, void *arg)
{
struct mta_callback_data *cd = (struct mta_callback_data *)arg;
@@ -317,7 +317,7 @@ mta_read_socket (f_str_t *in, void *arg)
if (fstrstr (in, &contres1) != -1 || fstrstr (in, &contres2) != -1) {
/* Skip such lines */
- return;
+ return TRUE;
}
switch (cd->state) {
@@ -325,7 +325,7 @@ mta_read_socket (f_str_t *in, void *arg)
if (!parse_mta_str (in, cd)) {
msg_warn ("mta_read_socket: got bad greeting");
close_mta_connection (cd, FALSE);
- return;
+ return FALSE;
}
hostmax = sysconf (_SC_HOST_NAME_MAX) + 1;
hostbuf = alloca (hostmax);
@@ -344,7 +344,7 @@ mta_read_socket (f_str_t *in, void *arg)
if (!parse_mta_str (in, cd)) {
msg_warn ("mta_read_socket: got bad helo");
close_mta_connection (cd, FALSE);
- return;
+ return FALSE;
}
r = snprintf (outbuf, sizeof (outbuf), "MAIL FROM: <%s>" CRLF, cd->task->from);
rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, FALSE);
@@ -354,7 +354,7 @@ mta_read_socket (f_str_t *in, void *arg)
if (!parse_mta_str (in, cd)) {
msg_warn ("mta_read_socket: got bad mail from");
close_mta_connection (cd, FALSE);
- return;
+ return FALSE;
}
cur = g_list_first (cd->task->rcpt);
r = 0;
@@ -370,7 +370,7 @@ mta_read_socket (f_str_t *in, void *arg)
if (!parse_mta_str (in, cd)) {
msg_warn ("mta_read_socket: got bad rcpt");
close_mta_connection (cd, FALSE);
- return;
+ return FALSE;
}
r = snprintf (outbuf, sizeof (outbuf), "DATA" CRLF);
rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, FALSE);
@@ -380,7 +380,7 @@ mta_read_socket (f_str_t *in, void *arg)
if (!parse_mta_str (in, cd)) {
msg_warn ("mta_read_socket: got bad data");
close_mta_connection (cd, FALSE);
- return;
+ return FALSE;
}
c = g_mime_object_to_string ((GMimeObject *)cd->task->message);
r = strlen (c);
@@ -393,11 +393,13 @@ mta_read_socket (f_str_t *in, void *arg)
if (!parse_mta_str (in, cd)) {
msg_warn ("mta_read_socket: message not delivered");
close_mta_connection (cd, FALSE);
- return;
+ return FALSE;
}
close_mta_connection (cd, TRUE);
break;
}
+
+ return TRUE;
}
/*
diff --git a/src/main.h b/src/main.h
index d31f09943..cb922252c 100644
--- a/src/main.h
+++ b/src/main.h
@@ -16,6 +16,7 @@
#include "filter.h"
#include "buffer.h"
#include "hash.h"
+#include "events.h"
#include "util.h"
/* Default values */
@@ -147,6 +148,7 @@ struct controller_session {
void (*other_handler)(struct controller_session *session,
f_str_t *in); /**< other command handler to execute at the end of processing */
void *other_data; /**< and its data */
+ struct rspamd_async_session* s; /**< async session object */
};
typedef void (*controller_func_t)(char **args, struct controller_session *session);
@@ -181,6 +183,7 @@ struct worker_task {
char *user; /**< user to deliver */
f_str_t *msg; /**< message buffer */
rspamd_io_dispatcher_t *dispatcher; /**< IO dispatcher object */
+ struct rspamd_async_session* s; /**< async session object */
memcached_ctx_t *memc_ctx; /**< memcached context associated with task */
int parts_count; /**< mime parts count */
GMimeMessage *message; /**< message, parsed with GMime */
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index cd3641654..4001a9744 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -219,6 +219,20 @@ fuzzy_check_module_reconfig (struct config_file *cfg)
}
static void
+fuzzy_io_fin (void *ud)
+{
+ struct fuzzy_client_session *session = ud;
+
+ event_del (&session->ev);
+ session->task->save.saved --;
+ if (session->task->save.saved == 0) {
+ /* Call other filters */
+ session->task->save.saved = 1;
+ process_filters (session->task);
+ }
+}
+
+static void
fuzzy_io_callback (int fd, short what, void *arg)
{
struct fuzzy_client_session *session = arg;
@@ -254,23 +268,22 @@ fuzzy_io_callback (int fd, short what, void *arg)
msg_err ("fuzzy_io_callback: got error on IO with server %s:%d, %d, %s", session->server->name, session->server->port,
errno, strerror (errno));
ok:
- event_del (&session->ev);
close (fd);
- session->task->save.saved --;
- if (session->task->save.saved == 0) {
- /* Call other filters */
- session->task->save.saved = 1;
- process_filters (session->task);
- }
+ remove_normal_event (session->task->s, fuzzy_io_fin, session);
}
static void
-fuzzy_free_session (void *arg)
+fuzzy_learn_fin (void *arg)
{
struct fuzzy_learn_session *session = arg;
event_del (&session->ev);
+ (*session->saved) --;
+ if (*session->saved == 0) {
+ session->session->state = STATE_REPLY;
+ rspamd_dispatcher_write (session->session->dispatcher, "OK" CRLF, sizeof ("OK" CRLF) - 1, FALSE, FALSE);
+ }
}
static void
@@ -279,7 +292,6 @@ fuzzy_learn_callback (int fd, short what, void *arg)
struct fuzzy_learn_session *session = arg;
struct fuzzy_cmd cmd;
char buf[sizeof ("ERR" CRLF)];
- int r;
if (what == EV_WRITE) {
/* Send command to storage */
@@ -308,12 +320,7 @@ fuzzy_learn_callback (int fd, short what, void *arg)
session->server->port, errno, strerror (errno));
ok:
close (fd);
- (*session->saved) --;
- if (*session->saved == 0) {
- session->session->state = WRITE_REPLY;
- r = snprintf (buf, sizeof (buf), "OK" CRLF);
- rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE);
- }
+ remove_normal_event (session->session->s, fuzzy_learn_fin, session);
}
static void
@@ -352,6 +359,7 @@ fuzzy_symbol_callback (struct worker_task *task, void *unused)
session->task = task;
session->server = selected;
event_add (&session->ev, &session->tv);
+ register_async_event (task->s, fuzzy_io_fin, session, FALSE);
task->save.saved ++;
}
}
@@ -378,14 +386,14 @@ fuzzy_process_handler (struct controller_session *session, f_str_t *in)
session->state = STATE_WAIT;
task->msg = in;
- r = process_message (task);
saved = memory_pool_alloc0 (session->session_pool, sizeof (int));
+ r = process_message (task);
if (r == -1) {
msg_warn ("read_socket: processing of message failed");
- task->last_error = "MIME processing error";
- task->error_code = RSPAMD_FILTER_ERROR;
free_task (task, FALSE);
- session->state = WRITE_REPLY;
+ session->state = STATE_REPLY;
+ r = snprintf (out_buf, sizeof (out_buf), "cannot process message" CRLF);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
return;
}
else {
@@ -406,9 +414,9 @@ fuzzy_process_handler (struct controller_session *session, f_str_t *in)
if (selected) {
if ((sock = make_udp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) {
msg_warn ("fuzzy_symbol_callback: cannot connect to %s, %d, %s", selected->name, errno, strerror (errno));
+ session->state = STATE_REPLY;
r = snprintf (out_buf, sizeof (out_buf), "no hashes written" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
- session->state = WRITE_REPLY;
free_task (task, FALSE);
return;
}
@@ -425,15 +433,15 @@ fuzzy_process_handler (struct controller_session *session, f_str_t *in)
s->cmd = cmd;
s->saved = saved;
event_add (&s->ev, &s->tv);
- memory_pool_add_destructor (session->session_pool, fuzzy_free_session, s);
(*saved) ++;
+ register_async_event (session->s, fuzzy_learn_fin, s, FALSE);
}
}
else {
+ session->state = STATE_REPLY;
r = snprintf (out_buf, sizeof (out_buf), "cannot write fuzzy hash" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
free_task (task, FALSE);
- session->state = WRITE_REPLY;
return;
}
cur = g_list_next (cur);
@@ -442,9 +450,9 @@ fuzzy_process_handler (struct controller_session *session, f_str_t *in)
free_task (task, FALSE);
if (*saved == 0) {
+ session->state = STATE_REPLY;
r = snprintf (out_buf, sizeof (out_buf), "no hashes written" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
- session->state = WRITE_REPLY;
}
}
@@ -460,7 +468,7 @@ fuzzy_controller_handler (char **args, struct controller_session *session, int c
msg_info ("fuzzy_controller_handler: empty content length");
r = snprintf (out_buf, sizeof (out_buf), "fuzzy command requires length as argument" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
- session->state = WRITE_REPLY;
+ session->state = STATE_REPLY;
return;
}
@@ -468,7 +476,7 @@ fuzzy_controller_handler (char **args, struct controller_session *session, int c
if (err_str && *err_str != '\0') {
r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
- session->state = WRITE_REPLY;
+ session->state = STATE_REPLY;
return;
}
diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c
index 10d6e9fa9..75d0f0f3b 100644
--- a/src/plugins/surbl.c
+++ b/src/plugins/surbl.c
@@ -447,6 +447,7 @@ make_surbl_requests (struct uri* url, struct worker_task *task, GTree *tree, str
msg_debug ("surbl_test_url: send surbl dns request %s", surbl_req);
if (evdns_resolve_ipv4 (surbl_req, DNS_QUERY_NO_SEARCH, dns_callback, (void *)param) == 0) {
param->task->save.saved ++;
+ register_async_event (task->s, (event_finalizer_t)dns_callback, NULL, TRUE);
}
}
else {
@@ -529,6 +530,7 @@ dns_callback (int result, char type, int count, int ttl, void *addresses, void *
param->task->save.saved = 1;
process_filters (param->task);
}
+ remove_forced_event (param->task->s, (event_finalizer_t)dns_callback);
}
@@ -650,6 +652,23 @@ register_memcached_call (struct uri *url, struct worker_task *task, GTree *url_t
}
static void
+free_redirector_session (void *ud)
+{
+ struct redirector_param *param = (struct redirector_param *)ud;
+
+ event_del (&param->ev);
+ close (param->sock);
+ param->task->save.saved --;
+ make_surbl_requests (param->url, param->task, param->tree, param->suffix);
+ if (param->task->save.saved == 0) {
+ /* Call other filters */
+ param->task->save.saved = 1;
+ process_filters (param->task);
+ }
+
+}
+
+static void
redirector_callback (int fd, short what, void *arg)
{
struct redirector_param *param = (struct redirector_param *)arg;
@@ -671,32 +690,16 @@ redirector_callback (int fd, short what, void *arg)
r = snprintf (url_buf, sizeof (url_buf), "GET %s HTTP/1.0\r\n\r\n", struri (param->url));
if (write (param->sock, url_buf, r) == -1) {
msg_err ("redirector_callback: write failed %s", strerror (errno));
- event_del (&param->ev);
- close (fd);
- param->task->save.saved --;
- make_surbl_requests (param->url, param->task, param->tree, param->suffix);
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
+ remove_normal_event (param->task->s, free_redirector_session, param);
return;
}
param->state = STATE_READ;
}
else {
- event_del (&param->ev);
- close (fd);
msg_info ("redirector_callback: <%s> connection to redirector timed out while waiting for write",
param->task->message_id);
- param->task->save.saved --;
- make_surbl_requests (param->url, param->task, param->tree, param->suffix);
-
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
+ remove_normal_event (param->task->s, free_redirector_session, param);
+ return;
}
break;
case STATE_READ:
@@ -717,28 +720,12 @@ redirector_callback (int fd, short what, void *arg)
parse_uri (param->url, memory_pool_strdup (param->task->task_pool, c), param->task->task_pool);
}
}
- event_del (&param->ev);
- close (fd);
- param->task->save.saved --;
- make_surbl_requests (param->url, param->task, param->tree, param->suffix);
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
+ remove_normal_event (param->task->s, free_redirector_session, param);
}
else {
- event_del (&param->ev);
- close (fd);
msg_info ("redirector_callback: <%s> reading redirector timed out, while waiting for read",
param->task->message_id);
- param->task->save.saved --;
- make_surbl_requests (param->url, param->task, param->tree, param->suffix);
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
+ remove_normal_event (param->task->s, free_redirector_session, param);
}
break;
}
@@ -774,6 +761,7 @@ register_redirector_call (struct uri *url, struct worker_task *task, GTree *url_
timeout->tv_usec = surbl_module_ctx->connect_timeout - timeout->tv_sec * 1000;
event_set (&param->ev, s, EV_WRITE, redirector_callback, (void *)param);
event_add (&param->ev, timeout);
+ register_async_event (task->s, free_redirector_session, param, FALSE);
}
static gboolean
diff --git a/src/worker.c b/src/worker.c
index 9e9b73938..b900eada6 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -52,7 +52,7 @@ extern PerlInterpreter *perl_interpreter;
static struct timeval io_tv;
-static void write_socket (void *arg);
+static gboolean write_socket (void *arg);
static
void sig_handler (int signo)
@@ -147,10 +147,18 @@ free_task (struct worker_task *task, gboolean is_soft)
}
}
+static void
+free_task_hard (void *ud)
+{
+ struct worker_task *task = ud;
+
+ free_task (task, FALSE);
+}
+
/*
* Callback that is called when there is data to read in buffer
*/
-static void
+static gboolean
read_socket (f_str_t *in, void *arg)
{
struct worker_task *task = (struct worker_task *)arg;
@@ -185,7 +193,7 @@ read_socket (f_str_t *in, void *arg)
/* Skip filters */
task->state = WRITE_REPLY;
write_socket (task);
- return;
+ return TRUE;
}
r = process_filters (task);
if (r == -1) {
@@ -207,12 +215,14 @@ read_socket (f_str_t *in, void *arg)
msg_debug ("read_socket: invalid state on reading stage");
break;
}
+
+ return TRUE;
}
/*
* Callback for socket writing
*/
-static void
+static gboolean
write_socket (void *arg)
{
struct worker_task *task = (struct worker_task *)arg;
@@ -220,21 +230,26 @@ write_socket (void *arg)
switch (task->state) {
case WRITE_REPLY:
write_reply (task);
- task->state = CLOSING_CONNECTION;
+ destroy_session (task->s);
+ return FALSE;
break;
case WRITE_ERROR:
write_reply (task);
- task->state = CLOSING_CONNECTION;
+ destroy_session (task->s);
+ return FALSE;
break;
case CLOSING_CONNECTION:
msg_debug ("write_socket: normally closing connection");
- free_task (task, TRUE);
+ destroy_session (task->s);
+ return FALSE;
break;
default:
msg_info ("write_socket: abnormally closing connection");
- free_task (task, TRUE);
+ destroy_session (task->s);
+ return FALSE;
break;
}
+ return TRUE;
}
/*
@@ -246,7 +261,7 @@ err_socket (GError *err, void *arg)
struct worker_task *task = (struct worker_task *)arg;
msg_info ("err_socket: abnormally closing connection, error: %s", err->message);
/* Free buffers */
- free_task (task, FALSE);
+ destroy_session (task->s);
}
struct worker_task *
@@ -279,6 +294,7 @@ construct_task (struct rspamd_worker *worker)
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->results);
new_task->re_cache = g_hash_table_new (g_str_hash, g_str_equal);
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->re_cache);
+ new_task->s = new_async_session (new_task->task_pool, free_task_hard, new_task);
return new_task;
}