gpointer *ctx; /**< worker's context */
ucl_object_t *options; /**< other worker's options */
struct rspamd_worker_lua_script *scripts; /**< registered lua scripts */
+ ref_entry_t ref;
};
enum rspamd_log_format_type {
#include "unix-std.h"
#include "libutil/multipattern.h"
#include "monitored.h"
+#include "ref.h"
#include <math.h>
#define DEFAULT_SCORE 10.0
return gr;
}
+static void
+rspamd_worker_conf_dtor (struct rspamd_worker_conf *wcf)
+{
+ if (wcf) {
+ g_queue_free (wcf->active_workers);
+ g_hash_table_unref (wcf->params);
+ g_slice_free1 (sizeof (*wcf), wcf);
+ }
+}
+
+static void
+rspamd_worker_conf_cfg_fin (gpointer d)
+{
+ struct rspamd_worker_conf *wcf = d;
+
+ REF_RELEASE (wcf);
+}
+
struct rspamd_worker_conf *
rspamd_config_new_worker (struct rspamd_config *cfg,
struct rspamd_worker_conf *c)
{
if (c == NULL) {
- c =
- rspamd_mempool_alloc0 (cfg->cfg_pool,
- sizeof (struct rspamd_worker_conf));
+ c = g_slice_alloc0 (sizeof (struct rspamd_worker_conf));
c->params = g_hash_table_new (rspamd_str_hash, rspamd_str_equal);
c->active_workers = g_queue_new ();
- rspamd_mempool_add_destructor (cfg->cfg_pool,
- (rspamd_mempool_destruct_t)g_hash_table_destroy,
- c->params);
- rspamd_mempool_add_destructor (cfg->cfg_pool,
- (rspamd_mempool_destruct_t)g_queue_free,
- c->active_workers);
#ifdef HAVE_SC_NPROCESSORS_ONLN
c->count = sysconf (_SC_NPROCESSORS_ONLN);
#else
#endif
c->rlimit_nofile = 0;
c->rlimit_maxcore = 0;
+
+ REF_INIT_RETAIN (c, rspamd_worker_conf_dtor);
+ rspamd_mempool_add_destructor (cfg->cfg_pool,
+ rspamd_worker_conf_cfg_fin, c);
}
return c;
wrk->srv = rspamd_main;
wrk->type = cf->type;
- wrk->cf = g_malloc (sizeof (struct rspamd_worker_conf));
- memcpy (wrk->cf, cf, sizeof (struct rspamd_worker_conf));
+ wrk->cf = cf;
+ REF_RETAIN (cf);
wrk->index = index;
wrk->ctx = cf->ctx;
wrk->finish_actions = g_ptr_array_new ();
{
struct log_helper_ctx *ctx = worker->ctx;
gssize r = -1;
+ gint nscripts = 0;
+ struct rspamd_worker_lua_script *tmp;
static struct rspamd_srv_command srv_cmd;
ctx->ev_base = rspamd_prepare_worker (worker,
ctx->scripts = worker->cf->scripts;
ctx->L = ctx->cfg->lua_state;
+ DL_COUNT (worker->cf->scripts, tmp, nscripts);
+ msg_info ("started log_helper worker with %d scripts", nscripts);
+
#ifdef HAVE_SOCK_SEQPACKET
r = socketpair (AF_LOCAL, SOCK_SEQPACKET, 0, ctx->pair);
#endif
for (cur = g_list_first (cfg->workers); cur != NULL; cur = g_list_next (cur)) {
cf = cur->data;
-
wtype = g_quark_to_string (cf->type);
if (g_ascii_strcasecmp (wtype, worker_type) == 0) {
if opts['train']['max_epoch'] then
max_epoch = opts['train']['max_epoch']
end
- cfg:register_worker_script("log_helper",
+ local ret = cfg:register_worker_script("log_helper",
function(score, req_score, results, cf, id, extra)
-- map (snd x) (filter (fst x == module_id) extra)
local extra_fann = map(function(e) return e[2] end,
opts['train'], extra_fann)
end
end)
+
+ if not ret then
+ rspamd_logger.errx(cfg, 'cannot find worker "log_helper"')
+ end
end)
rspamd_plugins["fann_score"] = {
log_callback = function(task)
WTERMSIG (res) == SIGKILL ? "hardly" : "softly");
event_del (&w->srv_ev);
g_ptr_array_free (w->finish_actions, TRUE);
- g_free (w->cf);
+ REF_RELEASE (w->cf);
g_free (w);
return TRUE;