/*- * Copyright 2016 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "config.h" #include "stat_api.h" #include "rspamd.h" #include "cfg_rcl.h" #include "stat_internal.h" #include "lua/lua_common.h" static struct rspamd_stat_ctx *stat_ctx = NULL; static struct rspamd_stat_classifier lua_classifier = { .name = "lua", .init_func = lua_classifier_init, .classify_func = lua_classifier_classify, .learn_spam_func = lua_classifier_learn_spam, .fin_func = NULL, }; static struct rspamd_stat_classifier stat_classifiers[] = { { .name = "bayes", .init_func = bayes_init, .classify_func = bayes_classify, .learn_spam_func = bayes_learn_spam, .fin_func = bayes_fin, } }; static struct rspamd_stat_tokenizer stat_tokenizers[] = { { .name = "osb-text", .get_config = rspamd_tokenizer_osb_get_config, .tokenize_func = rspamd_tokenizer_osb, }, { .name = "osb", .get_config = rspamd_tokenizer_osb_get_config, .tokenize_func = rspamd_tokenizer_osb, }, }; #define RSPAMD_STAT_BACKEND_ELT(nam, eltn) { \ .name = #nam, \ .init = rspamd_##eltn##_init, \ .runtime = rspamd_##eltn##_runtime, \ .process_tokens = rspamd_##eltn##_process_tokens, \ .finalize_process = rspamd_##eltn##_finalize_process, \ .learn_tokens = rspamd_##eltn##_learn_tokens, \ .finalize_learn = rspamd_##eltn##_finalize_learn, \ .total_learns = rspamd_##eltn##_total_learns, \ .inc_learns = rspamd_##eltn##_inc_learns, \ .dec_learns = rspamd_##eltn##_dec_learns, \ .get_stat = rspamd_##eltn##_get_stat, \ .load_tokenizer_config = rspamd_##eltn##_load_tokenizer_config, \ .close = rspamd_##eltn##_close \ } static struct rspamd_stat_backend stat_backends[] = { RSPAMD_STAT_BACKEND_ELT(mmap, mmaped_file), RSPAMD_STAT_BACKEND_ELT(sqlite3, sqlite3), #ifdef WITH_HIREDIS RSPAMD_STAT_BACKEND_ELT(redis, redis) #endif }; #define RSPAMD_STAT_CACHE_ELT(nam, eltn) { \ .name = #nam, \ .init = rspamd_stat_cache_##eltn##_init, \ .runtime = rspamd_stat_cache_##eltn##_runtime, \ .check = rspamd_stat_cache_##eltn##_check, \ .learn = rspamd_stat_cache_##eltn##_learn, \ .close = rspamd_stat_cache_##eltn##_close \ } static struct rspamd_stat_cache stat_caches[] = { RSPAMD_STAT_CACHE_ELT(sqlite3, sqlite3), #ifdef WITH_HIREDIS RSPAMD_STAT_CACHE_ELT(redis, redis), #endif }; void rspamd_stat_init (struct rspamd_config *cfg, struct ev_loop *ev_base) { GList *cur, *curst; struct rspamd_classifier_config *clf; struct rspamd_statfile_config *stf; struct rspamd_stat_backend *bk; struct rspamd_statfile *st; struct rspamd_classifier *cl; const ucl_object_t *cache_obj = NULL, *cache_name_obj; const gchar *cache_name = NULL; lua_State *L = cfg->lua_state; guint lua_classifiers_cnt = 0, i; gboolean skip_cache = FALSE; if (stat_ctx == NULL) { stat_ctx = g_malloc0 (sizeof (*stat_ctx)); } lua_getglobal (L, "rspamd_classifiers"); if (lua_type (L, -1) == LUA_TTABLE) { lua_pushnil (L); while (lua_next (L, -2) != 0) { lua_classifiers_cnt ++; lua_pop (L, 1); } } lua_pop (L, 1); stat_ctx->classifiers_count = G_N_ELEMENTS (stat_classifiers) + lua_classifiers_cnt; stat_ctx->classifiers_subrs = g_new0 (struct rspamd_stat_classifier, stat_ctx->classifiers_count); for (i = 0; i < G_N_ELEMENTS (stat_classifiers); i ++) { memcpy (&stat_ctx->classifiers_subrs[i], &stat_classifiers[i], sizeof (struct rspamd_stat_classifier)); } lua_getglobal (L, "rspamd_classifiers"); if (lua_type (L, -1) == LUA_TTABLE) { lua_pushnil (L); while (lua_next (L, -2) != 0) { lua_pushvalue (L, -2); memcpy (&stat_ctx->classifiers_subrs[i], &lua_classifier, sizeof (struct rspamd_stat_classifier)); stat_ctx->classifiers_subrs[i].name = g_strdup (lua_tostring (L, -1)); i ++; lua_pop (L, 2); } } lua_pop (L, 1); stat_ctx->backends_subrs = stat_backends; stat_ctx->backends_count = G_N_ELEMENTS (stat_backends); stat_ctx->tokenizers_subrs = stat_tokenizers; stat_ctx->tokenizers_count = G_N_ELEMENTS (stat_tokenizers); stat_ctx->caches_subrs = stat_caches; stat_ctx->caches_count = G_N_ELEMENTS (stat_caches); stat_ctx->cfg = cfg; stat_ctx->statfiles = g_ptr_array_new (); stat_ctx->classifiers = g_ptr_array_new (); stat_ctx->async_elts = g_queue_new (); stat_ctx->event_loop = ev_base; stat_ctx->lua_stat_tokens_ref = -1; /* Interact with lua_stat */ if (luaL_dostring (L, "return require \"lua_stat\"") != 0) { msg_err_config ("cannot require lua_stat: %s", lua_tostring (L, -1)); } else { if (lua_type (L, -1) != LUA_TTABLE) { msg_err_config ("lua stat must return " "table and not %s", lua_typename (L, lua_type (L, -1))); } else { lua_pushstring (L, "gen_stat_tokens"); lua_gettable (L, -2); if (lua_type (L, -1) != LUA_TFUNCTION) { msg_err_config ("gen_stat_tokens must return " "function and not %s", lua_typename (L, lua_type (L, -1))); } else { /* Call this function to obtain closure */ gint err_idx, ret; struct rspamd_config **pcfg; lua_pushcfunction (L, &rspamd_lua_traceback); err_idx = lua_gettop (L); lua_pushvalue (L, err_idx - 1); pcfg = lua_newuserdata (L, sizeof (*pcfg)); *pcfg = cfg; rspamd_lua_setclass (L, "rspamd{config}", -1); if ((ret = lua_pcall (L, 1, 1, err_idx)) != 0) { msg_err_config ("call to gen_stat_tokens lua " "script failed (%d): %s", ret, lua_tostring (L, -1)); } else { if (lua_type (L, -1) != LUA_TFUNCTION) { msg_err_config ("gen_stat_tokens invocation must return " "function and not %s", lua_typename (L, lua_type (L, -1))); } else { stat_ctx->lua_stat_tokens_ref = luaL_ref (L, LUA_REGISTRYINDEX); } } } } } /* Cleanup mess */ lua_settop (L, 0); /* Create statfiles from the classifiers */ cur = cfg->classifiers; while (cur) { bk = NULL; clf = cur->data; cl = g_malloc0 (sizeof (*cl)); cl->cfg = clf; cl->ctx = stat_ctx; cl->statfiles_ids = g_array_new (FALSE, FALSE, sizeof (gint)); cl->subrs = rspamd_stat_get_classifier (clf->classifier); if (cl->subrs == NULL) { g_free (cl); msg_err_config ("cannot init classifier type %s", clf->name); cur = g_list_next (cur); continue; } if (!cl->subrs->init_func (cfg, ev_base, cl)) { g_free (cl); msg_err_config ("cannot init classifier type %s", clf->name); cur = g_list_next (cur); continue; } if (!(clf->flags & RSPAMD_FLAG_CLASSIFIER_NO_BACKEND)) { bk = rspamd_stat_get_backend (clf->backend); if (bk == NULL) { msg_err_config ("cannot get backend of type %s, so disable classifier" " %s completely", clf->backend, clf->name); cur = g_list_next (cur); continue; } } /* XXX: * Here we get the first classifier tokenizer config as the only one * We NO LONGER support multiple tokenizers per rspamd instance */ if (stat_ctx->tkcf == NULL) { stat_ctx->tokenizer = rspamd_stat_get_tokenizer (clf->tokenizer->name); g_assert (stat_ctx->tokenizer != NULL); stat_ctx->tkcf = stat_ctx->tokenizer->get_config (cfg->cfg_pool, clf->tokenizer, NULL); } /* Init classifier cache */ cache_name = NULL; if (clf->opts) { cache_obj = ucl_object_lookup (clf->opts, "cache"); cache_name_obj = NULL; if (cache_obj && ucl_object_type (cache_obj) == UCL_NULL) { skip_cache = TRUE; } else { if (cache_obj) { cache_name_obj = ucl_object_lookup_any (cache_obj, "name", "type", NULL); } if (cache_name_obj) { cache_name = ucl_object_tostring (cache_name_obj); } } } if (cache_name == NULL && !skip_cache) { /* We assume that learn cache is the same as backend */ cache_name = clf->backend; } curst = clf->statfiles; while (curst) { stf = curst->data; st = g_malloc0 (sizeof (*st)); st->classifier = cl; st->stcf = stf; if (!(cl->cfg->flags & RSPAMD_FLAG_CLASSIFIER_NO_BACKEND) && bk) { st->backend = bk; st->bkcf = bk->init (stat_ctx, cfg, st); msg_debug_config ("added backend %s for symbol %s", bk->name, stf->symbol); } else { msg_debug_config ("added backend-less statfile for symbol %s", stf->symbol); } /* XXX: bad hack to pass statfiles configuration to cache */ if (cl->cache == NULL && !skip_cache) { cl->cache = rspamd_stat_get_cache (cache_name); g_assert (cl->cache != NULL); cl->cachecf = cl->cache->init (stat_ctx, cfg, st, cache_obj); if (cl->cachecf == NULL) { msg_err_config ("error adding cache %s for symbol %s", cl->cache->name, stf->symbol); cl->cache = NULL; } else { msg_debug_config ("added cache %s for symbol %s", cl->cache->name, stf->symbol); } } if (st->bkcf == NULL && !(cl->cfg->flags & RSPAMD_FLAG_CLASSIFIER_NO_BACKEND)) { msg_err_config ("cannot init backend %s for statfile %s", clf->backend, stf->symbol); g_free (st); } else { st->id = stat_ctx->statfiles->len; g_ptr_array_add (stat_ctx->statfiles, st); g_array_append_val (cl->statfiles_ids, st->id); } curst = curst->next; } g_ptr_array_add (stat_ctx->classifiers, cl); cur = cur->next; } } void rspamd_stat_close (void) { struct rspamd_classifier *cl; struct rspamd_statfile *st; struct rspamd_stat_ctx *st_ctx; struct rspamd_stat_async_elt *aelt; GList *cur; guint i, j; gint id; st_ctx = rspamd_stat_get_ctx (); g_assert (st_ctx != NULL); for (i = 0; i < st_ctx->classifiers->len; i ++) { cl = g_ptr_array_index (st_ctx->classifiers, i); for (j = 0; j < cl->statfiles_ids->len; j ++) { id = g_array_index (cl->statfiles_ids, gint, j); st = g_ptr_array_index (st_ctx->statfiles, id); if (!(st->classifier->cfg->flags & RSPAMD_FLAG_CLASSIFIER_NO_BACKEND)) { st->backend->close (st->bkcf); } g_free (st); } if (cl->cache && cl->cachecf) { cl->cache->close (cl->cachecf); } g_array_free (cl->statfiles_ids, TRUE); if (cl->subrs->fin_func) { cl->subrs->fin_func (cl); } g_free (cl); } cur = st_ctx->async_elts->head; while (cur) { aelt = cur->data; REF_RELEASE (aelt); cur = g_list_next (cur); } g_queue_free (stat_ctx->async_elts); g_ptr_array_free (st_ctx->statfiles, TRUE); g_ptr_array_free (st_ctx->classifiers, TRUE); if (st_ctx->lua_stat_tokens_ref != -1) { luaL_unref (st_ctx->cfg->lua_state, LUA_REGISTRYINDEX, st_ctx->lua_stat_tokens_ref); } g_free (st_ctx); /* Set global var to NULL */ stat_ctx = NULL; } struct rspamd_stat_ctx * rspamd_stat_get_ctx (void) { return stat_ctx; } struct rspamd_stat_classifier * rspamd_stat_get_classifier (const gchar *name) { guint i; if (name == NULL || name[0] == '\0') { name = RSPAMD_DEFAULT_CLASSIFIER; } for (i = 0; i < stat_ctx->classifiers_count; i ++) { if (strcmp (name, stat_ctx->classifiers_subrs[i].name) == 0) { return &stat_ctx->classifiers_subrs[i]; } } msg_err ("cannot find classifier named %s", name); return NULL; } struct rspamd_stat_backend * rspamd_stat_get_backend (const gchar *name) { guint i; if (name == NULL || name[0] == '\0') { name = RSPAMD_DEFAULT_BACKEND; } for (i = 0; i < stat_ctx->backends_count; i ++) { if (strcmp (name, stat_ctx->backends_subrs[i].name) == 0) { return &stat_ctx->backends_subrs[i]; } } msg_err ("cannot find backend named %s", name); return NULL; } struct rspamd_stat_tokenizer * rspamd_stat_get_tokenizer (const gchar *name) { guint i; if (name == NULL || name[0] == '\0') { name = RSPAMD_DEFAULT_TOKENIZER; } for (i = 0; i < stat_ctx->tokenizers_count; i ++) { if (strcmp (name, stat_ctx->tokenizers_subrs[i].name) == 0) { return &stat_ctx->tokenizers_subrs[i]; } } msg_err ("cannot find tokenizer named %s", name); return NULL; } struct rspamd_stat_cache * rspamd_stat_get_cache (const gchar *name) { guint i; if (name == NULL || name[0] == '\0') { name = RSPAMD_DEFAULT_CACHE; } for (i = 0; i < stat_ctx->caches_count; i++) { if (strcmp (name, stat_ctx->caches_subrs[i].name) == 0) { return &stat_ctx->caches_subrs[i]; } } msg_err ("cannot find cache named %s", name); return NULL; } static void rspamd_async_elt_dtor (struct rspamd_stat_async_elt *elt) { if (elt->cleanup) { elt->cleanup (elt, elt->ud); } ev_timer_stop (elt->event_loop, &elt->timer_ev); g_free (elt); } static void rspamd_async_elt_on_timer (EV_P_ ev_timer *w, int revents) { struct rspamd_stat_async_elt *elt = (struct rspamd_stat_async_elt *)w->data; gdouble jittered_time; if (elt->enabled) { elt->handler (elt, elt->ud); } jittered_time = rspamd_time_jitter (elt->timeout, 0); elt->timer_ev.repeat = jittered_time; ev_timer_again (EV_A_ w); } struct rspamd_stat_async_elt* rspamd_stat_ctx_register_async (rspamd_stat_async_handler handler, rspamd_stat_async_cleanup cleanup, gpointer d, gdouble timeout) { struct rspamd_stat_async_elt *elt; struct rspamd_stat_ctx *st_ctx; st_ctx = rspamd_stat_get_ctx (); g_assert (st_ctx != NULL); elt = g_malloc0 (sizeof (*elt)); elt->handler = handler; elt->cleanup = cleanup; elt->ud = d; elt->timeout = timeout; elt->event_loop = st_ctx->event_loop; REF_INIT_RETAIN (elt, rspamd_async_elt_dtor); /* Enabled by default */ if (st_ctx->event_loop) { elt->enabled = TRUE; /* * First we set timeval to zero as we want cb to be executed as * fast as possible */ elt->timer_ev.data = elt; ev_timer_init (&elt->timer_ev, rspamd_async_elt_on_timer, 0.1, 0.0); ev_timer_start (st_ctx->event_loop, &elt->timer_ev); } else { elt->enabled = FALSE; } g_queue_push_tail (st_ctx->async_elts, elt); return elt; }