Browse Source

Start the complete HTTP maps rework

tags/1.2.0
Vsevolod Stakhov 8 years ago
parent
commit
20efe9b218
3 changed files with 141 additions and 128 deletions
  1. 125
    126
      src/libutil/map.c
  2. 4
    1
      src/libutil/map.h
  3. 12
    1
      src/libutil/map_private.h

+ 125
- 126
src/libutil/map.c View File

@@ -23,33 +23,10 @@
#include "rspamd.h"
#include "cryptobox.h"
#include "unix-std.h"
#include "http_parser.h"

static const gchar *hash_fill = "1";

/**
* Helper for HTTP connection establishment
*/
static gint
connect_http (struct rspamd_map *map,
struct http_map_data *data,
gboolean is_async)
{
gint sock;
rspamd_mempool_t *pool;

pool = map->pool;

if ((sock = rspamd_socket_tcp (data->addr, FALSE, is_async)) == -1) {
msg_info_pool ("cannot connect to http server %s: %d, %s",
data->host,
errno,
strerror (errno));
return -1;
}

return sock;
}

/**
* Write HTTP request
*/
@@ -70,7 +47,7 @@ write_http_request (struct http_callback_data *cbd)
rspamd_http_message_add_header (msg, "If-Modified-Since", datebuf);
}

rspamd_http_connection_write_message (cbd->data->conn, msg, cbd->data->host,
rspamd_http_connection_write_message (cbd->conn, msg, cbd->data->host,
NULL, cbd, cbd->fd, &cbd->tv, cbd->ev_base);
}

@@ -80,12 +57,11 @@ write_http_request (struct http_callback_data *cbd)
static void
free_http_cbdata (struct http_callback_data *cbd)
{
g_atomic_int_set (cbd->map->locked, 0);
if (cbd->remain_buf) {
g_string_free (cbd->remain_buf, TRUE);
}

rspamd_http_connection_reset (cbd->data->conn);
rspamd_http_connection_free (cbd->conn);
close (cbd->fd);
g_slice_free1 (sizeof (struct http_callback_data), cbd);
}
@@ -380,6 +356,58 @@ file_callback (gint fd, short what, void *ud)
g_atomic_int_set (map->locked, 0);
}


static void
rspamd_map_dns_callback (struct rdns_reply *reply, void *arg)
{
struct http_callback_data *cbd = arg;
rspamd_mempool_t *pool;

if (cbd->stage >= map_load_file) {
/* No need in further corrections */
return;
}

pool = cbd->map->pool;

if (reply->code == RDNS_RC_NOERROR) {
/*
* We just get the first address hoping that a resolver performs
* round-robin rotation well
*/
cbd->addr = rspamd_inet_address_from_rnds (reply->entries);


if (cbd->addr != NULL) {
rspamd_inet_address_set_port (cbd->addr, cbd->data->port);
/* Try to open a socket */
cbd->fd = rspamd_inet_address_connect (cbd->addr, SOCK_STREAM, TRUE);

if (cbd->fd != -1) {
cbd->stage = map_load_file;
cbd->conn = rspamd_http_connection_new (http_map_read,
http_map_error, http_map_finish,
RSPAMD_HTTP_BODY_PARTIAL|RSPAMD_HTTP_CLIENT_SIMPLE,
RSPAMD_HTTP_CLIENT, NULL);

write_http_request (cbd);
}
}
}

if (cbd->stage < map_load_file) {
if (cbd->stage == map_resolve_host2) {
/* We have still one request pending */
cbd->stage = map_resolve_host1;
}
else {
/* We could not resolve host, so cowardly fail here */
msg_err_pool ("cannot resolve %s", cbd->data->host);
free_http_cbdata (cbd);
}
}
}

/**
* Async HTTP callback
*/
@@ -388,55 +416,54 @@ http_callback (gint fd, short what, void *ud)
{
struct rspamd_map *map = ud;
struct http_map_data *data;
gint sock;
struct http_callback_data *cbd;
rspamd_mempool_t *pool;
gchar tmpbuf[PATH_MAX];

data = map->map_data;
pool = map->pool;

if (g_atomic_int_get (map->locked)) {
msg_info_pool (
"don't try to reread map as it is locked by other process, will reread it later");
if (data->conn->ud == NULL) {
jitter_timeout_event (map, TRUE, TRUE);
}
else {
jitter_timeout_event (map, TRUE, FALSE);
}
return;
}

g_atomic_int_inc (map->locked);
jitter_timeout_event (map, FALSE, FALSE);
/* Connect asynced */
if ((sock = connect_http (map, data, TRUE)) == -1) {
g_atomic_int_set (map->locked, 0);
/* Plan event */
cbd = g_slice_alloc (sizeof (struct http_callback_data));

rspamd_snprintf (tmpbuf, sizeof (tmpbuf),
"%s" G_DIR_SEPARATOR_S "rspamd_map%d-XXXXXX",
map->cfg->temp_dir, map->id);
cbd->out_fd = mkstemp (tmpbuf);

if (cbd->out_fd == -1) {
msg_err_pool ("cannot create tempfile: %s", strerror (errno));
return;
}
else {
/* Plan event */
cbd = g_slice_alloc (sizeof (struct http_callback_data));
cbd->ev_base = map->ev_base;
cbd->map = map;
cbd->data = data;
cbd->remain_buf = NULL;
cbd->cbdata.state = 0;
cbd->cbdata.prev_data = *cbd->map->user_data;
cbd->cbdata.cur_data = NULL;
cbd->cbdata.map = cbd->map;
cbd->tv.tv_sec = 5;
cbd->tv.tv_usec = 0;
cbd->fd = sock;
data->conn->ud = cbd;
msg_debug_pool ("reading map data from %s", data->host);
write_http_request (cbd);
}

cbd->tmpfile = g_strdup (tmpbuf);
cbd->ev_base = map->ev_base;
cbd->map = map;
cbd->data = data;
cbd->remain_buf = NULL;
cbd->cbdata.state = 0;
cbd->cbdata.prev_data = *cbd->map->user_data;
cbd->cbdata.cur_data = NULL;
cbd->cbdata.map = cbd->map;
cbd->stage = map_resolve_host2;
double_to_tv (map->cfg->map_timeout, &cbd->tv);

msg_debug_pool ("reading map data from %s", data->host);
/* Send both A and AAAA requests */
rdns_make_request_full (map->r->r, rspamd_map_dns_callback, cbd,
map->cfg->dns_timeout, map->cfg->dns_retransmits, 1,
RDNS_REQUEST_A, data->host);
rdns_make_request_full (map->r->r, rspamd_map_dns_callback, cbd,
map->cfg->dns_timeout, map->cfg->dns_retransmits, 1,
RDNS_REQUEST_AAAA, data->host);
}

/* Start watching event for all maps */
void
rspamd_map_watch (struct rspamd_config *cfg, struct event_base *ev_base)
rspamd_map_watch (struct rspamd_config *cfg,
struct rspamd_dns_resolver *resolver,
struct event_base *ev_base)
{
GList *cur = cfg->maps;
struct rspamd_map *map;
@@ -446,7 +473,9 @@ rspamd_map_watch (struct rspamd_config *cfg, struct event_base *ev_base)
while (cur) {
map = cur->data;
map->ev_base = ev_base;
map->r = resolver;
event_base_set (map->ev_base, &map->ev);

if (map->protocol == MAP_PROTO_FILE) {
evtimer_set (&map->ev, file_callback, map);
/* Read initial data */
@@ -462,6 +491,7 @@ rspamd_map_watch (struct rspamd_config *cfg, struct event_base *ev_base)
evtimer_set (&map->ev, http_callback, map);
jitter_timeout_event (map, FALSE, TRUE);
}

cur = g_list_next (cur);
}
}
@@ -580,13 +610,13 @@ rspamd_map_add (struct rspamd_config *cfg,
void **user_data)
{
struct rspamd_map *new_map;
const gchar *def, *p, *hostend;
const gchar *def;
struct file_map_data *fdata;
struct http_map_data *hdata;
gchar portbuf[6], *cksum_encoded, cksum[rspamd_cryptobox_HASHBYTES];
gint i, s, r;
struct addrinfo hints, *res;
gchar *cksum_encoded, cksum[rspamd_cryptobox_HASHBYTES];
rspamd_mempool_t *pool;
struct http_parser_url up;
rspamd_ftok_t tok;

if (cfg->map_pool == NULL) {
cfg->map_pool = rspamd_mempool_new (rspamd_mempool_suggest_size (),
@@ -644,71 +674,40 @@ rspamd_map_add (struct rspamd_config *cfg,
hdata =
rspamd_mempool_alloc0 (cfg->map_pool,
sizeof (struct http_map_data));
/* Try to search port */
if ((p = strchr (def, ':')) != NULL) {
hostend = p;
i = 0;
p++;
while (g_ascii_isdigit (*p) && i < (gint)sizeof (portbuf) - 1) {
portbuf[i++] = *p++;
}
if (*p != '/') {
msg_info_config ("bad http map definition: %s", def);
return FALSE;
}
portbuf[i] = '\0';
hdata->port = atoi (portbuf);

memset (&up, 0, sizeof (up));
if (http_parser_parse_url (new_map->uri, strlen (new_map->uri), TRUE,
&up) != 0) {
msg_err_config ("cannot parse HTTP url: %s", new_map->uri);
return FALSE;
}
else {
/* Default http port */
rspamd_snprintf (portbuf, sizeof (portbuf), "80");
hdata->port = 80;
/* Now separate host from path */
if ((p = strchr (def, '/')) == NULL) {
msg_info_config ("bad http map definition: %s", def);
if (!(up.field_set & 1 << UF_HOST)) {
msg_err_config ("cannot parse HTTP url: %s: no host", new_map->uri);
return FALSE;
}
hostend = p;
}
hdata->host = rspamd_mempool_alloc (cfg->map_pool, hostend - def + 1);
rspamd_strlcpy (hdata->host, def, hostend - def + 1);
hdata->path = rspamd_mempool_strdup (cfg->map_pool, p);
/* Now try to resolve */
memset (&hints, 0, sizeof (hints));
hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
hints.ai_socktype = SOCK_STREAM; /* Stream socket */
hints.ai_flags = 0;
hints.ai_protocol = 0; /* Any protocol */
hints.ai_canonname = NULL;
hints.ai_addr = NULL;
hints.ai_next = NULL;

if ((r = getaddrinfo (hdata->host, portbuf, &hints, &res)) == 0) {
hdata->addr = res;
rspamd_mempool_add_destructor (cfg->cfg_pool,
(rspamd_mempool_destruct_t)freeaddrinfo, hdata->addr);
}
else {
msg_err_config ("address resolution for %s failed: %s",
hdata->host,
gai_strerror (r));
return FALSE;
}
/* Now try to connect */
if ((s = rspamd_socket_tcp (hdata->addr, FALSE, FALSE)) == -1) {
msg_info_config ("cannot connect to http server %s: %d, %s",
hdata->host,
errno,
strerror (errno));
return FALSE;

tok.begin = new_map->uri + up.field_data[UF_HOST].off;
tok.len = up.field_data[UF_HOST].len;
hdata->host = rspamd_mempool_ftokdup (cfg->map_pool, &tok);

if (up.field_set & 1 << UF_PORT) {
hdata->port = up.port;
}
else {
hdata->port = 80;
}

if (up.field_set & 1 << UF_PATH) {
tok.begin = new_map->uri + up.field_data[UF_PATH].off;
tok.len = strlen (tok.begin);

hdata->path = rspamd_mempool_ftokdup (cfg->map_pool, &tok);
}
}
close (s);
hdata->conn = rspamd_http_connection_new (http_map_read, http_map_error,
http_map_finish,
RSPAMD_HTTP_BODY_PARTIAL | RSPAMD_HTTP_CLIENT_SIMPLE,
RSPAMD_HTTP_CLIENT, NULL);
new_map->map_data = hdata;

}

/* Temp pool */
rspamd_cryptobox_hash (cksum, new_map->uri, strlen (new_map->uri), NULL, 0);
cksum_encoded = rspamd_encode_base32 (cksum, sizeof (cksum));

+ 4
- 1
src/libutil/map.h View File

@@ -6,6 +6,7 @@

#include "mem_pool.h"
#include "radix.h"
#include "dns.h"

/**
* Maps API is designed to load lists data from different dynamic sources.
@@ -57,7 +58,9 @@ gboolean rspamd_map_add (struct rspamd_config *cfg,
/**
* Start watching of maps by adding events to libevent event loop
*/
void rspamd_map_watch (struct rspamd_config *cfg, struct event_base *ev_base);
void rspamd_map_watch (struct rspamd_config *cfg,
struct rspamd_dns_resolver *resolver,
struct event_base *ev_base);

/**
* Remove all maps watched (remove events)

+ 12
- 1
src/libutil/map_private.h View File

@@ -27,6 +27,7 @@ enum fetch_proto {
};
struct rspamd_map {
rspamd_mempool_t *pool;
struct rspamd_dns_resolver *r;
gboolean is_signed;
struct rspamd_cryptobox_pubkey *trusted_pubkey;
struct rspamd_config *cfg;
@@ -64,17 +65,27 @@ struct http_map_data {
gchar *host;
time_t last_checked;
gboolean request_sent;
struct rspamd_http_connection *conn;
};


struct http_callback_data {
struct event_base *ev_base;
struct rspamd_http_connection *conn;
rspamd_inet_addr_t *addr;
struct timeval tv;
struct rspamd_map *map;
struct http_map_data *data;
struct map_cb_data cbdata;
GString *remain_buf;
enum {
map_resolve_host2 = 0, /* 2 requests sent */
map_resolve_host1, /* 1 requests sent */
map_load_file,
map_load_pubkey,
map_load_signature
} stage;
gint out_fd;
gchar *tmpfile;
gint fd;
};


Loading…
Cancel
Save