You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

lua_thread_pool.c 8.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. /*-
  2. * Copyright 2018 Mikhail Galanin
  3. * Copyright 2019 Vsevolod Stakhov
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. #include "config.h"
  18. #include "lua_common.h"
  19. #include "lua_thread_pool.h"
  20. #define msg_debug_lua_threads(...) rspamd_conditional_debug_fast (NULL, NULL, \
  21. rspamd_lua_threads_log_id, "lua_threads", NULL, \
  22. G_STRFUNC, \
  23. __VA_ARGS__)
  24. INIT_LOG_MODULE(lua_threads)
  25. struct lua_thread_pool {
  26. GQueue *available_items;
  27. lua_State *L;
  28. gint max_items;
  29. struct thread_entry *running_entry;
  30. };
  31. static struct thread_entry *
  32. thread_entry_new (lua_State * L)
  33. {
  34. struct thread_entry *ent;
  35. ent = g_new0(struct thread_entry, 1);
  36. ent->lua_state = lua_newthread (L);
  37. ent->thread_index = luaL_ref (L, LUA_REGISTRYINDEX);
  38. return ent;
  39. }
  40. static void
  41. thread_entry_free (lua_State * L, struct thread_entry *ent)
  42. {
  43. luaL_unref (L, LUA_REGISTRYINDEX, ent->thread_index);
  44. g_free (ent);
  45. }
  46. struct lua_thread_pool *
  47. lua_thread_pool_new (lua_State * L)
  48. {
  49. struct lua_thread_pool * pool = g_new0 (struct lua_thread_pool, 1);
  50. pool->L = L;
  51. pool->max_items = 100;
  52. pool->available_items = g_queue_new ();
  53. int i;
  54. struct thread_entry *ent;
  55. for (i = 0; i < MAX(2, pool->max_items / 10); i ++) {
  56. ent = thread_entry_new (pool->L);
  57. g_queue_push_head (pool->available_items, ent);
  58. }
  59. return pool;
  60. }
  61. void
  62. lua_thread_pool_free (struct lua_thread_pool *pool)
  63. {
  64. struct thread_entry *ent = NULL;
  65. while (!g_queue_is_empty (pool->available_items)) {
  66. ent = g_queue_pop_head (pool->available_items);
  67. thread_entry_free (pool->L, ent);
  68. }
  69. g_queue_free (pool->available_items);
  70. g_free (pool);
  71. }
  72. static struct thread_entry *lua_thread_pool_get (struct lua_thread_pool *pool);
  73. struct thread_entry *
  74. lua_thread_pool_get_for_task (struct rspamd_task *task)
  75. {
  76. struct thread_entry *ent = lua_thread_pool_get (task->cfg->lua_thread_pool);
  77. ent->task = task;
  78. return ent;
  79. }
  80. struct thread_entry *
  81. lua_thread_pool_get_for_config (struct rspamd_config *cfg)
  82. {
  83. struct thread_entry *ent = lua_thread_pool_get (cfg->lua_thread_pool);
  84. ent->cfg = cfg;
  85. return ent;
  86. }
  87. static struct thread_entry *
  88. lua_thread_pool_get (struct lua_thread_pool *pool)
  89. {
  90. gpointer cur;
  91. struct thread_entry *ent = NULL;
  92. cur = g_queue_pop_head (pool->available_items);
  93. if (cur) {
  94. ent = cur;
  95. }
  96. else {
  97. ent = thread_entry_new (pool->L);
  98. }
  99. pool->running_entry = ent;
  100. return ent;
  101. }
  102. void
  103. lua_thread_pool_return_full (struct lua_thread_pool *pool,
  104. struct thread_entry *thread_entry, const gchar *loc)
  105. {
  106. /* we can't return a running/yielded thread into the pool */
  107. g_assert (lua_status (thread_entry->lua_state) == 0);
  108. if (pool->running_entry == thread_entry) {
  109. pool->running_entry = NULL;
  110. }
  111. if (g_queue_get_length (pool->available_items) <= pool->max_items) {
  112. thread_entry->cd = NULL;
  113. thread_entry->finish_callback = NULL;
  114. thread_entry->error_callback = NULL;
  115. thread_entry->task = NULL;
  116. thread_entry->cfg = NULL;
  117. msg_debug_lua_threads ("%s: returned thread to the threads pool %ud items",
  118. loc,
  119. g_queue_get_length (pool->available_items));
  120. g_queue_push_head (pool->available_items, thread_entry);
  121. }
  122. else {
  123. msg_debug_lua_threads ("%s: removed thread as thread pool has %ud items",
  124. loc,
  125. g_queue_get_length (pool->available_items));
  126. thread_entry_free (pool->L, thread_entry);
  127. }
  128. }
  129. static void
  130. lua_thread_pool_terminate_entry (struct lua_thread_pool *pool,
  131. struct thread_entry *thread_entry, const gchar *loc)
  132. {
  133. struct thread_entry *ent = NULL;
  134. /* we should only terminate failed threads */
  135. g_assert (lua_status (thread_entry->lua_state) != 0 && lua_status (thread_entry->lua_state) != LUA_YIELD);
  136. if (pool->running_entry == thread_entry) {
  137. pool->running_entry = NULL;
  138. }
  139. msg_debug_lua_threads ("%s: terminated thread entry", loc);
  140. thread_entry_free (pool->L, thread_entry);
  141. if (g_queue_get_length (pool->available_items) <= pool->max_items) {
  142. ent = thread_entry_new (pool->L);
  143. g_queue_push_head (pool->available_items, ent);
  144. }
  145. }
  146. struct thread_entry *
  147. lua_thread_pool_get_running_entry_full (struct lua_thread_pool *pool,
  148. const gchar *loc)
  149. {
  150. msg_debug_lua_threads ("%s: lua_thread_pool_get_running_entry_full", loc);
  151. return pool->running_entry;
  152. }
  153. void
  154. lua_thread_pool_set_running_entry_full (struct lua_thread_pool *pool,
  155. struct thread_entry *thread_entry,
  156. const gchar *loc)
  157. {
  158. msg_debug_lua_threads ("%s: lua_thread_pool_set_running_entry_full", loc);
  159. pool->running_entry = thread_entry;
  160. }
  161. static void
  162. lua_thread_pool_set_running_entry_for_thread (struct thread_entry *thread_entry,
  163. const gchar *loc)
  164. {
  165. struct lua_thread_pool *pool;
  166. if (thread_entry->task) {
  167. pool = thread_entry->task->cfg->lua_thread_pool;
  168. }
  169. else {
  170. pool = thread_entry->cfg->lua_thread_pool;
  171. }
  172. lua_thread_pool_set_running_entry_full (pool, thread_entry, loc);
  173. }
  174. void
  175. lua_thread_pool_prepare_callback_full (struct lua_thread_pool *pool,
  176. struct lua_callback_state *cbs, const gchar *loc)
  177. {
  178. msg_debug_lua_threads ("%s: lua_thread_pool_prepare_callback_full", loc);
  179. cbs->thread_pool = pool;
  180. cbs->previous_thread = lua_thread_pool_get_running_entry_full (pool, loc);
  181. cbs->my_thread = lua_thread_pool_get (pool);
  182. cbs->L = cbs->my_thread->lua_state;
  183. }
  184. void
  185. lua_thread_pool_restore_callback_full (struct lua_callback_state *cbs,
  186. const gchar *loc)
  187. {
  188. lua_thread_pool_return_full (cbs->thread_pool, cbs->my_thread, loc);
  189. lua_thread_pool_set_running_entry_full (cbs->thread_pool,
  190. cbs->previous_thread, loc);
  191. }
  192. static gint
  193. lua_do_resume_full (lua_State *L, gint narg, const gchar *loc)
  194. {
  195. msg_debug_lua_threads ("%s: lua_do_resume_full", loc);
  196. #if LUA_VERSION_NUM < 503
  197. return lua_resume (L, narg);
  198. #else
  199. return lua_resume (L, NULL, narg);
  200. #endif
  201. }
  202. static void lua_resume_thread_internal_full (struct thread_entry *thread_entry,
  203. gint narg, const gchar *loc);
  204. void
  205. lua_thread_call_full (struct thread_entry *thread_entry,
  206. int narg, const gchar *loc)
  207. {
  208. g_assert (lua_status (thread_entry->lua_state) == 0); /* we can't call running/yielded thread */
  209. g_assert (thread_entry->task != NULL || thread_entry->cfg != NULL); /* we can't call without pool */
  210. lua_resume_thread_internal_full (thread_entry, narg, loc);
  211. }
  212. void
  213. lua_thread_resume_full (struct thread_entry *thread_entry, gint narg,
  214. const gchar *loc)
  215. {
  216. /*
  217. * The only state where we can resume from is LUA_YIELD
  218. * Another acceptable status is OK (0) but in that case we should push function on stack
  219. * to start the thread from, which is happening in lua_thread_call(), not in this function.
  220. */
  221. g_assert (lua_status (thread_entry->lua_state) == LUA_YIELD);
  222. msg_debug_lua_threads ("%s: lua_thread_resume_full", loc);
  223. lua_thread_pool_set_running_entry_for_thread (thread_entry, loc);
  224. lua_resume_thread_internal_full (thread_entry, narg, loc);
  225. }
  226. static void
  227. lua_resume_thread_internal_full (struct thread_entry *thread_entry,
  228. gint narg, const gchar *loc)
  229. {
  230. gint ret;
  231. struct lua_thread_pool *pool;
  232. GString *tb;
  233. struct rspamd_task *task;
  234. msg_debug_lua_threads ("%s: lua_resume_thread_internal_full", loc);
  235. ret = lua_do_resume_full (thread_entry->lua_state, narg, loc);
  236. if (ret != LUA_YIELD) {
  237. /*
  238. LUA_YIELD state should not be handled here.
  239. It should only happen when the thread initiated a asynchronous event and it will be restored as soon
  240. the event is finished
  241. */
  242. if (thread_entry->task) {
  243. pool = thread_entry->task->cfg->lua_thread_pool;
  244. }
  245. else {
  246. pool = thread_entry->cfg->lua_thread_pool;
  247. }
  248. if (ret == 0) {
  249. if (thread_entry->finish_callback) {
  250. thread_entry->finish_callback (thread_entry, ret);
  251. }
  252. lua_thread_pool_return_full (pool, thread_entry, loc);
  253. }
  254. else {
  255. tb = rspamd_lua_get_traceback_string (thread_entry->lua_state);
  256. if (tb && thread_entry->error_callback) {
  257. thread_entry->error_callback (thread_entry, ret, tb->str);
  258. }
  259. else if (thread_entry->task) {
  260. task = thread_entry->task;
  261. msg_err_task ("lua call failed (%d): %v", ret, tb);
  262. }
  263. else {
  264. msg_err ("lua call failed (%d): %v", ret, tb);
  265. }
  266. if (tb) {
  267. g_string_free (tb, TRUE);
  268. }
  269. /*
  270. * Maybe there is a way to recover here.
  271. * For now, just remove faulty thread
  272. */
  273. lua_thread_pool_terminate_entry (pool, thread_entry, loc);
  274. }
  275. }
  276. }
  277. gint
  278. lua_thread_yield_full (struct thread_entry *thread_entry,
  279. gint nresults,
  280. const gchar *loc)
  281. {
  282. g_assert (lua_status (thread_entry->lua_state) == 0);
  283. msg_debug_lua_threads ("%s: lua_thread_yield_full", loc);
  284. return lua_yield (thread_entry->lua_state, nresults);
  285. }