Browse Source

[Project] Implement fuzzy check retransmits backpressure

pull/4939/head
Vsevolod Stakhov 1 week ago
parent
commit
e66c993097
No account linked to committer's email address
1 changed files with 96 additions and 56 deletions
  1. 96
    56
      src/plugins/fuzzy_check.c

+ 96
- 56
src/plugins/fuzzy_check.c View File

@@ -139,7 +139,7 @@ struct fuzzy_client_session {
struct rspamd_io_ev ev;
int state;
int fd;
unsigned int retransmits;
int retransmits;
};

struct fuzzy_learn_session {
@@ -157,7 +157,7 @@ struct fuzzy_learn_session {
struct ev_loop *event_loop;
struct rspamd_io_ev ev;
int fd;
unsigned int retransmits;
int retransmits;
};

#define FUZZY_CMD_FLAG_REPLIED (1 << 0)
@@ -208,11 +208,11 @@ module_t fuzzy_check_module = {
(unsigned int) -1,
};

INIT_LOG_MODULE(N)
#define msg_debug_fuzzy_check(...) rspamd_conditional_debug_fast(NULL, \
task ? task->from_addr : NULL, \
rspamd_task_log_id, M, task ? task->task_pool->tag.uid : NULL, \
RSPAMD_LOG_FUNC, \
INIT_LOG_MODULE(fuzzy_check)
#define msg_debug_fuzzy_check(...) rspamd_conditional_debug_fast(NULL, \
task ? task->from_addr : NULL, \
rspamd_fuzzy_check_log_id, M, task ? task->task_pool->tag.uid : NULL, \
RSPAMD_LOG_FUNC, \
__VA_ARGS__)

static inline struct fuzzy_ctx *
@@ -2660,25 +2660,45 @@ fuzzy_check_timer_callback(int fd, short what, void *arg)
}
}

if (session->retransmits >= session->rule->retransmits) {
msg_err_task("got IO timeout with server %s(%s), after %d/%d retransmits",
rspamd_upstream_name(session->server),
rspamd_inet_address_to_string_pretty(
rspamd_upstream_addr_cur(session->server)),
session->retransmits,
session->rule->retransmits);
rspamd_upstream_fail(session->server, TRUE, "timeout");
if (session->retransmits >= 0) {
if (session->retransmits >= session->rule->retransmits) {
msg_err_task("got IO timeout with server %s(%s), after %d/%d retransmits",
rspamd_upstream_name(session->server),
rspamd_inet_address_to_string_pretty(
rspamd_upstream_addr_cur(session->server)),
session->retransmits,
session->rule->retransmits);
rspamd_upstream_fail(session->server, TRUE, "timeout");

if (session->item) {
rspamd_symcache_item_async_dec_check(session->task, session->item, M);
if (session->item) {
rspamd_symcache_item_async_dec_check(session->task, session->item, M);
}
rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session);
}
else {
/* Apply backpressure here: at least 100ms up to 500ms */
session->retransmits++;
double backpressure_time = MAX(session->rule->io_timeout * 0.1, 0.1) * session->retransmits;
/* Do not make delay more than 500ms for performance considerations */
backpressure_time = MIN(backpressure_time, 0.5);
/* Inverse to distinguish */
msg_debug_fuzzy_check("backpressure for %.2f milliseconds (server=%s), retransmits: %d;",
backpressure_time * 1000,
rspamd_upstream_name(session->server),
session->retransmits);
session->retransmits = -(session->retransmits);
/* Disable write event for the time of backpressure */
rspamd_ev_watcher_reschedule_at(session->event_loop,
&session->ev, EV_READ,
backpressure_time);
}
rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session);
}
else {
/* Plan write event */
rspamd_ev_watcher_reschedule(session->event_loop,
&session->ev, EV_READ | EV_WRITE);
session->retransmits++;
rspamd_ev_watcher_reschedule_at(session->event_loop,
&session->ev, EV_READ | EV_WRITE,
session->rule->io_timeout);
session->retransmits = -(session->retransmits);
}
}

@@ -2801,51 +2821,71 @@ fuzzy_controller_timer_callback(int fd, short what, void *arg)

task = session->task;

if (session->retransmits >= session->rule->retransmits) {
rspamd_upstream_fail(session->server, TRUE, "timeout");
msg_err_task_check("got IO timeout with server %s(%s), "
"after %d/%d retransmits",
rspamd_upstream_name(session->server),
rspamd_inet_address_to_string_pretty(
rspamd_upstream_addr_cur(session->server)),
session->retransmits,
session->rule->retransmits);

if (session->session) {
rspamd_session_remove_event(session->session, fuzzy_controller_lua_fin,
session);
}
else {
if (session->http_entry) {
rspamd_controller_send_error(session->http_entry,
500, "IO timeout with fuzzy storage");
if (session->retransmits >= 0) {
if (session->retransmits >= session->rule->retransmits) {
rspamd_upstream_fail(session->server, TRUE, "timeout");
msg_err_task_check("got IO timeout with server %s(%s), "
"after %d/%d retransmits",
rspamd_upstream_name(session->server),
rspamd_inet_address_to_string_pretty(
rspamd_upstream_addr_cur(session->server)),
session->retransmits,
session->rule->retransmits);

if (session->session) {
rspamd_session_remove_event(session->session, fuzzy_controller_lua_fin,
session);
}
else {
if (session->http_entry) {
rspamd_controller_send_error(session->http_entry,
500, "IO timeout with fuzzy storage");
}

if (*session->saved > 0) {
(*session->saved)--;
if (*session->saved == 0) {
if (session->http_entry) {
rspamd_task_free(session->task);
if (*session->saved > 0) {
(*session->saved)--;
if (*session->saved == 0) {
if (session->http_entry) {
rspamd_task_free(session->task);
}

session->task = NULL;
}
}

session->task = NULL;
if (session->http_entry) {
rspamd_http_connection_unref(session->http_entry->conn);
}
}

if (session->http_entry) {
rspamd_http_connection_unref(session->http_entry->conn);
rspamd_ev_watcher_stop(session->event_loop,
&session->ev);
close(session->fd);
}

rspamd_ev_watcher_stop(session->event_loop,
&session->ev);
close(session->fd);
}
else {
/* Apply backpressure here: at least 100ms up to 500ms */
session->retransmits++;
double backpressure_time = MAX(session->rule->io_timeout * 0.1, 0.1) * session->retransmits;
/* Do not make delay more than 500ms for performance considerations */
backpressure_time = MIN(backpressure_time, 0.5);
/* Inverse to distinguish */
msg_debug_fuzzy_check("backpressure for %.2f milliseconds (server=%s), retransmits: %d;",
backpressure_time * 1000,
rspamd_upstream_name(session->server),
session->retransmits);
session->retransmits = -(session->retransmits);
/* Disable write event for the time of backpressure */
rspamd_ev_watcher_reschedule_at(session->event_loop,
&session->ev, EV_READ,
backpressure_time);
}
}
else {
/* Plan write event */
rspamd_ev_watcher_reschedule(session->event_loop,
&session->ev, EV_READ | EV_WRITE);
session->retransmits++;
/* Backpressure done, plan write event */
rspamd_ev_watcher_reschedule_at(session->event_loop,
&session->ev, EV_READ | EV_WRITE,
session->rule->io_timeout);
session->retransmits = -(session->retransmits);
}
}


Loading…
Cancel
Save