summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2016-08-30 18:09:47 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2016-08-30 18:09:47 +0100
commit75c5450fe3c7156b9df7bc88d6d0377eef909c21 (patch)
tree31c0c446ce3276ef4f6d31eecfd116f6d759cf5c
parent6030bd28ff3f08cd798ab2c30a726accbaa0596a (diff)
downloadrspamd-75c5450fe3c7156b9df7bc88d6d0377eef909c21.tar.gz
rspamd-75c5450fe3c7156b9df7bc88d6d0377eef909c21.zip
[Minor] Various fixes in redis pool
-rw-r--r--src/libserver/redis_pool.c59
-rw-r--r--src/lua/lua_redis.c60
2 files changed, 57 insertions, 62 deletions
diff --git a/src/libserver/redis_pool.c b/src/libserver/redis_pool.c
index cfc2757be..8ef0f9ad5 100644
--- a/src/libserver/redis_pool.c
+++ b/src/libserver/redis_pool.c
@@ -20,6 +20,7 @@
#include "cfg_file.h"
#include "hiredis/hiredis.h"
#include "hiredis/async.h"
+#include "hiredis/adapters/libevent.h"
#include "cryptobox.h"
#include "ref.h"
@@ -78,14 +79,19 @@ rspamd_redis_pool_get_key (const gchar *db, const gchar *password,
static void
rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *c)
{
- if (c->active && c->ctx != NULL) {
- g_hash_table_remove (c->elt->pool->elts_by_ctx, c->ctx);
- redisAsyncFree (c->ctx);
+ if (c->active) {
+ if (c->ctx) {
+ g_hash_table_remove (c->elt->pool->elts_by_ctx, c->ctx);
+ redisAsyncFree (c->ctx);
+ }
+
g_queue_unlink (c->elt->active, c->entry);
}
+ else {
+ if (event_get_base (&c->timeout)) {
+ event_del (&c->timeout);
+ }
- if (!c->active && event_get_base (&c->timeout)) {
- event_del (&c->timeout);
g_queue_unlink (c->elt->inactive, c->entry);
}
@@ -162,20 +168,30 @@ rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool,
ctx = redisAsyncConnect (ip, port);
if (ctx) {
- conn = g_slice_alloc0 (sizeof (conn));
- conn->entry = g_list_prepend (NULL, conn);
- conn->elt = elt;
- conn->active = TRUE;
- g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn);
- g_queue_push_head_link (elt->active, conn->entry);
- conn->ctx = ctx;
- REF_INIT_RETAIN (conn, rspamd_redis_pool_conn_dtor);
-
- if (password) {
- redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password);
+
+ if (ctx->err != REDIS_OK) {
+ redisAsyncFree (ctx);
+
+ return NULL;
}
- if (db) {
- redisAsyncCommand (ctx, NULL, NULL, "SELECT %s", db);
+ else {
+ conn = g_slice_alloc0 (sizeof (*conn));
+ conn->entry = g_list_prepend (NULL, conn);
+ conn->elt = elt;
+ conn->active = TRUE;
+ g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn);
+ g_queue_push_head_link (elt->active, conn->entry);
+ conn->ctx = ctx;
+ REF_INIT_RETAIN (conn, rspamd_redis_pool_conn_dtor);
+
+ redisLibeventAttach (ctx, pool->ev_base);
+
+ if (password) {
+ redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password);
+ }
+ if (db) {
+ redisAsyncCommand (ctx, NULL, NULL, "SELECT %s", db);
+ }
}
return conn;
@@ -252,7 +268,6 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool,
conn->active = TRUE;
g_queue_push_tail_link (elt->active, conn_entry);
- REF_RETAIN (conn);
}
else {
@@ -260,8 +275,6 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool,
conn = rspamd_redis_pool_new_connection (pool, elt,
db, password, ip, port);
}
-
- return conn->ctx;
}
else {
elt = rspamd_redis_pool_new_elt (pool);
@@ -272,6 +285,8 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool,
db, password, ip, port);
}
+ REF_RETAIN (conn);
+
return conn->ctx;
}
@@ -289,7 +304,7 @@ rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool,
if (conn != NULL) {
REF_RELEASE (conn);
- if (is_fatal) {
+ if (is_fatal || ctx->err == REDIS_ERR_IO || ctx->err == REDIS_ERR_EOF) {
/* We need to terminate connection forcefully */
REF_RELEASE (conn);
}
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c
index c35d9614b..b7fb8f6c4 100644
--- a/src/lua/lua_redis.c
+++ b/src/lua/lua_redis.c
@@ -17,10 +17,8 @@
#include "dns.h"
#include "utlist.h"
-#ifdef WITH_HIREDIS
-#include "hiredis.h"
-#include "adapters/libevent.h"
-#endif
+#include "hiredis/hiredis.h"
+#include "hiredis/async.h"
#define REDIS_DEFAULT_TIMEOUT 1.0
@@ -155,6 +153,7 @@ lua_redis_dtor (struct lua_redis_ctx *ctx)
struct lua_redis_userdata *ud;
struct lua_redis_specific_userdata *cur, *tmp;
gboolean is_connected = FALSE;
+ struct redisAsyncContext *ac;
if (ctx->async) {
msg_debug ("desctructing %p", ctx);
@@ -168,7 +167,10 @@ lua_redis_dtor (struct lua_redis_ctx *ctx)
* still be alive here!
*/
ctx->ref.refcount = 100500;
- redisAsyncFree (ud->ctx);
+ ac = ud->ctx;
+ ud->ctx = NULL;
+ rspamd_redis_pool_release_connection (ud->task->cfg->redis_pool,
+ ac, FALSE);
ctx->ref.refcount = 0;
is_connected = TRUE;
}
@@ -384,8 +386,9 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
ac = ud->ctx;
ud->ctx = NULL;
- if (ac != NULL) {
- redisAsyncFree (ac);
+ if (ac) {
+ rspamd_redis_pool_release_connection (ud->task->cfg->redis_pool,
+ ac, FALSE);
}
}
@@ -413,7 +416,8 @@ lua_redis_timeout (int fd, short what, gpointer u)
* This will call all callbacks pending so the entire context
* will be destructed
*/
- redisAsyncFree (ac);
+ rspamd_redis_pool_release_connection (sp_ud->c->task->cfg->redis_pool,
+ ac, TRUE);
}
REDIS_RELEASE (ctx);
}
@@ -464,22 +468,6 @@ lua_redis_parse_args (lua_State *L, gint idx, const gchar *cmd,
*nargs = top;
}
-static void
-lua_redis_connect_cb (const struct redisAsyncContext *c, int status)
-{
- /*
- * Workaround to prevent double close:
- * https://groups.google.com/forum/#!topic/redis-db/mQm46XkIPOY
- */
-#if defined(HIREDIS_MAJOR) && HIREDIS_MAJOR == 0 && HIREDIS_MINOR <= 11
- struct redisAsyncContext *nc = (struct redisAsyncContext *)c;
- if (status == REDIS_ERR) {
- nc->c.fd = -1;
- }
-#endif
-}
-
-
/***
* @function rspamd_redis.make_request({params})
@@ -662,14 +650,15 @@ lua_redis_make_request (lua_State *L)
if (ret) {
ud->terminated = 0;
ud->timeout = timeout;
- ud->ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr->addr),
+ ud->ctx = rspamd_redis_pool_connect (task->cfg->redis_pool,
+ dbname, password,
+ rspamd_inet_address_to_string (addr->addr),
rspamd_inet_address_get_port (addr->addr));
if (ud->ctx == NULL || ud->ctx->err) {
if (ud->ctx) {
msg_err_task_check ("cannot connect to redis: %s",
ud->ctx->errstr);
- redisAsyncFree (ud->ctx);
ud->ctx = NULL;
}
else {
@@ -683,16 +672,6 @@ lua_redis_make_request (lua_State *L)
return 2;
}
- redisAsyncSetConnectCallback (ud->ctx, lua_redis_connect_cb);
- redisLibeventAttach (ud->ctx, ud->task->ev_base);
-
- if (password) {
- redisAsyncCommand (ud->ctx, NULL, NULL, "AUTH %s", password);
- }
- if (dbname) {
- redisAsyncCommand (ud->ctx, NULL, NULL, "SELECT %s", dbname);
- }
-
ret = redisAsyncCommandArgv (ud->ctx,
lua_redis_callback,
sp_ud,
@@ -719,7 +698,8 @@ lua_redis_make_request (lua_State *L)
}
else {
msg_info_task_check ("call to redis failed: %s", ud->ctx->errstr);
- redisAsyncFree (ud->ctx);
+ rspamd_redis_pool_release_connection (task->cfg->redis_pool,
+ ud->ctx, FALSE);
ud->ctx = NULL;
REDIS_RELEASE (ctx);
ret = FALSE;
@@ -936,7 +916,9 @@ lua_redis_connect (lua_State *L)
if (ret && ctx) {
ud->terminated = 0;
ud->timeout = timeout;
- ud->ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr->addr),
+ ud->ctx = rspamd_redis_pool_connect (task->cfg->redis_pool,
+ NULL, NULL,
+ rspamd_inet_address_to_string (addr->addr),
rspamd_inet_address_get_port (addr->addr));
if (ud->ctx == NULL || ud->ctx->err) {
@@ -948,8 +930,6 @@ lua_redis_connect (lua_State *L)
return 1;
}
- redisAsyncSetConnectCallback (ud->ctx, lua_redis_connect_cb);
- redisLibeventAttach (ud->ctx, ud->task->ev_base);
pctx = lua_newuserdata (L, sizeof (ctx));
*pctx = ctx;
rspamd_lua_setclass (L, "rspamd{redis}", -1);