From eccc8fb354cc26ae171d43d6a25c5ffcdb861263 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Tue, 8 Dec 2015 18:17:32 +0000 Subject: [PATCH] Add event for loading of hyperscan files --- src/libserver/re_cache.c | 5 +- src/libserver/rspamd_control.c | 123 +++++++++++++++++++++++---------- src/libserver/rspamd_control.h | 14 ++++ 3 files changed, 103 insertions(+), 39 deletions(-) diff --git a/src/libserver/re_cache.c b/src/libserver/re_cache.c index c893ceb02..3e67fd9ed 100644 --- a/src/libserver/re_cache.c +++ b/src/libserver/re_cache.c @@ -1105,7 +1105,7 @@ rspamd_re_cache_load_hyperscan (struct rspamd_re_cache *cache, return FALSE; #else gchar path[PATH_MAX]; - gint fd, i, n, *hs_ids = NULL; + gint fd, i, n, *hs_ids = NULL, total = 0; GHashTableIter it; gpointer k, v; guint8 *map, *p, *end; @@ -1154,6 +1154,7 @@ rspamd_re_cache_load_hyperscan (struct rspamd_re_cache *cache, return FALSE; } + total += n; p += sizeof (n); hs_ids = g_malloc (n * sizeof (*hs_ids)); memcpy (hs_ids, p, n * sizeof (*hs_ids)); @@ -1194,6 +1195,8 @@ rspamd_re_cache_load_hyperscan (struct rspamd_re_cache *cache, } } + msg_info_re_cache ("hyperscan database of %d regexps has been loaded", total); + return TRUE; #endif } diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c index 5ba24d056..8b85c745a 100644 --- a/src/libserver/rspamd_control.c +++ b/src/libserver/rspamd_control.c @@ -48,7 +48,7 @@ struct rspamd_control_reply_elt { struct rspamd_control_reply reply; struct event io_ev; struct rspamd_worker *wrk; - struct rspamd_control_session *session; + gpointer ud; struct rspamd_control_reply_elt *prev, *next; }; @@ -182,7 +182,7 @@ rspamd_control_write_reply (struct rspamd_control_session *session) ucl_object_insert_key (cur, ucl_object_fromstring (g_quark_to_string ( elt->wrk->type)), "type", 0, false); - switch (elt->session->cmd.type) { + switch (session->cmd.type) { case RSPAMD_CONTROL_STAT: ucl_object_insert_key (cur, ucl_object_fromint ( elt->reply.reply.stat.conns), "conns", 0, false); @@ -243,6 +243,9 @@ static void rspamd_control_wrk_io (gint fd, short what, gpointer ud) { struct rspamd_control_reply_elt *elt = ud; + struct rspamd_control_session *session; + + session = elt->ud; if (read (elt->wrk->control_pipe[0], &elt->reply, sizeof (elt->reply)) != sizeof (elt->reply)) { @@ -250,11 +253,11 @@ rspamd_control_wrk_io (gint fd, short what, gpointer ud) elt->wrk->pid, g_quark_to_string (elt->wrk->type), strerror (errno)); } - elt->session->replies_remain --; + session->replies_remain --; event_del (&elt->io_ev); - if (elt->session->replies_remain == 0) { - rspamd_control_write_reply (elt->session); + if (session->replies_remain == 0) { + rspamd_control_write_reply (session); } } @@ -273,18 +276,55 @@ rspamd_control_error_handler (struct rspamd_http_connection *conn, GError *err) } } +static struct rspamd_control_reply_elt * +rspamd_control_broadcast_cmd (struct rspamd_main *rspamd_main, + struct rspamd_control_command *cmd, + void (*handler) (evutil_socket_t, short, void *), gpointer ud) +{ + GHashTableIter it; + struct rspamd_worker *wrk; + struct rspamd_control_reply_elt *rep_elt, *res = NULL; + gpointer k, v; + + g_hash_table_iter_init (&it, rspamd_main->workers); + + while (g_hash_table_iter_next (&it, &k, &v)) { + wrk = v; + + if (write (wrk->control_pipe[0], cmd, + sizeof (*cmd)) == sizeof (*cmd)) { + + rep_elt = g_slice_alloc0 (sizeof (*rep_elt)); + rep_elt->wrk = wrk; + rep_elt->ud = ud; + event_set (&rep_elt->io_ev, wrk->control_pipe[0], + EV_READ | EV_PERSIST, handler, + rep_elt); + event_base_set (rspamd_main->ev_base, + &rep_elt->io_ev); + event_add (&rep_elt->io_ev, &worker_io_timeout); + + DL_APPEND (res, rep_elt); + } + else { + msg_err ("cannot write request to the worker %P (%s): %s", + wrk->pid, g_quark_to_string (wrk->type), strerror (errno)); + } + } + + return res; +} + static gint -rspamd_control_finish_hadler (struct rspamd_http_connection *conn, +rspamd_control_finish_handler (struct rspamd_http_connection *conn, struct rspamd_http_message *msg) { struct rspamd_control_session *session = conn->ud; rspamd_ftok_t srch; guint i; gboolean found = FALSE; - GHashTableIter it; - struct rspamd_worker *wrk; - struct rspamd_control_reply_elt *rep_elt; - gpointer k, v; + struct rspamd_control_reply_elt *cur; + if (!session->is_reply) { if (msg->url == NULL) { @@ -311,31 +351,12 @@ rspamd_control_finish_hadler (struct rspamd_http_connection *conn, } else { /* Send command to all workers */ - g_hash_table_iter_init (&it, session->rspamd_main->workers); - - while (g_hash_table_iter_next (&it, &k, &v)) { - wrk = v; - - if (write (wrk->control_pipe[0], &session->cmd, - sizeof (session->cmd)) == sizeof (session->cmd)) { - - rep_elt = g_slice_alloc0 (sizeof (*rep_elt)); - rep_elt->wrk = wrk; - rep_elt->session = session; - event_set (&rep_elt->io_ev, wrk->control_pipe[0], - EV_READ | EV_PERSIST, rspamd_control_wrk_io, - rep_elt); - event_base_set (session->rspamd_main->ev_base, - &rep_elt->io_ev); - event_add (&rep_elt->io_ev, &worker_io_timeout); - - DL_APPEND (session->replies, rep_elt); - session->replies_remain ++; - } - else { - msg_err ("cannot write request to the worker %P (%s): %s", - wrk->pid, g_quark_to_string (wrk->type), strerror (errno)); - } + session->replies = rspamd_control_broadcast_cmd ( + session->rspamd_main, &session->cmd, + rspamd_control_wrk_io, session); + + DL_FOREACH (session->replies, cur) { + session->replies_remain ++; } } } @@ -357,7 +378,7 @@ rspamd_control_process_client_socket (struct rspamd_main *rspamd_main, session->fd = fd; session->conn = rspamd_http_connection_new (NULL, rspamd_control_error_handler, - rspamd_control_finish_hadler, 0, RSPAMD_HTTP_SERVER, NULL); + rspamd_control_finish_handler, 0, RSPAMD_HTTP_SERVER, NULL); session->rspamd_main = rspamd_main; rspamd_http_connection_read_message (session->conn, session, session->fd, &io_timeout, rspamd_main->ev_base); @@ -403,6 +424,7 @@ rspamd_control_default_cmd_handler (gint fd, break; case RSPAMD_CONTROL_RELOAD: case RSPAMD_CONTROL_RECOMPILE: + case RSPAMD_CONTROL_HYPERSCAN_LOADED: break; case RSPAMD_CONTROL_RERESOLVE: if (cd->worker->srv->cfg) { @@ -504,6 +526,16 @@ struct rspamd_srv_reply_data { struct rspamd_srv_reply rep; }; +static void +rspamd_control_hs_io_handler (gint fd, short what, gpointer ud) +{ + struct rspamd_control_reply_elt *elt = ud; + + /* At this point we just ignore replies from the workers */ + event_del (&elt->io_ev); + g_slice_free1 (sizeof (*elt), elt); +} + static void rspamd_srv_handler (gint fd, short what, gpointer ud) { @@ -517,6 +549,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud) guchar fdspace[CMSG_SPACE(sizeof (int))]; gint *spair; gchar *nid; + struct rspamd_control_command wcmd; gssize r; if (what == EV_READ) { @@ -571,6 +604,18 @@ rspamd_srv_handler (gint fd, short what, gpointer ud) rdata->fd = cmd.cmd.spair.pair_num ? spair[1] : spair[0]; } break; + case RSPAMD_SRV_HYPERSCAN_LOADED: + /* Broadcast command to all workers */ + wcmd.type = RSPAMD_CONTROL_HYPERSCAN_LOADED; + /* + * We assume that cache dir is shared at the same address for all + * workers + */ + wcmd.cmd.hs_loaded.cache_dir = cmd.cmd.hs_loaded.cache_dir; + rspamd_control_broadcast_cmd (srv, &wcmd, + rspamd_control_hs_io_handler, NULL); + rdata->rep.reply.hs_loaded.unused = 0; + break; default: msg_err ("unknown command type: %d", cmd.type); break; @@ -708,7 +753,10 @@ rspamd_srv_request_handler (gint fd, short what, gpointer ud) return; cleanup: - rd->handler (rd->worker, &rd->rep, rfd, rd->ud); + + if (rd->handler) { + rd->handler (rd->worker, &rd->rep, rfd, rd->ud); + } event_del (&rd->io_ev); g_slice_free1 (sizeof (*rd), rd); } @@ -723,7 +771,6 @@ rspamd_srv_send_command (struct rspamd_worker *worker, g_assert (cmd != NULL); g_assert (worker != NULL); - g_assert (handler != NULL); rd = g_slice_alloc0 (sizeof (*rd)); memcpy (&rd->cmd, cmd, sizeof (rd->cmd)); diff --git a/src/libserver/rspamd_control.h b/src/libserver/rspamd_control.h index 74597a1ac..4ae836925 100644 --- a/src/libserver/rspamd_control.h +++ b/src/libserver/rspamd_control.h @@ -36,11 +36,13 @@ enum rspamd_control_type { RSPAMD_CONTROL_RELOAD, RSPAMD_CONTROL_RERESOLVE, RSPAMD_CONTROL_RECOMPILE, + RSPAMD_CONTROL_HYPERSCAN_LOADED, RSPAMD_CONTROL_MAX }; enum rspamd_srv_type { RSPAMD_SRV_SOCKETPAIR = 0, + RSPAMD_SRV_HYPERSCAN_LOADED, }; struct rspamd_control_command { @@ -58,6 +60,9 @@ struct rspamd_control_command { struct { guint unused; } recompile; + struct { + gpointer cache_dir; + } hs_loaded; } cmd; }; @@ -80,6 +85,9 @@ struct rspamd_control_reply { struct { guint status; } recompile; + struct { + guint status; + } hs_loaded; } reply; }; @@ -93,6 +101,9 @@ struct rspamd_srv_command { gchar pair_id[PAIR_ID_LEN]; guint pair_num; } spair; + struct { + gpointer cache_dir; + } hs_loaded; } cmd; }; @@ -103,6 +114,9 @@ struct rspamd_srv_reply { struct { gint code; } spair; + struct { + gint unused; + } hs_loaded; } reply; }; -- 2.39.5