diff options
-rw-r--r-- | src/libserver/symbols_cache.c | 23 | ||||
-rw-r--r-- | src/libserver/symbols_cache.h | 18 | ||||
-rw-r--r-- | src/plugins/surbl.c | 28 |
3 files changed, 49 insertions, 20 deletions
diff --git a/src/libserver/symbols_cache.c b/src/libserver/symbols_cache.c index b5db5c7f9..216f8aabf 100644 --- a/src/libserver/symbols_cache.c +++ b/src/libserver/symbols_cache.c @@ -2641,7 +2641,7 @@ rspamd_symbols_cache_finalize_item (struct rspamd_task *task, g_assert (checkpoint->items_inflight > 0); g_assert (item->async_events == 0); - msg_debug_cache_task ("process post" + msg_debug_cache_task ("process finalize for item %s", item->symbol); setbit (checkpoint->processed_bits, item->id + 1); checkpoint->items_inflight --; @@ -2679,8 +2679,8 @@ rspamd_symbols_cache_finalize_item (struct rspamd_task *task, } guint -rspamd_symcahe_item_async_inc (struct rspamd_task *task, - struct rspamd_symcache_item *item) +rspamd_symcache_item_async_inc (struct rspamd_task *task, + struct rspamd_symcache_item *item) { msg_debug_cache_task ("increase async events counter for %s = %d + 1", item->symbol, item->async_events); @@ -2688,12 +2688,25 @@ rspamd_symcahe_item_async_inc (struct rspamd_task *task, } guint -rspamd_symcahe_item_async_dec (struct rspamd_task *task, - struct rspamd_symcache_item *item) +rspamd_symcache_item_async_dec (struct rspamd_task *task, + struct rspamd_symcache_item *item) { msg_debug_cache_task ("decrease async events counter for %s = %d - 1", item->symbol, item->async_events); g_assert (item->async_events > 0); return --item->async_events; +} + +gboolean +rspamd_symcache_item_async_dec_check (struct rspamd_task *task, + struct rspamd_symcache_item *item) +{ + if (rspamd_symcache_item_async_dec (task, item) == 0) { + rspamd_symbols_cache_finalize_item (task, item); + + return TRUE; + } + + return FALSE; }
\ No newline at end of file diff --git a/src/libserver/symbols_cache.h b/src/libserver/symbols_cache.h index adf7d2b26..5845840ab 100644 --- a/src/libserver/symbols_cache.h +++ b/src/libserver/symbols_cache.h @@ -335,11 +335,21 @@ void rspamd_symbols_cache_finalize_item (struct rspamd_task *task, /* * Increase number of async events pending for an item */ -guint rspamd_symcahe_item_async_inc (struct rspamd_task *task, - struct rspamd_symcache_item *item); +guint rspamd_symcache_item_async_inc (struct rspamd_task *task, + struct rspamd_symcache_item *item); /* * Decrease number of async events pending for an item, asserts if no events pending */ -guint rspamd_symcahe_item_async_dec (struct rspamd_task *task, - struct rspamd_symcache_item *item); +guint rspamd_symcache_item_async_dec (struct rspamd_task *task, + struct rspamd_symcache_item *item); + +/** + * Decrease number of async events pending for an item, asserts if no events pending + * If no events are left, this function calls `rspamd_symbols_cache_finalize_item` and returns TRUE + * @param task + * @param item + * @return + */ +gboolean rspamd_symcache_item_async_dec_check (struct rspamd_task *task, + struct rspamd_symcache_item *item); #endif diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c index 88c6a0823..2c9acde31 100644 --- a/src/plugins/surbl.c +++ b/src/plugins/surbl.c @@ -1381,6 +1381,7 @@ make_surbl_requests (struct rspamd_url *url, struct rspamd_task *task, surbl_dns_ip_callback, (void *) param, RDNS_REQUEST_A, surbl_req)) { param->item = item; + rspamd_symcache_item_async_inc (task, item); } } } @@ -1406,8 +1407,8 @@ make_surbl_requests (struct rspamd_url *url, struct rspamd_task *task, if (make_dns_request_task (task, surbl_dns_callback, (void *) param, RDNS_REQUEST_A, surbl_req)) { - param->w = rspamd_session_get_watcher (task->s); - rspamd_session_watcher_push (task->s); + param->item = item; + rspamd_symcache_item_async_inc (task, item); } } else if (err != NULL) { @@ -1512,7 +1513,7 @@ surbl_dns_callback (struct rdns_reply *reply, gpointer arg) param->suffix->suffix); } - rspamd_session_watcher_pop (param->task->s, param->w); + rspamd_symcache_item_async_dec_check (param->task, param->item); } static void @@ -1551,7 +1552,7 @@ surbl_dns_ip_callback (struct rdns_reply *reply, gpointer arg) if (make_dns_request_task (task, surbl_dns_callback, param, RDNS_REQUEST_A, to_resolve->str)) { - rspamd_session_watcher_push_specific (task->s, param->w); + rspamd_symcache_item_async_inc (param->task, param->item); } g_string_free (to_resolve, TRUE); @@ -1565,7 +1566,7 @@ surbl_dns_ip_callback (struct rdns_reply *reply, gpointer arg) } - rspamd_session_watcher_pop (param->task->s, param->w); + rspamd_symcache_item_async_dec_check (param->task, param->item); } static void @@ -1823,6 +1824,7 @@ surbl_tree_redirector_callback (gpointer key, gpointer value, void *data) *purl = url; rspamd_lua_setclass (L, "rspamd{url}", -1); lua_pushlightuserdata (L, nparam); + rspamd_symbols_cache_set_cur_item (task, param->item); if (lua_pcall (L, 3, 0, 0) != 0) { msg_err_task ("cannot call for redirector script: %s", @@ -1830,8 +1832,7 @@ surbl_tree_redirector_callback (gpointer key, gpointer value, void *data) lua_pop (L, 1); } else { - nparam->w = rspamd_session_get_watcher (task->s); - rspamd_session_watcher_push (task->s); + nparam->item = param->item; } } else { @@ -1869,7 +1870,7 @@ surbl_tree_url_callback (gpointer key, gpointer value, void *data) return; } - make_surbl_requests (url, param->task, param->suffix, FALSE, + make_surbl_requests (url, param->task, param->item, param->suffix, FALSE, param->tree, surbl_module_ctx); } @@ -1897,6 +1898,8 @@ surbl_test_url (struct rspamd_task *task, param->suffix = suffix; param->tree = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal); param->ctx = surbl_module_ctx; + param->item = item; + rspamd_symcache_item_async_inc (task, item); rspamd_mempool_add_destructor (task->task_pool, (rspamd_mempool_destruct_t)g_hash_table_unref, param->tree); @@ -1946,6 +1949,8 @@ surbl_test_url (struct rspamd_task *task, } } } + + rspamd_symcache_item_async_dec_check (task, item); } static void @@ -1969,6 +1974,8 @@ surbl_test_redirector (struct rspamd_task *task, param->suffix = NULL; param->redirector_requests = 0; param->ctx = surbl_module_ctx; + param->item = item; + rspamd_symcache_item_async_inc (task, item); g_hash_table_foreach (task->urls, surbl_tree_redirector_callback, param); /* We also need to check and process img URLs */ @@ -1992,6 +1999,8 @@ surbl_test_redirector (struct rspamd_task *task, } } } + + rspamd_symcache_item_async_dec_check (task, item); } @@ -2147,9 +2156,6 @@ surbl_continue_process_handler (lua_State *L) param->task->message_id, param->url->urllen, param->url->string); } - - rspamd_session_watcher_pop (task->s, param->w); - param->w = NULL; } else { return luaL_error (L, "invalid arguments"); |