]> source.dussan.org Git - rspamd.git/commitdiff
Implement listening for srv pipe in the main process
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 25 Nov 2015 15:16:04 +0000 (15:16 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 25 Nov 2015 15:16:04 +0000 (15:16 +0000)
src/libserver/rspamd_control.c
src/libserver/worker_util.c
src/libserver/worker_util.h
src/rspamd.c

index 685ffabf8b22642c5035c57710b3eab9f9946b42..10d853e9dcdf4fad2b84d2e911087130fcc1c325 100644 (file)
@@ -532,6 +532,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
                                spair = g_hash_table_lookup (srv->spairs, cmd.cmd.spair.pair_id);
                                if (spair == NULL) {
                                        spair = g_malloc (sizeof (gint) * 2);
+
                                        if (rspamd_socketpair (spair) == -1) {
                                                rdata->rep.reply.spair.code = errno;
                                                msg_err ("cannot create socket pair: %s", strerror (errno));
index 4b24ee377010bca86e81d327a6c9c2aa6617f057..b8b014a90c58744e27de295b15a51f5b5a7aeff2 100644 (file)
@@ -436,33 +436,34 @@ rspamd_worker_set_limits (struct rspamd_main *rspamd_main,
 struct rspamd_worker *
 rspamd_fork_worker (struct rspamd_main *rspamd_main,
                struct rspamd_worker_conf *cf,
-               guint index)
+               guint index,
+               struct event_base *ev_base)
 {
-       struct rspamd_worker *cur;
+       struct rspamd_worker *wrk;
        gint rc;
        /* Starting worker process */
-       cur = (struct rspamd_worker *) g_malloc0 (sizeof (struct rspamd_worker));
+       wrk = (struct rspamd_worker *) g_malloc0 (sizeof (struct rspamd_worker));
 
-       if (!rspamd_socketpair (cur->control_pipe)) {
+       if (!rspamd_socketpair (wrk->control_pipe)) {
                msg_err ("socketpair failure: %s", strerror (errno));
                exit (-errno);
        }
 
-       if (!rspamd_socketpair (cur->srv_pipe)) {
+       if (!rspamd_socketpair (wrk->srv_pipe)) {
                msg_err ("socketpair failure: %s", strerror (errno));
                exit (-errno);
        }
 
-       cur->srv = rspamd_main;
-       cur->type = cf->type;
-       cur->cf = g_malloc (sizeof (struct rspamd_worker_conf));
-       memcpy (cur->cf, cf, sizeof (struct rspamd_worker_conf));
-       cur->index = index;
-       cur->ctx = cf->ctx;
+       wrk->srv = rspamd_main;
+       wrk->type = cf->type;
+       wrk->cf = g_malloc (sizeof (struct rspamd_worker_conf));
+       memcpy (wrk->cf, cf, sizeof (struct rspamd_worker_conf));
+       wrk->index = index;
+       wrk->ctx = cf->ctx;
 
-       cur->pid = fork ();
+       wrk->pid = fork ();
 
-       switch (cur->pid) {
+       switch (wrk->pid) {
        case 0:
                /* Update pid for logging */
                rspamd_log_update_pid (cf->type, rspamd_main->logger);
@@ -484,7 +485,7 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
                /* Do silent log reopen to avoid collisions */
                rspamd_log_close (rspamd_main->logger);
                rspamd_log_open (rspamd_main->logger);
-               cur->start_time = rspamd_get_calendar_ticks ();
+               wrk->start_time = rspamd_get_calendar_ticks ();
 
 #if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
 # if (GLIB_MINOR_VERSION > 20)
@@ -498,10 +499,13 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
 #endif
                msg_info_main ("starting %s process %P", cf->worker->name, getpid ());
                /* Close parent part of socketpair */
-               close (cur->control_pipe[0]);
-               rspamd_socket_nonblocking (cur->control_pipe[1]);
+               close (wrk->control_pipe[0]);
+               close (wrk->srv_pipe[0]);
+               rspamd_socket_nonblocking (wrk->control_pipe[1]);
+               rspamd_socket_nonblocking (wrk->srv_pipe[1]);
                /* Execute worker */
-               cf->worker->worker_start_func (cur);
+               cf->worker->worker_start_func (wrk);
+               exit (EXIT_FAILURE);
                break;
        case -1:
                msg_err_main ("cannot fork main process. %s", strerror (errno));
@@ -510,15 +514,18 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
                break;
        default:
                /* Close worker part of socketpair */
-               close (cur->control_pipe[1]);
-               rspamd_socket_nonblocking (cur->control_pipe[0]);
+               close (wrk->control_pipe[1]);
+               close (wrk->srv_pipe[1]);
+               rspamd_socket_nonblocking (wrk->control_pipe[0]);
+               rspamd_socket_nonblocking (wrk->srv_pipe[0]);
+               rspamd_srv_start_watching (wrk, ev_base);
                /* Insert worker into worker's table, pid is index */
                g_hash_table_insert (rspamd_main->workers, GSIZE_TO_POINTER (
-                               cur->pid), cur);
+                               wrk->pid), wrk);
                break;
        }
 
-       return cur;
+       return wrk;
 }
 
 void
index 837e6ac331f0636564f5f2f88b3e8e59c88039ef..276fb64cea157425cd164475af262d12cfcbd4a4 100644 (file)
@@ -133,7 +133,7 @@ void rspamd_worker_block_signals (void);
  * Fork new worker with the specified configuration
  */
 struct rspamd_worker *rspamd_fork_worker (struct rspamd_main *,
-               struct rspamd_worker_conf *, guint);
+               struct rspamd_worker_conf *, guint idx, struct event_base *ev_base);
 
 #define msg_err_main(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
         rspamd_main->server_pool->tag.tagname, rspamd_main->server_pool->tag.uid, \
index c24dcc08a0e8e1c90220d6172ce0a9e61a5f4938..0fe5a76bc208512546225be0958d61bda7204737 100644 (file)
@@ -313,7 +313,8 @@ rspamd_fork_delayed_cb (gint signo, short what, gpointer arg)
        struct waiting_worker *w = arg;
 
        event_del (&w->wait_ev);
-       rspamd_fork_worker (w->rspamd_main, w->cf, w->oldindex);
+       rspamd_fork_worker (w->rspamd_main, w->cf, w->oldindex,
+                       w->rspamd_main->ev_base);
        g_slice_free1 (sizeof (*w), w);
 }
 
@@ -431,7 +432,7 @@ make_listen_key (struct rspamd_worker_bind_conf *cf)
 }
 
 static void
-spawn_workers (struct rspamd_main *rspamd_main)
+spawn_workers (struct rspamd_main *rspamd_main, struct event_base *ev_base)
 {
        GList *cur, *ls;
        struct rspamd_worker_conf *cf;
@@ -493,14 +494,14 @@ spawn_workers (struct rspamd_main *rspamd_main)
                                                msg_warn_main ("cannot spawn more than 1 %s worker, so spawn one",
                                                                cf->worker->name);
                                        }
-                                       rspamd_fork_worker (rspamd_main, cf, 0);
+                                       rspamd_fork_worker (rspamd_main, cf, 0, ev_base);
                                }
                                else if (cf->worker->threaded) {
-                                       rspamd_fork_worker (rspamd_main, cf, 0);
+                                       rspamd_fork_worker (rspamd_main, cf, 0, ev_base);
                                }
                                else {
                                        for (i = 0; i < cf->count; i++) {
-                                               rspamd_fork_worker (rspamd_main, cf, i);
+                                               rspamd_fork_worker (rspamd_main, cf, i, ev_base);
                                        }
                                }
                        }
@@ -553,6 +554,7 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused)
        msg_info_main ("%s process %P terminated %s", g_quark_to_string (
                        w->type), w->pid,
                        WTERMSIG (res) == SIGKILL ? "hardly" : "softly");
+       event_del (&w->srv_ev);
        g_free (w->cf);
        g_free (w);
 
@@ -664,7 +666,7 @@ rspamd_hup_handler (gint signo, short what, gpointer arg)
        g_hash_table_foreach (rspamd_main->workers, kill_old_workers, NULL);
        rspamd_map_remove_all (rspamd_main->cfg);
        reread_config (rspamd_main);
-       spawn_workers (rspamd_main);
+       spawn_workers (rspamd_main, rspamd_main->ev_base);
 }
 
 static void
@@ -731,6 +733,8 @@ rspamd_cld_handler (gint signo, short what, gpointer arg)
                        rspamd_fork_delayed (cur->cf, cur->index, rspamd_main);
                }
 
+               event_del (&cur->srv_ev);
+               g_free (cur->cf);
                g_free (cur);
        }
        else {
@@ -783,6 +787,28 @@ rspamd_control_handler (gint fd, short what, gpointer arg)
        rspamd_control_process_client_socket (rspamd_main, nfd);
 }
 
+static guint
+rspamd_spair_hash (gconstpointer p)
+{
+       return XXH64 (p, PAIR_ID_LEN, rspamd_hash_seed ());
+}
+
+static gboolean
+rspamd_spair_equal (gconstpointer a, gconstpointer b)
+{
+       return memcmp (a, b, PAIR_ID_LEN) == 0;
+}
+
+static void
+rspamd_spair_close (gpointer p)
+{
+       gint *fds = p;
+
+       close (fds[0]);
+       close (fds[1]);
+       g_free (p);
+}
+
 gint
 main (gint argc, gchar **argv, gchar **env)
 {
@@ -806,6 +832,8 @@ main (gint argc, gchar **argv, gchar **env)
        rspamd_main->stat = rspamd_mempool_alloc0_shared (rspamd_main->server_pool,
                        sizeof (struct rspamd_stat));
        rspamd_main->cfg = rspamd_config_new ();
+       rspamd_main->spairs = g_hash_table_new_full (rspamd_spair_hash,
+                       rspamd_spair_equal, g_free, rspamd_spair_close);
 
 #ifndef HAVE_SETPROCTITLE
        init_title (argc, argv, env);
@@ -1009,7 +1037,6 @@ main (gint argc, gchar **argv, gchar **env)
 #endif
        /* Spawn workers */
        rspamd_main->workers = g_hash_table_new (g_direct_hash, g_direct_equal);
-       spawn_workers (rspamd_main);
 
        /* Init event base */
        ev_base = event_init ();
@@ -1035,6 +1062,8 @@ main (gint argc, gchar **argv, gchar **env)
        event_base_set (ev_base, &usr1_ev);
        event_add (&usr1_ev, NULL);
 
+       spawn_workers (rspamd_main, ev_base);
+
        if (control_fd != -1) {
                msg_info_main ("listening for control commands on %s",
                                rspamd_inet_address_to_string (control_addr));
@@ -1073,6 +1102,7 @@ main (gint argc, gchar **argv, gchar **env)
 
        /* Wait for workers termination */
        g_hash_table_foreach_remove (rspamd_main->workers, wait_for_workers, NULL);
+       g_hash_table_unref (rspamd_main->spairs);
 
        event_set (&term_ev, -1, EV_TIMEOUT|EV_PERSIST,
                        rspamd_final_term_handler, rspamd_main);