]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Add implementation of redis connections pool
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 30 Aug 2016 15:50:29 +0000 (16:50 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 30 Aug 2016 15:50:29 +0000 (16:50 +0100)
src/libserver/CMakeLists.txt
src/libserver/redis_pool.c [new file with mode: 0644]
src/libserver/redis_pool.h [new file with mode: 0644]

index 49e4e6d253b7c9a4c31aae6971ca5ab2c32c1fc1..295ad59c882073b0591cdfab7a2c2a92c3f0d071 100644 (file)
@@ -13,6 +13,7 @@ SET(LIBRSPAMDSERVERSRC
                                ${CMAKE_CURRENT_SOURCE_DIR}/monitored.c
                                ${CMAKE_CURRENT_SOURCE_DIR}/protocol.c
                                ${CMAKE_CURRENT_SOURCE_DIR}/proxy.c
+                               ${CMAKE_CURRENT_SOURCE_DIR}/redis_pool.c
                                ${CMAKE_CURRENT_SOURCE_DIR}/re_cache.c
                                ${CMAKE_CURRENT_SOURCE_DIR}/roll_history.c
                                ${CMAKE_CURRENT_SOURCE_DIR}/spf.c
diff --git a/src/libserver/redis_pool.c b/src/libserver/redis_pool.c
new file mode 100644 (file)
index 0000000..cfc2757
--- /dev/null
@@ -0,0 +1,331 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "config.h"
+#include <event.h>
+#include "redis_pool.h"
+#include "cfg_file.h"
+#include "hiredis/hiredis.h"
+#include "hiredis/async.h"
+#include "cryptobox.h"
+#include "ref.h"
+
+struct rspamd_redis_pool_elt;
+
+struct rspamd_redis_pool_connection {
+       struct redisAsyncContext *ctx;
+       struct rspamd_redis_pool_elt *elt;
+       GList *entry;
+       struct event timeout;
+       gboolean active;
+       ref_entry_t ref;
+};
+
+struct rspamd_redis_pool_elt {
+       struct rspamd_redis_pool *pool;
+       guint64 key;
+       GQueue *active;
+       GQueue *inactive;
+};
+
+struct rspamd_redis_pool {
+       struct event_base *ev_base;
+       struct rspamd_config *cfg;
+       GHashTable *elts_by_key;
+       GHashTable *elts_by_ctx;
+       gdouble timeout;
+       guint max_conns;
+};
+
+static const gdouble default_timeout = 60.0;
+static const guint default_max_conns = 100;
+
+static inline guint64
+rspamd_redis_pool_get_key (const gchar *db, const gchar *password,
+               const char *ip, int port)
+{
+       rspamd_cryptobox_fast_hash_state_t st;
+
+       rspamd_cryptobox_fast_hash_init (&st, rspamd_hash_seed ());
+
+       if (db) {
+               rspamd_cryptobox_fast_hash_update (&st, db, strlen (db));
+       }
+       if (password) {
+               rspamd_cryptobox_fast_hash_update (&st, password, strlen (password));
+       }
+
+       rspamd_cryptobox_fast_hash_update (&st, ip, strlen (ip));
+       rspamd_cryptobox_fast_hash_update (&st, &port, sizeof (port));
+
+       return rspamd_cryptobox_fast_hash_final (&st);
+}
+
+
+static void
+rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *c)
+{
+       if (c->active && c->ctx != NULL) {
+               g_hash_table_remove (c->elt->pool->elts_by_ctx, c->ctx);
+               redisAsyncFree (c->ctx);
+               g_queue_unlink (c->elt->active, c->entry);
+       }
+
+       if (!c->active && event_get_base (&c->timeout)) {
+               event_del (&c->timeout);
+               g_queue_unlink (c->elt->inactive, c->entry);
+       }
+
+
+       g_list_free (c->entry);
+       g_slice_free1 (sizeof (*c), c);
+}
+
+static void
+rspamd_redis_pool_elt_dtor (gpointer p)
+{
+       GList *cur;
+       struct rspamd_redis_pool_elt *elt = p;
+       struct rspamd_redis_pool_connection *c;
+
+       for (cur = elt->active->head; cur != NULL; cur = g_list_next (cur)) {
+               c = cur->data;
+               REF_RELEASE (c);
+       }
+
+       for (cur = elt->inactive->head; cur != NULL; cur = g_list_next (cur)) {
+               c = cur->data;
+               REF_RELEASE (c);
+       }
+
+       g_queue_free (elt->active);
+       g_queue_free (elt->inactive);
+       g_slice_free1 (sizeof (*elt), elt);
+}
+
+static void
+rspamd_redis_conn_timeout (gint fd, short what, gpointer p)
+{
+       struct rspamd_redis_pool_connection *conn = p;
+
+       REF_RELEASE (conn);
+}
+
+static void
+rspamd_redis_pool_schedule_timeout (struct rspamd_redis_pool_connection *conn)
+{
+       struct timeval tv;
+       gdouble real_timeout;
+       guint active_elts;
+
+       active_elts = g_queue_get_length (conn->elt->active);
+
+       if (active_elts > conn->elt->pool->max_conns) {
+               real_timeout = conn->elt->pool->timeout / 2.0;
+               real_timeout = rspamd_time_jitter (real_timeout, real_timeout / 4.0);
+       }
+       else {
+               real_timeout = conn->elt->pool->timeout;
+               real_timeout = rspamd_time_jitter (real_timeout, real_timeout / 2.0);
+       }
+
+       double_to_tv (real_timeout, &tv);
+       event_set (&conn->timeout, -1, EV_TIMEOUT, rspamd_redis_conn_timeout, conn);
+       event_base_set (conn->elt->pool->ev_base, &conn->timeout);
+       event_add (&conn->timeout, &tv);
+}
+
+static struct rspamd_redis_pool_connection *
+rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool,
+               struct rspamd_redis_pool_elt *elt,
+               const char *db,
+               const char *password,
+               const char *ip,
+               gint port)
+{
+       struct rspamd_redis_pool_connection *conn;
+       struct redisAsyncContext *ctx;
+
+       ctx = redisAsyncConnect (ip, port);
+
+       if (ctx) {
+               conn = g_slice_alloc0 (sizeof (conn));
+               conn->entry = g_list_prepend (NULL, conn);
+               conn->elt = elt;
+               conn->active = TRUE;
+               g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn);
+               g_queue_push_head_link (elt->active, conn->entry);
+               conn->ctx = ctx;
+               REF_INIT_RETAIN (conn, rspamd_redis_pool_conn_dtor);
+
+               if (password) {
+                       redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password);
+               }
+               if (db) {
+                       redisAsyncCommand (ctx, NULL, NULL, "SELECT %s", db);
+               }
+
+               return conn;
+       }
+
+       return NULL;
+}
+
+static struct rspamd_redis_pool_elt *
+rspamd_redis_pool_new_elt (struct rspamd_redis_pool *pool)
+{
+       struct rspamd_redis_pool_elt *elt;
+
+       elt = g_slice_alloc0 (sizeof (*elt));
+       elt->active = g_queue_new ();
+       elt->inactive = g_queue_new ();
+       elt->pool = pool;
+
+       return elt;
+}
+
+struct rspamd_redis_pool *
+rspamd_redis_pool_init (void)
+{
+       struct rspamd_redis_pool *pool;
+
+       pool = g_slice_alloc0 (sizeof (*pool));
+       pool->elts_by_key = g_hash_table_new_full (g_int64_hash, g_int64_equal, NULL,
+                       rspamd_redis_pool_elt_dtor);
+       pool->elts_by_ctx = g_hash_table_new (g_direct_hash, g_direct_equal);
+
+       return pool;
+}
+
+void
+rspamd_redis_pool_config (struct rspamd_redis_pool *pool,
+               struct rspamd_config *cfg,
+               struct event_base *ev_base)
+{
+       g_assert (pool != NULL);
+
+       pool->ev_base = ev_base;
+       pool->cfg = cfg;
+       pool->timeout = default_timeout;
+       pool->max_conns = default_max_conns;
+}
+
+
+struct redisAsyncContext*
+rspamd_redis_pool_connect (struct rspamd_redis_pool *pool,
+               const gchar *db, const gchar *password,
+               const char *ip, int port)
+{
+       guint64 key;
+       struct rspamd_redis_pool_elt *elt;
+       GList *conn_entry;
+       struct rspamd_redis_pool_connection *conn;
+
+       g_assert (pool != NULL);
+       g_assert (pool->ev_base != NULL);
+       g_assert (ip != NULL);
+
+       key = rspamd_redis_pool_get_key (db, password, ip, port);
+       elt = g_hash_table_lookup (pool->elts_by_key, &key);
+
+       if (elt) {
+               if (g_queue_get_length (elt->inactive) > 0) {
+                       conn_entry = g_queue_pop_head_link (elt->inactive);
+                       conn = conn_entry->data;
+
+                       if (event_get_base (&conn->timeout)) {
+                               event_del (&conn->timeout);
+                       }
+
+                       conn->active = TRUE;
+                       g_queue_push_tail_link (elt->active, conn_entry);
+                       REF_RETAIN (conn);
+
+               }
+               else {
+                       /* Need to create connection */
+                       conn = rspamd_redis_pool_new_connection (pool, elt,
+                                       db, password, ip, port);
+               }
+
+               return conn->ctx;
+       }
+       else {
+               elt = rspamd_redis_pool_new_elt (pool);
+               elt->key = key;
+               g_hash_table_insert (pool->elts_by_key, &elt->key, elt);
+
+               conn = rspamd_redis_pool_new_connection (pool, elt,
+                               db, password, ip, port);
+       }
+
+       return conn->ctx;
+}
+
+
+void
+rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool,
+               struct redisAsyncContext *ctx, gboolean is_fatal)
+{
+       struct rspamd_redis_pool_connection *conn;
+
+       g_assert (pool != NULL);
+       g_assert (ctx != NULL);
+
+       conn = g_hash_table_lookup (pool->elts_by_ctx, ctx);
+       if (conn != NULL) {
+               REF_RELEASE (conn);
+
+               if (is_fatal) {
+                       /* We need to terminate connection forcefully */
+                       REF_RELEASE (conn);
+               }
+               else {
+                       /* Just move it to the inactive queue */
+                       g_queue_unlink (conn->elt->active, conn->entry);
+                       g_queue_push_head_link (conn->elt->inactive, conn->entry);
+                       conn->active = FALSE;
+                       rspamd_redis_pool_schedule_timeout (conn);
+               }
+       }
+       else {
+               g_assert_not_reached ();
+       }
+}
+
+
+void
+rspamd_redis_pool_destroy (struct rspamd_redis_pool *pool)
+{
+       struct rspamd_redis_pool_elt *elt;
+       GHashTableIter it;
+       gpointer k, v;
+
+       g_assert (pool != NULL);
+
+       g_hash_table_iter_init (&it, pool->elts_by_key);
+
+       while (g_hash_table_iter_next (&it, &k, &v)) {
+               elt = v;
+               rspamd_redis_pool_elt_dtor (elt);
+               g_hash_table_iter_steal (&it);
+       }
+
+       g_hash_table_unref (pool->elts_by_ctx);
+       g_hash_table_unref (pool->elts_by_key);
+
+       g_slice_free1 (sizeof (*pool), pool);
+}
diff --git a/src/libserver/redis_pool.h b/src/libserver/redis_pool.h
new file mode 100644 (file)
index 0000000..5e5dc0b
--- /dev/null
@@ -0,0 +1,70 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef SRC_LIBSERVER_REDIS_POOL_H_
+#define SRC_LIBSERVER_REDIS_POOL_H_
+
+#include "config.h"
+
+struct rspamd_redis_pool;
+struct rspamd_config;
+struct redisAsyncContext;
+struct event_base;
+
+/**
+ * Creates new redis pool
+ * @return
+ */
+struct rspamd_redis_pool *rspamd_redis_pool_init (void);
+
+/**
+ * Configure redis pool and binds it to a specific event base
+ * @param cfg
+ * @param ev_base
+ */
+void rspamd_redis_pool_config (struct rspamd_redis_pool *pool,
+               struct rspamd_config *cfg,
+               struct event_base *ev_base);
+
+
+/**
+ * Create or reuse the specific redis connection
+ * @param pool
+ * @param db
+ * @param password
+ * @param ip
+ * @param port
+ * @return
+ */
+struct redisAsyncContext* rspamd_redis_pool_connect (
+               struct rspamd_redis_pool *pool,
+               const gchar *db, const gchar *password,
+               const char *ip, int port);
+
+/**
+ * Release a connection to the pool
+ * @param pool
+ * @param ctx
+ */
+void rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool,
+               struct redisAsyncContext *ctx, gboolean is_fatal);
+
+/**
+ * Stops redis pool and destroys it
+ * @param pool
+ */
+void rspamd_redis_pool_destroy (struct rspamd_redis_pool *pool);
+
+#endif /* SRC_LIBSERVER_REDIS_POOL_H_ */