msg_err ("cannot write reply to the control socket: %s",
strerror (errno));
}
+
+ if (attached_fd != -1) {
+ close (attached_fd);
+ }
}
static void
g_slice_free1 (sizeof (*elt), elt);
}
+static void
+rspamd_control_log_pipe_io_handler (gint fd, short what, gpointer ud)
+{
+ struct rspamd_control_reply_elt *elt = ud;
+ struct rspamd_control_reply rep;
+
+ /* At this point we just ignore replies from the workers */
+ (void) read (fd, &rep, sizeof (rep));
+ event_del (&elt->io_ev);
+ g_slice_free1 (sizeof (*elt), elt);
+}
+
static void
rspamd_srv_handler (gint fd, short what, gpointer ud)
{
struct cmsghdr *cmsg;
struct iovec iov;
guchar fdspace[CMSG_SPACE(sizeof (int))];
- gint *spair;
+ gint *spair, rfd = -1;
gchar *nid;
struct rspamd_control_command wcmd;
gssize r;
rdata->rep.id = cmd.id;
rdata->rep.type = cmd.type;
rdata->fd = -1;
+ if (msg.msg_controllen >= CMSG_SPACE(sizeof (int))) {
+ rfd = *(int *) CMSG_DATA(CMSG_FIRSTHDR (&msg));
+ }
switch (cmd.type) {
case RSPAMD_SRV_SOCKETPAIR:
*/
wcmd.cmd.hs_loaded.cache_dir = cmd.cmd.hs_loaded.cache_dir;
wcmd.cmd.hs_loaded.forced = cmd.cmd.hs_loaded.forced;
- rspamd_control_broadcast_cmd (srv, &wcmd, -1,
+ rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
rspamd_control_hs_io_handler, NULL);
break;
+ case RSPAMD_SRV_LOG_PIPE:
+ memset (&wcmd, 0, sizeof (wcmd));
+ wcmd.cmd.log_pipe.type = cmd.cmd.log_pipe.type;
+ rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
+ rspamd_control_log_pipe_io_handler, NULL);
default:
msg_err ("unknown command type: %d", cmd.type);
break;
}
+ if (rfd != -1) {
+ /* Close our copy to avoid descriptors leak */
+ close (rfd);
+ }
+
/* Now plan write event and send data back */
event_del (&worker->srv_ev);
event_set (&worker->srv_ev,
RSPAMD_CONTROL_RERESOLVE,
RSPAMD_CONTROL_RECOMPILE,
RSPAMD_CONTROL_HYPERSCAN_LOADED,
+ RSPAMD_CONTROL_LOG_PIPE,
RSPAMD_CONTROL_FUZZY_STAT,
RSPAMD_CONTROL_FUZZY_SYNC,
RSPAMD_CONTROL_MAX
enum rspamd_srv_type {
RSPAMD_SRV_SOCKETPAIR = 0,
RSPAMD_SRV_HYPERSCAN_LOADED,
+ RSPAMD_SRV_LOG_PIPE,
+};
+
+enum rspamd_log_pipe_type {
+ RSPAMD_LOG_PIPE_SYMBOLS = 0,
};
struct rspamd_control_command {
gpointer cache_dir;
gboolean forced;
} hs_loaded;
+ struct {
+ enum rspamd_log_pipe_type type;
+ } log_pipe;
struct {
guint unused;
} fuzzy_stat;
struct {
guint status;
} hs_loaded;
+ struct {
+ guint status;
+ } log_pipe;
struct {
guint status;
gchar storage_id[MEMPOOL_UID_LEN];
gpointer cache_dir;
gboolean forced;
} hs_loaded;
+ struct {
+ enum rspamd_log_pipe_type type;
+ } log_pipe;
} cmd;
};
* Rspamd worker implementation
*/
+#include <libserver/rspamd_control.h>
#include "config.h"
#include "libutil/util.h"
#include "libutil/map.h"
#include "libstat/stat_api.h"
#include "libserver/worker_util.h"
#include "libserver/rspamd_control.h"
+#include "utlist.h"
#include "lua/lua_common.h"
struct rspamd_keypair_cache *keys_cache;
/* Configuration */
struct rspamd_config *cfg;
+ /* Log pipe */
+ struct rspamd_worker_log_pipe *log_pipes;
};
/*
}
#endif
+static gboolean
+rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main,
+ struct rspamd_worker *worker, gint fd,
+ gint attached_fd,
+ struct rspamd_control_command *cmd,
+ gpointer ud)
+{
+ struct rspamd_worker_ctx *ctx = ud;
+ struct rspamd_worker_log_pipe *lp;
+ struct rspamd_control_reply rep;
+
+ memset (&rep, 0, sizeof (rep));
+ rep.type = RSPAMD_CONTROL_LOG_PIPE;
+
+ if (attached_fd != -1) {
+ lp = g_slice_alloc0 (sizeof (*lp));
+ lp->fd = attached_fd;
+ lp->type = cmd->cmd.log_pipe.type;
+
+ DL_APPEND (ctx->log_pipes, lp);
+ }
+ else {
+ rep.reply.log_pipe.status = ENOENT;
+ }
+
+ if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
+ msg_err ("cannot write reply to the control socket: %s",
+ strerror (errno));
+ }
+
+ return TRUE;
+}
+
gpointer
init_worker (struct rspamd_config *cfg)
{
start_worker (struct rspamd_worker *worker)
{
struct rspamd_worker_ctx *ctx = worker->ctx;
+ struct rspamd_worker_log_pipe *lp, *ltmp;
ctx->ev_base = rspamd_prepare_worker (worker, "normal", accept_socket);
msec_to_tv (ctx->timeout, &ctx->io_tv);
rspamd_stat_init (worker->srv->cfg, ctx->ev_base);
#ifdef WITH_HYPERSCAN
- rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_HYPERSCAN_LOADED,
- rspamd_worker_hyperscan_ready, ctx);
+ rspamd_control_worker_add_cmd_handler (worker,
+ RSPAMD_CONTROL_HYPERSCAN_LOADED,
+ rspamd_worker_hyperscan_ready,
+ ctx);
#endif
+ rspamd_control_worker_add_cmd_handler (worker,
+ RSPAMD_CONTROL_LOG_PIPE,
+ rspamd_worker_log_pipe_handler,
+ ctx);
event_base_loop (ctx->ev_base, 0);
rspamd_worker_block_signals ();
rspamd_keypair_cache_destroy (ctx->keys_cache);
+ DL_FOREACH_SAFE (ctx->log_pipes, lp, ltmp) {
+ close (lp->fd);
+ g_slice_free1 (sizeof (*lp), lp);
+ }
+
exit (EXIT_SUCCESS);
}