Browse Source

[Project] Start maps rework

tags/2.0
Vsevolod Stakhov 5 years ago
parent
commit
423edefbda
2 changed files with 66 additions and 75 deletions
  1. 58
    67
      src/libutil/map.c
  2. 8
    8
      src/libutil/map_private.h

+ 58
- 67
src/libutil/map.c View File

@@ -44,7 +44,7 @@ static void free_http_cbdata_common (struct http_callback_data *cbd,
gboolean plan_new);
static void free_http_cbdata_dtor (gpointer p);
static void free_http_cbdata (struct http_callback_data *cbd);
static void rspamd_map_periodic_callback (gint fd, short what, void *ud);
static void rspamd_map_process_periodic (struct map_periodic_cbdata *cbd);
static void rspamd_map_schedule_periodic (struct rspamd_map *map, gboolean locked,
gboolean initial, gboolean errored);
static gboolean read_map_file_chunks (struct rspamd_map *map,
@@ -130,7 +130,7 @@ write_http_request (struct http_callback_data *cbd)
cbd->data->host,
NULL,
cbd,
&cbd->tv);
cbd->timeout);
}

static gboolean
@@ -325,21 +325,23 @@ http_map_error (struct rspamd_http_connection *conn,
cbd->bk->uri,
cbd->addr ? rspamd_inet_address_to_string_pretty (cbd->addr) : "",
err);
rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
rspamd_map_process_periodic (cbd->periodic);
MAP_RELEASE (cbd, "http_callback_data");
}

static void
rspamd_map_cache_cb (gint fd, short what, gpointer ud)
rspamd_map_cache_cb (struct ev_loop *loop, ev_periodic *w, int revents)
{
struct rspamd_http_map_cached_cbdata *cache_cbd = ud;
struct rspamd_http_map_cached_cbdata *cache_cbd = (struct rspamd_http_map_cached_cbdata *)
w->data;
struct rspamd_map *map;
struct http_map_data *data;
struct timeval tv;

map = cache_cbd->map;
data = cache_cbd->data;

ev_periodic_stop (loop, &cache_cbd->timeout);

if (cache_cbd->gen != cache_cbd->data->gen) {
/* We have another update, so this cache element is obviously expired */
/*
@@ -349,7 +351,6 @@ rspamd_map_cache_cb (gint fd, short what, gpointer ud)
msg_info_map ("cached data is now expired (gen mismatch %L != %L) for %s",
cache_cbd->gen, cache_cbd->data->gen, map->name);
MAP_RELEASE (cache_cbd->shm, "rspamd_http_map_cached_cbdata");
event_del (&cache_cbd->timeout);
g_free (cache_cbd);
}
else if (cache_cbd->data->last_checked >= cache_cbd->last_checked) {
@@ -359,15 +360,13 @@ rspamd_map_cache_cb (gint fd, short what, gpointer ud)
*/
cache_cbd->last_checked = cache_cbd->data->last_checked;
msg_debug_map ("cached data is up to date for %s", map->name);
double_to_tv (map->poll_timeout * 2, &tv);
event_add (&cache_cbd->timeout, &tv);
ev_periodic_again (loop, &cache_cbd->timeout);
}
else {
data->cur_cache_cbd = NULL;
g_atomic_int_set (&data->cache->available, 0);
MAP_RELEASE (cache_cbd->shm, "rspamd_http_map_cached_cbdata");
msg_info_map ("cached data is now expired for %s", map->name);
event_del (&cache_cbd->timeout);
g_free (cache_cbd);
}
}
@@ -456,7 +455,7 @@ http_map_finish (struct rspamd_http_connection *conn,
g_atomic_int_set (&data->cache->available, 0);
data->cur_cache_cbd = NULL;

rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
rspamd_map_process_periodic (cbd->periodic);
MAP_RELEASE (cbd, "http_callback_data");

return 0;
@@ -622,6 +621,8 @@ read_data:
}

/* Check for expires */
double cached_timeout = map->poll_timeout * 2;

expires_hdr = rspamd_http_message_find_header (msg, "Expires");

if (expires_hdr) {
@@ -635,19 +636,12 @@ read_data:
hdate = MIN (map->next_check, hdate);
}

double cached_timeout = map->next_check - msg->date +
map->poll_timeout * 2;
cached_timeout = map->next_check - msg->date +
map->poll_timeout * 2;

map->next_check = hdate;
double_to_tv (cached_timeout, &tv);
}
else {
double_to_tv (map->poll_timeout * 2, &tv);
}
}
else {
double_to_tv (map->poll_timeout * 2, &tv);
}

/* Check for etag */
etag_hdr = rspamd_http_message_find_header (msg, "ETag");
@@ -688,10 +682,9 @@ read_data:
cache_cbd->gen = cbd->data->gen;
MAP_RETAIN (cache_cbd->shm, "shmem_data");

event_set (&cache_cbd->timeout, -1, EV_TIMEOUT, rspamd_map_cache_cb,
cache_cbd);
event_base_set (cbd->ev_base, &cache_cbd->timeout);
event_add (&cache_cbd->timeout, &tv);
ev_periodic_set (&cache_cbd->timeout, 0.0, cached_timeout, NULL);
ev_periodic_start (cbd->event_loop, &cache_cbd->timeout);
cache_cbd->timeout.data = cache_cbd;
data->cur_cache_cbd = cache_cbd;

if (map->next_check) {
@@ -700,7 +693,7 @@ read_data:
}
else {
rspamd_http_date_format (next_check_date, sizeof (next_check_date),
time (NULL) + map->poll_timeout);
ev_now (cbd->event_loop) + map->poll_timeout);
}


@@ -773,7 +766,7 @@ read_data:

cbd->periodic->cur_backend ++;
munmap (in, dlen);
rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
rspamd_map_process_periodic (cbd->periodic);
}
else if (msg->code == 304 && (cbd->check && cbd->stage == map_load_file)) {
cbd->data->last_checked = msg->date;
@@ -819,13 +812,13 @@ read_data:
}
else {
rspamd_http_date_format (next_check_date, sizeof (next_check_date),
time (NULL) + map->poll_timeout);
ev_now (cbd->event_loop) + map->poll_timeout);
}
msg_info_map ("data is not modified for server %s, next check at %s",
cbd->data->host, next_check_date);

cbd->periodic->cur_backend ++;
rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
rspamd_map_process_periodic (cbd->periodic);
}
else {
msg_info_map ("cannot load map %s from %s: HTTP error %d",
@@ -838,7 +831,7 @@ read_data:

err:
cbd->periodic->errored = 1;
rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
rspamd_map_process_periodic (cbd->periodic);
MAP_RELEASE (cbd, "http_callback_data");

return 0;
@@ -951,6 +944,7 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data,
}
}

ev_stat_stat (map->event_loop, &data->st_ev);
len = st.st_size;

if (bk->is_signed) {
@@ -1045,9 +1039,6 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data,
map->read_callback (NULL, 0, &periodic->cbdata, TRUE);
}

/* Also update at the read time */
memcpy (&data->st, &st, sizeof (struct stat));

return TRUE;
}

@@ -1143,7 +1134,6 @@ rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic)

map = periodic->map;
msg_debug_map ("periodic dtor %p", periodic);
event_del (&periodic->ev);

if (periodic->need_modify) {
/* We are done */
@@ -1162,6 +1152,16 @@ rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic)
g_free (periodic);
}

/* Called on timer execution */
static void
rspamd_map_periodic_callback (struct ev_loop *loop, ev_timer *w, int revents)
{
struct map_periodic_cbdata *cbd = (struct map_periodic_cbdata *)w->data;

ev_timer_stop (loop, w);
rspamd_map_process_periodic (cbd);
}

static void
rspamd_map_schedule_periodic (struct rspamd_map *map,
gboolean locked, gboolean initial, gboolean errored)
@@ -1224,14 +1224,11 @@ rspamd_map_schedule_periodic (struct rspamd_map *map,
map->scheduled_check = TRUE;
REF_INIT_RETAIN (cbd, rspamd_map_periodic_dtor);

evtimer_set (&cbd->ev, rspamd_map_periodic_callback, cbd);
event_base_set (map->ev_base, &cbd->ev);

ev_timer_init (&cbd->ev, rspamd_map_periodic_callback, jittered_sec, 0.0);
ev_timer_start (map->event_loop, &cbd->ev);

msg_debug_map ("schedule new periodic event %p in %.2f seconds",
cbd, jittered_sec);
double_to_tv (jittered_sec, &map->tv);
evtimer_add (&cbd->ev, &map->tv);
}

static void
@@ -1286,7 +1283,7 @@ rspamd_map_dns_callback (struct rdns_reply *reply, void *arg)
msg_err_map ("cannot resolve %s: %s", cbd->data->host,
rdns_strerror (reply->code));
cbd->periodic->errored = 1;
rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
rspamd_map_process_periodic (cbd->periodic);
}
}

@@ -1567,7 +1564,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
periodic->need_modify = TRUE;
/* Reset the whole chain */
periodic->cur_backend = 0;
rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
rspamd_map_process_periodic (periodic);
}
else {
if (map->active_http) {
@@ -1577,7 +1574,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
else {
/* Switch to the next backend */
periodic->cur_backend++;
rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
rspamd_map_process_periodic (periodic);
}
}

@@ -1592,7 +1589,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
/* Switch to the next backend */
periodic->cur_backend++;
data->last_modified = data->cache->last_modified;
rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
rspamd_map_process_periodic (periodic);

return;
}
@@ -1601,7 +1598,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
else if (!map->active_http) {
/* Switch to the next backend */
periodic->cur_backend ++;
rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
rspamd_map_process_periodic (periodic);

return;
}
@@ -1609,7 +1606,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
check:
cbd = g_malloc0 (sizeof (struct http_callback_data));

cbd->ev_base = map->ev_base;
cbd->event_loop = map->event_loop;
cbd->map = map;
cbd->data = data;
cbd->check = check;
@@ -1618,7 +1615,6 @@ check:
cbd->bk = bk;
MAP_RETAIN (bk, "rspamd_map_backend");
cbd->stage = map_resolve_host2;
double_to_tv (map->cfg->map_timeout, &cbd->tv);
REF_INIT_RETAIN (cbd, free_http_cbdata);

msg_debug_map ("%s map data from %s", check ? "checking" : "reading",
@@ -1686,7 +1682,7 @@ rspamd_map_http_check_callback (gint fd, short what, void *ud)
}

static void
rspamd_map_http_read_callback (gint fd, short what, void *ud)
rspamd_map_http_read_callback (void *ud)
{
struct map_periodic_cbdata *cbd = ud;
struct rspamd_map *map;
@@ -1698,36 +1694,31 @@ rspamd_map_http_read_callback (gint fd, short what, void *ud)
}

static void
rspamd_map_file_check_callback (gint fd, short what, void *ud)
rspamd_map_file_check_callback (void *ud)
{
struct rspamd_map *map;
struct map_periodic_cbdata *periodic = ud;
struct file_map_data *data;
struct rspamd_map_backend *bk;
struct stat st;

map = periodic->map;

bk = g_ptr_array_index (map->backends, periodic->cur_backend);
data = bk->data.fd;

if (stat (data->filename, &st) != -1 &&
(st.st_mtime > data->st.st_mtime || data->st.st_mtime == -1)) {
/* File was modified since last check */
msg_info_map ("old mtime is %t, new mtime is %t for map file %s",
data->st.st_mtime, st.st_mtime, data->filename);
memcpy (&data->st, &st, sizeof (struct stat));
if (!data->processed) {
/* File has never been read */
periodic->need_modify = TRUE;
periodic->cur_backend = 0;

rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
rspamd_map_process_periodic (periodic);

return;
}

/* Switch to the next backend */
periodic->cur_backend ++;
rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
rspamd_map_process_periodic (periodic);
}

static void
@@ -1746,21 +1737,20 @@ rspamd_map_static_check_callback (gint fd, short what, void *ud)
periodic->need_modify = TRUE;
periodic->cur_backend = 0;

rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
rspamd_map_process_periodic (periodic);

return;
}

/* Switch to the next backend */
periodic->cur_backend ++;
rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
rspamd_map_process_periodic (periodic);
}

static void
rspamd_map_file_read_callback (gint fd, short what, void *ud)
rspamd_map_file_read_callback (struct map_periodic_cbdata *periodic)
{
struct rspamd_map *map;
struct map_periodic_cbdata *periodic = ud;
struct file_map_data *data;
struct rspamd_map_backend *bk;

@@ -1774,17 +1764,19 @@ rspamd_map_file_read_callback (gint fd, short what, void *ud)
if (!read_map_file (map, data, bk, periodic)) {
periodic->errored = TRUE;
}
else {
data->processed = TRUE;
}

/* Switch to the next backend */
periodic->cur_backend ++;
rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
rspamd_map_process_periodic (periodic);
}

static void
rspamd_map_static_read_callback (gint fd, short what, void *ud)
rspamd_map_static_read_callback (struct map_periodic_cbdata *periodic)
{
struct rspamd_map *map;
struct map_periodic_cbdata *periodic = ud;
struct static_map_data *data;
struct rspamd_map_backend *bk;

@@ -1801,14 +1793,13 @@ rspamd_map_static_read_callback (gint fd, short what, void *ud)

/* Switch to the next backend */
periodic->cur_backend ++;
rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
rspamd_map_process_periodic (periodic);
}

static void
rspamd_map_periodic_callback (gint fd, short what, void *ud)
rspamd_map_process_periodic (struct map_periodic_cbdata *cbd)
{
struct rspamd_map_backend *bk;
struct map_periodic_cbdata *cbd = ud;
struct rspamd_map *map;

map = cbd->map;
@@ -1904,7 +1895,7 @@ rspamd_map_watch (struct rspamd_config *cfg,
/* First of all do synced read of data */
while (cur) {
map = cur->data;
map->ev_base = ev_base;
map->event_loop = ev_base;
map->r = resolver;
map->wrk = worker;


+ 8
- 8
src/libutil/map_private.h View File

@@ -54,7 +54,8 @@ enum fetch_proto {
*/
struct file_map_data {
gchar *filename;
struct stat st;
gboolean processed;
ev_stat st_ev;
};


@@ -130,7 +131,7 @@ struct rspamd_map {
map_fin_cb_t fin_callback;
map_dtor_t dtor;
void **user_data;
struct ev_loop *ev_base;
struct ev_loop *event_loop;
struct rspamd_worker *wrk;
gchar *description;
gchar *name;
@@ -143,7 +144,7 @@ struct rspamd_map {
gsize nelts;
guint64 digest;
/* Should we check HTTP or just load cached data */
struct timeval tv;
ev_tstamp timeout;
gdouble poll_timeout;
time_t next_check;
gboolean active_http;
@@ -164,7 +165,7 @@ enum rspamd_map_http_stage {
struct map_periodic_cbdata {
struct rspamd_map *map;
struct map_cb_data cbdata;
struct event ev;
ev_timer ev;
gboolean need_modify;
gboolean errored;
gboolean locked;
@@ -183,7 +184,7 @@ struct rspamd_http_file_data {
};

struct http_callback_data {
struct ev_loop *ev_base;
struct ev_loop *event_loop;
struct rspamd_http_connection *conn;
rspamd_inet_addr_t *addr;
struct rspamd_map *map;
@@ -191,16 +192,15 @@ struct http_callback_data {
struct http_map_data *data;
struct map_periodic_cbdata *periodic;
struct rspamd_cryptobox_pubkey *pk;
gboolean check;
struct rspamd_storage_shmem *shmem_data;
struct rspamd_storage_shmem *shmem_sig;
struct rspamd_storage_shmem *shmem_pubkey;
gsize data_len;
gsize sig_len;
gsize pubkey_len;
gboolean check;
enum rspamd_map_http_stage stage;
struct timeval tv;
ev_tstamp timeout;

ref_entry_t ref;
};

Loading…
Cancel
Save