aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2018-08-14 12:51:54 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2018-08-14 12:51:54 +0100
commit1af89c35663dd364c8d46523f0d71adc64f8bde4 (patch)
tree1a814dea5f5ce6045bd53f2e802dd3eb87123ca0 /src
parent0478b4ee28138a886f9b457bdba7f2897e151093 (diff)
downloadrspamd-1af89c35663dd364c8d46523f0d71adc64f8bde4.tar.gz
rspamd-1af89c35663dd364c8d46523f0d71adc64f8bde4.zip
[Fix] Add fail-safety for destroying sessions
Diffstat (limited to 'src')
-rw-r--r--src/libserver/dns.c4
-rw-r--r--src/libserver/events.c15
-rw-r--r--src/libserver/events.h7
-rw-r--r--src/libstat/backends/redis_backend.c7
-rw-r--r--src/libstat/learn_cache/redis_cache.c8
-rw-r--r--src/lua/lua_http.c8
-rw-r--r--src/lua/lua_redis.c11
-rw-r--r--src/lua/lua_tcp.c17
-rw-r--r--src/plugins/fuzzy_check.c167
-rw-r--r--src/plugins/surbl.c95
10 files changed, 208 insertions, 131 deletions
diff --git a/src/libserver/dns.c b/src/libserver/dns.c
index 6d488ccf3..fbf37363a 100644
--- a/src/libserver/dns.c
+++ b/src/libserver/dns.c
@@ -122,6 +122,10 @@ make_dns_request (struct rspamd_dns_resolver *resolver,
return FALSE;
}
+ if (session && rspamd_session_is_destroying (session)) {
+ return FALSE;
+ }
+
if (pool != NULL) {
reqdata =
rspamd_mempool_alloc0 (pool, sizeof (struct rspamd_dns_request_ud));
diff --git a/src/libserver/events.c b/src/libserver/events.c
index 9b0d049d4..f62005b96 100644
--- a/src/libserver/events.c
+++ b/src/libserver/events.c
@@ -172,6 +172,13 @@ rspamd_session_add_event (struct rspamd_async_session *session,
g_assert_not_reached ();
}
+ if (RSPAMD_SESSION_IS_DESTROYING (session)) {
+ msg_debug_session ("skip adding event subsystem: %s: session is destroying",
+ g_quark_to_string (subsystem));
+
+ return NULL;
+ }
+
new_event = rspamd_mempool_alloc (session->pool,
sizeof (struct rspamd_async_event));
new_event->fin = fin;
@@ -502,4 +509,12 @@ rspamd_session_mempool (struct rspamd_async_session *session)
g_assert (session != NULL);
return session->pool;
+}
+
+gboolean
+rspamd_session_is_destroying (struct rspamd_async_session *session)
+{
+ g_assert (session != NULL);
+
+ return RSPAMD_SESSION_IS_DESTROYING (session);
} \ No newline at end of file
diff --git a/src/libserver/events.h b/src/libserver/events.h
index 760bb000c..10ccb8d1d 100644
--- a/src/libserver/events.h
+++ b/src/libserver/events.h
@@ -148,4 +148,11 @@ void rspamd_session_watcher_pop (struct rspamd_async_session *s,
struct rspamd_async_watcher* rspamd_session_get_watcher (
struct rspamd_async_session *s);
+/**
+ * Returns TRUE if an async session is currently destroying
+ * @param s
+ * @return
+ */
+gboolean rspamd_session_is_destroying (struct rspamd_async_session *s);
+
#endif /* RSPAMD_EVENTS_H */
diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c
index 69c14e167..5510cff05 100644
--- a/src/libstat/backends/redis_backend.c
+++ b/src/libstat/backends/redis_backend.c
@@ -1572,6 +1572,9 @@ rspamd_redis_process_tokens (struct rspamd_task *task,
gint ret;
const gchar *learned_key = "learns";
+ if (rspamd_session_is_destroying (task->s)) {
+ return FALSE;
+ }
if (tokens == NULL || tokens->len == 0 || rt->redis == NULL) {
return FALSE;
@@ -1664,6 +1667,10 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
goffset off;
const gchar *learned_key = "learns";
+ if (rspamd_session_is_destroying (task->s)) {
+ return FALSE;
+ }
+
up = rspamd_upstream_get (rt->ctx->write_servers,
RSPAMD_UPSTREAM_MASTER_SLAVE,
NULL,
diff --git a/src/libstat/learn_cache/redis_cache.c b/src/libstat/learn_cache/redis_cache.c
index d43ec3665..e17f20d27 100644
--- a/src/libstat/learn_cache/redis_cache.c
+++ b/src/libstat/learn_cache/redis_cache.c
@@ -438,6 +438,10 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task,
struct timeval tv;
gchar *h;
+ if (rspamd_session_is_destroying (task->s)) {
+ return RSPAMD_LEARN_INGORE;
+ }
+
h = rspamd_mempool_get_variable (task->task_pool, "words_hash");
if (h == NULL) {
@@ -469,6 +473,10 @@ rspamd_stat_cache_redis_learn (struct rspamd_task *task,
gchar *h;
gint flag;
+ if (rspamd_session_is_destroying (task->s)) {
+ return RSPAMD_LEARN_INGORE;
+ }
+
h = rspamd_mempool_get_variable (task->task_pool, "words_hash");
g_assert (h != NULL);
diff --git a/src/lua/lua_http.c b/src/lua/lua_http.c
index b9a2d7b59..cb2026421 100644
--- a/src/lua/lua_http.c
+++ b/src/lua/lua_http.c
@@ -377,7 +377,7 @@ lua_http_request (lua_State *L)
struct rspamd_http_message *msg;
struct lua_http_cbdata *cbd;
struct rspamd_dns_resolver *resolver;
- struct rspamd_async_session *session;
+ struct rspamd_async_session *session = NULL;
struct rspamd_lua_text *t;
struct rspamd_task *task = NULL;
struct rspamd_config *cfg = NULL;
@@ -700,6 +700,12 @@ lua_http_request (lua_State *L)
return 1;
}
+ if (session && rspamd_session_is_destroying (session)) {
+ lua_pushboolean (L, FALSE);
+
+ return 1;
+ }
+
cbd = g_malloc0 (sizeof (*cbd));
cbd->L = L;
cbd->cbref = cbref;
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c
index cf416a12a..82ef0c53b 100644
--- a/src/lua/lua_redis.c
+++ b/src/lua/lua_redis.c
@@ -675,6 +675,10 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref)
lua_pop (L, 1); /* table */
+ if (session && rspamd_session_is_destroying (session)) {
+ ret = FALSE;
+ }
+
if (ret && addr != NULL) {
ctx = g_malloc0 (sizeof (struct lua_redis_ctx));
@@ -1200,6 +1204,13 @@ lua_redis_add_cmd (lua_State *L)
LL_PREPEND (sp_ud->c->specific, sp_ud);
+ if (ud->s && rspamd_session_is_destroying (ud->s)) {
+ lua_pushboolean (L, 0);
+ lua_pushstring (L, "session is terminating");
+
+ return 2;
+ }
+
ret = redisAsyncCommandArgv (sp_ud->c->ctx,
lua_redis_callback,
sp_ud,
diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c
index 6c3f6ec04..797bdcc4e 100644
--- a/src/lua/lua_tcp.c
+++ b/src/lua/lua_tcp.c
@@ -804,7 +804,8 @@ lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd, gboolean can_read,
}
}
-static void
+
+static gboolean
lua_tcp_register_event (struct lua_tcp_cbdata *cbd)
{
if (cbd->session) {
@@ -812,9 +813,16 @@ lua_tcp_register_event (struct lua_tcp_cbdata *cbd)
(event_finalizer_t) lua_tcp_fin,
cbd,
g_quark_from_static_string ("lua tcp"));
+
+ if (!cbd->async_ev) {
+ return FALSE;
+ }
+
cbd->w = rspamd_session_get_watcher (cbd->session);
rspamd_session_watcher_push (cbd->session);
}
+
+ return TRUE;
}
static gboolean
@@ -1232,6 +1240,13 @@ lua_tcp_request (lua_State *L)
if (session) {
cbd->session = session;
+
+ if (rspamd_session_is_destroying (session)) {
+ REF_RELEASE (cbd);
+ lua_pushboolean (L, FALSE);
+
+ return 1;
+ }
}
if (rspamd_parse_inet_address (&cbd->addr, host, 0)) {
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index b5c390328..c0fd8aa4c 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -2830,49 +2830,50 @@ register_fuzzy_client_call (struct rspamd_task *task,
rspamd_inet_addr_t *addr;
gint sock;
- /* Get upstream */
- selected = rspamd_upstream_get (rule->servers, RSPAMD_UPSTREAM_ROUND_ROBIN,
- NULL, 0);
- if (selected) {
- addr = rspamd_upstream_addr (selected);
- if ((sock = rspamd_inet_address_connect (addr, SOCK_DGRAM, TRUE)) == -1) {
- msg_warn_task ("cannot connect to %s(%s), %d, %s",
- rspamd_upstream_name (selected),
- rspamd_inet_address_to_string_pretty (addr),
- errno,
- strerror (errno));
- rspamd_upstream_fail (selected, FALSE);
- g_ptr_array_free (commands, TRUE);
- }
- else {
- /* Create session for a socket */
- session =
- rspamd_mempool_alloc0 (task->task_pool,
- sizeof (struct fuzzy_client_session));
- msec_to_tv (rule->ctx->io_timeout, &session->tv);
- session->state = 0;
- session->commands = commands;
- session->task = task;
- session->fd = sock;
- session->server = selected;
- session->rule = rule;
- session->addr = addr;
- session->results = g_ptr_array_sized_new (32);
-
- event_set (&session->ev, sock, EV_WRITE, fuzzy_check_io_callback,
- session);
- event_base_set (session->task->ev_base, &session->ev);
- event_add (&session->ev, NULL);
+ if (!rspamd_session_is_destroying (task->s)) {
+ /* Get upstream */
+ selected = rspamd_upstream_get (rule->servers, RSPAMD_UPSTREAM_ROUND_ROBIN,
+ NULL, 0);
+ if (selected) {
+ addr = rspamd_upstream_addr (selected);
+ if ((sock = rspamd_inet_address_connect (addr, SOCK_DGRAM, TRUE)) == -1) {
+ msg_warn_task ("cannot connect to %s(%s), %d, %s",
+ rspamd_upstream_name (selected),
+ rspamd_inet_address_to_string_pretty (addr),
+ errno,
+ strerror (errno));
+ rspamd_upstream_fail (selected, FALSE);
+ g_ptr_array_free (commands, TRUE);
+ } else {
+ /* Create session for a socket */
+ session =
+ rspamd_mempool_alloc0 (task->task_pool,
+ sizeof (struct fuzzy_client_session));
+ msec_to_tv (rule->ctx->io_timeout, &session->tv);
+ session->state = 0;
+ session->commands = commands;
+ session->task = task;
+ session->fd = sock;
+ session->server = selected;
+ session->rule = rule;
+ session->addr = addr;
+ session->results = g_ptr_array_sized_new (32);
+
+ event_set (&session->ev, sock, EV_WRITE, fuzzy_check_io_callback,
+ session);
+ event_base_set (session->task->ev_base, &session->ev);
+ event_add (&session->ev, NULL);
- evtimer_set (&session->timev, fuzzy_check_timer_callback,
- session);
- event_base_set (session->task->ev_base, &session->timev);
- event_add (&session->timev, &session->tv);
+ evtimer_set (&session->timev, fuzzy_check_timer_callback,
+ session);
+ event_base_set (session->task->ev_base, &session->timev);
+ event_add (&session->timev, &session->tv);
- rspamd_session_add_event (task->s,
- fuzzy_io_fin,
- session,
- g_quark_from_static_string ("fuzzy check"));
+ rspamd_session_add_event (task->s,
+ fuzzy_io_fin,
+ session,
+ g_quark_from_static_string ("fuzzy check"));
+ }
}
}
}
@@ -3310,48 +3311,48 @@ fuzzy_check_send_lua_learn (struct fuzzy_rule *rule,
gint ret = -1;
/* Get upstream */
-
- while ((selected = rspamd_upstream_get (rule->servers,
- RSPAMD_UPSTREAM_SEQUENTIAL, NULL, 0))) {
- /* Create UDP socket */
- addr = rspamd_upstream_addr (selected);
-
- if ((sock = rspamd_inet_address_connect (addr,
- SOCK_DGRAM, TRUE)) == -1) {
- rspamd_upstream_fail (selected, TRUE);
- }
- else {
- s =
- rspamd_mempool_alloc0 (task->task_pool,
- sizeof (struct fuzzy_learn_session));
-
- msec_to_tv (rule->ctx->io_timeout, &s->tv);
- s->task = task;
- s->addr = addr;
- s->commands = commands;
- s->http_entry = NULL;
- s->server = selected;
- s->saved = saved;
- s->fd = sock;
- s->err = err;
- s->rule = rule;
- s->session = task->s;
-
- event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s);
- event_base_set (task->ev_base, &s->ev);
- event_add (&s->ev, NULL);
-
- evtimer_set (&s->timev, fuzzy_controller_timer_callback, s);
- event_base_set (s->task->ev_base, &s->timev);
- event_add (&s->timev, &s->tv);
-
- rspamd_session_add_event (task->s,
- fuzzy_lua_fin,
- s,
- g_quark_from_static_string ("fuzzy check"));
-
- (*saved)++;
- ret = 1;
+ if (!rspamd_session_is_destroying (task->s)) {
+ while ((selected = rspamd_upstream_get (rule->servers,
+ RSPAMD_UPSTREAM_SEQUENTIAL, NULL, 0))) {
+ /* Create UDP socket */
+ addr = rspamd_upstream_addr (selected);
+
+ if ((sock = rspamd_inet_address_connect (addr,
+ SOCK_DGRAM, TRUE)) == -1) {
+ rspamd_upstream_fail (selected, TRUE);
+ } else {
+ s =
+ rspamd_mempool_alloc0 (task->task_pool,
+ sizeof (struct fuzzy_learn_session));
+
+ msec_to_tv (rule->ctx->io_timeout, &s->tv);
+ s->task = task;
+ s->addr = addr;
+ s->commands = commands;
+ s->http_entry = NULL;
+ s->server = selected;
+ s->saved = saved;
+ s->fd = sock;
+ s->err = err;
+ s->rule = rule;
+ s->session = task->s;
+
+ event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s);
+ event_base_set (task->ev_base, &s->ev);
+ event_add (&s->ev, NULL);
+
+ evtimer_set (&s->timev, fuzzy_controller_timer_callback, s);
+ event_base_set (s->task->ev_base, &s->timev);
+ event_add (&s->timev, &s->tv);
+
+ rspamd_session_add_event (task->s,
+ fuzzy_lua_fin,
+ s,
+ g_quark_from_static_string ("fuzzy check"));
+
+ (*saved)++;
+ ret = 1;
+ }
}
}
diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c
index 81496f0a9..5b2375888 100644
--- a/src/plugins/surbl.c
+++ b/src/plugins/surbl.c
@@ -1638,55 +1638,58 @@ register_redirector_call (struct rspamd_url *url, struct rspamd_task *task,
struct rspamd_http_message *msg;
struct surbl_ctx *surbl_module_ctx = surbl_get_context (task->cfg);
- selected = rspamd_upstream_get (surbl_module_ctx->redirectors,
- RSPAMD_UPSTREAM_ROUND_ROBIN, url->host, url->hostlen);
+ if (!rspamd_session_is_destroying (task->s)) {
- if (selected) {
- s = rspamd_inet_address_connect (rspamd_upstream_addr (selected),
- SOCK_STREAM, TRUE);
- }
+ selected = rspamd_upstream_get (surbl_module_ctx->redirectors,
+ RSPAMD_UPSTREAM_ROUND_ROBIN, url->host, url->hostlen);
- if (s == -1) {
- msg_info_surbl ("<%s> cannot create tcp socket failed: %s",
- task->message_id,
- strerror (errno));
- return;
- }
+ if (selected) {
+ s = rspamd_inet_address_connect (rspamd_upstream_addr (selected),
+ SOCK_STREAM, TRUE);
+ }
- param =
- rspamd_mempool_alloc (task->task_pool,
- sizeof (struct redirector_param));
- param->url = url;
- param->task = task;
- param->conn = rspamd_http_connection_new (NULL,
- surbl_redirector_error,
- surbl_redirector_finish,
- RSPAMD_HTTP_CLIENT_SIMPLE,
- RSPAMD_HTTP_CLIENT,
- NULL,
- NULL);
- param->ctx = surbl_module_ctx;
- msg = rspamd_http_new_message (HTTP_REQUEST);
- msg->url = rspamd_fstring_assign (msg->url, url->string, url->urllen);
- param->sock = s;
- param->redirector = selected;
- timeout = rspamd_mempool_alloc (task->task_pool, sizeof (struct timeval));
- double_to_tv (surbl_module_ctx->read_timeout, timeout);
-
- rspamd_session_add_event (task->s,
- free_redirector_session,
- param,
- g_quark_from_static_string ("surbl"));
-
- rspamd_http_connection_write_message (param->conn, msg, NULL,
- NULL, param, s, timeout, task->ev_base);
-
- msg_info_surbl (
- "<%s> registered redirector call for %*s to %s, according to rule: %s",
- task->message_id,
- url->urllen, url->string,
- rspamd_upstream_name (param->redirector),
- rule);
+ if (s == -1) {
+ msg_info_surbl ("<%s> cannot create tcp socket failed: %s",
+ task->message_id,
+ strerror (errno));
+ return;
+ }
+
+ param =
+ rspamd_mempool_alloc (task->task_pool,
+ sizeof (struct redirector_param));
+ param->url = url;
+ param->task = task;
+ param->conn = rspamd_http_connection_new (NULL,
+ surbl_redirector_error,
+ surbl_redirector_finish,
+ RSPAMD_HTTP_CLIENT_SIMPLE,
+ RSPAMD_HTTP_CLIENT,
+ NULL,
+ NULL);
+ param->ctx = surbl_module_ctx;
+ msg = rspamd_http_new_message (HTTP_REQUEST);
+ msg->url = rspamd_fstring_assign (msg->url, url->string, url->urllen);
+ param->sock = s;
+ param->redirector = selected;
+ timeout = rspamd_mempool_alloc (task->task_pool, sizeof (struct timeval));
+ double_to_tv (surbl_module_ctx->read_timeout, timeout);
+
+ rspamd_session_add_event (task->s,
+ free_redirector_session,
+ param,
+ g_quark_from_static_string ("surbl"));
+
+ rspamd_http_connection_write_message (param->conn, msg, NULL,
+ NULL, param, s, timeout, task->ev_base);
+
+ msg_info_surbl (
+ "<%s> registered redirector call for %*s to %s, according to rule: %s",
+ task->message_id,
+ url->urllen, url->string,
+ rspamd_upstream_name (param->redirector),
+ rule);
+ }
}
static gboolean