diff options
-rw-r--r-- | CMakeLists.txt | 4 | ||||
-rw-r--r-- | config.h.in | 1 | ||||
-rw-r--r-- | src/cfg_file.h | 2 | ||||
-rw-r--r-- | src/controller.c | 22 | ||||
-rw-r--r-- | src/fuzzy_storage.c | 35 | ||||
-rw-r--r-- | src/kvstorage_server.c | 3 | ||||
-rw-r--r-- | src/lua_worker.c | 20 | ||||
-rw-r--r-- | src/main.c | 39 | ||||
-rw-r--r-- | src/main.h | 27 | ||||
-rw-r--r-- | src/smtp.c | 18 | ||||
-rw-r--r-- | src/smtp_proxy.c | 18 | ||||
-rw-r--r-- | src/util.c | 135 | ||||
-rw-r--r-- | src/util.h | 13 | ||||
-rw-r--r-- | src/webui.c | 12 | ||||
-rw-r--r-- | src/worker.c | 26 | ||||
-rw-r--r-- | src/worker_util.c | 63 |
16 files changed, 292 insertions, 146 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index a06264774..c520889ef 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1018,8 +1018,6 @@ INCLUDE_DIRECTORIES("${CMAKE_SOURCE_DIR}/src" "${CMAKE_BINARY_DIR}/src") SET(RSPAMDSRC src/modules.c src/controller.c src/fuzzy_storage.c - src/kvstorage_server.c - src/lmtp.c src/lua_worker.c src/main.c src/map.c @@ -1036,7 +1034,7 @@ SET(PLUGINSSRC src/plugins/surbl.c src/plugins/dkim_check.c) SET(MODULES_LIST surbl regexp chartable fuzzy_check spf dkim) -SET(WORKERS_LIST normal controller smtp smtp_proxy lmtp fuzzy keystorage lua webui) +SET(WORKERS_LIST normal controller smtp smtp_proxy fuzzy lua webui) AddModules(MODULES_LIST WORKERS_LIST) diff --git a/config.h.in b/config.h.in index e930e520f..69df33469 100644 --- a/config.h.in +++ b/config.h.in @@ -467,6 +467,7 @@ typedef struct worker_s { gboolean unique; gboolean threaded; gboolean killable; + gint listen_type; } worker_t; extern module_t *modules[]; diff --git a/src/cfg_file.h b/src/cfg_file.h index 3b7059786..829b4b453 100644 --- a/src/cfg_file.h +++ b/src/cfg_file.h @@ -257,7 +257,7 @@ struct worker_conf { guint16 bind_port; /**< bind port in case of TCP socket */ guint16 bind_family; /**< bind type (AF_UNIX or AF_INET) */ guint16 count; /**< number of workers */ - gint listen_sock; /**< listening socket desctiptor */ + GList *listen_socks; /**< listening sockets desctiptors */ guint32 rlimit_nofile; /**< max files limit */ guint32 rlimit_maxcore; /**< maximum core file size */ GHashTable *params; /**< params for worker */ diff --git a/src/controller.c b/src/controller.c index 45c026425..b712895d2 100644 --- a/src/controller.c +++ b/src/controller.c @@ -60,7 +60,8 @@ worker_t controller_worker = { TRUE, /* Has socket */ FALSE, /* Non unique */ FALSE, /* Non threaded */ - TRUE /* Killable */ + TRUE, /* Killable */ + SOCK_STREAM /* TCP socket */ }; enum command_type { @@ -171,7 +172,7 @@ sigusr2_handler (gint fd, short what, void *arg) tv.tv_usec = 0; event_del (&worker->sig_ev_usr1); event_del (&worker->sig_ev_usr2); - event_del (&worker->bind_ev); + worker_stop_accept (worker); msg_info ("controller's shutdown is pending in %d sec", 2); event_loopexit (&tv); return; @@ -1886,22 +1887,15 @@ init_controller (void) void start_controller (struct rspamd_worker *worker) { - struct sigaction signals; gchar *hostbuf; gsize hostmax; - struct rspamd_controller_ctx *ctx; + struct rspamd_controller_ctx *ctx = worker->ctx; GError *err = NULL; struct timeval tv; - worker->srv->pid = getpid (); - ctx = worker->ctx; - - ctx->ev_base = event_init (); + ctx->ev_base = prepare_worker (worker, "controller", sig_handler, accept_socket); g_mime_init (0); - init_signals (&signals, sig_handler); - sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); - /* SIGUSR2 handler */ signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker); event_base_set (ctx->ev_base, &worker->sig_ev_usr2); @@ -1949,16 +1943,10 @@ start_controller (struct rspamd_worker *worker) gethostname (hostbuf, hostmax); hostbuf[hostmax - 1] = '\0'; rspamd_snprintf (greetingbuf, sizeof (greetingbuf), "Rspamd version %s is running on %s" CRLF, RVERSION, hostbuf); - /* Accept event */ - event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); - event_base_set (ctx->ev_base, &worker->bind_ev); - event_add (&worker->bind_ev, NULL); start_map_watch (worker->srv->cfg, ctx->ev_base); ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg); - gperf_profiler_init (worker->srv->cfg, "controller"); - event_base_loop (ctx->ev_base, 0); close_log (worker->srv->logger); diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index c5ceb1b1c..7bd950014 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -71,10 +71,11 @@ worker_t fuzzy_worker = { "fuzzy", /* Name */ init_fuzzy, /* Init function */ start_fuzzy, /* Start function */ - FALSE, /* No socket */ + TRUE, /* No socket */ TRUE, /* Unique */ TRUE, /* Threaded */ - FALSE /* Non killable */ + FALSE, /* Non killable */ + SOCK_DGRAM /* UDP socket */ }; static GQueue *hashes[BUCKETS]; @@ -364,8 +365,6 @@ sigterm_handler (gint fd, short what, void *arg) ctx = worker->ctx; event_del (&worker->sig_ev_usr1); event_del (&worker->sig_ev_usr2); - event_del (&worker->bind_ev); - close (worker->cf->listen_sock); rspamd_mutex_lock (ctx->update_mtx); mods = ctx->max_mods + 1; @@ -392,8 +391,7 @@ sigusr2_handler (gint fd, short what, void *arg) tv.tv_usec = 0; event_del (&worker->sig_ev_usr1); event_del (&worker->sig_ev_usr2); - event_del (&worker->bind_ev); - close (worker->cf->listen_sock); + worker_stop_accept (worker); msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); rspamd_mutex_lock (ctx->update_mtx); mods = ctx->max_mods + 1; @@ -1025,21 +1023,14 @@ init_fuzzy (void) void start_fuzzy (struct rspamd_worker *worker) { - struct sigaction signals; struct event sev; - gint retries = 0; struct rspamd_fuzzy_storage_ctx *ctx = worker->ctx; GError *err = NULL; - worker->srv->pid = getpid (); - - ctx->ev_base = event_init (); + ctx->ev_base = prepare_worker (worker, "controller", sig_handler, accept_fuzzy_socket); server_stat = worker->srv->stat; - init_signals (&signals, sig_handler); - sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); - /* SIGUSR2 handler */ signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker); event_base_set (ctx->ev_base, &worker->sig_ev_usr2); @@ -1054,16 +1045,6 @@ start_fuzzy (struct rspamd_worker *worker) event_base_set (ctx->ev_base, &sev); signal_add (&sev, NULL); - /* Listen event */ - while ((worker->cf->listen_sock = - make_universal_socket (worker->cf->bind_addr, worker->cf->bind_port, SOCK_DGRAM, TRUE, TRUE, FALSE)) == -1) { - sleep (1); - if (++retries > MAX_RETRIES) { - msg_err ("cannot bind to socket, exiting"); - exit (0); - } - } - /* Init bloom filter */ bf = bloom_create (20000000L, DEFAULT_BLOOM_HASHES); /* Try to read hashes from file */ @@ -1078,10 +1059,6 @@ start_fuzzy (struct rspamd_worker *worker) tmv.tv_usec = 0; evtimer_add (&tev, &tmv); - event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_fuzzy_socket, (void *)worker); - event_base_set (ctx->ev_base, &worker->bind_ev); - event_add (&worker->bind_ev, NULL); - /* Create radix tree */ if (ctx->update_map != NULL) { if (!add_map (worker->srv->cfg, ctx->update_map, "Allow fuzzy updates from specified addresses", @@ -1095,8 +1072,6 @@ start_fuzzy (struct rspamd_worker *worker) /* Maps events */ start_map_watch (worker->srv->cfg, ctx->ev_base); - gperf_profiler_init (worker->srv->cfg, "fuzzy"); - ctx->update_thread = rspamd_create_thread ("fuzzy update", sync_cache, worker, &err); if (ctx->update_thread == NULL) { msg_err ("error creating update thread: %s", err->message); diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c index 92090d4f6..b493eee46 100644 --- a/src/kvstorage_server.c +++ b/src/kvstorage_server.c @@ -74,7 +74,8 @@ worker_t keystorage_worker = { TRUE, /* Has socket */ FALSE, /* Non unique */ TRUE, /* Non threaded */ - FALSE /* Non killable */ + FALSE, /* Non killable */ + SOCK_STREAM /* TCP socket */ }; #ifndef HAVE_SA_SIGINFO diff --git a/src/lua_worker.c b/src/lua_worker.c index 0cadb7eb2..6262af85b 100644 --- a/src/lua_worker.c +++ b/src/lua_worker.c @@ -53,7 +53,8 @@ worker_t lua_worker = { TRUE, /* Has socket */ FALSE, /* Non unique */ FALSE, /* Non threaded */ - TRUE /* Killable */ + TRUE, /* Killable */ + SOCK_STREAM /* TCP socket */ }; /* @@ -302,7 +303,7 @@ sigusr2_handler (gint fd, short what, void *arg) tv.tv_usec = 0; event_del (&worker->sig_ev_usr1); event_del (&worker->sig_ev_usr2); - event_del (&worker->bind_ev); + worker_stop_accept (worker); msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); event_loopexit (&tv); } @@ -439,7 +440,6 @@ init_lua_worker (void) void start_lua_worker (struct rspamd_worker *worker) { - struct sigaction signals; struct rspamd_lua_worker_ctx *ctx = worker->ctx, **pctx; lua_State *L; @@ -448,18 +448,12 @@ start_lua_worker (struct rspamd_worker *worker) monstartup ((u_long) & _start, (u_long) & etext); #endif - gperf_profiler_init (worker->srv->cfg, "lua_worker"); - - worker->srv->pid = getpid (); - - ctx->ev_base = event_init (); + ctx->ev_base = prepare_worker (worker, "lua_worker", sig_handler, lua_accept_socket); L = worker->srv->cfg->lua_state; ctx->L = L; ctx->cfg = worker->srv->cfg; - init_signals (&signals, sig_handler); - sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); /* SIGUSR2 handler */ signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker); @@ -471,12 +465,6 @@ start_lua_worker (struct rspamd_worker *worker) event_base_set (ctx->ev_base, &worker->sig_ev_usr1); signal_add (&worker->sig_ev_usr1, NULL); - /* Accept event */ - event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, - lua_accept_socket, (void *) worker); - event_base_set (ctx->ev_base, &worker->bind_ev); - event_add (&worker->bind_ev, NULL); - ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg); /* Open worker's lib */ diff --git a/src/main.c b/src/main.c index 9b5a12108..607685720 100644 --- a/src/main.c +++ b/src/main.c @@ -550,20 +550,26 @@ dump_cfg_vars (struct config_file *cfg) g_hash_table_foreach (cfg->variables, dump_all_variables, NULL); } -static gint -create_listen_socket (const gchar *addr, gint port, gint family) +static GList * +create_listen_socket (const gchar *addr, gint port, gint family, gint listen_type) { gint listen_sock = -1; - /* Create listen socket */ - listen_sock = make_universal_socket (addr, port, SOCK_STREAM, TRUE, TRUE, TRUE); - - if (listen_sock != -1) { - if (listen (listen_sock, -1) == -1) { - msg_err ("cannot listen on socket. %s", strerror (errno)); + GList *result, *cur; + /* Create listen sockets */ + result = make_universal_sockets_list (addr, port, listen_type, TRUE, TRUE, TRUE); + + cur = result; + while (cur != NULL) { + listen_sock = GPOINTER_TO_INT (cur->data); + if (listen_sock != -1) { + if (listen (listen_sock, -1) == -1) { + msg_err ("cannot listen on socket. %s", strerror (errno)); + } } + cur = g_list_next (cur); } - return listen_sock; + return result; } static void @@ -596,9 +602,9 @@ make_listen_key (const gchar *addr, gint port, gint family) static void spawn_workers (struct rspamd_main *rspamd) { - GList *cur; + GList *cur, *ls; struct worker_conf *cf; - gint i, listen_sock; + gint i; gpointer p; cur = rspamd->cfg->workers; @@ -614,19 +620,20 @@ spawn_workers (struct rspamd_main *rspamd) if ((p = g_hash_table_lookup (listen_sockets, GINT_TO_POINTER ( make_listen_key (cf->bind_addr, cf->bind_port, cf->bind_family)))) == NULL) { /* Create listen socket */ - listen_sock = create_listen_socket (cf->bind_addr, cf->bind_port, cf->bind_family); - if (listen_sock == -1) { + ls = create_listen_socket (cf->bind_addr, cf->bind_port, cf->bind_family, + cf->worker->listen_type); + if (ls == NULL) { exit (-errno); } g_hash_table_insert (listen_sockets, GINT_TO_POINTER ( make_listen_key (cf->bind_addr, cf->bind_port, cf->bind_family)), - GINT_TO_POINTER (listen_sock)); + ls); } else { /* We had socket for this type of worker */ - listen_sock = GPOINTER_TO_INT (p); + ls = p; } - cf->listen_sock = listen_sock; + cf->listen_socks = ls; } if (cf->worker->unique) { diff --git a/src/main.h b/src/main.h index 521e0740a..052c6ce6d 100644 --- a/src/main.h +++ b/src/main.h @@ -53,7 +53,7 @@ struct rspamd_worker { GQuark type; /**< process type */ struct event sig_ev_usr1; /**< signals event */ struct event sig_ev_usr2; /**< signals event */ - struct event bind_ev; /**< socket events */ + GList *accept_events; /**< socket events */ struct worker_conf *cf; /**< worker config data */ gpointer ctx; /**< worker's specific data */ }; @@ -321,6 +321,31 @@ void free_task_soft (gpointer ud); */ double set_counter (const gchar *name, guint32 value); +#ifndef HAVE_SA_SIGINFO +typedef void (*rspamd_sig_handler_t) (gint); +#else +typedef void (*rspamd_sig_handler_t) (gint, siginfo_t *, void *); +#endif + +/** + * Prepare worker's startup + * @param worker worker structure + * @param name name of the worker + * @param sig_handler handler of main signals + * @param accept_handler handler of accept event for listen sockets + * @return event base suitable for a worker + */ +struct event_base * +prepare_worker (struct rspamd_worker *worker, const char *name, + rspamd_sig_handler_t sig_handler, + void (*accept_handler)(evutil_socket_t, short, void *)); + +/** + * Stop accepting new connections for a worker + * @param worker + */ +void worker_stop_accept (struct rspamd_worker *worker); + #endif /* diff --git a/src/smtp.c b/src/smtp.c index 075e1b663..f4033c954 100644 --- a/src/smtp.c +++ b/src/smtp.c @@ -61,7 +61,8 @@ worker_t smtp_worker = { TRUE, /* Has socket */ FALSE, /* Non unique */ FALSE, /* Non threaded */ - TRUE /* Killable */ + TRUE, /* Killable */ + SOCK_STREAM /* TCP socket */ }; #ifndef HAVE_SA_SIGINFO @@ -105,7 +106,7 @@ sigusr2_handler (gint fd, short what, void *arg) tv.tv_usec = 0; event_del (&worker->sig_ev_usr1); event_del (&worker->sig_ev_usr2); - event_del (&worker->bind_ev); + worker_stop_accept (worker); msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); event_loopexit (&tv); } @@ -944,13 +945,10 @@ config_smtp_worker (struct rspamd_worker *worker) void start_smtp (struct rspamd_worker *worker) { - struct sigaction signals; struct smtp_worker_ctx *ctx = worker->ctx; - gperf_profiler_init (worker->srv->cfg, "worker"); + ctx->ev_base = prepare_worker (worker, "smtp_worker", sig_handler, accept_socket); - worker->srv->pid = getpid (); - ctx->ev_base = event_init (); /* Set smtp options */ if ( !config_smtp_worker (worker)) { @@ -958,9 +956,6 @@ start_smtp (struct rspamd_worker *worker) exit (EXIT_SUCCESS); } - init_signals (&signals, sig_handler); - sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); - /* SIGUSR2 handler */ signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker); event_base_set (ctx->ev_base, &worker->sig_ev_usr2); @@ -971,11 +966,6 @@ start_smtp (struct rspamd_worker *worker) event_base_set (ctx->ev_base, &worker->sig_ev_usr1); signal_add (&worker->sig_ev_usr1, NULL); - /* Accept event */ - event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); - event_base_set (ctx->ev_base, &worker->bind_ev); - event_add (&worker->bind_ev, NULL); - /* Maps events */ start_map_watch (worker->srv->cfg, ctx->ev_base); diff --git a/src/smtp_proxy.c b/src/smtp_proxy.c index e90be31fe..d3bcd3866 100644 --- a/src/smtp_proxy.c +++ b/src/smtp_proxy.c @@ -62,7 +62,8 @@ worker_t smtp_proxy_worker = { TRUE, /* Has socket */ FALSE, /* Non unique */ FALSE, /* Non threaded */ - TRUE /* Killable */ + TRUE, /* Killable */ + SOCK_STREAM /* TCP socket */ }; struct smtp_proxy_ctx { @@ -179,7 +180,7 @@ sigusr2_handler (gint fd, short what, void *arg) tv.tv_usec = 0; event_del (&worker->sig_ev_usr1); event_del (&worker->sig_ev_usr2); - event_del (&worker->bind_ev); + worker_stop_accept (worker); msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); event_loopexit (&tv); } @@ -1021,13 +1022,9 @@ config_smtp_proxy_worker (struct rspamd_worker *worker) void start_smtp_proxy (struct rspamd_worker *worker) { - struct sigaction signals; struct smtp_proxy_ctx *ctx = worker->ctx; - gperf_profiler_init (worker->srv->cfg, "worker"); - - worker->srv->pid = getpid (); - ctx->ev_base = event_init (); + ctx->ev_base = prepare_worker (worker, "smtp_proxy", sig_handler, accept_socket); /* Set smtp options */ if ( !config_smtp_proxy_worker (worker)) { @@ -1035,8 +1032,6 @@ start_smtp_proxy (struct rspamd_worker *worker) exit (EXIT_SUCCESS); } - init_signals (&signals, sig_handler); - sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); /* SIGUSR2 handler */ signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker); @@ -1048,11 +1043,6 @@ start_smtp_proxy (struct rspamd_worker *worker) event_base_set (ctx->ev_base, &worker->sig_ev_usr1); signal_add (&worker->sig_ev_usr1, NULL); - /* Accept event */ - event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); - event_base_set (ctx->ev_base, &worker->bind_ev); - event_add (&worker->bind_ev, NULL); - /* DNS resolver */ ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg); diff --git a/src/util.c b/src/util.c index 441ce007a..4936b37dc 100644 --- a/src/util.c +++ b/src/util.c @@ -85,7 +85,7 @@ poll_sync_socket (gint fd, gint timeout, short events) } static gint -make_inet_socket (gint type, struct addrinfo *addr, gboolean is_server, gboolean async) +make_inet_socket (gint type, struct addrinfo *addr, gboolean is_server, gboolean async, GList **list) { gint fd, r, optlen, on = 1, s_error; struct addrinfo *cur; @@ -146,7 +146,13 @@ make_inet_socket (gint type, struct addrinfo *addr, gboolean is_server, gboolean goto out; } } - break; + if (list == NULL) { + /* Go out immediately */ + break; + } + else if (fd != -1) { + *list = g_list_prepend (*list, GINT_TO_POINTER (fd)); + } out: if (fd != -1) { close (fd); @@ -160,13 +166,13 @@ out: gint make_tcp_socket (struct addrinfo *addr, gboolean is_server, gboolean async) { - return make_inet_socket (SOCK_STREAM, addr, is_server, async); + return make_inet_socket (SOCK_STREAM, addr, is_server, async, NULL); } gint make_udp_socket (struct addrinfo *addr, gboolean is_server, gboolean async) { - return make_inet_socket (SOCK_DGRAM, addr, is_server, async); + return make_inet_socket (SOCK_DGRAM, addr, is_server, async, NULL); } gint @@ -284,7 +290,7 @@ make_unix_socket (const gchar *path, struct sockaddr_un *addr, gint type, gboole } /** - * Make universal stream socket + * Make a universal socket * @param credits host, ip or path to unix socket * @param port port (used for network sockets) * @param async make this socket asynced @@ -292,7 +298,8 @@ make_unix_socket (const gchar *path, struct sockaddr_un *addr, gint type, gboole * @param try_resolve try name resolution for a socket (BLOCKING) */ gint -make_universal_socket (const gchar *credits, guint16 port, gint type, gboolean async, gboolean is_server, gboolean try_resolve) +make_universal_socket (const gchar *credits, guint16 port, + gint type, gboolean async, gboolean is_server, gboolean try_resolve) { struct sockaddr_un un; struct stat st; @@ -347,7 +354,7 @@ make_universal_socket (const gchar *credits, guint16 port, gint type, gboolean a rspamd_snprintf (portbuf, sizeof (portbuf), "%d", (int)port); if ((r = getaddrinfo (credits, portbuf, &hints, &res)) == 0) { - r = make_inet_socket (type, res, is_server, async); + r = make_inet_socket (type, res, is_server, async, NULL); freeaddrinfo (res); return r; } @@ -358,6 +365,120 @@ make_universal_socket (const gchar *credits, guint16 port, gint type, gboolean a } } +/** + * Make universal stream socket + * @param credits host, ip or path to unix socket + * @param port port (used for network sockets) + * @param async make this socket asynced + * @param is_server make this socket as server socket + * @param try_resolve try name resolution for a socket (BLOCKING) + */ +GList* +make_universal_sockets_list (const gchar *credits, guint16 port, + gint type, gboolean async, gboolean is_server, gboolean try_resolve) +{ + struct sockaddr_un un; + struct stat st; + struct addrinfo hints, *res; + gint r, fd, serrno; + gchar portbuf[8], **strv, **cur; + GList *result = NULL, *rcur; + + strv = g_strsplit_set (credits, ",", -1); + if (strv == NULL) { + msg_err ("invalid sockets credentials: %s", credits); + return NULL; + } + cur = strv; + while (*cur != NULL) { + if (*credits == '/') { + r = stat (credits, &st); + if (is_server) { + if (r == -1) { + fd = make_unix_socket (credits, &un, type, is_server, async); + } + else { + /* Unix socket exists, it must be unlinked first */ + errno = EEXIST; + goto err; + } + } + else { + if (r == -1) { + /* Unix socket doesn't exists it must be created first */ + errno = ENOENT; + goto err; + } + else { + if ((st.st_mode & S_IFSOCK) == 0) { + /* Path is not valid socket */ + errno = EINVAL; + goto err; + } + else { + fd = make_unix_socket (credits, &un, type, is_server, async); + } + } + } + if (fd != -1) { + result = g_list_prepend (result, GINT_TO_POINTER (fd)); + } + else { + goto err; + } + } + else { + /* TCP related part */ + memset (&hints, 0, sizeof (hints)); + hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ + hints.ai_socktype = type; /* Type of the socket */ + hints.ai_flags = is_server ? AI_PASSIVE : 0; + hints.ai_protocol = 0; /* Any protocol */ + hints.ai_canonname = NULL; + hints.ai_addr = NULL; + hints.ai_next = NULL; + + if (!try_resolve) { + hints.ai_flags |= AI_NUMERICHOST | AI_NUMERICSERV; + } + + rspamd_snprintf (portbuf, sizeof (portbuf), "%d", (int)port); + if ((r = getaddrinfo (credits, portbuf, &hints, &res)) == 0) { + r = make_inet_socket (type, res, is_server, async, &result); + freeaddrinfo (res); + if (r == -1) { + goto err; + } + } + else { + msg_err ("address resolution for %s failed: %s", credits, gai_strerror (r)); + goto err; + } + } + cur ++; + } + + g_strfreev (strv); + return result; + +err: + g_strfreev (strv); + serrno = errno; + rcur = result; + while (rcur != NULL) { + fd = GPOINTER_TO_INT (rcur->data); + if (fd != -1) { + close (fd); + } + } + if (result != NULL) { + g_list_free (result); + } + + errno = serrno; + return NULL; +} + gint make_socketpair (gint pair[2]) { diff --git a/src/util.h b/src/util.h index b1e07c538..5fa2eaff9 100644 --- a/src/util.h +++ b/src/util.h @@ -32,7 +32,7 @@ gint accept_from_socket (gint listen_sock, struct sockaddr *addr, socklen_t *len gint make_unix_socket (const gchar *, struct sockaddr_un *, gint type, gboolean is_server, gboolean async); /** - * Make universal stream socket + * Make a universal socket * @param credits host, ip or path to unix socket * @param port port (used for network sockets) * @param type type of socket (SO_STREAM or SO_DGRAM) @@ -43,6 +43,17 @@ gint make_unix_socket (const gchar *, struct sockaddr_un *, gint type, gboolean gint make_universal_socket (const gchar *credits, guint16 port, gint type, gboolean async, gboolean is_server, gboolean try_resolve); +/** + * Make a universal sockets + * @param credits host, ip or path to unix socket (several items may be separated by ',') + * @param port port (used for network sockets) + * @param type type of socket (SO_STREAM or SO_DGRAM) + * @param async make this socket asynced + * @param is_server make this socket as server socket + * @param try_resolve try name resolution for a socket (BLOCKING) + */ +GList* make_universal_sockets_list (const gchar *credits, guint16 port, gint type, + gboolean async, gboolean is_server, gboolean try_resolve); /* * Create socketpair */ diff --git a/src/webui.c b/src/webui.c index 0260632fb..eb9cf8ba8 100644 --- a/src/webui.c +++ b/src/webui.c @@ -103,7 +103,8 @@ worker_t webui_worker = { TRUE, /* Has socket */ TRUE, /* Non unique */ FALSE, /* Non threaded */ - TRUE /* Killable */ + TRUE, /* Killable */ + SOCK_STREAM /* TCP socket */ }; #if defined(LIBEVENT_EVHTTP) || (defined(_EVENT_NUMERIC_VERSION) && (_EVENT_NUMERIC_VERSION > 0x02010000)) @@ -181,7 +182,6 @@ sigusr2_handler (gint fd, short what, void *arg) tv.tv_usec = 0; event_del (&worker->sig_ev_usr1); event_del (&worker->sig_ev_usr2); - event_del (&worker->bind_ev); msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); event_loopexit (&tv); } @@ -1741,6 +1741,7 @@ start_webui_worker (struct rspamd_worker *worker) { struct sigaction signals; struct rspamd_webui_worker_ctx *ctx = worker->ctx; + GList *cur; #ifdef WITH_PROFILER extern void _start (void), etext (void); @@ -1773,7 +1774,12 @@ start_webui_worker (struct rspamd_worker *worker) ctx->worker = worker; /* Accept event */ ctx->http = evhttp_new (ctx->ev_base); - evhttp_accept_socket (ctx->http, worker->cf->listen_sock); + + cur = worker->cf->listen_socks; + while (cur) { + evhttp_accept_socket (ctx->http, GPOINTER_TO_INT (cur->data)); + cur = g_list_next (cur); + } if (ctx->use_ssl) { #ifdef HAVE_WEBUI_SSL diff --git a/src/worker.c b/src/worker.c index 973395f35..68ef89cb5 100644 --- a/src/worker.c +++ b/src/worker.c @@ -57,7 +57,8 @@ worker_t normal_worker = { TRUE, /* Has socket */ FALSE, /* Non unique */ FALSE, /* Non threaded */ - TRUE /* Killable */ + TRUE, /* Killable */ + SOCK_STREAM /* TCP socket */ }; #ifndef BUILD_STATIC @@ -157,7 +158,7 @@ sigusr2_handler (gint fd, short what, void *arg) tv.tv_usec = 0; event_del (&worker->sig_ev_usr1); event_del (&worker->sig_ev_usr2); - event_del (&worker->bind_ev); + worker_stop_accept (worker); msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); event_loopexit (&tv); } @@ -816,25 +817,12 @@ init_worker (void) void start_worker (struct rspamd_worker *worker) { - struct sigaction signals; gchar *is_custom_str; struct rspamd_worker_ctx *ctx = worker->ctx; GError *err = NULL; struct lua_locked_state *nL; -#ifdef WITH_PROFILER - extern void _start (void), etext (void); - monstartup ((u_long) & _start, (u_long) & etext); -#endif - - gperf_profiler_init (worker->srv->cfg, "worker"); - - worker->srv->pid = getpid (); - - ctx->ev_base = event_init (); - - init_signals (&signals, sig_handler); - sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); + ctx->ev_base = prepare_worker (worker, "normal", sig_handler, accept_socket); /* SIGUSR2 handler */ signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker); @@ -846,12 +834,6 @@ start_worker (struct rspamd_worker *worker) event_base_set (ctx->ev_base, &worker->sig_ev_usr1); signal_add (&worker->sig_ev_usr1, NULL); - /* Accept event */ - event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, - accept_socket, (void *) worker); - event_base_set (ctx->ev_base, &worker->bind_ev); - event_add (&worker->bind_ev, NULL); - #ifndef BUILD_STATIC /* Check if this worker is not usual rspamd worker, but uses custom filters from specified path */ diff --git a/src/worker_util.c b/src/worker_util.c index 541d4f1e4..dd020e7cc 100644 --- a/src/worker_util.c +++ b/src/worker_util.c @@ -213,3 +213,66 @@ set_counter (const gchar *name, guint32 value) return cd->value; } + +struct event_base * +prepare_worker (struct rspamd_worker *worker, const char *name, + rspamd_sig_handler_t sig_handler, + void (*accept_handler)(evutil_socket_t, short, void *)) +{ + struct event_base *ev_base; + struct event *accept_event; + struct sigaction signals; + GList *cur; + gint listen_socket; + +#ifdef WITH_PROFILER + extern void _start (void), etext (void); + monstartup ((u_long) & _start, (u_long) & etext); +#endif + + gperf_profiler_init (worker->srv->cfg, "worker"); + + worker->srv->pid = getpid (); + + ev_base = event_init (); + + init_signals (&signals, sig_handler); + sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); + + /* Accept all sockets */ + cur = worker->cf->listen_socks; + while (cur) { + listen_socket = GPOINTER_TO_INT (cur->data); + if (listen_socket != -1) { + accept_event = g_slice_alloc0 (sizeof (struct event)); + event_set (accept_event, listen_socket, EV_READ | EV_PERSIST, + accept_handler, worker); + event_base_set (ev_base, accept_event); + event_add (accept_event, NULL); + worker->accept_events = g_list_prepend (worker->accept_events, accept_event); + } + cur = g_list_next (cur); + } + + return ev_base; +} + +void +worker_stop_accept (struct rspamd_worker *worker) +{ + GList *cur; + struct event *event; + + /* Remove all events */ + cur = worker->accept_events; + while (cur) { + event = cur->data; + event_del (event); + cur = g_list_next (cur); + g_slice_free1 (sizeof (struct event), event); + } + + if (worker->accept_events != NULL) { + g_list_free (worker->accept_events); + } +} |