Browse Source

[Minor] call periodic, finish and on_load via coroutine

tags/1.8.0
Mikhail Galanin 5 years ago
parent
commit
3c0b613742
4 changed files with 77 additions and 64 deletions
  1. 23
    22
      src/lua/lua_common.c
  2. 2
    4
      src/lua/lua_common.h
  3. 51
    37
      src/lua/lua_config.c
  4. 1
    1
      src/worker.c

+ 23
- 22
src/lua/lua_common.c View File

@@ -20,6 +20,7 @@
#include "worker_util.h"
#include "ottery.h"
#include "rspamd_control.h"
#include "lua_thread_pool.h"
#include <math.h>
#include <sys/wait.h>
#include <src/libserver/rspamd_control.h>
@@ -1641,7 +1642,9 @@ lua_check_ev_base (lua_State * L, gint pos)
return ud ? *((struct event_base **)ud) : NULL;
}

gboolean
static void rspamd_lua_run_postloads_error (struct thread_entry *thread, int ret, const char *msg);

void
rspamd_lua_run_postloads (lua_State *L, struct rspamd_config *cfg,
struct event_base *ev_base, struct rspamd_worker *w)
{
@@ -1649,42 +1652,40 @@ rspamd_lua_run_postloads (lua_State *L, struct rspamd_config *cfg,
struct rspamd_config **pcfg;
struct event_base **pev_base;
struct rspamd_worker **pw;
gint err_idx;
GString *tb;

/* Execute post load scripts */
LL_FOREACH (cfg->on_load, sc) {
lua_pushcfunction (L, &rspamd_lua_traceback);
err_idx = lua_gettop (L);
struct thread_entry *thread = lua_thread_pool_get_for_config (cfg);
thread->error_callback = rspamd_lua_run_postloads_error;
L = thread->lua_state;

lua_rawgeti (cfg->lua_state, LUA_REGISTRYINDEX, sc->cbref);
pcfg = lua_newuserdata (cfg->lua_state, sizeof (*pcfg));
lua_rawgeti (L, LUA_REGISTRYINDEX, sc->cbref);
pcfg = lua_newuserdata (L, sizeof (*pcfg));
*pcfg = cfg;
rspamd_lua_setclass (cfg->lua_state, "rspamd{config}", -1);
rspamd_lua_setclass (L, "rspamd{config}", -1);

pev_base = lua_newuserdata (cfg->lua_state, sizeof (*pev_base));
pev_base = lua_newuserdata (L, sizeof (*pev_base));
*pev_base = ev_base;
rspamd_lua_setclass (cfg->lua_state, "rspamd{ev_base}", -1);
rspamd_lua_setclass (L, "rspamd{ev_base}", -1);

pw = lua_newuserdata (cfg->lua_state, sizeof (*pw));
pw = lua_newuserdata (L, sizeof (*pw));
*pw = w;
rspamd_lua_setclass (cfg->lua_state, "rspamd{worker}", -1);
rspamd_lua_setclass (L, "rspamd{worker}", -1);

if (lua_pcall (cfg->lua_state, 3, 0, err_idx) != 0) {
tb = lua_touserdata (L, -1);
msg_err_config ("error executing post load code: %v", tb);
g_string_free (tb, TRUE);
lua_settop (L, err_idx - 1);
lua_thread_call (thread, 3);
}
}

return FALSE;
}

lua_settop (L, err_idx - 1);
}
static void
rspamd_lua_run_postloads_error (struct thread_entry *thread, int ret, const char *msg)
{
struct rspamd_config *cfg = thread->cfg;

return TRUE;
msg_err_config ("error executing post load code: %s", msg);
}


static struct rspamd_worker *
lua_check_worker (lua_State *L, gint pos)
{

+ 2
- 4
src/lua/lua_common.h View File

@@ -381,12 +381,10 @@ void *rspamd_lua_check_udata_maybe (lua_State *L, gint pos, const gchar *classna

/**
* Call finishing script with the specified task
* @param L
* @param sc
* @param task
*/
void lua_call_finish_script (lua_State *L, struct
rspamd_config_post_load_script *sc,
void lua_call_finish_script (struct rspamd_config_post_load_script *sc,
struct rspamd_task *task);

/**
@@ -395,7 +393,7 @@ void lua_call_finish_script (lua_State *L, struct
* @param cfg
* @param ev_base
*/
gboolean rspamd_lua_run_postloads (lua_State *L, struct rspamd_config *cfg,
void rspamd_lua_run_postloads (lua_State *L, struct rspamd_config *cfg,
struct event_base *ev_base, struct rspamd_worker *w);

/**

+ 51
- 37
src/lua/lua_config.c View File

@@ -2810,6 +2810,9 @@ lua_config_add_on_load (lua_State *L)
return 0;
}

static void lua_periodic_callback_finish (struct thread_entry *thread, int ret);
static void lua_periodic_callback_error (struct thread_entry *thread, int ret, const char *msg);

struct rspamd_lua_periodic {
struct event_base *ev_base;
struct rspamd_config *cfg;
@@ -2823,19 +2826,18 @@ struct rspamd_lua_periodic {
static void
lua_periodic_callback (gint unused_fd, short what, gpointer ud)
{
gdouble timeout;
struct timeval tv;
struct rspamd_lua_periodic *periodic = ud;
struct rspamd_config **pcfg, *cfg;
struct event_base **pev_base;
struct thread_entry *thread;
lua_State *L;
gint err_idx;
gboolean plan_more = FALSE;
GString *tb;

L = periodic->L;
lua_pushcfunction (L, &rspamd_lua_traceback);
err_idx = lua_gettop (L);
thread = lua_thread_pool_get_for_config (periodic->cfg);
thread->cd = periodic;
thread->finish_callback = lua_periodic_callback_finish;
thread->error_callback = lua_periodic_callback_error;

L = thread->lua_state;

lua_rawgeti (L, LUA_REGISTRYINDEX, periodic->cbref);
pcfg = lua_newuserdata (L, sizeof (*pcfg));
@@ -2846,13 +2848,23 @@ lua_periodic_callback (gint unused_fd, short what, gpointer ud)
rspamd_lua_setclass (L, "rspamd{ev_base}", -1);
*pev_base = periodic->ev_base;

if (lua_pcall (L, 2, 1, err_idx) != 0) {
tb = lua_touserdata (L, -1);
msg_err_config ("call to finishing script failed: %v", tb);
g_string_free (tb, TRUE);
lua_pop (L, 2);
}
else {
event_del (&periodic->ev);

lua_thread_call (thread, 2);
}

static void
lua_periodic_callback_finish (struct thread_entry *thread, int ret)
{
lua_State *L;
struct rspamd_lua_periodic *periodic = thread->cd;
gboolean plan_more = FALSE;
struct timeval tv;
gdouble timeout = 0.0;

L = thread->lua_state;

if (ret == 0) {
if (lua_type (L, -1) == LUA_TBOOLEAN) {
plan_more = lua_toboolean (L, -1);
timeout = periodic->timeout;
@@ -2862,11 +2874,8 @@ lua_periodic_callback (gint unused_fd, short what, gpointer ud)
plan_more = timeout > 0 ? TRUE : FALSE;
}

lua_pop (L, 2); /* Return value + error function */
lua_pop (L, 1); /* Return value + error function */
}

event_del (&periodic->ev);

if (plan_more) {
if (periodic->need_jitter) {
timeout = rspamd_time_jitter (timeout, 0.0);
@@ -2881,6 +2890,19 @@ lua_periodic_callback (gint unused_fd, short what, gpointer ud)
}
}

static void
lua_periodic_callback_error (struct thread_entry *thread, int ret, const char *msg)
{
struct rspamd_config *cfg;
struct rspamd_lua_periodic *periodic = thread->cd;
cfg = periodic->cfg;

msg_err_config ("call to finishing script failed: %s", msg);

lua_periodic_callback_finish(thread, ret);
}


static gint
lua_config_add_periodic (lua_State *L)
{
@@ -3634,30 +3656,22 @@ luaopen_config (lua_State * L)
}

void
lua_call_finish_script (lua_State *L, struct rspamd_config_post_load_script *sc,
struct rspamd_task *task)
{
lua_call_finish_script (struct rspamd_config_post_load_script *sc,
struct rspamd_task *task) {
struct rspamd_task **ptask;
gint err_idx;
GString *tb;
struct thread_entry *thread;

lua_pushcfunction (L, &rspamd_lua_traceback);
err_idx = lua_gettop (L);
thread = lua_thread_pool_get_for_task (task);
thread->task = task;

lua_State *L = thread->lua_state;

lua_rawgeti (L, LUA_REGISTRYINDEX, sc->cbref);

ptask = lua_newuserdata (L, sizeof (struct rspamd_task *));
rspamd_lua_setclass (L, "rspamd{task}", -1);
rspamd_lua_setclass (L, "rspamd{task}", - 1);
*ptask = task;

if (lua_pcall (L, 1, 0, err_idx) != 0) {
tb = lua_touserdata (L, -1);
msg_err_task ("call to finishing script failed: %v", tb);
g_string_free (tb, TRUE);
lua_pop (L, 1);
}

lua_pop (L, 1); /* Error function */

return;
lua_thread_call (thread, 1);
}

+ 1
- 1
src/worker.c View File

@@ -108,7 +108,7 @@ rspamd_worker_call_finish_handlers (struct rspamd_worker *worker)
task);

DL_FOREACH (cfg->finish_callbacks, sc) {
lua_call_finish_script (cfg->lua_state, sc, task);
lua_call_finish_script (sc, task);
}

task->flags &= ~RSPAMD_TASK_FLAG_PROCESSING;

Loading…
Cancel
Save