]> source.dussan.org Git - rspamd.git/commitdiff
[Minor] Move resume/yield methods into appropriate place
authorMikhail Galanin <mgalanin@mimecast.com>
Wed, 22 Aug 2018 14:58:22 +0000 (15:58 +0100)
committerMikhail Galanin <mgalanin@mimecast.com>
Wed, 22 Aug 2018 14:58:22 +0000 (15:58 +0100)
src/lua/lua_common.h
src/lua/lua_config.c
src/lua/lua_dns.c
src/lua/lua_thread_pool.c
src/lua/lua_thread_pool.h

index 0a9527bb0e9ea8bb28e162517d264e7b39831e0d..b84701bc1dbc7fa1525081215adbc22f3114d6e7 100644 (file)
@@ -434,7 +434,7 @@ lua_yield_thread (struct thread_entry *thread_entry, gint nresults);
  * @param narg
  */
 void
-lua_resume_thread (struct rspamd_task *task, struct thread_entry *thread_entry, gint narg);
+lua_resume_thread (struct thread_entry *thread_entry, gint narg);
 
 /* Paths defs */
 #define RSPAMD_CONFDIR_INDEX "CONFDIR"
index 6957ded7c9f1b511bf8a499794b18e26e68d4d1c..629653e851b9fed04a93d7140279fd9f78693479 100644 (file)
@@ -1145,7 +1145,7 @@ lua_watcher_callback (gpointer session_data, gpointer ud)
                                                rspamd_session_get_watcher (task->s));
                        }
                        else {
-                               res = lua_tonumber (L, level + 1);
+                               res = (gint)lua_tonumber (L, level + 1);
                        }
 
                        if (res) {
@@ -1192,18 +1192,9 @@ lua_watcher_callback (gpointer session_data, gpointer ud)
        lua_settop (L, err_idx - 1);
 }
 
-gint
-lua_do_resume (lua_State *L, gint narg)
-{
-#if LUA_VERSION_NUM < 503
-       return lua_resume (L, narg);
-#else
-       return lua_resume (L, NULL, narg);
-#endif
-}
+static void lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret);
 
-static void
-lua_metric_symbol_callback_return (struct rspamd_task *task, struct thread_entry *thread_entry, gpointer ud, gint ret);
+static void lua_metric_symbol_callback_error (struct thread_entry *thread_entry, int ret, const char *msg);
 
 static void
 lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
@@ -1211,7 +1202,6 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
        struct lua_callback_data *cd = ud;
        struct rspamd_task **ptask;
        struct thread_entry *thread_entry;
-       gint ret;
 
        thread_entry = lua_thread_pool_get (task->cfg->lua_thread_pool);
 
@@ -1232,16 +1222,11 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
        rspamd_lua_setclass (thread, "rspamd{task}", -1);
        *ptask = task;
 
-       ret = lua_do_resume (thread, 1);
+       thread_entry->finish_callback = lua_metric_symbol_callback_return;
+       thread_entry->error_callback = lua_metric_symbol_callback_error;
+       thread_entry->task = task;
 
-       if (ret != LUA_YIELD) {
-               /*
-                LUA_YIELD state should not be handled here.
-                It should only happen when the thread initiated a asynchronous event and it will be restored as soon
-                the event is finished
-                */
-               lua_metric_symbol_callback_return (task, thread_entry, ud, ret);
-       }
+       lua_thread_call (task->cfg->lua_thread_pool, thread_entry, 1);
 }
 
 gint
@@ -1252,132 +1237,105 @@ lua_yield_thread (struct thread_entry *thread_entry, gint nresults)
        return lua_yield (thread_entry->lua_state, nresults);
 }
 
-void
-lua_resume_thread (struct rspamd_task *task, struct thread_entry *thread_entry, gint narg)
+static void
+lua_metric_symbol_callback_error (struct thread_entry *thread_entry, int ret, const char *msg)
 {
-    g_assert (thread_entry->cd != NULL);
-
-    /*
-     * The only state where we can resume from is LUA_YIELD
-     * Another acceptable status is OK (0) but in that case we should push function on stack
-     * to start the thread from, which is happening in lua_metric_symbol_callback(), not in this function.
-     */
-    g_assert (lua_status (thread_entry->lua_state) == LUA_YIELD);
-
-       gint ret;
-
-       lua_thread_pool_set_running_entry (task->cfg->lua_thread_pool, thread_entry);
-       ret = lua_do_resume (thread_entry->lua_state, narg);
-
-       if (ret != LUA_YIELD) {
-               lua_metric_symbol_callback_return (task, thread_entry, thread_entry->cd, ret);
-       }
+       struct lua_callback_data *cd = thread_entry->cd;
+       struct rspamd_task *task = thread_entry->task;
+       msg_err_task ("call to (%s) failed (%d): %s", cd->symbol, ret, msg);
 }
 
 static void
-lua_metric_symbol_callback_return (struct rspamd_task *task, struct thread_entry *thread_entry, gpointer ud, gint ret)
+lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret)
 {
-       GString *tb;
-       struct lua_callback_data *cd = ud;
+       struct lua_callback_data *cd = thread_entry->cd;
+       struct rspamd_task *task = thread_entry->task;
        int nresults;
        struct rspamd_symbol_result *s;
-       lua_State *thread = thread_entry->lua_state;
 
-       if (ret != 0) {
+       (void)ret;
 
-               tb = rspamd_lua_get_traceback_string (thread);
-               msg_err_task ("call to (%s) failed (%d): %v", cd->symbol, ret, tb);
+       lua_State *L = thread_entry->lua_state;
 
-               if (tb) {
-                       g_string_free (tb, TRUE);
-               }
-               g_assert (lua_gettop (thread) >= cd->stack_level);
-               /* maybe there is a way to recover here. For now, just remove faulty thread */
-               lua_thread_pool_terminate_entry (task->cfg->lua_thread_pool, thread_entry);
-       }
-       else {
-               nresults = lua_gettop (thread) - cd->stack_level;
+       nresults = lua_gettop (L) - cd->stack_level;
 
-               if (nresults >= 1) {
-                       /* Function returned boolean, so maybe we need to insert result? */
-                       gint res = 0;
-                       gint i;
-                       gdouble flag = 1.0;
-                       gint type;
-                       struct lua_watcher_data *wd;
+       if (nresults >= 1) {
+               /* Function returned boolean, so maybe we need to insert result? */
+               gint res = 0;
+               gint i;
+               gdouble flag = 1.0;
+               gint type;
+               struct lua_watcher_data *wd;
 
-                       type = lua_type (thread, cd->stack_level + 1);
+               type = lua_type (L, cd->stack_level + 1);
 
-                       if (type == LUA_TBOOLEAN) {
-                               res = lua_toboolean (thread, cd->stack_level + 1);
-                       }
-                       else if (type == LUA_TFUNCTION) {
-                               /* Function returned a closure that should be watched for */
-                               wd = rspamd_mempool_alloc (task->task_pool, sizeof (*wd));
-                               lua_pushvalue (thread /*cd->L*/, cd->stack_level + 1);
-                               wd->cb_ref = luaL_ref (thread, LUA_REGISTRYINDEX);
-                               wd->cbd = cd;
-                               rspamd_session_watcher_push_callback (task->s,
-                                               rspamd_session_get_watcher (task->s),
-                                               lua_watcher_callback, wd);
-                               /*
-                                * We immediately pop watcher since we have not registered
-                                * any async events from here
-                                */
-                               rspamd_session_watcher_pop (task->s,
-                                               rspamd_session_get_watcher (task->s));
+               if (type == LUA_TBOOLEAN) {
+                       res = lua_toboolean (L, cd->stack_level + 1);
+               }
+               else if (type == LUA_TFUNCTION) {
+                       /* Function returned a closure that should be watched for */
+                       wd = rspamd_mempool_alloc (task->task_pool, sizeof (*wd));
+                       lua_pushvalue (L /*cd->L*/, cd->stack_level + 1);
+                       wd->cb_ref = luaL_ref (L, LUA_REGISTRYINDEX);
+                       wd->cbd = cd;
+                       rspamd_session_watcher_push_callback (task->s,
+                                       rspamd_session_get_watcher (task->s),
+                                       lua_watcher_callback, wd);
+                       /*
+                        * We immediately pop watcher since we have not registered
+                        * any async events from here
+                        */
+                       rspamd_session_watcher_pop (task->s,
+                                       rspamd_session_get_watcher (task->s));
+               }
+               else {
+                       res = lua_tonumber (L, cd->stack_level + 1);
+               }
+
+               if (res) {
+                       gint first_opt = 2;
+
+                       if (lua_type (L, cd->stack_level + 2) == LUA_TNUMBER) {
+                               flag = lua_tonumber (L, cd->stack_level + 2);
+                               /* Shift opt index */
+                               first_opt = 3;
                        }
                        else {
-                               res = lua_tonumber (thread, cd->stack_level + 1);
+                               flag = res;
                        }
 
-                       if (res) {
-                               gint first_opt = 2;
+                       s = rspamd_task_insert_result (task, cd->symbol, flag, NULL);
 
-                               if (lua_type (thread, cd->stack_level + 2) == LUA_TNUMBER) {
-                                       flag = lua_tonumber (thread, cd->stack_level + 2);
-                                       /* Shift opt index */
-                                       first_opt = 3;
-                               }
-                               else {
-                                       flag = res;
-                               }
+                       if (s) {
+                               guint last_pos = lua_gettop (L);
 
-                               s = rspamd_task_insert_result (task, cd->symbol, flag, NULL);
+                               for (i = cd->stack_level + first_opt; i <= last_pos; i++) {
+                                       if (lua_type (L, i) == LUA_TSTRING) {
+                                               const char *opt = lua_tostring (L, i);
 
-                               if (s) {
-                                       guint last_pos = lua_gettop (thread);
+                                               rspamd_task_add_result_option (task, s, opt);
+                                       }
+                                       else if (lua_type (L, i) == LUA_TTABLE) {
+                                               lua_pushvalue (L, i);
 
-                                       for (i = cd->stack_level + first_opt; i <= last_pos; i++) {
-                                               if (lua_type (thread, i) == LUA_TSTRING) {
-                                                       const char *opt = lua_tostring (thread, i);
+                                               for (lua_pushnil (L); lua_next (L, -2); lua_pop (L, 1)) {
+                                                       const char *opt = lua_tostring (L, -1);
 
                                                        rspamd_task_add_result_option (task, s, opt);
                                                }
-                                               else if (lua_type (thread, i) == LUA_TTABLE) {
-                                                       lua_pushvalue (thread, i);
 
-                                                       for (lua_pushnil (thread); lua_next (thread, -2); lua_pop (thread, 1)) {
-                                                               const char *opt = lua_tostring (thread, -1);
-
-                                                               rspamd_task_add_result_option (task, s, opt);
-                                                       }
-
-                                                       lua_pop (thread, 1);
-                                               }
+                                               lua_pop (L, 1);
                                        }
                                }
-
                        }
 
-                       lua_pop (thread, nresults);
                }
 
-               g_assert (lua_gettop (thread) == cd->stack_level); /* we properly cleaned up the stack */
-
-               lua_thread_pool_return (task->cfg->lua_thread_pool, thread_entry);
+               lua_pop (L, nresults);
        }
 
+       g_assert (lua_gettop (L) == cd->stack_level); /* we properly cleaned up the stack */
+
        cd->stack_level = 0;
 }
 
index 0d12a137cd131bf38a5cf01a27fb8d24d743cd63..805921a03e4855d6d434919f8ba2884975ad6eaa 100644 (file)
@@ -147,7 +147,7 @@ lua_dns_callback (struct rdns_reply *reply, void *arg)
                lua_pushvalue (L, -3);
        }
 
-       lua_resume_thread (cbdata->task, cbdata->thread, 2);
+       lua_resume_thread (cbdata->thread, 2);
 
        if (cbdata->s) {
                rspamd_session_watcher_pop (cbdata->s, cbdata->w);
index 979b31f6b937c6f625375c9cc810fc05d45c7eb2..3525879ddcdb41680abc58dd97208ba83e9a8d3f 100644 (file)
@@ -91,6 +91,10 @@ lua_thread_pool_return (struct lua_thread_pool *pool, struct thread_entry *threa
 
        if (g_queue_get_length (pool->available_items) <= pool->max_items) {
                thread_entry->cd = NULL;
+               thread_entry->finish_callback = NULL;
+               thread_entry->task = NULL;
+               thread_entry->cfg = NULL;
+
                g_queue_push_head (pool->available_items, thread_entry);
        }
        else {
@@ -146,3 +150,88 @@ lua_thread_pool_restore_callback (struct lua_callback_state *cbs)
        lua_thread_pool_return (cbs->thread_pool, cbs->my_thread);
        lua_thread_pool_set_running_entry (cbs->thread_pool, cbs->previous_thread);
 }
+
+static gint
+lua_do_resume (lua_State *L, gint narg)
+{
+#if LUA_VERSION_NUM < 503
+       return lua_resume (L, narg);
+#else
+       return lua_resume (L, NULL, narg);
+#endif
+}
+
+static void lua_resume_thread_internal (struct thread_entry *thread_entry, gint narg);
+
+void
+lua_thread_call (struct lua_thread_pool *pool, struct thread_entry *thread_entry, int narg)
+{
+       g_assert (lua_status (thread_entry->lua_state) == 0); /* we can't call running/yielded thread */
+       g_assert (thread_entry->task != NULL || thread_entry->cfg != NULL); /* we can't call running/yielded thread */
+
+       lua_resume_thread_internal (thread_entry, narg);
+}
+
+void
+lua_resume_thread (struct thread_entry *thread_entry, gint narg)
+{
+       /*
+        * The only state where we can resume from is LUA_YIELD
+        * Another acceptable status is OK (0) but in that case we should push function on stack
+        * to start the thread from, which is happening in lua_metric_symbol_callback(), not in this function.
+        */
+       g_assert (lua_status (thread_entry->lua_state) == LUA_YIELD);
+
+       lua_resume_thread_internal (thread_entry, narg);
+}
+
+static void
+lua_resume_thread_internal (struct thread_entry *thread_entry, gint narg)
+{
+       gint ret;
+       struct lua_thread_pool *pool;
+       GString *tb;
+       struct rspamd_task *task;
+
+       ret = lua_do_resume (thread_entry->lua_state, narg);
+
+       if (ret != LUA_YIELD) {
+               /*
+                LUA_YIELD state should not be handled here.
+                It should only happen when the thread initiated a asynchronous event and it will be restored as soon
+                the event is finished
+                */
+
+               if (thread_entry->task) {
+                       pool = thread_entry->task->cfg->lua_thread_pool;
+               }
+               else {
+                       pool = thread_entry->cfg->lua_thread_pool;
+               }
+               if (ret == 0) {
+                       if (thread_entry->finish_callback) {
+                               thread_entry->finish_callback (thread_entry, ret);
+                       }
+                       lua_thread_pool_return (pool, thread_entry);
+               }
+               else {
+                       tb = rspamd_lua_get_traceback_string (thread_entry->lua_state);
+                       if (thread_entry->error_callback) {
+                               thread_entry->error_callback (thread_entry, ret, tb->str);
+                       }
+                       else if (thread_entry->task) {
+                               task = thread_entry->task;
+                               msg_err_task ("lua call failed (%d): %v", ret, tb);
+                       }
+                       else {
+                               msg_err ("lua call failed (%d): %v", ret, tb);
+                       }
+
+                       if (tb) {
+                               g_string_free (tb, TRUE);
+                       }
+                       /* maybe there is a way to recover here. For now, just remove faulty thread */
+                       lua_thread_pool_terminate_entry (pool, thread_entry);
+               }
+       }
+}
\ No newline at end of file
index b72b72e8d43cc92572ef79aed6605e6f0e3db16c..e5b2f287333bdbed30524e0d9dbb52609af3f9f4 100644 (file)
@@ -3,13 +3,25 @@
 
 #include <lua.h>
 
+struct thread_entry;
+struct lua_thread_pool;
+
+typedef void (*lua_thread_finish_t) (struct thread_entry *thread, int ret);
+typedef void (*lua_thread_error_t) (struct thread_entry *thread, int ret, const char *msg);
+
 struct thread_entry {
        lua_State *lua_state;
        gint thread_index;
        gpointer cd;
-};
 
-struct thread_pool;
+       /* function to handle result of called method, can be NULL */
+       lua_thread_finish_t finish_callback;
+
+       /* function to log result, i.e. if you want to modify error logging message or somehow process this state, can be NUL */
+       lua_thread_error_t error_callback;
+       struct rspamd_task *task;
+       struct rspamd_config *cfg;
+};
 
 struct lua_callback_state {
        lua_State *L;
@@ -102,5 +114,9 @@ lua_thread_pool_prepare_callback (struct lua_thread_pool *pool, struct lua_callb
 void
 lua_thread_pool_restore_callback (struct lua_callback_state *cbs);
 
+
+void
+lua_thread_call (struct lua_thread_pool *pool, struct thread_entry *thread_entry, int narg);
+
 #endif /* LUA_THREAD_POOL_H_ */