]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Support input vectorisation by recvmmsg call
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 2 Jun 2020 11:44:21 +0000 (12:44 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 2 Jun 2020 11:44:21 +0000 (12:44 +0100)
src/fuzzy_storage.c

index 5eb403ce79f80f1444f29542e63e1f30d42d4ba6..c25a38f393e4827a389783276b0f24fa927eeeb4 100644 (file)
@@ -1328,6 +1328,13 @@ fuzzy_session_destroy (gpointer d)
        g_free (session);
 }
 
+#define FUZZY_INPUT_BUFLEN 1024
+#ifdef HAVE_RECVMMSG
+#define MSGVEC_LEN 16
+#else
+#define MSGVEC_LEN 1
+#endif
+
 /*
  * Accept new connection and construct task
  */
@@ -1337,19 +1344,42 @@ accept_fuzzy_socket (EV_P_ ev_io *w, int revents)
        struct rspamd_worker *worker = (struct rspamd_worker *)w->data;
        struct fuzzy_session *session;
        rspamd_inet_addr_t *addr;
-       gssize r;
-       guint8 buf[512];
+       gssize r, msg_len;
        guint64 *nerrors;
+       struct iovec iovs[MSGVEC_LEN];
+       guint8 bufs[MSGVEC_LEN][FUZZY_INPUT_BUFLEN];
+       struct sockaddr_storage peer_sa[MSGVEC_LEN];
+       socklen_t salen = sizeof (peer_sa[0]);
+#ifdef HAVE_RECVMMSG
+#define MSG_FIELD(msg, field) msg.msg_hdr.field
+       struct mmsghdr msg[MSGVEC_LEN];
+#else
+#define MSG_FIELD(msg, field) msg.field
+       struct msghdr msg[MSGVEC_LEN];
+#endif
+
+       memset (msg, 0, sizeof (*msg) * MSGVEC_LEN);
+
+       /* Prepare messages to receive */
+       for (int i = 0; i < MSGVEC_LEN; i ++) {
+               /* Prepare msghdr structs */
+               iovs[i].iov_base = bufs[i];
+               iovs[i].iov_len = sizeof (bufs[i]);
+               MSG_FIELD(msg[i], msg_name) = (void *)&peer_sa[i];
+               MSG_FIELD(msg[i], msg_namelen) = salen;
+               MSG_FIELD(msg[i], msg_iov) = &iovs[i];
+               MSG_FIELD(msg[i], msg_iovlen) = 1;
+       }
 
        /* Got some data */
        if (revents == EV_READ) {
 
                for (;;) {
-                       r = rspamd_inet_address_recvfrom (w->fd,
-                                       buf,
-                                       sizeof (buf),
-                                       0,
-                                       &addr);
+#ifdef HAVE_RECVMMSG
+                       r = recvmmsg (w->fd, msg, MSGVEC_LEN, 0, NULL);
+#else
+                       r = recvmsg (w->fd, msg, 0);
+#endif
 
                        if (r == -1) {
                                if (errno == EINTR) {
@@ -1366,40 +1396,58 @@ accept_fuzzy_socket (EV_P_ ev_io *w, int revents)
                                return;
                        }
 
-                       session = g_malloc0 (sizeof (*session));
-                       REF_INIT_RETAIN (session, fuzzy_session_destroy);
-                       session->worker = worker;
-                       session->fd = w->fd;
-                       session->ctx = worker->ctx;
-                       session->time = (guint64) time (NULL);
-                       session->addr = addr;
-                       worker->nconns++;
-
-                       if (rspamd_fuzzy_cmd_from_wire (buf, r, session)) {
-                               /* Check shingles count sanity */
-                               rspamd_fuzzy_process_command (session);
-                       }
-                       else {
-                               /* Discard input */
-                               session->ctx->stat.invalid_requests ++;
-                               msg_debug ("invalid fuzzy command of size %z received", r);
-
-                               nerrors = rspamd_lru_hash_lookup (session->ctx->errors_ips,
-                                               addr, -1);
-
-                               if (nerrors == NULL) {
-                                       nerrors = g_malloc (sizeof (*nerrors));
-                                       *nerrors = 1;
-                                       rspamd_lru_hash_insert (session->ctx->errors_ips,
-                                                       rspamd_inet_address_copy (addr),
-                                                       nerrors, -1, -1);
+#ifndef HAVE_RECVMMSG
+                       msg_len = r; /* Save real length in bytes here */
+                       r = 1; /* Assume that we have received a single message */
+#endif
+
+                       for (int i = 0; i < r; i ++) {
+                               session = g_malloc0 (sizeof (*session));
+                               REF_INIT_RETAIN (session, fuzzy_session_destroy);
+                               session->worker = worker;
+                               session->fd = w->fd;
+                               session->ctx = worker->ctx;
+                               session->time = (guint64) time (NULL);
+                               session->addr = rspamd_inet_address_from_sa (MSG_FIELD(msg[i], msg_name),
+                                               MSG_FIELD(msg[i], msg_namelen));
+
+                               /* Each message can have its length in case of recvmmsg */
+#ifdef HAVE_RECVMMSG
+                               msg_len = msg[i].msg_len;
+#endif
+
+                               if (rspamd_fuzzy_cmd_from_wire (iovs[i].iov_base,
+                                               msg_len, session)) {
+                                       /* Check shingles count sanity */
+                                       worker->nconns++;
+                                       rspamd_fuzzy_process_command (session);
                                }
                                else {
-                                       *nerrors = *nerrors + 1;
+                                       /* Discard input */
+                                       session->ctx->stat.invalid_requests ++;
+                                       msg_debug ("invalid fuzzy command of size %z received", r);
+
+                                       nerrors = rspamd_lru_hash_lookup (session->ctx->errors_ips,
+                                                       addr, -1);
+
+                                       if (nerrors == NULL) {
+                                               nerrors = g_malloc (sizeof (*nerrors));
+                                               *nerrors = 1;
+                                               rspamd_lru_hash_insert (session->ctx->errors_ips,
+                                                               rspamd_inet_address_copy (addr),
+                                                               nerrors, -1, -1);
+                                       }
+                                       else {
+                                               *nerrors = *nerrors + 1;
+                                       }
                                }
-                       }
 
-                       REF_RELEASE (session);
+                               REF_RELEASE (session);
+                       }
+#ifdef HAVE_RECVMMSG
+                       /* Stop reading as we are using recvmmsg instead of recvmsg */
+                       break;
+#endif
                }
        }
 }