From: Vsevolod Stakhov Date: Thu, 9 Jun 2016 15:35:31 +0000 (+0100) Subject: [Feature] Add protection against open files limit and accepting sockets X-Git-Tag: 1.3.0~388 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=0988e1b1b1d0a3a82728df658d834aba199baf4c;p=rspamd.git [Feature] Add protection against open files limit and accepting sockets --- diff --git a/src/controller.c b/src/controller.c index 91f4cfed6..f5efb4535 100644 --- a/src/controller.c +++ b/src/controller.c @@ -2216,7 +2216,7 @@ rspamd_controller_accept_socket (gint fd, short what, void *arg) ctx = worker->ctx; if ((nfd = - rspamd_accept_from_socket (fd, &addr)) == -1) { + rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) { msg_warn_ctx ("accept failed: %s", strerror (errno)); return; } diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index bd888fd3c..d96346ce4 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -1161,7 +1161,7 @@ accept_fuzzy_mirror_socket (gint fd, short what, void *arg) struct fuzzy_master_update_session *session; if ((nfd = - rspamd_accept_from_socket (fd, &addr)) == -1) { + rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) { msg_warn ("accept failed: %s", strerror (errno)); return; } @@ -2006,7 +2006,7 @@ fuzzy_peer_rep (struct rspamd_worker *worker, struct rspamd_fuzzy_storage_ctx *ctx = ud; GList *cur; struct rspamd_worker_listen_socket *ls; - struct event *accept_event; + struct event *accept_events; gdouble next_check; ctx->peer_fd = rep_fd; @@ -2026,23 +2026,23 @@ fuzzy_peer_rep (struct rspamd_worker *worker, if (ls->fd != -1) { if (ls->type == RSPAMD_WORKER_SOCKET_UDP) { - accept_event = g_slice_alloc0 (sizeof (struct event)); - event_set (accept_event, ls->fd, EV_READ | EV_PERSIST, + accept_events = g_slice_alloc0 (sizeof (struct event) * 2); + event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST, accept_fuzzy_socket, worker); - event_base_set (ctx->ev_base, accept_event); - event_add (accept_event, NULL); + event_base_set (ctx->ev_base, &accept_events[0]); + event_add (&accept_events[0], NULL); worker->accept_events = g_list_prepend (worker->accept_events, - accept_event); + accept_events); } else if (worker->index == 0) { /* We allow TCP listeners only for a update worker */ - accept_event = g_slice_alloc0 (sizeof (struct event)); - event_set (accept_event, ls->fd, EV_READ | EV_PERSIST, + accept_events = g_slice_alloc0 (sizeof (struct event) * 2); + event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST, accept_fuzzy_mirror_socket, worker); - event_base_set (ctx->ev_base, accept_event); - event_add (accept_event, NULL); + event_base_set (ctx->ev_base, &accept_events[0]); + event_add (&accept_events[0], NULL); worker->accept_events = g_list_prepend (worker->accept_events, - accept_event); + accept_events); } } diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index d3e296d6d..b50b0384f 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -248,7 +248,7 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name, void (*accept_handler)(int, short, void *)) { struct event_base *ev_base; - struct event *accept_event; + struct event *accept_events; GList *cur; struct rspamd_worker_listen_socket *ls; @@ -276,13 +276,13 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name, ls = cur->data; if (ls->fd != -1) { - accept_event = g_slice_alloc0 (sizeof (struct event)); - event_set (accept_event, ls->fd, EV_READ | EV_PERSIST, + accept_events = g_slice_alloc0 (sizeof (struct event) * 2); + event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST, accept_handler, worker); - event_base_set (ev_base, accept_event); - event_add (accept_event, NULL); + event_base_set (ev_base, &accept_events[0]); + event_add (&accept_events[0], NULL); worker->accept_events = g_list_prepend (worker->accept_events, - accept_event); + accept_events); } cur = g_list_next (cur); @@ -296,7 +296,7 @@ void rspamd_worker_stop_accept (struct rspamd_worker *worker) { GList *cur; - struct event *event; + struct event *events; GHashTableIter it; struct rspamd_worker_signal_handler *sigh; gpointer k, v; @@ -305,10 +305,11 @@ rspamd_worker_stop_accept (struct rspamd_worker *worker) /* Remove all events */ cur = worker->accept_events; while (cur) { - event = cur->data; - event_del (event); + events = cur->data; + event_del (&events[0]); + event_del (&events[1]); cur = g_list_next (cur); - g_slice_free1 (sizeof (struct event), event); + g_slice_free1 (sizeof (struct event) * 2, events); } if (worker->accept_events != NULL) { diff --git a/src/libutil/addr.c b/src/libutil/addr.c index 5c5a51e13..3fee0b4bc 100644 --- a/src/libutil/addr.c +++ b/src/libutil/addr.c @@ -199,8 +199,41 @@ rspamd_ip_is_valid (const rspamd_inet_addr_t *addr) return ret; } +static void +rspamd_enable_accept_event (gint fd, short what, gpointer d) +{ + struct event *events = d; + + event_del (&events[1]); + event_add (&events[0], NULL); +} + +static void +rspamd_disable_accept_events (gint sock, GList *accept_events) +{ + GList *cur; + struct event *events; + const gdouble throttling = 0.5; + struct timeval tv; + struct event_base *ev_base; + + double_to_tv (throttling, &tv); + + for (cur = accept_events; cur != NULL; cur = g_list_next (cur)) { + events = cur->data; + + ev_base = event_get_base (&events[0]); + event_del (&events[0]); + event_set (&events[1], sock, EV_TIMEOUT, rspamd_enable_accept_event, + events); + event_base_set (ev_base, &events[1]); + event_add (&events[1], &tv); + } +} + gint -rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t **target) +rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t **target, + GList *accept_events) { gint nfd, serrno; union sa_union su; @@ -215,6 +248,13 @@ rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t **target) if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK) { return 0; } + else if (errno == EMFILE || errno == ENFILE) { + /* Temporary disable accept event */ + rspamd_disable_accept_events (sock, accept_events); + + return 0; + } + return -1; } diff --git a/src/libutil/addr.h b/src/libutil/addr.h index bb9fd2573..200543d6f 100644 --- a/src/libutil/addr.h +++ b/src/libutil/addr.h @@ -193,10 +193,12 @@ gboolean rspamd_ip_is_valid (const rspamd_inet_addr_t *addr); /** * Accept from listening socket filling addr structure * @param sock listening socket - * @param addr allocated inet addr structur + * @param addr allocated inet addr structure + * @param accept_events events for accepting new sockets * @return */ -gint rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t **addr); +gint rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t **addr, + GList *accept_events); /** * Parse host[:port[:priority]] line diff --git a/src/lua_worker.c b/src/lua_worker.c index df6970efa..b74b8d422 100644 --- a/src/lua_worker.c +++ b/src/lua_worker.c @@ -261,7 +261,7 @@ lua_accept_socket (gint fd, short what, void *arg) L = ctx->L; if ((nfd = - rspamd_accept_from_socket (fd, &addr)) == -1) { + rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) { msg_warn ("accept failed: %s", strerror (errno)); return; } diff --git a/src/rspamadm/lua_repl.c b/src/rspamadm/lua_repl.c index f0d337ad9..f96aa2890 100644 --- a/src/rspamadm/lua_repl.c +++ b/src/rspamadm/lua_repl.c @@ -482,7 +482,7 @@ rspamadm_lua_accept_cb (gint fd, short what, void *arg) gint nfd; if ((nfd = - rspamd_accept_from_socket (fd, &addr)) == -1) { + rspamd_accept_from_socket (fd, &addr, NULL)) == -1) { rspamd_fprintf (stderr, "accept failed: %s", strerror (errno)); return; } diff --git a/src/rspamd.c b/src/rspamd.c index b6b5a271c..922327f38 100644 --- a/src/rspamd.c +++ b/src/rspamd.c @@ -959,7 +959,7 @@ rspamd_control_handler (gint fd, short what, gpointer arg) gint nfd; if ((nfd = - rspamd_accept_from_socket (fd, &addr)) == -1) { + rspamd_accept_from_socket (fd, &addr, NULL)) == -1) { msg_warn_main ("accept failed: %s", strerror (errno)); return; } diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c index 7ca0cc51d..d2ac90aa0 100644 --- a/src/rspamd_proxy.c +++ b/src/rspamd_proxy.c @@ -1232,7 +1232,7 @@ proxy_accept_socket (gint fd, short what, void *arg) ctx = worker->ctx; if ((nfd = - rspamd_accept_from_socket (fd, &addr)) == -1) { + rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) { msg_warn ("accept failed: %s", strerror (errno)); return; } diff --git a/src/smtp_proxy.c b/src/smtp_proxy.c index 6b0a4fe2e..8eebc2c86 100644 --- a/src/smtp_proxy.c +++ b/src/smtp_proxy.c @@ -902,7 +902,7 @@ accept_socket (gint fd, short what, void *arg) ctx = worker->ctx; if ((nfd = - rspamd_accept_from_socket (fd, &addr)) == -1) { + rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) { msg_warn ("accept failed: %s", strerror (errno)); return; } diff --git a/src/worker.c b/src/worker.c index ac104f7f0..81c5b1c63 100644 --- a/src/worker.c +++ b/src/worker.c @@ -266,7 +266,7 @@ accept_socket (gint fd, short what, void *arg) } if ((nfd = - rspamd_accept_from_socket (fd, &addr)) == -1) { + rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) { msg_warn_ctx ("accept failed: %s", strerror (errno)); return; } diff --git a/test/rspamd_http_test.c b/test/rspamd_http_test.c index 428c510c4..88ac48676 100644 --- a/test/rspamd_http_test.c +++ b/test/rspamd_http_test.c @@ -55,7 +55,7 @@ rspamd_server_accept (gint fd, short what, void *arg) gint nfd; if ((nfd = - rspamd_accept_from_socket (fd, &addr)) == -1) { + rspamd_accept_from_socket (fd, &addr, NULL)) == -1) { msg_warn ("accept failed: %s", strerror (errno)); return; } diff --git a/utils/rspamd_http_server.c b/utils/rspamd_http_server.c index ad01085df..9af96fad9 100644 --- a/utils/rspamd_http_server.c +++ b/utils/rspamd_http_server.c @@ -134,7 +134,7 @@ rspamd_server_accept (gint fd, short what, void *arg) do { if ((nfd = - rspamd_accept_from_socket (fd, &addr)) == -1) { + rspamd_accept_from_socket (fd, &addr, NULL)) == -1) { rspamd_fprintf (stderr, "accept failed: %s", strerror (errno)); return; }