ev_timer_stop (ud->event_loop, &sp_ud->timeout_ev);
}
- msg_debug_lua_redis ("got reply from redis: %p for query %p", ac, sp_ud);
+ if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
+ msg_debug_lua_redis ("got reply from redis: %p for query %p", ac, sp_ud);
- struct lua_redis_result *result = g_malloc0 (sizeof *result);
+ struct lua_redis_result *result = g_malloc0 (sizeof *result);
- /* If session is finished, we cannot call lua callbacks */
- if (ac->err == 0) {
- if (r != NULL) {
- if (reply->type != REDIS_REPLY_ERROR) {
- result->is_error = FALSE;
- lua_redis_push_reply (L, reply, ctx->flags & LUA_REDIS_TEXTDATA);
+ if (ac->err == 0) {
+ if (r != NULL) {
+ if (reply->type != REDIS_REPLY_ERROR) {
+ result->is_error = FALSE;
+ lua_redis_push_reply (L, reply, ctx->flags & LUA_REDIS_TEXTDATA);
+ }
+ else {
+ result->is_error = TRUE;
+ lua_pushstring (L, reply->str);
+ }
}
else {
result->is_error = TRUE;
- lua_pushstring (L, reply->str);
+ lua_pushliteral (L, "received no data from server");
}
}
else {
result->is_error = TRUE;
- lua_pushliteral (L, "received no data from server");
- }
- }
- else {
- result->is_error = TRUE;
- if (ac->err == REDIS_ERR_IO) {
- lua_pushstring (L, strerror (errno));
- }
- else {
- lua_pushstring (L, ac->errstr);
+ if (ac->err == REDIS_ERR_IO) {
+ lua_pushstring (L, strerror (errno));
+ }
+ else {
+ lua_pushstring (L, ac->errstr);
+ }
}
- }
- /* if error happened, we should terminate the connection,
- and release it */
+ /* if error happened, we should terminate the connection,
+ and release it */
- if (result->is_error && sp_ud->c->ctx) {
- ac = sp_ud->c->ctx;
- /* Set to NULL to avoid double free in dtor */
- sp_ud->c->ctx = NULL;
- ctx->flags |= LUA_REDIS_TERMINATED;
+ if (result->is_error && sp_ud->c->ctx) {
+ ac = sp_ud->c->ctx;
+ /* Set to NULL to avoid double free in dtor */
+ sp_ud->c->ctx = NULL;
+ ctx->flags |= LUA_REDIS_TERMINATED;
- /*
- * This will call all callbacks pending so the entire context
- * will be destructed
- */
- rspamd_redis_pool_release_connection (sp_ud->c->pool, ac,
- RSPAMD_REDIS_RELEASE_FATAL);
- }
+ /*
+ * This will call all callbacks pending so the entire context
+ * will be destructed
+ */
+ rspamd_redis_pool_release_connection (sp_ud->c->pool, ac,
+ RSPAMD_REDIS_RELEASE_FATAL);
+ }
+
+ result->result_ref = luaL_ref (L, LUA_REGISTRYINDEX);
+ result->s = ud->s;
+ result->item = ud->item;
+ result->task = ud->task;
+ result->sp_ud = sp_ud;
- result->result_ref = luaL_ref (L, LUA_REGISTRYINDEX);
- result->s = ud->s;
- result->item = ud->item;
- result->task = ud->task;
- result->sp_ud = sp_ud;
+ g_queue_push_tail (ctx->replies, result);
- g_queue_push_tail (ctx->replies, result);
+ }
ctx->cmds_pending --;
if (ctx->cmds_pending == 0) {
if (ctx->thread) {
- /* somebody yielded and waits for results */
- thread = ctx->thread;
- ctx->thread = NULL;
-
- results = lua_redis_push_results (ctx, thread->lua_state);
- lua_thread_resume (thread, results);
- lua_redis_cleanup_events (ctx);
+ if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
+ /* somebody yielded and waits for results */
+ thread = ctx->thread;
+ ctx->thread = NULL;
+
+ results = lua_redis_push_results(ctx, thread->lua_state);
+ lua_thread_resume (thread, results);
+ lua_redis_cleanup_events(ctx);
+ }
+ else {
+ /* We cannot resume the thread as the associated task has gone */
+ lua_thread_pool_terminate_entry (ud->cfg->lua_thread_pool,
+ ctx->thread);
+ ctx->thread = NULL;
+ }
}
}
+
}
static void
struct lua_redis_userdata *ud;
redisAsyncContext *ac;
+ if (sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) {
+ return;
+ }
+
ud = sp_ud->c;
ctx = sp_ud->ctx;
msg_debug_lua_redis ("timeout while querying redis server: %p, redis: %p", sp_ud,