]> source.dussan.org Git - rspamd.git/commitdiff
[Rework] Further steps towards one process monitoring
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 6 Jul 2017 07:56:02 +0000 (08:56 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 6 Jul 2017 07:56:02 +0000 (08:56 +0100)
src/libserver/monitored.c
src/libserver/monitored.h
src/libserver/rspamd_control.c
src/libserver/rspamd_control.h
src/worker.c

index 472c3d4e6180351c5639b670d77074fa6ec5a303..a280298c87f808a852ecfe03f6b90db61cae14b7 100644 (file)
@@ -39,6 +39,7 @@ struct rspamd_monitored_ctx {
        struct rdns_resolver *resolver;
        struct event_base *ev_base;
        GPtrArray *elts;
+       GHashTable *helts;
        mon_change_cb change_cb;
        gpointer ud;
        gdouble monitoring_interval;
@@ -61,7 +62,7 @@ struct rspamd_monitored {
        struct rspamd_monitored_ctx *ctx;
        struct rspamd_monitored_methods proc;
        struct event periodic;
-       gchar tag[MEMPOOL_UID_LEN];
+       gchar tag[RSPAMD_MONITORED_TAG_LEN];
 };
 
 #define msg_err_mon(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
@@ -352,6 +353,7 @@ rspamd_monitored_ctx_init (void)
        ctx->monitoring_interval = default_monitoring_interval;
        ctx->max_errors = default_max_errors;
        ctx->elts = g_ptr_array_new ();
+       ctx->helts = g_hash_table_new (g_str_hash, g_str_equal);
 
        return ctx;
 }
@@ -385,11 +387,12 @@ rspamd_monitored_ctx_config (struct rspamd_monitored_ctx *ctx,
 
 
 struct rspamd_monitored *
-rspamd_monitored_create (struct rspamd_monitored_ctx *ctx,
+rspamd_monitored_create_ (struct rspamd_monitored_ctx *ctx,
                const gchar *line,
                enum rspamd_monitored_type type,
                enum rspamd_monitored_flags flags,
-               const ucl_object_t *opts)
+               const ucl_object_t *opts,
+               const gchar *loc)
 {
        struct rspamd_monitored *m;
        rspamd_cryptobox_hash_state_t st;
@@ -429,9 +432,19 @@ rspamd_monitored_create (struct rspamd_monitored_ctx *ctx,
        /* Create a persistent tag */
        rspamd_cryptobox_hash_init (&st, NULL, 0);
        rspamd_cryptobox_hash_update (&st, m->url, strlen (m->url));
+       rspamd_cryptobox_hash_update (&st, loc, strlen (loc));
        rspamd_cryptobox_hash_final (&st, cksum);
        cksum_encoded = rspamd_encode_base32 (cksum, sizeof (cksum));
        rspamd_strlcpy (m->tag, cksum_encoded, sizeof (m->tag));
+
+       if (g_hash_table_lookup (ctx->helts, m->tag) != NULL) {
+               msg_err ("monitored error: tag collision detected for %s; "
+                               "url: %s", m->tag, m->url);
+       }
+       else {
+               g_hash_table_insert (ctx->helts, m->tag, m);
+       }
+
        g_free (cksum_encoded);
 
        g_ptr_array_add (ctx->elts, m);
@@ -546,3 +559,26 @@ rspamd_monitored_ctx_destroy (struct rspamd_monitored_ctx *ctx)
        g_ptr_array_free (ctx->elts, TRUE);
        g_slice_free1 (sizeof (*ctx), ctx);
 }
+
+struct rspamd_monitored *
+rspamd_monitored_by_tag (struct rspamd_monitored_ctx *ctx,
+               guchar tag[RSPAMD_MONITORED_TAG_LEN])
+{
+       struct rspamd_monitored *res;
+       gchar rtag[RSPAMD_MONITORED_TAG_LEN];
+
+       rspamd_strlcpy (rtag, tag, sizeof (rtag));
+       res = g_hash_table_lookup (ctx->helts, rtag);
+
+       return res;
+}
+
+
+void
+rspamd_monitored_get_tag (struct rspamd_monitored *m,
+               guchar tag_out[RSPAMD_MONITORED_TAG_LEN])
+{
+       g_assert (m != NULL);
+
+       rspamd_strlcpy (m->tag, tag_out, RSPAMD_MONITORED_TAG_LEN);
+}
\ No newline at end of file
index e342a5886a30eb375788f1b412d5d860e3bf906e..261227d35c237d773e35f4d3ccb39ba1b80507a5 100644 (file)
@@ -23,6 +23,8 @@ struct rspamd_monitored;
 struct rspamd_monitored_ctx;
 struct rspamd_config;
 
+#define RSPAMD_MONITORED_TAG_LEN 32
+
 enum rspamd_monitored_type {
        RSPAMD_MONITORED_DNS = 0,
 };
@@ -63,12 +65,32 @@ void rspamd_monitored_ctx_config (struct rspamd_monitored_ctx *ctx,
  * @param flags specific flags for monitoring
  * @return new monitored object
  */
-struct rspamd_monitored *rspamd_monitored_create (
+struct rspamd_monitored *rspamd_monitored_create_ (
                struct rspamd_monitored_ctx *ctx,
                const gchar *line,
                enum rspamd_monitored_type type,
                enum rspamd_monitored_flags flags,
-               const ucl_object_t *opts);
+               const ucl_object_t *opts,
+               const gchar *loc);
+#define rspamd_monitored_create(ctx, line, type, flags, opts) \
+       rspamd_monitored_create_(ctx, line, type, flags, opts, G_STRFUNC)
+
+/**
+ * Return monitored by its tag
+ * @param ctx
+ * @param tag
+ * @return
+ */
+struct rspamd_monitored * rspamd_monitored_by_tag (struct rspamd_monitored_ctx *ctx,
+               guchar tag[RSPAMD_MONITORED_TAG_LEN]);
+
+/**
+ * Sets `tag_out` to the monitored tag
+ * @param m
+ * @param tag_out
+ */
+void rspamd_monitored_get_tag (struct rspamd_monitored *m,
+               guchar tag_out[RSPAMD_MONITORED_TAG_LEN]);
 
 /**
  * Return TRUE if monitored object is alive
index c78f501f777e9e9f92e53f099691ef850300a025..5d8008415962b9bb6b10a06d7938df67f5fbd786 100644 (file)
@@ -557,6 +557,7 @@ rspamd_control_default_cmd_handler (gint fd,
        case RSPAMD_CONTROL_RELOAD:
        case RSPAMD_CONTROL_RECOMPILE:
        case RSPAMD_CONTROL_HYPERSCAN_LOADED:
+       case RSPAMD_CONTROL_MONITORED_CHANGE:
        case RSPAMD_CONTROL_FUZZY_STAT:
        case RSPAMD_CONTROL_FUZZY_SYNC:
        case RSPAMD_CONTROL_LOG_PIPE:
@@ -807,10 +808,6 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
                                /* Broadcast command to all workers */
                                memset (&wcmd, 0, sizeof (wcmd));
                                wcmd.type = RSPAMD_CONTROL_HYPERSCAN_LOADED;
-                               /*
-                                * We assume that cache dir is shared at the same address for all
-                                * workers
-                                */
                                rspamd_strlcpy (wcmd.cmd.hs_loaded.cache_dir,
                                                cmd.cmd.hs_loaded.cache_dir,
                                                sizeof (wcmd.cmd.hs_loaded.cache_dir));
@@ -818,6 +815,17 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
                                rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
                                                rspamd_control_hs_io_handler, NULL);
                                break;
+                       case RSPAMD_SRV_MONITORED_CHANGE:
+                               /* Broadcast command to all workers */
+                               memset (&wcmd, 0, sizeof (wcmd));
+                               wcmd.type = RSPAMD_CONTROL_MONITORED_CHANGE;
+                               rspamd_strlcpy (wcmd.cmd.monitored_change.tag,
+                                               cmd.cmd.monitored_change.tag,
+                                               sizeof (wcmd.cmd.monitored_change.tag));
+                               wcmd.cmd.monitored_change.alive = cmd.cmd.monitored_change.alive;
+                               rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
+                                               rspamd_control_hs_io_handler, NULL);
+                               break;
                        case RSPAMD_SRV_LOG_PIPE:
                                memset (&wcmd, 0, sizeof (wcmd));
                                wcmd.type = RSPAMD_CONTROL_LOG_PIPE;
index c7a60ef58375320555393abea7303e1009ddc461..fd1395d96394acf59ead8015a6a722c7066053ad 100644 (file)
@@ -32,12 +32,14 @@ enum rspamd_control_type {
        RSPAMD_CONTROL_LOG_PIPE,
        RSPAMD_CONTROL_FUZZY_STAT,
        RSPAMD_CONTROL_FUZZY_SYNC,
+       RSPAMD_CONTROL_MONITORED_CHANGE,
        RSPAMD_CONTROL_MAX
 };
 
 enum rspamd_srv_type {
        RSPAMD_SRV_SOCKETPAIR = 0,
        RSPAMD_SRV_HYPERSCAN_LOADED,
+       RSPAMD_SRV_MONITORED_CHANGE,
        RSPAMD_SRV_LOG_PIPE,
 };
 
@@ -64,6 +66,10 @@ struct rspamd_control_command {
                        gchar cache_dir[CONTROL_PATHLEN];
                        gboolean forced;
                } hs_loaded;
+               struct {
+                       gchar tag[32];
+                       gboolean alive;
+               } monitored_change;
                struct {
                        enum rspamd_log_pipe_type type;
                } log_pipe;
@@ -98,6 +104,9 @@ struct rspamd_control_reply {
                struct {
                        guint status;
                } hs_loaded;
+               struct {
+                       guint status;
+               } monitored_change;
                struct {
                        guint status;
                } log_pipe;
@@ -126,6 +135,10 @@ struct rspamd_srv_command {
                        gchar cache_dir[CONTROL_PATHLEN];
                        gboolean forced;
                } hs_loaded;
+               struct {
+                       gchar tag[32];
+                       gboolean alive;
+               } monitored_change;
                struct {
                        enum rspamd_log_pipe_type type;
                } log_pipe;
@@ -142,6 +155,9 @@ struct rspamd_srv_reply {
                struct {
                        gint forced;
                } hs_loaded;
+               struct {
+                       gint status;
+               };
                struct {
                        enum rspamd_log_pipe_type type;
                } log_pipe;
index 8cf2f5fe8795533aa7d1754de6686463a8d6c578..12a4ae2992f5f9c56fed0c1347300abe5219d06b 100644 (file)
@@ -18,6 +18,7 @@
  */
 
 #include <libserver/rspamd_control.h>
+#include <src/libserver/rspamd_control.h>
 #include "config.h"
 #include "libutil/util.h"
 #include "libutil/map.h"
@@ -487,6 +488,39 @@ rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main,
        return TRUE;
 }
 
+static gboolean
+rspamd_worker_monitored_handler (struct rspamd_main *rspamd_main,
+               struct rspamd_worker *worker, gint fd,
+               gint attached_fd,
+               struct rspamd_control_command *cmd,
+               gpointer ud)
+{
+       struct rspamd_control_reply rep;
+       struct rspamd_monitored *m;
+       struct rspamd_monitored_ctx *mctx = worker->srv->cfg->monitored_ctx;
+
+       memset (&rep, 0, sizeof (rep));
+       rep.type = RSPAMD_CONTROL_MONITORED_CHANGE;
+       m = rspamd_monitored_by_tag (mctx, cmd->cmd.monitored_change.tag);
+
+       if (!m) {
+               rspamd_monitored_set_alive (m, cmd->cmd.monitored_change.alive);
+               rep.reply.monitored_change.status = 1;
+       }
+       else {
+               msg_err ("cannot find monitored by tag: %*s", 32,
+                               cmd->cmd.monitored_change.tag);
+               rep.reply.monitored_change.status = 0;
+       }
+
+       if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
+               msg_err ("cannot write reply to the control socket: %s",
+                               strerror (errno));
+       }
+
+       return TRUE;
+}
+
 gpointer
 init_worker (struct rspamd_config *cfg)
 {
@@ -618,6 +652,10 @@ rspamd_worker_init_scanner (struct rspamd_worker *worker,
                        RSPAMD_CONTROL_LOG_PIPE,
                        rspamd_worker_log_pipe_handler,
                        worker->srv->cfg);
+       rspamd_control_worker_add_cmd_handler (worker,
+                       RSPAMD_CONTROL_MONITORED_CHANGE,
+                       rspamd_worker_monitored_handler,
+                       worker->srv->cfg);
 }
 
 /*