aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/plugins/fuzzy_check.c124
-rw-r--r--src/rspamd_proxy.c12
2 files changed, 109 insertions, 27 deletions
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index 4ed94a141..85ea3b00c 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -78,7 +78,8 @@ enum fuzzy_rule_mode {
};
struct fuzzy_rule {
- struct upstream_list *servers;
+ struct upstream_list *read_servers; /* Servers for read operations */
+ struct upstream_list *write_servers; /* Servers for write operations */
const char *symbol;
const char *algorithm_str;
const char *name;
@@ -543,22 +544,68 @@ fuzzy_parse_rule(struct rspamd_config *cfg, const ucl_object_t *obj,
}
if ((value = ucl_object_lookup(obj, "servers")) != NULL) {
- rule->servers = rspamd_upstreams_create(cfg->ups_ctx);
- /* pass max_error and revive_time configuration in upstream for fuzzy storage
- * it allows to configure error_rate threshold and upstream dead timer
- */
- rspamd_upstreams_set_limits(rule->servers,
+ rule->read_servers = rspamd_upstreams_create(cfg->ups_ctx);
+ rspamd_upstreams_set_limits(rule->read_servers,
(double) fuzzy_module_ctx->revive_time, NAN, NAN, NAN,
(unsigned int) fuzzy_module_ctx->max_errors, 0);
rspamd_mempool_add_destructor(cfg->cfg_pool,
(rspamd_mempool_destruct_t) rspamd_upstreams_destroy,
- rule->servers);
- if (!rspamd_upstreams_from_ucl(rule->servers, value, DEFAULT_PORT, NULL)) {
+ rule->read_servers);
+ if (!rspamd_upstreams_from_ucl(rule->read_servers, value, DEFAULT_PORT, NULL)) {
msg_err_config("cannot read servers definition");
return -1;
}
+
+ rule->write_servers = rule->read_servers;
+ }
+ else {
+ /* Check for read_servers and write_servers */
+ gboolean has_read = FALSE, has_write = FALSE;
+
+ if ((value = ucl_object_lookup(obj, "read_servers")) != NULL) {
+ rule->read_servers = rspamd_upstreams_create(cfg->ups_ctx);
+ rspamd_upstreams_set_limits(rule->read_servers,
+ (double) fuzzy_module_ctx->revive_time, NAN, NAN, NAN,
+ (unsigned int) fuzzy_module_ctx->max_errors, 0);
+
+ rspamd_mempool_add_destructor(cfg->cfg_pool,
+ (rspamd_mempool_destruct_t) rspamd_upstreams_destroy,
+ rule->read_servers);
+ if (!rspamd_upstreams_from_ucl(rule->read_servers, value, DEFAULT_PORT, NULL)) {
+ msg_err_config("cannot read read_servers definition");
+ return -1;
+ }
+ has_read = TRUE;
+ }
+
+ if ((value = ucl_object_lookup(obj, "write_servers")) != NULL) {
+ rule->write_servers = rspamd_upstreams_create(cfg->ups_ctx);
+ rspamd_upstreams_set_limits(rule->write_servers,
+ (double) fuzzy_module_ctx->revive_time, NAN, NAN, NAN,
+ (unsigned int) fuzzy_module_ctx->max_errors, 0);
+
+ rspamd_mempool_add_destructor(cfg->cfg_pool,
+ (rspamd_mempool_destruct_t) rspamd_upstreams_destroy,
+ rule->write_servers);
+ if (!rspamd_upstreams_from_ucl(rule->write_servers, value, DEFAULT_PORT, NULL)) {
+ msg_err_config("cannot read write_servers definition");
+ return -1;
+ }
+ has_write = TRUE;
+ }
+
+ /* If we have both read and write servers, we don't need the common servers list */
+ if (has_read && !has_write) {
+ /* Use read_servers for all operations */
+ rule->write_servers = rule->read_servers;
+ }
+ else if (has_write && !has_read) {
+ /* Use write_servers for all operations */
+ rule->read_servers = rule->write_servers;
+ }
}
+
if ((value = ucl_object_lookup(obj, "fuzzy_map")) != NULL) {
it = NULL;
while ((cur = ucl_object_iterate(value, &it, true)) != NULL) {
@@ -636,7 +683,7 @@ fuzzy_parse_rule(struct rspamd_config *cfg, const ucl_object_t *obj,
strlen(shingles_key_str), NULL, 0);
rule->shingles_key->len = 16;
- if (rspamd_upstreams_count(rule->servers) == 0) {
+ if (rspamd_upstreams_count(rule->read_servers) == 0) {
msg_err_config("no servers defined for fuzzy rule with name: %s",
rule->name);
return -1;
@@ -898,6 +945,24 @@ int fuzzy_check_module_init(struct rspamd_config *cfg, struct module_ctx **ctx)
0);
rspamd_rcl_add_doc_by_path(cfg,
"fuzzy_check.rule",
+ "List of servers to check (read-only operations)",
+ "read_servers",
+ UCL_STRING,
+ NULL,
+ 0,
+ NULL,
+ 0);
+ rspamd_rcl_add_doc_by_path(cfg,
+ "fuzzy_check.rule",
+ "List of servers to learn (write operations)",
+ "write_servers",
+ UCL_STRING,
+ NULL,
+ 0,
+ NULL,
+ 0);
+ rspamd_rcl_add_doc_by_path(cfg,
+ "fuzzy_check.rule",
"If true then never try to learn this fuzzy storage",
"read_only",
UCL_BOOLEAN,
@@ -1249,7 +1314,7 @@ int fuzzy_check_module_config(struct rspamd_config *cfg, bool validate)
LL_FOREACH(value, cur)
{
- if (ucl_object_lookup(cur, "servers")) {
+ if (ucl_object_lookup_any(cur, "servers", "read_servers", "write_servers", NULL) != NULL) {
/* Unnamed rule */
fuzzy_parse_rule(cfg, cur, NULL, cb_id);
nrules++;
@@ -3398,8 +3463,8 @@ register_fuzzy_client_call(struct rspamd_task *task,
int sock;
if (!rspamd_session_blocked(task->s)) {
- /* Get upstream */
- selected = rspamd_upstream_get(rule->servers, RSPAMD_UPSTREAM_ROUND_ROBIN,
+ /* Get upstream - use read_servers for check operations */
+ selected = rspamd_upstream_get(rule->read_servers, RSPAMD_UPSTREAM_ROUND_ROBIN,
NULL, 0);
if (selected) {
addr = rspamd_upstream_addr_next(selected);
@@ -3526,9 +3591,8 @@ register_fuzzy_controller_call(struct rspamd_http_connection_entry *entry,
int sock;
int ret = -1;
- /* Get upstream */
-
- while ((selected = rspamd_upstream_get_forced(rule->servers,
+ /* Get upstream - use write_servers for learn/unlearn operations */
+ while ((selected = rspamd_upstream_get_forced(rule->write_servers,
RSPAMD_UPSTREAM_SEQUENTIAL, NULL, 0))) {
/* Create UDP socket */
addr = rspamd_upstream_addr_next(selected);
@@ -3542,6 +3606,9 @@ register_fuzzy_controller_call(struct rspamd_http_connection_entry *entry,
rspamd_upstream_fail(selected, TRUE, strerror(errno));
}
else {
+ msg_info_task("fuzzy storage %s (%s rule) is used for write",
+ rspamd_inet_address_to_string_pretty(addr),
+ rule->name);
s =
rspamd_mempool_alloc0(session->pool,
sizeof(struct fuzzy_learn_session));
@@ -3624,6 +3691,7 @@ fuzzy_modify_handler(struct rspamd_http_connection_entry *conn_ent,
PTR_ARRAY_FOREACH(fuzzy_module_ctx->fuzzy_rules, i, rule)
{
if (rule->mode == fuzzy_rule_read_only) {
+ msg_debug_task("skip rule %s as it is read-only", rule->name);
continue;
}
@@ -3733,6 +3801,8 @@ fuzzy_modify_handler(struct rspamd_http_connection_entry *conn_ent,
else {
commands = fuzzy_generate_commands(task, rule, cmd, flag, value,
flags);
+ msg_debug_task("fuzzy command %d for rule %s, flag %d, value %d",
+ cmd, rule->name, flag, value);
if (commands != NULL) {
res = register_fuzzy_controller_call(conn_ent,
rule,
@@ -3898,7 +3968,7 @@ fuzzy_check_send_lua_learn(struct fuzzy_rule *rule,
/* Get upstream */
if (!rspamd_session_blocked(task->s)) {
- while ((selected = rspamd_upstream_get(rule->servers,
+ while ((selected = rspamd_upstream_get(rule->write_servers,
RSPAMD_UPSTREAM_SEQUENTIAL, NULL, 0))) {
/* Create UDP socket */
addr = rspamd_upstream_addr_next(selected);
@@ -4495,9 +4565,21 @@ fuzzy_lua_list_storages(lua_State *L)
lua_setfield(L, -2, "read_only");
/* Push servers */
- lua_createtable(L, rspamd_upstreams_count(rule->servers), 0);
- rspamd_upstreams_foreach(rule->servers, lua_upstream_str_inserter, L);
- lua_setfield(L, -2, "servers");
+ if (rule->read_servers == rule->write_servers) {
+ /* Same servers for both operations */
+ lua_createtable(L, rspamd_upstreams_count(rule->read_servers), 0);
+ rspamd_upstreams_foreach(rule->read_servers, lua_upstream_str_inserter, L);
+ lua_setfield(L, -2, "servers");
+ }
+ else {
+ /* Different servers for read and write */
+ lua_createtable(L, rspamd_upstreams_count(rule->read_servers), 0);
+ rspamd_upstreams_foreach(rule->read_servers, lua_upstream_str_inserter, L);
+ lua_setfield(L, -2, "read_servers");
+ lua_createtable(L, rspamd_upstreams_count(rule->write_servers), 0);
+ rspamd_upstreams_foreach(rule->write_servers, lua_upstream_str_inserter, L);
+ lua_setfield(L, -2, "write_servers");
+ }
/* Push flags */
GHashTableIter it;
@@ -4784,7 +4866,7 @@ fuzzy_lua_ping_storage(lua_State *L)
rspamd_ptr_array_free_hard, addrs);
}
else {
- struct upstream *selected = rspamd_upstream_get(rule_found->servers,
+ struct upstream *selected = rspamd_upstream_get(rule_found->read_servers,
RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
addr = rspamd_upstream_addr_next(selected);
}
@@ -4828,4 +4910,4 @@ fuzzy_lua_ping_storage(lua_State *L)
lua_pushboolean(L, TRUE);
return 1;
-} \ No newline at end of file
+}
diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c
index 694e87c12..b7decbbcc 100644
--- a/src/rspamd_proxy.c
+++ b/src/rspamd_proxy.c
@@ -1600,8 +1600,8 @@ proxy_backend_master_error_handler(struct rspamd_http_connection *conn, GError *
session->retries++;
msg_info_session("abnormally closing connection from backend: %s, error: %e,"
" retries left: %d",
- rspamd_inet_address_to_string_pretty(
- rspamd_upstream_addr_cur(session->master_conn->up)),
+ session->master_conn->up ? rspamd_inet_address_to_string_pretty(
+ rspamd_upstream_addr_cur(session->master_conn->up)) : "self-scan",
err,
session->ctx->max_retries - session->retries);
rspamd_upstream_fail(bk_conn->up, FALSE, err ? err->message : "unknown");
@@ -1632,8 +1632,8 @@ proxy_backend_master_error_handler(struct rspamd_http_connection *conn, GError *
else {
msg_info_session("retry connection to: %s"
" retries left: %d",
- rspamd_inet_address_to_string(
- rspamd_upstream_addr_cur(session->master_conn->up)),
+ session->master_conn->up ? rspamd_inet_address_to_string(
+ rspamd_upstream_addr_cur(session->master_conn->up)) : "self-scan",
session->ctx->max_retries - session->retries);
}
}
@@ -2216,8 +2216,8 @@ proxy_client_finish_handler(struct rspamd_http_connection *conn,
}
else {
msg_info_session("finished master connection to %s; HTTP code: %d",
- rspamd_inet_address_to_string_pretty(
- rspamd_upstream_addr_cur(session->master_conn->up)),
+ session->master_conn->up ? rspamd_inet_address_to_string_pretty(
+ rspamd_upstream_addr_cur(session->master_conn->up)) : "self-scan",
msg->code);
proxy_backend_close_connection(session->master_conn);
REF_RELEASE(session);