/*-
 * Copyright 2016 Vsevolod Stakhov
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "config.h"
#include "buffer.h"
#include "main.h"
#include "lmtp.h"
#include "lmtp_proto.h"
#include "cfg_file.h"
#include "util.h"
#include "url.h"
#include "message.h"

/*
 * Host name length used when sysconf (_SC_HOST_NAME_MAX) cannot report a
 * limit; 255 is the minimum value POSIX allows for HOST_NAME_MAX.
 */
enum {
	LMTP_FALLBACK_HOST_NAME_MAX = 255
};

/* Greeting line sent to every accepted client; filled once in start_lmtp () */
static gchar greetingbuf[1024];
/* I/O timeout handed to every dispatcher; filled once in start_lmtp () */
static struct timeval io_tv;

static gboolean lmtp_write_socket (void *arg);

void start_lmtp (struct rspamd_worker *worker);

/*
 * Worker definition for the LMTP worker.
 *
 * NOTE(review): the name is "controller" although everything else in this
 * file refers to "lmtp" -- looks like a copy-paste leftover; confirm against
 * the worker registry before changing it, since the name is externally
 * visible.
 */
worker_t lmtp_worker = {
	"controller",				/* Name */
	NULL,						/* Init function */
	start_lmtp,					/* Start function */
	TRUE,						/* Has socket */
	FALSE,						/* Non unique */
	FALSE,						/* Non threaded */
	TRUE,						/* Killable */
	RSPAMD_WORKER_VER			/* Version info */
};

/*
 * Plain signal handler: SIGINT and SIGTERM terminate the process
 * immediately with _exit (1); every other signal is ignored here.
 */
#ifndef HAVE_SA_SIGINFO
static void
sig_handler (gint signo)
#else
static void
sig_handler (gint signo, siginfo_t *info, void *unused)
#endif
{
	switch (signo) {
	case SIGINT:
	case SIGTERM:
		_exit (1);
		break;
	}
}

/*
 * Config reload is designed by sending sigusr2 to active workers and pending
 * shutdown of them.
 *
 * Removes the signal and accept events so that no new connections are taken,
 * then asks the event loop to exit after SOFT_SHUTDOWN_TIME seconds.
 *
 * @param fd unused
 * @param what unused
 * @param arg the struct rspamd_worker this handler was registered for
 */
static void
sigusr2_handler (gint fd, short what, void *arg)
{
	struct rspamd_worker *worker = (struct rspamd_worker *)arg;
	struct timeval tv;

	tv.tv_sec = SOFT_SHUTDOWN_TIME;
	tv.tv_usec = 0;

	/* Do not accept new connections, preparing to end worker's process */
	event_del (&worker->sig_ev_usr1);
	event_del (&worker->sig_ev_usr2);
	event_del (&worker->bind_ev);

	msg_info ("lmtp worker's shutdown is pending in %d sec",
		SOFT_SHUTDOWN_TIME);
	event_loopexit (&tv);
}

/*
 * Log reopening is requested by sending sigusr1 to active workers; the
 * worker keeps running afterwards.
 *
 * @param fd unused
 * @param what unused
 * @param arg the struct rspamd_worker this handler was registered for
 */
static void
sigusr1_handler (gint fd, short what, void *arg)
{
	struct rspamd_worker *worker = (struct rspamd_worker *)arg;

	rspamd_log_reopen (worker->srv->logger);
}

/*
 * Destructor for recipients list: frees the GList stored in task->rcpt.
 * Registered on the task memory pool in accept_socket ().
 *
 * @param pointer the struct rspamd_task owning the list
 */
static void
rcpt_destruct (void *pointer)
{
	struct rspamd_task *task = (struct rspamd_task *)pointer;

	if (task->rcpt) {
		g_list_free (task->rcpt);
	}
}

/*
 * Free all structures of lmtp proto.
 *
 * Releases the parts list, deletes the task memory pool, shuts the
 * dispatcher down, closes the socket and frees both the task and the
 * protocol object. Does nothing when lmtp is NULL.
 *
 * @param lmtp protocol object to destroy (may be NULL)
 * @param is_soft TRUE to only mark the dispatcher as wanting to die, FALSE
 *        to remove the dispatcher right away
 */
static void
free_lmtp_task (struct rspamd_lmtp_proto *lmtp, gboolean is_soft)
{
	GList *part;
	struct mime_part *p;
	struct rspamd_task *task;

	/* Check before touching lmtp->task: the pointer may be NULL */
	if (lmtp == NULL) {
		return;
	}

	task = lmtp->task;
	debug_task ("free pointer %p", task);

	/* Unlink parts one by one, releasing each list node */
	while ((part = g_list_first (task->parts))) {
		task->parts = g_list_remove_link (task->parts, part);
		p = (struct mime_part *)part->data;
		/* FALSE: only the GByteArray wrapper is released, not its data */
		g_byte_array_free (p->content, FALSE);
		g_list_free_1 (part);
	}

	rspamd_mempool_delete (task->task_pool);

	if (is_soft) {
		/* Plan dispatcher shutdown */
		task->dispatcher->wanna_die = 1;
	}
	else {
		rspamd_remove_dispatcher (task->dispatcher);
	}

	close (task->sock);
	g_free (task);
	g_free (lmtp);
}

/*
 * Callback that is called when there is data to read in buffer.
 *
 * Drives the task state machine: command/header lines are fed to
 * read_lmtp_input_line (); once the state becomes READ_MESSAGE the message
 * is parsed and passed through the filters.
 *
 * @param in the data that was read
 * @param arg the struct rspamd_lmtp_proto of this connection
 * @return FALSE when rspamd_process_filters () reports -1 while handling
 *         READ_MESSAGE directly, TRUE otherwise
 */
static gboolean
lmtp_read_socket (rspamd_fstring_t * in, void *arg)
{
	struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg;
	struct rspamd_task *task = lmtp->task;
	ssize_t r;

	switch (task->state) {
	case READ_COMMAND:
	case READ_HEADER:
		if (read_lmtp_input_line (lmtp, in) != 0) {
			msg_info ("closing lmtp connection due to protocol error");
			lmtp->task->state = CLOSING_CONNECTION;
		}
		/*
		 * Task was read, recall read handler once more with new state to
		 * process message and write reply.
		 *
		 * NOTE(review): the result of the nested call is discarded, so a
		 * FALSE coming from the READ_MESSAGE branch is not propagated to
		 * the caller -- confirm this is intended.
		 */
		if (task->state == READ_MESSAGE) {
			lmtp_read_socket (in, arg);
		}
		break;
	case READ_MESSAGE:
		/*
		 * NOTE(review): the result of rspamd_message_parse () is
		 * overwritten on the next line without being inspected -- confirm
		 * whether a parse failure should stop processing here.
		 */
		r = rspamd_message_parse (lmtp->task);
		r = rspamd_process_filters (lmtp->task);
		if (r == -1) {
			return FALSE;
		}
		else if (r == 0) {
			/* Stop reading from the socket until the state changes */
			task->state = WAIT_FILTER;
			rspamd_dispatcher_pause (lmtp->task->dispatcher);
		}
		else {
			rspamd_process_statistics (lmtp->task);
			task->state = WRITE_REPLY;
			lmtp_write_socket (lmtp);
		}
		break;
	default:
		debug_task ("invalid state while reading from socket %d",
			lmtp->task->state);
		break;
	}

	return TRUE;
}

/*
 * Callback for socket writing.
 *
 * @param arg the struct rspamd_lmtp_proto of this connection
 * @return FALSE after the connection has been torn down in the
 *         CLOSING_CONNECTION state (lmtp is freed at that point), TRUE
 *         otherwise
 */
static gboolean
lmtp_write_socket (void *arg)
{
	struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg;
	struct rspamd_task *task = lmtp->task;

	switch (lmtp->task->state) {
	case WRITE_REPLY:
		if (write_lmtp_reply (lmtp) == 1) {
			lmtp->task->state = WAIT_FILTER;
		}
		else {
			lmtp->task->state = CLOSING_CONNECTION;
		}
		break;
	case WRITE_ERROR:
		write_lmtp_reply (lmtp);
		lmtp->task->state = CLOSING_CONNECTION;
		break;
	case CLOSING_CONNECTION:
		debug_task ("normally closing connection");
		/* Soft free: the dispatcher is only marked as wanting to die */
		free_lmtp_task (lmtp, TRUE);
		return FALSE;
	default:
		debug_task ("invalid state while writing to socket %d",
			lmtp->task->state);
		break;
	}

	return TRUE;
}

/*
 * Called if something goes wrong: logs the error and destroys the
 * connection, removing its dispatcher immediately.
 *
 * @param err error description; err->message is logged
 * @param arg the struct rspamd_lmtp_proto of this connection
 */
static void
lmtp_err_socket (GError * err, void *arg)
{
	struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg;

	msg_info ("abnormally closing connection, error: %s", err->message);
	/* Free buffers */
	free_lmtp_task (lmtp, FALSE);
}

/*
 * Accept new connection and construct task.
 *
 * Creates the task and protocol objects, attaches a line-buffered
 * dispatcher to the new socket and queues the greeting for sending.
 *
 * @param fd listening socket that became readable
 * @param what unused
 * @param arg the struct rspamd_worker owning the listening socket
 */
static void
accept_socket (gint fd, short what, void *arg)
{
	struct rspamd_worker *worker = (struct rspamd_worker *)arg;
	union sa_union su;
	struct rspamd_task *new_task;
	struct rspamd_lmtp_proto *lmtp;
	socklen_t addrlen = sizeof (su.ss);
	gint nfd;

	nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen);
	if (nfd == -1) {
		msg_warn ("accept failed: %s", strerror (errno));
		return;
	}

	lmtp = g_malloc (sizeof (struct rspamd_lmtp_proto));
	new_task = rspamd_task_new (worker);

	/*
	 * NOTE(review): for any family other than AF_UNIX/AF_INET client_addr
	 * keeps whatever rspamd_task_new () put there, and that value is copied
	 * to the dispatcher below -- confirm it is a sane default.
	 */
	if (su.ss.ss_family == AF_UNIX) {
		msg_info ("accepted connection from unix socket");
		new_task->client_addr.s_addr = INADDR_NONE;
	}
	else if (su.ss.ss_family == AF_INET) {
		msg_info ("accepted connection from %s port %d",
			inet_ntoa (su.s4.sin_addr),
			ntohs (su.s4.sin_port));
		memcpy (&new_task->client_addr, &su.s4.sin_addr,
			sizeof (struct in_addr));
	}

	new_task->sock = nfd;
	new_task->cfg = worker->srv->cfg;
	new_task->task_pool =
		rspamd_mempool_new (rspamd_mempool_suggest_size ());
	/* Add destructor for recipients list (it would be better to use anonymous function here) */
	rspamd_mempool_add_destructor (new_task->task_pool,
		(rspamd_mempool_destruct_t) rcpt_destruct, new_task);
	new_task->results = g_hash_table_new (rspamd_str_hash, rspamd_str_equal);
	new_task->ev_base = worker->ctx;
	/* The results table is destroyed together with the task pool */
	rspamd_mempool_add_destructor (new_task->task_pool,
		(rspamd_mempool_destruct_t) g_hash_table_destroy,
		new_task->results);
	worker->srv->stat->connections_count++;
	lmtp->task = new_task;
	lmtp->state = LMTP_READ_LHLO;

	/* Set up dispatcher */
	new_task->dispatcher = rspamd_create_dispatcher (new_task->ev_base,
			nfd,
			BUFFER_LINE,
			lmtp_read_socket,
			lmtp_write_socket,
			lmtp_err_socket,
			&io_tv,
			(void *)lmtp);
	new_task->dispatcher->peer_addr = new_task->client_addr.s_addr;

	if (!rspamd_dispatcher_write (lmtp->task->dispatcher, greetingbuf,
		strlen (greetingbuf), FALSE, FALSE)) {
		msg_warn ("cannot write greeting");
	}
}

/*
 * Start lmtp worker process.
 *
 * Installs signal handlers and the accept event, configures all modules,
 * prepares the greeting line and the I/O timeout, then runs the event loop.
 * Does not return: the process exits with EXIT_SUCCESS once the loop ends.
 *
 * @param worker worker descriptor for this process
 */
void
start_lmtp (struct rspamd_worker *worker)
{
	struct sigaction signals;
	gchar *hostbuf;
	gsize hostmax;
	long name_max;
	module_t **mod;

	worker->srv->pid = getpid ();
	worker->ctx = event_init ();

	rspamd_signals_init (&signals, sig_handler);
	sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);

	/* SIGUSR2 handler */
	signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler,
		(void *) worker);
	event_base_set (worker->ctx, &worker->sig_ev_usr2);
	signal_add (&worker->sig_ev_usr2, NULL);

	/* SIGUSR1 handler */
	signal_set (&worker->sig_ev_usr1, SIGUSR1, sigusr1_handler,
		(void *) worker);
	event_base_set (worker->ctx, &worker->sig_ev_usr1);
	signal_add (&worker->sig_ev_usr1, NULL);

	/* Accept event */
	event_set (&worker->bind_ev, worker->cf->listen_sock,
		EV_READ | EV_PERSIST, accept_socket, (void *)worker);
	event_base_set (worker->ctx, &worker->bind_ev);
	event_add (&worker->bind_ev, NULL);

	/* Perform modules configuring */
	for (mod = &modules[0]; *mod != NULL; mod++) {
		(*mod)->module_config_func (worker->srv->cfg);
	}

	/*
	 * Fill hostname buf. sysconf () returns -1 when the limit is
	 * indeterminate or on error; without a fallback the buffer size would
	 * become 0 and the terminator below would be written out of bounds.
	 */
	name_max = sysconf (_SC_HOST_NAME_MAX);
	if (name_max <= 0) {
		name_max = LMTP_FALLBACK_HOST_NAME_MAX;
	}
	hostmax = (gsize) name_max + 1;
	hostbuf = alloca (hostmax);
	if (gethostname (hostbuf, hostmax) != 0) {
		/* Buffer contents are unspecified on failure; use a fixed name */
		g_strlcpy (hostbuf, "localhost", hostmax);
	}
	/* gethostname () may leave a truncated name unterminated */
	hostbuf[hostmax - 1] = '\0';
	rspamd_snprintf (greetingbuf,
		sizeof (greetingbuf),
		"%d rspamd version %s LMTP on %s Ready\r\n",
		LMTP_OK,
		RVERSION,
		hostbuf);

	/* NOTE(review): 60000 seconds -- confirm this was not meant as ms */
	io_tv.tv_sec = 60000;
	io_tv.tv_usec = 0;

	gperf_profiler_init (worker->srv->cfg, "lmtp");

	event_base_loop (worker->ctx, 0);
	exit (EXIT_SUCCESS);
}

/*
 * vi:ts=4
 */