new_event->w = session->cur_watcher;
new_event->w->remain++;
msg_debug_session ("added event: %p, pending %d events, "
- "subsystem: %s, watcher: %d",
+ "subsystem: %s, watcher: %d (%d)",
user_data,
kh_size (session->events),
g_quark_to_string (subsystem),
- new_event->w->id);
+ new_event->w->id,
+ new_event->w->remain);
} else {
new_event->w = NULL;
msg_debug_session ("added event: %p, pending %d events, "
new_event->w = w;
new_event->w->remain++;
msg_debug_session ("added event: %p, pending %d events, "
- "subsystem: %s, explicit watcher: %d",
+ "subsystem: %s, explicit watcher: %d (%d)",
user_data,
kh_size (session->events),
g_quark_to_string (subsystem),
- new_event->w->id);
+ new_event->w->id,
+ new_event->w->remain);
}
kh_put (rspamd_events_hash, session->events, new_event, &ret);
"pending %d events, watcher: %d (%d pending)", ud,
g_quark_to_string (found_ev->subsystem),
kh_size (session->events),
- found_ev->w->id, found_ev->w->remain);
+ found_ev->w->id, found_ev->w->remain - 1);
if (found_ev->w->remain > 0) {
if (--found_ev->w->remain == 0) {
}
}
+struct rspamd_async_watcher*
+rspamd_session_replace_watcher (struct rspamd_async_session *s,
+ struct rspamd_async_watcher *w)
+{
+ struct rspamd_async_watcher *res = NULL;
+
+ g_assert (s != NULL);
+
+ if (s->cur_watcher) {
+ res = s->cur_watcher;
+
+ if (!w) {
+ /* We remove watching, so clear watching flag as well */
+ s->flags &= ~RSPAMD_SESSION_FLAG_WATCHING;
+
+ }
+
+ s->cur_watcher = w;
+ }
+ else {
+ if (w) {
+ s->flags |= RSPAMD_SESSION_FLAG_WATCHING;
+ }
+
+ s->cur_watcher = w;
+ }
+
+ return res;
+}
rspamd_mempool_t *
rspamd_session_mempool (struct rspamd_async_session *session)
{
struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)conn->ud;
struct rspamd_http_header *h, *htmp;
+ struct rspamd_async_watcher *existing_watcher = NULL;
const gchar *body;
gsize body_len;
lua_settable (L, -3);
}
+ if (cbd->w) {
+ /* Replace watcher to deal with nested calls */
+ existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w);
+ }
+
if (lua_pcall (L, 4, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (L, -1));
lua_pop (L, 1);
}
+ if (cbd->w) {
+ /* Restore existing watcher */
+ rspamd_session_replace_watcher (cbd->session, existing_watcher);
+ }
+
lua_http_maybe_free (cbd);
lua_thread_pool_restore_callback (&lcbd);
const gchar *body;
gsize body_len;
struct rspamd_http_header *h, *htmp;
+ struct rspamd_async_watcher *existing_watcher = NULL;
if (err) {
lua_pushstring (L, err);
lua_settable (L, -3);
}
+ if (cbd->w) {
+ /* Replace watcher to deal with nested calls */
+ existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w);
+ }
+
lua_thread_resume (cbd->thread, 2);
+
+ if (cbd->w) {
+ /* Restore existing watcher */
+ rspamd_session_replace_watcher (cbd->session, existing_watcher);
+ }
}
static gboolean