if (session) {
if (req != NULL) {
- rspamd_session_add_event (session,
- (event_finalizer_t)rspamd_dns_fin_cb,
- reqdata,
+ rspamd_session_add_event (session, NULL, (event_finalizer_t) rspamd_dns_fin_cb, reqdata,
g_quark_from_static_string ("dns resolver"));
}
}
struct rspamd_async_event *
rspamd_session_add_event (struct rspamd_async_session *session,
- event_finalizer_t fin,
- void *user_data,
- GQuark subsystem)
+ struct rspamd_async_watcher *w,
+ event_finalizer_t fin,
+ gpointer user_data,
+ GQuark subsystem)
{
struct rspamd_async_event *new_event;
gint ret;
new_event->user_data = user_data;
new_event->subsystem = subsystem;
- if (RSPAMD_SESSION_IS_WATCHING (session)) {
- new_event->w = session->cur_watcher;
- new_event->w->remain ++;
- msg_debug_session ("added event: %p, pending %d events, "
- "subsystem: %s, watcher: %d",
- user_data,
- kh_size (session->events),
- g_quark_to_string (subsystem),
- new_event->w->id);
+ if (w == NULL) {
+ if (RSPAMD_SESSION_IS_WATCHING (session)) {
+ new_event->w = session->cur_watcher;
+ new_event->w->remain++;
+ msg_debug_session ("added event: %p, pending %d events, "
+ "subsystem: %s, watcher: %d",
+ user_data,
+ kh_size (session->events),
+ g_quark_to_string (subsystem),
+ new_event->w->id);
+ } else {
+ new_event->w = NULL;
+ msg_debug_session ("added event: %p, pending %d events, "
+ "subsystem: %s, no watcher!",
+ user_data,
+ kh_size (session->events),
+ g_quark_to_string (subsystem));
+ }
}
else {
- new_event->w = NULL;
+ new_event->w = w;
+ new_event->w->remain++;
msg_debug_session ("added event: %p, pending %d events, "
- "subsystem: %s, no watcher!",
+ "subsystem: %s, explicit watcher: %d",
user_data,
kh_size (session->events),
- g_quark_to_string (subsystem));
+ g_quark_to_string (subsystem),
+ new_event->w->id);
}
kh_put (rspamd_events_hash, session->events, new_event, &ret);
* @param user_data abstract user_data
* @param forced unused
*/
-struct rspamd_async_event* rspamd_session_add_event (
- struct rspamd_async_session *session,
- event_finalizer_t fin, gpointer user_data, GQuark subsystem);
+struct rspamd_async_event *
+rspamd_session_add_event (struct rspamd_async_session *session,
+ struct rspamd_async_watcher *w,
+ event_finalizer_t fin,
+ gpointer user_data,
+ GQuark subsystem);
/**
* Remove normal event
if (redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s",
rt->redis_object_expanded, learned_key) == REDIS_OK) {
- rspamd_session_add_event (task->s, rspamd_redis_fin, rt,
- rspamd_redis_stat_quark ());
+ rspamd_session_add_event (task->s, NULL, rspamd_redis_fin, rt, rspamd_redis_stat_quark ());
rt->has_event = TRUE;
if (event_get_base (&rt->timeout_event)) {
"RSIG");
}
- rspamd_session_add_event (task->s, rspamd_redis_fin_learn, rt,
- rspamd_redis_stat_quark ());
+ rspamd_session_add_event (task->s, NULL, rspamd_redis_fin_learn, rt, rspamd_redis_stat_quark ());
rt->has_event = TRUE;
/* Set timeout */
if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_get, rt,
"HGET %s %s",
rt->ctx->redis_object, h) == REDIS_OK) {
- rspamd_session_add_event (task->s, rspamd_redis_cache_fin, rt,
- rspamd_stat_cache_redis_quark ());
+ rspamd_session_add_event (task->s, NULL, rspamd_redis_cache_fin, rt, rspamd_stat_cache_redis_quark ());
event_add (&rt->timeout_event, &tv);
rt->has_event = TRUE;
}
if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_set, rt,
"HSET %s %s %d",
rt->ctx->redis_object, h, flag) == REDIS_OK) {
- rspamd_session_add_event (task->s, rspamd_redis_cache_fin, rt,
- rspamd_stat_cache_redis_quark ());
+ rspamd_session_add_event (task->s, NULL, rspamd_redis_cache_fin, rt, rspamd_stat_cache_redis_quark ());
event_add (&rt->timeout_event, &tv);
rt->has_event = TRUE;
}
static void
lua_http_maybe_free (struct lua_http_cbdata *cbd)
{
- if (cbd->session && cbd->w) {
- /* We still need to clear watcher */
- rspamd_session_watcher_pop (cbd->session, cbd->w);
+ if (cbd->session) {
+ if (cbd->w) {
+ /* We still need to clear watcher */
+ rspamd_session_watcher_pop (cbd->session, cbd->w);
+ }
if (cbd->flags & RSPAMD_LUA_HTTP_FLAG_RESOLVED) {
/* Event is added merely for resolved events */
cbd->msg = NULL;
if (cbd->session) {
- rspamd_session_add_event (cbd->session,
- (event_finalizer_t)lua_http_fin,
- cbd,
+ rspamd_session_add_event (cbd->session, cbd->w,
+ (event_finalizer_t) lua_http_fin, cbd,
g_quark_from_static_string ("lua http"));
cbd->flags |= RSPAMD_LUA_HTTP_FLAG_RESOLVED;
}
if (ret == REDIS_OK) {
if (ud->s) {
- rspamd_session_add_event (ud->s,
- lua_redis_fin,
- sp_ud,
- g_quark_from_static_string ("lua redis"));
+ rspamd_session_add_event (ud->s, NULL, lua_redis_fin, sp_ud, g_quark_from_static_string ("lua redis"));
sp_ud->w = rspamd_session_get_watcher (ud->s);
rspamd_session_watcher_push (ud->s);
}
if (ret == REDIS_OK) {
if (ud->s) {
- rspamd_session_add_event (ud->s,
- lua_redis_fin,
- sp_ud,
- g_quark_from_static_string ("lua redis"));
+ rspamd_session_add_event (ud->s, NULL, lua_redis_fin, sp_ud, g_quark_from_static_string ("lua redis"));
sp_ud->w = rspamd_session_get_watcher (ud->s);
rspamd_session_watcher_push (ud->s);
}
if (cbd->session) {
event_finalizer_t fin = IS_SYNC (cbd) ? lua_tcp_void_finalyser : lua_tcp_fin;
- cbd->async_ev = rspamd_session_add_event (cbd->session,
- fin,
- cbd,
- g_quark_from_static_string ("lua tcp"));
+ cbd->async_ev = rspamd_session_add_event (cbd->session, NULL, fin, cbd, g_quark_from_static_string ("lua tcp"));
if (!cbd->async_ev) {
return FALSE;
event_base_set (session->task->ev_base, &session->timev);
event_add (&session->timev, &session->tv);
- rspamd_session_add_event (task->s,
- fuzzy_io_fin,
- session,
+ rspamd_session_add_event (task->s, NULL, fuzzy_io_fin, session,
g_quark_from_static_string ("fuzzy check"));
}
}
event_base_set (s->task->ev_base, &s->timev);
event_add (&s->timev, &s->tv);
- rspamd_session_add_event (task->s,
- fuzzy_lua_fin,
- s,
- g_quark_from_static_string ("fuzzy check"));
+ rspamd_session_add_event (task->s, NULL, fuzzy_lua_fin, s, g_quark_from_static_string ("fuzzy check"));
(*saved)++;
ret = 1;
timeout = rspamd_mempool_alloc (task->task_pool, sizeof (struct timeval));
double_to_tv (surbl_module_ctx->read_timeout, timeout);
- rspamd_session_add_event (task->s,
- free_redirector_session,
- param,
- g_quark_from_static_string ("surbl"));
+ rspamd_session_add_event (task->s, NULL, free_redirector_session, param, g_quark_from_static_string ("surbl"));
rspamd_http_connection_write_message (param->conn, msg, NULL,
NULL, param, s, timeout, task->ev_base);
end
end
+ rspamd_logger.errx(task, 'do http request with callback')
rspamd_http.request({
url = 'http://127.0.0.1:18080' .. url,
task = task,
})
--[[ request to this address involved DNS resolver subsystem ]]
+ rspamd_logger.errx(task, 'do http request with callback + dns resolving')
rspamd_http.request({
url = 'http://site.resolveme:18080' .. url,
task = task,
task:insert_result('HTTP_CORO_ERROR', 1.0, err)
end
+ rspamd_logger.errx(task, 'do http request after coroutine finished')
err, response = rspamd_http.request({
url = 'http://site.resolveme:18080' .. url,
task = task,