diff options
-rw-r--r-- | src/lua/lua_redis.c | 109 | ||||
-rw-r--r-- | src/lua/lua_thread_pool.c | 6 | ||||
-rw-r--r-- | src/lua/lua_thread_pool.h | 13 |
3 files changed, 78 insertions, 50 deletions
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c index 8e9e11dda..b624fc43b 100644 --- a/src/lua/lua_redis.c +++ b/src/lua/lua_redis.c @@ -612,75 +612,86 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv) ev_timer_stop (ud->event_loop, &sp_ud->timeout_ev); } - msg_debug_lua_redis ("got reply from redis: %p for query %p", ac, sp_ud); + if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) { + msg_debug_lua_redis ("got reply from redis: %p for query %p", ac, sp_ud); - struct lua_redis_result *result = g_malloc0 (sizeof *result); + struct lua_redis_result *result = g_malloc0 (sizeof *result); - /* If session is finished, we cannot call lua callbacks */ - if (ac->err == 0) { - if (r != NULL) { - if (reply->type != REDIS_REPLY_ERROR) { - result->is_error = FALSE; - lua_redis_push_reply (L, reply, ctx->flags & LUA_REDIS_TEXTDATA); + if (ac->err == 0) { + if (r != NULL) { + if (reply->type != REDIS_REPLY_ERROR) { + result->is_error = FALSE; + lua_redis_push_reply (L, reply, ctx->flags & LUA_REDIS_TEXTDATA); + } + else { + result->is_error = TRUE; + lua_pushstring (L, reply->str); + } } else { result->is_error = TRUE; - lua_pushstring (L, reply->str); + lua_pushliteral (L, "received no data from server"); } } else { result->is_error = TRUE; - lua_pushliteral (L, "received no data from server"); - } - } - else { - result->is_error = TRUE; - if (ac->err == REDIS_ERR_IO) { - lua_pushstring (L, strerror (errno)); - } - else { - lua_pushstring (L, ac->errstr); + if (ac->err == REDIS_ERR_IO) { + lua_pushstring (L, strerror (errno)); + } + else { + lua_pushstring (L, ac->errstr); + } } - } - /* if error happened, we should terminate the connection, - and release it */ + /* if error happened, we should terminate the connection, + and release it */ - if (result->is_error && sp_ud->c->ctx) { - ac = sp_ud->c->ctx; - /* Set to NULL to avoid double free in dtor */ - sp_ud->c->ctx = NULL; - ctx->flags |= LUA_REDIS_TERMINATED; + if (result->is_error && sp_ud->c->ctx) { + ac = sp_ud->c->ctx; + /* Set to NULL to avoid double free in dtor */ + sp_ud->c->ctx = NULL; + ctx->flags |= LUA_REDIS_TERMINATED; - /* - * This will call all callbacks pending so the entire context - * will be destructed - */ - rspamd_redis_pool_release_connection (sp_ud->c->pool, ac, - RSPAMD_REDIS_RELEASE_FATAL); - } + /* + * This will call all callbacks pending so the entire context + * will be destructed + */ + rspamd_redis_pool_release_connection (sp_ud->c->pool, ac, + RSPAMD_REDIS_RELEASE_FATAL); + } + + result->result_ref = luaL_ref (L, LUA_REGISTRYINDEX); + result->s = ud->s; + result->item = ud->item; + result->task = ud->task; + result->sp_ud = sp_ud; - result->result_ref = luaL_ref (L, LUA_REGISTRYINDEX); - result->s = ud->s; - result->item = ud->item; - result->task = ud->task; - result->sp_ud = sp_ud; + g_queue_push_tail (ctx->replies, result); - g_queue_push_tail (ctx->replies, result); + } ctx->cmds_pending --; if (ctx->cmds_pending == 0) { if (ctx->thread) { - /* somebody yielded and waits for results */ - thread = ctx->thread; - ctx->thread = NULL; - - results = lua_redis_push_results (ctx, thread->lua_state); - lua_thread_resume (thread, results); - lua_redis_cleanup_events (ctx); + if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) { + /* somebody yielded and waits for results */ + thread = ctx->thread; + ctx->thread = NULL; + + results = lua_redis_push_results(ctx, thread->lua_state); + lua_thread_resume (thread, results); + lua_redis_cleanup_events(ctx); + } + else { + /* We cannot resume the thread as the associated task has gone */ + lua_thread_pool_terminate_entry (ud->cfg->lua_thread_pool, + ctx->thread); + ctx->thread = NULL; + } } } + } static void @@ -692,6 +703,10 @@ lua_redis_timeout_sync (EV_P_ ev_timer *w, int revents) struct lua_redis_userdata *ud; redisAsyncContext *ac; + if (sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) { + return; + } + ud = sp_ud->c; ctx = sp_ud->ctx; msg_debug_lua_redis ("timeout while querying redis server: %p, redis: %p", sp_ud, diff --git a/src/lua/lua_thread_pool.c b/src/lua/lua_thread_pool.c index 89a516a27..01c55b4a9 100644 --- a/src/lua/lua_thread_pool.c +++ b/src/lua/lua_thread_pool.c @@ -157,8 +157,8 @@ lua_thread_pool_return_full (struct lua_thread_pool *pool, } } -static void -lua_thread_pool_terminate_entry (struct lua_thread_pool *pool, +void +lua_thread_pool_terminate_entry_full (struct lua_thread_pool *pool, struct thread_entry *thread_entry, const gchar *loc) { struct thread_entry *ent = NULL; @@ -327,7 +327,7 @@ lua_resume_thread_internal_full (struct thread_entry *thread_entry, * Maybe there is a way to recover here. * For now, just remove faulty thread */ - lua_thread_pool_terminate_entry (pool, thread_entry, loc); + lua_thread_pool_terminate_entry_full (pool, thread_entry, loc); } } } diff --git a/src/lua/lua_thread_pool.h b/src/lua/lua_thread_pool.h index be954271d..66c8b991c 100644 --- a/src/lua/lua_thread_pool.h +++ b/src/lua/lua_thread_pool.h @@ -183,6 +183,19 @@ lua_thread_resume_full (struct thread_entry *thread_entry, #define lua_thread_resume(thread_entry, narg) \ lua_thread_resume_full (thread_entry, narg, G_STRLOC) +/** + * Terminates thread pool entry and fill the pool with another thread entry if needed + * @param pool + * @param thread_entry + * @param loc + */ +void +lua_thread_pool_terminate_entry_full (struct lua_thread_pool *pool, + struct thread_entry *thread_entry, + const gchar *loc); +#define lua_thread_pool_terminate_entry(pool, thread_entry) \ + lua_thread_pool_terminate_entry_full (pool, thread_entry, G_STRLOC) + #ifdef __cplusplus } #endif |