diff options
-rw-r--r-- | CMakeLists.txt | 6 | ||||
-rw-r--r-- | config.h.in | 9 | ||||
-rw-r--r-- | rspamd.conf.sample | 10 | ||||
-rw-r--r-- | src/cfg_file.h | 30 | ||||
-rw-r--r-- | src/cfg_file.l | 4 | ||||
-rw-r--r-- | src/cfg_file.y | 86 | ||||
-rw-r--r-- | src/cfg_utils.c | 134 | ||||
-rw-r--r-- | src/controller.c | 2 | ||||
-rw-r--r-- | src/fstring.c | 31 | ||||
-rw-r--r-- | src/fstring.h | 6 | ||||
-rw-r--r-- | src/lmtp.c | 314 | ||||
-rw-r--r-- | src/lmtp.h | 20 | ||||
-rw-r--r-- | src/lmtp_proto.c | 380 | ||||
-rw-r--r-- | src/lmtp_proto.h | 44 | ||||
-rw-r--r-- | src/main.c | 17 | ||||
-rw-r--r-- | src/main.h | 3 | ||||
-rw-r--r-- | src/worker.c | 2 |
17 files changed, 1032 insertions, 66 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 8fedd0bff..e11320de1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -145,7 +145,7 @@ CHECK_INCLUDE_FILES(netinet/in.h HAVE_NETINET_IN_H) CHECK_INCLUDE_FILES(arpa/inet.h HAVE_ARPA_INET_H) CHECK_INCLUDE_FILES(netdb.h HAVE_NETDB_H) CHECK_INCLUDE_FILES(syslog.h HAVE_SYSLOG_H) - +CHECK_INCLUDE_FILES(libgen.h HAVE_LIBGEN_H) CHECK_FUNCTION_EXISTS(setproctitle HAVE_SETPROCTITLE) CHECK_FUNCTION_EXISTS(getpagesize HAVE_GETPAGESIZE) @@ -212,7 +212,9 @@ SET(RSPAMDSRC src/modules.c src/filter.c src/controller.c src/cfg_utils.c - src/buffer.c) + src/buffer.c + src/lmtp.c + src/lmtp_proto.c) SET(TOKENIZERSSRC src/tokenizers/tokenizers.c src/tokenizers/osb.c) diff --git a/config.h.in b/config.h.in index 04dd043d9..b77d31454 100644 --- a/config.h.in +++ b/config.h.in @@ -37,6 +37,8 @@ #cmakedefine HAVE_LIBUTIL_H 1 +#cmakedefine HAVE_LIBGEN_H 1 + #cmakedefine HAVE_ENDIAN_H 1 #cmakedefine HAVE_SYS_ENDIAN_H 1 #cmakedefine HAVE_MACHINE_ENDIAN_H 1 @@ -192,14 +194,21 @@ #include <math.h> #endif +/* libutil */ #ifdef HAVE_LIBUTIL_H #include <libutil.h> #endif +/* syslog */ #ifdef HAVE_SYSLOG_H #include <syslog.h> #endif +#ifdef HAVE_LIBGEN_H +#include <libgen.h> +#define HAVE_DIRNAME 1 +#endif + #include <errno.h> #include <signal.h> #include <event.h> diff --git a/rspamd.conf.sample b/rspamd.conf.sample index f0e2a9411..0c6ba3189 100644 --- a/rspamd.conf.sample +++ b/rspamd.conf.sample @@ -76,4 +76,14 @@ factors { "winnow" = 5.5; }; +lmtp { + enabled = yes; + bind_socket = localhost:11335; +}; + +delivery { + enabled = yes; + agent = "/dev/null"; +}; + url_filters = "surbl"; diff --git a/src/cfg_file.h b/src/cfg_file.h index a5d092c73..7bacb8aa9 100644 --- a/src/cfg_file.h +++ b/src/cfg_file.h @@ -14,6 +14,7 @@ #define DEFAULT_BIND_PORT 768 #define DEFAULT_CONTROL_PORT 7608 +#define DEFAULT_LMTP_PORT 7609 #define MAX_MEMCACHED_SERVERS 48 #define DEFAULT_MEMCACHED_PORT 11211 /* Memcached timeouts */ @@ -39,6 +40,16 @@ struct classifier; enum { VAL_UNDEF=0, VAL_TRUE, VAL_FALSE }; /** + * Types of rspamd bind lines + */ +enum rspamd_cred_type { + CRED_NORMAL, + CRED_CONTROL, + CRED_LMTP, + CRED_DELIVERY, +}; + +/** * Regexp type: /H - header, /M - mime, /U - url */ enum rspamd_regexp_type { @@ -161,6 +172,21 @@ struct config_file { unsigned int memcached_maxerrors; /**< maximum number of errors */ unsigned int memcached_connect_timeout; /**< connection timeout */ + gboolean lmtp_enable; /**< is lmtp agent is enabled */ + char *lmtp_host; /**< host for lmtp agent */ + struct in_addr lmtp_addr; /**< bind address for lmtp */ + uint16_t lmtp_port; /**< bind port for lmtp agent */ + uint16_t lmtp_family; /**< bind family for lmtp agent */ + char *lmtp_metric; /**< metric to use in lmtp module */ + + gboolean delivery_enable; /**< is delivery agent is enabled */ + char *deliver_host; /**< host for mail deliviring */ + struct in_addr deliver_addr; /**< its address */ + uint16_t deliver_port; /**< port for deliviring */ + uint16_t deliver_family; /**< socket family for delivirnig */ + char *deliver_agent_path; /**< deliver to pipe instead of socket */ + gboolean deliver_lmtp; /**< use LMTP instead of SMTP */ + LIST_HEAD (modulesq, perl_module) perl_modules; /**< linked list of perl modules to load */ LIST_HEAD (headersq, filter) header_filters; /**< linked list of all header's filters */ @@ -193,10 +219,10 @@ int add_memcached_server (struct config_file *cf, char *str); * Parse bind credits * @param cf config file to use * @param str line that presents bind line - * @param is_control flag that defines whether this credits are for controller + * @param type type of credits * @return 1 if line was successfully parsed and 0 in case of error */ -int parse_bind_line (struct config_file *cf, char *str, char is_control); +int parse_bind_line (struct config_file *cf, char *str, enum rspamd_cred_type type); /** * Init default values diff --git a/src/cfg_file.l b/src/cfg_file.l index 1845166b2..d2810c5bb 100644 --- a/src/cfg_file.l +++ b/src/cfg_file.l @@ -47,6 +47,10 @@ required_score return REQUIRED_SCORE; function return FUNCTION; control return CONTROL; password return PASSWORD; +lmtp return LMTP; +enabled return ENABLED; +delivery return DELIVERY; +agent return AGENT; statfile return STATFILE; alias return ALIAS; diff --git a/src/cfg_file.y b/src/cfg_file.y index 73455976d..6667d01a1 100644 --- a/src/cfg_file.y +++ b/src/cfg_file.y @@ -43,6 +43,7 @@ struct statfile *cur_statfile = NULL; %token LOGGING LOG_TYPE LOG_TYPE_CONSOLE LOG_TYPE_SYSLOG LOG_TYPE_FILE %token LOG_LEVEL LOG_LEVEL_DEBUG LOG_LEVEL_INFO LOG_LEVEL_WARNING LOG_LEVEL_ERROR LOG_FACILITY LOG_FILENAME %token STATFILE ALIAS PATTERN WEIGHT STATFILE_POOL_SIZE SIZE TOKENIZER CLASSIFIER +%token DELIVERY LMTP ENABLED AGENT %type <string> STRING %type <string> VARIABLE @@ -84,6 +85,8 @@ command : | logging | statfile | statfile_pool_size + | lmtp + | delivery ; tempdir : @@ -125,7 +128,7 @@ controlcmd: controlsock: BINDSOCK EQSIGN bind_cred { - if (!parse_bind_line (cfg, $3, 1)) { + if (!parse_bind_line (cfg, $3, CRED_CONTROL)) { yyerror ("yyparse: parse_bind_line"); YYERROR; } @@ -141,7 +144,7 @@ controlpassword: bindsock: BINDSOCK EQSIGN bind_cred { - if (!parse_bind_line (cfg, $3, 0)) { + if (!parse_bind_line (cfg, $3, CRED_NORMAL)) { yyerror ("yyparse: parse_bind_line"); YYERROR; } @@ -659,6 +662,85 @@ statfile_pool_size: cfg->max_statfile_size = $3; } ; + +lmtp: + LMTP OBRACE lmtpbody EBRACE + ; + +lmtpbody: + lmtpcmd SEMICOLON + | lmtpbody lmtpcmd SEMICOLON + ; + +lmtpcmd: + lmtpenabled + | lmtpsock + | lmtpmetric + ; + +lmtpenabled: + ENABLED EQSIGN FLAG { + cfg->lmtp_enable = $3; + } + ; + +lmtpsock: + BINDSOCK EQSIGN bind_cred { + if (!parse_bind_line (cfg, $3, CRED_LMTP)) { + yyerror ("yyparse: parse_bind_line"); + YYERROR; + } + free ($3); + } + ; +lmtpmetric: + METRIC EQSIGN QUOTEDSTRING { + cfg->lmtp_metric = memory_pool_strdup (cfg->cfg_pool, $3); + } + ; + +delivery: + DELIVERY OBRACE deliverybody EBRACE + ; + +deliverybody: + deliverycmd SEMICOLON + | deliverybody deliverycmd SEMICOLON + ; + +deliverycmd: + deliveryenabled + | deliverysock + | deliveryagent + | deliverylmtp + ; + +deliveryenabled: + ENABLED EQSIGN FLAG { + cfg->delivery_enable = $3; + } + ; + +deliverysock: + BINDSOCK EQSIGN bind_cred { + if (!parse_bind_line (cfg, $3, CRED_DELIVERY)) { + yyerror ("yyparse: parse_bind_line"); + YYERROR; + } + free ($3); + } + ; +deliverylmtp: + LMTP EQSIGN FLAG { + cfg->deliver_lmtp = $3; + } + ; +deliveryagent: + AGENT EQSIGN QUOTEDSTRING { + cfg->deliver_agent_path = memory_pool_strdup (cfg->cfg_pool, $3); + } + ; + %% /* * vi:ts=4 diff --git a/src/cfg_utils.c b/src/cfg_utils.c index df8a2265f..2785e18d9 100644 --- a/src/cfg_utils.c +++ b/src/cfg_utils.c @@ -81,85 +81,107 @@ add_memcached_server (struct config_file *cf, char *str) } int -parse_bind_line (struct config_file *cf, char *str, char is_control) +parse_bind_line (struct config_file *cf, char *str, enum rspamd_cred_type type) { char *cur_tok, *err_str; struct hostent *hent; size_t s; + char **host; + int16_t *family, *port; + struct in_addr *addr; if (str == NULL) return 0; cur_tok = strsep (&str, ":"); + + switch (type) { + case CRED_NORMAL: + host = &cf->bind_host; + port = &cf->bind_port; + *port = DEFAULT_BIND_PORT; + family = &cf->bind_family; + addr = &cf->bind_addr; + break; + case CRED_CONTROL: + host = &cf->control_host; + port = &cf->control_port; + *port = DEFAULT_CONTROL_PORT; + family = &cf->control_family; + addr = &cf->control_addr; + break; + case CRED_LMTP: + host = &cf->lmtp_host; + port = &cf->lmtp_port; + *port = DEFAULT_LMTP_PORT; + family = &cf->lmtp_family; + addr = &cf->lmtp_addr; + break; + case CRED_DELIVERY: + host = &cf->deliver_host; + port = &cf->deliver_port; + *port = 25; + family = &cf->deliver_family; + addr = &cf->deliver_addr; + break; + } if (cur_tok[0] == '/' || cur_tok[0] == '.') { - if (is_control) { - cf->control_host = memory_pool_strdup (cf->cfg_pool, cur_tok); - cf->control_family = AF_UNIX; - } - else { - cf->bind_host = memory_pool_strdup (cf->cfg_pool, cur_tok); - cf->bind_family = AF_UNIX; - } - return 1; - - } else { - if (str == '\0') { - if (is_control) { - cf->control_port = DEFAULT_CONTROL_PORT; +#ifdef HAVE_DIRNAME + /* Try to check path of bind credit */ + struct stat st; + int fd; + char *copy = memory_pool_strdup (cf->cfg_pool, cur_tok); + if (stat (copy, &st) == -1) { + if (errno == ENOENT) { + if ((fd = open (cur_tok, O_RDWR | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR)) == -1) { + yyerror ("parse_bind_line: cannot open path %s for making socket, %m", cur_tok); + return 0; + } + else { + close (fd); + unlink (cur_tok); + } } else { - cf->bind_port = DEFAULT_BIND_PORT; + yyerror ("parse_bind_line: cannot stat path %s for making socket, %m", cur_tok); + return 0; } } else { - if (is_control) { - cf->control_port = (uint16_t)strtoul (str, &err_str, 10); - } - else { - cf->bind_port = (uint16_t)strtoul (str, &err_str, 10); + if (unlink (cur_tok) == -1) { + yyerror ("parse_bind_line: cannot remove path %s for making socket, %m", cur_tok); + return 0; } + } +#endif + *host = memory_pool_strdup (cf->cfg_pool, cur_tok); + *family = AF_UNIX; + return 1; + + } else { + if (*str != '\0') { + *port = (uint16_t)strtoul (str, &err_str, 10); if (*err_str != '\0') { + yyerror ("parse_bind_line: cannot read numeric value: %s", err_str); return 0; } } - if (is_control) { - if (!inet_aton (cur_tok, &cf->control_addr)) { - /* Try to call gethostbyname */ - hent = gethostbyname (cur_tok); - if (hent == NULL) { - return 0; - } - else { - cf->control_host = memory_pool_strdup (cf->cfg_pool, cur_tok); - memcpy((char *)&cf->control_addr, hent->h_addr, sizeof(struct in_addr)); - s = strlen (cur_tok) + 1; - } + if (!inet_aton (cur_tok, addr)) { + /* Try to call gethostbyname */ + hent = gethostbyname (cur_tok); + if (hent == NULL) { + return 0; } else { - cf->control_host = memory_pool_strdup (cf->cfg_pool, cur_tok); + *host = memory_pool_strdup (cf->cfg_pool, cur_tok); + memcpy((char *)addr, hent->h_addr, sizeof(struct in_addr)); + s = strlen (cur_tok) + 1; } - - cf->control_family = AF_INET; } else { - if (!inet_aton (cur_tok, &cf->bind_addr)) { - /* Try to call gethostbyname */ - hent = gethostbyname (cur_tok); - if (hent == NULL) { - return 0; - } - else { - cf->bind_host = memory_pool_strdup (cf->cfg_pool, cur_tok); - memcpy((char *)&cf->bind_addr, hent->h_addr, sizeof(struct in_addr)); - s = strlen (cur_tok) + 1; - } - } - else { - cf->bind_host = memory_pool_strdup (cf->cfg_pool, cur_tok); - } - - cf->bind_family = AF_INET; + *host = memory_pool_strdup (cf->cfg_pool, cur_tok); } + *family = AF_INET; return 1; } @@ -191,6 +213,7 @@ init_defaults (struct config_file *cfg) cfg->composite_symbols = g_hash_table_new (g_str_hash, g_str_equal); cfg->statfiles = g_hash_table_new (g_str_hash, g_str_equal); cfg->cfg_params = g_hash_table_new (g_str_hash, g_str_equal); + cfg->lmtp_metric = "default"; def_metric = memory_pool_alloc (cfg->cfg_pool, sizeof (struct metric)); def_metric->name = "default"; @@ -512,6 +535,11 @@ fill_cfg_params (struct config_file *cfg) void post_load_config (struct config_file *cfg) { + if (cfg->lmtp_enable && !cfg->delivery_enable) { + yywarn ("post_load_config: lmtp is enabled, but delivery is not enabled, disabling lmtp"); + cfg->lmtp_enable = FALSE; + } + g_hash_table_foreach (cfg->variables, substitute_all_variables, cfg); g_hash_table_foreach (cfg->modules_opts, substitute_module_variables, cfg); parse_filters_str (cfg, cfg->header_filters_str, SCRIPT_HEADER); diff --git a/src/controller.c b/src/controller.c index 8128d7356..2b1b5099e 100644 --- a/src/controller.c +++ b/src/controller.c @@ -534,7 +534,7 @@ start_controller (struct rspamd_worker *worker) } else { un_addr = (struct sockaddr_un *) alloca (sizeof (struct sockaddr_un)); - if (!un_addr || (listen_sock = make_unix_socket (worker->srv->cfg->bind_host, un_addr)) == -1) { + if (!un_addr || (listen_sock = make_unix_socket (worker->srv->cfg->control_host, un_addr)) == -1) { msg_err ("start_controller: cannot create unix listen socket. %m"); exit(-errno); } diff --git a/src/fstring.c b/src/fstring.c index 75be14442..82d0b095e 100644 --- a/src/fstring.c +++ b/src/fstring.c @@ -92,6 +92,37 @@ fstrstr (f_str_t *orig, f_str_t *pattern) } /* + * Search for pattern in orig ignoring case + */ +ssize_t +fstrstri (f_str_t *orig, f_str_t *pattern) +{ + register ssize_t cur = 0, pcur = 0; + + if (pattern->len > orig->len) { + return -1; + } + + while (cur < orig->len) { + if (tolower (*(orig->begin + cur)) == tolower (*pattern->begin)) { + while (cur < orig->len && pcur < pattern->len) { + if (tolower (*(orig->begin + cur)) != tolower (*(pattern->begin + pcur))) { + pcur = 0; + break; + } + cur ++; + pcur ++; + } + return cur - pattern->len; + } + cur ++; + } + + return -1; + +} + +/* * Split string by tokens * word contains parsed word * diff --git a/src/fstring.h b/src/fstring.h index 000ba74c6..f5d7fffa3 100644 --- a/src/fstring.h +++ b/src/fstring.h @@ -43,6 +43,11 @@ ssize_t fstrrchr (f_str_t *src, char c); ssize_t fstrstr (f_str_t *orig, f_str_t *pattern); /* + * Search for pattern in orig ignoring case + */ +ssize_t fstrstri (f_str_t *orig, f_str_t *pattern); + +/* * Split string by tokens * word contains parsed word */ @@ -88,7 +93,6 @@ f_str_t* fstrgrow (memory_pool_t *pool, f_str_t *orig, size_t newlen); */ uint32_t fstrhash (f_str_t *str); - /* * Make copy of string to 0-terminated string */ diff --git a/src/lmtp.c b/src/lmtp.c new file mode 100644 index 000000000..ba03cd93d --- /dev/null +++ b/src/lmtp.c @@ -0,0 +1,314 @@ +/* + * Copyright (c) 2009, Rambler media + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "config.h" +#include "buffer.h" +#include "main.h" +#include "lmtp.h" +#include "lmtp_proto.h" +#include "cfg_file.h" +#include "url.h" +#include "modules.h" +#include "message.h" + +static char greetingbuf[1024]; +static struct timeval io_tv; + +static void write_socket (void *arg); + +static +void sig_handler (int signo) +{ + switch (signo) { + case SIGINT: + case SIGTERM: + _exit (1); + break; + } +} + +/* + * Config reload is designed by sending sigusr to active workers and pending shutdown of them + */ +static void +sigusr_handler (int fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + /* Do not accept new connections, preparing to end worker's process */ + struct timeval tv; + tv.tv_sec = SOFT_SHUTDOWN_TIME; + tv.tv_usec = 0; + event_del (&worker->sig_ev); + event_del (&worker->bind_ev); + do_reopen_log = 1; + msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); + event_loopexit (&tv); + return; +} + +/* + * Destructor for recipients list + */ +static void +rcpt_destruct (void *pointer) +{ + struct worker_task *task = (struct worker_task *)pointer; + + if (task->rcpt) { + g_list_free (task->rcpt); + } +} + +/* + * Free all structures of lmtp proto + */ +static void +free_task (struct rspamd_lmtp_proto *lmtp) +{ + GList *part; + struct mime_part *p; + + if (lmtp) { + msg_debug ("free_task: free pointer %p", lmtp->task); + if (lmtp->task->memc_ctx) { + memc_close_ctx (lmtp->task->memc_ctx); + } + while ((part = g_list_first (lmtp->task->parts))) { + lmtp->task->parts = g_list_remove_link (lmtp->task->parts, part); + p = (struct mime_part *)part->data; + g_byte_array_free (p->content, FALSE); + g_list_free_1 (part); + } + memory_pool_delete (lmtp->task->task_pool); + /* Plan dispatcher shutdown */ + lmtp->task->dispatcher->wanna_die = 1; + close (lmtp->task->sock); + g_free (lmtp->task); + g_free (lmtp); + } +} + +/* + * Callback that is called when there is data to read in buffer + */ +static void +read_socket (f_str_t *in, void *arg) +{ + struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg; + struct worker_task *task = lmtp->task; + ssize_t r; + + switch (task->state) { + case READ_COMMAND: + case READ_HEADER: + if (read_lmtp_input_line (lmtp, in) != 0) { + msg_info ("read_lmtp_socket: closing lmtp connection due to protocol error"); + lmtp->task->state = CLOSING_CONNECTION; + } + /* Task was read, recall read handler once more with new state to process message and write reply */ + if (task->state == READ_MESSAGE) { + read_socket (in, arg); + } + break; + case READ_MESSAGE: + r = process_message (lmtp->task); + r = process_filters (lmtp->task); + if (r == -1) { + task->last_error = "Filter processing error"; + task->error_code = LMTP_FAILURE; + task->state = WRITE_ERROR; + write_socket (lmtp); + } + else if (r == 0) { + task->state = WAIT_FILTER; + rspamd_dispatcher_pause (lmtp->task->dispatcher); + } + else { + process_statfiles (lmtp->task); + task->state = WRITE_REPLY; + write_socket (lmtp); + } + break; + } +} + +/* + * Callback for socket writing + */ +static void +write_socket (void *arg) +{ + struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg; + + switch (lmtp->task->state) { + case WRITE_REPLY: + write_lmtp_reply (lmtp); + lmtp->task->state = CLOSING_CONNECTION; + break; + case WRITE_ERROR: + write_lmtp_reply (lmtp); + lmtp->task->state = CLOSING_CONNECTION; + break; + case CLOSING_CONNECTION: + msg_debug ("lmtp_write_socket: normally closing connection"); + free_task (lmtp); + break; + } +} + +/* + * Called if something goes wrong + */ +static void +err_socket (GError *err, void *arg) +{ + struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg; + msg_info ("lmtp_err_socket: abnormally closing connection, error: %s", err->message); + /* Free buffers */ + free_task (lmtp); +} + +/* + * Accept new connection and construct task + */ +static void +accept_socket (int fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + struct sockaddr_storage ss; + struct worker_task *new_task; + struct rspamd_lmtp_proto *lmtp; + socklen_t addrlen = sizeof(ss); + int nfd, on = 1; + struct linger linger; + + if ((nfd = accept (fd, (struct sockaddr *)&ss, &addrlen)) == -1) { + return; + } + if (event_make_socket_nonblocking(fd) < 0) { + return; + } + + /* Socket options */ + setsockopt (nfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&on, sizeof(on)); + setsockopt (nfd, SOL_SOCKET, SO_REUSEADDR, (void *)&on, sizeof(on)); + linger.l_onoff = 1; + linger.l_linger = 2; + setsockopt (nfd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)); + + lmtp = g_malloc (sizeof (struct rspamd_lmtp_proto)); + new_task = g_malloc (sizeof (struct worker_task)); + bzero (new_task, sizeof (struct worker_task)); + new_task->worker = worker; + new_task->state = READ_COMMAND; + new_task->sock = nfd; + new_task->cfg = worker->srv->cfg; + TAILQ_INIT (&new_task->urls); + new_task->task_pool = memory_pool_new (memory_pool_get_size ()); + /* Add destructor for recipients list (it would be better to use anonymous function here */ + memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)rcpt_destruct, new_task); + new_task->results = g_hash_table_new (g_str_hash, g_str_equal); + memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->results); + worker->srv->stat->connections_count ++; + lmtp->task = new_task; + lmtp->state = LMTP_READ_LHLO; + + /* Set up dispatcher */ + new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, + write_socket, err_socket, &io_tv, + (void *)lmtp); + rspamd_dispatcher_write (lmtp->task->dispatcher, greetingbuf, strlen (greetingbuf), FALSE); +} + +/* + * Start lmtp worker process + */ +void +start_lmtp_worker (struct rspamd_worker *worker) +{ + struct sigaction signals; + int listen_sock, i; + struct sockaddr_un *un_addr; + char *hostbuf; + long int hostmax; + + worker->srv->pid = getpid (); + worker->srv->type = TYPE_LMTP; + event_init (); + g_mime_init (0); + + init_signals (&signals, sig_handler); + sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); + + /* SIGUSR2 handler */ + signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker); + signal_add (&worker->sig_ev, NULL); + + /* Create listen socket */ + if (worker->srv->cfg->lmtp_family == AF_INET) { + if ((listen_sock = make_socket (&worker->srv->cfg->lmtp_addr, worker->srv->cfg->lmtp_port)) == -1) { + msg_err ("start_lmtp: cannot create tcp listen socket. %m"); + exit(-errno); + } + } + else { + un_addr = (struct sockaddr_un *) alloca (sizeof (struct sockaddr_un)); + if (!un_addr || (listen_sock = make_unix_socket (worker->srv->cfg->lmtp_host, un_addr)) == -1) { + msg_err ("start_lmtp: cannot create unix listen socket. %m"); + exit(-errno); + } + } + + if (listen (listen_sock, -1) == -1) { + msg_err ("start_lmtp: cannot listen on socket. %m"); + exit(-errno); + } + /* Accept event */ + event_set(&worker->bind_ev, listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); + event_add(&worker->bind_ev, NULL); + + /* Perform modules configuring */ + for (i = 0; i < MODULES_NUM; i ++) { + modules[i].module_config_func (worker->srv->cfg); + } + + /* Fill hostname buf */ + hostmax = sysconf (_SC_HOST_NAME_MAX) + 1; + hostbuf = alloca (hostmax); + gethostname (hostbuf, hostmax); + hostbuf[hostmax - 1] = '\0'; + snprintf (greetingbuf, sizeof (greetingbuf), "%d rspamd version %s LMTP on %s Ready\r\n", LMTP_OK, RVERSION, hostbuf); + + /* Send SIGUSR2 to parent */ + kill (getppid (), SIGUSR2); + + io_tv.tv_sec = WORKER_IO_TIMEOUT; + io_tv.tv_usec = 0; + + event_loop (0); +} + +/* + * vi:ts=4 + */ diff --git a/src/lmtp.h b/src/lmtp.h new file mode 100644 index 000000000..d7c13c497 --- /dev/null +++ b/src/lmtp.h @@ -0,0 +1,20 @@ +#ifndef RSPAMD_LMTP_H +#define RSPAMD_LMTP_H + +#include "config.h" +#include "main.h" + +#define LMTP_GREETING 220 +#define LMTP_QUIT 221 +#define LMTP_OK 250 +#define LMTP_DATA 354 +#define LMTP_ERROR_PROCESS 500 +#define LMTP_FAILURE 530 +#define LMTP_AUTH_ERROR 503 +#define LMTP_BAD_CMD 503 +#define LMTP_NO_RCPT 554 +#define LMTP_TEMP_FAIL 421 + +void start_lmtp_worker (struct rspamd_worker *worker); + +#endif diff --git a/src/lmtp_proto.c b/src/lmtp_proto.c new file mode 100644 index 000000000..df53f69e3 --- /dev/null +++ b/src/lmtp_proto.c @@ -0,0 +1,380 @@ +/* + * Copyright (c) 2009, Rambler media + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "config.h" +#include "main.h" +#include "cfg_file.h" +#include "lmtp.h" +#include "lmtp_proto.h" + +/* Max line size as it is defined in rfc2822 */ +#define OUTBUFSIZ 1000 + +/* LMTP commands */ +static f_str_t lhlo_command = { + .begin = "LHLO", + .len = sizeof ("LHLO") - 1 +}; +static f_str_t mail_command = { + .begin = "MAIL FROM:", + .len = sizeof ("MAIL FROM:") - 1 +}; +static f_str_t rcpt_command = { + .begin = "RCPT TO:", + .len = sizeof ("RCPT TO:") - 1 +}; +static f_str_t data_command = { + .begin = "DATA", + .len = sizeof ("DATA") - 1 +}; +static f_str_t data_dot = { + .begin = ".\r\n", + .len = sizeof (".\r\n") - 1 +}; + +static void +out_lmtp_reply (struct rspamd_lmtp_proto *lmtp, int code, char *rcode, char *msg) +{ + char outbuf[OUTBUFSIZ]; + int r; + + if (*rcode == '\0') { + r = snprintf (outbuf, OUTBUFSIZ, "%d %s\r\n", code, msg); + } + else { + r = snprintf (outbuf, OUTBUFSIZ, "%d %s %s\r\n", code, rcode, msg); + } + rspamd_dispatcher_write (lmtp->task->dispatcher, outbuf, r, FALSE); +} + +int +read_lmtp_input_line (struct rspamd_lmtp_proto *lmtp, f_str_t *line) +{ + char *c, *rcpt; + unsigned int i = 0, l = 0, size; + + switch (lmtp->state) { + case LMTP_READ_LHLO: + /* Search LHLO line */ + if ((i = fstrstri (line, &lhlo_command)) == -1) { + msg_info ("read_lmtp_input_line: LHLO expected but not found"); + out_lmtp_reply (lmtp, LMTP_BAD_CMD, "5.0.0", "Need LHLO here"); + return -1; + } + else { + i += lhlo_command.len; + c = line->begin + i; + /* Skip spaces */ + while (isspace (*c) && i < line->len) { + i ++; + c ++; + } + lmtp->task->helo = memory_pool_alloc (lmtp->task->task_pool, line->len - i + 1); + /* Strlcpy makes string null terminated by design */ + g_strlcpy (lmtp->task->helo, c, line->len - i + 1); + lmtp->state = LMTP_READ_FROM; + out_lmtp_reply (lmtp, LMTP_OK, "", "Ok"); + return 0; + } + break; + case LMTP_READ_FROM: + /* Search MAIL FROM: line */ + if ((i = fstrstri (line, &mail_command)) == -1) { + msg_info ("read_lmtp_input_line: MAIL expected but not found"); + out_lmtp_reply (lmtp, LMTP_BAD_CMD, "5.0.0", "Need MAIL here"); + return -1; + } + else { + i += mail_command.len; + c = line->begin + i; + /* Get data from brackets (<>)*/ + while (*c++ != '<' && i < line->len) { + i ++; + } + while (*c != '>' && i < line->len) { + l ++; + c ++; + i ++; + } + + lmtp->task->from = memory_pool_alloc (lmtp->task->task_pool, l + 1); + /* Strlcpy makes string null terminated by design */ + g_strlcpy (lmtp->task->from, c - l, l + 1); + lmtp->state = LMTP_READ_RCPT; + out_lmtp_reply (lmtp, LMTP_OK, "2.1.0", "Sender ok"); + return 0; + } + break; + case LMTP_READ_RCPT: + /* Search RCPT_TO: line */ + if ((i = fstrstri (line, &rcpt_command)) == -1) { + msg_info ("read_lmtp_input_line: RCPT expected but not found"); + out_lmtp_reply (lmtp, LMTP_BAD_CMD, "5.0.0", "Need RCPT here"); + return -1; + } + else { + i += rcpt_command.len; + c = line->begin + i; + /* Get data from brackets (<>)*/ + while (*c++ != '<' && i < line->len) { + i ++; + } + while (*c != '>' && i < line->len) { + l ++; + c ++; + i ++; + } + rcpt = memory_pool_alloc (lmtp->task->task_pool, l + 1); + /* Strlcpy makes string null terminated by design */ + g_strlcpy (rcpt, c - l, l + 1); + lmtp->task->rcpt = g_list_prepend (lmtp->task->rcpt, rcpt); + lmtp->state = LMTP_READ_DATA; + out_lmtp_reply (lmtp, LMTP_OK, "2.1.0", "Recipient ok"); + return 0; + } + break; + case LMTP_READ_DATA: + /* Search DATA line */ + if ((i = fstrstri (line, &data_command)) == -1) { + msg_info ("read_lmtp_input_line: DATA expected but not found"); + out_lmtp_reply (lmtp, LMTP_BAD_CMD, "5.0.0", "Need DATA here"); + return -1; + } + else { + i += rcpt_command.len; + c = line->begin + i; + /* Skip spaces */ + while (isspace (*c++)) { + i ++; + } + rcpt = memory_pool_alloc (lmtp->task->task_pool, line->len - i + 1); + /* Strlcpy makes string null terminated by design */ + g_strlcpy (rcpt, c, line->len - i + 1); + lmtp->task->rcpt = g_list_prepend (lmtp->task->rcpt, rcpt); + lmtp->state = LMTP_READ_MESSAGE; + out_lmtp_reply (lmtp, LMTP_DATA, "", "Enter message, ending with \".\" on a line by itself"); + lmtp->task->msg = fstralloc (lmtp->task->task_pool, BUFSIZ); + return 0; + } + break; + case LMTP_READ_MESSAGE: + if (strncmp (line->begin, data_dot.begin, line->len) == 0) { + lmtp->state = LMTP_READ_DOT; + lmtp->task->state = READ_MESSAGE; + return 0; + } + else { + l = lmtp->task->msg->len; + size = lmtp->task->msg->size; + if (l + line->len > size) { + /* Grow buffer */ + if (line->len > size) { + size += line->len << 1; + } + else { + /* size *= 2 */ + size <<= 1; + } + lmtp->task->msg = fstrgrow (lmtp->task->task_pool, lmtp->task->msg, size); + } + fstrcat (lmtp->task->msg, line); + return 0; + } + break; + case LMTP_READ_DOT: + /* We have some input after reading dot, close connection as we have no currently support of multiply + * messages per session + */ + out_lmtp_reply (lmtp, LMTP_QUIT, "", "Bye"); + return 0; + break; + } +} + +static char* +format_lda_args (struct worker_task *task) +{ + char *arg, *res, *c, *r; + size_t len; + GList *rcpt; + gboolean got_args = FALSE; + + c = task->cfg->deliver_agent_path; + /* Find first arg */ + if ((c = strchr (c, ' ')) == NULL) { + return task->cfg->deliver_agent_path; + } + + /* Calculate length of result string */ + len = strlen (task->cfg->deliver_agent_path); + while (*c) { + if (*c == '%') { + c++; + switch (*c) { + case 'f': + /* Insert from */ + len += strlen (task->from) - 2; + break; + case 'r': + /* Insert list of recipients */ + rcpt = g_list_first (task->rcpt); + len -= 2; + while (rcpt) { + len += strlen ((char *)rcpt->data) + 1; + rcpt = g_list_next (rcpt); + } + break; + } + } + c ++; + len ++; + } + res = memory_pool_alloc (task->task_pool, len + 1); + r = res; + c = task->cfg->deliver_agent_path; + + while (*c) { + if (*c == ' ') { + got_args = TRUE; + } + if (got_args && *c == '%') { + switch (*(c + 1)) { + case 'f': + /* Insert from */ + c += 2; + len = strlen (task->from); + memcpy (r, task->from, len); + r += len; + break; + case 'r': + /* Insert list of recipients */ + c += 2; + rcpt = g_list_first (task->rcpt); + while (rcpt) { + len = strlen ((char *)rcpt->data) + 1; + memcpy (r, rcpt->data, len); + r += len; + *r++ = ' '; + rcpt = g_list_next (rcpt); + } + break; + default: + *r = *c; + r ++; + c ++; + break; + } + } + else { + *r = *c; + r ++; + c ++; + } + } + + return res; +} + +static int +lmtp_deliver_lda (struct worker_task *task) +{ + char *args; + FILE *lda; + GMimeStream *stream; + int rc, ecode; + + if ((args = format_lda_args (task)) == NULL) { + return -1; + } + + lda = popen (args, "w"); + if (lda == NULL) { + msg_info ("lmtp_deliver_lda: cannot deliver to lda, %m"); + return -1; + } + + stream = g_mime_stream_file_new (lda); + + if (g_mime_object_write_to_stream ((GMimeObject *)task->message, stream) == -1) { + msg_info ("lmtp_deliver_lda: cannot write stream to lda"); + return -1; + } + + rc = pclose (lda); + if (rc == -1) { + msg_info ("lmtp_deliver_lda: lda returned error code"); + return -1; + } + else if (WIFEXITED (rc)) { + ecode = WEXITSTATUS (rc); + if (ecode == 0) { + return 0; + } + else { + msg_info ("lmtp_deliver_lda: lda returned error code %d", ecode); + return -1; + } + } +} + +int +lmtp_deliver_message (struct worker_task *task) +{ + if (task->cfg->deliver_agent_path != NULL) { + /* Do deliver to LDA */ + return lmtp_deliver_lda (task); + } + else { + /* XXX: do lmtp/smtp client */ + return -1; + } +} + +int +write_lmtp_reply (struct rspamd_lmtp_proto *lmtp) +{ + int r; + char outbuf[OUTBUFSIZ]; + + msg_debug ("write_lmtp_reply: writing reply to client"); + if (lmtp->task->error_code != 0) { + out_lmtp_reply (lmtp, lmtp->task->error_code, "", lmtp->task->last_error); + } + else { + /* Do delivery */ + if (lmtp_deliver_message (lmtp->task) == -1) { + out_lmtp_reply (lmtp, LMTP_FAILURE, "", "Delivery failure"); + return -1; + } + else { + out_lmtp_reply (lmtp, LMTP_OK, "", "Delivery completed"); + } + } + + return 0; +} + +/* + * vi:ts=4 + */ diff --git a/src/lmtp_proto.h b/src/lmtp_proto.h new file mode 100644 index 000000000..24cba2c5e --- /dev/null +++ b/src/lmtp_proto.h @@ -0,0 +1,44 @@ +#ifndef RSPAMD_LMTP_PROTO_H +#define RSPAMD_LMTP_PROTO_H + +#include "config.h" + +struct worker_task; + +enum lmtp_state { + LMTP_READ_LHLO, + LMTP_READ_FROM, + LMTP_READ_RCPT, + LMTP_READ_DATA, + LMTP_READ_MESSAGE, + LMTP_READ_DOT, +}; + +struct rspamd_lmtp_proto { + struct worker_task *task; + enum lmtp_state state; +}; + +/** + * Read one line of user's input for specified task + * @param lmtp lmtp object + * @param line line of user's input + * @return 0 if line was successfully parsed and -1 if we have protocol error + */ +int read_lmtp_input_line (struct rspamd_lmtp_proto *lmtp, f_str_t *line); + +/** + * Deliver message via lmtp/smtp or pipe to LDA + * @param task task object + * @return 0 if we wrote message and -1 if there was some error + */ +int lmtp_deliver_message (struct worker_task *task); + +/** + * Write reply for specified lmtp object + * @param lmtp lmtp object + * @return 0 if we wrote reply and -1 if there was some error + */ +int write_lmtp_reply (struct rspamd_lmtp_proto *lmtp); + +#endif diff --git a/src/main.c b/src/main.c index 782f0acff..6a170726a 100644 --- a/src/main.c +++ b/src/main.c @@ -27,6 +27,7 @@ #include "cfg_file.h" #include "util.h" #include "perl.h" +#include "lmtp.h" /* 2 seconds to fork new process in place of dead one */ #define SOFT_FORK_TIME 2 @@ -178,6 +179,11 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, int reconfig, enum pro msg_info ("fork_worker: starting controller process %d", getpid ()); start_controller (cur); break; + case TYPE_LMTP: + setproctitle ("lmtp process"); + pidfile_close (rspamd->pfh); + msg_info ("fork_worker: starting lmtp process %d", getpid ()); + start_lmtp_worker (cur); case TYPE_WORKER: default: setproctitle ("worker process"); @@ -368,6 +374,11 @@ main (int argc, char **argv, char **env) if (cfg->controller_enabled) { fork_worker (rspamd, listen_sock, 0, TYPE_CONTROLLER); } + + /* Start lmtp if enabled */ + if (cfg->lmtp_enable) { + fork_worker (rspamd, listen_sock, 0, TYPE_LMTP); + } /* Signal processing cycle */ for (;;) { @@ -394,17 +405,17 @@ main (int argc, char **argv, char **env) if (WIFEXITED (res) && WEXITSTATUS (res) == 0) { /* Normal worker termination, do not fork one more */ msg_info ("main: %s process %d terminated normally", - (cur->type == TYPE_WORKER) ? "worker" : "controller", cur->pid); + (cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid); } else { if (WIFSIGNALED (res)) { msg_warn ("main: %s process %d terminated abnormally by signal: %d", - (cur->type == TYPE_WORKER) ? "worker" : "controller", + (cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid, WTERMSIG(res)); } else { msg_warn ("main: %s process %d terminated abnormally", - (cur->type == TYPE_WORKER) ? "worker" : "controller", cur->pid); + (cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid); } /* Fork another worker in replace of dead one */ delay_fork (cur->type); diff --git a/src/main.h b/src/main.h index ec74ad03b..270817a37 100644 --- a/src/main.h +++ b/src/main.h @@ -22,6 +22,8 @@ #define SOFT_SHUTDOWN_TIME 60 /* Default metric name */ #define DEFAULT_METRIC "default" +/* 60 seconds for worker's IO */ +#define WORKER_IO_TIMEOUT 60 /* Logging in postfix style */ #define msg_err g_critical @@ -36,6 +38,7 @@ enum process_type { TYPE_MAIN, TYPE_WORKER, TYPE_CONTROLLER, + TYPE_LMTP, }; /** diff --git a/src/worker.c b/src/worker.c index f3bbac8ce..334e819cb 100644 --- a/src/worker.c +++ b/src/worker.c @@ -40,8 +40,6 @@ #include <perl.h> /* from the Perl distribution */ #define TASK_POOL_SIZE 4095 -/* 60 seconds for worker's IO */ -#define WORKER_IO_TIMEOUT 60 const f_str_t CRLF = { /* begin */"\r\n", |