static const gchar *hash_fill = "1";
-/* Http reply */
-struct http_reply {
- gint code;
- GHashTable *headers;
- gchar *cur_header;
- gint parser_state;
+/**
+ * Data specific to file maps
+ */
+struct file_map_data {
+ const gchar *filename;
+ struct stat st;
+};
+
+/**
+ * Data specific to HTTP maps
+ */
+struct http_map_data {
+ struct addrinfo *addr;
+ guint16 port;
+ gchar *path;
+ gchar *host;
+ time_t last_checked;
+ gboolean request_sent;
+ struct rspamd_http_connection *conn;
};
+
struct http_callback_data {
- struct event ev;
struct event_base *ev_base;
struct timeval tv;
struct rspamd_map *map;
struct http_map_data *data;
- struct http_reply *reply;
struct map_cb_data cbdata;
- gint state;
+ GString *remain_buf;
+
gint fd;
};
static void
write_http_request (struct rspamd_map *map,
struct http_map_data *data,
- gint sock)
+ gint sock,
+ struct timeval *tv)
{
- gchar outbuf[BUFSIZ], datebuf[128];
- gint r;
+ gchar datebuf[128];
struct tm *tm;
+ struct rspamd_http_message *msg;
- tm = gmtime (&data->last_checked);
- strftime (datebuf, sizeof (datebuf), "%a, %d %b %Y %H:%M:%S %Z", tm);
- r = rspamd_snprintf (outbuf,
- sizeof (outbuf),
- "GET %s%s HTTP/1.1" CRLF "Connection: close" CRLF "Host: %s" CRLF,
- (*data->path == '/') ? "" : "/",
- data->path,
- data->host);
- if (data->last_checked != 0) {
- r += rspamd_snprintf (outbuf + r,
- sizeof (outbuf) - r,
- "If-Modified-Since: %s" CRLF,
- datebuf);
- }
+ msg = rspamd_http_new_message (HTTP_REQUEST);
- r += rspamd_snprintf (outbuf + r, sizeof (outbuf) - r, CRLF);
+ msg->url = g_string_new (data->path);
+ if (data->last_checked != 0) {
+ tm = gmtime (&data->last_checked);
+ strftime (datebuf, sizeof (datebuf), "%a, %d %b %Y %H:%M:%S %Z", tm);
- if (write (sock, outbuf, r) == -1) {
- msg_err ("failed to write request: %d, %s", errno, strerror (errno));
+ rspamd_http_message_add_header (msg, "If-Modified-Since", datebuf);
}
+
+ rspamd_http_connection_write_message (data->conn, msg, data->host, NULL,
+ map, sock, tv, map->ev_base);
}
/**
- * FSM for parsing HTTP reply
+ * Callback for destroying HTTP callback data
*/
-static gchar *
-parse_http_reply (gchar * chunk, gint len, struct http_reply *reply)
+static void
+free_http_cbdata (struct http_callback_data *cbd)
{
- gchar *s, *p, *err_str, *tmp;
- p = chunk;
- s = chunk;
-
- while (p - chunk < len) {
- switch (reply->parser_state) {
- /* Search status code */
- case 0:
- /* Search for status code */
- if (*p != ' ') {
- p++;
- }
- else {
- /* Try to parse HTTP reply code */
- reply->code = strtoul (++p, (gchar **)&err_str, 10);
- if (*err_str != ' ') {
- msg_info ("error while reading HTTP status code: %s", p);
- return NULL;
- }
- /* Now skip to end of status string */
- reply->parser_state = 1;
- continue;
- }
- break;
- /* Skip to end of line */
- case 1:
- if (*p == '\n') {
- /* Switch to read header state */
- reply->parser_state = 2;
- }
- /* Each skipped symbol is proceeded */
- s = ++p;
- break;
- /* Read header value */
- case 2:
- if (*p == ':') {
- reply->cur_header = g_malloc (p - s + 1);
- rspamd_strlcpy (reply->cur_header, s, p - s + 1);
- reply->parser_state = 3;
- }
- else if (*p == '\r' && *(p + 1) == '\n') {
- /* Last empty line */
- reply->parser_state = 5;
- }
- p++;
- break;
- /* Skip spaces after header name */
- case 3:
- if (*p != ' ') {
- s = p;
- reply->parser_state = 4;
- }
- else {
- p++;
- }
- break;
- /* Read header value */
- case 4:
- if (*p == '\r') {
- if (reply->cur_header != NULL) {
- tmp = g_malloc (p - s + 1);
- rspamd_strlcpy (tmp, s, p - s + 1);
- g_hash_table_insert (reply->headers, reply->cur_header,
- tmp);
- reply->cur_header = NULL;
- }
- reply->parser_state = 1;
- }
- p++;
- break;
- case 5:
- /* Set pointer to begining of HTTP body */
- p++;
- s = p;
- reply->parser_state = 6;
- break;
- case 6:
- /* Headers parsed, just return */
- return p;
- break;
- }
+ g_atomic_int_set (cbd->map->locked, 0);
+ if (cbd->remain_buf) {
+ g_string_free (cbd->remain_buf, TRUE);
}
- return s;
+ rspamd_http_connection_reset (cbd->data->conn);
+ close (cbd->fd);
+ g_free (cbd);
}
-/**
- * Read and parse chunked header
+/*
+ * HTTP callbacks
*/
-static gint
-read_chunk_header (gchar * buf, gint len, struct http_map_data *data)
+static void
+http_map_error (struct rspamd_http_connection *conn,
+ GError *err)
{
- gchar chunkbuf[32], *p, *c, *err_str;
- gint skip = 0;
-
- p = chunkbuf;
- c = buf;
- /* Find hex digits */
- while (g_ascii_isxdigit (*c) && p - chunkbuf <
- (gint)(sizeof (chunkbuf) - 1) && skip < len) {
- *p++ = *c++;
- skip++;
- }
- *p = '\0';
- data->chunk = strtoul (chunkbuf, &err_str, 16);
- if (*err_str != '\0') {
- return -1;
- }
-
- /* Now skip to CRLF */
- while (*c != '\n' && skip < len) {
- c++;
- skip++;
- }
- if (*c == '\n' && skip < len) {
- skip++;
- c++;
- }
- data->chunk_remain = data->chunk;
+ struct http_callback_data *cbd = conn->ud;
- return skip;
+ msg_err ("connection with http server terminated incorrectly: %s",
+ err->message);
+ free_http_cbdata (cbd);
}
-/**
- * Helper callback for reading chunked reply
- */
-static gboolean
-read_http_chunked (gchar * buf,
- size_t len,
- struct rspamd_map *map,
- struct http_map_data *data,
- struct map_cb_data *cbdata)
+static int
+http_map_finish (struct rspamd_http_connection *conn,
+ struct rspamd_http_message *msg)
{
- gchar *p = buf, *remain;
- gint skip = 0;
-
- if (data->chunked == 1) {
- /* Read first chunk data */
- if ((skip = read_chunk_header (buf, len, data)) != -1) {
- p += skip;
- len -= skip;
- data->chunked = 2;
- }
- else {
- msg_info ("invalid chunked reply: %*s", (gint)len, buf);
- return FALSE;
- }
- }
-
- if (data->chunk_remain == 0) {
- /* Read another chunk */
- if ((skip = read_chunk_header (buf, len, data)) != -1) {
- p += skip;
- len -= skip;
- }
- else {
- msg_info ("invalid chunked reply: %*s", (gint)len, buf);
- return FALSE;
- }
- if (data->chunk == 0) {
- return FALSE;
- }
- }
+ struct http_callback_data *cbd = conn->ud;
+ struct rspamd_map *map;
- if (data->chunk_remain <= len ) {
- /* Call callback and move remaining buffer */
- remain = map->read_callback (map->pool, p, data->chunk_remain, cbdata);
- if (remain != NULL && remain != p + data->chunk_remain) {
- /* Copy remaining buffer to start of buffer */
- data->rlen = len - (remain - p);
- memmove (buf, remain, data->rlen);
- data->chunk_remain -= data->rlen;
- }
- else {
- /* Copy other part */
- data->rlen = len - data->chunk_remain;
- if (data->rlen > 0) {
- memmove (buf, p + data->chunk_remain, data->rlen);
- }
- data->chunk_remain = 0;
+ if (msg->code == 200) {
+ map = cbd->map;
+ if (cbd->remain_buf != NULL) {
+ map->read_callback (map->pool, cbd->remain_buf->str,
+ cbd->remain_buf->len, &cbd->cbdata);
}
+ map->fin_callback (map->pool, &cbd->cbdata);
+ *map->user_data = cbd->cbdata.cur_data;
+ cbd->data->last_checked = msg->date;
}
- else {
- /* Just read another portion of chunk */
- data->chunk_remain -= len;
- remain = map->read_callback (map->pool, p, len, cbdata);
- if (remain != NULL && remain != p + len) {
- /* copy remaining buffer to start of buffer */
- data->rlen = len - (remain - p);
- memmove (buf, remain, data->rlen);
- }
+ else if (msg->code == 304) {
+ msg_info ("data is not modified for server %s",
+ cbd->data->host);
+ cbd->data->last_checked = msg->date;
}
- return TRUE;
-}
-
-/**
- * Callback for reading HTTP reply
- */
-static gboolean
-read_http_common (struct rspamd_map *map,
- struct http_map_data *data,
- struct http_reply *reply,
- struct map_cb_data *cbdata,
- gint fd)
-{
- gchar *remain, *pos;
- ssize_t r;
- gchar *te, *date;
-
- if ((r =
- read (fd, data->read_buf + data->rlen, sizeof (data->read_buf) -
- data->rlen)) > 0) {
- r += data->rlen;
- data->rlen = 0;
- remain = parse_http_reply (data->read_buf, r, reply);
- if (remain != NULL && remain != data->read_buf) {
- /* copy remaining data->read_buffer to start of data->read_buffer */
- data->rlen = r - (remain - data->read_buf);
- memmove (data->read_buf, remain, data->rlen);
- r = data->rlen;
- data->rlen = 0;
- }
- if (r <= 0) {
- return TRUE;
- }
- if (reply->parser_state == 6) {
- /* If reply header is parsed successfully, try to read further data */
- if (reply->code != 200 && reply->code != 304) {
- msg_err ("got error reply from server %s, %d",
- data->host,
- reply->code);
- return FALSE;
- }
- else if (reply->code == 304) {
- /* Do not read anything */
- return FALSE;
- }
- pos = data->read_buf;
- /* Check for chunked */
- if (data->chunked == 0) {
- if ((te =
- g_hash_table_lookup (reply->headers,
- "Transfer-Encoding")) != NULL) {
- if (g_ascii_strcasecmp (te, "chunked") == 0) {
- data->chunked = 1;
- }
- else {
- data->chunked = -1;
- }
- }
- else {
- data->chunked = -1;
- }
- }
- /* Check for date */
- date = g_hash_table_lookup (reply->headers, "Date");
- if (date != NULL) {
- data->last_checked = rspamd_http_parse_date (date, -1);
- }
- else {
- data->last_checked = (time_t)-1;
- }
-
- if (data->chunked > 0) {
- return read_http_chunked (data->read_buf, r, map, data, cbdata);
- }
- /* Read more data */
- remain = map->read_callback (map->pool, pos, r, cbdata);
- if (remain != NULL && remain != pos + r) {
- /* copy remaining data->read_buffer to start of data->read_buffer */
- data->rlen = r - (remain - pos);
- memmove (pos, remain, data->rlen);
- }
- }
- }
- else {
- return FALSE;
- }
+ free_http_cbdata (cbd);
- return TRUE;
+ return 0;
}
-/**
- * Sync read of HTTP reply
- */
-static void
-read_http_sync (struct rspamd_map *map, struct http_map_data *data)
+static int
+http_map_read (struct rspamd_http_connection *conn,
+ struct rspamd_http_message *msg,
+ const gchar *chunk,
+ gsize len)
{
- struct map_cb_data cbdata;
- gint fd;
- struct http_reply *repl;
-
- if (map->read_callback == NULL || map->fin_callback == NULL) {
- msg_err ("bad callback for reading map file");
- return;
- }
+ struct http_callback_data *cbd = conn->ud;
+ gchar *pos;
+ struct rspamd_map *map;
- /* Connect synced */
- if ((fd = connect_http (map, data, FALSE)) == -1) {
- return;
+ if (msg->code != 200) {
+ /* Ignore not full replies */
+ return 0;
}
- write_http_request (map, data, fd);
-
- cbdata.state = 0;
- cbdata.map = map;
- cbdata.prev_data = *map->user_data;
- cbdata.cur_data = NULL;
- repl = g_malloc (sizeof (struct http_reply));
- repl->parser_state = 0;
- repl->code = 404;
- repl->headers = g_hash_table_new_full (rspamd_strcase_hash,
- rspamd_strcase_equal,
- g_free,
- g_free);
+ map = cbd->map;
+ if (cbd->remain_buf != NULL) {
+ /* We need to concatenate incoming buf with the remaining buf */
+ g_string_append_len (cbd->remain_buf, chunk, len);
- while (read_http_common (map, data, repl, &cbdata, fd)) ;
+ pos = map->read_callback (map->pool, cbd->remain_buf->str,
+ cbd->remain_buf->len, &cbd->cbdata);
- close (fd);
+ /* All read */
+ if (pos == NULL) {
+ g_string_free (cbd->remain_buf, TRUE);
+ cbd->remain_buf = NULL;
+ }
+ else {
+ /* Need to erase data processed */
+ g_string_erase (cbd->remain_buf, 0, pos - cbd->remain_buf->str);
+ }
+ }
+ else {
+ pos = map->read_callback (map->pool, (gchar *)chunk, len, &cbd->cbdata);
- map->fin_callback (map->pool, &cbdata);
- *map->user_data = cbdata.cur_data;
- if (data->last_checked == (time_t)-1) {
- data->last_checked = time (NULL);
+ if (pos != NULL) {
+ /* Store data in remain buf */
+ cbd->remain_buf = g_string_new_len (pos, len - (pos - chunk));
+ }
}
- g_hash_table_destroy (repl->headers);
- g_free (repl);
+ return 0;
}
/**
}
/**
- * FSM for parsing lists
+ * Common file callback
*/
-gchar *
-abstract_parse_kv_list (rspamd_mempool_t * pool,
- gchar * chunk,
- gint len,
- struct map_cb_data *data,
- insert_func func)
+static void
+file_callback (gint fd, short what, void *ud)
{
- gchar *c, *p, *key = NULL, *value = NULL;
-
- p = chunk;
- c = p;
+ struct rspamd_map *map = ud;
+ struct file_map_data *data = map->map_data;
+ struct stat st;
+ gdouble jittered_sec;
- while (p - chunk < len) {
- switch (data->state) {
- case 0:
- /* read key */
- /* Check here comments, eol and end of buffer */
- if (*p == '#') {
- if (key != NULL && p - c >= 0) {
- value = rspamd_mempool_alloc (pool, p - c + 1);
- memcpy (value, c, p - c);
- value[p - c] = '\0';
- value = g_strstrip (value);
- func (data->cur_data, key, value);
- msg_debug ("insert kv pair: %s -> %s", key, value);
- }
- data->state = 99;
- }
- else if (*p == '\r' || *p == '\n' || p - chunk == len - 1) {
- if (key != NULL && p - c >= 0) {
- value = rspamd_mempool_alloc (pool, p - c + 1);
- memcpy (value, c, p - c);
- value[p - c] = '\0';
+ /* Plan event again with jitter */
+ evtimer_del (&map->ev);
+ jittered_sec =
+ (map->cfg->map_timeout + g_random_double () * map->cfg->map_timeout);
+ double_to_tv (jittered_sec, &map->tv);
- value = g_strstrip (value);
- func (data->cur_data, key, value);
- msg_debug ("insert kv pair: %s -> %s", key, value);
- }
- else if (key == NULL && p - c > 0) {
- /* Key only line */
- key = rspamd_mempool_alloc (pool, p - c + 1);
- memcpy (key, c, p - c);
- key[p - c] = '\0';
- value = rspamd_mempool_alloc (pool, 1);
- *value = '\0';
- func (data->cur_data, key, value);
- msg_debug ("insert kv pair: %s -> %s", key, value);
- }
- data->state = 100;
- key = NULL;
- }
- else if (g_ascii_isspace (*p)) {
- if (p - c > 0) {
- key = rspamd_mempool_alloc (pool, p - c + 1);
- memcpy (key, c, p - c);
- key[p - c] = '\0';
- data->state = 2;
- }
- else {
- key = NULL;
- }
- }
- else {
- p++;
- }
- break;
- case 2:
- /* Skip spaces before value */
- if (!g_ascii_isspace (*p)) {
- c = p;
- data->state = 0;
- }
- else {
- p++;
- }
- break;
- case 99:
- /* SKIP_COMMENT */
- /* Skip comment till end of line */
- if (*p == '\r' || *p == '\n') {
- while ((*p == '\r' || *p == '\n') && p - chunk < len) {
- p++;
- }
- c = p;
- key = NULL;
- data->state = 0;
- }
- else {
- p++;
- }
- break;
- case 100:
- /* Skip \r\n and whitespaces */
- if (*p == '\r' || *p == '\n' || g_ascii_isspace (*p)) {
- p++;
- }
- else {
- c = p;
- key = NULL;
- data->state = 0;
- }
- break;
- }
- }
+ evtimer_add (&map->ev, &map->tv);
- return c;
-}
-
-gchar *
-abstract_parse_list (rspamd_mempool_t * pool,
- gchar * chunk,
- gint len,
- struct map_cb_data *data,
- insert_func func)
-{
- gchar *s, *p, *str, *start;
-
- p = chunk;
- start = p;
-
- str = g_malloc (len + 1);
- s = str;
-
- while (p - chunk < len) {
- switch (data->state) {
- /* READ_SYMBOL */
- case 0:
- if (*p == '#') {
- /* Got comment */
- if (s != str) {
- /* Save previous string in lines like: "127.0.0.1 #localhost" */
- *s = '\0';
- s = rspamd_mempool_strdup (pool, g_strstrip (str));
- if (strlen (s) > 0) {
- func (data->cur_data, s, hash_fill);
- }
- s = str;
- start = p;
- }
- data->state = 1;
- }
- else if (*p == '\r' || *p == '\n') {
- /* Got EOL marker, save stored string */
- if (s != str) {
- *s = '\0';
- s = rspamd_mempool_strdup (pool, g_strstrip (str));
- if (strlen (s) > 0) {
- func (data->cur_data, s, hash_fill);
- }
- s = str;
- }
- /* Skip EOL symbols */
- while ((*p == '\r' || *p == '\n') && p - chunk < len) {
- p++;
- }
- start = p;
- }
- else {
- /* Store new string in s */
- *s = *p;
- s++;
- p++;
- }
- break;
- /* SKIP_COMMENT */
- case 1:
- /* Skip comment till end of line */
- if (*p == '\r' || *p == '\n') {
- while ((*p == '\r' || *p == '\n') && p - chunk < len) {
- p++;
- }
- s = str;
- start = p;
- data->state = 0;
- }
- else {
- p++;
- }
- break;
- }
- }
-
- g_free (str);
-
- return start;
-}
-
-/**
- * Radix tree helper function
- */
-static void
-radix_tree_insert_helper (gpointer st, gconstpointer key, gpointer value)
-{
- radix_tree_t *tree = st;
-
- guint32 mask = 0xFFFFFFFF;
- guint32 ip;
- gchar *token, *ipnet, *err_str, **strv, **cur;
- struct in_addr ina;
- gint k;
-
- /* Split string if there are multiple items inside a single string */
- strv = g_strsplit_set ((gchar *)key, " ,;", 0);
- cur = strv;
- while (*cur) {
- if (**cur == '\0') {
- cur++;
- continue;
- }
- /* Extract ipnet */
- ipnet = *cur;
- token = strsep (&ipnet, "/");
-
- if (ipnet != NULL) {
- errno = 0;
- /* Get mask */
- k = strtoul (ipnet, &err_str, 10);
- if (errno != 0) {
- msg_warn (
- "invalid netmask, error detected on symbol: %s, erorr: %s",
- err_str,
- strerror (errno));
- k = 32;
- }
- else if (k > 32 || k < 0) {
- msg_warn ("invalid netmask value: %d", k);
- k = 32;
- }
- /* Calculate mask based on CIDR presentation */
- mask = mask << (32 - k);
- }
-
- /* Check IP */
- if (inet_aton (token, &ina) == 0) {
- msg_err ("invalid ip address: %s", token);
- return;
- }
-
- /* Insert ip in a tree */
- ip = ntohl ((guint32) ina.s_addr);
- k = radix32tree_insert (tree, ip, mask, 1);
- if (k == -1) {
- msg_warn ("cannot insert ip to tree: %s, mask %X", inet_ntoa (
- ina), mask);
- }
- else if (k == 1) {
- msg_warn ("ip %s, mask %X, value already exists", inet_ntoa (
- ina), mask);
- }
- cur++;
- }
-
- g_strfreev (strv);
-}
-
-/* Helpers */
-gchar *
-read_host_list (rspamd_mempool_t * pool,
- gchar * chunk,
- gint len,
- struct map_cb_data *data)
-{
- if (data->cur_data == NULL) {
- data->cur_data = g_hash_table_new (rspamd_strcase_hash,
- rspamd_strcase_equal);
- }
- return abstract_parse_list (pool,
- chunk,
- len,
- data,
- (insert_func) g_hash_table_insert);
-}
-
-void
-fin_host_list (rspamd_mempool_t * pool, struct map_cb_data *data)
-{
- if (data->prev_data) {
- g_hash_table_destroy (data->prev_data);
- }
-}
-
-gchar *
-read_kv_list (rspamd_mempool_t * pool,
- gchar * chunk,
- gint len,
- struct map_cb_data *data)
-{
- if (data->cur_data == NULL) {
- data->cur_data = g_hash_table_new (rspamd_strcase_hash,
- rspamd_strcase_equal);
+ if (g_atomic_int_get (map->locked)) {
+ msg_info (
+ "don't try to reread map as it is locked by other process, will reread it later");
+ return;
}
- return abstract_parse_kv_list (pool,
- chunk,
- len,
- data,
- (insert_func) g_hash_table_insert);
-}
-void
-fin_kv_list (rspamd_mempool_t * pool, struct map_cb_data *data)
-{
- if (data->prev_data) {
- g_hash_table_destroy (data->prev_data);
+ if (stat (data->filename,
+ &st) != -1 &&
+ (st.st_mtime > data->st.st_mtime || data->st.st_mtime == -1)) {
+ /* File was modified since last check */
+ memcpy (&data->st, &st, sizeof (struct stat));
}
-}
-
-gchar *
-read_radix_list (rspamd_mempool_t * pool,
- gchar * chunk,
- gint len,
- struct map_cb_data *data)
-{
- if (data->cur_data == NULL) {
- data->cur_data = radix_tree_create ();
+ else {
+ return;
}
- return abstract_parse_list (pool,
- chunk,
- len,
- data,
- (insert_func) radix_tree_insert_helper);
-}
-void
-fin_radix_list (rspamd_mempool_t * pool, struct map_cb_data *data)
-{
- if (data->prev_data) {
- radix_tree_free (data->prev_data);
- }
+ msg_info ("rereading map file %s", data->filename);
+ read_map_file (map, data);
}
/**
- * Common file callback
+ * Async HTTP callback
*/
static void
-file_callback (gint fd, short what, void *ud)
+http_callback (gint fd, short what, void *ud)
{
struct rspamd_map *map = ud;
- struct file_map_data *data = map->map_data;
- struct stat st;
+ struct http_map_data *data = map->map_data;
+ gint sock;
+ struct http_callback_data *cbd;
gdouble jittered_sec;
/* Plan event again with jitter */
jittered_sec =
(map->cfg->map_timeout + g_random_double () * map->cfg->map_timeout);
double_to_tv (jittered_sec, &map->tv);
-
evtimer_add (&map->ev, &map->tv);
if (g_atomic_int_get (map->locked)) {
return;
}
- if (stat (data->filename,
- &st) != -1 &&
- (st.st_mtime > data->st.st_mtime || data->st.st_mtime == -1)) {
- /* File was modified since last check */
- memcpy (&data->st, &st, sizeof (struct stat));
- }
- else {
- return;
- }
-
- msg_info ("rereading map file %s", data->filename);
- read_map_file (map, data);
-}
-
-/**
- * Callback for destroying HTTP callback data
- */
-static void
-free_http_cbdata (struct http_callback_data *cbd)
-{
- if (cbd->reply) {
- g_hash_table_destroy (cbd->reply->headers);
- g_free (cbd->reply);
- }
- g_atomic_int_set (cbd->map->locked, 0);
- event_del (&cbd->ev);
- close (cbd->fd);
- g_free (cbd);
-}
-
-/**
- * Async HTTP request parser
- */
-static void
-http_async_callback (gint fd, short what, void *ud)
-{
- struct http_callback_data *cbd = ud;
-
- /* Begin of connection */
- if (what == EV_WRITE) {
- if (cbd->state == 0) {
- /* Can write request */
- write_http_request (cbd->map, cbd->data, fd);
- /* Plan reading */
- event_set (&cbd->ev,
- cbd->fd,
- EV_READ | EV_PERSIST,
- http_async_callback,
- cbd);
- event_base_set (cbd->ev_base, &cbd->ev);
- cbd->tv.tv_sec = HTTP_READ_TIMEOUT;
- cbd->tv.tv_usec = 0;
- cbd->state = 1;
- /* Allocate reply structure */
- cbd->reply = g_malloc (sizeof (struct http_reply));
- cbd->reply->parser_state = 0;
- cbd->reply->code = 404;
- cbd->reply->headers = g_hash_table_new_full (rspamd_strcase_hash,
- rspamd_strcase_equal,
- g_free,
- g_free);
- cbd->cbdata.state = 0;
- cbd->cbdata.prev_data = *cbd->map->user_data;
- cbd->cbdata.cur_data = NULL;
- cbd->cbdata.map = cbd->map;
- cbd->data->rlen = 0;
- cbd->data->chunk = 0;
- cbd->data->chunk_remain = 0;
- cbd->data->chunked = FALSE;
- cbd->data->read_buf[0] = '\0';
-
- event_add (&cbd->ev, &cbd->tv);
- }
- else {
- msg_err ("bad state when got write readiness");
- free_http_cbdata (cbd);
- return;
- }
- }
- /* Got reply, parse it */
- else if (what == EV_READ) {
- if (cbd->state >= 1) {
- if (!read_http_common (cbd->map, cbd->data, cbd->reply,
- &cbd->cbdata, cbd->fd)) {
- /* Handle Not-Modified in a special way */
- if (cbd->reply->code == 304) {
- if (cbd->data->last_checked == (time_t)-1) {
- cbd->data->last_checked = time (NULL);
- }
- msg_info ("data is not modified for server %s",
- cbd->data->host);
- }
- else if (cbd->cbdata.cur_data != NULL) {
- /* Destroy old data and start reading request data */
- cbd->map->fin_callback (cbd->map->pool, &cbd->cbdata);
- *cbd->map->user_data = cbd->cbdata.cur_data;
- if (cbd->data->last_checked == (time_t)-1) {
- cbd->data->last_checked = time (NULL);
- }
- }
- if (cbd->state == 1 && cbd->reply->code == 200) {
- /* Write to log that data is modified */
- msg_info ("rereading map data from %s", cbd->data->host);
- }
-
- free_http_cbdata (cbd);
- return;
- }
- else if (cbd->state == 1) {
- /* Write to log that data is modified */
- msg_info ("rereading map data from %s", cbd->data->host);
- }
- cbd->state = 2;
- }
- }
- else {
- msg_err ("connection with http server terminated incorrectly");
- free_http_cbdata (cbd);
- }
-}
-
-/**
- * Async HTTP callback
- */
-static void
-http_callback (gint fd, short what, void *ud)
-{
- struct rspamd_map *map = ud;
- struct http_map_data *data = map->map_data;
- gint sock;
- struct http_callback_data *cbd;
- gdouble jittered_sec;
-
- /* Plan event again with jitter */
- evtimer_del (&map->ev);
- jittered_sec =
- (map->cfg->map_timeout + g_random_double () * map->cfg->map_timeout);
- double_to_tv (jittered_sec, &map->tv);
- evtimer_add (&map->ev, &map->tv);
-
- if (g_atomic_int_get (map->locked)) {
- msg_info (
- "don't try to reread map as it is locked by other process, will reread it later");
- return;
- }
-
- g_atomic_int_inc (map->locked);
-
- /* Connect asynced */
- if ((sock = connect_http (map, data, TRUE)) == -1) {
- g_atomic_int_set (map->locked, 0);
+ g_atomic_int_inc (map->locked);
+
+ /* Connect asynced */
+ if ((sock = connect_http (map, data, TRUE)) == -1) {
+ g_atomic_int_set (map->locked, 0);
return;
}
else {
/* Plan event */
cbd = g_malloc (sizeof (struct http_callback_data));
cbd->ev_base = map->ev_base;
- event_set (&cbd->ev, sock, EV_WRITE, http_async_callback, cbd);
- event_base_set (cbd->ev_base, &cbd->ev);
+ cbd->map = map;
+ cbd->cbdata.state = 0;
+ cbd->cbdata.prev_data = *cbd->map->user_data;
+ cbd->cbdata.cur_data = NULL;
+ cbd->cbdata.map = cbd->map;
cbd->tv.tv_sec = HTTP_CONNECT_TIMEOUT;
cbd->tv.tv_usec = 0;
- cbd->map = map;
- cbd->data = data;
- cbd->state = 0;
- cbd->fd = sock;
- cbd->reply = NULL;
- event_add (&cbd->ev, &cbd->tv);
+ data->conn->ud = cbd;
+ msg_info ("rereading map data from %s", data->host);
+ write_http_request (map, data, sock, &cbd->tv);
}
}
else if (map->protocol == MAP_PROTO_HTTP) {
evtimer_set (&map->ev, http_callback, map);
event_base_set (map->ev_base, &map->ev);
- /* Read initial data */
- read_http_sync (map, map->map_data);
- /* Plan event with jitter */
- jittered_sec =
- (map->cfg->map_timeout + g_random_double () *
- map->cfg->map_timeout);
- double_to_tv (jittered_sec, &map->tv);
+ map->tv.tv_sec = 0;
+ map->tv.tv_usec = 0;
evtimer_add (&map->ev, &map->tv);
}
cur = g_list_next (cur);
hdata->host = rspamd_mempool_alloc (cfg->map_pool, hostend - def + 1);
rspamd_strlcpy (hdata->host, def, hostend - def + 1);
hdata->path = rspamd_mempool_strdup (cfg->map_pool, p);
- hdata->rlen = 0;
/* Now try to resolve */
memset (&hints, 0, sizeof (hints));
hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
return FALSE;
}
close (s);
+ hdata->conn = rspamd_http_connection_new (http_map_read, http_map_error,
+ http_map_finish, RSPAMD_HTTP_BODY_PARTIAL, RSPAMD_HTTP_CLIENT);
new_map->map_data = hdata;
}
/* Temp pool */
return TRUE;
}
+
+
+/**
+ * FSM for parsing lists
+ */
+gchar *
+abstract_parse_kv_list (rspamd_mempool_t * pool,
+ gchar * chunk,
+ gint len,
+ struct map_cb_data *data,
+ insert_func func)
+{
+ gchar *c, *p, *key = NULL, *value = NULL;
+
+ p = chunk;
+ c = p;
+
+ while (p - chunk < len) {
+ switch (data->state) {
+ case 0:
+ /* read key */
+ /* Check here comments, eol and end of buffer */
+ if (*p == '#') {
+ if (key != NULL && p - c >= 0) {
+ value = rspamd_mempool_alloc (pool, p - c + 1);
+ memcpy (value, c, p - c);
+ value[p - c] = '\0';
+ value = g_strstrip (value);
+ func (data->cur_data, key, value);
+ msg_debug ("insert kv pair: %s -> %s", key, value);
+ }
+ data->state = 99;
+ }
+ else if (*p == '\r' || *p == '\n' || p - chunk == len - 1) {
+ if (key != NULL && p - c >= 0) {
+ value = rspamd_mempool_alloc (pool, p - c + 1);
+ memcpy (value, c, p - c);
+ value[p - c] = '\0';
+
+ value = g_strstrip (value);
+ func (data->cur_data, key, value);
+ msg_debug ("insert kv pair: %s -> %s", key, value);
+ }
+ else if (key == NULL && p - c > 0) {
+ /* Key only line */
+ key = rspamd_mempool_alloc (pool, p - c + 1);
+ memcpy (key, c, p - c);
+ key[p - c] = '\0';
+ value = rspamd_mempool_alloc (pool, 1);
+ *value = '\0';
+ func (data->cur_data, key, value);
+ msg_debug ("insert kv pair: %s -> %s", key, value);
+ }
+ data->state = 100;
+ key = NULL;
+ }
+ else if (g_ascii_isspace (*p)) {
+ if (p - c > 0) {
+ key = rspamd_mempool_alloc (pool, p - c + 1);
+ memcpy (key, c, p - c);
+ key[p - c] = '\0';
+ data->state = 2;
+ }
+ else {
+ key = NULL;
+ }
+ }
+ else {
+ p++;
+ }
+ break;
+ case 2:
+ /* Skip spaces before value */
+ if (!g_ascii_isspace (*p)) {
+ c = p;
+ data->state = 0;
+ }
+ else {
+ p++;
+ }
+ break;
+ case 99:
+ /* SKIP_COMMENT */
+ /* Skip comment till end of line */
+ if (*p == '\r' || *p == '\n') {
+ while ((*p == '\r' || *p == '\n') && p - chunk < len) {
+ p++;
+ }
+ c = p;
+ key = NULL;
+ data->state = 0;
+ }
+ else {
+ p++;
+ }
+ break;
+ case 100:
+ /* Skip \r\n and whitespaces */
+ if (*p == '\r' || *p == '\n' || g_ascii_isspace (*p)) {
+ p++;
+ }
+ else {
+ c = p;
+ key = NULL;
+ data->state = 0;
+ }
+ break;
+ }
+ }
+
+ return c;
+}
+
+gchar *
+abstract_parse_list (rspamd_mempool_t * pool,
+ gchar * chunk,
+ gint len,
+ struct map_cb_data *data,
+ insert_func func)
+{
+ gchar *s, *p, *str, *start;
+
+ p = chunk;
+ start = p;
+
+ str = g_malloc (len + 1);
+ s = str;
+
+ while (p - chunk < len) {
+ switch (data->state) {
+ /* READ_SYMBOL */
+ case 0:
+ if (*p == '#') {
+ /* Got comment */
+ if (s != str) {
+ /* Save previous string in lines like: "127.0.0.1 #localhost" */
+ *s = '\0';
+ s = rspamd_mempool_strdup (pool, g_strstrip (str));
+ if (strlen (s) > 0) {
+ func (data->cur_data, s, hash_fill);
+ }
+ s = str;
+ start = p;
+ }
+ data->state = 1;
+ }
+ else if (*p == '\r' || *p == '\n') {
+ /* Got EOL marker, save stored string */
+ if (s != str) {
+ *s = '\0';
+ s = rspamd_mempool_strdup (pool, g_strstrip (str));
+ if (strlen (s) > 0) {
+ func (data->cur_data, s, hash_fill);
+ }
+ s = str;
+ }
+ /* Skip EOL symbols */
+ while ((*p == '\r' || *p == '\n') && p - chunk < len) {
+ p++;
+ }
+ start = p;
+ }
+ else {
+ /* Store new string in s */
+ *s = *p;
+ s++;
+ p++;
+ }
+ break;
+ /* SKIP_COMMENT */
+ case 1:
+ /* Skip comment till end of line */
+ if (*p == '\r' || *p == '\n') {
+ while ((*p == '\r' || *p == '\n') && p - chunk < len) {
+ p++;
+ }
+ s = str;
+ start = p;
+ data->state = 0;
+ }
+ else {
+ p++;
+ }
+ break;
+ }
+ }
+
+ g_free (str);
+
+ return start;
+}
+
+/**
+ * Radix tree helper function
+ */
+static void
+radix_tree_insert_helper (gpointer st, gconstpointer key, gpointer value)
+{
+ radix_tree_t *tree = st;
+
+ guint32 mask = 0xFFFFFFFF;
+ guint32 ip;
+ gchar *token, *ipnet, *err_str, **strv, **cur;
+ struct in_addr ina;
+ gint k;
+
+ /* Split string if there are multiple items inside a single string */
+ strv = g_strsplit_set ((gchar *)key, " ,;", 0);
+ cur = strv;
+ while (*cur) {
+ if (**cur == '\0') {
+ cur++;
+ continue;
+ }
+ /* Extract ipnet */
+ ipnet = *cur;
+ token = strsep (&ipnet, "/");
+
+ if (ipnet != NULL) {
+ errno = 0;
+ /* Get mask */
+ k = strtoul (ipnet, &err_str, 10);
+ if (errno != 0) {
+ msg_warn (
+ "invalid netmask, error detected on symbol: %s, erorr: %s",
+ err_str,
+ strerror (errno));
+ k = 32;
+ }
+ else if (k > 32 || k < 0) {
+ msg_warn ("invalid netmask value: %d", k);
+ k = 32;
+ }
+ /* Calculate mask based on CIDR presentation */
+ mask = mask << (32 - k);
+ }
+
+ /* Check IP */
+ if (inet_aton (token, &ina) == 0) {
+ msg_err ("invalid ip address: %s", token);
+ return;
+ }
+
+ /* Insert ip in a tree */
+ ip = ntohl ((guint32) ina.s_addr);
+ k = radix32tree_insert (tree, ip, mask, 1);
+ if (k == -1) {
+ msg_warn ("cannot insert ip to tree: %s, mask %X", inet_ntoa (
+ ina), mask);
+ }
+ else if (k == 1) {
+ msg_warn ("ip %s, mask %X, value already exists", inet_ntoa (
+ ina), mask);
+ }
+ cur++;
+ }
+
+ g_strfreev (strv);
+}
+
+/* Helpers */
+gchar *
+read_host_list (rspamd_mempool_t * pool,
+ gchar * chunk,
+ gint len,
+ struct map_cb_data *data)
+{
+ if (data->cur_data == NULL) {
+ data->cur_data = g_hash_table_new (rspamd_strcase_hash,
+ rspamd_strcase_equal);
+ }
+ return abstract_parse_list (pool,
+ chunk,
+ len,
+ data,
+ (insert_func) g_hash_table_insert);
+}
+
+void
+fin_host_list (rspamd_mempool_t * pool, struct map_cb_data *data)
+{
+ if (data->prev_data) {
+ g_hash_table_destroy (data->prev_data);
+ }
+}
+
+gchar *
+read_kv_list (rspamd_mempool_t * pool,
+ gchar * chunk,
+ gint len,
+ struct map_cb_data *data)
+{
+ if (data->cur_data == NULL) {
+ data->cur_data = g_hash_table_new (rspamd_strcase_hash,
+ rspamd_strcase_equal);
+ }
+ return abstract_parse_kv_list (pool,
+ chunk,
+ len,
+ data,
+ (insert_func) g_hash_table_insert);
+}
+
+void
+fin_kv_list (rspamd_mempool_t * pool, struct map_cb_data *data)
+{
+ if (data->prev_data) {
+ g_hash_table_destroy (data->prev_data);
+ }
+}
+
+gchar *
+read_radix_list (rspamd_mempool_t * pool,
+ gchar * chunk,
+ gint len,
+ struct map_cb_data *data)
+{
+ if (data->cur_data == NULL) {
+ data->cur_data = radix_tree_create ();
+ }
+ return abstract_parse_list (pool,
+ chunk,
+ len,
+ data,
+ (insert_func) radix_tree_insert_helper);
+}
+
+void
+fin_radix_list (rspamd_mempool_t * pool, struct map_cb_data *data)
+{
+ if (data->prev_data) {
+ radix_tree_free (data->prev_data);
+ }
+}