/*- * Copyright 2016 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "config.h" #include <event.h> #include "redis_pool.h" #include "cfg_file.h" #include "contrib/hiredis/hiredis.h" #include "contrib/hiredis/async.h" #include "contrib/hiredis/adapters/libevent.h" #include "cryptobox.h" #include "ref.h" #include "logger.h" struct rspamd_redis_pool_elt; struct rspamd_redis_pool_connection { struct redisAsyncContext *ctx; struct rspamd_redis_pool_elt *elt; GList *entry; struct event timeout; gboolean active; gchar tag[MEMPOOL_UID_LEN]; ref_entry_t ref; }; struct rspamd_redis_pool_elt { struct rspamd_redis_pool *pool; guint64 key; GQueue *active; GQueue *inactive; }; struct rspamd_redis_pool { struct event_base *ev_base; struct rspamd_config *cfg; GHashTable *elts_by_key; GHashTable *elts_by_ctx; gdouble timeout; guint max_conns; }; static const gdouble default_timeout = 10.0; static const guint default_max_conns = 100; #define msg_err_rpool(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ "redis_pool", conn->tag, \ G_STRFUNC, \ __VA_ARGS__) #define msg_warn_rpool(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \ "redis_pool", conn->tag, \ G_STRFUNC, \ __VA_ARGS__) #define msg_info_rpool(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \ "redis_pool", conn->tag, \ G_STRFUNC, \ __VA_ARGS__) #define msg_debug_rpool(...) rspamd_default_log_function (G_LOG_LEVEL_DEBUG, \ "redis_pool", conn->tag, \ G_STRFUNC, \ __VA_ARGS__) static inline guint64 rspamd_redis_pool_get_key (const gchar *db, const gchar *password, const char *ip, int port) { rspamd_cryptobox_fast_hash_state_t st; rspamd_cryptobox_fast_hash_init (&st, rspamd_hash_seed ()); if (db) { rspamd_cryptobox_fast_hash_update (&st, db, strlen (db)); } if (password) { rspamd_cryptobox_fast_hash_update (&st, password, strlen (password)); } rspamd_cryptobox_fast_hash_update (&st, ip, strlen (ip)); rspamd_cryptobox_fast_hash_update (&st, &port, sizeof (port)); return rspamd_cryptobox_fast_hash_final (&st); } static void rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *conn) { if (conn->active) { msg_debug_rpool ("active connection removed"); if (conn->ctx) { if (!(conn->ctx->c.flags & REDIS_FREEING)) { redisAsyncContext *ac = conn->ctx; conn->ctx = NULL; g_hash_table_remove (conn->elt->pool->elts_by_ctx, conn->ctx); ac->onDisconnect = NULL; redisAsyncFree (ac); } } if (conn->entry) { g_queue_unlink (conn->elt->active, conn->entry); } } else { msg_debug_rpool ("inactive connection removed"); if (event_get_base (&conn->timeout)) { event_del (&conn->timeout); } if (conn->ctx && !(conn->ctx->c.flags & REDIS_FREEING)) { redisAsyncContext *ac = conn->ctx; /* To prevent on_disconnect here */ conn->active = TRUE; g_hash_table_remove (conn->elt->pool->elts_by_ctx, conn->ctx); conn->ctx = NULL; ac->onDisconnect = NULL; redisAsyncFree (ac); } if (conn->entry) { g_queue_unlink (conn->elt->inactive, conn->entry); } } if (conn->entry) { g_list_free (conn->entry); } g_slice_free1 (sizeof (*conn), conn); } static void rspamd_redis_pool_elt_dtor (gpointer p) { GList *cur; struct rspamd_redis_pool_elt *elt = p; struct rspamd_redis_pool_connection *c; for (cur = elt->active->head; cur != NULL; cur = g_list_next (cur)) { c = cur->data; c->entry = NULL; REF_RELEASE (c); } for (cur = elt->inactive->head; cur != NULL; cur = g_list_next (cur)) { c = cur->data; c->entry = NULL; REF_RELEASE (c); } g_queue_free (elt->active); g_queue_free (elt->inactive); g_slice_free1 (sizeof (*elt), elt); } static void rspamd_redis_conn_timeout (gint fd, short what, gpointer p) { struct rspamd_redis_pool_connection *conn = p; g_assert (!conn->active); msg_debug_rpool ("scheduled removal of connection, refcount: %d", conn->ref.refcount); REF_RELEASE (conn); } static void rspamd_redis_pool_schedule_timeout (struct rspamd_redis_pool_connection *conn) { struct timeval tv; gdouble real_timeout; guint active_elts; active_elts = g_queue_get_length (conn->elt->active); if (active_elts > conn->elt->pool->max_conns) { real_timeout = conn->elt->pool->timeout / 2.0; real_timeout = rspamd_time_jitter (real_timeout, real_timeout / 4.0); } else { real_timeout = conn->elt->pool->timeout; real_timeout = rspamd_time_jitter (real_timeout, real_timeout / 2.0); } msg_debug_rpool ("scheduled connection cleanup in %.1f seconds", real_timeout); double_to_tv (real_timeout, &tv); event_set (&conn->timeout, -1, EV_TIMEOUT, rspamd_redis_conn_timeout, conn); event_base_set (conn->elt->pool->ev_base, &conn->timeout); event_add (&conn->timeout, &tv); } static void rspamd_redis_pool_on_disconnect (const struct redisAsyncContext *ac, int status, void *ud) { struct rspamd_redis_pool_connection *conn = ud; /* * Here, we know that redis itself will free this connection * so, we need to do something very clever about it */ if (!conn->active) { /* Do nothing for active connections as it is already handled somewhere */ if (conn->ctx) { msg_info_rpool ("inactive connection terminated: %s, refs: %d", conn->ctx->errstr, conn->ref.refcount); } REF_RELEASE (conn); } } static struct rspamd_redis_pool_connection * rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool, struct rspamd_redis_pool_elt *elt, const char *db, const char *password, const char *ip, gint port) { struct rspamd_redis_pool_connection *conn; struct redisAsyncContext *ctx; if (*ip == '/' || *ip == '.') { ctx = redisAsyncConnectUnix (ip); } else { ctx = redisAsyncConnect (ip, port); } if (ctx) { if (ctx->err != REDIS_OK) { redisAsyncFree (ctx); return NULL; } else { conn = g_slice_alloc0 (sizeof (*conn)); conn->entry = g_list_prepend (NULL, conn); conn->elt = elt; conn->active = TRUE; g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn); g_queue_push_head_link (elt->active, conn->entry); conn->ctx = ctx; rspamd_random_hex (conn->tag, sizeof (conn->tag)); REF_INIT_RETAIN (conn, rspamd_redis_pool_conn_dtor); msg_debug_rpool ("created new connection to %s:%d", ip, port); redisLibeventAttach (ctx, pool->ev_base); redisAsyncSetDisconnectCallback (ctx, rspamd_redis_pool_on_disconnect, conn); if (password) { redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password); } if (db) { redisAsyncCommand (ctx, NULL, NULL, "SELECT %s", db); } } return conn; } return NULL; } static struct rspamd_redis_pool_elt * rspamd_redis_pool_new_elt (struct rspamd_redis_pool *pool) { struct rspamd_redis_pool_elt *elt; elt = g_slice_alloc0 (sizeof (*elt)); elt->active = g_queue_new (); elt->inactive = g_queue_new (); elt->pool = pool; return elt; } struct rspamd_redis_pool * rspamd_redis_pool_init (void) { struct rspamd_redis_pool *pool; pool = g_slice_alloc0 (sizeof (*pool)); pool->elts_by_key = g_hash_table_new_full (g_int64_hash, g_int64_equal, NULL, rspamd_redis_pool_elt_dtor); pool->elts_by_ctx = g_hash_table_new (g_direct_hash, g_direct_equal); return pool; } void rspamd_redis_pool_config (struct rspamd_redis_pool *pool, struct rspamd_config *cfg, struct event_base *ev_base) { g_assert (pool != NULL); pool->ev_base = ev_base; pool->cfg = cfg; pool->timeout = default_timeout; pool->max_conns = default_max_conns; } struct redisAsyncContext* rspamd_redis_pool_connect (struct rspamd_redis_pool *pool, const gchar *db, const gchar *password, const char *ip, int port) { guint64 key; struct rspamd_redis_pool_elt *elt; GList *conn_entry; struct rspamd_redis_pool_connection *conn; g_assert (pool != NULL); g_assert (pool->ev_base != NULL); g_assert (ip != NULL); key = rspamd_redis_pool_get_key (db, password, ip, port); elt = g_hash_table_lookup (pool->elts_by_key, &key); if (elt) { if (g_queue_get_length (elt->inactive) > 0) { conn_entry = g_queue_pop_head_link (elt->inactive); conn = conn_entry->data; g_assert (!conn->active); if (conn->ctx->err == REDIS_OK) { event_del (&conn->timeout); conn->active = TRUE; g_queue_push_tail_link (elt->active, conn_entry); msg_debug_rpool ("reused existing connection to %s:%d", ip, port); } else { g_list_free (conn->entry); conn->entry = NULL; REF_RELEASE (conn); conn = rspamd_redis_pool_new_connection (pool, elt, db, password, ip, port); } } else { /* Need to create connection */ conn = rspamd_redis_pool_new_connection (pool, elt, db, password, ip, port); } } else { /* Need to create a pool */ elt = rspamd_redis_pool_new_elt (pool); elt->key = key; g_hash_table_insert (pool->elts_by_key, &elt->key, elt); conn = rspamd_redis_pool_new_connection (pool, elt, db, password, ip, port); } if (!conn) { return NULL; } REF_RETAIN (conn); return conn->ctx; } void rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool, struct redisAsyncContext *ctx, gboolean is_fatal) { struct rspamd_redis_pool_connection *conn; g_assert (pool != NULL); g_assert (ctx != NULL); conn = g_hash_table_lookup (pool->elts_by_ctx, ctx); if (conn != NULL) { g_assert (conn->active); if (is_fatal || ctx->err != REDIS_OK) { /* We need to terminate connection forcefully */ msg_debug_rpool ("closed connection forcefully"); REF_RELEASE (conn); } else { /* Ensure that there are no callbacks attached to this conn */ if (ctx->replies.head == NULL) { /* Just move it to the inactive queue */ g_queue_unlink (conn->elt->active, conn->entry); g_queue_push_head_link (conn->elt->inactive, conn->entry); conn->active = FALSE; rspamd_redis_pool_schedule_timeout (conn); msg_debug_rpool ("mark connection inactive"); } else { msg_debug_rpool ("closed connection due to callbacks leftover"); REF_RELEASE (conn); } } REF_RELEASE (conn); } else { g_assert_not_reached (); } } void rspamd_redis_pool_destroy (struct rspamd_redis_pool *pool) { struct rspamd_redis_pool_elt *elt; GHashTableIter it; gpointer k, v; g_assert (pool != NULL); g_hash_table_iter_init (&it, pool->elts_by_key); while (g_hash_table_iter_next (&it, &k, &v)) { elt = v; rspamd_redis_pool_elt_dtor (elt); g_hash_table_iter_steal (&it); } g_hash_table_unref (pool->elts_by_ctx); g_hash_table_unref (pool->elts_by_key); g_slice_free1 (sizeof (*pool), pool); }