- fix bug with mmapping files: new addresses must NOT be allocated in shared memory by themselves
- fix bug with winnow classifier that totally brokes it down
- fix bug with too much grow of values
* Use double precission values in statistics
* Add statistics for statfiles
* Add more informative data to output of LEARN command (weight of incoming message)
* Add weight to output of classifier as well
syswrite $sock, "learn $cfg{'statfile'} $len" . $CRLF;
syswrite $sock, $input . $CRLF;
if (defined (my $reply = <$sock>)) {
- if ($reply =~ /^learn ok/) {
- print "Learn succeed\n";
+ if ($reply =~ /^learn ok, sum weight: ([0-9.]+)/) {
+ print "Learn succeed. Sum weight: $1\n";
}
else {
print "Learn failed\n";
struct classifier_ctx* (*init_func)(memory_pool_t *pool, struct classifier_config *cf);
void (*classify_func)(struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task);
void (*learn_func)(struct classifier_ctx* ctx, statfile_pool_t *pool,
- stat_file_t *file, GTree *input, gboolean in_class);
+ stat_file_t *file, GTree *input, gboolean in_class, double *sum);
};
/* Get classifier structure by name or return NULL if this name is not found */
/* Winnow algorithm */
struct classifier_ctx* winnow_init (memory_pool_t *pool, struct classifier_config *cf);
void winnow_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task);
-void winnow_learn (struct classifier_ctx* ctx, statfile_pool_t *pool, stat_file_t *file, GTree *input, gboolean in_class);
+void winnow_learn (struct classifier_ctx* ctx, statfile_pool_t *pool, stat_file_t *file, GTree *input, gboolean in_class, double *sum);
/* Array of all defined classifiers */
extern struct classifier classifiers[];
{
token_node_t *node = key;
struct winnow_callback_data *cd = data;
- float v;
+ double v;
/* Consider that not found blocks have value 1 */
- if ((v = statfile_pool_get_block (cd->pool, cd->file, node->h1, node->h2, cd->now)) < 0.00001) {
- cd->sum += 1;
- }
- else {
+ v = statfile_pool_get_block (cd->pool, cd->file, node->h1, node->h2, cd->now);
+ if (fabs (v) > 0.00001) {
cd->sum += v;
cd->in_class++;
}
{
token_node_t *node = key;
struct winnow_callback_data *cd = data;
- float v, c;
+ double v, c;
c = (cd->in_class) ? WINNOW_PROMOTION : WINNOW_DEMOTION;
/* Consider that not found blocks have value 1 */
- if ((v = statfile_pool_get_block (cd->pool, cd->file, node->h1, node->h2, cd->now)) < 0.00001) {
+ v = statfile_pool_get_block (cd->pool, cd->file, node->h1, node->h2, cd->now);
+ if (fabs (v) < 0.00001) {
statfile_pool_set_block (cd->pool, cd->file, node->h1, node->h2, cd->now, c);
node->value = c;
}
statfile_pool_set_block (cd->pool, cd->file, node->h1, node->h2, cd->now, v * c);
node->value = v * c;
}
-
+
+ cd->sum += node->value;
cd->count++;
return FALSE;
winnow_classify (struct classifier_ctx *ctx, statfile_pool_t * pool, GTree * input, struct worker_task *task)
{
struct winnow_callback_data data;
- double *res = memory_pool_alloc (ctx->pool, sizeof (double));
- double max = 0;
+ char *sumbuf;
+ double res = 0., max = 0.;
GList *cur;
struct statfile *st, *sel = NULL;
}
if (data.count != 0) {
- *res = (data.sum / data.count);
+ res = data.sum / data.count;
}
else {
- *res = 0;
+ res = 0;
}
- if (*res > max) {
- max = *res;
+ if (res > max) {
+ max = res;
sel = st;
}
cur = g_list_next (cur);
}
if (sel != NULL) {
- insert_result (task, ctx->cfg->metric, sel->symbol, 1, NULL);
+ sumbuf = memory_pool_alloc (task->task_pool, 32);
+ snprintf (sumbuf, 32, "%.2f", max);
+ cur = g_list_prepend (NULL, sumbuf);
+ insert_result (task, ctx->cfg->metric, sel->symbol, 1, cur);
}
}
void
-winnow_learn (struct classifier_ctx *ctx, statfile_pool_t *pool, stat_file_t *file, GTree * input, int in_class)
+winnow_learn (struct classifier_ctx *ctx, statfile_pool_t *pool, stat_file_t *file, GTree * input, int in_class, double *sum)
{
struct winnow_callback_data data = {
.file = NULL,
data.file = file;
+
if (data.file != NULL) {
statfile_pool_lock_file (pool, data.file);
g_tree_foreach (input, learn_callback, &data);
statfile_pool_unlock_file (pool, data.file);
}
+
+ if (sum) {
+ if (data.count != 0) {
+ *sum = data.sum / data.count;
+ }
+ else {
+ *sum = 0;
+ }
+ }
}
return TRUE;
}
+static gboolean
+process_stat_command (struct controller_session *session)
+{
+ char out_buf[BUFSIZ], *numbuf;
+ int r;
+ uint64_t used, total, rev;
+ time_t ti;
+ memory_pool_stat_t mem_st;
+ struct classifier_config *ccf;
+ stat_file_t *statfile;
+ struct statfile *st;
+ GList *cur_cl, *cur_st;
+
+ memory_pool_stat (&mem_st);
+ r = snprintf (out_buf, sizeof (out_buf), "Messages scanned: %u" CRLF, session->worker->srv->stat->messages_scanned);
+ r += snprintf (out_buf + r, sizeof (out_buf) - r, "Messages learned: %u" CRLF, session->worker->srv->stat->messages_learned);
+ r += snprintf (out_buf + r, sizeof (out_buf) - r, "Connections count: %u" CRLF, session->worker->srv->stat->connections_count);
+ r += snprintf (out_buf + r, sizeof (out_buf) - r, "Control connections count: %u" CRLF, session->worker->srv->stat->control_connections_count);
+ r += snprintf (out_buf + r, sizeof (out_buf) - r, "Pools allocated: %ld" CRLF, (long int)mem_st.pools_allocated);
+ r += snprintf (out_buf + r, sizeof (out_buf) - r, "Pools freed: %ld" CRLF, (long int)mem_st.pools_freed);
+ r += snprintf (out_buf + r, sizeof (out_buf) - r, "Bytes allocated: %ld" CRLF, (long int)mem_st.bytes_allocated);
+ r += snprintf (out_buf + r, sizeof (out_buf) - r, "Memory chunks allocated: %ld" CRLF, (long int)mem_st.chunks_allocated);
+ r += snprintf (out_buf + r, sizeof (out_buf) - r, "Shared chunks allocated: %ld" CRLF, (long int)mem_st.shared_chunks_allocated);
+ r += snprintf (out_buf + r, sizeof (out_buf) - r, "Chunks freed: %ld" CRLF, (long int)mem_st.chunks_freed);
+ r += snprintf (out_buf + r, sizeof (out_buf) - r, "Oversized chunks: %ld" CRLF, (long int)mem_st.oversized_chunks);
+ /* Now write statistics for each statfile */
+ cur_cl = g_list_first (session->cfg->classifiers);
+ while (cur_cl) {
+ ccf = cur_cl->data;
+ cur_st = g_list_first (ccf->statfiles);
+ while (cur_st) {
+ st = cur_st->data;
+ if ((statfile = statfile_pool_is_open (session->worker->srv->statfile_pool, st->path)) == NULL) {
+ statfile = statfile_pool_open (session->worker->srv->statfile_pool, st->path, st->size, FALSE);
+ }
+ if (statfile) {
+ used = statfile_get_used_blocks (statfile);
+ total = statfile_get_total_blocks (statfile);
+ statfile_get_revision (statfile, &rev, &ti);
+ if (total != (uint64_t)-1 && used != (uint64_t)-1) {
+ numbuf = g_format_size_for_display (st->size);
+ r += snprintf (out_buf + r, sizeof (out_buf) - r,
+ "Statfile: %s (version %lu); length: %s; free blocks: %lu; total blocks: %lu; free: %.2f%%" CRLF,
+ st->symbol, (long unsigned)rev, numbuf,
+ (long unsigned)(total - used), (long unsigned)total,
+ (double)((double)(total - used) / (double)total) * 100.);
+ g_free (numbuf);
+ }
+ }
+ cur_st = g_list_next (cur_st);
+ }
+ cur_cl = g_list_next (cur_cl);
+ }
+
+ return rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
+}
+
static void
process_command (struct controller_command *cmd, char **cmd_args, struct controller_session *session)
{
time_t uptime;
unsigned long size = 0;
struct classifier_config *cl;
- memory_pool_stat_t mem_st;
char *password = g_hash_table_lookup (session->worker->cf->params, "password");
switch (cmd->type) {
break;
case COMMAND_STAT:
if (check_auth (cmd, session)) {
- memory_pool_stat (&mem_st);
- r = snprintf (out_buf, sizeof (out_buf), "Messages scanned: %u" CRLF, session->worker->srv->stat->messages_scanned);
- r += snprintf (out_buf + r, sizeof (out_buf) - r, "Messages learned: %u" CRLF, session->worker->srv->stat->messages_learned);
- r += snprintf (out_buf + r, sizeof (out_buf) - r, "Connections count: %u" CRLF, session->worker->srv->stat->connections_count);
- r += snprintf (out_buf + r, sizeof (out_buf) - r, "Control connections count: %u" CRLF, session->worker->srv->stat->control_connections_count);
- r += snprintf (out_buf + r, sizeof (out_buf) - r, "Pools allocated: %ld" CRLF, (long int)mem_st.pools_allocated);
- r += snprintf (out_buf + r, sizeof (out_buf) - r, "Pools freed: %ld" CRLF, (long int)mem_st.pools_freed);
- r += snprintf (out_buf + r, sizeof (out_buf) - r, "Bytes allocated: %ld" CRLF, (long int)mem_st.bytes_allocated);
- r += snprintf (out_buf + r, sizeof (out_buf) - r, "Memory chunks allocated: %ld" CRLF, (long int)mem_st.chunks_allocated);
- r += snprintf (out_buf + r, sizeof (out_buf) - r, "Shared chunks allocated: %ld" CRLF, (long int)mem_st.shared_chunks_allocated);
- r += snprintf (out_buf + r, sizeof (out_buf) - r, "Chunks freed: %ld" CRLF, (long int)mem_st.chunks_freed);
- r += snprintf (out_buf + r, sizeof (out_buf) - r, "Oversized chunks: %ld" CRLF, (long int)mem_st.oversized_chunks);
- rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
+ (void)process_stat_command (session);
}
break;
case COMMAND_SHUTDOWN:
GList *comp_list, *cur = NULL;
GTree *tokens = NULL;
f_str_t c;
+ double sum;
switch (session->state) {
case STATE_COMMAND:
cls_ctx = session->learn_classifier->classifier->init_func (session->session_pool, session->learn_classifier);
/* XXX: remove this awful legacy */
- session->learn_classifier->classifier->learn_func (cls_ctx, session->worker->srv->statfile_pool, statfile, tokens, session->in_class);
+ session->learn_classifier->classifier->learn_func (cls_ctx, session->worker->srv->statfile_pool,
+ statfile, tokens, session->in_class, &sum);
session->worker->srv->stat->messages_learned++;
maybe_write_binlog (session->learn_classifier, st, statfile, tokens);
free_task (task, FALSE);
- i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF);
+ i = snprintf (out_buf, sizeof (out_buf), "learn ok, sum weight: %.2f" CRLF, sum);
if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
return FALSE;
}
return;
}
- classifier->learn_func (ctx, task->worker->srv->statfile_pool, statfile, tokens, TRUE);
+ classifier->learn_func (ctx, task->worker->srv->statfile_pool, statfile, tokens, TRUE, NULL);
maybe_write_binlog (ctx->cfg, st, statfile, tokens);
}
}
static void statfile_pool_set_block_common (
statfile_pool_t * pool, stat_file_t * file,
uint32_t h1, uint32_t h2,
- time_t t, float value,
+ time_t t, double value,
gboolean from_now);
static int
bzero (new, sizeof (statfile_pool_t));
new->pool = memory_pool_new (memory_pool_get_size ());
new->max = max_size;
- new->files = memory_pool_alloc_shared (new->pool, STATFILES_MAX * sizeof (stat_file_t));
+ new->files = memory_pool_alloc (new->pool, STATFILES_MAX * sizeof (stat_file_t));
new->lock = memory_pool_get_mutex (new->pool);
return new;
.version = RSPAMD_STATFILE_VERSION,
.padding = {0, 0, 0},
.revision = 0,
- .rev_time = 0
+ .rev_time = 0,
+ .used_blocks = 0
};
struct stat_file_section section = {
.code = STATFILE_SECTION_COMMON,
memory_pool_lock_mutex (pool->lock);
nblocks = (size - sizeof (struct stat_file_header) - sizeof (struct stat_file_section)) / sizeof (struct stat_file_block);
+ header.total_blocks = nblocks;
if ((fd = open (filename, O_RDWR | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR)) == -1) {
msg_info ("statfile_pool_create: cannot create file %s, error %d, %s", filename, errno, strerror (errno));
memory_pool_unlock_mutex (file->lock);
}
-float
+double
statfile_pool_get_block (statfile_pool_t * pool, stat_file_t * file, uint32_t h1, uint32_t h2, time_t now)
{
struct stat_file_block *block;
}
static void
-statfile_pool_set_block_common (statfile_pool_t * pool, stat_file_t * file, uint32_t h1, uint32_t h2, time_t t, float value, gboolean from_now)
+statfile_pool_set_block_common (statfile_pool_t * pool, stat_file_t * file, uint32_t h1, uint32_t h2, time_t t, double value, gboolean from_now)
{
struct stat_file_block *block, *to_expire = NULL;
struct stat_file_header *header;
else {
block->last_access = t;
}
+ header->used_blocks ++;
+
return;
}
if (block->last_access > oldest) {
}
void
-statfile_pool_set_block (statfile_pool_t * pool, stat_file_t * file, uint32_t h1, uint32_t h2, time_t now, float value)
+statfile_pool_set_block (statfile_pool_t * pool, stat_file_t * file, uint32_t h1, uint32_t h2, time_t now, double value)
{
statfile_pool_set_block_common (pool, file, h1, h2, now, value, TRUE);
}
return TRUE;
}
+
+uint64_t
+statfile_get_used_blocks (stat_file_t *file)
+{
+ struct stat_file_header *header;
+
+ if (file == NULL || file->map == NULL) {
+ return (uint64_t)-1;
+ }
+
+ header = (struct stat_file_header *)file->map;
+
+ return header->used_blocks;
+}
+
+uint64_t
+statfile_get_total_blocks (stat_file_t *file)
+{
+ struct stat_file_header *header;
+
+ if (file == NULL || file->map == NULL) {
+ return (uint64_t)-1;
+ }
+
+ header = (struct stat_file_header *)file->map;
+
+ /* If total blocks is 0 we have old version of header, so set total blocks correctly */
+ if (header->total_blocks == 0) {
+ header->total_blocks = file->cur_section.length;
+ }
+
+ return header->total_blocks;
+}
uint64_t create_time; /**< create time (time_t->uint64_t) */
uint64_t revision; /**< revision number */
uint64_t rev_time; /**< revision time */
- u_char unused[255]; /**< some bytes that can be used in future */
+ uint64_t used_blocks; /**< used blocks number */
+ uint64_t total_blocks; /**< total number of blocks */
+ u_char unused[239]; /**< some bytes that can be used in future */
};
/**
uint32_t hash1; /**< hash1 (also acts as index) */
uint32_t hash2; /**< hash2 */
uint32_t last_access; /**< last access to block since create time of file */
- float value; /**< float value */
+ double value; /**< double value */
};
/**
* @param now current time
* @return block value or 0 if block is not found
*/
-float statfile_pool_get_block (statfile_pool_t *pool, stat_file_t *file, uint32_t h1, uint32_t h2, time_t now);
+double statfile_pool_get_block (statfile_pool_t *pool, stat_file_t *file, uint32_t h1, uint32_t h2, time_t now);
/**
* Set specified block in statfile
* @param now current time
* @param value value of block
*/
-void statfile_pool_set_block (statfile_pool_t *pool, stat_file_t *file, uint32_t h1, uint32_t h2, time_t now, float value);
+void statfile_pool_set_block (statfile_pool_t *pool, stat_file_t *file, uint32_t h1, uint32_t h2, time_t now, double value);
/**
* Check whether statfile is opened
/**
* Set statfile revision and revision time
- * @param pool statfile pool object
* @param filename name of statfile
* @param revision number of revision
* @param time time of revision
/**
* Set statfile revision and revision time
- * @param pool statfile pool object
* @param filename name of statfile
* @param revision saved number of revision
* @param time saved time of revision
*/
gboolean statfile_get_revision (stat_file_t *file, uint64_t *rev, time_t *time);
+/**
+ * Get statfile used blocks
+ * @param file file to get number of used blocks
+ * @return number of used blocks or (uint64_t)-1 in case of error
+ */
+uint64_t statfile_get_used_blocks (stat_file_t *file);
+
+/**
+ * Get statfile total blocks
+ * @param file file to get number of used blocks
+ * @return number of used blocks or (uint64_t)-1 in case of error
+ */
+uint64_t statfile_get_total_blocks (stat_file_t *file);
#endif
GList *cur;
if (pool == NULL || ccf == NULL || symbol == NULL) {
+ msg_err ("get_statfile_by_symbol: invalid input arguments");
return NULL;
}
cur = g_list_next (cur);
}
if (*st == NULL) {
+ msg_info ("get_statfile_by_symbol: cannot find statfile with symbol %s", symbol);
return NULL;
}
return NULL;
}
res = statfile_pool_open (pool, (*st)->path, (*st)->size, FALSE);
+ if (res == NULL) {
+ msg_err ("get_statfile_by_symbol: cannot open statfile %s after creation", (*st)->path);
+ }
}
}
}