diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2018-10-20 13:42:37 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2018-10-20 19:43:32 +0100 |
commit | 450179f942070bf750514d43dd091ed529475739 (patch) | |
tree | b970c75426882bd7e9d1cef6a1948d502d22505d /src | |
parent | afa07e499f51eac10dc1ed6a566752904f905f80 (diff) | |
download | rspamd-450179f942070bf750514d43dd091ed529475739.tar.gz rspamd-450179f942070bf750514d43dd091ed529475739.zip |
[Project] Implement counter for async events in symcache item
Diffstat (limited to 'src')
-rw-r--r-- | src/libserver/symbols_cache.c | 64 | ||||
-rw-r--r-- | src/libserver/symbols_cache.h | 11 | ||||
-rw-r--r-- | src/plugins/surbl.c | 8 |
3 files changed, 62 insertions, 21 deletions
diff --git a/src/libserver/symbols_cache.c b/src/libserver/symbols_cache.c index 90fbf1df8..b5db5c7f9 100644 --- a/src/libserver/symbols_cache.c +++ b/src/libserver/symbols_cache.c @@ -41,6 +41,10 @@ rspamd_symcache_log_id, "symcache", cache->cfg->checksum, \ G_STRFUNC, \ __VA_ARGS__) +#define msg_debug_cache_task(...) rspamd_conditional_debug_fast (NULL, NULL, \ + rspamd_symcache_log_id, "symcache", task->task_pool->tag.uid, \ + G_STRFUNC, \ + __VA_ARGS__) INIT_LOG_MODULE(symcache) @@ -104,6 +108,7 @@ struct rspamd_symcache_item { /* Per process counter */ gdouble start_ticks; + guint async_events; struct rspamd_counter_data *cd; gchar *symbol; enum rspamd_symbol_type type; @@ -1316,7 +1321,7 @@ rspamd_symbols_cache_watcher_cb (gpointer sessiond, gpointer ud) remain ++; } else { - msg_debug_task ("watcher for %d(%s), unblocked item %d(%s)", + msg_debug_cache_task ("watcher for %d(%s), unblocked item %d(%s)", item->id, item->symbol, it->id, @@ -1328,7 +1333,7 @@ rspamd_symbols_cache_watcher_cb (gpointer sessiond, gpointer ud) } } - msg_debug_task ("finished watcher for %d(%s), %ud symbols waiting", + msg_debug_cache_task ("finished watcher for %d(%s), %ud symbols waiting", item->id, item->symbol, remain); } @@ -1374,7 +1379,7 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task, } if (check) { - msg_debug_task ("execute %s, %d", item->symbol, item->id); + msg_debug_cache_task ("execute %s, %d", item->symbol, item->id); #ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC struct timeval tv; @@ -1385,6 +1390,7 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task, t1 = rspamd_get_ticks (FALSE); #endif item->start_ticks = t1; + item->async_events = 0; checkpoint->items_inflight ++; /* Callback now must finalize itself */ item->func (task, item, item->user_data); @@ -1397,7 +1403,7 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task, return FALSE; } else { - msg_debug_task ("skipping check of %s as its start condition is false", + msg_debug_cache_task ("skipping check of %s as its start condition is false", item->symbol); setbit (checkpoint->processed_bits, item->id * 2 + 1); @@ -1438,7 +1444,7 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task, if (dep->item == NULL) { /* Assume invalid deps as done */ - msg_debug_task ("symbol %d(%s) has invalid dependencies on %d(%s)", + msg_debug_cache_task ("symbol %d(%s) has invalid dependencies on %d(%s)", item->id, item->symbol, dep->id, dep->sym); continue; } @@ -1454,7 +1460,7 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task, check_only)) { ret = FALSE; - msg_debug_task ("delayed dependency %d(%s) for " + msg_debug_cache_task ("delayed dependency %d(%s) for " "symbol %d(%s)", dep->id, dep->sym, item->id, item->symbol); } @@ -1463,19 +1469,19 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task, checkpoint)) { /* Now started, but has events pending */ ret = FALSE; - msg_debug_task ("started check of %d(%s) symbol " + msg_debug_cache_task ("started check of %d(%s) symbol " "as dep for " "%d(%s)", dep->id, dep->sym, item->id, item->symbol); } else { - msg_debug_task ("dependency %d(%s) for symbol %d(%s) is " + msg_debug_cache_task ("dependency %d(%s) for symbol %d(%s) is " "already processed", dep->id, dep->sym, item->id, item->symbol); } } else { - msg_debug_task ("dependency %d(%s) for symbol %d(%s) " + msg_debug_cache_task ("dependency %d(%s) for symbol %d(%s) " "cannot be started now", dep->id, dep->sym, item->id, item->symbol); @@ -1484,7 +1490,7 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task, } else { /* Started but not finished */ - msg_debug_task ("dependency %d(%s) for symbol %d(%s) is " + msg_debug_cache_task ("dependency %d(%s) for symbol %d(%s) is " "still executing", dep->id, dep->sym, item->id, item->symbol); @@ -1492,7 +1498,7 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task, } } else { - msg_debug_task ("dependency %d(%s) for symbol %d(%s) is already " + msg_debug_cache_task ("dependency %d(%s) for symbol %d(%s) is already " "checked", dep->id, dep->sym, item->id, item->symbol); @@ -1671,7 +1677,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task, checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT; } - msg_debug_task ("symbols processing stage at pass: %d", checkpoint->pass); + msg_debug_cache_task ("symbols processing stage at pass: %d", checkpoint->pass); start_events_pending = rspamd_session_events_pending (task->s); switch (checkpoint->pass) { @@ -1768,7 +1774,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task, guint j; struct rspamd_symcache_item *tmp_it; - msg_debug_task ("blocked execution of %d(%s) unless deps are " + msg_debug_cache_task ("blocked execution of %d(%s) unless deps are " "resolved", item->id, item->symbol); @@ -2367,10 +2373,10 @@ rspamd_symbols_cache_disable_symbol_checkpoint (struct rspamd_task *task, setbit (checkpoint->processed_bits, item->id * 2); setbit (checkpoint->processed_bits, item->id * 2 + 1); - msg_debug_task ("disable execution of %s", symbol); + msg_debug_cache_task ("disable execution of %s", symbol); } else { - msg_debug_task ("skip squeezed symbol %s", symbol); + msg_debug_cache_task ("skip squeezed symbol %s", symbol); } } else { @@ -2403,7 +2409,7 @@ rspamd_symbols_cache_enable_symbol_checkpoint (struct rspamd_task *task, clrbit (checkpoint->processed_bits, item->id * 2); clrbit (checkpoint->processed_bits, item->id * 2 + 1); - msg_debug_task ("enable execution of %s (%d)", symbol, id); + msg_debug_cache_task ("enable execution of %s (%d)", symbol, id); } else { msg_info_task ("cannot enable %s: not found", symbol); @@ -2631,8 +2637,12 @@ rspamd_symbols_cache_finalize_item (struct rspamd_task *task, struct timeval tv; const gdouble slow_diff_limit = 0.1; - setbit (checkpoint->processed_bits, item->id + 1); + /* Sanity checks */ g_assert (checkpoint->items_inflight > 0); + g_assert (item->async_events == 0); + + msg_debug_cache_task ("process post" + setbit (checkpoint->processed_bits, item->id + 1); checkpoint->items_inflight --; #ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC @@ -2666,4 +2676,24 @@ rspamd_symbols_cache_finalize_item (struct rspamd_task *task, rdep, checkpoint); } +} + +guint +rspamd_symcahe_item_async_inc (struct rspamd_task *task, + struct rspamd_symcache_item *item) +{ + msg_debug_cache_task ("increase async events counter for %s = %d + 1", + item->symbol, item->async_events); + return ++item->async_events; +} + +guint +rspamd_symcahe_item_async_dec (struct rspamd_task *task, + struct rspamd_symcache_item *item) +{ + msg_debug_cache_task ("decrease async events counter for %s = %d - 1", + item->symbol, item->async_events); + g_assert (item->async_events > 0); + + return --item->async_events; }
\ No newline at end of file diff --git a/src/libserver/symbols_cache.h b/src/libserver/symbols_cache.h index f8764ee8c..adf7d2b26 100644 --- a/src/libserver/symbols_cache.h +++ b/src/libserver/symbols_cache.h @@ -331,4 +331,15 @@ struct rspamd_symcache_item *rspamd_symbols_cache_set_cur_item (struct rspamd_ta */ void rspamd_symbols_cache_finalize_item (struct rspamd_task *task, struct rspamd_symcache_item *item); + +/* + * Increase number of async events pending for an item + */ +guint rspamd_symcahe_item_async_inc (struct rspamd_task *task, + struct rspamd_symcache_item *item); +/* + * Decrease number of async events pending for an item, asserts if no events pending + */ +guint rspamd_symcahe_item_async_dec (struct rspamd_task *task, + struct rspamd_symcache_item *item); #endif diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c index fa4b92162..88c6a0823 100644 --- a/src/plugins/surbl.c +++ b/src/plugins/surbl.c @@ -108,7 +108,7 @@ struct dns_param { struct rspamd_task *task; gchar *host_resolve; struct suffix_item *suffix; - struct rspamd_async_watcher *w; + struct rspamd_symcache_item *item; struct surbl_module_ctx *ctx; }; @@ -120,7 +120,7 @@ struct redirector_param { struct rspamd_http_connection *conn; GHashTable *tree; struct suffix_item *suffix; - struct rspamd_async_watcher *w; + struct rspamd_symcache_item *item; gint sock; guint redirector_requests; }; @@ -1323,6 +1323,7 @@ format_surbl_request (rspamd_mempool_t * pool, static void make_surbl_requests (struct rspamd_url *url, struct rspamd_task *task, + struct rspamd_symcache_item *item, struct suffix_item *suffix, gboolean forced, GHashTable *tree, struct surbl_ctx *surbl_module_ctx) @@ -1379,8 +1380,7 @@ make_surbl_requests (struct rspamd_url *url, struct rspamd_task *task, if (make_dns_request_task (task, surbl_dns_ip_callback, (void *) param, RDNS_REQUEST_A, surbl_req)) { - param->w = rspamd_session_get_watcher (task->s); - rspamd_session_watcher_push (task->s); + param->item = item; } } } |