]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Send quit command to Redis
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 25 Dec 2019 13:04:49 +0000 (13:04 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 25 Dec 2019 13:04:49 +0000 (13:04 +0000)
src/libserver/redis_pool.c

index 0d310d968329ffd2bd8183f8449a9fb1a60465f3..732d3b5bf375d00fdb99e586ddfc2ea525e9cf3c 100644 (file)
 
 struct rspamd_redis_pool_elt;
 
+enum rspamd_redis_pool_connection_state {
+       RSPAMD_REDIS_POOL_CONN_INACTIVE = 0,
+       RSPAMD_REDIS_POOL_CONN_ACTIVE,
+       RSPAMD_REDIS_POOL_CONN_FINALISING
+};
+
 struct rspamd_redis_pool_connection {
        struct redisAsyncContext *ctx;
        struct rspamd_redis_pool_elt *elt;
        GList *entry;
        ev_timer timeout;
-       gboolean active;
+       enum rspamd_redis_pool_connection_state state;
        gchar tag[MEMPOOL_UID_LEN];
        ref_entry_t ref;
 };
@@ -99,7 +105,7 @@ rspamd_redis_pool_get_key (const gchar *db, const gchar *password,
 static void
 rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *conn)
 {
-       if (conn->active) {
+       if (conn->state == RSPAMD_REDIS_POOL_CONN_ACTIVE) {
                msg_debug_rpool ("active connection removed");
 
                if (conn->ctx) {
@@ -126,7 +132,7 @@ rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *conn)
                        redisAsyncContext *ac = conn->ctx;
 
                        /* To prevent on_disconnect here */
-                       conn->active = TRUE;
+                       conn->state = RSPAMD_REDIS_POOL_CONN_FINALISING;
                        g_hash_table_remove (conn->elt->pool->elts_by_ctx, ac);
                        conn->ctx = NULL;
                        ac->onDisconnect = NULL;
@@ -170,16 +176,52 @@ rspamd_redis_pool_elt_dtor (gpointer p)
        g_free (elt);
 }
 
+static void
+rspamd_redis_on_quit (redisAsyncContext *c, gpointer r, gpointer priv)
+{
+       struct rspamd_redis_pool_connection *conn =
+                       (struct rspamd_redis_pool_connection *)priv;
+
+       msg_debug_rpool ("quit command reply for the connection %p, refcount: %d",
+                       conn->ctx, conn->ref.refcount);
+       /*
+        * We now schedule timer to enforce removal after callback is executed
+        * to prevent races. But actually, the connection will likely be freed by
+        * hiredis itself. It is quite brain damaged logic but it is better to
+        * deal with it... Dtor will definitely stop this timer.
+        */
+       conn->timeout.repeat = 0.1;
+       ev_timer_again (conn->elt->pool->event_loop, &conn->timeout);
+}
+
 static void
 rspamd_redis_conn_timeout (EV_P_ ev_timer *w, int revents)
 {
        struct rspamd_redis_pool_connection *conn =
                        (struct rspamd_redis_pool_connection *)w->data;
 
-       g_assert (!conn->active);
-       msg_debug_rpool ("scheduled removal of connection %p, refcount: %d",
-                       conn->ctx, conn->ref.refcount);
-       REF_RELEASE (conn);
+       g_assert (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE);
+
+       if (conn->state == RSPAMD_REDIS_POOL_CONN_INACTIVE) {
+               msg_debug_rpool ("scheduled soft removal of connection %p, refcount: %d",
+                               conn->ctx, conn->ref.refcount);
+               redisAsyncCommand (conn->ctx, rspamd_redis_on_quit, conn, "QUIT");
+               conn->state = RSPAMD_REDIS_POOL_CONN_FINALISING;
+               ev_timer_again (EV_A_ w);
+
+               /* Prevent reusing */
+               if (conn->entry) {
+                       g_queue_unlink (conn->elt->inactive, conn->entry);
+                       conn->entry = NULL;
+               }
+       }
+       else {
+               /* Finalising by timeout */
+               msg_debug_rpool ("final removal of connection %p, refcount: %d",
+                               conn->ctx, conn->ref.refcount);
+               REF_RELEASE (conn);
+       }
+
 }
 
 static void
@@ -205,7 +247,7 @@ rspamd_redis_pool_schedule_timeout (struct rspamd_redis_pool_connection *conn)
        conn->timeout.data = conn;
        ev_timer_init (&conn->timeout,
                        rspamd_redis_conn_timeout,
-                       real_timeout, 0.0);
+                       real_timeout, real_timeout / 2.0);
        ev_timer_start (conn->elt->pool->event_loop, &conn->timeout);
 }
 
@@ -219,8 +261,7 @@ rspamd_redis_pool_on_disconnect (const struct redisAsyncContext *ac, int status,
         * Here, we know that redis itself will free this connection
         * so, we need to do something very clever about it
         */
-
-       if (!conn->active) {
+       if (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE) {
                /* Do nothing for active connections as it is already handled somewhere */
                if (conn->ctx) {
                        msg_debug_rpool ("inactive connection terminated: %s, refs: %d",
@@ -261,7 +302,7 @@ rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool,
                        conn = g_malloc0 (sizeof (*conn));
                        conn->entry = g_list_prepend (NULL, conn);
                        conn->elt = elt;
-                       conn->active = TRUE;
+                       conn->state = RSPAMD_REDIS_POOL_CONN_ACTIVE;
 
                        g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn);
                        g_queue_push_head_link (elt->active, conn->entry);
@@ -275,10 +316,12 @@ rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool,
                                        conn);
 
                        if (password) {
-                               redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password);
+                               redisAsyncCommand (ctx, NULL, NULL,
+                                               "AUTH %s", password);
                        }
                        if (db) {
-                               redisAsyncCommand (ctx, NULL, NULL, "SELECT %s", db);
+                               redisAsyncCommand (ctx, NULL, NULL,
+                                               "SELECT %s", db);
                        }
                }
 
@@ -307,8 +350,8 @@ rspamd_redis_pool_init (void)
        struct rspamd_redis_pool *pool;
 
        pool = g_malloc0 (sizeof (*pool));
-       pool->elts_by_key = g_hash_table_new_full (g_int64_hash, g_int64_equal, NULL,
-                       rspamd_redis_pool_elt_dtor);
+       pool->elts_by_key = g_hash_table_new_full (g_int64_hash, g_int64_equal,
+                       NULL, rspamd_redis_pool_elt_dtor);
        pool->elts_by_ctx = g_hash_table_new (g_direct_hash, g_direct_equal);
 
        return pool;
@@ -349,11 +392,11 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool,
                if (g_queue_get_length (elt->inactive) > 0) {
                        conn_entry = g_queue_pop_head_link (elt->inactive);
                        conn = conn_entry->data;
-                       g_assert (!conn->active);
+                       g_assert (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE);
 
                        if (conn->ctx->err == REDIS_OK) {
                                ev_timer_stop (elt->pool->event_loop, &conn->timeout);
-                               conn->active = TRUE;
+                               conn->state = RSPAMD_REDIS_POOL_CONN_ACTIVE;
                                g_queue_push_tail_link (elt->active, conn_entry);
                                msg_debug_rpool ("reused existing connection to %s:%d: %p",
                                                ip, port, conn->ctx);
@@ -404,7 +447,7 @@ rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool,
 
        conn = g_hash_table_lookup (pool->elts_by_ctx, ctx);
        if (conn != NULL) {
-               g_assert (conn->active);
+               g_assert (conn->state == RSPAMD_REDIS_POOL_CONN_ACTIVE);
 
                if (ctx->err != REDIS_OK) {
                        /* We need to terminate connection forcefully */
@@ -418,7 +461,7 @@ rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool,
                                        /* Just move it to the inactive queue */
                                        g_queue_unlink (conn->elt->active, conn->entry);
                                        g_queue_push_head_link (conn->elt->inactive, conn->entry);
-                                       conn->active = FALSE;
+                                       conn->state = RSPAMD_REDIS_POOL_CONN_INACTIVE;
                                        rspamd_redis_pool_schedule_timeout (conn);
                                        msg_debug_rpool ("mark connection %p inactive", conn->ctx);
                                }