Browse Source

[Rework] Further steps towards one process monitoring

tags/1.6.2
Vsevolod Stakhov 7 years ago
parent
commit
fd20fa9f63
5 changed files with 129 additions and 9 deletions
  1. 39
    3
      src/libserver/monitored.c
  2. 24
    2
      src/libserver/monitored.h
  3. 12
    4
      src/libserver/rspamd_control.c
  4. 16
    0
      src/libserver/rspamd_control.h
  5. 38
    0
      src/worker.c

+ 39
- 3
src/libserver/monitored.c View File

@@ -39,6 +39,7 @@ struct rspamd_monitored_ctx {
struct rdns_resolver *resolver;
struct event_base *ev_base;
GPtrArray *elts;
GHashTable *helts;
mon_change_cb change_cb;
gpointer ud;
gdouble monitoring_interval;
@@ -61,7 +62,7 @@ struct rspamd_monitored {
struct rspamd_monitored_ctx *ctx;
struct rspamd_monitored_methods proc;
struct event periodic;
gchar tag[MEMPOOL_UID_LEN];
gchar tag[RSPAMD_MONITORED_TAG_LEN];
};

#define msg_err_mon(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
@@ -352,6 +353,7 @@ rspamd_monitored_ctx_init (void)
ctx->monitoring_interval = default_monitoring_interval;
ctx->max_errors = default_max_errors;
ctx->elts = g_ptr_array_new ();
ctx->helts = g_hash_table_new (g_str_hash, g_str_equal);

return ctx;
}
@@ -385,11 +387,12 @@ rspamd_monitored_ctx_config (struct rspamd_monitored_ctx *ctx,


struct rspamd_monitored *
rspamd_monitored_create (struct rspamd_monitored_ctx *ctx,
rspamd_monitored_create_ (struct rspamd_monitored_ctx *ctx,
const gchar *line,
enum rspamd_monitored_type type,
enum rspamd_monitored_flags flags,
const ucl_object_t *opts)
const ucl_object_t *opts,
const gchar *loc)
{
struct rspamd_monitored *m;
rspamd_cryptobox_hash_state_t st;
@@ -429,9 +432,19 @@ rspamd_monitored_create (struct rspamd_monitored_ctx *ctx,
/* Create a persistent tag */
rspamd_cryptobox_hash_init (&st, NULL, 0);
rspamd_cryptobox_hash_update (&st, m->url, strlen (m->url));
rspamd_cryptobox_hash_update (&st, loc, strlen (loc));
rspamd_cryptobox_hash_final (&st, cksum);
cksum_encoded = rspamd_encode_base32 (cksum, sizeof (cksum));
rspamd_strlcpy (m->tag, cksum_encoded, sizeof (m->tag));

if (g_hash_table_lookup (ctx->helts, m->tag) != NULL) {
msg_err ("monitored error: tag collision detected for %s; "
"url: %s", m->tag, m->url);
}
else {
g_hash_table_insert (ctx->helts, m->tag, m);
}

g_free (cksum_encoded);

g_ptr_array_add (ctx->elts, m);
@@ -546,3 +559,26 @@ rspamd_monitored_ctx_destroy (struct rspamd_monitored_ctx *ctx)
g_ptr_array_free (ctx->elts, TRUE);
g_slice_free1 (sizeof (*ctx), ctx);
}

struct rspamd_monitored *
rspamd_monitored_by_tag (struct rspamd_monitored_ctx *ctx,
guchar tag[RSPAMD_MONITORED_TAG_LEN])
{
struct rspamd_monitored *res;
gchar rtag[RSPAMD_MONITORED_TAG_LEN];

rspamd_strlcpy (rtag, tag, sizeof (rtag));
res = g_hash_table_lookup (ctx->helts, rtag);

return res;
}


void
rspamd_monitored_get_tag (struct rspamd_monitored *m,
guchar tag_out[RSPAMD_MONITORED_TAG_LEN])
{
g_assert (m != NULL);

rspamd_strlcpy (m->tag, tag_out, RSPAMD_MONITORED_TAG_LEN);
}

+ 24
- 2
src/libserver/monitored.h View File

@@ -23,6 +23,8 @@ struct rspamd_monitored;
struct rspamd_monitored_ctx;
struct rspamd_config;

#define RSPAMD_MONITORED_TAG_LEN 32

enum rspamd_monitored_type {
RSPAMD_MONITORED_DNS = 0,
};
@@ -63,12 +65,32 @@ void rspamd_monitored_ctx_config (struct rspamd_monitored_ctx *ctx,
* @param flags specific flags for monitoring
* @return new monitored object
*/
struct rspamd_monitored *rspamd_monitored_create (
struct rspamd_monitored *rspamd_monitored_create_ (
struct rspamd_monitored_ctx *ctx,
const gchar *line,
enum rspamd_monitored_type type,
enum rspamd_monitored_flags flags,
const ucl_object_t *opts);
const ucl_object_t *opts,
const gchar *loc);
#define rspamd_monitored_create(ctx, line, type, flags, opts) \
rspamd_monitored_create_(ctx, line, type, flags, opts, G_STRFUNC)

/**
* Return monitored by its tag
* @param ctx
* @param tag
* @return
*/
struct rspamd_monitored * rspamd_monitored_by_tag (struct rspamd_monitored_ctx *ctx,
guchar tag[RSPAMD_MONITORED_TAG_LEN]);

/**
* Sets `tag_out` to the monitored tag
* @param m
* @param tag_out
*/
void rspamd_monitored_get_tag (struct rspamd_monitored *m,
guchar tag_out[RSPAMD_MONITORED_TAG_LEN]);

/**
* Return TRUE if monitored object is alive

+ 12
- 4
src/libserver/rspamd_control.c View File

@@ -557,6 +557,7 @@ rspamd_control_default_cmd_handler (gint fd,
case RSPAMD_CONTROL_RELOAD:
case RSPAMD_CONTROL_RECOMPILE:
case RSPAMD_CONTROL_HYPERSCAN_LOADED:
case RSPAMD_CONTROL_MONITORED_CHANGE:
case RSPAMD_CONTROL_FUZZY_STAT:
case RSPAMD_CONTROL_FUZZY_SYNC:
case RSPAMD_CONTROL_LOG_PIPE:
@@ -807,10 +808,6 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
/* Broadcast command to all workers */
memset (&wcmd, 0, sizeof (wcmd));
wcmd.type = RSPAMD_CONTROL_HYPERSCAN_LOADED;
/*
* We assume that cache dir is shared at the same address for all
* workers
*/
rspamd_strlcpy (wcmd.cmd.hs_loaded.cache_dir,
cmd.cmd.hs_loaded.cache_dir,
sizeof (wcmd.cmd.hs_loaded.cache_dir));
@@ -818,6 +815,17 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
rspamd_control_hs_io_handler, NULL);
break;
case RSPAMD_SRV_MONITORED_CHANGE:
/* Broadcast command to all workers */
memset (&wcmd, 0, sizeof (wcmd));
wcmd.type = RSPAMD_CONTROL_MONITORED_CHANGE;
rspamd_strlcpy (wcmd.cmd.monitored_change.tag,
cmd.cmd.monitored_change.tag,
sizeof (wcmd.cmd.monitored_change.tag));
wcmd.cmd.monitored_change.alive = cmd.cmd.monitored_change.alive;
rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
rspamd_control_hs_io_handler, NULL);
break;
case RSPAMD_SRV_LOG_PIPE:
memset (&wcmd, 0, sizeof (wcmd));
wcmd.type = RSPAMD_CONTROL_LOG_PIPE;

+ 16
- 0
src/libserver/rspamd_control.h View File

@@ -32,12 +32,14 @@ enum rspamd_control_type {
RSPAMD_CONTROL_LOG_PIPE,
RSPAMD_CONTROL_FUZZY_STAT,
RSPAMD_CONTROL_FUZZY_SYNC,
RSPAMD_CONTROL_MONITORED_CHANGE,
RSPAMD_CONTROL_MAX
};

enum rspamd_srv_type {
RSPAMD_SRV_SOCKETPAIR = 0,
RSPAMD_SRV_HYPERSCAN_LOADED,
RSPAMD_SRV_MONITORED_CHANGE,
RSPAMD_SRV_LOG_PIPE,
};

@@ -64,6 +66,10 @@ struct rspamd_control_command {
gchar cache_dir[CONTROL_PATHLEN];
gboolean forced;
} hs_loaded;
struct {
gchar tag[32];
gboolean alive;
} monitored_change;
struct {
enum rspamd_log_pipe_type type;
} log_pipe;
@@ -98,6 +104,9 @@ struct rspamd_control_reply {
struct {
guint status;
} hs_loaded;
struct {
guint status;
} monitored_change;
struct {
guint status;
} log_pipe;
@@ -126,6 +135,10 @@ struct rspamd_srv_command {
gchar cache_dir[CONTROL_PATHLEN];
gboolean forced;
} hs_loaded;
struct {
gchar tag[32];
gboolean alive;
} monitored_change;
struct {
enum rspamd_log_pipe_type type;
} log_pipe;
@@ -142,6 +155,9 @@ struct rspamd_srv_reply {
struct {
gint forced;
} hs_loaded;
struct {
gint status;
};
struct {
enum rspamd_log_pipe_type type;
} log_pipe;

+ 38
- 0
src/worker.c View File

@@ -18,6 +18,7 @@
*/

#include <libserver/rspamd_control.h>
#include <src/libserver/rspamd_control.h>
#include "config.h"
#include "libutil/util.h"
#include "libutil/map.h"
@@ -487,6 +488,39 @@ rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main,
return TRUE;
}

static gboolean
rspamd_worker_monitored_handler (struct rspamd_main *rspamd_main,
struct rspamd_worker *worker, gint fd,
gint attached_fd,
struct rspamd_control_command *cmd,
gpointer ud)
{
struct rspamd_control_reply rep;
struct rspamd_monitored *m;
struct rspamd_monitored_ctx *mctx = worker->srv->cfg->monitored_ctx;

memset (&rep, 0, sizeof (rep));
rep.type = RSPAMD_CONTROL_MONITORED_CHANGE;
m = rspamd_monitored_by_tag (mctx, cmd->cmd.monitored_change.tag);

if (!m) {
rspamd_monitored_set_alive (m, cmd->cmd.monitored_change.alive);
rep.reply.monitored_change.status = 1;
}
else {
msg_err ("cannot find monitored by tag: %*s", 32,
cmd->cmd.monitored_change.tag);
rep.reply.monitored_change.status = 0;
}

if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
msg_err ("cannot write reply to the control socket: %s",
strerror (errno));
}

return TRUE;
}

gpointer
init_worker (struct rspamd_config *cfg)
{
@@ -618,6 +652,10 @@ rspamd_worker_init_scanner (struct rspamd_worker *worker,
RSPAMD_CONTROL_LOG_PIPE,
rspamd_worker_log_pipe_handler,
worker->srv->cfg);
rspamd_control_worker_add_cmd_handler (worker,
RSPAMD_CONTROL_MONITORED_CHANGE,
rspamd_worker_monitored_handler,
worker->srv->cfg);
}

/*

Loading…
Cancel
Save