123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373 |
- /*
- * Copyright (c) 2009-2012, Vsevolod Stakhov
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY AUTHOR ''AS IS'' AND ANY
- * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
- * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
- * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
- * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
- * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
- * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
- #include "config.h"
- #include "buffer.h"
- #include "main.h"
- #include "lmtp.h"
- #include "lmtp_proto.h"
- #include "cfg_file.h"
- #include "util.h"
- #include "url.h"
- #include "message.h"
-
- static gchar greetingbuf[1024];
- static struct timeval io_tv;
-
- static gboolean lmtp_write_socket (void *arg);
-
- void start_lmtp (struct rspamd_worker *worker);
-
- worker_t lmtp_worker = {
- "controller", /* Name */
- NULL, /* Init function */
- start_lmtp, /* Start function */
- TRUE, /* Has socket */
- FALSE, /* Non unique */
- FALSE, /* Non threaded */
- TRUE /* Killable */
- };
-
- #ifndef HAVE_SA_SIGINFO
- static void
- sig_handler (gint signo)
- #else
- static void
- sig_handler (gint signo, siginfo_t *info, void *unused)
- #endif
- {
- switch (signo) {
- case SIGINT:
- case SIGTERM:
- _exit (1);
- break;
- }
- }
-
- /*
- * Config reload is designed by sending sigusr to active workers and pending shutdown of them
- */
- static void
- sigusr2_handler (gint fd, short what, void *arg)
- {
- struct rspamd_worker *worker = (struct rspamd_worker *)arg;
- /* Do not accept new connections, preparing to end worker's process */
- struct timeval tv;
- tv.tv_sec = SOFT_SHUTDOWN_TIME;
- tv.tv_usec = 0;
- event_del (&worker->sig_ev_usr1);
- event_del (&worker->sig_ev_usr2);
- event_del (&worker->bind_ev);
- msg_info ("lmtp worker's shutdown is pending in %d sec",
- SOFT_SHUTDOWN_TIME);
- event_loopexit (&tv);
- return;
- }
-
- /*
- * Reopen log is designed by sending sigusr1 to active workers and pending shutdown of them
- */
- static void
- sigusr1_handler (gint fd, short what, void *arg)
- {
- struct rspamd_worker *worker = (struct rspamd_worker *) arg;
-
- rspamd_log_reopen (worker->srv->logger);
-
- return;
- }
-
- /*
- * Destructor for recipients list
- */
- static void
- rcpt_destruct (void *pointer)
- {
- struct rspamd_task *task = (struct rspamd_task *)pointer;
-
- if (task->rcpt) {
- g_list_free (task->rcpt);
- }
- }
-
- /*
- * Free all structures of lmtp proto
- */
- static void
- free_lmtp_task (struct rspamd_lmtp_proto *lmtp, gboolean is_soft)
- {
- GList *part;
- struct mime_part *p;
- struct rspamd_task *task = lmtp->task;
-
- if (lmtp) {
- debug_task ("free pointer %p", lmtp->task);
- while ((part = g_list_first (lmtp->task->parts))) {
- lmtp->task->parts = g_list_remove_link (lmtp->task->parts, part);
- p = (struct mime_part *)part->data;
- g_byte_array_free (p->content, FALSE);
- g_list_free_1 (part);
- }
- rspamd_mempool_delete (lmtp->task->task_pool);
- if (is_soft) {
- /* Plan dispatcher shutdown */
- lmtp->task->dispatcher->wanna_die = 1;
- }
- else {
- rspamd_remove_dispatcher (lmtp->task->dispatcher);
- }
- close (lmtp->task->sock);
- g_free (lmtp->task);
- g_free (lmtp);
- }
- }
-
- /*
- * Callback that is called when there is data to read in buffer
- */
- static gboolean
- lmtp_read_socket (rspamd_fstring_t * in, void *arg)
- {
- struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg;
- struct rspamd_task *task = lmtp->task;
- ssize_t r;
-
- switch (task->state) {
- case READ_COMMAND:
- case READ_HEADER:
- if (read_lmtp_input_line (lmtp, in) != 0) {
- msg_info ("closing lmtp connection due to protocol error");
- lmtp->task->state = CLOSING_CONNECTION;
- }
- /* Task was read, recall read handler once more with new state to process message and write reply */
- if (task->state == READ_MESSAGE) {
- lmtp_read_socket (in, arg);
- }
- break;
- case READ_MESSAGE:
- r = process_message (lmtp->task);
- r = rspamd_process_filters (lmtp->task);
- if (r == -1) {
- return FALSE;
- }
- else if (r == 0) {
- task->state = WAIT_FILTER;
- rspamd_dispatcher_pause (lmtp->task->dispatcher);
- }
- else {
- rspamd_process_statistics (lmtp->task);
- task->state = WRITE_REPLY;
- lmtp_write_socket (lmtp);
- }
- break;
- default:
- debug_task ("invalid state while reading from socket %d",
- lmtp->task->state);
- break;
- }
-
- return TRUE;
- }
-
- /*
- * Callback for socket writing
- */
- static gboolean
- lmtp_write_socket (void *arg)
- {
- struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg;
- struct rspamd_task *task = lmtp->task;
-
- switch (lmtp->task->state) {
- case WRITE_REPLY:
- if (write_lmtp_reply (lmtp) == 1) {
- lmtp->task->state = WAIT_FILTER;
- }
- else {
- lmtp->task->state = CLOSING_CONNECTION;
- }
- break;
- case WRITE_ERROR:
- write_lmtp_reply (lmtp);
- lmtp->task->state = CLOSING_CONNECTION;
- break;
- case CLOSING_CONNECTION:
- debug_task ("normally closing connection");
- free_lmtp_task (lmtp, TRUE);
- return FALSE;
- break;
- default:
- debug_task ("invalid state while writing to socket %d",
- lmtp->task->state);
- break;
- }
-
- return TRUE;
- }
-
- /*
- * Called if something goes wrong
- */
- static void
- lmtp_err_socket (GError * err, void *arg)
- {
- struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg;
- msg_info ("abnormally closing connection, error: %s", err->message);
- /* Free buffers */
- free_lmtp_task (lmtp, FALSE);
- }
-
- /*
- * Accept new connection and construct task
- */
- static void
- accept_socket (gint fd, short what, void *arg)
- {
- struct rspamd_worker *worker = (struct rspamd_worker *)arg;
- union sa_union su;
- struct rspamd_task *new_task;
- struct rspamd_lmtp_proto *lmtp;
- socklen_t addrlen = sizeof (su.ss);
- gint nfd;
-
- if ((nfd =
- accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) {
- msg_warn ("accept failed: %s", strerror (errno));
- return;
- }
-
- lmtp = g_malloc (sizeof (struct rspamd_lmtp_proto));
-
- new_task = rspamd_task_new (worker);
-
- if (su.ss.ss_family == AF_UNIX) {
- msg_info ("accepted connection from unix socket");
- new_task->client_addr.s_addr = INADDR_NONE;
- }
- else if (su.ss.ss_family == AF_INET) {
- msg_info ("accepted connection from %s port %d",
- inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port));
- memcpy (&new_task->client_addr, &su.s4.sin_addr,
- sizeof (struct in_addr));
- }
-
- new_task->sock = nfd;
- new_task->cfg = worker->srv->cfg;
- new_task->task_pool = rspamd_mempool_new (rspamd_mempool_suggest_size ());
- /* Add destructor for recipients list (it would be better to use anonymous function here */
- rspamd_mempool_add_destructor (new_task->task_pool,
- (rspamd_mempool_destruct_t) rcpt_destruct, new_task);
- new_task->results = g_hash_table_new (rspamd_str_hash, rspamd_str_equal);
- new_task->ev_base = worker->ctx;
- rspamd_mempool_add_destructor (new_task->task_pool,
- (rspamd_mempool_destruct_t) g_hash_table_destroy, new_task->results);
- worker->srv->stat->connections_count++;
- lmtp->task = new_task;
- lmtp->state = LMTP_READ_LHLO;
-
- /* Set up dispatcher */
- new_task->dispatcher = rspamd_create_dispatcher (new_task->ev_base,
- nfd,
- BUFFER_LINE,
- lmtp_read_socket,
- lmtp_write_socket,
- lmtp_err_socket,
- &io_tv,
- (void *)lmtp);
- new_task->dispatcher->peer_addr = new_task->client_addr.s_addr;
- if (!rspamd_dispatcher_write (lmtp->task->dispatcher, greetingbuf,
- strlen (greetingbuf), FALSE, FALSE)) {
- msg_warn ("cannot write greeting");
- }
- }
-
- /*
- * Start lmtp worker process
- */
- void
- start_lmtp (struct rspamd_worker *worker)
- {
- struct sigaction signals;
- gchar *hostbuf;
- gsize hostmax;
- module_t **mod;
-
- worker->srv->pid = getpid ();
- worker->ctx = event_init ();
-
- rspamd_signals_init (&signals, sig_handler);
- sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
-
- /* SIGUSR2 handler */
- signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler,
- (void *) worker);
- event_base_set (worker->ctx, &worker->sig_ev_usr2);
- signal_add (&worker->sig_ev_usr2, NULL);
-
- /* SIGUSR1 handler */
- signal_set (&worker->sig_ev_usr1, SIGUSR1, sigusr1_handler,
- (void *) worker);
- event_base_set (worker->ctx, &worker->sig_ev_usr1);
- signal_add (&worker->sig_ev_usr1, NULL);
-
- /* Accept event */
- event_set (&worker->bind_ev,
- worker->cf->listen_sock,
- EV_READ | EV_PERSIST,
- accept_socket,
- (void *)worker);
- event_base_set (worker->ctx, &worker->bind_ev);
- event_add (&worker->bind_ev, NULL);
-
- /* Perform modules configuring */
- mod = &modules[0];
- while (*mod) {
- (*mod)->module_config_func (worker->srv->cfg);
- mod++;
- }
-
- /* Fill hostname buf */
- hostmax = sysconf (_SC_HOST_NAME_MAX) + 1;
- hostbuf = alloca (hostmax);
- gethostname (hostbuf, hostmax);
- hostbuf[hostmax - 1] = '\0';
- rspamd_snprintf (greetingbuf,
- sizeof (greetingbuf),
- "%d rspamd version %s LMTP on %s Ready\r\n",
- LMTP_OK,
- RVERSION,
- hostbuf);
-
- io_tv.tv_sec = 60000;
- io_tv.tv_usec = 0;
-
- gperf_profiler_init (worker->srv->cfg, "lmtp");
-
- event_base_loop (worker->ctx, 0);
- exit (EXIT_SUCCESS);
- }
-
- /*
- * vi:ts=4
- */
|