From a4473aedcb7c49b494112bce63d06c98d88e0e0d Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Fri, 22 Aug 2008 18:15:02 +0400 Subject: [PATCH] * Add implementation of save point for async events in rspamd filters --- main.h | 11 ++++- perl.c | 3 ++ perl/rspamd.xs | 22 ++++++++++ worker.c | 107 ++++++++++++++++++++++++++++++++++++++++++------- 4 files changed, 127 insertions(+), 16 deletions(-) diff --git a/main.h b/main.h index fdf22b8ba..6e973ef20 100644 --- a/main.h +++ b/main.h @@ -93,6 +93,13 @@ struct mime_part { TAILQ_ENTRY (mime_part) next; }; +struct save_point { + enum { C_FILTER, PERL_FILTER } save_type; + void *entry; + void *chain; + unsigned saved:1; +}; + struct worker_task { struct rspamd_worker *worker; enum { @@ -101,6 +108,7 @@ struct worker_task { READ_MESSAGE, WRITE_REPLY, WRITE_ERROR, + WAIT_FILTER, } state; size_t content_length; char *helo; @@ -126,6 +134,7 @@ struct worker_task { /* Results of all chains */ TAILQ_HEAD (chainsq, chain_result) chain_results; struct config_file *cfg; + struct save_point save; }; struct module_ctx { @@ -142,7 +151,7 @@ struct c_module { }; void start_worker (struct rspamd_worker *worker, int listen_sock); - +int process_filters (struct worker_task *task); #endif diff --git a/perl.c b/perl.c index 2cdfacf55..2e1c9e5d7 100644 --- a/perl.c +++ b/perl.c @@ -181,6 +181,9 @@ void perl_call_memcached_callback (memcached_ctx_t *ctx, memc_error_t error, voi free (callback_data); free (ctx); + /* Set save point */ + callback_data->task->save.saved = 0; + process_filters (callback_data->task); SPAGAIN; FREETMPS; diff --git a/perl/rspamd.xs b/perl/rspamd.xs index 4ec40f0fb..1632ebade 100644 --- a/perl/rspamd.xs +++ b/perl/rspamd.xs @@ -99,6 +99,22 @@ get_part (r, num) OUTPUT: RETVAL +void +save_point (r) + CODE: + struct worker_task *r; + + perl_set_session (r); + r->save.saved = 1; + +void +recall_filter (r) + CODE: + struct worker_task *r; + + perl_set_session (r); + process_filters (r); + void read_memcached_key (r, key, datalen, callback) CODE: @@ -144,6 +160,8 @@ read_memcached_key (r, key, datalen, callback) param.expire = 0; memc_get (ctx, ¶m); + /* Set save point */ + r->save.saved = 1; XSRETURN_EMPTY; void @@ -190,6 +208,8 @@ write_memcached_key (r, key, data, expire, callback) param.expire = expire; memc_set (ctx, ¶m, expire); + /* Set save point */ + r->save.saved = 1; XSRETURN_EMPTY; void @@ -233,5 +253,7 @@ delete_memcached_key (r, key, callback) param.expire = 0; memc_delete (ctx, ¶m); + /* Set save point */ + r->save.saved = 1; XSRETURN_EMPTY; diff --git a/worker.c b/worker.c index 84c9ef763..ac9dea9db 100644 --- a/worker.c +++ b/worker.c @@ -193,47 +193,82 @@ mime_foreach_callback (GMimeObject *part, gpointer user_data) } } -static void +int process_filters (struct worker_task *task) { struct filter_result *res = NULL; struct chain_result *chain_res = NULL; - struct c_module *c_filter; - struct filter_chain *chain; - struct script_param *perl_script; + struct c_module *c_filter = NULL; + struct filter_chain *chain = NULL; + struct script_param *perl_script = NULL; int i = 0; /* First process C modules */ - LIST_FOREACH (c_filter, &task->cfg->c_modules, next) { + if (task->save.saved == 1) { + if (task->save.save_type == C_FILTER) { + task->save.saved = 0; + c_filter = (struct c_module *)task->save.entry; + } + else if (task->save.save_type == PERL_FILTER) { + chain = (struct filter_chain *)task->save.chain; + perl_script = (struct script_param *)task->save.entry; + task->save.saved = 0; + } + } + else { + c_filter = LIST_FIRST (&task->cfg->c_modules); + chain = LIST_FIRST (&task->cfg->filters); + if (chain) { + perl_script = LIST_FIRST (chain->scripts); + } + } + while (c_filter != NULL) { res = malloc (sizeof (struct filter_result)); if (res == NULL) { msg_err ("process_filters: malloc failed, %m"); - return; + return -1; } res->chain = NULL; res->symbol = c_filter->name; res->mark = 0; if (c_filter->ctx->header_filter != NULL) { res->mark += c_filter->ctx->header_filter (task); + if (task->save.saved == 1) { + task->save.save_type = C_FILTER; + goto save_point; + } } if (c_filter->ctx->message_filter != NULL) { res->mark += c_filter->ctx->message_filter (task); + if (task->save.saved == 1) { + task->save.save_type = C_FILTER; + goto save_point; + } } if (c_filter->ctx->mime_filter != NULL) { res->mark += c_filter->ctx->mime_filter (task); + if (task->save.saved == 1) { + task->save.save_type = C_FILTER; + goto save_point; + } } if (c_filter->ctx->url_filter != NULL) { res->mark += c_filter->ctx->url_filter (task); + if (task->save.saved == 1) { + task->save.save_type = C_FILTER; + goto save_point; + } } TAILQ_INSERT_TAIL (&task->results, res, next); + c_filter = LIST_NEXT (c_filter, next); } /* Process perl chains */ - LIST_FOREACH (chain, &task->cfg->filters, next) { + while (chain != NULL) { chain_res = malloc (sizeof (struct chain_result)); if (chain_res == NULL) { msg_err ("process_filters: malloc failed, %m"); - return; + return -1; } i = 0; chain_res->chain = chain; @@ -242,9 +277,9 @@ process_filters (struct worker_task *task) if (chain_res->marks == NULL) { free (chain_res); msg_err ("process_filters: malloc failed, %m"); - return; + return -1; } - LIST_FOREACH (perl_script, chain->scripts, next) { + while (perl_script != NULL) { if (perl_script->type == SCRIPT_CHAIN) { /* Skip chain filters first */ continue; @@ -252,7 +287,7 @@ process_filters (struct worker_task *task) res = malloc (sizeof (struct filter_result)); if (res == NULL) { msg_err ("process_filters: malloc failed, %m"); - return; + return -1; } res->chain = chain; res->symbol = perl_script->symbol; @@ -260,22 +295,40 @@ process_filters (struct worker_task *task) switch (perl_script->type) { case SCRIPT_HEADER: res->mark += perl_call_header_filter (perl_script->function, task); + if (task->save.saved == 1) { + task->save.save_type = PERL_FILTER; + goto save_point; + } break; case SCRIPT_MESSAGE: res->mark += perl_call_message_filter (perl_script->function, task); + if (task->save.saved == 1) { + task->save.save_type = PERL_FILTER; + goto save_point; + } break; case SCRIPT_MIME: res->mark += perl_call_mime_filter (perl_script->function, task); + if (task->save.saved == 1) { + task->save.save_type = PERL_FILTER; + goto save_point; + } break; case SCRIPT_URL: res->mark += perl_call_url_filter (perl_script->function, task); + if (task->save.saved == 1) { + task->save.save_type = PERL_FILTER; + goto save_point; + } break; } TAILQ_INSERT_TAIL (&task->results, res, next); chain_res->marks[i++] = res->mark; + perl_script = LIST_NEXT (perl_script, next); } chain_res->marks_num = i; TAILQ_INSERT_TAIL (&task->chain_results, chain_res, next); + chain = LIST_NEXT (chain, next); } /* Now process chain results */ @@ -297,9 +350,23 @@ process_filters (struct worker_task *task) } } } + + task->state = WRITE_REPLY; + bufferevent_enable (task->bev, EV_WRITE); + return 0; + +save_point: + if (task->save.save_type == C_FILTER) { + task->save.entry = LIST_NEXT (c_filter, next); + } + else if (task->save.save_type == PERL_FILTER) { + task->save.chain = LIST_NEXT (chain, next); + task->save.entry = LIST_NEXT (perl_script, next); + } + return 1; } -static void +static int process_message (struct worker_task *task) { GMimeMessage *message; @@ -327,7 +394,7 @@ process_message (struct worker_task *task) msg_info ("process_message: found %d parts in message", task->parts_count); - process_filters (task); + return process_filters (task); } static void @@ -448,8 +515,13 @@ read_socket (struct bufferevent *bev, void *arg) task->msg->pos += r; update_buf_size (task->msg); if (task->msg->free == 0) { - process_message (task); - task->state = WRITE_REPLY; + r = process_message (task); + if (r == -1) { + task->state = WRITE_ERROR; + } + else if (r == 1) { + task->state = WAIT_FILTER; + } } } else { @@ -459,6 +531,10 @@ read_socket (struct bufferevent *bev, void *arg) free_task (task); } break; + case WAIT_FILTER: + bufferevent_disable (bev, EV_READ); + bufferevent_disable (bev, EV_READ); + break; case WRITE_REPLY: r = bufferevent_write (bev, "Ok\r\n", sizeof ("Ok\r\n") - 1); bufferevent_disable (bev, EV_READ); @@ -518,6 +594,7 @@ accept_socket (int fd, short what, void *arg) msg_err ("accept_socket: cannot allocate memory for task, %m"); return; } + bzero (new_task, sizeof (struct worker_task)); new_task->worker = worker; new_task->state = READ_COMMAND; new_task->content_length = 0; -- 2.39.5