Browse Source

Implement new fuzzy updates architecture

So far, fuzzy storage can run in multiple processes. However, merely one process is responsible for changes whilst others just work as proxies when dealing with updates. That should fix sqlite concurrency issues.
tags/1.1.0
Vsevolod Stakhov 8 years ago
parent
commit
5ac408da90
1 changed files with 171 additions and 16 deletions
  1. 171
    16
      src/fuzzy_storage.c

+ 171
- 16
src/fuzzy_storage.c View File

@@ -78,10 +78,13 @@ struct rspamd_fuzzy_storage_ctx {
gchar *update_map;
guint keypair_cache_size;
struct event_base *ev_base;
gint peer_fd;
struct event peer_ev;
/* Local keypair */
gpointer key;
struct rspamd_keypair_cache *keypair_cache;
struct rspamd_fuzzy_backend *backend;
GQueue *updates_pending;
};

enum fuzzy_cmd_type {
@@ -114,6 +117,18 @@ struct fuzzy_session {
guchar nm[rspamd_cryptobox_MAX_NMBYTES];
};

struct fuzzy_peer_cmd {
union {
struct rspamd_fuzzy_cmd normal;
struct rspamd_fuzzy_shingle_cmd shingle;
} cmd;
};

struct fuzzy_peer_request {
struct event io_ev;
struct fuzzy_peer_cmd cmd;
};

static void rspamd_fuzzy_write_reply (struct fuzzy_session *session);

static gboolean
@@ -128,6 +143,35 @@ rspamd_fuzzy_check_client (struct fuzzy_session *session)
return TRUE;
}

static void
rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)
{
GList *cur;
struct fuzzy_peer_cmd *cmd;
guint nupdates = 0;

cur = ctx->updates_pending->head;
while (cur) {
cmd = cur->data;

if (cmd->cmd.normal.cmd == FUZZY_WRITE) {
rspamd_fuzzy_backend_add (ctx->backend, &cmd->cmd.normal);
}
else {
rspamd_fuzzy_backend_del (ctx->backend, &cmd->cmd.normal);
}

g_slice_free1 (sizeof (*cmd), cmd);
nupdates ++;
cur = g_list_next (cur);
}

g_queue_clear (ctx->updates_pending);
server_stat->fuzzy_hashes = rspamd_fuzzy_backend_count (ctx->backend);

msg_info ("updated fuzzy storage: %ud updates processed", nupdates);
}

static void
rspamd_fuzzy_reply_io (gint fd, gshort what, gpointer d)
{
@@ -173,26 +217,49 @@ rspamd_fuzzy_write_reply (struct fuzzy_session *session)
}
}

static void
fuzzy_peer_send_io (gint fd, gshort what, gpointer d)
{
struct fuzzy_peer_request *up_req = d;
gssize r;

r = write (fd, &up_req->cmd, sizeof (up_req->cmd));

if (r != sizeof (up_req->cmd)) {
msg_err ("cannot send update request to the peer: %s", strerror (errno));
}

event_del (&up_req->io_ev);
g_slice_free1 (sizeof (*up_req), up_req);
}

static void
rspamd_fuzzy_process_command (struct fuzzy_session *session)
{
gboolean res = FALSE, encrypted = FALSE;
gboolean encrypted = FALSE;
struct rspamd_fuzzy_cmd *cmd;
struct rspamd_fuzzy_reply result;
struct fuzzy_peer_cmd *up_cmd;
struct fuzzy_peer_request *up_req;
gsize up_len;

switch (session->cmd_type) {
case CMD_NORMAL:
cmd = &session->cmd.normal;
up_len = sizeof (session->cmd.normal);
break;
case CMD_SHINGLE:
cmd = &session->cmd.shingle.basic;
up_len = sizeof (session->cmd.shingle);
break;
case CMD_ENCRYPTED_NORMAL:
cmd = &session->cmd.enc_normal.cmd;
up_len = sizeof (session->cmd.normal);
encrypted = TRUE;
break;
case CMD_ENCRYPTED_SHINGLE:
cmd = &session->cmd.enc_shingle.cmd.basic;
up_len = sizeof (session->cmd.shingle);
encrypted = TRUE;
break;
}
@@ -210,27 +277,30 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
else {
result.flag = cmd->flag;
if (rspamd_fuzzy_check_client (session)) {
if (cmd->cmd == FUZZY_WRITE) {
res = rspamd_fuzzy_backend_add (session->ctx->backend, cmd);
}
else {
res = rspamd_fuzzy_backend_del (session->ctx->backend, cmd);
}
if (!res) {
result.value = 404;
result.prob = 0.0;

if (session->worker->index == 0 || session->ctx->peer_fd == -1) {
/* Just add to the queue */
up_cmd = g_slice_alloc (sizeof (*up_cmd));
memcpy (up_cmd, cmd, up_len);
g_queue_push_tail (session->ctx->updates_pending, up_cmd);
}
else {
result.value = 0;
result.prob = 1.0;
/* We need to send request to the peer */
up_req = g_slice_alloc (sizeof (*up_req));
memcpy (&up_req->cmd, cmd, up_len);
event_set (&up_req->io_ev, session->ctx->peer_fd, EV_WRITE,
fuzzy_peer_send_io, up_req);
event_base_set (session->ctx->ev_base, &up_req->io_ev);
event_add (&up_req->io_ev, NULL);
}

result.value = 0;
result.prob = 1.0;
}
else {
result.value = 403;
result.prob = 0.0;
}

server_stat->fuzzy_hashes = rspamd_fuzzy_backend_count (session->ctx->backend);
}

result.tag = cmd->tag;
@@ -489,6 +559,7 @@ sync_callback (gint fd, short what, void *arg)
ctx = worker->ctx;

if (ctx->backend) {
rspamd_fuzzy_process_updates_queue (ctx);
/* Call backend sync */
old_expired = rspamd_fuzzy_backend_expired (ctx->backend);
rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
@@ -605,6 +676,67 @@ init_fuzzy (struct rspamd_config *cfg)
return ctx;
}

static void
rspamd_fuzzy_peer_io (gint fd, gshort what, gpointer d)
{
struct fuzzy_peer_cmd cmd, *pcmd;
struct rspamd_fuzzy_storage_ctx *ctx = d;
gssize r;

r = read (fd, &cmd, sizeof (cmd));

if (r != sizeof (cmd)) {
msg_err ("cannot read command from peers: %s", strerror (errno));
}
else {
pcmd = g_slice_alloc (sizeof (*pcmd));
memcpy (pcmd, &cmd, sizeof (cmd));
g_queue_push_tail (ctx->updates_pending, pcmd);
}
}

static void
fuzzy_peer_rep (struct rspamd_worker *worker,
struct rspamd_srv_reply *rep, gint rep_fd,
gpointer ud)
{
struct rspamd_fuzzy_storage_ctx *ctx = ud;
GList *cur;
gint listen_socket;
struct event *accept_event;

ctx->peer_fd = rep_fd;

if (rep_fd == -1) {
msg_warn ("cannot receive peer fd from the main process");
}

/* Start listening */
cur = worker->cf->listen_socks;
while (cur) {
listen_socket = GPOINTER_TO_INT (cur->data);
if (listen_socket != -1) {
accept_event = g_slice_alloc0 (sizeof (struct event));
event_set (accept_event, listen_socket, EV_READ | EV_PERSIST,
accept_fuzzy_socket, worker);
event_base_set (ctx->ev_base, accept_event);
event_add (accept_event, NULL);
worker->accept_events = g_list_prepend (worker->accept_events,
accept_event);
}
cur = g_list_next (cur);
}

if (worker->index == 0 && ctx->peer_fd != -1) {
/* Listen for peer requests */
event_set (&ctx->peer_ev, ctx->peer_fd, EV_READ | EV_PERSIST,
rspamd_fuzzy_peer_io, ctx);
event_base_set (ctx->ev_base, &ctx->peer_ev);
event_add (&ctx->peer_ev, NULL);
ctx->updates_pending = g_queue_new ();
}
}

/*
* Start worker process
*/
@@ -614,10 +746,12 @@ start_fuzzy (struct rspamd_worker *worker)
struct rspamd_fuzzy_storage_ctx *ctx = worker->ctx;
GError *err = NULL;
gdouble next_check;
struct rspamd_srv_command srv_cmd;

ctx->ev_base = rspamd_prepare_worker (worker,
"fuzzy",
accept_fuzzy_socket);
NULL);
ctx->peer_fd = -1;
server_stat = worker->srv->stat;

/*
@@ -666,13 +800,34 @@ start_fuzzy (struct rspamd_worker *worker)
/* Maps events */
rspamd_map_watch (worker->srv->cfg, ctx->ev_base);

/* Get peer pipe */
srv_cmd.type = RSPAMD_SRV_SOCKETPAIR;
srv_cmd.id = ottery_rand_uint64 ();
srv_cmd.cmd.spair.af = SOCK_DGRAM;
srv_cmd.cmd.spair.pair_num = worker->index;
memset (srv_cmd.cmd.spair.pair_id, 0, sizeof (srv_cmd.cmd.spair.pair_id));
memcpy (srv_cmd.cmd.spair.pair_id, "fuzzy", sizeof ("fuzzy"));

rspamd_srv_send_command (worker, ctx->ev_base, &srv_cmd, fuzzy_peer_rep, ctx);

event_base_loop (ctx->ev_base, 0);
rspamd_worker_block_signals ();

rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
if (worker->index == 0) {
rspamd_fuzzy_process_updates_queue (ctx);
rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
}

rspamd_fuzzy_backend_close (ctx->backend);
rspamd_log_close (worker->srv->logger);

if (ctx->peer_fd != -1) {
if (worker->index == 0) {
event_del (&ctx->peer_ev);
}
close (ctx->peer_fd);
}

if (ctx->keypair_cache) {
rspamd_keypair_cache_destroy (ctx->keypair_cache);
}

Loading…
Cancel
Save