]> source.dussan.org Git - rspamd.git/commitdiff
[Project] Start maps rework
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sun, 16 Jun 2019 20:09:59 +0000 (21:09 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 22 Jun 2019 09:57:29 +0000 (10:57 +0100)
src/libutil/map.c
src/libutil/map_private.h

index 82c668952627ca36e1918b7bb028c5e89f959b44..17da0062ac39375f197ac8a98e7df7f7c51fabe5 100644 (file)
@@ -44,7 +44,7 @@ static void free_http_cbdata_common (struct http_callback_data *cbd,
                                                                         gboolean plan_new);
 static void free_http_cbdata_dtor (gpointer p);
 static void free_http_cbdata (struct http_callback_data *cbd);
-static void rspamd_map_periodic_callback (gint fd, short what, void *ud);
+static void rspamd_map_process_periodic (struct map_periodic_cbdata *cbd);
 static void rspamd_map_schedule_periodic (struct rspamd_map *map, gboolean locked,
                                                                                  gboolean initial, gboolean errored);
 static gboolean read_map_file_chunks (struct rspamd_map *map,
@@ -130,7 +130,7 @@ write_http_request (struct http_callback_data *cbd)
                        cbd->data->host,
                        NULL,
                        cbd,
-                       &cbd->tv);
+                       cbd->timeout);
 }
 
 static gboolean
@@ -325,21 +325,23 @@ http_map_error (struct rspamd_http_connection *conn,
                        cbd->bk->uri,
                        cbd->addr ? rspamd_inet_address_to_string_pretty (cbd->addr) : "",
                        err);
-       rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+       rspamd_map_process_periodic (cbd->periodic);
        MAP_RELEASE (cbd, "http_callback_data");
 }
 
 static void
-rspamd_map_cache_cb (gint fd, short what, gpointer ud)
+rspamd_map_cache_cb (struct ev_loop *loop, ev_periodic *w, int revents)
 {
-       struct rspamd_http_map_cached_cbdata *cache_cbd = ud;
+       struct rspamd_http_map_cached_cbdata *cache_cbd = (struct rspamd_http_map_cached_cbdata *)
+                       w->data;
        struct rspamd_map *map;
        struct http_map_data *data;
-       struct timeval tv;
 
        map = cache_cbd->map;
        data = cache_cbd->data;
 
+       ev_periodic_stop (loop, &cache_cbd->timeout);
+
        if (cache_cbd->gen != cache_cbd->data->gen) {
                /* We have another update, so this cache element is obviously expired */
                /*
@@ -349,7 +351,6 @@ rspamd_map_cache_cb (gint fd, short what, gpointer ud)
                msg_info_map ("cached data is now expired (gen mismatch %L != %L) for %s",
                                cache_cbd->gen, cache_cbd->data->gen, map->name);
                MAP_RELEASE (cache_cbd->shm, "rspamd_http_map_cached_cbdata");
-               event_del (&cache_cbd->timeout);
                g_free (cache_cbd);
        }
        else if (cache_cbd->data->last_checked >= cache_cbd->last_checked) {
@@ -359,15 +360,13 @@ rspamd_map_cache_cb (gint fd, short what, gpointer ud)
                 */
                cache_cbd->last_checked = cache_cbd->data->last_checked;
                msg_debug_map ("cached data is up to date for %s", map->name);
-               double_to_tv (map->poll_timeout * 2, &tv);
-               event_add (&cache_cbd->timeout, &tv);
+               ev_periodic_again (loop, &cache_cbd->timeout);
        }
        else {
                data->cur_cache_cbd = NULL;
                g_atomic_int_set (&data->cache->available, 0);
                MAP_RELEASE (cache_cbd->shm, "rspamd_http_map_cached_cbdata");
                msg_info_map ("cached data is now expired for %s", map->name);
-               event_del (&cache_cbd->timeout);
                g_free (cache_cbd);
        }
 }
@@ -456,7 +455,7 @@ http_map_finish (struct rspamd_http_connection *conn,
                        g_atomic_int_set (&data->cache->available, 0);
                        data->cur_cache_cbd = NULL;
 
-                       rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+                       rspamd_map_process_periodic (cbd->periodic);
                        MAP_RELEASE (cbd, "http_callback_data");
 
                        return 0;
@@ -622,6 +621,8 @@ read_data:
                }
 
                /* Check for expires */
+               double cached_timeout = map->poll_timeout * 2;
+
                expires_hdr = rspamd_http_message_find_header (msg, "Expires");
 
                if (expires_hdr) {
@@ -635,19 +636,12 @@ read_data:
                                        hdate = MIN (map->next_check, hdate);
                                }
 
-                               double cached_timeout = map->next_check - msg->date +
-                                       map->poll_timeout * 2;
+                               cached_timeout = map->next_check - msg->date +
+                                                                map->poll_timeout * 2;
 
                                map->next_check = hdate;
-                               double_to_tv (cached_timeout, &tv);
-                       }
-                       else {
-                               double_to_tv (map->poll_timeout * 2, &tv);
                        }
                }
-               else {
-                       double_to_tv (map->poll_timeout * 2, &tv);
-               }
 
                /* Check for etag */
                etag_hdr = rspamd_http_message_find_header (msg, "ETag");
@@ -688,10 +682,9 @@ read_data:
                cache_cbd->gen = cbd->data->gen;
                MAP_RETAIN (cache_cbd->shm, "shmem_data");
 
-               event_set (&cache_cbd->timeout, -1, EV_TIMEOUT, rspamd_map_cache_cb,
-                               cache_cbd);
-               event_base_set (cbd->ev_base, &cache_cbd->timeout);
-               event_add (&cache_cbd->timeout, &tv);
+               ev_periodic_set (&cache_cbd->timeout, 0.0, cached_timeout, NULL);
+               ev_periodic_start (cbd->event_loop, &cache_cbd->timeout);
+               cache_cbd->timeout.data = cache_cbd;
                data->cur_cache_cbd = cache_cbd;
 
                if (map->next_check) {
@@ -700,7 +693,7 @@ read_data:
                }
                else {
                        rspamd_http_date_format (next_check_date, sizeof (next_check_date),
-                                       time (NULL) + map->poll_timeout);
+                                       ev_now (cbd->event_loop) + map->poll_timeout);
                }
 
 
@@ -773,7 +766,7 @@ read_data:
 
                cbd->periodic->cur_backend ++;
                munmap (in, dlen);
-               rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+               rspamd_map_process_periodic (cbd->periodic);
        }
        else if (msg->code == 304 && (cbd->check && cbd->stage == map_load_file)) {
                cbd->data->last_checked = msg->date;
@@ -819,13 +812,13 @@ read_data:
                }
                else {
                        rspamd_http_date_format (next_check_date, sizeof (next_check_date),
-                                       time (NULL) + map->poll_timeout);
+                                       ev_now (cbd->event_loop) + map->poll_timeout);
                }
                msg_info_map ("data is not modified for server %s, next check at %s",
                                cbd->data->host, next_check_date);
 
                cbd->periodic->cur_backend ++;
-               rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+               rspamd_map_process_periodic (cbd->periodic);
        }
        else {
                msg_info_map ("cannot load map %s from %s: HTTP error %d",
@@ -838,7 +831,7 @@ read_data:
 
 err:
        cbd->periodic->errored = 1;
-       rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+       rspamd_map_process_periodic (cbd->periodic);
        MAP_RELEASE (cbd, "http_callback_data");
 
        return 0;
@@ -951,6 +944,7 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data,
                }
        }
 
+       ev_stat_stat (map->event_loop, &data->st_ev);
        len = st.st_size;
 
        if (bk->is_signed) {
@@ -1045,9 +1039,6 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data,
                map->read_callback (NULL, 0, &periodic->cbdata, TRUE);
        }
 
-       /* Also update at the read time */
-       memcpy (&data->st, &st, sizeof (struct stat));
-
        return TRUE;
 }
 
@@ -1143,7 +1134,6 @@ rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic)
 
        map = periodic->map;
        msg_debug_map ("periodic dtor %p", periodic);
-       event_del (&periodic->ev);
 
        if (periodic->need_modify) {
                /* We are done */
@@ -1162,6 +1152,16 @@ rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic)
        g_free (periodic);
 }
 
+/* Called on timer execution */
+static void
+rspamd_map_periodic_callback (struct ev_loop *loop, ev_timer *w, int revents)
+{
+       struct map_periodic_cbdata *cbd = (struct map_periodic_cbdata *)w->data;
+
+       ev_timer_stop (loop, w);
+       rspamd_map_process_periodic (cbd);
+}
+
 static void
 rspamd_map_schedule_periodic (struct rspamd_map *map,
                gboolean locked, gboolean initial, gboolean errored)
@@ -1224,14 +1224,11 @@ rspamd_map_schedule_periodic (struct rspamd_map *map,
        map->scheduled_check = TRUE;
        REF_INIT_RETAIN (cbd, rspamd_map_periodic_dtor);
 
-       evtimer_set (&cbd->ev, rspamd_map_periodic_callback, cbd);
-       event_base_set (map->ev_base, &cbd->ev);
-
+       ev_timer_init (&cbd->ev, rspamd_map_periodic_callback, jittered_sec, 0.0);
+       ev_timer_start (map->event_loop, &cbd->ev);
 
        msg_debug_map ("schedule new periodic event %p in %.2f seconds",
                        cbd, jittered_sec);
-       double_to_tv (jittered_sec, &map->tv);
-       evtimer_add (&cbd->ev, &map->tv);
 }
 
 static void
@@ -1286,7 +1283,7 @@ rspamd_map_dns_callback (struct rdns_reply *reply, void *arg)
                        msg_err_map ("cannot resolve %s: %s", cbd->data->host,
                                        rdns_strerror (reply->code));
                        cbd->periodic->errored = 1;
-                       rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+                       rspamd_map_process_periodic (cbd->periodic);
                }
        }
 
@@ -1567,7 +1564,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
                                periodic->need_modify = TRUE;
                                /* Reset the whole chain */
                                periodic->cur_backend = 0;
-                               rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+                               rspamd_map_process_periodic (periodic);
                        }
                        else {
                                if (map->active_http) {
@@ -1577,7 +1574,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
                                else {
                                        /* Switch to the next backend */
                                        periodic->cur_backend++;
-                                       rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+                                       rspamd_map_process_periodic (periodic);
                                }
                        }
 
@@ -1592,7 +1589,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
                                /* Switch to the next backend */
                                periodic->cur_backend++;
                                data->last_modified = data->cache->last_modified;
-                               rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+                               rspamd_map_process_periodic (periodic);
 
                                return;
                        }
@@ -1601,7 +1598,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
        else if (!map->active_http) {
                /* Switch to the next backend */
                periodic->cur_backend ++;
-               rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+               rspamd_map_process_periodic (periodic);
 
                return;
        }
@@ -1609,7 +1606,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
 check:
        cbd = g_malloc0 (sizeof (struct http_callback_data));
 
-       cbd->ev_base = map->ev_base;
+       cbd->event_loop = map->event_loop;
        cbd->map = map;
        cbd->data = data;
        cbd->check = check;
@@ -1618,7 +1615,6 @@ check:
        cbd->bk = bk;
        MAP_RETAIN (bk, "rspamd_map_backend");
        cbd->stage = map_resolve_host2;
-       double_to_tv (map->cfg->map_timeout, &cbd->tv);
        REF_INIT_RETAIN (cbd, free_http_cbdata);
 
        msg_debug_map ("%s map data from %s", check ? "checking" : "reading",
@@ -1686,7 +1682,7 @@ rspamd_map_http_check_callback (gint fd, short what, void *ud)
 }
 
 static void
-rspamd_map_http_read_callback (gint fd, short what, void *ud)
+rspamd_map_http_read_callback (void *ud)
 {
        struct map_periodic_cbdata *cbd = ud;
        struct rspamd_map *map;
@@ -1698,36 +1694,31 @@ rspamd_map_http_read_callback (gint fd, short what, void *ud)
 }
 
 static void
-rspamd_map_file_check_callback (gint fd, short what, void *ud)
+rspamd_map_file_check_callback (void *ud)
 {
        struct rspamd_map *map;
        struct map_periodic_cbdata *periodic = ud;
        struct file_map_data *data;
        struct rspamd_map_backend *bk;
-       struct stat st;
 
        map = periodic->map;
 
        bk = g_ptr_array_index (map->backends, periodic->cur_backend);
        data = bk->data.fd;
 
-       if (stat (data->filename, &st) != -1 &&
-                       (st.st_mtime > data->st.st_mtime || data->st.st_mtime == -1)) {
-               /* File was modified since last check */
-               msg_info_map ("old mtime is %t, new mtime is %t for map file %s",
-                               data->st.st_mtime, st.st_mtime, data->filename);
-               memcpy (&data->st, &st, sizeof (struct stat));
+       if (!data->processed) {
+               /* File has never been read */
                periodic->need_modify = TRUE;
                periodic->cur_backend = 0;
 
-               rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+               rspamd_map_process_periodic (periodic);
 
                return;
        }
 
        /* Switch to the next backend */
        periodic->cur_backend ++;
-       rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+       rspamd_map_process_periodic (periodic);
 }
 
 static void
@@ -1746,21 +1737,20 @@ rspamd_map_static_check_callback (gint fd, short what, void *ud)
                periodic->need_modify = TRUE;
                periodic->cur_backend = 0;
 
-               rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+               rspamd_map_process_periodic (periodic);
 
                return;
        }
 
        /* Switch to the next backend */
        periodic->cur_backend ++;
-       rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+       rspamd_map_process_periodic (periodic);
 }
 
 static void
-rspamd_map_file_read_callback (gint fd, short what, void *ud)
+rspamd_map_file_read_callback (struct map_periodic_cbdata *periodic)
 {
        struct rspamd_map *map;
-       struct map_periodic_cbdata *periodic = ud;
        struct file_map_data *data;
        struct rspamd_map_backend *bk;
 
@@ -1774,17 +1764,19 @@ rspamd_map_file_read_callback (gint fd, short what, void *ud)
        if (!read_map_file (map, data, bk, periodic)) {
                periodic->errored = TRUE;
        }
+       else {
+               data->processed = TRUE;
+       }
 
        /* Switch to the next backend */
        periodic->cur_backend ++;
-       rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+       rspamd_map_process_periodic (periodic);
 }
 
 static void
-rspamd_map_static_read_callback (gint fd, short what, void *ud)
+rspamd_map_static_read_callback (struct map_periodic_cbdata *periodic)
 {
        struct rspamd_map *map;
-       struct map_periodic_cbdata *periodic = ud;
        struct static_map_data *data;
        struct rspamd_map_backend *bk;
 
@@ -1801,14 +1793,13 @@ rspamd_map_static_read_callback (gint fd, short what, void *ud)
 
        /* Switch to the next backend */
        periodic->cur_backend ++;
-       rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+       rspamd_map_process_periodic (periodic);
 }
 
 static void
-rspamd_map_periodic_callback (gint fd, short what, void *ud)
+rspamd_map_process_periodic (struct map_periodic_cbdata *cbd)
 {
        struct rspamd_map_backend *bk;
-       struct map_periodic_cbdata *cbd = ud;
        struct rspamd_map *map;
 
        map = cbd->map;
@@ -1904,7 +1895,7 @@ rspamd_map_watch (struct rspamd_config *cfg,
        /* First of all do synced read of data */
        while (cur) {
                map = cur->data;
-               map->ev_base = ev_base;
+               map->event_loop = ev_base;
                map->r = resolver;
                map->wrk = worker;
 
index 8b45881b698b658b15346a208648b970a951037b..71f05aee78d2a2717c7ed8bc4284aeeb87b28c96 100644 (file)
@@ -54,7 +54,8 @@ enum fetch_proto {
  */
 struct file_map_data {
        gchar *filename;
-       struct stat st;
+       gboolean processed;
+       ev_stat st_ev;
 };
 
 
@@ -130,7 +131,7 @@ struct rspamd_map {
        map_fin_cb_t fin_callback;
        map_dtor_t dtor;
        void **user_data;
-       struct ev_loop *ev_base;
+       struct ev_loop *event_loop;
        struct rspamd_worker *wrk;
        gchar *description;
        gchar *name;
@@ -143,7 +144,7 @@ struct rspamd_map {
        gsize nelts;
        guint64 digest;
        /* Should we check HTTP or just load cached data */
-       struct timeval tv;
+       ev_tstamp timeout;
        gdouble poll_timeout;
        time_t next_check;
        gboolean active_http;
@@ -164,7 +165,7 @@ enum rspamd_map_http_stage {
 struct map_periodic_cbdata {
        struct rspamd_map *map;
        struct map_cb_data cbdata;
-       struct event ev;
+       ev_timer ev;
        gboolean need_modify;
        gboolean errored;
        gboolean locked;
@@ -183,7 +184,7 @@ struct rspamd_http_file_data {
 };
 
 struct http_callback_data {
-       struct ev_loop *ev_base;
+       struct ev_loop *event_loop;
        struct rspamd_http_connection *conn;
        rspamd_inet_addr_t *addr;
        struct rspamd_map *map;
@@ -191,16 +192,15 @@ struct http_callback_data {
        struct http_map_data *data;
        struct map_periodic_cbdata *periodic;
        struct rspamd_cryptobox_pubkey *pk;
-       gboolean check;
        struct rspamd_storage_shmem *shmem_data;
        struct rspamd_storage_shmem *shmem_sig;
        struct rspamd_storage_shmem *shmem_pubkey;
        gsize data_len;
        gsize sig_len;
        gsize pubkey_len;
-
+       gboolean check;
        enum rspamd_map_http_stage stage;
-       struct timeval tv;
+       ev_tstamp timeout;
 
        ref_entry_t ref;
 };