aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2019-12-25 13:04:49 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2019-12-25 13:04:49 +0000
commitead250d5804be03dbd6d7ade87450ddf3ef3a259 (patch)
treede12a7daf490407f3372d5b09d4d890af54a5c28
parent147af4de56ab222847d883ea53f738cd14b65577 (diff)
downloadrspamd-ead250d5804be03dbd6d7ade87450ddf3ef3a259.tar.gz
rspamd-ead250d5804be03dbd6d7ade87450ddf3ef3a259.zip
[Feature] Send quit command to Redis
-rw-r--r--src/libserver/redis_pool.c81
1 files changed, 62 insertions, 19 deletions
diff --git a/src/libserver/redis_pool.c b/src/libserver/redis_pool.c
index 0d310d968..732d3b5bf 100644
--- a/src/libserver/redis_pool.c
+++ b/src/libserver/redis_pool.c
@@ -26,12 +26,18 @@
struct rspamd_redis_pool_elt;
+enum rspamd_redis_pool_connection_state {
+ RSPAMD_REDIS_POOL_CONN_INACTIVE = 0,
+ RSPAMD_REDIS_POOL_CONN_ACTIVE,
+ RSPAMD_REDIS_POOL_CONN_FINALISING
+};
+
struct rspamd_redis_pool_connection {
struct redisAsyncContext *ctx;
struct rspamd_redis_pool_elt *elt;
GList *entry;
ev_timer timeout;
- gboolean active;
+ enum rspamd_redis_pool_connection_state state;
gchar tag[MEMPOOL_UID_LEN];
ref_entry_t ref;
};
@@ -99,7 +105,7 @@ rspamd_redis_pool_get_key (const gchar *db, const gchar *password,
static void
rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *conn)
{
- if (conn->active) {
+ if (conn->state == RSPAMD_REDIS_POOL_CONN_ACTIVE) {
msg_debug_rpool ("active connection removed");
if (conn->ctx) {
@@ -126,7 +132,7 @@ rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *conn)
redisAsyncContext *ac = conn->ctx;
/* To prevent on_disconnect here */
- conn->active = TRUE;
+ conn->state = RSPAMD_REDIS_POOL_CONN_FINALISING;
g_hash_table_remove (conn->elt->pool->elts_by_ctx, ac);
conn->ctx = NULL;
ac->onDisconnect = NULL;
@@ -171,15 +177,51 @@ rspamd_redis_pool_elt_dtor (gpointer p)
}
static void
+rspamd_redis_on_quit (redisAsyncContext *c, gpointer r, gpointer priv)
+{
+ struct rspamd_redis_pool_connection *conn =
+ (struct rspamd_redis_pool_connection *)priv;
+
+ msg_debug_rpool ("quit command reply for the connection %p, refcount: %d",
+ conn->ctx, conn->ref.refcount);
+ /*
+ * We now schedule timer to enforce removal after callback is executed
+ * to prevent races. But actually, the connection will likely be freed by
+ * hiredis itself. It is quite brain damaged logic but it is better to
+ * deal with it... Dtor will definitely stop this timer.
+ */
+ conn->timeout.repeat = 0.1;
+ ev_timer_again (conn->elt->pool->event_loop, &conn->timeout);
+}
+
+static void
rspamd_redis_conn_timeout (EV_P_ ev_timer *w, int revents)
{
struct rspamd_redis_pool_connection *conn =
(struct rspamd_redis_pool_connection *)w->data;
- g_assert (!conn->active);
- msg_debug_rpool ("scheduled removal of connection %p, refcount: %d",
- conn->ctx, conn->ref.refcount);
- REF_RELEASE (conn);
+ g_assert (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE);
+
+ if (conn->state == RSPAMD_REDIS_POOL_CONN_INACTIVE) {
+ msg_debug_rpool ("scheduled soft removal of connection %p, refcount: %d",
+ conn->ctx, conn->ref.refcount);
+ redisAsyncCommand (conn->ctx, rspamd_redis_on_quit, conn, "QUIT");
+ conn->state = RSPAMD_REDIS_POOL_CONN_FINALISING;
+ ev_timer_again (EV_A_ w);
+
+ /* Prevent reusing */
+ if (conn->entry) {
+ g_queue_unlink (conn->elt->inactive, conn->entry);
+ conn->entry = NULL;
+ }
+ }
+ else {
+ /* Finalising by timeout */
+ msg_debug_rpool ("final removal of connection %p, refcount: %d",
+ conn->ctx, conn->ref.refcount);
+ REF_RELEASE (conn);
+ }
+
}
static void
@@ -205,7 +247,7 @@ rspamd_redis_pool_schedule_timeout (struct rspamd_redis_pool_connection *conn)
conn->timeout.data = conn;
ev_timer_init (&conn->timeout,
rspamd_redis_conn_timeout,
- real_timeout, 0.0);
+ real_timeout, real_timeout / 2.0);
ev_timer_start (conn->elt->pool->event_loop, &conn->timeout);
}
@@ -219,8 +261,7 @@ rspamd_redis_pool_on_disconnect (const struct redisAsyncContext *ac, int status,
* Here, we know that redis itself will free this connection
* so, we need to do something very clever about it
*/
-
- if (!conn->active) {
+ if (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE) {
/* Do nothing for active connections as it is already handled somewhere */
if (conn->ctx) {
msg_debug_rpool ("inactive connection terminated: %s, refs: %d",
@@ -261,7 +302,7 @@ rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool,
conn = g_malloc0 (sizeof (*conn));
conn->entry = g_list_prepend (NULL, conn);
conn->elt = elt;
- conn->active = TRUE;
+ conn->state = RSPAMD_REDIS_POOL_CONN_ACTIVE;
g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn);
g_queue_push_head_link (elt->active, conn->entry);
@@ -275,10 +316,12 @@ rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool,
conn);
if (password) {
- redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password);
+ redisAsyncCommand (ctx, NULL, NULL,
+ "AUTH %s", password);
}
if (db) {
- redisAsyncCommand (ctx, NULL, NULL, "SELECT %s", db);
+ redisAsyncCommand (ctx, NULL, NULL,
+ "SELECT %s", db);
}
}
@@ -307,8 +350,8 @@ rspamd_redis_pool_init (void)
struct rspamd_redis_pool *pool;
pool = g_malloc0 (sizeof (*pool));
- pool->elts_by_key = g_hash_table_new_full (g_int64_hash, g_int64_equal, NULL,
- rspamd_redis_pool_elt_dtor);
+ pool->elts_by_key = g_hash_table_new_full (g_int64_hash, g_int64_equal,
+ NULL, rspamd_redis_pool_elt_dtor);
pool->elts_by_ctx = g_hash_table_new (g_direct_hash, g_direct_equal);
return pool;
@@ -349,11 +392,11 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool,
if (g_queue_get_length (elt->inactive) > 0) {
conn_entry = g_queue_pop_head_link (elt->inactive);
conn = conn_entry->data;
- g_assert (!conn->active);
+ g_assert (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE);
if (conn->ctx->err == REDIS_OK) {
ev_timer_stop (elt->pool->event_loop, &conn->timeout);
- conn->active = TRUE;
+ conn->state = RSPAMD_REDIS_POOL_CONN_ACTIVE;
g_queue_push_tail_link (elt->active, conn_entry);
msg_debug_rpool ("reused existing connection to %s:%d: %p",
ip, port, conn->ctx);
@@ -404,7 +447,7 @@ rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool,
conn = g_hash_table_lookup (pool->elts_by_ctx, ctx);
if (conn != NULL) {
- g_assert (conn->active);
+ g_assert (conn->state == RSPAMD_REDIS_POOL_CONN_ACTIVE);
if (ctx->err != REDIS_OK) {
/* We need to terminate connection forcefully */
@@ -418,7 +461,7 @@ rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool,
/* Just move it to the inactive queue */
g_queue_unlink (conn->elt->active, conn->entry);
g_queue_push_head_link (conn->elt->inactive, conn->entry);
- conn->active = FALSE;
+ conn->state = RSPAMD_REDIS_POOL_CONN_INACTIVE;
rspamd_redis_pool_schedule_timeout (conn);
msg_debug_rpool ("mark connection %p inactive", conn->ctx);
}