瀏覽代碼

[Fix] Fix more issues with watching of async events

tags/1.8.0
Vsevolod Stakhov 5 年之前
父節點
當前提交
c05c2b3b82

+ 1
- 3
src/libserver/dns.c 查看文件

@@ -146,9 +146,7 @@ make_dns_request (struct rspamd_dns_resolver *resolver,

if (session) {
if (req != NULL) {
rspamd_session_add_event (session,
(event_finalizer_t)rspamd_dns_fin_cb,
reqdata,
rspamd_session_add_event (session, NULL, (event_finalizer_t) rspamd_dns_fin_cb, reqdata,
g_quark_from_static_string ("dns resolver"));
}
}

+ 27
- 15
src/libserver/events.c 查看文件

@@ -161,9 +161,10 @@ rspamd_session_create (rspamd_mempool_t * pool,

struct rspamd_async_event *
rspamd_session_add_event (struct rspamd_async_session *session,
event_finalizer_t fin,
void *user_data,
GQuark subsystem)
struct rspamd_async_watcher *w,
event_finalizer_t fin,
gpointer user_data,
GQuark subsystem)
{
struct rspamd_async_event *new_event;
gint ret;
@@ -187,23 +188,34 @@ rspamd_session_add_event (struct rspamd_async_session *session,
new_event->user_data = user_data;
new_event->subsystem = subsystem;

if (RSPAMD_SESSION_IS_WATCHING (session)) {
new_event->w = session->cur_watcher;
new_event->w->remain ++;
msg_debug_session ("added event: %p, pending %d events, "
"subsystem: %s, watcher: %d",
user_data,
kh_size (session->events),
g_quark_to_string (subsystem),
new_event->w->id);
if (w == NULL) {
if (RSPAMD_SESSION_IS_WATCHING (session)) {
new_event->w = session->cur_watcher;
new_event->w->remain++;
msg_debug_session ("added event: %p, pending %d events, "
"subsystem: %s, watcher: %d",
user_data,
kh_size (session->events),
g_quark_to_string (subsystem),
new_event->w->id);
} else {
new_event->w = NULL;
msg_debug_session ("added event: %p, pending %d events, "
"subsystem: %s, no watcher!",
user_data,
kh_size (session->events),
g_quark_to_string (subsystem));
}
}
else {
new_event->w = NULL;
new_event->w = w;
new_event->w->remain++;
msg_debug_session ("added event: %p, pending %d events, "
"subsystem: %s, no watcher!",
"subsystem: %s, explicit watcher: %d",
user_data,
kh_size (session->events),
g_quark_to_string (subsystem));
g_quark_to_string (subsystem),
new_event->w->id);
}

kh_put (rspamd_events_hash, session->events, new_event, &ret);

+ 6
- 3
src/libserver/events.h 查看文件

@@ -47,9 +47,12 @@ struct rspamd_async_session * rspamd_session_create (rspamd_mempool_t *pool,
* @param user_data abstract user_data
* @param forced unused
*/
struct rspamd_async_event* rspamd_session_add_event (
struct rspamd_async_session *session,
event_finalizer_t fin, gpointer user_data, GQuark subsystem);
struct rspamd_async_event *
rspamd_session_add_event (struct rspamd_async_session *session,
struct rspamd_async_watcher *w,
event_finalizer_t fin,
gpointer user_data,
GQuark subsystem);

/**
* Remove normal event

+ 2
- 4
src/libstat/backends/redis_backend.c 查看文件

@@ -1594,8 +1594,7 @@ rspamd_redis_process_tokens (struct rspamd_task *task,
if (redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s",
rt->redis_object_expanded, learned_key) == REDIS_OK) {

rspamd_session_add_event (task->s, rspamd_redis_fin, rt,
rspamd_redis_stat_quark ());
rspamd_session_add_event (task->s, NULL, rspamd_redis_fin, rt, rspamd_redis_stat_quark ());
rt->has_event = TRUE;

if (event_get_base (&rt->timeout_event)) {
@@ -1799,8 +1798,7 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
"RSIG");
}

rspamd_session_add_event (task->s, rspamd_redis_fin_learn, rt,
rspamd_redis_stat_quark ());
rspamd_session_add_event (task->s, NULL, rspamd_redis_fin_learn, rt, rspamd_redis_stat_quark ());
rt->has_event = TRUE;

/* Set timeout */

+ 2
- 4
src/libstat/learn_cache/redis_cache.c 查看文件

@@ -453,8 +453,7 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task,
if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_get, rt,
"HGET %s %s",
rt->ctx->redis_object, h) == REDIS_OK) {
rspamd_session_add_event (task->s, rspamd_redis_cache_fin, rt,
rspamd_stat_cache_redis_quark ());
rspamd_session_add_event (task->s, NULL, rspamd_redis_cache_fin, rt, rspamd_stat_cache_redis_quark ());
event_add (&rt->timeout_event, &tv);
rt->has_event = TRUE;
}
@@ -486,8 +485,7 @@ rspamd_stat_cache_redis_learn (struct rspamd_task *task,
if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_set, rt,
"HSET %s %s %d",
rt->ctx->redis_object, h, flag) == REDIS_OK) {
rspamd_session_add_event (task->s, rspamd_redis_cache_fin, rt,
rspamd_stat_cache_redis_quark ());
rspamd_session_add_event (task->s, NULL, rspamd_redis_cache_fin, rt, rspamd_stat_cache_redis_quark ());
event_add (&rt->timeout_event, &tv);
rt->has_event = TRUE;
}

+ 7
- 6
src/lua/lua_http.c 查看文件

@@ -145,9 +145,11 @@ lua_http_fin (gpointer arg)
static void
lua_http_maybe_free (struct lua_http_cbdata *cbd)
{
if (cbd->session && cbd->w) {
/* We still need to clear watcher */
rspamd_session_watcher_pop (cbd->session, cbd->w);
if (cbd->session) {
if (cbd->w) {
/* We still need to clear watcher */
rspamd_session_watcher_pop (cbd->session, cbd->w);
}

if (cbd->flags & RSPAMD_LUA_HTTP_FLAG_RESOLVED) {
/* Event is added merely for resolved events */
@@ -407,9 +409,8 @@ lua_http_make_connection (struct lua_http_cbdata *cbd)
cbd->msg = NULL;

if (cbd->session) {
rspamd_session_add_event (cbd->session,
(event_finalizer_t)lua_http_fin,
cbd,
rspamd_session_add_event (cbd->session, cbd->w,
(event_finalizer_t) lua_http_fin, cbd,
g_quark_from_static_string ("lua http"));
cbd->flags |= RSPAMD_LUA_HTTP_FLAG_RESOLVED;
}

+ 2
- 8
src/lua/lua_redis.c 查看文件

@@ -1012,10 +1012,7 @@ lua_redis_make_request (lua_State *L)

if (ret == REDIS_OK) {
if (ud->s) {
rspamd_session_add_event (ud->s,
lua_redis_fin,
sp_ud,
g_quark_from_static_string ("lua redis"));
rspamd_session_add_event (ud->s, NULL, lua_redis_fin, sp_ud, g_quark_from_static_string ("lua redis"));
sp_ud->w = rspamd_session_get_watcher (ud->s);
rspamd_session_watcher_push (ud->s);
}
@@ -1382,10 +1379,7 @@ lua_redis_add_cmd (lua_State *L)

if (ret == REDIS_OK) {
if (ud->s) {
rspamd_session_add_event (ud->s,
lua_redis_fin,
sp_ud,
g_quark_from_static_string ("lua redis"));
rspamd_session_add_event (ud->s, NULL, lua_redis_fin, sp_ud, g_quark_from_static_string ("lua redis"));
sp_ud->w = rspamd_session_get_watcher (ud->s);
rspamd_session_watcher_push (ud->s);
}

+ 1
- 4
src/lua/lua_tcp.c 查看文件

@@ -1133,10 +1133,7 @@ lua_tcp_register_event (struct lua_tcp_cbdata *cbd)
if (cbd->session) {
event_finalizer_t fin = IS_SYNC (cbd) ? lua_tcp_void_finalyser : lua_tcp_fin;

cbd->async_ev = rspamd_session_add_event (cbd->session,
fin,
cbd,
g_quark_from_static_string ("lua tcp"));
cbd->async_ev = rspamd_session_add_event (cbd->session, NULL, fin, cbd, g_quark_from_static_string ("lua tcp"));

if (!cbd->async_ev) {
return FALSE;

+ 2
- 7
src/plugins/fuzzy_check.c 查看文件

@@ -2869,9 +2869,7 @@ register_fuzzy_client_call (struct rspamd_task *task,
event_base_set (session->task->ev_base, &session->timev);
event_add (&session->timev, &session->tv);

rspamd_session_add_event (task->s,
fuzzy_io_fin,
session,
rspamd_session_add_event (task->s, NULL, fuzzy_io_fin, session,
g_quark_from_static_string ("fuzzy check"));
}
}
@@ -3345,10 +3343,7 @@ fuzzy_check_send_lua_learn (struct fuzzy_rule *rule,
event_base_set (s->task->ev_base, &s->timev);
event_add (&s->timev, &s->tv);

rspamd_session_add_event (task->s,
fuzzy_lua_fin,
s,
g_quark_from_static_string ("fuzzy check"));
rspamd_session_add_event (task->s, NULL, fuzzy_lua_fin, s, g_quark_from_static_string ("fuzzy check"));

(*saved)++;
ret = 1;

+ 1
- 4
src/plugins/surbl.c 查看文件

@@ -1675,10 +1675,7 @@ register_redirector_call (struct rspamd_url *url, struct rspamd_task *task,
timeout = rspamd_mempool_alloc (task->task_pool, sizeof (struct timeval));
double_to_tv (surbl_module_ctx->read_timeout, timeout);

rspamd_session_add_event (task->s,
free_redirector_session,
param,
g_quark_from_static_string ("surbl"));
rspamd_session_add_event (task->s, NULL, free_redirector_session, param, g_quark_from_static_string ("surbl"));

rspamd_http_connection_write_message (param->conn, msg, NULL,
NULL, param, s, timeout, task->ev_base);

+ 3
- 0
test/functional/lua/http.lua 查看文件

@@ -25,6 +25,7 @@ local function http_symbol(task)
end
end

rspamd_logger.errx(task, 'do http request with callback')
rspamd_http.request({
url = 'http://127.0.0.1:18080' .. url,
task = task,
@@ -34,6 +35,7 @@ local function http_symbol(task)
})

--[[ request to this address involved DNS resolver subsystem ]]
rspamd_logger.errx(task, 'do http request with callback + dns resolving')
rspamd_http.request({
url = 'http://site.resolveme:18080' .. url,
task = task,
@@ -58,6 +60,7 @@ local function http_symbol(task)
task:insert_result('HTTP_CORO_ERROR', 1.0, err)
end

rspamd_logger.errx(task, 'do http request after coroutine finished')
err, response = rspamd_http.request({
url = 'http://site.resolveme:18080' .. url,
task = task,

Loading…
取消
儲存