From 00916c1794e6e07483d5875dbe143693a84f5bf6 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Wed, 20 Aug 2014 14:43:28 +0100 Subject: Rework maps to work with http client. --- src/libutil/map.c | 1314 ++++++++++++++++++++--------------------------------- src/libutil/map.h | 25 - 2 files changed, 488 insertions(+), 851 deletions(-) (limited to 'src/libutil') diff --git a/src/libutil/map.c b/src/libutil/map.c index 710dcdebc..4678aa6dc 100644 --- a/src/libutil/map.c +++ b/src/libutil/map.c @@ -34,24 +34,37 @@ static const gchar *hash_fill = "1"; -/* Http reply */ -struct http_reply { - gint code; - GHashTable *headers; - gchar *cur_header; - gint parser_state; +/** + * Data specific to file maps + */ +struct file_map_data { + const gchar *filename; + struct stat st; +}; + +/** + * Data specific to HTTP maps + */ +struct http_map_data { + struct addrinfo *addr; + guint16 port; + gchar *path; + gchar *host; + time_t last_checked; + gboolean request_sent; + struct rspamd_http_connection *conn; }; + struct http_callback_data { - struct event ev; struct event_base *ev_base; struct timeval tv; struct rspamd_map *map; struct http_map_data *data; - struct http_reply *reply; struct map_cb_data cbdata; - gint state; + GString *remain_buf; + gint fd; }; @@ -88,370 +101,129 @@ connect_http (struct rspamd_map *map, static void write_http_request (struct rspamd_map *map, struct http_map_data *data, - gint sock) + gint sock, + struct timeval *tv) { - gchar outbuf[BUFSIZ], datebuf[128]; - gint r; + gchar datebuf[128]; struct tm *tm; + struct rspamd_http_message *msg; - tm = gmtime (&data->last_checked); - strftime (datebuf, sizeof (datebuf), "%a, %d %b %Y %H:%M:%S %Z", tm); - r = rspamd_snprintf (outbuf, - sizeof (outbuf), - "GET %s%s HTTP/1.1" CRLF "Connection: close" CRLF "Host: %s" CRLF, - (*data->path == '/') ? "" : "/", - data->path, - data->host); - if (data->last_checked != 0) { - r += rspamd_snprintf (outbuf + r, - sizeof (outbuf) - r, - "If-Modified-Since: %s" CRLF, - datebuf); - } + msg = rspamd_http_new_message (HTTP_REQUEST); - r += rspamd_snprintf (outbuf + r, sizeof (outbuf) - r, CRLF); + msg->url = g_string_new (data->path); + if (data->last_checked != 0) { + tm = gmtime (&data->last_checked); + strftime (datebuf, sizeof (datebuf), "%a, %d %b %Y %H:%M:%S %Z", tm); - if (write (sock, outbuf, r) == -1) { - msg_err ("failed to write request: %d, %s", errno, strerror (errno)); + rspamd_http_message_add_header (msg, "If-Modified-Since", datebuf); } + + rspamd_http_connection_write_message (data->conn, msg, data->host, NULL, + map, sock, tv, map->ev_base); } /** - * FSM for parsing HTTP reply + * Callback for destroying HTTP callback data */ -static gchar * -parse_http_reply (gchar * chunk, gint len, struct http_reply *reply) +static void +free_http_cbdata (struct http_callback_data *cbd) { - gchar *s, *p, *err_str, *tmp; - p = chunk; - s = chunk; - - while (p - chunk < len) { - switch (reply->parser_state) { - /* Search status code */ - case 0: - /* Search for status code */ - if (*p != ' ') { - p++; - } - else { - /* Try to parse HTTP reply code */ - reply->code = strtoul (++p, (gchar **)&err_str, 10); - if (*err_str != ' ') { - msg_info ("error while reading HTTP status code: %s", p); - return NULL; - } - /* Now skip to end of status string */ - reply->parser_state = 1; - continue; - } - break; - /* Skip to end of line */ - case 1: - if (*p == '\n') { - /* Switch to read header state */ - reply->parser_state = 2; - } - /* Each skipped symbol is proceeded */ - s = ++p; - break; - /* Read header value */ - case 2: - if (*p == ':') { - reply->cur_header = g_malloc (p - s + 1); - rspamd_strlcpy (reply->cur_header, s, p - s + 1); - reply->parser_state = 3; - } - else if (*p == '\r' && *(p + 1) == '\n') { - /* Last empty line */ - reply->parser_state = 5; - } - p++; - break; - /* Skip spaces after header name */ - case 3: - if (*p != ' ') { - s = p; - reply->parser_state = 4; - } - else { - p++; - } - break; - /* Read header value */ - case 4: - if (*p == '\r') { - if (reply->cur_header != NULL) { - tmp = g_malloc (p - s + 1); - rspamd_strlcpy (tmp, s, p - s + 1); - g_hash_table_insert (reply->headers, reply->cur_header, - tmp); - reply->cur_header = NULL; - } - reply->parser_state = 1; - } - p++; - break; - case 5: - /* Set pointer to begining of HTTP body */ - p++; - s = p; - reply->parser_state = 6; - break; - case 6: - /* Headers parsed, just return */ - return p; - break; - } + g_atomic_int_set (cbd->map->locked, 0); + if (cbd->remain_buf) { + g_string_free (cbd->remain_buf, TRUE); } - return s; + rspamd_http_connection_reset (cbd->data->conn); + close (cbd->fd); + g_free (cbd); } -/** - * Read and parse chunked header +/* + * HTTP callbacks */ -static gint -read_chunk_header (gchar * buf, gint len, struct http_map_data *data) +static void +http_map_error (struct rspamd_http_connection *conn, + GError *err) { - gchar chunkbuf[32], *p, *c, *err_str; - gint skip = 0; - - p = chunkbuf; - c = buf; - /* Find hex digits */ - while (g_ascii_isxdigit (*c) && p - chunkbuf < - (gint)(sizeof (chunkbuf) - 1) && skip < len) { - *p++ = *c++; - skip++; - } - *p = '\0'; - data->chunk = strtoul (chunkbuf, &err_str, 16); - if (*err_str != '\0') { - return -1; - } - - /* Now skip to CRLF */ - while (*c != '\n' && skip < len) { - c++; - skip++; - } - if (*c == '\n' && skip < len) { - skip++; - c++; - } - data->chunk_remain = data->chunk; + struct http_callback_data *cbd = conn->ud; - return skip; + msg_err ("connection with http server terminated incorrectly: %s", + err->message); + free_http_cbdata (cbd); } -/** - * Helper callback for reading chunked reply - */ -static gboolean -read_http_chunked (gchar * buf, - size_t len, - struct rspamd_map *map, - struct http_map_data *data, - struct map_cb_data *cbdata) +static int +http_map_finish (struct rspamd_http_connection *conn, + struct rspamd_http_message *msg) { - gchar *p = buf, *remain; - gint skip = 0; - - if (data->chunked == 1) { - /* Read first chunk data */ - if ((skip = read_chunk_header (buf, len, data)) != -1) { - p += skip; - len -= skip; - data->chunked = 2; - } - else { - msg_info ("invalid chunked reply: %*s", (gint)len, buf); - return FALSE; - } - } - - if (data->chunk_remain == 0) { - /* Read another chunk */ - if ((skip = read_chunk_header (buf, len, data)) != -1) { - p += skip; - len -= skip; - } - else { - msg_info ("invalid chunked reply: %*s", (gint)len, buf); - return FALSE; - } - if (data->chunk == 0) { - return FALSE; - } - } + struct http_callback_data *cbd = conn->ud; + struct rspamd_map *map; - if (data->chunk_remain <= len ) { - /* Call callback and move remaining buffer */ - remain = map->read_callback (map->pool, p, data->chunk_remain, cbdata); - if (remain != NULL && remain != p + data->chunk_remain) { - /* Copy remaining buffer to start of buffer */ - data->rlen = len - (remain - p); - memmove (buf, remain, data->rlen); - data->chunk_remain -= data->rlen; - } - else { - /* Copy other part */ - data->rlen = len - data->chunk_remain; - if (data->rlen > 0) { - memmove (buf, p + data->chunk_remain, data->rlen); - } - data->chunk_remain = 0; + if (msg->code == 200) { + map = cbd->map; + if (cbd->remain_buf != NULL) { + map->read_callback (map->pool, cbd->remain_buf->str, + cbd->remain_buf->len, &cbd->cbdata); } + map->fin_callback (map->pool, &cbd->cbdata); + *map->user_data = cbd->cbdata.cur_data; + cbd->data->last_checked = msg->date; } - else { - /* Just read another portion of chunk */ - data->chunk_remain -= len; - remain = map->read_callback (map->pool, p, len, cbdata); - if (remain != NULL && remain != p + len) { - /* copy remaining buffer to start of buffer */ - data->rlen = len - (remain - p); - memmove (buf, remain, data->rlen); - } + else if (msg->code == 304) { + msg_info ("data is not modified for server %s", + cbd->data->host); + cbd->data->last_checked = msg->date; } - return TRUE; -} - -/** - * Callback for reading HTTP reply - */ -static gboolean -read_http_common (struct rspamd_map *map, - struct http_map_data *data, - struct http_reply *reply, - struct map_cb_data *cbdata, - gint fd) -{ - gchar *remain, *pos; - ssize_t r; - gchar *te, *date; - - if ((r = - read (fd, data->read_buf + data->rlen, sizeof (data->read_buf) - - data->rlen)) > 0) { - r += data->rlen; - data->rlen = 0; - remain = parse_http_reply (data->read_buf, r, reply); - if (remain != NULL && remain != data->read_buf) { - /* copy remaining data->read_buffer to start of data->read_buffer */ - data->rlen = r - (remain - data->read_buf); - memmove (data->read_buf, remain, data->rlen); - r = data->rlen; - data->rlen = 0; - } - if (r <= 0) { - return TRUE; - } - if (reply->parser_state == 6) { - /* If reply header is parsed successfully, try to read further data */ - if (reply->code != 200 && reply->code != 304) { - msg_err ("got error reply from server %s, %d", - data->host, - reply->code); - return FALSE; - } - else if (reply->code == 304) { - /* Do not read anything */ - return FALSE; - } - pos = data->read_buf; - /* Check for chunked */ - if (data->chunked == 0) { - if ((te = - g_hash_table_lookup (reply->headers, - "Transfer-Encoding")) != NULL) { - if (g_ascii_strcasecmp (te, "chunked") == 0) { - data->chunked = 1; - } - else { - data->chunked = -1; - } - } - else { - data->chunked = -1; - } - } - /* Check for date */ - date = g_hash_table_lookup (reply->headers, "Date"); - if (date != NULL) { - data->last_checked = rspamd_http_parse_date (date, -1); - } - else { - data->last_checked = (time_t)-1; - } - - if (data->chunked > 0) { - return read_http_chunked (data->read_buf, r, map, data, cbdata); - } - /* Read more data */ - remain = map->read_callback (map->pool, pos, r, cbdata); - if (remain != NULL && remain != pos + r) { - /* copy remaining data->read_buffer to start of data->read_buffer */ - data->rlen = r - (remain - pos); - memmove (pos, remain, data->rlen); - } - } - } - else { - return FALSE; - } + free_http_cbdata (cbd); - return TRUE; + return 0; } -/** - * Sync read of HTTP reply - */ -static void -read_http_sync (struct rspamd_map *map, struct http_map_data *data) +static int +http_map_read (struct rspamd_http_connection *conn, + struct rspamd_http_message *msg, + const gchar *chunk, + gsize len) { - struct map_cb_data cbdata; - gint fd; - struct http_reply *repl; - - if (map->read_callback == NULL || map->fin_callback == NULL) { - msg_err ("bad callback for reading map file"); - return; - } + struct http_callback_data *cbd = conn->ud; + gchar *pos; + struct rspamd_map *map; - /* Connect synced */ - if ((fd = connect_http (map, data, FALSE)) == -1) { - return; + if (msg->code != 200) { + /* Ignore not full replies */ + return 0; } - write_http_request (map, data, fd); - - cbdata.state = 0; - cbdata.map = map; - cbdata.prev_data = *map->user_data; - cbdata.cur_data = NULL; - repl = g_malloc (sizeof (struct http_reply)); - repl->parser_state = 0; - repl->code = 404; - repl->headers = g_hash_table_new_full (rspamd_strcase_hash, - rspamd_strcase_equal, - g_free, - g_free); + map = cbd->map; + if (cbd->remain_buf != NULL) { + /* We need to concatenate incoming buf with the remaining buf */ + g_string_append_len (cbd->remain_buf, chunk, len); - while (read_http_common (map, data, repl, &cbdata, fd)) ; + pos = map->read_callback (map->pool, cbd->remain_buf->str, + cbd->remain_buf->len, &cbd->cbdata); - close (fd); + /* All read */ + if (pos == NULL) { + g_string_free (cbd->remain_buf, TRUE); + cbd->remain_buf = NULL; + } + else { + /* Need to erase data processed */ + g_string_erase (cbd->remain_buf, 0, pos - cbd->remain_buf->str); + } + } + else { + pos = map->read_callback (map->pool, (gchar *)chunk, len, &cbd->cbdata); - map->fin_callback (map->pool, &cbdata); - *map->user_data = cbdata.cur_data; - if (data->last_checked == (time_t)-1) { - data->last_checked = time (NULL); + if (pos != NULL) { + /* Store data in remain buf */ + cbd->remain_buf = g_string_new_len (pos, len - (pos - chunk)); + } } - g_hash_table_destroy (repl->headers); - g_free (repl); + return 0; } /** @@ -500,347 +272,54 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data) } /** - * FSM for parsing lists + * Common file callback */ -gchar * -abstract_parse_kv_list (rspamd_mempool_t * pool, - gchar * chunk, - gint len, - struct map_cb_data *data, - insert_func func) +static void +file_callback (gint fd, short what, void *ud) { - gchar *c, *p, *key = NULL, *value = NULL; - - p = chunk; - c = p; + struct rspamd_map *map = ud; + struct file_map_data *data = map->map_data; + struct stat st; + gdouble jittered_sec; - while (p - chunk < len) { - switch (data->state) { - case 0: - /* read key */ - /* Check here comments, eol and end of buffer */ - if (*p == '#') { - if (key != NULL && p - c >= 0) { - value = rspamd_mempool_alloc (pool, p - c + 1); - memcpy (value, c, p - c); - value[p - c] = '\0'; - value = g_strstrip (value); - func (data->cur_data, key, value); - msg_debug ("insert kv pair: %s -> %s", key, value); - } - data->state = 99; - } - else if (*p == '\r' || *p == '\n' || p - chunk == len - 1) { - if (key != NULL && p - c >= 0) { - value = rspamd_mempool_alloc (pool, p - c + 1); - memcpy (value, c, p - c); - value[p - c] = '\0'; + /* Plan event again with jitter */ + evtimer_del (&map->ev); + jittered_sec = + (map->cfg->map_timeout + g_random_double () * map->cfg->map_timeout); + double_to_tv (jittered_sec, &map->tv); - value = g_strstrip (value); - func (data->cur_data, key, value); - msg_debug ("insert kv pair: %s -> %s", key, value); - } - else if (key == NULL && p - c > 0) { - /* Key only line */ - key = rspamd_mempool_alloc (pool, p - c + 1); - memcpy (key, c, p - c); - key[p - c] = '\0'; - value = rspamd_mempool_alloc (pool, 1); - *value = '\0'; - func (data->cur_data, key, value); - msg_debug ("insert kv pair: %s -> %s", key, value); - } - data->state = 100; - key = NULL; - } - else if (g_ascii_isspace (*p)) { - if (p - c > 0) { - key = rspamd_mempool_alloc (pool, p - c + 1); - memcpy (key, c, p - c); - key[p - c] = '\0'; - data->state = 2; - } - else { - key = NULL; - } - } - else { - p++; - } - break; - case 2: - /* Skip spaces before value */ - if (!g_ascii_isspace (*p)) { - c = p; - data->state = 0; - } - else { - p++; - } - break; - case 99: - /* SKIP_COMMENT */ - /* Skip comment till end of line */ - if (*p == '\r' || *p == '\n') { - while ((*p == '\r' || *p == '\n') && p - chunk < len) { - p++; - } - c = p; - key = NULL; - data->state = 0; - } - else { - p++; - } - break; - case 100: - /* Skip \r\n and whitespaces */ - if (*p == '\r' || *p == '\n' || g_ascii_isspace (*p)) { - p++; - } - else { - c = p; - key = NULL; - data->state = 0; - } - break; - } - } + evtimer_add (&map->ev, &map->tv); - return c; -} - -gchar * -abstract_parse_list (rspamd_mempool_t * pool, - gchar * chunk, - gint len, - struct map_cb_data *data, - insert_func func) -{ - gchar *s, *p, *str, *start; - - p = chunk; - start = p; - - str = g_malloc (len + 1); - s = str; - - while (p - chunk < len) { - switch (data->state) { - /* READ_SYMBOL */ - case 0: - if (*p == '#') { - /* Got comment */ - if (s != str) { - /* Save previous string in lines like: "127.0.0.1 #localhost" */ - *s = '\0'; - s = rspamd_mempool_strdup (pool, g_strstrip (str)); - if (strlen (s) > 0) { - func (data->cur_data, s, hash_fill); - } - s = str; - start = p; - } - data->state = 1; - } - else if (*p == '\r' || *p == '\n') { - /* Got EOL marker, save stored string */ - if (s != str) { - *s = '\0'; - s = rspamd_mempool_strdup (pool, g_strstrip (str)); - if (strlen (s) > 0) { - func (data->cur_data, s, hash_fill); - } - s = str; - } - /* Skip EOL symbols */ - while ((*p == '\r' || *p == '\n') && p - chunk < len) { - p++; - } - start = p; - } - else { - /* Store new string in s */ - *s = *p; - s++; - p++; - } - break; - /* SKIP_COMMENT */ - case 1: - /* Skip comment till end of line */ - if (*p == '\r' || *p == '\n') { - while ((*p == '\r' || *p == '\n') && p - chunk < len) { - p++; - } - s = str; - start = p; - data->state = 0; - } - else { - p++; - } - break; - } - } - - g_free (str); - - return start; -} - -/** - * Radix tree helper function - */ -static void -radix_tree_insert_helper (gpointer st, gconstpointer key, gpointer value) -{ - radix_tree_t *tree = st; - - guint32 mask = 0xFFFFFFFF; - guint32 ip; - gchar *token, *ipnet, *err_str, **strv, **cur; - struct in_addr ina; - gint k; - - /* Split string if there are multiple items inside a single string */ - strv = g_strsplit_set ((gchar *)key, " ,;", 0); - cur = strv; - while (*cur) { - if (**cur == '\0') { - cur++; - continue; - } - /* Extract ipnet */ - ipnet = *cur; - token = strsep (&ipnet, "/"); - - if (ipnet != NULL) { - errno = 0; - /* Get mask */ - k = strtoul (ipnet, &err_str, 10); - if (errno != 0) { - msg_warn ( - "invalid netmask, error detected on symbol: %s, erorr: %s", - err_str, - strerror (errno)); - k = 32; - } - else if (k > 32 || k < 0) { - msg_warn ("invalid netmask value: %d", k); - k = 32; - } - /* Calculate mask based on CIDR presentation */ - mask = mask << (32 - k); - } - - /* Check IP */ - if (inet_aton (token, &ina) == 0) { - msg_err ("invalid ip address: %s", token); - return; - } - - /* Insert ip in a tree */ - ip = ntohl ((guint32) ina.s_addr); - k = radix32tree_insert (tree, ip, mask, 1); - if (k == -1) { - msg_warn ("cannot insert ip to tree: %s, mask %X", inet_ntoa ( - ina), mask); - } - else if (k == 1) { - msg_warn ("ip %s, mask %X, value already exists", inet_ntoa ( - ina), mask); - } - cur++; - } - - g_strfreev (strv); -} - -/* Helpers */ -gchar * -read_host_list (rspamd_mempool_t * pool, - gchar * chunk, - gint len, - struct map_cb_data *data) -{ - if (data->cur_data == NULL) { - data->cur_data = g_hash_table_new (rspamd_strcase_hash, - rspamd_strcase_equal); - } - return abstract_parse_list (pool, - chunk, - len, - data, - (insert_func) g_hash_table_insert); -} - -void -fin_host_list (rspamd_mempool_t * pool, struct map_cb_data *data) -{ - if (data->prev_data) { - g_hash_table_destroy (data->prev_data); - } -} - -gchar * -read_kv_list (rspamd_mempool_t * pool, - gchar * chunk, - gint len, - struct map_cb_data *data) -{ - if (data->cur_data == NULL) { - data->cur_data = g_hash_table_new (rspamd_strcase_hash, - rspamd_strcase_equal); + if (g_atomic_int_get (map->locked)) { + msg_info ( + "don't try to reread map as it is locked by other process, will reread it later"); + return; } - return abstract_parse_kv_list (pool, - chunk, - len, - data, - (insert_func) g_hash_table_insert); -} -void -fin_kv_list (rspamd_mempool_t * pool, struct map_cb_data *data) -{ - if (data->prev_data) { - g_hash_table_destroy (data->prev_data); + if (stat (data->filename, + &st) != -1 && + (st.st_mtime > data->st.st_mtime || data->st.st_mtime == -1)) { + /* File was modified since last check */ + memcpy (&data->st, &st, sizeof (struct stat)); } -} - -gchar * -read_radix_list (rspamd_mempool_t * pool, - gchar * chunk, - gint len, - struct map_cb_data *data) -{ - if (data->cur_data == NULL) { - data->cur_data = radix_tree_create (); + else { + return; } - return abstract_parse_list (pool, - chunk, - len, - data, - (insert_func) radix_tree_insert_helper); -} -void -fin_radix_list (rspamd_mempool_t * pool, struct map_cb_data *data) -{ - if (data->prev_data) { - radix_tree_free (data->prev_data); - } + msg_info ("rereading map file %s", data->filename); + read_map_file (map, data); } /** - * Common file callback + * Async HTTP callback */ static void -file_callback (gint fd, short what, void *ud) +http_callback (gint fd, short what, void *ud) { struct rspamd_map *map = ud; - struct file_map_data *data = map->map_data; - struct stat st; + struct http_map_data *data = map->map_data; + gint sock; + struct http_callback_data *cbd; gdouble jittered_sec; /* Plan event again with jitter */ @@ -848,7 +327,6 @@ file_callback (gint fd, short what, void *ud) jittered_sec = (map->cfg->map_timeout + g_random_double () * map->cfg->map_timeout); double_to_tv (jittered_sec, &map->tv); - evtimer_add (&map->ev, &map->tv); if (g_atomic_int_get (map->locked)) { @@ -857,173 +335,27 @@ file_callback (gint fd, short what, void *ud) return; } - if (stat (data->filename, - &st) != -1 && - (st.st_mtime > data->st.st_mtime || data->st.st_mtime == -1)) { - /* File was modified since last check */ - memcpy (&data->st, &st, sizeof (struct stat)); - } - else { - return; - } - - msg_info ("rereading map file %s", data->filename); - read_map_file (map, data); -} - -/** - * Callback for destroying HTTP callback data - */ -static void -free_http_cbdata (struct http_callback_data *cbd) -{ - if (cbd->reply) { - g_hash_table_destroy (cbd->reply->headers); - g_free (cbd->reply); - } - g_atomic_int_set (cbd->map->locked, 0); - event_del (&cbd->ev); - close (cbd->fd); - g_free (cbd); -} - -/** - * Async HTTP request parser - */ -static void -http_async_callback (gint fd, short what, void *ud) -{ - struct http_callback_data *cbd = ud; - - /* Begin of connection */ - if (what == EV_WRITE) { - if (cbd->state == 0) { - /* Can write request */ - write_http_request (cbd->map, cbd->data, fd); - /* Plan reading */ - event_set (&cbd->ev, - cbd->fd, - EV_READ | EV_PERSIST, - http_async_callback, - cbd); - event_base_set (cbd->ev_base, &cbd->ev); - cbd->tv.tv_sec = HTTP_READ_TIMEOUT; - cbd->tv.tv_usec = 0; - cbd->state = 1; - /* Allocate reply structure */ - cbd->reply = g_malloc (sizeof (struct http_reply)); - cbd->reply->parser_state = 0; - cbd->reply->code = 404; - cbd->reply->headers = g_hash_table_new_full (rspamd_strcase_hash, - rspamd_strcase_equal, - g_free, - g_free); - cbd->cbdata.state = 0; - cbd->cbdata.prev_data = *cbd->map->user_data; - cbd->cbdata.cur_data = NULL; - cbd->cbdata.map = cbd->map; - cbd->data->rlen = 0; - cbd->data->chunk = 0; - cbd->data->chunk_remain = 0; - cbd->data->chunked = FALSE; - cbd->data->read_buf[0] = '\0'; - - event_add (&cbd->ev, &cbd->tv); - } - else { - msg_err ("bad state when got write readiness"); - free_http_cbdata (cbd); - return; - } - } - /* Got reply, parse it */ - else if (what == EV_READ) { - if (cbd->state >= 1) { - if (!read_http_common (cbd->map, cbd->data, cbd->reply, - &cbd->cbdata, cbd->fd)) { - /* Handle Not-Modified in a special way */ - if (cbd->reply->code == 304) { - if (cbd->data->last_checked == (time_t)-1) { - cbd->data->last_checked = time (NULL); - } - msg_info ("data is not modified for server %s", - cbd->data->host); - } - else if (cbd->cbdata.cur_data != NULL) { - /* Destroy old data and start reading request data */ - cbd->map->fin_callback (cbd->map->pool, &cbd->cbdata); - *cbd->map->user_data = cbd->cbdata.cur_data; - if (cbd->data->last_checked == (time_t)-1) { - cbd->data->last_checked = time (NULL); - } - } - if (cbd->state == 1 && cbd->reply->code == 200) { - /* Write to log that data is modified */ - msg_info ("rereading map data from %s", cbd->data->host); - } - - free_http_cbdata (cbd); - return; - } - else if (cbd->state == 1) { - /* Write to log that data is modified */ - msg_info ("rereading map data from %s", cbd->data->host); - } - cbd->state = 2; - } - } - else { - msg_err ("connection with http server terminated incorrectly"); - free_http_cbdata (cbd); - } -} - -/** - * Async HTTP callback - */ -static void -http_callback (gint fd, short what, void *ud) -{ - struct rspamd_map *map = ud; - struct http_map_data *data = map->map_data; - gint sock; - struct http_callback_data *cbd; - gdouble jittered_sec; - - /* Plan event again with jitter */ - evtimer_del (&map->ev); - jittered_sec = - (map->cfg->map_timeout + g_random_double () * map->cfg->map_timeout); - double_to_tv (jittered_sec, &map->tv); - evtimer_add (&map->ev, &map->tv); - - if (g_atomic_int_get (map->locked)) { - msg_info ( - "don't try to reread map as it is locked by other process, will reread it later"); - return; - } - - g_atomic_int_inc (map->locked); - - /* Connect asynced */ - if ((sock = connect_http (map, data, TRUE)) == -1) { - g_atomic_int_set (map->locked, 0); + g_atomic_int_inc (map->locked); + + /* Connect asynced */ + if ((sock = connect_http (map, data, TRUE)) == -1) { + g_atomic_int_set (map->locked, 0); return; } else { /* Plan event */ cbd = g_malloc (sizeof (struct http_callback_data)); cbd->ev_base = map->ev_base; - event_set (&cbd->ev, sock, EV_WRITE, http_async_callback, cbd); - event_base_set (cbd->ev_base, &cbd->ev); + cbd->map = map; + cbd->cbdata.state = 0; + cbd->cbdata.prev_data = *cbd->map->user_data; + cbd->cbdata.cur_data = NULL; + cbd->cbdata.map = cbd->map; cbd->tv.tv_sec = HTTP_CONNECT_TIMEOUT; cbd->tv.tv_usec = 0; - cbd->map = map; - cbd->data = data; - cbd->state = 0; - cbd->fd = sock; - cbd->reply = NULL; - event_add (&cbd->ev, &cbd->tv); + data->conn->ud = cbd; + msg_info ("rereading map data from %s", data->host); + write_http_request (map, data, sock, &cbd->tv); } } @@ -1059,13 +391,8 @@ start_map_watch (struct rspamd_config *cfg, struct event_base *ev_base) else if (map->protocol == MAP_PROTO_HTTP) { evtimer_set (&map->ev, http_callback, map); event_base_set (map->ev_base, &map->ev); - /* Read initial data */ - read_http_sync (map, map->map_data); - /* Plan event with jitter */ - jittered_sec = - (map->cfg->map_timeout + g_random_double () * - map->cfg->map_timeout); - double_to_tv (jittered_sec, &map->tv); + map->tv.tv_sec = 0; + map->tv.tv_usec = 0; evtimer_add (&map->ev, &map->tv); } cur = g_list_next (cur); @@ -1216,7 +543,6 @@ add_map (struct rspamd_config *cfg, hdata->host = rspamd_mempool_alloc (cfg->map_pool, hostend - def + 1); rspamd_strlcpy (hdata->host, def, hostend - def + 1); hdata->path = rspamd_mempool_strdup (cfg->map_pool, p); - hdata->rlen = 0; /* Now try to resolve */ memset (&hints, 0, sizeof (hints)); hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ @@ -1247,6 +573,8 @@ add_map (struct rspamd_config *cfg, return FALSE; } close (s); + hdata->conn = rspamd_http_connection_new (http_map_read, http_map_error, + http_map_finish, RSPAMD_HTTP_BODY_PARTIAL, RSPAMD_HTTP_CLIENT); new_map->map_data = hdata; } /* Temp pool */ @@ -1256,3 +584,337 @@ add_map (struct rspamd_config *cfg, return TRUE; } + + +/** + * FSM for parsing lists + */ +gchar * +abstract_parse_kv_list (rspamd_mempool_t * pool, + gchar * chunk, + gint len, + struct map_cb_data *data, + insert_func func) +{ + gchar *c, *p, *key = NULL, *value = NULL; + + p = chunk; + c = p; + + while (p - chunk < len) { + switch (data->state) { + case 0: + /* read key */ + /* Check here comments, eol and end of buffer */ + if (*p == '#') { + if (key != NULL && p - c >= 0) { + value = rspamd_mempool_alloc (pool, p - c + 1); + memcpy (value, c, p - c); + value[p - c] = '\0'; + value = g_strstrip (value); + func (data->cur_data, key, value); + msg_debug ("insert kv pair: %s -> %s", key, value); + } + data->state = 99; + } + else if (*p == '\r' || *p == '\n' || p - chunk == len - 1) { + if (key != NULL && p - c >= 0) { + value = rspamd_mempool_alloc (pool, p - c + 1); + memcpy (value, c, p - c); + value[p - c] = '\0'; + + value = g_strstrip (value); + func (data->cur_data, key, value); + msg_debug ("insert kv pair: %s -> %s", key, value); + } + else if (key == NULL && p - c > 0) { + /* Key only line */ + key = rspamd_mempool_alloc (pool, p - c + 1); + memcpy (key, c, p - c); + key[p - c] = '\0'; + value = rspamd_mempool_alloc (pool, 1); + *value = '\0'; + func (data->cur_data, key, value); + msg_debug ("insert kv pair: %s -> %s", key, value); + } + data->state = 100; + key = NULL; + } + else if (g_ascii_isspace (*p)) { + if (p - c > 0) { + key = rspamd_mempool_alloc (pool, p - c + 1); + memcpy (key, c, p - c); + key[p - c] = '\0'; + data->state = 2; + } + else { + key = NULL; + } + } + else { + p++; + } + break; + case 2: + /* Skip spaces before value */ + if (!g_ascii_isspace (*p)) { + c = p; + data->state = 0; + } + else { + p++; + } + break; + case 99: + /* SKIP_COMMENT */ + /* Skip comment till end of line */ + if (*p == '\r' || *p == '\n') { + while ((*p == '\r' || *p == '\n') && p - chunk < len) { + p++; + } + c = p; + key = NULL; + data->state = 0; + } + else { + p++; + } + break; + case 100: + /* Skip \r\n and whitespaces */ + if (*p == '\r' || *p == '\n' || g_ascii_isspace (*p)) { + p++; + } + else { + c = p; + key = NULL; + data->state = 0; + } + break; + } + } + + return c; +} + +gchar * +abstract_parse_list (rspamd_mempool_t * pool, + gchar * chunk, + gint len, + struct map_cb_data *data, + insert_func func) +{ + gchar *s, *p, *str, *start; + + p = chunk; + start = p; + + str = g_malloc (len + 1); + s = str; + + while (p - chunk < len) { + switch (data->state) { + /* READ_SYMBOL */ + case 0: + if (*p == '#') { + /* Got comment */ + if (s != str) { + /* Save previous string in lines like: "127.0.0.1 #localhost" */ + *s = '\0'; + s = rspamd_mempool_strdup (pool, g_strstrip (str)); + if (strlen (s) > 0) { + func (data->cur_data, s, hash_fill); + } + s = str; + start = p; + } + data->state = 1; + } + else if (*p == '\r' || *p == '\n') { + /* Got EOL marker, save stored string */ + if (s != str) { + *s = '\0'; + s = rspamd_mempool_strdup (pool, g_strstrip (str)); + if (strlen (s) > 0) { + func (data->cur_data, s, hash_fill); + } + s = str; + } + /* Skip EOL symbols */ + while ((*p == '\r' || *p == '\n') && p - chunk < len) { + p++; + } + start = p; + } + else { + /* Store new string in s */ + *s = *p; + s++; + p++; + } + break; + /* SKIP_COMMENT */ + case 1: + /* Skip comment till end of line */ + if (*p == '\r' || *p == '\n') { + while ((*p == '\r' || *p == '\n') && p - chunk < len) { + p++; + } + s = str; + start = p; + data->state = 0; + } + else { + p++; + } + break; + } + } + + g_free (str); + + return start; +} + +/** + * Radix tree helper function + */ +static void +radix_tree_insert_helper (gpointer st, gconstpointer key, gpointer value) +{ + radix_tree_t *tree = st; + + guint32 mask = 0xFFFFFFFF; + guint32 ip; + gchar *token, *ipnet, *err_str, **strv, **cur; + struct in_addr ina; + gint k; + + /* Split string if there are multiple items inside a single string */ + strv = g_strsplit_set ((gchar *)key, " ,;", 0); + cur = strv; + while (*cur) { + if (**cur == '\0') { + cur++; + continue; + } + /* Extract ipnet */ + ipnet = *cur; + token = strsep (&ipnet, "/"); + + if (ipnet != NULL) { + errno = 0; + /* Get mask */ + k = strtoul (ipnet, &err_str, 10); + if (errno != 0) { + msg_warn ( + "invalid netmask, error detected on symbol: %s, erorr: %s", + err_str, + strerror (errno)); + k = 32; + } + else if (k > 32 || k < 0) { + msg_warn ("invalid netmask value: %d", k); + k = 32; + } + /* Calculate mask based on CIDR presentation */ + mask = mask << (32 - k); + } + + /* Check IP */ + if (inet_aton (token, &ina) == 0) { + msg_err ("invalid ip address: %s", token); + return; + } + + /* Insert ip in a tree */ + ip = ntohl ((guint32) ina.s_addr); + k = radix32tree_insert (tree, ip, mask, 1); + if (k == -1) { + msg_warn ("cannot insert ip to tree: %s, mask %X", inet_ntoa ( + ina), mask); + } + else if (k == 1) { + msg_warn ("ip %s, mask %X, value already exists", inet_ntoa ( + ina), mask); + } + cur++; + } + + g_strfreev (strv); +} + +/* Helpers */ +gchar * +read_host_list (rspamd_mempool_t * pool, + gchar * chunk, + gint len, + struct map_cb_data *data) +{ + if (data->cur_data == NULL) { + data->cur_data = g_hash_table_new (rspamd_strcase_hash, + rspamd_strcase_equal); + } + return abstract_parse_list (pool, + chunk, + len, + data, + (insert_func) g_hash_table_insert); +} + +void +fin_host_list (rspamd_mempool_t * pool, struct map_cb_data *data) +{ + if (data->prev_data) { + g_hash_table_destroy (data->prev_data); + } +} + +gchar * +read_kv_list (rspamd_mempool_t * pool, + gchar * chunk, + gint len, + struct map_cb_data *data) +{ + if (data->cur_data == NULL) { + data->cur_data = g_hash_table_new (rspamd_strcase_hash, + rspamd_strcase_equal); + } + return abstract_parse_kv_list (pool, + chunk, + len, + data, + (insert_func) g_hash_table_insert); +} + +void +fin_kv_list (rspamd_mempool_t * pool, struct map_cb_data *data) +{ + if (data->prev_data) { + g_hash_table_destroy (data->prev_data); + } +} + +gchar * +read_radix_list (rspamd_mempool_t * pool, + gchar * chunk, + gint len, + struct map_cb_data *data) +{ + if (data->cur_data == NULL) { + data->cur_data = radix_tree_create (); + } + return abstract_parse_list (pool, + chunk, + len, + data, + (insert_func) radix_tree_insert_helper); +} + +void +fin_radix_list (rspamd_mempool_t * pool, struct map_cb_data *data) +{ + if (data->prev_data) { + radix_tree_free (data->prev_data); + } +} diff --git a/src/libutil/map.h b/src/libutil/map.h index 1b5c01f1e..c0f1adc2d 100644 --- a/src/libutil/map.h +++ b/src/libutil/map.h @@ -15,31 +15,6 @@ enum fetch_proto { MAP_PROTO_FILE, MAP_PROTO_HTTP, }; - -/** - * Data specific to file maps - */ -struct file_map_data { - const gchar *filename; - struct stat st; -}; - -/** - * Data specific to HTTP maps - */ -struct http_map_data { - struct addrinfo *addr; - guint16 port; - gchar *path; - gchar *host; - time_t last_checked; - gshort chunked; - gchar read_buf[BUFSIZ]; - guint32 rlen; - guint32 chunk; - guint32 chunk_remain; -}; - struct map_cb_data; /** -- cgit v1.2.3