diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-06-17 19:31:48 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-06-17 19:31:48 +0400 |
commit | bca226772e9747a4587866a50122d4a8f7973b26 (patch) | |
tree | aae459617c9b3a7a82dd0b9e2a8b03be11e3ff52 | |
parent | 453ecf68e3b51941944dbc3b1dece11342be3810 (diff) | |
download | rspamd-bca226772e9747a4587866a50122d4a8f7973b26.tar.gz rspamd-bca226772e9747a4587866a50122d4a8f7973b26.zip |
* Introduce new system of workers spawning and configuring, now rspamd can be easily extended by new types of wrokers
* Rework config system and avoid from using queue (3) lists
* Upgrade version to 0.2.0 as config format is now incompatible with older one
-rw-r--r-- | CMakeLists.txt | 5 | ||||
-rw-r--r-- | rspamd.conf.sample | 40 | ||||
-rw-r--r-- | src/cfg_file.h | 51 | ||||
-rw-r--r-- | src/cfg_file.l | 24 | ||||
-rw-r--r-- | src/cfg_file.y | 293 | ||||
-rw-r--r-- | src/cfg_utils.c | 120 | ||||
-rw-r--r-- | src/controller.c | 33 | ||||
-rw-r--r-- | src/filter.c | 62 | ||||
-rw-r--r-- | src/filter.h | 1 | ||||
-rw-r--r-- | src/lmtp.c | 4 | ||||
-rw-r--r-- | src/lmtp.h | 2 | ||||
-rw-r--r-- | src/main.c | 134 | ||||
-rw-r--r-- | src/main.h | 9 | ||||
-rw-r--r-- | src/plugins/regexp.c | 36 | ||||
-rw-r--r-- | src/plugins/surbl.c | 52 | ||||
-rw-r--r-- | src/worker.c | 5 |
16 files changed, 415 insertions, 456 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 1a4a0b042..754b2200a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,8 +6,8 @@ PROJECT(rspamd C) SET(RSPAMD_VERSION_MAJOR 0) -SET(RSPAMD_VERSION_MINOR 1) -SET(RSPAMD_VERSION_PATCH 8) +SET(RSPAMD_VERSION_MINOR 2) +SET(RSPAMD_VERSION_PATCH 0) SET(RSPAMD_VERSION "${RSPAMD_VERSION_MAJOR}.${RSPAMD_VERSION_MINOR}.${RSPAMD_VERSION_PATCH}") SET(RSPAMD_MASTER_SITE_URL "http://cebka.pp.ru/hg/rspamd") @@ -226,7 +226,6 @@ CHECK_SYMBOL_EXISTS(MAXPATHLEN sys/param.h HAVE_MAXPATHLEN) CHECK_SYMBOL_EXISTS(MAP_SHARED sys/mman.h HAVE_MMAP_SHARED) CHECK_SYMBOL_EXISTS(MAP_ANON sys/mman.h HAVE_MMAP_ANON) CHECK_SYMBOL_EXISTS(_SC_NPROCESSORS_ONLN unistd.h HAVE_SC_NPROCESSORS_ONLN) -CHECK_SYMBOL_EXISTS(SLIST_FOREACH_SAFE sys/queue.h HAVE_COMPATIBLE_QUEUE_H) CHECK_SYMBOL_EXISTS(CLOCK_PROCESS_CPUTIME_ID time.h HAVE_CLOCK_PROCESS_CPUTIME_ID) CHECK_SYMBOL_EXISTS(CLOCK_VIRTUAL time.h HAVE_CLOCK_VIRTUAL) diff --git a/rspamd.conf.sample b/rspamd.conf.sample index 2b70cbd7c..42cf9d2a5 100644 --- a/rspamd.conf.sample +++ b/rspamd.conf.sample @@ -7,18 +7,28 @@ # Default: pidfile = /var/run/rspamd.pid pidfile = "./rspamd.pid"; -# Number of workers to process connections -# Default: 1 -workers = 1; -# Socket for accepting mail to filter, can be unix socket if begin with '/' -# (bind_socket=/var/run/rspamd.sock for example) -bind_socket = localhost:11333; +worker { + type = "normal"; + + # Number of workers to process connections + # Default: number of processors in system + count = 1; + + # Socket for accepting mail to filter, can be unix socket if begin with '/' + # (bind_socket=/var/run/rspamd.sock for example) + bind_socket = localhost:11333; +}; + # Settings for controller interface -control { +worker { + type = "controller"; + # Bind socket for control interface bind_socket = localhost:11334; + + count = 1; # Password for privilleged commands password = "q1"; }; @@ -77,29 +87,29 @@ factors { }; # Options for lmtp worker -lmtp { - enabled = yes; +worker { + type = "lmtp"; # Bind socket for lmtp interface bind_socket = localhost:11335; # Metric that is considered as main. If we have spam result on # this metric, lmtp delivery would be failed metric = "default"; # Number of lmtp workers - workers = 1; + count = 1; }; -delivery { - enabled = yes; +#worker { +# type = "delivery"; # Path to delivery agent, %f is expanded as mail from address and %r # is expanded as recipient address # Expample: agent = "/usr/local/bin/procmail -f %f -d %r" - agent = "/dev/null"; +# agent = "/dev/null"; # Bind socket for lmtp interface # Example: bind_socket = localhost:25 # Whether we should use lmtp for MTA delivery - lmtp = no; -}; +# lmtp = no; +#}; # SURBL module params, note that single quotes are mandatory here .module 'surbl' { diff --git a/src/cfg_file.h b/src/cfg_file.h index 201b46823..1a02644f7 100644 --- a/src/cfg_file.h +++ b/src/cfg_file.h @@ -97,7 +97,6 @@ struct memcached_server { */ struct perl_module { char *path; /**< path to module */ - LIST_ENTRY (perl_module) next; /**< chain link */ }; /** @@ -106,7 +105,6 @@ struct perl_module { struct module_opt { char *param; /**< parameter name */ char *value; /**< paramater value */ - LIST_ENTRY (module_opt) next; }; /** @@ -144,6 +142,21 @@ struct config_scalar { } type; /**< type of data */ }; + +/** + * Config params for rspamd worker + */ +struct worker_conf { + int type; /**< worker type */ + char *bind_host; /**< bind line */ + struct in_addr bind_addr; /**< bind address in case of TCP socket */ + uint16_t bind_port; /**< bind port in case of TCP socket */ + uint16_t bind_family; /**< bind type (AF_UNIX or AF_INET) */ + int count; /**< number of workers */ + GHashTable *params; /**< params for worker */ + int listen_sock; /**< listening socket desctiptor */ +}; + /** * Structure that stores all config data */ @@ -158,23 +171,9 @@ struct config_file { char *profile_path; #endif - char *bind_host; /**< bind line */ - struct in_addr bind_addr; /**< bind address in case of TCP socket */ - uint16_t bind_port; /**< bind port in case of TCP socket */ - uint16_t bind_family; /**< bind type (AF_UNIX or AF_INET) */ - - char *control_host; /**< bind line for controller */ - struct in_addr control_addr; /**< bind address for controller */ - uint16_t control_port; /**< bind port for controller */ - uint16_t control_family; /**< bind family for controller */ - int controller_enabled; /**< whether controller is enabled */ - char *control_password; /**< controller password */ - gboolean no_fork; /**< if 1 do not call daemon() */ gboolean config_test; /**< if TRUE do only config file test */ gboolean raw_mode; /**< work in raw mode instead of utf one */ - unsigned int workers_number; /**< number of workers */ - unsigned int lmtp_workers_number; /**< number of lmtp workers */ enum rspamd_log_type log_type; /**< log type */ int log_facility; /**< log facility in case of syslog */ @@ -192,13 +191,6 @@ struct config_file { unsigned int memcached_maxerrors; /**< maximum number of errors */ unsigned int memcached_connect_timeout; /**< connection timeout */ - gboolean lmtp_enable; /**< is lmtp agent is enabled */ - char *lmtp_host; /**< host for lmtp agent */ - struct in_addr lmtp_addr; /**< bind address for lmtp */ - uint16_t lmtp_port; /**< bind port for lmtp agent */ - uint16_t lmtp_family; /**< bind family for lmtp agent */ - char *lmtp_metric; /**< metric to use in lmtp module */ - gboolean delivery_enable; /**< is delivery agent is enabled */ char *deliver_host; /**< host for mail deliviring */ struct in_addr deliver_addr; /**< its address */ @@ -207,12 +199,13 @@ struct config_file { char *deliver_agent_path; /**< deliver to pipe instead of socket */ gboolean deliver_lmtp; /**< use LMTP instead of SMTP */ - LIST_HEAD (modulesq, perl_module) perl_modules; /**< linked list of perl modules to load */ + GList *perl_modules; /**< linked list of perl modules to load */ - LIST_HEAD (headersq, filter) header_filters; /**< linked list of all header's filters */ - LIST_HEAD (mimesq, filter) mime_filters; /**< linked list of all mime filters */ - LIST_HEAD (messagesq, filter) message_filters; /**< linked list of all message's filters */ - LIST_HEAD (urlsq, filter) url_filters; /**< linked list of all url's filters */ + GList *header_filters; /**< linked list of all header's filters */ + GList *mime_filters; /**< linked list of all mime filters */ + GList *message_filters; /**< linked list of all message's filters */ + GList *url_filters; /**< linked list of all url's filters */ + GList *workers; /**< linked list of all workers params */ char *header_filters_str; /**< string of header's filters */ char *mime_filters_str; /**< string of mime's filters */ char *message_filters_str; /**< string of message's filters */ @@ -243,7 +236,7 @@ int add_memcached_server (struct config_file *cf, char *str); * @param type type of credits * @return 1 if line was successfully parsed and 0 in case of error */ -int parse_bind_line (struct config_file *cf, char *str, enum rspamd_cred_type type); +int parse_bind_line (struct config_file *cfg, struct worker_conf *cf, char *str); /** * Init default values diff --git a/src/cfg_file.l b/src/cfg_file.l index 876ef9c4a..b758c7eb6 100644 --- a/src/cfg_file.l +++ b/src/cfg_file.l @@ -1,6 +1,7 @@ %x incl %x module %x lua +%x worker %{ @@ -32,10 +33,12 @@ extern struct config_file *cfg; .include BEGIN(incl); .module BEGIN(module); .lua BEGIN(lua); +worker BEGIN(worker); return WORKER; composites return COMPOSITES; tempdir return TEMPDIR; pidfile return PIDFILE; -workers return WORKERS; + + error_time return ERROR_TIME; dead_time return DEAD_TIME; maxerrors return MAXERRORS; @@ -43,7 +46,6 @@ reconnect_timeout return RECONNECT_TIMEOUT; connect_timeout return CONNECT_TIMEOUT; protocol return PROTOCOL; memcached return MEMCACHED; -bind_socket return BINDSOCK; servers return SERVERS; require return REQUIRE; header_filters return HEADER_FILTERS; @@ -161,6 +163,24 @@ yes|YES|no|NO|[yY]|[nN] yylval.flag=parse_flag(yytext); return FLAG; <module>[a-zA-Z0-9_%-]+ yylval.string=strdup(yytext); return PARAM; <module>\".+[^\\]\" yylval.string=strdup(yytext + 1); yylval.string[strlen(yylval.string) - 1] = '\0'; unescape_quotes(yylval.string); return QUOTEDSTRING; +<worker>\n /* ignore EOL */; +<worker>[ \t]+ /* ignore whitespace */; +<worker>[ \t]*#.* /* ignore comments */; +<worker>\{ return OBRACE; +<worker>\} BEGIN(INITIAL); return EBRACE; +<worker>\; return SEMICOLON; +<worker>= return EQSIGN; +<worker>type return TYPE; +<worker>bind_socket return BINDSOCK; +<worker>count return COUNT; +<worker>[0-9]+ yylval.number=strtol(yytext, NULL, 10); return NUMBER; +<worker>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3} yylval.string=strdup(yytext); return IPADDR; +<worker>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\/[0-9]{1,2} yylval.string=strdup(yytext); return IPNETWORK; +<worker>[*a-zA-Z0-9.-]+:[0-9]{1,5} yylval.string=strdup(yytext); return HOSTPORT; +<worker>[a-zA-Z<][a-zA-Z@+>_-]* yylval.string=strdup(yytext); return STRING; +<worker>\$[a-zA-Z_][a-zA-Z0-9_]+ yylval.string=strdup(yytext + 1); return VARIABLE; +<worker>\".+[^\\]\" yylval.string=strdup(yytext + 1); yylval.string[strlen(yylval.string) - 1] = '\0'; unescape_quotes(yylval.string); return QUOTEDSTRING; + <lua>\n /* ignore EOL */; <lua>[ \t]+ /* ignore whitespace */; <lua>[ \t]*#.* /* ignore comments */; diff --git a/src/cfg_file.y b/src/cfg_file.y index e4e4d8dbe..0062cdeea 100644 --- a/src/cfg_file.y +++ b/src/cfg_file.y @@ -13,15 +13,17 @@ #else #include "perl.h" #endif +#define YYDEBUG 1 extern struct config_file *cfg; extern int yylineno; extern char *yytext; -LIST_HEAD (moduleoptq, module_opt) *cur_module_opt = NULL; +GList *cur_module_opt = NULL; struct metric *cur_metric = NULL; struct statfile *cur_statfile = NULL; struct statfile_section *cur_section = NULL; +struct worker_conf *cur_worker = NULL; %} @@ -41,14 +43,14 @@ struct statfile_section *cur_section = NULL; %token MAXSIZE SIZELIMIT SECONDS BEANSTALK MYSQL USER PASSWORD DATABASE %token TEMPDIR PIDFILE SERVERS ERROR_TIME DEAD_TIME MAXERRORS CONNECT_TIMEOUT PROTOCOL RECONNECT_TIMEOUT %token READ_SERVERS WRITE_SERVER DIRECTORY_SERVERS MAILBOX_QUERY USERS_QUERY LASTLOGIN_QUERY -%token MEMCACHED WORKERS REQUIRE MODULE +%token MEMCACHED WORKER TYPE REQUIRE MODULE %token MODULE_OPT PARAM VARIABLE %token HEADER_FILTERS MIME_FILTERS MESSAGE_FILTERS URL_FILTERS FACTORS METRIC NAME %token REQUIRED_SCORE FUNCTION FRACT COMPOSITES CONTROL PASSWORD %token LOGGING LOG_TYPE LOG_TYPE_CONSOLE LOG_TYPE_SYSLOG LOG_TYPE_FILE %token LOG_LEVEL LOG_LEVEL_DEBUG LOG_LEVEL_INFO LOG_LEVEL_WARNING LOG_LEVEL_ERROR LOG_FACILITY LOG_FILENAME %token STATFILE ALIAS PATTERN WEIGHT STATFILE_POOL_SIZE SIZE TOKENIZER CLASSIFIER -%token DELIVERY LMTP ENABLED AGENT SECTION LUACODE RAW_MODE PROFILE_FILE +%token DELIVERY LMTP ENABLED AGENT SECTION LUACODE RAW_MODE PROFILE_FILE COUNT %type <string> STRING %type <string> VARIABLE @@ -71,12 +73,10 @@ file : /* empty */ ; command : - bindsock - | control | tempdir | pidfile | memcached - | workers + | worker | require | header_filters | mime_filters @@ -90,8 +90,6 @@ command : | logging | statfile | statfile_pool_size - | lmtp - | delivery | luacode | raw_mode | profile_file @@ -120,63 +118,6 @@ pidfile : } ; -control: - CONTROL OBRACE controlbody EBRACE - ; - -controlbody: - controlcmd SEMICOLON - | controlbody controlcmd SEMICOLON - ; - -controlcmd: - controlsock - | controlpassword - ; - -controlsock: - BINDSOCK EQSIGN bind_cred { - if (!parse_bind_line (cfg, $3, CRED_CONTROL)) { - yyerror ("yyparse: parse_bind_line"); - YYERROR; - } - cfg->controller_enabled = 1; - free ($3); - } - ; -controlpassword: - PASSWORD EQSIGN QUOTEDSTRING { - cfg->control_password = memory_pool_strdup (cfg->cfg_pool, $3); - } - ; - -bindsock: - BINDSOCK EQSIGN bind_cred { - if (!parse_bind_line (cfg, $3, CRED_NORMAL)) { - yyerror ("yyparse: parse_bind_line"); - YYERROR; - } - free ($3); - } - ; - -bind_cred: - STRING { - $$ = $1; - } - | IPADDR{ - $$ = $1; - } - | DOMAINNAME { - $$ = $1; - } - | HOSTPORT { - $$ = $1; - } - | QUOTEDSTRING { - $$ = $1; - } - ; header_filters: HEADER_FILTERS EQSIGN QUOTEDSTRING { @@ -283,12 +224,137 @@ memcached_protocol: } } ; -workers: - WORKERS EQSIGN NUMBER { - cfg->workers_number = $3; + +/* Workers section */ +worker: + WORKER OBRACE workerbody EBRACE { + cfg->workers = g_list_prepend (cfg->workers, cur_worker); + cur_worker = NULL; + } + ; + +workerbody: + workercmd SEMICOLON + | workerbody workercmd SEMICOLON + ; + +workercmd: + | bindsock + | workertype + | workercount + | workerparam + ; + +bindsock: + BINDSOCK EQSIGN bind_cred { + if (cur_worker == NULL) { + cur_worker = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct worker_conf)); + cur_worker->params = g_hash_table_new (g_str_hash, g_str_equal); + memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_hash_table_destroy, cur_worker->params); +#ifdef HAVE_SC_NPROCESSORS_ONLN + cur_worker->count = sysconf (_SC_NPROCESSORS_ONLN); +#else + cur_worker->count = DEFAULT_WORKERS_NUM; +#endif + } + + if (!parse_bind_line (cfg, cur_worker, $3)) { + yyerror ("yyparse: parse_bind_line"); + YYERROR; + } + free ($3); + } + ; + +bind_cred: + STRING { + $$ = $1; + } + | IPADDR{ + $$ = $1; + } + | DOMAINNAME { + $$ = $1; + } + | HOSTPORT { + $$ = $1; + } + | QUOTEDSTRING { + $$ = $1; + } + ; + +workertype: + TYPE EQSIGN QUOTEDSTRING { + if (cur_worker == NULL) { + cur_worker = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct worker_conf)); + cur_worker->params = g_hash_table_new (g_str_hash, g_str_equal); + memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_hash_table_destroy, cur_worker->params); +#ifdef HAVE_SC_NPROCESSORS_ONLN + cur_worker->count = sysconf (_SC_NPROCESSORS_ONLN); +#else + cur_worker->count = DEFAULT_WORKERS_NUM; +#endif + } + + if (g_ascii_strcasecmp ($3, "normal") == 0) { + cur_worker->type = TYPE_WORKER; + } + else if (g_ascii_strcasecmp ($3, "controller") == 0) { + cur_worker->type = TYPE_CONTROLLER; + } + else if (g_ascii_strcasecmp ($3, "lmtp") == 0) { + cur_worker->type = TYPE_LMTP; + } + else if (g_ascii_strcasecmp ($3, "fuzzy") == 0) { + cur_worker->type = TYPE_FUZZY; + } + else { + yyerror ("yyparse: unknown worker type: %s", $3); + YYERROR; + } + } + ; + +workercount: + COUNT EQSIGN NUMBER { + if (cur_worker == NULL) { + cur_worker = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct worker_conf)); + cur_worker->params = g_hash_table_new (g_str_hash, g_str_equal); + memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_hash_table_destroy, cur_worker->params); +#ifdef HAVE_SC_NPROCESSORS_ONLN + cur_worker->count = sysconf (_SC_NPROCESSORS_ONLN); +#else + cur_worker->count = DEFAULT_WORKERS_NUM; +#endif + } + + if ($3 > 0) { + cur_worker->count = $3; + } + else { + yyerror ("yyparse: invalid number of workers: %d", $3); + YYERROR; + } } ; +workerparam: + STRING EQSIGN QUOTEDSTRING { + if (cur_worker == NULL) { + cur_worker = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct worker_conf)); + cur_worker->params = g_hash_table_new (g_str_hash, g_str_equal); + memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_hash_table_destroy, cur_worker->params); +#ifdef HAVE_SC_NPROCESSORS_ONLN + cur_worker->count = sysconf (_SC_NPROCESSORS_ONLN); +#else + cur_worker->count = DEFAULT_WORKERS_NUM; +#endif + } + + g_hash_table_insert (cur_worker->params, $1, $3); + } + metric: METRIC OBRACE metricbody EBRACE { if (cur_metric == NULL || cur_metric->name == NULL) { @@ -411,7 +477,7 @@ requirecmd: YYERROR; } cur->path = $3; - LIST_INSERT_HEAD (&cfg->perl_modules, cur, next); + cfg->perl_modules = g_list_prepend (cfg->perl_modules, cur); #else yyerror ("require command is not available when perl support is not compiled"); YYERROR; @@ -453,14 +519,10 @@ moduleoptbody: optcmd: PARAM EQSIGN QUOTEDSTRING { struct module_opt *mopt; - if (cur_module_opt == NULL) { - cur_module_opt = g_malloc (sizeof (cur_module_opt)); - LIST_INIT (cur_module_opt); - } mopt = memory_pool_alloc (cfg->cfg_pool, sizeof (struct module_opt)); mopt->param = $1; mopt->value = $3; - LIST_INSERT_HEAD (cur_module_opt, mopt, next); + cur_module_opt = g_list_prepend (cur_module_opt, mopt); } | VARIABLE EQSIGN QUOTEDSTRING { g_hash_table_insert (cfg->variables, $1, $3); @@ -749,89 +811,6 @@ statfile_pool_size: } ; -lmtp: - LMTP OBRACE lmtpbody EBRACE - ; - -lmtpbody: - lmtpcmd SEMICOLON - | lmtpbody lmtpcmd SEMICOLON - ; - -lmtpcmd: - lmtpenabled - | lmtpsock - | lmtpmetric - | lmtpworkers - ; - -lmtpenabled: - ENABLED EQSIGN FLAG { - cfg->lmtp_enable = $3; - } - ; - -lmtpsock: - BINDSOCK EQSIGN bind_cred { - if (!parse_bind_line (cfg, $3, CRED_LMTP)) { - yyerror ("yyparse: parse_bind_line"); - YYERROR; - } - free ($3); - } - ; -lmtpmetric: - METRIC EQSIGN QUOTEDSTRING { - cfg->lmtp_metric = memory_pool_strdup (cfg->cfg_pool, $3); - } - ; -lmtpworkers: - WORKERS EQSIGN NUMBER { - cfg->lmtp_workers_number = $3; - } - ; - -delivery: - DELIVERY OBRACE deliverybody EBRACE - ; - -deliverybody: - deliverycmd SEMICOLON - | deliverybody deliverycmd SEMICOLON - ; - -deliverycmd: - deliveryenabled - | deliverysock - | deliveryagent - | deliverylmtp - ; - -deliveryenabled: - ENABLED EQSIGN FLAG { - cfg->delivery_enable = $3; - } - ; - -deliverysock: - BINDSOCK EQSIGN bind_cred { - if (!parse_bind_line (cfg, $3, CRED_DELIVERY)) { - yyerror ("yyparse: parse_bind_line"); - YYERROR; - } - free ($3); - } - ; -deliverylmtp: - LMTP EQSIGN FLAG { - cfg->deliver_lmtp = $3; - } - ; -deliveryagent: - AGENT EQSIGN QUOTEDSTRING { - cfg->deliver_agent_path = memory_pool_strdup (cfg->cfg_pool, $3); - } - ; luacode: LUACODE diff --git a/src/cfg_utils.c b/src/cfg_utils.c index 79fabdb50..afb15a652 100644 --- a/src/cfg_utils.c +++ b/src/cfg_utils.c @@ -83,7 +83,7 @@ add_memcached_server (struct config_file *cf, char *str) } int -parse_bind_line (struct config_file *cf, char *str, enum rspamd_cred_type type) +parse_bind_line (struct config_file *cfg, struct worker_conf *cf, char *str) { char *cur_tok, *err_str; struct hostent *hent; @@ -95,43 +95,18 @@ parse_bind_line (struct config_file *cf, char *str, enum rspamd_cred_type type) if (str == NULL) return 0; cur_tok = strsep (&str, ":"); - switch (type) { - case CRED_NORMAL: - host = &cf->bind_host; - port = &cf->bind_port; - *port = DEFAULT_BIND_PORT; - family = &cf->bind_family; - addr = &cf->bind_addr; - break; - case CRED_CONTROL: - host = &cf->control_host; - port = &cf->control_port; - *port = DEFAULT_CONTROL_PORT; - family = &cf->control_family; - addr = &cf->control_addr; - break; - case CRED_LMTP: - host = &cf->lmtp_host; - port = &cf->lmtp_port; - *port = DEFAULT_LMTP_PORT; - family = &cf->lmtp_family; - addr = &cf->lmtp_addr; - break; - case CRED_DELIVERY: - host = &cf->deliver_host; - port = &cf->deliver_port; - *port = 25; - family = &cf->deliver_family; - addr = &cf->deliver_addr; - break; - } + host = &cf->bind_host; + port = &cf->bind_port; + *port = DEFAULT_BIND_PORT; + family = &cf->bind_family; + addr = &cf->bind_addr; if (cur_tok[0] == '/' || cur_tok[0] == '.') { #ifdef HAVE_DIRNAME /* Try to check path of bind credit */ struct stat st; int fd; - char *copy = memory_pool_strdup (cf->cfg_pool, cur_tok); + char *copy = memory_pool_strdup (cfg->cfg_pool, cur_tok); if (stat (copy, &st) == -1) { if (errno == ENOENT) { if ((fd = open (cur_tok, O_RDWR | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR)) == -1) { @@ -155,7 +130,7 @@ parse_bind_line (struct config_file *cf, char *str, enum rspamd_cred_type type) } } #endif - *host = memory_pool_strdup (cf->cfg_pool, cur_tok); + *host = memory_pool_strdup (cfg->cfg_pool, cur_tok); *family = AF_UNIX; return 1; @@ -168,7 +143,7 @@ parse_bind_line (struct config_file *cf, char *str, enum rspamd_cred_type type) } } if (strcmp (cur_tok, "*") == 0) { - *host = memory_pool_strdup (cf->cfg_pool, cur_tok); + *host = memory_pool_strdup (cfg->cfg_pool, cur_tok); addr->s_addr = htonl (INADDR_ANY); } else if (!inet_aton (cur_tok, addr)) { /* Try to call gethostbyname */ @@ -177,13 +152,13 @@ parse_bind_line (struct config_file *cf, char *str, enum rspamd_cred_type type) return 0; } else { - *host = memory_pool_strdup (cf->cfg_pool, cur_tok); + *host = memory_pool_strdup (cfg->cfg_pool, cur_tok); memcpy((char *)addr, hent->h_addr, sizeof(struct in_addr)); s = strlen (cur_tok) + 1; } } else { - *host = memory_pool_strdup (cf->cfg_pool, cur_tok); + *host = memory_pool_strdup (cfg->cfg_pool, cur_tok); } *family = AF_INET; @@ -203,11 +178,7 @@ init_defaults (struct config_file *cfg) cfg->memcached_maxerrors = DEFAULT_UPSTREAM_MAXERRORS; cfg->memcached_protocol = TCP_TEXT; -#ifdef HAVE_SC_NPROCESSORS_ONLN - cfg->workers_number = sysconf (_SC_NPROCESSORS_ONLN); -#else - cfg->workers_number = DEFAULT_WORKERS_NUM; -#endif + cfg->max_statfile_size = DEFAULT_STATFILE_SIZE; cfg->modules_opts = g_hash_table_new (g_str_hash, g_str_equal); cfg->variables = g_hash_table_new (g_str_hash, g_str_equal); @@ -217,7 +188,6 @@ init_defaults (struct config_file *cfg) cfg->composite_symbols = g_hash_table_new (g_str_hash, g_str_equal); cfg->statfiles = g_hash_table_new (g_str_hash, g_str_equal); cfg->cfg_params = g_hash_table_new (g_str_hash, g_str_equal); - cfg->lmtp_metric = "default"; def_metric = memory_pool_alloc (cfg->cfg_pool, sizeof (struct metric)); def_metric->name = "default"; @@ -227,7 +197,6 @@ init_defaults (struct config_file *cfg) def_metric->classifier = get_classifier ("winnow"); g_hash_table_insert (cfg->metrics, "default", def_metric); - LIST_INIT (&cfg->perl_modules); } void @@ -255,18 +224,20 @@ free_config (struct config_file *cfg) char* get_module_opt (struct config_file *cfg, char *module_name, char *opt_name) { - LIST_HEAD (moduleoptq, module_opt) *cur_module_opt = NULL; + GList *cur_opt; struct module_opt *cur; - cur_module_opt = g_hash_table_lookup (cfg->modules_opts, module_name); - if (cur_module_opt == NULL) { + cur_opt = g_hash_table_lookup (cfg->modules_opts, module_name); + if (cur_opt == NULL) { return NULL; } - - LIST_FOREACH (cur, cur_module_opt, next) { + + while (cur_opt) { + cur = cur_opt->data; if (strcmp (cur->param, opt_name) == 0) { return cur->value; } + cur_opt = g_list_next (cur_opt); } return NULL; @@ -408,13 +379,15 @@ static void substitute_module_variables (gpointer key, gpointer value, gpointer data) { struct config_file *cfg = (struct config_file *)data; - LIST_HEAD (moduleoptq, module_opt) *cur_module_opt = (struct moduleoptq *)value; - struct module_opt *cur, *tmp; + GList *cur_opt = (GList *)value; + struct module_opt *cur; - LIST_FOREACH_SAFE (cur, cur_module_opt, next, tmp) { + while (cur_opt) { + cur = cur_opt->data; if (cur->value) { cur->value = substitute_variable (cfg, NULL, cur->value, 1); } + cur_opt = g_list_next (cur_opt); } } @@ -455,19 +428,19 @@ parse_filters_str (struct config_file *cfg, const char *str, enum script_type ty switch (type) { case SCRIPT_HEADER: cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); - LIST_INSERT_HEAD (&cfg->header_filters, cur, next); + cfg->header_filters = g_list_prepend (cfg->header_filters, cur); break; case SCRIPT_MIME: cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); - LIST_INSERT_HEAD (&cfg->mime_filters, cur, next); + cfg->mime_filters = g_list_prepend (cfg->mime_filters, cur); break; case SCRIPT_MESSAGE: cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); - LIST_INSERT_HEAD (&cfg->message_filters, cur, next); + cfg->message_filters = g_list_prepend (cfg->message_filters, cur); break; case SCRIPT_URL: cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); - LIST_INSERT_HEAD (&cfg->url_filters, cur, next); + cfg->url_filters = g_list_prepend (cfg->url_filters, cur); break; } break; @@ -483,19 +456,19 @@ parse_filters_str (struct config_file *cfg, const char *str, enum script_type ty switch (type) { case SCRIPT_HEADER: cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); - LIST_INSERT_HEAD (&cfg->header_filters, cur, next); + cfg->header_filters = g_list_prepend (cfg->header_filters, cur); break; case SCRIPT_MIME: cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); - LIST_INSERT_HEAD (&cfg->mime_filters, cur, next); + cfg->mime_filters = g_list_prepend (cfg->mime_filters, cur); break; case SCRIPT_MESSAGE: cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); - LIST_INSERT_HEAD (&cfg->message_filters, cur, next); + cfg->message_filters = g_list_prepend (cfg->message_filters, cur); break; case SCRIPT_URL: cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); - LIST_INSERT_HEAD (&cfg->url_filters, cur, next); + cfg->url_filters = g_list_prepend (cfg->message_filters, cur); break; } p ++; @@ -523,27 +496,9 @@ fill_cfg_params (struct config_file *cfg) scalars[2].type = SCALAR_TYPE_STR; scalars[2].pointer = &cfg->temp_dir; g_hash_table_insert (cfg->cfg_params, "temp_dir", &scalars[2]); - scalars[3].type = SCALAR_TYPE_STR; - scalars[3].pointer = &cfg->bind_host; - g_hash_table_insert (cfg->cfg_params, "bind_host", &scalars[3]); - scalars[4].type = SCALAR_TYPE_STR; - scalars[4].pointer = &cfg->control_host; - g_hash_table_insert (cfg->cfg_params, "control_host", &scalars[4]); - scalars[5].type = SCALAR_TYPE_INT; - scalars[5].pointer = &cfg->controller_enabled; - g_hash_table_insert (cfg->cfg_params, "controller_enabled", &scalars[5]); - scalars[6].type = SCALAR_TYPE_STR; - scalars[6].pointer = &cfg->control_password; - g_hash_table_insert (cfg->cfg_params, "control_password", &scalars[6]); - scalars[7].type = SCALAR_TYPE_INT; - scalars[7].pointer = &cfg->no_fork; - g_hash_table_insert (cfg->cfg_params, "no_fork", &scalars[7]); - scalars[8].type = SCALAR_TYPE_UINT; - scalars[8].pointer = &cfg->workers_number; - g_hash_table_insert (cfg->cfg_params, "workers_number", &scalars[8]); - scalars[9].type = SCALAR_TYPE_SIZE; - scalars[9].pointer = &cfg->max_statfile_size; - g_hash_table_insert (cfg->cfg_params, "max_statfile_size", &scalars[9]); + scalars[3].type = SCALAR_TYPE_SIZE; + scalars[3].pointer = &cfg->max_statfile_size; + g_hash_table_insert (cfg->cfg_params, "max_statfile_size", &scalars[3]); } @@ -555,11 +510,6 @@ post_load_config (struct config_file *cfg) { struct timespec ts; - if (cfg->lmtp_enable && !cfg->delivery_enable) { - yywarn ("post_load_config: lmtp is enabled, but delivery is not enabled, disabling lmtp"); - cfg->lmtp_enable = FALSE; - } - g_hash_table_foreach (cfg->variables, substitute_all_variables, cfg); g_hash_table_foreach (cfg->modules_opts, substitute_module_variables, cfg); parse_filters_str (cfg, cfg->header_filters_str, SCRIPT_HEADER); diff --git a/src/controller.c b/src/controller.c index f20b19ee7..e45075f9e 100644 --- a/src/controller.c +++ b/src/controller.c @@ -174,6 +174,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control struct statfile *statfile; struct metric *metric; memory_pool_stat_t mem_st; + char *password = g_hash_table_lookup (session->worker->cf->params, "password"); switch (cmd->type) { case COMMAND_PASSWORD: @@ -184,7 +185,12 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); return; } - if (strncmp (arg, session->cfg->control_password, strlen (arg)) == 0) { + if (password == NULL) { + r = snprintf (out_buf, sizeof (out_buf), "password command disabled in config, authorized access unallowed" CRLF); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); + return; + } + if (strncmp (arg, password, strlen (arg)) == 0) { session->authorized = 1; r = snprintf (out_buf, sizeof (out_buf), "password accepted" CRLF); rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE); @@ -542,14 +548,12 @@ void start_controller (struct rspamd_worker *worker) { struct sigaction signals; - int listen_sock, i; - struct sockaddr_un *un_addr; + int i; GList *comp_list = NULL; char *hostbuf; long int hostmax; worker->srv->pid = getpid (); - worker->srv->type = TYPE_CONTROLLER; event_init (); g_mime_init (0); @@ -560,27 +564,9 @@ start_controller (struct rspamd_worker *worker) signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker); signal_add (&worker->sig_ev, NULL); - if (worker->srv->cfg->control_family == AF_INET) { - if ((listen_sock = make_tcp_socket (&worker->srv->cfg->control_addr, worker->srv->cfg->control_port, TRUE)) == -1) { - msg_err ("start_controller: cannot create tcp listen socket. %s", strerror (errno)); - exit(-errno); - } - } - else { - un_addr = (struct sockaddr_un *) alloca (sizeof (struct sockaddr_un)); - if (!un_addr || (listen_sock = make_unix_socket (worker->srv->cfg->control_host, un_addr, TRUE)) == -1) { - msg_err ("start_controller: cannot create unix listen socket. %s", strerror (errno)); - exit(-errno); - } - } start_time = time (NULL); - if (listen (listen_sock, -1) == -1) { - msg_err ("start_controller: cannot listen on socket. %s", strerror (errno)); - exit(-errno); - } - /* Init command completion */ for (i = 0; i < G_N_ELEMENTS (commands); i ++) { comp_list = g_list_prepend (comp_list, &commands[i]); @@ -594,7 +580,7 @@ start_controller (struct rspamd_worker *worker) hostbuf[hostmax - 1] = '\0'; snprintf (greetingbuf, sizeof (greetingbuf), "Rspamd version %s is running on %s" CRLF, RVERSION, hostbuf); /* Accept event */ - event_set(&worker->bind_ev, listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); + event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); event_add(&worker->bind_ev, NULL); /* Send SIGUSR2 to parent */ @@ -604,7 +590,6 @@ start_controller (struct rspamd_worker *worker) io_tv.tv_usec = 0; event_loop (0); - close (listen_sock); exit (EXIT_SUCCESS); } diff --git a/src/filter.c b/src/filter.c index d06987b97..b34a88003 100644 --- a/src/filter.c +++ b/src/filter.c @@ -232,56 +232,61 @@ metric_process_callback (gpointer key, gpointer value, void *data) static int continue_process_filters (struct worker_task *task) { - struct filter *cur = task->save.entry; + GList *cur = task->save.entry; + struct filter *filt = cur->data; - cur = LIST_NEXT (cur, next); + cur = g_list_next (cur); /* Note: no breaks in this case! */ switch (task->save.type) { case SCRIPT_HEADER: while (cur) { - call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_HEADER); + filt = cur->data; + call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_HEADER); if (task->save.saved) { task->save.entry = cur; task->save.type = SCRIPT_HEADER; return 0; } - cur = LIST_NEXT (cur, next); + cur = g_list_next (cur); } /* Process mime filters */ - cur = LIST_FIRST (&task->worker->srv->cfg->mime_filters); + cur = g_list_first (task->worker->srv->cfg->mime_filters); case SCRIPT_MIME: while (cur) { - call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_MIME); + filt = cur->data; + call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_MIME); if (task->save.saved) { task->save.entry = cur; task->save.type = SCRIPT_MIME; return 0; } - cur = LIST_NEXT (cur, next); + cur = g_list_next (cur); } /* Process url filters */ - cur = LIST_FIRST (&task->worker->srv->cfg->url_filters); + cur = g_list_first (task->worker->srv->cfg->url_filters); case SCRIPT_URL: while (cur) { - call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_URL); + filt = cur->data; + call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_URL); if (task->save.saved) { task->save.entry = cur; task->save.type = SCRIPT_URL; return 0; } - cur = LIST_NEXT (cur, next); + cur = g_list_next (cur); } /* Process message filters */ - cur = LIST_FIRST (&task->worker->srv->cfg->message_filters); + cur = g_list_first (task->worker->srv->cfg->message_filters); case SCRIPT_MESSAGE: while (cur) { - call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_MESSAGE); + filt = cur->data; + call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_MESSAGE); if (task->save.saved) { task->save.entry = cur; task->save.type = SCRIPT_MESSAGE; return 0; } - cur = LIST_NEXT (cur, next); + cur = g_list_next (cur); } /* Process all statfiles */ process_statfiles (task); @@ -296,7 +301,8 @@ continue_process_filters (struct worker_task *task) int process_filters (struct worker_task *task) { - struct filter *cur; + GList *cur; + struct filter *filt; if (task->save.saved) { task->save.saved = 0; @@ -304,40 +310,52 @@ process_filters (struct worker_task *task) } /* Process filters in order that they are listed in config file */ - LIST_FOREACH (cur, &task->worker->srv->cfg->header_filters, next) { - call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_HEADER); + cur = task->worker->srv->cfg->header_filters; + while (cur) { + filt = cur->data; + call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_HEADER); if (task->save.saved) { task->save.entry = cur; task->save.type = SCRIPT_HEADER; return 0; } + cur = g_list_next (cur); } - LIST_FOREACH (cur, &task->worker->srv->cfg->mime_filters, next) { - call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_MIME); + cur = task->worker->srv->cfg->mime_filters; + while (cur) { + filt = cur->data; + call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_MIME); if (task->save.saved) { task->save.entry = cur; task->save.type = SCRIPT_MIME; return 0; } + cur = g_list_next (cur); } - LIST_FOREACH (cur, &task->worker->srv->cfg->url_filters, next) { - call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_URL); + cur = task->worker->srv->cfg->url_filters; + while (cur) { + filt = cur->data; + call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_URL); if (task->save.saved) { task->save.entry = cur; task->save.type = SCRIPT_URL; return 0; } + cur = g_list_next (cur); } - LIST_FOREACH (cur, &task->worker->srv->cfg->message_filters, next) { - call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_MESSAGE); + cur = task->worker->srv->cfg->message_filters; + while (cur) { + filt = cur->data; + call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_MESSAGE); if (task->save.saved) { task->save.entry = cur; task->save.type = SCRIPT_MESSAGE; return 0; } + cur = g_list_next (cur); } /* Process all metrics */ diff --git a/src/filter.h b/src/filter.h index 4e51bf2a7..c460ec317 100644 --- a/src/filter.h +++ b/src/filter.h @@ -21,7 +21,6 @@ enum filter_type { C_FILTER, PERL_FILTER }; struct filter { char *func_name; /**< function name */ enum filter_type type; /**< filter type (c or perl) */ - LIST_ENTRY (filter) next; /**< chain link */ }; /** diff --git a/src/lmtp.c b/src/lmtp.c index 5f2878a08..431083657 100644 --- a/src/lmtp.c +++ b/src/lmtp.c @@ -253,7 +253,7 @@ accept_socket (int fd, short what, void *arg) * Start lmtp worker process */ void -start_lmtp_worker (struct rspamd_worker *worker, int listen_sock) +start_lmtp_worker (struct rspamd_worker *worker) { struct sigaction signals; int i; @@ -273,7 +273,7 @@ start_lmtp_worker (struct rspamd_worker *worker, int listen_sock) signal_add (&worker->sig_ev, NULL); /* Accept event */ - event_set(&worker->bind_ev, listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); + event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); event_add(&worker->bind_ev, NULL); /* Perform modules configuring */ diff --git a/src/lmtp.h b/src/lmtp.h index b784eed3b..d7c13c497 100644 --- a/src/lmtp.h +++ b/src/lmtp.h @@ -15,6 +15,6 @@ #define LMTP_NO_RCPT 554 #define LMTP_TEMP_FAIL 421 -void start_lmtp_worker (struct rspamd_worker *worker, int listen_sock); +void start_lmtp_worker (struct rspamd_worker *worker); #endif diff --git a/src/main.c b/src/main.c index 9b8992d8b..dc0226680 100644 --- a/src/main.c +++ b/src/main.c @@ -51,7 +51,7 @@ struct config_file *cfg; rspamd_hash_t *counters; static void sig_handler (int ); -static struct rspamd_worker * fork_worker (struct rspamd_main *, int, enum process_type); +static struct rspamd_worker * fork_worker (struct rspamd_main *, struct worker_conf *); sig_atomic_t do_restart; sig_atomic_t do_terminate; @@ -269,7 +269,7 @@ reread_config (struct rspamd_main *rspamd) } static struct rspamd_worker * -fork_worker (struct rspamd_main *rspamd, int listen_sock, enum process_type type) +fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf) { struct rspamd_worker *cur; /* Starting worker process */ @@ -278,12 +278,14 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, enum process_type type bzero (cur, sizeof (struct rspamd_worker)); TAILQ_INSERT_HEAD (&rspamd->workers, cur, next); cur->srv = rspamd; - cur->type = type; + cur->type = cf->type; cur->pid = fork(); + cur->cf = cf; switch (cur->pid) { case 0: - /* TODO: add worker code */ - switch (type) { + /* Drop privilleges */ + drop_priv (cfg); + switch (cf->type) { case TYPE_CONTROLLER: setproctitle ("controller process"); pidfile_close (rspamd->pfh); @@ -294,13 +296,13 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, enum process_type type setproctitle ("lmtp process"); pidfile_close (rspamd->pfh); msg_info ("fork_worker: starting lmtp process %d", getpid ()); - start_lmtp_worker (cur, listen_sock); + start_lmtp_worker (cur); case TYPE_WORKER: default: setproctitle ("worker process"); pidfile_close (rspamd->pfh); msg_info ("fork_worker: starting worker process %d", getpid ()); - start_worker (cur, listen_sock); + start_worker (cur); break; } break; @@ -316,35 +318,27 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, enum process_type type } static void -delay_fork (enum process_type type) +delay_fork (struct worker_conf *cf) { - workers_pending = g_list_prepend (workers_pending, GINT_TO_POINTER (type)); + workers_pending = g_list_prepend (workers_pending, cf); (void)alarm (SOFT_FORK_TIME); } -static void -fork_delayed (struct rspamd_main *rspamd, int listen_sock) -{ - GList *cur; - - while (workers_pending != NULL) { - cur = workers_pending; - workers_pending = g_list_remove_link (workers_pending, cur); - fork_worker (rspamd, listen_sock, GPOINTER_TO_INT (cur->data)); - g_list_free_1 (cur); - } -} static void dump_module_variables (gpointer key, gpointer value, gpointer data) -{ - LIST_HEAD (moduleoptq, module_opt) *cur_module_opt = (struct moduleoptq *)value; - struct module_opt *cur, *tmp; +{ + GList *cur_opt; + struct module_opt *cur; + + cur_opt = (GList *)value; - LIST_FOREACH_SAFE (cur, cur_module_opt, next, tmp) { + while (cur_opt) { + cur = cur_opt->data; if (cur->value) { printf ("$%s = \"%s\"\n", cur->param, cur->value); } + cur_opt = g_list_next (cur_opt); } } @@ -388,12 +382,56 @@ create_listen_socket (struct in_addr *addr, int port, int family, char *path) return listen_sock; } +static void +fork_delayed (struct rspamd_main *rspamd) +{ + GList *cur; + struct worker_conf *cf; + + while (workers_pending != NULL) { + cur = workers_pending; + cf = cur->data; + + workers_pending = g_list_remove_link (workers_pending, cur); + fork_worker (rspamd, cf); + g_list_free_1 (cur); + } +} + +static void +spawn_workers (struct rspamd_main *rspamd) +{ + GList *cur; + struct worker_conf *cf; + int i, listen_sock; + + cur = cfg->workers; + + while (cur) { + cf = cur->data; + + /* Create listen socket */ + listen_sock = create_listen_socket (&cf->bind_addr, cf->bind_port, + cf->bind_family, cf->bind_host); + if (listen_sock == -1) { + exit(-errno); + } + cf->listen_sock = listen_sock; + + for (i = 0; i < cf->count; i++) { + fork_worker (rspamd, cf); + } + + cur = g_list_next (cur); + } +} + int main (int argc, char **argv, char **env) { struct rspamd_main *rspamd; struct module_ctx *cur_module = NULL; - int res = 0, i, listen_sock, lmtp_listen_sock; + int res = 0, i; struct sigaction signals; struct rspamd_worker *cur, *cur_tmp, *active_worker; struct rlimit rlim; @@ -498,23 +536,6 @@ main (int argc, char **argv, char **env) } - /* Create listen socket */ - listen_sock = create_listen_socket (&rspamd->cfg->bind_addr, rspamd->cfg->bind_port, - rspamd->cfg->bind_family, rspamd->cfg->bind_host); - if (listen_sock == -1) { - exit(-errno); - } - - if (cfg->lmtp_enable) { - lmtp_listen_sock = create_listen_socket (&rspamd->cfg->lmtp_addr, rspamd->cfg->lmtp_port, - rspamd->cfg->lmtp_family, rspamd->cfg->lmtp_host); - if (listen_sock == -1) { - exit(-errno); - } - } - - /* Drop privilleges */ - drop_priv (cfg); /* Set stack size for pcre */ getrlimit(RLIMIT_STACK, &rlim); @@ -589,20 +610,7 @@ main (int argc, char **argv, char **env) modules[i].module_config_func (cfg); } - for (i = 0; i < cfg->workers_number; i++) { - fork_worker (rspamd, listen_sock, TYPE_WORKER); - } - /* Start controller if enabled */ - if (cfg->controller_enabled) { - fork_worker (rspamd, listen_sock, TYPE_CONTROLLER); - } - - /* Start lmtp if enabled */ - if (cfg->lmtp_enable) { - for (i = 0; i < cfg->lmtp_workers_number; i++) { - fork_worker (rspamd, lmtp_listen_sock, TYPE_LMTP); - } - } + spawn_workers (rspamd); /* Signal processing cycle */ for (;;) { @@ -632,7 +640,7 @@ main (int argc, char **argv, char **env) (cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid); /* But respawn controller */ if (cur->type == TYPE_CONTROLLER) { - fork_worker (rspamd, listen_sock, TYPE_CONTROLLER); + fork_worker (rspamd, cur->cf); } } else { @@ -646,7 +654,7 @@ main (int argc, char **argv, char **env) (cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid); } /* Fork another worker in replace of dead one */ - delay_fork (cur->type); + delay_fork (cur->cf); } g_free (cur); } @@ -662,7 +670,7 @@ main (int argc, char **argv, char **env) TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) { if (cur->type == TYPE_WORKER || cur->type == TYPE_LMTP) { /* Start new workers that would reread configuration */ - active_worker = fork_worker (rspamd, listen_sock, cur->type); + active_worker = fork_worker (rspamd, cur->cf); } /* Immideately send termination request to conroller and wait for SIGCHLD */ if (cur->type == TYPE_CONTROLLER) { @@ -690,7 +698,7 @@ main (int argc, char **argv, char **env) } if (got_alarm) { got_alarm = 0; - fork_delayed (rspamd, listen_sock); + fork_delayed (rspamd); } } @@ -705,10 +713,6 @@ main (int argc, char **argv, char **env) msg_info ("main: terminating..."); - if (rspamd->cfg->bind_family == AF_UNIX) { - unlink (rspamd->cfg->bind_host); - } - free_config (rspamd->cfg); g_free (rspamd->cfg); g_free (rspamd); diff --git a/src/main.h b/src/main.h index c356823b8..c653964b7 100644 --- a/src/main.h +++ b/src/main.h @@ -38,6 +38,7 @@ #endif #define CRLF "\r\n" + /** * Process type: main or worker */ @@ -46,6 +47,7 @@ enum process_type { TYPE_WORKER, TYPE_CONTROLLER, TYPE_LMTP, + TYPE_FUZZY, }; /** @@ -70,6 +72,7 @@ struct rspamd_worker { enum process_type type; /**< process type */ struct event sig_ev; /**< signals event */ struct event bind_ev; /**< socket events */ + struct worker_conf *cf; /**< worker config data */ TAILQ_ENTRY (rspamd_worker) next; /**< chain link to next worker */ }; @@ -118,8 +121,8 @@ struct counter_data { * Save point object for delayed filters processing */ struct save_point { - void *entry; /**< pointer to C function or perl function name */ - enum script_type type; /**< where we did stop */ + GList *entry; /**< pointer to saved filter */ + enum script_type type; unsigned int saved; /**< how much time we have delayed processing */ }; @@ -216,7 +219,7 @@ struct c_module { LIST_ENTRY (c_module) next; /**< linked list */ }; -void start_worker (struct rspamd_worker *worker, int listen_sock); +void start_worker (struct rspamd_worker *worker); void start_controller (struct rspamd_worker *worker); /** diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c index 7095c3041..5a1a9cd27 100644 --- a/src/plugins/regexp.c +++ b/src/plugins/regexp.c @@ -151,7 +151,7 @@ parse_autolearn_param (const char *param, const char *value, struct config_file int regexp_module_config (struct config_file *cfg) { - LIST_HEAD (moduleoptq, module_opt) *cur_module_opt = NULL; + GList *cur_opt = NULL; struct module_opt *cur; struct regexp_module_item *cur_item; char *value; @@ -172,24 +172,24 @@ regexp_module_config (struct config_file *cfg) regexp_module_ctx->statfile_prefix = DEFAULT_STATFILE_PREFIX; } - cur_module_opt = g_hash_table_lookup (cfg->modules_opts, "regexp"); - if (cur_module_opt != NULL) { - LIST_FOREACH (cur, cur_module_opt, next) { - if (strcmp (cur->param, "metric") == 0 || strcmp (cur->param, "statfile_prefix") == 0) { - continue; - } - else if (g_ascii_strncasecmp (cur->param, "autolearn", sizeof ("autolearn") - 1) == 0) { - parse_autolearn_param (cur->param, cur->value, cfg); - continue; - } - cur_item = memory_pool_alloc0 (regexp_module_ctx->regexp_pool, sizeof (struct regexp_module_item)); - cur_item->symbol = cur->param; - if (!read_regexp_expression (regexp_module_ctx->regexp_pool, cur_item, cur->param, cur->value, cfg)) { - res = FALSE; - } - set_counter (cur_item->symbol, 0); - regexp_module_ctx->items = g_list_prepend (regexp_module_ctx->items, cur_item); + cur_opt = g_hash_table_lookup (cfg->modules_opts, "regexp"); + while (cur_opt) { + cur = cur_opt->data; + if (strcmp (cur->param, "metric") == 0 || strcmp (cur->param, "statfile_prefix") == 0) { + continue; + } + else if (g_ascii_strncasecmp (cur->param, "autolearn", sizeof ("autolearn") - 1) == 0) { + parse_autolearn_param (cur->param, cur->value, cfg); + continue; + } + cur_item = memory_pool_alloc0 (regexp_module_ctx->regexp_pool, sizeof (struct regexp_module_item)); + cur_item->symbol = cur->param; + if (!read_regexp_expression (regexp_module_ctx->regexp_pool, cur_item, cur->param, cur->value, cfg)) { + res = FALSE; } + set_counter (cur_item->symbol, 0); + regexp_module_ctx->items = g_list_prepend (regexp_module_ctx->items, cur_item); + cur_opt = g_list_next (cur_opt); } return res; diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c index 020bf7764..81acb1f7b 100644 --- a/src/plugins/surbl.c +++ b/src/plugins/surbl.c @@ -82,7 +82,7 @@ int surbl_module_config (struct config_file *cfg) { struct hostent *hent; - LIST_HEAD (moduleoptq, module_opt) *opt = NULL; + GList *cur_opt; struct module_opt *cur; struct suffix_item *new_suffix; struct surbl_bit_item *new_bit; @@ -160,35 +160,35 @@ surbl_module_config (struct config_file *cfg) } } - opt = g_hash_table_lookup (cfg->modules_opts, "surbl"); - if (opt) { - LIST_FOREACH (cur, opt, next) { - if (!g_strncasecmp (cur->param, "suffix", sizeof ("suffix") - 1)) { - if ((str = strchr (cur->param, '_')) != NULL) { - new_suffix = memory_pool_alloc (surbl_module_ctx->surbl_pool, sizeof (struct suffix_item)); - *str = '\0'; - new_suffix->symbol = memory_pool_strdup (surbl_module_ctx->surbl_pool, str + 1); - new_suffix->suffix = memory_pool_strdup (surbl_module_ctx->surbl_pool, cur->value); - msg_debug ("surbl_module_config: add new surbl suffix: %s with symbol: %s", - new_suffix->suffix, new_suffix->symbol); - *str = '_'; - surbl_module_ctx->suffixes = g_list_prepend (surbl_module_ctx->suffixes, new_suffix); - } + cur_opt = g_hash_table_lookup (cfg->modules_opts, "surbl"); + while (cur_opt) { + cur = cur_opt->data; + if (!g_strncasecmp (cur->param, "suffix", sizeof ("suffix") - 1)) { + if ((str = strchr (cur->param, '_')) != NULL) { + new_suffix = memory_pool_alloc (surbl_module_ctx->surbl_pool, sizeof (struct suffix_item)); + *str = '\0'; + new_suffix->symbol = memory_pool_strdup (surbl_module_ctx->surbl_pool, str + 1); + new_suffix->suffix = memory_pool_strdup (surbl_module_ctx->surbl_pool, cur->value); + msg_debug ("surbl_module_config: add new surbl suffix: %s with symbol: %s", + new_suffix->suffix, new_suffix->symbol); + *str = '_'; + surbl_module_ctx->suffixes = g_list_prepend (surbl_module_ctx->suffixes, new_suffix); } - if (!g_strncasecmp (cur->param, "bit", sizeof ("bit") - 1)) { - if ((str = strchr (cur->param, '_')) != NULL) { - bit = strtoul (str + 1, NULL, 10); - if (bit != 0) { - new_bit = memory_pool_alloc (surbl_module_ctx->surbl_pool, sizeof (struct surbl_bit_item)); - new_bit->bit = bit; - new_bit->symbol = memory_pool_strdup (surbl_module_ctx->surbl_pool, cur->value); - msg_debug ("surbl_module_config: add new bit suffix: %d with symbol: %s", - (int)new_bit->bit, new_bit->symbol); - surbl_module_ctx->bits = g_list_prepend (surbl_module_ctx->bits, new_bit); - } + } + if (!g_strncasecmp (cur->param, "bit", sizeof ("bit") - 1)) { + if ((str = strchr (cur->param, '_')) != NULL) { + bit = strtoul (str + 1, NULL, 10); + if (bit != 0) { + new_bit = memory_pool_alloc (surbl_module_ctx->surbl_pool, sizeof (struct surbl_bit_item)); + new_bit->bit = bit; + new_bit->symbol = memory_pool_strdup (surbl_module_ctx->surbl_pool, cur->value); + msg_debug ("surbl_module_config: add new bit suffix: %d with symbol: %s", + (int)new_bit->bit, new_bit->symbol); + surbl_module_ctx->bits = g_list_prepend (surbl_module_ctx->bits, new_bit); } } } + cur_opt = g_list_next (cur_opt); } /* Add default suffix */ if (surbl_module_ctx->suffixes == NULL) { diff --git a/src/worker.c b/src/worker.c index 2da9383e9..8a9dd9d3a 100644 --- a/src/worker.c +++ b/src/worker.c @@ -316,7 +316,7 @@ accept_socket (int fd, short what, void *arg) * Start worker process */ void -start_worker (struct rspamd_worker *worker, int listen_sock) +start_worker (struct rspamd_worker *worker) { struct sigaction signals; @@ -350,7 +350,6 @@ start_worker (struct rspamd_worker *worker, int listen_sock) #endif worker->srv->pid = getpid (); - worker->srv->type = TYPE_WORKER; event_init (); evdns_init (); @@ -363,7 +362,7 @@ start_worker (struct rspamd_worker *worker, int listen_sock) signal_add (&worker->sig_ev, NULL); /* Accept event */ - event_set(&worker->bind_ev, listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); + event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); event_add(&worker->bind_ev, NULL); /* Send SIGUSR2 to parent */ |