]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Rework listening system to allow multiple socket types per worker
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 14 May 2016 11:39:56 +0000 (12:39 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 14 May 2016 11:39:56 +0000 (12:39 +0100)
src/controller.c
src/fuzzy_storage.c
src/hs_helper.c
src/libserver/worker_util.c
src/log_helper.c
src/lua_worker.c
src/rspamd.c
src/rspamd.h
src/rspamd_proxy.c
src/smtp_proxy.c
src/worker.c

index 8b16eda19302f1cb26bc1356e19b6865b5335fda..61a84ca7efa963d9349122ee6895319f67ec247b 100644 (file)
@@ -109,7 +109,7 @@ worker_t controller_worker = {
        init_controller_worker,         /* Init function */
        start_controller_worker,        /* Start function */
        RSPAMD_WORKER_HAS_SOCKET | RSPAMD_WORKER_KILLABLE,
-       SOCK_STREAM,            /* TCP socket */
+       RSPAMD_WORKER_SOCKET_TCP,       /* TCP socket */
        RSPAMD_WORKER_VER       /* Version info */
 };
 /*
index 7803307b0fffe0df8d05142c279535b352928d20..a1881aff32054ffebeeeb126773577a18e486e94 100644 (file)
@@ -53,7 +53,7 @@ worker_t fuzzy_worker = {
                init_fuzzy,                 /* Init function */
                start_fuzzy,                /* Start function */
                RSPAMD_WORKER_HAS_SOCKET,
-               SOCK_DGRAM,                 /* UDP socket */
+               RSPAMD_WORKER_SOCKET_UDP,   /* UDP socket */
                RSPAMD_WORKER_VER           /* Version info */
 };
 
@@ -1351,7 +1351,7 @@ fuzzy_peer_rep (struct rspamd_worker *worker,
 {
        struct rspamd_fuzzy_storage_ctx *ctx = ud;
        GList *cur;
-       gint listen_socket;
+       struct rspamd_worker_listen_socket *ls;
        struct event *accept_event;
        gdouble next_check;
 
@@ -1368,16 +1368,18 @@ fuzzy_peer_rep (struct rspamd_worker *worker,
        /* Start listening */
        cur = worker->cf->listen_socks;
        while (cur) {
-               listen_socket = GPOINTER_TO_INT (cur->data);
-               if (listen_socket != -1) {
+               ls = cur->data;
+
+               if (ls->fd != -1) {
                        accept_event = g_slice_alloc0 (sizeof (struct event));
-                       event_set (accept_event, listen_socket, EV_READ | EV_PERSIST,
+                       event_set (accept_event, ls->fd, EV_READ | EV_PERSIST,
                                        accept_fuzzy_socket, worker);
                        event_base_set (ctx->ev_base, accept_event);
                        event_add (accept_event, NULL);
                        worker->accept_events = g_list_prepend (worker->accept_events,
                                        accept_event);
                }
+
                cur = g_list_next (cur);
        }
 
index 1292d6460c7ef856921154f6e8dd98baf63e8c7d..127cf278546ef2e84b5b92c66efbe3cf64f036e6 100644 (file)
@@ -33,7 +33,7 @@ worker_t hs_helper_worker = {
                init_hs_helper,             /* Init function */
                start_hs_helper,            /* Start function */
                RSPAMD_WORKER_UNIQUE|RSPAMD_WORKER_KILLABLE|RSPAMD_WORKER_ALWAYS_START,
-               SOCK_STREAM,                /* TCP socket */
+               RSPAMD_WORKER_SOCKET_NONE,  /* No socket */
                RSPAMD_WORKER_VER           /* Version info */
 };
 
index 740c4cd7afeb753e4bf361e216e8864818c5cbdc..d3e296d6df3eceeca4860f215a11ed474f43577d 100644 (file)
@@ -250,7 +250,7 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
        struct event_base *ev_base;
        struct event *accept_event;
        GList *cur;
-       gint listen_socket;
+       struct rspamd_worker_listen_socket *ls;
 
 #ifdef WITH_PROFILER
        extern void _start (void), etext (void);
@@ -271,17 +271,20 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
        /* Accept all sockets */
        if (accept_handler) {
                cur = worker->cf->listen_socks;
+
                while (cur) {
-                       listen_socket = GPOINTER_TO_INT (cur->data);
-                       if (listen_socket != -1) {
+                       ls = cur->data;
+
+                       if (ls->fd != -1) {
                                accept_event = g_slice_alloc0 (sizeof (struct event));
-                               event_set (accept_event, listen_socket, EV_READ | EV_PERSIST,
+                               event_set (accept_event, ls->fd, EV_READ | EV_PERSIST,
                                                accept_handler, worker);
                                event_base_set (ev_base, accept_event);
                                event_add (accept_event, NULL);
                                worker->accept_events = g_list_prepend (worker->accept_events,
                                                accept_event);
                        }
+
                        cur = g_list_next (cur);
                }
        }
index d01e102c68e8e7056fc625a0893470f33c4b1947..c4cbeb73c0275150847cac04b379f025dcfa7d99 100644 (file)
@@ -37,8 +37,8 @@ worker_t log_helper_worker = {
                init_log_helper,             /* Init function */
                start_log_helper,            /* Start function */
                RSPAMD_WORKER_UNIQUE | RSPAMD_WORKER_KILLABLE,
-               SOCK_STREAM,                /* TCP socket */
-               RSPAMD_WORKER_VER           /* Version info */
+               RSPAMD_WORKER_SOCKET_NONE,   /* No socket */
+               RSPAMD_WORKER_VER            /* Version info */
 };
 
 static const guint64 rspamd_log_helper_magic = 0x1090bb46aaa74c9aULL;
index 87f597e28fea1577e4c22c3fd6c93f450c232c06..df6970efac302556393eff84a8e9eec91419e638 100644 (file)
@@ -39,12 +39,12 @@ gpointer init_lua_worker (struct rspamd_config *cfg);
 void start_lua_worker (struct rspamd_worker *worker);
 
 worker_t lua_worker = {
-       "lua",                  /* Name */
-       init_lua_worker,        /* Init function */
-       start_lua_worker,       /* Start function */
+       "lua",                     /* Name */
+       init_lua_worker,           /* Init function */
+       start_lua_worker,          /* Start function */
        RSPAMD_WORKER_HAS_SOCKET | RSPAMD_WORKER_KILLABLE,
-       SOCK_STREAM,            /* TCP socket */
-       RSPAMD_WORKER_VER       /* Version info */
+       RSPAMD_WORKER_SOCKET_TCP,  /* TCP socket */
+       RSPAMD_WORKER_VER          /* Version info */
 };
 
 static const guint64 rspamd_lua_ctx_magic = 0x8055e2652aacf96eULL;
index 72e676267fe6ef121783dd084d51a14818eb5791..99fa6ca392aff8a61960807123bac163c11b36ca 100644 (file)
@@ -340,20 +340,38 @@ rspamd_fork_delayed (struct rspamd_worker_conf *cf,
 }
 
 static GList *
-create_listen_socket (GPtrArray *addrs, guint cnt, gint listen_type)
+create_listen_socket (GPtrArray *addrs, guint cnt,
+               enum rspamd_worker_socket_type listen_type)
 {
        GList *result = NULL;
        gint fd;
        guint i;
-       gpointer p;
+       struct rspamd_worker_listen_socket *ls;
 
        g_ptr_array_sort (addrs, rspamd_inet_address_compare_ptr);
        for (i = 0; i < cnt; i ++) {
-               fd = rspamd_inet_address_listen (g_ptr_array_index (addrs, i),
-                               listen_type, TRUE);
-               if (fd != -1) {
-                       p = GINT_TO_POINTER (fd);
-                       result = g_list_prepend (result, p);
+
+               if (listen_type & RSPAMD_WORKER_SOCKET_TCP) {
+                       fd = rspamd_inet_address_listen (g_ptr_array_index (addrs, i),
+                                       SOCK_STREAM, TRUE);
+                       if (fd != -1) {
+                               ls = g_slice_alloc0 (sizeof (*ls));
+                               ls->addr = g_ptr_array_index (addrs, i);
+                               ls->fd = fd;
+                               ls->type = RSPAMD_WORKER_SOCKET_TCP;
+                               result = g_list_prepend (result, ls);
+                       }
+               }
+               if (listen_type & RSPAMD_WORKER_SOCKET_UDP) {
+                       fd = rspamd_inet_address_listen (g_ptr_array_index (addrs, i),
+                                       SOCK_DGRAM, TRUE);
+                       if (fd != -1) {
+                               ls = g_slice_alloc0 (sizeof (*ls));
+                               ls->addr = g_ptr_array_index (addrs, i);
+                               ls->fd = fd;
+                               ls->type = RSPAMD_WORKER_SOCKET_TCP;
+                               result = g_list_prepend (result, ls);
+                       }
                }
        }
 
@@ -492,7 +510,7 @@ spawn_workers (struct rspamd_main *rspamd_main, struct event_base *ev_base)
                                        key = make_listen_key (bcf);
                                        if ((p =
                                                g_hash_table_lookup (listen_sockets,
-                                               GINT_TO_POINTER (key))) == NULL) {
+                                                               GINT_TO_POINTER (key))) == NULL) {
 
                                                if (!bcf->is_systemd) {
                                                        /* Create listen socket */
index bd4ca968486f9418b895a05827737e07a4a70e95..c0c60185d9681bae3f7016872d8b81ecb7745b5e 100644 (file)
@@ -165,12 +165,24 @@ enum rspamd_worker_flags {
        RSPAMD_WORKER_ALWAYS_START = (1 << 4),
 };
 
+enum rspamd_worker_socket_type {
+       RSPAMD_WORKER_SOCKET_NONE = 0,
+       RSPAMD_WORKER_SOCKET_TCP = (1 << 0),
+       RSPAMD_WORKER_SOCKET_UDP = (1 << 1),
+};
+
+struct rspamd_worker_listen_socket {
+       const rspamd_inet_addr_t *addr;
+       gint fd;
+       enum rspamd_worker_socket_type type;
+};
+
 typedef struct worker_s {
        const gchar *name;
        gpointer (*worker_init_func)(struct rspamd_config *cfg);
        void (*worker_start_func)(struct rspamd_worker *worker);
        enum rspamd_worker_flags flags;
-       gint listen_type;
+       enum rspamd_worker_socket_type listen_type;
        guint worker_version;
        guint64 rspamd_version;
        const gchar *rspamd_features;
index 32a2937e859877bddcd8ad5493683e5963e879a6..247c856f4017f1b5612282a43de73400cb182402 100644 (file)
@@ -57,7 +57,7 @@ worker_t rspamd_proxy_worker = {
        init_rspamd_proxy,            /* Init function */
        start_rspamd_proxy,           /* Start function */
        RSPAMD_WORKER_HAS_SOCKET | RSPAMD_WORKER_KILLABLE,
-       SOCK_STREAM,                /* TCP socket */
+       RSPAMD_WORKER_SOCKET_TCP,    /* TCP socket */
        RSPAMD_WORKER_VER
 };
 
index 0202e3cfea68005f26fdab599a8c3fb812d440d4..6b0a4fe2e898a3b00a883647736007a528144884 100644 (file)
@@ -44,7 +44,7 @@ worker_t smtp_proxy_worker = {
        init_smtp_proxy,            /* Init function */
        start_smtp_proxy,           /* Start function */
        RSPAMD_WORKER_HAS_SOCKET | RSPAMD_WORKER_KILLABLE,
-       SOCK_STREAM,                /* TCP socket */
+       RSPAMD_WORKER_SOCKET_TCP,   /* TCP socket */
        RSPAMD_WORKER_VER           /* Version info */
 };
 
index 735cbc6a9ca27761a1d167e9a3ddc7cfc2f67436..d3e3fc4c87d7e4b8f0bb1a0eeca4e46ebaf784cd 100644 (file)
@@ -50,7 +50,7 @@ worker_t normal_worker = {
                init_worker,                /* Init function */
                start_worker,               /* Start function */
                RSPAMD_WORKER_HAS_SOCKET|RSPAMD_WORKER_KILLABLE,
-               SOCK_STREAM,                /* TCP socket */
+               RSPAMD_WORKER_SOCKET_TCP,   /* TCP socket */
                RSPAMD_WORKER_VER           /* Version info */
 };