source.dussan.org Git - rspamd.git/commitdiff
Rework maps to work with http client.
author Vsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 20 Aug 2014 13:43:28 +0000 (14:43 +0100)
committer Vsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 20 Aug 2014 13:43:28 +0000 (14:43 +0100)
src/libutil/map.c
src/libutil/map.h

index 710dcdebcf7de54707641d883b7275bd4468151a..4678aa6dc0be45e3d938d365b33f2ca4716800dd 100644 (file)
 
 static const gchar *hash_fill = "1";
 
-/* Http reply */
-struct http_reply {
-       gint code;
-       GHashTable *headers;
-       gchar *cur_header;
-       gint parser_state;
+/**
+ * Data specific to file maps
+ */
+struct file_map_data {
+       const gchar *filename;
+       struct stat st;
+};
+
+/**
+ * Data specific to HTTP maps
+ */
+struct http_map_data {
+       struct addrinfo *addr;
+       guint16 port;
+       gchar *path;
+       gchar *host;
+       time_t last_checked;
+       gboolean request_sent;
+       struct rspamd_http_connection *conn;
 };
 
+
 struct http_callback_data {
-       struct event ev;
        struct event_base *ev_base;
        struct timeval tv;
        struct rspamd_map *map;
        struct http_map_data *data;
-       struct http_reply *reply;
        struct map_cb_data cbdata;
 
-       gint state;
+       GString *remain_buf;
+
        gint fd;
 };
 
@@ -88,370 +101,129 @@ connect_http (struct rspamd_map *map,
 static void
 write_http_request (struct rspamd_map *map,
        struct http_map_data *data,
-       gint sock)
+       gint sock,
+       struct timeval *tv)
 {
-       gchar outbuf[BUFSIZ], datebuf[128];
-       gint r;
+       gchar datebuf[128];
        struct tm *tm;
+       struct rspamd_http_message *msg;
 
-       tm = gmtime (&data->last_checked);
-       strftime (datebuf, sizeof (datebuf), "%a, %d %b %Y %H:%M:%S %Z", tm);
-       r = rspamd_snprintf (outbuf,
-                       sizeof (outbuf),
-                       "GET %s%s HTTP/1.1" CRLF "Connection: close" CRLF "Host: %s" CRLF,
-                       (*data->path == '/') ? "" : "/",
-                       data->path,
-                       data->host);
-       if (data->last_checked != 0) {
-               r += rspamd_snprintf (outbuf + r,
-                               sizeof (outbuf) - r,
-                               "If-Modified-Since: %s" CRLF,
-                               datebuf);
-       }
+       msg = rspamd_http_new_message (HTTP_REQUEST);
 
-       r += rspamd_snprintf (outbuf + r, sizeof (outbuf) - r, CRLF);
+       msg->url = g_string_new (data->path);
+       if (data->last_checked != 0) {
+               tm = gmtime (&data->last_checked);
+               strftime (datebuf, sizeof (datebuf), "%a, %d %b %Y %H:%M:%S %Z", tm);
 
-       if (write (sock, outbuf, r) == -1) {
-               msg_err ("failed to write request: %d, %s", errno, strerror (errno));
+               rspamd_http_message_add_header (msg, "If-Modified-Since", datebuf);
        }
+
+       rspamd_http_connection_write_message (data->conn, msg, data->host, NULL,
+               map, sock, tv, map->ev_base);
 }
 
 /**
- * FSM for parsing HTTP reply
+ * Callback for destroying HTTP callback data
  */
-static gchar *
-parse_http_reply (gchar * chunk, gint len, struct http_reply *reply)
+static void
+free_http_cbdata (struct http_callback_data *cbd)
 {
-       gchar *s, *p, *err_str, *tmp;
-       p = chunk;
-       s = chunk;
-
-       while (p - chunk < len) {
-               switch (reply->parser_state) {
-               /* Search status code */
-               case 0:
-                       /* Search for status code */
-                       if (*p != ' ') {
-                               p++;
-                       }
-                       else {
-                               /* Try to parse HTTP reply code */
-                               reply->code = strtoul (++p, (gchar **)&err_str, 10);
-                               if (*err_str != ' ') {
-                                       msg_info ("error while reading HTTP status code: %s", p);
-                                       return NULL;
-                               }
-                               /* Now skip to end of status string */
-                               reply->parser_state = 1;
-                               continue;
-                       }
-                       break;
-               /* Skip to end of line */
-               case 1:
-                       if (*p == '\n') {
-                               /* Switch to read header state */
-                               reply->parser_state = 2;
-                       }
-                       /* Each skipped symbol is proceeded */
-                       s = ++p;
-                       break;
-               /* Read header value */
-               case 2:
-                       if (*p == ':') {
-                               reply->cur_header = g_malloc (p - s + 1);
-                               rspamd_strlcpy (reply->cur_header, s, p - s + 1);
-                               reply->parser_state = 3;
-                       }
-                       else if (*p == '\r' && *(p + 1) == '\n') {
-                               /* Last empty line */
-                               reply->parser_state = 5;
-                       }
-                       p++;
-                       break;
-               /* Skip spaces after header name */
-               case 3:
-                       if (*p != ' ') {
-                               s = p;
-                               reply->parser_state = 4;
-                       }
-                       else {
-                               p++;
-                       }
-                       break;
-               /* Read header value */
-               case 4:
-                       if (*p == '\r') {
-                               if (reply->cur_header != NULL) {
-                                       tmp = g_malloc (p - s + 1);
-                                       rspamd_strlcpy (tmp, s, p - s + 1);
-                                       g_hash_table_insert (reply->headers, reply->cur_header,
-                                               tmp);
-                                       reply->cur_header = NULL;
-                               }
-                               reply->parser_state = 1;
-                       }
-                       p++;
-                       break;
-               case 5:
-                       /* Set pointer to begining of HTTP body */
-                       p++;
-                       s = p;
-                       reply->parser_state = 6;
-                       break;
-               case 6:
-                       /* Headers parsed, just return */
-                       return p;
-                       break;
-               }
+       g_atomic_int_set (cbd->map->locked, 0);
+       if (cbd->remain_buf) {
+               g_string_free (cbd->remain_buf, TRUE);
        }
 
-       return s;
+       rspamd_http_connection_reset (cbd->data->conn);
+       close (cbd->fd);
+       g_free (cbd);
 }
 
-/**
- * Read and parse chunked header
+/*
+ * HTTP callbacks
  */
-static gint
-read_chunk_header (gchar * buf, gint len, struct http_map_data *data)
+static void
+http_map_error (struct rspamd_http_connection *conn,
+       GError *err)
 {
-       gchar chunkbuf[32], *p, *c, *err_str;
-       gint skip = 0;
-
-       p = chunkbuf;
-       c = buf;
-       /* Find hex digits */
-       while (g_ascii_isxdigit (*c) && p - chunkbuf <
-               (gint)(sizeof (chunkbuf) - 1) && skip < len) {
-               *p++ = *c++;
-               skip++;
-       }
-       *p = '\0';
-       data->chunk = strtoul (chunkbuf, &err_str, 16);
-       if (*err_str != '\0') {
-               return -1;
-       }
-
-       /* Now skip to CRLF */
-       while (*c != '\n' && skip < len) {
-               c++;
-               skip++;
-       }
-       if (*c == '\n' && skip < len) {
-               skip++;
-               c++;
-       }
-       data->chunk_remain = data->chunk;
+       struct http_callback_data *cbd = conn->ud;
 
-       return skip;
+       msg_err ("connection with http server terminated incorrectly: %s",
+                       err->message);
+       free_http_cbdata (cbd);
 }
 
-/**
- * Helper callback for reading chunked reply
- */
-static gboolean
-read_http_chunked (gchar * buf,
-       size_t len,
-       struct rspamd_map *map,
-       struct http_map_data *data,
-       struct map_cb_data *cbdata)
+static int
+http_map_finish (struct rspamd_http_connection *conn,
+               struct rspamd_http_message *msg)
 {
-       gchar *p = buf, *remain;
-       gint skip = 0;
-
-       if (data->chunked == 1) {
-               /* Read first chunk data */
-               if ((skip = read_chunk_header (buf, len, data)) != -1) {
-                       p += skip;
-                       len -= skip;
-                       data->chunked = 2;
-               }
-               else {
-                       msg_info ("invalid chunked reply: %*s", (gint)len, buf);
-                       return FALSE;
-               }
-       }
-
-       if (data->chunk_remain == 0) {
-               /* Read another chunk */
-               if ((skip = read_chunk_header (buf, len, data)) != -1) {
-                       p += skip;
-                       len -= skip;
-               }
-               else {
-                       msg_info ("invalid chunked reply: %*s", (gint)len, buf);
-                       return FALSE;
-               }
-               if (data->chunk == 0) {
-                       return FALSE;
-               }
-       }
+       struct http_callback_data *cbd = conn->ud;
+       struct rspamd_map *map;
 
-       if (data->chunk_remain <= len ) {
-               /* Call callback and move remaining buffer */
-               remain = map->read_callback (map->pool, p, data->chunk_remain, cbdata);
-               if (remain != NULL && remain != p + data->chunk_remain) {
-                       /* Copy remaining buffer to start of buffer */
-                       data->rlen = len - (remain - p);
-                       memmove (buf, remain, data->rlen);
-                       data->chunk_remain -= data->rlen;
-               }
-               else {
-                       /* Copy other part */
-                       data->rlen = len - data->chunk_remain;
-                       if (data->rlen > 0) {
-                               memmove (buf, p + data->chunk_remain, data->rlen);
-                       }
-                       data->chunk_remain = 0;
+       if (msg->code == 200) {
+               map = cbd->map;
+               if (cbd->remain_buf != NULL) {
+                       map->read_callback (map->pool, cbd->remain_buf->str,
+                                       cbd->remain_buf->len, &cbd->cbdata);
                }
 
+               map->fin_callback (map->pool, &cbd->cbdata);
+               *map->user_data = cbd->cbdata.cur_data;
+               cbd->data->last_checked = msg->date;
        }
-       else {
-               /* Just read another portion of chunk */
-               data->chunk_remain -= len;
-               remain = map->read_callback (map->pool, p, len, cbdata);
-               if (remain != NULL && remain != p + len) {
-                       /* copy remaining buffer to start of buffer */
-                       data->rlen = len - (remain - p);
-                       memmove (buf, remain, data->rlen);
-               }
+       else if (msg->code == 304) {
+               msg_info ("data is not modified for server %s",
+                               cbd->data->host);
+               cbd->data->last_checked = msg->date;
        }
 
-       return TRUE;
-}
-
-/**
- * Callback for reading HTTP reply
- */
-static gboolean
-read_http_common (struct rspamd_map *map,
-       struct http_map_data *data,
-       struct http_reply *reply,
-       struct map_cb_data *cbdata,
-       gint fd)
-{
-       gchar *remain, *pos;
-       ssize_t r;
-       gchar *te, *date;
-
-       if ((r =
-               read (fd, data->read_buf + data->rlen, sizeof (data->read_buf) -
-               data->rlen)) > 0) {
-               r += data->rlen;
-               data->rlen = 0;
-               remain = parse_http_reply (data->read_buf, r, reply);
-               if (remain != NULL && remain != data->read_buf) {
-                       /* copy remaining data->read_buffer to start of data->read_buffer */
-                       data->rlen = r - (remain - data->read_buf);
-                       memmove (data->read_buf, remain, data->rlen);
-                       r = data->rlen;
-                       data->rlen = 0;
-               }
-               if (r <= 0) {
-                       return TRUE;
-               }
-               if (reply->parser_state == 6) {
-                       /* If reply header is parsed successfully, try to read further data */
-                       if (reply->code != 200 && reply->code != 304) {
-                               msg_err ("got error reply from server %s, %d",
-                                       data->host,
-                                       reply->code);
-                               return FALSE;
-                       }
-                       else if (reply->code == 304) {
-                               /* Do not read anything */
-                               return FALSE;
-                       }
-                       pos = data->read_buf;
-                       /* Check for chunked */
-                       if (data->chunked == 0) {
-                               if ((te =
-                                       g_hash_table_lookup (reply->headers,
-                                       "Transfer-Encoding")) != NULL) {
-                                       if (g_ascii_strcasecmp (te, "chunked") == 0) {
-                                               data->chunked = 1;
-                                       }
-                                       else {
-                                               data->chunked = -1;
-                                       }
-                               }
-                               else {
-                                       data->chunked = -1;
-                               }
-                       }
-                       /* Check for date */
-                       date = g_hash_table_lookup (reply->headers, "Date");
-                       if (date != NULL) {
-                               data->last_checked = rspamd_http_parse_date (date, -1);
-                       }
-                       else {
-                               data->last_checked = (time_t)-1;
-                       }
-
-                       if (data->chunked > 0) {
-                               return read_http_chunked (data->read_buf, r, map, data, cbdata);
-                       }
-                       /* Read more data */
-                       remain = map->read_callback (map->pool, pos, r, cbdata);
-                       if (remain != NULL && remain != pos + r) {
-                               /* copy remaining data->read_buffer to start of data->read_buffer */
-                               data->rlen = r - (remain - pos);
-                               memmove (pos, remain, data->rlen);
-                       }
-               }
-       }
-       else {
-               return FALSE;
-       }
+       free_http_cbdata (cbd);
 
-       return TRUE;
+       return 0;
 }
 
-/**
- * Sync read of HTTP reply
- */
-static void
-read_http_sync (struct rspamd_map *map, struct http_map_data *data)
+static int
+http_map_read (struct rspamd_http_connection *conn,
+       struct rspamd_http_message *msg,
+       const gchar *chunk,
+       gsize len)
 {
-       struct map_cb_data cbdata;
-       gint fd;
-       struct http_reply *repl;
-
-       if (map->read_callback == NULL || map->fin_callback == NULL) {
-               msg_err ("bad callback for reading map file");
-               return;
-       }
+       struct http_callback_data *cbd = conn->ud;
+       gchar *pos;
+       struct rspamd_map *map;
 
-       /* Connect synced */
-       if ((fd = connect_http (map, data, FALSE)) == -1) {
-               return;
+       if (msg->code != 200) {
+               /* Ignore not full replies */
+               return 0;
        }
-       write_http_request (map, data, fd);
-
-       cbdata.state = 0;
-       cbdata.map = map;
-       cbdata.prev_data = *map->user_data;
-       cbdata.cur_data = NULL;
 
-       repl = g_malloc (sizeof (struct http_reply));
-       repl->parser_state = 0;
-       repl->code = 404;
-       repl->headers = g_hash_table_new_full (rspamd_strcase_hash,
-                       rspamd_strcase_equal,
-                       g_free,
-                       g_free);
+       map = cbd->map;
+       if (cbd->remain_buf != NULL) {
+               /* We need to concatenate incoming buf with the remaining buf */
+               g_string_append_len (cbd->remain_buf, chunk, len);
 
-       while (read_http_common (map, data, repl, &cbdata, fd)) ;
+               pos = map->read_callback (map->pool, cbd->remain_buf->str,
+                               cbd->remain_buf->len, &cbd->cbdata);
 
-       close (fd);
+               /* All read */
+               if (pos == NULL) {
+                       g_string_free (cbd->remain_buf, TRUE);
+                       cbd->remain_buf = NULL;
+               }
+               else {
+                       /* Need to erase data processed */
+                       g_string_erase (cbd->remain_buf, 0, pos - cbd->remain_buf->str);
+               }
+       }
+       else {
+               pos = map->read_callback (map->pool, (gchar *)chunk, len, &cbd->cbdata);
 
-       map->fin_callback (map->pool, &cbdata);
-       *map->user_data = cbdata.cur_data;
-       if (data->last_checked == (time_t)-1) {
-               data->last_checked = time (NULL);
+               if (pos != NULL) {
+                       /* Store data in remain buf */
+                       cbd->remain_buf = g_string_new_len (pos, len - (pos - chunk));
+               }
        }
 
-       g_hash_table_destroy (repl->headers);
-       g_free (repl);
+       return 0;
 }
 
 /**
@@ -500,347 +272,54 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data)
 }
 
 /**
- * FSM for parsing lists
+ * Common file callback
  */
-gchar *
-abstract_parse_kv_list (rspamd_mempool_t * pool,
-       gchar * chunk,
-       gint len,
-       struct map_cb_data *data,
-       insert_func func)
+static void
+file_callback (gint fd, short what, void *ud)
 {
-       gchar *c, *p, *key = NULL, *value = NULL;
-
-       p = chunk;
-       c = p;
+       struct rspamd_map *map = ud;
+       struct file_map_data *data = map->map_data;
+       struct stat st;
+       gdouble jittered_sec;
 
-       while (p - chunk < len) {
-               switch (data->state) {
-               case 0:
-                       /* read key */
-                       /* Check here comments, eol and end of buffer */
-                       if (*p == '#') {
-                               if (key != NULL && p - c  >= 0) {
-                                       value = rspamd_mempool_alloc (pool, p - c + 1);
-                                       memcpy (value, c, p - c);
-                                       value[p - c] = '\0';
-                                       value = g_strstrip (value);
-                                       func (data->cur_data, key, value);
-                                       msg_debug ("insert kv pair: %s -> %s", key, value);
-                               }
-                               data->state = 99;
-                       }
-                       else if (*p == '\r' || *p == '\n' || p - chunk == len - 1) {
-                               if (key != NULL && p - c >= 0) {
-                                       value = rspamd_mempool_alloc (pool, p - c + 1);
-                                       memcpy (value, c, p - c);
-                                       value[p - c] = '\0';
+       /* Plan event again with jitter */
+       evtimer_del (&map->ev);
+       jittered_sec =
+               (map->cfg->map_timeout + g_random_double () * map->cfg->map_timeout);
+       double_to_tv (jittered_sec, &map->tv);
 
-                                       value = g_strstrip (value);
-                                       func (data->cur_data, key, value);
-                                       msg_debug ("insert kv pair: %s -> %s", key, value);
-                               }
-                               else if (key == NULL && p - c > 0) {
-                                       /* Key only line */
-                                       key = rspamd_mempool_alloc (pool, p - c + 1);
-                                       memcpy (key, c, p - c);
-                                       key[p - c] = '\0';
-                                       value = rspamd_mempool_alloc (pool, 1);
-                                       *value = '\0';
-                                       func (data->cur_data, key, value);
-                                       msg_debug ("insert kv pair: %s -> %s", key, value);
-                               }
-                               data->state = 100;
-                               key = NULL;
-                       }
-                       else if (g_ascii_isspace (*p)) {
-                               if (p - c > 0) {
-                                       key = rspamd_mempool_alloc (pool, p - c + 1);
-                                       memcpy (key, c, p - c);
-                                       key[p - c] = '\0';
-                                       data->state = 2;
-                               }
-                               else {
-                                       key = NULL;
-                               }
-                       }
-                       else {
-                               p++;
-                       }
-                       break;
-               case 2:
-                       /* Skip spaces before value */
-                       if (!g_ascii_isspace (*p)) {
-                               c = p;
-                               data->state = 0;
-                       }
-                       else {
-                               p++;
-                       }
-                       break;
-               case 99:
-                       /* SKIP_COMMENT */
-                       /* Skip comment till end of line */
-                       if (*p == '\r' || *p == '\n') {
-                               while ((*p == '\r' || *p == '\n') && p - chunk < len) {
-                                       p++;
-                               }
-                               c = p;
-                               key = NULL;
-                               data->state = 0;
-                       }
-                       else {
-                               p++;
-                       }
-                       break;
-               case 100:
-                       /* Skip \r\n and whitespaces */
-                       if (*p == '\r' || *p == '\n' || g_ascii_isspace (*p)) {
-                               p++;
-                       }
-                       else {
-                               c = p;
-                               key = NULL;
-                               data->state = 0;
-                       }
-                       break;
-               }
-       }
+       evtimer_add (&map->ev, &map->tv);
 
-       return c;
-}
-
-gchar *
-abstract_parse_list (rspamd_mempool_t * pool,
-       gchar * chunk,
-       gint len,
-       struct map_cb_data *data,
-       insert_func func)
-{
-       gchar *s, *p, *str, *start;
-
-       p = chunk;
-       start = p;
-
-       str = g_malloc (len + 1);
-       s = str;
-
-       while (p - chunk < len) {
-               switch (data->state) {
-               /* READ_SYMBOL */
-               case 0:
-                       if (*p == '#') {
-                               /* Got comment */
-                               if (s != str) {
-                                       /* Save previous string in lines like: "127.0.0.1 #localhost" */
-                                       *s = '\0';
-                                       s = rspamd_mempool_strdup (pool, g_strstrip (str));
-                                       if (strlen (s) > 0) {
-                                               func (data->cur_data, s, hash_fill);
-                                       }
-                                       s = str;
-                                       start = p;
-                               }
-                               data->state = 1;
-                       }
-                       else if (*p == '\r' || *p == '\n') {
-                               /* Got EOL marker, save stored string */
-                               if (s != str) {
-                                       *s = '\0';
-                                       s = rspamd_mempool_strdup (pool, g_strstrip (str));
-                                       if (strlen (s) > 0) {
-                                               func (data->cur_data, s, hash_fill);
-                                       }
-                                       s = str;
-                               }
-                               /* Skip EOL symbols */
-                               while ((*p == '\r' || *p == '\n') && p - chunk < len) {
-                                       p++;
-                               }
-                               start = p;
-                       }
-                       else {
-                               /* Store new string in s */
-                               *s = *p;
-                               s++;
-                               p++;
-                       }
-                       break;
-               /* SKIP_COMMENT */
-               case 1:
-                       /* Skip comment till end of line */
-                       if (*p == '\r' || *p == '\n') {
-                               while ((*p == '\r' || *p == '\n') && p - chunk < len) {
-                                       p++;
-                               }
-                               s = str;
-                               start = p;
-                               data->state = 0;
-                       }
-                       else {
-                               p++;
-                       }
-                       break;
-               }
-       }
-
-       g_free (str);
-
-       return start;
-}
-
-/**
- * Radix tree helper function
- */
-static void
-radix_tree_insert_helper (gpointer st, gconstpointer key, gpointer value)
-{
-       radix_tree_t *tree = st;
-
-       guint32 mask = 0xFFFFFFFF;
-       guint32 ip;
-       gchar *token, *ipnet, *err_str, **strv, **cur;
-       struct in_addr ina;
-       gint k;
-
-       /* Split string if there are multiple items inside a single string */
-       strv = g_strsplit_set ((gchar *)key, " ,;", 0);
-       cur = strv;
-       while (*cur) {
-               if (**cur == '\0') {
-                       cur++;
-                       continue;
-               }
-               /* Extract ipnet */
-               ipnet = *cur;
-               token = strsep (&ipnet, "/");
-
-               if (ipnet != NULL) {
-                       errno = 0;
-                       /* Get mask */
-                       k = strtoul (ipnet, &err_str, 10);
-                       if (errno != 0) {
-                               msg_warn (
-                                       "invalid netmask, error detected on symbol: %s, erorr: %s",
-                                       err_str,
-                                       strerror (errno));
-                               k = 32;
-                       }
-                       else if (k > 32 || k < 0) {
-                               msg_warn ("invalid netmask value: %d", k);
-                               k = 32;
-                       }
-                       /* Calculate mask based on CIDR presentation */
-                       mask = mask << (32 - k);
-               }
-
-               /* Check IP */
-               if (inet_aton (token, &ina) == 0) {
-                       msg_err ("invalid ip address: %s", token);
-                       return;
-               }
-
-               /* Insert ip in a tree */
-               ip = ntohl ((guint32) ina.s_addr);
-               k = radix32tree_insert (tree, ip, mask, 1);
-               if (k == -1) {
-                       msg_warn ("cannot insert ip to tree: %s, mask %X", inet_ntoa (
-                                       ina), mask);
-               }
-               else if (k == 1) {
-                       msg_warn ("ip %s, mask %X, value already exists", inet_ntoa (
-                                       ina), mask);
-               }
-               cur++;
-       }
-
-       g_strfreev (strv);
-}
-
-/* Helpers */
-gchar *
-read_host_list (rspamd_mempool_t * pool,
-       gchar * chunk,
-       gint len,
-       struct map_cb_data *data)
-{
-       if (data->cur_data == NULL) {
-               data->cur_data = g_hash_table_new (rspamd_strcase_hash,
-                               rspamd_strcase_equal);
-       }
-       return abstract_parse_list (pool,
-                          chunk,
-                          len,
-                          data,
-                          (insert_func) g_hash_table_insert);
-}
-
-void
-fin_host_list (rspamd_mempool_t * pool, struct map_cb_data *data)
-{
-       if (data->prev_data) {
-               g_hash_table_destroy (data->prev_data);
-       }
-}
-
-gchar *
-read_kv_list (rspamd_mempool_t * pool,
-       gchar * chunk,
-       gint len,
-       struct map_cb_data *data)
-{
-       if (data->cur_data == NULL) {
-               data->cur_data = g_hash_table_new (rspamd_strcase_hash,
-                               rspamd_strcase_equal);
+       if (g_atomic_int_get (map->locked)) {
+               msg_info (
+                       "don't try to reread map as it is locked by other process, will reread it later");
+               return;
        }
-       return abstract_parse_kv_list (pool,
-                          chunk,
-                          len,
-                          data,
-                          (insert_func) g_hash_table_insert);
-}
 
-void
-fin_kv_list (rspamd_mempool_t * pool, struct map_cb_data *data)
-{
-       if (data->prev_data) {
-               g_hash_table_destroy (data->prev_data);
+       if (stat (data->filename,
+               &st) != -1 &&
+               (st.st_mtime > data->st.st_mtime || data->st.st_mtime == -1)) {
+               /* File was modified since last check */
+               memcpy (&data->st, &st, sizeof (struct stat));
        }
-}
-
-gchar *
-read_radix_list (rspamd_mempool_t * pool,
-       gchar * chunk,
-       gint len,
-       struct map_cb_data *data)
-{
-       if (data->cur_data == NULL) {
-               data->cur_data = radix_tree_create ();
+       else {
+               return;
        }
-       return abstract_parse_list (pool,
-                          chunk,
-                          len,
-                          data,
-                          (insert_func) radix_tree_insert_helper);
-}
 
-void
-fin_radix_list (rspamd_mempool_t * pool, struct map_cb_data *data)
-{
-       if (data->prev_data) {
-               radix_tree_free (data->prev_data);
-       }
+       msg_info ("rereading map file %s", data->filename);
+       read_map_file (map, data);
 }
 
 /**
- * Common file callback
+ * Async HTTP callback
  */
 static void
-file_callback (gint fd, short what, void *ud)
+http_callback (gint fd, short what, void *ud)
 {
        struct rspamd_map *map = ud;
-       struct file_map_data *data = map->map_data;
-       struct stat st;
+       struct http_map_data *data = map->map_data;
+       gint sock;
+       struct http_callback_data *cbd;
        gdouble jittered_sec;
 
        /* Plan event again with jitter */
@@ -848,7 +327,6 @@ file_callback (gint fd, short what, void *ud)
        jittered_sec =
                (map->cfg->map_timeout + g_random_double () * map->cfg->map_timeout);
        double_to_tv (jittered_sec, &map->tv);
-
        evtimer_add (&map->ev, &map->tv);
 
        if (g_atomic_int_get (map->locked)) {
@@ -857,173 +335,27 @@ file_callback (gint fd, short what, void *ud)
                return;
        }
 
-       if (stat (data->filename,
-               &st) != -1 &&
-               (st.st_mtime > data->st.st_mtime || data->st.st_mtime == -1)) {
-               /* File was modified since last check */
-               memcpy (&data->st, &st, sizeof (struct stat));
-       }
-       else {
-               return;
-       }
-
-       msg_info ("rereading map file %s", data->filename);
-       read_map_file (map, data);
-}
-
-/**
- * Callback for destroying HTTP callback data
- */
-static void
-free_http_cbdata (struct http_callback_data *cbd)
-{
-       if (cbd->reply) {
-               g_hash_table_destroy (cbd->reply->headers);
-               g_free (cbd->reply);
-       }
-       g_atomic_int_set (cbd->map->locked, 0);
-       event_del (&cbd->ev);
-       close (cbd->fd);
-       g_free (cbd);
-}
-
-/**
- * Async HTTP request parser
- */
-static void
-http_async_callback (gint fd, short what, void *ud)
-{
-       struct http_callback_data *cbd = ud;
-
-       /* Begin of connection */
-       if (what == EV_WRITE) {
-               if (cbd->state == 0) {
-                       /* Can write request */
-                       write_http_request (cbd->map, cbd->data, fd);
-                       /* Plan reading */
-                       event_set (&cbd->ev,
-                               cbd->fd,
-                               EV_READ | EV_PERSIST,
-                               http_async_callback,
-                               cbd);
-                       event_base_set (cbd->ev_base, &cbd->ev);
-                       cbd->tv.tv_sec = HTTP_READ_TIMEOUT;
-                       cbd->tv.tv_usec = 0;
-                       cbd->state = 1;
-                       /* Allocate reply structure */
-                       cbd->reply = g_malloc (sizeof (struct http_reply));
-                       cbd->reply->parser_state = 0;
-                       cbd->reply->code = 404;
-                       cbd->reply->headers = g_hash_table_new_full (rspamd_strcase_hash,
-                                       rspamd_strcase_equal,
-                                       g_free,
-                                       g_free);
-                       cbd->cbdata.state = 0;
-                       cbd->cbdata.prev_data = *cbd->map->user_data;
-                       cbd->cbdata.cur_data = NULL;
-                       cbd->cbdata.map = cbd->map;
-                       cbd->data->rlen = 0;
-                       cbd->data->chunk = 0;
-                       cbd->data->chunk_remain = 0;
-                       cbd->data->chunked = FALSE;
-                       cbd->data->read_buf[0] = '\0';
-
-                       event_add (&cbd->ev, &cbd->tv);
-               }
-               else {
-                       msg_err ("bad state when got write readiness");
-                       free_http_cbdata (cbd);
-                       return;
-               }
-       }
-       /* Got reply, parse it */
-       else if (what == EV_READ) {
-               if (cbd->state >= 1) {
-                       if (!read_http_common (cbd->map, cbd->data, cbd->reply,
-                               &cbd->cbdata, cbd->fd)) {
-                               /* Handle Not-Modified in a special way */
-                               if (cbd->reply->code == 304) {
-                                       if (cbd->data->last_checked == (time_t)-1) {
-                                               cbd->data->last_checked = time (NULL);
-                                       }
-                                       msg_info ("data is not modified for server %s",
-                                               cbd->data->host);
-                               }
-                               else if (cbd->cbdata.cur_data != NULL) {
-                                       /* Destroy old data and start reading request data */
-                                       cbd->map->fin_callback (cbd->map->pool, &cbd->cbdata);
-                                       *cbd->map->user_data = cbd->cbdata.cur_data;
-                                       if (cbd->data->last_checked == (time_t)-1) {
-                                               cbd->data->last_checked = time (NULL);
-                                       }
-                               }
-                               if (cbd->state == 1 && cbd->reply->code == 200) {
-                                       /* Write to log that data is modified */
-                                       msg_info ("rereading map data from %s", cbd->data->host);
-                               }
-
-                               free_http_cbdata (cbd);
-                               return;
-                       }
-                       else if (cbd->state == 1) {
-                               /* Write to log that data is modified */
-                               msg_info ("rereading map data from %s", cbd->data->host);
-                       }
-                       cbd->state = 2;
-               }
-       }
-       else {
-               msg_err ("connection with http server terminated incorrectly");
-               free_http_cbdata (cbd);
-       }
-}
-
-/**
- * Async HTTP callback
- */
-static void
-http_callback (gint fd, short what, void *ud)
-{
-       struct rspamd_map *map = ud;
-       struct http_map_data *data = map->map_data;
-       gint sock;
-       struct http_callback_data *cbd;
-       gdouble jittered_sec;
-
-       /* Plan event again with jitter */
-       evtimer_del (&map->ev);
-       jittered_sec =
-               (map->cfg->map_timeout + g_random_double () * map->cfg->map_timeout);
-       double_to_tv (jittered_sec, &map->tv);
-       evtimer_add (&map->ev, &map->tv);
-
-       if (g_atomic_int_get (map->locked)) {
-               msg_info (
-                       "don't try to reread map as it is locked by other process, will reread it later");
-               return;
-       }
-
-       g_atomic_int_inc (map->locked);
-
-       /* Connect asynced */
-       if ((sock = connect_http (map, data, TRUE)) == -1) {
-               g_atomic_int_set (map->locked, 0);
+       g_atomic_int_inc (map->locked);
+
+       /* Connect asynced */
+       if ((sock = connect_http (map, data, TRUE)) == -1) {
+               g_atomic_int_set (map->locked, 0);
                return;
        }
        else {
                /* Plan event */
                cbd = g_malloc (sizeof (struct http_callback_data));
                cbd->ev_base = map->ev_base;
-               event_set (&cbd->ev, sock, EV_WRITE, http_async_callback, cbd);
-               event_base_set (cbd->ev_base, &cbd->ev);
+               cbd->map = map;
+               cbd->cbdata.state = 0;
+               cbd->cbdata.prev_data = *cbd->map->user_data;
+               cbd->cbdata.cur_data = NULL;
+               cbd->cbdata.map = cbd->map;
                cbd->tv.tv_sec = HTTP_CONNECT_TIMEOUT;
                cbd->tv.tv_usec = 0;
-               cbd->map = map;
-               cbd->data = data;
-               cbd->state = 0;
-               cbd->fd = sock;
-               cbd->reply = NULL;
-               event_add (&cbd->ev, &cbd->tv);
+               data->conn->ud = cbd;
+               msg_info ("rereading map data from %s", data->host);
+               write_http_request (map, data, sock, &cbd->tv);
        }
 }
 
@@ -1059,13 +391,8 @@ start_map_watch (struct rspamd_config *cfg, struct event_base *ev_base)
                else if (map->protocol == MAP_PROTO_HTTP) {
                        evtimer_set (&map->ev, http_callback, map);
                        event_base_set (map->ev_base, &map->ev);
-                       /* Read initial data */
-                       read_http_sync (map, map->map_data);
-                       /* Plan event with jitter */
-                       jittered_sec =
-                               (map->cfg->map_timeout + g_random_double () *
-                               map->cfg->map_timeout);
-                       double_to_tv (jittered_sec, &map->tv);
+                       map->tv.tv_sec = 0;
+                       map->tv.tv_usec = 0;
                        evtimer_add (&map->ev, &map->tv);
                }
                cur = g_list_next (cur);
@@ -1216,7 +543,6 @@ add_map (struct rspamd_config *cfg,
                hdata->host = rspamd_mempool_alloc (cfg->map_pool, hostend - def + 1);
                rspamd_strlcpy (hdata->host, def, hostend - def + 1);
                hdata->path = rspamd_mempool_strdup (cfg->map_pool, p);
-               hdata->rlen = 0;
                /* Now try to resolve */
                memset (&hints, 0, sizeof (hints));
                hints.ai_family = AF_UNSPEC;     /* Allow IPv4 or IPv6 */
@@ -1247,6 +573,8 @@ add_map (struct rspamd_config *cfg,
                        return FALSE;
                }
                close (s);
+               hdata->conn = rspamd_http_connection_new (http_map_read, http_map_error,
+                       http_map_finish, RSPAMD_HTTP_BODY_PARTIAL, RSPAMD_HTTP_CLIENT);
                new_map->map_data = hdata;
        }
        /* Temp pool */
@@ -1256,3 +584,337 @@ add_map (struct rspamd_config *cfg,
 
        return TRUE;
 }
+
+
+/**
+ * FSM for parsing lists
+ */
+gchar *
+abstract_parse_kv_list (rspamd_mempool_t * pool,
+       gchar * chunk,
+       gint len,
+       struct map_cb_data *data,
+       insert_func func)
+{
+       gchar *c, *p, *key = NULL, *value = NULL;
+
+       p = chunk;
+       c = p;
+
+       while (p - chunk < len) {
+               switch (data->state) {
+               case 0:
+                       /* read key */
+                       /* Check here comments, eol and end of buffer */
+                       if (*p == '#') {
+                               if (key != NULL && p - c  >= 0) {
+                                       value = rspamd_mempool_alloc (pool, p - c + 1);
+                                       memcpy (value, c, p - c);
+                                       value[p - c] = '\0';
+                                       value = g_strstrip (value);
+                                       func (data->cur_data, key, value);
+                                       msg_debug ("insert kv pair: %s -> %s", key, value);
+                               }
+                               data->state = 99;
+                       }
+                       else if (*p == '\r' || *p == '\n' || p - chunk == len - 1) {
+                               if (key != NULL && p - c >= 0) {
+                                       value = rspamd_mempool_alloc (pool, p - c + 1);
+                                       memcpy (value, c, p - c);
+                                       value[p - c] = '\0';
+
+                                       value = g_strstrip (value);
+                                       func (data->cur_data, key, value);
+                                       msg_debug ("insert kv pair: %s -> %s", key, value);
+                               }
+                               else if (key == NULL && p - c > 0) {
+                                       /* Key only line */
+                                       key = rspamd_mempool_alloc (pool, p - c + 1);
+                                       memcpy (key, c, p - c);
+                                       key[p - c] = '\0';
+                                       value = rspamd_mempool_alloc (pool, 1);
+                                       *value = '\0';
+                                       func (data->cur_data, key, value);
+                                       msg_debug ("insert kv pair: %s -> %s", key, value);
+                               }
+                               data->state = 100;
+                               key = NULL;
+                       }
+                       else if (g_ascii_isspace (*p)) {
+                               if (p - c > 0) {
+                                       key = rspamd_mempool_alloc (pool, p - c + 1);
+                                       memcpy (key, c, p - c);
+                                       key[p - c] = '\0';
+                                       data->state = 2;
+                               }
+                               else {
+                                       key = NULL;
+                               }
+                       }
+                       else {
+                               p++;
+                       }
+                       break;
+               case 2:
+                       /* Skip spaces before value */
+                       if (!g_ascii_isspace (*p)) {
+                               c = p;
+                               data->state = 0;
+                       }
+                       else {
+                               p++;
+                       }
+                       break;
+               case 99:
+                       /* SKIP_COMMENT */
+                       /* Skip comment till end of line */
+                       if (*p == '\r' || *p == '\n') {
+                               while ((*p == '\r' || *p == '\n') && p - chunk < len) {
+                                       p++;
+                               }
+                               c = p;
+                               key = NULL;
+                               data->state = 0;
+                       }
+                       else {
+                               p++;
+                       }
+                       break;
+               case 100:
+                       /* Skip \r\n and whitespaces */
+                       if (*p == '\r' || *p == '\n' || g_ascii_isspace (*p)) {
+                               p++;
+                       }
+                       else {
+                               c = p;
+                               key = NULL;
+                               data->state = 0;
+                       }
+                       break;
+               }
+       }
+
+       return c;
+}
+
+gchar *
+abstract_parse_list (rspamd_mempool_t * pool,
+       gchar * chunk,
+       gint len,
+       struct map_cb_data *data,
+       insert_func func)
+{
+       gchar *s, *p, *str, *start;
+
+       p = chunk;
+       start = p;
+
+       str = g_malloc (len + 1);
+       s = str;
+
+       while (p - chunk < len) {
+               switch (data->state) {
+               /* READ_SYMBOL */
+               case 0:
+                       if (*p == '#') {
+                               /* Got comment */
+                               if (s != str) {
+                                       /* Save previous string in lines like: "127.0.0.1 #localhost" */
+                                       *s = '\0';
+                                       s = rspamd_mempool_strdup (pool, g_strstrip (str));
+                                       if (strlen (s) > 0) {
+                                               func (data->cur_data, s, hash_fill);
+                                       }
+                                       s = str;
+                                       start = p;
+                               }
+                               data->state = 1;
+                       }
+                       else if (*p == '\r' || *p == '\n') {
+                               /* Got EOL marker, save stored string */
+                               if (s != str) {
+                                       *s = '\0';
+                                       s = rspamd_mempool_strdup (pool, g_strstrip (str));
+                                       if (strlen (s) > 0) {
+                                               func (data->cur_data, s, hash_fill);
+                                       }
+                                       s = str;
+                               }
+                               /* Skip EOL symbols */
+                               while ((*p == '\r' || *p == '\n') && p - chunk < len) {
+                                       p++;
+                               }
+                               start = p;
+                       }
+                       else {
+                               /* Store new string in s */
+                               *s = *p;
+                               s++;
+                               p++;
+                       }
+                       break;
+               /* SKIP_COMMENT */
+               case 1:
+                       /* Skip comment till end of line */
+                       if (*p == '\r' || *p == '\n') {
+                               while ((*p == '\r' || *p == '\n') && p - chunk < len) {
+                                       p++;
+                               }
+                               s = str;
+                               start = p;
+                               data->state = 0;
+                       }
+                       else {
+                               p++;
+                       }
+                       break;
+               }
+       }
+
+       g_free (str);
+
+       return start;
+}
+
+/**
+ * Radix tree helper function
+ */
+static void
+radix_tree_insert_helper (gpointer st, gconstpointer key, gpointer value)
+{
+       radix_tree_t *tree = st;
+
+       guint32 mask = 0xFFFFFFFF;
+       guint32 ip;
+       gchar *token, *ipnet, *err_str, **strv, **cur;
+       struct in_addr ina;
+       gint k;
+
+       /* Split string if there are multiple items inside a single string */
+       strv = g_strsplit_set ((gchar *)key, " ,;", 0);
+       cur = strv;
+       while (*cur) {
+               if (**cur == '\0') {
+                       cur++;
+                       continue;
+               }
+               /* Extract ipnet */
+               ipnet = *cur;
+               token = strsep (&ipnet, "/");
+
+               if (ipnet != NULL) {
+                       errno = 0;
+                       /* Get mask */
+                       k = strtoul (ipnet, &err_str, 10);
+                       if (errno != 0) {
+                               msg_warn (
+                                       "invalid netmask, error detected on symbol: %s, erorr: %s",
+                                       err_str,
+                                       strerror (errno));
+                               k = 32;
+                       }
+                       else if (k > 32 || k < 0) {
+                               msg_warn ("invalid netmask value: %d", k);
+                               k = 32;
+                       }
+                       /* Calculate mask based on CIDR presentation */
+                       mask = mask << (32 - k);
+               }
+
+               /* Check IP */
+               if (inet_aton (token, &ina) == 0) {
+                       msg_err ("invalid ip address: %s", token);
+                       return;
+               }
+
+               /* Insert ip in a tree */
+               ip = ntohl ((guint32) ina.s_addr);
+               k = radix32tree_insert (tree, ip, mask, 1);
+               if (k == -1) {
+                       msg_warn ("cannot insert ip to tree: %s, mask %X", inet_ntoa (
+                                       ina), mask);
+               }
+               else if (k == 1) {
+                       msg_warn ("ip %s, mask %X, value already exists", inet_ntoa (
+                                       ina), mask);
+               }
+               cur++;
+       }
+
+       g_strfreev (strv);
+}
+
+/* Helpers */
+gchar *
+read_host_list (rspamd_mempool_t * pool,
+       gchar * chunk,
+       gint len,
+       struct map_cb_data *data)
+{
+       if (data->cur_data == NULL) {
+               data->cur_data = g_hash_table_new (rspamd_strcase_hash,
+                               rspamd_strcase_equal);
+       }
+       return abstract_parse_list (pool,
+                          chunk,
+                          len,
+                          data,
+                          (insert_func) g_hash_table_insert);
+}
+
+void
+fin_host_list (rspamd_mempool_t * pool, struct map_cb_data *data)
+{
+       if (data->prev_data) {
+               g_hash_table_destroy (data->prev_data);
+       }
+}
+
+gchar *
+read_kv_list (rspamd_mempool_t * pool,
+       gchar * chunk,
+       gint len,
+       struct map_cb_data *data)
+{
+       if (data->cur_data == NULL) {
+               data->cur_data = g_hash_table_new (rspamd_strcase_hash,
+                               rspamd_strcase_equal);
+       }
+       return abstract_parse_kv_list (pool,
+                          chunk,
+                          len,
+                          data,
+                          (insert_func) g_hash_table_insert);
+}
+
+void
+fin_kv_list (rspamd_mempool_t * pool, struct map_cb_data *data)
+{
+       if (data->prev_data) {
+               g_hash_table_destroy (data->prev_data);
+       }
+}
+
+gchar *
+read_radix_list (rspamd_mempool_t * pool,
+       gchar * chunk,
+       gint len,
+       struct map_cb_data *data)
+{
+       if (data->cur_data == NULL) {
+               data->cur_data = radix_tree_create ();
+       }
+       return abstract_parse_list (pool,
+                          chunk,
+                          len,
+                          data,
+                          (insert_func) radix_tree_insert_helper);
+}
+
+void
+fin_radix_list (rspamd_mempool_t * pool, struct map_cb_data *data)
+{
+       if (data->prev_data) {
+               radix_tree_free (data->prev_data);
+       }
+}
index 1b5c01f1e11e6603805d4e9aa605cdcb9badf877..c0f1adc2da4d202e2c63e32c2c84581b37d37dd0 100644 (file)
@@ -15,31 +15,6 @@ enum fetch_proto {
        MAP_PROTO_FILE,
        MAP_PROTO_HTTP,
 };
-
-/**
- * Data specific to file maps
- */
-struct file_map_data {
-       const gchar *filename;
-       struct stat st;
-};
-
-/**
- * Data specific to HTTP maps
- */
-struct http_map_data {
-       struct addrinfo *addr;
-       guint16 port;
-       gchar *path;
-       gchar *host;
-       time_t last_checked;
-       gshort chunked;
-       gchar read_buf[BUFSIZ];
-       guint32 rlen;
-       guint32 chunk;
-       guint32 chunk_remain;
-};
-
 struct map_cb_data;
 
 /**