diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-11-27 19:15:48 +0300 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-11-27 19:15:48 +0300 |
commit | 569df8dd24eb159b069ca7f5efa6a6ba3336d63d (patch) | |
tree | 54d5c6200e5ea121e9fba909b6e7c40b7e164258 /src | |
parent | 893d6efc979fa9e23d93a43c6ab1e46bd9f7da16 (diff) | |
download | rspamd-569df8dd24eb159b069ca7f5efa6a6ba3336d63d.tar.gz rspamd-569df8dd24eb159b069ca7f5efa6a6ba3336d63d.zip |
* Initial release of synchronization server in controller
Diffstat (limited to 'src')
-rw-r--r-- | src/binlog.c | 37 | ||||
-rw-r--r-- | src/binlog.h | 3 | ||||
-rw-r--r-- | src/controller.c | 151 | ||||
-rw-r--r-- | src/statfile_sync.c | 51 |
4 files changed, 227 insertions, 15 deletions
diff --git a/src/binlog.c b/src/binlog.c index 9a96d4f7b..f006cd368 100644 --- a/src/binlog.c +++ b/src/binlog.c @@ -156,7 +156,7 @@ binlog_open (memory_pool_t *pool, const char *path, time_t rotate_time, int rota struct rspamd_binlog *new; int len = strlen (path); struct stat st; - + new = memory_pool_alloc0 (pool, sizeof (struct rspamd_binlog)); new->pool = pool; new->rotate_time = rotate_time; @@ -398,12 +398,12 @@ binlog_insert (struct rspamd_binlog *log, GTree *nodes) } gboolean -binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, GByteArray **rep) +binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t *from_time, GByteArray **rep) { uint32_t metaindex_num; struct rspamd_index_block *idxb; struct rspamd_binlog_index *idx; - gboolean idx_mapped = FALSE, res = TRUE; + gboolean idx_mapped = FALSE, res = TRUE, is_first = FALSE; if (!log || !log->metaindex || !log->cur_idx) { msg_info ("binlog_sync: improperly opened binlog: %s", log->filename); @@ -412,6 +412,7 @@ binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, G if (*rep == NULL) { *rep = g_malloc (sizeof (GByteArray)); + is_first = TRUE; } else { /* Unmap old fragment */ @@ -440,7 +441,7 @@ binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, G res = FALSE; goto end; } - if ((read (log->fd, &idxb, sizeof (struct rspamd_index_block))) != sizeof (struct rspamd_index_block)) { + if ((read (log->fd, idxb, sizeof (struct rspamd_index_block))) != sizeof (struct rspamd_index_block)) { unlock_file (log->fd, FALSE); msg_warn ("binlog_sync: cannot read index from file %s, error %d, %s", log->filename, errno, strerror (errno)); res = FALSE; @@ -453,10 +454,13 @@ binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, G } /* Now check specified index */ idx = &idxb->indexes[from_rev % BINLOG_IDX_LEN]; - if (idx->time != from_time) { + if (is_first && idx->time != *from_time) { res = FALSE; goto end; } + else { + *from_time = idx->time; + } /* Now fill reply structure */ (*rep)->len = idx->len; @@ -537,4 +541,27 @@ maybe_write_binlog (struct classifier_config *ccf, struct statfile *st, stat_fil return FALSE; } +struct rspamd_binlog* +get_binlog_by_statfile (struct statfile *st) +{ + struct rspamd_binlog *log; + + if (st == NULL || st->binlog == NULL || st->binlog->affinity != AFFINITY_MASTER) { + return NULL; + } + + if (!maybe_init_static ()) { + return NULL; + } + if ((log = g_hash_table_lookup (binlog_opened, st)) == NULL) { + if ((log = binlog_open (binlog_pool, st->path, st->binlog->rotate_time, st->binlog->rotate_time / 2)) != NULL) { + g_hash_table_insert (binlog_opened, st, log); + } + else { + return NULL; + } + } + + return log; +} diff --git a/src/binlog.h b/src/binlog.h index dbf22ee43..bb8b8f244 100644 --- a/src/binlog.h +++ b/src/binlog.h @@ -56,9 +56,10 @@ struct rspamd_binlog { struct classifier_config; struct rspamd_binlog* binlog_open (memory_pool_t *pool, const char *path, time_t rotate_time, int rotate_jitter); +struct rspamd_binlog* get_binlog_by_statfile (struct statfile *st); void binlog_close (struct rspamd_binlog *log); gboolean binlog_insert (struct rspamd_binlog *log, GTree *nodes); -gboolean binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, GByteArray **rep); +gboolean binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t *from_time, GByteArray **rep); gboolean maybe_write_binlog (struct classifier_config *ccf, struct statfile *st, stat_file_t *file, GTree *nodes); #endif diff --git a/src/controller.c b/src/controller.c index 0b79813b3..31bad233a 100644 --- a/src/controller.c +++ b/src/controller.c @@ -33,8 +33,8 @@ #include "tokenizers/tokenizers.h" #include "classifiers/classifiers.h" #include "binlog.h" +#include "statfile_sync.h" -#define CRLF "\r\n" #define END "END" CRLF /* 120 seconds for controller's IO */ @@ -50,6 +50,7 @@ enum command_type { COMMAND_LEARN, COMMAND_HELP, COMMAND_COUNTERS, + COMMAND_SYNC }; struct controller_command { @@ -75,6 +76,7 @@ static struct controller_command commands[] = { {"learn", TRUE, COMMAND_LEARN}, {"help", FALSE, COMMAND_HELP}, {"counters", FALSE, COMMAND_COUNTERS}, + {"sync", FALSE, COMMAND_SYNC} }; static GList *custom_commands = NULL; @@ -177,6 +179,141 @@ counter_write_callback (gpointer key, gpointer value, void *data) rspamd_dispatcher_write (session->dispatcher, out_buf, r, TRUE, FALSE); } +static gboolean +write_whole_statfile (struct controller_session *session, char *symbol, struct classifier_config *ccf) +{ + stat_file_t *statfile; + struct statfile *st; + char out_buf[BUFSIZ]; + int i; + uint64_t rev, time, len, pos; + char *out; + struct rspamd_binlog_element log_elt; + struct stat_file_block *stat_elt; + + statfile = get_statfile_by_symbol (session->worker->srv->statfile_pool, ccf, + symbol, &st, FALSE); + if (statfile == NULL) { + return FALSE; + } + + /* Begin to copy all blocks into array */ + statfile_get_revision (statfile, &rev, (time_t *)&time); + len = statfile->cur_section.length * sizeof (struct rspamd_binlog_element); + i = snprintf (out_buf, sizeof (out_buf), "%lu %lu %lu", (long unsigned)rev, (long unsigned)time, (long unsigned)len); + rspamd_dispatcher_write (session->dispatcher, out_buf, i, TRUE, FALSE); + out = memory_pool_alloc (session->session_pool, len); + + for (i = 0, pos = 0; i < statfile->cur_section.length; i ++) { + stat_elt = (struct stat_file_block *)((u_char *)statfile->map + statfile->seek_pos + i * sizeof (struct stat_file_block)); + log_elt.h1 = stat_elt->hash1; + log_elt.h2 = stat_elt->hash2; + log_elt.value = stat_elt->value; + memcpy (out + pos, &log_elt, sizeof (log_elt)); + pos += sizeof (struct rspamd_binlog_element); + } + if (!rspamd_dispatcher_write (session->dispatcher, out, len, TRUE, TRUE)) { + return FALSE; + } + + return TRUE; +} + +static gboolean +process_sync_command (struct controller_session *session, char **args) +{ + char out_buf[BUFSIZ], *arg, *err_str, *symbol; + int r; + uint64_t rev, time; + struct statfile *st; + struct classifier_config *ccf; + GList *cur; + struct rspamd_binlog *binlog; + GByteArray *data = NULL; + + arg = *args; + if (!arg || *arg == '\0') { + msg_info ("process_sync_command: bad arguments to sync command, need symbol"); + return FALSE; + } + symbol = arg; + arg = *(args + 1); + if (!arg || *arg == '\0') { + msg_info ("process_sync_command: bad arguments to sync command, need revision"); + return FALSE; + } + rev = strtoull (arg, &err_str, 10); + if (err_str && *err_str != 0) { + msg_info ("process_sync_command: bad arguments to sync commanc: %s", arg); + return FALSE; + } + arg = *(args + 2); + if (!arg || *arg == '\0') { + msg_info ("process_sync_command: bad arguments to sync command, need time"); + return FALSE; + } + time = strtoull (arg, &err_str, 10); + if (err_str && *err_str != 0) { + msg_info ("process_sync_command: bad arguments to sync commanc: %s", arg); + return FALSE; + } + + ccf = g_hash_table_lookup (session->cfg->classifiers_symbols, symbol); + if (ccf == NULL) { + msg_info ("process_sync_command: bad symbol: %s", symbol); + return FALSE; + } + + cur = g_list_first (ccf->statfiles); + while (cur) { + st = cur->data; + if (strcmp (symbol, st->symbol) == 0) { + break; + } + st = NULL; + cur = g_list_next (cur); + } + if (st == NULL) { + msg_info ("process_sync_command: bad symbol: %s", symbol); + return FALSE; + } + + binlog = get_binlog_by_statfile (st); + if (binlog == NULL) { + msg_info ("process_sync_command: cannot open binlog: %s", symbol); + return FALSE; + } + + if (rev == 0) { + return write_whole_statfile (session, symbol, ccf); + } + + while (binlog_sync (binlog, rev, &time, &data)) { + r = snprintf (out_buf, sizeof (out_buf), "%lu %lu %lu", (long unsigned)rev, (long unsigned)time, (long unsigned)data->len); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, TRUE, FALSE); + if (!rspamd_dispatcher_write (session->dispatcher, data->data, data->len, TRUE, FALSE)) { + if (data != NULL) { + g_free (data); + } + return FALSE; + } + rev ++; + } + + if (time == 0) { + if (data != NULL) { + g_free (data); + } + return write_whole_statfile (session, symbol, ccf); + } + + if (data != NULL) { + g_free (data); + } + + return TRUE; +} + static void process_command (struct controller_command *cmd, char **cmd_args, struct controller_session *session) { @@ -344,6 +481,13 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control session->state = STATE_LEARN; } break; + case COMMAND_SYNC: + if (!process_sync_command (session, cmd_args)) { + r = snprintf (out_buf, sizeof (out_buf), "FAIL" CRLF); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE); + return; + } + break; case COMMAND_HELP: r = snprintf (out_buf, sizeof (out_buf), "Rspamd CLI commands (* - privilleged command):" CRLF @@ -619,6 +763,11 @@ start_controller (struct rspamd_worker *worker) start_time = time (NULL); + /* Start statfile synchronization */ + if (!start_statfile_sync (worker->srv->statfile_pool, worker->srv->cfg)) { + msg_info ("start_controller: cannot start statfile synchronization, statfiles would not be synchronized"); + } + /* Init command completion */ for (i = 0; i < G_N_ELEMENTS (commands); i++) { comp_list = g_list_prepend (comp_list, &commands[i]); diff --git a/src/statfile_sync.c b/src/statfile_sync.c index 71be88733..d28e0d208 100644 --- a/src/statfile_sync.c +++ b/src/statfile_sync.c @@ -38,6 +38,7 @@ enum rspamd_sync_state { SYNC_STATE_GREETING, + SYNC_STATE_READ_LINE, SYNC_STATE_READ_REV, SYNC_STATE_QUIT, }; @@ -62,6 +63,25 @@ struct rspamd_sync_ctx { uint64_t new_len; }; +static void +log_next_sync (const char *symbol, time_t delay) +{ + char outstr[200]; + time_t t; + struct tm *tmp; + int r; + + t = time(NULL); + t += delay; + tmp = localtime(&t); + + if (tmp) { + r = snprintf (outstr, sizeof (outstr), "statfile_sync: next sync of %s at ", symbol); + if ((r = strftime(outstr + r, sizeof(outstr) - r, "%T", tmp)) != 0) { + msg_info (outstr); + } + } +} static gboolean parse_revision_line (struct rspamd_sync_ctx *ctx, f_str_t *in) @@ -77,6 +97,13 @@ parse_revision_line (struct rspamd_sync_ctx *ctx, f_str_t *in) return TRUE; } + /* Next check for error line */ + if (in->len >= sizeof ("FAIL") - 1 && memcmp (in->begin, "FAIL", sizeof ("FAIL") - 1) == 0) { + ctx->state = SYNC_STATE_QUIT; + ctx->is_busy = FALSE; + return TRUE; + } + /* Now try to extract 3 numbers from string: revision, time and length */ p = in->begin; val = &ctx->new_rev; @@ -137,9 +164,20 @@ static gboolean sync_read (f_str_t * in, void *arg) { struct rspamd_sync_ctx *ctx = arg; + char buf[256]; + uint64_t rev = 0; + time_t ti = 0; switch (ctx->state) { case SYNC_STATE_GREETING: + /* Skip greeting line and write sync command */ + /* Write initial data */ + statfile_get_revision (ctx->real_statfile, &rev, &ti); + rev = snprintf (buf, sizeof (buf), "sync %s %ld %ld" CRLF, ctx->st->symbol, (long int)rev, (long int)ti); + ctx->state = SYNC_STATE_READ_LINE; + rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, FALSE); + break; + case SYNC_STATE_READ_LINE: /* Try to parse line from server */ if (!parse_revision_line (ctx, in)) { msg_info ("sync_read: cannot parse line"); @@ -170,7 +208,7 @@ sync_read (f_str_t * in, void *arg) statfile_set_revision (ctx->real_statfile, ctx->new_rev, ctx->new_time); /* Now try to read other revision or END line */ rspamd_set_dispatcher_policy (ctx->dispatcher, BUFFER_LINE, 0); - ctx->state = SYNC_STATE_GREETING; + ctx->state = SYNC_STATE_READ_LINE; break; case SYNC_STATE_QUIT: rspamd_remove_dispatcher (ctx->dispatcher); @@ -196,15 +234,13 @@ static void sync_timer_callback (int fd, short what, void *ud) { struct rspamd_sync_ctx *ctx = ud; - char buf[256]; - uint64_t rev = 0; - time_t ti = 0; /* Plan new event */ evtimer_del (&ctx->tm_ev); ctx->interval.tv_sec = g_random_int_range (MAX_SYNC_TIME, MAX_SYNC_TIME * 2); ctx->interval.tv_usec = 0; evtimer_add (&ctx->tm_ev, &ctx->interval); + log_next_sync (ctx->st->symbol, ctx->interval.tv_sec); if (ctx->is_busy) { /* Sync is in progress */ @@ -220,14 +256,12 @@ sync_timer_callback (int fd, short what, void *ud) ctx->io_tv.tv_sec = IO_TIMEOUT; ctx->io_tv.tv_usec = 0; ctx->dispatcher = rspamd_create_dispatcher (ctx->sock, BUFFER_LINE, sync_read, NULL, sync_err, &ctx->io_tv, ctx); - /* Write initial data */ - statfile_get_revision (ctx->real_statfile, &rev, &ti); - rev = snprintf (buf, sizeof (buf), "sync %s %ld %ld" CRLF, ctx->st->symbol, (long int)rev, (long int)ti); ctx->state = SYNC_STATE_GREETING; ctx->is_busy = TRUE; - rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, FALSE); + msg_info ("sync_timer_callback: starting synchronization of %s", ctx->st->symbol); + } static gboolean @@ -254,6 +288,7 @@ add_statfile_watch (statfile_pool_t *pool, struct statfile *st) /* Now plan event for it's future executing */ evtimer_set (&ctx->tm_ev, sync_timer_callback, ctx); evtimer_add (&ctx->tm_ev, &ctx->interval); + log_next_sync (st->symbol, ctx->interval.tv_sec); return TRUE; } |