From: Vsevolod Stakhov Date: Mon, 4 Apr 2016 13:24:07 +0000 (+0100) Subject: [Feature] Implement log pipe feature for rspamd logs analysis X-Git-Tag: 1.2.3~72 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=007c2befbe517ac6d44830a7900c3ac3fdf6d634;p=rspamd.git [Feature] Implement log pipe feature for rspamd logs analysis --- diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c index 6ed9d1502..cc336174e 100644 --- a/src/libserver/rspamd_control.c +++ b/src/libserver/rspamd_control.c @@ -577,6 +577,10 @@ rspamd_control_default_cmd_handler (gint fd, msg_err ("cannot write reply to the control socket: %s", strerror (errno)); } + + if (attached_fd != -1) { + close (attached_fd); + } } static void @@ -697,6 +701,18 @@ rspamd_control_hs_io_handler (gint fd, short what, gpointer ud) g_slice_free1 (sizeof (*elt), elt); } +static void +rspamd_control_log_pipe_io_handler (gint fd, short what, gpointer ud) +{ + struct rspamd_control_reply_elt *elt = ud; + struct rspamd_control_reply rep; + + /* At this point we just ignore replies from the workers */ + (void) read (fd, &rep, sizeof (rep)); + event_del (&elt->io_ev); + g_slice_free1 (sizeof (*elt), elt); +} + static void rspamd_srv_handler (gint fd, short what, gpointer ud) { @@ -708,7 +724,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud) struct cmsghdr *cmsg; struct iovec iov; guchar fdspace[CMSG_SPACE(sizeof (int))]; - gint *spair; + gint *spair, rfd = -1; gchar *nid; struct rspamd_control_command wcmd; gssize r; @@ -747,6 +763,9 @@ rspamd_srv_handler (gint fd, short what, gpointer ud) rdata->rep.id = cmd.id; rdata->rep.type = cmd.type; rdata->fd = -1; + if (msg.msg_controllen >= CMSG_SPACE(sizeof (int))) { + rfd = *(int *) CMSG_DATA(CMSG_FIRSTHDR (&msg)); + } switch (cmd.type) { case RSPAMD_SRV_SOCKETPAIR: @@ -782,14 +801,24 @@ rspamd_srv_handler (gint fd, short what, gpointer ud) */ wcmd.cmd.hs_loaded.cache_dir = cmd.cmd.hs_loaded.cache_dir; wcmd.cmd.hs_loaded.forced = cmd.cmd.hs_loaded.forced; - rspamd_control_broadcast_cmd (srv, &wcmd, -1, + rspamd_control_broadcast_cmd (srv, &wcmd, rfd, rspamd_control_hs_io_handler, NULL); break; + case RSPAMD_SRV_LOG_PIPE: + memset (&wcmd, 0, sizeof (wcmd)); + wcmd.cmd.log_pipe.type = cmd.cmd.log_pipe.type; + rspamd_control_broadcast_cmd (srv, &wcmd, rfd, + rspamd_control_log_pipe_io_handler, NULL); default: msg_err ("unknown command type: %d", cmd.type); break; } + if (rfd != -1) { + /* Close our copy to avoid descriptors leak */ + close (rfd); + } + /* Now plan write event and send data back */ event_del (&worker->srv_ev); event_set (&worker->srv_ev, diff --git a/src/libserver/rspamd_control.h b/src/libserver/rspamd_control.h index dbe1001d6..fd24b856b 100644 --- a/src/libserver/rspamd_control.h +++ b/src/libserver/rspamd_control.h @@ -29,6 +29,7 @@ enum rspamd_control_type { RSPAMD_CONTROL_RERESOLVE, RSPAMD_CONTROL_RECOMPILE, RSPAMD_CONTROL_HYPERSCAN_LOADED, + RSPAMD_CONTROL_LOG_PIPE, RSPAMD_CONTROL_FUZZY_STAT, RSPAMD_CONTROL_FUZZY_SYNC, RSPAMD_CONTROL_MAX @@ -37,6 +38,11 @@ enum rspamd_control_type { enum rspamd_srv_type { RSPAMD_SRV_SOCKETPAIR = 0, RSPAMD_SRV_HYPERSCAN_LOADED, + RSPAMD_SRV_LOG_PIPE, +}; + +enum rspamd_log_pipe_type { + RSPAMD_LOG_PIPE_SYMBOLS = 0, }; struct rspamd_control_command { @@ -58,6 +64,9 @@ struct rspamd_control_command { gpointer cache_dir; gboolean forced; } hs_loaded; + struct { + enum rspamd_log_pipe_type type; + } log_pipe; struct { guint unused; } fuzzy_stat; @@ -89,6 +98,9 @@ struct rspamd_control_reply { struct { guint status; } hs_loaded; + struct { + guint status; + } log_pipe; struct { guint status; gchar storage_id[MEMPOOL_UID_LEN]; @@ -113,6 +125,9 @@ struct rspamd_srv_command { gpointer cache_dir; gboolean forced; } hs_loaded; + struct { + enum rspamd_log_pipe_type type; + } log_pipe; } cmd; }; diff --git a/src/worker.c b/src/worker.c index 67c262e40..3939db702 100644 --- a/src/worker.c +++ b/src/worker.c @@ -17,6 +17,7 @@ * Rspamd worker implementation */ +#include #include "config.h" #include "libutil/util.h" #include "libutil/map.h" @@ -31,6 +32,7 @@ #include "libstat/stat_api.h" #include "libserver/worker_util.h" #include "libserver/rspamd_control.h" +#include "utlist.h" #include "lua/lua_common.h" @@ -105,6 +107,8 @@ struct rspamd_worker_ctx { struct rspamd_keypair_cache *keys_cache; /* Configuration */ struct rspamd_config *cfg; + /* Log pipe */ + struct rspamd_worker_log_pipe *log_pipes; }; /* @@ -392,6 +396,39 @@ rspamd_worker_hyperscan_ready (struct rspamd_main *rspamd_main, } #endif +static gboolean +rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main, + struct rspamd_worker *worker, gint fd, + gint attached_fd, + struct rspamd_control_command *cmd, + gpointer ud) +{ + struct rspamd_worker_ctx *ctx = ud; + struct rspamd_worker_log_pipe *lp; + struct rspamd_control_reply rep; + + memset (&rep, 0, sizeof (rep)); + rep.type = RSPAMD_CONTROL_LOG_PIPE; + + if (attached_fd != -1) { + lp = g_slice_alloc0 (sizeof (*lp)); + lp->fd = attached_fd; + lp->type = cmd->cmd.log_pipe.type; + + DL_APPEND (ctx->log_pipes, lp); + } + else { + rep.reply.log_pipe.status = ENOENT; + } + + if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) { + msg_err ("cannot write reply to the control socket: %s", + strerror (errno)); + } + + return TRUE; +} + gpointer init_worker (struct rspamd_config *cfg) { @@ -495,6 +532,7 @@ void start_worker (struct rspamd_worker *worker) { struct rspamd_worker_ctx *ctx = worker->ctx; + struct rspamd_worker_log_pipe *lp, *ltmp; ctx->ev_base = rspamd_prepare_worker (worker, "normal", accept_socket); msec_to_tv (ctx->timeout, &ctx->io_tv); @@ -513,10 +551,16 @@ start_worker (struct rspamd_worker *worker) rspamd_stat_init (worker->srv->cfg, ctx->ev_base); #ifdef WITH_HYPERSCAN - rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_HYPERSCAN_LOADED, - rspamd_worker_hyperscan_ready, ctx); + rspamd_control_worker_add_cmd_handler (worker, + RSPAMD_CONTROL_HYPERSCAN_LOADED, + rspamd_worker_hyperscan_ready, + ctx); #endif + rspamd_control_worker_add_cmd_handler (worker, + RSPAMD_CONTROL_LOG_PIPE, + rspamd_worker_log_pipe_handler, + ctx); event_base_loop (ctx->ev_base, 0); rspamd_worker_block_signals (); @@ -530,5 +574,10 @@ start_worker (struct rspamd_worker *worker) rspamd_keypair_cache_destroy (ctx->keys_cache); + DL_FOREACH_SAFE (ctx->log_pipes, lp, ltmp) { + close (lp->fd); + g_slice_free1 (sizeof (*lp), lp); + } + exit (EXIT_SUCCESS); }