diff options
Diffstat (limited to 'src/plugins/fuzzy_check.c')
-rw-r--r-- | src/plugins/fuzzy_check.c | 299 |
1 files changed, 129 insertions, 170 deletions
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index 2c91869d6..75df2a645 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -47,6 +47,7 @@ #include "libstat/stat_api.h" #include <math.h> #include <src/libmime/message.h> +#include "libutil/libev_helper.h" #define DEFAULT_SYMBOL "R_FUZZY_HASH" @@ -129,9 +130,8 @@ struct fuzzy_client_session { struct rspamd_symcache_item *item; struct upstream *server; struct fuzzy_rule *rule; - struct event ev; - struct event timev; - struct timeval tv; + struct ev_loop *event_loop; + struct rspamd_io_ev ev; gint state; gint fd; guint retransmits; @@ -146,9 +146,8 @@ struct fuzzy_learn_session { struct upstream *server; struct fuzzy_rule *rule; struct rspamd_task *task; - struct event ev; - struct event timev; - struct timeval tv; + struct ev_loop *event_loop; + struct rspamd_io_ev ev; gint fd; guint retransmits; }; @@ -1185,8 +1184,7 @@ fuzzy_io_fin (void *ud) g_ptr_array_free (session->results, TRUE); } - event_del (&session->ev); - event_del (&session->timev); + rspamd_ev_watcher_stop (session->event_loop, &session->ev); close (session->fd); } @@ -2181,13 +2179,49 @@ fuzzy_check_session_is_completed (struct fuzzy_client_session *session) return FALSE; } +/* Fuzzy check timeout callback */ +static void +fuzzy_check_timer_callback (gint fd, short what, void *arg) +{ + struct fuzzy_client_session *session = arg; + struct rspamd_task *task; + + task = session->task; + + /* We might be here because of other checks being slow */ + if (fuzzy_check_try_read (session) > 0) { + if (fuzzy_check_session_is_completed (session)) { + return; + } + } + + if (session->retransmits >= session->rule->ctx->retransmits) { + msg_err_task ("got IO timeout with server %s(%s), after %d retransmits", + rspamd_upstream_name (session->server), + rspamd_inet_address_to_string_pretty ( + rspamd_upstream_addr_cur (session->server)), + session->retransmits); + rspamd_upstream_fail (session->server, TRUE); + + if (session->item) { + rspamd_symcache_item_async_dec_check (session->task, session->item, M); + } + rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session); + } + else { + /* Plan write event */ + rspamd_ev_watcher_reschedule (session->event_loop, + &session->ev, EV_READ|EV_WRITE); + session->retransmits ++; + } +} + /* Fuzzy check callback */ static void fuzzy_check_io_callback (gint fd, short what, void *arg) { struct fuzzy_client_session *session = arg; struct rspamd_task *task; - struct ev_loop *ev_base; gint r; enum { @@ -2224,18 +2258,14 @@ fuzzy_check_io_callback (gint fd, short what, void *arg) } } else { - /* Should not happen */ - g_assert (0); + fuzzy_check_timer_callback (fd, what, arg); + return; } if (ret == return_want_more) { /* Processed write, switch to reading */ - ev_base = event_get_base (&session->ev); - event_del (&session->ev); - event_set (&session->ev, fd, EV_READ, - fuzzy_check_io_callback, session); - event_base_set (ev_base, &session->ev); - event_add (&session->ev, NULL); + rspamd_ev_watcher_reschedule (session->event_loop, + &session->ev, EV_READ); } else if (ret == return_error) { /* Error state */ @@ -2258,78 +2288,82 @@ fuzzy_check_io_callback (gint fd, short what, void *arg) /* Read something from network */ if (!fuzzy_check_session_is_completed (session)) { /* Need to read more */ - ev_base = event_get_base (&session->ev); - event_del (&session->ev); - event_set (&session->ev, session->fd, EV_READ, - fuzzy_check_io_callback, session); - event_base_set (ev_base, &session->ev); - event_add (&session->ev, NULL); + rspamd_ev_watcher_reschedule (session->event_loop, + &session->ev, EV_READ); } } } -/* Fuzzy check timeout callback */ + static void -fuzzy_check_timer_callback (gint fd, short what, void *arg) +fuzzy_lua_fin (void *ud) { - struct fuzzy_client_session *session = arg; + struct fuzzy_learn_session *session = ud; + + (*session->saved)--; + + rspamd_ev_watcher_stop (session->event_loop, &session->ev); + close (session->fd); +} + +/* Controller IO */ + +static void +fuzzy_controller_timer_callback (gint fd, short what, void *arg) +{ + struct fuzzy_learn_session *session = arg; struct rspamd_task *task; - struct ev_loop *ev_base; task = session->task; - /* We might be here because of other checks being slow */ - if (fuzzy_check_try_read (session) > 0) { - if (fuzzy_check_session_is_completed (session)) { - return; - } - } - if (session->retransmits >= session->rule->ctx->retransmits) { - msg_err_task ("got IO timeout with server %s(%s), after %d retransmits", + rspamd_upstream_fail (session->server, TRUE); + msg_err_task_check ("got IO timeout with server %s(%s), " + "after %d retransmits", rspamd_upstream_name (session->server), rspamd_inet_address_to_string_pretty ( rspamd_upstream_addr_cur (session->server)), session->retransmits); - rspamd_upstream_fail (session->server, TRUE); - if (session->item) { - rspamd_symcache_item_async_dec_check (session->task, session->item, M); + if (session->session) { + rspamd_session_remove_event (session->session, fuzzy_lua_fin, + session); + } + else { + if (session->http_entry) { + rspamd_controller_send_error (session->http_entry, + 500, "IO timeout with fuzzy storage"); + } + + if (*session->saved > 0 ) { + (*session->saved)--; + if (*session->saved == 0) { + if (session->http_entry) { + rspamd_task_free (session->task); + } + + session->task = NULL; + } + } + + if (session->http_entry) { + rspamd_http_connection_unref (session->http_entry->conn); + } + + rspamd_ev_watcher_stop (session->event_loop, + &session->ev); + close (session->fd); } - rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session); } else { /* Plan write event */ - ev_base = event_get_base (&session->ev); - event_del (&session->ev); - event_set (&session->ev, fd, EV_WRITE|EV_READ, - fuzzy_check_io_callback, session); - event_base_set (ev_base, &session->ev); - event_add (&session->ev, NULL); - - /* Plan new retransmit timer */ - ev_base = event_get_base (&session->timev); - event_del (&session->timev); - event_base_set (ev_base, &session->timev); - event_add (&session->timev, &session->tv); + rspamd_ev_watcher_reschedule (session->event_loop, + &session->ev, EV_READ|EV_WRITE); session->retransmits ++; } } static void -fuzzy_lua_fin (void *ud) -{ - struct fuzzy_learn_session *session = ud; - - (*session->saved)--; - - event_del (&session->ev); - event_del (&session->timev); - close (session->fd); -} - -/* Controller IO */ -static void fuzzy_controller_io_callback (gint fd, short what, void *arg) { struct fuzzy_learn_session *session = arg; @@ -2340,7 +2374,6 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg) struct fuzzy_cmd_io *io; struct rspamd_fuzzy_cmd *cmd = NULL; const gchar *symbol, *ftype; - struct ev_loop *ev_base; gint r; enum { return_error = 0, @@ -2355,7 +2388,8 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg) if (what & EV_READ) { if ((r = read (fd, buf, sizeof (buf) - 1)) == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { - event_add (&session->ev, NULL); + rspamd_ev_watcher_reschedule (session->event_loop, + &session->ev, EV_READ); return; } @@ -2482,16 +2516,14 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg) } } else { - g_assert (0); + fuzzy_controller_timer_callback (fd, what, arg); + + return; } if (ret == return_want_more) { - ev_base = event_get_base (&session->ev); - event_del (&session->ev); - event_set (&session->ev, fd, EV_READ, - fuzzy_controller_io_callback, session); - event_base_set (ev_base, &session->ev); - event_add (&session->ev, NULL); + rspamd_ev_watcher_reschedule (session->event_loop, + &session->ev, EV_READ); return; } @@ -2518,8 +2550,7 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg) rspamd_http_connection_unref (session->http_entry->conn); } - event_del (&session->ev); - event_del (&session->timev); + rspamd_ev_watcher_stop (session->event_loop, &session->ev); close (session->fd); if (*session->saved == 0) { @@ -2555,7 +2586,6 @@ cleanup: if (session->http_entry) { ucl_object_t *reply, *hashes; - guint i; gchar hexbuf[rspamd_cryptobox_HASHBYTES * 2 + 1]; reply = ucl_object_typed_new (UCL_OBJECT); @@ -2588,72 +2618,6 @@ cleanup: } -static void -fuzzy_controller_timer_callback (gint fd, short what, void *arg) -{ - struct fuzzy_learn_session *session = arg; - struct rspamd_task *task; - struct ev_loop *ev_base; - - task = session->task; - - if (session->retransmits >= session->rule->ctx->retransmits) { - rspamd_upstream_fail (session->server, TRUE); - msg_err_task_check ("got IO timeout with server %s(%s), " - "after %d retransmits", - rspamd_upstream_name (session->server), - rspamd_inet_address_to_string_pretty ( - rspamd_upstream_addr_cur (session->server)), - session->retransmits); - - if (session->session) { - rspamd_session_remove_event (session->session, fuzzy_lua_fin, - session); - } - else { - if (session->http_entry) { - rspamd_controller_send_error (session->http_entry, - 500, "IO timeout with fuzzy storage"); - } - - if (*session->saved > 0 ) { - (*session->saved)--; - if (*session->saved == 0) { - if (session->http_entry) { - rspamd_task_free (session->task); - } - - session->task = NULL; - } - } - - if (session->http_entry) { - rspamd_http_connection_unref (session->http_entry->conn); - } - - event_del (&session->ev); - event_del (&session->timev); - close (session->fd); - } - } - else { - /* Plan write event */ - ev_base = event_get_base (&session->ev); - event_del (&session->ev); - event_set (&session->ev, fd, EV_WRITE|EV_READ, - fuzzy_controller_io_callback, session); - event_base_set (ev_base, &session->ev); - event_add (&session->ev, NULL); - - /* Plan new retransmit timer */ - ev_base = event_get_base (&session->timev); - event_del (&session->timev); - event_base_set (ev_base, &session->timev); - event_add (&session->timev, &session->tv); - session->retransmits ++; - } -} - static GPtrArray * fuzzy_generate_commands (struct rspamd_task *task, struct fuzzy_rule *rule, gint c, gint flag, guint32 value, guint flags) @@ -2774,7 +2738,6 @@ register_fuzzy_client_call (struct rspamd_task *task, session = rspamd_mempool_alloc0 (task->task_pool, sizeof (struct fuzzy_client_session)); - msec_to_tv (rule->ctx->io_timeout, &session->tv); session->state = 0; session->commands = commands; session->task = task; @@ -2782,16 +2745,15 @@ register_fuzzy_client_call (struct rspamd_task *task, session->server = selected; session->rule = rule; session->results = g_ptr_array_sized_new (32); + session->event_loop = task->event_loop; - event_set (&session->ev, sock, EV_WRITE, fuzzy_check_io_callback, - session); - event_base_set (session->task->event_loop, &session->ev); - event_add (&session->ev, NULL); - - evtimer_set (&session->timev, fuzzy_check_timer_callback, + rspamd_ev_watcher_init (&session->ev, + sock, + EV_WRITE, + fuzzy_check_io_callback, session); - event_base_set (session->task->event_loop, &session->timev); - event_add (&session->timev, &session->tv); + rspamd_ev_watcher_start (session->event_loop, &session->ev, + ((double)rule->ctx->io_timeout) / 1000.0); rspamd_session_add_event (task->s, fuzzy_io_fin, session, M); session->item = rspamd_symcache_get_cur_item (task); @@ -2881,7 +2843,6 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry, struct rspamd_controller_session *session = entry->ud; gint sock; gint ret = -1; - struct fuzzy_ctx *fuzzy_module_ctx = fuzzy_get_context (task->cfg); /* Get upstream */ @@ -2899,7 +2860,6 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry, rspamd_mempool_alloc0 (session->pool, sizeof (struct fuzzy_learn_session)); - msec_to_tv (fuzzy_module_ctx->io_timeout, &s->tv); s->task = task; s->commands = commands; s->http_entry = entry; @@ -2908,17 +2868,17 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry, s->fd = sock; s->err = err; s->rule = rule; + s->event_loop = task->event_loop; /* We ref connection to avoid freeing before we process fuzzy rule */ rspamd_http_connection_ref (entry->conn); - event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s); - event_base_set (entry->rt->event_loop, &s->ev); - event_add (&s->ev, NULL); - - evtimer_set (&s->timev, fuzzy_controller_timer_callback, + rspamd_ev_watcher_init (&s->ev, + sock, + EV_WRITE, + fuzzy_controller_io_callback, s); - event_base_set (s->task->event_loop, &s->timev); - event_add (&s->timev, &s->tv); + rspamd_ev_watcher_start (s->event_loop, &s->ev, + ((double)rule->ctx->io_timeout) / 1000.0); (*saved)++; ret = 1; @@ -3258,8 +3218,6 @@ fuzzy_check_send_lua_learn (struct fuzzy_rule *rule, s = rspamd_mempool_alloc0 (task->task_pool, sizeof (struct fuzzy_learn_session)); - - msec_to_tv (rule->ctx->io_timeout, &s->tv); s->task = task; s->commands = commands; s->http_entry = NULL; @@ -3269,14 +3227,15 @@ fuzzy_check_send_lua_learn (struct fuzzy_rule *rule, s->err = err; s->rule = rule; s->session = task->s; + s->event_loop = task->event_loop; - event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s); - event_base_set (task->event_loop, &s->ev); - event_add (&s->ev, NULL); - - evtimer_set (&s->timev, fuzzy_controller_timer_callback, s); - event_base_set (s->task->event_loop, &s->timev); - event_add (&s->timev, &s->tv); + rspamd_ev_watcher_init (&s->ev, + sock, + EV_WRITE, + fuzzy_controller_io_callback, + s); + rspamd_ev_watcher_start (s->event_loop, &s->ev, + ((double)rule->ctx->io_timeout) / 1000.0); rspamd_session_add_event (task->s, fuzzy_lua_fin, @@ -3367,7 +3326,7 @@ static gint fuzzy_lua_learn_handler (lua_State *L) { struct rspamd_task *task = lua_check_task (L, 1); - guint flag = 0, weight = 1.0, send_flags = 0; + guint flag = 0, weight = 1, send_flags = 0; const gchar *symbol; struct fuzzy_ctx *fuzzy_module_ctx = fuzzy_get_context (task->cfg); |