aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2009-07-21 18:50:45 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2009-07-21 18:50:45 +0400
commitecc3b51cfdd7aecdb7a02791424d8e8cfcd22453 (patch)
tree6ae84e113c6c91b0577242aa4c8d4c0bad78214e
parent1094cced952ce0565dde55c75f318124d0f84f3e (diff)
downloadrspamd-ecc3b51cfdd7aecdb7a02791424d8e8cfcd22453.tar.gz
rspamd-ecc3b51cfdd7aecdb7a02791424d8e8cfcd22453.zip
* Add http maps support
-rw-r--r--src/lmtp_proto.c2
-rw-r--r--src/main.c2
-rw-r--r--src/map.c383
-rw-r--r--src/map.h6
-rw-r--r--src/plugins/emails.c2
-rw-r--r--src/plugins/surbl.c2
-rw-r--r--src/util.c6
-rw-r--r--src/util.h2
8 files changed, 376 insertions, 29 deletions
diff --git a/src/lmtp_proto.c b/src/lmtp_proto.c
index 69f343aaf..62967c03b 100644
--- a/src/lmtp_proto.c
+++ b/src/lmtp_proto.c
@@ -426,7 +426,7 @@ lmtp_deliver_mta (struct worker_task *task)
sock = make_unix_socket (task->cfg->deliver_host, un, FALSE);
}
else {
- sock = make_tcp_socket (&task->cfg->deliver_addr, task->cfg->deliver_port, FALSE);
+ sock = make_tcp_socket (&task->cfg->deliver_addr, task->cfg->deliver_port, FALSE, TRUE);
}
if (sock == -1) {
msg_warn ("lmtp_deliver_mta: cannot create socket for %s, %s", task->cfg->deliver_host, strerror (errno));
diff --git a/src/main.c b/src/main.c
index 6052936ed..c9c0fdfa6 100644
--- a/src/main.c
+++ b/src/main.c
@@ -367,7 +367,7 @@ create_listen_socket (struct in_addr *addr, int port, int family, char *path)
struct sockaddr_un *un_addr;
/* Create listen socket */
if (family == AF_INET) {
- if ((listen_sock = make_tcp_socket (addr, port, TRUE)) == -1) {
+ if ((listen_sock = make_tcp_socket (addr, port, TRUE, TRUE)) == -1) {
msg_err ("create_listen_socket: cannot create tcp listen socket. %s", strerror (errno));
}
}
diff --git a/src/map.c b/src/map.c
index a1ba5251d..df6ff8960 100644
--- a/src/map.c
+++ b/src/map.c
@@ -36,16 +36,232 @@ static memory_pool_t *map_pool = NULL;
static GList *maps = NULL;
static char *hash_fill = "1";
+/* Http reply */
+struct http_reply {
+ int code;
+ GHashTable *headers;
+ char *cur_header;
+
+ int parser_state;
+};
+
+struct http_callback_data {
+ struct event ev;
+ struct timeval tv;
+ struct rspamd_map *map;
+ struct http_map_data *data;
+ struct http_reply *reply;
+ struct map_cb_data cbdata;
+
+ int state;
+ int fd;
+};
+
/* Value in seconds after whitch we would try to do stat on list file */
#define MON_TIMEOUT 10
+/* HTTP timeouts */
+#define HTTP_CONNECT_TIMEOUT 2
+#define HTTP_READ_TIMEOUT 10
+
+static int
+connect_http (struct rspamd_map *map, struct http_map_data *data, gboolean is_async)
+{
+ int sock;
+
+ if ((sock = make_tcp_socket (&data->addr, data->port, FALSE, is_async)) == -1) {
+ msg_info ("connect_http: cannot connect to http server %s: %d, %s", data->host, errno, strerror (errno));
+ return -1;
+ }
+
+ return sock;
+}
+
+static void
+write_http_request (struct rspamd_map *map, struct http_map_data *data, int sock)
+{
+ char outbuf[BUFSIZ];
+ int r;
+
+ r = snprintf (outbuf, sizeof (outbuf), "GET %s%s HTTP/1.1" CRLF
+ "Connection: close" CRLF
+ "Host: %s" CRLF, (*data->path == '/') ? "" : "/",
+ data->path, data->host);
+ if (data->last_checked != 0) {
+ r += snprintf (outbuf + r, sizeof (outbuf) - r, "If-Modified-Since: %s" CRLF, asctime (gmtime (&data->last_checked)));
+ }
+
+ r += snprintf (outbuf + r, sizeof (outbuf) - r, CRLF);
+
+ if (write (sock, outbuf, r) == -1) {
+ msg_err ("write_http_request: failed to write request: %d, %s", errno, strerror (errno));
+ }
+}
+
+static u_char *
+parse_http_reply (u_char *chunk, size_t len, struct http_reply *reply)
+{
+ u_char *s, *p, *err_str, *tmp;
+ p = chunk;
+
+ while (p - chunk < len) {
+ switch (reply->parser_state) {
+ /* Search status code */
+ case 0:
+ /* Search for status code */
+ if (*p != ' ') {
+ p ++;
+ }
+ else {
+ /* Try to parse HTTP reply code */
+ reply->code = strtoul (++p, (char **)&err_str, 10);
+ if (*err_str != ' ') {
+ msg_info ("parse_http_reply: error while reading HTTP status code: %s", p);
+ return NULL;
+ }
+ /* Now skip to end of status string */
+ reply->parser_state = 1;
+ continue;
+ }
+ break;
+ /* Skip to end of line */
+ case 1:
+ if (*p == '\n') {
+ /* Switch to read header state */
+ reply->parser_state = 2;
+ }
+ /* Each skipped symbol is proceeded */
+ s = ++p;
+ break;
+ /* Read header value */
+ case 2:
+ if (*p == ':') {
+ reply->cur_header = g_malloc (p - s + 1);
+ g_strlcpy (reply->cur_header, s, p - s + 1);
+ reply->parser_state = 3;
+ }
+ else if (*p == '\r' && *(p + 1) == '\n') {
+ /* Last empty line */
+ reply->parser_state = 5;
+ }
+ p ++;
+ break;
+ /* Skip spaces after header name */
+ case 3:
+ if (*p != ' ') {
+ s = p;
+ reply->parser_state = 4;
+ }
+ else {
+ p ++;
+ }
+ break;
+ /* Read header value */
+ case 4:
+ if (*p == '\r') {
+ if (reply->cur_header != NULL) {
+ tmp = g_malloc (p - s + 1);
+ g_strlcpy (tmp, s, p - s + 1);
+ g_hash_table_insert (reply->headers, reply->cur_header, tmp);
+ reply->cur_header = NULL;
+ }
+ reply->parser_state = 1;
+ }
+ p ++;
+ break;
+ case 5:
+ /* Set pointer to begining of HTTP body */
+ p ++;
+ s = p;
+ reply->parser_state = 6;
+ break;
+ case 6:
+ /* Headers parsed, just return */
+ return p;
+ break;
+ }
+ }
+
+ return s;
+}
+
+static gboolean
+read_http_common (struct rspamd_map *map, struct http_map_data *data, struct http_reply *reply, struct map_cb_data *cbdata, int fd)
+{
+ u_char buf[BUFSIZ], *remain;
+ int rlen;
+ ssize_t r;
+
+ rlen = 0;
+ if ((r = read (fd, buf + rlen, sizeof (buf) - rlen - 1)) > 0) {
+ buf[r ++] = '\0';
+ remain = parse_http_reply (buf, r - 1, reply);
+ if (remain != NULL && remain != buf) {
+ /* copy remaining buffer to start of buffer */
+ rlen = r - (remain - buf);
+ memmove (buf, remain, rlen);
+ }
+ if (reply->parser_state == 6) {
+ if (reply->code != 200 && reply->code != 304) {
+ msg_err ("read_http: got error reply from server %s, %d", data->host, reply->code);
+ return FALSE;
+ }
+ remain = map->read_callback (map->pool, buf, r - 1, cbdata);
+ if (remain != NULL && remain != buf) {
+ /* copy remaining buffer to start of buffer */
+ rlen = r - (remain - buf);
+ memmove (buf, remain, rlen);
+ }
+ }
+ }
+ return FALSE;
+}
+
+static void
+read_http_sync (struct rspamd_map *map, struct http_map_data *data)
+{
+ struct map_cb_data cbdata;
+ int fd;
+ struct http_reply *repl;
+
+ if (map->read_callback == NULL || map->fin_callback == NULL) {
+ msg_err ("read_map_file: bad callback for reading map file");
+ return;
+ }
+
+ /* Connect synced */
+ if ((fd = connect_http (map, data, FALSE)) == -1) {
+ return;
+ }
+ write_http_request (map, data, fd);
+
+ cbdata.state = 0;
+ cbdata.prev_data = *map->user_data;
+ cbdata.cur_data = NULL;
+
+ repl = g_malloc (sizeof (struct http_reply));
+ repl->parser_state = 0;
+ repl->code = 404;
+ repl->headers = g_hash_table_new_full (rspamd_strcase_hash, rspamd_strcase_equal, g_free, g_free);
+
+ while (read_http_common (map, data, repl, &cbdata, fd));
+
+ close (fd);
+
+ map->fin_callback (map->pool, &cbdata);
+ *map->user_data = cbdata.cur_data;
+ data->last_checked = time (NULL);
+
+ g_hash_table_destroy (repl->headers);
+ g_free (repl);
+}
static void
read_map_file (struct rspamd_map *map, struct file_map_data *data)
{
struct map_cb_data cbdata;
- char buf[BUFSIZ];
+ u_char buf[BUFSIZ], *remain;
ssize_t r;
- int fd;
+ int fd, rlen;
if (map->read_callback == NULL || map->fin_callback == NULL) {
msg_err ("read_map_file: bad callback for reading map file");
@@ -60,10 +276,16 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data)
cbdata.state = 0;
cbdata.prev_data = *map->user_data;
cbdata.cur_data = NULL;
-
- while ((r = read (fd, buf, sizeof (buf) - 1)) > 0) {
+
+ rlen = 0;
+ while ((r = read (fd, buf + rlen, sizeof (buf) - rlen - 1)) > 0) {
buf[r ++] = '\0';
- map->read_callback (map->pool, buf, r, &cbdata);
+ remain = map->read_callback (map->pool, buf, r - 1, &cbdata);
+ if (remain != NULL) {
+ /* copy remaining buffer to start of buffer */
+ rlen = r - (remain - buf);
+ memmove (buf, remain, rlen);
+ }
}
close (fd);
@@ -158,7 +380,7 @@ add_map (const char *map_line, map_cb_t read_callback, map_fin_cb_t fin_callback
}
}
/* Now try to connect */
- if ((s = make_tcp_socket (&hdata->addr, hdata->port, FALSE)) == -1) {
+ if ((s = make_tcp_socket (&hdata->addr, hdata->port, FALSE, FALSE)) == -1) {
msg_info ("add_map: cannot connect to http server %s: %d, %s", hdata->host, errno, strerror (errno));
return FALSE;
}
@@ -175,17 +397,18 @@ add_map (const char *map_line, map_cb_t read_callback, map_fin_cb_t fin_callback
typedef void (*insert_func)(gpointer st, gconstpointer key, gpointer value);
-static gboolean
+static u_char*
abstract_parse_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data, insert_func func)
{
- u_char *s, *p, *str;
+ u_char *s, *p, *str, *start;
p = chunk;
+ start = p;
str = g_malloc (len + 1);
s = str;
- while (*p) {
+ while (p - chunk < len) {
switch (data->state) {
/* READ_SYMBOL */
case 0:
@@ -193,8 +416,11 @@ abstract_parse_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_
if (s != str) {
*s = '\0';
s = memory_pool_strdup (pool, str);
- func (data->cur_data, s, hash_fill);
+ if (strlen (s) > 0) {
+ func (data->cur_data, s, hash_fill);
+ }
s = str;
+ start = p;
}
data->state = 1;
}
@@ -202,8 +428,11 @@ abstract_parse_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_
if (s != str) {
*s = '\0';
s = memory_pool_strdup (pool, str);
- func (data->cur_data, s, hash_fill);
+ if (strlen (s) > 0) {
+ func (data->cur_data, s, hash_fill);
+ }
s = str;
+ start = p;
}
while (*p == '\r' || *p == '\n') {
p ++;
@@ -225,6 +454,7 @@ abstract_parse_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_
p ++;
}
s = str;
+ start = p;
data->state = 0;
}
else {
@@ -236,7 +466,7 @@ abstract_parse_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_
g_free (str);
- return TRUE;
+ return start;
}
static void
@@ -280,13 +510,13 @@ radix_tree_insert_helper (gpointer st, gconstpointer key, gpointer value)
}
}
-void
+u_char *
read_host_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data)
{
if (data->cur_data == NULL) {
data->cur_data = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal);
}
- (void)abstract_parse_list (pool, chunk, len, data, (insert_func)g_hash_table_insert);
+ return abstract_parse_list (pool, chunk, len, data, (insert_func)g_hash_table_insert);
}
void
@@ -297,13 +527,13 @@ fin_host_list (memory_pool_t *pool, struct map_cb_data *data)
}
}
-void
+u_char *
read_radix_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data)
{
if (data->cur_data == NULL) {
data->cur_data = radix_tree_create ();
}
- (void)abstract_parse_list (pool, chunk, len, data, (insert_func)radix_tree_insert_helper);
+ return abstract_parse_list (pool, chunk, len, data, (insert_func)radix_tree_insert_helper);
}
void
@@ -338,6 +568,117 @@ file_callback (int fd, short what, void *ud)
read_map_file (map, data);
}
+static void
+free_http_cbdata (struct http_callback_data *cbd)
+{
+ if (cbd->reply) {
+ g_hash_table_destroy (cbd->reply->headers);
+ g_free (cbd->reply);
+ }
+ event_del (&cbd->ev);
+ close (cbd->fd);
+ g_free (cbd);
+}
+
+static void
+http_async_callback (int fd, short what, void *ud)
+{
+ struct http_callback_data *cbd = ud;
+
+ /* Begin of connection */
+ if (what == EV_WRITE) {
+ if (cbd->state == 0) {
+ /* Can write request */
+ write_http_request (cbd->map, cbd->data, fd);
+ /* Plan reading */
+ event_set (&cbd->ev, cbd->fd, EV_READ | EV_PERSIST, http_async_callback, cbd);
+ cbd->tv.tv_sec = HTTP_READ_TIMEOUT;
+ cbd->tv.tv_usec = 0;
+ cbd->state = 1;
+ /* Allocate reply structure */
+ cbd->reply = g_malloc (sizeof (struct http_reply));
+ cbd->reply->parser_state = 0;
+ cbd->reply->code = 404;
+ cbd->reply->headers = g_hash_table_new_full (rspamd_strcase_hash, rspamd_strcase_equal, g_free, g_free);
+ cbd->cbdata.state = 0;
+ cbd->cbdata.prev_data = *cbd->map->user_data;
+ cbd->cbdata.cur_data = NULL;
+
+ event_add (&cbd->ev, &cbd->tv);
+ }
+ else {
+ msg_err ("http_async_callback: bad state when got write readiness");
+ free_http_cbdata (cbd);
+ return;
+ }
+ }
+ else if (what == EV_READ) {
+ if (cbd->state >= 1) {
+ if (!read_http_common (cbd->map, cbd->data, cbd->reply, &cbd->cbdata, cbd->fd)) {
+ /* Handle Not-Modified in a special way */
+ if (cbd->reply->code == 304) {
+ cbd->data->last_checked = time (NULL);
+ msg_info ("http_async_callback: data is not modified for server %s", cbd->data->host);
+ }
+ else if (cbd->cbdata.cur_data != NULL) {
+ cbd->map->fin_callback (cbd->map->pool, &cbd->cbdata);
+ *cbd->map->user_data = cbd->cbdata.cur_data;
+ cbd->data->last_checked = time (NULL);
+ }
+ if (cbd->state == 1 && cbd->reply->code == 200) {
+ /* Write to log that data is modified */
+ msg_info ("http_async_callback: rereading map data from %s", cbd->data->host);
+ }
+
+ free_http_cbdata (cbd);
+ return;
+ }
+ else if (cbd->state == 1) {
+ /* Write to log that data is modified */
+ msg_info ("http_async_callback: rereading map data from %s", cbd->data->host);
+ }
+ cbd->state = 2;
+ }
+ }
+ else {
+ msg_err ("http_async_callback: connection with http server terminated incorrectly");
+ free_http_cbdata (cbd);
+ }
+}
+
+
+static void
+http_callback (int fd, short what, void *ud)
+{
+ struct rspamd_map *map = ud;
+ struct http_map_data *data = map->map_data;
+ int sock;
+ struct http_callback_data *cbd;
+
+ /* Plan event again with jitter */
+ evtimer_del (&map->ev);
+ map->tv.tv_sec = MON_TIMEOUT + MON_TIMEOUT * g_random_double ();
+ map->tv.tv_usec = 0;
+ evtimer_add (&map->ev, &map->tv);
+
+ /* Connect asynced */
+ if ((sock = connect_http (map, data, TRUE)) == -1) {
+ return;
+ }
+ else {
+ /* Plan event */
+ cbd = g_malloc (sizeof (struct http_callback_data));
+ event_set (&cbd->ev, sock, EV_WRITE, http_async_callback, cbd);
+ cbd->tv.tv_sec = HTTP_CONNECT_TIMEOUT;
+ cbd->tv.tv_usec = 0;
+ cbd->map = map;
+ cbd->data = data;
+ cbd->state = 0;
+ cbd->fd = sock;
+ event_add (&cbd->ev, &cbd->tv);
+ }
+}
+
/* Start watching event for all maps */
void
start_map_watch (void)
@@ -357,8 +698,14 @@ start_map_watch (void)
map->tv.tv_usec = 0;
evtimer_add (&map->ev, &map->tv);
}
- else {
- /* XXX */
+ else if (map->protocol == PROTO_HTTP) {
+ evtimer_set (&map->ev, http_callback, map);
+ /* Read initial data */
+ read_http_sync (map, map->map_data);
+ /* Plan event with jitter */
+ map->tv.tv_sec = MON_TIMEOUT + MON_TIMEOUT * g_random_double ();
+ map->tv.tv_usec = 0;
+ evtimer_add (&map->ev, &map->tv);
}
cur = g_list_next (cur);
}
diff --git a/src/map.h b/src/map.h
index dfd63a06d..7a207c28e 100644
--- a/src/map.h
+++ b/src/map.h
@@ -29,7 +29,7 @@ struct http_map_data {
time_t last_checked;
};
-typedef void (*map_cb_t)(memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data);
+typedef u_char* (*map_cb_t)(memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data);
typedef void (*map_fin_cb_t)(memory_pool_t *pool, struct map_cb_data *data);
struct rspamd_map {
@@ -47,9 +47,9 @@ gboolean add_map (const char *map_line, map_cb_t read_callback, map_fin_cb_t fin
void start_map_watch (void);
/* Common callbacks */
-void read_radix_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data);
+u_char* read_radix_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data);
void fin_radix_list (memory_pool_t *pool, struct map_cb_data *data);
-void read_host_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data);
+u_char* read_host_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data);
void fin_host_list (memory_pool_t *pool, struct map_cb_data *data);
#endif
diff --git a/src/plugins/emails.c b/src/plugins/emails.c
index e08d3ff8f..719350e05 100644
--- a/src/plugins/emails.c
+++ b/src/plugins/emails.c
@@ -68,7 +68,7 @@ emails_module_init (struct config_file *cfg, struct module_ctx **ctx)
email_module_ctx->filter = emails_mime_filter;
email_module_ctx->email_pool = memory_pool_new (memory_pool_get_size ());
email_module_ctx->email_re = g_regex_new (email_re_text, G_REGEX_RAW | G_REGEX_OPTIMIZE | G_REGEX_CASELESS, 0, &err);
- email_module_ctx->blacklist = g_hash_table_new (g_str_hash, g_str_equal);
+ email_module_ctx->blacklist = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal);
*ctx = (struct module_ctx *)email_module_ctx;
diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c
index 452f8749a..1b3ae8712 100644
--- a/src/plugins/surbl.c
+++ b/src/plugins/surbl.c
@@ -705,7 +705,7 @@ register_redirector_call (struct uri *url, struct worker_task *task, GTree *url_
struct redirector_param *param;
struct timeval *timeout;
- s = make_tcp_socket (&surbl_module_ctx->redirector_addr, surbl_module_ctx->redirector_port, FALSE);
+ s = make_tcp_socket (&surbl_module_ctx->redirector_addr, surbl_module_ctx->redirector_port, FALSE, TRUE);
if (s == -1) {
msg_info ("register_redirector_call: <%s> cannot create tcp socket failed: %s",
diff --git a/src/util.c b/src/util.c
index 8001fcb18..364e1b0a4 100644
--- a/src/util.c
+++ b/src/util.c
@@ -50,7 +50,7 @@ make_socket_nonblocking (int fd)
}
int
-make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server)
+make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server, gboolean async)
{
int fd, r, optlen, on = 1, s_error;
int serrno;
@@ -63,7 +63,7 @@ make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server)
return -1;
}
- if (make_socket_nonblocking(fd) < 0) {
+ if (async && make_socket_nonblocking(fd) < 0) {
goto out;
}
@@ -87,7 +87,7 @@ make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server)
}
if (r == -1) {
- if (errno != EINPROGRESS) {
+ if (!async || errno != EINPROGRESS) {
msg_warn ("make_tcp_socket: bind/connect failed: %d, '%s'", errno, strerror (errno));
goto out;
}
diff --git a/src/util.h b/src/util.h
index aa08bcc3a..d7665d462 100644
--- a/src/util.h
+++ b/src/util.h
@@ -10,7 +10,7 @@ struct rspamd_main;
struct workq;
/* Create socket and bind or connect it to specified address and port */
-int make_tcp_socket (struct in_addr *, u_short, gboolean is_server);
+int make_tcp_socket (struct in_addr *, u_short, gboolean is_server, gboolean async);
/* Accept from socket */
int accept_from_socket (int listen_sock, struct sockaddr *addr, socklen_t *len);
/* Create and bind or connect unix socket */