aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rspamd.com>2024-04-23 15:37:18 +0100
committerVsevolod Stakhov <vsevolod@rspamd.com>2024-04-23 15:37:18 +0100
commite66c993097278b8a9df7fc3c213e74d7753db44a (patch)
treef395efc5ecf535689a3b2786bcddbd7042c7f4b9 /src
parentbd44de491aa5bf68c35198df2f926023a980294d (diff)
downloadrspamd-e66c993097278b8a9df7fc3c213e74d7753db44a.tar.gz
rspamd-e66c993097278b8a9df7fc3c213e74d7753db44a.zip
[Project] Implement fuzzy check retransmits backpressure
Diffstat (limited to 'src')
-rw-r--r--src/plugins/fuzzy_check.c152
1 files changed, 96 insertions, 56 deletions
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index d91b4f8dd..00c45a442 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -139,7 +139,7 @@ struct fuzzy_client_session {
struct rspamd_io_ev ev;
int state;
int fd;
- unsigned int retransmits;
+ int retransmits;
};
struct fuzzy_learn_session {
@@ -157,7 +157,7 @@ struct fuzzy_learn_session {
struct ev_loop *event_loop;
struct rspamd_io_ev ev;
int fd;
- unsigned int retransmits;
+ int retransmits;
};
#define FUZZY_CMD_FLAG_REPLIED (1 << 0)
@@ -208,11 +208,11 @@ module_t fuzzy_check_module = {
(unsigned int) -1,
};
-INIT_LOG_MODULE(N)
-#define msg_debug_fuzzy_check(...) rspamd_conditional_debug_fast(NULL, \
- task ? task->from_addr : NULL, \
- rspamd_task_log_id, M, task ? task->task_pool->tag.uid : NULL, \
- RSPAMD_LOG_FUNC, \
+INIT_LOG_MODULE(fuzzy_check)
+#define msg_debug_fuzzy_check(...) rspamd_conditional_debug_fast(NULL, \
+ task ? task->from_addr : NULL, \
+ rspamd_fuzzy_check_log_id, M, task ? task->task_pool->tag.uid : NULL, \
+ RSPAMD_LOG_FUNC, \
__VA_ARGS__)
static inline struct fuzzy_ctx *
@@ -2660,25 +2660,45 @@ fuzzy_check_timer_callback(int fd, short what, void *arg)
}
}
- if (session->retransmits >= session->rule->retransmits) {
- msg_err_task("got IO timeout with server %s(%s), after %d/%d retransmits",
- rspamd_upstream_name(session->server),
- rspamd_inet_address_to_string_pretty(
- rspamd_upstream_addr_cur(session->server)),
- session->retransmits,
- session->rule->retransmits);
- rspamd_upstream_fail(session->server, TRUE, "timeout");
+ if (session->retransmits >= 0) {
+ if (session->retransmits >= session->rule->retransmits) {
+ msg_err_task("got IO timeout with server %s(%s), after %d/%d retransmits",
+ rspamd_upstream_name(session->server),
+ rspamd_inet_address_to_string_pretty(
+ rspamd_upstream_addr_cur(session->server)),
+ session->retransmits,
+ session->rule->retransmits);
+ rspamd_upstream_fail(session->server, TRUE, "timeout");
- if (session->item) {
- rspamd_symcache_item_async_dec_check(session->task, session->item, M);
+ if (session->item) {
+ rspamd_symcache_item_async_dec_check(session->task, session->item, M);
+ }
+ rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session);
+ }
+ else {
+ /* Apply backpressure here: at least 100ms up to 500ms */
+ session->retransmits++;
+ double backpressure_time = MAX(session->rule->io_timeout * 0.1, 0.1) * session->retransmits;
+ /* Do not make delay more than 500ms for performance considerations */
+ backpressure_time = MIN(backpressure_time, 0.5);
+ /* Inverse to distinguish */
+ msg_debug_fuzzy_check("backpressure for %.2f milliseconds (server=%s), retransmits: %d;",
+ backpressure_time * 1000,
+ rspamd_upstream_name(session->server),
+ session->retransmits);
+ session->retransmits = -(session->retransmits);
+ /* Disable write event for the time of backpressure */
+ rspamd_ev_watcher_reschedule_at(session->event_loop,
+ &session->ev, EV_READ,
+ backpressure_time);
}
- rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session);
}
else {
/* Plan write event */
- rspamd_ev_watcher_reschedule(session->event_loop,
- &session->ev, EV_READ | EV_WRITE);
- session->retransmits++;
+ rspamd_ev_watcher_reschedule_at(session->event_loop,
+ &session->ev, EV_READ | EV_WRITE,
+ session->rule->io_timeout);
+ session->retransmits = -(session->retransmits);
}
}
@@ -2801,51 +2821,71 @@ fuzzy_controller_timer_callback(int fd, short what, void *arg)
task = session->task;
- if (session->retransmits >= session->rule->retransmits) {
- rspamd_upstream_fail(session->server, TRUE, "timeout");
- msg_err_task_check("got IO timeout with server %s(%s), "
- "after %d/%d retransmits",
- rspamd_upstream_name(session->server),
- rspamd_inet_address_to_string_pretty(
- rspamd_upstream_addr_cur(session->server)),
- session->retransmits,
- session->rule->retransmits);
-
- if (session->session) {
- rspamd_session_remove_event(session->session, fuzzy_controller_lua_fin,
- session);
- }
- else {
- if (session->http_entry) {
- rspamd_controller_send_error(session->http_entry,
- 500, "IO timeout with fuzzy storage");
+ if (session->retransmits >= 0) {
+ if (session->retransmits >= session->rule->retransmits) {
+ rspamd_upstream_fail(session->server, TRUE, "timeout");
+ msg_err_task_check("got IO timeout with server %s(%s), "
+ "after %d/%d retransmits",
+ rspamd_upstream_name(session->server),
+ rspamd_inet_address_to_string_pretty(
+ rspamd_upstream_addr_cur(session->server)),
+ session->retransmits,
+ session->rule->retransmits);
+
+ if (session->session) {
+ rspamd_session_remove_event(session->session, fuzzy_controller_lua_fin,
+ session);
}
+ else {
+ if (session->http_entry) {
+ rspamd_controller_send_error(session->http_entry,
+ 500, "IO timeout with fuzzy storage");
+ }
- if (*session->saved > 0) {
- (*session->saved)--;
- if (*session->saved == 0) {
- if (session->http_entry) {
- rspamd_task_free(session->task);
+ if (*session->saved > 0) {
+ (*session->saved)--;
+ if (*session->saved == 0) {
+ if (session->http_entry) {
+ rspamd_task_free(session->task);
+ }
+
+ session->task = NULL;
}
+ }
- session->task = NULL;
+ if (session->http_entry) {
+ rspamd_http_connection_unref(session->http_entry->conn);
}
- }
- if (session->http_entry) {
- rspamd_http_connection_unref(session->http_entry->conn);
+ rspamd_ev_watcher_stop(session->event_loop,
+ &session->ev);
+ close(session->fd);
}
-
- rspamd_ev_watcher_stop(session->event_loop,
- &session->ev);
- close(session->fd);
+ }
+ else {
+ /* Apply backpressure here: at least 100ms up to 500ms */
+ session->retransmits++;
+ double backpressure_time = MAX(session->rule->io_timeout * 0.1, 0.1) * session->retransmits;
+ /* Do not make delay more than 500ms for performance considerations */
+ backpressure_time = MIN(backpressure_time, 0.5);
+ /* Inverse to distinguish */
+ msg_debug_fuzzy_check("backpressure for %.2f milliseconds (server=%s), retransmits: %d;",
+ backpressure_time * 1000,
+ rspamd_upstream_name(session->server),
+ session->retransmits);
+ session->retransmits = -(session->retransmits);
+ /* Disable write event for the time of backpressure */
+ rspamd_ev_watcher_reschedule_at(session->event_loop,
+ &session->ev, EV_READ,
+ backpressure_time);
}
}
else {
- /* Plan write event */
- rspamd_ev_watcher_reschedule(session->event_loop,
- &session->ev, EV_READ | EV_WRITE);
- session->retransmits++;
+ /* Backpressure done, plan write event */
+ rspamd_ev_watcher_reschedule_at(session->event_loop,
+ &session->ev, EV_READ | EV_WRITE,
+ session->rule->io_timeout);
+ session->retransmits = -(session->retransmits);
}
}