ctx = worker->ctx;
if ((nfd =
- rspamd_accept_from_socket (fd, &addr)) == -1) {
+ rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
msg_warn_ctx ("accept failed: %s", strerror (errno));
return;
}
struct fuzzy_master_update_session *session;
if ((nfd =
- rspamd_accept_from_socket (fd, &addr)) == -1) {
+ rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
msg_warn ("accept failed: %s", strerror (errno));
return;
}
struct rspamd_fuzzy_storage_ctx *ctx = ud;
GList *cur;
struct rspamd_worker_listen_socket *ls;
- struct event *accept_event;
+ struct event *accept_events;
gdouble next_check;
ctx->peer_fd = rep_fd;
if (ls->fd != -1) {
if (ls->type == RSPAMD_WORKER_SOCKET_UDP) {
- accept_event = g_slice_alloc0 (sizeof (struct event));
- event_set (accept_event, ls->fd, EV_READ | EV_PERSIST,
+ accept_events = g_slice_alloc0 (sizeof (struct event) * 2);
+ event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST,
accept_fuzzy_socket, worker);
- event_base_set (ctx->ev_base, accept_event);
- event_add (accept_event, NULL);
+ event_base_set (ctx->ev_base, &accept_events[0]);
+ event_add (&accept_events[0], NULL);
worker->accept_events = g_list_prepend (worker->accept_events,
- accept_event);
+ accept_events);
}
else if (worker->index == 0) {
/* We allow TCP listeners only for a update worker */
- accept_event = g_slice_alloc0 (sizeof (struct event));
- event_set (accept_event, ls->fd, EV_READ | EV_PERSIST,
+ accept_events = g_slice_alloc0 (sizeof (struct event) * 2);
+ event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST,
accept_fuzzy_mirror_socket, worker);
- event_base_set (ctx->ev_base, accept_event);
- event_add (accept_event, NULL);
+ event_base_set (ctx->ev_base, &accept_events[0]);
+ event_add (&accept_events[0], NULL);
worker->accept_events = g_list_prepend (worker->accept_events,
- accept_event);
+ accept_events);
}
}
void (*accept_handler)(int, short, void *))
{
struct event_base *ev_base;
- struct event *accept_event;
+ struct event *accept_events;
GList *cur;
struct rspamd_worker_listen_socket *ls;
ls = cur->data;
if (ls->fd != -1) {
- accept_event = g_slice_alloc0 (sizeof (struct event));
- event_set (accept_event, ls->fd, EV_READ | EV_PERSIST,
+ accept_events = g_slice_alloc0 (sizeof (struct event) * 2);
+ event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST,
accept_handler, worker);
- event_base_set (ev_base, accept_event);
- event_add (accept_event, NULL);
+ event_base_set (ev_base, &accept_events[0]);
+ event_add (&accept_events[0], NULL);
worker->accept_events = g_list_prepend (worker->accept_events,
- accept_event);
+ accept_events);
}
cur = g_list_next (cur);
rspamd_worker_stop_accept (struct rspamd_worker *worker)
{
GList *cur;
- struct event *event;
+ struct event *events;
GHashTableIter it;
struct rspamd_worker_signal_handler *sigh;
gpointer k, v;
/* Remove all events */
cur = worker->accept_events;
while (cur) {
- event = cur->data;
- event_del (event);
+ events = cur->data;
+ event_del (&events[0]);
+ event_del (&events[1]);
cur = g_list_next (cur);
- g_slice_free1 (sizeof (struct event), event);
+ g_slice_free1 (sizeof (struct event) * 2, events);
}
if (worker->accept_events != NULL) {
return ret;
}
+static void
+rspamd_enable_accept_event (gint fd, short what, gpointer d)
+{
+ struct event *events = d;
+
+ event_del (&events[1]);
+ event_add (&events[0], NULL);
+}
+
+static void
+rspamd_disable_accept_events (gint sock, GList *accept_events)
+{
+ GList *cur;
+ struct event *events;
+ const gdouble throttling = 0.5;
+ struct timeval tv;
+ struct event_base *ev_base;
+
+ double_to_tv (throttling, &tv);
+
+ for (cur = accept_events; cur != NULL; cur = g_list_next (cur)) {
+ events = cur->data;
+
+ ev_base = event_get_base (&events[0]);
+ event_del (&events[0]);
+ event_set (&events[1], sock, EV_TIMEOUT, rspamd_enable_accept_event,
+ events);
+ event_base_set (ev_base, &events[1]);
+ event_add (&events[1], &tv);
+ }
+}
+
gint
-rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t **target)
+rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t **target,
+ GList *accept_events)
{
gint nfd, serrno;
union sa_union su;
if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK) {
return 0;
}
+ else if (errno == EMFILE || errno == ENFILE) {
+ /* Temporary disable accept event */
+ rspamd_disable_accept_events (sock, accept_events);
+
+ return 0;
+ }
+
return -1;
}
/**
* Accept from listening socket filling addr structure
* @param sock listening socket
- * @param addr allocated inet addr structur
+ * @param addr allocated inet addr structure
+ * @param accept_events events for accepting new sockets
* @return
*/
-gint rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t **addr);
+gint rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t **addr,
+ GList *accept_events);
/**
* Parse host[:port[:priority]] line
L = ctx->L;
if ((nfd =
- rspamd_accept_from_socket (fd, &addr)) == -1) {
+ rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
msg_warn ("accept failed: %s", strerror (errno));
return;
}
gint nfd;
if ((nfd =
- rspamd_accept_from_socket (fd, &addr)) == -1) {
+ rspamd_accept_from_socket (fd, &addr, NULL)) == -1) {
rspamd_fprintf (stderr, "accept failed: %s", strerror (errno));
return;
}
gint nfd;
if ((nfd =
- rspamd_accept_from_socket (fd, &addr)) == -1) {
+ rspamd_accept_from_socket (fd, &addr, NULL)) == -1) {
msg_warn_main ("accept failed: %s", strerror (errno));
return;
}
ctx = worker->ctx;
if ((nfd =
- rspamd_accept_from_socket (fd, &addr)) == -1) {
+ rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
msg_warn ("accept failed: %s", strerror (errno));
return;
}
ctx = worker->ctx;
if ((nfd =
- rspamd_accept_from_socket (fd, &addr)) == -1) {
+ rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
msg_warn ("accept failed: %s", strerror (errno));
return;
}
}
if ((nfd =
- rspamd_accept_from_socket (fd, &addr)) == -1) {
+ rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
msg_warn_ctx ("accept failed: %s", strerror (errno));
return;
}
gint nfd;
if ((nfd =
- rspamd_accept_from_socket (fd, &addr)) == -1) {
+ rspamd_accept_from_socket (fd, &addr, NULL)) == -1) {
msg_warn ("accept failed: %s", strerror (errno));
return;
}
do {
if ((nfd =
- rspamd_accept_from_socket (fd, &addr)) == -1) {
+ rspamd_accept_from_socket (fd, &addr, NULL)) == -1) {
rspamd_fprintf (stderr, "accept failed: %s", strerror (errno));
return;
}