From 2aa9c74f1c449da92f6faf870f8cc801a83bb08b Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Sat, 1 Nov 2008 18:01:05 +0300 Subject: * Reorganize structure of source files * Adopt build system for new structure --HG-- rename : cfg_file.h => src/cfg_file.h rename : cfg_file.l => src/cfg_file.l rename : cfg_file.y => src/cfg_file.y rename : cfg_utils.c => src/cfg_utils.c rename : controller.c => src/controller.c rename : filter.c => src/filter.c rename : filter.h => src/filter.h rename : fstring.c => src/fstring.c rename : fstring.h => src/fstring.h rename : main.c => src/main.c rename : main.h => src/main.h rename : mem_pool.c => src/mem_pool.c rename : mem_pool.h => src/mem_pool.h rename : memcached-test.c => src/memcached-test.c rename : memcached.c => src/memcached.c rename : memcached.h => src/memcached.h rename : perl.c => src/perl.c rename : perl.h => src/perl.h rename : plugins/regexp.c => src/plugins/regexp.c rename : plugins/surbl.c => src/plugins/surbl.c rename : protocol.c => src/protocol.c rename : protocol.h => src/protocol.h rename : upstream.c => src/upstream.c rename : upstream.h => src/upstream.h rename : url.c => src/url.c rename : url.h => src/url.h rename : util.c => src/util.c rename : util.h => src/util.h rename : worker.c => src/worker.c --- src/cfg_file.h | 167 ++++++++++ src/cfg_file.l | 134 ++++++++ src/cfg_file.y | 545 +++++++++++++++++++++++++++++++ src/cfg_utils.c | 563 ++++++++++++++++++++++++++++++++ src/controller.c | 349 ++++++++++++++++++++ src/filter.c | 336 +++++++++++++++++++ src/filter.h | 43 +++ src/fstring.c | 234 ++++++++++++++ src/fstring.h | 86 +++++ src/main.c | 427 +++++++++++++++++++++++++ src/main.h | 201 ++++++++++++ src/mem_pool.c | 360 +++++++++++++++++++++ src/mem_pool.h | 61 ++++ src/memcached-test.c | 79 +++++ src/memcached.c | 792 +++++++++++++++++++++++++++++++++++++++++++++ src/memcached.h | 142 +++++++++ src/perl.c | 190 +++++++++++ src/perl.h | 19 ++ src/plugins/regexp.c | 247 ++++++++++++++ src/plugins/surbl.c | 593 ++++++++++++++++++++++++++++++++++ src/protocol.c | 492 ++++++++++++++++++++++++++++ src/protocol.h | 31 ++ src/upstream.c | 521 ++++++++++++++++++++++++++++++ src/upstream.h | 43 +++ src/url.c | 886 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/url.h | 88 +++++ src/util.c | 834 ++++++++++++++++++++++++++++++++++++++++++++++++ src/util.h | 60 ++++ src/worker.c | 364 +++++++++++++++++++++ 29 files changed, 8887 insertions(+) create mode 100644 src/cfg_file.h create mode 100644 src/cfg_file.l create mode 100644 src/cfg_file.y create mode 100644 src/cfg_utils.c create mode 100644 src/controller.c create mode 100644 src/filter.c create mode 100644 src/filter.h create mode 100644 src/fstring.c create mode 100644 src/fstring.h create mode 100644 src/main.c create mode 100644 src/main.h create mode 100644 src/mem_pool.c create mode 100644 src/mem_pool.h create mode 100644 src/memcached-test.c create mode 100644 src/memcached.c create mode 100644 src/memcached.h create mode 100644 src/perl.c create mode 100644 src/perl.h create mode 100644 src/plugins/regexp.c create mode 100644 src/plugins/surbl.c create mode 100644 src/protocol.c create mode 100644 src/protocol.h create mode 100644 src/upstream.c create mode 100644 src/upstream.h create mode 100644 src/url.c create mode 100644 src/url.h create mode 100644 src/util.c create mode 100644 src/util.h create mode 100644 src/worker.c (limited to 'src') diff --git a/src/cfg_file.h b/src/cfg_file.h new file mode 100644 index 000000000..1eb71476d --- /dev/null +++ b/src/cfg_file.h @@ -0,0 +1,167 @@ +/* + * $Id$ + */ + + +#ifndef CFG_FILE_H +#define CFG_FILE_H + +#include "config.h" +#include +#ifndef HAVE_OWN_QUEUE_H +#include +#else +#include "queue.h" +#endif +#include +#include +#include +#include +#include "mem_pool.h" +#include "upstream.h" +#include "memcached.h" +#include "filter.h" + +#define DEFAULT_BIND_PORT 768 +#define DEFAULT_CONTROL_PORT 7608 +#define MAX_MEMCACHED_SERVERS 48 +#define DEFAULT_MEMCACHED_PORT 11211 +/* Memcached timeouts */ +#define DEFAULT_MEMCACHED_CONNECT_TIMEOUT 1000 +/* Upstream timeouts */ +#define DEFAULT_UPSTREAM_ERROR_TIME 10 +#define DEFAULT_UPSTREAM_ERROR_TIME 10 +#define DEFAULT_UPSTREAM_DEAD_TIME 300 +#define DEFAULT_UPSTREAM_MAXERRORS 10 + +/* 1 worker by default */ +#define DEFAULT_WORKERS_NUM 1 + +#define yyerror(fmt, ...) \ + fprintf (stderr, "Config file parse error!\non line: %d\n", yylineno); \ + fprintf (stderr, "while reading text: %s\nreason: ", yytext); \ + fprintf (stderr, fmt, ##__VA_ARGS__); \ + fprintf (stderr, "\n") +#define yywarn(fmt, ...) \ + fprintf (stderr, "Config file parse warning!\non line %d\n", yylineno); \ + fprintf (stderr, "while reading text: %s\nreason: ", yytext); \ + fprintf (stderr, fmt, ##__VA_ARGS__); \ + fprintf (stderr, "\n") + +struct expression; + +enum { VAL_UNDEF=0, VAL_TRUE, VAL_FALSE }; + +enum rspamd_regexp_type { + REGEXP_NONE = 0, + REGEXP_HEADER, + REGEXP_MIME, + REGEXP_MESSAGE, + REGEXP_URL, +}; + +enum rspamd_log_type { + RSPAMD_LOG_CONSOLE, + RSPAMD_LOG_SYSLOG, + RSPAMD_LOG_FILE, +}; + +struct rspamd_regexp { + enum rspamd_regexp_type type; + char *regexp_text; + GRegex *regexp; + char *header; +}; + +struct memcached_server { + struct upstream up; + struct in_addr addr; + uint16_t port; + short alive; + short int num; +}; + +struct perl_module { + char *path; + LIST_ENTRY (perl_module) next; +}; + +struct module_opt { + char *param; + char *value; + LIST_ENTRY (module_opt) next; +}; + +struct config_file { + memory_pool_t *cfg_pool; + char *cfg_name; + char *pid_file; + char *temp_dir; + + char *bind_host; + struct in_addr bind_addr; + uint16_t bind_port; + uint16_t bind_family; + + char *control_host; + struct in_addr control_addr; + uint16_t control_port; + uint16_t control_family; + int controller_enabled; + char *control_password; + + int no_fork; + unsigned int workers_number; + + enum rspamd_log_type log_type; + int log_facility; + int log_level; + char *log_file; + int log_fd; + + struct memcached_server memcached_servers[MAX_MEMCACHED_SERVERS]; + size_t memcached_servers_num; + memc_proto_t memcached_protocol; + unsigned int memcached_error_time; + unsigned int memcached_dead_time; + unsigned int memcached_maxerrors; + unsigned int memcached_connect_timeout; + + LIST_HEAD (modulesq, perl_module) perl_modules; + LIST_HEAD (headersq, filter) header_filters; + LIST_HEAD (mimesq, filter) mime_filters; + LIST_HEAD (messagesq, filter) message_filters; + LIST_HEAD (urlsq, filter) url_filters; + char *header_filters_str; + char *mime_filters_str; + char *message_filters_str; + char *url_filters_str; + GHashTable* modules_opts; + GHashTable* variables; + GHashTable* metrics; + GHashTable* factors; + GHashTable* c_modules; + GHashTable* composite_symbols; +}; + +int add_memcached_server (struct config_file *cf, char *str); +int parse_bind_line (struct config_file *cf, char *str, char is_control); +void init_defaults (struct config_file *cfg); +void free_config (struct config_file *cfg); +char* get_module_opt (struct config_file *cfg, char *module_name, char *opt_name); +size_t parse_limit (const char *limit); +unsigned int parse_seconds (const char *t); +char parse_flag (const char *str); +char* substitute_variable (struct config_file *cfg, char *str, u_char recursive); +void post_load_config (struct config_file *cfg); +struct rspamd_regexp* parse_regexp (memory_pool_t *pool, char *line); +struct expression* parse_expression (memory_pool_t *pool, char *line); + +int yylex (void); +int yyparse (void); +void yyrestart (FILE *); + +#endif /* ifdef CFG_FILE_H */ +/* + * vi:ts=4 + */ diff --git a/src/cfg_file.l b/src/cfg_file.l new file mode 100644 index 000000000..fefd11237 --- /dev/null +++ b/src/cfg_file.l @@ -0,0 +1,134 @@ +%x incl +%x module + +%{ +#include +#include +#include +#include +#include +#include "cfg_file.h" +#include "cfg_yacc.h" + +#define MAX_INCLUDE_DEPTH 10 +YY_BUFFER_STATE include_stack[MAX_INCLUDE_DEPTH]; +int include_stack_ptr = 0; +extern struct config_file *cfg; + +%} + +%option noyywrap +%option yylineno + +%% +^[ \t]*#.* /* ignore comments */; +.include BEGIN(incl); +.module BEGIN(module); +composites return COMPOSITES; +tempdir return TEMPDIR; +pidfile return PIDFILE; +workers return WORKERS; +error_time return ERROR_TIME; +dead_time return DEAD_TIME; +maxerrors return MAXERRORS; +reconnect_timeout return RECONNECT_TIMEOUT; +connect_timeout return CONNECT_TIMEOUT; +protocol return PROTOCOL; +memcached return MEMCACHED; +bind_socket return BINDSOCK; +servers return SERVERS; +require return REQUIRE; +header_filters return HEADER_FILTERS; +mime_filters return MIME_FILTERS; +message_filters return MESSAGE_FILTERS; +url_filters return URL_FILTERS; +factors return FACTORS; +metric return METRIC; +name return NAME; +required_score return REQUIRED_SCORE; +function return FUNCTION; +control return CONTROL; +password return PASSWORD; +logging return LOGGING; + +log_type return LOG_TYPE; +console return LOG_TYPE_CONSOLE; +syslog return LOG_TYPE_SYSLOG; +file return LOG_TYPE_FILE; + +log_level return LOG_LEVEL; +DEBUG return LOG_LEVEL_DEBUG; +INFO return LOG_LEVEL_INFO; +WARNING return LOG_LEVEL_WARNING; +ERROR return LOG_LEVEL_ERROR; +log_facility return LOG_FACILITY; +log_file return LOG_FILENAME; + +\{ return OBRACE; +\} return EBRACE; +; return SEMICOLON; +, return COMMA; += return EQSIGN; +yes|YES|no|NO|[yY]|[nN] yylval.flag=parse_flag(yytext); return FLAG; +\n /* ignore EOL */; +[ \t]+ /* ignore whitespace */; +\"[^"]+\" yylval.string=strdup(yytext + 1); yylval.string[strlen(yylval.string) - 1] = '\0'; return QUOTEDSTRING; +\" return QUOTE; +\$[a-zA-Z_][a-zA-Z0-9_]+ yylval.string=strdup(yytext + 1); return VARIABLE; +[0-9]+ yylval.number=strtol(yytext, NULL, 10); return NUMBER; +-?[0-9]+\.?[0-9]* yylval.fract=strtod(yytext, NULL); return FRACT; +[0-9]+[kKmMgG]? yylval.limit=parse_limit(yytext); return SIZELIMIT; +[0-9]+[sS]|[0-9]+[mM][sS] yylval.seconds=parse_seconds(yytext); return SECONDS; +[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3} yylval.string=strdup(yytext); return IPADDR; +[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\/[0-9]{1,2} yylval.string=strdup(yytext); return IPNETWORK; +[a-zA-Z0-9.-]+:[0-9]{1,5} yylval.string=strdup(yytext); return HOSTPORT; +[a-zA-Z<][a-zA-Z@+>_-]* yylval.string=strdup(yytext); return STRING; +\/[^/\n]+\/ yylval.string=strdup(yytext); return REGEXP; +[a-zA-Z0-9].[a-zA-Z0-9\/.-]+ yylval.string=strdup(yytext); return DOMAIN; +[ \t]* /* eat the whitespace */ +[^ \t\n]+ { /* got the include file name */ + if (include_stack_ptr >= MAX_INCLUDE_DEPTH) { + yyerror ("yylex: includes nested too deeply"); + return -1; + } + + include_stack[include_stack_ptr++] = + YY_CURRENT_BUFFER; + + yyin = fopen (yytext, "r"); + + if (! yyin) { + yyerror ("yylex: cannot open include file"); + return -1; + } + + yy_switch_to_buffer (yy_create_buffer (yyin, YY_BUF_SIZE)); + + BEGIN(INITIAL); + } + +<> { + if ( --include_stack_ptr < 0 ) { + post_load_config (cfg); + yyterminate (); + } + else { + yy_delete_buffer (YY_CURRENT_BUFFER); + yy_switch_to_buffer (include_stack[include_stack_ptr]); + } + } + +\n /* ignore EOL */; +[ \t]+ /* ignore whitespace */; +\'[a-zA-Z0-9_-]\' yylval.string=strdup(yytext + 1); yylval.string[strlen(yylval.string) - 1] = '\0'; return MODULE_OPT; +\{ return OBRACE; +\} return EBRACE; +\; return SEMICOLON; +[a-zA-Z0-9_-] yylval.string=strdup(yytext); return PARAM; +\$[a-zA-Z_][a-zA-Z0-9_]+ yylval.string=strdup(yytext + 1); return VARIABLE; +\"[^"]+\" yylval.string=strdup(yytext + 1); yylval.string[strlen(yylval.string) - 1] = '\0'; return QUOTEDSTRING; + +%% +/* + * vi:ts=4 + */ diff --git a/src/cfg_file.y b/src/cfg_file.y new file mode 100644 index 000000000..dbbdd4e63 --- /dev/null +++ b/src/cfg_file.y @@ -0,0 +1,545 @@ +/* $Id$ */ + +%{ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "cfg_file.h" +#include "main.h" + +#define YYDEBUG 1 + +extern struct config_file *cfg; +extern int yylineno; +extern char *yytext; + +LIST_HEAD (moduleoptq, module_opt) *cur_module_opt = NULL; +struct metric *cur_metric = NULL; + +%} + +%union +{ + char *string; + size_t limit; + char flag; + unsigned int seconds; + unsigned int number; + double fract; +} + +%token ERROR STRING QUOTEDSTRING FLAG +%token FILENAME REGEXP QUOTE SEMICOLON OBRACE EBRACE COMMA EQSIGN +%token BINDSOCK SOCKCRED DOMAIN IPADDR IPNETWORK HOSTPORT NUMBER CHECK_TIMEOUT +%token MAXSIZE SIZELIMIT SECONDS BEANSTALK MYSQL USER PASSWORD DATABASE +%token TEMPDIR PIDFILE SERVERS ERROR_TIME DEAD_TIME MAXERRORS CONNECT_TIMEOUT PROTOCOL RECONNECT_TIMEOUT +%token READ_SERVERS WRITE_SERVER DIRECTORY_SERVERS MAILBOX_QUERY USERS_QUERY LASTLOGIN_QUERY +%token MEMCACHED WORKERS REQUIRE MODULE +%token MODULE_OPT PARAM VARIABLE +%token HEADER_FILTERS MIME_FILTERS MESSAGE_FILTERS URL_FILTERS FACTORS METRIC NAME +%token REQUIRED_SCORE FUNCTION FRACT COMPOSITES CONTROL PASSWORD +%token LOGGING LOG_TYPE LOG_TYPE_CONSOLE LOG_TYPE_SYSLOG LOG_TYPE_FILE +%token LOG_LEVEL LOG_LEVEL_DEBUG LOG_LEVEL_INFO LOG_LEVEL_WARNING LOG_LEVEL_ERROR LOG_FACILITY LOG_FILENAME + +%type STRING +%type VARIABLE +%type QUOTEDSTRING MODULE_OPT PARAM +%type FILENAME +%type SOCKCRED +%type IPADDR IPNETWORK +%type HOSTPORT +%type DOMAIN +%type SIZELIMIT +%type FLAG +%type SECONDS +%type NUMBER +%type memcached_hosts bind_cred +%type FRACT +%% + +file : /* empty */ + | file command SEMICOLON { } + ; + +command : + bindsock + | control + | tempdir + | pidfile + | memcached + | workers + | require + | header_filters + | mime_filters + | message_filters + | url_filters + | module_opt + | variable + | factors + | metric + | composites + | logging + ; + +tempdir : + TEMPDIR EQSIGN QUOTEDSTRING { + struct stat st; + + if (stat ($3, &st) == -1) { + yyerror ("yyparse: cannot stat directory \"%s\": %s", $3, strerror (errno)); + YYERROR; + } + if (!S_ISDIR (st.st_mode)) { + yyerror ("yyparse: \"%s\" is not a directory", $3); + YYERROR; + } + cfg->temp_dir = memory_pool_strdup (cfg->cfg_pool, $3); + free ($3); + } + ; + +pidfile : + PIDFILE EQSIGN QUOTEDSTRING { + cfg->pid_file = $3; + } + ; + +control: + CONTROL OBRACE controlbody EBRACE + ; + +controlbody: + controlcmd SEMICOLON + | controlbody controlcmd SEMICOLON + ; + +controlcmd: + controlsock + | controlpassword + ; + +controlsock: + BINDSOCK EQSIGN bind_cred { + if (!parse_bind_line (cfg, $3, 1)) { + yyerror ("yyparse: parse_bind_line"); + YYERROR; + } + cfg->controller_enabled = 1; + free ($3); + } + ; +controlpassword: + PASSWORD EQSIGN QUOTEDSTRING { + cfg->control_password = memory_pool_strdup (cfg->cfg_pool, $3); + } + ; + +bindsock: + BINDSOCK EQSIGN bind_cred { + if (!parse_bind_line (cfg, $3, 0)) { + yyerror ("yyparse: parse_bind_line"); + YYERROR; + } + free ($3); + } + ; + +bind_cred: + STRING { + $$ = $1; + } + | IPADDR{ + $$ = $1; + } + | DOMAIN { + $$ = $1; + } + | HOSTPORT { + $$ = $1; + } + | QUOTEDSTRING { + $$ = $1; + } + ; + +header_filters: + HEADER_FILTERS EQSIGN QUOTEDSTRING { + cfg->header_filters_str = memory_pool_strdup (cfg->cfg_pool, $3); + free ($3); + } + ; + +mime_filters: + MIME_FILTERS EQSIGN QUOTEDSTRING { + cfg->mime_filters_str = memory_pool_strdup (cfg->cfg_pool, $3); + free ($3); + } + ; + +message_filters: + MESSAGE_FILTERS EQSIGN QUOTEDSTRING { + cfg->message_filters_str = memory_pool_strdup (cfg->cfg_pool, $3); + free ($3); + } + ; + +url_filters: + URL_FILTERS EQSIGN QUOTEDSTRING { + cfg->url_filters_str = memory_pool_strdup (cfg->cfg_pool, $3); + free ($3); + } + ; + +memcached: + MEMCACHED OBRACE memcachedbody EBRACE + ; + +memcachedbody: + memcachedcmd SEMICOLON + | memcachedbody memcachedcmd SEMICOLON + ; + +memcachedcmd: + memcached_servers + | memcached_connect_timeout + | memcached_error_time + | memcached_dead_time + | memcached_maxerrors + | memcached_protocol + ; + +memcached_servers: + SERVERS EQSIGN memcached_server + ; + +memcached_server: + memcached_params + | memcached_server COMMA memcached_params + ; + +memcached_params: + memcached_hosts { + if (!add_memcached_server (cfg, $1)) { + yyerror ("yyparse: add_memcached_server"); + YYERROR; + } + free ($1); + } + ; +memcached_hosts: + STRING + | IPADDR + | DOMAIN + | HOSTPORT + ; +memcached_error_time: + ERROR_TIME EQSIGN NUMBER { + cfg->memcached_error_time = $3; + } + ; +memcached_dead_time: + DEAD_TIME EQSIGN NUMBER { + cfg->memcached_dead_time = $3; + } + ; +memcached_maxerrors: + MAXERRORS EQSIGN NUMBER { + cfg->memcached_maxerrors = $3; + } + ; +memcached_connect_timeout: + CONNECT_TIMEOUT EQSIGN SECONDS { + cfg->memcached_connect_timeout = $3; + } + ; + +memcached_protocol: + PROTOCOL EQSIGN STRING { + if (strncasecmp ($3, "udp", sizeof ("udp") - 1) == 0) { + cfg->memcached_protocol = UDP_TEXT; + } + else if (strncasecmp ($3, "tcp", sizeof ("tcp") - 1) == 0) { + cfg->memcached_protocol = TCP_TEXT; + } + else { + yyerror ("yyparse: cannot recognize protocol: %s", $3); + YYERROR; + } + } + ; +workers: + WORKERS EQSIGN NUMBER { + cfg->workers_number = $3; + } + ; + +metric: + METRIC OBRACE metricbody EBRACE { + if (cur_metric == NULL || cur_metric->name == NULL) { + yyerror ("yyparse: not enough arguments in metric definition"); + YYERROR; + } + g_hash_table_insert (cfg->metrics, cur_metric->name, cur_metric); + cur_metric = NULL; + } + ; + +metricbody: + | metriccmd SEMICOLON + | metricbody metriccmd SEMICOLON + ; +metriccmd: + | metricname + | metricfunction + | metricscore + ; + +metricname: + NAME EQSIGN QUOTEDSTRING { + if (cur_metric == NULL) { + cur_metric = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct metric)); + } + cur_metric->name = memory_pool_strdup (cfg->cfg_pool, $3); + } + ; + +metricfunction: + FUNCTION EQSIGN QUOTEDSTRING { + if (cur_metric == NULL) { + cur_metric = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct metric)); + } + cur_metric->func_name = memory_pool_strdup (cfg->cfg_pool, $3); + } + ; + +metricscore: + REQUIRED_SCORE EQSIGN NUMBER { + if (cur_metric == NULL) { + cur_metric = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct metric)); + } + cur_metric->required_score = $3; + } + | REQUIRED_SCORE EQSIGN FRACT { + if (cur_metric == NULL) { + cur_metric = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct metric)); + } + cur_metric->required_score = $3; + } + ; + +factors: + FACTORS OBRACE factorsbody EBRACE + ; + +factorsbody: + factorparam SEMICOLON + | factorsbody factorparam SEMICOLON + ; + +factorparam: + QUOTEDSTRING EQSIGN FRACT { + double *tmp = memory_pool_alloc (cfg->cfg_pool, sizeof (double)); + *tmp = $3; + g_hash_table_insert (cfg->factors, $1, tmp); + } + | QUOTEDSTRING EQSIGN NUMBER { + double *tmp = memory_pool_alloc (cfg->cfg_pool, sizeof (double)); + *tmp = $3; + g_hash_table_insert (cfg->factors, $1, tmp); + }; + +require: + REQUIRE OBRACE requirebody EBRACE + ; + +requirebody: + requirecmd SEMICOLON + | requirebody requirecmd SEMICOLON + ; + +requirecmd: + MODULE EQSIGN QUOTEDSTRING { + struct stat st; + struct perl_module *cur; + if (stat ($3, &st) == -1) { + yyerror ("yyparse: cannot stat file %s, %m", $3); + YYERROR; + } + cur = memory_pool_alloc (cfg->cfg_pool, sizeof (struct perl_module)); + if (cur == NULL) { + yyerror ("yyparse: g_malloc: %s", strerror(errno)); + YYERROR; + } + cur->path = $3; + LIST_INSERT_HEAD (&cfg->perl_modules, cur, next); + } + ; + +composites: + COMPOSITES OBRACE compositesbody EBRACE + ; + +compositesbody: + compositescmd SEMICOLON + | compositesbody compositescmd SEMICOLON + ; + +compositescmd: + QUOTEDSTRING EQSIGN QUOTEDSTRING { + struct expression *expr; + if ((expr = parse_expression (cfg->cfg_pool, $3)) == NULL) { + yyerror ("yyparse: cannot parse composite expression: %s", $3); + YYERROR; + } + g_hash_table_insert (cfg->composite_symbols, $1, expr); + } + ; +module_opt: + MODULE_OPT OBRACE moduleoptbody EBRACE { + g_hash_table_insert (cfg->modules_opts, $1, cur_module_opt); + cur_module_opt = NULL; + } + ; + +moduleoptbody: + optcmd SEMICOLON + | moduleoptbody optcmd SEMICOLON + ; + +optcmd: + PARAM EQSIGN QUOTEDSTRING { + struct module_opt *mopt; + if (cur_module_opt == NULL) { + cur_module_opt = g_malloc (sizeof (cur_module_opt)); + LIST_INIT (cur_module_opt); + } + mopt = memory_pool_alloc (cfg->cfg_pool, sizeof (struct module_opt)); + mopt->param = $1; + mopt->value = $3; + LIST_INSERT_HEAD (cur_module_opt, mopt, next); + } + ; + +variable: + VARIABLE EQSIGN QUOTEDSTRING { + g_hash_table_insert (cfg->variables, $1, $3); + } + ; + +logging: + LOGGING OBRACE loggingbody EBRACE + ; + +loggingbody: + loggingcmd SEMICOLON + | loggingbody loggingcmd SEMICOLON + ; + +loggingcmd: + loggingtype + | logginglevel + | loggingfacility + | loggingfile + ; + +loggingtype: + LOG_TYPE EQSIGN LOG_TYPE_CONSOLE { + cfg->log_type = RSPAMD_LOG_CONSOLE; + } + LOG_TYPE EQSIGN LOG_TYPE_SYSLOG { + cfg->log_type = RSPAMD_LOG_SYSLOG; + } + LOG_TYPE EQSIGN LOG_TYPE_FILE { + cfg->log_type = RSPAMD_LOG_FILE; + } + ; + +logginglevel: + LOG_LEVEL EQSIGN LOG_LEVEL_DEBUG { + cfg->log_level = G_LOG_LEVEL_DEBUG; + } + LOG_LEVEL EQSIGN LOG_LEVEL_INFO { + cfg->log_level = G_LOG_LEVEL_INFO | G_LOG_LEVEL_MESSAGE; + } + LOG_LEVEL EQSIGN LOG_LEVEL_WARNING { + cfg->log_level = G_LOG_LEVEL_WARNING; + } + LOG_LEVEL EQSIGN LOG_LEVEL_ERROR { + cfg->log_level = G_LOG_LEVEL_ERROR | G_LOG_LEVEL_CRITICAL; + } + ; + +loggingfacility: + LOG_FACILITY EQSIGN QUOTEDSTRING { + if (strncasecmp ($3, "LOG_AUTH", sizeof ("LOG_AUTH") - 1) == 0) { + cfg->log_facility = LOG_AUTH; + } + else if (strncasecmp ($3, "LOG_CRON", sizeof ("LOG_CRON") - 1) == 0) { + cfg->log_facility = LOG_CRON; + } + else if (strncasecmp ($3, "LOG_DAEMON", sizeof ("LOG_DAEMON") - 1) == 0) { + cfg->log_facility = LOG_DAEMON; + } + else if (strncasecmp ($3, "LOG_MAIL", sizeof ("LOG_MAIL") - 1) == 0) { + cfg->log_facility = LOG_MAIL; + } + else if (strncasecmp ($3, "LOG_USER", sizeof ("LOG_USER") - 1) == 0) { + cfg->log_facility = LOG_USER; + } + else if (strncasecmp ($3, "LOG_LOCAL0", sizeof ("LOG_LOCAL0") - 1) == 0) { + cfg->log_facility = LOG_LOCAL0; + } + else if (strncasecmp ($3, "LOG_LOCAL1", sizeof ("LOG_LOCAL1") - 1) == 0) { + cfg->log_facility = LOG_LOCAL1; + } + else if (strncasecmp ($3, "LOG_LOCAL2", sizeof ("LOG_LOCAL2") - 1) == 0) { + cfg->log_facility = LOG_LOCAL2; + } + else if (strncasecmp ($3, "LOG_LOCAL3", sizeof ("LOG_LOCAL3") - 1) == 0) { + cfg->log_facility = LOG_LOCAL3; + } + else if (strncasecmp ($3, "LOG_LOCAL4", sizeof ("LOG_LOCAL4") - 1) == 0) { + cfg->log_facility = LOG_LOCAL4; + } + else if (strncasecmp ($3, "LOG_LOCAL5", sizeof ("LOG_LOCAL5") - 1) == 0) { + cfg->log_facility = LOG_LOCAL5; + } + else if (strncasecmp ($3, "LOG_LOCAL6", sizeof ("LOG_LOCAL6") - 1) == 0) { + cfg->log_facility = LOG_LOCAL6; + } + else if (strncasecmp ($3, "LOG_LOCAL7", sizeof ("LOG_LOCAL7") - 1) == 0) { + cfg->log_facility = LOG_LOCAL7; + } + else { + yyerror ("yyparse: invalid logging facility: %s", $3); + YYERROR; + } + + free ($3); + } + ; + +loggingfile: + LOG_FILENAME EQSIGN QUOTEDSTRING { + cfg->log_file = memory_pool_strdup (cfg->cfg_pool, $3); + + free ($3); + } + ; + +%% +/* + * vi:ts=4 + */ diff --git a/src/cfg_utils.c b/src/cfg_utils.c new file mode 100644 index 000000000..9fa7aac47 --- /dev/null +++ b/src/cfg_utils.c @@ -0,0 +1,563 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "config.h" +#include "cfg_file.h" +#include "main.h" +#ifndef HAVE_OWN_QUEUE_H +#include +#else +#include "queue.h" +#endif + +extern int yylineno; +extern char *yytext; + +int +add_memcached_server (struct config_file *cf, char *str) +{ + char *cur_tok, *err_str; + struct memcached_server *mc; + struct hostent *hent; + uint16_t port; + + if (str == NULL) return 0; + + cur_tok = strsep (&str, ":"); + + if (cur_tok == NULL || *cur_tok == '\0') return 0; + + if(cf->memcached_servers_num == MAX_MEMCACHED_SERVERS) { + yywarn ("yyparse: maximum number of memcached servers is reached %d", MAX_MEMCACHED_SERVERS); + } + + mc = &cf->memcached_servers[cf->memcached_servers_num]; + if (mc == NULL) return 0; + /* cur_tok - server name, str - server port */ + if (str == NULL) { + port = DEFAULT_MEMCACHED_PORT; + } + else { + port = (uint16_t)strtoul (str, &err_str, 10); + if (*err_str != '\0') { + return 0; + } + } + + if (!inet_aton (cur_tok, &mc->addr)) { + /* Try to call gethostbyname */ + hent = gethostbyname (cur_tok); + if (hent == NULL) { + return 0; + } + else { + memcpy((char *)&mc->addr, hent->h_addr, sizeof(struct in_addr)); + } + } + mc->port = port; + cf->memcached_servers_num++; + return 1; +} + +int +parse_bind_line (struct config_file *cf, char *str, char is_control) +{ + char *cur_tok, *err_str; + struct hostent *hent; + size_t s; + + if (str == NULL) return 0; + cur_tok = strsep (&str, ":"); + + if (cur_tok[0] == '/' || cur_tok[0] == '.') { + if (is_control) { + cf->control_host = memory_pool_strdup (cf->cfg_pool, cur_tok); + cf->control_family = AF_UNIX; + } + else { + cf->bind_host = memory_pool_strdup (cf->cfg_pool, cur_tok); + cf->bind_family = AF_UNIX; + } + return 1; + + } else { + if (str == '\0') { + if (is_control) { + cf->control_port = DEFAULT_CONTROL_PORT; + } + else { + cf->bind_port = DEFAULT_BIND_PORT; + } + } + else { + if (is_control) { + cf->control_port = (uint16_t)strtoul (str, &err_str, 10); + } + else { + cf->bind_port = (uint16_t)strtoul (str, &err_str, 10); + } + if (*err_str != '\0') { + return 0; + } + } + + if (is_control) { + if (!inet_aton (cur_tok, &cf->control_addr)) { + /* Try to call gethostbyname */ + hent = gethostbyname (cur_tok); + if (hent == NULL) { + return 0; + } + else { + cf->bind_host = memory_pool_strdup (cf->cfg_pool, cur_tok); + memcpy((char *)&cf->control_addr, hent->h_addr, sizeof(struct in_addr)); + s = strlen (cur_tok) + 1; + } + } + + cf->control_family = AF_INET; + } + else { + if (!inet_aton (cur_tok, &cf->bind_addr)) { + /* Try to call gethostbyname */ + hent = gethostbyname (cur_tok); + if (hent == NULL) { + return 0; + } + else { + cf->bind_host = memory_pool_strdup (cf->cfg_pool, cur_tok); + memcpy((char *)&cf->bind_addr, hent->h_addr, sizeof(struct in_addr)); + s = strlen (cur_tok) + 1; + } + } + + cf->bind_family = AF_INET; + } + + return 1; + } + + return 0; +} + +void +init_defaults (struct config_file *cfg) +{ + cfg->memcached_error_time = DEFAULT_UPSTREAM_ERROR_TIME; + cfg->memcached_dead_time = DEFAULT_UPSTREAM_DEAD_TIME; + cfg->memcached_maxerrors = DEFAULT_UPSTREAM_MAXERRORS; + cfg->memcached_protocol = TCP_TEXT; + + cfg->workers_number = DEFAULT_WORKERS_NUM; + cfg->modules_opts = g_hash_table_new (g_str_hash, g_str_equal); + cfg->variables = g_hash_table_new (g_str_hash, g_str_equal); + cfg->metrics = g_hash_table_new (g_str_hash, g_str_equal); + cfg->factors = g_hash_table_new (g_str_hash, g_str_equal); + cfg->c_modules = g_hash_table_new (g_str_hash, g_str_equal); + cfg->composite_symbols = g_hash_table_new (g_str_hash, g_str_equal); + + LIST_INIT (&cfg->perl_modules); +} + +void +free_config (struct config_file *cfg) +{ + g_hash_table_remove_all (cfg->modules_opts); + g_hash_table_unref (cfg->modules_opts); + g_hash_table_remove_all (cfg->variables); + g_hash_table_unref (cfg->variables); + g_hash_table_remove_all (cfg->metrics); + g_hash_table_unref (cfg->metrics); + g_hash_table_remove_all (cfg->factors); + g_hash_table_unref (cfg->factors); + g_hash_table_remove_all (cfg->c_modules); + g_hash_table_unref (cfg->c_modules); + g_hash_table_remove_all (cfg->composite_symbols); + g_hash_table_unref (cfg->composite_symbols); + memory_pool_delete (cfg->cfg_pool); +} + +char* +get_module_opt (struct config_file *cfg, char *module_name, char *opt_name) +{ + LIST_HEAD (moduleoptq, module_opt) *cur_module_opt = NULL; + struct module_opt *cur; + + cur_module_opt = g_hash_table_lookup (cfg->modules_opts, module_name); + if (cur_module_opt == NULL) { + return NULL; + } + + LIST_FOREACH (cur, cur_module_opt, next) { + if (strcmp (cur->param, opt_name) == 0) { + return cur->value; + } + } + + return NULL; +} + +size_t +parse_limit (const char *limit) +{ + size_t result = 0; + char *err_str; + + if (!limit || *limit == '\0') return 0; + + result = strtoul (limit, &err_str, 10); + + if (*err_str != '\0') { + /* Megabytes */ + if (*err_str == 'm' || *err_str == 'M') { + result *= 1048576L; + } + /* Kilobytes */ + else if (*err_str == 'k' || *err_str == 'K') { + result *= 1024; + } + /* Gigabytes */ + else if (*err_str == 'g' || *err_str == 'G') { + result *= 1073741824L; + } + } + + return result; +} + +unsigned int +parse_seconds (const char *t) +{ + unsigned int result = 0; + char *err_str; + + if (!t || *t == '\0') return 0; + + result = strtoul (t, &err_str, 10); + + if (*err_str != '\0') { + /* Seconds */ + if (*err_str == 's' || *err_str == 'S') { + result *= 1000; + } + } + + return result; +} + +char +parse_flag (const char *str) +{ + if (!str || !*str) return -1; + + if ((*str == 'Y' || *str == 'y') && *(str + 1) == '\0') { + return 1; + } + + if ((*str == 'Y' || *str == 'y') && + (*(str + 1) == 'E' || *(str + 1) == 'e') && + (*(str + 2) == 'S' || *(str + 2) == 's') && + *(str + 3) == '\0') { + return 1; + } + + if ((*str == 'N' || *str == 'n') && *(str + 1) == '\0') { + return 0; + } + + if ((*str == 'N' || *str == 'n') && + (*(str + 1) == 'O' || *(str + 1) == 'o') && + *(str + 2) == '\0') { + return 0; + } + + return -1; +} + +/* + * Try to substitute all variables in given string + * Return: newly allocated string with substituted variables (original string may be freed if variables are found) + */ +char * +substitute_variable (struct config_file *cfg, char *str, u_char recursive) +{ + char *var, *new, *v_begin, *v_end; + size_t len; + + while ((v_begin = strstr (str, "${")) != NULL) { + len = strlen (str); + *v_begin = '\0'; + v_begin += 2; + if ((v_end = strstr (v_begin, "}")) == NULL) { + /* Not a variable, skip */ + continue; + } + *v_end = '\0'; + var = g_hash_table_lookup (cfg->variables, v_begin); + if (var == NULL) { + yywarn ("substitute_variable: variable %s is not defined", v_begin); + /* Substitute unknown variables with empty string */ + var = ""; + } + else if (recursive) { + var = substitute_variable (cfg, var, recursive); + } + /* Allocate new string */ + new = memory_pool_alloc (cfg->cfg_pool, len - strlen (v_begin) + strlen (var) + 1); + + snprintf (new, len - strlen (v_begin) + strlen (var) + 1, "%s%s%s", + str, var, v_end + 1); + str = new; + } + + return str; +} + +static void +substitute_module_variables (gpointer key, gpointer value, gpointer data) +{ + struct config_file *cfg = (struct config_file *)data; + LIST_HEAD (moduleoptq, module_opt) *cur_module_opt = (struct moduleoptq *)value; + struct module_opt *cur, *tmp; + + LIST_FOREACH_SAFE (cur, cur_module_opt, next, tmp) { + if (cur->value) { + cur->value = substitute_variable (cfg, cur->value, 0); + } + } +} + +static void +substitute_all_variables (gpointer key, gpointer value, gpointer data) +{ + struct config_file *cfg = (struct config_file *)data; + char *var; + + var = value; + /* Do recursive substitution */ + var = substitute_variable (cfg, var, 1); +} + +static void +parse_filters_str (struct config_file *cfg, const char *str, enum script_type type) +{ + gchar **strvec, **p; + struct filter *cur; + int i; + + if (str == NULL) { + return; + } + + strvec = g_strsplit (str, ",", 0); + if (strvec == NULL) { + return; + } + + p = strvec; + while (*p++) { + cur = NULL; + /* Search modules from known C modules */ + for (i = 0; i < MODULES_NUM; i++) { + if (strcasecmp (modules[i].name, *p) == 0) { + cur = memory_pool_alloc (cfg->cfg_pool, sizeof (struct filter)); + cur->type = C_FILTER; + switch (type) { + case SCRIPT_HEADER: + cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); + LIST_INSERT_HEAD (&cfg->header_filters, cur, next); + break; + case SCRIPT_MIME: + cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); + LIST_INSERT_HEAD (&cfg->mime_filters, cur, next); + break; + case SCRIPT_MESSAGE: + cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); + LIST_INSERT_HEAD (&cfg->message_filters, cur, next); + break; + case SCRIPT_URL: + cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); + LIST_INSERT_HEAD (&cfg->url_filters, cur, next); + break; + } + break; + } + } + if (cur != NULL) { + /* Go to next iteration */ + continue; + } + cur = memory_pool_alloc (cfg->cfg_pool, sizeof (struct filter)); + cur->type = PERL_FILTER; + switch (type) { + case SCRIPT_HEADER: + cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); + LIST_INSERT_HEAD (&cfg->header_filters, cur, next); + break; + case SCRIPT_MIME: + cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); + LIST_INSERT_HEAD (&cfg->mime_filters, cur, next); + break; + case SCRIPT_MESSAGE: + cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); + LIST_INSERT_HEAD (&cfg->message_filters, cur, next); + break; + case SCRIPT_URL: + cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p); + LIST_INSERT_HEAD (&cfg->url_filters, cur, next); + break; + } + } + + g_strfreev (strvec); +} + +/* + * Substitute all variables in strings + */ +void +post_load_config (struct config_file *cfg) +{ + g_hash_table_foreach (cfg->variables, substitute_all_variables, cfg); + g_hash_table_foreach (cfg->modules_opts, substitute_module_variables, cfg); + parse_filters_str (cfg, cfg->header_filters_str, SCRIPT_HEADER); + parse_filters_str (cfg, cfg->mime_filters_str, SCRIPT_MIME); + parse_filters_str (cfg, cfg->message_filters_str, SCRIPT_MESSAGE); + parse_filters_str (cfg, cfg->url_filters_str, SCRIPT_URL); +} + +/* + * Rspamd regexp utility functions + */ +struct rspamd_regexp* +parse_regexp (memory_pool_t *pool, char *line) +{ + char *begin, *end, *p; + struct rspamd_regexp *result; + int regexp_flags = 0; + enum rspamd_regexp_type type = REGEXP_NONE; + GError *err = NULL; + + result = memory_pool_alloc0 (pool, sizeof (struct rspamd_regexp)); + /* First try to find header name */ + begin = strchr (line, '='); + if (begin != NULL) { + *begin = '\0'; + result->header = memory_pool_strdup (pool, line); + result->type = REGEXP_HEADER; + *begin = '='; + line = begin; + } + /* Find begin of regexp */ + while (*line != '/') { + line ++; + } + if (*line != '\0') { + begin = line + 1; + } + else if (result->header == NULL) { + /* Assume that line without // is just a header name */ + result->header = memory_pool_strdup (pool, line); + result->type = REGEXP_HEADER; + return result; + } + else { + /* We got header name earlier but have not found // expression, so it is invalid regexp */ + return NULL; + } + /* Find end */ + end = begin; + while (*end && (*end != '/' || *(end - 1) == '\\')) { + end ++; + } + if (end == begin || *end != '/') { + return NULL; + } + /* Parse flags */ + p = end + 1; + while (p != NULL) { + switch (*p) { + case 'i': + regexp_flags |= G_REGEX_CASELESS; + p ++; + break; + case 'm': + regexp_flags |= G_REGEX_MULTILINE; + p ++; + break; + case 's': + regexp_flags |= G_REGEX_DOTALL; + p ++; + break; + case 'x': + regexp_flags |= G_REGEX_EXTENDED; + p ++; + break; + case 'u': + regexp_flags |= G_REGEX_UNGREEDY; + p ++; + break; + case 'o': + regexp_flags |= G_REGEX_OPTIMIZE; + p ++; + break; + /* Type flags */ + case 'H': + if (type != REGEXP_NONE) { + type = REGEXP_HEADER; + } + p ++; + break; + case 'M': + if (type != REGEXP_NONE) { + type = REGEXP_MESSAGE; + } + p ++; + break; + case 'P': + if (type != REGEXP_NONE) { + type = REGEXP_MIME; + } + p ++; + break; + case 'U': + if (type != REGEXP_NONE) { + type = REGEXP_URL; + } + p ++; + break; + /* Stop flags parsing */ + default: + p = NULL; + break; + } + } + + result = memory_pool_alloc (pool, sizeof (struct rspamd_regexp)); + result->type = type; + *end = '\0'; + result->regexp = g_regex_new (begin, regexp_flags, 0, &err); + result->regexp_text = memory_pool_strdup (pool, begin); + memory_pool_add_destructor (pool, (pool_destruct_func)g_regex_unref, (void *)result->regexp); + *end = '/'; + + return result; +} + +/* + * vi:ts=4 + */ diff --git a/src/controller.c b/src/controller.c new file mode 100644 index 000000000..1efb8c35c --- /dev/null +++ b/src/controller.c @@ -0,0 +1,349 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include + +#include "util.h" +#include "main.h" +#include "protocol.h" +#include "upstream.h" +#include "cfg_file.h" +#include "modules.h" + +#define CRLF "\r\n" + +enum command_type { + COMMAND_PASSWORD, + COMMAND_QUIT, + COMMAND_RELOAD, + COMMAND_STAT, + COMMAND_SHUTDOWN, + COMMAND_UPTIME, +}; + +struct controller_command { + char *command; + int privilleged; + enum command_type type; +}; + +static struct controller_command commands[] = { + {"password", 0, COMMAND_PASSWORD}, + {"quit", 0, COMMAND_QUIT}, + {"reload", 1, COMMAND_RELOAD}, + {"stat", 0, COMMAND_STAT}, + {"shutdown", 1, COMMAND_SHUTDOWN}, + {"uptime", 0, COMMAND_UPTIME}, +}; + +static GCompletion *comp; +static time_t start_time; + +static +void sig_handler (int signo) +{ + switch (signo) { + case SIGINT: + case SIGTERM: + _exit (1); + break; + } +} + +static void +sigusr_handler (int fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + /* Do not accept new connections, preparing to end worker's process */ + struct timeval tv; + tv.tv_sec = SOFT_SHUTDOWN_TIME; + tv.tv_usec = 0; + event_del (&worker->sig_ev); + event_del (&worker->bind_ev); + msg_info ("controller's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); + event_loopexit (&tv); + return; +} + +static gchar* +completion_func (gpointer elem) +{ + struct controller_command *cmd = (struct controller_command *)elem; + + return cmd->command; +} + +static void +free_session (struct controller_session *session) +{ + bufferevent_disable (session->bev, EV_READ | EV_WRITE); + memory_pool_delete (session->session_pool); + g_free (session); +} + +static int +check_auth (struct controller_command *cmd, struct controller_session *session) +{ + char out_buf[128]; + int r; + + if (cmd->privilleged && !session->authorized) { + r = snprintf (out_buf, sizeof (out_buf), "not authorized" CRLF); + bufferevent_write (session->bev, out_buf, r); + return 0; + } + + return 1; +} + +static void +process_command (struct controller_command *cmd, char **cmd_args, struct controller_session *session) +{ + char out_buf[512], *arg; + int r = 0, days, hours, minutes; + time_t uptime; + + switch (cmd->type) { + case COMMAND_PASSWORD: + arg = *cmd_args; + if (*arg == '\0') { + msg_debug ("process_command: empty password passed"); + r = snprintf (out_buf, sizeof (out_buf), "password command requires one argument" CRLF); + bufferevent_write (session->bev, out_buf, r); + return; + } + if (strncmp (arg, session->cfg->control_password, strlen (arg)) == 0) { + session->authorized = 1; + r = snprintf (out_buf, sizeof (out_buf), "password accepted" CRLF); + bufferevent_write (session->bev, out_buf, r); + } + else { + session->authorized = 0; + r = snprintf (out_buf, sizeof (out_buf), "password NOT accepted" CRLF); + bufferevent_write (session->bev, out_buf, r); + } + break; + case COMMAND_QUIT: + free_session (session); + break; + case COMMAND_RELOAD: + if (check_auth (cmd, session)) { + r = snprintf (out_buf, sizeof (out_buf), "reload request sent" CRLF); + bufferevent_write (session->bev, out_buf, r); + kill (getppid (), SIGHUP); + } + break; + case COMMAND_STAT: + /* XXX need to implement stat */ + if (check_auth (cmd, session)) { + r = snprintf (out_buf, sizeof (out_buf), "-- end of stats report" CRLF); + bufferevent_write (session->bev, out_buf, r); + } + case COMMAND_SHUTDOWN: + if (check_auth (cmd, session)) { + r = snprintf (out_buf, sizeof (out_buf), "shutdown request sent" CRLF); + bufferevent_write (session->bev, out_buf, r); + kill (getppid (), SIGTERM); + } + break; + case COMMAND_UPTIME: + if (check_auth (cmd, session)) { + uptime = time (NULL) - start_time; + /* If uptime more than 2 hours, print as a number of days. */ + if (uptime >= 2 * 3600) { + days = uptime / 86400; + hours = (uptime % 3600) / 60; + minutes = (uptime % 60) / 60; + r = snprintf (out_buf, sizeof (out_buf), "%dday%s %dhour%s %dminute%s" CRLF, + days, days > 1 ? "s" : " ", + hours, hours > 1 ? "s" : " ", + minutes, minutes > 1 ? "s" : " "); + } + /* If uptime is less than 1 minute print only seconds */ + else if (uptime / 60 == 0) { + r = snprintf (out_buf, sizeof (out_buf), "%dsecond%s", uptime, uptime > 1 ? "s" : " "); + } + /* Else print the minutes and seconds. */ + else { + hours = uptime / 3600; + minutes = (uptime % 60) / 60; + r = snprintf (out_buf, sizeof (out_buf), "%dhour%s %dminite%s %dsecond%s", + hours, hours > 1 ? "s" : " ", + minutes, minutes > 1 ? "s" : " ", + (int)uptime, uptime > 1 ? "s" : " "); + } + + } + bufferevent_write (session->bev, out_buf, r); + break; + } +} + +static void +read_socket (struct bufferevent *bev, void *arg) +{ + struct controller_session *session = (struct controller_session *)arg; + int len, i; + char *s, **params, *cmd, out_buf[128]; + GList *comp_list; + + s = evbuffer_readline (EVBUFFER_INPUT (bev)); + if (s != NULL && *s != 0) { + len = strlen (s); + /* Remove end of line characters from string */ + if (s[len - 1] == '\n') { + if (s[len - 2] == '\r') { + s[len - 2] = 0; + } + s[len - 1] = 0; + } + params = g_strsplit (s, " ", -1); + len = g_strv_length (params); + if (len > 0) { + cmd = g_strstrip (params[0]); + comp_list = g_completion_complete (comp, cmd, NULL); + switch (g_list_length (comp_list)) { + case 1: + process_command ((struct controller_command *)comp_list->data, ¶ms[1], session); + break; + case 0: + i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF); + bufferevent_write (bev, out_buf, i); + break; + default: + i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF); + bufferevent_write (bev, out_buf, i); + break; + } + } + g_strfreev (params); + } + if (s != NULL) { + free (s); + } +} + +static void +write_socket (struct bufferevent *bev, void *arg) +{ + char buf[1024], hostbuf[256]; + + gethostname (hostbuf, sizeof (hostbuf - 1)); + hostbuf[sizeof (hostbuf) - 1] = '\0'; + snprintf (buf, sizeof (buf), "Rspamd version %s is running on %s" CRLF, RVERSION, hostbuf); + bufferevent_disable (bev, EV_WRITE); + bufferevent_enable (bev, EV_READ); +} + +static void +err_socket (struct bufferevent *bev, short what, void *arg) +{ + struct controller_session *session = (struct controller_session *)arg; + msg_info ("closing connection"); + /* Free buffers */ + free_session (session); +} + +static void +accept_socket (int fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + struct sockaddr_storage ss; + struct controller_session *new_session; + socklen_t addrlen = sizeof(ss); + int nfd; + + if ((nfd = accept (fd, (struct sockaddr *)&ss, &addrlen)) == -1) { + return; + } + if (event_make_socket_nonblocking(fd) < 0) { + return; + } + + new_session = g_malloc (sizeof (struct controller_session)); + if (new_session == NULL) { + msg_err ("accept_socket: cannot allocate memory for task, %m"); + return; + } + bzero (new_session, sizeof (struct controller_session)); + new_session->worker = worker; + new_session->sock = nfd; + new_session->cfg = worker->srv->cfg; + new_session->session_pool = memory_pool_new (memory_pool_get_size () - 1); + memory_pool_add_destructor (new_session->session_pool, (pool_destruct_func)bufferevent_free, new_session->bev); + worker->srv->stat->control_connections_count ++; + + /* Read event */ + new_session->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_session); + bufferevent_enable (new_session->bev, EV_WRITE); +} + +void +start_controller (struct rspamd_worker *worker) +{ + struct sigaction signals; + int listen_sock, i; + struct sockaddr_un *un_addr; + GList *comp_list = NULL; + + worker->srv->pid = getpid (); + worker->srv->type = TYPE_CONTROLLER; + event_init (); + g_mime_init (0); + + init_signals (&signals, sig_handler); + sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); + + /* SIGUSR2 handler */ + signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker); + signal_add (&worker->sig_ev, NULL); + + if (worker->srv->cfg->control_family == AF_INET) { + if ((listen_sock = make_socket (worker->srv->cfg->control_host, worker->srv->cfg->control_port)) == -1) { + msg_err ("start_controller: cannot create tcp listen socket. %m"); + exit(-errno); + } + } + else { + un_addr = (struct sockaddr_un *) alloca (sizeof (struct sockaddr_un)); + if (!un_addr || (listen_sock = make_unix_socket (worker->srv->cfg->bind_host, un_addr)) == -1) { + msg_err ("start_controller: cannot create unix listen socket. %m"); + exit(-errno); + } + } + + start_time = time (NULL); + + /* Init command completion */ + for (i = 0; i < sizeof (commands) / sizeof (commands[0]) - 1; i ++) { + comp_list = g_list_prepend (comp_list, &commands[i]); + } + comp = g_completion_new (completion_func); + g_completion_add_items (comp, comp_list); + /* Accept event */ + event_set(&worker->bind_ev, listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); + event_add(&worker->bind_ev, NULL); + + /* Send SIGUSR2 to parent */ + kill (getppid (), SIGUSR2); + + event_loop (0); +} + + +/* + * vi:ts=4 + */ diff --git a/src/filter.c b/src/filter.c new file mode 100644 index 000000000..d1edbb930 --- /dev/null +++ b/src/filter.c @@ -0,0 +1,336 @@ +#include +#include +#include +#include + +#include "mem_pool.h" +#include "filter.h" +#include "main.h" +#include "cfg_file.h" +#include "perl.h" + +void +insert_result (struct worker_task *task, const char *metric_name, const char *symbol, u_char flag) +{ + struct metric *metric; + struct metric_result *metric_res; + + metric = g_hash_table_lookup (task->worker->srv->cfg->metrics, metric_name); + if (metric == NULL) { + return; + } + + metric_res = g_hash_table_lookup (task->results, metric_name); + + if (metric_res == NULL) { + /* Create new metric chain */ + metric_res = memory_pool_alloc (task->task_pool, sizeof (struct metric_result)); + metric_res->symbols = g_hash_table_new (g_str_hash, g_str_equal); + memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_hash_table_destroy, metric_res->symbols); + metric_res->metric = metric; + g_hash_table_insert (task->results, (gpointer)metric_name, metric_res); + } + + g_hash_table_insert (metric_res->symbols, (gpointer)symbol, GSIZE_TO_POINTER (flag)); +} + +/* + * Default consolidation function based on factors in config file + */ +double +factor_consolidation_func (struct worker_task *task, const char *metric_name) +{ + struct metric_result *metric_res; + double *factor; + double res = 0.; + GList *symbols = NULL, *cur; + + metric_res = g_hash_table_lookup (task->results, metric_name); + if (metric_res == NULL) { + return res; + } + + symbols = g_hash_table_get_keys (metric_res->symbols); + cur = g_list_first (symbols); + while (cur) { + factor = g_hash_table_lookup (task->worker->srv->cfg->factors, cur->data); + if (factor == NULL) { + /* Default multiplier is 1 */ + res ++; + } + else { + res += *factor; + } + cur = g_list_next (cur); + } + + g_list_free (symbols); + + return res; +} + +/* + * Call perl or C module function for specified part of message + */ +static void +call_filter_by_name (struct worker_task *task, const char *name, enum script_type sc_type, enum filter_type filt_type) +{ + struct module_ctx *c_module; + + switch (filt_type) { + case C_FILTER: + c_module = g_hash_table_lookup (task->worker->srv->cfg->c_modules, name); + if (c_module) { + switch (filt_type) { + case SCRIPT_HEADER: + c_module->header_filter (task); + break; + case SCRIPT_MIME: + c_module->mime_filter (task); + break; + case SCRIPT_URL: + c_module->url_filter (task); + break; + case SCRIPT_MESSAGE: + c_module->message_filter (task); + break; + } + } + break; + case PERL_FILTER: + switch (filt_type) { + case SCRIPT_HEADER: + perl_call_header_filter (name, task); + break; + case SCRIPT_MIME: + perl_call_mime_filter (name, task); + break; + case SCRIPT_URL: + perl_call_url_filter (name, task); + break; + case SCRIPT_MESSAGE: + perl_call_message_filter (name, task); + break; + } + break; + } +} + +static void +metric_process_callback (gpointer key, gpointer value, void *data) +{ + struct worker_task *task = (struct worker_task *)data; + struct metric_result *metric_res = (struct metric_result *)value; + + if (metric_res->metric->func != NULL) { + metric_res->score = metric_res->metric->func (task, metric_res->metric->name); + } + else { + metric_res->score = factor_consolidation_func (task, metric_res->metric->name); + } +} + +static int +continue_process_filters (struct worker_task *task) +{ + struct filter *cur = task->save.entry; + + cur = LIST_NEXT (cur, next); + /* Note: no breaks in this case! */ + switch (task->save.type) { + case SCRIPT_HEADER: + while (cur) { + call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_HEADER); + if (task->save.saved) { + task->save.entry = cur; + task->save.type = SCRIPT_HEADER; + return 0; + } + cur = LIST_NEXT (cur, next); + } + /* Process mime filters */ + cur = LIST_FIRST (&task->worker->srv->cfg->mime_filters); + case SCRIPT_MIME: + while (cur) { + call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_MIME); + if (task->save.saved) { + task->save.entry = cur; + task->save.type = SCRIPT_MIME; + return 0; + } + cur = LIST_NEXT (cur, next); + } + /* Process url filters */ + cur = LIST_FIRST (&task->worker->srv->cfg->url_filters); + case SCRIPT_URL: + while (cur) { + call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_URL); + if (task->save.saved) { + task->save.entry = cur; + task->save.type = SCRIPT_URL; + return 0; + } + cur = LIST_NEXT (cur, next); + } + /* Process message filters */ + cur = LIST_FIRST (&task->worker->srv->cfg->message_filters); + case SCRIPT_MESSAGE: + while (cur) { + call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_MESSAGE); + if (task->save.saved) { + task->save.entry = cur; + task->save.type = SCRIPT_MESSAGE; + return 0; + } + cur = LIST_NEXT (cur, next); + } + /* All done */ + return 1; + } +} + +int +process_filters (struct worker_task *task) +{ + struct filter *cur; + + if (task->save.saved) { + task->save.saved = 0; + return continue_process_filters (task); + } + + /* Process filters in order that they are listed in config file */ + LIST_FOREACH (cur, &task->worker->srv->cfg->header_filters, next) { + call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_HEADER); + if (task->save.saved) { + task->save.entry = cur; + task->save.type = SCRIPT_HEADER; + return 0; + } + } + + LIST_FOREACH (cur, &task->worker->srv->cfg->mime_filters, next) { + call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_MIME); + if (task->save.saved) { + task->save.entry = cur; + task->save.type = SCRIPT_MIME; + return 0; + } + } + + LIST_FOREACH (cur, &task->worker->srv->cfg->url_filters, next) { + call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_URL); + if (task->save.saved) { + task->save.entry = cur; + task->save.type = SCRIPT_URL; + return 0; + } + } + + LIST_FOREACH (cur, &task->worker->srv->cfg->message_filters, next) { + call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_MESSAGE); + if (task->save.saved) { + task->save.entry = cur; + task->save.type = SCRIPT_MESSAGE; + return 0; + } + } + + /* Process all metrics */ + g_hash_table_foreach (task->results, metric_process_callback, task); + return 1; +} + +struct composites_data { + struct worker_task *task; + struct metric_result *metric_res; +}; + +static void +composites_foreach_callback (gpointer key, gpointer value, void *data) +{ + struct composites_data *cd = (struct composites_data *)data; + struct expression *expr = (struct expression *)value; + GQueue *stack; + GList *symbols = NULL, *s; + gsize cur, op1, op2; + + stack = g_queue_new (); + + while (expr) { + if (expr->type == EXPR_OPERAND) { + /* Find corresponding symbol */ + if (g_hash_table_lookup (cd->metric_res->symbols, expr->content.operand) == NULL) { + cur = 0; + } + else { + cur = 1; + symbols = g_list_append (symbols, expr->content.operand); + } + g_queue_push_head (stack, GSIZE_TO_POINTER (cur)); + } + else { + if (g_queue_is_empty (stack)) { + /* Queue has no operands for operation, exiting */ + g_list_free (symbols); + g_queue_free (stack); + return; + } + switch (expr->content.operation) { + case '!': + op1 = GPOINTER_TO_SIZE (g_queue_pop_head (stack)); + op1 = !op1; + g_queue_push_head (stack, GSIZE_TO_POINTER (op1)); + break; + case '&': + op1 = GPOINTER_TO_SIZE (g_queue_pop_head (stack)); + op2 = GPOINTER_TO_SIZE (g_queue_pop_head (stack)); + g_queue_push_head (stack, GSIZE_TO_POINTER (op1 && op2)); + case '|': + op1 = GPOINTER_TO_SIZE (g_queue_pop_head (stack)); + op2 = GPOINTER_TO_SIZE (g_queue_pop_head (stack)); + g_queue_push_head (stack, GSIZE_TO_POINTER (op1 || op2)); + default: + expr = expr->next; + continue; + } + } + expr = expr->next; + } + if (!g_queue_is_empty (stack)) { + op1 = GPOINTER_TO_SIZE (g_queue_pop_head (stack)); + if (op1) { + /* Remove all symbols that are in composite symbol */ + s = g_list_first (symbols); + while (s) { + g_hash_table_remove (cd->metric_res->symbols, s->data); + s = g_list_next (s); + } + /* Add new symbol */ + g_hash_table_insert (cd->metric_res->symbols, key, GSIZE_TO_POINTER (op1)); + } + } + + g_queue_free (stack); + g_list_free (symbols); + + return; +} + +static void +composites_metric_callback (gpointer key, gpointer value, void *data) +{ + struct worker_task *task = (struct worker_task *)data; + struct composites_data *cd = memory_pool_alloc (task->task_pool, sizeof (struct composites_data)); + struct metric_result *metric_res = (struct metric_result *)value; + + cd->task = task; + cd->metric_res = (struct metric_result *)metric_res; + + g_hash_table_foreach (task->cfg->composite_symbols, composites_foreach_callback, cd); +} + +void make_composites (struct worker_task *task) +{ + g_hash_table_foreach (task->results, composites_metric_callback, task); +} diff --git a/src/filter.h b/src/filter.h new file mode 100644 index 000000000..ce8be7ecd --- /dev/null +++ b/src/filter.h @@ -0,0 +1,43 @@ +#ifndef RSPAMD_FILTER_H +#define RSPAMD_FILTER_H + +#include +#ifndef HAVE_OWN_QUEUE_H +#include +#else +#include "queue.h" +#endif +#include + +struct worker_task; + +typedef double (*metric_cons_func)(struct worker_task *task, const char *metric_name); +typedef void (*filter_func)(struct worker_task *task); + +enum filter_type { C_FILTER, PERL_FILTER }; + +struct filter { + char *func_name; + enum filter_type type; + LIST_ENTRY (filter) next; +}; + +struct metric { + char *name; + char *func_name; + metric_cons_func func; + double required_score; +}; + +struct metric_result { + struct metric *metric; + double score; + GHashTable *symbols; +}; + +int process_filters (struct worker_task *task); +void insert_result (struct worker_task *task, const char *metric_name, const char *symbol, u_char flag); +void make_composites (struct worker_task *task); +double factor_consolidation_func (struct worker_task *task, const char *metric_name); + +#endif diff --git a/src/fstring.c b/src/fstring.c new file mode 100644 index 000000000..ad0be7d78 --- /dev/null +++ b/src/fstring.c @@ -0,0 +1,234 @@ +#include + +#include "fstring.h" + +/* + * Search first occurence of character in string + */ +ssize_t +fstrchr (f_str_t *src, char c) +{ + register ssize_t cur = 0; + + while (cur < src->len) { + if (*(src->begin + cur) == c) { + return cur; + } + cur ++; + } + + return -1; +} + +/* + * Search last occurence of character in string + */ +ssize_t +fstrrchr (f_str_t *src, char c) +{ + register ssize_t cur = src->len; + + while (cur > 0) { + if (*(src->begin + cur) == c) { + return cur; + } + cur --; + } + + return -1; +} + +/* + * Search for pattern in orig + */ +ssize_t +fstrstr (f_str_t *orig, f_str_t *pattern) +{ + register ssize_t cur = 0, pcur = 0; + + if (pattern->len > orig->len) { + return -1; + } + + while (cur < orig->len) { + if (*(orig->begin + cur) == *pattern->begin) { + while (cur < orig->len && pcur < pattern->len) { + if (*(orig->begin + cur) != *(pattern->begin + pcur)) { + pcur = 0; + break; + } + cur ++; + pcur ++; + } + return cur - pattern->len; + } + cur ++; + } + + return -1; + +} + +/* + * Split string by tokens + * word contains parsed word + * + * Return: -1 - no new words can be extracted + * 1 - word was extracted and there are more words + * 0 - last word extracted + */ +int +fstrtok (f_str_t *text, const char *sep, f_tok_t *state) +{ + register size_t cur; + const char *csep = sep; + + if (state->pos >= text->len) { + return -1; + } + + cur = state->pos; + + while (cur < text->len) { + while (*csep) { + if (*(text->begin + cur) == *csep) { + state->word.begin = (text->begin + state->pos); + state->word.len = cur - state->pos; + state->pos = cur + 1; + return 1; + } + csep ++; + } + csep = sep; + cur ++; + } + + /* Last word */ + state->word.begin = (text->begin + state->pos); + state->word.len = cur - state->pos; + state->pos = cur; + + return 0; +} + +/* + * Copy one string into other + */ +size_t +fstrcpy (f_str_t *dest, f_str_t *src) +{ + register size_t cur = 0; + + if (dest->size < src->len) { + return 0; + } + + while (cur < src->len && cur < dest->size) { + *(dest->begin + cur) = *(src->begin + cur); + cur ++; + } + + return cur; +} + +/* + * Concatenate two strings + */ +size_t +fstrcat (f_str_t *dest, f_str_t *src) +{ + register size_t cur = src->len; + + if (dest->size < src->len + dest->len) { + return 0; + } + + while (cur < src->len && cur < dest->size) { + *(dest->begin + cur) = *(src->begin + cur); + cur ++; + } + + dest->len += src->len; + + return cur; + +} + +/* + * Push one character to fstr + */ +int +fstrpush (f_str_t *dest, char c) +{ + if (dest->size < dest->len) { + /* Need to reallocate string */ + return 0; + } + + *(dest->begin + dest->len) = c; + dest->len ++; + return 1; +} + +/* + * Allocate memory for f_str_t + */ +f_str_t* +fstralloc (memory_pool_t *pool, size_t len) +{ + f_str_t *res = memory_pool_alloc (pool, sizeof (f_str_t)); + + if (res == NULL) { + return NULL; + } + res->begin = memory_pool_alloc (pool, len); + if (res->begin == NULL) { + free (res); + return NULL; + } + + res->size = len; + return res; +} + +/* + * Truncate string to its len + */ +f_str_t* +fstrtruncate (memory_pool_t *pool, f_str_t *orig) +{ + f_str_t *res; + + if (orig == NULL || orig->len == 0 || orig->size <= orig->len) { + return orig; + } + + res = fstralloc (pool, orig->len); + if (res == NULL) { + return NULL; + } + fstrcpy (res, orig); + + return res; +} + +/* + * Enlarge string to new size + */ +f_str_t* +fstrgrow (memory_pool_t *pool, f_str_t *orig, size_t newlen) +{ + f_str_t *res; + + if (orig == NULL || orig->len == 0 || orig->size >= newlen) { + return orig; + } + + res = fstralloc (pool, newlen); + if (res == NULL) { + return NULL; + } + fstrcpy (res, orig); + + return res; +} diff --git a/src/fstring.h b/src/fstring.h new file mode 100644 index 000000000..c3087b2c5 --- /dev/null +++ b/src/fstring.h @@ -0,0 +1,86 @@ +/* + * Functions for handling with fixed size strings + */ + +#ifndef FSTRING_H +#define FSTRING_H + +#include +#include "mem_pool.h" + +#define update_buf_size(x) (x)->free = (x)->buf->size - ((x)->pos - (x)->buf->begin); (x)->buf->len = (x)->pos - (x)->buf->begin + +typedef struct f_str_s { + char *begin; + size_t len; + size_t size; +} f_str_t; + +typedef struct f_str_buf_s { + f_str_t *buf; + char *pos; + size_t free; +} f_str_buf_t; + +typedef struct f_tok_s { + f_str_t word; + size_t pos; +} f_tok_t; + +/* + * Search first occurence of character in string + */ +ssize_t fstrchr (f_str_t *src, char c); + +/* + * Search last occurence of character in string + */ +ssize_t fstrrchr (f_str_t *src, char c); + +/* + * Search for pattern in orig + */ +ssize_t fstrstr (f_str_t *orig, f_str_t *pattern); + +/* + * Split string by tokens + * word contains parsed word + */ +int fstrtok (f_str_t *text, const char *sep, f_tok_t *state); + +/* + * Copy one string into other + */ +size_t fstrcpy (f_str_t *dest, f_str_t *src); + +/* + * Concatenate two strings + */ +size_t fstrcat (f_str_t *dest, f_str_t *src); + +/* + * Push one character to fstr + */ +int fstrpush (f_str_t *dest, char c); + +/* + * Allocate memory for f_str_t + */ +f_str_t* fstralloc (memory_pool_t *pool, size_t len); + +/* + * Truncate string to its len + */ +f_str_t* fstrtruncate (memory_pool_t *pool, f_str_t *orig); + +/* + * Enlarge string to new size + */ +f_str_t* fstrgrow (memory_pool_t *pool, f_str_t *orig, size_t newlen); + +/* + * Return specified character + */ +#define fstridx(str, pos) *((str)->begin + (pos)) + +#endif diff --git a/src/main.c b/src/main.c new file mode 100644 index 000000000..d2dec7e47 --- /dev/null +++ b/src/main.c @@ -0,0 +1,427 @@ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#ifdef HAVE_LIBUTIL_H +#include +#endif +#include + +#include /* from the Perl distribution */ +#include /* from the Perl distribution */ + +#include "main.h" +#include "cfg_file.h" +#include "util.h" + +struct config_file *cfg; + +static void sig_handler (int ); +static struct rspamd_worker * fork_worker (struct rspamd_main *, int, int, enum process_type); + +sig_atomic_t do_restart; +sig_atomic_t do_terminate; +sig_atomic_t child_dead; +sig_atomic_t child_ready; + +extern int yynerrs; +extern FILE *yyin; +extern void boot_DynaLoader (pTHX_ CV* cv); +extern void boot_Socket (pTHX_ CV* cv); + +PerlInterpreter *perl_interpreter; +/* XXX: remove this shit when it would be clear why perl need this line */ +PerlInterpreter *my_perl; + +static +void sig_handler (int signo) +{ + switch (signo) { + case SIGHUP: + do_restart = 1; + do_reopen_log = 1; + break; + case SIGINT: + case SIGTERM: + do_terminate = 1; + break; + case SIGCHLD: + child_dead = 1; + break; + case SIGUSR2: + child_ready = 1; + break; + } +} + +void +xs_init(pTHX) +{ + dXSUB_SYS; + /* DynaLoader is a special case */ + newXS ("DynaLoader::boot_DynaLoader", boot_DynaLoader, __FILE__); +} + +static void +init_filters (struct config_file *cfg) +{ + struct perl_module *module; + + LIST_FOREACH (module, &cfg->perl_modules, next) { + if (module->path) { + require_pv (module->path); + } + } +} + +static struct rspamd_worker * +fork_worker (struct rspamd_main *rspamd, int listen_sock, int reconfig, enum process_type type) +{ + struct rspamd_worker *cur; + char *cfg_file; + FILE *f; + struct config_file *tmp_cfg; + /* Starting worker process */ + cur = (struct rspamd_worker *)g_malloc (sizeof (struct rspamd_worker)); + if (cur) { + /* Reconfig needed */ + if (reconfig) { + tmp_cfg = (struct config_file *) g_malloc (sizeof (struct config_file)); + if (tmp_cfg) { + bzero (tmp_cfg, sizeof (struct config_file)); + tmp_cfg->cfg_pool = memory_pool_new (32768); + cfg_file = memory_pool_strdup (tmp_cfg->cfg_pool, rspamd->cfg->cfg_name); + f = fopen (rspamd->cfg->cfg_name , "r"); + if (f == NULL) { + msg_warn ("fork_worker: cannot open file: %s", rspamd->cfg->cfg_name ); + } + else { + yyin = f; + yyrestart (yyin); + + if (yyparse() != 0 || yynerrs > 0) { + msg_warn ("fork_worker: yyparse: cannot parse config file, %d errors", yynerrs); + fclose (f); + } + else { + free_config (rspamd->cfg); + g_free (rspamd->cfg); + rspamd->cfg = tmp_cfg; + rspamd->cfg->cfg_name = cfg_file; + } + } + } + } + bzero (cur, sizeof (struct rspamd_worker)); + TAILQ_INSERT_HEAD (&rspamd->workers, cur, next); + cur->srv = rspamd; + cur->pid = fork(); + switch (cur->pid) { + case 0: + /* TODO: add worker code */ + switch (type) { + case TYPE_CONTROLLER: + setproctitle ("controller process"); + pidfile_close (rspamd->pfh); + msg_info ("fork_worker: starting controller process %d", getpid ()); + cur->type = TYPE_CONTROLLER; + start_controller (cur); + break; + case TYPE_WORKER: + default: + setproctitle ("worker process"); + pidfile_close (rspamd->pfh); + msg_info ("fork_worker: starting worker process %d", getpid ()); + cur->type = TYPE_WORKER; + start_worker (cur, listen_sock); + break; + } + break; + case -1: + msg_err ("fork_worker: cannot fork main process. %m"); + pidfile_remove (rspamd->pfh); + exit (-errno); + break; + } + } + + return cur; +} + +int +main (int argc, char **argv) +{ + struct rspamd_main *rspamd; + struct module_ctx *cur_module = NULL; + int res = 0, i, listen_sock; + struct sigaction signals; + struct rspamd_worker *cur, *cur_tmp, *active_worker; + struct sockaddr_un *un_addr; + FILE *f; + pid_t wrk; + char *args[] = { "", "-e", "0", NULL }; + + rspamd = (struct rspamd_main *)g_malloc (sizeof (struct rspamd_main)); + bzero (rspamd, sizeof (struct rspamd_main)); + rspamd->server_pool = memory_pool_new (memory_pool_get_size ()); + cfg = (struct config_file *)g_malloc (sizeof (struct config_file)); + rspamd->cfg = cfg; + if (!rspamd || !rspamd->cfg) { + fprintf(stderr, "Cannot allocate memory\n"); + exit(-errno); + } + + do_terminate = 0; + do_restart = 0; + child_dead = 0; + child_ready = 0; + do_reopen_log = 0; + active_worker = NULL; + + rspamd->stat = memory_pool_alloc_shared (rspamd->server_pool, sizeof (struct rspamd_stat)); + bzero (rspamd->stat, sizeof (struct rspamd_stat)); + + bzero (rspamd->cfg, sizeof (struct config_file)); + rspamd->cfg->cfg_pool = memory_pool_new (memory_pool_get_size ()); + init_defaults (rspamd->cfg); + + bzero (&signals, sizeof (struct sigaction)); + + rspamd->cfg->cfg_name = memory_pool_strdup (rspamd->cfg->cfg_pool, FIXED_CONFIG_FILE); + read_cmd_line (argc, argv, rspamd->cfg); + + msg_warn ("(main) starting..."); + + #ifndef HAVE_SETPROCTITLE + init_title (argc, argv, environ); + #endif + + f = fopen (rspamd->cfg->cfg_name , "r"); + if (f == NULL) { + msg_warn ("cannot open file: %s", rspamd->cfg->cfg_name ); + return EBADF; + } + yyin = f; + + if (yyparse() != 0 || yynerrs > 0) { + msg_warn ("yyparse: cannot parse config file, %d errors", yynerrs); + return EBADF; + } + + fclose (f); + rspamd->cfg->cfg_name = memory_pool_strdup (rspamd->cfg->cfg_pool, rspamd->cfg->cfg_name ); + + /* Strictly set temp dir */ + if (!rspamd->cfg->temp_dir) { + msg_warn ("tempdir is not set, trying to use $TMPDIR"); + rspamd->cfg->temp_dir = memory_pool_strdup (rspamd->cfg->cfg_pool, getenv ("TMPDIR")); + + if (!rspamd->cfg->temp_dir) { + msg_warn ("$TMPDIR is empty too, using /tmp as default"); + rspamd->cfg->temp_dir = memory_pool_strdup (rspamd->cfg->cfg_pool, "/tmp"); + } + } + + switch (cfg->log_type) { + case RSPAMD_LOG_CONSOLE: + if (!rspamd->cfg->no_fork) { + fprintf (stderr, "Cannot log to console while daemonized, disable logging"); + cfg->log_fd = -1; + } + else { + cfg->log_fd = 2; + } + g_log_set_default_handler (file_log_function, cfg); + break; + case RSPAMD_LOG_FILE: + if (cfg->log_file == NULL || open_log (cfg) == -1) { + fprintf (stderr, "Fatal error, cannot open logfile, exiting"); + exit (EXIT_FAILURE); + } + g_log_set_default_handler (file_log_function, cfg); + break; + case RSPAMD_LOG_SYSLOG: + if (open_log (cfg) == -1) { + fprintf (stderr, "Fatal error, cannot open syslog facility, exiting"); + exit (EXIT_FAILURE); + } + g_log_set_default_handler (syslog_log_function, cfg); + break; + } + + if (!rspamd->cfg->no_fork && daemon (1, 1) == -1) { + fprintf (stderr, "Cannot daemonize\n"); + exit (-errno); + } + + if (write_pid (rspamd) == -1) { + msg_err ("main: cannot write pid file %s", rspamd->cfg->pid_file); + exit (-errno); + } + + /* Init C modules */ + for (i = 0; i < MODULES_NUM; i ++) { + cur_module = memory_pool_alloc (rspamd->cfg->cfg_pool, sizeof (struct module_ctx)); + if (modules[i].module_init_func(cfg, &cur_module) == 0) { + g_hash_table_insert (cfg->c_modules, (gpointer)modules[i].name, cur_module); + } + } + + rspamd->pid = getpid(); + rspamd->type = TYPE_MAIN; + + init_signals (&signals, sig_handler); + /* Init perl interpreter */ + PERL_SYS_INIT3 (&argc, &argv, &env); + perl_interpreter = perl_alloc (); + if (perl_interpreter == NULL) { + msg_err ("main: cannot allocate perl interpreter, %m"); + exit (-errno); + } + + my_perl = perl_interpreter; + PERL_SET_CONTEXT (perl_interpreter); + perl_construct (perl_interpreter); + PL_exit_flags |= PERL_EXIT_DESTRUCT_END; + perl_parse (perl_interpreter, xs_init, 3, args, NULL); + /* Block signals to use sigsuspend in future */ + sigprocmask(SIG_BLOCK, &signals.sa_mask, NULL); + + if (rspamd->cfg->bind_family == AF_INET) { + if ((listen_sock = make_socket (rspamd->cfg->bind_host, rspamd->cfg->bind_port)) == -1) { + msg_err ("main: cannot create tcp listen socket. %m"); + exit(-errno); + } + } + else { + un_addr = (struct sockaddr_un *) g_malloc (sizeof (struct sockaddr_un)); + if (!un_addr || (listen_sock = make_unix_socket (rspamd->cfg->bind_host, un_addr)) == -1) { + msg_err ("main: cannot create unix listen socket. %m"); + exit(-errno); + } + } + + if (listen (listen_sock, -1) == -1) { + msg_err ("main: cannot listen on socket. %m"); + exit(-errno); + } + + TAILQ_INIT (&rspamd->workers); + + setproctitle ("main process"); + + for (i = 0; i < cfg->workers_number; i++) { + fork_worker (rspamd, listen_sock, 0, TYPE_WORKER); + } + /* Start controller if enabled */ + if (cfg->controller_enabled) { + fork_worker (rspamd, listen_sock, 0, TYPE_CONTROLLER); + } + + /* Signal processing cycle */ + for (;;) { + msg_debug ("main: calling sigsuspend"); + sigemptyset (&signals.sa_mask); + sigsuspend (&signals.sa_mask); + if (do_terminate) { + msg_debug ("main: catch termination signal, waiting for childs"); + pass_signal_worker (&rspamd->workers, SIGTERM); + break; + } + if (child_dead) { + child_dead = 0; + msg_debug ("main: catch SIGCHLD signal, finding terminated worker"); + /* Remove dead child form childs list */ + wrk = waitpid (0, &res, 0); + TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) { + if (wrk == cur->pid) { + /* Catch situations if active worker is abnormally terminated */ + if (cur == active_worker) { + active_worker = NULL; + } + TAILQ_REMOVE(&rspamd->workers, cur, next); + if (cur->type == TYPE_CONTROLLER) { + msg_info ("main: do not restart dead controller"); + g_free (cur); + break; + } + if (WIFEXITED (res) && WEXITSTATUS (res) == 0) { + /* Normal worker termination, do not fork one more */ + msg_info ("main: worker process %d terminated normally", cur->pid); + } + else { + if (WIFSIGNALED (res)) { + msg_warn ("main: worker process %d terminated abnormally by signal: %d", + cur->pid, WTERMSIG(res)); + } + else { + msg_warn ("main: worker process %d terminated abnormally", cur->pid); + } + /* Fork another worker in replace of dead one */ + fork_worker (rspamd, listen_sock, 0, cur->type); + } + g_free (cur); + } + } + } + if (do_restart) { + do_restart = 0; + + if (active_worker == NULL) { + /* Start new worker that would reread configuration*/ + active_worker = fork_worker (rspamd, listen_sock, 1, TYPE_WORKER); + } + /* Do not start new workers untill active worker is not ready for accept */ + } + if (child_ready) { + child_ready = 0; + + if (active_worker != NULL) { + msg_info ("main: worker process %d has been successfully started", active_worker->pid); + TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) { + if (cur != active_worker && !cur->is_dying) { + /* Send to old workers SIGUSR2 */ + kill (cur->pid, SIGUSR2); + cur->is_dying = 1; + } + } + active_worker = NULL; + } + } + } + + /* Wait for workers termination */ + while (!TAILQ_EMPTY(&rspamd->workers)) { + cur = TAILQ_FIRST(&rspamd->workers); + waitpid (cur->pid, &res, 0); + msg_debug ("main(cleaning): worker process %d terminated", cur->pid); + TAILQ_REMOVE(&rspamd->workers, cur, next); + g_free(cur); + } + + msg_info ("main: terminating..."); + + + if (rspamd->cfg->bind_family == AF_UNIX) { + unlink (rspamd->cfg->bind_host); + } + + free_config (rspamd->cfg); + g_free (rspamd->cfg); + g_free (rspamd); + + return (res); +} + +/* + * vi:ts=4 + */ diff --git a/src/main.h b/src/main.h new file mode 100644 index 000000000..efb716ab0 --- /dev/null +++ b/src/main.h @@ -0,0 +1,201 @@ +#ifndef RPOP_MAIN_H +#define RPOP_MAIN_H + +#include "config.h" + +#include +#include +#ifndef HAVE_OWN_QUEUE_H +#include +#else +#include "queue.h" +#endif +#include + +#include +#include +#include + +#include +#include + +#include "fstring.h" +#include "mem_pool.h" +#include "url.h" +#include "memcached.h" +#include "protocol.h" +#include "filter.h" + +#include +#include + +/* Default values */ +#define FIXED_CONFIG_FILE "./rspamd.conf" +/* Time in seconds to exit for old worker */ +#define SOFT_SHUTDOWN_TIME 60 +/* Default metric name */ +#define DEFAULT_METRIC "default" + +/* Logging in postfix style */ +#define msg_err g_error +#define msg_warn g_warning +#define msg_info g_message +#define msg_debug g_debug + +/* Process type: main or worker */ +enum process_type { + TYPE_MAIN, + TYPE_WORKER, + TYPE_CONTROLLER, +}; + +/* Filter type */ +enum script_type { + SCRIPT_HEADER, + SCRIPT_MIME, + SCRIPT_URL, + SCRIPT_MESSAGE, +}; + +/* Logic expression */ +struct expression { + enum { EXPR_OPERAND, EXPR_OPERATION } type; + union { + void *operand; + char operation; + } content; + struct expression *next; +}; + +/* Worker process structure */ +struct rspamd_worker { + pid_t pid; + char is_initialized; + char is_dying; + TAILQ_ENTRY (rspamd_worker) next; + struct rspamd_main *srv; + enum process_type type; + struct event sig_ev; + struct event bind_ev; +}; + +struct pidfh; +struct config_file; + +/* Server statistics */ +struct rspamd_stat { + unsigned int messages_scanned; + unsigned int messages_spam; + unsigned int messages_ham; + unsigned int connections_count; + unsigned int control_connections_count; + unsigned int messages_learned; +}; + +/* Struct that determine main server object (for logging purposes) */ +struct rspamd_main { + struct config_file *cfg; + pid_t pid; + /* Pid file structure */ + struct pidfh *pfh; + enum process_type type; + unsigned int ev_initialized; + struct rspamd_stat *stat; + + memory_pool_t *server_pool; + + TAILQ_HEAD (workq, rspamd_worker) workers; +}; + +struct mime_part { + GMimeContentType *type; + GByteArray *content; + TAILQ_ENTRY (mime_part) next; +}; + +struct save_point { + void *entry; + enum script_type type; + unsigned int saved; +}; + +/* Control session */ +struct controller_session { + struct rspamd_worker *worker; + int sock; + /* Access to authorized commands */ + int authorized; + memory_pool_t *session_pool; + struct bufferevent *bev; + struct config_file *cfg; +}; + +/* Worker task structure */ +struct worker_task { + struct rspamd_worker *worker; + enum { + READ_COMMAND, + READ_HEADER, + READ_MESSAGE, + WRITE_REPLY, + WRITE_ERROR, + WAIT_FILTER, + CLOSING_CONNECTION, + } state; + size_t content_length; + enum rspamd_protocol proto; + enum rspamd_command cmd; + int sock; + char *helo; + char *from; + GList *rcpt; + unsigned int nrcpt; + struct in_addr from_addr; + f_str_buf_t *msg; + struct bufferevent *bev; + /* Memcached connection for this task */ + memcached_ctx_t *memc_ctx; + unsigned memc_busy:1; + /* Number of mime parts */ + int parts_count; + /* Message */ + GMimeMessage *message; + /* All parts of message */ + TAILQ_HEAD (mime_partq, mime_part) parts; + /* URLs extracted from message */ + TAILQ_HEAD (uriq, uri) urls; + /* Hash of metric result structures */ + GHashTable *results; + struct config_file *cfg; + /* Save point for filters deferred processing */ + struct save_point save; + /* Saved error message and code */ + char *last_error; + int error_code; + /* Memory pool that is associated with this task */ + memory_pool_t *task_pool; +}; + +struct module_ctx { + int (*header_filter)(struct worker_task *task); + int (*mime_filter)(struct worker_task *task); + int (*message_filter)(struct worker_task *task); + int (*url_filter)(struct worker_task *task); +}; + +struct c_module { + const char *name; + struct module_ctx *ctx; + LIST_ENTRY (c_module) next; +}; + +void start_worker (struct rspamd_worker *worker, int listen_sock); +void start_controller (struct rspamd_worker *worker); + +extern sig_atomic_t do_reopen_log; + +#endif + +/* + * vi:ts=4 + */ diff --git a/src/mem_pool.c b/src/mem_pool.c new file mode 100644 index 000000000..f116fff73 --- /dev/null +++ b/src/mem_pool.c @@ -0,0 +1,360 @@ +#include +#include +#include +#include +#include +#include +#include "config.h" + +#ifdef HAVE_SCHED_YIELD +#include +#endif + +#ifdef HAVE_NANOSLEEP +#include +#endif + +#include "mem_pool.h" + +/* Sleep time for spin lock in nanoseconds */ +#define MUTEX_SLEEP_TIME 10000000L + +#ifdef _THREAD_SAFE +pthread_mutex_t stat_mtx = PTHREAD_MUTEX_INITIALIZER; +#define STAT_LOCK() do { pthread_mutex_lock (&stat_mtx); } while (0) +#define STAT_UNLOCK() do { pthread_mutex_unlock (&stat_mtx); } while (0) +#else +#define STAT_LOCK() do {} while (0) +#define STAT_UNLOCK() do {} while (0) +#endif + +/* + * This define specify whether we should check all pools for free space for new object + * or just begin scan from current (recently attached) pool + * If MEMORY_GREEDY is defined, then we scan all pools to find free space (more CPU usage, slower + * but requires less memory). If it is not defined check only current pool and if object is too large + * to place in it allocate new one (this may cause huge CPU usage in some cases too, but generally faster than + * greedy method) + */ +#undef MEMORY_GREEDY + +/* Internal statistic */ +static size_t bytes_allocated = 0; +static size_t chunks_allocated = 0; +static size_t chunks_freed = 0; +static size_t shared_chunks_allocated = 0; + +static struct _pool_chain * +pool_chain_new (size_t size) +{ + struct _pool_chain *chain; + chain = g_malloc (sizeof (struct _pool_chain)); + chain->begin = g_malloc (size); + chain->len = size; + chain->pos = chain->begin; + chain->next = NULL; + STAT_LOCK (); + chunks_allocated ++; + STAT_UNLOCK (); + + return chain; +} + +static struct _pool_chain_shared * +pool_chain_new_shared (size_t size) +{ + struct _pool_chain_shared *chain; + +#if defined(HAVE_MMAP_ANON) + chain = mmap (NULL, size + sizeof (struct _pool_chain_shared), PROT_READ|PROT_WRITE, MAP_ANON|MAP_SHARED, -1, 0); + chain->begin = ((u_char *)chain) + sizeof (struct _pool_chain_shared); + if (chain == MAP_FAILED) { + return NULL; + } +#elif defined(HAVE_MMAP_ZERO) + int fd; + + fd = open ("/dev/zero", O_RDWR); + if (fd == -1) { + return NULL; + } + chain = mmap (NULL, shm->size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0); + chain->begin = ((u_char *)chain) + sizeof (struct _pool_chain_shared); + if (chain == MAP_FAILED) { + return NULL; + } +#else +# error No mmap methods are defined +#endif + chain->len = size; + chain->pos = chain->begin; + chain->lock = 0; + chain->next = NULL; + STAT_LOCK (); + shared_chunks_allocated ++; + STAT_UNLOCK (); + + return chain; +} + +memory_pool_t* +memory_pool_new (size_t size) +{ + memory_pool_t *new; + + new = g_malloc (sizeof (memory_pool_t)); + new->cur_pool = pool_chain_new (size); + new->shared_pool = NULL; + new->first_pool = new->cur_pool; + new->destructors = NULL; + + return new; +} + +void * +memory_pool_alloc (memory_pool_t *pool, size_t size) +{ + u_char *tmp; + struct _pool_chain *new, *cur; + + if (pool) { +#ifdef MEMORY_GREEDY + cur = pool->first_pool; +#else + cur = pool->cur_pool; +#endif + /* Find free space in pool chain */ + while (memory_pool_free (cur) < size && cur->next) { + cur = cur->next; + } + if (cur->next == NULL && memory_pool_free (cur) < size) { + /* Allocate new pool */ + if (cur->len >= size) { + new = pool_chain_new (cur->len); + } + else { + new = pool_chain_new (size + cur->len); + } + /* Attach new pool to chain */ + cur->next = new; + pool->cur_pool = new; + new->pos += size; + STAT_LOCK (); + bytes_allocated += size; + STAT_UNLOCK (); + return new->begin; + } + tmp = cur->pos; + cur->pos += size; + STAT_LOCK (); + bytes_allocated += size; + STAT_UNLOCK (); + return tmp; + } + return NULL; +} + +void * +memory_pool_alloc0 (memory_pool_t *pool, size_t size) +{ + void *pointer = memory_pool_alloc (pool, size); + if (pointer) { + bzero (pointer, size); + } + return pointer; +} + +char * +memory_pool_strdup (memory_pool_t *pool, const char *src) +{ + size_t len; + char *newstr; + + if (src == NULL) { + return NULL; + } + + len = strlen (src); + newstr = memory_pool_alloc (pool, len + 1); + memcpy (newstr, src, len + 1); + return newstr; +} + +void * +memory_pool_alloc_shared (memory_pool_t *pool, size_t size) +{ + u_char *tmp; + struct _pool_chain_shared *new, *cur; + + if (pool) { + cur = pool->shared_pool; + if (!cur) { + cur = pool_chain_new_shared (pool->first_pool->len); + pool->shared_pool = cur; + } + + /* Find free space in pool chain */ + while (memory_pool_free (cur) < size && cur->next) { + cur = cur->next; + } + if (cur->next == NULL && memory_pool_free (cur) < size) { + /* Allocate new pool */ + if (cur->len >= size) { + new = pool_chain_new_shared (cur->len); + } + else { + new = pool_chain_new_shared (size + cur->len); + } + /* Attach new pool to chain */ + cur->next = new; + new->pos += size; + STAT_LOCK (); + bytes_allocated += size; + STAT_UNLOCK (); + return new->begin; + } + tmp = cur->pos; + cur->pos += size; + STAT_LOCK (); + bytes_allocated += size; + STAT_UNLOCK (); + return tmp; + } + return NULL; +} + +/* Find pool for a pointer, returns NULL if pointer is not in pool */ +static struct _pool_chain_shared * +memory_pool_find_pool (memory_pool_t *pool, void *pointer) +{ + struct _pool_chain_shared *cur = pool->shared_pool; + + while (cur) { + if ((u_char *)pointer >= cur->begin && (u_char *)pointer <= (cur->begin + cur->len)) { + return cur; + } + cur = cur->next; + } + + return NULL; +} + +static void +memory_pool_spin (struct _pool_chain_shared *chain) +{ + while (!g_atomic_int_compare_and_exchange (&chain->lock, 0, 1)) { + /* lock was aqquired */ +#ifdef HAVE_NANOSLEEP + struct timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = MUTEX_SLEEP_TIME; + /* Spin */ + while (nanosleep (&ts, &ts) == -1 && errno == EINTR); +#endif +#ifdef HAVE_SCHED_YIELD + (void)sched_yield (); +#endif +#if !defined(HAVE_NANOSLEEP) && !defined(HAVE_SCHED_YIELD) +# error No methods to spin are defined +#endif + } +} + +/* Simple implementation of spinlock */ +void +memory_pool_lock_shared (memory_pool_t *pool, void *pointer) +{ + struct _pool_chain_shared *chain; + + chain = memory_pool_find_pool (pool, pointer); + if (chain == NULL) { + return; + } + + memory_pool_spin (chain); +} + +void memory_pool_unlock_shared (memory_pool_t *pool, void *pointer) +{ + struct _pool_chain_shared *chain; + + chain = memory_pool_find_pool (pool, pointer); + if (chain == NULL) { + return; + } + + (void)g_atomic_int_dec_and_test (&chain->lock); +} + +void +memory_pool_add_destructor (memory_pool_t *pool, pool_destruct_func func, void *data) +{ + struct _pool_destructors *cur; + + cur = memory_pool_alloc (pool, sizeof (struct _pool_destructors)); + if (cur) { + cur->func = func; + cur->data = data; + cur->prev = pool->destructors; + pool->destructors = cur; + } +} + +void +memory_pool_delete (memory_pool_t *pool) +{ + struct _pool_chain *cur = pool->first_pool, *tmp; + struct _pool_chain_shared *cur_shared = pool->shared_pool, *tmp_shared; + struct _pool_destructors *destructor = pool->destructors; + + /* Call all pool destructors */ + while (destructor) { + destructor->func (destructor->data); + destructor = destructor->prev; + } + + while (cur) { + tmp = cur; + cur = cur->next; + g_free (tmp->begin); + g_free (tmp); + STAT_LOCK (); + chunks_freed ++; + STAT_UNLOCK (); + } + /* Unmap shared memory */ + while (cur_shared) { + tmp_shared = cur_shared; + cur_shared = cur_shared->next; + munmap (tmp_shared, tmp_shared->len + sizeof (struct _pool_chain_shared)); + STAT_LOCK (); + chunks_freed ++; + STAT_UNLOCK (); + } + + g_free (pool); +} + +void +memory_pool_stat (memory_pool_stat_t *st) +{ + st->bytes_allocated = bytes_allocated; + st->chunks_allocated = chunks_allocated; + st->shared_chunks_allocated = shared_chunks_allocated; + st->chunks_freed = chunks_freed; +} + +#define FIXED_POOL_SIZE 4095 +size_t +memory_pool_get_size () +{ +#ifdef HAVE_GETPAGESIZE + return getpagesize () - 1; +#else + return FIXED_POOL_SIZE; +#endif +} + +/* + * vi:ts=4 + */ diff --git a/src/mem_pool.h b/src/mem_pool.h new file mode 100644 index 000000000..4027b5de2 --- /dev/null +++ b/src/mem_pool.h @@ -0,0 +1,61 @@ +#ifndef RSPAMD_MEM_POOL_H +#define RSPAMD_MEM_POOL_H + +#include +#include + +typedef void (*pool_destruct_func)(void *ptr); + +struct _pool_chain { + u_char *begin; + u_char *pos; + size_t len; + struct _pool_chain *next; +}; + +struct _pool_chain_shared { + u_char *begin; + u_char *pos; + size_t len; + gint lock; + struct _pool_chain_shared *next; +}; + +struct _pool_destructors { + pool_destruct_func func; + void *data; + struct _pool_destructors *prev; +}; + +typedef struct memory_pool_s { + struct _pool_chain *cur_pool; + struct _pool_chain *first_pool; + struct _pool_chain_shared *shared_pool; + struct _pool_destructors *destructors; +} memory_pool_t; + +typedef struct memory_pool_stat_s { + size_t bytes_allocated; + size_t chunks_allocated; + size_t shared_chunks_allocated; + size_t chunks_freed; +} memory_pool_stat_t; + +memory_pool_t* memory_pool_new (size_t size); +void* memory_pool_alloc (memory_pool_t* pool, size_t size); +void* memory_pool_alloc0 (memory_pool_t* pool, size_t size); +char* memory_pool_strdup (memory_pool_t* pool, const char *src); +void memory_pool_add_destructor (memory_pool_t *pool, pool_destruct_func func, void *data); +void* memory_pool_alloc_shared (memory_pool_t *pool, size_t size); +void memory_pool_lock_shared (memory_pool_t *pool, void *pointer); +void memory_pool_unlock_shared (memory_pool_t *pool, void *pointer); +void memory_pool_delete (memory_pool_t* pool); + +void memory_pool_stat (memory_pool_stat_t *st); + +/* Get optimal pool size based on page size for this system */ +size_t memory_pool_get_size (); + +#define memory_pool_free(x) ((x)->len - ((x)->pos - (x)->begin)) + +#endif diff --git a/src/memcached-test.c b/src/memcached-test.c new file mode 100644 index 000000000..c258673bd --- /dev/null +++ b/src/memcached-test.c @@ -0,0 +1,79 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "upstream.h" +#include "memcached.h" + +#define HOST "127.0.0.1" +#define PORT 11211 + +memcached_param_t cur_param; + +static void +test_memc_callback (memcached_ctx_t *ctx, memc_error_t error, void *data) +{ + int s; + int r; + int *num = ((int *)data); + printf ("result of memc command '%s' is '%s'\n", ctx->cmd, memc_strerror (error)); + /* Connect */ + if (*num == 0) { + printf ("Setting value to memcached: %s -> %s\n", cur_param.key, (char *)cur_param.buf); + s = 1; + r = memc_set (ctx, &cur_param, &s, 60); + (*num)++; + } + else if (*num == 1) { + printf ("Getting value from memcached: %s -> %s\n", cur_param.key, (char *)cur_param.buf); + s = 1; + r = memc_get (ctx, &cur_param, &s); + (*num)++; + } + else { + printf ("Got value from memcached: %s -> %s\n", cur_param.key, (char *)cur_param.buf); + event_loopexit (NULL); + } +} + + +int +main (int argc, char **argv) +{ + memcached_ctx_t mctx; + char *addr, buf[512]; + int num = 0; + + event_init (); + strcpy (cur_param.key, "testkey"); + strcpy (buf, "test_value"); + cur_param.buf = buf; + cur_param.bufsize = sizeof ("test_value") - 1; + + if (argc == 2) { + addr = argv[1]; + } + else { + addr = HOST; + } + + mctx.protocol = TCP_TEXT; + mctx.timeout.tv_sec = 1; + mctx.timeout.tv_usec = 0; + mctx.port = htons (PORT); + mctx.options = MEMC_OPT_DEBUG; + mctx.callback = test_memc_callback; + /* XXX: it is wrong to use local variable pointer here */ + mctx.callback_data = (void *)# + inet_aton (addr, &mctx.addr); + + memc_init_ctx (&mctx); + + event_loop (0); + return 0; +} diff --git a/src/memcached.c b/src/memcached.c new file mode 100644 index 000000000..05ae16617 --- /dev/null +++ b/src/memcached.c @@ -0,0 +1,792 @@ +#ifdef _THREAD_SAFE +#include +#endif + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "memcached.h" + +#define CRLF "\r\n" +#define END_TRAILER "END" CRLF +#define STORED_TRAILER "STORED" CRLF +#define NOT_STORED_TRAILER "NOT STORED" CRLF +#define EXISTS_TRAILER "EXISTS" CRLF +#define DELETED_TRAILER "DELETED" CRLF +#define NOT_FOUND_TRAILER "NOT_FOUND" CRLF +#define CLIENT_ERROR_TRAILER "CLIENT_ERROR" +#define SERVER_ERROR_TRAILER "SERVER_ERROR" + +#define READ_BUFSIZ 1500 +#define MAX_RETRIES 3 + +/* Header for udp protocol */ +struct memc_udp_header +{ + uint16_t req_id; + uint16_t seq_num; + uint16_t dg_sent; + uint16_t unused; +}; + +static void socket_callback (int fd, short what, void *arg); +static int memc_parse_header (char *buf, size_t *len, char **end); + +/* + * Write to syslog if OPT_DEBUG is specified + */ +static void +memc_log (const memcached_ctx_t *ctx, int line, const char *fmt, ...) +{ + va_list args; + if (ctx->options & MEMC_OPT_DEBUG) { + va_start (args, fmt); + g_log (G_LOG_DOMAIN, G_LOG_LEVEL_DEBUG, "memc_debug(%d): host: %s, port: %d", line, inet_ntoa (ctx->addr), ntohs (ctx->port)); + g_logv (G_LOG_DOMAIN, G_LOG_LEVEL_DEBUG, fmt, args); + va_end (args); + } +} + +/* + * Callback for write command + */ +static void +write_handler (int fd, short what, memcached_ctx_t *ctx) +{ + char read_buf[READ_BUFSIZ]; + int retries; + ssize_t r; + struct memc_udp_header header; + struct iovec iov[4]; + + /* Write something to memcached */ + if (what == EV_WRITE) { + if (ctx->protocol == UDP_TEXT) { + /* Send udp header */ + bzero (&header, sizeof (header)); + header.dg_sent = htons (1); + header.req_id = ctx->count; + } + + r = snprintf (read_buf, READ_BUFSIZ, "%s %s 0 %d %zu" CRLF, ctx->cmd, ctx->param->key, ctx->param->expire, ctx->param->bufsize); + memc_log (ctx, __LINE__, "memc_write: send write request to memcached: %s", read_buf); + + if (ctx->protocol == UDP_TEXT) { + iov[0].iov_base = &header; + iov[0].iov_len = sizeof (struct memc_udp_header); + if (ctx->param->bufpos == 0) { + iov[1].iov_base = read_buf; + iov[1].iov_len = r; + } + else { + iov[1].iov_base = NULL; + iov[1].iov_len = 0; + } + iov[2].iov_base = ctx->param->buf + ctx->param->bufpos; + iov[2].iov_len = ctx->param->bufsize - ctx->param->bufpos; + iov[3].iov_base = CRLF; + iov[3].iov_len = sizeof (CRLF) - 1; + writev (ctx->sock, iov, 4); + } + else { + iov[0].iov_base = read_buf; + iov[0].iov_len = r; + iov[1].iov_base = ctx->param->buf + ctx->param->bufpos; + iov[1].iov_len = ctx->param->bufsize - ctx->param->bufpos; + iov[2].iov_base = CRLF; + iov[2].iov_len = sizeof (CRLF) - 1; + writev (ctx->sock, iov, 3); + } + event_del (&ctx->mem_ev); + event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); + } + else if (what == EV_READ) { + /* Read header */ + retries = 0; + while (ctx->protocol == UDP_TEXT) { + iov[0].iov_base = &header; + iov[0].iov_len = sizeof (struct memc_udp_header); + iov[1].iov_base = read_buf; + iov[1].iov_len = READ_BUFSIZ; + if ((r = readv (ctx->sock, iov, 2)) == -1) { + event_del (&ctx->mem_ev); + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + } + if (header.req_id != ctx->count && retries < MAX_RETRIES) { + retries ++; + /* Not our reply packet */ + continue; + } + break; + } + if (ctx->protocol != UDP_TEXT) { + r = read (ctx->sock, read_buf, READ_BUFSIZ - 1); + } + memc_log (ctx, __LINE__, "memc_write: read reply from memcached: %s", read_buf); + /* Increment count */ + ctx->count++; + event_del (&ctx->mem_ev); + if (strncmp (read_buf, STORED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) { + ctx->callback (ctx, OK, ctx->callback_data); + } + else if (strncmp (read_buf, NOT_STORED_TRAILER, sizeof (NOT_STORED_TRAILER) - 1) == 0) { + ctx->callback (ctx, CLIENT_ERROR, ctx->callback_data); + } + else if (strncmp (read_buf, EXISTS_TRAILER, sizeof (EXISTS_TRAILER) - 1) == 0) { + ctx->callback (ctx, EXISTS, ctx->callback_data); + } + else { + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + } + } + else if (what == EV_TIMEOUT) { + event_del (&ctx->mem_ev); + ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data); + } +} + +/* + * Callback for read command + */ +static void +read_handler (int fd, short what, memcached_ctx_t *ctx) +{ + char read_buf[READ_BUFSIZ]; + char *p; + ssize_t r; + size_t datalen; + struct memc_udp_header header; + struct iovec iov[2]; + int retries = 0, t; + + if (what == EV_WRITE) { + /* Send command to memcached */ + if (ctx->protocol == UDP_TEXT) { + /* Send udp header */ + bzero (&header, sizeof (header)); + header.dg_sent = htons (1); + header.req_id = ctx->count; + } + + r = snprintf (read_buf, READ_BUFSIZ, "%s %s" CRLF, ctx->cmd, ctx->param->key); + memc_log (ctx, __LINE__, "memc_read: send read request to memcached: %s", read_buf); + if (ctx->protocol == UDP_TEXT) { + iov[0].iov_base = &header; + iov[0].iov_len = sizeof (struct memc_udp_header); + iov[1].iov_base = read_buf; + iov[1].iov_len = r; + writev (ctx->sock, iov, 2); + } + else { + write (ctx->sock, read_buf, r); + } + event_del (&ctx->mem_ev); + event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); + } + else if (what == EV_READ) { + while (ctx->protocol == UDP_TEXT) { + iov[0].iov_base = &header; + iov[0].iov_len = sizeof (struct memc_udp_header); + iov[1].iov_base = read_buf; + iov[1].iov_len = READ_BUFSIZ; + if ((r = readv (ctx->sock, iov, 2)) == -1) { + event_del (&ctx->mem_ev); + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + return; + } + memc_log (ctx, __LINE__, "memc_read: got read_buf: %s", read_buf); + if (header.req_id != ctx->count && retries < MAX_RETRIES) { + memc_log (ctx, __LINE__, "memc_read: got wrong packet id: %d, %d was awaited", header.req_id, ctx->count); + retries++; + /* Not our reply packet */ + continue; + } + break; + } + if (ctx->protocol != UDP_TEXT) { + r = read (ctx->sock, read_buf, READ_BUFSIZ - 1); + } + + if (r > 0) { + read_buf[r] = 0; + if (ctx->param->bufpos == 0) { + t = memc_parse_header (read_buf, &datalen, &p); + if (t < 0) { + event_del (&ctx->mem_ev); + memc_log (ctx, __LINE__, "memc_read: cannot parse memcached reply"); + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + return; + } + else if (t == 0) { + memc_log (ctx, __LINE__, "memc_read: record does not exists"); + event_del (&ctx->mem_ev); + ctx->callback (ctx, NOT_EXISTS, ctx->callback_data); + return; + } + + if (datalen > ctx->param->bufsize) { + memc_log (ctx, __LINE__, "memc_read: user's buffer is too small: %zd, %zd required", ctx->param->bufsize, datalen); + event_del (&ctx->mem_ev); + ctx->callback (ctx, WRONG_LENGTH, ctx->callback_data); + return; + } + /* Check if we already have all data in buffer */ + if (r >= datalen + sizeof (END_TRAILER) + sizeof (CRLF) - 2) { + /* Store all data in param's buffer */ + memcpy (ctx->param->buf + ctx->param->bufpos, p, datalen); + /* Increment count */ + ctx->count++; + event_del (&ctx->mem_ev); + ctx->callback (ctx, OK, ctx->callback_data); + return; + } + /* Subtract from sum parsed header's length */ + r -= p - read_buf; + } + else { + p = read_buf; + } + + if (strncmp (ctx->param->buf + ctx->param->bufpos + r - sizeof (END_TRAILER) - sizeof (CRLF) + 2, + END_TRAILER, sizeof (END_TRAILER) - 1) == 0) { + r -= sizeof (END_TRAILER) - sizeof (CRLF) - 2; + memcpy (ctx->param->buf + ctx->param->bufpos, p, r); + event_del (&ctx->mem_ev); + ctx->callback (ctx, OK, ctx->callback_data); + return; + } + /* Store this part of data in param's buffer */ + memcpy (ctx->param->buf + ctx->param->bufpos, p, r); + ctx->param->bufpos += r; + } + else { + memc_log (ctx, __LINE__, "memc_read: read(v) failed: %d, %m", r); + event_del (&ctx->mem_ev); + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + return; + } + + ctx->count++; + } + else if (what == EV_TIMEOUT) { + event_del (&ctx->mem_ev); + ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data); + } + +} + +/* + * Callback for delete command + */ +static void +delete_handler (int fd, short what, memcached_ctx_t *ctx) +{ + char read_buf[READ_BUFSIZ]; + int retries; + ssize_t r; + struct memc_udp_header header; + struct iovec iov[2]; + + /* Write something to memcached */ + if (what == EV_WRITE) { + if (ctx->protocol == UDP_TEXT) { + /* Send udp header */ + bzero (&header, sizeof (header)); + header.dg_sent = htons (1); + header.req_id = ctx->count; + } + r = snprintf (read_buf, READ_BUFSIZ, "delete %s" CRLF, ctx->param->key); + memc_log (ctx, __LINE__, "memc_delete: send delete request to memcached: %s", read_buf); + + if (ctx->protocol == UDP_TEXT) { + iov[0].iov_base = &header; + iov[0].iov_len = sizeof (struct memc_udp_header); + iov[1].iov_base = read_buf; + iov[1].iov_len = r; + ctx->param->bufpos = writev (ctx->sock, iov, 2); + } + else { + write (ctx->sock, read_buf, r); + } + event_del (&ctx->mem_ev); + event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); + } + else if (what == EV_READ) { + /* Read header */ + retries = 0; + while (ctx->protocol == UDP_TEXT) { + iov[0].iov_base = &header; + iov[0].iov_len = sizeof (struct memc_udp_header); + iov[1].iov_base = read_buf; + iov[1].iov_len = READ_BUFSIZ; + if ((r = readv (ctx->sock, iov, 2)) == -1) { + event_del (&ctx->mem_ev); + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + return; + } + if (header.req_id != ctx->count && retries < MAX_RETRIES) { + retries ++; + /* Not our reply packet */ + continue; + } + break; + } + if (ctx->protocol != UDP_TEXT) { + r = read (ctx->sock, read_buf, READ_BUFSIZ - 1); + } + /* Increment count */ + ctx->count++; + event_del (&ctx->mem_ev); + if (strncmp (read_buf, DELETED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) { + ctx->callback (ctx, OK, ctx->callback_data); + } + else if (strncmp (read_buf, NOT_FOUND_TRAILER, sizeof (NOT_FOUND_TRAILER) - 1) == 0) { + ctx->callback (ctx, NOT_EXISTS, ctx->callback_data); + } + else { + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + } + } + else if (what == EV_TIMEOUT) { + event_del (&ctx->mem_ev); + ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data); + } +} + +/* + * Callback for our socket events + */ +static void +socket_callback (int fd, short what, void *arg) +{ + memcached_ctx_t *ctx = (memcached_ctx_t *)arg; + + switch (ctx->op) { + case CMD_NULL: + /* Do nothing here */ + break; + case CMD_CONNECT: + /* We have write readiness after connect call, so reinit event */ + ctx->cmd = "connect"; + if (what == EV_WRITE) { + event_del (&ctx->mem_ev); + event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, NULL); + ctx->callback (ctx, OK, ctx->callback_data); + ctx->alive = 1; + } + else { + ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data); + ctx->alive = 0; + } + break; + case CMD_WRITE: + write_handler (fd, what, ctx); + break; + case CMD_READ: + read_handler (fd, what, ctx); + break; + case CMD_DELETE: + delete_handler (fd, what, ctx); + break; + } +} + +/* + * Common callback function for memcached operations if no user's callback is specified + */ +static void +common_memc_callback (memcached_ctx_t *ctx, memc_error_t error, void *data) +{ + memc_log (ctx, __LINE__, "common_memc_callback: result of memc command '%s' is '%s'", ctx->cmd, memc_strerror (error)); +} + +/* + * Make socket for udp connection + */ +static int +memc_make_udp_sock (memcached_ctx_t *ctx) +{ + struct sockaddr_in sc; + int ofl; + + bzero (&sc, sizeof (struct sockaddr_in *)); + sc.sin_family = AF_INET; + sc.sin_port = ctx->port; + memcpy (&sc.sin_addr, &ctx->addr, sizeof (struct in_addr)); + + ctx->sock = socket (PF_INET, SOCK_DGRAM, 0); + + if (ctx->sock == -1) { + memc_log (ctx, __LINE__, "memc_make_udp_sock: socket() failed: %m"); + return -1; + } + + /* set nonblocking */ + ofl = fcntl(ctx->sock, F_GETFL, 0); + fcntl(ctx->sock, F_SETFL, ofl | O_NONBLOCK); + + /* + * Call connect to set default destination for datagrams + * May not block + */ + ctx->op = CMD_CONNECT; + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, NULL); + return connect (ctx->sock, (struct sockaddr*)&sc, sizeof (struct sockaddr_in)); +} + +/* + * Make socket for tcp connection + */ +static int +memc_make_tcp_sock (memcached_ctx_t *ctx) +{ + struct sockaddr_in sc; + int ofl, r; + + bzero (&sc, sizeof (struct sockaddr_in *)); + sc.sin_family = AF_INET; + sc.sin_port = ctx->port; + memcpy (&sc.sin_addr, &ctx->addr, sizeof (struct in_addr)); + + ctx->sock = socket (PF_INET, SOCK_STREAM, 0); + + if (ctx->sock == -1) { + memc_log (ctx, __LINE__, "memc_make_tcp_sock: socket() failed: %m"); + return -1; + } + + /* set nonblocking */ + ofl = fcntl(ctx->sock, F_GETFL, 0); + fcntl(ctx->sock, F_SETFL, ofl | O_NONBLOCK); + + if ((r = connect (ctx->sock, (struct sockaddr*)&sc, sizeof (struct sockaddr_in))) == -1) { + if (errno != EINPROGRESS) { + close (ctx->sock); + ctx->sock = -1; + memc_log (ctx, __LINE__, "memc_make_tcp_sock: connect() failed: %m"); + return -1; + } + } + ctx->op = CMD_CONNECT; + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); + return 0; +} + +/* + * Parse VALUE reply from server and set len argument to value returned by memcached + */ +static int +memc_parse_header (char *buf, size_t *len, char **end) +{ + char *p, *c; + int i; + + /* VALUE []\r\n */ + c = strstr (buf, CRLF); + if (c == NULL) { + return -1; + } + *end = c + sizeof (CRLF) - 1; + + if (strncmp (buf, "VALUE ", sizeof ("VALUE ") - 1) == 0) { + p = buf + sizeof ("VALUE ") - 1; + + /* Read bytes value and ignore all other fields, such as flags and key */ + for (i = 0; i < 2; i++) { + while (p++ < c && *p != ' '); + + if (p > c) { + return -1; + } + } + *len = strtoul (p, &c, 10); + return 1; + } + /* If value not found memcached return just END\r\n , in this case return 0 */ + else if (strncmp (buf, END_TRAILER, sizeof (END_TRAILER) - 1) == 0) { + return 0; + } + + return -1; +} + + +/* + * Common read command handler for memcached + */ +memc_error_t +memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param) +{ + ctx->cmd = cmd; + ctx->op = CMD_READ; + ctx->param = param; + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); + + return OK; +} + +/* + * Common write command handler for memcached + */ +memc_error_t +memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param, int expire) +{ + ctx->cmd = cmd; + ctx->op = CMD_WRITE; + ctx->param = param; + param->expire = expire; + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); + + return OK; +} +/* + * Delete command handler + */ +memc_error_t +memc_delete (memcached_ctx_t *ctx, memcached_param_t *param) +{ + ctx->cmd = "delete"; + ctx->op = CMD_DELETE; + ctx->param = param; + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); + + return OK; +} + +/* + * Write handler for memcached mirroring + * writing is done to each memcached server + */ +memc_error_t +memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param, int expire) +{ + memc_error_t r, result = OK; + + while (memcached_num --) { + if (ctx[memcached_num].alive == 1) { + r = memc_write (&ctx[memcached_num], cmd, param, expire); + if (r != OK) { + memc_log (&ctx[memcached_num], __LINE__, "memc_write_mirror: cannot write to mirror server: %s", memc_strerror (r)); + result = r; + ctx[memcached_num].alive = 0; + } + } + } + + return result; +} + +/* + * Read handler for memcached mirroring + * reading is done from first active memcached server + */ +memc_error_t +memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param) +{ + memc_error_t r, result = OK; + + while (memcached_num --) { + if (ctx[memcached_num].alive == 1) { + r = memc_read (&ctx[memcached_num], cmd, param); + if (r != OK) { + result = r; + if (r != NOT_EXISTS) { + ctx[memcached_num].alive = 0; + memc_log (&ctx[memcached_num], __LINE__, "memc_read_mirror: cannot write read from mirror server: %s", memc_strerror (r)); + } + else { + memc_log (&ctx[memcached_num], __LINE__, "memc_read_mirror: record not exists", memc_strerror (r)); + } + } + else { + break; + } + } + } + + return result; +} + +/* + * Delete handler for memcached mirroring + * deleting is done for each active memcached server + */ +memc_error_t +memc_delete_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param) +{ + memc_error_t r, result = OK; + + while (memcached_num --) { + if (ctx[memcached_num].alive == 1) { + r = memc_delete (&ctx[memcached_num], param); + if (r != OK) { + result = r; + if (r != NOT_EXISTS) { + ctx[memcached_num].alive = 0; + memc_log (&ctx[memcached_num], __LINE__, "memc_delete_mirror: cannot delete from mirror server: %s", memc_strerror (r)); + } + } + } + } + + return result; +} + + +/* + * Initialize memcached context for specified protocol + */ +int +memc_init_ctx (memcached_ctx_t *ctx) +{ + if (ctx == NULL) { + return -1; + } + + ctx->count = 0; + ctx->alive = 0; + ctx->op = CMD_NULL; + /* Set default callback */ + if (ctx->callback == NULL) { + ctx->callback = common_memc_callback; + } + + switch (ctx->protocol) { + case UDP_TEXT: + return memc_make_udp_sock (ctx); + break; + case TCP_TEXT: + return memc_make_tcp_sock (ctx); + break; + /* Not implemented */ + case UDP_BIN: + case TCP_BIN: + default: + return -1; + } +} +/* + * Mirror init + */ +int +memc_init_ctx_mirror (memcached_ctx_t *ctx, size_t memcached_num) +{ + int r, result = -1; + while (memcached_num--) { + if (ctx[memcached_num].alive == 1) { + r = memc_init_ctx (&ctx[memcached_num]); + if (r == -1) { + ctx[memcached_num].alive = 0; + memc_log (&ctx[memcached_num], __LINE__, "memc_init_ctx_mirror: cannot connect to server"); + } + else { + result = 1; + } + } + } + + return result; +} + +/* + * Close context connection + */ +int +memc_close_ctx (memcached_ctx_t *ctx) +{ + if (ctx != NULL && ctx->sock != -1) { + event_del (&ctx->mem_ev); + return close (ctx->sock); + } + + return -1; +} +/* + * Mirror close + */ +int +memc_close_ctx_mirror (memcached_ctx_t *ctx, size_t memcached_num) +{ + int r = 0; + while (memcached_num--) { + if (ctx[memcached_num].alive == 1) { + r = memc_close_ctx (&ctx[memcached_num]); + if (r == -1) { + memc_log (&ctx[memcached_num], __LINE__, "memc_close_ctx_mirror: cannot close connection to server properly"); + ctx[memcached_num].alive = 0; + } + } + } + + return r; +} + + +const char * memc_strerror (memc_error_t err) +{ + const char *p; + + switch (err) { + case OK: + p = "Ok"; + break; + case BAD_COMMAND: + p = "Bad command"; + break; + case CLIENT_ERROR: + p = "Client error"; + break; + case SERVER_ERROR: + p = "Server error"; + break; + case SERVER_TIMEOUT: + p = "Server timeout"; + break; + case NOT_EXISTS: + p = "Key not found"; + break; + case EXISTS: + p = "Key already exists"; + break; + case WRONG_LENGTH: + p = "Wrong result length"; + break; + default: + p = "Unknown error"; + break; + } + + return p; +} + +/* + * vi:ts=4 + */ diff --git a/src/memcached.h b/src/memcached.h new file mode 100644 index 000000000..46bcae465 --- /dev/null +++ b/src/memcached.h @@ -0,0 +1,142 @@ +#ifndef MEMCACHED_H +#define MEMCACHED_H + +#include +#include +#include +#include + +#define MAXKEYLEN 250 + +#define MEMC_OPT_DEBUG 0x1 + +struct event; + +typedef enum memc_error { + OK, + BAD_COMMAND, + CLIENT_ERROR, + SERVER_ERROR, + SERVER_TIMEOUT, + NOT_EXISTS, + EXISTS, + WRONG_LENGTH +} memc_error_t; + +/* XXX: Only UDP_TEXT is supported at present */ +typedef enum memc_proto { + UDP_TEXT, + TCP_TEXT, + UDP_BIN, + TCP_BIN +} memc_proto_t; + +typedef enum memc_op { + CMD_NULL, + CMD_CONNECT, + CMD_READ, + CMD_WRITE, + CMD_DELETE, +} memc_opt_t; + +typedef struct memcached_param_s { + char key[MAXKEYLEN]; + u_char *buf; + size_t bufsize; + size_t bufpos; + int expire; +} memcached_param_t; + + +/* Port must be in network byte order */ +typedef struct memcached_ctx_s { + memc_proto_t protocol; + struct in_addr addr; + uint16_t port; + int sock; + struct timeval timeout; + /* Counter that is used for memcached operations in network byte order */ + uint16_t count; + /* Flag that signalize that this memcached is alive */ + short alive; + /* Options that can be specified for memcached connection */ + short options; + /* Current operation */ + memc_opt_t op; + /* Current command */ + const char *cmd; + /* Current param */ + memcached_param_t *param; + /* Callback for current operation */ + void (*callback) (struct memcached_ctx_s *ctx, memc_error_t error, void *data); + /* Data for callback function */ + void *callback_data; + /* Event structure */ + struct event mem_ev; +} memcached_ctx_t; + +typedef void (*memcached_callback_t) (memcached_ctx_t *ctx, memc_error_t error, void *data); + +/* + * Initialize connection to memcached server: + * addr, port and timeout fields in ctx must be filled with valid values + * Return: + * 0 - success + * -1 - error (error is stored in errno) + */ +int memc_init_ctx (memcached_ctx_t *ctx); +int memc_init_ctx_mirror (memcached_ctx_t *ctx, size_t memcached_num); +/* + * Memcached function for getting, setting, adding values to memcached server + * ctx - valid memcached context + * key - key to extract (max 250 characters as it specified in memcached API) + * buf, elemsize, nelem - allocated buffer of length nelem structures each of elemsize + * that would contain extracted data (NOT NULL TERMINATED) + * Return: + * memc_error_t + * nelem is changed according to actual number of extracted data + * + * "set" means "store this data". + * + * "add" means "store this data, but only if the server *doesn't* already + * hold data for this key". + + * "replace" means "store this data, but only if the server *does* + * already hold data for this key". + + * "append" means "add this data to an existing key after existing data". + + * "prepend" means "add this data to an existing key before existing data". + */ +#define memc_get(ctx, param) memc_read(ctx, "get", param) +#define memc_set(ctx, param, expire) memc_write(ctx, "set", param, expire) +#define memc_add(ctx, param, expire) memc_write(ctx, "add", param, expire) +#define memc_replace(ctx, param, expire) memc_write(ctx, "replace", param, expire) +#define memc_append(ctx, param, expire) memc_write(ctx, "append", param, expire) +#define memc_prepend(ctx, param, expire) memc_write(ctx, "prepend", param, expire) + +/* Functions that works with mirror of memcached servers */ +#define memc_get_mirror(ctx, num, param) memc_read_mirror(ctx, num, "get", param) +#define memc_set_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "set", param, expire) +#define memc_add_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "add", param, expire) +#define memc_replace_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "replace", param, expire) +#define memc_append_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "append", param, expire) +#define memc_prepend_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "prepend", param, expire) + + +memc_error_t memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param); +memc_error_t memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param, int expire); +memc_error_t memc_delete (memcached_ctx_t *ctx, memcached_param_t *params); + +memc_error_t memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param, int expire); +memc_error_t memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param); +memc_error_t memc_delete_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param); + +/* Return symbolic name of memcached error*/ +const char * memc_strerror (memc_error_t err); + +/* Destroy socket from ctx */ +int memc_close_ctx (memcached_ctx_t *ctx); +int memc_close_ctx_mirror (memcached_ctx_t *ctx, size_t memcached_num); + +#endif diff --git a/src/perl.c b/src/perl.c new file mode 100644 index 000000000..870283835 --- /dev/null +++ b/src/perl.c @@ -0,0 +1,190 @@ +#include +#include +#include +#include +#include +#include + +#include + +#include /* from the Perl distribution */ +#include /* from the Perl distribution */ + +#include "url.h" +#include "main.h" +#include "perl.h" + +extern PerlInterpreter *my_perl; + +int +perl_call_header_filter (const char *function, struct worker_task *task) +{ + int result; + dSP; + + ENTER; + SAVETMPS; + + PUSHMARK (SP); + XPUSHs (sv_2mortal (newSViv (PTR2IV (task)))); + PUTBACK; + + call_pv (function, G_SCALAR); + + SPAGAIN; + + result = POPi; + msg_debug ("header_filter: call of %s with returned mark %d\n", function, result); + + PUTBACK; + FREETMPS; + LEAVE; + + return result; +} + +int +perl_call_mime_filter (const char *function, struct worker_task *task) +{ + int result; + dSP; + + ENTER; + SAVETMPS; + + PUSHMARK (SP); + XPUSHs (sv_2mortal (newSViv (PTR2IV (task)))); + PUTBACK; + + call_pv (function, G_SCALAR); + + SPAGAIN; + + result = POPi; + msg_debug ("mime_filter: call of %s returned mark %d\n", function, result); + + PUTBACK; + FREETMPS; + LEAVE; + + return result; +} + +int +perl_call_message_filter (const char *function, struct worker_task *task) +{ + int result; + dSP; + + ENTER; + SAVETMPS; + + PUSHMARK (SP); + XPUSHs (sv_2mortal (newSViv (PTR2IV (task)))); + PUTBACK; + + call_pv (function, G_SCALAR); + + SPAGAIN; + + result = POPi; + msg_debug ("message_filter: call of %s returned mark %d\n", function, result); + + PUTBACK; + FREETMPS; + LEAVE; + + return result; +} + +int +perl_call_url_filter (const char *function, struct worker_task *task) +{ + int result; + dSP; + + ENTER; + SAVETMPS; + + PUSHMARK (SP); + XPUSHs (sv_2mortal (newSViv (PTR2IV (task)))); + PUTBACK; + + call_pv (function, G_SCALAR); + + SPAGAIN; + + result = POPi; + msg_debug ("url_filter: call of %s for url returned mark %d\n", function, result); + + PUTBACK; + FREETMPS; + LEAVE; + + return result; +} + +int +perl_call_chain_filter (const char *function, struct worker_task *task, int *marks, unsigned int number) +{ + int result, i; + AV *av; + + dSP; + + ENTER; + SAVETMPS; + av = newAV(); + av_extend (av, number); + for (i = 0; i < number; i ++) { + av_push (av, sv_2mortal (newSViv (marks[i]))); + } + PUSHMARK (SP); + XPUSHs (sv_2mortal (newSViv (PTR2IV (task)))); + XPUSHs (sv_2mortal ((SV *)AvARRAY (av))); + PUTBACK; + + call_pv (function, G_SCALAR); + + SPAGAIN; + + result = POPi; + msg_debug ("chain_filter: call of %s returned mark %d\n", function, result); + + PUTBACK; + FREETMPS; + av_undef (av); + LEAVE; + + + return result; +} + +void perl_call_memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data) +{ + struct { + SV *callback; + struct worker_task *task; + } *callback_data = data; + + dSP; + + ENTER; + SAVETMPS; + PUSHMARK (SP); + XPUSHs (sv_2mortal (newSViv (PTR2IV (callback_data->task)))); + XPUSHs (sv_2mortal (newSViv (error))); + XPUSHs (sv_2mortal (newSVpv (ctx->param->buf, ctx->param->bufsize))); + PUTBACK; + + call_sv (callback_data->callback, G_SCALAR); + + /* Set save point */ + callback_data->task->save.saved = 0; + process_filters (callback_data->task); + + SPAGAIN; + FREETMPS; + LEAVE; + +} diff --git a/src/perl.h b/src/perl.h new file mode 100644 index 000000000..9a37634e3 --- /dev/null +++ b/src/perl.h @@ -0,0 +1,19 @@ +#ifndef RSPAM_PERL_H +#define RSPAM_PERL_H + +#include +#include +#include "memcached.h" + +struct uri; +struct worker_task; + +int perl_call_header_filter (const char *function, struct worker_task *task); +int perl_call_mime_filter (const char *function, struct worker_task *task); +int perl_call_message_filter (const char *function, struct worker_task *task); +int perl_call_url_filter (const char *function, struct worker_task *task); +int perl_call_chain_filter (const char *function, struct worker_task *task, int *marks, unsigned int number); + +void perl_call_memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data); + +#endif diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c new file mode 100644 index 000000000..f82543a7e --- /dev/null +++ b/src/plugins/regexp.c @@ -0,0 +1,247 @@ +/***MODULE:regexp + * rspamd module that implements different regexp rules + */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "../config.h" +#include "../main.h" +#include "../modules.h" +#include "../cfg_file.h" + +struct regexp_module_item { + struct expression *expr; + int regexp_number; + int op_number; + char *symbol; +}; + +struct regexp_ctx { + int (*header_filter)(struct worker_task *task); + int (*mime_filter)(struct worker_task *task); + int (*message_filter)(struct worker_task *task); + int (*url_filter)(struct worker_task *task); + GList *items; + char *metric; + + memory_pool_t *regexp_pool; +}; + +static struct regexp_ctx *regexp_module_ctx = NULL; + +static int regexp_common_filter (struct worker_task *task); + +int +regexp_module_init (struct config_file *cfg, struct module_ctx **ctx) +{ + regexp_module_ctx = g_malloc (sizeof (struct regexp_ctx)); + + regexp_module_ctx->header_filter = regexp_common_filter; + regexp_module_ctx->mime_filter = NULL; + regexp_module_ctx->message_filter = NULL; + regexp_module_ctx->url_filter = NULL; + regexp_module_ctx->regexp_pool = memory_pool_new (1024); + regexp_module_ctx->items = NULL; + + return 0; +} + +static void +read_regexp_expression (memory_pool_t *pool, struct regexp_module_item *chain, char *line) +{ + struct expression *e, *cur; + + e = parse_expression (regexp_module_ctx->regexp_pool, line); + chain->expr = e; + cur = e; + while (cur) { + if (cur->type == EXPR_OPERAND) { + cur->content.operand = parse_regexp (pool, cur->content.operand); + chain->regexp_number ++; + } + else { + chain->op_number ++; + } + cur = cur->next; + } +} + +int +regexp_module_config (struct config_file *cfg) +{ + LIST_HEAD (moduleoptq, module_opt) *cur_module_opt = NULL; + struct module_opt *cur; + struct regexp_module_item *cur_item; + char *value; + + if ((value = get_module_opt (cfg, "regexp", "metric")) != NULL) { + regexp_module_ctx->metric = memory_pool_strdup (regexp_module_ctx->regexp_pool, value); + g_free (value); + } + else { + regexp_module_ctx->metric = DEFAULT_METRIC; + } + + cur_module_opt = g_hash_table_lookup (cfg->modules_opts, "regexp"); + if (cur_module_opt != NULL) { + LIST_FOREACH (cur, cur_module_opt, next) { + if (strcmp (cur->param, "metric") == 0) { + continue; + } + cur_item = memory_pool_alloc0 (regexp_module_ctx->regexp_pool, sizeof (struct regexp_module_item)); + cur_item->symbol = cur->param; + read_regexp_expression (regexp_module_ctx->regexp_pool, cur_item, cur->value); + regexp_module_ctx->items = g_list_prepend (regexp_module_ctx->items, cur_item); + } + } + + return 0; +} + +int +regexp_module_reconfig (struct config_file *cfg) +{ + memory_pool_delete (regexp_module_ctx->regexp_pool); + regexp_module_ctx->regexp_pool = memory_pool_new (1024); + + return regexp_module_config (cfg); +} + +static gsize +process_regexp (struct rspamd_regexp *re, struct worker_task *task) +{ + char *headerv; + struct mime_part *part; + struct uri *url; + + switch (re->type) { + case REGEXP_NONE: + return 0; + case REGEXP_HEADER: + if (re->header == NULL) { + msg_info ("process_regexp: header regexp without header name"); + return 0; + } + msg_debug ("process_regexp: checking header regexp: %s = /%s/", re->header, re->regexp_text); + headerv = (char *)g_mime_message_get_header (task->message, re->header); + if (headerv == NULL) { + return 0; + } + else { + if (re->regexp == NULL) { + msg_debug ("process_regexp: regexp contains only header and it is found %s", re->header); + return 1; + } + if (g_regex_match (re->regexp, headerv, 0, NULL) == TRUE) { + return 1; + } + else { + return 0; + } + } + break; + case REGEXP_MIME: + msg_debug ("process_regexp: checking mime regexp: /%s/", re->regexp_text); + TAILQ_FOREACH (part, &task->parts, next) { + if (g_regex_match_full (re->regexp, part->content->data, part->content->len, 0, 0, NULL, NULL) == TRUE) { + return 1; + } + } + return 0; + case REGEXP_MESSAGE: + msg_debug ("process_message: checking mime regexp: /%s/", re->regexp_text); + if (g_regex_match_full (re->regexp, task->msg->buf->begin, task->msg->buf->len, 0, 0, NULL, NULL) == TRUE) { + return 1; + } + return 0; + case REGEXP_URL: + msg_debug ("process_url: checking mime regexp: /%s/", re->regexp_text); + TAILQ_FOREACH (url, &task->urls, next) { + if (g_regex_match (re->regexp, struri (url), 0, NULL) == TRUE) { + return 1; + } + } + return 0; + } + + /* Not reached */ + return 0; +} + +static void +process_regexp_item (struct regexp_module_item *item, struct worker_task *task) +{ + GQueue *stack; + gsize cur, op1, op2; + struct expression *it = item->expr; + + stack = g_queue_new (); + + while (it) { + if (it->type == EXPR_OPERAND) { + /* Find corresponding symbol */ + cur = process_regexp ((struct rspamd_regexp *)it->content.operand, task); + msg_debug ("process_regexp_item: regexp %s found", cur ? "is" : "is not"); + g_queue_push_head (stack, GSIZE_TO_POINTER (cur)); + } + else { + if (g_queue_is_empty (stack)) { + /* Queue has no operands for operation, exiting */ + g_queue_free (stack); + return; + } + switch (it->content.operation) { + case '!': + op1 = GPOINTER_TO_SIZE (g_queue_pop_head (stack)); + op1 = !op1; + g_queue_push_head (stack, GSIZE_TO_POINTER (op1)); + break; + case '&': + op1 = GPOINTER_TO_SIZE (g_queue_pop_head (stack)); + op2 = GPOINTER_TO_SIZE (g_queue_pop_head (stack)); + g_queue_push_head (stack, GSIZE_TO_POINTER (op1 && op2)); + case '|': + op1 = GPOINTER_TO_SIZE (g_queue_pop_head (stack)); + op2 = GPOINTER_TO_SIZE (g_queue_pop_head (stack)); + g_queue_push_head (stack, GSIZE_TO_POINTER (op1 || op2)); + default: + it = it->next; + continue; + } + } + it = it->next; + } + if (!g_queue_is_empty (stack)) { + op1 = GPOINTER_TO_SIZE (g_queue_pop_head (stack)); + if (op1) { + /* Add symbol to results */ + insert_result (task, regexp_module_ctx->metric, item->symbol, op1); + } + } + + g_queue_free (stack); +} + +static int +regexp_common_filter (struct worker_task *task) +{ + GList *cur_expr = g_list_first (regexp_module_ctx->items); + + while (cur_expr) { + process_regexp_item ((struct regexp_module_item *)cur_expr->data, task); + cur_expr = g_list_next (cur_expr); + } +} diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c new file mode 100644 index 000000000..c90a8419e --- /dev/null +++ b/src/plugins/surbl.c @@ -0,0 +1,593 @@ +/***MODULE:surbl + * rspamd module that implements SURBL url checking + */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "../config.h" +#include "../main.h" +#include "../modules.h" +#include "../cfg_file.h" +#include "../memcached.h" + +#define DEFAULT_REDIRECTOR_PORT 8080 +#define DEFAULT_SURBL_WEIGHT 10 +#define DEFAULT_REDIRECTOR_CONNECT_TIMEOUT 1000 +#define DEFAULT_REDIRECTOR_READ_TIMEOUT 5000 +#define DEFAULT_SURBL_MAX_URLS 1000 +#define DEFAULT_SURBL_URL_EXPIRE 86400 +#define DEFAULT_SURBL_SYMBOL "SURBL_DNS" +#define DEFAULT_SURBL_SUFFIX "multi.surbl.org" + +struct surbl_ctx { + int (*header_filter)(struct worker_task *task); + int (*mime_filter)(struct worker_task *task); + int (*message_filter)(struct worker_task *task); + int (*url_filter)(struct worker_task *task); + struct in_addr redirector_addr; + uint16_t redirector_port; + uint16_t weight; + unsigned int connect_timeout; + unsigned int read_timeout; + unsigned int max_urls; + unsigned int url_expire; + char *suffix; + char *symbol; + char *metric; + GHashTable *hosters; + GHashTable *whitelist; + unsigned use_redirector; + memory_pool_t *surbl_pool; +}; + +struct redirector_param { + struct uri *url; + struct worker_task *task; + enum { + STATE_CONNECT, + STATE_READ, + } state; + struct event ev; + int sock; +}; + +struct memcached_param { + struct uri *url; + struct worker_task *task; + memcached_ctx_t *ctx; +}; + +static char *hash_fill = "1"; +struct surbl_ctx *surbl_module_ctx; +GRegex *extract_hoster_regexp, *extract_normal_regexp, *extract_numeric_regexp; + +static int surbl_test_url (struct worker_task *task); + +int +surbl_module_init (struct config_file *cfg, struct module_ctx **ctx) +{ + GError *err = NULL; + + surbl_module_ctx = g_malloc (sizeof (struct surbl_ctx)); + + surbl_module_ctx->header_filter = NULL; + surbl_module_ctx->mime_filter = NULL; + surbl_module_ctx->message_filter = NULL; + surbl_module_ctx->url_filter = surbl_test_url; + surbl_module_ctx->use_redirector = 0; + surbl_module_ctx->surbl_pool = memory_pool_new (1024); + + surbl_module_ctx->hosters = g_hash_table_new (g_str_hash, g_str_equal); + /* Register destructors */ + memory_pool_add_destructor (surbl_module_ctx->surbl_pool, (pool_destruct_func)g_hash_table_remove_all, surbl_module_ctx->hosters); + + surbl_module_ctx->whitelist = g_hash_table_new (g_str_hash, g_str_equal); + /* Register destructors */ + memory_pool_add_destructor (surbl_module_ctx->surbl_pool, (pool_destruct_func)g_hash_table_remove_all, surbl_module_ctx->whitelist); + + /* Init matching regexps */ + extract_hoster_regexp = g_regex_new ("([^.]+)\\.([^.]+)\\.([^.]+)$", G_REGEX_RAW, 0, &err); + extract_normal_regexp = g_regex_new ("([^.]+)\\.([^.]+)$", G_REGEX_RAW, 0, &err); + extract_numeric_regexp = g_regex_new ("(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})$", G_REGEX_RAW, 0, &err); + + *ctx = (struct module_ctx *)surbl_module_ctx; + + return 0; +} + +int +surbl_module_config (struct config_file *cfg) +{ + struct hostent *hent; + + char *value, *cur_tok, *str; + + evdns_init (); + + if ((value = get_module_opt (cfg, "surbl", "redirector")) != NULL) { + str = memory_pool_strdup (surbl_module_ctx->surbl_pool, value); + cur_tok = strsep (&str, ":"); + if (!inet_aton (cur_tok, &surbl_module_ctx->redirector_addr)) { + /* Try to call gethostbyname */ + hent = gethostbyname (cur_tok); + if (hent != NULL) { + memcpy((char *)&surbl_module_ctx->redirector_addr, hent->h_addr, sizeof(struct in_addr)); + if (str != NULL) { + surbl_module_ctx->redirector_port = (uint16_t)strtoul (str, NULL, 10); + } + else { + surbl_module_ctx->redirector_port = DEFAULT_REDIRECTOR_PORT; + } + surbl_module_ctx->use_redirector = 1; + } + } + /* Free cur_tok as it is actually initial str after strsep */ + free (cur_tok); + } + if ((value = get_module_opt (cfg, "surbl", "weight")) != NULL) { + surbl_module_ctx->weight = atoi (value); + } + else { + surbl_module_ctx->weight = DEFAULT_SURBL_WEIGHT; + } + if ((value = get_module_opt (cfg, "surbl", "url_expire")) != NULL) { + surbl_module_ctx->url_expire = atoi (value); + } + else { + surbl_module_ctx->url_expire = DEFAULT_SURBL_URL_EXPIRE; + } + if ((value = get_module_opt (cfg, "surbl", "redirector_connect_timeout")) != NULL) { + surbl_module_ctx->connect_timeout = parse_seconds (value); + } + else { + surbl_module_ctx->connect_timeout = DEFAULT_REDIRECTOR_CONNECT_TIMEOUT; + } + if ((value = get_module_opt (cfg, "surbl", "redirector_read_timeout")) != NULL) { + surbl_module_ctx->read_timeout = parse_seconds (value); + } + else { + surbl_module_ctx->read_timeout = DEFAULT_REDIRECTOR_READ_TIMEOUT; + } + if ((value = get_module_opt (cfg, "surbl", "max_urls")) != NULL) { + surbl_module_ctx->max_urls = atoi (value); + } + else { + surbl_module_ctx->max_urls = DEFAULT_SURBL_MAX_URLS; + } + if ((value = get_module_opt (cfg, "surbl", "suffix")) != NULL) { + surbl_module_ctx->suffix = memory_pool_strdup (surbl_module_ctx->surbl_pool, value); + g_free (value); + } + else { + surbl_module_ctx->suffix = DEFAULT_SURBL_SUFFIX; + } + if ((value = get_module_opt (cfg, "surbl", "symbol")) != NULL) { + surbl_module_ctx->symbol = memory_pool_strdup (surbl_module_ctx->surbl_pool, value); + g_free (value); + } + else { + surbl_module_ctx->symbol = DEFAULT_SURBL_SYMBOL; + } + if ((value = get_module_opt (cfg, "surbl", "metric")) != NULL) { + surbl_module_ctx->metric = memory_pool_strdup (surbl_module_ctx->surbl_pool, value); + g_free (value); + } + else { + surbl_module_ctx->metric = DEFAULT_METRIC; + } + if ((value = get_module_opt (cfg, "surbl", "hostings")) != NULL) { + char comment_flag = 0; + str = value; + while (*value ++) { + if (*value == '#') { + comment_flag = 1; + } + if (*value == '\r' || *value == '\n' || *value == ',') { + if (!comment_flag && str != value) { + g_hash_table_insert (surbl_module_ctx->hosters, g_strstrip(str), hash_fill); + str = value + 1; + } + else if (*value != ',') { + comment_flag = 0; + str = value + 1; + } + } + } + } + if ((value = get_module_opt (cfg, "surbl", "whitelist")) != NULL) { + char comment_flag = 0; + str = value; + while (*value ++) { + if (*value == '#') { + comment_flag = 1; + } + if (*value == '\r' || *value == '\n' || *value == ',') { + if (!comment_flag && str != value) { + g_hash_table_insert (surbl_module_ctx->whitelist, g_strstrip(str), hash_fill); + str = value + 1; + } + else if (*value != ',') { + comment_flag = 0; + str = value + 1; + } + } + } + } +} + +int +surbl_module_reconfig (struct config_file *cfg) +{ + memory_pool_delete (surbl_module_ctx->surbl_pool); + surbl_module_ctx->surbl_pool = memory_pool_new (1024); + + return surbl_module_config (cfg); +} + +static char * +format_surbl_request (char *hostname) +{ + GMatchInfo *info; + char *result; + + result = g_malloc (strlen (hostname) + strlen (surbl_module_ctx->suffix) + 1); + + /* First try to match numeric expression */ + if (g_regex_match (extract_numeric_regexp, hostname, 0, &info) == TRUE) { + gchar *octet1, *octet2, *octet3, *octet4; + octet1 = g_match_info_fetch (info, 0); + g_match_info_next (info, NULL); + octet2 = g_match_info_fetch (info, 0); + g_match_info_next (info, NULL); + octet3 = g_match_info_fetch (info, 0); + g_match_info_next (info, NULL); + octet4 = g_match_info_fetch (info, 0); + g_match_info_free (info); + sprintf (result, "%s.%s.%s.%s.%s", octet4, octet3, octet2, octet1, surbl_module_ctx->suffix); + g_free (octet1); + g_free (octet2); + g_free (octet3); + g_free (octet4); + return result; + } + g_match_info_free (info); + /* Try to match normal domain */ + if (g_regex_match (extract_normal_regexp, hostname, 0, &info) == TRUE) { + gchar *part1, *part2; + part1 = g_match_info_fetch (info, 0); + g_match_info_next (info, NULL); + part2 = g_match_info_fetch (info, 0); + g_match_info_free (info); + sprintf (result, "%s.%s", part1, part2); + if (g_hash_table_lookup (surbl_module_ctx->hosters, result) != NULL) { + /* Match additional part for hosters */ + g_free (part1); + g_free (part2); + if (g_regex_match (extract_hoster_regexp, hostname, 0, &info) == TRUE) { + gchar *hpart1, *hpart2, *hpart3; + hpart1 = g_match_info_fetch (info, 0); + g_match_info_next (info, NULL); + hpart2 = g_match_info_fetch (info, 0); + g_match_info_next (info, NULL); + hpart3 = g_match_info_fetch (info, 0); + g_match_info_free (info); + sprintf (result, "%s.%s.%s.%s", hpart1, hpart2, hpart3, surbl_module_ctx->suffix); + g_free (hpart1); + g_free (hpart2); + g_free (hpart3); + return result; + } + return NULL; + } + g_free (part1); + g_free (part2); + return result; + } + + return NULL; +} + +static void +dns_callback (int result, char type, int count, int ttl, void *addresses, void *data) +{ + struct memcached_param *param = (struct memcached_param *)data; + + /* If we have result from DNS server, this url exists in SURBL, so increase score */ + if (result != DNS_ERR_NONE || type != DNS_IPv4_A) { + msg_info ("surbl_check: url %s is in surbl %s", param->url->host, surbl_module_ctx->suffix); + insert_result (param->task, surbl_module_ctx->metric, surbl_module_ctx->symbol, 1); + } + else { + insert_result (param->task, surbl_module_ctx->metric, surbl_module_ctx->symbol, 0); + } + + param->task->save.saved --; + if (param->task->save.saved == 0) { + /* Call other filters */ + param->task->save.saved = 1; + process_filters (param->task); + } + + g_free (param->ctx->param->buf); + g_free (param->ctx->param); + g_free (param->ctx); + g_free (param); +} + +static void +memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data) +{ + struct memcached_param *param = (struct memcached_param *)data; + int *url_count; + char *surbl_req; + + switch (ctx->op) { + case CMD_CONNECT: + if (error != OK) { + msg_info ("memcached_callback: memcached returned error %s on CONNECT stage"); + memc_close_ctx (param->ctx); + param->task->save.saved --; + if (param->task->save.saved == 0) { + /* Call other filters */ + param->task->save.saved = 1; + process_filters (param->task); + } + g_free (param->ctx->param->buf); + g_free (param->ctx->param); + g_free (param->ctx); + g_free (param); + } + else { + memc_get (param->ctx, param->ctx->param); + } + break; + case CMD_READ: + if (error != OK) { + msg_info ("memcached_callback: memcached returned error %s on READ stage"); + memc_close_ctx (param->ctx); + param->task->save.saved --; + if (param->task->save.saved == 0) { + /* Call other filters */ + param->task->save.saved = 1; + process_filters (param->task); + } + g_free (param->ctx->param->buf); + g_free (param->ctx->param); + g_free (param->ctx); + g_free (param); + } + else { + url_count = (int *)param->ctx->param->buf; + /* Do not check DNS for urls that have count more than max_urls */ + if (*url_count > surbl_module_ctx->max_urls) { + msg_info ("memcached_callback: url '%s' has count %d, max: %d", struri (param->url), *url_count, surbl_module_ctx->max_urls); + insert_result (param->task, surbl_module_ctx->metric, surbl_module_ctx->symbol, 1); + } + (*url_count) ++; + memc_set (param->ctx, param->ctx->param, surbl_module_ctx->url_expire); + } + break; + case CMD_WRITE: + if (error != OK) { + msg_info ("memcached_callback: memcached returned error %s on WRITE stage"); + } + memc_close_ctx (param->ctx); + param->task->save.saved --; + if (param->task->save.saved == 0) { + /* Call other filters */ + param->task->save.saved = 1; + process_filters (param->task); + } + if ((surbl_req = format_surbl_request (param->url->host)) != NULL) { + param->task->save.saved ++; + evdns_resolve_ipv4 (surbl_req, DNS_QUERY_NO_SEARCH, dns_callback, (void *)param); + return; + } + g_free (param->ctx->param->buf); + g_free (param->ctx->param); + g_free (param->ctx); + g_free (param); + break; + } +} + +static void +register_memcached_call (struct uri *url, struct worker_task *task) +{ + struct memcached_param *param; + struct memcached_server *selected; + memcached_param_t *cur_param; + gchar *sum_str; + int *url_count; + + param = g_malloc (sizeof (struct memcached_param)); + cur_param = g_malloc (sizeof (memcached_param_t)); + url_count = g_malloc (sizeof (int)); + + param->url = url; + param->task = task; + + param->ctx = g_malloc (sizeof (memcached_ctx_t)); + bzero (param->ctx, sizeof (memcached_ctx_t)); + bzero (cur_param, sizeof (memcached_param_t)); + + cur_param->buf = (u_char *)url_count; + cur_param->bufsize = sizeof (int); + + sum_str = g_compute_checksum_for_string (G_CHECKSUM_MD5, struri (url), -1); + strlcpy (cur_param->key, sum_str, sizeof (cur_param->key)); + g_free (sum_str); + + selected = (struct memcached_server *) get_upstream_by_hash ((void *)task->cfg->memcached_servers, + task->cfg->memcached_servers_num, sizeof (struct memcached_server), + time (NULL), task->cfg->memcached_error_time, task->cfg->memcached_dead_time, task->cfg->memcached_maxerrors, + cur_param->key, strlen(cur_param->key)); + param->ctx->callback = memcached_callback; + param->ctx->callback_data = (void *)param; + param->ctx->protocol = task->cfg->memcached_protocol; + memcpy(¶m->ctx->addr, &selected->addr, sizeof (struct in_addr)); + param->ctx->port = selected->port; + param->ctx->timeout.tv_sec = task->cfg->memcached_connect_timeout / 1000; + param->ctx->timeout.tv_sec = task->cfg->memcached_connect_timeout - param->ctx->timeout.tv_sec * 1000; + param->ctx->sock = -1; +#ifdef WITH_DEBUG + param->ctx->options = MEMC_OPT_DEBUG; +#else + param->ctx->options = 0; +#endif + param->ctx->param = cur_param; + memc_init_ctx (param->ctx); +} + +static void +redirector_callback (int fd, short what, void *arg) +{ + struct redirector_param *param = (struct redirector_param *)arg; + char url_buf[1024]; + int r; + struct timeval timeout; + char *p, *c; + + switch (param->state) { + case STATE_CONNECT: + /* We have write readiness after connect call, so reinit event */ + if (what == EV_WRITE) { + timeout.tv_sec = surbl_module_ctx->connect_timeout / 1000; + timeout.tv_usec = surbl_module_ctx->connect_timeout - timeout.tv_sec * 1000; + event_del (¶m->ev); + event_set (¶m->ev, param->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, redirector_callback, (void *)param); + event_add (¶m->ev, &timeout); + r = snprintf (url_buf, sizeof (url_buf), "GET %s HTTP/1.0\r\n\r\n", struri (param->url)); + write (param->sock, url_buf, r); + param->state = STATE_READ; + } + else { + event_del (¶m->ev); + msg_info ("redirector_callback: connection to redirector timed out"); + param->task->save.saved --; + if (param->task->save.saved == 0) { + /* Call other filters */ + param->task->save.saved = 1; + process_filters (param->task); + } + g_free (param); + } + break; + case STATE_READ: + if (what == EV_READ) { + r = read (param->sock, url_buf, sizeof (url_buf)); + if ((p = strstr (url_buf, "Uri: ")) != NULL) { + p += sizeof ("Uri: ") - 1; + c = p; + while (p++ < url_buf + sizeof (url_buf) - 1) { + if (*p == '\r' || *p == '\n') { + *p = '\0'; + break; + } + } + if (*p == '\0') { + msg_info ("redirector_callback: got reply from redirector: '%s' -> '%s'", struri (param->url), c); + parse_uri (param->url, c, param->task->task_pool); + register_memcached_call (param->url, param->task); + param->task->save.saved ++; + } + } + event_del (¶m->ev); + param->task->save.saved --; + if (param->task->save.saved == 0) { + /* Call other filters */ + param->task->save.saved = 1; + process_filters (param->task); + } + g_free (param); + } + else { + event_del (¶m->ev); + msg_info ("redirector_callback: reading redirector timed out"); + param->task->save.saved --; + if (param->task->save.saved == 0) { + /* Call other filters */ + param->task->save.saved = 1; + process_filters (param->task); + } + g_free (param); + } + break; + } +} + + +static void +register_redirector_call (struct uri *url, struct worker_task *task) +{ + struct sockaddr_in sc; + int ofl, r, s; + struct redirector_param *param; + struct timeval timeout; + + bzero (&sc, sizeof (struct sockaddr_in *)); + sc.sin_family = AF_INET; + sc.sin_port = surbl_module_ctx->redirector_port; + memcpy (&sc.sin_addr, &surbl_module_ctx->redirector_addr, sizeof (struct in_addr)); + + s = socket (PF_INET, SOCK_STREAM, 0); + + if (s == -1) { + msg_info ("register_redirector_call: socket() failed: %m"); + return; + } + + /* set nonblocking */ + ofl = fcntl(s, F_GETFL, 0); + fcntl(s, F_SETFL, ofl | O_NONBLOCK); + + if ((r = connect (s, (struct sockaddr*)&sc, sizeof (struct sockaddr_in))) == -1) { + if (errno != EINPROGRESS) { + close (s); + msg_info ("register_redirector_call: connect() failed: %m"); + } + } + param = g_malloc (sizeof (struct redirector_param)); + param->url = url; + param->task = task; + param->state = STATE_READ; + param->sock = s; + timeout.tv_sec = surbl_module_ctx->connect_timeout / 1000; + timeout.tv_usec = surbl_module_ctx->connect_timeout - timeout.tv_sec * 1000; + event_set (¶m->ev, s, EV_WRITE | EV_TIMEOUT, redirector_callback, (void *)param); + event_add (¶m->ev, &timeout); +} + +static int +surbl_test_url (struct worker_task *task) +{ + struct uri *url; + + TAILQ_FOREACH (url, &task->urls, next) { + if (surbl_module_ctx->use_redirector) { + register_redirector_call (url, task); + } + else { + register_memcached_call (url, task); + } + task->save.saved++; + } + return 0; +} + +/* + * vi:ts=4 + */ diff --git a/src/protocol.c b/src/protocol.c new file mode 100644 index 000000000..b259f6cd9 --- /dev/null +++ b/src/protocol.c @@ -0,0 +1,492 @@ +#include +#include +#include +#include +#include "main.h" + +#define CRLF "\r\n" +/* Max line size as it is defined in rfc2822 */ +#define OUTBUFSIZ 1000 +/* + * Just check if the passed message is spam or not and reply as + * described below + */ +#define MSG_CMD_CHECK "check" +/* + * Check if message is spam or not, and return score plus list + * of symbols hit + */ +#define MSG_CMD_SYMBOLS "symbols" +/* + * Check if message is spam or not, and return score plus report + */ +#define MSG_CMD_REPORT "report" +/* + * Check if message is spam or not, and return score plus report + * if the message is spam + */ +#define MSG_CMD_REPORT_IFSPAM "report_ifspam" +/* + * Ignore this message -- client opened connection then changed + */ +#define MSG_CMD_SKIP "skip" +/* + * Return a confirmation that spamd is alive + */ +#define MSG_CMD_PING "ping" +/* + * Process this message as described above and return modified message + */ +#define MSG_CMD_PROCESS "process" + +/* + * spamassassin greeting: + */ +#define SPAMC_GREETING "SPAMC" +/* + * rspamd greeting: + */ +#define RSPAMC_GREETING "RSPAMC" +/* + * Headers + */ +#define CONTENT_LENGTH_HEADER "Content-Length" +#define HELO_HEADER "Helo" +#define FROM_HEADER "From" +#define IP_ADDR_HEADER "IP" +#define NRCPT_HEADER "Recipient-Number" +#define RCPT_HEADER "Rcpt" +#define ERROR_HEADER "Error" +/* + * Reply messages + */ +#define RSPAMD_REPLY_BANNER "RSPAMD/1.0" +#define SPAMD_REPLY_BANNER "SPAMD/1.1" +#define SPAMD_OK "EX_OK" +/* XXX: try to convert rspamd errors to spamd errors */ +#define SPAMD_ERROR "EX_ERROR" + +static int +parse_command (struct worker_task *task, char *line) +{ + char *token; + + token = strsep (&line, " "); + if (line == NULL || token == NULL) { + msg_debug ("parse_command: bad comand: %s", token); + return -1; + } + + switch (token[0]) { + case 'c': + case 'C': + /* check */ + if (strcasecmp (token + 1, MSG_CMD_CHECK + 1) == 0) { + task->cmd = CMD_CHECK; + } + else { + msg_debug ("parse_command: bad comand: %s", token); + return -1; + } + break; + case 's': + case 'S': + /* symbols, skip */ + if (strcasecmp (token + 1, MSG_CMD_SYMBOLS + 1) == 0) { + task->cmd = CMD_SYMBOLS; + } + else if (strcasecmp (token + 1, MSG_CMD_SKIP + 1) == 0) { + task->cmd = CMD_SKIP; + } + else { + msg_debug ("parse_command: bad comand: %s", token); + return -1; + } + break; + case 'p': + case 'P': + /* ping, process */ + if (strcasecmp (token + 1, MSG_CMD_PING + 1) == 0) { + task->cmd = CMD_PING; + } + else if (strcasecmp (token + 1, MSG_CMD_PROCESS + 1) == 0) { + task->cmd = CMD_PROCESS; + } + else { + msg_debug ("parse_command: bad comand: %s", token); + return -1; + } + break; + case 'r': + case 'R': + /* report, report_ifspam */ + if (strcasecmp (token + 1, MSG_CMD_REPORT + 1) == 0) { + task->cmd = CMD_REPORT; + } + else if (strcasecmp (token + 1, MSG_CMD_REPORT_IFSPAM + 1) == 0) { + task->cmd = CMD_REPORT_IFSPAM; + } + else { + msg_debug ("parse_command: bad comand: %s", token); + return -1; + } + break; + default: + msg_debug ("parse_command: bad comand: %s", token); + return -1; + } + + if (strncasecmp (line, RSPAMC_GREETING, sizeof (RSPAMC_GREETING) - 1) == 0) { + task->proto = RSPAMC_PROTO; + } + else if (strncasecmp (line, SPAMC_GREETING, sizeof (SPAMC_GREETING) -1) == 0) { + task->proto = SPAMC_PROTO; + } + else { + msg_debug ("parse_command: bad protocol version: %s", line); + return -1; + } + task->state = READ_HEADER; + return 0; +} + +static int +parse_header (struct worker_task *task, char *line) +{ + char *headern, *err, *tmp; + + /* Check end of headers */ + if (*line == '\0') { + if (task->cmd == CMD_PING || task->cmd == CMD_SKIP) { + task->state = WRITE_REPLY; + } + else { + if (task->content_length > 0) { + task->state = READ_MESSAGE; + } + else { + task->last_error = "Unknown content length"; + task->error_code = RSPAMD_LENGTH_ERROR; + task->state = WRITE_ERROR; + } + } + return 0; + } + + headern = strsep (&line, ":"); + + if (line == NULL || headern == NULL) { + return -1; + } + /* Eat whitespaces */ + g_strstrip (line); + g_strstrip (headern); + + switch (headern[0]) { + case 'c': + case 'C': + /* content-length */ + if (strncasecmp (headern, CONTENT_LENGTH_HEADER, sizeof (CONTENT_LENGTH_HEADER) - 1) == 0) { + task->content_length = strtoul (line, &err, 10); + task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_buf_t)); + task->msg->buf = fstralloc (task->task_pool, task->content_length); + if (task->msg->buf == NULL) { + msg_err ("read_socket: cannot allocate memory for message buffer"); + return -1; + } + } + else { + msg_info ("parse_header: wrong header: %s", headern); + return -1; + } + break; + case 'h': + case 'H': + /* helo */ + if (strncasecmp (headern, HELO_HEADER, sizeof (HELO_HEADER) - 1) == 0) { + task->helo = memory_pool_strdup (task->task_pool, line); + } + else { + msg_info ("parse_header: wrong header: %s", headern); + return -1; + } + break; + case 'f': + case 'F': + /* from */ + if (strncasecmp (headern, FROM_HEADER, sizeof (FROM_HEADER) - 1) == 0) { + task->from = memory_pool_strdup (task->task_pool, line); + } + else { + msg_info ("parse_header: wrong header: %s", headern); + return -1; + } + break; + case 'r': + case 'R': + /* rcpt */ + if (strncasecmp (headern, RCPT_HEADER, sizeof (RCPT_HEADER) - 1) == 0) { + tmp = memory_pool_strdup (task->task_pool, line); + task->rcpt = g_list_prepend (task->rcpt, tmp); + } + else { + msg_info ("parse_header: wrong header: %s", headern); + return -1; + } + break; + case 'n': + case 'N': + /* nrcpt */ + if (strncasecmp (headern, NRCPT_HEADER, sizeof (NRCPT_HEADER) - 1) == 0) { + task->nrcpt = strtoul (line, &err, 10); + } + else { + msg_info ("parse_header: wrong header: %s", headern); + return -1; + } + break; + case 'i': + case 'I': + /* ip_addr */ + if (strncasecmp (headern, IP_ADDR_HEADER, sizeof (IP_ADDR_HEADER) - 1) == 0) { + if (!inet_aton (line, &task->from_addr)) { + msg_info ("parse_header: bad ip header: '%s'", line); + return -1; + } + } + else { + msg_info ("parse_header: wrong header: %s", headern); + return -1; + } + break; + default: + msg_info ("parse_header: wrong header: %s", headern); + return -1; + } + + return 0; +} + +int +read_rspamd_input_line (struct worker_task *task, char *line) +{ + switch (task->state) { + case READ_COMMAND: + return parse_command (task, line); + break; + case READ_HEADER: + return parse_header (task, line); + break; + } +} + +static void +show_url_header (struct worker_task *task) +{ + int r = 0; + char outbuf[OUTBUFSIZ], c; + struct uri *url; + f_str_t host; + + r = snprintf (outbuf, sizeof (outbuf), "Urls: "); + TAILQ_FOREACH (url, &task->urls, next) { + host.begin = url->host; + host.len = url->hostlen; + /* Skip long hosts to avoid protocol coollisions */ + if (host.len > OUTBUFSIZ) { + continue; + } + /* Do header folding */ + if (host.len + r >= OUTBUFSIZ - 3) { + outbuf[r ++] = '\r'; outbuf[r ++] = '\n'; outbuf[r] = ' '; + bufferevent_write (task->bev, outbuf, r); + r = 0; + } + /* Write url host to buf */ + if (TAILQ_NEXT (url, next) != NULL) { + c = *(host.begin + host.len); + *(host.begin + host.len) = '\0'; + r += snprintf (outbuf, sizeof (outbuf) - r, "%s, ", host.begin); + *(host.begin + host.len) = c; + } + else { + c = *(host.begin + host.len); + *(host.begin + host.len) = '\0'; + r += snprintf (outbuf, sizeof (outbuf) - r, "%s" CRLF, host.begin); + *(host.begin + host.len) = c; + } + } + bufferevent_write (task->bev, outbuf, r); +} + +static void +show_metric_result (gpointer metric_name, gpointer metric_value, void *user_data) +{ + struct worker_task *task = (struct worker_task *)user_data; + int r; + char outbuf[OUTBUFSIZ]; + struct metric_result *metric_res = (struct metric_result *)metric_value; + int is_spam = 0; + + if (metric_res->score >= metric_res->metric->required_score) { + is_spam = 1; + } + if (task->proto == SPAMC_PROTO) { + r = snprintf (outbuf, sizeof (outbuf), "Spam: %s ; %.2f / %.2f" CRLF, + (is_spam) ? "True" : "False", metric_res->score, metric_res->metric->required_score); + } + else { + r = snprintf (outbuf, sizeof (outbuf), "%s: %s ; %.2f / %.2f" CRLF, (char *)metric_name, + (is_spam) ? "True" : "False", metric_res->score, metric_res->metric->required_score); + } + bufferevent_write (task->bev, outbuf, r); +} + +static int +write_check_reply (struct worker_task *task) +{ + int r; + char outbuf[OUTBUFSIZ]; + struct metric_result *metric_res; + + r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER); + bufferevent_write (task->bev, outbuf, r); + if (task->proto == SPAMC_PROTO) { + /* Ignore metrics, just write report for 'default' metric */ + metric_res = g_hash_table_lookup (task->results, "default"); + if (metric_res == NULL) { + return -1; + } + else { + show_metric_result ((gpointer)"default", (gpointer)metric_res, (void *)task); + } + } + else { + /* Write result for each metric separately */ + g_hash_table_foreach (task->results, show_metric_result, task); + /* URL stat */ + show_url_header (task); + } + bufferevent_write (task->bev, CRLF, sizeof (CRLF) - 1); + + return 0; +} + +static void +show_metric_symbols (gpointer metric_name, gpointer metric_value, void *user_data) +{ + struct worker_task *task = (struct worker_task *)user_data; + int r = 0; + char outbuf[OUTBUFSIZ]; + GList *symbols = NULL, *cur; + struct metric_result *metric_res = (struct metric_result *)metric_value; + + if (task->proto == RSPAMC_PROTO) { + r = snprintf (outbuf, sizeof (outbuf), "%s: ", (char *)metric_name); + } + + symbols = g_hash_table_get_keys (metric_res->symbols); + cur = symbols; + while (cur) { + if (g_list_next (cur) != NULL) { + r += snprintf (outbuf + r, sizeof (outbuf) - r, "%s,", (char *)cur->data); + } + else { + r += snprintf (outbuf + r, sizeof (outbuf) - r, "%s", (char *)cur->data); + } + cur = g_list_next (cur); + } + g_list_free (symbols); + outbuf[r++] = '\r'; outbuf[r] = '\n'; + bufferevent_write (task->bev, outbuf, r); +} + +static int +write_symbols_reply (struct worker_task *task) +{ + struct metric_result *metric_res; + + /* First of all write normal results by calling write_check_reply */ + if (write_check_reply (task) == -1) { + return -1; + } + /* Now write symbols */ + if (task->proto == SPAMC_PROTO) { + /* Ignore metrics, just write report for 'default' metric */ + metric_res = g_hash_table_lookup (task->results, "default"); + if (metric_res == NULL) { + return -1; + } + else { + show_metric_symbols ((gpointer)"default", (gpointer)metric_res, (void *)task); + } + } + else { + /* Write result for each metric separately */ + g_hash_table_foreach (task->results, show_metric_symbols, task); + } + return 0; +} + +static int +write_process_reply (struct worker_task *task) +{ + int r; + char outbuf[OUTBUFSIZ]; + + r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF "Content-Length: %zd" CRLF CRLF, + (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, task->msg->buf->len); + bufferevent_write (task->bev, outbuf, r); + bufferevent_write (task->bev, task->msg->buf->begin, task->msg->buf->len); + + return 0; +} + +int +write_reply (struct worker_task *task) +{ + int r; + char outbuf[OUTBUFSIZ]; + + msg_debug ("write_reply: writing reply to client"); + if (task->error_code != 0) { + /* Write error message and error code to reply */ + if (task->proto == SPAMC_PROTO) { + r = snprintf (outbuf, sizeof (outbuf), "%s %d %s" CRLF CRLF, SPAMD_REPLY_BANNER, task->error_code, SPAMD_ERROR); + msg_debug ("write_reply: writing error: %s", outbuf); + } + else { + r = snprintf (outbuf, sizeof (outbuf), "%s %d %s" CRLF "%s: %s" CRLF CRLF, RSPAMD_REPLY_BANNER, task->error_code, + SPAMD_ERROR, ERROR_HEADER, task->last_error); + msg_debug ("write_reply: writing error: %s", outbuf); + } + /* Write to bufferevent error message */ + bufferevent_write (task->bev, outbuf, r); + } + else { + switch (task->cmd) { + case CMD_REPORT_IFSPAM: + case CMD_REPORT: + case CMD_CHECK: + return write_check_reply (task); + break; + case CMD_SYMBOLS: + return write_symbols_reply (task); + break; + case CMD_PROCESS: + return write_process_reply (task); + break; + case CMD_SKIP: + r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, + SPAMD_OK); + bufferevent_write (task->bev, outbuf, r); + break; + case CMD_PING: + r = snprintf (outbuf, sizeof (outbuf), "%s 0 PONG" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER); + bufferevent_write (task->bev, outbuf, r); + break; + } + } + + return 0; +} diff --git a/src/protocol.h b/src/protocol.h new file mode 100644 index 000000000..6c750e91f --- /dev/null +++ b/src/protocol.h @@ -0,0 +1,31 @@ +#ifndef RSPAMD_PROTOCOL_H +#define RSPAMD_PROTOCOL_H + +#include "config.h" + +#define RSPAMD_FILTER_ERROR 1 +#define RSPAMD_NETWORK_ERROR 2 +#define RSPAMD_PROTOCOL_ERROR 3 +#define RSPAMD_LENGTH_ERROR 4 + +struct worker_task; + +enum rspamd_protocol { + SPAMC_PROTO, + RSPAMC_PROTO, +}; + +enum rspamd_command { + CMD_CHECK, + CMD_SYMBOLS, + CMD_REPORT, + CMD_REPORT_IFSPAM, + CMD_SKIP, + CMD_PING, + CMD_PROCESS, +}; + +int read_rspamd_input_line (struct worker_task *task, char *line); +int write_reply (struct worker_task *task); + +#endif diff --git a/src/upstream.c b/src/upstream.c new file mode 100644 index 000000000..87aab8535 --- /dev/null +++ b/src/upstream.c @@ -0,0 +1,521 @@ +#ifdef _THREAD_SAFE +#include +#endif + +#include +#include +#include +#include +#ifdef HAVE_STDINT_H +#include +#endif +#ifdef HAVE_INTTYPES_H +#include +#endif +#include +#ifdef WITH_DEBUG +#include +#endif +#include "upstream.h" + +#ifdef WITH_DEBUG +#define msg_debug(args...) syslog(LOG_DEBUG, ##args) +#else +#define msg_debug(args...) do {} while(0) +#endif + +#ifdef _THREAD_SAFE +pthread_rwlock_t upstream_mtx = PTHREAD_RWLOCK_INITIALIZER; +#define U_RLOCK() do { pthread_rwlock_rdlock (&upstream_mtx); } while (0) +#define U_WLOCK() do { pthread_rwlock_wrlock (&upstream_mtx); } while (0) +#define U_UNLOCK() do { pthread_rwlock_unlock (&upstream_mtx); } while (0) +#else +#define U_RLOCK() do {} while (0) +#define U_WLOCK() do {} while (0) +#define U_UNLOCK() do {} while (0) +#endif + +#define MAX_TRIES 20 + +/* + * Poly: 0xedb88320 + * Init: 0x0 + */ + +static const uint32_t crc32lookup[256] = { + 0x00000000U, 0x77073096U, 0xee0e612cU, 0x990951baU, 0x076dc419U, 0x706af48fU, + 0xe963a535U, 0x9e6495a3U, 0x0edb8832U, 0x79dcb8a4U, 0xe0d5e91eU, 0x97d2d988U, + 0x09b64c2bU, 0x7eb17cbdU, 0xe7b82d07U, 0x90bf1d91U, 0x1db71064U, 0x6ab020f2U, + 0xf3b97148U, 0x84be41deU, 0x1adad47dU, 0x6ddde4ebU, 0xf4d4b551U, 0x83d385c7U, + 0x136c9856U, 0x646ba8c0U, 0xfd62f97aU, 0x8a65c9ecU, 0x14015c4fU, 0x63066cd9U, + 0xfa0f3d63U, 0x8d080df5U, 0x3b6e20c8U, 0x4c69105eU, 0xd56041e4U, 0xa2677172U, + 0x3c03e4d1U, 0x4b04d447U, 0xd20d85fdU, 0xa50ab56bU, 0x35b5a8faU, 0x42b2986cU, + 0xdbbbc9d6U, 0xacbcf940U, 0x32d86ce3U, 0x45df5c75U, 0xdcd60dcfU, 0xabd13d59U, + 0x26d930acU, 0x51de003aU, 0xc8d75180U, 0xbfd06116U, 0x21b4f4b5U, 0x56b3c423U, + 0xcfba9599U, 0xb8bda50fU, 0x2802b89eU, 0x5f058808U, 0xc60cd9b2U, 0xb10be924U, + 0x2f6f7c87U, 0x58684c11U, 0xc1611dabU, 0xb6662d3dU, 0x76dc4190U, 0x01db7106U, + 0x98d220bcU, 0xefd5102aU, 0x71b18589U, 0x06b6b51fU, 0x9fbfe4a5U, 0xe8b8d433U, + 0x7807c9a2U, 0x0f00f934U, 0x9609a88eU, 0xe10e9818U, 0x7f6a0dbbU, 0x086d3d2dU, + 0x91646c97U, 0xe6635c01U, 0x6b6b51f4U, 0x1c6c6162U, 0x856530d8U, 0xf262004eU, + 0x6c0695edU, 0x1b01a57bU, 0x8208f4c1U, 0xf50fc457U, 0x65b0d9c6U, 0x12b7e950U, + 0x8bbeb8eaU, 0xfcb9887cU, 0x62dd1ddfU, 0x15da2d49U, 0x8cd37cf3U, 0xfbd44c65U, + 0x4db26158U, 0x3ab551ceU, 0xa3bc0074U, 0xd4bb30e2U, 0x4adfa541U, 0x3dd895d7U, + 0xa4d1c46dU, 0xd3d6f4fbU, 0x4369e96aU, 0x346ed9fcU, 0xad678846U, 0xda60b8d0U, + 0x44042d73U, 0x33031de5U, 0xaa0a4c5fU, 0xdd0d7cc9U, 0x5005713cU, 0x270241aaU, + 0xbe0b1010U, 0xc90c2086U, 0x5768b525U, 0x206f85b3U, 0xb966d409U, 0xce61e49fU, + 0x5edef90eU, 0x29d9c998U, 0xb0d09822U, 0xc7d7a8b4U, 0x59b33d17U, 0x2eb40d81U, + 0xb7bd5c3bU, 0xc0ba6cadU, 0xedb88320U, 0x9abfb3b6U, 0x03b6e20cU, 0x74b1d29aU, + 0xead54739U, 0x9dd277afU, 0x04db2615U, 0x73dc1683U, 0xe3630b12U, 0x94643b84U, + 0x0d6d6a3eU, 0x7a6a5aa8U, 0xe40ecf0bU, 0x9309ff9dU, 0x0a00ae27U, 0x7d079eb1U, + 0xf00f9344U, 0x8708a3d2U, 0x1e01f268U, 0x6906c2feU, 0xf762575dU, 0x806567cbU, + 0x196c3671U, 0x6e6b06e7U, 0xfed41b76U, 0x89d32be0U, 0x10da7a5aU, 0x67dd4accU, + 0xf9b9df6fU, 0x8ebeeff9U, 0x17b7be43U, 0x60b08ed5U, 0xd6d6a3e8U, 0xa1d1937eU, + 0x38d8c2c4U, 0x4fdff252U, 0xd1bb67f1U, 0xa6bc5767U, 0x3fb506ddU, 0x48b2364bU, + 0xd80d2bdaU, 0xaf0a1b4cU, 0x36034af6U, 0x41047a60U, 0xdf60efc3U, 0xa867df55U, + 0x316e8eefU, 0x4669be79U, 0xcb61b38cU, 0xbc66831aU, 0x256fd2a0U, 0x5268e236U, + 0xcc0c7795U, 0xbb0b4703U, 0x220216b9U, 0x5505262fU, 0xc5ba3bbeU, 0xb2bd0b28U, + 0x2bb45a92U, 0x5cb36a04U, 0xc2d7ffa7U, 0xb5d0cf31U, 0x2cd99e8bU, 0x5bdeae1dU, + 0x9b64c2b0U, 0xec63f226U, 0x756aa39cU, 0x026d930aU, 0x9c0906a9U, 0xeb0e363fU, + 0x72076785U, 0x05005713U, 0x95bf4a82U, 0xe2b87a14U, 0x7bb12baeU, 0x0cb61b38U, + 0x92d28e9bU, 0xe5d5be0dU, 0x7cdcefb7U, 0x0bdbdf21U, 0x86d3d2d4U, 0xf1d4e242U, + 0x68ddb3f8U, 0x1fda836eU, 0x81be16cdU, 0xf6b9265bU, 0x6fb077e1U, 0x18b74777U, + 0x88085ae6U, 0xff0f6a70U, 0x66063bcaU, 0x11010b5cU, 0x8f659effU, 0xf862ae69U, + 0x616bffd3U, 0x166ccf45U, 0xa00ae278U, 0xd70dd2eeU, 0x4e048354U, 0x3903b3c2U, + 0xa7672661U, 0xd06016f7U, 0x4969474dU, 0x3e6e77dbU, 0xaed16a4aU, 0xd9d65adcU, + 0x40df0b66U, 0x37d83bf0U, 0xa9bcae53U, 0xdebb9ec5U, 0x47b2cf7fU, 0x30b5ffe9U, + 0xbdbdf21cU, 0xcabac28aU, 0x53b39330U, 0x24b4a3a6U, 0xbad03605U, 0xcdd70693U, + 0x54de5729U, 0x23d967bfU, 0xb3667a2eU, 0xc4614ab8U, 0x5d681b02U, 0x2a6f2b94U, + 0xb40bbe37U, 0xc30c8ea1U, 0x5a05df1bU, 0x2d02ef8dU +}; + +/* + * Check upstream parameters and mark it whether valid or dead + */ +static void +check_upstream (struct upstream *up, time_t now, time_t error_timeout, time_t revive_timeout, size_t max_errors) +{ + if (up->dead) { + if (now - up->time >= revive_timeout) { + msg_debug ("check_upstream: reviving upstream after %ld seconds", (long int) now - up->time); + U_WLOCK (); + up->dead = 0; + up->errors = 0; + up->time = 0; + up->weight = up->priority; + U_UNLOCK (); + } + } + else { + if (now - up->time >= error_timeout && up->errors >= max_errors) { + msg_debug ("check_upstream: marking upstreams as dead after %ld errors", (long int) up->errors); + U_WLOCK (); + up->dead = 1; + up->time = now; + up->weight = 0; + U_UNLOCK (); + } + } +} + +/* + * Call this function after failed upstream request + */ +void +upstream_fail (struct upstream *up, time_t now) +{ + if (up->time != 0) { + up->errors ++; + } + else { + U_WLOCK (); + up->time = now; + up->errors ++; + U_UNLOCK (); + } +} +/* + * Call this function after successfull upstream request + */ +void +upstream_ok (struct upstream *up, time_t now) +{ + if (up->errors != 0) { + U_WLOCK (); + up->errors = 0; + up->time = 0; + U_UNLOCK (); + } + + up->weight --; +} +/* + * Mark all upstreams as active. This function is used when all upstreams are marked as inactive + */ +void +revive_all_upstreams (void *ups, size_t members, size_t msize) +{ + int i; + struct upstream *cur; + u_char *p; + + U_WLOCK (); + msg_debug ("revive_all_upstreams: starting reviving all upstreams"); + p = ups; + for (i = 0; i < members; i++) { + cur = (struct upstream *)p; + cur->time = 0; + cur->errors = 0; + cur->dead = 0; + cur->weight = cur->priority; + p += msize; + } + U_UNLOCK (); +} + +/* + * Scan all upstreams for errors and mark upstreams dead or alive depends on conditions, + * return number of alive upstreams + */ +static int +rescan_upstreams (void *ups, size_t members, size_t msize, time_t now, time_t error_timeout, time_t revive_timeout, size_t max_errors) +{ + int i, alive; + struct upstream *cur; + u_char *p; + + /* Recheck all upstreams */ + p = ups; + alive = members; + for (i = 0; i < members; i++) { + cur = (struct upstream *)p; + check_upstream (cur, now, error_timeout, revive_timeout, max_errors); + alive -= cur->dead; + p += msize; + } + + /* All upstreams are dead */ + if (alive == 0) { + revive_all_upstreams (ups, members, msize); + alive = members; + } + + msg_debug ("rescan_upstreams: %d upstreams alive", alive); + + return alive; + +} + +/* Return alive upstream by its number */ +static struct upstream * +get_upstream_by_number (void *ups, size_t members, size_t msize, int selected) +{ + int i; + u_char *p, *c; + struct upstream *cur; + + i = 0; + p = ups; + c = ups; + U_RLOCK (); + for (;;) { + /* Out of range, return NULL */ + if (p > c + members * msize) { + break; + } + + cur = (struct upstream *)p; + p += msize; + + if (cur->dead) { + /* Skip inactive upstreams */ + continue; + } + /* Return selected upstream */ + if (i == selected) { + U_UNLOCK (); + return cur; + } + i++; + } + U_UNLOCK (); + + /* Error */ + return NULL; + +} + +/* + * Get hash key for specified key (perl hash) + */ +static uint32_t +get_hash_for_key (uint32_t hash, char *key, size_t keylen) +{ + uint32_t h, index; + const char *end = key + keylen; + + h = ~hash; + + while (key < end) { + index = (h ^ (u_char) *key) & 0x000000ffU; + h = (h >> 8) ^ crc32lookup[index]; + ++key; + } + + return (~h); +} + +/* + * Recheck all upstreams and return random active upstream + */ +struct upstream * +get_random_upstream (void *ups, size_t members, size_t msize, time_t now, time_t error_timeout, time_t revive_timeout, size_t max_errors) +{ + int alive, selected; + + alive = rescan_upstreams (ups, members, msize, now, error_timeout, revive_timeout, max_errors); + selected = rand () % alive; + msg_debug ("get_random_upstream: return upstream with number %d of %d", selected, alive); + + return get_upstream_by_number (ups, members, msize, selected); +} + +/* + * Return upstream by hash, that is calculated from active upstreams number + */ +struct upstream * +get_upstream_by_hash (void *ups, size_t members, size_t msize, time_t now, + time_t error_timeout, time_t revive_timeout, size_t max_errors, + char *key, size_t keylen) +{ + int alive, tries = 0, r; + uint32_t h = 0, ht; + char *p, numbuf[4]; + struct upstream *cur; + + alive = rescan_upstreams (ups, members, msize, now, error_timeout, revive_timeout, max_errors); + + if (alive == 0) { + return NULL; + } + + h = get_hash_for_key (0, key, keylen); +#ifdef HASH_COMPAT + h = (h >> 16) & 0x7fff; +#endif + h %= members; + msg_debug ("get_upstream_by_hash: try to select upstream number %d of %zd", h, members); + + for (;;) { + p = (char *)ups + msize * h; + cur = (struct upstream *)p; + if (!cur->dead) { + break; + } + r = snprintf (numbuf, sizeof (numbuf), "%d", tries); + ht = get_hash_for_key (0, numbuf, r); + ht = get_hash_for_key (ht, key, keylen); +#ifdef HASH_COMPAT + h += (ht >> 16) & 0x7fff; +#else + h += ht; +#endif + h %= members; + msg_debug ("get_upstream_by_hash: try to select upstream number %d of %zd, tries: %d", h, members, tries); + tries ++; + if (tries > MAX_TRIES) { + msg_debug ("get_upstream_by_hash: max tries exceed, returning NULL"); + return NULL; + } + } + + U_RLOCK (); + p = ups; + U_UNLOCK (); + return cur; +} + +/* + * Recheck all upstreams and return upstream in round-robin order according to weight and priority + */ +struct upstream * +get_upstream_round_robin (void *ups, size_t members, size_t msize, time_t now, time_t error_timeout, time_t revive_timeout, size_t max_errors) +{ + int alive, max_weight, i; + struct upstream *cur, *selected = NULL; + u_char *p; + + /* Recheck all upstreams */ + alive = rescan_upstreams (ups, members, msize, now, error_timeout, revive_timeout, max_errors); + + p = ups; + max_weight = 0; + selected = (struct upstream *)p; + U_RLOCK (); + for (i = 0; i < members; i++) { + cur = (struct upstream *)p; + if (!cur->dead) { + if (max_weight < cur->weight) { + max_weight = cur->weight; + selected = cur; + } + } + p += msize; + } + U_UNLOCK (); + + if (max_weight == 0) { + p = ups; + U_WLOCK (); + for (i = 0; i < members; i++) { + cur = (struct upstream *)p; + cur->weight = cur->priority; + if (!cur->dead) { + if (max_weight < cur->priority) { + max_weight = cur->priority; + selected = cur; + } + } + p += msize; + } + U_UNLOCK (); + } + msg_debug ("get_upstream_round_robin: selecting upstream with weight %d", max_weight); + + return selected; +} + +/* + * Recheck all upstreams and return upstream in round-robin order according to only priority (master-slaves) + */ +struct upstream * +get_upstream_master_slave (void *ups, size_t members, size_t msize, time_t now, time_t error_timeout, time_t revive_timeout, size_t max_errors) +{ + int alive, max_weight, i; + struct upstream *cur, *selected = NULL; + u_char *p; + + /* Recheck all upstreams */ + alive = rescan_upstreams (ups, members, msize, now, error_timeout, revive_timeout, max_errors); + + p = ups; + max_weight = 0; + selected = (struct upstream *)p; + U_RLOCK (); + for (i = 0; i < members; i++) { + cur = (struct upstream *)p; + if (!cur->dead) { + if (max_weight < cur->priority) { + max_weight = cur->priority; + selected = cur; + } + } + p += msize; + } + U_UNLOCK (); + msg_debug ("get_upstream_master_slave: selecting upstream with priority %d", max_weight); + + return selected; +} + +/* + * Ketama manipulation functions + */ + +static int +ketama_sort_cmp (const void *a1, const void *a2) +{ + return *((uint32_t *)a1) - *((uint32_t *)a2); +} + +/* + * Add ketama points for specified upstream + */ +int +upstream_ketama_add (struct upstream *up, char *up_key, size_t keylen, size_t keypoints) +{ + uint32_t h = 0; + char tmp[4]; + int i; + + /* Allocate ketama points array */ + if (up->ketama_points == NULL) { + up->ketama_points_size = keypoints; + up->ketama_points = malloc (sizeof (uint32_t) * up->ketama_points_size); + if (up->ketama_points == NULL) { + return -1; + } + } + + h = get_hash_for_key (h, up_key, keylen); + + for (i = 0; i < keypoints; i++) { + tmp[0] = i & 0xff; + tmp[1] = (i >> 8) & 0xff; + tmp[2] = (i >> 16) & 0xff; + tmp[3] = (i >> 24) & 0xff; + + h = get_hash_for_key (h, tmp, sizeof (tmp) * sizeof (char)); + up->ketama_points[i] = h; + } + /* Keep points sorted */ + qsort (up->ketama_points, keypoints, sizeof (uint32_t), ketama_sort_cmp); + + return 0; +} + +/* + * Return upstream by hash and find nearest ketama point in some server + */ +struct upstream * +get_upstream_by_hash_ketama (void *ups, size_t members, size_t msize, time_t now, + time_t error_timeout, time_t revive_timeout, size_t max_errors, + char *key, size_t keylen) +{ + int alive, i; + uint32_t h = 0, step, middle, d, min_diff = UINT_MAX; + char *p; + struct upstream *cur = NULL, *nearest = NULL; + + alive = rescan_upstreams (ups, members, msize, now, error_timeout, revive_timeout, max_errors); + + if (alive == 0) { + return NULL; + } + + h = get_hash_for_key (h, key, keylen); + + U_RLOCK (); + p = ups; + nearest = (struct upstream *)p; + for (i = 0; i < members; i++) { + cur = (struct upstream *)p; + if (!cur->dead && cur->ketama_points != NULL) { + /* Find nearest ketama point for this key */ + step = cur->ketama_points_size / 2; + middle = step; + while (step != 1) { + d = cur->ketama_points[middle] - h; + if (abs (d) < min_diff) { + min_diff = abs (d); + nearest = cur; + } + step /= 2; + if (d > 0) { + middle -= step; + } + else { + middle += step; + } + } + } + } + U_UNLOCK (); + return nearest; +} + +#undef U_LOCK +#undef U_UNLOCK +#undef msg_debug +/* + * vi:ts=4 + */ diff --git a/src/upstream.h b/src/upstream.h new file mode 100644 index 000000000..a6c6b2200 --- /dev/null +++ b/src/upstream.h @@ -0,0 +1,43 @@ +#ifndef UPSTREAM_H +#define UPSTREAM_H + +#include +#include + +struct upstream { + unsigned int errors; + time_t time; + unsigned char dead; + unsigned char priority; + int16_t weight; + uint32_t *ketama_points; + size_t ketama_points_size; +}; + +void upstream_fail (struct upstream *up, time_t now); +void upstream_ok (struct upstream *up, time_t now); +void revive_all_upstreams (void *ups, size_t members, size_t msize); +int upstream_ketama_add (struct upstream *up, char *up_key, size_t keylen, size_t keypoints); + +struct upstream* get_random_upstream (void *ups, size_t members, size_t msize, + time_t now, time_t error_timeout, + time_t revive_timeout, size_t max_errors); + +struct upstream* get_upstream_by_hash (void *ups, size_t members, size_t msize, + time_t now, time_t error_timeout, + time_t revive_timeout, size_t max_errors, + char *key, size_t keylen); + +struct upstream* get_upstream_round_robin (void *ups, size_t members, size_t msize, + time_t now, time_t error_timeout, + time_t revive_timeout, size_t max_errors); + +struct upstream* get_upstream_by_hash_ketama (void *ups, size_t members, size_t msize, time_t now, + time_t error_timeout, time_t revive_timeout, size_t max_errors, + char *key, size_t keylen); + + +#endif /* UPSTREAM_H */ +/* + * vi:ts=4 + */ diff --git a/src/url.c b/src/url.c new file mode 100644 index 000000000..83ee0195a --- /dev/null +++ b/src/url.c @@ -0,0 +1,886 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "url.h" +#include "fstring.h" +#include "main.h" + +#define POST_CHAR 1 +#define POST_CHAR_S "\001" + +/* Tcp port range */ +#define LOWEST_PORT 0 +#define HIGHEST_PORT 65535 + +#define uri_port_is_valid(port) \ + (LOWEST_PORT <= (port) && (port) <= HIGHEST_PORT) + +struct _proto { + unsigned char *name; + int port; + uintptr_t *unused; + unsigned int need_slashes:1; + unsigned int need_slash_after_host:1; + unsigned int free_syntax:1; + unsigned int need_ssl:1; +}; + +static const char *text_url = "((https?|ftp)://)?" +"(\\b(??!),.'\"\\]:])" +"(?!@)" +")"; +static const char *html_url = "(?: src|href)=\"(" +"((https?|ftp)://)?" +"(\\b(??!),.'\"\\]:])" +"(?!@)" +"))\""; + +static short url_initialized = 0; +GRegex *text_re, *html_re; + +static const struct _proto protocol_backends[] = { + { "file", 0, NULL, 1, 0, 0, 0 }, + { "ftp", 21, NULL, 1, 1, 0, 0 }, + { "http", 80, NULL, 1, 1, 0, 0 }, + { "https", 443, NULL, 1, 1, 0, 1 }, + + /* Keep these last! */ + { NULL, 0, NULL, 0, 0, 1, 0 }, +}; + +/* + Table of "reserved" and "unsafe" characters. Those terms are + rfc1738-speak, as such largely obsoleted by rfc2396 and later + specs, but the general idea remains. + + A reserved character is the one that you can't decode without + changing the meaning of the URL. For example, you can't decode + "/foo/%2f/bar" into "/foo///bar" because the number and contents of + path components is different. Non-reserved characters can be + changed, so "/foo/%78/bar" is safe to change to "/foo/x/bar". The + unsafe characters are loosely based on rfc1738, plus "$" and ",", + as recommended by rfc2396, and minus "~", which is very frequently + used (and sometimes unrecognized as %7E by broken servers). + + An unsafe character is the one that should be encoded when URLs are + placed in foreign environments. E.g. space and newline are unsafe + in HTTP contexts because HTTP uses them as separator and line + terminator, so they must be encoded to %20 and %0A respectively. + "*" is unsafe in shell context, etc. + + We determine whether a character is unsafe through static table + lookup. This code assumes ASCII character set and 8-bit chars. */ + +enum { + /* rfc1738 reserved chars + "$" and ",". */ + urlchr_reserved = 1, + + /* rfc1738 unsafe chars, plus non-printables. */ + urlchr_unsafe = 2 +}; + +#define urlchr_test(c, mask) (urlchr_table[(unsigned char)(c)] & (mask)) +#define URL_RESERVED_CHAR(c) urlchr_test(c, urlchr_reserved) +#define URL_UNSAFE_CHAR(c) urlchr_test(c, urlchr_unsafe) +/* Convert an ASCII hex digit to the corresponding number between 0 + and 15. H should be a hexadecimal digit that satisfies isxdigit; + otherwise, the result is undefined. */ +#define XDIGIT_TO_NUM(h) ((h) < 'A' ? (h) - '0' : toupper (h) - 'A' + 10) +#define X2DIGITS_TO_NUM(h1, h2) ((XDIGIT_TO_NUM (h1) << 4) + XDIGIT_TO_NUM (h2)) +/* The reverse of the above: convert a number in the [0, 16) range to + the ASCII representation of the corresponding hexadecimal digit. + `+ 0' is there so you can't accidentally use it as an lvalue. */ +#define XNUM_TO_DIGIT(x) ("0123456789ABCDEF"[x] + 0) +#define XNUM_TO_digit(x) ("0123456789abcdef"[x] + 0) + +/* Shorthands for the table: */ +#define R urlchr_reserved +#define U urlchr_unsafe +#define RU R|U + +static const unsigned char urlchr_table[256] = +{ + U, U, U, U, U, U, U, U, /* NUL SOH STX ETX EOT ENQ ACK BEL */ + U, U, U, U, U, U, U, U, /* BS HT LF VT FF CR SO SI */ + U, U, U, U, U, U, U, U, /* DLE DC1 DC2 DC3 DC4 NAK SYN ETB */ + U, U, U, U, U, U, U, U, /* CAN EM SUB ESC FS GS RS US */ + U, 0, U, RU, R, U, R, 0, /* SP ! " # $ % & ' */ + 0, 0, 0, R, R, 0, 0, R, /* ( ) * + , - . / */ + 0, 0, 0, 0, 0, 0, 0, 0, /* 0 1 2 3 4 5 6 7 */ + 0, 0, RU, R, U, R, U, R, /* 8 9 : ; < = > ? */ + RU, 0, 0, 0, 0, 0, 0, 0, /* @ A B C D E F G */ + 0, 0, 0, 0, 0, 0, 0, 0, /* H I J K L M N O */ + 0, 0, 0, 0, 0, 0, 0, 0, /* P Q R S T U V W */ + 0, 0, 0, RU, U, RU, U, 0, /* X Y Z [ \ ] ^ _ */ + U, 0, 0, 0, 0, 0, 0, 0, /* ` a b c d e f g */ + 0, 0, 0, 0, 0, 0, 0, 0, /* h i j k l m n o */ + 0, 0, 0, 0, 0, 0, 0, 0, /* p q r s t u v w */ + 0, 0, 0, U, U, U, 0, U, /* x y z { | } ~ DEL */ + + U, U, U, U, U, U, U, U, U, U, U, U, U, U, U, U, + U, U, U, U, U, U, U, U, U, U, U, U, U, U, U, U, + U, U, U, U, U, U, U, U, U, U, U, U, U, U, U, U, + U, U, U, U, U, U, U, U, U, U, U, U, U, U, U, U, + + U, U, U, U, U, U, U, U, U, U, U, U, U, U, U, U, + U, U, U, U, U, U, U, U, U, U, U, U, U, U, U, U, + U, U, U, U, U, U, U, U, U, U, U, U, U, U, U, U, + U, U, U, U, U, U, U, U, U, U, U, U, U, U, U, U, +}; +#undef R +#undef U +#undef RU + +static inline int +end_of_dir(unsigned char c) +{ + return c == POST_CHAR || c == '#' || c == ';' || c == '?'; +} + +static inline int +is_uri_dir_sep(struct uri *uri, unsigned char pos) +{ + return (pos == '/'); +} + +static int +check_uri_file(unsigned char *name) +{ + static const unsigned char chars[] = POST_CHAR_S "#?"; + + return strcspn(name, chars); +} + +static int +url_init (void) +{ + GError *err = NULL; + if (url_initialized == 0) { + text_re = g_regex_new (text_url, G_REGEX_CASELESS | G_REGEX_MULTILINE | G_REGEX_OPTIMIZE | G_REGEX_EXTENDED, 0, &err); + if (err != NULL) { + msg_info ("url_init: cannot init text url parsing regexp: %s", err->message); + g_error_free (err); + return -1; + } + html_re = g_regex_new (html_url, G_REGEX_CASELESS | G_REGEX_MULTILINE | G_REGEX_OPTIMIZE | G_REGEX_EXTENDED, 0, &err); + if (err != NULL) { + msg_info ("url_init: cannot init html url parsing regexp: %s", err->message); + g_error_free (err); + return -1; + } + url_initialized = 1; + } + + return 0; +} + +enum protocol +get_protocol(unsigned char *name, int namelen) +{ + /* These are really enum protocol values but can take on negative + * values and since 0 <= -1 for enum values it's better to use clean + * integer type. */ + int start, end; + enum protocol protocol; + unsigned char *pname; + int pnamelen, minlen, compare; + + /* Almost dichotomic search is used here */ + /* Starting at the HTTP entry which is the most common that will make + * file and NNTP the next entries checked and amongst the third checks + * are proxy and FTP. */ + start = 0; + end = PROTOCOL_UNKNOWN - 1; + protocol = PROTOCOL_HTTP; + + while (start <= end) { + pname = protocol_backends[protocol].name; + pnamelen = strlen (pname); + minlen = MIN (pnamelen, namelen); + compare = strncasecmp (pname, name, minlen); + + if (compare == 0) { + if (pnamelen == namelen) + return protocol; + + /* If the current protocol name is longer than the + * protocol name being searched for move @end else move + * @start. */ + compare = pnamelen > namelen ? 1 : -1; + } + + if (compare > 0) + end = protocol - 1; + else + start = protocol + 1; + + protocol = (start + end) / 2; + } + + return PROTOCOL_UNKNOWN; +} + + +int +get_protocol_port(enum protocol protocol) +{ + return protocol_backends[protocol].port; +} + +int +get_protocol_need_slashes(enum protocol protocol) +{ + return protocol_backends[protocol].need_slashes; +} + +int +get_protocol_need_slash_after_host(enum protocol protocol) +{ + return protocol_backends[protocol].need_slash_after_host; +} + +int +get_protocol_free_syntax(enum protocol protocol) +{ + return protocol_backends[protocol].free_syntax; +} + +static int +get_protocol_length(const unsigned char *url) +{ + unsigned char *end = (unsigned char *) url; + + /* Seek the end of the protocol name if any. */ + /* RFC1738: + * scheme = 1*[ lowalpha | digit | "+" | "-" | "." ] + * (but per its recommendations we accept "upalpha" too) */ + while (isalnum(*end) || *end == '+' || *end == '-' || *end == '.') + end++; + + /* Also return 0 if there's no protocol name (@end == @url). */ + return (*end == ':') ? end - url : 0; +} + +/* URL-unescape the string S. + + This is done by transforming the sequences "%HH" to the character + represented by the hexadecimal digits HH. If % is not followed by + two hexadecimal digits, it is inserted literally. + + The transformation is done in place. If you need the original + string intact, make a copy before calling this function. */ + +static void +url_unescape (char *s) +{ + char *t = s; /* t - tortoise */ + char *h = s; /* h - hare */ + + for (; *h; h++, t++) { + if (*h != '%') { + copychar: + *t = *h; + } + else { + char c; + /* Do nothing if '%' is not followed by two hex digits. */ + if (!h[1] || !h[2] || !(isxdigit (h[1]) && isxdigit (h[2]))) + goto copychar; + c = X2DIGITS_TO_NUM (h[1], h[2]); + /* Don't unescape %00 because there is no way to insert it + * into a C string without effectively truncating it. */ + if (c == '\0') + goto copychar; + *t = c; + h += 2; + } + } + *t = '\0'; +} + +/* The core of url_escape_* functions. Escapes the characters that + match the provided mask in urlchr_table. + + If ALLOW_PASSTHROUGH is non-zero, a string with no unsafe chars + will be returned unchanged. If ALLOW_PASSTHROUGH is zero, a + freshly allocated string will be returned in all cases. */ + +static char * +url_escape_1 (const char *s, unsigned char mask, int allow_passthrough, memory_pool_t *pool) +{ + const char *p1; + char *p2, *newstr; + int newlen; + int addition = 0; + + for (p1 = s; *p1; p1++) + if (urlchr_test (*p1, mask)) + addition += 2; /* Two more characters (hex digits) */ + + if (!addition) { + if (allow_passthrough) { + return (char *)s; + } + else { + return memory_pool_strdup (pool, s); + } + } + + newlen = (p1 - s) + addition; + newstr = (char *) memory_pool_alloc (pool, newlen + 1); + + p1 = s; + p2 = newstr; + while (*p1) { + /* Quote the characters that match the test mask. */ + if (urlchr_test (*p1, mask)) { + unsigned char c = *p1++; + *p2++ = '%'; + *p2++ = XNUM_TO_DIGIT (c >> 4); + *p2++ = XNUM_TO_DIGIT (c & 0xf); + } + else + *p2++ = *p1++; + } + *p2 = '\0'; + + return newstr; +} + +/* URL-escape the unsafe characters (see urlchr_table) in a given + string, returning a freshly allocated string. */ + +char * +url_escape (const char *s, memory_pool_t *pool) +{ + return url_escape_1 (s, urlchr_unsafe, 0, pool); +} + +/* URL-escape the unsafe characters (see urlchr_table) in a given + string. If no characters are unsafe, S is returned. */ + +static char * +url_escape_allow_passthrough (const char *s, memory_pool_t *pool) +{ + return url_escape_1 (s, urlchr_unsafe, 1, pool); +} + +/* Decide whether the char at position P needs to be encoded. (It is + not enough to pass a single char *P because the function may need + to inspect the surrounding context.) + + Return 1 if the char should be escaped as %XX, 0 otherwise. */ + +static inline int +char_needs_escaping (const char *p) +{ + if (*p == '%') { + if (isxdigit (*(p + 1)) && isxdigit (*(p + 2))) + return 0; + else + /* Garbled %.. sequence: encode `%'. */ + return 1; + } + else if (URL_UNSAFE_CHAR (*p) && !URL_RESERVED_CHAR (*p)) + return 1; + else + return 0; +} + +/* Translate a %-escaped (but possibly non-conformant) input string S + into a %-escaped (and conformant) output string. If no characters + are encoded or decoded, return the same string S; otherwise, return + a freshly allocated string with the new contents. + + After a URL has been run through this function, the protocols that + use `%' as the quote character can use the resulting string as-is, + while those that don't can use url_unescape to get to the intended + data. This function is stable: once the input is transformed, + further transformations of the result yield the same output. +*/ + +static char * +reencode_escapes (const char *s, memory_pool_t *pool) +{ + const char *p1; + char *newstr, *p2; + int oldlen, newlen; + + int encode_count = 0; + + /* First pass: inspect the string to see if there's anything to do, + and to calculate the new length. */ + for (p1 = s; *p1; p1++) + if (char_needs_escaping (p1)) + ++encode_count; + + if (!encode_count) { + /* The string is good as it is. */ + return memory_pool_strdup (pool, s); + } + + oldlen = p1 - s; + /* Each encoding adds two characters (hex digits). */ + newlen = oldlen + 2 * encode_count; + newstr = memory_pool_alloc (pool, newlen + 1); + + /* Second pass: copy the string to the destination address, encoding + chars when needed. */ + p1 = s; + p2 = newstr; + + while (*p1) + if (char_needs_escaping (p1)) { + unsigned char c = *p1++; + *p2++ = '%'; + *p2++ = XNUM_TO_DIGIT (c >> 4); + *p2++ = XNUM_TO_DIGIT (c & 0xf); + } + else { + *p2++ = *p1++; + } + + *p2 = '\0'; + return newstr; +} +/* Unescape CHR in an otherwise escaped STR. Used to selectively + escaping of certain characters, such as "/" and ":". Returns a + count of unescaped chars. */ + +static void +unescape_single_char (char *str, char chr) +{ + const char c1 = XNUM_TO_DIGIT (chr >> 4); + const char c2 = XNUM_TO_DIGIT (chr & 0xf); + char *h = str; /* hare */ + char *t = str; /* tortoise */ + + for (; *h; h++, t++) { + if (h[0] == '%' && h[1] == c1 && h[2] == c2) { + *t = chr; + h += 2; + } + else { + *t = *h; + } + } + *t = '\0'; +} + +/* Escape unsafe and reserved characters, except for the slash + characters. */ + +static char * +url_escape_dir (const char *dir, memory_pool_t *pool) +{ + char *newdir = url_escape_1 (dir, urlchr_unsafe | urlchr_reserved, 1, pool); + if (newdir == dir) + return (char *)dir; + + unescape_single_char (newdir, '/'); + return newdir; +} + +/* Resolve "." and ".." elements of PATH by destructively modifying + PATH and return non-zero if PATH has been modified, zero otherwise. + + The algorithm is in spirit similar to the one described in rfc1808, + although implemented differently, in one pass. To recap, path + elements containing only "." are removed, and ".." is taken to mean + "back up one element". Single leading and trailing slashes are + preserved. + + For example, "a/b/c/./../d/.." will yield "a/b/". More exhaustive + test examples are provided below. If you change anything in this + function, run test_path_simplify to make sure you haven't broken a + test case. */ + +static int +path_simplify (char *path) +{ + char *h = path; /* hare */ + char *t = path; /* tortoise */ + char *beg = path; /* boundary for backing the tortoise */ + char *end = path + strlen (path); + + while (h < end) { + /* Hare should be at the beginning of a path element. */ + if (h[0] == '.' && (h[1] == '/' || h[1] == '\0')) { + /* Ignore "./". */ + h += 2; + } + else if (h[0] == '.' && h[1] == '.' && (h[2] == '/' || h[2] == '\0')) { + /* Handle "../" by retreating the tortoise by one path + element -- but not past beggining. */ + if (t > beg) { + /* Move backwards until T hits the beginning of the + previous path element or the beginning of path. */ + for (--t; t > beg && t[-1] != '/'; t--); + } + else { + /* If we're at the beginning, copy the "../" literally + move the beginning so a later ".." doesn't remove + it. */ + beg = t + 3; + goto regular; + } + h += 3; + } + else { + regular: + /* A regular path element. If H hasn't advanced past T, + simply skip to the next path element. Otherwise, copy + the path element until the next slash. */ + if (t == h) { + /* Skip the path element, including the slash. */ + while (h < end && *h != '/') + t++, h++; + if (h < end) + t++, h++; + } + else { + /* Copy the path element, including the final slash. */ + while (h < end && *h != '/') + *t++ = *h++; + if (h < end) + *t++ = *h++; + } + } + } + + if (t != h) + *t = '\0'; + + return t != h; +} + +enum uri_errno +parse_uri(struct uri *uri, unsigned char *uristring, memory_pool_t *pool) +{ + unsigned char *prefix_end, *host_end, *p; + unsigned char *lbracket, *rbracket; + int datalen, n, addrlen; + unsigned char *frag_or_post, *user_end, *port_end; + + memset (uri, 0, sizeof (*uri)); + + /* Nothing to do for an empty url. */ + if (!*uristring) return URI_ERRNO_EMPTY; + + uri->string = reencode_escapes (uristring, pool); + msg_debug ("parse_uri: reencoding escapes in original url: '%s'", struri (uri)); + uri->protocollen = get_protocol_length (struri (uri)); + + /* Assume http as default protocol */ + if (!uri->protocollen || (uri->protocol = get_protocol (struri(uri), uri->protocollen)) == PROTOCOL_UNKNOWN) { + p = g_strconcat ("http://", uri->string, NULL); + g_free (uri->string); + uri->string = p; + uri->protocol = PROTOCOL_HTTP; + prefix_end = struri (uri) + 7; + } + else { + /* Figure out whether the protocol is known */ + msg_debug ("parse_uri: getting protocol from url: %d", uri->protocol); + + prefix_end = struri (uri) + uri->protocollen; /* ':' */ + + /* Check if there's a digit after the protocol name. */ + if (isdigit (*prefix_end)) { + p = struri (uri); + uri->ip_family = p[uri->protocollen] - '0'; + prefix_end++; + } + if (*prefix_end != ':') { + msg_debug ("parse_uri: invalid protocol in uri"); + return URI_ERRNO_INVALID_PROTOCOL; + } + prefix_end++; + + /* Skip slashes */ + + if (prefix_end[0] == '/' && prefix_end[1] == '/') { + if (prefix_end[2] == '/') { + msg_debug ("parse_uri: too many '/' in uri"); + return URI_ERRNO_TOO_MANY_SLASHES; + } + + prefix_end += 2; + + } else { + msg_debug ("parse_uri: no '/' in uri"); + return URI_ERRNO_NO_SLASHES; + } + } + + if (get_protocol_free_syntax (uri->protocol)) { + uri->data = prefix_end; + uri->datalen = strlen (prefix_end); + return URI_ERRNO_OK; + + } else if (uri->protocol == PROTOCOL_FILE) { + datalen = check_uri_file (prefix_end); + frag_or_post = prefix_end + datalen; + + /* Extract the fragment part. */ + if (datalen >= 0) { + if (*frag_or_post == '#') { + uri->fragment = frag_or_post + 1; + uri->fragmentlen = strcspn(uri->fragment, POST_CHAR_S); + frag_or_post = uri->fragment + uri->fragmentlen; + } + if (*frag_or_post == POST_CHAR) { + uri->post = frag_or_post + 1; + } + } else { + datalen = strlen(prefix_end); + } + + uri->data = prefix_end; + uri->datalen = datalen; + + return URI_ERRNO_OK; + } + + /* Isolate host */ + + /* Get brackets enclosing IPv6 address */ + lbracket = strchr (prefix_end, '['); + if (lbracket) { + rbracket = strchr (lbracket, ']'); + /* [address] is handled only inside of hostname part (surprisingly). */ + if (rbracket && rbracket < prefix_end + strcspn (prefix_end, "/")) + uri->ipv6 = 1; + else + lbracket = rbracket = NULL; + } else { + rbracket = NULL; + } + + /* Possibly skip auth part */ + host_end = prefix_end + strcspn (prefix_end, "@"); + + if (prefix_end + strcspn (prefix_end, "/") > host_end + && *host_end) { /* we have auth info here */ + + /* Allow '@' in the password component */ + while (strcspn (host_end + 1, "@") < strcspn (host_end + 1, "/?")) + host_end = host_end + 1 + strcspn (host_end + 1, "@"); + + user_end = strchr (prefix_end, ':'); + + if (!user_end || user_end > host_end) { + uri->user = prefix_end; + uri->userlen = host_end - prefix_end; + } else { + uri->user = prefix_end; + uri->userlen = user_end - prefix_end; + uri->password = user_end + 1; + uri->passwordlen = host_end - user_end - 1; + } + prefix_end = host_end + 1; + } + + if (uri->ipv6) + host_end = rbracket + strcspn (rbracket, ":/?"); + else + host_end = prefix_end + strcspn (prefix_end, ":/?"); + + if (uri->ipv6) { + addrlen = rbracket - lbracket - 1; + + + uri->host = lbracket + 1; + uri->hostlen = addrlen; + } else { + uri->host = prefix_end; + uri->hostlen = host_end - prefix_end; + + /* Trim trailing '.'s */ + if (uri->hostlen && uri->host[uri->hostlen - 1] == '.') + return URI_ERRNO_TRAILING_DOTS; + } + + if (*host_end == ':') { /* we have port here */ + port_end = host_end + 1 + strcspn (host_end + 1, "/"); + + host_end++; + + uri->port = host_end; + uri->portlen = port_end - host_end; + + if (uri->portlen == 0) + return URI_ERRNO_NO_PORT_COLON; + + /* We only use 8 bits for portlen so better check */ + if (uri->portlen != port_end - host_end) + return URI_ERRNO_INVALID_PORT; + + /* test if port is number */ + for (; host_end < port_end; host_end++) + if (!isdigit (*host_end)) + return URI_ERRNO_INVALID_PORT; + + /* Check valid port value, and let show an error message + * about invalid url syntax. */ + if (uri->port && uri->portlen) { + + errno = 0; + n = strtol (uri->port, NULL, 10); + if (errno || !uri_port_is_valid (n)) + return URI_ERRNO_INVALID_PORT; + } + } + + if (*host_end == '/') { + host_end++; + + } else if (get_protocol_need_slash_after_host (uri->protocol) && *host_end != '?') { + /* The need for slash after the host component depends on the + * need for a host component. -- The dangerous mind of Jonah */ + if (!uri->hostlen) + return URI_ERRNO_NO_HOST; + + return URI_ERRNO_NO_HOST_SLASH; + } + + /* Look for #fragment or POST_CHAR */ + prefix_end = host_end + strcspn (host_end, "#" POST_CHAR_S); + uri->data = host_end; + uri->datalen = prefix_end - host_end; + + if (*prefix_end == '#') { + uri->fragment = prefix_end + 1; + uri->fragmentlen = strcspn (uri->fragment, POST_CHAR_S); + prefix_end = uri->fragment + uri->fragmentlen; + } + + if (*prefix_end == POST_CHAR) { + uri->post = prefix_end + 1; + } + + convert_to_lowercase (uri->string, uri->protocollen); + convert_to_lowercase (uri->host, uri->hostlen); + /* Decode %HH sequences in host name. This is important not so much + to support %HH sequences in host names (which other browser + don't), but to support binary characters (which will have been + converted to %HH by reencode_escapes). */ + if (strchr (uri->host, '%')) { + url_unescape (uri->host); + } + path_simplify (uri->data); + + return URI_ERRNO_OK; +} + +void +url_parse_text (struct worker_task *task, GByteArray *content) +{ + GMatchInfo *info; + GError *err = NULL; + int pos = 0, start; + gboolean rc; + char *url_str = NULL; + struct uri *new; + + if (url_init () == 0) { + do { + rc = g_regex_match_full (text_re, (const char *)content->data, content->len, pos, 0, &info, &err); + if (rc) { + if (g_match_info_matches (info)) { + g_match_info_fetch_pos (info, 0, &start, &pos); + url_str = g_match_info_fetch (info, 0); + msg_debug ("url_parse_text: extracted string with regexp: '%s'", url_str); + if (url_str != NULL) { + new = memory_pool_alloc (task->task_pool, sizeof (struct uri)); + if (new != NULL) { + parse_uri (new, url_str, task->task_pool); + TAILQ_INSERT_TAIL (&task->urls, new, next); + } + } + g_free (url_str); + } + g_match_info_free (info); + } + else if (err != NULL) { + msg_debug ("url_parse_text: error matching regexp: %s", err->message); + g_free (err); + } + else { + msg_debug ("url_parse_text: cannot find url pattern in given string"); + } + } while (rc); + } +} + +void +url_parse_html (struct worker_task *task, GByteArray *content) +{ + GMatchInfo *info; + GError *err = NULL; + int pos = 0, start; + gboolean rc; + char *url_str = NULL; + struct uri *new; + + if (url_init () == 0) { + do { + rc = g_regex_match_full (html_re, (const char *)content->data, content->len, pos, 0, &info, &err); + if (rc) { + if (g_match_info_matches (info)) { + g_match_info_fetch_pos (info, 0, &start, &pos); + url_str = g_match_info_fetch (info, 1); + msg_debug ("url_parse_html: extracted string with regexp: '%s'", url_str); + if (url_str != NULL) { + new = memory_pool_alloc (task->task_pool, sizeof (struct uri)); + if (new != NULL) { + parse_uri (new, url_str, task->task_pool); + TAILQ_INSERT_TAIL (&task->urls, new, next); + } + } + g_free (url_str); + } + g_match_info_free (info); + } + else if (err) { + msg_debug ("url_parse_html: error matching regexp: %s", err->message); + g_free (err); + } + else { + msg_debug ("url_parse_html: cannot find url pattern in given string"); + } + } while (rc); + } +} diff --git a/src/url.h b/src/url.h new file mode 100644 index 000000000..6987c38d1 --- /dev/null +++ b/src/url.h @@ -0,0 +1,88 @@ +/* URL check functions */ +#ifndef URL_H +#define URL_H + +#include +#include +#ifndef HAVE_OWN_QUEUE_H +#include +#else +#include "queue.h" +#endif + +#include +#include "mem_pool.h" + +struct worker_task; + +struct uri { + /* The start of the uri (and thus start of the protocol string). */ + unsigned char *string; + + /* The internal type of protocol. Can _never_ be PROTOCOL_UNKNOWN. */ + int protocol; /* enum protocol */ + + int ip_family; + + unsigned char *user; + unsigned char *password; + unsigned char *host; + unsigned char *port; + /* @data can contain both the path and query uri fields. + * It can never be NULL but can have zero length. */ + unsigned char *data; + unsigned char *fragment; + /* @post can contain some special encoded form data, used internally + * to make form data handling more efficient. The data is marked by + * POST_CHAR in the uri string. */ + unsigned char *post; + + /* @protocollen should only be usable if @protocol is either + * PROTOCOL_USER or an uri string should be composed. */ + unsigned int protocollen; + unsigned int userlen; + unsigned int passwordlen; + unsigned int hostlen; + unsigned int portlen; + unsigned int datalen; + unsigned int fragmentlen; + + /* Flags */ + unsigned int ipv6; /* URI contains IPv6 host */ + unsigned int form; /* URI originated from form */ + + /* Link */ + TAILQ_ENTRY(uri) next; +}; + +enum uri_errno { + URI_ERRNO_OK, /* Parsing went well */ + URI_ERRNO_EMPTY, /* The URI string was empty */ + URI_ERRNO_INVALID_PROTOCOL, /* No protocol was found */ + URI_ERRNO_NO_SLASHES, /* Slashes after protocol missing */ + URI_ERRNO_TOO_MANY_SLASHES, /* Too many slashes after protocol */ + URI_ERRNO_TRAILING_DOTS, /* '.' after host */ + URI_ERRNO_NO_HOST, /* Host part is missing */ + URI_ERRNO_NO_PORT_COLON, /* ':' after host without port */ + URI_ERRNO_NO_HOST_SLASH, /* Slash after host missing */ + URI_ERRNO_IPV6_SECURITY, /* IPv6 security bug detected */ + URI_ERRNO_INVALID_PORT, /* Port number is bad */ + URI_ERRNO_INVALID_PORT_RANGE, /* Port number is not within 0-65535 */ +}; + +enum protocol { + PROTOCOL_FILE, + PROTOCOL_FTP, + PROTOCOL_HTTP, + PROTOCOL_HTTPS, + + PROTOCOL_UNKNOWN, +}; + +#define struri(uri) ((uri)->string) + +void url_parse_html (struct worker_task *task, GByteArray *part); +void url_parse_text (struct worker_task *task, GByteArray *part); +enum uri_errno parse_uri(struct uri *uri, unsigned char *uristring, memory_pool_t *pool); + +#endif diff --git a/src/util.c b/src/util.c new file mode 100644 index 000000000..90a536359 --- /dev/null +++ b/src/util.c @@ -0,0 +1,834 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "config.h" +#ifdef HAVE_LIBUTIL_H +#include +#endif +#include "util.h" +#include "cfg_file.h" + +sig_atomic_t do_reopen_log = 0; + +int +event_make_socket_nonblocking (int fd) +{ + if (fcntl(fd, F_SETFL, O_NONBLOCK) == -1) { + return -1; + } + return 0; +} + +static int +make_socket_ai (struct addrinfo *ai) +{ + struct linger linger; + int fd, on = 1, r; + int serrno; + + /* Create listen socket */ + fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd == -1) { + return (-1); + } + + if (event_make_socket_nonblocking(fd) < 0) + goto out; + + if (fcntl(fd, F_SETFD, 1) == -1) { + goto out; + } + + setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&on, sizeof(on)); + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&on, sizeof(on)); + linger.l_onoff = 1; + linger.l_linger = 5; + setsockopt(fd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)); + + r = bind(fd, ai->ai_addr, ai->ai_addrlen); + + if (r == -1) { + if (errno != EINPROGRESS) { + goto out; + } + } + + return (fd); + + out: + serrno = errno; + close(fd); + errno = serrno; + return (-1); +} + +int +make_socket (const char *address, u_short port) +{ + int fd; + struct addrinfo ai, *aitop = NULL; + char strport[NI_MAXSERV]; + int ai_result; + + memset (&ai, 0, sizeof (ai)); + ai.ai_family = AF_INET; + ai.ai_socktype = SOCK_STREAM; + ai.ai_flags = AI_PASSIVE; + snprintf (strport, sizeof (strport), "%d", port); + if ((ai_result = getaddrinfo (address, strport, &ai, &aitop)) != 0) { + return (-1); + } + + fd = make_socket_ai (aitop); + + freeaddrinfo (aitop); + + return (fd); +} + +int +make_unix_socket (const char *path, struct sockaddr_un *addr) +{ + size_t len = strlen (path); + int sock; + + if (len > sizeof (addr->sun_path) - 1) return -1; + + #ifdef FREEBSD + addr->sun_len = sizeof (struct sockaddr_un); + #endif + + addr->sun_family = AF_UNIX; + + strncpy (addr->sun_path, path, len); + + sock = socket (PF_LOCAL, SOCK_STREAM, 0); + + if (sock != -1) { + if (bind (sock, (struct sockaddr *) addr, sizeof (struct sockaddr_un)) == -1) return -1; + } + + return sock; +} + +void +read_cmd_line (int argc, char **argv, struct config_file *cfg) +{ + int ch; + while ((ch = getopt(argc, argv, "hfc:")) != -1) { + switch (ch) { + case 'f': + cfg->no_fork = 1; + break; + case 'c': + if (optarg && cfg->cfg_name) { + cfg->cfg_name = memory_pool_strdup (cfg->cfg_pool, optarg); + } + break; + case 'h': + case '?': + default: + /* Show help message and exit */ + printf ("Rspamd version " RVERSION "\n" + "Usage: rspamd [-h] [-n] [-f] [-c config_file]\n" + "-h: This help message\n" + "-f: Do not daemonize main process\n" + "-c: Specify config file (./rspamd.conf is used by default)\n"); + exit (0); + break; + } + } +} + +int +write_pid (struct rspamd_main *main) +{ + pid_t pid; + main->pfh = pidfile_open (main->cfg->pid_file, 0644, &pid); + + if (main->pfh == NULL) { + return -1; + } + + pidfile_write (main->pfh); + + return 0; +} + +void +init_signals (struct sigaction *signals, sig_t sig_handler) +{ + /* Setting up signal handlers */ + /* SIGUSR1 - reopen config file */ + /* SIGUSR2 - worker is ready for accept */ + sigemptyset(&signals->sa_mask); + sigaddset(&signals->sa_mask, SIGTERM); + sigaddset(&signals->sa_mask, SIGINT); + sigaddset(&signals->sa_mask, SIGHUP); + sigaddset(&signals->sa_mask, SIGCHLD); + sigaddset(&signals->sa_mask, SIGUSR1); + sigaddset(&signals->sa_mask, SIGUSR2); + + + signals->sa_handler = sig_handler; + sigaction (SIGTERM, signals, NULL); + sigaction (SIGINT, signals, NULL); + sigaction (SIGHUP, signals, NULL); + sigaction (SIGCHLD, signals, NULL); + sigaction (SIGUSR1, signals, NULL); + sigaction (SIGUSR2, signals, NULL); +} + +void +pass_signal_worker (struct workq *workers, int signo) +{ + struct rspamd_worker *cur; + TAILQ_FOREACH (cur, workers, next) { + kill (cur->pid, signo); + } +} + +void convert_to_lowercase (char *str, unsigned int size) +{ + while (size--) { + *str = tolower (*str); + str ++; + } +} + +#ifndef HAVE_SETPROCTITLE + +static char *title_buffer = 0; +static size_t title_buffer_size = 0; +static char *title_progname, *title_progname_full; + +int +setproctitle (const char *fmt, ...) +{ + if (!title_buffer || !title_buffer_size) { + errno = ENOMEM; + return -1; + } + + memset (title_buffer, '\0', title_buffer_size); + + ssize_t written; + + if (fmt) { + ssize_t written2; + va_list ap; + + written = snprintf (title_buffer, title_buffer_size, "%s: ", title_progname); + if (written < 0 || (size_t) written >= title_buffer_size) + return -1; + + va_start (ap, fmt); + written2 = + vsnprintf (title_buffer + written, + title_buffer_size - written, fmt, ap); + va_end (ap); + if (written2 < 0 + || (size_t) written2 >= title_buffer_size - written) + return -1; + } else { + written = + snprintf (title_buffer, title_buffer_size, "%s", + title_progname); + if (written < 0 || (size_t) written >= title_buffer_size) + return -1; + } + + written = strlen (title_buffer); + memset (title_buffer + written, '\0', title_buffer_size - written); + + return 0; +} + +/* + It has to be _init function, because __attribute__((constructor)) + functions gets called without arguments. +*/ + +int +init_title (int argc, char *argv[], char *envp[]) +{ + char *begin_of_buffer = 0, *end_of_buffer = 0; + int i; + + for (i = 0; i < argc; ++i) { + if (!begin_of_buffer) + begin_of_buffer = argv[i]; + if (!end_of_buffer || end_of_buffer + 1 == argv[i]) + end_of_buffer = argv[i] + strlen (argv[i]); + } + + for (i = 0; envp[i]; ++i) { + if (!begin_of_buffer) + begin_of_buffer = envp[i]; + if (!end_of_buffer || end_of_buffer + 1 == envp[i]) + end_of_buffer = envp[i] + strlen(envp[i]); + } + + if (!end_of_buffer) + return 0; + + char **new_environ = g_malloc ((i + 1) * sizeof (envp[0])); + + if (!new_environ) + return 0; + + for (i = 0; envp[i]; ++i) { + if (!(new_environ[i] = g_strdup (envp[i]))) + goto cleanup_enomem; + } + new_environ[i] = 0; + + if (program_invocation_name) { + title_progname_full = g_strdup (program_invocation_name); + + if (!title_progname_full) + goto cleanup_enomem; + + char *p = strrchr (title_progname_full, '/'); + + if (p) + title_progname = p + 1; + else + title_progname = title_progname_full; + + program_invocation_name = title_progname_full; + program_invocation_short_name = title_progname; + } + + environ = new_environ; + title_buffer = begin_of_buffer; + title_buffer_size = end_of_buffer - begin_of_buffer; + + return 0; + + cleanup_enomem: + for (--i; i >= 0; --i) { + g_free (new_environ[i]); + } + g_free (new_environ); + return 0; +} +#endif + +#ifndef HAVE_PIDFILE +extern char * __progname; +static int _pidfile_remove (struct pidfh *pfh, int freeit); + +static int +pidfile_verify (struct pidfh *pfh) +{ + struct stat sb; + + if (pfh == NULL || pfh->pf_fd == -1) + return (-1); + /* + * Check remembered descriptor. + */ + if (fstat (pfh->pf_fd, &sb) == -1) + return (errno); + if (sb.st_dev != pfh->pf_dev || sb.st_ino != pfh->pf_ino) + return -1; + return 0; +} + +static int +pidfile_read (const char *path, pid_t *pidptr) +{ + char buf[16], *endptr; + int error, fd, i; + + fd = open (path, O_RDONLY); + if (fd == -1) + return (errno); + + i = read (fd, buf, sizeof(buf) - 1); + error = errno; /* Remember errno in case close() wants to change it. */ + close (fd); + if (i == -1) + return error; + else if (i == 0) + return EAGAIN; + buf[i] = '\0'; + + *pidptr = strtol (buf, &endptr, 10); + if (endptr != &buf[i]) + return EINVAL; + + return 0; +} + +struct pidfh * +pidfile_open (const char *path, mode_t mode, pid_t *pidptr) +{ + struct pidfh *pfh; + struct stat sb; + int error, fd, len, count; + struct timespec rqtp; + + pfh = g_malloc (sizeof(*pfh)); + if (pfh == NULL) + return NULL; + + if (path == NULL) + len = snprintf (pfh->pf_path, sizeof(pfh->pf_path), + "/var/run/%s.pid", __progname); + else + len = snprintf (pfh->pf_path, sizeof(pfh->pf_path), + "%s", path); + if (len >= (int)sizeof (pfh->pf_path)) { + g_free (pfh); + errno = ENAMETOOLONG; + return NULL; + } + + /* + * Open the PID file and obtain exclusive lock. + * We truncate PID file here only to remove old PID immediatelly, + * PID file will be truncated again in pidfile_write(), so + * pidfile_write() can be called multiple times. + */ + fd = open (pfh->pf_path, + O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK, mode); + flock (fd, LOCK_EX | LOCK_NB); + if (fd == -1) { + count = 0; + rqtp.tv_sec = 0; + rqtp.tv_nsec = 5000000; + if (errno == EWOULDBLOCK && pidptr != NULL) { + again: + errno = pidfile_read (pfh->pf_path, pidptr); + if (errno == 0) + errno = EEXIST; + else if (errno == EAGAIN) { + if (++count <= 3) { + nanosleep (&rqtp, 0); + goto again; + } + } + } + g_free (pfh); + return NULL; + } + /* + * Remember file information, so in pidfile_write() we are sure we write + * to the proper descriptor. + */ + if (fstat (fd, &sb) == -1) { + error = errno; + unlink (pfh->pf_path); + close (fd); + g_free (pfh); + errno = error; + return NULL; + } + + pfh->pf_fd = fd; + pfh->pf_dev = sb.st_dev; + pfh->pf_ino = sb.st_ino; + + return pfh; +} + +int +pidfile_write (struct pidfh *pfh) +{ + char pidstr[16]; + int error, fd; + + /* + * Check remembered descriptor, so we don't overwrite some other + * file if pidfile was closed and descriptor reused. + */ + errno = pidfile_verify (pfh); + if (errno != 0) { + /* + * Don't close descriptor, because we are not sure if it's ours. + */ + return -1; + } + fd = pfh->pf_fd; + + /* + * Truncate PID file, so multiple calls of pidfile_write() are allowed. + */ + if (ftruncate (fd, 0) == -1) { + error = errno; + _pidfile_remove (pfh, 0); + errno = error; + return -1; + } + + snprintf (pidstr, sizeof(pidstr), "%u", getpid ()); + if (pwrite (fd, pidstr, strlen (pidstr), 0) != (ssize_t)strlen (pidstr)) { + error = errno; + _pidfile_remove (pfh, 0); + errno = error; + return -1; + } + + return 0; +} + +int +pidfile_close (struct pidfh *pfh) +{ + int error; + + error = pidfile_verify (pfh); + if (error != 0) { + errno = error; + return -1; + } + + if (close (pfh->pf_fd) == -1) + error = errno; + g_free (pfh); + if (error != 0) { + errno = error; + return -1; + } + return 0; +} + +static int +_pidfile_remove (struct pidfh *pfh, int freeit) +{ + int error; + + error = pidfile_verify (pfh); + if (error != 0) { + errno = error; + return -1; + } + + if (unlink (pfh->pf_path) == -1) + error = errno; + if (flock (pfh->pf_fd, LOCK_UN) == -1) { + if (error == 0) + error = errno; + } + if (close (pfh->pf_fd) == -1) { + if (error == 0) + error = errno; + } + if (freeit) + g_free (pfh); + else + pfh->pf_fd = -1; + if (error != 0) { + errno = error; + return -1; + } + return 0; +} + +int +pidfile_remove (struct pidfh *pfh) +{ + + return (_pidfile_remove (pfh, 1)); +} +#endif + +/* + * Functions for parsing expressions + */ + +struct expression_stack { + char op; + struct expression_stack *next; +}; + +/* + * Push operand or operator to stack + */ +static struct expression_stack* +push_expression_stack (memory_pool_t *pool, struct expression_stack *head, char op) +{ + struct expression_stack *new; + new = memory_pool_alloc (pool, sizeof (struct expression_stack)); + new->op = op; + new->next = head; + return new; +} + +/* + * Delete symbol from stack, return pointer to operand or operator (casted to void* ) + */ +static char +delete_expression_stack (struct expression_stack **head) +{ + struct expression_stack *cur; + char res; + + if(*head == NULL) return 0; + + cur = *head; + res = cur->op; + + *head = cur->next; + return res; +} + +/* + * Return operation priority + */ +static int +logic_priority (char a) +{ + switch (a) { + case '!': + return 3; + case '|': + case '&': + return 2; + case '(': + return 1; + default: + return 0; + } +} + +/* + * Return 0 if symbol is not operation symbol (operand) + * Return 1 if symbol is operation symbol + */ +static int +is_operation_symbol (char a) +{ + switch (a) { + case '!': + case '&': + case '|': + case '(': + case ')': + return 1; + default: + return 0; + } +} + +static void +insert_expression (memory_pool_t *pool, struct expression **head, int type, char op, void *operand) +{ + struct expression *new, *cur; + + new = memory_pool_alloc (pool, sizeof (struct expression)); + new->type = type; + if (new->type == EXPR_OPERAND) { + new->content.operand = operand; + } + else { + new->content.operation = op; + } + new->next = NULL; + + if (!*head) { + *head = new; + } + else { + cur = *head; + while (cur->next) { + cur = cur->next; + } + cur->next = new; + } +} + +/* + * Make inverse polish record for specified expression + * Memory is allocated from given pool + */ +struct expression* +parse_expression (memory_pool_t *pool, char *line) +{ + struct expression *expr = NULL; + struct expression_stack *stack = NULL; + char *p, *c, *str, op, in_regexp = 0; + + if (line == NULL || pool == NULL) { + return NULL; + } + + p = line; + c = p; + while (*p) { + if (is_operation_symbol (*p) && !in_regexp) { + if (c != p) { + /* Copy operand */ + str = memory_pool_alloc (pool, p - c + 1); + strlcpy (str, c, (p - c + 1)); + insert_expression (pool, &expr, EXPR_OPERAND, 0, str); + } + if (*p == ')') { + if (stack == NULL) { + return NULL; + } + /* Pop all operators from stack to nearest '(' or to head */ + while (stack->op != '(') { + op = delete_expression_stack (&stack); + if (op != '(') { + insert_expression (pool, &expr, EXPR_OPERATION, op, NULL); + } + } + } + else if (*p == '(') { + /* Push it to stack */ + stack = push_expression_stack (pool, stack, *p); + } + else { + if (stack == NULL) { + stack = push_expression_stack (pool, stack, *p); + } + /* Check priority of logic operation */ + else { + if (logic_priority (stack->op) < logic_priority (*p)) { + stack = push_expression_stack (pool, stack, *p); + } + else { + /* Pop all operations that have higher priority than this one */ + while((stack != NULL) && (logic_priority (stack->op) >= logic_priority (*p))) { + op = delete_expression_stack (&stack); + if (op != '(') { + insert_expression (pool, &expr, EXPR_OPERATION, op, NULL); + } + } + stack = push_expression_stack (pool, stack, *p); + } + } + } + c = p + 1; + } + if (*p == '/' && (p == line || *(p - 1) != '\\')) { + in_regexp = !in_regexp; + } + p++; + } + /* Write last operand if it exists */ + if (c != p) { + /* Copy operand */ + str = memory_pool_alloc (pool, p - c + 1); + strlcpy (str, c, (p - c + 1)); + insert_expression (pool, &expr, EXPR_OPERAND, 0, str); + } + /* Pop everything from stack */ + while(stack != NULL) { + op = delete_expression_stack (&stack); + if (op != '(') { + insert_expression (pool, &expr, EXPR_OPERATION, op, NULL); + } + } + + return expr; +} + +/* Logging utility functions */ +int +open_log (struct config_file *cfg) +{ + switch (cfg->log_type) { + case RSPAMD_LOG_CONSOLE: + /* Do nothing with console */ + return 0; + case RSPAMD_LOG_SYSLOG: + openlog ("rspamd", LOG_NDELAY | LOG_PID, cfg->log_facility); + return 0; + case RSPAMD_LOG_FILE: + cfg->log_fd = open (cfg->log_file, O_CREAT | O_WRONLY | O_APPEND); + if (cfg->log_fd == -1) { + msg_err ("open_log: cannot open desired log file: %s, %m", cfg->log_file); + return -1; + } + return 0; + } +} + +int +reopen_log (struct config_file *cfg) +{ + do_reopen_log = 0; + switch (cfg->log_type) { + case RSPAMD_LOG_CONSOLE: + /* Do nothing with console */ + return 0; + case RSPAMD_LOG_SYSLOG: + closelog (); + break; + case RSPAMD_LOG_FILE: + close (cfg->log_fd); + break; + } + return open_log (cfg); +} + +void +syslog_log_function (const gchar *log_domain, GLogLevelFlags log_level, const gchar *message, gpointer arg) +{ + struct config_file *cfg = (struct config_file *)arg; + if (do_reopen_log) { + reopen_log (cfg); + } + + if (log_level <= cfg->log_level) { + if (log_level >= G_LOG_LEVEL_DEBUG) { + syslog (LOG_DEBUG, message); + } + else if (log_level >= G_LOG_LEVEL_INFO) { + syslog (LOG_INFO, message); + } + else if (log_level >= G_LOG_LEVEL_WARNING) { + syslog (LOG_WARNING, message); + } + else if (log_level >= G_LOG_LEVEL_CRITICAL) { + syslog (LOG_ERR, message); + } + } +} + +void +file_log_function (const gchar *log_domain, GLogLevelFlags log_level, const gchar *message, gpointer arg) +{ + struct config_file *cfg = (struct config_file *)arg; + char tmpbuf[128]; + int r; + struct iovec out[3]; + + if (cfg->log_fd == -1) { + return; + } + + if (do_reopen_log) { + reopen_log (cfg); + } + + if (log_level <= cfg->log_level) { + r = snprintf (tmpbuf, sizeof (tmpbuf), "#%d: %d rspamd ", (int)getpid (), (int)time (NULL)); + out[0].iov_base = tmpbuf; + out[0].iov_len = r; + out[1].iov_base = (char *)message; + out[1].iov_len = strlen (message); + out[2].iov_base = "\r\n"; + out[2].iov_len = 2; + + writev (cfg->log_fd, out, sizeof (out) / sizeof (out[0])); + } +} +/* + * vi:ts=4 + */ diff --git a/src/util.h b/src/util.h new file mode 100644 index 000000000..1791f4635 --- /dev/null +++ b/src/util.h @@ -0,0 +1,60 @@ +#ifndef UTIL_H +#define UTIL_H + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +#include "main.h" + +struct config_file; + +/* Create socket and bind it to specified address and port */ +int make_socket(const char *, u_short ); +/* Create and bind unix socket */ +int make_unix_socket (const char *, struct sockaddr_un *); +/* Parse command line arguments using getopt (3) */ +void read_cmd_line (int , char **, struct config_file *); +/* Write pid to file */ +int write_pid (struct rspamd_main *); +/* Make specified socket non-blocking */ +int event_make_socket_nonblocking(int); +/* Init signals */ +void init_signals (struct sigaction *, sig_t); +/* Send specified signal to each worker */ +void pass_signal_worker (struct workq *, int ); +/* Convert string to lowercase */ +void convert_to_lowercase (char *str, unsigned int size); + +#ifndef HAVE_SETPROCTITLE +int init_title(int argc, char *argv[], char *envp[]); +int setproctitle(const char *fmt, ...); +#endif + +#ifndef HAVE_PIDFILE +struct pidfh { + int pf_fd; + char pf_path[MAXPATHLEN + 1]; + __dev_t pf_dev; + ino_t pf_ino; +}; +struct pidfh *pidfile_open(const char *path, mode_t mode, pid_t *pidptr); +int pidfile_write(struct pidfh *pfh); +int pidfile_close(struct pidfh *pfh); +int pidfile_remove(struct pidfh *pfh); +#endif + +int open_log (struct config_file *cfg); +int reopen_log (struct config_file *cfg); +void syslog_log_function (const gchar *log_domain, GLogLevelFlags log_level, const gchar *message, gpointer arg); +void file_log_function (const gchar *log_domain, GLogLevelFlags log_level, const gchar *message, gpointer arg); + +#endif diff --git a/src/worker.c b/src/worker.c new file mode 100644 index 000000000..268e8123b --- /dev/null +++ b/src/worker.c @@ -0,0 +1,364 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include /* from the Perl distribution */ +#include /* from the Perl distribution */ + +#include +#include + +#include "util.h" +#include "main.h" +#include "protocol.h" +#include "upstream.h" +#include "cfg_file.h" +#include "url.h" +#include "modules.h" + +#define TASK_POOL_SIZE 4095 + +const f_str_t CRLF = { + /* begin */"\r\n", + /* len */2, + /* size */2 +}; + +extern PerlInterpreter *perl_interpreter; + +static +void sig_handler (int signo) +{ + switch (signo) { + case SIGINT: + case SIGTERM: + _exit (1); + break; + } +} + +static void +sigusr_handler (int fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + /* Do not accept new connections, preparing to end worker's process */ + struct timeval tv; + tv.tv_sec = SOFT_SHUTDOWN_TIME; + tv.tv_usec = 0; + event_del (&worker->sig_ev); + event_del (&worker->bind_ev); + do_reopen_log = 1; + msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); + event_loopexit (&tv); + return; +} + +static void +rcpt_destruct (void *pointer) +{ + struct worker_task *task = (struct worker_task *)pointer; + + if (task->rcpt) { + g_list_free (task->rcpt); + } +} + +static void +free_task (struct worker_task *task) +{ + struct mime_part *part; + + if (task) { + if (task->memc_ctx) { + memc_close_ctx (task->memc_ctx); + } + while (!TAILQ_EMPTY (&task->parts)) { + part = TAILQ_FIRST (&task->parts); + g_object_unref (part->type); + g_object_unref (part->content); + TAILQ_REMOVE (&task->parts, part, next); + } + memory_pool_delete (task->task_pool); + bufferevent_disable (task->bev, EV_READ | EV_WRITE); + bufferevent_free (task->bev); + close (task->sock); + g_free (task); + } +} + +static void +mime_foreach_callback (GMimeObject *part, gpointer user_data) +{ + struct worker_task *task = (struct worker_task *)user_data; + struct mime_part *mime_part; + GMimeContentType *type; + GMimeDataWrapper *wrapper; + GMimeStream *part_stream; + GByteArray *part_content; + + task->parts_count ++; + + /* 'part' points to the current part node that g_mime_message_foreach_part() is iterating over */ + + /* find out what class 'part' is... */ + if (GMIME_IS_MESSAGE_PART (part)) { + /* message/rfc822 or message/news */ + GMimeMessage *message; + + /* g_mime_message_foreach_part() won't descend into + child message parts, so if we want to count any + subparts of this child message, we'll have to call + g_mime_message_foreach_part() again here. */ + + message = g_mime_message_part_get_message ((GMimeMessagePart *) part); + g_mime_message_foreach_part (message, mime_foreach_callback, task); + g_object_unref (message); + } else if (GMIME_IS_MESSAGE_PARTIAL (part)) { + /* message/partial */ + + /* this is an incomplete message part, probably a + large message that the sender has broken into + smaller parts and is sending us bit by bit. we + could save some info about it so that we could + piece this back together again once we get all the + parts? */ + } else if (GMIME_IS_MULTIPART (part)) { + /* multipart/mixed, multipart/alternative, multipart/related, multipart/signed, multipart/encrypted, etc... */ + + /* we'll get to finding out if this is a signed/encrypted multipart later... */ + } else if (GMIME_IS_PART (part)) { + /* a normal leaf part, could be text/plain or image/jpeg etc */ + wrapper = g_mime_part_get_content_object (GMIME_PART (part)); + if (wrapper != NULL) { + part_stream = g_mime_stream_mem_new (); + if (g_mime_data_wrapper_write_to_stream (wrapper, part_stream) != -1) { + part_content = g_mime_stream_mem_get_byte_array (GMIME_STREAM_MEM (part_stream)); + type = (GMimeContentType *)g_mime_part_get_content_type (GMIME_PART (part)); + mime_part = memory_pool_alloc (task->task_pool, sizeof (struct mime_part)); + mime_part->type = type; + mime_part->content = part_content; + TAILQ_INSERT_TAIL (&task->parts, mime_part, next); + if (g_mime_content_type_is_type (type, "text", "html")) { + url_parse_html (task, part_content); + } + else if (g_mime_content_type_is_type (type, "text", "plain")) { + url_parse_text (task, part_content); + } + } + } + } else { + g_assert_not_reached (); + } +} + +static int +process_message (struct worker_task *task) +{ + GMimeMessage *message; + GMimeParser *parser; + GMimeStream *stream; + + stream = g_mime_stream_mem_new_with_buffer (task->msg->buf->begin, task->msg->buf->len); + /* create a new parser object to parse the stream */ + parser = g_mime_parser_new_with_stream (stream); + + /* unref the stream (parser owns a ref, so this object does not actually get free'd until we destroy the parser) */ + g_object_unref (stream); + + /* parse the message from the stream */ + message = g_mime_parser_construct_message (parser); + + task->message = message; + memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_object_unref, task->message); + + /* free the parser (and the stream) */ + g_object_unref (parser); + + g_mime_message_foreach_part (message, mime_foreach_callback, task); + + msg_info ("process_message: found %d parts in message", task->parts_count); + + task->worker->srv->stat->messages_scanned ++; + + return process_filters (task); +} + +static void +read_socket (struct bufferevent *bev, void *arg) +{ + struct worker_task *task = (struct worker_task *)arg; + ssize_t r; + char *s; + + switch (task->state) { + case READ_COMMAND: + case READ_HEADER: + s = evbuffer_readline (EVBUFFER_INPUT (bev)); + if (read_rspamd_input_line (task, s) != 0) { + task->last_error = "Read error"; + task->error_code = RSPAMD_NETWORK_ERROR; + task->state = WRITE_ERROR; + } + if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) { + bufferevent_enable (bev, EV_WRITE); + bufferevent_disable (bev, EV_READ); + } + free (s); + break; + case READ_MESSAGE: + r = bufferevent_read (bev, task->msg->pos, task->msg->free); + if (r > 0) { + task->msg->pos += r; + update_buf_size (task->msg); + if (task->msg->free == 0) { + r = process_message (task); + if (r == -1) { + task->last_error = "Filter processing error"; + task->error_code = RSPAMD_FILTER_ERROR; + task->state = WRITE_ERROR; + } + else if (r == 1) { + task->state = WAIT_FILTER; + } + } + if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) { + bufferevent_enable (bev, EV_WRITE); + bufferevent_disable (bev, EV_READ); + } + } + else { + msg_err ("read_socket: cannot read data to buffer: %ld", (long int)r); + bufferevent_disable (bev, EV_READ); + bufferevent_free (bev); + free_task (task); + } + break; + case WAIT_FILTER: + bufferevent_disable (bev, EV_READ); + break; + } +} + +static void +write_socket (struct bufferevent *bev, void *arg) +{ + struct worker_task *task = (struct worker_task *)arg; + + switch (task->state) { + case WRITE_REPLY: + write_reply (task); + task->state = CLOSING_CONNECTION; + bufferevent_disable (bev, EV_READ); + break; + case WRITE_ERROR: + write_reply (task); + task->state = CLOSING_CONNECTION; + bufferevent_disable (bev, EV_READ); + break; + case CLOSING_CONNECTION: + msg_debug ("write_socket: normally closing connection"); + free_task (task); + break; + default: + msg_info ("write_socket: abnormally closing connection"); + free_task (task); + break; + } +} + +static void +err_socket (struct bufferevent *bev, short what, void *arg) +{ + struct worker_task *task = (struct worker_task *)arg; + msg_info ("err_socket: abnormally closing connection"); + /* Free buffers */ + free_task (task); +} + +static void +accept_socket (int fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + struct sockaddr_storage ss; + struct worker_task *new_task; + socklen_t addrlen = sizeof(ss); + int nfd; + + if ((nfd = accept (fd, (struct sockaddr *)&ss, &addrlen)) == -1) { + return; + } + if (event_make_socket_nonblocking(fd) < 0) { + return; + } + + new_task = g_malloc (sizeof (struct worker_task)); + if (new_task == NULL) { + msg_err ("accept_socket: cannot allocate memory for task, %m"); + return; + } + bzero (new_task, sizeof (struct worker_task)); + new_task->worker = worker; + new_task->state = READ_COMMAND; + new_task->sock = nfd; + new_task->cfg = worker->srv->cfg; + TAILQ_INIT (&new_task->urls); + TAILQ_INIT (&new_task->parts); + new_task->task_pool = memory_pool_new (memory_pool_get_size ()); + /* Add destructor for recipients list (it would be better to use anonymous function here */ + memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)rcpt_destruct, new_task); + worker->srv->stat->connections_count ++; + + /* Read event */ + new_task->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_task); + bufferevent_enable (new_task->bev, EV_READ); +} + +void +start_worker (struct rspamd_worker *worker, int listen_sock) +{ + struct sigaction signals; + int i; + + + worker->srv->pid = getpid (); + worker->srv->type = TYPE_WORKER; + event_init (); + g_mime_init (0); + + init_signals (&signals, sig_handler); + sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); + + /* SIGUSR2 handler */ + signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker); + signal_add (&worker->sig_ev, NULL); + + /* Accept event */ + event_set(&worker->bind_ev, listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); + event_add(&worker->bind_ev, NULL); + + /* Perform modules configuring */ + for (i = 0; i < MODULES_NUM; i ++) { + modules[i].module_config_func (worker->srv->cfg); + } + + /* Send SIGUSR2 to parent */ + kill (getppid (), SIGUSR2); + + event_loop (0); +} + +/* + * vi:ts=4 + */ -- cgit v1.2.3