diff options
Diffstat (limited to 'src/plugins/fuzzy_check.c')
-rw-r--r-- | src/plugins/fuzzy_check.c | 178 |
1 files changed, 132 insertions, 46 deletions
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index ece9a91e0..7dd5162ac 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -1,5 +1,5 @@ /* - * Copyright 2024 Vsevolod Stakhov + * Copyright 2025 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -78,7 +78,8 @@ enum fuzzy_rule_mode { }; struct fuzzy_rule { - struct upstream_list *servers; + struct upstream_list *read_servers; /* Servers for read operations */ + struct upstream_list *write_servers; /* Servers for write operations */ const char *symbol; const char *algorithm_str; const char *name; @@ -543,22 +544,68 @@ fuzzy_parse_rule(struct rspamd_config *cfg, const ucl_object_t *obj, } if ((value = ucl_object_lookup(obj, "servers")) != NULL) { - rule->servers = rspamd_upstreams_create(cfg->ups_ctx); - /* pass max_error and revive_time configuration in upstream for fuzzy storage - * it allows to configure error_rate threshold and upstream dead timer - */ - rspamd_upstreams_set_limits(rule->servers, + rule->read_servers = rspamd_upstreams_create(cfg->ups_ctx); + rspamd_upstreams_set_limits(rule->read_servers, (double) fuzzy_module_ctx->revive_time, NAN, NAN, NAN, (unsigned int) fuzzy_module_ctx->max_errors, 0); rspamd_mempool_add_destructor(cfg->cfg_pool, (rspamd_mempool_destruct_t) rspamd_upstreams_destroy, - rule->servers); - if (!rspamd_upstreams_from_ucl(rule->servers, value, DEFAULT_PORT, NULL)) { + rule->read_servers); + if (!rspamd_upstreams_from_ucl(rule->read_servers, value, DEFAULT_PORT, NULL)) { msg_err_config("cannot read servers definition"); return -1; } + + rule->write_servers = rule->read_servers; + } + else { + /* Check for read_servers and write_servers */ + gboolean has_read = FALSE, has_write = FALSE; + + if ((value = ucl_object_lookup(obj, "read_servers")) != NULL) { + rule->read_servers = rspamd_upstreams_create(cfg->ups_ctx); + rspamd_upstreams_set_limits(rule->read_servers, + (double) fuzzy_module_ctx->revive_time, NAN, NAN, NAN, + (unsigned int) fuzzy_module_ctx->max_errors, 0); + + rspamd_mempool_add_destructor(cfg->cfg_pool, + (rspamd_mempool_destruct_t) rspamd_upstreams_destroy, + rule->read_servers); + if (!rspamd_upstreams_from_ucl(rule->read_servers, value, DEFAULT_PORT, NULL)) { + msg_err_config("cannot read read_servers definition"); + return -1; + } + has_read = TRUE; + } + + if ((value = ucl_object_lookup(obj, "write_servers")) != NULL) { + rule->write_servers = rspamd_upstreams_create(cfg->ups_ctx); + rspamd_upstreams_set_limits(rule->write_servers, + (double) fuzzy_module_ctx->revive_time, NAN, NAN, NAN, + (unsigned int) fuzzy_module_ctx->max_errors, 0); + + rspamd_mempool_add_destructor(cfg->cfg_pool, + (rspamd_mempool_destruct_t) rspamd_upstreams_destroy, + rule->write_servers); + if (!rspamd_upstreams_from_ucl(rule->write_servers, value, DEFAULT_PORT, NULL)) { + msg_err_config("cannot read write_servers definition"); + return -1; + } + has_write = TRUE; + } + + /* If we have both read and write servers, we don't need the common servers list */ + if (has_read && !has_write) { + /* Use read_servers for all operations */ + rule->write_servers = rule->read_servers; + } + else if (has_write && !has_read) { + /* Use write_servers for all operations */ + rule->read_servers = rule->write_servers; + } } + if ((value = ucl_object_lookup(obj, "fuzzy_map")) != NULL) { it = NULL; while ((cur = ucl_object_iterate(value, &it, true)) != NULL) { @@ -636,7 +683,7 @@ fuzzy_parse_rule(struct rspamd_config *cfg, const ucl_object_t *obj, strlen(shingles_key_str), NULL, 0); rule->shingles_key->len = 16; - if (rspamd_upstreams_count(rule->servers) == 0) { + if (rspamd_upstreams_count(rule->read_servers) == 0) { msg_err_config("no servers defined for fuzzy rule with name: %s", rule->name); return -1; @@ -898,6 +945,24 @@ int fuzzy_check_module_init(struct rspamd_config *cfg, struct module_ctx **ctx) 0); rspamd_rcl_add_doc_by_path(cfg, "fuzzy_check.rule", + "List of servers to check (read-only operations)", + "read_servers", + UCL_STRING, + NULL, + 0, + NULL, + 0); + rspamd_rcl_add_doc_by_path(cfg, + "fuzzy_check.rule", + "List of servers to learn (write operations)", + "write_servers", + UCL_STRING, + NULL, + 0, + NULL, + 0); + rspamd_rcl_add_doc_by_path(cfg, + "fuzzy_check.rule", "If true then never try to learn this fuzzy storage", "read_only", UCL_BOOLEAN, @@ -1249,7 +1314,7 @@ int fuzzy_check_module_config(struct rspamd_config *cfg, bool validate) LL_FOREACH(value, cur) { - if (ucl_object_lookup(cur, "servers")) { + if (ucl_object_lookup_any(cur, "servers", "read_servers", "write_servers", NULL) != NULL) { /* Unnamed rule */ fuzzy_parse_rule(cfg, cur, NULL, cb_id); nrules++; @@ -1366,10 +1431,10 @@ fuzzy_io_fin(void *ud) close(session->fd); } -static GArray * +static rspamd_words_t * fuzzy_preprocess_words(struct rspamd_mime_text_part *part, rspamd_mempool_t *pool) { - return part->utf_words; + return &part->utf_words; } static void @@ -1715,26 +1780,30 @@ fuzzy_cmd_write_extensions(struct rspamd_task *task, struct rspamd_email_address *addr = g_ptr_array_index(MESSAGE_FIELD(task, from_mime), 0); - unsigned int to_write = MIN(MAX_FUZZY_DOMAIN, addr->domain_len) + 2; - if (to_write > 0 && to_write <= available) { - *dest++ = RSPAMD_FUZZY_EXT_SOURCE_DOMAIN; - *dest++ = to_write - 2; + if (addr->domain_len > 0) { + /* Filter invalid domains */ + unsigned int to_write = MIN(MAX_FUZZY_DOMAIN, addr->domain_len) + 2; - if (addr->domain_len < MAX_FUZZY_DOMAIN) { - memcpy(dest, addr->domain, addr->domain_len); - dest += addr->domain_len; - } - else { - /* Trim from left */ - memcpy(dest, - addr->domain + (addr->domain_len - MAX_FUZZY_DOMAIN), - MAX_FUZZY_DOMAIN); - dest += MAX_FUZZY_DOMAIN; - } + if (to_write > 0 && to_write <= available) { + *dest++ = RSPAMD_FUZZY_EXT_SOURCE_DOMAIN; + *dest++ = to_write - 2; + + if (addr->domain_len < MAX_FUZZY_DOMAIN) { + memcpy(dest, addr->domain, addr->domain_len); + dest += addr->domain_len; + } + else { + /* Trim from left */ + memcpy(dest, + addr->domain + (addr->domain_len - MAX_FUZZY_DOMAIN), + MAX_FUZZY_DOMAIN); + dest += MAX_FUZZY_DOMAIN; + } - available -= to_write; - written += to_write; + available -= to_write; + written += to_write; + } } } @@ -1792,7 +1861,7 @@ fuzzy_cmd_from_text_part(struct rspamd_task *task, unsigned int i; rspamd_cryptobox_hash_state_t st; rspamd_stat_token_t *word; - GArray *words; + rspamd_words_t *words; struct fuzzy_cmd_io *io; unsigned int additional_length; unsigned char *additional_data; @@ -1901,10 +1970,10 @@ fuzzy_cmd_from_text_part(struct rspamd_task *task, rspamd_cryptobox_hash_init(&st, rule->hash_key->str, rule->hash_key->len); words = fuzzy_preprocess_words(part, task->task_pool); - for (i = 0; i < words->len; i++) { - word = &g_array_index(words, rspamd_stat_token_t, i); + for (i = 0; i < kv_size(*words); i++) { + word = &kv_A(*words, i); - if (!((word->flags & RSPAMD_STAT_TOKEN_FLAG_SKIPPED) || word->stemmed.len == 0)) { + if (!((word->flags & RSPAMD_WORD_FLAG_SKIPPED) || word->stemmed.len == 0)) { rspamd_cryptobox_hash_update(&st, word->stemmed.begin, word->stemmed.len); } @@ -2615,7 +2684,7 @@ fuzzy_insert_metric_results(struct rspamd_task *task, struct fuzzy_rule *rule, if (task->message) { PTR_ARRAY_FOREACH(MESSAGE_FIELD(task, text_parts), i, tp) { - if (!IS_TEXT_PART_EMPTY(tp) && tp->utf_words != NULL && tp->utf_words->len > 0) { + if (!IS_TEXT_PART_EMPTY(tp) && kv_size(tp->utf_words) > 0) { seen_text_part = TRUE; if (tp->utf_stripped_text.magic == UTEXT_MAGIC) { @@ -3394,8 +3463,8 @@ register_fuzzy_client_call(struct rspamd_task *task, int sock; if (!rspamd_session_blocked(task->s)) { - /* Get upstream */ - selected = rspamd_upstream_get(rule->servers, RSPAMD_UPSTREAM_ROUND_ROBIN, + /* Get upstream - use read_servers for check operations */ + selected = rspamd_upstream_get(rule->read_servers, RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0); if (selected) { addr = rspamd_upstream_addr_next(selected); @@ -3522,9 +3591,8 @@ register_fuzzy_controller_call(struct rspamd_http_connection_entry *entry, int sock; int ret = -1; - /* Get upstream */ - - while ((selected = rspamd_upstream_get_forced(rule->servers, + /* Get upstream - use write_servers for learn/unlearn operations */ + while ((selected = rspamd_upstream_get_forced(rule->write_servers, RSPAMD_UPSTREAM_SEQUENTIAL, NULL, 0))) { /* Create UDP socket */ addr = rspamd_upstream_addr_next(selected); @@ -3538,6 +3606,9 @@ register_fuzzy_controller_call(struct rspamd_http_connection_entry *entry, rspamd_upstream_fail(selected, TRUE, strerror(errno)); } else { + msg_info_task("fuzzy storage %s (%s rule) is used for write", + rspamd_inet_address_to_string_pretty(addr), + rule->name); s = rspamd_mempool_alloc0(session->pool, sizeof(struct fuzzy_learn_session)); @@ -3620,6 +3691,7 @@ fuzzy_modify_handler(struct rspamd_http_connection_entry *conn_ent, PTR_ARRAY_FOREACH(fuzzy_module_ctx->fuzzy_rules, i, rule) { if (rule->mode == fuzzy_rule_read_only) { + msg_debug_task("skip rule %s as it is read-only", rule->name); continue; } @@ -3729,6 +3801,8 @@ fuzzy_modify_handler(struct rspamd_http_connection_entry *conn_ent, else { commands = fuzzy_generate_commands(task, rule, cmd, flag, value, flags); + msg_debug_task("fuzzy command %d for rule %s, flag %d, value %d", + cmd, rule->name, flag, value); if (commands != NULL) { res = register_fuzzy_controller_call(conn_ent, rule, @@ -3894,7 +3968,7 @@ fuzzy_check_send_lua_learn(struct fuzzy_rule *rule, /* Get upstream */ if (!rspamd_session_blocked(task->s)) { - while ((selected = rspamd_upstream_get(rule->servers, + while ((selected = rspamd_upstream_get(rule->write_servers, RSPAMD_UPSTREAM_SEQUENTIAL, NULL, 0))) { /* Create UDP socket */ addr = rspamd_upstream_addr_next(selected); @@ -4491,9 +4565,21 @@ fuzzy_lua_list_storages(lua_State *L) lua_setfield(L, -2, "read_only"); /* Push servers */ - lua_createtable(L, rspamd_upstreams_count(rule->servers), 0); - rspamd_upstreams_foreach(rule->servers, lua_upstream_str_inserter, L); - lua_setfield(L, -2, "servers"); + if (rule->read_servers == rule->write_servers) { + /* Same servers for both operations */ + lua_createtable(L, rspamd_upstreams_count(rule->read_servers), 0); + rspamd_upstreams_foreach(rule->read_servers, lua_upstream_str_inserter, L); + lua_setfield(L, -2, "servers"); + } + else { + /* Different servers for read and write */ + lua_createtable(L, rspamd_upstreams_count(rule->read_servers), 0); + rspamd_upstreams_foreach(rule->read_servers, lua_upstream_str_inserter, L); + lua_setfield(L, -2, "read_servers"); + lua_createtable(L, rspamd_upstreams_count(rule->write_servers), 0); + rspamd_upstreams_foreach(rule->write_servers, lua_upstream_str_inserter, L); + lua_setfield(L, -2, "write_servers"); + } /* Push flags */ GHashTableIter it; @@ -4780,7 +4866,7 @@ fuzzy_lua_ping_storage(lua_State *L) rspamd_ptr_array_free_hard, addrs); } else { - struct upstream *selected = rspamd_upstream_get(rule_found->servers, + struct upstream *selected = rspamd_upstream_get(rule_found->read_servers, RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0); addr = rspamd_upstream_addr_next(selected); } @@ -4824,4 +4910,4 @@ fuzzy_lua_ping_storage(lua_State *L) lua_pushboolean(L, TRUE); return 1; -}
\ No newline at end of file +} |