From 44aa58ddceed5d33c0395d42335ce71e1c865e96 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Tue, 2 Jun 2020 12:44:21 +0100 Subject: [PATCH] [Feature] Support input vectorisation by recvmmsg call --- src/fuzzy_storage.c | 122 ++++++++++++++++++++++++++++++-------------- 1 file changed, 85 insertions(+), 37 deletions(-) diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index 5eb403ce7..c25a38f39 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -1328,6 +1328,13 @@ fuzzy_session_destroy (gpointer d) g_free (session); } +#define FUZZY_INPUT_BUFLEN 1024 +#ifdef HAVE_RECVMMSG +#define MSGVEC_LEN 16 +#else +#define MSGVEC_LEN 1 +#endif + /* * Accept new connection and construct task */ @@ -1337,19 +1344,42 @@ accept_fuzzy_socket (EV_P_ ev_io *w, int revents) struct rspamd_worker *worker = (struct rspamd_worker *)w->data; struct fuzzy_session *session; rspamd_inet_addr_t *addr; - gssize r; - guint8 buf[512]; + gssize r, msg_len; guint64 *nerrors; + struct iovec iovs[MSGVEC_LEN]; + guint8 bufs[MSGVEC_LEN][FUZZY_INPUT_BUFLEN]; + struct sockaddr_storage peer_sa[MSGVEC_LEN]; + socklen_t salen = sizeof (peer_sa[0]); +#ifdef HAVE_RECVMMSG +#define MSG_FIELD(msg, field) msg.msg_hdr.field + struct mmsghdr msg[MSGVEC_LEN]; +#else +#define MSG_FIELD(msg, field) msg.field + struct msghdr msg[MSGVEC_LEN]; +#endif + + memset (msg, 0, sizeof (*msg) * MSGVEC_LEN); + + /* Prepare messages to receive */ + for (int i = 0; i < MSGVEC_LEN; i ++) { + /* Prepare msghdr structs */ + iovs[i].iov_base = bufs[i]; + iovs[i].iov_len = sizeof (bufs[i]); + MSG_FIELD(msg[i], msg_name) = (void *)&peer_sa[i]; + MSG_FIELD(msg[i], msg_namelen) = salen; + MSG_FIELD(msg[i], msg_iov) = &iovs[i]; + MSG_FIELD(msg[i], msg_iovlen) = 1; + } /* Got some data */ if (revents == EV_READ) { for (;;) { - r = rspamd_inet_address_recvfrom (w->fd, - buf, - sizeof (buf), - 0, - &addr); +#ifdef HAVE_RECVMMSG + r = recvmmsg (w->fd, msg, MSGVEC_LEN, 0, NULL); +#else + r = recvmsg (w->fd, msg, 0); +#endif if (r == -1) { if (errno == EINTR) { @@ -1366,40 +1396,58 @@ accept_fuzzy_socket (EV_P_ ev_io *w, int revents) return; } - session = g_malloc0 (sizeof (*session)); - REF_INIT_RETAIN (session, fuzzy_session_destroy); - session->worker = worker; - session->fd = w->fd; - session->ctx = worker->ctx; - session->time = (guint64) time (NULL); - session->addr = addr; - worker->nconns++; - - if (rspamd_fuzzy_cmd_from_wire (buf, r, session)) { - /* Check shingles count sanity */ - rspamd_fuzzy_process_command (session); - } - else { - /* Discard input */ - session->ctx->stat.invalid_requests ++; - msg_debug ("invalid fuzzy command of size %z received", r); - - nerrors = rspamd_lru_hash_lookup (session->ctx->errors_ips, - addr, -1); - - if (nerrors == NULL) { - nerrors = g_malloc (sizeof (*nerrors)); - *nerrors = 1; - rspamd_lru_hash_insert (session->ctx->errors_ips, - rspamd_inet_address_copy (addr), - nerrors, -1, -1); +#ifndef HAVE_RECVMMSG + msg_len = r; /* Save real length in bytes here */ + r = 1; /* Assume that we have received a single message */ +#endif + + for (int i = 0; i < r; i ++) { + session = g_malloc0 (sizeof (*session)); + REF_INIT_RETAIN (session, fuzzy_session_destroy); + session->worker = worker; + session->fd = w->fd; + session->ctx = worker->ctx; + session->time = (guint64) time (NULL); + session->addr = rspamd_inet_address_from_sa (MSG_FIELD(msg[i], msg_name), + MSG_FIELD(msg[i], msg_namelen)); + + /* Each message can have its length in case of recvmmsg */ +#ifdef HAVE_RECVMMSG + msg_len = msg[i].msg_len; +#endif + + if (rspamd_fuzzy_cmd_from_wire (iovs[i].iov_base, + msg_len, session)) { + /* Check shingles count sanity */ + worker->nconns++; + rspamd_fuzzy_process_command (session); } else { - *nerrors = *nerrors + 1; + /* Discard input */ + session->ctx->stat.invalid_requests ++; + msg_debug ("invalid fuzzy command of size %z received", r); + + nerrors = rspamd_lru_hash_lookup (session->ctx->errors_ips, + addr, -1); + + if (nerrors == NULL) { + nerrors = g_malloc (sizeof (*nerrors)); + *nerrors = 1; + rspamd_lru_hash_insert (session->ctx->errors_ips, + rspamd_inet_address_copy (addr), + nerrors, -1, -1); + } + else { + *nerrors = *nerrors + 1; + } } - } - REF_RELEASE (session); + REF_RELEASE (session); + } +#ifdef HAVE_RECVMMSG + /* Stop reading as we are using recvmmsg instead of recvmsg */ + break; +#endif } } } -- 2.39.5