From: Vsevolod Stakhov Date: Thu, 3 Dec 2009 18:28:50 +0000 (+0300) Subject: * Many major fixes to statfiles: X-Git-Tag: 0.3.0~116 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=2f551bd813a96567bac80aeaa186b706571b69f7;p=rspamd.git * Many major fixes to statfiles: - fix bug with mmapping files: new addresses must NOT be allocated in shared memory by themselves - fix bug with winnow classifier that totally brokes it down - fix bug with too much grow of values * Use double precission values in statistics * Add statistics for statfiles * Add more informative data to output of LEARN command (weight of incoming message) * Add weight to output of classifier as well --- diff --git a/rspamc.pl.in b/rspamc.pl.in index 17e830e3b..f628fb3d1 100755 --- a/rspamc.pl.in +++ b/rspamc.pl.in @@ -169,8 +169,8 @@ sub do_control_command { syswrite $sock, "learn $cfg{'statfile'} $len" . $CRLF; syswrite $sock, $input . $CRLF; if (defined (my $reply = <$sock>)) { - if ($reply =~ /^learn ok/) { - print "Learn succeed\n"; + if ($reply =~ /^learn ok, sum weight: ([0-9.]+)/) { + print "Learn succeed. Sum weight: $1\n"; } else { print "Learn failed\n"; diff --git a/src/classifiers/classifiers.h b/src/classifiers/classifiers.h index cab6eff21..12787f049 100644 --- a/src/classifiers/classifiers.h +++ b/src/classifiers/classifiers.h @@ -20,7 +20,7 @@ struct classifier { struct classifier_ctx* (*init_func)(memory_pool_t *pool, struct classifier_config *cf); void (*classify_func)(struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task); void (*learn_func)(struct classifier_ctx* ctx, statfile_pool_t *pool, - stat_file_t *file, GTree *input, gboolean in_class); + stat_file_t *file, GTree *input, gboolean in_class, double *sum); }; /* Get classifier structure by name or return NULL if this name is not found */ @@ -29,7 +29,7 @@ struct classifier* get_classifier (char *name); /* Winnow algorithm */ struct classifier_ctx* winnow_init (memory_pool_t *pool, struct classifier_config *cf); void winnow_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task); -void winnow_learn (struct classifier_ctx* ctx, statfile_pool_t *pool, stat_file_t *file, GTree *input, gboolean in_class); +void winnow_learn (struct classifier_ctx* ctx, statfile_pool_t *pool, stat_file_t *file, GTree *input, gboolean in_class, double *sum); /* Array of all defined classifiers */ extern struct classifier classifiers[]; diff --git a/src/classifiers/winnow.c b/src/classifiers/winnow.c index 8f2b6a6fb..7e1144ae7 100644 --- a/src/classifiers/winnow.c +++ b/src/classifiers/winnow.c @@ -49,13 +49,11 @@ classify_callback (gpointer key, gpointer value, gpointer data) { token_node_t *node = key; struct winnow_callback_data *cd = data; - float v; + double v; /* Consider that not found blocks have value 1 */ - if ((v = statfile_pool_get_block (cd->pool, cd->file, node->h1, node->h2, cd->now)) < 0.00001) { - cd->sum += 1; - } - else { + v = statfile_pool_get_block (cd->pool, cd->file, node->h1, node->h2, cd->now); + if (fabs (v) > 0.00001) { cd->sum += v; cd->in_class++; } @@ -70,12 +68,13 @@ learn_callback (gpointer key, gpointer value, gpointer data) { token_node_t *node = key; struct winnow_callback_data *cd = data; - float v, c; + double v, c; c = (cd->in_class) ? WINNOW_PROMOTION : WINNOW_DEMOTION; /* Consider that not found blocks have value 1 */ - if ((v = statfile_pool_get_block (cd->pool, cd->file, node->h1, node->h2, cd->now)) < 0.00001) { + v = statfile_pool_get_block (cd->pool, cd->file, node->h1, node->h2, cd->now); + if (fabs (v) < 0.00001) { statfile_pool_set_block (cd->pool, cd->file, node->h1, node->h2, cd->now, c); node->value = c; } @@ -83,7 +82,8 @@ learn_callback (gpointer key, gpointer value, gpointer data) statfile_pool_set_block (cd->pool, cd->file, node->h1, node->h2, cd->now, v * c); node->value = v * c; } - + + cd->sum += node->value; cd->count++; return FALSE; @@ -104,8 +104,8 @@ void winnow_classify (struct classifier_ctx *ctx, statfile_pool_t * pool, GTree * input, struct worker_task *task) { struct winnow_callback_data data; - double *res = memory_pool_alloc (ctx->pool, sizeof (double)); - double max = 0; + char *sumbuf; + double res = 0., max = 0.; GList *cur; struct statfile *st, *sel = NULL; @@ -136,25 +136,28 @@ winnow_classify (struct classifier_ctx *ctx, statfile_pool_t * pool, GTree * inp } if (data.count != 0) { - *res = (data.sum / data.count); + res = data.sum / data.count; } else { - *res = 0; + res = 0; } - if (*res > max) { - max = *res; + if (res > max) { + max = res; sel = st; } cur = g_list_next (cur); } if (sel != NULL) { - insert_result (task, ctx->cfg->metric, sel->symbol, 1, NULL); + sumbuf = memory_pool_alloc (task->task_pool, 32); + snprintf (sumbuf, 32, "%.2f", max); + cur = g_list_prepend (NULL, sumbuf); + insert_result (task, ctx->cfg->metric, sel->symbol, 1, cur); } } void -winnow_learn (struct classifier_ctx *ctx, statfile_pool_t *pool, stat_file_t *file, GTree * input, int in_class) +winnow_learn (struct classifier_ctx *ctx, statfile_pool_t *pool, stat_file_t *file, GTree * input, int in_class, double *sum) { struct winnow_callback_data data = { .file = NULL, @@ -172,9 +175,19 @@ winnow_learn (struct classifier_ctx *ctx, statfile_pool_t *pool, stat_file_t *fi data.file = file; + if (data.file != NULL) { statfile_pool_lock_file (pool, data.file); g_tree_foreach (input, learn_callback, &data); statfile_pool_unlock_file (pool, data.file); } + + if (sum) { + if (data.count != 0) { + *sum = data.sum / data.count; + } + else { + *sum = 0; + } + } } diff --git a/src/controller.c b/src/controller.c index 6df3a220c..6176fd0e6 100644 --- a/src/controller.c +++ b/src/controller.c @@ -321,6 +321,63 @@ process_sync_command (struct controller_session *session, char **args) return TRUE; } +static gboolean +process_stat_command (struct controller_session *session) +{ + char out_buf[BUFSIZ], *numbuf; + int r; + uint64_t used, total, rev; + time_t ti; + memory_pool_stat_t mem_st; + struct classifier_config *ccf; + stat_file_t *statfile; + struct statfile *st; + GList *cur_cl, *cur_st; + + memory_pool_stat (&mem_st); + r = snprintf (out_buf, sizeof (out_buf), "Messages scanned: %u" CRLF, session->worker->srv->stat->messages_scanned); + r += snprintf (out_buf + r, sizeof (out_buf) - r, "Messages learned: %u" CRLF, session->worker->srv->stat->messages_learned); + r += snprintf (out_buf + r, sizeof (out_buf) - r, "Connections count: %u" CRLF, session->worker->srv->stat->connections_count); + r += snprintf (out_buf + r, sizeof (out_buf) - r, "Control connections count: %u" CRLF, session->worker->srv->stat->control_connections_count); + r += snprintf (out_buf + r, sizeof (out_buf) - r, "Pools allocated: %ld" CRLF, (long int)mem_st.pools_allocated); + r += snprintf (out_buf + r, sizeof (out_buf) - r, "Pools freed: %ld" CRLF, (long int)mem_st.pools_freed); + r += snprintf (out_buf + r, sizeof (out_buf) - r, "Bytes allocated: %ld" CRLF, (long int)mem_st.bytes_allocated); + r += snprintf (out_buf + r, sizeof (out_buf) - r, "Memory chunks allocated: %ld" CRLF, (long int)mem_st.chunks_allocated); + r += snprintf (out_buf + r, sizeof (out_buf) - r, "Shared chunks allocated: %ld" CRLF, (long int)mem_st.shared_chunks_allocated); + r += snprintf (out_buf + r, sizeof (out_buf) - r, "Chunks freed: %ld" CRLF, (long int)mem_st.chunks_freed); + r += snprintf (out_buf + r, sizeof (out_buf) - r, "Oversized chunks: %ld" CRLF, (long int)mem_st.oversized_chunks); + /* Now write statistics for each statfile */ + cur_cl = g_list_first (session->cfg->classifiers); + while (cur_cl) { + ccf = cur_cl->data; + cur_st = g_list_first (ccf->statfiles); + while (cur_st) { + st = cur_st->data; + if ((statfile = statfile_pool_is_open (session->worker->srv->statfile_pool, st->path)) == NULL) { + statfile = statfile_pool_open (session->worker->srv->statfile_pool, st->path, st->size, FALSE); + } + if (statfile) { + used = statfile_get_used_blocks (statfile); + total = statfile_get_total_blocks (statfile); + statfile_get_revision (statfile, &rev, &ti); + if (total != (uint64_t)-1 && used != (uint64_t)-1) { + numbuf = g_format_size_for_display (st->size); + r += snprintf (out_buf + r, sizeof (out_buf) - r, + "Statfile: %s (version %lu); length: %s; free blocks: %lu; total blocks: %lu; free: %.2f%%" CRLF, + st->symbol, (long unsigned)rev, numbuf, + (long unsigned)(total - used), (long unsigned)total, + (double)((double)(total - used) / (double)total) * 100.); + g_free (numbuf); + } + } + cur_st = g_list_next (cur_st); + } + cur_cl = g_list_next (cur_cl); + } + + return rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE); +} + static void process_command (struct controller_command *cmd, char **cmd_args, struct controller_session *session) { @@ -329,7 +386,6 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control time_t uptime; unsigned long size = 0; struct classifier_config *cl; - memory_pool_stat_t mem_st; char *password = g_hash_table_lookup (session->worker->cf->params, "password"); switch (cmd->type) { @@ -369,19 +425,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control break; case COMMAND_STAT: if (check_auth (cmd, session)) { - memory_pool_stat (&mem_st); - r = snprintf (out_buf, sizeof (out_buf), "Messages scanned: %u" CRLF, session->worker->srv->stat->messages_scanned); - r += snprintf (out_buf + r, sizeof (out_buf) - r, "Messages learned: %u" CRLF, session->worker->srv->stat->messages_learned); - r += snprintf (out_buf + r, sizeof (out_buf) - r, "Connections count: %u" CRLF, session->worker->srv->stat->connections_count); - r += snprintf (out_buf + r, sizeof (out_buf) - r, "Control connections count: %u" CRLF, session->worker->srv->stat->control_connections_count); - r += snprintf (out_buf + r, sizeof (out_buf) - r, "Pools allocated: %ld" CRLF, (long int)mem_st.pools_allocated); - r += snprintf (out_buf + r, sizeof (out_buf) - r, "Pools freed: %ld" CRLF, (long int)mem_st.pools_freed); - r += snprintf (out_buf + r, sizeof (out_buf) - r, "Bytes allocated: %ld" CRLF, (long int)mem_st.bytes_allocated); - r += snprintf (out_buf + r, sizeof (out_buf) - r, "Memory chunks allocated: %ld" CRLF, (long int)mem_st.chunks_allocated); - r += snprintf (out_buf + r, sizeof (out_buf) - r, "Shared chunks allocated: %ld" CRLF, (long int)mem_st.shared_chunks_allocated); - r += snprintf (out_buf + r, sizeof (out_buf) - r, "Chunks freed: %ld" CRLF, (long int)mem_st.chunks_freed); - r += snprintf (out_buf + r, sizeof (out_buf) - r, "Oversized chunks: %ld" CRLF, (long int)mem_st.oversized_chunks); - rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE); + (void)process_stat_command (session); } break; case COMMAND_SHUTDOWN: @@ -545,6 +589,7 @@ controller_read_socket (f_str_t * in, void *arg) GList *comp_list, *cur = NULL; GTree *tokens = NULL; f_str_t c; + double sum; switch (session->state) { case STATE_COMMAND: @@ -644,13 +689,14 @@ controller_read_socket (f_str_t * in, void *arg) cls_ctx = session->learn_classifier->classifier->init_func (session->session_pool, session->learn_classifier); /* XXX: remove this awful legacy */ - session->learn_classifier->classifier->learn_func (cls_ctx, session->worker->srv->statfile_pool, statfile, tokens, session->in_class); + session->learn_classifier->classifier->learn_func (cls_ctx, session->worker->srv->statfile_pool, + statfile, tokens, session->in_class, &sum); session->worker->srv->stat->messages_learned++; maybe_write_binlog (session->learn_classifier, st, statfile, tokens); free_task (task, FALSE); - i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF); + i = snprintf (out_buf, sizeof (out_buf), "learn ok, sum weight: %.2f" CRLF, sum); if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) { return FALSE; } diff --git a/src/filter.c b/src/filter.c index 66a256708..0734bcf56 100644 --- a/src/filter.c +++ b/src/filter.c @@ -488,7 +488,7 @@ process_autolearn (struct statfile *st, struct worker_task *task, GTree * tokens return; } - classifier->learn_func (ctx, task->worker->srv->statfile_pool, statfile, tokens, TRUE); + classifier->learn_func (ctx, task->worker->srv->statfile_pool, statfile, tokens, TRUE, NULL); maybe_write_binlog (ctx->cfg, st, statfile, tokens); } } diff --git a/src/statfile.c b/src/statfile.c index 67bc677bb..a58730144 100644 --- a/src/statfile.c +++ b/src/statfile.c @@ -35,7 +35,7 @@ static void statfile_pool_set_block_common ( statfile_pool_t * pool, stat_file_t * file, uint32_t h1, uint32_t h2, - time_t t, float value, + time_t t, double value, gboolean from_now); static int @@ -209,7 +209,7 @@ statfile_pool_new (size_t max_size) bzero (new, sizeof (statfile_pool_t)); new->pool = memory_pool_new (memory_pool_get_size ()); new->max = max_size; - new->files = memory_pool_alloc_shared (new->pool, STATFILES_MAX * sizeof (stat_file_t)); + new->files = memory_pool_alloc (new->pool, STATFILES_MAX * sizeof (stat_file_t)); new->lock = memory_pool_get_mutex (new->pool); return new; @@ -405,7 +405,8 @@ statfile_pool_create (statfile_pool_t * pool, char *filename, size_t size) .version = RSPAMD_STATFILE_VERSION, .padding = {0, 0, 0}, .revision = 0, - .rev_time = 0 + .rev_time = 0, + .used_blocks = 0 }; struct stat_file_section section = { .code = STATFILE_SECTION_COMMON, @@ -422,6 +423,7 @@ statfile_pool_create (statfile_pool_t * pool, char *filename, size_t size) memory_pool_lock_mutex (pool->lock); nblocks = (size - sizeof (struct stat_file_header) - sizeof (struct stat_file_section)) / sizeof (struct stat_file_block); + header.total_blocks = nblocks; if ((fd = open (filename, O_RDWR | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR)) == -1) { msg_info ("statfile_pool_create: cannot create file %s, error %d, %s", filename, errno, strerror (errno)); @@ -513,7 +515,7 @@ statfile_pool_unlock_file (statfile_pool_t * pool, stat_file_t * file) memory_pool_unlock_mutex (file->lock); } -float +double statfile_pool_get_block (statfile_pool_t * pool, stat_file_t * file, uint32_t h1, uint32_t h2, time_t now) { struct stat_file_block *block; @@ -549,7 +551,7 @@ statfile_pool_get_block (statfile_pool_t * pool, stat_file_t * file, uint32_t h1 } static void -statfile_pool_set_block_common (statfile_pool_t * pool, stat_file_t * file, uint32_t h1, uint32_t h2, time_t t, float value, gboolean from_now) +statfile_pool_set_block_common (statfile_pool_t * pool, stat_file_t * file, uint32_t h1, uint32_t h2, time_t t, double value, gboolean from_now) { struct stat_file_block *block, *to_expire = NULL; struct stat_file_header *header; @@ -599,6 +601,8 @@ statfile_pool_set_block_common (statfile_pool_t * pool, stat_file_t * file, uint else { block->last_access = t; } + header->used_blocks ++; + return; } if (block->last_access > oldest) { @@ -629,7 +633,7 @@ statfile_pool_set_block_common (statfile_pool_t * pool, stat_file_t * file, uint } void -statfile_pool_set_block (statfile_pool_t * pool, stat_file_t * file, uint32_t h1, uint32_t h2, time_t now, float value) +statfile_pool_set_block (statfile_pool_t * pool, stat_file_t * file, uint32_t h1, uint32_t h2, time_t now, double value) { statfile_pool_set_block_common (pool, file, h1, h2, now, value, TRUE); } @@ -784,3 +788,36 @@ statfile_get_revision (stat_file_t *file, uint64_t *rev, time_t *time) return TRUE; } + +uint64_t +statfile_get_used_blocks (stat_file_t *file) +{ + struct stat_file_header *header; + + if (file == NULL || file->map == NULL) { + return (uint64_t)-1; + } + + header = (struct stat_file_header *)file->map; + + return header->used_blocks; +} + +uint64_t +statfile_get_total_blocks (stat_file_t *file) +{ + struct stat_file_header *header; + + if (file == NULL || file->map == NULL) { + return (uint64_t)-1; + } + + header = (struct stat_file_header *)file->map; + + /* If total blocks is 0 we have old version of header, so set total blocks correctly */ + if (header->total_blocks == 0) { + header->total_blocks = file->cur_section.length; + } + + return header->total_blocks; +} diff --git a/src/statfile.h b/src/statfile.h index 02b6dbcc8..43b84bdfe 100644 --- a/src/statfile.h +++ b/src/statfile.h @@ -28,7 +28,9 @@ struct stat_file_header { uint64_t create_time; /**< create time (time_t->uint64_t) */ uint64_t revision; /**< revision number */ uint64_t rev_time; /**< revision time */ - u_char unused[255]; /**< some bytes that can be used in future */ + uint64_t used_blocks; /**< used blocks number */ + uint64_t total_blocks; /**< total number of blocks */ + u_char unused[239]; /**< some bytes that can be used in future */ }; /** @@ -46,7 +48,7 @@ struct stat_file_block { uint32_t hash1; /**< hash1 (also acts as index) */ uint32_t hash2; /**< hash2 */ uint32_t last_access; /**< last access to block since create time of file */ - float value; /**< float value */ + double value; /**< double value */ }; /** @@ -152,7 +154,7 @@ void statfile_pool_unlock_file (statfile_pool_t *pool, stat_file_t *file); * @param now current time * @return block value or 0 if block is not found */ -float statfile_pool_get_block (statfile_pool_t *pool, stat_file_t *file, uint32_t h1, uint32_t h2, time_t now); +double statfile_pool_get_block (statfile_pool_t *pool, stat_file_t *file, uint32_t h1, uint32_t h2, time_t now); /** * Set specified block in statfile @@ -163,7 +165,7 @@ float statfile_pool_get_block (statfile_pool_t *pool, stat_file_t *file, uint32_ * @param now current time * @param value value of block */ -void statfile_pool_set_block (statfile_pool_t *pool, stat_file_t *file, uint32_t h1, uint32_t h2, time_t now, float value); +void statfile_pool_set_block (statfile_pool_t *pool, stat_file_t *file, uint32_t h1, uint32_t h2, time_t now, double value); /** * Check whether statfile is opened @@ -211,7 +213,6 @@ uint32_t statfile_get_section_by_name (const char *name); /** * Set statfile revision and revision time - * @param pool statfile pool object * @param filename name of statfile * @param revision number of revision * @param time time of revision @@ -221,7 +222,6 @@ gboolean statfile_set_revision (stat_file_t *file, uint64_t rev, time_t time); /** * Set statfile revision and revision time - * @param pool statfile pool object * @param filename name of statfile * @param revision saved number of revision * @param time saved time of revision @@ -229,5 +229,18 @@ gboolean statfile_set_revision (stat_file_t *file, uint64_t rev, time_t time); */ gboolean statfile_get_revision (stat_file_t *file, uint64_t *rev, time_t *time); +/** + * Get statfile used blocks + * @param file file to get number of used blocks + * @return number of used blocks or (uint64_t)-1 in case of error + */ +uint64_t statfile_get_used_blocks (stat_file_t *file); + +/** + * Get statfile total blocks + * @param file file to get number of used blocks + * @return number of used blocks or (uint64_t)-1 in case of error + */ +uint64_t statfile_get_total_blocks (stat_file_t *file); #endif diff --git a/src/util.c b/src/util.c index 2e22772c2..000aeb43e 100644 --- a/src/util.c +++ b/src/util.c @@ -1153,6 +1153,7 @@ get_statfile_by_symbol (statfile_pool_t *pool, struct classifier_config *ccf, GList *cur; if (pool == NULL || ccf == NULL || symbol == NULL) { + msg_err ("get_statfile_by_symbol: invalid input arguments"); return NULL; } @@ -1166,6 +1167,7 @@ get_statfile_by_symbol (statfile_pool_t *pool, struct classifier_config *ccf, cur = g_list_next (cur); } if (*st == NULL) { + msg_info ("get_statfile_by_symbol: cannot find statfile with symbol %s", symbol); return NULL; } @@ -1178,6 +1180,9 @@ get_statfile_by_symbol (statfile_pool_t *pool, struct classifier_config *ccf, return NULL; } res = statfile_pool_open (pool, (*st)->path, (*st)->size, FALSE); + if (res == NULL) { + msg_err ("get_statfile_by_symbol: cannot open statfile %s after creation", (*st)->path); + } } } }