aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rspamd.com>2024-04-24 14:49:47 +0600
committerGitHub <noreply@github.com>2024-04-24 14:49:47 +0600
commitf4aa6206543418fca16331a8711b3ca710d87035 (patch)
tree728de17b138ca00b6ad3e2289a9ce6d38aa00b0e
parent980cd40f8df84b88af0a08694367daf2204b459f (diff)
parent990a9dc8db6682d45315b6952864747e3e0c521c (diff)
downloadrspamd-f4aa6206543418fca16331a8711b3ca710d87035.tar.gz
rspamd-f4aa6206543418fca16331a8711b3ca710d87035.zip
Merge pull request #4939 from rspamd/vstakhov-fuzzy-backpressure
Implement backpressure for fuzzy check
-rw-r--r--src/libutil/libev_helper.c30
-rw-r--r--src/libutil/libev_helper.h17
-rw-r--r--src/plugins/fuzzy_check.c169
3 files changed, 154 insertions, 62 deletions
diff --git a/src/libutil/libev_helper.c b/src/libutil/libev_helper.c
index 3b880aaa2..203e1ed73 100644
--- a/src/libutil/libev_helper.c
+++ b/src/libutil/libev_helper.c
@@ -108,4 +108,34 @@ void rspamd_ev_watcher_reschedule(struct ev_loop *loop,
ev_timer_start(EV_A, &ev->tm);
}
}
+}
+
+void rspamd_ev_watcher_reschedule_at(struct ev_loop *loop,
+ struct rspamd_io_ev *ev,
+ short what,
+ ev_tstamp at)
+{
+ g_assert(ev->cb != NULL);
+
+ if (ev_can_stop(&ev->io)) {
+ ev_io_stop(EV_A, &ev->io);
+ ev_io_set(&ev->io, ev->io.fd, what);
+ ev_io_start(EV_A, &ev->io);
+ }
+ else {
+ ev->io.data = ev;
+ ev_io_init(&ev->io, rspamd_ev_watcher_io_cb, ev->io.fd, what);
+ ev_io_start(EV_A, &ev->io);
+ }
+
+ if (at > 0) {
+ if (!(ev_can_stop(&ev->tm))) {
+ /* Update timestamp to avoid timers running early */
+ ev_now_update_if_cheap(loop);
+
+ ev->tm.data = ev;
+ ev_timer_init(&ev->tm, rspamd_ev_watcher_timer_cb, at, 0.0);
+ ev_timer_start(EV_A, &ev->tm);
+ }
+ }
} \ No newline at end of file
diff --git a/src/libutil/libev_helper.h b/src/libutil/libev_helper.h
index 44d1604b0..d68f17951 100644
--- a/src/libutil/libev_helper.h
+++ b/src/libutil/libev_helper.h
@@ -1,11 +1,11 @@
-/*-
- * Copyright 2019 Vsevolod Stakhov
+/*
+ * Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -79,6 +79,17 @@ void rspamd_ev_watcher_reschedule(struct ev_loop *loop,
struct rspamd_io_ev *ev,
short what);
+/**
+ * Convenience function to reschedule watcher with different events and different timeout
+ * @param loop
+ * @param ev
+ * @param what
+ */
+void rspamd_ev_watcher_reschedule_at(struct ev_loop *loop,
+ struct rspamd_io_ev *ev,
+ short what,
+ ev_tstamp at);
+
#ifdef __cplusplus
}
#endif
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index 75968ce84..a035eeaae 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -59,7 +59,7 @@
#define RSPAMD_FUZZY_PLUGIN_VERSION RSPAMD_FUZZY_VERSION
static const int rspamd_fuzzy_hash_len = 5;
-static const char *M = "fuzzy check";
+static const char *M = "fuzzy_check";
struct fuzzy_ctx;
struct fuzzy_mapping {
@@ -139,7 +139,7 @@ struct fuzzy_client_session {
struct rspamd_io_ev ev;
int state;
int fd;
- unsigned int retransmits;
+ int retransmits;
};
struct fuzzy_learn_session {
@@ -157,7 +157,7 @@ struct fuzzy_learn_session {
struct ev_loop *event_loop;
struct rspamd_io_ev ev;
int fd;
- unsigned int retransmits;
+ int retransmits;
};
#define FUZZY_CMD_FLAG_REPLIED (1 << 0)
@@ -208,6 +208,13 @@ module_t fuzzy_check_module = {
(unsigned int) -1,
};
+INIT_LOG_MODULE(fuzzy_check)
+#define msg_debug_fuzzy_check(...) rspamd_conditional_debug_fast(NULL, \
+ task ? task->from_addr : NULL, \
+ rspamd_fuzzy_check_log_id, M, task ? task->task_pool->tag.uid : NULL, \
+ RSPAMD_LOG_FUNC, \
+ __VA_ARGS__)
+
static inline struct fuzzy_ctx *
fuzzy_get_context(struct rspamd_config *cfg)
{
@@ -1842,9 +1849,9 @@ fuzzy_cmd_from_text_part(struct rspamd_task *task,
rspamd_cryptobox_hash_final(&st, shcmd->basic.digest);
- msg_debug_task("loading shingles of type %s with key %*xs",
- rule->algorithm_str,
- 16, rule->shingles_key->str);
+ msg_debug_fuzzy_check("loading shingles of type %s with key %*xs",
+ rule->algorithm_str,
+ 16, rule->shingles_key->str);
sh = rspamd_shingles_from_text(words,
rule->shingles_key->str, task->task_pool,
rspamd_shingles_default_filter, NULL,
@@ -1987,7 +1994,7 @@ fuzzy_cmd_from_image_part (struct fuzzy_rule *rule,
(const unsigned char *)img->dct, RSPAMD_DCT_LEN / NBBY,
rule->hash_key->str, rule->hash_key->len);
- msg_debug_task ("loading shingles of type %s with key %*xs",
+ msg_debug_fuzzy_check ("loading shingles of type %s with key %*xs",
rule->algorithm_str,
16, rule->shingles_key->str);
@@ -2653,25 +2660,47 @@ fuzzy_check_timer_callback(int fd, short what, void *arg)
}
}
- if (session->retransmits >= session->rule->retransmits) {
- msg_err_task("got IO timeout with server %s(%s), after %d/%d retransmits",
- rspamd_upstream_name(session->server),
- rspamd_inet_address_to_string_pretty(
- rspamd_upstream_addr_cur(session->server)),
- session->retransmits,
- session->rule->retransmits);
- rspamd_upstream_fail(session->server, TRUE, "timeout");
+ if (session->retransmits >= 0) {
+ if (session->retransmits >= session->rule->retransmits) {
+ msg_err_task("got IO timeout with server %s(%s), after %d/%d retransmits",
+ rspamd_upstream_name(session->server),
+ rspamd_inet_address_to_string_pretty(
+ rspamd_upstream_addr_cur(session->server)),
+ session->retransmits,
+ session->rule->retransmits);
+ rspamd_upstream_fail(session->server, TRUE, "timeout");
- if (session->item) {
- rspamd_symcache_item_async_dec_check(session->task, session->item, M);
+ if (session->item) {
+ rspamd_symcache_item_async_dec_check(session->task, session->item, M);
+ }
+ rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session);
+ }
+ else {
+ /* Apply backpressure here: at least 100ms up to 500ms */
+ session->retransmits++;
+ double backpressure_time = MAX(session->rule->io_timeout * 0.1, 0.1) * session->retransmits;
+ /* Do not make delay more than 500ms for performance considerations */
+ backpressure_time = MIN(backpressure_time, 0.5);
+ backpressure_time = rspamd_time_jitter(backpressure_time * 0.5, 0.0);
+ backpressure_time = MAX(backpressure_time, 0.1);
+ /* Inverse to distinguish */
+ msg_debug_fuzzy_check("backpressure for %.2f milliseconds (server=%s), retransmits: %d;",
+ backpressure_time * 1000,
+ rspamd_upstream_name(session->server),
+ session->retransmits);
+ session->retransmits = -(session->retransmits);
+ /* Disable write event for the time of backpressure */
+ rspamd_ev_watcher_reschedule_at(session->event_loop,
+ &session->ev, EV_READ,
+ backpressure_time);
}
- rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session);
}
else {
/* Plan write event */
- rspamd_ev_watcher_reschedule(session->event_loop,
- &session->ev, EV_READ | EV_WRITE);
- session->retransmits++;
+ rspamd_ev_watcher_reschedule_at(session->event_loop,
+ &session->ev, EV_READ | EV_WRITE,
+ session->rule->io_timeout);
+ session->retransmits = -(session->retransmits);
}
}
@@ -2794,51 +2823,73 @@ fuzzy_controller_timer_callback(int fd, short what, void *arg)
task = session->task;
- if (session->retransmits >= session->rule->retransmits) {
- rspamd_upstream_fail(session->server, TRUE, "timeout");
- msg_err_task_check("got IO timeout with server %s(%s), "
- "after %d/%d retransmits",
- rspamd_upstream_name(session->server),
- rspamd_inet_address_to_string_pretty(
- rspamd_upstream_addr_cur(session->server)),
- session->retransmits,
- session->rule->retransmits);
-
- if (session->session) {
- rspamd_session_remove_event(session->session, fuzzy_controller_lua_fin,
- session);
- }
- else {
- if (session->http_entry) {
- rspamd_controller_send_error(session->http_entry,
- 500, "IO timeout with fuzzy storage");
+ if (session->retransmits >= 0) {
+ if (session->retransmits >= session->rule->retransmits) {
+ rspamd_upstream_fail(session->server, TRUE, "timeout");
+ msg_err_task_check("got IO timeout with server %s(%s), "
+ "after %d/%d retransmits",
+ rspamd_upstream_name(session->server),
+ rspamd_inet_address_to_string_pretty(
+ rspamd_upstream_addr_cur(session->server)),
+ session->retransmits,
+ session->rule->retransmits);
+
+ if (session->session) {
+ rspamd_session_remove_event(session->session, fuzzy_controller_lua_fin,
+ session);
}
+ else {
+ if (session->http_entry) {
+ rspamd_controller_send_error(session->http_entry,
+ 500, "IO timeout with fuzzy storage");
+ }
- if (*session->saved > 0) {
- (*session->saved)--;
- if (*session->saved == 0) {
- if (session->http_entry) {
- rspamd_task_free(session->task);
+ if (*session->saved > 0) {
+ (*session->saved)--;
+ if (*session->saved == 0) {
+ if (session->http_entry) {
+ rspamd_task_free(session->task);
+ }
+
+ session->task = NULL;
}
+ }
- session->task = NULL;
+ if (session->http_entry) {
+ rspamd_http_connection_unref(session->http_entry->conn);
}
- }
- if (session->http_entry) {
- rspamd_http_connection_unref(session->http_entry->conn);
+ rspamd_ev_watcher_stop(session->event_loop,
+ &session->ev);
+ close(session->fd);
}
-
- rspamd_ev_watcher_stop(session->event_loop,
- &session->ev);
- close(session->fd);
+ }
+ else {
+ /* Apply backpressure here: at least 100ms up to 500ms */
+ session->retransmits++;
+ double backpressure_time = MAX(session->rule->io_timeout * 0.1, 0.1) * session->retransmits;
+ /* Do not make delay more than 500ms for performance considerations */
+ backpressure_time = MIN(backpressure_time, 0.5);
+ backpressure_time = rspamd_time_jitter(backpressure_time * 0.5, 0.0);
+ backpressure_time = MAX(backpressure_time, 0.1);
+ /* Inverse to distinguish */
+ msg_debug_fuzzy_check("backpressure for %.2f milliseconds (server=%s), retransmits: %d;",
+ backpressure_time * 1000,
+ rspamd_upstream_name(session->server),
+ session->retransmits);
+ session->retransmits = -(session->retransmits);
+ /* Disable write event for the time of backpressure */
+ rspamd_ev_watcher_reschedule_at(session->event_loop,
+ &session->ev, EV_READ,
+ backpressure_time);
}
}
else {
- /* Plan write event */
- rspamd_ev_watcher_reschedule(session->event_loop,
- &session->ev, EV_READ | EV_WRITE);
- session->retransmits++;
+ /* Backpressure done, plan write event */
+ rspamd_ev_watcher_reschedule_at(session->event_loop,
+ &session->ev, EV_READ | EV_WRITE,
+ session->rule->io_timeout);
+ session->retransmits = -(session->retransmits);
}
}
@@ -4228,9 +4279,9 @@ fuzzy_lua_hex_hashes_handler(lua_State *L)
/* Check for flag */
if (g_hash_table_lookup(rule->mappings,
GINT_TO_POINTER(flag)) == NULL) {
- msg_debug_task("skip rule %s as it has no flag %d defined"
- " false",
- rule->name, flag);
+ msg_debug_fuzzy_check("skip rule %s as it has no flag %d defined"
+ " false",
+ rule->name, flag);
continue;
}