Browse Source

Fix statistics processing

tags/1.1.0
Vsevolod Stakhov 8 years ago
parent
commit
e5676fe57a
4 changed files with 135 additions and 386 deletions
  1. 2
    3
      src/libserver/task.h
  2. 3
    0
      src/libstat/stat_config.c
  3. 1
    0
      src/libstat/stat_internal.h
  4. 129
    383
      src/libstat/stat_process.c

+ 2
- 3
src/libserver/task.h View File

@@ -149,8 +149,7 @@ struct rspamd_task {
GHashTable *raw_headers; /**< list of raw headers */
GHashTable *results; /**< hash table of metric_result indexed by
* metric's name */
GHashTable *tokens; /**< hash table of tokens indexed by tokenizer
* pointer */
GPtrArray *tokens; /**< statistics tokens */
InternetAddressList *rcpt_mime; /**< list of all recipients */
InternetAddressList *rcpt_envelope; /**< list of all recipients */
InternetAddressList *from_mime;
@@ -158,7 +157,7 @@ struct rspamd_task {

GList *messages; /**< list of messages that would be reported */
struct rspamd_re_runtime *re_rt; /**< regexp runtime */
GList *cl_runtimes; /**< classifiers runtime */
GPtrArray *stat_runtimes; /**< backend runtime */
struct rspamd_config *cfg; /**< pointer to config object */
GError *err;
rspamd_mempool_t *task_pool; /**< memory pool for task */

+ 3
- 0
src/libstat/stat_config.c View File

@@ -135,6 +135,9 @@ rspamd_stat_init (struct rspamd_config *cfg)
cl->cfg = clf;
cl->ctx = stat_ctx;
cl->statfiles_ids = g_array_new (FALSE, FALSE, sizeof (gint));
cl->subrs = rspamd_stat_get_classifier (clf->name);
g_assert (cl->subrs != NULL);
cl->subrs->init_func (cfg->cfg_pool, cl);

/* Init classifier cache */
if (clf->opts) {

+ 1
- 0
src/libstat/stat_internal.h View File

@@ -46,6 +46,7 @@ struct rspamd_classifier {
gulong spam_learns;
gulong ham_learns;
struct rspamd_classifier_config *cfg;
struct rspamd_stat_classifier *subrs;
};

struct rspamd_statfile {

+ 129
- 383
src/libstat/stat_process.c View File

@@ -1,4 +1,4 @@
/* Copyright (c) 2015, Vsevolod Stakhov
/* Copyright (c) 2015-2016, Vsevolod Stakhov
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -37,19 +37,8 @@

static const gint similarity_treshold = 80;

#if 0
struct preprocess_cb_data {
struct rspamd_task *task;
GList *classifier_runtimes;
struct rspamd_tokenizer_runtime *tok;
guint results_count;
gboolean unlearn;
gboolean spam;
};

static void
rspamd_stat_tokenize_header (struct rspamd_task *task,
struct rspamd_tokenizer_runtime *tok,
const gchar *name, const gchar *prefix, GArray *ar)
{
struct raw_header *rh, *cur;
@@ -82,8 +71,8 @@ rspamd_stat_tokenize_header (struct rspamd_task *task,
}

static void
rspamd_stat_tokenize_parts_metadata (struct rspamd_task *task,
struct rspamd_tokenizer_runtime *tok)
rspamd_stat_tokenize_parts_metadata (struct rspamd_stat_ctx *st_ctx,
struct rspamd_task *task)
{
struct rspamd_image *img;
struct mime_part *part;
@@ -165,16 +154,17 @@ rspamd_stat_tokenize_parts_metadata (struct rspamd_task *task,
cur = g_list_first (task->cfg->classify_headers);

while (cur) {
rspamd_stat_tokenize_header (task, tok, cur->data, "UA:", ar);
rspamd_stat_tokenize_header (task, cur->data, "UA:", ar);

cur = g_list_next (cur);
}

tok->tokenizer->tokenize_func (tok,
st_ctx->tokenizer->tokenize_func (st_ctx,
task->task_pool,
ar,
TRUE,
"META:");
"META:",
task->tokens);

g_array_free (ar, TRUE);
}
@@ -184,24 +174,36 @@ rspamd_stat_tokenize_parts_metadata (struct rspamd_task *task,
*/
static void
rspamd_stat_process_tokenize (struct rspamd_stat_ctx *st_ctx,
struct rspamd_task *task, struct rspamd_tokenizer_runtime *tok)
struct rspamd_task *task)
{
struct mime_text_part *part;
GArray *words;
gchar *sub;
guint i;
guint i, reserved_len = 0;
gint *pdiff;
gboolean compat;

compat = tok->tokenizer->is_compat (tok);
for (i = 0; i < task->text_parts->len; i++) {
part = g_ptr_array_index (task->text_parts, i);

if (!IS_PART_EMPTY (part) && part->normalized_words != NULL) {
reserved_len += part->normalized_words->len;
}
/* XXX: normal window size */
reserved_len += 5;
}

task->tokens = g_ptr_array_sized_new (reserved_len);
rspamd_mempool_add_destructor (task->task_pool,
rspamd_ptr_array_free_hard, task->tokens);
pdiff = rspamd_mempool_get_variable (task->task_pool, "parts_distance");

for (i = 0; i < task->text_parts->len; i ++) {
part = g_ptr_array_index (task->text_parts, i);

if (!IS_PART_EMPTY (part) && part->normalized_words != NULL) {
tok->tokenizer->tokenize_func (tok, task->task_pool,
part->normalized_words, IS_PART_UTF (part), NULL);
st_ctx->tokenizer->tokenize_func (st_ctx, task->task_pool,
part->normalized_words, IS_PART_UTF (part),
NULL, task->tokens);
}


@@ -220,324 +222,118 @@ rspamd_stat_process_tokenize (struct rspamd_stat_ctx *st_ctx,
}

if (sub != NULL) {
words = rspamd_tokenize_text (sub, strlen (sub), TRUE, NULL, NULL, compat,
words = rspamd_tokenize_text (sub, strlen (sub), TRUE, NULL, NULL, FALSE,
NULL);
if (words != NULL) {
tok->tokenizer->tokenize_func (tok,
st_ctx->tokenizer->tokenize_func (st_ctx,
task->task_pool,
words,
TRUE,
"SUBJECT");
"SUBJECT",
task->tokens);
g_array_free (words, TRUE);
}
}

rspamd_stat_tokenize_parts_metadata (task, tok);
rspamd_stat_tokenize_parts_metadata (st_ctx, task);
}

static struct rspamd_tokenizer_runtime *
rspamd_stat_get_tokenizer_runtime (struct rspamd_tokenizer_config *cf,
struct rspamd_stat_ctx *st_ctx,
struct rspamd_task *task,
struct rspamd_classifier_runtime *cl_runtime,
gpointer conf, gsize conf_len)
static void
rspamd_stat_preprocess (struct rspamd_stat_ctx *st_ctx,
struct rspamd_task *task, gboolean learn)
{
struct rspamd_tokenizer_runtime *tok = NULL;
const gchar *name;

if (cf == NULL || cf->name == NULL) {
name = RSPAMD_DEFAULT_TOKENIZER;
cf->name = name;
}
else {
name = cf->name;
}

tok = rspamd_mempool_alloc (task->task_pool, sizeof (*tok));
tok->tokenizer = rspamd_stat_get_tokenizer (name);
tok->tkcf = cf;

if (tok->tokenizer == NULL) {
return NULL;
}

if (!tok->tokenizer->load_config (task->task_pool, tok, conf, conf_len)) {
return NULL;
}
guint i;
struct rspamd_statfile *st;
gpointer bk_run;

tok->tokens = g_tree_new (token_node_compare_func);
rspamd_stat_process_tokenize (st_ctx, task);
task->stat_runtimes = g_ptr_array_sized_new (st_ctx->statfiles->len);
rspamd_mempool_add_destructor (task->task_pool,
(rspamd_mempool_destruct_t)g_tree_destroy, tok->tokens);
tok->name = name;
rspamd_stat_process_tokenize (st_ctx, task, tok);
cl_runtime->tok = tok;

return tok;
}
rspamd_ptr_array_free_hard, task->stat_runtimes);

static gboolean
preprocess_init_stat_token (gpointer k, gpointer v, gpointer d)
{
rspamd_token_t *t = (rspamd_token_t *)v;
struct preprocess_cb_data *cbdata = (struct preprocess_cb_data *)d;
struct rspamd_statfile_runtime *st_runtime;
struct rspamd_classifier_runtime *cl_runtime;
struct rspamd_token_result *res;
GList *cur, *curst;
struct rspamd_task *task;
gint i = 0;
for (i = 0; i < st_ctx->statfiles->len; i ++) {
st = g_ptr_array_index (st_ctx->statfiles, i);
g_assert (st != NULL);

task = cbdata->task;
t->results = g_array_sized_new (FALSE, TRUE,
sizeof (struct rspamd_token_result), cbdata->results_count);
g_array_set_size (t->results, cbdata->results_count);
rspamd_mempool_add_destructor (cbdata->task->task_pool,
rspamd_array_free_hard, t->results);

cur = g_list_first (cbdata->classifier_runtimes);
bk_run = st->backend->runtime (task, st->stcf, learn, st->bkcf);

while (cur) {
cl_runtime = (struct rspamd_classifier_runtime *)cur->data;

if (cl_runtime->clcf->min_tokens > 0 &&
(guint32)g_tree_nnodes (cbdata->tok->tokens) < cl_runtime->clcf->min_tokens) {
/* Skip this classifier */
cur = g_list_next (cur);
cl_runtime->skipped = TRUE;
continue;
if (bk_run == NULL) {
msg_err_task ("cannot init backend %s for statfile %s",
st->backend->name, st->stcf->symbol);
}

curst = cl_runtime->st_runtime;

while (curst) {

st_runtime = (struct rspamd_statfile_runtime *)curst->data;
res = &g_array_index (t->results, struct rspamd_token_result, i);
res->cl_runtime = cl_runtime;
res->st_runtime = st_runtime;

if (cl_runtime->backend->process_token (cbdata->task, t, res,
cl_runtime->backend->ctx)) {

if (cl_runtime->clcf->max_tokens > 0 &&
cl_runtime->processed_tokens > cl_runtime->clcf->max_tokens) {
msg_debug_task ("message contains more tokens than allowed for %s classifier: "
"%uL > %ud", cl_runtime->clcf->name,
cl_runtime->processed_tokens,
cl_runtime->clcf->max_tokens);

return TRUE;
}
}

i ++;
curst = g_list_next (curst);
}

cur = g_list_next (cur);
g_ptr_array_add (task->stat_runtimes, bk_run);
}


return FALSE;
}

static GList*
rspamd_stat_preprocess (struct rspamd_stat_ctx *st_ctx,
struct rspamd_task *task,
lua_State *L,
gint op,
gboolean spam,
const gchar *classifier,
GError **err)
static void
rspamd_stat_backends_process (struct rspamd_stat_ctx *st_ctx,
struct rspamd_task *task)
{
struct rspamd_classifier_config *clcf;
struct rspamd_statfile_config *stcf;
struct rspamd_classifier_runtime *cl_runtime;
struct rspamd_statfile_runtime *st_runtime;
struct rspamd_stat_backend *bk;
gpointer backend_runtime, tok_config;
GList *cur, *st_list = NULL, *curst;
GList *cl_runtimes = NULL;
guint result_size = 0, start_pos = 0, end_pos = 0;
gsize conf_len;
struct preprocess_cb_data cbdata;

cur = g_list_first (task->cfg->classifiers);

while (cur) {
clcf = (struct rspamd_classifier_config *)cur->data;
st_list = NULL;

if (classifier != NULL &&
(clcf->name == NULL || strcmp (clcf->name, classifier) != 0)) {
/* Skip this classifier */
msg_debug_task ("skip classifier %s, as we are requested to check %s only",
clcf->name, classifier);
cur = g_list_next (cur);
continue;
}

if (clcf->pre_callbacks != NULL) {
st_list = rspamd_lua_call_cls_pre_callbacks (clcf, task, FALSE,
FALSE, L);
}
if (st_list != NULL) {
rspamd_mempool_add_destructor (task->task_pool,
(rspamd_mempool_destruct_t)g_list_free, st_list);
}
else {
st_list = clcf->statfiles;
}

/* Now init runtime values */
cl_runtime = rspamd_mempool_alloc0 (task->task_pool, sizeof (*cl_runtime));
cl_runtime->cl = rspamd_stat_get_classifier (clcf->classifier);

if (cl_runtime->cl == NULL) {
g_set_error (err, rspamd_stat_quark(), 500,
"classifier %s is not defined", clcf->classifier);
g_list_free (cl_runtimes);
return NULL;
}

cl_runtime->clcf = clcf;

bk = rspamd_stat_get_backend (clcf->backend);
if (bk == NULL) {
g_set_error (err, rspamd_stat_quark(), 500,
"backend %s is not defined", clcf->backend);
g_list_free (cl_runtimes);
return NULL;
}

cl_runtime->backend = bk;

curst = st_list;
while (curst != NULL) {
stcf = (struct rspamd_statfile_config *)curst->data;

/* On learning skip statfiles that do not belong to class */
if (op == RSPAMD_LEARN_OP && (spam != stcf->is_spam)) {
curst = g_list_next (curst);
continue;
}

backend_runtime = bk->runtime (task, stcf, op != RSPAMD_CLASSIFY_OP,
bk->ctx);

if (backend_runtime == NULL) {
if (op != RSPAMD_CLASSIFY_OP) {
/* Assume backend absence as fatal error */
g_set_error (err, rspamd_stat_quark(), 500,
"cannot open backend for statfile %s", stcf->symbol);
g_list_free (cl_runtimes);

return NULL;
}
else {
/* Just skip this element */
msg_warn ("backend of type %s does not exist: %s",
clcf->backend, stcf->symbol);
curst = g_list_next (curst);
continue;
}
}

tok_config = bk->load_tokenizer_config (backend_runtime,
&conf_len);

if (cl_runtime->tok == NULL) {
cl_runtime->tok = rspamd_stat_get_tokenizer_runtime (clcf->tokenizer,
st_ctx, task, cl_runtime, tok_config, conf_len);

if (cl_runtime->tok == NULL) {
g_set_error (err, rspamd_stat_quark(), 500,
"cannot initialize tokenizer for statfile %s", stcf->symbol);
g_list_free (cl_runtimes);
guint i;
struct rspamd_statfile *st;
struct rspamd_classifier *cl;
gpointer bk_run;

return NULL;
}
}
g_assert (task->stat_runtimes != NULL);

if (!cl_runtime->tok->tokenizer->compatible_config (
cl_runtime->tok, tok_config, conf_len)) {
g_set_error (err, rspamd_stat_quark(), 500,
"incompatible tokenizer for statfile %s", stcf->symbol);
g_list_free (cl_runtimes);
for (i = 0; i < st_ctx->statfiles->len; i++) {
st = g_ptr_array_index (st_ctx->statfiles, i);
bk_run = g_ptr_array_index (task->stat_runtimes, i);
cl = st->classifier;
g_assert (st != NULL);

return NULL;
}
if (bk_run != NULL) {
st->backend->process_tokens (task, task->tokens, i, bk_run);

st_runtime = rspamd_mempool_alloc0 (task->task_pool,
sizeof (*st_runtime));
st_runtime->st = stcf;
st_runtime->backend_runtime = backend_runtime;

if (stcf->is_spam) {
cl_runtime->total_spam += bk->total_learns (task, backend_runtime,
bk->ctx);
if (st->stcf->is_spam) {
cl->spam_learns = st->backend->total_learns (task,
bk_run,
st_ctx);
}
else {
cl_runtime->total_ham += bk->total_learns (task, backend_runtime,
bk->ctx);
cl->ham_learns = st->backend->total_learns (task,
bk_run,
st_ctx);
}

cl_runtime->st_runtime = g_list_prepend (cl_runtime->st_runtime,
st_runtime);
result_size ++;

curst = g_list_next (curst);
end_pos ++;
}

if (cl_runtime->st_runtime != NULL) {
rspamd_mempool_add_destructor (task->task_pool,
(rspamd_mempool_destruct_t)g_list_free,
cl_runtime->st_runtime);
cl_runtimes = g_list_prepend (cl_runtimes, cl_runtime);
}
}
}

/* Set positions in the results array */
cl_runtime->start_pos = start_pos;
cl_runtime->end_pos = end_pos;
static void
rspamd_stat_backends_post_process (struct rspamd_stat_ctx *st_ctx,
struct rspamd_task *task)
{
guint i;
struct rspamd_statfile *st;
gpointer bk_run;

msg_debug_task ("added runtime for %s classifier from %ud to %ud",
clcf->name, start_pos, end_pos);
g_assert (task->stat_runtimes != NULL);

start_pos = end_pos;
for (i = 0; i < st_ctx->statfiles->len; i++) {
st = g_ptr_array_index (st_ctx->statfiles, i);
bk_run = g_ptr_array_index (task->stat_runtimes, i);
g_assert (st != NULL);

/* Next classifier */
cur = g_list_next (cur);
if (bk_run != NULL) {
st->backend->finalize_process (task, bk_run, st_ctx);
}
}
}

if (cl_runtimes != NULL) {
/* Reverse list as we have used g_list_prepend */
cl_runtimes = g_list_reverse (cl_runtimes);
rspamd_mempool_add_destructor (task->task_pool,
(rspamd_mempool_destruct_t) g_list_free,
cl_runtimes);
cur = g_list_first (cl_runtimes);

while (cur) {
cl_runtime = cur->data;
static void
rspamd_stat_classifiers_process (struct rspamd_stat_ctx *st_ctx,
struct rspamd_task *task)
{
guint i;
struct rspamd_classifier *cl;

cbdata.results_count = result_size;
cbdata.classifier_runtimes = cl_runtimes;
cbdata.task = task;
cbdata.tok = cl_runtime->tok;
g_tree_foreach (cbdata.tok->tokens, preprocess_init_stat_token,
&cbdata);
for (i = 0; i < st_ctx->classifiers->len; i++) {
cl = g_ptr_array_index (st_ctx->classifiers, i);
g_assert (cl != NULL);

cur = g_list_next (cur);
}
cl->subrs->classify_func (cl, task->tokens, task);
}
else if (classifier != NULL) {
/* We likely cannot find any classifier with this name */
g_set_error (err, rspamd_stat_quark (), 404,
"cannot find classifier %s", classifier);
}

return cl_runtimes;
}

rspamd_stat_result_t
@@ -545,102 +341,30 @@ rspamd_stat_classify (struct rspamd_task *task, lua_State *L, guint stage,
GError **err)
{
struct rspamd_stat_ctx *st_ctx;
struct rspamd_statfile_runtime *st_run;
struct rspamd_classifier_runtime *cl_run;
GList *cl_runtimes;
GList *cur, *curst;
gboolean ret = RSPAMD_STAT_PROCESS_OK;
rspamd_stat_result_t ret = RSPAMD_STAT_PROCESS_OK;

st_ctx = rspamd_stat_get_ctx ();
g_assert (st_ctx != NULL);
cl_runtimes = task->cl_runtimes;

if (stage == RSPAMD_TASK_STAGE_CLASSIFIERS_PRE) {
/* Initialize classifiers and statfiles runtime */
if (task->cl_runtimes == NULL) {
if ((cl_runtimes = rspamd_stat_preprocess (st_ctx, task, L,
RSPAMD_CLASSIFY_OP, FALSE, NULL, err)) == NULL) {
return RSPAMD_STAT_PROCESS_OK;
}

task->cl_runtimes = cl_runtimes;
cur = cl_runtimes;

/* Finalize backend so it can load tokens delayed if needed */
while (cur) {
cl_run = (struct rspamd_classifier_runtime *) cur->data;
curst = cl_run->st_runtime;

while (curst) {
st_run = curst->data;
cl_run->backend->finalize_process (task,
st_run->backend_runtime,
cl_run->backend->ctx);
curst = g_list_next (curst);
}

cur = g_list_next (cur);
}
}
if (stage == RSPAMD_TASK_STAGE_CLASSIFIERS_PRE) {
/* Preprocess tokens */
rspamd_stat_preprocess (st_ctx, task, FALSE);
}
else if (stage == RSPAMD_TASK_STAGE_CLASSIFIERS) {
cur = cl_runtimes;

/* The first stage of classification */
while (cur) {
cl_run = (struct rspamd_classifier_runtime *) cur->data;
cl_run->stage = RSPAMD_STAT_STAGE_PRE;

if (cl_run->cl) {
cl_run->clctx = cl_run->cl->init_func (task->task_pool,
cl_run->clcf);

if (cl_run->clctx != NULL) {
cl_run->cl->classify_func (cl_run->clctx, cl_run->tok->tokens,
cl_run, task);
}
}

cur = g_list_next (cur);
}
/* Process backends */
rspamd_stat_backends_process (st_ctx, task);
}
else if (stage == RSPAMD_TASK_STAGE_CLASSIFIERS_POST) {
cur = cl_runtimes;
/* The second stage of classification */
while (cur) {
cl_run = (struct rspamd_classifier_runtime *) cur->data;
cl_run->stage = RSPAMD_STAT_STAGE_POST;

if (cl_run->skipped) {
cur = g_list_next (cur);
continue;
}

cl_run = (struct rspamd_classifier_runtime *) cur->data;
cl_run->stage = RSPAMD_STAT_STAGE_POST;

if (cl_run->skipped) {
cur = g_list_next (cur);
continue;
}

if (cl_run->cl) {
if (cl_run->clctx != NULL) {
if (cl_run->cl->classify_func (cl_run->clctx,
cl_run->tok->tokens,
cl_run, task)) {
ret = RSPAMD_STAT_PROCESS_OK;
}
}
}

cur = g_list_next (cur);
}
/* Process classifiers */
rspamd_stat_backends_post_process (st_ctx, task);
rspamd_stat_classifiers_process (st_ctx, task);
}

return ret;
}

#if 0
static gboolean
rspamd_stat_learn_token (gpointer k, gpointer v, gpointer d)
{
@@ -911,4 +635,26 @@ rspamd_stat_result_t rspamd_stat_statistics (struct rspamd_task *task,

return RSPAMD_STAT_PROCESS_OK;
}
#else
/*
 * TODO: finish learning
 *
 * Placeholder for the learning entry point. In its current form the function
 * ignores every argument and unconditionally returns
 * RSPAMD_STAT_PROCESS_ERROR.
 *
 * NOTE(review): `err` is never filled in here, so a caller that inspects
 * *err after the failure will not find an error object - confirm that
 * callers tolerate this until learning is implemented.
 */
rspamd_stat_result_t rspamd_stat_learn (struct rspamd_task *task,
gboolean spam, lua_State *L, const gchar *classifier,
GError **err)
{
return RSPAMD_STAT_PROCESS_ERROR;
}

/**
 * Get the overall statistics for all statfile backends
 *
 * NOTE: this is currently a stub. None of the arguments is read or written
 * and RSPAMD_STAT_PROCESS_ERROR is always returned.
 *
 * @param task task object (unused by the stub)
 * @param cfg configuration (unused by the stub)
 * @param total_learns the total number of learns is meant to be stored here;
 * the stub leaves it untouched
 * @param res presumably the output object for the collected statistics;
 * the stub leaves it untouched - TODO confirm once implemented
 * @return RSPAMD_STAT_PROCESS_ERROR unconditionally in the current stub
 */
rspamd_stat_result_t rspamd_stat_statistics (struct rspamd_task *task,
struct rspamd_config *cfg,
guint64 *total_learns,
ucl_object_t **res)
{
return RSPAMD_STAT_PROCESS_ERROR;
}
#endif

Loading…
Cancel
Save