/*
 * Copyright 2024 Vsevolod Stakhov
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "lua_common.h"
#include "lua_thread_pool.h"
#include "utlist.h"

#include "contrib/hiredis/hiredis.h"
#include "contrib/hiredis/async.h"

/* Request timeout in seconds used when the caller's table has no numeric `timeout` */
#define REDIS_DEFAULT_TIMEOUT 1.0

/* Label passed to the session and symcache calls made by this module */
static const char *M = "rspamd lua redis";
/* NOTE(review): not referenced in the visible part of this file; presumably
 * the sentinel stored under the "redis.null" registry key - confirm */
static void *redis_null;

/***
 * @module rspamd_redis
 * This module implements an asynchronous redis client for the rspamd Lua API.
 * Here is an example of using this module:
 * @example
local rspamd_redis = require "rspamd_redis"
local rspamd_logger = require "rspamd_logger"

local function symbol_callback(task)
	local redis_key = 'some_key'
	local function redis_cb(err, data)
		if not err then
			rspamd_logger.infox('redis returned %1=%2', redis_key, data)
		end
	end

	rspamd_redis.make_request(task, "127.0.0.1:6379", redis_cb,
		'GET', {redis_key})
	-- or in table form:
	-- rspamd_redis.make_request({task=task, host="127.0.0.1:6379",
	--	callback=redis_cb, timeout=2.0, cmd='GET', args={redis_key}})
end
 */

/* Forward declarations of the Lua-callable entry points of this module */
LUA_FUNCTION_DEF(redis, make_request);
LUA_FUNCTION_DEF(redis, make_request_sync);
LUA_FUNCTION_DEF(redis, connect);
LUA_FUNCTION_DEF(redis, connect_sync);
LUA_FUNCTION_DEF(redis, add_cmd);
LUA_FUNCTION_DEF(redis, exec);
LUA_FUNCTION_DEF(redis, gc);

/* Module-level functions (called as rspamd_redis.<name>) */
static const struct luaL_reg redislib_f[] = {
	LUA_INTERFACE_DEF(redis, make_request),
	LUA_INTERFACE_DEF(redis, make_request_sync),
	LUA_INTERFACE_DEF(redis, connect),
	LUA_INTERFACE_DEF(redis, connect_sync),
	{NULL, NULL}};

/* Methods and metamethods of the redis connection object */
static const struct luaL_reg redislib_m[] = {
	LUA_INTERFACE_DEF(redis, add_cmd),
	LUA_INTERFACE_DEF(redis, exec),
	{"__gc", lua_redis_gc},
	{"__tostring", rspamd_lua_class_tostring},
	{NULL, NULL}};

/*
 * Reference counting wrappers for struct lua_redis_ctx.
 * REDIS_DEBUG_REFS is force-undefined here, so the plain REF_RETAIN/REF_RELEASE
 * aliases below are what gets compiled; replace the #undef with a #define to
 * log every retain/release together with the current refcount.
 */
#undef REDIS_DEBUG_REFS
#ifdef REDIS_DEBUG_REFS
#define REDIS_RETAIN(x)                                                 \
	do {                                                                \
		msg_err("retain ref %p, refcount: %d", (x), (x)->ref.refcount); \
		REF_RETAIN(x);                                                  \
	} while (0)

#define REDIS_RELEASE(x)                                                 \
	do {                                                                 \
		msg_err("release ref %p, refcount: %d", (x), (x)->ref.refcount); \
		REF_RELEASE(x);                                                  \
	} while (0)
#else
#define REDIS_RETAIN REF_RETAIN
#define REDIS_RELEASE REF_RELEASE
#endif

struct lua_redis_request_specific_userdata;
/**
 * Connection-level state shared by all requests issued through one redis
 * object; embedded into struct lua_redis_ctx as its `async` member.
 */
struct lua_redis_userdata {
	/* hiredis async context obtained from the pool; set to NULL once released */
	redisAsyncContext *ctx;
	/* Task that created the connection, NULL when config/ev_base were passed */
	struct rspamd_task *task;
	/* Symcache item that was current when the connection was prepared (task only) */
	struct rspamd_symcache_dynamic_item *item;
	/* Async session used to register pending request events, may be NULL */
	struct rspamd_async_session *s;
	/* Event loop that runs the per-request timeout timers */
	struct ev_loop *event_loop;
	/* Configuration; provides the Lua state, thread pool and redis pool */
	struct rspamd_config *cfg;
	/* Connection pool, taken from cfg->redis_pool */
	struct rspamd_redis_pool *pool;
	/* NOTE(review): `server` and `port` are not touched in the visible code */
	char *server;
	/* Tag for debug logging: task uid or a hash-derived number */
	char log_tag[RSPAMD_LOG_ID_LEN + 1];
	/* Head of the singly linked list of requests made on this connection */
	struct lua_redis_request_specific_userdata *specific;
	/* Request timeout in seconds */
	ev_tstamp timeout;
	uint16_t port;
	/* Set to 1 once the connection has been handed back to the pool */
	uint16_t terminated;
};

/*
 * Conditional debug logging for this module.
 * The macro reads `ud->log_tag`, so a `struct lua_redis_userdata *ud` must be
 * in scope at every call site.
 */
#define msg_debug_lua_redis(...) rspamd_conditional_debug_fast(NULL, NULL,                                        \
															   rspamd_lua_redis_log_id, "lua_redis", ud->log_tag, \
															   G_STRFUNC,                                         \
															   __VA_ARGS__)
INIT_LOG_MODULE(lua_redis)

/*
 * Per-request flags (lua_redis_request_specific_userdata.flags):
 * a reply or an error has already been delivered for the request
 */
#define LUA_REDIS_SPECIFIC_REPLIED (1 << 0)
/* session was finished */
#define LUA_REDIS_SPECIFIC_FINISHED (1 << 1)
/*
 * Context flags (lua_redis_ctx.flags). Note that LUA_REDIS_SUBSCRIBED is
 * set on the per-request flags word in the visible code, not on the context.
 */
/* context was created for the callback based (asynchronous) API */
#define LUA_REDIS_ASYNC (1 << 0)
/* `opaque_data` was requested: string replies are pushed as text userdata */
#define LUA_REDIS_TEXTDATA (1 << 1)
/* the connection has been released by this module */
#define LUA_REDIS_TERMINATED (1 << 2)
/* `no_pool` was requested: release with RSPAMD_REDIS_RELEASE_ENFORCE */
#define LUA_REDIS_NO_POOL (1 << 3)
/* request was issued on a subscribed connection: never unref or time out */
#define LUA_REDIS_SUBSCRIBED (1 << 4)
#define IS_ASYNC(ctx) ((ctx)->flags & LUA_REDIS_ASYNC)

/**
 * State of a single redis command issued on a connection
 */
struct lua_redis_request_specific_userdata {
	/* Registry reference of the Lua callback, -1 when there is none */
	int cbref;
	/* Number of entries in `args` / `arglens` (command name included) */
	unsigned int nargs;
	/* Heap copies of the command name and its arguments (not NUL terminated) */
	char **args;
	/* Byte length of every entry in `args` */
	gsize *arglens;
	/* Connection-level data this request belongs to */
	struct lua_redis_userdata *c;
	/* Owning reference counted context */
	struct lua_redis_ctx *ctx;
	/* Next request in the connection's `specific` list */
	struct lua_redis_request_specific_userdata *next;
	/* Per-request timeout timer */
	ev_timer timeout_ev;
	/* LUA_REDIS_SPECIFIC_* bits plus LUA_REDIS_SUBSCRIBED */
	unsigned int flags;
};

/**
 * Reference counted object behind the Lua `redis` userdata
 */
struct lua_redis_ctx {
	/* LUA_REDIS_* context flags */
	unsigned int flags;
	/* Connection-level data (used by both the async and the sync flavour) */
	struct lua_redis_userdata async;
	/* Number of commands sent whose reply callback has not run yet */
	unsigned int cmds_pending;
	/* Reference counter; lua_redis_dtor is invoked when it drops to zero */
	ref_entry_t ref;
	GQueue *replies;             /* for sync connection only */
	GQueue *events_cleanup;      /* for sync connection only */
	struct thread_entry *thread; /* for sync mode, set only if there was yield */
};

/**
 * A reply captured by the sync callback and queued until it is handed to Lua
 */
struct lua_redis_result {
	/* TRUE when `result_ref` refers to an error string rather than data */
	gboolean is_error;
	/* Registry reference of the already converted Lua value */
	int result_ref;
	/* Copies of the connection's item/session/task taken when the reply arrived */
	struct rspamd_symcache_dynamic_item *item;
	struct rspamd_async_session *s;
	struct rspamd_task *task;
	/* Request that produced this result */
	struct lua_redis_request_specific_userdata *sp_ud;
};

/**
 * Fetch the redis context stored in the userdata at stack position `pos`.
 * An argument error is raised through luaL_argcheck when the value is not a
 * redis object.
 * @param L lua state
 * @param pos stack index of the userdata
 * @return context pointer held by the userdata, or NULL if the check failed
 */
static struct lua_redis_ctx *
lua_check_redis(lua_State *L, int pos)
{
	struct lua_redis_ctx **pctx = rspamd_lua_check_udata(L, pos, rspamd_redis_classname);

	luaL_argcheck(L, pctx != NULL, pos, "'redis' expected");

	if (pctx == NULL) {
		return NULL;
	}

	return *pctx;
}

/**
 * Release an argument vector built by lua_redis_parse_args.
 * Nothing is freed (including `arglens`) when `args` is NULL.
 * @param args array of `nargs` heap allocated strings
 * @param arglens array with the length of every argument
 * @param nargs number of elements in both arrays
 */
static void
lua_redis_free_args(char **args, gsize *arglens, unsigned int nargs)
{
	char **cur, **end;

	if (args == NULL) {
		return;
	}

	end = args + nargs;

	for (cur = args; cur != end; cur++) {
		g_free(*cur);
	}

	g_free(args);
	g_free(arglens);
}

/**
 * Destructor invoked when the context refcount drops to zero.
 * If the connection is still held, stops all request timers, marks the
 * requests finished and returns the connection to the pool; then frees every
 * request (arguments and callback reference), the sync queues and the context.
 * @param ctx context to destroy
 */
static void
lua_redis_dtor(struct lua_redis_ctx *ctx)
{
	struct lua_redis_userdata *ud;
	struct lua_redis_request_specific_userdata *cur, *tmp;
	gboolean is_successful = TRUE;
	struct redisAsyncContext *ac;

	ud = &ctx->async;
	msg_debug_lua_redis("destructing %p", ctx);

	if (ud->ctx) {

		LL_FOREACH_SAFE(ud->specific, cur, tmp)
		{
			ev_timer_stop(ud->event_loop, &cur->timeout_ev);

			/* Any request without a reply makes the connection unsafe to reuse */
			if (!(cur->flags & LUA_REDIS_SPECIFIC_REPLIED)) {
				is_successful = FALSE;
			}

			cur->flags |= LUA_REDIS_SPECIFIC_FINISHED;
		}

		ctx->flags |= LUA_REDIS_TERMINATED;

		/* Detach the connection before releasing it: callbacks check `terminated` */
		ud->terminated = 1;
		ac = ud->ctx;
		ud->ctx = NULL;

		if (!is_successful) {
			rspamd_redis_pool_release_connection(ud->pool, ac,
												 RSPAMD_REDIS_RELEASE_FATAL);
		}
		else {
			rspamd_redis_pool_release_connection(ud->pool, ac,
												 (ctx->flags & LUA_REDIS_NO_POOL) ? RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT);
		}
	}

	/* Free per-request data regardless of whether the connection was held */
	LL_FOREACH_SAFE(ud->specific, cur, tmp)
	{
		lua_redis_free_args(cur->args, cur->arglens, cur->nargs);

		if (cur->cbref != -1) {
			luaL_unref(ud->cfg->lua_state, LUA_REGISTRYINDEX, cur->cbref);
		}

		g_free(cur);
	}

	/* Only the queue containers are freed here, not elements still queued */
	if (ctx->events_cleanup) {
		g_queue_free(ctx->events_cleanup);
		ctx->events_cleanup = NULL;
	}
	if (ctx->replies) {
		g_queue_free(ctx->replies);
		ctx->replies = NULL;
	}

	g_free(ctx);
}

/**
 * `__gc` metamethod: drops the reference held by the Lua userdata.
 * @param L lua state, redis object at index 1
 * @return number of Lua results (always 0)
 */
static int
lua_redis_gc(lua_State *L)
{
	struct lua_redis_ctx *ctx = lua_check_redis(L, 1);

	if (ctx == NULL) {
		return 0;
	}

	REDIS_RELEASE(ctx);

	return 0;
}

/**
 * Finaliser of a single request: stops its timer, marks it finished and
 * drops the context reference that was taken for the request.
 * Used as the session event destructor and also called directly.
 * @param arg struct lua_redis_request_specific_userdata *
 */
static void
lua_redis_fin(void *arg)
{
	struct lua_redis_request_specific_userdata *req = arg;
	struct lua_redis_ctx *ctx = req->ctx;
	/* `ud` is consumed by the msg_debug_lua_redis macro */
	struct lua_redis_userdata *ud = req->c;

	if (ev_can_stop(&req->timeout_ev)) {
		ev_timer_stop(ctx->async.event_loop, &req->timeout_ev);
	}

	msg_debug_lua_redis("finished redis query %p from session %p; refcount=%d",
						req, ctx, ctx->ref.refcount);

	req->flags |= LUA_REDIS_SPECIFIC_FINISHED;

	/* May destroy ctx together with req, nothing may be touched afterwards */
	REDIS_RELEASE(ctx);
}

/**
 * Deliver an error to the Lua callback of a request as `callback(err, nil)`
 * and finish the request. Does nothing if the request has already been
 * replied to or finished.
 * @param err format string; it is expanded by lua_pushvfstring, so only the
 *            conversions supported by that function may be used
 * @param ctx redis context (unused in the body)
 * @param sp_ud request the error belongs to
 * @param connected if TRUE and a session exists, the request is finished by
 *                  removing its session event; otherwise lua_redis_fin is
 *                  called directly
 * @param ... arguments for `err`
 */
#ifdef __GNUC__
__attribute__((format(printf, 1, 5)))
#endif
static void
lua_redis_push_error(const char *err,
					 struct lua_redis_ctx *ctx,
					 struct lua_redis_request_specific_userdata *sp_ud,
					 gboolean connected,
					 ...)
{
	struct lua_redis_userdata *ud = sp_ud->c;
	struct lua_callback_state cbs;
	lua_State *L;

	va_list ap;
	va_start(ap, connected);

	if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED | LUA_REDIS_SPECIFIC_FINISHED))) {
		if (sp_ud->cbref != -1) {

			lua_thread_pool_prepare_callback(ud->cfg->lua_thread_pool, &cbs);
			L = cbs.L;

			/* Traceback handler sits below the callback and its arguments */
			lua_pushcfunction(L, &rspamd_lua_traceback);
			int err_idx = lua_gettop(L);
			/* Push the callback function */
			lua_rawgeti(cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref);

			/* String of error */
			lua_pushvfstring(cbs.L, err, ap);
			/* Data is nil */
			lua_pushnil(cbs.L);

			if (ud->item) {
				rspamd_symcache_set_cur_item(ud->task, ud->item);
			}

			if (lua_pcall(cbs.L, 2, 0, err_idx) != 0) {
				msg_info("call to callback failed: %s", lua_tostring(cbs.L, -1));
			}

			/* Drop the traceback handler and anything left above it */
			lua_settop(L, err_idx - 1);
			lua_thread_pool_restore_callback(&cbs);
		}

		sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;

		if (connected && ud->s) {
			if (ud->item) {
				rspamd_symcache_item_async_dec_check(ud->task, ud->item, M);
			}

			/* The session calls lua_redis_fin for us */
			rspamd_session_remove_event(ud->s, lua_redis_fin, sp_ud);
		}
		else {
			lua_redis_fin(sp_ud);
		}
	}

	va_end(ap);
}

/**
 * Convert a hiredis reply into a Lua value and push it on the stack.
 * Integers become Lua integers, nil replies become the registry value
 * "redis.null", strings/statuses become Lua strings (or text userdata when
 * `text_data` is set) and arrays become tables filled recursively.
 * Nothing is pushed for an unknown reply type.
 * NOTE(review): in text mode the userdata points straight at r->str and its
 * flags are 0; confirm that callers never use it after the reply is freed.
 * @param L lua state
 * @param r reply to convert
 * @param text_data push strings as rspamd text userdata instead of Lua strings
 */
static void
lua_redis_push_reply(lua_State *L, const redisReply *r, gboolean text_data)
{
	switch (r->type) {
	case REDIS_REPLY_INTEGER:
		lua_pushinteger(L, r->integer);
		return;
	case REDIS_REPLY_NIL:
		lua_getfield(L, LUA_REGISTRYINDEX, "redis.null");
		return;
	case REDIS_REPLY_STRING:
	case REDIS_REPLY_STATUS:
		if (!text_data) {
			lua_pushlstring(L, r->str, r->len);
			return;
		}

		{
			struct rspamd_lua_text *txt = lua_newuserdata(L, sizeof(*txt));

			rspamd_lua_setclass(L, rspamd_text_classname, -1);
			txt->flags = 0;
			txt->start = r->str;
			txt->len = r->len;
		}
		return;
	case REDIS_REPLY_ARRAY: {
		unsigned int pos;

		lua_createtable(L, r->elements, 0);

		for (pos = 0; pos < r->elements; ++pos) {
			/* Lua arrays are 1-based, hence pos + 1 */
			lua_redis_push_reply(L, r->element[pos], text_data);
			lua_rawseti(L, -2, pos + 1);
		}
		return;
	}
	default: /* should not happen */
		msg_info("unknown reply type: %d", r->type);
		return;
	}
}

/**
 * Deliver a successful reply to the Lua callback of a request as
 * `callback(nil, data)`.
 * Ordinary requests are served once and then finished; requests flagged
 * LUA_REDIS_SUBSCRIBED are served on every call and never finished here.
 * @param r redis reply data
 * @param ctx redis context (supplies the LUA_REDIS_TEXTDATA flag)
 * @param sp_ud request the reply belongs to
 */
static void
lua_redis_push_data(const redisReply *r, struct lua_redis_ctx *ctx,
					struct lua_redis_request_specific_userdata *sp_ud)
{
	struct lua_redis_userdata *ud = sp_ud->c;
	struct lua_callback_state cbs;
	lua_State *L;

	if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED | LUA_REDIS_SPECIFIC_FINISHED)) ||
		(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
		if (sp_ud->cbref != -1) {
			lua_thread_pool_prepare_callback(ud->cfg->lua_thread_pool, &cbs);
			L = cbs.L;

			/* Traceback handler sits below the callback and its arguments */
			lua_pushcfunction(L, &rspamd_lua_traceback);
			int err_idx = lua_gettop(L);
			/* Push the callback function */
			lua_rawgeti(cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref);
			/* Error is nil */
			lua_pushnil(cbs.L);
			/* Data */
			lua_redis_push_reply(cbs.L, r, ctx->flags & LUA_REDIS_TEXTDATA);

			if (ud->item) {
				rspamd_symcache_set_cur_item(ud->task, ud->item);
			}

			int ret = lua_pcall(cbs.L, 2, 0, err_idx);

			if (ret != 0) {
				msg_info("call to lua_redis callback failed (%d): %s",
						 ret, lua_tostring(cbs.L, -1));
			}

			/* Drop the traceback handler and anything left above it */
			lua_settop(L, err_idx - 1);
			lua_thread_pool_restore_callback(&cbs);
		}

		/* First message on a subscription: the timeout no longer applies */
		if (sp_ud->flags & LUA_REDIS_SUBSCRIBED) {
			if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_REPLIED)) {
				if (ev_can_stop(&sp_ud->timeout_ev)) {
					ev_timer_stop(sp_ud->ctx->async.event_loop,
								  &sp_ud->timeout_ev);
				}
			}
		}

		sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;

		/* Subscriptions stay registered; everything else is finished now */
		if (!(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
			if (ud->s) {
				if (ud->item) {
					rspamd_symcache_item_async_dec_check(ud->task,
														 ud->item, M);
				}

				rspamd_session_remove_event(ud->s, lua_redis_fin, sp_ud);
			}
			else {
				lua_redis_fin(sp_ud);
			}
		}
	}
}

/**
 * hiredis reply callback for the asynchronous API.
 * Routes the reply (or the connection error) to the request's Lua callback
 * and, once no commands are pending, returns the connection to the pool.
 * @param c context of redis connection
 * @param r redis reply, may be NULL
 * @param priv request data (struct lua_redis_request_specific_userdata *)
 */
static void
lua_redis_callback(redisAsyncContext *c, gpointer r, gpointer priv)
{
	redisReply *reply = r;
	struct lua_redis_request_specific_userdata *sp_ud = priv;
	struct lua_redis_ctx *ctx;
	struct lua_redis_userdata *ud;
	redisAsyncContext *ac;

	ctx = sp_ud->ctx;
	ud = sp_ud->c;

	if (ud->terminated || !rspamd_lua_is_initialised()) {
		/* We are already at the termination stage, just go out */
		return;
	}

	msg_debug_lua_redis("got reply from redis %p for query %p", sp_ud->c->ctx,
						sp_ud);

	/* Keep ctx alive while the Lua callback and lua_redis_fin may run */
	REDIS_RETAIN(ctx);

	/* If session is finished, we cannot call lua callbacks */
	if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) ||
		(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
		if (c->err == 0) {
			if (r != NULL) {
				if (reply->type != REDIS_REPLY_ERROR) {
					lua_redis_push_data(reply, ctx, sp_ud);
				}
				else {
					lua_redis_push_error("%s", ctx, sp_ud, TRUE, reply->str);
				}
			}
			else {
				lua_redis_push_error("received no data from server", ctx, sp_ud, TRUE);
			}
		}
		else {
			/* For I/O errors errno describes the failure, otherwise hiredis does */
			if (c->err == REDIS_ERR_IO) {
				lua_redis_push_error("%s", ctx, sp_ud, TRUE, strerror(errno));
			}
			else {
				lua_redis_push_error("%s", ctx, sp_ud, TRUE, c->errstr);
			}
		}
	}

	/* Subscribed requests are not counted down: more messages may follow */
	if (!(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
		ctx->cmds_pending--;

		if (ctx->cmds_pending == 0 && !ud->terminated) {
			/* Disconnect redis early as we don't need it anymore */
			ud->terminated = 1;
			ac = ud->ctx;
			ud->ctx = NULL;

			if (ac) {
				msg_debug_lua_redis("release redis connection ud=%p; ctx=%p; refcount=%d",
									ud, ctx, ctx->ref.refcount);
				rspamd_redis_pool_release_connection(ud->pool, ac,
													 (ctx->flags & LUA_REDIS_NO_POOL) ? RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT);
			}
		}
	}

	REDIS_RELEASE(ctx);
}

/**
 * Move every queued sync reply onto the stack of `L` as pairs of
 * `(is_ok, value)` and transfer the result records to the cleanup queue.
 * The registry reference of each value is released here.
 * NOTE(review): luaL_error does not return, so the `can_use_lua = FALSE`
 * fallback after it looks unreachable - confirm the intended behaviour.
 * @param ctx sync redis context
 * @param L lua state that receives the values
 * @return number of values pushed
 */
static int
lua_redis_push_results(struct lua_redis_ctx *ctx, lua_State *L)
{
	int nreplies = g_queue_get_length(ctx->replies);
	gboolean can_use_lua = TRUE;
	int left;

	if (!lua_checkstack(L, (nreplies * 2) + 1)) {
		luaL_error(L, "cannot resize stack to fit %d commands",
				   ctx->cmds_pending);

		can_use_lua = FALSE;
	}

	for (left = nreplies; left > 0; left--) {
		struct lua_redis_result *res = g_queue_pop_head(ctx->replies);

		if (can_use_lua) {
			/* status first, then the stored value */
			lua_pushboolean(L, !res->is_error);
			lua_rawgeti(L, LUA_REGISTRYINDEX, res->result_ref);
		}

		luaL_unref(L, LUA_REGISTRYINDEX, res->result_ref);

		/* session/symcache bookkeeping is done later by lua_redis_cleanup_events */
		g_queue_push_tail(ctx->events_cleanup, res);
	}

	if (!can_use_lua) {
		return 0;
	}

	return nreplies * 2;
}

/**
 * Finish every request whose result has already been handed to Lua:
 * decrement the symcache async counter, then either remove the session event
 * or call lua_redis_fin directly, and free the result record.
 * @param ctx sync redis context
 */
static void
lua_redis_cleanup_events(struct lua_redis_ctx *ctx)
{
	struct lua_redis_result *res;

	/* Finishing requests releases references; hold one until the loop is done */
	REDIS_RETAIN(ctx);

	for (;;) {
		if (g_queue_is_empty(ctx->events_cleanup)) {
			break;
		}

		res = g_queue_pop_head(ctx->events_cleanup);

		if (res->item) {
			rspamd_symcache_item_async_dec_check(res->task, res->item, M);
		}

		if (!res->s) {
			lua_redis_fin(res->sp_ud);
		}
		else {
			rspamd_session_remove_event(res->s, lua_redis_fin, res->sp_ud);
		}

		g_free(res);
	}

	REDIS_RELEASE(ctx);
}

/**
 * hiredis reply callback for the coroutine based ("sync") API.
 * Converts the reply into a Lua value, stores it in the registry and queues
 * it on ctx->replies; when the last pending command has been answered and a
 * thread is waiting, that thread is resumed with all collected results.
 * @param ac context of redis connection
 * @param r redis reply, may be NULL
 * @param priv request data (struct lua_redis_request_specific_userdata *)
 */
static void
lua_redis_callback_sync(redisAsyncContext *ac, gpointer r, gpointer priv)
{
	redisReply *reply = r;

	struct lua_redis_request_specific_userdata *sp_ud = priv;
	struct lua_redis_ctx *ctx;
	struct lua_redis_userdata *ud;
	struct thread_entry *thread;
	int results;

	ctx = sp_ud->ctx;
	ud = sp_ud->c;
	lua_State *L = ctx->async.cfg->lua_state;

	sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;

	if (ud->terminated) {
		/* We are already at the termination stage, just go out */
		/* TODO:
		   if somebody is waiting for us (ctx->thread), return result,
		   otherwise, indeed, ignore
		 */
		return;
	}

	if (ev_can_stop(&sp_ud->timeout_ev)) {
		ev_timer_stop(ud->event_loop, &sp_ud->timeout_ev);
	}

	if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
		msg_debug_lua_redis("got reply from redis: %p for query %p", ac, sp_ud);

		struct lua_redis_result *result = g_malloc0(sizeof *result);

		/* Every branch below leaves exactly one value on top of L */
		if (ac->err == 0) {
			if (r != NULL) {
				if (reply->type != REDIS_REPLY_ERROR) {
					result->is_error = FALSE;
					lua_redis_push_reply(L, reply, ctx->flags & LUA_REDIS_TEXTDATA);
				}
				else {
					result->is_error = TRUE;
					lua_pushstring(L, reply->str);
				}
			}
			else {
				result->is_error = TRUE;
				lua_pushliteral(L, "received no data from server");
			}
		}
		else {
			result->is_error = TRUE;
			if (ac->err == REDIS_ERR_IO) {
				lua_pushstring(L, strerror(errno));
			}
			else {
				lua_pushstring(L, ac->errstr);
			}
		}

		/* if error happened, we should terminate the connection,
		   and release it */

		if (result->is_error && sp_ud->c->ctx) {
			ac = sp_ud->c->ctx;
			/* Set to NULL to avoid double free in dtor */
			sp_ud->c->ctx = NULL;
			ctx->flags |= LUA_REDIS_TERMINATED;

			/*
			 * This will call all callbacks pending so the entire context
			 * will be destructed
			 */
			rspamd_redis_pool_release_connection(sp_ud->c->pool, ac,
												 RSPAMD_REDIS_RELEASE_FATAL);
		}

		/* Pops the value pushed above and keeps it alive in the registry */
		result->result_ref = luaL_ref(L, LUA_REGISTRYINDEX);
		result->s = ud->s;
		result->item = ud->item;
		result->task = ud->task;
		result->sp_ud = sp_ud;

		g_queue_push_tail(ctx->replies, result);
	}

	ctx->cmds_pending--;

	if (ctx->cmds_pending == 0) {
		if (ctx->thread) {
			if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
				/* somebody yielded and waits for results */
				thread = ctx->thread;
				ctx->thread = NULL;

				results = lua_redis_push_results(ctx, thread->lua_state);

				if (ud->item) {
					rspamd_symcache_set_cur_item(ud->task, ud->item);
				}

				lua_thread_resume(thread, results);
				lua_redis_cleanup_events(ctx);
			}
			else {
				/* We cannot resume the thread as the associated task has gone */
				lua_thread_pool_terminate_entry_full(ud->cfg->lua_thread_pool,
													 ctx->thread, G_STRLOC, true);
				ctx->thread = NULL;
			}
		}
	}
}

/**
 * Timer callback for requests made through the sync API.
 * Marks the connection as failed with ETIMEDOUT and releases it as fatal;
 * no Lua code is called directly from here.
 * @param w expired timer, w->data is the request
 * @param revents libev event mask (unused)
 */
static void
lua_redis_timeout_sync(EV_P_ ev_timer *w, int revents)
{
	struct lua_redis_request_specific_userdata *sp_ud =
		(struct lua_redis_request_specific_userdata *) w->data;
	struct lua_redis_ctx *ctx;
	struct lua_redis_userdata *ud;
	redisAsyncContext *ac;

	if (sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) {
		return;
	}

	ud = sp_ud->c;
	ctx = sp_ud->ctx;
	msg_debug_lua_redis("timeout while querying redis server: %p, redis: %p", sp_ud,
						sp_ud->c->ctx);

	if (sp_ud->c->ctx) {
		ac = sp_ud->c->ctx;

		/* Set to NULL to avoid double free in dtor */
		sp_ud->c->ctx = NULL;
		/* The reply callbacks report strerror(errno) for REDIS_ERR_IO */
		ac->err = REDIS_ERR_IO;
		errno = ETIMEDOUT;
		ctx->flags |= LUA_REDIS_TERMINATED;

		/*
		 * This will call all callbacks pending so the entire context
		 * will be destructed
		 */
		rspamd_redis_pool_release_connection(sp_ud->c->pool, ac,
											 RSPAMD_REDIS_RELEASE_FATAL);
	}
}

/**
 * Timer callback for requests made through the asynchronous API.
 * Reports a timeout error to the request's Lua callback, then marks the
 * connection as failed with ETIMEDOUT and releases it as fatal.
 * @param w expired timer, w->data is the request
 * @param revents libev event mask (unused)
 */
static void
lua_redis_timeout(EV_P_ ev_timer *w, int revents)
{
	struct lua_redis_request_specific_userdata *sp_ud =
		(struct lua_redis_request_specific_userdata *) w->data;
	struct lua_redis_userdata *ud;
	struct lua_redis_ctx *ctx;
	redisAsyncContext *ac;

	if (sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) {
		return;
	}

	ctx = sp_ud->ctx;
	ud = sp_ud->c;

	/* Keep ctx alive: push_error may finish the request and drop a reference */
	REDIS_RETAIN(ctx);
	msg_debug_lua_redis("timeout while querying redis server: %p, redis: %p", sp_ud,
						sp_ud->c->ctx);
	/*
	 * The message is expanded by lua_pushvfstring, which accepts a plain
	 * `%f` but no precision modifiers: `%.2f` is printed literally by
	 * Lua 5.1 and raises an error in Lua 5.3+.
	 */
	lua_redis_push_error("timeout while connecting the server (%f sec)", ctx, sp_ud, TRUE, ud->timeout);

	if (sp_ud->c->ctx) {
		ac = sp_ud->c->ctx;
		/* Set to NULL to avoid double free in dtor */
		sp_ud->c->ctx = NULL;
		ac->err = REDIS_ERR_IO;
		errno = ETIMEDOUT;
		/*
		 * This will call all callbacks pending so the entire context
		 * will be destructed
		 */
		rspamd_redis_pool_release_connection(sp_ud->c->pool, ac,
											 RSPAMD_REDIS_RELEASE_FATAL);
	}

	REDIS_RELEASE(ctx);
}


/* Returns a heap copy of `len` bytes starting at `src`; no terminator is added */
static char *
lua_redis_dup_arg(const char *src, gsize len)
{
	char *copy = g_malloc(len);

	memcpy(copy, src, len);

	return copy;
}

/**
 * Build the argv/argvlen arrays for a redis command.
 * Element 0 is always a copy of `cmd`; when the value at `idx` is a table,
 * its string, text userdata and number values follow (numbers are rendered
 * as integers when they have no fractional part). Other value types are
 * skipped. The arrays must be released with lua_redis_free_args.
 * @param L lua state
 * @param idx stack index of the arguments table, 0 if there is none
 * @param cmd command name, must not be NULL
 * @param pargs [out] argument vector
 * @param parglens [out] length of every argument
 * @param nargs [out] number of arguments including the command
 */
static void
lua_redis_parse_args(lua_State *L, int idx, const char *cmd,
					 char ***pargs, gsize **parglens, unsigned int *nargs)
{
	char **argv = NULL;
	gsize *lens;
	int count;

	if (idx == 0 || lua_type(L, idx) != LUA_TTABLE) {
		/* No arguments table: the command alone is sent */
		argv = g_malloc(sizeof(char *));
		lens = g_malloc(sizeof(gsize));
		lens[0] = strlen(cmd);
		argv[0] = lua_redis_dup_arg(cmd, lens[0]);
		count = 1;
	}
	else {
		/* First pass: count the values that can become arguments */
		lua_pushvalue(L, idx);
		count = 0;

		for (lua_pushnil(L); lua_next(L, -2) != 0; lua_pop(L, 1)) {
			switch (lua_type(L, -1)) {
			case LUA_TNUMBER:
			case LUA_TSTRING:
			case LUA_TUSERDATA:
				count++;
				break;
			default:
				break;
			}
		}

		/* One extra slot for the command itself */
		argv = g_malloc((count + 1) * sizeof(char *));
		lens = g_malloc((count + 1) * sizeof(gsize));
		lens[0] = strlen(cmd);
		argv[0] = lua_redis_dup_arg(cmd, lens[0]);
		count = 1;

		/* Second pass: copy the values out */
		for (lua_pushnil(L); lua_next(L, -2) != 0; lua_pop(L, 1)) {
			switch (lua_type(L, -1)) {
			case LUA_TSTRING: {
				const char *str = lua_tolstring(L, -1, &lens[count]);

				argv[count] = lua_redis_dup_arg(str, lens[count]);
				count++;
				break;
			}
			case LUA_TUSERDATA: {
				struct rspamd_lua_text *txt = lua_check_text(L, -1);

				/* Userdata that is not a usable text is silently skipped */
				if (txt && txt->start) {
					lens[count] = txt->len;
					argv[count] = lua_redis_dup_arg(txt->start, lens[count]);
					count++;
				}
				break;
			}
			case LUA_TNUMBER: {
				char numbuf[64];
				double num = lua_tonumber(L, -1);
				int written;

				if (num == (double) ((int64_t) num)) {
					written = rspamd_snprintf(numbuf, sizeof(numbuf), "%L",
											  (int64_t) num);
				}
				else {
					written = rspamd_snprintf(numbuf, sizeof(numbuf), "%f",
											  num);
				}

				lens[count] = written;
				argv[count] = lua_redis_dup_arg(numbuf, lens[count]);
				count++;
				break;
			}
			default:
				break;
			}
		}

		/* Remove the table copy pushed before the first pass */
		lua_pop(L, 1);
	}

	*pargs = argv;
	*parglens = lens;
	*nargs = count;
}

/**
 * Parse the parameters table at stack index 1 and obtain a redis connection
 * from the pool.
 *
 * Recognised keys: `task` (or `config` + `ev_base` and optionally `session`
 * when there is no task), `callback`, `host` (ip userdata or string, port
 * 6379 is used when none is given), `username`, `password`, `dbname`,
 * `opaque_data` and `no_pool`.
 *
 * @param L lua state
 * @param pcbref if not NULL, receives the registry reference of `callback`
 *               (or -1). It is always -1 when NULL is returned: the reference
 *               is released here because no request will ever own it.
 * @param is_async TRUE for the callback based API, FALSE for the sync API
 *                 (which additionally gets the reply and cleanup queues)
 * @return new context holding one reference, or NULL on bad arguments or
 *         connection failure
 */
static struct lua_redis_ctx *
rspamd_lua_redis_prepare_connection(lua_State *L, int *pcbref, gboolean is_async)
{
	struct lua_redis_ctx *ctx = NULL;
	rspamd_inet_addr_t *ip = NULL;
	struct lua_redis_userdata *ud = NULL;
	struct rspamd_lua_ip *addr = NULL;
	struct rspamd_task *task = NULL;
	const char *host = NULL;
	const char *username = NULL, *password = NULL, *dbname = NULL, *log_tag = NULL;
	int cbref = -1;
	struct rspamd_config *cfg = NULL;
	struct rspamd_async_session *session = NULL;
	struct ev_loop *ev_base = NULL;
	gboolean ret = FALSE;
	unsigned int flags = 0;

	if (lua_istable(L, 1)) {
		/* Table version */
		lua_pushvalue(L, 1);
		lua_pushstring(L, "task");
		lua_gettable(L, -2);
		if (lua_type(L, -1) == LUA_TUSERDATA) {
			task = lua_check_task_maybe(L, -1);
		}
		lua_pop(L, 1);

		if (!task) {
			/* We need to get ev_base, config and session separately */
			lua_pushstring(L, "config");
			lua_gettable(L, -2);
			if (lua_type(L, -1) == LUA_TUSERDATA) {
				cfg = lua_check_config(L, -1);
			}
			lua_pop(L, 1);

			lua_pushstring(L, "session");
			lua_gettable(L, -2);
			if (lua_type(L, -1) == LUA_TUSERDATA) {
				session = lua_check_session(L, -1);
			}
			lua_pop(L, 1);

			lua_pushstring(L, "ev_base");
			lua_gettable(L, -2);
			if (lua_type(L, -1) == LUA_TUSERDATA) {
				ev_base = lua_check_ev_base(L, -1);
			}
			lua_pop(L, 1);

			if (cfg && ev_base) {
				ret = TRUE;
			}
			else if (!cfg) {
				msg_err_task_check("config is not passed");
			}
			else {
				msg_err_task_check("ev_base is not set");
			}
		}
		else {
			/* Everything else can be derived from the task */
			cfg = task->cfg;
			session = task->s;
			ev_base = task->event_loop;
			log_tag = task->task_pool->tag.uid;
			ret = TRUE;
		}

		if (pcbref) {
			lua_pushstring(L, "callback");
			lua_gettable(L, -2);
			if (lua_type(L, -1) == LUA_TFUNCTION) {
				/* This also pops function from the stack */
				cbref = luaL_ref(L, LUA_REGISTRYINDEX);
				*pcbref = cbref;
			}
			else {
				*pcbref = -1;
				lua_pop(L, 1);
			}
		}

		lua_pushstring(L, "host");
		lua_gettable(L, -2);

		if (lua_type(L, -1) == LUA_TUSERDATA) {
			addr = lua_check_ip(L, -1);

			/* A userdata of another class must not be dereferenced */
			if (addr) {
				host = rspamd_inet_address_to_string_pretty(addr->addr);
			}
		}
		else if (lua_type(L, -1) == LUA_TSTRING) {
			host = lua_tostring(L, -1);

			if (rspamd_parse_inet_address(&ip,
										  host, strlen(host), RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) {
				/* Stack wrapper; the parsed address `ip` is freed below */
				addr = g_alloca(sizeof(*addr));
				addr->addr = ip;

				if (rspamd_inet_address_get_port(ip) == 0) {
					rspamd_inet_address_set_port(ip, 6379);
				}
			}
		}
		lua_pop(L, 1);

		lua_pushstring(L, "username");
		lua_gettable(L, -2);
		if (lua_type(L, -1) == LUA_TSTRING) {
			username = lua_tostring(L, -1);
		}
		lua_pop(L, 1);

		lua_pushstring(L, "password");
		lua_gettable(L, -2);
		if (lua_type(L, -1) == LUA_TSTRING) {
			password = lua_tostring(L, -1);
		}
		lua_pop(L, 1);

		lua_pushstring(L, "dbname");
		lua_gettable(L, -2);
		if (lua_type(L, -1) == LUA_TSTRING) {
			dbname = lua_tostring(L, -1);
		}
		lua_pop(L, 1);

		lua_pushstring(L, "opaque_data");
		lua_gettable(L, -2);
		if (!!lua_toboolean(L, -1)) {
			flags |= LUA_REDIS_TEXTDATA;
		}
		lua_pop(L, 1);

		lua_pushstring(L, "no_pool");
		lua_gettable(L, -2);
		if (!!lua_toboolean(L, -1)) {
			flags |= LUA_REDIS_NO_POOL;
		}
		lua_pop(L, 1);

		lua_pop(L, 1); /* table */

		if (session && rspamd_session_blocked(session)) {
			msg_err_task_check("Session is being destroying");
			ret = FALSE;
		}

		if (ret && addr != NULL) {
			ctx = g_malloc0(sizeof(struct lua_redis_ctx));
			REF_INIT_RETAIN(ctx, lua_redis_dtor);
			if (is_async) {
				ctx->flags |= flags | LUA_REDIS_ASYNC;
				ud = &ctx->async;
			}
			else {
				/*
				 * NOTE(review): `flags` (opaque_data / no_pool) is not applied
				 * to sync contexts - confirm whether that is intentional
				 */
				ud = &ctx->async;
				ctx->replies = g_queue_new();
				ctx->events_cleanup = g_queue_new();
			}

			ud->s = session;
			ud->cfg = cfg;
			ud->pool = cfg->redis_pool;
			ud->event_loop = ev_base;
			ud->task = task;

			if (log_tag) {
				rspamd_strlcpy(ud->log_tag, log_tag, sizeof(ud->log_tag));
			}
			else {
				/* Use pointer itself as a tag */
				rspamd_snprintf(ud->log_tag, sizeof(ud->log_tag),
								"%ud",
								(int) rspamd_cryptobox_fast_hash(&ud, sizeof(ud), 0));
			}

			if (task) {
				ud->item = rspamd_symcache_get_cur_item(task);
			}

			ret = TRUE;
		}
		else {
			if (cbref != -1) {
				luaL_unref(L, LUA_REGISTRYINDEX, cbref);
				/* Do not leave a released reference visible to the caller */
				*pcbref = -1;
			}

			msg_err_task_check("incorrect function invocation");
			ret = FALSE;
		}
	}

	if (ret) {
		ud->terminated = 0;
		ud->ctx = rspamd_redis_pool_connect(ud->pool,
											dbname, username, password,
											rspamd_inet_address_to_string(addr->addr),
											rspamd_inet_address_get_port(addr->addr));

		if (ip) {
			rspamd_inet_address_free(ip);
		}

		if (ud->ctx == NULL || ud->ctx->err) {
			if (ud->ctx) {
				msg_err_task_check("cannot connect to redis: %s",
								   ud->ctx->errstr);
				rspamd_redis_pool_release_connection(ud->pool, ud->ctx,
													 RSPAMD_REDIS_RELEASE_FATAL);
				ud->ctx = NULL;
			}
			else {
				msg_err_task_check("cannot connect to redis (OS error): %s",
								   strerror(errno));
			}

			if (cbref != -1) {
				/*
				 * No request has been created yet, so nothing else owns the
				 * callback reference: release it here or it is leaked
				 */
				luaL_unref(L, LUA_REGISTRYINDEX, cbref);
				*pcbref = -1;
			}

			REDIS_RELEASE(ctx);

			return NULL;
		}

		msg_debug_lua_redis("opened redis connection host=%s; ctx=%p; ud=%p",
							host, ctx, ud);

		return ctx;
	}

	if (ip) {
		rspamd_inet_address_free(ip);
	}

	return NULL;
}

/***
 * @function rspamd_redis.make_request({params})
 * Make request to redis server, params is a table of key=value arguments in any order
 * @param {task} task worker task object
 * @param {ip|string} host server address
 * @param {function} callback callback to be called in form `function (err, data)`
 * @param {string} cmd command to be sent to redis
 * @param {table} args numeric array of strings used as redis arguments
 * @param {number} timeout timeout in seconds for request (1.0 by default)
 * @return {boolean,redis} `true` and a connection object if a request has been scheduled, `false` and `nil` otherwise
 */
static int
lua_redis_make_request(lua_State *L)
{
	LUA_TRACE_POINT;
	struct lua_redis_request_specific_userdata *sp_ud;
	struct lua_redis_userdata *ud;
	struct lua_redis_ctx *ctx, **pctx;
	const char *cmd = NULL;
	double timeout = REDIS_DEFAULT_TIMEOUT;
	int cbref = -1;
	gboolean ret = FALSE;

	ctx = rspamd_lua_redis_prepare_connection(L, &cbref, TRUE);

	if (ctx) {
		ud = &ctx->async;
		sp_ud = g_malloc0(sizeof(*sp_ud));
		sp_ud->cbref = cbref;
		sp_ud->c = ud;
		sp_ud->ctx = ctx;
		/*
		 * Link the request right away: from now on the context destructor
		 * frees it (arguments and callback reference) on every failure path
		 */
		LL_PREPEND(ud->specific, sp_ud);

		/* All parameters are read from the table at index 1 */
		lua_pushstring(L, "cmd");
		lua_gettable(L, 1);
		cmd = lua_tostring(L, -1);
		lua_pop(L, 1);

		if (cmd == NULL) {
			/* lua_redis_parse_args calls strlen(cmd), so bail out early */
			msg_err("bad arguments for redis request: cmd is missing");
			rspamd_redis_pool_release_connection(ud->pool, ud->ctx,
												 RSPAMD_REDIS_RELEASE_FATAL);
			ud->ctx = NULL;
			REDIS_RELEASE(ctx);

			lua_pushboolean(L, FALSE);
			lua_pushnil(L);

			return 2;
		}

		lua_pushstring(L, "timeout");
		lua_gettable(L, 1);
		if (lua_type(L, -1) == LUA_TNUMBER) {
			timeout = lua_tonumber(L, -1);
		}
		lua_pop(L, 1);
		ud->timeout = timeout;


		lua_pushstring(L, "args");
		lua_gettable(L, 1);
		lua_redis_parse_args(L, -1, cmd, &sp_ud->args, &sp_ud->arglens,
							 &sp_ud->nargs);
		lua_pop(L, 1);

		ret = redisAsyncCommandArgv(ud->ctx,
									lua_redis_callback,
									sp_ud,
									sp_ud->nargs,
									(const char **) sp_ud->args,
									sp_ud->arglens);

		if (ret == REDIS_OK) {
			if (ud->s) {
				rspamd_session_add_event(ud->s,
										 lua_redis_fin, sp_ud,
										 M);

				if (ud->item) {
					rspamd_symcache_item_async_inc(ud->task, ud->item, M);
				}
			}

			REDIS_RETAIN(ctx); /* Cleared by fin event */
			ctx->cmds_pending++;

			if (ud->ctx->c.flags & REDIS_SUBSCRIBED) {
				msg_debug_lua_redis("subscribe command, never unref/timeout");
				sp_ud->flags |= LUA_REDIS_SUBSCRIBED;
			}

			/* Arm the per-request timeout */
			sp_ud->timeout_ev.data = sp_ud;
			ev_now_update_if_cheap((struct ev_loop *) ud->event_loop);
			ev_timer_init(&sp_ud->timeout_ev, lua_redis_timeout, timeout, 0.0);
			ev_timer_start(ud->event_loop, &sp_ud->timeout_ev);

			ret = TRUE;
		}
		else {
			msg_info("call to redis failed: %s", ud->ctx->errstr);
			rspamd_redis_pool_release_connection(ud->pool, ud->ctx,
												 RSPAMD_REDIS_RELEASE_FATAL);
			ud->ctx = NULL;
			/* Last reference: destroys ctx together with sp_ud */
			REDIS_RELEASE(ctx);
			ret = FALSE;
		}
	}
	else {
		lua_pushboolean(L, FALSE);
		lua_pushnil(L);

		return 2;
	}

	lua_pushboolean(L, ret);

	if (ret) {
		/* The userdata takes over the initial reference of ctx */
		pctx = lua_newuserdata(L, sizeof(ctx));
		*pctx = ctx;
		rspamd_lua_setclass(L, rspamd_redis_classname, -1);
	}
	else {
		lua_pushnil(L);
	}

	return 2;
}

/***
 * @function rspamd_redis.make_request_sync({params})
 * Make blocking request to redis server, params is a table of key=value arguments in any order
 * @param {ip|string} host server address
 * @param {string} cmd command to be sent to redis
 * @param {table} args numeric array of strings used as redis arguments
 * @param {number} timeout timeout in seconds for request (1.0 by default)
 * @param {boolean} opaque_data return string replies as text objects
 * @return {boolean + result} `true` and a result if a request has been successful;
 * `false` and an error string if redis replied with an error; just `false` on
 * bad arguments, connection failure or when no reply was received
 */
static int
lua_redis_make_request_sync(lua_State *L)
{
	LUA_TRACE_POINT;
	struct rspamd_lua_ip *addr = NULL;
	rspamd_inet_addr_t *ip = NULL;
	const char *cmd = NULL, *host;
	struct timeval tv;
	gboolean ret = FALSE;
	double timeout = REDIS_DEFAULT_TIMEOUT;
	char **args = NULL;
	gsize *arglens = NULL;
	unsigned int nargs = 0, flags = 0;
	redisContext *ctx;
	redisReply *r;

	if (lua_istable(L, 1)) {
		lua_pushvalue(L, 1);

		lua_pushstring(L, "cmd");
		lua_gettable(L, -2);
		cmd = lua_tostring(L, -1);
		lua_pop(L, 1);

		lua_pushstring(L, "host");
		lua_gettable(L, -2);
		if (lua_type(L, -1) == LUA_TUSERDATA) {
			addr = lua_check_ip(L, -1);
		}
		else if (lua_type(L, -1) == LUA_TSTRING) {
			host = lua_tostring(L, -1);
			if (rspamd_parse_inet_address(&ip,
										  host, strlen(host), RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) {
				/* Stack wrapper; the parsed address `ip` is freed below */
				addr = g_alloca(sizeof(*addr));
				addr->addr = ip;

				if (rspamd_inet_address_get_port(ip) == 0) {
					rspamd_inet_address_set_port(ip, 6379);
				}
			}
		}
		lua_pop(L, 1);

		lua_pushstring(L, "timeout");
		lua_gettable(L, -2);
		if (lua_type(L, -1) == LUA_TNUMBER) {
			timeout = lua_tonumber(L, -1);
		}
		lua_pop(L, 1);

		lua_pushstring(L, "opaque_data");
		lua_gettable(L, -2);
		if (!!lua_toboolean(L, -1)) {
			flags |= LUA_REDIS_TEXTDATA;
		}
		lua_pop(L, 1);


		/* Arguments are only parsed when there is a command to attach them to */
		if (cmd) {
			lua_pushstring(L, "args");
			lua_gettable(L, -2);
			lua_redis_parse_args(L, -1, cmd, &args, &arglens, &nargs);
			lua_pop(L, 1);
		}

		lua_pop(L, 1);

		if (addr && cmd) {
			ret = TRUE;
		}
	}

	if (ret) {
		/* The timeout is applied to the blocking connect call */
		double_to_tv(timeout, &tv);

		if (rspamd_inet_address_get_af(addr->addr) == AF_UNIX) {
			ctx = redisConnectUnixWithTimeout(
				rspamd_inet_address_to_string(addr->addr), tv);
		}
		else {
			ctx = redisConnectWithTimeout(
				rspamd_inet_address_to_string(addr->addr),
				rspamd_inet_address_get_port(addr->addr), tv);
		}

		if (ip) {
			rspamd_inet_address_free(ip);
		}

		if (ctx == NULL || ctx->err) {
			redisFree(ctx);
			lua_redis_free_args(args, arglens, nargs);
			lua_pushboolean(L, FALSE);

			return 1;
		}

		r = redisCommandArgv(ctx,
							 nargs,
							 (const char **) args,
							 arglens);

		if (r != NULL) {
			if (r->type != REDIS_REPLY_ERROR) {
				lua_pushboolean(L, TRUE);
				lua_redis_push_reply(L, r, flags & LUA_REDIS_TEXTDATA);
			}
			else {
				lua_pushboolean(L, FALSE);
				lua_pushstring(L, r->str);
			}

			/*
			 * NOTE(review): with opaque_data the text userdata pushed above
			 * points at r->str, which is freed here - confirm the text cannot
			 * be used by the caller after this point
			 */
			freeReplyObject(r);
			redisFree(ctx);
			lua_redis_free_args(args, arglens, nargs);

			return 2;
		}
		else {
			msg_info("call to redis failed: %s", ctx->errstr);
			redisFree(ctx);
			lua_redis_free_args(args, arglens, nargs);
			lua_pushboolean(L, FALSE);
		}
	}
	else {
		if (ip) {
			rspamd_inet_address_free(ip);
		}
		msg_err("bad arguments for redis request");
		lua_redis_free_args(args, arglens, nargs);

		lua_pushboolean(L, FALSE);
	}

	return 1;
}

/***
 * @function rspamd_redis.connect({params})
 * Open an asynchronous connection to redis server, params is a table of key=value arguments in any order
 * @param {task} task worker task object
 * @param {ip|string} host server address
 * @param {number} timeout timeout in seconds for request (1.0 by default)
 * @return {boolean,redis} `true` and a new connection object, or `false` and `nil` if connection failed
 */
static int
lua_redis_connect(lua_State *L)
{
	LUA_TRACE_POINT;
	struct lua_redis_ctx *ctx, **pctx;
	double timeout = REDIS_DEFAULT_TIMEOUT;

	ctx = rspamd_lua_redis_prepare_connection(L, NULL, TRUE);

	if (ctx == NULL) {
		lua_pushboolean(L, FALSE);
		lua_pushnil(L);

		return 2;
	}

	/* Optional per-request timeout, the default is kept for non-numbers */
	lua_getfield(L, 1, "timeout");

	if (lua_type(L, -1) == LUA_TNUMBER) {
		timeout = lua_tonumber(L, -1);
	}

	lua_pop(L, 1);
	ctx->async.timeout = timeout;

	lua_pushboolean(L, TRUE);
	/* Wrap the context pointer into a userdata of the redis class */
	pctx = lua_newuserdata(L, sizeof(ctx));
	*pctx = ctx;
	rspamd_lua_setclass(L, rspamd_redis_classname, -1);

	return 2;
}

/***
 * @function rspamd_redis.connect_sync({params})
 * Create a redis connection object for blocking usage, params is a table of key=value arguments in any order
 * @param {ip|string} host server address
 * @param {number} timeout timeout in seconds for request (1.0 by default)
 * @return {boolean,redis|string} `true` and a redis object on success, `false` and an error message otherwise
 */
static int
lua_redis_connect_sync(lua_State *L)
{
	LUA_TRACE_POINT;
	struct lua_redis_ctx *conn, **slot;
	double tmo = REDIS_DEFAULT_TIMEOUT;

	conn = rspamd_lua_redis_prepare_connection(L, NULL, FALSE);

	if (conn == NULL) {
		lua_pushboolean(L, FALSE);
		lua_pushstring(L, "bad arguments for redis request");

		return 2;
	}

	/* Optional `timeout` override, only looked up when argument 1 is a table */
	if (lua_istable(L, 1)) {
		lua_getfield(L, 1, "timeout");
		if (lua_type(L, -1) == LUA_TNUMBER) {
			tmo = lua_tonumber(L, -1);
		}
		lua_pop(L, 1);
	}

	conn->async.timeout = tmo;

	/* Success: `true` followed by a userdata that wraps the context pointer */
	lua_pushboolean(L, TRUE);
	slot = lua_newuserdata(L, sizeof(conn));
	*slot = conn;
	rspamd_lua_setclass(L, rspamd_redis_classname, -1);

	return 2;
}

/***
 * @method rspamd_redis:add_cmd([callback,] cmd, {args})
 * Append new cmd to redis pipeline
 * @param {function} callback optional callback; when given it must be the first argument
 * @param {string} cmd command to be sent to redis
 * @param {table} args array of strings used as redis arguments
 * @return {boolean} `true` if a request has been queued; otherwise `false` followed by an error message
 */
static int
lua_redis_add_cmd(lua_State *L)
{
	LUA_TRACE_POINT;
	struct lua_redis_ctx *ctx = lua_check_redis(L, 1);
	struct lua_redis_request_specific_userdata *sp_ud;
	struct lua_redis_userdata *ud;
	const char *cmd = NULL;
	int args_pos = 2;
	int cbref = -1, ret;
	gboolean has_cb = FALSE;

	if (ctx == NULL) {
		/* Nothing can be queued without a connection: do not report success */
		lua_pushboolean(L, FALSE);
		lua_pushstring(L, "invalid redis connection");

		return 2;
	}

	if (ctx->flags & LUA_REDIS_TERMINATED) {
		lua_pushboolean(L, FALSE);
		lua_pushstring(L, "Connection is terminated");

		return 2;
	}

	if (lua_type(L, 2) == LUA_TSTRING) {
		/* No callback version: (cmd, args) */
		cmd = lua_tostring(L, 2);
		args_pos = 3;
	}
	else if (lua_type(L, 2) == LUA_TFUNCTION) {
		/* Callback version: (callback, cmd, args) */
		cmd = lua_tostring(L, 3);
		args_pos = 4;
		has_cb = TRUE;
	}

	if (cmd == NULL) {
		/*
		 * Either argument 2 has an unsupported type or the command that
		 * follows the callback is not a string; no registry reference has
		 * been taken yet, so raising here leaks nothing.
		 */
		return luaL_error(L, "invalid arguments");
	}

	ud = &ctx->async;

	if (ud->s && rspamd_session_blocked(ud->s)) {
		/* Bail out before allocating or linking a request that would never be sent */
		lua_pushboolean(L, 0);
		lua_pushstring(L, "session is terminating");

		return 2;
	}

	if (has_cb && IS_ASYNC(ctx)) {
		/*
		 * The callback reference is stored only for async connections, so
		 * only take it in that case; otherwise the registry slot would
		 * never be released.
		 */
		lua_pushvalue(L, 2);
		cbref = luaL_ref(L, LUA_REGISTRYINDEX);
	}

	sp_ud = g_malloc0(sizeof(*sp_ud));
	sp_ud->c = ud;
	sp_ud->ctx = ctx;

	if (IS_ASYNC(ctx)) {
		/* -1 when no callback was supplied */
		sp_ud->cbref = cbref;
	}
	/* For non-async connections cbref keeps its zero-initialised value, as before */

	lua_redis_parse_args(L, args_pos, cmd, &sp_ud->args,
						 &sp_ud->arglens, &sp_ud->nargs);

	LL_PREPEND(sp_ud->c->specific, sp_ud);

	if (IS_ASYNC(ctx)) {
		ret = redisAsyncCommandArgv(sp_ud->c->ctx,
									lua_redis_callback,
									sp_ud,
									sp_ud->nargs,
									(const char **) sp_ud->args,
									sp_ud->arglens);
	}
	else {
		ret = redisAsyncCommandArgv(sp_ud->c->ctx,
									lua_redis_callback_sync,
									sp_ud,
									sp_ud->nargs,
									(const char **) sp_ud->args,
									sp_ud->arglens);
	}

	if (ret != REDIS_OK) {
		/* The request stays linked in the `specific` list, exactly as before */
		msg_info("call to redis failed: %s",
				 sp_ud->c->ctx->errstr);
		lua_pushboolean(L, 0);
		lua_pushstring(L, sp_ud->c->ctx->errstr);

		return 2;
	}

	if (ud->s) {
		rspamd_session_add_event(ud->s,
								 lua_redis_fin,
								 sp_ud,
								 M);

		if (ud->item) {
			rspamd_symcache_item_async_inc(ud->task, ud->item, M);
		}
	}

	/* Arm a one-shot timer for this request using the connection timeout */
	sp_ud->timeout_ev.data = sp_ud;

	if (IS_ASYNC(ctx)) {
		ev_timer_init(&sp_ud->timeout_ev, lua_redis_timeout,
					  sp_ud->c->timeout, 0.0);
	}
	else {
		ev_timer_init(&sp_ud->timeout_ev, lua_redis_timeout_sync,
					  sp_ud->c->timeout, 0.0);
	}

	ev_timer_start(ud->event_loop, &sp_ud->timeout_ev);
	REDIS_RETAIN(ctx);
	ctx->cmds_pending++;

	lua_pushboolean(L, TRUE);

	return 1;
}

/***
 * @method rspamd_redis:exec()
 * Executes pending commands (suitable for blocking IO only for now).
 * Raises a Lua error when called on an async connection or when there is
 * nothing to execute.
 * @return {boolean}, {table}, ...: pairs in format [bool, result] for each request pending
 */
static int
lua_redis_exec(lua_State *L)
{
	LUA_TRACE_POINT;
	struct lua_redis_ctx *ctx = lua_check_redis(L, 1);

	if (ctx == NULL) {
		/* Raise a proper message instead of whatever is on top of the stack */
		return luaL_error(L, "invalid arguments");
	}

	if (IS_ASYNC(ctx)) {
		lua_pushstring(L, "Async redis pipelining is not implemented");

		return lua_error(L);
	}

	if (ctx->cmds_pending == 0) {
		if (g_queue_get_length(ctx->replies) == 0) {
			lua_pushstring(L, "No pending commands to execute");

			return lua_error(L);
		}

		/* Every command has completed: hand the queued replies to Lua */
		return lua_redis_push_results(ctx, L);
	}

	/* Commands are still in flight: remember the running thread and yield */
	ctx->thread = lua_thread_pool_get_running_entry(ctx->async.cfg->lua_thread_pool);

	return lua_thread_yield(ctx->thread, 0);
}

/* Loader registered for `rspamd_redis`: returns a fresh table filled from redislib_f */
static int
lua_load_redis(lua_State *L)
{
	const int nresults = 1;

	lua_createtable(L, 0, 0);
	luaL_register(L, NULL, redislib_f);

	return nresults;
}

/* Metamethod shared by the redis null placeholder: always yields a single nil */
static int
lua_redis_null_idx(lua_State *L)
{
	const int nresults = 1;

	lua_pushnil(L);

	return nresults;
}

/*
 * Creates the "redis{null}" metatable in the registry and points both
 * `__index` and `__tostring` at lua_redis_null_idx. The stack is left
 * unchanged on return.
 */
static void
lua_redis_null_mt(lua_State *L)
{
	static const char *const fields[] = {"__index", "__tostring"};
	unsigned int i;

	luaL_newmetatable(L, "redis{null}");

	for (i = 0; i < sizeof(fields) / sizeof(fields[0]); i++) {
		lua_pushcfunction(L, lua_redis_null_idx);
		lua_setfield(L, -2, fields[i]);
	}

	/* Drop the metatable; it remains reachable by name through the registry */
	lua_pop(L, 1);
}

/**
 * Open redis library: registers the redis class, the `rspamd_redis`
 * preload entry and the shared null placeholder
 * @param L lua stack
 * @return
 */
void luaopen_redis(lua_State *L)
{
	void *null_ud;

	/* Connection class with its methods; pop one value from the stack afterwards */
	rspamd_lua_new_class(L, rspamd_redis_classname, redislib_m);
	lua_pop(L, 1);

	/* Module functions are exposed through the preload mechanism */
	rspamd_lua_add_preload(L, "rspamd_redis", lua_load_redis);

	/* Null element: empty userdata with the "redis{null}" metatable, kept in the registry */
	lua_redis_null_mt(L);
	null_ud = lua_newuserdata(L, 0);
	luaL_getmetatable(L, "redis{null}");
	lua_setmetatable(L, -2);
	lua_setfield(L, LUA_REGISTRYINDEX, "redis.null");
	redis_null = null_ud;
}