aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2008-11-01 18:01:05 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2008-11-01 18:01:05 +0300
commit2aa9c74f1c449da92f6faf870f8cc801a83bb08b (patch)
tree33f0f941f08583fd0c4c3653cadde8d6ce8426c2 /src
parentcc5343692b448c27485a24ea7f1b24d714bb82f6 (diff)
downloadrspamd-2aa9c74f1c449da92f6faf870f8cc801a83bb08b.tar.gz
rspamd-2aa9c74f1c449da92f6faf870f8cc801a83bb08b.zip
* Reorganize structure of source files
* Adopt build system for new structure --HG-- rename : cfg_file.h => src/cfg_file.h rename : cfg_file.l => src/cfg_file.l rename : cfg_file.y => src/cfg_file.y rename : cfg_utils.c => src/cfg_utils.c rename : controller.c => src/controller.c rename : filter.c => src/filter.c rename : filter.h => src/filter.h rename : fstring.c => src/fstring.c rename : fstring.h => src/fstring.h rename : main.c => src/main.c rename : main.h => src/main.h rename : mem_pool.c => src/mem_pool.c rename : mem_pool.h => src/mem_pool.h rename : memcached-test.c => src/memcached-test.c rename : memcached.c => src/memcached.c rename : memcached.h => src/memcached.h rename : perl.c => src/perl.c rename : perl.h => src/perl.h rename : plugins/regexp.c => src/plugins/regexp.c rename : plugins/surbl.c => src/plugins/surbl.c rename : protocol.c => src/protocol.c rename : protocol.h => src/protocol.h rename : upstream.c => src/upstream.c rename : upstream.h => src/upstream.h rename : url.c => src/url.c rename : url.h => src/url.h rename : util.c => src/util.c rename : util.h => src/util.h rename : worker.c => src/worker.c
Diffstat (limited to 'src')
-rw-r--r--src/cfg_file.h167
-rw-r--r--src/cfg_file.l134
-rw-r--r--src/cfg_file.y545
-rw-r--r--src/cfg_utils.c563
-rw-r--r--src/controller.c349
-rw-r--r--src/filter.c336
-rw-r--r--src/filter.h43
-rw-r--r--src/fstring.c234
-rw-r--r--src/fstring.h86
-rw-r--r--src/main.c427
-rw-r--r--src/main.h201
-rw-r--r--src/mem_pool.c360
-rw-r--r--src/mem_pool.h61
-rw-r--r--src/memcached-test.c79
-rw-r--r--src/memcached.c792
-rw-r--r--src/memcached.h142
-rw-r--r--src/perl.c190
-rw-r--r--src/perl.h19
-rw-r--r--src/plugins/regexp.c247
-rw-r--r--src/plugins/surbl.c593
-rw-r--r--src/protocol.c492
-rw-r--r--src/protocol.h31
-rw-r--r--src/upstream.c521
-rw-r--r--src/upstream.h43
-rw-r--r--src/url.c886
-rw-r--r--src/url.h88
-rw-r--r--src/util.c834
-rw-r--r--src/util.h60
-rw-r--r--src/worker.c364
29 files changed, 8887 insertions, 0 deletions
diff --git a/src/cfg_file.h b/src/cfg_file.h
new file mode 100644
index 000000000..1eb71476d
--- /dev/null
+++ b/src/cfg_file.h
@@ -0,0 +1,167 @@
+/*
+ * $Id$
+ */
+
+
+#ifndef CFG_FILE_H
+#define CFG_FILE_H
+
+#include "config.h"
+#include <sys/types.h>
+#ifndef HAVE_OWN_QUEUE_H
+#include <sys/queue.h>
+#else
+#include "queue.h"
+#endif
+#include <netinet/in.h>
+#include <sys/un.h>
+#include <event.h>
+#include <glib.h>
+#include "mem_pool.h"
+#include "upstream.h"
+#include "memcached.h"
+#include "filter.h"
+
+#define DEFAULT_BIND_PORT 768
+#define DEFAULT_CONTROL_PORT 7608
+#define MAX_MEMCACHED_SERVERS 48
+#define DEFAULT_MEMCACHED_PORT 11211
+/* Memcached timeouts */
+#define DEFAULT_MEMCACHED_CONNECT_TIMEOUT 1000
+/* Upstream timeouts */
+#define DEFAULT_UPSTREAM_ERROR_TIME 10
+#define DEFAULT_UPSTREAM_ERROR_TIME 10
+#define DEFAULT_UPSTREAM_DEAD_TIME 300
+#define DEFAULT_UPSTREAM_MAXERRORS 10
+
+/* 1 worker by default */
+#define DEFAULT_WORKERS_NUM 1
+
+#define yyerror(fmt, ...) \
+ fprintf (stderr, "Config file parse error!\non line: %d\n", yylineno); \
+ fprintf (stderr, "while reading text: %s\nreason: ", yytext); \
+ fprintf (stderr, fmt, ##__VA_ARGS__); \
+ fprintf (stderr, "\n")
+#define yywarn(fmt, ...) \
+ fprintf (stderr, "Config file parse warning!\non line %d\n", yylineno); \
+ fprintf (stderr, "while reading text: %s\nreason: ", yytext); \
+ fprintf (stderr, fmt, ##__VA_ARGS__); \
+ fprintf (stderr, "\n")
+
+struct expression;
+
+enum { VAL_UNDEF=0, VAL_TRUE, VAL_FALSE };
+
+enum rspamd_regexp_type {
+ REGEXP_NONE = 0,
+ REGEXP_HEADER,
+ REGEXP_MIME,
+ REGEXP_MESSAGE,
+ REGEXP_URL,
+};
+
+enum rspamd_log_type {
+ RSPAMD_LOG_CONSOLE,
+ RSPAMD_LOG_SYSLOG,
+ RSPAMD_LOG_FILE,
+};
+
+struct rspamd_regexp {
+ enum rspamd_regexp_type type;
+ char *regexp_text;
+ GRegex *regexp;
+ char *header;
+};
+
+struct memcached_server {
+ struct upstream up;
+ struct in_addr addr;
+ uint16_t port;
+ short alive;
+ short int num;
+};
+
+struct perl_module {
+ char *path;
+ LIST_ENTRY (perl_module) next;
+};
+
+struct module_opt {
+ char *param;
+ char *value;
+ LIST_ENTRY (module_opt) next;
+};
+
+struct config_file {
+ memory_pool_t *cfg_pool;
+ char *cfg_name;
+ char *pid_file;
+ char *temp_dir;
+
+ char *bind_host;
+ struct in_addr bind_addr;
+ uint16_t bind_port;
+ uint16_t bind_family;
+
+ char *control_host;
+ struct in_addr control_addr;
+ uint16_t control_port;
+ uint16_t control_family;
+ int controller_enabled;
+ char *control_password;
+
+ int no_fork;
+ unsigned int workers_number;
+
+ enum rspamd_log_type log_type;
+ int log_facility;
+ int log_level;
+ char *log_file;
+ int log_fd;
+
+ struct memcached_server memcached_servers[MAX_MEMCACHED_SERVERS];
+ size_t memcached_servers_num;
+ memc_proto_t memcached_protocol;
+ unsigned int memcached_error_time;
+ unsigned int memcached_dead_time;
+ unsigned int memcached_maxerrors;
+ unsigned int memcached_connect_timeout;
+
+ LIST_HEAD (modulesq, perl_module) perl_modules;
+ LIST_HEAD (headersq, filter) header_filters;
+ LIST_HEAD (mimesq, filter) mime_filters;
+ LIST_HEAD (messagesq, filter) message_filters;
+ LIST_HEAD (urlsq, filter) url_filters;
+ char *header_filters_str;
+ char *mime_filters_str;
+ char *message_filters_str;
+ char *url_filters_str;
+ GHashTable* modules_opts;
+ GHashTable* variables;
+ GHashTable* metrics;
+ GHashTable* factors;
+ GHashTable* c_modules;
+ GHashTable* composite_symbols;
+};
+
+int add_memcached_server (struct config_file *cf, char *str);
+int parse_bind_line (struct config_file *cf, char *str, char is_control);
+void init_defaults (struct config_file *cfg);
+void free_config (struct config_file *cfg);
+char* get_module_opt (struct config_file *cfg, char *module_name, char *opt_name);
+size_t parse_limit (const char *limit);
+unsigned int parse_seconds (const char *t);
+char parse_flag (const char *str);
+char* substitute_variable (struct config_file *cfg, char *str, u_char recursive);
+void post_load_config (struct config_file *cfg);
+struct rspamd_regexp* parse_regexp (memory_pool_t *pool, char *line);
+struct expression* parse_expression (memory_pool_t *pool, char *line);
+
+int yylex (void);
+int yyparse (void);
+void yyrestart (FILE *);
+
+#endif /* ifdef CFG_FILE_H */
+/*
+ * vi:ts=4
+ */
diff --git a/src/cfg_file.l b/src/cfg_file.l
new file mode 100644
index 000000000..fefd11237
--- /dev/null
+++ b/src/cfg_file.l
@@ -0,0 +1,134 @@
+%x incl
+%x module
+
+%{
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <strings.h>
+#include <syslog.h>
+#include "cfg_file.h"
+#include "cfg_yacc.h"
+
+#define MAX_INCLUDE_DEPTH 10
+YY_BUFFER_STATE include_stack[MAX_INCLUDE_DEPTH];
+int include_stack_ptr = 0;
+extern struct config_file *cfg;
+
+%}
+
+%option noyywrap
+%option yylineno
+
+%%
+^[ \t]*#.* /* ignore comments */;
+.include BEGIN(incl);
+.module BEGIN(module);
+composites return COMPOSITES;
+tempdir return TEMPDIR;
+pidfile return PIDFILE;
+workers return WORKERS;
+error_time return ERROR_TIME;
+dead_time return DEAD_TIME;
+maxerrors return MAXERRORS;
+reconnect_timeout return RECONNECT_TIMEOUT;
+connect_timeout return CONNECT_TIMEOUT;
+protocol return PROTOCOL;
+memcached return MEMCACHED;
+bind_socket return BINDSOCK;
+servers return SERVERS;
+require return REQUIRE;
+header_filters return HEADER_FILTERS;
+mime_filters return MIME_FILTERS;
+message_filters return MESSAGE_FILTERS;
+url_filters return URL_FILTERS;
+factors return FACTORS;
+metric return METRIC;
+name return NAME;
+required_score return REQUIRED_SCORE;
+function return FUNCTION;
+control return CONTROL;
+password return PASSWORD;
+logging return LOGGING;
+
+log_type return LOG_TYPE;
+console return LOG_TYPE_CONSOLE;
+syslog return LOG_TYPE_SYSLOG;
+file return LOG_TYPE_FILE;
+
+log_level return LOG_LEVEL;
+DEBUG return LOG_LEVEL_DEBUG;
+INFO return LOG_LEVEL_INFO;
+WARNING return LOG_LEVEL_WARNING;
+ERROR return LOG_LEVEL_ERROR;
+log_facility return LOG_FACILITY;
+log_file return LOG_FILENAME;
+
+\{ return OBRACE;
+\} return EBRACE;
+; return SEMICOLON;
+, return COMMA;
+= return EQSIGN;
+yes|YES|no|NO|[yY]|[nN] yylval.flag=parse_flag(yytext); return FLAG;
+\n /* ignore EOL */;
+[ \t]+ /* ignore whitespace */;
+\"[^"]+\" yylval.string=strdup(yytext + 1); yylval.string[strlen(yylval.string) - 1] = '\0'; return QUOTEDSTRING;
+\" return QUOTE;
+\$[a-zA-Z_][a-zA-Z0-9_]+ yylval.string=strdup(yytext + 1); return VARIABLE;
+[0-9]+ yylval.number=strtol(yytext, NULL, 10); return NUMBER;
+-?[0-9]+\.?[0-9]* yylval.fract=strtod(yytext, NULL); return FRACT;
+[0-9]+[kKmMgG]? yylval.limit=parse_limit(yytext); return SIZELIMIT;
+[0-9]+[sS]|[0-9]+[mM][sS] yylval.seconds=parse_seconds(yytext); return SECONDS;
+[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3} yylval.string=strdup(yytext); return IPADDR;
+[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\/[0-9]{1,2} yylval.string=strdup(yytext); return IPNETWORK;
+[a-zA-Z0-9.-]+:[0-9]{1,5} yylval.string=strdup(yytext); return HOSTPORT;
+[a-zA-Z<][a-zA-Z@+>_-]* yylval.string=strdup(yytext); return STRING;
+\/[^/\n]+\/ yylval.string=strdup(yytext); return REGEXP;
+[a-zA-Z0-9].[a-zA-Z0-9\/.-]+ yylval.string=strdup(yytext); return DOMAIN;
+<incl>[ \t]* /* eat the whitespace */
+<incl>[^ \t\n]+ { /* got the include file name */
+ if (include_stack_ptr >= MAX_INCLUDE_DEPTH) {
+ yyerror ("yylex: includes nested too deeply");
+ return -1;
+ }
+
+ include_stack[include_stack_ptr++] =
+ YY_CURRENT_BUFFER;
+
+ yyin = fopen (yytext, "r");
+
+ if (! yyin) {
+ yyerror ("yylex: cannot open include file");
+ return -1;
+ }
+
+ yy_switch_to_buffer (yy_create_buffer (yyin, YY_BUF_SIZE));
+
+ BEGIN(INITIAL);
+ }
+
+<<EOF>> {
+ if ( --include_stack_ptr < 0 ) {
+ post_load_config (cfg);
+ yyterminate ();
+ }
+ else {
+ yy_delete_buffer (YY_CURRENT_BUFFER);
+ yy_switch_to_buffer (include_stack[include_stack_ptr]);
+ }
+ }
+
+<module>\n /* ignore EOL */;
+<module>[ \t]+ /* ignore whitespace */;
+<module>\'[a-zA-Z0-9_-]\' yylval.string=strdup(yytext + 1); yylval.string[strlen(yylval.string) - 1] = '\0'; return MODULE_OPT;
+<module>\{ return OBRACE;
+<module>\} return EBRACE;
+<module>\; return SEMICOLON;
+<module>[a-zA-Z0-9_-] yylval.string=strdup(yytext); return PARAM;
+<module>\$[a-zA-Z_][a-zA-Z0-9_]+ yylval.string=strdup(yytext + 1); return VARIABLE;
+<module>\"[^"]+\" yylval.string=strdup(yytext + 1); yylval.string[strlen(yylval.string) - 1] = '\0'; return QUOTEDSTRING;
+
+%%
+/*
+ * vi:ts=4
+ */
diff --git a/src/cfg_file.y b/src/cfg_file.y
new file mode 100644
index 000000000..dbbdd4e63
--- /dev/null
+++ b/src/cfg_file.y
@@ -0,0 +1,545 @@
+/* $Id$ */
+
+%{
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <ctype.h>
+#include <errno.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <syslog.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <glib.h>
+
+#include "cfg_file.h"
+#include "main.h"
+
+#define YYDEBUG 1
+
+extern struct config_file *cfg;
+extern int yylineno;
+extern char *yytext;
+
+LIST_HEAD (moduleoptq, module_opt) *cur_module_opt = NULL;
+struct metric *cur_metric = NULL;
+
+%}
+
+%union
+{
+ char *string;
+ size_t limit;
+ char flag;
+ unsigned int seconds;
+ unsigned int number;
+ double fract;
+}
+
+%token ERROR STRING QUOTEDSTRING FLAG
+%token FILENAME REGEXP QUOTE SEMICOLON OBRACE EBRACE COMMA EQSIGN
+%token BINDSOCK SOCKCRED DOMAIN IPADDR IPNETWORK HOSTPORT NUMBER CHECK_TIMEOUT
+%token MAXSIZE SIZELIMIT SECONDS BEANSTALK MYSQL USER PASSWORD DATABASE
+%token TEMPDIR PIDFILE SERVERS ERROR_TIME DEAD_TIME MAXERRORS CONNECT_TIMEOUT PROTOCOL RECONNECT_TIMEOUT
+%token READ_SERVERS WRITE_SERVER DIRECTORY_SERVERS MAILBOX_QUERY USERS_QUERY LASTLOGIN_QUERY
+%token MEMCACHED WORKERS REQUIRE MODULE
+%token MODULE_OPT PARAM VARIABLE
+%token HEADER_FILTERS MIME_FILTERS MESSAGE_FILTERS URL_FILTERS FACTORS METRIC NAME
+%token REQUIRED_SCORE FUNCTION FRACT COMPOSITES CONTROL PASSWORD
+%token LOGGING LOG_TYPE LOG_TYPE_CONSOLE LOG_TYPE_SYSLOG LOG_TYPE_FILE
+%token LOG_LEVEL LOG_LEVEL_DEBUG LOG_LEVEL_INFO LOG_LEVEL_WARNING LOG_LEVEL_ERROR LOG_FACILITY LOG_FILENAME
+
+%type <string> STRING
+%type <string> VARIABLE
+%type <string> QUOTEDSTRING MODULE_OPT PARAM
+%type <string> FILENAME
+%type <string> SOCKCRED
+%type <string> IPADDR IPNETWORK
+%type <string> HOSTPORT
+%type <string> DOMAIN
+%type <limit> SIZELIMIT
+%type <flag> FLAG
+%type <seconds> SECONDS
+%type <number> NUMBER
+%type <string> memcached_hosts bind_cred
+%type <fract> FRACT
+%%
+
+file : /* empty */
+ | file command SEMICOLON { }
+ ;
+
+command :
+ bindsock
+ | control
+ | tempdir
+ | pidfile
+ | memcached
+ | workers
+ | require
+ | header_filters
+ | mime_filters
+ | message_filters
+ | url_filters
+ | module_opt
+ | variable
+ | factors
+ | metric
+ | composites
+ | logging
+ ;
+
+tempdir :
+ TEMPDIR EQSIGN QUOTEDSTRING {
+ struct stat st;
+
+ if (stat ($3, &st) == -1) {
+ yyerror ("yyparse: cannot stat directory \"%s\": %s", $3, strerror (errno));
+ YYERROR;
+ }
+ if (!S_ISDIR (st.st_mode)) {
+ yyerror ("yyparse: \"%s\" is not a directory", $3);
+ YYERROR;
+ }
+ cfg->temp_dir = memory_pool_strdup (cfg->cfg_pool, $3);
+ free ($3);
+ }
+ ;
+
+pidfile :
+ PIDFILE EQSIGN QUOTEDSTRING {
+ cfg->pid_file = $3;
+ }
+ ;
+
+control:
+ CONTROL OBRACE controlbody EBRACE
+ ;
+
+controlbody:
+ controlcmd SEMICOLON
+ | controlbody controlcmd SEMICOLON
+ ;
+
+controlcmd:
+ controlsock
+ | controlpassword
+ ;
+
+controlsock:
+ BINDSOCK EQSIGN bind_cred {
+ if (!parse_bind_line (cfg, $3, 1)) {
+ yyerror ("yyparse: parse_bind_line");
+ YYERROR;
+ }
+ cfg->controller_enabled = 1;
+ free ($3);
+ }
+ ;
+controlpassword:
+ PASSWORD EQSIGN QUOTEDSTRING {
+ cfg->control_password = memory_pool_strdup (cfg->cfg_pool, $3);
+ }
+ ;
+
+bindsock:
+ BINDSOCK EQSIGN bind_cred {
+ if (!parse_bind_line (cfg, $3, 0)) {
+ yyerror ("yyparse: parse_bind_line");
+ YYERROR;
+ }
+ free ($3);
+ }
+ ;
+
+bind_cred:
+ STRING {
+ $$ = $1;
+ }
+ | IPADDR{
+ $$ = $1;
+ }
+ | DOMAIN {
+ $$ = $1;
+ }
+ | HOSTPORT {
+ $$ = $1;
+ }
+ | QUOTEDSTRING {
+ $$ = $1;
+ }
+ ;
+
+header_filters:
+ HEADER_FILTERS EQSIGN QUOTEDSTRING {
+ cfg->header_filters_str = memory_pool_strdup (cfg->cfg_pool, $3);
+ free ($3);
+ }
+ ;
+
+mime_filters:
+ MIME_FILTERS EQSIGN QUOTEDSTRING {
+ cfg->mime_filters_str = memory_pool_strdup (cfg->cfg_pool, $3);
+ free ($3);
+ }
+ ;
+
+message_filters:
+ MESSAGE_FILTERS EQSIGN QUOTEDSTRING {
+ cfg->message_filters_str = memory_pool_strdup (cfg->cfg_pool, $3);
+ free ($3);
+ }
+ ;
+
+url_filters:
+ URL_FILTERS EQSIGN QUOTEDSTRING {
+ cfg->url_filters_str = memory_pool_strdup (cfg->cfg_pool, $3);
+ free ($3);
+ }
+ ;
+
+memcached:
+ MEMCACHED OBRACE memcachedbody EBRACE
+ ;
+
+memcachedbody:
+ memcachedcmd SEMICOLON
+ | memcachedbody memcachedcmd SEMICOLON
+ ;
+
+memcachedcmd:
+ memcached_servers
+ | memcached_connect_timeout
+ | memcached_error_time
+ | memcached_dead_time
+ | memcached_maxerrors
+ | memcached_protocol
+ ;
+
+memcached_servers:
+ SERVERS EQSIGN memcached_server
+ ;
+
+memcached_server:
+ memcached_params
+ | memcached_server COMMA memcached_params
+ ;
+
+memcached_params:
+ memcached_hosts {
+ if (!add_memcached_server (cfg, $1)) {
+ yyerror ("yyparse: add_memcached_server");
+ YYERROR;
+ }
+ free ($1);
+ }
+ ;
+memcached_hosts:
+ STRING
+ | IPADDR
+ | DOMAIN
+ | HOSTPORT
+ ;
+memcached_error_time:
+ ERROR_TIME EQSIGN NUMBER {
+ cfg->memcached_error_time = $3;
+ }
+ ;
+memcached_dead_time:
+ DEAD_TIME EQSIGN NUMBER {
+ cfg->memcached_dead_time = $3;
+ }
+ ;
+memcached_maxerrors:
+ MAXERRORS EQSIGN NUMBER {
+ cfg->memcached_maxerrors = $3;
+ }
+ ;
+memcached_connect_timeout:
+ CONNECT_TIMEOUT EQSIGN SECONDS {
+ cfg->memcached_connect_timeout = $3;
+ }
+ ;
+
+memcached_protocol:
+ PROTOCOL EQSIGN STRING {
+ if (strncasecmp ($3, "udp", sizeof ("udp") - 1) == 0) {
+ cfg->memcached_protocol = UDP_TEXT;
+ }
+ else if (strncasecmp ($3, "tcp", sizeof ("tcp") - 1) == 0) {
+ cfg->memcached_protocol = TCP_TEXT;
+ }
+ else {
+ yyerror ("yyparse: cannot recognize protocol: %s", $3);
+ YYERROR;
+ }
+ }
+ ;
+workers:
+ WORKERS EQSIGN NUMBER {
+ cfg->workers_number = $3;
+ }
+ ;
+
+metric:
+ METRIC OBRACE metricbody EBRACE {
+ if (cur_metric == NULL || cur_metric->name == NULL) {
+ yyerror ("yyparse: not enough arguments in metric definition");
+ YYERROR;
+ }
+ g_hash_table_insert (cfg->metrics, cur_metric->name, cur_metric);
+ cur_metric = NULL;
+ }
+ ;
+
+metricbody:
+ | metriccmd SEMICOLON
+ | metricbody metriccmd SEMICOLON
+ ;
+metriccmd:
+ | metricname
+ | metricfunction
+ | metricscore
+ ;
+
+metricname:
+ NAME EQSIGN QUOTEDSTRING {
+ if (cur_metric == NULL) {
+ cur_metric = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct metric));
+ }
+ cur_metric->name = memory_pool_strdup (cfg->cfg_pool, $3);
+ }
+ ;
+
+metricfunction:
+ FUNCTION EQSIGN QUOTEDSTRING {
+ if (cur_metric == NULL) {
+ cur_metric = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct metric));
+ }
+ cur_metric->func_name = memory_pool_strdup (cfg->cfg_pool, $3);
+ }
+ ;
+
+metricscore:
+ REQUIRED_SCORE EQSIGN NUMBER {
+ if (cur_metric == NULL) {
+ cur_metric = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct metric));
+ }
+ cur_metric->required_score = $3;
+ }
+ | REQUIRED_SCORE EQSIGN FRACT {
+ if (cur_metric == NULL) {
+ cur_metric = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct metric));
+ }
+ cur_metric->required_score = $3;
+ }
+ ;
+
+factors:
+ FACTORS OBRACE factorsbody EBRACE
+ ;
+
+factorsbody:
+ factorparam SEMICOLON
+ | factorsbody factorparam SEMICOLON
+ ;
+
+factorparam:
+ QUOTEDSTRING EQSIGN FRACT {
+ double *tmp = memory_pool_alloc (cfg->cfg_pool, sizeof (double));
+ *tmp = $3;
+ g_hash_table_insert (cfg->factors, $1, tmp);
+ }
+ | QUOTEDSTRING EQSIGN NUMBER {
+ double *tmp = memory_pool_alloc (cfg->cfg_pool, sizeof (double));
+ *tmp = $3;
+ g_hash_table_insert (cfg->factors, $1, tmp);
+ };
+
+require:
+ REQUIRE OBRACE requirebody EBRACE
+ ;
+
+requirebody:
+ requirecmd SEMICOLON
+ | requirebody requirecmd SEMICOLON
+ ;
+
+requirecmd:
+ MODULE EQSIGN QUOTEDSTRING {
+ struct stat st;
+ struct perl_module *cur;
+ if (stat ($3, &st) == -1) {
+ yyerror ("yyparse: cannot stat file %s, %m", $3);
+ YYERROR;
+ }
+ cur = memory_pool_alloc (cfg->cfg_pool, sizeof (struct perl_module));
+ if (cur == NULL) {
+ yyerror ("yyparse: g_malloc: %s", strerror(errno));
+ YYERROR;
+ }
+ cur->path = $3;
+ LIST_INSERT_HEAD (&cfg->perl_modules, cur, next);
+ }
+ ;
+
+composites:
+ COMPOSITES OBRACE compositesbody EBRACE
+ ;
+
+compositesbody:
+ compositescmd SEMICOLON
+ | compositesbody compositescmd SEMICOLON
+ ;
+
+compositescmd:
+ QUOTEDSTRING EQSIGN QUOTEDSTRING {
+ struct expression *expr;
+ if ((expr = parse_expression (cfg->cfg_pool, $3)) == NULL) {
+ yyerror ("yyparse: cannot parse composite expression: %s", $3);
+ YYERROR;
+ }
+ g_hash_table_insert (cfg->composite_symbols, $1, expr);
+ }
+ ;
+module_opt:
+ MODULE_OPT OBRACE moduleoptbody EBRACE {
+ g_hash_table_insert (cfg->modules_opts, $1, cur_module_opt);
+ cur_module_opt = NULL;
+ }
+ ;
+
+moduleoptbody:
+ optcmd SEMICOLON
+ | moduleoptbody optcmd SEMICOLON
+ ;
+
+optcmd:
+ PARAM EQSIGN QUOTEDSTRING {
+ struct module_opt *mopt;
+ if (cur_module_opt == NULL) {
+ cur_module_opt = g_malloc (sizeof (cur_module_opt));
+ LIST_INIT (cur_module_opt);
+ }
+ mopt = memory_pool_alloc (cfg->cfg_pool, sizeof (struct module_opt));
+ mopt->param = $1;
+ mopt->value = $3;
+ LIST_INSERT_HEAD (cur_module_opt, mopt, next);
+ }
+ ;
+
+variable:
+ VARIABLE EQSIGN QUOTEDSTRING {
+ g_hash_table_insert (cfg->variables, $1, $3);
+ }
+ ;
+
+logging:
+ LOGGING OBRACE loggingbody EBRACE
+ ;
+
+loggingbody:
+ loggingcmd SEMICOLON
+ | loggingbody loggingcmd SEMICOLON
+ ;
+
+loggingcmd:
+ loggingtype
+ | logginglevel
+ | loggingfacility
+ | loggingfile
+ ;
+
+loggingtype:
+ LOG_TYPE EQSIGN LOG_TYPE_CONSOLE {
+ cfg->log_type = RSPAMD_LOG_CONSOLE;
+ }
+ LOG_TYPE EQSIGN LOG_TYPE_SYSLOG {
+ cfg->log_type = RSPAMD_LOG_SYSLOG;
+ }
+ LOG_TYPE EQSIGN LOG_TYPE_FILE {
+ cfg->log_type = RSPAMD_LOG_FILE;
+ }
+ ;
+
+logginglevel:
+ LOG_LEVEL EQSIGN LOG_LEVEL_DEBUG {
+ cfg->log_level = G_LOG_LEVEL_DEBUG;
+ }
+ LOG_LEVEL EQSIGN LOG_LEVEL_INFO {
+ cfg->log_level = G_LOG_LEVEL_INFO | G_LOG_LEVEL_MESSAGE;
+ }
+ LOG_LEVEL EQSIGN LOG_LEVEL_WARNING {
+ cfg->log_level = G_LOG_LEVEL_WARNING;
+ }
+ LOG_LEVEL EQSIGN LOG_LEVEL_ERROR {
+ cfg->log_level = G_LOG_LEVEL_ERROR | G_LOG_LEVEL_CRITICAL;
+ }
+ ;
+
+loggingfacility:
+ LOG_FACILITY EQSIGN QUOTEDSTRING {
+ if (strncasecmp ($3, "LOG_AUTH", sizeof ("LOG_AUTH") - 1) == 0) {
+ cfg->log_facility = LOG_AUTH;
+ }
+ else if (strncasecmp ($3, "LOG_CRON", sizeof ("LOG_CRON") - 1) == 0) {
+ cfg->log_facility = LOG_CRON;
+ }
+ else if (strncasecmp ($3, "LOG_DAEMON", sizeof ("LOG_DAEMON") - 1) == 0) {
+ cfg->log_facility = LOG_DAEMON;
+ }
+ else if (strncasecmp ($3, "LOG_MAIL", sizeof ("LOG_MAIL") - 1) == 0) {
+ cfg->log_facility = LOG_MAIL;
+ }
+ else if (strncasecmp ($3, "LOG_USER", sizeof ("LOG_USER") - 1) == 0) {
+ cfg->log_facility = LOG_USER;
+ }
+ else if (strncasecmp ($3, "LOG_LOCAL0", sizeof ("LOG_LOCAL0") - 1) == 0) {
+ cfg->log_facility = LOG_LOCAL0;
+ }
+ else if (strncasecmp ($3, "LOG_LOCAL1", sizeof ("LOG_LOCAL1") - 1) == 0) {
+ cfg->log_facility = LOG_LOCAL1;
+ }
+ else if (strncasecmp ($3, "LOG_LOCAL2", sizeof ("LOG_LOCAL2") - 1) == 0) {
+ cfg->log_facility = LOG_LOCAL2;
+ }
+ else if (strncasecmp ($3, "LOG_LOCAL3", sizeof ("LOG_LOCAL3") - 1) == 0) {
+ cfg->log_facility = LOG_LOCAL3;
+ }
+ else if (strncasecmp ($3, "LOG_LOCAL4", sizeof ("LOG_LOCAL4") - 1) == 0) {
+ cfg->log_facility = LOG_LOCAL4;
+ }
+ else if (strncasecmp ($3, "LOG_LOCAL5", sizeof ("LOG_LOCAL5") - 1) == 0) {
+ cfg->log_facility = LOG_LOCAL5;
+ }
+ else if (strncasecmp ($3, "LOG_LOCAL6", sizeof ("LOG_LOCAL6") - 1) == 0) {
+ cfg->log_facility = LOG_LOCAL6;
+ }
+ else if (strncasecmp ($3, "LOG_LOCAL7", sizeof ("LOG_LOCAL7") - 1) == 0) {
+ cfg->log_facility = LOG_LOCAL7;
+ }
+ else {
+ yyerror ("yyparse: invalid logging facility: %s", $3);
+ YYERROR;
+ }
+
+ free ($3);
+ }
+ ;
+
+loggingfile:
+ LOG_FILENAME EQSIGN QUOTEDSTRING {
+ cfg->log_file = memory_pool_strdup (cfg->cfg_pool, $3);
+
+ free ($3);
+ }
+ ;
+
+%%
+/*
+ * vi:ts=4
+ */
diff --git a/src/cfg_utils.c b/src/cfg_utils.c
new file mode 100644
index 000000000..9fa7aac47
--- /dev/null
+++ b/src/cfg_utils.c
@@ -0,0 +1,563 @@
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <ctype.h>
+#include <errno.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <syslog.h>
+#include <netdb.h>
+#include <math.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include "config.h"
+#include "cfg_file.h"
+#include "main.h"
+#ifndef HAVE_OWN_QUEUE_H
+#include <sys/queue.h>
+#else
+#include "queue.h"
+#endif
+
+extern int yylineno;
+extern char *yytext;
+
+int
+add_memcached_server (struct config_file *cf, char *str)
+{
+ char *cur_tok, *err_str;
+ struct memcached_server *mc;
+ struct hostent *hent;
+ uint16_t port;
+
+ if (str == NULL) return 0;
+
+ cur_tok = strsep (&str, ":");
+
+ if (cur_tok == NULL || *cur_tok == '\0') return 0;
+
+ if(cf->memcached_servers_num == MAX_MEMCACHED_SERVERS) {
+ yywarn ("yyparse: maximum number of memcached servers is reached %d", MAX_MEMCACHED_SERVERS);
+ }
+
+ mc = &cf->memcached_servers[cf->memcached_servers_num];
+ if (mc == NULL) return 0;
+ /* cur_tok - server name, str - server port */
+ if (str == NULL) {
+ port = DEFAULT_MEMCACHED_PORT;
+ }
+ else {
+ port = (uint16_t)strtoul (str, &err_str, 10);
+ if (*err_str != '\0') {
+ return 0;
+ }
+ }
+
+ if (!inet_aton (cur_tok, &mc->addr)) {
+ /* Try to call gethostbyname */
+ hent = gethostbyname (cur_tok);
+ if (hent == NULL) {
+ return 0;
+ }
+ else {
+ memcpy((char *)&mc->addr, hent->h_addr, sizeof(struct in_addr));
+ }
+ }
+ mc->port = port;
+ cf->memcached_servers_num++;
+ return 1;
+}
+
+int
+parse_bind_line (struct config_file *cf, char *str, char is_control)
+{
+ char *cur_tok, *err_str;
+ struct hostent *hent;
+ size_t s;
+
+ if (str == NULL) return 0;
+ cur_tok = strsep (&str, ":");
+
+ if (cur_tok[0] == '/' || cur_tok[0] == '.') {
+ if (is_control) {
+ cf->control_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
+ cf->control_family = AF_UNIX;
+ }
+ else {
+ cf->bind_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
+ cf->bind_family = AF_UNIX;
+ }
+ return 1;
+
+ } else {
+ if (str == '\0') {
+ if (is_control) {
+ cf->control_port = DEFAULT_CONTROL_PORT;
+ }
+ else {
+ cf->bind_port = DEFAULT_BIND_PORT;
+ }
+ }
+ else {
+ if (is_control) {
+ cf->control_port = (uint16_t)strtoul (str, &err_str, 10);
+ }
+ else {
+ cf->bind_port = (uint16_t)strtoul (str, &err_str, 10);
+ }
+ if (*err_str != '\0') {
+ return 0;
+ }
+ }
+
+ if (is_control) {
+ if (!inet_aton (cur_tok, &cf->control_addr)) {
+ /* Try to call gethostbyname */
+ hent = gethostbyname (cur_tok);
+ if (hent == NULL) {
+ return 0;
+ }
+ else {
+ cf->bind_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
+ memcpy((char *)&cf->control_addr, hent->h_addr, sizeof(struct in_addr));
+ s = strlen (cur_tok) + 1;
+ }
+ }
+
+ cf->control_family = AF_INET;
+ }
+ else {
+ if (!inet_aton (cur_tok, &cf->bind_addr)) {
+ /* Try to call gethostbyname */
+ hent = gethostbyname (cur_tok);
+ if (hent == NULL) {
+ return 0;
+ }
+ else {
+ cf->bind_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
+ memcpy((char *)&cf->bind_addr, hent->h_addr, sizeof(struct in_addr));
+ s = strlen (cur_tok) + 1;
+ }
+ }
+
+ cf->bind_family = AF_INET;
+ }
+
+ return 1;
+ }
+
+ return 0;
+}
+
+void
+init_defaults (struct config_file *cfg)
+{
+ cfg->memcached_error_time = DEFAULT_UPSTREAM_ERROR_TIME;
+ cfg->memcached_dead_time = DEFAULT_UPSTREAM_DEAD_TIME;
+ cfg->memcached_maxerrors = DEFAULT_UPSTREAM_MAXERRORS;
+ cfg->memcached_protocol = TCP_TEXT;
+
+ cfg->workers_number = DEFAULT_WORKERS_NUM;
+ cfg->modules_opts = g_hash_table_new (g_str_hash, g_str_equal);
+ cfg->variables = g_hash_table_new (g_str_hash, g_str_equal);
+ cfg->metrics = g_hash_table_new (g_str_hash, g_str_equal);
+ cfg->factors = g_hash_table_new (g_str_hash, g_str_equal);
+ cfg->c_modules = g_hash_table_new (g_str_hash, g_str_equal);
+ cfg->composite_symbols = g_hash_table_new (g_str_hash, g_str_equal);
+
+ LIST_INIT (&cfg->perl_modules);
+}
+
+void
+free_config (struct config_file *cfg)
+{
+ g_hash_table_remove_all (cfg->modules_opts);
+ g_hash_table_unref (cfg->modules_opts);
+ g_hash_table_remove_all (cfg->variables);
+ g_hash_table_unref (cfg->variables);
+ g_hash_table_remove_all (cfg->metrics);
+ g_hash_table_unref (cfg->metrics);
+ g_hash_table_remove_all (cfg->factors);
+ g_hash_table_unref (cfg->factors);
+ g_hash_table_remove_all (cfg->c_modules);
+ g_hash_table_unref (cfg->c_modules);
+ g_hash_table_remove_all (cfg->composite_symbols);
+ g_hash_table_unref (cfg->composite_symbols);
+ memory_pool_delete (cfg->cfg_pool);
+}
+
+char*
+get_module_opt (struct config_file *cfg, char *module_name, char *opt_name)
+{
+ LIST_HEAD (moduleoptq, module_opt) *cur_module_opt = NULL;
+ struct module_opt *cur;
+
+ cur_module_opt = g_hash_table_lookup (cfg->modules_opts, module_name);
+ if (cur_module_opt == NULL) {
+ return NULL;
+ }
+
+ LIST_FOREACH (cur, cur_module_opt, next) {
+ if (strcmp (cur->param, opt_name) == 0) {
+ return cur->value;
+ }
+ }
+
+ return NULL;
+}
+
+size_t
+parse_limit (const char *limit)
+{
+ size_t result = 0;
+ char *err_str;
+
+ if (!limit || *limit == '\0') return 0;
+
+ result = strtoul (limit, &err_str, 10);
+
+ if (*err_str != '\0') {
+ /* Megabytes */
+ if (*err_str == 'm' || *err_str == 'M') {
+ result *= 1048576L;
+ }
+ /* Kilobytes */
+ else if (*err_str == 'k' || *err_str == 'K') {
+ result *= 1024;
+ }
+ /* Gigabytes */
+ else if (*err_str == 'g' || *err_str == 'G') {
+ result *= 1073741824L;
+ }
+ }
+
+ return result;
+}
+
+unsigned int
+parse_seconds (const char *t)
+{
+ unsigned int result = 0;
+ char *err_str;
+
+ if (!t || *t == '\0') return 0;
+
+ result = strtoul (t, &err_str, 10);
+
+ if (*err_str != '\0') {
+ /* Seconds */
+ if (*err_str == 's' || *err_str == 'S') {
+ result *= 1000;
+ }
+ }
+
+ return result;
+}
+
+char
+parse_flag (const char *str)
+{
+ if (!str || !*str) return -1;
+
+ if ((*str == 'Y' || *str == 'y') && *(str + 1) == '\0') {
+ return 1;
+ }
+
+ if ((*str == 'Y' || *str == 'y') &&
+ (*(str + 1) == 'E' || *(str + 1) == 'e') &&
+ (*(str + 2) == 'S' || *(str + 2) == 's') &&
+ *(str + 3) == '\0') {
+ return 1;
+ }
+
+ if ((*str == 'N' || *str == 'n') && *(str + 1) == '\0') {
+ return 0;
+ }
+
+ if ((*str == 'N' || *str == 'n') &&
+ (*(str + 1) == 'O' || *(str + 1) == 'o') &&
+ *(str + 2) == '\0') {
+ return 0;
+ }
+
+ return -1;
+}
+
+/*
+ * Try to substitute all variables in given string
+ * Return: newly allocated string with substituted variables (original string may be freed if variables are found)
+ */
+char *
+substitute_variable (struct config_file *cfg, char *str, u_char recursive)
+{
+ char *var, *new, *v_begin, *v_end;
+ size_t len;
+
+ while ((v_begin = strstr (str, "${")) != NULL) {
+ len = strlen (str);
+ *v_begin = '\0';
+ v_begin += 2;
+ if ((v_end = strstr (v_begin, "}")) == NULL) {
+ /* Not a variable, skip */
+ continue;
+ }
+ *v_end = '\0';
+ var = g_hash_table_lookup (cfg->variables, v_begin);
+ if (var == NULL) {
+ yywarn ("substitute_variable: variable %s is not defined", v_begin);
+ /* Substitute unknown variables with empty string */
+ var = "";
+ }
+ else if (recursive) {
+ var = substitute_variable (cfg, var, recursive);
+ }
+ /* Allocate new string */
+ new = memory_pool_alloc (cfg->cfg_pool, len - strlen (v_begin) + strlen (var) + 1);
+
+ snprintf (new, len - strlen (v_begin) + strlen (var) + 1, "%s%s%s",
+ str, var, v_end + 1);
+ str = new;
+ }
+
+ return str;
+}
+
+static void
+substitute_module_variables (gpointer key, gpointer value, gpointer data)
+{
+ struct config_file *cfg = (struct config_file *)data;
+ LIST_HEAD (moduleoptq, module_opt) *cur_module_opt = (struct moduleoptq *)value;
+ struct module_opt *cur, *tmp;
+
+ LIST_FOREACH_SAFE (cur, cur_module_opt, next, tmp) {
+ if (cur->value) {
+ cur->value = substitute_variable (cfg, cur->value, 0);
+ }
+ }
+}
+
+static void
+substitute_all_variables (gpointer key, gpointer value, gpointer data)
+{
+ struct config_file *cfg = (struct config_file *)data;
+ char *var;
+
+ var = value;
+ /* Do recursive substitution */
+ var = substitute_variable (cfg, var, 1);
+}
+
+static void
+parse_filters_str (struct config_file *cfg, const char *str, enum script_type type)
+{
+ gchar **strvec, **p;
+ struct filter *cur;
+ int i;
+
+ if (str == NULL) {
+ return;
+ }
+
+ strvec = g_strsplit (str, ",", 0);
+ if (strvec == NULL) {
+ return;
+ }
+
+ p = strvec;
+ while (*p++) {
+ cur = NULL;
+ /* Search modules from known C modules */
+ for (i = 0; i < MODULES_NUM; i++) {
+ if (strcasecmp (modules[i].name, *p) == 0) {
+ cur = memory_pool_alloc (cfg->cfg_pool, sizeof (struct filter));
+ cur->type = C_FILTER;
+ switch (type) {
+ case SCRIPT_HEADER:
+ cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p);
+ LIST_INSERT_HEAD (&cfg->header_filters, cur, next);
+ break;
+ case SCRIPT_MIME:
+ cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p);
+ LIST_INSERT_HEAD (&cfg->mime_filters, cur, next);
+ break;
+ case SCRIPT_MESSAGE:
+ cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p);
+ LIST_INSERT_HEAD (&cfg->message_filters, cur, next);
+ break;
+ case SCRIPT_URL:
+ cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p);
+ LIST_INSERT_HEAD (&cfg->url_filters, cur, next);
+ break;
+ }
+ break;
+ }
+ }
+ if (cur != NULL) {
+ /* Go to next iteration */
+ continue;
+ }
+ cur = memory_pool_alloc (cfg->cfg_pool, sizeof (struct filter));
+ cur->type = PERL_FILTER;
+ switch (type) {
+ case SCRIPT_HEADER:
+ cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p);
+ LIST_INSERT_HEAD (&cfg->header_filters, cur, next);
+ break;
+ case SCRIPT_MIME:
+ cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p);
+ LIST_INSERT_HEAD (&cfg->mime_filters, cur, next);
+ break;
+ case SCRIPT_MESSAGE:
+ cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p);
+ LIST_INSERT_HEAD (&cfg->message_filters, cur, next);
+ break;
+ case SCRIPT_URL:
+ cur->func_name = memory_pool_strdup (cfg->cfg_pool, *p);
+ LIST_INSERT_HEAD (&cfg->url_filters, cur, next);
+ break;
+ }
+ }
+
+ g_strfreev (strvec);
+}
+
+/*
+ * Substitute all variables in strings
+ */
+void
+post_load_config (struct config_file *cfg)
+{
+ g_hash_table_foreach (cfg->variables, substitute_all_variables, cfg);
+ g_hash_table_foreach (cfg->modules_opts, substitute_module_variables, cfg);
+ parse_filters_str (cfg, cfg->header_filters_str, SCRIPT_HEADER);
+ parse_filters_str (cfg, cfg->mime_filters_str, SCRIPT_MIME);
+ parse_filters_str (cfg, cfg->message_filters_str, SCRIPT_MESSAGE);
+ parse_filters_str (cfg, cfg->url_filters_str, SCRIPT_URL);
+}
+
+/*
+ * Rspamd regexp utility functions
+ */
+struct rspamd_regexp*
+parse_regexp (memory_pool_t *pool, char *line)
+{
+ char *begin, *end, *p;
+ struct rspamd_regexp *result;
+ int regexp_flags = 0;
+ enum rspamd_regexp_type type = REGEXP_NONE;
+ GError *err = NULL;
+
+ result = memory_pool_alloc0 (pool, sizeof (struct rspamd_regexp));
+ /* First try to find header name */
+ begin = strchr (line, '=');
+ if (begin != NULL) {
+ *begin = '\0';
+ result->header = memory_pool_strdup (pool, line);
+ result->type = REGEXP_HEADER;
+ *begin = '=';
+ line = begin;
+ }
+ /* Find begin of regexp */
+ while (*line != '/') {
+ line ++;
+ }
+ if (*line != '\0') {
+ begin = line + 1;
+ }
+ else if (result->header == NULL) {
+ /* Assume that line without // is just a header name */
+ result->header = memory_pool_strdup (pool, line);
+ result->type = REGEXP_HEADER;
+ return result;
+ }
+ else {
+ /* We got header name earlier but have not found // expression, so it is invalid regexp */
+ return NULL;
+ }
+ /* Find end */
+ end = begin;
+ while (*end && (*end != '/' || *(end - 1) == '\\')) {
+ end ++;
+ }
+ if (end == begin || *end != '/') {
+ return NULL;
+ }
+ /* Parse flags */
+ p = end + 1;
+ while (p != NULL) {
+ switch (*p) {
+ case 'i':
+ regexp_flags |= G_REGEX_CASELESS;
+ p ++;
+ break;
+ case 'm':
+ regexp_flags |= G_REGEX_MULTILINE;
+ p ++;
+ break;
+ case 's':
+ regexp_flags |= G_REGEX_DOTALL;
+ p ++;
+ break;
+ case 'x':
+ regexp_flags |= G_REGEX_EXTENDED;
+ p ++;
+ break;
+ case 'u':
+ regexp_flags |= G_REGEX_UNGREEDY;
+ p ++;
+ break;
+ case 'o':
+ regexp_flags |= G_REGEX_OPTIMIZE;
+ p ++;
+ break;
+ /* Type flags */
+ case 'H':
+ if (type != REGEXP_NONE) {
+ type = REGEXP_HEADER;
+ }
+ p ++;
+ break;
+ case 'M':
+ if (type != REGEXP_NONE) {
+ type = REGEXP_MESSAGE;
+ }
+ p ++;
+ break;
+ case 'P':
+ if (type != REGEXP_NONE) {
+ type = REGEXP_MIME;
+ }
+ p ++;
+ break;
+ case 'U':
+ if (type != REGEXP_NONE) {
+ type = REGEXP_URL;
+ }
+ p ++;
+ break;
+ /* Stop flags parsing */
+ default:
+ p = NULL;
+ break;
+ }
+ }
+
+ result = memory_pool_alloc (pool, sizeof (struct rspamd_regexp));
+ result->type = type;
+ *end = '\0';
+ result->regexp = g_regex_new (begin, regexp_flags, 0, &err);
+ result->regexp_text = memory_pool_strdup (pool, begin);
+ memory_pool_add_destructor (pool, (pool_destruct_func)g_regex_unref, (void *)result->regexp);
+ *end = '/';
+
+ return result;
+}
+
+/*
+ * vi:ts=4
+ */
diff --git a/src/controller.c b/src/controller.c
new file mode 100644
index 000000000..1efb8c35c
--- /dev/null
+++ b/src/controller.c
@@ -0,0 +1,349 @@
+#include <sys/stat.h>
+#include <sys/param.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <errno.h>
+#include <signal.h>
+
+#include <netinet/in.h>
+#include <syslog.h>
+#include <fcntl.h>
+#include <netdb.h>
+
+#include <glib.h>
+
+#include "util.h"
+#include "main.h"
+#include "protocol.h"
+#include "upstream.h"
+#include "cfg_file.h"
+#include "modules.h"
+
+#define CRLF "\r\n"
+
+enum command_type {
+ COMMAND_PASSWORD,
+ COMMAND_QUIT,
+ COMMAND_RELOAD,
+ COMMAND_STAT,
+ COMMAND_SHUTDOWN,
+ COMMAND_UPTIME,
+};
+
+struct controller_command {
+ char *command;
+ int privilleged;
+ enum command_type type;
+};
+
+static struct controller_command commands[] = {
+ {"password", 0, COMMAND_PASSWORD},
+ {"quit", 0, COMMAND_QUIT},
+ {"reload", 1, COMMAND_RELOAD},
+ {"stat", 0, COMMAND_STAT},
+ {"shutdown", 1, COMMAND_SHUTDOWN},
+ {"uptime", 0, COMMAND_UPTIME},
+};
+
+static GCompletion *comp;
+static time_t start_time;
+
+static
+void sig_handler (int signo)
+{
+ switch (signo) {
+ case SIGINT:
+ case SIGTERM:
+ _exit (1);
+ break;
+ }
+}
+
+static void
+sigusr_handler (int fd, short what, void *arg)
+{
+ struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ /* Do not accept new connections, preparing to end worker's process */
+ struct timeval tv;
+ tv.tv_sec = SOFT_SHUTDOWN_TIME;
+ tv.tv_usec = 0;
+ event_del (&worker->sig_ev);
+ event_del (&worker->bind_ev);
+ msg_info ("controller's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
+ event_loopexit (&tv);
+ return;
+}
+
+static gchar*
+completion_func (gpointer elem)
+{
+ struct controller_command *cmd = (struct controller_command *)elem;
+
+ return cmd->command;
+}
+
+static void
+free_session (struct controller_session *session)
+{
+ bufferevent_disable (session->bev, EV_READ | EV_WRITE);
+ memory_pool_delete (session->session_pool);
+ g_free (session);
+}
+
+static int
+check_auth (struct controller_command *cmd, struct controller_session *session)
+{
+ char out_buf[128];
+ int r;
+
+ if (cmd->privilleged && !session->authorized) {
+ r = snprintf (out_buf, sizeof (out_buf), "not authorized" CRLF);
+ bufferevent_write (session->bev, out_buf, r);
+ return 0;
+ }
+
+ return 1;
+}
+
+static void
+process_command (struct controller_command *cmd, char **cmd_args, struct controller_session *session)
+{
+ char out_buf[512], *arg;
+ int r = 0, days, hours, minutes;
+ time_t uptime;
+
+ switch (cmd->type) {
+ case COMMAND_PASSWORD:
+ arg = *cmd_args;
+ if (*arg == '\0') {
+ msg_debug ("process_command: empty password passed");
+ r = snprintf (out_buf, sizeof (out_buf), "password command requires one argument" CRLF);
+ bufferevent_write (session->bev, out_buf, r);
+ return;
+ }
+ if (strncmp (arg, session->cfg->control_password, strlen (arg)) == 0) {
+ session->authorized = 1;
+ r = snprintf (out_buf, sizeof (out_buf), "password accepted" CRLF);
+ bufferevent_write (session->bev, out_buf, r);
+ }
+ else {
+ session->authorized = 0;
+ r = snprintf (out_buf, sizeof (out_buf), "password NOT accepted" CRLF);
+ bufferevent_write (session->bev, out_buf, r);
+ }
+ break;
+ case COMMAND_QUIT:
+ free_session (session);
+ break;
+ case COMMAND_RELOAD:
+ if (check_auth (cmd, session)) {
+ r = snprintf (out_buf, sizeof (out_buf), "reload request sent" CRLF);
+ bufferevent_write (session->bev, out_buf, r);
+ kill (getppid (), SIGHUP);
+ }
+ break;
+ case COMMAND_STAT:
+ /* XXX need to implement stat */
+ if (check_auth (cmd, session)) {
+ r = snprintf (out_buf, sizeof (out_buf), "-- end of stats report" CRLF);
+ bufferevent_write (session->bev, out_buf, r);
+ }
+ case COMMAND_SHUTDOWN:
+ if (check_auth (cmd, session)) {
+ r = snprintf (out_buf, sizeof (out_buf), "shutdown request sent" CRLF);
+ bufferevent_write (session->bev, out_buf, r);
+ kill (getppid (), SIGTERM);
+ }
+ break;
+ case COMMAND_UPTIME:
+ if (check_auth (cmd, session)) {
+ uptime = time (NULL) - start_time;
+ /* If uptime more than 2 hours, print as a number of days. */
+ if (uptime >= 2 * 3600) {
+ days = uptime / 86400;
+ hours = (uptime % 3600) / 60;
+ minutes = (uptime % 60) / 60;
+ r = snprintf (out_buf, sizeof (out_buf), "%dday%s %dhour%s %dminute%s" CRLF,
+ days, days > 1 ? "s" : " ",
+ hours, hours > 1 ? "s" : " ",
+ minutes, minutes > 1 ? "s" : " ");
+ }
+ /* If uptime is less than 1 minute print only seconds */
+ else if (uptime / 60 == 0) {
+ r = snprintf (out_buf, sizeof (out_buf), "%dsecond%s", uptime, uptime > 1 ? "s" : " ");
+ }
+ /* Else print the minutes and seconds. */
+ else {
+ hours = uptime / 3600;
+ minutes = (uptime % 60) / 60;
+ r = snprintf (out_buf, sizeof (out_buf), "%dhour%s %dminite%s %dsecond%s",
+ hours, hours > 1 ? "s" : " ",
+ minutes, minutes > 1 ? "s" : " ",
+ (int)uptime, uptime > 1 ? "s" : " ");
+ }
+
+ }
+ bufferevent_write (session->bev, out_buf, r);
+ break;
+ }
+}
+
+static void
+read_socket (struct bufferevent *bev, void *arg)
+{
+ struct controller_session *session = (struct controller_session *)arg;
+ int len, i;
+ char *s, **params, *cmd, out_buf[128];
+ GList *comp_list;
+
+ s = evbuffer_readline (EVBUFFER_INPUT (bev));
+ if (s != NULL && *s != 0) {
+ len = strlen (s);
+ /* Remove end of line characters from string */
+ if (s[len - 1] == '\n') {
+ if (s[len - 2] == '\r') {
+ s[len - 2] = 0;
+ }
+ s[len - 1] = 0;
+ }
+ params = g_strsplit (s, " ", -1);
+ len = g_strv_length (params);
+ if (len > 0) {
+ cmd = g_strstrip (params[0]);
+ comp_list = g_completion_complete (comp, cmd, NULL);
+ switch (g_list_length (comp_list)) {
+ case 1:
+ process_command ((struct controller_command *)comp_list->data, &params[1], session);
+ break;
+ case 0:
+ i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF);
+ bufferevent_write (bev, out_buf, i);
+ break;
+ default:
+ i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF);
+ bufferevent_write (bev, out_buf, i);
+ break;
+ }
+ }
+ g_strfreev (params);
+ }
+ if (s != NULL) {
+ free (s);
+ }
+}
+
+static void
+write_socket (struct bufferevent *bev, void *arg)
+{
+ char buf[1024], hostbuf[256];
+
+ gethostname (hostbuf, sizeof (hostbuf - 1));
+ hostbuf[sizeof (hostbuf) - 1] = '\0';
+ snprintf (buf, sizeof (buf), "Rspamd version %s is running on %s" CRLF, RVERSION, hostbuf);
+ bufferevent_disable (bev, EV_WRITE);
+ bufferevent_enable (bev, EV_READ);
+}
+
+static void
+err_socket (struct bufferevent *bev, short what, void *arg)
+{
+ struct controller_session *session = (struct controller_session *)arg;
+ msg_info ("closing connection");
+ /* Free buffers */
+ free_session (session);
+}
+
+static void
+accept_socket (int fd, short what, void *arg)
+{
+ struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ struct sockaddr_storage ss;
+ struct controller_session *new_session;
+ socklen_t addrlen = sizeof(ss);
+ int nfd;
+
+ if ((nfd = accept (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
+ return;
+ }
+ if (event_make_socket_nonblocking(fd) < 0) {
+ return;
+ }
+
+ new_session = g_malloc (sizeof (struct controller_session));
+ if (new_session == NULL) {
+ msg_err ("accept_socket: cannot allocate memory for task, %m");
+ return;
+ }
+ bzero (new_session, sizeof (struct controller_session));
+ new_session->worker = worker;
+ new_session->sock = nfd;
+ new_session->cfg = worker->srv->cfg;
+ new_session->session_pool = memory_pool_new (memory_pool_get_size () - 1);
+ memory_pool_add_destructor (new_session->session_pool, (pool_destruct_func)bufferevent_free, new_session->bev);
+ worker->srv->stat->control_connections_count ++;
+
+ /* Read event */
+ new_session->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_session);
+ bufferevent_enable (new_session->bev, EV_WRITE);
+}
+
+void
+start_controller (struct rspamd_worker *worker)
+{
+ struct sigaction signals;
+ int listen_sock, i;
+ struct sockaddr_un *un_addr;
+ GList *comp_list = NULL;
+
+ worker->srv->pid = getpid ();
+ worker->srv->type = TYPE_CONTROLLER;
+ event_init ();
+ g_mime_init (0);
+
+ init_signals (&signals, sig_handler);
+ sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
+
+ /* SIGUSR2 handler */
+ signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker);
+ signal_add (&worker->sig_ev, NULL);
+
+ if (worker->srv->cfg->control_family == AF_INET) {
+ if ((listen_sock = make_socket (worker->srv->cfg->control_host, worker->srv->cfg->control_port)) == -1) {
+ msg_err ("start_controller: cannot create tcp listen socket. %m");
+ exit(-errno);
+ }
+ }
+ else {
+ un_addr = (struct sockaddr_un *) alloca (sizeof (struct sockaddr_un));
+ if (!un_addr || (listen_sock = make_unix_socket (worker->srv->cfg->bind_host, un_addr)) == -1) {
+ msg_err ("start_controller: cannot create unix listen socket. %m");
+ exit(-errno);
+ }
+ }
+
+ start_time = time (NULL);
+
+ /* Init command completion */
+ for (i = 0; i < sizeof (commands) / sizeof (commands[0]) - 1; i ++) {
+ comp_list = g_list_prepend (comp_list, &commands[i]);
+ }
+ comp = g_completion_new (completion_func);
+ g_completion_add_items (comp, comp_list);
+ /* Accept event */
+ event_set(&worker->bind_ev, listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+ event_add(&worker->bind_ev, NULL);
+
+ /* Send SIGUSR2 to parent */
+ kill (getppid (), SIGUSR2);
+
+ event_loop (0);
+}
+
+
+/*
+ * vi:ts=4
+ */
diff --git a/src/filter.c b/src/filter.c
new file mode 100644
index 000000000..d1edbb930
--- /dev/null
+++ b/src/filter.c
@@ -0,0 +1,336 @@
+#include <sys/types.h>
+#include <glib.h>
+#include <string.h>
+#include <stdlib.h>
+
+#include "mem_pool.h"
+#include "filter.h"
+#include "main.h"
+#include "cfg_file.h"
+#include "perl.h"
+
+void
+insert_result (struct worker_task *task, const char *metric_name, const char *symbol, u_char flag)
+{
+ struct metric *metric;
+ struct metric_result *metric_res;
+
+ metric = g_hash_table_lookup (task->worker->srv->cfg->metrics, metric_name);
+ if (metric == NULL) {
+ return;
+ }
+
+ metric_res = g_hash_table_lookup (task->results, metric_name);
+
+ if (metric_res == NULL) {
+ /* Create new metric chain */
+ metric_res = memory_pool_alloc (task->task_pool, sizeof (struct metric_result));
+ metric_res->symbols = g_hash_table_new (g_str_hash, g_str_equal);
+ memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_hash_table_destroy, metric_res->symbols);
+ metric_res->metric = metric;
+ g_hash_table_insert (task->results, (gpointer)metric_name, metric_res);
+ }
+
+ g_hash_table_insert (metric_res->symbols, (gpointer)symbol, GSIZE_TO_POINTER (flag));
+}
+
+/*
+ * Default consolidation function based on factors in config file
+ */
+double
+factor_consolidation_func (struct worker_task *task, const char *metric_name)
+{
+ struct metric_result *metric_res;
+ double *factor;
+ double res = 0.;
+ GList *symbols = NULL, *cur;
+
+ metric_res = g_hash_table_lookup (task->results, metric_name);
+ if (metric_res == NULL) {
+ return res;
+ }
+
+ symbols = g_hash_table_get_keys (metric_res->symbols);
+ cur = g_list_first (symbols);
+ while (cur) {
+ factor = g_hash_table_lookup (task->worker->srv->cfg->factors, cur->data);
+ if (factor == NULL) {
+ /* Default multiplier is 1 */
+ res ++;
+ }
+ else {
+ res += *factor;
+ }
+ cur = g_list_next (cur);
+ }
+
+ g_list_free (symbols);
+
+ return res;
+}
+
+/*
+ * Call perl or C module function for specified part of message
+ */
+static void
+call_filter_by_name (struct worker_task *task, const char *name, enum script_type sc_type, enum filter_type filt_type)
+{
+ struct module_ctx *c_module;
+
+ switch (filt_type) {
+ case C_FILTER:
+ c_module = g_hash_table_lookup (task->worker->srv->cfg->c_modules, name);
+ if (c_module) {
+ switch (filt_type) {
+ case SCRIPT_HEADER:
+ c_module->header_filter (task);
+ break;
+ case SCRIPT_MIME:
+ c_module->mime_filter (task);
+ break;
+ case SCRIPT_URL:
+ c_module->url_filter (task);
+ break;
+ case SCRIPT_MESSAGE:
+ c_module->message_filter (task);
+ break;
+ }
+ }
+ break;
+ case PERL_FILTER:
+ switch (filt_type) {
+ case SCRIPT_HEADER:
+ perl_call_header_filter (name, task);
+ break;
+ case SCRIPT_MIME:
+ perl_call_mime_filter (name, task);
+ break;
+ case SCRIPT_URL:
+ perl_call_url_filter (name, task);
+ break;
+ case SCRIPT_MESSAGE:
+ perl_call_message_filter (name, task);
+ break;
+ }
+ break;
+ }
+}
+
+static void
+metric_process_callback (gpointer key, gpointer value, void *data)
+{
+ struct worker_task *task = (struct worker_task *)data;
+ struct metric_result *metric_res = (struct metric_result *)value;
+
+ if (metric_res->metric->func != NULL) {
+ metric_res->score = metric_res->metric->func (task, metric_res->metric->name);
+ }
+ else {
+ metric_res->score = factor_consolidation_func (task, metric_res->metric->name);
+ }
+}
+
+static int
+continue_process_filters (struct worker_task *task)
+{
+ struct filter *cur = task->save.entry;
+
+ cur = LIST_NEXT (cur, next);
+ /* Note: no breaks in this case! */
+ switch (task->save.type) {
+ case SCRIPT_HEADER:
+ while (cur) {
+ call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_HEADER);
+ if (task->save.saved) {
+ task->save.entry = cur;
+ task->save.type = SCRIPT_HEADER;
+ return 0;
+ }
+ cur = LIST_NEXT (cur, next);
+ }
+ /* Process mime filters */
+ cur = LIST_FIRST (&task->worker->srv->cfg->mime_filters);
+ case SCRIPT_MIME:
+ while (cur) {
+ call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_MIME);
+ if (task->save.saved) {
+ task->save.entry = cur;
+ task->save.type = SCRIPT_MIME;
+ return 0;
+ }
+ cur = LIST_NEXT (cur, next);
+ }
+ /* Process url filters */
+ cur = LIST_FIRST (&task->worker->srv->cfg->url_filters);
+ case SCRIPT_URL:
+ while (cur) {
+ call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_URL);
+ if (task->save.saved) {
+ task->save.entry = cur;
+ task->save.type = SCRIPT_URL;
+ return 0;
+ }
+ cur = LIST_NEXT (cur, next);
+ }
+ /* Process message filters */
+ cur = LIST_FIRST (&task->worker->srv->cfg->message_filters);
+ case SCRIPT_MESSAGE:
+ while (cur) {
+ call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_MESSAGE);
+ if (task->save.saved) {
+ task->save.entry = cur;
+ task->save.type = SCRIPT_MESSAGE;
+ return 0;
+ }
+ cur = LIST_NEXT (cur, next);
+ }
+ /* All done */
+ return 1;
+ }
+}
+
+int
+process_filters (struct worker_task *task)
+{
+ struct filter *cur;
+
+ if (task->save.saved) {
+ task->save.saved = 0;
+ return continue_process_filters (task);
+ }
+
+ /* Process filters in order that they are listed in config file */
+ LIST_FOREACH (cur, &task->worker->srv->cfg->header_filters, next) {
+ call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_HEADER);
+ if (task->save.saved) {
+ task->save.entry = cur;
+ task->save.type = SCRIPT_HEADER;
+ return 0;
+ }
+ }
+
+ LIST_FOREACH (cur, &task->worker->srv->cfg->mime_filters, next) {
+ call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_MIME);
+ if (task->save.saved) {
+ task->save.entry = cur;
+ task->save.type = SCRIPT_MIME;
+ return 0;
+ }
+ }
+
+ LIST_FOREACH (cur, &task->worker->srv->cfg->url_filters, next) {
+ call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_URL);
+ if (task->save.saved) {
+ task->save.entry = cur;
+ task->save.type = SCRIPT_URL;
+ return 0;
+ }
+ }
+
+ LIST_FOREACH (cur, &task->worker->srv->cfg->message_filters, next) {
+ call_filter_by_name (task, cur->func_name, cur->type, SCRIPT_MESSAGE);
+ if (task->save.saved) {
+ task->save.entry = cur;
+ task->save.type = SCRIPT_MESSAGE;
+ return 0;
+ }
+ }
+
+ /* Process all metrics */
+ g_hash_table_foreach (task->results, metric_process_callback, task);
+ return 1;
+}
+
+struct composites_data {
+ struct worker_task *task;
+ struct metric_result *metric_res;
+};
+
+static void
+composites_foreach_callback (gpointer key, gpointer value, void *data)
+{
+ struct composites_data *cd = (struct composites_data *)data;
+ struct expression *expr = (struct expression *)value;
+ GQueue *stack;
+ GList *symbols = NULL, *s;
+ gsize cur, op1, op2;
+
+ stack = g_queue_new ();
+
+ while (expr) {
+ if (expr->type == EXPR_OPERAND) {
+ /* Find corresponding symbol */
+ if (g_hash_table_lookup (cd->metric_res->symbols, expr->content.operand) == NULL) {
+ cur = 0;
+ }
+ else {
+ cur = 1;
+ symbols = g_list_append (symbols, expr->content.operand);
+ }
+ g_queue_push_head (stack, GSIZE_TO_POINTER (cur));
+ }
+ else {
+ if (g_queue_is_empty (stack)) {
+ /* Queue has no operands for operation, exiting */
+ g_list_free (symbols);
+ g_queue_free (stack);
+ return;
+ }
+ switch (expr->content.operation) {
+ case '!':
+ op1 = GPOINTER_TO_SIZE (g_queue_pop_head (stack));
+ op1 = !op1;
+ g_queue_push_head (stack, GSIZE_TO_POINTER (op1));
+ break;
+ case '&':
+ op1 = GPOINTER_TO_SIZE (g_queue_pop_head (stack));
+ op2 = GPOINTER_TO_SIZE (g_queue_pop_head (stack));
+ g_queue_push_head (stack, GSIZE_TO_POINTER (op1 && op2));
+ case '|':
+ op1 = GPOINTER_TO_SIZE (g_queue_pop_head (stack));
+ op2 = GPOINTER_TO_SIZE (g_queue_pop_head (stack));
+ g_queue_push_head (stack, GSIZE_TO_POINTER (op1 || op2));
+ default:
+ expr = expr->next;
+ continue;
+ }
+ }
+ expr = expr->next;
+ }
+ if (!g_queue_is_empty (stack)) {
+ op1 = GPOINTER_TO_SIZE (g_queue_pop_head (stack));
+ if (op1) {
+ /* Remove all symbols that are in composite symbol */
+ s = g_list_first (symbols);
+ while (s) {
+ g_hash_table_remove (cd->metric_res->symbols, s->data);
+ s = g_list_next (s);
+ }
+ /* Add new symbol */
+ g_hash_table_insert (cd->metric_res->symbols, key, GSIZE_TO_POINTER (op1));
+ }
+ }
+
+ g_queue_free (stack);
+ g_list_free (symbols);
+
+ return;
+}
+
+static void
+composites_metric_callback (gpointer key, gpointer value, void *data)
+{
+ struct worker_task *task = (struct worker_task *)data;
+ struct composites_data *cd = memory_pool_alloc (task->task_pool, sizeof (struct composites_data));
+ struct metric_result *metric_res = (struct metric_result *)value;
+
+ cd->task = task;
+ cd->metric_res = (struct metric_result *)metric_res;
+
+ g_hash_table_foreach (task->cfg->composite_symbols, composites_foreach_callback, cd);
+}
+
+void make_composites (struct worker_task *task)
+{
+ g_hash_table_foreach (task->results, composites_metric_callback, task);
+}
diff --git a/src/filter.h b/src/filter.h
new file mode 100644
index 000000000..ce8be7ecd
--- /dev/null
+++ b/src/filter.h
@@ -0,0 +1,43 @@
+#ifndef RSPAMD_FILTER_H
+#define RSPAMD_FILTER_H
+
+#include <sys/types.h>
+#ifndef HAVE_OWN_QUEUE_H
+#include <sys/queue.h>
+#else
+#include "queue.h"
+#endif
+#include <glib.h>
+
+struct worker_task;
+
+typedef double (*metric_cons_func)(struct worker_task *task, const char *metric_name);
+typedef void (*filter_func)(struct worker_task *task);
+
+enum filter_type { C_FILTER, PERL_FILTER };
+
+struct filter {
+ char *func_name;
+ enum filter_type type;
+ LIST_ENTRY (filter) next;
+};
+
+struct metric {
+ char *name;
+ char *func_name;
+ metric_cons_func func;
+ double required_score;
+};
+
+struct metric_result {
+ struct metric *metric;
+ double score;
+ GHashTable *symbols;
+};
+
+int process_filters (struct worker_task *task);
+void insert_result (struct worker_task *task, const char *metric_name, const char *symbol, u_char flag);
+void make_composites (struct worker_task *task);
+double factor_consolidation_func (struct worker_task *task, const char *metric_name);
+
+#endif
diff --git a/src/fstring.c b/src/fstring.c
new file mode 100644
index 000000000..ad0be7d78
--- /dev/null
+++ b/src/fstring.c
@@ -0,0 +1,234 @@
+#include <stdlib.h>
+
+#include "fstring.h"
+
+/*
+ * Search first occurence of character in string
+ */
+ssize_t
+fstrchr (f_str_t *src, char c)
+{
+ register ssize_t cur = 0;
+
+ while (cur < src->len) {
+ if (*(src->begin + cur) == c) {
+ return cur;
+ }
+ cur ++;
+ }
+
+ return -1;
+}
+
+/*
+ * Search last occurence of character in string
+ */
+ssize_t
+fstrrchr (f_str_t *src, char c)
+{
+ register ssize_t cur = src->len;
+
+ while (cur > 0) {
+ if (*(src->begin + cur) == c) {
+ return cur;
+ }
+ cur --;
+ }
+
+ return -1;
+}
+
+/*
+ * Search for pattern in orig
+ */
+ssize_t
+fstrstr (f_str_t *orig, f_str_t *pattern)
+{
+ register ssize_t cur = 0, pcur = 0;
+
+ if (pattern->len > orig->len) {
+ return -1;
+ }
+
+ while (cur < orig->len) {
+ if (*(orig->begin + cur) == *pattern->begin) {
+ while (cur < orig->len && pcur < pattern->len) {
+ if (*(orig->begin + cur) != *(pattern->begin + pcur)) {
+ pcur = 0;
+ break;
+ }
+ cur ++;
+ pcur ++;
+ }
+ return cur - pattern->len;
+ }
+ cur ++;
+ }
+
+ return -1;
+
+}
+
+/*
+ * Split string by tokens
+ * word contains parsed word
+ *
+ * Return: -1 - no new words can be extracted
+ * 1 - word was extracted and there are more words
+ * 0 - last word extracted
+ */
+int
+fstrtok (f_str_t *text, const char *sep, f_tok_t *state)
+{
+ register size_t cur;
+ const char *csep = sep;
+
+ if (state->pos >= text->len) {
+ return -1;
+ }
+
+ cur = state->pos;
+
+ while (cur < text->len) {
+ while (*csep) {
+ if (*(text->begin + cur) == *csep) {
+ state->word.begin = (text->begin + state->pos);
+ state->word.len = cur - state->pos;
+ state->pos = cur + 1;
+ return 1;
+ }
+ csep ++;
+ }
+ csep = sep;
+ cur ++;
+ }
+
+ /* Last word */
+ state->word.begin = (text->begin + state->pos);
+ state->word.len = cur - state->pos;
+ state->pos = cur;
+
+ return 0;
+}
+
+/*
+ * Copy one string into other
+ */
+size_t
+fstrcpy (f_str_t *dest, f_str_t *src)
+{
+ register size_t cur = 0;
+
+ if (dest->size < src->len) {
+ return 0;
+ }
+
+ while (cur < src->len && cur < dest->size) {
+ *(dest->begin + cur) = *(src->begin + cur);
+ cur ++;
+ }
+
+ return cur;
+}
+
+/*
+ * Concatenate two strings
+ */
+size_t
+fstrcat (f_str_t *dest, f_str_t *src)
+{
+ register size_t cur = src->len;
+
+ if (dest->size < src->len + dest->len) {
+ return 0;
+ }
+
+ while (cur < src->len && cur < dest->size) {
+ *(dest->begin + cur) = *(src->begin + cur);
+ cur ++;
+ }
+
+ dest->len += src->len;
+
+ return cur;
+
+}
+
+/*
+ * Push one character to fstr
+ */
+int
+fstrpush (f_str_t *dest, char c)
+{
+ if (dest->size < dest->len) {
+ /* Need to reallocate string */
+ return 0;
+ }
+
+ *(dest->begin + dest->len) = c;
+ dest->len ++;
+ return 1;
+}
+
+/*
+ * Allocate memory for f_str_t
+ */
+f_str_t*
+fstralloc (memory_pool_t *pool, size_t len)
+{
+ f_str_t *res = memory_pool_alloc (pool, sizeof (f_str_t));
+
+ if (res == NULL) {
+ return NULL;
+ }
+ res->begin = memory_pool_alloc (pool, len);
+ if (res->begin == NULL) {
+ free (res);
+ return NULL;
+ }
+
+ res->size = len;
+ return res;
+}
+
+/*
+ * Truncate string to its len
+ */
+f_str_t*
+fstrtruncate (memory_pool_t *pool, f_str_t *orig)
+{
+ f_str_t *res;
+
+ if (orig == NULL || orig->len == 0 || orig->size <= orig->len) {
+ return orig;
+ }
+
+ res = fstralloc (pool, orig->len);
+ if (res == NULL) {
+ return NULL;
+ }
+ fstrcpy (res, orig);
+
+ return res;
+}
+
+/*
+ * Enlarge string to new size
+ */
+f_str_t*
+fstrgrow (memory_pool_t *pool, f_str_t *orig, size_t newlen)
+{
+ f_str_t *res;
+
+ if (orig == NULL || orig->len == 0 || orig->size >= newlen) {
+ return orig;
+ }
+
+ res = fstralloc (pool, newlen);
+ if (res == NULL) {
+ return NULL;
+ }
+ fstrcpy (res, orig);
+
+ return res;
+}
diff --git a/src/fstring.h b/src/fstring.h
new file mode 100644
index 000000000..c3087b2c5
--- /dev/null
+++ b/src/fstring.h
@@ -0,0 +1,86 @@
+/*
+ * Functions for handling with fixed size strings
+ */
+
+#ifndef FSTRING_H
+#define FSTRING_H
+
+#include <sys/types.h>
+#include "mem_pool.h"
+
+#define update_buf_size(x) (x)->free = (x)->buf->size - ((x)->pos - (x)->buf->begin); (x)->buf->len = (x)->pos - (x)->buf->begin
+
+typedef struct f_str_s {
+ char *begin;
+ size_t len;
+ size_t size;
+} f_str_t;
+
+typedef struct f_str_buf_s {
+ f_str_t *buf;
+ char *pos;
+ size_t free;
+} f_str_buf_t;
+
+typedef struct f_tok_s {
+ f_str_t word;
+ size_t pos;
+} f_tok_t;
+
+/*
+ * Search first occurence of character in string
+ */
+ssize_t fstrchr (f_str_t *src, char c);
+
+/*
+ * Search last occurence of character in string
+ */
+ssize_t fstrrchr (f_str_t *src, char c);
+
+/*
+ * Search for pattern in orig
+ */
+ssize_t fstrstr (f_str_t *orig, f_str_t *pattern);
+
+/*
+ * Split string by tokens
+ * word contains parsed word
+ */
+int fstrtok (f_str_t *text, const char *sep, f_tok_t *state);
+
+/*
+ * Copy one string into other
+ */
+size_t fstrcpy (f_str_t *dest, f_str_t *src);
+
+/*
+ * Concatenate two strings
+ */
+size_t fstrcat (f_str_t *dest, f_str_t *src);
+
+/*
+ * Push one character to fstr
+ */
+int fstrpush (f_str_t *dest, char c);
+
+/*
+ * Allocate memory for f_str_t
+ */
+f_str_t* fstralloc (memory_pool_t *pool, size_t len);
+
+/*
+ * Truncate string to its len
+ */
+f_str_t* fstrtruncate (memory_pool_t *pool, f_str_t *orig);
+
+/*
+ * Enlarge string to new size
+ */
+f_str_t* fstrgrow (memory_pool_t *pool, f_str_t *orig, size_t newlen);
+
+/*
+ * Return specified character
+ */
+#define fstridx(str, pos) *((str)->begin + (pos))
+
+#endif
diff --git a/src/main.c b/src/main.c
new file mode 100644
index 000000000..d2dec7e47
--- /dev/null
+++ b/src/main.c
@@ -0,0 +1,427 @@
+
+#include <sys/types.h>
+#include <sys/time.h>
+#include <sys/wait.h>
+#include <sys/param.h>
+
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <errno.h>
+#include <signal.h>
+#ifdef HAVE_LIBUTIL_H
+#include <libutil.h>
+#endif
+#include <syslog.h>
+
+#include <EXTERN.h> /* from the Perl distribution */
+#include <perl.h> /* from the Perl distribution */
+
+#include "main.h"
+#include "cfg_file.h"
+#include "util.h"
+
+struct config_file *cfg;
+
+static void sig_handler (int );
+static struct rspamd_worker * fork_worker (struct rspamd_main *, int, int, enum process_type);
+
+sig_atomic_t do_restart;
+sig_atomic_t do_terminate;
+sig_atomic_t child_dead;
+sig_atomic_t child_ready;
+
+extern int yynerrs;
+extern FILE *yyin;
+extern void boot_DynaLoader (pTHX_ CV* cv);
+extern void boot_Socket (pTHX_ CV* cv);
+
+PerlInterpreter *perl_interpreter;
+/* XXX: remove this shit when it would be clear why perl need this line */
+PerlInterpreter *my_perl;
+
+static
+void sig_handler (int signo)
+{
+ switch (signo) {
+ case SIGHUP:
+ do_restart = 1;
+ do_reopen_log = 1;
+ break;
+ case SIGINT:
+ case SIGTERM:
+ do_terminate = 1;
+ break;
+ case SIGCHLD:
+ child_dead = 1;
+ break;
+ case SIGUSR2:
+ child_ready = 1;
+ break;
+ }
+}
+
+void
+xs_init(pTHX)
+{
+ dXSUB_SYS;
+ /* DynaLoader is a special case */
+ newXS ("DynaLoader::boot_DynaLoader", boot_DynaLoader, __FILE__);
+}
+
+static void
+init_filters (struct config_file *cfg)
+{
+ struct perl_module *module;
+
+ LIST_FOREACH (module, &cfg->perl_modules, next) {
+ if (module->path) {
+ require_pv (module->path);
+ }
+ }
+}
+
+static struct rspamd_worker *
+fork_worker (struct rspamd_main *rspamd, int listen_sock, int reconfig, enum process_type type)
+{
+ struct rspamd_worker *cur;
+ char *cfg_file;
+ FILE *f;
+ struct config_file *tmp_cfg;
+ /* Starting worker process */
+ cur = (struct rspamd_worker *)g_malloc (sizeof (struct rspamd_worker));
+ if (cur) {
+ /* Reconfig needed */
+ if (reconfig) {
+ tmp_cfg = (struct config_file *) g_malloc (sizeof (struct config_file));
+ if (tmp_cfg) {
+ bzero (tmp_cfg, sizeof (struct config_file));
+ tmp_cfg->cfg_pool = memory_pool_new (32768);
+ cfg_file = memory_pool_strdup (tmp_cfg->cfg_pool, rspamd->cfg->cfg_name);
+ f = fopen (rspamd->cfg->cfg_name , "r");
+ if (f == NULL) {
+ msg_warn ("fork_worker: cannot open file: %s", rspamd->cfg->cfg_name );
+ }
+ else {
+ yyin = f;
+ yyrestart (yyin);
+
+ if (yyparse() != 0 || yynerrs > 0) {
+ msg_warn ("fork_worker: yyparse: cannot parse config file, %d errors", yynerrs);
+ fclose (f);
+ }
+ else {
+ free_config (rspamd->cfg);
+ g_free (rspamd->cfg);
+ rspamd->cfg = tmp_cfg;
+ rspamd->cfg->cfg_name = cfg_file;
+ }
+ }
+ }
+ }
+ bzero (cur, sizeof (struct rspamd_worker));
+ TAILQ_INSERT_HEAD (&rspamd->workers, cur, next);
+ cur->srv = rspamd;
+ cur->pid = fork();
+ switch (cur->pid) {
+ case 0:
+ /* TODO: add worker code */
+ switch (type) {
+ case TYPE_CONTROLLER:
+ setproctitle ("controller process");
+ pidfile_close (rspamd->pfh);
+ msg_info ("fork_worker: starting controller process %d", getpid ());
+ cur->type = TYPE_CONTROLLER;
+ start_controller (cur);
+ break;
+ case TYPE_WORKER:
+ default:
+ setproctitle ("worker process");
+ pidfile_close (rspamd->pfh);
+ msg_info ("fork_worker: starting worker process %d", getpid ());
+ cur->type = TYPE_WORKER;
+ start_worker (cur, listen_sock);
+ break;
+ }
+ break;
+ case -1:
+ msg_err ("fork_worker: cannot fork main process. %m");
+ pidfile_remove (rspamd->pfh);
+ exit (-errno);
+ break;
+ }
+ }
+
+ return cur;
+}
+
+int
+main (int argc, char **argv)
+{
+ struct rspamd_main *rspamd;
+ struct module_ctx *cur_module = NULL;
+ int res = 0, i, listen_sock;
+ struct sigaction signals;
+ struct rspamd_worker *cur, *cur_tmp, *active_worker;
+ struct sockaddr_un *un_addr;
+ FILE *f;
+ pid_t wrk;
+ char *args[] = { "", "-e", "0", NULL };
+
+ rspamd = (struct rspamd_main *)g_malloc (sizeof (struct rspamd_main));
+ bzero (rspamd, sizeof (struct rspamd_main));
+ rspamd->server_pool = memory_pool_new (memory_pool_get_size ());
+ cfg = (struct config_file *)g_malloc (sizeof (struct config_file));
+ rspamd->cfg = cfg;
+ if (!rspamd || !rspamd->cfg) {
+ fprintf(stderr, "Cannot allocate memory\n");
+ exit(-errno);
+ }
+
+ do_terminate = 0;
+ do_restart = 0;
+ child_dead = 0;
+ child_ready = 0;
+ do_reopen_log = 0;
+ active_worker = NULL;
+
+ rspamd->stat = memory_pool_alloc_shared (rspamd->server_pool, sizeof (struct rspamd_stat));
+ bzero (rspamd->stat, sizeof (struct rspamd_stat));
+
+ bzero (rspamd->cfg, sizeof (struct config_file));
+ rspamd->cfg->cfg_pool = memory_pool_new (memory_pool_get_size ());
+ init_defaults (rspamd->cfg);
+
+ bzero (&signals, sizeof (struct sigaction));
+
+ rspamd->cfg->cfg_name = memory_pool_strdup (rspamd->cfg->cfg_pool, FIXED_CONFIG_FILE);
+ read_cmd_line (argc, argv, rspamd->cfg);
+
+ msg_warn ("(main) starting...");
+
+ #ifndef HAVE_SETPROCTITLE
+ init_title (argc, argv, environ);
+ #endif
+
+ f = fopen (rspamd->cfg->cfg_name , "r");
+ if (f == NULL) {
+ msg_warn ("cannot open file: %s", rspamd->cfg->cfg_name );
+ return EBADF;
+ }
+ yyin = f;
+
+ if (yyparse() != 0 || yynerrs > 0) {
+ msg_warn ("yyparse: cannot parse config file, %d errors", yynerrs);
+ return EBADF;
+ }
+
+ fclose (f);
+ rspamd->cfg->cfg_name = memory_pool_strdup (rspamd->cfg->cfg_pool, rspamd->cfg->cfg_name );
+
+ /* Strictly set temp dir */
+ if (!rspamd->cfg->temp_dir) {
+ msg_warn ("tempdir is not set, trying to use $TMPDIR");
+ rspamd->cfg->temp_dir = memory_pool_strdup (rspamd->cfg->cfg_pool, getenv ("TMPDIR"));
+
+ if (!rspamd->cfg->temp_dir) {
+ msg_warn ("$TMPDIR is empty too, using /tmp as default");
+ rspamd->cfg->temp_dir = memory_pool_strdup (rspamd->cfg->cfg_pool, "/tmp");
+ }
+ }
+
+ switch (cfg->log_type) {
+ case RSPAMD_LOG_CONSOLE:
+ if (!rspamd->cfg->no_fork) {
+ fprintf (stderr, "Cannot log to console while daemonized, disable logging");
+ cfg->log_fd = -1;
+ }
+ else {
+ cfg->log_fd = 2;
+ }
+ g_log_set_default_handler (file_log_function, cfg);
+ break;
+ case RSPAMD_LOG_FILE:
+ if (cfg->log_file == NULL || open_log (cfg) == -1) {
+ fprintf (stderr, "Fatal error, cannot open logfile, exiting");
+ exit (EXIT_FAILURE);
+ }
+ g_log_set_default_handler (file_log_function, cfg);
+ break;
+ case RSPAMD_LOG_SYSLOG:
+ if (open_log (cfg) == -1) {
+ fprintf (stderr, "Fatal error, cannot open syslog facility, exiting");
+ exit (EXIT_FAILURE);
+ }
+ g_log_set_default_handler (syslog_log_function, cfg);
+ break;
+ }
+
+ if (!rspamd->cfg->no_fork && daemon (1, 1) == -1) {
+ fprintf (stderr, "Cannot daemonize\n");
+ exit (-errno);
+ }
+
+ if (write_pid (rspamd) == -1) {
+ msg_err ("main: cannot write pid file %s", rspamd->cfg->pid_file);
+ exit (-errno);
+ }
+
+ /* Init C modules */
+ for (i = 0; i < MODULES_NUM; i ++) {
+ cur_module = memory_pool_alloc (rspamd->cfg->cfg_pool, sizeof (struct module_ctx));
+ if (modules[i].module_init_func(cfg, &cur_module) == 0) {
+ g_hash_table_insert (cfg->c_modules, (gpointer)modules[i].name, cur_module);
+ }
+ }
+
+ rspamd->pid = getpid();
+ rspamd->type = TYPE_MAIN;
+
+ init_signals (&signals, sig_handler);
+ /* Init perl interpreter */
+ PERL_SYS_INIT3 (&argc, &argv, &env);
+ perl_interpreter = perl_alloc ();
+ if (perl_interpreter == NULL) {
+ msg_err ("main: cannot allocate perl interpreter, %m");
+ exit (-errno);
+ }
+
+ my_perl = perl_interpreter;
+ PERL_SET_CONTEXT (perl_interpreter);
+ perl_construct (perl_interpreter);
+ PL_exit_flags |= PERL_EXIT_DESTRUCT_END;
+ perl_parse (perl_interpreter, xs_init, 3, args, NULL);
+ /* Block signals to use sigsuspend in future */
+ sigprocmask(SIG_BLOCK, &signals.sa_mask, NULL);
+
+ if (rspamd->cfg->bind_family == AF_INET) {
+ if ((listen_sock = make_socket (rspamd->cfg->bind_host, rspamd->cfg->bind_port)) == -1) {
+ msg_err ("main: cannot create tcp listen socket. %m");
+ exit(-errno);
+ }
+ }
+ else {
+ un_addr = (struct sockaddr_un *) g_malloc (sizeof (struct sockaddr_un));
+ if (!un_addr || (listen_sock = make_unix_socket (rspamd->cfg->bind_host, un_addr)) == -1) {
+ msg_err ("main: cannot create unix listen socket. %m");
+ exit(-errno);
+ }
+ }
+
+ if (listen (listen_sock, -1) == -1) {
+ msg_err ("main: cannot listen on socket. %m");
+ exit(-errno);
+ }
+
+ TAILQ_INIT (&rspamd->workers);
+
+ setproctitle ("main process");
+
+ for (i = 0; i < cfg->workers_number; i++) {
+ fork_worker (rspamd, listen_sock, 0, TYPE_WORKER);
+ }
+ /* Start controller if enabled */
+ if (cfg->controller_enabled) {
+ fork_worker (rspamd, listen_sock, 0, TYPE_CONTROLLER);
+ }
+
+ /* Signal processing cycle */
+ for (;;) {
+ msg_debug ("main: calling sigsuspend");
+ sigemptyset (&signals.sa_mask);
+ sigsuspend (&signals.sa_mask);
+ if (do_terminate) {
+ msg_debug ("main: catch termination signal, waiting for childs");
+ pass_signal_worker (&rspamd->workers, SIGTERM);
+ break;
+ }
+ if (child_dead) {
+ child_dead = 0;
+ msg_debug ("main: catch SIGCHLD signal, finding terminated worker");
+ /* Remove dead child form childs list */
+ wrk = waitpid (0, &res, 0);
+ TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) {
+ if (wrk == cur->pid) {
+ /* Catch situations if active worker is abnormally terminated */
+ if (cur == active_worker) {
+ active_worker = NULL;
+ }
+ TAILQ_REMOVE(&rspamd->workers, cur, next);
+ if (cur->type == TYPE_CONTROLLER) {
+ msg_info ("main: do not restart dead controller");
+ g_free (cur);
+ break;
+ }
+ if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
+ /* Normal worker termination, do not fork one more */
+ msg_info ("main: worker process %d terminated normally", cur->pid);
+ }
+ else {
+ if (WIFSIGNALED (res)) {
+ msg_warn ("main: worker process %d terminated abnormally by signal: %d",
+ cur->pid, WTERMSIG(res));
+ }
+ else {
+ msg_warn ("main: worker process %d terminated abnormally", cur->pid);
+ }
+ /* Fork another worker in replace of dead one */
+ fork_worker (rspamd, listen_sock, 0, cur->type);
+ }
+ g_free (cur);
+ }
+ }
+ }
+ if (do_restart) {
+ do_restart = 0;
+
+ if (active_worker == NULL) {
+ /* Start new worker that would reread configuration*/
+ active_worker = fork_worker (rspamd, listen_sock, 1, TYPE_WORKER);
+ }
+ /* Do not start new workers untill active worker is not ready for accept */
+ }
+ if (child_ready) {
+ child_ready = 0;
+
+ if (active_worker != NULL) {
+ msg_info ("main: worker process %d has been successfully started", active_worker->pid);
+ TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) {
+ if (cur != active_worker && !cur->is_dying) {
+ /* Send to old workers SIGUSR2 */
+ kill (cur->pid, SIGUSR2);
+ cur->is_dying = 1;
+ }
+ }
+ active_worker = NULL;
+ }
+ }
+ }
+
+ /* Wait for workers termination */
+ while (!TAILQ_EMPTY(&rspamd->workers)) {
+ cur = TAILQ_FIRST(&rspamd->workers);
+ waitpid (cur->pid, &res, 0);
+ msg_debug ("main(cleaning): worker process %d terminated", cur->pid);
+ TAILQ_REMOVE(&rspamd->workers, cur, next);
+ g_free(cur);
+ }
+
+ msg_info ("main: terminating...");
+
+
+ if (rspamd->cfg->bind_family == AF_UNIX) {
+ unlink (rspamd->cfg->bind_host);
+ }
+
+ free_config (rspamd->cfg);
+ g_free (rspamd->cfg);
+ g_free (rspamd);
+
+ return (res);
+}
+
+/*
+ * vi:ts=4
+ */
diff --git a/src/main.h b/src/main.h
new file mode 100644
index 000000000..efb716ab0
--- /dev/null
+++ b/src/main.h
@@ -0,0 +1,201 @@
+#ifndef RPOP_MAIN_H
+#define RPOP_MAIN_H
+
+#include "config.h"
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#ifndef HAVE_OWN_QUEUE_H
+#include <sys/queue.h>
+#else
+#include "queue.h"
+#endif
+#include <sys/time.h>
+
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include <signal.h>
+#include <event.h>
+
+#include "fstring.h"
+#include "mem_pool.h"
+#include "url.h"
+#include "memcached.h"
+#include "protocol.h"
+#include "filter.h"
+
+#include <glib.h>
+#include <gmime/gmime.h>
+
+/* Default values */
+#define FIXED_CONFIG_FILE "./rspamd.conf"
+/* Time in seconds to exit for old worker */
+#define SOFT_SHUTDOWN_TIME 60
+/* Default metric name */
+#define DEFAULT_METRIC "default"
+
+/* Logging in postfix style */
+#define msg_err g_error
+#define msg_warn g_warning
+#define msg_info g_message
+#define msg_debug g_debug
+
+/* Process type: main or worker */
+enum process_type {
+ TYPE_MAIN,
+ TYPE_WORKER,
+ TYPE_CONTROLLER,
+};
+
+/* Filter type */
+enum script_type {
+ SCRIPT_HEADER,
+ SCRIPT_MIME,
+ SCRIPT_URL,
+ SCRIPT_MESSAGE,
+};
+
+/* Logic expression */
+struct expression {
+ enum { EXPR_OPERAND, EXPR_OPERATION } type;
+ union {
+ void *operand;
+ char operation;
+ } content;
+ struct expression *next;
+};
+
+/* Worker process structure */
+struct rspamd_worker {
+ pid_t pid;
+ char is_initialized;
+ char is_dying;
+ TAILQ_ENTRY (rspamd_worker) next;
+ struct rspamd_main *srv;
+ enum process_type type;
+ struct event sig_ev;
+ struct event bind_ev;
+};
+
+struct pidfh;
+struct config_file;
+
+/* Server statistics */
+struct rspamd_stat {
+ unsigned int messages_scanned;
+ unsigned int messages_spam;
+ unsigned int messages_ham;
+ unsigned int connections_count;
+ unsigned int control_connections_count;
+ unsigned int messages_learned;
+};
+
+/* Struct that determine main server object (for logging purposes) */
+struct rspamd_main {
+ struct config_file *cfg;
+ pid_t pid;
+ /* Pid file structure */
+ struct pidfh *pfh;
+ enum process_type type;
+ unsigned int ev_initialized;
+ struct rspamd_stat *stat;
+
+ memory_pool_t *server_pool;
+
+ TAILQ_HEAD (workq, rspamd_worker) workers;
+};
+
+struct mime_part {
+ GMimeContentType *type;
+ GByteArray *content;
+ TAILQ_ENTRY (mime_part) next;
+};
+
+struct save_point {
+ void *entry;
+ enum script_type type;
+ unsigned int saved;
+};
+
+/* Control session */
+struct controller_session {
+ struct rspamd_worker *worker;
+ int sock;
+ /* Access to authorized commands */
+ int authorized;
+ memory_pool_t *session_pool;
+ struct bufferevent *bev;
+ struct config_file *cfg;
+};
+
+/* Worker task structure */
+struct worker_task {
+ struct rspamd_worker *worker;
+ enum {
+ READ_COMMAND,
+ READ_HEADER,
+ READ_MESSAGE,
+ WRITE_REPLY,
+ WRITE_ERROR,
+ WAIT_FILTER,
+ CLOSING_CONNECTION,
+ } state;
+ size_t content_length;
+ enum rspamd_protocol proto;
+ enum rspamd_command cmd;
+ int sock;
+ char *helo;
+ char *from;
+ GList *rcpt;
+ unsigned int nrcpt;
+ struct in_addr from_addr;
+ f_str_buf_t *msg;
+ struct bufferevent *bev;
+ /* Memcached connection for this task */
+ memcached_ctx_t *memc_ctx;
+ unsigned memc_busy:1;
+ /* Number of mime parts */
+ int parts_count;
+ /* Message */
+ GMimeMessage *message;
+ /* All parts of message */
+ TAILQ_HEAD (mime_partq, mime_part) parts;
+ /* URLs extracted from message */
+ TAILQ_HEAD (uriq, uri) urls;
+ /* Hash of metric result structures */
+ GHashTable *results;
+ struct config_file *cfg;
+ /* Save point for filters deferred processing */
+ struct save_point save;
+ /* Saved error message and code */
+ char *last_error;
+ int error_code;
+ /* Memory pool that is associated with this task */
+ memory_pool_t *task_pool;
+};
+
+struct module_ctx {
+ int (*header_filter)(struct worker_task *task);
+ int (*mime_filter)(struct worker_task *task);
+ int (*message_filter)(struct worker_task *task);
+ int (*url_filter)(struct worker_task *task);
+};
+
+struct c_module {
+ const char *name;
+ struct module_ctx *ctx;
+ LIST_ENTRY (c_module) next;
+};
+
+void start_worker (struct rspamd_worker *worker, int listen_sock);
+void start_controller (struct rspamd_worker *worker);
+
+extern sig_atomic_t do_reopen_log;
+
+#endif
+
+/*
+ * vi:ts=4
+ */
diff --git a/src/mem_pool.c b/src/mem_pool.c
new file mode 100644
index 000000000..f116fff73
--- /dev/null
+++ b/src/mem_pool.c
@@ -0,0 +1,360 @@
+#include <sys/types.h>
+#include <glib.h>
+#include <string.h>
+#include <stdlib.h>
+#include <sys/mman.h>
+#include <errno.h>
+#include "config.h"
+
+#ifdef HAVE_SCHED_YIELD
+#include <sched.h>
+#endif
+
+#ifdef HAVE_NANOSLEEP
+#include <time.h>
+#endif
+
+#include "mem_pool.h"
+
+/* Sleep time for spin lock in nanoseconds */
+#define MUTEX_SLEEP_TIME 10000000L
+
+#ifdef _THREAD_SAFE
+pthread_mutex_t stat_mtx = PTHREAD_MUTEX_INITIALIZER;
+#define STAT_LOCK() do { pthread_mutex_lock (&stat_mtx); } while (0)
+#define STAT_UNLOCK() do { pthread_mutex_unlock (&stat_mtx); } while (0)
+#else
+#define STAT_LOCK() do {} while (0)
+#define STAT_UNLOCK() do {} while (0)
+#endif
+
+/*
+ * This define specify whether we should check all pools for free space for new object
+ * or just begin scan from current (recently attached) pool
+ * If MEMORY_GREEDY is defined, then we scan all pools to find free space (more CPU usage, slower
+ * but requires less memory). If it is not defined check only current pool and if object is too large
+ * to place in it allocate new one (this may cause huge CPU usage in some cases too, but generally faster than
+ * greedy method)
+ */
+#undef MEMORY_GREEDY
+
+/* Internal statistic */
+static size_t bytes_allocated = 0;
+static size_t chunks_allocated = 0;
+static size_t chunks_freed = 0;
+static size_t shared_chunks_allocated = 0;
+
+static struct _pool_chain *
+pool_chain_new (size_t size)
+{
+ struct _pool_chain *chain;
+ chain = g_malloc (sizeof (struct _pool_chain));
+ chain->begin = g_malloc (size);
+ chain->len = size;
+ chain->pos = chain->begin;
+ chain->next = NULL;
+ STAT_LOCK ();
+ chunks_allocated ++;
+ STAT_UNLOCK ();
+
+ return chain;
+}
+
+static struct _pool_chain_shared *
+pool_chain_new_shared (size_t size)
+{
+ struct _pool_chain_shared *chain;
+
+#if defined(HAVE_MMAP_ANON)
+ chain = mmap (NULL, size + sizeof (struct _pool_chain_shared), PROT_READ|PROT_WRITE, MAP_ANON|MAP_SHARED, -1, 0);
+ chain->begin = ((u_char *)chain) + sizeof (struct _pool_chain_shared);
+ if (chain == MAP_FAILED) {
+ return NULL;
+ }
+#elif defined(HAVE_MMAP_ZERO)
+ int fd;
+
+ fd = open ("/dev/zero", O_RDWR);
+ if (fd == -1) {
+ return NULL;
+ }
+ chain = mmap (NULL, shm->size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
+ chain->begin = ((u_char *)chain) + sizeof (struct _pool_chain_shared);
+ if (chain == MAP_FAILED) {
+ return NULL;
+ }
+#else
+# error No mmap methods are defined
+#endif
+ chain->len = size;
+ chain->pos = chain->begin;
+ chain->lock = 0;
+ chain->next = NULL;
+ STAT_LOCK ();
+ shared_chunks_allocated ++;
+ STAT_UNLOCK ();
+
+ return chain;
+}
+
+memory_pool_t*
+memory_pool_new (size_t size)
+{
+ memory_pool_t *new;
+
+ new = g_malloc (sizeof (memory_pool_t));
+ new->cur_pool = pool_chain_new (size);
+ new->shared_pool = NULL;
+ new->first_pool = new->cur_pool;
+ new->destructors = NULL;
+
+ return new;
+}
+
+void *
+memory_pool_alloc (memory_pool_t *pool, size_t size)
+{
+ u_char *tmp;
+ struct _pool_chain *new, *cur;
+
+ if (pool) {
+#ifdef MEMORY_GREEDY
+ cur = pool->first_pool;
+#else
+ cur = pool->cur_pool;
+#endif
+ /* Find free space in pool chain */
+ while (memory_pool_free (cur) < size && cur->next) {
+ cur = cur->next;
+ }
+ if (cur->next == NULL && memory_pool_free (cur) < size) {
+ /* Allocate new pool */
+ if (cur->len >= size) {
+ new = pool_chain_new (cur->len);
+ }
+ else {
+ new = pool_chain_new (size + cur->len);
+ }
+ /* Attach new pool to chain */
+ cur->next = new;
+ pool->cur_pool = new;
+ new->pos += size;
+ STAT_LOCK ();
+ bytes_allocated += size;
+ STAT_UNLOCK ();
+ return new->begin;
+ }
+ tmp = cur->pos;
+ cur->pos += size;
+ STAT_LOCK ();
+ bytes_allocated += size;
+ STAT_UNLOCK ();
+ return tmp;
+ }
+ return NULL;
+}
+
+void *
+memory_pool_alloc0 (memory_pool_t *pool, size_t size)
+{
+ void *pointer = memory_pool_alloc (pool, size);
+ if (pointer) {
+ bzero (pointer, size);
+ }
+ return pointer;
+}
+
+char *
+memory_pool_strdup (memory_pool_t *pool, const char *src)
+{
+ size_t len;
+ char *newstr;
+
+ if (src == NULL) {
+ return NULL;
+ }
+
+ len = strlen (src);
+ newstr = memory_pool_alloc (pool, len + 1);
+ memcpy (newstr, src, len + 1);
+ return newstr;
+}
+
+void *
+memory_pool_alloc_shared (memory_pool_t *pool, size_t size)
+{
+ u_char *tmp;
+ struct _pool_chain_shared *new, *cur;
+
+ if (pool) {
+ cur = pool->shared_pool;
+ if (!cur) {
+ cur = pool_chain_new_shared (pool->first_pool->len);
+ pool->shared_pool = cur;
+ }
+
+ /* Find free space in pool chain */
+ while (memory_pool_free (cur) < size && cur->next) {
+ cur = cur->next;
+ }
+ if (cur->next == NULL && memory_pool_free (cur) < size) {
+ /* Allocate new pool */
+ if (cur->len >= size) {
+ new = pool_chain_new_shared (cur->len);
+ }
+ else {
+ new = pool_chain_new_shared (size + cur->len);
+ }
+ /* Attach new pool to chain */
+ cur->next = new;
+ new->pos += size;
+ STAT_LOCK ();
+ bytes_allocated += size;
+ STAT_UNLOCK ();
+ return new->begin;
+ }
+ tmp = cur->pos;
+ cur->pos += size;
+ STAT_LOCK ();
+ bytes_allocated += size;
+ STAT_UNLOCK ();
+ return tmp;
+ }
+ return NULL;
+}
+
+/* Find pool for a pointer, returns NULL if pointer is not in pool */
+static struct _pool_chain_shared *
+memory_pool_find_pool (memory_pool_t *pool, void *pointer)
+{
+ struct _pool_chain_shared *cur = pool->shared_pool;
+
+ while (cur) {
+ if ((u_char *)pointer >= cur->begin && (u_char *)pointer <= (cur->begin + cur->len)) {
+ return cur;
+ }
+ cur = cur->next;
+ }
+
+ return NULL;
+}
+
+static void
+memory_pool_spin (struct _pool_chain_shared *chain)
+{
+ while (!g_atomic_int_compare_and_exchange (&chain->lock, 0, 1)) {
+ /* lock was aqquired */
+#ifdef HAVE_NANOSLEEP
+ struct timespec ts;
+ ts.tv_sec = 0;
+ ts.tv_nsec = MUTEX_SLEEP_TIME;
+ /* Spin */
+ while (nanosleep (&ts, &ts) == -1 && errno == EINTR);
+#endif
+#ifdef HAVE_SCHED_YIELD
+ (void)sched_yield ();
+#endif
+#if !defined(HAVE_NANOSLEEP) && !defined(HAVE_SCHED_YIELD)
+# error No methods to spin are defined
+#endif
+ }
+}
+
+/* Simple implementation of spinlock */
+void
+memory_pool_lock_shared (memory_pool_t *pool, void *pointer)
+{
+ struct _pool_chain_shared *chain;
+
+ chain = memory_pool_find_pool (pool, pointer);
+ if (chain == NULL) {
+ return;
+ }
+
+ memory_pool_spin (chain);
+}
+
+void memory_pool_unlock_shared (memory_pool_t *pool, void *pointer)
+{
+ struct _pool_chain_shared *chain;
+
+ chain = memory_pool_find_pool (pool, pointer);
+ if (chain == NULL) {
+ return;
+ }
+
+ (void)g_atomic_int_dec_and_test (&chain->lock);
+}
+
+void
+memory_pool_add_destructor (memory_pool_t *pool, pool_destruct_func func, void *data)
+{
+ struct _pool_destructors *cur;
+
+ cur = memory_pool_alloc (pool, sizeof (struct _pool_destructors));
+ if (cur) {
+ cur->func = func;
+ cur->data = data;
+ cur->prev = pool->destructors;
+ pool->destructors = cur;
+ }
+}
+
+void
+memory_pool_delete (memory_pool_t *pool)
+{
+ struct _pool_chain *cur = pool->first_pool, *tmp;
+ struct _pool_chain_shared *cur_shared = pool->shared_pool, *tmp_shared;
+ struct _pool_destructors *destructor = pool->destructors;
+
+ /* Call all pool destructors */
+ while (destructor) {
+ destructor->func (destructor->data);
+ destructor = destructor->prev;
+ }
+
+ while (cur) {
+ tmp = cur;
+ cur = cur->next;
+ g_free (tmp->begin);
+ g_free (tmp);
+ STAT_LOCK ();
+ chunks_freed ++;
+ STAT_UNLOCK ();
+ }
+ /* Unmap shared memory */
+ while (cur_shared) {
+ tmp_shared = cur_shared;
+ cur_shared = cur_shared->next;
+ munmap (tmp_shared, tmp_shared->len + sizeof (struct _pool_chain_shared));
+ STAT_LOCK ();
+ chunks_freed ++;
+ STAT_UNLOCK ();
+ }
+
+ g_free (pool);
+}
+
+void
+memory_pool_stat (memory_pool_stat_t *st)
+{
+ st->bytes_allocated = bytes_allocated;
+ st->chunks_allocated = chunks_allocated;
+ st->shared_chunks_allocated = shared_chunks_allocated;
+ st->chunks_freed = chunks_freed;
+}
+
+#define FIXED_POOL_SIZE 4095
+size_t
+memory_pool_get_size ()
+{
+#ifdef HAVE_GETPAGESIZE
+ return getpagesize () - 1;
+#else
+ return FIXED_POOL_SIZE;
+#endif
+}
+
+/*
+ * vi:ts=4
+ */
diff --git a/src/mem_pool.h b/src/mem_pool.h
new file mode 100644
index 000000000..4027b5de2
--- /dev/null
+++ b/src/mem_pool.h
@@ -0,0 +1,61 @@
+#ifndef RSPAMD_MEM_POOL_H
+#define RSPAMD_MEM_POOL_H
+
+#include <sys/types.h>
+#include <glib.h>
+
+typedef void (*pool_destruct_func)(void *ptr);
+
+struct _pool_chain {
+ u_char *begin;
+ u_char *pos;
+ size_t len;
+ struct _pool_chain *next;
+};
+
+struct _pool_chain_shared {
+ u_char *begin;
+ u_char *pos;
+ size_t len;
+ gint lock;
+ struct _pool_chain_shared *next;
+};
+
+struct _pool_destructors {
+ pool_destruct_func func;
+ void *data;
+ struct _pool_destructors *prev;
+};
+
+typedef struct memory_pool_s {
+ struct _pool_chain *cur_pool;
+ struct _pool_chain *first_pool;
+ struct _pool_chain_shared *shared_pool;
+ struct _pool_destructors *destructors;
+} memory_pool_t;
+
+typedef struct memory_pool_stat_s {
+ size_t bytes_allocated;
+ size_t chunks_allocated;
+ size_t shared_chunks_allocated;
+ size_t chunks_freed;
+} memory_pool_stat_t;
+
+memory_pool_t* memory_pool_new (size_t size);
+void* memory_pool_alloc (memory_pool_t* pool, size_t size);
+void* memory_pool_alloc0 (memory_pool_t* pool, size_t size);
+char* memory_pool_strdup (memory_pool_t* pool, const char *src);
+void memory_pool_add_destructor (memory_pool_t *pool, pool_destruct_func func, void *data);
+void* memory_pool_alloc_shared (memory_pool_t *pool, size_t size);
+void memory_pool_lock_shared (memory_pool_t *pool, void *pointer);
+void memory_pool_unlock_shared (memory_pool_t *pool, void *pointer);
+void memory_pool_delete (memory_pool_t* pool);
+
+void memory_pool_stat (memory_pool_stat_t *st);
+
+/* Get optimal pool size based on page size for this system */
+size_t memory_pool_get_size ();
+
+#define memory_pool_free(x) ((x)->len - ((x)->pos - (x)->begin))
+
+#endif
diff --git a/src/memcached-test.c b/src/memcached-test.c
new file mode 100644
index 000000000..c258673bd
--- /dev/null
+++ b/src/memcached-test.c
@@ -0,0 +1,79 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <netdb.h>
+#include <errno.h>
+#include <string.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <event.h>
+
+#include "upstream.h"
+#include "memcached.h"
+
+#define HOST "127.0.0.1"
+#define PORT 11211
+
+memcached_param_t cur_param;
+
+static void
+test_memc_callback (memcached_ctx_t *ctx, memc_error_t error, void *data)
+{
+ int s;
+ int r;
+ int *num = ((int *)data);
+ printf ("result of memc command '%s' is '%s'\n", ctx->cmd, memc_strerror (error));
+ /* Connect */
+ if (*num == 0) {
+ printf ("Setting value to memcached: %s -> %s\n", cur_param.key, (char *)cur_param.buf);
+ s = 1;
+ r = memc_set (ctx, &cur_param, &s, 60);
+ (*num)++;
+ }
+ else if (*num == 1) {
+ printf ("Getting value from memcached: %s -> %s\n", cur_param.key, (char *)cur_param.buf);
+ s = 1;
+ r = memc_get (ctx, &cur_param, &s);
+ (*num)++;
+ }
+ else {
+ printf ("Got value from memcached: %s -> %s\n", cur_param.key, (char *)cur_param.buf);
+ event_loopexit (NULL);
+ }
+}
+
+
+int
+main (int argc, char **argv)
+{
+ memcached_ctx_t mctx;
+ char *addr, buf[512];
+ int num = 0;
+
+ event_init ();
+ strcpy (cur_param.key, "testkey");
+ strcpy (buf, "test_value");
+ cur_param.buf = buf;
+ cur_param.bufsize = sizeof ("test_value") - 1;
+
+ if (argc == 2) {
+ addr = argv[1];
+ }
+ else {
+ addr = HOST;
+ }
+
+ mctx.protocol = TCP_TEXT;
+ mctx.timeout.tv_sec = 1;
+ mctx.timeout.tv_usec = 0;
+ mctx.port = htons (PORT);
+ mctx.options = MEMC_OPT_DEBUG;
+ mctx.callback = test_memc_callback;
+ /* XXX: it is wrong to use local variable pointer here */
+ mctx.callback_data = (void *)&num;
+ inet_aton (addr, &mctx.addr);
+
+ memc_init_ctx (&mctx);
+
+ event_loop (0);
+ return 0;
+}
diff --git a/src/memcached.c b/src/memcached.c
new file mode 100644
index 000000000..05ae16617
--- /dev/null
+++ b/src/memcached.c
@@ -0,0 +1,792 @@
+#ifdef _THREAD_SAFE
+#include <pthread.h>
+#endif
+
+#include <stdarg.h>
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/param.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sysexits.h>
+#include <unistd.h>
+#include <syslog.h>
+
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <sys/socket.h>
+#include <sys/poll.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <sys/uio.h>
+#include <event.h>
+#include <glib.h>
+
+#include "memcached.h"
+
+#define CRLF "\r\n"
+#define END_TRAILER "END" CRLF
+#define STORED_TRAILER "STORED" CRLF
+#define NOT_STORED_TRAILER "NOT STORED" CRLF
+#define EXISTS_TRAILER "EXISTS" CRLF
+#define DELETED_TRAILER "DELETED" CRLF
+#define NOT_FOUND_TRAILER "NOT_FOUND" CRLF
+#define CLIENT_ERROR_TRAILER "CLIENT_ERROR"
+#define SERVER_ERROR_TRAILER "SERVER_ERROR"
+
+#define READ_BUFSIZ 1500
+#define MAX_RETRIES 3
+
+/* Header for udp protocol */
+struct memc_udp_header
+{
+ uint16_t req_id;
+ uint16_t seq_num;
+ uint16_t dg_sent;
+ uint16_t unused;
+};
+
+static void socket_callback (int fd, short what, void *arg);
+static int memc_parse_header (char *buf, size_t *len, char **end);
+
+/*
+ * Write to syslog if OPT_DEBUG is specified
+ */
+static void
+memc_log (const memcached_ctx_t *ctx, int line, const char *fmt, ...)
+{
+ va_list args;
+ if (ctx->options & MEMC_OPT_DEBUG) {
+ va_start (args, fmt);
+ g_log (G_LOG_DOMAIN, G_LOG_LEVEL_DEBUG, "memc_debug(%d): host: %s, port: %d", line, inet_ntoa (ctx->addr), ntohs (ctx->port));
+ g_logv (G_LOG_DOMAIN, G_LOG_LEVEL_DEBUG, fmt, args);
+ va_end (args);
+ }
+}
+
+/*
+ * Callback for write command
+ */
+static void
+write_handler (int fd, short what, memcached_ctx_t *ctx)
+{
+ char read_buf[READ_BUFSIZ];
+ int retries;
+ ssize_t r;
+ struct memc_udp_header header;
+ struct iovec iov[4];
+
+ /* Write something to memcached */
+ if (what == EV_WRITE) {
+ if (ctx->protocol == UDP_TEXT) {
+ /* Send udp header */
+ bzero (&header, sizeof (header));
+ header.dg_sent = htons (1);
+ header.req_id = ctx->count;
+ }
+
+ r = snprintf (read_buf, READ_BUFSIZ, "%s %s 0 %d %zu" CRLF, ctx->cmd, ctx->param->key, ctx->param->expire, ctx->param->bufsize);
+ memc_log (ctx, __LINE__, "memc_write: send write request to memcached: %s", read_buf);
+
+ if (ctx->protocol == UDP_TEXT) {
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof (struct memc_udp_header);
+ if (ctx->param->bufpos == 0) {
+ iov[1].iov_base = read_buf;
+ iov[1].iov_len = r;
+ }
+ else {
+ iov[1].iov_base = NULL;
+ iov[1].iov_len = 0;
+ }
+ iov[2].iov_base = ctx->param->buf + ctx->param->bufpos;
+ iov[2].iov_len = ctx->param->bufsize - ctx->param->bufpos;
+ iov[3].iov_base = CRLF;
+ iov[3].iov_len = sizeof (CRLF) - 1;
+ writev (ctx->sock, iov, 4);
+ }
+ else {
+ iov[0].iov_base = read_buf;
+ iov[0].iov_len = r;
+ iov[1].iov_base = ctx->param->buf + ctx->param->bufpos;
+ iov[1].iov_len = ctx->param->bufsize - ctx->param->bufpos;
+ iov[2].iov_base = CRLF;
+ iov[2].iov_len = sizeof (CRLF) - 1;
+ writev (ctx->sock, iov, 3);
+ }
+ event_del (&ctx->mem_ev);
+ event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
+ }
+ else if (what == EV_READ) {
+ /* Read header */
+ retries = 0;
+ while (ctx->protocol == UDP_TEXT) {
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof (struct memc_udp_header);
+ iov[1].iov_base = read_buf;
+ iov[1].iov_len = READ_BUFSIZ;
+ if ((r = readv (ctx->sock, iov, 2)) == -1) {
+ event_del (&ctx->mem_ev);
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ }
+ if (header.req_id != ctx->count && retries < MAX_RETRIES) {
+ retries ++;
+ /* Not our reply packet */
+ continue;
+ }
+ break;
+ }
+ if (ctx->protocol != UDP_TEXT) {
+ r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
+ }
+ memc_log (ctx, __LINE__, "memc_write: read reply from memcached: %s", read_buf);
+ /* Increment count */
+ ctx->count++;
+ event_del (&ctx->mem_ev);
+ if (strncmp (read_buf, STORED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) {
+ ctx->callback (ctx, OK, ctx->callback_data);
+ }
+ else if (strncmp (read_buf, NOT_STORED_TRAILER, sizeof (NOT_STORED_TRAILER) - 1) == 0) {
+ ctx->callback (ctx, CLIENT_ERROR, ctx->callback_data);
+ }
+ else if (strncmp (read_buf, EXISTS_TRAILER, sizeof (EXISTS_TRAILER) - 1) == 0) {
+ ctx->callback (ctx, EXISTS, ctx->callback_data);
+ }
+ else {
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ }
+ }
+ else if (what == EV_TIMEOUT) {
+ event_del (&ctx->mem_ev);
+ ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
+ }
+}
+
+/*
+ * Callback for read command
+ */
+static void
+read_handler (int fd, short what, memcached_ctx_t *ctx)
+{
+ char read_buf[READ_BUFSIZ];
+ char *p;
+ ssize_t r;
+ size_t datalen;
+ struct memc_udp_header header;
+ struct iovec iov[2];
+ int retries = 0, t;
+
+ if (what == EV_WRITE) {
+ /* Send command to memcached */
+ if (ctx->protocol == UDP_TEXT) {
+ /* Send udp header */
+ bzero (&header, sizeof (header));
+ header.dg_sent = htons (1);
+ header.req_id = ctx->count;
+ }
+
+ r = snprintf (read_buf, READ_BUFSIZ, "%s %s" CRLF, ctx->cmd, ctx->param->key);
+ memc_log (ctx, __LINE__, "memc_read: send read request to memcached: %s", read_buf);
+ if (ctx->protocol == UDP_TEXT) {
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof (struct memc_udp_header);
+ iov[1].iov_base = read_buf;
+ iov[1].iov_len = r;
+ writev (ctx->sock, iov, 2);
+ }
+ else {
+ write (ctx->sock, read_buf, r);
+ }
+ event_del (&ctx->mem_ev);
+ event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
+ }
+ else if (what == EV_READ) {
+ while (ctx->protocol == UDP_TEXT) {
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof (struct memc_udp_header);
+ iov[1].iov_base = read_buf;
+ iov[1].iov_len = READ_BUFSIZ;
+ if ((r = readv (ctx->sock, iov, 2)) == -1) {
+ event_del (&ctx->mem_ev);
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ return;
+ }
+ memc_log (ctx, __LINE__, "memc_read: got read_buf: %s", read_buf);
+ if (header.req_id != ctx->count && retries < MAX_RETRIES) {
+ memc_log (ctx, __LINE__, "memc_read: got wrong packet id: %d, %d was awaited", header.req_id, ctx->count);
+ retries++;
+ /* Not our reply packet */
+ continue;
+ }
+ break;
+ }
+ if (ctx->protocol != UDP_TEXT) {
+ r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
+ }
+
+ if (r > 0) {
+ read_buf[r] = 0;
+ if (ctx->param->bufpos == 0) {
+ t = memc_parse_header (read_buf, &datalen, &p);
+ if (t < 0) {
+ event_del (&ctx->mem_ev);
+ memc_log (ctx, __LINE__, "memc_read: cannot parse memcached reply");
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ return;
+ }
+ else if (t == 0) {
+ memc_log (ctx, __LINE__, "memc_read: record does not exists");
+ event_del (&ctx->mem_ev);
+ ctx->callback (ctx, NOT_EXISTS, ctx->callback_data);
+ return;
+ }
+
+ if (datalen > ctx->param->bufsize) {
+ memc_log (ctx, __LINE__, "memc_read: user's buffer is too small: %zd, %zd required", ctx->param->bufsize, datalen);
+ event_del (&ctx->mem_ev);
+ ctx->callback (ctx, WRONG_LENGTH, ctx->callback_data);
+ return;
+ }
+ /* Check if we already have all data in buffer */
+ if (r >= datalen + sizeof (END_TRAILER) + sizeof (CRLF) - 2) {
+ /* Store all data in param's buffer */
+ memcpy (ctx->param->buf + ctx->param->bufpos, p, datalen);
+ /* Increment count */
+ ctx->count++;
+ event_del (&ctx->mem_ev);
+ ctx->callback (ctx, OK, ctx->callback_data);
+ return;
+ }
+ /* Subtract from sum parsed header's length */
+ r -= p - read_buf;
+ }
+ else {
+ p = read_buf;
+ }
+
+ if (strncmp (ctx->param->buf + ctx->param->bufpos + r - sizeof (END_TRAILER) - sizeof (CRLF) + 2,
+ END_TRAILER, sizeof (END_TRAILER) - 1) == 0) {
+ r -= sizeof (END_TRAILER) - sizeof (CRLF) - 2;
+ memcpy (ctx->param->buf + ctx->param->bufpos, p, r);
+ event_del (&ctx->mem_ev);
+ ctx->callback (ctx, OK, ctx->callback_data);
+ return;
+ }
+ /* Store this part of data in param's buffer */
+ memcpy (ctx->param->buf + ctx->param->bufpos, p, r);
+ ctx->param->bufpos += r;
+ }
+ else {
+ memc_log (ctx, __LINE__, "memc_read: read(v) failed: %d, %m", r);
+ event_del (&ctx->mem_ev);
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ return;
+ }
+
+ ctx->count++;
+ }
+ else if (what == EV_TIMEOUT) {
+ event_del (&ctx->mem_ev);
+ ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
+ }
+
+}
+
+/*
+ * Callback for delete command
+ */
+static void
+delete_handler (int fd, short what, memcached_ctx_t *ctx)
+{
+ char read_buf[READ_BUFSIZ];
+ int retries;
+ ssize_t r;
+ struct memc_udp_header header;
+ struct iovec iov[2];
+
+ /* Write something to memcached */
+ if (what == EV_WRITE) {
+ if (ctx->protocol == UDP_TEXT) {
+ /* Send udp header */
+ bzero (&header, sizeof (header));
+ header.dg_sent = htons (1);
+ header.req_id = ctx->count;
+ }
+ r = snprintf (read_buf, READ_BUFSIZ, "delete %s" CRLF, ctx->param->key);
+ memc_log (ctx, __LINE__, "memc_delete: send delete request to memcached: %s", read_buf);
+
+ if (ctx->protocol == UDP_TEXT) {
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof (struct memc_udp_header);
+ iov[1].iov_base = read_buf;
+ iov[1].iov_len = r;
+ ctx->param->bufpos = writev (ctx->sock, iov, 2);
+ }
+ else {
+ write (ctx->sock, read_buf, r);
+ }
+ event_del (&ctx->mem_ev);
+ event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
+ }
+ else if (what == EV_READ) {
+ /* Read header */
+ retries = 0;
+ while (ctx->protocol == UDP_TEXT) {
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof (struct memc_udp_header);
+ iov[1].iov_base = read_buf;
+ iov[1].iov_len = READ_BUFSIZ;
+ if ((r = readv (ctx->sock, iov, 2)) == -1) {
+ event_del (&ctx->mem_ev);
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ return;
+ }
+ if (header.req_id != ctx->count && retries < MAX_RETRIES) {
+ retries ++;
+ /* Not our reply packet */
+ continue;
+ }
+ break;
+ }
+ if (ctx->protocol != UDP_TEXT) {
+ r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
+ }
+ /* Increment count */
+ ctx->count++;
+ event_del (&ctx->mem_ev);
+ if (strncmp (read_buf, DELETED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) {
+ ctx->callback (ctx, OK, ctx->callback_data);
+ }
+ else if (strncmp (read_buf, NOT_FOUND_TRAILER, sizeof (NOT_FOUND_TRAILER) - 1) == 0) {
+ ctx->callback (ctx, NOT_EXISTS, ctx->callback_data);
+ }
+ else {
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ }
+ }
+ else if (what == EV_TIMEOUT) {
+ event_del (&ctx->mem_ev);
+ ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
+ }
+}
+
+/*
+ * Callback for our socket events
+ */
+static void
+socket_callback (int fd, short what, void *arg)
+{
+ memcached_ctx_t *ctx = (memcached_ctx_t *)arg;
+
+ switch (ctx->op) {
+ case CMD_NULL:
+ /* Do nothing here */
+ break;
+ case CMD_CONNECT:
+ /* We have write readiness after connect call, so reinit event */
+ ctx->cmd = "connect";
+ if (what == EV_WRITE) {
+ event_del (&ctx->mem_ev);
+ event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, NULL);
+ ctx->callback (ctx, OK, ctx->callback_data);
+ ctx->alive = 1;
+ }
+ else {
+ ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
+ ctx->alive = 0;
+ }
+ break;
+ case CMD_WRITE:
+ write_handler (fd, what, ctx);
+ break;
+ case CMD_READ:
+ read_handler (fd, what, ctx);
+ break;
+ case CMD_DELETE:
+ delete_handler (fd, what, ctx);
+ break;
+ }
+}
+
+/*
+ * Common callback function for memcached operations if no user's callback is specified
+ */
+static void
+common_memc_callback (memcached_ctx_t *ctx, memc_error_t error, void *data)
+{
+ memc_log (ctx, __LINE__, "common_memc_callback: result of memc command '%s' is '%s'", ctx->cmd, memc_strerror (error));
+}
+
+/*
+ * Make socket for udp connection
+ */
+static int
+memc_make_udp_sock (memcached_ctx_t *ctx)
+{
+ struct sockaddr_in sc;
+ int ofl;
+
+ bzero (&sc, sizeof (struct sockaddr_in *));
+ sc.sin_family = AF_INET;
+ sc.sin_port = ctx->port;
+ memcpy (&sc.sin_addr, &ctx->addr, sizeof (struct in_addr));
+
+ ctx->sock = socket (PF_INET, SOCK_DGRAM, 0);
+
+ if (ctx->sock == -1) {
+ memc_log (ctx, __LINE__, "memc_make_udp_sock: socket() failed: %m");
+ return -1;
+ }
+
+ /* set nonblocking */
+ ofl = fcntl(ctx->sock, F_GETFL, 0);
+ fcntl(ctx->sock, F_SETFL, ofl | O_NONBLOCK);
+
+ /*
+ * Call connect to set default destination for datagrams
+ * May not block
+ */
+ ctx->op = CMD_CONNECT;
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, NULL);
+ return connect (ctx->sock, (struct sockaddr*)&sc, sizeof (struct sockaddr_in));
+}
+
+/*
+ * Make socket for tcp connection
+ */
+static int
+memc_make_tcp_sock (memcached_ctx_t *ctx)
+{
+ struct sockaddr_in sc;
+ int ofl, r;
+
+ bzero (&sc, sizeof (struct sockaddr_in *));
+ sc.sin_family = AF_INET;
+ sc.sin_port = ctx->port;
+ memcpy (&sc.sin_addr, &ctx->addr, sizeof (struct in_addr));
+
+ ctx->sock = socket (PF_INET, SOCK_STREAM, 0);
+
+ if (ctx->sock == -1) {
+ memc_log (ctx, __LINE__, "memc_make_tcp_sock: socket() failed: %m");
+ return -1;
+ }
+
+ /* set nonblocking */
+ ofl = fcntl(ctx->sock, F_GETFL, 0);
+ fcntl(ctx->sock, F_SETFL, ofl | O_NONBLOCK);
+
+ if ((r = connect (ctx->sock, (struct sockaddr*)&sc, sizeof (struct sockaddr_in))) == -1) {
+ if (errno != EINPROGRESS) {
+ close (ctx->sock);
+ ctx->sock = -1;
+ memc_log (ctx, __LINE__, "memc_make_tcp_sock: connect() failed: %m");
+ return -1;
+ }
+ }
+ ctx->op = CMD_CONNECT;
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
+ return 0;
+}
+
+/*
+ * Parse VALUE reply from server and set len argument to value returned by memcached
+ */
+static int
+memc_parse_header (char *buf, size_t *len, char **end)
+{
+ char *p, *c;
+ int i;
+
+ /* VALUE <key> <flags> <bytes> [<cas unique>]\r\n */
+ c = strstr (buf, CRLF);
+ if (c == NULL) {
+ return -1;
+ }
+ *end = c + sizeof (CRLF) - 1;
+
+ if (strncmp (buf, "VALUE ", sizeof ("VALUE ") - 1) == 0) {
+ p = buf + sizeof ("VALUE ") - 1;
+
+ /* Read bytes value and ignore all other fields, such as flags and key */
+ for (i = 0; i < 2; i++) {
+ while (p++ < c && *p != ' ');
+
+ if (p > c) {
+ return -1;
+ }
+ }
+ *len = strtoul (p, &c, 10);
+ return 1;
+ }
+ /* If value not found memcached return just END\r\n , in this case return 0 */
+ else if (strncmp (buf, END_TRAILER, sizeof (END_TRAILER) - 1) == 0) {
+ return 0;
+ }
+
+ return -1;
+}
+
+
+/*
+ * Common read command handler for memcached
+ */
+memc_error_t
+memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param)
+{
+ ctx->cmd = cmd;
+ ctx->op = CMD_READ;
+ ctx->param = param;
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
+
+ return OK;
+}
+
+/*
+ * Common write command handler for memcached
+ */
+memc_error_t
+memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param, int expire)
+{
+ ctx->cmd = cmd;
+ ctx->op = CMD_WRITE;
+ ctx->param = param;
+ param->expire = expire;
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
+
+ return OK;
+}
+/*
+ * Delete command handler
+ */
+memc_error_t
+memc_delete (memcached_ctx_t *ctx, memcached_param_t *param)
+{
+ ctx->cmd = "delete";
+ ctx->op = CMD_DELETE;
+ ctx->param = param;
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
+
+ return OK;
+}
+
+/*
+ * Write handler for memcached mirroring
+ * writing is done to each memcached server
+ */
+memc_error_t
+memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param, int expire)
+{
+ memc_error_t r, result = OK;
+
+ while (memcached_num --) {
+ if (ctx[memcached_num].alive == 1) {
+ r = memc_write (&ctx[memcached_num], cmd, param, expire);
+ if (r != OK) {
+ memc_log (&ctx[memcached_num], __LINE__, "memc_write_mirror: cannot write to mirror server: %s", memc_strerror (r));
+ result = r;
+ ctx[memcached_num].alive = 0;
+ }
+ }
+ }
+
+ return result;
+}
+
+/*
+ * Read handler for memcached mirroring
+ * reading is done from first active memcached server
+ */
+memc_error_t
+memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param)
+{
+ memc_error_t r, result = OK;
+
+ while (memcached_num --) {
+ if (ctx[memcached_num].alive == 1) {
+ r = memc_read (&ctx[memcached_num], cmd, param);
+ if (r != OK) {
+ result = r;
+ if (r != NOT_EXISTS) {
+ ctx[memcached_num].alive = 0;
+ memc_log (&ctx[memcached_num], __LINE__, "memc_read_mirror: cannot write read from mirror server: %s", memc_strerror (r));
+ }
+ else {
+ memc_log (&ctx[memcached_num], __LINE__, "memc_read_mirror: record not exists", memc_strerror (r));
+ }
+ }
+ else {
+ break;
+ }
+ }
+ }
+
+ return result;
+}
+
+/*
+ * Delete handler for memcached mirroring
+ * deleting is done for each active memcached server
+ */
+memc_error_t
+memc_delete_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param)
+{
+ memc_error_t r, result = OK;
+
+ while (memcached_num --) {
+ if (ctx[memcached_num].alive == 1) {
+ r = memc_delete (&ctx[memcached_num], param);
+ if (r != OK) {
+ result = r;
+ if (r != NOT_EXISTS) {
+ ctx[memcached_num].alive = 0;
+ memc_log (&ctx[memcached_num], __LINE__, "memc_delete_mirror: cannot delete from mirror server: %s", memc_strerror (r));
+ }
+ }
+ }
+ }
+
+ return result;
+}
+
+
+/*
+ * Initialize memcached context for specified protocol
+ */
+int
+memc_init_ctx (memcached_ctx_t *ctx)
+{
+ if (ctx == NULL) {
+ return -1;
+ }
+
+ ctx->count = 0;
+ ctx->alive = 0;
+ ctx->op = CMD_NULL;
+ /* Set default callback */
+ if (ctx->callback == NULL) {
+ ctx->callback = common_memc_callback;
+ }
+
+ switch (ctx->protocol) {
+ case UDP_TEXT:
+ return memc_make_udp_sock (ctx);
+ break;
+ case TCP_TEXT:
+ return memc_make_tcp_sock (ctx);
+ break;
+ /* Not implemented */
+ case UDP_BIN:
+ case TCP_BIN:
+ default:
+ return -1;
+ }
+}
+/*
+ * Mirror init
+ */
+int
+memc_init_ctx_mirror (memcached_ctx_t *ctx, size_t memcached_num)
+{
+ int r, result = -1;
+ while (memcached_num--) {
+ if (ctx[memcached_num].alive == 1) {
+ r = memc_init_ctx (&ctx[memcached_num]);
+ if (r == -1) {
+ ctx[memcached_num].alive = 0;
+ memc_log (&ctx[memcached_num], __LINE__, "memc_init_ctx_mirror: cannot connect to server");
+ }
+ else {
+ result = 1;
+ }
+ }
+ }
+
+ return result;
+}
+
+/*
+ * Close context connection
+ */
+int
+memc_close_ctx (memcached_ctx_t *ctx)
+{
+ if (ctx != NULL && ctx->sock != -1) {
+ event_del (&ctx->mem_ev);
+ return close (ctx->sock);
+ }
+
+ return -1;
+}
+/*
+ * Mirror close
+ */
+int
+memc_close_ctx_mirror (memcached_ctx_t *ctx, size_t memcached_num)
+{
+ int r = 0;
+ while (memcached_num--) {
+ if (ctx[memcached_num].alive == 1) {
+ r = memc_close_ctx (&ctx[memcached_num]);
+ if (r == -1) {
+ memc_log (&ctx[memcached_num], __LINE__, "memc_close_ctx_mirror: cannot close connection to server properly");
+ ctx[memcached_num].alive = 0;
+ }
+ }
+ }
+
+ return r;
+}
+
+
+const char * memc_strerror (memc_error_t err)
+{
+ const char *p;
+
+ switch (err) {
+ case OK:
+ p = "Ok";
+ break;
+ case BAD_COMMAND:
+ p = "Bad command";
+ break;
+ case CLIENT_ERROR:
+ p = "Client error";
+ break;
+ case SERVER_ERROR:
+ p = "Server error";
+ break;
+ case SERVER_TIMEOUT:
+ p = "Server timeout";
+ break;
+ case NOT_EXISTS:
+ p = "Key not found";
+ break;
+ case EXISTS:
+ p = "Key already exists";
+ break;
+ case WRONG_LENGTH:
+ p = "Wrong result length";
+ break;
+ default:
+ p = "Unknown error";
+ break;
+ }
+
+ return p;
+}
+
+/*
+ * vi:ts=4
+ */
diff --git a/src/memcached.h b/src/memcached.h
new file mode 100644
index 000000000..46bcae465
--- /dev/null
+++ b/src/memcached.h
@@ -0,0 +1,142 @@
+#ifndef MEMCACHED_H
+#define MEMCACHED_H
+
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <sys/time.h>
+#include <time.h>
+
+#define MAXKEYLEN 250
+
+#define MEMC_OPT_DEBUG 0x1
+
+struct event;
+
+typedef enum memc_error {
+ OK,
+ BAD_COMMAND,
+ CLIENT_ERROR,
+ SERVER_ERROR,
+ SERVER_TIMEOUT,
+ NOT_EXISTS,
+ EXISTS,
+ WRONG_LENGTH
+} memc_error_t;
+
+/* XXX: Only UDP_TEXT is supported at present */
+typedef enum memc_proto {
+ UDP_TEXT,
+ TCP_TEXT,
+ UDP_BIN,
+ TCP_BIN
+} memc_proto_t;
+
+typedef enum memc_op {
+ CMD_NULL,
+ CMD_CONNECT,
+ CMD_READ,
+ CMD_WRITE,
+ CMD_DELETE,
+} memc_opt_t;
+
+typedef struct memcached_param_s {
+ char key[MAXKEYLEN];
+ u_char *buf;
+ size_t bufsize;
+ size_t bufpos;
+ int expire;
+} memcached_param_t;
+
+
+/* Port must be in network byte order */
+typedef struct memcached_ctx_s {
+ memc_proto_t protocol;
+ struct in_addr addr;
+ uint16_t port;
+ int sock;
+ struct timeval timeout;
+ /* Counter that is used for memcached operations in network byte order */
+ uint16_t count;
+ /* Flag that signalize that this memcached is alive */
+ short alive;
+ /* Options that can be specified for memcached connection */
+ short options;
+ /* Current operation */
+ memc_opt_t op;
+ /* Current command */
+ const char *cmd;
+ /* Current param */
+ memcached_param_t *param;
+ /* Callback for current operation */
+ void (*callback) (struct memcached_ctx_s *ctx, memc_error_t error, void *data);
+ /* Data for callback function */
+ void *callback_data;
+ /* Event structure */
+ struct event mem_ev;
+} memcached_ctx_t;
+
+typedef void (*memcached_callback_t) (memcached_ctx_t *ctx, memc_error_t error, void *data);
+
+/*
+ * Initialize connection to memcached server:
+ * addr, port and timeout fields in ctx must be filled with valid values
+ * Return:
+ * 0 - success
+ * -1 - error (error is stored in errno)
+ */
+int memc_init_ctx (memcached_ctx_t *ctx);
+int memc_init_ctx_mirror (memcached_ctx_t *ctx, size_t memcached_num);
+/*
+ * Memcached function for getting, setting, adding values to memcached server
+ * ctx - valid memcached context
+ * key - key to extract (max 250 characters as it specified in memcached API)
+ * buf, elemsize, nelem - allocated buffer of length nelem structures each of elemsize
+ * that would contain extracted data (NOT NULL TERMINATED)
+ * Return:
+ * memc_error_t
+ * nelem is changed according to actual number of extracted data
+ *
+ * "set" means "store this data".
+ *
+ * "add" means "store this data, but only if the server *doesn't* already
+ * hold data for this key".
+
+ * "replace" means "store this data, but only if the server *does*
+ * already hold data for this key".
+
+ * "append" means "add this data to an existing key after existing data".
+
+ * "prepend" means "add this data to an existing key before existing data".
+ */
+#define memc_get(ctx, param) memc_read(ctx, "get", param)
+#define memc_set(ctx, param, expire) memc_write(ctx, "set", param, expire)
+#define memc_add(ctx, param, expire) memc_write(ctx, "add", param, expire)
+#define memc_replace(ctx, param, expire) memc_write(ctx, "replace", param, expire)
+#define memc_append(ctx, param, expire) memc_write(ctx, "append", param, expire)
+#define memc_prepend(ctx, param, expire) memc_write(ctx, "prepend", param, expire)
+
+/* Functions that works with mirror of memcached servers */
+#define memc_get_mirror(ctx, num, param) memc_read_mirror(ctx, num, "get", param)
+#define memc_set_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "set", param, expire)
+#define memc_add_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "add", param, expire)
+#define memc_replace_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "replace", param, expire)
+#define memc_append_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "append", param, expire)
+#define memc_prepend_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "prepend", param, expire)
+
+
+memc_error_t memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param);
+memc_error_t memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param, int expire);
+memc_error_t memc_delete (memcached_ctx_t *ctx, memcached_param_t *params);
+
+memc_error_t memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param, int expire);
+memc_error_t memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param);
+memc_error_t memc_delete_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param);
+
+/* Return symbolic name of memcached error*/
+const char * memc_strerror (memc_error_t err);
+
+/* Destroy socket from ctx */
+int memc_close_ctx (memcached_ctx_t *ctx);
+int memc_close_ctx_mirror (memcached_ctx_t *ctx, size_t memcached_num);
+
+#endif
diff --git a/src/perl.c b/src/perl.c
new file mode 100644
index 000000000..870283835
--- /dev/null
+++ b/src/perl.c
@@ -0,0 +1,190 @@
+#include <sys/types.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <syslog.h>
+
+#include <glib.h>
+
+#include <EXTERN.h> /* from the Perl distribution */
+#include <perl.h> /* from the Perl distribution */
+
+#include "url.h"
+#include "main.h"
+#include "perl.h"
+
+extern PerlInterpreter *my_perl;
+
+int
+perl_call_header_filter (const char *function, struct worker_task *task)
+{
+ int result;
+ dSP;
+
+ ENTER;
+ SAVETMPS;
+
+ PUSHMARK (SP);
+ XPUSHs (sv_2mortal (newSViv (PTR2IV (task))));
+ PUTBACK;
+
+ call_pv (function, G_SCALAR);
+
+ SPAGAIN;
+
+ result = POPi;
+ msg_debug ("header_filter: call of %s with returned mark %d\n", function, result);
+
+ PUTBACK;
+ FREETMPS;
+ LEAVE;
+
+ return result;
+}
+
+int
+perl_call_mime_filter (const char *function, struct worker_task *task)
+{
+ int result;
+ dSP;
+
+ ENTER;
+ SAVETMPS;
+
+ PUSHMARK (SP);
+ XPUSHs (sv_2mortal (newSViv (PTR2IV (task))));
+ PUTBACK;
+
+ call_pv (function, G_SCALAR);
+
+ SPAGAIN;
+
+ result = POPi;
+ msg_debug ("mime_filter: call of %s returned mark %d\n", function, result);
+
+ PUTBACK;
+ FREETMPS;
+ LEAVE;
+
+ return result;
+}
+
+int
+perl_call_message_filter (const char *function, struct worker_task *task)
+{
+ int result;
+ dSP;
+
+ ENTER;
+ SAVETMPS;
+
+ PUSHMARK (SP);
+ XPUSHs (sv_2mortal (newSViv (PTR2IV (task))));
+ PUTBACK;
+
+ call_pv (function, G_SCALAR);
+
+ SPAGAIN;
+
+ result = POPi;
+ msg_debug ("message_filter: call of %s returned mark %d\n", function, result);
+
+ PUTBACK;
+ FREETMPS;
+ LEAVE;
+
+ return result;
+}
+
+int
+perl_call_url_filter (const char *function, struct worker_task *task)
+{
+ int result;
+ dSP;
+
+ ENTER;
+ SAVETMPS;
+
+ PUSHMARK (SP);
+ XPUSHs (sv_2mortal (newSViv (PTR2IV (task))));
+ PUTBACK;
+
+ call_pv (function, G_SCALAR);
+
+ SPAGAIN;
+
+ result = POPi;
+ msg_debug ("url_filter: call of %s for url returned mark %d\n", function, result);
+
+ PUTBACK;
+ FREETMPS;
+ LEAVE;
+
+ return result;
+}
+
+int
+perl_call_chain_filter (const char *function, struct worker_task *task, int *marks, unsigned int number)
+{
+ int result, i;
+ AV *av;
+
+ dSP;
+
+ ENTER;
+ SAVETMPS;
+ av = newAV();
+ av_extend (av, number);
+ for (i = 0; i < number; i ++) {
+ av_push (av, sv_2mortal (newSViv (marks[i])));
+ }
+ PUSHMARK (SP);
+ XPUSHs (sv_2mortal (newSViv (PTR2IV (task))));
+ XPUSHs (sv_2mortal ((SV *)AvARRAY (av)));
+ PUTBACK;
+
+ call_pv (function, G_SCALAR);
+
+ SPAGAIN;
+
+ result = POPi;
+ msg_debug ("chain_filter: call of %s returned mark %d\n", function, result);
+
+ PUTBACK;
+ FREETMPS;
+ av_undef (av);
+ LEAVE;
+
+
+ return result;
+}
+
+void perl_call_memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data)
+{
+ struct {
+ SV *callback;
+ struct worker_task *task;
+ } *callback_data = data;
+
+ dSP;
+
+ ENTER;
+ SAVETMPS;
+ PUSHMARK (SP);
+ XPUSHs (sv_2mortal (newSViv (PTR2IV (callback_data->task))));
+ XPUSHs (sv_2mortal (newSViv (error)));
+ XPUSHs (sv_2mortal (newSVpv (ctx->param->buf, ctx->param->bufsize)));
+ PUTBACK;
+
+ call_sv (callback_data->callback, G_SCALAR);
+
+ /* Set save point */
+ callback_data->task->save.saved = 0;
+ process_filters (callback_data->task);
+
+ SPAGAIN;
+ FREETMPS;
+ LEAVE;
+
+}
diff --git a/src/perl.h b/src/perl.h
new file mode 100644
index 000000000..9a37634e3
--- /dev/null
+++ b/src/perl.h
@@ -0,0 +1,19 @@
+#ifndef RSPAM_PERL_H
+#define RSPAM_PERL_H
+
+#include <sys/types.h>
+#include <glib.h>
+#include "memcached.h"
+
+struct uri;
+struct worker_task;
+
+int perl_call_header_filter (const char *function, struct worker_task *task);
+int perl_call_mime_filter (const char *function, struct worker_task *task);
+int perl_call_message_filter (const char *function, struct worker_task *task);
+int perl_call_url_filter (const char *function, struct worker_task *task);
+int perl_call_chain_filter (const char *function, struct worker_task *task, int *marks, unsigned int number);
+
+void perl_call_memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data);
+
+#endif
diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c
new file mode 100644
index 000000000..f82543a7e
--- /dev/null
+++ b/src/plugins/regexp.c
@@ -0,0 +1,247 @@
+/***MODULE:regexp
+ * rspamd module that implements different regexp rules
+ */
+
+#include <sys/types.h>
+#include <sys/time.h>
+#include <sys/wait.h>
+#include <sys/param.h>
+
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <syslog.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <evdns.h>
+
+#include "../config.h"
+#include "../main.h"
+#include "../modules.h"
+#include "../cfg_file.h"
+
+struct regexp_module_item {
+ struct expression *expr;
+ int regexp_number;
+ int op_number;
+ char *symbol;
+};
+
+struct regexp_ctx {
+ int (*header_filter)(struct worker_task *task);
+ int (*mime_filter)(struct worker_task *task);
+ int (*message_filter)(struct worker_task *task);
+ int (*url_filter)(struct worker_task *task);
+ GList *items;
+ char *metric;
+
+ memory_pool_t *regexp_pool;
+};
+
+static struct regexp_ctx *regexp_module_ctx = NULL;
+
+static int regexp_common_filter (struct worker_task *task);
+
+int
+regexp_module_init (struct config_file *cfg, struct module_ctx **ctx)
+{
+ regexp_module_ctx = g_malloc (sizeof (struct regexp_ctx));
+
+ regexp_module_ctx->header_filter = regexp_common_filter;
+ regexp_module_ctx->mime_filter = NULL;
+ regexp_module_ctx->message_filter = NULL;
+ regexp_module_ctx->url_filter = NULL;
+ regexp_module_ctx->regexp_pool = memory_pool_new (1024);
+ regexp_module_ctx->items = NULL;
+
+ return 0;
+}
+
+static void
+read_regexp_expression (memory_pool_t *pool, struct regexp_module_item *chain, char *line)
+{
+ struct expression *e, *cur;
+
+ e = parse_expression (regexp_module_ctx->regexp_pool, line);
+ chain->expr = e;
+ cur = e;
+ while (cur) {
+ if (cur->type == EXPR_OPERAND) {
+ cur->content.operand = parse_regexp (pool, cur->content.operand);
+ chain->regexp_number ++;
+ }
+ else {
+ chain->op_number ++;
+ }
+ cur = cur->next;
+ }
+}
+
+int
+regexp_module_config (struct config_file *cfg)
+{
+ LIST_HEAD (moduleoptq, module_opt) *cur_module_opt = NULL;
+ struct module_opt *cur;
+ struct regexp_module_item *cur_item;
+ char *value;
+
+ if ((value = get_module_opt (cfg, "regexp", "metric")) != NULL) {
+ regexp_module_ctx->metric = memory_pool_strdup (regexp_module_ctx->regexp_pool, value);
+ g_free (value);
+ }
+ else {
+ regexp_module_ctx->metric = DEFAULT_METRIC;
+ }
+
+ cur_module_opt = g_hash_table_lookup (cfg->modules_opts, "regexp");
+ if (cur_module_opt != NULL) {
+ LIST_FOREACH (cur, cur_module_opt, next) {
+ if (strcmp (cur->param, "metric") == 0) {
+ continue;
+ }
+ cur_item = memory_pool_alloc0 (regexp_module_ctx->regexp_pool, sizeof (struct regexp_module_item));
+ cur_item->symbol = cur->param;
+ read_regexp_expression (regexp_module_ctx->regexp_pool, cur_item, cur->value);
+ regexp_module_ctx->items = g_list_prepend (regexp_module_ctx->items, cur_item);
+ }
+ }
+
+ return 0;
+}
+
+int
+regexp_module_reconfig (struct config_file *cfg)
+{
+ memory_pool_delete (regexp_module_ctx->regexp_pool);
+ regexp_module_ctx->regexp_pool = memory_pool_new (1024);
+
+ return regexp_module_config (cfg);
+}
+
+static gsize
+process_regexp (struct rspamd_regexp *re, struct worker_task *task)
+{
+ char *headerv;
+ struct mime_part *part;
+ struct uri *url;
+
+ switch (re->type) {
+ case REGEXP_NONE:
+ return 0;
+ case REGEXP_HEADER:
+ if (re->header == NULL) {
+ msg_info ("process_regexp: header regexp without header name");
+ return 0;
+ }
+ msg_debug ("process_regexp: checking header regexp: %s = /%s/", re->header, re->regexp_text);
+ headerv = (char *)g_mime_message_get_header (task->message, re->header);
+ if (headerv == NULL) {
+ return 0;
+ }
+ else {
+ if (re->regexp == NULL) {
+ msg_debug ("process_regexp: regexp contains only header and it is found %s", re->header);
+ return 1;
+ }
+ if (g_regex_match (re->regexp, headerv, 0, NULL) == TRUE) {
+ return 1;
+ }
+ else {
+ return 0;
+ }
+ }
+ break;
+ case REGEXP_MIME:
+ msg_debug ("process_regexp: checking mime regexp: /%s/", re->regexp_text);
+ TAILQ_FOREACH (part, &task->parts, next) {
+ if (g_regex_match_full (re->regexp, part->content->data, part->content->len, 0, 0, NULL, NULL) == TRUE) {
+ return 1;
+ }
+ }
+ return 0;
+ case REGEXP_MESSAGE:
+ msg_debug ("process_message: checking mime regexp: /%s/", re->regexp_text);
+ if (g_regex_match_full (re->regexp, task->msg->buf->begin, task->msg->buf->len, 0, 0, NULL, NULL) == TRUE) {
+ return 1;
+ }
+ return 0;
+ case REGEXP_URL:
+ msg_debug ("process_url: checking mime regexp: /%s/", re->regexp_text);
+ TAILQ_FOREACH (url, &task->urls, next) {
+ if (g_regex_match (re->regexp, struri (url), 0, NULL) == TRUE) {
+ return 1;
+ }
+ }
+ return 0;
+ }
+
+ /* Not reached */
+ return 0;
+}
+
+static void
+process_regexp_item (struct regexp_module_item *item, struct worker_task *task)
+{
+ GQueue *stack;
+ gsize cur, op1, op2;
+ struct expression *it = item->expr;
+
+ stack = g_queue_new ();
+
+ while (it) {
+ if (it->type == EXPR_OPERAND) {
+ /* Find corresponding symbol */
+ cur = process_regexp ((struct rspamd_regexp *)it->content.operand, task);
+ msg_debug ("process_regexp_item: regexp %s found", cur ? "is" : "is not");
+ g_queue_push_head (stack, GSIZE_TO_POINTER (cur));
+ }
+ else {
+ if (g_queue_is_empty (stack)) {
+ /* Queue has no operands for operation, exiting */
+ g_queue_free (stack);
+ return;
+ }
+ switch (it->content.operation) {
+ case '!':
+ op1 = GPOINTER_TO_SIZE (g_queue_pop_head (stack));
+ op1 = !op1;
+ g_queue_push_head (stack, GSIZE_TO_POINTER (op1));
+ break;
+ case '&':
+ op1 = GPOINTER_TO_SIZE (g_queue_pop_head (stack));
+ op2 = GPOINTER_TO_SIZE (g_queue_pop_head (stack));
+ g_queue_push_head (stack, GSIZE_TO_POINTER (op1 && op2));
+ case '|':
+ op1 = GPOINTER_TO_SIZE (g_queue_pop_head (stack));
+ op2 = GPOINTER_TO_SIZE (g_queue_pop_head (stack));
+ g_queue_push_head (stack, GSIZE_TO_POINTER (op1 || op2));
+ default:
+ it = it->next;
+ continue;
+ }
+ }
+ it = it->next;
+ }
+ if (!g_queue_is_empty (stack)) {
+ op1 = GPOINTER_TO_SIZE (g_queue_pop_head (stack));
+ if (op1) {
+ /* Add symbol to results */
+ insert_result (task, regexp_module_ctx->metric, item->symbol, op1);
+ }
+ }
+
+ g_queue_free (stack);
+}
+
+static int
+regexp_common_filter (struct worker_task *task)
+{
+ GList *cur_expr = g_list_first (regexp_module_ctx->items);
+
+ while (cur_expr) {
+ process_regexp_item ((struct regexp_module_item *)cur_expr->data, task);
+ cur_expr = g_list_next (cur_expr);
+ }
+}
diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c
new file mode 100644
index 000000000..c90a8419e
--- /dev/null
+++ b/src/plugins/surbl.c
@@ -0,0 +1,593 @@
+/***MODULE:surbl
+ * rspamd module that implements SURBL url checking
+ */
+
+#include <sys/types.h>
+#include <sys/time.h>
+#include <sys/wait.h>
+#include <sys/param.h>
+
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <syslog.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <evdns.h>
+
+#include "../config.h"
+#include "../main.h"
+#include "../modules.h"
+#include "../cfg_file.h"
+#include "../memcached.h"
+
+#define DEFAULT_REDIRECTOR_PORT 8080
+#define DEFAULT_SURBL_WEIGHT 10
+#define DEFAULT_REDIRECTOR_CONNECT_TIMEOUT 1000
+#define DEFAULT_REDIRECTOR_READ_TIMEOUT 5000
+#define DEFAULT_SURBL_MAX_URLS 1000
+#define DEFAULT_SURBL_URL_EXPIRE 86400
+#define DEFAULT_SURBL_SYMBOL "SURBL_DNS"
+#define DEFAULT_SURBL_SUFFIX "multi.surbl.org"
+
+struct surbl_ctx {
+ int (*header_filter)(struct worker_task *task);
+ int (*mime_filter)(struct worker_task *task);
+ int (*message_filter)(struct worker_task *task);
+ int (*url_filter)(struct worker_task *task);
+ struct in_addr redirector_addr;
+ uint16_t redirector_port;
+ uint16_t weight;
+ unsigned int connect_timeout;
+ unsigned int read_timeout;
+ unsigned int max_urls;
+ unsigned int url_expire;
+ char *suffix;
+ char *symbol;
+ char *metric;
+ GHashTable *hosters;
+ GHashTable *whitelist;
+ unsigned use_redirector;
+ memory_pool_t *surbl_pool;
+};
+
+struct redirector_param {
+ struct uri *url;
+ struct worker_task *task;
+ enum {
+ STATE_CONNECT,
+ STATE_READ,
+ } state;
+ struct event ev;
+ int sock;
+};
+
+struct memcached_param {
+ struct uri *url;
+ struct worker_task *task;
+ memcached_ctx_t *ctx;
+};
+
+static char *hash_fill = "1";
+struct surbl_ctx *surbl_module_ctx;
+GRegex *extract_hoster_regexp, *extract_normal_regexp, *extract_numeric_regexp;
+
+static int surbl_test_url (struct worker_task *task);
+
+int
+surbl_module_init (struct config_file *cfg, struct module_ctx **ctx)
+{
+ GError *err = NULL;
+
+ surbl_module_ctx = g_malloc (sizeof (struct surbl_ctx));
+
+ surbl_module_ctx->header_filter = NULL;
+ surbl_module_ctx->mime_filter = NULL;
+ surbl_module_ctx->message_filter = NULL;
+ surbl_module_ctx->url_filter = surbl_test_url;
+ surbl_module_ctx->use_redirector = 0;
+ surbl_module_ctx->surbl_pool = memory_pool_new (1024);
+
+ surbl_module_ctx->hosters = g_hash_table_new (g_str_hash, g_str_equal);
+ /* Register destructors */
+ memory_pool_add_destructor (surbl_module_ctx->surbl_pool, (pool_destruct_func)g_hash_table_remove_all, surbl_module_ctx->hosters);
+
+ surbl_module_ctx->whitelist = g_hash_table_new (g_str_hash, g_str_equal);
+ /* Register destructors */
+ memory_pool_add_destructor (surbl_module_ctx->surbl_pool, (pool_destruct_func)g_hash_table_remove_all, surbl_module_ctx->whitelist);
+
+ /* Init matching regexps */
+ extract_hoster_regexp = g_regex_new ("([^.]+)\\.([^.]+)\\.([^.]+)$", G_REGEX_RAW, 0, &err);
+ extract_normal_regexp = g_regex_new ("([^.]+)\\.([^.]+)$", G_REGEX_RAW, 0, &err);
+ extract_numeric_regexp = g_regex_new ("(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})$", G_REGEX_RAW, 0, &err);
+
+ *ctx = (struct module_ctx *)surbl_module_ctx;
+
+ return 0;
+}
+
+int
+surbl_module_config (struct config_file *cfg)
+{
+ struct hostent *hent;
+
+ char *value, *cur_tok, *str;
+
+ evdns_init ();
+
+ if ((value = get_module_opt (cfg, "surbl", "redirector")) != NULL) {
+ str = memory_pool_strdup (surbl_module_ctx->surbl_pool, value);
+ cur_tok = strsep (&str, ":");
+ if (!inet_aton (cur_tok, &surbl_module_ctx->redirector_addr)) {
+ /* Try to call gethostbyname */
+ hent = gethostbyname (cur_tok);
+ if (hent != NULL) {
+ memcpy((char *)&surbl_module_ctx->redirector_addr, hent->h_addr, sizeof(struct in_addr));
+ if (str != NULL) {
+ surbl_module_ctx->redirector_port = (uint16_t)strtoul (str, NULL, 10);
+ }
+ else {
+ surbl_module_ctx->redirector_port = DEFAULT_REDIRECTOR_PORT;
+ }
+ surbl_module_ctx->use_redirector = 1;
+ }
+ }
+ /* Free cur_tok as it is actually initial str after strsep */
+ free (cur_tok);
+ }
+ if ((value = get_module_opt (cfg, "surbl", "weight")) != NULL) {
+ surbl_module_ctx->weight = atoi (value);
+ }
+ else {
+ surbl_module_ctx->weight = DEFAULT_SURBL_WEIGHT;
+ }
+ if ((value = get_module_opt (cfg, "surbl", "url_expire")) != NULL) {
+ surbl_module_ctx->url_expire = atoi (value);
+ }
+ else {
+ surbl_module_ctx->url_expire = DEFAULT_SURBL_URL_EXPIRE;
+ }
+ if ((value = get_module_opt (cfg, "surbl", "redirector_connect_timeout")) != NULL) {
+ surbl_module_ctx->connect_timeout = parse_seconds (value);
+ }
+ else {
+ surbl_module_ctx->connect_timeout = DEFAULT_REDIRECTOR_CONNECT_TIMEOUT;
+ }
+ if ((value = get_module_opt (cfg, "surbl", "redirector_read_timeout")) != NULL) {
+ surbl_module_ctx->read_timeout = parse_seconds (value);
+ }
+ else {
+ surbl_module_ctx->read_timeout = DEFAULT_REDIRECTOR_READ_TIMEOUT;
+ }
+ if ((value = get_module_opt (cfg, "surbl", "max_urls")) != NULL) {
+ surbl_module_ctx->max_urls = atoi (value);
+ }
+ else {
+ surbl_module_ctx->max_urls = DEFAULT_SURBL_MAX_URLS;
+ }
+ if ((value = get_module_opt (cfg, "surbl", "suffix")) != NULL) {
+ surbl_module_ctx->suffix = memory_pool_strdup (surbl_module_ctx->surbl_pool, value);
+ g_free (value);
+ }
+ else {
+ surbl_module_ctx->suffix = DEFAULT_SURBL_SUFFIX;
+ }
+ if ((value = get_module_opt (cfg, "surbl", "symbol")) != NULL) {
+ surbl_module_ctx->symbol = memory_pool_strdup (surbl_module_ctx->surbl_pool, value);
+ g_free (value);
+ }
+ else {
+ surbl_module_ctx->symbol = DEFAULT_SURBL_SYMBOL;
+ }
+ if ((value = get_module_opt (cfg, "surbl", "metric")) != NULL) {
+ surbl_module_ctx->metric = memory_pool_strdup (surbl_module_ctx->surbl_pool, value);
+ g_free (value);
+ }
+ else {
+ surbl_module_ctx->metric = DEFAULT_METRIC;
+ }
+ if ((value = get_module_opt (cfg, "surbl", "hostings")) != NULL) {
+ char comment_flag = 0;
+ str = value;
+ while (*value ++) {
+ if (*value == '#') {
+ comment_flag = 1;
+ }
+ if (*value == '\r' || *value == '\n' || *value == ',') {
+ if (!comment_flag && str != value) {
+ g_hash_table_insert (surbl_module_ctx->hosters, g_strstrip(str), hash_fill);
+ str = value + 1;
+ }
+ else if (*value != ',') {
+ comment_flag = 0;
+ str = value + 1;
+ }
+ }
+ }
+ }
+ if ((value = get_module_opt (cfg, "surbl", "whitelist")) != NULL) {
+ char comment_flag = 0;
+ str = value;
+ while (*value ++) {
+ if (*value == '#') {
+ comment_flag = 1;
+ }
+ if (*value == '\r' || *value == '\n' || *value == ',') {
+ if (!comment_flag && str != value) {
+ g_hash_table_insert (surbl_module_ctx->whitelist, g_strstrip(str), hash_fill);
+ str = value + 1;
+ }
+ else if (*value != ',') {
+ comment_flag = 0;
+ str = value + 1;
+ }
+ }
+ }
+ }
+}
+
+int
+surbl_module_reconfig (struct config_file *cfg)
+{
+ memory_pool_delete (surbl_module_ctx->surbl_pool);
+ surbl_module_ctx->surbl_pool = memory_pool_new (1024);
+
+ return surbl_module_config (cfg);
+}
+
+static char *
+format_surbl_request (char *hostname)
+{
+ GMatchInfo *info;
+ char *result;
+
+ result = g_malloc (strlen (hostname) + strlen (surbl_module_ctx->suffix) + 1);
+
+ /* First try to match numeric expression */
+ if (g_regex_match (extract_numeric_regexp, hostname, 0, &info) == TRUE) {
+ gchar *octet1, *octet2, *octet3, *octet4;
+ octet1 = g_match_info_fetch (info, 0);
+ g_match_info_next (info, NULL);
+ octet2 = g_match_info_fetch (info, 0);
+ g_match_info_next (info, NULL);
+ octet3 = g_match_info_fetch (info, 0);
+ g_match_info_next (info, NULL);
+ octet4 = g_match_info_fetch (info, 0);
+ g_match_info_free (info);
+ sprintf (result, "%s.%s.%s.%s.%s", octet4, octet3, octet2, octet1, surbl_module_ctx->suffix);
+ g_free (octet1);
+ g_free (octet2);
+ g_free (octet3);
+ g_free (octet4);
+ return result;
+ }
+ g_match_info_free (info);
+ /* Try to match normal domain */
+ if (g_regex_match (extract_normal_regexp, hostname, 0, &info) == TRUE) {
+ gchar *part1, *part2;
+ part1 = g_match_info_fetch (info, 0);
+ g_match_info_next (info, NULL);
+ part2 = g_match_info_fetch (info, 0);
+ g_match_info_free (info);
+ sprintf (result, "%s.%s", part1, part2);
+ if (g_hash_table_lookup (surbl_module_ctx->hosters, result) != NULL) {
+ /* Match additional part for hosters */
+ g_free (part1);
+ g_free (part2);
+ if (g_regex_match (extract_hoster_regexp, hostname, 0, &info) == TRUE) {
+ gchar *hpart1, *hpart2, *hpart3;
+ hpart1 = g_match_info_fetch (info, 0);
+ g_match_info_next (info, NULL);
+ hpart2 = g_match_info_fetch (info, 0);
+ g_match_info_next (info, NULL);
+ hpart3 = g_match_info_fetch (info, 0);
+ g_match_info_free (info);
+ sprintf (result, "%s.%s.%s.%s", hpart1, hpart2, hpart3, surbl_module_ctx->suffix);
+ g_free (hpart1);
+ g_free (hpart2);
+ g_free (hpart3);
+ return result;
+ }
+ return NULL;
+ }
+ g_free (part1);
+ g_free (part2);
+ return result;
+ }
+
+ return NULL;
+}
+
+static void
+dns_callback (int result, char type, int count, int ttl, void *addresses, void *data)
+{
+ struct memcached_param *param = (struct memcached_param *)data;
+
+ /* If we have result from DNS server, this url exists in SURBL, so increase score */
+ if (result != DNS_ERR_NONE || type != DNS_IPv4_A) {
+ msg_info ("surbl_check: url %s is in surbl %s", param->url->host, surbl_module_ctx->suffix);
+ insert_result (param->task, surbl_module_ctx->metric, surbl_module_ctx->symbol, 1);
+ }
+ else {
+ insert_result (param->task, surbl_module_ctx->metric, surbl_module_ctx->symbol, 0);
+ }
+
+ param->task->save.saved --;
+ if (param->task->save.saved == 0) {
+ /* Call other filters */
+ param->task->save.saved = 1;
+ process_filters (param->task);
+ }
+
+ g_free (param->ctx->param->buf);
+ g_free (param->ctx->param);
+ g_free (param->ctx);
+ g_free (param);
+}
+
+static void
+memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data)
+{
+ struct memcached_param *param = (struct memcached_param *)data;
+ int *url_count;
+ char *surbl_req;
+
+ switch (ctx->op) {
+ case CMD_CONNECT:
+ if (error != OK) {
+ msg_info ("memcached_callback: memcached returned error %s on CONNECT stage");
+ memc_close_ctx (param->ctx);
+ param->task->save.saved --;
+ if (param->task->save.saved == 0) {
+ /* Call other filters */
+ param->task->save.saved = 1;
+ process_filters (param->task);
+ }
+ g_free (param->ctx->param->buf);
+ g_free (param->ctx->param);
+ g_free (param->ctx);
+ g_free (param);
+ }
+ else {
+ memc_get (param->ctx, param->ctx->param);
+ }
+ break;
+ case CMD_READ:
+ if (error != OK) {
+ msg_info ("memcached_callback: memcached returned error %s on READ stage");
+ memc_close_ctx (param->ctx);
+ param->task->save.saved --;
+ if (param->task->save.saved == 0) {
+ /* Call other filters */
+ param->task->save.saved = 1;
+ process_filters (param->task);
+ }
+ g_free (param->ctx->param->buf);
+ g_free (param->ctx->param);
+ g_free (param->ctx);
+ g_free (param);
+ }
+ else {
+ url_count = (int *)param->ctx->param->buf;
+ /* Do not check DNS for urls that have count more than max_urls */
+ if (*url_count > surbl_module_ctx->max_urls) {
+ msg_info ("memcached_callback: url '%s' has count %d, max: %d", struri (param->url), *url_count, surbl_module_ctx->max_urls);
+ insert_result (param->task, surbl_module_ctx->metric, surbl_module_ctx->symbol, 1);
+ }
+ (*url_count) ++;
+ memc_set (param->ctx, param->ctx->param, surbl_module_ctx->url_expire);
+ }
+ break;
+ case CMD_WRITE:
+ if (error != OK) {
+ msg_info ("memcached_callback: memcached returned error %s on WRITE stage");
+ }
+ memc_close_ctx (param->ctx);
+ param->task->save.saved --;
+ if (param->task->save.saved == 0) {
+ /* Call other filters */
+ param->task->save.saved = 1;
+ process_filters (param->task);
+ }
+ if ((surbl_req = format_surbl_request (param->url->host)) != NULL) {
+ param->task->save.saved ++;
+ evdns_resolve_ipv4 (surbl_req, DNS_QUERY_NO_SEARCH, dns_callback, (void *)param);
+ return;
+ }
+ g_free (param->ctx->param->buf);
+ g_free (param->ctx->param);
+ g_free (param->ctx);
+ g_free (param);
+ break;
+ }
+}
+
+static void
+register_memcached_call (struct uri *url, struct worker_task *task)
+{
+ struct memcached_param *param;
+ struct memcached_server *selected;
+ memcached_param_t *cur_param;
+ gchar *sum_str;
+ int *url_count;
+
+ param = g_malloc (sizeof (struct memcached_param));
+ cur_param = g_malloc (sizeof (memcached_param_t));
+ url_count = g_malloc (sizeof (int));
+
+ param->url = url;
+ param->task = task;
+
+ param->ctx = g_malloc (sizeof (memcached_ctx_t));
+ bzero (param->ctx, sizeof (memcached_ctx_t));
+ bzero (cur_param, sizeof (memcached_param_t));
+
+ cur_param->buf = (u_char *)url_count;
+ cur_param->bufsize = sizeof (int);
+
+ sum_str = g_compute_checksum_for_string (G_CHECKSUM_MD5, struri (url), -1);
+ strlcpy (cur_param->key, sum_str, sizeof (cur_param->key));
+ g_free (sum_str);
+
+ selected = (struct memcached_server *) get_upstream_by_hash ((void *)task->cfg->memcached_servers,
+ task->cfg->memcached_servers_num, sizeof (struct memcached_server),
+ time (NULL), task->cfg->memcached_error_time, task->cfg->memcached_dead_time, task->cfg->memcached_maxerrors,
+ cur_param->key, strlen(cur_param->key));
+ param->ctx->callback = memcached_callback;
+ param->ctx->callback_data = (void *)param;
+ param->ctx->protocol = task->cfg->memcached_protocol;
+ memcpy(&param->ctx->addr, &selected->addr, sizeof (struct in_addr));
+ param->ctx->port = selected->port;
+ param->ctx->timeout.tv_sec = task->cfg->memcached_connect_timeout / 1000;
+ param->ctx->timeout.tv_sec = task->cfg->memcached_connect_timeout - param->ctx->timeout.tv_sec * 1000;
+ param->ctx->sock = -1;
+#ifdef WITH_DEBUG
+ param->ctx->options = MEMC_OPT_DEBUG;
+#else
+ param->ctx->options = 0;
+#endif
+ param->ctx->param = cur_param;
+ memc_init_ctx (param->ctx);
+}
+
+static void
+redirector_callback (int fd, short what, void *arg)
+{
+ struct redirector_param *param = (struct redirector_param *)arg;
+ char url_buf[1024];
+ int r;
+ struct timeval timeout;
+ char *p, *c;
+
+ switch (param->state) {
+ case STATE_CONNECT:
+ /* We have write readiness after connect call, so reinit event */
+ if (what == EV_WRITE) {
+ timeout.tv_sec = surbl_module_ctx->connect_timeout / 1000;
+ timeout.tv_usec = surbl_module_ctx->connect_timeout - timeout.tv_sec * 1000;
+ event_del (&param->ev);
+ event_set (&param->ev, param->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, redirector_callback, (void *)param);
+ event_add (&param->ev, &timeout);
+ r = snprintf (url_buf, sizeof (url_buf), "GET %s HTTP/1.0\r\n\r\n", struri (param->url));
+ write (param->sock, url_buf, r);
+ param->state = STATE_READ;
+ }
+ else {
+ event_del (&param->ev);
+ msg_info ("redirector_callback: connection to redirector timed out");
+ param->task->save.saved --;
+ if (param->task->save.saved == 0) {
+ /* Call other filters */
+ param->task->save.saved = 1;
+ process_filters (param->task);
+ }
+ g_free (param);
+ }
+ break;
+ case STATE_READ:
+ if (what == EV_READ) {
+ r = read (param->sock, url_buf, sizeof (url_buf));
+ if ((p = strstr (url_buf, "Uri: ")) != NULL) {
+ p += sizeof ("Uri: ") - 1;
+ c = p;
+ while (p++ < url_buf + sizeof (url_buf) - 1) {
+ if (*p == '\r' || *p == '\n') {
+ *p = '\0';
+ break;
+ }
+ }
+ if (*p == '\0') {
+ msg_info ("redirector_callback: got reply from redirector: '%s' -> '%s'", struri (param->url), c);
+ parse_uri (param->url, c, param->task->task_pool);
+ register_memcached_call (param->url, param->task);
+ param->task->save.saved ++;
+ }
+ }
+ event_del (&param->ev);
+ param->task->save.saved --;
+ if (param->task->save.saved == 0) {
+ /* Call other filters */
+ param->task->save.saved = 1;
+ process_filters (param->task);
+ }
+ g_free (param);
+ }
+ else {
+ event_del (&param->ev);
+ msg_info ("redirector_callback: reading redirector timed out");
+ param->task->save.saved --;
+ if (param->task->save.saved == 0) {
+ /* Call other filters */
+ param->task->save.saved = 1;
+ process_filters (param->task);
+ }
+ g_free (param);
+ }
+ break;
+ }
+}
+
+
+static void
+register_redirector_call (struct uri *url, struct worker_task *task)
+{
+ struct sockaddr_in sc;
+ int ofl, r, s;
+ struct redirector_param *param;
+ struct timeval timeout;
+
+ bzero (&sc, sizeof (struct sockaddr_in *));
+ sc.sin_family = AF_INET;
+ sc.sin_port = surbl_module_ctx->redirector_port;
+ memcpy (&sc.sin_addr, &surbl_module_ctx->redirector_addr, sizeof (struct in_addr));
+
+ s = socket (PF_INET, SOCK_STREAM, 0);
+
+ if (s == -1) {
+ msg_info ("register_redirector_call: socket() failed: %m");
+ return;
+ }
+
+ /* set nonblocking */
+ ofl = fcntl(s, F_GETFL, 0);
+ fcntl(s, F_SETFL, ofl | O_NONBLOCK);
+
+ if ((r = connect (s, (struct sockaddr*)&sc, sizeof (struct sockaddr_in))) == -1) {
+ if (errno != EINPROGRESS) {
+ close (s);
+ msg_info ("register_redirector_call: connect() failed: %m");
+ }
+ }
+ param = g_malloc (sizeof (struct redirector_param));
+ param->url = url;
+ param->task = task;
+ param->state = STATE_READ;
+ param->sock = s;
+ timeout.tv_sec = surbl_module_ctx->connect_timeout / 1000;
+ timeout.tv_usec = surbl_module_ctx->connect_timeout - timeout.tv_sec * 1000;
+ event_set (&param->ev, s, EV_WRITE | EV_TIMEOUT, redirector_callback, (void *)param);
+ event_add (&param->ev, &timeout);
+}
+
+static int
+surbl_test_url (struct worker_task *task)
+{
+ struct uri *url;
+
+ TAILQ_FOREACH (url, &task->urls, next) {
+ if (surbl_module_ctx->use_redirector) {
+ register_redirector_call (url, task);
+ }
+ else {
+ register_memcached_call (url, task);
+ }
+ task->save.saved++;
+ }
+ return 0;
+}
+
+/*
+ * vi:ts=4
+ */
diff --git a/src/protocol.c b/src/protocol.c
new file mode 100644
index 000000000..b259f6cd9
--- /dev/null
+++ b/src/protocol.c
@@ -0,0 +1,492 @@
+#include <sys/types.h>
+#include <string.h>
+#include <stdlib.h>
+#include <glib.h>
+#include "main.h"
+
+#define CRLF "\r\n"
+/* Max line size as it is defined in rfc2822 */
+#define OUTBUFSIZ 1000
+/*
+ * Just check if the passed message is spam or not and reply as
+ * described below
+ */
+#define MSG_CMD_CHECK "check"
+/*
+ * Check if message is spam or not, and return score plus list
+ * of symbols hit
+ */
+#define MSG_CMD_SYMBOLS "symbols"
+/*
+ * Check if message is spam or not, and return score plus report
+ */
+#define MSG_CMD_REPORT "report"
+/*
+ * Check if message is spam or not, and return score plus report
+ * if the message is spam
+ */
+#define MSG_CMD_REPORT_IFSPAM "report_ifspam"
+/*
+ * Ignore this message -- client opened connection then changed
+ */
+#define MSG_CMD_SKIP "skip"
+/*
+ * Return a confirmation that spamd is alive
+ */
+#define MSG_CMD_PING "ping"
+/*
+ * Process this message as described above and return modified message
+ */
+#define MSG_CMD_PROCESS "process"
+
+/*
+ * spamassassin greeting:
+ */
+#define SPAMC_GREETING "SPAMC"
+/*
+ * rspamd greeting:
+ */
+#define RSPAMC_GREETING "RSPAMC"
+/*
+ * Headers
+ */
+#define CONTENT_LENGTH_HEADER "Content-Length"
+#define HELO_HEADER "Helo"
+#define FROM_HEADER "From"
+#define IP_ADDR_HEADER "IP"
+#define NRCPT_HEADER "Recipient-Number"
+#define RCPT_HEADER "Rcpt"
+#define ERROR_HEADER "Error"
+/*
+ * Reply messages
+ */
+#define RSPAMD_REPLY_BANNER "RSPAMD/1.0"
+#define SPAMD_REPLY_BANNER "SPAMD/1.1"
+#define SPAMD_OK "EX_OK"
+/* XXX: try to convert rspamd errors to spamd errors */
+#define SPAMD_ERROR "EX_ERROR"
+
+static int
+parse_command (struct worker_task *task, char *line)
+{
+ char *token;
+
+ token = strsep (&line, " ");
+ if (line == NULL || token == NULL) {
+ msg_debug ("parse_command: bad comand: %s", token);
+ return -1;
+ }
+
+ switch (token[0]) {
+ case 'c':
+ case 'C':
+ /* check */
+ if (strcasecmp (token + 1, MSG_CMD_CHECK + 1) == 0) {
+ task->cmd = CMD_CHECK;
+ }
+ else {
+ msg_debug ("parse_command: bad comand: %s", token);
+ return -1;
+ }
+ break;
+ case 's':
+ case 'S':
+ /* symbols, skip */
+ if (strcasecmp (token + 1, MSG_CMD_SYMBOLS + 1) == 0) {
+ task->cmd = CMD_SYMBOLS;
+ }
+ else if (strcasecmp (token + 1, MSG_CMD_SKIP + 1) == 0) {
+ task->cmd = CMD_SKIP;
+ }
+ else {
+ msg_debug ("parse_command: bad comand: %s", token);
+ return -1;
+ }
+ break;
+ case 'p':
+ case 'P':
+ /* ping, process */
+ if (strcasecmp (token + 1, MSG_CMD_PING + 1) == 0) {
+ task->cmd = CMD_PING;
+ }
+ else if (strcasecmp (token + 1, MSG_CMD_PROCESS + 1) == 0) {
+ task->cmd = CMD_PROCESS;
+ }
+ else {
+ msg_debug ("parse_command: bad comand: %s", token);
+ return -1;
+ }
+ break;
+ case 'r':
+ case 'R':
+ /* report, report_ifspam */
+ if (strcasecmp (token + 1, MSG_CMD_REPORT + 1) == 0) {
+ task->cmd = CMD_REPORT;
+ }
+ else if (strcasecmp (token + 1, MSG_CMD_REPORT_IFSPAM + 1) == 0) {
+ task->cmd = CMD_REPORT_IFSPAM;
+ }
+ else {
+ msg_debug ("parse_command: bad comand: %s", token);
+ return -1;
+ }
+ break;
+ default:
+ msg_debug ("parse_command: bad comand: %s", token);
+ return -1;
+ }
+
+ if (strncasecmp (line, RSPAMC_GREETING, sizeof (RSPAMC_GREETING) - 1) == 0) {
+ task->proto = RSPAMC_PROTO;
+ }
+ else if (strncasecmp (line, SPAMC_GREETING, sizeof (SPAMC_GREETING) -1) == 0) {
+ task->proto = SPAMC_PROTO;
+ }
+ else {
+ msg_debug ("parse_command: bad protocol version: %s", line);
+ return -1;
+ }
+ task->state = READ_HEADER;
+ return 0;
+}
+
+static int
+parse_header (struct worker_task *task, char *line)
+{
+ char *headern, *err, *tmp;
+
+ /* Check end of headers */
+ if (*line == '\0') {
+ if (task->cmd == CMD_PING || task->cmd == CMD_SKIP) {
+ task->state = WRITE_REPLY;
+ }
+ else {
+ if (task->content_length > 0) {
+ task->state = READ_MESSAGE;
+ }
+ else {
+ task->last_error = "Unknown content length";
+ task->error_code = RSPAMD_LENGTH_ERROR;
+ task->state = WRITE_ERROR;
+ }
+ }
+ return 0;
+ }
+
+ headern = strsep (&line, ":");
+
+ if (line == NULL || headern == NULL) {
+ return -1;
+ }
+ /* Eat whitespaces */
+ g_strstrip (line);
+ g_strstrip (headern);
+
+ switch (headern[0]) {
+ case 'c':
+ case 'C':
+ /* content-length */
+ if (strncasecmp (headern, CONTENT_LENGTH_HEADER, sizeof (CONTENT_LENGTH_HEADER) - 1) == 0) {
+ task->content_length = strtoul (line, &err, 10);
+ task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_buf_t));
+ task->msg->buf = fstralloc (task->task_pool, task->content_length);
+ if (task->msg->buf == NULL) {
+ msg_err ("read_socket: cannot allocate memory for message buffer");
+ return -1;
+ }
+ }
+ else {
+ msg_info ("parse_header: wrong header: %s", headern);
+ return -1;
+ }
+ break;
+ case 'h':
+ case 'H':
+ /* helo */
+ if (strncasecmp (headern, HELO_HEADER, sizeof (HELO_HEADER) - 1) == 0) {
+ task->helo = memory_pool_strdup (task->task_pool, line);
+ }
+ else {
+ msg_info ("parse_header: wrong header: %s", headern);
+ return -1;
+ }
+ break;
+ case 'f':
+ case 'F':
+ /* from */
+ if (strncasecmp (headern, FROM_HEADER, sizeof (FROM_HEADER) - 1) == 0) {
+ task->from = memory_pool_strdup (task->task_pool, line);
+ }
+ else {
+ msg_info ("parse_header: wrong header: %s", headern);
+ return -1;
+ }
+ break;
+ case 'r':
+ case 'R':
+ /* rcpt */
+ if (strncasecmp (headern, RCPT_HEADER, sizeof (RCPT_HEADER) - 1) == 0) {
+ tmp = memory_pool_strdup (task->task_pool, line);
+ task->rcpt = g_list_prepend (task->rcpt, tmp);
+ }
+ else {
+ msg_info ("parse_header: wrong header: %s", headern);
+ return -1;
+ }
+ break;
+ case 'n':
+ case 'N':
+ /* nrcpt */
+ if (strncasecmp (headern, NRCPT_HEADER, sizeof (NRCPT_HEADER) - 1) == 0) {
+ task->nrcpt = strtoul (line, &err, 10);
+ }
+ else {
+ msg_info ("parse_header: wrong header: %s", headern);
+ return -1;
+ }
+ break;
+ case 'i':
+ case 'I':
+ /* ip_addr */
+ if (strncasecmp (headern, IP_ADDR_HEADER, sizeof (IP_ADDR_HEADER) - 1) == 0) {
+ if (!inet_aton (line, &task->from_addr)) {
+ msg_info ("parse_header: bad ip header: '%s'", line);
+ return -1;
+ }
+ }
+ else {
+ msg_info ("parse_header: wrong header: %s", headern);
+ return -1;
+ }
+ break;
+ default:
+ msg_info ("parse_header: wrong header: %s", headern);
+ return -1;
+ }
+
+ return 0;
+}
+
+int
+read_rspamd_input_line (struct worker_task *task, char *line)
+{
+ switch (task->state) {
+ case READ_COMMAND:
+ return parse_command (task, line);
+ break;
+ case READ_HEADER:
+ return parse_header (task, line);
+ break;
+ }
+}
+
+static void
+show_url_header (struct worker_task *task)
+{
+ int r = 0;
+ char outbuf[OUTBUFSIZ], c;
+ struct uri *url;
+ f_str_t host;
+
+ r = snprintf (outbuf, sizeof (outbuf), "Urls: ");
+ TAILQ_FOREACH (url, &task->urls, next) {
+ host.begin = url->host;
+ host.len = url->hostlen;
+ /* Skip long hosts to avoid protocol coollisions */
+ if (host.len > OUTBUFSIZ) {
+ continue;
+ }
+ /* Do header folding */
+ if (host.len + r >= OUTBUFSIZ - 3) {
+ outbuf[r ++] = '\r'; outbuf[r ++] = '\n'; outbuf[r] = ' ';
+ bufferevent_write (task->bev, outbuf, r);
+ r = 0;
+ }
+ /* Write url host to buf */
+ if (TAILQ_NEXT (url, next) != NULL) {
+ c = *(host.begin + host.len);
+ *(host.begin + host.len) = '\0';
+ r += snprintf (outbuf, sizeof (outbuf) - r, "%s, ", host.begin);
+ *(host.begin + host.len) = c;
+ }
+ else {
+ c = *(host.begin + host.len);
+ *(host.begin + host.len) = '\0';
+ r += snprintf (outbuf, sizeof (outbuf) - r, "%s" CRLF, host.begin);
+ *(host.begin + host.len) = c;
+ }
+ }
+ bufferevent_write (task->bev, outbuf, r);
+}
+
+static void
+show_metric_result (gpointer metric_name, gpointer metric_value, void *user_data)
+{
+ struct worker_task *task = (struct worker_task *)user_data;
+ int r;
+ char outbuf[OUTBUFSIZ];
+ struct metric_result *metric_res = (struct metric_result *)metric_value;
+ int is_spam = 0;
+
+ if (metric_res->score >= metric_res->metric->required_score) {
+ is_spam = 1;
+ }
+ if (task->proto == SPAMC_PROTO) {
+ r = snprintf (outbuf, sizeof (outbuf), "Spam: %s ; %.2f / %.2f" CRLF,
+ (is_spam) ? "True" : "False", metric_res->score, metric_res->metric->required_score);
+ }
+ else {
+ r = snprintf (outbuf, sizeof (outbuf), "%s: %s ; %.2f / %.2f" CRLF, (char *)metric_name,
+ (is_spam) ? "True" : "False", metric_res->score, metric_res->metric->required_score);
+ }
+ bufferevent_write (task->bev, outbuf, r);
+}
+
+static int
+write_check_reply (struct worker_task *task)
+{
+ int r;
+ char outbuf[OUTBUFSIZ];
+ struct metric_result *metric_res;
+
+ r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER);
+ bufferevent_write (task->bev, outbuf, r);
+ if (task->proto == SPAMC_PROTO) {
+ /* Ignore metrics, just write report for 'default' metric */
+ metric_res = g_hash_table_lookup (task->results, "default");
+ if (metric_res == NULL) {
+ return -1;
+ }
+ else {
+ show_metric_result ((gpointer)"default", (gpointer)metric_res, (void *)task);
+ }
+ }
+ else {
+ /* Write result for each metric separately */
+ g_hash_table_foreach (task->results, show_metric_result, task);
+ /* URL stat */
+ show_url_header (task);
+ }
+ bufferevent_write (task->bev, CRLF, sizeof (CRLF) - 1);
+
+ return 0;
+}
+
+static void
+show_metric_symbols (gpointer metric_name, gpointer metric_value, void *user_data)
+{
+ struct worker_task *task = (struct worker_task *)user_data;
+ int r = 0;
+ char outbuf[OUTBUFSIZ];
+ GList *symbols = NULL, *cur;
+ struct metric_result *metric_res = (struct metric_result *)metric_value;
+
+ if (task->proto == RSPAMC_PROTO) {
+ r = snprintf (outbuf, sizeof (outbuf), "%s: ", (char *)metric_name);
+ }
+
+ symbols = g_hash_table_get_keys (metric_res->symbols);
+ cur = symbols;
+ while (cur) {
+ if (g_list_next (cur) != NULL) {
+ r += snprintf (outbuf + r, sizeof (outbuf) - r, "%s,", (char *)cur->data);
+ }
+ else {
+ r += snprintf (outbuf + r, sizeof (outbuf) - r, "%s", (char *)cur->data);
+ }
+ cur = g_list_next (cur);
+ }
+ g_list_free (symbols);
+ outbuf[r++] = '\r'; outbuf[r] = '\n';
+ bufferevent_write (task->bev, outbuf, r);
+}
+
+static int
+write_symbols_reply (struct worker_task *task)
+{
+ struct metric_result *metric_res;
+
+ /* First of all write normal results by calling write_check_reply */
+ if (write_check_reply (task) == -1) {
+ return -1;
+ }
+ /* Now write symbols */
+ if (task->proto == SPAMC_PROTO) {
+ /* Ignore metrics, just write report for 'default' metric */
+ metric_res = g_hash_table_lookup (task->results, "default");
+ if (metric_res == NULL) {
+ return -1;
+ }
+ else {
+ show_metric_symbols ((gpointer)"default", (gpointer)metric_res, (void *)task);
+ }
+ }
+ else {
+ /* Write result for each metric separately */
+ g_hash_table_foreach (task->results, show_metric_symbols, task);
+ }
+ return 0;
+}
+
+static int
+write_process_reply (struct worker_task *task)
+{
+ int r;
+ char outbuf[OUTBUFSIZ];
+
+ r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF "Content-Length: %zd" CRLF CRLF,
+ (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, task->msg->buf->len);
+ bufferevent_write (task->bev, outbuf, r);
+ bufferevent_write (task->bev, task->msg->buf->begin, task->msg->buf->len);
+
+ return 0;
+}
+
+int
+write_reply (struct worker_task *task)
+{
+ int r;
+ char outbuf[OUTBUFSIZ];
+
+ msg_debug ("write_reply: writing reply to client");
+ if (task->error_code != 0) {
+ /* Write error message and error code to reply */
+ if (task->proto == SPAMC_PROTO) {
+ r = snprintf (outbuf, sizeof (outbuf), "%s %d %s" CRLF CRLF, SPAMD_REPLY_BANNER, task->error_code, SPAMD_ERROR);
+ msg_debug ("write_reply: writing error: %s", outbuf);
+ }
+ else {
+ r = snprintf (outbuf, sizeof (outbuf), "%s %d %s" CRLF "%s: %s" CRLF CRLF, RSPAMD_REPLY_BANNER, task->error_code,
+ SPAMD_ERROR, ERROR_HEADER, task->last_error);
+ msg_debug ("write_reply: writing error: %s", outbuf);
+ }
+ /* Write to bufferevent error message */
+ bufferevent_write (task->bev, outbuf, r);
+ }
+ else {
+ switch (task->cmd) {
+ case CMD_REPORT_IFSPAM:
+ case CMD_REPORT:
+ case CMD_CHECK:
+ return write_check_reply (task);
+ break;
+ case CMD_SYMBOLS:
+ return write_symbols_reply (task);
+ break;
+ case CMD_PROCESS:
+ return write_process_reply (task);
+ break;
+ case CMD_SKIP:
+ r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER,
+ SPAMD_OK);
+ bufferevent_write (task->bev, outbuf, r);
+ break;
+ case CMD_PING:
+ r = snprintf (outbuf, sizeof (outbuf), "%s 0 PONG" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER);
+ bufferevent_write (task->bev, outbuf, r);
+ break;
+ }
+ }
+
+ return 0;
+}
diff --git a/src/protocol.h b/src/protocol.h
new file mode 100644
index 000000000..6c750e91f
--- /dev/null
+++ b/src/protocol.h
@@ -0,0 +1,31 @@
+#ifndef RSPAMD_PROTOCOL_H
+#define RSPAMD_PROTOCOL_H
+
+#include "config.h"
+
+#define RSPAMD_FILTER_ERROR 1
+#define RSPAMD_NETWORK_ERROR 2
+#define RSPAMD_PROTOCOL_ERROR 3
+#define RSPAMD_LENGTH_ERROR 4
+
+struct worker_task;
+
+enum rspamd_protocol {
+ SPAMC_PROTO,
+ RSPAMC_PROTO,
+};
+
+enum rspamd_command {
+ CMD_CHECK,
+ CMD_SYMBOLS,
+ CMD_REPORT,
+ CMD_REPORT_IFSPAM,
+ CMD_SKIP,
+ CMD_PING,
+ CMD_PROCESS,
+};
+
+int read_rspamd_input_line (struct worker_task *task, char *line);
+int write_reply (struct worker_task *task);
+
+#endif
diff --git a/src/upstream.c b/src/upstream.c
new file mode 100644
index 000000000..87aab8535
--- /dev/null
+++ b/src/upstream.c
@@ -0,0 +1,521 @@
+#ifdef _THREAD_SAFE
+#include <pthread.h>
+#endif
+
+#include <sys/types.h>
+#include <time.h>
+#include <stdlib.h>
+#include <stdio.h>
+#ifdef HAVE_STDINT_H
+#include <stdint.h>
+#endif
+#ifdef HAVE_INTTYPES_H
+#include <inttypes.h>
+#endif
+#include <limits.h>
+#ifdef WITH_DEBUG
+#include <syslog.h>
+#endif
+#include "upstream.h"
+
+#ifdef WITH_DEBUG
+#define msg_debug(args...) syslog(LOG_DEBUG, ##args)
+#else
+#define msg_debug(args...) do {} while(0)
+#endif
+
+#ifdef _THREAD_SAFE
+pthread_rwlock_t upstream_mtx = PTHREAD_RWLOCK_INITIALIZER;
+#define U_RLOCK() do { pthread_rwlock_rdlock (&upstream_mtx); } while (0)
+#define U_WLOCK() do { pthread_rwlock_wrlock (&upstream_mtx); } while (0)
+#define U_UNLOCK() do { pthread_rwlock_unlock (&upstream_mtx); } while (0)
+#else
+#define U_RLOCK() do {} while (0)
+#define U_WLOCK() do {} while (0)
+#define U_UNLOCK() do {} while (0)
+#endif
+
+#define MAX_TRIES 20
+
+/*
+ * Poly: 0xedb88320
+ * Init: 0x0
+ */
+
+static const uint32_t crc32lookup[256] = {
+ 0x00000000U, 0x77073096U, 0xee0e612cU, 0x990951baU, 0x076dc419U, 0x706af48fU,
+ 0xe963a535U, 0x9e6495a3U, 0x0edb8832U, 0x79dcb8a4U, 0xe0d5e91eU, 0x97d2d988U,
+ 0x09b64c2bU, 0x7eb17cbdU, 0xe7b82d07U, 0x90bf1d91U, 0x1db71064U, 0x6ab020f2U,
+ 0xf3b97148U, 0x84be41deU, 0x1adad47dU, 0x6ddde4ebU, 0xf4d4b551U, 0x83d385c7U,
+ 0x136c9856U, 0x646ba8c0U, 0xfd62f97aU, 0x8a65c9ecU, 0x14015c4fU, 0x63066cd9U,
+ 0xfa0f3d63U, 0x8d080df5U, 0x3b6e20c8U, 0x4c69105eU, 0xd56041e4U, 0xa2677172U,
+ 0x3c03e4d1U, 0x4b04d447U, 0xd20d85fdU, 0xa50ab56bU, 0x35b5a8faU, 0x42b2986cU,
+ 0xdbbbc9d6U, 0xacbcf940U, 0x32d86ce3U, 0x45df5c75U, 0xdcd60dcfU, 0xabd13d59U,
+ 0x26d930acU, 0x51de003aU, 0xc8d75180U, 0xbfd06116U, 0x21b4f4b5U, 0x56b3c423U,
+ 0xcfba9599U, 0xb8bda50fU, 0x2802b89eU, 0x5f058808U, 0xc60cd9b2U, 0xb10be924U,
+ 0x2f6f7c87U, 0x58684c11U, 0xc1611dabU, 0xb6662d3dU, 0x76dc4190U, 0x01db7106U,
+ 0x98d220bcU, 0xefd5102aU, 0x71b18589U, 0x06b6b51fU, 0x9fbfe4a5U, 0xe8b8d433U,
+ 0x7807c9a2U, 0x0f00f934U, 0x9609a88eU, 0xe10e9818U, 0x7f6a0dbbU, 0x086d3d2dU,
+ 0x91646c97U, 0xe6635c01U, 0x6b6b51f4U, 0x1c6c6162U, 0x856530d8U, 0xf262004eU,
+ 0x6c0695edU, 0x1b01a57bU, 0x8208f4c1U, 0xf50fc457U, 0x65b0d9c6U, 0x12b7e950U,
+ 0x8bbeb8eaU, 0xfcb9887cU, 0x62dd1ddfU, 0x15da2d49U, 0x8cd37cf3U, 0xfbd44c65U,
+ 0x4db26158U, 0x3ab551ceU, 0xa3bc0074U, 0xd4bb30e2U, 0x4adfa541U, 0x3dd895d7U,
+ 0xa4d1c46dU, 0xd3d6f4fbU, 0x4369e96aU, 0x346ed9fcU, 0xad678846U, 0xda60b8d0U,
+ 0x44042d73U, 0x33031de5U, 0xaa0a4c5fU, 0xdd0d7cc9U, 0x5005713cU, 0x270241aaU,
+ 0xbe0b1010U, 0xc90c2086U, 0x5768b525U, 0x206f85b3U, 0xb966d409U, 0xce61e49fU,
+ 0x5edef90eU, 0x29d9c998U, 0xb0d09822U, 0xc7d7a8b4U, 0x59b33d17U, 0x2eb40d81U,
+ 0xb7bd5c3bU, 0xc0ba6cadU, 0xedb88320U, 0x9abfb3b6U, 0x03b6e20cU, 0x74b1d29aU,
+ 0xead54739U, 0x9dd277afU, 0x04db2615U, 0x73dc1683U, 0xe3630b12U, 0x94643b84U,
+ 0x0d6d6a3eU, 0x7a6a5aa8U, 0xe40ecf0bU, 0x9309ff9dU, 0x0a00ae27U, 0x7d079eb1U,
+ 0xf00f9344U, 0x8708a3d2U, 0x1e01f268U, 0x6906c2feU, 0xf762575dU, 0x806567cbU,
+ 0x196c3671U, 0x6e6b06e7U, 0xfed41b76U, 0x89d32be0U, 0x10da7a5aU, 0x67dd4accU,
+ 0xf9b9df6fU, 0x8ebeeff9U, 0x17b7be43U, 0x60b08ed5U, 0xd6d6a3e8U, 0xa1d1937eU,
+ 0x38d8c2c4U, 0x4fdff252U, 0xd1bb67f1U, 0xa6bc5767U, 0x3fb506ddU, 0x48b2364bU,
+ 0xd80d2bdaU, 0xaf0a1b4cU, 0x36034af6U, 0x41047a60U, 0xdf60efc3U, 0xa867df55U,
+ 0x316e8eefU, 0x4669be79U, 0xcb61b38cU, 0xbc66831aU, 0x256fd2a0U, 0x5268e236U,
+ 0xcc0c7795U, 0xbb0b4703U, 0x220216b9U, 0x5505262fU, 0xc5ba3bbeU, 0xb2bd0b28U,
+ 0x2bb45a92U, 0x5cb36a04U, 0xc2d7ffa7U, 0xb5d0cf31U, 0x2cd99e8bU, 0x5bdeae1dU,
+ 0x9b64c2b0U, 0xec63f226U, 0x756aa39cU, 0x026d930aU, 0x9c0906a9U, 0xeb0e363fU,
+ 0x72076785U, 0x05005713U, 0x95bf4a82U, 0xe2b87a14U, 0x7bb12baeU, 0x0cb61b38U,
+ 0x92d28e9bU, 0xe5d5be0dU, 0x7cdcefb7U, 0x0bdbdf21U, 0x86d3d2d4U, 0xf1d4e242U,
+ 0x68ddb3f8U, 0x1fda836eU, 0x81be16cdU, 0xf6b9265bU, 0x6fb077e1U, 0x18b74777U,
+ 0x88085ae6U, 0xff0f6a70U, 0x66063bcaU, 0x11010b5cU, 0x8f659effU, 0xf862ae69U,
+ 0x616bffd3U, 0x166ccf45U, 0xa00ae278U, 0xd70dd2eeU, 0x4e048354U, 0x3903b3c2U,
+ 0xa7672661U, 0xd06016f7U, 0x4969474dU, 0x3e6e77dbU, 0xaed16a4aU, 0xd9d65adcU,
+ 0x40df0b66U, 0x37d83bf0U, 0xa9bcae53U, 0xdebb9ec5U, 0x47b2cf7fU, 0x30b5ffe9U,
+ 0xbdbdf21cU, 0xcabac28aU, 0x53b39330U, 0x24b4a3a6U, 0xbad03605U, 0xcdd70693U,
+ 0x54de5729U, 0x23d967bfU, 0xb3667a2eU, 0xc4614ab8U, 0x5d681b02U, 0x2a6f2b94U,
+ 0xb40bbe37U, 0xc30c8ea1U, 0x5a05df1bU, 0x2d02ef8dU
+};
+
+/*
+ * Check upstream parameters and mark it whether valid or dead
+ */
+static void
+check_upstream (struct upstream *up, time_t now, time_t error_timeout, time_t revive_timeout, size_t max_errors)
+{
+ if (up->dead) {
+ if (now - up->time >= revive_timeout) {
+ msg_debug ("check_upstream: reviving upstream after %ld seconds", (long int) now - up->time);
+ U_WLOCK ();
+ up->dead = 0;
+ up->errors = 0;
+ up->time = 0;
+ up->weight = up->priority;
+ U_UNLOCK ();
+ }
+ }
+ else {
+ if (now - up->time >= error_timeout && up->errors >= max_errors) {
+ msg_debug ("check_upstream: marking upstreams as dead after %ld errors", (long int) up->errors);
+ U_WLOCK ();
+ up->dead = 1;
+ up->time = now;
+ up->weight = 0;
+ U_UNLOCK ();
+ }
+ }
+}
+
+/*
+ * Call this function after failed upstream request
+ */
+void
+upstream_fail (struct upstream *up, time_t now)
+{
+ if (up->time != 0) {
+ up->errors ++;
+ }
+ else {
+ U_WLOCK ();
+ up->time = now;
+ up->errors ++;
+ U_UNLOCK ();
+ }
+}
+/*
+ * Call this function after successfull upstream request
+ */
+void
+upstream_ok (struct upstream *up, time_t now)
+{
+ if (up->errors != 0) {
+ U_WLOCK ();
+ up->errors = 0;
+ up->time = 0;
+ U_UNLOCK ();
+ }
+
+ up->weight --;
+}
+/*
+ * Mark all upstreams as active. This function is used when all upstreams are marked as inactive
+ */
+void
+revive_all_upstreams (void *ups, size_t members, size_t msize)
+{
+ int i;
+ struct upstream *cur;
+ u_char *p;
+
+ U_WLOCK ();
+ msg_debug ("revive_all_upstreams: starting reviving all upstreams");
+ p = ups;
+ for (i = 0; i < members; i++) {
+ cur = (struct upstream *)p;
+ cur->time = 0;
+ cur->errors = 0;
+ cur->dead = 0;
+ cur->weight = cur->priority;
+ p += msize;
+ }
+ U_UNLOCK ();
+}
+
+/*
+ * Scan all upstreams for errors and mark upstreams dead or alive depends on conditions,
+ * return number of alive upstreams
+ */
+static int
+rescan_upstreams (void *ups, size_t members, size_t msize, time_t now, time_t error_timeout, time_t revive_timeout, size_t max_errors)
+{
+ int i, alive;
+ struct upstream *cur;
+ u_char *p;
+
+ /* Recheck all upstreams */
+ p = ups;
+ alive = members;
+ for (i = 0; i < members; i++) {
+ cur = (struct upstream *)p;
+ check_upstream (cur, now, error_timeout, revive_timeout, max_errors);
+ alive -= cur->dead;
+ p += msize;
+ }
+
+ /* All upstreams are dead */
+ if (alive == 0) {
+ revive_all_upstreams (ups, members, msize);
+ alive = members;
+ }
+
+ msg_debug ("rescan_upstreams: %d upstreams alive", alive);
+
+ return alive;
+
+}
+
+/* Return alive upstream by its number */
+static struct upstream *
+get_upstream_by_number (void *ups, size_t members, size_t msize, int selected)
+{
+ int i;
+ u_char *p, *c;
+ struct upstream *cur;
+
+ i = 0;
+ p = ups;
+ c = ups;
+ U_RLOCK ();
+ for (;;) {
+ /* Out of range, return NULL */
+ if (p > c + members * msize) {
+ break;
+ }
+
+ cur = (struct upstream *)p;
+ p += msize;
+
+ if (cur->dead) {
+ /* Skip inactive upstreams */
+ continue;
+ }
+ /* Return selected upstream */
+ if (i == selected) {
+ U_UNLOCK ();
+ return cur;
+ }
+ i++;
+ }
+ U_UNLOCK ();
+
+ /* Error */
+ return NULL;
+
+}
+
+/*
+ * Get hash key for specified key (perl hash)
+ */
+static uint32_t
+get_hash_for_key (uint32_t hash, char *key, size_t keylen)
+{
+ uint32_t h, index;
+ const char *end = key + keylen;
+
+ h = ~hash;
+
+ while (key < end) {
+ index = (h ^ (u_char) *key) & 0x000000ffU;
+ h = (h >> 8) ^ crc32lookup[index];
+ ++key;
+ }
+
+ return (~h);
+}
+
+/*
+ * Recheck all upstreams and return random active upstream
+ */
+struct upstream *
+get_random_upstream (void *ups, size_t members, size_t msize, time_t now, time_t error_timeout, time_t revive_timeout, size_t max_errors)
+{
+ int alive, selected;
+
+ alive = rescan_upstreams (ups, members, msize, now, error_timeout, revive_timeout, max_errors);
+ selected = rand () % alive;
+ msg_debug ("get_random_upstream: return upstream with number %d of %d", selected, alive);
+
+ return get_upstream_by_number (ups, members, msize, selected);
+}
+
+/*
+ * Return upstream by hash, that is calculated from active upstreams number
+ */
+struct upstream *
+get_upstream_by_hash (void *ups, size_t members, size_t msize, time_t now,
+ time_t error_timeout, time_t revive_timeout, size_t max_errors,
+ char *key, size_t keylen)
+{
+ int alive, tries = 0, r;
+ uint32_t h = 0, ht;
+ char *p, numbuf[4];
+ struct upstream *cur;
+
+ alive = rescan_upstreams (ups, members, msize, now, error_timeout, revive_timeout, max_errors);
+
+ if (alive == 0) {
+ return NULL;
+ }
+
+ h = get_hash_for_key (0, key, keylen);
+#ifdef HASH_COMPAT
+ h = (h >> 16) & 0x7fff;
+#endif
+ h %= members;
+ msg_debug ("get_upstream_by_hash: try to select upstream number %d of %zd", h, members);
+
+ for (;;) {
+ p = (char *)ups + msize * h;
+ cur = (struct upstream *)p;
+ if (!cur->dead) {
+ break;
+ }
+ r = snprintf (numbuf, sizeof (numbuf), "%d", tries);
+ ht = get_hash_for_key (0, numbuf, r);
+ ht = get_hash_for_key (ht, key, keylen);
+#ifdef HASH_COMPAT
+ h += (ht >> 16) & 0x7fff;
+#else
+ h += ht;
+#endif
+ h %= members;
+ msg_debug ("get_upstream_by_hash: try to select upstream number %d of %zd, tries: %d", h, members, tries);
+ tries ++;
+ if (tries > MAX_TRIES) {
+ msg_debug ("get_upstream_by_hash: max tries exceed, returning NULL");
+ return NULL;
+ }
+ }
+
+ U_RLOCK ();
+ p = ups;
+ U_UNLOCK ();
+ return cur;
+}
+
+/*
+ * Recheck all upstreams and return upstream in round-robin order according to weight and priority
+ */
+struct upstream *
+get_upstream_round_robin (void *ups, size_t members, size_t msize, time_t now, time_t error_timeout, time_t revive_timeout, size_t max_errors)
+{
+ int alive, max_weight, i;
+ struct upstream *cur, *selected = NULL;
+ u_char *p;
+
+ /* Recheck all upstreams */
+ alive = rescan_upstreams (ups, members, msize, now, error_timeout, revive_timeout, max_errors);
+
+ p = ups;
+ max_weight = 0;
+ selected = (struct upstream *)p;
+ U_RLOCK ();
+ for (i = 0; i < members; i++) {
+ cur = (struct upstream *)p;
+ if (!cur->dead) {
+ if (max_weight < cur->weight) {
+ max_weight = cur->weight;
+ selected = cur;
+ }
+ }
+ p += msize;
+ }
+ U_UNLOCK ();
+
+ if (max_weight == 0) {
+ p = ups;
+ U_WLOCK ();
+ for (i = 0; i < members; i++) {
+ cur = (struct upstream *)p;
+ cur->weight = cur->priority;
+ if (!cur->dead) {
+ if (max_weight < cur->priority) {
+ max_weight = cur->priority;
+ selected = cur;
+ }
+ }
+ p += msize;
+ }
+ U_UNLOCK ();
+ }
+ msg_debug ("get_upstream_round_robin: selecting upstream with weight %d", max_weight);
+
+ return selected;
+}
+
+/*
+ * Recheck all upstreams and return upstream in round-robin order according to only priority (master-slaves)
+ */
+struct upstream *
+get_upstream_master_slave (void *ups, size_t members, size_t msize, time_t now, time_t error_timeout, time_t revive_timeout, size_t max_errors)
+{
+ int alive, max_weight, i;
+ struct upstream *cur, *selected = NULL;
+ u_char *p;
+
+ /* Recheck all upstreams */
+ alive = rescan_upstreams (ups, members, msize, now, error_timeout, revive_timeout, max_errors);
+
+ p = ups;
+ max_weight = 0;
+ selected = (struct upstream *)p;
+ U_RLOCK ();
+ for (i = 0; i < members; i++) {
+ cur = (struct upstream *)p;
+ if (!cur->dead) {
+ if (max_weight < cur->priority) {
+ max_weight = cur->priority;
+ selected = cur;
+ }
+ }
+ p += msize;
+ }
+ U_UNLOCK ();
+ msg_debug ("get_upstream_master_slave: selecting upstream with priority %d", max_weight);
+
+ return selected;
+}
+
+/*
+ * Ketama manipulation functions
+ */
+
+static int
+ketama_sort_cmp (const void *a1, const void *a2)
+{
+ return *((uint32_t *)a1) - *((uint32_t *)a2);
+}
+
+/*
+ * Add ketama points for specified upstream
+ */
+int
+upstream_ketama_add (struct upstream *up, char *up_key, size_t keylen, size_t keypoints)
+{
+ uint32_t h = 0;
+ char tmp[4];
+ int i;
+
+ /* Allocate ketama points array */
+ if (up->ketama_points == NULL) {
+ up->ketama_points_size = keypoints;
+ up->ketama_points = malloc (sizeof (uint32_t) * up->ketama_points_size);
+ if (up->ketama_points == NULL) {
+ return -1;
+ }
+ }
+
+ h = get_hash_for_key (h, up_key, keylen);
+
+ for (i = 0; i < keypoints; i++) {
+ tmp[0] = i & 0xff;
+ tmp[1] = (i >> 8) & 0xff;
+ tmp[2] = (i >> 16) & 0xff;
+ tmp[3] = (i >> 24) & 0xff;
+
+ h = get_hash_for_key (h, tmp, sizeof (tmp) * sizeof (char));
+ up->ketama_points[i] = h;
+ }
+ /* Keep points sorted */
+ qsort (up->ketama_points, keypoints, sizeof (uint32_t), ketama_sort_cmp);
+
+ return 0;
+}
+
+/*
+ * Return upstream by hash and find nearest ketama point in some server
+ */
+struct upstream *
+get_upstream_by_hash_ketama (void *ups, size_t members, size_t msize, time_t now,
+ time_t error_timeout, time_t revive_timeout, size_t max_errors,
+ char *key, size_t keylen)
+{
+ int alive, i;
+ uint32_t h = 0, step, middle, d, min_diff = UINT_MAX;
+ char *p;
+ struct upstream *cur = NULL, *nearest = NULL;
+
+ alive = rescan_upstreams (ups, members, msize, now, error_timeout, revive_timeout, max_errors);
+
+ if (alive == 0) {
+ return NULL;
+ }
+
+ h = get_hash_for_key (h, key, keylen);
+
+ U_RLOCK ();
+ p = ups;
+ nearest = (struct upstream *)p;
+ for (i = 0; i < members; i++) {
+ cur = (struct upstream *)p;
+ if (!cur->dead && cur->ketama_points != NULL) {
+ /* Find nearest ketama point for this key */
+ step = cur->ketama_points_size / 2;
+ middle = step;
+ while (step != 1) {
+ d = cur->ketama_points[middle] - h;
+ if (abs (d) < min_diff) {
+ min_diff = abs (d);
+ nearest = cur;
+ }
+ step /= 2;
+ if (d > 0) {
+ middle -= step;
+ }
+ else {
+ middle += step;
+ }
+ }
+ }
+ }
+ U_UNLOCK ();
+ return nearest;
+}
+
+#undef U_LOCK
+#undef U_UNLOCK
+#undef msg_debug
+/*
+ * vi:ts=4
+ */
diff --git a/src/upstream.h b/src/upstream.h
new file mode 100644
index 000000000..a6c6b2200
--- /dev/null
+++ b/src/upstream.h
@@ -0,0 +1,43 @@
+#ifndef UPSTREAM_H
+#define UPSTREAM_H
+
+#include <sys/types.h>
+#include <stdint.h>
+
+struct upstream {
+ unsigned int errors;
+ time_t time;
+ unsigned char dead;
+ unsigned char priority;
+ int16_t weight;
+ uint32_t *ketama_points;
+ size_t ketama_points_size;
+};
+
+void upstream_fail (struct upstream *up, time_t now);
+void upstream_ok (struct upstream *up, time_t now);
+void revive_all_upstreams (void *ups, size_t members, size_t msize);
+int upstream_ketama_add (struct upstream *up, char *up_key, size_t keylen, size_t keypoints);
+
+struct upstream* get_random_upstream (void *ups, size_t members, size_t msize,
+ time_t now, time_t error_timeout,
+ time_t revive_timeout, size_t max_errors);
+
+struct upstream* get_upstream_by_hash (void *ups, size_t members, size_t msize,
+ time_t now, time_t error_timeout,
+ time_t revive_timeout, size_t max_errors,
+ char *key, size_t keylen);
+
+struct upstream* get_upstream_round_robin (void *ups, size_t members, size_t msize,
+ time_t now, time_t error_timeout,
+ time_t revive_timeout, size_t max_errors);
+
+struct upstream* get_upstream_by_hash_ketama (void *ups, size_t members, size_t msize, time_t now,
+ time_t error_timeout, time_t revive_timeout, size_t max_errors,
+ char *key, size_t keylen);
+
+
+#endif /* UPSTREAM_H */
+/*
+ * vi:ts=4
+ */
diff --git a/src/url.c b/src/url.c
new file mode 100644
index 000000000..83ee0195a
--- /dev/null
+++ b/src/url.c
@@ -0,0 +1,886 @@
+#include <sys/types.h>
+#include <stdlib.h>
+#include <ctype.h>
+#include <errno.h>
+#include <syslog.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <string.h>
+
+#include "url.h"
+#include "fstring.h"
+#include "main.h"
+
+#define POST_CHAR 1
+#define POST_CHAR_S "\001"
+
+/* Tcp port range */
+#define LOWEST_PORT 0
+#define HIGHEST_PORT 65535
+
+#define uri_port_is_valid(port) \
+ (LOWEST_PORT <= (port) && (port) <= HIGHEST_PORT)
+
+struct _proto {
+ unsigned char *name;
+ int port;
+ uintptr_t *unused;
+ unsigned int need_slashes:1;
+ unsigned int need_slash_after_host:1;
+ unsigned int free_syntax:1;
+ unsigned int need_ssl:1;
+};
+
+static const char *text_url = "((https?|ftp)://)?"
+"(\\b(?<![.\\@A-Za-z0-9-])"
+"(?: [A-Za-z0-9][A-Za-z0-9-]*(?:\\.[A-Za-z0-9-]+)*\\."
+"(?i:com|net|org|biz|edu|gov|info|name|int|mil|aero|coop|jobs|mobi|museum|pro|travel"
+"|[rs]u|uk|ua|by|de|jp|fr|fi|no|no|ca|it|ro|cn|nl|at|nu|se"
+"|[a-z]{2}"
+"(?(1)|(?=/)))"
+"(?!\\w)"
+"|(?:\\d{1,3}\\.){3}\\d{1,3}(?(1)|(?=[/:]))"
+")"
+"(?::\\d{1,5})?" /* port */
+"(?!\\.\\w)" /* host part ended, no more of this further on */
+"(?:[/?][;/?:@&=+\\$,[\\]\\-_.!~*'()A-Za-z0-9#%]*)?" /* path (&query) */
+"(?<![\\s>?!),.'\"\\]:])"
+"(?!@)"
+")";
+static const char *html_url = "(?: src|href)=\"("
+"((https?|ftp)://)?"
+"(\\b(?<![.\\@A-Za-z0-9-])"
+"(?: [A-Za-z0-9][A-Za-z0-9-]*(?:\\.[A-Za-z0-9-]+)*\\."
+"(?i:com|net|org|biz|edu|gov|info|name|int|mil|aero|coop|jobs|mobi|museum|pro|travel"
+"|[rs]u|uk|ua|by|de|jp|fr|fi|no|no|ca|it|ro|cn|nl|at|nu|se"
+"|[a-z]{2}"
+"(?(1)|(?=/)))"
+"(?!\\w)"
+"|(?:\\d{1,3}\\.){3}\\d{1,3}(?(1)|(?=[/:]))"
+")"
+"(?::\\d{1,5})?" /* port */
+"(?!\\.\\w)" /* host part ended, no more of this further on */
+"(?:[/?][;/?:@&=+\\$,[\\]\\-_.!~*'()A-Za-z0-9#%]*)?" /* path (&query) */
+"(?<![\\s>?!),.'\"\\]:])"
+"(?!@)"
+"))\"";
+
+static short url_initialized = 0;
+GRegex *text_re, *html_re;
+
+static const struct _proto protocol_backends[] = {
+ { "file", 0, NULL, 1, 0, 0, 0 },
+ { "ftp", 21, NULL, 1, 1, 0, 0 },
+ { "http", 80, NULL, 1, 1, 0, 0 },
+ { "https", 443, NULL, 1, 1, 0, 1 },
+
+ /* Keep these last! */
+ { NULL, 0, NULL, 0, 0, 1, 0 },
+};
+
+/*
+ Table of "reserved" and "unsafe" characters. Those terms are
+ rfc1738-speak, as such largely obsoleted by rfc2396 and later
+ specs, but the general idea remains.
+
+ A reserved character is the one that you can't decode without
+ changing the meaning of the URL. For example, you can't decode
+ "/foo/%2f/bar" into "/foo///bar" because the number and contents of
+ path components is different. Non-reserved characters can be
+ changed, so "/foo/%78/bar" is safe to change to "/foo/x/bar". The
+ unsafe characters are loosely based on rfc1738, plus "$" and ",",
+ as recommended by rfc2396, and minus "~", which is very frequently
+ used (and sometimes unrecognized as %7E by broken servers).
+
+ An unsafe character is the one that should be encoded when URLs are
+ placed in foreign environments. E.g. space and newline are unsafe
+ in HTTP contexts because HTTP uses them as separator and line
+ terminator, so they must be encoded to %20 and %0A respectively.
+ "*" is unsafe in shell context, etc.
+
+ We determine whether a character is unsafe through static table
+ lookup. This code assumes ASCII character set and 8-bit chars. */
+
+enum {
+ /* rfc1738 reserved chars + "$" and ",". */
+ urlchr_reserved = 1,
+
+ /* rfc1738 unsafe chars, plus non-printables. */
+ urlchr_unsafe = 2
+};
+
+#define urlchr_test(c, mask) (urlchr_table[(unsigned char)(c)] & (mask))
+#define URL_RESERVED_CHAR(c) urlchr_test(c, urlchr_reserved)
+#define URL_UNSAFE_CHAR(c) urlchr_test(c, urlchr_unsafe)
+/* Convert an ASCII hex digit to the corresponding number between 0
+ and 15. H should be a hexadecimal digit that satisfies isxdigit;
+ otherwise, the result is undefined. */
+#define XDIGIT_TO_NUM(h) ((h) < 'A' ? (h) - '0' : toupper (h) - 'A' + 10)
+#define X2DIGITS_TO_NUM(h1, h2) ((XDIGIT_TO_NUM (h1) << 4) + XDIGIT_TO_NUM (h2))
+/* The reverse of the above: convert a number in the [0, 16) range to
+ the ASCII representation of the corresponding hexadecimal digit.
+ `+ 0' is there so you can't accidentally use it as an lvalue. */
+#define XNUM_TO_DIGIT(x) ("0123456789ABCDEF"[x] + 0)
+#define XNUM_TO_digit(x) ("0123456789abcdef"[x] + 0)
+
+/* Shorthands for the table: */
+#define R urlchr_reserved
+#define U urlchr_unsafe
+#define RU R|U
+
+static const unsigned char urlchr_table[256] =
+{
+ U, U, U, U, U, U, U, U, /* NUL SOH STX ETX EOT ENQ ACK BEL */
+ U, U, U, U, U, U, U, U, /* BS HT LF VT FF CR SO SI */
+ U, U, U, U, U, U, U, U, /* DLE DC1 DC2 DC3 DC4 NAK SYN ETB */
+ U, U, U, U, U, U, U, U, /* CAN EM SUB ESC FS GS RS US */
+ U, 0, U, RU, R, U, R, 0, /* SP ! " # $ % & ' */
+ 0, 0, 0, R, R, 0, 0, R, /* ( ) * + , - . / */
+ 0, 0, 0, 0, 0, 0, 0, 0, /* 0 1 2 3 4 5 6 7 */
+ 0, 0, RU, R, U, R, U, R, /* 8 9 : ; < = > ? */
+ RU, 0, 0, 0, 0, 0, 0, 0, /* @ A B C D E F G */
+ 0, 0, 0, 0, 0, 0, 0, 0, /* H I J K L M N O */
+ 0, 0, 0, 0, 0, 0, 0, 0, /* P Q R S T U V W */
+ 0, 0, 0, RU, U, RU, U, 0, /* X Y Z [ \ ] ^ _ */
+ U, 0, 0, 0, 0, 0, 0, 0, /* ` a b c d e f g */
+ 0, 0, 0, 0, 0, 0, 0, 0, /* h i j k l m n o */
+ 0, 0, 0, 0, 0, 0, 0, 0, /* p q r s t u v w */
+ 0, 0, 0, U, U, U, 0, U, /* x y z { | } ~ DEL */
+
+ U, U, U, U, U, U, U, U, U, U, U, U, U, U, U, U,
+ U, U, U, U, U, U, U, U, U, U, U, U, U, U, U, U,
+ U, U, U, U, U, U, U, U, U, U, U, U, U, U, U, U,
+ U, U, U, U, U, U, U, U, U, U, U, U, U, U, U, U,
+
+ U, U, U, U, U, U, U, U, U, U, U, U, U, U, U, U,
+ U, U, U, U, U, U, U, U, U, U, U, U, U, U, U, U,
+ U, U, U, U, U, U, U, U, U, U, U, U, U, U, U, U,
+ U, U, U, U, U, U, U, U, U, U, U, U, U, U, U, U,
+};
+#undef R
+#undef U
+#undef RU
+
+static inline int
+end_of_dir(unsigned char c)
+{
+ return c == POST_CHAR || c == '#' || c == ';' || c == '?';
+}
+
+static inline int
+is_uri_dir_sep(struct uri *uri, unsigned char pos)
+{
+ return (pos == '/');
+}
+
+static int
+check_uri_file(unsigned char *name)
+{
+ static const unsigned char chars[] = POST_CHAR_S "#?";
+
+ return strcspn(name, chars);
+}
+
+static int
+url_init (void)
+{
+ GError *err = NULL;
+ if (url_initialized == 0) {
+ text_re = g_regex_new (text_url, G_REGEX_CASELESS | G_REGEX_MULTILINE | G_REGEX_OPTIMIZE | G_REGEX_EXTENDED, 0, &err);
+ if (err != NULL) {
+ msg_info ("url_init: cannot init text url parsing regexp: %s", err->message);
+ g_error_free (err);
+ return -1;
+ }
+ html_re = g_regex_new (html_url, G_REGEX_CASELESS | G_REGEX_MULTILINE | G_REGEX_OPTIMIZE | G_REGEX_EXTENDED, 0, &err);
+ if (err != NULL) {
+ msg_info ("url_init: cannot init html url parsing regexp: %s", err->message);
+ g_error_free (err);
+ return -1;
+ }
+ url_initialized = 1;
+ }
+
+ return 0;
+}
+
+enum protocol
+get_protocol(unsigned char *name, int namelen)
+{
+ /* These are really enum protocol values but can take on negative
+ * values and since 0 <= -1 for enum values it's better to use clean
+ * integer type. */
+ int start, end;
+ enum protocol protocol;
+ unsigned char *pname;
+ int pnamelen, minlen, compare;
+
+ /* Almost dichotomic search is used here */
+ /* Starting at the HTTP entry which is the most common that will make
+ * file and NNTP the next entries checked and amongst the third checks
+ * are proxy and FTP. */
+ start = 0;
+ end = PROTOCOL_UNKNOWN - 1;
+ protocol = PROTOCOL_HTTP;
+
+ while (start <= end) {
+ pname = protocol_backends[protocol].name;
+ pnamelen = strlen (pname);
+ minlen = MIN (pnamelen, namelen);
+ compare = strncasecmp (pname, name, minlen);
+
+ if (compare == 0) {
+ if (pnamelen == namelen)
+ return protocol;
+
+ /* If the current protocol name is longer than the
+ * protocol name being searched for move @end else move
+ * @start. */
+ compare = pnamelen > namelen ? 1 : -1;
+ }
+
+ if (compare > 0)
+ end = protocol - 1;
+ else
+ start = protocol + 1;
+
+ protocol = (start + end) / 2;
+ }
+
+ return PROTOCOL_UNKNOWN;
+}
+
+
+int
+get_protocol_port(enum protocol protocol)
+{
+ return protocol_backends[protocol].port;
+}
+
+int
+get_protocol_need_slashes(enum protocol protocol)
+{
+ return protocol_backends[protocol].need_slashes;
+}
+
+int
+get_protocol_need_slash_after_host(enum protocol protocol)
+{
+ return protocol_backends[protocol].need_slash_after_host;
+}
+
+int
+get_protocol_free_syntax(enum protocol protocol)
+{
+ return protocol_backends[protocol].free_syntax;
+}
+
+static int
+get_protocol_length(const unsigned char *url)
+{
+ unsigned char *end = (unsigned char *) url;
+
+ /* Seek the end of the protocol name if any. */
+ /* RFC1738:
+ * scheme = 1*[ lowalpha | digit | "+" | "-" | "." ]
+ * (but per its recommendations we accept "upalpha" too) */
+ while (isalnum(*end) || *end == '+' || *end == '-' || *end == '.')
+ end++;
+
+ /* Also return 0 if there's no protocol name (@end == @url). */
+ return (*end == ':') ? end - url : 0;
+}
+
+/* URL-unescape the string S.
+
+ This is done by transforming the sequences "%HH" to the character
+ represented by the hexadecimal digits HH. If % is not followed by
+ two hexadecimal digits, it is inserted literally.
+
+ The transformation is done in place. If you need the original
+ string intact, make a copy before calling this function. */
+
+static void
+url_unescape (char *s)
+{
+ char *t = s; /* t - tortoise */
+ char *h = s; /* h - hare */
+
+ for (; *h; h++, t++) {
+ if (*h != '%') {
+ copychar:
+ *t = *h;
+ }
+ else {
+ char c;
+ /* Do nothing if '%' is not followed by two hex digits. */
+ if (!h[1] || !h[2] || !(isxdigit (h[1]) && isxdigit (h[2])))
+ goto copychar;
+ c = X2DIGITS_TO_NUM (h[1], h[2]);
+ /* Don't unescape %00 because there is no way to insert it
+ * into a C string without effectively truncating it. */
+ if (c == '\0')
+ goto copychar;
+ *t = c;
+ h += 2;
+ }
+ }
+ *t = '\0';
+}
+
+/* The core of url_escape_* functions. Escapes the characters that
+ match the provided mask in urlchr_table.
+
+ If ALLOW_PASSTHROUGH is non-zero, a string with no unsafe chars
+ will be returned unchanged. If ALLOW_PASSTHROUGH is zero, a
+ freshly allocated string will be returned in all cases. */
+
+static char *
+url_escape_1 (const char *s, unsigned char mask, int allow_passthrough, memory_pool_t *pool)
+{
+ const char *p1;
+ char *p2, *newstr;
+ int newlen;
+ int addition = 0;
+
+ for (p1 = s; *p1; p1++)
+ if (urlchr_test (*p1, mask))
+ addition += 2; /* Two more characters (hex digits) */
+
+ if (!addition) {
+ if (allow_passthrough) {
+ return (char *)s;
+ }
+ else {
+ return memory_pool_strdup (pool, s);
+ }
+ }
+
+ newlen = (p1 - s) + addition;
+ newstr = (char *) memory_pool_alloc (pool, newlen + 1);
+
+ p1 = s;
+ p2 = newstr;
+ while (*p1) {
+ /* Quote the characters that match the test mask. */
+ if (urlchr_test (*p1, mask)) {
+ unsigned char c = *p1++;
+ *p2++ = '%';
+ *p2++ = XNUM_TO_DIGIT (c >> 4);
+ *p2++ = XNUM_TO_DIGIT (c & 0xf);
+ }
+ else
+ *p2++ = *p1++;
+ }
+ *p2 = '\0';
+
+ return newstr;
+}
+
+/* URL-escape the unsafe characters (see urlchr_table) in a given
+ string, returning a freshly allocated string. */
+
+char *
+url_escape (const char *s, memory_pool_t *pool)
+{
+ return url_escape_1 (s, urlchr_unsafe, 0, pool);
+}
+
+/* URL-escape the unsafe characters (see urlchr_table) in a given
+ string. If no characters are unsafe, S is returned. */
+
+static char *
+url_escape_allow_passthrough (const char *s, memory_pool_t *pool)
+{
+ return url_escape_1 (s, urlchr_unsafe, 1, pool);
+}
+
+/* Decide whether the char at position P needs to be encoded. (It is
+ not enough to pass a single char *P because the function may need
+ to inspect the surrounding context.)
+
+ Return 1 if the char should be escaped as %XX, 0 otherwise. */
+
+static inline int
+char_needs_escaping (const char *p)
+{
+ if (*p == '%') {
+ if (isxdigit (*(p + 1)) && isxdigit (*(p + 2)))
+ return 0;
+ else
+ /* Garbled %.. sequence: encode `%'. */
+ return 1;
+ }
+ else if (URL_UNSAFE_CHAR (*p) && !URL_RESERVED_CHAR (*p))
+ return 1;
+ else
+ return 0;
+}
+
+/* Translate a %-escaped (but possibly non-conformant) input string S
+ into a %-escaped (and conformant) output string. If no characters
+ are encoded or decoded, return the same string S; otherwise, return
+ a freshly allocated string with the new contents.
+
+ After a URL has been run through this function, the protocols that
+ use `%' as the quote character can use the resulting string as-is,
+ while those that don't can use url_unescape to get to the intended
+ data. This function is stable: once the input is transformed,
+ further transformations of the result yield the same output.
+*/
+
+static char *
+reencode_escapes (const char *s, memory_pool_t *pool)
+{
+ const char *p1;
+ char *newstr, *p2;
+ int oldlen, newlen;
+
+ int encode_count = 0;
+
+ /* First pass: inspect the string to see if there's anything to do,
+ and to calculate the new length. */
+ for (p1 = s; *p1; p1++)
+ if (char_needs_escaping (p1))
+ ++encode_count;
+
+ if (!encode_count) {
+ /* The string is good as it is. */
+ return memory_pool_strdup (pool, s);
+ }
+
+ oldlen = p1 - s;
+ /* Each encoding adds two characters (hex digits). */
+ newlen = oldlen + 2 * encode_count;
+ newstr = memory_pool_alloc (pool, newlen + 1);
+
+ /* Second pass: copy the string to the destination address, encoding
+ chars when needed. */
+ p1 = s;
+ p2 = newstr;
+
+ while (*p1)
+ if (char_needs_escaping (p1)) {
+ unsigned char c = *p1++;
+ *p2++ = '%';
+ *p2++ = XNUM_TO_DIGIT (c >> 4);
+ *p2++ = XNUM_TO_DIGIT (c & 0xf);
+ }
+ else {
+ *p2++ = *p1++;
+ }
+
+ *p2 = '\0';
+ return newstr;
+}
+/* Unescape CHR in an otherwise escaped STR. Used to selectively
+ escaping of certain characters, such as "/" and ":". Returns a
+ count of unescaped chars. */
+
+static void
+unescape_single_char (char *str, char chr)
+{
+ const char c1 = XNUM_TO_DIGIT (chr >> 4);
+ const char c2 = XNUM_TO_DIGIT (chr & 0xf);
+ char *h = str; /* hare */
+ char *t = str; /* tortoise */
+
+ for (; *h; h++, t++) {
+ if (h[0] == '%' && h[1] == c1 && h[2] == c2) {
+ *t = chr;
+ h += 2;
+ }
+ else {
+ *t = *h;
+ }
+ }
+ *t = '\0';
+}
+
+/* Escape unsafe and reserved characters, except for the slash
+ characters. */
+
+static char *
+url_escape_dir (const char *dir, memory_pool_t *pool)
+{
+ char *newdir = url_escape_1 (dir, urlchr_unsafe | urlchr_reserved, 1, pool);
+ if (newdir == dir)
+ return (char *)dir;
+
+ unescape_single_char (newdir, '/');
+ return newdir;
+}
+
+/* Resolve "." and ".." elements of PATH by destructively modifying
+ PATH and return non-zero if PATH has been modified, zero otherwise.
+
+ The algorithm is in spirit similar to the one described in rfc1808,
+ although implemented differently, in one pass. To recap, path
+ elements containing only "." are removed, and ".." is taken to mean
+ "back up one element". Single leading and trailing slashes are
+ preserved.
+
+ For example, "a/b/c/./../d/.." will yield "a/b/". More exhaustive
+ test examples are provided below. If you change anything in this
+ function, run test_path_simplify to make sure you haven't broken a
+ test case. */
+
+static int
+path_simplify (char *path)
+{
+ char *h = path; /* hare */
+ char *t = path; /* tortoise */
+ char *beg = path; /* boundary for backing the tortoise */
+ char *end = path + strlen (path);
+
+ while (h < end) {
+ /* Hare should be at the beginning of a path element. */
+ if (h[0] == '.' && (h[1] == '/' || h[1] == '\0')) {
+ /* Ignore "./". */
+ h += 2;
+ }
+ else if (h[0] == '.' && h[1] == '.' && (h[2] == '/' || h[2] == '\0')) {
+ /* Handle "../" by retreating the tortoise by one path
+ element -- but not past beggining. */
+ if (t > beg) {
+ /* Move backwards until T hits the beginning of the
+ previous path element or the beginning of path. */
+ for (--t; t > beg && t[-1] != '/'; t--);
+ }
+ else {
+ /* If we're at the beginning, copy the "../" literally
+ move the beginning so a later ".." doesn't remove
+ it. */
+ beg = t + 3;
+ goto regular;
+ }
+ h += 3;
+ }
+ else {
+ regular:
+ /* A regular path element. If H hasn't advanced past T,
+ simply skip to the next path element. Otherwise, copy
+ the path element until the next slash. */
+ if (t == h) {
+ /* Skip the path element, including the slash. */
+ while (h < end && *h != '/')
+ t++, h++;
+ if (h < end)
+ t++, h++;
+ }
+ else {
+ /* Copy the path element, including the final slash. */
+ while (h < end && *h != '/')
+ *t++ = *h++;
+ if (h < end)
+ *t++ = *h++;
+ }
+ }
+ }
+
+ if (t != h)
+ *t = '\0';
+
+ return t != h;
+}
+
+enum uri_errno
+parse_uri(struct uri *uri, unsigned char *uristring, memory_pool_t *pool)
+{
+ unsigned char *prefix_end, *host_end, *p;
+ unsigned char *lbracket, *rbracket;
+ int datalen, n, addrlen;
+ unsigned char *frag_or_post, *user_end, *port_end;
+
+ memset (uri, 0, sizeof (*uri));
+
+ /* Nothing to do for an empty url. */
+ if (!*uristring) return URI_ERRNO_EMPTY;
+
+ uri->string = reencode_escapes (uristring, pool);
+ msg_debug ("parse_uri: reencoding escapes in original url: '%s'", struri (uri));
+ uri->protocollen = get_protocol_length (struri (uri));
+
+ /* Assume http as default protocol */
+ if (!uri->protocollen || (uri->protocol = get_protocol (struri(uri), uri->protocollen)) == PROTOCOL_UNKNOWN) {
+ p = g_strconcat ("http://", uri->string, NULL);
+ g_free (uri->string);
+ uri->string = p;
+ uri->protocol = PROTOCOL_HTTP;
+ prefix_end = struri (uri) + 7;
+ }
+ else {
+ /* Figure out whether the protocol is known */
+ msg_debug ("parse_uri: getting protocol from url: %d", uri->protocol);
+
+ prefix_end = struri (uri) + uri->protocollen; /* ':' */
+
+ /* Check if there's a digit after the protocol name. */
+ if (isdigit (*prefix_end)) {
+ p = struri (uri);
+ uri->ip_family = p[uri->protocollen] - '0';
+ prefix_end++;
+ }
+ if (*prefix_end != ':') {
+ msg_debug ("parse_uri: invalid protocol in uri");
+ return URI_ERRNO_INVALID_PROTOCOL;
+ }
+ prefix_end++;
+
+ /* Skip slashes */
+
+ if (prefix_end[0] == '/' && prefix_end[1] == '/') {
+ if (prefix_end[2] == '/') {
+ msg_debug ("parse_uri: too many '/' in uri");
+ return URI_ERRNO_TOO_MANY_SLASHES;
+ }
+
+ prefix_end += 2;
+
+ } else {
+ msg_debug ("parse_uri: no '/' in uri");
+ return URI_ERRNO_NO_SLASHES;
+ }
+ }
+
+ if (get_protocol_free_syntax (uri->protocol)) {
+ uri->data = prefix_end;
+ uri->datalen = strlen (prefix_end);
+ return URI_ERRNO_OK;
+
+ } else if (uri->protocol == PROTOCOL_FILE) {
+ datalen = check_uri_file (prefix_end);
+ frag_or_post = prefix_end + datalen;
+
+ /* Extract the fragment part. */
+ if (datalen >= 0) {
+ if (*frag_or_post == '#') {
+ uri->fragment = frag_or_post + 1;
+ uri->fragmentlen = strcspn(uri->fragment, POST_CHAR_S);
+ frag_or_post = uri->fragment + uri->fragmentlen;
+ }
+ if (*frag_or_post == POST_CHAR) {
+ uri->post = frag_or_post + 1;
+ }
+ } else {
+ datalen = strlen(prefix_end);
+ }
+
+ uri->data = prefix_end;
+ uri->datalen = datalen;
+
+ return URI_ERRNO_OK;
+ }
+
+ /* Isolate host */
+
+ /* Get brackets enclosing IPv6 address */
+ lbracket = strchr (prefix_end, '[');
+ if (lbracket) {
+ rbracket = strchr (lbracket, ']');
+ /* [address] is handled only inside of hostname part (surprisingly). */
+ if (rbracket && rbracket < prefix_end + strcspn (prefix_end, "/"))
+ uri->ipv6 = 1;
+ else
+ lbracket = rbracket = NULL;
+ } else {
+ rbracket = NULL;
+ }
+
+ /* Possibly skip auth part */
+ host_end = prefix_end + strcspn (prefix_end, "@");
+
+ if (prefix_end + strcspn (prefix_end, "/") > host_end
+ && *host_end) { /* we have auth info here */
+
+ /* Allow '@' in the password component */
+ while (strcspn (host_end + 1, "@") < strcspn (host_end + 1, "/?"))
+ host_end = host_end + 1 + strcspn (host_end + 1, "@");
+
+ user_end = strchr (prefix_end, ':');
+
+ if (!user_end || user_end > host_end) {
+ uri->user = prefix_end;
+ uri->userlen = host_end - prefix_end;
+ } else {
+ uri->user = prefix_end;
+ uri->userlen = user_end - prefix_end;
+ uri->password = user_end + 1;
+ uri->passwordlen = host_end - user_end - 1;
+ }
+ prefix_end = host_end + 1;
+ }
+
+ if (uri->ipv6)
+ host_end = rbracket + strcspn (rbracket, ":/?");
+ else
+ host_end = prefix_end + strcspn (prefix_end, ":/?");
+
+ if (uri->ipv6) {
+ addrlen = rbracket - lbracket - 1;
+
+
+ uri->host = lbracket + 1;
+ uri->hostlen = addrlen;
+ } else {
+ uri->host = prefix_end;
+ uri->hostlen = host_end - prefix_end;
+
+ /* Trim trailing '.'s */
+ if (uri->hostlen && uri->host[uri->hostlen - 1] == '.')
+ return URI_ERRNO_TRAILING_DOTS;
+ }
+
+ if (*host_end == ':') { /* we have port here */
+ port_end = host_end + 1 + strcspn (host_end + 1, "/");
+
+ host_end++;
+
+ uri->port = host_end;
+ uri->portlen = port_end - host_end;
+
+ if (uri->portlen == 0)
+ return URI_ERRNO_NO_PORT_COLON;
+
+ /* We only use 8 bits for portlen so better check */
+ if (uri->portlen != port_end - host_end)
+ return URI_ERRNO_INVALID_PORT;
+
+ /* test if port is number */
+ for (; host_end < port_end; host_end++)
+ if (!isdigit (*host_end))
+ return URI_ERRNO_INVALID_PORT;
+
+ /* Check valid port value, and let show an error message
+ * about invalid url syntax. */
+ if (uri->port && uri->portlen) {
+
+ errno = 0;
+ n = strtol (uri->port, NULL, 10);
+ if (errno || !uri_port_is_valid (n))
+ return URI_ERRNO_INVALID_PORT;
+ }
+ }
+
+ if (*host_end == '/') {
+ host_end++;
+
+ } else if (get_protocol_need_slash_after_host (uri->protocol) && *host_end != '?') {
+ /* The need for slash after the host component depends on the
+ * need for a host component. -- The dangerous mind of Jonah */
+ if (!uri->hostlen)
+ return URI_ERRNO_NO_HOST;
+
+ return URI_ERRNO_NO_HOST_SLASH;
+ }
+
+ /* Look for #fragment or POST_CHAR */
+ prefix_end = host_end + strcspn (host_end, "#" POST_CHAR_S);
+ uri->data = host_end;
+ uri->datalen = prefix_end - host_end;
+
+ if (*prefix_end == '#') {
+ uri->fragment = prefix_end + 1;
+ uri->fragmentlen = strcspn (uri->fragment, POST_CHAR_S);
+ prefix_end = uri->fragment + uri->fragmentlen;
+ }
+
+ if (*prefix_end == POST_CHAR) {
+ uri->post = prefix_end + 1;
+ }
+
+ convert_to_lowercase (uri->string, uri->protocollen);
+ convert_to_lowercase (uri->host, uri->hostlen);
+ /* Decode %HH sequences in host name. This is important not so much
+ to support %HH sequences in host names (which other browser
+ don't), but to support binary characters (which will have been
+ converted to %HH by reencode_escapes). */
+ if (strchr (uri->host, '%')) {
+ url_unescape (uri->host);
+ }
+ path_simplify (uri->data);
+
+ return URI_ERRNO_OK;
+}
+
+void
+url_parse_text (struct worker_task *task, GByteArray *content)
+{
+ GMatchInfo *info;
+ GError *err = NULL;
+ int pos = 0, start;
+ gboolean rc;
+ char *url_str = NULL;
+ struct uri *new;
+
+ if (url_init () == 0) {
+ do {
+ rc = g_regex_match_full (text_re, (const char *)content->data, content->len, pos, 0, &info, &err);
+ if (rc) {
+ if (g_match_info_matches (info)) {
+ g_match_info_fetch_pos (info, 0, &start, &pos);
+ url_str = g_match_info_fetch (info, 0);
+ msg_debug ("url_parse_text: extracted string with regexp: '%s'", url_str);
+ if (url_str != NULL) {
+ new = memory_pool_alloc (task->task_pool, sizeof (struct uri));
+ if (new != NULL) {
+ parse_uri (new, url_str, task->task_pool);
+ TAILQ_INSERT_TAIL (&task->urls, new, next);
+ }
+ }
+ g_free (url_str);
+ }
+ g_match_info_free (info);
+ }
+ else if (err != NULL) {
+ msg_debug ("url_parse_text: error matching regexp: %s", err->message);
+ g_free (err);
+ }
+ else {
+ msg_debug ("url_parse_text: cannot find url pattern in given string");
+ }
+ } while (rc);
+ }
+}
+
+void
+url_parse_html (struct worker_task *task, GByteArray *content)
+{
+ GMatchInfo *info;
+ GError *err = NULL;
+ int pos = 0, start;
+ gboolean rc;
+ char *url_str = NULL;
+ struct uri *new;
+
+ if (url_init () == 0) {
+ do {
+ rc = g_regex_match_full (html_re, (const char *)content->data, content->len, pos, 0, &info, &err);
+ if (rc) {
+ if (g_match_info_matches (info)) {
+ g_match_info_fetch_pos (info, 0, &start, &pos);
+ url_str = g_match_info_fetch (info, 1);
+ msg_debug ("url_parse_html: extracted string with regexp: '%s'", url_str);
+ if (url_str != NULL) {
+ new = memory_pool_alloc (task->task_pool, sizeof (struct uri));
+ if (new != NULL) {
+ parse_uri (new, url_str, task->task_pool);
+ TAILQ_INSERT_TAIL (&task->urls, new, next);
+ }
+ }
+ g_free (url_str);
+ }
+ g_match_info_free (info);
+ }
+ else if (err) {
+ msg_debug ("url_parse_html: error matching regexp: %s", err->message);
+ g_free (err);
+ }
+ else {
+ msg_debug ("url_parse_html: cannot find url pattern in given string");
+ }
+ } while (rc);
+ }
+}
diff --git a/src/url.h b/src/url.h
new file mode 100644
index 000000000..6987c38d1
--- /dev/null
+++ b/src/url.h
@@ -0,0 +1,88 @@
+/* URL check functions */
+#ifndef URL_H
+#define URL_H
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#ifndef HAVE_OWN_QUEUE_H
+#include <sys/queue.h>
+#else
+#include "queue.h"
+#endif
+
+#include <glib.h>
+#include "mem_pool.h"
+
+struct worker_task;
+
+struct uri {
+ /* The start of the uri (and thus start of the protocol string). */
+ unsigned char *string;
+
+ /* The internal type of protocol. Can _never_ be PROTOCOL_UNKNOWN. */
+ int protocol; /* enum protocol */
+
+ int ip_family;
+
+ unsigned char *user;
+ unsigned char *password;
+ unsigned char *host;
+ unsigned char *port;
+ /* @data can contain both the path and query uri fields.
+ * It can never be NULL but can have zero length. */
+ unsigned char *data;
+ unsigned char *fragment;
+ /* @post can contain some special encoded form data, used internally
+ * to make form data handling more efficient. The data is marked by
+ * POST_CHAR in the uri string. */
+ unsigned char *post;
+
+ /* @protocollen should only be usable if @protocol is either
+ * PROTOCOL_USER or an uri string should be composed. */
+ unsigned int protocollen;
+ unsigned int userlen;
+ unsigned int passwordlen;
+ unsigned int hostlen;
+ unsigned int portlen;
+ unsigned int datalen;
+ unsigned int fragmentlen;
+
+ /* Flags */
+ unsigned int ipv6; /* URI contains IPv6 host */
+ unsigned int form; /* URI originated from form */
+
+ /* Link */
+ TAILQ_ENTRY(uri) next;
+};
+
+enum uri_errno {
+ URI_ERRNO_OK, /* Parsing went well */
+ URI_ERRNO_EMPTY, /* The URI string was empty */
+ URI_ERRNO_INVALID_PROTOCOL, /* No protocol was found */
+ URI_ERRNO_NO_SLASHES, /* Slashes after protocol missing */
+ URI_ERRNO_TOO_MANY_SLASHES, /* Too many slashes after protocol */
+ URI_ERRNO_TRAILING_DOTS, /* '.' after host */
+ URI_ERRNO_NO_HOST, /* Host part is missing */
+ URI_ERRNO_NO_PORT_COLON, /* ':' after host without port */
+ URI_ERRNO_NO_HOST_SLASH, /* Slash after host missing */
+ URI_ERRNO_IPV6_SECURITY, /* IPv6 security bug detected */
+ URI_ERRNO_INVALID_PORT, /* Port number is bad */
+ URI_ERRNO_INVALID_PORT_RANGE, /* Port number is not within 0-65535 */
+};
+
+enum protocol {
+ PROTOCOL_FILE,
+ PROTOCOL_FTP,
+ PROTOCOL_HTTP,
+ PROTOCOL_HTTPS,
+
+ PROTOCOL_UNKNOWN,
+};
+
+#define struri(uri) ((uri)->string)
+
+void url_parse_html (struct worker_task *task, GByteArray *part);
+void url_parse_text (struct worker_task *task, GByteArray *part);
+enum uri_errno parse_uri(struct uri *uri, unsigned char *uristring, memory_pool_t *pool);
+
+#endif
diff --git a/src/util.c b/src/util.c
new file mode 100644
index 000000000..90a536359
--- /dev/null
+++ b/src/util.c
@@ -0,0 +1,834 @@
+#include <sys/param.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <errno.h>
+#include <unistd.h>
+#include <stdarg.h>
+#include <sys/file.h>
+#include <syslog.h>
+#include <glib.h>
+
+#include "config.h"
+#ifdef HAVE_LIBUTIL_H
+#include <libutil.h>
+#endif
+#include "util.h"
+#include "cfg_file.h"
+
+sig_atomic_t do_reopen_log = 0;
+
+int
+event_make_socket_nonblocking (int fd)
+{
+ if (fcntl(fd, F_SETFL, O_NONBLOCK) == -1) {
+ return -1;
+ }
+ return 0;
+}
+
+static int
+make_socket_ai (struct addrinfo *ai)
+{
+ struct linger linger;
+ int fd, on = 1, r;
+ int serrno;
+
+ /* Create listen socket */
+ fd = socket(AF_INET, SOCK_STREAM, 0);
+ if (fd == -1) {
+ return (-1);
+ }
+
+ if (event_make_socket_nonblocking(fd) < 0)
+ goto out;
+
+ if (fcntl(fd, F_SETFD, 1) == -1) {
+ goto out;
+ }
+
+ setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&on, sizeof(on));
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&on, sizeof(on));
+ linger.l_onoff = 1;
+ linger.l_linger = 5;
+ setsockopt(fd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger));
+
+ r = bind(fd, ai->ai_addr, ai->ai_addrlen);
+
+ if (r == -1) {
+ if (errno != EINPROGRESS) {
+ goto out;
+ }
+ }
+
+ return (fd);
+
+ out:
+ serrno = errno;
+ close(fd);
+ errno = serrno;
+ return (-1);
+}
+
+int
+make_socket (const char *address, u_short port)
+{
+ int fd;
+ struct addrinfo ai, *aitop = NULL;
+ char strport[NI_MAXSERV];
+ int ai_result;
+
+ memset (&ai, 0, sizeof (ai));
+ ai.ai_family = AF_INET;
+ ai.ai_socktype = SOCK_STREAM;
+ ai.ai_flags = AI_PASSIVE;
+ snprintf (strport, sizeof (strport), "%d", port);
+ if ((ai_result = getaddrinfo (address, strport, &ai, &aitop)) != 0) {
+ return (-1);
+ }
+
+ fd = make_socket_ai (aitop);
+
+ freeaddrinfo (aitop);
+
+ return (fd);
+}
+
+int
+make_unix_socket (const char *path, struct sockaddr_un *addr)
+{
+ size_t len = strlen (path);
+ int sock;
+
+ if (len > sizeof (addr->sun_path) - 1) return -1;
+
+ #ifdef FREEBSD
+ addr->sun_len = sizeof (struct sockaddr_un);
+ #endif
+
+ addr->sun_family = AF_UNIX;
+
+ strncpy (addr->sun_path, path, len);
+
+ sock = socket (PF_LOCAL, SOCK_STREAM, 0);
+
+ if (sock != -1) {
+ if (bind (sock, (struct sockaddr *) addr, sizeof (struct sockaddr_un)) == -1) return -1;
+ }
+
+ return sock;
+}
+
+void
+read_cmd_line (int argc, char **argv, struct config_file *cfg)
+{
+ int ch;
+ while ((ch = getopt(argc, argv, "hfc:")) != -1) {
+ switch (ch) {
+ case 'f':
+ cfg->no_fork = 1;
+ break;
+ case 'c':
+ if (optarg && cfg->cfg_name) {
+ cfg->cfg_name = memory_pool_strdup (cfg->cfg_pool, optarg);
+ }
+ break;
+ case 'h':
+ case '?':
+ default:
+ /* Show help message and exit */
+ printf ("Rspamd version " RVERSION "\n"
+ "Usage: rspamd [-h] [-n] [-f] [-c config_file]\n"
+ "-h: This help message\n"
+ "-f: Do not daemonize main process\n"
+ "-c: Specify config file (./rspamd.conf is used by default)\n");
+ exit (0);
+ break;
+ }
+ }
+}
+
+int
+write_pid (struct rspamd_main *main)
+{
+ pid_t pid;
+ main->pfh = pidfile_open (main->cfg->pid_file, 0644, &pid);
+
+ if (main->pfh == NULL) {
+ return -1;
+ }
+
+ pidfile_write (main->pfh);
+
+ return 0;
+}
+
+void
+init_signals (struct sigaction *signals, sig_t sig_handler)
+{
+ /* Setting up signal handlers */
+ /* SIGUSR1 - reopen config file */
+ /* SIGUSR2 - worker is ready for accept */
+ sigemptyset(&signals->sa_mask);
+ sigaddset(&signals->sa_mask, SIGTERM);
+ sigaddset(&signals->sa_mask, SIGINT);
+ sigaddset(&signals->sa_mask, SIGHUP);
+ sigaddset(&signals->sa_mask, SIGCHLD);
+ sigaddset(&signals->sa_mask, SIGUSR1);
+ sigaddset(&signals->sa_mask, SIGUSR2);
+
+
+ signals->sa_handler = sig_handler;
+ sigaction (SIGTERM, signals, NULL);
+ sigaction (SIGINT, signals, NULL);
+ sigaction (SIGHUP, signals, NULL);
+ sigaction (SIGCHLD, signals, NULL);
+ sigaction (SIGUSR1, signals, NULL);
+ sigaction (SIGUSR2, signals, NULL);
+}
+
+void
+pass_signal_worker (struct workq *workers, int signo)
+{
+ struct rspamd_worker *cur;
+ TAILQ_FOREACH (cur, workers, next) {
+ kill (cur->pid, signo);
+ }
+}
+
+void convert_to_lowercase (char *str, unsigned int size)
+{
+ while (size--) {
+ *str = tolower (*str);
+ str ++;
+ }
+}
+
+#ifndef HAVE_SETPROCTITLE
+
+static char *title_buffer = 0;
+static size_t title_buffer_size = 0;
+static char *title_progname, *title_progname_full;
+
+int
+setproctitle (const char *fmt, ...)
+{
+ if (!title_buffer || !title_buffer_size) {
+ errno = ENOMEM;
+ return -1;
+ }
+
+ memset (title_buffer, '\0', title_buffer_size);
+
+ ssize_t written;
+
+ if (fmt) {
+ ssize_t written2;
+ va_list ap;
+
+ written = snprintf (title_buffer, title_buffer_size, "%s: ", title_progname);
+ if (written < 0 || (size_t) written >= title_buffer_size)
+ return -1;
+
+ va_start (ap, fmt);
+ written2 =
+ vsnprintf (title_buffer + written,
+ title_buffer_size - written, fmt, ap);
+ va_end (ap);
+ if (written2 < 0
+ || (size_t) written2 >= title_buffer_size - written)
+ return -1;
+ } else {
+ written =
+ snprintf (title_buffer, title_buffer_size, "%s",
+ title_progname);
+ if (written < 0 || (size_t) written >= title_buffer_size)
+ return -1;
+ }
+
+ written = strlen (title_buffer);
+ memset (title_buffer + written, '\0', title_buffer_size - written);
+
+ return 0;
+}
+
+/*
+ It has to be _init function, because __attribute__((constructor))
+ functions gets called without arguments.
+*/
+
+int
+init_title (int argc, char *argv[], char *envp[])
+{
+ char *begin_of_buffer = 0, *end_of_buffer = 0;
+ int i;
+
+ for (i = 0; i < argc; ++i) {
+ if (!begin_of_buffer)
+ begin_of_buffer = argv[i];
+ if (!end_of_buffer || end_of_buffer + 1 == argv[i])
+ end_of_buffer = argv[i] + strlen (argv[i]);
+ }
+
+ for (i = 0; envp[i]; ++i) {
+ if (!begin_of_buffer)
+ begin_of_buffer = envp[i];
+ if (!end_of_buffer || end_of_buffer + 1 == envp[i])
+ end_of_buffer = envp[i] + strlen(envp[i]);
+ }
+
+ if (!end_of_buffer)
+ return 0;
+
+ char **new_environ = g_malloc ((i + 1) * sizeof (envp[0]));
+
+ if (!new_environ)
+ return 0;
+
+ for (i = 0; envp[i]; ++i) {
+ if (!(new_environ[i] = g_strdup (envp[i])))
+ goto cleanup_enomem;
+ }
+ new_environ[i] = 0;
+
+ if (program_invocation_name) {
+ title_progname_full = g_strdup (program_invocation_name);
+
+ if (!title_progname_full)
+ goto cleanup_enomem;
+
+ char *p = strrchr (title_progname_full, '/');
+
+ if (p)
+ title_progname = p + 1;
+ else
+ title_progname = title_progname_full;
+
+ program_invocation_name = title_progname_full;
+ program_invocation_short_name = title_progname;
+ }
+
+ environ = new_environ;
+ title_buffer = begin_of_buffer;
+ title_buffer_size = end_of_buffer - begin_of_buffer;
+
+ return 0;
+
+ cleanup_enomem:
+ for (--i; i >= 0; --i) {
+ g_free (new_environ[i]);
+ }
+ g_free (new_environ);
+ return 0;
+}
+#endif
+
+#ifndef HAVE_PIDFILE
+extern char * __progname;
+static int _pidfile_remove (struct pidfh *pfh, int freeit);
+
+static int
+pidfile_verify (struct pidfh *pfh)
+{
+ struct stat sb;
+
+ if (pfh == NULL || pfh->pf_fd == -1)
+ return (-1);
+ /*
+ * Check remembered descriptor.
+ */
+ if (fstat (pfh->pf_fd, &sb) == -1)
+ return (errno);
+ if (sb.st_dev != pfh->pf_dev || sb.st_ino != pfh->pf_ino)
+ return -1;
+ return 0;
+}
+
+static int
+pidfile_read (const char *path, pid_t *pidptr)
+{
+ char buf[16], *endptr;
+ int error, fd, i;
+
+ fd = open (path, O_RDONLY);
+ if (fd == -1)
+ return (errno);
+
+ i = read (fd, buf, sizeof(buf) - 1);
+ error = errno; /* Remember errno in case close() wants to change it. */
+ close (fd);
+ if (i == -1)
+ return error;
+ else if (i == 0)
+ return EAGAIN;
+ buf[i] = '\0';
+
+ *pidptr = strtol (buf, &endptr, 10);
+ if (endptr != &buf[i])
+ return EINVAL;
+
+ return 0;
+}
+
+struct pidfh *
+pidfile_open (const char *path, mode_t mode, pid_t *pidptr)
+{
+ struct pidfh *pfh;
+ struct stat sb;
+ int error, fd, len, count;
+ struct timespec rqtp;
+
+ pfh = g_malloc (sizeof(*pfh));
+ if (pfh == NULL)
+ return NULL;
+
+ if (path == NULL)
+ len = snprintf (pfh->pf_path, sizeof(pfh->pf_path),
+ "/var/run/%s.pid", __progname);
+ else
+ len = snprintf (pfh->pf_path, sizeof(pfh->pf_path),
+ "%s", path);
+ if (len >= (int)sizeof (pfh->pf_path)) {
+ g_free (pfh);
+ errno = ENAMETOOLONG;
+ return NULL;
+ }
+
+ /*
+ * Open the PID file and obtain exclusive lock.
+ * We truncate PID file here only to remove old PID immediatelly,
+ * PID file will be truncated again in pidfile_write(), so
+ * pidfile_write() can be called multiple times.
+ */
+ fd = open (pfh->pf_path,
+ O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK, mode);
+ flock (fd, LOCK_EX | LOCK_NB);
+ if (fd == -1) {
+ count = 0;
+ rqtp.tv_sec = 0;
+ rqtp.tv_nsec = 5000000;
+ if (errno == EWOULDBLOCK && pidptr != NULL) {
+ again:
+ errno = pidfile_read (pfh->pf_path, pidptr);
+ if (errno == 0)
+ errno = EEXIST;
+ else if (errno == EAGAIN) {
+ if (++count <= 3) {
+ nanosleep (&rqtp, 0);
+ goto again;
+ }
+ }
+ }
+ g_free (pfh);
+ return NULL;
+ }
+ /*
+ * Remember file information, so in pidfile_write() we are sure we write
+ * to the proper descriptor.
+ */
+ if (fstat (fd, &sb) == -1) {
+ error = errno;
+ unlink (pfh->pf_path);
+ close (fd);
+ g_free (pfh);
+ errno = error;
+ return NULL;
+ }
+
+ pfh->pf_fd = fd;
+ pfh->pf_dev = sb.st_dev;
+ pfh->pf_ino = sb.st_ino;
+
+ return pfh;
+}
+
+int
+pidfile_write (struct pidfh *pfh)
+{
+ char pidstr[16];
+ int error, fd;
+
+ /*
+ * Check remembered descriptor, so we don't overwrite some other
+ * file if pidfile was closed and descriptor reused.
+ */
+ errno = pidfile_verify (pfh);
+ if (errno != 0) {
+ /*
+ * Don't close descriptor, because we are not sure if it's ours.
+ */
+ return -1;
+ }
+ fd = pfh->pf_fd;
+
+ /*
+ * Truncate PID file, so multiple calls of pidfile_write() are allowed.
+ */
+ if (ftruncate (fd, 0) == -1) {
+ error = errno;
+ _pidfile_remove (pfh, 0);
+ errno = error;
+ return -1;
+ }
+
+ snprintf (pidstr, sizeof(pidstr), "%u", getpid ());
+ if (pwrite (fd, pidstr, strlen (pidstr), 0) != (ssize_t)strlen (pidstr)) {
+ error = errno;
+ _pidfile_remove (pfh, 0);
+ errno = error;
+ return -1;
+ }
+
+ return 0;
+}
+
+int
+pidfile_close (struct pidfh *pfh)
+{
+ int error;
+
+ error = pidfile_verify (pfh);
+ if (error != 0) {
+ errno = error;
+ return -1;
+ }
+
+ if (close (pfh->pf_fd) == -1)
+ error = errno;
+ g_free (pfh);
+ if (error != 0) {
+ errno = error;
+ return -1;
+ }
+ return 0;
+}
+
+static int
+_pidfile_remove (struct pidfh *pfh, int freeit)
+{
+ int error;
+
+ error = pidfile_verify (pfh);
+ if (error != 0) {
+ errno = error;
+ return -1;
+ }
+
+ if (unlink (pfh->pf_path) == -1)
+ error = errno;
+ if (flock (pfh->pf_fd, LOCK_UN) == -1) {
+ if (error == 0)
+ error = errno;
+ }
+ if (close (pfh->pf_fd) == -1) {
+ if (error == 0)
+ error = errno;
+ }
+ if (freeit)
+ g_free (pfh);
+ else
+ pfh->pf_fd = -1;
+ if (error != 0) {
+ errno = error;
+ return -1;
+ }
+ return 0;
+}
+
+int
+pidfile_remove (struct pidfh *pfh)
+{
+
+ return (_pidfile_remove (pfh, 1));
+}
+#endif
+
+/*
+ * Functions for parsing expressions
+ */
+
+struct expression_stack {
+ char op;
+ struct expression_stack *next;
+};
+
+/*
+ * Push operand or operator to stack
+ */
+static struct expression_stack*
+push_expression_stack (memory_pool_t *pool, struct expression_stack *head, char op)
+{
+ struct expression_stack *new;
+ new = memory_pool_alloc (pool, sizeof (struct expression_stack));
+ new->op = op;
+ new->next = head;
+ return new;
+}
+
+/*
+ * Delete symbol from stack, return pointer to operand or operator (casted to void* )
+ */
+static char
+delete_expression_stack (struct expression_stack **head)
+{
+ struct expression_stack *cur;
+ char res;
+
+ if(*head == NULL) return 0;
+
+ cur = *head;
+ res = cur->op;
+
+ *head = cur->next;
+ return res;
+}
+
+/*
+ * Return operation priority
+ */
+static int
+logic_priority (char a)
+{
+ switch (a) {
+ case '!':
+ return 3;
+ case '|':
+ case '&':
+ return 2;
+ case '(':
+ return 1;
+ default:
+ return 0;
+ }
+}
+
+/*
+ * Return 0 if symbol is not operation symbol (operand)
+ * Return 1 if symbol is operation symbol
+ */
+static int
+is_operation_symbol (char a)
+{
+ switch (a) {
+ case '!':
+ case '&':
+ case '|':
+ case '(':
+ case ')':
+ return 1;
+ default:
+ return 0;
+ }
+}
+
+static void
+insert_expression (memory_pool_t *pool, struct expression **head, int type, char op, void *operand)
+{
+ struct expression *new, *cur;
+
+ new = memory_pool_alloc (pool, sizeof (struct expression));
+ new->type = type;
+ if (new->type == EXPR_OPERAND) {
+ new->content.operand = operand;
+ }
+ else {
+ new->content.operation = op;
+ }
+ new->next = NULL;
+
+ if (!*head) {
+ *head = new;
+ }
+ else {
+ cur = *head;
+ while (cur->next) {
+ cur = cur->next;
+ }
+ cur->next = new;
+ }
+}
+
+/*
+ * Make inverse polish record for specified expression
+ * Memory is allocated from given pool
+ */
+struct expression*
+parse_expression (memory_pool_t *pool, char *line)
+{
+ struct expression *expr = NULL;
+ struct expression_stack *stack = NULL;
+ char *p, *c, *str, op, in_regexp = 0;
+
+ if (line == NULL || pool == NULL) {
+ return NULL;
+ }
+
+ p = line;
+ c = p;
+ while (*p) {
+ if (is_operation_symbol (*p) && !in_regexp) {
+ if (c != p) {
+ /* Copy operand */
+ str = memory_pool_alloc (pool, p - c + 1);
+ strlcpy (str, c, (p - c + 1));
+ insert_expression (pool, &expr, EXPR_OPERAND, 0, str);
+ }
+ if (*p == ')') {
+ if (stack == NULL) {
+ return NULL;
+ }
+ /* Pop all operators from stack to nearest '(' or to head */
+ while (stack->op != '(') {
+ op = delete_expression_stack (&stack);
+ if (op != '(') {
+ insert_expression (pool, &expr, EXPR_OPERATION, op, NULL);
+ }
+ }
+ }
+ else if (*p == '(') {
+ /* Push it to stack */
+ stack = push_expression_stack (pool, stack, *p);
+ }
+ else {
+ if (stack == NULL) {
+ stack = push_expression_stack (pool, stack, *p);
+ }
+ /* Check priority of logic operation */
+ else {
+ if (logic_priority (stack->op) < logic_priority (*p)) {
+ stack = push_expression_stack (pool, stack, *p);
+ }
+ else {
+ /* Pop all operations that have higher priority than this one */
+ while((stack != NULL) && (logic_priority (stack->op) >= logic_priority (*p))) {
+ op = delete_expression_stack (&stack);
+ if (op != '(') {
+ insert_expression (pool, &expr, EXPR_OPERATION, op, NULL);
+ }
+ }
+ stack = push_expression_stack (pool, stack, *p);
+ }
+ }
+ }
+ c = p + 1;
+ }
+ if (*p == '/' && (p == line || *(p - 1) != '\\')) {
+ in_regexp = !in_regexp;
+ }
+ p++;
+ }
+ /* Write last operand if it exists */
+ if (c != p) {
+ /* Copy operand */
+ str = memory_pool_alloc (pool, p - c + 1);
+ strlcpy (str, c, (p - c + 1));
+ insert_expression (pool, &expr, EXPR_OPERAND, 0, str);
+ }
+ /* Pop everything from stack */
+ while(stack != NULL) {
+ op = delete_expression_stack (&stack);
+ if (op != '(') {
+ insert_expression (pool, &expr, EXPR_OPERATION, op, NULL);
+ }
+ }
+
+ return expr;
+}
+
+/* Logging utility functions */
+int
+open_log (struct config_file *cfg)
+{
+ switch (cfg->log_type) {
+ case RSPAMD_LOG_CONSOLE:
+ /* Do nothing with console */
+ return 0;
+ case RSPAMD_LOG_SYSLOG:
+ openlog ("rspamd", LOG_NDELAY | LOG_PID, cfg->log_facility);
+ return 0;
+ case RSPAMD_LOG_FILE:
+ cfg->log_fd = open (cfg->log_file, O_CREAT | O_WRONLY | O_APPEND);
+ if (cfg->log_fd == -1) {
+ msg_err ("open_log: cannot open desired log file: %s, %m", cfg->log_file);
+ return -1;
+ }
+ return 0;
+ }
+}
+
+int
+reopen_log (struct config_file *cfg)
+{
+ do_reopen_log = 0;
+ switch (cfg->log_type) {
+ case RSPAMD_LOG_CONSOLE:
+ /* Do nothing with console */
+ return 0;
+ case RSPAMD_LOG_SYSLOG:
+ closelog ();
+ break;
+ case RSPAMD_LOG_FILE:
+ close (cfg->log_fd);
+ break;
+ }
+ return open_log (cfg);
+}
+
+void
+syslog_log_function (const gchar *log_domain, GLogLevelFlags log_level, const gchar *message, gpointer arg)
+{
+ struct config_file *cfg = (struct config_file *)arg;
+ if (do_reopen_log) {
+ reopen_log (cfg);
+ }
+
+ if (log_level <= cfg->log_level) {
+ if (log_level >= G_LOG_LEVEL_DEBUG) {
+ syslog (LOG_DEBUG, message);
+ }
+ else if (log_level >= G_LOG_LEVEL_INFO) {
+ syslog (LOG_INFO, message);
+ }
+ else if (log_level >= G_LOG_LEVEL_WARNING) {
+ syslog (LOG_WARNING, message);
+ }
+ else if (log_level >= G_LOG_LEVEL_CRITICAL) {
+ syslog (LOG_ERR, message);
+ }
+ }
+}
+
+void
+file_log_function (const gchar *log_domain, GLogLevelFlags log_level, const gchar *message, gpointer arg)
+{
+ struct config_file *cfg = (struct config_file *)arg;
+ char tmpbuf[128];
+ int r;
+ struct iovec out[3];
+
+ if (cfg->log_fd == -1) {
+ return;
+ }
+
+ if (do_reopen_log) {
+ reopen_log (cfg);
+ }
+
+ if (log_level <= cfg->log_level) {
+ r = snprintf (tmpbuf, sizeof (tmpbuf), "#%d: %d rspamd ", (int)getpid (), (int)time (NULL));
+ out[0].iov_base = tmpbuf;
+ out[0].iov_len = r;
+ out[1].iov_base = (char *)message;
+ out[1].iov_len = strlen (message);
+ out[2].iov_base = "\r\n";
+ out[2].iov_len = 2;
+
+ writev (cfg->log_fd, out, sizeof (out) / sizeof (out[0]));
+ }
+}
+/*
+ * vi:ts=4
+ */
diff --git a/src/util.h b/src/util.h
new file mode 100644
index 000000000..1791f4635
--- /dev/null
+++ b/src/util.h
@@ -0,0 +1,60 @@
+#ifndef UTIL_H
+#define UTIL_H
+
+#include <sys/types.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/queue.h>
+#include <sys/time.h>
+
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include <signal.h>
+
+#include "main.h"
+
+struct config_file;
+
+/* Create socket and bind it to specified address and port */
+int make_socket(const char *, u_short );
+/* Create and bind unix socket */
+int make_unix_socket (const char *, struct sockaddr_un *);
+/* Parse command line arguments using getopt (3) */
+void read_cmd_line (int , char **, struct config_file *);
+/* Write pid to file */
+int write_pid (struct rspamd_main *);
+/* Make specified socket non-blocking */
+int event_make_socket_nonblocking(int);
+/* Init signals */
+void init_signals (struct sigaction *, sig_t);
+/* Send specified signal to each worker */
+void pass_signal_worker (struct workq *, int );
+/* Convert string to lowercase */
+void convert_to_lowercase (char *str, unsigned int size);
+
+#ifndef HAVE_SETPROCTITLE
+int init_title(int argc, char *argv[], char *envp[]);
+int setproctitle(const char *fmt, ...);
+#endif
+
+#ifndef HAVE_PIDFILE
+struct pidfh {
+ int pf_fd;
+ char pf_path[MAXPATHLEN + 1];
+ __dev_t pf_dev;
+ ino_t pf_ino;
+};
+struct pidfh *pidfile_open(const char *path, mode_t mode, pid_t *pidptr);
+int pidfile_write(struct pidfh *pfh);
+int pidfile_close(struct pidfh *pfh);
+int pidfile_remove(struct pidfh *pfh);
+#endif
+
+int open_log (struct config_file *cfg);
+int reopen_log (struct config_file *cfg);
+void syslog_log_function (const gchar *log_domain, GLogLevelFlags log_level, const gchar *message, gpointer arg);
+void file_log_function (const gchar *log_domain, GLogLevelFlags log_level, const gchar *message, gpointer arg);
+
+#endif
diff --git a/src/worker.c b/src/worker.c
new file mode 100644
index 000000000..268e8123b
--- /dev/null
+++ b/src/worker.c
@@ -0,0 +1,364 @@
+#include <sys/stat.h>
+#include <sys/param.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <errno.h>
+#include <signal.h>
+
+#include <netinet/in.h>
+#include <syslog.h>
+#include <fcntl.h>
+#include <netdb.h>
+
+#include <EXTERN.h> /* from the Perl distribution */
+#include <perl.h> /* from the Perl distribution */
+
+#include <glib.h>
+#include <gmime/gmime.h>
+
+#include "util.h"
+#include "main.h"
+#include "protocol.h"
+#include "upstream.h"
+#include "cfg_file.h"
+#include "url.h"
+#include "modules.h"
+
+#define TASK_POOL_SIZE 4095
+
+const f_str_t CRLF = {
+ /* begin */"\r\n",
+ /* len */2,
+ /* size */2
+};
+
+extern PerlInterpreter *perl_interpreter;
+
+static
+void sig_handler (int signo)
+{
+ switch (signo) {
+ case SIGINT:
+ case SIGTERM:
+ _exit (1);
+ break;
+ }
+}
+
+static void
+sigusr_handler (int fd, short what, void *arg)
+{
+ struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ /* Do not accept new connections, preparing to end worker's process */
+ struct timeval tv;
+ tv.tv_sec = SOFT_SHUTDOWN_TIME;
+ tv.tv_usec = 0;
+ event_del (&worker->sig_ev);
+ event_del (&worker->bind_ev);
+ do_reopen_log = 1;
+ msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
+ event_loopexit (&tv);
+ return;
+}
+
+static void
+rcpt_destruct (void *pointer)
+{
+ struct worker_task *task = (struct worker_task *)pointer;
+
+ if (task->rcpt) {
+ g_list_free (task->rcpt);
+ }
+}
+
+static void
+free_task (struct worker_task *task)
+{
+ struct mime_part *part;
+
+ if (task) {
+ if (task->memc_ctx) {
+ memc_close_ctx (task->memc_ctx);
+ }
+ while (!TAILQ_EMPTY (&task->parts)) {
+ part = TAILQ_FIRST (&task->parts);
+ g_object_unref (part->type);
+ g_object_unref (part->content);
+ TAILQ_REMOVE (&task->parts, part, next);
+ }
+ memory_pool_delete (task->task_pool);
+ bufferevent_disable (task->bev, EV_READ | EV_WRITE);
+ bufferevent_free (task->bev);
+ close (task->sock);
+ g_free (task);
+ }
+}
+
+static void
+mime_foreach_callback (GMimeObject *part, gpointer user_data)
+{
+ struct worker_task *task = (struct worker_task *)user_data;
+ struct mime_part *mime_part;
+ GMimeContentType *type;
+ GMimeDataWrapper *wrapper;
+ GMimeStream *part_stream;
+ GByteArray *part_content;
+
+ task->parts_count ++;
+
+ /* 'part' points to the current part node that g_mime_message_foreach_part() is iterating over */
+
+ /* find out what class 'part' is... */
+ if (GMIME_IS_MESSAGE_PART (part)) {
+ /* message/rfc822 or message/news */
+ GMimeMessage *message;
+
+ /* g_mime_message_foreach_part() won't descend into
+ child message parts, so if we want to count any
+ subparts of this child message, we'll have to call
+ g_mime_message_foreach_part() again here. */
+
+ message = g_mime_message_part_get_message ((GMimeMessagePart *) part);
+ g_mime_message_foreach_part (message, mime_foreach_callback, task);
+ g_object_unref (message);
+ } else if (GMIME_IS_MESSAGE_PARTIAL (part)) {
+ /* message/partial */
+
+ /* this is an incomplete message part, probably a
+ large message that the sender has broken into
+ smaller parts and is sending us bit by bit. we
+ could save some info about it so that we could
+ piece this back together again once we get all the
+ parts? */
+ } else if (GMIME_IS_MULTIPART (part)) {
+ /* multipart/mixed, multipart/alternative, multipart/related, multipart/signed, multipart/encrypted, etc... */
+
+ /* we'll get to finding out if this is a signed/encrypted multipart later... */
+ } else if (GMIME_IS_PART (part)) {
+ /* a normal leaf part, could be text/plain or image/jpeg etc */
+ wrapper = g_mime_part_get_content_object (GMIME_PART (part));
+ if (wrapper != NULL) {
+ part_stream = g_mime_stream_mem_new ();
+ if (g_mime_data_wrapper_write_to_stream (wrapper, part_stream) != -1) {
+ part_content = g_mime_stream_mem_get_byte_array (GMIME_STREAM_MEM (part_stream));
+ type = (GMimeContentType *)g_mime_part_get_content_type (GMIME_PART (part));
+ mime_part = memory_pool_alloc (task->task_pool, sizeof (struct mime_part));
+ mime_part->type = type;
+ mime_part->content = part_content;
+ TAILQ_INSERT_TAIL (&task->parts, mime_part, next);
+ if (g_mime_content_type_is_type (type, "text", "html")) {
+ url_parse_html (task, part_content);
+ }
+ else if (g_mime_content_type_is_type (type, "text", "plain")) {
+ url_parse_text (task, part_content);
+ }
+ }
+ }
+ } else {
+ g_assert_not_reached ();
+ }
+}
+
+static int
+process_message (struct worker_task *task)
+{
+ GMimeMessage *message;
+ GMimeParser *parser;
+ GMimeStream *stream;
+
+ stream = g_mime_stream_mem_new_with_buffer (task->msg->buf->begin, task->msg->buf->len);
+ /* create a new parser object to parse the stream */
+ parser = g_mime_parser_new_with_stream (stream);
+
+ /* unref the stream (parser owns a ref, so this object does not actually get free'd until we destroy the parser) */
+ g_object_unref (stream);
+
+ /* parse the message from the stream */
+ message = g_mime_parser_construct_message (parser);
+
+ task->message = message;
+ memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_object_unref, task->message);
+
+ /* free the parser (and the stream) */
+ g_object_unref (parser);
+
+ g_mime_message_foreach_part (message, mime_foreach_callback, task);
+
+ msg_info ("process_message: found %d parts in message", task->parts_count);
+
+ task->worker->srv->stat->messages_scanned ++;
+
+ return process_filters (task);
+}
+
+static void
+read_socket (struct bufferevent *bev, void *arg)
+{
+ struct worker_task *task = (struct worker_task *)arg;
+ ssize_t r;
+ char *s;
+
+ switch (task->state) {
+ case READ_COMMAND:
+ case READ_HEADER:
+ s = evbuffer_readline (EVBUFFER_INPUT (bev));
+ if (read_rspamd_input_line (task, s) != 0) {
+ task->last_error = "Read error";
+ task->error_code = RSPAMD_NETWORK_ERROR;
+ task->state = WRITE_ERROR;
+ }
+ if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) {
+ bufferevent_enable (bev, EV_WRITE);
+ bufferevent_disable (bev, EV_READ);
+ }
+ free (s);
+ break;
+ case READ_MESSAGE:
+ r = bufferevent_read (bev, task->msg->pos, task->msg->free);
+ if (r > 0) {
+ task->msg->pos += r;
+ update_buf_size (task->msg);
+ if (task->msg->free == 0) {
+ r = process_message (task);
+ if (r == -1) {
+ task->last_error = "Filter processing error";
+ task->error_code = RSPAMD_FILTER_ERROR;
+ task->state = WRITE_ERROR;
+ }
+ else if (r == 1) {
+ task->state = WAIT_FILTER;
+ }
+ }
+ if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) {
+ bufferevent_enable (bev, EV_WRITE);
+ bufferevent_disable (bev, EV_READ);
+ }
+ }
+ else {
+ msg_err ("read_socket: cannot read data to buffer: %ld", (long int)r);
+ bufferevent_disable (bev, EV_READ);
+ bufferevent_free (bev);
+ free_task (task);
+ }
+ break;
+ case WAIT_FILTER:
+ bufferevent_disable (bev, EV_READ);
+ break;
+ }
+}
+
+static void
+write_socket (struct bufferevent *bev, void *arg)
+{
+ struct worker_task *task = (struct worker_task *)arg;
+
+ switch (task->state) {
+ case WRITE_REPLY:
+ write_reply (task);
+ task->state = CLOSING_CONNECTION;
+ bufferevent_disable (bev, EV_READ);
+ break;
+ case WRITE_ERROR:
+ write_reply (task);
+ task->state = CLOSING_CONNECTION;
+ bufferevent_disable (bev, EV_READ);
+ break;
+ case CLOSING_CONNECTION:
+ msg_debug ("write_socket: normally closing connection");
+ free_task (task);
+ break;
+ default:
+ msg_info ("write_socket: abnormally closing connection");
+ free_task (task);
+ break;
+ }
+}
+
+static void
+err_socket (struct bufferevent *bev, short what, void *arg)
+{
+ struct worker_task *task = (struct worker_task *)arg;
+ msg_info ("err_socket: abnormally closing connection");
+ /* Free buffers */
+ free_task (task);
+}
+
+static void
+accept_socket (int fd, short what, void *arg)
+{
+ struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ struct sockaddr_storage ss;
+ struct worker_task *new_task;
+ socklen_t addrlen = sizeof(ss);
+ int nfd;
+
+ if ((nfd = accept (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
+ return;
+ }
+ if (event_make_socket_nonblocking(fd) < 0) {
+ return;
+ }
+
+ new_task = g_malloc (sizeof (struct worker_task));
+ if (new_task == NULL) {
+ msg_err ("accept_socket: cannot allocate memory for task, %m");
+ return;
+ }
+ bzero (new_task, sizeof (struct worker_task));
+ new_task->worker = worker;
+ new_task->state = READ_COMMAND;
+ new_task->sock = nfd;
+ new_task->cfg = worker->srv->cfg;
+ TAILQ_INIT (&new_task->urls);
+ TAILQ_INIT (&new_task->parts);
+ new_task->task_pool = memory_pool_new (memory_pool_get_size ());
+ /* Add destructor for recipients list (it would be better to use anonymous function here */
+ memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)rcpt_destruct, new_task);
+ worker->srv->stat->connections_count ++;
+
+ /* Read event */
+ new_task->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_task);
+ bufferevent_enable (new_task->bev, EV_READ);
+}
+
+void
+start_worker (struct rspamd_worker *worker, int listen_sock)
+{
+ struct sigaction signals;
+ int i;
+
+
+ worker->srv->pid = getpid ();
+ worker->srv->type = TYPE_WORKER;
+ event_init ();
+ g_mime_init (0);
+
+ init_signals (&signals, sig_handler);
+ sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
+
+ /* SIGUSR2 handler */
+ signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker);
+ signal_add (&worker->sig_ev, NULL);
+
+ /* Accept event */
+ event_set(&worker->bind_ev, listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+ event_add(&worker->bind_ev, NULL);
+
+ /* Perform modules configuring */
+ for (i = 0; i < MODULES_NUM; i ++) {
+ modules[i].module_config_func (worker->srv->cfg);
+ }
+
+ /* Send SIGUSR2 to parent */
+ kill (getppid (), SIGUSR2);
+
+ event_loop (0);
+}
+
+/*
+ * vi:ts=4
+ */