Selaa lähdekoodia

Implement delayed writing for fuzzy replies.

tags/1.1.0
Vsevolod Stakhov 8 vuotta sitten
vanhempi
commit
f6ee8625dc
1 muutettua tiedostoa jossa 77 lisäystä ja 28 poistoa
  1. 77
    28
      src/fuzzy_storage.c

+ 77
- 28
src/fuzzy_storage.c Näytä tiedosto

@@ -39,6 +39,7 @@
#include "cryptobox.h"
#include "keypairs_cache.h"
#include "keypair_private.h"
#include "ref.h"

/* This number is used as expire time in seconds for cache items (2 days) */
#define DEFAULT_EXPIRE 172800L
@@ -92,6 +93,8 @@ enum fuzzy_cmd_type {

struct fuzzy_session {
struct rspamd_worker *worker;
rspamd_inet_addr_t *addr;
struct rspamd_fuzzy_storage_ctx *ctx;

union {
struct rspamd_fuzzy_encrypted_shingle_cmd enc_shingle;
@@ -99,15 +102,20 @@ struct fuzzy_session {
struct rspamd_fuzzy_cmd normal;
struct rspamd_fuzzy_shingle_cmd shingle;
} cmd;

struct rspamd_fuzzy_encrypted_reply reply;

enum rspamd_fuzzy_epoch epoch;
enum fuzzy_cmd_type cmd_type;
gint fd;
guint64 time;
rspamd_inet_addr_t *addr;
struct rspamd_fuzzy_storage_ctx *ctx;
struct event io;
ref_entry_t ref;
guchar nm[rspamd_cryptobox_MAX_NMBYTES];
};

static void rspamd_fuzzy_write_reply (struct fuzzy_session *session);

static gboolean
rspamd_fuzzy_check_client (struct fuzzy_session *session)
{
@@ -121,17 +129,43 @@ rspamd_fuzzy_check_client (struct fuzzy_session *session)
}

static void
rspamd_fuzzy_write_reply (struct fuzzy_session *session,
gconstpointer data, gsize len)
rspamd_fuzzy_reply_io (gint fd, gshort what, gpointer d)
{
struct fuzzy_session *session = d;

rspamd_fuzzy_write_reply (session);
REF_RELEASE (session);
}

static void
rspamd_fuzzy_write_reply (struct fuzzy_session *session)
{
gssize r;
gsize len;
gconstpointer data;

if (session->cmd_type == CMD_ENCRYPTED_NORMAL ||
session->cmd_type == CMD_ENCRYPTED_SHINGLE) {
/* Encrypted reply */
data = &session->reply;
len = sizeof (session->reply);
}
else {
data = &session->reply.rep;
len = sizeof (session->reply.rep);
}

r = rspamd_inet_address_sendto (session->fd, data, len, 0,
session->addr);

if (r == -1) {
if (errno == EINTR) {
rspamd_fuzzy_write_reply (session, data, len);
if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
/* Grab reference to avoid early destruction */
REF_RETAIN (session);
event_set (&session->io, session->fd, EV_WRITE,
rspamd_fuzzy_reply_io, session);
event_base_set (session->ctx->ev_base, &session->io);
event_add (&session->io, NULL);
}
else {
msg_err ("error while writing reply: %s", strerror (errno));
@@ -144,7 +178,6 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
{
gboolean res = FALSE, encrypted = FALSE;
struct rspamd_fuzzy_cmd *cmd;
struct rspamd_fuzzy_encrypted_reply rep;
struct rspamd_fuzzy_reply result;

switch (session->cmd_type) {
@@ -201,18 +234,20 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
}

result.tag = cmd->tag;
memcpy (&rep.rep, &result, sizeof (result));
memcpy (&session->reply.rep, &result, sizeof (result));

if (encrypted) {
/* We need also to encrypt reply */
ottery_rand_bytes (rep.hdr.nonce, sizeof (rep.hdr.nonce));
rspamd_cryptobox_encrypt_nm_inplace ((guchar *)&rep.rep, sizeof (rep.rep),
rep.hdr.nonce, session->nm, rep.hdr.mac);
rspamd_fuzzy_write_reply (session, &rep, sizeof (rep));
}
else {
rspamd_fuzzy_write_reply (session, &rep.rep, sizeof (rep.rep));
ottery_rand_bytes (session->reply.hdr.nonce,
sizeof (session->reply.hdr.nonce));
rspamd_cryptobox_encrypt_nm_inplace ((guchar *)&session->reply.rep,
sizeof (session->reply.rep),
session->reply.hdr.nonce,
session->nm,
session->reply.hdr.mac);
}

rspamd_fuzzy_write_reply (session);
}


@@ -370,6 +405,17 @@ rspamd_fuzzy_cmd_from_wire (guchar *buf, guint buflen, struct fuzzy_session *s)
return TRUE;
}

static void
fuzzy_session_destroy (gpointer d)
{
struct fuzzy_session *session = d;

rspamd_inet_address_destroy (session->addr);
rspamd_explicit_memzero (session->nm, sizeof (session->nm));
session->worker->nconns--;
g_slice_free1 (sizeof (*session), session);
}

/*
* Accept new connection and construct task
*/
@@ -377,15 +423,11 @@ static void
accept_fuzzy_socket (gint fd, short what, void *arg)
{
struct rspamd_worker *worker = (struct rspamd_worker *)arg;
struct fuzzy_session session;
struct fuzzy_session *session;
rspamd_inet_addr_t *addr;
gssize r;
guint8 buf[512];

session.worker = worker;
session.fd = fd;
session.ctx = worker->ctx;
session.time = (guint64)time (NULL);

/* Got some data */
if (what == EV_READ) {

@@ -396,13 +438,14 @@ accept_fuzzy_socket (gint fd, short what, void *arg)
buf,
sizeof (buf),
0,
&session.addr);
&addr);

if (r == -1) {
if (errno == EINTR) {
continue;
}
else if (errno == EAGAIN) {
else if (errno == EAGAIN || errno == EWOULDBLOCK) {

return;
}

@@ -412,9 +455,17 @@ accept_fuzzy_socket (gint fd, short what, void *arg)
return;
}

if (rspamd_fuzzy_cmd_from_wire (buf, r, &session)) {
session = g_slice_alloc0 (sizeof (*session));
REF_INIT_RETAIN (session, fuzzy_session_destroy);
session->worker = worker;
session->fd = fd;
session->ctx = worker->ctx;
session->time = (guint64) time (NULL);
session->addr = addr;

if (rspamd_fuzzy_cmd_from_wire (buf, r, session)) {
/* Check shingles count sanity */
rspamd_fuzzy_process_command (&session);
rspamd_fuzzy_process_command (session);
}
else {
/* Discard input */
@@ -422,9 +473,7 @@ accept_fuzzy_socket (gint fd, short what, void *arg)
msg_debug ("invalid fuzzy command of size %d received", r);
}

rspamd_inet_address_destroy (session.addr);
rspamd_explicit_memzero (session.nm, sizeof (session.nm));
worker->nconns--;
REF_RELEASE (session);
}
}
}

Loading…
Peruuta
Tallenna