summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2015-12-08 18:17:32 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2015-12-08 18:17:32 +0000
commiteccc8fb354cc26ae171d43d6a25c5ffcdb861263 (patch)
treeadba1bfca99280ee391819e2bd153e78fbdf827b /src
parent37b7b0c2812cca78bad63e8f15842c7cc8d891f1 (diff)
downloadrspamd-eccc8fb354cc26ae171d43d6a25c5ffcdb861263.tar.gz
rspamd-eccc8fb354cc26ae171d43d6a25c5ffcdb861263.zip
Add event for loading of hyperscan files
Diffstat (limited to 'src')
-rw-r--r--src/libserver/re_cache.c5
-rw-r--r--src/libserver/rspamd_control.c123
-rw-r--r--src/libserver/rspamd_control.h14
3 files changed, 103 insertions, 39 deletions
diff --git a/src/libserver/re_cache.c b/src/libserver/re_cache.c
index c893ceb02..3e67fd9ed 100644
--- a/src/libserver/re_cache.c
+++ b/src/libserver/re_cache.c
@@ -1105,7 +1105,7 @@ rspamd_re_cache_load_hyperscan (struct rspamd_re_cache *cache,
return FALSE;
#else
gchar path[PATH_MAX];
- gint fd, i, n, *hs_ids = NULL;
+ gint fd, i, n, *hs_ids = NULL, total = 0;
GHashTableIter it;
gpointer k, v;
guint8 *map, *p, *end;
@@ -1154,6 +1154,7 @@ rspamd_re_cache_load_hyperscan (struct rspamd_re_cache *cache,
return FALSE;
}
+ total += n;
p += sizeof (n);
hs_ids = g_malloc (n * sizeof (*hs_ids));
memcpy (hs_ids, p, n * sizeof (*hs_ids));
@@ -1194,6 +1195,8 @@ rspamd_re_cache_load_hyperscan (struct rspamd_re_cache *cache,
}
}
+ msg_info_re_cache ("hyperscan database of %d regexps has been loaded", total);
+
return TRUE;
#endif
}
diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c
index 5ba24d056..8b85c745a 100644
--- a/src/libserver/rspamd_control.c
+++ b/src/libserver/rspamd_control.c
@@ -48,7 +48,7 @@ struct rspamd_control_reply_elt {
struct rspamd_control_reply reply;
struct event io_ev;
struct rspamd_worker *wrk;
- struct rspamd_control_session *session;
+ gpointer ud;
struct rspamd_control_reply_elt *prev, *next;
};
@@ -182,7 +182,7 @@ rspamd_control_write_reply (struct rspamd_control_session *session)
ucl_object_insert_key (cur, ucl_object_fromstring (g_quark_to_string (
elt->wrk->type)), "type", 0, false);
- switch (elt->session->cmd.type) {
+ switch (session->cmd.type) {
case RSPAMD_CONTROL_STAT:
ucl_object_insert_key (cur, ucl_object_fromint (
elt->reply.reply.stat.conns), "conns", 0, false);
@@ -243,6 +243,9 @@ static void
rspamd_control_wrk_io (gint fd, short what, gpointer ud)
{
struct rspamd_control_reply_elt *elt = ud;
+ struct rspamd_control_session *session;
+
+ session = elt->ud;
if (read (elt->wrk->control_pipe[0], &elt->reply, sizeof (elt->reply)) !=
sizeof (elt->reply)) {
@@ -250,11 +253,11 @@ rspamd_control_wrk_io (gint fd, short what, gpointer ud)
elt->wrk->pid, g_quark_to_string (elt->wrk->type), strerror (errno));
}
- elt->session->replies_remain --;
+ session->replies_remain --;
event_del (&elt->io_ev);
- if (elt->session->replies_remain == 0) {
- rspamd_control_write_reply (elt->session);
+ if (session->replies_remain == 0) {
+ rspamd_control_write_reply (session);
}
}
@@ -273,18 +276,55 @@ rspamd_control_error_handler (struct rspamd_http_connection *conn, GError *err)
}
}
+static struct rspamd_control_reply_elt *
+rspamd_control_broadcast_cmd (struct rspamd_main *rspamd_main,
+ struct rspamd_control_command *cmd,
+ void (*handler) (evutil_socket_t, short, void *), gpointer ud)
+{
+ GHashTableIter it;
+ struct rspamd_worker *wrk;
+ struct rspamd_control_reply_elt *rep_elt, *res = NULL;
+ gpointer k, v;
+
+ g_hash_table_iter_init (&it, rspamd_main->workers);
+
+ while (g_hash_table_iter_next (&it, &k, &v)) {
+ wrk = v;
+
+ if (write (wrk->control_pipe[0], cmd,
+ sizeof (*cmd)) == sizeof (*cmd)) {
+
+ rep_elt = g_slice_alloc0 (sizeof (*rep_elt));
+ rep_elt->wrk = wrk;
+ rep_elt->ud = ud;
+ event_set (&rep_elt->io_ev, wrk->control_pipe[0],
+ EV_READ | EV_PERSIST, handler,
+ rep_elt);
+ event_base_set (rspamd_main->ev_base,
+ &rep_elt->io_ev);
+ event_add (&rep_elt->io_ev, &worker_io_timeout);
+
+ DL_APPEND (res, rep_elt);
+ }
+ else {
+ msg_err ("cannot write request to the worker %P (%s): %s",
+ wrk->pid, g_quark_to_string (wrk->type), strerror (errno));
+ }
+ }
+
+ return res;
+}
+
static gint
-rspamd_control_finish_hadler (struct rspamd_http_connection *conn,
+rspamd_control_finish_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg)
{
struct rspamd_control_session *session = conn->ud;
rspamd_ftok_t srch;
guint i;
gboolean found = FALSE;
- GHashTableIter it;
- struct rspamd_worker *wrk;
- struct rspamd_control_reply_elt *rep_elt;
- gpointer k, v;
+ struct rspamd_control_reply_elt *cur;
+
if (!session->is_reply) {
if (msg->url == NULL) {
@@ -311,31 +351,12 @@ rspamd_control_finish_hadler (struct rspamd_http_connection *conn,
}
else {
/* Send command to all workers */
- g_hash_table_iter_init (&it, session->rspamd_main->workers);
-
- while (g_hash_table_iter_next (&it, &k, &v)) {
- wrk = v;
-
- if (write (wrk->control_pipe[0], &session->cmd,
- sizeof (session->cmd)) == sizeof (session->cmd)) {
-
- rep_elt = g_slice_alloc0 (sizeof (*rep_elt));
- rep_elt->wrk = wrk;
- rep_elt->session = session;
- event_set (&rep_elt->io_ev, wrk->control_pipe[0],
- EV_READ | EV_PERSIST, rspamd_control_wrk_io,
- rep_elt);
- event_base_set (session->rspamd_main->ev_base,
- &rep_elt->io_ev);
- event_add (&rep_elt->io_ev, &worker_io_timeout);
-
- DL_APPEND (session->replies, rep_elt);
- session->replies_remain ++;
- }
- else {
- msg_err ("cannot write request to the worker %P (%s): %s",
- wrk->pid, g_quark_to_string (wrk->type), strerror (errno));
- }
+ session->replies = rspamd_control_broadcast_cmd (
+ session->rspamd_main, &session->cmd,
+ rspamd_control_wrk_io, session);
+
+ DL_FOREACH (session->replies, cur) {
+ session->replies_remain ++;
}
}
}
@@ -357,7 +378,7 @@ rspamd_control_process_client_socket (struct rspamd_main *rspamd_main,
session->fd = fd;
session->conn = rspamd_http_connection_new (NULL, rspamd_control_error_handler,
- rspamd_control_finish_hadler, 0, RSPAMD_HTTP_SERVER, NULL);
+ rspamd_control_finish_handler, 0, RSPAMD_HTTP_SERVER, NULL);
session->rspamd_main = rspamd_main;
rspamd_http_connection_read_message (session->conn, session, session->fd,
&io_timeout, rspamd_main->ev_base);
@@ -403,6 +424,7 @@ rspamd_control_default_cmd_handler (gint fd,
break;
case RSPAMD_CONTROL_RELOAD:
case RSPAMD_CONTROL_RECOMPILE:
+ case RSPAMD_CONTROL_HYPERSCAN_LOADED:
break;
case RSPAMD_CONTROL_RERESOLVE:
if (cd->worker->srv->cfg) {
@@ -505,6 +527,16 @@ struct rspamd_srv_reply_data {
};
static void
+rspamd_control_hs_io_handler (gint fd, short what, gpointer ud)
+{
+ struct rspamd_control_reply_elt *elt = ud;
+
+ /* At this point we just ignore replies from the workers */
+ event_del (&elt->io_ev);
+ g_slice_free1 (sizeof (*elt), elt);
+}
+
+static void
rspamd_srv_handler (gint fd, short what, gpointer ud)
{
struct rspamd_worker *worker;
@@ -517,6 +549,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
guchar fdspace[CMSG_SPACE(sizeof (int))];
gint *spair;
gchar *nid;
+ struct rspamd_control_command wcmd;
gssize r;
if (what == EV_READ) {
@@ -571,6 +604,18 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
rdata->fd = cmd.cmd.spair.pair_num ? spair[1] : spair[0];
}
break;
+ case RSPAMD_SRV_HYPERSCAN_LOADED:
+ /* Broadcast command to all workers */
+ wcmd.type = RSPAMD_CONTROL_HYPERSCAN_LOADED;
+ /*
+ * We assume that cache dir is shared at the same address for all
+ * workers
+ */
+ wcmd.cmd.hs_loaded.cache_dir = cmd.cmd.hs_loaded.cache_dir;
+ rspamd_control_broadcast_cmd (srv, &wcmd,
+ rspamd_control_hs_io_handler, NULL);
+ rdata->rep.reply.hs_loaded.unused = 0;
+ break;
default:
msg_err ("unknown command type: %d", cmd.type);
break;
@@ -708,7 +753,10 @@ rspamd_srv_request_handler (gint fd, short what, gpointer ud)
return;
cleanup:
- rd->handler (rd->worker, &rd->rep, rfd, rd->ud);
+
+ if (rd->handler) {
+ rd->handler (rd->worker, &rd->rep, rfd, rd->ud);
+ }
event_del (&rd->io_ev);
g_slice_free1 (sizeof (*rd), rd);
}
@@ -723,7 +771,6 @@ rspamd_srv_send_command (struct rspamd_worker *worker,
g_assert (cmd != NULL);
g_assert (worker != NULL);
- g_assert (handler != NULL);
rd = g_slice_alloc0 (sizeof (*rd));
memcpy (&rd->cmd, cmd, sizeof (rd->cmd));
diff --git a/src/libserver/rspamd_control.h b/src/libserver/rspamd_control.h
index 74597a1ac..4ae836925 100644
--- a/src/libserver/rspamd_control.h
+++ b/src/libserver/rspamd_control.h
@@ -36,11 +36,13 @@ enum rspamd_control_type {
RSPAMD_CONTROL_RELOAD,
RSPAMD_CONTROL_RERESOLVE,
RSPAMD_CONTROL_RECOMPILE,
+ RSPAMD_CONTROL_HYPERSCAN_LOADED,
RSPAMD_CONTROL_MAX
};
enum rspamd_srv_type {
RSPAMD_SRV_SOCKETPAIR = 0,
+ RSPAMD_SRV_HYPERSCAN_LOADED,
};
struct rspamd_control_command {
@@ -58,6 +60,9 @@ struct rspamd_control_command {
struct {
guint unused;
} recompile;
+ struct {
+ gpointer cache_dir;
+ } hs_loaded;
} cmd;
};
@@ -80,6 +85,9 @@ struct rspamd_control_reply {
struct {
guint status;
} recompile;
+ struct {
+ guint status;
+ } hs_loaded;
} reply;
};
@@ -93,6 +101,9 @@ struct rspamd_srv_command {
gchar pair_id[PAIR_ID_LEN];
guint pair_num;
} spair;
+ struct {
+ gpointer cache_dir;
+ } hs_loaded;
} cmd;
};
@@ -103,6 +114,9 @@ struct rspamd_srv_reply {
struct {
gint code;
} spair;
+ struct {
+ gint unused;
+ } hs_loaded;
} reply;
};