]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Implement inter-process maps caching
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 21 Jun 2016 15:15:03 +0000 (16:15 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 21 Jun 2016 15:15:03 +0000 (16:15 +0100)
src/libutil/map.c
src/libutil/map_private.h

index e38acc18aba4c48de6dd711ade018eabb8b7674f..17ab1058e7e06a4464ba06e7e629bb34da95bdd6 100644 (file)
@@ -59,6 +59,13 @@ static void free_http_cbdata (struct http_callback_data *cbd);
 static void rspamd_map_periodic_callback (gint fd, short what, void *ud);
 static void rspamd_map_schedule_periodic (struct rspamd_map *map, gboolean locked,
                gboolean initial, gboolean errored);
+
+struct rspamd_http_map_cached_cbdata {
+       struct event timeout;
+       struct rspamd_storage_shmem *shm;
+       struct rspamd_map *map;
+};
+
 /**
  * Write HTTP request
  */
@@ -306,6 +313,17 @@ http_map_error (struct rspamd_http_connection *conn,
        MAP_RELEASE (cbd);
 }
 
+static void
+rspamd_map_cache_cb (gint fd, short what, gpointer ud)
+{
+       struct rspamd_http_map_cached_cbdata *cache_cbd = ud;
+
+       g_atomic_int_set (&cache_cbd->map->cache->available, 0);
+       REF_RELEASE (cache_cbd->shm);
+       event_del (&cache_cbd->timeout);
+       g_slice_free1 (sizeof (*cache_cbd), cache_cbd);
+}
+
 static int
 http_map_finish (struct rspamd_http_connection *conn,
                struct rspamd_http_message *msg)
@@ -465,6 +483,30 @@ http_map_finish (struct rspamd_http_connection *conn,
 
                map->read_callback (in, cbd->data_len, &cbd->periodic->cbdata, TRUE);
                msg_info_map ("read map data from %s", cbd->data->host);
+
+               /*
+                * We know that a map is in the locked state
+                */
+               if (g_atomic_int_compare_and_exchange (&map->cache->available, 0, 1)) {
+                       /* Store cached data */
+                       struct rspamd_http_map_cached_cbdata *cache_cbd;
+                       struct timeval tv;
+
+                       rspamd_strlcpy (map->cache->shmem_name, cbd->shmem_data->shm_name,
+                                       sizeof (map->cache->shmem_name));
+                       map->cache->len = cbd->data_len;
+                       map->cache->last_checked = cbd->data->last_checked;
+                       cache_cbd = g_slice_alloc0 (sizeof (*cache_cbd));
+                       cache_cbd->shm = cbd->shmem_data;
+                       cache_cbd->map = map;
+                       REF_RETAIN (cache_cbd->shm);
+                       event_set (&cache_cbd->timeout, -1, EV_TIMEOUT, rspamd_map_cache_cb,
+                                       cache_cbd);
+                       event_base_set (cbd->ev_base, &cache_cbd->timeout);
+                       double_to_tv (map->poll_timeout, &tv);
+                       event_add (&cache_cbd->timeout, &tv);
+               }
+
                cbd->periodic->cur_backend ++;
                munmap (in, dlen);
                rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
@@ -570,7 +612,7 @@ static void
 rspamd_map_schedule_periodic (struct rspamd_map *map,
                gboolean locked, gboolean initial, gboolean errored)
 {
-       const gdouble error_mult = 20.0, lock_mult = 0.5;
+       const gdouble error_mult = 20.0, lock_mult = 0.1;
        gdouble jittered_sec;
        gdouble timeout;
        struct map_periodic_cbdata *cbd;
@@ -669,6 +711,31 @@ rspamd_map_dns_callback (struct rdns_reply *reply, void *arg)
        MAP_RELEASE (cbd);
 }
 
+static gboolean
+rspamd_map_read_cached (struct rspamd_map *map,
+               struct map_periodic_cbdata *periodic, const gchar *host)
+{
+       gsize len;
+       gpointer in;
+
+       in = rspamd_shmem_xmap (map->cache->shmem_name, PROT_READ, &len);
+
+       if (in == NULL) {
+               return FALSE;
+       }
+
+       if (len < map->cache->len) {
+               munmap (in, len);
+               return FALSE;
+       }
+
+       map->read_callback (in, map->cache->len, &periodic->cbdata, TRUE);
+       msg_info_map ("read map data from %s (cached)", host);
+       munmap (in, len);
+
+       return TRUE;
+}
+
 /**
  * Async HTTP callback
  */
@@ -679,7 +746,31 @@ rspamd_map_common_http_callback (struct rspamd_map *map, struct rspamd_map_backe
        struct http_map_data *data;
        struct http_callback_data *cbd;
 
+
        data = bk->data.hd;
+
+       if (g_atomic_int_get (&map->cache->available) == 1) {
+               /* Read cached data */
+               if (check) {
+                       if (data->last_checked < map->cache->last_checked) {
+                               periodic->need_modify = TRUE;
+                               /* Reset the whole chain */
+                               periodic->cur_backend = 0;
+                               rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+                       }
+
+                       return;
+               }
+               else if (rspamd_map_read_cached (map, periodic, data->host)) {
+                       /* Switch to the next backend */
+                       periodic->cur_backend ++;
+                       rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+                       data->last_checked = map->cache->last_checked;
+
+                       return;
+               }
+       }
+
        cbd = g_slice_alloc0 (sizeof (struct http_callback_data));
 
        cbd->ev_base = map->ev_base;
@@ -1171,6 +1262,8 @@ rspamd_map_add (struct rspamd_config *cfg,
        map->id = g_random_int ();
        map->locked =
                rspamd_mempool_alloc0_shared (cfg->cfg_pool, sizeof (gint));
+       map->cache =
+                       rspamd_mempool_alloc0_shared (cfg->cfg_pool, sizeof (*map->cache));
        map->backends = g_ptr_array_sized_new (1);
        g_ptr_array_add (map->backends, bk);
        map->name = g_strdup (map_line);
@@ -1217,6 +1310,8 @@ rspamd_map_add_from_ucl (struct rspamd_config *cfg,
        map->id = g_random_int ();
        map->locked =
                        rspamd_mempool_alloc0_shared (cfg->cfg_pool, sizeof (gint));
+       map->cache =
+                               rspamd_mempool_alloc0_shared (cfg->cfg_pool, sizeof (*map->cache));
        map->backends = g_ptr_array_new ();
        map->poll_timeout = cfg->map_timeout;
 
index dc5a9de4d98ed316a13356f743ed3a79d8240a85..ca4174821375e4d7f822b847ecd6b6dbc53445be 100644 (file)
@@ -59,6 +59,13 @@ struct rspamd_map_backend {
        ref_entry_t ref;
 };
 
+struct rspamd_map_cachepoint {
+       gint available;
+       gsize len;
+       time_t last_checked;
+       gchar shmem_name[256];
+};
+
 struct rspamd_map {
        struct rspamd_dns_resolver *r;
        struct rspamd_config *cfg;
@@ -75,6 +82,8 @@ struct rspamd_map {
        gdouble poll_timeout;
        /* Shared lock for temporary disabling of map reading (e.g. when this map is written by UI) */
        gint *locked;
+       /* Shared cache data */
+       struct rspamd_map_cachepoint *cache;
        gchar tag[MEMPOOL_UID_LEN];
        rspamd_map_dtor dtor;
        gpointer dtor_data;