summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/libserver/rspamd_control.c37
-rw-r--r--src/libserver/rspamd_control.h6
-rw-r--r--src/libserver/worker_util.c3
-rw-r--r--src/rspamd.c1
-rw-r--r--src/rspamd.h1
5 files changed, 41 insertions, 7 deletions
diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c
index 4b2cfc733..30d959e47 100644
--- a/src/libserver/rspamd_control.c
+++ b/src/libserver/rspamd_control.c
@@ -40,6 +40,7 @@ struct rspamd_control_reply_elt {
pid_t wrk_pid;
gpointer ud;
gint attached_fd;
+ GHashTable *pending_elts;
struct rspamd_control_reply_elt *prev, *next;
};
@@ -105,6 +106,17 @@ static const struct rspamd_control_cmd_match {
static void rspamd_control_ignore_io_handler (int fd, short what, void *ud);
+static void
+rspamd_control_stop_pending (struct rspamd_control_reply_elt *elt)
+{
+ GHashTable *htb;
+ /* It stops event and frees hash */
+ htb = elt->pending_elts;
+ g_hash_table_remove (elt->pending_elts, elt);
+ /* Release hash reference */
+ g_hash_table_unref (htb);
+}
+
void
rspamd_control_send_error (struct rspamd_control_session *session,
gint code, const gchar *error_msg, ...)
@@ -168,9 +180,7 @@ rspamd_control_connection_close (struct rspamd_control_session *session)
rspamd_inet_address_to_string (session->addr));
DL_FOREACH_SAFE (session->replies, elt, telt) {
- rspamd_ev_watcher_stop (session->event_loop,
- &elt->ev);
- g_free (elt);
+ rspamd_control_stop_pending (elt);
}
rspamd_inet_address_free (session->addr);
@@ -385,6 +395,15 @@ rspamd_control_error_handler (struct rspamd_http_connection *conn, GError *err)
}
}
+void
+rspamd_pending_control_free (gpointer p)
+{
+ struct rspamd_control_reply_elt *rep_elt = (struct rspamd_control_reply_elt *)p;
+
+ rspamd_ev_watcher_stop (rep_elt->event_loop, &rep_elt->ev);
+ g_free (rep_elt);
+}
+
static struct rspamd_control_reply_elt *
rspamd_control_broadcast_cmd (struct rspamd_main *rspamd_main,
struct rspamd_control_command *cmd,
@@ -443,12 +462,14 @@ rspamd_control_broadcast_cmd (struct rspamd_main *rspamd_main,
rep_elt->wrk_type = wrk->type;
rep_elt->event_loop = rspamd_main->event_loop;
rep_elt->ud = ud;
+ rep_elt->pending_elts = g_hash_table_ref (wrk->control_events_pending);
rspamd_ev_watcher_init (&rep_elt->ev,
wrk->control_pipe[0],
EV_READ, handler,
rep_elt);
rspamd_ev_watcher_start (rspamd_main->event_loop,
&rep_elt->ev, worker_io_timeout);
+ g_hash_table_insert (wrk->control_events_pending, rep_elt, rep_elt);
DL_APPEND (res, rep_elt);
}
@@ -750,12 +771,12 @@ rspamd_control_ignore_io_handler (int fd, short what, void *ud)
{
struct rspamd_control_reply_elt *elt =
(struct rspamd_control_reply_elt *)ud;
+
struct rspamd_control_reply rep;
/* At this point we just ignore replies from the workers */
(void)read (fd, &rep, sizeof (rep));
- rspamd_ev_watcher_stop (elt->event_loop, &elt->ev);
- g_free (elt);
+ rspamd_control_stop_pending (elt);
}
static void
@@ -767,8 +788,7 @@ rspamd_control_log_pipe_io_handler (int fd, short what, void *ud)
/* At this point we just ignore replies from the workers */
(void) read (fd, &rep, sizeof (rep));
- rspamd_ev_watcher_stop (elt->event_loop, &elt->ev);
- g_free (elt);
+ rspamd_control_stop_pending (elt);
}
static void
@@ -802,6 +822,7 @@ rspamd_control_handle_on_fork (struct rspamd_srv_command *cmd,
REF_RELEASE (child->cf);
g_hash_table_remove (srv->workers,
GSIZE_TO_POINTER (cmd->cmd.on_fork.cpid));
+ g_hash_table_unref (child->control_events_pending);
g_free (child);
}
else {
@@ -816,6 +837,8 @@ rspamd_control_handle_on_fork (struct rspamd_srv_command *cmd,
child->cf = parent->cf;
child->ppid = parent->pid;
REF_RETAIN (child->cf);
+ child->control_events_pending = g_hash_table_new_full (g_direct_hash, g_direct_equal,
+ NULL, rspamd_pending_control_free);
g_hash_table_insert (srv->workers,
GSIZE_TO_POINTER (cmd->cmd.on_fork.cpid), child);
}
diff --git a/src/libserver/rspamd_control.h b/src/libserver/rspamd_control.h
index d1ce88f31..21ab1a663 100644
--- a/src/libserver/rspamd_control.h
+++ b/src/libserver/rspamd_control.h
@@ -274,6 +274,12 @@ enum rspamd_control_type rspamd_control_command_from_string (const gchar *str);
*/
const gchar *rspamd_control_command_to_string (enum rspamd_control_type cmd);
+/**
+ * Used to cleanup pending events
+ * @param p
+ */
+void rspamd_pending_control_free (gpointer p);
+
#ifdef __cplusplus
}
#endif
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index d97190f2b..5a2234f29 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -466,6 +466,7 @@ rspamd_worker_init_signals (struct rspamd_worker *worker,
rspamd_worker_usr2_handler, NULL);
}
+
struct ev_loop *
rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
rspamd_accept_handler hdl)
@@ -979,6 +980,8 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
wrk->pid = fork ();
wrk->cores_throttled = rspamd_main->cores_throttling;
wrk->term_handler = term_handler;
+ wrk->control_events_pending = g_hash_table_new_full (g_direct_hash, g_direct_equal,
+ NULL, rspamd_pending_control_free);
switch (wrk->pid) {
case 0:
diff --git a/src/rspamd.c b/src/rspamd.c
index dee990c41..fb3b93e36 100644
--- a/src/rspamd.c
+++ b/src/rspamd.c
@@ -1181,6 +1181,7 @@ rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main,
}
REF_RELEASE (wrk->cf);
+ g_hash_table_unref (wrk->control_events_pending);
g_free (wrk);
}
diff --git a/src/rspamd.h b/src/rspamd.h
index 8885480c2..9e50c054a 100644
--- a/src/rspamd.h
+++ b/src/rspamd.h
@@ -121,6 +121,7 @@ struct rspamd_worker {
gpointer tmp_data; /**< used to avoid race condition to deal with control messages */
ev_child cld_ev; /**< to allow reaping */
rspamd_worker_term_cb term_handler; /**< custom term handler */
+ GHashTable *control_events_pending; /**< control events pending indexed by ptr */
};
struct rspamd_abstract_worker_ctx {