struct rdns_resolver *resolver;
struct event_base *ev_base;
GPtrArray *elts;
+ GHashTable *helts;
mon_change_cb change_cb;
gpointer ud;
gdouble monitoring_interval;
struct rspamd_monitored_ctx *ctx;
struct rspamd_monitored_methods proc;
struct event periodic;
- gchar tag[MEMPOOL_UID_LEN];
+ gchar tag[RSPAMD_MONITORED_TAG_LEN];
};
#define msg_err_mon(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
ctx->monitoring_interval = default_monitoring_interval;
ctx->max_errors = default_max_errors;
ctx->elts = g_ptr_array_new ();
+ ctx->helts = g_hash_table_new (g_str_hash, g_str_equal);
return ctx;
}
struct rspamd_monitored *
-rspamd_monitored_create (struct rspamd_monitored_ctx *ctx,
+rspamd_monitored_create_ (struct rspamd_monitored_ctx *ctx,
const gchar *line,
enum rspamd_monitored_type type,
enum rspamd_monitored_flags flags,
- const ucl_object_t *opts)
+ const ucl_object_t *opts,
+ const gchar *loc)
{
struct rspamd_monitored *m;
rspamd_cryptobox_hash_state_t st;
/* Create a persistent tag */
rspamd_cryptobox_hash_init (&st, NULL, 0);
rspamd_cryptobox_hash_update (&st, m->url, strlen (m->url));
+ rspamd_cryptobox_hash_update (&st, loc, strlen (loc));
rspamd_cryptobox_hash_final (&st, cksum);
cksum_encoded = rspamd_encode_base32 (cksum, sizeof (cksum));
rspamd_strlcpy (m->tag, cksum_encoded, sizeof (m->tag));
+
+ if (g_hash_table_lookup (ctx->helts, m->tag) != NULL) {
+ msg_err ("monitored error: tag collision detected for %s; "
+ "url: %s", m->tag, m->url);
+ }
+ else {
+ g_hash_table_insert (ctx->helts, m->tag, m);
+ }
+
g_free (cksum_encoded);
g_ptr_array_add (ctx->elts, m);
g_ptr_array_free (ctx->elts, TRUE);
g_slice_free1 (sizeof (*ctx), ctx);
}
+
+struct rspamd_monitored *
+rspamd_monitored_by_tag (struct rspamd_monitored_ctx *ctx,
+ guchar tag[RSPAMD_MONITORED_TAG_LEN])
+{
+ struct rspamd_monitored *res;
+ gchar rtag[RSPAMD_MONITORED_TAG_LEN];
+
+ rspamd_strlcpy (rtag, tag, sizeof (rtag));
+ res = g_hash_table_lookup (ctx->helts, rtag);
+
+ return res;
+}
+
+
+void
+rspamd_monitored_get_tag (struct rspamd_monitored *m,
+ guchar tag_out[RSPAMD_MONITORED_TAG_LEN])
+{
+ g_assert (m != NULL);
+
+ rspamd_strlcpy (m->tag, tag_out, RSPAMD_MONITORED_TAG_LEN);
+}
\ No newline at end of file
struct rspamd_monitored_ctx;
struct rspamd_config;
+#define RSPAMD_MONITORED_TAG_LEN 32
+
enum rspamd_monitored_type {
RSPAMD_MONITORED_DNS = 0,
};
* @param flags specific flags for monitoring
* @return new monitored object
*/
-struct rspamd_monitored *rspamd_monitored_create (
+struct rspamd_monitored *rspamd_monitored_create_ (
struct rspamd_monitored_ctx *ctx,
const gchar *line,
enum rspamd_monitored_type type,
enum rspamd_monitored_flags flags,
- const ucl_object_t *opts);
+ const ucl_object_t *opts,
+ const gchar *loc);
+#define rspamd_monitored_create(ctx, line, type, flags, opts) \
+ rspamd_monitored_create_(ctx, line, type, flags, opts, G_STRFUNC)
+
+/**
+ * Return monitored by its tag
+ * @param ctx
+ * @param tag
+ * @return
+ */
+struct rspamd_monitored * rspamd_monitored_by_tag (struct rspamd_monitored_ctx *ctx,
+ guchar tag[RSPAMD_MONITORED_TAG_LEN]);
+
+/**
+ * Sets `tag_out` to the monitored tag
+ * @param m
+ * @param tag_out
+ */
+void rspamd_monitored_get_tag (struct rspamd_monitored *m,
+ guchar tag_out[RSPAMD_MONITORED_TAG_LEN]);
/**
* Return TRUE if monitored object is alive
case RSPAMD_CONTROL_RELOAD:
case RSPAMD_CONTROL_RECOMPILE:
case RSPAMD_CONTROL_HYPERSCAN_LOADED:
+ case RSPAMD_CONTROL_MONITORED_CHANGE:
case RSPAMD_CONTROL_FUZZY_STAT:
case RSPAMD_CONTROL_FUZZY_SYNC:
case RSPAMD_CONTROL_LOG_PIPE:
/* Broadcast command to all workers */
memset (&wcmd, 0, sizeof (wcmd));
wcmd.type = RSPAMD_CONTROL_HYPERSCAN_LOADED;
- /*
- * We assume that cache dir is shared at the same address for all
- * workers
- */
rspamd_strlcpy (wcmd.cmd.hs_loaded.cache_dir,
cmd.cmd.hs_loaded.cache_dir,
sizeof (wcmd.cmd.hs_loaded.cache_dir));
rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
rspamd_control_hs_io_handler, NULL);
break;
+ case RSPAMD_SRV_MONITORED_CHANGE:
+ /* Broadcast command to all workers */
+ memset (&wcmd, 0, sizeof (wcmd));
+ wcmd.type = RSPAMD_CONTROL_MONITORED_CHANGE;
+ rspamd_strlcpy (wcmd.cmd.monitored_change.tag,
+ cmd.cmd.monitored_change.tag,
+ sizeof (wcmd.cmd.monitored_change.tag));
+ wcmd.cmd.monitored_change.alive = cmd.cmd.monitored_change.alive;
+ rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
+ rspamd_control_hs_io_handler, NULL);
+ break;
case RSPAMD_SRV_LOG_PIPE:
memset (&wcmd, 0, sizeof (wcmd));
wcmd.type = RSPAMD_CONTROL_LOG_PIPE;
RSPAMD_CONTROL_LOG_PIPE,
RSPAMD_CONTROL_FUZZY_STAT,
RSPAMD_CONTROL_FUZZY_SYNC,
+ RSPAMD_CONTROL_MONITORED_CHANGE,
RSPAMD_CONTROL_MAX
};
enum rspamd_srv_type {
RSPAMD_SRV_SOCKETPAIR = 0,
RSPAMD_SRV_HYPERSCAN_LOADED,
+ RSPAMD_SRV_MONITORED_CHANGE,
RSPAMD_SRV_LOG_PIPE,
};
gchar cache_dir[CONTROL_PATHLEN];
gboolean forced;
} hs_loaded;
+ struct {
+ gchar tag[32];
+ gboolean alive;
+ } monitored_change;
struct {
enum rspamd_log_pipe_type type;
} log_pipe;
struct {
guint status;
} hs_loaded;
+ struct {
+ guint status;
+ } monitored_change;
struct {
guint status;
} log_pipe;
gchar cache_dir[CONTROL_PATHLEN];
gboolean forced;
} hs_loaded;
+ struct {
+ gchar tag[32];
+ gboolean alive;
+ } monitored_change;
struct {
enum rspamd_log_pipe_type type;
} log_pipe;
struct {
gint forced;
} hs_loaded;
+ struct {
+ gint status;
+ };
struct {
enum rspamd_log_pipe_type type;
} log_pipe;
*/
#include <libserver/rspamd_control.h>
+#include <src/libserver/rspamd_control.h>
#include "config.h"
#include "libutil/util.h"
#include "libutil/map.h"
return TRUE;
}
+static gboolean
+rspamd_worker_monitored_handler (struct rspamd_main *rspamd_main,
+ struct rspamd_worker *worker, gint fd,
+ gint attached_fd,
+ struct rspamd_control_command *cmd,
+ gpointer ud)
+{
+ struct rspamd_control_reply rep;
+ struct rspamd_monitored *m;
+ struct rspamd_monitored_ctx *mctx = worker->srv->cfg->monitored_ctx;
+
+ memset (&rep, 0, sizeof (rep));
+ rep.type = RSPAMD_CONTROL_MONITORED_CHANGE;
+ m = rspamd_monitored_by_tag (mctx, cmd->cmd.monitored_change.tag);
+
+ if (!m) {
+ rspamd_monitored_set_alive (m, cmd->cmd.monitored_change.alive);
+ rep.reply.monitored_change.status = 1;
+ }
+ else {
+ msg_err ("cannot find monitored by tag: %*s", 32,
+ cmd->cmd.monitored_change.tag);
+ rep.reply.monitored_change.status = 0;
+ }
+
+ if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
+ msg_err ("cannot write reply to the control socket: %s",
+ strerror (errno));
+ }
+
+ return TRUE;
+}
+
gpointer
init_worker (struct rspamd_config *cfg)
{
RSPAMD_CONTROL_LOG_PIPE,
rspamd_worker_log_pipe_handler,
worker->srv->cfg);
+ rspamd_control_worker_add_cmd_handler (worker,
+ RSPAMD_CONTROL_MONITORED_CHANGE,
+ rspamd_worker_monitored_handler,
+ worker->srv->cfg);
}
/*