aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt4
-rw-r--r--config.h.in1
-rw-r--r--src/cfg_file.h2
-rw-r--r--src/controller.c22
-rw-r--r--src/fuzzy_storage.c35
-rw-r--r--src/kvstorage_server.c3
-rw-r--r--src/lua_worker.c20
-rw-r--r--src/main.c39
-rw-r--r--src/main.h27
-rw-r--r--src/smtp.c18
-rw-r--r--src/smtp_proxy.c18
-rw-r--r--src/util.c135
-rw-r--r--src/util.h13
-rw-r--r--src/webui.c12
-rw-r--r--src/worker.c26
-rw-r--r--src/worker_util.c63
16 files changed, 292 insertions, 146 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index a06264774..c520889ef 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1018,8 +1018,6 @@ INCLUDE_DIRECTORIES("${CMAKE_SOURCE_DIR}/src" "${CMAKE_BINARY_DIR}/src")
SET(RSPAMDSRC src/modules.c
src/controller.c
src/fuzzy_storage.c
- src/kvstorage_server.c
- src/lmtp.c
src/lua_worker.c
src/main.c
src/map.c
@@ -1036,7 +1034,7 @@ SET(PLUGINSSRC src/plugins/surbl.c
src/plugins/dkim_check.c)
SET(MODULES_LIST surbl regexp chartable fuzzy_check spf dkim)
-SET(WORKERS_LIST normal controller smtp smtp_proxy lmtp fuzzy keystorage lua webui)
+SET(WORKERS_LIST normal controller smtp smtp_proxy fuzzy lua webui)
AddModules(MODULES_LIST WORKERS_LIST)
diff --git a/config.h.in b/config.h.in
index e930e520f..69df33469 100644
--- a/config.h.in
+++ b/config.h.in
@@ -467,6 +467,7 @@ typedef struct worker_s {
gboolean unique;
gboolean threaded;
gboolean killable;
+ gint listen_type;
} worker_t;
extern module_t *modules[];
diff --git a/src/cfg_file.h b/src/cfg_file.h
index 3b7059786..829b4b453 100644
--- a/src/cfg_file.h
+++ b/src/cfg_file.h
@@ -257,7 +257,7 @@ struct worker_conf {
guint16 bind_port; /**< bind port in case of TCP socket */
guint16 bind_family; /**< bind type (AF_UNIX or AF_INET) */
guint16 count; /**< number of workers */
- gint listen_sock; /**< listening socket desctiptor */
+ GList *listen_socks; /**< listening sockets desctiptors */
guint32 rlimit_nofile; /**< max files limit */
guint32 rlimit_maxcore; /**< maximum core file size */
GHashTable *params; /**< params for worker */
diff --git a/src/controller.c b/src/controller.c
index 45c026425..b712895d2 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -60,7 +60,8 @@ worker_t controller_worker = {
TRUE, /* Has socket */
FALSE, /* Non unique */
FALSE, /* Non threaded */
- TRUE /* Killable */
+ TRUE, /* Killable */
+ SOCK_STREAM /* TCP socket */
};
enum command_type {
@@ -171,7 +172,7 @@ sigusr2_handler (gint fd, short what, void *arg)
tv.tv_usec = 0;
event_del (&worker->sig_ev_usr1);
event_del (&worker->sig_ev_usr2);
- event_del (&worker->bind_ev);
+ worker_stop_accept (worker);
msg_info ("controller's shutdown is pending in %d sec", 2);
event_loopexit (&tv);
return;
@@ -1886,22 +1887,15 @@ init_controller (void)
void
start_controller (struct rspamd_worker *worker)
{
- struct sigaction signals;
gchar *hostbuf;
gsize hostmax;
- struct rspamd_controller_ctx *ctx;
+ struct rspamd_controller_ctx *ctx = worker->ctx;
GError *err = NULL;
struct timeval tv;
- worker->srv->pid = getpid ();
- ctx = worker->ctx;
-
- ctx->ev_base = event_init ();
+ ctx->ev_base = prepare_worker (worker, "controller", sig_handler, accept_socket);
g_mime_init (0);
- init_signals (&signals, sig_handler);
- sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
-
/* SIGUSR2 handler */
signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker);
event_base_set (ctx->ev_base, &worker->sig_ev_usr2);
@@ -1949,16 +1943,10 @@ start_controller (struct rspamd_worker *worker)
gethostname (hostbuf, hostmax);
hostbuf[hostmax - 1] = '\0';
rspamd_snprintf (greetingbuf, sizeof (greetingbuf), "Rspamd version %s is running on %s" CRLF, RVERSION, hostbuf);
- /* Accept event */
- event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
- event_base_set (ctx->ev_base, &worker->bind_ev);
- event_add (&worker->bind_ev, NULL);
start_map_watch (worker->srv->cfg, ctx->ev_base);
ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
- gperf_profiler_init (worker->srv->cfg, "controller");
-
event_base_loop (ctx->ev_base, 0);
close_log (worker->srv->logger);
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index c5ceb1b1c..7bd950014 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -71,10 +71,11 @@ worker_t fuzzy_worker = {
"fuzzy", /* Name */
init_fuzzy, /* Init function */
start_fuzzy, /* Start function */
- FALSE, /* No socket */
+ TRUE, /* No socket */
TRUE, /* Unique */
TRUE, /* Threaded */
- FALSE /* Non killable */
+ FALSE, /* Non killable */
+ SOCK_DGRAM /* UDP socket */
};
static GQueue *hashes[BUCKETS];
@@ -364,8 +365,6 @@ sigterm_handler (gint fd, short what, void *arg)
ctx = worker->ctx;
event_del (&worker->sig_ev_usr1);
event_del (&worker->sig_ev_usr2);
- event_del (&worker->bind_ev);
- close (worker->cf->listen_sock);
rspamd_mutex_lock (ctx->update_mtx);
mods = ctx->max_mods + 1;
@@ -392,8 +391,7 @@ sigusr2_handler (gint fd, short what, void *arg)
tv.tv_usec = 0;
event_del (&worker->sig_ev_usr1);
event_del (&worker->sig_ev_usr2);
- event_del (&worker->bind_ev);
- close (worker->cf->listen_sock);
+ worker_stop_accept (worker);
msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
rspamd_mutex_lock (ctx->update_mtx);
mods = ctx->max_mods + 1;
@@ -1025,21 +1023,14 @@ init_fuzzy (void)
void
start_fuzzy (struct rspamd_worker *worker)
{
- struct sigaction signals;
struct event sev;
- gint retries = 0;
struct rspamd_fuzzy_storage_ctx *ctx = worker->ctx;
GError *err = NULL;
- worker->srv->pid = getpid ();
-
- ctx->ev_base = event_init ();
+ ctx->ev_base = prepare_worker (worker, "controller", sig_handler, accept_fuzzy_socket);
server_stat = worker->srv->stat;
- init_signals (&signals, sig_handler);
- sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
-
/* SIGUSR2 handler */
signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker);
event_base_set (ctx->ev_base, &worker->sig_ev_usr2);
@@ -1054,16 +1045,6 @@ start_fuzzy (struct rspamd_worker *worker)
event_base_set (ctx->ev_base, &sev);
signal_add (&sev, NULL);
- /* Listen event */
- while ((worker->cf->listen_sock =
- make_universal_socket (worker->cf->bind_addr, worker->cf->bind_port, SOCK_DGRAM, TRUE, TRUE, FALSE)) == -1) {
- sleep (1);
- if (++retries > MAX_RETRIES) {
- msg_err ("cannot bind to socket, exiting");
- exit (0);
- }
- }
-
/* Init bloom filter */
bf = bloom_create (20000000L, DEFAULT_BLOOM_HASHES);
/* Try to read hashes from file */
@@ -1078,10 +1059,6 @@ start_fuzzy (struct rspamd_worker *worker)
tmv.tv_usec = 0;
evtimer_add (&tev, &tmv);
- event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_fuzzy_socket, (void *)worker);
- event_base_set (ctx->ev_base, &worker->bind_ev);
- event_add (&worker->bind_ev, NULL);
-
/* Create radix tree */
if (ctx->update_map != NULL) {
if (!add_map (worker->srv->cfg, ctx->update_map, "Allow fuzzy updates from specified addresses",
@@ -1095,8 +1072,6 @@ start_fuzzy (struct rspamd_worker *worker)
/* Maps events */
start_map_watch (worker->srv->cfg, ctx->ev_base);
- gperf_profiler_init (worker->srv->cfg, "fuzzy");
-
ctx->update_thread = rspamd_create_thread ("fuzzy update", sync_cache, worker, &err);
if (ctx->update_thread == NULL) {
msg_err ("error creating update thread: %s", err->message);
diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c
index 92090d4f6..b493eee46 100644
--- a/src/kvstorage_server.c
+++ b/src/kvstorage_server.c
@@ -74,7 +74,8 @@ worker_t keystorage_worker = {
TRUE, /* Has socket */
FALSE, /* Non unique */
TRUE, /* Non threaded */
- FALSE /* Non killable */
+ FALSE, /* Non killable */
+ SOCK_STREAM /* TCP socket */
};
#ifndef HAVE_SA_SIGINFO
diff --git a/src/lua_worker.c b/src/lua_worker.c
index 0cadb7eb2..6262af85b 100644
--- a/src/lua_worker.c
+++ b/src/lua_worker.c
@@ -53,7 +53,8 @@ worker_t lua_worker = {
TRUE, /* Has socket */
FALSE, /* Non unique */
FALSE, /* Non threaded */
- TRUE /* Killable */
+ TRUE, /* Killable */
+ SOCK_STREAM /* TCP socket */
};
/*
@@ -302,7 +303,7 @@ sigusr2_handler (gint fd, short what, void *arg)
tv.tv_usec = 0;
event_del (&worker->sig_ev_usr1);
event_del (&worker->sig_ev_usr2);
- event_del (&worker->bind_ev);
+ worker_stop_accept (worker);
msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
event_loopexit (&tv);
}
@@ -439,7 +440,6 @@ init_lua_worker (void)
void
start_lua_worker (struct rspamd_worker *worker)
{
- struct sigaction signals;
struct rspamd_lua_worker_ctx *ctx = worker->ctx, **pctx;
lua_State *L;
@@ -448,18 +448,12 @@ start_lua_worker (struct rspamd_worker *worker)
monstartup ((u_long) & _start, (u_long) & etext);
#endif
- gperf_profiler_init (worker->srv->cfg, "lua_worker");
-
- worker->srv->pid = getpid ();
-
- ctx->ev_base = event_init ();
+ ctx->ev_base = prepare_worker (worker, "lua_worker", sig_handler, lua_accept_socket);
L = worker->srv->cfg->lua_state;
ctx->L = L;
ctx->cfg = worker->srv->cfg;
- init_signals (&signals, sig_handler);
- sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
/* SIGUSR2 handler */
signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker);
@@ -471,12 +465,6 @@ start_lua_worker (struct rspamd_worker *worker)
event_base_set (ctx->ev_base, &worker->sig_ev_usr1);
signal_add (&worker->sig_ev_usr1, NULL);
- /* Accept event */
- event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST,
- lua_accept_socket, (void *) worker);
- event_base_set (ctx->ev_base, &worker->bind_ev);
- event_add (&worker->bind_ev, NULL);
-
ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
/* Open worker's lib */
diff --git a/src/main.c b/src/main.c
index 9b5a12108..607685720 100644
--- a/src/main.c
+++ b/src/main.c
@@ -550,20 +550,26 @@ dump_cfg_vars (struct config_file *cfg)
g_hash_table_foreach (cfg->variables, dump_all_variables, NULL);
}
-static gint
-create_listen_socket (const gchar *addr, gint port, gint family)
+static GList *
+create_listen_socket (const gchar *addr, gint port, gint family, gint listen_type)
{
gint listen_sock = -1;
- /* Create listen socket */
- listen_sock = make_universal_socket (addr, port, SOCK_STREAM, TRUE, TRUE, TRUE);
-
- if (listen_sock != -1) {
- if (listen (listen_sock, -1) == -1) {
- msg_err ("cannot listen on socket. %s", strerror (errno));
+ GList *result, *cur;
+ /* Create listen sockets */
+ result = make_universal_sockets_list (addr, port, listen_type, TRUE, TRUE, TRUE);
+
+ cur = result;
+ while (cur != NULL) {
+ listen_sock = GPOINTER_TO_INT (cur->data);
+ if (listen_sock != -1) {
+ if (listen (listen_sock, -1) == -1) {
+ msg_err ("cannot listen on socket. %s", strerror (errno));
+ }
}
+ cur = g_list_next (cur);
}
- return listen_sock;
+ return result;
}
static void
@@ -596,9 +602,9 @@ make_listen_key (const gchar *addr, gint port, gint family)
static void
spawn_workers (struct rspamd_main *rspamd)
{
- GList *cur;
+ GList *cur, *ls;
struct worker_conf *cf;
- gint i, listen_sock;
+ gint i;
gpointer p;
cur = rspamd->cfg->workers;
@@ -614,19 +620,20 @@ spawn_workers (struct rspamd_main *rspamd)
if ((p = g_hash_table_lookup (listen_sockets, GINT_TO_POINTER (
make_listen_key (cf->bind_addr, cf->bind_port, cf->bind_family)))) == NULL) {
/* Create listen socket */
- listen_sock = create_listen_socket (cf->bind_addr, cf->bind_port, cf->bind_family);
- if (listen_sock == -1) {
+ ls = create_listen_socket (cf->bind_addr, cf->bind_port, cf->bind_family,
+ cf->worker->listen_type);
+ if (ls == NULL) {
exit (-errno);
}
g_hash_table_insert (listen_sockets, GINT_TO_POINTER (
make_listen_key (cf->bind_addr, cf->bind_port, cf->bind_family)),
- GINT_TO_POINTER (listen_sock));
+ ls);
}
else {
/* We had socket for this type of worker */
- listen_sock = GPOINTER_TO_INT (p);
+ ls = p;
}
- cf->listen_sock = listen_sock;
+ cf->listen_socks = ls;
}
if (cf->worker->unique) {
diff --git a/src/main.h b/src/main.h
index 521e0740a..052c6ce6d 100644
--- a/src/main.h
+++ b/src/main.h
@@ -53,7 +53,7 @@ struct rspamd_worker {
GQuark type; /**< process type */
struct event sig_ev_usr1; /**< signals event */
struct event sig_ev_usr2; /**< signals event */
- struct event bind_ev; /**< socket events */
+ GList *accept_events; /**< socket events */
struct worker_conf *cf; /**< worker config data */
gpointer ctx; /**< worker's specific data */
};
@@ -321,6 +321,31 @@ void free_task_soft (gpointer ud);
*/
double set_counter (const gchar *name, guint32 value);
+#ifndef HAVE_SA_SIGINFO
+typedef void (*rspamd_sig_handler_t) (gint);
+#else
+typedef void (*rspamd_sig_handler_t) (gint, siginfo_t *, void *);
+#endif
+
+/**
+ * Prepare worker's startup
+ * @param worker worker structure
+ * @param name name of the worker
+ * @param sig_handler handler of main signals
+ * @param accept_handler handler of accept event for listen sockets
+ * @return event base suitable for a worker
+ */
+struct event_base *
+prepare_worker (struct rspamd_worker *worker, const char *name,
+ rspamd_sig_handler_t sig_handler,
+ void (*accept_handler)(evutil_socket_t, short, void *));
+
+/**
+ * Stop accepting new connections for a worker
+ * @param worker
+ */
+void worker_stop_accept (struct rspamd_worker *worker);
+
#endif
/*
diff --git a/src/smtp.c b/src/smtp.c
index 075e1b663..f4033c954 100644
--- a/src/smtp.c
+++ b/src/smtp.c
@@ -61,7 +61,8 @@ worker_t smtp_worker = {
TRUE, /* Has socket */
FALSE, /* Non unique */
FALSE, /* Non threaded */
- TRUE /* Killable */
+ TRUE, /* Killable */
+ SOCK_STREAM /* TCP socket */
};
#ifndef HAVE_SA_SIGINFO
@@ -105,7 +106,7 @@ sigusr2_handler (gint fd, short what, void *arg)
tv.tv_usec = 0;
event_del (&worker->sig_ev_usr1);
event_del (&worker->sig_ev_usr2);
- event_del (&worker->bind_ev);
+ worker_stop_accept (worker);
msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
event_loopexit (&tv);
}
@@ -944,13 +945,10 @@ config_smtp_worker (struct rspamd_worker *worker)
void
start_smtp (struct rspamd_worker *worker)
{
- struct sigaction signals;
struct smtp_worker_ctx *ctx = worker->ctx;
- gperf_profiler_init (worker->srv->cfg, "worker");
+ ctx->ev_base = prepare_worker (worker, "smtp_worker", sig_handler, accept_socket);
- worker->srv->pid = getpid ();
- ctx->ev_base = event_init ();
/* Set smtp options */
if ( !config_smtp_worker (worker)) {
@@ -958,9 +956,6 @@ start_smtp (struct rspamd_worker *worker)
exit (EXIT_SUCCESS);
}
- init_signals (&signals, sig_handler);
- sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
-
/* SIGUSR2 handler */
signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker);
event_base_set (ctx->ev_base, &worker->sig_ev_usr2);
@@ -971,11 +966,6 @@ start_smtp (struct rspamd_worker *worker)
event_base_set (ctx->ev_base, &worker->sig_ev_usr1);
signal_add (&worker->sig_ev_usr1, NULL);
- /* Accept event */
- event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
- event_base_set (ctx->ev_base, &worker->bind_ev);
- event_add (&worker->bind_ev, NULL);
-
/* Maps events */
start_map_watch (worker->srv->cfg, ctx->ev_base);
diff --git a/src/smtp_proxy.c b/src/smtp_proxy.c
index e90be31fe..d3bcd3866 100644
--- a/src/smtp_proxy.c
+++ b/src/smtp_proxy.c
@@ -62,7 +62,8 @@ worker_t smtp_proxy_worker = {
TRUE, /* Has socket */
FALSE, /* Non unique */
FALSE, /* Non threaded */
- TRUE /* Killable */
+ TRUE, /* Killable */
+ SOCK_STREAM /* TCP socket */
};
struct smtp_proxy_ctx {
@@ -179,7 +180,7 @@ sigusr2_handler (gint fd, short what, void *arg)
tv.tv_usec = 0;
event_del (&worker->sig_ev_usr1);
event_del (&worker->sig_ev_usr2);
- event_del (&worker->bind_ev);
+ worker_stop_accept (worker);
msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
event_loopexit (&tv);
}
@@ -1021,13 +1022,9 @@ config_smtp_proxy_worker (struct rspamd_worker *worker)
void
start_smtp_proxy (struct rspamd_worker *worker)
{
- struct sigaction signals;
struct smtp_proxy_ctx *ctx = worker->ctx;
- gperf_profiler_init (worker->srv->cfg, "worker");
-
- worker->srv->pid = getpid ();
- ctx->ev_base = event_init ();
+ ctx->ev_base = prepare_worker (worker, "smtp_proxy", sig_handler, accept_socket);
/* Set smtp options */
if ( !config_smtp_proxy_worker (worker)) {
@@ -1035,8 +1032,6 @@ start_smtp_proxy (struct rspamd_worker *worker)
exit (EXIT_SUCCESS);
}
- init_signals (&signals, sig_handler);
- sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
/* SIGUSR2 handler */
signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker);
@@ -1048,11 +1043,6 @@ start_smtp_proxy (struct rspamd_worker *worker)
event_base_set (ctx->ev_base, &worker->sig_ev_usr1);
signal_add (&worker->sig_ev_usr1, NULL);
- /* Accept event */
- event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
- event_base_set (ctx->ev_base, &worker->bind_ev);
- event_add (&worker->bind_ev, NULL);
-
/* DNS resolver */
ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
diff --git a/src/util.c b/src/util.c
index 441ce007a..4936b37dc 100644
--- a/src/util.c
+++ b/src/util.c
@@ -85,7 +85,7 @@ poll_sync_socket (gint fd, gint timeout, short events)
}
static gint
-make_inet_socket (gint type, struct addrinfo *addr, gboolean is_server, gboolean async)
+make_inet_socket (gint type, struct addrinfo *addr, gboolean is_server, gboolean async, GList **list)
{
gint fd, r, optlen, on = 1, s_error;
struct addrinfo *cur;
@@ -146,7 +146,13 @@ make_inet_socket (gint type, struct addrinfo *addr, gboolean is_server, gboolean
goto out;
}
}
- break;
+ if (list == NULL) {
+ /* Go out immediately */
+ break;
+ }
+ else if (fd != -1) {
+ *list = g_list_prepend (*list, GINT_TO_POINTER (fd));
+ }
out:
if (fd != -1) {
close (fd);
@@ -160,13 +166,13 @@ out:
gint
make_tcp_socket (struct addrinfo *addr, gboolean is_server, gboolean async)
{
- return make_inet_socket (SOCK_STREAM, addr, is_server, async);
+ return make_inet_socket (SOCK_STREAM, addr, is_server, async, NULL);
}
gint
make_udp_socket (struct addrinfo *addr, gboolean is_server, gboolean async)
{
- return make_inet_socket (SOCK_DGRAM, addr, is_server, async);
+ return make_inet_socket (SOCK_DGRAM, addr, is_server, async, NULL);
}
gint
@@ -284,7 +290,7 @@ make_unix_socket (const gchar *path, struct sockaddr_un *addr, gint type, gboole
}
/**
- * Make universal stream socket
+ * Make a universal socket
* @param credits host, ip or path to unix socket
* @param port port (used for network sockets)
* @param async make this socket asynced
@@ -292,7 +298,8 @@ make_unix_socket (const gchar *path, struct sockaddr_un *addr, gint type, gboole
* @param try_resolve try name resolution for a socket (BLOCKING)
*/
gint
-make_universal_socket (const gchar *credits, guint16 port, gint type, gboolean async, gboolean is_server, gboolean try_resolve)
+make_universal_socket (const gchar *credits, guint16 port,
+ gint type, gboolean async, gboolean is_server, gboolean try_resolve)
{
struct sockaddr_un un;
struct stat st;
@@ -347,7 +354,7 @@ make_universal_socket (const gchar *credits, guint16 port, gint type, gboolean a
rspamd_snprintf (portbuf, sizeof (portbuf), "%d", (int)port);
if ((r = getaddrinfo (credits, portbuf, &hints, &res)) == 0) {
- r = make_inet_socket (type, res, is_server, async);
+ r = make_inet_socket (type, res, is_server, async, NULL);
freeaddrinfo (res);
return r;
}
@@ -358,6 +365,120 @@ make_universal_socket (const gchar *credits, guint16 port, gint type, gboolean a
}
}
+/**
+ * Make universal stream socket
+ * @param credits host, ip or path to unix socket
+ * @param port port (used for network sockets)
+ * @param async make this socket asynced
+ * @param is_server make this socket as server socket
+ * @param try_resolve try name resolution for a socket (BLOCKING)
+ */
+GList*
+make_universal_sockets_list (const gchar *credits, guint16 port,
+ gint type, gboolean async, gboolean is_server, gboolean try_resolve)
+{
+ struct sockaddr_un un;
+ struct stat st;
+ struct addrinfo hints, *res;
+ gint r, fd, serrno;
+ gchar portbuf[8], **strv, **cur;
+ GList *result = NULL, *rcur;
+
+ strv = g_strsplit_set (credits, ",", -1);
+ if (strv == NULL) {
+ msg_err ("invalid sockets credentials: %s", credits);
+ return NULL;
+ }
+ cur = strv;
+ while (*cur != NULL) {
+ if (*credits == '/') {
+ r = stat (credits, &st);
+ if (is_server) {
+ if (r == -1) {
+ fd = make_unix_socket (credits, &un, type, is_server, async);
+ }
+ else {
+ /* Unix socket exists, it must be unlinked first */
+ errno = EEXIST;
+ goto err;
+ }
+ }
+ else {
+ if (r == -1) {
+ /* Unix socket doesn't exists it must be created first */
+ errno = ENOENT;
+ goto err;
+ }
+ else {
+ if ((st.st_mode & S_IFSOCK) == 0) {
+ /* Path is not valid socket */
+ errno = EINVAL;
+ goto err;
+ }
+ else {
+ fd = make_unix_socket (credits, &un, type, is_server, async);
+ }
+ }
+ }
+ if (fd != -1) {
+ result = g_list_prepend (result, GINT_TO_POINTER (fd));
+ }
+ else {
+ goto err;
+ }
+ }
+ else {
+ /* TCP related part */
+ memset (&hints, 0, sizeof (hints));
+ hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
+ hints.ai_socktype = type; /* Type of the socket */
+ hints.ai_flags = is_server ? AI_PASSIVE : 0;
+ hints.ai_protocol = 0; /* Any protocol */
+ hints.ai_canonname = NULL;
+ hints.ai_addr = NULL;
+ hints.ai_next = NULL;
+
+ if (!try_resolve) {
+ hints.ai_flags |= AI_NUMERICHOST | AI_NUMERICSERV;
+ }
+
+ rspamd_snprintf (portbuf, sizeof (portbuf), "%d", (int)port);
+ if ((r = getaddrinfo (credits, portbuf, &hints, &res)) == 0) {
+ r = make_inet_socket (type, res, is_server, async, &result);
+ freeaddrinfo (res);
+ if (r == -1) {
+ goto err;
+ }
+ }
+ else {
+ msg_err ("address resolution for %s failed: %s", credits, gai_strerror (r));
+ goto err;
+ }
+ }
+ cur ++;
+ }
+
+ g_strfreev (strv);
+ return result;
+
+err:
+ g_strfreev (strv);
+ serrno = errno;
+ rcur = result;
+ while (rcur != NULL) {
+ fd = GPOINTER_TO_INT (rcur->data);
+ if (fd != -1) {
+ close (fd);
+ }
+ }
+ if (result != NULL) {
+ g_list_free (result);
+ }
+
+ errno = serrno;
+ return NULL;
+}
+
gint
make_socketpair (gint pair[2])
{
diff --git a/src/util.h b/src/util.h
index b1e07c538..5fa2eaff9 100644
--- a/src/util.h
+++ b/src/util.h
@@ -32,7 +32,7 @@ gint accept_from_socket (gint listen_sock, struct sockaddr *addr, socklen_t *len
gint make_unix_socket (const gchar *, struct sockaddr_un *, gint type, gboolean is_server, gboolean async);
/**
- * Make universal stream socket
+ * Make a universal socket
* @param credits host, ip or path to unix socket
* @param port port (used for network sockets)
* @param type type of socket (SO_STREAM or SO_DGRAM)
@@ -43,6 +43,17 @@ gint make_unix_socket (const gchar *, struct sockaddr_un *, gint type, gboolean
gint make_universal_socket (const gchar *credits, guint16 port, gint type,
gboolean async, gboolean is_server, gboolean try_resolve);
+/**
+ * Make a universal sockets
+ * @param credits host, ip or path to unix socket (several items may be separated by ',')
+ * @param port port (used for network sockets)
+ * @param type type of socket (SO_STREAM or SO_DGRAM)
+ * @param async make this socket asynced
+ * @param is_server make this socket as server socket
+ * @param try_resolve try name resolution for a socket (BLOCKING)
+ */
+GList* make_universal_sockets_list (const gchar *credits, guint16 port, gint type,
+ gboolean async, gboolean is_server, gboolean try_resolve);
/*
* Create socketpair
*/
diff --git a/src/webui.c b/src/webui.c
index 0260632fb..eb9cf8ba8 100644
--- a/src/webui.c
+++ b/src/webui.c
@@ -103,7 +103,8 @@ worker_t webui_worker = {
TRUE, /* Has socket */
TRUE, /* Non unique */
FALSE, /* Non threaded */
- TRUE /* Killable */
+ TRUE, /* Killable */
+ SOCK_STREAM /* TCP socket */
};
#if defined(LIBEVENT_EVHTTP) || (defined(_EVENT_NUMERIC_VERSION) && (_EVENT_NUMERIC_VERSION > 0x02010000))
@@ -181,7 +182,6 @@ sigusr2_handler (gint fd, short what, void *arg)
tv.tv_usec = 0;
event_del (&worker->sig_ev_usr1);
event_del (&worker->sig_ev_usr2);
- event_del (&worker->bind_ev);
msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
event_loopexit (&tv);
}
@@ -1741,6 +1741,7 @@ start_webui_worker (struct rspamd_worker *worker)
{
struct sigaction signals;
struct rspamd_webui_worker_ctx *ctx = worker->ctx;
+ GList *cur;
#ifdef WITH_PROFILER
extern void _start (void), etext (void);
@@ -1773,7 +1774,12 @@ start_webui_worker (struct rspamd_worker *worker)
ctx->worker = worker;
/* Accept event */
ctx->http = evhttp_new (ctx->ev_base);
- evhttp_accept_socket (ctx->http, worker->cf->listen_sock);
+
+ cur = worker->cf->listen_socks;
+ while (cur) {
+ evhttp_accept_socket (ctx->http, GPOINTER_TO_INT (cur->data));
+ cur = g_list_next (cur);
+ }
if (ctx->use_ssl) {
#ifdef HAVE_WEBUI_SSL
diff --git a/src/worker.c b/src/worker.c
index 973395f35..68ef89cb5 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -57,7 +57,8 @@ worker_t normal_worker = {
TRUE, /* Has socket */
FALSE, /* Non unique */
FALSE, /* Non threaded */
- TRUE /* Killable */
+ TRUE, /* Killable */
+ SOCK_STREAM /* TCP socket */
};
#ifndef BUILD_STATIC
@@ -157,7 +158,7 @@ sigusr2_handler (gint fd, short what, void *arg)
tv.tv_usec = 0;
event_del (&worker->sig_ev_usr1);
event_del (&worker->sig_ev_usr2);
- event_del (&worker->bind_ev);
+ worker_stop_accept (worker);
msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
event_loopexit (&tv);
}
@@ -816,25 +817,12 @@ init_worker (void)
void
start_worker (struct rspamd_worker *worker)
{
- struct sigaction signals;
gchar *is_custom_str;
struct rspamd_worker_ctx *ctx = worker->ctx;
GError *err = NULL;
struct lua_locked_state *nL;
-#ifdef WITH_PROFILER
- extern void _start (void), etext (void);
- monstartup ((u_long) & _start, (u_long) & etext);
-#endif
-
- gperf_profiler_init (worker->srv->cfg, "worker");
-
- worker->srv->pid = getpid ();
-
- ctx->ev_base = event_init ();
-
- init_signals (&signals, sig_handler);
- sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
+ ctx->ev_base = prepare_worker (worker, "normal", sig_handler, accept_socket);
/* SIGUSR2 handler */
signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker);
@@ -846,12 +834,6 @@ start_worker (struct rspamd_worker *worker)
event_base_set (ctx->ev_base, &worker->sig_ev_usr1);
signal_add (&worker->sig_ev_usr1, NULL);
- /* Accept event */
- event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST,
- accept_socket, (void *) worker);
- event_base_set (ctx->ev_base, &worker->bind_ev);
- event_add (&worker->bind_ev, NULL);
-
#ifndef BUILD_STATIC
/* Check if this worker is not usual rspamd worker, but uses custom filters from specified path */
diff --git a/src/worker_util.c b/src/worker_util.c
index 541d4f1e4..dd020e7cc 100644
--- a/src/worker_util.c
+++ b/src/worker_util.c
@@ -213,3 +213,66 @@ set_counter (const gchar *name, guint32 value)
return cd->value;
}
+
+struct event_base *
+prepare_worker (struct rspamd_worker *worker, const char *name,
+ rspamd_sig_handler_t sig_handler,
+ void (*accept_handler)(evutil_socket_t, short, void *))
+{
+ struct event_base *ev_base;
+ struct event *accept_event;
+ struct sigaction signals;
+ GList *cur;
+ gint listen_socket;
+
+#ifdef WITH_PROFILER
+ extern void _start (void), etext (void);
+ monstartup ((u_long) & _start, (u_long) & etext);
+#endif
+
+ gperf_profiler_init (worker->srv->cfg, "worker");
+
+ worker->srv->pid = getpid ();
+
+ ev_base = event_init ();
+
+ init_signals (&signals, sig_handler);
+ sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
+
+ /* Accept all sockets */
+ cur = worker->cf->listen_socks;
+ while (cur) {
+ listen_socket = GPOINTER_TO_INT (cur->data);
+ if (listen_socket != -1) {
+ accept_event = g_slice_alloc0 (sizeof (struct event));
+ event_set (accept_event, listen_socket, EV_READ | EV_PERSIST,
+ accept_handler, worker);
+ event_base_set (ev_base, accept_event);
+ event_add (accept_event, NULL);
+ worker->accept_events = g_list_prepend (worker->accept_events, accept_event);
+ }
+ cur = g_list_next (cur);
+ }
+
+ return ev_base;
+}
+
+void
+worker_stop_accept (struct rspamd_worker *worker)
+{
+ GList *cur;
+ struct event *event;
+
+ /* Remove all events */
+ cur = worker->accept_events;
+ while (cur) {
+ event = cur->data;
+ event_del (event);
+ cur = g_list_next (cur);
+ g_slice_free1 (sizeof (struct event), event);
+ }
+
+ if (worker->accept_events != NULL) {
+ g_list_free (worker->accept_events);
+ }
+}