Browse Source

Merge pull request #4939 from rspamd/vstakhov-fuzzy-backpressure

Implement backpressure for fuzzy check
pull/4942/head
Vsevolod Stakhov 1 week ago
parent
commit
f4aa620654
No account linked to committer's email address
3 changed files with 154 additions and 62 deletions
  1. 30
    0
      src/libutil/libev_helper.c
  2. 14
    3
      src/libutil/libev_helper.h
  3. 110
    59
      src/plugins/fuzzy_check.c

+ 30
- 0
src/libutil/libev_helper.c View File

@@ -108,4 +108,34 @@ void rspamd_ev_watcher_reschedule(struct ev_loop *loop,
ev_timer_start(EV_A, &ev->tm);
}
}
}

void rspamd_ev_watcher_reschedule_at(struct ev_loop *loop,
struct rspamd_io_ev *ev,
short what,
ev_tstamp at)
{
g_assert(ev->cb != NULL);

if (ev_can_stop(&ev->io)) {
ev_io_stop(EV_A, &ev->io);
ev_io_set(&ev->io, ev->io.fd, what);
ev_io_start(EV_A, &ev->io);
}
else {
ev->io.data = ev;
ev_io_init(&ev->io, rspamd_ev_watcher_io_cb, ev->io.fd, what);
ev_io_start(EV_A, &ev->io);
}

if (at > 0) {
if (!(ev_can_stop(&ev->tm))) {
/* Update timestamp to avoid timers running early */
ev_now_update_if_cheap(loop);

ev->tm.data = ev;
ev_timer_init(&ev->tm, rspamd_ev_watcher_timer_cb, at, 0.0);
ev_timer_start(EV_A, &ev->tm);
}
}
}

+ 14
- 3
src/libutil/libev_helper.h View File

@@ -1,11 +1,11 @@
/*-
* Copyright 2019 Vsevolod Stakhov
/*
* Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -79,6 +79,17 @@ void rspamd_ev_watcher_reschedule(struct ev_loop *loop,
struct rspamd_io_ev *ev,
short what);

/**
* Convenience function to reschedule watcher with different events and different timeout
* @param loop
* @param ev
* @param what
*/
void rspamd_ev_watcher_reschedule_at(struct ev_loop *loop,
struct rspamd_io_ev *ev,
short what,
ev_tstamp at);

#ifdef __cplusplus
}
#endif

+ 110
- 59
src/plugins/fuzzy_check.c View File

@@ -59,7 +59,7 @@
#define RSPAMD_FUZZY_PLUGIN_VERSION RSPAMD_FUZZY_VERSION

static const int rspamd_fuzzy_hash_len = 5;
static const char *M = "fuzzy check";
static const char *M = "fuzzy_check";
struct fuzzy_ctx;

struct fuzzy_mapping {
@@ -139,7 +139,7 @@ struct fuzzy_client_session {
struct rspamd_io_ev ev;
int state;
int fd;
unsigned int retransmits;
int retransmits;
};

struct fuzzy_learn_session {
@@ -157,7 +157,7 @@ struct fuzzy_learn_session {
struct ev_loop *event_loop;
struct rspamd_io_ev ev;
int fd;
unsigned int retransmits;
int retransmits;
};

#define FUZZY_CMD_FLAG_REPLIED (1 << 0)
@@ -208,6 +208,13 @@ module_t fuzzy_check_module = {
(unsigned int) -1,
};

INIT_LOG_MODULE(fuzzy_check)
#define msg_debug_fuzzy_check(...) rspamd_conditional_debug_fast(NULL, \
task ? task->from_addr : NULL, \
rspamd_fuzzy_check_log_id, M, task ? task->task_pool->tag.uid : NULL, \
RSPAMD_LOG_FUNC, \
__VA_ARGS__)

static inline struct fuzzy_ctx *
fuzzy_get_context(struct rspamd_config *cfg)
{
@@ -1842,9 +1849,9 @@ fuzzy_cmd_from_text_part(struct rspamd_task *task,

rspamd_cryptobox_hash_final(&st, shcmd->basic.digest);

msg_debug_task("loading shingles of type %s with key %*xs",
rule->algorithm_str,
16, rule->shingles_key->str);
msg_debug_fuzzy_check("loading shingles of type %s with key %*xs",
rule->algorithm_str,
16, rule->shingles_key->str);
sh = rspamd_shingles_from_text(words,
rule->shingles_key->str, task->task_pool,
rspamd_shingles_default_filter, NULL,
@@ -1987,7 +1994,7 @@ fuzzy_cmd_from_image_part (struct fuzzy_rule *rule,
(const unsigned char *)img->dct, RSPAMD_DCT_LEN / NBBY,
rule->hash_key->str, rule->hash_key->len);

msg_debug_task ("loading shingles of type %s with key %*xs",
msg_debug_fuzzy_check ("loading shingles of type %s with key %*xs",
rule->algorithm_str,
16, rule->shingles_key->str);

@@ -2653,25 +2660,47 @@ fuzzy_check_timer_callback(int fd, short what, void *arg)
}
}

if (session->retransmits >= session->rule->retransmits) {
msg_err_task("got IO timeout with server %s(%s), after %d/%d retransmits",
rspamd_upstream_name(session->server),
rspamd_inet_address_to_string_pretty(
rspamd_upstream_addr_cur(session->server)),
session->retransmits,
session->rule->retransmits);
rspamd_upstream_fail(session->server, TRUE, "timeout");
if (session->retransmits >= 0) {
if (session->retransmits >= session->rule->retransmits) {
msg_err_task("got IO timeout with server %s(%s), after %d/%d retransmits",
rspamd_upstream_name(session->server),
rspamd_inet_address_to_string_pretty(
rspamd_upstream_addr_cur(session->server)),
session->retransmits,
session->rule->retransmits);
rspamd_upstream_fail(session->server, TRUE, "timeout");

if (session->item) {
rspamd_symcache_item_async_dec_check(session->task, session->item, M);
if (session->item) {
rspamd_symcache_item_async_dec_check(session->task, session->item, M);
}
rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session);
}
else {
/* Apply backpressure here: at least 100ms up to 500ms */
session->retransmits++;
double backpressure_time = MAX(session->rule->io_timeout * 0.1, 0.1) * session->retransmits;
/* Do not make delay more than 500ms for performance considerations */
backpressure_time = MIN(backpressure_time, 0.5);
backpressure_time = rspamd_time_jitter(backpressure_time * 0.5, 0.0);
backpressure_time = MAX(backpressure_time, 0.1);
/* Inverse to distinguish */
msg_debug_fuzzy_check("backpressure for %.2f milliseconds (server=%s), retransmits: %d;",
backpressure_time * 1000,
rspamd_upstream_name(session->server),
session->retransmits);
session->retransmits = -(session->retransmits);
/* Disable write event for the time of backpressure */
rspamd_ev_watcher_reschedule_at(session->event_loop,
&session->ev, EV_READ,
backpressure_time);
}
rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session);
}
else {
/* Plan write event */
rspamd_ev_watcher_reschedule(session->event_loop,
&session->ev, EV_READ | EV_WRITE);
session->retransmits++;
rspamd_ev_watcher_reschedule_at(session->event_loop,
&session->ev, EV_READ | EV_WRITE,
session->rule->io_timeout);
session->retransmits = -(session->retransmits);
}
}

@@ -2794,51 +2823,73 @@ fuzzy_controller_timer_callback(int fd, short what, void *arg)

task = session->task;

if (session->retransmits >= session->rule->retransmits) {
rspamd_upstream_fail(session->server, TRUE, "timeout");
msg_err_task_check("got IO timeout with server %s(%s), "
"after %d/%d retransmits",
rspamd_upstream_name(session->server),
rspamd_inet_address_to_string_pretty(
rspamd_upstream_addr_cur(session->server)),
session->retransmits,
session->rule->retransmits);

if (session->session) {
rspamd_session_remove_event(session->session, fuzzy_controller_lua_fin,
session);
}
else {
if (session->http_entry) {
rspamd_controller_send_error(session->http_entry,
500, "IO timeout with fuzzy storage");
if (session->retransmits >= 0) {
if (session->retransmits >= session->rule->retransmits) {
rspamd_upstream_fail(session->server, TRUE, "timeout");
msg_err_task_check("got IO timeout with server %s(%s), "
"after %d/%d retransmits",
rspamd_upstream_name(session->server),
rspamd_inet_address_to_string_pretty(
rspamd_upstream_addr_cur(session->server)),
session->retransmits,
session->rule->retransmits);

if (session->session) {
rspamd_session_remove_event(session->session, fuzzy_controller_lua_fin,
session);
}
else {
if (session->http_entry) {
rspamd_controller_send_error(session->http_entry,
500, "IO timeout with fuzzy storage");
}

if (*session->saved > 0) {
(*session->saved)--;
if (*session->saved == 0) {
if (session->http_entry) {
rspamd_task_free(session->task);
if (*session->saved > 0) {
(*session->saved)--;
if (*session->saved == 0) {
if (session->http_entry) {
rspamd_task_free(session->task);
}

session->task = NULL;
}
}

session->task = NULL;
if (session->http_entry) {
rspamd_http_connection_unref(session->http_entry->conn);
}
}

if (session->http_entry) {
rspamd_http_connection_unref(session->http_entry->conn);
rspamd_ev_watcher_stop(session->event_loop,
&session->ev);
close(session->fd);
}

rspamd_ev_watcher_stop(session->event_loop,
&session->ev);
close(session->fd);
}
else {
/* Apply backpressure here: at least 100ms up to 500ms */
session->retransmits++;
double backpressure_time = MAX(session->rule->io_timeout * 0.1, 0.1) * session->retransmits;
/* Do not make delay more than 500ms for performance considerations */
backpressure_time = MIN(backpressure_time, 0.5);
backpressure_time = rspamd_time_jitter(backpressure_time * 0.5, 0.0);
backpressure_time = MAX(backpressure_time, 0.1);
/* Inverse to distinguish */
msg_debug_fuzzy_check("backpressure for %.2f milliseconds (server=%s), retransmits: %d;",
backpressure_time * 1000,
rspamd_upstream_name(session->server),
session->retransmits);
session->retransmits = -(session->retransmits);
/* Disable write event for the time of backpressure */
rspamd_ev_watcher_reschedule_at(session->event_loop,
&session->ev, EV_READ,
backpressure_time);
}
}
else {
/* Plan write event */
rspamd_ev_watcher_reschedule(session->event_loop,
&session->ev, EV_READ | EV_WRITE);
session->retransmits++;
/* Backpressure done, plan write event */
rspamd_ev_watcher_reschedule_at(session->event_loop,
&session->ev, EV_READ | EV_WRITE,
session->rule->io_timeout);
session->retransmits = -(session->retransmits);
}
}

@@ -4228,9 +4279,9 @@ fuzzy_lua_hex_hashes_handler(lua_State *L)
/* Check for flag */
if (g_hash_table_lookup(rule->mappings,
GINT_TO_POINTER(flag)) == NULL) {
msg_debug_task("skip rule %s as it has no flag %d defined"
" false",
rule->name, flag);
msg_debug_fuzzy_check("skip rule %s as it has no flag %d defined"
" false",
rule->name, flag);
continue;
}


Loading…
Cancel
Save