summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2009-12-03 21:28:50 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2009-12-03 21:28:50 +0300
commit2f551bd813a96567bac80aeaa186b706571b69f7 (patch)
treeae3705b93cbfe24376221b060b22d60ab6cac622 /src
parent56379fcf26379a1588e3ca9ad4bdb1c7f370d2c7 (diff)
downloadrspamd-2f551bd813a96567bac80aeaa186b706571b69f7.tar.gz
rspamd-2f551bd813a96567bac80aeaa186b706571b69f7.zip
* Many major fixes to statfiles:
- fix bug with mmapping files: new addresses must NOT be allocated in shared memory by themselves - fix bug with winnow classifier that totally brokes it down - fix bug with too much grow of values * Use double precission values in statistics * Add statistics for statfiles * Add more informative data to output of LEARN command (weight of incoming message) * Add weight to output of classifier as well
Diffstat (limited to 'src')
-rw-r--r--src/classifiers/classifiers.h4
-rw-r--r--src/classifiers/winnow.c45
-rw-r--r--src/controller.c78
-rw-r--r--src/filter.c2
-rw-r--r--src/statfile.c49
-rw-r--r--src/statfile.h25
-rw-r--r--src/util.c5
7 files changed, 161 insertions, 47 deletions
diff --git a/src/classifiers/classifiers.h b/src/classifiers/classifiers.h
index cab6eff21..12787f049 100644
--- a/src/classifiers/classifiers.h
+++ b/src/classifiers/classifiers.h
@@ -20,7 +20,7 @@ struct classifier {
struct classifier_ctx* (*init_func)(memory_pool_t *pool, struct classifier_config *cf);
void (*classify_func)(struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task);
void (*learn_func)(struct classifier_ctx* ctx, statfile_pool_t *pool,
- stat_file_t *file, GTree *input, gboolean in_class);
+ stat_file_t *file, GTree *input, gboolean in_class, double *sum);
};
/* Get classifier structure by name or return NULL if this name is not found */
@@ -29,7 +29,7 @@ struct classifier* get_classifier (char *name);
/* Winnow algorithm */
struct classifier_ctx* winnow_init (memory_pool_t *pool, struct classifier_config *cf);
void winnow_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task);
-void winnow_learn (struct classifier_ctx* ctx, statfile_pool_t *pool, stat_file_t *file, GTree *input, gboolean in_class);
+void winnow_learn (struct classifier_ctx* ctx, statfile_pool_t *pool, stat_file_t *file, GTree *input, gboolean in_class, double *sum);
/* Array of all defined classifiers */
extern struct classifier classifiers[];
diff --git a/src/classifiers/winnow.c b/src/classifiers/winnow.c
index 8f2b6a6fb..7e1144ae7 100644
--- a/src/classifiers/winnow.c
+++ b/src/classifiers/winnow.c
@@ -49,13 +49,11 @@ classify_callback (gpointer key, gpointer value, gpointer data)
{
token_node_t *node = key;
struct winnow_callback_data *cd = data;
- float v;
+ double v;
/* Consider that not found blocks have value 1 */
- if ((v = statfile_pool_get_block (cd->pool, cd->file, node->h1, node->h2, cd->now)) < 0.00001) {
- cd->sum += 1;
- }
- else {
+ v = statfile_pool_get_block (cd->pool, cd->file, node->h1, node->h2, cd->now);
+ if (fabs (v) > 0.00001) {
cd->sum += v;
cd->in_class++;
}
@@ -70,12 +68,13 @@ learn_callback (gpointer key, gpointer value, gpointer data)
{
token_node_t *node = key;
struct winnow_callback_data *cd = data;
- float v, c;
+ double v, c;
c = (cd->in_class) ? WINNOW_PROMOTION : WINNOW_DEMOTION;
/* Consider that not found blocks have value 1 */
- if ((v = statfile_pool_get_block (cd->pool, cd->file, node->h1, node->h2, cd->now)) < 0.00001) {
+ v = statfile_pool_get_block (cd->pool, cd->file, node->h1, node->h2, cd->now);
+ if (fabs (v) < 0.00001) {
statfile_pool_set_block (cd->pool, cd->file, node->h1, node->h2, cd->now, c);
node->value = c;
}
@@ -83,7 +82,8 @@ learn_callback (gpointer key, gpointer value, gpointer data)
statfile_pool_set_block (cd->pool, cd->file, node->h1, node->h2, cd->now, v * c);
node->value = v * c;
}
-
+
+ cd->sum += node->value;
cd->count++;
return FALSE;
@@ -104,8 +104,8 @@ void
winnow_classify (struct classifier_ctx *ctx, statfile_pool_t * pool, GTree * input, struct worker_task *task)
{
struct winnow_callback_data data;
- double *res = memory_pool_alloc (ctx->pool, sizeof (double));
- double max = 0;
+ char *sumbuf;
+ double res = 0., max = 0.;
GList *cur;
struct statfile *st, *sel = NULL;
@@ -136,25 +136,28 @@ winnow_classify (struct classifier_ctx *ctx, statfile_pool_t * pool, GTree * inp
}
if (data.count != 0) {
- *res = (data.sum / data.count);
+ res = data.sum / data.count;
}
else {
- *res = 0;
+ res = 0;
}
- if (*res > max) {
- max = *res;
+ if (res > max) {
+ max = res;
sel = st;
}
cur = g_list_next (cur);
}
if (sel != NULL) {
- insert_result (task, ctx->cfg->metric, sel->symbol, 1, NULL);
+ sumbuf = memory_pool_alloc (task->task_pool, 32);
+ snprintf (sumbuf, 32, "%.2f", max);
+ cur = g_list_prepend (NULL, sumbuf);
+ insert_result (task, ctx->cfg->metric, sel->symbol, 1, cur);
}
}
void
-winnow_learn (struct classifier_ctx *ctx, statfile_pool_t *pool, stat_file_t *file, GTree * input, int in_class)
+winnow_learn (struct classifier_ctx *ctx, statfile_pool_t *pool, stat_file_t *file, GTree * input, int in_class, double *sum)
{
struct winnow_callback_data data = {
.file = NULL,
@@ -172,9 +175,19 @@ winnow_learn (struct classifier_ctx *ctx, statfile_pool_t *pool, stat_file_t *fi
data.file = file;
+
if (data.file != NULL) {
statfile_pool_lock_file (pool, data.file);
g_tree_foreach (input, learn_callback, &data);
statfile_pool_unlock_file (pool, data.file);
}
+
+ if (sum) {
+ if (data.count != 0) {
+ *sum = data.sum / data.count;
+ }
+ else {
+ *sum = 0;
+ }
+ }
}
diff --git a/src/controller.c b/src/controller.c
index 6df3a220c..6176fd0e6 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -321,6 +321,63 @@ process_sync_command (struct controller_session *session, char **args)
return TRUE;
}
+static gboolean
+process_stat_command (struct controller_session *session)
+{
+ char out_buf[BUFSIZ], *numbuf;
+ int r;
+ uint64_t used, total, rev;
+ time_t ti;
+ memory_pool_stat_t mem_st;
+ struct classifier_config *ccf;
+ stat_file_t *statfile;
+ struct statfile *st;
+ GList *cur_cl, *cur_st;
+
+ memory_pool_stat (&mem_st);
+ r = snprintf (out_buf, sizeof (out_buf), "Messages scanned: %u" CRLF, session->worker->srv->stat->messages_scanned);
+ r += snprintf (out_buf + r, sizeof (out_buf) - r, "Messages learned: %u" CRLF, session->worker->srv->stat->messages_learned);
+ r += snprintf (out_buf + r, sizeof (out_buf) - r, "Connections count: %u" CRLF, session->worker->srv->stat->connections_count);
+ r += snprintf (out_buf + r, sizeof (out_buf) - r, "Control connections count: %u" CRLF, session->worker->srv->stat->control_connections_count);
+ r += snprintf (out_buf + r, sizeof (out_buf) - r, "Pools allocated: %ld" CRLF, (long int)mem_st.pools_allocated);
+ r += snprintf (out_buf + r, sizeof (out_buf) - r, "Pools freed: %ld" CRLF, (long int)mem_st.pools_freed);
+ r += snprintf (out_buf + r, sizeof (out_buf) - r, "Bytes allocated: %ld" CRLF, (long int)mem_st.bytes_allocated);
+ r += snprintf (out_buf + r, sizeof (out_buf) - r, "Memory chunks allocated: %ld" CRLF, (long int)mem_st.chunks_allocated);
+ r += snprintf (out_buf + r, sizeof (out_buf) - r, "Shared chunks allocated: %ld" CRLF, (long int)mem_st.shared_chunks_allocated);
+ r += snprintf (out_buf + r, sizeof (out_buf) - r, "Chunks freed: %ld" CRLF, (long int)mem_st.chunks_freed);
+ r += snprintf (out_buf + r, sizeof (out_buf) - r, "Oversized chunks: %ld" CRLF, (long int)mem_st.oversized_chunks);
+ /* Now write statistics for each statfile */
+ cur_cl = g_list_first (session->cfg->classifiers);
+ while (cur_cl) {
+ ccf = cur_cl->data;
+ cur_st = g_list_first (ccf->statfiles);
+ while (cur_st) {
+ st = cur_st->data;
+ if ((statfile = statfile_pool_is_open (session->worker->srv->statfile_pool, st->path)) == NULL) {
+ statfile = statfile_pool_open (session->worker->srv->statfile_pool, st->path, st->size, FALSE);
+ }
+ if (statfile) {
+ used = statfile_get_used_blocks (statfile);
+ total = statfile_get_total_blocks (statfile);
+ statfile_get_revision (statfile, &rev, &ti);
+ if (total != (uint64_t)-1 && used != (uint64_t)-1) {
+ numbuf = g_format_size_for_display (st->size);
+ r += snprintf (out_buf + r, sizeof (out_buf) - r,
+ "Statfile: %s (version %lu); length: %s; free blocks: %lu; total blocks: %lu; free: %.2f%%" CRLF,
+ st->symbol, (long unsigned)rev, numbuf,
+ (long unsigned)(total - used), (long unsigned)total,
+ (double)((double)(total - used) / (double)total) * 100.);
+ g_free (numbuf);
+ }
+ }
+ cur_st = g_list_next (cur_st);
+ }
+ cur_cl = g_list_next (cur_cl);
+ }
+
+ return rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
+}
+
static void
process_command (struct controller_command *cmd, char **cmd_args, struct controller_session *session)
{
@@ -329,7 +386,6 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
time_t uptime;
unsigned long size = 0;
struct classifier_config *cl;
- memory_pool_stat_t mem_st;
char *password = g_hash_table_lookup (session->worker->cf->params, "password");
switch (cmd->type) {
@@ -369,19 +425,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
break;
case COMMAND_STAT:
if (check_auth (cmd, session)) {
- memory_pool_stat (&mem_st);
- r = snprintf (out_buf, sizeof (out_buf), "Messages scanned: %u" CRLF, session->worker->srv->stat->messages_scanned);
- r += snprintf (out_buf + r, sizeof (out_buf) - r, "Messages learned: %u" CRLF, session->worker->srv->stat->messages_learned);
- r += snprintf (out_buf + r, sizeof (out_buf) - r, "Connections count: %u" CRLF, session->worker->srv->stat->connections_count);
- r += snprintf (out_buf + r, sizeof (out_buf) - r, "Control connections count: %u" CRLF, session->worker->srv->stat->control_connections_count);
- r += snprintf (out_buf + r, sizeof (out_buf) - r, "Pools allocated: %ld" CRLF, (long int)mem_st.pools_allocated);
- r += snprintf (out_buf + r, sizeof (out_buf) - r, "Pools freed: %ld" CRLF, (long int)mem_st.pools_freed);
- r += snprintf (out_buf + r, sizeof (out_buf) - r, "Bytes allocated: %ld" CRLF, (long int)mem_st.bytes_allocated);
- r += snprintf (out_buf + r, sizeof (out_buf) - r, "Memory chunks allocated: %ld" CRLF, (long int)mem_st.chunks_allocated);
- r += snprintf (out_buf + r, sizeof (out_buf) - r, "Shared chunks allocated: %ld" CRLF, (long int)mem_st.shared_chunks_allocated);
- r += snprintf (out_buf + r, sizeof (out_buf) - r, "Chunks freed: %ld" CRLF, (long int)mem_st.chunks_freed);
- r += snprintf (out_buf + r, sizeof (out_buf) - r, "Oversized chunks: %ld" CRLF, (long int)mem_st.oversized_chunks);
- rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
+ (void)process_stat_command (session);
}
break;
case COMMAND_SHUTDOWN:
@@ -545,6 +589,7 @@ controller_read_socket (f_str_t * in, void *arg)
GList *comp_list, *cur = NULL;
GTree *tokens = NULL;
f_str_t c;
+ double sum;
switch (session->state) {
case STATE_COMMAND:
@@ -644,13 +689,14 @@ controller_read_socket (f_str_t * in, void *arg)
cls_ctx = session->learn_classifier->classifier->init_func (session->session_pool, session->learn_classifier);
/* XXX: remove this awful legacy */
- session->learn_classifier->classifier->learn_func (cls_ctx, session->worker->srv->statfile_pool, statfile, tokens, session->in_class);
+ session->learn_classifier->classifier->learn_func (cls_ctx, session->worker->srv->statfile_pool,
+ statfile, tokens, session->in_class, &sum);
session->worker->srv->stat->messages_learned++;
maybe_write_binlog (session->learn_classifier, st, statfile, tokens);
free_task (task, FALSE);
- i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF);
+ i = snprintf (out_buf, sizeof (out_buf), "learn ok, sum weight: %.2f" CRLF, sum);
if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
return FALSE;
}
diff --git a/src/filter.c b/src/filter.c
index 66a256708..0734bcf56 100644
--- a/src/filter.c
+++ b/src/filter.c
@@ -488,7 +488,7 @@ process_autolearn (struct statfile *st, struct worker_task *task, GTree * tokens
return;
}
- classifier->learn_func (ctx, task->worker->srv->statfile_pool, statfile, tokens, TRUE);
+ classifier->learn_func (ctx, task->worker->srv->statfile_pool, statfile, tokens, TRUE, NULL);
maybe_write_binlog (ctx->cfg, st, statfile, tokens);
}
}
diff --git a/src/statfile.c b/src/statfile.c
index 67bc677bb..a58730144 100644
--- a/src/statfile.c
+++ b/src/statfile.c
@@ -35,7 +35,7 @@
static void statfile_pool_set_block_common (
statfile_pool_t * pool, stat_file_t * file,
uint32_t h1, uint32_t h2,
- time_t t, float value,
+ time_t t, double value,
gboolean from_now);
static int
@@ -209,7 +209,7 @@ statfile_pool_new (size_t max_size)
bzero (new, sizeof (statfile_pool_t));
new->pool = memory_pool_new (memory_pool_get_size ());
new->max = max_size;
- new->files = memory_pool_alloc_shared (new->pool, STATFILES_MAX * sizeof (stat_file_t));
+ new->files = memory_pool_alloc (new->pool, STATFILES_MAX * sizeof (stat_file_t));
new->lock = memory_pool_get_mutex (new->pool);
return new;
@@ -405,7 +405,8 @@ statfile_pool_create (statfile_pool_t * pool, char *filename, size_t size)
.version = RSPAMD_STATFILE_VERSION,
.padding = {0, 0, 0},
.revision = 0,
- .rev_time = 0
+ .rev_time = 0,
+ .used_blocks = 0
};
struct stat_file_section section = {
.code = STATFILE_SECTION_COMMON,
@@ -422,6 +423,7 @@ statfile_pool_create (statfile_pool_t * pool, char *filename, size_t size)
memory_pool_lock_mutex (pool->lock);
nblocks = (size - sizeof (struct stat_file_header) - sizeof (struct stat_file_section)) / sizeof (struct stat_file_block);
+ header.total_blocks = nblocks;
if ((fd = open (filename, O_RDWR | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR)) == -1) {
msg_info ("statfile_pool_create: cannot create file %s, error %d, %s", filename, errno, strerror (errno));
@@ -513,7 +515,7 @@ statfile_pool_unlock_file (statfile_pool_t * pool, stat_file_t * file)
memory_pool_unlock_mutex (file->lock);
}
-float
+double
statfile_pool_get_block (statfile_pool_t * pool, stat_file_t * file, uint32_t h1, uint32_t h2, time_t now)
{
struct stat_file_block *block;
@@ -549,7 +551,7 @@ statfile_pool_get_block (statfile_pool_t * pool, stat_file_t * file, uint32_t h1
}
static void
-statfile_pool_set_block_common (statfile_pool_t * pool, stat_file_t * file, uint32_t h1, uint32_t h2, time_t t, float value, gboolean from_now)
+statfile_pool_set_block_common (statfile_pool_t * pool, stat_file_t * file, uint32_t h1, uint32_t h2, time_t t, double value, gboolean from_now)
{
struct stat_file_block *block, *to_expire = NULL;
struct stat_file_header *header;
@@ -599,6 +601,8 @@ statfile_pool_set_block_common (statfile_pool_t * pool, stat_file_t * file, uint
else {
block->last_access = t;
}
+ header->used_blocks ++;
+
return;
}
if (block->last_access > oldest) {
@@ -629,7 +633,7 @@ statfile_pool_set_block_common (statfile_pool_t * pool, stat_file_t * file, uint
}
void
-statfile_pool_set_block (statfile_pool_t * pool, stat_file_t * file, uint32_t h1, uint32_t h2, time_t now, float value)
+statfile_pool_set_block (statfile_pool_t * pool, stat_file_t * file, uint32_t h1, uint32_t h2, time_t now, double value)
{
statfile_pool_set_block_common (pool, file, h1, h2, now, value, TRUE);
}
@@ -784,3 +788,36 @@ statfile_get_revision (stat_file_t *file, uint64_t *rev, time_t *time)
return TRUE;
}
+
+uint64_t
+statfile_get_used_blocks (stat_file_t *file)
+{
+ struct stat_file_header *header;
+
+ if (file == NULL || file->map == NULL) {
+ return (uint64_t)-1;
+ }
+
+ header = (struct stat_file_header *)file->map;
+
+ return header->used_blocks;
+}
+
+uint64_t
+statfile_get_total_blocks (stat_file_t *file)
+{
+ struct stat_file_header *header;
+
+ if (file == NULL || file->map == NULL) {
+ return (uint64_t)-1;
+ }
+
+ header = (struct stat_file_header *)file->map;
+
+ /* If total blocks is 0 we have old version of header, so set total blocks correctly */
+ if (header->total_blocks == 0) {
+ header->total_blocks = file->cur_section.length;
+ }
+
+ return header->total_blocks;
+}
diff --git a/src/statfile.h b/src/statfile.h
index 02b6dbcc8..43b84bdfe 100644
--- a/src/statfile.h
+++ b/src/statfile.h
@@ -28,7 +28,9 @@ struct stat_file_header {
uint64_t create_time; /**< create time (time_t->uint64_t) */
uint64_t revision; /**< revision number */
uint64_t rev_time; /**< revision time */
- u_char unused[255]; /**< some bytes that can be used in future */
+ uint64_t used_blocks; /**< used blocks number */
+ uint64_t total_blocks; /**< total number of blocks */
+ u_char unused[239]; /**< some bytes that can be used in future */
};
/**
@@ -46,7 +48,7 @@ struct stat_file_block {
uint32_t hash1; /**< hash1 (also acts as index) */
uint32_t hash2; /**< hash2 */
uint32_t last_access; /**< last access to block since create time of file */
- float value; /**< float value */
+ double value; /**< double value */
};
/**
@@ -152,7 +154,7 @@ void statfile_pool_unlock_file (statfile_pool_t *pool, stat_file_t *file);
* @param now current time
* @return block value or 0 if block is not found
*/
-float statfile_pool_get_block (statfile_pool_t *pool, stat_file_t *file, uint32_t h1, uint32_t h2, time_t now);
+double statfile_pool_get_block (statfile_pool_t *pool, stat_file_t *file, uint32_t h1, uint32_t h2, time_t now);
/**
* Set specified block in statfile
@@ -163,7 +165,7 @@ float statfile_pool_get_block (statfile_pool_t *pool, stat_file_t *file, uint32_
* @param now current time
* @param value value of block
*/
-void statfile_pool_set_block (statfile_pool_t *pool, stat_file_t *file, uint32_t h1, uint32_t h2, time_t now, float value);
+void statfile_pool_set_block (statfile_pool_t *pool, stat_file_t *file, uint32_t h1, uint32_t h2, time_t now, double value);
/**
* Check whether statfile is opened
@@ -211,7 +213,6 @@ uint32_t statfile_get_section_by_name (const char *name);
/**
* Set statfile revision and revision time
- * @param pool statfile pool object
* @param filename name of statfile
* @param revision number of revision
* @param time time of revision
@@ -221,7 +222,6 @@ gboolean statfile_set_revision (stat_file_t *file, uint64_t rev, time_t time);
/**
* Set statfile revision and revision time
- * @param pool statfile pool object
* @param filename name of statfile
* @param revision saved number of revision
* @param time saved time of revision
@@ -229,5 +229,18 @@ gboolean statfile_set_revision (stat_file_t *file, uint64_t rev, time_t time);
*/
gboolean statfile_get_revision (stat_file_t *file, uint64_t *rev, time_t *time);
+/**
+ * Get statfile used blocks
+ * @param file file to get number of used blocks
+ * @return number of used blocks or (uint64_t)-1 in case of error
+ */
+uint64_t statfile_get_used_blocks (stat_file_t *file);
+
+/**
+ * Get statfile total blocks
+ * @param file file to get number of used blocks
+ * @return number of used blocks or (uint64_t)-1 in case of error
+ */
+uint64_t statfile_get_total_blocks (stat_file_t *file);
#endif
diff --git a/src/util.c b/src/util.c
index 2e22772c2..000aeb43e 100644
--- a/src/util.c
+++ b/src/util.c
@@ -1153,6 +1153,7 @@ get_statfile_by_symbol (statfile_pool_t *pool, struct classifier_config *ccf,
GList *cur;
if (pool == NULL || ccf == NULL || symbol == NULL) {
+ msg_err ("get_statfile_by_symbol: invalid input arguments");
return NULL;
}
@@ -1166,6 +1167,7 @@ get_statfile_by_symbol (statfile_pool_t *pool, struct classifier_config *ccf,
cur = g_list_next (cur);
}
if (*st == NULL) {
+ msg_info ("get_statfile_by_symbol: cannot find statfile with symbol %s", symbol);
return NULL;
}
@@ -1178,6 +1180,9 @@ get_statfile_by_symbol (statfile_pool_t *pool, struct classifier_config *ccf,
return NULL;
}
res = statfile_pool_open (pool, (*st)->path, (*st)->size, FALSE);
+ if (res == NULL) {
+ msg_err ("get_statfile_by_symbol: cannot open statfile %s after creation", (*st)->path);
+ }
}
}
}