summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt6
-rw-r--r--config.h.in9
-rw-r--r--rspamd.conf.sample10
-rw-r--r--src/cfg_file.h30
-rw-r--r--src/cfg_file.l4
-rw-r--r--src/cfg_file.y86
-rw-r--r--src/cfg_utils.c134
-rw-r--r--src/controller.c2
-rw-r--r--src/fstring.c31
-rw-r--r--src/fstring.h6
-rw-r--r--src/lmtp.c314
-rw-r--r--src/lmtp.h20
-rw-r--r--src/lmtp_proto.c380
-rw-r--r--src/lmtp_proto.h44
-rw-r--r--src/main.c17
-rw-r--r--src/main.h3
-rw-r--r--src/worker.c2
17 files changed, 1032 insertions, 66 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 8fedd0bff..e11320de1 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -145,7 +145,7 @@ CHECK_INCLUDE_FILES(netinet/in.h HAVE_NETINET_IN_H)
CHECK_INCLUDE_FILES(arpa/inet.h HAVE_ARPA_INET_H)
CHECK_INCLUDE_FILES(netdb.h HAVE_NETDB_H)
CHECK_INCLUDE_FILES(syslog.h HAVE_SYSLOG_H)
-
+CHECK_INCLUDE_FILES(libgen.h HAVE_LIBGEN_H)
CHECK_FUNCTION_EXISTS(setproctitle HAVE_SETPROCTITLE)
CHECK_FUNCTION_EXISTS(getpagesize HAVE_GETPAGESIZE)
@@ -212,7 +212,9 @@ SET(RSPAMDSRC src/modules.c
src/filter.c
src/controller.c
src/cfg_utils.c
- src/buffer.c)
+ src/buffer.c
+ src/lmtp.c
+ src/lmtp_proto.c)
SET(TOKENIZERSSRC src/tokenizers/tokenizers.c
src/tokenizers/osb.c)
diff --git a/config.h.in b/config.h.in
index 04dd043d9..b77d31454 100644
--- a/config.h.in
+++ b/config.h.in
@@ -37,6 +37,8 @@
#cmakedefine HAVE_LIBUTIL_H 1
+#cmakedefine HAVE_LIBGEN_H 1
+
#cmakedefine HAVE_ENDIAN_H 1
#cmakedefine HAVE_SYS_ENDIAN_H 1
#cmakedefine HAVE_MACHINE_ENDIAN_H 1
@@ -192,14 +194,21 @@
#include <math.h>
#endif
+/* libutil */
#ifdef HAVE_LIBUTIL_H
#include <libutil.h>
#endif
+/* syslog */
#ifdef HAVE_SYSLOG_H
#include <syslog.h>
#endif
+#ifdef HAVE_LIBGEN_H
+#include <libgen.h>
+#define HAVE_DIRNAME 1
+#endif
+
#include <errno.h>
#include <signal.h>
#include <event.h>
diff --git a/rspamd.conf.sample b/rspamd.conf.sample
index f0e2a9411..0c6ba3189 100644
--- a/rspamd.conf.sample
+++ b/rspamd.conf.sample
@@ -76,4 +76,14 @@ factors {
"winnow" = 5.5;
};
+lmtp {
+ enabled = yes;
+ bind_socket = localhost:11335;
+};
+
+delivery {
+ enabled = yes;
+ agent = "/dev/null";
+};
+
url_filters = "surbl";
diff --git a/src/cfg_file.h b/src/cfg_file.h
index a5d092c73..7bacb8aa9 100644
--- a/src/cfg_file.h
+++ b/src/cfg_file.h
@@ -14,6 +14,7 @@
#define DEFAULT_BIND_PORT 768
#define DEFAULT_CONTROL_PORT 7608
+#define DEFAULT_LMTP_PORT 7609
#define MAX_MEMCACHED_SERVERS 48
#define DEFAULT_MEMCACHED_PORT 11211
/* Memcached timeouts */
@@ -39,6 +40,16 @@ struct classifier;
enum { VAL_UNDEF=0, VAL_TRUE, VAL_FALSE };
/**
+ * Types of rspamd bind lines
+ */
+enum rspamd_cred_type {
+ CRED_NORMAL,
+ CRED_CONTROL,
+ CRED_LMTP,
+ CRED_DELIVERY,
+};
+
+/**
* Regexp type: /H - header, /M - mime, /U - url
*/
enum rspamd_regexp_type {
@@ -161,6 +172,21 @@ struct config_file {
unsigned int memcached_maxerrors; /**< maximum number of errors */
unsigned int memcached_connect_timeout; /**< connection timeout */
+ gboolean lmtp_enable; /**< is lmtp agent is enabled */
+ char *lmtp_host; /**< host for lmtp agent */
+ struct in_addr lmtp_addr; /**< bind address for lmtp */
+ uint16_t lmtp_port; /**< bind port for lmtp agent */
+ uint16_t lmtp_family; /**< bind family for lmtp agent */
+ char *lmtp_metric; /**< metric to use in lmtp module */
+
+ gboolean delivery_enable; /**< is delivery agent is enabled */
+ char *deliver_host; /**< host for mail deliviring */
+ struct in_addr deliver_addr; /**< its address */
+ uint16_t deliver_port; /**< port for deliviring */
+ uint16_t deliver_family; /**< socket family for delivirnig */
+ char *deliver_agent_path; /**< deliver to pipe instead of socket */
+ gboolean deliver_lmtp; /**< use LMTP instead of SMTP */
+
LIST_HEAD (modulesq, perl_module) perl_modules; /**< linked list of perl modules to load */
LIST_HEAD (headersq, filter) header_filters; /**< linked list of all header's filters */
@@ -193,10 +219,10 @@ int add_memcached_server (struct config_file *cf, char *str);
* Parse bind credits
* @param cf config file to use
* @param str line that presents bind line
- * @param is_control flag that defines whether this credits are for controller
+ * @param type type of credits
* @return 1 if line was successfully parsed and 0 in case of error
*/
-int parse_bind_line (struct config_file *cf, char *str, char is_control);
+int parse_bind_line (struct config_file *cf, char *str, enum rspamd_cred_type type);
/**
* Init default values
diff --git a/src/cfg_file.l b/src/cfg_file.l
index 1845166b2..d2810c5bb 100644
--- a/src/cfg_file.l
+++ b/src/cfg_file.l
@@ -47,6 +47,10 @@ required_score return REQUIRED_SCORE;
function return FUNCTION;
control return CONTROL;
password return PASSWORD;
+lmtp return LMTP;
+enabled return ENABLED;
+delivery return DELIVERY;
+agent return AGENT;
statfile return STATFILE;
alias return ALIAS;
diff --git a/src/cfg_file.y b/src/cfg_file.y
index 73455976d..6667d01a1 100644
--- a/src/cfg_file.y
+++ b/src/cfg_file.y
@@ -43,6 +43,7 @@ struct statfile *cur_statfile = NULL;
%token LOGGING LOG_TYPE LOG_TYPE_CONSOLE LOG_TYPE_SYSLOG LOG_TYPE_FILE
%token LOG_LEVEL LOG_LEVEL_DEBUG LOG_LEVEL_INFO LOG_LEVEL_WARNING LOG_LEVEL_ERROR LOG_FACILITY LOG_FILENAME
%token STATFILE ALIAS PATTERN WEIGHT STATFILE_POOL_SIZE SIZE TOKENIZER CLASSIFIER
+%token DELIVERY LMTP ENABLED AGENT
%type <string> STRING
%type <string> VARIABLE
@@ -84,6 +85,8 @@ command :
| logging
| statfile
| statfile_pool_size
+ | lmtp
+ | delivery
;
tempdir :
@@ -125,7 +128,7 @@ controlcmd:
controlsock:
BINDSOCK EQSIGN bind_cred {
- if (!parse_bind_line (cfg, $3, 1)) {
+ if (!parse_bind_line (cfg, $3, CRED_CONTROL)) {
yyerror ("yyparse: parse_bind_line");
YYERROR;
}
@@ -141,7 +144,7 @@ controlpassword:
bindsock:
BINDSOCK EQSIGN bind_cred {
- if (!parse_bind_line (cfg, $3, 0)) {
+ if (!parse_bind_line (cfg, $3, CRED_NORMAL)) {
yyerror ("yyparse: parse_bind_line");
YYERROR;
}
@@ -659,6 +662,85 @@ statfile_pool_size:
cfg->max_statfile_size = $3;
}
;
+
+lmtp:
+ LMTP OBRACE lmtpbody EBRACE
+ ;
+
+lmtpbody:
+ lmtpcmd SEMICOLON
+ | lmtpbody lmtpcmd SEMICOLON
+ ;
+
+lmtpcmd:
+ lmtpenabled
+ | lmtpsock
+ | lmtpmetric
+ ;
+
+lmtpenabled:
+ ENABLED EQSIGN FLAG {
+ cfg->lmtp_enable = $3;
+ }
+ ;
+
+lmtpsock:
+ BINDSOCK EQSIGN bind_cred {
+ if (!parse_bind_line (cfg, $3, CRED_LMTP)) {
+ yyerror ("yyparse: parse_bind_line");
+ YYERROR;
+ }
+ free ($3);
+ }
+ ;
+lmtpmetric:
+ METRIC EQSIGN QUOTEDSTRING {
+ cfg->lmtp_metric = memory_pool_strdup (cfg->cfg_pool, $3);
+ }
+ ;
+
+delivery:
+ DELIVERY OBRACE deliverybody EBRACE
+ ;
+
+deliverybody:
+ deliverycmd SEMICOLON
+ | deliverybody deliverycmd SEMICOLON
+ ;
+
+deliverycmd:
+ deliveryenabled
+ | deliverysock
+ | deliveryagent
+ | deliverylmtp
+ ;
+
+deliveryenabled:
+ ENABLED EQSIGN FLAG {
+ cfg->delivery_enable = $3;
+ }
+ ;
+
+deliverysock:
+ BINDSOCK EQSIGN bind_cred {
+ if (!parse_bind_line (cfg, $3, CRED_DELIVERY)) {
+ yyerror ("yyparse: parse_bind_line");
+ YYERROR;
+ }
+ free ($3);
+ }
+ ;
+deliverylmtp:
+ LMTP EQSIGN FLAG {
+ cfg->deliver_lmtp = $3;
+ }
+ ;
+deliveryagent:
+ AGENT EQSIGN QUOTEDSTRING {
+ cfg->deliver_agent_path = memory_pool_strdup (cfg->cfg_pool, $3);
+ }
+ ;
+
%%
/*
* vi:ts=4
diff --git a/src/cfg_utils.c b/src/cfg_utils.c
index df8a2265f..2785e18d9 100644
--- a/src/cfg_utils.c
+++ b/src/cfg_utils.c
@@ -81,85 +81,107 @@ add_memcached_server (struct config_file *cf, char *str)
}
int
-parse_bind_line (struct config_file *cf, char *str, char is_control)
+parse_bind_line (struct config_file *cf, char *str, enum rspamd_cred_type type)
{
char *cur_tok, *err_str;
struct hostent *hent;
size_t s;
+ char **host;
+ int16_t *family, *port;
+ struct in_addr *addr;
if (str == NULL) return 0;
cur_tok = strsep (&str, ":");
+
+ switch (type) {
+ case CRED_NORMAL:
+ host = &cf->bind_host;
+ port = &cf->bind_port;
+ *port = DEFAULT_BIND_PORT;
+ family = &cf->bind_family;
+ addr = &cf->bind_addr;
+ break;
+ case CRED_CONTROL:
+ host = &cf->control_host;
+ port = &cf->control_port;
+ *port = DEFAULT_CONTROL_PORT;
+ family = &cf->control_family;
+ addr = &cf->control_addr;
+ break;
+ case CRED_LMTP:
+ host = &cf->lmtp_host;
+ port = &cf->lmtp_port;
+ *port = DEFAULT_LMTP_PORT;
+ family = &cf->lmtp_family;
+ addr = &cf->lmtp_addr;
+ break;
+ case CRED_DELIVERY:
+ host = &cf->deliver_host;
+ port = &cf->deliver_port;
+ *port = 25;
+ family = &cf->deliver_family;
+ addr = &cf->deliver_addr;
+ break;
+ }
if (cur_tok[0] == '/' || cur_tok[0] == '.') {
- if (is_control) {
- cf->control_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
- cf->control_family = AF_UNIX;
- }
- else {
- cf->bind_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
- cf->bind_family = AF_UNIX;
- }
- return 1;
-
- } else {
- if (str == '\0') {
- if (is_control) {
- cf->control_port = DEFAULT_CONTROL_PORT;
+#ifdef HAVE_DIRNAME
+ /* Try to check path of bind credit */
+ struct stat st;
+ int fd;
+ char *copy = memory_pool_strdup (cf->cfg_pool, cur_tok);
+ if (stat (copy, &st) == -1) {
+ if (errno == ENOENT) {
+ if ((fd = open (cur_tok, O_RDWR | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR)) == -1) {
+ yyerror ("parse_bind_line: cannot open path %s for making socket, %m", cur_tok);
+ return 0;
+ }
+ else {
+ close (fd);
+ unlink (cur_tok);
+ }
}
else {
- cf->bind_port = DEFAULT_BIND_PORT;
+ yyerror ("parse_bind_line: cannot stat path %s for making socket, %m", cur_tok);
+ return 0;
}
}
else {
- if (is_control) {
- cf->control_port = (uint16_t)strtoul (str, &err_str, 10);
- }
- else {
- cf->bind_port = (uint16_t)strtoul (str, &err_str, 10);
+ if (unlink (cur_tok) == -1) {
+ yyerror ("parse_bind_line: cannot remove path %s for making socket, %m", cur_tok);
+ return 0;
}
+ }
+#endif
+ *host = memory_pool_strdup (cf->cfg_pool, cur_tok);
+ *family = AF_UNIX;
+ return 1;
+
+ } else {
+ if (*str != '\0') {
+ *port = (uint16_t)strtoul (str, &err_str, 10);
if (*err_str != '\0') {
+ yyerror ("parse_bind_line: cannot read numeric value: %s", err_str);
return 0;
}
}
- if (is_control) {
- if (!inet_aton (cur_tok, &cf->control_addr)) {
- /* Try to call gethostbyname */
- hent = gethostbyname (cur_tok);
- if (hent == NULL) {
- return 0;
- }
- else {
- cf->control_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
- memcpy((char *)&cf->control_addr, hent->h_addr, sizeof(struct in_addr));
- s = strlen (cur_tok) + 1;
- }
+ if (!inet_aton (cur_tok, addr)) {
+ /* Try to call gethostbyname */
+ hent = gethostbyname (cur_tok);
+ if (hent == NULL) {
+ return 0;
}
else {
- cf->control_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
+ *host = memory_pool_strdup (cf->cfg_pool, cur_tok);
+ memcpy((char *)addr, hent->h_addr, sizeof(struct in_addr));
+ s = strlen (cur_tok) + 1;
}
-
- cf->control_family = AF_INET;
}
else {
- if (!inet_aton (cur_tok, &cf->bind_addr)) {
- /* Try to call gethostbyname */
- hent = gethostbyname (cur_tok);
- if (hent == NULL) {
- return 0;
- }
- else {
- cf->bind_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
- memcpy((char *)&cf->bind_addr, hent->h_addr, sizeof(struct in_addr));
- s = strlen (cur_tok) + 1;
- }
- }
- else {
- cf->bind_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
- }
-
- cf->bind_family = AF_INET;
+ *host = memory_pool_strdup (cf->cfg_pool, cur_tok);
}
+ *family = AF_INET;
return 1;
}
@@ -191,6 +213,7 @@ init_defaults (struct config_file *cfg)
cfg->composite_symbols = g_hash_table_new (g_str_hash, g_str_equal);
cfg->statfiles = g_hash_table_new (g_str_hash, g_str_equal);
cfg->cfg_params = g_hash_table_new (g_str_hash, g_str_equal);
+ cfg->lmtp_metric = "default";
def_metric = memory_pool_alloc (cfg->cfg_pool, sizeof (struct metric));
def_metric->name = "default";
@@ -512,6 +535,11 @@ fill_cfg_params (struct config_file *cfg)
void
post_load_config (struct config_file *cfg)
{
+ if (cfg->lmtp_enable && !cfg->delivery_enable) {
+ yywarn ("post_load_config: lmtp is enabled, but delivery is not enabled, disabling lmtp");
+ cfg->lmtp_enable = FALSE;
+ }
+
g_hash_table_foreach (cfg->variables, substitute_all_variables, cfg);
g_hash_table_foreach (cfg->modules_opts, substitute_module_variables, cfg);
parse_filters_str (cfg, cfg->header_filters_str, SCRIPT_HEADER);
diff --git a/src/controller.c b/src/controller.c
index 8128d7356..2b1b5099e 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -534,7 +534,7 @@ start_controller (struct rspamd_worker *worker)
}
else {
un_addr = (struct sockaddr_un *) alloca (sizeof (struct sockaddr_un));
- if (!un_addr || (listen_sock = make_unix_socket (worker->srv->cfg->bind_host, un_addr)) == -1) {
+ if (!un_addr || (listen_sock = make_unix_socket (worker->srv->cfg->control_host, un_addr)) == -1) {
msg_err ("start_controller: cannot create unix listen socket. %m");
exit(-errno);
}
diff --git a/src/fstring.c b/src/fstring.c
index 75be14442..82d0b095e 100644
--- a/src/fstring.c
+++ b/src/fstring.c
@@ -92,6 +92,37 @@ fstrstr (f_str_t *orig, f_str_t *pattern)
}
/*
+ * Search for pattern in orig ignoring case
+ */
+ssize_t
+fstrstri (f_str_t *orig, f_str_t *pattern)
+{
+ register ssize_t cur = 0, pcur = 0;
+
+ if (pattern->len > orig->len) {
+ return -1;
+ }
+
+ while (cur < orig->len) {
+ if (tolower (*(orig->begin + cur)) == tolower (*pattern->begin)) {
+ while (cur < orig->len && pcur < pattern->len) {
+ if (tolower (*(orig->begin + cur)) != tolower (*(pattern->begin + pcur))) {
+ pcur = 0;
+ break;
+ }
+ cur ++;
+ pcur ++;
+ }
+ return cur - pattern->len;
+ }
+ cur ++;
+ }
+
+ return -1;
+
+}
+
+/*
* Split string by tokens
* word contains parsed word
*
diff --git a/src/fstring.h b/src/fstring.h
index 000ba74c6..f5d7fffa3 100644
--- a/src/fstring.h
+++ b/src/fstring.h
@@ -43,6 +43,11 @@ ssize_t fstrrchr (f_str_t *src, char c);
ssize_t fstrstr (f_str_t *orig, f_str_t *pattern);
/*
+ * Search for pattern in orig ignoring case
+ */
+ssize_t fstrstri (f_str_t *orig, f_str_t *pattern);
+
+/*
* Split string by tokens
* word contains parsed word
*/
@@ -88,7 +93,6 @@ f_str_t* fstrgrow (memory_pool_t *pool, f_str_t *orig, size_t newlen);
*/
uint32_t fstrhash (f_str_t *str);
-
/*
* Make copy of string to 0-terminated string
*/
diff --git a/src/lmtp.c b/src/lmtp.c
new file mode 100644
index 000000000..ba03cd93d
--- /dev/null
+++ b/src/lmtp.c
@@ -0,0 +1,314 @@
+/*
+ * Copyright (c) 2009, Rambler media
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "buffer.h"
+#include "main.h"
+#include "lmtp.h"
+#include "lmtp_proto.h"
+#include "cfg_file.h"
+#include "url.h"
+#include "modules.h"
+#include "message.h"
+
+static char greetingbuf[1024];
+static struct timeval io_tv;
+
+static void write_socket (void *arg);
+
+static
+void sig_handler (int signo)
+{
+ switch (signo) {
+ case SIGINT:
+ case SIGTERM:
+ _exit (1);
+ break;
+ }
+}
+
+/*
+ * Config reload is designed by sending sigusr to active workers and pending shutdown of them
+ */
+static void
+sigusr_handler (int fd, short what, void *arg)
+{
+ struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ /* Do not accept new connections, preparing to end worker's process */
+ struct timeval tv;
+ tv.tv_sec = SOFT_SHUTDOWN_TIME;
+ tv.tv_usec = 0;
+ event_del (&worker->sig_ev);
+ event_del (&worker->bind_ev);
+ do_reopen_log = 1;
+ msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
+ event_loopexit (&tv);
+ return;
+}
+
+/*
+ * Destructor for recipients list
+ */
+static void
+rcpt_destruct (void *pointer)
+{
+ struct worker_task *task = (struct worker_task *)pointer;
+
+ if (task->rcpt) {
+ g_list_free (task->rcpt);
+ }
+}
+
+/*
+ * Free all structures of lmtp proto
+ */
+static void
+free_task (struct rspamd_lmtp_proto *lmtp)
+{
+ GList *part;
+ struct mime_part *p;
+
+ if (lmtp) {
+ msg_debug ("free_task: free pointer %p", lmtp->task);
+ if (lmtp->task->memc_ctx) {
+ memc_close_ctx (lmtp->task->memc_ctx);
+ }
+ while ((part = g_list_first (lmtp->task->parts))) {
+ lmtp->task->parts = g_list_remove_link (lmtp->task->parts, part);
+ p = (struct mime_part *)part->data;
+ g_byte_array_free (p->content, FALSE);
+ g_list_free_1 (part);
+ }
+ memory_pool_delete (lmtp->task->task_pool);
+ /* Plan dispatcher shutdown */
+ lmtp->task->dispatcher->wanna_die = 1;
+ close (lmtp->task->sock);
+ g_free (lmtp->task);
+ g_free (lmtp);
+ }
+}
+
+/*
+ * Callback that is called when there is data to read in buffer
+ */
+static void
+read_socket (f_str_t *in, void *arg)
+{
+ struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg;
+ struct worker_task *task = lmtp->task;
+ ssize_t r;
+
+ switch (task->state) {
+ case READ_COMMAND:
+ case READ_HEADER:
+ if (read_lmtp_input_line (lmtp, in) != 0) {
+ msg_info ("read_lmtp_socket: closing lmtp connection due to protocol error");
+ lmtp->task->state = CLOSING_CONNECTION;
+ }
+ /* Task was read, recall read handler once more with new state to process message and write reply */
+ if (task->state == READ_MESSAGE) {
+ read_socket (in, arg);
+ }
+ break;
+ case READ_MESSAGE:
+ r = process_message (lmtp->task);
+ r = process_filters (lmtp->task);
+ if (r == -1) {
+ task->last_error = "Filter processing error";
+ task->error_code = LMTP_FAILURE;
+ task->state = WRITE_ERROR;
+ write_socket (lmtp);
+ }
+ else if (r == 0) {
+ task->state = WAIT_FILTER;
+ rspamd_dispatcher_pause (lmtp->task->dispatcher);
+ }
+ else {
+ process_statfiles (lmtp->task);
+ task->state = WRITE_REPLY;
+ write_socket (lmtp);
+ }
+ break;
+ }
+}
+
+/*
+ * Callback for socket writing
+ */
+static void
+write_socket (void *arg)
+{
+ struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg;
+
+ switch (lmtp->task->state) {
+ case WRITE_REPLY:
+ write_lmtp_reply (lmtp);
+ lmtp->task->state = CLOSING_CONNECTION;
+ break;
+ case WRITE_ERROR:
+ write_lmtp_reply (lmtp);
+ lmtp->task->state = CLOSING_CONNECTION;
+ break;
+ case CLOSING_CONNECTION:
+ msg_debug ("lmtp_write_socket: normally closing connection");
+ free_task (lmtp);
+ break;
+ }
+}
+
+/*
+ * Called if something goes wrong
+ */
+static void
+err_socket (GError *err, void *arg)
+{
+ struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg;
+ msg_info ("lmtp_err_socket: abnormally closing connection, error: %s", err->message);
+ /* Free buffers */
+ free_task (lmtp);
+}
+
+/*
+ * Accept new connection and construct task
+ */
+static void
+accept_socket (int fd, short what, void *arg)
+{
+ struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ struct sockaddr_storage ss;
+ struct worker_task *new_task;
+ struct rspamd_lmtp_proto *lmtp;
+ socklen_t addrlen = sizeof(ss);
+ int nfd, on = 1;
+ struct linger linger;
+
+ if ((nfd = accept (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
+ return;
+ }
+ if (event_make_socket_nonblocking(fd) < 0) {
+ return;
+ }
+
+ /* Socket options */
+ setsockopt (nfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&on, sizeof(on));
+ setsockopt (nfd, SOL_SOCKET, SO_REUSEADDR, (void *)&on, sizeof(on));
+ linger.l_onoff = 1;
+ linger.l_linger = 2;
+ setsockopt (nfd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger));
+
+ lmtp = g_malloc (sizeof (struct rspamd_lmtp_proto));
+ new_task = g_malloc (sizeof (struct worker_task));
+ bzero (new_task, sizeof (struct worker_task));
+ new_task->worker = worker;
+ new_task->state = READ_COMMAND;
+ new_task->sock = nfd;
+ new_task->cfg = worker->srv->cfg;
+ TAILQ_INIT (&new_task->urls);
+ new_task->task_pool = memory_pool_new (memory_pool_get_size ());
+ /* Add destructor for recipients list (it would be better to use anonymous function here */
+ memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)rcpt_destruct, new_task);
+ new_task->results = g_hash_table_new (g_str_hash, g_str_equal);
+ memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->results);
+ worker->srv->stat->connections_count ++;
+ lmtp->task = new_task;
+ lmtp->state = LMTP_READ_LHLO;
+
+ /* Set up dispatcher */
+ new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket,
+ write_socket, err_socket, &io_tv,
+ (void *)lmtp);
+ rspamd_dispatcher_write (lmtp->task->dispatcher, greetingbuf, strlen (greetingbuf), FALSE);
+}
+
+/*
+ * Start lmtp worker process
+ */
+void
+start_lmtp_worker (struct rspamd_worker *worker)
+{
+ struct sigaction signals;
+ int listen_sock, i;
+ struct sockaddr_un *un_addr;
+ char *hostbuf;
+ long int hostmax;
+
+ worker->srv->pid = getpid ();
+ worker->srv->type = TYPE_LMTP;
+ event_init ();
+ g_mime_init (0);
+
+ init_signals (&signals, sig_handler);
+ sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
+
+ /* SIGUSR2 handler */
+ signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker);
+ signal_add (&worker->sig_ev, NULL);
+
+ /* Create listen socket */
+ if (worker->srv->cfg->lmtp_family == AF_INET) {
+ if ((listen_sock = make_socket (&worker->srv->cfg->lmtp_addr, worker->srv->cfg->lmtp_port)) == -1) {
+ msg_err ("start_lmtp: cannot create tcp listen socket. %m");
+ exit(-errno);
+ }
+ }
+ else {
+ un_addr = (struct sockaddr_un *) alloca (sizeof (struct sockaddr_un));
+ if (!un_addr || (listen_sock = make_unix_socket (worker->srv->cfg->lmtp_host, un_addr)) == -1) {
+ msg_err ("start_lmtp: cannot create unix listen socket. %m");
+ exit(-errno);
+ }
+ }
+
+ if (listen (listen_sock, -1) == -1) {
+ msg_err ("start_lmtp: cannot listen on socket. %m");
+ exit(-errno);
+ }
+ /* Accept event */
+ event_set(&worker->bind_ev, listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+ event_add(&worker->bind_ev, NULL);
+
+ /* Perform modules configuring */
+ for (i = 0; i < MODULES_NUM; i ++) {
+ modules[i].module_config_func (worker->srv->cfg);
+ }
+
+ /* Fill hostname buf */
+ hostmax = sysconf (_SC_HOST_NAME_MAX) + 1;
+ hostbuf = alloca (hostmax);
+ gethostname (hostbuf, hostmax);
+ hostbuf[hostmax - 1] = '\0';
+ snprintf (greetingbuf, sizeof (greetingbuf), "%d rspamd version %s LMTP on %s Ready\r\n", LMTP_OK, RVERSION, hostbuf);
+
+ /* Send SIGUSR2 to parent */
+ kill (getppid (), SIGUSR2);
+
+ io_tv.tv_sec = WORKER_IO_TIMEOUT;
+ io_tv.tv_usec = 0;
+
+ event_loop (0);
+}
+
+/*
+ * vi:ts=4
+ */
diff --git a/src/lmtp.h b/src/lmtp.h
new file mode 100644
index 000000000..d7c13c497
--- /dev/null
+++ b/src/lmtp.h
@@ -0,0 +1,20 @@
+#ifndef RSPAMD_LMTP_H
+#define RSPAMD_LMTP_H
+
+#include "config.h"
+#include "main.h"
+
+#define LMTP_GREETING 220
+#define LMTP_QUIT 221
+#define LMTP_OK 250
+#define LMTP_DATA 354
+#define LMTP_ERROR_PROCESS 500
+#define LMTP_FAILURE 530
+#define LMTP_AUTH_ERROR 503
+#define LMTP_BAD_CMD 503
+#define LMTP_NO_RCPT 554
+#define LMTP_TEMP_FAIL 421
+
+void start_lmtp_worker (struct rspamd_worker *worker);
+
+#endif
diff --git a/src/lmtp_proto.c b/src/lmtp_proto.c
new file mode 100644
index 000000000..df53f69e3
--- /dev/null
+++ b/src/lmtp_proto.c
@@ -0,0 +1,380 @@
+/*
+ * Copyright (c) 2009, Rambler media
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "main.h"
+#include "cfg_file.h"
+#include "lmtp.h"
+#include "lmtp_proto.h"
+
+/* Max line size as it is defined in rfc2822 */
+#define OUTBUFSIZ 1000
+
+/* LMTP commands */
+static f_str_t lhlo_command = {
+ .begin = "LHLO",
+ .len = sizeof ("LHLO") - 1
+};
+static f_str_t mail_command = {
+ .begin = "MAIL FROM:",
+ .len = sizeof ("MAIL FROM:") - 1
+};
+static f_str_t rcpt_command = {
+ .begin = "RCPT TO:",
+ .len = sizeof ("RCPT TO:") - 1
+};
+static f_str_t data_command = {
+ .begin = "DATA",
+ .len = sizeof ("DATA") - 1
+};
+static f_str_t data_dot = {
+ .begin = ".\r\n",
+ .len = sizeof (".\r\n") - 1
+};
+
+static void
+out_lmtp_reply (struct rspamd_lmtp_proto *lmtp, int code, char *rcode, char *msg)
+{
+ char outbuf[OUTBUFSIZ];
+ int r;
+
+ if (*rcode == '\0') {
+ r = snprintf (outbuf, OUTBUFSIZ, "%d %s\r\n", code, msg);
+ }
+ else {
+ r = snprintf (outbuf, OUTBUFSIZ, "%d %s %s\r\n", code, rcode, msg);
+ }
+ rspamd_dispatcher_write (lmtp->task->dispatcher, outbuf, r, FALSE);
+}
+
+int
+read_lmtp_input_line (struct rspamd_lmtp_proto *lmtp, f_str_t *line)
+{
+ char *c, *rcpt;
+ unsigned int i = 0, l = 0, size;
+
+ switch (lmtp->state) {
+ case LMTP_READ_LHLO:
+ /* Search LHLO line */
+ if ((i = fstrstri (line, &lhlo_command)) == -1) {
+ msg_info ("read_lmtp_input_line: LHLO expected but not found");
+ out_lmtp_reply (lmtp, LMTP_BAD_CMD, "5.0.0", "Need LHLO here");
+ return -1;
+ }
+ else {
+ i += lhlo_command.len;
+ c = line->begin + i;
+ /* Skip spaces */
+ while (isspace (*c) && i < line->len) {
+ i ++;
+ c ++;
+ }
+ lmtp->task->helo = memory_pool_alloc (lmtp->task->task_pool, line->len - i + 1);
+ /* Strlcpy makes string null terminated by design */
+ g_strlcpy (lmtp->task->helo, c, line->len - i + 1);
+ lmtp->state = LMTP_READ_FROM;
+ out_lmtp_reply (lmtp, LMTP_OK, "", "Ok");
+ return 0;
+ }
+ break;
+ case LMTP_READ_FROM:
+ /* Search MAIL FROM: line */
+ if ((i = fstrstri (line, &mail_command)) == -1) {
+ msg_info ("read_lmtp_input_line: MAIL expected but not found");
+ out_lmtp_reply (lmtp, LMTP_BAD_CMD, "5.0.0", "Need MAIL here");
+ return -1;
+ }
+ else {
+ i += mail_command.len;
+ c = line->begin + i;
+ /* Get data from brackets (<>)*/
+ while (*c++ != '<' && i < line->len) {
+ i ++;
+ }
+ while (*c != '>' && i < line->len) {
+ l ++;
+ c ++;
+ i ++;
+ }
+
+ lmtp->task->from = memory_pool_alloc (lmtp->task->task_pool, l + 1);
+ /* Strlcpy makes string null terminated by design */
+ g_strlcpy (lmtp->task->from, c - l, l + 1);
+ lmtp->state = LMTP_READ_RCPT;
+ out_lmtp_reply (lmtp, LMTP_OK, "2.1.0", "Sender ok");
+ return 0;
+ }
+ break;
+ case LMTP_READ_RCPT:
+ /* Search RCPT_TO: line */
+ if ((i = fstrstri (line, &rcpt_command)) == -1) {
+ msg_info ("read_lmtp_input_line: RCPT expected but not found");
+ out_lmtp_reply (lmtp, LMTP_BAD_CMD, "5.0.0", "Need RCPT here");
+ return -1;
+ }
+ else {
+ i += rcpt_command.len;
+ c = line->begin + i;
+ /* Get data from brackets (<>)*/
+ while (*c++ != '<' && i < line->len) {
+ i ++;
+ }
+ while (*c != '>' && i < line->len) {
+ l ++;
+ c ++;
+ i ++;
+ }
+ rcpt = memory_pool_alloc (lmtp->task->task_pool, l + 1);
+ /* Strlcpy makes string null terminated by design */
+ g_strlcpy (rcpt, c - l, l + 1);
+ lmtp->task->rcpt = g_list_prepend (lmtp->task->rcpt, rcpt);
+ lmtp->state = LMTP_READ_DATA;
+ out_lmtp_reply (lmtp, LMTP_OK, "2.1.0", "Recipient ok");
+ return 0;
+ }
+ break;
+ case LMTP_READ_DATA:
+ /* Search DATA line */
+ if ((i = fstrstri (line, &data_command)) == -1) {
+ msg_info ("read_lmtp_input_line: DATA expected but not found");
+ out_lmtp_reply (lmtp, LMTP_BAD_CMD, "5.0.0", "Need DATA here");
+ return -1;
+ }
+ else {
+ i += rcpt_command.len;
+ c = line->begin + i;
+ /* Skip spaces */
+ while (isspace (*c++)) {
+ i ++;
+ }
+ rcpt = memory_pool_alloc (lmtp->task->task_pool, line->len - i + 1);
+ /* Strlcpy makes string null terminated by design */
+ g_strlcpy (rcpt, c, line->len - i + 1);
+ lmtp->task->rcpt = g_list_prepend (lmtp->task->rcpt, rcpt);
+ lmtp->state = LMTP_READ_MESSAGE;
+ out_lmtp_reply (lmtp, LMTP_DATA, "", "Enter message, ending with \".\" on a line by itself");
+ lmtp->task->msg = fstralloc (lmtp->task->task_pool, BUFSIZ);
+ return 0;
+ }
+ break;
+ case LMTP_READ_MESSAGE:
+ if (strncmp (line->begin, data_dot.begin, line->len) == 0) {
+ lmtp->state = LMTP_READ_DOT;
+ lmtp->task->state = READ_MESSAGE;
+ return 0;
+ }
+ else {
+ l = lmtp->task->msg->len;
+ size = lmtp->task->msg->size;
+ if (l + line->len > size) {
+ /* Grow buffer */
+ if (line->len > size) {
+ size += line->len << 1;
+ }
+ else {
+ /* size *= 2 */
+ size <<= 1;
+ }
+ lmtp->task->msg = fstrgrow (lmtp->task->task_pool, lmtp->task->msg, size);
+ }
+ fstrcat (lmtp->task->msg, line);
+ return 0;
+ }
+ break;
+ case LMTP_READ_DOT:
+ /* We have some input after reading dot, close connection as we have no currently support of multiply
+ * messages per session
+ */
+ out_lmtp_reply (lmtp, LMTP_QUIT, "", "Bye");
+ return 0;
+ break;
+ }
+}
+
+static char*
+format_lda_args (struct worker_task *task)
+{
+ char *arg, *res, *c, *r;
+ size_t len;
+ GList *rcpt;
+ gboolean got_args = FALSE;
+
+ c = task->cfg->deliver_agent_path;
+ /* Find first arg */
+ if ((c = strchr (c, ' ')) == NULL) {
+ return task->cfg->deliver_agent_path;
+ }
+
+ /* Calculate length of result string */
+ len = strlen (task->cfg->deliver_agent_path);
+ while (*c) {
+ if (*c == '%') {
+ c++;
+ switch (*c) {
+ case 'f':
+ /* Insert from */
+ len += strlen (task->from) - 2;
+ break;
+ case 'r':
+ /* Insert list of recipients */
+ rcpt = g_list_first (task->rcpt);
+ len -= 2;
+ while (rcpt) {
+ len += strlen ((char *)rcpt->data) + 1;
+ rcpt = g_list_next (rcpt);
+ }
+ break;
+ }
+ }
+ c ++;
+ len ++;
+ }
+ res = memory_pool_alloc (task->task_pool, len + 1);
+ r = res;
+ c = task->cfg->deliver_agent_path;
+
+ while (*c) {
+ if (*c == ' ') {
+ got_args = TRUE;
+ }
+ if (got_args && *c == '%') {
+ switch (*(c + 1)) {
+ case 'f':
+ /* Insert from */
+ c += 2;
+ len = strlen (task->from);
+ memcpy (r, task->from, len);
+ r += len;
+ break;
+ case 'r':
+ /* Insert list of recipients */
+ c += 2;
+ rcpt = g_list_first (task->rcpt);
+ while (rcpt) {
+ len = strlen ((char *)rcpt->data) + 1;
+ memcpy (r, rcpt->data, len);
+ r += len;
+ *r++ = ' ';
+ rcpt = g_list_next (rcpt);
+ }
+ break;
+ default:
+ *r = *c;
+ r ++;
+ c ++;
+ break;
+ }
+ }
+ else {
+ *r = *c;
+ r ++;
+ c ++;
+ }
+ }
+
+ return res;
+}
+
+static int
+lmtp_deliver_lda (struct worker_task *task)
+{
+ char *args;
+ FILE *lda;
+ GMimeStream *stream;
+ int rc, ecode;
+
+ if ((args = format_lda_args (task)) == NULL) {
+ return -1;
+ }
+
+ lda = popen (args, "w");
+ if (lda == NULL) {
+ msg_info ("lmtp_deliver_lda: cannot deliver to lda, %m");
+ return -1;
+ }
+
+ stream = g_mime_stream_file_new (lda);
+
+ if (g_mime_object_write_to_stream ((GMimeObject *)task->message, stream) == -1) {
+ msg_info ("lmtp_deliver_lda: cannot write stream to lda");
+ return -1;
+ }
+
+ rc = pclose (lda);
+ if (rc == -1) {
+ msg_info ("lmtp_deliver_lda: lda returned error code");
+ return -1;
+ }
+ else if (WIFEXITED (rc)) {
+ ecode = WEXITSTATUS (rc);
+ if (ecode == 0) {
+ return 0;
+ }
+ else {
+ msg_info ("lmtp_deliver_lda: lda returned error code %d", ecode);
+ return -1;
+ }
+ }
+}
+
+int
+lmtp_deliver_message (struct worker_task *task)
+{
+ if (task->cfg->deliver_agent_path != NULL) {
+ /* Do deliver to LDA */
+ return lmtp_deliver_lda (task);
+ }
+ else {
+ /* XXX: do lmtp/smtp client */
+ return -1;
+ }
+}
+
+int
+write_lmtp_reply (struct rspamd_lmtp_proto *lmtp)
+{
+ int r;
+ char outbuf[OUTBUFSIZ];
+
+ msg_debug ("write_lmtp_reply: writing reply to client");
+ if (lmtp->task->error_code != 0) {
+ out_lmtp_reply (lmtp, lmtp->task->error_code, "", lmtp->task->last_error);
+ }
+ else {
+ /* Do delivery */
+ if (lmtp_deliver_message (lmtp->task) == -1) {
+ out_lmtp_reply (lmtp, LMTP_FAILURE, "", "Delivery failure");
+ return -1;
+ }
+ else {
+ out_lmtp_reply (lmtp, LMTP_OK, "", "Delivery completed");
+ }
+ }
+
+ return 0;
+}
+
+/*
+ * vi:ts=4
+ */
diff --git a/src/lmtp_proto.h b/src/lmtp_proto.h
new file mode 100644
index 000000000..24cba2c5e
--- /dev/null
+++ b/src/lmtp_proto.h
@@ -0,0 +1,44 @@
+#ifndef RSPAMD_LMTP_PROTO_H
+#define RSPAMD_LMTP_PROTO_H
+
+#include "config.h"
+
+struct worker_task;
+
+enum lmtp_state {
+ LMTP_READ_LHLO,
+ LMTP_READ_FROM,
+ LMTP_READ_RCPT,
+ LMTP_READ_DATA,
+ LMTP_READ_MESSAGE,
+ LMTP_READ_DOT,
+};
+
+struct rspamd_lmtp_proto {
+ struct worker_task *task;
+ enum lmtp_state state;
+};
+
+/**
+ * Read one line of user's input for specified task
+ * @param lmtp lmtp object
+ * @param line line of user's input
+ * @return 0 if line was successfully parsed and -1 if we have protocol error
+ */
+int read_lmtp_input_line (struct rspamd_lmtp_proto *lmtp, f_str_t *line);
+
+/**
+ * Deliver message via lmtp/smtp or pipe to LDA
+ * @param task task object
+ * @return 0 if we wrote message and -1 if there was some error
+ */
+int lmtp_deliver_message (struct worker_task *task);
+
+/**
+ * Write reply for specified lmtp object
+ * @param lmtp lmtp object
+ * @return 0 if we wrote reply and -1 if there was some error
+ */
+int write_lmtp_reply (struct rspamd_lmtp_proto *lmtp);
+
+#endif
diff --git a/src/main.c b/src/main.c
index 782f0acff..6a170726a 100644
--- a/src/main.c
+++ b/src/main.c
@@ -27,6 +27,7 @@
#include "cfg_file.h"
#include "util.h"
#include "perl.h"
+#include "lmtp.h"
/* 2 seconds to fork new process in place of dead one */
#define SOFT_FORK_TIME 2
@@ -178,6 +179,11 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, int reconfig, enum pro
msg_info ("fork_worker: starting controller process %d", getpid ());
start_controller (cur);
break;
+ case TYPE_LMTP:
+ setproctitle ("lmtp process");
+ pidfile_close (rspamd->pfh);
+ msg_info ("fork_worker: starting lmtp process %d", getpid ());
+ start_lmtp_worker (cur);
case TYPE_WORKER:
default:
setproctitle ("worker process");
@@ -368,6 +374,11 @@ main (int argc, char **argv, char **env)
if (cfg->controller_enabled) {
fork_worker (rspamd, listen_sock, 0, TYPE_CONTROLLER);
}
+
+ /* Start lmtp if enabled */
+ if (cfg->lmtp_enable) {
+ fork_worker (rspamd, listen_sock, 0, TYPE_LMTP);
+ }
/* Signal processing cycle */
for (;;) {
@@ -394,17 +405,17 @@ main (int argc, char **argv, char **env)
if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
/* Normal worker termination, do not fork one more */
msg_info ("main: %s process %d terminated normally",
- (cur->type == TYPE_WORKER) ? "worker" : "controller", cur->pid);
+ (cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid);
}
else {
if (WIFSIGNALED (res)) {
msg_warn ("main: %s process %d terminated abnormally by signal: %d",
- (cur->type == TYPE_WORKER) ? "worker" : "controller",
+ (cur->type != TYPE_WORKER) ? "controller" : "worker",
cur->pid, WTERMSIG(res));
}
else {
msg_warn ("main: %s process %d terminated abnormally",
- (cur->type == TYPE_WORKER) ? "worker" : "controller", cur->pid);
+ (cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid);
}
/* Fork another worker in replace of dead one */
delay_fork (cur->type);
diff --git a/src/main.h b/src/main.h
index ec74ad03b..270817a37 100644
--- a/src/main.h
+++ b/src/main.h
@@ -22,6 +22,8 @@
#define SOFT_SHUTDOWN_TIME 60
/* Default metric name */
#define DEFAULT_METRIC "default"
+/* 60 seconds for worker's IO */
+#define WORKER_IO_TIMEOUT 60
/* Logging in postfix style */
#define msg_err g_critical
@@ -36,6 +38,7 @@ enum process_type {
TYPE_MAIN,
TYPE_WORKER,
TYPE_CONTROLLER,
+ TYPE_LMTP,
};
/**
diff --git a/src/worker.c b/src/worker.c
index f3bbac8ce..334e819cb 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -40,8 +40,6 @@
#include <perl.h> /* from the Perl distribution */
#define TASK_POOL_SIZE 4095
-/* 60 seconds for worker's IO */
-#define WORKER_IO_TIMEOUT 60
const f_str_t CRLF = {
/* begin */"\r\n",