]> source.dussan.org Git - rspamd.git/commitdiff
* Add filters logic
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Thu, 21 Aug 2008 13:58:30 +0000 (17:58 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Thu, 21 Aug 2008 13:58:30 +0000 (17:58 +0400)
* Perl should use separate memcached context for its operations (just do memcpy with the same socket)
TODO: add block mechanics here to avoid memcached connection closing before perl operation is finished
* Change logic of perl chain filter

cfg_file.h
cfg_file.y
main.h
perl.c
perl.h
perl/rspamd.xs
worker.c

index bf532c5885799db65a97e5f3dff3e3e1f527775a..62856b489fd65744dd4e82070c2d4040dcfd3538 100644 (file)
@@ -6,6 +6,7 @@
 #ifndef CFG_FILE_H
 #define CFG_FILE_H
 
+#include "config.h"
 #include <sys/types.h>
 #ifndef OWN_QUEUE_H
 #include <sys/queue.h>
@@ -18,6 +19,7 @@
 #include <glib.h>
 #include "upstream.h"
 #include "memcached.h"
+#include "main.h"
 
 #define DEFAULT_BIND_PORT 768
 #define MAX_MEMCACHED_SERVERS 48
@@ -55,8 +57,6 @@ enum script_type {
        SCRIPT_CHAIN,
 };
 
-struct uri;
-
 struct memcached_server {
        struct upstream up;
        struct in_addr addr;
@@ -70,20 +70,6 @@ struct perl_module {
        LIST_ENTRY (perl_module) next;
 };
 
-struct module_ctx {
-       int (*header_filter)(const char *header_name, const char *header_value);
-       int (*mime_filter)(GByteArray *content);
-       int (*message_filter)(GByteArray *content);
-       int (*uri_filter)(struct uri *uri);
-       int (*chain_filter)(GArray *results);
-};
-
-struct c_module {
-       const char *name;
-       struct module_ctx *ctx;
-       LIST_ENTRY (c_module) next;
-};
-
 struct script_param {
        char *symbol;
        char *function;
@@ -93,6 +79,7 @@ struct script_param {
 
 struct filter_chain {
        unsigned int metric;
+       unsigned int scripts_number;
        LIST_HEAD (scriptq, script_param) *scripts;
        LIST_ENTRY (filter_chain) next;
 };
index 0c6038f6083cd6fdd77d00e111413615dda9980d..91f84f4ceafcf610336dcf3e9a6dc83254ece3e7 100644 (file)
@@ -25,6 +25,7 @@ extern int yylineno;
 extern char *yytext;
 
 struct scriptq *cur_scripts;
+unsigned int cur_scripts_num = 0;
 
 %}
 
@@ -226,6 +227,7 @@ filterbody:
 
                cur_chain->metric = $1;
                cur_chain->scripts = cur_scripts;
+               cur_chain->scripts_number = cur_scripts_num;
                LIST_INSERT_HEAD (&cfg->filters, cur_chain, next);
 
        }
@@ -250,6 +252,7 @@ filter_chain:
                        YYERROR;
                }
                LIST_INSERT_HEAD (cur_scripts, $1, next);
+               cur_scripts_num = 1;
        }
        | filter_chain filter_param SEMICOLON   {
                if ($2 == NULL) {
@@ -257,6 +260,7 @@ filter_chain:
                        YYERROR;
                }
                LIST_INSERT_HEAD (cur_scripts, $2, next);
+               cur_scripts_num ++;
        }
        ;
 
diff --git a/main.h b/main.h
index cb98167ae7f859935cfa708d8fca671f77c7cf89..fdf22b8bada395ce1ee966f0e2c2d9c656e8f36d 100644 (file)
--- a/main.h
+++ b/main.h
@@ -59,6 +59,7 @@ struct pidfh;
 struct config_file;
 struct filter_chain;
 
+
 /* Struct that determine main server object (for logging purposes) */
 struct rspamd_main {
        struct config_file *cfg;
@@ -78,6 +79,14 @@ struct filter_result {
        TAILQ_ENTRY (filter_result) next;
 };
 
+struct chain_result {
+       struct filter_chain *chain;
+       int *marks;
+       unsigned int marks_num;
+       int result_mark;
+       TAILQ_ENTRY (chain_result) next;
+};
+
 struct mime_part {
        GMimeContentType *type;
        GByteArray *content;
@@ -114,6 +123,22 @@ struct worker_task {
        TAILQ_HEAD (uriq, uri) urls;
        /* List of filter results */
        TAILQ_HEAD (resultsq, filter_result) results;
+       /* Results of all chains */
+       TAILQ_HEAD (chainsq, chain_result) chain_results;
+       struct config_file *cfg;
+};
+
+struct module_ctx {
+       int (*header_filter)(struct worker_task *task);
+       int (*mime_filter)(struct worker_task *task);
+       int (*message_filter)(struct worker_task *task);
+       int (*url_filter)(struct worker_task *task);
+};
+
+struct c_module {
+       const char *name;
+       struct module_ctx *ctx;
+       LIST_ENTRY (c_module) next;
 };
 
 void start_worker (struct rspamd_worker *worker, int listen_sock);
diff --git a/perl.c b/perl.c
index e2ae086ebf609f161956a860c855c3b890742d31..2cdfacf552547944c90a4c7514c9440984ce62b0 100644 (file)
--- a/perl.c
+++ b/perl.c
@@ -125,16 +125,23 @@ perl_call_url_filter (const char *function, struct worker_task *task)
 }
 
 int
-perl_call_chain_filter (const char *function, struct worker_task *task)
+perl_call_chain_filter (const char *function, struct worker_task *task, int *marks, unsigned int number)
 {
-       int result;
+       int result, i;
+       AV *av;
 
        dSP;
-
+       
        ENTER;
        SAVETMPS;
+       av = newAV();
+       av_extend (av, number);
+       for (i = 0; i < number; i ++) {
+               av_push (av, sv_2mortal (newSViv (marks[i])));
+       }
        PUSHMARK (SP);
        XPUSHs (sv_2mortal (newSViv (PTR2IV (task))));
+       XPUSHs (AvARRAY (av));
        PUTBACK;
        
        call_pv (function, G_SCALAR);
@@ -146,6 +153,7 @@ perl_call_chain_filter (const char *function, struct worker_task *task)
 
        PUTBACK;
        FREETMPS;
+       av_undef (av);
        LEAVE;
 
 
@@ -170,6 +178,9 @@ void perl_call_memcached_callback (memcached_ctx_t *ctx, memc_error_t error, voi
        PUTBACK;
 
        call_sv (callback_data->callback, G_SCALAR);
+       
+       free (callback_data);
+       free (ctx);
 
        SPAGAIN;
        FREETMPS;
diff --git a/perl.h b/perl.h
index fc2ae93213dbc718c6d3b258c4db3fb4d5774fc3..9a37634e3743ed8908b404ccc8ad0c040869d6cd 100644 (file)
--- a/perl.h
+++ b/perl.h
@@ -12,7 +12,7 @@ int perl_call_header_filter (const char *function, struct worker_task *task);
 int perl_call_mime_filter (const char *function, struct worker_task *task);
 int perl_call_message_filter (const char *function, struct worker_task *task);
 int perl_call_url_filter (const char *function, struct worker_task *task);
-int perl_call_chain_filter (const char *function, struct worker_task *task);
+int perl_call_chain_filter (const char *function, struct worker_task *task, int *marks, unsigned int number);
 
 void perl_call_memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data);
 
index 681e40308173aff5e1a9c403854f21ccaeceab6e..4ec40f0fbe392e698373d0c333486af465bea276 100644 (file)
@@ -111,6 +111,7 @@ read_memcached_key (r, key, datalen, callback)
         SV *callback;
         struct worker_task *task;
     } *callback_data;
+    memcached_ctx_t *ctx;
     memcached_param_t param;
 
     perl_set_session (r);
@@ -118,16 +119,21 @@ read_memcached_key (r, key, datalen, callback)
     datalen = (unsigned int) SvIV (ST(2));
     callback = SvRV(ST(3));
 
-    r->memc_ctx->callback = perl_call_memcached_callback;
+    /* Copy old ctx to new one */
+    ctx = malloc (sizeof (memcached_ctx_t));
+    if (ctx == NULL) {
+        XSRETURN_UNDEF;
+    }
+    memcpy (ctx, r->memc_ctx, sizeof (memcached_ctx_t));
+    /* Set perl callback */
+    ctx->callback = perl_call_memcached_callback;
     callback_data = malloc (sizeof (struct _param));
     if (callback_data == NULL) {
                XSRETURN_UNDEF;
     }
     callback_data->callback = callback;
     callback_data->task = r;
-    r->memc_ctx->callback_data = (void *)callback_data;
-
-    r->memc_busy = 1;
+    ctx->callback_data = (void *)callback_data;
 
     strlcpy (param.key, key, sizeof (param.key));
     param.buf = malloc (datalen);
@@ -137,7 +143,7 @@ read_memcached_key (r, key, datalen, callback)
     param.bufpos = 0;
     param.expire = 0;
 
-    memc_get (r->memc_ctx, &param);
+    memc_get (ctx, &param);
     XSRETURN_EMPTY;
 
 void
@@ -152,6 +158,7 @@ write_memcached_key (r, key, data, expire, callback)
         SV *callback;
         struct worker_task *task;
     } *callback_data;
+    memcached_ctx_t *ctx;
     memcached_param_t param;
 
     perl_set_session (r);
@@ -160,16 +167,21 @@ write_memcached_key (r, key, data, expire, callback)
     expire = (int) SvIV (ST(3));
     callback = SvRV(ST(4));
 
-    r->memc_ctx->callback = perl_call_memcached_callback;
+    /* Copy old ctx to new one */
+    ctx = malloc (sizeof (memcached_ctx_t));
+    if (ctx == NULL) {
+        XSRETURN_UNDEF;
+    }
+    memcpy (ctx, r->memc_ctx, sizeof (memcached_ctx_t));
+    /* Set perl callback */
+    ctx->callback = perl_call_memcached_callback;
     callback_data = malloc (sizeof (struct _param));
     if (callback_data == NULL) {
                XSRETURN_UNDEF;
     }
     callback_data->callback = callback;
     callback_data->task = r;
-    r->memc_ctx->callback_data = (void *)callback_data;
-
-    r->memc_busy = 1;
+    ctx->callback_data = (void *)callback_data;
 
     strlcpy (param.key, key, sizeof (param.key));
     param.buf = data;
@@ -177,7 +189,7 @@ write_memcached_key (r, key, data, expire, callback)
     param.bufpos = 0;
     param.expire = expire;
 
-    memc_set (r->memc_ctx, &param, expire);
+    memc_set (ctx, &param, expire);
     XSRETURN_EMPTY;
 
 void
@@ -191,22 +203,28 @@ delete_memcached_key (r, key, callback)
         SV *callback;
         struct worker_task *task;
     } *callback_data;
+    memcached_ctx_t *ctx;
     memcached_param_t param;
 
     perl_set_session (r);
     key = (char *) SvPV (ST(1), keylen);
     callback = SvRV(ST(2));
 
-    r->memc_ctx->callback = perl_call_memcached_callback;
+    /* Copy old ctx to new one */
+    ctx = malloc (sizeof (memcached_ctx_t));
+    if (ctx == NULL) {
+        XSRETURN_UNDEF;
+    }
+    memcpy (ctx, r->memc_ctx, sizeof (memcached_ctx_t));
+    /* Set perl callback */
+    ctx->callback = perl_call_memcached_callback;
     callback_data = malloc (sizeof (struct _param));
     if (callback_data == NULL) {
                XSRETURN_UNDEF;
     }
     callback_data->callback = callback;
     callback_data->task = r;
-    r->memc_ctx->callback_data = (void *)callback_data;
-
-    r->memc_busy = 1;
+    ctx->callback_data = (void *)callback_data;
 
     strlcpy (param.key, key, sizeof (param.key));
     param.buf = NULL;
@@ -214,6 +232,6 @@ delete_memcached_key (r, key, callback)
     param.bufpos = 0;
     param.expire = 0;
 
-    memc_delete (r->memc_ctx, &param);
+    memc_delete (ctx, &param);
     XSRETURN_EMPTY;
 
index d452843928c54ebcf51f5b00e021aaf22c2a0a2e..84c9ef763d4320dc7ad6f78e8e54348b0a3e06fd 100644 (file)
--- a/worker.c
+++ b/worker.c
@@ -73,6 +73,7 @@ free_task (struct worker_task *task)
 {
        struct uri *cur;
        struct filter_result *res;
+       struct chain_result *chain_res;
        struct mime_part *part;
 
        if (task) {
@@ -104,9 +105,18 @@ free_task (struct worker_task *task)
                }
                while (!TAILQ_EMPTY (&task->results)) {
                        res = TAILQ_FIRST (&task->results);
-                       free (res);
                        TAILQ_REMOVE (&task->results, res, next);
+                       free (res);
+               }
+               while (!TAILQ_EMPTY (&task->chain_results)) {
+                       chain_res = TAILQ_FIRST (&task->chain_results);
+                       if (chain_res->marks != NULL) {
+                               free (chain_res->marks);
+                       }
+                       TAILQ_REMOVE (&task->chain_results, chain_res, next);
+                       free (chain_res);
                }
+
                while (!TAILQ_EMPTY (&task->parts)) {
                        part = TAILQ_FIRST (&task->parts);
                        g_object_unref (part->type);
@@ -123,7 +133,7 @@ mime_foreach_callback (GMimeObject *part, gpointer user_data)
 {
        struct worker_task *task = (struct worker_task *)user_data;
        struct mime_part *mime_part;
-       const GMimeContentType *type;
+       GMimeContentType *type;
        GMimeDataWrapper *wrapper;
        GMimeStream *part_stream;
        GByteArray *part_content;
@@ -165,7 +175,7 @@ mime_foreach_callback (GMimeObject *part, gpointer user_data)
                        part_stream = g_mime_stream_mem_new ();
                        if (g_mime_data_wrapper_write_to_stream (wrapper, part_stream) != -1) {
                                part_content = g_mime_stream_mem_get_byte_array (GMIME_STREAM_MEM (part_stream));
-                               type = g_mime_part_get_content_type (GMIME_PART (part));
+                               type = (GMimeContentType *)g_mime_part_get_content_type (GMIME_PART (part));
                                mime_part = g_malloc (sizeof (struct mime_part));
                                mime_part->type = type;
                                mime_part->content = part_content;
@@ -183,6 +193,111 @@ mime_foreach_callback (GMimeObject *part, gpointer user_data)
        }
 }
 
+static void
+process_filters (struct worker_task *task)
+{
+       struct filter_result *res = NULL;
+       struct chain_result *chain_res = NULL;
+       struct c_module *c_filter;
+       struct filter_chain *chain;
+       struct script_param *perl_script;
+       int i = 0;
+       
+       /* First process C modules */
+       LIST_FOREACH (c_filter, &task->cfg->c_modules, next) {
+               res = malloc (sizeof (struct filter_result));
+               if (res == NULL) {
+                       msg_err ("process_filters: malloc failed, %m");
+                       return;
+               }
+               res->chain = NULL;
+               res->symbol = c_filter->name;
+               res->mark = 0;
+               if (c_filter->ctx->header_filter != NULL) {
+                       res->mark += c_filter->ctx->header_filter (task);
+               }
+               if (c_filter->ctx->message_filter != NULL) {
+                       res->mark += c_filter->ctx->message_filter (task);
+               }
+               if (c_filter->ctx->mime_filter != NULL) {
+                       res->mark += c_filter->ctx->mime_filter (task);
+               }
+               if (c_filter->ctx->url_filter != NULL) {
+                       res->mark += c_filter->ctx->url_filter (task);
+               }
+               TAILQ_INSERT_TAIL (&task->results, res, next);
+       }
+
+       /* Process perl chains */
+       LIST_FOREACH (chain, &task->cfg->filters, next) {
+               chain_res = malloc (sizeof (struct chain_result));
+               if (chain_res == NULL) {
+                       msg_err ("process_filters: malloc failed, %m");
+                       return;
+               }
+               i = 0;
+               chain_res->chain = chain;
+               chain_res->marks = malloc (sizeof (int) * chain->scripts_number);
+               chain_res->result_mark = 0;
+               if (chain_res->marks == NULL) {
+                       free (chain_res);
+                       msg_err ("process_filters: malloc failed, %m");
+                       return;
+               }
+               LIST_FOREACH (perl_script, chain->scripts, next) {
+                       if (perl_script->type == SCRIPT_CHAIN) {
+                               /* Skip chain filters first */
+                               continue;
+                       }
+                       res = malloc (sizeof (struct filter_result));
+                       if (res == NULL) {
+                               msg_err ("process_filters: malloc failed, %m");
+                               return;
+                       }
+                       res->chain = chain;
+                       res->symbol = perl_script->symbol;
+                       res->mark = 0;
+                       switch (perl_script->type) {
+                               case SCRIPT_HEADER:
+                                       res->mark += perl_call_header_filter (perl_script->function, task);
+                                       break;
+                               case SCRIPT_MESSAGE:
+                                       res->mark += perl_call_message_filter (perl_script->function, task);
+                                       break;
+                               case SCRIPT_MIME:
+                                       res->mark += perl_call_mime_filter (perl_script->function, task);
+                                       break;
+                               case SCRIPT_URL:
+                                       res->mark += perl_call_url_filter (perl_script->function, task);
+                                       break;
+                       }
+                       TAILQ_INSERT_TAIL (&task->results, res, next);
+                       chain_res->marks[i++] = res->mark;
+               }
+               chain_res->marks_num = i;
+               TAILQ_INSERT_TAIL (&task->chain_results, chain_res, next);
+       }
+
+       /* Now process chain results */
+       TAILQ_FOREACH (chain_res, &task->chain_results, next) {
+               i = 0;
+               LIST_FOREACH (perl_script, chain_res->chain->scripts, next) {
+                       if (perl_script->type != SCRIPT_CHAIN) {
+                               /* Skip not chain filters */
+                               continue;
+                       }
+                       /* Increment i; if i would be equal to zero that would mean that this chain has no chain filter script */
+                       i ++;
+                       chain_res->result_mark += perl_call_url_filter (perl_script->function, task, chain_res->marks, chain_res->marks_num);
+               }
+               /* If chain has no chain filter, just do addition of all marks */
+               if (i == 0) {
+                       for (i = 0; i < chain_res->marks_num; i++) {
+                               chain_res->result_mark += chain_res->marks[i];
+                       }
+               }
+       }
+}
 
 static void
 process_message (struct worker_task *task)
@@ -211,6 +326,8 @@ process_message (struct worker_task *task)
        g_mime_message_foreach_part (message, mime_foreach_callback, task);
        
        msg_info ("process_message: found %d parts in message", task->parts_count);
+
+       process_filters (task);
 }
 
 static void
@@ -405,6 +522,7 @@ accept_socket (int fd, short what, void *arg)
        new_task->state = READ_COMMAND;
        new_task->content_length = 0;
        new_task->parts_count = 0;
+       new_task->cfg = worker->srv->cfg;
        TAILQ_INIT (&new_task->urls);
        TAILQ_INIT (&new_task->results);
        TAILQ_INIT (&new_task->parts);
@@ -427,7 +545,6 @@ void
 start_worker (struct rspamd_worker *worker, int listen_sock)
 {
        struct sigaction signals;
-       struct config_file *cfg = worker->srv->cfg;
        worker->srv->pid = getpid ();
        worker->srv->type = TYPE_WORKER;
        event_init ();