Kaynağa Gözat

[Feature] Implement finish scripts for worker processes

tags/1.4.0
Vsevolod Stakhov 7 yıl önce
ebeveyn
işleme
23f831e0c5
5 değiştirilmiş dosya ile 128 ekleme ve 4 silme
  1. 1
    0
      src/libserver/cfg_file.h
  2. 11
    0
      src/libserver/cfg_utils.c
  3. 10
    0
      src/lua/lua_common.h
  4. 60
    1
      src/lua/lua_config.c
  5. 46
    3
      src/worker.c

+ 1
- 0
src/libserver/cfg_file.h Dosyayı Görüntüle

@@ -400,6 +400,7 @@ struct rspamd_config {
struct worker_s **compiled_workers; /**< list of compiled C modules */
GList *dynamic_modules; /**< list of dynamic C modules */
GList *dynamic_workers; /**< list of dynamic C modules */
struct rspamd_config_post_load_script *finish_callbacks; /**< list of callbacks called on worker's termination */
struct rspamd_log_format *log_format; /**< parsed log format */
gchar *log_format_str; /**< raw log format string */


+ 11
- 0
src/libserver/cfg_utils.c Dosyayı Görüntüle

@@ -175,6 +175,7 @@ rspamd_config_free (struct rspamd_config *cfg)
{
struct rspamd_dynamic_module *dyn_mod;
struct rspamd_dynamic_worker *dyn_wrk;
struct rspamd_config_post_load_script *sc, *sctmp;
GList *cur;

rspamd_map_remove_all (cfg);
@@ -221,6 +222,16 @@ rspamd_config_free (struct rspamd_config *cfg)
cur = g_list_next (cur);
}

DL_FOREACH_SAFE (cfg->finish_callbacks, sc, sctmp) {
luaL_unref (cfg->lua_state, LUA_REGISTRYINDEX, sc->cbref);
g_slice_free1 (sizeof (*sc), sc);
}

DL_FOREACH_SAFE (cfg->on_load, sc, sctmp) {
luaL_unref (cfg->lua_state, LUA_REGISTRYINDEX, sc->cbref);
g_slice_free1 (sizeof (*sc), sc);
}

g_list_free (cfg->classifiers);
g_list_free (cfg->metrics_list);
rspamd_symbols_cache_destroy (cfg->cache);

+ 10
- 0
src/lua/lua_common.h Dosyayı Görüntüle

@@ -354,5 +354,15 @@ gsize lua_logger_out_type (lua_State *L, gint pos, gchar *outbuf,
*/
void *rspamd_lua_check_udata (lua_State *L, gint pos, const gchar *classname);

/**
* Call finishing script with the specified task
* @param L
* @param sc
* @param task
*/
void lua_call_finish_script (lua_State *L, struct
rspamd_config_post_load_script *sc,
struct rspamd_task *task);

#endif /* WITH_LUA */
#endif /* RSPAMD_LUA_H */

+ 60
- 1
src/lua/lua_config.c Dosyayı Görüntüle

@@ -442,6 +442,15 @@ LUA_FUNCTION_DEF (config, get_symbol_callback);
*/
LUA_FUNCTION_DEF (config, set_symbol_callback);

/***
* @method register_finish_script(callback)
* Adds new callback that is called on worker process termination when all
* tasks pending are processed
*
* @param callback {function} a fucntion with one argument (rspamd_task)
*/
LUA_FUNCTION_DEF (config, register_finish_script);

static const struct luaL_reg configlib_m[] = {
LUA_INTERFACE_DEF (config, get_module_opt),
LUA_INTERFACE_DEF (config, get_mempool),
@@ -474,6 +483,7 @@ static const struct luaL_reg configlib_m[] = {
LUA_INTERFACE_DEF (config, get_symbols_count),
LUA_INTERFACE_DEF (config, get_symbol_callback),
LUA_INTERFACE_DEF (config, set_symbol_callback),
LUA_INTERFACE_DEF (config, register_finish_script),
{"__tostring", rspamd_lua_class_tostring},
{"__newindex", lua_config_newindex},
{NULL, NULL}
@@ -1766,7 +1776,7 @@ lua_config_add_on_load (lua_State *L)
return luaL_error (L, "invalid arguments");
}

sc = rspamd_mempool_alloc0 (cfg->cfg_pool, sizeof (*sc));
sc = g_slice_alloc0 (sizeof (*sc));
lua_pushvalue (L, 2);
sc->cbref = luaL_ref (L, LUA_REGISTRYINDEX);
DL_APPEND (cfg->on_load, sc);
@@ -1860,6 +1870,26 @@ lua_config_set_symbol_callback (lua_State *L)
return 1;
}

static gint
lua_config_register_finish_script (lua_State *L)
{
struct rspamd_config *cfg = lua_check_config (L, 1);
struct rspamd_config_post_load_script *sc;

if (cfg != NULL && lua_type (L, 2) == LUA_TFUNCTION) {
sc = g_slice_alloc0 (sizeof (*sc));
lua_pushvalue (L, 2);
sc->cbref = luaL_ref (L, LUA_REGISTRYINDEX);
DL_APPEND (cfg->finish_callbacks, sc);
}
else {
return luaL_error (L, "invalid arguments");
}

return 0;
}


void
luaopen_config (lua_State * L)
{
@@ -1867,3 +1897,32 @@ luaopen_config (lua_State * L)

lua_pop (L, 1);
}

void
lua_call_finish_script (lua_State *L, struct rspamd_config_post_load_script *sc,
struct rspamd_task *task)
{
struct rspamd_task **ptask;
gint err_idx;
GString *tb;

lua_pushcfunction (L, &rspamd_lua_traceback);
err_idx = lua_gettop (L);

lua_rawgeti (L, LUA_REGISTRYINDEX, sc->cbref);

ptask = lua_newuserdata (L, sizeof (struct rspamd_task *));
rspamd_lua_setclass (L, "rspamd{task}", -1);
*ptask = task;

if (lua_pcall (L, 1, 0, err_idx) != 0) {
tb = lua_touserdata (L, -1);
msg_err_task ("call to finishing script failed: %v", tb);
g_string_free (tb, TRUE);
lua_pop (L, 1);
}

lua_pop (L, 1); /* Error function */

return;
}

+ 46
- 3
src/worker.c Dosyayı Görüntüle

@@ -72,15 +72,47 @@ worker_t normal_worker = {
G_STRFUNC, \
__VA_ARGS__)

static void
rspamd_worker_call_finish_handlers (struct rspamd_worker *worker)
{
struct rspamd_task *task;
struct rspamd_config *cfg = worker->srv->cfg;
struct rspamd_worker_ctx *ctx;
struct rspamd_config_post_load_script *sc;

if (cfg->finish_callbacks) {
ctx = worker->ctx;
/* Create a fake task object for async events */
task = rspamd_task_new (worker, cfg);
task->resolver = ctx->resolver;
task->ev_base = ctx->ev_base;
task->s = rspamd_session_create (task->task_pool,
NULL,
NULL,
(event_finalizer_t) rspamd_task_free,
task);

DL_FOREACH (cfg->finish_callbacks, sc) {
lua_call_finish_script (cfg->lua_state, sc, task);
}
}

}

/*
* Reduce number of tasks proceeded
*/
static void
reduce_tasks_count (gpointer arg)
{
guint *nconns = arg;
struct rspamd_worker *worker = arg;

(*nconns)--;
worker->nconns --;

if (worker->wanna_die && worker->nconns == 0) {
msg_info ("performing finishing actions");
rspamd_worker_call_finish_handlers (worker);
}
}

static void
@@ -344,7 +376,7 @@ accept_socket (gint fd, short what, void *arg)
task->ev_base = ctx->ev_base;
worker->nconns++;
rspamd_mempool_add_destructor (task->task_pool,
(rspamd_mempool_destruct_t)reduce_tasks_count, &worker->nconns);
(rspamd_mempool_destruct_t)reduce_tasks_count, worker);

/* Set up async session */
task->s = rspamd_session_create (task->task_pool, rspamd_task_fin,
@@ -525,6 +557,15 @@ init_worker (struct rspamd_config *cfg)
return ctx;
}

static void
rspamd_worker_on_terminate (struct rspamd_worker *worker)
{
if (worker->nconns == 0) {
msg_info ("performing finishing actions");
rspamd_worker_call_finish_handlers (worker);
}
}

/*
* Start worker process
*/
@@ -549,6 +590,8 @@ start_worker (struct rspamd_worker *worker)
/* XXX: stupid default */
ctx->keys_cache = rspamd_keypair_cache_new (256);
rspamd_stat_init (worker->srv->cfg, ctx->ev_base);
g_ptr_array_add (worker->finish_actions,
(gpointer) rspamd_worker_on_terminate);

#ifdef WITH_HYPERSCAN
rspamd_control_worker_add_cmd_handler (worker,

Loading…
İptal
Kaydet