Browse Source

* Rework statfiles result processing

* Fix small bug in protocol implementation (\r -> \r\n)
tags/0.2.7
Vsevolod Stakhov 15 years ago
parent
commit
f411d46ff2
8 changed files with 55 additions and 16 deletions
  1. 2
    2
      rspamc.pl
  2. 12
    0
      src/cfg_utils.c
  3. 1
    1
      src/classifiers/classifiers.c
  4. 2
    0
      src/classifiers/classifiers.h
  5. 6
    0
      src/classifiers/winnow.c
  6. 28
    12
      src/filter.c
  7. 2
    1
      src/protocol.c
  8. 2
    0
      src/worker.c

+ 2
- 2
rspamc.pl View File

@@ -103,8 +103,8 @@ sub do_rspamc_command {
syswrite $sock, "Content-Length: " . length ($input) . $CRLF . $CRLF;
syswrite $sock, $input;
syswrite $sock, $CRLF;
while (<$sock>) {
print $_;
while (defined (my $line = <$sock>)) {
print $line;
}
}


+ 12
- 0
src/cfg_utils.c View File

@@ -18,12 +18,15 @@
#include "config.h"
#include "cfg_file.h"
#include "main.h"
#include "filter.h"
#ifndef HAVE_OWN_QUEUE_H
#include <sys/queue.h>
#else
#include "queue.h"
#endif

#define DEFAULT_SCORE 10.0

extern int yylineno;
extern char *yytext;

@@ -163,6 +166,8 @@ parse_bind_line (struct config_file *cf, char *str, char is_control)
void
init_defaults (struct config_file *cfg)
{
struct metric *def_metric;

cfg->memcached_error_time = DEFAULT_UPSTREAM_ERROR_TIME;
cfg->memcached_dead_time = DEFAULT_UPSTREAM_DEAD_TIME;
cfg->memcached_maxerrors = DEFAULT_UPSTREAM_MAXERRORS;
@@ -179,6 +184,13 @@ init_defaults (struct config_file *cfg)
cfg->statfiles = g_hash_table_new (g_str_hash, g_str_equal);
cfg->cfg_params = g_hash_table_new (g_str_hash, g_str_equal);

def_metric = memory_pool_alloc (cfg->cfg_pool, sizeof (struct metric));
def_metric->name = "default";
def_metric->func_name = "factors";
def_metric->func = factor_consolidation_func;
def_metric->required_score = DEFAULT_SCORE;
g_hash_table_insert (cfg->metrics, "default", def_metric);

LIST_INIT (&cfg->perl_modules);
}


+ 1
- 1
src/classifiers/classifiers.c View File

@@ -6,7 +6,7 @@
#include "classifiers.h"

struct classifier classifiers[] = {
{"winnow", winnow_classify, winnow_learn },
{"winnow", winnow_classify, winnow_learn, winnow_add_result },
};

struct classifier*

+ 2
- 0
src/classifiers/classifiers.h View File

@@ -16,6 +16,7 @@ struct classifier {
char *name;
double (*classify_func)(statfile_pool_t *pool, char *statfile, GTree *input);
void (*learn_func)(statfile_pool_t *pool, char *statfile, GTree *input, int in_class);
double (*add_result_func)(double result, double new);
};

/* Get classifier structure by name or return NULL if this name is not found */
@@ -23,6 +24,7 @@ struct classifier* get_classifier (char *name);
/* Winnow algorithm */
double winnow_classify (statfile_pool_t *pool, char *statfile, GTree *input);
void winnow_learn (statfile_pool_t *pool, char *statfile, GTree *input, int in_class);
double winnow_add_result (double result, double new);

/* Array of all defined classifiers */
extern struct classifier classifiers[];

+ 6
- 0
src/classifiers/winnow.c View File

@@ -105,3 +105,9 @@ winnow_learn (statfile_pool_t *pool, char *statfile, GTree *input, int in_class)
statfile_pool_unlock_file (pool, statfile);
}

/*
 * Fold a freshly computed classification weight into the running total.
 * For winnow the combination is plain addition: returns result + new.
 */
double
winnow_add_result (double result, double new)
{
double combined = result;

combined += new;
return combined;
}

+ 28
- 12
src/filter.c View File

@@ -349,6 +349,12 @@ struct statfile_callback_data {
struct worker_task *task;
};

/*
 * Per-metric accumulator: statfiles_callback stores one of these in
 * data->metrics keyed by st->metric, and statfiles_results_callback reads
 * it back as the hash table value.
 */
struct statfile_result {
/* Running weight, combined through the classifier's add_result_func */
double weight;
/* List of statfile aliases (st->alias) built with g_list_prepend */
GList *symbols;
/* NOTE(review): not assigned anywhere in the visible code -- confirm where it is set */
struct classifier *classifier;
};

static void
statfiles_callback (gpointer key, gpointer value, void *arg)
{
@@ -357,7 +363,8 @@ statfiles_callback (gpointer key, gpointer value, void *arg)
struct statfile *st = (struct statfile *)value;
GTree *tokens = NULL;
char *filename;
double weight, *w;
double weight;
struct statfile_result *res;
GList *cur = NULL;
GByteArray *content;
f_str_t c;
@@ -392,13 +399,16 @@ statfiles_callback (gpointer key, gpointer value, void *arg)
msg_debug ("process_statfiles: got classify weight: %.2f", weight);
if (weight > 0.000001) {
if ((w = g_hash_table_lookup (data->metrics, st->metric)) == NULL) {
w = memory_pool_alloc (task->task_pool, sizeof (double));
*w = weight * st->weight;
g_hash_table_insert (data->metrics, st->metric, w);

if ((res = g_hash_table_lookup (data->metrics, st->metric)) == NULL) {
res = memory_pool_alloc (task->task_pool, sizeof (struct statfile_result));
res->symbols = g_list_prepend (NULL, st->alias);
res->weight = st->classifier->add_result_func (0, weight);
g_hash_table_insert (data->metrics, st->metric, res);
}
else {
*w += weight * st->weight;
res->symbols = g_list_prepend (NULL, st->alias);
res->weight = st->classifier->add_result_func (res->weight, weight);
}
}
@@ -410,10 +420,10 @@ statfiles_results_callback (gpointer key, gpointer value, void *arg)
struct worker_task *task = (struct worker_task *)arg;
struct metric_result *metric_res;
struct metric *metric;
double w;
struct statfile_result *res = (struct statfile_result *)value;
GList *cur_symbol;

metric_res = g_hash_table_lookup (task->results, (char *)key);
w = *(double *)value;

metric = g_hash_table_lookup (task->worker->srv->cfg->metrics, (char *)key);
if (metric == NULL) {
@@ -426,13 +436,19 @@ statfiles_results_callback (gpointer key, gpointer value, void *arg)
metric_res->symbols = g_hash_table_new (g_str_hash, g_str_equal);
memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_hash_table_destroy, metric_res->symbols);
metric_res->metric = metric;
metric_res->score = w;
g_hash_table_insert (task->results, key, metric_res);
metric_res->score = res->weight;
g_hash_table_insert (task->results, metric->name, metric_res);
}
else {
metric_res->score += w;
metric_res->score += res->weight;
}

cur_symbol = g_list_first (res->symbols);
while (cur_symbol) {
msg_debug ("statfiles_results_callback: insert symbol %s to metric %s", (char *)cur_symbol->data, metric->name);
g_hash_table_insert (metric_res->symbols, (char *)cur_symbol->data, GSIZE_TO_POINTER (1));
cur_symbol = g_list_next (cur_symbol);
}
g_hash_table_insert (metric_res->symbols, key, GSIZE_TO_POINTER (1));

}


+ 2
- 1
src/protocol.c View File

@@ -401,7 +401,8 @@ show_metric_symbols (gpointer metric_name, gpointer metric_value, void *user_dat
cur = g_list_next (cur);
}
g_list_free (symbols);
outbuf[r++] = '\r'; outbuf[r] = '\n';
msg_debug ("show_metric_symbols: write symbols line: %s", outbuf);
outbuf[r++] = '\r'; outbuf[r++] = '\n';
bufferevent_write (task->bev, outbuf, r);
}


+ 2
- 0
src/worker.c View File

@@ -230,6 +230,8 @@ accept_socket (int fd, short what, void *arg)
new_task->task_pool = memory_pool_new (memory_pool_get_size ());
/* Add destructor for recipients list (it would be better to use an anonymous function here) */
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)rcpt_destruct, new_task);
new_task->results = g_hash_table_new (g_str_hash, g_str_equal);
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->results);
worker->srv->stat->connections_count ++;

/* Read event */

Loading…
Cancel
Save