aboutsummaryrefslogtreecommitdiffstats
path: root/src/libutil/map.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libutil/map.c')
-rw-r--r--src/libutil/map.c1148
1 files changed, 1148 insertions, 0 deletions
diff --git a/src/libutil/map.c b/src/libutil/map.c
new file mode 100644
index 000000000..703622585
--- /dev/null
+++ b/src/libutil/map.c
@@ -0,0 +1,1148 @@
+/*
+ * Copyright (c) 2009-2012, Vsevolod Stakhov
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHOR ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * Implementation of map files handling
+ */
+#include "config.h"
+#include "map.h"
+#include "http.h"
+#include "main.h"
+#include "util.h"
+#include "mem_pool.h"
+
+static const gchar *hash_fill = "1";
+
+/* Http reply */
+struct http_reply {
+ gint code;
+ GHashTable *headers;
+ gchar *cur_header;
+ gint parser_state;
+};
+
+struct http_callback_data {
+ struct event ev;
+ struct event_base *ev_base;
+ struct timeval tv;
+ struct rspamd_map *map;
+ struct http_map_data *data;
+ struct http_reply *reply;
+ struct map_cb_data cbdata;
+
+ gint state;
+ gint fd;
+};
+
+/* Value in seconds after whitch we would try to do stat on list file */
+
+/* HTTP timeouts */
+#define HTTP_CONNECT_TIMEOUT 2
+#define HTTP_READ_TIMEOUT 10
+
+/**
+ * Helper for HTTP connection establishment
+ */
+static gint
+connect_http (struct rspamd_map *map, struct http_map_data *data, gboolean is_async)
+{
+ gint sock;
+
+ if ((sock = make_tcp_socket (data->addr, FALSE, is_async)) == -1) {
+ msg_info ("cannot connect to http server %s: %d, %s", data->host, errno, strerror (errno));
+ return -1;
+ }
+
+ return sock;
+}
+
+/**
+ * Write HTTP request
+ */
+static void
+write_http_request (struct rspamd_map *map, struct http_map_data *data, gint sock)
+{
+ gchar outbuf[BUFSIZ], datebuf[128];
+ gint r;
+ struct tm *tm;
+
+ tm = gmtime (&data->last_checked);
+ strftime (datebuf, sizeof (datebuf), "%a, %d %b %Y %H:%M:%S %Z", tm);
+ r = rspamd_snprintf (outbuf, sizeof (outbuf), "GET %s%s HTTP/1.1" CRLF "Connection: close" CRLF "Host: %s" CRLF, (*data->path == '/') ? "" : "/", data->path, data->host);
+ if (data->last_checked != 0) {
+ r += rspamd_snprintf (outbuf + r, sizeof (outbuf) - r, "If-Modified-Since: %s" CRLF, datebuf);
+ }
+
+ r += rspamd_snprintf (outbuf + r, sizeof (outbuf) - r, CRLF);
+
+ if (write (sock, outbuf, r) == -1) {
+ msg_err ("failed to write request: %d, %s", errno, strerror (errno));
+ }
+}
+
+/**
+ * FSM for parsing HTTP reply
+ */
+static gchar *
+parse_http_reply (gchar * chunk, gint len, struct http_reply *reply)
+{
+ gchar *s, *p, *err_str, *tmp;
+ p = chunk;
+ s = chunk;
+
+ while (p - chunk < len) {
+ switch (reply->parser_state) {
+ /* Search status code */
+ case 0:
+ /* Search for status code */
+ if (*p != ' ') {
+ p++;
+ }
+ else {
+ /* Try to parse HTTP reply code */
+ reply->code = strtoul (++p, (gchar **)&err_str, 10);
+ if (*err_str != ' ') {
+ msg_info ("error while reading HTTP status code: %s", p);
+ return NULL;
+ }
+ /* Now skip to end of status string */
+ reply->parser_state = 1;
+ continue;
+ }
+ break;
+ /* Skip to end of line */
+ case 1:
+ if (*p == '\n') {
+ /* Switch to read header state */
+ reply->parser_state = 2;
+ }
+ /* Each skipped symbol is proceeded */
+ s = ++p;
+ break;
+ /* Read header value */
+ case 2:
+ if (*p == ':') {
+ reply->cur_header = g_malloc (p - s + 1);
+ rspamd_strlcpy (reply->cur_header, s, p - s + 1);
+ reply->parser_state = 3;
+ }
+ else if (*p == '\r' && *(p + 1) == '\n') {
+ /* Last empty line */
+ reply->parser_state = 5;
+ }
+ p++;
+ break;
+ /* Skip spaces after header name */
+ case 3:
+ if (*p != ' ') {
+ s = p;
+ reply->parser_state = 4;
+ }
+ else {
+ p++;
+ }
+ break;
+ /* Read header value */
+ case 4:
+ if (*p == '\r') {
+ if (reply->cur_header != NULL) {
+ tmp = g_malloc (p - s + 1);
+ rspamd_strlcpy (tmp, s, p - s + 1);
+ g_hash_table_insert (reply->headers, reply->cur_header, tmp);
+ reply->cur_header = NULL;
+ }
+ reply->parser_state = 1;
+ }
+ p++;
+ break;
+ case 5:
+ /* Set pointer to begining of HTTP body */
+ p++;
+ s = p;
+ reply->parser_state = 6;
+ break;
+ case 6:
+ /* Headers parsed, just return */
+ return p;
+ break;
+ }
+ }
+
+ return s;
+}
+
+/**
+ * Read and parse chunked header
+ */
+static gint
+read_chunk_header (gchar * buf, gint len, struct http_map_data *data)
+{
+ gchar chunkbuf[32], *p, *c, *err_str;
+ gint skip = 0;
+
+ p = chunkbuf;
+ c = buf;
+ /* Find hex digits */
+ while (g_ascii_isxdigit (*c) && p - chunkbuf < (gint)(sizeof (chunkbuf) - 1) && skip < len) {
+ *p++ = *c++;
+ skip++;
+ }
+ *p = '\0';
+ data->chunk = strtoul (chunkbuf, &err_str, 16);
+ if (*err_str != '\0') {
+ return -1;
+ }
+
+ /* Now skip to CRLF */
+ while (*c != '\n' && skip < len) {
+ c++;
+ skip++;
+ }
+ if (*c == '\n' && skip < len) {
+ skip++;
+ c++;
+ }
+ data->chunk_remain = data->chunk;
+
+ return skip;
+}
+
+/**
+ * Helper callback for reading chunked reply
+ */
+static gboolean
+read_http_chunked (gchar * buf, size_t len, struct rspamd_map *map, struct http_map_data *data, struct map_cb_data *cbdata)
+{
+ gchar *p = buf, *remain;
+ gint skip = 0;
+
+ if (data->chunked == 1) {
+ /* Read first chunk data */
+ if ((skip = read_chunk_header (buf, len, data)) != -1) {
+ p += skip;
+ len -= skip;
+ data->chunked = 2;
+ }
+ else {
+ msg_info ("invalid chunked reply: %*s", (gint)len, buf);
+ return FALSE;
+ }
+ }
+
+ if (data->chunk_remain == 0) {
+ /* Read another chunk */
+ if ((skip = read_chunk_header (buf, len, data)) != -1) {
+ p += skip;
+ len -= skip;
+ }
+ else {
+ msg_info ("invalid chunked reply: %*s", (gint)len, buf);
+ return FALSE;
+ }
+ if (data->chunk == 0) {
+ return FALSE;
+ }
+ }
+
+ if (data->chunk_remain <= len ) {
+ /* Call callback and move remaining buffer */
+ remain = map->read_callback (map->pool, p, data->chunk_remain, cbdata);
+ if (remain != NULL && remain != p + data->chunk_remain) {
+ /* Copy remaining buffer to start of buffer */
+ data->rlen = len - (remain - p);
+ memmove (buf, remain, data->rlen);
+ data->chunk_remain -= data->rlen;
+ }
+ else {
+ /* Copy other part */
+ data->rlen = len - data->chunk_remain;
+ if (data->rlen > 0) {
+ memmove (buf, p + data->chunk_remain, data->rlen);
+ }
+ data->chunk_remain = 0;
+ }
+
+ }
+ else {
+ /* Just read another portion of chunk */
+ data->chunk_remain -= len;
+ remain = map->read_callback (map->pool, p, len, cbdata);
+ if (remain != NULL && remain != p + len) {
+ /* copy remaining buffer to start of buffer */
+ data->rlen = len - (remain - p);
+ memmove (buf, remain, data->rlen);
+ }
+ }
+
+ return TRUE;
+}
+
+/**
+ * Callback for reading HTTP reply
+ */
+static gboolean
+read_http_common (struct rspamd_map *map, struct http_map_data *data, struct http_reply *reply, struct map_cb_data *cbdata, gint fd)
+{
+ gchar *remain, *pos;
+ ssize_t r;
+ gchar *te, *date;
+
+ if ((r = read (fd, data->read_buf + data->rlen, sizeof (data->read_buf) - data->rlen)) > 0) {
+ r += data->rlen;
+ data->rlen = 0;
+ remain = parse_http_reply (data->read_buf, r, reply);
+ if (remain != NULL && remain != data->read_buf) {
+ /* copy remaining data->read_buffer to start of data->read_buffer */
+ data->rlen = r - (remain - data->read_buf);
+ memmove (data->read_buf, remain, data->rlen);
+ r = data->rlen;
+ data->rlen = 0;
+ }
+ if (r <= 0) {
+ return TRUE;
+ }
+ if (reply->parser_state == 6) {
+ /* If reply header is parsed successfully, try to read further data */
+ if (reply->code != 200 && reply->code != 304) {
+ msg_err ("got error reply from server %s, %d", data->host, reply->code);
+ return FALSE;
+ }
+ else if (reply->code == 304) {
+ /* Do not read anything */
+ return FALSE;
+ }
+ pos = data->read_buf;
+ /* Check for chunked */
+ if (data->chunked == 0) {
+ if ((te = g_hash_table_lookup (reply->headers, "Transfer-Encoding")) != NULL) {
+ if (g_ascii_strcasecmp (te, "chunked") == 0) {
+ data->chunked = 1;
+ }
+ else {
+ data->chunked = -1;
+ }
+ }
+ else {
+ data->chunked = -1;
+ }
+ }
+ /* Check for date */
+ date = g_hash_table_lookup (reply->headers, "Date");
+ if (date != NULL) {
+ data->last_checked = rspamd_http_parse_date (date, -1);
+ }
+ else {
+ data->last_checked = (time_t)-1;
+ }
+
+ if (data->chunked > 0) {
+ return read_http_chunked (data->read_buf, r, map, data, cbdata);
+ }
+ /* Read more data */
+ remain = map->read_callback (map->pool, pos, r, cbdata);
+ if (remain != NULL && remain != pos + r) {
+ /* copy remaining data->read_buffer to start of data->read_buffer */
+ data->rlen = r - (remain - pos);
+ memmove (pos, remain, data->rlen);
+ }
+ }
+ }
+ else {
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+/**
+ * Sync read of HTTP reply
+ */
+static void
+read_http_sync (struct rspamd_map *map, struct http_map_data *data)
+{
+ struct map_cb_data cbdata;
+ gint fd;
+ struct http_reply *repl;
+
+ if (map->read_callback == NULL || map->fin_callback == NULL) {
+ msg_err ("bad callback for reading map file");
+ return;
+ }
+
+ /* Connect synced */
+ if ((fd = connect_http (map, data, FALSE)) == -1) {
+ return;
+ }
+ write_http_request (map, data, fd);
+
+ cbdata.state = 0;
+ cbdata.map = map;
+ cbdata.prev_data = *map->user_data;
+ cbdata.cur_data = NULL;
+
+ repl = g_malloc (sizeof (struct http_reply));
+ repl->parser_state = 0;
+ repl->code = 404;
+ repl->headers = g_hash_table_new_full (rspamd_strcase_hash, rspamd_strcase_equal, g_free, g_free);
+
+ while (read_http_common (map, data, repl, &cbdata, fd));
+
+ close (fd);
+
+ map->fin_callback (map->pool, &cbdata);
+ *map->user_data = cbdata.cur_data;
+ if (data->last_checked == (time_t)-1) {
+ data->last_checked = time (NULL);
+ }
+
+ g_hash_table_destroy (repl->headers);
+ g_free (repl);
+}
+
+/**
+ * Callback for reading data from file
+ */
+static void
+read_map_file (struct rspamd_map *map, struct file_map_data *data)
+{
+ struct map_cb_data cbdata;
+ gchar buf[BUFSIZ], *remain;
+ ssize_t r;
+ gint fd, rlen;
+
+ if (map->read_callback == NULL || map->fin_callback == NULL) {
+ msg_err ("bad callback for reading map file");
+ return;
+ }
+
+ if ((fd = open (data->filename, O_RDONLY)) == -1) {
+ msg_warn ("cannot open file '%s': %s", data->filename, strerror (errno));
+ return;
+ }
+
+ cbdata.state = 0;
+ cbdata.prev_data = *map->user_data;
+ cbdata.cur_data = NULL;
+ cbdata.map = map;
+
+ rlen = 0;
+ while ((r = read (fd, buf + rlen, sizeof (buf) - rlen - 1)) > 0) {
+ r += rlen;
+ buf[r] = '\0';
+ remain = map->read_callback (map->pool, buf, r, &cbdata);
+ if (remain != NULL) {
+ /* copy remaining buffer to start of buffer */
+ rlen = r - (remain - buf);
+ memmove (buf, remain, rlen);
+ }
+ }
+
+ close (fd);
+
+ map->fin_callback (map->pool, &cbdata);
+ *map->user_data = cbdata.cur_data;
+}
+
+/**
+ * FSM for parsing lists
+ */
+gchar *
+abstract_parse_kv_list (rspamd_mempool_t * pool, gchar * chunk, gint len, struct map_cb_data *data, insert_func func)
+{
+ gchar *c, *p, *key = NULL, *value = NULL;
+
+ p = chunk;
+ c = p;
+
+ while (p - chunk < len) {
+ switch (data->state) {
+ case 0:
+ /* read key */
+ /* Check here comments, eol and end of buffer */
+ if (*p == '#') {
+ if (key != NULL && p - c >= 0) {
+ value = rspamd_mempool_alloc (pool, p - c + 1);
+ memcpy (value, c, p - c);
+ value[p - c] = '\0';
+ value = g_strstrip (value);
+ func (data->cur_data, key, value);
+ msg_debug ("insert kv pair: %s -> %s", key, value);
+ }
+ data->state = 99;
+ }
+ else if (*p == '\r' || *p == '\n' || p - chunk == len - 1) {
+ if (key != NULL && p - c >= 0) {
+ value = rspamd_mempool_alloc (pool, p - c + 1);
+ memcpy (value, c, p - c);
+ value[p - c] = '\0';
+
+ value = g_strstrip (value);
+ func (data->cur_data, key, value);
+ msg_debug ("insert kv pair: %s -> %s", key, value);
+ }
+ else if (key == NULL && p - c > 0) {
+ /* Key only line */
+ key = rspamd_mempool_alloc (pool, p - c + 1);
+ memcpy (key, c, p - c);
+ key[p - c] = '\0';
+ value = rspamd_mempool_alloc (pool, 1);
+ *value = '\0';
+ func (data->cur_data, key, value);
+ msg_debug ("insert kv pair: %s -> %s", key, value);
+ }
+ data->state = 100;
+ key = NULL;
+ }
+ else if (g_ascii_isspace (*p)) {
+ if (p - c > 0) {
+ key = rspamd_mempool_alloc (pool, p - c + 1);
+ memcpy (key, c, p - c);
+ key[p - c] = '\0';
+ data->state = 2;
+ }
+ else {
+ key = NULL;
+ }
+ }
+ else {
+ p ++;
+ }
+ break;
+ case 2:
+ /* Skip spaces before value */
+ if (!g_ascii_isspace (*p)) {
+ c = p;
+ data->state = 0;
+ }
+ else {
+ p ++;
+ }
+ break;
+ case 99:
+ /* SKIP_COMMENT */
+ /* Skip comment till end of line */
+ if (*p == '\r' || *p == '\n') {
+ while ((*p == '\r' || *p == '\n') && p - chunk < len) {
+ p++;
+ }
+ c = p;
+ key = NULL;
+ data->state = 0;
+ }
+ else {
+ p++;
+ }
+ break;
+ case 100:
+ /* Skip \r\n and whitespaces */
+ if (*p == '\r' || *p == '\n' || g_ascii_isspace (*p)) {
+ p ++;
+ }
+ else {
+ c = p;
+ key = NULL;
+ data->state = 0;
+ }
+ break;
+ }
+ }
+
+ return c;
+}
+
+gchar *
+abstract_parse_list (rspamd_mempool_t * pool, gchar * chunk, gint len, struct map_cb_data *data, insert_func func)
+{
+ gchar *s, *p, *str, *start;
+
+ p = chunk;
+ start = p;
+
+ str = g_malloc (len + 1);
+ s = str;
+
+ while (p - chunk < len) {
+ switch (data->state) {
+ /* READ_SYMBOL */
+ case 0:
+ if (*p == '#') {
+ /* Got comment */
+ if (s != str) {
+ /* Save previous string in lines like: "127.0.0.1 #localhost" */
+ *s = '\0';
+ s = rspamd_mempool_strdup (pool, g_strstrip (str));
+ if (strlen (s) > 0) {
+ func (data->cur_data, s, hash_fill);
+ }
+ s = str;
+ start = p;
+ }
+ data->state = 1;
+ }
+ else if (*p == '\r' || *p == '\n') {
+ /* Got EOL marker, save stored string */
+ if (s != str) {
+ *s = '\0';
+ s = rspamd_mempool_strdup (pool, g_strstrip (str));
+ if (strlen (s) > 0) {
+ func (data->cur_data, s, hash_fill);
+ }
+ s = str;
+ }
+ /* Skip EOL symbols */
+ while ((*p == '\r' || *p == '\n') && p - chunk < len) {
+ p++;
+ }
+ start = p;
+ }
+ else {
+ /* Store new string in s */
+ *s = *p;
+ s++;
+ p++;
+ }
+ break;
+ /* SKIP_COMMENT */
+ case 1:
+ /* Skip comment till end of line */
+ if (*p == '\r' || *p == '\n') {
+ while ((*p == '\r' || *p == '\n') && p - chunk < len) {
+ p++;
+ }
+ s = str;
+ start = p;
+ data->state = 0;
+ }
+ else {
+ p++;
+ }
+ break;
+ }
+ }
+
+ g_free (str);
+
+ return start;
+}
+
+/**
+ * Radix tree helper function
+ */
+static void
+radix_tree_insert_helper (gpointer st, gconstpointer key, gpointer value)
+{
+ radix_tree_t *tree = st;
+
+ guint32 mask = 0xFFFFFFFF;
+ guint32 ip;
+ gchar *token, *ipnet, *err_str, **strv, **cur;
+ struct in_addr ina;
+ gint k;
+
+ /* Split string if there are multiple items inside a single string */
+ strv = g_strsplit_set ((gchar *)key, " ,;", 0);
+ cur = strv;
+ while (*cur) {
+ if (**cur == '\0') {
+ cur++;
+ continue;
+ }
+ /* Extract ipnet */
+ ipnet = *cur;
+ token = strsep (&ipnet, "/");
+
+ if (ipnet != NULL) {
+ errno = 0;
+ /* Get mask */
+ k = strtoul (ipnet, &err_str, 10);
+ if (errno != 0) {
+ msg_warn ("invalid netmask, error detected on symbol: %s, erorr: %s", err_str, strerror (errno));
+ k = 32;
+ }
+ else if (k > 32 || k < 0) {
+ msg_warn ("invalid netmask value: %d", k);
+ k = 32;
+ }
+ /* Calculate mask based on CIDR presentation */
+ mask = mask << (32 - k);
+ }
+
+ /* Check IP */
+ if (inet_aton (token, &ina) == 0) {
+ msg_err ("invalid ip address: %s", token);
+ return;
+ }
+
+ /* Insert ip in a tree */
+ ip = ntohl ((guint32) ina.s_addr);
+ k = radix32tree_insert (tree, ip, mask, 1);
+ if (k == -1) {
+ msg_warn ("cannot insert ip to tree: %s, mask %X", inet_ntoa (ina), mask);
+ }
+ else if (k == 1) {
+ msg_warn ("ip %s, mask %X, value already exists", inet_ntoa (ina), mask);
+ }
+ cur++;
+ }
+
+ g_strfreev (strv);
+}
+
+/* Helpers */
+gchar *
+read_host_list (rspamd_mempool_t * pool, gchar * chunk, gint len, struct map_cb_data *data)
+{
+ if (data->cur_data == NULL) {
+ data->cur_data = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal);
+ }
+ return abstract_parse_list (pool, chunk, len, data, (insert_func) g_hash_table_insert);
+}
+
+void
+fin_host_list (rspamd_mempool_t * pool, struct map_cb_data *data)
+{
+ if (data->prev_data) {
+ g_hash_table_destroy (data->prev_data);
+ }
+}
+
+gchar *
+read_kv_list (rspamd_mempool_t * pool, gchar * chunk, gint len, struct map_cb_data *data)
+{
+ if (data->cur_data == NULL) {
+ data->cur_data = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal);
+ }
+ return abstract_parse_kv_list (pool, chunk, len, data, (insert_func) g_hash_table_insert);
+}
+
+void
+fin_kv_list (rspamd_mempool_t * pool, struct map_cb_data *data)
+{
+ if (data->prev_data) {
+ g_hash_table_destroy (data->prev_data);
+ }
+}
+
+gchar *
+read_radix_list (rspamd_mempool_t * pool, gchar * chunk, gint len, struct map_cb_data *data)
+{
+ if (data->cur_data == NULL) {
+ data->cur_data = radix_tree_create ();
+ }
+ return abstract_parse_list (pool, chunk, len, data, (insert_func) radix_tree_insert_helper);
+}
+
+void
+fin_radix_list (rspamd_mempool_t * pool, struct map_cb_data *data)
+{
+ if (data->prev_data) {
+ radix_tree_free (data->prev_data);
+ }
+}
+
+/**
+ * Common file callback
+ */
+static void
+file_callback (gint fd, short what, void *ud)
+{
+ struct rspamd_map *map = ud;
+ struct file_map_data *data = map->map_data;
+ struct stat st;
+ gdouble jittered_sec;
+
+ /* Plan event again with jitter */
+ evtimer_del (&map->ev);
+ jittered_sec = (map->cfg->map_timeout + g_random_double () * map->cfg->map_timeout);
+ double_to_tv (jittered_sec, &map->tv);
+
+ evtimer_add (&map->ev, &map->tv);
+
+ if (g_atomic_int_get (map->locked)) {
+ msg_info ("don't try to reread map as it is locked by other process, will reread it later");
+ return;
+ }
+
+ if (stat (data->filename, &st) != -1 && (st.st_mtime > data->st.st_mtime || data->st.st_mtime == -1)) {
+ /* File was modified since last check */
+ memcpy (&data->st, &st, sizeof (struct stat));
+ }
+ else {
+ return;
+ }
+
+ msg_info ("rereading map file %s", data->filename);
+ read_map_file (map, data);
+}
+
+/**
+ * Callback for destroying HTTP callback data
+ */
+static void
+free_http_cbdata (struct http_callback_data *cbd)
+{
+ if (cbd->reply) {
+ g_hash_table_destroy (cbd->reply->headers);
+ g_free (cbd->reply);
+ }
+ g_atomic_int_set (cbd->map->locked, 0);
+ event_del (&cbd->ev);
+ close (cbd->fd);
+ g_free (cbd);
+}
+
+/**
+ * Async HTTP request parser
+ */
+static void
+http_async_callback (gint fd, short what, void *ud)
+{
+ struct http_callback_data *cbd = ud;
+
+ /* Begin of connection */
+ if (what == EV_WRITE) {
+ if (cbd->state == 0) {
+ /* Can write request */
+ write_http_request (cbd->map, cbd->data, fd);
+ /* Plan reading */
+ event_set (&cbd->ev, cbd->fd, EV_READ | EV_PERSIST, http_async_callback, cbd);
+ event_base_set (cbd->ev_base, &cbd->ev);
+ cbd->tv.tv_sec = HTTP_READ_TIMEOUT;
+ cbd->tv.tv_usec = 0;
+ cbd->state = 1;
+ /* Allocate reply structure */
+ cbd->reply = g_malloc (sizeof (struct http_reply));
+ cbd->reply->parser_state = 0;
+ cbd->reply->code = 404;
+ cbd->reply->headers = g_hash_table_new_full (rspamd_strcase_hash, rspamd_strcase_equal, g_free, g_free);
+ cbd->cbdata.state = 0;
+ cbd->cbdata.prev_data = *cbd->map->user_data;
+ cbd->cbdata.cur_data = NULL;
+ cbd->cbdata.map = cbd->map;
+ cbd->data->rlen = 0;
+ cbd->data->chunk = 0;
+ cbd->data->chunk_remain = 0;
+ cbd->data->chunked = FALSE;
+ cbd->data->read_buf[0] = '\0';
+
+ event_add (&cbd->ev, &cbd->tv);
+ }
+ else {
+ msg_err ("bad state when got write readiness");
+ free_http_cbdata (cbd);
+ return;
+ }
+ }
+ /* Got reply, parse it */
+ else if (what == EV_READ) {
+ if (cbd->state >= 1) {
+ if (!read_http_common (cbd->map, cbd->data, cbd->reply, &cbd->cbdata, cbd->fd)) {
+ /* Handle Not-Modified in a special way */
+ if (cbd->reply->code == 304) {
+ if (cbd->data->last_checked == (time_t)-1) {
+ cbd->data->last_checked = time (NULL);
+ }
+ msg_info ("data is not modified for server %s", cbd->data->host);
+ }
+ else if (cbd->cbdata.cur_data != NULL) {
+ /* Destroy old data and start reading request data */
+ cbd->map->fin_callback (cbd->map->pool, &cbd->cbdata);
+ *cbd->map->user_data = cbd->cbdata.cur_data;
+ if (cbd->data->last_checked == (time_t)-1) {
+ cbd->data->last_checked = time (NULL);
+ }
+ }
+ if (cbd->state == 1 && cbd->reply->code == 200) {
+ /* Write to log that data is modified */
+ msg_info ("rereading map data from %s", cbd->data->host);
+ }
+
+ free_http_cbdata (cbd);
+ return;
+ }
+ else if (cbd->state == 1) {
+ /* Write to log that data is modified */
+ msg_info ("rereading map data from %s", cbd->data->host);
+ }
+ cbd->state = 2;
+ }
+ }
+ else {
+ msg_err ("connection with http server terminated incorrectly");
+ free_http_cbdata (cbd);
+ }
+}
+
+/**
+ * Async HTTP callback
+ */
+static void
+http_callback (gint fd, short what, void *ud)
+{
+ struct rspamd_map *map = ud;
+ struct http_map_data *data = map->map_data;
+ gint sock;
+ struct http_callback_data *cbd;
+ gdouble jittered_sec;
+
+ /* Plan event again with jitter */
+ evtimer_del (&map->ev);
+ jittered_sec = (map->cfg->map_timeout + g_random_double () * map->cfg->map_timeout);
+ double_to_tv (jittered_sec, &map->tv);
+ evtimer_add (&map->ev, &map->tv);
+
+ if (g_atomic_int_get (map->locked)) {
+ msg_info ("don't try to reread map as it is locked by other process, will reread it later");
+ return;
+ }
+
+ g_atomic_int_inc (map->locked);
+
+ /* Connect asynced */
+ if ((sock = connect_http (map, data, TRUE)) == -1) {
+ g_atomic_int_set (map->locked, 0);
+ return;
+ }
+ else {
+ /* Plan event */
+ cbd = g_malloc (sizeof (struct http_callback_data));
+ cbd->ev_base = map->ev_base;
+ event_set (&cbd->ev, sock, EV_WRITE, http_async_callback, cbd);
+ event_base_set (cbd->ev_base, &cbd->ev);
+ cbd->tv.tv_sec = HTTP_CONNECT_TIMEOUT;
+ cbd->tv.tv_usec = 0;
+ cbd->map = map;
+ cbd->data = data;
+ cbd->state = 0;
+ cbd->fd = sock;
+ cbd->reply = NULL;
+ event_add (&cbd->ev, &cbd->tv);
+ }
+}
+
+/* Start watching event for all maps */
+void
+start_map_watch (struct config_file *cfg, struct event_base *ev_base)
+{
+ GList *cur = cfg->maps;
+ struct rspamd_map *map;
+ struct file_map_data *fdata;
+ gdouble jittered_sec;
+
+ /* First of all do synced read of data */
+ while (cur) {
+ map = cur->data;
+ map->ev_base = ev_base;
+ if (map->protocol == MAP_PROTO_FILE) {
+ evtimer_set (&map->ev, file_callback, map);
+ event_base_set (map->ev_base, &map->ev);
+ /* Read initial data */
+ fdata = map->map_data;
+ if (fdata->st.st_mtime != -1) {
+ /* Do not try to read non-existent file */
+ read_map_file (map, map->map_data);
+ }
+ /* Plan event with jitter */
+ jittered_sec = (map->cfg->map_timeout + g_random_double () * map->cfg->map_timeout) / 2.;
+ double_to_tv (jittered_sec, &map->tv);
+ evtimer_add (&map->ev, &map->tv);
+ }
+ else if (map->protocol == MAP_PROTO_HTTP) {
+ evtimer_set (&map->ev, http_callback, map);
+ event_base_set (map->ev_base, &map->ev);
+ /* Read initial data */
+ read_http_sync (map, map->map_data);
+ /* Plan event with jitter */
+ jittered_sec = (map->cfg->map_timeout + g_random_double () * map->cfg->map_timeout);
+ double_to_tv (jittered_sec, &map->tv);
+ evtimer_add (&map->ev, &map->tv);
+ }
+ cur = g_list_next (cur);
+ }
+}
+
+void
+remove_all_maps (struct config_file *cfg)
+{
+ g_list_free (cfg->maps);
+ cfg->maps = NULL;
+ if (cfg->map_pool != NULL) {
+ rspamd_mempool_delete (cfg->map_pool);
+ cfg->map_pool = NULL;
+ }
+}
+
+gboolean
+check_map_proto (const gchar *map_line, gint *res, const gchar **pos)
+{
+ if (g_ascii_strncasecmp (map_line, "http://", sizeof ("http://") - 1) == 0) {
+ if (res && pos) {
+ *res = MAP_PROTO_HTTP;
+ *pos = map_line + sizeof ("http://") - 1;
+ }
+ }
+ else if (g_ascii_strncasecmp (map_line, "file://", sizeof ("file://") - 1) == 0) {
+ if (res && pos) {
+ *res = MAP_PROTO_FILE;
+ *pos = map_line + sizeof ("file://") - 1;
+ }
+ }
+ else if (*map_line == '/') {
+ /* Trivial file case */
+ *res = MAP_PROTO_FILE;
+ *pos = map_line;
+ }
+ else {
+ msg_debug ("invalid map fetching protocol: %s", map_line);
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+gboolean
+add_map (struct config_file *cfg, const gchar *map_line, const gchar *description,
+ map_cb_t read_callback, map_fin_cb_t fin_callback, void **user_data)
+{
+ struct rspamd_map *new_map;
+ enum fetch_proto proto;
+ const gchar *def, *p, *hostend;
+ struct file_map_data *fdata;
+ struct http_map_data *hdata;
+ gchar portbuf[6];
+ gint i, s, r;
+ struct addrinfo hints, *res;
+
+ /* First of all detect protocol line */
+ if (!check_map_proto (map_line, (int *)&proto, &def)) {
+ return FALSE;
+ }
+ /* Constant pool */
+ if (cfg->map_pool == NULL) {
+ cfg->map_pool = rspamd_mempool_new (rspamd_mempool_suggest_size ());
+ }
+ new_map = rspamd_mempool_alloc0 (cfg->map_pool, sizeof (struct rspamd_map));
+ new_map->read_callback = read_callback;
+ new_map->fin_callback = fin_callback;
+ new_map->user_data = user_data;
+ new_map->protocol = proto;
+ new_map->cfg = cfg;
+ new_map->id = g_random_int ();
+ new_map->locked = rspamd_mempool_alloc0_shared (cfg->cfg_pool, sizeof (gint));
+
+ if (proto == MAP_PROTO_FILE) {
+ new_map->uri = rspamd_mempool_strdup (cfg->cfg_pool, def);
+ def = new_map->uri;
+ }
+ else {
+ new_map->uri = rspamd_mempool_strdup (cfg->cfg_pool, map_line);
+ }
+ if (description != NULL) {
+ new_map->description = rspamd_mempool_strdup (cfg->cfg_pool, description);
+ }
+
+ /* Now check for each proto separately */
+ if (proto == MAP_PROTO_FILE) {
+ fdata = rspamd_mempool_alloc0 (cfg->map_pool, sizeof (struct file_map_data));
+ if (access (def, R_OK) == -1) {
+ if (errno != ENOENT) {
+ msg_err ("cannot open file '%s': %s", def, strerror (errno));
+ return FALSE;
+
+ }
+ msg_info ("map '%s' is not found, but it can be loaded automatically later", def);
+ /* We still can add this file */
+ fdata->st.st_mtime = -1;
+ }
+ else {
+ stat (def, &fdata->st);
+ }
+ fdata->filename = rspamd_mempool_strdup (cfg->map_pool, def);
+ new_map->map_data = fdata;
+ }
+ else if (proto == MAP_PROTO_HTTP) {
+ hdata = rspamd_mempool_alloc0 (cfg->map_pool, sizeof (struct http_map_data));
+ /* Try to search port */
+ if ((p = strchr (def, ':')) != NULL) {
+ hostend = p;
+ i = 0;
+ p++;
+ while (g_ascii_isdigit (*p) && i < (gint)sizeof (portbuf) - 1) {
+ portbuf[i++] = *p++;
+ }
+ if (*p != '/') {
+ msg_info ("bad http map definition: %s", def);
+ return FALSE;
+ }
+ portbuf[i] = '\0';
+ hdata->port = atoi (portbuf);
+ }
+ else {
+ /* Default http port */
+ rspamd_snprintf (portbuf, sizeof (portbuf), "80");
+ hdata->port = 80;
+ /* Now separate host from path */
+ if ((p = strchr (def, '/')) == NULL) {
+ msg_info ("bad http map definition: %s", def);
+ return FALSE;
+ }
+ hostend = p;
+ }
+ hdata->host = rspamd_mempool_alloc (cfg->map_pool, hostend - def + 1);
+ rspamd_strlcpy (hdata->host, def, hostend - def + 1);
+ hdata->path = rspamd_mempool_strdup (cfg->map_pool, p);
+ hdata->rlen = 0;
+ /* Now try to resolve */
+ memset (&hints, 0, sizeof (hints));
+ hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
+ hints.ai_socktype = SOCK_STREAM; /* Stream socket */
+ hints.ai_flags = 0;
+ hints.ai_protocol = 0; /* Any protocol */
+ hints.ai_canonname = NULL;
+ hints.ai_addr = NULL;
+ hints.ai_next = NULL;
+
+ if ((r = getaddrinfo (hdata->host, portbuf, &hints, &res)) == 0) {
+ hdata->addr = res;
+ rspamd_mempool_add_destructor (cfg->cfg_pool, (rspamd_mempool_destruct_t)freeaddrinfo, hdata->addr);
+ }
+ else {
+ msg_err ("address resolution for %s failed: %s", hdata->host, gai_strerror (r));
+ return FALSE;
+ }
+ /* Now try to connect */
+ if ((s = make_tcp_socket (hdata->addr, FALSE, FALSE)) == -1) {
+ msg_info ("cannot connect to http server %s: %d, %s", hdata->host, errno, strerror (errno));
+ return FALSE;
+ }
+ close (s);
+ new_map->map_data = hdata;
+ }
+ /* Temp pool */
+ new_map->pool = rspamd_mempool_new (rspamd_mempool_suggest_size ());
+
+ cfg->maps = g_list_prepend (cfg->maps, new_map);
+
+ return TRUE;
+}