]> source.dussan.org Git - rspamd.git/commitdiff
* Initial release of synchronization server in controller
author Vsevolod Stakhov <vsevolod@rambler-co.ru>
Fri, 27 Nov 2009 16:15:48 +0000 (19:15 +0300)
committer Vsevolod Stakhov <vsevolod@rambler-co.ru>
Fri, 27 Nov 2009 16:15:48 +0000 (19:15 +0300)
src/binlog.c
src/binlog.h
src/controller.c
src/statfile_sync.c

index 9a96d4f7be49b0ee2ac772ea128545f3f21faed1..f006cd3687e81b77b0bc363759f25b90d5cf0279 100644 (file)
@@ -156,7 +156,7 @@ binlog_open (memory_pool_t *pool, const char *path, time_t rotate_time, int rota
        struct rspamd_binlog *new;
        int len = strlen (path);
        struct stat st;
-       
+
        new = memory_pool_alloc0 (pool, sizeof (struct rspamd_binlog));
        new->pool = pool;
        new->rotate_time = rotate_time;
@@ -398,12 +398,12 @@ binlog_insert (struct rspamd_binlog *log, GTree *nodes)
 }
 
 gboolean 
-binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, GByteArray **rep)
+binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t *from_time, GByteArray **rep)
 {
        uint32_t metaindex_num;
        struct rspamd_index_block *idxb;
        struct rspamd_binlog_index *idx;
-       gboolean idx_mapped = FALSE, res = TRUE;
+       gboolean idx_mapped = FALSE, res = TRUE, is_first = FALSE;
 
        if (!log || !log->metaindex || !log->cur_idx) {
                msg_info ("binlog_sync: improperly opened binlog: %s", log->filename);
@@ -412,6 +412,7 @@ binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, G
 
        if (*rep == NULL) {
                *rep = g_malloc (sizeof (GByteArray));
+               is_first = TRUE;
        }
        else {
                /* Unmap old fragment */
@@ -440,7 +441,7 @@ binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, G
                        res = FALSE;
                        goto end;
                }
-               if ((read (log->fd, &idxb, sizeof (struct rspamd_index_block))) != sizeof (struct rspamd_index_block)) {
+               if ((read (log->fd, idxb, sizeof (struct rspamd_index_block))) != sizeof (struct rspamd_index_block)) {
                        unlock_file (log->fd, FALSE);
                        msg_warn ("binlog_sync: cannot read index from file %s, error %d, %s", log->filename, errno, strerror (errno));
                        res = FALSE;
@@ -453,10 +454,13 @@ binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, G
        }
        /* Now check specified index */
        idx = &idxb->indexes[from_rev % BINLOG_IDX_LEN];
-       if (idx->time != from_time) {
+       if (is_first && idx->time != *from_time) {
                res = FALSE;
                goto end;
        }
+       else {
+               *from_time = idx->time;
+       }
 
        /* Now fill reply structure */
        (*rep)->len = idx->len;
@@ -537,4 +541,27 @@ maybe_write_binlog (struct classifier_config *ccf, struct statfile *st, stat_fil
        return FALSE;
 }
 
+/* Return the binlog cached for statfile `st`, opening and caching it on first use; NULL when unavailable. */
+struct rspamd_binlog*
+get_binlog_by_statfile (struct statfile *st)
+{
+       struct rspamd_binlog *found;
+       /* Refuse statfiles that have no binlog settings or are not configured with master affinity */
+       if (st == NULL || st->binlog == NULL || st->binlog->affinity != AFFINITY_MASTER) {
+               return NULL;
+       }
+       if (!maybe_init_static ()) {
+               return NULL;
+       }
 
+       found = g_hash_table_lookup (binlog_opened, st);
+       if (found != NULL) {
+               return found;
+       }
+       /* Not opened yet: open it and remember it, keyed by the statfile pointer */
+       found = binlog_open (binlog_pool, st->path, st->binlog->rotate_time, st->binlog->rotate_time / 2);
+       if (found != NULL) {
+               g_hash_table_insert (binlog_opened, st, found);
+       }
+       return found;
+}
index dbf22ee43eaa72d82ef01f88bd2036998d890ea8..bb8b8f244a1fe8834560bf51f027b5f0944c1daa 100644 (file)
@@ -56,9 +56,10 @@ struct rspamd_binlog {
 struct classifier_config;
 
 struct rspamd_binlog* binlog_open (memory_pool_t *pool, const char *path, time_t rotate_time, int rotate_jitter);
+struct rspamd_binlog* get_binlog_by_statfile (struct statfile *st);
 void binlog_close (struct rspamd_binlog *log);
 gboolean binlog_insert (struct rspamd_binlog *log, GTree *nodes);
-gboolean binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, GByteArray **rep);
+gboolean binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t *from_time, GByteArray **rep);
 gboolean maybe_write_binlog (struct classifier_config *ccf, struct statfile *st, stat_file_t *file, GTree *nodes);
 
 #endif
index 0b79813b3ad423109fd65a2be1fe8222b3a1a40d..31bad233a0cbe5aceacecf679536dee34e1c9121 100644 (file)
@@ -33,8 +33,8 @@
 #include "tokenizers/tokenizers.h"
 #include "classifiers/classifiers.h"
 #include "binlog.h"
+#include "statfile_sync.h"
 
-#define CRLF "\r\n"
 #define END "END" CRLF
 
 /* 120 seconds for controller's IO */
@@ -50,6 +50,7 @@ enum command_type {
        COMMAND_LEARN,
        COMMAND_HELP,
        COMMAND_COUNTERS,
+       COMMAND_SYNC
 };
 
 struct controller_command {
@@ -75,6 +76,7 @@ static struct controller_command commands[] = {
        {"learn", TRUE, COMMAND_LEARN},
        {"help", FALSE, COMMAND_HELP},
        {"counters", FALSE, COMMAND_COUNTERS},
+       {"sync", FALSE, COMMAND_SYNC}
 };
 
 static GList                   *custom_commands = NULL;
@@ -177,6 +179,141 @@ counter_write_callback (gpointer key, gpointer value, void *data)
        rspamd_dispatcher_write (session->dispatcher, out_buf, r, TRUE, FALSE);
 }
 
+/*
+ * Send the whole current section of the statfile for `symbol`: a "revision time length" header, then
+ * one rspamd_binlog_element per block. Returns FALSE if the statfile is not found or a write fails.
+ */
+static gboolean
+write_whole_statfile (struct controller_session *session, char *symbol, struct classifier_config *ccf)
+{
+       stat_file_t                    *statfile;
+       struct statfile                *st;
+       char                            out_buf[BUFSIZ], *out;
+       int                             i;
+       uint64_t                        rev, len, pos;
+       time_t                          rev_time;
+       struct rspamd_binlog_element    log_elt;
+       struct stat_file_block         *stat_elt;
+
+       statfile = get_statfile_by_symbol (session->worker->srv->statfile_pool, ccf, symbol, &st, FALSE);
+       if (statfile == NULL) {
+               return FALSE;
+       }
+       /* statfile_get_revision stores through a time_t *, so give it a real time_t instead of a cast uint64_t */
+       statfile_get_revision (statfile, &rev, &rev_time);
+       len = statfile->cur_section.length * sizeof (struct rspamd_binlog_element);
+       i = snprintf (out_buf, sizeof (out_buf), "%lu %lu %lu", (long unsigned)rev, (long unsigned)rev_time, (long unsigned)len);
+       if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, TRUE, FALSE)) {
+               return FALSE;
+       }
+       /* Convert every block of the current section into its binlog element representation */
+       out = memory_pool_alloc (session->session_pool, len);
+       for (i = 0, pos = 0; i < statfile->cur_section.length; i ++) {
+               stat_elt = (struct stat_file_block *)((u_char *)statfile->map + statfile->seek_pos + i * sizeof (struct stat_file_block));
+               log_elt.h1 = stat_elt->hash1;
+               log_elt.h2 = stat_elt->hash2;
+               log_elt.value = stat_elt->value;
+               memcpy (out + pos, &log_elt, sizeof (log_elt));
+               pos += sizeof (struct rspamd_binlog_element);
+       }
+       return rspamd_dispatcher_write (session->dispatcher, out, len, TRUE, TRUE) ? TRUE : FALSE;
+}
+
+/*
+ * Handle the controller "sync" command: args are symbol, revision and time (all required).
+ * Revision 0 sends the whole statfile; otherwise binlog fragments are streamed starting at `rev`,
+ * and the whole statfile is sent afterwards when the resulting time is 0.
+ * Returns FALSE on bad arguments, unknown symbol, unavailable binlog or a failed write.
+ */
+static gboolean
+process_sync_command (struct controller_session *session, char **args)
+{
+       char                            out_buf[BUFSIZ], *arg, *err_str, *symbol;
+       int                             r;
+       uint64_t                        rev, from_time;
+       struct statfile                *st = NULL;
+       struct classifier_config       *ccf;
+       GList                          *cur;
+       struct rspamd_binlog           *binlog;
+       GByteArray                     *data = NULL;
+       gboolean                        res = TRUE;
+
+       /* Parse the three mandatory arguments in order: symbol, revision, time */
+       arg = *args;
+       if (!arg || *arg == '\0') {
+               msg_info ("process_sync_command: bad arguments to sync command, need symbol");
+               return FALSE;
+       }
+       symbol = arg;
+       arg = *(args + 1);
+       if (!arg || *arg == '\0') {
+               msg_info ("process_sync_command: bad arguments to sync command, need revision");
+               return FALSE;
+       }
+       rev = strtoull (arg, &err_str, 10);
+       if (err_str && *err_str != 0) {
+               msg_info ("process_sync_command: bad arguments to sync commanc: %s", arg);
+               return FALSE;
+       }
+       arg = *(args + 2);
+       if (!arg || *arg == '\0') {
+               msg_info ("process_sync_command: bad arguments to sync command, need time");
+               return FALSE;
+       }
+       from_time = strtoull (arg, &err_str, 10);
+       if (err_str && *err_str != 0) {
+               msg_info ("process_sync_command: bad arguments to sync commanc: %s", arg);
+               return FALSE;
+       }
+
+       ccf = g_hash_table_lookup (session->cfg->classifiers_symbols, symbol);
+       if (ccf == NULL) {
+               msg_info ("process_sync_command: bad symbol: %s", symbol);
+               return FALSE;
+       }
+
+       /* st starts as NULL so that an empty statfile list is reported as a bad symbol instead of reading garbage */
+       for (cur = g_list_first (ccf->statfiles); cur != NULL; cur = g_list_next (cur)) {
+               if (strcmp (symbol, ((struct statfile *)cur->data)->symbol) == 0) {
+                       st = cur->data;
+                       break;
+               }
+       }
+       if (st == NULL) {
+               msg_info ("process_sync_command: bad symbol: %s", symbol);
+               return FALSE;
+       }
+
+       binlog = get_binlog_by_statfile (st);
+       if (binlog == NULL) {
+               msg_info ("process_sync_command: cannot open binlog: %s", symbol);
+               return FALSE;
+       }
+
+       if (rev == 0) {
+               return write_whole_statfile (session, symbol, ccf);
+       }
+
+       /* Stream consecutive revisions until binlog_sync returns FALSE; stop early if either write fails */
+       while (binlog_sync (binlog, rev, &from_time, &data)) {
+               r = snprintf (out_buf, sizeof (out_buf), "%lu %lu %lu", (long unsigned)rev, (long unsigned)from_time, (long unsigned)data->len);
+               if (!rspamd_dispatcher_write (session->dispatcher, out_buf, r, TRUE, FALSE) ||
+                       !rspamd_dispatcher_write (session->dispatcher, data->data, data->len, TRUE, FALSE)) {
+                       res = FALSE;
+                       break;
+               }
+               rev ++;
+       }
+
+       /* g_free (NULL) is a no-op, so one unconditional release covers every exit path */
+       g_free (data);
+       if (res && from_time == 0) {
+               res = write_whole_statfile (session, symbol, ccf);
+       }
+
+       return res;
+}
+
 static void
 process_command (struct controller_command *cmd, char **cmd_args, struct controller_session *session)
 {
@@ -344,6 +481,13 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
                        session->state = STATE_LEARN;
                }
                break;
+       case COMMAND_SYNC:
+               if (!process_sync_command (session, cmd_args)) {
+                       r = snprintf (out_buf, sizeof (out_buf), "FAIL" CRLF);
+                       rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
+                       return;
+               }
+               break;
        case COMMAND_HELP:
                r = snprintf (out_buf, sizeof (out_buf),
                        "Rspamd CLI commands (* - privilleged command):" CRLF
@@ -619,6 +763,11 @@ start_controller (struct rspamd_worker *worker)
 
        start_time = time (NULL);
 
+       /* Start statfile synchronization */
+       if (!start_statfile_sync (worker->srv->statfile_pool, worker->srv->cfg)) {
+               msg_info ("start_controller: cannot start statfile synchronization, statfiles would not be synchronized");
+       }
+
        /* Init command completion */
        for (i = 0; i < G_N_ELEMENTS (commands); i++) {
                comp_list = g_list_prepend (comp_list, &commands[i]);
index 71be8873320eec5e4f945fc2564ab42fbe056ea8..d28e0d208fdbf1ff035c4083267144cbc58062d4 100644 (file)
@@ -38,6 +38,7 @@
 
 enum rspamd_sync_state {
        SYNC_STATE_GREETING,
+       SYNC_STATE_READ_LINE,
        SYNC_STATE_READ_REV,
        SYNC_STATE_QUIT,
 };
@@ -62,6 +63,25 @@ struct rspamd_sync_ctx {
        uint64_t new_len;
 };
 
+/* Log the local time at which `symbol` is next synchronized, `delay` seconds from now; silent on failure. */
+static void
+log_next_sync (const char *symbol, time_t delay)
+{
+       char outstr[200];
+       time_t t;
+       struct tm *tmp;
+       int r;
+       t = time (NULL) + delay;
+       tmp = localtime (&t);
+       if (tmp == NULL) {
+               return;
+       }
+       r = snprintf (outstr, sizeof (outstr), "statfile_sync: next sync of %s at ", symbol);
+       /* A failed or truncated snprintf would put outstr + r outside the buffer and wrap the size passed to strftime */
+       if (r > 0 && (size_t)r < sizeof (outstr) && strftime (outstr + r, sizeof (outstr) - r, "%T", tmp) != 0) {
+               msg_info ("%s", outstr); /* the text may contain '%' from symbol, so never use it as the format */
+       }
+}
 
 static                          gboolean
 parse_revision_line (struct rspamd_sync_ctx *ctx, f_str_t *in)
@@ -77,6 +97,13 @@ parse_revision_line (struct rspamd_sync_ctx *ctx, f_str_t *in)
                return TRUE;
        }
 
+       /* Next check for error line */
+       if (in->len >= sizeof ("FAIL") - 1 && memcmp (in->begin, "FAIL", sizeof ("FAIL") - 1) == 0) {
+               ctx->state = SYNC_STATE_QUIT;
+               ctx->is_busy = FALSE;
+               return TRUE;
+       }
+
        /* Now try to extract 3 numbers from string: revision, time and length */
        p = in->begin;
        val = &ctx->new_rev;
@@ -137,9 +164,20 @@ static                          gboolean
 sync_read (f_str_t * in, void *arg)
 {
        struct rspamd_sync_ctx *ctx = arg;
+       char                    buf[256];
+       uint64_t                rev = 0;
+       time_t                  ti = 0;
 
        switch (ctx->state) {
                case SYNC_STATE_GREETING:
+                       /* Skip greeting line and write sync command */
+                       /* Write initial data */
+                       statfile_get_revision (ctx->real_statfile, &rev, &ti);
+                       rev = snprintf (buf, sizeof (buf), "sync %s %ld %ld" CRLF, ctx->st->symbol, (long int)rev, (long int)ti);
+                       ctx->state = SYNC_STATE_READ_LINE;
+                       rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, FALSE);      
+                       break;
+               case SYNC_STATE_READ_LINE:
                        /* Try to parse line from server */
                        if (!parse_revision_line (ctx, in)) {
                                msg_info ("sync_read: cannot parse line");
@@ -170,7 +208,7 @@ sync_read (f_str_t * in, void *arg)
                        statfile_set_revision (ctx->real_statfile, ctx->new_rev, ctx->new_time);
                        /* Now try to read other revision or END line */
                        rspamd_set_dispatcher_policy (ctx->dispatcher, BUFFER_LINE, 0);
-                       ctx->state = SYNC_STATE_GREETING;
+                       ctx->state = SYNC_STATE_READ_LINE;
                        break;
                case SYNC_STATE_QUIT:
                        rspamd_remove_dispatcher (ctx->dispatcher);
@@ -196,15 +234,13 @@ static void
 sync_timer_callback (int fd, short what, void *ud)
 {
        struct rspamd_sync_ctx *ctx = ud;
-       char                    buf[256];
-       uint64_t                rev = 0;
-       time_t                  ti = 0;
        
        /* Plan new event */
        evtimer_del (&ctx->tm_ev);
        ctx->interval.tv_sec = g_random_int_range (MAX_SYNC_TIME, MAX_SYNC_TIME * 2);
        ctx->interval.tv_usec = 0;
        evtimer_add (&ctx->tm_ev, &ctx->interval);
+       log_next_sync (ctx->st->symbol, ctx->interval.tv_sec);
        
        if (ctx->is_busy) {
                /* Sync is in progress */
@@ -220,14 +256,12 @@ sync_timer_callback (int fd, short what, void *ud)
        ctx->io_tv.tv_sec = IO_TIMEOUT;
        ctx->io_tv.tv_usec = 0;
        ctx->dispatcher = rspamd_create_dispatcher (ctx->sock, BUFFER_LINE, sync_read, NULL, sync_err, &ctx->io_tv, ctx);
-       /* Write initial data */
-       statfile_get_revision (ctx->real_statfile, &rev, &ti);
-       rev = snprintf (buf, sizeof (buf), "sync %s %ld %ld" CRLF, ctx->st->symbol, (long int)rev, (long int)ti);
        
        ctx->state = SYNC_STATE_GREETING;
        ctx->is_busy = TRUE;
 
-       rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, FALSE);      
+       msg_info ("sync_timer_callback: starting synchronization of %s", ctx->st->symbol);
+
 }
 
 static gboolean
@@ -254,6 +288,7 @@ add_statfile_watch (statfile_pool_t *pool, struct statfile *st)
        /* Now plan event for it's future executing */
        evtimer_set (&ctx->tm_ev, sync_timer_callback, ctx);
        evtimer_add (&ctx->tm_ev, &ctx->interval);
+       log_next_sync (st->symbol, ctx->interval.tv_sec);
 
        return TRUE;
 }