aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2017-07-06 08:56:02 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2017-07-06 08:56:02 +0100
commitfd20fa9f63e72df66b2b702cde982f729f704480 (patch)
treebb7dc05025ff92cf09911bea0280ae761b1c8097
parent07b8d8cb2d14a357f196afcb42aa8469b43a6167 (diff)
downloadrspamd-fd20fa9f63e72df66b2b702cde982f729f704480.tar.gz
rspamd-fd20fa9f63e72df66b2b702cde982f729f704480.zip
[Rework] Further steps towards one process monitoring
-rw-r--r--src/libserver/monitored.c42
-rw-r--r--src/libserver/monitored.h26
-rw-r--r--src/libserver/rspamd_control.c16
-rw-r--r--src/libserver/rspamd_control.h16
-rw-r--r--src/worker.c38
5 files changed, 129 insertions, 9 deletions
diff --git a/src/libserver/monitored.c b/src/libserver/monitored.c
index 472c3d4e6..a280298c8 100644
--- a/src/libserver/monitored.c
+++ b/src/libserver/monitored.c
@@ -39,6 +39,7 @@ struct rspamd_monitored_ctx {
struct rdns_resolver *resolver;
struct event_base *ev_base;
GPtrArray *elts;
+ GHashTable *helts;
mon_change_cb change_cb;
gpointer ud;
gdouble monitoring_interval;
@@ -61,7 +62,7 @@ struct rspamd_monitored {
struct rspamd_monitored_ctx *ctx;
struct rspamd_monitored_methods proc;
struct event periodic;
- gchar tag[MEMPOOL_UID_LEN];
+ gchar tag[RSPAMD_MONITORED_TAG_LEN];
};
#define msg_err_mon(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
@@ -352,6 +353,7 @@ rspamd_monitored_ctx_init (void)
ctx->monitoring_interval = default_monitoring_interval;
ctx->max_errors = default_max_errors;
ctx->elts = g_ptr_array_new ();
+ ctx->helts = g_hash_table_new (g_str_hash, g_str_equal);
return ctx;
}
@@ -385,11 +387,12 @@ rspamd_monitored_ctx_config (struct rspamd_monitored_ctx *ctx,
struct rspamd_monitored *
-rspamd_monitored_create (struct rspamd_monitored_ctx *ctx,
+rspamd_monitored_create_ (struct rspamd_monitored_ctx *ctx,
const gchar *line,
enum rspamd_monitored_type type,
enum rspamd_monitored_flags flags,
- const ucl_object_t *opts)
+ const ucl_object_t *opts,
+ const gchar *loc)
{
struct rspamd_monitored *m;
rspamd_cryptobox_hash_state_t st;
@@ -429,9 +432,19 @@ rspamd_monitored_create (struct rspamd_monitored_ctx *ctx,
/* Create a persistent tag */
rspamd_cryptobox_hash_init (&st, NULL, 0);
rspamd_cryptobox_hash_update (&st, m->url, strlen (m->url));
+ rspamd_cryptobox_hash_update (&st, loc, strlen (loc));
rspamd_cryptobox_hash_final (&st, cksum);
cksum_encoded = rspamd_encode_base32 (cksum, sizeof (cksum));
rspamd_strlcpy (m->tag, cksum_encoded, sizeof (m->tag));
+
+ if (g_hash_table_lookup (ctx->helts, m->tag) != NULL) {
+ msg_err ("monitored error: tag collision detected for %s; "
+ "url: %s", m->tag, m->url);
+ }
+ else {
+ g_hash_table_insert (ctx->helts, m->tag, m);
+ }
+
g_free (cksum_encoded);
g_ptr_array_add (ctx->elts, m);
@@ -546,3 +559,26 @@ rspamd_monitored_ctx_destroy (struct rspamd_monitored_ctx *ctx)
g_ptr_array_free (ctx->elts, TRUE);
g_slice_free1 (sizeof (*ctx), ctx);
}
+
+struct rspamd_monitored *
+rspamd_monitored_by_tag (struct rspamd_monitored_ctx *ctx,
+ guchar tag[RSPAMD_MONITORED_TAG_LEN])
+{
+ struct rspamd_monitored *res;
+ gchar rtag[RSPAMD_MONITORED_TAG_LEN];
+
+ rspamd_strlcpy (rtag, tag, sizeof (rtag));
+ res = g_hash_table_lookup (ctx->helts, rtag);
+
+ return res;
+}
+
+
+void
+rspamd_monitored_get_tag (struct rspamd_monitored *m,
+ guchar tag_out[RSPAMD_MONITORED_TAG_LEN])
+{
+ g_assert (m != NULL);
+
+ rspamd_strlcpy (m->tag, tag_out, RSPAMD_MONITORED_TAG_LEN);
+} \ No newline at end of file
diff --git a/src/libserver/monitored.h b/src/libserver/monitored.h
index e342a5886..261227d35 100644
--- a/src/libserver/monitored.h
+++ b/src/libserver/monitored.h
@@ -23,6 +23,8 @@ struct rspamd_monitored;
struct rspamd_monitored_ctx;
struct rspamd_config;
+#define RSPAMD_MONITORED_TAG_LEN 32
+
enum rspamd_monitored_type {
RSPAMD_MONITORED_DNS = 0,
};
@@ -63,12 +65,32 @@ void rspamd_monitored_ctx_config (struct rspamd_monitored_ctx *ctx,
* @param flags specific flags for monitoring
* @return new monitored object
*/
-struct rspamd_monitored *rspamd_monitored_create (
+struct rspamd_monitored *rspamd_monitored_create_ (
struct rspamd_monitored_ctx *ctx,
const gchar *line,
enum rspamd_monitored_type type,
enum rspamd_monitored_flags flags,
- const ucl_object_t *opts);
+ const ucl_object_t *opts,
+ const gchar *loc);
+#define rspamd_monitored_create(ctx, line, type, flags, opts) \
+ rspamd_monitored_create_(ctx, line, type, flags, opts, G_STRFUNC)
+
+/**
+ * Return monitored by its tag
+ * @param ctx
+ * @param tag
+ * @return
+ */
+struct rspamd_monitored * rspamd_monitored_by_tag (struct rspamd_monitored_ctx *ctx,
+ guchar tag[RSPAMD_MONITORED_TAG_LEN]);
+
+/**
+ * Sets `tag_out` to the monitored tag
+ * @param m
+ * @param tag_out
+ */
+void rspamd_monitored_get_tag (struct rspamd_monitored *m,
+ guchar tag_out[RSPAMD_MONITORED_TAG_LEN]);
/**
* Return TRUE if monitored object is alive
diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c
index c78f501f7..5d8008415 100644
--- a/src/libserver/rspamd_control.c
+++ b/src/libserver/rspamd_control.c
@@ -557,6 +557,7 @@ rspamd_control_default_cmd_handler (gint fd,
case RSPAMD_CONTROL_RELOAD:
case RSPAMD_CONTROL_RECOMPILE:
case RSPAMD_CONTROL_HYPERSCAN_LOADED:
+ case RSPAMD_CONTROL_MONITORED_CHANGE:
case RSPAMD_CONTROL_FUZZY_STAT:
case RSPAMD_CONTROL_FUZZY_SYNC:
case RSPAMD_CONTROL_LOG_PIPE:
@@ -807,10 +808,6 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
/* Broadcast command to all workers */
memset (&wcmd, 0, sizeof (wcmd));
wcmd.type = RSPAMD_CONTROL_HYPERSCAN_LOADED;
- /*
- * We assume that cache dir is shared at the same address for all
- * workers
- */
rspamd_strlcpy (wcmd.cmd.hs_loaded.cache_dir,
cmd.cmd.hs_loaded.cache_dir,
sizeof (wcmd.cmd.hs_loaded.cache_dir));
@@ -818,6 +815,17 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
rspamd_control_hs_io_handler, NULL);
break;
+ case RSPAMD_SRV_MONITORED_CHANGE:
+ /* Broadcast command to all workers */
+ memset (&wcmd, 0, sizeof (wcmd));
+ wcmd.type = RSPAMD_CONTROL_MONITORED_CHANGE;
+ rspamd_strlcpy (wcmd.cmd.monitored_change.tag,
+ cmd.cmd.monitored_change.tag,
+ sizeof (wcmd.cmd.monitored_change.tag));
+ wcmd.cmd.monitored_change.alive = cmd.cmd.monitored_change.alive;
+ rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
+ rspamd_control_hs_io_handler, NULL);
+ break;
case RSPAMD_SRV_LOG_PIPE:
memset (&wcmd, 0, sizeof (wcmd));
wcmd.type = RSPAMD_CONTROL_LOG_PIPE;
diff --git a/src/libserver/rspamd_control.h b/src/libserver/rspamd_control.h
index c7a60ef58..fd1395d96 100644
--- a/src/libserver/rspamd_control.h
+++ b/src/libserver/rspamd_control.h
@@ -32,12 +32,14 @@ enum rspamd_control_type {
RSPAMD_CONTROL_LOG_PIPE,
RSPAMD_CONTROL_FUZZY_STAT,
RSPAMD_CONTROL_FUZZY_SYNC,
+ RSPAMD_CONTROL_MONITORED_CHANGE,
RSPAMD_CONTROL_MAX
};
enum rspamd_srv_type {
RSPAMD_SRV_SOCKETPAIR = 0,
RSPAMD_SRV_HYPERSCAN_LOADED,
+ RSPAMD_SRV_MONITORED_CHANGE,
RSPAMD_SRV_LOG_PIPE,
};
@@ -65,6 +67,10 @@ struct rspamd_control_command {
gboolean forced;
} hs_loaded;
struct {
+ gchar tag[32];
+ gboolean alive;
+ } monitored_change;
+ struct {
enum rspamd_log_pipe_type type;
} log_pipe;
struct {
@@ -100,6 +106,9 @@ struct rspamd_control_reply {
} hs_loaded;
struct {
guint status;
+ } monitored_change;
+ struct {
+ guint status;
} log_pipe;
struct {
guint status;
@@ -127,6 +136,10 @@ struct rspamd_srv_command {
gboolean forced;
} hs_loaded;
struct {
+ gchar tag[32];
+ gboolean alive;
+ } monitored_change;
+ struct {
enum rspamd_log_pipe_type type;
} log_pipe;
} cmd;
@@ -143,6 +156,9 @@ struct rspamd_srv_reply {
gint forced;
} hs_loaded;
struct {
+ gint status;
+ };
+ struct {
enum rspamd_log_pipe_type type;
} log_pipe;
} reply;
diff --git a/src/worker.c b/src/worker.c
index 8cf2f5fe8..12a4ae299 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -18,6 +18,7 @@
*/
#include <libserver/rspamd_control.h>
+#include <src/libserver/rspamd_control.h>
#include "config.h"
#include "libutil/util.h"
#include "libutil/map.h"
@@ -487,6 +488,39 @@ rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main,
return TRUE;
}
+static gboolean
+rspamd_worker_monitored_handler (struct rspamd_main *rspamd_main,
+ struct rspamd_worker *worker, gint fd,
+ gint attached_fd,
+ struct rspamd_control_command *cmd,
+ gpointer ud)
+{
+ struct rspamd_control_reply rep;
+ struct rspamd_monitored *m;
+ struct rspamd_monitored_ctx *mctx = worker->srv->cfg->monitored_ctx;
+
+ memset (&rep, 0, sizeof (rep));
+ rep.type = RSPAMD_CONTROL_MONITORED_CHANGE;
+ m = rspamd_monitored_by_tag (mctx, cmd->cmd.monitored_change.tag);
+
+ if (!m) {
+ rspamd_monitored_set_alive (m, cmd->cmd.monitored_change.alive);
+ rep.reply.monitored_change.status = 1;
+ }
+ else {
+ msg_err ("cannot find monitored by tag: %*s", 32,
+ cmd->cmd.monitored_change.tag);
+ rep.reply.monitored_change.status = 0;
+ }
+
+ if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
+ msg_err ("cannot write reply to the control socket: %s",
+ strerror (errno));
+ }
+
+ return TRUE;
+}
+
gpointer
init_worker (struct rspamd_config *cfg)
{
@@ -618,6 +652,10 @@ rspamd_worker_init_scanner (struct rspamd_worker *worker,
RSPAMD_CONTROL_LOG_PIPE,
rspamd_worker_log_pipe_handler,
worker->srv->cfg);
+ rspamd_control_worker_add_cmd_handler (worker,
+ RSPAMD_CONTROL_MONITORED_CHANGE,
+ rspamd_worker_monitored_handler,
+ worker->srv->cfg);
}
/*