From ffe52ffb1f0f5250292d24e0da4a0f22c0ed488b Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Tue, 2 May 2017 12:31:10 +0100 Subject: [PATCH] [Rework] Make log pipes worker agnostic, add scanners API --- src/libserver/cfg_file.h | 7 ++++++ src/libserver/cfg_utils.c | 7 ++++++ src/libserver/protocol.c | 15 +++-------- src/worker.c | 52 +++++++++++++++++++-------------------- src/worker_private.h | 18 ++++++-------- 5 files changed, 49 insertions(+), 50 deletions(-) diff --git a/src/libserver/cfg_file.h b/src/libserver/cfg_file.h index 3963b43d8..2db607075 100644 --- a/src/libserver/cfg_file.h +++ b/src/libserver/cfg_file.h @@ -64,6 +64,12 @@ enum rspamd_log_type { RSPAMD_LOG_FILE }; +struct rspamd_worker_log_pipe { + gint fd; + gint type; + struct rspamd_worker_log_pipe *prev, *next; +}; + /** * script module list item */ @@ -328,6 +334,7 @@ struct rspamd_config { gboolean log_re_cache; /**< show statistics about regexps */ guint log_error_elts; /**< number of elements in error logbuf */ guint log_error_elt_maxlen; /**< maximum size of error log element */ + struct rspamd_worker_log_pipe *log_pipes; gboolean mlock_statfile_pool; /**< use mlock (2) for locking statfiles */ gboolean compat_messages; /**< use old messages in the protocol (array) */ diff --git a/src/libserver/cfg_utils.c b/src/libserver/cfg_utils.c index e8abcd9e6..354e9f082 100644 --- a/src/libserver/cfg_utils.c +++ b/src/libserver/cfg_utils.c @@ -191,6 +191,7 @@ void rspamd_config_free (struct rspamd_config *cfg) { struct rspamd_config_post_load_script *sc, *sctmp; + struct rspamd_worker_log_pipe *lp, *ltmp; rspamd_map_remove_all (cfg); @@ -244,6 +245,12 @@ rspamd_config_free (struct rspamd_config *cfg) rspamd_mempool_delete (cfg->cfg_pool); lua_close (cfg->lua_state); REF_RELEASE (cfg->libs_ctx); + + DL_FOREACH_SAFE (cfg->log_pipes, lp, ltmp) { + close (lp->fd); + g_slice_free1 (sizeof (*lp), lp); + } + g_slice_free1 (sizeof (*cfg), cfg); } diff --git a/src/libserver/protocol.c b/src/libserver/protocol.c index 5656d9458..cbf91027c 100644 --- a/src/libserver/protocol.c +++ b/src/libserver/protocol.c @@ -1401,8 +1401,7 @@ end: } static void -rspamd_protocol_write_log_pipe (struct rspamd_worker_ctx *ctx, - struct rspamd_task *task) +rspamd_protocol_write_log_pipe (struct rspamd_task *task) { struct rspamd_worker_log_pipe *lp; struct rspamd_protocol_log_message_sum *ls; @@ -1547,7 +1546,7 @@ rspamd_protocol_write_log_pipe (struct rspamd_worker_ctx *ctx, nextra = extra->len; - LL_FOREACH (ctx->log_pipes, lp) { + LL_FOREACH (task->cfg->log_pipes, lp) { if (lp->fd != -1) { switch (lp->type) { case RSPAMD_LOG_PIPE_SYMBOLS: @@ -1628,7 +1627,6 @@ rspamd_protocol_write_reply (struct rspamd_task *task) { struct rspamd_http_message *msg; const gchar *ctype = "application/json"; - struct rspamd_abstract_worker_ctx *actx; rspamd_fstring_t *reply; msg = rspamd_http_new_message (HTTP_RESPONSE); @@ -1678,14 +1676,7 @@ rspamd_protocol_write_reply (struct rspamd_task *task) case CMD_SKIP: case CMD_CHECK_V2: rspamd_protocol_http_reply (msg, task); - - if (task->worker && task->worker->ctx) { - actx = task->worker->ctx; - - if (actx->magic == rspamd_worker_magic) { - rspamd_protocol_write_log_pipe (task->worker->ctx, task); - } - } + rspamd_protocol_write_log_pipe (task); break; case CMD_PING: rspamd_http_message_set_body (msg, "pong" CRLF, 6); diff --git a/src/worker.c b/src/worker.c index c611e0c39..724b9ccd2 100644 --- a/src/worker.c +++ b/src/worker.c @@ -459,7 +459,7 @@ rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main, struct rspamd_control_command *cmd, gpointer ud) { - struct rspamd_worker_ctx *ctx = ud; + struct rspamd_config *cfg = ud; struct rspamd_worker_log_pipe *lp; struct rspamd_control_reply rep; @@ -471,7 +471,7 @@ rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main, lp->fd = attached_fd; lp->type = cmd->cmd.log_pipe.type; - DL_APPEND (ctx->log_pipes, lp); + DL_APPEND (cfg->log_pipes, lp); msg_info ("added new log pipe"); } else { @@ -598,6 +598,28 @@ rspamd_worker_on_terminate (struct rspamd_worker *worker) return FALSE; } +void +rspamd_worker_init_scanner (struct rspamd_worker *worker, + struct event_base *ev_base, + struct rspamd_dns_resolver *resolver) +{ + rspamd_monitored_ctx_config (worker->srv->cfg->monitored_ctx, + worker->srv->cfg, ev_base, resolver->r); + rspamd_stat_init (worker->srv->cfg, ev_base); + g_ptr_array_add (worker->finish_actions, + (gpointer) rspamd_worker_on_terminate); +#ifdef WITH_HYPERSCAN + rspamd_control_worker_add_cmd_handler (worker, + RSPAMD_CONTROL_HYPERSCAN_LOADED, + rspamd_worker_hyperscan_ready, + NULL); +#endif + rspamd_control_worker_add_cmd_handler (worker, + RSPAMD_CONTROL_LOG_PIPE, + rspamd_worker_log_pipe_handler, + worker->srv->cfg); +} + /* * Start worker process */ @@ -605,7 +627,6 @@ void start_worker (struct rspamd_worker *worker) { struct rspamd_worker_ctx *ctx = worker->ctx; - struct rspamd_worker_log_pipe *lp, *ltmp; ctx->cfg = worker->srv->cfg; ctx->ev_base = rspamd_prepare_worker (worker, "normal", accept_socket, TRUE); @@ -617,42 +638,19 @@ start_worker (struct rspamd_worker *worker) ctx->ev_base, worker->srv->cfg); rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver); - rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx, ctx->ev_base, ctx->resolver->r); - rspamd_monitored_ctx_config (worker->srv->cfg->monitored_ctx, - worker->srv->cfg, ctx->ev_base, ctx->resolver->r); /* XXX: stupid default */ ctx->keys_cache = rspamd_keypair_cache_new (256); - rspamd_stat_init (worker->srv->cfg, ctx->ev_base); - g_ptr_array_add (worker->finish_actions, - (gpointer) rspamd_worker_on_terminate); + rspamd_worker_init_scanner (worker, ctx->ev_base, ctx->resolver); -#ifdef WITH_HYPERSCAN - rspamd_control_worker_add_cmd_handler (worker, - RSPAMD_CONTROL_HYPERSCAN_LOADED, - rspamd_worker_hyperscan_ready, - ctx); -#endif - - rspamd_control_worker_add_cmd_handler (worker, - RSPAMD_CONTROL_LOG_PIPE, - rspamd_worker_log_pipe_handler, - ctx); event_base_loop (ctx->ev_base, 0); rspamd_worker_block_signals (); rspamd_stat_close (); rspamd_log_close (worker->srv->logger); - rspamd_keypair_cache_destroy (ctx->keys_cache); - - DL_FOREACH_SAFE (ctx->log_pipes, lp, ltmp) { - close (lp->fd); - g_slice_free1 (sizeof (*lp), lp); - } - REF_RELEASE (ctx->cfg); exit (EXIT_SUCCESS); diff --git a/src/worker_private.h b/src/worker_private.h index 91989cdc0..ac391fc8c 100644 --- a/src/worker_private.h +++ b/src/worker_private.h @@ -23,15 +23,6 @@ #include "libserver/cfg_file.h" #include "libserver/rspamd_control.h" -/* - * Worker's context - */ -struct rspamd_worker_log_pipe { - gint fd; - enum rspamd_log_pipe_type type; - struct rspamd_worker_log_pipe *prev, *next; -}; - static const guint64 rspamd_worker_magic = 0xb48abc69d601dc1dULL; struct rspamd_worker_ctx { @@ -60,8 +51,13 @@ struct rspamd_worker_ctx { struct rspamd_keypair_cache *keys_cache; /* Configuration */ struct rspamd_config *cfg; - /* Log pipe */ - struct rspamd_worker_log_pipe *log_pipes; }; +/* + * Init scanning routines + */ +void rspamd_worker_init_scanner (struct rspamd_worker *worker, + struct event_base *ev_base, + struct rspamd_dns_resolver *resolver); + #endif -- 2.39.5