Browse Source

[Rework] Make log pipes worker agnostic, add scanners API

tags/1.6.0
Vsevolod Stakhov 7 years ago
parent
commit
ffe52ffb1f
5 changed files with 49 additions and 50 deletions
  1. 7
    0
      src/libserver/cfg_file.h
  2. 7
    0
      src/libserver/cfg_utils.c
  3. 3
    12
      src/libserver/protocol.c
  4. 25
    27
      src/worker.c
  5. 7
    11
      src/worker_private.h

+ 7
- 0
src/libserver/cfg_file.h View File

@@ -64,6 +64,12 @@ enum rspamd_log_type {
RSPAMD_LOG_FILE
};

struct rspamd_worker_log_pipe {
gint fd;
gint type;
struct rspamd_worker_log_pipe *prev, *next;
};

/**
* script module list item
*/
@@ -328,6 +334,7 @@ struct rspamd_config {
gboolean log_re_cache; /**< show statistics about regexps */
guint log_error_elts; /**< number of elements in error logbuf */
guint log_error_elt_maxlen; /**< maximum size of error log element */
struct rspamd_worker_log_pipe *log_pipes;

gboolean mlock_statfile_pool; /**< use mlock (2) for locking statfiles */
gboolean compat_messages; /**< use old messages in the protocol (array) */

+ 7
- 0
src/libserver/cfg_utils.c View File

@@ -191,6 +191,7 @@ void
rspamd_config_free (struct rspamd_config *cfg)
{
struct rspamd_config_post_load_script *sc, *sctmp;
struct rspamd_worker_log_pipe *lp, *ltmp;

rspamd_map_remove_all (cfg);

@@ -244,6 +245,12 @@ rspamd_config_free (struct rspamd_config *cfg)
rspamd_mempool_delete (cfg->cfg_pool);
lua_close (cfg->lua_state);
REF_RELEASE (cfg->libs_ctx);

DL_FOREACH_SAFE (cfg->log_pipes, lp, ltmp) {
close (lp->fd);
g_slice_free1 (sizeof (*lp), lp);
}

g_slice_free1 (sizeof (*cfg), cfg);
}


+ 3
- 12
src/libserver/protocol.c View File

@@ -1401,8 +1401,7 @@ end:
}

static void
rspamd_protocol_write_log_pipe (struct rspamd_worker_ctx *ctx,
struct rspamd_task *task)
rspamd_protocol_write_log_pipe (struct rspamd_task *task)
{
struct rspamd_worker_log_pipe *lp;
struct rspamd_protocol_log_message_sum *ls;
@@ -1547,7 +1546,7 @@ rspamd_protocol_write_log_pipe (struct rspamd_worker_ctx *ctx,

nextra = extra->len;

LL_FOREACH (ctx->log_pipes, lp) {
LL_FOREACH (task->cfg->log_pipes, lp) {
if (lp->fd != -1) {
switch (lp->type) {
case RSPAMD_LOG_PIPE_SYMBOLS:
@@ -1628,7 +1627,6 @@ rspamd_protocol_write_reply (struct rspamd_task *task)
{
struct rspamd_http_message *msg;
const gchar *ctype = "application/json";
struct rspamd_abstract_worker_ctx *actx;
rspamd_fstring_t *reply;

msg = rspamd_http_new_message (HTTP_RESPONSE);
@@ -1678,14 +1676,7 @@ rspamd_protocol_write_reply (struct rspamd_task *task)
case CMD_SKIP:
case CMD_CHECK_V2:
rspamd_protocol_http_reply (msg, task);

if (task->worker && task->worker->ctx) {
actx = task->worker->ctx;

if (actx->magic == rspamd_worker_magic) {
rspamd_protocol_write_log_pipe (task->worker->ctx, task);
}
}
rspamd_protocol_write_log_pipe (task);
break;
case CMD_PING:
rspamd_http_message_set_body (msg, "pong" CRLF, 6);

+ 25
- 27
src/worker.c View File

@@ -459,7 +459,7 @@ rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main,
struct rspamd_control_command *cmd,
gpointer ud)
{
struct rspamd_worker_ctx *ctx = ud;
struct rspamd_config *cfg = ud;
struct rspamd_worker_log_pipe *lp;
struct rspamd_control_reply rep;

@@ -471,7 +471,7 @@ rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main,
lp->fd = attached_fd;
lp->type = cmd->cmd.log_pipe.type;

DL_APPEND (ctx->log_pipes, lp);
DL_APPEND (cfg->log_pipes, lp);
msg_info ("added new log pipe");
}
else {
@@ -598,6 +598,28 @@ rspamd_worker_on_terminate (struct rspamd_worker *worker)
return FALSE;
}

void
rspamd_worker_init_scanner (struct rspamd_worker *worker,
struct event_base *ev_base,
struct rspamd_dns_resolver *resolver)
{
rspamd_monitored_ctx_config (worker->srv->cfg->monitored_ctx,
worker->srv->cfg, ev_base, resolver->r);
rspamd_stat_init (worker->srv->cfg, ev_base);
g_ptr_array_add (worker->finish_actions,
(gpointer) rspamd_worker_on_terminate);
#ifdef WITH_HYPERSCAN
rspamd_control_worker_add_cmd_handler (worker,
RSPAMD_CONTROL_HYPERSCAN_LOADED,
rspamd_worker_hyperscan_ready,
NULL);
#endif
rspamd_control_worker_add_cmd_handler (worker,
RSPAMD_CONTROL_LOG_PIPE,
rspamd_worker_log_pipe_handler,
worker->srv->cfg);
}

/*
* Start worker process
*/
@@ -605,7 +627,6 @@ void
start_worker (struct rspamd_worker *worker)
{
struct rspamd_worker_ctx *ctx = worker->ctx;
struct rspamd_worker_log_pipe *lp, *ltmp;

ctx->cfg = worker->srv->cfg;
ctx->ev_base = rspamd_prepare_worker (worker, "normal", accept_socket, TRUE);
@@ -617,42 +638,19 @@ start_worker (struct rspamd_worker *worker)
ctx->ev_base,
worker->srv->cfg);
rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver);

rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
ctx->ev_base, ctx->resolver->r);
rspamd_monitored_ctx_config (worker->srv->cfg->monitored_ctx,
worker->srv->cfg, ctx->ev_base, ctx->resolver->r);

/* XXX: stupid default */
ctx->keys_cache = rspamd_keypair_cache_new (256);
rspamd_stat_init (worker->srv->cfg, ctx->ev_base);
g_ptr_array_add (worker->finish_actions,
(gpointer) rspamd_worker_on_terminate);
rspamd_worker_init_scanner (worker, ctx->ev_base, ctx->resolver);

#ifdef WITH_HYPERSCAN
rspamd_control_worker_add_cmd_handler (worker,
RSPAMD_CONTROL_HYPERSCAN_LOADED,
rspamd_worker_hyperscan_ready,
ctx);
#endif

rspamd_control_worker_add_cmd_handler (worker,
RSPAMD_CONTROL_LOG_PIPE,
rspamd_worker_log_pipe_handler,
ctx);
event_base_loop (ctx->ev_base, 0);
rspamd_worker_block_signals ();

rspamd_stat_close ();
rspamd_log_close (worker->srv->logger);

rspamd_keypair_cache_destroy (ctx->keys_cache);

DL_FOREACH_SAFE (ctx->log_pipes, lp, ltmp) {
close (lp->fd);
g_slice_free1 (sizeof (*lp), lp);
}

REF_RELEASE (ctx->cfg);

exit (EXIT_SUCCESS);

+ 7
- 11
src/worker_private.h View File

@@ -23,15 +23,6 @@
#include "libserver/cfg_file.h"
#include "libserver/rspamd_control.h"

/*
* Worker's context
*/
struct rspamd_worker_log_pipe {
gint fd;
enum rspamd_log_pipe_type type;
struct rspamd_worker_log_pipe *prev, *next;
};

static const guint64 rspamd_worker_magic = 0xb48abc69d601dc1dULL;

struct rspamd_worker_ctx {
@@ -60,8 +51,13 @@ struct rspamd_worker_ctx {
struct rspamd_keypair_cache *keys_cache;
/* Configuration */
struct rspamd_config *cfg;
/* Log pipe */
struct rspamd_worker_log_pipe *log_pipes;
};

/*
* Init scanning routines
*/
void rspamd_worker_init_scanner (struct rspamd_worker *worker,
struct event_base *ev_base,
struct rspamd_dns_resolver *resolver);

#endif

Loading…
Cancel
Save