|
|
@@ -34,11 +34,13 @@ static struct timeval io_timeout = { |
|
|
|
.tv_usec = 0 |
|
|
|
}; |
|
|
|
|
|
|
|
struct rspamd_control_session; |
|
|
|
|
|
|
|
struct rspamd_control_reply_elt { |
|
|
|
struct rspamd_control_reply reply; |
|
|
|
struct event io_ev; |
|
|
|
struct event tm_ev; |
|
|
|
struct rspamd_worker *wrk; |
|
|
|
struct rspamd_control_session *session; |
|
|
|
struct rspamd_control_reply_elt *prev, *next; |
|
|
|
}; |
|
|
|
|
|
|
@@ -48,6 +50,7 @@ struct rspamd_control_session { |
|
|
|
struct rspamd_http_connection *conn; |
|
|
|
struct rspamd_control_command cmd; |
|
|
|
struct rspamd_control_reply_elt *replies; |
|
|
|
guint replies_remain; |
|
|
|
gboolean is_reply; |
|
|
|
}; |
|
|
|
|
|
|
@@ -128,6 +131,7 @@ rspamd_control_connection_close (struct rspamd_control_session *session) |
|
|
|
struct rspamd_control_reply_elt *elt, *telt; |
|
|
|
|
|
|
|
DL_FOREACH_SAFE (session->replies, elt, telt) { |
|
|
|
event_del (&elt->io_ev); |
|
|
|
g_slice_free1 (sizeof (*elt), elt); |
|
|
|
} |
|
|
|
|
|
|
@@ -136,6 +140,26 @@ rspamd_control_connection_close (struct rspamd_control_session *session) |
|
|
|
g_slice_free1 (sizeof (*session), session); |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
|
rspamd_control_wrk_io (gint fd, short what, gpointer ud) |
|
|
|
{ |
|
|
|
struct rspamd_control_reply_elt *elt = ud; |
|
|
|
|
|
|
|
if (read (elt->wrk->control_pipe[1], &elt->reply, sizeof (elt->reply)) != |
|
|
|
sizeof (elt->reply)) { |
|
|
|
msg_err ("cannot read request from the worker %p (%s): %s", |
|
|
|
elt->wrk->pid, g_quark_to_string (elt->wrk->type), strerror (errno)); |
|
|
|
} |
|
|
|
|
|
|
|
elt->session->replies_remain --; |
|
|
|
event_del (&elt->io_ev); |
|
|
|
|
|
|
|
if (elt->session->replies_remain == 0) { |
|
|
|
/* TODO: add reply logic */ |
|
|
|
rspamd_control_connection_close (elt->session); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
|
rspamd_control_error_handler (struct rspamd_http_connection *conn, GError *err) |
|
|
|
{ |
|
|
@@ -159,6 +183,10 @@ rspamd_control_finish_hadler (struct rspamd_http_connection *conn, |
|
|
|
rspamd_ftok_t srch; |
|
|
|
guint i; |
|
|
|
gboolean found = FALSE; |
|
|
|
GHashTableIter it; |
|
|
|
struct rspamd_worker *wrk; |
|
|
|
struct rspamd_control_reply_elt *rep_elt; |
|
|
|
gpointer k, v; |
|
|
|
|
|
|
|
if (!session->is_reply) { |
|
|
|
if (msg->url == NULL) { |
|
|
@@ -184,7 +212,33 @@ rspamd_control_finish_hadler (struct rspamd_http_connection *conn, |
|
|
|
rspamd_control_send_error (session, 404, "Command not defined"); |
|
|
|
} |
|
|
|
else { |
|
|
|
rspamd_control_send_error (session, 500, "Not implemented yet"); |
|
|
|
/* Send command to all workers */ |
|
|
|
g_hash_table_iter_init (&it, session->rspamd_main->workers); |
|
|
|
|
|
|
|
while (g_hash_table_iter_next (&it, &k, &v)) { |
|
|
|
wrk = v; |
|
|
|
|
|
|
|
if (write (wrk->control_pipe[0], &session->cmd, |
|
|
|
sizeof (session->cmd)) == sizeof (session->cmd)) { |
|
|
|
|
|
|
|
rep_elt = g_slice_alloc0 (sizeof (*rep_elt)); |
|
|
|
rep_elt->wrk = wrk; |
|
|
|
rep_elt->session = session; |
|
|
|
event_set (&rep_elt->io_ev, wrk->control_pipe[0], |
|
|
|
EV_READ | EV_PERSIST, rspamd_control_wrk_io, |
|
|
|
rep_elt); |
|
|
|
event_base_set (session->rspamd_main->ev_base, |
|
|
|
&rep_elt->io_ev); |
|
|
|
event_add (&rep_elt->io_ev, &io_timeout); |
|
|
|
|
|
|
|
DL_APPEND (session->replies, rep_elt); |
|
|
|
session->replies_remain ++; |
|
|
|
} |
|
|
|
else { |
|
|
|
msg_err ("cannot write request to the worker %p (%s): %s", |
|
|
|
wrk->pid, g_quark_to_string (wrk->type), strerror (errno)); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
else { |