]> source.dussan.org Git - rspamd.git/commitdiff
* Add http maps support
author Vsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 21 Jul 2009 14:50:45 +0000 (18:50 +0400)
committer Vsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 21 Jul 2009 14:50:45 +0000 (18:50 +0400)
src/lmtp_proto.c
src/main.c
src/map.c
src/map.h
src/plugins/emails.c
src/plugins/surbl.c
src/util.c
src/util.h

index 69f343aaf13c2665fb5b9da627f1749e3c33fed9..62967c03b95f0f8dc9654622916f180d50e00402 100644 (file)
@@ -426,7 +426,7 @@ lmtp_deliver_mta (struct worker_task *task)
                sock = make_unix_socket (task->cfg->deliver_host, un, FALSE);
        }
        else {
-               sock = make_tcp_socket (&task->cfg->deliver_addr, task->cfg->deliver_port, FALSE);
+               sock = make_tcp_socket (&task->cfg->deliver_addr, task->cfg->deliver_port, FALSE, TRUE);
        }
        if (sock == -1) {
                msg_warn ("lmtp_deliver_mta: cannot create socket for %s, %s", task->cfg->deliver_host, strerror (errno));
index 6052936edb4e2d2a5c173dc514c99a806e153d9e..c9c0fdfa6203a5e124df2747192860c8dbd34de5 100644 (file)
@@ -367,7 +367,7 @@ create_listen_socket (struct in_addr *addr, int port, int family, char *path)
        struct sockaddr_un *un_addr;
        /* Create listen socket */
        if (family == AF_INET) {
-               if ((listen_sock = make_tcp_socket (addr, port, TRUE)) == -1) {
+               if ((listen_sock = make_tcp_socket (addr, port, TRUE, TRUE)) == -1) {
                        msg_err ("create_listen_socket: cannot create tcp listen socket. %s", strerror (errno));
                }
        }
index a1ba5251d4f339a1873881ed6d8fef5954e90481..df6ff896036df4c8335e0a44d66a209bb033ad7e 100644 (file)
--- a/src/map.c
+++ b/src/map.c
@@ -36,16 +36,232 @@ static memory_pool_t *map_pool = NULL;
 static GList *maps = NULL;
 static char *hash_fill = "1";
 
+/* Http reply: status code and headers, filled in incrementally by parse_http_reply () */
+struct http_reply {
+       int code; /* numeric status code parsed from the status line */
+       GHashTable *headers; /* header name -> header value, both strings g_malloc'ed by the parser */
+       char *cur_header; /* name of the header whose value is being read; reset to NULL once stored in headers */
+
+       int parser_state; /* position of the parser state machine: 0 (status line) .. 6 (headers done) */
+};
+
+struct http_callback_data { /* state of one asynchronous HTTP map request, passed to http_async_callback () */
+       struct event ev; /* event registered on fd: EV_WRITE while connecting, then EV_READ | EV_PERSIST */
+       struct timeval tv; /* timeout given to event_add (): HTTP_CONNECT_TIMEOUT, later HTTP_READ_TIMEOUT */
+       struct rspamd_map *map; /* map being refreshed; supplies pool, callbacks and user_data */
+       struct http_map_data *data; /* HTTP specific data of the map (host, last_checked, ...) */
+       struct http_reply *reply; /* reply parser state, allocated once the request has been written */
+       struct map_cb_data cbdata; /* state handed to the map read_callback and fin_callback */
+       
+       int state; /* 0 - waiting for connect, 1 - request written, 2 - at least one chunk was read */
+       int fd; /* socket of this request, closed by free_http_cbdata () */
+};
+
 /* Value in seconds after whitch we would try to do stat on list file */
 #define MON_TIMEOUT 10
+/* HTTP timeouts */
+#define HTTP_CONNECT_TIMEOUT 2
+#define HTTP_READ_TIMEOUT 10
+
+/*
+ * Open a client TCP socket to the HTTP server described by data.
+ * is_async is forwarded to make_tcp_socket () as its last argument.
+ * Returns the socket, or -1 (after logging) when the socket could not be made.
+ */
+static int
+connect_http (struct rspamd_map *map, struct http_map_data *data, gboolean is_async)
+{
+       int fd = make_tcp_socket (&data->addr, data->port, FALSE, is_async);
+
+       if (fd == -1) {
+               msg_info ("connect_http: cannot connect to http server %s: %d, %s", data->host, errno, strerror (errno));
+       }
+
+       /* fd is already -1 on the failure path */
+       return fd;
+}
+
+/*
+ * Build a "GET" request for data->path on data->host and write it to sock.
+ * A leading '/' is added to the path when it has none. If the map has been
+ * checked before (data->last_checked != 0) an If-Modified-Since header is
+ * sent as well. Errors are only logged; nothing is reported to the caller.
+ */
+static void
+write_http_request (struct rspamd_map *map, struct http_map_data *data, int sock)
+{
+       char outbuf[BUFSIZ], datebuf[64];
+       struct tm *tm;
+       int r, n;
+
+       r = snprintf (outbuf, sizeof (outbuf), "GET %s%s HTTP/1.1" CRLF
+                                       "Connection: close" CRLF
+                                       "Host: %s" CRLF, (*data->path == '/') ? "" : "/",
+                                       data->path, data->host);
+       /* snprintf returns the untruncated length, so check before using r as an offset */
+       if (r < 0 || (size_t)r >= sizeof (outbuf)) {
+               msg_err ("write_http_request: request for %s does not fit into buffer", data->host);
+               return;
+       }
+
+       if (data->last_checked != 0) {
+               /*
+                * asctime () ends its result with '\n', which would break the header line,
+                * so format the date explicitly (assumes the "C" LC_TIME locale for %a and %b).
+                */
+               tm = gmtime (&data->last_checked);
+               if (tm != NULL && strftime (datebuf, sizeof (datebuf), "%a, %d %b %Y %H:%M:%S GMT", tm) != 0) {
+                       n = snprintf (outbuf + r, sizeof (outbuf) - r, "If-Modified-Since: %s" CRLF, datebuf);
+                       if (n < 0 || (size_t)n >= sizeof (outbuf) - r) {
+                               msg_err ("write_http_request: request for %s does not fit into buffer", data->host);
+                               return;
+                       }
+                       r += n;
+               }
+       }
+
+       /* Empty line terminates the request headers */
+       n = snprintf (outbuf + r, sizeof (outbuf) - r, CRLF);
+       if (n < 0 || (size_t)n >= sizeof (outbuf) - r) {
+               msg_err ("write_http_request: request for %s does not fit into buffer", data->host);
+               return;
+       }
+       r += n;
+
+       /* NOTE(review): a short write is not retried, same as before */
+       if (write (sock, outbuf, r) == -1) {
+               msg_err ("write_http_request: failed to write request: %d, %s", errno, strerror (errno));
+       }
+}
+
+/*
+ * Incrementally parse the status line and the headers of an HTTP reply.
+ *
+ * chunk/len describe the data read so far; chunk[len] must be readable and
+ * hold '\0' because strtoul () is applied to the status code. The position in
+ * the state machine is kept in reply->parser_state between calls:
+ *   0 - look for the status code, 1 - skip to end of line, 2 - read header
+ *   name, 3 - skip spaces after ':', 4 - read header value, 5 - skip the '\n'
+ *   of the empty line, 6 - headers are done.
+ * Parsed headers are stored in reply->headers.
+ *
+ * Returns NULL when the status code cannot be parsed. Otherwise returns a
+ * pointer into chunk: the first body byte when state 6 is reached, else the
+ * last element start recorded in this call (or chunk itself if none).
+ */
+static u_char *
+parse_http_reply (u_char *chunk, size_t len, struct http_reply *reply)
+{
+       u_char *s, *p, *err_str, *tmp;
+
+       p = chunk;
+       /* Not every state assigns s, so give it a defined value before it is used or returned */
+       s = chunk;
+
+       while ((size_t)(p - chunk) < len) {
+               switch (reply->parser_state) {
+                       /* Search status code */
+                       case 0:
+                               /* Status code follows the first space of the status line */
+                               if (*p != ' ') {
+                                       p ++;
+                               }
+                               else {
+                                       /* Try to parse HTTP reply code */
+                                       reply->code = strtoul ((char *)++p, (char **)&err_str, 10);
+                                       if (*err_str != ' ') {
+                                               msg_info ("parse_http_reply: error while reading HTTP status code: %s", p);
+                                               return NULL;
+                                       }
+                                       /* Now skip to end of status string */
+                                       reply->parser_state = 1;
+                                       continue;
+                               }
+                               break;
+                       /* Skip to end of line */
+                       case 1:
+                               if (*p == '\n') {
+                                       /* Switch to read header state */
+                                       reply->parser_state = 2;
+                               }
+                               /* Every skipped symbol is consumed, s follows p */
+                               s = ++p;
+                               break;
+                       /* Read header name */
+                       case 2:
+                               if (*p == ':') {
+                                       reply->cur_header = g_malloc (p - s + 1);
+                                       g_strlcpy (reply->cur_header, (const char *)s, p - s + 1);
+                                       reply->parser_state = 3;
+                               }
+                               else if (*p == '\r' && (size_t)(p + 1 - chunk) < len && *(p + 1) == '\n') {
+                                       /* Last empty line; p + 1 is checked to stay inside the chunk */
+                                       reply->parser_state = 5;
+                               }
+                               p ++;
+                               break;
+                       /* Skip spaces after header name */
+                       case 3:
+                               if (*p != ' ') {
+                                       s = p;
+                                       reply->parser_state = 4;
+                               }
+                               else {
+                                       p ++;
+                               }
+                               break;
+                       /* Read header value */
+                       case 4:
+                               if (*p == '\r') {
+                                       if (reply->cur_header != NULL) {
+                                               tmp = g_malloc (p - s + 1);
+                                               g_strlcpy ((char *)tmp, (const char *)s, p - s + 1);
+                                               /* The table takes over both strings */
+                                               g_hash_table_insert (reply->headers, reply->cur_header, tmp);
+                                               reply->cur_header = NULL;
+                                       }
+                                       reply->parser_state = 1;
+                               }
+                               p ++;
+                               break;
+                       case 5:
+                               /* Set pointer to beginning of HTTP body */
+                               p ++;
+                               s = p;
+                               reply->parser_state = 6;
+                               break;
+                       case 6:
+                               /* Headers parsed, just return */
+                               return p;
+                       default:
+                               /* Unknown state never advances p; stop instead of looping forever */
+                               return NULL;
+               }
+       }
+
+       return s;
+}
+
+/*
+ * Read one chunk of an HTTP reply from fd, feed it to parse_http_reply () and,
+ * once the headers are complete (reply->parser_state == 6), pass the body part
+ * of the chunk to map->read_callback.
+ *
+ * Returns TRUE when a chunk was read and the caller should keep reading.
+ * Returns FALSE on end of stream, on read error, on an unparsable status line
+ * and when the reply code is neither 200 nor 304.
+ *
+ * NOTE(review): the pointer returned by read_callback is not used, because
+ * buf is local to this call and nothing can be carried over to the next one.
+ */
+static gboolean
+read_http_common (struct rspamd_map *map, struct http_map_data *data, struct http_reply *reply, struct map_cb_data *cbdata, int fd)
+{
+       u_char buf[BUFSIZ], *remain;
+       size_t len;
+       ssize_t r;
+
+       /* One byte is kept for the terminating '\0' that the parser relies on */
+       r = read (fd, buf, sizeof (buf) - 1);
+       if (r <= 0) {
+               return FALSE;
+       }
+       len = r;
+       buf[len] = '\0';
+
+       remain = parse_http_reply (buf, len, reply);
+       if (remain == NULL) {
+               /* Parser has already logged the reason */
+               return FALSE;
+       }
+       /* Only a pointer inside this buffer says how much was consumed */
+       if (remain > buf && remain <= buf + len) {
+               /* Drop consumed header bytes; len now counts body bytes only */
+               len -= remain - buf;
+               memmove (buf, remain, len + 1);
+       }
+
+       if (reply->parser_state == 6) {
+               if (reply->code != 200 && reply->code != 304) {
+                       msg_err ("read_http: got error reply from server %s, %d", data->host, reply->code);
+                       return FALSE;
+               }
+               map->read_callback (map->pool, buf, len, cbdata);
+       }
+
+       /* Data was read, so more may follow */
+       return TRUE;
+}
+
+/*
+ * Fetch an HTTP map with a blocking connection: connect, send the request,
+ * read the reply until read_http_common () returns FALSE, then publish the
+ * result. The new data replaces *map->user_data only when the read callback
+ * produced something; a 304 reply only refreshes data->last_checked. A failed
+ * fetch leaves both *map->user_data and data->last_checked untouched, the
+ * same rules http_async_callback () applies.
+ */
+static void
+read_http_sync (struct rspamd_map *map, struct http_map_data *data)
+{
+       struct map_cb_data cbdata;
+       int fd;
+       struct http_reply *repl;
+
+       if (map->read_callback == NULL || map->fin_callback == NULL) {
+               msg_err ("read_http_sync: bad callback for reading http map");
+               return;
+       }
+
+       /* Connect synced */
+       if ((fd = connect_http (map, data, FALSE)) == -1) {
+               return;
+       }
+       write_http_request (map, data, fd);
+
+       cbdata.state = 0;
+       cbdata.prev_data = *map->user_data;
+       cbdata.cur_data = NULL;
+
+       repl = g_malloc (sizeof (struct http_reply));
+       repl->parser_state = 0;
+       /* Assume failure until a status line has been parsed */
+       repl->code = 404;
+       repl->cur_header = NULL;
+       repl->headers = g_hash_table_new_full (rspamd_strcase_hash, rspamd_strcase_equal, g_free, g_free);
+
+       while (read_http_common (map, data, repl, &cbdata, fd));
+
+       close (fd);
+
+       if (repl->code == 304) {
+               /* Not modified: keep current data, just remember the check */
+               data->last_checked = time (NULL);
+       }
+       else if (cbdata.cur_data != NULL) {
+               map->fin_callback (map->pool, &cbdata);
+               *map->user_data = cbdata.cur_data;
+               data->last_checked = time (NULL);
+       }
+
+       /* A header name without a stored value is still owned by the reply */
+       g_free (repl->cur_header);
+       g_hash_table_destroy (repl->headers);
+       g_free (repl);
+}
 
 static void
 read_map_file (struct rspamd_map *map, struct file_map_data *data)
 {
        struct map_cb_data cbdata;
-       char buf[BUFSIZ];
+       u_char buf[BUFSIZ], *remain;
        ssize_t r;
-       int fd;
+       int fd, rlen;
        
        if (map->read_callback == NULL || map->fin_callback == NULL) {
                msg_err ("read_map_file: bad callback for reading map file");
@@ -60,10 +276,16 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data)
        cbdata.state = 0;
        cbdata.prev_data = *map->user_data;
        cbdata.cur_data = NULL;
-
-       while ((r = read (fd, buf, sizeof (buf) - 1)) > 0) {
+       
+       rlen = 0;
+       while ((r = read (fd, buf + rlen, sizeof (buf) - rlen - 1)) > 0) {
                buf[r ++] = '\0';
-               map->read_callback (map->pool, buf, r, &cbdata);
+               remain = map->read_callback (map->pool, buf, r - 1, &cbdata);
+               if (remain != NULL) {
+                       /* copy remaining buffer to start of buffer */
+                       rlen = r - (remain - buf);
+                       memmove (buf, remain, rlen);
+               }
        }
                
        close (fd);
@@ -158,7 +380,7 @@ add_map (const char *map_line, map_cb_t read_callback, map_fin_cb_t fin_callback
                        }
                }
                /* Now try to connect */
-               if ((s = make_tcp_socket (&hdata->addr, hdata->port, FALSE)) == -1) {
+               if ((s = make_tcp_socket (&hdata->addr, hdata->port, FALSE, FALSE)) == -1) {
                        msg_info ("add_map: cannot connect to http server %s: %d, %s", hdata->host, errno, strerror (errno));
                        return FALSE;
                }
@@ -175,17 +397,18 @@ add_map (const char *map_line, map_cb_t read_callback, map_fin_cb_t fin_callback
 
 typedef void (*insert_func)(gpointer st, gconstpointer key, gpointer value);
 
-static gboolean
+static u_char*
 abstract_parse_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data, insert_func func)
 {
-       u_char *s, *p, *str;
+       u_char *s, *p, *str, *start;
 
        p = chunk;
+       start = p;
 
        str = g_malloc (len + 1);
        s = str;
 
-       while (*p) {
+       while (p - chunk < len) {
                switch (data->state) {
                        /* READ_SYMBOL */
                        case 0:
@@ -193,8 +416,11 @@ abstract_parse_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_
                                        if (s != str) {
                                                *s = '\0';
                                                s = memory_pool_strdup (pool, str);
-                                               func (data->cur_data, s, hash_fill);
+                                               if (strlen (s) > 0) {
+                                                       func (data->cur_data, s, hash_fill);
+                                               }
                                                s = str;
+                                               start = p;
                                        }
                                        data->state = 1;
                                }
@@ -202,8 +428,11 @@ abstract_parse_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_
                                        if (s != str) {
                                                *s = '\0';
                                                s = memory_pool_strdup (pool, str);
-                                               func (data->cur_data, s, hash_fill);
+                                               if (strlen (s) > 0) {
+                                                       func (data->cur_data, s, hash_fill);
+                                               }
                                                s = str;
+                                               start = p;
                                        }
                                        while (*p == '\r' || *p == '\n') {
                                                p ++;
@@ -225,6 +454,7 @@ abstract_parse_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_
                                                p ++;
                                        }
                                        s = str;
+                                       start = p;
                                        data->state = 0;
                                }
                                else {
@@ -236,7 +466,7 @@ abstract_parse_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_
 
        g_free (str);
 
-       return TRUE;
+       return start;
 }
 
 static void
@@ -280,13 +510,13 @@ radix_tree_insert_helper (gpointer st, gconstpointer key, gpointer value)
        }
 }
 
-void
+u_char *
 read_host_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data)
 {      
        if (data->cur_data == NULL) {
                data->cur_data = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal);
        }
-       (void)abstract_parse_list (pool, chunk, len, data, (insert_func)g_hash_table_insert);
+       return abstract_parse_list (pool, chunk, len, data, (insert_func)g_hash_table_insert);
 }
 
 void 
@@ -297,13 +527,13 @@ fin_host_list (memory_pool_t *pool, struct map_cb_data *data)
        }
 }
 
-void
+u_char *
 read_radix_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data)
 {
        if (data->cur_data == NULL) {
                data->cur_data = radix_tree_create ();
        }
-       (void)abstract_parse_list (pool, chunk, len, data, (insert_func)radix_tree_insert_helper);
+       return abstract_parse_list (pool, chunk, len, data, (insert_func)radix_tree_insert_helper);
 }
 
 void 
@@ -338,6 +568,117 @@ file_callback (int fd, short what, void *ud)
        read_map_file (map, data);
 }
 
+/*
+ * Tear down one asynchronous HTTP request: free the reply (when present)
+ * together with its header table, remove the event, close the socket and
+ * free the callback data itself.
+ */
+static void
+free_http_cbdata (struct http_callback_data *cbd)
+{
+       struct http_reply *reply = cbd->reply;
+
+       if (reply != NULL) {
+               g_hash_table_destroy (reply->headers);
+               g_free (reply);
+       }
+
+       event_del (&cbd->ev);
+       close (cbd->fd);
+
+       g_free (cbd);
+}
+
+/*
+ * Event handler of an asynchronous HTTP map request; ud is the
+ * struct http_callback_data of the request.
+ *
+ * EV_WRITE in state 0: the request is written, the reply and the map
+ * callback data are prepared and the event is re-armed for reading.
+ * EV_READ in state >= 1: one chunk is read; when read_http_common () returns
+ * FALSE the request is finished and its result is published.
+ * Any other event value, or EV_WRITE in another state, frees the request.
+ */
+static void
+http_async_callback (int fd, short what, void *ud)
+{
+       struct http_callback_data *cbd = ud;
+       struct http_reply *reply;
+
+       if (what == EV_WRITE) {
+               if (cbd->state != 0) {
+                       msg_err ("http_async_callback: bad state when got write readiness");
+                       free_http_cbdata (cbd);
+                       return;
+               }
+
+               /* Connection is ready, send the request */
+               write_http_request (cbd->map, cbd->data, fd);
+
+               /* Switch the event to reading with the read timeout */
+               event_set (&cbd->ev, cbd->fd, EV_READ | EV_PERSIST, http_async_callback, cbd);
+               cbd->tv.tv_sec = HTTP_READ_TIMEOUT;
+               cbd->tv.tv_usec = 0;
+               cbd->state = 1;
+
+               /* Fresh reply, code stays 404 until a status line is parsed */
+               reply = g_malloc (sizeof (struct http_reply));
+               reply->parser_state = 0;
+               reply->code = 404;
+               reply->headers = g_hash_table_new_full (rspamd_strcase_hash, rspamd_strcase_equal, g_free, g_free);
+               cbd->reply = reply;
+
+               cbd->cbdata.state = 0;
+               cbd->cbdata.prev_data = *cbd->map->user_data;
+               cbd->cbdata.cur_data = NULL;
+
+               event_add (&cbd->ev, &cbd->tv);
+               return;
+       }
+
+       if (what != EV_READ) {
+               msg_err ("http_async_callback: connection with http server terminated incorrectly");
+               free_http_cbdata (cbd);
+               return;
+       }
+
+       if (cbd->state < 1) {
+               /* Read readiness before the request was written: nothing to do */
+               return;
+       }
+
+       if (read_http_common (cbd->map, cbd->data, cbd->reply, &cbd->cbdata, cbd->fd)) {
+               if (cbd->state == 1) {
+                       /* Write to log that data is modified */
+                       msg_info ("http_async_callback: rereading map data from %s", cbd->data->host);
+               }
+               cbd->state = 2;
+               return;
+       }
+
+       /* Reading is over: publish what was collected */
+       if (cbd->reply->code == 304) {
+               /* Not-Modified only refreshes the check time */
+               cbd->data->last_checked = time (NULL);
+               msg_info ("http_async_callback: data is not modified for server %s", cbd->data->host);
+       }
+       else if (cbd->cbdata.cur_data != NULL) {
+               cbd->map->fin_callback (cbd->map->pool, &cbd->cbdata);
+               *cbd->map->user_data = cbd->cbdata.cur_data;
+               cbd->data->last_checked = time (NULL);
+       }
+
+       if (cbd->state == 1 && cbd->reply->code == 200) {
+               /* Write to log that data is modified */
+               msg_info ("http_async_callback: rereading map data from %s", cbd->data->host);
+       }
+
+       free_http_cbdata (cbd);
+}
+
+
+/*
+ * Timer handler of an HTTP map; ud is the struct rspamd_map. Re-arms the
+ * timer with a random delay between MON_TIMEOUT and 2 * MON_TIMEOUT seconds
+ * and starts an asynchronous request that http_async_callback () continues
+ * once the socket becomes writable or HTTP_CONNECT_TIMEOUT expires.
+ */
+static void
+http_callback (int fd, short what, void *ud)
+{
+       struct rspamd_map *map = ud;
+       struct http_map_data *data = map->map_data;
+       int sock;
+       struct http_callback_data *cbd;
+
+       /* Plan event again with jitter */
+       evtimer_del (&map->ev);
+       map->tv.tv_sec = MON_TIMEOUT + MON_TIMEOUT * g_random_double ();
+       map->tv.tv_usec = 0;
+       evtimer_add (&map->ev, &map->tv);
+
+       /* Connect asynced */
+       if ((sock = connect_http (map, data, TRUE)) == -1) {
+               return;
+       }
+       else {
+               /* Plan event */
+               cbd = g_malloc (sizeof (struct http_callback_data));
+               event_set (&cbd->ev, sock, EV_WRITE, http_async_callback, cbd);
+               cbd->tv.tv_sec = HTTP_CONNECT_TIMEOUT;
+               cbd->tv.tv_usec = 0;
+               cbd->map = map;
+               cbd->data = data;
+               /*
+                * free_http_cbdata () tests cbd->reply and may run before the
+                * request is written (connect timeout), so it must not be garbage.
+                */
+               cbd->reply = NULL;
+               cbd->state = 0;
+               cbd->fd = sock;
+               event_add (&cbd->ev, &cbd->tv);
+       }
+}
+
 /* Start watching event for all maps */
 void 
 start_map_watch (void)
@@ -357,8 +698,14 @@ start_map_watch (void)
                        map->tv.tv_usec = 0;
                        evtimer_add (&map->ev, &map->tv);
                }
-               else {
-                       /* XXX */
+               else if (map->protocol == PROTO_HTTP) {
+                       evtimer_set (&map->ev, http_callback, map);
+                       /* Read initial data */
+                       read_http_sync (map, map->map_data);
+                       /* Plan event with jitter */
+                       map->tv.tv_sec = MON_TIMEOUT + MON_TIMEOUT * g_random_double ();
+                       map->tv.tv_usec = 0;
+                       evtimer_add (&map->ev, &map->tv);
                }
                cur = g_list_next (cur);
        }
index dfd63a06d8f56dca6b74f1b1773fde481ee9a1fb..7a207c28ece48470fafa20ed9c4f2b39e5263167 100644 (file)
--- a/src/map.h
+++ b/src/map.h
@@ -29,7 +29,7 @@ struct http_map_data {
        time_t last_checked;
 };
 
-typedef void (*map_cb_t)(memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data);
+typedef u_char* (*map_cb_t)(memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data);
 typedef void (*map_fin_cb_t)(memory_pool_t *pool, struct map_cb_data *data);
 
 struct rspamd_map {
@@ -47,9 +47,9 @@ gboolean add_map (const char *map_line, map_cb_t read_callback, map_fin_cb_t fin
 void start_map_watch (void);
 
 /* Common callbacks */
-void read_radix_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data);
+u_char* read_radix_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data);
 void fin_radix_list (memory_pool_t *pool, struct map_cb_data *data);
-void read_host_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data);
+u_char* read_host_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data);
 void fin_host_list (memory_pool_t *pool, struct map_cb_data *data);
 
 #endif
index e08d3ff8fe64f4380e11ca5aa7c9ec82c801fbc5..719350e05ec4e356e74ed27adb916be741aa9e84 100644 (file)
@@ -68,7 +68,7 @@ emails_module_init (struct config_file *cfg, struct module_ctx **ctx)
        email_module_ctx->filter = emails_mime_filter;
        email_module_ctx->email_pool = memory_pool_new (memory_pool_get_size ());
        email_module_ctx->email_re = g_regex_new (email_re_text, G_REGEX_RAW | G_REGEX_OPTIMIZE | G_REGEX_CASELESS, 0, &err);
-       email_module_ctx->blacklist = g_hash_table_new (g_str_hash, g_str_equal);
+       email_module_ctx->blacklist = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal);
        
        *ctx = (struct module_ctx *)email_module_ctx;
        
index 452f8749a32797a127a05017464513c60244e8ef..1b3ae8712c464273876aa32110d0ef018d9bf54f 100644 (file)
@@ -705,7 +705,7 @@ register_redirector_call (struct uri *url, struct worker_task *task, GTree *url_
        struct redirector_param *param;
        struct timeval *timeout;
 
-       s = make_tcp_socket (&surbl_module_ctx->redirector_addr, surbl_module_ctx->redirector_port, FALSE);
+       s = make_tcp_socket (&surbl_module_ctx->redirector_addr, surbl_module_ctx->redirector_port, FALSE, TRUE);
 
        if (s == -1) {
                msg_info ("register_redirector_call: <%s> cannot create tcp socket failed: %s", 
index 8001fcb1891f788db29e7c7e299292d268542d40..364e1b0a4820b63ad7d73432c88dc8398c1a3c55 100644 (file)
@@ -50,7 +50,7 @@ make_socket_nonblocking (int fd)
 }
 
 int
-make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server)
+make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server, gboolean async)
 {
        int fd, r, optlen, on = 1, s_error;
        int serrno;
@@ -63,7 +63,7 @@ make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server)
                return -1;
        }
 
-       if (make_socket_nonblocking(fd) < 0) {
+       if (async && make_socket_nonblocking(fd) < 0) {
                goto out;
        }
        
@@ -87,7 +87,7 @@ make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server)
        }
 
        if (r == -1) {
-               if (errno != EINPROGRESS) {
+               if (!async || errno != EINPROGRESS) {
                        msg_warn ("make_tcp_socket: bind/connect failed: %d, '%s'", errno, strerror (errno));
                        goto out;
                }
index aa08bcc3ac1dc74f9f0d64639e45bd0f52d9179b..d7665d462f5c4050ff00789b608873969a899106 100644 (file)
@@ -10,7 +10,7 @@ struct rspamd_main;
 struct workq;
 
 /* Create socket and bind or connect it to specified address and port */
-int make_tcp_socket (struct in_addr *, u_short, gboolean is_server);
+int make_tcp_socket (struct in_addr *, u_short, gboolean is_server, gboolean async);
 /* Accept from socket */
 int accept_from_socket (int listen_sock, struct sockaddr *addr, socklen_t *len);
 /* Create and bind or connect unix socket */