]> source.dussan.org Git - rspamd.git/commitdiff
[Project] Add logic of the async events counter
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 20 Oct 2018 13:29:46 +0000 (14:29 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 20 Oct 2018 18:43:32 +0000 (19:43 +0100)
src/libserver/symbols_cache.c
src/libserver/symbols_cache.h
src/plugins/surbl.c

index b5db5c7f9658305e1b45bb590d668961a1d1db4d..216f8aabf0abfd0b6f1be0f387f96e105850c6a6 100644 (file)
@@ -2641,7 +2641,7 @@ rspamd_symbols_cache_finalize_item (struct rspamd_task *task,
        g_assert (checkpoint->items_inflight > 0);
        g_assert (item->async_events == 0);
 
-       msg_debug_cache_task ("process post"
+       msg_debug_cache_task ("process finalize for item %s", item->symbol);
        setbit (checkpoint->processed_bits, item->id + 1);
        checkpoint->items_inflight --;
 
@@ -2679,8 +2679,8 @@ rspamd_symbols_cache_finalize_item (struct rspamd_task *task,
 }
 
 guint
-rspamd_symcahe_item_async_inc (struct rspamd_task *task,
-               struct rspamd_symcache_item *item)
+rspamd_symcache_item_async_inc (struct rspamd_task *task,
+                                                               struct rspamd_symcache_item *item)
 {
        msg_debug_cache_task ("increase async events counter for %s = %d + 1",
                        item->symbol, item->async_events);
@@ -2688,12 +2688,25 @@ rspamd_symcahe_item_async_inc (struct rspamd_task *task,
 }
 
 guint
-rspamd_symcahe_item_async_dec (struct rspamd_task *task,
-               struct rspamd_symcache_item *item)
+rspamd_symcache_item_async_dec (struct rspamd_task *task,
+                                                               struct rspamd_symcache_item *item)
 {
        msg_debug_cache_task ("decrease async events counter for %s = %d - 1",
                        item->symbol, item->async_events);
        g_assert (item->async_events > 0);
 
        return --item->async_events;
+}
+
+gboolean
+rspamd_symcache_item_async_dec_check (struct rspamd_task *task,
+                                                                         struct rspamd_symcache_item *item)
+{
+       if (rspamd_symcache_item_async_dec (task, item) == 0) {
+               rspamd_symbols_cache_finalize_item (task, item);
+
+               return TRUE;
+       }
+
+       return FALSE;
 }
\ No newline at end of file
index adf7d2b26983af48e1c0d51f03ea05e83e41febb..5845840ab26ed1f16c41b394d5c35411b869f79b 100644 (file)
@@ -335,11 +335,21 @@ void rspamd_symbols_cache_finalize_item (struct rspamd_task *task,
 /*
  * Increase number of async events pending for an item
  */
-guint rspamd_symcahe_item_async_inc (struct rspamd_task *task,
-               struct rspamd_symcache_item *item);
+guint rspamd_symcache_item_async_inc (struct rspamd_task *task,
+                                                                         struct rspamd_symcache_item *item);
 /*
  * Decrease number of async events pending for an item, asserts if no events pending
  */
-guint rspamd_symcahe_item_async_dec (struct rspamd_task *task,
-               struct rspamd_symcache_item *item);
+guint rspamd_symcache_item_async_dec (struct rspamd_task *task,
+                                                                         struct rspamd_symcache_item *item);
+
+/**
+ * Decrease number of async events pending for an item, asserts if no events pending
+ * If no events are left, this function calls `rspamd_symbols_cache_finalize_item` and returns TRUE
+ * @param task
+ * @param item
+ * @return
+ */
+gboolean rspamd_symcache_item_async_dec_check (struct rspamd_task *task,
+                                                                                          struct rspamd_symcache_item *item);
 #endif
index 88c6a0823f2797806022203f1407a3cf967fbe09..2c9acde31484993f475eb3f16a95477a1b14ce49 100644 (file)
@@ -1381,6 +1381,7 @@ make_surbl_requests (struct rspamd_url *url, struct rspamd_task *task,
                                        surbl_dns_ip_callback,
                                        (void *) param, RDNS_REQUEST_A, surbl_req)) {
                                param->item = item;
+                               rspamd_symcache_item_async_inc (task, item);
                        }
                }
        }
@@ -1406,8 +1407,8 @@ make_surbl_requests (struct rspamd_url *url, struct rspamd_task *task,
                if (make_dns_request_task (task,
                                surbl_dns_callback,
                                (void *) param, RDNS_REQUEST_A, surbl_req)) {
-                       param->w = rspamd_session_get_watcher (task->s);
-                       rspamd_session_watcher_push (task->s);
+                       param->item = item;
+                       rspamd_symcache_item_async_inc (task, item);
                }
        }
        else if (err != NULL) {
@@ -1512,7 +1513,7 @@ surbl_dns_callback (struct rdns_reply *reply, gpointer arg)
                        param->suffix->suffix);
        }
 
-       rspamd_session_watcher_pop (param->task->s, param->w);
+       rspamd_symcache_item_async_dec_check (param->task, param->item);
 }
 
 static void
@@ -1551,7 +1552,7 @@ surbl_dns_ip_callback (struct rdns_reply *reply, gpointer arg)
                                if (make_dns_request_task (task,
                                                surbl_dns_callback,
                                                param, RDNS_REQUEST_A, to_resolve->str)) {
-                                       rspamd_session_watcher_push_specific (task->s, param->w);
+                                       rspamd_symcache_item_async_inc (param->task, param->item);
                                }
 
                                g_string_free (to_resolve, TRUE);
@@ -1565,7 +1566,7 @@ surbl_dns_ip_callback (struct rdns_reply *reply, gpointer arg)
 
        }
 
-       rspamd_session_watcher_pop (param->task->s, param->w);
+       rspamd_symcache_item_async_dec_check (param->task, param->item);
 }
 
 static void
@@ -1823,6 +1824,7 @@ surbl_tree_redirector_callback (gpointer key, gpointer value, void *data)
                                *purl = url;
                                rspamd_lua_setclass (L, "rspamd{url}", -1);
                                lua_pushlightuserdata (L, nparam);
+                               rspamd_symbols_cache_set_cur_item (task, param->item);
 
                                if (lua_pcall (L, 3, 0, 0) != 0) {
                                        msg_err_task ("cannot call for redirector script: %s",
@@ -1830,8 +1832,7 @@ surbl_tree_redirector_callback (gpointer key, gpointer value, void *data)
                                        lua_pop (L, 1);
                                }
                                else {
-                                       nparam->w = rspamd_session_get_watcher (task->s);
-                                       rspamd_session_watcher_push (task->s);
+                                       nparam->item = param->item;
                                }
                        }
                        else {
@@ -1869,7 +1870,7 @@ surbl_tree_url_callback (gpointer key, gpointer value, void *data)
                return;
        }
 
-       make_surbl_requests (url, param->task, param->suffix, FALSE,
+       make_surbl_requests (url, param->task, param->item, param->suffix, FALSE,
                        param->tree, surbl_module_ctx);
 }
 
@@ -1897,6 +1898,8 @@ surbl_test_url (struct rspamd_task *task,
        param->suffix = suffix;
        param->tree = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal);
        param->ctx = surbl_module_ctx;
+       param->item = item;
+       rspamd_symcache_item_async_inc (task, item);
        rspamd_mempool_add_destructor (task->task_pool,
                (rspamd_mempool_destruct_t)g_hash_table_unref,
                param->tree);
@@ -1946,6 +1949,8 @@ surbl_test_url (struct rspamd_task *task,
                        }
                }
        }
+
+       rspamd_symcache_item_async_dec_check (task, item);
 }
 
 static void
@@ -1969,6 +1974,8 @@ surbl_test_redirector (struct rspamd_task *task,
        param->suffix = NULL;
        param->redirector_requests = 0;
        param->ctx = surbl_module_ctx;
+       param->item = item;
+       rspamd_symcache_item_async_inc (task, item);
        g_hash_table_foreach (task->urls, surbl_tree_redirector_callback, param);
 
        /* We also need to check and process img URLs */
@@ -1992,6 +1999,8 @@ surbl_test_redirector (struct rspamd_task *task,
                        }
                }
        }
+
+       rspamd_symcache_item_async_dec_check (task, item);
 }
 
 
@@ -2147,9 +2156,6 @@ surbl_continue_process_handler (lua_State *L)
                                        param->task->message_id,
                                        param->url->urllen, param->url->string);
                }
-
-               rspamd_session_watcher_pop (task->s, param->w);
-               param->w = NULL;
        }
        else {
                return luaL_error (L, "invalid arguments");