diff options
Diffstat (limited to 'src/libserver/worker_util.c')
-rw-r--r-- | src/libserver/worker_util.c | 1768 |
1 files changed, 879 insertions, 889 deletions
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index 3c90b8fb1..d2a900e01 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -67,24 +67,24 @@ struct rspamd_worker *rspamd_current_worker = NULL; /* Forward declaration */ -static void rspamd_worker_heartbeat_start (struct rspamd_worker *, - struct ev_loop *); +static void rspamd_worker_heartbeat_start(struct rspamd_worker *, + struct ev_loop *); -static void rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *); +static void rspamd_worker_ignore_signal(struct rspamd_worker_signal_handler *); /** * Return worker's control structure by its type * @param type * @return worker's control structure or NULL */ worker_t * -rspamd_get_worker_by_type (struct rspamd_config *cfg, GQuark type) +rspamd_get_worker_by_type(struct rspamd_config *cfg, GQuark type) { worker_t **pwrk; pwrk = cfg->compiled_workers; while (pwrk && *pwrk) { - if (rspamd_check_worker (cfg, *pwrk)) { - if (g_quark_from_string ((*pwrk)->name) == type) { + if (rspamd_check_worker(cfg, *pwrk)) { + if (g_quark_from_string((*pwrk)->name) == type) { return *pwrk; } } @@ -96,36 +96,36 @@ rspamd_get_worker_by_type (struct rspamd_config *cfg, GQuark type) } static void -rspamd_worker_check_finished (EV_P_ ev_timer *w, int revents) +rspamd_worker_check_finished(EV_P_ ev_timer *w, int revents) { - int *pnchecks = (int *)w->data; + int *pnchecks = (int *) w->data; if (*pnchecks > SOFT_SHUTDOWN_TIME * 10) { - msg_warn ("terminating worker before finishing of terminate handlers"); - ev_break (EV_A_ EVBREAK_ONE); + msg_warn("terminating worker before finishing of terminate handlers"); + ev_break(EV_A_ EVBREAK_ONE); } else { - int refcount = ev_active_cnt (EV_A); + int refcount = ev_active_cnt(EV_A); if (refcount == 1) { - ev_break (EV_A_ EVBREAK_ONE); + ev_break(EV_A_ EVBREAK_ONE); } else { - ev_timer_again (EV_A_ w); + ev_timer_again(EV_A_ w); } } } static gboolean -rspamd_worker_finalize (gpointer user_data) +rspamd_worker_finalize(gpointer user_data) { struct rspamd_task *task = user_data; if (!(task->flags & RSPAMD_TASK_FLAG_PROCESSING)) { - msg_info_task ("finishing actions has been processed, terminating"); + msg_info_task("finishing actions has been processed, terminating"); /* ev_break (task->event_loop, EVBREAK_ALL); */ task->worker->state = rspamd_worker_wanna_die; - rspamd_session_destroy (task->s); + rspamd_session_destroy(task->s); return TRUE; } @@ -134,7 +134,7 @@ rspamd_worker_finalize (gpointer user_data) } gboolean -rspamd_worker_call_finish_handlers (struct rspamd_worker *worker) +rspamd_worker_call_finish_handlers(struct rspamd_worker *worker) { struct rspamd_task *task; struct rspamd_config *cfg = worker->srv->cfg; @@ -142,24 +142,25 @@ rspamd_worker_call_finish_handlers (struct rspamd_worker *worker) struct rspamd_config_cfg_lua_script *sc; if (cfg->on_term_scripts) { - ctx = (struct rspamd_abstract_worker_ctx *)worker->ctx; + ctx = (struct rspamd_abstract_worker_ctx *) worker->ctx; /* Create a fake task object for async events */ - task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop, FALSE); + task = rspamd_task_new(worker, cfg, NULL, NULL, ctx->event_loop, FALSE); task->resolver = ctx->resolver; task->flags |= RSPAMD_TASK_FLAG_PROCESSING; - task->s = rspamd_session_create (task->task_pool, - rspamd_worker_finalize, - NULL, - (event_finalizer_t) rspamd_task_free, - task); + task->s = rspamd_session_create(task->task_pool, + rspamd_worker_finalize, + NULL, + (event_finalizer_t) rspamd_task_free, + task); - DL_FOREACH (cfg->on_term_scripts, sc) { - lua_call_finish_script (sc, task); + DL_FOREACH(cfg->on_term_scripts, sc) + { + lua_call_finish_script(sc, task); } task->flags &= ~RSPAMD_TASK_FLAG_PROCESSING; - if (rspamd_session_pending (task->s)) { + if (rspamd_session_pending(task->s)) { return TRUE; } } @@ -168,7 +169,7 @@ rspamd_worker_call_finish_handlers (struct rspamd_worker *worker) } static void -rspamd_worker_terminate_handlers (struct rspamd_worker *w) +rspamd_worker_terminate_handlers(struct rspamd_worker *w) { if (w->nconns == 0 && (!(w->flags & RSPAMD_WORKER_SCANNER) || w->srv->cfg->on_term_scripts == NULL)) { @@ -195,15 +196,15 @@ rspamd_worker_terminate_handlers (struct rspamd_worker *w) w->state = rspamd_worker_wait_final_scripts; if ((w->flags & RSPAMD_WORKER_SCANNER) && - rspamd_worker_call_finish_handlers (w)) { - msg_info ("performing async finishing actions"); + rspamd_worker_call_finish_handlers(w)) { + msg_info("performing async finishing actions"); w->state = rspamd_worker_wait_final_scripts; } else { /* * We are done now */ - msg_info ("no async finishing actions, terminating"); + msg_info("no async finishing actions, terminating"); w->state = rspamd_worker_wanna_die; } } @@ -212,36 +213,36 @@ rspamd_worker_terminate_handlers (struct rspamd_worker *w) } static void -rspamd_worker_on_delayed_shutdown (EV_P_ ev_timer *w, int revents) +rspamd_worker_on_delayed_shutdown(EV_P_ ev_timer *w, int revents) { - struct rspamd_worker *worker = (struct rspamd_worker *)w->data; + struct rspamd_worker *worker = (struct rspamd_worker *) w->data; worker->state = rspamd_worker_wanna_die; - ev_timer_stop (EV_A_ w); - ev_break (loop, EVBREAK_ALL); + ev_timer_stop(EV_A_ w); + ev_break(loop, EVBREAK_ALL); } static void -rspamd_worker_shutdown_check (EV_P_ ev_timer *w, int revents) +rspamd_worker_shutdown_check(EV_P_ ev_timer *w, int revents) { - struct rspamd_worker *worker = (struct rspamd_worker *)w->data; + struct rspamd_worker *worker = (struct rspamd_worker *) w->data; if (worker->state != rspamd_worker_wanna_die) { - rspamd_worker_terminate_handlers (worker); + rspamd_worker_terminate_handlers(worker); if (worker->state == rspamd_worker_wanna_die) { /* We are done, kill event loop */ - ev_timer_stop (EV_A_ w); - ev_break (EV_A_ EVBREAK_ALL); + ev_timer_stop(EV_A_ w); + ev_break(EV_A_ EVBREAK_ALL); } else { /* Try again later */ - ev_timer_again (EV_A_ w); + ev_timer_again(EV_A_ w); } } else { - ev_timer_stop (EV_A_ w); - ev_break (EV_A_ EVBREAK_ALL); + ev_timer_stop(EV_A_ w); + ev_break(EV_A_ EVBREAK_ALL); } } @@ -249,7 +250,7 @@ rspamd_worker_shutdown_check (EV_P_ ev_timer *w, int revents) * Config reload is designed by sending sigusr2 to active workers and pending shutdown of them */ static gboolean -rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg) +rspamd_worker_usr2_handler(struct rspamd_worker_signal_handler *sigh, void *arg) { /* Do not accept new connections, preparing to end worker's process */ if (sigh->worker->state == rspamd_worker_state_running) { @@ -260,35 +261,35 @@ rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg shutdown_ts = 0.0; } else { - shutdown_ts = MAX (SOFT_SHUTDOWN_TIME, - sigh->worker->srv->cfg->task_timeout * 2.0); + shutdown_ts = MAX(SOFT_SHUTDOWN_TIME, + sigh->worker->srv->cfg->task_timeout * 2.0); } - rspamd_worker_ignore_signal (sigh); + rspamd_worker_ignore_signal(sigh); sigh->worker->state = rspamd_worker_state_terminating; - rspamd_default_log_function (G_LOG_LEVEL_INFO, - sigh->worker->srv->server_pool->tag.tagname, - sigh->worker->srv->server_pool->tag.uid, - G_STRFUNC, - "worker's shutdown is pending in %.2f sec", - shutdown_ts); + rspamd_default_log_function(G_LOG_LEVEL_INFO, + sigh->worker->srv->server_pool->tag.tagname, + sigh->worker->srv->server_pool->tag.uid, + G_STRFUNC, + "worker's shutdown is pending in %.2f sec", + shutdown_ts); /* Soft shutdown timer */ shutdown_ev.data = sigh->worker; - ev_timer_init (&shutdown_ev, rspamd_worker_on_delayed_shutdown, - shutdown_ts, 0.0); - ev_timer_start (sigh->event_loop, &shutdown_ev); + ev_timer_init(&shutdown_ev, rspamd_worker_on_delayed_shutdown, + shutdown_ts, 0.0); + ev_timer_start(sigh->event_loop, &shutdown_ev); if (!(sigh->worker->flags & RSPAMD_WORKER_NO_TERMINATE_DELAY)) { /* This timer checks if we are ready to die and is called frequently */ shutdown_check_ev.data = sigh->worker; - ev_timer_init (&shutdown_check_ev, rspamd_worker_shutdown_check, - 0.5, 0.5); - ev_timer_start (sigh->event_loop, &shutdown_check_ev); + ev_timer_init(&shutdown_check_ev, rspamd_worker_shutdown_check, + 0.5, 0.5); + ev_timer_start(sigh->event_loop, &shutdown_check_ev); } - rspamd_worker_stop_accept (sigh->worker); + rspamd_worker_stop_accept(sigh->worker); } /* No more signals */ @@ -299,19 +300,19 @@ rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg * Reopen log is designed by sending sigusr1 to active workers and pending shutdown of them */ static gboolean -rspamd_worker_usr1_handler (struct rspamd_worker_signal_handler *sigh, void *arg) +rspamd_worker_usr1_handler(struct rspamd_worker_signal_handler *sigh, void *arg) { struct rspamd_main *rspamd_main = sigh->worker->srv; - rspamd_log_reopen (sigh->worker->srv->logger, rspamd_main->cfg, -1, -1); - msg_info_main ("logging reinitialised"); + rspamd_log_reopen(sigh->worker->srv->logger, rspamd_main->cfg, -1, -1); + msg_info_main("logging reinitialised"); /* Get more signals */ return TRUE; } static gboolean -rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg) +rspamd_worker_term_handler(struct rspamd_worker_signal_handler *sigh, void *arg) { if (sigh->worker->state == rspamd_worker_state_running) { static ev_timer shutdown_ev, shutdown_check_ev; @@ -321,41 +322,41 @@ rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg shutdown_ts = 0.0; } else { - shutdown_ts = MAX (SOFT_SHUTDOWN_TIME, - sigh->worker->srv->cfg->task_timeout * 2.0); + shutdown_ts = MAX(SOFT_SHUTDOWN_TIME, + sigh->worker->srv->cfg->task_timeout * 2.0); } - rspamd_worker_ignore_signal (sigh); + rspamd_worker_ignore_signal(sigh); sigh->worker->state = rspamd_worker_state_terminating; - rspamd_default_log_function (G_LOG_LEVEL_INFO, - sigh->worker->srv->server_pool->tag.tagname, - sigh->worker->srv->server_pool->tag.uid, - G_STRFUNC, - "terminating after receiving signal %s", - g_strsignal (sigh->signo)); + rspamd_default_log_function(G_LOG_LEVEL_INFO, + sigh->worker->srv->server_pool->tag.tagname, + sigh->worker->srv->server_pool->tag.uid, + G_STRFUNC, + "terminating after receiving signal %s", + g_strsignal(sigh->signo)); - rspamd_worker_stop_accept (sigh->worker); - rspamd_worker_terminate_handlers (sigh->worker); + rspamd_worker_stop_accept(sigh->worker); + rspamd_worker_terminate_handlers(sigh->worker); /* Check if we are ready to die */ if (sigh->worker->state != rspamd_worker_wanna_die) { /* This timer is called when we have no choices but to die */ shutdown_ev.data = sigh->worker; - ev_timer_init (&shutdown_ev, rspamd_worker_on_delayed_shutdown, - shutdown_ts, 0.0); - ev_timer_start (sigh->event_loop, &shutdown_ev); + ev_timer_init(&shutdown_ev, rspamd_worker_on_delayed_shutdown, + shutdown_ts, 0.0); + ev_timer_start(sigh->event_loop, &shutdown_ev); if (!(sigh->worker->flags & RSPAMD_WORKER_NO_TERMINATE_DELAY)) { /* This timer checks if we are ready to die and is called frequently */ shutdown_check_ev.data = sigh->worker; - ev_timer_init (&shutdown_check_ev, rspamd_worker_shutdown_check, - 0.5, 0.5); - ev_timer_start (sigh->event_loop, &shutdown_check_ev); + ev_timer_init(&shutdown_check_ev, rspamd_worker_shutdown_check, + 0.5, 0.5); + ev_timer_start(sigh->event_loop, &shutdown_check_ev); } } else { /* Flag to die has been already set */ - ev_break (sigh->event_loop, EVBREAK_ALL); + ev_break(sigh->event_loop, EVBREAK_ALL); } } @@ -364,134 +365,134 @@ rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg } static void -rspamd_worker_signal_handle (EV_P_ ev_signal *w, int revents) +rspamd_worker_signal_handle(EV_P_ ev_signal *w, int revents) { struct rspamd_worker_signal_handler *sigh = - (struct rspamd_worker_signal_handler *)w->data; + (struct rspamd_worker_signal_handler *) w->data; struct rspamd_worker_signal_handler_elt *cb, *cbtmp; /* Call all signal handlers registered */ - DL_FOREACH_SAFE (sigh->cb, cb, cbtmp) { - if (!cb->handler (sigh, cb->handler_data)) { - DL_DELETE (sigh->cb, cb); - g_free (cb); + DL_FOREACH_SAFE(sigh->cb, cb, cbtmp) + { + if (!cb->handler(sigh, cb->handler_data)) { + DL_DELETE(sigh->cb, cb); + g_free(cb); } } } static void -rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *sigh) +rspamd_worker_ignore_signal(struct rspamd_worker_signal_handler *sigh) { sigset_t set; - ev_signal_stop (sigh->event_loop, &sigh->ev_sig); - sigemptyset (&set); - sigaddset (&set, sigh->signo); - sigprocmask (SIG_BLOCK, &set, NULL); + ev_signal_stop(sigh->event_loop, &sigh->ev_sig); + sigemptyset(&set); + sigaddset(&set, sigh->signo); + sigprocmask(SIG_BLOCK, &set, NULL); } static void -rspamd_worker_default_signal (int signo) +rspamd_worker_default_signal(int signo) { struct sigaction sig; - sigemptyset (&sig.sa_mask); - sigaddset (&sig.sa_mask, signo); + sigemptyset(&sig.sa_mask); + sigaddset(&sig.sa_mask, signo); sig.sa_handler = SIG_DFL; sig.sa_flags = 0; - sigaction (signo, &sig, NULL); + sigaction(signo, &sig, NULL); } static void -rspamd_sigh_free (void *p) +rspamd_sigh_free(void *p) { struct rspamd_worker_signal_handler *sigh = p; struct rspamd_worker_signal_handler_elt *cb, *tmp; - DL_FOREACH_SAFE (sigh->cb, cb, tmp) { - DL_DELETE (sigh->cb, cb); - g_free (cb); + DL_FOREACH_SAFE(sigh->cb, cb, tmp) + { + DL_DELETE(sigh->cb, cb); + g_free(cb); } - ev_signal_stop (sigh->event_loop, &sigh->ev_sig); - rspamd_worker_default_signal (sigh->signo); - g_free (sigh); + ev_signal_stop(sigh->event_loop, &sigh->ev_sig); + rspamd_worker_default_signal(sigh->signo); + g_free(sigh); } -void -rspamd_worker_set_signal_handler (int signo, struct rspamd_worker *worker, - struct ev_loop *event_loop, - rspamd_worker_signal_cb_t handler, - void *handler_data) +void rspamd_worker_set_signal_handler(int signo, struct rspamd_worker *worker, + struct ev_loop *event_loop, + rspamd_worker_signal_cb_t handler, + void *handler_data) { struct rspamd_worker_signal_handler *sigh; struct rspamd_worker_signal_handler_elt *cb; - sigh = g_hash_table_lookup (worker->signal_events, GINT_TO_POINTER (signo)); + sigh = g_hash_table_lookup(worker->signal_events, GINT_TO_POINTER(signo)); if (sigh == NULL) { - sigh = g_malloc0 (sizeof (*sigh)); + sigh = g_malloc0(sizeof(*sigh)); sigh->signo = signo; sigh->worker = worker; sigh->event_loop = event_loop; sigh->enabled = TRUE; sigh->ev_sig.data = sigh; - ev_signal_init (&sigh->ev_sig, rspamd_worker_signal_handle, signo); - ev_signal_start (event_loop, &sigh->ev_sig); + ev_signal_init(&sigh->ev_sig, rspamd_worker_signal_handle, signo); + ev_signal_start(event_loop, &sigh->ev_sig); - g_hash_table_insert (worker->signal_events, - GINT_TO_POINTER (signo), - sigh); + g_hash_table_insert(worker->signal_events, + GINT_TO_POINTER(signo), + sigh); } - cb = g_malloc0 (sizeof (*cb)); + cb = g_malloc0(sizeof(*cb)); cb->handler = handler; cb->handler_data = handler_data; - DL_APPEND (sigh->cb, cb); + DL_APPEND(sigh->cb, cb); } -void -rspamd_worker_init_signals (struct rspamd_worker *worker, - struct ev_loop *event_loop) +void rspamd_worker_init_signals(struct rspamd_worker *worker, + struct ev_loop *event_loop) { /* A set of terminating signals */ - rspamd_worker_set_signal_handler (SIGTERM, worker, event_loop, - rspamd_worker_term_handler, NULL); - rspamd_worker_set_signal_handler (SIGINT, worker, event_loop, - rspamd_worker_term_handler, NULL); - rspamd_worker_set_signal_handler (SIGHUP, worker, event_loop, - rspamd_worker_term_handler, NULL); + rspamd_worker_set_signal_handler(SIGTERM, worker, event_loop, + rspamd_worker_term_handler, NULL); + rspamd_worker_set_signal_handler(SIGINT, worker, event_loop, + rspamd_worker_term_handler, NULL); + rspamd_worker_set_signal_handler(SIGHUP, worker, event_loop, + rspamd_worker_term_handler, NULL); /* Special purpose signals */ - rspamd_worker_set_signal_handler (SIGUSR1, worker, event_loop, - rspamd_worker_usr1_handler, NULL); - rspamd_worker_set_signal_handler (SIGUSR2, worker, event_loop, - rspamd_worker_usr2_handler, NULL); + rspamd_worker_set_signal_handler(SIGUSR1, worker, event_loop, + rspamd_worker_usr1_handler, NULL); + rspamd_worker_set_signal_handler(SIGUSR2, worker, event_loop, + rspamd_worker_usr2_handler, NULL); } struct ev_loop * -rspamd_prepare_worker (struct rspamd_worker *worker, const char *name, - rspamd_accept_handler hdl) +rspamd_prepare_worker(struct rspamd_worker *worker, const char *name, + rspamd_accept_handler hdl) { struct ev_loop *event_loop; GList *cur; struct rspamd_worker_listen_socket *ls; struct rspamd_worker_accept_event *accept_ev; - worker->signal_events = g_hash_table_new_full (g_direct_hash, g_direct_equal, - NULL, rspamd_sigh_free); + worker->signal_events = g_hash_table_new_full(g_direct_hash, g_direct_equal, + NULL, rspamd_sigh_free); - event_loop = ev_loop_new (rspamd_config_ev_backend_get (worker->srv->cfg)); + event_loop = ev_loop_new(rspamd_config_ev_backend_get(worker->srv->cfg)); worker->srv->event_loop = event_loop; - rspamd_worker_init_signals (worker, event_loop); - rspamd_control_worker_add_default_cmd_handlers (worker, event_loop); - rspamd_worker_heartbeat_start (worker, event_loop); - rspamd_redis_pool_config (worker->srv->cfg->redis_pool, - worker->srv->cfg, event_loop); + rspamd_worker_init_signals(worker, event_loop); + rspamd_control_worker_add_default_cmd_handlers(worker, event_loop); + rspamd_worker_heartbeat_start(worker, event_loop); + rspamd_redis_pool_config(worker->srv->cfg->redis_pool, + worker->srv->cfg, event_loop); /* Accept all sockets */ if (hdl) { @@ -501,40 +502,40 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name, ls = cur->data; if (ls->fd != -1) { - accept_ev = g_malloc0 (sizeof (*accept_ev)); + accept_ev = g_malloc0(sizeof(*accept_ev)); accept_ev->event_loop = event_loop; accept_ev->accept_ev.data = worker; - ev_io_init (&accept_ev->accept_ev, hdl, ls->fd, EV_READ); - ev_io_start (event_loop, &accept_ev->accept_ev); + ev_io_init(&accept_ev->accept_ev, hdl, ls->fd, EV_READ); + ev_io_start(event_loop, &accept_ev->accept_ev); - DL_APPEND (worker->accept_events, accept_ev); + DL_APPEND(worker->accept_events, accept_ev); } - cur = g_list_next (cur); + cur = g_list_next(cur); } } return event_loop; } -void -rspamd_worker_stop_accept (struct rspamd_worker *worker) +void rspamd_worker_stop_accept(struct rspamd_worker *worker) { struct rspamd_worker_accept_event *cur, *tmp; /* Remove all events */ - DL_FOREACH_SAFE (worker->accept_events, cur, tmp) { + DL_FOREACH_SAFE(worker->accept_events, cur, tmp) + { - if (ev_can_stop (&cur->accept_ev)) { - ev_io_stop (cur->event_loop, &cur->accept_ev); + if (ev_can_stop(&cur->accept_ev)) { + ev_io_stop(cur->event_loop, &cur->accept_ev); } - if (ev_can_stop (&cur->throttling_ev)) { - ev_timer_stop (cur->event_loop, &cur->throttling_ev); + if (ev_can_stop(&cur->throttling_ev)) { + ev_timer_stop(cur->event_loop, &cur->throttling_ev); } - g_free (cur); + g_free(cur); } /* XXX: we need to do it much later */ @@ -557,162 +558,158 @@ rspamd_worker_stop_accept (struct rspamd_worker *worker) } static rspamd_fstring_t * -rspamd_controller_maybe_compress (struct rspamd_http_connection_entry *entry, - rspamd_fstring_t *buf, struct rspamd_http_message *msg) +rspamd_controller_maybe_compress(struct rspamd_http_connection_entry *entry, + rspamd_fstring_t *buf, struct rspamd_http_message *msg) { if (entry->support_gzip) { - if (rspamd_fstring_gzip (&buf)) { - rspamd_http_message_add_header (msg, "Content-Encoding", "gzip"); + if (rspamd_fstring_gzip(&buf)) { + rspamd_http_message_add_header(msg, "Content-Encoding", "gzip"); } } return buf; } -void -rspamd_controller_send_error (struct rspamd_http_connection_entry *entry, - gint code, const gchar *error_msg, ...) +void rspamd_controller_send_error(struct rspamd_http_connection_entry *entry, + gint code, const gchar *error_msg, ...) { struct rspamd_http_message *msg; va_list args; rspamd_fstring_t *reply; - msg = rspamd_http_new_message (HTTP_RESPONSE); + msg = rspamd_http_new_message(HTTP_RESPONSE); - va_start (args, error_msg); - msg->status = rspamd_fstring_new (); - rspamd_vprintf_fstring (&msg->status, error_msg, args); - va_end (args); + va_start(args, error_msg); + msg->status = rspamd_fstring_new(); + rspamd_vprintf_fstring(&msg->status, error_msg, args); + va_end(args); - msg->date = time (NULL); + msg->date = time(NULL); msg->code = code; - reply = rspamd_fstring_sized_new (msg->status->len + 16); - rspamd_printf_fstring (&reply, "{\"error\":\"%V\"}", msg->status); - rspamd_http_message_set_body_from_fstring_steal (msg, - rspamd_controller_maybe_compress (entry, reply, msg)); - rspamd_http_connection_reset (entry->conn); - rspamd_http_router_insert_headers (entry->rt, msg); - rspamd_http_connection_write_message (entry->conn, - msg, - NULL, - "application/json", - entry, - entry->rt->timeout); + reply = rspamd_fstring_sized_new(msg->status->len + 16); + rspamd_printf_fstring(&reply, "{\"error\":\"%V\"}", msg->status); + rspamd_http_message_set_body_from_fstring_steal(msg, + rspamd_controller_maybe_compress(entry, reply, msg)); + rspamd_http_connection_reset(entry->conn); + rspamd_http_router_insert_headers(entry->rt, msg); + rspamd_http_connection_write_message(entry->conn, + msg, + NULL, + "application/json", + entry, + entry->rt->timeout); entry->is_reply = TRUE; } -void -rspamd_controller_send_openmetrics (struct rspamd_http_connection_entry *entry, - rspamd_fstring_t *str) +void rspamd_controller_send_openmetrics(struct rspamd_http_connection_entry *entry, + rspamd_fstring_t *str) { struct rspamd_http_message *msg; - msg = rspamd_http_new_message (HTTP_RESPONSE); - msg->date = time (NULL); + msg = rspamd_http_new_message(HTTP_RESPONSE); + msg->date = time(NULL); msg->code = 200; - msg->status = rspamd_fstring_new_init ("OK", 2); - - rspamd_http_message_set_body_from_fstring_steal (msg, - rspamd_controller_maybe_compress (entry, str, msg)); - rspamd_http_connection_reset (entry->conn); - rspamd_http_router_insert_headers (entry->rt, msg); - rspamd_http_connection_write_message (entry->conn, - msg, - NULL, - "application/openmetrics-text; version=1.0.0; charset=utf-8", - entry, - entry->rt->timeout); + msg->status = rspamd_fstring_new_init("OK", 2); + + rspamd_http_message_set_body_from_fstring_steal(msg, + rspamd_controller_maybe_compress(entry, str, msg)); + rspamd_http_connection_reset(entry->conn); + rspamd_http_router_insert_headers(entry->rt, msg); + rspamd_http_connection_write_message(entry->conn, + msg, + NULL, + "application/openmetrics-text; version=1.0.0; charset=utf-8", + entry, + entry->rt->timeout); entry->is_reply = TRUE; } -void -rspamd_controller_send_string (struct rspamd_http_connection_entry *entry, - const gchar *str) +void rspamd_controller_send_string(struct rspamd_http_connection_entry *entry, + const gchar *str) { struct rspamd_http_message *msg; rspamd_fstring_t *reply; - msg = rspamd_http_new_message (HTTP_RESPONSE); - msg->date = time (NULL); + msg = rspamd_http_new_message(HTTP_RESPONSE); + msg->date = time(NULL); msg->code = 200; - msg->status = rspamd_fstring_new_init ("OK", 2); + msg->status = rspamd_fstring_new_init("OK", 2); if (str) { - reply = rspamd_fstring_new_init (str, strlen (str)); + reply = rspamd_fstring_new_init(str, strlen(str)); } else { - reply = rspamd_fstring_new_init ("null", 4); - } - - rspamd_http_message_set_body_from_fstring_steal (msg, - rspamd_controller_maybe_compress (entry, reply, msg)); - rspamd_http_connection_reset (entry->conn); - rspamd_http_router_insert_headers (entry->rt, msg); - rspamd_http_connection_write_message (entry->conn, - msg, - NULL, - "application/json", - entry, - entry->rt->timeout); + reply = rspamd_fstring_new_init("null", 4); + } + + rspamd_http_message_set_body_from_fstring_steal(msg, + rspamd_controller_maybe_compress(entry, reply, msg)); + rspamd_http_connection_reset(entry->conn); + rspamd_http_router_insert_headers(entry->rt, msg); + rspamd_http_connection_write_message(entry->conn, + msg, + NULL, + "application/json", + entry, + entry->rt->timeout); entry->is_reply = TRUE; } -void -rspamd_controller_send_ucl (struct rspamd_http_connection_entry *entry, - ucl_object_t *obj) +void rspamd_controller_send_ucl(struct rspamd_http_connection_entry *entry, + ucl_object_t *obj) { struct rspamd_http_message *msg; rspamd_fstring_t *reply; - msg = rspamd_http_new_message (HTTP_RESPONSE); - msg->date = time (NULL); + msg = rspamd_http_new_message(HTTP_RESPONSE); + msg->date = time(NULL); msg->code = 200; - msg->status = rspamd_fstring_new_init ("OK", 2); - reply = rspamd_fstring_sized_new (BUFSIZ); - rspamd_ucl_emit_fstring (obj, UCL_EMIT_JSON_COMPACT, &reply); - rspamd_http_message_set_body_from_fstring_steal (msg, - rspamd_controller_maybe_compress (entry, reply, msg)); - rspamd_http_connection_reset (entry->conn); - rspamd_http_router_insert_headers (entry->rt, msg); - rspamd_http_connection_write_message (entry->conn, - msg, - NULL, - "application/json", - entry, - entry->rt->timeout); + msg->status = rspamd_fstring_new_init("OK", 2); + reply = rspamd_fstring_sized_new(BUFSIZ); + rspamd_ucl_emit_fstring(obj, UCL_EMIT_JSON_COMPACT, &reply); + rspamd_http_message_set_body_from_fstring_steal(msg, + rspamd_controller_maybe_compress(entry, reply, msg)); + rspamd_http_connection_reset(entry->conn); + rspamd_http_router_insert_headers(entry->rt, msg); + rspamd_http_connection_write_message(entry->conn, + msg, + NULL, + "application/json", + entry, + entry->rt->timeout); entry->is_reply = TRUE; } static void -rspamd_worker_drop_priv (struct rspamd_main *rspamd_main) +rspamd_worker_drop_priv(struct rspamd_main *rspamd_main) { if (rspamd_main->is_privileged) { - if (setgid (rspamd_main->workers_gid) == -1) { - msg_err_main ("cannot setgid to %d (%s), aborting", - (gint) rspamd_main->workers_gid, - strerror (errno)); - exit (-errno); + if (setgid(rspamd_main->workers_gid) == -1) { + msg_err_main("cannot setgid to %d (%s), aborting", + (gint) rspamd_main->workers_gid, + strerror(errno)); + exit(-errno); } if (rspamd_main->cfg->rspamd_user && - initgroups (rspamd_main->cfg->rspamd_user, - rspamd_main->workers_gid) == -1) { - msg_err_main ("initgroups failed (%s), aborting", strerror (errno)); - exit (-errno); + initgroups(rspamd_main->cfg->rspamd_user, + rspamd_main->workers_gid) == -1) { + msg_err_main("initgroups failed (%s), aborting", strerror(errno)); + exit(-errno); } - if (setuid (rspamd_main->workers_uid) == -1) { - msg_err_main ("cannot setuid to %d (%s), aborting", - (gint) rspamd_main->workers_uid, - strerror (errno)); - exit (-errno); + if (setuid(rspamd_main->workers_uid) == -1) { + msg_err_main("cannot setuid to %d (%s), aborting", + (gint) rspamd_main->workers_uid, + strerror(errno)); + exit(-errno); } } } static void -rspamd_worker_set_limits (struct rspamd_main *rspamd_main, - struct rspamd_worker_conf *cf) +rspamd_worker_set_limits(struct rspamd_main *rspamd_main, + struct rspamd_worker_conf *cf) { struct rlimit rlmt; @@ -720,47 +717,47 @@ rspamd_worker_set_limits (struct rspamd_main *rspamd_main, rlmt.rlim_cur = (rlim_t) cf->rlimit_nofile; rlmt.rlim_max = (rlim_t) cf->rlimit_nofile; - if (setrlimit (RLIMIT_NOFILE, &rlmt) == -1) { - msg_warn_main ("cannot set files rlimit: %L, %s", - cf->rlimit_nofile, - strerror (errno)); + if (setrlimit(RLIMIT_NOFILE, &rlmt) == -1) { + msg_warn_main("cannot set files rlimit: %L, %s", + cf->rlimit_nofile, + strerror(errno)); } - memset (&rlmt, 0, sizeof (rlmt)); + memset(&rlmt, 0, sizeof(rlmt)); - if (getrlimit (RLIMIT_NOFILE, &rlmt) == -1) { - msg_warn_main ("cannot get max files rlimit: %HL, %s", - cf->rlimit_maxcore, - strerror (errno)); + if (getrlimit(RLIMIT_NOFILE, &rlmt) == -1) { + msg_warn_main("cannot get max files rlimit: %HL, %s", + cf->rlimit_maxcore, + strerror(errno)); } else { - msg_info_main ("set max file descriptors limit: %HL cur and %HL max", - (guint64) rlmt.rlim_cur, - (guint64) rlmt.rlim_max); + msg_info_main("set max file descriptors limit: %HL cur and %HL max", + (guint64) rlmt.rlim_cur, + (guint64) rlmt.rlim_max); } } else { /* Just report */ - if (getrlimit (RLIMIT_NOFILE, &rlmt) == -1) { - msg_warn_main ("cannot get max files rlimit: %HL, %s", - cf->rlimit_maxcore, - strerror (errno)); + if (getrlimit(RLIMIT_NOFILE, &rlmt) == -1) { + msg_warn_main("cannot get max files rlimit: %HL, %s", + cf->rlimit_maxcore, + strerror(errno)); } else { - msg_info_main ("use system max file descriptors limit: %HL cur and %HL max", - (guint64) rlmt.rlim_cur, - (guint64) rlmt.rlim_max); + msg_info_main("use system max file descriptors limit: %HL cur and %HL max", + (guint64) rlmt.rlim_cur, + (guint64) rlmt.rlim_max); } } if (rspamd_main->cores_throttling) { - msg_info_main ("disable core files for the new worker as limits are reached"); + msg_info_main("disable core files for the new worker as limits are reached"); rlmt.rlim_cur = 0; rlmt.rlim_max = 0; - if (setrlimit (RLIMIT_CORE, &rlmt) == -1) { - msg_warn_main ("cannot disable core dumps: error when setting limits: %s", - strerror (errno)); + if (setrlimit(RLIMIT_CORE, &rlmt) == -1) { + msg_warn_main("cannot disable core dumps: error when setting limits: %s", + strerror(errno)); } } else { @@ -768,64 +765,64 @@ rspamd_worker_set_limits (struct rspamd_main *rspamd_main, rlmt.rlim_cur = (rlim_t) cf->rlimit_maxcore; rlmt.rlim_max = (rlim_t) cf->rlimit_maxcore; - if (setrlimit (RLIMIT_CORE, &rlmt) == -1) { - msg_warn_main ("cannot set max core size limit: %HL, %s", - cf->rlimit_maxcore, - strerror (errno)); + if (setrlimit(RLIMIT_CORE, &rlmt) == -1) { + msg_warn_main("cannot set max core size limit: %HL, %s", + cf->rlimit_maxcore, + strerror(errno)); } /* Ensure that we did it */ - memset (&rlmt, 0, sizeof (rlmt)); + memset(&rlmt, 0, sizeof(rlmt)); - if (getrlimit (RLIMIT_CORE, &rlmt) == -1) { - msg_warn_main ("cannot get max core size rlimit: %HL, %s", - cf->rlimit_maxcore, - strerror (errno)); + if (getrlimit(RLIMIT_CORE, &rlmt) == -1) { + msg_warn_main("cannot get max core size rlimit: %HL, %s", + cf->rlimit_maxcore, + strerror(errno)); } else { if (rlmt.rlim_cur != cf->rlimit_maxcore || rlmt.rlim_max != cf->rlimit_maxcore) { - msg_warn_main ("setting of core file limits was unsuccessful: " - "%HL was wanted, " - "but we have %HL cur and %HL max", - cf->rlimit_maxcore, - (guint64) rlmt.rlim_cur, - (guint64) rlmt.rlim_max); + msg_warn_main("setting of core file limits was unsuccessful: " + "%HL was wanted, " + "but we have %HL cur and %HL max", + cf->rlimit_maxcore, + (guint64) rlmt.rlim_cur, + (guint64) rlmt.rlim_max); } else { - msg_info_main ("set max core size limit: %HL cur and %HL max", - (guint64) rlmt.rlim_cur, - (guint64) rlmt.rlim_max); + msg_info_main("set max core size limit: %HL cur and %HL max", + (guint64) rlmt.rlim_cur, + (guint64) rlmt.rlim_max); } } } else { /* Just report */ - if (getrlimit (RLIMIT_CORE, &rlmt) == -1) { - msg_warn_main ("cannot get max core size limit: %HL, %s", - cf->rlimit_maxcore, - strerror (errno)); + if (getrlimit(RLIMIT_CORE, &rlmt) == -1) { + msg_warn_main("cannot get max core size limit: %HL, %s", + cf->rlimit_maxcore, + strerror(errno)); } else { - msg_info_main ("use system max core size limit: %HL cur and %HL max", - (guint64) rlmt.rlim_cur, - (guint64) rlmt.rlim_max); + msg_info_main("use system max core size limit: %HL cur and %HL max", + (guint64) rlmt.rlim_cur, + (guint64) rlmt.rlim_max); } } } } static void -rspamd_worker_on_term (EV_P_ ev_child *w, int revents) +rspamd_worker_on_term(EV_P_ ev_child *w, int revents) { - struct rspamd_worker *wrk = (struct rspamd_worker *)w->data; + struct rspamd_worker *wrk = (struct rspamd_worker *) w->data; - if (wrk->ppid == getpid ()) { + if (wrk->ppid == getpid()) { if (wrk->term_handler) { - wrk->term_handler (EV_A_ w, wrk->srv, wrk); + wrk->term_handler(EV_A_ w, wrk->srv, wrk); } else { - rspamd_check_termination_clause (wrk->srv, wrk, w->rstatus); + rspamd_check_termination_clause(wrk->srv, wrk, w->rstatus); } } else { @@ -834,30 +831,30 @@ rspamd_worker_on_term (EV_P_ ev_child *w, int revents) } static void -rspamd_worker_heartbeat_cb (EV_P_ ev_timer *w, int revents) +rspamd_worker_heartbeat_cb(EV_P_ ev_timer *w, int revents) { - struct rspamd_worker *wrk = (struct rspamd_worker *)w->data; + struct rspamd_worker *wrk = (struct rspamd_worker *) w->data; struct rspamd_srv_command cmd; - memset (&cmd, 0, sizeof (cmd)); + memset(&cmd, 0, sizeof(cmd)); cmd.type = RSPAMD_SRV_HEARTBEAT; - rspamd_srv_send_command (wrk, EV_A, &cmd, -1, NULL, NULL); + rspamd_srv_send_command(wrk, EV_A, &cmd, -1, NULL, NULL); } static void -rspamd_worker_heartbeat_start (struct rspamd_worker *wrk, struct ev_loop *event_loop) +rspamd_worker_heartbeat_start(struct rspamd_worker *wrk, struct ev_loop *event_loop) { - wrk->hb.heartbeat_ev.data = (void *)wrk; - ev_timer_init (&wrk->hb.heartbeat_ev, rspamd_worker_heartbeat_cb, - 0.0, wrk->srv->cfg->heartbeat_interval); - ev_timer_start (event_loop, &wrk->hb.heartbeat_ev); + wrk->hb.heartbeat_ev.data = (void *) wrk; + ev_timer_init(&wrk->hb.heartbeat_ev, rspamd_worker_heartbeat_cb, + 0.0, wrk->srv->cfg->heartbeat_interval); + ev_timer_start(event_loop, &wrk->hb.heartbeat_ev); } static void -rspamd_main_heartbeat_cb (EV_P_ ev_timer *w, int revents) +rspamd_main_heartbeat_cb(EV_P_ ev_timer *w, int revents) { - struct rspamd_worker *wrk = (struct rspamd_worker *)w->data; - gdouble time_from_last = ev_time (); + struct rspamd_worker *wrk = (struct rspamd_worker *) w->data; + gdouble time_from_last = ev_time(); struct rspamd_main *rspamd_main; static struct rspamd_control_command cmd; struct tm tm; @@ -872,105 +869,104 @@ rspamd_main_heartbeat_cb (EV_P_ ev_timer *w, int revents) time_from_last > 0 && time_from_last >= rspamd_main->cfg->heartbeat_interval * 2) { - rspamd_localtime (wrk->hb.last_event, &tm); - r = strftime (timebuf, sizeof (timebuf), "%F %H:%M:%S", &tm); - rspamd_snprintf (usec_buf, sizeof (usec_buf), "%.5f", - wrk->hb.last_event - (gdouble)(time_t)wrk->hb.last_event); - rspamd_snprintf (timebuf + r, sizeof (timebuf) - r, - "%s", usec_buf + 1); + rspamd_localtime(wrk->hb.last_event, &tm); + r = strftime(timebuf, sizeof(timebuf), "%F %H:%M:%S", &tm); + rspamd_snprintf(usec_buf, sizeof(usec_buf), "%.5f", + wrk->hb.last_event - (gdouble) (time_t) wrk->hb.last_event); + rspamd_snprintf(timebuf + r, sizeof(timebuf) - r, + "%s", usec_buf + 1); if (wrk->hb.nbeats > 0) { /* First time lost event */ cmd.type = RSPAMD_CONTROL_CHILD_CHANGE; cmd.cmd.child_change.what = rspamd_child_offline; cmd.cmd.child_change.pid = wrk->pid; - rspamd_control_broadcast_srv_cmd (rspamd_main, &cmd, wrk->pid); - msg_warn_main ("lost heartbeat from worker type %s with pid %P, " - "last beat on: %s (%L beats received previously)", - g_quark_to_string (wrk->type), wrk->pid, - timebuf, - wrk->hb.nbeats); + rspamd_control_broadcast_srv_cmd(rspamd_main, &cmd, wrk->pid); + msg_warn_main("lost heartbeat from worker type %s with pid %P, " + "last beat on: %s (%L beats received previously)", + g_quark_to_string(wrk->type), wrk->pid, + timebuf, + wrk->hb.nbeats); wrk->hb.nbeats = -1; /* TODO: send notify about worker problem */ } else { - wrk->hb.nbeats --; - msg_warn_main ("lost %L heartbeat from worker type %s with pid %P, " - "last beat on: %s", - -(wrk->hb.nbeats), - g_quark_to_string (wrk->type), - wrk->pid, - timebuf); + wrk->hb.nbeats--; + msg_warn_main("lost %L heartbeat from worker type %s with pid %P, " + "last beat on: %s", + -(wrk->hb.nbeats), + g_quark_to_string(wrk->type), + wrk->pid, + timebuf); if (rspamd_main->cfg->heartbeats_loss_max > 0 && -(wrk->hb.nbeats) >= rspamd_main->cfg->heartbeats_loss_max) { if (-(wrk->hb.nbeats) > rspamd_main->cfg->heartbeats_loss_max + 1) { - msg_err_main ("force kill worker type %s with pid %P, " - "last beat on: %s; %L heartbeat lost", - g_quark_to_string (wrk->type), - wrk->pid, - timebuf, - -(wrk->hb.nbeats)); - kill (wrk->pid, SIGKILL); + msg_err_main("force kill worker type %s with pid %P, " + "last beat on: %s; %L heartbeat lost", + g_quark_to_string(wrk->type), + wrk->pid, + timebuf, + -(wrk->hb.nbeats)); + kill(wrk->pid, SIGKILL); } else { - msg_err_main ("terminate worker type %s with pid %P, " - "last beat on: %s; %L heartbeat lost", - g_quark_to_string (wrk->type), - wrk->pid, - timebuf, - -(wrk->hb.nbeats)); - kill (wrk->pid, SIGTERM); + msg_err_main("terminate worker type %s with pid %P, " + "last beat on: %s; %L heartbeat lost", + g_quark_to_string(wrk->type), + wrk->pid, + timebuf, + -(wrk->hb.nbeats)); + kill(wrk->pid, SIGTERM); } - } } } else if (wrk->hb.nbeats < 0) { - rspamd_localtime (wrk->hb.last_event, &tm); - r = strftime (timebuf, sizeof (timebuf), "%F %H:%M:%S", &tm); - rspamd_snprintf (usec_buf, sizeof (usec_buf), "%.5f", - wrk->hb.last_event - (gdouble)(time_t)wrk->hb.last_event); - rspamd_snprintf (timebuf + r, sizeof (timebuf) - r, - "%s", usec_buf + 1); + rspamd_localtime(wrk->hb.last_event, &tm); + r = strftime(timebuf, sizeof(timebuf), "%F %H:%M:%S", &tm); + rspamd_snprintf(usec_buf, sizeof(usec_buf), "%.5f", + wrk->hb.last_event - (gdouble) (time_t) wrk->hb.last_event); + rspamd_snprintf(timebuf + r, sizeof(timebuf) - r, + "%s", usec_buf + 1); cmd.type = RSPAMD_CONTROL_CHILD_CHANGE; cmd.cmd.child_change.what = rspamd_child_online; cmd.cmd.child_change.pid = wrk->pid; - rspamd_control_broadcast_srv_cmd (rspamd_main, &cmd, wrk->pid); - msg_info_main ("received heartbeat from worker type %s with pid %P, " - "last beat on: %s (%L beats lost previously)", - g_quark_to_string (wrk->type), wrk->pid, - timebuf, - -(wrk->hb.nbeats)); + rspamd_control_broadcast_srv_cmd(rspamd_main, &cmd, wrk->pid); + msg_info_main("received heartbeat from worker type %s with pid %P, " + "last beat on: %s (%L beats lost previously)", + g_quark_to_string(wrk->type), wrk->pid, + timebuf, + -(wrk->hb.nbeats)); wrk->hb.nbeats = 1; /* TODO: send notify about worker restoration */ } } static void -rspamd_main_heartbeat_start (struct rspamd_worker *wrk, struct ev_loop *event_loop) +rspamd_main_heartbeat_start(struct rspamd_worker *wrk, struct ev_loop *event_loop) { - wrk->hb.heartbeat_ev.data = (void *)wrk; - ev_timer_init (&wrk->hb.heartbeat_ev, rspamd_main_heartbeat_cb, - 0.0, wrk->srv->cfg->heartbeat_interval * 2); - ev_timer_start (event_loop, &wrk->hb.heartbeat_ev); + wrk->hb.heartbeat_ev.data = (void *) wrk; + ev_timer_init(&wrk->hb.heartbeat_ev, rspamd_main_heartbeat_cb, + 0.0, wrk->srv->cfg->heartbeat_interval * 2); + ev_timer_start(event_loop, &wrk->hb.heartbeat_ev); } static bool -rspamd_maybe_reuseport_socket (struct rspamd_worker_listen_socket *ls) +rspamd_maybe_reuseport_socket(struct rspamd_worker_listen_socket *ls) { if (ls->is_systemd) { /* No need to reuseport */ return true; } - if (ls->fd != -1 && rspamd_inet_address_get_af (ls->addr) == AF_UNIX) { + if (ls->fd != -1 && rspamd_inet_address_get_af(ls->addr) == AF_UNIX) { /* Just try listen */ - if (listen (ls->fd, -1) == -1) { + if (listen(ls->fd, -1) == -1) { return false; } @@ -981,19 +977,19 @@ rspamd_maybe_reuseport_socket (struct rspamd_worker_listen_socket *ls) gint nfd = -1; if (ls->type == RSPAMD_WORKER_SOCKET_UDP) { - nfd = rspamd_inet_address_listen (ls->addr, - (ls->type == RSPAMD_WORKER_SOCKET_UDP ? SOCK_DGRAM : SOCK_STREAM), - RSPAMD_INET_ADDRESS_LISTEN_ASYNC|RSPAMD_INET_ADDRESS_LISTEN_REUSEPORT, - -1); + nfd = rspamd_inet_address_listen(ls->addr, + (ls->type == RSPAMD_WORKER_SOCKET_UDP ? SOCK_DGRAM : SOCK_STREAM), + RSPAMD_INET_ADDRESS_LISTEN_ASYNC | RSPAMD_INET_ADDRESS_LISTEN_REUSEPORT, + -1); if (nfd == -1) { - msg_warn ("cannot create reuseport listen socket for %d: %s", - ls->fd, strerror (errno)); + msg_warn("cannot create reuseport listen socket for %d: %s", + ls->fd, strerror(errno)); nfd = ls->fd; } else { if (ls->fd != -1) { - close (ls->fd); + close(ls->fd); } ls->fd = nfd; nfd = -1; @@ -1031,28 +1027,28 @@ rspamd_maybe_reuseport_socket (struct rspamd_worker_listen_socket *ls) * @param listen_sockets */ static void __attribute__((noreturn)) -rspamd_handle_child_fork (struct rspamd_worker *wrk, - struct rspamd_main *rspamd_main, - struct rspamd_worker_conf *cf, - GHashTable *listen_sockets) +rspamd_handle_child_fork(struct rspamd_worker *wrk, + struct rspamd_main *rspamd_main, + struct rspamd_worker_conf *cf, + GHashTable *listen_sockets) { gint rc; struct rlimit rlim; /* Update pid for logging */ - rspamd_log_on_fork (cf->type, rspamd_main->cfg, rspamd_main->logger); - wrk->pid = getpid (); + rspamd_log_on_fork(cf->type, rspamd_main->cfg, rspamd_main->logger); + wrk->pid = getpid(); /* Init PRNG after fork */ - rc = ottery_init (rspamd_main->cfg->libs_ctx->ottery_cfg); + rc = ottery_init(rspamd_main->cfg->libs_ctx->ottery_cfg); if (rc != OTTERY_ERR_NONE) { - msg_err_main ("cannot initialize PRNG: %d", rc); - abort (); + msg_err_main("cannot initialize PRNG: %d", rc); + abort(); } - rspamd_random_seed_fast (); + rspamd_random_seed_fast(); #ifdef HAVE_EVUTIL_RNG_INIT - evutil_secure_rng_init (); + evutil_secure_rng_init(); #endif /* @@ -1060,12 +1056,12 @@ rspamd_handle_child_fork (struct rspamd_worker *wrk, * previous handlers must be explicitly detached and forgotten * before starting a new loop */ - ev_signal_stop (rspamd_main->event_loop, &rspamd_main->int_ev); - ev_signal_stop (rspamd_main->event_loop, &rspamd_main->term_ev); - ev_signal_stop (rspamd_main->event_loop, &rspamd_main->hup_ev); - ev_signal_stop (rspamd_main->event_loop, &rspamd_main->usr1_ev); + ev_signal_stop(rspamd_main->event_loop, &rspamd_main->int_ev); + ev_signal_stop(rspamd_main->event_loop, &rspamd_main->term_ev); + ev_signal_stop(rspamd_main->event_loop, &rspamd_main->hup_ev); + ev_signal_stop(rspamd_main->event_loop, &rspamd_main->usr1_ev); /* Remove the inherited event base */ - ev_loop_destroy (rspamd_main->event_loop); + ev_loop_destroy(rspamd_main->event_loop); rspamd_main->event_loop = NULL; /* Close unused sockets */ @@ -1073,32 +1069,32 @@ rspamd_handle_child_fork (struct rspamd_worker *wrk, gpointer k, v; - g_hash_table_iter_init (&it, listen_sockets); + g_hash_table_iter_init(&it, listen_sockets); /* * Close listen sockets of not our process (inherited from other forks) */ - while (g_hash_table_iter_next (&it, &k, &v)) { - GList *elt = (GList *)v; + while (g_hash_table_iter_next(&it, &k, &v)) { + GList *elt = (GList *) v; GList *our = cf->listen_socks; - if (g_list_position (our, elt) == -1) { + if (g_list_position(our, elt) == -1) { GList *cur = elt; while (cur) { struct rspamd_worker_listen_socket *ls = - (struct rspamd_worker_listen_socket *)cur->data; + (struct rspamd_worker_listen_socket *) cur->data; - if (ls->fd != -1 && close (ls->fd) == -1) { - msg_err ("cannot close fd %d (addr = %s): %s", + if (ls->fd != -1 && close(ls->fd) == -1) { + msg_err("cannot close fd %d (addr = %s): %s", ls->fd, - rspamd_inet_address_to_string_pretty (ls->addr), - strerror (errno)); + rspamd_inet_address_to_string_pretty(ls->addr), + strerror(errno)); } ls->fd = -1; - cur = g_list_next (cur); + cur = g_list_next(cur); } } } @@ -1108,97 +1104,98 @@ rspamd_handle_child_fork (struct rspamd_worker *wrk, while (cur) { struct rspamd_worker_listen_socket *ls = - (struct rspamd_worker_listen_socket *)cur->data; + (struct rspamd_worker_listen_socket *) cur->data; - if (!rspamd_maybe_reuseport_socket (ls)) { - msg_err ("cannot listen on socket %s: %s", - rspamd_inet_address_to_string_pretty (ls->addr), - strerror (errno)); + if (!rspamd_maybe_reuseport_socket(ls)) { + msg_err("cannot listen on socket %s: %s", + rspamd_inet_address_to_string_pretty(ls->addr), + strerror(errno)); } - cur = g_list_next (cur); + cur = g_list_next(cur); } /* Drop privileges */ - rspamd_worker_drop_priv (rspamd_main); + rspamd_worker_drop_priv(rspamd_main); /* Set limits */ - rspamd_worker_set_limits (rspamd_main, cf); + rspamd_worker_set_limits(rspamd_main, cf); /* Re-set stack limit */ - getrlimit (RLIMIT_STACK, &rlim); + getrlimit(RLIMIT_STACK, &rlim); rlim.rlim_cur = 100 * 1024 * 1024; rlim.rlim_max = rlim.rlim_cur; - setrlimit (RLIMIT_STACK, &rlim); + setrlimit(RLIMIT_STACK, &rlim); if (cf->bind_conf) { - setproctitle ("%s process (%s)", cf->worker->name, - cf->bind_conf->bind_line); + setproctitle("%s process (%s)", cf->worker->name, + cf->bind_conf->bind_line); } else { - setproctitle ("%s process", cf->worker->name); + setproctitle("%s process", cf->worker->name); } if (rspamd_main->pfh) { - rspamd_pidfile_close (rspamd_main->pfh); + rspamd_pidfile_close(rspamd_main->pfh); } if (rspamd_main->cfg->log_silent_workers) { - rspamd_log_set_log_level (rspamd_main->logger, G_LOG_LEVEL_MESSAGE); + rspamd_log_set_log_level(rspamd_main->logger, G_LOG_LEVEL_MESSAGE); } - wrk->start_time = rspamd_get_calendar_ticks (); + wrk->start_time = rspamd_get_calendar_ticks(); if (cf->bind_conf) { - GString *listen_conf_stringified = g_string_new (NULL); + GString *listen_conf_stringified = g_string_new(NULL); struct rspamd_worker_bind_conf *cur_conf; - LL_FOREACH (cf->bind_conf, cur_conf) { + LL_FOREACH(cf->bind_conf, cur_conf) + { if (cur_conf->next) { - rspamd_printf_gstring (listen_conf_stringified, "%s, ", - cur_conf->bind_line); + rspamd_printf_gstring(listen_conf_stringified, "%s, ", + cur_conf->bind_line); } else { - rspamd_printf_gstring (listen_conf_stringified, "%s", - cur_conf->bind_line); + rspamd_printf_gstring(listen_conf_stringified, "%s", + cur_conf->bind_line); } } - msg_info_main ("starting %s process %P (%d); listen on: %v", - cf->worker->name, - getpid (), wrk->index, listen_conf_stringified); - g_string_free (listen_conf_stringified, TRUE); + msg_info_main("starting %s process %P (%d); listen on: %v", + cf->worker->name, + getpid(), wrk->index, listen_conf_stringified); + g_string_free(listen_conf_stringified, TRUE); } else { - msg_info_main ("starting %s process %P (%d); no listen", - cf->worker->name, - getpid (), wrk->index); + msg_info_main("starting %s process %P (%d); no listen", + cf->worker->name, + getpid(), wrk->index); } /* Close parent part of socketpair */ - close (wrk->control_pipe[0]); - close (wrk->srv_pipe[0]); + close(wrk->control_pipe[0]); + close(wrk->srv_pipe[0]); /* * Read comments in `rspamd_handle_main_fork` for details why these channel * is blocking. */ - rspamd_socket_nonblocking (wrk->control_pipe[1]); + rspamd_socket_nonblocking(wrk->control_pipe[1]); #if 0 rspamd_socket_nonblocking (wrk->srv_pipe[1]); #endif rspamd_main->cfg->cur_worker = wrk; /* Execute worker (this function should not return normally!) */ - cf->worker->worker_start_func (wrk); + cf->worker->worker_start_func(wrk); /* To distinguish from normal termination */ - exit (EXIT_FAILURE); + exit(EXIT_FAILURE); } static void -rspamd_handle_main_fork (struct rspamd_worker *wrk, - struct rspamd_main *rspamd_main, - struct rspamd_worker_conf *cf, - struct ev_loop *ev_base) +rspamd_handle_main_fork(struct rspamd_worker *wrk, + struct rspamd_main *rspamd_main, + struct rspamd_worker_conf *cf, + struct ev_loop *ev_base) { /* Close worker part of socketpair */ - close (wrk->control_pipe[1]); - close (wrk->srv_pipe[1]); + close(wrk->control_pipe[1]); + close(wrk->srv_pipe[1]); /* * There are no reasons why control pipes are blocking: the messages @@ -1214,18 +1211,18 @@ rspamd_handle_main_fork (struct rspamd_worker *wrk, #if 0 rspamd_socket_nonblocking (wrk->srv_pipe[0]); #endif - rspamd_socket_nonblocking (wrk->control_pipe[0]); + rspamd_socket_nonblocking(wrk->control_pipe[0]); - rspamd_srv_start_watching (rspamd_main, wrk, ev_base); + rspamd_srv_start_watching(rspamd_main, wrk, ev_base); /* Child event */ wrk->cld_ev.data = wrk; - ev_child_init (&wrk->cld_ev, rspamd_worker_on_term, wrk->pid, 0); - ev_child_start (rspamd_main->event_loop, &wrk->cld_ev); + ev_child_init(&wrk->cld_ev, rspamd_worker_on_term, wrk->pid, 0); + ev_child_start(rspamd_main->event_loop, &wrk->cld_ev); /* Heartbeats */ - rspamd_main_heartbeat_start (wrk, rspamd_main->event_loop); + rspamd_main_heartbeat_start(wrk, rspamd_main->event_loop); /* Insert worker into worker's table, pid is index */ - g_hash_table_insert (rspamd_main->workers, - GSIZE_TO_POINTER (wrk->pid), wrk); + g_hash_table_insert(rspamd_main->workers, + GSIZE_TO_POINTER(wrk->pid), wrk); #if defined(SO_REUSEPORT) && defined(SO_REUSEADDR) && defined(LINUX) /* @@ -1236,14 +1233,14 @@ rspamd_handle_main_fork (struct rspamd_worker *wrk, while (cur) { struct rspamd_worker_listen_socket *ls = - (struct rspamd_worker_listen_socket *)cur->data; + (struct rspamd_worker_listen_socket *) cur->data; if (ls->fd != -1 && ls->type == RSPAMD_WORKER_SOCKET_UDP) { - close (ls->fd); + close(ls->fd); ls->fd = -1; } - cur = g_list_next (cur); + cur = g_list_next(cur); } #endif } @@ -1252,105 +1249,102 @@ rspamd_handle_main_fork (struct rspamd_worker *wrk, #define SOCK_SEQPACKET SOCK_DGRAM #endif struct rspamd_worker * -rspamd_fork_worker (struct rspamd_main *rspamd_main, - struct rspamd_worker_conf *cf, - guint index, - struct ev_loop *ev_base, - rspamd_worker_term_cb term_handler, - GHashTable *listen_sockets) +rspamd_fork_worker(struct rspamd_main *rspamd_main, + struct rspamd_worker_conf *cf, + guint index, + struct ev_loop *ev_base, + rspamd_worker_term_cb term_handler, + GHashTable *listen_sockets) { struct rspamd_worker *wrk; /* Starting worker process */ - wrk = (struct rspamd_worker *) g_malloc0 (sizeof (struct rspamd_worker)); + wrk = (struct rspamd_worker *) g_malloc0(sizeof(struct rspamd_worker)); - if (!rspamd_socketpair (wrk->control_pipe, SOCK_SEQPACKET)) { - msg_err ("socketpair failure: %s", strerror (errno)); - rspamd_hard_terminate (rspamd_main); + if (!rspamd_socketpair(wrk->control_pipe, SOCK_SEQPACKET)) { + msg_err("socketpair failure: %s", strerror(errno)); + rspamd_hard_terminate(rspamd_main); } - if (!rspamd_socketpair (wrk->srv_pipe, SOCK_SEQPACKET)) { - msg_err ("socketpair failure: %s", strerror (errno)); - rspamd_hard_terminate (rspamd_main); + if (!rspamd_socketpair(wrk->srv_pipe, SOCK_SEQPACKET)) { + msg_err("socketpair failure: %s", strerror(errno)); + rspamd_hard_terminate(rspamd_main); } if (cf->bind_conf) { - msg_info_main ("prepare to fork process %s (%d); listen on: %s", - cf->worker->name, - index, cf->bind_conf->name); + msg_info_main("prepare to fork process %s (%d); listen on: %s", + cf->worker->name, + index, cf->bind_conf->name); } else { - msg_info_main ("prepare to fork process %s (%d), no bind socket", - cf->worker->name, - index); + msg_info_main("prepare to fork process %s (%d), no bind socket", + cf->worker->name, + index); } wrk->srv = rspamd_main; wrk->type = cf->type; wrk->cf = cf; wrk->flags = cf->worker->flags; - REF_RETAIN (cf); + REF_RETAIN(cf); wrk->index = index; wrk->ctx = cf->ctx; - wrk->ppid = getpid (); - wrk->pid = fork (); + wrk->ppid = getpid(); + wrk->pid = fork(); wrk->cores_throttled = rspamd_main->cores_throttling; wrk->term_handler = term_handler; - wrk->control_events_pending = g_hash_table_new_full (g_direct_hash, g_direct_equal, - NULL, rspamd_pending_control_free); + wrk->control_events_pending = g_hash_table_new_full(g_direct_hash, g_direct_equal, + NULL, rspamd_pending_control_free); switch (wrk->pid) { case 0: rspamd_current_worker = wrk; - rspamd_handle_child_fork (wrk, rspamd_main, cf, listen_sockets); + rspamd_handle_child_fork(wrk, rspamd_main, cf, listen_sockets); break; case -1: - msg_err_main ("cannot fork main process: %s", strerror (errno)); + msg_err_main("cannot fork main process: %s", strerror(errno)); if (rspamd_main->pfh) { - rspamd_pidfile_remove (rspamd_main->pfh); + rspamd_pidfile_remove(rspamd_main->pfh); } - rspamd_hard_terminate (rspamd_main); + rspamd_hard_terminate(rspamd_main); break; default: - rspamd_handle_main_fork (wrk, rspamd_main, cf, ev_base); + rspamd_handle_main_fork(wrk, rspamd_main, cf, ev_base); break; } return wrk; } -void -rspamd_worker_block_signals (void) +void rspamd_worker_block_signals(void) { sigset_t set; - sigemptyset (&set); - sigaddset (&set, SIGTERM); - sigaddset (&set, SIGINT); - sigaddset (&set, SIGHUP); - sigaddset (&set, SIGUSR1); - sigaddset (&set, SIGUSR2); - sigprocmask (SIG_BLOCK, &set, NULL); + sigemptyset(&set); + sigaddset(&set, SIGTERM); + sigaddset(&set, SIGINT); + sigaddset(&set, SIGHUP); + sigaddset(&set, SIGUSR1); + sigaddset(&set, SIGUSR2); + sigprocmask(SIG_BLOCK, &set, NULL); } -void -rspamd_worker_unblock_signals (void) +void rspamd_worker_unblock_signals(void) { sigset_t set; - sigemptyset (&set); - sigaddset (&set, SIGTERM); - sigaddset (&set, SIGINT); - sigaddset (&set, SIGHUP); - sigaddset (&set, SIGUSR1); - sigaddset (&set, SIGUSR2); - sigprocmask (SIG_UNBLOCK, &set, NULL); + sigemptyset(&set); + sigaddset(&set, SIGTERM); + sigaddset(&set, SIGINT); + sigaddset(&set, SIGHUP); + sigaddset(&set, SIGUSR1); + sigaddset(&set, SIGUSR2); + sigprocmask(SIG_UNBLOCK, &set, NULL); } -void -rspamd_hard_terminate (struct rspamd_main *rspamd_main) +void rspamd_hard_terminate(struct rspamd_main *rspamd_main) { GHashTableIter it; gpointer k, v; @@ -1358,34 +1352,35 @@ rspamd_hard_terminate (struct rspamd_main *rspamd_main) sigset_t set; /* Block all signals */ - sigemptyset (&set); - sigaddset (&set, SIGTERM); - sigaddset (&set, SIGINT); - sigaddset (&set, SIGHUP); - sigaddset (&set, SIGUSR1); - sigaddset (&set, SIGUSR2); - sigaddset (&set, SIGCHLD); - sigprocmask (SIG_BLOCK, &set, NULL); + sigemptyset(&set); + sigaddset(&set, SIGTERM); + sigaddset(&set, SIGINT); + sigaddset(&set, SIGHUP); + sigaddset(&set, SIGUSR1); + sigaddset(&set, SIGUSR2); + sigaddset(&set, SIGCHLD); + sigprocmask(SIG_BLOCK, &set, NULL); /* We need to terminate all workers that might be already spawned */ - rspamd_worker_block_signals (); - g_hash_table_iter_init (&it, rspamd_main->workers); + rspamd_worker_block_signals(); + g_hash_table_iter_init(&it, rspamd_main->workers); - while (g_hash_table_iter_next (&it, &k, &v)) { + while (g_hash_table_iter_next(&it, &k, &v)) { w = v; - msg_err_main ("kill worker %P as Rspamd is terminating due to " - "an unrecoverable error", w->pid); - kill (w->pid, SIGKILL); + msg_err_main("kill worker %P as Rspamd is terminating due to " + "an unrecoverable error", + w->pid); + kill(w->pid, SIGKILL); } - msg_err_main ("shutting down Rspamd due to fatal error"); + msg_err_main("shutting down Rspamd due to fatal error"); - rspamd_log_close (rspamd_main->logger); - exit (EXIT_FAILURE); + rspamd_log_close(rspamd_main->logger); + exit(EXIT_FAILURE); } gboolean -rspamd_worker_is_scanner (struct rspamd_worker *w) +rspamd_worker_is_scanner(struct rspamd_worker *w) { if (w) { @@ -1396,7 +1391,7 @@ rspamd_worker_is_scanner (struct rspamd_worker *w) } gboolean -rspamd_worker_is_primary_controller (struct rspamd_worker *w) +rspamd_worker_is_primary_controller(struct rspamd_worker *w) { if (w) { @@ -1407,7 +1402,7 @@ rspamd_worker_is_primary_controller (struct rspamd_worker *w) } gboolean -rspamd_worker_check_controller_presence (struct rspamd_worker *w) +rspamd_worker_check_controller_presence(struct rspamd_worker *w) { if (w->index == 0) { GQuark our_type = w->type; @@ -1426,7 +1421,7 @@ rspamd_worker_check_controller_presence (struct rspamd_worker *w) our_priority = high_priority_worker; } else { - msg_err ("function is called for a wrong worker type: %s", g_quark_to_string(our_type)); + msg_err("function is called for a wrong worker type: %s", g_quark_to_string(our_type)); return FALSE; } @@ -1435,7 +1430,7 @@ rspamd_worker_check_controller_presence (struct rspamd_worker *w) while (cur) { struct rspamd_worker_conf *cf; - cf = (struct rspamd_worker_conf *)cur->data; + cf = (struct rspamd_worker_conf *) cur->data; if (our_priority == low_priority_worker) { if ((cf->type == g_quark_from_static_string("controller")) || @@ -1456,12 +1451,12 @@ rspamd_worker_check_controller_presence (struct rspamd_worker *w) } } - cur = g_list_next (cur); + cur = g_list_next(cur); } if (!controller_seen) { - msg_info ("no controller or normal workers defined, execute " - "controller periodics in this worker"); + msg_info("no controller or normal workers defined, execute " + "controller periodics in this worker"); w->flags |= RSPAMD_WORKER_CONTROLLER; return TRUE; } @@ -1485,20 +1480,20 @@ struct rspamd_worker_session_cache { }; static gint -rspamd_session_cache_sort_cmp (gconstpointer pa, gconstpointer pb) +rspamd_session_cache_sort_cmp(gconstpointer pa, gconstpointer pb) { const struct rspamd_worker_session_elt - *e1 = *(const struct rspamd_worker_session_elt **)pa, - *e2 = *(const struct rspamd_worker_session_elt **)pb; + *e1 = *(const struct rspamd_worker_session_elt **) pa, + *e2 = *(const struct rspamd_worker_session_elt **) pb; return e2->when < e1->when; } static void -rspamd_sessions_cache_periodic (EV_P_ ev_timer *w, int revents) +rspamd_sessions_cache_periodic(EV_P_ ev_timer *w, int revents) { struct rspamd_worker_session_cache *c = - (struct rspamd_worker_session_cache *)w->data; + (struct rspamd_worker_session_cache *) w->data; GHashTableIter it; gchar timebuf[32]; gpointer k, v; @@ -1507,83 +1502,82 @@ rspamd_sessions_cache_periodic (EV_P_ ev_timer *w, int revents) GPtrArray *res; guint i; - if (g_hash_table_size (c->cache) > c->cfg->max_sessions_cache) { - res = g_ptr_array_sized_new (g_hash_table_size (c->cache)); - g_hash_table_iter_init (&it, c->cache); + if (g_hash_table_size(c->cache) > c->cfg->max_sessions_cache) { + res = g_ptr_array_sized_new(g_hash_table_size(c->cache)); + g_hash_table_iter_init(&it, c->cache); - while (g_hash_table_iter_next (&it, &k, &v)) { - g_ptr_array_add (res, v); + while (g_hash_table_iter_next(&it, &k, &v)) { + g_ptr_array_add(res, v); } - msg_err ("sessions cache is overflowed %d elements where %d is limit", - (gint)res->len, (gint)c->cfg->max_sessions_cache); - g_ptr_array_sort (res, rspamd_session_cache_sort_cmp); + msg_err("sessions cache is overflowed %d elements where %d is limit", + (gint) res->len, (gint) c->cfg->max_sessions_cache); + g_ptr_array_sort(res, rspamd_session_cache_sort_cmp); - PTR_ARRAY_FOREACH (res, i, elt) { - rspamd_localtime (elt->when, &tms); - strftime (timebuf, sizeof (timebuf), "%F %H:%M:%S", &tms); + PTR_ARRAY_FOREACH(res, i, elt) + { + rspamd_localtime(elt->when, &tms); + strftime(timebuf, sizeof(timebuf), "%F %H:%M:%S", &tms); - msg_warn ("redundant session; ptr: %p, " - "tag: %s, refcount: %d, time: %s", - elt->ptr, elt->tag ? elt->tag : "unknown", - elt->pref ? *elt->pref : 0, - timebuf); + msg_warn("redundant session; ptr: %p, " + "tag: %s, refcount: %d, time: %s", + elt->ptr, elt->tag ? elt->tag : "unknown", + elt->pref ? *elt->pref : 0, + timebuf); } } - ev_timer_again (EV_A_ w); + ev_timer_again(EV_A_ w); } void * -rspamd_worker_session_cache_new (struct rspamd_worker *w, - struct ev_loop *ev_base) +rspamd_worker_session_cache_new(struct rspamd_worker *w, + struct ev_loop *ev_base) { struct rspamd_worker_session_cache *c; static const gdouble periodic_interval = 60.0; - c = g_malloc0 (sizeof (*c)); + c = g_malloc0(sizeof(*c)); c->ev_base = ev_base; - c->cache = g_hash_table_new_full (g_direct_hash, g_direct_equal, - NULL, g_free); + c->cache = g_hash_table_new_full(g_direct_hash, g_direct_equal, + NULL, g_free); c->cfg = w->srv->cfg; c->periodic.data = c; - ev_timer_init (&c->periodic, rspamd_sessions_cache_periodic, periodic_interval, - periodic_interval); - ev_timer_start (ev_base, &c->periodic); + ev_timer_init(&c->periodic, rspamd_sessions_cache_periodic, periodic_interval, + periodic_interval); + ev_timer_start(ev_base, &c->periodic); return c; } -void -rspamd_worker_session_cache_add (void *cache, const gchar *tag, - guint *pref, void *ptr) +void rspamd_worker_session_cache_add(void *cache, const gchar *tag, + guint *pref, void *ptr) { struct rspamd_worker_session_cache *c = cache; struct rspamd_worker_session_elt *elt; - elt = g_malloc0 (sizeof (*elt)); + elt = g_malloc0(sizeof(*elt)); elt->pref = pref; elt->ptr = ptr; elt->tag = tag; - elt->when = time (NULL); + elt->when = time(NULL); - g_hash_table_insert (c->cache, elt->ptr, elt); + g_hash_table_insert(c->cache, elt->ptr, elt); } -void -rspamd_worker_session_cache_remove (void *cache, void *ptr) +void rspamd_worker_session_cache_remove(void *cache, void *ptr) { struct rspamd_worker_session_cache *c = cache; - g_hash_table_remove (c->cache, ptr); + g_hash_table_remove(c->cache, ptr); } static void -rspamd_worker_monitored_on_change (struct rspamd_monitored_ctx *ctx, - struct rspamd_monitored *m, gboolean alive, - void *ud) +rspamd_worker_monitored_on_change(struct rspamd_monitored_ctx *ctx, + struct rspamd_monitored *m, gboolean alive, + void *ud) { struct rspamd_worker *worker = ud; struct rspamd_config *cfg = worker->srv->cfg; @@ -1591,43 +1585,42 @@ rspamd_worker_monitored_on_change (struct rspamd_monitored_ctx *ctx, guchar tag[RSPAMD_MONITORED_TAG_LEN]; static struct rspamd_srv_command srv_cmd; - rspamd_monitored_get_tag (m, tag); - ev_base = rspamd_monitored_ctx_get_ev_base (ctx); - memset (&srv_cmd, 0, sizeof (srv_cmd)); + rspamd_monitored_get_tag(m, tag); + ev_base = rspamd_monitored_ctx_get_ev_base(ctx); + memset(&srv_cmd, 0, sizeof(srv_cmd)); srv_cmd.type = RSPAMD_SRV_MONITORED_CHANGE; - rspamd_strlcpy (srv_cmd.cmd.monitored_change.tag, tag, - sizeof (srv_cmd.cmd.monitored_change.tag)); + rspamd_strlcpy(srv_cmd.cmd.monitored_change.tag, tag, + sizeof(srv_cmd.cmd.monitored_change.tag)); srv_cmd.cmd.monitored_change.alive = alive; - srv_cmd.cmd.monitored_change.sender = getpid (); - msg_info_config ("broadcast monitored update for %s: %s", - srv_cmd.cmd.monitored_change.tag, alive ? "alive" : "dead"); + srv_cmd.cmd.monitored_change.sender = getpid(); + msg_info_config("broadcast monitored update for %s: %s", + srv_cmd.cmd.monitored_change.tag, alive ? "alive" : "dead"); - rspamd_srv_send_command (worker, ev_base, &srv_cmd, -1, NULL, NULL); + rspamd_srv_send_command(worker, ev_base, &srv_cmd, -1, NULL, NULL); } -void -rspamd_worker_init_monitored (struct rspamd_worker *worker, - struct ev_loop *ev_base, - struct rspamd_dns_resolver *resolver) +void rspamd_worker_init_monitored(struct rspamd_worker *worker, + struct ev_loop *ev_base, + struct rspamd_dns_resolver *resolver) { - rspamd_monitored_ctx_config (worker->srv->cfg->monitored_ctx, - worker->srv->cfg, ev_base, resolver->r, - rspamd_worker_monitored_on_change, worker); + rspamd_monitored_ctx_config(worker->srv->cfg->monitored_ctx, + worker->srv->cfg, ev_base, resolver->r, + rspamd_worker_monitored_on_change, worker); } #ifdef HAVE_SA_SIGINFO #ifdef WITH_LIBUNWIND static void -rspamd_print_crash (ucontext_t *uap) +rspamd_print_crash(ucontext_t *uap) { unw_cursor_t cursor; unw_word_t ip, off; guint level; gint ret; - if ((ret = unw_init_local (&cursor, uap)) != 0) { - msg_err ("unw_init_local: %d", ret); + if ((ret = unw_init_local(&cursor, uap)) != 0) { + msg_err("unw_init_local: %d", ret); return; } @@ -1642,18 +1635,19 @@ rspamd_print_crash (ucontext_t *uap) break; } - unw_get_reg (&cursor, UNW_REG_IP, &ip); - ret = unw_get_proc_name(&cursor, name, sizeof (name), &off); + unw_get_reg(&cursor, UNW_REG_IP, &ip); + ret = unw_get_proc_name(&cursor, name, sizeof(name), &off); if (ret == 0) { - msg_err ("%d: %p: %s()+0x%xl", - level, ip, name, (uintptr_t)off); - } else { - msg_err ("%d: %p: <unknown>", level, ip); + msg_err("%d: %p: %s()+0x%xl", + level, ip, name, (uintptr_t) off); + } + else { + msg_err("%d: %p: <unknown>", level, ip); } level++; - ret = unw_step (&cursor); + ret = unw_step(&cursor); if (ret <= 0) { break; @@ -1661,40 +1655,40 @@ rspamd_print_crash (ucontext_t *uap) } if (ret < 0) { - msg_err ("unw_step_ptr: %d", ret); + msg_err("unw_step_ptr: %d", ret); } } #endif static struct rspamd_main *saved_main = NULL; static gboolean -rspamd_crash_propagate (gpointer key, gpointer value, gpointer unused) +rspamd_crash_propagate(gpointer key, gpointer value, gpointer unused) { struct rspamd_worker *w = value; /* Kill children softly */ - kill (w->pid, SIGTERM); + kill(w->pid, SIGTERM); return TRUE; } static void -rspamd_crash_sig_handler (int sig, siginfo_t *info, void *ctx) +rspamd_crash_sig_handler(int sig, siginfo_t *info, void *ctx) { struct sigaction sa; ucontext_t *uap = ctx; pid_t pid; - pid = getpid (); - msg_err ("caught fatal signal %d(%s), " - "pid: %P, trace: ", - sig, strsignal (sig), pid); - (void)uap; + pid = getpid(); + msg_err("caught fatal signal %d(%s), " + "pid: %P, trace: ", + sig, strsignal(sig), pid); + (void) uap; #ifdef WITH_LIBUNWIND - rspamd_print_crash (uap); + rspamd_print_crash(uap); #endif - msg_err ("please see Rspamd FAQ to learn how to dump core files and how to " - "fill a bug report"); + msg_err("please see Rspamd FAQ to learn how to dump core files and how to " + "fill a bug report"); if (saved_main) { if (pid == saved_main->pid) { @@ -1702,24 +1696,24 @@ rspamd_crash_sig_handler (int sig, siginfo_t *info, void *ctx) * Main process has crashed, propagate crash further to trigger * monitoring alerts and mass panic */ - g_hash_table_foreach_remove (saved_main->workers, - rspamd_crash_propagate, NULL); + g_hash_table_foreach_remove(saved_main->workers, + rspamd_crash_propagate, NULL); } } /* * Invoke signal with the default handler */ - sigemptyset (&sa.sa_mask); + sigemptyset(&sa.sa_mask); sa.sa_handler = SIG_DFL; sa.sa_flags = 0; - sigaction (sig, &sa, NULL); - kill (pid, sig); + sigaction(sig, &sa, NULL); + kill(pid, sig); } #endif RSPAMD_NO_SANITIZE void -rspamd_set_crash_handler (struct rspamd_main *rspamd_main) +rspamd_set_crash_handler(struct rspamd_main *rspamd_main) { #ifdef HAVE_SA_SIGINFO struct sigaction sa; @@ -1727,31 +1721,31 @@ rspamd_set_crash_handler (struct rspamd_main *rspamd_main) #ifdef HAVE_SIGALTSTACK void *stack_mem; stack_t ss; - memset (&ss, 0, sizeof ss); + memset(&ss, 0, sizeof ss); - ss.ss_size = MAX (SIGSTKSZ, 8192 * 4); - stack_mem = g_malloc0 (ss.ss_size); + ss.ss_size = MAX(SIGSTKSZ, 8192 * 4); + stack_mem = g_malloc0(ss.ss_size); ss.ss_sp = stack_mem; - sigaltstack (&ss, NULL); + sigaltstack(&ss, NULL); #endif saved_main = rspamd_main; - sigemptyset (&sa.sa_mask); + sigemptyset(&sa.sa_mask); sa.sa_sigaction = &rspamd_crash_sig_handler; sa.sa_flags = SA_RESTART | SA_SIGINFO | SA_ONSTACK; - sigaction (SIGSEGV, &sa, NULL); - sigaction (SIGBUS, &sa, NULL); - sigaction (SIGABRT, &sa, NULL); - sigaction (SIGFPE, &sa, NULL); - sigaction (SIGSYS, &sa, NULL); + sigaction(SIGSEGV, &sa, NULL); + sigaction(SIGBUS, &sa, NULL); + sigaction(SIGABRT, &sa, NULL); + sigaction(SIGFPE, &sa, NULL); + sigaction(SIGSYS, &sa, NULL); #endif } -RSPAMD_NO_SANITIZE void rspamd_unset_crash_handler (struct rspamd_main *unused_) +RSPAMD_NO_SANITIZE void rspamd_unset_crash_handler(struct rspamd_main *unused_) { #ifdef HAVE_SIGALTSTACK int ret; stack_t ss; - ret = sigaltstack (NULL, &ss); + ret = sigaltstack(NULL, &ss); if (ret != -1) { if (ss.ss_size > 0 && ss.ss_sp) { @@ -1769,122 +1763,122 @@ RSPAMD_NO_SANITIZE void rspamd_unset_crash_handler (struct rspamd_main *unused_) } static void -rspamd_enable_accept_event (EV_P_ ev_timer *w, int revents) +rspamd_enable_accept_event(EV_P_ ev_timer *w, int revents) { struct rspamd_worker_accept_event *ac_ev = - (struct rspamd_worker_accept_event *)w->data; + (struct rspamd_worker_accept_event *) w->data; - ev_timer_stop (EV_A_ w); - ev_io_start (EV_A_ &ac_ev->accept_ev); + ev_timer_stop(EV_A_ w); + ev_io_start(EV_A_ & ac_ev->accept_ev); } -void -rspamd_worker_throttle_accept_events (gint sock, void *data) +void rspamd_worker_throttle_accept_events(gint sock, void *data) { struct rspamd_worker_accept_event *head, *cur; const gdouble throttling = 0.5; - head = (struct rspamd_worker_accept_event *)data; + head = (struct rspamd_worker_accept_event *) data; - DL_FOREACH (head, cur) { + DL_FOREACH(head, cur) + { - ev_io_stop (cur->event_loop, &cur->accept_ev); + ev_io_stop(cur->event_loop, &cur->accept_ev); cur->throttling_ev.data = cur; - ev_timer_init (&cur->throttling_ev, rspamd_enable_accept_event, - throttling, 0.0); - ev_timer_start (cur->event_loop, &cur->throttling_ev); + ev_timer_init(&cur->throttling_ev, rspamd_enable_accept_event, + throttling, 0.0); + ev_timer_start(cur->event_loop, &cur->throttling_ev); } } gboolean -rspamd_check_termination_clause (struct rspamd_main *rspamd_main, - struct rspamd_worker *wrk, - int res) +rspamd_check_termination_clause(struct rspamd_main *rspamd_main, + struct rspamd_worker *wrk, + int res) { gboolean need_refork = TRUE; if (wrk->state != rspamd_worker_state_running || rspamd_main->wanna_die || - (wrk->flags & RSPAMD_WORKER_OLD_CONFIG)) { + (wrk->flags & RSPAMD_WORKER_OLD_CONFIG)) { /* Do not refork workers that are intended to be terminated */ need_refork = FALSE; } - if (WIFEXITED (res) && WEXITSTATUS (res) == 0) { + if (WIFEXITED(res) && WEXITSTATUS(res) == 0) { /* Normal worker termination, do not fork one more */ if (wrk->flags & RSPAMD_WORKER_OLD_CONFIG) { /* Never re-fork old workers */ - msg_info_main ("%s process %P terminated normally", - g_quark_to_string(wrk->type), - wrk->pid); + msg_info_main("%s process %P terminated normally", + g_quark_to_string(wrk->type), + wrk->pid); need_refork = FALSE; } else { if (wrk->hb.nbeats < 0 && rspamd_main->cfg->heartbeats_loss_max > 0 && -(wrk->hb.nbeats) >= rspamd_main->cfg->heartbeats_loss_max) { - msg_info_main ("%s process %P terminated normally, but lost %L " - "heartbeats, refork it", - g_quark_to_string(wrk->type), - wrk->pid, - -(wrk->hb.nbeats)); + msg_info_main("%s process %P terminated normally, but lost %L " + "heartbeats, refork it", + g_quark_to_string(wrk->type), + wrk->pid, + -(wrk->hb.nbeats)); need_refork = TRUE; } else { - msg_info_main ("%s process %P terminated normally", - g_quark_to_string(wrk->type), - wrk->pid); + msg_info_main("%s process %P terminated normally", + g_quark_to_string(wrk->type), + wrk->pid); need_refork = FALSE; } } } else { - if (WIFSIGNALED (res)) { + if (WIFSIGNALED(res)) { #ifdef WCOREDUMP - if (WCOREDUMP (res)) { - msg_warn_main ( - "%s process %P terminated abnormally by signal: %s" - " and created core file; please see Rspamd FAQ " - "to learn how to extract data from core file and " - "fill a bug report", - g_quark_to_string (wrk->type), - wrk->pid, - g_strsignal (WTERMSIG (res))); + if (WCOREDUMP(res)) { + msg_warn_main( + "%s process %P terminated abnormally by signal: %s" + " and created core file; please see Rspamd FAQ " + "to learn how to extract data from core file and " + "fill a bug report", + g_quark_to_string(wrk->type), + wrk->pid, + g_strsignal(WTERMSIG(res))); } else { #ifdef HAVE_SYS_RESOURCE_H struct rlimit rlmt; - (void) getrlimit (RLIMIT_CORE, &rlmt); - - msg_warn_main ( - "%s process %P terminated abnormally with exit code %d by " - "signal: %s" - " but NOT created core file (throttled=%s); " - "core file limits: %L current, %L max", - g_quark_to_string (wrk->type), - wrk->pid, - WEXITSTATUS (res), - g_strsignal (WTERMSIG (res)), - wrk->cores_throttled ? "yes" : "no", - (gint64) rlmt.rlim_cur, - (gint64) rlmt.rlim_max); + (void) getrlimit(RLIMIT_CORE, &rlmt); + + msg_warn_main( + "%s process %P terminated abnormally with exit code %d by " + "signal: %s" + " but NOT created core file (throttled=%s); " + "core file limits: %L current, %L max", + g_quark_to_string(wrk->type), + wrk->pid, + WEXITSTATUS(res), + g_strsignal(WTERMSIG(res)), + wrk->cores_throttled ? "yes" : "no", + (gint64) rlmt.rlim_cur, + (gint64) rlmt.rlim_max); #else - msg_warn_main ( - "%s process %P terminated abnormally with exit code %d by signal: %s" - " but NOT created core file (throttled=%s); ", - g_quark_to_string (wrk->type), - wrk->pid, WEXITSTATUS (res), - g_strsignal (WTERMSIG (res)), - wrk->cores_throttled ? "yes" : "no"); + msg_warn_main( + "%s process %P terminated abnormally with exit code %d by signal: %s" + " but NOT created core file (throttled=%s); ", + g_quark_to_string(wrk->type), + wrk->pid, WEXITSTATUS(res), + g_strsignal(WTERMSIG(res)), + wrk->cores_throttled ? "yes" : "no"); #endif } #else - msg_warn_main ( - "%s process %P terminated abnormally with exit code %d by signal: %s", - g_quark_to_string (wrk->type), - wrk->pid, WEXITSTATUS (res), - g_strsignal (WTERMSIG (res))); + msg_warn_main( + "%s process %P terminated abnormally with exit code %d by signal: %s", + g_quark_to_string(wrk->type), + wrk->pid, WEXITSTATUS(res), + g_strsignal(WTERMSIG(res))); #endif - if (WTERMSIG (res) == SIGUSR2) { + if (WTERMSIG(res) == SIGUSR2) { /* * It is actually race condition when not started process * has been requested to be reloaded. @@ -1895,12 +1889,12 @@ rspamd_check_termination_clause (struct rspamd_main *rspamd_main, } } else { - msg_warn_main ("%s process %P terminated abnormally " - "(but it was not killed by a signal) " - "with exit code %d", - g_quark_to_string (wrk->type), - wrk->pid, - WEXITSTATUS (res)); + msg_warn_main("%s process %P terminated abnormally " + "(but it was not killed by a signal) " + "with exit code %d", + g_quark_to_string(wrk->type), + wrk->pid, + WEXITSTATUS(res)); } } @@ -1909,31 +1903,31 @@ rspamd_check_termination_clause (struct rspamd_main *rspamd_main, #ifdef WITH_HYPERSCAN gboolean -rspamd_worker_hyperscan_ready (struct rspamd_main *rspamd_main, - struct rspamd_worker *worker, gint fd, - gint attached_fd, - struct rspamd_control_command *cmd, - gpointer ud) { +rspamd_worker_hyperscan_ready(struct rspamd_main *rspamd_main, + struct rspamd_worker *worker, gint fd, + gint attached_fd, + struct rspamd_control_command *cmd, + gpointer ud) +{ struct rspamd_control_reply rep; struct rspamd_re_cache *cache = worker->srv->cfg->re_cache; - memset (&rep, 0, sizeof (rep)); + memset(&rep, 0, sizeof(rep)); rep.type = RSPAMD_CONTROL_HYPERSCAN_LOADED; - if (rspamd_re_cache_is_hs_loaded (cache) != RSPAMD_HYPERSCAN_LOADED_FULL || + if (rspamd_re_cache_is_hs_loaded(cache) != RSPAMD_HYPERSCAN_LOADED_FULL || cmd->cmd.hs_loaded.forced) { - msg_info ("loading hyperscan expressions after receiving compilation " - "notice: %s", - (rspamd_re_cache_is_hs_loaded (cache) != RSPAMD_HYPERSCAN_LOADED_FULL) ? - "new db" : "forced update"); - rep.reply.hs_loaded.status = rspamd_re_cache_load_hyperscan ( - worker->srv->cfg->re_cache, cmd->cmd.hs_loaded.cache_dir, false); + msg_info("loading hyperscan expressions after receiving compilation " + "notice: %s", + (rspamd_re_cache_is_hs_loaded(cache) != RSPAMD_HYPERSCAN_LOADED_FULL) ? "new db" : "forced update"); + rep.reply.hs_loaded.status = rspamd_re_cache_load_hyperscan( + worker->srv->cfg->re_cache, cmd->cmd.hs_loaded.cache_dir, false); } - if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) { - msg_err ("cannot write reply to the control socket: %s", - strerror (errno)); + if (write(fd, &rep, sizeof(rep)) != sizeof(rep)) { + msg_err("cannot write reply to the control socket: %s", + strerror(errno)); } return TRUE; @@ -1941,115 +1935,114 @@ rspamd_worker_hyperscan_ready (struct rspamd_main *rspamd_main, #endif /* With Hyperscan */ gboolean -rspamd_worker_check_context (gpointer ctx, guint64 magic) +rspamd_worker_check_context(gpointer ctx, guint64 magic) { - struct rspamd_abstract_worker_ctx *actx = (struct rspamd_abstract_worker_ctx*)ctx; + struct rspamd_abstract_worker_ctx *actx = (struct rspamd_abstract_worker_ctx *) ctx; return actx->magic == magic; } static gboolean -rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main, - struct rspamd_worker *worker, gint fd, - gint attached_fd, - struct rspamd_control_command *cmd, - gpointer ud) +rspamd_worker_log_pipe_handler(struct rspamd_main *rspamd_main, + struct rspamd_worker *worker, gint fd, + gint attached_fd, + struct rspamd_control_command *cmd, + gpointer ud) { struct rspamd_config *cfg = ud; struct rspamd_worker_log_pipe *lp; struct rspamd_control_reply rep; - memset (&rep, 0, sizeof (rep)); + memset(&rep, 0, sizeof(rep)); rep.type = RSPAMD_CONTROL_LOG_PIPE; if (attached_fd != -1) { - lp = g_malloc0 (sizeof (*lp)); + lp = g_malloc0(sizeof(*lp)); lp->fd = attached_fd; lp->type = cmd->cmd.log_pipe.type; - DL_APPEND (cfg->log_pipes, lp); - msg_info ("added new log pipe"); + DL_APPEND(cfg->log_pipes, lp); + msg_info("added new log pipe"); } else { rep.reply.log_pipe.status = ENOENT; - msg_err ("cannot attach log pipe: invalid fd"); + msg_err("cannot attach log pipe: invalid fd"); } - if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) { - msg_err ("cannot write reply to the control socket: %s", - strerror (errno)); + if (write(fd, &rep, sizeof(rep)) != sizeof(rep)) { + msg_err("cannot write reply to the control socket: %s", + strerror(errno)); } return TRUE; } static gboolean -rspamd_worker_monitored_handler (struct rspamd_main *rspamd_main, - struct rspamd_worker *worker, gint fd, - gint attached_fd, - struct rspamd_control_command *cmd, - gpointer ud) +rspamd_worker_monitored_handler(struct rspamd_main *rspamd_main, + struct rspamd_worker *worker, gint fd, + gint attached_fd, + struct rspamd_control_command *cmd, + gpointer ud) { struct rspamd_control_reply rep; struct rspamd_monitored *m; struct rspamd_monitored_ctx *mctx = worker->srv->cfg->monitored_ctx; struct rspamd_config *cfg = ud; - memset (&rep, 0, sizeof (rep)); + memset(&rep, 0, sizeof(rep)); rep.type = RSPAMD_CONTROL_MONITORED_CHANGE; - if (cmd->cmd.monitored_change.sender != getpid ()) { - m = rspamd_monitored_by_tag (mctx, cmd->cmd.monitored_change.tag); + if (cmd->cmd.monitored_change.sender != getpid()) { + m = rspamd_monitored_by_tag(mctx, cmd->cmd.monitored_change.tag); if (m != NULL) { - rspamd_monitored_set_alive (m, cmd->cmd.monitored_change.alive); + rspamd_monitored_set_alive(m, cmd->cmd.monitored_change.alive); rep.reply.monitored_change.status = 1; - msg_info_config ("updated monitored status for %s: %s", - cmd->cmd.monitored_change.tag, - cmd->cmd.monitored_change.alive ? "alive" : "dead"); - } else { - msg_err ("cannot find monitored by tag: %*s", 32, + msg_info_config("updated monitored status for %s: %s", + cmd->cmd.monitored_change.tag, + cmd->cmd.monitored_change.alive ? "alive" : "dead"); + } + else { + msg_err("cannot find monitored by tag: %*s", 32, cmd->cmd.monitored_change.tag); rep.reply.monitored_change.status = 0; } } - if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) { - msg_err ("cannot write reply to the control socket: %s", - strerror (errno)); + if (write(fd, &rep, sizeof(rep)) != sizeof(rep)) { + msg_err("cannot write reply to the control socket: %s", + strerror(errno)); } return TRUE; } -void -rspamd_worker_init_scanner (struct rspamd_worker *worker, - struct ev_loop *ev_base, - struct rspamd_dns_resolver *resolver, - struct rspamd_lang_detector **plang_det) +void rspamd_worker_init_scanner(struct rspamd_worker *worker, + struct ev_loop *ev_base, + struct rspamd_dns_resolver *resolver, + struct rspamd_lang_detector **plang_det) { - rspamd_stat_init (worker->srv->cfg, ev_base); + rspamd_stat_init(worker->srv->cfg, ev_base); #ifdef WITH_HYPERSCAN - rspamd_control_worker_add_cmd_handler (worker, - RSPAMD_CONTROL_HYPERSCAN_LOADED, - rspamd_worker_hyperscan_ready, - NULL); + rspamd_control_worker_add_cmd_handler(worker, + RSPAMD_CONTROL_HYPERSCAN_LOADED, + rspamd_worker_hyperscan_ready, + NULL); #endif - rspamd_control_worker_add_cmd_handler (worker, - RSPAMD_CONTROL_LOG_PIPE, - rspamd_worker_log_pipe_handler, - worker->srv->cfg); - rspamd_control_worker_add_cmd_handler (worker, - RSPAMD_CONTROL_MONITORED_CHANGE, - rspamd_worker_monitored_handler, - worker->srv->cfg); + rspamd_control_worker_add_cmd_handler(worker, + RSPAMD_CONTROL_LOG_PIPE, + rspamd_worker_log_pipe_handler, + worker->srv->cfg); + rspamd_control_worker_add_cmd_handler(worker, + RSPAMD_CONTROL_MONITORED_CHANGE, + rspamd_worker_monitored_handler, + worker->srv->cfg); *plang_det = worker->srv->cfg->lang_det; } -void -rspamd_controller_store_saved_stats (struct rspamd_main *rspamd_main, - struct rspamd_config *cfg) +void rspamd_controller_store_saved_stats(struct rspamd_main *rspamd_main, + struct rspamd_config *cfg) { struct rspamd_stat *stat; ucl_object_t *top, *sub; @@ -2062,82 +2055,79 @@ rspamd_controller_store_saved_stats (struct rspamd_main *rspamd_main, return; } - rspamd_snprintf (fpath, sizeof (fpath), "%s.XXXXXXXX", cfg->stats_file); - fd = g_mkstemp_full (fpath, O_WRONLY|O_TRUNC, 00644); + rspamd_snprintf(fpath, sizeof(fpath), "%s.XXXXXXXX", cfg->stats_file); + fd = g_mkstemp_full(fpath, O_WRONLY | O_TRUNC, 00644); if (fd == -1) { - msg_err_config ("cannot open for writing controller stats from %s: %s", - fpath, strerror (errno)); + msg_err_config("cannot open for writing controller stats from %s: %s", + fpath, strerror(errno)); return; } - fp = fdopen (fd, "w"); + fp = fdopen(fd, "w"); stat = rspamd_main->stat; - top = ucl_object_typed_new (UCL_OBJECT); - ucl_object_insert_key (top, ucl_object_fromint ( - stat->messages_scanned), "scanned", 0, false); - ucl_object_insert_key (top, ucl_object_fromint ( - stat->messages_learned), "learned", 0, false); + top = ucl_object_typed_new(UCL_OBJECT); + ucl_object_insert_key(top, ucl_object_fromint(stat->messages_scanned), "scanned", 0, false); + ucl_object_insert_key(top, ucl_object_fromint(stat->messages_learned), "learned", 0, false); if (stat->messages_scanned > 0) { - sub = ucl_object_typed_new (UCL_OBJECT); + sub = ucl_object_typed_new(UCL_OBJECT); for (i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) { - ucl_object_insert_key (sub, - ucl_object_fromint (stat->actions_stat[i]), - rspamd_action_to_str (i), 0, false); + ucl_object_insert_key(sub, + ucl_object_fromint(stat->actions_stat[i]), + rspamd_action_to_str(i), 0, false); } - ucl_object_insert_key (top, sub, "actions", 0, false); + ucl_object_insert_key(top, sub, "actions", 0, false); } - ucl_object_insert_key (top, - ucl_object_fromint (stat->connections_count), - "connections", 0, false); - ucl_object_insert_key (top, - ucl_object_fromint (stat->control_connections_count), - "control_connections", 0, false); + ucl_object_insert_key(top, + ucl_object_fromint(stat->connections_count), + "connections", 0, false); + ucl_object_insert_key(top, + ucl_object_fromint(stat->control_connections_count), + "control_connections", 0, false); - efuncs = ucl_object_emit_file_funcs (fp); - if (!ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT, - efuncs, NULL)) { - msg_err_config ("cannot write stats to %s: %s", - fpath, strerror (errno)); + efuncs = ucl_object_emit_file_funcs(fp); + if (!ucl_object_emit_full(top, UCL_EMIT_JSON_COMPACT, + efuncs, NULL)) { + msg_err_config("cannot write stats to %s: %s", + fpath, strerror(errno)); - unlink (fpath); + unlink(fpath); } else { - if (rename (fpath, cfg->stats_file) == -1) { - msg_err_config ("cannot rename stats from %s to %s: %s", - fpath, cfg->stats_file, strerror (errno)); + if (rename(fpath, cfg->stats_file) == -1) { + msg_err_config("cannot rename stats from %s to %s: %s", + fpath, cfg->stats_file, strerror(errno)); } } - ucl_object_unref (top); - fclose (fp); - ucl_object_emit_funcs_free (efuncs); + ucl_object_unref(top); + fclose(fp); + ucl_object_emit_funcs_free(efuncs); } static ev_timer rrd_timer; -void -rspamd_controller_on_terminate (struct rspamd_worker *worker, - struct rspamd_rrd_file *rrd) +void rspamd_controller_on_terminate(struct rspamd_worker *worker, + struct rspamd_rrd_file *rrd) { struct rspamd_abstract_worker_ctx *ctx; - ctx = (struct rspamd_abstract_worker_ctx *)worker->ctx; - rspamd_controller_store_saved_stats (worker->srv, worker->srv->cfg); + ctx = (struct rspamd_abstract_worker_ctx *) worker->ctx; + rspamd_controller_store_saved_stats(worker->srv, worker->srv->cfg); if (rrd) { - ev_timer_stop (ctx->event_loop, &rrd_timer); - msg_info ("closing rrd file: %s", rrd->filename); - rspamd_rrd_close (rrd); + ev_timer_stop(ctx->event_loop, &rrd_timer); + msg_info("closing rrd file: %s", rrd->filename); + rspamd_rrd_close(rrd); } } static void -rspamd_controller_load_saved_stats (struct rspamd_main *rspamd_main, - struct rspamd_config *cfg) +rspamd_controller_load_saved_stats(struct rspamd_main *rspamd_main, + struct rspamd_config *cfg) { struct ucl_parser *parser; ucl_object_t *obj; @@ -2149,66 +2139,66 @@ rspamd_controller_load_saved_stats (struct rspamd_main *rspamd_main, return; } - if (access (cfg->stats_file, R_OK) == -1) { - msg_err_config ("cannot load controller stats from %s: %s", - cfg->stats_file, strerror (errno)); + if (access(cfg->stats_file, R_OK) == -1) { + msg_err_config("cannot load controller stats from %s: %s", + cfg->stats_file, strerror(errno)); return; } - parser = ucl_parser_new (0); + parser = ucl_parser_new(0); - if (!ucl_parser_add_file (parser, cfg->stats_file)) { - msg_err_config ("cannot parse controller stats from %s: %s", - cfg->stats_file, ucl_parser_get_error (parser)); - ucl_parser_free (parser); + if (!ucl_parser_add_file(parser, cfg->stats_file)) { + msg_err_config("cannot parse controller stats from %s: %s", + cfg->stats_file, ucl_parser_get_error(parser)); + ucl_parser_free(parser); return; } - obj = ucl_parser_get_object (parser); - ucl_parser_free (parser); + obj = ucl_parser_get_object(parser); + ucl_parser_free(parser); stat = rspamd_main->stat; - memcpy (&stat_copy, stat, sizeof (stat_copy)); + memcpy(&stat_copy, stat, sizeof(stat_copy)); - elt = ucl_object_lookup (obj, "scanned"); + elt = ucl_object_lookup(obj, "scanned"); - if (elt != NULL && ucl_object_type (elt) == UCL_INT) { - stat_copy.messages_scanned = ucl_object_toint (elt); + if (elt != NULL && ucl_object_type(elt) == UCL_INT) { + stat_copy.messages_scanned = ucl_object_toint(elt); } - elt = ucl_object_lookup (obj, "learned"); + elt = ucl_object_lookup(obj, "learned"); - if (elt != NULL && ucl_object_type (elt) == UCL_INT) { - stat_copy.messages_learned = ucl_object_toint (elt); + if (elt != NULL && ucl_object_type(elt) == UCL_INT) { + stat_copy.messages_learned = ucl_object_toint(elt); } - elt = ucl_object_lookup (obj, "actions"); + elt = ucl_object_lookup(obj, "actions"); if (elt != NULL) { for (i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) { - subelt = ucl_object_lookup (elt, rspamd_action_to_str (i)); + subelt = ucl_object_lookup(elt, rspamd_action_to_str(i)); - if (subelt && ucl_object_type (subelt) == UCL_INT) { - stat_copy.actions_stat[i] = ucl_object_toint (subelt); + if (subelt && ucl_object_type(subelt) == UCL_INT) { + stat_copy.actions_stat[i] = ucl_object_toint(subelt); } } } - elt = ucl_object_lookup (obj, "connections_count"); + elt = ucl_object_lookup(obj, "connections_count"); - if (elt != NULL && ucl_object_type (elt) == UCL_INT) { - stat_copy.connections_count = ucl_object_toint (elt); + if (elt != NULL && ucl_object_type(elt) == UCL_INT) { + stat_copy.connections_count = ucl_object_toint(elt); } - elt = ucl_object_lookup (obj, "control_connections_count"); + elt = ucl_object_lookup(obj, "control_connections_count"); - if (elt != NULL && ucl_object_type (elt) == UCL_INT) { - stat_copy.control_connections_count = ucl_object_toint (elt); + if (elt != NULL && ucl_object_type(elt) == UCL_INT) { + stat_copy.control_connections_count = ucl_object_toint(elt); } - ucl_object_unref (obj); - memcpy (stat, &stat_copy, sizeof (stat_copy)); + ucl_object_unref(obj); + memcpy(stat, &stat_copy, sizeof(stat_copy)); } struct rspamd_controller_periodics_cbdata { @@ -2219,95 +2209,94 @@ struct rspamd_controller_periodics_cbdata { }; static void -rspamd_controller_rrd_update (EV_P_ ev_timer *w, int revents) +rspamd_controller_rrd_update(EV_P_ ev_timer *w, int revents) { struct rspamd_controller_periodics_cbdata *cbd = - (struct rspamd_controller_periodics_cbdata *)w->data; + (struct rspamd_controller_periodics_cbdata *) w->data; struct rspamd_stat *stat; GArray ar; gdouble points[METRIC_ACTION_MAX]; GError *err = NULL; guint i; - g_assert (cbd->rrd != NULL); + g_assert(cbd->rrd != NULL); stat = cbd->stat; - for (i = METRIC_ACTION_REJECT; i < METRIC_ACTION_MAX; i ++) { + for (i = METRIC_ACTION_REJECT; i < METRIC_ACTION_MAX; i++) { points[i] = stat->actions_stat[i]; } - ar.data = (gchar *)points; - ar.len = sizeof (points); + ar.data = (gchar *) points; + ar.len = sizeof(points); - if (!rspamd_rrd_add_record (cbd->rrd, &ar, rspamd_get_calendar_ticks (), - &err)) { - msg_err ("cannot update rrd file: %e", err); - g_error_free (err); + if (!rspamd_rrd_add_record(cbd->rrd, &ar, rspamd_get_calendar_ticks(), + &err)) { + msg_err("cannot update rrd file: %e", err); + g_error_free(err); } /* Plan new event */ - ev_timer_again (EV_A_ w); + ev_timer_again(EV_A_ w); } static void -rspamd_controller_stats_save_periodic (EV_P_ ev_timer *w, int revents) +rspamd_controller_stats_save_periodic(EV_P_ ev_timer *w, int revents) { struct rspamd_controller_periodics_cbdata *cbd = - (struct rspamd_controller_periodics_cbdata *)w->data; + (struct rspamd_controller_periodics_cbdata *) w->data; - rspamd_controller_store_saved_stats (cbd->worker->srv, cbd->worker->srv->cfg); - ev_timer_again (EV_A_ w); + rspamd_controller_store_saved_stats(cbd->worker->srv, cbd->worker->srv->cfg); + ev_timer_again(EV_A_ w); } -void -rspamd_worker_init_controller (struct rspamd_worker *worker, - struct rspamd_rrd_file **prrd) +void rspamd_worker_init_controller(struct rspamd_worker *worker, + struct rspamd_rrd_file **prrd) { struct rspamd_abstract_worker_ctx *ctx; static const ev_tstamp rrd_update_time = 1.0; - ctx = (struct rspamd_abstract_worker_ctx *)worker->ctx; - rspamd_controller_load_saved_stats (worker->srv, worker->srv->cfg); + ctx = (struct rspamd_abstract_worker_ctx *) worker->ctx; + rspamd_controller_load_saved_stats(worker->srv, worker->srv->cfg); if (worker->index == 0) { /* Enable periodics and other stuff */ static struct rspamd_controller_periodics_cbdata cbd; const ev_tstamp save_stats_interval = 60; /* 1 minute */ - memset (&cbd, 0, sizeof (cbd)); + memset(&cbd, 0, sizeof(cbd)); cbd.save_stats_event.data = &cbd; cbd.worker = worker; cbd.stat = worker->srv->stat; - ev_timer_init (&cbd.save_stats_event, - rspamd_controller_stats_save_periodic, - save_stats_interval, save_stats_interval); - ev_timer_start (ctx->event_loop, &cbd.save_stats_event); + ev_timer_init(&cbd.save_stats_event, + rspamd_controller_stats_save_periodic, + save_stats_interval, save_stats_interval); + ev_timer_start(ctx->event_loop, &cbd.save_stats_event); - rspamd_map_watch (worker->srv->cfg, ctx->event_loop, - ctx->resolver, worker, - RSPAMD_MAP_WATCH_PRIMARY_CONTROLLER); + rspamd_map_watch(worker->srv->cfg, ctx->event_loop, + ctx->resolver, worker, + RSPAMD_MAP_WATCH_PRIMARY_CONTROLLER); if (prrd != NULL) { if (ctx->cfg->rrd_file && worker->index == 0) { GError *rrd_err = NULL; - *prrd = rspamd_rrd_file_default (ctx->cfg->rrd_file, &rrd_err); + *prrd = rspamd_rrd_file_default(ctx->cfg->rrd_file, &rrd_err); if (*prrd) { cbd.rrd = *prrd; rrd_timer.data = &cbd; - ev_timer_init (&rrd_timer, rspamd_controller_rrd_update, - rrd_update_time, rrd_update_time); - ev_timer_start (ctx->event_loop, &rrd_timer); + ev_timer_init(&rrd_timer, rspamd_controller_rrd_update, + rrd_update_time, rrd_update_time); + ev_timer_start(ctx->event_loop, &rrd_timer); } else if (rrd_err) { - msg_err ("cannot load rrd from %s: %e", ctx->cfg->rrd_file, + msg_err("cannot load rrd from %s: %e", ctx->cfg->rrd_file, rrd_err); - g_error_free (rrd_err); + g_error_free(rrd_err); } else { - msg_err ("cannot load rrd from %s: unknown error", + msg_err("cannot load rrd from %s: unknown error", ctx->cfg->rrd_file); } } @@ -2317,57 +2306,58 @@ rspamd_worker_init_controller (struct rspamd_worker *worker, } if (!ctx->cfg->disable_monitored) { - rspamd_worker_init_monitored (worker, - ctx->event_loop, ctx->resolver); + rspamd_worker_init_monitored(worker, + ctx->event_loop, ctx->resolver); } } else { - rspamd_map_watch (worker->srv->cfg, ctx->event_loop, - ctx->resolver, worker, RSPAMD_MAP_WATCH_SCANNER); + rspamd_map_watch(worker->srv->cfg, ctx->event_loop, + ctx->resolver, worker, RSPAMD_MAP_WATCH_SCANNER); } } gdouble -rspamd_worker_check_and_adjust_timeout (struct rspamd_config *cfg, gdouble timeout) +rspamd_worker_check_and_adjust_timeout(struct rspamd_config *cfg, gdouble timeout) { - if (isnan (timeout)) { + if (isnan(timeout)) { /* Use implicit timeout from cfg->task_timeout */ timeout = cfg->task_timeout; } - if (isnan (timeout)) { + if (isnan(timeout)) { return timeout; } - struct rspamd_symcache_timeout_result *tres = rspamd_symcache_get_max_timeout (cfg->cache); - g_assert (tres != 0); + struct rspamd_symcache_timeout_result *tres = rspamd_symcache_get_max_timeout(cfg->cache); + g_assert(tres != 0); if (tres->max_timeout > timeout) { msg_info_config("configured task_timeout %.2f is less than maximum symbols cache timeout %.2f; " - "some symbols can be terminated before checks", timeout, tres->max_timeout); + "some symbols can be terminated before checks", + timeout, tres->max_timeout); GString *buf = g_string_sized_new(512); static const int max_displayed_items = 12; for (int i = 0; i < MIN(tres->nitems, max_displayed_items); i++) { if (i == 0) { rspamd_printf_gstring(buf, "%s(%.2f)", - rspamd_symcache_item_name((struct rspamd_symcache_item *)tres->items[i].item), - tres->items[i].timeout); + rspamd_symcache_item_name((struct rspamd_symcache_item *) tres->items[i].item), + tres->items[i].timeout); } else { rspamd_printf_gstring(buf, "; %s(%.2f)", - rspamd_symcache_item_name((struct rspamd_symcache_item *)tres->items[i].item), - tres->items[i].timeout); + rspamd_symcache_item_name((struct rspamd_symcache_item *) tres->items[i].item), + tres->items[i].timeout); } } msg_info_config("list of top %d symbols by execution time: %v", - (int)MIN(tres->nitems, max_displayed_items), - buf); + (int) MIN(tres->nitems, max_displayed_items), + buf); g_string_free(buf, TRUE); } - rspamd_symcache_timeout_result_free (tres); + rspamd_symcache_timeout_result_free(tres); /* TODO: maybe adjust timeout */ return timeout; |