From a8cdd33ac7ee59e195dca03a395c264877ee5168 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 13 Jul 2009 20:54:13 +0400 Subject: [PATCH] * Rework the whole filters system * Add metrics optimization and symbols cache * Change all plugins [DRAGONS]: not for production usage, some things are still not working! --- CMakeLists.txt | 5 +- src/cfg_file.h | 11 +- src/cfg_file.l | 5 +- src/cfg_file.y | 35 +--- src/cfg_utils.c | 49 +---- src/filter.c | 199 ++++++-------------- src/filter.h | 11 +- src/lua-rspamd.h | 5 +- src/lua.c | 74 +------- src/main.c | 10 ++ src/main.h | 20 +-- src/perl.c | 96 ---------- src/perl.h | 5 +- src/plugins/chartable.c | 41 +++-- src/plugins/emails.c | 40 +++-- src/plugins/regexp.c | 77 ++++---- src/plugins/surbl.c | 129 +++++++------ src/plugins/surbl.h | 7 +- src/statfile.c | 2 +- src/symbols_cache.c | 389 ++++++++++++++++++++++++++++++++++++++++ src/symbols_cache.h | 54 ++++++ src/util.c | 4 +- src/util.h | 2 +- 23 files changed, 713 insertions(+), 557 deletions(-) create mode 100644 src/symbols_cache.c create mode 100644 src/symbols_cache.h diff --git a/CMakeLists.txt b/CMakeLists.txt index cfe62669e..5c2d2cbc1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7,7 +7,7 @@ PROJECT(rspamd C) SET(RSPAMD_VERSION_MAJOR 0) SET(RSPAMD_VERSION_MINOR 2) -SET(RSPAMD_VERSION_PATCH 3) +SET(RSPAMD_VERSION_PATCH 4) SET(RSPAMD_VERSION "${RSPAMD_VERSION_MAJOR}.${RSPAMD_VERSION_MINOR}.${RSPAMD_VERSION_PATCH}") SET(RSPAMD_MASTER_SITE_URL "http://cebka.pp.ru/hg/rspamd") @@ -314,7 +314,8 @@ SET(RSPAMDSRC src/modules.c src/lmtp.c src/lmtp_proto.c src/radix.c - src/view.c) + src/view.c + src/symbols_cache.c) IF(ENABLE_PERL MATCHES "ON") LIST(APPEND RSPAMDSRC src/perl.c) diff --git a/src/cfg_file.h b/src/cfg_file.h index 850c34ece..d93bc9d34 100644 --- a/src/cfg_file.h +++ b/src/cfg_file.h @@ -212,18 +212,13 @@ struct config_file { GList *perl_modules; /**< linked list of perl modules to load */ - GList *header_filters; /**< linked list of all header's filters */ - GList *mime_filters; /**< linked list of all mime filters */ - GList *message_filters; /**< linked list of all message's filters */ - GList *url_filters; /**< linked list of all url's filters */ + GList *filters; /**< linked list of all filters */ GList *workers; /**< linked list of all workers params */ - char *header_filters_str; /**< string of header's filters */ - char *mime_filters_str; /**< string of mime's filters */ - char *message_filters_str; /**< string of message's filters */ - char *url_filters_str; /**< string for url's filters */ + char *filters_str; /**< string of filters */ GHashTable* modules_opts; /**< hash for module options indexed by module name */ GHashTable* variables; /**< hash of $variables defined in config, indexed by variable name */ GHashTable* metrics; /**< hash of metrics indexed by metric name */ + GList* metrics_list; /**< linked list of metrics */ GHashTable* factors; /**< hash of factors indexed by symbol name */ GHashTable* c_modules; /**< hash of c modules indexed by module name */ GHashTable* composite_symbols; /**< hash of composite symbols indexed by its name */ diff --git a/src/cfg_file.l b/src/cfg_file.l index 0cf635f7e..06e1a33e7 100644 --- a/src/cfg_file.l +++ b/src/cfg_file.l @@ -52,10 +52,7 @@ protocol return PROTOCOL; memcached return MEMCACHED; servers return SERVERS; require return REQUIRE; -header_filters return HEADER_FILTERS; -mime_filters return MIME_FILTERS; -message_filters return MESSAGE_FILTERS; -url_filters return URL_FILTERS; +filters return FILTERS; factors return FACTORS; metric return METRIC; name return NAME; diff --git a/src/cfg_file.y b/src/cfg_file.y index 1fdc7275f..af54a9fb4 100644 --- a/src/cfg_file.y +++ b/src/cfg_file.y @@ -49,7 +49,7 @@ struct rspamd_view *cur_view = NULL; %token READ_SERVERS WRITE_SERVER DIRECTORY_SERVERS MAILBOX_QUERY USERS_QUERY LASTLOGIN_QUERY %token MEMCACHED WORKER TYPE REQUIRE MODULE %token MODULE_OPT PARAM VARIABLE -%token HEADER_FILTERS MIME_FILTERS MESSAGE_FILTERS URL_FILTERS FACTORS METRIC NAME +%token FILTERS FACTORS METRIC NAME %token REQUIRED_SCORE FUNCTION FRACT COMPOSITES CONTROL PASSWORD %token LOGGING LOG_TYPE LOG_TYPE_CONSOLE LOG_TYPE_SYSLOG LOG_TYPE_FILE %token LOG_LEVEL LOG_LEVEL_DEBUG LOG_LEVEL_INFO LOG_LEVEL_WARNING LOG_LEVEL_ERROR LOG_FACILITY LOG_FILENAME @@ -84,10 +84,7 @@ command : | memcached | worker | require - | header_filters - | mime_filters - | message_filters - | url_filters + | filters | module_opt | variable | factors @@ -126,30 +123,9 @@ pidfile : ; -header_filters: - HEADER_FILTERS EQSIGN QUOTEDSTRING { - cfg->header_filters_str = memory_pool_strdup (cfg->cfg_pool, $3); - free ($3); - } - ; - -mime_filters: - MIME_FILTERS EQSIGN QUOTEDSTRING { - cfg->mime_filters_str = memory_pool_strdup (cfg->cfg_pool, $3); - free ($3); - } - ; - -message_filters: - MESSAGE_FILTERS EQSIGN QUOTEDSTRING { - cfg->message_filters_str = memory_pool_strdup (cfg->cfg_pool, $3); - free ($3); - } - ; - -url_filters: - URL_FILTERS EQSIGN QUOTEDSTRING { - cfg->url_filters_str = memory_pool_strdup (cfg->cfg_pool, $3); +filters: + FILTERS EQSIGN QUOTEDSTRING { + cfg->filters_str = memory_pool_strdup (cfg->cfg_pool, $3); free ($3); } ; @@ -372,6 +348,7 @@ metric: cur_metric->classifier = get_classifier ("winnow"); } g_hash_table_insert (cfg->metrics, cur_metric->name, cur_metric); + cfg->metrics_list = g_list_prepend (cfg->metrics_list, cur_metric); cur_metric = NULL; } ; diff --git a/src/cfg_utils.c b/src/cfg_utils.c index 900e55f1b..5688d44a0 100644 --- a/src/cfg_utils.c +++ b/src/cfg_utils.c @@ -218,6 +218,7 @@ free_config (struct config_file *cfg) g_hash_table_unref (cfg->statfiles); g_hash_table_remove_all (cfg->cfg_params); g_hash_table_unref (cfg->cfg_params); + g_list_free (cfg->metrics_list); memory_pool_delete (cfg->cfg_pool); } @@ -401,7 +402,7 @@ substitute_all_variables (gpointer key, gpointer value, gpointer data) } static void -parse_filters_str (struct config_file *cfg, const char *str, enum script_type type) +parse_filters_str (struct config_file *cfg, const char *str) { gchar **strvec, **p; struct filter *cur; @@ -425,24 +426,9 @@ parse_filters_str (struct config_file *cfg, const char *str, enum script_type ty cur = memory_pool_alloc (cfg->cfg_pool, sizeof (struct filter)); cur->type = C_FILTER; msg_debug ("parse_filters_str: found C filter %s", *p); - switch (type) { - case SCRIPT_HEADER: - cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); - cfg->header_filters = g_list_prepend (cfg->header_filters, cur); - break; - case SCRIPT_MIME: - cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); - cfg->mime_filters = g_list_prepend (cfg->mime_filters, cur); - break; - case SCRIPT_MESSAGE: - cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); - cfg->message_filters = g_list_prepend (cfg->message_filters, cur); - break; - case SCRIPT_URL: - cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); - cfg->url_filters = g_list_prepend (cfg->url_filters, cur); - break; - } + cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); + cfg->filters = g_list_prepend (cfg->filters, cur); + break; } } @@ -453,24 +439,8 @@ parse_filters_str (struct config_file *cfg, const char *str, enum script_type ty } cur = memory_pool_alloc (cfg->cfg_pool, sizeof (struct filter)); cur->type = PERL_FILTER; - switch (type) { - case SCRIPT_HEADER: - cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); - cfg->header_filters = g_list_prepend (cfg->header_filters, cur); - break; - case SCRIPT_MIME: - cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); - cfg->mime_filters = g_list_prepend (cfg->mime_filters, cur); - break; - case SCRIPT_MESSAGE: - cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); - cfg->message_filters = g_list_prepend (cfg->message_filters, cur); - break; - case SCRIPT_URL: - cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); - cfg->url_filters = g_list_prepend (cfg->message_filters, cur); - break; - } + cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); + cfg->filters = g_list_prepend (cfg->filters, cur); p ++; } @@ -512,10 +482,7 @@ post_load_config (struct config_file *cfg) g_hash_table_foreach (cfg->variables, substitute_all_variables, cfg); g_hash_table_foreach (cfg->modules_opts, substitute_module_variables, cfg); - parse_filters_str (cfg, cfg->header_filters_str, SCRIPT_HEADER); - parse_filters_str (cfg, cfg->mime_filters_str, SCRIPT_MIME); - parse_filters_str (cfg, cfg->message_filters_str, SCRIPT_MESSAGE); - parse_filters_str (cfg, cfg->url_filters_str, SCRIPT_URL); + parse_filters_str (cfg, cfg->filters_str); fill_cfg_params (cfg); #ifdef HAVE_CLOCK_PROCESS_CPUTIME_ID diff --git a/src/filter.c b/src/filter.c index 1c45f0886..2bfd2bc36 100644 --- a/src/filter.c +++ b/src/filter.c @@ -144,7 +144,7 @@ factor_consolidation_func (struct worker_task *task, const char *metric_name, co * Call perl or C module function for specified part of message */ static void -call_filter_by_name (struct worker_task *task, const char *name, enum filter_type filt_type, enum script_type sc_type) +call_filter_by_name (struct worker_task *task, const char *name, enum filter_type filt_type) { struct module_ctx *c_module; int res = 0; @@ -154,20 +154,7 @@ call_filter_by_name (struct worker_task *task, const char *name, enum filter_typ c_module = g_hash_table_lookup (task->worker->srv->cfg->c_modules, name); if (c_module) { res = 1; - switch (sc_type) { - case SCRIPT_HEADER: - c_module->header_filter (task); - break; - case SCRIPT_MIME: - c_module->mime_filter (task); - break; - case SCRIPT_URL: - c_module->url_filter (task); - break; - case SCRIPT_MESSAGE: - c_module->message_filter (task); - break; - } + c_module->filter (task); } else { msg_debug ("call_filter_by_name: %s is not a C module", name); @@ -176,35 +163,9 @@ call_filter_by_name (struct worker_task *task, const char *name, enum filter_typ case PERL_FILTER: res = 1; #ifndef WITHOUT_PERL - switch (sc_type) { - case SCRIPT_HEADER: - perl_call_header_filter (name, task); - break; - case SCRIPT_MIME: - perl_call_mime_filter (name, task); - break; - case SCRIPT_URL: - perl_call_url_filter (name, task); - break; - case SCRIPT_MESSAGE: - perl_call_message_filter (name, task); - break; - } + perl_call_filter (name, task); #elif defined(WITH_LUA) - switch (sc_type) { - case SCRIPT_HEADER: - lua_call_header_filter (name, task); - break; - case SCRIPT_MIME: - lua_call_mime_filter (name, task); - break; - case SCRIPT_URL: - lua_call_url_filter (name, task); - break; - case SCRIPT_MESSAGE: - lua_call_message_filter (name, task); - break; - } + lua_call_filter (name, task); #else msg_err ("call_filter_by_name: trying to call perl function while perl support is disabled %s", name); #endif @@ -250,131 +211,77 @@ metric_process_callback_forced (gpointer key, gpointer value, void *data) metric_process_callback_common (key, value, data, TRUE); } +/* Return true if metric has score that is more than spam score for it */ +static gboolean +check_metric_is_spam (struct worker_task *task, struct metric *metric) +{ + struct metric_result *res; + + res = g_hash_table_lookup (task->results, metric->name); + if (res) { + return res->score >= metric->required_score; + } + + return FALSE; +} + static int continue_process_filters (struct worker_task *task) { GList *cur = task->save.entry; - struct filter *filt = cur->data; + struct cache_item *item = task->save.item; + + struct metric *metric = cur->data; - cur = g_list_next (cur); - /* Note: no breaks in this case! */ - switch (task->save.type) { - case SCRIPT_HEADER: - while (cur) { - filt = cur->data; - call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_HEADER); - if (task->save.saved) { - task->save.entry = cur; - task->save.type = SCRIPT_HEADER; - return 0; - } - cur = g_list_next (cur); - } - /* Process mime filters */ - cur = g_list_first (task->worker->srv->cfg->mime_filters); - case SCRIPT_MIME: - while (cur) { - filt = cur->data; - call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_MIME); - if (task->save.saved) { - task->save.entry = cur; - task->save.type = SCRIPT_MIME; - return 0; - } - cur = g_list_next (cur); - } - /* Process url filters */ - cur = g_list_first (task->worker->srv->cfg->url_filters); - case SCRIPT_URL: - while (cur) { - filt = cur->data; - call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_URL); - if (task->save.saved) { - task->save.entry = cur; - task->save.type = SCRIPT_URL; - return 0; - } - cur = g_list_next (cur); + while (cur) { + metric = cur->data; + while (call_symbol_callback (task, metric->cache, &item)) { + /* call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_HEADER); */ + if (task->save.saved) { + task->save.entry = cur; + task->save.item = item; + return 0; } - /* Process message filters */ - cur = g_list_first (task->worker->srv->cfg->message_filters); - case SCRIPT_MESSAGE: - while (cur) { - filt = cur->data; - call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_MESSAGE); - if (task->save.saved) { - task->save.entry = cur; - task->save.type = SCRIPT_MESSAGE; - return 0; - } - cur = g_list_next (cur); + else if (check_metric_is_spam (task, metric)) { + break; } - /* Process all statfiles */ - process_statfiles (task); - /* XXX: ugly direct call */ - task->dispatcher->write_callback (task); - return 1; + } + cur = g_list_next (cur); } - return -1; + /* Process all statfiles */ + process_statfiles (task); + /* XXX: ugly direct call */ + task->dispatcher->write_callback (task); + return 1; } int process_filters (struct worker_task *task) { GList *cur; - struct filter *filt; + struct metric *metric; + struct cache_item *item = NULL; if (task->save.saved) { task->save.saved = 0; return continue_process_filters (task); } - /* Process filters in order that they are listed in config file */ - cur = task->worker->srv->cfg->header_filters; - while (cur) { - filt = cur->data; - call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_HEADER); - if (task->save.saved) { - task->save.entry = cur; - task->save.type = SCRIPT_HEADER; - return 0; - } - cur = g_list_next (cur); - } - - cur = task->worker->srv->cfg->mime_filters; + /* Process metrics symbols */ + cur = task->worker->srv->cfg->metrics_list; while (cur) { - filt = cur->data; - call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_MIME); - if (task->save.saved) { - task->save.entry = cur; - task->save.type = SCRIPT_MIME; - return 0; - } - cur = g_list_next (cur); - } - - cur = task->worker->srv->cfg->url_filters; - while (cur) { - filt = cur->data; - call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_URL); - if (task->save.saved) { - task->save.entry = cur; - task->save.type = SCRIPT_URL; - return 0; - } - cur = g_list_next (cur); - } - - cur = task->worker->srv->cfg->message_filters; - while (cur) { - filt = cur->data; - call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_MESSAGE); - if (task->save.saved) { - task->save.entry = cur; - task->save.type = SCRIPT_MESSAGE; - return 0; + metric = cur->data; + while (call_symbol_callback (task, metric->cache, &item)) { + /* call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_HEADER); */ + if (task->save.saved) { + task->save.entry = cur; + task->save.item = item; + return 0; + } + else if (check_metric_is_spam (task, metric)) { + break; + } } cur = g_list_next (cur); } diff --git a/src/filter.h b/src/filter.h index e0c989f85..56a70fdf7 100644 --- a/src/filter.h +++ b/src/filter.h @@ -7,6 +7,7 @@ #define RSPAMD_FILTER_H #include "config.h" +#include "symbols_cache.h" struct worker_task; @@ -19,16 +20,16 @@ enum filter_type { C_FILTER, PERL_FILTER }; * Filter structure */ struct filter { - char *func_name; /**< function name */ - enum filter_type type; /**< filter type (c or perl) */ + char *func_name; /**< function name */ + enum filter_type type; /**< filter type (c or perl) */ }; /** * Rspamd symbol */ struct symbol { - double score; /**< symbol's score */ - GList *options; /**< list of symbol's options */ + double score; /**< symbol's score */ + GList *options; /**< list of symbol's options */ }; /** @@ -40,6 +41,8 @@ struct metric { metric_cons_func func; /**< c consolidation function */ double required_score; /**< required score for this metric */ struct classifier *classifier; /**< classifier that is used for metric */ + struct symbols_cache *cache; /**< symbols cache for metric */ + char *cache_filename; /**< filename of cache file */ }; /** diff --git a/src/lua-rspamd.h b/src/lua-rspamd.h index 0d0b2a693..2a5692583 100644 --- a/src/lua-rspamd.h +++ b/src/lua-rspamd.h @@ -9,10 +9,7 @@ struct config_file; void init_lua_filters (struct config_file *cfg); -int lua_call_header_filter (const char *function, struct worker_task *task); -int lua_call_mime_filter (const char *function, struct worker_task *task); -int lua_call_message_filter (const char *function, struct worker_task *task); -int lua_call_url_filter (const char *function, struct worker_task *task); +int lua_call_filter (const char *function, struct worker_task *task); int lua_call_chain_filter (const char *function, struct worker_task *task, int *marks, unsigned int number); double lua_consolidation_func (struct worker_task *task, const char *metric_name, const char *function_name); void add_luabuf (const char *line); diff --git a/src/lua.c b/src/lua.c index edc64bc91..df03eafa6 100644 --- a/src/lua.c +++ b/src/lua.c @@ -335,79 +335,7 @@ init_lua_filters (struct config_file *cfg) int -lua_call_header_filter (const char *function, struct worker_task *task) -{ - int result; - struct worker_task **ptask; - - lua_getglobal (L, function); - ptask = lua_newuserdata (L, sizeof (struct worker_task *)); - lua_setclass (L, "Rspamd.task", -1); - *ptask = task; - - if (lua_pcall (L, 1, 1, 0) != 0) { - msg_info ("lua_init_filters: call to %s failed", function); - } - - /* retrieve result */ - if (!lua_isnumber (L, -1)) { - msg_info ("lua_call_header_filter: function %s must return a number", function); - } - result = lua_tonumber (L, -1); - lua_pop (L, 1); /* pop returned value */ - return result; -} - -int -lua_call_mime_filter (const char *function, struct worker_task *task) -{ - int result; - struct worker_task **ptask; - - lua_getglobal (L, function); - ptask = lua_newuserdata (L, sizeof (struct worker_task *)); - lua_setclass (L, "Rspamd.task", -1); - *ptask = task; - - if (lua_pcall (L, 1, 1, 0) != 0) { - msg_info ("lua_init_filters: call to %s failed", function); - } - - /* retrieve result */ - if (!lua_isnumber (L, -1)) { - msg_info ("lua_call_header_filter: function %s must return a number", function); - } - result = lua_tonumber (L, -1); - lua_pop (L, 1); /* pop returned value */ - return result; -} - -int -lua_call_message_filter (const char *function, struct worker_task *task) -{ - int result; - struct worker_task **ptask; - - lua_getglobal (L, function); - ptask = lua_newuserdata (L, sizeof (struct worker_task *)); - lua_setclass (L, "Rspamd.task", -1); - *ptask = task; - - if (lua_pcall (L, 1, 1, 0) != 0) { - msg_info ("lua_init_filters: call to %s failed", function); - } - - /* retrieve result */ - if (!lua_isnumber (L, -1)) { - msg_info ("lua_call_header_filter: function %s must return a number", function); - } - result = lua_tonumber (L, -1); - lua_pop (L, 1); /* pop returned value */ - return result; -} - -int -lua_call_url_filter (const char *function, struct worker_task *task) +lua_call_filter (const char *function, struct worker_task *task) { int result; struct worker_task **ptask; diff --git a/src/main.c b/src/main.c index dc0226680..beefabdcb 100644 --- a/src/main.c +++ b/src/main.c @@ -435,8 +435,10 @@ main (int argc, char **argv, char **env) struct sigaction signals; struct rspamd_worker *cur, *cur_tmp, *active_worker; struct rlimit rlim; + struct metric *metric; FILE *f; pid_t wrk; + GList *l; #ifndef WITHOUT_PERL char *args[] = { "", "-e", "0", NULL }; #endif @@ -609,6 +611,14 @@ main (int argc, char **argv, char **env) for (i = 0; i < MODULES_NUM; i ++) { modules[i].module_config_func (cfg); } + + /* Init symbols cache for each metric */ + l = g_list_first (cfg->metrics_list); + while (l) { + metric = l->data; + init_symbols_cache (cfg->cfg_pool, metric->cache, metric->cache_filename); + l = g_list_next (l); + } spawn_workers (rspamd); diff --git a/src/main.h b/src/main.h index c929e6cb7..0e8f9966c 100644 --- a/src/main.h +++ b/src/main.h @@ -50,16 +50,6 @@ enum process_type { TYPE_FUZZY, }; -/** - * Filter type - */ -enum script_type { - SCRIPT_HEADER, - SCRIPT_MIME, - SCRIPT_URL, - SCRIPT_MESSAGE, -}; - /** * Worker process structure @@ -122,8 +112,8 @@ struct counter_data { * Save point object for delayed filters processing */ struct save_point { - GList *entry; /**< pointer to saved filter */ - enum script_type type; + GList *entry; /**< pointer to saved metric */ + void *item; /**< pointer to saved item */ unsigned int saved; /**< how much time we have delayed processing */ }; @@ -207,10 +197,7 @@ struct worker_task { * Common structure representing C module context */ struct module_ctx { - int (*header_filter)(struct worker_task *task); /**< pointer to headers process function */ - int (*mime_filter)(struct worker_task *task); /**< pointer to mime parts process function */ - int (*message_filter)(struct worker_task *task); /**< pointer to the whole message process function */ - int (*url_filter)(struct worker_task *task); /**< pointer to urls process function */ + int (*filter)(struct worker_task *task); /**< pointer to headers process function */ }; /** @@ -219,7 +206,6 @@ struct module_ctx { struct c_module { const char *name; /**< name */ struct module_ctx *ctx; /**< pointer to context */ - LIST_ENTRY (c_module) next; /**< linked list */ }; void start_worker (struct rspamd_worker *worker); diff --git a/src/perl.c b/src/perl.c index f39ac7df0..eab3a424d 100644 --- a/src/perl.c +++ b/src/perl.c @@ -125,102 +125,6 @@ perl_call_header_filter (const char *function, struct worker_task *task) return result; } -int -perl_call_mime_filter (const char *function, struct worker_task *task) -{ - int result; - SV *sv; - - dTHXa (perl_interpreter); - PERL_SET_CONTEXT (perl_interpreter); - - dSP; - ENTER; - SAVETMPS; - - PUSHMARK (SP); - sv = sv_2mortal (sv_bless (newRV_noinc (newSViv (PTR2IV(task))), rspamd_task_stash)); - XPUSHs (sv); - PUTBACK; - - call_pv (function, G_SCALAR); - - SPAGAIN; - - result = POPi; - msg_debug ("mime_filter: call of %s returned mark %d\n", function, result); - - PUTBACK; - FREETMPS; - LEAVE; - - return result; -} - -int -perl_call_message_filter (const char *function, struct worker_task *task) -{ - int result; - SV *sv; - - dTHXa (perl_interpreter); - PERL_SET_CONTEXT (perl_interpreter); - - dSP; - ENTER; - SAVETMPS; - - PUSHMARK (SP); - sv = sv_2mortal (sv_bless (newRV_noinc (newSViv (PTR2IV(task))), rspamd_task_stash)); - XPUSHs (sv); - PUTBACK; - - call_pv (function, G_SCALAR); - - SPAGAIN; - - result = POPi; - msg_debug ("message_filter: call of %s returned mark %d\n", function, result); - - PUTBACK; - FREETMPS; - LEAVE; - - return result; -} - -int -perl_call_url_filter (const char *function, struct worker_task *task) -{ - int result; - SV *sv; - - dTHXa (perl_interpreter); - PERL_SET_CONTEXT (perl_interpreter); - - dSP; - ENTER; - SAVETMPS; - - PUSHMARK (SP); - sv = sv_2mortal (sv_bless (newRV_noinc (newSViv (PTR2IV(task))), rspamd_task_stash)); - XPUSHs (sv); - PUTBACK; - - call_pv (function, G_SCALAR); - - SPAGAIN; - - result = POPi; - msg_debug ("url_filter: call of %s for url returned mark %d\n", function, result); - - PUTBACK; - FREETMPS; - LEAVE; - - return result; -} - int perl_call_chain_filter (const char *function, struct worker_task *task, int *marks, unsigned int number) { diff --git a/src/perl.h b/src/perl.h index 9b1f8af63..109ead48d 100644 --- a/src/perl.h +++ b/src/perl.h @@ -12,10 +12,7 @@ struct config_file; void init_perl_filters (struct config_file *cfg); -int perl_call_header_filter (const char *function, struct worker_task *task); -int perl_call_mime_filter (const char *function, struct worker_task *task); -int perl_call_message_filter (const char *function, struct worker_task *task); -int perl_call_url_filter (const char *function, struct worker_task *task); +int perl_call_filter (const char *function, struct worker_task *task); int perl_call_chain_filter (const char *function, struct worker_task *task, int *marks, unsigned int number); void perl_call_memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data); diff --git a/src/plugins/chartable.c b/src/plugins/chartable.c index bb0f79da7..19c46c7b7 100644 --- a/src/plugins/chartable.c +++ b/src/plugins/chartable.c @@ -38,10 +38,7 @@ #define DEFAULT_THRESHOLD 0.1 struct chartable_ctx { - int (*header_filter)(struct worker_task *task); - int (*mime_filter)(struct worker_task *task); - int (*message_filter)(struct worker_task *task); - int (*url_filter)(struct worker_task *task); + int (*filter)(struct worker_task *task); char *metric; char *symbol; double threshold; @@ -52,16 +49,14 @@ struct chartable_ctx { static struct chartable_ctx *chartable_module_ctx = NULL; static int chartable_mime_filter (struct worker_task *task); +static void chartable_symbol_callback (struct worker_task *task, void *unused); int chartable_module_init (struct config_file *cfg, struct module_ctx **ctx) { chartable_module_ctx = g_malloc (sizeof (struct chartable_ctx)); - chartable_module_ctx->header_filter = NULL; - chartable_module_ctx->mime_filter = chartable_mime_filter; - chartable_module_ctx->message_filter = NULL; - chartable_module_ctx->url_filter = NULL; + chartable_module_ctx->filter = chartable_mime_filter; chartable_module_ctx->chartable_pool = memory_pool_new (memory_pool_get_size ()); *ctx = (struct module_ctx *)chartable_module_ctx; @@ -75,6 +70,8 @@ chartable_module_config (struct config_file *cfg) { char *value; int res = TRUE; + struct metric *metric; + double *w; if ((value = get_module_opt (cfg, "chartable", "metric")) != NULL) { chartable_module_ctx->metric = memory_pool_strdup (chartable_module_ctx->chartable_pool, value); @@ -101,7 +98,22 @@ chartable_module_config (struct config_file *cfg) else { chartable_module_ctx->threshold = DEFAULT_THRESHOLD; } - + + metric = g_hash_table_lookup (cfg->metrics, chartable_module_ctx->metric); + if (metric == NULL) { + msg_err ("chartable_module_config: cannot find metric definition %s", chartable_module_ctx->metric); + return FALSE; + } + + /* Search in factors hash table */ + w = g_hash_table_lookup (cfg->factors, chartable_module_ctx->symbol); + if (w == NULL) { + register_symbol (metric->cache, chartable_module_ctx->symbol, 1, chartable_symbol_callback, NULL); + } + else { + register_symbol (metric->cache, chartable_module_ctx->symbol, *w, chartable_symbol_callback, NULL); + } + return res; } @@ -178,8 +190,8 @@ check_part (struct mime_text_part *part, gboolean raw_mode) return ((double)mark / (double)total) > chartable_module_ctx->threshold; } -static int -chartable_mime_filter (struct worker_task *task) +static void +chartable_symbol_callback (struct worker_task *task, void *unused) { GList *cur; @@ -193,6 +205,11 @@ chartable_mime_filter (struct worker_task *task) } } - return 0; } +static int +chartable_mime_filter (struct worker_task *task) +{ + /* XXX: remove it */ + return 0; +} diff --git a/src/plugins/emails.c b/src/plugins/emails.c index 0b7e35d84..6b789916b 100644 --- a/src/plugins/emails.c +++ b/src/plugins/emails.c @@ -40,10 +40,7 @@ static const char *email_re_text = "[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*@(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\\.)+(?:[A-Z]{2}|com|org|net|gov|mil|biz|info|mobi|name|aero|jobs|museum)\\b"; struct email_ctx { - int (*header_filter)(struct worker_task *task); - int (*mime_filter)(struct worker_task *task); - int (*message_filter)(struct worker_task *task); - int (*url_filter)(struct worker_task *task); + int (*filter)(struct worker_task *task); char *metric; char *symbol; GRegex *email_re; @@ -57,6 +54,7 @@ struct email_ctx { static struct email_ctx *email_module_ctx = NULL; static int emails_mime_filter (struct worker_task *task); +static void emails_symbol_callback (struct worker_task *task, void *unused); static int emails_command_handler (struct worker_task *task); int @@ -66,10 +64,7 @@ emails_module_init (struct config_file *cfg, struct module_ctx **ctx) email_module_ctx = g_malloc (sizeof (struct email_ctx)); - email_module_ctx->header_filter = NULL; - email_module_ctx->mime_filter = emails_mime_filter; - email_module_ctx->message_filter = NULL; - email_module_ctx->url_filter = NULL; + email_module_ctx->filter = emails_mime_filter; email_module_ctx->email_pool = memory_pool_new (memory_pool_get_size ()); email_module_ctx->email_re = g_regex_new (email_re_text, G_REGEX_RAW | G_REGEX_OPTIMIZE | G_REGEX_CASELESS, 0, &err); email_module_ctx->blacklist = g_hash_table_new (g_str_hash, g_str_equal); @@ -87,6 +82,8 @@ emails_module_config (struct config_file *cfg) { char *value; int res = TRUE; + struct metric *metric; + double *w; if ((value = get_module_opt (cfg, "emails", "metric")) != NULL) { email_module_ctx->metric = memory_pool_strdup (email_module_ctx->email_pool, value); @@ -109,6 +106,22 @@ emails_module_config (struct config_file *cfg) } } } + + metric = g_hash_table_lookup (cfg->metrics, email_module_ctx->metric); + if (metric == NULL) { + msg_err ("emails_module_config: cannot find metric definition %s", email_module_ctx->metric); + return FALSE; + } + + /* Search in factors hash table */ + w = g_hash_table_lookup (cfg->factors, email_module_ctx->symbol); + if (w == NULL) { + register_symbol (metric->cache, email_module_ctx->symbol, 1, emails_symbol_callback, NULL); + } + else { + register_symbol (metric->cache, email_module_ctx->symbol, *w, emails_symbol_callback, NULL); + } + return res; } @@ -198,8 +211,8 @@ emails_command_handler (struct worker_task *task) return 0; } -static int -emails_mime_filter (struct worker_task *task) +static void +emails_symbol_callback (struct worker_task *task, void *unused) { GList *emails, *cur; @@ -220,6 +233,11 @@ emails_mime_filter (struct worker_task *task) } } - return 0; } +static int +emails_mime_filter (struct worker_task *task) +{ + /* XXX: remove this */ + return 0; +} diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c index 097df7f96..7b123dd33 100644 --- a/src/plugins/regexp.c +++ b/src/plugins/regexp.c @@ -53,11 +53,7 @@ struct autolearn_data { }; struct regexp_ctx { - int (*header_filter)(struct worker_task *task); - int (*mime_filter)(struct worker_task *task); - int (*message_filter)(struct worker_task *task); - int (*url_filter)(struct worker_task *task); - GList *items; + int (*filter)(struct worker_task *task); GHashTable *autolearn_symbols; char *metric; char *statfile_prefix; @@ -70,18 +66,16 @@ static struct regexp_ctx *regexp_module_ctx = NULL; static int regexp_common_filter (struct worker_task *task); static gboolean rspamd_regexp_match_number (struct worker_task *task, GList *args); static gboolean rspamd_raw_header_exists (struct worker_task *task, GList *args); +static void process_regexp_item (struct worker_task *task, void *user_data); + int regexp_module_init (struct config_file *cfg, struct module_ctx **ctx) { regexp_module_ctx = g_malloc (sizeof (struct regexp_ctx)); - regexp_module_ctx->header_filter = regexp_common_filter; - regexp_module_ctx->mime_filter = NULL; - regexp_module_ctx->message_filter = NULL; - regexp_module_ctx->url_filter = NULL; + regexp_module_ctx->filter = regexp_common_filter; regexp_module_ctx->regexp_pool = memory_pool_new (1024); - regexp_module_ctx->items = NULL; regexp_module_ctx->autolearn_symbols = g_hash_table_new (g_str_hash, g_str_equal); *ctx = (struct module_ctx *)regexp_module_ctx; @@ -155,8 +149,10 @@ regexp_module_config (struct config_file *cfg) GList *cur_opt = NULL; struct module_opt *cur; struct regexp_module_item *cur_item; + struct metric *metric; char *value; int res = TRUE; + double *w; if ((value = get_module_opt (cfg, "regexp", "metric")) != NULL) { regexp_module_ctx->metric = memory_pool_strdup (regexp_module_ctx->regexp_pool, value); @@ -172,6 +168,12 @@ regexp_module_config (struct config_file *cfg) else { regexp_module_ctx->statfile_prefix = DEFAULT_STATFILE_PREFIX; } + + metric = g_hash_table_lookup (cfg->metrics, regexp_module_ctx->metric); + if (metric == NULL) { + msg_err ("regexp_module_config: cannot find metric definition %s", regexp_module_ctx->metric); + return FALSE; + } cur_opt = g_hash_table_lookup (cfg->modules_opts, "regexp"); while (cur_opt) { @@ -188,8 +190,16 @@ regexp_module_config (struct config_file *cfg) if (!read_regexp_expression (regexp_module_ctx->regexp_pool, cur_item, cur->param, cur->value, cfg)) { res = FALSE; } - set_counter (cur_item->symbol, 0); - regexp_module_ctx->items = g_list_prepend (regexp_module_ctx->items, cur_item); + + /* Search in factors hash table */ + w = g_hash_table_lookup (cfg->factors, cur->param); + if (w == NULL) { + register_symbol (metric->cache, cur->param, 1, process_regexp_item, cur_item); + } + else { + register_symbol (metric->cache, cur->param, *w, process_regexp_item, cur_item); + } + cur_opt = g_list_next (cur_opt); } @@ -587,47 +597,20 @@ process_regexp_expression (struct expression *expr, struct worker_task *task) } static void -process_regexp_item (struct regexp_module_item *item, struct worker_task *task) -{ - struct timespec ts1, ts2; - uint64_t diff; - - if (check_view (task->cfg->views, item->symbol, task)) { -#ifdef HAVE_CLOCK_PROCESS_CPUTIME_ID - clock_gettime (CLOCK_PROCESS_CPUTIME_ID, &ts1); -#elif defined(HAVE_CLOCK_VIRTUAL) - clock_gettime (CLOCK_VIRTUAL, &ts1); -#else - clock_gettime (CLOCK_REALTIME, &ts1); -#endif - if (process_regexp_expression (item->expr, task)) { - insert_result (task, regexp_module_ctx->metric, item->symbol, 1, NULL); - } - -#ifdef HAVE_CLOCK_PROCESS_CPUTIME_ID - clock_gettime (CLOCK_PROCESS_CPUTIME_ID, &ts2); -#elif defined(HAVE_CLOCK_VIRTUAL) - clock_gettime (CLOCK_VIRTUAL, &ts2); -#else - clock_gettime (CLOCK_REALTIME, &ts2); -#endif +process_regexp_item (struct worker_task *task, void *user_data) +{ + struct regexp_module_item *item = user_data; - diff = (ts2.tv_sec - ts1.tv_sec) * 1000000 + (ts2.tv_nsec - ts1.tv_nsec) / 1000; - set_counter (item->symbol, diff); + if (process_regexp_expression (item->expr, task)) { + insert_result (task, regexp_module_ctx->metric, item->symbol, 1, NULL); } } -static int +static int regexp_common_filter (struct worker_task *task) { - GList *cur_expr = g_list_first (regexp_module_ctx->items); - - while (cur_expr) { - process_regexp_item ((struct regexp_module_item *)cur_expr->data, task); - cur_expr = g_list_next (cur_expr); - } - - return 0; + /* XXX: remove this shit too */ + return 0; } static gboolean diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c index 3e00b0e73..691c57681 100644 --- a/src/plugins/surbl.c +++ b/src/plugins/surbl.c @@ -36,7 +36,8 @@ static struct surbl_ctx *surbl_module_ctx = NULL; -static int surbl_test_url (struct worker_task *task); +static int surbl_filter (struct worker_task *task); +static void surbl_test_url (struct worker_task *task, void *user_data); static void dns_callback (int result, char type, int count, int ttl, void *addresses, void *data); static void process_dns_results (struct worker_task *task, struct suffix_item *suffix, char *url, uint32_t addr); static int urls_command_handler (struct worker_task *task); @@ -56,10 +57,7 @@ surbl_module_init (struct config_file *cfg, struct module_ctx **ctx) surbl_module_ctx = g_malloc (sizeof (struct surbl_ctx)); - surbl_module_ctx->header_filter = NULL; - surbl_module_ctx->mime_filter = NULL; - surbl_module_ctx->message_filter = NULL; - surbl_module_ctx->url_filter = surbl_test_url; + surbl_module_ctx->filter = surbl_filter; surbl_module_ctx->use_redirector = 0; surbl_module_ctx->suffixes = NULL; surbl_module_ctx->bits = NULL; @@ -98,6 +96,8 @@ surbl_module_config (struct config_file *cfg) struct module_opt *cur; struct suffix_item *new_suffix; struct surbl_bit_item *new_bit; + struct metric *metric; + double *w; char *value, *cur_tok, *str; uint32_t bit; @@ -172,6 +172,13 @@ surbl_module_config (struct config_file *cfg) } } + metric = g_hash_table_lookup (cfg->metrics, surbl_module_ctx->metric); + if (metric == NULL) { + msg_err ("surbl_module_config: cannot find metric definition %s", surbl_module_ctx->metric); + return FALSE; + } + + cur_opt = g_hash_table_lookup (cfg->modules_opts, "surbl"); while (cur_opt) { cur = cur_opt->data; @@ -185,6 +192,14 @@ surbl_module_config (struct config_file *cfg) new_suffix->suffix, new_suffix->symbol); *str = '_'; surbl_module_ctx->suffixes = g_list_prepend (surbl_module_ctx->suffixes, new_suffix); + /* Search in factors hash table */ + w = g_hash_table_lookup (cfg->factors, new_suffix->symbol); + if (w == NULL) { + register_symbol (metric->cache, new_suffix->symbol, 1, surbl_test_url, NULL); + } + else { + register_symbol (metric->cache, new_suffix->symbol, *w, surbl_test_url, NULL); + } } } if (!g_strncasecmp (cur->param, "bit", sizeof ("bit") - 1)) { @@ -210,6 +225,13 @@ surbl_module_config (struct config_file *cfg) msg_debug ("surbl_module_config: add default surbl suffix: %s with symbol: %s", new_suffix->suffix, new_suffix->symbol); surbl_module_ctx->suffixes = g_list_prepend (surbl_module_ctx->suffixes, new_suffix); + w = g_hash_table_lookup (cfg->factors, new_suffix->symbol); + if (w == NULL) { + register_symbol (metric->cache, new_suffix->symbol, 1, surbl_test_url, new_suffix); + } + else { + register_symbol (metric->cache, new_suffix->symbol, *w, surbl_test_url, new_suffix); + } } return TRUE; @@ -356,55 +378,48 @@ format_surbl_request (memory_pool_t *pool, f_str_t *hostname, struct suffix_item } static void -make_surbl_requests (struct uri* url, struct worker_task *task, GTree *tree) +make_surbl_requests (struct uri* url, struct worker_task *task, GTree *tree, struct suffix_item *suffix) { char *surbl_req; f_str_t f; - GList *cur; GError *err = NULL; struct dns_param *param; - struct suffix_item *suffix; char *host_end; - cur = g_list_first (surbl_module_ctx->suffixes); f.begin = url->host; f.len = url->hostlen; - while (cur) { - suffix = (struct suffix_item *)cur->data; - if (check_view (task->cfg->views, suffix->symbol, task)) { - if ((surbl_req = format_surbl_request (task->task_pool, &f, suffix, &host_end, TRUE, &err)) != NULL) { - if (g_tree_lookup (tree, surbl_req) == NULL) { - g_tree_insert (tree, surbl_req, surbl_req); - param = memory_pool_alloc (task->task_pool, sizeof (struct dns_param)); - param->url = url; - param->task = task; - param->suffix = suffix; - *host_end = '\0'; - param->host_resolve = memory_pool_strdup (task->task_pool, surbl_req); - *host_end = '.'; - msg_debug ("surbl_test_url: send surbl dns request %s", surbl_req); - if (evdns_resolve_ipv4 (surbl_req, DNS_QUERY_NO_SEARCH, dns_callback, (void *)param) == 0) { - param->task->save.saved ++; - } - } - else { - msg_debug ("make_surbl_requests: request %s is already sent", surbl_req); + if (check_view (task->cfg->views, suffix->symbol, task)) { + if ((surbl_req = format_surbl_request (task->task_pool, &f, suffix, &host_end, TRUE, &err)) != NULL) { + if (g_tree_lookup (tree, surbl_req) == NULL) { + g_tree_insert (tree, surbl_req, surbl_req); + param = memory_pool_alloc (task->task_pool, sizeof (struct dns_param)); + param->url = url; + param->task = task; + param->suffix = suffix; + *host_end = '\0'; + param->host_resolve = memory_pool_strdup (task->task_pool, surbl_req); + *host_end = '.'; + msg_debug ("surbl_test_url: send surbl dns request %s", surbl_req); + if (evdns_resolve_ipv4 (surbl_req, DNS_QUERY_NO_SEARCH, dns_callback, (void *)param) == 0) { + param->task->save.saved ++; } } - else if (err != NULL && err->code != WHITELIST_ERROR) { - msg_info ("surbl_test_url: cannot format url string for surbl %s, %s", struri (url), err->message); - g_error_free (err); - return; - } - else if (err != NULL) { - g_error_free (err); + else { + msg_debug ("make_surbl_requests: request %s is already sent", surbl_req); } } - else { - msg_debug ("make_surbl_requests: skipping symbol that is not in view: %s", suffix->symbol); + else if (err != NULL && err->code != WHITELIST_ERROR) { + msg_info ("surbl_test_url: cannot format url string for surbl %s, %s", struri (url), err->message); + g_error_free (err); + return; + } + else if (err != NULL) { + g_error_free (err); } - cur = g_list_next (cur); + } + else { + msg_debug ("make_surbl_requests: skipping symbol that is not in view: %s", suffix->symbol); } } @@ -531,7 +546,7 @@ memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data) param->task->save.saved = 1; process_filters (param->task); } - make_surbl_requests (param->url, param->task, param->tree); + make_surbl_requests (param->url, param->task, param->tree, param->suffix); break; default: return; @@ -539,7 +554,7 @@ memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data) } static void -register_memcached_call (struct uri *url, struct worker_task *task, GTree *url_tree) +register_memcached_call (struct uri *url, struct worker_task *task, GTree *url_tree, struct suffix_item *suffix) { struct memcached_param *param; struct memcached_server *selected; @@ -554,6 +569,7 @@ register_memcached_call (struct uri *url, struct worker_task *task, GTree *url_t param->url = url; param->task = task; param->tree = url_tree; + param->suffix = suffix; param->ctx = memory_pool_alloc0 (task->task_pool, sizeof (memcached_ctx_t)); @@ -614,7 +630,7 @@ redirector_callback (int fd, short what, void *arg) event_del (¶m->ev); close (fd); param->task->save.saved --; - make_surbl_requests (param->url, param->task, param->tree); + make_surbl_requests (param->url, param->task, param->tree, param->suffix); if (param->task->save.saved == 0) { /* Call other filters */ param->task->save.saved = 1; @@ -630,7 +646,7 @@ redirector_callback (int fd, short what, void *arg) msg_info ("redirector_callback: <%s> connection to redirector timed out while waiting for write", param->task->message_id); param->task->save.saved --; - make_surbl_requests (param->url, param->task, param->tree); + make_surbl_requests (param->url, param->task, param->tree, param->suffix); if (param->task->save.saved == 0) { /* Call other filters */ @@ -660,7 +676,7 @@ redirector_callback (int fd, short what, void *arg) event_del (¶m->ev); close (fd); param->task->save.saved --; - make_surbl_requests (param->url, param->task, param->tree); + make_surbl_requests (param->url, param->task, param->tree, param->suffix); if (param->task->save.saved == 0) { /* Call other filters */ param->task->save.saved = 1; @@ -673,7 +689,7 @@ redirector_callback (int fd, short what, void *arg) msg_info ("redirector_callback: <%s> reading redirector timed out, while waiting for read", param->task->message_id); param->task->save.saved --; - make_surbl_requests (param->url, param->task, param->tree); + make_surbl_requests (param->url, param->task, param->tree, param->suffix); if (param->task->save.saved == 0) { /* Call other filters */ param->task->save.saved = 1; @@ -686,7 +702,7 @@ redirector_callback (int fd, short what, void *arg) static void -register_redirector_call (struct uri *url, struct worker_task *task, GTree *url_tree) +register_redirector_call (struct uri *url, struct worker_task *task, GTree *url_tree, struct suffix_item *suffix) { int s; struct redirector_param *param; @@ -698,7 +714,7 @@ register_redirector_call (struct uri *url, struct worker_task *task, GTree *url_ msg_info ("register_redirector_call: <%s> cannot create tcp socket failed: %s", task->message_id, strerror (errno)); task->save.saved --; - make_surbl_requests (url, task, url_tree); + make_surbl_requests (url, task, url_tree, suffix); return; } @@ -708,6 +724,7 @@ register_redirector_call (struct uri *url, struct worker_task *task, GTree *url_ param->state = STATE_CONNECT; param->sock = s; param->tree = url_tree; + param->suffix = suffix; timeout = memory_pool_alloc (task->task_pool, sizeof (struct timeval)); timeout->tv_sec = surbl_module_ctx->connect_timeout / 1000; timeout->tv_usec = surbl_module_ctx->connect_timeout - timeout->tv_sec * 1000; @@ -724,29 +741,30 @@ tree_url_callback (gpointer key, gpointer value, void *data) msg_debug ("surbl_test_url: check url %s", struri (url)); if (surbl_module_ctx->use_redirector) { - register_redirector_call (url, param->task, param->tree); + register_redirector_call (url, param->task, param->tree, param->suffix); param->task->save.saved++; } else { if (param->task->worker->srv->cfg->memcached_servers_num > 0) { - register_memcached_call (url, param->task, param->tree); + register_memcached_call (url, param->task, param->tree, param->suffix); param->task->save.saved++; } else { - make_surbl_requests (url, param->task, param->tree); + make_surbl_requests (url, param->task, param->tree, param->suffix); } } return FALSE; } -static int -surbl_test_url (struct worker_task *task) +static void +surbl_test_url (struct worker_task *task, void *user_data) { GTree *url_tree; GList *cur; struct mime_text_part *part; struct redirector_param param; + struct suffix_item *suffix = user_data; /* Try to check lists */ if (surbl_module_ctx->tld2_file) { @@ -760,6 +778,7 @@ surbl_test_url (struct worker_task *task) param.tree = url_tree; param.task = task; + param.suffix = suffix; cur = task->text_parts; while (cur) { part = cur->data; @@ -774,6 +793,12 @@ surbl_test_url (struct worker_task *task) } memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_tree_destroy, url_tree); +} + +static int +surbl_filter (struct worker_task *task) +{ + /* XXX: remove this shit */ return 0; } diff --git a/src/plugins/surbl.h b/src/plugins/surbl.h index 8f674c9ca..cfe17b404 100644 --- a/src/plugins/surbl.h +++ b/src/plugins/surbl.h @@ -17,10 +17,7 @@ #define DEFAULT_SURBL_SUFFIX "multi.surbl.org" struct surbl_ctx { - int (*header_filter)(struct worker_task *task); - int (*mime_filter)(struct worker_task *task); - int (*message_filter)(struct worker_task *task); - int (*url_filter)(struct worker_task *task); + int (*filter)(struct worker_task *task); struct in_addr redirector_addr; uint16_t redirector_port; uint16_t weight; @@ -64,6 +61,7 @@ struct redirector_param { struct event ev; int sock; GTree *tree; + struct suffix_item *suffix; }; struct memcached_param { @@ -71,6 +69,7 @@ struct memcached_param { struct worker_task *task; memcached_ctx_t *ctx; GTree *tree; + struct suffix_item *suffix; }; struct surbl_bit_item { diff --git a/src/statfile.c b/src/statfile.c index 4a52008ed..048ebc485 100644 --- a/src/statfile.c +++ b/src/statfile.c @@ -163,7 +163,7 @@ statfile_pool_open (statfile_pool_t *pool, char *filename) return NULL; } - if ((new_file->map = mmap (NULL, st.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, new_file->fd, 0)) == NULL) { + if ((new_file->map = mmap (NULL, st.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, new_file->fd, 0)) == MAP_FAILED) { close (new_file->fd); msg_info ("statfile_pool_open: cannot mmap file %s, error %d, %s", filename, errno, strerror (errno)); pool->opened --; diff --git a/src/symbols_cache.c b/src/symbols_cache.c new file mode 100644 index 000000000..60cdd2644 --- /dev/null +++ b/src/symbols_cache.c @@ -0,0 +1,389 @@ +/* + * Copyright (c) 2009, Rambler media + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "config.h" +#include "util.h" +#include "main.h" +#include "message.h" +#include "symbols_cache.h" +#include "view.h" +#include "cfg_file.h" + +#define WEIGHT_MULT 2.0 +#define FREQUENCY_MULT 1.0 +#define TIME_MULT -1.0 + +/* After which number of messages try to resort cache */ +#define MAX_USES 100 +/* + * Symbols cache utility functions + */ + +#define MIN_CACHE 17 + +int +cache_cmp (const void *p1, const void *p2) +{ + const struct cache_item *i1 = p1, *i2 = p2; + + return strcmp (i1->s->symbol, i2->s->symbol); +} + +int +cache_logic_cmp (const void *p1, const void *p2) +{ + const struct cache_item *i1 = p1, *i2 = p2; + double w1, w2; + + w1 = abs (i1->s->weight) * WEIGHT_MULT + + i1->s->frequency * FREQUENCY_MULT + + i1->s->avg_time * TIME_MULT; + w2 = abs (i2->s->weight) * WEIGHT_MULT + + i2->s->frequency * FREQUENCY_MULT + + i2->s->avg_time * TIME_MULT; + + return (int)w1 - w2; +} + +static void +grow_cache (struct symbols_cache *cache) +{ + guint old = cache->cur_items, i; + + cache->cur_items = cache->cur_items * 2; + cache->items = g_renew (struct cache_item, cache->items, cache->cur_items); + /* Create new saved_cache_items */ + for (i = old - 1; i < cache->cur_items; i ++) { + cache->items[i].s = g_malloc (sizeof (struct saved_cache_item)); + } +} + +static void +truncate_cache (struct symbols_cache *cache) +{ + cache->items = g_renew (struct cache_item, cache->items, cache->used_items); + cache->cur_items = cache->used_items; +} + +static GChecksum * +get_mem_cksum (struct symbols_cache *cache) +{ + int i; + GChecksum *result; + + result = g_checksum_new (G_CHECKSUM_SHA1); + + for (i = 0; i < cache->used_items; i ++) { + if (cache->items[i].s->symbol[0] != '\0') { + g_checksum_update (result, cache->items[i].s->symbol, strlen (cache->items[i].s->symbol)); + } + } + + return result; +} + +/* Sort items in logical order */ +static void +post_cache_init (struct symbols_cache *cache) +{ + qsort (cache->items, cache->used_items, sizeof (struct cache_item), cache_logic_cmp); +} + +static gboolean +mmap_cache_file (struct symbols_cache *cache, int fd) +{ + void *map; + int i; + + map = mmap (NULL, cache->used_items * sizeof (struct saved_cache_item), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (map == MAP_FAILED) { + msg_err ("mmap_cache_file: cannot mmap cache file: %d, %s", errno, strerror (errno)); + close (fd); + return FALSE; + } + /* Close descriptor as it would never be used */ + close (fd); + /* Now free old values for saved cache items and fill them with mmapped ones */ + for (i = 0; i < cache->used_items; i ++) { + g_free (cache->items[i].s); + cache->items[i].s = ((struct saved_cache_item *)map) + i; + } + + post_cache_init (cache); + return TRUE; +} + +/* Fd must be opened for writing, after creating file is mmapped */ +static gboolean +create_cache_file (struct symbols_cache *cache, const char *filename, int fd) +{ + int i, cklen; + GChecksum *cksum; + u_char *digest; + + /* Calculate checksum */ + cksum = get_mem_cksum (cache); + if (cksum == NULL) { + msg_err ("load_symbols_cache: cannot calculate checksum for symbols"); + close (fd); + return FALSE; + } + + cklen = g_checksum_type_get_length (G_CHECKSUM_SHA1); + digest = g_malloc (cklen); + + g_checksum_get_digest (cksum, digest, &cklen); + /* Now write data to file */ + for (i = 0; i < cache->used_items; i ++) { + if (write (fd, cache->items[i].s, sizeof (struct saved_cache_item)) == -1) { + msg_err ("create_cache_file: cannot write to file %d, %s", errno, strerror (errno)); + close (fd); + g_checksum_free (cksum); + g_free (digest); + return FALSE; + } + } + /* Write checksum */ + if (write (fd, digest, cklen) == -1) { + msg_err ("create_cache_file: cannot write to file %d, %s", errno, strerror (errno)); + close (fd); + g_checksum_free (cksum); + g_free (digest); + return FALSE; + } + + close (fd); + g_checksum_free (cksum); + g_free (digest); + /* Reopen for reading */ + if ((fd = open (filename, O_RDWR)) == -1) { + msg_info ("create_cache_file: cannot open file %s, error %d, %s", errno, strerror (errno)); + return FALSE; + } + + return mmap_cache_file (cache, fd); +} + +void +register_symbol (struct symbols_cache *cache, const char *name, double weight, symbol_func_t func, gpointer user_data) +{ + struct cache_item *item = NULL; + int i; + + if (cache == NULL) { + cache = g_new0 (struct symbols_cache, 1); + } + if (cache->items == NULL) { + cache->cur_items = MIN_CACHE; + cache->used_items = 0; + cache->items = g_new0 (struct cache_item, cache->cur_items); + for (i = 0; i < cache->cur_items; i ++) { + cache->items[i].s = g_malloc (sizeof (struct saved_cache_item)); + } + } + + for (i = 0; i < cache->cur_items; i ++) { + if (cache->items[i].s->symbol[0] != '\0') { + item = &cache->items[i]; + } + } + + if (item == NULL) { + grow_cache (cache); + /* Call once more */ + register_symbol (cache, name, weight, func, user_data); + return; + } + + g_strlcpy (item->s->symbol, name, sizeof (item->s->symbol)); + item->func = func; + item->user_data = user_data; + item->s->weight = weight; + cache->used_items ++; + set_counter (item->s->symbol, 0); +} + +gboolean +init_symbols_cache (memory_pool_t *pool, struct symbols_cache *cache, const char *filename) +{ + struct stat st; + int fd; + GChecksum *cksum; + u_char *mem_sum, *file_sum; + int cklen; + + if (cache == NULL || cache->items == NULL) { + return FALSE; + } + + truncate_cache (cache); + /* Sort items in cache */ + qsort (cache->items, cache->used_items, sizeof (struct cache_item), cache_cmp); + + /* Init locking */ + cache->lock = memory_pool_get_rwlock (pool); + + /* Just in-memory cache */ + if (filename == NULL) { + post_cache_init (cache); + return TRUE; + } + + /* First of all try to stat file */ + if (stat (filename, &st) == -1) { + /* Check errno */ + if (errno == ENOENT) { + /* Try to create file */ + if ((fd = open (filename, O_RDWR | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR)) == -1 ) { + msg_info ("load_symbols_cache: cannot create file %s, error %d, %s", filename, errno, strerror (errno)); + return FALSE; + } + else { + return create_cache_file (cache, filename, fd); + } + } + else { + msg_info ("load_symbols_cache: cannot stat file %s, error %d, %s", filename, errno, strerror (errno)); + return FALSE; + } + } + else { + if ((fd = open (filename, O_RDWR)) == -1) { + msg_info ("load_symbols_cache: cannot open file %s, error %d, %s", filename, errno, strerror (errno)); + return FALSE; + } + } + + /* Calculate checksum */ + cksum = get_mem_cksum (cache); + if (cksum == NULL) { + msg_err ("load_symbols_cache: cannot calculate checksum for symbols"); + close (fd); + return FALSE; + } + + cklen = g_checksum_type_get_length (G_CHECKSUM_SHA1); + mem_sum = g_malloc (cklen); + + g_checksum_get_digest (cksum, mem_sum, &cklen); + /* Now try to read file sum */ + if (lseek (fd, SEEK_END, -(cklen)) == -1) { + close (fd); + g_free (mem_sum); + g_checksum_free (cksum); + msg_err ("load_symbols_cache: cannot seek to read checksum, %d, %s", errno, strerror (errno)); + return FALSE; + } + file_sum = g_malloc (cklen); + if (read (fd, file_sum, cklen) == -1) { + close (fd); + g_free (mem_sum); + g_free (file_sum); + g_checksum_free (cksum); + msg_err ("load_symbols_cache: cannot read checksum, %d, %s", errno, strerror (errno)); + return FALSE; + } + + if (memcmp (file_sum, mem_sum, cklen) != 0) { + close (fd); + g_free (mem_sum); + g_free (file_sum); + g_checksum_free (cksum); + msg_info ("load_symbols_cache: checksum mismatch, recreating file"); + /* Reopen with rw permissions */ + if ((fd = open (filename, O_RDWR | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR)) == -1 ) { + msg_info ("load_symbols_cache: cannot create file %s, error %d, %s", filename, errno, strerror (errno)); + return FALSE; + } + else { + return create_cache_file (cache, filename, fd); + } + } + + g_free (mem_sum); + g_free (file_sum); + g_checksum_free (cksum); + /* MMap cache file and copy saved_cache structures */ + return mmap_cache_file (cache, fd); +} + +gboolean +call_symbol_callback (struct worker_task *task, struct symbols_cache *cache, struct cache_item **saved_item) +{ + struct timespec ts1, ts2; + uint64_t diff; + struct cache_item *item; + + if (*saved_item == NULL) { + if (cache == NULL) { + return FALSE; + } + if (cache->uses ++ >= MAX_USES) { + memory_pool_wlock_rwlock (cache->lock); + cache->uses = 0; + /* Resort while having write lock */ + post_cache_init (cache); + memory_pool_wunlock_rwlock (cache->lock); + } + item = &cache->items[0]; + } + else { + /* Next pointer */ + if (*saved_item - cache->items == cache->used_items) { + /* No more items in cache */ + return FALSE; + } + memory_pool_rlock_rwlock (cache->lock); + item = *saved_item + 1; + memory_pool_runlock_rwlock (cache->lock); + } + + if (check_view (task->cfg->views, item->s->symbol, task)) { +#ifdef HAVE_CLOCK_PROCESS_CPUTIME_ID + clock_gettime (CLOCK_PROCESS_CPUTIME_ID, &ts1); +#elif defined(HAVE_CLOCK_VIRTUAL) + clock_gettime (CLOCK_VIRTUAL, &ts1); +#else + clock_gettime (CLOCK_REALTIME, &ts1); +#endif + item->func (task, item->user_data); + +#ifdef HAVE_CLOCK_PROCESS_CPUTIME_ID + clock_gettime (CLOCK_PROCESS_CPUTIME_ID, &ts2); +#elif defined(HAVE_CLOCK_VIRTUAL) + clock_gettime (CLOCK_VIRTUAL, &ts2); +#else + clock_gettime (CLOCK_REALTIME, &ts2); +#endif + + diff = (ts2.tv_sec - ts1.tv_sec) * 1000000 + (ts2.tv_nsec - ts1.tv_nsec) / 1000; + item->s->avg_time = set_counter (item->s->symbol, diff); + item->s->frequency ++; + } + + *saved_item = item; + + return TRUE; + +} diff --git a/src/symbols_cache.h b/src/symbols_cache.h new file mode 100644 index 000000000..e8e0be24f --- /dev/null +++ b/src/symbols_cache.h @@ -0,0 +1,54 @@ +#ifndef RSPAMD_SYMBOLS_CACHE_H +#define RSPAMD_SYMBOLS_CACHE_H + +#include "config.h" + +#define MAX_SYMBOL 128 + +struct worker_task; + +typedef void (*symbol_func_t)(struct worker_task *task, gpointer user_data); + +struct saved_cache_item { + char symbol[MAX_SYMBOL]; + double weight; + uint32_t frequency; + double avg_time; +}; + +struct cache_item { + struct saved_cache_item *s; + symbol_func_t func; + gpointer user_data; +}; + +struct symbols_cache { + struct cache_item *items; + guint cur_items; + guint used_items; + guint uses; + memory_pool_rwlock_t *lock; +}; + +/** + * Load symbols cache from file, must be called _after_ init_symbols_cache + */ +gboolean init_symbols_cache (memory_pool_t *pool, struct symbols_cache *cache, const char *filename); + +/** + * Register function for symbols parsing + * @param name name of symbol + * @param func pointer to handler + * @param user_data pointer to user_data + */ +void register_symbol (struct symbols_cache *cache, const char *name, double weight, symbol_func_t func, gpointer user_data); + +/** + * Call function for cached symbol using saved callback + * @param task task object + * @param cache symbols cache + * @param saved_item pointer to currently saved item + */ +gboolean call_symbol_callback (struct worker_task *task, struct symbols_cache *cache, struct cache_item **saved_item); + +#endif diff --git a/src/util.c b/src/util.c index 9dab02da7..86be3c65e 100644 --- a/src/util.c +++ b/src/util.c @@ -851,7 +851,7 @@ calculate_check_time (struct timespec *begin, int resolution) return (const char *)res; } -void +double set_counter (const char *name, long int value) { struct counter_data *cd; @@ -876,6 +876,8 @@ set_counter (const char *name, long int value) memory_pool_wunlock_rwlock (counters->lock); } + + return cd->value; } typedef void (*insert_func)(gpointer st, gconstpointer key, gpointer value); diff --git a/src/util.h b/src/util.h index b657316ad..91de0a291 100644 --- a/src/util.h +++ b/src/util.h @@ -62,7 +62,7 @@ void file_log_function (const gchar *log_domain, GLogLevelFlags log_level, const char* resolve_stat_filename (memory_pool_t *pool, char *pattern, char *rcpt, char *from); const char* calculate_check_time (struct timespec *begin, int resolution); -void set_counter (const char *name, long int value); +double set_counter (const char *name, long int value); gboolean parse_host_list (memory_pool_t *pool, GHashTable *tbl, const char *filename); gboolean maybe_parse_host_list (memory_pool_t *pool, GHashTable *tbl, const char *filename); -- 2.39.5