aboutsummaryrefslogtreecommitdiffstats
path: root/src/libserver/statfile_sync.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libserver/statfile_sync.c')
-rw-r--r--src/libserver/statfile_sync.c240
1 files changed, 144 insertions, 96 deletions
diff --git a/src/libserver/statfile_sync.c b/src/libserver/statfile_sync.c
index 1121658fa..262176880 100644
--- a/src/libserver/statfile_sync.c
+++ b/src/libserver/statfile_sync.c
@@ -56,63 +56,69 @@ struct rspamd_sync_ctx {
enum rspamd_sync_state state;
gboolean is_busy;
- guint64 new_rev;
- guint64 new_time;
- guint64 new_len;
+ guint64 new_rev;
+ guint64 new_time;
+ guint64 new_len;
};
static void
log_next_sync (const gchar *symbol, time_t delay)
{
- gchar outstr[200];
- time_t t;
+ gchar outstr[200];
+ time_t t;
struct tm *tmp;
- gint r;
+ gint r;
- t = time(NULL);
+ t = time (NULL);
t += delay;
- tmp = localtime(&t);
+ tmp = localtime (&t);
if (tmp) {
- r = rspamd_snprintf (outstr, sizeof (outstr), "statfile_sync: next sync of %s at ", symbol);
- if ((r = strftime(outstr + r, sizeof(outstr) - r, "%T", tmp)) != 0) {
+ r = rspamd_snprintf (outstr,
+ sizeof (outstr),
+ "statfile_sync: next sync of %s at ",
+ symbol);
+ if ((r = strftime (outstr + r, sizeof(outstr) - r, "%T", tmp)) != 0) {
msg_info (outstr);
}
}
}
-static gboolean
+static gboolean
parse_revision_line (struct rspamd_sync_ctx *ctx, f_str_t *in)
{
- guint i, state = 0;
- gchar *p, *c, numbuf[sizeof("18446744073709551615")];
- guint64 *val;
+ guint i, state = 0;
+ gchar *p, *c, numbuf[sizeof("18446744073709551615")];
+ guint64 *val;
/* First of all try to find END line */
- if (in->len >= sizeof ("END") - 1 && memcmp (in->begin, "END", sizeof ("END") - 1) == 0) {
+ if (in->len >= sizeof ("END") - 1 &&
+ memcmp (in->begin, "END", sizeof ("END") - 1) == 0) {
ctx->state = SYNC_STATE_QUIT;
ctx->is_busy = FALSE;
return TRUE;
}
/* Next check for error line */
- if (in->len >= sizeof ("FAIL") - 1 && memcmp (in->begin, "FAIL", sizeof ("FAIL") - 1) == 0) {
+ if (in->len >= sizeof ("FAIL") - 1 &&
+ memcmp (in->begin, "FAIL", sizeof ("FAIL") - 1) == 0) {
ctx->state = SYNC_STATE_QUIT;
ctx->is_busy = FALSE;
return TRUE;
}
-
+
/* Now try to extract 3 numbers from string: revision, time and length */
p = in->begin;
val = &ctx->new_rev;
- for (i = 0; i < in->len; i ++, p ++) {
+ for (i = 0; i < in->len; i++, p++) {
if (g_ascii_isspace (*p) || i == in->len - 1) {
if (state == 1) {
if (i == in->len - 1) {
/* One more character */
- p ++;
+ p++;
}
- rspamd_strlcpy (numbuf, c, MIN (p - c + 1, (gint)sizeof (numbuf)));
+ rspamd_strlcpy (numbuf, c, MIN (p - c + 1,
+ (gint)sizeof (numbuf)));
errno = 0;
*val = strtoull (numbuf, NULL, 10);
if (errno != 0) {
@@ -144,94 +150,113 @@ parse_revision_line (struct rspamd_sync_ctx *ctx, f_str_t *in)
return ((val == &ctx->new_len));
}
-static gboolean
+static gboolean
read_blocks (struct rspamd_sync_ctx *ctx, f_str_t *in)
{
struct rspamd_binlog_element *elt;
- guint i;
-
+ guint i;
+
statfile_pool_lock_file (ctx->pool, ctx->real_statfile);
elt = (struct rspamd_binlog_element *)in->begin;
- for (i = 0; i < in->len / sizeof (struct rspamd_binlog_element); i ++, elt ++) {
- statfile_pool_set_block (ctx->pool, ctx->real_statfile, elt->h1, elt->h2, ctx->new_time, elt->value);
+ for (i = 0; i < in->len / sizeof (struct rspamd_binlog_element); i++,
+ elt++) {
+ statfile_pool_set_block (ctx->pool,
+ ctx->real_statfile,
+ elt->h1,
+ elt->h2,
+ ctx->new_time,
+ elt->value);
}
statfile_pool_unlock_file (ctx->pool, ctx->real_statfile);
return TRUE;
}
-static gboolean
+static gboolean
sync_read (f_str_t * in, void *arg)
{
struct rspamd_sync_ctx *ctx = arg;
- gchar buf[256];
- guint64 rev = 0;
- time_t ti = 0;
+ gchar buf[256];
+ guint64 rev = 0;
+ time_t ti = 0;
if (in->len == 0) {
/* Skip empty lines */
return TRUE;
}
switch (ctx->state) {
- case SYNC_STATE_GREETING:
- /* Skip greeting line and write sync command */
- /* Write initial data */
- statfile_get_revision (ctx->real_statfile, &rev, &ti);
- rev = rspamd_snprintf (buf, sizeof (buf), "sync %s %uL %T" CRLF, ctx->st->symbol, rev, ti);
- ctx->state = SYNC_STATE_READ_LINE;
- return rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, FALSE);
- break;
- case SYNC_STATE_READ_LINE:
- /* Try to parse line from server */
- if (!parse_revision_line (ctx, in)) {
- msg_info ("cannot parse line of length %z: '%*s'", in->len, (gint)in->len, in->begin);
- close (ctx->sock);
- rspamd_remove_dispatcher (ctx->dispatcher);
- ctx->is_busy = FALSE;
- return FALSE;
- }
- else if (ctx->state != SYNC_STATE_QUIT) {
- if (ctx->new_len > 0) {
- ctx->state = SYNC_STATE_READ_REV;
- rspamd_set_dispatcher_policy (ctx->dispatcher, BUFFER_CHARACTER, ctx->new_len);
- }
- }
- else {
- /* Quit this session */
- msg_info ("sync ended for: %s", ctx->st->symbol);
- close (ctx->sock);
- rspamd_remove_dispatcher (ctx->dispatcher);
- ctx->is_busy = FALSE;
- /* Immediately return from callback */
- return FALSE;
- }
- break;
- case SYNC_STATE_READ_REV:
- /* In now contains all blocks of specified revision, so we can read them directly */
- if (!read_blocks (ctx, in)) {
- msg_info ("cannot read blocks");
- close (ctx->sock);
- rspamd_remove_dispatcher (ctx->dispatcher);
- ctx->is_busy = FALSE;
- return FALSE;
+ case SYNC_STATE_GREETING:
+ /* Skip greeting line and write sync command */
+ /* Write initial data */
+ statfile_get_revision (ctx->real_statfile, &rev, &ti);
+ rev = rspamd_snprintf (buf,
+ sizeof (buf),
+ "sync %s %uL %T" CRLF,
+ ctx->st->symbol,
+ rev,
+ ti);
+ ctx->state = SYNC_STATE_READ_LINE;
+ return rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE,
+ FALSE);
+ break;
+ case SYNC_STATE_READ_LINE:
+ /* Try to parse line from server */
+ if (!parse_revision_line (ctx, in)) {
+ msg_info ("cannot parse line of length %z: '%*s'",
+ in->len,
+ (gint)in->len,
+ in->begin);
+ close (ctx->sock);
+ rspamd_remove_dispatcher (ctx->dispatcher);
+ ctx->is_busy = FALSE;
+ return FALSE;
+ }
+ else if (ctx->state != SYNC_STATE_QUIT) {
+ if (ctx->new_len > 0) {
+ ctx->state = SYNC_STATE_READ_REV;
+ rspamd_set_dispatcher_policy (ctx->dispatcher,
+ BUFFER_CHARACTER,
+ ctx->new_len);
}
- statfile_set_revision (ctx->real_statfile, ctx->new_rev, ctx->new_time);
- msg_info ("set new revision: %uL, readed %z bytes", ctx->new_rev, in->len);
- /* Now try to read other revision or END line */
- ctx->state = SYNC_STATE_READ_LINE;
- rspamd_set_dispatcher_policy (ctx->dispatcher, BUFFER_LINE, 0);
- break;
- case SYNC_STATE_QUIT:
+ }
+ else {
+ /* Quit this session */
+ msg_info ("sync ended for: %s", ctx->st->symbol);
close (ctx->sock);
rspamd_remove_dispatcher (ctx->dispatcher);
ctx->is_busy = FALSE;
+ /* Immediately return from callback */
return FALSE;
+ }
+ break;
+ case SYNC_STATE_READ_REV:
+ /* In now contains all blocks of specified revision, so we can read them directly */
+ if (!read_blocks (ctx, in)) {
+ msg_info ("cannot read blocks");
+ close (ctx->sock);
+ rspamd_remove_dispatcher (ctx->dispatcher);
+ ctx->is_busy = FALSE;
+ return FALSE;
+ }
+ statfile_set_revision (ctx->real_statfile, ctx->new_rev, ctx->new_time);
+ msg_info ("set new revision: %uL, readed %z bytes",
+ ctx->new_rev,
+ in->len);
+ /* Now try to read other revision or END line */
+ ctx->state = SYNC_STATE_READ_LINE;
+ rspamd_set_dispatcher_policy (ctx->dispatcher, BUFFER_LINE, 0);
+ break;
+ case SYNC_STATE_QUIT:
+ close (ctx->sock);
+ rspamd_remove_dispatcher (ctx->dispatcher);
+ ctx->is_busy = FALSE;
+ return FALSE;
}
return TRUE;
}
-static void
+static void
sync_err (GError *err, void *arg)
{
struct rspamd_sync_ctx *ctx = arg;
@@ -248,30 +273,40 @@ sync_timer_callback (gint fd, short what, void *ud)
{
struct rspamd_sync_ctx *ctx = ud;
guint32 jittered_interval;
-
+
/* Plan new event */
evtimer_del (&ctx->tm_ev);
/* Add some jittering for synchronization */
- jittered_interval = g_random_int_range (ctx->sync_interval, ctx->sync_interval * 2);
+ jittered_interval = g_random_int_range (ctx->sync_interval,
+ ctx->sync_interval * 2);
msec_to_tv (jittered_interval, &ctx->interval);
evtimer_add (&ctx->tm_ev, &ctx->interval);
log_next_sync (ctx->st->symbol, ctx->interval.tv_sec);
-
+
if (ctx->is_busy) {
/* Sync is in progress */
msg_info ("syncronization process is in progress, do not start new one");
return;
}
- if ((ctx->sock = make_universal_socket (ctx->st->binlog->master_addr, ctx->st->binlog->master_port,
- SOCK_STREAM, TRUE, FALSE, TRUE)) == -1) {
+ if ((ctx->sock =
+ make_universal_socket (ctx->st->binlog->master_addr,
+ ctx->st->binlog->master_port,
+ SOCK_STREAM, TRUE, FALSE, TRUE)) == -1) {
msg_info ("cannot connect to %s", ctx->st->binlog->master_addr);
return;
}
/* Now create and activate dispatcher */
msec_to_tv (ctx->timeout, &ctx->io_tv);
- ctx->dispatcher = rspamd_create_dispatcher (ctx->ev_base, ctx->sock, BUFFER_LINE, sync_read, NULL, sync_err, &ctx->io_tv, ctx);
-
+ ctx->dispatcher = rspamd_create_dispatcher (ctx->ev_base,
+ ctx->sock,
+ BUFFER_LINE,
+ sync_read,
+ NULL,
+ sync_err,
+ &ctx->io_tv,
+ ctx);
+
ctx->state = SYNC_STATE_GREETING;
ctx->is_busy = TRUE;
@@ -280,29 +315,39 @@ sync_timer_callback (gint fd, short what, void *ud)
}
static gboolean
-add_statfile_watch (statfile_pool_t *pool, struct rspamd_statfile_config *st, struct rspamd_config *cfg, struct event_base *ev_base)
+add_statfile_watch (statfile_pool_t *pool,
+ struct rspamd_statfile_config *st,
+ struct rspamd_config *cfg,
+ struct event_base *ev_base)
{
struct rspamd_sync_ctx *ctx;
guint32 jittered_interval;
-
+
if (st->binlog->master_addr != NULL) {
- ctx = rspamd_mempool_alloc (pool->pool, sizeof (struct rspamd_sync_ctx));
+ ctx =
+ rspamd_mempool_alloc (pool->pool, sizeof (struct rspamd_sync_ctx));
ctx->st = st;
ctx->timeout = cfg->statfile_sync_timeout;
ctx->sync_interval = cfg->statfile_sync_interval;
ctx->ev_base = ev_base;
/* Add some jittering for synchronization */
- jittered_interval = g_random_int_range (ctx->sync_interval, ctx->sync_interval * 2);
+ jittered_interval = g_random_int_range (ctx->sync_interval,
+ ctx->sync_interval * 2);
msec_to_tv (jittered_interval, &ctx->interval);
/* Open statfile and attach it to pool */
- if ((ctx->real_statfile = statfile_pool_is_open (pool, st->path)) == NULL) {
- if ((ctx->real_statfile = statfile_pool_open (pool, st->path, st->size, FALSE)) == NULL) {
+ if ((ctx->real_statfile =
+ statfile_pool_is_open (pool, st->path)) == NULL) {
+ if ((ctx->real_statfile =
+ statfile_pool_open (pool, st->path, st->size, FALSE)) == NULL) {
msg_warn ("cannot open %s", st->path);
if (statfile_pool_create (pool, st->path, st->size) == -1) {
msg_err ("cannot create statfile %s", st->path);
return FALSE;
}
- ctx->real_statfile = statfile_pool_open (pool, st->path, st->size, FALSE);
+ ctx->real_statfile = statfile_pool_open (pool,
+ st->path,
+ st->size,
+ FALSE);
}
}
/* Now plan event for it's future executing */
@@ -312,21 +357,24 @@ add_statfile_watch (statfile_pool_t *pool, struct rspamd_statfile_config *st, st
log_next_sync (st->symbol, ctx->interval.tv_sec);
}
else {
- msg_err ("cannot add statfile watch for statfile %s: no master defined", st->symbol);
+ msg_err ("cannot add statfile watch for statfile %s: no master defined",
+ st->symbol);
return FALSE;
}
return TRUE;
}
-gboolean
-start_statfile_sync (statfile_pool_t *pool, struct rspamd_config *cfg, struct event_base *ev_base)
+gboolean
+start_statfile_sync (statfile_pool_t *pool,
+ struct rspamd_config *cfg,
+ struct event_base *ev_base)
{
GList *cur, *l;
struct rspamd_classifier_config *cl;
struct rspamd_statfile_config *st;
- /*
+ /*
* First of all walk through all classifiers and find those statfiles
* for which we should do sync (slave affinity)
*/