summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2018-10-20 13:42:37 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2018-10-20 19:43:32 +0100
commit450179f942070bf750514d43dd091ed529475739 (patch)
treeb970c75426882bd7e9d1cef6a1948d502d22505d /src
parentafa07e499f51eac10dc1ed6a566752904f905f80 (diff)
downloadrspamd-450179f942070bf750514d43dd091ed529475739.tar.gz
rspamd-450179f942070bf750514d43dd091ed529475739.zip
[Project] Implement counter for async events in symcache item
Diffstat (limited to 'src')
-rw-r--r--src/libserver/symbols_cache.c64
-rw-r--r--src/libserver/symbols_cache.h11
-rw-r--r--src/plugins/surbl.c8
3 files changed, 62 insertions, 21 deletions
diff --git a/src/libserver/symbols_cache.c b/src/libserver/symbols_cache.c
index 90fbf1df8..b5db5c7f9 100644
--- a/src/libserver/symbols_cache.c
+++ b/src/libserver/symbols_cache.c
@@ -41,6 +41,10 @@
rspamd_symcache_log_id, "symcache", cache->cfg->checksum, \
G_STRFUNC, \
__VA_ARGS__)
+#define msg_debug_cache_task(...) rspamd_conditional_debug_fast (NULL, NULL, \
+ rspamd_symcache_log_id, "symcache", task->task_pool->tag.uid, \
+ G_STRFUNC, \
+ __VA_ARGS__)
INIT_LOG_MODULE(symcache)
@@ -104,6 +108,7 @@ struct rspamd_symcache_item {
/* Per process counter */
gdouble start_ticks;
+ guint async_events;
struct rspamd_counter_data *cd;
gchar *symbol;
enum rspamd_symbol_type type;
@@ -1316,7 +1321,7 @@ rspamd_symbols_cache_watcher_cb (gpointer sessiond, gpointer ud)
remain ++;
}
else {
- msg_debug_task ("watcher for %d(%s), unblocked item %d(%s)",
+ msg_debug_cache_task ("watcher for %d(%s), unblocked item %d(%s)",
item->id,
item->symbol,
it->id,
@@ -1328,7 +1333,7 @@ rspamd_symbols_cache_watcher_cb (gpointer sessiond, gpointer ud)
}
}
- msg_debug_task ("finished watcher for %d(%s), %ud symbols waiting",
+ msg_debug_cache_task ("finished watcher for %d(%s), %ud symbols waiting",
item->id, item->symbol,
remain);
}
@@ -1374,7 +1379,7 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
}
if (check) {
- msg_debug_task ("execute %s, %d", item->symbol, item->id);
+ msg_debug_cache_task ("execute %s, %d", item->symbol, item->id);
#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
struct timeval tv;
@@ -1385,6 +1390,7 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
t1 = rspamd_get_ticks (FALSE);
#endif
item->start_ticks = t1;
+ item->async_events = 0;
checkpoint->items_inflight ++;
/* Callback now must finalize itself */
item->func (task, item, item->user_data);
@@ -1397,7 +1403,7 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
return FALSE;
}
else {
- msg_debug_task ("skipping check of %s as its start condition is false",
+ msg_debug_cache_task ("skipping check of %s as its start condition is false",
item->symbol);
setbit (checkpoint->processed_bits, item->id * 2 + 1);
@@ -1438,7 +1444,7 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task,
if (dep->item == NULL) {
/* Assume invalid deps as done */
- msg_debug_task ("symbol %d(%s) has invalid dependencies on %d(%s)",
+ msg_debug_cache_task ("symbol %d(%s) has invalid dependencies on %d(%s)",
item->id, item->symbol, dep->id, dep->sym);
continue;
}
@@ -1454,7 +1460,7 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task,
check_only)) {
ret = FALSE;
- msg_debug_task ("delayed dependency %d(%s) for "
+ msg_debug_cache_task ("delayed dependency %d(%s) for "
"symbol %d(%s)",
dep->id, dep->sym, item->id, item->symbol);
}
@@ -1463,19 +1469,19 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task,
checkpoint)) {
/* Now started, but has events pending */
ret = FALSE;
- msg_debug_task ("started check of %d(%s) symbol "
+ msg_debug_cache_task ("started check of %d(%s) symbol "
"as dep for "
"%d(%s)",
dep->id, dep->sym, item->id, item->symbol);
}
else {
- msg_debug_task ("dependency %d(%s) for symbol %d(%s) is "
+ msg_debug_cache_task ("dependency %d(%s) for symbol %d(%s) is "
"already processed",
dep->id, dep->sym, item->id, item->symbol);
}
}
else {
- msg_debug_task ("dependency %d(%s) for symbol %d(%s) "
+ msg_debug_cache_task ("dependency %d(%s) for symbol %d(%s) "
"cannot be started now",
dep->id, dep->sym,
item->id, item->symbol);
@@ -1484,7 +1490,7 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task,
}
else {
/* Started but not finished */
- msg_debug_task ("dependency %d(%s) for symbol %d(%s) is "
+ msg_debug_cache_task ("dependency %d(%s) for symbol %d(%s) is "
"still executing",
dep->id, dep->sym,
item->id, item->symbol);
@@ -1492,7 +1498,7 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task,
}
}
else {
- msg_debug_task ("dependency %d(%s) for symbol %d(%s) is already "
+ msg_debug_cache_task ("dependency %d(%s) for symbol %d(%s) is already "
"checked",
dep->id, dep->sym,
item->id, item->symbol);
@@ -1671,7 +1677,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
}
- msg_debug_task ("symbols processing stage at pass: %d", checkpoint->pass);
+ msg_debug_cache_task ("symbols processing stage at pass: %d", checkpoint->pass);
start_events_pending = rspamd_session_events_pending (task->s);
switch (checkpoint->pass) {
@@ -1768,7 +1774,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
guint j;
struct rspamd_symcache_item *tmp_it;
- msg_debug_task ("blocked execution of %d(%s) unless deps are "
+ msg_debug_cache_task ("blocked execution of %d(%s) unless deps are "
"resolved",
item->id, item->symbol);
@@ -2367,10 +2373,10 @@ rspamd_symbols_cache_disable_symbol_checkpoint (struct rspamd_task *task,
setbit (checkpoint->processed_bits, item->id * 2);
setbit (checkpoint->processed_bits, item->id * 2 + 1);
- msg_debug_task ("disable execution of %s", symbol);
+ msg_debug_cache_task ("disable execution of %s", symbol);
}
else {
- msg_debug_task ("skip squeezed symbol %s", symbol);
+ msg_debug_cache_task ("skip squeezed symbol %s", symbol);
}
}
else {
@@ -2403,7 +2409,7 @@ rspamd_symbols_cache_enable_symbol_checkpoint (struct rspamd_task *task,
clrbit (checkpoint->processed_bits, item->id * 2);
clrbit (checkpoint->processed_bits, item->id * 2 + 1);
- msg_debug_task ("enable execution of %s (%d)", symbol, id);
+ msg_debug_cache_task ("enable execution of %s (%d)", symbol, id);
}
else {
msg_info_task ("cannot enable %s: not found", symbol);
@@ -2631,8 +2637,12 @@ rspamd_symbols_cache_finalize_item (struct rspamd_task *task,
struct timeval tv;
const gdouble slow_diff_limit = 0.1;
- setbit (checkpoint->processed_bits, item->id + 1);
+ /* Sanity checks */
g_assert (checkpoint->items_inflight > 0);
+ g_assert (item->async_events == 0);
+
+ msg_debug_cache_task ("process post"
+ setbit (checkpoint->processed_bits, item->id + 1);
checkpoint->items_inflight --;
#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
@@ -2666,4 +2676,24 @@ rspamd_symbols_cache_finalize_item (struct rspamd_task *task,
rdep,
checkpoint);
}
+}
+
+guint
+rspamd_symcahe_item_async_inc (struct rspamd_task *task,
+ struct rspamd_symcache_item *item)
+{
+ msg_debug_cache_task ("increase async events counter for %s = %d + 1",
+ item->symbol, item->async_events);
+ return ++item->async_events;
+}
+
+guint
+rspamd_symcahe_item_async_dec (struct rspamd_task *task,
+ struct rspamd_symcache_item *item)
+{
+ msg_debug_cache_task ("decrease async events counter for %s = %d - 1",
+ item->symbol, item->async_events);
+ g_assert (item->async_events > 0);
+
+ return --item->async_events;
} \ No newline at end of file
diff --git a/src/libserver/symbols_cache.h b/src/libserver/symbols_cache.h
index f8764ee8c..adf7d2b26 100644
--- a/src/libserver/symbols_cache.h
+++ b/src/libserver/symbols_cache.h
@@ -331,4 +331,15 @@ struct rspamd_symcache_item *rspamd_symbols_cache_set_cur_item (struct rspamd_ta
*/
void rspamd_symbols_cache_finalize_item (struct rspamd_task *task,
struct rspamd_symcache_item *item);
+
+/*
+ * Increase number of async events pending for an item
+ */
+guint rspamd_symcahe_item_async_inc (struct rspamd_task *task,
+ struct rspamd_symcache_item *item);
+/*
+ * Decrease number of async events pending for an item, asserts if no events pending
+ */
+guint rspamd_symcahe_item_async_dec (struct rspamd_task *task,
+ struct rspamd_symcache_item *item);
#endif
diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c
index fa4b92162..88c6a0823 100644
--- a/src/plugins/surbl.c
+++ b/src/plugins/surbl.c
@@ -108,7 +108,7 @@ struct dns_param {
struct rspamd_task *task;
gchar *host_resolve;
struct suffix_item *suffix;
- struct rspamd_async_watcher *w;
+ struct rspamd_symcache_item *item;
struct surbl_module_ctx *ctx;
};
@@ -120,7 +120,7 @@ struct redirector_param {
struct rspamd_http_connection *conn;
GHashTable *tree;
struct suffix_item *suffix;
- struct rspamd_async_watcher *w;
+ struct rspamd_symcache_item *item;
gint sock;
guint redirector_requests;
};
@@ -1323,6 +1323,7 @@ format_surbl_request (rspamd_mempool_t * pool,
static void
make_surbl_requests (struct rspamd_url *url, struct rspamd_task *task,
+ struct rspamd_symcache_item *item,
struct suffix_item *suffix,
gboolean forced, GHashTable *tree,
struct surbl_ctx *surbl_module_ctx)
@@ -1379,8 +1380,7 @@ make_surbl_requests (struct rspamd_url *url, struct rspamd_task *task,
if (make_dns_request_task (task,
surbl_dns_ip_callback,
(void *) param, RDNS_REQUEST_A, surbl_req)) {
- param->w = rspamd_session_get_watcher (task->s);
- rspamd_session_watcher_push (task->s);
+ param->item = item;
}
}
}