diff options
Diffstat (limited to 'src/plugins/fuzzy_check.c')
-rw-r--r-- | src/plugins/fuzzy_check.c | 169 |
1 files changed, 110 insertions, 59 deletions
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index 75968ce84..a035eeaae 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -59,7 +59,7 @@ #define RSPAMD_FUZZY_PLUGIN_VERSION RSPAMD_FUZZY_VERSION static const int rspamd_fuzzy_hash_len = 5; -static const char *M = "fuzzy check"; +static const char *M = "fuzzy_check"; struct fuzzy_ctx; struct fuzzy_mapping { @@ -139,7 +139,7 @@ struct fuzzy_client_session { struct rspamd_io_ev ev; int state; int fd; - unsigned int retransmits; + int retransmits; }; struct fuzzy_learn_session { @@ -157,7 +157,7 @@ struct fuzzy_learn_session { struct ev_loop *event_loop; struct rspamd_io_ev ev; int fd; - unsigned int retransmits; + int retransmits; }; #define FUZZY_CMD_FLAG_REPLIED (1 << 0) @@ -208,6 +208,13 @@ module_t fuzzy_check_module = { (unsigned int) -1, }; +INIT_LOG_MODULE(fuzzy_check) +#define msg_debug_fuzzy_check(...) rspamd_conditional_debug_fast(NULL, \ + task ? task->from_addr : NULL, \ + rspamd_fuzzy_check_log_id, M, task ? task->task_pool->tag.uid : NULL, \ + RSPAMD_LOG_FUNC, \ + __VA_ARGS__) + static inline struct fuzzy_ctx * fuzzy_get_context(struct rspamd_config *cfg) { @@ -1842,9 +1849,9 @@ fuzzy_cmd_from_text_part(struct rspamd_task *task, rspamd_cryptobox_hash_final(&st, shcmd->basic.digest); - msg_debug_task("loading shingles of type %s with key %*xs", - rule->algorithm_str, - 16, rule->shingles_key->str); + msg_debug_fuzzy_check("loading shingles of type %s with key %*xs", + rule->algorithm_str, + 16, rule->shingles_key->str); sh = rspamd_shingles_from_text(words, rule->shingles_key->str, task->task_pool, rspamd_shingles_default_filter, NULL, @@ -1987,7 +1994,7 @@ fuzzy_cmd_from_image_part (struct fuzzy_rule *rule, (const unsigned char *)img->dct, RSPAMD_DCT_LEN / NBBY, rule->hash_key->str, rule->hash_key->len); - msg_debug_task ("loading shingles of type %s with key %*xs", + msg_debug_fuzzy_check ("loading shingles of type %s with key %*xs", rule->algorithm_str, 16, rule->shingles_key->str); @@ -2653,25 +2660,47 @@ fuzzy_check_timer_callback(int fd, short what, void *arg) } } - if (session->retransmits >= session->rule->retransmits) { - msg_err_task("got IO timeout with server %s(%s), after %d/%d retransmits", - rspamd_upstream_name(session->server), - rspamd_inet_address_to_string_pretty( - rspamd_upstream_addr_cur(session->server)), - session->retransmits, - session->rule->retransmits); - rspamd_upstream_fail(session->server, TRUE, "timeout"); + if (session->retransmits >= 0) { + if (session->retransmits >= session->rule->retransmits) { + msg_err_task("got IO timeout with server %s(%s), after %d/%d retransmits", + rspamd_upstream_name(session->server), + rspamd_inet_address_to_string_pretty( + rspamd_upstream_addr_cur(session->server)), + session->retransmits, + session->rule->retransmits); + rspamd_upstream_fail(session->server, TRUE, "timeout"); - if (session->item) { - rspamd_symcache_item_async_dec_check(session->task, session->item, M); + if (session->item) { + rspamd_symcache_item_async_dec_check(session->task, session->item, M); + } + rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session); + } + else { + /* Apply backpressure here: at least 100ms up to 500ms */ + session->retransmits++; + double backpressure_time = MAX(session->rule->io_timeout * 0.1, 0.1) * session->retransmits; + /* Do not make delay more than 500ms for performance considerations */ + backpressure_time = MIN(backpressure_time, 0.5); + backpressure_time = rspamd_time_jitter(backpressure_time * 0.5, 0.0); + backpressure_time = MAX(backpressure_time, 0.1); + /* Inverse to distinguish */ + msg_debug_fuzzy_check("backpressure for %.2f milliseconds (server=%s), retransmits: %d;", + backpressure_time * 1000, + rspamd_upstream_name(session->server), + session->retransmits); + session->retransmits = -(session->retransmits); + /* Disable write event for the time of backpressure */ + rspamd_ev_watcher_reschedule_at(session->event_loop, + &session->ev, EV_READ, + backpressure_time); } - rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session); } else { /* Plan write event */ - rspamd_ev_watcher_reschedule(session->event_loop, - &session->ev, EV_READ | EV_WRITE); - session->retransmits++; + rspamd_ev_watcher_reschedule_at(session->event_loop, + &session->ev, EV_READ | EV_WRITE, + session->rule->io_timeout); + session->retransmits = -(session->retransmits); } } @@ -2794,51 +2823,73 @@ fuzzy_controller_timer_callback(int fd, short what, void *arg) task = session->task; - if (session->retransmits >= session->rule->retransmits) { - rspamd_upstream_fail(session->server, TRUE, "timeout"); - msg_err_task_check("got IO timeout with server %s(%s), " - "after %d/%d retransmits", - rspamd_upstream_name(session->server), - rspamd_inet_address_to_string_pretty( - rspamd_upstream_addr_cur(session->server)), - session->retransmits, - session->rule->retransmits); - - if (session->session) { - rspamd_session_remove_event(session->session, fuzzy_controller_lua_fin, - session); - } - else { - if (session->http_entry) { - rspamd_controller_send_error(session->http_entry, - 500, "IO timeout with fuzzy storage"); + if (session->retransmits >= 0) { + if (session->retransmits >= session->rule->retransmits) { + rspamd_upstream_fail(session->server, TRUE, "timeout"); + msg_err_task_check("got IO timeout with server %s(%s), " + "after %d/%d retransmits", + rspamd_upstream_name(session->server), + rspamd_inet_address_to_string_pretty( + rspamd_upstream_addr_cur(session->server)), + session->retransmits, + session->rule->retransmits); + + if (session->session) { + rspamd_session_remove_event(session->session, fuzzy_controller_lua_fin, + session); } + else { + if (session->http_entry) { + rspamd_controller_send_error(session->http_entry, + 500, "IO timeout with fuzzy storage"); + } - if (*session->saved > 0) { - (*session->saved)--; - if (*session->saved == 0) { - if (session->http_entry) { - rspamd_task_free(session->task); + if (*session->saved > 0) { + (*session->saved)--; + if (*session->saved == 0) { + if (session->http_entry) { + rspamd_task_free(session->task); + } + + session->task = NULL; } + } - session->task = NULL; + if (session->http_entry) { + rspamd_http_connection_unref(session->http_entry->conn); } - } - if (session->http_entry) { - rspamd_http_connection_unref(session->http_entry->conn); + rspamd_ev_watcher_stop(session->event_loop, + &session->ev); + close(session->fd); } - - rspamd_ev_watcher_stop(session->event_loop, - &session->ev); - close(session->fd); + } + else { + /* Apply backpressure here: at least 100ms up to 500ms */ + session->retransmits++; + double backpressure_time = MAX(session->rule->io_timeout * 0.1, 0.1) * session->retransmits; + /* Do not make delay more than 500ms for performance considerations */ + backpressure_time = MIN(backpressure_time, 0.5); + backpressure_time = rspamd_time_jitter(backpressure_time * 0.5, 0.0); + backpressure_time = MAX(backpressure_time, 0.1); + /* Inverse to distinguish */ + msg_debug_fuzzy_check("backpressure for %.2f milliseconds (server=%s), retransmits: %d;", + backpressure_time * 1000, + rspamd_upstream_name(session->server), + session->retransmits); + session->retransmits = -(session->retransmits); + /* Disable write event for the time of backpressure */ + rspamd_ev_watcher_reschedule_at(session->event_loop, + &session->ev, EV_READ, + backpressure_time); } } else { - /* Plan write event */ - rspamd_ev_watcher_reschedule(session->event_loop, - &session->ev, EV_READ | EV_WRITE); - session->retransmits++; + /* Backpressure done, plan write event */ + rspamd_ev_watcher_reschedule_at(session->event_loop, + &session->ev, EV_READ | EV_WRITE, + session->rule->io_timeout); + session->retransmits = -(session->retransmits); } } @@ -4228,9 +4279,9 @@ fuzzy_lua_hex_hashes_handler(lua_State *L) /* Check for flag */ if (g_hash_table_lookup(rule->mappings, GINT_TO_POINTER(flag)) == NULL) { - msg_debug_task("skip rule %s as it has no flag %d defined" - " false", - rule->name, flag); + msg_debug_fuzzy_check("skip rule %s as it has no flag %d defined" + " false", + rule->name, flag); continue; } |