From: Vsevolod Stakhov Date: Sat, 7 Nov 2015 13:22:33 +0000 (+0000) Subject: Implement delayed writing for fuzzy replies. X-Git-Tag: 1.1.0~612 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=f6ee8625dc09db650f150f7a2efdcce0107f0e68;p=rspamd.git Implement delayed writing for fuzzy replies. --- diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index 0dfb74f25..dd23c6112 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -39,6 +39,7 @@ #include "cryptobox.h" #include "keypairs_cache.h" #include "keypair_private.h" +#include "ref.h" /* This number is used as expire time in seconds for cache items (2 days) */ #define DEFAULT_EXPIRE 172800L @@ -92,6 +93,8 @@ enum fuzzy_cmd_type { struct fuzzy_session { struct rspamd_worker *worker; + rspamd_inet_addr_t *addr; + struct rspamd_fuzzy_storage_ctx *ctx; union { struct rspamd_fuzzy_encrypted_shingle_cmd enc_shingle; @@ -99,15 +102,20 @@ struct fuzzy_session { struct rspamd_fuzzy_cmd normal; struct rspamd_fuzzy_shingle_cmd shingle; } cmd; + + struct rspamd_fuzzy_encrypted_reply reply; + enum rspamd_fuzzy_epoch epoch; enum fuzzy_cmd_type cmd_type; gint fd; guint64 time; - rspamd_inet_addr_t *addr; - struct rspamd_fuzzy_storage_ctx *ctx; + struct event io; + ref_entry_t ref; guchar nm[rspamd_cryptobox_MAX_NMBYTES]; }; +static void rspamd_fuzzy_write_reply (struct fuzzy_session *session); + static gboolean rspamd_fuzzy_check_client (struct fuzzy_session *session) { @@ -121,17 +129,43 @@ rspamd_fuzzy_check_client (struct fuzzy_session *session) } static void -rspamd_fuzzy_write_reply (struct fuzzy_session *session, - gconstpointer data, gsize len) +rspamd_fuzzy_reply_io (gint fd, gshort what, gpointer d) +{ + struct fuzzy_session *session = d; + + rspamd_fuzzy_write_reply (session); + REF_RELEASE (session); +} + +static void +rspamd_fuzzy_write_reply (struct fuzzy_session *session) { gssize r; + gsize len; + gconstpointer data; + + if (session->cmd_type == CMD_ENCRYPTED_NORMAL || + session->cmd_type == CMD_ENCRYPTED_SHINGLE) { + /* Encrypted reply */ + data = &session->reply; + len = sizeof (session->reply); + } + else { + data = &session->reply.rep; + len = sizeof (session->reply.rep); + } r = rspamd_inet_address_sendto (session->fd, data, len, 0, session->addr); if (r == -1) { - if (errno == EINTR) { - rspamd_fuzzy_write_reply (session, data, len); + if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) { + /* Grab reference to avoid early destruction */ + REF_RETAIN (session); + event_set (&session->io, session->fd, EV_WRITE, + rspamd_fuzzy_reply_io, session); + event_base_set (session->ctx->ev_base, &session->io); + event_add (&session->io, NULL); } else { msg_err ("error while writing reply: %s", strerror (errno)); @@ -144,7 +178,6 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session) { gboolean res = FALSE, encrypted = FALSE; struct rspamd_fuzzy_cmd *cmd; - struct rspamd_fuzzy_encrypted_reply rep; struct rspamd_fuzzy_reply result; switch (session->cmd_type) { @@ -201,18 +234,20 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session) } result.tag = cmd->tag; - memcpy (&rep.rep, &result, sizeof (result)); + memcpy (&session->reply.rep, &result, sizeof (result)); if (encrypted) { /* We need also to encrypt reply */ - ottery_rand_bytes (rep.hdr.nonce, sizeof (rep.hdr.nonce)); - rspamd_cryptobox_encrypt_nm_inplace ((guchar *)&rep.rep, sizeof (rep.rep), - rep.hdr.nonce, session->nm, rep.hdr.mac); - rspamd_fuzzy_write_reply (session, &rep, sizeof (rep)); - } - else { - rspamd_fuzzy_write_reply (session, &rep.rep, sizeof (rep.rep)); + ottery_rand_bytes (session->reply.hdr.nonce, + sizeof (session->reply.hdr.nonce)); + rspamd_cryptobox_encrypt_nm_inplace ((guchar *)&session->reply.rep, + sizeof (session->reply.rep), + session->reply.hdr.nonce, + session->nm, + session->reply.hdr.mac); } + + rspamd_fuzzy_write_reply (session); } @@ -370,6 +405,17 @@ rspamd_fuzzy_cmd_from_wire (guchar *buf, guint buflen, struct fuzzy_session *s) return TRUE; } +static void +fuzzy_session_destroy (gpointer d) +{ + struct fuzzy_session *session = d; + + rspamd_inet_address_destroy (session->addr); + rspamd_explicit_memzero (session->nm, sizeof (session->nm)); + session->worker->nconns--; + g_slice_free1 (sizeof (*session), session); +} + /* * Accept new connection and construct task */ @@ -377,15 +423,11 @@ static void accept_fuzzy_socket (gint fd, short what, void *arg) { struct rspamd_worker *worker = (struct rspamd_worker *)arg; - struct fuzzy_session session; + struct fuzzy_session *session; + rspamd_inet_addr_t *addr; gssize r; guint8 buf[512]; - session.worker = worker; - session.fd = fd; - session.ctx = worker->ctx; - session.time = (guint64)time (NULL); - /* Got some data */ if (what == EV_READ) { @@ -396,13 +438,14 @@ accept_fuzzy_socket (gint fd, short what, void *arg) buf, sizeof (buf), 0, - &session.addr); + &addr); if (r == -1) { if (errno == EINTR) { continue; } - else if (errno == EAGAIN) { + else if (errno == EAGAIN || errno == EWOULDBLOCK) { + return; } @@ -412,9 +455,17 @@ accept_fuzzy_socket (gint fd, short what, void *arg) return; } - if (rspamd_fuzzy_cmd_from_wire (buf, r, &session)) { + session = g_slice_alloc0 (sizeof (*session)); + REF_INIT_RETAIN (session, fuzzy_session_destroy); + session->worker = worker; + session->fd = fd; + session->ctx = worker->ctx; + session->time = (guint64) time (NULL); + session->addr = addr; + + if (rspamd_fuzzy_cmd_from_wire (buf, r, session)) { /* Check shingles count sanity */ - rspamd_fuzzy_process_command (&session); + rspamd_fuzzy_process_command (session); } else { /* Discard input */ @@ -422,9 +473,7 @@ accept_fuzzy_socket (gint fd, short what, void *arg) msg_debug ("invalid fuzzy command of size %d received", r); } - rspamd_inet_address_destroy (session.addr); - rspamd_explicit_memzero (session.nm, sizeof (session.nm)); - worker->nconns--; + REF_RELEASE (session); } } }