diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2019-12-25 13:04:49 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2019-12-25 13:04:49 +0000 |
commit | ead250d5804be03dbd6d7ade87450ddf3ef3a259 (patch) | |
tree | de12a7daf490407f3372d5b09d4d890af54a5c28 | |
parent | 147af4de56ab222847d883ea53f738cd14b65577 (diff) | |
download | rspamd-ead250d5804be03dbd6d7ade87450ddf3ef3a259.tar.gz rspamd-ead250d5804be03dbd6d7ade87450ddf3ef3a259.zip |
[Feature] Send quit command to Redis
-rw-r--r-- | src/libserver/redis_pool.c | 81 |
1 files changed, 62 insertions, 19 deletions
diff --git a/src/libserver/redis_pool.c b/src/libserver/redis_pool.c index 0d310d968..732d3b5bf 100644 --- a/src/libserver/redis_pool.c +++ b/src/libserver/redis_pool.c @@ -26,12 +26,18 @@ struct rspamd_redis_pool_elt; +enum rspamd_redis_pool_connection_state { + RSPAMD_REDIS_POOL_CONN_INACTIVE = 0, + RSPAMD_REDIS_POOL_CONN_ACTIVE, + RSPAMD_REDIS_POOL_CONN_FINALISING +}; + struct rspamd_redis_pool_connection { struct redisAsyncContext *ctx; struct rspamd_redis_pool_elt *elt; GList *entry; ev_timer timeout; - gboolean active; + enum rspamd_redis_pool_connection_state state; gchar tag[MEMPOOL_UID_LEN]; ref_entry_t ref; }; @@ -99,7 +105,7 @@ rspamd_redis_pool_get_key (const gchar *db, const gchar *password, static void rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *conn) { - if (conn->active) { + if (conn->state == RSPAMD_REDIS_POOL_CONN_ACTIVE) { msg_debug_rpool ("active connection removed"); if (conn->ctx) { @@ -126,7 +132,7 @@ rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *conn) redisAsyncContext *ac = conn->ctx; /* To prevent on_disconnect here */ - conn->active = TRUE; + conn->state = RSPAMD_REDIS_POOL_CONN_FINALISING; g_hash_table_remove (conn->elt->pool->elts_by_ctx, ac); conn->ctx = NULL; ac->onDisconnect = NULL; @@ -171,15 +177,51 @@ rspamd_redis_pool_elt_dtor (gpointer p) } static void +rspamd_redis_on_quit (redisAsyncContext *c, gpointer r, gpointer priv) +{ + struct rspamd_redis_pool_connection *conn = + (struct rspamd_redis_pool_connection *)priv; + + msg_debug_rpool ("quit command reply for the connection %p, refcount: %d", + conn->ctx, conn->ref.refcount); + /* + * We now schedule timer to enforce removal after callback is executed + * to prevent races. But actually, the connection will likely be freed by + * hiredis itself. It is quite brain damaged logic but it is better to + * deal with it... Dtor will definitely stop this timer. + */ + conn->timeout.repeat = 0.1; + ev_timer_again (conn->elt->pool->event_loop, &conn->timeout); +} + +static void rspamd_redis_conn_timeout (EV_P_ ev_timer *w, int revents) { struct rspamd_redis_pool_connection *conn = (struct rspamd_redis_pool_connection *)w->data; - g_assert (!conn->active); - msg_debug_rpool ("scheduled removal of connection %p, refcount: %d", - conn->ctx, conn->ref.refcount); - REF_RELEASE (conn); + g_assert (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE); + + if (conn->state == RSPAMD_REDIS_POOL_CONN_INACTIVE) { + msg_debug_rpool ("scheduled soft removal of connection %p, refcount: %d", + conn->ctx, conn->ref.refcount); + redisAsyncCommand (conn->ctx, rspamd_redis_on_quit, conn, "QUIT"); + conn->state = RSPAMD_REDIS_POOL_CONN_FINALISING; + ev_timer_again (EV_A_ w); + + /* Prevent reusing */ + if (conn->entry) { + g_queue_unlink (conn->elt->inactive, conn->entry); + conn->entry = NULL; + } + } + else { + /* Finalising by timeout */ + msg_debug_rpool ("final removal of connection %p, refcount: %d", + conn->ctx, conn->ref.refcount); + REF_RELEASE (conn); + } + } static void @@ -205,7 +247,7 @@ rspamd_redis_pool_schedule_timeout (struct rspamd_redis_pool_connection *conn) conn->timeout.data = conn; ev_timer_init (&conn->timeout, rspamd_redis_conn_timeout, - real_timeout, 0.0); + real_timeout, real_timeout / 2.0); ev_timer_start (conn->elt->pool->event_loop, &conn->timeout); } @@ -219,8 +261,7 @@ rspamd_redis_pool_on_disconnect (const struct redisAsyncContext *ac, int status, * Here, we know that redis itself will free this connection * so, we need to do something very clever about it */ - - if (!conn->active) { + if (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE) { /* Do nothing for active connections as it is already handled somewhere */ if (conn->ctx) { msg_debug_rpool ("inactive connection terminated: %s, refs: %d", @@ -261,7 +302,7 @@ rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool, conn = g_malloc0 (sizeof (*conn)); conn->entry = g_list_prepend (NULL, conn); conn->elt = elt; - conn->active = TRUE; + conn->state = RSPAMD_REDIS_POOL_CONN_ACTIVE; g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn); g_queue_push_head_link (elt->active, conn->entry); @@ -275,10 +316,12 @@ rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool, conn); if (password) { - redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password); + redisAsyncCommand (ctx, NULL, NULL, + "AUTH %s", password); } if (db) { - redisAsyncCommand (ctx, NULL, NULL, "SELECT %s", db); + redisAsyncCommand (ctx, NULL, NULL, + "SELECT %s", db); } } @@ -307,8 +350,8 @@ rspamd_redis_pool_init (void) struct rspamd_redis_pool *pool; pool = g_malloc0 (sizeof (*pool)); - pool->elts_by_key = g_hash_table_new_full (g_int64_hash, g_int64_equal, NULL, - rspamd_redis_pool_elt_dtor); + pool->elts_by_key = g_hash_table_new_full (g_int64_hash, g_int64_equal, + NULL, rspamd_redis_pool_elt_dtor); pool->elts_by_ctx = g_hash_table_new (g_direct_hash, g_direct_equal); return pool; @@ -349,11 +392,11 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool, if (g_queue_get_length (elt->inactive) > 0) { conn_entry = g_queue_pop_head_link (elt->inactive); conn = conn_entry->data; - g_assert (!conn->active); + g_assert (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE); if (conn->ctx->err == REDIS_OK) { ev_timer_stop (elt->pool->event_loop, &conn->timeout); - conn->active = TRUE; + conn->state = RSPAMD_REDIS_POOL_CONN_ACTIVE; g_queue_push_tail_link (elt->active, conn_entry); msg_debug_rpool ("reused existing connection to %s:%d: %p", ip, port, conn->ctx); @@ -404,7 +447,7 @@ rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool, conn = g_hash_table_lookup (pool->elts_by_ctx, ctx); if (conn != NULL) { - g_assert (conn->active); + g_assert (conn->state == RSPAMD_REDIS_POOL_CONN_ACTIVE); if (ctx->err != REDIS_OK) { /* We need to terminate connection forcefully */ @@ -418,7 +461,7 @@ rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool, /* Just move it to the inactive queue */ g_queue_unlink (conn->elt->active, conn->entry); g_queue_push_head_link (conn->elt->inactive, conn->entry); - conn->active = FALSE; + conn->state = RSPAMD_REDIS_POOL_CONN_INACTIVE; rspamd_redis_pool_schedule_timeout (conn); msg_debug_rpool ("mark connection %p inactive", conn->ctx); } |