|
|
@@ -33,6 +33,7 @@ struct rspamd_redis_pool_connection { |
|
|
|
GList *entry; |
|
|
|
struct event timeout; |
|
|
|
gboolean active; |
|
|
|
gchar tag[MEMPOOL_UID_LEN]; |
|
|
|
ref_entry_t ref; |
|
|
|
}; |
|
|
|
|
|
|
@@ -55,6 +56,23 @@ struct rspamd_redis_pool { |
|
|
|
static const gdouble default_timeout = 60.0; |
|
|
|
static const guint default_max_conns = 100; |
|
|
|
|
|
|
|
#define msg_err_rpool(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ |
|
|
|
"redis_pool", conn->tag, \ |
|
|
|
G_STRFUNC, \ |
|
|
|
__VA_ARGS__) |
|
|
|
#define msg_warn_rpool(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \ |
|
|
|
"redis_pool", conn->tag, \ |
|
|
|
G_STRFUNC, \ |
|
|
|
__VA_ARGS__) |
|
|
|
#define msg_info_rpool(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \ |
|
|
|
"redis_pool", conn->tag, \ |
|
|
|
G_STRFUNC, \ |
|
|
|
__VA_ARGS__) |
|
|
|
#define msg_debug_rpool(...) rspamd_default_log_function (G_LOG_LEVEL_DEBUG, \ |
|
|
|
"redis_pool", conn->tag, \ |
|
|
|
G_STRFUNC, \ |
|
|
|
__VA_ARGS__) |
|
|
|
|
|
|
|
static inline guint64 |
|
|
|
rspamd_redis_pool_get_key (const gchar *db, const gchar *password, |
|
|
|
const char *ip, int port) |
|
|
@@ -78,27 +96,31 @@ rspamd_redis_pool_get_key (const gchar *db, const gchar *password, |
|
|
|
|
|
|
|
|
|
|
|
static void |
|
|
|
rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *c) |
|
|
|
rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *conn) |
|
|
|
{ |
|
|
|
if (c->active) { |
|
|
|
if (c->ctx) { |
|
|
|
g_hash_table_remove (c->elt->pool->elts_by_ctx, c->ctx); |
|
|
|
redisAsyncFree (c->ctx); |
|
|
|
if (conn->active) { |
|
|
|
msg_debug_rpool ("active connection removed"); |
|
|
|
|
|
|
|
if (conn->ctx) { |
|
|
|
g_hash_table_remove (conn->elt->pool->elts_by_ctx, conn->ctx); |
|
|
|
redisAsyncFree (conn->ctx); |
|
|
|
} |
|
|
|
|
|
|
|
g_queue_unlink (c->elt->active, c->entry); |
|
|
|
g_queue_unlink (conn->elt->active, conn->entry); |
|
|
|
} |
|
|
|
else { |
|
|
|
if (event_get_base (&c->timeout)) { |
|
|
|
event_del (&c->timeout); |
|
|
|
msg_debug_rpool ("inactive connection removed"); |
|
|
|
|
|
|
|
if (event_get_base (&conn->timeout)) { |
|
|
|
event_del (&conn->timeout); |
|
|
|
} |
|
|
|
|
|
|
|
g_queue_unlink (c->elt->inactive, c->entry); |
|
|
|
g_queue_unlink (conn->elt->inactive, conn->entry); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
g_list_free (c->entry); |
|
|
|
g_slice_free1 (sizeof (*c), c); |
|
|
|
g_list_free (conn->entry); |
|
|
|
g_slice_free1 (sizeof (*conn), conn); |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
@@ -128,6 +150,7 @@ rspamd_redis_conn_timeout (gint fd, short what, gpointer p) |
|
|
|
{ |
|
|
|
struct rspamd_redis_pool_connection *conn = p; |
|
|
|
|
|
|
|
msg_debug_rpool ("scheduled removal of connection"); |
|
|
|
REF_RELEASE (conn); |
|
|
|
} |
|
|
|
|
|
|
@@ -149,6 +172,8 @@ rspamd_redis_pool_schedule_timeout (struct rspamd_redis_pool_connection *conn) |
|
|
|
real_timeout = rspamd_time_jitter (real_timeout, real_timeout / 2.0); |
|
|
|
} |
|
|
|
|
|
|
|
msg_debug_rpool ("scheduled connection cleanup in %.1f seconds", |
|
|
|
real_timeout); |
|
|
|
double_to_tv (real_timeout, &tv); |
|
|
|
event_set (&conn->timeout, -1, EV_TIMEOUT, rspamd_redis_conn_timeout, conn); |
|
|
|
event_base_set (conn->elt->pool->ev_base, &conn->timeout); |
|
|
@@ -180,10 +205,13 @@ rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool, |
|
|
|
conn->entry = g_list_prepend (NULL, conn); |
|
|
|
conn->elt = elt; |
|
|
|
conn->active = TRUE; |
|
|
|
|
|
|
|
g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn); |
|
|
|
g_queue_push_head_link (elt->active, conn->entry); |
|
|
|
conn->ctx = ctx; |
|
|
|
rspamd_random_hex (conn->tag, sizeof (conn->tag)); |
|
|
|
REF_INIT_RETAIN (conn, rspamd_redis_pool_conn_dtor); |
|
|
|
msg_debug_rpool ("created new connection to %s:%d", ip, port); |
|
|
|
|
|
|
|
redisLibeventAttach (ctx, pool->ev_base); |
|
|
|
|
|
|
@@ -269,6 +297,7 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool, |
|
|
|
|
|
|
|
conn->active = TRUE; |
|
|
|
g_queue_push_tail_link (elt->active, conn_entry); |
|
|
|
msg_debug_rpool ("reused existing connection to %s:%d", ip, port); |
|
|
|
|
|
|
|
} |
|
|
|
else { |
|
|
@@ -307,6 +336,7 @@ rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool, |
|
|
|
|
|
|
|
if (is_fatal || ctx->err == REDIS_ERR_IO || ctx->err == REDIS_ERR_EOF) { |
|
|
|
/* We need to terminate connection forcefully */ |
|
|
|
msg_debug_rpool ("closed connection forcefully"); |
|
|
|
REF_RELEASE (conn); |
|
|
|
} |
|
|
|
else { |
|
|
@@ -315,6 +345,7 @@ rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool, |
|
|
|
g_queue_push_head_link (conn->elt->inactive, conn->entry); |
|
|
|
conn->active = FALSE; |
|
|
|
rspamd_redis_pool_schedule_timeout (conn); |
|
|
|
msg_debug_rpool ("mark connection inactive"); |
|
|
|
} |
|
|
|
} |
|
|
|
else { |