@@ -195,6 +195,7 @@ struct rspamd_worker_conf { | |||
gpointer *ctx; /**< worker's context */ | |||
ucl_object_t *options; /**< other worker's options */ | |||
struct rspamd_worker_lua_script *scripts; /**< registered lua scripts */ | |||
ref_entry_t ref; | |||
}; | |||
enum rspamd_log_format_type { |
@@ -28,6 +28,7 @@ | |||
#include "unix-std.h" | |||
#include "libutil/multipattern.h" | |||
#include "monitored.h" | |||
#include "ref.h" | |||
#include <math.h> | |||
#define DEFAULT_SCORE 10.0 | |||
@@ -939,22 +940,32 @@ rspamd_config_new_group (struct rspamd_config *cfg, struct metric *metric, | |||
return gr; | |||
} | |||
static void | |||
rspamd_worker_conf_dtor (struct rspamd_worker_conf *wcf) | |||
{ | |||
if (wcf) { | |||
g_queue_free (wcf->active_workers); | |||
g_hash_table_unref (wcf->params); | |||
g_slice_free1 (sizeof (*wcf), wcf); | |||
} | |||
} | |||
static void | |||
rspamd_worker_conf_cfg_fin (gpointer d) | |||
{ | |||
struct rspamd_worker_conf *wcf = d; | |||
REF_RELEASE (wcf); | |||
} | |||
struct rspamd_worker_conf * | |||
rspamd_config_new_worker (struct rspamd_config *cfg, | |||
struct rspamd_worker_conf *c) | |||
{ | |||
if (c == NULL) { | |||
c = | |||
rspamd_mempool_alloc0 (cfg->cfg_pool, | |||
sizeof (struct rspamd_worker_conf)); | |||
c = g_slice_alloc0 (sizeof (struct rspamd_worker_conf)); | |||
c->params = g_hash_table_new (rspamd_str_hash, rspamd_str_equal); | |||
c->active_workers = g_queue_new (); | |||
rspamd_mempool_add_destructor (cfg->cfg_pool, | |||
(rspamd_mempool_destruct_t)g_hash_table_destroy, | |||
c->params); | |||
rspamd_mempool_add_destructor (cfg->cfg_pool, | |||
(rspamd_mempool_destruct_t)g_queue_free, | |||
c->active_workers); | |||
#ifdef HAVE_SC_NPROCESSORS_ONLN | |||
c->count = sysconf (_SC_NPROCESSORS_ONLN); | |||
#else | |||
@@ -962,6 +973,10 @@ rspamd_config_new_worker (struct rspamd_config *cfg, | |||
#endif | |||
c->rlimit_nofile = 0; | |||
c->rlimit_maxcore = 0; | |||
REF_INIT_RETAIN (c, rspamd_worker_conf_dtor); | |||
rspamd_mempool_add_destructor (cfg->cfg_pool, | |||
rspamd_worker_conf_cfg_fin, c); | |||
} | |||
return c; |
@@ -550,8 +550,8 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main, | |||
wrk->srv = rspamd_main; | |||
wrk->type = cf->type; | |||
wrk->cf = g_malloc (sizeof (struct rspamd_worker_conf)); | |||
memcpy (wrk->cf, cf, sizeof (struct rspamd_worker_conf)); | |||
wrk->cf = cf; | |||
REF_RETAIN (cf); | |||
wrk->index = index; | |||
wrk->ctx = cf->ctx; | |||
wrk->finish_actions = g_ptr_array_new (); |
@@ -173,6 +173,8 @@ start_log_helper (struct rspamd_worker *worker) | |||
{ | |||
struct log_helper_ctx *ctx = worker->ctx; | |||
gssize r = -1; | |||
gint nscripts = 0; | |||
struct rspamd_worker_lua_script *tmp; | |||
static struct rspamd_srv_command srv_cmd; | |||
ctx->ev_base = rspamd_prepare_worker (worker, | |||
@@ -183,6 +185,9 @@ start_log_helper (struct rspamd_worker *worker) | |||
ctx->scripts = worker->cf->scripts; | |||
ctx->L = ctx->cfg->lua_state; | |||
DL_COUNT (worker->cf->scripts, tmp, nscripts); | |||
msg_info ("started log_helper worker with %d scripts", nscripts); | |||
#ifdef HAVE_SOCK_SEQPACKET | |||
r = socketpair (AF_LOCAL, SOCK_SEQPACKET, 0, ctx->pair); | |||
#endif |
@@ -2009,7 +2009,6 @@ lua_config_register_worker_script (lua_State *L) | |||
for (cur = g_list_first (cfg->workers); cur != NULL; cur = g_list_next (cur)) { | |||
cf = cur->data; | |||
wtype = g_quark_to_string (cf->type); | |||
if (g_ascii_strcasecmp (wtype, worker_type) == 0) { |
@@ -559,7 +559,7 @@ else | |||
if opts['train']['max_epoch'] then | |||
max_epoch = opts['train']['max_epoch'] | |||
end | |||
cfg:register_worker_script("log_helper", | |||
local ret = cfg:register_worker_script("log_helper", | |||
function(score, req_score, results, cf, id, extra) | |||
-- map (snd x) (filter (fst x == module_id) extra) | |||
local extra_fann = map(function(e) return e[2] end, | |||
@@ -572,6 +572,10 @@ else | |||
opts['train'], extra_fann) | |||
end | |||
end) | |||
if not ret then | |||
rspamd_logger.errx(cfg, 'cannot find worker "log_helper"') | |||
end | |||
end) | |||
rspamd_plugins["fann_score"] = { | |||
log_callback = function(task) |
@@ -692,7 +692,7 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused) | |||
WTERMSIG (res) == SIGKILL ? "hardly" : "softly"); | |||
event_del (&w->srv_ev); | |||
g_ptr_array_free (w->finish_actions, TRUE); | |||
g_free (w->cf); | |||
REF_RELEASE (w->cf); | |||
g_free (w); | |||
return TRUE; |