]> source.dussan.org Git - rspamd.git/commitdiff
Add event for loading of hyperscan files
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 8 Dec 2015 18:17:32 +0000 (18:17 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 8 Dec 2015 18:17:32 +0000 (18:17 +0000)
src/libserver/re_cache.c
src/libserver/rspamd_control.c
src/libserver/rspamd_control.h

index c893ceb0289f6835ec29be5d2b07b998673654c5..3e67fd9ed4a1365e3d4997bb4c6fb2c722879891 100644 (file)
@@ -1105,7 +1105,7 @@ rspamd_re_cache_load_hyperscan (struct rspamd_re_cache *cache,
        return FALSE;
 #else
        gchar path[PATH_MAX];
-       gint fd, i, n, *hs_ids = NULL;
+       gint fd, i, n, *hs_ids = NULL, total = 0;
        GHashTableIter it;
        gpointer k, v;
        guint8 *map, *p, *end;
@@ -1154,6 +1154,7 @@ rspamd_re_cache_load_hyperscan (struct rspamd_re_cache *cache,
                                return FALSE;
                        }
 
+                       total += n;
                        p += sizeof (n);
                        hs_ids = g_malloc (n * sizeof (*hs_ids));
                        memcpy (hs_ids, p, n * sizeof (*hs_ids));
@@ -1194,6 +1195,8 @@ rspamd_re_cache_load_hyperscan (struct rspamd_re_cache *cache,
                }
        }
 
+       msg_info_re_cache ("hyperscan database of %d regexps has been loaded", total);
+
        return TRUE;
 #endif
 }
index 5ba24d0561a6d295c1773bd8619abc2ca3dbb715..8b85c745a4d11295cd458143254753f2e9c21004 100644 (file)
@@ -48,7 +48,7 @@ struct rspamd_control_reply_elt {
        struct rspamd_control_reply reply;
        struct event io_ev;
        struct rspamd_worker *wrk;
-       struct rspamd_control_session *session;
+       gpointer ud;
        struct rspamd_control_reply_elt *prev, *next;
 };
 
@@ -182,7 +182,7 @@ rspamd_control_write_reply (struct rspamd_control_session *session)
                ucl_object_insert_key (cur, ucl_object_fromstring (g_quark_to_string (
                                elt->wrk->type)), "type", 0, false);
 
-               switch (elt->session->cmd.type) {
+               switch (session->cmd.type) {
                case RSPAMD_CONTROL_STAT:
                        ucl_object_insert_key (cur, ucl_object_fromint (
                                        elt->reply.reply.stat.conns), "conns", 0, false);
@@ -243,6 +243,9 @@ static void
 rspamd_control_wrk_io (gint fd, short what, gpointer ud)
 {
        struct rspamd_control_reply_elt *elt = ud;
+       struct rspamd_control_session *session;
+
+       session = elt->ud;
 
        if (read (elt->wrk->control_pipe[0], &elt->reply, sizeof (elt->reply)) !=
                                sizeof (elt->reply)) {
@@ -250,11 +253,11 @@ rspamd_control_wrk_io (gint fd, short what, gpointer ud)
                                elt->wrk->pid, g_quark_to_string (elt->wrk->type), strerror (errno));
        }
 
-       elt->session->replies_remain --;
+       session->replies_remain --;
        event_del (&elt->io_ev);
 
-       if (elt->session->replies_remain == 0) {
-               rspamd_control_write_reply (elt->session);
+       if (session->replies_remain == 0) {
+               rspamd_control_write_reply (session);
        }
 }
 
@@ -273,18 +276,55 @@ rspamd_control_error_handler (struct rspamd_http_connection *conn, GError *err)
        }
 }
 
+static struct rspamd_control_reply_elt *
+rspamd_control_broadcast_cmd (struct rspamd_main *rspamd_main,
+               struct rspamd_control_command *cmd,
+               void (*handler) (evutil_socket_t, short, void *), gpointer ud)
+{
+       GHashTableIter it;
+       struct rspamd_worker *wrk;
+       struct rspamd_control_reply_elt *rep_elt, *res = NULL;
+       gpointer k, v;
+
+       g_hash_table_iter_init (&it, rspamd_main->workers);
+
+       while (g_hash_table_iter_next (&it, &k, &v)) {
+               wrk = v;
+
+               if (write (wrk->control_pipe[0], cmd,
+                               sizeof (*cmd)) == sizeof (*cmd)) {
+
+                       rep_elt = g_slice_alloc0 (sizeof (*rep_elt));
+                       rep_elt->wrk = wrk;
+                       rep_elt->ud = ud;
+                       event_set (&rep_elt->io_ev, wrk->control_pipe[0],
+                                       EV_READ | EV_PERSIST, handler,
+                                       rep_elt);
+                       event_base_set (rspamd_main->ev_base,
+                                       &rep_elt->io_ev);
+                       event_add (&rep_elt->io_ev, &worker_io_timeout);
+
+                       DL_APPEND (res, rep_elt);
+               }
+               else {
+                       msg_err ("cannot write request to the worker %P (%s): %s",
+                                       wrk->pid, g_quark_to_string (wrk->type), strerror (errno));
+               }
+       }
+
+       return res;
+}
+
 static gint
-rspamd_control_finish_hadler (struct rspamd_http_connection *conn,
+rspamd_control_finish_handler (struct rspamd_http_connection *conn,
                struct rspamd_http_message *msg)
 {
        struct rspamd_control_session *session = conn->ud;
        rspamd_ftok_t srch;
        guint i;
        gboolean found = FALSE;
-       GHashTableIter it;
-       struct rspamd_worker *wrk;
-       struct rspamd_control_reply_elt *rep_elt;
-       gpointer k, v;
+       struct rspamd_control_reply_elt *cur;
+
 
        if (!session->is_reply) {
                if (msg->url == NULL) {
@@ -311,31 +351,12 @@ rspamd_control_finish_hadler (struct rspamd_http_connection *conn,
                }
                else {
                        /* Send command to all workers */
-                       g_hash_table_iter_init (&it, session->rspamd_main->workers);
-
-                       while (g_hash_table_iter_next (&it, &k, &v)) {
-                               wrk = v;
-
-                               if (write (wrk->control_pipe[0], &session->cmd,
-                                               sizeof (session->cmd)) == sizeof (session->cmd)) {
-
-                                       rep_elt = g_slice_alloc0 (sizeof (*rep_elt));
-                                       rep_elt->wrk = wrk;
-                                       rep_elt->session = session;
-                                       event_set (&rep_elt->io_ev, wrk->control_pipe[0],
-                                                       EV_READ | EV_PERSIST, rspamd_control_wrk_io,
-                                                       rep_elt);
-                                       event_base_set (session->rspamd_main->ev_base,
-                                                       &rep_elt->io_ev);
-                                       event_add (&rep_elt->io_ev, &worker_io_timeout);
-
-                                       DL_APPEND (session->replies, rep_elt);
-                                       session->replies_remain ++;
-                               }
-                               else {
-                                       msg_err ("cannot write request to the worker %P (%s): %s",
-                                               wrk->pid, g_quark_to_string (wrk->type), strerror (errno));
-                               }
+                       session->replies = rspamd_control_broadcast_cmd (
+                                       session->rspamd_main, &session->cmd,
+                                       rspamd_control_wrk_io, session);
+
+                       DL_FOREACH (session->replies, cur) {
+                               session->replies_remain ++;
                        }
                }
        }
@@ -357,7 +378,7 @@ rspamd_control_process_client_socket (struct rspamd_main *rspamd_main,
 
        session->fd = fd;
        session->conn = rspamd_http_connection_new (NULL, rspamd_control_error_handler,
-                       rspamd_control_finish_hadler, 0, RSPAMD_HTTP_SERVER, NULL);
+                       rspamd_control_finish_handler, 0, RSPAMD_HTTP_SERVER, NULL);
        session->rspamd_main = rspamd_main;
        rspamd_http_connection_read_message (session->conn, session, session->fd,
                        &io_timeout, rspamd_main->ev_base);
@@ -403,6 +424,7 @@ rspamd_control_default_cmd_handler (gint fd,
                break;
        case RSPAMD_CONTROL_RELOAD:
        case RSPAMD_CONTROL_RECOMPILE:
+       case RSPAMD_CONTROL_HYPERSCAN_LOADED:
                break;
        case RSPAMD_CONTROL_RERESOLVE:
                if (cd->worker->srv->cfg) {
@@ -504,6 +526,16 @@ struct rspamd_srv_reply_data {
        struct rspamd_srv_reply rep;
 };
 
+static void
+rspamd_control_hs_io_handler (gint fd, short what, gpointer ud)
+{
+       struct rspamd_control_reply_elt *elt = ud;
+
+       /* At this point we just ignore replies from the workers */
+       event_del (&elt->io_ev);
+       g_slice_free1 (sizeof (*elt), elt);
+}
+
 static void
 rspamd_srv_handler (gint fd, short what, gpointer ud)
 {
@@ -517,6 +549,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
        guchar fdspace[CMSG_SPACE(sizeof (int))];
        gint *spair;
        gchar *nid;
+       struct rspamd_control_command wcmd;
        gssize r;
 
        if (what == EV_READ) {
@@ -571,6 +604,18 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
                                        rdata->fd = cmd.cmd.spair.pair_num ? spair[1] : spair[0];
                                }
                                break;
+                       case RSPAMD_SRV_HYPERSCAN_LOADED:
+                               /* Broadcast command to all workers */
+                               wcmd.type = RSPAMD_CONTROL_HYPERSCAN_LOADED;
+                               /*
+                                * We assume that cache dir is shared at the same address for all
+                                * workers
+                                */
+                               wcmd.cmd.hs_loaded.cache_dir = cmd.cmd.hs_loaded.cache_dir;
+                               rspamd_control_broadcast_cmd (srv, &wcmd,
+                                               rspamd_control_hs_io_handler, NULL);
+                               rdata->rep.reply.hs_loaded.unused = 0;
+                               break;
                        default:
                                msg_err ("unknown command type: %d", cmd.type);
                                break;
@@ -708,7 +753,10 @@ rspamd_srv_request_handler (gint fd, short what, gpointer ud)
        return;
 
 cleanup:
-       rd->handler (rd->worker, &rd->rep, rfd, rd->ud);
+
+       if (rd->handler) {
+               rd->handler (rd->worker, &rd->rep, rfd, rd->ud);
+       }
        event_del (&rd->io_ev);
        g_slice_free1 (sizeof (*rd), rd);
 }
@@ -723,7 +771,6 @@ rspamd_srv_send_command (struct rspamd_worker *worker,
 
        g_assert (cmd != NULL);
        g_assert (worker != NULL);
-       g_assert (handler != NULL);
 
        rd = g_slice_alloc0 (sizeof (*rd));
        memcpy (&rd->cmd, cmd, sizeof (rd->cmd));
index 74597a1ac06a08fc19328c35cc77b14248d7a3f5..4ae83692536a05c16d03c5f846613fa0e4f2ed1f 100644 (file)
@@ -36,11 +36,13 @@ enum rspamd_control_type {
        RSPAMD_CONTROL_RELOAD,
        RSPAMD_CONTROL_RERESOLVE,
        RSPAMD_CONTROL_RECOMPILE,
+       RSPAMD_CONTROL_HYPERSCAN_LOADED,
        RSPAMD_CONTROL_MAX
 };
 
 enum rspamd_srv_type {
        RSPAMD_SRV_SOCKETPAIR = 0,
+       RSPAMD_SRV_HYPERSCAN_LOADED,
 };
 
 struct rspamd_control_command {
@@ -58,6 +60,9 @@ struct rspamd_control_command {
                struct {
                        guint unused;
                } recompile;
+               struct {
+                       gpointer cache_dir;
+               } hs_loaded;
        } cmd;
 };
 
@@ -80,6 +85,9 @@ struct rspamd_control_reply {
                struct {
                        guint status;
                } recompile;
+               struct {
+                       guint status;
+               } hs_loaded;
        } reply;
 };
 
@@ -93,6 +101,9 @@ struct rspamd_srv_command {
                        gchar pair_id[PAIR_ID_LEN];
                        guint pair_num;
                } spair;
+               struct {
+                       gpointer cache_dir;
+               } hs_loaded;
        } cmd;
 };
 
@@ -103,6 +114,9 @@ struct rspamd_srv_reply {
                struct {
                        gint code;
                } spair;
+               struct {
+                       gint unused;
+               } hs_loaded;
        } reply;
 };