aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/controller.c25
-rw-r--r--src/libmime/message.c7
-rw-r--r--src/libserver/cfg_file.h1
-rw-r--r--src/libserver/cfg_utils.c11
-rw-r--r--src/libserver/worker_util.c22
-rw-r--r--src/libutil/rrd.c70
-rw-r--r--src/libutil/rrd.h1
-rw-r--r--src/lua/lua_common.h10
-rw-r--r--src/lua/lua_config.c61
-rw-r--r--src/plugins/lua/dmarc.lua37
-rw-r--r--src/ragel/newlines_strip.rl2
-rw-r--r--src/rspamd.c1
-rw-r--r--src/rspamd.h2
-rw-r--r--src/worker.c49
14 files changed, 255 insertions, 44 deletions
diff --git a/src/controller.c b/src/controller.c
index 99c81b95b..7280d9951 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -2473,13 +2473,13 @@ rspamd_controller_store_saved_stats (struct rspamd_controller_worker_ctx *ctx)
fd = open (ctx->saved_stats_path, O_WRONLY|O_CREAT|O_TRUNC, 00644);
if (fd == -1) {
- msg_err_ctx ("cannot load controller stats from %s: %s",
+ msg_err_ctx ("cannot open for writing controller stats from %s: %s",
ctx->saved_stats_path, strerror (errno));
return;
}
if (rspamd_file_lock (fd, FALSE) == -1) {
- msg_err_ctx ("cannot load controller stats from %s: %s",
+ msg_err_ctx ("cannot lock controller stats in %s: %s",
ctx->saved_stats_path, strerror (errno));
close (fd);
@@ -2660,6 +2660,20 @@ init_controller_worker (struct rspamd_config *cfg)
return ctx;
}
+static void
+rspamd_controller_on_terminate (struct rspamd_worker *worker)
+{
+ struct rspamd_controller_worker_ctx *ctx = worker->ctx;
+
+ rspamd_controller_store_saved_stats (ctx);
+
+ if (ctx->rrd) {
+ msg_info ("closing rrd file: %s", ctx->rrd->filename);
+ event_del (ctx->rrd_event);
+ rspamd_rrd_close (ctx->rrd);
+ }
+}
+
/*
* Start worker process
*/
@@ -2697,6 +2711,8 @@ start_controller_worker (struct rspamd_worker *worker)
DEFAULT_STATS_PATH);
}
+ g_ptr_array_add (worker->finish_actions,
+ (gpointer)rspamd_controller_on_terminate);
rspamd_controller_load_saved_stats (ctx);
/* RRD collector */
@@ -2832,11 +2848,6 @@ start_controller_worker (struct rspamd_worker *worker)
rspamd_stat_close ();
rspamd_http_router_free (ctx->http);
rspamd_log_close (worker->srv->logger);
- rspamd_controller_store_saved_stats (ctx);
-
- if (ctx->rrd) {
- rspamd_rrd_close (ctx->rrd);
- }
if (ctx->cached_password.len > 0) {
m = (gpointer)ctx->cached_password.begin;
diff --git a/src/libmime/message.c b/src/libmime/message.c
index e815bdbe4..8f4417db4 100644
--- a/src/libmime/message.c
+++ b/src/libmime/message.c
@@ -785,6 +785,7 @@ rspamd_normalize_text_part (struct rspamd_task *task,
const guchar *p, *c, *end;
guint i;
+ goffset off;
struct rspamd_process_exception *ex;
/* Strip newlines */
@@ -799,8 +800,10 @@ rspamd_normalize_text_part (struct rspamd_task *task,
for (i = 0; i < part->newlines->len; i ++) {
ex = rspamd_mempool_alloc (task->task_pool, sizeof (*ex));
- p = g_ptr_array_index (part->newlines, i);
- ex->pos = p - c;
+ off = (goffset)g_ptr_array_index (part->newlines, i);
+ g_ptr_array_index (part->newlines, i) = (gpointer)(goffset)
+ (part->stripped_content->data + off);
+ ex->pos = off;
ex->len = 0;
ex->type = RSPAMD_EXCEPTION_NEWLINE;
part->exceptions = g_list_prepend (part->exceptions, ex);
diff --git a/src/libserver/cfg_file.h b/src/libserver/cfg_file.h
index f66361a41..7c47f1e4b 100644
--- a/src/libserver/cfg_file.h
+++ b/src/libserver/cfg_file.h
@@ -400,6 +400,7 @@ struct rspamd_config {
struct worker_s **compiled_workers; /**< list of compiled C modules */
GList *dynamic_modules; /**< list of dynamic C modules */
GList *dynamic_workers; /**< list of dynamic C modules */
+ struct rspamd_config_post_load_script *finish_callbacks; /**< list of callbacks called on worker's termination */
struct rspamd_log_format *log_format; /**< parsed log format */
gchar *log_format_str; /**< raw log format string */
diff --git a/src/libserver/cfg_utils.c b/src/libserver/cfg_utils.c
index b0ae4a3ae..9fed83c9a 100644
--- a/src/libserver/cfg_utils.c
+++ b/src/libserver/cfg_utils.c
@@ -175,6 +175,7 @@ rspamd_config_free (struct rspamd_config *cfg)
{
struct rspamd_dynamic_module *dyn_mod;
struct rspamd_dynamic_worker *dyn_wrk;
+ struct rspamd_config_post_load_script *sc, *sctmp;
GList *cur;
rspamd_map_remove_all (cfg);
@@ -221,6 +222,16 @@ rspamd_config_free (struct rspamd_config *cfg)
cur = g_list_next (cur);
}
+ DL_FOREACH_SAFE (cfg->finish_callbacks, sc, sctmp) {
+ luaL_unref (cfg->lua_state, LUA_REGISTRYINDEX, sc->cbref);
+ g_slice_free1 (sizeof (*sc), sc);
+ }
+
+ DL_FOREACH_SAFE (cfg->on_load, sc, sctmp) {
+ luaL_unref (cfg->lua_state, LUA_REGISTRYINDEX, sc->cbref);
+ g_slice_free1 (sizeof (*sc), sc);
+ }
+
g_list_free (cfg->classifiers);
g_list_free (cfg->metrics_list);
rspamd_symbols_cache_destroy (cfg->cache);
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index b7aa5035f..98da2ce9a 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -87,8 +87,17 @@ rspamd_get_worker_by_type (struct rspamd_config *cfg, GQuark type)
return NULL;
}
-sig_atomic_t wanna_die = 0;
+static void
+rspamd_worker_terminate_handlers (struct rspamd_worker *w)
+{
+ guint i;
+ void (*cb)(struct rspamd_worker *);
+ for (i = 0; i < w->finish_actions->len; i ++) {
+ cb = g_ptr_array_index (w->finish_actions, i);
+ cb (w);
+ }
+}
/*
* Config reload is designed by sending sigusr2 to active workers and pending shutdown of them
*/
@@ -98,10 +107,11 @@ rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg
/* Do not accept new connections, preparing to end worker's process */
struct timeval tv;
- if (!wanna_die) {
+ if (!sigh->worker->wanna_die) {
tv.tv_sec = SOFT_SHUTDOWN_TIME;
tv.tv_usec = 0;
- wanna_die = 1;
+ sigh->worker->wanna_die = TRUE;
+ rspamd_worker_terminate_handlers (sigh->worker);
rspamd_default_log_function (G_LOG_LEVEL_INFO,
sigh->worker->srv->server_pool->tag.tagname,
sigh->worker->srv->server_pool->tag.uid,
@@ -127,14 +137,15 @@ rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg
{
struct timeval tv;
- if (!wanna_die) {
+ if (!sigh->worker->wanna_die) {
rspamd_default_log_function (G_LOG_LEVEL_INFO,
sigh->worker->srv->server_pool->tag.tagname,
sigh->worker->srv->server_pool->tag.uid,
G_STRFUNC,
"terminating after receiving signal %s",
g_strsignal (sigh->signo));
- wanna_die = 1;
+ rspamd_worker_terminate_handlers (sigh->worker);
+ sigh->worker->wanna_die = 1;
tv.tv_sec = 0;
tv.tv_usec = 0;
@@ -525,6 +536,7 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
memcpy (wrk->cf, cf, sizeof (struct rspamd_worker_conf));
wrk->index = index;
wrk->ctx = cf->ctx;
+ wrk->finish_actions = g_ptr_array_new ();
wrk->pid = fork ();
diff --git a/src/libutil/rrd.c b/src/libutil/rrd.c
index ec061dd1a..fca4ecad2 100644
--- a/src/libutil/rrd.c
+++ b/src/libutil/rrd.c
@@ -350,6 +350,40 @@ rspamd_rrd_calculate_checksum (struct rspamd_rrd_file *file)
}
}
+static int
+rspamd_rrd_open_exclusive (const gchar *filename)
+{
+ struct timespec sleep_ts = {
+ .tv_sec = 0,
+ .tv_nsec = 1000000
+ };
+ gint fd;
+
+ fd = open (filename, O_RDWR);
+
+ if (fd == -1) {
+ return -1;
+ }
+
+ for (;;) {
+ if (rspamd_file_lock (fd, TRUE) == -1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ nanosleep (&sleep_ts, NULL);
+ continue;
+ }
+ else {
+ close (fd);
+ return -1;
+ }
+ }
+ else {
+ break;
+ }
+ }
+
+ return fd;
+};
+
/**
* Open completed or incompleted rrd file
* @param filename
@@ -376,7 +410,7 @@ rspamd_rrd_open_common (const gchar *filename, gboolean completed, GError **err)
}
/* Open file */
- fd = open (filename, O_RDWR);
+ fd = rspamd_rrd_open_exclusive (filename);
if (fd == -1) {
g_set_error (err,
rrd_error_quark (), errno, "rrd open error: %s", strerror (errno));
@@ -386,14 +420,17 @@ rspamd_rrd_open_common (const gchar *filename, gboolean completed, GError **err)
if (fstat (fd, &st) == -1) {
g_set_error (err,
rrd_error_quark (), errno, "rrd stat error: %s", strerror (errno));
+ rspamd_file_unlock (fd, FALSE);
close (fd);
return FALSE;
}
/* Mmap file */
file->size = st.st_size;
if ((file->map =
- mmap (NULL, st.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd,
- 0)) == MAP_FAILED) {
+ mmap (NULL, st.st_size, PROT_READ | PROT_WRITE,
+ MAP_SHARED, fd, 0)) == MAP_FAILED) {
+
+ rspamd_file_unlock (fd, FALSE);
close (fd);
g_set_error (err,
rrd_error_quark (), ENOMEM, "mmap failed: %s", strerror (errno));
@@ -401,7 +438,7 @@ rspamd_rrd_open_common (const gchar *filename, gboolean completed, GError **err)
return NULL;
}
- close (fd);
+ file->fd = fd;
/* Adjust pointers */
rspamd_rrd_adjust_pointers (file, completed);
@@ -462,7 +499,7 @@ rspamd_rrd_create (const gchar *filename,
guint i, j;
/* Open file */
- fd = open (filename, O_RDWR | O_CREAT | O_TRUNC, 0644);
+ fd = open (filename, O_RDWR | O_CREAT | O_EXCL, 0644);
if (fd == -1) {
g_set_error (err,
rrd_error_quark (), errno, "rrd create error: %s",
@@ -470,6 +507,8 @@ rspamd_rrd_create (const gchar *filename,
return NULL;
}
+ rspamd_file_lock (fd, FALSE);
+
/* Fill header */
memset (&head, 0, sizeof (head));
head.rra_cnt = rra_count;
@@ -480,6 +519,7 @@ rspamd_rrd_create (const gchar *filename,
head.float_cookie = RRD_FLOAT_COOKIE;
if (write (fd, &head, sizeof (head)) != sizeof (head)) {
+ rspamd_file_unlock (fd, FALSE);
close (fd);
g_set_error (err,
rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
@@ -493,6 +533,7 @@ rspamd_rrd_create (const gchar *filename,
memset (&ds.par, 0, sizeof (ds.par));
for (i = 0; i < ds_count; i++) {
if (write (fd, &ds, sizeof (ds)) != sizeof (ds)) {
+ rspamd_file_unlock (fd, FALSE);
close (fd);
g_set_error (err,
rrd_error_quark (), errno, "rrd write error: %s",
@@ -508,6 +549,7 @@ rspamd_rrd_create (const gchar *filename,
memset (&rra.par, 0, sizeof (rra.par));
for (i = 0; i < rra_count; i++) {
if (write (fd, &rra, sizeof (rra)) != sizeof (rra)) {
+ rspamd_file_unlock (fd, FALSE);
close (fd);
g_set_error (err,
rrd_error_quark (), errno, "rrd write error: %s",
@@ -522,6 +564,7 @@ rspamd_rrd_create (const gchar *filename,
lh.last_up_usec = (glong)((initial_ticks - lh.last_up) * 1e6f);
if (write (fd, &lh, sizeof (lh)) != sizeof (lh)) {
+ rspamd_file_unlock (fd, FALSE);
close (fd);
g_set_error (err,
rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
@@ -537,6 +580,7 @@ rspamd_rrd_create (const gchar *filename,
for (i = 0; i < ds_count; i++) {
if (write (fd, &pdp, sizeof (pdp)) != sizeof (pdp)) {
+ rspamd_file_unlock (fd, FALSE);
close (fd);
g_set_error (err,
rrd_error_quark (), errno, "rrd write error: %s",
@@ -554,6 +598,7 @@ rspamd_rrd_create (const gchar *filename,
for (i = 0; i < rra_count; i++) {
for (j = 0; j < ds_count; j++) {
if (write (fd, &cdp, sizeof (cdp)) != sizeof (cdp)) {
+ rspamd_file_unlock (fd, FALSE);
close (fd);
g_set_error (err,
rrd_error_quark (), errno, "rrd write error: %s",
@@ -567,6 +612,7 @@ rspamd_rrd_create (const gchar *filename,
memset (&rra_ptr, 0, sizeof (rra_ptr));
for (i = 0; i < rra_count; i++) {
if (write (fd, &rra_ptr, sizeof (rra_ptr)) != sizeof (rra_ptr)) {
+ rspamd_file_unlock (fd, FALSE);
close (fd);
g_set_error (err,
rrd_error_quark (), errno, "rrd write error: %s",
@@ -575,7 +621,9 @@ rspamd_rrd_create (const gchar *filename,
}
}
+ rspamd_file_unlock (fd, FALSE);
close (fd);
+
new = rspamd_rrd_open_common (filename, FALSE, err);
return new;
@@ -643,18 +691,13 @@ rspamd_rrd_finalize (struct rspamd_rrd_file *file, GError **err)
gdouble vbuf[1024];
struct stat st;
- if (file == NULL || file->filename == NULL) {
+ if (file == NULL || file->filename == NULL || file->fd == -1) {
g_set_error (err,
rrd_error_quark (), EINVAL, "rrd add rra failed: wrong arguments");
return FALSE;
}
- fd = open (file->filename, O_RDWR);
- if (fd == -1) {
- g_set_error (err,
- rrd_error_quark (), errno, "rrd open error: %s", strerror (errno));
- return FALSE;
- }
+ fd = file->fd;
if (lseek (fd, 0, SEEK_END) == -1) {
g_set_error (err,
@@ -710,7 +753,7 @@ rspamd_rrd_finalize (struct rspamd_rrd_file *file, GError **err)
g_slice_free1 (sizeof (struct rspamd_rrd_file), file);
return FALSE;
}
- close (fd);
+
/* Adjust pointers */
rspamd_rrd_adjust_pointers (file, TRUE);
@@ -1214,6 +1257,7 @@ rspamd_rrd_close (struct rspamd_rrd_file * file)
}
munmap (file->map, file->size);
+ close (file->fd);
g_free (file->filename);
g_free (file->id);
diff --git a/src/libutil/rrd.h b/src/libutil/rrd.h
index c7bc8008a..d24249965 100644
--- a/src/libutil/rrd.h
+++ b/src/libutil/rrd.h
@@ -204,6 +204,7 @@ struct rspamd_rrd_file {
gsize size; /* its size */
gboolean finalized;
gchar *id;
+ gint fd;
};
diff --git a/src/lua/lua_common.h b/src/lua/lua_common.h
index 2ec235485..1fefdc2cb 100644
--- a/src/lua/lua_common.h
+++ b/src/lua/lua_common.h
@@ -354,5 +354,15 @@ gsize lua_logger_out_type (lua_State *L, gint pos, gchar *outbuf,
*/
void *rspamd_lua_check_udata (lua_State *L, gint pos, const gchar *classname);
+/**
+ * Call finishing script with the specified task
+ * @param L
+ * @param sc
+ * @param task
+ */
+void lua_call_finish_script (lua_State *L, struct
+ rspamd_config_post_load_script *sc,
+ struct rspamd_task *task);
+
#endif /* WITH_LUA */
#endif /* RSPAMD_LUA_H */
diff --git a/src/lua/lua_config.c b/src/lua/lua_config.c
index ddd5d9146..1bb8ecc77 100644
--- a/src/lua/lua_config.c
+++ b/src/lua/lua_config.c
@@ -442,6 +442,15 @@ LUA_FUNCTION_DEF (config, get_symbol_callback);
*/
LUA_FUNCTION_DEF (config, set_symbol_callback);
+/***
+ * @method register_finish_script(callback)
+ * Adds new callback that is called on worker process termination when all
+ * tasks pending are processed
+ *
+ * @param callback {function} a fucntion with one argument (rspamd_task)
+ */
+LUA_FUNCTION_DEF (config, register_finish_script);
+
static const struct luaL_reg configlib_m[] = {
LUA_INTERFACE_DEF (config, get_module_opt),
LUA_INTERFACE_DEF (config, get_mempool),
@@ -474,6 +483,7 @@ static const struct luaL_reg configlib_m[] = {
LUA_INTERFACE_DEF (config, get_symbols_count),
LUA_INTERFACE_DEF (config, get_symbol_callback),
LUA_INTERFACE_DEF (config, set_symbol_callback),
+ LUA_INTERFACE_DEF (config, register_finish_script),
{"__tostring", rspamd_lua_class_tostring},
{"__newindex", lua_config_newindex},
{NULL, NULL}
@@ -1766,7 +1776,7 @@ lua_config_add_on_load (lua_State *L)
return luaL_error (L, "invalid arguments");
}
- sc = rspamd_mempool_alloc0 (cfg->cfg_pool, sizeof (*sc));
+ sc = g_slice_alloc0 (sizeof (*sc));
lua_pushvalue (L, 2);
sc->cbref = luaL_ref (L, LUA_REGISTRYINDEX);
DL_APPEND (cfg->on_load, sc);
@@ -1860,6 +1870,26 @@ lua_config_set_symbol_callback (lua_State *L)
return 1;
}
+static gint
+lua_config_register_finish_script (lua_State *L)
+{
+ struct rspamd_config *cfg = lua_check_config (L, 1);
+ struct rspamd_config_post_load_script *sc;
+
+ if (cfg != NULL && lua_type (L, 2) == LUA_TFUNCTION) {
+ sc = g_slice_alloc0 (sizeof (*sc));
+ lua_pushvalue (L, 2);
+ sc->cbref = luaL_ref (L, LUA_REGISTRYINDEX);
+ DL_APPEND (cfg->finish_callbacks, sc);
+ }
+ else {
+ return luaL_error (L, "invalid arguments");
+ }
+
+ return 0;
+}
+
+
void
luaopen_config (lua_State * L)
{
@@ -1867,3 +1897,32 @@ luaopen_config (lua_State * L)
lua_pop (L, 1);
}
+
+void
+lua_call_finish_script (lua_State *L, struct rspamd_config_post_load_script *sc,
+ struct rspamd_task *task)
+{
+ struct rspamd_task **ptask;
+ gint err_idx;
+ GString *tb;
+
+ lua_pushcfunction (L, &rspamd_lua_traceback);
+ err_idx = lua_gettop (L);
+
+ lua_rawgeti (L, LUA_REGISTRYINDEX, sc->cbref);
+
+ ptask = lua_newuserdata (L, sizeof (struct rspamd_task *));
+ rspamd_lua_setclass (L, "rspamd{task}", -1);
+ *ptask = task;
+
+ if (lua_pcall (L, 1, 0, err_idx) != 0) {
+ tb = lua_touserdata (L, -1);
+ msg_err_task ("call to finishing script failed: %v", tb);
+ g_string_free (tb, TRUE);
+ lua_pop (L, 1);
+ }
+
+ lua_pop (L, 1); /* Error function */
+
+ return;
+}
diff --git a/src/plugins/lua/dmarc.lua b/src/plugins/lua/dmarc.lua
index d9961956f..d9c96459a 100644
--- a/src/plugins/lua/dmarc.lua
+++ b/src/plugins/lua/dmarc.lua
@@ -28,9 +28,11 @@ local symbols = {
spf_deny_symbol = 'R_SPF_FAIL',
spf_softfail_symbol = 'R_SPF_SOFTFAIL',
spf_neutral_symbol = 'R_SPF_NEUTRAL',
+ spf_tempfail_symbol = 'R_SPF_DNSFAIL',
dkim_allow_symbol = 'R_DKIM_ALLOW',
dkim_deny_symbol = 'R_DKIM_REJECT',
+ dkim_tempfail_symbol = 'R_DKIM_TEMPFAIL',
}
-- Default port for redis upstreams
local redis_params = nil
@@ -80,6 +82,10 @@ local function dmarc_callback(task)
local function dmarc_dns_cb(resolver, to_resolve, results, err, key)
local lookup_domain = string.sub(to_resolve, 8)
+ if err and err ~= 'requested record is not found' then
+ task:insert_result('DMARC_DNSFAIL', 1.0, lookup_domain .. ' : ' .. err)
+ return
+ end
if not results then
if lookup_domain ~= dmarc_domain then
local resolve_name = '_dmarc.' .. dmarc_domain
@@ -237,24 +243,31 @@ local function dmarc_callback(task)
disposition = "none"
if not (spf_ok or dkim_ok) then
res = 1.0
- if quarantine_policy then
- if not pct or pct == 100 or (math.random(100) <= pct) then
- task:insert_result('DMARC_POLICY_QUARANTINE', res, lookup_domain)
- disposition = "quarantine"
- end
- elseif strict_policy then
- if not pct or pct == 100 or (math.random(100) <= pct) then
- task:insert_result('DMARC_POLICY_REJECT', res, lookup_domain)
- disposition = "reject"
- end
+ local spf_tmpfail = task:get_symbol(symbols['spf_tempfail_symbol'])
+ local dkim_tmpfail = task:get_symbol(symbols['dkim_tempfail_symbol'])
+ if (spf_tmpfail or dkim_tmpfail) then
+ task:insert_result('DMARC_DNSFAIL', 1.0, lookup_domain .. ' : ' .. 'SPF/DKIM temp error')
+ disposition = 'failed'
else
- task:insert_result('DMARC_POLICY_SOFTFAIL', res, lookup_domain)
+ if quarantine_policy then
+ if not pct or pct == 100 or (math.random(100) <= pct) then
+ task:insert_result('DMARC_POLICY_QUARANTINE', res, lookup_domain)
+ disposition = "quarantine"
+ end
+ elseif strict_policy then
+ if not pct or pct == 100 or (math.random(100) <= pct) then
+ task:insert_result('DMARC_POLICY_REJECT', res, lookup_domain)
+ disposition = "reject"
+ end
+ else
+ task:insert_result('DMARC_POLICY_SOFTFAIL', res, lookup_domain)
+ end
end
else
task:insert_result('DMARC_POLICY_ALLOW', res, lookup_domain)
end
- if rua and redis_params and dmarc_reporting then
+ if rua and redis_params and dmarc_reporting and not (disposition == 'failed') then
-- Prepare and send redis report element
local redis_key = dmarc_redis_key_prefix .. from[1]['domain']
local report_data = dmarc_report(task, spf_ok, dkim_ok, disposition)
diff --git a/src/ragel/newlines_strip.rl b/src/ragel/newlines_strip.rl
index a0e239bc3..d5de198a5 100644
--- a/src/ragel/newlines_strip.rl
+++ b/src/ragel/newlines_strip.rl
@@ -40,7 +40,7 @@
}
(*newlines_count)++;
- g_ptr_array_add (newlines, (gpointer)p);
+ g_ptr_array_add (newlines, (((gpointer) (goffset) (data->len))));
c = p;
}
diff --git a/src/rspamd.c b/src/rspamd.c
index 056de6e12..3ddee3ff4 100644
--- a/src/rspamd.c
+++ b/src/rspamd.c
@@ -692,6 +692,7 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused)
g_quark_to_string (w->type), w->pid,
WTERMSIG (res) == SIGKILL ? "hardly" : "softly");
event_del (&w->srv_ev);
+ g_ptr_array_free (w->finish_actions, TRUE);
g_free (w->cf);
g_free (w);
diff --git a/src/rspamd.h b/src/rspamd.h
index ffebfe387..e99e4c18d 100644
--- a/src/rspamd.h
+++ b/src/rspamd.h
@@ -49,6 +49,7 @@ struct rspamd_worker {
pid_t pid; /**< pid of worker */
guint index; /**< index number */
guint nconns; /**< current connections count */
+ gboolean wanna_die; /**< worker is terminating */
gdouble start_time; /**< start time */
struct rspamd_main *srv; /**< pointer to server structure */
GQuark type; /**< process type */
@@ -62,6 +63,7 @@ struct rspamd_worker {
main process. [0] - main, [1] - worker */
struct event srv_ev; /**< used by main for read workers' requests */
gpointer control_data; /**< used by control protocol to handle commands */
+ GPtrArray *finish_actions; /**< called when worker is terminated */
};
struct rspamd_abstract_worker_ctx {
diff --git a/src/worker.c b/src/worker.c
index 60c39b2af..097f7c2e1 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -72,15 +72,47 @@ worker_t normal_worker = {
G_STRFUNC, \
__VA_ARGS__)
+static void
+rspamd_worker_call_finish_handlers (struct rspamd_worker *worker)
+{
+ struct rspamd_task *task;
+ struct rspamd_config *cfg = worker->srv->cfg;
+ struct rspamd_worker_ctx *ctx;
+ struct rspamd_config_post_load_script *sc;
+
+ if (cfg->finish_callbacks) {
+ ctx = worker->ctx;
+ /* Create a fake task object for async events */
+ task = rspamd_task_new (worker, cfg);
+ task->resolver = ctx->resolver;
+ task->ev_base = ctx->ev_base;
+ task->s = rspamd_session_create (task->task_pool,
+ NULL,
+ NULL,
+ (event_finalizer_t) rspamd_task_free,
+ task);
+
+ DL_FOREACH (cfg->finish_callbacks, sc) {
+ lua_call_finish_script (cfg->lua_state, sc, task);
+ }
+ }
+
+}
+
/*
* Reduce number of tasks proceeded
*/
static void
reduce_tasks_count (gpointer arg)
{
- guint *nconns = arg;
+ struct rspamd_worker *worker = arg;
- (*nconns)--;
+ worker->nconns --;
+
+ if (worker->wanna_die && worker->nconns == 0) {
+ msg_info ("performing finishing actions");
+ rspamd_worker_call_finish_handlers (worker);
+ }
}
static void
@@ -344,7 +376,7 @@ accept_socket (gint fd, short what, void *arg)
task->ev_base = ctx->ev_base;
worker->nconns++;
rspamd_mempool_add_destructor (task->task_pool,
- (rspamd_mempool_destruct_t)reduce_tasks_count, &worker->nconns);
+ (rspamd_mempool_destruct_t)reduce_tasks_count, worker);
/* Set up async session */
task->s = rspamd_session_create (task->task_pool, rspamd_task_fin,
@@ -525,6 +557,15 @@ init_worker (struct rspamd_config *cfg)
return ctx;
}
+static void
+rspamd_worker_on_terminate (struct rspamd_worker *worker)
+{
+ if (worker->nconns == 0) {
+ msg_info ("performing finishing actions");
+ rspamd_worker_call_finish_handlers (worker);
+ }
+}
+
/*
* Start worker process
*/
@@ -549,6 +590,8 @@ start_worker (struct rspamd_worker *worker)
/* XXX: stupid default */
ctx->keys_cache = rspamd_keypair_cache_new (256);
rspamd_stat_init (worker->srv->cfg, ctx->ev_base);
+ g_ptr_array_add (worker->finish_actions,
+ (gpointer) rspamd_worker_on_terminate);
#ifdef WITH_HYPERSCAN
rspamd_control_worker_add_cmd_handler (worker,