diff options
-rw-r--r-- | src/controller.c | 25 | ||||
-rw-r--r-- | src/libmime/message.c | 7 | ||||
-rw-r--r-- | src/libserver/cfg_file.h | 1 | ||||
-rw-r--r-- | src/libserver/cfg_utils.c | 11 | ||||
-rw-r--r-- | src/libserver/worker_util.c | 22 | ||||
-rw-r--r-- | src/libutil/rrd.c | 70 | ||||
-rw-r--r-- | src/libutil/rrd.h | 1 | ||||
-rw-r--r-- | src/lua/lua_common.h | 10 | ||||
-rw-r--r-- | src/lua/lua_config.c | 61 | ||||
-rw-r--r-- | src/plugins/lua/dmarc.lua | 37 | ||||
-rw-r--r-- | src/ragel/newlines_strip.rl | 2 | ||||
-rw-r--r-- | src/rspamd.c | 1 | ||||
-rw-r--r-- | src/rspamd.h | 2 | ||||
-rw-r--r-- | src/worker.c | 49 |
14 files changed, 255 insertions, 44 deletions
diff --git a/src/controller.c b/src/controller.c index 99c81b95b..7280d9951 100644 --- a/src/controller.c +++ b/src/controller.c @@ -2473,13 +2473,13 @@ rspamd_controller_store_saved_stats (struct rspamd_controller_worker_ctx *ctx) fd = open (ctx->saved_stats_path, O_WRONLY|O_CREAT|O_TRUNC, 00644); if (fd == -1) { - msg_err_ctx ("cannot load controller stats from %s: %s", + msg_err_ctx ("cannot open for writing controller stats from %s: %s", ctx->saved_stats_path, strerror (errno)); return; } if (rspamd_file_lock (fd, FALSE) == -1) { - msg_err_ctx ("cannot load controller stats from %s: %s", + msg_err_ctx ("cannot lock controller stats in %s: %s", ctx->saved_stats_path, strerror (errno)); close (fd); @@ -2660,6 +2660,20 @@ init_controller_worker (struct rspamd_config *cfg) return ctx; } +static void +rspamd_controller_on_terminate (struct rspamd_worker *worker) +{ + struct rspamd_controller_worker_ctx *ctx = worker->ctx; + + rspamd_controller_store_saved_stats (ctx); + + if (ctx->rrd) { + msg_info ("closing rrd file: %s", ctx->rrd->filename); + event_del (ctx->rrd_event); + rspamd_rrd_close (ctx->rrd); + } +} + /* * Start worker process */ @@ -2697,6 +2711,8 @@ start_controller_worker (struct rspamd_worker *worker) DEFAULT_STATS_PATH); } + g_ptr_array_add (worker->finish_actions, + (gpointer)rspamd_controller_on_terminate); rspamd_controller_load_saved_stats (ctx); /* RRD collector */ @@ -2832,11 +2848,6 @@ start_controller_worker (struct rspamd_worker *worker) rspamd_stat_close (); rspamd_http_router_free (ctx->http); rspamd_log_close (worker->srv->logger); - rspamd_controller_store_saved_stats (ctx); - - if (ctx->rrd) { - rspamd_rrd_close (ctx->rrd); - } if (ctx->cached_password.len > 0) { m = (gpointer)ctx->cached_password.begin; diff --git a/src/libmime/message.c b/src/libmime/message.c index e815bdbe4..8f4417db4 100644 --- a/src/libmime/message.c +++ b/src/libmime/message.c @@ -785,6 +785,7 @@ rspamd_normalize_text_part (struct rspamd_task *task, const guchar *p, *c, *end; guint i; + goffset off; struct rspamd_process_exception *ex; /* Strip newlines */ @@ -799,8 +800,10 @@ rspamd_normalize_text_part (struct rspamd_task *task, for (i = 0; i < part->newlines->len; i ++) { ex = rspamd_mempool_alloc (task->task_pool, sizeof (*ex)); - p = g_ptr_array_index (part->newlines, i); - ex->pos = p - c; + off = (goffset)g_ptr_array_index (part->newlines, i); + g_ptr_array_index (part->newlines, i) = (gpointer)(goffset) + (part->stripped_content->data + off); + ex->pos = off; ex->len = 0; ex->type = RSPAMD_EXCEPTION_NEWLINE; part->exceptions = g_list_prepend (part->exceptions, ex); diff --git a/src/libserver/cfg_file.h b/src/libserver/cfg_file.h index f66361a41..7c47f1e4b 100644 --- a/src/libserver/cfg_file.h +++ b/src/libserver/cfg_file.h @@ -400,6 +400,7 @@ struct rspamd_config { struct worker_s **compiled_workers; /**< list of compiled C modules */ GList *dynamic_modules; /**< list of dynamic C modules */ GList *dynamic_workers; /**< list of dynamic C modules */ + struct rspamd_config_post_load_script *finish_callbacks; /**< list of callbacks called on worker's termination */ struct rspamd_log_format *log_format; /**< parsed log format */ gchar *log_format_str; /**< raw log format string */ diff --git a/src/libserver/cfg_utils.c b/src/libserver/cfg_utils.c index b0ae4a3ae..9fed83c9a 100644 --- a/src/libserver/cfg_utils.c +++ b/src/libserver/cfg_utils.c @@ -175,6 +175,7 @@ rspamd_config_free (struct rspamd_config *cfg) { struct rspamd_dynamic_module *dyn_mod; struct rspamd_dynamic_worker *dyn_wrk; + struct rspamd_config_post_load_script *sc, *sctmp; GList *cur; rspamd_map_remove_all (cfg); @@ -221,6 +222,16 @@ rspamd_config_free (struct rspamd_config *cfg) cur = g_list_next (cur); } + DL_FOREACH_SAFE (cfg->finish_callbacks, sc, sctmp) { + luaL_unref (cfg->lua_state, LUA_REGISTRYINDEX, sc->cbref); + g_slice_free1 (sizeof (*sc), sc); + } + + DL_FOREACH_SAFE (cfg->on_load, sc, sctmp) { + luaL_unref (cfg->lua_state, LUA_REGISTRYINDEX, sc->cbref); + g_slice_free1 (sizeof (*sc), sc); + } + g_list_free (cfg->classifiers); g_list_free (cfg->metrics_list); rspamd_symbols_cache_destroy (cfg->cache); diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index b7aa5035f..98da2ce9a 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -87,8 +87,17 @@ rspamd_get_worker_by_type (struct rspamd_config *cfg, GQuark type) return NULL; } -sig_atomic_t wanna_die = 0; +static void +rspamd_worker_terminate_handlers (struct rspamd_worker *w) +{ + guint i; + void (*cb)(struct rspamd_worker *); + for (i = 0; i < w->finish_actions->len; i ++) { + cb = g_ptr_array_index (w->finish_actions, i); + cb (w); + } +} /* * Config reload is designed by sending sigusr2 to active workers and pending shutdown of them */ @@ -98,10 +107,11 @@ rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg /* Do not accept new connections, preparing to end worker's process */ struct timeval tv; - if (!wanna_die) { + if (!sigh->worker->wanna_die) { tv.tv_sec = SOFT_SHUTDOWN_TIME; tv.tv_usec = 0; - wanna_die = 1; + sigh->worker->wanna_die = TRUE; + rspamd_worker_terminate_handlers (sigh->worker); rspamd_default_log_function (G_LOG_LEVEL_INFO, sigh->worker->srv->server_pool->tag.tagname, sigh->worker->srv->server_pool->tag.uid, @@ -127,14 +137,15 @@ rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg { struct timeval tv; - if (!wanna_die) { + if (!sigh->worker->wanna_die) { rspamd_default_log_function (G_LOG_LEVEL_INFO, sigh->worker->srv->server_pool->tag.tagname, sigh->worker->srv->server_pool->tag.uid, G_STRFUNC, "terminating after receiving signal %s", g_strsignal (sigh->signo)); - wanna_die = 1; + rspamd_worker_terminate_handlers (sigh->worker); + sigh->worker->wanna_die = 1; tv.tv_sec = 0; tv.tv_usec = 0; @@ -525,6 +536,7 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main, memcpy (wrk->cf, cf, sizeof (struct rspamd_worker_conf)); wrk->index = index; wrk->ctx = cf->ctx; + wrk->finish_actions = g_ptr_array_new (); wrk->pid = fork (); diff --git a/src/libutil/rrd.c b/src/libutil/rrd.c index ec061dd1a..fca4ecad2 100644 --- a/src/libutil/rrd.c +++ b/src/libutil/rrd.c @@ -350,6 +350,40 @@ rspamd_rrd_calculate_checksum (struct rspamd_rrd_file *file) } } +static int +rspamd_rrd_open_exclusive (const gchar *filename) +{ + struct timespec sleep_ts = { + .tv_sec = 0, + .tv_nsec = 1000000 + }; + gint fd; + + fd = open (filename, O_RDWR); + + if (fd == -1) { + return -1; + } + + for (;;) { + if (rspamd_file_lock (fd, TRUE) == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + nanosleep (&sleep_ts, NULL); + continue; + } + else { + close (fd); + return -1; + } + } + else { + break; + } + } + + return fd; +}; + /** * Open completed or incompleted rrd file * @param filename @@ -376,7 +410,7 @@ rspamd_rrd_open_common (const gchar *filename, gboolean completed, GError **err) } /* Open file */ - fd = open (filename, O_RDWR); + fd = rspamd_rrd_open_exclusive (filename); if (fd == -1) { g_set_error (err, rrd_error_quark (), errno, "rrd open error: %s", strerror (errno)); @@ -386,14 +420,17 @@ rspamd_rrd_open_common (const gchar *filename, gboolean completed, GError **err) if (fstat (fd, &st) == -1) { g_set_error (err, rrd_error_quark (), errno, "rrd stat error: %s", strerror (errno)); + rspamd_file_unlock (fd, FALSE); close (fd); return FALSE; } /* Mmap file */ file->size = st.st_size; if ((file->map = - mmap (NULL, st.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, - 0)) == MAP_FAILED) { + mmap (NULL, st.st_size, PROT_READ | PROT_WRITE, + MAP_SHARED, fd, 0)) == MAP_FAILED) { + + rspamd_file_unlock (fd, FALSE); close (fd); g_set_error (err, rrd_error_quark (), ENOMEM, "mmap failed: %s", strerror (errno)); @@ -401,7 +438,7 @@ rspamd_rrd_open_common (const gchar *filename, gboolean completed, GError **err) return NULL; } - close (fd); + file->fd = fd; /* Adjust pointers */ rspamd_rrd_adjust_pointers (file, completed); @@ -462,7 +499,7 @@ rspamd_rrd_create (const gchar *filename, guint i, j; /* Open file */ - fd = open (filename, O_RDWR | O_CREAT | O_TRUNC, 0644); + fd = open (filename, O_RDWR | O_CREAT | O_EXCL, 0644); if (fd == -1) { g_set_error (err, rrd_error_quark (), errno, "rrd create error: %s", @@ -470,6 +507,8 @@ rspamd_rrd_create (const gchar *filename, return NULL; } + rspamd_file_lock (fd, FALSE); + /* Fill header */ memset (&head, 0, sizeof (head)); head.rra_cnt = rra_count; @@ -480,6 +519,7 @@ rspamd_rrd_create (const gchar *filename, head.float_cookie = RRD_FLOAT_COOKIE; if (write (fd, &head, sizeof (head)) != sizeof (head)) { + rspamd_file_unlock (fd, FALSE); close (fd); g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno)); @@ -493,6 +533,7 @@ rspamd_rrd_create (const gchar *filename, memset (&ds.par, 0, sizeof (ds.par)); for (i = 0; i < ds_count; i++) { if (write (fd, &ds, sizeof (ds)) != sizeof (ds)) { + rspamd_file_unlock (fd, FALSE); close (fd); g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", @@ -508,6 +549,7 @@ rspamd_rrd_create (const gchar *filename, memset (&rra.par, 0, sizeof (rra.par)); for (i = 0; i < rra_count; i++) { if (write (fd, &rra, sizeof (rra)) != sizeof (rra)) { + rspamd_file_unlock (fd, FALSE); close (fd); g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", @@ -522,6 +564,7 @@ rspamd_rrd_create (const gchar *filename, lh.last_up_usec = (glong)((initial_ticks - lh.last_up) * 1e6f); if (write (fd, &lh, sizeof (lh)) != sizeof (lh)) { + rspamd_file_unlock (fd, FALSE); close (fd); g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno)); @@ -537,6 +580,7 @@ rspamd_rrd_create (const gchar *filename, for (i = 0; i < ds_count; i++) { if (write (fd, &pdp, sizeof (pdp)) != sizeof (pdp)) { + rspamd_file_unlock (fd, FALSE); close (fd); g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", @@ -554,6 +598,7 @@ rspamd_rrd_create (const gchar *filename, for (i = 0; i < rra_count; i++) { for (j = 0; j < ds_count; j++) { if (write (fd, &cdp, sizeof (cdp)) != sizeof (cdp)) { + rspamd_file_unlock (fd, FALSE); close (fd); g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", @@ -567,6 +612,7 @@ rspamd_rrd_create (const gchar *filename, memset (&rra_ptr, 0, sizeof (rra_ptr)); for (i = 0; i < rra_count; i++) { if (write (fd, &rra_ptr, sizeof (rra_ptr)) != sizeof (rra_ptr)) { + rspamd_file_unlock (fd, FALSE); close (fd); g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", @@ -575,7 +621,9 @@ rspamd_rrd_create (const gchar *filename, } } + rspamd_file_unlock (fd, FALSE); close (fd); + new = rspamd_rrd_open_common (filename, FALSE, err); return new; @@ -643,18 +691,13 @@ rspamd_rrd_finalize (struct rspamd_rrd_file *file, GError **err) gdouble vbuf[1024]; struct stat st; - if (file == NULL || file->filename == NULL) { + if (file == NULL || file->filename == NULL || file->fd == -1) { g_set_error (err, rrd_error_quark (), EINVAL, "rrd add rra failed: wrong arguments"); return FALSE; } - fd = open (file->filename, O_RDWR); - if (fd == -1) { - g_set_error (err, - rrd_error_quark (), errno, "rrd open error: %s", strerror (errno)); - return FALSE; - } + fd = file->fd; if (lseek (fd, 0, SEEK_END) == -1) { g_set_error (err, @@ -710,7 +753,7 @@ rspamd_rrd_finalize (struct rspamd_rrd_file *file, GError **err) g_slice_free1 (sizeof (struct rspamd_rrd_file), file); return FALSE; } - close (fd); + /* Adjust pointers */ rspamd_rrd_adjust_pointers (file, TRUE); @@ -1214,6 +1257,7 @@ rspamd_rrd_close (struct rspamd_rrd_file * file) } munmap (file->map, file->size); + close (file->fd); g_free (file->filename); g_free (file->id); diff --git a/src/libutil/rrd.h b/src/libutil/rrd.h index c7bc8008a..d24249965 100644 --- a/src/libutil/rrd.h +++ b/src/libutil/rrd.h @@ -204,6 +204,7 @@ struct rspamd_rrd_file { gsize size; /* its size */ gboolean finalized; gchar *id; + gint fd; }; diff --git a/src/lua/lua_common.h b/src/lua/lua_common.h index 2ec235485..1fefdc2cb 100644 --- a/src/lua/lua_common.h +++ b/src/lua/lua_common.h @@ -354,5 +354,15 @@ gsize lua_logger_out_type (lua_State *L, gint pos, gchar *outbuf, */ void *rspamd_lua_check_udata (lua_State *L, gint pos, const gchar *classname); +/** + * Call finishing script with the specified task + * @param L + * @param sc + * @param task + */ +void lua_call_finish_script (lua_State *L, struct + rspamd_config_post_load_script *sc, + struct rspamd_task *task); + #endif /* WITH_LUA */ #endif /* RSPAMD_LUA_H */ diff --git a/src/lua/lua_config.c b/src/lua/lua_config.c index ddd5d9146..1bb8ecc77 100644 --- a/src/lua/lua_config.c +++ b/src/lua/lua_config.c @@ -442,6 +442,15 @@ LUA_FUNCTION_DEF (config, get_symbol_callback); */ LUA_FUNCTION_DEF (config, set_symbol_callback); +/*** + * @method register_finish_script(callback) + * Adds new callback that is called on worker process termination when all + * tasks pending are processed + * + * @param callback {function} a fucntion with one argument (rspamd_task) + */ +LUA_FUNCTION_DEF (config, register_finish_script); + static const struct luaL_reg configlib_m[] = { LUA_INTERFACE_DEF (config, get_module_opt), LUA_INTERFACE_DEF (config, get_mempool), @@ -474,6 +483,7 @@ static const struct luaL_reg configlib_m[] = { LUA_INTERFACE_DEF (config, get_symbols_count), LUA_INTERFACE_DEF (config, get_symbol_callback), LUA_INTERFACE_DEF (config, set_symbol_callback), + LUA_INTERFACE_DEF (config, register_finish_script), {"__tostring", rspamd_lua_class_tostring}, {"__newindex", lua_config_newindex}, {NULL, NULL} @@ -1766,7 +1776,7 @@ lua_config_add_on_load (lua_State *L) return luaL_error (L, "invalid arguments"); } - sc = rspamd_mempool_alloc0 (cfg->cfg_pool, sizeof (*sc)); + sc = g_slice_alloc0 (sizeof (*sc)); lua_pushvalue (L, 2); sc->cbref = luaL_ref (L, LUA_REGISTRYINDEX); DL_APPEND (cfg->on_load, sc); @@ -1860,6 +1870,26 @@ lua_config_set_symbol_callback (lua_State *L) return 1; } +static gint +lua_config_register_finish_script (lua_State *L) +{ + struct rspamd_config *cfg = lua_check_config (L, 1); + struct rspamd_config_post_load_script *sc; + + if (cfg != NULL && lua_type (L, 2) == LUA_TFUNCTION) { + sc = g_slice_alloc0 (sizeof (*sc)); + lua_pushvalue (L, 2); + sc->cbref = luaL_ref (L, LUA_REGISTRYINDEX); + DL_APPEND (cfg->finish_callbacks, sc); + } + else { + return luaL_error (L, "invalid arguments"); + } + + return 0; +} + + void luaopen_config (lua_State * L) { @@ -1867,3 +1897,32 @@ luaopen_config (lua_State * L) lua_pop (L, 1); } + +void +lua_call_finish_script (lua_State *L, struct rspamd_config_post_load_script *sc, + struct rspamd_task *task) +{ + struct rspamd_task **ptask; + gint err_idx; + GString *tb; + + lua_pushcfunction (L, &rspamd_lua_traceback); + err_idx = lua_gettop (L); + + lua_rawgeti (L, LUA_REGISTRYINDEX, sc->cbref); + + ptask = lua_newuserdata (L, sizeof (struct rspamd_task *)); + rspamd_lua_setclass (L, "rspamd{task}", -1); + *ptask = task; + + if (lua_pcall (L, 1, 0, err_idx) != 0) { + tb = lua_touserdata (L, -1); + msg_err_task ("call to finishing script failed: %v", tb); + g_string_free (tb, TRUE); + lua_pop (L, 1); + } + + lua_pop (L, 1); /* Error function */ + + return; +} diff --git a/src/plugins/lua/dmarc.lua b/src/plugins/lua/dmarc.lua index d9961956f..d9c96459a 100644 --- a/src/plugins/lua/dmarc.lua +++ b/src/plugins/lua/dmarc.lua @@ -28,9 +28,11 @@ local symbols = { spf_deny_symbol = 'R_SPF_FAIL', spf_softfail_symbol = 'R_SPF_SOFTFAIL', spf_neutral_symbol = 'R_SPF_NEUTRAL', + spf_tempfail_symbol = 'R_SPF_DNSFAIL', dkim_allow_symbol = 'R_DKIM_ALLOW', dkim_deny_symbol = 'R_DKIM_REJECT', + dkim_tempfail_symbol = 'R_DKIM_TEMPFAIL', } -- Default port for redis upstreams local redis_params = nil @@ -80,6 +82,10 @@ local function dmarc_callback(task) local function dmarc_dns_cb(resolver, to_resolve, results, err, key) local lookup_domain = string.sub(to_resolve, 8) + if err and err ~= 'requested record is not found' then + task:insert_result('DMARC_DNSFAIL', 1.0, lookup_domain .. ' : ' .. err) + return + end if not results then if lookup_domain ~= dmarc_domain then local resolve_name = '_dmarc.' .. dmarc_domain @@ -237,24 +243,31 @@ local function dmarc_callback(task) disposition = "none" if not (spf_ok or dkim_ok) then res = 1.0 - if quarantine_policy then - if not pct or pct == 100 or (math.random(100) <= pct) then - task:insert_result('DMARC_POLICY_QUARANTINE', res, lookup_domain) - disposition = "quarantine" - end - elseif strict_policy then - if not pct or pct == 100 or (math.random(100) <= pct) then - task:insert_result('DMARC_POLICY_REJECT', res, lookup_domain) - disposition = "reject" - end + local spf_tmpfail = task:get_symbol(symbols['spf_tempfail_symbol']) + local dkim_tmpfail = task:get_symbol(symbols['dkim_tempfail_symbol']) + if (spf_tmpfail or dkim_tmpfail) then + task:insert_result('DMARC_DNSFAIL', 1.0, lookup_domain .. ' : ' .. 'SPF/DKIM temp error') + disposition = 'failed' else - task:insert_result('DMARC_POLICY_SOFTFAIL', res, lookup_domain) + if quarantine_policy then + if not pct or pct == 100 or (math.random(100) <= pct) then + task:insert_result('DMARC_POLICY_QUARANTINE', res, lookup_domain) + disposition = "quarantine" + end + elseif strict_policy then + if not pct or pct == 100 or (math.random(100) <= pct) then + task:insert_result('DMARC_POLICY_REJECT', res, lookup_domain) + disposition = "reject" + end + else + task:insert_result('DMARC_POLICY_SOFTFAIL', res, lookup_domain) + end end else task:insert_result('DMARC_POLICY_ALLOW', res, lookup_domain) end - if rua and redis_params and dmarc_reporting then + if rua and redis_params and dmarc_reporting and not (disposition == 'failed') then -- Prepare and send redis report element local redis_key = dmarc_redis_key_prefix .. from[1]['domain'] local report_data = dmarc_report(task, spf_ok, dkim_ok, disposition) diff --git a/src/ragel/newlines_strip.rl b/src/ragel/newlines_strip.rl index a0e239bc3..d5de198a5 100644 --- a/src/ragel/newlines_strip.rl +++ b/src/ragel/newlines_strip.rl @@ -40,7 +40,7 @@ } (*newlines_count)++; - g_ptr_array_add (newlines, (gpointer)p); + g_ptr_array_add (newlines, (((gpointer) (goffset) (data->len)))); c = p; } diff --git a/src/rspamd.c b/src/rspamd.c index 056de6e12..3ddee3ff4 100644 --- a/src/rspamd.c +++ b/src/rspamd.c @@ -692,6 +692,7 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused) g_quark_to_string (w->type), w->pid, WTERMSIG (res) == SIGKILL ? "hardly" : "softly"); event_del (&w->srv_ev); + g_ptr_array_free (w->finish_actions, TRUE); g_free (w->cf); g_free (w); diff --git a/src/rspamd.h b/src/rspamd.h index ffebfe387..e99e4c18d 100644 --- a/src/rspamd.h +++ b/src/rspamd.h @@ -49,6 +49,7 @@ struct rspamd_worker { pid_t pid; /**< pid of worker */ guint index; /**< index number */ guint nconns; /**< current connections count */ + gboolean wanna_die; /**< worker is terminating */ gdouble start_time; /**< start time */ struct rspamd_main *srv; /**< pointer to server structure */ GQuark type; /**< process type */ @@ -62,6 +63,7 @@ struct rspamd_worker { main process. [0] - main, [1] - worker */ struct event srv_ev; /**< used by main for read workers' requests */ gpointer control_data; /**< used by control protocol to handle commands */ + GPtrArray *finish_actions; /**< called when worker is terminated */ }; struct rspamd_abstract_worker_ctx { diff --git a/src/worker.c b/src/worker.c index 60c39b2af..097f7c2e1 100644 --- a/src/worker.c +++ b/src/worker.c @@ -72,15 +72,47 @@ worker_t normal_worker = { G_STRFUNC, \ __VA_ARGS__) +static void +rspamd_worker_call_finish_handlers (struct rspamd_worker *worker) +{ + struct rspamd_task *task; + struct rspamd_config *cfg = worker->srv->cfg; + struct rspamd_worker_ctx *ctx; + struct rspamd_config_post_load_script *sc; + + if (cfg->finish_callbacks) { + ctx = worker->ctx; + /* Create a fake task object for async events */ + task = rspamd_task_new (worker, cfg); + task->resolver = ctx->resolver; + task->ev_base = ctx->ev_base; + task->s = rspamd_session_create (task->task_pool, + NULL, + NULL, + (event_finalizer_t) rspamd_task_free, + task); + + DL_FOREACH (cfg->finish_callbacks, sc) { + lua_call_finish_script (cfg->lua_state, sc, task); + } + } + +} + /* * Reduce number of tasks proceeded */ static void reduce_tasks_count (gpointer arg) { - guint *nconns = arg; + struct rspamd_worker *worker = arg; - (*nconns)--; + worker->nconns --; + + if (worker->wanna_die && worker->nconns == 0) { + msg_info ("performing finishing actions"); + rspamd_worker_call_finish_handlers (worker); + } } static void @@ -344,7 +376,7 @@ accept_socket (gint fd, short what, void *arg) task->ev_base = ctx->ev_base; worker->nconns++; rspamd_mempool_add_destructor (task->task_pool, - (rspamd_mempool_destruct_t)reduce_tasks_count, &worker->nconns); + (rspamd_mempool_destruct_t)reduce_tasks_count, worker); /* Set up async session */ task->s = rspamd_session_create (task->task_pool, rspamd_task_fin, @@ -525,6 +557,15 @@ init_worker (struct rspamd_config *cfg) return ctx; } +static void +rspamd_worker_on_terminate (struct rspamd_worker *worker) +{ + if (worker->nconns == 0) { + msg_info ("performing finishing actions"); + rspamd_worker_call_finish_handlers (worker); + } +} + /* * Start worker process */ @@ -549,6 +590,8 @@ start_worker (struct rspamd_worker *worker) /* XXX: stupid default */ ctx->keys_cache = rspamd_keypair_cache_new (256); rspamd_stat_init (worker->srv->cfg, ctx->ev_base); + g_ptr_array_add (worker->finish_actions, + (gpointer) rspamd_worker_on_terminate); #ifdef WITH_HYPERSCAN rspamd_control_worker_add_cmd_handler (worker, |