|
|
@@ -26,12 +26,18 @@ |
|
|
|
|
|
|
|
struct rspamd_redis_pool_elt; |
|
|
|
|
|
|
|
enum rspamd_redis_pool_connection_state { |
|
|
|
RSPAMD_REDIS_POOL_CONN_INACTIVE = 0, |
|
|
|
RSPAMD_REDIS_POOL_CONN_ACTIVE, |
|
|
|
RSPAMD_REDIS_POOL_CONN_FINALISING |
|
|
|
}; |
|
|
|
|
|
|
|
struct rspamd_redis_pool_connection { |
|
|
|
struct redisAsyncContext *ctx; |
|
|
|
struct rspamd_redis_pool_elt *elt; |
|
|
|
GList *entry; |
|
|
|
ev_timer timeout; |
|
|
|
gboolean active; |
|
|
|
enum rspamd_redis_pool_connection_state state; |
|
|
|
gchar tag[MEMPOOL_UID_LEN]; |
|
|
|
ref_entry_t ref; |
|
|
|
}; |
|
|
@@ -99,7 +105,7 @@ rspamd_redis_pool_get_key (const gchar *db, const gchar *password, |
|
|
|
static void |
|
|
|
rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *conn) |
|
|
|
{ |
|
|
|
if (conn->active) { |
|
|
|
if (conn->state == RSPAMD_REDIS_POOL_CONN_ACTIVE) { |
|
|
|
msg_debug_rpool ("active connection removed"); |
|
|
|
|
|
|
|
if (conn->ctx) { |
|
|
@@ -126,7 +132,7 @@ rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *conn) |
|
|
|
redisAsyncContext *ac = conn->ctx; |
|
|
|
|
|
|
|
/* To prevent on_disconnect here */ |
|
|
|
conn->active = TRUE; |
|
|
|
conn->state = RSPAMD_REDIS_POOL_CONN_FINALISING; |
|
|
|
g_hash_table_remove (conn->elt->pool->elts_by_ctx, ac); |
|
|
|
conn->ctx = NULL; |
|
|
|
ac->onDisconnect = NULL; |
|
|
@@ -170,16 +176,52 @@ rspamd_redis_pool_elt_dtor (gpointer p) |
|
|
|
g_free (elt); |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
|
rspamd_redis_on_quit (redisAsyncContext *c, gpointer r, gpointer priv) |
|
|
|
{ |
|
|
|
struct rspamd_redis_pool_connection *conn = |
|
|
|
(struct rspamd_redis_pool_connection *)priv; |
|
|
|
|
|
|
|
msg_debug_rpool ("quit command reply for the connection %p, refcount: %d", |
|
|
|
conn->ctx, conn->ref.refcount); |
|
|
|
/* |
|
|
|
* We now schedule timer to enforce removal after callback is executed |
|
|
|
* to prevent races. But actually, the connection will likely be freed by |
|
|
|
* hiredis itself. It is quite brain damaged logic but it is better to |
|
|
|
* deal with it... Dtor will definitely stop this timer. |
|
|
|
*/ |
|
|
|
conn->timeout.repeat = 0.1; |
|
|
|
ev_timer_again (conn->elt->pool->event_loop, &conn->timeout); |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
|
rspamd_redis_conn_timeout (EV_P_ ev_timer *w, int revents) |
|
|
|
{ |
|
|
|
struct rspamd_redis_pool_connection *conn = |
|
|
|
(struct rspamd_redis_pool_connection *)w->data; |
|
|
|
|
|
|
|
g_assert (!conn->active); |
|
|
|
msg_debug_rpool ("scheduled removal of connection %p, refcount: %d", |
|
|
|
conn->ctx, conn->ref.refcount); |
|
|
|
REF_RELEASE (conn); |
|
|
|
g_assert (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE); |
|
|
|
|
|
|
|
if (conn->state == RSPAMD_REDIS_POOL_CONN_INACTIVE) { |
|
|
|
msg_debug_rpool ("scheduled soft removal of connection %p, refcount: %d", |
|
|
|
conn->ctx, conn->ref.refcount); |
|
|
|
redisAsyncCommand (conn->ctx, rspamd_redis_on_quit, conn, "QUIT"); |
|
|
|
conn->state = RSPAMD_REDIS_POOL_CONN_FINALISING; |
|
|
|
ev_timer_again (EV_A_ w); |
|
|
|
|
|
|
|
/* Prevent reusing */ |
|
|
|
if (conn->entry) { |
|
|
|
g_queue_unlink (conn->elt->inactive, conn->entry); |
|
|
|
conn->entry = NULL; |
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
|
/* Finalising by timeout */ |
|
|
|
msg_debug_rpool ("final removal of connection %p, refcount: %d", |
|
|
|
conn->ctx, conn->ref.refcount); |
|
|
|
REF_RELEASE (conn); |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
@@ -205,7 +247,7 @@ rspamd_redis_pool_schedule_timeout (struct rspamd_redis_pool_connection *conn) |
|
|
|
conn->timeout.data = conn; |
|
|
|
ev_timer_init (&conn->timeout, |
|
|
|
rspamd_redis_conn_timeout, |
|
|
|
real_timeout, 0.0); |
|
|
|
real_timeout, real_timeout / 2.0); |
|
|
|
ev_timer_start (conn->elt->pool->event_loop, &conn->timeout); |
|
|
|
} |
|
|
|
|
|
|
@@ -219,8 +261,7 @@ rspamd_redis_pool_on_disconnect (const struct redisAsyncContext *ac, int status, |
|
|
|
* Here, we know that redis itself will free this connection |
|
|
|
* so, we need to do something very clever about it |
|
|
|
*/ |
|
|
|
|
|
|
|
if (!conn->active) { |
|
|
|
if (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE) { |
|
|
|
/* Do nothing for active connections as it is already handled somewhere */ |
|
|
|
if (conn->ctx) { |
|
|
|
msg_debug_rpool ("inactive connection terminated: %s, refs: %d", |
|
|
@@ -261,7 +302,7 @@ rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool, |
|
|
|
conn = g_malloc0 (sizeof (*conn)); |
|
|
|
conn->entry = g_list_prepend (NULL, conn); |
|
|
|
conn->elt = elt; |
|
|
|
conn->active = TRUE; |
|
|
|
conn->state = RSPAMD_REDIS_POOL_CONN_ACTIVE; |
|
|
|
|
|
|
|
g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn); |
|
|
|
g_queue_push_head_link (elt->active, conn->entry); |
|
|
@@ -275,10 +316,12 @@ rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool, |
|
|
|
conn); |
|
|
|
|
|
|
|
if (password) { |
|
|
|
redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password); |
|
|
|
redisAsyncCommand (ctx, NULL, NULL, |
|
|
|
"AUTH %s", password); |
|
|
|
} |
|
|
|
if (db) { |
|
|
|
redisAsyncCommand (ctx, NULL, NULL, "SELECT %s", db); |
|
|
|
redisAsyncCommand (ctx, NULL, NULL, |
|
|
|
"SELECT %s", db); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@@ -307,8 +350,8 @@ rspamd_redis_pool_init (void) |
|
|
|
struct rspamd_redis_pool *pool; |
|
|
|
|
|
|
|
pool = g_malloc0 (sizeof (*pool)); |
|
|
|
pool->elts_by_key = g_hash_table_new_full (g_int64_hash, g_int64_equal, NULL, |
|
|
|
rspamd_redis_pool_elt_dtor); |
|
|
|
pool->elts_by_key = g_hash_table_new_full (g_int64_hash, g_int64_equal, |
|
|
|
NULL, rspamd_redis_pool_elt_dtor); |
|
|
|
pool->elts_by_ctx = g_hash_table_new (g_direct_hash, g_direct_equal); |
|
|
|
|
|
|
|
return pool; |
|
|
@@ -349,11 +392,11 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool, |
|
|
|
if (g_queue_get_length (elt->inactive) > 0) { |
|
|
|
conn_entry = g_queue_pop_head_link (elt->inactive); |
|
|
|
conn = conn_entry->data; |
|
|
|
g_assert (!conn->active); |
|
|
|
g_assert (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE); |
|
|
|
|
|
|
|
if (conn->ctx->err == REDIS_OK) { |
|
|
|
ev_timer_stop (elt->pool->event_loop, &conn->timeout); |
|
|
|
conn->active = TRUE; |
|
|
|
conn->state = RSPAMD_REDIS_POOL_CONN_ACTIVE; |
|
|
|
g_queue_push_tail_link (elt->active, conn_entry); |
|
|
|
msg_debug_rpool ("reused existing connection to %s:%d: %p", |
|
|
|
ip, port, conn->ctx); |
|
|
@@ -404,7 +447,7 @@ rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool, |
|
|
|
|
|
|
|
conn = g_hash_table_lookup (pool->elts_by_ctx, ctx); |
|
|
|
if (conn != NULL) { |
|
|
|
g_assert (conn->active); |
|
|
|
g_assert (conn->state == RSPAMD_REDIS_POOL_CONN_ACTIVE); |
|
|
|
|
|
|
|
if (ctx->err != REDIS_OK) { |
|
|
|
/* We need to terminate connection forcefully */ |
|
|
@@ -418,7 +461,7 @@ rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool, |
|
|
|
/* Just move it to the inactive queue */ |
|
|
|
g_queue_unlink (conn->elt->active, conn->entry); |
|
|
|
g_queue_push_head_link (conn->elt->inactive, conn->entry); |
|
|
|
conn->active = FALSE; |
|
|
|
conn->state = RSPAMD_REDIS_POOL_CONN_INACTIVE; |
|
|
|
rspamd_redis_pool_schedule_timeout (conn); |
|
|
|
msg_debug_rpool ("mark connection %p inactive", conn->ctx); |
|
|
|
} |