@@ -156,7 +156,7 @@ binlog_open (memory_pool_t *pool, const char *path, time_t rotate_time, int rota | |||
struct rspamd_binlog *new; | |||
int len = strlen (path); | |||
struct stat st; | |||
new = memory_pool_alloc0 (pool, sizeof (struct rspamd_binlog)); | |||
new->pool = pool; | |||
new->rotate_time = rotate_time; | |||
@@ -398,12 +398,12 @@ binlog_insert (struct rspamd_binlog *log, GTree *nodes) | |||
} | |||
gboolean | |||
binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, GByteArray **rep) | |||
binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t *from_time, GByteArray **rep) | |||
{ | |||
uint32_t metaindex_num; | |||
struct rspamd_index_block *idxb; | |||
struct rspamd_binlog_index *idx; | |||
gboolean idx_mapped = FALSE, res = TRUE; | |||
gboolean idx_mapped = FALSE, res = TRUE, is_first = FALSE; | |||
if (!log || !log->metaindex || !log->cur_idx) { | |||
msg_info ("binlog_sync: improperly opened binlog: %s", log->filename); | |||
@@ -412,6 +412,7 @@ binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, G | |||
if (*rep == NULL) { | |||
*rep = g_malloc (sizeof (GByteArray)); | |||
is_first = TRUE; | |||
} | |||
else { | |||
/* Unmap old fragment */ | |||
@@ -440,7 +441,7 @@ binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, G | |||
res = FALSE; | |||
goto end; | |||
} | |||
if ((read (log->fd, &idxb, sizeof (struct rspamd_index_block))) != sizeof (struct rspamd_index_block)) { | |||
if ((read (log->fd, idxb, sizeof (struct rspamd_index_block))) != sizeof (struct rspamd_index_block)) { | |||
unlock_file (log->fd, FALSE); | |||
msg_warn ("binlog_sync: cannot read index from file %s, error %d, %s", log->filename, errno, strerror (errno)); | |||
res = FALSE; | |||
@@ -453,10 +454,13 @@ binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, G | |||
} | |||
/* Now check specified index */ | |||
idx = &idxb->indexes[from_rev % BINLOG_IDX_LEN]; | |||
if (idx->time != from_time) { | |||
if (is_first && idx->time != *from_time) { | |||
res = FALSE; | |||
goto end; | |||
} | |||
else { | |||
*from_time = idx->time; | |||
} | |||
/* Now fill reply structure */ | |||
(*rep)->len = idx->len; | |||
@@ -537,4 +541,27 @@ maybe_write_binlog (struct classifier_config *ccf, struct statfile *st, stat_fil | |||
return FALSE; | |||
} | |||
struct rspamd_binlog* | |||
get_binlog_by_statfile (struct statfile *st) | |||
{ | |||
struct rspamd_binlog *log; | |||
if (st == NULL || st->binlog == NULL || st->binlog->affinity != AFFINITY_MASTER) { | |||
return NULL; | |||
} | |||
if (!maybe_init_static ()) { | |||
return NULL; | |||
} | |||
if ((log = g_hash_table_lookup (binlog_opened, st)) == NULL) { | |||
if ((log = binlog_open (binlog_pool, st->path, st->binlog->rotate_time, st->binlog->rotate_time / 2)) != NULL) { | |||
g_hash_table_insert (binlog_opened, st, log); | |||
} | |||
else { | |||
return NULL; | |||
} | |||
} | |||
return log; | |||
} |
@@ -56,9 +56,10 @@ struct rspamd_binlog { | |||
struct classifier_config; | |||
struct rspamd_binlog* binlog_open (memory_pool_t *pool, const char *path, time_t rotate_time, int rotate_jitter); | |||
struct rspamd_binlog* get_binlog_by_statfile (struct statfile *st); | |||
void binlog_close (struct rspamd_binlog *log); | |||
gboolean binlog_insert (struct rspamd_binlog *log, GTree *nodes); | |||
gboolean binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, GByteArray **rep); | |||
gboolean binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t *from_time, GByteArray **rep); | |||
gboolean maybe_write_binlog (struct classifier_config *ccf, struct statfile *st, stat_file_t *file, GTree *nodes); | |||
#endif |
@@ -33,8 +33,8 @@ | |||
#include "tokenizers/tokenizers.h" | |||
#include "classifiers/classifiers.h" | |||
#include "binlog.h" | |||
#include "statfile_sync.h" | |||
#define CRLF "\r\n" | |||
#define END "END" CRLF | |||
/* 120 seconds for controller's IO */ | |||
@@ -50,6 +50,7 @@ enum command_type { | |||
COMMAND_LEARN, | |||
COMMAND_HELP, | |||
COMMAND_COUNTERS, | |||
COMMAND_SYNC | |||
}; | |||
struct controller_command { | |||
@@ -75,6 +76,7 @@ static struct controller_command commands[] = { | |||
{"learn", TRUE, COMMAND_LEARN}, | |||
{"help", FALSE, COMMAND_HELP}, | |||
{"counters", FALSE, COMMAND_COUNTERS}, | |||
{"sync", FALSE, COMMAND_SYNC} | |||
}; | |||
static GList *custom_commands = NULL; | |||
@@ -177,6 +179,141 @@ counter_write_callback (gpointer key, gpointer value, void *data) | |||
rspamd_dispatcher_write (session->dispatcher, out_buf, r, TRUE, FALSE); | |||
} | |||
static gboolean | |||
write_whole_statfile (struct controller_session *session, char *symbol, struct classifier_config *ccf) | |||
{ | |||
stat_file_t *statfile; | |||
struct statfile *st; | |||
char out_buf[BUFSIZ]; | |||
int i; | |||
uint64_t rev, time, len, pos; | |||
char *out; | |||
struct rspamd_binlog_element log_elt; | |||
struct stat_file_block *stat_elt; | |||
statfile = get_statfile_by_symbol (session->worker->srv->statfile_pool, ccf, | |||
symbol, &st, FALSE); | |||
if (statfile == NULL) { | |||
return FALSE; | |||
} | |||
/* Begin to copy all blocks into array */ | |||
statfile_get_revision (statfile, &rev, (time_t *)&time); | |||
len = statfile->cur_section.length * sizeof (struct rspamd_binlog_element); | |||
i = snprintf (out_buf, sizeof (out_buf), "%lu %lu %lu", (long unsigned)rev, (long unsigned)time, (long unsigned)len); | |||
rspamd_dispatcher_write (session->dispatcher, out_buf, i, TRUE, FALSE); | |||
out = memory_pool_alloc (session->session_pool, len); | |||
for (i = 0, pos = 0; i < statfile->cur_section.length; i ++) { | |||
stat_elt = (struct stat_file_block *)((u_char *)statfile->map + statfile->seek_pos + i * sizeof (struct stat_file_block)); | |||
log_elt.h1 = stat_elt->hash1; | |||
log_elt.h2 = stat_elt->hash2; | |||
log_elt.value = stat_elt->value; | |||
memcpy (out + pos, &log_elt, sizeof (log_elt)); | |||
pos += sizeof (struct rspamd_binlog_element); | |||
} | |||
if (!rspamd_dispatcher_write (session->dispatcher, out, len, TRUE, TRUE)) { | |||
return FALSE; | |||
} | |||
return TRUE; | |||
} | |||
static gboolean | |||
process_sync_command (struct controller_session *session, char **args) | |||
{ | |||
char out_buf[BUFSIZ], *arg, *err_str, *symbol; | |||
int r; | |||
uint64_t rev, time; | |||
struct statfile *st; | |||
struct classifier_config *ccf; | |||
GList *cur; | |||
struct rspamd_binlog *binlog; | |||
GByteArray *data = NULL; | |||
arg = *args; | |||
if (!arg || *arg == '\0') { | |||
msg_info ("process_sync_command: bad arguments to sync command, need symbol"); | |||
return FALSE; | |||
} | |||
symbol = arg; | |||
arg = *(args + 1); | |||
if (!arg || *arg == '\0') { | |||
msg_info ("process_sync_command: bad arguments to sync command, need revision"); | |||
return FALSE; | |||
} | |||
rev = strtoull (arg, &err_str, 10); | |||
if (err_str && *err_str != 0) { | |||
msg_info ("process_sync_command: bad arguments to sync commanc: %s", arg); | |||
return FALSE; | |||
} | |||
arg = *(args + 2); | |||
if (!arg || *arg == '\0') { | |||
msg_info ("process_sync_command: bad arguments to sync command, need time"); | |||
return FALSE; | |||
} | |||
time = strtoull (arg, &err_str, 10); | |||
if (err_str && *err_str != 0) { | |||
msg_info ("process_sync_command: bad arguments to sync commanc: %s", arg); | |||
return FALSE; | |||
} | |||
ccf = g_hash_table_lookup (session->cfg->classifiers_symbols, symbol); | |||
if (ccf == NULL) { | |||
msg_info ("process_sync_command: bad symbol: %s", symbol); | |||
return FALSE; | |||
} | |||
cur = g_list_first (ccf->statfiles); | |||
while (cur) { | |||
st = cur->data; | |||
if (strcmp (symbol, st->symbol) == 0) { | |||
break; | |||
} | |||
st = NULL; | |||
cur = g_list_next (cur); | |||
} | |||
if (st == NULL) { | |||
msg_info ("process_sync_command: bad symbol: %s", symbol); | |||
return FALSE; | |||
} | |||
binlog = get_binlog_by_statfile (st); | |||
if (binlog == NULL) { | |||
msg_info ("process_sync_command: cannot open binlog: %s", symbol); | |||
return FALSE; | |||
} | |||
if (rev == 0) { | |||
return write_whole_statfile (session, symbol, ccf); | |||
} | |||
while (binlog_sync (binlog, rev, &time, &data)) { | |||
r = snprintf (out_buf, sizeof (out_buf), "%lu %lu %lu", (long unsigned)rev, (long unsigned)time, (long unsigned)data->len); | |||
rspamd_dispatcher_write (session->dispatcher, out_buf, r, TRUE, FALSE); | |||
if (!rspamd_dispatcher_write (session->dispatcher, data->data, data->len, TRUE, FALSE)) { | |||
if (data != NULL) { | |||
g_free (data); | |||
} | |||
return FALSE; | |||
} | |||
rev ++; | |||
} | |||
if (time == 0) { | |||
if (data != NULL) { | |||
g_free (data); | |||
} | |||
return write_whole_statfile (session, symbol, ccf); | |||
} | |||
if (data != NULL) { | |||
g_free (data); | |||
} | |||
return TRUE; | |||
} | |||
static void | |||
process_command (struct controller_command *cmd, char **cmd_args, struct controller_session *session) | |||
{ | |||
@@ -344,6 +481,13 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control | |||
session->state = STATE_LEARN; | |||
} | |||
break; | |||
case COMMAND_SYNC: | |||
if (!process_sync_command (session, cmd_args)) { | |||
r = snprintf (out_buf, sizeof (out_buf), "FAIL" CRLF); | |||
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE); | |||
return; | |||
} | |||
break; | |||
case COMMAND_HELP: | |||
r = snprintf (out_buf, sizeof (out_buf), | |||
"Rspamd CLI commands (* - privilleged command):" CRLF | |||
@@ -619,6 +763,11 @@ start_controller (struct rspamd_worker *worker) | |||
start_time = time (NULL); | |||
/* Start statfile synchronization */ | |||
if (!start_statfile_sync (worker->srv->statfile_pool, worker->srv->cfg)) { | |||
msg_info ("start_controller: cannot start statfile synchronization, statfiles would not be synchronized"); | |||
} | |||
/* Init command completion */ | |||
for (i = 0; i < G_N_ELEMENTS (commands); i++) { | |||
comp_list = g_list_prepend (comp_list, &commands[i]); |
@@ -38,6 +38,7 @@ | |||
/* States of the statfile synchronization client, dispatched on in sync_read */
enum rspamd_sync_state { | |||
/* Set when a sync is started; the handler sends the "sync" command and moves to READ_LINE */
SYNC_STATE_GREETING, | |||
/* A line from the server is parsed with parse_revision_line */
SYNC_STATE_READ_LINE, | |||
/* NOTE(review): handler not visible in this view; presumably revision payload is being read -- confirm */
SYNC_STATE_READ_REV, | |||
/* Set when the server answers FAIL; the handler removes the dispatcher */
SYNC_STATE_QUIT, | |||
}; | |||
@@ -62,6 +63,25 @@ struct rspamd_sync_ctx { | |||
uint64_t new_len; | |||
}; | |||
/*
 * Log the local wall-clock time at which the next synchronization of a
 * statfile is planned.
 *
 * symbol: statfile symbol, used only inside the log message
 * delay:  number of seconds from now until the next synchronization
 *
 * Nothing is logged when the local time cannot be computed or the message
 * does not fit into the local buffer.
 */
static void
log_next_sync (const char *symbol, time_t delay)
{
	char outstr[200];
	time_t t;
	struct tm *tmp;
	int r;

	t = time (NULL);
	t += delay;
	tmp = localtime (&t);
	if (tmp == NULL) {
		return;
	}

	r = snprintf (outstr, sizeof (outstr), "statfile_sync: next sync of %s at ", symbol);
	if (r < 0 || (size_t)r >= sizeof (outstr)) {
		/*
		 * snprintf reports the untruncated length: with a long symbol
		 * outstr + r would point past the buffer and sizeof - r would wrap,
		 * so there is no room left for the timestamp.
		 */
		return;
	}

	if (strftime (outstr + r, sizeof (outstr) - (size_t)r, "%T", tmp) != 0) {
		/* Never use the assembled text as a format string: symbol may contain '%' */
		msg_info ("%s", outstr);
	}
}
static gboolean | |||
parse_revision_line (struct rspamd_sync_ctx *ctx, f_str_t *in) | |||
@@ -77,6 +97,13 @@ parse_revision_line (struct rspamd_sync_ctx *ctx, f_str_t *in) | |||
return TRUE; | |||
} | |||
/* Next check for error line */ | |||
if (in->len >= sizeof ("FAIL") - 1 && memcmp (in->begin, "FAIL", sizeof ("FAIL") - 1) == 0) { | |||
ctx->state = SYNC_STATE_QUIT; | |||
ctx->is_busy = FALSE; | |||
return TRUE; | |||
} | |||
/* Now try to extract 3 numbers from string: revision, time and length */ | |||
p = in->begin; | |||
val = &ctx->new_rev; | |||
@@ -137,9 +164,20 @@ static gboolean | |||
sync_read (f_str_t * in, void *arg) | |||
{ | |||
struct rspamd_sync_ctx *ctx = arg; | |||
char buf[256]; | |||
uint64_t rev = 0; | |||
time_t ti = 0; | |||
switch (ctx->state) { | |||
case SYNC_STATE_GREETING: | |||
/* Skip greeting line and write sync command */ | |||
/* Write initial data */ | |||
statfile_get_revision (ctx->real_statfile, &rev, &ti); | |||
rev = snprintf (buf, sizeof (buf), "sync %s %ld %ld" CRLF, ctx->st->symbol, (long int)rev, (long int)ti); | |||
ctx->state = SYNC_STATE_READ_LINE; | |||
rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, FALSE); | |||
break; | |||
case SYNC_STATE_READ_LINE: | |||
/* Try to parse line from server */ | |||
if (!parse_revision_line (ctx, in)) { | |||
msg_info ("sync_read: cannot parse line"); | |||
@@ -170,7 +208,7 @@ sync_read (f_str_t * in, void *arg) | |||
statfile_set_revision (ctx->real_statfile, ctx->new_rev, ctx->new_time); | |||
/* Now try to read other revision or END line */ | |||
rspamd_set_dispatcher_policy (ctx->dispatcher, BUFFER_LINE, 0); | |||
ctx->state = SYNC_STATE_GREETING; | |||
ctx->state = SYNC_STATE_READ_LINE; | |||
break; | |||
case SYNC_STATE_QUIT: | |||
rspamd_remove_dispatcher (ctx->dispatcher); | |||
@@ -196,15 +234,13 @@ static void | |||
sync_timer_callback (int fd, short what, void *ud) | |||
{ | |||
struct rspamd_sync_ctx *ctx = ud; | |||
char buf[256]; | |||
uint64_t rev = 0; | |||
time_t ti = 0; | |||
/* Plan new event */ | |||
evtimer_del (&ctx->tm_ev); | |||
ctx->interval.tv_sec = g_random_int_range (MAX_SYNC_TIME, MAX_SYNC_TIME * 2); | |||
ctx->interval.tv_usec = 0; | |||
evtimer_add (&ctx->tm_ev, &ctx->interval); | |||
log_next_sync (ctx->st->symbol, ctx->interval.tv_sec); | |||
if (ctx->is_busy) { | |||
/* Sync is in progress */ | |||
@@ -220,14 +256,12 @@ sync_timer_callback (int fd, short what, void *ud) | |||
ctx->io_tv.tv_sec = IO_TIMEOUT; | |||
ctx->io_tv.tv_usec = 0; | |||
ctx->dispatcher = rspamd_create_dispatcher (ctx->sock, BUFFER_LINE, sync_read, NULL, sync_err, &ctx->io_tv, ctx); | |||
/* Write initial data */ | |||
statfile_get_revision (ctx->real_statfile, &rev, &ti); | |||
rev = snprintf (buf, sizeof (buf), "sync %s %ld %ld" CRLF, ctx->st->symbol, (long int)rev, (long int)ti); | |||
ctx->state = SYNC_STATE_GREETING; | |||
ctx->is_busy = TRUE; | |||
rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, FALSE); | |||
msg_info ("sync_timer_callback: starting synchronization of %s", ctx->st->symbol); | |||
} | |||
static gboolean | |||
@@ -254,6 +288,7 @@ add_statfile_watch (statfile_pool_t *pool, struct statfile *st) | |||
/* Now plan event for it's future executing */ | |||
evtimer_set (&ctx->tm_ev, sync_timer_callback, ctx); | |||
evtimer_add (&ctx->tm_ev, &ctx->interval); | |||
log_next_sync (st->symbol, ctx->interval.tv_sec); | |||
return TRUE; | |||
} |