Browse Source

* Add http maps support

tags/0.2.7
Vsevolod Stakhov 15 years ago
parent
commit
ecc3b51cfd
8 changed files with 376 additions and 29 deletions
  1. 1
    1
      src/lmtp_proto.c
  2. 1
    1
      src/main.c
  3. 365
    18
      src/map.c
  4. 3
    3
      src/map.h
  5. 1
    1
      src/plugins/emails.c
  6. 1
    1
      src/plugins/surbl.c
  7. 3
    3
      src/util.c
  8. 1
    1
      src/util.h

+ 1
- 1
src/lmtp_proto.c View File

@@ -426,7 +426,7 @@ lmtp_deliver_mta (struct worker_task *task)
sock = make_unix_socket (task->cfg->deliver_host, un, FALSE);
}
else {
sock = make_tcp_socket (&task->cfg->deliver_addr, task->cfg->deliver_port, FALSE);
sock = make_tcp_socket (&task->cfg->deliver_addr, task->cfg->deliver_port, FALSE, TRUE);
}
if (sock == -1) {
msg_warn ("lmtp_deliver_mta: cannot create socket for %s, %s", task->cfg->deliver_host, strerror (errno));

+ 1
- 1
src/main.c View File

@@ -367,7 +367,7 @@ create_listen_socket (struct in_addr *addr, int port, int family, char *path)
struct sockaddr_un *un_addr;
/* Create listen socket */
if (family == AF_INET) {
if ((listen_sock = make_tcp_socket (addr, port, TRUE)) == -1) {
if ((listen_sock = make_tcp_socket (addr, port, TRUE, TRUE)) == -1) {
msg_err ("create_listen_socket: cannot create tcp listen socket. %s", strerror (errno));
}
}

+ 365
- 18
src/map.c View File

@@ -36,16 +36,232 @@ static memory_pool_t *map_pool = NULL;
static GList *maps = NULL;
static char *hash_fill = "1";

/* Http reply */
struct http_reply {
int code;
GHashTable *headers;
char *cur_header;

int parser_state;
};

struct http_callback_data {
struct event ev;
struct timeval tv;
struct rspamd_map *map;
struct http_map_data *data;
struct http_reply *reply;
struct map_cb_data cbdata;
int state;
int fd;
};

/* Value in seconds after whitch we would try to do stat on list file */
#define MON_TIMEOUT 10
/* HTTP timeouts */
#define HTTP_CONNECT_TIMEOUT 2
#define HTTP_READ_TIMEOUT 10

static int
connect_http (struct rspamd_map *map, struct http_map_data *data, gboolean is_async)
{
int sock;

if ((sock = make_tcp_socket (&data->addr, data->port, FALSE, is_async)) == -1) {
msg_info ("connect_http: cannot connect to http server %s: %d, %s", data->host, errno, strerror (errno));
return -1;
}

return sock;
}

static void
write_http_request (struct rspamd_map *map, struct http_map_data *data, int sock)
{
char outbuf[BUFSIZ];
int r;

r = snprintf (outbuf, sizeof (outbuf), "GET %s%s HTTP/1.1" CRLF
"Connection: close" CRLF
"Host: %s" CRLF, (*data->path == '/') ? "" : "/",
data->path, data->host);
if (data->last_checked != 0) {
r += snprintf (outbuf + r, sizeof (outbuf) - r, "If-Modified-Since: %s" CRLF, asctime (gmtime (&data->last_checked)));
}

r += snprintf (outbuf + r, sizeof (outbuf) - r, CRLF);

if (write (sock, outbuf, r) == -1) {
msg_err ("write_http_request: failed to write request: %d, %s", errno, strerror (errno));
}
}

static u_char *
parse_http_reply (u_char *chunk, size_t len, struct http_reply *reply)
{
u_char *s, *p, *err_str, *tmp;
p = chunk;

while (p - chunk < len) {
switch (reply->parser_state) {
/* Search status code */
case 0:
/* Search for status code */
if (*p != ' ') {
p ++;
}
else {
/* Try to parse HTTP reply code */
reply->code = strtoul (++p, (char **)&err_str, 10);
if (*err_str != ' ') {
msg_info ("parse_http_reply: error while reading HTTP status code: %s", p);
return NULL;
}
/* Now skip to end of status string */
reply->parser_state = 1;
continue;
}
break;
/* Skip to end of line */
case 1:
if (*p == '\n') {
/* Switch to read header state */
reply->parser_state = 2;
}
/* Each skipped symbol is proceeded */
s = ++p;
break;
/* Read header value */
case 2:
if (*p == ':') {
reply->cur_header = g_malloc (p - s + 1);
g_strlcpy (reply->cur_header, s, p - s + 1);
reply->parser_state = 3;
}
else if (*p == '\r' && *(p + 1) == '\n') {
/* Last empty line */
reply->parser_state = 5;
}
p ++;
break;
/* Skip spaces after header name */
case 3:
if (*p != ' ') {
s = p;
reply->parser_state = 4;
}
else {
p ++;
}
break;
/* Read header value */
case 4:
if (*p == '\r') {
if (reply->cur_header != NULL) {
tmp = g_malloc (p - s + 1);
g_strlcpy (tmp, s, p - s + 1);
g_hash_table_insert (reply->headers, reply->cur_header, tmp);
reply->cur_header = NULL;
}
reply->parser_state = 1;
}
p ++;
break;
case 5:
/* Set pointer to begining of HTTP body */
p ++;
s = p;
reply->parser_state = 6;
break;
case 6:
/* Headers parsed, just return */
return p;
break;
}
}

return s;
}

static gboolean
read_http_common (struct rspamd_map *map, struct http_map_data *data, struct http_reply *reply, struct map_cb_data *cbdata, int fd)
{
u_char buf[BUFSIZ], *remain;
int rlen;
ssize_t r;

rlen = 0;
if ((r = read (fd, buf + rlen, sizeof (buf) - rlen - 1)) > 0) {
buf[r ++] = '\0';
remain = parse_http_reply (buf, r - 1, reply);
if (remain != NULL && remain != buf) {
/* copy remaining buffer to start of buffer */
rlen = r - (remain - buf);
memmove (buf, remain, rlen);
}
if (reply->parser_state == 6) {
if (reply->code != 200 && reply->code != 304) {
msg_err ("read_http: got error reply from server %s, %d", data->host, reply->code);
return FALSE;
}
remain = map->read_callback (map->pool, buf, r - 1, cbdata);
if (remain != NULL && remain != buf) {
/* copy remaining buffer to start of buffer */
rlen = r - (remain - buf);
memmove (buf, remain, rlen);
}
}
}
return FALSE;
}

static void
read_http_sync (struct rspamd_map *map, struct http_map_data *data)
{
struct map_cb_data cbdata;
int fd;
struct http_reply *repl;

if (map->read_callback == NULL || map->fin_callback == NULL) {
msg_err ("read_map_file: bad callback for reading map file");
return;
}

/* Connect synced */
if ((fd = connect_http (map, data, FALSE)) == -1) {
return;
}
write_http_request (map, data, fd);
cbdata.state = 0;
cbdata.prev_data = *map->user_data;
cbdata.cur_data = NULL;

repl = g_malloc (sizeof (struct http_reply));
repl->parser_state = 0;
repl->code = 404;
repl->headers = g_hash_table_new_full (rspamd_strcase_hash, rspamd_strcase_equal, g_free, g_free);
while (read_http_common (map, data, repl, &cbdata, fd));

close (fd);

map->fin_callback (map->pool, &cbdata);
*map->user_data = cbdata.cur_data;
data->last_checked = time (NULL);

g_hash_table_destroy (repl->headers);
g_free (repl);
}

static void
read_map_file (struct rspamd_map *map, struct file_map_data *data)
{
struct map_cb_data cbdata;
char buf[BUFSIZ];
u_char buf[BUFSIZ], *remain;
ssize_t r;
int fd;
int fd, rlen;
if (map->read_callback == NULL || map->fin_callback == NULL) {
msg_err ("read_map_file: bad callback for reading map file");
@@ -60,10 +276,16 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data)
cbdata.state = 0;
cbdata.prev_data = *map->user_data;
cbdata.cur_data = NULL;

while ((r = read (fd, buf, sizeof (buf) - 1)) > 0) {
rlen = 0;
while ((r = read (fd, buf + rlen, sizeof (buf) - rlen - 1)) > 0) {
buf[r ++] = '\0';
map->read_callback (map->pool, buf, r, &cbdata);
remain = map->read_callback (map->pool, buf, r - 1, &cbdata);
if (remain != NULL) {
/* copy remaining buffer to start of buffer */
rlen = r - (remain - buf);
memmove (buf, remain, rlen);
}
}
close (fd);
@@ -158,7 +380,7 @@ add_map (const char *map_line, map_cb_t read_callback, map_fin_cb_t fin_callback
}
}
/* Now try to connect */
if ((s = make_tcp_socket (&hdata->addr, hdata->port, FALSE)) == -1) {
if ((s = make_tcp_socket (&hdata->addr, hdata->port, FALSE, FALSE)) == -1) {
msg_info ("add_map: cannot connect to http server %s: %d, %s", hdata->host, errno, strerror (errno));
return FALSE;
}
@@ -175,17 +397,18 @@ add_map (const char *map_line, map_cb_t read_callback, map_fin_cb_t fin_callback

typedef void (*insert_func)(gpointer st, gconstpointer key, gpointer value);

static gboolean
static u_char*
abstract_parse_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data, insert_func func)
{
u_char *s, *p, *str;
u_char *s, *p, *str, *start;

p = chunk;
start = p;

str = g_malloc (len + 1);
s = str;

while (*p) {
while (p - chunk < len) {
switch (data->state) {
/* READ_SYMBOL */
case 0:
@@ -193,8 +416,11 @@ abstract_parse_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_
if (s != str) {
*s = '\0';
s = memory_pool_strdup (pool, str);
func (data->cur_data, s, hash_fill);
if (strlen (s) > 0) {
func (data->cur_data, s, hash_fill);
}
s = str;
start = p;
}
data->state = 1;
}
@@ -202,8 +428,11 @@ abstract_parse_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_
if (s != str) {
*s = '\0';
s = memory_pool_strdup (pool, str);
func (data->cur_data, s, hash_fill);
if (strlen (s) > 0) {
func (data->cur_data, s, hash_fill);
}
s = str;
start = p;
}
while (*p == '\r' || *p == '\n') {
p ++;
@@ -225,6 +454,7 @@ abstract_parse_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_
p ++;
}
s = str;
start = p;
data->state = 0;
}
else {
@@ -236,7 +466,7 @@ abstract_parse_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_

g_free (str);

return TRUE;
return start;
}

static void
@@ -280,13 +510,13 @@ radix_tree_insert_helper (gpointer st, gconstpointer key, gpointer value)
}
}

void
u_char *
read_host_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data)
{
if (data->cur_data == NULL) {
data->cur_data = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal);
}
(void)abstract_parse_list (pool, chunk, len, data, (insert_func)g_hash_table_insert);
return abstract_parse_list (pool, chunk, len, data, (insert_func)g_hash_table_insert);
}

void
@@ -297,13 +527,13 @@ fin_host_list (memory_pool_t *pool, struct map_cb_data *data)
}
}

void
u_char *
read_radix_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data)
{
if (data->cur_data == NULL) {
data->cur_data = radix_tree_create ();
}
(void)abstract_parse_list (pool, chunk, len, data, (insert_func)radix_tree_insert_helper);
return abstract_parse_list (pool, chunk, len, data, (insert_func)radix_tree_insert_helper);
}

void
@@ -338,6 +568,117 @@ file_callback (int fd, short what, void *ud)
read_map_file (map, data);
}

static void
free_http_cbdata (struct http_callback_data *cbd)
{
if (cbd->reply) {
g_hash_table_destroy (cbd->reply->headers);
g_free (cbd->reply);
}
event_del (&cbd->ev);
close (cbd->fd);
g_free (cbd);
}

static void
http_async_callback (int fd, short what, void *ud)
{
struct http_callback_data *cbd = ud;
/* Begin of connection */
if (what == EV_WRITE) {
if (cbd->state == 0) {
/* Can write request */
write_http_request (cbd->map, cbd->data, fd);
/* Plan reading */
event_set (&cbd->ev, cbd->fd, EV_READ | EV_PERSIST, http_async_callback, cbd);
cbd->tv.tv_sec = HTTP_READ_TIMEOUT;
cbd->tv.tv_usec = 0;
cbd->state = 1;
/* Allocate reply structure */
cbd->reply = g_malloc (sizeof (struct http_reply));
cbd->reply->parser_state = 0;
cbd->reply->code = 404;
cbd->reply->headers = g_hash_table_new_full (rspamd_strcase_hash, rspamd_strcase_equal, g_free, g_free);
cbd->cbdata.state = 0;
cbd->cbdata.prev_data = *cbd->map->user_data;
cbd->cbdata.cur_data = NULL;

event_add (&cbd->ev, &cbd->tv);
}
else {
msg_err ("http_async_callback: bad state when got write readiness");
free_http_cbdata (cbd);
return;
}
}
else if (what == EV_READ) {
if (cbd->state >= 1) {
if (!read_http_common (cbd->map, cbd->data, cbd->reply, &cbd->cbdata, cbd->fd)) {
/* Handle Not-Modified in a special way */
if (cbd->reply->code == 304) {
cbd->data->last_checked = time (NULL);
msg_info ("http_async_callback: data is not modified for server %s", cbd->data->host);
}
else if (cbd->cbdata.cur_data != NULL) {
cbd->map->fin_callback (cbd->map->pool, &cbd->cbdata);
*cbd->map->user_data = cbd->cbdata.cur_data;
cbd->data->last_checked = time (NULL);
}
if (cbd->state == 1 && cbd->reply->code == 200) {
/* Write to log that data is modified */
msg_info ("http_async_callback: rereading map data from %s", cbd->data->host);
}

free_http_cbdata (cbd);
return;
}
else if (cbd->state == 1) {
/* Write to log that data is modified */
msg_info ("http_async_callback: rereading map data from %s", cbd->data->host);
}
cbd->state = 2;
}
}
else {
msg_err ("http_async_callback: connection with http server terminated incorrectly");
free_http_cbdata (cbd);
}
}


static void
http_callback (int fd, short what, void *ud)
{
struct rspamd_map *map = ud;
struct http_map_data *data = map->map_data;
int sock;
struct http_callback_data *cbd;

/* Plan event again with jitter */
evtimer_del (&map->ev);
map->tv.tv_sec = MON_TIMEOUT + MON_TIMEOUT * g_random_double ();
map->tv.tv_usec = 0;
evtimer_add (&map->ev, &map->tv);

/* Connect asynced */
if ((sock = connect_http (map, data, TRUE)) == -1) {
return;
}
else {
/* Plan event */
cbd = g_malloc (sizeof (struct http_callback_data));
event_set (&cbd->ev, sock, EV_WRITE, http_async_callback, cbd);
cbd->tv.tv_sec = HTTP_CONNECT_TIMEOUT;
cbd->tv.tv_usec = 0;
cbd->map = map;
cbd->data = data;
cbd->state = 0;
cbd->fd = sock;
event_add (&cbd->ev, &cbd->tv);
}
}

/* Start watching event for all maps */
void
start_map_watch (void)
@@ -357,8 +698,14 @@ start_map_watch (void)
map->tv.tv_usec = 0;
evtimer_add (&map->ev, &map->tv);
}
else {
/* XXX */
else if (map->protocol == PROTO_HTTP) {
evtimer_set (&map->ev, http_callback, map);
/* Read initial data */
read_http_sync (map, map->map_data);
/* Plan event with jitter */
map->tv.tv_sec = MON_TIMEOUT + MON_TIMEOUT * g_random_double ();
map->tv.tv_usec = 0;
evtimer_add (&map->ev, &map->tv);
}
cur = g_list_next (cur);
}

+ 3
- 3
src/map.h View File

@@ -29,7 +29,7 @@ struct http_map_data {
time_t last_checked;
};

typedef void (*map_cb_t)(memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data);
typedef u_char* (*map_cb_t)(memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data);
typedef void (*map_fin_cb_t)(memory_pool_t *pool, struct map_cb_data *data);

struct rspamd_map {
@@ -47,9 +47,9 @@ gboolean add_map (const char *map_line, map_cb_t read_callback, map_fin_cb_t fin
void start_map_watch (void);

/* Common callbacks */
void read_radix_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data);
u_char* read_radix_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data);
void fin_radix_list (memory_pool_t *pool, struct map_cb_data *data);
void read_host_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data);
u_char* read_host_list (memory_pool_t *pool, u_char *chunk, size_t len, struct map_cb_data *data);
void fin_host_list (memory_pool_t *pool, struct map_cb_data *data);

#endif

+ 1
- 1
src/plugins/emails.c View File

@@ -68,7 +68,7 @@ emails_module_init (struct config_file *cfg, struct module_ctx **ctx)
email_module_ctx->filter = emails_mime_filter;
email_module_ctx->email_pool = memory_pool_new (memory_pool_get_size ());
email_module_ctx->email_re = g_regex_new (email_re_text, G_REGEX_RAW | G_REGEX_OPTIMIZE | G_REGEX_CASELESS, 0, &err);
email_module_ctx->blacklist = g_hash_table_new (g_str_hash, g_str_equal);
email_module_ctx->blacklist = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal);
*ctx = (struct module_ctx *)email_module_ctx;

+ 1
- 1
src/plugins/surbl.c View File

@@ -705,7 +705,7 @@ register_redirector_call (struct uri *url, struct worker_task *task, GTree *url_
struct redirector_param *param;
struct timeval *timeout;

s = make_tcp_socket (&surbl_module_ctx->redirector_addr, surbl_module_ctx->redirector_port, FALSE);
s = make_tcp_socket (&surbl_module_ctx->redirector_addr, surbl_module_ctx->redirector_port, FALSE, TRUE);

if (s == -1) {
msg_info ("register_redirector_call: <%s> cannot create tcp socket failed: %s",

+ 3
- 3
src/util.c View File

@@ -50,7 +50,7 @@ make_socket_nonblocking (int fd)
}

int
make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server)
make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server, gboolean async)
{
int fd, r, optlen, on = 1, s_error;
int serrno;
@@ -63,7 +63,7 @@ make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server)
return -1;
}

if (make_socket_nonblocking(fd) < 0) {
if (async && make_socket_nonblocking(fd) < 0) {
goto out;
}
@@ -87,7 +87,7 @@ make_tcp_socket (struct in_addr *addr, u_short port, gboolean is_server)
}

if (r == -1) {
if (errno != EINPROGRESS) {
if (!async || errno != EINPROGRESS) {
msg_warn ("make_tcp_socket: bind/connect failed: %d, '%s'", errno, strerror (errno));
goto out;
}

+ 1
- 1
src/util.h View File

@@ -10,7 +10,7 @@ struct rspamd_main;
struct workq;

/* Create socket and bind or connect it to specified address and port */
int make_tcp_socket (struct in_addr *, u_short, gboolean is_server);
int make_tcp_socket (struct in_addr *, u_short, gboolean is_server, gboolean async);
/* Accept from socket */
int accept_from_socket (int listen_sock, struct sockaddr *addr, socklen_t *len);
/* Create and bind or connect unix socket */

Loading…
Cancel
Save