]> source.dussan.org Git - rspamd.git/commitdiff
Implement retransmits for fuzzy_check plugin.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 5 Oct 2015 11:08:32 +0000 (12:08 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 5 Oct 2015 11:08:32 +0000 (12:08 +0100)
src/plugins/fuzzy_check.c

index 5b2190208a0531f23f8b9a499b1e999657281808..cdd5751361a0465a0955d8c48d98bdf70d2cbc60 100644 (file)
@@ -60,6 +60,7 @@
 #define DEFAULT_UPSTREAM_MAXERRORS 10
 
 #define DEFAULT_IO_TIMEOUT 500
+#define DEFAULT_RETRANSMITS 3
 #define DEFAULT_PORT 11335
 
 struct fuzzy_mapping {
@@ -100,30 +101,35 @@ struct fuzzy_ctx {
        guint32 min_height;
        guint32 min_width;
        guint32 io_timeout;
+       guint32 retransmits;
 };
 
 struct fuzzy_client_session {
-       gint state;
        GPtrArray *commands;
-       struct event ev;
-       struct timeval tv;
        struct rspamd_task *task;
        struct upstream *server;
        struct fuzzy_rule *rule;
+       struct event ev;
+       struct event timev;
+       struct timeval tv;
+       gint state;
        gint fd;
+       guint retransmits;
 };
 
 struct fuzzy_learn_session {
-       struct event ev;
        GPtrArray *commands;
        gint *saved;
        GError **err;
-       struct timeval tv;
        struct rspamd_http_connection_entry *http_entry;
        struct upstream *server;
        struct fuzzy_rule *rule;
        struct rspamd_task *task;
+       struct event ev;
+       struct event timev;
+       struct timeval tv;
        gint fd;
+       guint retransmits;
 };
 
 struct fuzzy_cmd_io {
@@ -508,6 +514,16 @@ fuzzy_check_module_config (struct rspamd_config *cfg)
                fuzzy_module_ctx->io_timeout = DEFAULT_IO_TIMEOUT;
        }
 
+       if ((value =
+                                rspamd_config_get_module_opt (cfg,
+                                                "fuzzy_check",
+                                                "retransmits")) != NULL) {
+               fuzzy_module_ctx->retransmits = ucl_obj_toint (value);
+       }
+       else {
+               fuzzy_module_ctx->retransmits = DEFAULT_RETRANSMITS;
+       }
+
        if ((value =
                rspamd_config_get_module_opt (cfg, "fuzzy_check",
                "whitelist")) != NULL) {
@@ -567,7 +583,9 @@ fuzzy_io_fin (void *ud)
        if (session->commands) {
                g_ptr_array_free (session->commands, TRUE);
        }
+
        event_del (&session->ev);
+       event_del (&session->timev);
        close (session->fd);
 }
 
@@ -767,16 +785,20 @@ fuzzy_cmd_vector_to_wire (gint fd, GPtrArray *v)
 {
        guint i;
        struct fuzzy_cmd_io *io;
+       gboolean processed = FALSE;
 
        for (i = 0; i < v->len; i ++) {
                io = g_ptr_array_index (v, i);
 
-               if (!fuzzy_cmd_to_wire (fd, &io->io)) {
-                       return FALSE;
+               if (!io->replied) {
+                       if (!fuzzy_cmd_to_wire (fd, &io->io)) {
+                               return FALSE;
+                       }
+                       processed = TRUE;
                }
        }
 
-       return TRUE;
+       return processed;
 }
 
 /*
@@ -852,9 +874,9 @@ fuzzy_process_reply (guchar **pos, gint *r, GPtrArray *req,
        return NULL;
 }
 
-/* Call this whenever we got data from fuzzy storage */
+/* Fuzzy check callback */
 static void
-fuzzy_io_callback (gint fd, short what, void *arg)
+fuzzy_check_io_callback (gint fd, short what, void *arg)
 {
        struct fuzzy_client_session *session = arg;
        const struct rspamd_fuzzy_reply *rep;
@@ -875,10 +897,6 @@ fuzzy_io_callback (gint fd, short what, void *arg)
                        ret = -1;
                }
                else {
-                       event_del (&session->ev);
-                       event_set (&session->ev, fd, EV_READ | EV_PERSIST,
-                                       fuzzy_io_callback, session);
-                       event_add (&session->ev, &session->tv);
                        session->state = 1;
                        ret = 0;
                }
@@ -935,14 +953,19 @@ fuzzy_io_callback (gint fd, short what, void *arg)
                }
        }
        else {
-               errno = ETIMEDOUT;
-               ret = -1;
+               /* Should not happen */
+               g_assert (0);
        }
 
        if (ret == 0) {
-               return;
+               /* Processed write, switch to reading */
+               event_del (&session->ev);
+               event_set (&session->ev, fd, EV_READ,
+                               fuzzy_check_io_callback, session);
+               event_add (&session->ev, NULL);
        }
        else if (ret == -1) {
+               /* Error state */
                msg_err_task ("got error on IO with server %s, on %s, %d, %s",
                        rspamd_upstream_name (session->server),
                        session->state == 1 ? "read" : "write",
@@ -952,6 +975,7 @@ fuzzy_io_callback (gint fd, short what, void *arg)
                rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session);
        }
        else {
+               /* Read something from network */
                rspamd_upstream_ok (session->server);
                guint nreplied = 0;
 
@@ -966,11 +990,49 @@ fuzzy_io_callback (gint fd, short what, void *arg)
                if (nreplied == session->commands->len) {
                        rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session);
                }
+               else {
+                       /* Need to read more */
+                       event_del (&session->ev);
+                       event_set (&session->ev, fd, EV_READ,
+                                       fuzzy_check_io_callback, session);
+                       event_add (&session->ev, NULL);
+               }
+       }
+}
+
+/* Fuzzy check timeout callback */
+static void
+fuzzy_check_timer_callback (gint fd, short what, void *arg)
+{
+       struct fuzzy_client_session *session = arg;
+       struct rspamd_task *task;
+
+       task = session->task;
+
+       if (session->retransmits >= fuzzy_module_ctx->retransmits) {
+               msg_err_task ("got IO timeout with server %s, after %d retransmits",
+                               rspamd_upstream_name (session->server),
+                               session->retransmits);
+               rspamd_upstream_fail (session->server);
+               rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session);
+       }
+       else {
+               /* Plan write event */
+               event_del (&session->ev);
+               event_set (&session->ev, fd, EV_WRITE,
+                               fuzzy_check_io_callback, session);
+               event_add (&session->ev, NULL);
+
+               /* Plan new retransmit timer */
+               event_del (&session->timev);
+               event_add (&session->timev, &session->tv);
+               session->retransmits ++;
        }
 }
 
+/* Controller IO */
 static void
-fuzzy_learn_callback (gint fd, short what, void *arg)
+fuzzy_controller_io_callback (gint fd, short what, void *arg)
 {
        struct fuzzy_learn_session *session = arg;
        const struct rspamd_fuzzy_reply *rep;
@@ -993,12 +1055,6 @@ fuzzy_learn_callback (gint fd, short what, void *arg)
                        }
                        ret = -1;
                }
-               else {
-                       event_del (&session->ev);
-                       event_set (&session->ev, fd, EV_READ | EV_PERSIST,
-                                       fuzzy_learn_callback, session);
-                       event_add (&session->ev, &session->tv);
-               }
        }
        else if (what == EV_READ) {
                if ((r = read (fd, buf, sizeof (buf) - 1)) == -1) {
@@ -1053,43 +1109,90 @@ fuzzy_learn_callback (gint fd, short what, void *arg)
                }
        }
        else {
-               errno = ETIMEDOUT;
-               if (*(session->err) == NULL) {
-                       g_set_error (session->err,
-                               g_quark_from_static_string ("fuzzy check"), EINVAL,
-                               "process fuzzy, IO timeout");
-               }
-               ret = -1;
+               g_assert (0);
        }
 
        if (ret == 0) {
-               return;
+               event_del (&session->ev);
+               event_set (&session->ev, fd, EV_READ,
+                               fuzzy_controller_io_callback, session);
+               event_add (&session->ev, NULL);
        }
        else if (ret == -1) {
                msg_err_task ("got error in IO with server %s, %d, %s",
                                rspamd_upstream_name (session->server), errno, strerror (errno));
+
+               goto cleanup;
+       }
+       else {
+               (*session->saved)--;
+
+               if (*session->saved == 0) {
+                       goto cleanup;
+               }
+               else {
+                       /* Read more */
+                       event_del (&session->ev);
+                       event_set (&session->ev, fd, EV_READ,
+                                       fuzzy_controller_io_callback, session);
+                       event_add (&session->ev, NULL);
+               }
+       }
+
+       return;
+
+cleanup:
+       if (*(session->err) != NULL) {
                rspamd_upstream_fail (session->server);
+               rspamd_controller_send_error (session->http_entry,
+                               (*session->err)->code, (*session->err)->message);
+               g_error_free (*session->err);
        }
        else {
                rspamd_upstream_ok (session->server);
+               rspamd_controller_send_string (session->http_entry,
+                               "{\"success\":true}");
        }
 
-       (*session->saved) --;
+       rspamd_task_free (session->task, TRUE);
        rspamd_http_connection_unref (session->http_entry->conn);
        event_del (&session->ev);
+       event_del (&session->timev);
        close (session->fd);
+}
 
-       if (*session->saved == 0) {
-               if (*(session->err) != NULL) {
-                       rspamd_controller_send_error (session->http_entry,
-                               (*session->err)->code, (*session->err)->message);
-                       g_error_free (*session->err);
-               }
-               else {
-                       rspamd_controller_send_string (session->http_entry,
-                               "{\"success\":true}");
-               }
+static void
+fuzzy_controller_timer_callback (gint fd, short what, void *arg)
+{
+       struct fuzzy_learn_session *session = arg;
+       struct rspamd_task *task;
+
+       task = session->task;
+
+       if (session->retransmits >= fuzzy_module_ctx->retransmits) {
+               rspamd_upstream_fail (session->server);
+               rspamd_controller_send_error (session->http_entry,
+                               500, "IO timeout with fuzzy storage");
+               msg_err_task ("got IO timeout with server %s, after %d retransmits",
+                               rspamd_upstream_name (session->server),
+                               session->retransmits);
                rspamd_task_free (session->task, TRUE);
+               rspamd_http_connection_unref (session->http_entry->conn);
+               event_del (&session->ev);
+               event_del (&session->timev);
+               close (session->fd);
+       }
+       else {
+               /* Plan write event */
+               event_del (&session->ev);
+               event_set (&session->ev, fd, EV_WRITE,
+                               fuzzy_controller_io_callback, session);
+               event_add (&session->ev, NULL);
+
+               /* Plan new retransmit timer */
+               event_del (&session->timev);
+               event_add (&session->timev, &session->tv);
+               session->retransmits ++;
        }
 }
 
@@ -1223,7 +1326,7 @@ register_fuzzy_client_call (struct rspamd_task *task,
                else {
                        /* Create session for a socket */
                        session =
-                               rspamd_mempool_alloc (task->task_pool,
+                               rspamd_mempool_alloc0 (task->task_pool,
                                        sizeof (struct fuzzy_client_session));
                        msec_to_tv (fuzzy_module_ctx->io_timeout, &session->tv);
                        session->state = 0;
@@ -1232,10 +1335,17 @@ register_fuzzy_client_call (struct rspamd_task *task,
                        session->fd = sock;
                        session->server = selected;
                        session->rule = rule;
-                       event_set (&session->ev, sock, EV_WRITE, fuzzy_io_callback,
+
+                       event_set (&session->ev, sock, EV_WRITE, fuzzy_check_io_callback,
                                        session);
                        event_base_set (session->task->ev_base, &session->ev);
-                       event_add (&session->ev, &session->tv);
+                       event_add (&session->ev, NULL);
+
+                       evtimer_set (&session->timev, fuzzy_check_timer_callback,
+                                       session);
+                       event_base_set (session->task->ev_base, &session->timev);
+                       event_add (&session->timev, &session->tv);
+
                        rspamd_session_add_event (task->s,
                                fuzzy_io_fin,
                                session,
@@ -1298,10 +1408,9 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry,
                }
                else {
                        s =
-                               rspamd_mempool_alloc (task->task_pool,
+                               rspamd_mempool_alloc0 (task->task_pool,
                                        sizeof (struct fuzzy_learn_session));
-                       event_set (&s->ev, sock, EV_WRITE, fuzzy_learn_callback, s);
-                       event_base_set (entry->rt->ev_base, &s->ev);
+
                        msec_to_tv (fuzzy_module_ctx->io_timeout, &s->tv);
                        s->task = task;
                        s->commands = commands;
@@ -1313,7 +1422,16 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry,
                        s->rule = rule;
                        /* We ref connection to avoid freeing before we process fuzzy rule */
                        rspamd_http_connection_ref (entry->conn);
-                       event_add (&s->ev, &s->tv);
+
+                       event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s);
+                       event_base_set (entry->rt->ev_base, &s->ev);
+                       event_add (&s->ev, NULL);
+
+                       evtimer_set (&s->timev, fuzzy_controller_timer_callback,
+                                       s);
+                       event_base_set (s->task->ev_base, &s->timev);
+                       event_add (&s->timev, &s->tv);
+
                        (*saved)++;
                        ret = TRUE;
                }
@@ -1338,6 +1456,7 @@ fuzzy_process_handler (struct rspamd_http_connection_entry *conn_ent,
        /* Prepare task */
        task = rspamd_task_new (NULL);
        task->cfg = ctx->cfg;
+       task->ev_base = conn_ent->rt->ev_base;
 
        /* Allocate message from string */
        /* XXX: what about encrypted messsages ? */
@@ -1347,6 +1466,7 @@ fuzzy_process_handler (struct rspamd_http_connection_entry *conn_ent,
        saved = rspamd_mempool_alloc0 (task->task_pool, sizeof (gint));
        err = rspamd_mempool_alloc0 (task->task_pool, sizeof (GError *));
        r = rspamd_message_parse (task);
+
        if (r == -1) {
                msg_warn_task ("<%s>: cannot process message for fuzzy",
                                task->message_id);
@@ -1355,7 +1475,9 @@ fuzzy_process_handler (struct rspamd_http_connection_entry *conn_ent,
                        "Message processing error");
                return;
        }
+
        cur = fuzzy_module_ctx->fuzzy_rules;
+
        while (cur && res) {
                rule = cur->data;