From ecc3b51cfdd7aecdb7a02791424d8e8cfcd22453 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Tue, 21 Jul 2009 18:50:45 +0400 Subject: [PATCH] * Add http maps support --- src/lmtp_proto.c | 2 +- src/main.c | 2 +- src/map.c | 383 +++++++++++++++++++++++++++++++++++++++++-- src/map.h | 6 +- src/plugins/emails.c | 2 +- src/plugins/surbl.c | 2 +- src/util.c | 6 +- src/util.h | 2 +- 8 files changed, 376 insertions(+), 29 deletions(-) diff --git a/src/lmtp_proto.c b/src/lmtp_proto.c index 69f343aaf..62967c03b 100644 --- a/src/lmtp_proto.c +++ b/src/lmtp_proto.c @@ -426,7 +426,7 @@ lmtp_deliver_mta (struct worker_task *task) sock = make_unix_socket (task->cfg->deliver_host, un, FALSE); } else { - sock = make_tcp_socket (&task->cfg->deliver_addr, task->cfg->deliver_port, FALSE); + sock = make_tcp_socket (&task->cfg->deliver_addr, task->cfg->deliver_port, FALSE, TRUE); } if (sock == -1) { msg_warn ("lmtp_deliver_mta: cannot create socket for %s, %s", task->cfg->deliver_host, strerror (errno)); diff --git a/src/main.c b/src/main.c index 6052936ed..c9c0fdfa6 100644 --- a/src/main.c +++ b/src/main.c @@ -367,7 +367,7 @@ create_listen_socket (struct in_addr *addr, int port, int family, char *path) struct sockaddr_un *un_addr; /* Create listen socket */ if (family == AF_INET) { - if ((listen_sock = make_tcp_socket (addr, port, TRUE)) == -1) { + if ((listen_sock = make_tcp_socket (addr, port, TRUE, TRUE)) == -1) { msg_err ("create_listen_socket: cannot create tcp listen socket. %s", strerror (errno)); } } diff --git a/src/map.c b/src/map.c index a1ba5251d..df6ff8960 100644 --- a/src/map.c +++ b/src/map.c @@ -36,16 +36,232 @@ static memory_pool_t *map_pool = NULL; static GList *maps = NULL; static char *hash_fill = "1"; +/* Http reply */ +struct http_reply { + int code; + GHashTable *headers; + char *cur_header; + + int parser_state; +}; + +struct http_callback_data { + struct event ev; + struct timeval tv; + struct rspamd_map *map; + struct http_map_data *data; + struct http_reply *reply; + struct map_cb_data cbdata; + + int state; + int fd; +}; + /* Value in seconds after whitch we would try to do stat on list file */ #define MON_TIMEOUT 10 +/* HTTP timeouts */ +#define HTTP_CONNECT_TIMEOUT 2 +#define HTTP_READ_TIMEOUT 10 + +static int +connect_http (struct rspamd_map *map, struct http_map_data *data, gboolean is_async) +{ + int sock; + + if ((sock = make_tcp_socket (&data->addr, data->port, FALSE, is_async)) == -1) { + msg_info ("connect_http: cannot connect to http server %s: %d, %s", data->host, errno, strerror (errno)); + return -1; + } + + return sock; +} + +static void +write_http_request (struct rspamd_map *map, struct http_map_data *data, int sock) +{ + char outbuf[BUFSIZ]; + int r; + + r = snprintf (outbuf, sizeof (outbuf), "GET %s%s HTTP/1.1" CRLF + "Connection: close" CRLF + "Host: %s" CRLF, (*data->path == '/') ? "" : "/", + data->path, data->host); + if (data->last_checked != 0) { + r += snprintf (outbuf + r, sizeof (outbuf) - r, "If-Modified-Since: %s" CRLF, asctime (gmtime (&data->last_checked))); + } + + r += snprintf (outbuf + r, sizeof (outbuf) - r, CRLF); + + if (write (sock, outbuf, r) == -1) { + msg_err ("write_http_request: failed to write request: %d, %s", errno, strerror (errno)); + } +} + +static u_char * +parse_http_reply (u_char *chunk, size_t len, struct http_reply *reply) +{ + u_char *s, *p, *err_str, *tmp; + p = chunk; + + while (p - chunk < len) { + switch (reply->parser_state) { + /* Search status code */ + case 0: + /* Search for status code */ + if (*p != ' ') { + p ++; + } + else { + /* Try to parse HTTP reply code */ + reply->code = strtoul (++p, (char **)&err_str, 10); + if (*err_str != ' ') { + msg_info ("parse_http_reply: error while reading HTTP status code: %s", p); + return NULL; + } + /* Now skip to end of status string */ + reply->parser_state = 1; + continue; + } + break; + /* Skip to end of line */ + case 1: + if (*p == '\n') { + /* Switch to read header state */ + reply->parser_state = 2; + } + /* Each skipped symbol is proceeded */ + s = ++p; + break; + /* Read header value */ + case 2: + if (*p == ':') { + reply->cur_header = g_malloc (p - s + 1); + g_strlcpy (reply->cur_header, s, p - s + 1); + reply->parser_state = 3; + } + else if (*p == '\r' && *(p + 1) == '\n') { + /* Last empty line */ + reply->parser_state = 5; + } + p ++; + break; + /* Skip spaces after header name */ + case 3: + if (*p != ' ') { + s = p; + reply->parser_state = 4; + } + else { + p ++; + } + break; + /* Read header value */ + case 4: + if (*p == '\r') { + if (reply->cur_header != NULL) { + tmp = g_malloc (p - s + 1); + g_strlcpy (tmp, s, p - s + 1); + g_hash_table_insert (reply->headers, reply->cur_header, tmp); + reply->cur_header = NULL; + } + reply->parser_state = 1; + } + p ++; + break; + case 5: + /* Set pointer to begining of HTTP body */ + p ++; + s = p; + reply->parser_state = 6; + break; + case 6: + /* Headers parsed, just return */ + return p; + break; + } + } + + return s; +} + +static gboolean +read_http_common (struct rspamd_map *map, struct http_map_data *data, struct http_reply *reply, struct map_cb_data *cbdata, int fd) +{ + u_char buf[BUFSIZ], *remain; + int rlen; + ssize_t r; + + rlen = 0; + if ((r = read (fd, buf + rlen, sizeof (buf) - rlen - 1)) > 0) { + buf[r ++] = '\0'; + remain = parse_http_reply (buf, r - 1, reply); + if (remain != NULL && remain != buf) { + /* copy remaining buffer to start of buffer */ + rlen = r - (remain - buf); + memmove (buf, remain, rlen); + } + if (reply->parser_state == 6) { + if (reply->code != 200 && reply->code != 304) { + msg_err ("read_http: got error reply from server %s, %d", data->host, reply->code); + return FALSE; + } + remain = map->read_callback (map->pool, buf, r - 1, cbdata); + if (remain != NULL && remain != buf) { + /* copy remaining buffer to start of buffer */ + rlen = r - (remain - buf); + memmove (buf, remain, rlen); + } + } + } + return FALSE; +} + +static void +read_http_sync (struct rspamd_map *map, struct http_map_data *data) +{ + struct map_cb_data cbdata; + int fd; + struct http_reply *repl; + + if (map->read_callback == NULL || map->fin_callback == NULL) { + msg_err ("read_map_file: bad callback for reading map file"); + return; + } + + /* Connect synced */ + if ((fd = connect_http (map, data, FALSE)) == -1) { + return; + } + write_http_request (map, data, fd); + + cbdata.state = 0; + cbdata.prev_data = *map->user_data; + cbdata.cur_data = NULL; + + repl = g_malloc (sizeof (struct http_reply)); + repl->parser_state = 0; + repl->code = 404; + repl->headers = g_hash_table_new_full (rspamd_strcase_hash, rspamd_strcase_equal, g_free, g_free); + + while (read_http_common (map, data, repl, &cbdata, fd)); + + close (fd); + + map->fin_callback (map->pool, &cbdata); + *map->user_data = cbdata.cur_data; + data->last_checked = time (NULL); + + g_hash_table_destroy (repl->headers); + g_free (repl); +} static void read_map_file (struct rspamd_map *map, struct file_map_data *data) { struct map_cb_data cbdata; - char buf[BUFSIZ]; + u_char buf[BUFSIZ], *remain; ssize_t r; - int fd; + int fd, rlen; if (map->read_callback == NULL || map->fin_callback == NULL) { msg_err ("read_map_file: bad callback for reading map file"); @@ -60,10 +276,16 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data) cbdata.state = 0; cbdata.prev_data = *map->user_data; cbdata.cur_data = NULL; - - while ((r = read (fd, buf, sizeof (buf) - 1)) > 0) { + + rlen = 0; + while ((r = read (fd, buf + rlen, sizeof (buf) - rlen - 1)) > 0) { buf[r ++] = '\0'; - map->read_callback (map->pool, buf, r, &cbdata); + remain = map->read_callback (map->pool, buf, r - 1, &cbdata); + if (remain != NULL) { + /* copy remaining buffer to start of buffer */ + rlen = r - (remain - buf); + memmove (buf, remain, rlen); + } } close (fd); @@ -158,7 +380,7 @@ add_map (const char *map_line, map_cb_t read_callback, map_fin_cb_t fin_callback } } /* Now try to connect */ - if ((s = make_tcp_socket (&hdata->addr, hdata->port, FALSE)) == -1) { + if ((s = make_tcp_socket (&hdata->addr, hdata->port, FALSE, FALSE)) == -1) { msg_info ("add_map: cannot connect to http server %s: %d, %s", hdata->host, errno, strerror (errno)); return FALSE; } @@ -175,17 +397,18 @@ add_map (const char *map_line, map_cb_t read_callback, map_fin_cb_t fin_callback typedef void (*insert_func)(gpointer st, gconstpointer key, gpointer value); -static gboolean +static u_char* abstract_parse_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data, insert_func func) { - u_char *s, *p, *str; + u_char *s, *p, *str, *start; p = chunk; + start = p; str = g_malloc (len + 1); s = str; - while (*p) { + while (p - chunk < len) { switch (data->state) { /* READ_SYMBOL */ case 0: @@ -193,8 +416,11 @@ abstract_parse_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_ if (s != str) { *s = '\0'; s = memory_pool_strdup (pool, str); - func (data->cur_data, s, hash_fill); + if (strlen (s) > 0) { + func (data->cur_data, s, hash_fill); + } s = str; + start = p; } data->state = 1; } @@ -202,8 +428,11 @@ abstract_parse_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_ if (s != str) { *s = '\0'; s = memory_pool_strdup (pool, str); - func (data->cur_data, s, hash_fill); + if (strlen (s) > 0) { + func (data->cur_data, s, hash_fill); + } s = str; + start = p; } while (*p == '\r' || *p == '\n') { p ++; @@ -225,6 +454,7 @@ abstract_parse_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_ p ++; } s = str; + start = p; data->state = 0; } else { @@ -236,7 +466,7 @@ abstract_parse_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_ g_free (str); - return TRUE; + return start; } static void @@ -280,13 +510,13 @@ radix_tree_insert_helper (gpointer st, gconstpointer key, gpointer value) } } -void +u_char * read_host_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data) { if (data->cur_data == NULL) { data->cur_data = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal); } - (void)abstract_parse_list (pool, chunk, len, data, (insert_func)g_hash_table_insert); + return abstract_parse_list (pool, chunk, len, data, (insert_func)g_hash_table_insert); } void @@ -297,13 +527,13 @@ fin_host_list (memory_pool_t *pool, struct map_cb_data *data) } } -void +u_char * read_radix_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data) { if (data->cur_data == NULL) { data->cur_data = radix_tree_create (); } - (void)abstract_parse_list (pool, chunk, len, data, (insert_func)radix_tree_insert_helper); + return abstract_parse_list (pool, chunk, len, data, (insert_func)radix_tree_insert_helper); } void @@ -338,6 +568,117 @@ file_callback (int fd, short what, void *ud) read_map_file (map, data); } +static void +free_http_cbdata (struct http_callback_data *cbd) +{ + if (cbd->reply) { + g_hash_table_destroy (cbd->reply->headers); + g_free (cbd->reply); + } + event_del (&cbd->ev); + close (cbd->fd); + g_free (cbd); +} + +static void +http_async_callback (int fd, short what, void *ud) +{ + struct http_callback_data *cbd = ud; + + /* Begin of connection */ + if (what == EV_WRITE) { + if (cbd->state == 0) { + /* Can write request */ + write_http_request (cbd->map, cbd->data, fd); + /* Plan reading */ + event_set (&cbd->ev, cbd->fd, EV_READ | EV_PERSIST, http_async_callback, cbd); + cbd->tv.tv_sec = HTTP_READ_TIMEOUT; + cbd->tv.tv_usec = 0; + cbd->state = 1; + /* Allocate reply structure */ + cbd->reply = g_malloc (sizeof (struct http_reply)); + cbd->reply->parser_state = 0; + cbd->reply->code = 404; + cbd->reply->headers = g_hash_table_new_full (rspamd_strcase_hash, rspamd_strcase_equal, g_free, g_free); + cbd->cbdata.state = 0; + cbd->cbdata.prev_data = *cbd->map->user_data; + cbd->cbdata.cur_data = NULL; + + event_add (&cbd->ev, &cbd->tv); + } + else { + msg_err ("http_async_callback: bad state when got write readiness"); + free_http_cbdata (cbd); + return; + } + } + else if (what == EV_READ) { + if (cbd->state >= 1) { + if (!read_http_common (cbd->map, cbd->data, cbd->reply, &cbd->cbdata, cbd->fd)) { + /* Handle Not-Modified in a special way */ + if (cbd->reply->code == 304) { + cbd->data->last_checked = time (NULL); + msg_info ("http_async_callback: data is not modified for server %s", cbd->data->host); + } + else if (cbd->cbdata.cur_data != NULL) { + cbd->map->fin_callback (cbd->map->pool, &cbd->cbdata); + *cbd->map->user_data = cbd->cbdata.cur_data; + cbd->data->last_checked = time (NULL); + } + if (cbd->state == 1 && cbd->reply->code == 200) { + /* Write to log that data is modified */ + msg_info ("http_async_callback: rereading map data from %s", cbd->data->host); + } + + free_http_cbdata (cbd); + return; + } + else if (cbd->state == 1) { + /* Write to log that data is modified */ + msg_info ("http_async_callback: rereading map data from %s", cbd->data->host); + } + cbd->state = 2; + } + } + else { + msg_err ("http_async_callback: connection with http server terminated incorrectly"); + free_http_cbdata (cbd); + } +} + + +static void +http_callback (int fd, short what, void *ud) +{ + struct rspamd_map *map = ud; + struct http_map_data *data = map->map_data; + int sock; + struct http_callback_data *cbd; + + /* Plan event again with jitter */ + evtimer_del (&map->ev); + map->tv.tv_sec = MON_TIMEOUT + MON_TIMEOUT * g_random_double (); + map->tv.tv_usec = 0; + evtimer_add (&map->ev, &map->tv); + + /* Connect asynced */ + if ((sock = connect_http (map, data, TRUE)) == -1) { + return; + } + else { + /* Plan event */ + cbd = g_malloc (sizeof (struct http_callback_data)); + event_set (&cbd->ev, sock, EV_WRITE, http_async_callback, cbd); + cbd->tv.tv_sec = HTTP_CONNECT_TIMEOUT; + cbd->tv.tv_usec = 0; + cbd->map = map; + cbd->data = data; + cbd->state = 0; + cbd->fd = sock; + event_add (&cbd->ev, &cbd->tv); + } +} + /* Start watching event for all maps */ void start_map_watch (void) @@ -357,8 +698,14 @@ start_map_watch (void) map->tv.tv_usec = 0; evtimer_add (&map->ev, &map->tv); } - else { - /* XXX */ + else if (map->protocol == PROTO_HTTP) { + evtimer_set (&map->ev, http_callback, map); + /* Read initial data */ + read_http_sync (map, map->map_data); + /* Plan event with jitter */ + map->tv.tv_sec = MON_TIMEOUT + MON_TIMEOUT * g_random_double (); + map->tv.tv_usec = 0; + evtimer_add (&map->ev, &map->tv); } cur = g_list_next (cur); } diff --git a/src/map.h b/src/map.h index dfd63a06d..7a207c28e 100644 --- a/src/map.h +++ b/src/map.h @@ -29,7 +29,7 @@ struct http_map_data { time_t last_checked; }; -typedef void (*map_cb_t)(memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data); +typedef u_char* (*map_cb_t)(memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data); typedef void (*map_fin_cb_t)(memory_pool_t *pool, struct map_cb_data *data); struct rspamd_map { @@ -47,9 +47,9 @@ gboolean add_map (const char *map_line, map_cb_t read_callback, map_fin_cb_t fin void start_map_watch (void); /* Common callbacks */ -void read_radix_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data); +u_char* read_radix_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data); void fin_radix_list (memory_pool_t *pool, struct map_cb_data *data); -void read_host_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data); +u_char* read_host_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data); void fin_host_list (memory_pool_t *pool, struct map_cb_data *data); #endif diff --git a/src/plugins/emails.c b/src/plugins/emails.c index e08d3ff8f..719350e05 100644 --- a/src/plugins/emails.c +++ b/src/plugins/emails.c @@ -68,7 +68,7 @@ emails_module_init (struct config_file *cfg, struct module_ctx **ctx) email_module_ctx->filter = emails_mime_filter; email_module_ctx->email_pool = memory_pool_new (memory_pool_get_size ()); email_module_ctx->email_re = g_regex_new (email_re_text, G_REGEX_RAW | G_REGEX_OPTIMIZE | G_REGEX_CASELESS, 0, &err); - email_module_ctx->blacklist = g_hash_table_new (g_str_hash, g_str_equal); + email_module_ctx->blacklist = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal); *ctx = (struct module_ctx *)email_module_ctx; diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c index 452f8749a..1b3ae8712 100644 --- a/src/plugins/surbl.c +++ b/src/plugins/surbl.c @@ -705,7 +705,7 @@ register_redirector_call (struct uri *url, struct worker_task *task, GTree *url_ struct redirector_param *param; struct timeval *timeout; - s = make_tcp_socket (&surbl_module_ctx->redirector_addr, surbl_module_ctx->redirector_port, FALSE); + s = make_tcp_socket (&surbl_module_ctx->redirector_addr, surbl_module_ctx->redirector_port, FALSE, TRUE); if (s == -1) { msg_info ("register_redirector_call: <%s> cannot create tcp socket failed: %s", diff --git a/src/util.c b/src/util.c index 8001fcb18..364e1b0a4 100644 --- a/src/util.c +++ b/src/util.c @@ -50,7 +50,7 @@ make_socket_nonblocking (int fd) } int -make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server) +make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server, gboolean async) { int fd, r, optlen, on = 1, s_error; int serrno; @@ -63,7 +63,7 @@ make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server) return -1; } - if (make_socket_nonblocking(fd) < 0) { + if (async && make_socket_nonblocking(fd) < 0) { goto out; } @@ -87,7 +87,7 @@ make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server) } if (r == -1) { - if (errno != EINPROGRESS) { + if (!async || errno != EINPROGRESS) { msg_warn ("make_tcp_socket: bind/connect failed: %d, '%s'", errno, strerror (errno)); goto out; } diff --git a/src/util.h b/src/util.h index aa08bcc3a..d7665d462 100644 --- a/src/util.h +++ b/src/util.h @@ -10,7 +10,7 @@ struct rspamd_main; struct workq; /* Create socket and bind or connect it to specified address and port */ -int make_tcp_socket (struct in_addr *, u_short, gboolean is_server); +int make_tcp_socket (struct in_addr *, u_short, gboolean is_server, gboolean async); /* Accept from socket */ int accept_from_socket (int listen_sock, struct sockaddr *addr, socklen_t *len); /* Create and bind or connect unix socket */ -- 2.39.5