浏览代码

[Fix] Try hard to deal with ghost workers

tags/2.5
Vsevolod Stakhov 4 年前
父节点
当前提交
856a786462
共有 5 个文件被更改,包括 41 次插入7 次删除
  1. 30
    7
      src/libserver/rspamd_control.c
  2. 6
    0
      src/libserver/rspamd_control.h
  3. 3
    0
      src/libserver/worker_util.c
  4. 1
    0
      src/rspamd.c
  5. 1
    0
      src/rspamd.h

+ 30
- 7
src/libserver/rspamd_control.c 查看文件

@@ -40,6 +40,7 @@ struct rspamd_control_reply_elt {
pid_t wrk_pid;
gpointer ud;
gint attached_fd;
GHashTable *pending_elts;
struct rspamd_control_reply_elt *prev, *next;
};

@@ -105,6 +106,17 @@ static const struct rspamd_control_cmd_match {

static void rspamd_control_ignore_io_handler (int fd, short what, void *ud);

static void
rspamd_control_stop_pending (struct rspamd_control_reply_elt *elt)
{
GHashTable *htb;
/* It stops event and frees hash */
htb = elt->pending_elts;
g_hash_table_remove (elt->pending_elts, elt);
/* Release hash reference */
g_hash_table_unref (htb);
}

void
rspamd_control_send_error (struct rspamd_control_session *session,
gint code, const gchar *error_msg, ...)
@@ -168,9 +180,7 @@ rspamd_control_connection_close (struct rspamd_control_session *session)
rspamd_inet_address_to_string (session->addr));

DL_FOREACH_SAFE (session->replies, elt, telt) {
rspamd_ev_watcher_stop (session->event_loop,
&elt->ev);
g_free (elt);
rspamd_control_stop_pending (elt);
}

rspamd_inet_address_free (session->addr);
@@ -385,6 +395,15 @@ rspamd_control_error_handler (struct rspamd_http_connection *conn, GError *err)
}
}

void
rspamd_pending_control_free (gpointer p)
{
struct rspamd_control_reply_elt *rep_elt = (struct rspamd_control_reply_elt *)p;

rspamd_ev_watcher_stop (rep_elt->event_loop, &rep_elt->ev);
g_free (rep_elt);
}

static struct rspamd_control_reply_elt *
rspamd_control_broadcast_cmd (struct rspamd_main *rspamd_main,
struct rspamd_control_command *cmd,
@@ -443,12 +462,14 @@ rspamd_control_broadcast_cmd (struct rspamd_main *rspamd_main,
rep_elt->wrk_type = wrk->type;
rep_elt->event_loop = rspamd_main->event_loop;
rep_elt->ud = ud;
rep_elt->pending_elts = g_hash_table_ref (wrk->control_events_pending);
rspamd_ev_watcher_init (&rep_elt->ev,
wrk->control_pipe[0],
EV_READ, handler,
rep_elt);
rspamd_ev_watcher_start (rspamd_main->event_loop,
&rep_elt->ev, worker_io_timeout);
g_hash_table_insert (wrk->control_events_pending, rep_elt, rep_elt);

DL_APPEND (res, rep_elt);
}
@@ -750,12 +771,12 @@ rspamd_control_ignore_io_handler (int fd, short what, void *ud)
{
struct rspamd_control_reply_elt *elt =
(struct rspamd_control_reply_elt *)ud;

struct rspamd_control_reply rep;

/* At this point we just ignore replies from the workers */
(void)read (fd, &rep, sizeof (rep));
rspamd_ev_watcher_stop (elt->event_loop, &elt->ev);
g_free (elt);
rspamd_control_stop_pending (elt);
}

static void
@@ -767,8 +788,7 @@ rspamd_control_log_pipe_io_handler (int fd, short what, void *ud)

/* At this point we just ignore replies from the workers */
(void) read (fd, &rep, sizeof (rep));
rspamd_ev_watcher_stop (elt->event_loop, &elt->ev);
g_free (elt);
rspamd_control_stop_pending (elt);
}

static void
@@ -802,6 +822,7 @@ rspamd_control_handle_on_fork (struct rspamd_srv_command *cmd,
REF_RELEASE (child->cf);
g_hash_table_remove (srv->workers,
GSIZE_TO_POINTER (cmd->cmd.on_fork.cpid));
g_hash_table_unref (child->control_events_pending);
g_free (child);
}
else {
@@ -816,6 +837,8 @@ rspamd_control_handle_on_fork (struct rspamd_srv_command *cmd,
child->cf = parent->cf;
child->ppid = parent->pid;
REF_RETAIN (child->cf);
child->control_events_pending = g_hash_table_new_full (g_direct_hash, g_direct_equal,
NULL, rspamd_pending_control_free);
g_hash_table_insert (srv->workers,
GSIZE_TO_POINTER (cmd->cmd.on_fork.cpid), child);
}

+ 6
- 0
src/libserver/rspamd_control.h 查看文件

@@ -274,6 +274,12 @@ enum rspamd_control_type rspamd_control_command_from_string (const gchar *str);
*/
const gchar *rspamd_control_command_to_string (enum rspamd_control_type cmd);

/**
* Used to cleanup pending events
* @param p
*/
void rspamd_pending_control_free (gpointer p);

#ifdef __cplusplus
}
#endif

+ 3
- 0
src/libserver/worker_util.c 查看文件

@@ -466,6 +466,7 @@ rspamd_worker_init_signals (struct rspamd_worker *worker,
rspamd_worker_usr2_handler, NULL);
}


struct ev_loop *
rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
rspamd_accept_handler hdl)
@@ -979,6 +980,8 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
wrk->pid = fork ();
wrk->cores_throttled = rspamd_main->cores_throttling;
wrk->term_handler = term_handler;
wrk->control_events_pending = g_hash_table_new_full (g_direct_hash, g_direct_equal,
NULL, rspamd_pending_control_free);

switch (wrk->pid) {
case 0:

+ 1
- 0
src/rspamd.c 查看文件

@@ -1181,6 +1181,7 @@ rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main,
}

REF_RELEASE (wrk->cf);
g_hash_table_unref (wrk->control_events_pending);
g_free (wrk);
}


+ 1
- 0
src/rspamd.h 查看文件

@@ -121,6 +121,7 @@ struct rspamd_worker {
gpointer tmp_data; /**< used to avoid race condition to deal with control messages */
ev_child cld_ev; /**< to allow reaping */
rspamd_worker_term_cb term_handler; /**< custom term handler */
GHashTable *control_events_pending; /**< control events pending indexed by ptr */
};

struct rspamd_abstract_worker_ctx {

正在加载...
取消
保存