@@ -69,6 +69,7 @@ enum rspamd_redis_connection_state { | |||
struct redis_stat_runtime { | |||
struct redis_stat_ctx *ctx; | |||
struct rspamd_task *task; | |||
struct rspamd_symcache_item *item; | |||
struct upstream *selected; | |||
struct event timeout_event; | |||
GArray *results; | |||
@@ -1196,6 +1197,10 @@ rspamd_redis_processed (redisAsyncContext *c, gpointer r, gpointer priv) | |||
} | |||
if (rt->has_event) { | |||
if (rt->item) { | |||
rspamd_symcache_item_async_dec_check (task, rt->item); | |||
} | |||
rspamd_session_remove_event (task->s, rspamd_redis_fin, rt); | |||
} | |||
} | |||
@@ -1228,6 +1233,10 @@ rspamd_redis_learned (redisAsyncContext *c, gpointer r, gpointer priv) | |||
} | |||
if (rt->has_event) { | |||
if (rt->item) { | |||
rspamd_symcache_item_async_dec_check (task, rt->item); | |||
} | |||
rspamd_session_remove_event (task->s, rspamd_redis_fin_learn, rt); | |||
} | |||
} | |||
@@ -1594,7 +1603,9 @@ rspamd_redis_process_tokens (struct rspamd_task *task, | |||
if (redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s", | |||
rt->redis_object_expanded, learned_key) == REDIS_OK) { | |||
rspamd_session_add_event (task->s, NULL, rspamd_redis_fin, rt, rspamd_redis_stat_quark ()); | |||
rspamd_session_add_event (task->s, | |||
rspamd_redis_fin, rt, rspamd_redis_stat_quark ()); | |||
rt->item = rspamd_symbols_cache_get_cur_item (task); | |||
rt->has_event = TRUE; | |||
if (rspamd_event_pending (&rt->timeout_event, EV_TIMEOUT)) { | |||
@@ -1798,7 +1809,9 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens, | |||
"RSIG"); | |||
} | |||
rspamd_session_add_event (task->s, NULL, rspamd_redis_fin_learn, rt, rspamd_redis_stat_quark ()); | |||
rspamd_session_add_event (task->s, | |||
rspamd_redis_fin_learn, rt, rspamd_redis_stat_quark ()); | |||
rt->item = rspamd_symbols_cache_get_cur_item (task); | |||
rt->has_event = TRUE; | |||
/* Set timeout */ |
@@ -41,6 +41,7 @@ struct rspamd_redis_cache_ctx { | |||
struct rspamd_redis_cache_runtime { | |||
struct rspamd_redis_cache_ctx *ctx; | |||
struct rspamd_task *task; | |||
struct rspamd_symcache_item *item; | |||
struct upstream *selected; | |||
struct event timeout_event; | |||
redisAsyncContext *redis; | |||
@@ -151,6 +152,9 @@ rspamd_stat_cache_redis_get (redisAsyncContext *c, gpointer r, gpointer priv) | |||
} | |||
if (rt->has_event) { | |||
if (rt->item) { | |||
rspamd_symcache_item_async_dec_check (task, rt->item); | |||
} | |||
rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, rt); | |||
} | |||
} | |||
@@ -173,6 +177,9 @@ rspamd_stat_cache_redis_set (redisAsyncContext *c, gpointer r, gpointer priv) | |||
} | |||
if (rt->has_event) { | |||
if (rt->item) { | |||
rspamd_symcache_item_async_dec_check (task, rt->item); | |||
} | |||
rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, rt); | |||
} | |||
} | |||
@@ -453,7 +460,11 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task, | |||
if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_get, rt, | |||
"HGET %s %s", | |||
rt->ctx->redis_object, h) == REDIS_OK) { | |||
rspamd_session_add_event (task->s, NULL, rspamd_redis_cache_fin, rt, rspamd_stat_cache_redis_quark ()); | |||
rspamd_session_add_event (task->s, | |||
rspamd_redis_cache_fin, | |||
rt, | |||
rspamd_stat_cache_redis_quark ()); | |||
rt->item = rspamd_symbols_cache_get_cur_item (task); | |||
event_add (&rt->timeout_event, &tv); | |||
rt->has_event = TRUE; | |||
} | |||
@@ -485,7 +496,9 @@ rspamd_stat_cache_redis_learn (struct rspamd_task *task, | |||
if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_set, rt, | |||
"HSET %s %s %d", | |||
rt->ctx->redis_object, h, flag) == REDIS_OK) { | |||
rspamd_session_add_event (task->s, NULL, rspamd_redis_cache_fin, rt, rspamd_stat_cache_redis_quark ()); | |||
rspamd_session_add_event (task->s, | |||
rspamd_redis_cache_fin, rt, rspamd_stat_cache_redis_quark ()); | |||
rt->item = rspamd_symbols_cache_get_cur_item (task); | |||
event_add (&rt->timeout_event, &tv); | |||
rt->has_event = TRUE; | |||
} |
@@ -86,7 +86,7 @@ struct dkim_check_result { | |||
gint res; | |||
gdouble mult_allow; | |||
gdouble mult_deny; | |||
struct rspamd_async_watcher *w; | |||
struct rspamd_symcache_item *item; | |||
struct dkim_check_result *next, *prev, *first; | |||
}; | |||
@@ -1029,7 +1029,8 @@ dkim_module_check (struct dkim_check_result *res) | |||
tracebuf); | |||
} | |||
} | |||
rspamd_session_watcher_pop (res->task->s, res->w); | |||
rspamd_symcache_item_async_dec_check (res->task, res->item); | |||
} | |||
} | |||
@@ -1147,6 +1148,7 @@ dkim_symbol_callback (struct rspamd_task *task, | |||
cur->task = task; | |||
cur->mult_allow = 1.0; | |||
cur->mult_deny = 1.0; | |||
cur->item = item; | |||
ctx = rspamd_create_dkim_context (rh->decoded, | |||
task->task_pool, | |||
@@ -1207,10 +1209,8 @@ dkim_symbol_callback (struct rspamd_task *task, | |||
res = cur; | |||
res->first = res; | |||
res->prev = res; | |||
res->w = rspamd_session_get_watcher (task->s); | |||
} | |||
else { | |||
cur->w = res->w; | |||
DL_APPEND (res, cur); | |||
} | |||
@@ -1232,7 +1232,7 @@ dkim_symbol_callback (struct rspamd_task *task, | |||
} | |||
if (res != NULL) { | |||
rspamd_session_watcher_push (task->s); | |||
rspamd_symcache_item_async_inc (task, item); | |||
dkim_module_check (res); | |||
} | |||
} |
@@ -131,6 +131,7 @@ struct fuzzy_client_session { | |||
GPtrArray *commands; | |||
GPtrArray *results; | |||
struct rspamd_task *task; | |||
struct rspamd_symcache_item *item; | |||
struct upstream *server; | |||
struct fuzzy_rule *rule; | |||
struct event ev; | |||
@@ -2106,6 +2107,7 @@ fuzzy_check_session_is_completed (struct fuzzy_client_session *session) | |||
if (nreplied == session->commands->len) { | |||
fuzzy_insert_metric_results (session->task, session->results); | |||
rspamd_symcache_item_async_dec_check (session->task, session->item); | |||
rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session); | |||
return TRUE; | |||
@@ -2180,6 +2182,7 @@ fuzzy_check_io_callback (gint fd, short what, void *arg) | |||
errno, | |||
strerror (errno)); | |||
rspamd_upstream_fail (session->server, FALSE); | |||
rspamd_symcache_item_async_dec_check (session->task, session->item); | |||
rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session); | |||
} | |||
else { | |||
@@ -2220,6 +2223,7 @@ fuzzy_check_timer_callback (gint fd, short what, void *arg) | |||
rspamd_upstream_addr (session->server)), | |||
session->retransmits); | |||
rspamd_upstream_fail (session->server, FALSE); | |||
rspamd_symcache_item_async_dec_check (session->task, session->item); | |||
rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session); | |||
} | |||
else { | |||
@@ -2872,8 +2876,10 @@ register_fuzzy_client_call (struct rspamd_task *task, | |||
event_base_set (session->task->ev_base, &session->timev); | |||
event_add (&session->timev, &session->tv); | |||
rspamd_session_add_event (task->s, NULL, fuzzy_io_fin, session, | |||
rspamd_session_add_event (task->s, fuzzy_io_fin, session, | |||
g_quark_from_static_string ("fuzzy check")); | |||
session->item = rspamd_symbols_cache_get_cur_item (task); | |||
rspamd_symcache_item_async_inc (task, session->item); | |||
} | |||
} | |||
} | |||
@@ -3346,7 +3352,10 @@ fuzzy_check_send_lua_learn (struct fuzzy_rule *rule, | |||
event_base_set (s->task->ev_base, &s->timev); | |||
event_add (&s->timev, &s->tv); | |||
rspamd_session_add_event (task->s, NULL, fuzzy_lua_fin, s, g_quark_from_static_string ("fuzzy check")); | |||
rspamd_session_add_event (task->s, | |||
fuzzy_lua_fin, | |||
s, | |||
g_quark_from_static_string ("fuzzy check")); | |||
(*saved)++; | |||
ret = 1; |
@@ -512,7 +512,7 @@ spf_plugin_callback (struct spf_resolved *record, struct rspamd_task *task, | |||
gpointer ud) | |||
{ | |||
struct spf_resolved *l; | |||
struct rspamd_async_watcher *w = ud; | |||
struct rspamd_symcache_item *item = (struct rspamd_symcache_item *)ud; | |||
struct spf_ctx *spf_module_ctx = spf_get_context (task->cfg); | |||
if (record && record->na) { | |||
@@ -562,7 +562,7 @@ spf_plugin_callback (struct spf_resolved *record, struct rspamd_task *task, | |||
spf_record_unref (l); | |||
} | |||
rspamd_session_watcher_pop (task->s, w); | |||
rspamd_symcache_item_async_dec_check (task, item); | |||
} | |||
@@ -573,7 +573,6 @@ spf_symbol_callback (struct rspamd_task *task, | |||
{ | |||
const gchar *domain; | |||
struct spf_resolved *l; | |||
struct rspamd_async_watcher *w; | |||
gint *dmarc_checks; | |||
struct spf_ctx *spf_module_ctx = spf_get_context (task->cfg); | |||
@@ -616,9 +615,8 @@ spf_symbol_callback (struct rspamd_task *task, | |||
spf_record_unref (l); | |||
} | |||
else { | |||
w = rspamd_session_get_watcher (task->s); | |||
if (!rspamd_spf_resolve (task, spf_plugin_callback, w)) { | |||
if (!rspamd_spf_resolve (task, spf_plugin_callback, item)) { | |||
msg_info_task ("cannot make spf request for [%s]", | |||
task->message_id); | |||
rspamd_task_insert_result (task, | |||
@@ -627,7 +625,7 @@ spf_symbol_callback (struct rspamd_task *task, | |||
"(SPF): spf DNS fail"); | |||
} | |||
else { | |||
rspamd_session_watcher_push (task->s); | |||
rspamd_symcache_item_async_inc (task, item); | |||
} | |||
} | |||
} |
@@ -1705,7 +1705,10 @@ register_redirector_call (struct rspamd_url *url, struct rspamd_task *task, | |||
timeout = rspamd_mempool_alloc (task->task_pool, sizeof (struct timeval)); | |||
double_to_tv (surbl_module_ctx->read_timeout, timeout); | |||
rspamd_session_add_event (task->s, NULL, free_redirector_session, param, g_quark_from_static_string ("surbl")); | |||
rspamd_session_add_event (task->s, | |||
free_redirector_session, param, | |||
g_quark_from_static_string ("surbl")); | |||
param->item = rspamd_symbols_cache_get_cur_item (task); | |||
rspamd_http_connection_write_message (param->conn, msg, NULL, | |||
NULL, param, s, timeout, task->ev_base); |