Browse Source

[Fix] Add fail-safety for destroying sessions

tags/1.8.0
Vsevolod Stakhov 5 years ago
parent
commit
1af89c3566

+ 4
- 0
src/libserver/dns.c View File

@@ -122,6 +122,10 @@ make_dns_request (struct rspamd_dns_resolver *resolver,
return FALSE;
}

if (session && rspamd_session_is_destroying (session)) {
return FALSE;
}

if (pool != NULL) {
reqdata =
rspamd_mempool_alloc0 (pool, sizeof (struct rspamd_dns_request_ud));

+ 15
- 0
src/libserver/events.c View File

@@ -172,6 +172,13 @@ rspamd_session_add_event (struct rspamd_async_session *session,
g_assert_not_reached ();
}

if (RSPAMD_SESSION_IS_DESTROYING (session)) {
msg_debug_session ("skip adding event subsystem: %s: session is destroying",
g_quark_to_string (subsystem));

return NULL;
}

new_event = rspamd_mempool_alloc (session->pool,
sizeof (struct rspamd_async_event));
new_event->fin = fin;
@@ -502,4 +509,12 @@ rspamd_session_mempool (struct rspamd_async_session *session)
g_assert (session != NULL);

return session->pool;
}

gboolean
rspamd_session_is_destroying (struct rspamd_async_session *session)
{
g_assert (session != NULL);

return RSPAMD_SESSION_IS_DESTROYING (session);
}

+ 7
- 0
src/libserver/events.h View File

@@ -148,4 +148,11 @@ void rspamd_session_watcher_pop (struct rspamd_async_session *s,
struct rspamd_async_watcher* rspamd_session_get_watcher (
struct rspamd_async_session *s);

/**
* Returns TRUE if an async session is currently destroying
* @param s
* @return
*/
gboolean rspamd_session_is_destroying (struct rspamd_async_session *s);

#endif /* RSPAMD_EVENTS_H */

+ 7
- 0
src/libstat/backends/redis_backend.c View File

@@ -1572,6 +1572,9 @@ rspamd_redis_process_tokens (struct rspamd_task *task,
gint ret;
const gchar *learned_key = "learns";

if (rspamd_session_is_destroying (task->s)) {
return FALSE;
}

if (tokens == NULL || tokens->len == 0 || rt->redis == NULL) {
return FALSE;
@@ -1664,6 +1667,10 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
goffset off;
const gchar *learned_key = "learns";

if (rspamd_session_is_destroying (task->s)) {
return FALSE;
}

up = rspamd_upstream_get (rt->ctx->write_servers,
RSPAMD_UPSTREAM_MASTER_SLAVE,
NULL,

+ 8
- 0
src/libstat/learn_cache/redis_cache.c View File

@@ -438,6 +438,10 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task,
struct timeval tv;
gchar *h;

if (rspamd_session_is_destroying (task->s)) {
return RSPAMD_LEARN_INGORE;
}

h = rspamd_mempool_get_variable (task->task_pool, "words_hash");

if (h == NULL) {
@@ -469,6 +473,10 @@ rspamd_stat_cache_redis_learn (struct rspamd_task *task,
gchar *h;
gint flag;

if (rspamd_session_is_destroying (task->s)) {
return RSPAMD_LEARN_INGORE;
}

h = rspamd_mempool_get_variable (task->task_pool, "words_hash");
g_assert (h != NULL);


+ 7
- 1
src/lua/lua_http.c View File

@@ -377,7 +377,7 @@ lua_http_request (lua_State *L)
struct rspamd_http_message *msg;
struct lua_http_cbdata *cbd;
struct rspamd_dns_resolver *resolver;
struct rspamd_async_session *session;
struct rspamd_async_session *session = NULL;
struct rspamd_lua_text *t;
struct rspamd_task *task = NULL;
struct rspamd_config *cfg = NULL;
@@ -700,6 +700,12 @@ lua_http_request (lua_State *L)
return 1;
}

if (session && rspamd_session_is_destroying (session)) {
lua_pushboolean (L, FALSE);

return 1;
}

cbd = g_malloc0 (sizeof (*cbd));
cbd->L = L;
cbd->cbref = cbref;

+ 11
- 0
src/lua/lua_redis.c View File

@@ -675,6 +675,10 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref)

lua_pop (L, 1); /* table */

if (session && rspamd_session_is_destroying (session)) {
ret = FALSE;
}


if (ret && addr != NULL) {
ctx = g_malloc0 (sizeof (struct lua_redis_ctx));
@@ -1200,6 +1204,13 @@ lua_redis_add_cmd (lua_State *L)

LL_PREPEND (sp_ud->c->specific, sp_ud);

if (ud->s && rspamd_session_is_destroying (ud->s)) {
lua_pushboolean (L, 0);
lua_pushstring (L, "session is terminating");

return 2;
}

ret = redisAsyncCommandArgv (sp_ud->c->ctx,
lua_redis_callback,
sp_ud,

+ 16
- 1
src/lua/lua_tcp.c View File

@@ -804,7 +804,8 @@ lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd, gboolean can_read,
}
}

static void

static gboolean
lua_tcp_register_event (struct lua_tcp_cbdata *cbd)
{
if (cbd->session) {
@@ -812,9 +813,16 @@ lua_tcp_register_event (struct lua_tcp_cbdata *cbd)
(event_finalizer_t) lua_tcp_fin,
cbd,
g_quark_from_static_string ("lua tcp"));

if (!cbd->async_ev) {
return FALSE;
}

cbd->w = rspamd_session_get_watcher (cbd->session);
rspamd_session_watcher_push (cbd->session);
}

return TRUE;
}

static gboolean
@@ -1232,6 +1240,13 @@ lua_tcp_request (lua_State *L)

if (session) {
cbd->session = session;

if (rspamd_session_is_destroying (session)) {
REF_RELEASE (cbd);
lua_pushboolean (L, FALSE);

return 1;
}
}

if (rspamd_parse_inet_address (&cbd->addr, host, 0)) {

+ 84
- 83
src/plugins/fuzzy_check.c View File

@@ -2830,49 +2830,50 @@ register_fuzzy_client_call (struct rspamd_task *task,
rspamd_inet_addr_t *addr;
gint sock;

/* Get upstream */
selected = rspamd_upstream_get (rule->servers, RSPAMD_UPSTREAM_ROUND_ROBIN,
NULL, 0);
if (selected) {
addr = rspamd_upstream_addr (selected);
if ((sock = rspamd_inet_address_connect (addr, SOCK_DGRAM, TRUE)) == -1) {
msg_warn_task ("cannot connect to %s(%s), %d, %s",
rspamd_upstream_name (selected),
rspamd_inet_address_to_string_pretty (addr),
errno,
strerror (errno));
rspamd_upstream_fail (selected, FALSE);
g_ptr_array_free (commands, TRUE);
}
else {
/* Create session for a socket */
session =
rspamd_mempool_alloc0 (task->task_pool,
sizeof (struct fuzzy_client_session));
msec_to_tv (rule->ctx->io_timeout, &session->tv);
session->state = 0;
session->commands = commands;
session->task = task;
session->fd = sock;
session->server = selected;
session->rule = rule;
session->addr = addr;
session->results = g_ptr_array_sized_new (32);
event_set (&session->ev, sock, EV_WRITE, fuzzy_check_io_callback,
session);
event_base_set (session->task->ev_base, &session->ev);
event_add (&session->ev, NULL);
if (!rspamd_session_is_destroying (task->s)) {
/* Get upstream */
selected = rspamd_upstream_get (rule->servers, RSPAMD_UPSTREAM_ROUND_ROBIN,
NULL, 0);
if (selected) {
addr = rspamd_upstream_addr (selected);
if ((sock = rspamd_inet_address_connect (addr, SOCK_DGRAM, TRUE)) == -1) {
msg_warn_task ("cannot connect to %s(%s), %d, %s",
rspamd_upstream_name (selected),
rspamd_inet_address_to_string_pretty (addr),
errno,
strerror (errno));
rspamd_upstream_fail (selected, FALSE);
g_ptr_array_free (commands, TRUE);
} else {
/* Create session for a socket */
session =
rspamd_mempool_alloc0 (task->task_pool,
sizeof (struct fuzzy_client_session));
msec_to_tv (rule->ctx->io_timeout, &session->tv);
session->state = 0;
session->commands = commands;
session->task = task;
session->fd = sock;
session->server = selected;
session->rule = rule;
session->addr = addr;
session->results = g_ptr_array_sized_new (32);
event_set (&session->ev, sock, EV_WRITE, fuzzy_check_io_callback,
session);
event_base_set (session->task->ev_base, &session->ev);
event_add (&session->ev, NULL);

evtimer_set (&session->timev, fuzzy_check_timer_callback,
session);
event_base_set (session->task->ev_base, &session->timev);
event_add (&session->timev, &session->tv);
evtimer_set (&session->timev, fuzzy_check_timer_callback,
session);
event_base_set (session->task->ev_base, &session->timev);
event_add (&session->timev, &session->tv);

rspamd_session_add_event (task->s,
fuzzy_io_fin,
session,
g_quark_from_static_string ("fuzzy check"));
rspamd_session_add_event (task->s,
fuzzy_io_fin,
session,
g_quark_from_static_string ("fuzzy check"));
}
}
}
}
@@ -3310,48 +3311,48 @@ fuzzy_check_send_lua_learn (struct fuzzy_rule *rule,
gint ret = -1;

/* Get upstream */
while ((selected = rspamd_upstream_get (rule->servers,
RSPAMD_UPSTREAM_SEQUENTIAL, NULL, 0))) {
/* Create UDP socket */
addr = rspamd_upstream_addr (selected);
if ((sock = rspamd_inet_address_connect (addr,
SOCK_DGRAM, TRUE)) == -1) {
rspamd_upstream_fail (selected, TRUE);
}
else {
s =
rspamd_mempool_alloc0 (task->task_pool,
sizeof (struct fuzzy_learn_session));
msec_to_tv (rule->ctx->io_timeout, &s->tv);
s->task = task;
s->addr = addr;
s->commands = commands;
s->http_entry = NULL;
s->server = selected;
s->saved = saved;
s->fd = sock;
s->err = err;
s->rule = rule;
s->session = task->s;
event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s);
event_base_set (task->ev_base, &s->ev);
event_add (&s->ev, NULL);
evtimer_set (&s->timev, fuzzy_controller_timer_callback, s);
event_base_set (s->task->ev_base, &s->timev);
event_add (&s->timev, &s->tv);
rspamd_session_add_event (task->s,
fuzzy_lua_fin,
s,
g_quark_from_static_string ("fuzzy check"));
(*saved)++;
ret = 1;
if (!rspamd_session_is_destroying (task->s)) {
while ((selected = rspamd_upstream_get (rule->servers,
RSPAMD_UPSTREAM_SEQUENTIAL, NULL, 0))) {
/* Create UDP socket */
addr = rspamd_upstream_addr (selected);
if ((sock = rspamd_inet_address_connect (addr,
SOCK_DGRAM, TRUE)) == -1) {
rspamd_upstream_fail (selected, TRUE);
} else {
s =
rspamd_mempool_alloc0 (task->task_pool,
sizeof (struct fuzzy_learn_session));
msec_to_tv (rule->ctx->io_timeout, &s->tv);
s->task = task;
s->addr = addr;
s->commands = commands;
s->http_entry = NULL;
s->server = selected;
s->saved = saved;
s->fd = sock;
s->err = err;
s->rule = rule;
s->session = task->s;
event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s);
event_base_set (task->ev_base, &s->ev);
event_add (&s->ev, NULL);
evtimer_set (&s->timev, fuzzy_controller_timer_callback, s);
event_base_set (s->task->ev_base, &s->timev);
event_add (&s->timev, &s->tv);
rspamd_session_add_event (task->s,
fuzzy_lua_fin,
s,
g_quark_from_static_string ("fuzzy check"));
(*saved)++;
ret = 1;
}
}
}


+ 49
- 46
src/plugins/surbl.c View File

@@ -1638,55 +1638,58 @@ register_redirector_call (struct rspamd_url *url, struct rspamd_task *task,
struct rspamd_http_message *msg;
struct surbl_ctx *surbl_module_ctx = surbl_get_context (task->cfg);

selected = rspamd_upstream_get (surbl_module_ctx->redirectors,
RSPAMD_UPSTREAM_ROUND_ROBIN, url->host, url->hostlen);
if (!rspamd_session_is_destroying (task->s)) {

if (selected) {
s = rspamd_inet_address_connect (rspamd_upstream_addr (selected),
SOCK_STREAM, TRUE);
}
selected = rspamd_upstream_get (surbl_module_ctx->redirectors,
RSPAMD_UPSTREAM_ROUND_ROBIN, url->host, url->hostlen);

if (s == -1) {
msg_info_surbl ("<%s> cannot create tcp socket failed: %s",
task->message_id,
strerror (errno));
return;
}
if (selected) {
s = rspamd_inet_address_connect (rspamd_upstream_addr (selected),
SOCK_STREAM, TRUE);
}

param =
rspamd_mempool_alloc (task->task_pool,
sizeof (struct redirector_param));
param->url = url;
param->task = task;
param->conn = rspamd_http_connection_new (NULL,
surbl_redirector_error,
surbl_redirector_finish,
RSPAMD_HTTP_CLIENT_SIMPLE,
RSPAMD_HTTP_CLIENT,
NULL,
NULL);
param->ctx = surbl_module_ctx;
msg = rspamd_http_new_message (HTTP_REQUEST);
msg->url = rspamd_fstring_assign (msg->url, url->string, url->urllen);
param->sock = s;
param->redirector = selected;
timeout = rspamd_mempool_alloc (task->task_pool, sizeof (struct timeval));
double_to_tv (surbl_module_ctx->read_timeout, timeout);

rspamd_session_add_event (task->s,
free_redirector_session,
param,
g_quark_from_static_string ("surbl"));

rspamd_http_connection_write_message (param->conn, msg, NULL,
NULL, param, s, timeout, task->ev_base);

msg_info_surbl (
"<%s> registered redirector call for %*s to %s, according to rule: %s",
task->message_id,
url->urllen, url->string,
rspamd_upstream_name (param->redirector),
rule);
if (s == -1) {
msg_info_surbl ("<%s> cannot create tcp socket failed: %s",
task->message_id,
strerror (errno));
return;
}

param =
rspamd_mempool_alloc (task->task_pool,
sizeof (struct redirector_param));
param->url = url;
param->task = task;
param->conn = rspamd_http_connection_new (NULL,
surbl_redirector_error,
surbl_redirector_finish,
RSPAMD_HTTP_CLIENT_SIMPLE,
RSPAMD_HTTP_CLIENT,
NULL,
NULL);
param->ctx = surbl_module_ctx;
msg = rspamd_http_new_message (HTTP_REQUEST);
msg->url = rspamd_fstring_assign (msg->url, url->string, url->urllen);
param->sock = s;
param->redirector = selected;
timeout = rspamd_mempool_alloc (task->task_pool, sizeof (struct timeval));
double_to_tv (surbl_module_ctx->read_timeout, timeout);

rspamd_session_add_event (task->s,
free_redirector_session,
param,
g_quark_from_static_string ("surbl"));

rspamd_http_connection_write_message (param->conn, msg, NULL,
NULL, param, s, timeout, task->ev_base);

msg_info_surbl (
"<%s> registered redirector call for %*s to %s, according to rule: %s",
task->message_id,
url->urllen, url->string,
rspamd_upstream_name (param->redirector),
rule);
}
}

static gboolean

Loading…
Cancel
Save