diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2016-08-30 18:09:47 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2016-08-30 18:09:47 +0100 |
commit | 75c5450fe3c7156b9df7bc88d6d0377eef909c21 (patch) | |
tree | 31c0c446ce3276ef4f6d31eecfd116f6d759cf5c | |
parent | 6030bd28ff3f08cd798ab2c30a726accbaa0596a (diff) | |
download | rspamd-75c5450fe3c7156b9df7bc88d6d0377eef909c21.tar.gz rspamd-75c5450fe3c7156b9df7bc88d6d0377eef909c21.zip |
[Minor] Various fixes in redis pool
-rw-r--r-- | src/libserver/redis_pool.c | 59 | ||||
-rw-r--r-- | src/lua/lua_redis.c | 60 |
2 files changed, 57 insertions, 62 deletions
diff --git a/src/libserver/redis_pool.c b/src/libserver/redis_pool.c index cfc2757be..8ef0f9ad5 100644 --- a/src/libserver/redis_pool.c +++ b/src/libserver/redis_pool.c @@ -20,6 +20,7 @@ #include "cfg_file.h" #include "hiredis/hiredis.h" #include "hiredis/async.h" +#include "hiredis/adapters/libevent.h" #include "cryptobox.h" #include "ref.h" @@ -78,14 +79,19 @@ rspamd_redis_pool_get_key (const gchar *db, const gchar *password, static void rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *c) { - if (c->active && c->ctx != NULL) { - g_hash_table_remove (c->elt->pool->elts_by_ctx, c->ctx); - redisAsyncFree (c->ctx); + if (c->active) { + if (c->ctx) { + g_hash_table_remove (c->elt->pool->elts_by_ctx, c->ctx); + redisAsyncFree (c->ctx); + } + g_queue_unlink (c->elt->active, c->entry); } + else { + if (event_get_base (&c->timeout)) { + event_del (&c->timeout); + } - if (!c->active && event_get_base (&c->timeout)) { - event_del (&c->timeout); g_queue_unlink (c->elt->inactive, c->entry); } @@ -162,20 +168,30 @@ rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool, ctx = redisAsyncConnect (ip, port); if (ctx) { - conn = g_slice_alloc0 (sizeof (conn)); - conn->entry = g_list_prepend (NULL, conn); - conn->elt = elt; - conn->active = TRUE; - g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn); - g_queue_push_head_link (elt->active, conn->entry); - conn->ctx = ctx; - REF_INIT_RETAIN (conn, rspamd_redis_pool_conn_dtor); - - if (password) { - redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password); + + if (ctx->err != REDIS_OK) { + redisAsyncFree (ctx); + + return NULL; } - if (db) { - redisAsyncCommand (ctx, NULL, NULL, "SELECT %s", db); + else { + conn = g_slice_alloc0 (sizeof (*conn)); + conn->entry = g_list_prepend (NULL, conn); + conn->elt = elt; + conn->active = TRUE; + g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn); + g_queue_push_head_link (elt->active, conn->entry); + conn->ctx = ctx; + REF_INIT_RETAIN (conn, rspamd_redis_pool_conn_dtor); + + redisLibeventAttach (ctx, pool->ev_base); + + if (password) { + redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password); + } + if (db) { + redisAsyncCommand (ctx, NULL, NULL, "SELECT %s", db); + } } return conn; @@ -252,7 +268,6 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool, conn->active = TRUE; g_queue_push_tail_link (elt->active, conn_entry); - REF_RETAIN (conn); } else { @@ -260,8 +275,6 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool, conn = rspamd_redis_pool_new_connection (pool, elt, db, password, ip, port); } - - return conn->ctx; } else { elt = rspamd_redis_pool_new_elt (pool); @@ -272,6 +285,8 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool, db, password, ip, port); } + REF_RETAIN (conn); + return conn->ctx; } @@ -289,7 +304,7 @@ rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool, if (conn != NULL) { REF_RELEASE (conn); - if (is_fatal) { + if (is_fatal || ctx->err == REDIS_ERR_IO || ctx->err == REDIS_ERR_EOF) { /* We need to terminate connection forcefully */ REF_RELEASE (conn); } diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c index c35d9614b..b7fb8f6c4 100644 --- a/src/lua/lua_redis.c +++ b/src/lua/lua_redis.c @@ -17,10 +17,8 @@ #include "dns.h" #include "utlist.h" -#ifdef WITH_HIREDIS -#include "hiredis.h" -#include "adapters/libevent.h" -#endif +#include "hiredis/hiredis.h" +#include "hiredis/async.h" #define REDIS_DEFAULT_TIMEOUT 1.0 @@ -155,6 +153,7 @@ lua_redis_dtor (struct lua_redis_ctx *ctx) struct lua_redis_userdata *ud; struct lua_redis_specific_userdata *cur, *tmp; gboolean is_connected = FALSE; + struct redisAsyncContext *ac; if (ctx->async) { msg_debug ("desctructing %p", ctx); @@ -168,7 +167,10 @@ lua_redis_dtor (struct lua_redis_ctx *ctx) * still be alive here! */ ctx->ref.refcount = 100500; - redisAsyncFree (ud->ctx); + ac = ud->ctx; + ud->ctx = NULL; + rspamd_redis_pool_release_connection (ud->task->cfg->redis_pool, + ac, FALSE); ctx->ref.refcount = 0; is_connected = TRUE; } @@ -384,8 +386,9 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv) ac = ud->ctx; ud->ctx = NULL; - if (ac != NULL) { - redisAsyncFree (ac); + if (ac) { + rspamd_redis_pool_release_connection (ud->task->cfg->redis_pool, + ac, FALSE); } } @@ -413,7 +416,8 @@ lua_redis_timeout (int fd, short what, gpointer u) * This will call all callbacks pending so the entire context * will be destructed */ - redisAsyncFree (ac); + rspamd_redis_pool_release_connection (sp_ud->c->task->cfg->redis_pool, + ac, TRUE); } REDIS_RELEASE (ctx); } @@ -464,22 +468,6 @@ lua_redis_parse_args (lua_State *L, gint idx, const gchar *cmd, *nargs = top; } -static void -lua_redis_connect_cb (const struct redisAsyncContext *c, int status) -{ - /* - * Workaround to prevent double close: - * https://groups.google.com/forum/#!topic/redis-db/mQm46XkIPOY - */ -#if defined(HIREDIS_MAJOR) && HIREDIS_MAJOR == 0 && HIREDIS_MINOR <= 11 - struct redisAsyncContext *nc = (struct redisAsyncContext *)c; - if (status == REDIS_ERR) { - nc->c.fd = -1; - } -#endif -} - - /*** * @function rspamd_redis.make_request({params}) @@ -662,14 +650,15 @@ lua_redis_make_request (lua_State *L) if (ret) { ud->terminated = 0; ud->timeout = timeout; - ud->ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr->addr), + ud->ctx = rspamd_redis_pool_connect (task->cfg->redis_pool, + dbname, password, + rspamd_inet_address_to_string (addr->addr), rspamd_inet_address_get_port (addr->addr)); if (ud->ctx == NULL || ud->ctx->err) { if (ud->ctx) { msg_err_task_check ("cannot connect to redis: %s", ud->ctx->errstr); - redisAsyncFree (ud->ctx); ud->ctx = NULL; } else { @@ -683,16 +672,6 @@ lua_redis_make_request (lua_State *L) return 2; } - redisAsyncSetConnectCallback (ud->ctx, lua_redis_connect_cb); - redisLibeventAttach (ud->ctx, ud->task->ev_base); - - if (password) { - redisAsyncCommand (ud->ctx, NULL, NULL, "AUTH %s", password); - } - if (dbname) { - redisAsyncCommand (ud->ctx, NULL, NULL, "SELECT %s", dbname); - } - ret = redisAsyncCommandArgv (ud->ctx, lua_redis_callback, sp_ud, @@ -719,7 +698,8 @@ lua_redis_make_request (lua_State *L) } else { msg_info_task_check ("call to redis failed: %s", ud->ctx->errstr); - redisAsyncFree (ud->ctx); + rspamd_redis_pool_release_connection (task->cfg->redis_pool, + ud->ctx, FALSE); ud->ctx = NULL; REDIS_RELEASE (ctx); ret = FALSE; @@ -936,7 +916,9 @@ lua_redis_connect (lua_State *L) if (ret && ctx) { ud->terminated = 0; ud->timeout = timeout; - ud->ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr->addr), + ud->ctx = rspamd_redis_pool_connect (task->cfg->redis_pool, + NULL, NULL, + rspamd_inet_address_to_string (addr->addr), rspamd_inet_address_get_port (addr->addr)); if (ud->ctx == NULL || ud->ctx->err) { @@ -948,8 +930,6 @@ lua_redis_connect (lua_State *L) return 1; } - redisAsyncSetConnectCallback (ud->ctx, lua_redis_connect_cb); - redisLibeventAttach (ud->ctx, ud->task->ev_base); pctx = lua_newuserdata (L, sizeof (ctx)); *pctx = ctx; rspamd_lua_setclass (L, "rspamd{redis}", -1); |