Browse Source

[Fix] Implement watchers replacement to handle nested calls

tags/1.8.0
Vsevolod Stakhov 5 years ago
parent
commit
5480f08714
3 changed files with 62 additions and 5 deletions
  1. 36
    5
      src/libserver/events.c
  2. 4
    0
      src/libserver/events.h
  3. 22
    0
      src/lua/lua_http.c

+ 36
- 5
src/libserver/events.c View File

@@ -193,11 +193,12 @@ rspamd_session_add_event (struct rspamd_async_session *session,
new_event->w = session->cur_watcher;
new_event->w->remain++;
msg_debug_session ("added event: %p, pending %d events, "
"subsystem: %s, watcher: %d",
"subsystem: %s, watcher: %d (%d)",
user_data,
kh_size (session->events),
g_quark_to_string (subsystem),
new_event->w->id);
new_event->w->id,
new_event->w->remain);
} else {
new_event->w = NULL;
msg_debug_session ("added event: %p, pending %d events, "
@@ -211,11 +212,12 @@ rspamd_session_add_event (struct rspamd_async_session *session,
new_event->w = w;
new_event->w->remain++;
msg_debug_session ("added event: %p, pending %d events, "
"subsystem: %s, explicit watcher: %d",
"subsystem: %s, explicit watcher: %d (%d)",
user_data,
kh_size (session->events),
g_quark_to_string (subsystem),
new_event->w->id);
new_event->w->id,
new_event->w->remain);
}

kh_put (rspamd_events_hash, session->events, new_event, &ret);
@@ -286,7 +288,7 @@ rspamd_session_remove_event (struct rspamd_async_session *session,
"pending %d events, watcher: %d (%d pending)", ud,
g_quark_to_string (found_ev->subsystem),
kh_size (session->events),
found_ev->w->id, found_ev->w->remain);
found_ev->w->id, found_ev->w->remain - 1);

if (found_ev->w->remain > 0) {
if (--found_ev->w->remain == 0) {
@@ -522,6 +524,35 @@ rspamd_session_get_watcher (struct rspamd_async_session *session)
}
}

struct rspamd_async_watcher*
rspamd_session_replace_watcher (struct rspamd_async_session *s,
struct rspamd_async_watcher *w)
{
struct rspamd_async_watcher *res = NULL;

g_assert (s != NULL);

if (s->cur_watcher) {
res = s->cur_watcher;

if (!w) {
/* We remove watching, so clear watching flag as well */
s->flags &= ~RSPAMD_SESSION_FLAG_WATCHING;

}

s->cur_watcher = w;
}
else {
if (w) {
s->flags |= RSPAMD_SESSION_FLAG_WATCHING;
}

s->cur_watcher = w;
}

return res;
}

rspamd_mempool_t *
rspamd_session_mempool (struct rspamd_async_session *session)

+ 4
- 0
src/libserver/events.h View File

@@ -151,6 +151,10 @@ void rspamd_session_watcher_pop (struct rspamd_async_session *s,
struct rspamd_async_watcher* rspamd_session_get_watcher (
struct rspamd_async_session *s);

struct rspamd_async_watcher* rspamd_session_replace_watcher (
struct rspamd_async_session *s,
struct rspamd_async_watcher *w);

/**
* Returns TRUE if an async session is currently destroying
* @param s

+ 22
- 0
src/lua/lua_http.c View File

@@ -204,6 +204,7 @@ lua_http_finish_handler (struct rspamd_http_connection *conn,
{
struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)conn->ud;
struct rspamd_http_header *h, *htmp;
struct rspamd_async_watcher *existing_watcher = NULL;
const gchar *body;
gsize body_len;

@@ -257,11 +258,21 @@ lua_http_finish_handler (struct rspamd_http_connection *conn,
lua_settable (L, -3);
}

if (cbd->w) {
/* Replace watcher to deal with nested calls */
existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w);
}

if (lua_pcall (L, 4, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (L, -1));
lua_pop (L, 1);
}

if (cbd->w) {
/* Restore existing watcher */
rspamd_session_replace_watcher (cbd->session, existing_watcher);
}

lua_http_maybe_free (cbd);

lua_thread_pool_restore_callback (&lcbd);
@@ -281,6 +292,7 @@ lua_http_resume_handler (struct rspamd_http_connection *conn,
const gchar *body;
gsize body_len;
struct rspamd_http_header *h, *htmp;
struct rspamd_async_watcher *existing_watcher = NULL;

if (err) {
lua_pushstring (L, err);
@@ -343,7 +355,17 @@ lua_http_resume_handler (struct rspamd_http_connection *conn,
lua_settable (L, -3);
}

if (cbd->w) {
/* Replace watcher to deal with nested calls */
existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w);
}

lua_thread_resume (cbd->thread, 2);

if (cbd->w) {
/* Restore existing watcher */
rspamd_session_replace_watcher (cbd->session, existing_watcher);
}
}

static gboolean

Loading…
Cancel
Save