From f3b6712e04fd993caccaa18e425639d85d81b1eb Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Thu, 21 Aug 2008 17:58:30 +0400 Subject: [PATCH] * Add filters logic * Perl should use separate memcached context for its operations (just do memcpy with the same socket) TODO: add block mechanics here to avoid memcached connection closing before perl operation is finished * Change logic of perl chain filter --- cfg_file.h | 19 ++------ cfg_file.y | 4 ++ main.h | 25 ++++++++++ perl.c | 17 +++++-- perl.h | 2 +- perl/rspamd.xs | 48 +++++++++++++------ worker.c | 125 +++++++++++++++++++++++++++++++++++++++++++++++-- 7 files changed, 201 insertions(+), 39 deletions(-) diff --git a/cfg_file.h b/cfg_file.h index bf532c588..62856b489 100644 --- a/cfg_file.h +++ b/cfg_file.h @@ -6,6 +6,7 @@ #ifndef CFG_FILE_H #define CFG_FILE_H +#include "config.h" #include #ifndef OWN_QUEUE_H #include @@ -18,6 +19,7 @@ #include #include "upstream.h" #include "memcached.h" +#include "main.h" #define DEFAULT_BIND_PORT 768 #define MAX_MEMCACHED_SERVERS 48 @@ -55,8 +57,6 @@ enum script_type { SCRIPT_CHAIN, }; -struct uri; - struct memcached_server { struct upstream up; struct in_addr addr; @@ -70,20 +70,6 @@ struct perl_module { LIST_ENTRY (perl_module) next; }; -struct module_ctx { - int (*header_filter)(const char *header_name, const char *header_value); - int (*mime_filter)(GByteArray *content); - int (*message_filter)(GByteArray *content); - int (*uri_filter)(struct uri *uri); - int (*chain_filter)(GArray *results); -}; - -struct c_module { - const char *name; - struct module_ctx *ctx; - LIST_ENTRY (c_module) next; -}; - struct script_param { char *symbol; char *function; @@ -93,6 +79,7 @@ struct script_param { struct filter_chain { unsigned int metric; + unsigned int scripts_number; LIST_HEAD (scriptq, script_param) *scripts; LIST_ENTRY (filter_chain) next; }; diff --git a/cfg_file.y b/cfg_file.y index 0c6038f60..91f84f4ce 100644 --- a/cfg_file.y +++ b/cfg_file.y @@ -25,6 +25,7 @@ extern int yylineno; extern char *yytext; struct scriptq *cur_scripts; +unsigned int cur_scripts_num = 0; %} @@ -226,6 +227,7 @@ filterbody: cur_chain->metric = $1; cur_chain->scripts = cur_scripts; + cur_chain->scripts_number = cur_scripts_num; LIST_INSERT_HEAD (&cfg->filters, cur_chain, next); } @@ -250,6 +252,7 @@ filter_chain: YYERROR; } LIST_INSERT_HEAD (cur_scripts, $1, next); + cur_scripts_num = 1; } | filter_chain filter_param SEMICOLON { if ($2 == NULL) { @@ -257,6 +260,7 @@ filter_chain: YYERROR; } LIST_INSERT_HEAD (cur_scripts, $2, next); + cur_scripts_num ++; } ; diff --git a/main.h b/main.h index cb98167ae..fdf22b8ba 100644 --- a/main.h +++ b/main.h @@ -59,6 +59,7 @@ struct pidfh; struct config_file; struct filter_chain; + /* Struct that determine main server object (for logging purposes) */ struct rspamd_main { struct config_file *cfg; @@ -78,6 +79,14 @@ struct filter_result { TAILQ_ENTRY (filter_result) next; }; +struct chain_result { + struct filter_chain *chain; + int *marks; + unsigned int marks_num; + int result_mark; + TAILQ_ENTRY (chain_result) next; +}; + struct mime_part { GMimeContentType *type; GByteArray *content; @@ -114,6 +123,22 @@ struct worker_task { TAILQ_HEAD (uriq, uri) urls; /* List of filter results */ TAILQ_HEAD (resultsq, filter_result) results; + /* Results of all chains */ + TAILQ_HEAD (chainsq, chain_result) chain_results; + struct config_file *cfg; +}; + +struct module_ctx { + int (*header_filter)(struct worker_task *task); + int (*mime_filter)(struct worker_task *task); + int (*message_filter)(struct worker_task *task); + int (*url_filter)(struct worker_task *task); +}; + +struct c_module { + const char *name; + struct module_ctx *ctx; + LIST_ENTRY (c_module) next; }; void start_worker (struct rspamd_worker *worker, int listen_sock); diff --git a/perl.c b/perl.c index e2ae086eb..2cdfacf55 100644 --- a/perl.c +++ b/perl.c @@ -125,16 +125,23 @@ perl_call_url_filter (const char *function, struct worker_task *task) } int -perl_call_chain_filter (const char *function, struct worker_task *task) +perl_call_chain_filter (const char *function, struct worker_task *task, int *marks, unsigned int number) { - int result; + int result, i; + AV *av; dSP; - + ENTER; SAVETMPS; + av = newAV(); + av_extend (av, number); + for (i = 0; i < number; i ++) { + av_push (av, sv_2mortal (newSViv (marks[i]))); + } PUSHMARK (SP); XPUSHs (sv_2mortal (newSViv (PTR2IV (task)))); + XPUSHs (AvARRAY (av)); PUTBACK; call_pv (function, G_SCALAR); @@ -146,6 +153,7 @@ perl_call_chain_filter (const char *function, struct worker_task *task) PUTBACK; FREETMPS; + av_undef (av); LEAVE; @@ -170,6 +178,9 @@ void perl_call_memcached_callback (memcached_ctx_t *ctx, memc_error_t error, voi PUTBACK; call_sv (callback_data->callback, G_SCALAR); + + free (callback_data); + free (ctx); SPAGAIN; FREETMPS; diff --git a/perl.h b/perl.h index fc2ae9321..9a37634e3 100644 --- a/perl.h +++ b/perl.h @@ -12,7 +12,7 @@ int perl_call_header_filter (const char *function, struct worker_task *task); int perl_call_mime_filter (const char *function, struct worker_task *task); int perl_call_message_filter (const char *function, struct worker_task *task); int perl_call_url_filter (const char *function, struct worker_task *task); -int perl_call_chain_filter (const char *function, struct worker_task *task); +int perl_call_chain_filter (const char *function, struct worker_task *task, int *marks, unsigned int number); void perl_call_memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data); diff --git a/perl/rspamd.xs b/perl/rspamd.xs index 681e40308..4ec40f0fb 100644 --- a/perl/rspamd.xs +++ b/perl/rspamd.xs @@ -111,6 +111,7 @@ read_memcached_key (r, key, datalen, callback) SV *callback; struct worker_task *task; } *callback_data; + memcached_ctx_t *ctx; memcached_param_t param; perl_set_session (r); @@ -118,16 +119,21 @@ read_memcached_key (r, key, datalen, callback) datalen = (unsigned int) SvIV (ST(2)); callback = SvRV(ST(3)); - r->memc_ctx->callback = perl_call_memcached_callback; + /* Copy old ctx to new one */ + ctx = malloc (sizeof (memcached_ctx_t)); + if (ctx == NULL) { + XSRETURN_UNDEF; + } + memcpy (ctx, r->memc_ctx, sizeof (memcached_ctx_t)); + /* Set perl callback */ + ctx->callback = perl_call_memcached_callback; callback_data = malloc (sizeof (struct _param)); if (callback_data == NULL) { XSRETURN_UNDEF; } callback_data->callback = callback; callback_data->task = r; - r->memc_ctx->callback_data = (void *)callback_data; - - r->memc_busy = 1; + ctx->callback_data = (void *)callback_data; strlcpy (param.key, key, sizeof (param.key)); param.buf = malloc (datalen); @@ -137,7 +143,7 @@ read_memcached_key (r, key, datalen, callback) param.bufpos = 0; param.expire = 0; - memc_get (r->memc_ctx, ¶m); + memc_get (ctx, ¶m); XSRETURN_EMPTY; void @@ -152,6 +158,7 @@ write_memcached_key (r, key, data, expire, callback) SV *callback; struct worker_task *task; } *callback_data; + memcached_ctx_t *ctx; memcached_param_t param; perl_set_session (r); @@ -160,16 +167,21 @@ write_memcached_key (r, key, data, expire, callback) expire = (int) SvIV (ST(3)); callback = SvRV(ST(4)); - r->memc_ctx->callback = perl_call_memcached_callback; + /* Copy old ctx to new one */ + ctx = malloc (sizeof (memcached_ctx_t)); + if (ctx == NULL) { + XSRETURN_UNDEF; + } + memcpy (ctx, r->memc_ctx, sizeof (memcached_ctx_t)); + /* Set perl callback */ + ctx->callback = perl_call_memcached_callback; callback_data = malloc (sizeof (struct _param)); if (callback_data == NULL) { XSRETURN_UNDEF; } callback_data->callback = callback; callback_data->task = r; - r->memc_ctx->callback_data = (void *)callback_data; - - r->memc_busy = 1; + ctx->callback_data = (void *)callback_data; strlcpy (param.key, key, sizeof (param.key)); param.buf = data; @@ -177,7 +189,7 @@ write_memcached_key (r, key, data, expire, callback) param.bufpos = 0; param.expire = expire; - memc_set (r->memc_ctx, ¶m, expire); + memc_set (ctx, ¶m, expire); XSRETURN_EMPTY; void @@ -191,22 +203,28 @@ delete_memcached_key (r, key, callback) SV *callback; struct worker_task *task; } *callback_data; + memcached_ctx_t *ctx; memcached_param_t param; perl_set_session (r); key = (char *) SvPV (ST(1), keylen); callback = SvRV(ST(2)); - r->memc_ctx->callback = perl_call_memcached_callback; + /* Copy old ctx to new one */ + ctx = malloc (sizeof (memcached_ctx_t)); + if (ctx == NULL) { + XSRETURN_UNDEF; + } + memcpy (ctx, r->memc_ctx, sizeof (memcached_ctx_t)); + /* Set perl callback */ + ctx->callback = perl_call_memcached_callback; callback_data = malloc (sizeof (struct _param)); if (callback_data == NULL) { XSRETURN_UNDEF; } callback_data->callback = callback; callback_data->task = r; - r->memc_ctx->callback_data = (void *)callback_data; - - r->memc_busy = 1; + ctx->callback_data = (void *)callback_data; strlcpy (param.key, key, sizeof (param.key)); param.buf = NULL; @@ -214,6 +232,6 @@ delete_memcached_key (r, key, callback) param.bufpos = 0; param.expire = 0; - memc_delete (r->memc_ctx, ¶m); + memc_delete (ctx, ¶m); XSRETURN_EMPTY; diff --git a/worker.c b/worker.c index d45284392..84c9ef763 100644 --- a/worker.c +++ b/worker.c @@ -73,6 +73,7 @@ free_task (struct worker_task *task) { struct uri *cur; struct filter_result *res; + struct chain_result *chain_res; struct mime_part *part; if (task) { @@ -104,9 +105,18 @@ free_task (struct worker_task *task) } while (!TAILQ_EMPTY (&task->results)) { res = TAILQ_FIRST (&task->results); - free (res); TAILQ_REMOVE (&task->results, res, next); + free (res); + } + while (!TAILQ_EMPTY (&task->chain_results)) { + chain_res = TAILQ_FIRST (&task->chain_results); + if (chain_res->marks != NULL) { + free (chain_res->marks); + } + TAILQ_REMOVE (&task->chain_results, chain_res, next); + free (chain_res); } + while (!TAILQ_EMPTY (&task->parts)) { part = TAILQ_FIRST (&task->parts); g_object_unref (part->type); @@ -123,7 +133,7 @@ mime_foreach_callback (GMimeObject *part, gpointer user_data) { struct worker_task *task = (struct worker_task *)user_data; struct mime_part *mime_part; - const GMimeContentType *type; + GMimeContentType *type; GMimeDataWrapper *wrapper; GMimeStream *part_stream; GByteArray *part_content; @@ -165,7 +175,7 @@ mime_foreach_callback (GMimeObject *part, gpointer user_data) part_stream = g_mime_stream_mem_new (); if (g_mime_data_wrapper_write_to_stream (wrapper, part_stream) != -1) { part_content = g_mime_stream_mem_get_byte_array (GMIME_STREAM_MEM (part_stream)); - type = g_mime_part_get_content_type (GMIME_PART (part)); + type = (GMimeContentType *)g_mime_part_get_content_type (GMIME_PART (part)); mime_part = g_malloc (sizeof (struct mime_part)); mime_part->type = type; mime_part->content = part_content; @@ -183,6 +193,111 @@ mime_foreach_callback (GMimeObject *part, gpointer user_data) } } +static void +process_filters (struct worker_task *task) +{ + struct filter_result *res = NULL; + struct chain_result *chain_res = NULL; + struct c_module *c_filter; + struct filter_chain *chain; + struct script_param *perl_script; + int i = 0; + + /* First process C modules */ + LIST_FOREACH (c_filter, &task->cfg->c_modules, next) { + res = malloc (sizeof (struct filter_result)); + if (res == NULL) { + msg_err ("process_filters: malloc failed, %m"); + return; + } + res->chain = NULL; + res->symbol = c_filter->name; + res->mark = 0; + if (c_filter->ctx->header_filter != NULL) { + res->mark += c_filter->ctx->header_filter (task); + } + if (c_filter->ctx->message_filter != NULL) { + res->mark += c_filter->ctx->message_filter (task); + } + if (c_filter->ctx->mime_filter != NULL) { + res->mark += c_filter->ctx->mime_filter (task); + } + if (c_filter->ctx->url_filter != NULL) { + res->mark += c_filter->ctx->url_filter (task); + } + TAILQ_INSERT_TAIL (&task->results, res, next); + } + + /* Process perl chains */ + LIST_FOREACH (chain, &task->cfg->filters, next) { + chain_res = malloc (sizeof (struct chain_result)); + if (chain_res == NULL) { + msg_err ("process_filters: malloc failed, %m"); + return; + } + i = 0; + chain_res->chain = chain; + chain_res->marks = malloc (sizeof (int) * chain->scripts_number); + chain_res->result_mark = 0; + if (chain_res->marks == NULL) { + free (chain_res); + msg_err ("process_filters: malloc failed, %m"); + return; + } + LIST_FOREACH (perl_script, chain->scripts, next) { + if (perl_script->type == SCRIPT_CHAIN) { + /* Skip chain filters first */ + continue; + } + res = malloc (sizeof (struct filter_result)); + if (res == NULL) { + msg_err ("process_filters: malloc failed, %m"); + return; + } + res->chain = chain; + res->symbol = perl_script->symbol; + res->mark = 0; + switch (perl_script->type) { + case SCRIPT_HEADER: + res->mark += perl_call_header_filter (perl_script->function, task); + break; + case SCRIPT_MESSAGE: + res->mark += perl_call_message_filter (perl_script->function, task); + break; + case SCRIPT_MIME: + res->mark += perl_call_mime_filter (perl_script->function, task); + break; + case SCRIPT_URL: + res->mark += perl_call_url_filter (perl_script->function, task); + break; + } + TAILQ_INSERT_TAIL (&task->results, res, next); + chain_res->marks[i++] = res->mark; + } + chain_res->marks_num = i; + TAILQ_INSERT_TAIL (&task->chain_results, chain_res, next); + } + + /* Now process chain results */ + TAILQ_FOREACH (chain_res, &task->chain_results, next) { + i = 0; + LIST_FOREACH (perl_script, chain_res->chain->scripts, next) { + if (perl_script->type != SCRIPT_CHAIN) { + /* Skip not chain filters */ + continue; + } + /* Increment i; if i would be equal to zero that would mean that this chain has no chain filter script */ + i ++; + chain_res->result_mark += perl_call_url_filter (perl_script->function, task, chain_res->marks, chain_res->marks_num); + } + /* If chain has no chain filter, just do addition of all marks */ + if (i == 0) { + for (i = 0; i < chain_res->marks_num; i++) { + chain_res->result_mark += chain_res->marks[i]; + } + } + } +} static void process_message (struct worker_task *task) @@ -211,6 +326,8 @@ process_message (struct worker_task *task) g_mime_message_foreach_part (message, mime_foreach_callback, task); msg_info ("process_message: found %d parts in message", task->parts_count); + + process_filters (task); } static void @@ -405,6 +522,7 @@ accept_socket (int fd, short what, void *arg) new_task->state = READ_COMMAND; new_task->content_length = 0; new_task->parts_count = 0; + new_task->cfg = worker->srv->cfg; TAILQ_INIT (&new_task->urls); TAILQ_INIT (&new_task->results); TAILQ_INIT (&new_task->parts); @@ -427,7 +545,6 @@ void start_worker (struct rspamd_worker *worker, int listen_sock) { struct sigaction signals; - struct config_file *cfg = worker->srv->cfg; worker->srv->pid = getpid (); worker->srv->type = TYPE_WORKER; event_init (); -- 2.39.5