diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2018-10-20 14:48:06 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2018-10-20 19:43:32 +0100 |
commit | 1332f06a95c874480cfd155befe8ba840871b28d (patch) | |
tree | b1265930c015a5d3bcf58d6ccb1c16b9916ad4ba | |
parent | b849942bc86cf57d4ae4e57a676ed0f6a057cad2 (diff) | |
download | rspamd-1332f06a95c874480cfd155befe8ba840871b28d.tar.gz rspamd-1332f06a95c874480cfd155befe8ba840871b28d.zip |
[Project] Further cleanup from the watchers
-rw-r--r-- | src/libserver/symbols_cache.c | 79 | ||||
-rw-r--r-- | src/lua/lua_config.c | 151 |
2 files changed, 22 insertions, 208 deletions
diff --git a/src/libserver/symbols_cache.c b/src/libserver/symbols_cache.c index 216f8aabf..5d30f46dc 100644 --- a/src/libserver/symbols_cache.c +++ b/src/libserver/symbols_cache.c @@ -1293,51 +1293,6 @@ rspamd_symbols_cache_metric_limit (struct rspamd_task *task, return FALSE; } -static void -rspamd_symbols_cache_watcher_cb (gpointer sessiond, gpointer ud) -{ - struct rspamd_task *task = sessiond; - struct rspamd_symcache_item *item = ud, *it; - struct cache_savepoint *checkpoint; - struct symbols_cache *cache; - gint i, remain = 0; - - checkpoint = task->checkpoint; - cache = task->cfg->cache; - - /* Specify that we are done with this item */ - setbit (checkpoint->processed_bits, item->id * 2 + 1); - - if (checkpoint->pass > 0) { -#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC - event_base_update_cache_time (task->ev_base); -#endif - for (i = 0; i < (gint)checkpoint->waitq->len; i ++) { - it = g_ptr_array_index (checkpoint->waitq, i); - - if (!isset (checkpoint->processed_bits, it->id * 2)) { - if (!rspamd_symbols_cache_check_deps (task, cache, it, - checkpoint, 0, TRUE)) { - remain ++; - } - else { - msg_debug_cache_task ("watcher for %d(%s), unblocked item %d(%s)", - item->id, - item->symbol, - it->id, - it->symbol); - rspamd_symbols_cache_check_symbol (task, cache, it, - checkpoint); - } - } - } - } - - msg_debug_cache_task ("finished watcher for %d(%s), %ud symbols waiting", - item->id, item->symbol, - remain); -} - static gboolean rspamd_symbols_cache_check_symbol (struct rspamd_task *task, struct symbols_cache *cache, @@ -1391,10 +1346,11 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task, #endif item->start_ticks = t1; item->async_events = 0; + g_assert (checkpoint->cur_item == NULL); + checkpoint->cur_item = item; checkpoint->items_inflight ++; /* Callback now must finalize itself */ item->func (task, item, item->user_data); - rspamd_session_watch_stop (task->s); if (checkpoint->items_inflight == 0) { return TRUE; @@ -1778,17 +1734,6 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task, "resolved", item->id, item->symbol); - PTR_ARRAY_FOREACH (checkpoint->waitq, j, tmp_it) { - if (item->id == tmp_it->id) { - found = TRUE; - break; - } - } - - if (!found) { - g_ptr_array_add (checkpoint->waitq, item); - } - continue; } @@ -1801,22 +1746,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task, break; case RSPAMD_CACHE_PASS_WAIT_FILTERS: - /* We just go through the blocked symbols and check if they are ready */ - for (i = 0; i < (gint)checkpoint->waitq->len; i ++) { - item = g_ptr_array_index (checkpoint->waitq, i); - - if (!isset (checkpoint->processed_bits, item->id * 2)) { - if (!rspamd_symbols_cache_check_deps (task, cache, item, - checkpoint, 0, FALSE)) { - break; - } - - rspamd_symbols_cache_check_symbol (task, cache, item, - checkpoint); - } - } - - if (checkpoint->waitq->len == 0 || + if (checkpoint->items_inflight == 0 || stage == RSPAMD_TASK_STAGE_POST_FILTERS) { checkpoint->pass = RSPAMD_CACHE_PASS_POSTFILTERS; } @@ -1879,7 +1809,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task, checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT; } - if (checkpoint->waitq->len == 0 || + if (checkpoint->items_inflight == 0 || stage == RSPAMD_TASK_STAGE_IDEMPOTENT) { checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT; } @@ -2644,6 +2574,7 @@ rspamd_symbols_cache_finalize_item (struct rspamd_task *task, msg_debug_cache_task ("process finalize for item %s", item->symbol); setbit (checkpoint->processed_bits, item->id + 1); checkpoint->items_inflight --; + checkpoint->cur_item = NULL; #ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC event_base_update_cache_time (task->ev_base); diff --git a/src/lua/lua_config.c b/src/lua/lua_config.c index 1ed13245e..404251592 100644 --- a/src/lua/lua_config.c +++ b/src/lua/lua_config.c @@ -1055,13 +1055,11 @@ struct lua_callback_data { gint ref; } callback; gboolean cb_is_ref; + + /* Dynamic data */ gint stack_level; gint order; -}; - -struct lua_watcher_data { - struct lua_callback_data *cbd; - gint cb_ref; + struct rspamd_symcache_item *item; }; /* @@ -1093,120 +1091,12 @@ rspamd_compare_order_func (gconstpointer a, gconstpointer b) return cb2->order - cb1->order; } -static void -lua_watcher_callback (gpointer session_data, gpointer ud) -{ - struct rspamd_task *task = session_data, **ptask; - struct lua_watcher_data *wd = ud; - lua_State *L; - gint level, nresults, err_idx, ret; - GString *tb; - struct rspamd_symbol_result *s; - - L = wd->cbd->L; - level = lua_gettop (L); - lua_pushcfunction (L, &rspamd_lua_traceback); - err_idx = lua_gettop (L); +static void lua_metric_symbol_callback_return (struct thread_entry *thread_entry, + int ret); - level ++; - lua_rawgeti (L, LUA_REGISTRYINDEX, wd->cb_ref); - - ptask = lua_newuserdata (L, sizeof (struct rspamd_task *)); - rspamd_lua_setclass (L, "rspamd{task}", -1); - *ptask = task; - - if ((ret = lua_pcall (L, 1, LUA_MULTRET, err_idx)) != 0) { - tb = lua_touserdata (L, -1); - msg_err_task ("call to (%s) failed (%d): %v", - wd->cbd->symbol, ret, tb); - - if (tb) { - g_string_free (tb, TRUE); - } - } - else { - nresults = lua_gettop (L) - level; - - if (nresults >= 1) { - /* Function returned boolean, so maybe we need to insert result? */ - gint res = 0; - gint i; - gdouble flag = 1.0; - gint type; - struct lua_watcher_data *nwd; - - type = lua_type (L, level + 1); - - if (type == LUA_TBOOLEAN) { - res = lua_toboolean (L, level + 1); - } - else if (type == LUA_TFUNCTION) { - /* Function returned a closure that should be watched for */ - nwd = rspamd_mempool_alloc (task->task_pool, sizeof (*nwd)); - lua_pushvalue (L, level + 1); - nwd->cb_ref = luaL_ref (L, LUA_REGISTRYINDEX); - nwd->cbd = wd->cbd; - rspamd_session_watcher_push_callback (task->s, - rspamd_session_get_watcher (task->s), - lua_watcher_callback, nwd); - /* - * We immediately pop watcher since we have not registered - * any async events from here - */ - rspamd_session_watcher_pop (task->s, - rspamd_session_get_watcher (task->s)); - } - else { - res = (gint)lua_tonumber (L, level + 1); - } - - if (res) { - gint first_opt = 2; - - if (lua_type (L, level + 2) == LUA_TNUMBER) { - flag = lua_tonumber (L, level + 2); - /* Shift opt index */ - first_opt = 3; - } - else { - flag = res; - } - - s = rspamd_task_insert_result (task, - wd->cbd->symbol, flag, NULL); - - if (s) { - guint last_pos = lua_gettop (L); - - for (i = level + first_opt; i <= last_pos; i++) { - if (lua_type (L, i) == LUA_TSTRING) { - const char *opt = lua_tostring (L, i); - - rspamd_task_add_result_option (task, s, opt); - } - else if (lua_type (L, i) == LUA_TTABLE) { - lua_pushvalue (L, i); - - for (lua_pushnil (L); lua_next (L, -2); lua_pop (L, 1)) { - const char *opt = lua_tostring (L, -1); - - rspamd_task_add_result_option (task, s, opt); - } - - lua_pop (L, 1); - } - } - } - } - } - } - - lua_settop (L, err_idx - 1); -} - -static void lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret); - -static void lua_metric_symbol_callback_error (struct thread_entry *thread_entry, int ret, const char *msg); +static void lua_metric_symbol_callback_error (struct thread_entry *thread_entry, + int ret, + const char *msg); static void lua_metric_symbol_callback (struct rspamd_task *task, @@ -1217,6 +1107,7 @@ lua_metric_symbol_callback (struct rspamd_task *task, struct rspamd_task **ptask; struct thread_entry *thread_entry; + rspamd_symcache_item_async_inc (task, item); thread_entry = lua_thread_pool_get_for_task (task); g_assert(thread_entry->cd == NULL); @@ -1224,6 +1115,7 @@ lua_metric_symbol_callback (struct rspamd_task *task, lua_State *thread = thread_entry->lua_state; cd->stack_level = lua_gettop (thread); + cd->item = item; if (cd->cb_is_ref) { lua_rawgeti (thread, LUA_REGISTRYINDEX, cd->callback.ref); @@ -1243,11 +1135,15 @@ lua_metric_symbol_callback (struct rspamd_task *task, } static void -lua_metric_symbol_callback_error (struct thread_entry *thread_entry, int ret, const char *msg) +lua_metric_symbol_callback_error (struct thread_entry *thread_entry, + int ret, + const char *msg) { struct lua_callback_data *cd = thread_entry->cd; struct rspamd_task *task = thread_entry->task; msg_err_task ("call to (%s) failed (%d): %s", cd->symbol, ret, msg); + + rspamd_symcache_item_async_dec_check (task, cd->item); } static void @@ -1270,7 +1166,6 @@ lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret) gint i; gdouble flag = 1.0; gint type; - struct lua_watcher_data *wd; type = lua_type (L, cd->stack_level + 1); @@ -1278,20 +1173,7 @@ lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret) res = lua_toboolean (L, cd->stack_level + 1); } else if (type == LUA_TFUNCTION) { - /* Function returned a closure that should be watched for */ - wd = rspamd_mempool_alloc (task->task_pool, sizeof (*wd)); - lua_pushvalue (L /*cd->L*/, cd->stack_level + 1); - wd->cb_ref = luaL_ref (L, LUA_REGISTRYINDEX); - wd->cbd = cd; - rspamd_session_watcher_push_callback (task->s, - rspamd_session_get_watcher (task->s), - lua_watcher_callback, wd); - /* - * We immediately pop watcher since we have not registered - * any async events from here - */ - rspamd_session_watcher_pop (task->s, - rspamd_session_get_watcher (task->s)); + g_assert_not_reached (); } else { res = lua_tonumber (L, cd->stack_level + 1); @@ -1342,6 +1224,7 @@ lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret) g_assert (lua_gettop (L) == cd->stack_level); /* we properly cleaned up the stack */ cd->stack_level = 0; + rspamd_symcache_item_async_dec_check (task, cd->item); } static gint |