]> source.dussan.org Git - rspamd.git/commitdiff
[Fix] Backport redis pool fixes from master
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 12 Jan 2017 15:05:35 +0000 (15:05 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 12 Jan 2017 15:05:35 +0000 (15:05 +0000)
src/libserver/redis_pool.c

index 578115571edbd6f9a5fab59a0a7b91d814d3bfdd..4e1a788ad453f480b497d8e7005619a1095ff391 100644 (file)
@@ -102,11 +102,19 @@ rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *conn)
                msg_debug_rpool ("active connection removed");
 
                if (conn->ctx) {
-                       g_hash_table_remove (conn->elt->pool->elts_by_ctx, conn->ctx);
-                       redisAsyncFree (conn->ctx);
+                       if (!(conn->ctx->c.flags & REDIS_FREEING)) {
+                               redisAsyncContext *ac = conn->ctx;
+
+                               conn->ctx = NULL;
+                               g_hash_table_remove (conn->elt->pool->elts_by_ctx, conn->ctx);
+                               ac->onDisconnect = NULL;
+                               redisAsyncFree (ac);
+                       }
                }
 
-               g_queue_unlink (conn->elt->active, conn->entry);
+               if (conn->entry) {
+                       g_queue_unlink (conn->elt->active, conn->entry);
+               }
        }
        else {
                msg_debug_rpool ("inactive connection removed");
@@ -126,11 +134,16 @@ rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *conn)
                        redisAsyncFree (ac);
                }
 
-               g_queue_unlink (conn->elt->inactive, conn->entry);
+               if (conn->entry) {
+                       g_queue_unlink (conn->elt->inactive, conn->entry);
+               }
        }
 
 
-       g_list_free (conn->entry);
+       if (conn->entry) {
+               g_list_free (conn->entry);
+       }
+
        g_slice_free1 (sizeof (*conn), conn);
 }
 
@@ -143,11 +156,13 @@ rspamd_redis_pool_elt_dtor (gpointer p)
 
        for (cur = elt->active->head; cur != NULL; cur = g_list_next (cur)) {
                c = cur->data;
+               c->entry = NULL;
                REF_RELEASE (c);
        }
 
        for (cur = elt->inactive->head; cur != NULL; cur = g_list_next (cur)) {
                c = cur->data;
+               c->entry = NULL;
                REF_RELEASE (c);
        }
 
@@ -225,7 +240,12 @@ rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool,
        struct rspamd_redis_pool_connection *conn;
        struct redisAsyncContext *ctx;
 
-       ctx = redisAsyncConnect (ip, port);
+       if (*ip == '/' || *ip == '.') {
+               ctx = redisAsyncConnectUnix (ip);
+       }
+       else {
+               ctx = redisAsyncConnect (ip, port);
+       }
 
        if (ctx) {
 
@@ -327,16 +347,18 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool,
                        conn_entry = g_queue_pop_head_link (elt->inactive);
                        conn = conn_entry->data;
 
-                       if (event_get_base (&conn->timeout)) {
-                               event_del (&conn->timeout);
-                       }
-
                        if (conn->ctx->err == REDIS_OK) {
+                               if (event_get_base (&conn->timeout)) {
+                                       event_del (&conn->timeout);
+                               }
+
                                conn->active = TRUE;
                                g_queue_push_tail_link (elt->active, conn_entry);
                                msg_debug_rpool ("reused existing connection to %s:%d", ip, port);
                        }
                        else {
+                               g_list_free (conn->entry);
+                               conn->entry = NULL;
                                REF_RELEASE (conn);
                                conn = rspamd_redis_pool_new_connection (pool, elt,
                                                db, password, ip, port);
@@ -388,12 +410,19 @@ rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool,
                        REF_RELEASE (conn);
                }
                else {
-                       /* Just move it to the inactive queue */
-                       g_queue_unlink (conn->elt->active, conn->entry);
-                       g_queue_push_head_link (conn->elt->inactive, conn->entry);
-                       conn->active = FALSE;
-                       rspamd_redis_pool_schedule_timeout (conn);
-                       msg_debug_rpool ("mark connection inactive");
+                       /* Ensure that there are no callbacks attached to this conn */
+                       if (ctx->replies.head == NULL) {
+                               /* Just move it to the inactive queue */
+                               g_queue_unlink (conn->elt->active, conn->entry);
+                               g_queue_push_head_link (conn->elt->inactive, conn->entry);
+                               conn->active = FALSE;
+                               rspamd_redis_pool_schedule_timeout (conn);
+                               msg_debug_rpool ("mark connection inactive");
+                       }
+                       else {
+                               msg_debug_rpool ("closed connection due to callbacks leftover");
+                               REF_RELEASE (conn);
+                       }
                }
 
                REF_RELEASE (conn);