summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2016-06-21 16:15:03 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2016-06-21 16:15:03 +0100
commit546a54a8275c9e6fd59b1de1c2843bcb0dedf992 (patch)
tree8b6e97d61c6fea6476d8cba9662127d5f573afcd
parent85fa167ae3b03da2ed354f96ac395972c455770c (diff)
downloadrspamd-546a54a8275c9e6fd59b1de1c2843bcb0dedf992.tar.gz
rspamd-546a54a8275c9e6fd59b1de1c2843bcb0dedf992.zip
[Feature] Implement inter-process maps caching
-rw-r--r--src/libutil/map.c97
-rw-r--r--src/libutil/map_private.h9
2 files changed, 105 insertions, 1 deletions
diff --git a/src/libutil/map.c b/src/libutil/map.c
index e38acc18a..17ab1058e 100644
--- a/src/libutil/map.c
+++ b/src/libutil/map.c
@@ -59,6 +59,13 @@ static void free_http_cbdata (struct http_callback_data *cbd);
static void rspamd_map_periodic_callback (gint fd, short what, void *ud);
static void rspamd_map_schedule_periodic (struct rspamd_map *map, gboolean locked,
gboolean initial, gboolean errored);
+
+struct rspamd_http_map_cached_cbdata {
+ struct event timeout;
+ struct rspamd_storage_shmem *shm;
+ struct rspamd_map *map;
+};
+
/**
* Write HTTP request
*/
@@ -306,6 +313,17 @@ http_map_error (struct rspamd_http_connection *conn,
MAP_RELEASE (cbd);
}
+static void
+rspamd_map_cache_cb (gint fd, short what, gpointer ud)
+{
+ struct rspamd_http_map_cached_cbdata *cache_cbd = ud;
+
+ g_atomic_int_set (&cache_cbd->map->cache->available, 0);
+ REF_RELEASE (cache_cbd->shm);
+ event_del (&cache_cbd->timeout);
+ g_slice_free1 (sizeof (*cache_cbd), cache_cbd);
+}
+
static int
http_map_finish (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg)
@@ -465,6 +483,30 @@ http_map_finish (struct rspamd_http_connection *conn,
map->read_callback (in, cbd->data_len, &cbd->periodic->cbdata, TRUE);
msg_info_map ("read map data from %s", cbd->data->host);
+
+ /*
+ * We know that a map is in the locked state
+ */
+ if (g_atomic_int_compare_and_exchange (&map->cache->available, 0, 1)) {
+ /* Store cached data */
+ struct rspamd_http_map_cached_cbdata *cache_cbd;
+ struct timeval tv;
+
+ rspamd_strlcpy (map->cache->shmem_name, cbd->shmem_data->shm_name,
+ sizeof (map->cache->shmem_name));
+ map->cache->len = cbd->data_len;
+ map->cache->last_checked = cbd->data->last_checked;
+ cache_cbd = g_slice_alloc0 (sizeof (*cache_cbd));
+ cache_cbd->shm = cbd->shmem_data;
+ cache_cbd->map = map;
+ REF_RETAIN (cache_cbd->shm);
+ event_set (&cache_cbd->timeout, -1, EV_TIMEOUT, rspamd_map_cache_cb,
+ cache_cbd);
+ event_base_set (cbd->ev_base, &cache_cbd->timeout);
+ double_to_tv (map->poll_timeout, &tv);
+ event_add (&cache_cbd->timeout, &tv);
+ }
+
cbd->periodic->cur_backend ++;
munmap (in, dlen);
rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
@@ -570,7 +612,7 @@ static void
rspamd_map_schedule_periodic (struct rspamd_map *map,
gboolean locked, gboolean initial, gboolean errored)
{
- const gdouble error_mult = 20.0, lock_mult = 0.5;
+ const gdouble error_mult = 20.0, lock_mult = 0.1;
gdouble jittered_sec;
gdouble timeout;
struct map_periodic_cbdata *cbd;
@@ -669,6 +711,31 @@ rspamd_map_dns_callback (struct rdns_reply *reply, void *arg)
MAP_RELEASE (cbd);
}
+static gboolean
+rspamd_map_read_cached (struct rspamd_map *map,
+ struct map_periodic_cbdata *periodic, const gchar *host)
+{
+ gsize len;
+ gpointer in;
+
+ in = rspamd_shmem_xmap (map->cache->shmem_name, PROT_READ, &len);
+
+ if (in == NULL) {
+ return FALSE;
+ }
+
+ if (len < map->cache->len) {
+ munmap (in, len);
+ return FALSE;
+ }
+
+ map->read_callback (in, map->cache->len, &periodic->cbdata, TRUE);
+ msg_info_map ("read map data from %s (cached)", host);
+ munmap (in, len);
+
+ return TRUE;
+}
+
/**
* Async HTTP callback
*/
@@ -679,7 +746,31 @@ rspamd_map_common_http_callback (struct rspamd_map *map, struct rspamd_map_backe
struct http_map_data *data;
struct http_callback_data *cbd;
+
data = bk->data.hd;
+
+ if (g_atomic_int_get (&map->cache->available) == 1) {
+ /* Read cached data */
+ if (check) {
+ if (data->last_checked < map->cache->last_checked) {
+ periodic->need_modify = TRUE;
+ /* Reset the whole chain */
+ periodic->cur_backend = 0;
+ rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ }
+
+ return;
+ }
+ else if (rspamd_map_read_cached (map, periodic, data->host)) {
+ /* Switch to the next backend */
+ periodic->cur_backend ++;
+ rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ data->last_checked = map->cache->last_checked;
+
+ return;
+ }
+ }
+
cbd = g_slice_alloc0 (sizeof (struct http_callback_data));
cbd->ev_base = map->ev_base;
@@ -1171,6 +1262,8 @@ rspamd_map_add (struct rspamd_config *cfg,
map->id = g_random_int ();
map->locked =
rspamd_mempool_alloc0_shared (cfg->cfg_pool, sizeof (gint));
+ map->cache =
+ rspamd_mempool_alloc0_shared (cfg->cfg_pool, sizeof (*map->cache));
map->backends = g_ptr_array_sized_new (1);
g_ptr_array_add (map->backends, bk);
map->name = g_strdup (map_line);
@@ -1217,6 +1310,8 @@ rspamd_map_add_from_ucl (struct rspamd_config *cfg,
map->id = g_random_int ();
map->locked =
rspamd_mempool_alloc0_shared (cfg->cfg_pool, sizeof (gint));
+ map->cache =
+ rspamd_mempool_alloc0_shared (cfg->cfg_pool, sizeof (*map->cache));
map->backends = g_ptr_array_new ();
map->poll_timeout = cfg->map_timeout;
diff --git a/src/libutil/map_private.h b/src/libutil/map_private.h
index dc5a9de4d..ca4174821 100644
--- a/src/libutil/map_private.h
+++ b/src/libutil/map_private.h
@@ -59,6 +59,13 @@ struct rspamd_map_backend {
ref_entry_t ref;
};
+struct rspamd_map_cachepoint {
+ gint available;
+ gsize len;
+ time_t last_checked;
+ gchar shmem_name[256];
+};
+
struct rspamd_map {
struct rspamd_dns_resolver *r;
struct rspamd_config *cfg;
@@ -75,6 +82,8 @@ struct rspamd_map {
gdouble poll_timeout;
/* Shared lock for temporary disabling of map reading (e.g. when this map is written by UI) */
gint *locked;
+ /* Shared cache data */
+ struct rspamd_map_cachepoint *cache;
gchar tag[MEMPOOL_UID_LEN];
rspamd_map_dtor dtor;
gpointer dtor_data;