From b7de31588d06d254754fcb46381181d9cb91e88e Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 5 Oct 2015 12:08:32 +0100 Subject: [PATCH] Implement retransmits for fuzzy_check plugin. --- src/plugins/fuzzy_check.c | 222 +++++++++++++++++++++++++++++--------- 1 file changed, 172 insertions(+), 50 deletions(-) diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index 5b2190208..cdd575136 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -60,6 +60,7 @@ #define DEFAULT_UPSTREAM_MAXERRORS 10 #define DEFAULT_IO_TIMEOUT 500 +#define DEFAULT_RETRANSMITS 3 #define DEFAULT_PORT 11335 struct fuzzy_mapping { @@ -100,30 +101,35 @@ struct fuzzy_ctx { guint32 min_height; guint32 min_width; guint32 io_timeout; + guint32 retransmits; }; struct fuzzy_client_session { - gint state; GPtrArray *commands; - struct event ev; - struct timeval tv; struct rspamd_task *task; struct upstream *server; struct fuzzy_rule *rule; + struct event ev; + struct event timev; + struct timeval tv; + gint state; gint fd; + guint retransmits; }; struct fuzzy_learn_session { - struct event ev; GPtrArray *commands; gint *saved; GError **err; - struct timeval tv; struct rspamd_http_connection_entry *http_entry; struct upstream *server; struct fuzzy_rule *rule; struct rspamd_task *task; + struct event ev; + struct event timev; + struct timeval tv; gint fd; + guint retransmits; }; struct fuzzy_cmd_io { @@ -508,6 +514,16 @@ fuzzy_check_module_config (struct rspamd_config *cfg) fuzzy_module_ctx->io_timeout = DEFAULT_IO_TIMEOUT; } + if ((value = + rspamd_config_get_module_opt (cfg, + "fuzzy_check", + "retransmits")) != NULL) { + fuzzy_module_ctx->retransmits = ucl_obj_toint (value); + } + else { + fuzzy_module_ctx->retransmits = DEFAULT_RETRANSMITS; + } + if ((value = rspamd_config_get_module_opt (cfg, "fuzzy_check", "whitelist")) != NULL) { @@ -567,7 +583,9 @@ fuzzy_io_fin (void *ud) if (session->commands) { g_ptr_array_free (session->commands, TRUE); } + event_del (&session->ev); + event_del (&session->timev); close (session->fd); } @@ -767,16 +785,20 @@ fuzzy_cmd_vector_to_wire (gint fd, GPtrArray *v) { guint i; struct fuzzy_cmd_io *io; + gboolean processed = FALSE; for (i = 0; i < v->len; i ++) { io = g_ptr_array_index (v, i); - if (!fuzzy_cmd_to_wire (fd, &io->io)) { - return FALSE; + if (!io->replied) { + if (!fuzzy_cmd_to_wire (fd, &io->io)) { + return FALSE; + } + processed = TRUE; } } - return TRUE; + return processed; } /* @@ -852,9 +874,9 @@ fuzzy_process_reply (guchar **pos, gint *r, GPtrArray *req, return NULL; } -/* Call this whenever we got data from fuzzy storage */ +/* Fuzzy check callback */ static void -fuzzy_io_callback (gint fd, short what, void *arg) +fuzzy_check_io_callback (gint fd, short what, void *arg) { struct fuzzy_client_session *session = arg; const struct rspamd_fuzzy_reply *rep; @@ -875,10 +897,6 @@ fuzzy_io_callback (gint fd, short what, void *arg) ret = -1; } else { - event_del (&session->ev); - event_set (&session->ev, fd, EV_READ | EV_PERSIST, - fuzzy_io_callback, session); - event_add (&session->ev, &session->tv); session->state = 1; ret = 0; } @@ -935,14 +953,19 @@ fuzzy_io_callback (gint fd, short what, void *arg) } } else { - errno = ETIMEDOUT; - ret = -1; + /* Should not happen */ + g_assert (0); } if (ret == 0) { - return; + /* Processed write, switch to reading */ + event_del (&session->ev); + event_set (&session->ev, fd, EV_READ, + fuzzy_check_io_callback, session); + event_add (&session->ev, NULL); } else if (ret == -1) { + /* Error state */ msg_err_task ("got error on IO with server %s, on %s, %d, %s", rspamd_upstream_name (session->server), session->state == 1 ? "read" : "write", @@ -952,6 +975,7 @@ fuzzy_io_callback (gint fd, short what, void *arg) rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session); } else { + /* Read something from network */ rspamd_upstream_ok (session->server); guint nreplied = 0; @@ -966,11 +990,49 @@ fuzzy_io_callback (gint fd, short what, void *arg) if (nreplied == session->commands->len) { rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session); } + else { + /* Need to read more */ + event_del (&session->ev); + event_set (&session->ev, fd, EV_READ, + fuzzy_check_io_callback, session); + event_add (&session->ev, NULL); + } + } +} + +/* Fuzzy check timeout callback */ +static void +fuzzy_check_timer_callback (gint fd, short what, void *arg) +{ + struct fuzzy_client_session *session = arg; + struct rspamd_task *task; + + task = session->task; + + if (session->retransmits >= fuzzy_module_ctx->retransmits) { + msg_err_task ("got IO timeout with server %s, after %d retransmits", + rspamd_upstream_name (session->server), + session->retransmits); + rspamd_upstream_fail (session->server); + rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session); + } + else { + /* Plan write event */ + event_del (&session->ev); + event_set (&session->ev, fd, EV_WRITE, + fuzzy_check_io_callback, session); + event_add (&session->ev, NULL); + + /* Plan new retransmit timer */ + event_del (&session->timev); + event_add (&session->timev, &session->tv); + session->retransmits ++; } } +/* Controller IO */ static void -fuzzy_learn_callback (gint fd, short what, void *arg) +fuzzy_controller_io_callback (gint fd, short what, void *arg) { struct fuzzy_learn_session *session = arg; const struct rspamd_fuzzy_reply *rep; @@ -993,12 +1055,6 @@ fuzzy_learn_callback (gint fd, short what, void *arg) } ret = -1; } - else { - event_del (&session->ev); - event_set (&session->ev, fd, EV_READ | EV_PERSIST, - fuzzy_learn_callback, session); - event_add (&session->ev, &session->tv); - } } else if (what == EV_READ) { if ((r = read (fd, buf, sizeof (buf) - 1)) == -1) { @@ -1053,43 +1109,90 @@ fuzzy_learn_callback (gint fd, short what, void *arg) } } else { - errno = ETIMEDOUT; - if (*(session->err) == NULL) { - g_set_error (session->err, - g_quark_from_static_string ("fuzzy check"), EINVAL, - "process fuzzy, IO timeout"); - } - ret = -1; + g_assert (0); } if (ret == 0) { - return; + event_del (&session->ev); + event_set (&session->ev, fd, EV_READ, + fuzzy_controller_io_callback, session); + event_add (&session->ev, NULL); } else if (ret == -1) { msg_err_task ("got error in IO with server %s, %d, %s", rspamd_upstream_name (session->server), errno, strerror (errno)); + + goto cleanup; + } + else { + (*session->saved)--; + + if (*session->saved == 0) { + goto cleanup; + } + else { + /* Read more */ + event_del (&session->ev); + event_set (&session->ev, fd, EV_READ, + fuzzy_controller_io_callback, session); + event_add (&session->ev, NULL); + } + } + + return; + +cleanup: + if (*(session->err) != NULL) { rspamd_upstream_fail (session->server); + rspamd_controller_send_error (session->http_entry, + (*session->err)->code, (*session->err)->message); + g_error_free (*session->err); } else { rspamd_upstream_ok (session->server); + rspamd_controller_send_string (session->http_entry, + "{\"success\":true}"); } - (*session->saved) --; + rspamd_task_free (session->task, TRUE); rspamd_http_connection_unref (session->http_entry->conn); event_del (&session->ev); + event_del (&session->timev); close (session->fd); +} - if (*session->saved == 0) { - if (*(session->err) != NULL) { - rspamd_controller_send_error (session->http_entry, - (*session->err)->code, (*session->err)->message); - g_error_free (*session->err); - } - else { - rspamd_controller_send_string (session->http_entry, - "{\"success\":true}"); - } +static void +fuzzy_controller_timer_callback (gint fd, short what, void *arg) +{ + struct fuzzy_learn_session *session = arg; + struct rspamd_task *task; + + task = session->task; + + if (session->retransmits >= fuzzy_module_ctx->retransmits) { + rspamd_upstream_fail (session->server); + rspamd_controller_send_error (session->http_entry, + 500, "IO timeout with fuzzy storage"); + msg_err_task ("got IO timeout with server %s, after %d retransmits", + rspamd_upstream_name (session->server), + session->retransmits); rspamd_task_free (session->task, TRUE); + rspamd_http_connection_unref (session->http_entry->conn); + event_del (&session->ev); + event_del (&session->timev); + close (session->fd); + } + else { + /* Plan write event */ + event_del (&session->ev); + event_set (&session->ev, fd, EV_WRITE, + fuzzy_controller_io_callback, session); + event_add (&session->ev, NULL); + + /* Plan new retransmit timer */ + event_del (&session->timev); + event_add (&session->timev, &session->tv); + session->retransmits ++; } } @@ -1223,7 +1326,7 @@ register_fuzzy_client_call (struct rspamd_task *task, else { /* Create session for a socket */ session = - rspamd_mempool_alloc (task->task_pool, + rspamd_mempool_alloc0 (task->task_pool, sizeof (struct fuzzy_client_session)); msec_to_tv (fuzzy_module_ctx->io_timeout, &session->tv); session->state = 0; @@ -1232,10 +1335,17 @@ register_fuzzy_client_call (struct rspamd_task *task, session->fd = sock; session->server = selected; session->rule = rule; - event_set (&session->ev, sock, EV_WRITE, fuzzy_io_callback, + + event_set (&session->ev, sock, EV_WRITE, fuzzy_check_io_callback, session); event_base_set (session->task->ev_base, &session->ev); - event_add (&session->ev, &session->tv); + event_add (&session->ev, NULL); + + evtimer_set (&session->timev, fuzzy_check_timer_callback, + session); + event_base_set (session->task->ev_base, &session->timev); + event_add (&session->timev, &session->tv); + rspamd_session_add_event (task->s, fuzzy_io_fin, session, @@ -1298,10 +1408,9 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry, } else { s = - rspamd_mempool_alloc (task->task_pool, + rspamd_mempool_alloc0 (task->task_pool, sizeof (struct fuzzy_learn_session)); - event_set (&s->ev, sock, EV_WRITE, fuzzy_learn_callback, s); - event_base_set (entry->rt->ev_base, &s->ev); + msec_to_tv (fuzzy_module_ctx->io_timeout, &s->tv); s->task = task; s->commands = commands; @@ -1313,7 +1422,16 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry, s->rule = rule; /* We ref connection to avoid freeing before we process fuzzy rule */ rspamd_http_connection_ref (entry->conn); - event_add (&s->ev, &s->tv); + + event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s); + event_base_set (entry->rt->ev_base, &s->ev); + event_add (&s->ev, NULL); + + evtimer_set (&s->timev, fuzzy_controller_timer_callback, + s); + event_base_set (s->task->ev_base, &s->timev); + event_add (&s->timev, &s->tv); + (*saved)++; ret = TRUE; } @@ -1338,6 +1456,7 @@ fuzzy_process_handler (struct rspamd_http_connection_entry *conn_ent, /* Prepare task */ task = rspamd_task_new (NULL); task->cfg = ctx->cfg; + task->ev_base = conn_ent->rt->ev_base; /* Allocate message from string */ /* XXX: what about encrypted messsages ? */ @@ -1347,6 +1466,7 @@ fuzzy_process_handler (struct rspamd_http_connection_entry *conn_ent, saved = rspamd_mempool_alloc0 (task->task_pool, sizeof (gint)); err = rspamd_mempool_alloc0 (task->task_pool, sizeof (GError *)); r = rspamd_message_parse (task); + if (r == -1) { msg_warn_task ("<%s>: cannot process message for fuzzy", task->message_id); @@ -1355,7 +1475,9 @@ fuzzy_process_handler (struct rspamd_http_connection_entry *conn_ent, "Message processing error"); return; } + cur = fuzzy_module_ctx->fuzzy_rules; + while (cur && res) { rule = cur->data; -- 2.39.5