]> source.dussan.org Git - rspamd.git/commitdiff
* Add initial implementation of surbl check module
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Wed, 3 Sep 2008 15:13:24 +0000 (19:13 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Wed, 3 Sep 2008 15:13:24 +0000 (19:13 +0400)
12 files changed:
Makefile.in
cfg_file.h
cfg_file.l
cfg_utils.c
config.h.in
configure
main.c
main.h
plugins/surbl.c [new file with mode: 0644]
url.c
url.h
worker.c

index df7400786a08c6fe812597c8b391e6bfc3159391..92d8683a173c533ac8d252659eee2b8017d486ec 100644 (file)
@@ -30,7 +30,7 @@ clean:
 dist-clean: clean
        rm -f Makefile
        rm -f config.log
-       rm -f md5.h md5.c strlcpy.h strlcpy.c queue.h config.h modules.c
+       rm -f md5.h md5.c strlcpy.h strlcpy.c queue.h config.h modules.c modules.h
        cd perl && rm -f Makefile.old && rm -f Makefile.PL && cd ..
 
 creategroup:
index 40159358406f63cbeaeb3c27617f977a6bf34829..ff4eeb0d710de9a283b60191a68675fbd7461582 100644 (file)
@@ -122,6 +122,10 @@ int parse_bind_line (struct config_file *cf, char *str);
 void init_defaults (struct config_file *cfg);
 void free_config (struct config_file *cfg);
 int parse_script (char *str, struct script_param *param, enum script_type type);
+char* get_module_opt (struct config_file *cfg, char *module_name, char *opt_name);
+size_t parse_limit (const char *limit);
+unsigned int parse_seconds (const char *t);
+char parse_flag (const char *str);
 
 int yylex (void);
 int yyparse (void);
index 8cde589b5df96cddd8ca9f1de07e13f8e757c87f..89ca8a171dadcce8e647e44f708b9d21addc7ba3 100644 (file)
 YY_BUFFER_STATE include_stack[MAX_INCLUDE_DEPTH];
 int include_stack_ptr = 0;
 
-static size_t
-parse_limit (const char *limit)
-{
-       size_t result = 0;
-       char *err_str;
-
-       if (!limit || *limit == '\0') return 0;
-
-       result = strtoul (limit, &err_str, 10);
-
-       if (*err_str != '\0') {
-               /* Megabytes */
-               if (*err_str == 'm' || *err_str == 'M') {
-                       result *= 1048576L;
-               }
-               /* Kilobytes */
-               else if (*err_str == 'k' || *err_str == 'K') {
-                       result *= 1024;
-               }
-               /* Gigabytes */
-               else if (*err_str == 'g' || *err_str == 'G') {
-                       result *= 1073741824L;
-               }
-       }
-
-       return result;
-}
-
-static unsigned int
-parse_seconds (const char *t)
-{
-       unsigned int result = 0;
-       char *err_str;
-
-       if (!t || *t == '\0') return 0;
-
-       result = strtoul (t, &err_str, 10);
-
-       if (*err_str != '\0') {
-               /* Seconds */
-               if (*err_str == 's' || *err_str == 'S') {
-                       result *= 1000;
-               }
-       }
-
-       return result;
-}
-
-static char
-parse_flag (const char *str)
-{
-       if (!str || !*str) return -1;
-
-       if ((*str == 'Y' || *str == 'y') && *(str + 1) == '\0') {
-               return 1;
-       }
-
-       if ((*str == 'Y' || *str == 'y') &&
-               (*(str + 1) == 'E' || *(str + 1) == 'e') &&
-               (*(str + 2) == 'S' || *(str + 2) == 's') &&
-               *(str + 3) == '\0') {
-               return 1;               
-       }
-
-       if ((*str == 'N' || *str == 'n') && *(str + 1) == '\0') {
-               return 0;
-       }
-
-       if ((*str == 'N' || *str == 'n') &&
-               (*(str + 1) == 'O' || *(str + 1) == 'o') &&
-               *(str + 2) == '\0') {
-               return 0;               
-       }
-
-       return -1;
-}
-
 %}
 
 %option noyywrap
index 83738aea629d3b85b6afce0535519f41d9bb669e..03598062448499008174f4c5e393cf552c1e2402 100644 (file)
@@ -40,6 +40,7 @@ clean_hash_bucket (gpointer key, gpointer value, gpointer unused)
                LIST_REMOVE (cur, next);
                free (cur);
        }
+       free (cur_module_opt);
 }
 
 int
@@ -222,6 +223,103 @@ parse_script (char *str, struct script_param *param, enum script_type type)
        return 0;
 }
 
+char* 
+get_module_opt (struct config_file *cfg, char *module_name, char *opt_name)
+{
+       LIST_HEAD (moduleoptq, module_opt) *cur_module_opt = NULL;
+       struct module_opt *cur;
+       
+       cur_module_opt = g_hash_table_lookup (cfg->modules_opts, module_name);
+       if (cur_module_opt == NULL) {
+               return NULL;
+       }
+
+       LIST_FOREACH (cur, cur_module_opt, next) {
+               if (strcmp (cur->param, opt_name) == 0) {
+                       return cur->value;
+               }
+       }
+
+       return NULL;
+}
+
+size_t
+parse_limit (const char *limit)
+{
+       size_t result = 0;
+       char *err_str;
+
+       if (!limit || *limit == '\0') return 0;
+
+       result = strtoul (limit, &err_str, 10);
+
+       if (*err_str != '\0') {
+               /* Megabytes */
+               if (*err_str == 'm' || *err_str == 'M') {
+                       result *= 1048576L;
+               }
+               /* Kilobytes */
+               else if (*err_str == 'k' || *err_str == 'K') {
+                       result *= 1024;
+               }
+               /* Gigabytes */
+               else if (*err_str == 'g' || *err_str == 'G') {
+                       result *= 1073741824L;
+               }
+       }
+
+       return result;
+}
+
+unsigned int
+parse_seconds (const char *t)
+{
+       unsigned int result = 0;
+       char *err_str;
+
+       if (!t || *t == '\0') return 0;
+
+       result = strtoul (t, &err_str, 10);
+
+       if (*err_str != '\0') {
+               /* Seconds */
+               if (*err_str == 's' || *err_str == 'S') {
+                       result *= 1000;
+               }
+       }
+
+       return result;
+}
+
+char
+parse_flag (const char *str)
+{
+       if (!str || !*str) return -1;
+
+       if ((*str == 'Y' || *str == 'y') && *(str + 1) == '\0') {
+               return 1;
+       }
+
+       if ((*str == 'Y' || *str == 'y') &&
+               (*(str + 1) == 'E' || *(str + 1) == 'e') &&
+               (*(str + 2) == 'S' || *(str + 2) == 's') &&
+               *(str + 3) == '\0') {
+               return 1;               
+       }
+
+       if ((*str == 'N' || *str == 'n') && *(str + 1) == '\0') {
+               return 0;
+       }
+
+       if ((*str == 'N' || *str == 'n') &&
+               (*(str + 1) == 'O' || *(str + 1) == 'o') &&
+               *(str + 2) == '\0') {
+               return 0;               
+       }
+
+       return -1;
+}
+
 /*
  * vi:ts=4
  */
index 7b17b34276f5812274eaa1cc09f022498c861b07..36d67d56eca0a87f6a04a17063426c1375e39181 100644 (file)
@@ -2,10 +2,11 @@
 
 /* Forwarded declaration */
 struct module_ctx;
+struct config_file;
 
 typedef struct module_s {
        const char *name;
-       int (*module_init_func)(struct module_ctx *ctx);
+       int (*module_init_func)(struct config_file *cfg, struct module_ctx **ctx);
 } module_t;
 
 extern module_t modules[];
index 776c15e1cf6976e2c7ae27a1c59ecd09a265e167..7130afa70c223dd23bd8551129d33caf6ad11b30 100755 (executable)
--- a/configure
+++ b/configure
@@ -21,8 +21,8 @@ YACC_OUTPUT="cfg_yacc.c"
 LEX_OUTPUT="cfg_lex.c"
 CONFIG="config.h"
 
-SOURCES="upstream.c cfg_utils.c memcached.c main.c util.c worker.c fstring.c url.c perl.c ${LEX_OUTPUT} ${YACC_OUTPUT}"
-MODULES=""
+SOURCES="upstream.c cfg_utils.c memcached.c main.c util.c worker.c fstring.c url.c perl.c plugins/surbl.c ${LEX_OUTPUT} ${YACC_OUTPUT}"
+MODULES="surbl"
 
 CFLAGS="$CFLAGS -W -Wpointer-arith -Wno-unused-parameter"
 CFLAGS="$CFLAGS -Wno-unused-function -Wunused-variable -Wno-sign-compare"
@@ -414,14 +414,18 @@ write_result()
        echo "#define RVERSION \"${VERSION}\"" >> $CONFIG
        echo "#define HASH_COMPAT" >> $CONFIG
        # Write modules init function
-       echo "#include \"config.h\"" > modules.c
+       echo "#ifndef MODULES_H" > modules.h
+       echo "#include \"config.h\"" >> modules.h
+       echo "#include \"modules.h\"" > modules.c
        echo "module_t modules[] = {" >> modules.c;
        modules_num=0
        for m in $MODULES ; do
                echo "{\"${m}\", ${m}_module_init}," >> modules.c
+               echo "int ${m}_module_init(struct config_file *cfg, struct module_ctx **ctx);" >> modules.h
                modules_num=`expr $modules_num + 1`
        done
        echo "};" >> modules.c
+       echo "#endif" >> modules.h
        echo "#define MODULES_NUM $modules_num" >> $CONFIG
        SOURCES="$SOURCES modules.c"
        OBJECTS=`echo $SOURCES | sed -e 's/\.c/\.o/g'`
@@ -499,7 +503,7 @@ END
                SO=`echo $o | sed -e 's/\.o/\.c/g'`
                cat >> $MAKEFILE << END
 ${o}: \$(DEPS) ${SO}
-       \$(CC) \$(OPT_FLAGS) \$(CFLAGS) \$(PTHREAD_CFLAGS) -c ${SO}
+       \$(CC) \$(OPT_FLAGS) \$(CFLAGS) \$(PTHREAD_CFLAGS) -o ${o} -c ${SO}
 
 END
        done
diff --git a/main.c b/main.c
index ecae9f1390a1c128682e938a63ab3fc0cbe7fe4e..dc1907f85191d5b244a1a679ca00b832cde50def 100644 (file)
--- a/main.c
+++ b/main.c
@@ -230,7 +230,7 @@ main (int argc, char **argv)
        for (i = 0; i < MODULES_NUM; i ++) {
                cur_module = g_malloc (sizeof (struct c_module));
                cur_module->name = modules[i].name;
-               if (modules[i].module_init_func(cur_module->ctx) == 0) {
+               if (modules[i].module_init_func(cfg, &cur_module->ctx) == 0) {
                        LIST_INSERT_HEAD (&cfg->c_modules, cur_module, next);
                }
        }
diff --git a/main.h b/main.h
index 6e973ef206f4ddd34ccd4ebadb237c4bdae456fc..ddfbf959ff5044f66899b9b3841121aade5695ee 100644 (file)
--- a/main.h
+++ b/main.h
@@ -97,7 +97,7 @@ struct save_point {
        enum { C_FILTER, PERL_FILTER } save_type;
        void *entry;
        void *chain;
-       unsigned saved:1;
+       unsigned int saved;
 };
 
 struct worker_task {
diff --git a/plugins/surbl.c b/plugins/surbl.c
new file mode 100644 (file)
index 0000000..266816b
--- /dev/null
@@ -0,0 +1,400 @@
+/***MODULE:surbl
+ * rspamd module that implements SURBL url checking
+ */
+
+#include <sys/types.h>
+#include <sys/time.h>
+#include <sys/wait.h>
+#include <sys/param.h>
+
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <syslog.h>
+#include <fcntl.h>
+#include <stdlib.h>
+
+#include "../config.h"
+#include "../main.h"
+#include "../modules.h"
+#include "../cfg_file.h"
+#include "../memcached.h"
+
+#define DEFAULT_REDIRECTOR_PORT 8080
+#define DEFAULT_SURBL_WEIGHT 10
+#define DEFAULT_REDIRECTOR_CONNECT_TIMEOUT 1000
+#define DEFAULT_REDIRECTOR_READ_TIMEOUT 5000
+#define DEFAULT_SURBL_MAX_URLS 1000
+#define DEFAULT_SURBL_URL_EXPIRE 86400
+
+struct surbl_ctx {
+       int (*header_filter)(struct worker_task *task);
+       int (*mime_filter)(struct worker_task *task);
+       int (*message_filter)(struct worker_task *task);
+       int (*url_filter)(struct worker_task *task);
+       struct in_addr redirector_addr;
+       uint16_t redirector_port;
+       uint16_t weight;
+       unsigned int connect_timeout;
+       unsigned int read_timeout;
+       unsigned int max_urls;
+       unsigned int url_expire;
+       unsigned use_redirector:1;
+};
+
+struct redirector_param {
+       struct uri *url;
+       struct worker_task *task;
+       enum {
+               STATE_CONNECT,
+               STATE_READ,
+       } state;
+       struct event ev;
+       int sock;
+};
+
+struct memcached_param {
+       struct uri *url;
+       struct worker_task *task;
+       memcached_ctx_t *ctx;
+};
+
+struct surbl_ctx *surbl_module_ctx;
+
+static int surbl_test_url (struct worker_task *task);
+
+int
+surbl_module_init (struct config_file *cfg, struct module_ctx **ctx)
+{
+       struct hostent *hent;
+
+       char *value, *cur_tok, *str;
+
+       surbl_module_ctx = g_malloc (sizeof (struct surbl_ctx));
+
+       surbl_module_ctx->header_filter = NULL;
+       surbl_module_ctx->mime_filter = NULL;
+       surbl_module_ctx->message_filter = NULL;
+       surbl_module_ctx->url_filter = surbl_test_url;
+       surbl_module_ctx->use_redirector = 0;
+
+       if ((value = get_module_opt (cfg, "surbl", "redirector")) != NULL) {
+               str = strdup (value);
+               cur_tok = strsep (&str, ":");
+               if (!inet_aton (cur_tok, &surbl_module_ctx->redirector_addr)) {
+                       /* Try to call gethostbyname */
+                       hent = gethostbyname (cur_tok);
+                       if (hent != NULL) {
+                               memcpy((char *)&surbl_module_ctx->redirector_addr, hent->h_addr, sizeof(struct in_addr));
+                               if (str != NULL) {
+                                       surbl_module_ctx->redirector_port = (uint16_t)strtoul (str, NULL, 10);
+                               }
+                               else {
+                                       surbl_module_ctx->redirector_port = DEFAULT_REDIRECTOR_PORT;
+                               }
+                               surbl_module_ctx->use_redirector = 1;
+                       }
+               }
+               /* Free cur_tok as it is actually initial str after strsep */
+               free (cur_tok);
+       }
+       if ((value = get_module_opt (cfg, "surbl", "weight")) != NULL) {
+               surbl_module_ctx->weight = atoi (value);
+       }
+       else {
+               surbl_module_ctx->weight = DEFAULT_SURBL_WEIGHT;
+       }
+       if ((value = get_module_opt (cfg, "surbl", "url_expire")) != NULL) {
+               surbl_module_ctx->url_expire = atoi (value);
+       }
+       else {
+               surbl_module_ctx->url_expire = DEFAULT_SURBL_URL_EXPIRE;
+       }
+       if ((value = get_module_opt (cfg, "surbl", "redirector_connect_timeout")) != NULL) {
+               surbl_module_ctx->connect_timeout = parse_seconds (value);
+       }
+       else {
+               surbl_module_ctx->connect_timeout = DEFAULT_REDIRECTOR_CONNECT_TIMEOUT;
+       }
+       if ((value = get_module_opt (cfg, "surbl", "redirector_read_timeout")) != NULL) {
+               surbl_module_ctx->read_timeout = parse_seconds (value);
+       }
+       else {
+               surbl_module_ctx->read_timeout = DEFAULT_REDIRECTOR_READ_TIMEOUT;
+       }
+       if ((value = get_module_opt (cfg, "surbl", "max_urls")) != NULL) {
+               surbl_module_ctx->max_urls = atoi (value);
+       }
+       else {
+               surbl_module_ctx->max_urls = DEFAULT_SURBL_MAX_URLS;
+       }
+
+       *ctx = (struct module_ctx *)surbl_module_ctx;
+
+       evdns_init ();
+
+       return 0;
+}
+
+static void 
+memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data)
+{
+       struct memcached_param *param = (struct memcached_param *)data;
+       int *url_count;
+       struct filter_result *res;
+
+       switch (ctx->op) {
+               case CMD_CONNECT:
+                       if (error != OK) {
+                               msg_info ("memcached_callback: memcached returned error %s on CONNECT stage");
+                               memc_close_ctx (param->ctx);
+                               param->task->save.saved --;
+                               if (param->task->save.saved == 0) {
+                                       /* Call other filters */
+                                       param->task->save.saved = 1;
+                                       process_filters (param->task);
+                               }
+                               g_free (param->ctx->param->buf);
+                               g_free (param->ctx->param);
+                               g_free (param->ctx);
+                               g_free (param);
+                       }
+                       else {
+                               memc_get (param->ctx, param->ctx->param);
+                       }
+                       break;
+               case CMD_READ:
+                       if (error != OK) {
+                               msg_info ("memcached_callback: memcached returned error %s on READ stage");
+                               memc_close_ctx (param->ctx);
+                               param->task->save.saved --;
+                               if (param->task->save.saved == 0) {
+                                       /* Call other filters */
+                                       param->task->save.saved = 1;
+                                       process_filters (param->task);
+                               }
+                               g_free (param->ctx->param->buf);
+                               g_free (param->ctx->param);
+                               g_free (param->ctx);
+                               g_free (param);
+                       }
+                       else {
+                               url_count = (int *)param->ctx->param->buf;
+                               /* Do not check DNS for urls that have count more than max_urls */
+                               if (*url_count > surbl_module_ctx->max_urls) {
+                                       msg_info ("memcached_callback: url '%s' has count %d, max: %d", struri (param->url), *url_count, surbl_module_ctx->max_urls);
+                                       res = TAILQ_LAST (&param->task->results, resultsq);
+                                       res->mark += surbl_module_ctx->weight;
+                               }
+                               (*url_count) ++;
+                               memc_set (param->ctx, param->ctx->param, surbl_module_ctx->url_expire);
+                       }
+                       break;
+               case CMD_WRITE:
+                       if (error != OK) {
+                               msg_info ("memcached_callback: memcached returned error %s on WRITE stage");
+                       }
+                       memc_close_ctx (param->ctx);
+                       param->task->save.saved --;
+                       if (param->task->save.saved == 0) {
+                               /* Call other filters */
+                               param->task->save.saved = 1;
+                               process_filters (param->task);
+                       }
+                       //XXX: read http://surbl.org and add surbl request here
+                       g_free (param->ctx->param->buf);
+                       g_free (param->ctx->param);
+                       g_free (param->ctx);
+                       g_free (param);
+                       break;
+       }
+}
+
+static void
+register_memcached_call (struct uri *url, struct worker_task *task) 
+{
+       struct memcached_param *param;
+       struct memcached_server *selected;
+       memcached_param_t *cur_param;
+       gchar *sum_str;
+       int *url_count;
+
+       param = g_malloc (sizeof (struct memcached_param));
+       cur_param = g_malloc (sizeof (memcached_param_t));
+       url_count = g_malloc (sizeof (int));
+
+       param->url = url;
+       param->task = task;
+
+       param->ctx = g_malloc (sizeof (memcached_ctx_t));
+       bzero (param->ctx, sizeof (memcached_ctx_t));
+       bzero (cur_param, sizeof (memcached_param_t));
+
+       cur_param->buf = (u_char *)url_count;
+       cur_param->bufsize = sizeof (int);
+
+       sum_str = g_compute_checksum_for_string (G_CHECKSUM_MD5, struri (url), -1);
+       strlcpy (cur_param->key, sum_str, sizeof (cur_param->key));
+       g_free (sum_str);
+
+       selected = (struct memcached_server *) get_upstream_by_hash ((void *)task->cfg->memcached_servers,
+                                                                                       task->cfg->memcached_servers_num, sizeof (struct memcached_server),
+                                                                                       time (NULL), task->cfg->memcached_error_time, task->cfg->memcached_dead_time, task->cfg->memcached_maxerrors,
+                                                                                       cur_param->key, strlen(cur_param->key));
+       param->ctx->callback = memcached_callback;
+       param->ctx->callback_data = (void *)param;
+       param->ctx->protocol = task->cfg->memcached_protocol;
+       memcpy(&param->ctx->addr, &selected->addr, sizeof (struct in_addr));
+       param->ctx->port = selected->port;
+       param->ctx->timeout.tv_sec = task->cfg->memcached_connect_timeout / 1000;
+       param->ctx->timeout.tv_sec = task->cfg->memcached_connect_timeout - param->ctx->timeout.tv_sec * 1000;
+       param->ctx->sock = -1;
+#ifdef WITH_DEBUG
+       param->ctx->options = MEMC_OPT_DEBUG;
+#else
+       param->ctx->options = 0;
+#endif
+       param->ctx->param = cur_param;
+       memc_init_ctx (param->ctx);
+}
+
+static void
+redirector_callback (int fd, short what, void *arg)
+{
+       struct redirector_param *param = (struct redirector_param *)arg;
+       char url_buf[1024];
+       int r;
+       struct timeval timeout;
+       char *p, *c;
+
+       switch (param->state) {
+               case STATE_CONNECT:
+                       /* We have write readiness after connect call, so reinit event */
+                       if (what == EV_WRITE) {
+                               timeout.tv_sec = surbl_module_ctx->connect_timeout / 1000;
+                               timeout.tv_usec = surbl_module_ctx->connect_timeout - timeout.tv_sec * 1000;
+                               event_del (&param->ev);
+                               event_set (&param->ev, param->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, redirector_callback, (void *)param);
+                               event_add (&param->ev, &timeout);
+                               r = snprintf (url_buf, sizeof (url_buf), "GET %s HTTP/1.0\r\n\r\n", struri (param->url));
+                               write (param->sock, url_buf, r);
+                               param->state = STATE_READ;
+                       }
+                       else {
+                               event_del (&param->ev);
+                               msg_info ("redirector_callback: connection to redirector timed out");
+                               param->task->save.saved --;
+                               if (param->task->save.saved == 0) {
+                                       /* Call other filters */
+                                       param->task->save.saved = 1;
+                                       process_filters (param->task);
+                               }
+                               g_free (param);
+                       }
+                       break;
+               case STATE_READ:
+                       if (what == EV_READ) {
+                               r = read (param->sock, url_buf, sizeof (url_buf));
+                               if ((p = strstr (url_buf, "Uri: ")) != NULL) {
+                                       p += sizeof ("Uri: ") - 1;
+                                       c = p;
+                                       while (p++ < url_buf + sizeof (url_buf) - 1) {
+                                               if (*p == '\r' || *p == '\n') {
+                                                       *p = '\0';
+                                                       break;
+                                               }
+                                       }
+                                       if (*p == '\0') {
+                                               msg_info ("redirector_callback: got reply from redirector: '%s' -> '%s'", struri (param->url), c);
+                                               parse_uri (param->url, c);
+                                               normalize_uri (param->url, c);
+                                               register_memcached_call (param->url, param->task);
+                                               param->task->save.saved ++;
+                                       }
+                               }
+                               event_del (&param->ev);
+                               param->task->save.saved --;
+                               if (param->task->save.saved == 0) {
+                                       /* Call other filters */
+                                       param->task->save.saved = 1;
+                                       process_filters (param->task);
+                               }
+                               g_free (param);
+                       }
+                       else {
+                               event_del (&param->ev);
+                               msg_info ("redirector_callback: reading redirector timed out");
+                               param->task->save.saved --;
+                               if (param->task->save.saved == 0) {
+                                       /* Call other filters */
+                                       param->task->save.saved = 1;
+                                       process_filters (param->task);
+                               }
+                               g_free (param);
+                       }
+                       break;
+       }
+}
+
+
+static void
+register_redirector_call (struct uri *url, struct worker_task *task) 
+{
+       struct sockaddr_in sc;
+       int ofl, r, s;
+       struct redirector_param *param;
+       struct timeval timeout;
+
+       bzero (&sc, sizeof (struct sockaddr_in *));
+       sc.sin_family = AF_INET;
+       sc.sin_port = surbl_module_ctx->redirector_port;
+       memcpy (&sc.sin_addr, &surbl_module_ctx->redirector_addr, sizeof (struct in_addr));
+
+       s = socket (PF_INET, SOCK_STREAM, 0);
+
+       if (s == -1) {
+               msg_info ("register_redirector_call: socket() failed: %m");
+               return; 
+       }
+
+       /* set nonblocking */
+    ofl = fcntl(s, F_GETFL, 0);
+    fcntl(s, F_SETFL, ofl | O_NONBLOCK);
+       
+       if ((r = connect (s, (struct sockaddr*)&sc, sizeof (struct sockaddr_in))) == -1) {
+               if (errno != EINPROGRESS) {
+                       close (s);
+                       msg_info ("register_redirector_call: connect() failed: %m");
+               }
+       }
+       param = g_malloc (sizeof (struct redirector_param));
+       param->url = url;
+       param->task = task;
+       param->state = STATE_READ;
+       param->sock = s;
+       timeout.tv_sec = surbl_module_ctx->connect_timeout / 1000;
+       timeout.tv_usec = surbl_module_ctx->connect_timeout - timeout.tv_sec * 1000;
+       event_set (&param->ev, s, EV_WRITE | EV_TIMEOUT, redirector_callback, (void *)param);
+       event_add (&param->ev, &timeout);
+}
+
+static int 
+surbl_test_url (struct worker_task *task)
+{
+       struct uri *url;
+
+       TAILQ_FOREACH (url, &task->urls, next) {
+               if (surbl_module_ctx->use_redirector) {
+                       register_redirector_call (url, task);
+               }
+               else {
+                       register_memcached_call (url, task);
+               }
+               task->save.saved++;
+       }
+       return 0;
+}
+
+/*
+ * vi:ts=4 
+ */
diff --git a/url.c b/url.c
index b09202edc30a78ac18df816a322e26bf39613748..2fb01f396aebe3da21a8228bda5a53b395b0ec49 100644 (file)
--- a/url.c
+++ b/url.c
@@ -558,7 +558,7 @@ path_simplify (char *path)
        return t != h;
 }
 
-static enum uri_errno
+enum uri_errno
 parse_uri(struct uri *uri, unsigned char *uristring)
 {
        unsigned char *prefix_end, *host_end;
@@ -762,7 +762,7 @@ parse_uri(struct uri *uri, unsigned char *uristring)
        return URI_ERRNO_OK;
 }
 
-static unsigned char *
+unsigned char *
 normalize_uri(struct uri *uri, unsigned char *uristring)
 {
        unsigned char *parse_string = uristring;
diff --git a/url.h b/url.h
index 7d9d87db1b6a126c0fadc2b81719898d0a201720..ae845aca138271b22b0a443cf07d56c39b8100f3 100644 (file)
--- a/url.h
+++ b/url.h
@@ -82,5 +82,7 @@ enum protocol {
 
 void url_parse_html (struct worker_task *task, GByteArray *part);
 void url_parse_text (struct worker_task *task, GByteArray *part);
+enum uri_errno parse_uri(struct uri *uri, unsigned char *uristring);
+unsigned char * normalize_uri(struct uri *uri, unsigned char *uristring);
 
 #endif
index ac9dea9db4cdb13ce238281abb1ab2abb6636555..baf7479ffee272d861df132e84770d8474d4019c 100644 (file)
--- a/worker.c
+++ b/worker.c
@@ -204,7 +204,7 @@ process_filters (struct worker_task *task)
        int i = 0;
        
        /* First process C modules */
-       if (task->save.saved == 1) {
+       if (task->save.saved > 0) {
                if (task->save.save_type == C_FILTER) {
                        task->save.saved = 0;
                        c_filter = (struct c_module *)task->save.entry;
@@ -233,28 +233,32 @@ process_filters (struct worker_task *task)
                res->mark = 0;
                if (c_filter->ctx->header_filter != NULL) {
                        res->mark += c_filter->ctx->header_filter (task);
-                       if (task->save.saved == 1) {
+                       if (task->save.saved > 0) {
+                               TAILQ_INSERT_TAIL (&task->results, res, next);
                                task->save.save_type = C_FILTER;
                                goto save_point;
                        }
                }
                if (c_filter->ctx->message_filter != NULL) {
                        res->mark += c_filter->ctx->message_filter (task);
-                       if (task->save.saved == 1) {
+                       if (task->save.saved > 0) {
+                               TAILQ_INSERT_TAIL (&task->results, res, next);
                                task->save.save_type = C_FILTER;
                                goto save_point;
                        }
                }
                if (c_filter->ctx->mime_filter != NULL) {
                        res->mark += c_filter->ctx->mime_filter (task);
-                       if (task->save.saved == 1) {
+                       if (task->save.saved > 0) {
+                               TAILQ_INSERT_TAIL (&task->results, res, next);
                                task->save.save_type = C_FILTER;
                                goto save_point;
                        }
                }
                if (c_filter->ctx->url_filter != NULL) {
                        res->mark += c_filter->ctx->url_filter (task);
-                       if (task->save.saved == 1) {
+                       if (task->save.saved > 0) {
+                               TAILQ_INSERT_TAIL (&task->results, res, next);
                                task->save.save_type = C_FILTER;
                                goto save_point;
                        }
@@ -295,28 +299,32 @@ process_filters (struct worker_task *task)
                        switch (perl_script->type) {
                                case SCRIPT_HEADER:
                                        res->mark += perl_call_header_filter (perl_script->function, task);
-                                       if (task->save.saved == 1) {
+                                       if (task->save.saved > 0) {
+                                               TAILQ_INSERT_TAIL (&task->results, res, next);
                                                task->save.save_type = PERL_FILTER;
                                                goto save_point;
                                        }
                                        break;
                                case SCRIPT_MESSAGE:
                                        res->mark += perl_call_message_filter (perl_script->function, task);
-                                       if (task->save.saved == 1) {
+                                       if (task->save.saved > 0) {
+                                               TAILQ_INSERT_TAIL (&task->results, res, next);
                                                task->save.save_type = PERL_FILTER;
                                                goto save_point;
                                        }
                                        break;
                                case SCRIPT_MIME:
                                        res->mark += perl_call_mime_filter (perl_script->function, task);
-                                       if (task->save.saved == 1) {
+                                       if (task->save.saved > 0) {
+                                               TAILQ_INSERT_TAIL (&task->results, res, next);
                                                task->save.save_type = PERL_FILTER;
                                                goto save_point;
                                        }
                                        break;
                                case SCRIPT_URL:
                                        res->mark += perl_call_url_filter (perl_script->function, task);
-                                       if (task->save.saved == 1) {
+                                       if (task->save.saved > 0) {
+                                               TAILQ_INSERT_TAIL (&task->results, res, next);
                                                task->save.save_type = PERL_FILTER;
                                                goto save_point;
                                        }