|
|
@@ -1,8 +1,32 @@ |
|
|
|
/*- |
|
|
|
* Copyright 2018 Mikhail Galanin |
|
|
|
* Copyright 2019 Vsevolod Stakhov |
|
|
|
* |
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
* You may obtain a copy of the License at |
|
|
|
* |
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0 |
|
|
|
* |
|
|
|
* Unless required by applicable law or agreed to in writing, software |
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS, |
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
|
|
* See the License for the specific language governing permissions and |
|
|
|
* limitations under the License. |
|
|
|
*/ |
|
|
|
|
|
|
|
#include "config.h" |
|
|
|
|
|
|
|
#include "lua_common.h" |
|
|
|
#include "lua_thread_pool.h" |
|
|
|
|
|
|
|
#define msg_debug_lua_threads(...) rspamd_conditional_debug_fast (NULL, NULL, \ |
|
|
|
rspamd_lua_threads_log_id, "lua_threads", NULL, \ |
|
|
|
G_STRFUNC, \ |
|
|
|
__VA_ARGS__) |
|
|
|
|
|
|
|
INIT_LOG_MODULE(lua_threads) |
|
|
|
|
|
|
|
struct lua_thread_pool { |
|
|
|
GQueue *available_items; |
|
|
|
lua_State *L; |
|
|
@@ -60,8 +84,6 @@ lua_thread_pool_free (struct lua_thread_pool *pool) |
|
|
|
g_free (pool); |
|
|
|
} |
|
|
|
|
|
|
|
struct thread_entry *lua_thread_pool_get_for_config (struct rspamd_config *cfg); |
|
|
|
|
|
|
|
static struct thread_entry *lua_thread_pool_get (struct lua_thread_pool *pool); |
|
|
|
|
|
|
|
struct thread_entry * |
|
|
@@ -104,9 +126,11 @@ lua_thread_pool_get (struct lua_thread_pool *pool) |
|
|
|
} |
|
|
|
|
|
|
|
void |
|
|
|
lua_thread_pool_return (struct lua_thread_pool *pool, struct thread_entry *thread_entry) |
|
|
|
lua_thread_pool_return_full (struct lua_thread_pool *pool, |
|
|
|
struct thread_entry *thread_entry, const gchar *loc) |
|
|
|
{ |
|
|
|
g_assert (lua_status (thread_entry->lua_state) == 0); /* we can't return a running/yielded thread into the pool */ |
|
|
|
/* we can't return a running/yielded thread into the pool */ |
|
|
|
g_assert (lua_status (thread_entry->lua_state) == 0); |
|
|
|
|
|
|
|
if (pool->running_entry == thread_entry) { |
|
|
|
pool->running_entry = NULL; |
|
|
@@ -119,15 +143,23 @@ lua_thread_pool_return (struct lua_thread_pool *pool, struct thread_entry *threa |
|
|
|
thread_entry->task = NULL; |
|
|
|
thread_entry->cfg = NULL; |
|
|
|
|
|
|
|
msg_debug_lua_threads ("%s: returned thread to the threads pool %ud items", |
|
|
|
loc, |
|
|
|
g_queue_get_length (pool->available_items)); |
|
|
|
|
|
|
|
g_queue_push_head (pool->available_items, thread_entry); |
|
|
|
} |
|
|
|
else { |
|
|
|
msg_debug_lua_threads ("%s: removed thread as thread pool has %ud items", |
|
|
|
loc, |
|
|
|
g_queue_get_length (pool->available_items)); |
|
|
|
thread_entry_free (pool->L, thread_entry); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
|
lua_thread_pool_terminate_entry (struct lua_thread_pool *pool, struct thread_entry *thread_entry) |
|
|
|
lua_thread_pool_terminate_entry (struct lua_thread_pool *pool, |
|
|
|
struct thread_entry *thread_entry, const gchar *loc) |
|
|
|
{ |
|
|
|
struct thread_entry *ent = NULL; |
|
|
|
|
|
|
@@ -138,6 +170,7 @@ lua_thread_pool_terminate_entry (struct lua_thread_pool *pool, struct thread_ent |
|
|
|
pool->running_entry = NULL; |
|
|
|
} |
|
|
|
|
|
|
|
msg_debug_lua_threads ("%s: terminated thread entry", loc); |
|
|
|
thread_entry_free (pool->L, thread_entry); |
|
|
|
|
|
|
|
if (g_queue_get_length (pool->available_items) <= pool->max_items) { |
|
|
@@ -147,19 +180,25 @@ lua_thread_pool_terminate_entry (struct lua_thread_pool *pool, struct thread_ent |
|
|
|
} |
|
|
|
|
|
|
|
struct thread_entry * |
|
|
|
lua_thread_pool_get_running_entry (struct lua_thread_pool *pool) |
|
|
|
lua_thread_pool_get_running_entry_full (struct lua_thread_pool *pool, |
|
|
|
const gchar *loc) |
|
|
|
{ |
|
|
|
msg_debug_lua_threads ("%s: lua_thread_pool_get_running_entry_full", loc); |
|
|
|
return pool->running_entry; |
|
|
|
} |
|
|
|
|
|
|
|
void |
|
|
|
lua_thread_pool_set_running_entry (struct lua_thread_pool *pool, struct thread_entry *thread_entry) |
|
|
|
lua_thread_pool_set_running_entry_full (struct lua_thread_pool *pool, |
|
|
|
struct thread_entry *thread_entry, |
|
|
|
const gchar *loc) |
|
|
|
{ |
|
|
|
msg_debug_lua_threads ("%s: lua_thread_pool_set_running_entry_full", loc); |
|
|
|
pool->running_entry = thread_entry; |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
|
lua_thread_pool_set_running_entry_for_thread (struct thread_entry *thread_entry) |
|
|
|
lua_thread_pool_set_running_entry_for_thread (struct thread_entry *thread_entry, |
|
|
|
const gchar *loc) |
|
|
|
{ |
|
|
|
struct lua_thread_pool *pool; |
|
|
|
|
|
|
@@ -170,28 +209,33 @@ lua_thread_pool_set_running_entry_for_thread (struct thread_entry *thread_entry) |
|
|
|
pool = thread_entry->cfg->lua_thread_pool; |
|
|
|
} |
|
|
|
|
|
|
|
lua_thread_pool_set_running_entry (pool, thread_entry); |
|
|
|
lua_thread_pool_set_running_entry_full (pool, thread_entry, loc); |
|
|
|
} |
|
|
|
|
|
|
|
void |
|
|
|
lua_thread_pool_prepare_callback (struct lua_thread_pool *pool, struct lua_callback_state *cbs) |
|
|
|
lua_thread_pool_prepare_callback_full (struct lua_thread_pool *pool, |
|
|
|
struct lua_callback_state *cbs, const gchar *loc) |
|
|
|
{ |
|
|
|
msg_debug_lua_threads ("%s: lua_thread_pool_prepare_callback_full", loc); |
|
|
|
cbs->thread_pool = pool; |
|
|
|
cbs->previous_thread = lua_thread_pool_get_running_entry (pool); |
|
|
|
cbs->previous_thread = lua_thread_pool_get_running_entry_full (pool, loc); |
|
|
|
cbs->my_thread = lua_thread_pool_get (pool); |
|
|
|
cbs->L = cbs->my_thread->lua_state; |
|
|
|
} |
|
|
|
|
|
|
|
void |
|
|
|
lua_thread_pool_restore_callback (struct lua_callback_state *cbs) |
|
|
|
lua_thread_pool_restore_callback_full (struct lua_callback_state *cbs, |
|
|
|
const gchar *loc) |
|
|
|
{ |
|
|
|
lua_thread_pool_return (cbs->thread_pool, cbs->my_thread); |
|
|
|
lua_thread_pool_set_running_entry (cbs->thread_pool, cbs->previous_thread); |
|
|
|
lua_thread_pool_return_full (cbs->thread_pool, cbs->my_thread, loc); |
|
|
|
lua_thread_pool_set_running_entry_full (cbs->thread_pool, |
|
|
|
cbs->previous_thread, loc); |
|
|
|
} |
|
|
|
|
|
|
|
static gint |
|
|
|
lua_do_resume (lua_State *L, gint narg) |
|
|
|
lua_do_resume_full (lua_State *L, gint narg, const gchar *loc) |
|
|
|
{ |
|
|
|
msg_debug_lua_threads ("%s: lua_do_resume_full", loc); |
|
|
|
#if LUA_VERSION_NUM < 503 |
|
|
|
return lua_resume (L, narg); |
|
|
|
#else |
|
|
@@ -199,19 +243,22 @@ lua_do_resume (lua_State *L, gint narg) |
|
|
|
#endif |
|
|
|
} |
|
|
|
|
|
|
|
static void lua_resume_thread_internal (struct thread_entry *thread_entry, gint narg); |
|
|
|
static void lua_resume_thread_internal_full (struct thread_entry *thread_entry, |
|
|
|
gint narg, const gchar *loc); |
|
|
|
|
|
|
|
void |
|
|
|
lua_thread_call (struct thread_entry *thread_entry, int narg) |
|
|
|
lua_thread_call_full (struct thread_entry *thread_entry, |
|
|
|
int narg, const gchar *loc) |
|
|
|
{ |
|
|
|
g_assert (lua_status (thread_entry->lua_state) == 0); /* we can't call running/yielded thread */ |
|
|
|
g_assert (thread_entry->task != NULL || thread_entry->cfg != NULL); /* we can't call without pool */ |
|
|
|
|
|
|
|
lua_resume_thread_internal (thread_entry, narg); |
|
|
|
lua_resume_thread_internal_full (thread_entry, narg, loc); |
|
|
|
} |
|
|
|
|
|
|
|
void |
|
|
|
lua_thread_resume (struct thread_entry *thread_entry, gint narg) |
|
|
|
lua_thread_resume_full (struct thread_entry *thread_entry, gint narg, |
|
|
|
const gchar *loc) |
|
|
|
{ |
|
|
|
/* |
|
|
|
* The only state where we can resume from is LUA_YIELD |
|
|
@@ -219,21 +266,23 @@ lua_thread_resume (struct thread_entry *thread_entry, gint narg) |
|
|
|
* to start the thread from, which is happening in lua_thread_call(), not in this function. |
|
|
|
*/ |
|
|
|
g_assert (lua_status (thread_entry->lua_state) == LUA_YIELD); |
|
|
|
msg_debug_lua_threads ("%s: lua_thread_resume_full", loc); |
|
|
|
lua_thread_pool_set_running_entry_for_thread (thread_entry, loc); |
|
|
|
|
|
|
|
lua_thread_pool_set_running_entry_for_thread(thread_entry); |
|
|
|
|
|
|
|
lua_resume_thread_internal (thread_entry, narg); |
|
|
|
lua_resume_thread_internal_full (thread_entry, narg, loc); |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
|
lua_resume_thread_internal (struct thread_entry *thread_entry, gint narg) |
|
|
|
lua_resume_thread_internal_full (struct thread_entry *thread_entry, |
|
|
|
gint narg, const gchar *loc) |
|
|
|
{ |
|
|
|
gint ret; |
|
|
|
struct lua_thread_pool *pool; |
|
|
|
GString *tb; |
|
|
|
struct rspamd_task *task; |
|
|
|
|
|
|
|
ret = lua_do_resume (thread_entry->lua_state, narg); |
|
|
|
msg_debug_lua_threads ("%s: lua_resume_thread_internal_full", loc); |
|
|
|
ret = lua_do_resume_full (thread_entry->lua_state, narg, loc); |
|
|
|
|
|
|
|
if (ret != LUA_YIELD) { |
|
|
|
/* |
|
|
@@ -252,7 +301,7 @@ lua_resume_thread_internal (struct thread_entry *thread_entry, gint narg) |
|
|
|
if (thread_entry->finish_callback) { |
|
|
|
thread_entry->finish_callback (thread_entry, ret); |
|
|
|
} |
|
|
|
lua_thread_pool_return (pool, thread_entry); |
|
|
|
lua_thread_pool_return_full (pool, thread_entry, loc); |
|
|
|
} |
|
|
|
else { |
|
|
|
tb = rspamd_lua_get_traceback_string (thread_entry->lua_state); |
|
|
@@ -270,16 +319,22 @@ lua_resume_thread_internal (struct thread_entry *thread_entry, gint narg) |
|
|
|
if (tb) { |
|
|
|
g_string_free (tb, TRUE); |
|
|
|
} |
|
|
|
/* maybe there is a way to recover here. For now, just remove faulty thread */ |
|
|
|
lua_thread_pool_terminate_entry (pool, thread_entry); |
|
|
|
/* |
|
|
|
* Maybe there is a way to recover here. |
|
|
|
* For now, just remove faulty thread |
|
|
|
*/ |
|
|
|
lua_thread_pool_terminate_entry (pool, thread_entry, loc); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
gint |
|
|
|
lua_thread_yield (struct thread_entry *thread_entry, gint nresults) |
|
|
|
lua_thread_yield_full (struct thread_entry *thread_entry, |
|
|
|
gint nresults, |
|
|
|
const gchar *loc) |
|
|
|
{ |
|
|
|
g_assert (lua_status (thread_entry->lua_state) == 0); |
|
|
|
|
|
|
|
msg_debug_lua_threads ("%s: lua_thread_yield_full", loc); |
|
|
|
return lua_yield (thread_entry->lua_state, nresults); |
|
|
|
} |