Browse Source

[Project] Further cleanup from the watchers

tags/1.8.2
Vsevolod Stakhov 5 years ago
parent
commit
1332f06a95
2 changed files with 22 additions and 208 deletions
  1. 5
    74
      src/libserver/symbols_cache.c
  2. 17
    134
      src/lua/lua_config.c

+ 5
- 74
src/libserver/symbols_cache.c View File

@@ -1293,51 +1293,6 @@ rspamd_symbols_cache_metric_limit (struct rspamd_task *task,
return FALSE;
}

static void
rspamd_symbols_cache_watcher_cb (gpointer sessiond, gpointer ud)
{
struct rspamd_task *task = sessiond;
struct rspamd_symcache_item *item = ud, *it;
struct cache_savepoint *checkpoint;
struct symbols_cache *cache;
gint i, remain = 0;

checkpoint = task->checkpoint;
cache = task->cfg->cache;

/* Specify that we are done with this item */
setbit (checkpoint->processed_bits, item->id * 2 + 1);

if (checkpoint->pass > 0) {
#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
event_base_update_cache_time (task->ev_base);
#endif
for (i = 0; i < (gint)checkpoint->waitq->len; i ++) {
it = g_ptr_array_index (checkpoint->waitq, i);

if (!isset (checkpoint->processed_bits, it->id * 2)) {
if (!rspamd_symbols_cache_check_deps (task, cache, it,
checkpoint, 0, TRUE)) {
remain ++;
}
else {
msg_debug_cache_task ("watcher for %d(%s), unblocked item %d(%s)",
item->id,
item->symbol,
it->id,
it->symbol);
rspamd_symbols_cache_check_symbol (task, cache, it,
checkpoint);
}
}
}
}

msg_debug_cache_task ("finished watcher for %d(%s), %ud symbols waiting",
item->id, item->symbol,
remain);
}

static gboolean
rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
struct symbols_cache *cache,
@@ -1391,10 +1346,11 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
#endif
item->start_ticks = t1;
item->async_events = 0;
g_assert (checkpoint->cur_item == NULL);
checkpoint->cur_item = item;
checkpoint->items_inflight ++;
/* Callback now must finalize itself */
item->func (task, item, item->user_data);
rspamd_session_watch_stop (task->s);

if (checkpoint->items_inflight == 0) {
return TRUE;
@@ -1778,17 +1734,6 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
"resolved",
item->id, item->symbol);

PTR_ARRAY_FOREACH (checkpoint->waitq, j, tmp_it) {
if (item->id == tmp_it->id) {
found = TRUE;
break;
}
}

if (!found) {
g_ptr_array_add (checkpoint->waitq, item);
}

continue;
}

@@ -1801,22 +1746,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
break;

case RSPAMD_CACHE_PASS_WAIT_FILTERS:
/* We just go through the blocked symbols and check if they are ready */
for (i = 0; i < (gint)checkpoint->waitq->len; i ++) {
item = g_ptr_array_index (checkpoint->waitq, i);

if (!isset (checkpoint->processed_bits, item->id * 2)) {
if (!rspamd_symbols_cache_check_deps (task, cache, item,
checkpoint, 0, FALSE)) {
break;
}

rspamd_symbols_cache_check_symbol (task, cache, item,
checkpoint);
}
}

if (checkpoint->waitq->len == 0 ||
if (checkpoint->items_inflight == 0 ||
stage == RSPAMD_TASK_STAGE_POST_FILTERS) {
checkpoint->pass = RSPAMD_CACHE_PASS_POSTFILTERS;
}
@@ -1879,7 +1809,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
}

if (checkpoint->waitq->len == 0 ||
if (checkpoint->items_inflight == 0 ||
stage == RSPAMD_TASK_STAGE_IDEMPOTENT) {
checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
}
@@ -2644,6 +2574,7 @@ rspamd_symbols_cache_finalize_item (struct rspamd_task *task,
msg_debug_cache_task ("process finalize for item %s", item->symbol);
setbit (checkpoint->processed_bits, item->id + 1);
checkpoint->items_inflight --;
checkpoint->cur_item = NULL;

#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
event_base_update_cache_time (task->ev_base);

+ 17
- 134
src/lua/lua_config.c View File

@@ -1055,13 +1055,11 @@ struct lua_callback_data {
gint ref;
} callback;
gboolean cb_is_ref;

/* Dynamic data */
gint stack_level;
gint order;
};

struct lua_watcher_data {
struct lua_callback_data *cbd;
gint cb_ref;
struct rspamd_symcache_item *item;
};

/*
@@ -1093,120 +1091,12 @@ rspamd_compare_order_func (gconstpointer a, gconstpointer b)
return cb2->order - cb1->order;
}

static void
lua_watcher_callback (gpointer session_data, gpointer ud)
{
struct rspamd_task *task = session_data, **ptask;
struct lua_watcher_data *wd = ud;
lua_State *L;
gint level, nresults, err_idx, ret;
GString *tb;
struct rspamd_symbol_result *s;

L = wd->cbd->L;
level = lua_gettop (L);
lua_pushcfunction (L, &rspamd_lua_traceback);
err_idx = lua_gettop (L);
static void lua_metric_symbol_callback_return (struct thread_entry *thread_entry,
int ret);

level ++;
lua_rawgeti (L, LUA_REGISTRYINDEX, wd->cb_ref);

ptask = lua_newuserdata (L, sizeof (struct rspamd_task *));
rspamd_lua_setclass (L, "rspamd{task}", -1);
*ptask = task;

if ((ret = lua_pcall (L, 1, LUA_MULTRET, err_idx)) != 0) {
tb = lua_touserdata (L, -1);
msg_err_task ("call to (%s) failed (%d): %v",
wd->cbd->symbol, ret, tb);

if (tb) {
g_string_free (tb, TRUE);
}
}
else {
nresults = lua_gettop (L) - level;

if (nresults >= 1) {
/* Function returned boolean, so maybe we need to insert result? */
gint res = 0;
gint i;
gdouble flag = 1.0;
gint type;
struct lua_watcher_data *nwd;

type = lua_type (L, level + 1);

if (type == LUA_TBOOLEAN) {
res = lua_toboolean (L, level + 1);
}
else if (type == LUA_TFUNCTION) {
/* Function returned a closure that should be watched for */
nwd = rspamd_mempool_alloc (task->task_pool, sizeof (*nwd));
lua_pushvalue (L, level + 1);
nwd->cb_ref = luaL_ref (L, LUA_REGISTRYINDEX);
nwd->cbd = wd->cbd;
rspamd_session_watcher_push_callback (task->s,
rspamd_session_get_watcher (task->s),
lua_watcher_callback, nwd);
/*
* We immediately pop watcher since we have not registered
* any async events from here
*/
rspamd_session_watcher_pop (task->s,
rspamd_session_get_watcher (task->s));
}
else {
res = (gint)lua_tonumber (L, level + 1);
}

if (res) {
gint first_opt = 2;

if (lua_type (L, level + 2) == LUA_TNUMBER) {
flag = lua_tonumber (L, level + 2);
/* Shift opt index */
first_opt = 3;
}
else {
flag = res;
}

s = rspamd_task_insert_result (task,
wd->cbd->symbol, flag, NULL);

if (s) {
guint last_pos = lua_gettop (L);

for (i = level + first_opt; i <= last_pos; i++) {
if (lua_type (L, i) == LUA_TSTRING) {
const char *opt = lua_tostring (L, i);

rspamd_task_add_result_option (task, s, opt);
}
else if (lua_type (L, i) == LUA_TTABLE) {
lua_pushvalue (L, i);

for (lua_pushnil (L); lua_next (L, -2); lua_pop (L, 1)) {
const char *opt = lua_tostring (L, -1);

rspamd_task_add_result_option (task, s, opt);
}

lua_pop (L, 1);
}
}
}
}
}
}

lua_settop (L, err_idx - 1);
}

static void lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret);

static void lua_metric_symbol_callback_error (struct thread_entry *thread_entry, int ret, const char *msg);
static void lua_metric_symbol_callback_error (struct thread_entry *thread_entry,
int ret,
const char *msg);

static void
lua_metric_symbol_callback (struct rspamd_task *task,
@@ -1217,6 +1107,7 @@ lua_metric_symbol_callback (struct rspamd_task *task,
struct rspamd_task **ptask;
struct thread_entry *thread_entry;

rspamd_symcache_item_async_inc (task, item);
thread_entry = lua_thread_pool_get_for_task (task);

g_assert(thread_entry->cd == NULL);
@@ -1224,6 +1115,7 @@ lua_metric_symbol_callback (struct rspamd_task *task,

lua_State *thread = thread_entry->lua_state;
cd->stack_level = lua_gettop (thread);
cd->item = item;

if (cd->cb_is_ref) {
lua_rawgeti (thread, LUA_REGISTRYINDEX, cd->callback.ref);
@@ -1243,11 +1135,15 @@ lua_metric_symbol_callback (struct rspamd_task *task,
}

static void
lua_metric_symbol_callback_error (struct thread_entry *thread_entry, int ret, const char *msg)
lua_metric_symbol_callback_error (struct thread_entry *thread_entry,
int ret,
const char *msg)
{
struct lua_callback_data *cd = thread_entry->cd;
struct rspamd_task *task = thread_entry->task;
msg_err_task ("call to (%s) failed (%d): %s", cd->symbol, ret, msg);

rspamd_symcache_item_async_dec_check (task, cd->item);
}

static void
@@ -1270,7 +1166,6 @@ lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret)
gint i;
gdouble flag = 1.0;
gint type;
struct lua_watcher_data *wd;

type = lua_type (L, cd->stack_level + 1);

@@ -1278,20 +1173,7 @@ lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret)
res = lua_toboolean (L, cd->stack_level + 1);
}
else if (type == LUA_TFUNCTION) {
/* Function returned a closure that should be watched for */
wd = rspamd_mempool_alloc (task->task_pool, sizeof (*wd));
lua_pushvalue (L /*cd->L*/, cd->stack_level + 1);
wd->cb_ref = luaL_ref (L, LUA_REGISTRYINDEX);
wd->cbd = cd;
rspamd_session_watcher_push_callback (task->s,
rspamd_session_get_watcher (task->s),
lua_watcher_callback, wd);
/*
* We immediately pop watcher since we have not registered
* any async events from here
*/
rspamd_session_watcher_pop (task->s,
rspamd_session_get_watcher (task->s));
g_assert_not_reached ();
}
else {
res = lua_tonumber (L, cd->stack_level + 1);
@@ -1342,6 +1224,7 @@ lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret)
g_assert (lua_gettop (L) == cd->stack_level); /* we properly cleaned up the stack */

cd->stack_level = 0;
rspamd_symcache_item_async_dec_check (task, cd->item);
}

static gint

Loading…
Cancel
Save