aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/lua/lua_redis.c109
-rw-r--r--src/lua/lua_thread_pool.c6
-rw-r--r--src/lua/lua_thread_pool.h13
3 files changed, 78 insertions, 50 deletions
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c
index 8e9e11dda..b624fc43b 100644
--- a/src/lua/lua_redis.c
+++ b/src/lua/lua_redis.c
@@ -612,75 +612,86 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv)
ev_timer_stop (ud->event_loop, &sp_ud->timeout_ev);
}
- msg_debug_lua_redis ("got reply from redis: %p for query %p", ac, sp_ud);
+ if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
+ msg_debug_lua_redis ("got reply from redis: %p for query %p", ac, sp_ud);
- struct lua_redis_result *result = g_malloc0 (sizeof *result);
+ struct lua_redis_result *result = g_malloc0 (sizeof *result);
- /* If session is finished, we cannot call lua callbacks */
- if (ac->err == 0) {
- if (r != NULL) {
- if (reply->type != REDIS_REPLY_ERROR) {
- result->is_error = FALSE;
- lua_redis_push_reply (L, reply, ctx->flags & LUA_REDIS_TEXTDATA);
+ if (ac->err == 0) {
+ if (r != NULL) {
+ if (reply->type != REDIS_REPLY_ERROR) {
+ result->is_error = FALSE;
+ lua_redis_push_reply (L, reply, ctx->flags & LUA_REDIS_TEXTDATA);
+ }
+ else {
+ result->is_error = TRUE;
+ lua_pushstring (L, reply->str);
+ }
}
else {
result->is_error = TRUE;
- lua_pushstring (L, reply->str);
+ lua_pushliteral (L, "received no data from server");
}
}
else {
result->is_error = TRUE;
- lua_pushliteral (L, "received no data from server");
- }
- }
- else {
- result->is_error = TRUE;
- if (ac->err == REDIS_ERR_IO) {
- lua_pushstring (L, strerror (errno));
- }
- else {
- lua_pushstring (L, ac->errstr);
+ if (ac->err == REDIS_ERR_IO) {
+ lua_pushstring (L, strerror (errno));
+ }
+ else {
+ lua_pushstring (L, ac->errstr);
+ }
}
- }
- /* if error happened, we should terminate the connection,
- and release it */
+ /* if error happened, we should terminate the connection,
+ and release it */
- if (result->is_error && sp_ud->c->ctx) {
- ac = sp_ud->c->ctx;
- /* Set to NULL to avoid double free in dtor */
- sp_ud->c->ctx = NULL;
- ctx->flags |= LUA_REDIS_TERMINATED;
+ if (result->is_error && sp_ud->c->ctx) {
+ ac = sp_ud->c->ctx;
+ /* Set to NULL to avoid double free in dtor */
+ sp_ud->c->ctx = NULL;
+ ctx->flags |= LUA_REDIS_TERMINATED;
- /*
- * This will call all callbacks pending so the entire context
- * will be destructed
- */
- rspamd_redis_pool_release_connection (sp_ud->c->pool, ac,
- RSPAMD_REDIS_RELEASE_FATAL);
- }
+ /*
+ * This will call all callbacks pending so the entire context
+ * will be destructed
+ */
+ rspamd_redis_pool_release_connection (sp_ud->c->pool, ac,
+ RSPAMD_REDIS_RELEASE_FATAL);
+ }
+
+ result->result_ref = luaL_ref (L, LUA_REGISTRYINDEX);
+ result->s = ud->s;
+ result->item = ud->item;
+ result->task = ud->task;
+ result->sp_ud = sp_ud;
- result->result_ref = luaL_ref (L, LUA_REGISTRYINDEX);
- result->s = ud->s;
- result->item = ud->item;
- result->task = ud->task;
- result->sp_ud = sp_ud;
+ g_queue_push_tail (ctx->replies, result);
- g_queue_push_tail (ctx->replies, result);
+ }
ctx->cmds_pending --;
if (ctx->cmds_pending == 0) {
if (ctx->thread) {
- /* somebody yielded and waits for results */
- thread = ctx->thread;
- ctx->thread = NULL;
-
- results = lua_redis_push_results (ctx, thread->lua_state);
- lua_thread_resume (thread, results);
- lua_redis_cleanup_events (ctx);
+ if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
+ /* somebody yielded and waits for results */
+ thread = ctx->thread;
+ ctx->thread = NULL;
+
+ results = lua_redis_push_results(ctx, thread->lua_state);
+ lua_thread_resume (thread, results);
+ lua_redis_cleanup_events(ctx);
+ }
+ else {
+ /* We cannot resume the thread as the associated task has gone */
+ lua_thread_pool_terminate_entry (ud->cfg->lua_thread_pool,
+ ctx->thread);
+ ctx->thread = NULL;
+ }
}
}
+
}
static void
@@ -692,6 +703,10 @@ lua_redis_timeout_sync (EV_P_ ev_timer *w, int revents)
struct lua_redis_userdata *ud;
redisAsyncContext *ac;
+ if (sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) {
+ return;
+ }
+
ud = sp_ud->c;
ctx = sp_ud->ctx;
msg_debug_lua_redis ("timeout while querying redis server: %p, redis: %p", sp_ud,
diff --git a/src/lua/lua_thread_pool.c b/src/lua/lua_thread_pool.c
index 89a516a27..01c55b4a9 100644
--- a/src/lua/lua_thread_pool.c
+++ b/src/lua/lua_thread_pool.c
@@ -157,8 +157,8 @@ lua_thread_pool_return_full (struct lua_thread_pool *pool,
}
}
-static void
-lua_thread_pool_terminate_entry (struct lua_thread_pool *pool,
+void
+lua_thread_pool_terminate_entry_full (struct lua_thread_pool *pool,
struct thread_entry *thread_entry, const gchar *loc)
{
struct thread_entry *ent = NULL;
@@ -327,7 +327,7 @@ lua_resume_thread_internal_full (struct thread_entry *thread_entry,
* Maybe there is a way to recover here.
* For now, just remove faulty thread
*/
- lua_thread_pool_terminate_entry (pool, thread_entry, loc);
+ lua_thread_pool_terminate_entry_full (pool, thread_entry, loc);
}
}
}
diff --git a/src/lua/lua_thread_pool.h b/src/lua/lua_thread_pool.h
index be954271d..66c8b991c 100644
--- a/src/lua/lua_thread_pool.h
+++ b/src/lua/lua_thread_pool.h
@@ -183,6 +183,19 @@ lua_thread_resume_full (struct thread_entry *thread_entry,
#define lua_thread_resume(thread_entry, narg) \
lua_thread_resume_full (thread_entry, narg, G_STRLOC)
+/**
+ * Terminates thread pool entry and fill the pool with another thread entry if needed
+ * @param pool
+ * @param thread_entry
+ * @param loc
+ */
+void
+lua_thread_pool_terminate_entry_full (struct lua_thread_pool *pool,
+ struct thread_entry *thread_entry,
+ const gchar *loc);
+#define lua_thread_pool_terminate_entry(pool, thread_entry) \
+ lua_thread_pool_terminate_entry_full (pool, thread_entry, G_STRLOC)
+
#ifdef __cplusplus
}
#endif