source.dussan.org Git - rspamd.git/commitdiff
[Project] Implement fuzzy check retransmits backpressure
author Vsevolod Stakhov <vsevolod@rspamd.com>
Tue, 23 Apr 2024 14:37:18 +0000 (15:37 +0100)
committer Vsevolod Stakhov <vsevolod@rspamd.com>
Tue, 23 Apr 2024 14:37:18 +0000 (15:37 +0100)
src/plugins/fuzzy_check.c

index d91b4f8ddb6572ae119f1cca92ca2dffe2fcc45a..00c45a4425799aab4ccf4150fc1e7195f9d6929e 100644 (file)
@@ -139,7 +139,7 @@ struct fuzzy_client_session {
        struct rspamd_io_ev ev;
        int state;
        int fd;
-       unsigned int retransmits;
+       int retransmits;
 };
 
 struct fuzzy_learn_session {
@@ -157,7 +157,7 @@ struct fuzzy_learn_session {
        struct ev_loop *event_loop;
        struct rspamd_io_ev ev;
        int fd;
-       unsigned int retransmits;
+       int retransmits;
 };
 
 #define FUZZY_CMD_FLAG_REPLIED (1 << 0)
@@ -208,11 +208,11 @@ module_t fuzzy_check_module = {
        (unsigned int) -1,
 };
 
-INIT_LOG_MODULE(N)
-#define msg_debug_fuzzy_check(...) rspamd_conditional_debug_fast(NULL,                                                          \
-                                                                                                                                task ? task->from_addr : NULL,                                 \
-                                                                                                                                rspamd_task_log_id, M, task ? task->task_pool->tag.uid : NULL, \
-                                                                                                                                RSPAMD_LOG_FUNC,                                               \
+INIT_LOG_MODULE(fuzzy_check)
+#define msg_debug_fuzzy_check(...) rspamd_conditional_debug_fast(NULL,                                                                 \
+                                                                                                                                task ? task->from_addr : NULL,                                        \
+                                                                                                                                rspamd_fuzzy_check_log_id, M, task ? task->task_pool->tag.uid : NULL, \
+                                                                                                                                RSPAMD_LOG_FUNC,                                                      \
                                                                                                                                 __VA_ARGS__)
 
 static inline struct fuzzy_ctx *
@@ -2660,25 +2660,45 @@ fuzzy_check_timer_callback(int fd, short what, void *arg)
                }
        }
 
-       if (session->retransmits >= session->rule->retransmits) {
-               msg_err_task("got IO timeout with server %s(%s), after %d/%d retransmits",
-                                        rspamd_upstream_name(session->server),
-                                        rspamd_inet_address_to_string_pretty(
-                                                rspamd_upstream_addr_cur(session->server)),
-                                        session->retransmits,
-                                        session->rule->retransmits);
-               rspamd_upstream_fail(session->server, TRUE, "timeout");
+       if (session->retransmits >= 0) {
+               if (session->retransmits >= session->rule->retransmits) {
+                       msg_err_task("got IO timeout with server %s(%s), after %d/%d retransmits",
+                                                rspamd_upstream_name(session->server),
+                                                rspamd_inet_address_to_string_pretty(
+                                                        rspamd_upstream_addr_cur(session->server)),
+                                                session->retransmits,
+                                                session->rule->retransmits);
+                       rspamd_upstream_fail(session->server, TRUE, "timeout");
 
-               if (session->item) {
-                       rspamd_symcache_item_async_dec_check(session->task, session->item, M);
+                       if (session->item) {
+                               rspamd_symcache_item_async_dec_check(session->task, session->item, M);
+                       }
+                       rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session);
+               }
+               else {
+                       /* Apply backpressure here: at least 100ms up to 500ms */
+                       session->retransmits++;
+                       double backpressure_time = MAX(session->rule->io_timeout * 0.1, 0.1) * session->retransmits;
+                       /* Do not make delay more than 500ms for performance considerations */
+                       backpressure_time = MIN(backpressure_time, 0.5);
+                       /* Inverse to distinguish */
+                       msg_debug_fuzzy_check("backpressure for %.2f milliseconds (server=%s), retransmits: %d;",
+                                                                 backpressure_time * 1000,
+                                                                 rspamd_upstream_name(session->server),
+                                                                 session->retransmits);
+                       session->retransmits = -(session->retransmits);
+                       /* Disable write event for the time of backpressure */
+                       rspamd_ev_watcher_reschedule_at(session->event_loop,
+                                                                                       &session->ev, EV_READ,
+                                                                                       backpressure_time);
                }
-               rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session);
        }
        else {
                /* Plan write event */
-               rspamd_ev_watcher_reschedule(session->event_loop,
-                                                                        &session->ev, EV_READ | EV_WRITE);
-               session->retransmits++;
+               rspamd_ev_watcher_reschedule_at(session->event_loop,
+                                                                               &session->ev, EV_READ | EV_WRITE,
+                                                                               session->rule->io_timeout);
+               session->retransmits = -(session->retransmits);
        }
 }
 
@@ -2801,51 +2821,71 @@ fuzzy_controller_timer_callback(int fd, short what, void *arg)
 
        task = session->task;
 
-       if (session->retransmits >= session->rule->retransmits) {
-               rspamd_upstream_fail(session->server, TRUE, "timeout");
-               msg_err_task_check("got IO timeout with server %s(%s), "
-                                                  "after %d/%d retransmits",
-                                                  rspamd_upstream_name(session->server),
-                                                  rspamd_inet_address_to_string_pretty(
-                                                          rspamd_upstream_addr_cur(session->server)),
-                                                  session->retransmits,
-                                                  session->rule->retransmits);
-
-               if (session->session) {
-                       rspamd_session_remove_event(session->session, fuzzy_controller_lua_fin,
-                                                                               session);
-               }
-               else {
-                       if (session->http_entry) {
-                               rspamd_controller_send_error(session->http_entry,
-                                                                                        500, "IO timeout with fuzzy storage");
+       if (session->retransmits >= 0) {
+               if (session->retransmits >= session->rule->retransmits) {
+                       rspamd_upstream_fail(session->server, TRUE, "timeout");
+                       msg_err_task_check("got IO timeout with server %s(%s), "
+                                                          "after %d/%d retransmits",
+                                                          rspamd_upstream_name(session->server),
+                                                          rspamd_inet_address_to_string_pretty(
+                                                                  rspamd_upstream_addr_cur(session->server)),
+                                                          session->retransmits,
+                                                          session->rule->retransmits);
+
+                       if (session->session) {
+                               rspamd_session_remove_event(session->session, fuzzy_controller_lua_fin,
+                                                                                       session);
                        }
+                       else {
+                               if (session->http_entry) {
+                                       rspamd_controller_send_error(session->http_entry,
+                                                                                                500, "IO timeout with fuzzy storage");
+                               }
 
-                       if (*session->saved > 0) {
-                               (*session->saved)--;
-                               if (*session->saved == 0) {
-                                       if (session->http_entry) {
-                                               rspamd_task_free(session->task);
+                               if (*session->saved > 0) {
+                                       (*session->saved)--;
+                                       if (*session->saved == 0) {
+                                               if (session->http_entry) {
+                                                       rspamd_task_free(session->task);
+                                               }
+
+                                               session->task = NULL;
                                        }
+                               }
 
-                                       session->task = NULL;
+                               if (session->http_entry) {
+                                       rspamd_http_connection_unref(session->http_entry->conn);
                                }
-                       }
 
-                       if (session->http_entry) {
-                               rspamd_http_connection_unref(session->http_entry->conn);
+                               rspamd_ev_watcher_stop(session->event_loop,
+                                                                          &session->ev);
+                               close(session->fd);
                        }
-
-                       rspamd_ev_watcher_stop(session->event_loop,
-                                                                  &session->ev);
-                       close(session->fd);
+               }
+               else {
+                       /* Apply backpressure here: at least 100ms up to 500ms */
+                       session->retransmits++;
+                       double backpressure_time = MAX(session->rule->io_timeout * 0.1, 0.1) * session->retransmits;
+                       /* Do not make delay more than 500ms for performance considerations */
+                       backpressure_time = MIN(backpressure_time, 0.5);
+                       /* Inverse to distinguish */
+                       msg_debug_fuzzy_check("backpressure for %.2f milliseconds (server=%s), retransmits: %d;",
+                                                                 backpressure_time * 1000,
+                                                                 rspamd_upstream_name(session->server),
+                                                                 session->retransmits);
+                       session->retransmits = -(session->retransmits);
+                       /* Disable write event for the time of backpressure */
+                       rspamd_ev_watcher_reschedule_at(session->event_loop,
+                                                                                       &session->ev, EV_READ,
+                                                                                       backpressure_time);
                }
        }
        else {
-               /* Plan write event */
-               rspamd_ev_watcher_reschedule(session->event_loop,
-                                                                        &session->ev, EV_READ | EV_WRITE);
-               session->retransmits++;
+               /* Backpressure done, plan write event */
+               rspamd_ev_watcher_reschedule_at(session->event_loop,
+                                                                               &session->ev, EV_READ | EV_WRITE,
+                                                                               session->rule->io_timeout);
+               session->retransmits = -(session->retransmits);
        }
 }