return FALSE;
}
+ if (session && rspamd_session_is_destroying (session)) {
+ return FALSE;
+ }
+
if (pool != NULL) {
reqdata =
rspamd_mempool_alloc0 (pool, sizeof (struct rspamd_dns_request_ud));
g_assert_not_reached ();
}
+ if (RSPAMD_SESSION_IS_DESTROYING (session)) {
+ msg_debug_session ("skip adding event subsystem: %s: session is destroying",
+ g_quark_to_string (subsystem));
+
+ return NULL;
+ }
+
new_event = rspamd_mempool_alloc (session->pool,
sizeof (struct rspamd_async_event));
new_event->fin = fin;
g_assert (session != NULL);
return session->pool;
+}
+
+gboolean
+rspamd_session_is_destroying (struct rspamd_async_session *session)
+{
+ g_assert (session != NULL);
+
+ return RSPAMD_SESSION_IS_DESTROYING (session);
}
\ No newline at end of file
struct rspamd_async_watcher* rspamd_session_get_watcher (
struct rspamd_async_session *s);
+/**
+ * Returns TRUE if an async session is currently destroying
+ * @param s
+ * @return
+ */
+gboolean rspamd_session_is_destroying (struct rspamd_async_session *s);
+
#endif /* RSPAMD_EVENTS_H */
gint ret;
const gchar *learned_key = "learns";
+ if (rspamd_session_is_destroying (task->s)) {
+ return FALSE;
+ }
if (tokens == NULL || tokens->len == 0 || rt->redis == NULL) {
return FALSE;
goffset off;
const gchar *learned_key = "learns";
+ if (rspamd_session_is_destroying (task->s)) {
+ return FALSE;
+ }
+
up = rspamd_upstream_get (rt->ctx->write_servers,
RSPAMD_UPSTREAM_MASTER_SLAVE,
NULL,
struct timeval tv;
gchar *h;
+ if (rspamd_session_is_destroying (task->s)) {
+ return RSPAMD_LEARN_INGORE;
+ }
+
h = rspamd_mempool_get_variable (task->task_pool, "words_hash");
if (h == NULL) {
gchar *h;
gint flag;
+ if (rspamd_session_is_destroying (task->s)) {
+ return RSPAMD_LEARN_INGORE;
+ }
+
h = rspamd_mempool_get_variable (task->task_pool, "words_hash");
g_assert (h != NULL);
struct rspamd_http_message *msg;
struct lua_http_cbdata *cbd;
struct rspamd_dns_resolver *resolver;
- struct rspamd_async_session *session;
+ struct rspamd_async_session *session = NULL;
struct rspamd_lua_text *t;
struct rspamd_task *task = NULL;
struct rspamd_config *cfg = NULL;
return 1;
}
+ if (session && rspamd_session_is_destroying (session)) {
+ lua_pushboolean (L, FALSE);
+
+ return 1;
+ }
+
cbd = g_malloc0 (sizeof (*cbd));
cbd->L = L;
cbd->cbref = cbref;
lua_pop (L, 1); /* table */
+ if (session && rspamd_session_is_destroying (session)) {
+ ret = FALSE;
+ }
+
if (ret && addr != NULL) {
ctx = g_malloc0 (sizeof (struct lua_redis_ctx));
LL_PREPEND (sp_ud->c->specific, sp_ud);
+ if (ud->s && rspamd_session_is_destroying (ud->s)) {
+ lua_pushboolean (L, 0);
+ lua_pushstring (L, "session is terminating");
+
+ return 2;
+ }
+
ret = redisAsyncCommandArgv (sp_ud->c->ctx,
lua_redis_callback,
sp_ud,
}
}
-static void
+
+static gboolean
lua_tcp_register_event (struct lua_tcp_cbdata *cbd)
{
if (cbd->session) {
(event_finalizer_t) lua_tcp_fin,
cbd,
g_quark_from_static_string ("lua tcp"));
+
+ if (!cbd->async_ev) {
+ return FALSE;
+ }
+
cbd->w = rspamd_session_get_watcher (cbd->session);
rspamd_session_watcher_push (cbd->session);
}
+
+ return TRUE;
}
static gboolean
if (session) {
cbd->session = session;
+
+ if (rspamd_session_is_destroying (session)) {
+ REF_RELEASE (cbd);
+ lua_pushboolean (L, FALSE);
+
+ return 1;
+ }
}
if (rspamd_parse_inet_address (&cbd->addr, host, 0)) {
rspamd_inet_addr_t *addr;
gint sock;
- /* Get upstream */
- selected = rspamd_upstream_get (rule->servers, RSPAMD_UPSTREAM_ROUND_ROBIN,
- NULL, 0);
- if (selected) {
- addr = rspamd_upstream_addr (selected);
- if ((sock = rspamd_inet_address_connect (addr, SOCK_DGRAM, TRUE)) == -1) {
- msg_warn_task ("cannot connect to %s(%s), %d, %s",
- rspamd_upstream_name (selected),
- rspamd_inet_address_to_string_pretty (addr),
- errno,
- strerror (errno));
- rspamd_upstream_fail (selected, FALSE);
- g_ptr_array_free (commands, TRUE);
- }
- else {
- /* Create session for a socket */
- session =
- rspamd_mempool_alloc0 (task->task_pool,
- sizeof (struct fuzzy_client_session));
- msec_to_tv (rule->ctx->io_timeout, &session->tv);
- session->state = 0;
- session->commands = commands;
- session->task = task;
- session->fd = sock;
- session->server = selected;
- session->rule = rule;
- session->addr = addr;
- session->results = g_ptr_array_sized_new (32);
-
- event_set (&session->ev, sock, EV_WRITE, fuzzy_check_io_callback,
- session);
- event_base_set (session->task->ev_base, &session->ev);
- event_add (&session->ev, NULL);
+ if (!rspamd_session_is_destroying (task->s)) {
+ /* Get upstream */
+ selected = rspamd_upstream_get (rule->servers, RSPAMD_UPSTREAM_ROUND_ROBIN,
+ NULL, 0);
+ if (selected) {
+ addr = rspamd_upstream_addr (selected);
+ if ((sock = rspamd_inet_address_connect (addr, SOCK_DGRAM, TRUE)) == -1) {
+ msg_warn_task ("cannot connect to %s(%s), %d, %s",
+ rspamd_upstream_name (selected),
+ rspamd_inet_address_to_string_pretty (addr),
+ errno,
+ strerror (errno));
+ rspamd_upstream_fail (selected, FALSE);
+ g_ptr_array_free (commands, TRUE);
+ } else {
+ /* Create session for a socket */
+ session =
+ rspamd_mempool_alloc0 (task->task_pool,
+ sizeof (struct fuzzy_client_session));
+ msec_to_tv (rule->ctx->io_timeout, &session->tv);
+ session->state = 0;
+ session->commands = commands;
+ session->task = task;
+ session->fd = sock;
+ session->server = selected;
+ session->rule = rule;
+ session->addr = addr;
+ session->results = g_ptr_array_sized_new (32);
+
+ event_set (&session->ev, sock, EV_WRITE, fuzzy_check_io_callback,
+ session);
+ event_base_set (session->task->ev_base, &session->ev);
+ event_add (&session->ev, NULL);
- evtimer_set (&session->timev, fuzzy_check_timer_callback,
- session);
- event_base_set (session->task->ev_base, &session->timev);
- event_add (&session->timev, &session->tv);
+ evtimer_set (&session->timev, fuzzy_check_timer_callback,
+ session);
+ event_base_set (session->task->ev_base, &session->timev);
+ event_add (&session->timev, &session->tv);
- rspamd_session_add_event (task->s,
- fuzzy_io_fin,
- session,
- g_quark_from_static_string ("fuzzy check"));
+ rspamd_session_add_event (task->s,
+ fuzzy_io_fin,
+ session,
+ g_quark_from_static_string ("fuzzy check"));
+ }
}
}
}
gint ret = -1;
/* Get upstream */
-
- while ((selected = rspamd_upstream_get (rule->servers,
- RSPAMD_UPSTREAM_SEQUENTIAL, NULL, 0))) {
- /* Create UDP socket */
- addr = rspamd_upstream_addr (selected);
-
- if ((sock = rspamd_inet_address_connect (addr,
- SOCK_DGRAM, TRUE)) == -1) {
- rspamd_upstream_fail (selected, TRUE);
- }
- else {
- s =
- rspamd_mempool_alloc0 (task->task_pool,
- sizeof (struct fuzzy_learn_session));
-
- msec_to_tv (rule->ctx->io_timeout, &s->tv);
- s->task = task;
- s->addr = addr;
- s->commands = commands;
- s->http_entry = NULL;
- s->server = selected;
- s->saved = saved;
- s->fd = sock;
- s->err = err;
- s->rule = rule;
- s->session = task->s;
-
- event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s);
- event_base_set (task->ev_base, &s->ev);
- event_add (&s->ev, NULL);
-
- evtimer_set (&s->timev, fuzzy_controller_timer_callback, s);
- event_base_set (s->task->ev_base, &s->timev);
- event_add (&s->timev, &s->tv);
-
- rspamd_session_add_event (task->s,
- fuzzy_lua_fin,
- s,
- g_quark_from_static_string ("fuzzy check"));
-
- (*saved)++;
- ret = 1;
+ if (!rspamd_session_is_destroying (task->s)) {
+ while ((selected = rspamd_upstream_get (rule->servers,
+ RSPAMD_UPSTREAM_SEQUENTIAL, NULL, 0))) {
+ /* Create UDP socket */
+ addr = rspamd_upstream_addr (selected);
+
+ if ((sock = rspamd_inet_address_connect (addr,
+ SOCK_DGRAM, TRUE)) == -1) {
+ rspamd_upstream_fail (selected, TRUE);
+ } else {
+ s =
+ rspamd_mempool_alloc0 (task->task_pool,
+ sizeof (struct fuzzy_learn_session));
+
+ msec_to_tv (rule->ctx->io_timeout, &s->tv);
+ s->task = task;
+ s->addr = addr;
+ s->commands = commands;
+ s->http_entry = NULL;
+ s->server = selected;
+ s->saved = saved;
+ s->fd = sock;
+ s->err = err;
+ s->rule = rule;
+ s->session = task->s;
+
+ event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s);
+ event_base_set (task->ev_base, &s->ev);
+ event_add (&s->ev, NULL);
+
+ evtimer_set (&s->timev, fuzzy_controller_timer_callback, s);
+ event_base_set (s->task->ev_base, &s->timev);
+ event_add (&s->timev, &s->tv);
+
+ rspamd_session_add_event (task->s,
+ fuzzy_lua_fin,
+ s,
+ g_quark_from_static_string ("fuzzy check"));
+
+ (*saved)++;
+ ret = 1;
+ }
}
}
struct rspamd_http_message *msg;
struct surbl_ctx *surbl_module_ctx = surbl_get_context (task->cfg);
- selected = rspamd_upstream_get (surbl_module_ctx->redirectors,
- RSPAMD_UPSTREAM_ROUND_ROBIN, url->host, url->hostlen);
+ if (!rspamd_session_is_destroying (task->s)) {
- if (selected) {
- s = rspamd_inet_address_connect (rspamd_upstream_addr (selected),
- SOCK_STREAM, TRUE);
- }
+ selected = rspamd_upstream_get (surbl_module_ctx->redirectors,
+ RSPAMD_UPSTREAM_ROUND_ROBIN, url->host, url->hostlen);
- if (s == -1) {
- msg_info_surbl ("<%s> cannot create tcp socket failed: %s",
- task->message_id,
- strerror (errno));
- return;
- }
+ if (selected) {
+ s = rspamd_inet_address_connect (rspamd_upstream_addr (selected),
+ SOCK_STREAM, TRUE);
+ }
- param =
- rspamd_mempool_alloc (task->task_pool,
- sizeof (struct redirector_param));
- param->url = url;
- param->task = task;
- param->conn = rspamd_http_connection_new (NULL,
- surbl_redirector_error,
- surbl_redirector_finish,
- RSPAMD_HTTP_CLIENT_SIMPLE,
- RSPAMD_HTTP_CLIENT,
- NULL,
- NULL);
- param->ctx = surbl_module_ctx;
- msg = rspamd_http_new_message (HTTP_REQUEST);
- msg->url = rspamd_fstring_assign (msg->url, url->string, url->urllen);
- param->sock = s;
- param->redirector = selected;
- timeout = rspamd_mempool_alloc (task->task_pool, sizeof (struct timeval));
- double_to_tv (surbl_module_ctx->read_timeout, timeout);
-
- rspamd_session_add_event (task->s,
- free_redirector_session,
- param,
- g_quark_from_static_string ("surbl"));
-
- rspamd_http_connection_write_message (param->conn, msg, NULL,
- NULL, param, s, timeout, task->ev_base);
-
- msg_info_surbl (
- "<%s> registered redirector call for %*s to %s, according to rule: %s",
- task->message_id,
- url->urllen, url->string,
- rspamd_upstream_name (param->redirector),
- rule);
+ if (s == -1) {
+ msg_info_surbl ("<%s> cannot create tcp socket failed: %s",
+ task->message_id,
+ strerror (errno));
+ return;
+ }
+
+ param =
+ rspamd_mempool_alloc (task->task_pool,
+ sizeof (struct redirector_param));
+ param->url = url;
+ param->task = task;
+ param->conn = rspamd_http_connection_new (NULL,
+ surbl_redirector_error,
+ surbl_redirector_finish,
+ RSPAMD_HTTP_CLIENT_SIMPLE,
+ RSPAMD_HTTP_CLIENT,
+ NULL,
+ NULL);
+ param->ctx = surbl_module_ctx;
+ msg = rspamd_http_new_message (HTTP_REQUEST);
+ msg->url = rspamd_fstring_assign (msg->url, url->string, url->urllen);
+ param->sock = s;
+ param->redirector = selected;
+ timeout = rspamd_mempool_alloc (task->task_pool, sizeof (struct timeval));
+ double_to_tv (surbl_module_ctx->read_timeout, timeout);
+
+ rspamd_session_add_event (task->s,
+ free_redirector_session,
+ param,
+ g_quark_from_static_string ("surbl"));
+
+ rspamd_http_connection_write_message (param->conn, msg, NULL,
+ NULL, param, s, timeout, task->ev_base);
+
+ msg_info_surbl (
+ "<%s> registered redirector call for %*s to %s, according to rule: %s",
+ task->message_id,
+ url->urllen, url->string,
+ rspamd_upstream_name (param->redirector),
+ rule);
+ }
}
static gboolean