diff options
Diffstat (limited to 'src/plugins/fuzzy_check.c')
-rw-r--r-- | src/plugins/fuzzy_check.c | 164 |
1 files changed, 125 insertions, 39 deletions
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index ece9a91e0..85ea3b00c 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -1,5 +1,5 @@ /* - * Copyright 2024 Vsevolod Stakhov + * Copyright 2025 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -78,7 +78,8 @@ enum fuzzy_rule_mode { }; struct fuzzy_rule { - struct upstream_list *servers; + struct upstream_list *read_servers; /* Servers for read operations */ + struct upstream_list *write_servers; /* Servers for write operations */ const char *symbol; const char *algorithm_str; const char *name; @@ -543,22 +544,68 @@ fuzzy_parse_rule(struct rspamd_config *cfg, const ucl_object_t *obj, } if ((value = ucl_object_lookup(obj, "servers")) != NULL) { - rule->servers = rspamd_upstreams_create(cfg->ups_ctx); - /* pass max_error and revive_time configuration in upstream for fuzzy storage - * it allows to configure error_rate threshold and upstream dead timer - */ - rspamd_upstreams_set_limits(rule->servers, + rule->read_servers = rspamd_upstreams_create(cfg->ups_ctx); + rspamd_upstreams_set_limits(rule->read_servers, (double) fuzzy_module_ctx->revive_time, NAN, NAN, NAN, (unsigned int) fuzzy_module_ctx->max_errors, 0); rspamd_mempool_add_destructor(cfg->cfg_pool, (rspamd_mempool_destruct_t) rspamd_upstreams_destroy, - rule->servers); - if (!rspamd_upstreams_from_ucl(rule->servers, value, DEFAULT_PORT, NULL)) { + rule->read_servers); + if (!rspamd_upstreams_from_ucl(rule->read_servers, value, DEFAULT_PORT, NULL)) { msg_err_config("cannot read servers definition"); return -1; } + + rule->write_servers = rule->read_servers; + } + else { + /* Check for read_servers and write_servers */ + gboolean has_read = FALSE, has_write = FALSE; + + if ((value = ucl_object_lookup(obj, "read_servers")) != NULL) { + rule->read_servers = rspamd_upstreams_create(cfg->ups_ctx); + rspamd_upstreams_set_limits(rule->read_servers, + (double) fuzzy_module_ctx->revive_time, NAN, NAN, NAN, + (unsigned int) fuzzy_module_ctx->max_errors, 0); + + rspamd_mempool_add_destructor(cfg->cfg_pool, + (rspamd_mempool_destruct_t) rspamd_upstreams_destroy, + rule->read_servers); + if (!rspamd_upstreams_from_ucl(rule->read_servers, value, DEFAULT_PORT, NULL)) { + msg_err_config("cannot read read_servers definition"); + return -1; + } + has_read = TRUE; + } + + if ((value = ucl_object_lookup(obj, "write_servers")) != NULL) { + rule->write_servers = rspamd_upstreams_create(cfg->ups_ctx); + rspamd_upstreams_set_limits(rule->write_servers, + (double) fuzzy_module_ctx->revive_time, NAN, NAN, NAN, + (unsigned int) fuzzy_module_ctx->max_errors, 0); + + rspamd_mempool_add_destructor(cfg->cfg_pool, + (rspamd_mempool_destruct_t) rspamd_upstreams_destroy, + rule->write_servers); + if (!rspamd_upstreams_from_ucl(rule->write_servers, value, DEFAULT_PORT, NULL)) { + msg_err_config("cannot read write_servers definition"); + return -1; + } + has_write = TRUE; + } + + /* If we have both read and write servers, we don't need the common servers list */ + if (has_read && !has_write) { + /* Use read_servers for all operations */ + rule->write_servers = rule->read_servers; + } + else if (has_write && !has_read) { + /* Use write_servers for all operations */ + rule->read_servers = rule->write_servers; + } } + if ((value = ucl_object_lookup(obj, "fuzzy_map")) != NULL) { it = NULL; while ((cur = ucl_object_iterate(value, &it, true)) != NULL) { @@ -636,7 +683,7 @@ fuzzy_parse_rule(struct rspamd_config *cfg, const ucl_object_t *obj, strlen(shingles_key_str), NULL, 0); rule->shingles_key->len = 16; - if (rspamd_upstreams_count(rule->servers) == 0) { + if (rspamd_upstreams_count(rule->read_servers) == 0) { msg_err_config("no servers defined for fuzzy rule with name: %s", rule->name); return -1; @@ -898,6 +945,24 @@ int fuzzy_check_module_init(struct rspamd_config *cfg, struct module_ctx **ctx) 0); rspamd_rcl_add_doc_by_path(cfg, "fuzzy_check.rule", + "List of servers to check (read-only operations)", + "read_servers", + UCL_STRING, + NULL, + 0, + NULL, + 0); + rspamd_rcl_add_doc_by_path(cfg, + "fuzzy_check.rule", + "List of servers to learn (write operations)", + "write_servers", + UCL_STRING, + NULL, + 0, + NULL, + 0); + rspamd_rcl_add_doc_by_path(cfg, + "fuzzy_check.rule", "If true then never try to learn this fuzzy storage", "read_only", UCL_BOOLEAN, @@ -1249,7 +1314,7 @@ int fuzzy_check_module_config(struct rspamd_config *cfg, bool validate) LL_FOREACH(value, cur) { - if (ucl_object_lookup(cur, "servers")) { + if (ucl_object_lookup_any(cur, "servers", "read_servers", "write_servers", NULL) != NULL) { /* Unnamed rule */ fuzzy_parse_rule(cfg, cur, NULL, cb_id); nrules++; @@ -1715,26 +1780,30 @@ fuzzy_cmd_write_extensions(struct rspamd_task *task, struct rspamd_email_address *addr = g_ptr_array_index(MESSAGE_FIELD(task, from_mime), 0); - unsigned int to_write = MIN(MAX_FUZZY_DOMAIN, addr->domain_len) + 2; - if (to_write > 0 && to_write <= available) { - *dest++ = RSPAMD_FUZZY_EXT_SOURCE_DOMAIN; - *dest++ = to_write - 2; + if (addr->domain_len > 0) { + /* Filter invalid domains */ + unsigned int to_write = MIN(MAX_FUZZY_DOMAIN, addr->domain_len) + 2; - if (addr->domain_len < MAX_FUZZY_DOMAIN) { - memcpy(dest, addr->domain, addr->domain_len); - dest += addr->domain_len; - } - else { - /* Trim from left */ - memcpy(dest, - addr->domain + (addr->domain_len - MAX_FUZZY_DOMAIN), - MAX_FUZZY_DOMAIN); - dest += MAX_FUZZY_DOMAIN; - } + if (to_write > 0 && to_write <= available) { + *dest++ = RSPAMD_FUZZY_EXT_SOURCE_DOMAIN; + *dest++ = to_write - 2; + + if (addr->domain_len < MAX_FUZZY_DOMAIN) { + memcpy(dest, addr->domain, addr->domain_len); + dest += addr->domain_len; + } + else { + /* Trim from left */ + memcpy(dest, + addr->domain + (addr->domain_len - MAX_FUZZY_DOMAIN), + MAX_FUZZY_DOMAIN); + dest += MAX_FUZZY_DOMAIN; + } - available -= to_write; - written += to_write; + available -= to_write; + written += to_write; + } } } @@ -3394,8 +3463,8 @@ register_fuzzy_client_call(struct rspamd_task *task, int sock; if (!rspamd_session_blocked(task->s)) { - /* Get upstream */ - selected = rspamd_upstream_get(rule->servers, RSPAMD_UPSTREAM_ROUND_ROBIN, + /* Get upstream - use read_servers for check operations */ + selected = rspamd_upstream_get(rule->read_servers, RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0); if (selected) { addr = rspamd_upstream_addr_next(selected); @@ -3522,9 +3591,8 @@ register_fuzzy_controller_call(struct rspamd_http_connection_entry *entry, int sock; int ret = -1; - /* Get upstream */ - - while ((selected = rspamd_upstream_get_forced(rule->servers, + /* Get upstream - use write_servers for learn/unlearn operations */ + while ((selected = rspamd_upstream_get_forced(rule->write_servers, RSPAMD_UPSTREAM_SEQUENTIAL, NULL, 0))) { /* Create UDP socket */ addr = rspamd_upstream_addr_next(selected); @@ -3538,6 +3606,9 @@ register_fuzzy_controller_call(struct rspamd_http_connection_entry *entry, rspamd_upstream_fail(selected, TRUE, strerror(errno)); } else { + msg_info_task("fuzzy storage %s (%s rule) is used for write", + rspamd_inet_address_to_string_pretty(addr), + rule->name); s = rspamd_mempool_alloc0(session->pool, sizeof(struct fuzzy_learn_session)); @@ -3620,6 +3691,7 @@ fuzzy_modify_handler(struct rspamd_http_connection_entry *conn_ent, PTR_ARRAY_FOREACH(fuzzy_module_ctx->fuzzy_rules, i, rule) { if (rule->mode == fuzzy_rule_read_only) { + msg_debug_task("skip rule %s as it is read-only", rule->name); continue; } @@ -3729,6 +3801,8 @@ fuzzy_modify_handler(struct rspamd_http_connection_entry *conn_ent, else { commands = fuzzy_generate_commands(task, rule, cmd, flag, value, flags); + msg_debug_task("fuzzy command %d for rule %s, flag %d, value %d", + cmd, rule->name, flag, value); if (commands != NULL) { res = register_fuzzy_controller_call(conn_ent, rule, @@ -3894,7 +3968,7 @@ fuzzy_check_send_lua_learn(struct fuzzy_rule *rule, /* Get upstream */ if (!rspamd_session_blocked(task->s)) { - while ((selected = rspamd_upstream_get(rule->servers, + while ((selected = rspamd_upstream_get(rule->write_servers, RSPAMD_UPSTREAM_SEQUENTIAL, NULL, 0))) { /* Create UDP socket */ addr = rspamd_upstream_addr_next(selected); @@ -4491,9 +4565,21 @@ fuzzy_lua_list_storages(lua_State *L) lua_setfield(L, -2, "read_only"); /* Push servers */ - lua_createtable(L, rspamd_upstreams_count(rule->servers), 0); - rspamd_upstreams_foreach(rule->servers, lua_upstream_str_inserter, L); - lua_setfield(L, -2, "servers"); + if (rule->read_servers == rule->write_servers) { + /* Same servers for both operations */ + lua_createtable(L, rspamd_upstreams_count(rule->read_servers), 0); + rspamd_upstreams_foreach(rule->read_servers, lua_upstream_str_inserter, L); + lua_setfield(L, -2, "servers"); + } + else { + /* Different servers for read and write */ + lua_createtable(L, rspamd_upstreams_count(rule->read_servers), 0); + rspamd_upstreams_foreach(rule->read_servers, lua_upstream_str_inserter, L); + lua_setfield(L, -2, "read_servers"); + lua_createtable(L, rspamd_upstreams_count(rule->write_servers), 0); + rspamd_upstreams_foreach(rule->write_servers, lua_upstream_str_inserter, L); + lua_setfield(L, -2, "write_servers"); + } /* Push flags */ GHashTableIter it; @@ -4780,7 +4866,7 @@ fuzzy_lua_ping_storage(lua_State *L) rspamd_ptr_array_free_hard, addrs); } else { - struct upstream *selected = rspamd_upstream_get(rule_found->servers, + struct upstream *selected = rspamd_upstream_get(rule_found->read_servers, RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0); addr = rspamd_upstream_addr_next(selected); } @@ -4824,4 +4910,4 @@ fuzzy_lua_ping_storage(lua_State *L) lua_pushboolean(L, TRUE); return 1; -}
\ No newline at end of file +} |