From e66c993097278b8a9df7fc3c213e74d7753db44a Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Tue, 23 Apr 2024 15:37:18 +0100 Subject: [PATCH] [Project] Implement fuzzy check retransmits backpressure --- src/plugins/fuzzy_check.c | 152 ++++++++++++++++++++++++-------------- 1 file changed, 96 insertions(+), 56 deletions(-) diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index d91b4f8dd..00c45a442 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -139,7 +139,7 @@ struct fuzzy_client_session { struct rspamd_io_ev ev; int state; int fd; - unsigned int retransmits; + int retransmits; }; struct fuzzy_learn_session { @@ -157,7 +157,7 @@ struct fuzzy_learn_session { struct ev_loop *event_loop; struct rspamd_io_ev ev; int fd; - unsigned int retransmits; + int retransmits; }; #define FUZZY_CMD_FLAG_REPLIED (1 << 0) @@ -208,11 +208,11 @@ module_t fuzzy_check_module = { (unsigned int) -1, }; -INIT_LOG_MODULE(N) -#define msg_debug_fuzzy_check(...) rspamd_conditional_debug_fast(NULL, \ - task ? task->from_addr : NULL, \ - rspamd_task_log_id, M, task ? task->task_pool->tag.uid : NULL, \ - RSPAMD_LOG_FUNC, \ +INIT_LOG_MODULE(fuzzy_check) +#define msg_debug_fuzzy_check(...) rspamd_conditional_debug_fast(NULL, \ + task ? task->from_addr : NULL, \ + rspamd_fuzzy_check_log_id, M, task ? task->task_pool->tag.uid : NULL, \ + RSPAMD_LOG_FUNC, \ __VA_ARGS__) static inline struct fuzzy_ctx * @@ -2660,25 +2660,45 @@ fuzzy_check_timer_callback(int fd, short what, void *arg) } } - if (session->retransmits >= session->rule->retransmits) { - msg_err_task("got IO timeout with server %s(%s), after %d/%d retransmits", - rspamd_upstream_name(session->server), - rspamd_inet_address_to_string_pretty( - rspamd_upstream_addr_cur(session->server)), - session->retransmits, - session->rule->retransmits); - rspamd_upstream_fail(session->server, TRUE, "timeout"); + if (session->retransmits >= 0) { + if (session->retransmits >= session->rule->retransmits) { + msg_err_task("got IO timeout with server %s(%s), after %d/%d retransmits", + rspamd_upstream_name(session->server), + rspamd_inet_address_to_string_pretty( + rspamd_upstream_addr_cur(session->server)), + session->retransmits, + session->rule->retransmits); + rspamd_upstream_fail(session->server, TRUE, "timeout"); - if (session->item) { - rspamd_symcache_item_async_dec_check(session->task, session->item, M); + if (session->item) { + rspamd_symcache_item_async_dec_check(session->task, session->item, M); + } + rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session); + } + else { + /* Apply backpressure here: at least 100ms up to 500ms */ + session->retransmits++; + double backpressure_time = MAX(session->rule->io_timeout * 0.1, 0.1) * session->retransmits; + /* Do not make delay more than 500ms for performance considerations */ + backpressure_time = MIN(backpressure_time, 0.5); + /* Inverse to distinguish */ + msg_debug_fuzzy_check("backpressure for %.2f milliseconds (server=%s), retransmits: %d;", + backpressure_time * 1000, + rspamd_upstream_name(session->server), + session->retransmits); + session->retransmits = -(session->retransmits); + /* Disable write event for the time of backpressure */ + rspamd_ev_watcher_reschedule_at(session->event_loop, + &session->ev, EV_READ, + backpressure_time); } - rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session); } else { /* Plan write event */ - rspamd_ev_watcher_reschedule(session->event_loop, - &session->ev, EV_READ | EV_WRITE); - session->retransmits++; + rspamd_ev_watcher_reschedule_at(session->event_loop, + &session->ev, EV_READ | EV_WRITE, + session->rule->io_timeout); + session->retransmits = -(session->retransmits); } } @@ -2801,51 +2821,71 @@ fuzzy_controller_timer_callback(int fd, short what, void *arg) task = session->task; - if (session->retransmits >= session->rule->retransmits) { - rspamd_upstream_fail(session->server, TRUE, "timeout"); - msg_err_task_check("got IO timeout with server %s(%s), " - "after %d/%d retransmits", - rspamd_upstream_name(session->server), - rspamd_inet_address_to_string_pretty( - rspamd_upstream_addr_cur(session->server)), - session->retransmits, - session->rule->retransmits); - - if (session->session) { - rspamd_session_remove_event(session->session, fuzzy_controller_lua_fin, - session); - } - else { - if (session->http_entry) { - rspamd_controller_send_error(session->http_entry, - 500, "IO timeout with fuzzy storage"); + if (session->retransmits >= 0) { + if (session->retransmits >= session->rule->retransmits) { + rspamd_upstream_fail(session->server, TRUE, "timeout"); + msg_err_task_check("got IO timeout with server %s(%s), " + "after %d/%d retransmits", + rspamd_upstream_name(session->server), + rspamd_inet_address_to_string_pretty( + rspamd_upstream_addr_cur(session->server)), + session->retransmits, + session->rule->retransmits); + + if (session->session) { + rspamd_session_remove_event(session->session, fuzzy_controller_lua_fin, + session); } + else { + if (session->http_entry) { + rspamd_controller_send_error(session->http_entry, + 500, "IO timeout with fuzzy storage"); + } - if (*session->saved > 0) { - (*session->saved)--; - if (*session->saved == 0) { - if (session->http_entry) { - rspamd_task_free(session->task); + if (*session->saved > 0) { + (*session->saved)--; + if (*session->saved == 0) { + if (session->http_entry) { + rspamd_task_free(session->task); + } + + session->task = NULL; } + } - session->task = NULL; + if (session->http_entry) { + rspamd_http_connection_unref(session->http_entry->conn); } - } - if (session->http_entry) { - rspamd_http_connection_unref(session->http_entry->conn); + rspamd_ev_watcher_stop(session->event_loop, + &session->ev); + close(session->fd); } - - rspamd_ev_watcher_stop(session->event_loop, - &session->ev); - close(session->fd); + } + else { + /* Apply backpressure here: at least 100ms up to 500ms */ + session->retransmits++; + double backpressure_time = MAX(session->rule->io_timeout * 0.1, 0.1) * session->retransmits; + /* Do not make delay more than 500ms for performance considerations */ + backpressure_time = MIN(backpressure_time, 0.5); + /* Inverse to distinguish */ + msg_debug_fuzzy_check("backpressure for %.2f milliseconds (server=%s), retransmits: %d;", + backpressure_time * 1000, + rspamd_upstream_name(session->server), + session->retransmits); + session->retransmits = -(session->retransmits); + /* Disable write event for the time of backpressure */ + rspamd_ev_watcher_reschedule_at(session->event_loop, + &session->ev, EV_READ, + backpressure_time); } } else { - /* Plan write event */ - rspamd_ev_watcher_reschedule(session->event_loop, - &session->ev, EV_READ | EV_WRITE); - session->retransmits++; + /* Backpressure done, plan write event */ + rspamd_ev_watcher_reschedule_at(session->event_loop, + &session->ev, EV_READ | EV_WRITE, + session->rule->io_timeout); + session->retransmits = -(session->retransmits); } } -- 2.39.5