diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-11-27 19:15:48 +0300 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-11-27 19:15:48 +0300 |
commit | 569df8dd24eb159b069ca7f5efa6a6ba3336d63d (patch) | |
tree | 54d5c6200e5ea121e9fba909b6e7c40b7e164258 /src/controller.c | |
parent | 893d6efc979fa9e23d93a43c6ab1e46bd9f7da16 (diff) | |
download | rspamd-569df8dd24eb159b069ca7f5efa6a6ba3336d63d.tar.gz rspamd-569df8dd24eb159b069ca7f5efa6a6ba3336d63d.zip |
* Initial release of synchronization server in controller
Diffstat (limited to 'src/controller.c')
-rw-r--r-- | src/controller.c | 151 |
1 files changed, 150 insertions, 1 deletions
diff --git a/src/controller.c b/src/controller.c index 0b79813b3..31bad233a 100644 --- a/src/controller.c +++ b/src/controller.c @@ -33,8 +33,8 @@ #include "tokenizers/tokenizers.h" #include "classifiers/classifiers.h" #include "binlog.h" +#include "statfile_sync.h" -#define CRLF "\r\n" #define END "END" CRLF /* 120 seconds for controller's IO */ @@ -50,6 +50,7 @@ enum command_type { COMMAND_LEARN, COMMAND_HELP, COMMAND_COUNTERS, + COMMAND_SYNC }; struct controller_command { @@ -75,6 +76,7 @@ static struct controller_command commands[] = { {"learn", TRUE, COMMAND_LEARN}, {"help", FALSE, COMMAND_HELP}, {"counters", FALSE, COMMAND_COUNTERS}, + {"sync", FALSE, COMMAND_SYNC} }; static GList *custom_commands = NULL; @@ -177,6 +179,141 @@ counter_write_callback (gpointer key, gpointer value, void *data) rspamd_dispatcher_write (session->dispatcher, out_buf, r, TRUE, FALSE); } +static gboolean +write_whole_statfile (struct controller_session *session, char *symbol, struct classifier_config *ccf) +{ + stat_file_t *statfile; + struct statfile *st; + char out_buf[BUFSIZ]; + int i; + uint64_t rev, time, len, pos; + char *out; + struct rspamd_binlog_element log_elt; + struct stat_file_block *stat_elt; + + statfile = get_statfile_by_symbol (session->worker->srv->statfile_pool, ccf, + symbol, &st, FALSE); + if (statfile == NULL) { + return FALSE; + } + + /* Begin to copy all blocks into array */ + statfile_get_revision (statfile, &rev, (time_t *)&time); + len = statfile->cur_section.length * sizeof (struct rspamd_binlog_element); + i = snprintf (out_buf, sizeof (out_buf), "%lu %lu %lu", (long unsigned)rev, (long unsigned)time, (long unsigned)len); + rspamd_dispatcher_write (session->dispatcher, out_buf, i, TRUE, FALSE); + out = memory_pool_alloc (session->session_pool, len); + + for (i = 0, pos = 0; i < statfile->cur_section.length; i ++) { + stat_elt = (struct stat_file_block *)((u_char *)statfile->map + statfile->seek_pos + i * sizeof (struct stat_file_block)); + log_elt.h1 = stat_elt->hash1; + log_elt.h2 = stat_elt->hash2; + log_elt.value = stat_elt->value; + memcpy (out + pos, &log_elt, sizeof (log_elt)); + pos += sizeof (struct rspamd_binlog_element); + } + if (!rspamd_dispatcher_write (session->dispatcher, out, len, TRUE, TRUE)) { + return FALSE; + } + + return TRUE; +} + +static gboolean +process_sync_command (struct controller_session *session, char **args) +{ + char out_buf[BUFSIZ], *arg, *err_str, *symbol; + int r; + uint64_t rev, time; + struct statfile *st; + struct classifier_config *ccf; + GList *cur; + struct rspamd_binlog *binlog; + GByteArray *data = NULL; + + arg = *args; + if (!arg || *arg == '\0') { + msg_info ("process_sync_command: bad arguments to sync command, need symbol"); + return FALSE; + } + symbol = arg; + arg = *(args + 1); + if (!arg || *arg == '\0') { + msg_info ("process_sync_command: bad arguments to sync command, need revision"); + return FALSE; + } + rev = strtoull (arg, &err_str, 10); + if (err_str && *err_str != 0) { + msg_info ("process_sync_command: bad arguments to sync commanc: %s", arg); + return FALSE; + } + arg = *(args + 2); + if (!arg || *arg == '\0') { + msg_info ("process_sync_command: bad arguments to sync command, need time"); + return FALSE; + } + time = strtoull (arg, &err_str, 10); + if (err_str && *err_str != 0) { + msg_info ("process_sync_command: bad arguments to sync commanc: %s", arg); + return FALSE; + } + + ccf = g_hash_table_lookup (session->cfg->classifiers_symbols, symbol); + if (ccf == NULL) { + msg_info ("process_sync_command: bad symbol: %s", symbol); + return FALSE; + } + + cur = g_list_first (ccf->statfiles); + while (cur) { + st = cur->data; + if (strcmp (symbol, st->symbol) == 0) { + break; + } + st = NULL; + cur = g_list_next (cur); + } + if (st == NULL) { + msg_info ("process_sync_command: bad symbol: %s", symbol); + return FALSE; + } + + binlog = get_binlog_by_statfile (st); + if (binlog == NULL) { + msg_info ("process_sync_command: cannot open binlog: %s", symbol); + return FALSE; + } + + if (rev == 0) { + return write_whole_statfile (session, symbol, ccf); + } + + while (binlog_sync (binlog, rev, &time, &data)) { + r = snprintf (out_buf, sizeof (out_buf), "%lu %lu %lu", (long unsigned)rev, (long unsigned)time, (long unsigned)data->len); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, TRUE, FALSE); + if (!rspamd_dispatcher_write (session->dispatcher, data->data, data->len, TRUE, FALSE)) { + if (data != NULL) { + g_free (data); + } + return FALSE; + } + rev ++; + } + + if (time == 0) { + if (data != NULL) { + g_free (data); + } + return write_whole_statfile (session, symbol, ccf); + } + + if (data != NULL) { + g_free (data); + } + + return TRUE; +} + static void process_command (struct controller_command *cmd, char **cmd_args, struct controller_session *session) { @@ -344,6 +481,13 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control session->state = STATE_LEARN; } break; + case COMMAND_SYNC: + if (!process_sync_command (session, cmd_args)) { + r = snprintf (out_buf, sizeof (out_buf), "FAIL" CRLF); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE); + return; + } + break; case COMMAND_HELP: r = snprintf (out_buf, sizeof (out_buf), "Rspamd CLI commands (* - privilleged command):" CRLF @@ -619,6 +763,11 @@ start_controller (struct rspamd_worker *worker) start_time = time (NULL); + /* Start statfile synchronization */ + if (!start_statfile_sync (worker->srv->statfile_pool, worker->srv->cfg)) { + msg_info ("start_controller: cannot start statfile synchronization, statfiles would not be synchronized"); + } + /* Init command completion */ for (i = 0; i < G_N_ELEMENTS (commands); i++) { comp_list = g_list_prepend (comp_list, &commands[i]); |