diff options
author | Vsevolod Stakhov <vsevolod@rspamd.com> | 2025-03-28 13:04:53 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rspamd.com> | 2025-03-28 13:04:53 +0000 |
commit | 4c98aab6f670c659dff4c7e0cf392576f7850732 (patch) | |
tree | aba8bba85d072d713eb587e957ca1afe6a060e2d | |
parent | 97540ff1ae17dddb84e0a843338bdc2d39911064 (diff) | |
download | rspamd-4c98aab6f670c659dff4c7e0cf392576f7850732.tar.gz rspamd-4c98aab6f670c659dff4c7e0cf392576f7850732.zip |
[Rework] Use locks/loaded per backend for all maps
-rw-r--r-- | src/controller.c | 10 | ||||
-rw-r--r-- | src/libserver/maps/map.c | 65 | ||||
-rw-r--r-- | src/libserver/maps/map_private.h | 20 |
3 files changed, 48 insertions, 47 deletions
diff --git a/src/controller.c b/src/controller.c index 895611589..22423e999 100644 --- a/src/controller.c +++ b/src/controller.c @@ -992,9 +992,9 @@ rspamd_controller_handle_maps(struct rspamd_http_connection_entry *conn_ent, "type", 0, false); ucl_object_insert_key(obj, ucl_object_frombool(editable), "editable", 0, false); - ucl_object_insert_key(obj, ucl_object_frombool(map->shared->loaded), + ucl_object_insert_key(obj, ucl_object_frombool(bk->shared->loaded), "loaded", 0, false); - ucl_object_insert_key(obj, ucl_object_frombool(map->shared->cached), + ucl_object_insert_key(obj, ucl_object_frombool(bk->shared->cached), "cached", 0, false); ucl_array_append(top, obj); } @@ -1012,9 +1012,9 @@ rspamd_controller_handle_maps(struct rspamd_http_connection_entry *conn_ent, "type", 0, false); ucl_object_insert_key(obj, ucl_object_frombool(false), "editable", 0, false); - ucl_object_insert_key(obj, ucl_object_frombool(map->shared->loaded), + ucl_object_insert_key(obj, ucl_object_frombool(bk->shared->loaded), "loaded", 0, false); - ucl_object_insert_key(obj, ucl_object_frombool(map->shared->cached), + ucl_object_insert_key(obj, ucl_object_frombool(bk->shared->cached), "cached", 0, false); ucl_array_append(top, obj); } @@ -1141,7 +1141,7 @@ rspamd_controller_handle_get_map(struct rspamd_http_connection_entry *conn_ent, rspamd_map_traverse(bk->map, rspamd_controller_map_traverse_callback, &map_body, FALSE); rspamd_http_message_set_body_from_fstring_steal(reply, map_body); } - else if (map->shared->loaded) { + else if (bk->shared->loaded) { reply = rspamd_http_new_message(HTTP_RESPONSE); reply->code = 200; rspamd_fstring_t *map_body = rspamd_fstring_new(); diff --git a/src/libserver/maps/map.c b/src/libserver/maps/map.c index 76d639a69..51390f24b 100644 --- a/src/libserver/maps/map.c +++ b/src/libserver/maps/map.c @@ -339,7 +339,7 @@ http_map_finish(struct rspamd_http_connection *conn, cbd->periodic->cur_backend = 0; /* Reset cache, old cached data will be cleaned on timeout */ g_atomic_int_set(&data->cache->available, 0); - g_atomic_int_set(&map->shared->loaded, 0); + g_atomic_int_set(&bk->shared->loaded, 0); data->cur_cache_cbd = NULL; rspamd_map_process_periodic(cbd->periodic); @@ -425,8 +425,8 @@ http_map_finish(struct rspamd_http_connection *conn, * We know that a map is in the locked state */ g_atomic_int_set(&data->cache->available, 1); - g_atomic_int_set(&map->shared->loaded, 1); - g_atomic_int_set(&map->shared->cached, 0); + g_atomic_int_set(&bk->shared->loaded, 1); + g_atomic_int_set(&bk->shared->cached, 0); /* Store cached data */ rspamd_strlcpy(data->cache->shmem_name, cbd->shmem_data->shm_name, sizeof(data->cache->shmem_name)); @@ -922,7 +922,7 @@ read_map_file(struct rspamd_map *map, struct file_map_data *data, map->read_callback(NULL, 0, &periodic->cbdata, TRUE); } - g_atomic_int_set(&map->shared->loaded, 1); + g_atomic_int_set(&bk->shared->loaded, 1); return TRUE; } @@ -1008,7 +1008,7 @@ read_map_static(struct rspamd_map *map, struct static_map_data *data, } data->processed = TRUE; - g_atomic_int_set(&map->shared->loaded, 1); + g_atomic_int_set(&bk->shared->loaded, 1); return TRUE; } @@ -1017,6 +1017,7 @@ static void rspamd_map_periodic_dtor(struct map_periodic_cbdata *periodic) { struct rspamd_map *map; + struct rspamd_map_backend *bk; map = periodic->map; msg_debug_map("periodic dtor %p; need_modify=%d", periodic, periodic->need_modify); @@ -1034,8 +1035,11 @@ rspamd_map_periodic_dtor(struct map_periodic_cbdata *periodic) } if (periodic->locked) { - g_atomic_int_set(&periodic->map->shared->locked, 0); - msg_debug_map("unlocked map %s", periodic->map->name); + if (periodic->cur_backend < map->backends->len) { + bk = (struct rspamd_map_backend *) g_ptr_array_index(map->backends, periodic->cur_backend); + g_atomic_int_set(&bk->shared->locked, 0); + msg_debug_map("unlocked map %s", map->name); + } if (periodic->map->wrk->state == rspamd_worker_state_running) { rspamd_map_schedule_periodic(periodic->map, @@ -1444,8 +1448,8 @@ rspamd_map_read_cached(struct rspamd_map *map, struct rspamd_map_backend *bk, map->read_callback(in, len, &periodic->cbdata, TRUE); } - g_atomic_int_set(&map->shared->loaded, 1); - g_atomic_int_set(&map->shared->cached, 1); + g_atomic_int_set(&bk->shared->loaded, 1); + g_atomic_int_set(&bk->shared->cached, 1); munmap(in, mmap_len); @@ -1736,8 +1740,8 @@ rspamd_map_read_http_cached_file(struct rspamd_map *map, struct tm tm; char ncheck_buf[32], lm_buf[32]; - g_atomic_int_set(&map->shared->loaded, 1); - g_atomic_int_set(&map->shared->cached, 1); + g_atomic_int_set(&bk->shared->loaded, 1); + g_atomic_int_set(&bk->shared->cached, 1); rspamd_localtime(map->next_check, &tm); strftime(ncheck_buf, sizeof(ncheck_buf) - 1, "%Y-%m-%d %H:%M:%S", &tm); rspamd_localtime(htdata->last_modified, &tm); @@ -2038,8 +2042,20 @@ rspamd_map_process_periodic(struct map_periodic_cbdata *cbd) map = cbd->map; map->scheduled_check = NULL; + /* For each backend we need to check for modifications */ + if (cbd->cur_backend >= cbd->map->backends->len) { + /* Last backend */ + msg_debug_map("finished map: %d of %d", cbd->cur_backend, + cbd->map->backends->len); + MAP_RELEASE(cbd, "periodic"); + + return; + } + + bk = g_ptr_array_index(map->backends, cbd->cur_backend); + if (!map->file_only && !cbd->locked) { - if (!g_atomic_int_compare_and_exchange(&cbd->map->shared->locked, + if (!g_atomic_int_compare_and_exchange(&bk->shared->locked, 0, 1)) { msg_debug_map( "don't try to reread map %s as it is locked by other process, " @@ -2051,7 +2067,7 @@ rspamd_map_process_periodic(struct map_periodic_cbdata *cbd) return; } else { - msg_debug_map("locked map %s", cbd->map->name); + msg_debug_map("locked map %s", map->name); cbd->locked = TRUE; } } @@ -2061,7 +2077,7 @@ rspamd_map_process_periodic(struct map_periodic_cbdata *cbd) rspamd_map_schedule_periodic(cbd->map, RSPAMD_MAP_SCHEDULE_ERROR); if (cbd->locked) { - g_atomic_int_set(&cbd->map->shared->locked, 0); + g_atomic_int_set(&bk->shared->locked, 0); cbd->locked = FALSE; } @@ -2075,19 +2091,7 @@ rspamd_map_process_periodic(struct map_periodic_cbdata *cbd) return; } - /* For each backend we need to check for modifications */ - if (cbd->cur_backend >= cbd->map->backends->len) { - /* Last backend */ - msg_debug_map("finished map: %d of %d", cbd->cur_backend, - cbd->map->backends->len); - MAP_RELEASE(cbd, "periodic"); - - return; - } - if (cbd->map->wrk && cbd->map->wrk->state == rspamd_worker_state_running) { - bk = g_ptr_array_index(cbd->map->backends, cbd->cur_backend); - g_assert(bk != NULL); if (cbd->need_modify) { /* Load data from the next backend */ @@ -2792,6 +2796,9 @@ rspamd_map_parse_backend(struct rspamd_config *cfg, const char *map_line) bk->data.sd = sdata; } + bk->shared = rspamd_mempool_alloc0_shared(cfg->cfg_pool, + sizeof(struct rspamd_map_shared_backend_data)); + return bk; err: @@ -2922,8 +2929,6 @@ rspamd_map_add(struct rspamd_config *cfg, map->user_data = user_data; map->cfg = cfg; map->id = rspamd_random_uint64_fast(); - map->shared = - rspamd_mempool_alloc0_shared(cfg->cfg_pool, sizeof(struct rspamd_map_shared_data)); map->backends = g_ptr_array_sized_new(1); map->wrk = worker; rspamd_mempool_add_destructor(cfg->cfg_pool, rspamd_ptr_array_free_hard, @@ -3022,8 +3027,6 @@ rspamd_map_add_from_ucl(struct rspamd_config *cfg, map->user_data = user_data; map->cfg = cfg; map->id = rspamd_random_uint64_fast(); - map->shared = - rspamd_mempool_alloc0_shared(cfg->cfg_pool, sizeof(struct rspamd_map_shared_data)); map->backends = g_ptr_array_new(); map->wrk = worker; map->no_file_read = (flags & RSPAMD_MAP_FILE_NO_READ); @@ -3205,7 +3208,7 @@ rspamd_map_add_from_ucl(struct rspamd_config *cfg, if (all_loaded) { /* Static map */ - g_atomic_int_set(&map->shared->loaded, 1); + g_atomic_int_set(&bk->shared->loaded, 1); } rspamd_map_calculate_hash(map); diff --git a/src/libserver/maps/map_private.h b/src/libserver/maps/map_private.h index 0a912a5da..66949f926 100644 --- a/src/libserver/maps/map_private.h +++ b/src/libserver/maps/map_private.h @@ -134,11 +134,20 @@ union rspamd_map_backend_data { struct rspamd_map; +/* + * Shared between workers + */ +struct rspamd_map_shared_backend_data { + int locked; + int loaded; + int cached; +}; struct rspamd_map_backend { enum fetch_proto protocol; gboolean is_signed; gboolean is_compressed; gboolean is_fallback; + struct rspamd_map_shared_backend_data *shared; struct rspamd_map *map; struct ev_loop *event_loop; uint64_t id; @@ -150,15 +159,6 @@ struct rspamd_map_backend { struct map_periodic_cbdata; -/* - * Shared between workers - */ -struct rspamd_map_shared_data { - int locked; - int loaded; - int cached; -}; - struct rspamd_map { struct rspamd_dns_resolver *r; struct rspamd_config *cfg; @@ -193,8 +193,6 @@ struct rspamd_map { bool static_only; /* No need to check */ bool no_file_read; /* Do not read files */ bool seen; /* This map has already been watched or pre-loaded */ - /* Shared lock for temporary disabling of map reading (e.g. when this map is written by UI) */ - struct rspamd_map_shared_data *shared; char tag[MEMPOOL_UID_LEN]; }; |