From fd20fa9f63e72df66b2b702cde982f729f704480 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Thu, 6 Jul 2017 08:56:02 +0100 Subject: [PATCH] [Rework] Further steps towards one process monitoring --- src/libserver/monitored.c | 42 +++++++++++++++++++++++++++++++--- src/libserver/monitored.h | 26 +++++++++++++++++++-- src/libserver/rspamd_control.c | 16 +++++++++---- src/libserver/rspamd_control.h | 16 +++++++++++++ src/worker.c | 38 ++++++++++++++++++++++++++++++ 5 files changed, 129 insertions(+), 9 deletions(-) diff --git a/src/libserver/monitored.c b/src/libserver/monitored.c index 472c3d4e6..a280298c8 100644 --- a/src/libserver/monitored.c +++ b/src/libserver/monitored.c @@ -39,6 +39,7 @@ struct rspamd_monitored_ctx { struct rdns_resolver *resolver; struct event_base *ev_base; GPtrArray *elts; + GHashTable *helts; mon_change_cb change_cb; gpointer ud; gdouble monitoring_interval; @@ -61,7 +62,7 @@ struct rspamd_monitored { struct rspamd_monitored_ctx *ctx; struct rspamd_monitored_methods proc; struct event periodic; - gchar tag[MEMPOOL_UID_LEN]; + gchar tag[RSPAMD_MONITORED_TAG_LEN]; }; #define msg_err_mon(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ @@ -352,6 +353,7 @@ rspamd_monitored_ctx_init (void) ctx->monitoring_interval = default_monitoring_interval; ctx->max_errors = default_max_errors; ctx->elts = g_ptr_array_new (); + ctx->helts = g_hash_table_new (g_str_hash, g_str_equal); return ctx; } @@ -385,11 +387,12 @@ rspamd_monitored_ctx_config (struct rspamd_monitored_ctx *ctx, struct rspamd_monitored * -rspamd_monitored_create (struct rspamd_monitored_ctx *ctx, +rspamd_monitored_create_ (struct rspamd_monitored_ctx *ctx, const gchar *line, enum rspamd_monitored_type type, enum rspamd_monitored_flags flags, - const ucl_object_t *opts) + const ucl_object_t *opts, + const gchar *loc) { struct rspamd_monitored *m; rspamd_cryptobox_hash_state_t st; @@ -429,9 +432,19 @@ rspamd_monitored_create (struct rspamd_monitored_ctx *ctx, /* Create a persistent tag */ rspamd_cryptobox_hash_init (&st, NULL, 0); rspamd_cryptobox_hash_update (&st, m->url, strlen (m->url)); + rspamd_cryptobox_hash_update (&st, loc, strlen (loc)); rspamd_cryptobox_hash_final (&st, cksum); cksum_encoded = rspamd_encode_base32 (cksum, sizeof (cksum)); rspamd_strlcpy (m->tag, cksum_encoded, sizeof (m->tag)); + + if (g_hash_table_lookup (ctx->helts, m->tag) != NULL) { + msg_err ("monitored error: tag collision detected for %s; " + "url: %s", m->tag, m->url); + } + else { + g_hash_table_insert (ctx->helts, m->tag, m); + } + g_free (cksum_encoded); g_ptr_array_add (ctx->elts, m); @@ -546,3 +559,26 @@ rspamd_monitored_ctx_destroy (struct rspamd_monitored_ctx *ctx) g_ptr_array_free (ctx->elts, TRUE); g_slice_free1 (sizeof (*ctx), ctx); } + +struct rspamd_monitored * +rspamd_monitored_by_tag (struct rspamd_monitored_ctx *ctx, + guchar tag[RSPAMD_MONITORED_TAG_LEN]) +{ + struct rspamd_monitored *res; + gchar rtag[RSPAMD_MONITORED_TAG_LEN]; + + rspamd_strlcpy (rtag, tag, sizeof (rtag)); + res = g_hash_table_lookup (ctx->helts, rtag); + + return res; +} + + +void +rspamd_monitored_get_tag (struct rspamd_monitored *m, + guchar tag_out[RSPAMD_MONITORED_TAG_LEN]) +{ + g_assert (m != NULL); + + rspamd_strlcpy (m->tag, tag_out, RSPAMD_MONITORED_TAG_LEN); +} \ No newline at end of file diff --git a/src/libserver/monitored.h b/src/libserver/monitored.h index e342a5886..261227d35 100644 --- a/src/libserver/monitored.h +++ b/src/libserver/monitored.h @@ -23,6 +23,8 @@ struct rspamd_monitored; struct rspamd_monitored_ctx; struct rspamd_config; +#define RSPAMD_MONITORED_TAG_LEN 32 + enum rspamd_monitored_type { RSPAMD_MONITORED_DNS = 0, }; @@ -63,12 +65,32 @@ void rspamd_monitored_ctx_config (struct rspamd_monitored_ctx *ctx, * @param flags specific flags for monitoring * @return new monitored object */ -struct rspamd_monitored *rspamd_monitored_create ( +struct rspamd_monitored *rspamd_monitored_create_ ( struct rspamd_monitored_ctx *ctx, const gchar *line, enum rspamd_monitored_type type, enum rspamd_monitored_flags flags, - const ucl_object_t *opts); + const ucl_object_t *opts, + const gchar *loc); +#define rspamd_monitored_create(ctx, line, type, flags, opts) \ + rspamd_monitored_create_(ctx, line, type, flags, opts, G_STRFUNC) + +/** + * Return monitored by its tag + * @param ctx + * @param tag + * @return + */ +struct rspamd_monitored * rspamd_monitored_by_tag (struct rspamd_monitored_ctx *ctx, + guchar tag[RSPAMD_MONITORED_TAG_LEN]); + +/** + * Sets `tag_out` to the monitored tag + * @param m + * @param tag_out + */ +void rspamd_monitored_get_tag (struct rspamd_monitored *m, + guchar tag_out[RSPAMD_MONITORED_TAG_LEN]); /** * Return TRUE if monitored object is alive diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c index c78f501f7..5d8008415 100644 --- a/src/libserver/rspamd_control.c +++ b/src/libserver/rspamd_control.c @@ -557,6 +557,7 @@ rspamd_control_default_cmd_handler (gint fd, case RSPAMD_CONTROL_RELOAD: case RSPAMD_CONTROL_RECOMPILE: case RSPAMD_CONTROL_HYPERSCAN_LOADED: + case RSPAMD_CONTROL_MONITORED_CHANGE: case RSPAMD_CONTROL_FUZZY_STAT: case RSPAMD_CONTROL_FUZZY_SYNC: case RSPAMD_CONTROL_LOG_PIPE: @@ -807,10 +808,6 @@ rspamd_srv_handler (gint fd, short what, gpointer ud) /* Broadcast command to all workers */ memset (&wcmd, 0, sizeof (wcmd)); wcmd.type = RSPAMD_CONTROL_HYPERSCAN_LOADED; - /* - * We assume that cache dir is shared at the same address for all - * workers - */ rspamd_strlcpy (wcmd.cmd.hs_loaded.cache_dir, cmd.cmd.hs_loaded.cache_dir, sizeof (wcmd.cmd.hs_loaded.cache_dir)); @@ -818,6 +815,17 @@ rspamd_srv_handler (gint fd, short what, gpointer ud) rspamd_control_broadcast_cmd (srv, &wcmd, rfd, rspamd_control_hs_io_handler, NULL); break; + case RSPAMD_SRV_MONITORED_CHANGE: + /* Broadcast command to all workers */ + memset (&wcmd, 0, sizeof (wcmd)); + wcmd.type = RSPAMD_CONTROL_MONITORED_CHANGE; + rspamd_strlcpy (wcmd.cmd.monitored_change.tag, + cmd.cmd.monitored_change.tag, + sizeof (wcmd.cmd.monitored_change.tag)); + wcmd.cmd.monitored_change.alive = cmd.cmd.monitored_change.alive; + rspamd_control_broadcast_cmd (srv, &wcmd, rfd, + rspamd_control_hs_io_handler, NULL); + break; case RSPAMD_SRV_LOG_PIPE: memset (&wcmd, 0, sizeof (wcmd)); wcmd.type = RSPAMD_CONTROL_LOG_PIPE; diff --git a/src/libserver/rspamd_control.h b/src/libserver/rspamd_control.h index c7a60ef58..fd1395d96 100644 --- a/src/libserver/rspamd_control.h +++ b/src/libserver/rspamd_control.h @@ -32,12 +32,14 @@ enum rspamd_control_type { RSPAMD_CONTROL_LOG_PIPE, RSPAMD_CONTROL_FUZZY_STAT, RSPAMD_CONTROL_FUZZY_SYNC, + RSPAMD_CONTROL_MONITORED_CHANGE, RSPAMD_CONTROL_MAX }; enum rspamd_srv_type { RSPAMD_SRV_SOCKETPAIR = 0, RSPAMD_SRV_HYPERSCAN_LOADED, + RSPAMD_SRV_MONITORED_CHANGE, RSPAMD_SRV_LOG_PIPE, }; @@ -64,6 +66,10 @@ struct rspamd_control_command { gchar cache_dir[CONTROL_PATHLEN]; gboolean forced; } hs_loaded; + struct { + gchar tag[32]; + gboolean alive; + } monitored_change; struct { enum rspamd_log_pipe_type type; } log_pipe; @@ -98,6 +104,9 @@ struct rspamd_control_reply { struct { guint status; } hs_loaded; + struct { + guint status; + } monitored_change; struct { guint status; } log_pipe; @@ -126,6 +135,10 @@ struct rspamd_srv_command { gchar cache_dir[CONTROL_PATHLEN]; gboolean forced; } hs_loaded; + struct { + gchar tag[32]; + gboolean alive; + } monitored_change; struct { enum rspamd_log_pipe_type type; } log_pipe; @@ -142,6 +155,9 @@ struct rspamd_srv_reply { struct { gint forced; } hs_loaded; + struct { + gint status; + }; struct { enum rspamd_log_pipe_type type; } log_pipe; diff --git a/src/worker.c b/src/worker.c index 8cf2f5fe8..12a4ae299 100644 --- a/src/worker.c +++ b/src/worker.c @@ -18,6 +18,7 @@ */ #include +#include #include "config.h" #include "libutil/util.h" #include "libutil/map.h" @@ -487,6 +488,39 @@ rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main, return TRUE; } +static gboolean +rspamd_worker_monitored_handler (struct rspamd_main *rspamd_main, + struct rspamd_worker *worker, gint fd, + gint attached_fd, + struct rspamd_control_command *cmd, + gpointer ud) +{ + struct rspamd_control_reply rep; + struct rspamd_monitored *m; + struct rspamd_monitored_ctx *mctx = worker->srv->cfg->monitored_ctx; + + memset (&rep, 0, sizeof (rep)); + rep.type = RSPAMD_CONTROL_MONITORED_CHANGE; + m = rspamd_monitored_by_tag (mctx, cmd->cmd.monitored_change.tag); + + if (!m) { + rspamd_monitored_set_alive (m, cmd->cmd.monitored_change.alive); + rep.reply.monitored_change.status = 1; + } + else { + msg_err ("cannot find monitored by tag: %*s", 32, + cmd->cmd.monitored_change.tag); + rep.reply.monitored_change.status = 0; + } + + if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) { + msg_err ("cannot write reply to the control socket: %s", + strerror (errno)); + } + + return TRUE; +} + gpointer init_worker (struct rspamd_config *cfg) { @@ -618,6 +652,10 @@ rspamd_worker_init_scanner (struct rspamd_worker *worker, RSPAMD_CONTROL_LOG_PIPE, rspamd_worker_log_pipe_handler, worker->srv->cfg); + rspamd_control_worker_add_cmd_handler (worker, + RSPAMD_CONTROL_MONITORED_CHANGE, + rspamd_worker_monitored_handler, + worker->srv->cfg); } /* -- 2.39.5