]> source.dussan.org Git - rspamd.git/commitdiff
* Implement new system of async events handling (experimental)
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 22 Sep 2009 16:22:31 +0000 (20:22 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 22 Sep 2009 16:22:31 +0000 (20:22 +0400)
12 files changed:
CMakeLists.txt
src/buffer.c
src/buffer.h
src/controller.c
src/events.c [new file with mode: 0644]
src/events.h [new file with mode: 0644]
src/lmtp.c
src/lmtp_proto.c
src/main.h
src/plugins/fuzzy_check.c
src/plugins/surbl.c
src/worker.c

index 390b1bbfc146f8acd2d1a9d9e47a87cb66413530..f43f8bab59f23acbab06446314145ef9c45d0dc9 100644 (file)
@@ -360,6 +360,7 @@ SET(RSPAMDSRC       src/modules.c
                                src/controller.c
                                src/cfg_utils.c
                                src/buffer.c
+                               src/events.c
                                src/html.c
                                src/lmtp.c
                                src/lmtp_proto.c
index 7c52da10de4ecc100504aa6717833effa098f989..33d0904f48a1b559ea31303207bb8bb669964a13 100644 (file)
@@ -38,13 +38,12 @@ dispatcher_error_quark (void)
 
 #define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin)
 
-static void
-write_buffers (int fd, rspamd_io_dispatcher_t *d)
+static gboolean
+write_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean is_delayed)
 {
        GList *cur;
        GError *err;
        rspamd_buffer_t *buf;
-       struct timeval *ntv;
        ssize_t r;
 
        /* Fix order */
@@ -64,7 +63,7 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d)
                        if (d->err_callback) {
                                err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
                                d->err_callback (err, d->user_data);
-                               return;
+                               return FALSE;
                        }
                }
                else if (r > 0) {
@@ -80,7 +79,7 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d)
                        if (d->err_callback) {
                                err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
                                d->err_callback (err, d->user_data);
-                               return;
+                               return FALSE;
                        }
                }
                else if (r == -1 && errno == EAGAIN) {
@@ -88,10 +87,8 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d)
                        /* Wait for other event */
                        event_del (d->ev);
                        event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
-                       ntv = memory_pool_alloc (d->pool, sizeof (struct timeval));
-                       memcpy (ntv, d->tv, sizeof (struct timeval));
-                       event_add (d->ev, ntv);
-                       return;
+                       event_add (d->ev, d->tv);
+                       return TRUE;
                }
                cur = g_list_next (cur);
        }
@@ -103,29 +100,25 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d)
 
                msg_debug ("write_buffers: all buffers were written successfully");
 
-               if (d->write_callback) {
-                       d->write_callback (d->user_data);
-                       if (d->wanna_die) {
+               if (is_delayed && d->write_callback) {
+                       if (!d->write_callback (d->user_data)) {
                                msg_debug ("write_buffers: callback set wanna_die flag, terminating");
-                               rspamd_remove_dispatcher (d);
-                               return;
+                               return FALSE;
                        }
                }
                
                event_del (d->ev);
                event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
-               ntv = memory_pool_alloc (d->pool, sizeof (struct timeval));
-               memcpy (ntv, d->tv, sizeof (struct timeval));
-               event_add (d->ev, ntv);
+               event_add (d->ev, d->tv);
        }
        else {
                /* Plan other write event */
                event_del (d->ev);
                event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
-               ntv = memory_pool_alloc (d->pool, sizeof (struct timeval));
-               memcpy (ntv, d->tv, sizeof (struct timeval));
-               event_add (d->ev, ntv);
+               event_add (d->ev, d->tv);
        }
+
+       return TRUE;
 }
 
 static void
@@ -138,7 +131,11 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
        char **pos;
        size_t *len;
        enum io_policy saved_policy;
-
+       
+       if (d->wanna_die) {
+               rspamd_remove_dispatcher (d);
+               return;
+       }
 
        if (d->in_buf == NULL) {
                d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
@@ -208,10 +205,7 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
                                                res.len --;
                                        }
                                        if (d->read_callback) {
-                                               d->read_callback (&res, d->user_data);
-                                               if (d->wanna_die) {
-                                                       msg_debug ("read_buffers: callback set wanna_die flag, terminating");
-                                                       rspamd_remove_dispatcher (d);
+                                               if (!d->read_callback (&res, d->user_data)) {
                                                        return;
                                                }
                                                /* Move remaining string to begin of buffer (draining) */
@@ -239,7 +233,9 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
                                res.len = r;
                                c = b + r;
                                if (d->read_callback) {
-                                       d->read_callback (&res, d->user_data);
+                                       if (!d->read_callback (&res, d->user_data)) {
+                                               return;
+                                       }
                                        /* Move remaining string to begin of buffer (draining) */
                                        memmove (d->in_buf->data->begin, c, *len - r);
                                        b = d->in_buf->data->begin;
@@ -264,7 +260,6 @@ dispatcher_cb (int fd, short what, void *arg)
 {
        rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *)arg;
        GError *err;
-       struct timeval *ntv;
 
        msg_debug ("dispatcher_cb: in dispatcher callback, what: %d, fd: %d", (int)what, fd);
 
@@ -280,12 +275,11 @@ dispatcher_cb (int fd, short what, void *arg)
                        if (d->out_buffers == NULL) {
                                event_del (d->ev);
                                event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
-                               ntv = memory_pool_alloc (d->pool, sizeof (struct timeval));
-                               memcpy (ntv, d->tv, sizeof (struct timeval));
-                               event_add (d->ev, ntv);
+                               event_add (d->ev, d->tv);
                        }
                        else {
-                               write_buffers (fd, d);
+                               /* Delayed write */
+                               write_buffers (fd, d, TRUE);
                        }
                        break;
                case EV_READ:
@@ -303,7 +297,6 @@ rspamd_create_dispatcher (int fd, enum io_policy policy,
                                                        struct timeval *tv, void *user_data)
 {
        rspamd_io_dispatcher_t *new;
-       struct timeval *ntv;
 
        if (fd == -1) {
                return NULL;
@@ -331,9 +324,7 @@ rspamd_create_dispatcher (int fd, enum io_policy policy,
        new->fd = fd;
 
        event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new);
-       ntv = memory_pool_alloc (new->pool, sizeof (struct timeval));
-       memcpy (ntv, new->tv, sizeof (struct timeval));
-       event_add (new->ev, ntv);
+       event_add (new->ev, new->tv);
 
        return new;
 }
@@ -388,13 +379,12 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d,
        msg_debug ("rspamd_set_dispatcher_policy: new input length watermark is %ld", (long int)d->nchars);
 }
 
-void 
+gboolean 
 rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
                                                                        void *data,
                                                                        size_t len, gboolean delayed, gboolean allocated)
 {
        rspamd_buffer_t *newbuf;
-       struct timeval *ntv;
 
        newbuf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
        if (!allocated) {
@@ -416,12 +406,9 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
 
        if (!delayed) {
                msg_debug ("rspamd_dispatcher_write: plan write event");
-               event_del (d->ev);
-               event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
-               ntv = memory_pool_alloc (d->pool, sizeof (struct timeval));
-               memcpy (ntv, d->tv, sizeof (struct timeval));
-               event_add (d->ev, ntv);
+               return write_buffers (d->fd, d, FALSE);
        }
+       return TRUE;
 }
 
 void 
index 458ea32ded9d179f5d57195d1ea31097853ce59c..d3410ebb8764bfe7fa6958f836195e85cb9be7b6 100644 (file)
@@ -10,8 +10,8 @@
 #include "mem_pool.h"
 #include "fstring.h"
 
-typedef void (*dispatcher_read_callback_t)(f_str_t *in, void *user_data);
-typedef void (*dispatcher_write_callback_t)(void *user_data);
+typedef gboolean (*dispatcher_read_callback_t)(f_str_t *in, void *user_data);
+typedef gboolean (*dispatcher_write_callback_t)(void *user_data);
 typedef void (*dispatcher_err_callback_t)(GError *err, void *user_data);
 
 /**
@@ -81,7 +81,7 @@ void rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d,
  * @param data data to write
  * @param len length of data
  */
-void rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
+gboolean rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
                                                                                                  void *data,
                                                                                                  size_t len, gboolean delayed, gboolean allocated);
 
index 0aaa8bd992d44b6fcf8324b58c088e70a19513b5..dd1f4a91ba38897605fd9c74878963081a016511 100644 (file)
@@ -84,6 +84,8 @@ static time_t start_time;
 static char greetingbuf[1024];
 extern rspamd_hash_t *counters;
 
+static gboolean controller_write_socket (void *arg);
+
 static 
 void sig_handler (int signo)
 {
@@ -119,12 +121,13 @@ completion_func (gpointer elem)
 }
 
 static void
-free_session (struct controller_session *session, gboolean is_soft)
+free_session (void *ud)
 {
        GList *part;
        struct mime_part *p;
+    struct controller_session *session = ud;
        
-       msg_debug ("free_session: freeing session %p", session);
+       msg_info ("free_session: freeing session %p", session);
        
        while ((part = g_list_first (session->parts))) {
                session->parts = g_list_remove_link (session->parts, part);
@@ -132,13 +135,7 @@ free_session (struct controller_session *session, gboolean is_soft)
                g_byte_array_free (p->content, FALSE);
                g_list_free_1 (part);
        }
-       if (is_soft) {
-               /* Plan dispatcher shutdown */
-               session->dispatcher->wanna_die = 1;
-       }
-       else {
-               rspamd_remove_dispatcher (session->dispatcher);
-       }
+       rspamd_remove_dispatcher (session->dispatcher);
 
        close (session->sock);
 
@@ -397,7 +394,7 @@ process_custom_command (char *line, char **cmd_args, struct controller_session *
        return FALSE;
 }
 
-static void
+static gboolean
 controller_read_socket (f_str_t *in, void *arg)
 {
        struct controller_session *session = (struct controller_session *)arg;
@@ -426,13 +423,17 @@ controller_read_socket (f_str_t *in, void *arg)
                                                if (!process_custom_command (cmd, &params[1], session)) {
                                                        msg_debug ("Unknown command: '%s'", cmd);
                                                        i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF);
-                                                       rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE);
+                                                       if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
+                                return FALSE;
+                            }
                                                }
                                                break;
                                        default:
                                                msg_debug ("Ambigious command: '%s'", cmd);
                                                i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF);
-                                               rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE);
+                                               if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
+                            return FALSE;
+                        }
                                                break;
                                }
                        }
@@ -440,7 +441,9 @@ controller_read_socket (f_str_t *in, void *arg)
                                session->state = STATE_REPLY;
                        }
                        if (session->state != STATE_LEARN && session->state != STATE_OTHER) {
-                               rspamd_dispatcher_write (session->dispatcher, END, sizeof (END) - 1, FALSE, TRUE);
+                               if (!rspamd_dispatcher_write (session->dispatcher, END, sizeof (END) - 1, FALSE, TRUE)) {
+                    return FALSE;
+                }
                        }
 
                        g_strfreev (params);
@@ -454,17 +457,17 @@ controller_read_socket (f_str_t *in, void *arg)
                                if (!session->learn_classifier->tokenizer->tokenize_func (session->learn_classifier->tokenizer, 
                                                        session->session_pool, &c, &tokens)) {
                                        i = snprintf (out_buf, sizeof (out_buf), "learn fail, tokenizer error" CRLF);
-                                       rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE);
+                                       if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
+                        return FALSE;
+                    }
                                        session->state = STATE_REPLY;
-                                       return;
+                                       return TRUE;
                                }
                        }
                        cls_ctx = session->learn_classifier->classifier->init_func (session->session_pool, session->learn_classifier);
                        session->learn_classifier->classifier->learn_func (cls_ctx, session->worker->srv->statfile_pool,
                                                                                                        session->learn_symbol, tokens, session->in_class);
                        session->worker->srv->stat->messages_learned ++;
-                       i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF);
-                       rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE);
 
                        /* Clean learned parts */
                        while ((cur = g_list_first (session->parts))) {
@@ -474,6 +477,11 @@ controller_read_socket (f_str_t *in, void *arg)
                                g_list_free_1 (cur);
                        }
 
+                       i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF);
+                       if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
+                return FALSE;
+            }
+
                        session->state = STATE_REPLY;
                        break;
                case STATE_OTHER:
@@ -489,22 +497,29 @@ controller_read_socket (f_str_t *in, void *arg)
                        msg_debug ("controller_read_socket: unknown state while reading %d", session->state);
                        break;
        }
+
+    if (session->state == STATE_REPLY || session->state == STATE_QUIT) {
+        (void)controller_write_socket (session);
+    }
+
+    return TRUE;
 }
 
-static void
+static gboolean
 controller_write_socket (void *arg)
 {
        struct controller_session *session = (struct controller_session *)arg;
        
        if (session->state == STATE_QUIT) {
                /* Free buffers */
-               free_session (session, TRUE);
-               return;
+        destroy_session (session->s);
+        return FALSE;
        }
        else if (session->state == STATE_REPLY) {
                session->state = STATE_COMMAND;
                rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_LINE, BUFSIZ);
        }
+    return TRUE;
 }
 
 static void
@@ -515,8 +530,9 @@ controller_err_socket (GError *err, void *arg)
        if (err->code != EOF) {
                msg_info ("controller_err_socket: abnormally closing control connection, error: %s", err->message);
        }
+
        /* Free buffers */
-       free_session (session, FALSE);
+    destroy_session (session->s);
 }
 
 static void
@@ -552,6 +568,8 @@ accept_socket (int fd, short what, void *arg)
        io_tv->tv_sec = CONTROLLER_IO_TIMEOUT;
        io_tv->tv_usec = 0;
 
+    new_session->s = new_async_session (new_session->session_pool, free_session, new_session);
+
        new_session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, controller_read_socket,
                                                                                                                controller_write_socket, controller_err_socket, io_tv,
                                                                                                                (void *)new_session);
diff --git a/src/events.c b/src/events.c
new file mode 100644 (file)
index 0000000..9092e85
--- /dev/null
@@ -0,0 +1,173 @@
+/*
+ * Copyright (c) 2009, Rambler media
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *      * Redistributions of source code must retain the above copyright
+ *        notice, this list of conditions and the following disclaimer.
+ *      * Redistributions in binary form must reproduce the above copyright
+ *        notice, this list of conditions and the following disclaimer in the
+ *        documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "main.h"
+#include "events.h"
+
+
+struct rspamd_async_session*
+new_async_session (memory_pool_t *pool, event_finalizer_t fin, void *user_data)
+{
+       struct rspamd_async_session *new;
+
+       new = memory_pool_alloc (pool, sizeof (struct rspamd_async_session));
+       new->pool = pool;
+       new->fin = fin;
+       new->user_data = user_data;
+       new->wanna_die = FALSE;
+       new->events = g_queue_new ();
+
+       memory_pool_add_destructor (pool, (pool_destruct_func)g_queue_free, new->events);
+
+       return new;
+}
+
+void 
+register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, gboolean forced)
+{
+       struct rspamd_async_event *new, *ev;
+       GList *cur;
+
+       if (session == NULL) {
+               msg_info ("register_async_event: session is NULL");
+               return;
+       }
+       
+       if (forced) {
+               /* For forced events try first to increase its reference */
+               cur = session->events->head;
+               while (cur) {
+                       ev = cur->data;
+                       if (ev->forced && ev->fin == fin) {
+                               ev->ref ++;
+                               return;
+                       }
+                       cur = g_list_next (cur);
+               }
+       }
+
+       new = memory_pool_alloc (session->pool, sizeof (struct rspamd_async_event));
+       new->fin = fin;
+       new->user_data = user_data;
+       new->forced = forced;
+       new->ref = 1;
+       g_queue_push_head (session->events, new);
+}
+
+void 
+remove_forced_event (struct rspamd_async_session *session, event_finalizer_t fin)
+{
+       struct rspamd_async_event *ev;
+       GList *cur;
+
+       if (session == NULL) {
+               msg_info ("remove_forced_event: session is NULL");
+               return;
+       }
+       
+       cur = session->events->head;
+       while (cur) {
+               ev = cur->data;
+               if (ev->forced && ev->fin == fin) {
+                       ev->ref --;
+                       if (ev->ref == 0) {
+                               g_queue_delete_link (session->events, cur);
+                       }
+                       break;
+               }
+               cur = g_list_next (cur);
+       }
+
+       if (session->wanna_die && session->fin != NULL && g_queue_get_length (session->events) == 0) {
+               /* Call session destroy after all forced events are ready */
+               session->fin (session->user_data);
+       }
+}
+
+void 
+remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud) 
+{
+       struct rspamd_async_event *ev;
+       GList *cur;
+
+       if (session == NULL) {
+               msg_info ("remove_forced_event: session is NULL");
+               return;
+       }
+       
+       cur = session->events->head;
+       while (cur) {
+               ev = cur->data;
+               if (ev->fin == fin && ev->user_data == ud && !ev->forced) {
+                       g_queue_delete_link (session->events, cur);
+                       if (ev->fin) {
+                               ev->fin (ev->user_data);
+                       }
+                       break;
+               }
+               cur = g_list_next (cur);
+       }
+}
+
+gboolean
+destroy_session (struct rspamd_async_session *session)
+{
+       struct rspamd_async_event *ev;
+       GList *cur, *tmp;
+
+       if (session == NULL) {
+               msg_info ("destroy_session: session is NULL");
+               return FALSE;
+       }
+       
+       session->wanna_die = TRUE;
+       
+       cur = session->events->head;
+
+       while (cur) {
+               ev = cur->data;
+               if (!ev->forced) {
+                       if (ev->fin != NULL) {
+                               ev->fin (ev->user_data);
+                       }
+                       tmp = cur;
+                       cur = g_list_next (cur);
+                       g_queue_delete_link (session->events, tmp);
+               }
+               else {
+                       /* Do nothing with forced callbacks */
+                       cur = g_list_next (cur);
+               }
+       }
+
+       if (g_queue_get_length (session->events) == 0) {
+               if (session->fin != NULL) {
+                       session->fin (session->user_data);
+               }
+               return TRUE;
+       }
+
+       return FALSE;
+}
diff --git a/src/events.h b/src/events.h
new file mode 100644 (file)
index 0000000..3715b4d
--- /dev/null
@@ -0,0 +1,39 @@
+#ifndef RSPAMD_EVENTS_H
+#define RSPAMD_EVENTS_H
+
+#include "config.h"
+
+struct rspamd_async_event;
+
+typedef void (*event_finalizer_t)(void *user_data);
+
+struct rspamd_async_event {
+       event_finalizer_t fin;
+       void *user_data;
+       gboolean forced;
+       guint ref;
+};
+
+struct rspamd_async_session {
+       event_finalizer_t fin;
+       GQueue *events;
+       void *user_data;
+       memory_pool_t *pool;
+       gboolean wanna_die;
+};
+
+/* Makes new async session */
+struct rspamd_async_session *new_async_session (memory_pool_t *pool, event_finalizer_t fin, void *user_data);
+/* Insert event into session */
+void register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, gboolean forced);
+/* Must be called by forced events to call session destructor properly */
+void remove_forced_event (struct rspamd_async_session *session, event_finalizer_t fin);
+void remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud); 
+
+/**
+ * Must be called at the end of session, it calls fin functions for all non-forced callbacks
+ * @return true if the whole session was destroyed and false if there are forced events 
+ */
+gboolean destroy_session (struct rspamd_async_session *session);
+
+#endif /* RSPAMD_EVENTS_H */
index c3d5ab344eb70fa88f40611d556b1a9703f4776e..3becb1dfe6a946dad1cff0c034d001c13f688599 100644 (file)
@@ -36,7 +36,7 @@
 static char greetingbuf[1024];
 static struct timeval io_tv;
 
-static void lmtp_write_socket (void *arg);
+static gboolean lmtp_write_socket (void *arg);
 
 static 
 void sig_handler (int signo)
@@ -121,7 +121,7 @@ free_lmtp_task (struct rspamd_lmtp_proto *lmtp, gboolean is_soft)
 /*
  * Callback that is called when there is data to read in buffer
  */
-static void
+static gboolean
 lmtp_read_socket (f_str_t *in, void *arg)
 {
        struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg;
@@ -163,12 +163,14 @@ lmtp_read_socket (f_str_t *in, void *arg)
                        msg_debug ("lmtp_read_socket: invalid state while reading from socket %d", lmtp->task->state);
                        break;
        }
+
+       return TRUE;
 }
 
 /*
  * Callback for socket writing
  */
-static void
+static gboolean
 lmtp_write_socket (void *arg)
 {
        struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg;
@@ -189,11 +191,14 @@ lmtp_write_socket (void *arg)
                case CLOSING_CONNECTION:
                        msg_debug ("lmtp_write_socket: normally closing connection");
                        free_lmtp_task (lmtp, TRUE);
+                       return FALSE;
                        break;
                default:
                        msg_debug ("lmtp_write_socket: invalid state while writing to socket %d", lmtp->task->state);
                        break;
        }
+
+       return TRUE;
 }
 
 /*
index 62967c03b95f0f8dc9654622916f180d50e00402..7fbc38684b8722a08c7a11c60cdc404c907501cf 100644 (file)
@@ -293,13 +293,13 @@ close_mta_connection (struct mta_callback_data *cd, gboolean is_success)
        else {
                out_lmtp_reply (cd->task, LMTP_FAILURE, "", "Delivery failure");
        }
-       cd->dispatcher->wanna_die = TRUE;
+       rspamd_remove_dispatcher (cd->dispatcher);
 }
 
 /*
  * Callback that is called when there is data to read in buffer
  */
-static void
+static gboolean
 mta_read_socket (f_str_t *in, void *arg)
 {
        struct mta_callback_data *cd = (struct mta_callback_data *)arg;
@@ -317,7 +317,7 @@ mta_read_socket (f_str_t *in, void *arg)
        
        if (fstrstr (in, &contres1) != -1 || fstrstr (in, &contres2) != -1) {
                /* Skip such lines */
-               return;
+               return TRUE;
        }
 
        switch (cd->state) {
@@ -325,7 +325,7 @@ mta_read_socket (f_str_t *in, void *arg)
                        if (!parse_mta_str (in, cd)) {
                                msg_warn ("mta_read_socket: got bad greeting");
                                close_mta_connection (cd, FALSE);
-                               return;
+                               return FALSE;
                        }
                        hostmax = sysconf (_SC_HOST_NAME_MAX) + 1;
                        hostbuf = alloca (hostmax);
@@ -344,7 +344,7 @@ mta_read_socket (f_str_t *in, void *arg)
                        if (!parse_mta_str (in, cd)) {
                                msg_warn ("mta_read_socket: got bad helo");
                                close_mta_connection (cd, FALSE);
-                               return;
+                               return FALSE;
                        }
                        r = snprintf (outbuf, sizeof (outbuf), "MAIL FROM: <%s>" CRLF, cd->task->from);
                        rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, FALSE);
@@ -354,7 +354,7 @@ mta_read_socket (f_str_t *in, void *arg)
                        if (!parse_mta_str (in, cd)) {
                                msg_warn ("mta_read_socket: got bad mail from");
                                close_mta_connection (cd, FALSE);
-                               return;
+                               return FALSE;
                        }
                        cur = g_list_first (cd->task->rcpt);
                        r = 0;
@@ -370,7 +370,7 @@ mta_read_socket (f_str_t *in, void *arg)
                        if (!parse_mta_str (in, cd)) {
                                msg_warn ("mta_read_socket: got bad rcpt");
                                close_mta_connection (cd, FALSE);
-                               return;
+                               return FALSE;
                        }
                        r = snprintf (outbuf, sizeof (outbuf), "DATA" CRLF);
                        rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, FALSE);
@@ -380,7 +380,7 @@ mta_read_socket (f_str_t *in, void *arg)
                        if (!parse_mta_str (in, cd)) {
                                msg_warn ("mta_read_socket: got bad data");
                                close_mta_connection (cd, FALSE);
-                               return;
+                               return FALSE;
                        }
                        c = g_mime_object_to_string ((GMimeObject *)cd->task->message);
                        r = strlen (c);
@@ -393,11 +393,13 @@ mta_read_socket (f_str_t *in, void *arg)
                        if (!parse_mta_str (in, cd)) {
                                msg_warn ("mta_read_socket: message not delivered");
                                close_mta_connection (cd, FALSE);
-                               return;
+                               return FALSE;
                        }
                        close_mta_connection (cd, TRUE);
                        break;
        }
+
+       return TRUE;
 }
 
 /*
index d31f09943b365d6bfa159f1173ab4e61337d58de..cb922252cdcdbdcf8918f20e792459c18cd4afd6 100644 (file)
@@ -16,6 +16,7 @@
 #include "filter.h"
 #include "buffer.h"
 #include "hash.h"
+#include "events.h"
 #include "util.h"
 
 /* Default values */
@@ -147,6 +148,7 @@ struct controller_session {
        void (*other_handler)(struct controller_session *session, 
                                                                f_str_t *in);                                   /**< other command handler to execute at the end of processing */
        void *other_data;                                                                                       /**< and its data                                                                       */
+    struct rspamd_async_session* s;                                                            /**< async session object                                                       */
 };
 
 typedef void (*controller_func_t)(char **args, struct controller_session *session);
@@ -181,6 +183,7 @@ struct worker_task {
        char *user;                                                                                                     /**< user to deliver                                                            */
        f_str_t *msg;                                                                                           /**< message buffer                                                                     */
        rspamd_io_dispatcher_t *dispatcher;                                                     /**< IO dispatcher object                                                       */
+    struct rspamd_async_session* s;                                                            /**< async session object                                                       */
        memcached_ctx_t *memc_ctx;                                                                      /**< memcached context associated with task                     */
        int parts_count;                                                                                        /**< mime parts count                                                           */
        GMimeMessage *message;                                                                          /**< message, parsed with GMime                                         */
index cd3641654a922220e9a608752a0bb2e8f9c2b653..4001a9744879b22e655e8ffacf1fbee8e9adc69e 100644 (file)
@@ -218,6 +218,20 @@ fuzzy_check_module_reconfig (struct config_file *cfg)
        return fuzzy_check_module_config (cfg);
 }
 
+static void
+fuzzy_io_fin (void *ud)
+{
+       struct fuzzy_client_session *session = ud;
+
+       event_del (&session->ev);
+       session->task->save.saved --;
+       if (session->task->save.saved == 0) {
+               /* Call other filters */
+               session->task->save.saved = 1;
+               process_filters (session->task);
+       }
+}
+
 static void
 fuzzy_io_callback (int fd, short what, void *arg)
 {
@@ -254,23 +268,22 @@ fuzzy_io_callback (int fd, short what, void *arg)
                msg_err ("fuzzy_io_callback: got error on IO with server %s:%d, %d, %s", session->server->name, session->server->port,
                                        errno, strerror (errno));
        ok:
-               event_del (&session->ev);
                close (fd);
-               session->task->save.saved --;
-               if (session->task->save.saved == 0) {
-                       /* Call other filters */
-                       session->task->save.saved = 1;
-                       process_filters (session->task);
-               }
+               remove_normal_event (session->task->s, fuzzy_io_fin, session);
 
 }
 
 static void
-fuzzy_free_session (void *arg)
+fuzzy_learn_fin (void *arg)
 {
        struct fuzzy_learn_session *session = arg;
 
        event_del (&session->ev);
+       (*session->saved) --;
+       if (*session->saved == 0) {
+               session->session->state = STATE_REPLY;
+               rspamd_dispatcher_write (session->session->dispatcher, "OK" CRLF, sizeof ("OK" CRLF) - 1, FALSE, FALSE);
+       }
 }
 
 static void
@@ -279,7 +292,6 @@ fuzzy_learn_callback (int fd, short what, void *arg)
        struct fuzzy_learn_session *session = arg;
        struct fuzzy_cmd cmd;
        char buf[sizeof ("ERR" CRLF)];
-       int r;
 
        if (what == EV_WRITE) {
                /* Send command to storage */
@@ -308,12 +320,7 @@ fuzzy_learn_callback (int fd, short what, void *arg)
                                        session->server->port, errno, strerror (errno));
        ok:
                close (fd);
-               (*session->saved) --;
-               if (*session->saved == 0) {
-                       session->session->state = WRITE_REPLY;
-                       r = snprintf (buf, sizeof (buf), "OK" CRLF);
-                       rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE);
-               }
+               remove_normal_event (session->session->s, fuzzy_learn_fin, session);
 }
 
 static void 
@@ -352,6 +359,7 @@ fuzzy_symbol_callback (struct worker_task *task, void *unused)
                                session->task = task;
                                session->server = selected;
                                event_add (&session->ev, &session->tv);
+                               register_async_event (task->s, fuzzy_io_fin, session, FALSE);
                                task->save.saved ++;
                        }
                }
@@ -378,14 +386,14 @@ fuzzy_process_handler (struct controller_session *session, f_str_t *in)
        session->state = STATE_WAIT;
 
        task->msg = in;
-       r = process_message (task);
        saved = memory_pool_alloc0 (session->session_pool, sizeof (int));
+       r = process_message (task);
        if (r == -1) {
                msg_warn ("read_socket: processing of message failed");
-               task->last_error = "MIME processing error";
-               task->error_code = RSPAMD_FILTER_ERROR;
                free_task (task, FALSE);
-               session->state = WRITE_REPLY;
+               session->state = STATE_REPLY;
+               r = snprintf (out_buf, sizeof (out_buf), "cannot process message" CRLF);
+               rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
                return;
        }
        else {
@@ -406,9 +414,9 @@ fuzzy_process_handler (struct controller_session *session, f_str_t *in)
                        if (selected) {
                                if ((sock = make_udp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) {
                                        msg_warn ("fuzzy_symbol_callback: cannot connect to %s, %d, %s", selected->name, errno, strerror (errno));
+                                       session->state = STATE_REPLY;
                                        r = snprintf (out_buf, sizeof (out_buf), "no hashes written" CRLF);
                                        rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
-                                       session->state = WRITE_REPLY;
                                        free_task (task, FALSE);
                                        return;
                                }       
@@ -425,15 +433,15 @@ fuzzy_process_handler (struct controller_session *session, f_str_t *in)
                                        s->cmd = cmd;
                                        s->saved = saved;
                                        event_add (&s->ev, &s->tv);
-                                       memory_pool_add_destructor (session->session_pool, fuzzy_free_session, s);
                                        (*saved) ++;
+                                       register_async_event (session->s, fuzzy_learn_fin, s, FALSE);
                                }
                        }
                        else {
+                               session->state = STATE_REPLY;
                                r = snprintf (out_buf, sizeof (out_buf), "cannot write fuzzy hash" CRLF);
                                rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
                                free_task (task, FALSE);
-                               session->state = WRITE_REPLY;
                                return;
                        }
                        cur = g_list_next (cur);
@@ -442,9 +450,9 @@ fuzzy_process_handler (struct controller_session *session, f_str_t *in)
 
        free_task (task, FALSE);
        if (*saved == 0) {
+               session->state = STATE_REPLY;
                r = snprintf (out_buf, sizeof (out_buf), "no hashes written" CRLF);
                rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
-               session->state = WRITE_REPLY;
        }
 }
 
@@ -460,7 +468,7 @@ fuzzy_controller_handler (char **args, struct controller_session *session, int c
                msg_info ("fuzzy_controller_handler: empty content length");
                r = snprintf (out_buf, sizeof (out_buf), "fuzzy command requires length as argument" CRLF);
                rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
-               session->state = WRITE_REPLY;
+               session->state = STATE_REPLY;
                return;
        }
 
@@ -468,7 +476,7 @@ fuzzy_controller_handler (char **args, struct controller_session *session, int c
        if (err_str && *err_str != '\0') {
                r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF);
                rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
-               session->state = WRITE_REPLY;
+               session->state = STATE_REPLY;
                return;
        }
 
index 10d6e9fa957483563d124e4bc00e09d6de95d5f0..75d0f0f3bb9c04a15283df8b4c74fbfb15248a6d 100644 (file)
@@ -447,6 +447,7 @@ make_surbl_requests (struct uri* url, struct worker_task *task, GTree *tree, str
                 msg_debug ("surbl_test_url: send surbl dns request %s", surbl_req);
                 if (evdns_resolve_ipv4 (surbl_req, DNS_QUERY_NO_SEARCH, dns_callback, (void *)param) == 0) {
                     param->task->save.saved ++;
+                                       register_async_event (task->s, (event_finalizer_t)dns_callback, NULL, TRUE);
                 }
             }
             else {
@@ -529,6 +530,7 @@ dns_callback (int result, char type, int count, int ttl, void *addresses, void *
         param->task->save.saved = 1;
         process_filters (param->task);
     }
+       remove_forced_event (param->task->s, (event_finalizer_t)dns_callback);
 
 }
 
@@ -649,6 +651,23 @@ register_memcached_call (struct uri *url, struct worker_task *task, GTree *url_t
     memc_init_ctx (param->ctx);
 }
 
+static void
+free_redirector_session (void *ud)
+{
+    struct redirector_param *param = (struct redirector_param *)ud;
+
+       event_del (&param->ev);
+       close (param->sock);
+       param->task->save.saved --;
+       make_surbl_requests (param->url, param->task, param->tree, param->suffix);
+       if (param->task->save.saved == 0) {
+               /* Call other filters */
+               param->task->save.saved = 1;
+               process_filters (param->task);
+       }
+       
+}
+
 static void
 redirector_callback (int fd, short what, void *arg)
 {
@@ -671,32 +690,16 @@ redirector_callback (int fd, short what, void *arg)
                 r = snprintf (url_buf, sizeof (url_buf), "GET %s HTTP/1.0\r\n\r\n", struri (param->url));
                 if (write (param->sock, url_buf, r) == -1) {
                     msg_err ("redirector_callback: write failed %s", strerror (errno));
-                    event_del (&param->ev);
-                    close (fd);
-                    param->task->save.saved --;
-                    make_surbl_requests (param->url, param->task, param->tree, param->suffix);
-                    if (param->task->save.saved == 0) {
-                        /* Call other filters */
-                        param->task->save.saved = 1;
-                        process_filters (param->task);
-                    }
+                                       remove_normal_event (param->task->s, free_redirector_session, param);
                     return;
                 }
                 param->state = STATE_READ;
             }
             else {
-                event_del (&param->ev);
-                close (fd);
                 msg_info ("redirector_callback: <%s> connection to redirector timed out while waiting for write",
                             param->task->message_id);
-                param->task->save.saved --;
-                make_surbl_requests (param->url, param->task, param->tree, param->suffix);
-
-                if (param->task->save.saved == 0) {
-                    /* Call other filters */
-                    param->task->save.saved = 1;
-                    process_filters (param->task);
-                }
+                               remove_normal_event (param->task->s, free_redirector_session, param);
+                               return;
             }
             break;
         case STATE_READ:
@@ -717,28 +720,12 @@ redirector_callback (int fd, short what, void *arg)
                         parse_uri (param->url, memory_pool_strdup (param->task->task_pool, c), param->task->task_pool);
                     }
                 }
-                event_del (&param->ev);
-                close (fd);
-                param->task->save.saved --;
-                make_surbl_requests (param->url, param->task, param->tree, param->suffix);
-                if (param->task->save.saved == 0) {
-                    /* Call other filters */
-                    param->task->save.saved = 1;
-                    process_filters (param->task);
-                }
+                               remove_normal_event (param->task->s, free_redirector_session, param);
             }
             else {
-                event_del (&param->ev);
-                close (fd);
                 msg_info ("redirector_callback: <%s> reading redirector timed out, while waiting for read",
                             param->task->message_id);
-                param->task->save.saved --;
-                make_surbl_requests (param->url, param->task, param->tree, param->suffix);
-                if (param->task->save.saved == 0) {
-                    /* Call other filters */
-                    param->task->save.saved = 1;
-                    process_filters (param->task);
-                }
+                               remove_normal_event (param->task->s, free_redirector_session, param);
             }
             break;
     }
@@ -774,6 +761,7 @@ register_redirector_call (struct uri *url, struct worker_task *task, GTree *url_
     timeout->tv_usec = surbl_module_ctx->connect_timeout - timeout->tv_sec * 1000;
     event_set (&param->ev, s, EV_WRITE, redirector_callback, (void *)param);
     event_add (&param->ev, timeout);
+       register_async_event (task->s, free_redirector_session, param, FALSE);
 }
 
 static gboolean
index 9e9b739382c1e2337ba53239bfa4a6f6ac29540a..b900eada6f020867dda7fc8817c8218d3f58b404 100644 (file)
@@ -52,7 +52,7 @@ extern PerlInterpreter *perl_interpreter;
 
 static struct timeval io_tv;
 
-static void write_socket (void *arg);
+static gboolean write_socket (void *arg);
 
 static 
 void sig_handler (int signo)
@@ -147,10 +147,18 @@ free_task (struct worker_task *task, gboolean is_soft)
        }
 }
 
+static void 
+free_task_hard (void *ud)
+{
+       struct worker_task *task = ud;
+
+       free_task (task, FALSE);
+}
+
 /*
  * Callback that is called when there is data to read in buffer
  */
-static void
+static gboolean
 read_socket (f_str_t *in, void *arg)
 {
        struct worker_task *task = (struct worker_task *)arg;
@@ -185,7 +193,7 @@ read_socket (f_str_t *in, void *arg)
                                /* Skip filters */
                                task->state = WRITE_REPLY;
                                write_socket (task);
-                               return;
+                               return TRUE;
                        }
                        r = process_filters (task);
                        if (r == -1) {
@@ -207,12 +215,14 @@ read_socket (f_str_t *in, void *arg)
                        msg_debug ("read_socket: invalid state on reading stage");
                        break;
        }
+
+       return TRUE;
 }
 
 /*
  * Callback for socket writing
  */
-static void
+static gboolean 
 write_socket (void *arg)
 {
        struct worker_task *task = (struct worker_task *)arg;
@@ -220,21 +230,26 @@ write_socket (void *arg)
        switch (task->state) {
                case WRITE_REPLY:
                        write_reply (task);
-                       task->state = CLOSING_CONNECTION;
+                       destroy_session (task->s);
+                       return FALSE;
                        break;
                case WRITE_ERROR:
                        write_reply (task);
-                       task->state = CLOSING_CONNECTION;
+                       destroy_session (task->s);
+                       return FALSE;
                        break;
                case CLOSING_CONNECTION:
                        msg_debug ("write_socket: normally closing connection");
-                       free_task (task, TRUE);
+                       destroy_session (task->s);
+                       return FALSE;
                        break;
                default:
                        msg_info ("write_socket: abnormally closing connection");
-                       free_task (task, TRUE);
+                       destroy_session (task->s);
+                       return FALSE;
                        break;
        }
+       return TRUE;
 }
 
 /*
@@ -246,7 +261,7 @@ err_socket (GError *err, void *arg)
        struct worker_task *task = (struct worker_task *)arg;
        msg_info ("err_socket: abnormally closing connection, error: %s", err->message);
        /* Free buffers */
-       free_task (task, FALSE);
+       destroy_session (task->s);
 }
 
 struct worker_task *
@@ -279,6 +294,7 @@ construct_task (struct rspamd_worker *worker)
        memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->results);
        new_task->re_cache = g_hash_table_new (g_str_hash, g_str_equal);
        memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->re_cache);
+       new_task->s = new_async_session (new_task->task_pool, free_task_hard, new_task);
 
        return new_task;
 }