From: Vsevolod Stakhov Date: Wed, 25 Nov 2015 15:16:04 +0000 (+0000) Subject: Implement listening for srv pipe in the main process X-Git-Tag: 1.1.0~482 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=e0b4ba6307bcb324e380b4420f1a49bcc1ef764f;p=rspamd.git Implement listening for srv pipe in the main process --- diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c index 685ffabf8..10d853e9d 100644 --- a/src/libserver/rspamd_control.c +++ b/src/libserver/rspamd_control.c @@ -532,6 +532,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud) spair = g_hash_table_lookup (srv->spairs, cmd.cmd.spair.pair_id); if (spair == NULL) { spair = g_malloc (sizeof (gint) * 2); + if (rspamd_socketpair (spair) == -1) { rdata->rep.reply.spair.code = errno; msg_err ("cannot create socket pair: %s", strerror (errno)); diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index 4b24ee377..b8b014a90 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -436,33 +436,34 @@ rspamd_worker_set_limits (struct rspamd_main *rspamd_main, struct rspamd_worker * rspamd_fork_worker (struct rspamd_main *rspamd_main, struct rspamd_worker_conf *cf, - guint index) + guint index, + struct event_base *ev_base) { - struct rspamd_worker *cur; + struct rspamd_worker *wrk; gint rc; /* Starting worker process */ - cur = (struct rspamd_worker *) g_malloc0 (sizeof (struct rspamd_worker)); + wrk = (struct rspamd_worker *) g_malloc0 (sizeof (struct rspamd_worker)); - if (!rspamd_socketpair (cur->control_pipe)) { + if (!rspamd_socketpair (wrk->control_pipe)) { msg_err ("socketpair failure: %s", strerror (errno)); exit (-errno); } - if (!rspamd_socketpair (cur->srv_pipe)) { + if (!rspamd_socketpair (wrk->srv_pipe)) { msg_err ("socketpair failure: %s", strerror (errno)); exit (-errno); } - cur->srv = rspamd_main; - cur->type = cf->type; - cur->cf = g_malloc (sizeof (struct rspamd_worker_conf)); - memcpy (cur->cf, cf, sizeof (struct rspamd_worker_conf)); - cur->index = index; - cur->ctx = cf->ctx; + wrk->srv = rspamd_main; + wrk->type = cf->type; + wrk->cf = g_malloc (sizeof (struct rspamd_worker_conf)); + memcpy (wrk->cf, cf, sizeof (struct rspamd_worker_conf)); + wrk->index = index; + wrk->ctx = cf->ctx; - cur->pid = fork (); + wrk->pid = fork (); - switch (cur->pid) { + switch (wrk->pid) { case 0: /* Update pid for logging */ rspamd_log_update_pid (cf->type, rspamd_main->logger); @@ -484,7 +485,7 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main, /* Do silent log reopen to avoid collisions */ rspamd_log_close (rspamd_main->logger); rspamd_log_open (rspamd_main->logger); - cur->start_time = rspamd_get_calendar_ticks (); + wrk->start_time = rspamd_get_calendar_ticks (); #if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) # if (GLIB_MINOR_VERSION > 20) @@ -498,10 +499,13 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main, #endif msg_info_main ("starting %s process %P", cf->worker->name, getpid ()); /* Close parent part of socketpair */ - close (cur->control_pipe[0]); - rspamd_socket_nonblocking (cur->control_pipe[1]); + close (wrk->control_pipe[0]); + close (wrk->srv_pipe[0]); + rspamd_socket_nonblocking (wrk->control_pipe[1]); + rspamd_socket_nonblocking (wrk->srv_pipe[1]); /* Execute worker */ - cf->worker->worker_start_func (cur); + cf->worker->worker_start_func (wrk); + exit (EXIT_FAILURE); break; case -1: msg_err_main ("cannot fork main process. %s", strerror (errno)); @@ -510,15 +514,18 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main, break; default: /* Close worker part of socketpair */ - close (cur->control_pipe[1]); - rspamd_socket_nonblocking (cur->control_pipe[0]); + close (wrk->control_pipe[1]); + close (wrk->srv_pipe[1]); + rspamd_socket_nonblocking (wrk->control_pipe[0]); + rspamd_socket_nonblocking (wrk->srv_pipe[0]); + rspamd_srv_start_watching (wrk, ev_base); /* Insert worker into worker's table, pid is index */ g_hash_table_insert (rspamd_main->workers, GSIZE_TO_POINTER ( - cur->pid), cur); + wrk->pid), wrk); break; } - return cur; + return wrk; } void diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h index 837e6ac33..276fb64ce 100644 --- a/src/libserver/worker_util.h +++ b/src/libserver/worker_util.h @@ -133,7 +133,7 @@ void rspamd_worker_block_signals (void); * Fork new worker with the specified configuration */ struct rspamd_worker *rspamd_fork_worker (struct rspamd_main *, - struct rspamd_worker_conf *, guint); + struct rspamd_worker_conf *, guint idx, struct event_base *ev_base); #define msg_err_main(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ rspamd_main->server_pool->tag.tagname, rspamd_main->server_pool->tag.uid, \ diff --git a/src/rspamd.c b/src/rspamd.c index c24dcc08a..0fe5a76bc 100644 --- a/src/rspamd.c +++ b/src/rspamd.c @@ -313,7 +313,8 @@ rspamd_fork_delayed_cb (gint signo, short what, gpointer arg) struct waiting_worker *w = arg; event_del (&w->wait_ev); - rspamd_fork_worker (w->rspamd_main, w->cf, w->oldindex); + rspamd_fork_worker (w->rspamd_main, w->cf, w->oldindex, + w->rspamd_main->ev_base); g_slice_free1 (sizeof (*w), w); } @@ -431,7 +432,7 @@ make_listen_key (struct rspamd_worker_bind_conf *cf) } static void -spawn_workers (struct rspamd_main *rspamd_main) +spawn_workers (struct rspamd_main *rspamd_main, struct event_base *ev_base) { GList *cur, *ls; struct rspamd_worker_conf *cf; @@ -493,14 +494,14 @@ spawn_workers (struct rspamd_main *rspamd_main) msg_warn_main ("cannot spawn more than 1 %s worker, so spawn one", cf->worker->name); } - rspamd_fork_worker (rspamd_main, cf, 0); + rspamd_fork_worker (rspamd_main, cf, 0, ev_base); } else if (cf->worker->threaded) { - rspamd_fork_worker (rspamd_main, cf, 0); + rspamd_fork_worker (rspamd_main, cf, 0, ev_base); } else { for (i = 0; i < cf->count; i++) { - rspamd_fork_worker (rspamd_main, cf, i); + rspamd_fork_worker (rspamd_main, cf, i, ev_base); } } } @@ -553,6 +554,7 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused) msg_info_main ("%s process %P terminated %s", g_quark_to_string ( w->type), w->pid, WTERMSIG (res) == SIGKILL ? "hardly" : "softly"); + event_del (&w->srv_ev); g_free (w->cf); g_free (w); @@ -664,7 +666,7 @@ rspamd_hup_handler (gint signo, short what, gpointer arg) g_hash_table_foreach (rspamd_main->workers, kill_old_workers, NULL); rspamd_map_remove_all (rspamd_main->cfg); reread_config (rspamd_main); - spawn_workers (rspamd_main); + spawn_workers (rspamd_main, rspamd_main->ev_base); } static void @@ -731,6 +733,8 @@ rspamd_cld_handler (gint signo, short what, gpointer arg) rspamd_fork_delayed (cur->cf, cur->index, rspamd_main); } + event_del (&cur->srv_ev); + g_free (cur->cf); g_free (cur); } else { @@ -783,6 +787,28 @@ rspamd_control_handler (gint fd, short what, gpointer arg) rspamd_control_process_client_socket (rspamd_main, nfd); } +static guint +rspamd_spair_hash (gconstpointer p) +{ + return XXH64 (p, PAIR_ID_LEN, rspamd_hash_seed ()); +} + +static gboolean +rspamd_spair_equal (gconstpointer a, gconstpointer b) +{ + return memcmp (a, b, PAIR_ID_LEN) == 0; +} + +static void +rspamd_spair_close (gpointer p) +{ + gint *fds = p; + + close (fds[0]); + close (fds[1]); + g_free (p); +} + gint main (gint argc, gchar **argv, gchar **env) { @@ -806,6 +832,8 @@ main (gint argc, gchar **argv, gchar **env) rspamd_main->stat = rspamd_mempool_alloc0_shared (rspamd_main->server_pool, sizeof (struct rspamd_stat)); rspamd_main->cfg = rspamd_config_new (); + rspamd_main->spairs = g_hash_table_new_full (rspamd_spair_hash, + rspamd_spair_equal, g_free, rspamd_spair_close); #ifndef HAVE_SETPROCTITLE init_title (argc, argv, env); @@ -1009,7 +1037,6 @@ main (gint argc, gchar **argv, gchar **env) #endif /* Spawn workers */ rspamd_main->workers = g_hash_table_new (g_direct_hash, g_direct_equal); - spawn_workers (rspamd_main); /* Init event base */ ev_base = event_init (); @@ -1035,6 +1062,8 @@ main (gint argc, gchar **argv, gchar **env) event_base_set (ev_base, &usr1_ev); event_add (&usr1_ev, NULL); + spawn_workers (rspamd_main, ev_base); + if (control_fd != -1) { msg_info_main ("listening for control commands on %s", rspamd_inet_address_to_string (control_addr)); @@ -1073,6 +1102,7 @@ main (gint argc, gchar **argv, gchar **env) /* Wait for workers termination */ g_hash_table_foreach_remove (rspamd_main->workers, wait_for_workers, NULL); + g_hash_table_unref (rspamd_main->spairs); event_set (&term_ev, -1, EV_TIMEOUT|EV_PERSIST, rspamd_final_term_handler, rspamd_main);