/*- * Copyright 2016 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "config.h" #include "rspamd.h" #include "lua/lua_common.h" #include "worker_util.h" #include "unix-std.h" #include "utlist.h" #include "ottery.h" #include "rspamd_control.h" #include "libserver/maps/map.h" #include "libserver/maps/map_private.h" #include "libserver/http/http_private.h" #include "libserver/http/http_router.h" #include "libutil/rrd.h" /* sys/resource.h */ #ifdef HAVE_SYS_RESOURCE_H #include #endif /* pwd and grp */ #ifdef HAVE_PWD_H #include #endif #ifdef HAVE_GRP_H #include #endif #ifdef HAVE_LIBUTIL_H #include #endif #include "zlib.h" #ifdef WITH_LIBUNWIND #define UNW_LOCAL_ONLY 1 #include #define UNWIND_BACKTRACE_DEPTH 256 #endif #ifdef HAVE_UCONTEXT_H #include #elif defined(HAVE_SYS_UCONTEXT_H) #include #endif #ifdef HAVE_SYS_WAIT_H #include #endif #include "contrib/libev/ev.h" #include "libstat/stat_api.h" /* Forward declaration */ static void rspamd_worker_heartbeat_start (struct rspamd_worker *, struct ev_loop *); static void rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *); /** * Return worker's control structure by its type * @param type * @return worker's control structure or NULL */ worker_t * rspamd_get_worker_by_type (struct rspamd_config *cfg, GQuark type) { worker_t **pwrk; pwrk = cfg->compiled_workers; while (pwrk && *pwrk) { if (rspamd_check_worker (cfg, *pwrk)) { if (g_quark_from_string ((*pwrk)->name) == type) { return *pwrk; } } pwrk++; } return NULL; } static void rspamd_worker_check_finished (EV_P_ ev_timer *w, int revents) { int *pnchecks = (int *)w->data; if (*pnchecks > SOFT_SHUTDOWN_TIME * 10) { msg_warn ("terminating worker before finishing of terminate handlers"); ev_break (EV_A_ EVBREAK_ONE); } else { int refcount = ev_active_cnt (EV_A); if (refcount == 1) { ev_break (EV_A_ EVBREAK_ONE); } else { ev_timer_again (EV_A_ w); } } } static gboolean rspamd_worker_finalize (gpointer user_data) { struct rspamd_task *task = user_data; if (!(task->flags & RSPAMD_TASK_FLAG_PROCESSING)) { msg_info_task ("finishing actions has been processed, terminating"); /* ev_break (task->event_loop, EVBREAK_ALL); */ task->worker->state = rspamd_worker_wanna_die; rspamd_session_destroy (task->s); return TRUE; } return FALSE; } gboolean rspamd_worker_call_finish_handlers (struct rspamd_worker *worker) { struct rspamd_task *task; struct rspamd_config *cfg = worker->srv->cfg; struct rspamd_abstract_worker_ctx *ctx; struct rspamd_config_cfg_lua_script *sc; if (cfg->on_term_scripts) { ctx = (struct rspamd_abstract_worker_ctx *)worker->ctx; /* Create a fake task object for async events */ task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop, FALSE); task->resolver = ctx->resolver; task->flags |= RSPAMD_TASK_FLAG_PROCESSING; task->s = rspamd_session_create (task->task_pool, rspamd_worker_finalize, NULL, (event_finalizer_t) rspamd_task_free, task); DL_FOREACH (cfg->on_term_scripts, sc) { lua_call_finish_script (sc, task); } task->flags &= ~RSPAMD_TASK_FLAG_PROCESSING; if (rspamd_session_pending (task->s)) { return TRUE; } } return FALSE; } static void rspamd_worker_terminate_handlers (struct rspamd_worker *w) { if (w->nconns == 0 && (!(w->flags & RSPAMD_WORKER_SCANNER) || w->srv->cfg->on_term_scripts == NULL)) { /* * We are here either: * - No active connections are represented * - No term scripts are registered * - Worker is not a scanner, so it can die safely */ w->state = rspamd_worker_wanna_die; } else { if (w->nconns > 0) { /* * Wait until all connections are terminated */ w->state = rspamd_worker_wait_connections; } else { /* * Start finish scripts */ if (w->state != rspamd_worker_wait_final_scripts) { w->state = rspamd_worker_wait_final_scripts; if ((w->flags & RSPAMD_WORKER_SCANNER) && rspamd_worker_call_finish_handlers (w)) { msg_info ("performing async finishing actions"); w->state = rspamd_worker_wait_final_scripts; } else { /* * We are done now */ msg_info ("no async finishing actions, terminating"); w->state = rspamd_worker_wanna_die; } } } } } static void rspamd_worker_on_delayed_shutdown (EV_P_ ev_timer *w, int revents) { struct rspamd_worker *worker = (struct rspamd_worker *)w->data; worker->state = rspamd_worker_wanna_die; ev_timer_stop (EV_A_ w); ev_break (loop, EVBREAK_ALL); } static void rspamd_worker_shutdown_check (EV_P_ ev_timer *w, int revents) { struct rspamd_worker *worker = (struct rspamd_worker *)w->data; if (worker->state != rspamd_worker_wanna_die) { rspamd_worker_terminate_handlers (worker); if (worker->state == rspamd_worker_wanna_die) { /* We are done, kill event loop */ ev_timer_stop (EV_A_ w); ev_break (EV_A_ EVBREAK_ALL); } else { /* Try again later */ ev_timer_again (EV_A_ w); } } else { ev_timer_stop (EV_A_ w); ev_break (EV_A_ EVBREAK_ALL); } } /* * Config reload is designed by sending sigusr2 to active workers and pending shutdown of them */ static gboolean rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg) { /* Do not accept new connections, preparing to end worker's process */ if (sigh->worker->state == rspamd_worker_state_running) { static ev_timer shutdown_ev, shutdown_check_ev; ev_tstamp shutdown_ts; if (sigh->worker->flags & RSPAMD_WORKER_NO_TERMINATE_DELAY) { shutdown_ts = 0.0; } else { shutdown_ts = MAX (SOFT_SHUTDOWN_TIME, sigh->worker->srv->cfg->task_timeout * 2.0); } rspamd_worker_ignore_signal (sigh); sigh->worker->state = rspamd_worker_state_terminating; rspamd_default_log_function (G_LOG_LEVEL_INFO, sigh->worker->srv->server_pool->tag.tagname, sigh->worker->srv->server_pool->tag.uid, G_STRFUNC, "worker's shutdown is pending in %.2f sec", shutdown_ts); /* Soft shutdown timer */ shutdown_ev.data = sigh->worker; ev_timer_init (&shutdown_ev, rspamd_worker_on_delayed_shutdown, shutdown_ts, 0.0); ev_timer_start (sigh->event_loop, &shutdown_ev); if (!(sigh->worker->flags & RSPAMD_WORKER_NO_TERMINATE_DELAY)) { /* This timer checks if we are ready to die and is called frequently */ shutdown_check_ev.data = sigh->worker; ev_timer_init (&shutdown_check_ev, rspamd_worker_shutdown_check, 0.5, 0.5); ev_timer_start (sigh->event_loop, &shutdown_check_ev); } rspamd_worker_stop_accept (sigh->worker); } /* No more signals */ return FALSE; } /* * Reopen log is designed by sending sigusr1 to active workers and pending shutdown of them */ static gboolean rspamd_worker_usr1_handler (struct rspamd_worker_signal_handler *sigh, void *arg) { struct rspamd_main *rspamd_main = sigh->worker->srv; rspamd_log_reopen (sigh->worker->srv->logger, rspamd_main->cfg, -1, -1); msg_info_main ("logging reinitialised"); /* Get more signals */ return TRUE; } static gboolean rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg) { if (sigh->worker->state == rspamd_worker_state_running) { static ev_timer shutdown_ev, shutdown_check_ev; ev_tstamp shutdown_ts; if (sigh->worker->flags & RSPAMD_WORKER_NO_TERMINATE_DELAY) { shutdown_ts = 0.0; } else { shutdown_ts = MAX (SOFT_SHUTDOWN_TIME, sigh->worker->srv->cfg->task_timeout * 2.0); } rspamd_worker_ignore_signal (sigh); sigh->worker->state = rspamd_worker_state_terminating; rspamd_default_log_function (G_LOG_LEVEL_INFO, sigh->worker->srv->server_pool->tag.tagname, sigh->worker->srv->server_pool->tag.uid, G_STRFUNC, "terminating after receiving signal %s", g_strsignal (sigh->signo)); rspamd_worker_stop_accept (sigh->worker); rspamd_worker_terminate_handlers (sigh->worker); /* Check if we are ready to die */ if (sigh->worker->state != rspamd_worker_wanna_die) { /* This timer is called when we have no choices but to die */ shutdown_ev.data = sigh->worker; ev_timer_init (&shutdown_ev, rspamd_worker_on_delayed_shutdown, shutdown_ts, 0.0); ev_timer_start (sigh->event_loop, &shutdown_ev); if (!(sigh->worker->flags & RSPAMD_WORKER_NO_TERMINATE_DELAY)) { /* This timer checks if we are ready to die and is called frequently */ shutdown_check_ev.data = sigh->worker; ev_timer_init (&shutdown_check_ev, rspamd_worker_shutdown_check, 0.5, 0.5); ev_timer_start (sigh->event_loop, &shutdown_check_ev); } } else { /* Flag to die has been already set */ ev_break (sigh->event_loop, EVBREAK_ALL); } } /* Stop reacting on signals */ return FALSE; } static void rspamd_worker_signal_handle (EV_P_ ev_signal *w, int revents) { struct rspamd_worker_signal_handler *sigh = (struct rspamd_worker_signal_handler *)w->data; struct rspamd_worker_signal_handler_elt *cb, *cbtmp; /* Call all signal handlers registered */ DL_FOREACH_SAFE (sigh->cb, cb, cbtmp) { if (!cb->handler (sigh, cb->handler_data)) { DL_DELETE (sigh->cb, cb); g_free (cb); } } } static void rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *sigh) { sigset_t set; ev_signal_stop (sigh->event_loop, &sigh->ev_sig); sigemptyset (&set); sigaddset (&set, sigh->signo); sigprocmask (SIG_BLOCK, &set, NULL); } static void rspamd_worker_default_signal (int signo) { struct sigaction sig; sigemptyset (&sig.sa_mask); sigaddset (&sig.sa_mask, signo); sig.sa_handler = SIG_DFL; sig.sa_flags = 0; sigaction (signo, &sig, NULL); } static void rspamd_sigh_free (void *p) { struct rspamd_worker_signal_handler *sigh = p; struct rspamd_worker_signal_handler_elt *cb, *tmp; DL_FOREACH_SAFE (sigh->cb, cb, tmp) { DL_DELETE (sigh->cb, cb); g_free (cb); } ev_signal_stop (sigh->event_loop, &sigh->ev_sig); rspamd_worker_default_signal (sigh->signo); g_free (sigh); } void rspamd_worker_set_signal_handler (int signo, struct rspamd_worker *worker, struct ev_loop *event_loop, rspamd_worker_signal_cb_t handler, void *handler_data) { struct rspamd_worker_signal_handler *sigh; struct rspamd_worker_signal_handler_elt *cb; sigh = g_hash_table_lookup (worker->signal_events, GINT_TO_POINTER (signo)); if (sigh == NULL) { sigh = g_malloc0 (sizeof (*sigh)); sigh->signo = signo; sigh->worker = worker; sigh->event_loop = event_loop; sigh->enabled = TRUE; sigh->ev_sig.data = sigh; ev_signal_init (&sigh->ev_sig, rspamd_worker_signal_handle, signo); ev_signal_start (event_loop, &sigh->ev_sig); g_hash_table_insert (worker->signal_events, GINT_TO_POINTER (signo), sigh); } cb = g_malloc0 (sizeof (*cb)); cb->handler = handler; cb->handler_data = handler_data; DL_APPEND (sigh->cb, cb); } void rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *event_loop) { /* A set of terminating signals */ rspamd_worker_set_signal_handler (SIGTERM, worker, event_loop, rspamd_worker_term_handler, NULL); rspamd_worker_set_signal_handler (SIGINT, worker, event_loop, rspamd_worker_term_handler, NULL); rspamd_worker_set_signal_handler (SIGHUP, worker, event_loop, rspamd_worker_term_handler, NULL); /* Special purpose signals */ rspamd_worker_set_signal_handler (SIGUSR1, worker, event_loop, rspamd_worker_usr1_handler, NULL); rspamd_worker_set_signal_handler (SIGUSR2, worker, event_loop, rspamd_worker_usr2_handler, NULL); } struct ev_loop * rspamd_prepare_worker (struct rspamd_worker *worker, const char *name, rspamd_accept_handler hdl) { struct ev_loop *event_loop; GList *cur; struct rspamd_worker_listen_socket *ls; struct rspamd_worker_accept_event *accept_ev; worker->signal_events = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, rspamd_sigh_free); event_loop = ev_loop_new (rspamd_config_ev_backend_get (worker->srv->cfg)); worker->srv->event_loop = event_loop; rspamd_worker_init_signals (worker, event_loop); rspamd_control_worker_add_default_cmd_handlers (worker, event_loop); rspamd_worker_heartbeat_start (worker, event_loop); #ifdef WITH_HIREDIS rspamd_redis_pool_config (worker->srv->cfg->redis_pool, worker->srv->cfg, event_loop); #endif /* Accept all sockets */ if (hdl) { cur = worker->cf->listen_socks; while (cur) { ls = cur->data; if (ls->fd != -1) { accept_ev = g_malloc0 (sizeof (*accept_ev)); accept_ev->event_loop = event_loop; accept_ev->accept_ev.data = worker; ev_io_init (&accept_ev->accept_ev, hdl, ls->fd, EV_READ); ev_io_start (event_loop, &accept_ev->accept_ev); DL_APPEND (worker->accept_events, accept_ev); } cur = g_list_next (cur); } } return event_loop; } void rspamd_worker_stop_accept (struct rspamd_worker *worker) { struct rspamd_worker_accept_event *cur, *tmp; /* Remove all events */ DL_FOREACH_SAFE (worker->accept_events, cur, tmp) { if (ev_can_stop (&cur->accept_ev)) { ev_io_stop (cur->event_loop, &cur->accept_ev); } if (ev_can_stop (&cur->throttling_ev)) { ev_timer_stop (cur->event_loop, &cur->throttling_ev); } g_free (cur); } /* XXX: we need to do it much later */ #if 0 g_hash_table_iter_init (&it, worker->signal_events); while (g_hash_table_iter_next (&it, &k, &v)) { sigh = (struct rspamd_worker_signal_handler *)v; g_hash_table_iter_steal (&it); if (sigh->enabled) { event_del (&sigh->ev); } g_free (sigh); } g_hash_table_unref (worker->signal_events); #endif } static rspamd_fstring_t * rspamd_controller_maybe_compress (struct rspamd_http_connection_entry *entry, rspamd_fstring_t *buf, struct rspamd_http_message *msg) { if (entry->support_gzip) { if (rspamd_fstring_gzip (&buf)) { rspamd_http_message_add_header (msg, "Content-Encoding", "gzip"); } } return buf; } void rspamd_controller_send_error (struct rspamd_http_connection_entry *entry, gint code, const gchar *error_msg, ...) { struct rspamd_http_message *msg; va_list args; rspamd_fstring_t *reply; msg = rspamd_http_new_message (HTTP_RESPONSE); va_start (args, error_msg); msg->status = rspamd_fstring_new (); rspamd_vprintf_fstring (&msg->status, error_msg, args); va_end (args); msg->date = time (NULL); msg->code = code; reply = rspamd_fstring_sized_new (msg->status->len + 16); rspamd_printf_fstring (&reply, "{\"error\":\"%V\"}", msg->status); rspamd_http_message_set_body_from_fstring_steal (msg, rspamd_controller_maybe_compress (entry, reply, msg)); rspamd_http_connection_reset (entry->conn); rspamd_http_router_insert_headers (entry->rt, msg); rspamd_http_connection_write_message (entry->conn, msg, NULL, "application/json", entry, entry->rt->timeout); entry->is_reply = TRUE; } void rspamd_controller_send_string (struct rspamd_http_connection_entry *entry, const gchar *str) { struct rspamd_http_message *msg; rspamd_fstring_t *reply; msg = rspamd_http_new_message (HTTP_RESPONSE); msg->date = time (NULL); msg->code = 200; msg->status = rspamd_fstring_new_init ("OK", 2); if (str) { reply = rspamd_fstring_new_init (str, strlen (str)); } else { reply = rspamd_fstring_new_init ("null", 4); } rspamd_http_message_set_body_from_fstring_steal (msg, rspamd_controller_maybe_compress (entry, reply, msg)); rspamd_http_connection_reset (entry->conn); rspamd_http_router_insert_headers (entry->rt, msg); rspamd_http_connection_write_message (entry->conn, msg, NULL, "application/json", entry, entry->rt->timeout); entry->is_reply = TRUE; } void rspamd_controller_send_ucl (struct rspamd_http_connection_entry *entry, ucl_object_t *obj) { struct rspamd_http_message *msg; rspamd_fstring_t *reply; msg = rspamd_http_new_message (HTTP_RESPONSE); msg->date = time (NULL); msg->code = 200; msg->status = rspamd_fstring_new_init ("OK", 2); reply = rspamd_fstring_sized_new (BUFSIZ); rspamd_ucl_emit_fstring (obj, UCL_EMIT_JSON_COMPACT, &reply); rspamd_http_message_set_body_from_fstring_steal (msg, rspamd_controller_maybe_compress (entry, reply, msg)); rspamd_http_connection_reset (entry->conn); rspamd_http_router_insert_headers (entry->rt, msg); rspamd_http_connection_write_message (entry->conn, msg, NULL, "application/json", entry, entry->rt->timeout); entry->is_reply = TRUE; } static void rspamd_worker_drop_priv (struct rspamd_main *rspamd_main) { if (rspamd_main->is_privilleged) { if (setgid (rspamd_main->workers_gid) == -1) { msg_err_main ("cannot setgid to %d (%s), aborting", (gint) rspamd_main->workers_gid, strerror (errno)); exit (-errno); } if (rspamd_main->cfg->rspamd_user && initgroups (rspamd_main->cfg->rspamd_user, rspamd_main->workers_gid) == -1) { msg_err_main ("initgroups failed (%s), aborting", strerror (errno)); exit (-errno); } if (setuid (rspamd_main->workers_uid) == -1) { msg_err_main ("cannot setuid to %d (%s), aborting", (gint) rspamd_main->workers_uid, strerror (errno)); exit (-errno); } } } static void rspamd_worker_set_limits (struct rspamd_main *rspamd_main, struct rspamd_worker_conf *cf) { struct rlimit rlmt; if (cf->rlimit_nofile != 0) { rlmt.rlim_cur = (rlim_t) cf->rlimit_nofile; rlmt.rlim_max = (rlim_t) cf->rlimit_nofile; if (setrlimit (RLIMIT_NOFILE, &rlmt) == -1) { msg_warn_main ("cannot set files rlimit: %L, %s", cf->rlimit_nofile, strerror (errno)); } memset (&rlmt, 0, sizeof (rlmt)); if (getrlimit (RLIMIT_NOFILE, &rlmt) == -1) { msg_warn_main ("cannot get max files rlimit: %HL, %s", cf->rlimit_maxcore, strerror (errno)); } else { msg_info_main ("set max file descriptors limit: %HL cur and %HL max", (guint64) rlmt.rlim_cur, (guint64) rlmt.rlim_max); } } else { /* Just report */ if (getrlimit (RLIMIT_NOFILE, &rlmt) == -1) { msg_warn_main ("cannot get max files rlimit: %HL, %s", cf->rlimit_maxcore, strerror (errno)); } else { msg_info_main ("use system max file descriptors limit: %HL cur and %HL max", (guint64) rlmt.rlim_cur, (guint64) rlmt.rlim_max); } } if (rspamd_main->cores_throttling) { msg_info_main ("disable core files for the new worker as limits are reached"); rlmt.rlim_cur = 0; rlmt.rlim_max = 0; if (setrlimit (RLIMIT_CORE, &rlmt) == -1) { msg_warn_main ("cannot disable core dumps: error when setting limits: %s", strerror (errno)); } } else { if (cf->rlimit_maxcore != 0) { rlmt.rlim_cur = (rlim_t) cf->rlimit_maxcore; rlmt.rlim_max = (rlim_t) cf->rlimit_maxcore; if (setrlimit (RLIMIT_CORE, &rlmt) == -1) { msg_warn_main ("cannot set max core size limit: %HL, %s", cf->rlimit_maxcore, strerror (errno)); } /* Ensure that we did it */ memset (&rlmt, 0, sizeof (rlmt)); if (getrlimit (RLIMIT_CORE, &rlmt) == -1) { msg_warn_main ("cannot get max core size rlimit: %HL, %s", cf->rlimit_maxcore, strerror (errno)); } else { if (rlmt.rlim_cur != cf->rlimit_maxcore || rlmt.rlim_max != cf->rlimit_maxcore) { msg_warn_main ("setting of core file limits was unsuccessful: " "%HL was wanted, " "but we have %HL cur and %HL max", cf->rlimit_maxcore, (guint64) rlmt.rlim_cur, (guint64) rlmt.rlim_max); } else { msg_info_main ("set max core size limit: %HL cur and %HL max", (guint64) rlmt.rlim_cur, (guint64) rlmt.rlim_max); } } } else { /* Just report */ if (getrlimit (RLIMIT_CORE, &rlmt) == -1) { msg_warn_main ("cannot get max core size limit: %HL, %s", cf->rlimit_maxcore, strerror (errno)); } else { msg_info_main ("use system max core size limit: %HL cur and %HL max", (guint64) rlmt.rlim_cur, (guint64) rlmt.rlim_max); } } } } static void rspamd_worker_on_term (EV_P_ ev_child *w, int revents) { struct rspamd_worker *wrk = (struct rspamd_worker *)w->data; if (wrk->ppid == getpid ()) { if (wrk->term_handler) { wrk->term_handler (EV_A_ w, wrk->srv, wrk); } else { rspamd_check_termination_clause (wrk->srv, wrk, w->rstatus); } } else { /* Ignore SIGCHLD for not our children... */ } } static void rspamd_worker_heartbeat_cb (EV_P_ ev_timer *w, int revents) { struct rspamd_worker *wrk = (struct rspamd_worker *)w->data; struct rspamd_srv_command cmd; memset (&cmd, 0, sizeof (cmd)); cmd.type = RSPAMD_SRV_HEARTBEAT; rspamd_srv_send_command (wrk, EV_A, &cmd, -1, NULL, NULL); } static void rspamd_worker_heartbeat_start (struct rspamd_worker *wrk, struct ev_loop *event_loop) { wrk->hb.heartbeat_ev.data = (void *)wrk; ev_timer_init (&wrk->hb.heartbeat_ev, rspamd_worker_heartbeat_cb, 0.0, wrk->srv->cfg->heartbeat_interval); ev_timer_start (event_loop, &wrk->hb.heartbeat_ev); } static void rspamd_main_heartbeat_cb (EV_P_ ev_timer *w, int revents) { struct rspamd_worker *wrk = (struct rspamd_worker *)w->data; gdouble time_from_last = ev_time (); struct rspamd_main *rspamd_main; static struct rspamd_control_command cmd; struct tm tm; gchar timebuf[64]; gchar usec_buf[16]; gint r; time_from_last -= wrk->hb.last_event; rspamd_main = wrk->srv; if (wrk->hb.last_event > 0 && time_from_last > 0 && time_from_last >= rspamd_main->cfg->heartbeat_interval * 2) { rspamd_localtime (wrk->hb.last_event, &tm); r = strftime (timebuf, sizeof (timebuf), "%F %H:%M:%S", &tm); rspamd_snprintf (usec_buf, sizeof (usec_buf), "%.5f", wrk->hb.last_event - (gdouble)(time_t)wrk->hb.last_event); rspamd_snprintf (timebuf + r, sizeof (timebuf) - r, "%s", usec_buf + 1); if (wrk->hb.nbeats > 0) { /* First time lost event */ cmd.type = RSPAMD_CONTROL_CHILD_CHANGE; cmd.cmd.child_change.what = rspamd_child_offline; cmd.cmd.child_change.pid = wrk->pid; rspamd_control_broadcast_srv_cmd (rspamd_main, &cmd, wrk->pid); msg_warn_main ("lost heartbeat from worker type %s with pid %P, " "last beat on: %s (%L beats received previously)", g_quark_to_string (wrk->type), wrk->pid, timebuf, wrk->hb.nbeats); wrk->hb.nbeats = -1; /* TODO: send notify about worker problem */ } else { wrk->hb.nbeats --; msg_warn_main ("lost %L heartbeat from worker type %s with pid %P, " "last beat on: %s", -(wrk->hb.nbeats), g_quark_to_string (wrk->type), wrk->pid, timebuf); if (rspamd_main->cfg->heartbeats_loss_max > 0 && -(wrk->hb.nbeats) >= rspamd_main->cfg->heartbeats_loss_max) { if (-(wrk->hb.nbeats) > rspamd_main->cfg->heartbeats_loss_max + 1) { msg_err_main ("force kill worker type %s with pid %P, " "last beat on: %s; %L heartbeat lost", g_quark_to_string (wrk->type), wrk->pid, timebuf, -(wrk->hb.nbeats)); kill (wrk->pid, SIGKILL); } else { msg_err_main ("terminate worker type %s with pid %P, " "last beat on: %s; %L heartbeat lost", g_quark_to_string (wrk->type), wrk->pid, timebuf, -(wrk->hb.nbeats)); kill (wrk->pid, SIGTERM); } } } } else if (wrk->hb.nbeats < 0) { rspamd_localtime (wrk->hb.last_event, &tm); r = strftime (timebuf, sizeof (timebuf), "%F %H:%M:%S", &tm); rspamd_snprintf (usec_buf, sizeof (usec_buf), "%.5f", wrk->hb.last_event - (gdouble)(time_t)wrk->hb.last_event); rspamd_snprintf (timebuf + r, sizeof (timebuf) - r, "%s", usec_buf + 1); cmd.type = RSPAMD_CONTROL_CHILD_CHANGE; cmd.cmd.child_change.what = rspamd_child_online; cmd.cmd.child_change.pid = wrk->pid; rspamd_control_broadcast_srv_cmd (rspamd_main, &cmd, wrk->pid); msg_info_main ("received heartbeat from worker type %s with pid %P, " "last beat on: %s (%L beats lost previously)", g_quark_to_string (wrk->type), wrk->pid, timebuf, -(wrk->hb.nbeats)); wrk->hb.nbeats = 1; /* TODO: send notify about worker restoration */ } } static void rspamd_main_heartbeat_start (struct rspamd_worker *wrk, struct ev_loop *event_loop) { wrk->hb.heartbeat_ev.data = (void *)wrk; ev_timer_init (&wrk->hb.heartbeat_ev, rspamd_main_heartbeat_cb, 0.0, wrk->srv->cfg->heartbeat_interval * 2); ev_timer_start (event_loop, &wrk->hb.heartbeat_ev); } struct rspamd_worker * rspamd_fork_worker (struct rspamd_main *rspamd_main, struct rspamd_worker_conf *cf, guint index, struct ev_loop *ev_base, rspamd_worker_term_cb term_handler) { struct rspamd_worker *wrk; gint rc; struct rlimit rlim; /* Starting worker process */ wrk = (struct rspamd_worker *) g_malloc0 (sizeof (struct rspamd_worker)); if (!rspamd_socketpair (wrk->control_pipe, 0)) { msg_err ("socketpair failure: %s", strerror (errno)); rspamd_hard_terminate (rspamd_main); } if (!rspamd_socketpair (wrk->srv_pipe, 0)) { msg_err ("socketpair failure: %s", strerror (errno)); rspamd_hard_terminate (rspamd_main); } if (cf->bind_conf) { msg_info_main ("prepare to fork process %s (%d); listen on: %s", cf->worker->name, index, cf->bind_conf->name); } else { msg_info_main ("prepare to fork process %s (%d), no bind socket", cf->worker->name, index); } wrk->srv = rspamd_main; wrk->type = cf->type; wrk->cf = cf; wrk->flags = cf->worker->flags; REF_RETAIN (cf); wrk->index = index; wrk->ctx = cf->ctx; wrk->ppid = getpid (); wrk->pid = fork (); wrk->cores_throttled = rspamd_main->cores_throttling; wrk->term_handler = term_handler; wrk->control_events_pending = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, rspamd_pending_control_free); switch (wrk->pid) { case 0: /* Update pid for logging */ rspamd_log_on_fork (cf->type, rspamd_main->cfg, rspamd_main->logger); wrk->pid = getpid (); /* Init PRNG after fork */ rc = ottery_init (rspamd_main->cfg->libs_ctx->ottery_cfg); if (rc != OTTERY_ERR_NONE) { msg_err_main ("cannot initialize PRNG: %d", rc); abort (); } rspamd_random_seed_fast (); #ifdef HAVE_EVUTIL_RNG_INIT evutil_secure_rng_init (); #endif /* * Libev stores all signals in a global table, so * previous handlers must be explicitly detached and forgotten * before starting a new loop */ ev_signal_stop (rspamd_main->event_loop, &rspamd_main->int_ev); ev_signal_stop (rspamd_main->event_loop, &rspamd_main->term_ev); ev_signal_stop (rspamd_main->event_loop, &rspamd_main->hup_ev); ev_signal_stop (rspamd_main->event_loop, &rspamd_main->usr1_ev); /* Remove the inherited event base */ ev_loop_destroy (rspamd_main->event_loop); rspamd_main->event_loop = NULL; /* Drop privileges */ rspamd_worker_drop_priv (rspamd_main); /* Set limits */ rspamd_worker_set_limits (rspamd_main, cf); /* Re-set stack limit */ getrlimit (RLIMIT_STACK, &rlim); rlim.rlim_cur = 100 * 1024 * 1024; rlim.rlim_max = rlim.rlim_cur; setrlimit (RLIMIT_STACK, &rlim); if (cf->bind_conf) { setproctitle ("%s process (%s)", cf->worker->name, cf->bind_conf->bind_line); } else { setproctitle ("%s process", cf->worker->name); } if (rspamd_main->pfh) { rspamd_pidfile_close (rspamd_main->pfh); } if (rspamd_main->cfg->log_silent_workers) { rspamd_log_set_log_level (rspamd_main->logger, G_LOG_LEVEL_MESSAGE); } wrk->start_time = rspamd_get_calendar_ticks (); if (cf->bind_conf) { msg_info_main ("starting %s process %P (%d); listen on: %s", cf->worker->name, getpid (), index, cf->bind_conf->bind_line); } else { msg_info_main ("starting %s process %P (%d)", cf->worker->name, getpid (), index); } /* Close parent part of socketpair */ close (wrk->control_pipe[0]); close (wrk->srv_pipe[0]); rspamd_socket_nonblocking (wrk->control_pipe[1]); rspamd_socket_nonblocking (wrk->srv_pipe[1]); rspamd_main->cfg->cur_worker = wrk; /* Execute worker (this function should not return normally!) */ cf->worker->worker_start_func (wrk); /* To distinguish from normal termination */ exit (EXIT_FAILURE); break; case -1: msg_err_main ("cannot fork main process: %s", strerror (errno)); if (rspamd_main->pfh) { rspamd_pidfile_remove (rspamd_main->pfh); } rspamd_hard_terminate (rspamd_main); break; default: /* Close worker part of socketpair */ close (wrk->control_pipe[1]); close (wrk->srv_pipe[1]); rspamd_socket_nonblocking (wrk->control_pipe[0]); rspamd_socket_nonblocking (wrk->srv_pipe[0]); rspamd_srv_start_watching (rspamd_main, wrk, ev_base); wrk->cld_ev.data = wrk; ev_child_init (&wrk->cld_ev, rspamd_worker_on_term, wrk->pid, 0); ev_child_start (rspamd_main->event_loop, &wrk->cld_ev); rspamd_main_heartbeat_start (wrk, rspamd_main->event_loop); /* Insert worker into worker's table, pid is index */ g_hash_table_insert (rspamd_main->workers, GSIZE_TO_POINTER (wrk->pid), wrk); break; } return wrk; } void rspamd_worker_block_signals (void) { sigset_t set; sigemptyset (&set); sigaddset (&set, SIGTERM); sigaddset (&set, SIGINT); sigaddset (&set, SIGHUP); sigaddset (&set, SIGUSR1); sigaddset (&set, SIGUSR2); sigprocmask (SIG_BLOCK, &set, NULL); } void rspamd_worker_unblock_signals (void) { sigset_t set; sigemptyset (&set); sigaddset (&set, SIGTERM); sigaddset (&set, SIGINT); sigaddset (&set, SIGHUP); sigaddset (&set, SIGUSR1); sigaddset (&set, SIGUSR2); sigprocmask (SIG_UNBLOCK, &set, NULL); } void rspamd_hard_terminate (struct rspamd_main *rspamd_main) { GHashTableIter it; gpointer k, v; struct rspamd_worker *w; sigset_t set; /* Block all signals */ sigemptyset (&set); sigaddset (&set, SIGTERM); sigaddset (&set, SIGINT); sigaddset (&set, SIGHUP); sigaddset (&set, SIGUSR1); sigaddset (&set, SIGUSR2); sigaddset (&set, SIGCHLD); sigprocmask (SIG_BLOCK, &set, NULL); /* We need to terminate all workers that might be already spawned */ rspamd_worker_block_signals (); g_hash_table_iter_init (&it, rspamd_main->workers); while (g_hash_table_iter_next (&it, &k, &v)) { w = v; msg_err_main ("kill worker %P as Rspamd is terminating due to " "an unrecoverable error", w->pid); kill (w->pid, SIGKILL); } msg_err_main ("shutting down Rspamd due to fatal error"); rspamd_log_close (rspamd_main->logger); exit (EXIT_FAILURE); } gboolean rspamd_worker_is_scanner (struct rspamd_worker *w) { if (w) { return !!(w->flags & RSPAMD_WORKER_SCANNER); } return FALSE; } gboolean rspamd_worker_is_primary_controller (struct rspamd_worker *w) { if (w) { return !!(w->flags & RSPAMD_WORKER_CONTROLLER) && w->index == 0; } return FALSE; } struct rspamd_worker_session_elt { void *ptr; guint *pref; const gchar *tag; time_t when; }; struct rspamd_worker_session_cache { struct ev_loop *ev_base; GHashTable *cache; struct rspamd_config *cfg; struct ev_timer periodic; }; static gint rspamd_session_cache_sort_cmp (gconstpointer pa, gconstpointer pb) { const struct rspamd_worker_session_elt *e1 = *(const struct rspamd_worker_session_elt **)pa, *e2 = *(const struct rspamd_worker_session_elt **)pb; return e2->when < e1->when; } static void rspamd_sessions_cache_periodic (EV_P_ ev_timer *w, int revents) { struct rspamd_worker_session_cache *c = (struct rspamd_worker_session_cache *)w->data; GHashTableIter it; gchar timebuf[32]; gpointer k, v; struct rspamd_worker_session_elt *elt; struct tm tms; GPtrArray *res; guint i; if (g_hash_table_size (c->cache) > c->cfg->max_sessions_cache) { res = g_ptr_array_sized_new (g_hash_table_size (c->cache)); g_hash_table_iter_init (&it, c->cache); while (g_hash_table_iter_next (&it, &k, &v)) { g_ptr_array_add (res, v); } msg_err ("sessions cache is overflowed %d elements where %d is limit", (gint)res->len, (gint)c->cfg->max_sessions_cache); g_ptr_array_sort (res, rspamd_session_cache_sort_cmp); PTR_ARRAY_FOREACH (res, i, elt) { rspamd_localtime (elt->when, &tms); strftime (timebuf, sizeof (timebuf), "%F %H:%M:%S", &tms); msg_warn ("redundant session; ptr: %p, " "tag: %s, refcount: %d, time: %s", elt->ptr, elt->tag ? elt->tag : "unknown", elt->pref ? *elt->pref : 0, timebuf); } } ev_timer_again (EV_A_ w); } void * rspamd_worker_session_cache_new (struct rspamd_worker *w, struct ev_loop *ev_base) { struct rspamd_worker_session_cache *c; static const gdouble periodic_interval = 60.0; c = g_malloc0 (sizeof (*c)); c->ev_base = ev_base; c->cache = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, g_free); c->cfg = w->srv->cfg; c->periodic.data = c; ev_timer_init (&c->periodic, rspamd_sessions_cache_periodic, periodic_interval, periodic_interval); ev_timer_start (ev_base, &c->periodic); return c; } void rspamd_worker_session_cache_add (void *cache, const gchar *tag, guint *pref, void *ptr) { struct rspamd_worker_session_cache *c = cache; struct rspamd_worker_session_elt *elt; elt = g_malloc0 (sizeof (*elt)); elt->pref = pref; elt->ptr = ptr; elt->tag = tag; elt->when = time (NULL); g_hash_table_insert (c->cache, elt->ptr, elt); } void rspamd_worker_session_cache_remove (void *cache, void *ptr) { struct rspamd_worker_session_cache *c = cache; g_hash_table_remove (c->cache, ptr); } static void rspamd_worker_monitored_on_change (struct rspamd_monitored_ctx *ctx, struct rspamd_monitored *m, gboolean alive, void *ud) { struct rspamd_worker *worker = ud; struct rspamd_config *cfg = worker->srv->cfg; struct ev_loop *ev_base; guchar tag[RSPAMD_MONITORED_TAG_LEN]; static struct rspamd_srv_command srv_cmd; rspamd_monitored_get_tag (m, tag); ev_base = rspamd_monitored_ctx_get_ev_base (ctx); memset (&srv_cmd, 0, sizeof (srv_cmd)); srv_cmd.type = RSPAMD_SRV_MONITORED_CHANGE; rspamd_strlcpy (srv_cmd.cmd.monitored_change.tag, tag, sizeof (srv_cmd.cmd.monitored_change.tag)); srv_cmd.cmd.monitored_change.alive = alive; srv_cmd.cmd.monitored_change.sender = getpid (); msg_info_config ("broadcast monitored update for %s: %s", srv_cmd.cmd.monitored_change.tag, alive ? "alive" : "dead"); rspamd_srv_send_command (worker, ev_base, &srv_cmd, -1, NULL, NULL); } void rspamd_worker_init_monitored (struct rspamd_worker *worker, struct ev_loop *ev_base, struct rspamd_dns_resolver *resolver) { rspamd_monitored_ctx_config (worker->srv->cfg->monitored_ctx, worker->srv->cfg, ev_base, resolver->r, rspamd_worker_monitored_on_change, worker); } #ifdef HAVE_SA_SIGINFO #ifdef WITH_LIBUNWIND static void rspamd_print_crash (ucontext_t *uap) { unw_cursor_t cursor; unw_word_t ip, off; guint level; gint ret; if ((ret = unw_init_local (&cursor, uap)) != 0) { msg_err ("unw_init_local: %d", ret); return; } level = 0; ret = 0; for (;;) { char name[128]; if (level >= UNWIND_BACKTRACE_DEPTH) { break; } unw_get_reg (&cursor, UNW_REG_IP, &ip); ret = unw_get_proc_name(&cursor, name, sizeof (name), &off); if (ret == 0) { msg_err ("%d: %p: %s()+0x%xl", level, ip, name, (uintptr_t)off); } else { msg_err ("%d: %p: ", level, ip); } level++; ret = unw_step (&cursor); if (ret <= 0) { break; } } if (ret < 0) { msg_err ("unw_step_ptr: %d", ret); } } #endif static struct rspamd_main *saved_main = NULL; static gboolean rspamd_crash_propagate (gpointer key, gpointer value, gpointer unused) { struct rspamd_worker *w = value; /* Kill children softly */ kill (w->pid, SIGTERM); return TRUE; } static void rspamd_crash_sig_handler (int sig, siginfo_t *info, void *ctx) { struct sigaction sa; ucontext_t *uap = ctx; pid_t pid; pid = getpid (); msg_err ("caught fatal signal %d(%s), " "pid: %P, trace: ", sig, strsignal (sig), pid); (void)uap; #ifdef WITH_LIBUNWIND rspamd_print_crash (uap); #endif msg_err ("please see Rspamd FAQ to learn how to dump core files and how to " "fill a bug report"); if (saved_main) { if (pid == saved_main->pid) { /* * Main process has crashed, propagate crash further to trigger * monitoring alerts and mass panic */ g_hash_table_foreach_remove (saved_main->workers, rspamd_crash_propagate, NULL); } } /* * Invoke signal with the default handler */ sigemptyset (&sa.sa_mask); sa.sa_handler = SIG_DFL; sa.sa_flags = 0; sigaction (sig, &sa, NULL); kill (pid, sig); } #endif void rspamd_set_crash_handler (struct rspamd_main *rspamd_main) { #ifdef HAVE_SA_SIGINFO struct sigaction sa; #ifdef HAVE_SIGALTSTACK stack_t ss; memset (&ss, 0, sizeof ss); /* Allocate special stack, NOT freed at the end so far */ ss.ss_size = MAX (SIGSTKSZ, 8192 * 4); ss.ss_sp = g_malloc0 (ss.ss_size); sigaltstack (&ss, NULL); #endif saved_main = rspamd_main; sigemptyset (&sa.sa_mask); sa.sa_sigaction = &rspamd_crash_sig_handler; sa.sa_flags = SA_RESTART | SA_SIGINFO | SA_ONSTACK; sigaction (SIGSEGV, &sa, NULL); sigaction (SIGBUS, &sa, NULL); sigaction (SIGABRT, &sa, NULL); sigaction (SIGFPE, &sa, NULL); sigaction (SIGSYS, &sa, NULL); #endif } static void rspamd_enable_accept_event (EV_P_ ev_timer *w, int revents) { struct rspamd_worker_accept_event *ac_ev = (struct rspamd_worker_accept_event *)w->data; ev_timer_stop (EV_A_ w); ev_io_start (EV_A_ &ac_ev->accept_ev); } void rspamd_worker_throttle_accept_events (gint sock, void *data) { struct rspamd_worker_accept_event *head, *cur; const gdouble throttling = 0.5; head = (struct rspamd_worker_accept_event *)data; DL_FOREACH (head, cur) { ev_io_stop (cur->event_loop, &cur->accept_ev); cur->throttling_ev.data = cur; ev_timer_init (&cur->throttling_ev, rspamd_enable_accept_event, throttling, 0.0); ev_timer_start (cur->event_loop, &cur->throttling_ev); } } gboolean rspamd_check_termination_clause (struct rspamd_main *rspamd_main, struct rspamd_worker *wrk, int res) { gboolean need_refork = TRUE; if (wrk->state != rspamd_worker_state_running || rspamd_main->wanna_die) { /* Do not refork workers that are intended to be terminated */ need_refork = FALSE; } if (WIFEXITED (res) && WEXITSTATUS (res) == 0) { /* Normal worker termination, do not fork one more */ if (wrk->hb.nbeats < 0 && rspamd_main->cfg->heartbeats_loss_max > 0 && -(wrk->hb.nbeats) >= rspamd_main->cfg->heartbeats_loss_max) { msg_info_main ("%s process %P terminated normally, but lost %L " "heartbeats, refork it", g_quark_to_string (wrk->type), wrk->pid, -(wrk->hb.nbeats)); need_refork = TRUE; } else { msg_info_main ("%s process %P terminated normally", g_quark_to_string (wrk->type), wrk->pid); need_refork = FALSE; } } else { if (WIFSIGNALED (res)) { #ifdef WCOREDUMP if (WCOREDUMP (res)) { msg_warn_main ( "%s process %P terminated abnormally by signal: %s" " and created core file; please see Rspamd FAQ " "to learn how to extract data from core file and " "fill a bug report", g_quark_to_string (wrk->type), wrk->pid, g_strsignal (WTERMSIG (res))); } else { #ifdef HAVE_SYS_RESOURCE_H struct rlimit rlmt; (void) getrlimit (RLIMIT_CORE, &rlmt); msg_warn_main ( "%s process %P terminated abnormally with exit code %d by " "signal: %s" " but NOT created core file (throttled=%s); " "core file limits: %L current, %L max", g_quark_to_string (wrk->type), wrk->pid, WEXITSTATUS (res), g_strsignal (WTERMSIG (res)), wrk->cores_throttled ? "yes" : "no", (gint64) rlmt.rlim_cur, (gint64) rlmt.rlim_max); #else msg_warn_main ( "%s process %P terminated abnormally with exit code %d by signal: %s" " but NOT created core file (throttled=%s); ", g_quark_to_string (wrk->type), wrk->pid, WEXITSTATUS (res), g_strsignal (WTERMSIG (res)), wrk->cores_throttled ? "yes" : "no"); #endif } #else msg_warn_main ( "%s process %P terminated abnormally with exit code %d by signal: %s", g_quark_to_string (wrk->type), wrk->pid, WEXITSTATUS (res), g_strsignal (WTERMSIG (res))); #endif if (WTERMSIG (res) == SIGUSR2) { /* * It is actually race condition when not started process * has been requested to be reloaded. * * We shouldn't refork on this */ need_refork = FALSE; } } else { msg_warn_main ("%s process %P terminated abnormally " "(but it was not killed by a signal) " "with exit code %d", g_quark_to_string (wrk->type), wrk->pid, WEXITSTATUS (res)); } } return need_refork; } #ifdef WITH_HYPERSCAN gboolean rspamd_worker_hyperscan_ready (struct rspamd_main *rspamd_main, struct rspamd_worker *worker, gint fd, gint attached_fd, struct rspamd_control_command *cmd, gpointer ud) { struct rspamd_control_reply rep; struct rspamd_re_cache *cache = worker->srv->cfg->re_cache; memset (&rep, 0, sizeof (rep)); rep.type = RSPAMD_CONTROL_HYPERSCAN_LOADED; if (rspamd_re_cache_is_hs_loaded (cache) != RSPAMD_HYPERSCAN_LOADED_FULL || cmd->cmd.hs_loaded.forced) { msg_info ("loading hyperscan expressions after receiving compilation " "notice: %s", (rspamd_re_cache_is_hs_loaded (cache) != RSPAMD_HYPERSCAN_LOADED_FULL) ? "new db" : "forced update"); rep.reply.hs_loaded.status = rspamd_re_cache_load_hyperscan ( worker->srv->cfg->re_cache, cmd->cmd.hs_loaded.cache_dir, false); } if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) { msg_err ("cannot write reply to the control socket: %s", strerror (errno)); } return TRUE; } #endif /* With Hyperscan */ gboolean rspamd_worker_check_context (gpointer ctx, guint64 magic) { struct rspamd_abstract_worker_ctx *actx = (struct rspamd_abstract_worker_ctx*)ctx; return actx->magic == magic; } static gboolean rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main, struct rspamd_worker *worker, gint fd, gint attached_fd, struct rspamd_control_command *cmd, gpointer ud) { struct rspamd_config *cfg = ud; struct rspamd_worker_log_pipe *lp; struct rspamd_control_reply rep; memset (&rep, 0, sizeof (rep)); rep.type = RSPAMD_CONTROL_LOG_PIPE; if (attached_fd != -1) { lp = g_malloc0 (sizeof (*lp)); lp->fd = attached_fd; lp->type = cmd->cmd.log_pipe.type; DL_APPEND (cfg->log_pipes, lp); msg_info ("added new log pipe"); } else { rep.reply.log_pipe.status = ENOENT; msg_err ("cannot attach log pipe: invalid fd"); } if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) { msg_err ("cannot write reply to the control socket: %s", strerror (errno)); } return TRUE; } static gboolean rspamd_worker_monitored_handler (struct rspamd_main *rspamd_main, struct rspamd_worker *worker, gint fd, gint attached_fd, struct rspamd_control_command *cmd, gpointer ud) { struct rspamd_control_reply rep; struct rspamd_monitored *m; struct rspamd_monitored_ctx *mctx = worker->srv->cfg->monitored_ctx; struct rspamd_config *cfg = ud; memset (&rep, 0, sizeof (rep)); rep.type = RSPAMD_CONTROL_MONITORED_CHANGE; if (cmd->cmd.monitored_change.sender != getpid ()) { m = rspamd_monitored_by_tag (mctx, cmd->cmd.monitored_change.tag); if (m != NULL) { rspamd_monitored_set_alive (m, cmd->cmd.monitored_change.alive); rep.reply.monitored_change.status = 1; msg_info_config ("updated monitored status for %s: %s", cmd->cmd.monitored_change.tag, cmd->cmd.monitored_change.alive ? "alive" : "dead"); } else { msg_err ("cannot find monitored by tag: %*s", 32, cmd->cmd.monitored_change.tag); rep.reply.monitored_change.status = 0; } } if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) { msg_err ("cannot write reply to the control socket: %s", strerror (errno)); } return TRUE; } void rspamd_worker_init_scanner (struct rspamd_worker *worker, struct ev_loop *ev_base, struct rspamd_dns_resolver *resolver, struct rspamd_lang_detector **plang_det) { rspamd_stat_init (worker->srv->cfg, ev_base); #ifdef WITH_HYPERSCAN rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_HYPERSCAN_LOADED, rspamd_worker_hyperscan_ready, NULL); #endif rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_LOG_PIPE, rspamd_worker_log_pipe_handler, worker->srv->cfg); rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_MONITORED_CHANGE, rspamd_worker_monitored_handler, worker->srv->cfg); *plang_det = worker->srv->cfg->lang_det; } void rspamd_controller_store_saved_stats (struct rspamd_main *rspamd_main, struct rspamd_config *cfg) { struct rspamd_stat *stat; ucl_object_t *top, *sub; struct ucl_emitter_functions *efuncs; gint i, fd; gchar fpath[PATH_MAX]; if (cfg->stats_file == NULL) { return; } rspamd_snprintf (fpath, sizeof (fpath), "%s.XXXXXXXX", cfg->stats_file); fd = g_mkstemp_full (fpath, O_WRONLY|O_TRUNC, 00644); if (fd == -1) { msg_err_config ("cannot open for writing controller stats from %s: %s", fpath, strerror (errno)); return; } stat = rspamd_main->stat; top = ucl_object_typed_new (UCL_OBJECT); ucl_object_insert_key (top, ucl_object_fromint ( stat->messages_scanned), "scanned", 0, false); ucl_object_insert_key (top, ucl_object_fromint ( stat->messages_learned), "learned", 0, false); if (stat->messages_scanned > 0) { sub = ucl_object_typed_new (UCL_OBJECT); for (i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) { ucl_object_insert_key (sub, ucl_object_fromint (stat->actions_stat[i]), rspamd_action_to_str (i), 0, false); } ucl_object_insert_key (top, sub, "actions", 0, false); } ucl_object_insert_key (top, ucl_object_fromint (stat->connections_count), "connections", 0, false); ucl_object_insert_key (top, ucl_object_fromint (stat->control_connections_count), "control_connections", 0, false); efuncs = ucl_object_emit_fd_funcs (fd); if (!ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT, efuncs, NULL)) { msg_err_config ("cannot write stats to %s: %s", fpath, strerror (errno)); unlink (fpath); } else { if (rename (fpath, cfg->stats_file) == -1) { msg_err_config ("cannot rename stats from %s to %s: %s", fpath, cfg->stats_file, strerror (errno)); } } ucl_object_unref (top); close (fd); ucl_object_emit_funcs_free (efuncs); } static ev_timer rrd_timer; void rspamd_controller_on_terminate (struct rspamd_worker *worker, struct rspamd_rrd_file *rrd) { struct rspamd_abstract_worker_ctx *ctx; ctx = (struct rspamd_abstract_worker_ctx *)worker->ctx; rspamd_controller_store_saved_stats (worker->srv, worker->srv->cfg); if (rrd) { ev_timer_stop (ctx->event_loop, &rrd_timer); msg_info ("closing rrd file: %s", rrd->filename); rspamd_rrd_close (rrd); } } static void rspamd_controller_load_saved_stats (struct rspamd_main *rspamd_main, struct rspamd_config *cfg) { struct ucl_parser *parser; ucl_object_t *obj; const ucl_object_t *elt, *subelt; struct rspamd_stat *stat, stat_copy; gint i; if (cfg->stats_file == NULL) { return; } if (access (cfg->stats_file, R_OK) == -1) { msg_err_config ("cannot load controller stats from %s: %s", cfg->stats_file, strerror (errno)); return; } parser = ucl_parser_new (0); if (!ucl_parser_add_file (parser, cfg->stats_file)) { msg_err_config ("cannot parse controller stats from %s: %s", cfg->stats_file, ucl_parser_get_error (parser)); ucl_parser_free (parser); return; } obj = ucl_parser_get_object (parser); ucl_parser_free (parser); stat = rspamd_main->stat; memcpy (&stat_copy, stat, sizeof (stat_copy)); elt = ucl_object_lookup (obj, "scanned"); if (elt != NULL && ucl_object_type (elt) == UCL_INT) { stat_copy.messages_scanned = ucl_object_toint (elt); } elt = ucl_object_lookup (obj, "learned"); if (elt != NULL && ucl_object_type (elt) == UCL_INT) { stat_copy.messages_learned = ucl_object_toint (elt); } elt = ucl_object_lookup (obj, "actions"); if (elt != NULL) { for (i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) { subelt = ucl_object_lookup (elt, rspamd_action_to_str (i)); if (subelt && ucl_object_type (subelt) == UCL_INT) { stat_copy.actions_stat[i] = ucl_object_toint (subelt); } } } elt = ucl_object_lookup (obj, "connections_count"); if (elt != NULL && ucl_object_type (elt) == UCL_INT) { stat_copy.connections_count = ucl_object_toint (elt); } elt = ucl_object_lookup (obj, "control_connections_count"); if (elt != NULL && ucl_object_type (elt) == UCL_INT) { stat_copy.control_connections_count = ucl_object_toint (elt); } ucl_object_unref (obj); memcpy (stat, &stat_copy, sizeof (stat_copy)); } struct rspamd_controller_periodics_cbdata { struct rspamd_worker *worker; struct rspamd_rrd_file *rrd; struct rspamd_stat *stat; ev_timer save_stats_event; }; static void rspamd_controller_rrd_update (EV_P_ ev_timer *w, int revents) { struct rspamd_controller_periodics_cbdata *cbd = (struct rspamd_controller_periodics_cbdata *)w->data; struct rspamd_stat *stat; GArray ar; gdouble points[METRIC_ACTION_MAX]; GError *err = NULL; guint i; g_assert (cbd->rrd != NULL); stat = cbd->stat; for (i = METRIC_ACTION_REJECT; i < METRIC_ACTION_MAX; i ++) { points[i] = stat->actions_stat[i]; } ar.data = (gchar *)points; ar.len = sizeof (points); if (!rspamd_rrd_add_record (cbd->rrd, &ar, rspamd_get_calendar_ticks (), &err)) { msg_err ("cannot update rrd file: %e", err); g_error_free (err); } /* Plan new event */ ev_timer_again (EV_A_ w); } static void rspamd_controller_stats_save_periodic (EV_P_ ev_timer *w, int revents) { struct rspamd_controller_periodics_cbdata *cbd = (struct rspamd_controller_periodics_cbdata *)w->data; rspamd_controller_store_saved_stats (cbd->worker->srv, cbd->worker->srv->cfg); ev_timer_again (EV_A_ w); } void rspamd_worker_init_controller (struct rspamd_worker *worker, struct rspamd_rrd_file **prrd) { struct rspamd_abstract_worker_ctx *ctx; static const ev_tstamp rrd_update_time = 1.0; ctx = (struct rspamd_abstract_worker_ctx *)worker->ctx; rspamd_controller_load_saved_stats (worker->srv, worker->srv->cfg); if (worker->index == 0) { /* Enable periodics and other stuff */ static struct rspamd_controller_periodics_cbdata cbd; const ev_tstamp save_stats_interval = 60; /* 1 minute */ memset (&cbd, 0, sizeof (cbd)); cbd.save_stats_event.data = &cbd; cbd.worker = worker; cbd.stat = worker->srv->stat; ev_timer_init (&cbd.save_stats_event, rspamd_controller_stats_save_periodic, save_stats_interval, save_stats_interval); ev_timer_start (ctx->event_loop, &cbd.save_stats_event); rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver, worker, RSPAMD_MAP_WATCH_PRIMARY_CONTROLLER); if (prrd != NULL) { if (ctx->cfg->rrd_file && worker->index == 0) { GError *rrd_err = NULL; *prrd = rspamd_rrd_file_default (ctx->cfg->rrd_file, &rrd_err); if (*prrd) { cbd.rrd = *prrd; rrd_timer.data = &cbd; ev_timer_init (&rrd_timer, rspamd_controller_rrd_update, rrd_update_time, rrd_update_time); ev_timer_start (ctx->event_loop, &rrd_timer); } else if (rrd_err) { msg_err ("cannot load rrd from %s: %e", ctx->cfg->rrd_file, rrd_err); g_error_free (rrd_err); } else { msg_err ("cannot load rrd from %s: unknown error", ctx->cfg->rrd_file); } } else { *prrd = NULL; } } if (!ctx->cfg->disable_monitored) { rspamd_worker_init_monitored (worker, ctx->event_loop, ctx->resolver); } } else { rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver, worker, RSPAMD_MAP_WATCH_SCANNER); } }