summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2017-05-02 12:31:10 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2017-05-02 12:31:10 +0100
commitffe52ffb1f0f5250292d24e0da4a0f22c0ed488b (patch)
tree0b58b8a81ac36726fdee0221ab9a5c5807a4c9e5
parentd71409c68eef6eadf741537d145ab022f255203b (diff)
downloadrspamd-ffe52ffb1f0f5250292d24e0da4a0f22c0ed488b.tar.gz
rspamd-ffe52ffb1f0f5250292d24e0da4a0f22c0ed488b.zip
[Rework] Make log pipes worker agnostic, add scanners API
-rw-r--r--src/libserver/cfg_file.h7
-rw-r--r--src/libserver/cfg_utils.c7
-rw-r--r--src/libserver/protocol.c15
-rw-r--r--src/worker.c52
-rw-r--r--src/worker_private.h18
5 files changed, 49 insertions, 50 deletions
diff --git a/src/libserver/cfg_file.h b/src/libserver/cfg_file.h
index 3963b43d8..2db607075 100644
--- a/src/libserver/cfg_file.h
+++ b/src/libserver/cfg_file.h
@@ -64,6 +64,12 @@ enum rspamd_log_type {
RSPAMD_LOG_FILE
};
+struct rspamd_worker_log_pipe {
+ gint fd;
+ gint type;
+ struct rspamd_worker_log_pipe *prev, *next;
+};
+
/**
* script module list item
*/
@@ -328,6 +334,7 @@ struct rspamd_config {
gboolean log_re_cache; /**< show statistics about regexps */
guint log_error_elts; /**< number of elements in error logbuf */
guint log_error_elt_maxlen; /**< maximum size of error log element */
+ struct rspamd_worker_log_pipe *log_pipes;
gboolean mlock_statfile_pool; /**< use mlock (2) for locking statfiles */
gboolean compat_messages; /**< use old messages in the protocol (array) */
diff --git a/src/libserver/cfg_utils.c b/src/libserver/cfg_utils.c
index e8abcd9e6..354e9f082 100644
--- a/src/libserver/cfg_utils.c
+++ b/src/libserver/cfg_utils.c
@@ -191,6 +191,7 @@ void
rspamd_config_free (struct rspamd_config *cfg)
{
struct rspamd_config_post_load_script *sc, *sctmp;
+ struct rspamd_worker_log_pipe *lp, *ltmp;
rspamd_map_remove_all (cfg);
@@ -244,6 +245,12 @@ rspamd_config_free (struct rspamd_config *cfg)
rspamd_mempool_delete (cfg->cfg_pool);
lua_close (cfg->lua_state);
REF_RELEASE (cfg->libs_ctx);
+
+ DL_FOREACH_SAFE (cfg->log_pipes, lp, ltmp) {
+ close (lp->fd);
+ g_slice_free1 (sizeof (*lp), lp);
+ }
+
g_slice_free1 (sizeof (*cfg), cfg);
}
diff --git a/src/libserver/protocol.c b/src/libserver/protocol.c
index 5656d9458..cbf91027c 100644
--- a/src/libserver/protocol.c
+++ b/src/libserver/protocol.c
@@ -1401,8 +1401,7 @@ end:
}
static void
-rspamd_protocol_write_log_pipe (struct rspamd_worker_ctx *ctx,
- struct rspamd_task *task)
+rspamd_protocol_write_log_pipe (struct rspamd_task *task)
{
struct rspamd_worker_log_pipe *lp;
struct rspamd_protocol_log_message_sum *ls;
@@ -1547,7 +1546,7 @@ rspamd_protocol_write_log_pipe (struct rspamd_worker_ctx *ctx,
nextra = extra->len;
- LL_FOREACH (ctx->log_pipes, lp) {
+ LL_FOREACH (task->cfg->log_pipes, lp) {
if (lp->fd != -1) {
switch (lp->type) {
case RSPAMD_LOG_PIPE_SYMBOLS:
@@ -1628,7 +1627,6 @@ rspamd_protocol_write_reply (struct rspamd_task *task)
{
struct rspamd_http_message *msg;
const gchar *ctype = "application/json";
- struct rspamd_abstract_worker_ctx *actx;
rspamd_fstring_t *reply;
msg = rspamd_http_new_message (HTTP_RESPONSE);
@@ -1678,14 +1676,7 @@ rspamd_protocol_write_reply (struct rspamd_task *task)
case CMD_SKIP:
case CMD_CHECK_V2:
rspamd_protocol_http_reply (msg, task);
-
- if (task->worker && task->worker->ctx) {
- actx = task->worker->ctx;
-
- if (actx->magic == rspamd_worker_magic) {
- rspamd_protocol_write_log_pipe (task->worker->ctx, task);
- }
- }
+ rspamd_protocol_write_log_pipe (task);
break;
case CMD_PING:
rspamd_http_message_set_body (msg, "pong" CRLF, 6);
diff --git a/src/worker.c b/src/worker.c
index c611e0c39..724b9ccd2 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -459,7 +459,7 @@ rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main,
struct rspamd_control_command *cmd,
gpointer ud)
{
- struct rspamd_worker_ctx *ctx = ud;
+ struct rspamd_config *cfg = ud;
struct rspamd_worker_log_pipe *lp;
struct rspamd_control_reply rep;
@@ -471,7 +471,7 @@ rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main,
lp->fd = attached_fd;
lp->type = cmd->cmd.log_pipe.type;
- DL_APPEND (ctx->log_pipes, lp);
+ DL_APPEND (cfg->log_pipes, lp);
msg_info ("added new log pipe");
}
else {
@@ -598,6 +598,28 @@ rspamd_worker_on_terminate (struct rspamd_worker *worker)
return FALSE;
}
+void
+rspamd_worker_init_scanner (struct rspamd_worker *worker,
+ struct event_base *ev_base,
+ struct rspamd_dns_resolver *resolver)
+{
+ rspamd_monitored_ctx_config (worker->srv->cfg->monitored_ctx,
+ worker->srv->cfg, ev_base, resolver->r);
+ rspamd_stat_init (worker->srv->cfg, ev_base);
+ g_ptr_array_add (worker->finish_actions,
+ (gpointer) rspamd_worker_on_terminate);
+#ifdef WITH_HYPERSCAN
+ rspamd_control_worker_add_cmd_handler (worker,
+ RSPAMD_CONTROL_HYPERSCAN_LOADED,
+ rspamd_worker_hyperscan_ready,
+ NULL);
+#endif
+ rspamd_control_worker_add_cmd_handler (worker,
+ RSPAMD_CONTROL_LOG_PIPE,
+ rspamd_worker_log_pipe_handler,
+ worker->srv->cfg);
+}
+
/*
* Start worker process
*/
@@ -605,7 +627,6 @@ void
start_worker (struct rspamd_worker *worker)
{
struct rspamd_worker_ctx *ctx = worker->ctx;
- struct rspamd_worker_log_pipe *lp, *ltmp;
ctx->cfg = worker->srv->cfg;
ctx->ev_base = rspamd_prepare_worker (worker, "normal", accept_socket, TRUE);
@@ -617,42 +638,19 @@ start_worker (struct rspamd_worker *worker)
ctx->ev_base,
worker->srv->cfg);
rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver);
-
rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
ctx->ev_base, ctx->resolver->r);
- rspamd_monitored_ctx_config (worker->srv->cfg->monitored_ctx,
- worker->srv->cfg, ctx->ev_base, ctx->resolver->r);
/* XXX: stupid default */
ctx->keys_cache = rspamd_keypair_cache_new (256);
- rspamd_stat_init (worker->srv->cfg, ctx->ev_base);
- g_ptr_array_add (worker->finish_actions,
- (gpointer) rspamd_worker_on_terminate);
+ rspamd_worker_init_scanner (worker, ctx->ev_base, ctx->resolver);
-#ifdef WITH_HYPERSCAN
- rspamd_control_worker_add_cmd_handler (worker,
- RSPAMD_CONTROL_HYPERSCAN_LOADED,
- rspamd_worker_hyperscan_ready,
- ctx);
-#endif
-
- rspamd_control_worker_add_cmd_handler (worker,
- RSPAMD_CONTROL_LOG_PIPE,
- rspamd_worker_log_pipe_handler,
- ctx);
event_base_loop (ctx->ev_base, 0);
rspamd_worker_block_signals ();
rspamd_stat_close ();
rspamd_log_close (worker->srv->logger);
-
rspamd_keypair_cache_destroy (ctx->keys_cache);
-
- DL_FOREACH_SAFE (ctx->log_pipes, lp, ltmp) {
- close (lp->fd);
- g_slice_free1 (sizeof (*lp), lp);
- }
-
REF_RELEASE (ctx->cfg);
exit (EXIT_SUCCESS);
diff --git a/src/worker_private.h b/src/worker_private.h
index 91989cdc0..ac391fc8c 100644
--- a/src/worker_private.h
+++ b/src/worker_private.h
@@ -23,15 +23,6 @@
#include "libserver/cfg_file.h"
#include "libserver/rspamd_control.h"
-/*
- * Worker's context
- */
-struct rspamd_worker_log_pipe {
- gint fd;
- enum rspamd_log_pipe_type type;
- struct rspamd_worker_log_pipe *prev, *next;
-};
-
static const guint64 rspamd_worker_magic = 0xb48abc69d601dc1dULL;
struct rspamd_worker_ctx {
@@ -60,8 +51,13 @@ struct rspamd_worker_ctx {
struct rspamd_keypair_cache *keys_cache;
/* Configuration */
struct rspamd_config *cfg;
- /* Log pipe */
- struct rspamd_worker_log_pipe *log_pipes;
};
+/*
+ * Init scanning routines
+ */
+void rspamd_worker_init_scanner (struct rspamd_worker *worker,
+ struct event_base *ev_base,
+ struct rspamd_dns_resolver *resolver);
+
#endif