]> source.dussan.org Git - rspamd.git/commitdiff
Implement delayed writing for fuzzy replies.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 7 Nov 2015 13:22:33 +0000 (13:22 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 7 Nov 2015 13:22:33 +0000 (13:22 +0000)
src/fuzzy_storage.c

index 0dfb74f257ef39f33a87bc195e83a8c254c79aa3..dd23c6112bfad5bc4d404dd2b15054938658ff4d 100644 (file)
@@ -39,6 +39,7 @@
 #include "cryptobox.h"
 #include "keypairs_cache.h"
 #include "keypair_private.h"
+#include "ref.h"
 
 /* This number is used as expire time in seconds for cache items  (2 days) */
 #define DEFAULT_EXPIRE 172800L
@@ -92,6 +93,8 @@ enum fuzzy_cmd_type {
 
 struct fuzzy_session {
        struct rspamd_worker *worker;
+       rspamd_inet_addr_t *addr;
+       struct rspamd_fuzzy_storage_ctx *ctx;
 
        union {
                struct rspamd_fuzzy_encrypted_shingle_cmd enc_shingle;
@@ -99,15 +102,20 @@ struct fuzzy_session {
                struct rspamd_fuzzy_cmd normal;
                struct rspamd_fuzzy_shingle_cmd shingle;
        } cmd;
+
+       struct rspamd_fuzzy_encrypted_reply reply;
+
        enum rspamd_fuzzy_epoch epoch;
        enum fuzzy_cmd_type cmd_type;
        gint fd;
        guint64 time;
-       rspamd_inet_addr_t *addr;
-       struct rspamd_fuzzy_storage_ctx *ctx;
+       struct event io;
+       ref_entry_t ref;
        guchar nm[rspamd_cryptobox_MAX_NMBYTES];
 };
 
+static void rspamd_fuzzy_write_reply (struct fuzzy_session *session);
+
 static gboolean
 rspamd_fuzzy_check_client (struct fuzzy_session *session)
 {
@@ -121,17 +129,43 @@ rspamd_fuzzy_check_client (struct fuzzy_session *session)
 }
 
 static void
-rspamd_fuzzy_write_reply (struct fuzzy_session *session,
-               gconstpointer data, gsize len)
+rspamd_fuzzy_reply_io (gint fd, gshort what, gpointer d)
+{
+       struct fuzzy_session *session = d;
+
+       rspamd_fuzzy_write_reply (session);
+       REF_RELEASE (session);
+}
+
+static void
+rspamd_fuzzy_write_reply (struct fuzzy_session *session)
 {
        gssize r;
+       gsize len;
+       gconstpointer data;
+
+       if (session->cmd_type == CMD_ENCRYPTED_NORMAL ||
+                               session->cmd_type == CMD_ENCRYPTED_SHINGLE) {
+               /* Encrypted reply */
+               data = &session->reply;
+               len = sizeof (session->reply);
+       }
+       else {
+               data = &session->reply.rep;
+               len = sizeof (session->reply.rep);
+       }
 
        r = rspamd_inet_address_sendto (session->fd, data, len, 0,
                        session->addr);
 
        if (r == -1) {
-               if (errno == EINTR) {
-                       rspamd_fuzzy_write_reply (session, data, len);
+               if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
+                       /* Grab reference to avoid early destruction */
+                       REF_RETAIN (session);
+                       event_set (&session->io, session->fd, EV_WRITE,
+                                       rspamd_fuzzy_reply_io, session);
+                       event_base_set (session->ctx->ev_base, &session->io);
+                       event_add (&session->io, NULL);
                }
                else {
                        msg_err ("error while writing reply: %s", strerror (errno));
@@ -144,7 +178,6 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
 {
        gboolean res = FALSE, encrypted = FALSE;
        struct rspamd_fuzzy_cmd *cmd;
-       struct rspamd_fuzzy_encrypted_reply rep;
        struct rspamd_fuzzy_reply result;
 
        switch (session->cmd_type) {
@@ -201,18 +234,20 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
        }
 
        result.tag = cmd->tag;
-       memcpy (&rep.rep, &result, sizeof (result));
+       memcpy (&session->reply.rep, &result, sizeof (result));
 
        if (encrypted) {
                /* We need also to encrypt reply */
-               ottery_rand_bytes (rep.hdr.nonce, sizeof (rep.hdr.nonce));
-               rspamd_cryptobox_encrypt_nm_inplace ((guchar *)&rep.rep, sizeof (rep.rep),
-                               rep.hdr.nonce, session->nm, rep.hdr.mac);
-               rspamd_fuzzy_write_reply (session, &rep, sizeof (rep));
-       }
-       else {
-               rspamd_fuzzy_write_reply (session, &rep.rep, sizeof (rep.rep));
+               ottery_rand_bytes (session->reply.hdr.nonce,
+                               sizeof (session->reply.hdr.nonce));
+               rspamd_cryptobox_encrypt_nm_inplace ((guchar *)&session->reply.rep,
+                               sizeof (session->reply.rep),
+                               session->reply.hdr.nonce,
+                               session->nm,
+                               session->reply.hdr.mac);
        }
+
+       rspamd_fuzzy_write_reply (session);
 }
 
 
@@ -370,6 +405,17 @@ rspamd_fuzzy_cmd_from_wire (guchar *buf, guint buflen, struct fuzzy_session *s)
        return TRUE;
 }
 
+static void
+fuzzy_session_destroy (gpointer d)
+{
+       struct fuzzy_session *session = d;
+
+       rspamd_inet_address_destroy (session->addr);
+       rspamd_explicit_memzero (session->nm, sizeof (session->nm));
+       session->worker->nconns--;
+       g_slice_free1 (sizeof (*session), session);
+}
+
 /*
  * Accept new connection and construct task
  */
@@ -377,15 +423,11 @@ static void
 accept_fuzzy_socket (gint fd, short what, void *arg)
 {
        struct rspamd_worker *worker = (struct rspamd_worker *)arg;
-       struct fuzzy_session session;
+       struct fuzzy_session *session;
+       rspamd_inet_addr_t *addr;
        gssize r;
        guint8 buf[512];
 
-       session.worker = worker;
-       session.fd = fd;
-       session.ctx = worker->ctx;
-       session.time = (guint64)time (NULL);
-
        /* Got some data */
        if (what == EV_READ) {
 
@@ -396,13 +438,14 @@ accept_fuzzy_socket (gint fd, short what, void *arg)
                                        buf,
                                        sizeof (buf),
                                        0,
-                                       &session.addr);
+                                       &addr);
 
                        if (r == -1) {
                                if (errno == EINTR) {
                                        continue;
                                }
-                               else if (errno == EAGAIN) {
+                               else if (errno == EAGAIN || errno == EWOULDBLOCK) {
+
                                        return;
                                }
 
@@ -412,9 +455,17 @@ accept_fuzzy_socket (gint fd, short what, void *arg)
                                return;
                        }
 
-                       if (rspamd_fuzzy_cmd_from_wire (buf, r, &session)) {
+                       session = g_slice_alloc0 (sizeof (*session));
+                       REF_INIT_RETAIN (session, fuzzy_session_destroy);
+                       session->worker = worker;
+                       session->fd = fd;
+                       session->ctx = worker->ctx;
+                       session->time = (guint64) time (NULL);
+                       session->addr = addr;
+
+                       if (rspamd_fuzzy_cmd_from_wire (buf, r, session)) {
                                /* Check shingles count sanity */
-                               rspamd_fuzzy_process_command (&session);
+                               rspamd_fuzzy_process_command (session);
                        }
                        else {
                                /* Discard input */
@@ -422,9 +473,7 @@ accept_fuzzy_socket (gint fd, short what, void *arg)
                                msg_debug ("invalid fuzzy command of size %d received", r);
                        }
 
-                       rspamd_inet_address_destroy (session.addr);
-                       rspamd_explicit_memzero (session.nm, sizeof (session.nm));
-                       worker->nconns--;
+                       REF_RELEASE (session);
                }
        }
 }