summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2008-09-03 19:13:24 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2008-09-03 19:13:24 +0400
commitb58d2b7e4d76f3c6a60dbb3d49ad782d53b239e7 (patch)
treeb8fea2eb7c1db9f4760d472d04b26d1daafe74ce
parent2809ad4747b7e3a3795aec4e433d7754c7efc365 (diff)
downloadrspamd-b58d2b7e4d76f3c6a60dbb3d49ad782d53b239e7.tar.gz
rspamd-b58d2b7e4d76f3c6a60dbb3d49ad782d53b239e7.zip
* Add initial implementation of surbl check module
-rw-r--r--Makefile.in2
-rw-r--r--cfg_file.h4
-rw-r--r--cfg_file.l77
-rw-r--r--cfg_utils.c98
-rw-r--r--config.h.in3
-rwxr-xr-xconfigure12
-rw-r--r--main.c2
-rw-r--r--main.h2
-rw-r--r--plugins/surbl.c400
-rw-r--r--url.c4
-rw-r--r--url.h2
-rw-r--r--worker.c26
12 files changed, 536 insertions, 96 deletions
diff --git a/Makefile.in b/Makefile.in
index df7400786..92d8683a1 100644
--- a/Makefile.in
+++ b/Makefile.in
@@ -30,7 +30,7 @@ clean:
dist-clean: clean
rm -f Makefile
rm -f config.log
- rm -f md5.h md5.c strlcpy.h strlcpy.c queue.h config.h modules.c
+ rm -f md5.h md5.c strlcpy.h strlcpy.c queue.h config.h modules.c modules.h
cd perl && rm -f Makefile.old && rm -f Makefile.PL && cd ..
creategroup:
diff --git a/cfg_file.h b/cfg_file.h
index 401593584..ff4eeb0d7 100644
--- a/cfg_file.h
+++ b/cfg_file.h
@@ -122,6 +122,10 @@ int parse_bind_line (struct config_file *cf, char *str);
void init_defaults (struct config_file *cfg);
void free_config (struct config_file *cfg);
int parse_script (char *str, struct script_param *param, enum script_type type);
+char* get_module_opt (struct config_file *cfg, char *module_name, char *opt_name);
+size_t parse_limit (const char *limit);
+unsigned int parse_seconds (const char *t);
+char parse_flag (const char *str);
int yylex (void);
int yyparse (void);
diff --git a/cfg_file.l b/cfg_file.l
index 8cde589b5..89ca8a171 100644
--- a/cfg_file.l
+++ b/cfg_file.l
@@ -14,83 +14,6 @@
YY_BUFFER_STATE include_stack[MAX_INCLUDE_DEPTH];
int include_stack_ptr = 0;
-static size_t
-parse_limit (const char *limit)
-{
- size_t result = 0;
- char *err_str;
-
- if (!limit || *limit == '\0') return 0;
-
- result = strtoul (limit, &err_str, 10);
-
- if (*err_str != '\0') {
- /* Megabytes */
- if (*err_str == 'm' || *err_str == 'M') {
- result *= 1048576L;
- }
- /* Kilobytes */
- else if (*err_str == 'k' || *err_str == 'K') {
- result *= 1024;
- }
- /* Gigabytes */
- else if (*err_str == 'g' || *err_str == 'G') {
- result *= 1073741824L;
- }
- }
-
- return result;
-}
-
-static unsigned int
-parse_seconds (const char *t)
-{
- unsigned int result = 0;
- char *err_str;
-
- if (!t || *t == '\0') return 0;
-
- result = strtoul (t, &err_str, 10);
-
- if (*err_str != '\0') {
- /* Seconds */
- if (*err_str == 's' || *err_str == 'S') {
- result *= 1000;
- }
- }
-
- return result;
-}
-
-static char
-parse_flag (const char *str)
-{
- if (!str || !*str) return -1;
-
- if ((*str == 'Y' || *str == 'y') && *(str + 1) == '\0') {
- return 1;
- }
-
- if ((*str == 'Y' || *str == 'y') &&
- (*(str + 1) == 'E' || *(str + 1) == 'e') &&
- (*(str + 2) == 'S' || *(str + 2) == 's') &&
- *(str + 3) == '\0') {
- return 1;
- }
-
- if ((*str == 'N' || *str == 'n') && *(str + 1) == '\0') {
- return 0;
- }
-
- if ((*str == 'N' || *str == 'n') &&
- (*(str + 1) == 'O' || *(str + 1) == 'o') &&
- *(str + 2) == '\0') {
- return 0;
- }
-
- return -1;
-}
-
%}
%option noyywrap
diff --git a/cfg_utils.c b/cfg_utils.c
index 83738aea6..035980624 100644
--- a/cfg_utils.c
+++ b/cfg_utils.c
@@ -40,6 +40,7 @@ clean_hash_bucket (gpointer key, gpointer value, gpointer unused)
LIST_REMOVE (cur, next);
free (cur);
}
+ free (cur_module_opt);
}
int
@@ -222,6 +223,103 @@ parse_script (char *str, struct script_param *param, enum script_type type)
return 0;
}
+char*
+get_module_opt (struct config_file *cfg, char *module_name, char *opt_name)
+{
+ LIST_HEAD (moduleoptq, module_opt) *cur_module_opt = NULL;
+ struct module_opt *cur;
+
+ cur_module_opt = g_hash_table_lookup (cfg->modules_opts, module_name);
+ if (cur_module_opt == NULL) {
+ return NULL;
+ }
+
+ LIST_FOREACH (cur, cur_module_opt, next) {
+ if (strcmp (cur->param, opt_name) == 0) {
+ return cur->value;
+ }
+ }
+
+ return NULL;
+}
+
+size_t
+parse_limit (const char *limit)
+{
+ size_t result = 0;
+ char *err_str;
+
+ if (!limit || *limit == '\0') return 0;
+
+ result = strtoul (limit, &err_str, 10);
+
+ if (*err_str != '\0') {
+ /* Megabytes */
+ if (*err_str == 'm' || *err_str == 'M') {
+ result *= 1048576L;
+ }
+ /* Kilobytes */
+ else if (*err_str == 'k' || *err_str == 'K') {
+ result *= 1024;
+ }
+ /* Gigabytes */
+ else if (*err_str == 'g' || *err_str == 'G') {
+ result *= 1073741824L;
+ }
+ }
+
+ return result;
+}
+
+unsigned int
+parse_seconds (const char *t)
+{
+ unsigned int result = 0;
+ char *err_str;
+
+ if (!t || *t == '\0') return 0;
+
+ result = strtoul (t, &err_str, 10);
+
+ if (*err_str != '\0') {
+ /* Seconds */
+ if (*err_str == 's' || *err_str == 'S') {
+ result *= 1000;
+ }
+ }
+
+ return result;
+}
+
+char
+parse_flag (const char *str)
+{
+ if (!str || !*str) return -1;
+
+ if ((*str == 'Y' || *str == 'y') && *(str + 1) == '\0') {
+ return 1;
+ }
+
+ if ((*str == 'Y' || *str == 'y') &&
+ (*(str + 1) == 'E' || *(str + 1) == 'e') &&
+ (*(str + 2) == 'S' || *(str + 2) == 's') &&
+ *(str + 3) == '\0') {
+ return 1;
+ }
+
+ if ((*str == 'N' || *str == 'n') && *(str + 1) == '\0') {
+ return 0;
+ }
+
+ if ((*str == 'N' || *str == 'n') &&
+ (*(str + 1) == 'O' || *(str + 1) == 'o') &&
+ *(str + 2) == '\0') {
+ return 0;
+ }
+
+ return -1;
+}
+
/*
* vi:ts=4
*/
diff --git a/config.h.in b/config.h.in
index 7b17b3427..36d67d56e 100644
--- a/config.h.in
+++ b/config.h.in
@@ -2,10 +2,11 @@
/* Forwarded declaration */
struct module_ctx;
+struct config_file;
typedef struct module_s {
const char *name;
- int (*module_init_func)(struct module_ctx *ctx);
+ int (*module_init_func)(struct config_file *cfg, struct module_ctx **ctx);
} module_t;
extern module_t modules[];
diff --git a/configure b/configure
index 776c15e1c..7130afa70 100755
--- a/configure
+++ b/configure
@@ -21,8 +21,8 @@ YACC_OUTPUT="cfg_yacc.c"
LEX_OUTPUT="cfg_lex.c"
CONFIG="config.h"
-SOURCES="upstream.c cfg_utils.c memcached.c main.c util.c worker.c fstring.c url.c perl.c ${LEX_OUTPUT} ${YACC_OUTPUT}"
-MODULES=""
+SOURCES="upstream.c cfg_utils.c memcached.c main.c util.c worker.c fstring.c url.c perl.c plugins/surbl.c ${LEX_OUTPUT} ${YACC_OUTPUT}"
+MODULES="surbl"
CFLAGS="$CFLAGS -W -Wpointer-arith -Wno-unused-parameter"
CFLAGS="$CFLAGS -Wno-unused-function -Wunused-variable -Wno-sign-compare"
@@ -414,14 +414,18 @@ write_result()
echo "#define RVERSION \"${VERSION}\"" >> $CONFIG
echo "#define HASH_COMPAT" >> $CONFIG
# Write modules init function
- echo "#include \"config.h\"" > modules.c
+ echo "#ifndef MODULES_H" > modules.h
+ echo "#include \"config.h\"" >> modules.h
+ echo "#include \"modules.h\"" > modules.c
echo "module_t modules[] = {" >> modules.c;
modules_num=0
for m in $MODULES ; do
echo "{\"${m}\", ${m}_module_init}," >> modules.c
+ echo "int ${m}_module_init(struct config_file *cfg, struct module_ctx **ctx);" >> modules.h
modules_num=`expr $modules_num + 1`
done
echo "};" >> modules.c
+ echo "#endif" >> modules.h
echo "#define MODULES_NUM $modules_num" >> $CONFIG
SOURCES="$SOURCES modules.c"
OBJECTS=`echo $SOURCES | sed -e 's/\.c/\.o/g'`
@@ -499,7 +503,7 @@ END
SO=`echo $o | sed -e 's/\.o/\.c/g'`
cat >> $MAKEFILE << END
${o}: \$(DEPS) ${SO}
- \$(CC) \$(OPT_FLAGS) \$(CFLAGS) \$(PTHREAD_CFLAGS) -c ${SO}
+ \$(CC) \$(OPT_FLAGS) \$(CFLAGS) \$(PTHREAD_CFLAGS) -o ${o} -c ${SO}
END
done
diff --git a/main.c b/main.c
index ecae9f139..dc1907f85 100644
--- a/main.c
+++ b/main.c
@@ -230,7 +230,7 @@ main (int argc, char **argv)
for (i = 0; i < MODULES_NUM; i ++) {
cur_module = g_malloc (sizeof (struct c_module));
cur_module->name = modules[i].name;
- if (modules[i].module_init_func(cur_module->ctx) == 0) {
+ if (modules[i].module_init_func(cfg, &cur_module->ctx) == 0) {
LIST_INSERT_HEAD (&cfg->c_modules, cur_module, next);
}
}
diff --git a/main.h b/main.h
index 6e973ef20..ddfbf959f 100644
--- a/main.h
+++ b/main.h
@@ -97,7 +97,7 @@ struct save_point {
enum { C_FILTER, PERL_FILTER } save_type;
void *entry;
void *chain;
- unsigned saved:1;
+ unsigned int saved;
};
struct worker_task {
diff --git a/plugins/surbl.c b/plugins/surbl.c
new file mode 100644
index 000000000..266816bef
--- /dev/null
+++ b/plugins/surbl.c
@@ -0,0 +1,400 @@
+/***MODULE:surbl
+ * rspamd module that implements SURBL url checking
+ */
+
+#include <sys/types.h>
+#include <sys/time.h>
+#include <sys/wait.h>
+#include <sys/param.h>
+
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <syslog.h>
+#include <fcntl.h>
+#include <stdlib.h>
+
+#include "../config.h"
+#include "../main.h"
+#include "../modules.h"
+#include "../cfg_file.h"
+#include "../memcached.h"
+
+#define DEFAULT_REDIRECTOR_PORT 8080
+#define DEFAULT_SURBL_WEIGHT 10
+#define DEFAULT_REDIRECTOR_CONNECT_TIMEOUT 1000
+#define DEFAULT_REDIRECTOR_READ_TIMEOUT 5000
+#define DEFAULT_SURBL_MAX_URLS 1000
+#define DEFAULT_SURBL_URL_EXPIRE 86400
+
+struct surbl_ctx {
+ int (*header_filter)(struct worker_task *task);
+ int (*mime_filter)(struct worker_task *task);
+ int (*message_filter)(struct worker_task *task);
+ int (*url_filter)(struct worker_task *task);
+ struct in_addr redirector_addr;
+ uint16_t redirector_port;
+ uint16_t weight;
+ unsigned int connect_timeout;
+ unsigned int read_timeout;
+ unsigned int max_urls;
+ unsigned int url_expire;
+ unsigned use_redirector:1;
+};
+
+struct redirector_param {
+ struct uri *url;
+ struct worker_task *task;
+ enum {
+ STATE_CONNECT,
+ STATE_READ,
+ } state;
+ struct event ev;
+ int sock;
+};
+
+struct memcached_param {
+ struct uri *url;
+ struct worker_task *task;
+ memcached_ctx_t *ctx;
+};
+
+struct surbl_ctx *surbl_module_ctx;
+
+static int surbl_test_url (struct worker_task *task);
+
+int
+surbl_module_init (struct config_file *cfg, struct module_ctx **ctx)
+{
+ struct hostent *hent;
+
+ char *value, *cur_tok, *str;
+
+ surbl_module_ctx = g_malloc (sizeof (struct surbl_ctx));
+
+ surbl_module_ctx->header_filter = NULL;
+ surbl_module_ctx->mime_filter = NULL;
+ surbl_module_ctx->message_filter = NULL;
+ surbl_module_ctx->url_filter = surbl_test_url;
+ surbl_module_ctx->use_redirector = 0;
+
+ if ((value = get_module_opt (cfg, "surbl", "redirector")) != NULL) {
+ str = strdup (value);
+ cur_tok = strsep (&str, ":");
+ if (!inet_aton (cur_tok, &surbl_module_ctx->redirector_addr)) {
+ /* Try to call gethostbyname */
+ hent = gethostbyname (cur_tok);
+ if (hent != NULL) {
+ memcpy((char *)&surbl_module_ctx->redirector_addr, hent->h_addr, sizeof(struct in_addr));
+ if (str != NULL) {
+ surbl_module_ctx->redirector_port = (uint16_t)strtoul (str, NULL, 10);
+ }
+ else {
+ surbl_module_ctx->redirector_port = DEFAULT_REDIRECTOR_PORT;
+ }
+ surbl_module_ctx->use_redirector = 1;
+ }
+ }
+ /* Free cur_tok as it is actually initial str after strsep */
+ free (cur_tok);
+ }
+ if ((value = get_module_opt (cfg, "surbl", "weight")) != NULL) {
+ surbl_module_ctx->weight = atoi (value);
+ }
+ else {
+ surbl_module_ctx->weight = DEFAULT_SURBL_WEIGHT;
+ }
+ if ((value = get_module_opt (cfg, "surbl", "url_expire")) != NULL) {
+ surbl_module_ctx->url_expire = atoi (value);
+ }
+ else {
+ surbl_module_ctx->url_expire = DEFAULT_SURBL_URL_EXPIRE;
+ }
+ if ((value = get_module_opt (cfg, "surbl", "redirector_connect_timeout")) != NULL) {
+ surbl_module_ctx->connect_timeout = parse_seconds (value);
+ }
+ else {
+ surbl_module_ctx->connect_timeout = DEFAULT_REDIRECTOR_CONNECT_TIMEOUT;
+ }
+ if ((value = get_module_opt (cfg, "surbl", "redirector_read_timeout")) != NULL) {
+ surbl_module_ctx->read_timeout = parse_seconds (value);
+ }
+ else {
+ surbl_module_ctx->read_timeout = DEFAULT_REDIRECTOR_READ_TIMEOUT;
+ }
+ if ((value = get_module_opt (cfg, "surbl", "max_urls")) != NULL) {
+ surbl_module_ctx->max_urls = atoi (value);
+ }
+ else {
+ surbl_module_ctx->max_urls = DEFAULT_SURBL_MAX_URLS;
+ }
+
+ *ctx = (struct module_ctx *)surbl_module_ctx;
+
+ evdns_init ();
+
+ return 0;
+}
+
+static void
+memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data)
+{
+ struct memcached_param *param = (struct memcached_param *)data;
+ int *url_count;
+ struct filter_result *res;
+
+ switch (ctx->op) {
+ case CMD_CONNECT:
+ if (error != OK) {
+ msg_info ("memcached_callback: memcached returned error %s on CONNECT stage");
+ memc_close_ctx (param->ctx);
+ param->task->save.saved --;
+ if (param->task->save.saved == 0) {
+ /* Call other filters */
+ param->task->save.saved = 1;
+ process_filters (param->task);
+ }
+ g_free (param->ctx->param->buf);
+ g_free (param->ctx->param);
+ g_free (param->ctx);
+ g_free (param);
+ }
+ else {
+ memc_get (param->ctx, param->ctx->param);
+ }
+ break;
+ case CMD_READ:
+ if (error != OK) {
+ msg_info ("memcached_callback: memcached returned error %s on READ stage");
+ memc_close_ctx (param->ctx);
+ param->task->save.saved --;
+ if (param->task->save.saved == 0) {
+ /* Call other filters */
+ param->task->save.saved = 1;
+ process_filters (param->task);
+ }
+ g_free (param->ctx->param->buf);
+ g_free (param->ctx->param);
+ g_free (param->ctx);
+ g_free (param);
+ }
+ else {
+ url_count = (int *)param->ctx->param->buf;
+ /* Do not check DNS for urls that have count more than max_urls */
+ if (*url_count > surbl_module_ctx->max_urls) {
+ msg_info ("memcached_callback: url '%s' has count %d, max: %d", struri (param->url), *url_count, surbl_module_ctx->max_urls);
+ res = TAILQ_LAST (&param->task->results, resultsq);
+ res->mark += surbl_module_ctx->weight;
+ }
+ (*url_count) ++;
+ memc_set (param->ctx, param->ctx->param, surbl_module_ctx->url_expire);
+ }
+ break;
+ case CMD_WRITE:
+ if (error != OK) {
+ msg_info ("memcached_callback: memcached returned error %s on WRITE stage");
+ }
+ memc_close_ctx (param->ctx);
+ param->task->save.saved --;
+ if (param->task->save.saved == 0) {
+ /* Call other filters */
+ param->task->save.saved = 1;
+ process_filters (param->task);
+ }
+ //XXX: read http://surbl.org and add surbl request here
+ g_free (param->ctx->param->buf);
+ g_free (param->ctx->param);
+ g_free (param->ctx);
+ g_free (param);
+ break;
+ }
+}
+
+static void
+register_memcached_call (struct uri *url, struct worker_task *task)
+{
+ struct memcached_param *param;
+ struct memcached_server *selected;
+ memcached_param_t *cur_param;
+ gchar *sum_str;
+ int *url_count;
+
+ param = g_malloc (sizeof (struct memcached_param));
+ cur_param = g_malloc (sizeof (memcached_param_t));
+ url_count = g_malloc (sizeof (int));
+
+ param->url = url;
+ param->task = task;
+
+ param->ctx = g_malloc (sizeof (memcached_ctx_t));
+ bzero (param->ctx, sizeof (memcached_ctx_t));
+ bzero (cur_param, sizeof (memcached_param_t));
+
+ cur_param->buf = (u_char *)url_count;
+ cur_param->bufsize = sizeof (int);
+
+ sum_str = g_compute_checksum_for_string (G_CHECKSUM_MD5, struri (url), -1);
+ strlcpy (cur_param->key, sum_str, sizeof (cur_param->key));
+ g_free (sum_str);
+
+ selected = (struct memcached_server *) get_upstream_by_hash ((void *)task->cfg->memcached_servers,
+ task->cfg->memcached_servers_num, sizeof (struct memcached_server),
+ time (NULL), task->cfg->memcached_error_time, task->cfg->memcached_dead_time, task->cfg->memcached_maxerrors,
+ cur_param->key, strlen(cur_param->key));
+ param->ctx->callback = memcached_callback;
+ param->ctx->callback_data = (void *)param;
+ param->ctx->protocol = task->cfg->memcached_protocol;
+ memcpy(&param->ctx->addr, &selected->addr, sizeof (struct in_addr));
+ param->ctx->port = selected->port;
+ param->ctx->timeout.tv_sec = task->cfg->memcached_connect_timeout / 1000;
+ param->ctx->timeout.tv_sec = task->cfg->memcached_connect_timeout - param->ctx->timeout.tv_sec * 1000;
+ param->ctx->sock = -1;
+#ifdef WITH_DEBUG
+ param->ctx->options = MEMC_OPT_DEBUG;
+#else
+ param->ctx->options = 0;
+#endif
+ param->ctx->param = cur_param;
+ memc_init_ctx (param->ctx);
+}
+
+static void
+redirector_callback (int fd, short what, void *arg)
+{
+ struct redirector_param *param = (struct redirector_param *)arg;
+ char url_buf[1024];
+ int r;
+ struct timeval timeout;
+ char *p, *c;
+
+ switch (param->state) {
+ case STATE_CONNECT:
+ /* We have write readiness after connect call, so reinit event */
+ if (what == EV_WRITE) {
+ timeout.tv_sec = surbl_module_ctx->connect_timeout / 1000;
+ timeout.tv_usec = surbl_module_ctx->connect_timeout - timeout.tv_sec * 1000;
+ event_del (&param->ev);
+ event_set (&param->ev, param->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, redirector_callback, (void *)param);
+ event_add (&param->ev, &timeout);
+ r = snprintf (url_buf, sizeof (url_buf), "GET %s HTTP/1.0\r\n\r\n", struri (param->url));
+ write (param->sock, url_buf, r);
+ param->state = STATE_READ;
+ }
+ else {
+ event_del (&param->ev);
+ msg_info ("redirector_callback: connection to redirector timed out");
+ param->task->save.saved --;
+ if (param->task->save.saved == 0) {
+ /* Call other filters */
+ param->task->save.saved = 1;
+ process_filters (param->task);
+ }
+ g_free (param);
+ }
+ break;
+ case STATE_READ:
+ if (what == EV_READ) {
+ r = read (param->sock, url_buf, sizeof (url_buf));
+ if ((p = strstr (url_buf, "Uri: ")) != NULL) {
+ p += sizeof ("Uri: ") - 1;
+ c = p;
+ while (p++ < url_buf + sizeof (url_buf) - 1) {
+ if (*p == '\r' || *p == '\n') {
+ *p = '\0';
+ break;
+ }
+ }
+ if (*p == '\0') {
+ msg_info ("redirector_callback: got reply from redirector: '%s' -> '%s'", struri (param->url), c);
+ parse_uri (param->url, c);
+ normalize_uri (param->url, c);
+ register_memcached_call (param->url, param->task);
+ param->task->save.saved ++;
+ }
+ }
+ event_del (&param->ev);
+ param->task->save.saved --;
+ if (param->task->save.saved == 0) {
+ /* Call other filters */
+ param->task->save.saved = 1;
+ process_filters (param->task);
+ }
+ g_free (param);
+ }
+ else {
+ event_del (&param->ev);
+ msg_info ("redirector_callback: reading redirector timed out");
+ param->task->save.saved --;
+ if (param->task->save.saved == 0) {
+ /* Call other filters */
+ param->task->save.saved = 1;
+ process_filters (param->task);
+ }
+ g_free (param);
+ }
+ break;
+ }
+}
+
+
+static void
+register_redirector_call (struct uri *url, struct worker_task *task)
+{
+ struct sockaddr_in sc;
+ int ofl, r, s;
+ struct redirector_param *param;
+ struct timeval timeout;
+
+ bzero (&sc, sizeof (struct sockaddr_in *));
+ sc.sin_family = AF_INET;
+ sc.sin_port = surbl_module_ctx->redirector_port;
+ memcpy (&sc.sin_addr, &surbl_module_ctx->redirector_addr, sizeof (struct in_addr));
+
+ s = socket (PF_INET, SOCK_STREAM, 0);
+
+ if (s == -1) {
+ msg_info ("register_redirector_call: socket() failed: %m");
+ return;
+ }
+
+ /* set nonblocking */
+ ofl = fcntl(s, F_GETFL, 0);
+ fcntl(s, F_SETFL, ofl | O_NONBLOCK);
+
+ if ((r = connect (s, (struct sockaddr*)&sc, sizeof (struct sockaddr_in))) == -1) {
+ if (errno != EINPROGRESS) {
+ close (s);
+ msg_info ("register_redirector_call: connect() failed: %m");
+ }
+ }
+ param = g_malloc (sizeof (struct redirector_param));
+ param->url = url;
+ param->task = task;
+ param->state = STATE_READ;
+ param->sock = s;
+ timeout.tv_sec = surbl_module_ctx->connect_timeout / 1000;
+ timeout.tv_usec = surbl_module_ctx->connect_timeout - timeout.tv_sec * 1000;
+ event_set (&param->ev, s, EV_WRITE | EV_TIMEOUT, redirector_callback, (void *)param);
+ event_add (&param->ev, &timeout);
+}
+
+static int
+surbl_test_url (struct worker_task *task)
+{
+ struct uri *url;
+
+ TAILQ_FOREACH (url, &task->urls, next) {
+ if (surbl_module_ctx->use_redirector) {
+ register_redirector_call (url, task);
+ }
+ else {
+ register_memcached_call (url, task);
+ }
+ task->save.saved++;
+ }
+ return 0;
+}
+
+/*
+ * vi:ts=4
+ */
diff --git a/url.c b/url.c
index b09202edc..2fb01f396 100644
--- a/url.c
+++ b/url.c
@@ -558,7 +558,7 @@ path_simplify (char *path)
return t != h;
}
-static enum uri_errno
+enum uri_errno
parse_uri(struct uri *uri, unsigned char *uristring)
{
unsigned char *prefix_end, *host_end;
@@ -762,7 +762,7 @@ parse_uri(struct uri *uri, unsigned char *uristring)
return URI_ERRNO_OK;
}
-static unsigned char *
+unsigned char *
normalize_uri(struct uri *uri, unsigned char *uristring)
{
unsigned char *parse_string = uristring;
diff --git a/url.h b/url.h
index 7d9d87db1..ae845aca1 100644
--- a/url.h
+++ b/url.h
@@ -82,5 +82,7 @@ enum protocol {
void url_parse_html (struct worker_task *task, GByteArray *part);
void url_parse_text (struct worker_task *task, GByteArray *part);
+enum uri_errno parse_uri(struct uri *uri, unsigned char *uristring);
+unsigned char * normalize_uri(struct uri *uri, unsigned char *uristring);
#endif
diff --git a/worker.c b/worker.c
index ac9dea9db..baf7479ff 100644
--- a/worker.c
+++ b/worker.c
@@ -204,7 +204,7 @@ process_filters (struct worker_task *task)
int i = 0;
/* First process C modules */
- if (task->save.saved == 1) {
+ if (task->save.saved > 0) {
if (task->save.save_type == C_FILTER) {
task->save.saved = 0;
c_filter = (struct c_module *)task->save.entry;
@@ -233,28 +233,32 @@ process_filters (struct worker_task *task)
res->mark = 0;
if (c_filter->ctx->header_filter != NULL) {
res->mark += c_filter->ctx->header_filter (task);
- if (task->save.saved == 1) {
+ if (task->save.saved > 0) {
+ TAILQ_INSERT_TAIL (&task->results, res, next);
task->save.save_type = C_FILTER;
goto save_point;
}
}
if (c_filter->ctx->message_filter != NULL) {
res->mark += c_filter->ctx->message_filter (task);
- if (task->save.saved == 1) {
+ if (task->save.saved > 0) {
+ TAILQ_INSERT_TAIL (&task->results, res, next);
task->save.save_type = C_FILTER;
goto save_point;
}
}
if (c_filter->ctx->mime_filter != NULL) {
res->mark += c_filter->ctx->mime_filter (task);
- if (task->save.saved == 1) {
+ if (task->save.saved > 0) {
+ TAILQ_INSERT_TAIL (&task->results, res, next);
task->save.save_type = C_FILTER;
goto save_point;
}
}
if (c_filter->ctx->url_filter != NULL) {
res->mark += c_filter->ctx->url_filter (task);
- if (task->save.saved == 1) {
+ if (task->save.saved > 0) {
+ TAILQ_INSERT_TAIL (&task->results, res, next);
task->save.save_type = C_FILTER;
goto save_point;
}
@@ -295,28 +299,32 @@ process_filters (struct worker_task *task)
switch (perl_script->type) {
case SCRIPT_HEADER:
res->mark += perl_call_header_filter (perl_script->function, task);
- if (task->save.saved == 1) {
+ if (task->save.saved > 0) {
+ TAILQ_INSERT_TAIL (&task->results, res, next);
task->save.save_type = PERL_FILTER;
goto save_point;
}
break;
case SCRIPT_MESSAGE:
res->mark += perl_call_message_filter (perl_script->function, task);
- if (task->save.saved == 1) {
+ if (task->save.saved > 0) {
+ TAILQ_INSERT_TAIL (&task->results, res, next);
task->save.save_type = PERL_FILTER;
goto save_point;
}
break;
case SCRIPT_MIME:
res->mark += perl_call_mime_filter (perl_script->function, task);
- if (task->save.saved == 1) {
+ if (task->save.saved > 0) {
+ TAILQ_INSERT_TAIL (&task->results, res, next);
task->save.save_type = PERL_FILTER;
goto save_point;
}
break;
case SCRIPT_URL:
res->mark += perl_call_url_filter (perl_script->function, task);
- if (task->save.saved == 1) {
+ if (task->save.saved > 0) {
+ TAILQ_INSERT_TAIL (&task->results, res, next);
task->save.save_type = PERL_FILTER;
goto save_point;
}