diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2018-08-14 12:51:54 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2018-08-14 12:51:54 +0100 |
commit | 1af89c35663dd364c8d46523f0d71adc64f8bde4 (patch) | |
tree | 1a814dea5f5ce6045bd53f2e802dd3eb87123ca0 /src | |
parent | 0478b4ee28138a886f9b457bdba7f2897e151093 (diff) | |
download | rspamd-1af89c35663dd364c8d46523f0d71adc64f8bde4.tar.gz rspamd-1af89c35663dd364c8d46523f0d71adc64f8bde4.zip |
[Fix] Add fail-safety for destroying sessions
Diffstat (limited to 'src')
-rw-r--r-- | src/libserver/dns.c | 4 | ||||
-rw-r--r-- | src/libserver/events.c | 15 | ||||
-rw-r--r-- | src/libserver/events.h | 7 | ||||
-rw-r--r-- | src/libstat/backends/redis_backend.c | 7 | ||||
-rw-r--r-- | src/libstat/learn_cache/redis_cache.c | 8 | ||||
-rw-r--r-- | src/lua/lua_http.c | 8 | ||||
-rw-r--r-- | src/lua/lua_redis.c | 11 | ||||
-rw-r--r-- | src/lua/lua_tcp.c | 17 | ||||
-rw-r--r-- | src/plugins/fuzzy_check.c | 167 | ||||
-rw-r--r-- | src/plugins/surbl.c | 95 |
10 files changed, 208 insertions, 131 deletions
diff --git a/src/libserver/dns.c b/src/libserver/dns.c index 6d488ccf3..fbf37363a 100644 --- a/src/libserver/dns.c +++ b/src/libserver/dns.c @@ -122,6 +122,10 @@ make_dns_request (struct rspamd_dns_resolver *resolver, return FALSE; } + if (session && rspamd_session_is_destroying (session)) { + return FALSE; + } + if (pool != NULL) { reqdata = rspamd_mempool_alloc0 (pool, sizeof (struct rspamd_dns_request_ud)); diff --git a/src/libserver/events.c b/src/libserver/events.c index 9b0d049d4..f62005b96 100644 --- a/src/libserver/events.c +++ b/src/libserver/events.c @@ -172,6 +172,13 @@ rspamd_session_add_event (struct rspamd_async_session *session, g_assert_not_reached (); } + if (RSPAMD_SESSION_IS_DESTROYING (session)) { + msg_debug_session ("skip adding event subsystem: %s: session is destroying", + g_quark_to_string (subsystem)); + + return NULL; + } + new_event = rspamd_mempool_alloc (session->pool, sizeof (struct rspamd_async_event)); new_event->fin = fin; @@ -502,4 +509,12 @@ rspamd_session_mempool (struct rspamd_async_session *session) g_assert (session != NULL); return session->pool; +} + +gboolean +rspamd_session_is_destroying (struct rspamd_async_session *session) +{ + g_assert (session != NULL); + + return RSPAMD_SESSION_IS_DESTROYING (session); }
\ No newline at end of file diff --git a/src/libserver/events.h b/src/libserver/events.h index 760bb000c..10ccb8d1d 100644 --- a/src/libserver/events.h +++ b/src/libserver/events.h @@ -148,4 +148,11 @@ void rspamd_session_watcher_pop (struct rspamd_async_session *s, struct rspamd_async_watcher* rspamd_session_get_watcher ( struct rspamd_async_session *s); +/** + * Returns TRUE if an async session is currently destroying + * @param s + * @return + */ +gboolean rspamd_session_is_destroying (struct rspamd_async_session *s); + #endif /* RSPAMD_EVENTS_H */ diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c index 69c14e167..5510cff05 100644 --- a/src/libstat/backends/redis_backend.c +++ b/src/libstat/backends/redis_backend.c @@ -1572,6 +1572,9 @@ rspamd_redis_process_tokens (struct rspamd_task *task, gint ret; const gchar *learned_key = "learns"; + if (rspamd_session_is_destroying (task->s)) { + return FALSE; + } if (tokens == NULL || tokens->len == 0 || rt->redis == NULL) { return FALSE; @@ -1664,6 +1667,10 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens, goffset off; const gchar *learned_key = "learns"; + if (rspamd_session_is_destroying (task->s)) { + return FALSE; + } + up = rspamd_upstream_get (rt->ctx->write_servers, RSPAMD_UPSTREAM_MASTER_SLAVE, NULL, diff --git a/src/libstat/learn_cache/redis_cache.c b/src/libstat/learn_cache/redis_cache.c index d43ec3665..e17f20d27 100644 --- a/src/libstat/learn_cache/redis_cache.c +++ b/src/libstat/learn_cache/redis_cache.c @@ -438,6 +438,10 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task, struct timeval tv; gchar *h; + if (rspamd_session_is_destroying (task->s)) { + return RSPAMD_LEARN_INGORE; + } + h = rspamd_mempool_get_variable (task->task_pool, "words_hash"); if (h == NULL) { @@ -469,6 +473,10 @@ rspamd_stat_cache_redis_learn (struct rspamd_task *task, gchar *h; gint flag; + if (rspamd_session_is_destroying (task->s)) { + return RSPAMD_LEARN_INGORE; + } + h = rspamd_mempool_get_variable (task->task_pool, "words_hash"); g_assert (h != NULL); diff --git a/src/lua/lua_http.c b/src/lua/lua_http.c index b9a2d7b59..cb2026421 100644 --- a/src/lua/lua_http.c +++ b/src/lua/lua_http.c @@ -377,7 +377,7 @@ lua_http_request (lua_State *L) struct rspamd_http_message *msg; struct lua_http_cbdata *cbd; struct rspamd_dns_resolver *resolver; - struct rspamd_async_session *session; + struct rspamd_async_session *session = NULL; struct rspamd_lua_text *t; struct rspamd_task *task = NULL; struct rspamd_config *cfg = NULL; @@ -700,6 +700,12 @@ lua_http_request (lua_State *L) return 1; } + if (session && rspamd_session_is_destroying (session)) { + lua_pushboolean (L, FALSE); + + return 1; + } + cbd = g_malloc0 (sizeof (*cbd)); cbd->L = L; cbd->cbref = cbref; diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c index cf416a12a..82ef0c53b 100644 --- a/src/lua/lua_redis.c +++ b/src/lua/lua_redis.c @@ -675,6 +675,10 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref) lua_pop (L, 1); /* table */ + if (session && rspamd_session_is_destroying (session)) { + ret = FALSE; + } + if (ret && addr != NULL) { ctx = g_malloc0 (sizeof (struct lua_redis_ctx)); @@ -1200,6 +1204,13 @@ lua_redis_add_cmd (lua_State *L) LL_PREPEND (sp_ud->c->specific, sp_ud); + if (ud->s && rspamd_session_is_destroying (ud->s)) { + lua_pushboolean (L, 0); + lua_pushstring (L, "session is terminating"); + + return 2; + } + ret = redisAsyncCommandArgv (sp_ud->c->ctx, lua_redis_callback, sp_ud, diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c index 6c3f6ec04..797bdcc4e 100644 --- a/src/lua/lua_tcp.c +++ b/src/lua/lua_tcp.c @@ -804,7 +804,8 @@ lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd, gboolean can_read, } } -static void + +static gboolean lua_tcp_register_event (struct lua_tcp_cbdata *cbd) { if (cbd->session) { @@ -812,9 +813,16 @@ lua_tcp_register_event (struct lua_tcp_cbdata *cbd) (event_finalizer_t) lua_tcp_fin, cbd, g_quark_from_static_string ("lua tcp")); + + if (!cbd->async_ev) { + return FALSE; + } + cbd->w = rspamd_session_get_watcher (cbd->session); rspamd_session_watcher_push (cbd->session); } + + return TRUE; } static gboolean @@ -1232,6 +1240,13 @@ lua_tcp_request (lua_State *L) if (session) { cbd->session = session; + + if (rspamd_session_is_destroying (session)) { + REF_RELEASE (cbd); + lua_pushboolean (L, FALSE); + + return 1; + } } if (rspamd_parse_inet_address (&cbd->addr, host, 0)) { diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index b5c390328..c0fd8aa4c 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -2830,49 +2830,50 @@ register_fuzzy_client_call (struct rspamd_task *task, rspamd_inet_addr_t *addr; gint sock; - /* Get upstream */ - selected = rspamd_upstream_get (rule->servers, RSPAMD_UPSTREAM_ROUND_ROBIN, - NULL, 0); - if (selected) { - addr = rspamd_upstream_addr (selected); - if ((sock = rspamd_inet_address_connect (addr, SOCK_DGRAM, TRUE)) == -1) { - msg_warn_task ("cannot connect to %s(%s), %d, %s", - rspamd_upstream_name (selected), - rspamd_inet_address_to_string_pretty (addr), - errno, - strerror (errno)); - rspamd_upstream_fail (selected, FALSE); - g_ptr_array_free (commands, TRUE); - } - else { - /* Create session for a socket */ - session = - rspamd_mempool_alloc0 (task->task_pool, - sizeof (struct fuzzy_client_session)); - msec_to_tv (rule->ctx->io_timeout, &session->tv); - session->state = 0; - session->commands = commands; - session->task = task; - session->fd = sock; - session->server = selected; - session->rule = rule; - session->addr = addr; - session->results = g_ptr_array_sized_new (32); - - event_set (&session->ev, sock, EV_WRITE, fuzzy_check_io_callback, - session); - event_base_set (session->task->ev_base, &session->ev); - event_add (&session->ev, NULL); + if (!rspamd_session_is_destroying (task->s)) { + /* Get upstream */ + selected = rspamd_upstream_get (rule->servers, RSPAMD_UPSTREAM_ROUND_ROBIN, + NULL, 0); + if (selected) { + addr = rspamd_upstream_addr (selected); + if ((sock = rspamd_inet_address_connect (addr, SOCK_DGRAM, TRUE)) == -1) { + msg_warn_task ("cannot connect to %s(%s), %d, %s", + rspamd_upstream_name (selected), + rspamd_inet_address_to_string_pretty (addr), + errno, + strerror (errno)); + rspamd_upstream_fail (selected, FALSE); + g_ptr_array_free (commands, TRUE); + } else { + /* Create session for a socket */ + session = + rspamd_mempool_alloc0 (task->task_pool, + sizeof (struct fuzzy_client_session)); + msec_to_tv (rule->ctx->io_timeout, &session->tv); + session->state = 0; + session->commands = commands; + session->task = task; + session->fd = sock; + session->server = selected; + session->rule = rule; + session->addr = addr; + session->results = g_ptr_array_sized_new (32); + + event_set (&session->ev, sock, EV_WRITE, fuzzy_check_io_callback, + session); + event_base_set (session->task->ev_base, &session->ev); + event_add (&session->ev, NULL); - evtimer_set (&session->timev, fuzzy_check_timer_callback, - session); - event_base_set (session->task->ev_base, &session->timev); - event_add (&session->timev, &session->tv); + evtimer_set (&session->timev, fuzzy_check_timer_callback, + session); + event_base_set (session->task->ev_base, &session->timev); + event_add (&session->timev, &session->tv); - rspamd_session_add_event (task->s, - fuzzy_io_fin, - session, - g_quark_from_static_string ("fuzzy check")); + rspamd_session_add_event (task->s, + fuzzy_io_fin, + session, + g_quark_from_static_string ("fuzzy check")); + } } } } @@ -3310,48 +3311,48 @@ fuzzy_check_send_lua_learn (struct fuzzy_rule *rule, gint ret = -1; /* Get upstream */ - - while ((selected = rspamd_upstream_get (rule->servers, - RSPAMD_UPSTREAM_SEQUENTIAL, NULL, 0))) { - /* Create UDP socket */ - addr = rspamd_upstream_addr (selected); - - if ((sock = rspamd_inet_address_connect (addr, - SOCK_DGRAM, TRUE)) == -1) { - rspamd_upstream_fail (selected, TRUE); - } - else { - s = - rspamd_mempool_alloc0 (task->task_pool, - sizeof (struct fuzzy_learn_session)); - - msec_to_tv (rule->ctx->io_timeout, &s->tv); - s->task = task; - s->addr = addr; - s->commands = commands; - s->http_entry = NULL; - s->server = selected; - s->saved = saved; - s->fd = sock; - s->err = err; - s->rule = rule; - s->session = task->s; - - event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s); - event_base_set (task->ev_base, &s->ev); - event_add (&s->ev, NULL); - - evtimer_set (&s->timev, fuzzy_controller_timer_callback, s); - event_base_set (s->task->ev_base, &s->timev); - event_add (&s->timev, &s->tv); - - rspamd_session_add_event (task->s, - fuzzy_lua_fin, - s, - g_quark_from_static_string ("fuzzy check")); - - (*saved)++; - ret = 1; + if (!rspamd_session_is_destroying (task->s)) { + while ((selected = rspamd_upstream_get (rule->servers, + RSPAMD_UPSTREAM_SEQUENTIAL, NULL, 0))) { + /* Create UDP socket */ + addr = rspamd_upstream_addr (selected); + + if ((sock = rspamd_inet_address_connect (addr, + SOCK_DGRAM, TRUE)) == -1) { + rspamd_upstream_fail (selected, TRUE); + } else { + s = + rspamd_mempool_alloc0 (task->task_pool, + sizeof (struct fuzzy_learn_session)); + + msec_to_tv (rule->ctx->io_timeout, &s->tv); + s->task = task; + s->addr = addr; + s->commands = commands; + s->http_entry = NULL; + s->server = selected; + s->saved = saved; + s->fd = sock; + s->err = err; + s->rule = rule; + s->session = task->s; + + event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s); + event_base_set (task->ev_base, &s->ev); + event_add (&s->ev, NULL); + + evtimer_set (&s->timev, fuzzy_controller_timer_callback, s); + event_base_set (s->task->ev_base, &s->timev); + event_add (&s->timev, &s->tv); + + rspamd_session_add_event (task->s, + fuzzy_lua_fin, + s, + g_quark_from_static_string ("fuzzy check")); + + (*saved)++; + ret = 1; + } } } diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c index 81496f0a9..5b2375888 100644 --- a/src/plugins/surbl.c +++ b/src/plugins/surbl.c @@ -1638,55 +1638,58 @@ register_redirector_call (struct rspamd_url *url, struct rspamd_task *task, struct rspamd_http_message *msg; struct surbl_ctx *surbl_module_ctx = surbl_get_context (task->cfg); - selected = rspamd_upstream_get (surbl_module_ctx->redirectors, - RSPAMD_UPSTREAM_ROUND_ROBIN, url->host, url->hostlen); + if (!rspamd_session_is_destroying (task->s)) { - if (selected) { - s = rspamd_inet_address_connect (rspamd_upstream_addr (selected), - SOCK_STREAM, TRUE); - } + selected = rspamd_upstream_get (surbl_module_ctx->redirectors, + RSPAMD_UPSTREAM_ROUND_ROBIN, url->host, url->hostlen); - if (s == -1) { - msg_info_surbl ("<%s> cannot create tcp socket failed: %s", - task->message_id, - strerror (errno)); - return; - } + if (selected) { + s = rspamd_inet_address_connect (rspamd_upstream_addr (selected), + SOCK_STREAM, TRUE); + } - param = - rspamd_mempool_alloc (task->task_pool, - sizeof (struct redirector_param)); - param->url = url; - param->task = task; - param->conn = rspamd_http_connection_new (NULL, - surbl_redirector_error, - surbl_redirector_finish, - RSPAMD_HTTP_CLIENT_SIMPLE, - RSPAMD_HTTP_CLIENT, - NULL, - NULL); - param->ctx = surbl_module_ctx; - msg = rspamd_http_new_message (HTTP_REQUEST); - msg->url = rspamd_fstring_assign (msg->url, url->string, url->urllen); - param->sock = s; - param->redirector = selected; - timeout = rspamd_mempool_alloc (task->task_pool, sizeof (struct timeval)); - double_to_tv (surbl_module_ctx->read_timeout, timeout); - - rspamd_session_add_event (task->s, - free_redirector_session, - param, - g_quark_from_static_string ("surbl")); - - rspamd_http_connection_write_message (param->conn, msg, NULL, - NULL, param, s, timeout, task->ev_base); - - msg_info_surbl ( - "<%s> registered redirector call for %*s to %s, according to rule: %s", - task->message_id, - url->urllen, url->string, - rspamd_upstream_name (param->redirector), - rule); + if (s == -1) { + msg_info_surbl ("<%s> cannot create tcp socket failed: %s", + task->message_id, + strerror (errno)); + return; + } + + param = + rspamd_mempool_alloc (task->task_pool, + sizeof (struct redirector_param)); + param->url = url; + param->task = task; + param->conn = rspamd_http_connection_new (NULL, + surbl_redirector_error, + surbl_redirector_finish, + RSPAMD_HTTP_CLIENT_SIMPLE, + RSPAMD_HTTP_CLIENT, + NULL, + NULL); + param->ctx = surbl_module_ctx; + msg = rspamd_http_new_message (HTTP_REQUEST); + msg->url = rspamd_fstring_assign (msg->url, url->string, url->urllen); + param->sock = s; + param->redirector = selected; + timeout = rspamd_mempool_alloc (task->task_pool, sizeof (struct timeval)); + double_to_tv (surbl_module_ctx->read_timeout, timeout); + + rspamd_session_add_event (task->s, + free_redirector_session, + param, + g_quark_from_static_string ("surbl")); + + rspamd_http_connection_write_message (param->conn, msg, NULL, + NULL, param, s, timeout, task->ev_base); + + msg_info_surbl ( + "<%s> registered redirector call for %*s to %s, according to rule: %s", + task->message_id, + url->urllen, url->string, + rspamd_upstream_name (param->redirector), + rule); + } } static gboolean |