summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2018-10-20 14:48:06 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2018-10-20 19:43:32 +0100
commit1332f06a95c874480cfd155befe8ba840871b28d (patch)
treeb1265930c015a5d3bcf58d6ccb1c16b9916ad4ba
parentb849942bc86cf57d4ae4e57a676ed0f6a057cad2 (diff)
downloadrspamd-1332f06a95c874480cfd155befe8ba840871b28d.tar.gz
rspamd-1332f06a95c874480cfd155befe8ba840871b28d.zip
[Project] Further cleanup from the watchers
-rw-r--r--src/libserver/symbols_cache.c79
-rw-r--r--src/lua/lua_config.c151
2 files changed, 22 insertions, 208 deletions
diff --git a/src/libserver/symbols_cache.c b/src/libserver/symbols_cache.c
index 216f8aabf..5d30f46dc 100644
--- a/src/libserver/symbols_cache.c
+++ b/src/libserver/symbols_cache.c
@@ -1293,51 +1293,6 @@ rspamd_symbols_cache_metric_limit (struct rspamd_task *task,
return FALSE;
}
-static void
-rspamd_symbols_cache_watcher_cb (gpointer sessiond, gpointer ud)
-{
- struct rspamd_task *task = sessiond;
- struct rspamd_symcache_item *item = ud, *it;
- struct cache_savepoint *checkpoint;
- struct symbols_cache *cache;
- gint i, remain = 0;
-
- checkpoint = task->checkpoint;
- cache = task->cfg->cache;
-
- /* Specify that we are done with this item */
- setbit (checkpoint->processed_bits, item->id * 2 + 1);
-
- if (checkpoint->pass > 0) {
-#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
- event_base_update_cache_time (task->ev_base);
-#endif
- for (i = 0; i < (gint)checkpoint->waitq->len; i ++) {
- it = g_ptr_array_index (checkpoint->waitq, i);
-
- if (!isset (checkpoint->processed_bits, it->id * 2)) {
- if (!rspamd_symbols_cache_check_deps (task, cache, it,
- checkpoint, 0, TRUE)) {
- remain ++;
- }
- else {
- msg_debug_cache_task ("watcher for %d(%s), unblocked item %d(%s)",
- item->id,
- item->symbol,
- it->id,
- it->symbol);
- rspamd_symbols_cache_check_symbol (task, cache, it,
- checkpoint);
- }
- }
- }
- }
-
- msg_debug_cache_task ("finished watcher for %d(%s), %ud symbols waiting",
- item->id, item->symbol,
- remain);
-}
-
static gboolean
rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
struct symbols_cache *cache,
@@ -1391,10 +1346,11 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
#endif
item->start_ticks = t1;
item->async_events = 0;
+ g_assert (checkpoint->cur_item == NULL);
+ checkpoint->cur_item = item;
checkpoint->items_inflight ++;
/* Callback now must finalize itself */
item->func (task, item, item->user_data);
- rspamd_session_watch_stop (task->s);
if (checkpoint->items_inflight == 0) {
return TRUE;
@@ -1778,17 +1734,6 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
"resolved",
item->id, item->symbol);
- PTR_ARRAY_FOREACH (checkpoint->waitq, j, tmp_it) {
- if (item->id == tmp_it->id) {
- found = TRUE;
- break;
- }
- }
-
- if (!found) {
- g_ptr_array_add (checkpoint->waitq, item);
- }
-
continue;
}
@@ -1801,22 +1746,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
break;
case RSPAMD_CACHE_PASS_WAIT_FILTERS:
- /* We just go through the blocked symbols and check if they are ready */
- for (i = 0; i < (gint)checkpoint->waitq->len; i ++) {
- item = g_ptr_array_index (checkpoint->waitq, i);
-
- if (!isset (checkpoint->processed_bits, item->id * 2)) {
- if (!rspamd_symbols_cache_check_deps (task, cache, item,
- checkpoint, 0, FALSE)) {
- break;
- }
-
- rspamd_symbols_cache_check_symbol (task, cache, item,
- checkpoint);
- }
- }
-
- if (checkpoint->waitq->len == 0 ||
+ if (checkpoint->items_inflight == 0 ||
stage == RSPAMD_TASK_STAGE_POST_FILTERS) {
checkpoint->pass = RSPAMD_CACHE_PASS_POSTFILTERS;
}
@@ -1879,7 +1809,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
}
- if (checkpoint->waitq->len == 0 ||
+ if (checkpoint->items_inflight == 0 ||
stage == RSPAMD_TASK_STAGE_IDEMPOTENT) {
checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
}
@@ -2644,6 +2574,7 @@ rspamd_symbols_cache_finalize_item (struct rspamd_task *task,
msg_debug_cache_task ("process finalize for item %s", item->symbol);
setbit (checkpoint->processed_bits, item->id + 1);
checkpoint->items_inflight --;
+ checkpoint->cur_item = NULL;
#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
event_base_update_cache_time (task->ev_base);
diff --git a/src/lua/lua_config.c b/src/lua/lua_config.c
index 1ed13245e..404251592 100644
--- a/src/lua/lua_config.c
+++ b/src/lua/lua_config.c
@@ -1055,13 +1055,11 @@ struct lua_callback_data {
gint ref;
} callback;
gboolean cb_is_ref;
+
+ /* Dynamic data */
gint stack_level;
gint order;
-};
-
-struct lua_watcher_data {
- struct lua_callback_data *cbd;
- gint cb_ref;
+ struct rspamd_symcache_item *item;
};
/*
@@ -1093,120 +1091,12 @@ rspamd_compare_order_func (gconstpointer a, gconstpointer b)
return cb2->order - cb1->order;
}
-static void
-lua_watcher_callback (gpointer session_data, gpointer ud)
-{
- struct rspamd_task *task = session_data, **ptask;
- struct lua_watcher_data *wd = ud;
- lua_State *L;
- gint level, nresults, err_idx, ret;
- GString *tb;
- struct rspamd_symbol_result *s;
-
- L = wd->cbd->L;
- level = lua_gettop (L);
- lua_pushcfunction (L, &rspamd_lua_traceback);
- err_idx = lua_gettop (L);
+static void lua_metric_symbol_callback_return (struct thread_entry *thread_entry,
+ int ret);
- level ++;
- lua_rawgeti (L, LUA_REGISTRYINDEX, wd->cb_ref);
-
- ptask = lua_newuserdata (L, sizeof (struct rspamd_task *));
- rspamd_lua_setclass (L, "rspamd{task}", -1);
- *ptask = task;
-
- if ((ret = lua_pcall (L, 1, LUA_MULTRET, err_idx)) != 0) {
- tb = lua_touserdata (L, -1);
- msg_err_task ("call to (%s) failed (%d): %v",
- wd->cbd->symbol, ret, tb);
-
- if (tb) {
- g_string_free (tb, TRUE);
- }
- }
- else {
- nresults = lua_gettop (L) - level;
-
- if (nresults >= 1) {
- /* Function returned boolean, so maybe we need to insert result? */
- gint res = 0;
- gint i;
- gdouble flag = 1.0;
- gint type;
- struct lua_watcher_data *nwd;
-
- type = lua_type (L, level + 1);
-
- if (type == LUA_TBOOLEAN) {
- res = lua_toboolean (L, level + 1);
- }
- else if (type == LUA_TFUNCTION) {
- /* Function returned a closure that should be watched for */
- nwd = rspamd_mempool_alloc (task->task_pool, sizeof (*nwd));
- lua_pushvalue (L, level + 1);
- nwd->cb_ref = luaL_ref (L, LUA_REGISTRYINDEX);
- nwd->cbd = wd->cbd;
- rspamd_session_watcher_push_callback (task->s,
- rspamd_session_get_watcher (task->s),
- lua_watcher_callback, nwd);
- /*
- * We immediately pop watcher since we have not registered
- * any async events from here
- */
- rspamd_session_watcher_pop (task->s,
- rspamd_session_get_watcher (task->s));
- }
- else {
- res = (gint)lua_tonumber (L, level + 1);
- }
-
- if (res) {
- gint first_opt = 2;
-
- if (lua_type (L, level + 2) == LUA_TNUMBER) {
- flag = lua_tonumber (L, level + 2);
- /* Shift opt index */
- first_opt = 3;
- }
- else {
- flag = res;
- }
-
- s = rspamd_task_insert_result (task,
- wd->cbd->symbol, flag, NULL);
-
- if (s) {
- guint last_pos = lua_gettop (L);
-
- for (i = level + first_opt; i <= last_pos; i++) {
- if (lua_type (L, i) == LUA_TSTRING) {
- const char *opt = lua_tostring (L, i);
-
- rspamd_task_add_result_option (task, s, opt);
- }
- else if (lua_type (L, i) == LUA_TTABLE) {
- lua_pushvalue (L, i);
-
- for (lua_pushnil (L); lua_next (L, -2); lua_pop (L, 1)) {
- const char *opt = lua_tostring (L, -1);
-
- rspamd_task_add_result_option (task, s, opt);
- }
-
- lua_pop (L, 1);
- }
- }
- }
- }
- }
- }
-
- lua_settop (L, err_idx - 1);
-}
-
-static void lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret);
-
-static void lua_metric_symbol_callback_error (struct thread_entry *thread_entry, int ret, const char *msg);
+static void lua_metric_symbol_callback_error (struct thread_entry *thread_entry,
+ int ret,
+ const char *msg);
static void
lua_metric_symbol_callback (struct rspamd_task *task,
@@ -1217,6 +1107,7 @@ lua_metric_symbol_callback (struct rspamd_task *task,
struct rspamd_task **ptask;
struct thread_entry *thread_entry;
+ rspamd_symcache_item_async_inc (task, item);
thread_entry = lua_thread_pool_get_for_task (task);
g_assert(thread_entry->cd == NULL);
@@ -1224,6 +1115,7 @@ lua_metric_symbol_callback (struct rspamd_task *task,
lua_State *thread = thread_entry->lua_state;
cd->stack_level = lua_gettop (thread);
+ cd->item = item;
if (cd->cb_is_ref) {
lua_rawgeti (thread, LUA_REGISTRYINDEX, cd->callback.ref);
@@ -1243,11 +1135,15 @@ lua_metric_symbol_callback (struct rspamd_task *task,
}
static void
-lua_metric_symbol_callback_error (struct thread_entry *thread_entry, int ret, const char *msg)
+lua_metric_symbol_callback_error (struct thread_entry *thread_entry,
+ int ret,
+ const char *msg)
{
struct lua_callback_data *cd = thread_entry->cd;
struct rspamd_task *task = thread_entry->task;
msg_err_task ("call to (%s) failed (%d): %s", cd->symbol, ret, msg);
+
+ rspamd_symcache_item_async_dec_check (task, cd->item);
}
static void
@@ -1270,7 +1166,6 @@ lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret)
gint i;
gdouble flag = 1.0;
gint type;
- struct lua_watcher_data *wd;
type = lua_type (L, cd->stack_level + 1);
@@ -1278,20 +1173,7 @@ lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret)
res = lua_toboolean (L, cd->stack_level + 1);
}
else if (type == LUA_TFUNCTION) {
- /* Function returned a closure that should be watched for */
- wd = rspamd_mempool_alloc (task->task_pool, sizeof (*wd));
- lua_pushvalue (L /*cd->L*/, cd->stack_level + 1);
- wd->cb_ref = luaL_ref (L, LUA_REGISTRYINDEX);
- wd->cbd = cd;
- rspamd_session_watcher_push_callback (task->s,
- rspamd_session_get_watcher (task->s),
- lua_watcher_callback, wd);
- /*
- * We immediately pop watcher since we have not registered
- * any async events from here
- */
- rspamd_session_watcher_pop (task->s,
- rspamd_session_get_watcher (task->s));
+ g_assert_not_reached ();
}
else {
res = lua_tonumber (L, cd->stack_level + 1);
@@ -1342,6 +1224,7 @@ lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret)
g_assert (lua_gettop (L) == cd->stack_level); /* we properly cleaned up the stack */
cd->stack_level = 0;
+ rspamd_symcache_item_async_dec_check (task, cd->item);
}
static gint