diff options
author | Vsevolod Stakhov <vsevolod@rspamd.com> | 2024-04-24 14:49:47 +0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-24 14:49:47 +0600 |
commit | f4aa6206543418fca16331a8711b3ca710d87035 (patch) | |
tree | 728de17b138ca00b6ad3e2289a9ce6d38aa00b0e | |
parent | 980cd40f8df84b88af0a08694367daf2204b459f (diff) | |
parent | 990a9dc8db6682d45315b6952864747e3e0c521c (diff) | |
download | rspamd-f4aa6206543418fca16331a8711b3ca710d87035.tar.gz rspamd-f4aa6206543418fca16331a8711b3ca710d87035.zip |
Merge pull request #4939 from rspamd/vstakhov-fuzzy-backpressure
Implement backpressure for fuzzy check
-rw-r--r-- | src/libutil/libev_helper.c | 30 | ||||
-rw-r--r-- | src/libutil/libev_helper.h | 17 | ||||
-rw-r--r-- | src/plugins/fuzzy_check.c | 169 |
3 files changed, 154 insertions, 62 deletions
diff --git a/src/libutil/libev_helper.c b/src/libutil/libev_helper.c index 3b880aaa2..203e1ed73 100644 --- a/src/libutil/libev_helper.c +++ b/src/libutil/libev_helper.c @@ -108,4 +108,34 @@ void rspamd_ev_watcher_reschedule(struct ev_loop *loop, ev_timer_start(EV_A, &ev->tm); } } +} + +void rspamd_ev_watcher_reschedule_at(struct ev_loop *loop, + struct rspamd_io_ev *ev, + short what, + ev_tstamp at) +{ + g_assert(ev->cb != NULL); + + if (ev_can_stop(&ev->io)) { + ev_io_stop(EV_A, &ev->io); + ev_io_set(&ev->io, ev->io.fd, what); + ev_io_start(EV_A, &ev->io); + } + else { + ev->io.data = ev; + ev_io_init(&ev->io, rspamd_ev_watcher_io_cb, ev->io.fd, what); + ev_io_start(EV_A, &ev->io); + } + + if (at > 0) { + if (!(ev_can_stop(&ev->tm))) { + /* Update timestamp to avoid timers running early */ + ev_now_update_if_cheap(loop); + + ev->tm.data = ev; + ev_timer_init(&ev->tm, rspamd_ev_watcher_timer_cb, at, 0.0); + ev_timer_start(EV_A, &ev->tm); + } + } }
\ No newline at end of file diff --git a/src/libutil/libev_helper.h b/src/libutil/libev_helper.h index 44d1604b0..d68f17951 100644 --- a/src/libutil/libev_helper.h +++ b/src/libutil/libev_helper.h @@ -1,11 +1,11 @@ -/*- - * Copyright 2019 Vsevolod Stakhov +/* + * Copyright 2024 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -79,6 +79,17 @@ void rspamd_ev_watcher_reschedule(struct ev_loop *loop, struct rspamd_io_ev *ev, short what); +/** + * Convenience function to reschedule watcher with different events and different timeout + * @param loop + * @param ev + * @param what + */ +void rspamd_ev_watcher_reschedule_at(struct ev_loop *loop, + struct rspamd_io_ev *ev, + short what, + ev_tstamp at); + #ifdef __cplusplus } #endif diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index 75968ce84..a035eeaae 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -59,7 +59,7 @@ #define RSPAMD_FUZZY_PLUGIN_VERSION RSPAMD_FUZZY_VERSION static const int rspamd_fuzzy_hash_len = 5; -static const char *M = "fuzzy check"; +static const char *M = "fuzzy_check"; struct fuzzy_ctx; struct fuzzy_mapping { @@ -139,7 +139,7 @@ struct fuzzy_client_session { struct rspamd_io_ev ev; int state; int fd; - unsigned int retransmits; + int retransmits; }; struct fuzzy_learn_session { @@ -157,7 +157,7 @@ struct fuzzy_learn_session { struct ev_loop *event_loop; struct rspamd_io_ev ev; int fd; - unsigned int retransmits; + int retransmits; }; #define FUZZY_CMD_FLAG_REPLIED (1 << 0) @@ -208,6 +208,13 @@ module_t fuzzy_check_module = { (unsigned int) -1, }; +INIT_LOG_MODULE(fuzzy_check) +#define msg_debug_fuzzy_check(...) rspamd_conditional_debug_fast(NULL, \ + task ? task->from_addr : NULL, \ + rspamd_fuzzy_check_log_id, M, task ? task->task_pool->tag.uid : NULL, \ + RSPAMD_LOG_FUNC, \ + __VA_ARGS__) + static inline struct fuzzy_ctx * fuzzy_get_context(struct rspamd_config *cfg) { @@ -1842,9 +1849,9 @@ fuzzy_cmd_from_text_part(struct rspamd_task *task, rspamd_cryptobox_hash_final(&st, shcmd->basic.digest); - msg_debug_task("loading shingles of type %s with key %*xs", - rule->algorithm_str, - 16, rule->shingles_key->str); + msg_debug_fuzzy_check("loading shingles of type %s with key %*xs", + rule->algorithm_str, + 16, rule->shingles_key->str); sh = rspamd_shingles_from_text(words, rule->shingles_key->str, task->task_pool, rspamd_shingles_default_filter, NULL, @@ -1987,7 +1994,7 @@ fuzzy_cmd_from_image_part (struct fuzzy_rule *rule, (const unsigned char *)img->dct, RSPAMD_DCT_LEN / NBBY, rule->hash_key->str, rule->hash_key->len); - msg_debug_task ("loading shingles of type %s with key %*xs", + msg_debug_fuzzy_check ("loading shingles of type %s with key %*xs", rule->algorithm_str, 16, rule->shingles_key->str); @@ -2653,25 +2660,47 @@ fuzzy_check_timer_callback(int fd, short what, void *arg) } } - if (session->retransmits >= session->rule->retransmits) { - msg_err_task("got IO timeout with server %s(%s), after %d/%d retransmits", - rspamd_upstream_name(session->server), - rspamd_inet_address_to_string_pretty( - rspamd_upstream_addr_cur(session->server)), - session->retransmits, - session->rule->retransmits); - rspamd_upstream_fail(session->server, TRUE, "timeout"); + if (session->retransmits >= 0) { + if (session->retransmits >= session->rule->retransmits) { + msg_err_task("got IO timeout with server %s(%s), after %d/%d retransmits", + rspamd_upstream_name(session->server), + rspamd_inet_address_to_string_pretty( + rspamd_upstream_addr_cur(session->server)), + session->retransmits, + session->rule->retransmits); + rspamd_upstream_fail(session->server, TRUE, "timeout"); - if (session->item) { - rspamd_symcache_item_async_dec_check(session->task, session->item, M); + if (session->item) { + rspamd_symcache_item_async_dec_check(session->task, session->item, M); + } + rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session); + } + else { + /* Apply backpressure here: at least 100ms up to 500ms */ + session->retransmits++; + double backpressure_time = MAX(session->rule->io_timeout * 0.1, 0.1) * session->retransmits; + /* Do not make delay more than 500ms for performance considerations */ + backpressure_time = MIN(backpressure_time, 0.5); + backpressure_time = rspamd_time_jitter(backpressure_time * 0.5, 0.0); + backpressure_time = MAX(backpressure_time, 0.1); + /* Inverse to distinguish */ + msg_debug_fuzzy_check("backpressure for %.2f milliseconds (server=%s), retransmits: %d;", + backpressure_time * 1000, + rspamd_upstream_name(session->server), + session->retransmits); + session->retransmits = -(session->retransmits); + /* Disable write event for the time of backpressure */ + rspamd_ev_watcher_reschedule_at(session->event_loop, + &session->ev, EV_READ, + backpressure_time); } - rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session); } else { /* Plan write event */ - rspamd_ev_watcher_reschedule(session->event_loop, - &session->ev, EV_READ | EV_WRITE); - session->retransmits++; + rspamd_ev_watcher_reschedule_at(session->event_loop, + &session->ev, EV_READ | EV_WRITE, + session->rule->io_timeout); + session->retransmits = -(session->retransmits); } } @@ -2794,51 +2823,73 @@ fuzzy_controller_timer_callback(int fd, short what, void *arg) task = session->task; - if (session->retransmits >= session->rule->retransmits) { - rspamd_upstream_fail(session->server, TRUE, "timeout"); - msg_err_task_check("got IO timeout with server %s(%s), " - "after %d/%d retransmits", - rspamd_upstream_name(session->server), - rspamd_inet_address_to_string_pretty( - rspamd_upstream_addr_cur(session->server)), - session->retransmits, - session->rule->retransmits); - - if (session->session) { - rspamd_session_remove_event(session->session, fuzzy_controller_lua_fin, - session); - } - else { - if (session->http_entry) { - rspamd_controller_send_error(session->http_entry, - 500, "IO timeout with fuzzy storage"); + if (session->retransmits >= 0) { + if (session->retransmits >= session->rule->retransmits) { + rspamd_upstream_fail(session->server, TRUE, "timeout"); + msg_err_task_check("got IO timeout with server %s(%s), " + "after %d/%d retransmits", + rspamd_upstream_name(session->server), + rspamd_inet_address_to_string_pretty( + rspamd_upstream_addr_cur(session->server)), + session->retransmits, + session->rule->retransmits); + + if (session->session) { + rspamd_session_remove_event(session->session, fuzzy_controller_lua_fin, + session); } + else { + if (session->http_entry) { + rspamd_controller_send_error(session->http_entry, + 500, "IO timeout with fuzzy storage"); + } - if (*session->saved > 0) { - (*session->saved)--; - if (*session->saved == 0) { - if (session->http_entry) { - rspamd_task_free(session->task); + if (*session->saved > 0) { + (*session->saved)--; + if (*session->saved == 0) { + if (session->http_entry) { + rspamd_task_free(session->task); + } + + session->task = NULL; } + } - session->task = NULL; + if (session->http_entry) { + rspamd_http_connection_unref(session->http_entry->conn); } - } - if (session->http_entry) { - rspamd_http_connection_unref(session->http_entry->conn); + rspamd_ev_watcher_stop(session->event_loop, + &session->ev); + close(session->fd); } - - rspamd_ev_watcher_stop(session->event_loop, - &session->ev); - close(session->fd); + } + else { + /* Apply backpressure here: at least 100ms up to 500ms */ + session->retransmits++; + double backpressure_time = MAX(session->rule->io_timeout * 0.1, 0.1) * session->retransmits; + /* Do not make delay more than 500ms for performance considerations */ + backpressure_time = MIN(backpressure_time, 0.5); + backpressure_time = rspamd_time_jitter(backpressure_time * 0.5, 0.0); + backpressure_time = MAX(backpressure_time, 0.1); + /* Inverse to distinguish */ + msg_debug_fuzzy_check("backpressure for %.2f milliseconds (server=%s), retransmits: %d;", + backpressure_time * 1000, + rspamd_upstream_name(session->server), + session->retransmits); + session->retransmits = -(session->retransmits); + /* Disable write event for the time of backpressure */ + rspamd_ev_watcher_reschedule_at(session->event_loop, + &session->ev, EV_READ, + backpressure_time); } } else { - /* Plan write event */ - rspamd_ev_watcher_reschedule(session->event_loop, - &session->ev, EV_READ | EV_WRITE); - session->retransmits++; + /* Backpressure done, plan write event */ + rspamd_ev_watcher_reschedule_at(session->event_loop, + &session->ev, EV_READ | EV_WRITE, + session->rule->io_timeout); + session->retransmits = -(session->retransmits); } } @@ -4228,9 +4279,9 @@ fuzzy_lua_hex_hashes_handler(lua_State *L) /* Check for flag */ if (g_hash_table_lookup(rule->mappings, GINT_TO_POINTER(flag)) == NULL) { - msg_debug_task("skip rule %s as it has no flag %d defined" - " false", - rule->name, flag); + msg_debug_fuzzy_check("skip rule %s as it has no flag %d defined" + " false", + rule->name, flag); continue; } |