From f3b6712e04fd993caccaa18e425639d85d81b1eb Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Thu, 21 Aug 2008 17:58:30 +0400 Subject: * Add filters logic * Perl should use separate memcached context for its operations (just do memcpy with the same socket) TODO: add block mechanics here to avoid memcached connection closing before perl operation is finished * Change logic of perl chain filter --- worker.c | 125 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 121 insertions(+), 4 deletions(-) (limited to 'worker.c') diff --git a/worker.c b/worker.c index d45284392..84c9ef763 100644 --- a/worker.c +++ b/worker.c @@ -73,6 +73,7 @@ free_task (struct worker_task *task) { struct uri *cur; struct filter_result *res; + struct chain_result *chain_res; struct mime_part *part; if (task) { @@ -104,9 +105,18 @@ free_task (struct worker_task *task) } while (!TAILQ_EMPTY (&task->results)) { res = TAILQ_FIRST (&task->results); - free (res); TAILQ_REMOVE (&task->results, res, next); + free (res); + } + while (!TAILQ_EMPTY (&task->chain_results)) { + chain_res = TAILQ_FIRST (&task->chain_results); + if (chain_res->marks != NULL) { + free (chain_res->marks); + } + TAILQ_REMOVE (&task->chain_results, chain_res, next); + free (chain_res); } + while (!TAILQ_EMPTY (&task->parts)) { part = TAILQ_FIRST (&task->parts); g_object_unref (part->type); @@ -123,7 +133,7 @@ mime_foreach_callback (GMimeObject *part, gpointer user_data) { struct worker_task *task = (struct worker_task *)user_data; struct mime_part *mime_part; - const GMimeContentType *type; + GMimeContentType *type; GMimeDataWrapper *wrapper; GMimeStream *part_stream; GByteArray *part_content; @@ -165,7 +175,7 @@ mime_foreach_callback (GMimeObject *part, gpointer user_data) part_stream = g_mime_stream_mem_new (); if (g_mime_data_wrapper_write_to_stream (wrapper, part_stream) != -1) { part_content = g_mime_stream_mem_get_byte_array (GMIME_STREAM_MEM (part_stream)); - type = g_mime_part_get_content_type (GMIME_PART (part)); + type = (GMimeContentType *)g_mime_part_get_content_type (GMIME_PART (part)); mime_part = g_malloc (sizeof (struct mime_part)); mime_part->type = type; mime_part->content = part_content; @@ -183,6 +193,111 @@ mime_foreach_callback (GMimeObject *part, gpointer user_data) } } +static void +process_filters (struct worker_task *task) +{ + struct filter_result *res = NULL; + struct chain_result *chain_res = NULL; + struct c_module *c_filter; + struct filter_chain *chain; + struct script_param *perl_script; + int i = 0; + + /* First process C modules */ + LIST_FOREACH (c_filter, &task->cfg->c_modules, next) { + res = malloc (sizeof (struct filter_result)); + if (res == NULL) { + msg_err ("process_filters: malloc failed, %m"); + return; + } + res->chain = NULL; + res->symbol = c_filter->name; + res->mark = 0; + if (c_filter->ctx->header_filter != NULL) { + res->mark += c_filter->ctx->header_filter (task); + } + if (c_filter->ctx->message_filter != NULL) { + res->mark += c_filter->ctx->message_filter (task); + } + if (c_filter->ctx->mime_filter != NULL) { + res->mark += c_filter->ctx->mime_filter (task); + } + if (c_filter->ctx->url_filter != NULL) { + res->mark += c_filter->ctx->url_filter (task); + } + TAILQ_INSERT_TAIL (&task->results, res, next); + } + + /* Process perl chains */ + LIST_FOREACH (chain, &task->cfg->filters, next) { + chain_res = malloc (sizeof (struct chain_result)); + if (chain_res == NULL) { + msg_err ("process_filters: malloc failed, %m"); + return; + } + i = 0; + chain_res->chain = chain; + chain_res->marks = malloc (sizeof (int) * chain->scripts_number); + chain_res->result_mark = 0; + if (chain_res->marks == NULL) { + free (chain_res); + msg_err ("process_filters: malloc failed, %m"); + return; + } + LIST_FOREACH (perl_script, chain->scripts, next) { + if (perl_script->type == SCRIPT_CHAIN) { + /* Skip chain filters first */ + continue; + } + res = malloc (sizeof (struct filter_result)); + if (res == NULL) { + msg_err ("process_filters: malloc failed, %m"); + return; + } + res->chain = chain; + res->symbol = perl_script->symbol; + res->mark = 0; + switch (perl_script->type) { + case SCRIPT_HEADER: + res->mark += perl_call_header_filter (perl_script->function, task); + break; + case SCRIPT_MESSAGE: + res->mark += perl_call_message_filter (perl_script->function, task); + break; + case SCRIPT_MIME: + res->mark += perl_call_mime_filter (perl_script->function, task); + break; + case SCRIPT_URL: + res->mark += perl_call_url_filter (perl_script->function, task); + break; + } + TAILQ_INSERT_TAIL (&task->results, res, next); + chain_res->marks[i++] = res->mark; + } + chain_res->marks_num = i; + TAILQ_INSERT_TAIL (&task->chain_results, chain_res, next); + } + + /* Now process chain results */ + TAILQ_FOREACH (chain_res, &task->chain_results, next) { + i = 0; + LIST_FOREACH (perl_script, chain_res->chain->scripts, next) { + if (perl_script->type != SCRIPT_CHAIN) { + /* Skip not chain filters */ + continue; + } + /* Increment i; if i would be equal to zero that would mean that this chain has no chain filter script */ + i ++; + chain_res->result_mark += perl_call_url_filter (perl_script->function, task, chain_res->marks, chain_res->marks_num); + } + /* If chain has no chain filter, just do addition of all marks */ + if (i == 0) { + for (i = 0; i < chain_res->marks_num; i++) { + chain_res->result_mark += chain_res->marks[i]; + } + } + } +} static void process_message (struct worker_task *task) @@ -211,6 +326,8 @@ process_message (struct worker_task *task) g_mime_message_foreach_part (message, mime_foreach_callback, task); msg_info ("process_message: found %d parts in message", task->parts_count); + + process_filters (task); } static void @@ -405,6 +522,7 @@ accept_socket (int fd, short what, void *arg) new_task->state = READ_COMMAND; new_task->content_length = 0; new_task->parts_count = 0; + new_task->cfg = worker->srv->cfg; TAILQ_INIT (&new_task->urls); TAILQ_INIT (&new_task->results); TAILQ_INIT (&new_task->parts); @@ -427,7 +545,6 @@ void start_worker (struct rspamd_worker *worker, int listen_sock) { struct sigaction signals; - struct config_file *cfg = worker->srv->cfg; worker->srv->pid = getpid (); worker->srv->type = TYPE_WORKER; event_init (); -- cgit v1.2.3