diff options
-rw-r--r-- | lualib/lua_redis.lua | 90 | ||||
-rw-r--r-- | lualib/lua_selectors.lua | 17 | ||||
-rw-r--r-- | lualib/lua_util.lua | 20 | ||||
-rw-r--r-- | src/libserver/cfg_utils.c | 32 | ||||
-rw-r--r-- | src/libserver/dns.c | 2 | ||||
-rw-r--r-- | src/libserver/events.c | 18 | ||||
-rw-r--r-- | src/libserver/events.h | 2 | ||||
-rw-r--r-- | src/libstat/backends/redis_backend.c | 4 | ||||
-rw-r--r-- | src/libstat/learn_cache/redis_cache.c | 4 | ||||
-rw-r--r-- | src/libutil/mem_pool.c | 22 | ||||
-rw-r--r-- | src/libutil/mem_pool.h | 7 | ||||
-rw-r--r-- | src/lua/lua_http.c | 2 | ||||
-rw-r--r-- | src/lua/lua_redis.c | 4 | ||||
-rw-r--r-- | src/lua/lua_tcp.c | 4 | ||||
-rw-r--r-- | src/plugins/fuzzy_check.c | 4 | ||||
-rw-r--r-- | src/plugins/surbl.c | 2 | ||||
-rw-r--r-- | src/rspamadm/lua_repl.c | 4 | ||||
-rw-r--r-- | src/rspamadm/rspamadm.c | 4 |
18 files changed, 187 insertions, 55 deletions
diff --git a/lualib/lua_redis.lua b/lualib/lua_redis.lua index 33757b154..44cb75f4a 100644 --- a/lualib/lua_redis.lua +++ b/lualib/lua_redis.lua @@ -1018,4 +1018,94 @@ end exports.redis_connect_sync = redis_connect_sync +--[[[ +-- @function lua_redis.request(redis_params, attrs, req) +-- Sends a request to Redis (modern API) +-- @param redis_params a table of redis server parameters +-- @param attrs a table of redis request attributes (e.g. task, or ev_base + cfg + session) +-- @param req a table of request: a command + command options +--]] + +exports.request = function(redis_params, attrs, req) + local lua_util = require "lua_util" + + if not attrs or not redis_params or not req then + logger.errx('invalid arguments for redis request') + return false,nil,nil + end + + if not (attrs.task or (attrs.config and attrs.ev_base)) then + logger.errx('invalid attributes for redis request') + return false,nil,nil + end + + local opts = lua_util.shallowcopy(attrs) + + local log_obj = opts.task or opts.config + + local addr + + if opts.callback then + -- Wrap callback + local callback = opts.callback + local function rspamd_redis_make_request_cb(err, data) + if err then + addr:fail() + else + addr:ok() + end + callback(err, data, addr) + end + opts.callback = rspamd_redis_make_request_cb + end + + local rspamd_redis = require "rspamd_redis" + local is_write = opts.is_write + + if opts.key then + if is_write then + addr = redis_params['write_servers']:get_upstream_by_hash(attrs.key) + else + addr = redis_params['read_servers']:get_upstream_by_hash(attrs.key) + end + else + if is_write then + addr = redis_params['write_servers']:get_upstream_master_slave(attrs.key) + else + addr = redis_params['read_servers']:get_upstream_round_robin(attrs.key) + end + end + + if not addr then + logger.errx(log_obj, 'cannot select server to make redis request') + end + + opts.host = addr:get_addr() + opts.timeout = redis_params.timeout + + if type(req) == 'string' then + opts.cmd = req + else + -- XXX: modifies the input table + opts.cmd = table.remove(req, 1); + opts.args = req + end + + if redis_params.password then + opts.password = redis_params.password + end + + if redis_params.db then + opts.dbname = redis_params.db + end + + local ret,conn = rspamd_redis.make_request(opts) + if not ret then + logger.errx(log_obj, 'cannot execute redis request') + addr:fail() + end + + return ret,conn,addr +end + return exports diff --git a/lualib/lua_selectors.lua b/lualib/lua_selectors.lua index d1ef91230..112443fd3 100644 --- a/lualib/lua_selectors.lua +++ b/lualib/lua_selectors.lua @@ -536,19 +536,6 @@ exports.parse_selector = function(cfg, str) local output = {} if not parsed then return nil end - local function shallowcopy(orig) - local orig_type = type(orig) - local copy - if orig_type == 'table' then - copy = {} - for orig_key, orig_value in pairs(orig) do - copy[orig_key] = orig_value - end - else - copy = orig - end - return copy - end -- Output AST format is the following: -- table of individual selectors @@ -570,7 +557,7 @@ exports.parse_selector = function(cfg, str) return nil end - res.selector = shallowcopy(extractors[selector_tbl[1]]) + res.selector = lua_util.shallowcopy(extractors[selector_tbl[1]]) res.selector.name = selector_tbl[1] res.selector.args = selector_tbl[2] or {} @@ -585,7 +572,7 @@ exports.parse_selector = function(cfg, str) logger.errx(cfg, 'processor %s is unknown', proc_name) return nil end - local processor = shallowcopy(transform_function[proc_name]) + local processor = lua_util.shallowcopy(transform_function[proc_name]) processor.name = proc_name processor.args = proc_tbl[2] lua_util.debugm(M, cfg, 'attached processor %s to selector %s, args: %s', diff --git a/lualib/lua_util.lua b/lualib/lua_util.lua index 0ce0c1874..e0096daa1 100644 --- a/lualib/lua_util.lua +++ b/lualib/lua_util.lua @@ -664,6 +664,25 @@ exports.extract_specific_urls = function(params_or_task, lim, need_emails, filte return res end + +--[[[ +-- @function lua_util.shallowcopy(tbl) +-- Performs shallow (and fast) copy of a table or another Lua type +--]] +exports.shallowcopy = function(orig) + local orig_type = type(orig) + local copy + if orig_type == 'table' then + copy = {} + for orig_key, orig_value in pairs(orig) do + copy[orig_key] = orig_value + end + else + copy = orig + end + return copy +end + -- Debugging support local unconditional_debug = false local debug_modules = {} @@ -697,4 +716,5 @@ exports.debugm = function(mod, ...) end end + return exports diff --git a/src/libserver/cfg_utils.c b/src/libserver/cfg_utils.c index 016556912..f4f9393bf 100644 --- a/src/libserver/cfg_utils.c +++ b/src/libserver/cfg_utils.c @@ -213,8 +213,6 @@ rspamd_config_free (struct rspamd_config *cfg) struct rspamd_config_post_load_script *sc, *sctmp; struct rspamd_worker_log_pipe *lp, *ltmp; - rspamd_map_remove_all (cfg); - DL_FOREACH_SAFE (cfg->finish_callbacks, sc, sctmp) { luaL_unref (cfg->lua_state, LUA_REGISTRYINDEX, sc->cbref); g_free (sc); @@ -225,18 +223,12 @@ rspamd_config_free (struct rspamd_config *cfg) g_free (sc); } - if (cfg->monitored_ctx) { - rspamd_monitored_ctx_destroy (cfg->monitored_ctx); - } + rspamd_map_remove_all (cfg); + rspamd_mempool_destructors_enforce (cfg->cfg_pool); g_list_free (cfg->classifiers); g_list_free (cfg->workers); rspamd_symbols_cache_destroy (cfg->cache); -#ifdef WITH_HIREDIS - if (cfg->redis_pool) { - rspamd_redis_pool_destroy (cfg->redis_pool); - } -#endif ucl_object_unref (cfg->rcl_obj); ucl_object_unref (cfg->config_comments); ucl_object_unref (cfg->doc_strings); @@ -251,19 +243,28 @@ rspamd_config_free (struct rspamd_config *cfg) g_hash_table_unref (cfg->wrk_parsers); g_hash_table_unref (cfg->trusted_keys); - if (cfg->checksum) { - g_free (cfg->checksum); - } - rspamd_re_cache_unref (cfg->re_cache); rspamd_upstreams_library_unref (cfg->ups_ctx); - rspamd_mempool_delete (cfg->cfg_pool); g_ptr_array_free (cfg->c_modules, TRUE); if (cfg->lua_state && cfg->own_lua_state) { lua_thread_pool_free (cfg->lua_thread_pool); lua_close (cfg->lua_state); } + +#ifdef WITH_HIREDIS + if (cfg->redis_pool) { + rspamd_redis_pool_destroy (cfg->redis_pool); + } +#endif + + if (cfg->monitored_ctx) { + rspamd_monitored_ctx_destroy (cfg->monitored_ctx); + } + if (cfg->checksum) { + g_free (cfg->checksum); + } + REF_RELEASE (cfg->libs_ctx); DL_FOREACH_SAFE (cfg->log_pipes, lp, ltmp) { @@ -271,6 +272,7 @@ rspamd_config_free (struct rspamd_config *cfg) g_free (lp); } + rspamd_mempool_delete (cfg->cfg_pool); g_free (cfg); } diff --git a/src/libserver/dns.c b/src/libserver/dns.c index fbf37363a..5ac215ff7 100644 --- a/src/libserver/dns.c +++ b/src/libserver/dns.c @@ -122,7 +122,7 @@ make_dns_request (struct rspamd_dns_resolver *resolver, return FALSE; } - if (session && rspamd_session_is_destroying (session)) { + if (session && rspamd_session_blocked (session)) { return FALSE; } diff --git a/src/libserver/events.c b/src/libserver/events.c index c23c90328..a15338e7a 100644 --- a/src/libserver/events.c +++ b/src/libserver/events.c @@ -21,9 +21,10 @@ #define RSPAMD_SESSION_FLAG_WATCHING (1 << 0) #define RSPAMD_SESSION_FLAG_DESTROYING (1 << 1) +#define RSPAMD_SESSION_FLAG_CLEANUP (1 << 2) #define RSPAMD_SESSION_IS_WATCHING(s) ((s)->flags & RSPAMD_SESSION_FLAG_WATCHING) -#define RSPAMD_SESSION_IS_DESTROYING(s) ((s)->flags & RSPAMD_SESSION_FLAG_DESTROYING) +#define RSPAMD_SESSION_CAN_ADD_EVENT(s) (!((s)->flags & (RSPAMD_SESSION_FLAG_DESTROYING|RSPAMD_SESSION_FLAG_CLEANUP))) #define msg_err_session(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \ "events", session->pool->tag.uid, \ @@ -172,8 +173,9 @@ rspamd_session_add_event (struct rspamd_async_session *session, g_assert_not_reached (); } - if (RSPAMD_SESSION_IS_DESTROYING (session)) { - msg_debug_session ("skip adding event subsystem: %s: session is destroying", + if (!RSPAMD_SESSION_CAN_ADD_EVENT (session)) { + msg_debug_session ("skip adding event subsystem: %s: " + "session is destroying/cleaning", g_quark_to_string (subsystem)); return NULL; @@ -293,7 +295,7 @@ rspamd_session_destroy (struct rspamd_async_session *session) return FALSE; } - if (!(session->flags & RSPAMD_SESSION_FLAG_DESTROYING)) { + if (!rspamd_session_blocked (session)) { session->flags |= RSPAMD_SESSION_FLAG_DESTROYING; rspamd_session_cleanup (session); @@ -315,6 +317,8 @@ rspamd_session_cleanup (struct rspamd_async_session *session) return; } + session->flags |= RSPAMD_SESSION_FLAG_CLEANUP; + kh_foreach_key (session->events, ev, { /* Call event's finalizer */ msg_debug_session ("removed event on destroy: %p, subsystem: %s", @@ -327,6 +331,8 @@ rspamd_session_cleanup (struct rspamd_async_session *session) }); kh_clear (rspamd_events_hash, session->events); + + session->flags &= ~RSPAMD_SESSION_FLAG_CLEANUP; } gboolean @@ -509,9 +515,9 @@ rspamd_session_mempool (struct rspamd_async_session *session) } gboolean -rspamd_session_is_destroying (struct rspamd_async_session *session) +rspamd_session_blocked (struct rspamd_async_session *session) { g_assert (session != NULL); - return RSPAMD_SESSION_IS_DESTROYING (session); + return !RSPAMD_SESSION_CAN_ADD_EVENT (session); }
\ No newline at end of file diff --git a/src/libserver/events.h b/src/libserver/events.h index 10ccb8d1d..f7eeae9d0 100644 --- a/src/libserver/events.h +++ b/src/libserver/events.h @@ -153,6 +153,6 @@ struct rspamd_async_watcher* rspamd_session_get_watcher ( * @param s * @return */ -gboolean rspamd_session_is_destroying (struct rspamd_async_session *s); +gboolean rspamd_session_blocked (struct rspamd_async_session *s); #endif /* RSPAMD_EVENTS_H */ diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c index 5510cff05..7b49db59e 100644 --- a/src/libstat/backends/redis_backend.c +++ b/src/libstat/backends/redis_backend.c @@ -1572,7 +1572,7 @@ rspamd_redis_process_tokens (struct rspamd_task *task, gint ret; const gchar *learned_key = "learns"; - if (rspamd_session_is_destroying (task->s)) { + if (rspamd_session_blocked (task->s)) { return FALSE; } @@ -1667,7 +1667,7 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens, goffset off; const gchar *learned_key = "learns"; - if (rspamd_session_is_destroying (task->s)) { + if (rspamd_session_blocked (task->s)) { return FALSE; } diff --git a/src/libstat/learn_cache/redis_cache.c b/src/libstat/learn_cache/redis_cache.c index e17f20d27..fc928e75e 100644 --- a/src/libstat/learn_cache/redis_cache.c +++ b/src/libstat/learn_cache/redis_cache.c @@ -438,7 +438,7 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task, struct timeval tv; gchar *h; - if (rspamd_session_is_destroying (task->s)) { + if (rspamd_session_blocked (task->s)) { return RSPAMD_LEARN_INGORE; } @@ -473,7 +473,7 @@ rspamd_stat_cache_redis_learn (struct rspamd_task *task, gchar *h; gint flag; - if (rspamd_session_is_destroying (task->s)) { + if (rspamd_session_blocked (task->s)) { return RSPAMD_LEARN_INGORE; } diff --git a/src/libutil/mem_pool.c b/src/libutil/mem_pool.c index e6941c8f7..322ebc409 100644 --- a/src/libutil/mem_pool.c +++ b/src/libutil/mem_pool.c @@ -657,6 +657,27 @@ rspamd_mempool_adjust_entry (struct rspamd_mempool_entry_point *e) } void +rspamd_mempool_destructors_enforce (rspamd_mempool_t *pool) +{ + struct _pool_destructors *destructor; + guint i; + + POOL_MTX_LOCK (); + + for (i = 0; i < pool->destructors->len; i ++) { + destructor = &g_array_index (pool->destructors, struct _pool_destructors, i); + /* Avoid calling destructors for NULL pointers */ + if (destructor->data != NULL) { + destructor->func (destructor->data); + } + } + + pool->destructors->len = 0; + + POOL_MTX_UNLOCK (); +} + +void rspamd_mempool_delete (rspamd_mempool_t * pool) { struct _pool_chain *cur; @@ -667,7 +688,6 @@ rspamd_mempool_delete (rspamd_mempool_t * pool) POOL_MTX_LOCK (); - /* Find free space in pool chain */ cur = NULL; if (pool->pools[RSPAMD_MEMPOOL_NORMAL] != NULL && diff --git a/src/libutil/mem_pool.h b/src/libutil/mem_pool.h index 27d4c8ebf..c8dbf6042 100644 --- a/src/libutil/mem_pool.h +++ b/src/libutil/mem_pool.h @@ -224,6 +224,13 @@ void rspamd_mempool_replace_destructor (rspamd_mempool_t *pool, rspamd_mempool_destruct_t func, void *old_data, void *new_data); /** + * Calls all destructors associated with the specific memory pool without removing + * of the pool itself + * @param pool + */ +void rspamd_mempool_destructors_enforce (rspamd_mempool_t *pool); + +/** * Delete pool, free all its chunks and call destructors chain * @param pool memory pool object */ diff --git a/src/lua/lua_http.c b/src/lua/lua_http.c index 1534ebfa2..2eb49d281 100644 --- a/src/lua/lua_http.c +++ b/src/lua/lua_http.c @@ -809,7 +809,7 @@ lua_http_request (lua_State *L) return 1; } - if (session && rspamd_session_is_destroying (session)) { + if (session && rspamd_session_blocked (session)) { lua_pushboolean (L, FALSE); return 1; diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c index 0fc9c43b7..4003ac36a 100644 --- a/src/lua/lua_redis.c +++ b/src/lua/lua_redis.c @@ -686,7 +686,7 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref) lua_pop (L, 1); /* table */ - if (session && rspamd_session_is_destroying (session)) { + if (session && rspamd_session_blocked (session)) { ret = FALSE; } @@ -1215,7 +1215,7 @@ lua_redis_add_cmd (lua_State *L) LL_PREPEND (sp_ud->c->specific, sp_ud); - if (ud->s && rspamd_session_is_destroying (ud->s)) { + if (ud->s && rspamd_session_blocked (ud->s)) { lua_pushboolean (L, 0); lua_pushstring (L, "session is terminating"); diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c index 3bd1ffad2..85f294142 100644 --- a/src/lua/lua_tcp.c +++ b/src/lua/lua_tcp.c @@ -1573,7 +1573,7 @@ lua_tcp_request (lua_State *L) if (session) { cbd->session = session; - if (rspamd_session_is_destroying (session)) { + if (rspamd_session_blocked (session)) { TCP_RELEASE (cbd); lua_pushboolean (L, FALSE); @@ -1737,7 +1737,7 @@ lua_tcp_connect_sync (lua_State *L) if (session) { cbd->session = session; - if (rspamd_session_is_destroying (session)) { + if (rspamd_session_blocked (session)) { TCP_RELEASE (cbd); lua_pushboolean (L, FALSE); lua_pushliteral (L, "Session is being destroyed, requests are not allowed"); diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index bf08c0e46..61ff8f54e 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -2830,7 +2830,7 @@ register_fuzzy_client_call (struct rspamd_task *task, rspamd_inet_addr_t *addr; gint sock; - if (!rspamd_session_is_destroying (task->s)) { + if (!rspamd_session_blocked (task->s)) { /* Get upstream */ selected = rspamd_upstream_get (rule->servers, RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0); @@ -3311,7 +3311,7 @@ fuzzy_check_send_lua_learn (struct fuzzy_rule *rule, gint ret = -1; /* Get upstream */ - if (!rspamd_session_is_destroying (task->s)) { + if (!rspamd_session_blocked (task->s)) { while ((selected = rspamd_upstream_get (rule->servers, RSPAMD_UPSTREAM_SEQUENTIAL, NULL, 0))) { /* Create UDP socket */ diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c index 5b2375888..31c873304 100644 --- a/src/plugins/surbl.c +++ b/src/plugins/surbl.c @@ -1638,7 +1638,7 @@ register_redirector_call (struct rspamd_url *url, struct rspamd_task *task, struct rspamd_http_message *msg; struct surbl_ctx *surbl_module_ctx = surbl_get_context (task->cfg); - if (!rspamd_session_is_destroying (task->s)) { + if (!rspamd_session_blocked (task->s)) { selected = rspamd_upstream_get (surbl_module_ctx->redirectors, RSPAMD_UPSTREAM_ROUND_ROBIN, url->host, url->hostlen); diff --git a/src/rspamadm/lua_repl.c b/src/rspamadm/lua_repl.c index 710b9c5ef..c6c5fd0bf 100644 --- a/src/rspamadm/lua_repl.c +++ b/src/rspamadm/lua_repl.c @@ -287,8 +287,8 @@ rspamadm_exec_input (lua_State *L, const gchar *input) } } -void -wait_session_events () +static void +wait_session_events (void) { /* XXX: it's probably worth to add timeout here - not to wait forever */ while (rspamd_session_events_pending (rspamadm_session) > 0) { diff --git a/src/rspamadm/rspamadm.c b/src/rspamadm/rspamadm.c index fad9b2fcd..052dcd1d6 100644 --- a/src/rspamadm/rspamadm.c +++ b/src/rspamadm/rspamadm.c @@ -323,7 +323,7 @@ rspamadm_command_maybe_match_name (const gchar *cmd, const gchar *input) static void -rspamadm_add_lua_globals() +rspamadm_add_lua_globals (void) { struct rspamd_async_session **psession; struct event_base **pev_base; @@ -444,7 +444,7 @@ main (gint argc, gchar **argv, gchar **env) L = cfg->lua_state; rspamd_lua_set_path (L, NULL, ucl_vars); rspamd_lua_set_globals (cfg, L, ucl_vars); - rspamadm_add_lua_globals(); + rspamadm_add_lua_globals (); #ifdef WITH_HIREDIS rspamd_redis_pool_config (cfg->redis_pool, cfg, rspamd_main->ev_base); |