aboutsummaryrefslogtreecommitdiffstats
path: root/src/fuzzy_storage.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fuzzy_storage.c')
-rw-r--r--src/fuzzy_storage.c187
1 files changed, 171 insertions, 16 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index 0a3ab89af..b1b1b2708 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -78,10 +78,13 @@ struct rspamd_fuzzy_storage_ctx {
gchar *update_map;
guint keypair_cache_size;
struct event_base *ev_base;
+ gint peer_fd;
+ struct event peer_ev;
/* Local keypair */
gpointer key;
struct rspamd_keypair_cache *keypair_cache;
struct rspamd_fuzzy_backend *backend;
+ GQueue *updates_pending;
};
enum fuzzy_cmd_type {
@@ -114,6 +117,18 @@ struct fuzzy_session {
guchar nm[rspamd_cryptobox_MAX_NMBYTES];
};
+struct fuzzy_peer_cmd {
+ union {
+ struct rspamd_fuzzy_cmd normal;
+ struct rspamd_fuzzy_shingle_cmd shingle;
+ } cmd;
+};
+
+struct fuzzy_peer_request {
+ struct event io_ev;
+ struct fuzzy_peer_cmd cmd;
+};
+
static void rspamd_fuzzy_write_reply (struct fuzzy_session *session);
static gboolean
@@ -129,6 +144,35 @@ rspamd_fuzzy_check_client (struct fuzzy_session *session)
}
static void
+rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)
+{
+ GList *cur;
+ struct fuzzy_peer_cmd *cmd;
+ guint nupdates = 0;
+
+ cur = ctx->updates_pending->head;
+ while (cur) {
+ cmd = cur->data;
+
+ if (cmd->cmd.normal.cmd == FUZZY_WRITE) {
+ rspamd_fuzzy_backend_add (ctx->backend, &cmd->cmd.normal);
+ }
+ else {
+ rspamd_fuzzy_backend_del (ctx->backend, &cmd->cmd.normal);
+ }
+
+ g_slice_free1 (sizeof (*cmd), cmd);
+ nupdates ++;
+ cur = g_list_next (cur);
+ }
+
+ g_queue_clear (ctx->updates_pending);
+ server_stat->fuzzy_hashes = rspamd_fuzzy_backend_count (ctx->backend);
+
+ msg_info ("updated fuzzy storage: %ud updates processed", nupdates);
+}
+
+static void
rspamd_fuzzy_reply_io (gint fd, gshort what, gpointer d)
{
struct fuzzy_session *session = d;
@@ -174,25 +218,48 @@ rspamd_fuzzy_write_reply (struct fuzzy_session *session)
}
static void
+fuzzy_peer_send_io (gint fd, gshort what, gpointer d)
+{
+ struct fuzzy_peer_request *up_req = d;
+ gssize r;
+
+ r = write (fd, &up_req->cmd, sizeof (up_req->cmd));
+
+ if (r != sizeof (up_req->cmd)) {
+ msg_err ("cannot send update request to the peer: %s", strerror (errno));
+ }
+
+ event_del (&up_req->io_ev);
+ g_slice_free1 (sizeof (*up_req), up_req);
+}
+
+static void
rspamd_fuzzy_process_command (struct fuzzy_session *session)
{
- gboolean res = FALSE, encrypted = FALSE;
+ gboolean encrypted = FALSE;
struct rspamd_fuzzy_cmd *cmd;
struct rspamd_fuzzy_reply result;
+ struct fuzzy_peer_cmd *up_cmd;
+ struct fuzzy_peer_request *up_req;
+ gsize up_len;
switch (session->cmd_type) {
case CMD_NORMAL:
cmd = &session->cmd.normal;
+ up_len = sizeof (session->cmd.normal);
break;
case CMD_SHINGLE:
cmd = &session->cmd.shingle.basic;
+ up_len = sizeof (session->cmd.shingle);
break;
case CMD_ENCRYPTED_NORMAL:
cmd = &session->cmd.enc_normal.cmd;
+ up_len = sizeof (session->cmd.normal);
encrypted = TRUE;
break;
case CMD_ENCRYPTED_SHINGLE:
cmd = &session->cmd.enc_shingle.cmd.basic;
+ up_len = sizeof (session->cmd.shingle);
encrypted = TRUE;
break;
}
@@ -210,27 +277,30 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
else {
result.flag = cmd->flag;
if (rspamd_fuzzy_check_client (session)) {
- if (cmd->cmd == FUZZY_WRITE) {
- res = rspamd_fuzzy_backend_add (session->ctx->backend, cmd);
- }
- else {
- res = rspamd_fuzzy_backend_del (session->ctx->backend, cmd);
- }
- if (!res) {
- result.value = 404;
- result.prob = 0.0;
+
+ if (session->worker->index == 0 || session->ctx->peer_fd == -1) {
+ /* Just add to the queue */
+ up_cmd = g_slice_alloc (sizeof (*up_cmd));
+ memcpy (up_cmd, cmd, up_len);
+ g_queue_push_tail (session->ctx->updates_pending, up_cmd);
}
else {
- result.value = 0;
- result.prob = 1.0;
+ /* We need to send request to the peer */
+ up_req = g_slice_alloc (sizeof (*up_req));
+ memcpy (&up_req->cmd, cmd, up_len);
+ event_set (&up_req->io_ev, session->ctx->peer_fd, EV_WRITE,
+ fuzzy_peer_send_io, up_req);
+ event_base_set (session->ctx->ev_base, &up_req->io_ev);
+ event_add (&up_req->io_ev, NULL);
}
+
+ result.value = 0;
+ result.prob = 1.0;
}
else {
result.value = 403;
result.prob = 0.0;
}
-
- server_stat->fuzzy_hashes = rspamd_fuzzy_backend_count (session->ctx->backend);
}
result.tag = cmd->tag;
@@ -489,6 +559,7 @@ sync_callback (gint fd, short what, void *arg)
ctx = worker->ctx;
if (ctx->backend) {
+ rspamd_fuzzy_process_updates_queue (ctx);
/* Call backend sync */
old_expired = rspamd_fuzzy_backend_expired (ctx->backend);
rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
@@ -605,6 +676,67 @@ init_fuzzy (struct rspamd_config *cfg)
return ctx;
}
+static void
+rspamd_fuzzy_peer_io (gint fd, gshort what, gpointer d)
+{
+ struct fuzzy_peer_cmd cmd, *pcmd;
+ struct rspamd_fuzzy_storage_ctx *ctx = d;
+ gssize r;
+
+ r = read (fd, &cmd, sizeof (cmd));
+
+ if (r != sizeof (cmd)) {
+ msg_err ("cannot read command from peers: %s", strerror (errno));
+ }
+ else {
+ pcmd = g_slice_alloc (sizeof (*pcmd));
+ memcpy (pcmd, &cmd, sizeof (cmd));
+ g_queue_push_tail (ctx->updates_pending, pcmd);
+ }
+}
+
+static void
+fuzzy_peer_rep (struct rspamd_worker *worker,
+ struct rspamd_srv_reply *rep, gint rep_fd,
+ gpointer ud)
+{
+ struct rspamd_fuzzy_storage_ctx *ctx = ud;
+ GList *cur;
+ gint listen_socket;
+ struct event *accept_event;
+
+ ctx->peer_fd = rep_fd;
+
+ if (rep_fd == -1) {
+ msg_warn ("cannot receive peer fd from the main process");
+ }
+
+ /* Start listening */
+ cur = worker->cf->listen_socks;
+ while (cur) {
+ listen_socket = GPOINTER_TO_INT (cur->data);
+ if (listen_socket != -1) {
+ accept_event = g_slice_alloc0 (sizeof (struct event));
+ event_set (accept_event, listen_socket, EV_READ | EV_PERSIST,
+ accept_fuzzy_socket, worker);
+ event_base_set (ctx->ev_base, accept_event);
+ event_add (accept_event, NULL);
+ worker->accept_events = g_list_prepend (worker->accept_events,
+ accept_event);
+ }
+ cur = g_list_next (cur);
+ }
+
+ if (worker->index == 0 && ctx->peer_fd != -1) {
+ /* Listen for peer requests */
+ event_set (&ctx->peer_ev, ctx->peer_fd, EV_READ | EV_PERSIST,
+ rspamd_fuzzy_peer_io, ctx);
+ event_base_set (ctx->ev_base, &ctx->peer_ev);
+ event_add (&ctx->peer_ev, NULL);
+ ctx->updates_pending = g_queue_new ();
+ }
+}
+
/*
* Start worker process
*/
@@ -614,10 +746,12 @@ start_fuzzy (struct rspamd_worker *worker)
struct rspamd_fuzzy_storage_ctx *ctx = worker->ctx;
GError *err = NULL;
gdouble next_check;
+ struct rspamd_srv_command srv_cmd;
ctx->ev_base = rspamd_prepare_worker (worker,
"fuzzy",
- accept_fuzzy_socket);
+ NULL);
+ ctx->peer_fd = -1;
server_stat = worker->srv->stat;
/*
@@ -666,13 +800,34 @@ start_fuzzy (struct rspamd_worker *worker)
/* Maps events */
rspamd_map_watch (worker->srv->cfg, ctx->ev_base);
+ /* Get peer pipe */
+ srv_cmd.type = RSPAMD_SRV_SOCKETPAIR;
+ srv_cmd.id = ottery_rand_uint64 ();
+ srv_cmd.cmd.spair.af = SOCK_DGRAM;
+ srv_cmd.cmd.spair.pair_num = worker->index;
+ memset (srv_cmd.cmd.spair.pair_id, 0, sizeof (srv_cmd.cmd.spair.pair_id));
+ memcpy (srv_cmd.cmd.spair.pair_id, "fuzzy", sizeof ("fuzzy"));
+
+ rspamd_srv_send_command (worker, ctx->ev_base, &srv_cmd, fuzzy_peer_rep, ctx);
+
event_base_loop (ctx->ev_base, 0);
rspamd_worker_block_signals ();
- rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
+ if (worker->index == 0) {
+ rspamd_fuzzy_process_updates_queue (ctx);
+ rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
+ }
+
rspamd_fuzzy_backend_close (ctx->backend);
rspamd_log_close (worker->srv->logger);
+ if (ctx->peer_fd != -1) {
+ if (worker->index == 0) {
+ event_del (&ctx->peer_ev);
+ }
+ close (ctx->peer_fd);
+ }
+
if (ctx->keypair_cache) {
rspamd_keypair_cache_destroy (ctx->keypair_cache);
}