summaryrefslogtreecommitdiffstats
path: root/src/libutil/map.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2019-06-16 21:09:59 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2019-06-22 10:57:29 +0100
commit423edefbda01bb9a44afd69e78d591080d571f58 (patch)
tree8d3d5f36bf7cf20e5a87bfde891aa61466668986 /src/libutil/map.c
parente463ad556cb35ee39b92dbf7d3934d4187ab70d2 (diff)
downloadrspamd-423edefbda01bb9a44afd69e78d591080d571f58.tar.gz
rspamd-423edefbda01bb9a44afd69e78d591080d571f58.zip
[Project] Start maps rework
Diffstat (limited to 'src/libutil/map.c')
-rw-r--r--src/libutil/map.c125
1 files changed, 58 insertions, 67 deletions
diff --git a/src/libutil/map.c b/src/libutil/map.c
index 82c668952..17da0062a 100644
--- a/src/libutil/map.c
+++ b/src/libutil/map.c
@@ -44,7 +44,7 @@ static void free_http_cbdata_common (struct http_callback_data *cbd,
gboolean plan_new);
static void free_http_cbdata_dtor (gpointer p);
static void free_http_cbdata (struct http_callback_data *cbd);
-static void rspamd_map_periodic_callback (gint fd, short what, void *ud);
+static void rspamd_map_process_periodic (struct map_periodic_cbdata *cbd);
static void rspamd_map_schedule_periodic (struct rspamd_map *map, gboolean locked,
gboolean initial, gboolean errored);
static gboolean read_map_file_chunks (struct rspamd_map *map,
@@ -130,7 +130,7 @@ write_http_request (struct http_callback_data *cbd)
cbd->data->host,
NULL,
cbd,
- &cbd->tv);
+ cbd->timeout);
}
static gboolean
@@ -325,21 +325,23 @@ http_map_error (struct rspamd_http_connection *conn,
cbd->bk->uri,
cbd->addr ? rspamd_inet_address_to_string_pretty (cbd->addr) : "",
err);
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+ rspamd_map_process_periodic (cbd->periodic);
MAP_RELEASE (cbd, "http_callback_data");
}
static void
-rspamd_map_cache_cb (gint fd, short what, gpointer ud)
+rspamd_map_cache_cb (struct ev_loop *loop, ev_periodic *w, int revents)
{
- struct rspamd_http_map_cached_cbdata *cache_cbd = ud;
+ struct rspamd_http_map_cached_cbdata *cache_cbd = (struct rspamd_http_map_cached_cbdata *)
+ w->data;
struct rspamd_map *map;
struct http_map_data *data;
- struct timeval tv;
map = cache_cbd->map;
data = cache_cbd->data;
+ ev_periodic_stop (loop, &cache_cbd->timeout);
+
if (cache_cbd->gen != cache_cbd->data->gen) {
/* We have another update, so this cache element is obviously expired */
/*
@@ -349,7 +351,6 @@ rspamd_map_cache_cb (gint fd, short what, gpointer ud)
msg_info_map ("cached data is now expired (gen mismatch %L != %L) for %s",
cache_cbd->gen, cache_cbd->data->gen, map->name);
MAP_RELEASE (cache_cbd->shm, "rspamd_http_map_cached_cbdata");
- event_del (&cache_cbd->timeout);
g_free (cache_cbd);
}
else if (cache_cbd->data->last_checked >= cache_cbd->last_checked) {
@@ -359,15 +360,13 @@ rspamd_map_cache_cb (gint fd, short what, gpointer ud)
*/
cache_cbd->last_checked = cache_cbd->data->last_checked;
msg_debug_map ("cached data is up to date for %s", map->name);
- double_to_tv (map->poll_timeout * 2, &tv);
- event_add (&cache_cbd->timeout, &tv);
+ ev_periodic_again (loop, &cache_cbd->timeout);
}
else {
data->cur_cache_cbd = NULL;
g_atomic_int_set (&data->cache->available, 0);
MAP_RELEASE (cache_cbd->shm, "rspamd_http_map_cached_cbdata");
msg_info_map ("cached data is now expired for %s", map->name);
- event_del (&cache_cbd->timeout);
g_free (cache_cbd);
}
}
@@ -456,7 +455,7 @@ http_map_finish (struct rspamd_http_connection *conn,
g_atomic_int_set (&data->cache->available, 0);
data->cur_cache_cbd = NULL;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+ rspamd_map_process_periodic (cbd->periodic);
MAP_RELEASE (cbd, "http_callback_data");
return 0;
@@ -622,6 +621,8 @@ read_data:
}
/* Check for expires */
+ double cached_timeout = map->poll_timeout * 2;
+
expires_hdr = rspamd_http_message_find_header (msg, "Expires");
if (expires_hdr) {
@@ -635,19 +636,12 @@ read_data:
hdate = MIN (map->next_check, hdate);
}
- double cached_timeout = map->next_check - msg->date +
- map->poll_timeout * 2;
+ cached_timeout = map->next_check - msg->date +
+ map->poll_timeout * 2;
map->next_check = hdate;
- double_to_tv (cached_timeout, &tv);
- }
- else {
- double_to_tv (map->poll_timeout * 2, &tv);
}
}
- else {
- double_to_tv (map->poll_timeout * 2, &tv);
- }
/* Check for etag */
etag_hdr = rspamd_http_message_find_header (msg, "ETag");
@@ -688,10 +682,9 @@ read_data:
cache_cbd->gen = cbd->data->gen;
MAP_RETAIN (cache_cbd->shm, "shmem_data");
- event_set (&cache_cbd->timeout, -1, EV_TIMEOUT, rspamd_map_cache_cb,
- cache_cbd);
- event_base_set (cbd->ev_base, &cache_cbd->timeout);
- event_add (&cache_cbd->timeout, &tv);
+ ev_periodic_set (&cache_cbd->timeout, 0.0, cached_timeout, NULL);
+ ev_periodic_start (cbd->event_loop, &cache_cbd->timeout);
+ cache_cbd->timeout.data = cache_cbd;
data->cur_cache_cbd = cache_cbd;
if (map->next_check) {
@@ -700,7 +693,7 @@ read_data:
}
else {
rspamd_http_date_format (next_check_date, sizeof (next_check_date),
- time (NULL) + map->poll_timeout);
+ ev_now (cbd->event_loop) + map->poll_timeout);
}
@@ -773,7 +766,7 @@ read_data:
cbd->periodic->cur_backend ++;
munmap (in, dlen);
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+ rspamd_map_process_periodic (cbd->periodic);
}
else if (msg->code == 304 && (cbd->check && cbd->stage == map_load_file)) {
cbd->data->last_checked = msg->date;
@@ -819,13 +812,13 @@ read_data:
}
else {
rspamd_http_date_format (next_check_date, sizeof (next_check_date),
- time (NULL) + map->poll_timeout);
+ ev_now (cbd->event_loop) + map->poll_timeout);
}
msg_info_map ("data is not modified for server %s, next check at %s",
cbd->data->host, next_check_date);
cbd->periodic->cur_backend ++;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+ rspamd_map_process_periodic (cbd->periodic);
}
else {
msg_info_map ("cannot load map %s from %s: HTTP error %d",
@@ -838,7 +831,7 @@ read_data:
err:
cbd->periodic->errored = 1;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+ rspamd_map_process_periodic (cbd->periodic);
MAP_RELEASE (cbd, "http_callback_data");
return 0;
@@ -951,6 +944,7 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data,
}
}
+ ev_stat_stat (map->event_loop, &data->st_ev);
len = st.st_size;
if (bk->is_signed) {
@@ -1045,9 +1039,6 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data,
map->read_callback (NULL, 0, &periodic->cbdata, TRUE);
}
- /* Also update at the read time */
- memcpy (&data->st, &st, sizeof (struct stat));
-
return TRUE;
}
@@ -1143,7 +1134,6 @@ rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic)
map = periodic->map;
msg_debug_map ("periodic dtor %p", periodic);
- event_del (&periodic->ev);
if (periodic->need_modify) {
/* We are done */
@@ -1162,6 +1152,16 @@ rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic)
g_free (periodic);
}
+/* Called on timer execution */
+static void
+rspamd_map_periodic_callback (struct ev_loop *loop, ev_timer *w, int revents)
+{
+ struct map_periodic_cbdata *cbd = (struct map_periodic_cbdata *)w->data;
+
+ ev_timer_stop (loop, w);
+ rspamd_map_process_periodic (cbd);
+}
+
static void
rspamd_map_schedule_periodic (struct rspamd_map *map,
gboolean locked, gboolean initial, gboolean errored)
@@ -1224,14 +1224,11 @@ rspamd_map_schedule_periodic (struct rspamd_map *map,
map->scheduled_check = TRUE;
REF_INIT_RETAIN (cbd, rspamd_map_periodic_dtor);
- evtimer_set (&cbd->ev, rspamd_map_periodic_callback, cbd);
- event_base_set (map->ev_base, &cbd->ev);
-
+ ev_timer_init (&cbd->ev, rspamd_map_periodic_callback, jittered_sec, 0.0);
+ ev_timer_start (map->event_loop, &cbd->ev);
msg_debug_map ("schedule new periodic event %p in %.2f seconds",
cbd, jittered_sec);
- double_to_tv (jittered_sec, &map->tv);
- evtimer_add (&cbd->ev, &map->tv);
}
static void
@@ -1286,7 +1283,7 @@ rspamd_map_dns_callback (struct rdns_reply *reply, void *arg)
msg_err_map ("cannot resolve %s: %s", cbd->data->host,
rdns_strerror (reply->code));
cbd->periodic->errored = 1;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+ rspamd_map_process_periodic (cbd->periodic);
}
}
@@ -1567,7 +1564,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
periodic->need_modify = TRUE;
/* Reset the whole chain */
periodic->cur_backend = 0;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
}
else {
if (map->active_http) {
@@ -1577,7 +1574,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
else {
/* Switch to the next backend */
periodic->cur_backend++;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
}
}
@@ -1592,7 +1589,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
/* Switch to the next backend */
periodic->cur_backend++;
data->last_modified = data->cache->last_modified;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
return;
}
@@ -1601,7 +1598,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
else if (!map->active_http) {
/* Switch to the next backend */
periodic->cur_backend ++;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
return;
}
@@ -1609,7 +1606,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
check:
cbd = g_malloc0 (sizeof (struct http_callback_data));
- cbd->ev_base = map->ev_base;
+ cbd->event_loop = map->event_loop;
cbd->map = map;
cbd->data = data;
cbd->check = check;
@@ -1618,7 +1615,6 @@ check:
cbd->bk = bk;
MAP_RETAIN (bk, "rspamd_map_backend");
cbd->stage = map_resolve_host2;
- double_to_tv (map->cfg->map_timeout, &cbd->tv);
REF_INIT_RETAIN (cbd, free_http_cbdata);
msg_debug_map ("%s map data from %s", check ? "checking" : "reading",
@@ -1686,7 +1682,7 @@ rspamd_map_http_check_callback (gint fd, short what, void *ud)
}
static void
-rspamd_map_http_read_callback (gint fd, short what, void *ud)
+rspamd_map_http_read_callback (void *ud)
{
struct map_periodic_cbdata *cbd = ud;
struct rspamd_map *map;
@@ -1698,36 +1694,31 @@ rspamd_map_http_read_callback (gint fd, short what, void *ud)
}
static void
-rspamd_map_file_check_callback (gint fd, short what, void *ud)
+rspamd_map_file_check_callback (void *ud)
{
struct rspamd_map *map;
struct map_periodic_cbdata *periodic = ud;
struct file_map_data *data;
struct rspamd_map_backend *bk;
- struct stat st;
map = periodic->map;
bk = g_ptr_array_index (map->backends, periodic->cur_backend);
data = bk->data.fd;
- if (stat (data->filename, &st) != -1 &&
- (st.st_mtime > data->st.st_mtime || data->st.st_mtime == -1)) {
- /* File was modified since last check */
- msg_info_map ("old mtime is %t, new mtime is %t for map file %s",
- data->st.st_mtime, st.st_mtime, data->filename);
- memcpy (&data->st, &st, sizeof (struct stat));
+ if (!data->processed) {
+ /* File has never been read */
periodic->need_modify = TRUE;
periodic->cur_backend = 0;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
return;
}
/* Switch to the next backend */
periodic->cur_backend ++;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
}
static void
@@ -1746,21 +1737,20 @@ rspamd_map_static_check_callback (gint fd, short what, void *ud)
periodic->need_modify = TRUE;
periodic->cur_backend = 0;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
return;
}
/* Switch to the next backend */
periodic->cur_backend ++;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
}
static void
-rspamd_map_file_read_callback (gint fd, short what, void *ud)
+rspamd_map_file_read_callback (struct map_periodic_cbdata *periodic)
{
struct rspamd_map *map;
- struct map_periodic_cbdata *periodic = ud;
struct file_map_data *data;
struct rspamd_map_backend *bk;
@@ -1774,17 +1764,19 @@ rspamd_map_file_read_callback (gint fd, short what, void *ud)
if (!read_map_file (map, data, bk, periodic)) {
periodic->errored = TRUE;
}
+ else {
+ data->processed = TRUE;
+ }
/* Switch to the next backend */
periodic->cur_backend ++;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
}
static void
-rspamd_map_static_read_callback (gint fd, short what, void *ud)
+rspamd_map_static_read_callback (struct map_periodic_cbdata *periodic)
{
struct rspamd_map *map;
- struct map_periodic_cbdata *periodic = ud;
struct static_map_data *data;
struct rspamd_map_backend *bk;
@@ -1801,14 +1793,13 @@ rspamd_map_static_read_callback (gint fd, short what, void *ud)
/* Switch to the next backend */
periodic->cur_backend ++;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
}
static void
-rspamd_map_periodic_callback (gint fd, short what, void *ud)
+rspamd_map_process_periodic (struct map_periodic_cbdata *cbd)
{
struct rspamd_map_backend *bk;
- struct map_periodic_cbdata *cbd = ud;
struct rspamd_map *map;
map = cbd->map;
@@ -1904,7 +1895,7 @@ rspamd_map_watch (struct rspamd_config *cfg,
/* First of all do synced read of data */
while (cur) {
map = cur->data;
- map->ev_base = ev_base;
+ map->event_loop = ev_base;
map->r = resolver;
map->wrk = worker;