author    Vsevolod Stakhov <vsevolod@rspamd.com>  2025-03-28 13:04:53 +0000
committer Vsevolod Stakhov <vsevolod@rspamd.com>  2025-03-28 13:04:53 +0000
commit    4c98aab6f670c659dff4c7e0cf392576f7850732 (patch)
tree      aba8bba85d072d713eb587e957ca1afe6a060e2d
parent    97540ff1ae17dddb84e0a843338bdc2d39911064 (diff)
download  rspamd-4c98aab6f670c659dff4c7e0cf392576f7850732.tar.gz
          rspamd-4c98aab6f670c659dff4c7e0cf392576f7850732.zip
[Rework] Use locks/loaded per backend for all maps
-rw-r--r--  src/controller.c                  10
-rw-r--r--  src/libserver/maps/map.c          65
-rw-r--r--  src/libserver/maps/map_private.h  20
3 files changed, 48 insertions, 47 deletions
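
The commit moves the shared locked/loaded/cached flags from the per-map rspamd_map_shared_data into a new per-backend rspamd_map_shared_backend_data (see the map_private.h hunks below), so each backend of a map tracks its own state across workers. The following is a minimal standalone sketch of that layout using plain GLib atomics; the struct and field names follow the diff, but the backend wrapper and main() are simplified stand-ins for illustration, not rspamd's real types:

    #include <glib.h>
    #include <stdio.h>

    /* Mirrors struct rspamd_map_shared_backend_data from map_private.h */
    struct shared_backend_data {
        gint locked; /* set while one worker refreshes this backend */
        gint loaded; /* backend data has been read at least once */
        gint cached; /* backend data came from a cache rather than the source */
    };

    /* Simplified stand-in for struct rspamd_map_backend */
    struct backend {
        const char *uri;
        struct shared_backend_data *shared; /* lives in shared memory in rspamd */
    };

    int main(void)
    {
        static struct shared_backend_data shm; /* rspamd allocates this with rspamd_mempool_alloc0_shared */
        struct backend bk = {.uri = "file:///etc/rspamd/local.d/example.map", .shared = &shm};

        /* After a successful read the flags are set per backend, not per map */
        g_atomic_int_set(&bk.shared->loaded, 1);
        g_atomic_int_set(&bk.shared->cached, 0);

        printf("%s loaded=%d cached=%d\n", bk.uri,
               g_atomic_int_get(&bk.shared->loaded),
               g_atomic_int_get(&bk.shared->cached));
        return 0;
    }
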
diff --git a/src/controller.c b/src/controller.c
index 895611589..22423e999 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -992,9 +992,9 @@ rspamd_controller_handle_maps(struct rspamd_http_connection_entry *conn_ent,
"type", 0, false);
ucl_object_insert_key(obj, ucl_object_frombool(editable),
"editable", 0, false);
- ucl_object_insert_key(obj, ucl_object_frombool(map->shared->loaded),
+ ucl_object_insert_key(obj, ucl_object_frombool(bk->shared->loaded),
"loaded", 0, false);
- ucl_object_insert_key(obj, ucl_object_frombool(map->shared->cached),
+ ucl_object_insert_key(obj, ucl_object_frombool(bk->shared->cached),
"cached", 0, false);
ucl_array_append(top, obj);
}
@@ -1012,9 +1012,9 @@ rspamd_controller_handle_maps(struct rspamd_http_connection_entry *conn_ent,
"type", 0, false);
ucl_object_insert_key(obj, ucl_object_frombool(false),
"editable", 0, false);
- ucl_object_insert_key(obj, ucl_object_frombool(map->shared->loaded),
+ ucl_object_insert_key(obj, ucl_object_frombool(bk->shared->loaded),
"loaded", 0, false);
- ucl_object_insert_key(obj, ucl_object_frombool(map->shared->cached),
+ ucl_object_insert_key(obj, ucl_object_frombool(bk->shared->cached),
"cached", 0, false);
ucl_array_append(top, obj);
}
@@ -1141,7 +1141,7 @@ rspamd_controller_handle_get_map(struct rspamd_http_connection_entry *conn_ent,
rspamd_map_traverse(bk->map, rspamd_controller_map_traverse_callback, &map_body, FALSE);
rspamd_http_message_set_body_from_fstring_steal(reply, map_body);
}
- else if (map->shared->loaded) {
+ else if (bk->shared->loaded) {
reply = rspamd_http_new_message(HTTP_RESPONSE);
reply->code = 200;
rspamd_fstring_t *map_body = rspamd_fstring_new();
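
In the controller, the maps handler now reports loaded and cached from the backend's shared data (bk->shared) instead of from the map as a whole. A rough standalone sketch of the per-backend entry it assembles, using libucl directly; the key names and boolean values here are illustrative stand-ins, since the real handler iterates rspamd's configured map list:

    #include <ucl.h>
    #include <stdbool.h>
    #include <stdio.h>
    #include <stdlib.h>

    int main(void)
    {
        ucl_object_t *top = ucl_object_typed_new(UCL_ARRAY);
        ucl_object_t *obj = ucl_object_typed_new(UCL_OBJECT);

        /* One entry per backend; the flags come from bk->shared, not map->shared */
        ucl_object_insert_key(obj, ucl_object_fromstring("file:///etc/rspamd/example.map"),
                              "uri", 0, false);
        ucl_object_insert_key(obj, ucl_object_frombool(true), "editable", 0, false);
        ucl_object_insert_key(obj, ucl_object_frombool(true), "loaded", 0, false);  /* bk->shared->loaded */
        ucl_object_insert_key(obj, ucl_object_frombool(false), "cached", 0, false); /* bk->shared->cached */
        ucl_array_append(top, obj);

        unsigned char *json = ucl_object_emit(top, UCL_EMIT_JSON);
        printf("%s\n", (const char *) json);
        free(json);
        ucl_object_unref(top);
        return 0;
    }
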
diff --git a/src/libserver/maps/map.c b/src/libserver/maps/map.c
index 76d639a69..51390f24b 100644
--- a/src/libserver/maps/map.c
+++ b/src/libserver/maps/map.c
@@ -339,7 +339,7 @@ http_map_finish(struct rspamd_http_connection *conn,
cbd->periodic->cur_backend = 0;
/* Reset cache, old cached data will be cleaned on timeout */
g_atomic_int_set(&data->cache->available, 0);
- g_atomic_int_set(&map->shared->loaded, 0);
+ g_atomic_int_set(&bk->shared->loaded, 0);
data->cur_cache_cbd = NULL;
rspamd_map_process_periodic(cbd->periodic);
@@ -425,8 +425,8 @@ http_map_finish(struct rspamd_http_connection *conn,
* We know that a map is in the locked state
*/
g_atomic_int_set(&data->cache->available, 1);
- g_atomic_int_set(&map->shared->loaded, 1);
- g_atomic_int_set(&map->shared->cached, 0);
+ g_atomic_int_set(&bk->shared->loaded, 1);
+ g_atomic_int_set(&bk->shared->cached, 0);
/* Store cached data */
rspamd_strlcpy(data->cache->shmem_name, cbd->shmem_data->shm_name,
sizeof(data->cache->shmem_name));
@@ -922,7 +922,7 @@ read_map_file(struct rspamd_map *map, struct file_map_data *data,
map->read_callback(NULL, 0, &periodic->cbdata, TRUE);
}
- g_atomic_int_set(&map->shared->loaded, 1);
+ g_atomic_int_set(&bk->shared->loaded, 1);
return TRUE;
}
@@ -1008,7 +1008,7 @@ read_map_static(struct rspamd_map *map, struct static_map_data *data,
}
data->processed = TRUE;
- g_atomic_int_set(&map->shared->loaded, 1);
+ g_atomic_int_set(&bk->shared->loaded, 1);
return TRUE;
}
@@ -1017,6 +1017,7 @@ static void
rspamd_map_periodic_dtor(struct map_periodic_cbdata *periodic)
{
struct rspamd_map *map;
+ struct rspamd_map_backend *bk;
map = periodic->map;
msg_debug_map("periodic dtor %p; need_modify=%d", periodic, periodic->need_modify);
@@ -1034,8 +1035,11 @@ rspamd_map_periodic_dtor(struct map_periodic_cbdata *periodic)
}
if (periodic->locked) {
- g_atomic_int_set(&periodic->map->shared->locked, 0);
- msg_debug_map("unlocked map %s", periodic->map->name);
+ if (periodic->cur_backend < map->backends->len) {
+ bk = (struct rspamd_map_backend *) g_ptr_array_index(map->backends, periodic->cur_backend);
+ g_atomic_int_set(&bk->shared->locked, 0);
+ msg_debug_map("unlocked map %s", map->name);
+ }
if (periodic->map->wrk->state == rspamd_worker_state_running) {
rspamd_map_schedule_periodic(periodic->map,
@@ -1444,8 +1448,8 @@ rspamd_map_read_cached(struct rspamd_map *map, struct rspamd_map_backend *bk,
map->read_callback(in, len, &periodic->cbdata, TRUE);
}
- g_atomic_int_set(&map->shared->loaded, 1);
- g_atomic_int_set(&map->shared->cached, 1);
+ g_atomic_int_set(&bk->shared->loaded, 1);
+ g_atomic_int_set(&bk->shared->cached, 1);
munmap(in, mmap_len);
@@ -1736,8 +1740,8 @@ rspamd_map_read_http_cached_file(struct rspamd_map *map,
struct tm tm;
char ncheck_buf[32], lm_buf[32];
- g_atomic_int_set(&map->shared->loaded, 1);
- g_atomic_int_set(&map->shared->cached, 1);
+ g_atomic_int_set(&bk->shared->loaded, 1);
+ g_atomic_int_set(&bk->shared->cached, 1);
rspamd_localtime(map->next_check, &tm);
strftime(ncheck_buf, sizeof(ncheck_buf) - 1, "%Y-%m-%d %H:%M:%S", &tm);
rspamd_localtime(htdata->last_modified, &tm);
@@ -2038,8 +2042,20 @@ rspamd_map_process_periodic(struct map_periodic_cbdata *cbd)
map = cbd->map;
map->scheduled_check = NULL;
+ /* For each backend we need to check for modifications */
+ if (cbd->cur_backend >= cbd->map->backends->len) {
+ /* Last backend */
+ msg_debug_map("finished map: %d of %d", cbd->cur_backend,
+ cbd->map->backends->len);
+ MAP_RELEASE(cbd, "periodic");
+
+ return;
+ }
+
+ bk = g_ptr_array_index(map->backends, cbd->cur_backend);
+
if (!map->file_only && !cbd->locked) {
- if (!g_atomic_int_compare_and_exchange(&cbd->map->shared->locked,
+ if (!g_atomic_int_compare_and_exchange(&bk->shared->locked,
0, 1)) {
msg_debug_map(
"don't try to reread map %s as it is locked by other process, "
@@ -2051,7 +2067,7 @@ rspamd_map_process_periodic(struct map_periodic_cbdata *cbd)
return;
}
else {
- msg_debug_map("locked map %s", cbd->map->name);
+ msg_debug_map("locked map %s", map->name);
cbd->locked = TRUE;
}
}
@@ -2061,7 +2077,7 @@ rspamd_map_process_periodic(struct map_periodic_cbdata *cbd)
rspamd_map_schedule_periodic(cbd->map, RSPAMD_MAP_SCHEDULE_ERROR);
if (cbd->locked) {
- g_atomic_int_set(&cbd->map->shared->locked, 0);
+ g_atomic_int_set(&bk->shared->locked, 0);
cbd->locked = FALSE;
}
@@ -2075,19 +2091,7 @@ rspamd_map_process_periodic(struct map_periodic_cbdata *cbd)
return;
}
- /* For each backend we need to check for modifications */
- if (cbd->cur_backend >= cbd->map->backends->len) {
- /* Last backend */
- msg_debug_map("finished map: %d of %d", cbd->cur_backend,
- cbd->map->backends->len);
- MAP_RELEASE(cbd, "periodic");
-
- return;
- }
-
if (cbd->map->wrk && cbd->map->wrk->state == rspamd_worker_state_running) {
- bk = g_ptr_array_index(cbd->map->backends, cbd->cur_backend);
- g_assert(bk != NULL);
if (cbd->need_modify) {
/* Load data from the next backend */
@@ -2792,6 +2796,9 @@ rspamd_map_parse_backend(struct rspamd_config *cfg, const char *map_line)
bk->data.sd = sdata;
}
+ bk->shared = rspamd_mempool_alloc0_shared(cfg->cfg_pool,
+ sizeof(struct rspamd_map_shared_backend_data));
+
return bk;
err:
@@ -2922,8 +2929,6 @@ rspamd_map_add(struct rspamd_config *cfg,
map->user_data = user_data;
map->cfg = cfg;
map->id = rspamd_random_uint64_fast();
- map->shared =
- rspamd_mempool_alloc0_shared(cfg->cfg_pool, sizeof(struct rspamd_map_shared_data));
map->backends = g_ptr_array_sized_new(1);
map->wrk = worker;
rspamd_mempool_add_destructor(cfg->cfg_pool, rspamd_ptr_array_free_hard,
@@ -3022,8 +3027,6 @@ rspamd_map_add_from_ucl(struct rspamd_config *cfg,
map->user_data = user_data;
map->cfg = cfg;
map->id = rspamd_random_uint64_fast();
- map->shared =
- rspamd_mempool_alloc0_shared(cfg->cfg_pool, sizeof(struct rspamd_map_shared_data));
map->backends = g_ptr_array_new();
map->wrk = worker;
map->no_file_read = (flags & RSPAMD_MAP_FILE_NO_READ);
@@ -3205,7 +3208,7 @@ rspamd_map_add_from_ucl(struct rspamd_config *cfg,
if (all_loaded) {
/* Static map */
- g_atomic_int_set(&map->shared->loaded, 1);
+ g_atomic_int_set(&bk->shared->loaded, 1);
}
rspamd_map_calculate_hash(map);
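
The largest behavioural change in map.c is in rspamd_map_process_periodic: the "past the last backend" check now runs before any lock is taken, and the lock itself is the current backend's bk->shared->locked, acquired with a compare-and-exchange and released on error or later in the periodic destructor. Below is a condensed standalone sketch of that cycle with GLib atomics; process_backend() is a hypothetical placeholder for the real fetch/parse logic, and the unlock is done inline here rather than deferred to a destructor as rspamd does:

    #include <glib.h>
    #include <stdbool.h>
    #include <stdio.h>

    struct shared_backend_data {
        gint locked;
        gint loaded;
        gint cached;
    };

    /* Hypothetical placeholder for the real fetch/parse step */
    static bool process_backend(unsigned int idx, struct shared_backend_data *shared)
    {
        printf("backend %u processed\n", idx);
        g_atomic_int_set(&shared->loaded, 1);
        return true;
    }

    static void periodic_check(GPtrArray *backends, unsigned int cur_backend)
    {
        if (cur_backend >= backends->len) {
            /* Last backend already handled: nothing left to do */
            return;
        }

        struct shared_backend_data *shared = g_ptr_array_index(backends, cur_backend);

        /* Try to take the per-backend lock; skip if another worker holds it */
        if (!g_atomic_int_compare_and_exchange(&shared->locked, 0, 1)) {
            printf("backend %u is locked by another process, skip\n", cur_backend);
            return;
        }

        if (!process_backend(cur_backend, shared)) {
            /* On error the lock is dropped immediately */
            g_atomic_int_set(&shared->locked, 0);
            return;
        }

        /* rspamd defers this to rspamd_map_periodic_dtor; unlocked inline for simplicity */
        g_atomic_int_set(&shared->locked, 0);
    }

    int main(void)
    {
        static struct shared_backend_data one;
        GPtrArray *backends = g_ptr_array_new();
        g_ptr_array_add(backends, &one);
        periodic_check(backends, 0);
        g_ptr_array_free(backends, TRUE);
        return 0;
    }
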
diff --git a/src/libserver/maps/map_private.h b/src/libserver/maps/map_private.h
index 0a912a5da..66949f926 100644
--- a/src/libserver/maps/map_private.h
+++ b/src/libserver/maps/map_private.h
@@ -134,11 +134,20 @@ union rspamd_map_backend_data {
struct rspamd_map;
+/*
+ * Shared between workers
+ */
+struct rspamd_map_shared_backend_data {
+ int locked;
+ int loaded;
+ int cached;
+};
struct rspamd_map_backend {
enum fetch_proto protocol;
gboolean is_signed;
gboolean is_compressed;
gboolean is_fallback;
+ struct rspamd_map_shared_backend_data *shared;
struct rspamd_map *map;
struct ev_loop *event_loop;
uint64_t id;
@@ -150,15 +159,6 @@ struct rspamd_map_backend {
struct map_periodic_cbdata;
-/*
- * Shared between workers
- */
-struct rspamd_map_shared_data {
- int locked;
- int loaded;
- int cached;
-};
-
struct rspamd_map {
struct rspamd_dns_resolver *r;
struct rspamd_config *cfg;
@@ -193,8 +193,6 @@ struct rspamd_map {
bool static_only; /* No need to check */
bool no_file_read; /* Do not read files */
bool seen; /* This map has already been watched or pre-loaded */
- /* Shared lock for temporary disabling of map reading (e.g. when this map is written by UI) */
- struct rspamd_map_shared_data *shared;
char tag[MEMPOOL_UID_LEN];
};
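
In map_private.h the old per-map rspamd_map_shared_data is removed and rspamd_map_backend gains a shared pointer, which rspamd_map_parse_backend now fills with rspamd_mempool_alloc0_shared so every worker process observes the same flags. The sketch below illustrates why this allocation must be shared rather than ordinary heap memory, using an anonymous MAP_SHARED mapping as a stand-in for rspamd's shared mempool; the struct mirrors the diff, and the fork() demo is only illustrative:

    #include <sys/mman.h>
    #include <sys/wait.h>
    #include <unistd.h>
    #include <stdio.h>

    struct shared_backend_data {
        int locked;
        int loaded;
        int cached;
    };

    int main(void)
    {
        /* Zero-initialised memory visible to all forked workers,
         * roughly what rspamd_mempool_alloc0_shared provides */
        struct shared_backend_data *shared =
            mmap(NULL, sizeof(*shared), PROT_READ | PROT_WRITE,
                 MAP_SHARED | MAP_ANONYMOUS, -1, 0);

        if (shared == MAP_FAILED) {
            return 1;
        }

        pid_t pid = fork();
        if (pid == 0) {
            /* "Worker" loads the backend and flips the shared flag */
            __atomic_store_n(&shared->loaded, 1, __ATOMIC_SEQ_CST);
            _exit(0);
        }

        waitpid(pid, NULL, 0);
        /* Parent observes the flag set by the child */
        printf("loaded=%d\n", __atomic_load_n(&shared->loaded, __ATOMIC_SEQ_CST));
        munmap(shared, sizeof(*shared));
        return 0;
    }
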