diff options
author | Vsevolod Stakhov <vsevolod@rspamd.com> | 2025-01-07 13:39:04 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rspamd.com> | 2025-01-07 13:39:04 +0000 |
commit | 5fc76da699df17a28b0dc1fda0314758f7a5a813 (patch) | |
tree | 493cfa16ff9ecd7e734582ab5926b81e44294117 /src/lua/lua_redis.c | |
parent | 27e277024a413115e702e1518469d2694a390bf5 (diff) | |
download | rspamd-5fc76da699df17a28b0dc1fda0314758f7a5a813.tar.gz rspamd-5fc76da699df17a28b0dc1fda0314758f7a5a813.zip |
[Fix] Add timer update before timer setting
Diffstat (limited to 'src/lua/lua_redis.c')
-rw-r--r-- | src/lua/lua_redis.c | 97 |
1 files changed, 54 insertions, 43 deletions
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c index d20c496ed..491007df3 100644 --- a/src/lua/lua_redis.c +++ b/src/lua/lua_redis.c @@ -1,5 +1,5 @@ /* - * Copyright 2024 Vsevolod Stakhov + * Copyright 2025 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -130,7 +130,7 @@ struct lua_redis_request_specific_userdata { unsigned int nargs; char **args; gsize *arglens; - struct lua_redis_userdata *c; + struct lua_redis_userdata *common_ud; struct lua_redis_ctx *ctx; struct lua_redis_request_specific_userdata *next; ev_timer timeout_ev; @@ -262,7 +262,7 @@ lua_redis_fin(void *arg) struct lua_redis_ctx *ctx; ctx = sp_ud->ctx; - ud = sp_ud->c; + ud = sp_ud->common_ud; if (ev_can_stop(&sp_ud->timeout_ev)) { ev_timer_stop(sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev); @@ -290,7 +290,7 @@ lua_redis_push_error(const char *err, gboolean connected, ...) { - struct lua_redis_userdata *ud = sp_ud->c; + struct lua_redis_userdata *ud = sp_ud->common_ud; struct lua_callback_state cbs; lua_State *L; @@ -390,7 +390,7 @@ static void lua_redis_push_data(const redisReply *r, struct lua_redis_ctx *ctx, struct lua_redis_request_specific_userdata *sp_ud) { - struct lua_redis_userdata *ud = sp_ud->c; + struct lua_redis_userdata *ud = sp_ud->common_ud; struct lua_callback_state cbs; lua_State *L; @@ -467,14 +467,14 @@ lua_redis_callback(redisAsyncContext *c, gpointer r, gpointer priv) redisAsyncContext *ac; ctx = sp_ud->ctx; - ud = sp_ud->c; + ud = sp_ud->common_ud; if (ud->terminated || !rspamd_lua_is_initialised()) { /* We are already at the termination stage, just go out */ return; } - msg_debug_lua_redis("got reply from redis %p for query %p", sp_ud->c->ctx, + msg_debug_lua_redis("got async reply from redis %p for query %p", sp_ud->common_ud->ctx, sp_ud); REDIS_RETAIN(ctx); @@ -601,7 +601,7 @@ lua_redis_callback_sync(redisAsyncContext *ac, gpointer r, gpointer priv) int results; ctx = sp_ud->ctx; - ud = sp_ud->c; + ud = sp_ud->common_ud; lua_State *L = ctx->async.cfg->lua_state; sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED; @@ -620,7 +620,7 @@ lua_redis_callback_sync(redisAsyncContext *ac, gpointer r, gpointer priv) } if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) { - msg_debug_lua_redis("got reply from redis: %p for query %p", ac, sp_ud); + msg_debug_lua_redis("got sync reply from redis: %p for query %p", ac, sp_ud); struct lua_redis_result *result = g_malloc0(sizeof *result); @@ -653,17 +653,17 @@ lua_redis_callback_sync(redisAsyncContext *ac, gpointer r, gpointer priv) /* if error happened, we should terminate the connection, and release it */ - if (result->is_error && sp_ud->c->ctx) { - ac = sp_ud->c->ctx; + if (result->is_error && sp_ud->common_ud->ctx) { + ac = sp_ud->common_ud->ctx; /* Set to NULL to avoid double free in dtor */ - sp_ud->c->ctx = NULL; + sp_ud->common_ud->ctx = NULL; ctx->flags |= LUA_REDIS_TERMINATED; /* * This will call all callbacks pending so the entire context * will be destructed */ - rspamd_redis_pool_release_connection(sp_ud->c->pool, ac, + rspamd_redis_pool_release_connection(sp_ud->common_ud->pool, ac, RSPAMD_REDIS_RELEASE_FATAL); } @@ -679,6 +679,8 @@ lua_redis_callback_sync(redisAsyncContext *ac, gpointer r, gpointer priv) ctx->cmds_pending--; if (ctx->cmds_pending == 0) { + msg_debug_lua_redis("no more commands left for: %p for query %p", ac, sp_ud); + if (ctx->thread) { if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) { /* somebody yielded and waits for results */ @@ -717,16 +719,16 @@ lua_redis_timeout_sync(EV_P_ ev_timer *w, int revents) return; } - ud = sp_ud->c; + ud = sp_ud->common_ud; ctx = sp_ud->ctx; msg_debug_lua_redis("timeout while querying redis server: %p, redis: %p", sp_ud, - sp_ud->c->ctx); + sp_ud->common_ud->ctx); - if (sp_ud->c->ctx) { - ac = sp_ud->c->ctx; + if (sp_ud->common_ud->ctx) { + ac = sp_ud->common_ud->ctx; /* Set to NULL to avoid double free in dtor */ - sp_ud->c->ctx = NULL; + sp_ud->common_ud->ctx = NULL; ac->err = REDIS_ERR_IO; errno = ETIMEDOUT; ctx->flags |= LUA_REDIS_TERMINATED; @@ -735,7 +737,7 @@ lua_redis_timeout_sync(EV_P_ ev_timer *w, int revents) * This will call all callbacks pending so the entire context * will be destructed */ - rspamd_redis_pool_release_connection(sp_ud->c->pool, ac, + rspamd_redis_pool_release_connection(sp_ud->common_ud->pool, ac, RSPAMD_REDIS_RELEASE_FATAL); } } @@ -754,24 +756,24 @@ lua_redis_timeout(EV_P_ ev_timer *w, int revents) } ctx = sp_ud->ctx; - ud = sp_ud->c; + ud = sp_ud->common_ud; REDIS_RETAIN(ctx); msg_debug_lua_redis("timeout while querying redis server: %p, redis: %p", sp_ud, - sp_ud->c->ctx); + sp_ud->common_ud->ctx); lua_redis_push_error("timeout while connecting the server (%.2f sec)", ctx, sp_ud, TRUE, ud->timeout); - if (sp_ud->c->ctx) { - ac = sp_ud->c->ctx; + if (sp_ud->common_ud->ctx) { + ac = sp_ud->common_ud->ctx; /* Set to NULL to avoid double free in dtor */ - sp_ud->c->ctx = NULL; + sp_ud->common_ud->ctx = NULL; ac->err = REDIS_ERR_IO; errno = ETIMEDOUT; /* * This will call all callbacks pending so the entire context * will be destructed */ - rspamd_redis_pool_release_connection(sp_ud->c->pool, ac, + rspamd_redis_pool_release_connection(sp_ud->common_ud->pool, ac, RSPAMD_REDIS_RELEASE_FATAL); } @@ -1095,8 +1097,8 @@ rspamd_lua_redis_prepare_connection(lua_State *L, int *pcbref, gboolean is_async return NULL; } - msg_debug_lua_redis("opened redis connection host=%s; ctx=%p; ud=%p", - host, ctx, ud); + msg_debug_lua_redis("opened redis connection host=%s; lua_ctx=%p; redis_ctx=%p; ud=%p", + host, ctx, ud->ctx, ud); return ctx; } @@ -1137,7 +1139,7 @@ lua_redis_make_request(lua_State *L) ud = &ctx->async; sp_ud = g_malloc0(sizeof(*sp_ud)); sp_ud->cbref = cbref; - sp_ud->c = ud; + sp_ud->common_ud = ud; sp_ud->ctx = ctx; lua_pushstring(L, "cmd"); @@ -1501,21 +1503,18 @@ lua_redis_add_cmd(lua_State *L) } sp_ud = g_malloc0(sizeof(*sp_ud)); + sp_ud->common_ud = &ctx->async; + ud = &ctx->async; if (IS_ASYNC(ctx)) { - sp_ud->c = &ctx->async; - ud = &ctx->async; sp_ud->cbref = cbref; } - else { - sp_ud->c = &ctx->async; - ud = &ctx->async; - } + sp_ud->ctx = ctx; lua_redis_parse_args(L, args_pos, cmd, &sp_ud->args, &sp_ud->arglens, &sp_ud->nargs); - LL_PREPEND(sp_ud->c->specific, sp_ud); + LL_PREPEND(sp_ud->common_ud->specific, sp_ud); if (ud->s && rspamd_session_blocked(ud->s)) { lua_pushboolean(L, 0); @@ -1525,7 +1524,7 @@ lua_redis_add_cmd(lua_State *L) } if (IS_ASYNC(ctx)) { - ret = redisAsyncCommandArgv(sp_ud->c->ctx, + ret = redisAsyncCommandArgv(sp_ud->common_ud->ctx, lua_redis_callback, sp_ud, sp_ud->nargs, @@ -1533,7 +1532,7 @@ lua_redis_add_cmd(lua_State *L) sp_ud->arglens); } else { - ret = redisAsyncCommandArgv(sp_ud->c->ctx, + ret = redisAsyncCommandArgv(sp_ud->common_ud->ctx, lua_redis_callback_sync, sp_ud, sp_ud->nargs, @@ -1554,25 +1553,28 @@ lua_redis_add_cmd(lua_State *L) } sp_ud->timeout_ev.data = sp_ud; + ev_now_update_if_cheap(ud->event_loop); if (IS_ASYNC(ctx)) { ev_timer_init(&sp_ud->timeout_ev, lua_redis_timeout, - sp_ud->c->timeout, 0.0); + sp_ud->common_ud->timeout, 0.0); } else { ev_timer_init(&sp_ud->timeout_ev, lua_redis_timeout_sync, - sp_ud->c->timeout, 0.0); + sp_ud->common_ud->timeout, 0.0); } ev_timer_start(ud->event_loop, &sp_ud->timeout_ev); + msg_debug_lua_redis("added timeout %f for %p", sp_ud->common_ud->timeout, sp_ud); + REDIS_RETAIN(ctx); ctx->cmds_pending++; } else { msg_info("call to redis failed: %s", - sp_ud->c->ctx->errstr); + sp_ud->common_ud->ctx->errstr); lua_pushboolean(L, 0); - lua_pushstring(L, sp_ud->c->ctx->errstr); + lua_pushstring(L, sp_ud->common_ud->ctx->errstr); return 2; } @@ -1606,11 +1608,20 @@ lua_redis_exec(lua_State *L) return 0; } else { - if (ctx->cmds_pending == 0 && g_queue_get_length(ctx->replies) == 0) { + struct lua_redis_userdata *ud = &ctx->async; + int replies_pending = g_queue_get_length(ctx->replies); + + msg_debug_lua_redis("execute pending commands for %p; commands pending = %d; replies pending = %d", + ctx, + ctx->cmds_pending, + replies_pending); + + if (ctx->cmds_pending == 0 && replies_pending == 0) { lua_pushstring(L, "No pending commands to execute"); lua_error(L); } - if (ctx->cmds_pending == 0 && g_queue_get_length(ctx->replies) > 0) { + + if (ctx->cmds_pending == 0 && replies_pending > 0) { int results = lua_redis_push_results(ctx, L); return results; } |