]> source.dussan.org Git - rspamd.git/commitdiff
[Fix] Fix more issues with watching of async events
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 12 Sep 2018 16:29:15 +0000 (17:29 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 12 Sep 2018 16:29:15 +0000 (17:29 +0100)
src/libserver/dns.c
src/libserver/events.c
src/libserver/events.h
src/libstat/backends/redis_backend.c
src/libstat/learn_cache/redis_cache.c
src/lua/lua_http.c
src/lua/lua_redis.c
src/lua/lua_tcp.c
src/plugins/fuzzy_check.c
src/plugins/surbl.c
test/functional/lua/http.lua

index 5ac215ff725eef9b7a53bec821b82fa5eb2b2202..4f8d70648fef9d4906bed14558230980e03e2992 100644 (file)
@@ -146,9 +146,7 @@ make_dns_request (struct rspamd_dns_resolver *resolver,
 
        if (session) {
                if (req != NULL) {
-                       rspamd_session_add_event (session,
-                                       (event_finalizer_t)rspamd_dns_fin_cb,
-                                       reqdata,
+                       rspamd_session_add_event (session, NULL, (event_finalizer_t) rspamd_dns_fin_cb, reqdata,
                                        g_quark_from_static_string ("dns resolver"));
                }
        }
index 9fa2c69c1d467322a246cfb8439c4c84b1eb8d12..3201047619abc7f06a3f943dffbe9f7c5be32761 100644 (file)
@@ -161,9 +161,10 @@ rspamd_session_create (rspamd_mempool_t * pool,
 
 struct rspamd_async_event *
 rspamd_session_add_event (struct rspamd_async_session *session,
-       event_finalizer_t fin,
-       void *user_data,
-       GQuark subsystem)
+                                                 struct rspamd_async_watcher *w,
+                                                 event_finalizer_t fin,
+                                                 gpointer user_data,
+                                                 GQuark subsystem)
 {
        struct rspamd_async_event *new_event;
        gint ret;
@@ -187,23 +188,34 @@ rspamd_session_add_event (struct rspamd_async_session *session,
        new_event->user_data = user_data;
        new_event->subsystem = subsystem;
 
-       if (RSPAMD_SESSION_IS_WATCHING (session)) {
-               new_event->w = session->cur_watcher;
-               new_event->w->remain ++;
-               msg_debug_session ("added event: %p, pending %d events, "
-                               "subsystem: %s, watcher: %d",
-                               user_data,
-                               kh_size (session->events),
-                               g_quark_to_string (subsystem),
-                               new_event->w->id);
+       if (w == NULL) {
+               if (RSPAMD_SESSION_IS_WATCHING (session)) {
+                       new_event->w = session->cur_watcher;
+                       new_event->w->remain++;
+                       msg_debug_session ("added event: %p, pending %d events, "
+                                                          "subsystem: %s, watcher: %d",
+                                       user_data,
+                                       kh_size (session->events),
+                                       g_quark_to_string (subsystem),
+                                       new_event->w->id);
+               } else {
+                       new_event->w = NULL;
+                       msg_debug_session ("added event: %p, pending %d events, "
+                                                          "subsystem: %s, no watcher!",
+                                       user_data,
+                                       kh_size (session->events),
+                                       g_quark_to_string (subsystem));
+               }
        }
        else {
-               new_event->w = NULL;
+               new_event->w = w;
+               new_event->w->remain++;
                msg_debug_session ("added event: %p, pending %d events, "
-                               "subsystem: %s, no watcher!",
+                                                  "subsystem: %s, explicit watcher: %d",
                                user_data,
                                kh_size (session->events),
-                               g_quark_to_string (subsystem));
+                               g_quark_to_string (subsystem),
+                               new_event->w->id);
        }
 
        kh_put (rspamd_events_hash, session->events, new_event, &ret);
index f7eeae9d0049532bf774008d8a2a05f47b2659ab..bab1848c29454fd0cd5e569dcebdc5790043a82a 100644 (file)
@@ -47,9 +47,12 @@ struct rspamd_async_session * rspamd_session_create (rspamd_mempool_t *pool,
  * @param user_data abstract user_data
  * @param forced unused
  */
-struct rspamd_async_event* rspamd_session_add_event (
-               struct rspamd_async_session *session,
-               event_finalizer_t fin, gpointer user_data, GQuark subsystem);
+struct rspamd_async_event *
+rspamd_session_add_event (struct rspamd_async_session *session,
+                                                 struct rspamd_async_watcher *w,
+                                                 event_finalizer_t fin,
+                                                 gpointer user_data,
+                                                 GQuark subsystem);
 
 /**
  * Remove normal event
index 7b49db59e3e726acd023c5bc26741988d5b25122..35930c923d6427dd80c7f03f5b2671bdb80e80dc 100644 (file)
@@ -1594,8 +1594,7 @@ rspamd_redis_process_tokens (struct rspamd_task *task,
        if (redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s",
                        rt->redis_object_expanded, learned_key) == REDIS_OK) {
 
-               rspamd_session_add_event (task->s, rspamd_redis_fin, rt,
-                               rspamd_redis_stat_quark ());
+               rspamd_session_add_event (task->s, NULL, rspamd_redis_fin, rt, rspamd_redis_stat_quark ());
                rt->has_event = TRUE;
 
                if (event_get_base (&rt->timeout_event)) {
@@ -1799,8 +1798,7 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
                                        "RSIG");
                }
 
-               rspamd_session_add_event (task->s, rspamd_redis_fin_learn, rt,
-                               rspamd_redis_stat_quark ());
+               rspamd_session_add_event (task->s, NULL, rspamd_redis_fin_learn, rt, rspamd_redis_stat_quark ());
                rt->has_event = TRUE;
 
                /* Set timeout */
index fc928e75eae8ae62e58dee1411c13938e4bb1985..22de2c1bce1da80df73c814137376e7c893187e7 100644 (file)
@@ -453,8 +453,7 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task,
        if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_get, rt,
                        "HGET %s %s",
                        rt->ctx->redis_object, h) == REDIS_OK) {
-               rspamd_session_add_event (task->s, rspamd_redis_cache_fin, rt,
-                               rspamd_stat_cache_redis_quark ());
+               rspamd_session_add_event (task->s, NULL, rspamd_redis_cache_fin, rt, rspamd_stat_cache_redis_quark ());
                event_add (&rt->timeout_event, &tv);
                rt->has_event = TRUE;
        }
@@ -486,8 +485,7 @@ rspamd_stat_cache_redis_learn (struct rspamd_task *task,
        if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_set, rt,
                        "HSET %s %s %d",
                        rt->ctx->redis_object, h, flag) == REDIS_OK) {
-               rspamd_session_add_event (task->s, rspamd_redis_cache_fin, rt,
-                               rspamd_stat_cache_redis_quark ());
+               rspamd_session_add_event (task->s, NULL, rspamd_redis_cache_fin, rt, rspamd_stat_cache_redis_quark ());
                event_add (&rt->timeout_event, &tv);
                rt->has_event = TRUE;
        }
index f028e63d25dcf84263825902f21bf2439984d939..1fad1d1d8543b8958f237dfbf2071c9ecbfc3ec0 100644 (file)
@@ -145,9 +145,11 @@ lua_http_fin (gpointer arg)
 static void
 lua_http_maybe_free (struct lua_http_cbdata *cbd)
 {
-       if (cbd->session && cbd->w) {
-               /* We still need to clear watcher */
-               rspamd_session_watcher_pop (cbd->session, cbd->w);
+       if (cbd->session) {
+               if (cbd->w) {
+                       /* We still need to clear watcher */
+                       rspamd_session_watcher_pop (cbd->session, cbd->w);
+               }
 
                if (cbd->flags & RSPAMD_LUA_HTTP_FLAG_RESOLVED) {
                        /* Event is added merely for resolved events */
@@ -407,9 +409,8 @@ lua_http_make_connection (struct lua_http_cbdata *cbd)
                cbd->msg = NULL;
 
                if (cbd->session) {
-                       rspamd_session_add_event (cbd->session,
-                                       (event_finalizer_t)lua_http_fin,
-                                       cbd,
+                       rspamd_session_add_event (cbd->session, cbd->w,
+                                       (event_finalizer_t) lua_http_fin, cbd,
                                        g_quark_from_static_string ("lua http"));
                        cbd->flags |= RSPAMD_LUA_HTTP_FLAG_RESOLVED;
                }
index e18e507cb404299d542d2053b72378ea5dfac790..d3eaa300a24d42b994d89a81cc75ec3b6200f484 100644 (file)
@@ -1012,10 +1012,7 @@ lua_redis_make_request (lua_State *L)
 
                if (ret == REDIS_OK) {
                        if (ud->s) {
-                               rspamd_session_add_event (ud->s,
-                                               lua_redis_fin,
-                                               sp_ud,
-                                               g_quark_from_static_string ("lua redis"));
+                               rspamd_session_add_event (ud->s, NULL, lua_redis_fin, sp_ud, g_quark_from_static_string ("lua redis"));
                                sp_ud->w = rspamd_session_get_watcher (ud->s);
                                rspamd_session_watcher_push (ud->s);
                        }
@@ -1382,10 +1379,7 @@ lua_redis_add_cmd (lua_State *L)
 
                if (ret == REDIS_OK) {
                        if (ud->s) {
-                               rspamd_session_add_event (ud->s,
-                                               lua_redis_fin,
-                                               sp_ud,
-                                               g_quark_from_static_string ("lua redis"));
+                               rspamd_session_add_event (ud->s, NULL, lua_redis_fin, sp_ud, g_quark_from_static_string ("lua redis"));
                                sp_ud->w = rspamd_session_get_watcher (ud->s);
                                rspamd_session_watcher_push (ud->s);
                        }
index 85f2941424afb1039fc9fc561e23afd344fe3f85..099fc68961b8a59c7d4a8e3349576de1ddf4385a 100644 (file)
@@ -1133,10 +1133,7 @@ lua_tcp_register_event (struct lua_tcp_cbdata *cbd)
        if (cbd->session) {
                event_finalizer_t fin = IS_SYNC (cbd) ? lua_tcp_void_finalyser : lua_tcp_fin;
 
-               cbd->async_ev = rspamd_session_add_event (cbd->session,
-                               fin,
-                               cbd,
-                               g_quark_from_static_string ("lua tcp"));
+               cbd->async_ev = rspamd_session_add_event (cbd->session, NULL, fin, cbd, g_quark_from_static_string ("lua tcp"));
 
                if (!cbd->async_ev) {
                        return FALSE;
index 61ff8f54e4e791d3e623bcfeb2cc17b51709eee4..23aeacb664478b03f12cce5d031df55c105d737b 100644 (file)
@@ -2869,9 +2869,7 @@ register_fuzzy_client_call (struct rspamd_task *task,
                                event_base_set (session->task->ev_base, &session->timev);
                                event_add (&session->timev, &session->tv);
 
-                               rspamd_session_add_event (task->s,
-                                               fuzzy_io_fin,
-                                               session,
+                               rspamd_session_add_event (task->s, NULL, fuzzy_io_fin, session,
                                                g_quark_from_static_string ("fuzzy check"));
                        }
                }
@@ -3345,10 +3343,7 @@ fuzzy_check_send_lua_learn (struct fuzzy_rule *rule,
                                event_base_set (s->task->ev_base, &s->timev);
                                event_add (&s->timev, &s->tv);
 
-                               rspamd_session_add_event (task->s,
-                                               fuzzy_lua_fin,
-                                               s,
-                                               g_quark_from_static_string ("fuzzy check"));
+                               rspamd_session_add_event (task->s, NULL, fuzzy_lua_fin, s, g_quark_from_static_string ("fuzzy check"));
 
                                (*saved)++;
                                ret = 1;
index 31c873304fb8092a0b7a0e396d015844487736e4..9781c759e64ac281039a8be62392acabe6c70205 100644 (file)
@@ -1675,10 +1675,7 @@ register_redirector_call (struct rspamd_url *url, struct rspamd_task *task,
                timeout = rspamd_mempool_alloc (task->task_pool, sizeof (struct timeval));
                double_to_tv (surbl_module_ctx->read_timeout, timeout);
 
-               rspamd_session_add_event (task->s,
-                               free_redirector_session,
-                               param,
-                               g_quark_from_static_string ("surbl"));
+               rspamd_session_add_event (task->s, NULL, free_redirector_session, param, g_quark_from_static_string ("surbl"));
 
                rspamd_http_connection_write_message (param->conn, msg, NULL,
                                NULL, param, s, timeout, task->ev_base);
index ccd2f141e9fc312e5b2d6630c27b45a28e53a64a..0263beb6f07c42068e369db991719a2fad58bf02 100644 (file)
@@ -25,6 +25,7 @@ local function http_symbol(task)
     end
   end
 
+  rspamd_logger.errx(task, 'do http request with callback')
   rspamd_http.request({
     url = 'http://127.0.0.1:18080' .. url,
     task = task,
@@ -34,6 +35,7 @@ local function http_symbol(task)
   })
 
   --[[ request to this address involved DNS resolver subsystem ]]
+  rspamd_logger.errx(task, 'do http request with callback + dns resolving')
   rspamd_http.request({
     url = 'http://site.resolveme:18080' .. url,
     task = task,
@@ -58,6 +60,7 @@ local function http_symbol(task)
     task:insert_result('HTTP_CORO_ERROR', 1.0, err)
   end
 
+  rspamd_logger.errx(task, 'do http request after coroutine finished')
   err, response = rspamd_http.request({
     url = 'http://site.resolveme:18080' .. url,
     task = task,