aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2015-11-25 17:28:59 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2015-11-25 17:28:59 +0000
commit5ac408da9088842d7c8147666fc31670da630b5a (patch)
tree662e336810de3994aad1ccd1ad854bea72488c24 /src
parent02061addefac8309269a2e7c9f2e588097ed06e7 (diff)
downloadrspamd-5ac408da9088842d7c8147666fc31670da630b5a.tar.gz
rspamd-5ac408da9088842d7c8147666fc31670da630b5a.zip
Implement new fuzzy updates architecture
So far, fuzzy storage can run in multiple processes. However, merely one process is responsible for changes whilst others just work as proxies when dealing with updates. That should fix sqlite concurrency issues.
Diffstat (limited to 'src')
-rw-r--r--src/fuzzy_storage.c187
1 files changed, 171 insertions, 16 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index 0a3ab89af..b1b1b2708 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -78,10 +78,13 @@ struct rspamd_fuzzy_storage_ctx {
gchar *update_map;
guint keypair_cache_size;
struct event_base *ev_base;
+ gint peer_fd;
+ struct event peer_ev;
/* Local keypair */
gpointer key;
struct rspamd_keypair_cache *keypair_cache;
struct rspamd_fuzzy_backend *backend;
+ GQueue *updates_pending;
};
enum fuzzy_cmd_type {
@@ -114,6 +117,18 @@ struct fuzzy_session {
guchar nm[rspamd_cryptobox_MAX_NMBYTES];
};
+struct fuzzy_peer_cmd {
+ union {
+ struct rspamd_fuzzy_cmd normal;
+ struct rspamd_fuzzy_shingle_cmd shingle;
+ } cmd;
+};
+
+struct fuzzy_peer_request {
+ struct event io_ev;
+ struct fuzzy_peer_cmd cmd;
+};
+
static void rspamd_fuzzy_write_reply (struct fuzzy_session *session);
static gboolean
@@ -129,6 +144,35 @@ rspamd_fuzzy_check_client (struct fuzzy_session *session)
}
static void
+rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)
+{
+ GList *cur;
+ struct fuzzy_peer_cmd *cmd;
+ guint nupdates = 0;
+
+ cur = ctx->updates_pending->head;
+ while (cur) {
+ cmd = cur->data;
+
+ if (cmd->cmd.normal.cmd == FUZZY_WRITE) {
+ rspamd_fuzzy_backend_add (ctx->backend, &cmd->cmd.normal);
+ }
+ else {
+ rspamd_fuzzy_backend_del (ctx->backend, &cmd->cmd.normal);
+ }
+
+ g_slice_free1 (sizeof (*cmd), cmd);
+ nupdates ++;
+ cur = g_list_next (cur);
+ }
+
+ g_queue_clear (ctx->updates_pending);
+ server_stat->fuzzy_hashes = rspamd_fuzzy_backend_count (ctx->backend);
+
+ msg_info ("updated fuzzy storage: %ud updates processed", nupdates);
+}
+
+static void
rspamd_fuzzy_reply_io (gint fd, gshort what, gpointer d)
{
struct fuzzy_session *session = d;
@@ -174,25 +218,48 @@ rspamd_fuzzy_write_reply (struct fuzzy_session *session)
}
static void
+fuzzy_peer_send_io (gint fd, gshort what, gpointer d)
+{
+ struct fuzzy_peer_request *up_req = d;
+ gssize r;
+
+ r = write (fd, &up_req->cmd, sizeof (up_req->cmd));
+
+ if (r != sizeof (up_req->cmd)) {
+ msg_err ("cannot send update request to the peer: %s", strerror (errno));
+ }
+
+ event_del (&up_req->io_ev);
+ g_slice_free1 (sizeof (*up_req), up_req);
+}
+
+static void
rspamd_fuzzy_process_command (struct fuzzy_session *session)
{
- gboolean res = FALSE, encrypted = FALSE;
+ gboolean encrypted = FALSE;
struct rspamd_fuzzy_cmd *cmd;
struct rspamd_fuzzy_reply result;
+ struct fuzzy_peer_cmd *up_cmd;
+ struct fuzzy_peer_request *up_req;
+ gsize up_len;
switch (session->cmd_type) {
case CMD_NORMAL:
cmd = &session->cmd.normal;
+ up_len = sizeof (session->cmd.normal);
break;
case CMD_SHINGLE:
cmd = &session->cmd.shingle.basic;
+ up_len = sizeof (session->cmd.shingle);
break;
case CMD_ENCRYPTED_NORMAL:
cmd = &session->cmd.enc_normal.cmd;
+ up_len = sizeof (session->cmd.normal);
encrypted = TRUE;
break;
case CMD_ENCRYPTED_SHINGLE:
cmd = &session->cmd.enc_shingle.cmd.basic;
+ up_len = sizeof (session->cmd.shingle);
encrypted = TRUE;
break;
}
@@ -210,27 +277,30 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
else {
result.flag = cmd->flag;
if (rspamd_fuzzy_check_client (session)) {
- if (cmd->cmd == FUZZY_WRITE) {
- res = rspamd_fuzzy_backend_add (session->ctx->backend, cmd);
- }
- else {
- res = rspamd_fuzzy_backend_del (session->ctx->backend, cmd);
- }
- if (!res) {
- result.value = 404;
- result.prob = 0.0;
+
+ if (session->worker->index == 0 || session->ctx->peer_fd == -1) {
+ /* Just add to the queue */
+ up_cmd = g_slice_alloc (sizeof (*up_cmd));
+ memcpy (up_cmd, cmd, up_len);
+ g_queue_push_tail (session->ctx->updates_pending, up_cmd);
}
else {
- result.value = 0;
- result.prob = 1.0;
+ /* We need to send request to the peer */
+ up_req = g_slice_alloc (sizeof (*up_req));
+ memcpy (&up_req->cmd, cmd, up_len);
+ event_set (&up_req->io_ev, session->ctx->peer_fd, EV_WRITE,
+ fuzzy_peer_send_io, up_req);
+ event_base_set (session->ctx->ev_base, &up_req->io_ev);
+ event_add (&up_req->io_ev, NULL);
}
+
+ result.value = 0;
+ result.prob = 1.0;
}
else {
result.value = 403;
result.prob = 0.0;
}
-
- server_stat->fuzzy_hashes = rspamd_fuzzy_backend_count (session->ctx->backend);
}
result.tag = cmd->tag;
@@ -489,6 +559,7 @@ sync_callback (gint fd, short what, void *arg)
ctx = worker->ctx;
if (ctx->backend) {
+ rspamd_fuzzy_process_updates_queue (ctx);
/* Call backend sync */
old_expired = rspamd_fuzzy_backend_expired (ctx->backend);
rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
@@ -605,6 +676,67 @@ init_fuzzy (struct rspamd_config *cfg)
return ctx;
}
+static void
+rspamd_fuzzy_peer_io (gint fd, gshort what, gpointer d)
+{
+ struct fuzzy_peer_cmd cmd, *pcmd;
+ struct rspamd_fuzzy_storage_ctx *ctx = d;
+ gssize r;
+
+ r = read (fd, &cmd, sizeof (cmd));
+
+ if (r != sizeof (cmd)) {
+ msg_err ("cannot read command from peers: %s", strerror (errno));
+ }
+ else {
+ pcmd = g_slice_alloc (sizeof (*pcmd));
+ memcpy (pcmd, &cmd, sizeof (cmd));
+ g_queue_push_tail (ctx->updates_pending, pcmd);
+ }
+}
+
+static void
+fuzzy_peer_rep (struct rspamd_worker *worker,
+ struct rspamd_srv_reply *rep, gint rep_fd,
+ gpointer ud)
+{
+ struct rspamd_fuzzy_storage_ctx *ctx = ud;
+ GList *cur;
+ gint listen_socket;
+ struct event *accept_event;
+
+ ctx->peer_fd = rep_fd;
+
+ if (rep_fd == -1) {
+ msg_warn ("cannot receive peer fd from the main process");
+ }
+
+ /* Start listening */
+ cur = worker->cf->listen_socks;
+ while (cur) {
+ listen_socket = GPOINTER_TO_INT (cur->data);
+ if (listen_socket != -1) {
+ accept_event = g_slice_alloc0 (sizeof (struct event));
+ event_set (accept_event, listen_socket, EV_READ | EV_PERSIST,
+ accept_fuzzy_socket, worker);
+ event_base_set (ctx->ev_base, accept_event);
+ event_add (accept_event, NULL);
+ worker->accept_events = g_list_prepend (worker->accept_events,
+ accept_event);
+ }
+ cur = g_list_next (cur);
+ }
+
+ if (worker->index == 0 && ctx->peer_fd != -1) {
+ /* Listen for peer requests */
+ event_set (&ctx->peer_ev, ctx->peer_fd, EV_READ | EV_PERSIST,
+ rspamd_fuzzy_peer_io, ctx);
+ event_base_set (ctx->ev_base, &ctx->peer_ev);
+ event_add (&ctx->peer_ev, NULL);
+ ctx->updates_pending = g_queue_new ();
+ }
+}
+
/*
* Start worker process
*/
@@ -614,10 +746,12 @@ start_fuzzy (struct rspamd_worker *worker)
struct rspamd_fuzzy_storage_ctx *ctx = worker->ctx;
GError *err = NULL;
gdouble next_check;
+ struct rspamd_srv_command srv_cmd;
ctx->ev_base = rspamd_prepare_worker (worker,
"fuzzy",
- accept_fuzzy_socket);
+ NULL);
+ ctx->peer_fd = -1;
server_stat = worker->srv->stat;
/*
@@ -666,13 +800,34 @@ start_fuzzy (struct rspamd_worker *worker)
/* Maps events */
rspamd_map_watch (worker->srv->cfg, ctx->ev_base);
+ /* Get peer pipe */
+ srv_cmd.type = RSPAMD_SRV_SOCKETPAIR;
+ srv_cmd.id = ottery_rand_uint64 ();
+ srv_cmd.cmd.spair.af = SOCK_DGRAM;
+ srv_cmd.cmd.spair.pair_num = worker->index;
+ memset (srv_cmd.cmd.spair.pair_id, 0, sizeof (srv_cmd.cmd.spair.pair_id));
+ memcpy (srv_cmd.cmd.spair.pair_id, "fuzzy", sizeof ("fuzzy"));
+
+ rspamd_srv_send_command (worker, ctx->ev_base, &srv_cmd, fuzzy_peer_rep, ctx);
+
event_base_loop (ctx->ev_base, 0);
rspamd_worker_block_signals ();
- rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
+ if (worker->index == 0) {
+ rspamd_fuzzy_process_updates_queue (ctx);
+ rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
+ }
+
rspamd_fuzzy_backend_close (ctx->backend);
rspamd_log_close (worker->srv->logger);
+ if (ctx->peer_fd != -1) {
+ if (worker->index == 0) {
+ event_del (&ctx->peer_ev);
+ }
+ close (ctx->peer_fd);
+ }
+
if (ctx->keypair_cache) {
rspamd_keypair_cache_destroy (ctx->keypair_cache);
}