aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/libserver/rspamd_control.c1
-rw-r--r--src/libserver/worker_util.c49
-rw-r--r--src/libserver/worker_util.h2
-rw-r--r--src/rspamd.c44
4 files changed, 67 insertions, 29 deletions
diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c
index 685ffabf8..10d853e9d 100644
--- a/src/libserver/rspamd_control.c
+++ b/src/libserver/rspamd_control.c
@@ -532,6 +532,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
spair = g_hash_table_lookup (srv->spairs, cmd.cmd.spair.pair_id);
if (spair == NULL) {
spair = g_malloc (sizeof (gint) * 2);
+
if (rspamd_socketpair (spair) == -1) {
rdata->rep.reply.spair.code = errno;
msg_err ("cannot create socket pair: %s", strerror (errno));
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index 4b24ee377..b8b014a90 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -436,33 +436,34 @@ rspamd_worker_set_limits (struct rspamd_main *rspamd_main,
struct rspamd_worker *
rspamd_fork_worker (struct rspamd_main *rspamd_main,
struct rspamd_worker_conf *cf,
- guint index)
+ guint index,
+ struct event_base *ev_base)
{
- struct rspamd_worker *cur;
+ struct rspamd_worker *wrk;
gint rc;
/* Starting worker process */
- cur = (struct rspamd_worker *) g_malloc0 (sizeof (struct rspamd_worker));
+ wrk = (struct rspamd_worker *) g_malloc0 (sizeof (struct rspamd_worker));
- if (!rspamd_socketpair (cur->control_pipe)) {
+ if (!rspamd_socketpair (wrk->control_pipe)) {
msg_err ("socketpair failure: %s", strerror (errno));
exit (-errno);
}
- if (!rspamd_socketpair (cur->srv_pipe)) {
+ if (!rspamd_socketpair (wrk->srv_pipe)) {
msg_err ("socketpair failure: %s", strerror (errno));
exit (-errno);
}
- cur->srv = rspamd_main;
- cur->type = cf->type;
- cur->cf = g_malloc (sizeof (struct rspamd_worker_conf));
- memcpy (cur->cf, cf, sizeof (struct rspamd_worker_conf));
- cur->index = index;
- cur->ctx = cf->ctx;
+ wrk->srv = rspamd_main;
+ wrk->type = cf->type;
+ wrk->cf = g_malloc (sizeof (struct rspamd_worker_conf));
+ memcpy (wrk->cf, cf, sizeof (struct rspamd_worker_conf));
+ wrk->index = index;
+ wrk->ctx = cf->ctx;
- cur->pid = fork ();
+ wrk->pid = fork ();
- switch (cur->pid) {
+ switch (wrk->pid) {
case 0:
/* Update pid for logging */
rspamd_log_update_pid (cf->type, rspamd_main->logger);
@@ -484,7 +485,7 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
/* Do silent log reopen to avoid collisions */
rspamd_log_close (rspamd_main->logger);
rspamd_log_open (rspamd_main->logger);
- cur->start_time = rspamd_get_calendar_ticks ();
+ wrk->start_time = rspamd_get_calendar_ticks ();
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
# if (GLIB_MINOR_VERSION > 20)
@@ -498,10 +499,13 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
#endif
msg_info_main ("starting %s process %P", cf->worker->name, getpid ());
/* Close parent part of socketpair */
- close (cur->control_pipe[0]);
- rspamd_socket_nonblocking (cur->control_pipe[1]);
+ close (wrk->control_pipe[0]);
+ close (wrk->srv_pipe[0]);
+ rspamd_socket_nonblocking (wrk->control_pipe[1]);
+ rspamd_socket_nonblocking (wrk->srv_pipe[1]);
/* Execute worker */
- cf->worker->worker_start_func (cur);
+ cf->worker->worker_start_func (wrk);
+ exit (EXIT_FAILURE);
break;
case -1:
msg_err_main ("cannot fork main process. %s", strerror (errno));
@@ -510,15 +514,18 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
break;
default:
/* Close worker part of socketpair */
- close (cur->control_pipe[1]);
- rspamd_socket_nonblocking (cur->control_pipe[0]);
+ close (wrk->control_pipe[1]);
+ close (wrk->srv_pipe[1]);
+ rspamd_socket_nonblocking (wrk->control_pipe[0]);
+ rspamd_socket_nonblocking (wrk->srv_pipe[0]);
+ rspamd_srv_start_watching (wrk, ev_base);
/* Insert worker into worker's table, pid is index */
g_hash_table_insert (rspamd_main->workers, GSIZE_TO_POINTER (
- cur->pid), cur);
+ wrk->pid), wrk);
break;
}
- return cur;
+ return wrk;
}
void
diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h
index 837e6ac33..276fb64ce 100644
--- a/src/libserver/worker_util.h
+++ b/src/libserver/worker_util.h
@@ -133,7 +133,7 @@ void rspamd_worker_block_signals (void);
* Fork new worker with the specified configuration
*/
struct rspamd_worker *rspamd_fork_worker (struct rspamd_main *,
- struct rspamd_worker_conf *, guint);
+ struct rspamd_worker_conf *, guint idx, struct event_base *ev_base);
#define msg_err_main(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
rspamd_main->server_pool->tag.tagname, rspamd_main->server_pool->tag.uid, \
diff --git a/src/rspamd.c b/src/rspamd.c
index c24dcc08a..0fe5a76bc 100644
--- a/src/rspamd.c
+++ b/src/rspamd.c
@@ -313,7 +313,8 @@ rspamd_fork_delayed_cb (gint signo, short what, gpointer arg)
struct waiting_worker *w = arg;
event_del (&w->wait_ev);
- rspamd_fork_worker (w->rspamd_main, w->cf, w->oldindex);
+ rspamd_fork_worker (w->rspamd_main, w->cf, w->oldindex,
+ w->rspamd_main->ev_base);
g_slice_free1 (sizeof (*w), w);
}
@@ -431,7 +432,7 @@ make_listen_key (struct rspamd_worker_bind_conf *cf)
}
static void
-spawn_workers (struct rspamd_main *rspamd_main)
+spawn_workers (struct rspamd_main *rspamd_main, struct event_base *ev_base)
{
GList *cur, *ls;
struct rspamd_worker_conf *cf;
@@ -493,14 +494,14 @@ spawn_workers (struct rspamd_main *rspamd_main)
msg_warn_main ("cannot spawn more than 1 %s worker, so spawn one",
cf->worker->name);
}
- rspamd_fork_worker (rspamd_main, cf, 0);
+ rspamd_fork_worker (rspamd_main, cf, 0, ev_base);
}
else if (cf->worker->threaded) {
- rspamd_fork_worker (rspamd_main, cf, 0);
+ rspamd_fork_worker (rspamd_main, cf, 0, ev_base);
}
else {
for (i = 0; i < cf->count; i++) {
- rspamd_fork_worker (rspamd_main, cf, i);
+ rspamd_fork_worker (rspamd_main, cf, i, ev_base);
}
}
}
@@ -553,6 +554,7 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused)
msg_info_main ("%s process %P terminated %s", g_quark_to_string (
w->type), w->pid,
WTERMSIG (res) == SIGKILL ? "hardly" : "softly");
+ event_del (&w->srv_ev);
g_free (w->cf);
g_free (w);
@@ -664,7 +666,7 @@ rspamd_hup_handler (gint signo, short what, gpointer arg)
g_hash_table_foreach (rspamd_main->workers, kill_old_workers, NULL);
rspamd_map_remove_all (rspamd_main->cfg);
reread_config (rspamd_main);
- spawn_workers (rspamd_main);
+ spawn_workers (rspamd_main, rspamd_main->ev_base);
}
static void
@@ -731,6 +733,8 @@ rspamd_cld_handler (gint signo, short what, gpointer arg)
rspamd_fork_delayed (cur->cf, cur->index, rspamd_main);
}
+ event_del (&cur->srv_ev);
+ g_free (cur->cf);
g_free (cur);
}
else {
@@ -783,6 +787,28 @@ rspamd_control_handler (gint fd, short what, gpointer arg)
rspamd_control_process_client_socket (rspamd_main, nfd);
}
+static guint
+rspamd_spair_hash (gconstpointer p)
+{
+ return XXH64 (p, PAIR_ID_LEN, rspamd_hash_seed ());
+}
+
+static gboolean
+rspamd_spair_equal (gconstpointer a, gconstpointer b)
+{
+ return memcmp (a, b, PAIR_ID_LEN) == 0;
+}
+
+static void
+rspamd_spair_close (gpointer p)
+{
+ gint *fds = p;
+
+ close (fds[0]);
+ close (fds[1]);
+ g_free (p);
+}
+
gint
main (gint argc, gchar **argv, gchar **env)
{
@@ -806,6 +832,8 @@ main (gint argc, gchar **argv, gchar **env)
rspamd_main->stat = rspamd_mempool_alloc0_shared (rspamd_main->server_pool,
sizeof (struct rspamd_stat));
rspamd_main->cfg = rspamd_config_new ();
+ rspamd_main->spairs = g_hash_table_new_full (rspamd_spair_hash,
+ rspamd_spair_equal, g_free, rspamd_spair_close);
#ifndef HAVE_SETPROCTITLE
init_title (argc, argv, env);
@@ -1009,7 +1037,6 @@ main (gint argc, gchar **argv, gchar **env)
#endif
/* Spawn workers */
rspamd_main->workers = g_hash_table_new (g_direct_hash, g_direct_equal);
- spawn_workers (rspamd_main);
/* Init event base */
ev_base = event_init ();
@@ -1035,6 +1062,8 @@ main (gint argc, gchar **argv, gchar **env)
event_base_set (ev_base, &usr1_ev);
event_add (&usr1_ev, NULL);
+ spawn_workers (rspamd_main, ev_base);
+
if (control_fd != -1) {
msg_info_main ("listening for control commands on %s",
rspamd_inet_address_to_string (control_addr));
@@ -1073,6 +1102,7 @@ main (gint argc, gchar **argv, gchar **env)
/* Wait for workers termination */
g_hash_table_foreach_remove (rspamd_main->workers, wait_for_workers, NULL);
+ g_hash_table_unref (rspamd_main->spairs);
event_set (&term_ev, -1, EV_TIMEOUT|EV_PERSIST,
rspamd_final_term_handler, rspamd_main);