diff options
-rw-r--r-- | src/libserver/rspamd_control.c | 37 | ||||
-rw-r--r-- | src/libserver/rspamd_control.h | 6 | ||||
-rw-r--r-- | src/libserver/worker_util.c | 3 | ||||
-rw-r--r-- | src/rspamd.c | 1 | ||||
-rw-r--r-- | src/rspamd.h | 1 |
5 files changed, 41 insertions, 7 deletions
diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c index 4b2cfc733..30d959e47 100644 --- a/src/libserver/rspamd_control.c +++ b/src/libserver/rspamd_control.c @@ -40,6 +40,7 @@ struct rspamd_control_reply_elt { pid_t wrk_pid; gpointer ud; gint attached_fd; + GHashTable *pending_elts; struct rspamd_control_reply_elt *prev, *next; }; @@ -105,6 +106,17 @@ static const struct rspamd_control_cmd_match { static void rspamd_control_ignore_io_handler (int fd, short what, void *ud); +static void +rspamd_control_stop_pending (struct rspamd_control_reply_elt *elt) +{ + GHashTable *htb; + /* It stops event and frees hash */ + htb = elt->pending_elts; + g_hash_table_remove (elt->pending_elts, elt); + /* Release hash reference */ + g_hash_table_unref (htb); +} + void rspamd_control_send_error (struct rspamd_control_session *session, gint code, const gchar *error_msg, ...) @@ -168,9 +180,7 @@ rspamd_control_connection_close (struct rspamd_control_session *session) rspamd_inet_address_to_string (session->addr)); DL_FOREACH_SAFE (session->replies, elt, telt) { - rspamd_ev_watcher_stop (session->event_loop, - &elt->ev); - g_free (elt); + rspamd_control_stop_pending (elt); } rspamd_inet_address_free (session->addr); @@ -385,6 +395,15 @@ rspamd_control_error_handler (struct rspamd_http_connection *conn, GError *err) } } +void +rspamd_pending_control_free (gpointer p) +{ + struct rspamd_control_reply_elt *rep_elt = (struct rspamd_control_reply_elt *)p; + + rspamd_ev_watcher_stop (rep_elt->event_loop, &rep_elt->ev); + g_free (rep_elt); +} + static struct rspamd_control_reply_elt * rspamd_control_broadcast_cmd (struct rspamd_main *rspamd_main, struct rspamd_control_command *cmd, @@ -443,12 +462,14 @@ rspamd_control_broadcast_cmd (struct rspamd_main *rspamd_main, rep_elt->wrk_type = wrk->type; rep_elt->event_loop = rspamd_main->event_loop; rep_elt->ud = ud; + rep_elt->pending_elts = g_hash_table_ref (wrk->control_events_pending); rspamd_ev_watcher_init (&rep_elt->ev, wrk->control_pipe[0], EV_READ, handler, rep_elt); rspamd_ev_watcher_start (rspamd_main->event_loop, &rep_elt->ev, worker_io_timeout); + g_hash_table_insert (wrk->control_events_pending, rep_elt, rep_elt); DL_APPEND (res, rep_elt); } @@ -750,12 +771,12 @@ rspamd_control_ignore_io_handler (int fd, short what, void *ud) { struct rspamd_control_reply_elt *elt = (struct rspamd_control_reply_elt *)ud; + struct rspamd_control_reply rep; /* At this point we just ignore replies from the workers */ (void)read (fd, &rep, sizeof (rep)); - rspamd_ev_watcher_stop (elt->event_loop, &elt->ev); - g_free (elt); + rspamd_control_stop_pending (elt); } static void @@ -767,8 +788,7 @@ rspamd_control_log_pipe_io_handler (int fd, short what, void *ud) /* At this point we just ignore replies from the workers */ (void) read (fd, &rep, sizeof (rep)); - rspamd_ev_watcher_stop (elt->event_loop, &elt->ev); - g_free (elt); + rspamd_control_stop_pending (elt); } static void @@ -802,6 +822,7 @@ rspamd_control_handle_on_fork (struct rspamd_srv_command *cmd, REF_RELEASE (child->cf); g_hash_table_remove (srv->workers, GSIZE_TO_POINTER (cmd->cmd.on_fork.cpid)); + g_hash_table_unref (child->control_events_pending); g_free (child); } else { @@ -816,6 +837,8 @@ rspamd_control_handle_on_fork (struct rspamd_srv_command *cmd, child->cf = parent->cf; child->ppid = parent->pid; REF_RETAIN (child->cf); + child->control_events_pending = g_hash_table_new_full (g_direct_hash, g_direct_equal, + NULL, rspamd_pending_control_free); g_hash_table_insert (srv->workers, GSIZE_TO_POINTER (cmd->cmd.on_fork.cpid), child); } diff --git a/src/libserver/rspamd_control.h b/src/libserver/rspamd_control.h index d1ce88f31..21ab1a663 100644 --- a/src/libserver/rspamd_control.h +++ b/src/libserver/rspamd_control.h @@ -274,6 +274,12 @@ enum rspamd_control_type rspamd_control_command_from_string (const gchar *str); */ const gchar *rspamd_control_command_to_string (enum rspamd_control_type cmd); +/** + * Used to cleanup pending events + * @param p + */ +void rspamd_pending_control_free (gpointer p); + #ifdef __cplusplus } #endif diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index d97190f2b..5a2234f29 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -466,6 +466,7 @@ rspamd_worker_init_signals (struct rspamd_worker *worker, rspamd_worker_usr2_handler, NULL); } + struct ev_loop * rspamd_prepare_worker (struct rspamd_worker *worker, const char *name, rspamd_accept_handler hdl) @@ -979,6 +980,8 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main, wrk->pid = fork (); wrk->cores_throttled = rspamd_main->cores_throttling; wrk->term_handler = term_handler; + wrk->control_events_pending = g_hash_table_new_full (g_direct_hash, g_direct_equal, + NULL, rspamd_pending_control_free); switch (wrk->pid) { case 0: diff --git a/src/rspamd.c b/src/rspamd.c index dee990c41..fb3b93e36 100644 --- a/src/rspamd.c +++ b/src/rspamd.c @@ -1181,6 +1181,7 @@ rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main, } REF_RELEASE (wrk->cf); + g_hash_table_unref (wrk->control_events_pending); g_free (wrk); } diff --git a/src/rspamd.h b/src/rspamd.h index 8885480c2..9e50c054a 100644 --- a/src/rspamd.h +++ b/src/rspamd.h @@ -121,6 +121,7 @@ struct rspamd_worker { gpointer tmp_data; /**< used to avoid race condition to deal with control messages */ ev_child cld_ev; /**< to allow reaping */ rspamd_worker_term_cb term_handler; /**< custom term handler */ + GHashTable *control_events_pending; /**< control events pending indexed by ptr */ }; struct rspamd_abstract_worker_ctx { |