aboutsummaryrefslogtreecommitdiffstats
path: root/src/worker.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2016-08-25 12:42:13 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2016-08-25 12:42:13 +0100
commit23f831e0c58959e22c24ecd2bbd21a8a91183823 (patch)
treeb0238dbea53bead8715a833203b24d788219e81c /src/worker.c
parent6dd5ad342c865f057573e1510df113650bcfe570 (diff)
downloadrspamd-23f831e0c58959e22c24ecd2bbd21a8a91183823.tar.gz
rspamd-23f831e0c58959e22c24ecd2bbd21a8a91183823.zip
[Feature] Implement finish scripts for worker processes
Diffstat (limited to 'src/worker.c')
-rw-r--r--src/worker.c49
1 files changed, 46 insertions, 3 deletions
diff --git a/src/worker.c b/src/worker.c
index 60c39b2af..097f7c2e1 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -72,15 +72,47 @@ worker_t normal_worker = {
G_STRFUNC, \
__VA_ARGS__)
+static void
+rspamd_worker_call_finish_handlers (struct rspamd_worker *worker)
+{
+ struct rspamd_task *task;
+ struct rspamd_config *cfg = worker->srv->cfg;
+ struct rspamd_worker_ctx *ctx;
+ struct rspamd_config_post_load_script *sc;
+
+ if (cfg->finish_callbacks) {
+ ctx = worker->ctx;
+ /* Create a fake task object for async events */
+ task = rspamd_task_new (worker, cfg);
+ task->resolver = ctx->resolver;
+ task->ev_base = ctx->ev_base;
+ task->s = rspamd_session_create (task->task_pool,
+ NULL,
+ NULL,
+ (event_finalizer_t) rspamd_task_free,
+ task);
+
+ DL_FOREACH (cfg->finish_callbacks, sc) {
+ lua_call_finish_script (cfg->lua_state, sc, task);
+ }
+ }
+
+}
+
/*
* Reduce number of tasks proceeded
*/
static void
reduce_tasks_count (gpointer arg)
{
- guint *nconns = arg;
+ struct rspamd_worker *worker = arg;
- (*nconns)--;
+ worker->nconns --;
+
+ if (worker->wanna_die && worker->nconns == 0) {
+ msg_info ("performing finishing actions");
+ rspamd_worker_call_finish_handlers (worker);
+ }
}
static void
@@ -344,7 +376,7 @@ accept_socket (gint fd, short what, void *arg)
task->ev_base = ctx->ev_base;
worker->nconns++;
rspamd_mempool_add_destructor (task->task_pool,
- (rspamd_mempool_destruct_t)reduce_tasks_count, &worker->nconns);
+ (rspamd_mempool_destruct_t)reduce_tasks_count, worker);
/* Set up async session */
task->s = rspamd_session_create (task->task_pool, rspamd_task_fin,
@@ -525,6 +557,15 @@ init_worker (struct rspamd_config *cfg)
return ctx;
}
+static void
+rspamd_worker_on_terminate (struct rspamd_worker *worker)
+{
+ if (worker->nconns == 0) {
+ msg_info ("performing finishing actions");
+ rspamd_worker_call_finish_handlers (worker);
+ }
+}
+
/*
* Start worker process
*/
@@ -549,6 +590,8 @@ start_worker (struct rspamd_worker *worker)
/* XXX: stupid default */
ctx->keys_cache = rspamd_keypair_cache_new (256);
rspamd_stat_init (worker->srv->cfg, ctx->ev_base);
+ g_ptr_array_add (worker->finish_actions,
+ (gpointer) rspamd_worker_on_terminate);
#ifdef WITH_HYPERSCAN
rspamd_control_worker_add_cmd_handler (worker,