From 423edefbda01bb9a44afd69e78d591080d571f58 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Sun, 16 Jun 2019 21:09:59 +0100 Subject: [PATCH] [Project] Start maps rework --- src/libutil/map.c | 125 ++++++++++++++++++-------------------- src/libutil/map_private.h | 16 ++--- 2 files changed, 66 insertions(+), 75 deletions(-) diff --git a/src/libutil/map.c b/src/libutil/map.c index 82c668952..17da0062a 100644 --- a/src/libutil/map.c +++ b/src/libutil/map.c @@ -44,7 +44,7 @@ static void free_http_cbdata_common (struct http_callback_data *cbd, gboolean plan_new); static void free_http_cbdata_dtor (gpointer p); static void free_http_cbdata (struct http_callback_data *cbd); -static void rspamd_map_periodic_callback (gint fd, short what, void *ud); +static void rspamd_map_process_periodic (struct map_periodic_cbdata *cbd); static void rspamd_map_schedule_periodic (struct rspamd_map *map, gboolean locked, gboolean initial, gboolean errored); static gboolean read_map_file_chunks (struct rspamd_map *map, @@ -130,7 +130,7 @@ write_http_request (struct http_callback_data *cbd) cbd->data->host, NULL, cbd, - &cbd->tv); + cbd->timeout); } static gboolean @@ -325,21 +325,23 @@ http_map_error (struct rspamd_http_connection *conn, cbd->bk->uri, cbd->addr ? rspamd_inet_address_to_string_pretty (cbd->addr) : "", err); - rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic); + rspamd_map_process_periodic (cbd->periodic); MAP_RELEASE (cbd, "http_callback_data"); } static void -rspamd_map_cache_cb (gint fd, short what, gpointer ud) +rspamd_map_cache_cb (struct ev_loop *loop, ev_periodic *w, int revents) { - struct rspamd_http_map_cached_cbdata *cache_cbd = ud; + struct rspamd_http_map_cached_cbdata *cache_cbd = (struct rspamd_http_map_cached_cbdata *) + w->data; struct rspamd_map *map; struct http_map_data *data; - struct timeval tv; map = cache_cbd->map; data = cache_cbd->data; + ev_periodic_stop (loop, &cache_cbd->timeout); + if (cache_cbd->gen != cache_cbd->data->gen) { /* We have another update, so this cache element is obviously expired */ /* @@ -349,7 +351,6 @@ rspamd_map_cache_cb (gint fd, short what, gpointer ud) msg_info_map ("cached data is now expired (gen mismatch %L != %L) for %s", cache_cbd->gen, cache_cbd->data->gen, map->name); MAP_RELEASE (cache_cbd->shm, "rspamd_http_map_cached_cbdata"); - event_del (&cache_cbd->timeout); g_free (cache_cbd); } else if (cache_cbd->data->last_checked >= cache_cbd->last_checked) { @@ -359,15 +360,13 @@ rspamd_map_cache_cb (gint fd, short what, gpointer ud) */ cache_cbd->last_checked = cache_cbd->data->last_checked; msg_debug_map ("cached data is up to date for %s", map->name); - double_to_tv (map->poll_timeout * 2, &tv); - event_add (&cache_cbd->timeout, &tv); + ev_periodic_again (loop, &cache_cbd->timeout); } else { data->cur_cache_cbd = NULL; g_atomic_int_set (&data->cache->available, 0); MAP_RELEASE (cache_cbd->shm, "rspamd_http_map_cached_cbdata"); msg_info_map ("cached data is now expired for %s", map->name); - event_del (&cache_cbd->timeout); g_free (cache_cbd); } } @@ -456,7 +455,7 @@ http_map_finish (struct rspamd_http_connection *conn, g_atomic_int_set (&data->cache->available, 0); data->cur_cache_cbd = NULL; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic); + rspamd_map_process_periodic (cbd->periodic); MAP_RELEASE (cbd, "http_callback_data"); return 0; @@ -622,6 +621,8 @@ read_data: } /* Check for expires */ + double cached_timeout = map->poll_timeout * 2; + expires_hdr = rspamd_http_message_find_header (msg, "Expires"); if (expires_hdr) { @@ -635,19 +636,12 @@ read_data: hdate = MIN (map->next_check, hdate); } - double cached_timeout = map->next_check - msg->date + - map->poll_timeout * 2; + cached_timeout = map->next_check - msg->date + + map->poll_timeout * 2; map->next_check = hdate; - double_to_tv (cached_timeout, &tv); - } - else { - double_to_tv (map->poll_timeout * 2, &tv); } } - else { - double_to_tv (map->poll_timeout * 2, &tv); - } /* Check for etag */ etag_hdr = rspamd_http_message_find_header (msg, "ETag"); @@ -688,10 +682,9 @@ read_data: cache_cbd->gen = cbd->data->gen; MAP_RETAIN (cache_cbd->shm, "shmem_data"); - event_set (&cache_cbd->timeout, -1, EV_TIMEOUT, rspamd_map_cache_cb, - cache_cbd); - event_base_set (cbd->ev_base, &cache_cbd->timeout); - event_add (&cache_cbd->timeout, &tv); + ev_periodic_set (&cache_cbd->timeout, 0.0, cached_timeout, NULL); + ev_periodic_start (cbd->event_loop, &cache_cbd->timeout); + cache_cbd->timeout.data = cache_cbd; data->cur_cache_cbd = cache_cbd; if (map->next_check) { @@ -700,7 +693,7 @@ read_data: } else { rspamd_http_date_format (next_check_date, sizeof (next_check_date), - time (NULL) + map->poll_timeout); + ev_now (cbd->event_loop) + map->poll_timeout); } @@ -773,7 +766,7 @@ read_data: cbd->periodic->cur_backend ++; munmap (in, dlen); - rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic); + rspamd_map_process_periodic (cbd->periodic); } else if (msg->code == 304 && (cbd->check && cbd->stage == map_load_file)) { cbd->data->last_checked = msg->date; @@ -819,13 +812,13 @@ read_data: } else { rspamd_http_date_format (next_check_date, sizeof (next_check_date), - time (NULL) + map->poll_timeout); + ev_now (cbd->event_loop) + map->poll_timeout); } msg_info_map ("data is not modified for server %s, next check at %s", cbd->data->host, next_check_date); cbd->periodic->cur_backend ++; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic); + rspamd_map_process_periodic (cbd->periodic); } else { msg_info_map ("cannot load map %s from %s: HTTP error %d", @@ -838,7 +831,7 @@ read_data: err: cbd->periodic->errored = 1; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic); + rspamd_map_process_periodic (cbd->periodic); MAP_RELEASE (cbd, "http_callback_data"); return 0; @@ -951,6 +944,7 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data, } } + ev_stat_stat (map->event_loop, &data->st_ev); len = st.st_size; if (bk->is_signed) { @@ -1045,9 +1039,6 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data, map->read_callback (NULL, 0, &periodic->cbdata, TRUE); } - /* Also update at the read time */ - memcpy (&data->st, &st, sizeof (struct stat)); - return TRUE; } @@ -1143,7 +1134,6 @@ rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic) map = periodic->map; msg_debug_map ("periodic dtor %p", periodic); - event_del (&periodic->ev); if (periodic->need_modify) { /* We are done */ @@ -1162,6 +1152,16 @@ rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic) g_free (periodic); } +/* Called on timer execution */ +static void +rspamd_map_periodic_callback (struct ev_loop *loop, ev_timer *w, int revents) +{ + struct map_periodic_cbdata *cbd = (struct map_periodic_cbdata *)w->data; + + ev_timer_stop (loop, w); + rspamd_map_process_periodic (cbd); +} + static void rspamd_map_schedule_periodic (struct rspamd_map *map, gboolean locked, gboolean initial, gboolean errored) @@ -1224,14 +1224,11 @@ rspamd_map_schedule_periodic (struct rspamd_map *map, map->scheduled_check = TRUE; REF_INIT_RETAIN (cbd, rspamd_map_periodic_dtor); - evtimer_set (&cbd->ev, rspamd_map_periodic_callback, cbd); - event_base_set (map->ev_base, &cbd->ev); - + ev_timer_init (&cbd->ev, rspamd_map_periodic_callback, jittered_sec, 0.0); + ev_timer_start (map->event_loop, &cbd->ev); msg_debug_map ("schedule new periodic event %p in %.2f seconds", cbd, jittered_sec); - double_to_tv (jittered_sec, &map->tv); - evtimer_add (&cbd->ev, &map->tv); } static void @@ -1286,7 +1283,7 @@ rspamd_map_dns_callback (struct rdns_reply *reply, void *arg) msg_err_map ("cannot resolve %s: %s", cbd->data->host, rdns_strerror (reply->code)); cbd->periodic->errored = 1; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic); + rspamd_map_process_periodic (cbd->periodic); } } @@ -1567,7 +1564,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map, periodic->need_modify = TRUE; /* Reset the whole chain */ periodic->cur_backend = 0; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic); + rspamd_map_process_periodic (periodic); } else { if (map->active_http) { @@ -1577,7 +1574,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map, else { /* Switch to the next backend */ periodic->cur_backend++; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic); + rspamd_map_process_periodic (periodic); } } @@ -1592,7 +1589,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map, /* Switch to the next backend */ periodic->cur_backend++; data->last_modified = data->cache->last_modified; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic); + rspamd_map_process_periodic (periodic); return; } @@ -1601,7 +1598,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map, else if (!map->active_http) { /* Switch to the next backend */ periodic->cur_backend ++; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic); + rspamd_map_process_periodic (periodic); return; } @@ -1609,7 +1606,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map, check: cbd = g_malloc0 (sizeof (struct http_callback_data)); - cbd->ev_base = map->ev_base; + cbd->event_loop = map->event_loop; cbd->map = map; cbd->data = data; cbd->check = check; @@ -1618,7 +1615,6 @@ check: cbd->bk = bk; MAP_RETAIN (bk, "rspamd_map_backend"); cbd->stage = map_resolve_host2; - double_to_tv (map->cfg->map_timeout, &cbd->tv); REF_INIT_RETAIN (cbd, free_http_cbdata); msg_debug_map ("%s map data from %s", check ? "checking" : "reading", @@ -1686,7 +1682,7 @@ rspamd_map_http_check_callback (gint fd, short what, void *ud) } static void -rspamd_map_http_read_callback (gint fd, short what, void *ud) +rspamd_map_http_read_callback (void *ud) { struct map_periodic_cbdata *cbd = ud; struct rspamd_map *map; @@ -1698,36 +1694,31 @@ rspamd_map_http_read_callback (gint fd, short what, void *ud) } static void -rspamd_map_file_check_callback (gint fd, short what, void *ud) +rspamd_map_file_check_callback (void *ud) { struct rspamd_map *map; struct map_periodic_cbdata *periodic = ud; struct file_map_data *data; struct rspamd_map_backend *bk; - struct stat st; map = periodic->map; bk = g_ptr_array_index (map->backends, periodic->cur_backend); data = bk->data.fd; - if (stat (data->filename, &st) != -1 && - (st.st_mtime > data->st.st_mtime || data->st.st_mtime == -1)) { - /* File was modified since last check */ - msg_info_map ("old mtime is %t, new mtime is %t for map file %s", - data->st.st_mtime, st.st_mtime, data->filename); - memcpy (&data->st, &st, sizeof (struct stat)); + if (!data->processed) { + /* File has never been read */ periodic->need_modify = TRUE; periodic->cur_backend = 0; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic); + rspamd_map_process_periodic (periodic); return; } /* Switch to the next backend */ periodic->cur_backend ++; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic); + rspamd_map_process_periodic (periodic); } static void @@ -1746,21 +1737,20 @@ rspamd_map_static_check_callback (gint fd, short what, void *ud) periodic->need_modify = TRUE; periodic->cur_backend = 0; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic); + rspamd_map_process_periodic (periodic); return; } /* Switch to the next backend */ periodic->cur_backend ++; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic); + rspamd_map_process_periodic (periodic); } static void -rspamd_map_file_read_callback (gint fd, short what, void *ud) +rspamd_map_file_read_callback (struct map_periodic_cbdata *periodic) { struct rspamd_map *map; - struct map_periodic_cbdata *periodic = ud; struct file_map_data *data; struct rspamd_map_backend *bk; @@ -1774,17 +1764,19 @@ rspamd_map_file_read_callback (gint fd, short what, void *ud) if (!read_map_file (map, data, bk, periodic)) { periodic->errored = TRUE; } + else { + data->processed = TRUE; + } /* Switch to the next backend */ periodic->cur_backend ++; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic); + rspamd_map_process_periodic (periodic); } static void -rspamd_map_static_read_callback (gint fd, short what, void *ud) +rspamd_map_static_read_callback (struct map_periodic_cbdata *periodic) { struct rspamd_map *map; - struct map_periodic_cbdata *periodic = ud; struct static_map_data *data; struct rspamd_map_backend *bk; @@ -1801,14 +1793,13 @@ rspamd_map_static_read_callback (gint fd, short what, void *ud) /* Switch to the next backend */ periodic->cur_backend ++; - rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic); + rspamd_map_process_periodic (periodic); } static void -rspamd_map_periodic_callback (gint fd, short what, void *ud) +rspamd_map_process_periodic (struct map_periodic_cbdata *cbd) { struct rspamd_map_backend *bk; - struct map_periodic_cbdata *cbd = ud; struct rspamd_map *map; map = cbd->map; @@ -1904,7 +1895,7 @@ rspamd_map_watch (struct rspamd_config *cfg, /* First of all do synced read of data */ while (cur) { map = cur->data; - map->ev_base = ev_base; + map->event_loop = ev_base; map->r = resolver; map->wrk = worker; diff --git a/src/libutil/map_private.h b/src/libutil/map_private.h index 8b45881b6..71f05aee7 100644 --- a/src/libutil/map_private.h +++ b/src/libutil/map_private.h @@ -54,7 +54,8 @@ enum fetch_proto { */ struct file_map_data { gchar *filename; - struct stat st; + gboolean processed; + ev_stat st_ev; }; @@ -130,7 +131,7 @@ struct rspamd_map { map_fin_cb_t fin_callback; map_dtor_t dtor; void **user_data; - struct ev_loop *ev_base; + struct ev_loop *event_loop; struct rspamd_worker *wrk; gchar *description; gchar *name; @@ -143,7 +144,7 @@ struct rspamd_map { gsize nelts; guint64 digest; /* Should we check HTTP or just load cached data */ - struct timeval tv; + ev_tstamp timeout; gdouble poll_timeout; time_t next_check; gboolean active_http; @@ -164,7 +165,7 @@ enum rspamd_map_http_stage { struct map_periodic_cbdata { struct rspamd_map *map; struct map_cb_data cbdata; - struct event ev; + ev_timer ev; gboolean need_modify; gboolean errored; gboolean locked; @@ -183,7 +184,7 @@ struct rspamd_http_file_data { }; struct http_callback_data { - struct ev_loop *ev_base; + struct ev_loop *event_loop; struct rspamd_http_connection *conn; rspamd_inet_addr_t *addr; struct rspamd_map *map; @@ -191,16 +192,15 @@ struct http_callback_data { struct http_map_data *data; struct map_periodic_cbdata *periodic; struct rspamd_cryptobox_pubkey *pk; - gboolean check; struct rspamd_storage_shmem *shmem_data; struct rspamd_storage_shmem *shmem_sig; struct rspamd_storage_shmem *shmem_pubkey; gsize data_len; gsize sig_len; gsize pubkey_len; - + gboolean check; enum rspamd_map_http_stage stage; - struct timeval tv; + ev_tstamp timeout; ref_entry_t ref; }; -- 2.39.5