diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2008-11-01 18:01:05 +0300 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2008-11-01 18:01:05 +0300 |
commit | 2aa9c74f1c449da92f6faf870f8cc801a83bb08b (patch) | |
tree | 33f0f941f08583fd0c4c3653cadde8d6ce8426c2 /src/plugins/surbl.c | |
parent | cc5343692b448c27485a24ea7f1b24d714bb82f6 (diff) | |
download | rspamd-2aa9c74f1c449da92f6faf870f8cc801a83bb08b.tar.gz rspamd-2aa9c74f1c449da92f6faf870f8cc801a83bb08b.zip |
* Reorganize structure of source files
* Adopt build system for new structure
--HG--
rename : cfg_file.h => src/cfg_file.h
rename : cfg_file.l => src/cfg_file.l
rename : cfg_file.y => src/cfg_file.y
rename : cfg_utils.c => src/cfg_utils.c
rename : controller.c => src/controller.c
rename : filter.c => src/filter.c
rename : filter.h => src/filter.h
rename : fstring.c => src/fstring.c
rename : fstring.h => src/fstring.h
rename : main.c => src/main.c
rename : main.h => src/main.h
rename : mem_pool.c => src/mem_pool.c
rename : mem_pool.h => src/mem_pool.h
rename : memcached-test.c => src/memcached-test.c
rename : memcached.c => src/memcached.c
rename : memcached.h => src/memcached.h
rename : perl.c => src/perl.c
rename : perl.h => src/perl.h
rename : plugins/regexp.c => src/plugins/regexp.c
rename : plugins/surbl.c => src/plugins/surbl.c
rename : protocol.c => src/protocol.c
rename : protocol.h => src/protocol.h
rename : upstream.c => src/upstream.c
rename : upstream.h => src/upstream.h
rename : url.c => src/url.c
rename : url.h => src/url.h
rename : util.c => src/util.c
rename : util.h => src/util.h
rename : worker.c => src/worker.c
Diffstat (limited to 'src/plugins/surbl.c')
-rw-r--r-- | src/plugins/surbl.c | 593 |
1 files changed, 593 insertions, 0 deletions
diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c new file mode 100644 index 000000000..c90a8419e --- /dev/null +++ b/src/plugins/surbl.c @@ -0,0 +1,593 @@ +/***MODULE:surbl + * rspamd module that implements SURBL url checking + */ + +#include <sys/types.h> +#include <sys/time.h> +#include <sys/wait.h> +#include <sys/param.h> + +#include <netinet/in.h> +#include <arpa/inet.h> +#include <netdb.h> +#include <syslog.h> +#include <fcntl.h> +#include <stdlib.h> +#include <string.h> + +#include <evdns.h> + +#include "../config.h" +#include "../main.h" +#include "../modules.h" +#include "../cfg_file.h" +#include "../memcached.h" + +#define DEFAULT_REDIRECTOR_PORT 8080 +#define DEFAULT_SURBL_WEIGHT 10 +#define DEFAULT_REDIRECTOR_CONNECT_TIMEOUT 1000 +#define DEFAULT_REDIRECTOR_READ_TIMEOUT 5000 +#define DEFAULT_SURBL_MAX_URLS 1000 +#define DEFAULT_SURBL_URL_EXPIRE 86400 +#define DEFAULT_SURBL_SYMBOL "SURBL_DNS" +#define DEFAULT_SURBL_SUFFIX "multi.surbl.org" + +struct surbl_ctx { + int (*header_filter)(struct worker_task *task); + int (*mime_filter)(struct worker_task *task); + int (*message_filter)(struct worker_task *task); + int (*url_filter)(struct worker_task *task); + struct in_addr redirector_addr; + uint16_t redirector_port; + uint16_t weight; + unsigned int connect_timeout; + unsigned int read_timeout; + unsigned int max_urls; + unsigned int url_expire; + char *suffix; + char *symbol; + char *metric; + GHashTable *hosters; + GHashTable *whitelist; + unsigned use_redirector; + memory_pool_t *surbl_pool; +}; + +struct redirector_param { + struct uri *url; + struct worker_task *task; + enum { + STATE_CONNECT, + STATE_READ, + } state; + struct event ev; + int sock; +}; + +struct memcached_param { + struct uri *url; + struct worker_task *task; + memcached_ctx_t *ctx; +}; + +static char *hash_fill = "1"; +struct surbl_ctx *surbl_module_ctx; +GRegex *extract_hoster_regexp, *extract_normal_regexp, *extract_numeric_regexp; + +static int surbl_test_url (struct worker_task *task); + +int +surbl_module_init (struct config_file *cfg, struct module_ctx **ctx) +{ + GError *err = NULL; + + surbl_module_ctx = g_malloc (sizeof (struct surbl_ctx)); + + surbl_module_ctx->header_filter = NULL; + surbl_module_ctx->mime_filter = NULL; + surbl_module_ctx->message_filter = NULL; + surbl_module_ctx->url_filter = surbl_test_url; + surbl_module_ctx->use_redirector = 0; + surbl_module_ctx->surbl_pool = memory_pool_new (1024); + + surbl_module_ctx->hosters = g_hash_table_new (g_str_hash, g_str_equal); + /* Register destructors */ + memory_pool_add_destructor (surbl_module_ctx->surbl_pool, (pool_destruct_func)g_hash_table_remove_all, surbl_module_ctx->hosters); + + surbl_module_ctx->whitelist = g_hash_table_new (g_str_hash, g_str_equal); + /* Register destructors */ + memory_pool_add_destructor (surbl_module_ctx->surbl_pool, (pool_destruct_func)g_hash_table_remove_all, surbl_module_ctx->whitelist); + + /* Init matching regexps */ + extract_hoster_regexp = g_regex_new ("([^.]+)\\.([^.]+)\\.([^.]+)$", G_REGEX_RAW, 0, &err); + extract_normal_regexp = g_regex_new ("([^.]+)\\.([^.]+)$", G_REGEX_RAW, 0, &err); + extract_numeric_regexp = g_regex_new ("(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})$", G_REGEX_RAW, 0, &err); + + *ctx = (struct module_ctx *)surbl_module_ctx; + + return 0; +} + +int +surbl_module_config (struct config_file *cfg) +{ + struct hostent *hent; + + char *value, *cur_tok, *str; + + evdns_init (); + + if ((value = get_module_opt (cfg, "surbl", "redirector")) != NULL) { + str = memory_pool_strdup (surbl_module_ctx->surbl_pool, value); + cur_tok = strsep (&str, ":"); + if (!inet_aton (cur_tok, &surbl_module_ctx->redirector_addr)) { + /* Try to call gethostbyname */ + hent = gethostbyname (cur_tok); + if (hent != NULL) { + memcpy((char *)&surbl_module_ctx->redirector_addr, hent->h_addr, sizeof(struct in_addr)); + if (str != NULL) { + surbl_module_ctx->redirector_port = (uint16_t)strtoul (str, NULL, 10); + } + else { + surbl_module_ctx->redirector_port = DEFAULT_REDIRECTOR_PORT; + } + surbl_module_ctx->use_redirector = 1; + } + } + /* Free cur_tok as it is actually initial str after strsep */ + free (cur_tok); + } + if ((value = get_module_opt (cfg, "surbl", "weight")) != NULL) { + surbl_module_ctx->weight = atoi (value); + } + else { + surbl_module_ctx->weight = DEFAULT_SURBL_WEIGHT; + } + if ((value = get_module_opt (cfg, "surbl", "url_expire")) != NULL) { + surbl_module_ctx->url_expire = atoi (value); + } + else { + surbl_module_ctx->url_expire = DEFAULT_SURBL_URL_EXPIRE; + } + if ((value = get_module_opt (cfg, "surbl", "redirector_connect_timeout")) != NULL) { + surbl_module_ctx->connect_timeout = parse_seconds (value); + } + else { + surbl_module_ctx->connect_timeout = DEFAULT_REDIRECTOR_CONNECT_TIMEOUT; + } + if ((value = get_module_opt (cfg, "surbl", "redirector_read_timeout")) != NULL) { + surbl_module_ctx->read_timeout = parse_seconds (value); + } + else { + surbl_module_ctx->read_timeout = DEFAULT_REDIRECTOR_READ_TIMEOUT; + } + if ((value = get_module_opt (cfg, "surbl", "max_urls")) != NULL) { + surbl_module_ctx->max_urls = atoi (value); + } + else { + surbl_module_ctx->max_urls = DEFAULT_SURBL_MAX_URLS; + } + if ((value = get_module_opt (cfg, "surbl", "suffix")) != NULL) { + surbl_module_ctx->suffix = memory_pool_strdup (surbl_module_ctx->surbl_pool, value); + g_free (value); + } + else { + surbl_module_ctx->suffix = DEFAULT_SURBL_SUFFIX; + } + if ((value = get_module_opt (cfg, "surbl", "symbol")) != NULL) { + surbl_module_ctx->symbol = memory_pool_strdup (surbl_module_ctx->surbl_pool, value); + g_free (value); + } + else { + surbl_module_ctx->symbol = DEFAULT_SURBL_SYMBOL; + } + if ((value = get_module_opt (cfg, "surbl", "metric")) != NULL) { + surbl_module_ctx->metric = memory_pool_strdup (surbl_module_ctx->surbl_pool, value); + g_free (value); + } + else { + surbl_module_ctx->metric = DEFAULT_METRIC; + } + if ((value = get_module_opt (cfg, "surbl", "hostings")) != NULL) { + char comment_flag = 0; + str = value; + while (*value ++) { + if (*value == '#') { + comment_flag = 1; + } + if (*value == '\r' || *value == '\n' || *value == ',') { + if (!comment_flag && str != value) { + g_hash_table_insert (surbl_module_ctx->hosters, g_strstrip(str), hash_fill); + str = value + 1; + } + else if (*value != ',') { + comment_flag = 0; + str = value + 1; + } + } + } + } + if ((value = get_module_opt (cfg, "surbl", "whitelist")) != NULL) { + char comment_flag = 0; + str = value; + while (*value ++) { + if (*value == '#') { + comment_flag = 1; + } + if (*value == '\r' || *value == '\n' || *value == ',') { + if (!comment_flag && str != value) { + g_hash_table_insert (surbl_module_ctx->whitelist, g_strstrip(str), hash_fill); + str = value + 1; + } + else if (*value != ',') { + comment_flag = 0; + str = value + 1; + } + } + } + } +} + +int +surbl_module_reconfig (struct config_file *cfg) +{ + memory_pool_delete (surbl_module_ctx->surbl_pool); + surbl_module_ctx->surbl_pool = memory_pool_new (1024); + + return surbl_module_config (cfg); +} + +static char * +format_surbl_request (char *hostname) +{ + GMatchInfo *info; + char *result; + + result = g_malloc (strlen (hostname) + strlen (surbl_module_ctx->suffix) + 1); + + /* First try to match numeric expression */ + if (g_regex_match (extract_numeric_regexp, hostname, 0, &info) == TRUE) { + gchar *octet1, *octet2, *octet3, *octet4; + octet1 = g_match_info_fetch (info, 0); + g_match_info_next (info, NULL); + octet2 = g_match_info_fetch (info, 0); + g_match_info_next (info, NULL); + octet3 = g_match_info_fetch (info, 0); + g_match_info_next (info, NULL); + octet4 = g_match_info_fetch (info, 0); + g_match_info_free (info); + sprintf (result, "%s.%s.%s.%s.%s", octet4, octet3, octet2, octet1, surbl_module_ctx->suffix); + g_free (octet1); + g_free (octet2); + g_free (octet3); + g_free (octet4); + return result; + } + g_match_info_free (info); + /* Try to match normal domain */ + if (g_regex_match (extract_normal_regexp, hostname, 0, &info) == TRUE) { + gchar *part1, *part2; + part1 = g_match_info_fetch (info, 0); + g_match_info_next (info, NULL); + part2 = g_match_info_fetch (info, 0); + g_match_info_free (info); + sprintf (result, "%s.%s", part1, part2); + if (g_hash_table_lookup (surbl_module_ctx->hosters, result) != NULL) { + /* Match additional part for hosters */ + g_free (part1); + g_free (part2); + if (g_regex_match (extract_hoster_regexp, hostname, 0, &info) == TRUE) { + gchar *hpart1, *hpart2, *hpart3; + hpart1 = g_match_info_fetch (info, 0); + g_match_info_next (info, NULL); + hpart2 = g_match_info_fetch (info, 0); + g_match_info_next (info, NULL); + hpart3 = g_match_info_fetch (info, 0); + g_match_info_free (info); + sprintf (result, "%s.%s.%s.%s", hpart1, hpart2, hpart3, surbl_module_ctx->suffix); + g_free (hpart1); + g_free (hpart2); + g_free (hpart3); + return result; + } + return NULL; + } + g_free (part1); + g_free (part2); + return result; + } + + return NULL; +} + +static void +dns_callback (int result, char type, int count, int ttl, void *addresses, void *data) +{ + struct memcached_param *param = (struct memcached_param *)data; + + /* If we have result from DNS server, this url exists in SURBL, so increase score */ + if (result != DNS_ERR_NONE || type != DNS_IPv4_A) { + msg_info ("surbl_check: url %s is in surbl %s", param->url->host, surbl_module_ctx->suffix); + insert_result (param->task, surbl_module_ctx->metric, surbl_module_ctx->symbol, 1); + } + else { + insert_result (param->task, surbl_module_ctx->metric, surbl_module_ctx->symbol, 0); + } + + param->task->save.saved --; + if (param->task->save.saved == 0) { + /* Call other filters */ + param->task->save.saved = 1; + process_filters (param->task); + } + + g_free (param->ctx->param->buf); + g_free (param->ctx->param); + g_free (param->ctx); + g_free (param); +} + +static void +memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data) +{ + struct memcached_param *param = (struct memcached_param *)data; + int *url_count; + char *surbl_req; + + switch (ctx->op) { + case CMD_CONNECT: + if (error != OK) { + msg_info ("memcached_callback: memcached returned error %s on CONNECT stage"); + memc_close_ctx (param->ctx); + param->task->save.saved --; + if (param->task->save.saved == 0) { + /* Call other filters */ + param->task->save.saved = 1; + process_filters (param->task); + } + g_free (param->ctx->param->buf); + g_free (param->ctx->param); + g_free (param->ctx); + g_free (param); + } + else { + memc_get (param->ctx, param->ctx->param); + } + break; + case CMD_READ: + if (error != OK) { + msg_info ("memcached_callback: memcached returned error %s on READ stage"); + memc_close_ctx (param->ctx); + param->task->save.saved --; + if (param->task->save.saved == 0) { + /* Call other filters */ + param->task->save.saved = 1; + process_filters (param->task); + } + g_free (param->ctx->param->buf); + g_free (param->ctx->param); + g_free (param->ctx); + g_free (param); + } + else { + url_count = (int *)param->ctx->param->buf; + /* Do not check DNS for urls that have count more than max_urls */ + if (*url_count > surbl_module_ctx->max_urls) { + msg_info ("memcached_callback: url '%s' has count %d, max: %d", struri (param->url), *url_count, surbl_module_ctx->max_urls); + insert_result (param->task, surbl_module_ctx->metric, surbl_module_ctx->symbol, 1); + } + (*url_count) ++; + memc_set (param->ctx, param->ctx->param, surbl_module_ctx->url_expire); + } + break; + case CMD_WRITE: + if (error != OK) { + msg_info ("memcached_callback: memcached returned error %s on WRITE stage"); + } + memc_close_ctx (param->ctx); + param->task->save.saved --; + if (param->task->save.saved == 0) { + /* Call other filters */ + param->task->save.saved = 1; + process_filters (param->task); + } + if ((surbl_req = format_surbl_request (param->url->host)) != NULL) { + param->task->save.saved ++; + evdns_resolve_ipv4 (surbl_req, DNS_QUERY_NO_SEARCH, dns_callback, (void *)param); + return; + } + g_free (param->ctx->param->buf); + g_free (param->ctx->param); + g_free (param->ctx); + g_free (param); + break; + } +} + +static void +register_memcached_call (struct uri *url, struct worker_task *task) +{ + struct memcached_param *param; + struct memcached_server *selected; + memcached_param_t *cur_param; + gchar *sum_str; + int *url_count; + + param = g_malloc (sizeof (struct memcached_param)); + cur_param = g_malloc (sizeof (memcached_param_t)); + url_count = g_malloc (sizeof (int)); + + param->url = url; + param->task = task; + + param->ctx = g_malloc (sizeof (memcached_ctx_t)); + bzero (param->ctx, sizeof (memcached_ctx_t)); + bzero (cur_param, sizeof (memcached_param_t)); + + cur_param->buf = (u_char *)url_count; + cur_param->bufsize = sizeof (int); + + sum_str = g_compute_checksum_for_string (G_CHECKSUM_MD5, struri (url), -1); + strlcpy (cur_param->key, sum_str, sizeof (cur_param->key)); + g_free (sum_str); + + selected = (struct memcached_server *) get_upstream_by_hash ((void *)task->cfg->memcached_servers, + task->cfg->memcached_servers_num, sizeof (struct memcached_server), + time (NULL), task->cfg->memcached_error_time, task->cfg->memcached_dead_time, task->cfg->memcached_maxerrors, + cur_param->key, strlen(cur_param->key)); + param->ctx->callback = memcached_callback; + param->ctx->callback_data = (void *)param; + param->ctx->protocol = task->cfg->memcached_protocol; + memcpy(¶m->ctx->addr, &selected->addr, sizeof (struct in_addr)); + param->ctx->port = selected->port; + param->ctx->timeout.tv_sec = task->cfg->memcached_connect_timeout / 1000; + param->ctx->timeout.tv_sec = task->cfg->memcached_connect_timeout - param->ctx->timeout.tv_sec * 1000; + param->ctx->sock = -1; +#ifdef WITH_DEBUG + param->ctx->options = MEMC_OPT_DEBUG; +#else + param->ctx->options = 0; +#endif + param->ctx->param = cur_param; + memc_init_ctx (param->ctx); +} + +static void +redirector_callback (int fd, short what, void *arg) +{ + struct redirector_param *param = (struct redirector_param *)arg; + char url_buf[1024]; + int r; + struct timeval timeout; + char *p, *c; + + switch (param->state) { + case STATE_CONNECT: + /* We have write readiness after connect call, so reinit event */ + if (what == EV_WRITE) { + timeout.tv_sec = surbl_module_ctx->connect_timeout / 1000; + timeout.tv_usec = surbl_module_ctx->connect_timeout - timeout.tv_sec * 1000; + event_del (¶m->ev); + event_set (¶m->ev, param->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, redirector_callback, (void *)param); + event_add (¶m->ev, &timeout); + r = snprintf (url_buf, sizeof (url_buf), "GET %s HTTP/1.0\r\n\r\n", struri (param->url)); + write (param->sock, url_buf, r); + param->state = STATE_READ; + } + else { + event_del (¶m->ev); + msg_info ("redirector_callback: connection to redirector timed out"); + param->task->save.saved --; + if (param->task->save.saved == 0) { + /* Call other filters */ + param->task->save.saved = 1; + process_filters (param->task); + } + g_free (param); + } + break; + case STATE_READ: + if (what == EV_READ) { + r = read (param->sock, url_buf, sizeof (url_buf)); + if ((p = strstr (url_buf, "Uri: ")) != NULL) { + p += sizeof ("Uri: ") - 1; + c = p; + while (p++ < url_buf + sizeof (url_buf) - 1) { + if (*p == '\r' || *p == '\n') { + *p = '\0'; + break; + } + } + if (*p == '\0') { + msg_info ("redirector_callback: got reply from redirector: '%s' -> '%s'", struri (param->url), c); + parse_uri (param->url, c, param->task->task_pool); + register_memcached_call (param->url, param->task); + param->task->save.saved ++; + } + } + event_del (¶m->ev); + param->task->save.saved --; + if (param->task->save.saved == 0) { + /* Call other filters */ + param->task->save.saved = 1; + process_filters (param->task); + } + g_free (param); + } + else { + event_del (¶m->ev); + msg_info ("redirector_callback: reading redirector timed out"); + param->task->save.saved --; + if (param->task->save.saved == 0) { + /* Call other filters */ + param->task->save.saved = 1; + process_filters (param->task); + } + g_free (param); + } + break; + } +} + + +static void +register_redirector_call (struct uri *url, struct worker_task *task) +{ + struct sockaddr_in sc; + int ofl, r, s; + struct redirector_param *param; + struct timeval timeout; + + bzero (&sc, sizeof (struct sockaddr_in *)); + sc.sin_family = AF_INET; + sc.sin_port = surbl_module_ctx->redirector_port; + memcpy (&sc.sin_addr, &surbl_module_ctx->redirector_addr, sizeof (struct in_addr)); + + s = socket (PF_INET, SOCK_STREAM, 0); + + if (s == -1) { + msg_info ("register_redirector_call: socket() failed: %m"); + return; + } + + /* set nonblocking */ + ofl = fcntl(s, F_GETFL, 0); + fcntl(s, F_SETFL, ofl | O_NONBLOCK); + + if ((r = connect (s, (struct sockaddr*)&sc, sizeof (struct sockaddr_in))) == -1) { + if (errno != EINPROGRESS) { + close (s); + msg_info ("register_redirector_call: connect() failed: %m"); + } + } + param = g_malloc (sizeof (struct redirector_param)); + param->url = url; + param->task = task; + param->state = STATE_READ; + param->sock = s; + timeout.tv_sec = surbl_module_ctx->connect_timeout / 1000; + timeout.tv_usec = surbl_module_ctx->connect_timeout - timeout.tv_sec * 1000; + event_set (¶m->ev, s, EV_WRITE | EV_TIMEOUT, redirector_callback, (void *)param); + event_add (¶m->ev, &timeout); +} + +static int +surbl_test_url (struct worker_task *task) +{ + struct uri *url; + + TAILQ_FOREACH (url, &task->urls, next) { + if (surbl_module_ctx->use_redirector) { + register_redirector_call (url, task); + } + else { + register_memcached_call (url, task); + } + task->save.saved++; + } + return 0; +} + +/* + * vi:ts=4 + */ |