From 626a11ad9819593eadaca1e321192c75a32b51f3 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Tue, 22 Sep 2009 20:22:31 +0400 Subject: [PATCH] * Implement new system of async events handling (experimental) --- CMakeLists.txt | 1 + src/buffer.c | 71 +++++++--------- src/buffer.h | 6 +- src/controller.c | 60 ++++++++----- src/events.c | 173 ++++++++++++++++++++++++++++++++++++++ src/events.h | 39 +++++++++ src/lmtp.c | 11 ++- src/lmtp_proto.c | 20 +++-- src/main.h | 3 + src/plugins/fuzzy_check.c | 58 +++++++------ src/plugins/surbl.c | 62 ++++++-------- src/worker.c | 34 ++++++-- 12 files changed, 389 insertions(+), 149 deletions(-) create mode 100644 src/events.c create mode 100644 src/events.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 390b1bbfc..f43f8bab5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -360,6 +360,7 @@ SET(RSPAMDSRC src/modules.c src/controller.c src/cfg_utils.c src/buffer.c + src/events.c src/html.c src/lmtp.c src/lmtp_proto.c diff --git a/src/buffer.c b/src/buffer.c index 7c52da10d..33d0904f4 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -38,13 +38,12 @@ dispatcher_error_quark (void) #define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin) -static void -write_buffers (int fd, rspamd_io_dispatcher_t *d) +static gboolean +write_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean is_delayed) { GList *cur; GError *err; rspamd_buffer_t *buf; - struct timeval *ntv; ssize_t r; /* Fix order */ @@ -64,7 +63,7 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d) if (d->err_callback) { err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); d->err_callback (err, d->user_data); - return; + return FALSE; } } else if (r > 0) { @@ -80,7 +79,7 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d) if (d->err_callback) { err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF"); d->err_callback (err, d->user_data); - return; + return FALSE; } } else if (r == -1 && errno == EAGAIN) { @@ -88,10 +87,8 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d) /* Wait for other event */ event_del (d->ev); event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d); - ntv = memory_pool_alloc (d->pool, sizeof (struct timeval)); - memcpy (ntv, d->tv, sizeof (struct timeval)); - event_add (d->ev, ntv); - return; + event_add (d->ev, d->tv); + return TRUE; } cur = g_list_next (cur); } @@ -103,29 +100,25 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d) msg_debug ("write_buffers: all buffers were written successfully"); - if (d->write_callback) { - d->write_callback (d->user_data); - if (d->wanna_die) { + if (is_delayed && d->write_callback) { + if (!d->write_callback (d->user_data)) { msg_debug ("write_buffers: callback set wanna_die flag, terminating"); - rspamd_remove_dispatcher (d); - return; + return FALSE; } } event_del (d->ev); event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); - ntv = memory_pool_alloc (d->pool, sizeof (struct timeval)); - memcpy (ntv, d->tv, sizeof (struct timeval)); - event_add (d->ev, ntv); + event_add (d->ev, d->tv); } else { /* Plan other write event */ event_del (d->ev); event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d); - ntv = memory_pool_alloc (d->pool, sizeof (struct timeval)); - memcpy (ntv, d->tv, sizeof (struct timeval)); - event_add (d->ev, ntv); + event_add (d->ev, d->tv); } + + return TRUE; } static void @@ -138,7 +131,11 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read) char **pos; size_t *len; enum io_policy saved_policy; - + + if (d->wanna_die) { + rspamd_remove_dispatcher (d); + return; + } if (d->in_buf == NULL) { d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t)); @@ -208,10 +205,7 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read) res.len --; } if (d->read_callback) { - d->read_callback (&res, d->user_data); - if (d->wanna_die) { - msg_debug ("read_buffers: callback set wanna_die flag, terminating"); - rspamd_remove_dispatcher (d); + if (!d->read_callback (&res, d->user_data)) { return; } /* Move remaining string to begin of buffer (draining) */ @@ -239,7 +233,9 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read) res.len = r; c = b + r; if (d->read_callback) { - d->read_callback (&res, d->user_data); + if (!d->read_callback (&res, d->user_data)) { + return; + } /* Move remaining string to begin of buffer (draining) */ memmove (d->in_buf->data->begin, c, *len - r); b = d->in_buf->data->begin; @@ -264,7 +260,6 @@ dispatcher_cb (int fd, short what, void *arg) { rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *)arg; GError *err; - struct timeval *ntv; msg_debug ("dispatcher_cb: in dispatcher callback, what: %d, fd: %d", (int)what, fd); @@ -280,12 +275,11 @@ dispatcher_cb (int fd, short what, void *arg) if (d->out_buffers == NULL) { event_del (d->ev); event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); - ntv = memory_pool_alloc (d->pool, sizeof (struct timeval)); - memcpy (ntv, d->tv, sizeof (struct timeval)); - event_add (d->ev, ntv); + event_add (d->ev, d->tv); } else { - write_buffers (fd, d); + /* Delayed write */ + write_buffers (fd, d, TRUE); } break; case EV_READ: @@ -303,7 +297,6 @@ rspamd_create_dispatcher (int fd, enum io_policy policy, struct timeval *tv, void *user_data) { rspamd_io_dispatcher_t *new; - struct timeval *ntv; if (fd == -1) { return NULL; @@ -331,9 +324,7 @@ rspamd_create_dispatcher (int fd, enum io_policy policy, new->fd = fd; event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new); - ntv = memory_pool_alloc (new->pool, sizeof (struct timeval)); - memcpy (ntv, new->tv, sizeof (struct timeval)); - event_add (new->ev, ntv); + event_add (new->ev, new->tv); return new; } @@ -388,13 +379,12 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d, msg_debug ("rspamd_set_dispatcher_policy: new input length watermark is %ld", (long int)d->nchars); } -void +gboolean rspamd_dispatcher_write (rspamd_io_dispatcher_t *d, void *data, size_t len, gboolean delayed, gboolean allocated) { rspamd_buffer_t *newbuf; - struct timeval *ntv; newbuf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t)); if (!allocated) { @@ -416,12 +406,9 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t *d, if (!delayed) { msg_debug ("rspamd_dispatcher_write: plan write event"); - event_del (d->ev); - event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); - ntv = memory_pool_alloc (d->pool, sizeof (struct timeval)); - memcpy (ntv, d->tv, sizeof (struct timeval)); - event_add (d->ev, ntv); + return write_buffers (d->fd, d, FALSE); } + return TRUE; } void diff --git a/src/buffer.h b/src/buffer.h index 458ea32de..d3410ebb8 100644 --- a/src/buffer.h +++ b/src/buffer.h @@ -10,8 +10,8 @@ #include "mem_pool.h" #include "fstring.h" -typedef void (*dispatcher_read_callback_t)(f_str_t *in, void *user_data); -typedef void (*dispatcher_write_callback_t)(void *user_data); +typedef gboolean (*dispatcher_read_callback_t)(f_str_t *in, void *user_data); +typedef gboolean (*dispatcher_write_callback_t)(void *user_data); typedef void (*dispatcher_err_callback_t)(GError *err, void *user_data); /** @@ -81,7 +81,7 @@ void rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d, * @param data data to write * @param len length of data */ -void rspamd_dispatcher_write (rspamd_io_dispatcher_t *d, +gboolean rspamd_dispatcher_write (rspamd_io_dispatcher_t *d, void *data, size_t len, gboolean delayed, gboolean allocated); diff --git a/src/controller.c b/src/controller.c index 0aaa8bd99..dd1f4a91b 100644 --- a/src/controller.c +++ b/src/controller.c @@ -84,6 +84,8 @@ static time_t start_time; static char greetingbuf[1024]; extern rspamd_hash_t *counters; +static gboolean controller_write_socket (void *arg); + static void sig_handler (int signo) { @@ -119,12 +121,13 @@ completion_func (gpointer elem) } static void -free_session (struct controller_session *session, gboolean is_soft) +free_session (void *ud) { GList *part; struct mime_part *p; + struct controller_session *session = ud; - msg_debug ("free_session: freeing session %p", session); + msg_info ("free_session: freeing session %p", session); while ((part = g_list_first (session->parts))) { session->parts = g_list_remove_link (session->parts, part); @@ -132,13 +135,7 @@ free_session (struct controller_session *session, gboolean is_soft) g_byte_array_free (p->content, FALSE); g_list_free_1 (part); } - if (is_soft) { - /* Plan dispatcher shutdown */ - session->dispatcher->wanna_die = 1; - } - else { - rspamd_remove_dispatcher (session->dispatcher); - } + rspamd_remove_dispatcher (session->dispatcher); close (session->sock); @@ -397,7 +394,7 @@ process_custom_command (char *line, char **cmd_args, struct controller_session * return FALSE; } -static void +static gboolean controller_read_socket (f_str_t *in, void *arg) { struct controller_session *session = (struct controller_session *)arg; @@ -426,13 +423,17 @@ controller_read_socket (f_str_t *in, void *arg) if (!process_custom_command (cmd, ¶ms[1], session)) { msg_debug ("Unknown command: '%s'", cmd); i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF); - rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE); + if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) { + return FALSE; + } } break; default: msg_debug ("Ambigious command: '%s'", cmd); i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF); - rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE); + if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) { + return FALSE; + } break; } } @@ -440,7 +441,9 @@ controller_read_socket (f_str_t *in, void *arg) session->state = STATE_REPLY; } if (session->state != STATE_LEARN && session->state != STATE_OTHER) { - rspamd_dispatcher_write (session->dispatcher, END, sizeof (END) - 1, FALSE, TRUE); + if (!rspamd_dispatcher_write (session->dispatcher, END, sizeof (END) - 1, FALSE, TRUE)) { + return FALSE; + } } g_strfreev (params); @@ -454,17 +457,17 @@ controller_read_socket (f_str_t *in, void *arg) if (!session->learn_classifier->tokenizer->tokenize_func (session->learn_classifier->tokenizer, session->session_pool, &c, &tokens)) { i = snprintf (out_buf, sizeof (out_buf), "learn fail, tokenizer error" CRLF); - rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE); + if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) { + return FALSE; + } session->state = STATE_REPLY; - return; + return TRUE; } } cls_ctx = session->learn_classifier->classifier->init_func (session->session_pool, session->learn_classifier); session->learn_classifier->classifier->learn_func (cls_ctx, session->worker->srv->statfile_pool, session->learn_symbol, tokens, session->in_class); session->worker->srv->stat->messages_learned ++; - i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF); - rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE); /* Clean learned parts */ while ((cur = g_list_first (session->parts))) { @@ -474,6 +477,11 @@ controller_read_socket (f_str_t *in, void *arg) g_list_free_1 (cur); } + i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF); + if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) { + return FALSE; + } + session->state = STATE_REPLY; break; case STATE_OTHER: @@ -489,22 +497,29 @@ controller_read_socket (f_str_t *in, void *arg) msg_debug ("controller_read_socket: unknown state while reading %d", session->state); break; } + + if (session->state == STATE_REPLY || session->state == STATE_QUIT) { + (void)controller_write_socket (session); + } + + return TRUE; } -static void +static gboolean controller_write_socket (void *arg) { struct controller_session *session = (struct controller_session *)arg; if (session->state == STATE_QUIT) { /* Free buffers */ - free_session (session, TRUE); - return; + destroy_session (session->s); + return FALSE; } else if (session->state == STATE_REPLY) { session->state = STATE_COMMAND; rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_LINE, BUFSIZ); } + return TRUE; } static void @@ -515,8 +530,9 @@ controller_err_socket (GError *err, void *arg) if (err->code != EOF) { msg_info ("controller_err_socket: abnormally closing control connection, error: %s", err->message); } + /* Free buffers */ - free_session (session, FALSE); + destroy_session (session->s); } static void @@ -552,6 +568,8 @@ accept_socket (int fd, short what, void *arg) io_tv->tv_sec = CONTROLLER_IO_TIMEOUT; io_tv->tv_usec = 0; + new_session->s = new_async_session (new_session->session_pool, free_session, new_session); + new_session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, controller_read_socket, controller_write_socket, controller_err_socket, io_tv, (void *)new_session); diff --git a/src/events.c b/src/events.c new file mode 100644 index 000000000..9092e85c9 --- /dev/null +++ b/src/events.c @@ -0,0 +1,173 @@ +/* + * Copyright (c) 2009, Rambler media + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "config.h" +#include "main.h" +#include "events.h" + + +struct rspamd_async_session* +new_async_session (memory_pool_t *pool, event_finalizer_t fin, void *user_data) +{ + struct rspamd_async_session *new; + + new = memory_pool_alloc (pool, sizeof (struct rspamd_async_session)); + new->pool = pool; + new->fin = fin; + new->user_data = user_data; + new->wanna_die = FALSE; + new->events = g_queue_new (); + + memory_pool_add_destructor (pool, (pool_destruct_func)g_queue_free, new->events); + + return new; +} + +void +register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, gboolean forced) +{ + struct rspamd_async_event *new, *ev; + GList *cur; + + if (session == NULL) { + msg_info ("register_async_event: session is NULL"); + return; + } + + if (forced) { + /* For forced events try first to increase its reference */ + cur = session->events->head; + while (cur) { + ev = cur->data; + if (ev->forced && ev->fin == fin) { + ev->ref ++; + return; + } + cur = g_list_next (cur); + } + } + + new = memory_pool_alloc (session->pool, sizeof (struct rspamd_async_event)); + new->fin = fin; + new->user_data = user_data; + new->forced = forced; + new->ref = 1; + g_queue_push_head (session->events, new); +} + +void +remove_forced_event (struct rspamd_async_session *session, event_finalizer_t fin) +{ + struct rspamd_async_event *ev; + GList *cur; + + if (session == NULL) { + msg_info ("remove_forced_event: session is NULL"); + return; + } + + cur = session->events->head; + while (cur) { + ev = cur->data; + if (ev->forced && ev->fin == fin) { + ev->ref --; + if (ev->ref == 0) { + g_queue_delete_link (session->events, cur); + } + break; + } + cur = g_list_next (cur); + } + + if (session->wanna_die && session->fin != NULL && g_queue_get_length (session->events) == 0) { + /* Call session destroy after all forced events are ready */ + session->fin (session->user_data); + } +} + +void +remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud) +{ + struct rspamd_async_event *ev; + GList *cur; + + if (session == NULL) { + msg_info ("remove_forced_event: session is NULL"); + return; + } + + cur = session->events->head; + while (cur) { + ev = cur->data; + if (ev->fin == fin && ev->user_data == ud && !ev->forced) { + g_queue_delete_link (session->events, cur); + if (ev->fin) { + ev->fin (ev->user_data); + } + break; + } + cur = g_list_next (cur); + } +} + +gboolean +destroy_session (struct rspamd_async_session *session) +{ + struct rspamd_async_event *ev; + GList *cur, *tmp; + + if (session == NULL) { + msg_info ("destroy_session: session is NULL"); + return FALSE; + } + + session->wanna_die = TRUE; + + cur = session->events->head; + + while (cur) { + ev = cur->data; + if (!ev->forced) { + if (ev->fin != NULL) { + ev->fin (ev->user_data); + } + tmp = cur; + cur = g_list_next (cur); + g_queue_delete_link (session->events, tmp); + } + else { + /* Do nothing with forced callbacks */ + cur = g_list_next (cur); + } + } + + if (g_queue_get_length (session->events) == 0) { + if (session->fin != NULL) { + session->fin (session->user_data); + } + return TRUE; + } + + return FALSE; +} diff --git a/src/events.h b/src/events.h new file mode 100644 index 000000000..3715b4d66 --- /dev/null +++ b/src/events.h @@ -0,0 +1,39 @@ +#ifndef RSPAMD_EVENTS_H +#define RSPAMD_EVENTS_H + +#include "config.h" + +struct rspamd_async_event; + +typedef void (*event_finalizer_t)(void *user_data); + +struct rspamd_async_event { + event_finalizer_t fin; + void *user_data; + gboolean forced; + guint ref; +}; + +struct rspamd_async_session { + event_finalizer_t fin; + GQueue *events; + void *user_data; + memory_pool_t *pool; + gboolean wanna_die; +}; + +/* Makes new async session */ +struct rspamd_async_session *new_async_session (memory_pool_t *pool, event_finalizer_t fin, void *user_data); +/* Insert event into session */ +void register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, gboolean forced); +/* Must be called by forced events to call session destructor properly */ +void remove_forced_event (struct rspamd_async_session *session, event_finalizer_t fin); +void remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud); + +/** + * Must be called at the end of session, it calls fin functions for all non-forced callbacks + * @return true if the whole session was destroyed and false if there are forced events + */ +gboolean destroy_session (struct rspamd_async_session *session); + +#endif /* RSPAMD_EVENTS_H */ diff --git a/src/lmtp.c b/src/lmtp.c index c3d5ab344..3becb1dfe 100644 --- a/src/lmtp.c +++ b/src/lmtp.c @@ -36,7 +36,7 @@ static char greetingbuf[1024]; static struct timeval io_tv; -static void lmtp_write_socket (void *arg); +static gboolean lmtp_write_socket (void *arg); static void sig_handler (int signo) @@ -121,7 +121,7 @@ free_lmtp_task (struct rspamd_lmtp_proto *lmtp, gboolean is_soft) /* * Callback that is called when there is data to read in buffer */ -static void +static gboolean lmtp_read_socket (f_str_t *in, void *arg) { struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg; @@ -163,12 +163,14 @@ lmtp_read_socket (f_str_t *in, void *arg) msg_debug ("lmtp_read_socket: invalid state while reading from socket %d", lmtp->task->state); break; } + + return TRUE; } /* * Callback for socket writing */ -static void +static gboolean lmtp_write_socket (void *arg) { struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg; @@ -189,11 +191,14 @@ lmtp_write_socket (void *arg) case CLOSING_CONNECTION: msg_debug ("lmtp_write_socket: normally closing connection"); free_lmtp_task (lmtp, TRUE); + return FALSE; break; default: msg_debug ("lmtp_write_socket: invalid state while writing to socket %d", lmtp->task->state); break; } + + return TRUE; } /* diff --git a/src/lmtp_proto.c b/src/lmtp_proto.c index 62967c03b..7fbc38684 100644 --- a/src/lmtp_proto.c +++ b/src/lmtp_proto.c @@ -293,13 +293,13 @@ close_mta_connection (struct mta_callback_data *cd, gboolean is_success) else { out_lmtp_reply (cd->task, LMTP_FAILURE, "", "Delivery failure"); } - cd->dispatcher->wanna_die = TRUE; + rspamd_remove_dispatcher (cd->dispatcher); } /* * Callback that is called when there is data to read in buffer */ -static void +static gboolean mta_read_socket (f_str_t *in, void *arg) { struct mta_callback_data *cd = (struct mta_callback_data *)arg; @@ -317,7 +317,7 @@ mta_read_socket (f_str_t *in, void *arg) if (fstrstr (in, &contres1) != -1 || fstrstr (in, &contres2) != -1) { /* Skip such lines */ - return; + return TRUE; } switch (cd->state) { @@ -325,7 +325,7 @@ mta_read_socket (f_str_t *in, void *arg) if (!parse_mta_str (in, cd)) { msg_warn ("mta_read_socket: got bad greeting"); close_mta_connection (cd, FALSE); - return; + return FALSE; } hostmax = sysconf (_SC_HOST_NAME_MAX) + 1; hostbuf = alloca (hostmax); @@ -344,7 +344,7 @@ mta_read_socket (f_str_t *in, void *arg) if (!parse_mta_str (in, cd)) { msg_warn ("mta_read_socket: got bad helo"); close_mta_connection (cd, FALSE); - return; + return FALSE; } r = snprintf (outbuf, sizeof (outbuf), "MAIL FROM: <%s>" CRLF, cd->task->from); rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, FALSE); @@ -354,7 +354,7 @@ mta_read_socket (f_str_t *in, void *arg) if (!parse_mta_str (in, cd)) { msg_warn ("mta_read_socket: got bad mail from"); close_mta_connection (cd, FALSE); - return; + return FALSE; } cur = g_list_first (cd->task->rcpt); r = 0; @@ -370,7 +370,7 @@ mta_read_socket (f_str_t *in, void *arg) if (!parse_mta_str (in, cd)) { msg_warn ("mta_read_socket: got bad rcpt"); close_mta_connection (cd, FALSE); - return; + return FALSE; } r = snprintf (outbuf, sizeof (outbuf), "DATA" CRLF); rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, FALSE); @@ -380,7 +380,7 @@ mta_read_socket (f_str_t *in, void *arg) if (!parse_mta_str (in, cd)) { msg_warn ("mta_read_socket: got bad data"); close_mta_connection (cd, FALSE); - return; + return FALSE; } c = g_mime_object_to_string ((GMimeObject *)cd->task->message); r = strlen (c); @@ -393,11 +393,13 @@ mta_read_socket (f_str_t *in, void *arg) if (!parse_mta_str (in, cd)) { msg_warn ("mta_read_socket: message not delivered"); close_mta_connection (cd, FALSE); - return; + return FALSE; } close_mta_connection (cd, TRUE); break; } + + return TRUE; } /* diff --git a/src/main.h b/src/main.h index d31f09943..cb922252c 100644 --- a/src/main.h +++ b/src/main.h @@ -16,6 +16,7 @@ #include "filter.h" #include "buffer.h" #include "hash.h" +#include "events.h" #include "util.h" /* Default values */ @@ -147,6 +148,7 @@ struct controller_session { void (*other_handler)(struct controller_session *session, f_str_t *in); /**< other command handler to execute at the end of processing */ void *other_data; /**< and its data */ + struct rspamd_async_session* s; /**< async session object */ }; typedef void (*controller_func_t)(char **args, struct controller_session *session); @@ -181,6 +183,7 @@ struct worker_task { char *user; /**< user to deliver */ f_str_t *msg; /**< message buffer */ rspamd_io_dispatcher_t *dispatcher; /**< IO dispatcher object */ + struct rspamd_async_session* s; /**< async session object */ memcached_ctx_t *memc_ctx; /**< memcached context associated with task */ int parts_count; /**< mime parts count */ GMimeMessage *message; /**< message, parsed with GMime */ diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index cd3641654..4001a9744 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -218,6 +218,20 @@ fuzzy_check_module_reconfig (struct config_file *cfg) return fuzzy_check_module_config (cfg); } +static void +fuzzy_io_fin (void *ud) +{ + struct fuzzy_client_session *session = ud; + + event_del (&session->ev); + session->task->save.saved --; + if (session->task->save.saved == 0) { + /* Call other filters */ + session->task->save.saved = 1; + process_filters (session->task); + } +} + static void fuzzy_io_callback (int fd, short what, void *arg) { @@ -254,23 +268,22 @@ fuzzy_io_callback (int fd, short what, void *arg) msg_err ("fuzzy_io_callback: got error on IO with server %s:%d, %d, %s", session->server->name, session->server->port, errno, strerror (errno)); ok: - event_del (&session->ev); close (fd); - session->task->save.saved --; - if (session->task->save.saved == 0) { - /* Call other filters */ - session->task->save.saved = 1; - process_filters (session->task); - } + remove_normal_event (session->task->s, fuzzy_io_fin, session); } static void -fuzzy_free_session (void *arg) +fuzzy_learn_fin (void *arg) { struct fuzzy_learn_session *session = arg; event_del (&session->ev); + (*session->saved) --; + if (*session->saved == 0) { + session->session->state = STATE_REPLY; + rspamd_dispatcher_write (session->session->dispatcher, "OK" CRLF, sizeof ("OK" CRLF) - 1, FALSE, FALSE); + } } static void @@ -279,7 +292,6 @@ fuzzy_learn_callback (int fd, short what, void *arg) struct fuzzy_learn_session *session = arg; struct fuzzy_cmd cmd; char buf[sizeof ("ERR" CRLF)]; - int r; if (what == EV_WRITE) { /* Send command to storage */ @@ -308,12 +320,7 @@ fuzzy_learn_callback (int fd, short what, void *arg) session->server->port, errno, strerror (errno)); ok: close (fd); - (*session->saved) --; - if (*session->saved == 0) { - session->session->state = WRITE_REPLY; - r = snprintf (buf, sizeof (buf), "OK" CRLF); - rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE); - } + remove_normal_event (session->session->s, fuzzy_learn_fin, session); } static void @@ -352,6 +359,7 @@ fuzzy_symbol_callback (struct worker_task *task, void *unused) session->task = task; session->server = selected; event_add (&session->ev, &session->tv); + register_async_event (task->s, fuzzy_io_fin, session, FALSE); task->save.saved ++; } } @@ -378,14 +386,14 @@ fuzzy_process_handler (struct controller_session *session, f_str_t *in) session->state = STATE_WAIT; task->msg = in; - r = process_message (task); saved = memory_pool_alloc0 (session->session_pool, sizeof (int)); + r = process_message (task); if (r == -1) { msg_warn ("read_socket: processing of message failed"); - task->last_error = "MIME processing error"; - task->error_code = RSPAMD_FILTER_ERROR; free_task (task, FALSE); - session->state = WRITE_REPLY; + session->state = STATE_REPLY; + r = snprintf (out_buf, sizeof (out_buf), "cannot process message" CRLF); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE); return; } else { @@ -406,9 +414,9 @@ fuzzy_process_handler (struct controller_session *session, f_str_t *in) if (selected) { if ((sock = make_udp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) { msg_warn ("fuzzy_symbol_callback: cannot connect to %s, %d, %s", selected->name, errno, strerror (errno)); + session->state = STATE_REPLY; r = snprintf (out_buf, sizeof (out_buf), "no hashes written" CRLF); rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE); - session->state = WRITE_REPLY; free_task (task, FALSE); return; } @@ -425,15 +433,15 @@ fuzzy_process_handler (struct controller_session *session, f_str_t *in) s->cmd = cmd; s->saved = saved; event_add (&s->ev, &s->tv); - memory_pool_add_destructor (session->session_pool, fuzzy_free_session, s); (*saved) ++; + register_async_event (session->s, fuzzy_learn_fin, s, FALSE); } } else { + session->state = STATE_REPLY; r = snprintf (out_buf, sizeof (out_buf), "cannot write fuzzy hash" CRLF); rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE); free_task (task, FALSE); - session->state = WRITE_REPLY; return; } cur = g_list_next (cur); @@ -442,9 +450,9 @@ fuzzy_process_handler (struct controller_session *session, f_str_t *in) free_task (task, FALSE); if (*saved == 0) { + session->state = STATE_REPLY; r = snprintf (out_buf, sizeof (out_buf), "no hashes written" CRLF); rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE); - session->state = WRITE_REPLY; } } @@ -460,7 +468,7 @@ fuzzy_controller_handler (char **args, struct controller_session *session, int c msg_info ("fuzzy_controller_handler: empty content length"); r = snprintf (out_buf, sizeof (out_buf), "fuzzy command requires length as argument" CRLF); rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE); - session->state = WRITE_REPLY; + session->state = STATE_REPLY; return; } @@ -468,7 +476,7 @@ fuzzy_controller_handler (char **args, struct controller_session *session, int c if (err_str && *err_str != '\0') { r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF); rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE); - session->state = WRITE_REPLY; + session->state = STATE_REPLY; return; } diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c index 10d6e9fa9..75d0f0f3b 100644 --- a/src/plugins/surbl.c +++ b/src/plugins/surbl.c @@ -447,6 +447,7 @@ make_surbl_requests (struct uri* url, struct worker_task *task, GTree *tree, str msg_debug ("surbl_test_url: send surbl dns request %s", surbl_req); if (evdns_resolve_ipv4 (surbl_req, DNS_QUERY_NO_SEARCH, dns_callback, (void *)param) == 0) { param->task->save.saved ++; + register_async_event (task->s, (event_finalizer_t)dns_callback, NULL, TRUE); } } else { @@ -529,6 +530,7 @@ dns_callback (int result, char type, int count, int ttl, void *addresses, void * param->task->save.saved = 1; process_filters (param->task); } + remove_forced_event (param->task->s, (event_finalizer_t)dns_callback); } @@ -649,6 +651,23 @@ register_memcached_call (struct uri *url, struct worker_task *task, GTree *url_t memc_init_ctx (param->ctx); } +static void +free_redirector_session (void *ud) +{ + struct redirector_param *param = (struct redirector_param *)ud; + + event_del (¶m->ev); + close (param->sock); + param->task->save.saved --; + make_surbl_requests (param->url, param->task, param->tree, param->suffix); + if (param->task->save.saved == 0) { + /* Call other filters */ + param->task->save.saved = 1; + process_filters (param->task); + } + +} + static void redirector_callback (int fd, short what, void *arg) { @@ -671,32 +690,16 @@ redirector_callback (int fd, short what, void *arg) r = snprintf (url_buf, sizeof (url_buf), "GET %s HTTP/1.0\r\n\r\n", struri (param->url)); if (write (param->sock, url_buf, r) == -1) { msg_err ("redirector_callback: write failed %s", strerror (errno)); - event_del (¶m->ev); - close (fd); - param->task->save.saved --; - make_surbl_requests (param->url, param->task, param->tree, param->suffix); - if (param->task->save.saved == 0) { - /* Call other filters */ - param->task->save.saved = 1; - process_filters (param->task); - } + remove_normal_event (param->task->s, free_redirector_session, param); return; } param->state = STATE_READ; } else { - event_del (¶m->ev); - close (fd); msg_info ("redirector_callback: <%s> connection to redirector timed out while waiting for write", param->task->message_id); - param->task->save.saved --; - make_surbl_requests (param->url, param->task, param->tree, param->suffix); - - if (param->task->save.saved == 0) { - /* Call other filters */ - param->task->save.saved = 1; - process_filters (param->task); - } + remove_normal_event (param->task->s, free_redirector_session, param); + return; } break; case STATE_READ: @@ -717,28 +720,12 @@ redirector_callback (int fd, short what, void *arg) parse_uri (param->url, memory_pool_strdup (param->task->task_pool, c), param->task->task_pool); } } - event_del (¶m->ev); - close (fd); - param->task->save.saved --; - make_surbl_requests (param->url, param->task, param->tree, param->suffix); - if (param->task->save.saved == 0) { - /* Call other filters */ - param->task->save.saved = 1; - process_filters (param->task); - } + remove_normal_event (param->task->s, free_redirector_session, param); } else { - event_del (¶m->ev); - close (fd); msg_info ("redirector_callback: <%s> reading redirector timed out, while waiting for read", param->task->message_id); - param->task->save.saved --; - make_surbl_requests (param->url, param->task, param->tree, param->suffix); - if (param->task->save.saved == 0) { - /* Call other filters */ - param->task->save.saved = 1; - process_filters (param->task); - } + remove_normal_event (param->task->s, free_redirector_session, param); } break; } @@ -774,6 +761,7 @@ register_redirector_call (struct uri *url, struct worker_task *task, GTree *url_ timeout->tv_usec = surbl_module_ctx->connect_timeout - timeout->tv_sec * 1000; event_set (¶m->ev, s, EV_WRITE, redirector_callback, (void *)param); event_add (¶m->ev, timeout); + register_async_event (task->s, free_redirector_session, param, FALSE); } static gboolean diff --git a/src/worker.c b/src/worker.c index 9e9b73938..b900eada6 100644 --- a/src/worker.c +++ b/src/worker.c @@ -52,7 +52,7 @@ extern PerlInterpreter *perl_interpreter; static struct timeval io_tv; -static void write_socket (void *arg); +static gboolean write_socket (void *arg); static void sig_handler (int signo) @@ -147,10 +147,18 @@ free_task (struct worker_task *task, gboolean is_soft) } } +static void +free_task_hard (void *ud) +{ + struct worker_task *task = ud; + + free_task (task, FALSE); +} + /* * Callback that is called when there is data to read in buffer */ -static void +static gboolean read_socket (f_str_t *in, void *arg) { struct worker_task *task = (struct worker_task *)arg; @@ -185,7 +193,7 @@ read_socket (f_str_t *in, void *arg) /* Skip filters */ task->state = WRITE_REPLY; write_socket (task); - return; + return TRUE; } r = process_filters (task); if (r == -1) { @@ -207,12 +215,14 @@ read_socket (f_str_t *in, void *arg) msg_debug ("read_socket: invalid state on reading stage"); break; } + + return TRUE; } /* * Callback for socket writing */ -static void +static gboolean write_socket (void *arg) { struct worker_task *task = (struct worker_task *)arg; @@ -220,21 +230,26 @@ write_socket (void *arg) switch (task->state) { case WRITE_REPLY: write_reply (task); - task->state = CLOSING_CONNECTION; + destroy_session (task->s); + return FALSE; break; case WRITE_ERROR: write_reply (task); - task->state = CLOSING_CONNECTION; + destroy_session (task->s); + return FALSE; break; case CLOSING_CONNECTION: msg_debug ("write_socket: normally closing connection"); - free_task (task, TRUE); + destroy_session (task->s); + return FALSE; break; default: msg_info ("write_socket: abnormally closing connection"); - free_task (task, TRUE); + destroy_session (task->s); + return FALSE; break; } + return TRUE; } /* @@ -246,7 +261,7 @@ err_socket (GError *err, void *arg) struct worker_task *task = (struct worker_task *)arg; msg_info ("err_socket: abnormally closing connection, error: %s", err->message); /* Free buffers */ - free_task (task, FALSE); + destroy_session (task->s); } struct worker_task * @@ -279,6 +294,7 @@ construct_task (struct rspamd_worker *worker) memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->results); new_task->re_cache = g_hash_table_new (g_str_hash, g_str_equal); memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->re_cache); + new_task->s = new_async_session (new_task->task_pool, free_task_hard, new_task); return new_task; } -- 2.39.5