aboutsummaryrefslogtreecommitdiffstats
path: root/src/lua/lua_redis.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rspamd.com>2025-01-07 13:39:04 +0000
committerVsevolod Stakhov <vsevolod@rspamd.com>2025-01-07 13:39:04 +0000
commit5fc76da699df17a28b0dc1fda0314758f7a5a813 (patch)
tree493cfa16ff9ecd7e734582ab5926b81e44294117 /src/lua/lua_redis.c
parent27e277024a413115e702e1518469d2694a390bf5 (diff)
downloadrspamd-5fc76da699df17a28b0dc1fda0314758f7a5a813.tar.gz
rspamd-5fc76da699df17a28b0dc1fda0314758f7a5a813.zip
[Fix] Add timer update before timer setting
Diffstat (limited to 'src/lua/lua_redis.c')
-rw-r--r--src/lua/lua_redis.c97
1 files changed, 54 insertions, 43 deletions
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c
index d20c496ed..491007df3 100644
--- a/src/lua/lua_redis.c
+++ b/src/lua/lua_redis.c
@@ -1,5 +1,5 @@
/*
- * Copyright 2024 Vsevolod Stakhov
+ * Copyright 2025 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -130,7 +130,7 @@ struct lua_redis_request_specific_userdata {
unsigned int nargs;
char **args;
gsize *arglens;
- struct lua_redis_userdata *c;
+ struct lua_redis_userdata *common_ud;
struct lua_redis_ctx *ctx;
struct lua_redis_request_specific_userdata *next;
ev_timer timeout_ev;
@@ -262,7 +262,7 @@ lua_redis_fin(void *arg)
struct lua_redis_ctx *ctx;
ctx = sp_ud->ctx;
- ud = sp_ud->c;
+ ud = sp_ud->common_ud;
if (ev_can_stop(&sp_ud->timeout_ev)) {
ev_timer_stop(sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev);
@@ -290,7 +290,7 @@ lua_redis_push_error(const char *err,
gboolean connected,
...)
{
- struct lua_redis_userdata *ud = sp_ud->c;
+ struct lua_redis_userdata *ud = sp_ud->common_ud;
struct lua_callback_state cbs;
lua_State *L;
@@ -390,7 +390,7 @@ static void
lua_redis_push_data(const redisReply *r, struct lua_redis_ctx *ctx,
struct lua_redis_request_specific_userdata *sp_ud)
{
- struct lua_redis_userdata *ud = sp_ud->c;
+ struct lua_redis_userdata *ud = sp_ud->common_ud;
struct lua_callback_state cbs;
lua_State *L;
@@ -467,14 +467,14 @@ lua_redis_callback(redisAsyncContext *c, gpointer r, gpointer priv)
redisAsyncContext *ac;
ctx = sp_ud->ctx;
- ud = sp_ud->c;
+ ud = sp_ud->common_ud;
if (ud->terminated || !rspamd_lua_is_initialised()) {
/* We are already at the termination stage, just go out */
return;
}
- msg_debug_lua_redis("got reply from redis %p for query %p", sp_ud->c->ctx,
+ msg_debug_lua_redis("got async reply from redis %p for query %p", sp_ud->common_ud->ctx,
sp_ud);
REDIS_RETAIN(ctx);
@@ -601,7 +601,7 @@ lua_redis_callback_sync(redisAsyncContext *ac, gpointer r, gpointer priv)
int results;
ctx = sp_ud->ctx;
- ud = sp_ud->c;
+ ud = sp_ud->common_ud;
lua_State *L = ctx->async.cfg->lua_state;
sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
@@ -620,7 +620,7 @@ lua_redis_callback_sync(redisAsyncContext *ac, gpointer r, gpointer priv)
}
if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
- msg_debug_lua_redis("got reply from redis: %p for query %p", ac, sp_ud);
+ msg_debug_lua_redis("got sync reply from redis: %p for query %p", ac, sp_ud);
struct lua_redis_result *result = g_malloc0(sizeof *result);
@@ -653,17 +653,17 @@ lua_redis_callback_sync(redisAsyncContext *ac, gpointer r, gpointer priv)
/* if error happened, we should terminate the connection,
and release it */
- if (result->is_error && sp_ud->c->ctx) {
- ac = sp_ud->c->ctx;
+ if (result->is_error && sp_ud->common_ud->ctx) {
+ ac = sp_ud->common_ud->ctx;
/* Set to NULL to avoid double free in dtor */
- sp_ud->c->ctx = NULL;
+ sp_ud->common_ud->ctx = NULL;
ctx->flags |= LUA_REDIS_TERMINATED;
/*
* This will call all callbacks pending so the entire context
* will be destructed
*/
- rspamd_redis_pool_release_connection(sp_ud->c->pool, ac,
+ rspamd_redis_pool_release_connection(sp_ud->common_ud->pool, ac,
RSPAMD_REDIS_RELEASE_FATAL);
}
@@ -679,6 +679,8 @@ lua_redis_callback_sync(redisAsyncContext *ac, gpointer r, gpointer priv)
ctx->cmds_pending--;
if (ctx->cmds_pending == 0) {
+ msg_debug_lua_redis("no more commands left for: %p for query %p", ac, sp_ud);
+
if (ctx->thread) {
if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
/* somebody yielded and waits for results */
@@ -717,16 +719,16 @@ lua_redis_timeout_sync(EV_P_ ev_timer *w, int revents)
return;
}
- ud = sp_ud->c;
+ ud = sp_ud->common_ud;
ctx = sp_ud->ctx;
msg_debug_lua_redis("timeout while querying redis server: %p, redis: %p", sp_ud,
- sp_ud->c->ctx);
+ sp_ud->common_ud->ctx);
- if (sp_ud->c->ctx) {
- ac = sp_ud->c->ctx;
+ if (sp_ud->common_ud->ctx) {
+ ac = sp_ud->common_ud->ctx;
/* Set to NULL to avoid double free in dtor */
- sp_ud->c->ctx = NULL;
+ sp_ud->common_ud->ctx = NULL;
ac->err = REDIS_ERR_IO;
errno = ETIMEDOUT;
ctx->flags |= LUA_REDIS_TERMINATED;
@@ -735,7 +737,7 @@ lua_redis_timeout_sync(EV_P_ ev_timer *w, int revents)
* This will call all callbacks pending so the entire context
* will be destructed
*/
- rspamd_redis_pool_release_connection(sp_ud->c->pool, ac,
+ rspamd_redis_pool_release_connection(sp_ud->common_ud->pool, ac,
RSPAMD_REDIS_RELEASE_FATAL);
}
}
@@ -754,24 +756,24 @@ lua_redis_timeout(EV_P_ ev_timer *w, int revents)
}
ctx = sp_ud->ctx;
- ud = sp_ud->c;
+ ud = sp_ud->common_ud;
REDIS_RETAIN(ctx);
msg_debug_lua_redis("timeout while querying redis server: %p, redis: %p", sp_ud,
- sp_ud->c->ctx);
+ sp_ud->common_ud->ctx);
lua_redis_push_error("timeout while connecting the server (%.2f sec)", ctx, sp_ud, TRUE, ud->timeout);
- if (sp_ud->c->ctx) {
- ac = sp_ud->c->ctx;
+ if (sp_ud->common_ud->ctx) {
+ ac = sp_ud->common_ud->ctx;
/* Set to NULL to avoid double free in dtor */
- sp_ud->c->ctx = NULL;
+ sp_ud->common_ud->ctx = NULL;
ac->err = REDIS_ERR_IO;
errno = ETIMEDOUT;
/*
* This will call all callbacks pending so the entire context
* will be destructed
*/
- rspamd_redis_pool_release_connection(sp_ud->c->pool, ac,
+ rspamd_redis_pool_release_connection(sp_ud->common_ud->pool, ac,
RSPAMD_REDIS_RELEASE_FATAL);
}
@@ -1095,8 +1097,8 @@ rspamd_lua_redis_prepare_connection(lua_State *L, int *pcbref, gboolean is_async
return NULL;
}
- msg_debug_lua_redis("opened redis connection host=%s; ctx=%p; ud=%p",
- host, ctx, ud);
+ msg_debug_lua_redis("opened redis connection host=%s; lua_ctx=%p; redis_ctx=%p; ud=%p",
+ host, ctx, ud->ctx, ud);
return ctx;
}
@@ -1137,7 +1139,7 @@ lua_redis_make_request(lua_State *L)
ud = &ctx->async;
sp_ud = g_malloc0(sizeof(*sp_ud));
sp_ud->cbref = cbref;
- sp_ud->c = ud;
+ sp_ud->common_ud = ud;
sp_ud->ctx = ctx;
lua_pushstring(L, "cmd");
@@ -1501,21 +1503,18 @@ lua_redis_add_cmd(lua_State *L)
}
sp_ud = g_malloc0(sizeof(*sp_ud));
+ sp_ud->common_ud = &ctx->async;
+ ud = &ctx->async;
if (IS_ASYNC(ctx)) {
- sp_ud->c = &ctx->async;
- ud = &ctx->async;
sp_ud->cbref = cbref;
}
- else {
- sp_ud->c = &ctx->async;
- ud = &ctx->async;
- }
+
sp_ud->ctx = ctx;
lua_redis_parse_args(L, args_pos, cmd, &sp_ud->args,
&sp_ud->arglens, &sp_ud->nargs);
- LL_PREPEND(sp_ud->c->specific, sp_ud);
+ LL_PREPEND(sp_ud->common_ud->specific, sp_ud);
if (ud->s && rspamd_session_blocked(ud->s)) {
lua_pushboolean(L, 0);
@@ -1525,7 +1524,7 @@ lua_redis_add_cmd(lua_State *L)
}
if (IS_ASYNC(ctx)) {
- ret = redisAsyncCommandArgv(sp_ud->c->ctx,
+ ret = redisAsyncCommandArgv(sp_ud->common_ud->ctx,
lua_redis_callback,
sp_ud,
sp_ud->nargs,
@@ -1533,7 +1532,7 @@ lua_redis_add_cmd(lua_State *L)
sp_ud->arglens);
}
else {
- ret = redisAsyncCommandArgv(sp_ud->c->ctx,
+ ret = redisAsyncCommandArgv(sp_ud->common_ud->ctx,
lua_redis_callback_sync,
sp_ud,
sp_ud->nargs,
@@ -1554,25 +1553,28 @@ lua_redis_add_cmd(lua_State *L)
}
sp_ud->timeout_ev.data = sp_ud;
+ ev_now_update_if_cheap(ud->event_loop);
if (IS_ASYNC(ctx)) {
ev_timer_init(&sp_ud->timeout_ev, lua_redis_timeout,
- sp_ud->c->timeout, 0.0);
+ sp_ud->common_ud->timeout, 0.0);
}
else {
ev_timer_init(&sp_ud->timeout_ev, lua_redis_timeout_sync,
- sp_ud->c->timeout, 0.0);
+ sp_ud->common_ud->timeout, 0.0);
}
ev_timer_start(ud->event_loop, &sp_ud->timeout_ev);
+ msg_debug_lua_redis("added timeout %f for %p", sp_ud->common_ud->timeout, sp_ud);
+
REDIS_RETAIN(ctx);
ctx->cmds_pending++;
}
else {
msg_info("call to redis failed: %s",
- sp_ud->c->ctx->errstr);
+ sp_ud->common_ud->ctx->errstr);
lua_pushboolean(L, 0);
- lua_pushstring(L, sp_ud->c->ctx->errstr);
+ lua_pushstring(L, sp_ud->common_ud->ctx->errstr);
return 2;
}
@@ -1606,11 +1608,20 @@ lua_redis_exec(lua_State *L)
return 0;
}
else {
- if (ctx->cmds_pending == 0 && g_queue_get_length(ctx->replies) == 0) {
+ struct lua_redis_userdata *ud = &ctx->async;
+ int replies_pending = g_queue_get_length(ctx->replies);
+
+ msg_debug_lua_redis("execute pending commands for %p; commands pending = %d; replies pending = %d",
+ ctx,
+ ctx->cmds_pending,
+ replies_pending);
+
+ if (ctx->cmds_pending == 0 && replies_pending == 0) {
lua_pushstring(L, "No pending commands to execute");
lua_error(L);
}
- if (ctx->cmds_pending == 0 && g_queue_get_length(ctx->replies) > 0) {
+
+ if (ctx->cmds_pending == 0 && replies_pending > 0) {
int results = lua_redis_push_results(ctx, L);
return results;
}