]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Implement log pipe feature for rspamd logs analysis
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 4 Apr 2016 13:24:07 +0000 (14:24 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 4 Apr 2016 13:24:07 +0000 (14:24 +0100)
src/libserver/rspamd_control.c
src/libserver/rspamd_control.h
src/worker.c

index 6ed9d15029f974447616c2d7921baf5955cbf00a..cc336174e48d4925362c71f1680bf080ce9c3c94 100644 (file)
@@ -577,6 +577,10 @@ rspamd_control_default_cmd_handler (gint fd,
                msg_err ("cannot write reply to the control socket: %s",
                                strerror (errno));
        }
+
+       if (attached_fd != -1) {
+               close (attached_fd);
+       }
 }
 
 static void
@@ -697,6 +701,18 @@ rspamd_control_hs_io_handler (gint fd, short what, gpointer ud)
        g_slice_free1 (sizeof (*elt), elt);
 }
 
+static void
+rspamd_control_log_pipe_io_handler (gint fd, short what, gpointer ud)
+{
+       struct rspamd_control_reply_elt *elt = ud;
+       struct rspamd_control_reply rep;
+
+       /* At this point we just ignore replies from the workers */
+       (void) read (fd, &rep, sizeof (rep));
+       event_del (&elt->io_ev);
+       g_slice_free1 (sizeof (*elt), elt);
+}
+
 static void
 rspamd_srv_handler (gint fd, short what, gpointer ud)
 {
@@ -708,7 +724,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
        struct cmsghdr *cmsg;
        struct iovec iov;
        guchar fdspace[CMSG_SPACE(sizeof (int))];
-       gint *spair;
+       gint *spair, rfd = -1;
        gchar *nid;
        struct rspamd_control_command wcmd;
        gssize r;
@@ -747,6 +763,9 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
                        rdata->rep.id = cmd.id;
                        rdata->rep.type = cmd.type;
                        rdata->fd = -1;
+                       if (msg.msg_controllen >= CMSG_SPACE(sizeof (int))) {
+                               rfd = *(int *) CMSG_DATA(CMSG_FIRSTHDR (&msg));
+                       }
 
                        switch (cmd.type) {
                        case RSPAMD_SRV_SOCKETPAIR:
@@ -782,14 +801,24 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
                                 */
                                wcmd.cmd.hs_loaded.cache_dir = cmd.cmd.hs_loaded.cache_dir;
                                wcmd.cmd.hs_loaded.forced = cmd.cmd.hs_loaded.forced;
-                               rspamd_control_broadcast_cmd (srv, &wcmd, -1,
+                               rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
                                                rspamd_control_hs_io_handler, NULL);
                                break;
+                       case RSPAMD_SRV_LOG_PIPE:
+                               memset (&wcmd, 0, sizeof (wcmd));
+                               wcmd.cmd.log_pipe.type = cmd.cmd.log_pipe.type;
+                               rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
+                                               rspamd_control_log_pipe_io_handler, NULL);
                        default:
                                msg_err ("unknown command type: %d", cmd.type);
                                break;
                        }
 
+                       if (rfd != -1) {
+                               /* Close our copy to avoid descriptors leak */
+                               close (rfd);
+                       }
+
                        /* Now plan write event and send data back */
                        event_del (&worker->srv_ev);
                        event_set (&worker->srv_ev,
index dbe1001d67163489341681079c71b9a9b7271b41..fd24b856befa0108a7d0c9fbc80af01278808ae9 100644 (file)
@@ -29,6 +29,7 @@ enum rspamd_control_type {
        RSPAMD_CONTROL_RERESOLVE,
        RSPAMD_CONTROL_RECOMPILE,
        RSPAMD_CONTROL_HYPERSCAN_LOADED,
+       RSPAMD_CONTROL_LOG_PIPE,
        RSPAMD_CONTROL_FUZZY_STAT,
        RSPAMD_CONTROL_FUZZY_SYNC,
        RSPAMD_CONTROL_MAX
@@ -37,6 +38,11 @@ enum rspamd_control_type {
 enum rspamd_srv_type {
        RSPAMD_SRV_SOCKETPAIR = 0,
        RSPAMD_SRV_HYPERSCAN_LOADED,
+       RSPAMD_SRV_LOG_PIPE,
+};
+
+enum rspamd_log_pipe_type {
+       RSPAMD_LOG_PIPE_SYMBOLS = 0,
 };
 
 struct rspamd_control_command {
@@ -58,6 +64,9 @@ struct rspamd_control_command {
                        gpointer cache_dir;
                        gboolean forced;
                } hs_loaded;
+               struct {
+                       enum rspamd_log_pipe_type type;
+               } log_pipe;
                struct {
                        guint unused;
                } fuzzy_stat;
@@ -89,6 +98,9 @@ struct rspamd_control_reply {
                struct {
                        guint status;
                } hs_loaded;
+               struct {
+                       guint status;
+               } log_pipe;
                struct {
                        guint status;
                        gchar storage_id[MEMPOOL_UID_LEN];
@@ -113,6 +125,9 @@ struct rspamd_srv_command {
                        gpointer cache_dir;
                        gboolean forced;
                } hs_loaded;
+               struct {
+                       enum rspamd_log_pipe_type type;
+               } log_pipe;
        } cmd;
 };
 
index 67c262e4038ae76b7dff43c3097d8730988e60d0..3939db7022e9e6c48063ac4d19651075f057369a 100644 (file)
@@ -17,6 +17,7 @@
  * Rspamd worker implementation
  */
 
+#include <libserver/rspamd_control.h>
 #include "config.h"
 #include "libutil/util.h"
 #include "libutil/map.h"
@@ -31,6 +32,7 @@
 #include "libstat/stat_api.h"
 #include "libserver/worker_util.h"
 #include "libserver/rspamd_control.h"
+#include "utlist.h"
 
 #include "lua/lua_common.h"
 
@@ -105,6 +107,8 @@ struct rspamd_worker_ctx {
        struct rspamd_keypair_cache *keys_cache;
        /* Configuration */
        struct rspamd_config *cfg;
+       /* Log pipe */
+       struct rspamd_worker_log_pipe *log_pipes;
 };
 
 /*
@@ -392,6 +396,39 @@ rspamd_worker_hyperscan_ready (struct rspamd_main *rspamd_main,
 }
 #endif
 
+static gboolean
+rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main,
+               struct rspamd_worker *worker, gint fd,
+               gint attached_fd,
+               struct rspamd_control_command *cmd,
+               gpointer ud)
+{
+       struct rspamd_worker_ctx *ctx = ud;
+       struct rspamd_worker_log_pipe *lp;
+       struct rspamd_control_reply rep;
+
+       memset (&rep, 0, sizeof (rep));
+       rep.type = RSPAMD_CONTROL_LOG_PIPE;
+
+       if (attached_fd != -1) {
+               lp = g_slice_alloc0 (sizeof (*lp));
+               lp->fd = attached_fd;
+               lp->type = cmd->cmd.log_pipe.type;
+
+               DL_APPEND (ctx->log_pipes, lp);
+       }
+       else {
+               rep.reply.log_pipe.status = ENOENT;
+       }
+
+       if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
+               msg_err ("cannot write reply to the control socket: %s",
+                               strerror (errno));
+       }
+
+       return TRUE;
+}
+
 gpointer
 init_worker (struct rspamd_config *cfg)
 {
@@ -495,6 +532,7 @@ void
 start_worker (struct rspamd_worker *worker)
 {
        struct rspamd_worker_ctx *ctx = worker->ctx;
+       struct rspamd_worker_log_pipe *lp, *ltmp;
 
        ctx->ev_base = rspamd_prepare_worker (worker, "normal", accept_socket);
        msec_to_tv (ctx->timeout, &ctx->io_tv);
@@ -513,10 +551,16 @@ start_worker (struct rspamd_worker *worker)
        rspamd_stat_init (worker->srv->cfg, ctx->ev_base);
 
 #ifdef WITH_HYPERSCAN
-       rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_HYPERSCAN_LOADED,
-                       rspamd_worker_hyperscan_ready, ctx);
+       rspamd_control_worker_add_cmd_handler (worker,
+                       RSPAMD_CONTROL_HYPERSCAN_LOADED,
+                       rspamd_worker_hyperscan_ready,
+                       ctx);
 #endif
 
+       rspamd_control_worker_add_cmd_handler (worker,
+                       RSPAMD_CONTROL_LOG_PIPE,
+                       rspamd_worker_log_pipe_handler,
+                       ctx);
        event_base_loop (ctx->ev_base, 0);
        rspamd_worker_block_signals ();
 
@@ -530,5 +574,10 @@ start_worker (struct rspamd_worker *worker)
 
        rspamd_keypair_cache_destroy (ctx->keys_cache);
 
+       DL_FOREACH_SAFE (ctx->log_pipes, lp, ltmp) {
+               close (lp->fd);
+               g_slice_free1 (sizeof (*lp), lp);
+       }
+
        exit (EXIT_SUCCESS);
 }