From 8fd0795feffdde4dd3a9f2fbe8c6ba517be370bf Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Sat, 14 May 2016 12:39:56 +0100 Subject: [PATCH] [Feature] Rework listening system to allow multiple socket types per worker --- src/controller.c | 2 +- src/fuzzy_storage.c | 12 +++++++----- src/hs_helper.c | 2 +- src/libserver/worker_util.c | 11 +++++++---- src/log_helper.c | 4 ++-- src/lua_worker.c | 10 +++++----- src/rspamd.c | 34 ++++++++++++++++++++++++++-------- src/rspamd.h | 14 +++++++++++++- src/rspamd_proxy.c | 2 +- src/smtp_proxy.c | 2 +- src/worker.c | 2 +- 11 files changed, 65 insertions(+), 30 deletions(-) diff --git a/src/controller.c b/src/controller.c index 8b16eda19..61a84ca7e 100644 --- a/src/controller.c +++ b/src/controller.c @@ -109,7 +109,7 @@ worker_t controller_worker = { init_controller_worker, /* Init function */ start_controller_worker, /* Start function */ RSPAMD_WORKER_HAS_SOCKET | RSPAMD_WORKER_KILLABLE, - SOCK_STREAM, /* TCP socket */ + RSPAMD_WORKER_SOCKET_TCP, /* TCP socket */ RSPAMD_WORKER_VER /* Version info */ }; /* diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index 7803307b0..a1881aff3 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -53,7 +53,7 @@ worker_t fuzzy_worker = { init_fuzzy, /* Init function */ start_fuzzy, /* Start function */ RSPAMD_WORKER_HAS_SOCKET, - SOCK_DGRAM, /* UDP socket */ + RSPAMD_WORKER_SOCKET_UDP, /* UDP socket */ RSPAMD_WORKER_VER /* Version info */ }; @@ -1351,7 +1351,7 @@ fuzzy_peer_rep (struct rspamd_worker *worker, { struct rspamd_fuzzy_storage_ctx *ctx = ud; GList *cur; - gint listen_socket; + struct rspamd_worker_listen_socket *ls; struct event *accept_event; gdouble next_check; @@ -1368,16 +1368,18 @@ fuzzy_peer_rep (struct rspamd_worker *worker, /* Start listening */ cur = worker->cf->listen_socks; while (cur) { - listen_socket = GPOINTER_TO_INT (cur->data); - if (listen_socket != -1) { + ls = cur->data; + + if (ls->fd != -1) { accept_event = g_slice_alloc0 (sizeof (struct event)); - event_set (accept_event, listen_socket, EV_READ | EV_PERSIST, + event_set (accept_event, ls->fd, EV_READ | EV_PERSIST, accept_fuzzy_socket, worker); event_base_set (ctx->ev_base, accept_event); event_add (accept_event, NULL); worker->accept_events = g_list_prepend (worker->accept_events, accept_event); } + cur = g_list_next (cur); } diff --git a/src/hs_helper.c b/src/hs_helper.c index 1292d6460..127cf2785 100644 --- a/src/hs_helper.c +++ b/src/hs_helper.c @@ -33,7 +33,7 @@ worker_t hs_helper_worker = { init_hs_helper, /* Init function */ start_hs_helper, /* Start function */ RSPAMD_WORKER_UNIQUE|RSPAMD_WORKER_KILLABLE|RSPAMD_WORKER_ALWAYS_START, - SOCK_STREAM, /* TCP socket */ + RSPAMD_WORKER_SOCKET_NONE, /* No socket */ RSPAMD_WORKER_VER /* Version info */ }; diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index 740c4cd7a..d3e296d6d 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -250,7 +250,7 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name, struct event_base *ev_base; struct event *accept_event; GList *cur; - gint listen_socket; + struct rspamd_worker_listen_socket *ls; #ifdef WITH_PROFILER extern void _start (void), etext (void); @@ -271,17 +271,20 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name, /* Accept all sockets */ if (accept_handler) { cur = worker->cf->listen_socks; + while (cur) { - listen_socket = GPOINTER_TO_INT (cur->data); - if (listen_socket != -1) { + ls = cur->data; + + if (ls->fd != -1) { accept_event = g_slice_alloc0 (sizeof (struct event)); - event_set (accept_event, listen_socket, EV_READ | EV_PERSIST, + event_set (accept_event, ls->fd, EV_READ | EV_PERSIST, accept_handler, worker); event_base_set (ev_base, accept_event); event_add (accept_event, NULL); worker->accept_events = g_list_prepend (worker->accept_events, accept_event); } + cur = g_list_next (cur); } } diff --git a/src/log_helper.c b/src/log_helper.c index d01e102c6..c4cbeb73c 100644 --- a/src/log_helper.c +++ b/src/log_helper.c @@ -37,8 +37,8 @@ worker_t log_helper_worker = { init_log_helper, /* Init function */ start_log_helper, /* Start function */ RSPAMD_WORKER_UNIQUE | RSPAMD_WORKER_KILLABLE, - SOCK_STREAM, /* TCP socket */ - RSPAMD_WORKER_VER /* Version info */ + RSPAMD_WORKER_SOCKET_NONE, /* No socket */ + RSPAMD_WORKER_VER /* Version info */ }; static const guint64 rspamd_log_helper_magic = 0x1090bb46aaa74c9aULL; diff --git a/src/lua_worker.c b/src/lua_worker.c index 87f597e28..df6970efa 100644 --- a/src/lua_worker.c +++ b/src/lua_worker.c @@ -39,12 +39,12 @@ gpointer init_lua_worker (struct rspamd_config *cfg); void start_lua_worker (struct rspamd_worker *worker); worker_t lua_worker = { - "lua", /* Name */ - init_lua_worker, /* Init function */ - start_lua_worker, /* Start function */ + "lua", /* Name */ + init_lua_worker, /* Init function */ + start_lua_worker, /* Start function */ RSPAMD_WORKER_HAS_SOCKET | RSPAMD_WORKER_KILLABLE, - SOCK_STREAM, /* TCP socket */ - RSPAMD_WORKER_VER /* Version info */ + RSPAMD_WORKER_SOCKET_TCP, /* TCP socket */ + RSPAMD_WORKER_VER /* Version info */ }; static const guint64 rspamd_lua_ctx_magic = 0x8055e2652aacf96eULL; diff --git a/src/rspamd.c b/src/rspamd.c index 72e676267..99fa6ca39 100644 --- a/src/rspamd.c +++ b/src/rspamd.c @@ -340,20 +340,38 @@ rspamd_fork_delayed (struct rspamd_worker_conf *cf, } static GList * -create_listen_socket (GPtrArray *addrs, guint cnt, gint listen_type) +create_listen_socket (GPtrArray *addrs, guint cnt, + enum rspamd_worker_socket_type listen_type) { GList *result = NULL; gint fd; guint i; - gpointer p; + struct rspamd_worker_listen_socket *ls; g_ptr_array_sort (addrs, rspamd_inet_address_compare_ptr); for (i = 0; i < cnt; i ++) { - fd = rspamd_inet_address_listen (g_ptr_array_index (addrs, i), - listen_type, TRUE); - if (fd != -1) { - p = GINT_TO_POINTER (fd); - result = g_list_prepend (result, p); + + if (listen_type & RSPAMD_WORKER_SOCKET_TCP) { + fd = rspamd_inet_address_listen (g_ptr_array_index (addrs, i), + SOCK_STREAM, TRUE); + if (fd != -1) { + ls = g_slice_alloc0 (sizeof (*ls)); + ls->addr = g_ptr_array_index (addrs, i); + ls->fd = fd; + ls->type = RSPAMD_WORKER_SOCKET_TCP; + result = g_list_prepend (result, ls); + } + } + if (listen_type & RSPAMD_WORKER_SOCKET_UDP) { + fd = rspamd_inet_address_listen (g_ptr_array_index (addrs, i), + SOCK_DGRAM, TRUE); + if (fd != -1) { + ls = g_slice_alloc0 (sizeof (*ls)); + ls->addr = g_ptr_array_index (addrs, i); + ls->fd = fd; + ls->type = RSPAMD_WORKER_SOCKET_TCP; + result = g_list_prepend (result, ls); + } } } @@ -492,7 +510,7 @@ spawn_workers (struct rspamd_main *rspamd_main, struct event_base *ev_base) key = make_listen_key (bcf); if ((p = g_hash_table_lookup (listen_sockets, - GINT_TO_POINTER (key))) == NULL) { + GINT_TO_POINTER (key))) == NULL) { if (!bcf->is_systemd) { /* Create listen socket */ diff --git a/src/rspamd.h b/src/rspamd.h index bd4ca9684..c0c60185d 100644 --- a/src/rspamd.h +++ b/src/rspamd.h @@ -165,12 +165,24 @@ enum rspamd_worker_flags { RSPAMD_WORKER_ALWAYS_START = (1 << 4), }; +enum rspamd_worker_socket_type { + RSPAMD_WORKER_SOCKET_NONE = 0, + RSPAMD_WORKER_SOCKET_TCP = (1 << 0), + RSPAMD_WORKER_SOCKET_UDP = (1 << 1), +}; + +struct rspamd_worker_listen_socket { + const rspamd_inet_addr_t *addr; + gint fd; + enum rspamd_worker_socket_type type; +}; + typedef struct worker_s { const gchar *name; gpointer (*worker_init_func)(struct rspamd_config *cfg); void (*worker_start_func)(struct rspamd_worker *worker); enum rspamd_worker_flags flags; - gint listen_type; + enum rspamd_worker_socket_type listen_type; guint worker_version; guint64 rspamd_version; const gchar *rspamd_features; diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c index 32a2937e8..247c856f4 100644 --- a/src/rspamd_proxy.c +++ b/src/rspamd_proxy.c @@ -57,7 +57,7 @@ worker_t rspamd_proxy_worker = { init_rspamd_proxy, /* Init function */ start_rspamd_proxy, /* Start function */ RSPAMD_WORKER_HAS_SOCKET | RSPAMD_WORKER_KILLABLE, - SOCK_STREAM, /* TCP socket */ + RSPAMD_WORKER_SOCKET_TCP, /* TCP socket */ RSPAMD_WORKER_VER }; diff --git a/src/smtp_proxy.c b/src/smtp_proxy.c index 0202e3cfe..6b0a4fe2e 100644 --- a/src/smtp_proxy.c +++ b/src/smtp_proxy.c @@ -44,7 +44,7 @@ worker_t smtp_proxy_worker = { init_smtp_proxy, /* Init function */ start_smtp_proxy, /* Start function */ RSPAMD_WORKER_HAS_SOCKET | RSPAMD_WORKER_KILLABLE, - SOCK_STREAM, /* TCP socket */ + RSPAMD_WORKER_SOCKET_TCP, /* TCP socket */ RSPAMD_WORKER_VER /* Version info */ }; diff --git a/src/worker.c b/src/worker.c index 735cbc6a9..d3e3fc4c8 100644 --- a/src/worker.c +++ b/src/worker.c @@ -50,7 +50,7 @@ worker_t normal_worker = { init_worker, /* Init function */ start_worker, /* Start function */ RSPAMD_WORKER_HAS_SOCKET|RSPAMD_WORKER_KILLABLE, - SOCK_STREAM, /* TCP socket */ + RSPAMD_WORKER_SOCKET_TCP, /* TCP socket */ RSPAMD_WORKER_VER /* Version info */ }; -- 2.39.5