]> source.dussan.org Git - rspamd.git/commitdiff
* Add implementation of save point for async events in rspamd filters
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Fri, 22 Aug 2008 14:15:02 +0000 (18:15 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Fri, 22 Aug 2008 14:15:02 +0000 (18:15 +0400)
main.h
perl.c
perl/rspamd.xs
worker.c

diff --git a/main.h b/main.h
index fdf22b8bada395ce1ee966f0e2c2d9c656e8f36d..6e973ef206f4ddd34ccd4ebadb237c4bdae456fc 100644 (file)
--- a/main.h
+++ b/main.h
@@ -93,6 +93,13 @@ struct mime_part {
        TAILQ_ENTRY (mime_part) next;
 };
 
+struct save_point {
+       enum { C_FILTER, PERL_FILTER } save_type;
+       void *entry;
+       void *chain;
+       unsigned saved:1;
+};
+
 struct worker_task {
        struct rspamd_worker *worker;
        enum {
@@ -101,6 +108,7 @@ struct worker_task {
                READ_MESSAGE,
                WRITE_REPLY,
                WRITE_ERROR,
+               WAIT_FILTER,
        } state;
        size_t content_length;
        char *helo;
@@ -126,6 +134,7 @@ struct worker_task {
        /* Results of all chains */
        TAILQ_HEAD (chainsq, chain_result) chain_results;
        struct config_file *cfg;
+       struct save_point save;
 };
 
 struct module_ctx {
@@ -142,7 +151,7 @@ struct c_module {
 };
 
 void start_worker (struct rspamd_worker *worker, int listen_sock);
-
+int process_filters (struct worker_task *task);
 
 #endif
 
diff --git a/perl.c b/perl.c
index 2cdfacf552547944c90a4c7514c9440984ce62b0..2e1c9e5d7a1b172c97bc9fbc38cb9cf114990316 100644 (file)
--- a/perl.c
+++ b/perl.c
@@ -181,6 +181,9 @@ void perl_call_memcached_callback (memcached_ctx_t *ctx, memc_error_t error, voi
        
        free (callback_data);
        free (ctx);
+    /* Set save point */
+    callback_data->task->save.saved = 0;
+       process_filters (callback_data->task);
 
        SPAGAIN;
        FREETMPS;
index 4ec40f0fbe392e698373d0c333486af465bea276..1632ebadefda216a17b4bb216389fae97cc6aed8 100644 (file)
@@ -99,6 +99,22 @@ get_part (r, num)
        OUTPUT:
        RETVAL
 
+void
+save_point (r)
+    CODE:
+       struct worker_task *r;
+
+       perl_set_session (r);
+    r->save.saved = 1;
+
+void
+recall_filter (r)
+    CODE:
+    struct worker_task *r;
+
+       perl_set_session (r);
+    process_filters (r);
+
 void
 read_memcached_key (r, key, datalen, callback)
     CODE:
@@ -144,6 +160,8 @@ read_memcached_key (r, key, datalen, callback)
     param.expire = 0;
 
     memc_get (ctx, &param);
+    /* Set save point */
+    r->save.saved = 1;
     XSRETURN_EMPTY;
 
 void
@@ -190,6 +208,8 @@ write_memcached_key (r, key, data, expire, callback)
     param.expire = expire;
 
     memc_set (ctx, &param, expire);
+    /* Set save point */
+    r->save.saved = 1;
     XSRETURN_EMPTY;
 
 void
@@ -233,5 +253,7 @@ delete_memcached_key (r, key, callback)
     param.expire = 0;
 
     memc_delete (ctx, &param);
+    /* Set save point */
+    r->save.saved = 1;
     XSRETURN_EMPTY;
 
index 84c9ef763d4320dc7ad6f78e8e54348b0a3e06fd..ac9dea9db4cdb13ce238281abb1ab2abb6636555 100644 (file)
--- a/worker.c
+++ b/worker.c
@@ -193,47 +193,82 @@ mime_foreach_callback (GMimeObject *part, gpointer user_data)
        }
 }
 
-static void
+int
 process_filters (struct worker_task *task)
 {
        struct filter_result *res = NULL;
        struct chain_result *chain_res = NULL;
-       struct c_module *c_filter;
-       struct filter_chain *chain;
-       struct script_param *perl_script;
+       struct c_module *c_filter = NULL;
+       struct filter_chain *chain = NULL;
+       struct script_param *perl_script = NULL;
        int i = 0;
        
        /* First process C modules */
-       LIST_FOREACH (c_filter, &task->cfg->c_modules, next) {
+       if (task->save.saved == 1) {
+               if (task->save.save_type == C_FILTER) {
+                       task->save.saved = 0;
+                       c_filter = (struct c_module *)task->save.entry;
+               }
+               else if (task->save.save_type == PERL_FILTER) {
+                       chain = (struct filter_chain *)task->save.chain;
+                       perl_script = (struct script_param *)task->save.entry;
+                       task->save.saved = 0;
+               }
+       }
+       else {
+               c_filter = LIST_FIRST (&task->cfg->c_modules);
+               chain = LIST_FIRST (&task->cfg->filters);
+               if (chain) {
+                       perl_script = LIST_FIRST (chain->scripts);
+               }
+       }
+       while (c_filter != NULL) {
                res = malloc (sizeof (struct filter_result));
                if (res == NULL) {
                        msg_err ("process_filters: malloc failed, %m");
-                       return;
+                       return -1;
                }
                res->chain = NULL;
                res->symbol = c_filter->name;
                res->mark = 0;
                if (c_filter->ctx->header_filter != NULL) {
                        res->mark += c_filter->ctx->header_filter (task);
+                       if (task->save.saved == 1) {
+                               task->save.save_type = C_FILTER;
+                               goto save_point;
+                       }
                }
                if (c_filter->ctx->message_filter != NULL) {
                        res->mark += c_filter->ctx->message_filter (task);
+                       if (task->save.saved == 1) {
+                               task->save.save_type = C_FILTER;
+                               goto save_point;
+                       }
                }
                if (c_filter->ctx->mime_filter != NULL) {
                        res->mark += c_filter->ctx->mime_filter (task);
+                       if (task->save.saved == 1) {
+                               task->save.save_type = C_FILTER;
+                               goto save_point;
+                       }
                }
                if (c_filter->ctx->url_filter != NULL) {
                        res->mark += c_filter->ctx->url_filter (task);
+                       if (task->save.saved == 1) {
+                               task->save.save_type = C_FILTER;
+                               goto save_point;
+                       }
                }
                TAILQ_INSERT_TAIL (&task->results, res, next);
+               c_filter = LIST_NEXT (c_filter, next);
        }
 
        /* Process perl chains */
-       LIST_FOREACH (chain, &task->cfg->filters, next) {
+       while (chain != NULL) {
                chain_res = malloc (sizeof (struct chain_result));
                if (chain_res == NULL) {
                        msg_err ("process_filters: malloc failed, %m");
-                       return;
+                       return -1;
                }
                i = 0;
                chain_res->chain = chain;
@@ -242,9 +277,9 @@ process_filters (struct worker_task *task)
                if (chain_res->marks == NULL) {
                        free (chain_res);
                        msg_err ("process_filters: malloc failed, %m");
-                       return;
+                       return -1;
                }
-               LIST_FOREACH (perl_script, chain->scripts, next) {
+               while (perl_script != NULL) {
                        if (perl_script->type == SCRIPT_CHAIN) {
                                /* Skip chain filters first */
                                continue;
@@ -252,7 +287,7 @@ process_filters (struct worker_task *task)
                        res = malloc (sizeof (struct filter_result));
                        if (res == NULL) {
                                msg_err ("process_filters: malloc failed, %m");
-                               return;
+                               return -1;
                        }
                        res->chain = chain;
                        res->symbol = perl_script->symbol;
@@ -260,22 +295,40 @@ process_filters (struct worker_task *task)
                        switch (perl_script->type) {
                                case SCRIPT_HEADER:
                                        res->mark += perl_call_header_filter (perl_script->function, task);
+                                       if (task->save.saved == 1) {
+                                               task->save.save_type = PERL_FILTER;
+                                               goto save_point;
+                                       }
                                        break;
                                case SCRIPT_MESSAGE:
                                        res->mark += perl_call_message_filter (perl_script->function, task);
+                                       if (task->save.saved == 1) {
+                                               task->save.save_type = PERL_FILTER;
+                                               goto save_point;
+                                       }
                                        break;
                                case SCRIPT_MIME:
                                        res->mark += perl_call_mime_filter (perl_script->function, task);
+                                       if (task->save.saved == 1) {
+                                               task->save.save_type = PERL_FILTER;
+                                               goto save_point;
+                                       }
                                        break;
                                case SCRIPT_URL:
                                        res->mark += perl_call_url_filter (perl_script->function, task);
+                                       if (task->save.saved == 1) {
+                                               task->save.save_type = PERL_FILTER;
+                                               goto save_point;
+                                       }
                                        break;
                        }
                        TAILQ_INSERT_TAIL (&task->results, res, next);
                        chain_res->marks[i++] = res->mark;
+                       perl_script = LIST_NEXT (perl_script, next);
                }
                chain_res->marks_num = i;
                TAILQ_INSERT_TAIL (&task->chain_results, chain_res, next);
+               chain = LIST_NEXT (chain, next);
        }
 
        /* Now process chain results */
@@ -297,9 +350,23 @@ process_filters (struct worker_task *task)
                        }
                }
        }
+       
+       task->state = WRITE_REPLY;
+       bufferevent_enable (task->bev, EV_WRITE);
+       return 0;
+
+save_point:
+       if (task->save.save_type == C_FILTER) {
+               task->save.entry = LIST_NEXT (c_filter, next);
+       }
+       else if (task->save.save_type == PERL_FILTER) {
+               task->save.chain = LIST_NEXT (chain, next);
+               task->save.entry = LIST_NEXT (perl_script, next);
+       }
+       return 1;
 }
 
-static void
+static int
 process_message (struct worker_task *task)
 {
        GMimeMessage *message;
@@ -327,7 +394,7 @@ process_message (struct worker_task *task)
        
        msg_info ("process_message: found %d parts in message", task->parts_count);
 
-       process_filters (task);
+       return process_filters (task);
 }
 
 static void
@@ -448,8 +515,13 @@ read_socket (struct bufferevent *bev, void *arg)
                                task->msg->pos += r;
                                update_buf_size (task->msg);
                                if (task->msg->free == 0) {
-                                       process_message (task);
-                                       task->state = WRITE_REPLY;
+                                       r = process_message (task);
+                                       if (r == -1) {
+                                               task->state = WRITE_ERROR;
+                                       }
+                                       else if (r == 1) {
+                                               task->state = WAIT_FILTER;
+                                       }
                                }
                        }
                        else {
@@ -459,6 +531,10 @@ read_socket (struct bufferevent *bev, void *arg)
                                free_task (task);
                        }
                        break;
+               case WAIT_FILTER:
+                       bufferevent_disable (bev, EV_READ);
+                       bufferevent_disable (bev, EV_READ);
+                       break;
                case WRITE_REPLY:
                        r = bufferevent_write (bev, "Ok\r\n", sizeof ("Ok\r\n") - 1);
                        bufferevent_disable (bev, EV_READ);
@@ -518,6 +594,7 @@ accept_socket (int fd, short what, void *arg)
                msg_err ("accept_socket: cannot allocate memory for task, %m");
                return;
        }
+       bzero (new_task, sizeof (struct worker_task));
        new_task->worker = worker;
        new_task->state = READ_COMMAND;
        new_task->content_length = 0;