aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lualib/lua_redis.lua90
-rw-r--r--lualib/lua_selectors.lua17
-rw-r--r--lualib/lua_util.lua20
-rw-r--r--src/libserver/cfg_utils.c32
-rw-r--r--src/libserver/dns.c2
-rw-r--r--src/libserver/events.c18
-rw-r--r--src/libserver/events.h2
-rw-r--r--src/libstat/backends/redis_backend.c4
-rw-r--r--src/libstat/learn_cache/redis_cache.c4
-rw-r--r--src/libutil/mem_pool.c22
-rw-r--r--src/libutil/mem_pool.h7
-rw-r--r--src/lua/lua_http.c2
-rw-r--r--src/lua/lua_redis.c4
-rw-r--r--src/lua/lua_tcp.c4
-rw-r--r--src/plugins/fuzzy_check.c4
-rw-r--r--src/plugins/surbl.c2
-rw-r--r--src/rspamadm/lua_repl.c4
-rw-r--r--src/rspamadm/rspamadm.c4
18 files changed, 187 insertions, 55 deletions
diff --git a/lualib/lua_redis.lua b/lualib/lua_redis.lua
index 33757b154..44cb75f4a 100644
--- a/lualib/lua_redis.lua
+++ b/lualib/lua_redis.lua
@@ -1018,4 +1018,94 @@ end
exports.redis_connect_sync = redis_connect_sync
+--[[[
+-- @function lua_redis.request(redis_params, attrs, req)
+-- Sends a request to Redis (modern API)
+-- @param redis_params a table of redis server parameters
+-- @param attrs a table of redis request attributes (e.g. task, or ev_base + cfg + session)
+-- @param req a table of request: a command + command options
+--]]
+
+exports.request = function(redis_params, attrs, req)
+ local lua_util = require "lua_util"
+
+ if not attrs or not redis_params or not req then
+ logger.errx('invalid arguments for redis request')
+ return false,nil,nil
+ end
+
+ if not (attrs.task or (attrs.config and attrs.ev_base)) then
+ logger.errx('invalid attributes for redis request')
+ return false,nil,nil
+ end
+
+ local opts = lua_util.shallowcopy(attrs)
+
+ local log_obj = opts.task or opts.config
+
+ local addr
+
+ if opts.callback then
+ -- Wrap callback
+ local callback = opts.callback
+ local function rspamd_redis_make_request_cb(err, data)
+ if err then
+ addr:fail()
+ else
+ addr:ok()
+ end
+ callback(err, data, addr)
+ end
+ opts.callback = rspamd_redis_make_request_cb
+ end
+
+ local rspamd_redis = require "rspamd_redis"
+ local is_write = opts.is_write
+
+ if opts.key then
+ if is_write then
+ addr = redis_params['write_servers']:get_upstream_by_hash(attrs.key)
+ else
+ addr = redis_params['read_servers']:get_upstream_by_hash(attrs.key)
+ end
+ else
+ if is_write then
+ addr = redis_params['write_servers']:get_upstream_master_slave(attrs.key)
+ else
+ addr = redis_params['read_servers']:get_upstream_round_robin(attrs.key)
+ end
+ end
+
+ if not addr then
+ logger.errx(log_obj, 'cannot select server to make redis request')
+ end
+
+ opts.host = addr:get_addr()
+ opts.timeout = redis_params.timeout
+
+ if type(req) == 'string' then
+ opts.cmd = req
+ else
+ -- XXX: modifies the input table
+ opts.cmd = table.remove(req, 1);
+ opts.args = req
+ end
+
+ if redis_params.password then
+ opts.password = redis_params.password
+ end
+
+ if redis_params.db then
+ opts.dbname = redis_params.db
+ end
+
+ local ret,conn = rspamd_redis.make_request(opts)
+ if not ret then
+ logger.errx(log_obj, 'cannot execute redis request')
+ addr:fail()
+ end
+
+ return ret,conn,addr
+end
+
return exports
diff --git a/lualib/lua_selectors.lua b/lualib/lua_selectors.lua
index d1ef91230..112443fd3 100644
--- a/lualib/lua_selectors.lua
+++ b/lualib/lua_selectors.lua
@@ -536,19 +536,6 @@ exports.parse_selector = function(cfg, str)
local output = {}
if not parsed then return nil end
- local function shallowcopy(orig)
- local orig_type = type(orig)
- local copy
- if orig_type == 'table' then
- copy = {}
- for orig_key, orig_value in pairs(orig) do
- copy[orig_key] = orig_value
- end
- else
- copy = orig
- end
- return copy
- end
-- Output AST format is the following:
-- table of individual selectors
@@ -570,7 +557,7 @@ exports.parse_selector = function(cfg, str)
return nil
end
- res.selector = shallowcopy(extractors[selector_tbl[1]])
+ res.selector = lua_util.shallowcopy(extractors[selector_tbl[1]])
res.selector.name = selector_tbl[1]
res.selector.args = selector_tbl[2] or {}
@@ -585,7 +572,7 @@ exports.parse_selector = function(cfg, str)
logger.errx(cfg, 'processor %s is unknown', proc_name)
return nil
end
- local processor = shallowcopy(transform_function[proc_name])
+ local processor = lua_util.shallowcopy(transform_function[proc_name])
processor.name = proc_name
processor.args = proc_tbl[2]
lua_util.debugm(M, cfg, 'attached processor %s to selector %s, args: %s',
diff --git a/lualib/lua_util.lua b/lualib/lua_util.lua
index 0ce0c1874..e0096daa1 100644
--- a/lualib/lua_util.lua
+++ b/lualib/lua_util.lua
@@ -664,6 +664,25 @@ exports.extract_specific_urls = function(params_or_task, lim, need_emails, filte
return res
end
+
+--[[[
+-- @function lua_util.shallowcopy(tbl)
+-- Performs shallow (and fast) copy of a table or another Lua type
+--]]
+exports.shallowcopy = function(orig)
+ local orig_type = type(orig)
+ local copy
+ if orig_type == 'table' then
+ copy = {}
+ for orig_key, orig_value in pairs(orig) do
+ copy[orig_key] = orig_value
+ end
+ else
+ copy = orig
+ end
+ return copy
+end
+
-- Debugging support
local unconditional_debug = false
local debug_modules = {}
@@ -697,4 +716,5 @@ exports.debugm = function(mod, ...)
end
end
+
return exports
diff --git a/src/libserver/cfg_utils.c b/src/libserver/cfg_utils.c
index 016556912..f4f9393bf 100644
--- a/src/libserver/cfg_utils.c
+++ b/src/libserver/cfg_utils.c
@@ -213,8 +213,6 @@ rspamd_config_free (struct rspamd_config *cfg)
struct rspamd_config_post_load_script *sc, *sctmp;
struct rspamd_worker_log_pipe *lp, *ltmp;
- rspamd_map_remove_all (cfg);
-
DL_FOREACH_SAFE (cfg->finish_callbacks, sc, sctmp) {
luaL_unref (cfg->lua_state, LUA_REGISTRYINDEX, sc->cbref);
g_free (sc);
@@ -225,18 +223,12 @@ rspamd_config_free (struct rspamd_config *cfg)
g_free (sc);
}
- if (cfg->monitored_ctx) {
- rspamd_monitored_ctx_destroy (cfg->monitored_ctx);
- }
+ rspamd_map_remove_all (cfg);
+ rspamd_mempool_destructors_enforce (cfg->cfg_pool);
g_list_free (cfg->classifiers);
g_list_free (cfg->workers);
rspamd_symbols_cache_destroy (cfg->cache);
-#ifdef WITH_HIREDIS
- if (cfg->redis_pool) {
- rspamd_redis_pool_destroy (cfg->redis_pool);
- }
-#endif
ucl_object_unref (cfg->rcl_obj);
ucl_object_unref (cfg->config_comments);
ucl_object_unref (cfg->doc_strings);
@@ -251,19 +243,28 @@ rspamd_config_free (struct rspamd_config *cfg)
g_hash_table_unref (cfg->wrk_parsers);
g_hash_table_unref (cfg->trusted_keys);
- if (cfg->checksum) {
- g_free (cfg->checksum);
- }
-
rspamd_re_cache_unref (cfg->re_cache);
rspamd_upstreams_library_unref (cfg->ups_ctx);
- rspamd_mempool_delete (cfg->cfg_pool);
g_ptr_array_free (cfg->c_modules, TRUE);
if (cfg->lua_state && cfg->own_lua_state) {
lua_thread_pool_free (cfg->lua_thread_pool);
lua_close (cfg->lua_state);
}
+
+#ifdef WITH_HIREDIS
+ if (cfg->redis_pool) {
+ rspamd_redis_pool_destroy (cfg->redis_pool);
+ }
+#endif
+
+ if (cfg->monitored_ctx) {
+ rspamd_monitored_ctx_destroy (cfg->monitored_ctx);
+ }
+ if (cfg->checksum) {
+ g_free (cfg->checksum);
+ }
+
REF_RELEASE (cfg->libs_ctx);
DL_FOREACH_SAFE (cfg->log_pipes, lp, ltmp) {
@@ -271,6 +272,7 @@ rspamd_config_free (struct rspamd_config *cfg)
g_free (lp);
}
+ rspamd_mempool_delete (cfg->cfg_pool);
g_free (cfg);
}
diff --git a/src/libserver/dns.c b/src/libserver/dns.c
index fbf37363a..5ac215ff7 100644
--- a/src/libserver/dns.c
+++ b/src/libserver/dns.c
@@ -122,7 +122,7 @@ make_dns_request (struct rspamd_dns_resolver *resolver,
return FALSE;
}
- if (session && rspamd_session_is_destroying (session)) {
+ if (session && rspamd_session_blocked (session)) {
return FALSE;
}
diff --git a/src/libserver/events.c b/src/libserver/events.c
index c23c90328..a15338e7a 100644
--- a/src/libserver/events.c
+++ b/src/libserver/events.c
@@ -21,9 +21,10 @@
#define RSPAMD_SESSION_FLAG_WATCHING (1 << 0)
#define RSPAMD_SESSION_FLAG_DESTROYING (1 << 1)
+#define RSPAMD_SESSION_FLAG_CLEANUP (1 << 2)
#define RSPAMD_SESSION_IS_WATCHING(s) ((s)->flags & RSPAMD_SESSION_FLAG_WATCHING)
-#define RSPAMD_SESSION_IS_DESTROYING(s) ((s)->flags & RSPAMD_SESSION_FLAG_DESTROYING)
+#define RSPAMD_SESSION_CAN_ADD_EVENT(s) (!((s)->flags & (RSPAMD_SESSION_FLAG_DESTROYING|RSPAMD_SESSION_FLAG_CLEANUP)))
#define msg_err_session(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \
"events", session->pool->tag.uid, \
@@ -172,8 +173,9 @@ rspamd_session_add_event (struct rspamd_async_session *session,
g_assert_not_reached ();
}
- if (RSPAMD_SESSION_IS_DESTROYING (session)) {
- msg_debug_session ("skip adding event subsystem: %s: session is destroying",
+ if (!RSPAMD_SESSION_CAN_ADD_EVENT (session)) {
+ msg_debug_session ("skip adding event subsystem: %s: "
+ "session is destroying/cleaning",
g_quark_to_string (subsystem));
return NULL;
@@ -293,7 +295,7 @@ rspamd_session_destroy (struct rspamd_async_session *session)
return FALSE;
}
- if (!(session->flags & RSPAMD_SESSION_FLAG_DESTROYING)) {
+ if (!rspamd_session_blocked (session)) {
session->flags |= RSPAMD_SESSION_FLAG_DESTROYING;
rspamd_session_cleanup (session);
@@ -315,6 +317,8 @@ rspamd_session_cleanup (struct rspamd_async_session *session)
return;
}
+ session->flags |= RSPAMD_SESSION_FLAG_CLEANUP;
+
kh_foreach_key (session->events, ev, {
/* Call event's finalizer */
msg_debug_session ("removed event on destroy: %p, subsystem: %s",
@@ -327,6 +331,8 @@ rspamd_session_cleanup (struct rspamd_async_session *session)
});
kh_clear (rspamd_events_hash, session->events);
+
+ session->flags &= ~RSPAMD_SESSION_FLAG_CLEANUP;
}
gboolean
@@ -509,9 +515,9 @@ rspamd_session_mempool (struct rspamd_async_session *session)
}
gboolean
-rspamd_session_is_destroying (struct rspamd_async_session *session)
+rspamd_session_blocked (struct rspamd_async_session *session)
{
g_assert (session != NULL);
- return RSPAMD_SESSION_IS_DESTROYING (session);
+ return !RSPAMD_SESSION_CAN_ADD_EVENT (session);
} \ No newline at end of file
diff --git a/src/libserver/events.h b/src/libserver/events.h
index 10ccb8d1d..f7eeae9d0 100644
--- a/src/libserver/events.h
+++ b/src/libserver/events.h
@@ -153,6 +153,6 @@ struct rspamd_async_watcher* rspamd_session_get_watcher (
* @param s
* @return
*/
-gboolean rspamd_session_is_destroying (struct rspamd_async_session *s);
+gboolean rspamd_session_blocked (struct rspamd_async_session *s);
#endif /* RSPAMD_EVENTS_H */
diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c
index 5510cff05..7b49db59e 100644
--- a/src/libstat/backends/redis_backend.c
+++ b/src/libstat/backends/redis_backend.c
@@ -1572,7 +1572,7 @@ rspamd_redis_process_tokens (struct rspamd_task *task,
gint ret;
const gchar *learned_key = "learns";
- if (rspamd_session_is_destroying (task->s)) {
+ if (rspamd_session_blocked (task->s)) {
return FALSE;
}
@@ -1667,7 +1667,7 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
goffset off;
const gchar *learned_key = "learns";
- if (rspamd_session_is_destroying (task->s)) {
+ if (rspamd_session_blocked (task->s)) {
return FALSE;
}
diff --git a/src/libstat/learn_cache/redis_cache.c b/src/libstat/learn_cache/redis_cache.c
index e17f20d27..fc928e75e 100644
--- a/src/libstat/learn_cache/redis_cache.c
+++ b/src/libstat/learn_cache/redis_cache.c
@@ -438,7 +438,7 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task,
struct timeval tv;
gchar *h;
- if (rspamd_session_is_destroying (task->s)) {
+ if (rspamd_session_blocked (task->s)) {
return RSPAMD_LEARN_INGORE;
}
@@ -473,7 +473,7 @@ rspamd_stat_cache_redis_learn (struct rspamd_task *task,
gchar *h;
gint flag;
- if (rspamd_session_is_destroying (task->s)) {
+ if (rspamd_session_blocked (task->s)) {
return RSPAMD_LEARN_INGORE;
}
diff --git a/src/libutil/mem_pool.c b/src/libutil/mem_pool.c
index e6941c8f7..322ebc409 100644
--- a/src/libutil/mem_pool.c
+++ b/src/libutil/mem_pool.c
@@ -657,6 +657,27 @@ rspamd_mempool_adjust_entry (struct rspamd_mempool_entry_point *e)
}
void
+rspamd_mempool_destructors_enforce (rspamd_mempool_t *pool)
+{
+ struct _pool_destructors *destructor;
+ guint i;
+
+ POOL_MTX_LOCK ();
+
+ for (i = 0; i < pool->destructors->len; i ++) {
+ destructor = &g_array_index (pool->destructors, struct _pool_destructors, i);
+ /* Avoid calling destructors for NULL pointers */
+ if (destructor->data != NULL) {
+ destructor->func (destructor->data);
+ }
+ }
+
+ pool->destructors->len = 0;
+
+ POOL_MTX_UNLOCK ();
+}
+
+void
rspamd_mempool_delete (rspamd_mempool_t * pool)
{
struct _pool_chain *cur;
@@ -667,7 +688,6 @@ rspamd_mempool_delete (rspamd_mempool_t * pool)
POOL_MTX_LOCK ();
- /* Find free space in pool chain */
cur = NULL;
if (pool->pools[RSPAMD_MEMPOOL_NORMAL] != NULL &&
diff --git a/src/libutil/mem_pool.h b/src/libutil/mem_pool.h
index 27d4c8ebf..c8dbf6042 100644
--- a/src/libutil/mem_pool.h
+++ b/src/libutil/mem_pool.h
@@ -224,6 +224,13 @@ void rspamd_mempool_replace_destructor (rspamd_mempool_t *pool,
rspamd_mempool_destruct_t func, void *old_data, void *new_data);
/**
+ * Calls all destructors associated with the specific memory pool without removing
+ * of the pool itself
+ * @param pool
+ */
+void rspamd_mempool_destructors_enforce (rspamd_mempool_t *pool);
+
+/**
* Delete pool, free all its chunks and call destructors chain
* @param pool memory pool object
*/
diff --git a/src/lua/lua_http.c b/src/lua/lua_http.c
index 1534ebfa2..2eb49d281 100644
--- a/src/lua/lua_http.c
+++ b/src/lua/lua_http.c
@@ -809,7 +809,7 @@ lua_http_request (lua_State *L)
return 1;
}
- if (session && rspamd_session_is_destroying (session)) {
+ if (session && rspamd_session_blocked (session)) {
lua_pushboolean (L, FALSE);
return 1;
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c
index 0fc9c43b7..4003ac36a 100644
--- a/src/lua/lua_redis.c
+++ b/src/lua/lua_redis.c
@@ -686,7 +686,7 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref)
lua_pop (L, 1); /* table */
- if (session && rspamd_session_is_destroying (session)) {
+ if (session && rspamd_session_blocked (session)) {
ret = FALSE;
}
@@ -1215,7 +1215,7 @@ lua_redis_add_cmd (lua_State *L)
LL_PREPEND (sp_ud->c->specific, sp_ud);
- if (ud->s && rspamd_session_is_destroying (ud->s)) {
+ if (ud->s && rspamd_session_blocked (ud->s)) {
lua_pushboolean (L, 0);
lua_pushstring (L, "session is terminating");
diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c
index 3bd1ffad2..85f294142 100644
--- a/src/lua/lua_tcp.c
+++ b/src/lua/lua_tcp.c
@@ -1573,7 +1573,7 @@ lua_tcp_request (lua_State *L)
if (session) {
cbd->session = session;
- if (rspamd_session_is_destroying (session)) {
+ if (rspamd_session_blocked (session)) {
TCP_RELEASE (cbd);
lua_pushboolean (L, FALSE);
@@ -1737,7 +1737,7 @@ lua_tcp_connect_sync (lua_State *L)
if (session) {
cbd->session = session;
- if (rspamd_session_is_destroying (session)) {
+ if (rspamd_session_blocked (session)) {
TCP_RELEASE (cbd);
lua_pushboolean (L, FALSE);
lua_pushliteral (L, "Session is being destroyed, requests are not allowed");
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index bf08c0e46..61ff8f54e 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -2830,7 +2830,7 @@ register_fuzzy_client_call (struct rspamd_task *task,
rspamd_inet_addr_t *addr;
gint sock;
- if (!rspamd_session_is_destroying (task->s)) {
+ if (!rspamd_session_blocked (task->s)) {
/* Get upstream */
selected = rspamd_upstream_get (rule->servers, RSPAMD_UPSTREAM_ROUND_ROBIN,
NULL, 0);
@@ -3311,7 +3311,7 @@ fuzzy_check_send_lua_learn (struct fuzzy_rule *rule,
gint ret = -1;
/* Get upstream */
- if (!rspamd_session_is_destroying (task->s)) {
+ if (!rspamd_session_blocked (task->s)) {
while ((selected = rspamd_upstream_get (rule->servers,
RSPAMD_UPSTREAM_SEQUENTIAL, NULL, 0))) {
/* Create UDP socket */
diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c
index 5b2375888..31c873304 100644
--- a/src/plugins/surbl.c
+++ b/src/plugins/surbl.c
@@ -1638,7 +1638,7 @@ register_redirector_call (struct rspamd_url *url, struct rspamd_task *task,
struct rspamd_http_message *msg;
struct surbl_ctx *surbl_module_ctx = surbl_get_context (task->cfg);
- if (!rspamd_session_is_destroying (task->s)) {
+ if (!rspamd_session_blocked (task->s)) {
selected = rspamd_upstream_get (surbl_module_ctx->redirectors,
RSPAMD_UPSTREAM_ROUND_ROBIN, url->host, url->hostlen);
diff --git a/src/rspamadm/lua_repl.c b/src/rspamadm/lua_repl.c
index 710b9c5ef..c6c5fd0bf 100644
--- a/src/rspamadm/lua_repl.c
+++ b/src/rspamadm/lua_repl.c
@@ -287,8 +287,8 @@ rspamadm_exec_input (lua_State *L, const gchar *input)
}
}
-void
-wait_session_events ()
+static void
+wait_session_events (void)
{
/* XXX: it's probably worth to add timeout here - not to wait forever */
while (rspamd_session_events_pending (rspamadm_session) > 0) {
diff --git a/src/rspamadm/rspamadm.c b/src/rspamadm/rspamadm.c
index fad9b2fcd..052dcd1d6 100644
--- a/src/rspamadm/rspamadm.c
+++ b/src/rspamadm/rspamadm.c
@@ -323,7 +323,7 @@ rspamadm_command_maybe_match_name (const gchar *cmd, const gchar *input)
static void
-rspamadm_add_lua_globals()
+rspamadm_add_lua_globals (void)
{
struct rspamd_async_session **psession;
struct event_base **pev_base;
@@ -444,7 +444,7 @@ main (gint argc, gchar **argv, gchar **env)
L = cfg->lua_state;
rspamd_lua_set_path (L, NULL, ucl_vars);
rspamd_lua_set_globals (cfg, L, ucl_vars);
- rspamadm_add_lua_globals();
+ rspamadm_add_lua_globals ();
#ifdef WITH_HIREDIS
rspamd_redis_pool_config (cfg->redis_pool, cfg, rspamd_main->ev_base);