aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2009-06-17 19:31:48 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2009-06-17 19:31:48 +0400
commitbca226772e9747a4587866a50122d4a8f7973b26 (patch)
treeaae459617c9b3a7a82dd0b9e2a8b03be11e3ff52
parent453ecf68e3b51941944dbc3b1dece11342be3810 (diff)
downloadrspamd-bca226772e9747a4587866a50122d4a8f7973b26.tar.gz
rspamd-bca226772e9747a4587866a50122d4a8f7973b26.zip
* Introduce new system of workers spawning and configuring, now rspamd can be easily extended by new types of wrokers
* Rework config system and avoid from using queue (3) lists * Upgrade version to 0.2.0 as config format is now incompatible with older one
-rw-r--r--CMakeLists.txt5
-rw-r--r--rspamd.conf.sample40
-rw-r--r--src/cfg_file.h51
-rw-r--r--src/cfg_file.l24
-rw-r--r--src/cfg_file.y293
-rw-r--r--src/cfg_utils.c120
-rw-r--r--src/controller.c33
-rw-r--r--src/filter.c62
-rw-r--r--src/filter.h1
-rw-r--r--src/lmtp.c4
-rw-r--r--src/lmtp.h2
-rw-r--r--src/main.c134
-rw-r--r--src/main.h9
-rw-r--r--src/plugins/regexp.c36
-rw-r--r--src/plugins/surbl.c52
-rw-r--r--src/worker.c5
16 files changed, 415 insertions, 456 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 1a4a0b042..754b2200a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -6,8 +6,8 @@
PROJECT(rspamd C)
SET(RSPAMD_VERSION_MAJOR 0)
-SET(RSPAMD_VERSION_MINOR 1)
-SET(RSPAMD_VERSION_PATCH 8)
+SET(RSPAMD_VERSION_MINOR 2)
+SET(RSPAMD_VERSION_PATCH 0)
SET(RSPAMD_VERSION "${RSPAMD_VERSION_MAJOR}.${RSPAMD_VERSION_MINOR}.${RSPAMD_VERSION_PATCH}")
SET(RSPAMD_MASTER_SITE_URL "http://cebka.pp.ru/hg/rspamd")
@@ -226,7 +226,6 @@ CHECK_SYMBOL_EXISTS(MAXPATHLEN sys/param.h HAVE_MAXPATHLEN)
CHECK_SYMBOL_EXISTS(MAP_SHARED sys/mman.h HAVE_MMAP_SHARED)
CHECK_SYMBOL_EXISTS(MAP_ANON sys/mman.h HAVE_MMAP_ANON)
CHECK_SYMBOL_EXISTS(_SC_NPROCESSORS_ONLN unistd.h HAVE_SC_NPROCESSORS_ONLN)
-CHECK_SYMBOL_EXISTS(SLIST_FOREACH_SAFE sys/queue.h HAVE_COMPATIBLE_QUEUE_H)
CHECK_SYMBOL_EXISTS(CLOCK_PROCESS_CPUTIME_ID time.h HAVE_CLOCK_PROCESS_CPUTIME_ID)
CHECK_SYMBOL_EXISTS(CLOCK_VIRTUAL time.h HAVE_CLOCK_VIRTUAL)
diff --git a/rspamd.conf.sample b/rspamd.conf.sample
index 2b70cbd7c..42cf9d2a5 100644
--- a/rspamd.conf.sample
+++ b/rspamd.conf.sample
@@ -7,18 +7,28 @@
# Default: pidfile = /var/run/rspamd.pid
pidfile = "./rspamd.pid";
-# Number of workers to process connections
-# Default: 1
-workers = 1;
-# Socket for accepting mail to filter, can be unix socket if begin with '/'
-# (bind_socket=/var/run/rspamd.sock for example)
-bind_socket = localhost:11333;
+worker {
+ type = "normal";
+
+ # Number of workers to process connections
+ # Default: number of processors in system
+ count = 1;
+
+ # Socket for accepting mail to filter, can be unix socket if begin with '/'
+ # (bind_socket=/var/run/rspamd.sock for example)
+ bind_socket = localhost:11333;
+};
+
# Settings for controller interface
-control {
+worker {
+ type = "controller";
+
# Bind socket for control interface
bind_socket = localhost:11334;
+
+ count = 1;
# Password for privilleged commands
password = "q1";
};
@@ -77,29 +87,29 @@ factors {
};
# Options for lmtp worker
-lmtp {
- enabled = yes;
+worker {
+ type = "lmtp";
# Bind socket for lmtp interface
bind_socket = localhost:11335;
# Metric that is considered as main. If we have spam result on
# this metric, lmtp delivery would be failed
metric = "default";
# Number of lmtp workers
- workers = 1;
+ count = 1;
};
-delivery {
- enabled = yes;
+#worker {
+# type = "delivery";
# Path to delivery agent, %f is expanded as mail from address and %r
# is expanded as recipient address
# Expample: agent = "/usr/local/bin/procmail -f %f -d %r"
- agent = "/dev/null";
+# agent = "/dev/null";
# Bind socket for lmtp interface
# Example: bind_socket = localhost:25
# Whether we should use lmtp for MTA delivery
- lmtp = no;
-};
+# lmtp = no;
+#};
# SURBL module params, note that single quotes are mandatory here
.module 'surbl' {
diff --git a/src/cfg_file.h b/src/cfg_file.h
index 201b46823..1a02644f7 100644
--- a/src/cfg_file.h
+++ b/src/cfg_file.h
@@ -97,7 +97,6 @@ struct memcached_server {
*/
struct perl_module {
char *path; /**< path to module */
- LIST_ENTRY (perl_module) next; /**< chain link */
};
/**
@@ -106,7 +105,6 @@ struct perl_module {
struct module_opt {
char *param; /**< parameter name */
char *value; /**< paramater value */
- LIST_ENTRY (module_opt) next;
};
/**
@@ -144,6 +142,21 @@ struct config_scalar {
} type; /**< type of data */
};
+
+/**
+ * Config params for rspamd worker
+ */
+struct worker_conf {
+ int type; /**< worker type */
+ char *bind_host; /**< bind line */
+ struct in_addr bind_addr; /**< bind address in case of TCP socket */
+ uint16_t bind_port; /**< bind port in case of TCP socket */
+ uint16_t bind_family; /**< bind type (AF_UNIX or AF_INET) */
+ int count; /**< number of workers */
+ GHashTable *params; /**< params for worker */
+ int listen_sock; /**< listening socket desctiptor */
+};
+
/**
* Structure that stores all config data
*/
@@ -158,23 +171,9 @@ struct config_file {
char *profile_path;
#endif
- char *bind_host; /**< bind line */
- struct in_addr bind_addr; /**< bind address in case of TCP socket */
- uint16_t bind_port; /**< bind port in case of TCP socket */
- uint16_t bind_family; /**< bind type (AF_UNIX or AF_INET) */
-
- char *control_host; /**< bind line for controller */
- struct in_addr control_addr; /**< bind address for controller */
- uint16_t control_port; /**< bind port for controller */
- uint16_t control_family; /**< bind family for controller */
- int controller_enabled; /**< whether controller is enabled */
- char *control_password; /**< controller password */
-
gboolean no_fork; /**< if 1 do not call daemon() */
gboolean config_test; /**< if TRUE do only config file test */
gboolean raw_mode; /**< work in raw mode instead of utf one */
- unsigned int workers_number; /**< number of workers */
- unsigned int lmtp_workers_number; /**< number of lmtp workers */
enum rspamd_log_type log_type; /**< log type */
int log_facility; /**< log facility in case of syslog */
@@ -192,13 +191,6 @@ struct config_file {
unsigned int memcached_maxerrors; /**< maximum number of errors */
unsigned int memcached_connect_timeout; /**< connection timeout */
- gboolean lmtp_enable; /**< is lmtp agent is enabled */
- char *lmtp_host; /**< host for lmtp agent */
- struct in_addr lmtp_addr; /**< bind address for lmtp */
- uint16_t lmtp_port; /**< bind port for lmtp agent */
- uint16_t lmtp_family; /**< bind family for lmtp agent */
- char *lmtp_metric; /**< metric to use in lmtp module */
-
gboolean delivery_enable; /**< is delivery agent is enabled */
char *deliver_host; /**< host for mail deliviring */
struct in_addr deliver_addr; /**< its address */
@@ -207,12 +199,13 @@ struct config_file {
char *deliver_agent_path; /**< deliver to pipe instead of socket */
gboolean deliver_lmtp; /**< use LMTP instead of SMTP */
- LIST_HEAD (modulesq, perl_module) perl_modules; /**< linked list of perl modules to load */
+ GList *perl_modules; /**< linked list of perl modules to load */
- LIST_HEAD (headersq, filter) header_filters; /**< linked list of all header's filters */
- LIST_HEAD (mimesq, filter) mime_filters; /**< linked list of all mime filters */
- LIST_HEAD (messagesq, filter) message_filters; /**< linked list of all message's filters */
- LIST_HEAD (urlsq, filter) url_filters; /**< linked list of all url's filters */
+ GList *header_filters; /**< linked list of all header's filters */
+ GList *mime_filters; /**< linked list of all mime filters */
+ GList *message_filters; /**< linked list of all message's filters */
+ GList *url_filters; /**< linked list of all url's filters */
+ GList *workers; /**< linked list of all workers params */
char *header_filters_str; /**< string of header's filters */
char *mime_filters_str; /**< string of mime's filters */
char *message_filters_str; /**< string of message's filters */
@@ -243,7 +236,7 @@ int add_memcached_server (struct config_file *cf, char *str);
* @param type type of credits
* @return 1 if line was successfully parsed and 0 in case of error
*/
-int parse_bind_line (struct config_file *cf, char *str, enum rspamd_cred_type type);
+int parse_bind_line (struct config_file *cfg, struct worker_conf *cf, char *str);
/**
* Init default values
diff --git a/src/cfg_file.l b/src/cfg_file.l
index 876ef9c4a..b758c7eb6 100644
--- a/src/cfg_file.l
+++ b/src/cfg_file.l
@@ -1,6 +1,7 @@
%x incl
%x module
%x lua
+%x worker
%{
@@ -32,10 +33,12 @@ extern struct config_file *cfg;
.include BEGIN(incl);
.module BEGIN(module);
.lua BEGIN(lua);
+worker BEGIN(worker); return WORKER;
composites return COMPOSITES;
tempdir return TEMPDIR;
pidfile return PIDFILE;
-workers return WORKERS;
+
+
error_time return ERROR_TIME;
dead_time return DEAD_TIME;
maxerrors return MAXERRORS;
@@ -43,7 +46,6 @@ reconnect_timeout return RECONNECT_TIMEOUT;
connect_timeout return CONNECT_TIMEOUT;
protocol return PROTOCOL;
memcached return MEMCACHED;
-bind_socket return BINDSOCK;
servers return SERVERS;
require return REQUIRE;
header_filters return HEADER_FILTERS;
@@ -161,6 +163,24 @@ yes|YES|no|NO|[yY]|[nN] yylval.flag=parse_flag(yytext); return FLAG;
<module>[a-zA-Z0-9_%-]+ yylval.string=strdup(yytext); return PARAM;
<module>\".+[^\\]\" yylval.string=strdup(yytext + 1); yylval.string[strlen(yylval.string) - 1] = '\0'; unescape_quotes(yylval.string); return QUOTEDSTRING;
+<worker>\n /* ignore EOL */;
+<worker>[ \t]+ /* ignore whitespace */;
+<worker>[ \t]*#.* /* ignore comments */;
+<worker>\{ return OBRACE;
+<worker>\} BEGIN(INITIAL); return EBRACE;
+<worker>\; return SEMICOLON;
+<worker>= return EQSIGN;
+<worker>type return TYPE;
+<worker>bind_socket return BINDSOCK;
+<worker>count return COUNT;
+<worker>[0-9]+ yylval.number=strtol(yytext, NULL, 10); return NUMBER;
+<worker>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3} yylval.string=strdup(yytext); return IPADDR;
+<worker>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\/[0-9]{1,2} yylval.string=strdup(yytext); return IPNETWORK;
+<worker>[*a-zA-Z0-9.-]+:[0-9]{1,5} yylval.string=strdup(yytext); return HOSTPORT;
+<worker>[a-zA-Z<][a-zA-Z@+>_-]* yylval.string=strdup(yytext); return STRING;
+<worker>\$[a-zA-Z_][a-zA-Z0-9_]+ yylval.string=strdup(yytext + 1); return VARIABLE;
+<worker>\".+[^\\]\" yylval.string=strdup(yytext + 1); yylval.string[strlen(yylval.string) - 1] = '\0'; unescape_quotes(yylval.string); return QUOTEDSTRING;
+
<lua>\n /* ignore EOL */;
<lua>[ \t]+ /* ignore whitespace */;
<lua>[ \t]*#.* /* ignore comments */;
diff --git a/src/cfg_file.y b/src/cfg_file.y
index e4e4d8dbe..0062cdeea 100644
--- a/src/cfg_file.y
+++ b/src/cfg_file.y
@@ -13,15 +13,17 @@
#else
#include "perl.h"
#endif
+#define YYDEBUG 1
extern struct config_file *cfg;
extern int yylineno;
extern char *yytext;
-LIST_HEAD (moduleoptq, module_opt) *cur_module_opt = NULL;
+GList *cur_module_opt = NULL;
struct metric *cur_metric = NULL;
struct statfile *cur_statfile = NULL;
struct statfile_section *cur_section = NULL;
+struct worker_conf *cur_worker = NULL;
%}
@@ -41,14 +43,14 @@ struct statfile_section *cur_section = NULL;
%token MAXSIZE SIZELIMIT SECONDS BEANSTALK MYSQL USER PASSWORD DATABASE
%token TEMPDIR PIDFILE SERVERS ERROR_TIME DEAD_TIME MAXERRORS CONNECT_TIMEOUT PROTOCOL RECONNECT_TIMEOUT
%token READ_SERVERS WRITE_SERVER DIRECTORY_SERVERS MAILBOX_QUERY USERS_QUERY LASTLOGIN_QUERY
-%token MEMCACHED WORKERS REQUIRE MODULE
+%token MEMCACHED WORKER TYPE REQUIRE MODULE
%token MODULE_OPT PARAM VARIABLE
%token HEADER_FILTERS MIME_FILTERS MESSAGE_FILTERS URL_FILTERS FACTORS METRIC NAME
%token REQUIRED_SCORE FUNCTION FRACT COMPOSITES CONTROL PASSWORD
%token LOGGING LOG_TYPE LOG_TYPE_CONSOLE LOG_TYPE_SYSLOG LOG_TYPE_FILE
%token LOG_LEVEL LOG_LEVEL_DEBUG LOG_LEVEL_INFO LOG_LEVEL_WARNING LOG_LEVEL_ERROR LOG_FACILITY LOG_FILENAME
%token STATFILE ALIAS PATTERN WEIGHT STATFILE_POOL_SIZE SIZE TOKENIZER CLASSIFIER
-%token DELIVERY LMTP ENABLED AGENT SECTION LUACODE RAW_MODE PROFILE_FILE
+%token DELIVERY LMTP ENABLED AGENT SECTION LUACODE RAW_MODE PROFILE_FILE COUNT
%type <string> STRING
%type <string> VARIABLE
@@ -71,12 +73,10 @@ file : /* empty */
;
command :
- bindsock
- | control
| tempdir
| pidfile
| memcached
- | workers
+ | worker
| require
| header_filters
| mime_filters
@@ -90,8 +90,6 @@ command :
| logging
| statfile
| statfile_pool_size
- | lmtp
- | delivery
| luacode
| raw_mode
| profile_file
@@ -120,63 +118,6 @@ pidfile :
}
;
-control:
- CONTROL OBRACE controlbody EBRACE
- ;
-
-controlbody:
- controlcmd SEMICOLON
- | controlbody controlcmd SEMICOLON
- ;
-
-controlcmd:
- controlsock
- | controlpassword
- ;
-
-controlsock:
- BINDSOCK EQSIGN bind_cred {
- if (!parse_bind_line (cfg, $3, CRED_CONTROL)) {
- yyerror ("yyparse: parse_bind_line");
- YYERROR;
- }
- cfg->controller_enabled = 1;
- free ($3);
- }
- ;
-controlpassword:
- PASSWORD EQSIGN QUOTEDSTRING {
- cfg->control_password = memory_pool_strdup (cfg->cfg_pool, $3);
- }
- ;
-
-bindsock:
- BINDSOCK EQSIGN bind_cred {
- if (!parse_bind_line (cfg, $3, CRED_NORMAL)) {
- yyerror ("yyparse: parse_bind_line");
- YYERROR;
- }
- free ($3);
- }
- ;
-
-bind_cred:
- STRING {
- $$ = $1;
- }
- | IPADDR{
- $$ = $1;
- }
- | DOMAINNAME {
- $$ = $1;
- }
- | HOSTPORT {
- $$ = $1;
- }
- | QUOTEDSTRING {
- $$ = $1;
- }
- ;
header_filters:
HEADER_FILTERS EQSIGN QUOTEDSTRING {
@@ -283,12 +224,137 @@ memcached_protocol:
}
}
;
-workers:
- WORKERS EQSIGN NUMBER {
- cfg->workers_number = $3;
+
+/* Workers section */
+worker:
+ WORKER OBRACE workerbody EBRACE {
+ cfg->workers = g_list_prepend (cfg->workers, cur_worker);
+ cur_worker = NULL;
+ }
+ ;
+
+workerbody:
+ workercmd SEMICOLON
+ | workerbody workercmd SEMICOLON
+ ;
+
+workercmd:
+ | bindsock
+ | workertype
+ | workercount
+ | workerparam
+ ;
+
+bindsock:
+ BINDSOCK EQSIGN bind_cred {
+ if (cur_worker == NULL) {
+ cur_worker = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct worker_conf));
+ cur_worker->params = g_hash_table_new (g_str_hash, g_str_equal);
+ memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_hash_table_destroy, cur_worker->params);
+#ifdef HAVE_SC_NPROCESSORS_ONLN
+ cur_worker->count = sysconf (_SC_NPROCESSORS_ONLN);
+#else
+ cur_worker->count = DEFAULT_WORKERS_NUM;
+#endif
+ }
+
+ if (!parse_bind_line (cfg, cur_worker, $3)) {
+ yyerror ("yyparse: parse_bind_line");
+ YYERROR;
+ }
+ free ($3);
+ }
+ ;
+
+bind_cred:
+ STRING {
+ $$ = $1;
+ }
+ | IPADDR{
+ $$ = $1;
+ }
+ | DOMAINNAME {
+ $$ = $1;
+ }
+ | HOSTPORT {
+ $$ = $1;
+ }
+ | QUOTEDSTRING {
+ $$ = $1;
+ }
+ ;
+
+workertype:
+ TYPE EQSIGN QUOTEDSTRING {
+ if (cur_worker == NULL) {
+ cur_worker = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct worker_conf));
+ cur_worker->params = g_hash_table_new (g_str_hash, g_str_equal);
+ memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_hash_table_destroy, cur_worker->params);
+#ifdef HAVE_SC_NPROCESSORS_ONLN
+ cur_worker->count = sysconf (_SC_NPROCESSORS_ONLN);
+#else
+ cur_worker->count = DEFAULT_WORKERS_NUM;
+#endif
+ }
+
+ if (g_ascii_strcasecmp ($3, "normal") == 0) {
+ cur_worker->type = TYPE_WORKER;
+ }
+ else if (g_ascii_strcasecmp ($3, "controller") == 0) {
+ cur_worker->type = TYPE_CONTROLLER;
+ }
+ else if (g_ascii_strcasecmp ($3, "lmtp") == 0) {
+ cur_worker->type = TYPE_LMTP;
+ }
+ else if (g_ascii_strcasecmp ($3, "fuzzy") == 0) {
+ cur_worker->type = TYPE_FUZZY;
+ }
+ else {
+ yyerror ("yyparse: unknown worker type: %s", $3);
+ YYERROR;
+ }
+ }
+ ;
+
+workercount:
+ COUNT EQSIGN NUMBER {
+ if (cur_worker == NULL) {
+ cur_worker = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct worker_conf));
+ cur_worker->params = g_hash_table_new (g_str_hash, g_str_equal);
+ memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_hash_table_destroy, cur_worker->params);
+#ifdef HAVE_SC_NPROCESSORS_ONLN
+ cur_worker->count = sysconf (_SC_NPROCESSORS_ONLN);
+#else
+ cur_worker->count = DEFAULT_WORKERS_NUM;
+#endif
+ }
+
+ if ($3 > 0) {
+ cur_worker->count = $3;
+ }
+ else {
+ yyerror ("yyparse: invalid number of workers: %d", $3);
+ YYERROR;
+ }
}
;
+workerparam:
+ STRING EQSIGN QUOTEDSTRING {
+ if (cur_worker == NULL) {
+ cur_worker = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct worker_conf));
+ cur_worker->params = g_hash_table_new (g_str_hash, g_str_equal);
+ memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_hash_table_destroy, cur_worker->params);
+#ifdef HAVE_SC_NPROCESSORS_ONLN
+ cur_worker->count = sysconf (_SC_NPROCESSORS_ONLN);
+#else
+ cur_worker->count = DEFAULT_WORKERS_NUM;
+#endif
+ }
+
+ g_hash_table_insert (cur_worker->params, $1, $3);
+ }
+
metric:
METRIC OBRACE metricbody EBRACE {
if (cur_metric == NULL || cur_metric->name == NULL) {
@@ -411,7 +477,7 @@ requirecmd:
YYERROR;
}
cur->path = $3;
- LIST_INSERT_HEAD (&cfg->perl_modules, cur, next);
+ cfg->perl_modules = g_list_prepend (cfg->perl_modules, cur);
#else
yyerror ("require command is not available when perl support is not compiled");
YYERROR;
@@ -453,14 +519,10 @@ moduleoptbody:
optcmd:
PARAM EQSIGN QUOTEDSTRING {
struct module_opt *mopt;
- if (cur_module_opt == NULL) {
- cur_module_opt = g_malloc (sizeof (cur_module_opt));
- LIST_INIT (cur_module_opt);
- }
mopt = memory_pool_alloc (cfg->cfg_pool, sizeof (struct module_opt));
mopt->param = $1;
mopt->value = $3;
- LIST_INSERT_HEAD (cur_module_opt, mopt, next);
+ cur_module_opt = g_list_prepend (cur_module_opt, mopt);
}
| VARIABLE EQSIGN QUOTEDSTRING {
g_hash_table_insert (cfg->variables, $1, $3);
@@ -749,89 +811,6 @@ statfile_pool_size:
}
;
-lmtp:
- LMTP OBRACE lmtpbody EBRACE
- ;
-
-lmtpbody:
- lmtpcmd SEMICOLON
- | lmtpbody lmtpcmd SEMICOLON
- ;
-
-lmtpcmd:
- lmtpenabled
- | lmtpsock
- | lmtpmetric
- | lmtpworkers
- ;
-
-lmtpenabled:
- ENABLED EQSIGN FLAG {
- cfg->lmtp_enable = $3;
- }
- ;
-
-lmtpsock:
- BINDSOCK EQSIGN bind_cred {
- if (!parse_bind_line (cfg, $3, CRED_LMTP)) {
- yyerror ("yyparse: parse_bind_line");
- YYERROR;
- }
- free ($3);
- }
- ;
-lmtpmetric:
- METRIC EQSIGN QUOTEDSTRING {
- cfg->lmtp_metric = memory_pool_strdup (cfg->cfg_pool, $3);
- }
- ;
-lmtpworkers:
- WORKERS EQSIGN NUMBER {
- cfg->lmtp_workers_number = $3;
- }
- ;
-
-delivery:
- DELIVERY OBRACE deliverybody EBRACE
- ;
-
-deliverybody:
- deliverycmd SEMICOLON
- | deliverybody deliverycmd SEMICOLON
- ;
-
-deliverycmd:
- deliveryenabled
- | deliverysock
- | deliveryagent
- | deliverylmtp
- ;
-
-deliveryenabled:
- ENABLED EQSIGN FLAG {
- cfg->delivery_enable = $3;
- }
- ;
-
-deliverysock:
- BINDSOCK EQSIGN bind_cred {
- if (!parse_bind_line (cfg, $3, CRED_DELIVERY)) {
- yyerror ("yyparse: parse_bind_line");
- YYERROR;
- }
- free ($3);
- }
- ;
-deliverylmtp:
- LMTP EQSIGN FLAG {
- cfg->deliver_lmtp = $3;
- }
- ;
-deliveryagent:
- AGENT EQSIGN QUOTEDSTRING {
- cfg->deliver_agent_path = memory_pool_strdup (cfg->cfg_pool, $3);
- }
- ;
luacode:
LUACODE
diff --git a/src/cfg_utils.c b/src/cfg_utils.c
index 79fabdb50..afb15a652 100644
--- a/src/cfg_utils.c
+++ b/src/cfg_utils.c
@@ -83,7 +83,7 @@ add_memcached_server (struct config_file *cf, char *str)
}
int
-parse_bind_line (struct config_file *cf, char *str, enum rspamd_cred_type type)
+parse_bind_line (struct config_file *cfg, struct worker_conf *cf, char *str)
{
char *cur_tok, *err_str;
struct hostent *hent;
@@ -95,43 +95,18 @@ parse_bind_line (struct config_file *cf, char *str, enum rspamd_cred_type type)
if (str == NULL) return 0;
cur_tok = strsep (&str, ":");
- switch (type) {
- case CRED_NORMAL:
- host = &cf->bind_host;
- port = &cf->bind_port;
- *port = DEFAULT_BIND_PORT;
- family = &cf->bind_family;
- addr = &cf->bind_addr;
- break;
- case CRED_CONTROL:
- host = &cf->control_host;
- port = &cf->control_port;
- *port = DEFAULT_CONTROL_PORT;
- family = &cf->control_family;
- addr = &cf->control_addr;
- break;
- case CRED_LMTP:
- host = &cf->lmtp_host;
- port = &cf->lmtp_port;
- *port = DEFAULT_LMTP_PORT;
- family = &cf->lmtp_family;
- addr = &cf->lmtp_addr;
- break;
- case CRED_DELIVERY:
- host = &cf->deliver_host;
- port = &cf->deliver_port;
- *port = 25;
- family = &cf->deliver_family;
- addr = &cf->deliver_addr;
- break;
- }
+ host = &cf->bind_host;
+ port = &cf->bind_port;
+ *port = DEFAULT_BIND_PORT;
+ family = &cf->bind_family;
+ addr = &cf->bind_addr;
if (cur_tok[0] == '/' || cur_tok[0] == '.') {
#ifdef HAVE_DIRNAME
/* Try to check path of bind credit */
struct stat st;
int fd;
- char *copy = memory_pool_strdup (cf->cfg_pool, cur_tok);
+ char *copy = memory_pool_strdup (cfg->cfg_pool, cur_tok);
if (stat (copy, &st) == -1) {
if (errno == ENOENT) {
if ((fd = open (cur_tok, O_RDWR | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR)) == -1) {
@@ -155,7 +130,7 @@ parse_bind_line (struct config_file *cf, char *str, enum rspamd_cred_type type)
}
}
#endif
- *host = memory_pool_strdup (cf->cfg_pool, cur_tok);
+ *host = memory_pool_strdup (cfg->cfg_pool, cur_tok);
*family = AF_UNIX;
return 1;
@@ -168,7 +143,7 @@ parse_bind_line (struct config_file *cf, char *str, enum rspamd_cred_type type)
}
}
if (strcmp (cur_tok, "*") == 0) {
- *host = memory_pool_strdup (cf->cfg_pool, cur_tok);
+ *host = memory_pool_strdup (cfg->cfg_pool, cur_tok);
addr->s_addr = htonl (INADDR_ANY);
} else if (!inet_aton (cur_tok, addr)) {
/* Try to call gethostbyname */
@@ -177,13 +152,13 @@ parse_bind_line (struct config_file *cf, char *str, enum rspamd_cred_type type)
return 0;
}
else {
- *host = memory_pool_strdup (cf->cfg_pool, cur_tok);
+ *host = memory_pool_strdup (cfg->cfg_pool, cur_tok);
memcpy((char *)addr, hent->h_addr, sizeof(struct in_addr));
s = strlen (cur_tok) + 1;
}
}
else {
- *host = memory_pool_strdup (cf->cfg_pool, cur_tok);
+ *host = memory_pool_strdup (cfg->cfg_pool, cur_tok);
}
*family = AF_INET;
@@ -203,11 +178,7 @@ init_defaults (struct config_file *cfg)
cfg->memcached_maxerrors = DEFAULT_UPSTREAM_MAXERRORS;
cfg->memcached_protocol = TCP_TEXT;
-#ifdef HAVE_SC_NPROCESSORS_ONLN
- cfg->workers_number = sysconf (_SC_NPROCESSORS_ONLN);
-#else
- cfg->workers_number = DEFAULT_WORKERS_NUM;
-#endif
+
cfg->max_statfile_size = DEFAULT_STATFILE_SIZE;
cfg->modules_opts = g_hash_table_new (g_str_hash, g_str_equal);
cfg->variables = g_hash_table_new (g_str_hash, g_str_equal);
@@ -217,7 +188,6 @@ init_defaults (struct config_file *cfg)
cfg->composite_symbols = g_hash_table_new (g_str_hash, g_str_equal);
cfg->statfiles = g_hash_table_new (g_str_hash, g_str_equal);
cfg->cfg_params = g_hash_table_new (g_str_hash, g_str_equal);
- cfg->lmtp_metric = "default";
def_metric = memory_pool_alloc (cfg->cfg_pool, sizeof (struct metric));
def_metric->name = "default";
@@ -227,7 +197,6 @@ init_defaults (struct config_file *cfg)
def_metric->classifier = get_classifier ("winnow");
g_hash_table_insert (cfg->metrics, "default", def_metric);
- LIST_INIT (&cfg->perl_modules);
}
void
@@ -255,18 +224,20 @@ free_config (struct config_file *cfg)
char*
get_module_opt (struct config_file *cfg, char *module_name, char *opt_name)
{
- LIST_HEAD (moduleoptq, module_opt) *cur_module_opt = NULL;
+ GList *cur_opt;
struct module_opt *cur;
- cur_module_opt = g_hash_table_lookup (cfg->modules_opts, module_name);
- if (cur_module_opt == NULL) {
+ cur_opt = g_hash_table_lookup (cfg->modules_opts, module_name);
+ if (cur_opt == NULL) {
return NULL;
}
-
- LIST_FOREACH (cur, cur_module_opt, next) {
+
+ while (cur_opt) {
+ cur = cur_opt->data;
if (strcmp (cur->param, opt_name) == 0) {
return cur->value;
}
+ cur_opt = g_list_next (cur_opt);
}
return NULL;
@@ -408,13 +379,15 @@ static void
substitute_module_variables (gpointer key, gpointer value, gpointer data)
{
struct config_file *cfg = (struct config_file *)data;
- LIST_HEAD (moduleoptq, module_opt) *cur_module_opt = (struct moduleoptq *)value;
- struct module_opt *cur, *tmp;
+ GList *cur_opt = (GList *)value;
+ struct module_opt *cur;
- LIST_FOREACH_SAFE (cur, cur_module_opt, next, tmp) {
+ while (cur_opt) {
+ cur = cur_opt->data;
if (cur->value) {
cur->value = substitute_variable (cfg, NULL, cur->value, 1);
}
+ cur_opt = g_list_next (cur_opt);
}
}
@@ -455,19 +428,19 @@ parse_filters_str (struct config_file *cfg, const char *str, enum script_type ty
switch (type) {
case SCRIPT_HEADER:
cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p);
- LIST_INSERT_HEAD (&cfg->header_filters, cur, next);
+ cfg->header_filters = g_list_prepend (cfg->header_filters, cur);
break;
case SCRIPT_MIME:
cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p);
- LIST_INSERT_HEAD (&cfg->mime_filters, cur, next);
+ cfg->mime_filters = g_list_prepend (cfg->mime_filters, cur);
break;
case SCRIPT_MESSAGE:
cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p);
- LIST_INSERT_HEAD (&cfg->message_filters, cur, next);
+ cfg->message_filters = g_list_prepend (cfg->message_filters, cur);
break;
case SCRIPT_URL:
cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p);
- LIST_INSERT_HEAD (&cfg->url_filters, cur, next);
+ cfg->url_filters = g_list_prepend (cfg->url_filters, cur);
break;
}
break;
@@ -483,19 +456,19 @@ parse_filters_str (struct config_file *cfg, const char *str, enum script_type ty
switch (type) {
case SCRIPT_HEADER:
cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p);
- LIST_INSERT_HEAD (&cfg->header_filters, cur, next);
+ cfg->header_filters = g_list_prepend (cfg->header_filters, cur);
break;
case SCRIPT_MIME:
cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p);
- LIST_INSERT_HEAD (&cfg->mime_filters, cur, next);
+ cfg->mime_filters = g_list_prepend (cfg->mime_filters, cur);
break;
case SCRIPT_MESSAGE:
cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p);
- LIST_INSERT_HEAD (&cfg->message_filters, cur, next);
+ cfg->message_filters = g_list_prepend (cfg->message_filters, cur);
break;
case SCRIPT_URL:
cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p);
- LIST_INSERT_HEAD (&cfg->url_filters, cur, next);
+ cfg->url_filters = g_list_prepend (cfg->message_filters, cur);
break;
}
p ++;
@@ -523,27 +496,9 @@ fill_cfg_params (struct config_file *cfg)
scalars[2].type = SCALAR_TYPE_STR;
scalars[2].pointer = &cfg->temp_dir;
g_hash_table_insert (cfg->cfg_params, "temp_dir", &scalars[2]);
- scalars[3].type = SCALAR_TYPE_STR;
- scalars[3].pointer = &cfg->bind_host;
- g_hash_table_insert (cfg->cfg_params, "bind_host", &scalars[3]);
- scalars[4].type = SCALAR_TYPE_STR;
- scalars[4].pointer = &cfg->control_host;
- g_hash_table_insert (cfg->cfg_params, "control_host", &scalars[4]);
- scalars[5].type = SCALAR_TYPE_INT;
- scalars[5].pointer = &cfg->controller_enabled;
- g_hash_table_insert (cfg->cfg_params, "controller_enabled", &scalars[5]);
- scalars[6].type = SCALAR_TYPE_STR;
- scalars[6].pointer = &cfg->control_password;
- g_hash_table_insert (cfg->cfg_params, "control_password", &scalars[6]);
- scalars[7].type = SCALAR_TYPE_INT;
- scalars[7].pointer = &cfg->no_fork;
- g_hash_table_insert (cfg->cfg_params, "no_fork", &scalars[7]);
- scalars[8].type = SCALAR_TYPE_UINT;
- scalars[8].pointer = &cfg->workers_number;
- g_hash_table_insert (cfg->cfg_params, "workers_number", &scalars[8]);
- scalars[9].type = SCALAR_TYPE_SIZE;
- scalars[9].pointer = &cfg->max_statfile_size;
- g_hash_table_insert (cfg->cfg_params, "max_statfile_size", &scalars[9]);
+ scalars[3].type = SCALAR_TYPE_SIZE;
+ scalars[3].pointer = &cfg->max_statfile_size;
+ g_hash_table_insert (cfg->cfg_params, "max_statfile_size", &scalars[3]);
}
@@ -555,11 +510,6 @@ post_load_config (struct config_file *cfg)
{
struct timespec ts;
- if (cfg->lmtp_enable && !cfg->delivery_enable) {
- yywarn ("post_load_config: lmtp is enabled, but delivery is not enabled, disabling lmtp");
- cfg->lmtp_enable = FALSE;
- }
-
g_hash_table_foreach (cfg->variables, substitute_all_variables, cfg);
g_hash_table_foreach (cfg->modules_opts, substitute_module_variables, cfg);
parse_filters_str (cfg, cfg->header_filters_str, SCRIPT_HEADER);
diff --git a/src/controller.c b/src/controller.c
index f20b19ee7..e45075f9e 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -174,6 +174,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
struct statfile *statfile;
struct metric *metric;
memory_pool_stat_t mem_st;
+ char *password = g_hash_table_lookup (session->worker->cf->params, "password");
switch (cmd->type) {
case COMMAND_PASSWORD:
@@ -184,7 +185,12 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
return;
}
- if (strncmp (arg, session->cfg->control_password, strlen (arg)) == 0) {
+ if (password == NULL) {
+ r = snprintf (out_buf, sizeof (out_buf), "password command disabled in config, authorized access unallowed" CRLF);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
+ return;
+ }
+ if (strncmp (arg, password, strlen (arg)) == 0) {
session->authorized = 1;
r = snprintf (out_buf, sizeof (out_buf), "password accepted" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
@@ -542,14 +548,12 @@ void
start_controller (struct rspamd_worker *worker)
{
struct sigaction signals;
- int listen_sock, i;
- struct sockaddr_un *un_addr;
+ int i;
GList *comp_list = NULL;
char *hostbuf;
long int hostmax;
worker->srv->pid = getpid ();
- worker->srv->type = TYPE_CONTROLLER;
event_init ();
g_mime_init (0);
@@ -560,27 +564,9 @@ start_controller (struct rspamd_worker *worker)
signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker);
signal_add (&worker->sig_ev, NULL);
- if (worker->srv->cfg->control_family == AF_INET) {
- if ((listen_sock = make_tcp_socket (&worker->srv->cfg->control_addr, worker->srv->cfg->control_port, TRUE)) == -1) {
- msg_err ("start_controller: cannot create tcp listen socket. %s", strerror (errno));
- exit(-errno);
- }
- }
- else {
- un_addr = (struct sockaddr_un *) alloca (sizeof (struct sockaddr_un));
- if (!un_addr || (listen_sock = make_unix_socket (worker->srv->cfg->control_host, un_addr, TRUE)) == -1) {
- msg_err ("start_controller: cannot create unix listen socket. %s", strerror (errno));
- exit(-errno);
- }
- }
start_time = time (NULL);
- if (listen (listen_sock, -1) == -1) {
- msg_err ("start_controller: cannot listen on socket. %s", strerror (errno));
- exit(-errno);
- }
-
/* Init command completion */
for (i = 0; i < G_N_ELEMENTS (commands); i ++) {
comp_list = g_list_prepend (comp_list, &commands[i]);
@@ -594,7 +580,7 @@ start_controller (struct rspamd_worker *worker)
hostbuf[hostmax - 1] = '\0';
snprintf (greetingbuf, sizeof (greetingbuf), "Rspamd version %s is running on %s" CRLF, RVERSION, hostbuf);
/* Accept event */
- event_set(&worker->bind_ev, listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+ event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
event_add(&worker->bind_ev, NULL);
/* Send SIGUSR2 to parent */
@@ -604,7 +590,6 @@ start_controller (struct rspamd_worker *worker)
io_tv.tv_usec = 0;
event_loop (0);
- close (listen_sock);
exit (EXIT_SUCCESS);
}
diff --git a/src/filter.c b/src/filter.c
index d06987b97..b34a88003 100644
--- a/src/filter.c
+++ b/src/filter.c
@@ -232,56 +232,61 @@ metric_process_callback (gpointer key, gpointer value, void *data)
static int
continue_process_filters (struct worker_task *task)
{
- struct filter *cur = task->save.entry;
+ GList *cur = task->save.entry;
+ struct filter *filt = cur->data;
- cur = LIST_NEXT (cur, next);
+ cur = g_list_next (cur);
/* Note: no breaks in this case! */
switch (task->save.type) {
case SCRIPT_HEADER:
while (cur) {
- call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_HEADER);
+ filt = cur->data;
+ call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_HEADER);
if (task->save.saved) {
task->save.entry = cur;
task->save.type = SCRIPT_HEADER;
return 0;
}
- cur = LIST_NEXT (cur, next);
+ cur = g_list_next (cur);
}
/* Process mime filters */
- cur = LIST_FIRST (&task->worker->srv->cfg->mime_filters);
+ cur = g_list_first (task->worker->srv->cfg->mime_filters);
case SCRIPT_MIME:
while (cur) {
- call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_MIME);
+ filt = cur->data;
+ call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_MIME);
if (task->save.saved) {
task->save.entry = cur;
task->save.type = SCRIPT_MIME;
return 0;
}
- cur = LIST_NEXT (cur, next);
+ cur = g_list_next (cur);
}
/* Process url filters */
- cur = LIST_FIRST (&task->worker->srv->cfg->url_filters);
+ cur = g_list_first (task->worker->srv->cfg->url_filters);
case SCRIPT_URL:
while (cur) {
- call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_URL);
+ filt = cur->data;
+ call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_URL);
if (task->save.saved) {
task->save.entry = cur;
task->save.type = SCRIPT_URL;
return 0;
}
- cur = LIST_NEXT (cur, next);
+ cur = g_list_next (cur);
}
/* Process message filters */
- cur = LIST_FIRST (&task->worker->srv->cfg->message_filters);
+ cur = g_list_first (task->worker->srv->cfg->message_filters);
case SCRIPT_MESSAGE:
while (cur) {
- call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_MESSAGE);
+ filt = cur->data;
+ call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_MESSAGE);
if (task->save.saved) {
task->save.entry = cur;
task->save.type = SCRIPT_MESSAGE;
return 0;
}
- cur = LIST_NEXT (cur, next);
+ cur = g_list_next (cur);
}
/* Process all statfiles */
process_statfiles (task);
@@ -296,7 +301,8 @@ continue_process_filters (struct worker_task *task)
int
process_filters (struct worker_task *task)
{
- struct filter *cur;
+ GList *cur;
+ struct filter *filt;
if (task->save.saved) {
task->save.saved = 0;
@@ -304,40 +310,52 @@ process_filters (struct worker_task *task)
}
/* Process filters in order that they are listed in config file */
- LIST_FOREACH (cur, &task->worker->srv->cfg->header_filters, next) {
- call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_HEADER);
+ cur = task->worker->srv->cfg->header_filters;
+ while (cur) {
+ filt = cur->data;
+ call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_HEADER);
if (task->save.saved) {
task->save.entry = cur;
task->save.type = SCRIPT_HEADER;
return 0;
}
+ cur = g_list_next (cur);
}
- LIST_FOREACH (cur, &task->worker->srv->cfg->mime_filters, next) {
- call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_MIME);
+ cur = task->worker->srv->cfg->mime_filters;
+ while (cur) {
+ filt = cur->data;
+ call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_MIME);
if (task->save.saved) {
task->save.entry = cur;
task->save.type = SCRIPT_MIME;
return 0;
}
+ cur = g_list_next (cur);
}
- LIST_FOREACH (cur, &task->worker->srv->cfg->url_filters, next) {
- call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_URL);
+ cur = task->worker->srv->cfg->url_filters;
+ while (cur) {
+ filt = cur->data;
+ call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_URL);
if (task->save.saved) {
task->save.entry = cur;
task->save.type = SCRIPT_URL;
return 0;
}
+ cur = g_list_next (cur);
}
- LIST_FOREACH (cur, &task->worker->srv->cfg->message_filters, next) {
- call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_MESSAGE);
+ cur = task->worker->srv->cfg->message_filters;
+ while (cur) {
+ filt = cur->data;
+ call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_MESSAGE);
if (task->save.saved) {
task->save.entry = cur;
task->save.type = SCRIPT_MESSAGE;
return 0;
}
+ cur = g_list_next (cur);
}
/* Process all metrics */
diff --git a/src/filter.h b/src/filter.h
index 4e51bf2a7..c460ec317 100644
--- a/src/filter.h
+++ b/src/filter.h
@@ -21,7 +21,6 @@ enum filter_type { C_FILTER, PERL_FILTER };
struct filter {
char *func_name; /**< function name */
enum filter_type type; /**< filter type (c or perl) */
- LIST_ENTRY (filter) next; /**< chain link */
};
/**
diff --git a/src/lmtp.c b/src/lmtp.c
index 5f2878a08..431083657 100644
--- a/src/lmtp.c
+++ b/src/lmtp.c
@@ -253,7 +253,7 @@ accept_socket (int fd, short what, void *arg)
* Start lmtp worker process
*/
void
-start_lmtp_worker (struct rspamd_worker *worker, int listen_sock)
+start_lmtp_worker (struct rspamd_worker *worker)
{
struct sigaction signals;
int i;
@@ -273,7 +273,7 @@ start_lmtp_worker (struct rspamd_worker *worker, int listen_sock)
signal_add (&worker->sig_ev, NULL);
/* Accept event */
- event_set(&worker->bind_ev, listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+ event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
event_add(&worker->bind_ev, NULL);
/* Perform modules configuring */
diff --git a/src/lmtp.h b/src/lmtp.h
index b784eed3b..d7c13c497 100644
--- a/src/lmtp.h
+++ b/src/lmtp.h
@@ -15,6 +15,6 @@
#define LMTP_NO_RCPT 554
#define LMTP_TEMP_FAIL 421
-void start_lmtp_worker (struct rspamd_worker *worker, int listen_sock);
+void start_lmtp_worker (struct rspamd_worker *worker);
#endif
diff --git a/src/main.c b/src/main.c
index 9b8992d8b..dc0226680 100644
--- a/src/main.c
+++ b/src/main.c
@@ -51,7 +51,7 @@ struct config_file *cfg;
rspamd_hash_t *counters;
static void sig_handler (int );
-static struct rspamd_worker * fork_worker (struct rspamd_main *, int, enum process_type);
+static struct rspamd_worker * fork_worker (struct rspamd_main *, struct worker_conf *);
sig_atomic_t do_restart;
sig_atomic_t do_terminate;
@@ -269,7 +269,7 @@ reread_config (struct rspamd_main *rspamd)
}
static struct rspamd_worker *
-fork_worker (struct rspamd_main *rspamd, int listen_sock, enum process_type type)
+fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf)
{
struct rspamd_worker *cur;
/* Starting worker process */
@@ -278,12 +278,14 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, enum process_type type
bzero (cur, sizeof (struct rspamd_worker));
TAILQ_INSERT_HEAD (&rspamd->workers, cur, next);
cur->srv = rspamd;
- cur->type = type;
+ cur->type = cf->type;
cur->pid = fork();
+ cur->cf = cf;
switch (cur->pid) {
case 0:
- /* TODO: add worker code */
- switch (type) {
+ /* Drop privilleges */
+ drop_priv (cfg);
+ switch (cf->type) {
case TYPE_CONTROLLER:
setproctitle ("controller process");
pidfile_close (rspamd->pfh);
@@ -294,13 +296,13 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, enum process_type type
setproctitle ("lmtp process");
pidfile_close (rspamd->pfh);
msg_info ("fork_worker: starting lmtp process %d", getpid ());
- start_lmtp_worker (cur, listen_sock);
+ start_lmtp_worker (cur);
case TYPE_WORKER:
default:
setproctitle ("worker process");
pidfile_close (rspamd->pfh);
msg_info ("fork_worker: starting worker process %d", getpid ());
- start_worker (cur, listen_sock);
+ start_worker (cur);
break;
}
break;
@@ -316,35 +318,27 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, enum process_type type
}
static void
-delay_fork (enum process_type type)
+delay_fork (struct worker_conf *cf)
{
- workers_pending = g_list_prepend (workers_pending, GINT_TO_POINTER (type));
+ workers_pending = g_list_prepend (workers_pending, cf);
(void)alarm (SOFT_FORK_TIME);
}
-static void
-fork_delayed (struct rspamd_main *rspamd, int listen_sock)
-{
- GList *cur;
-
- while (workers_pending != NULL) {
- cur = workers_pending;
- workers_pending = g_list_remove_link (workers_pending, cur);
- fork_worker (rspamd, listen_sock, GPOINTER_TO_INT (cur->data));
- g_list_free_1 (cur);
- }
-}
static void
dump_module_variables (gpointer key, gpointer value, gpointer data)
-{
- LIST_HEAD (moduleoptq, module_opt) *cur_module_opt = (struct moduleoptq *)value;
- struct module_opt *cur, *tmp;
+{
+ GList *cur_opt;
+ struct module_opt *cur;
+
+ cur_opt = (GList *)value;
- LIST_FOREACH_SAFE (cur, cur_module_opt, next, tmp) {
+ while (cur_opt) {
+ cur = cur_opt->data;
if (cur->value) {
printf ("$%s = \"%s\"\n", cur->param, cur->value);
}
+ cur_opt = g_list_next (cur_opt);
}
}
@@ -388,12 +382,56 @@ create_listen_socket (struct in_addr *addr, int port, int family, char *path)
return listen_sock;
}
+static void
+fork_delayed (struct rspamd_main *rspamd)
+{
+ GList *cur;
+ struct worker_conf *cf;
+
+ while (workers_pending != NULL) {
+ cur = workers_pending;
+ cf = cur->data;
+
+ workers_pending = g_list_remove_link (workers_pending, cur);
+ fork_worker (rspamd, cf);
+ g_list_free_1 (cur);
+ }
+}
+
+static void
+spawn_workers (struct rspamd_main *rspamd)
+{
+ GList *cur;
+ struct worker_conf *cf;
+ int i, listen_sock;
+
+ cur = cfg->workers;
+
+ while (cur) {
+ cf = cur->data;
+
+ /* Create listen socket */
+ listen_sock = create_listen_socket (&cf->bind_addr, cf->bind_port,
+ cf->bind_family, cf->bind_host);
+ if (listen_sock == -1) {
+ exit(-errno);
+ }
+ cf->listen_sock = listen_sock;
+
+ for (i = 0; i < cf->count; i++) {
+ fork_worker (rspamd, cf);
+ }
+
+ cur = g_list_next (cur);
+ }
+}
+
int
main (int argc, char **argv, char **env)
{
struct rspamd_main *rspamd;
struct module_ctx *cur_module = NULL;
- int res = 0, i, listen_sock, lmtp_listen_sock;
+ int res = 0, i;
struct sigaction signals;
struct rspamd_worker *cur, *cur_tmp, *active_worker;
struct rlimit rlim;
@@ -498,23 +536,6 @@ main (int argc, char **argv, char **env)
}
- /* Create listen socket */
- listen_sock = create_listen_socket (&rspamd->cfg->bind_addr, rspamd->cfg->bind_port,
- rspamd->cfg->bind_family, rspamd->cfg->bind_host);
- if (listen_sock == -1) {
- exit(-errno);
- }
-
- if (cfg->lmtp_enable) {
- lmtp_listen_sock = create_listen_socket (&rspamd->cfg->lmtp_addr, rspamd->cfg->lmtp_port,
- rspamd->cfg->lmtp_family, rspamd->cfg->lmtp_host);
- if (listen_sock == -1) {
- exit(-errno);
- }
- }
-
- /* Drop privilleges */
- drop_priv (cfg);
/* Set stack size for pcre */
getrlimit(RLIMIT_STACK, &rlim);
@@ -589,20 +610,7 @@ main (int argc, char **argv, char **env)
modules[i].module_config_func (cfg);
}
- for (i = 0; i < cfg->workers_number; i++) {
- fork_worker (rspamd, listen_sock, TYPE_WORKER);
- }
- /* Start controller if enabled */
- if (cfg->controller_enabled) {
- fork_worker (rspamd, listen_sock, TYPE_CONTROLLER);
- }
-
- /* Start lmtp if enabled */
- if (cfg->lmtp_enable) {
- for (i = 0; i < cfg->lmtp_workers_number; i++) {
- fork_worker (rspamd, lmtp_listen_sock, TYPE_LMTP);
- }
- }
+ spawn_workers (rspamd);
/* Signal processing cycle */
for (;;) {
@@ -632,7 +640,7 @@ main (int argc, char **argv, char **env)
(cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid);
/* But respawn controller */
if (cur->type == TYPE_CONTROLLER) {
- fork_worker (rspamd, listen_sock, TYPE_CONTROLLER);
+ fork_worker (rspamd, cur->cf);
}
}
else {
@@ -646,7 +654,7 @@ main (int argc, char **argv, char **env)
(cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid);
}
/* Fork another worker in replace of dead one */
- delay_fork (cur->type);
+ delay_fork (cur->cf);
}
g_free (cur);
}
@@ -662,7 +670,7 @@ main (int argc, char **argv, char **env)
TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) {
if (cur->type == TYPE_WORKER || cur->type == TYPE_LMTP) {
/* Start new workers that would reread configuration */
- active_worker = fork_worker (rspamd, listen_sock, cur->type);
+ active_worker = fork_worker (rspamd, cur->cf);
}
/* Immideately send termination request to conroller and wait for SIGCHLD */
if (cur->type == TYPE_CONTROLLER) {
@@ -690,7 +698,7 @@ main (int argc, char **argv, char **env)
}
if (got_alarm) {
got_alarm = 0;
- fork_delayed (rspamd, listen_sock);
+ fork_delayed (rspamd);
}
}
@@ -705,10 +713,6 @@ main (int argc, char **argv, char **env)
msg_info ("main: terminating...");
- if (rspamd->cfg->bind_family == AF_UNIX) {
- unlink (rspamd->cfg->bind_host);
- }
-
free_config (rspamd->cfg);
g_free (rspamd->cfg);
g_free (rspamd);
diff --git a/src/main.h b/src/main.h
index c356823b8..c653964b7 100644
--- a/src/main.h
+++ b/src/main.h
@@ -38,6 +38,7 @@
#endif
#define CRLF "\r\n"
+
/**
* Process type: main or worker
*/
@@ -46,6 +47,7 @@ enum process_type {
TYPE_WORKER,
TYPE_CONTROLLER,
TYPE_LMTP,
+ TYPE_FUZZY,
};
/**
@@ -70,6 +72,7 @@ struct rspamd_worker {
enum process_type type; /**< process type */
struct event sig_ev; /**< signals event */
struct event bind_ev; /**< socket events */
+ struct worker_conf *cf; /**< worker config data */
TAILQ_ENTRY (rspamd_worker) next; /**< chain link to next worker */
};
@@ -118,8 +121,8 @@ struct counter_data {
* Save point object for delayed filters processing
*/
struct save_point {
- void *entry; /**< pointer to C function or perl function name */
- enum script_type type; /**< where we did stop */
+ GList *entry; /**< pointer to saved filter */
+ enum script_type type;
unsigned int saved; /**< how much time we have delayed processing */
};
@@ -216,7 +219,7 @@ struct c_module {
LIST_ENTRY (c_module) next; /**< linked list */
};
-void start_worker (struct rspamd_worker *worker, int listen_sock);
+void start_worker (struct rspamd_worker *worker);
void start_controller (struct rspamd_worker *worker);
/**
diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c
index 7095c3041..5a1a9cd27 100644
--- a/src/plugins/regexp.c
+++ b/src/plugins/regexp.c
@@ -151,7 +151,7 @@ parse_autolearn_param (const char *param, const char *value, struct config_file
int
regexp_module_config (struct config_file *cfg)
{
- LIST_HEAD (moduleoptq, module_opt) *cur_module_opt = NULL;
+ GList *cur_opt = NULL;
struct module_opt *cur;
struct regexp_module_item *cur_item;
char *value;
@@ -172,24 +172,24 @@ regexp_module_config (struct config_file *cfg)
regexp_module_ctx->statfile_prefix = DEFAULT_STATFILE_PREFIX;
}
- cur_module_opt = g_hash_table_lookup (cfg->modules_opts, "regexp");
- if (cur_module_opt != NULL) {
- LIST_FOREACH (cur, cur_module_opt, next) {
- if (strcmp (cur->param, "metric") == 0 || strcmp (cur->param, "statfile_prefix") == 0) {
- continue;
- }
- else if (g_ascii_strncasecmp (cur->param, "autolearn", sizeof ("autolearn") - 1) == 0) {
- parse_autolearn_param (cur->param, cur->value, cfg);
- continue;
- }
- cur_item = memory_pool_alloc0 (regexp_module_ctx->regexp_pool, sizeof (struct regexp_module_item));
- cur_item->symbol = cur->param;
- if (!read_regexp_expression (regexp_module_ctx->regexp_pool, cur_item, cur->param, cur->value, cfg)) {
- res = FALSE;
- }
- set_counter (cur_item->symbol, 0);
- regexp_module_ctx->items = g_list_prepend (regexp_module_ctx->items, cur_item);
+ cur_opt = g_hash_table_lookup (cfg->modules_opts, "regexp");
+ while (cur_opt) {
+ cur = cur_opt->data;
+ if (strcmp (cur->param, "metric") == 0 || strcmp (cur->param, "statfile_prefix") == 0) {
+ continue;
+ }
+ else if (g_ascii_strncasecmp (cur->param, "autolearn", sizeof ("autolearn") - 1) == 0) {
+ parse_autolearn_param (cur->param, cur->value, cfg);
+ continue;
+ }
+ cur_item = memory_pool_alloc0 (regexp_module_ctx->regexp_pool, sizeof (struct regexp_module_item));
+ cur_item->symbol = cur->param;
+ if (!read_regexp_expression (regexp_module_ctx->regexp_pool, cur_item, cur->param, cur->value, cfg)) {
+ res = FALSE;
}
+ set_counter (cur_item->symbol, 0);
+ regexp_module_ctx->items = g_list_prepend (regexp_module_ctx->items, cur_item);
+ cur_opt = g_list_next (cur_opt);
}
return res;
diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c
index 020bf7764..81acb1f7b 100644
--- a/src/plugins/surbl.c
+++ b/src/plugins/surbl.c
@@ -82,7 +82,7 @@ int
surbl_module_config (struct config_file *cfg)
{
struct hostent *hent;
- LIST_HEAD (moduleoptq, module_opt) *opt = NULL;
+ GList *cur_opt;
struct module_opt *cur;
struct suffix_item *new_suffix;
struct surbl_bit_item *new_bit;
@@ -160,35 +160,35 @@ surbl_module_config (struct config_file *cfg)
}
}
- opt = g_hash_table_lookup (cfg->modules_opts, "surbl");
- if (opt) {
- LIST_FOREACH (cur, opt, next) {
- if (!g_strncasecmp (cur->param, "suffix", sizeof ("suffix") - 1)) {
- if ((str = strchr (cur->param, '_')) != NULL) {
- new_suffix = memory_pool_alloc (surbl_module_ctx->surbl_pool, sizeof (struct suffix_item));
- *str = '\0';
- new_suffix->symbol = memory_pool_strdup (surbl_module_ctx->surbl_pool, str + 1);
- new_suffix->suffix = memory_pool_strdup (surbl_module_ctx->surbl_pool, cur->value);
- msg_debug ("surbl_module_config: add new surbl suffix: %s with symbol: %s",
- new_suffix->suffix, new_suffix->symbol);
- *str = '_';
- surbl_module_ctx->suffixes = g_list_prepend (surbl_module_ctx->suffixes, new_suffix);
- }
+ cur_opt = g_hash_table_lookup (cfg->modules_opts, "surbl");
+ while (cur_opt) {
+ cur = cur_opt->data;
+ if (!g_strncasecmp (cur->param, "suffix", sizeof ("suffix") - 1)) {
+ if ((str = strchr (cur->param, '_')) != NULL) {
+ new_suffix = memory_pool_alloc (surbl_module_ctx->surbl_pool, sizeof (struct suffix_item));
+ *str = '\0';
+ new_suffix->symbol = memory_pool_strdup (surbl_module_ctx->surbl_pool, str + 1);
+ new_suffix->suffix = memory_pool_strdup (surbl_module_ctx->surbl_pool, cur->value);
+ msg_debug ("surbl_module_config: add new surbl suffix: %s with symbol: %s",
+ new_suffix->suffix, new_suffix->symbol);
+ *str = '_';
+ surbl_module_ctx->suffixes = g_list_prepend (surbl_module_ctx->suffixes, new_suffix);
}
- if (!g_strncasecmp (cur->param, "bit", sizeof ("bit") - 1)) {
- if ((str = strchr (cur->param, '_')) != NULL) {
- bit = strtoul (str + 1, NULL, 10);
- if (bit != 0) {
- new_bit = memory_pool_alloc (surbl_module_ctx->surbl_pool, sizeof (struct surbl_bit_item));
- new_bit->bit = bit;
- new_bit->symbol = memory_pool_strdup (surbl_module_ctx->surbl_pool, cur->value);
- msg_debug ("surbl_module_config: add new bit suffix: %d with symbol: %s",
- (int)new_bit->bit, new_bit->symbol);
- surbl_module_ctx->bits = g_list_prepend (surbl_module_ctx->bits, new_bit);
- }
+ }
+ if (!g_strncasecmp (cur->param, "bit", sizeof ("bit") - 1)) {
+ if ((str = strchr (cur->param, '_')) != NULL) {
+ bit = strtoul (str + 1, NULL, 10);
+ if (bit != 0) {
+ new_bit = memory_pool_alloc (surbl_module_ctx->surbl_pool, sizeof (struct surbl_bit_item));
+ new_bit->bit = bit;
+ new_bit->symbol = memory_pool_strdup (surbl_module_ctx->surbl_pool, cur->value);
+ msg_debug ("surbl_module_config: add new bit suffix: %d with symbol: %s",
+ (int)new_bit->bit, new_bit->symbol);
+ surbl_module_ctx->bits = g_list_prepend (surbl_module_ctx->bits, new_bit);
}
}
}
+ cur_opt = g_list_next (cur_opt);
}
/* Add default suffix */
if (surbl_module_ctx->suffixes == NULL) {
diff --git a/src/worker.c b/src/worker.c
index 2da9383e9..8a9dd9d3a 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -316,7 +316,7 @@ accept_socket (int fd, short what, void *arg)
* Start worker process
*/
void
-start_worker (struct rspamd_worker *worker, int listen_sock)
+start_worker (struct rspamd_worker *worker)
{
struct sigaction signals;
@@ -350,7 +350,6 @@ start_worker (struct rspamd_worker *worker, int listen_sock)
#endif
worker->srv->pid = getpid ();
- worker->srv->type = TYPE_WORKER;
event_init ();
evdns_init ();
@@ -363,7 +362,7 @@ start_worker (struct rspamd_worker *worker, int listen_sock)
signal_add (&worker->sig_ev, NULL);
/* Accept event */
- event_set(&worker->bind_ev, listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+ event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
event_add(&worker->bind_ev, NULL);
/* Send SIGUSR2 to parent */