diff options
Diffstat (limited to 'src/fuzzy_storage.c')
-rw-r--r-- | src/fuzzy_storage.c | 187 |
1 files changed, 171 insertions, 16 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index 0a3ab89af..b1b1b2708 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -78,10 +78,13 @@ struct rspamd_fuzzy_storage_ctx { gchar *update_map; guint keypair_cache_size; struct event_base *ev_base; + gint peer_fd; + struct event peer_ev; /* Local keypair */ gpointer key; struct rspamd_keypair_cache *keypair_cache; struct rspamd_fuzzy_backend *backend; + GQueue *updates_pending; }; enum fuzzy_cmd_type { @@ -114,6 +117,18 @@ struct fuzzy_session { guchar nm[rspamd_cryptobox_MAX_NMBYTES]; }; +struct fuzzy_peer_cmd { + union { + struct rspamd_fuzzy_cmd normal; + struct rspamd_fuzzy_shingle_cmd shingle; + } cmd; +}; + +struct fuzzy_peer_request { + struct event io_ev; + struct fuzzy_peer_cmd cmd; +}; + static void rspamd_fuzzy_write_reply (struct fuzzy_session *session); static gboolean @@ -129,6 +144,35 @@ rspamd_fuzzy_check_client (struct fuzzy_session *session) } static void +rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx) +{ + GList *cur; + struct fuzzy_peer_cmd *cmd; + guint nupdates = 0; + + cur = ctx->updates_pending->head; + while (cur) { + cmd = cur->data; + + if (cmd->cmd.normal.cmd == FUZZY_WRITE) { + rspamd_fuzzy_backend_add (ctx->backend, &cmd->cmd.normal); + } + else { + rspamd_fuzzy_backend_del (ctx->backend, &cmd->cmd.normal); + } + + g_slice_free1 (sizeof (*cmd), cmd); + nupdates ++; + cur = g_list_next (cur); + } + + g_queue_clear (ctx->updates_pending); + server_stat->fuzzy_hashes = rspamd_fuzzy_backend_count (ctx->backend); + + msg_info ("updated fuzzy storage: %ud updates processed", nupdates); +} + +static void rspamd_fuzzy_reply_io (gint fd, gshort what, gpointer d) { struct fuzzy_session *session = d; @@ -174,25 +218,48 @@ rspamd_fuzzy_write_reply (struct fuzzy_session *session) } static void +fuzzy_peer_send_io (gint fd, gshort what, gpointer d) +{ + struct fuzzy_peer_request *up_req = d; + gssize r; + + r = write (fd, &up_req->cmd, sizeof (up_req->cmd)); + + if (r != sizeof (up_req->cmd)) { + msg_err ("cannot send update request to the peer: %s", strerror (errno)); + } + + event_del (&up_req->io_ev); + g_slice_free1 (sizeof (*up_req), up_req); +} + +static void rspamd_fuzzy_process_command (struct fuzzy_session *session) { - gboolean res = FALSE, encrypted = FALSE; + gboolean encrypted = FALSE; struct rspamd_fuzzy_cmd *cmd; struct rspamd_fuzzy_reply result; + struct fuzzy_peer_cmd *up_cmd; + struct fuzzy_peer_request *up_req; + gsize up_len; switch (session->cmd_type) { case CMD_NORMAL: cmd = &session->cmd.normal; + up_len = sizeof (session->cmd.normal); break; case CMD_SHINGLE: cmd = &session->cmd.shingle.basic; + up_len = sizeof (session->cmd.shingle); break; case CMD_ENCRYPTED_NORMAL: cmd = &session->cmd.enc_normal.cmd; + up_len = sizeof (session->cmd.normal); encrypted = TRUE; break; case CMD_ENCRYPTED_SHINGLE: cmd = &session->cmd.enc_shingle.cmd.basic; + up_len = sizeof (session->cmd.shingle); encrypted = TRUE; break; } @@ -210,27 +277,30 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session) else { result.flag = cmd->flag; if (rspamd_fuzzy_check_client (session)) { - if (cmd->cmd == FUZZY_WRITE) { - res = rspamd_fuzzy_backend_add (session->ctx->backend, cmd); - } - else { - res = rspamd_fuzzy_backend_del (session->ctx->backend, cmd); - } - if (!res) { - result.value = 404; - result.prob = 0.0; + + if (session->worker->index == 0 || session->ctx->peer_fd == -1) { + /* Just add to the queue */ + up_cmd = g_slice_alloc (sizeof (*up_cmd)); + memcpy (up_cmd, cmd, up_len); + g_queue_push_tail (session->ctx->updates_pending, up_cmd); } else { - result.value = 0; - result.prob = 1.0; + /* We need to send request to the peer */ + up_req = g_slice_alloc (sizeof (*up_req)); + memcpy (&up_req->cmd, cmd, up_len); + event_set (&up_req->io_ev, session->ctx->peer_fd, EV_WRITE, + fuzzy_peer_send_io, up_req); + event_base_set (session->ctx->ev_base, &up_req->io_ev); + event_add (&up_req->io_ev, NULL); } + + result.value = 0; + result.prob = 1.0; } else { result.value = 403; result.prob = 0.0; } - - server_stat->fuzzy_hashes = rspamd_fuzzy_backend_count (session->ctx->backend); } result.tag = cmd->tag; @@ -489,6 +559,7 @@ sync_callback (gint fd, short what, void *arg) ctx = worker->ctx; if (ctx->backend) { + rspamd_fuzzy_process_updates_queue (ctx); /* Call backend sync */ old_expired = rspamd_fuzzy_backend_expired (ctx->backend); rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE); @@ -605,6 +676,67 @@ init_fuzzy (struct rspamd_config *cfg) return ctx; } +static void +rspamd_fuzzy_peer_io (gint fd, gshort what, gpointer d) +{ + struct fuzzy_peer_cmd cmd, *pcmd; + struct rspamd_fuzzy_storage_ctx *ctx = d; + gssize r; + + r = read (fd, &cmd, sizeof (cmd)); + + if (r != sizeof (cmd)) { + msg_err ("cannot read command from peers: %s", strerror (errno)); + } + else { + pcmd = g_slice_alloc (sizeof (*pcmd)); + memcpy (pcmd, &cmd, sizeof (cmd)); + g_queue_push_tail (ctx->updates_pending, pcmd); + } +} + +static void +fuzzy_peer_rep (struct rspamd_worker *worker, + struct rspamd_srv_reply *rep, gint rep_fd, + gpointer ud) +{ + struct rspamd_fuzzy_storage_ctx *ctx = ud; + GList *cur; + gint listen_socket; + struct event *accept_event; + + ctx->peer_fd = rep_fd; + + if (rep_fd == -1) { + msg_warn ("cannot receive peer fd from the main process"); + } + + /* Start listening */ + cur = worker->cf->listen_socks; + while (cur) { + listen_socket = GPOINTER_TO_INT (cur->data); + if (listen_socket != -1) { + accept_event = g_slice_alloc0 (sizeof (struct event)); + event_set (accept_event, listen_socket, EV_READ | EV_PERSIST, + accept_fuzzy_socket, worker); + event_base_set (ctx->ev_base, accept_event); + event_add (accept_event, NULL); + worker->accept_events = g_list_prepend (worker->accept_events, + accept_event); + } + cur = g_list_next (cur); + } + + if (worker->index == 0 && ctx->peer_fd != -1) { + /* Listen for peer requests */ + event_set (&ctx->peer_ev, ctx->peer_fd, EV_READ | EV_PERSIST, + rspamd_fuzzy_peer_io, ctx); + event_base_set (ctx->ev_base, &ctx->peer_ev); + event_add (&ctx->peer_ev, NULL); + ctx->updates_pending = g_queue_new (); + } +} + /* * Start worker process */ @@ -614,10 +746,12 @@ start_fuzzy (struct rspamd_worker *worker) struct rspamd_fuzzy_storage_ctx *ctx = worker->ctx; GError *err = NULL; gdouble next_check; + struct rspamd_srv_command srv_cmd; ctx->ev_base = rspamd_prepare_worker (worker, "fuzzy", - accept_fuzzy_socket); + NULL); + ctx->peer_fd = -1; server_stat = worker->srv->stat; /* @@ -666,13 +800,34 @@ start_fuzzy (struct rspamd_worker *worker) /* Maps events */ rspamd_map_watch (worker->srv->cfg, ctx->ev_base); + /* Get peer pipe */ + srv_cmd.type = RSPAMD_SRV_SOCKETPAIR; + srv_cmd.id = ottery_rand_uint64 (); + srv_cmd.cmd.spair.af = SOCK_DGRAM; + srv_cmd.cmd.spair.pair_num = worker->index; + memset (srv_cmd.cmd.spair.pair_id, 0, sizeof (srv_cmd.cmd.spair.pair_id)); + memcpy (srv_cmd.cmd.spair.pair_id, "fuzzy", sizeof ("fuzzy")); + + rspamd_srv_send_command (worker, ctx->ev_base, &srv_cmd, fuzzy_peer_rep, ctx); + event_base_loop (ctx->ev_base, 0); rspamd_worker_block_signals (); - rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE); + if (worker->index == 0) { + rspamd_fuzzy_process_updates_queue (ctx); + rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE); + } + rspamd_fuzzy_backend_close (ctx->backend); rspamd_log_close (worker->srv->logger); + if (ctx->peer_fd != -1) { + if (worker->index == 0) { + event_del (&ctx->peer_ev); + } + close (ctx->peer_fd); + } + if (ctx->keypair_cache) { rspamd_keypair_cache_destroy (ctx->keypair_cache); } |