aboutsummaryrefslogtreecommitdiffstats
path: root/src/controller.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2009-11-27 19:15:48 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2009-11-27 19:15:48 +0300
commit569df8dd24eb159b069ca7f5efa6a6ba3336d63d (patch)
tree54d5c6200e5ea121e9fba909b6e7c40b7e164258 /src/controller.c
parent893d6efc979fa9e23d93a43c6ab1e46bd9f7da16 (diff)
downloadrspamd-569df8dd24eb159b069ca7f5efa6a6ba3336d63d.tar.gz
rspamd-569df8dd24eb159b069ca7f5efa6a6ba3336d63d.zip
* Initial release of synchronization server in controller
Diffstat (limited to 'src/controller.c')
-rw-r--r--src/controller.c151
1 files changed, 150 insertions, 1 deletions
diff --git a/src/controller.c b/src/controller.c
index 0b79813b3..31bad233a 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -33,8 +33,8 @@
#include "tokenizers/tokenizers.h"
#include "classifiers/classifiers.h"
#include "binlog.h"
+#include "statfile_sync.h"
-#define CRLF "\r\n"
#define END "END" CRLF
/* 120 seconds for controller's IO */
@@ -50,6 +50,7 @@ enum command_type {
COMMAND_LEARN,
COMMAND_HELP,
COMMAND_COUNTERS,
+ COMMAND_SYNC
};
struct controller_command {
@@ -75,6 +76,7 @@ static struct controller_command commands[] = {
{"learn", TRUE, COMMAND_LEARN},
{"help", FALSE, COMMAND_HELP},
{"counters", FALSE, COMMAND_COUNTERS},
+ {"sync", FALSE, COMMAND_SYNC}
};
static GList *custom_commands = NULL;
@@ -177,6 +179,141 @@ counter_write_callback (gpointer key, gpointer value, void *data)
rspamd_dispatcher_write (session->dispatcher, out_buf, r, TRUE, FALSE);
}
+static gboolean
+write_whole_statfile (struct controller_session *session, char *symbol, struct classifier_config *ccf)
+{
+ stat_file_t *statfile;
+ struct statfile *st;
+ char out_buf[BUFSIZ];
+ int i;
+ uint64_t rev, time, len, pos;
+ char *out;
+ struct rspamd_binlog_element log_elt;
+ struct stat_file_block *stat_elt;
+
+ statfile = get_statfile_by_symbol (session->worker->srv->statfile_pool, ccf,
+ symbol, &st, FALSE);
+ if (statfile == NULL) {
+ return FALSE;
+ }
+
+ /* Begin to copy all blocks into array */
+ statfile_get_revision (statfile, &rev, (time_t *)&time);
+ len = statfile->cur_section.length * sizeof (struct rspamd_binlog_element);
+ i = snprintf (out_buf, sizeof (out_buf), "%lu %lu %lu", (long unsigned)rev, (long unsigned)time, (long unsigned)len);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, i, TRUE, FALSE);
+ out = memory_pool_alloc (session->session_pool, len);
+
+ for (i = 0, pos = 0; i < statfile->cur_section.length; i ++) {
+ stat_elt = (struct stat_file_block *)((u_char *)statfile->map + statfile->seek_pos + i * sizeof (struct stat_file_block));
+ log_elt.h1 = stat_elt->hash1;
+ log_elt.h2 = stat_elt->hash2;
+ log_elt.value = stat_elt->value;
+ memcpy (out + pos, &log_elt, sizeof (log_elt));
+ pos += sizeof (struct rspamd_binlog_element);
+ }
+ if (!rspamd_dispatcher_write (session->dispatcher, out, len, TRUE, TRUE)) {
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+static gboolean
+process_sync_command (struct controller_session *session, char **args)
+{
+ char out_buf[BUFSIZ], *arg, *err_str, *symbol;
+ int r;
+ uint64_t rev, time;
+ struct statfile *st;
+ struct classifier_config *ccf;
+ GList *cur;
+ struct rspamd_binlog *binlog;
+ GByteArray *data = NULL;
+
+ arg = *args;
+ if (!arg || *arg == '\0') {
+ msg_info ("process_sync_command: bad arguments to sync command, need symbol");
+ return FALSE;
+ }
+ symbol = arg;
+ arg = *(args + 1);
+ if (!arg || *arg == '\0') {
+ msg_info ("process_sync_command: bad arguments to sync command, need revision");
+ return FALSE;
+ }
+ rev = strtoull (arg, &err_str, 10);
+ if (err_str && *err_str != 0) {
+ msg_info ("process_sync_command: bad arguments to sync commanc: %s", arg);
+ return FALSE;
+ }
+ arg = *(args + 2);
+ if (!arg || *arg == '\0') {
+ msg_info ("process_sync_command: bad arguments to sync command, need time");
+ return FALSE;
+ }
+ time = strtoull (arg, &err_str, 10);
+ if (err_str && *err_str != 0) {
+ msg_info ("process_sync_command: bad arguments to sync commanc: %s", arg);
+ return FALSE;
+ }
+
+ ccf = g_hash_table_lookup (session->cfg->classifiers_symbols, symbol);
+ if (ccf == NULL) {
+ msg_info ("process_sync_command: bad symbol: %s", symbol);
+ return FALSE;
+ }
+
+ cur = g_list_first (ccf->statfiles);
+ while (cur) {
+ st = cur->data;
+ if (strcmp (symbol, st->symbol) == 0) {
+ break;
+ }
+ st = NULL;
+ cur = g_list_next (cur);
+ }
+ if (st == NULL) {
+ msg_info ("process_sync_command: bad symbol: %s", symbol);
+ return FALSE;
+ }
+
+ binlog = get_binlog_by_statfile (st);
+ if (binlog == NULL) {
+ msg_info ("process_sync_command: cannot open binlog: %s", symbol);
+ return FALSE;
+ }
+
+ if (rev == 0) {
+ return write_whole_statfile (session, symbol, ccf);
+ }
+
+ while (binlog_sync (binlog, rev, &time, &data)) {
+ r = snprintf (out_buf, sizeof (out_buf), "%lu %lu %lu", (long unsigned)rev, (long unsigned)time, (long unsigned)data->len);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, TRUE, FALSE);
+ if (!rspamd_dispatcher_write (session->dispatcher, data->data, data->len, TRUE, FALSE)) {
+ if (data != NULL) {
+ g_free (data);
+ }
+ return FALSE;
+ }
+ rev ++;
+ }
+
+ if (time == 0) {
+ if (data != NULL) {
+ g_free (data);
+ }
+ return write_whole_statfile (session, symbol, ccf);
+ }
+
+ if (data != NULL) {
+ g_free (data);
+ }
+
+ return TRUE;
+}
+
static void
process_command (struct controller_command *cmd, char **cmd_args, struct controller_session *session)
{
@@ -344,6 +481,13 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
session->state = STATE_LEARN;
}
break;
+ case COMMAND_SYNC:
+ if (!process_sync_command (session, cmd_args)) {
+ r = snprintf (out_buf, sizeof (out_buf), "FAIL" CRLF);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
+ return;
+ }
+ break;
case COMMAND_HELP:
r = snprintf (out_buf, sizeof (out_buf),
"Rspamd CLI commands (* - privilleged command):" CRLF
@@ -619,6 +763,11 @@ start_controller (struct rspamd_worker *worker)
start_time = time (NULL);
+ /* Start statfile synchronization */
+ if (!start_statfile_sync (worker->srv->statfile_pool, worker->srv->cfg)) {
+ msg_info ("start_controller: cannot start statfile synchronization, statfiles would not be synchronized");
+ }
+
/* Init command completion */
for (i = 0; i < G_N_ELEMENTS (commands); i++) {
comp_list = g_list_prepend (comp_list, &commands[i]);