diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-02-24 20:16:53 +0300 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-02-24 20:16:53 +0300 |
commit | 1cd34f5283bcf1d4973a351dc5235094608e3d2d (patch) | |
tree | 2d6185294cfea7c3c062c8d2123df96f5e02e1b0 /src/lmtp.c | |
parent | 606128de4cb33a2727d6609df46ecf0c72006a73 (diff) | |
download | rspamd-1cd34f5283bcf1d4973a351dc5235094608e3d2d.tar.gz rspamd-1cd34f5283bcf1d4973a351dc5235094608e3d2d.zip |
* Add initial LMTP support and LDA delivery to rspamd
Diffstat (limited to 'src/lmtp.c')
-rw-r--r-- | src/lmtp.c | 314 |
1 files changed, 314 insertions, 0 deletions
diff --git a/src/lmtp.c b/src/lmtp.c new file mode 100644 index 000000000..ba03cd93d --- /dev/null +++ b/src/lmtp.c @@ -0,0 +1,314 @@ +/* + * Copyright (c) 2009, Rambler media + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "config.h" +#include "buffer.h" +#include "main.h" +#include "lmtp.h" +#include "lmtp_proto.h" +#include "cfg_file.h" +#include "url.h" +#include "modules.h" +#include "message.h" + +static char greetingbuf[1024]; +static struct timeval io_tv; + +static void write_socket (void *arg); + +static +void sig_handler (int signo) +{ + switch (signo) { + case SIGINT: + case SIGTERM: + _exit (1); + break; + } +} + +/* + * Config reload is designed by sending sigusr to active workers and pending shutdown of them + */ +static void +sigusr_handler (int fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + /* Do not accept new connections, preparing to end worker's process */ + struct timeval tv; + tv.tv_sec = SOFT_SHUTDOWN_TIME; + tv.tv_usec = 0; + event_del (&worker->sig_ev); + event_del (&worker->bind_ev); + do_reopen_log = 1; + msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); + event_loopexit (&tv); + return; +} + +/* + * Destructor for recipients list + */ +static void +rcpt_destruct (void *pointer) +{ + struct worker_task *task = (struct worker_task *)pointer; + + if (task->rcpt) { + g_list_free (task->rcpt); + } +} + +/* + * Free all structures of lmtp proto + */ +static void +free_task (struct rspamd_lmtp_proto *lmtp) +{ + GList *part; + struct mime_part *p; + + if (lmtp) { + msg_debug ("free_task: free pointer %p", lmtp->task); + if (lmtp->task->memc_ctx) { + memc_close_ctx (lmtp->task->memc_ctx); + } + while ((part = g_list_first (lmtp->task->parts))) { + lmtp->task->parts = g_list_remove_link (lmtp->task->parts, part); + p = (struct mime_part *)part->data; + g_byte_array_free (p->content, FALSE); + g_list_free_1 (part); + } + memory_pool_delete (lmtp->task->task_pool); + /* Plan dispatcher shutdown */ + lmtp->task->dispatcher->wanna_die = 1; + close (lmtp->task->sock); + g_free (lmtp->task); + g_free (lmtp); + } +} + +/* + * Callback that is called when there is data to read in buffer + */ +static void +read_socket (f_str_t *in, void *arg) +{ + struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg; + struct worker_task *task = lmtp->task; + ssize_t r; + + switch (task->state) { + case READ_COMMAND: + case READ_HEADER: + if (read_lmtp_input_line (lmtp, in) != 0) { + msg_info ("read_lmtp_socket: closing lmtp connection due to protocol error"); + lmtp->task->state = CLOSING_CONNECTION; + } + /* Task was read, recall read handler once more with new state to process message and write reply */ + if (task->state == READ_MESSAGE) { + read_socket (in, arg); + } + break; + case READ_MESSAGE: + r = process_message (lmtp->task); + r = process_filters (lmtp->task); + if (r == -1) { + task->last_error = "Filter processing error"; + task->error_code = LMTP_FAILURE; + task->state = WRITE_ERROR; + write_socket (lmtp); + } + else if (r == 0) { + task->state = WAIT_FILTER; + rspamd_dispatcher_pause (lmtp->task->dispatcher); + } + else { + process_statfiles (lmtp->task); + task->state = WRITE_REPLY; + write_socket (lmtp); + } + break; + } +} + +/* + * Callback for socket writing + */ +static void +write_socket (void *arg) +{ + struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg; + + switch (lmtp->task->state) { + case WRITE_REPLY: + write_lmtp_reply (lmtp); + lmtp->task->state = CLOSING_CONNECTION; + break; + case WRITE_ERROR: + write_lmtp_reply (lmtp); + lmtp->task->state = CLOSING_CONNECTION; + break; + case CLOSING_CONNECTION: + msg_debug ("lmtp_write_socket: normally closing connection"); + free_task (lmtp); + break; + } +} + +/* + * Called if something goes wrong + */ +static void +err_socket (GError *err, void *arg) +{ + struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg; + msg_info ("lmtp_err_socket: abnormally closing connection, error: %s", err->message); + /* Free buffers */ + free_task (lmtp); +} + +/* + * Accept new connection and construct task + */ +static void +accept_socket (int fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + struct sockaddr_storage ss; + struct worker_task *new_task; + struct rspamd_lmtp_proto *lmtp; + socklen_t addrlen = sizeof(ss); + int nfd, on = 1; + struct linger linger; + + if ((nfd = accept (fd, (struct sockaddr *)&ss, &addrlen)) == -1) { + return; + } + if (event_make_socket_nonblocking(fd) < 0) { + return; + } + + /* Socket options */ + setsockopt (nfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&on, sizeof(on)); + setsockopt (nfd, SOL_SOCKET, SO_REUSEADDR, (void *)&on, sizeof(on)); + linger.l_onoff = 1; + linger.l_linger = 2; + setsockopt (nfd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)); + + lmtp = g_malloc (sizeof (struct rspamd_lmtp_proto)); + new_task = g_malloc (sizeof (struct worker_task)); + bzero (new_task, sizeof (struct worker_task)); + new_task->worker = worker; + new_task->state = READ_COMMAND; + new_task->sock = nfd; + new_task->cfg = worker->srv->cfg; + TAILQ_INIT (&new_task->urls); + new_task->task_pool = memory_pool_new (memory_pool_get_size ()); + /* Add destructor for recipients list (it would be better to use anonymous function here */ + memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)rcpt_destruct, new_task); + new_task->results = g_hash_table_new (g_str_hash, g_str_equal); + memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->results); + worker->srv->stat->connections_count ++; + lmtp->task = new_task; + lmtp->state = LMTP_READ_LHLO; + + /* Set up dispatcher */ + new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, + write_socket, err_socket, &io_tv, + (void *)lmtp); + rspamd_dispatcher_write (lmtp->task->dispatcher, greetingbuf, strlen (greetingbuf), FALSE); +} + +/* + * Start lmtp worker process + */ +void +start_lmtp_worker (struct rspamd_worker *worker) +{ + struct sigaction signals; + int listen_sock, i; + struct sockaddr_un *un_addr; + char *hostbuf; + long int hostmax; + + worker->srv->pid = getpid (); + worker->srv->type = TYPE_LMTP; + event_init (); + g_mime_init (0); + + init_signals (&signals, sig_handler); + sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); + + /* SIGUSR2 handler */ + signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker); + signal_add (&worker->sig_ev, NULL); + + /* Create listen socket */ + if (worker->srv->cfg->lmtp_family == AF_INET) { + if ((listen_sock = make_socket (&worker->srv->cfg->lmtp_addr, worker->srv->cfg->lmtp_port)) == -1) { + msg_err ("start_lmtp: cannot create tcp listen socket. %m"); + exit(-errno); + } + } + else { + un_addr = (struct sockaddr_un *) alloca (sizeof (struct sockaddr_un)); + if (!un_addr || (listen_sock = make_unix_socket (worker->srv->cfg->lmtp_host, un_addr)) == -1) { + msg_err ("start_lmtp: cannot create unix listen socket. %m"); + exit(-errno); + } + } + + if (listen (listen_sock, -1) == -1) { + msg_err ("start_lmtp: cannot listen on socket. %m"); + exit(-errno); + } + /* Accept event */ + event_set(&worker->bind_ev, listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); + event_add(&worker->bind_ev, NULL); + + /* Perform modules configuring */ + for (i = 0; i < MODULES_NUM; i ++) { + modules[i].module_config_func (worker->srv->cfg); + } + + /* Fill hostname buf */ + hostmax = sysconf (_SC_HOST_NAME_MAX) + 1; + hostbuf = alloca (hostmax); + gethostname (hostbuf, hostmax); + hostbuf[hostmax - 1] = '\0'; + snprintf (greetingbuf, sizeof (greetingbuf), "%d rspamd version %s LMTP on %s Ready\r\n", LMTP_OK, RVERSION, hostbuf); + + /* Send SIGUSR2 to parent */ + kill (getppid (), SIGUSR2); + + io_tv.tv_sec = WORKER_IO_TIMEOUT; + io_tv.tv_usec = 0; + + event_loop (0); +} + +/* + * vi:ts=4 + */ |