diff options
-rw-r--r-- | src/lua/lua_redis.c | 67 |
1 files changed, 48 insertions, 19 deletions
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c index 675a20d3c..f6623db3d 100644 --- a/src/lua/lua_redis.c +++ b/src/lua/lua_redis.c @@ -72,6 +72,22 @@ static const struct luaL_reg redislib_m[] = { {NULL, NULL} }; +#undef REDIS_DEBUG_REFS +#ifdef REDIS_DEBUG_REFS +#define REDIS_RETAIN(x) do { \ + msg_err ("retain ref %p, refcount: %d", (x), (x)->ref.refcount); \ + REF_RETAIN(x); \ +} while (0) + +#define REDIS_RELEASE(x) do { \ + msg_err ("release ref %p, refcount: %d", (x), (x)->ref.refcount); \ + REF_RELEASE(x); \ +} while (0) +#else +#define REDIS_RETAIN REF_RETAIN +#define REDIS_RELEASE REF_RELEASE +#endif + #ifdef WITH_HIREDIS struct lua_redis_specific_userdata; /** @@ -93,6 +109,7 @@ struct lua_redis_specific_userdata { gint cbref; guint nargs; gchar **args; + struct rspamd_async_watcher *w; struct event timeout; struct lua_redis_userdata *c; struct lua_redis_ctx *ctx; @@ -112,7 +129,7 @@ struct lua_redis_ctx { static struct lua_redis_ctx * lua_check_redis (lua_State * L, gint pos) { - void *ud = luaL_checkudata (L, pos, "rspamd{redis}"); + void *ud = rspamd_lua_check_udata (L, pos, "rspamd{redis}"); luaL_argcheck (L, ud != NULL, pos, "'redis' expected"); return ud ? *((struct lua_redis_ctx **)ud) : NULL; } @@ -182,7 +199,7 @@ lua_redis_gc (lua_State *L) struct lua_redis_ctx *ctx = lua_check_redis (L, 1); if (ctx) { - REF_RELEASE (ctx); + REDIS_RELEASE (ctx); } return 0; @@ -195,7 +212,14 @@ lua_redis_fin (void *arg) struct lua_redis_ctx *ctx; ctx = sp_ud->ctx; - REF_RELEASE (ctx); + event_del (&sp_ud->timeout); + + if (sp_ud->cbref != -1) { + luaL_unref (sp_ud->c->L, LUA_REGISTRYINDEX, sp_ud->cbref); + sp_ud->cbref = -1; + } + + REDIS_RELEASE (ctx); } /** @@ -227,10 +251,11 @@ lua_redis_push_error (const gchar *err, msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1)); lua_pop (ud->L, 1); } - } - if (connected) { - rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud); + if (connected) { + rspamd_session_watcher_pop (ud->task->s, sp_ud->w); + rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud); + } } } @@ -292,9 +317,10 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx, msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1)); lua_pop (ud->L, 1); } - } - rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud); + rspamd_session_watcher_pop (ud->task->s, sp_ud->w); + rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud); + } } /** @@ -320,8 +346,7 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv) return; } - REF_RETAIN (ctx); - event_del (&sp_ud->timeout); + REDIS_RETAIN (ctx); ctx->cmds_pending --; if (c->err == 0) { @@ -357,7 +382,7 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv) } } - REF_RELEASE (ctx); + REDIS_RELEASE (ctx); } static void @@ -369,7 +394,7 @@ lua_redis_timeout (int fd, short what, gpointer u) ctx = sp_ud->ctx; - REF_RETAIN (ctx); + REDIS_RETAIN (ctx); msg_info ("timeout while querying redis server"); lua_redis_push_error ("timeout while connecting the server", ctx, sp_ud, FALSE); @@ -384,7 +409,7 @@ lua_redis_timeout (int fd, short what, gpointer u) sp_ud->c->terminated = 1; redisAsyncFree (ac); } - REF_RELEASE (ctx); + REDIS_RELEASE (ctx); } @@ -640,7 +665,7 @@ lua_redis_make_request (lua_State *L) ud->ctx = NULL; } - REF_RELEASE (ctx); + REDIS_RELEASE (ctx); lua_pushboolean (L, FALSE); lua_pushnil (L); @@ -669,9 +694,11 @@ lua_redis_make_request (lua_State *L) lua_redis_fin, sp_ud, g_quark_from_static_string ("lua redis")); + sp_ud->w = rspamd_session_get_watcher (ud->task->s); + rspamd_session_watcher_push (ud->task->s); sp_ud->ctx = ctx; - REF_RETAIN (ctx); + REDIS_RETAIN (ctx); ctx->cmds_pending ++; double_to_tv (timeout, &tv); event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud); @@ -683,7 +710,7 @@ lua_redis_make_request (lua_State *L) msg_info ("call to redis failed: %s", ud->ctx->errstr); redisAsyncFree (ud->ctx); ud->ctx = NULL; - REF_RELEASE (ctx); + REDIS_RELEASE (ctx); ret = FALSE; } } @@ -903,7 +930,7 @@ lua_redis_connect (lua_State *L) rspamd_inet_address_get_port (addr->addr)); if (ud->ctx == NULL || ud->ctx->err) { - REF_RELEASE (ctx); + REDIS_RELEASE (ctx); lua_pushboolean (L, FALSE); return 1; @@ -994,7 +1021,7 @@ lua_redis_connect_sync (lua_State *L) lua_pushstring (L, "unknown error"); } - REF_RELEASE (ctx); + REDIS_RELEASE (ctx); return 2; } @@ -1077,12 +1104,14 @@ lua_redis_add_cmd (lua_State *L) lua_redis_fin, sp_ud, g_quark_from_static_string ("lua redis")); + sp_ud->w = rspamd_session_get_watcher (sp_ud->c->task->s); + rspamd_session_watcher_push (sp_ud->c->task->s); double_to_tv (sp_ud->c->timeout, &tv); event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud); event_base_set (sp_ud->c->task->ev_base, &sp_ud->timeout); event_add (&sp_ud->timeout, &tv); - REF_RETAIN (ctx); + REDIS_RETAIN (ctx); ctx->cmds_pending ++; } else { |