aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2009-11-27 19:15:48 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2009-11-27 19:15:48 +0300
commit569df8dd24eb159b069ca7f5efa6a6ba3336d63d (patch)
tree54d5c6200e5ea121e9fba909b6e7c40b7e164258 /src
parent893d6efc979fa9e23d93a43c6ab1e46bd9f7da16 (diff)
downloadrspamd-569df8dd24eb159b069ca7f5efa6a6ba3336d63d.tar.gz
rspamd-569df8dd24eb159b069ca7f5efa6a6ba3336d63d.zip
* Initial release of synchronization server in controller
Diffstat (limited to 'src')
-rw-r--r--src/binlog.c37
-rw-r--r--src/binlog.h3
-rw-r--r--src/controller.c151
-rw-r--r--src/statfile_sync.c51
4 files changed, 227 insertions, 15 deletions
diff --git a/src/binlog.c b/src/binlog.c
index 9a96d4f7b..f006cd368 100644
--- a/src/binlog.c
+++ b/src/binlog.c
@@ -156,7 +156,7 @@ binlog_open (memory_pool_t *pool, const char *path, time_t rotate_time, int rota
struct rspamd_binlog *new;
int len = strlen (path);
struct stat st;
-
+
new = memory_pool_alloc0 (pool, sizeof (struct rspamd_binlog));
new->pool = pool;
new->rotate_time = rotate_time;
@@ -398,12 +398,12 @@ binlog_insert (struct rspamd_binlog *log, GTree *nodes)
}
gboolean
-binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, GByteArray **rep)
+binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t *from_time, GByteArray **rep)
{
uint32_t metaindex_num;
struct rspamd_index_block *idxb;
struct rspamd_binlog_index *idx;
- gboolean idx_mapped = FALSE, res = TRUE;
+ gboolean idx_mapped = FALSE, res = TRUE, is_first = FALSE;
if (!log || !log->metaindex || !log->cur_idx) {
msg_info ("binlog_sync: improperly opened binlog: %s", log->filename);
@@ -412,6 +412,7 @@ binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, G
if (*rep == NULL) {
*rep = g_malloc (sizeof (GByteArray));
+ is_first = TRUE;
}
else {
/* Unmap old fragment */
@@ -440,7 +441,7 @@ binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, G
res = FALSE;
goto end;
}
- if ((read (log->fd, &idxb, sizeof (struct rspamd_index_block))) != sizeof (struct rspamd_index_block)) {
+ if ((read (log->fd, idxb, sizeof (struct rspamd_index_block))) != sizeof (struct rspamd_index_block)) {
unlock_file (log->fd, FALSE);
msg_warn ("binlog_sync: cannot read index from file %s, error %d, %s", log->filename, errno, strerror (errno));
res = FALSE;
@@ -453,10 +454,13 @@ binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, G
}
/* Now check specified index */
idx = &idxb->indexes[from_rev % BINLOG_IDX_LEN];
- if (idx->time != from_time) {
+ if (is_first && idx->time != *from_time) {
res = FALSE;
goto end;
}
+ else {
+ *from_time = idx->time;
+ }
/* Now fill reply structure */
(*rep)->len = idx->len;
@@ -537,4 +541,27 @@ maybe_write_binlog (struct classifier_config *ccf, struct statfile *st, stat_fil
return FALSE;
}
+struct rspamd_binlog*
+get_binlog_by_statfile (struct statfile *st)
+{
+ struct rspamd_binlog *log;
+
+ if (st == NULL || st->binlog == NULL || st->binlog->affinity != AFFINITY_MASTER) {
+ return NULL;
+ }
+
+ if (!maybe_init_static ()) {
+ return NULL;
+ }
+ if ((log = g_hash_table_lookup (binlog_opened, st)) == NULL) {
+ if ((log = binlog_open (binlog_pool, st->path, st->binlog->rotate_time, st->binlog->rotate_time / 2)) != NULL) {
+ g_hash_table_insert (binlog_opened, st, log);
+ }
+ else {
+ return NULL;
+ }
+ }
+
+ return log;
+}
diff --git a/src/binlog.h b/src/binlog.h
index dbf22ee43..bb8b8f244 100644
--- a/src/binlog.h
+++ b/src/binlog.h
@@ -56,9 +56,10 @@ struct rspamd_binlog {
struct classifier_config;
struct rspamd_binlog* binlog_open (memory_pool_t *pool, const char *path, time_t rotate_time, int rotate_jitter);
+struct rspamd_binlog* get_binlog_by_statfile (struct statfile *st);
void binlog_close (struct rspamd_binlog *log);
gboolean binlog_insert (struct rspamd_binlog *log, GTree *nodes);
-gboolean binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, GByteArray **rep);
+gboolean binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t *from_time, GByteArray **rep);
gboolean maybe_write_binlog (struct classifier_config *ccf, struct statfile *st, stat_file_t *file, GTree *nodes);
#endif
diff --git a/src/controller.c b/src/controller.c
index 0b79813b3..31bad233a 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -33,8 +33,8 @@
#include "tokenizers/tokenizers.h"
#include "classifiers/classifiers.h"
#include "binlog.h"
+#include "statfile_sync.h"
-#define CRLF "\r\n"
#define END "END" CRLF
/* 120 seconds for controller's IO */
@@ -50,6 +50,7 @@ enum command_type {
COMMAND_LEARN,
COMMAND_HELP,
COMMAND_COUNTERS,
+ COMMAND_SYNC
};
struct controller_command {
@@ -75,6 +76,7 @@ static struct controller_command commands[] = {
{"learn", TRUE, COMMAND_LEARN},
{"help", FALSE, COMMAND_HELP},
{"counters", FALSE, COMMAND_COUNTERS},
+ {"sync", FALSE, COMMAND_SYNC}
};
static GList *custom_commands = NULL;
@@ -177,6 +179,141 @@ counter_write_callback (gpointer key, gpointer value, void *data)
rspamd_dispatcher_write (session->dispatcher, out_buf, r, TRUE, FALSE);
}
+static gboolean
+write_whole_statfile (struct controller_session *session, char *symbol, struct classifier_config *ccf)
+{
+ stat_file_t *statfile;
+ struct statfile *st;
+ char out_buf[BUFSIZ];
+ int i;
+ uint64_t rev, time, len, pos;
+ char *out;
+ struct rspamd_binlog_element log_elt;
+ struct stat_file_block *stat_elt;
+
+ statfile = get_statfile_by_symbol (session->worker->srv->statfile_pool, ccf,
+ symbol, &st, FALSE);
+ if (statfile == NULL) {
+ return FALSE;
+ }
+
+ /* Begin to copy all blocks into array */
+ statfile_get_revision (statfile, &rev, (time_t *)&time);
+ len = statfile->cur_section.length * sizeof (struct rspamd_binlog_element);
+ i = snprintf (out_buf, sizeof (out_buf), "%lu %lu %lu", (long unsigned)rev, (long unsigned)time, (long unsigned)len);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, i, TRUE, FALSE);
+ out = memory_pool_alloc (session->session_pool, len);
+
+ for (i = 0, pos = 0; i < statfile->cur_section.length; i ++) {
+ stat_elt = (struct stat_file_block *)((u_char *)statfile->map + statfile->seek_pos + i * sizeof (struct stat_file_block));
+ log_elt.h1 = stat_elt->hash1;
+ log_elt.h2 = stat_elt->hash2;
+ log_elt.value = stat_elt->value;
+ memcpy (out + pos, &log_elt, sizeof (log_elt));
+ pos += sizeof (struct rspamd_binlog_element);
+ }
+ if (!rspamd_dispatcher_write (session->dispatcher, out, len, TRUE, TRUE)) {
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+static gboolean
+process_sync_command (struct controller_session *session, char **args)
+{
+ char out_buf[BUFSIZ], *arg, *err_str, *symbol;
+ int r;
+ uint64_t rev, time;
+ struct statfile *st;
+ struct classifier_config *ccf;
+ GList *cur;
+ struct rspamd_binlog *binlog;
+ GByteArray *data = NULL;
+
+ arg = *args;
+ if (!arg || *arg == '\0') {
+ msg_info ("process_sync_command: bad arguments to sync command, need symbol");
+ return FALSE;
+ }
+ symbol = arg;
+ arg = *(args + 1);
+ if (!arg || *arg == '\0') {
+ msg_info ("process_sync_command: bad arguments to sync command, need revision");
+ return FALSE;
+ }
+ rev = strtoull (arg, &err_str, 10);
+ if (err_str && *err_str != 0) {
+ msg_info ("process_sync_command: bad arguments to sync commanc: %s", arg);
+ return FALSE;
+ }
+ arg = *(args + 2);
+ if (!arg || *arg == '\0') {
+ msg_info ("process_sync_command: bad arguments to sync command, need time");
+ return FALSE;
+ }
+ time = strtoull (arg, &err_str, 10);
+ if (err_str && *err_str != 0) {
+ msg_info ("process_sync_command: bad arguments to sync commanc: %s", arg);
+ return FALSE;
+ }
+
+ ccf = g_hash_table_lookup (session->cfg->classifiers_symbols, symbol);
+ if (ccf == NULL) {
+ msg_info ("process_sync_command: bad symbol: %s", symbol);
+ return FALSE;
+ }
+
+ cur = g_list_first (ccf->statfiles);
+ while (cur) {
+ st = cur->data;
+ if (strcmp (symbol, st->symbol) == 0) {
+ break;
+ }
+ st = NULL;
+ cur = g_list_next (cur);
+ }
+ if (st == NULL) {
+ msg_info ("process_sync_command: bad symbol: %s", symbol);
+ return FALSE;
+ }
+
+ binlog = get_binlog_by_statfile (st);
+ if (binlog == NULL) {
+ msg_info ("process_sync_command: cannot open binlog: %s", symbol);
+ return FALSE;
+ }
+
+ if (rev == 0) {
+ return write_whole_statfile (session, symbol, ccf);
+ }
+
+ while (binlog_sync (binlog, rev, &time, &data)) {
+ r = snprintf (out_buf, sizeof (out_buf), "%lu %lu %lu", (long unsigned)rev, (long unsigned)time, (long unsigned)data->len);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, TRUE, FALSE);
+ if (!rspamd_dispatcher_write (session->dispatcher, data->data, data->len, TRUE, FALSE)) {
+ if (data != NULL) {
+ g_free (data);
+ }
+ return FALSE;
+ }
+ rev ++;
+ }
+
+ if (time == 0) {
+ if (data != NULL) {
+ g_free (data);
+ }
+ return write_whole_statfile (session, symbol, ccf);
+ }
+
+ if (data != NULL) {
+ g_free (data);
+ }
+
+ return TRUE;
+}
+
static void
process_command (struct controller_command *cmd, char **cmd_args, struct controller_session *session)
{
@@ -344,6 +481,13 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
session->state = STATE_LEARN;
}
break;
+ case COMMAND_SYNC:
+ if (!process_sync_command (session, cmd_args)) {
+ r = snprintf (out_buf, sizeof (out_buf), "FAIL" CRLF);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
+ return;
+ }
+ break;
case COMMAND_HELP:
r = snprintf (out_buf, sizeof (out_buf),
"Rspamd CLI commands (* - privilleged command):" CRLF
@@ -619,6 +763,11 @@ start_controller (struct rspamd_worker *worker)
start_time = time (NULL);
+ /* Start statfile synchronization */
+ if (!start_statfile_sync (worker->srv->statfile_pool, worker->srv->cfg)) {
+ msg_info ("start_controller: cannot start statfile synchronization, statfiles would not be synchronized");
+ }
+
/* Init command completion */
for (i = 0; i < G_N_ELEMENTS (commands); i++) {
comp_list = g_list_prepend (comp_list, &commands[i]);
diff --git a/src/statfile_sync.c b/src/statfile_sync.c
index 71be88733..d28e0d208 100644
--- a/src/statfile_sync.c
+++ b/src/statfile_sync.c
@@ -38,6 +38,7 @@
enum rspamd_sync_state {
SYNC_STATE_GREETING,
+ SYNC_STATE_READ_LINE,
SYNC_STATE_READ_REV,
SYNC_STATE_QUIT,
};
@@ -62,6 +63,25 @@ struct rspamd_sync_ctx {
uint64_t new_len;
};
+static void
+log_next_sync (const char *symbol, time_t delay)
+{
+ char outstr[200];
+ time_t t;
+ struct tm *tmp;
+ int r;
+
+ t = time(NULL);
+ t += delay;
+ tmp = localtime(&t);
+
+ if (tmp) {
+ r = snprintf (outstr, sizeof (outstr), "statfile_sync: next sync of %s at ", symbol);
+ if ((r = strftime(outstr + r, sizeof(outstr) - r, "%T", tmp)) != 0) {
+ msg_info (outstr);
+ }
+ }
+}
static gboolean
parse_revision_line (struct rspamd_sync_ctx *ctx, f_str_t *in)
@@ -77,6 +97,13 @@ parse_revision_line (struct rspamd_sync_ctx *ctx, f_str_t *in)
return TRUE;
}
+ /* Next check for error line */
+ if (in->len >= sizeof ("FAIL") - 1 && memcmp (in->begin, "FAIL", sizeof ("FAIL") - 1) == 0) {
+ ctx->state = SYNC_STATE_QUIT;
+ ctx->is_busy = FALSE;
+ return TRUE;
+ }
+
/* Now try to extract 3 numbers from string: revision, time and length */
p = in->begin;
val = &ctx->new_rev;
@@ -137,9 +164,20 @@ static gboolean
sync_read (f_str_t * in, void *arg)
{
struct rspamd_sync_ctx *ctx = arg;
+ char buf[256];
+ uint64_t rev = 0;
+ time_t ti = 0;
switch (ctx->state) {
case SYNC_STATE_GREETING:
+ /* Skip greeting line and write sync command */
+ /* Write initial data */
+ statfile_get_revision (ctx->real_statfile, &rev, &ti);
+ rev = snprintf (buf, sizeof (buf), "sync %s %ld %ld" CRLF, ctx->st->symbol, (long int)rev, (long int)ti);
+ ctx->state = SYNC_STATE_READ_LINE;
+ rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, FALSE);
+ break;
+ case SYNC_STATE_READ_LINE:
/* Try to parse line from server */
if (!parse_revision_line (ctx, in)) {
msg_info ("sync_read: cannot parse line");
@@ -170,7 +208,7 @@ sync_read (f_str_t * in, void *arg)
statfile_set_revision (ctx->real_statfile, ctx->new_rev, ctx->new_time);
/* Now try to read other revision or END line */
rspamd_set_dispatcher_policy (ctx->dispatcher, BUFFER_LINE, 0);
- ctx->state = SYNC_STATE_GREETING;
+ ctx->state = SYNC_STATE_READ_LINE;
break;
case SYNC_STATE_QUIT:
rspamd_remove_dispatcher (ctx->dispatcher);
@@ -196,15 +234,13 @@ static void
sync_timer_callback (int fd, short what, void *ud)
{
struct rspamd_sync_ctx *ctx = ud;
- char buf[256];
- uint64_t rev = 0;
- time_t ti = 0;
/* Plan new event */
evtimer_del (&ctx->tm_ev);
ctx->interval.tv_sec = g_random_int_range (MAX_SYNC_TIME, MAX_SYNC_TIME * 2);
ctx->interval.tv_usec = 0;
evtimer_add (&ctx->tm_ev, &ctx->interval);
+ log_next_sync (ctx->st->symbol, ctx->interval.tv_sec);
if (ctx->is_busy) {
/* Sync is in progress */
@@ -220,14 +256,12 @@ sync_timer_callback (int fd, short what, void *ud)
ctx->io_tv.tv_sec = IO_TIMEOUT;
ctx->io_tv.tv_usec = 0;
ctx->dispatcher = rspamd_create_dispatcher (ctx->sock, BUFFER_LINE, sync_read, NULL, sync_err, &ctx->io_tv, ctx);
- /* Write initial data */
- statfile_get_revision (ctx->real_statfile, &rev, &ti);
- rev = snprintf (buf, sizeof (buf), "sync %s %ld %ld" CRLF, ctx->st->symbol, (long int)rev, (long int)ti);
ctx->state = SYNC_STATE_GREETING;
ctx->is_busy = TRUE;
- rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, FALSE);
+ msg_info ("sync_timer_callback: starting synchronization of %s", ctx->st->symbol);
+
}
static gboolean
@@ -254,6 +288,7 @@ add_statfile_watch (statfile_pool_t *pool, struct statfile *st)
/* Now plan event for it's future executing */
evtimer_set (&ctx->tm_ev, sync_timer_callback, ctx);
evtimer_add (&ctx->tm_ev, &ctx->interval);
+ log_next_sync (st->symbol, ctx->interval.tv_sec);
return TRUE;
}