aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2016-08-30 16:50:29 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2016-08-30 16:50:29 +0100
commitb5b0137de78bd9ea656adb64633fe91d210e14f6 (patch)
tree00ddd45378c8bcc6c0cf9d7ffd9ba4cd2f8cc00a
parentd81db26db5f350242b8d5df25ff8e9993c0b8a15 (diff)
downloadrspamd-b5b0137de78bd9ea656adb64633fe91d210e14f6.tar.gz
rspamd-b5b0137de78bd9ea656adb64633fe91d210e14f6.zip
[Feature] Add implementation of redis connections pool
-rw-r--r--src/libserver/CMakeLists.txt1
-rw-r--r--src/libserver/redis_pool.c331
-rw-r--r--src/libserver/redis_pool.h70
3 files changed, 402 insertions, 0 deletions
diff --git a/src/libserver/CMakeLists.txt b/src/libserver/CMakeLists.txt
index 49e4e6d25..295ad59c8 100644
--- a/src/libserver/CMakeLists.txt
+++ b/src/libserver/CMakeLists.txt
@@ -13,6 +13,7 @@ SET(LIBRSPAMDSERVERSRC
${CMAKE_CURRENT_SOURCE_DIR}/monitored.c
${CMAKE_CURRENT_SOURCE_DIR}/protocol.c
${CMAKE_CURRENT_SOURCE_DIR}/proxy.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/redis_pool.c
${CMAKE_CURRENT_SOURCE_DIR}/re_cache.c
${CMAKE_CURRENT_SOURCE_DIR}/roll_history.c
${CMAKE_CURRENT_SOURCE_DIR}/spf.c
diff --git a/src/libserver/redis_pool.c b/src/libserver/redis_pool.c
new file mode 100644
index 000000000..cfc2757be
--- /dev/null
+++ b/src/libserver/redis_pool.c
@@ -0,0 +1,331 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "config.h"
+#include <event.h>
+#include "redis_pool.h"
+#include "cfg_file.h"
+#include "hiredis/hiredis.h"
+#include "hiredis/async.h"
+#include "cryptobox.h"
+#include "ref.h"
+
+struct rspamd_redis_pool_elt;
+
+struct rspamd_redis_pool_connection {
+ struct redisAsyncContext *ctx;
+ struct rspamd_redis_pool_elt *elt;
+ GList *entry;
+ struct event timeout;
+ gboolean active;
+ ref_entry_t ref;
+};
+
+struct rspamd_redis_pool_elt {
+ struct rspamd_redis_pool *pool;
+ guint64 key;
+ GQueue *active;
+ GQueue *inactive;
+};
+
+struct rspamd_redis_pool {
+ struct event_base *ev_base;
+ struct rspamd_config *cfg;
+ GHashTable *elts_by_key;
+ GHashTable *elts_by_ctx;
+ gdouble timeout;
+ guint max_conns;
+};
+
+static const gdouble default_timeout = 60.0;
+static const guint default_max_conns = 100;
+
+static inline guint64
+rspamd_redis_pool_get_key (const gchar *db, const gchar *password,
+ const char *ip, int port)
+{
+ rspamd_cryptobox_fast_hash_state_t st;
+
+ rspamd_cryptobox_fast_hash_init (&st, rspamd_hash_seed ());
+
+ if (db) {
+ rspamd_cryptobox_fast_hash_update (&st, db, strlen (db));
+ }
+ if (password) {
+ rspamd_cryptobox_fast_hash_update (&st, password, strlen (password));
+ }
+
+ rspamd_cryptobox_fast_hash_update (&st, ip, strlen (ip));
+ rspamd_cryptobox_fast_hash_update (&st, &port, sizeof (port));
+
+ return rspamd_cryptobox_fast_hash_final (&st);
+}
+
+
+static void
+rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *c)
+{
+ if (c->active && c->ctx != NULL) {
+ g_hash_table_remove (c->elt->pool->elts_by_ctx, c->ctx);
+ redisAsyncFree (c->ctx);
+ g_queue_unlink (c->elt->active, c->entry);
+ }
+
+ if (!c->active && event_get_base (&c->timeout)) {
+ event_del (&c->timeout);
+ g_queue_unlink (c->elt->inactive, c->entry);
+ }
+
+
+ g_list_free (c->entry);
+ g_slice_free1 (sizeof (*c), c);
+}
+
+static void
+rspamd_redis_pool_elt_dtor (gpointer p)
+{
+ GList *cur;
+ struct rspamd_redis_pool_elt *elt = p;
+ struct rspamd_redis_pool_connection *c;
+
+ for (cur = elt->active->head; cur != NULL; cur = g_list_next (cur)) {
+ c = cur->data;
+ REF_RELEASE (c);
+ }
+
+ for (cur = elt->inactive->head; cur != NULL; cur = g_list_next (cur)) {
+ c = cur->data;
+ REF_RELEASE (c);
+ }
+
+ g_queue_free (elt->active);
+ g_queue_free (elt->inactive);
+ g_slice_free1 (sizeof (*elt), elt);
+}
+
+static void
+rspamd_redis_conn_timeout (gint fd, short what, gpointer p)
+{
+ struct rspamd_redis_pool_connection *conn = p;
+
+ REF_RELEASE (conn);
+}
+
+static void
+rspamd_redis_pool_schedule_timeout (struct rspamd_redis_pool_connection *conn)
+{
+ struct timeval tv;
+ gdouble real_timeout;
+ guint active_elts;
+
+ active_elts = g_queue_get_length (conn->elt->active);
+
+ if (active_elts > conn->elt->pool->max_conns) {
+ real_timeout = conn->elt->pool->timeout / 2.0;
+ real_timeout = rspamd_time_jitter (real_timeout, real_timeout / 4.0);
+ }
+ else {
+ real_timeout = conn->elt->pool->timeout;
+ real_timeout = rspamd_time_jitter (real_timeout, real_timeout / 2.0);
+ }
+
+ double_to_tv (real_timeout, &tv);
+ event_set (&conn->timeout, -1, EV_TIMEOUT, rspamd_redis_conn_timeout, conn);
+ event_base_set (conn->elt->pool->ev_base, &conn->timeout);
+ event_add (&conn->timeout, &tv);
+}
+
+static struct rspamd_redis_pool_connection *
+rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool,
+ struct rspamd_redis_pool_elt *elt,
+ const char *db,
+ const char *password,
+ const char *ip,
+ gint port)
+{
+ struct rspamd_redis_pool_connection *conn;
+ struct redisAsyncContext *ctx;
+
+ ctx = redisAsyncConnect (ip, port);
+
+ if (ctx) {
+ conn = g_slice_alloc0 (sizeof (conn));
+ conn->entry = g_list_prepend (NULL, conn);
+ conn->elt = elt;
+ conn->active = TRUE;
+ g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn);
+ g_queue_push_head_link (elt->active, conn->entry);
+ conn->ctx = ctx;
+ REF_INIT_RETAIN (conn, rspamd_redis_pool_conn_dtor);
+
+ if (password) {
+ redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password);
+ }
+ if (db) {
+ redisAsyncCommand (ctx, NULL, NULL, "SELECT %s", db);
+ }
+
+ return conn;
+ }
+
+ return NULL;
+}
+
+static struct rspamd_redis_pool_elt *
+rspamd_redis_pool_new_elt (struct rspamd_redis_pool *pool)
+{
+ struct rspamd_redis_pool_elt *elt;
+
+ elt = g_slice_alloc0 (sizeof (*elt));
+ elt->active = g_queue_new ();
+ elt->inactive = g_queue_new ();
+ elt->pool = pool;
+
+ return elt;
+}
+
+struct rspamd_redis_pool *
+rspamd_redis_pool_init (void)
+{
+ struct rspamd_redis_pool *pool;
+
+ pool = g_slice_alloc0 (sizeof (*pool));
+ pool->elts_by_key = g_hash_table_new_full (g_int64_hash, g_int64_equal, NULL,
+ rspamd_redis_pool_elt_dtor);
+ pool->elts_by_ctx = g_hash_table_new (g_direct_hash, g_direct_equal);
+
+ return pool;
+}
+
+void
+rspamd_redis_pool_config (struct rspamd_redis_pool *pool,
+ struct rspamd_config *cfg,
+ struct event_base *ev_base)
+{
+ g_assert (pool != NULL);
+
+ pool->ev_base = ev_base;
+ pool->cfg = cfg;
+ pool->timeout = default_timeout;
+ pool->max_conns = default_max_conns;
+}
+
+
+struct redisAsyncContext*
+rspamd_redis_pool_connect (struct rspamd_redis_pool *pool,
+ const gchar *db, const gchar *password,
+ const char *ip, int port)
+{
+ guint64 key;
+ struct rspamd_redis_pool_elt *elt;
+ GList *conn_entry;
+ struct rspamd_redis_pool_connection *conn;
+
+ g_assert (pool != NULL);
+ g_assert (pool->ev_base != NULL);
+ g_assert (ip != NULL);
+
+ key = rspamd_redis_pool_get_key (db, password, ip, port);
+ elt = g_hash_table_lookup (pool->elts_by_key, &key);
+
+ if (elt) {
+ if (g_queue_get_length (elt->inactive) > 0) {
+ conn_entry = g_queue_pop_head_link (elt->inactive);
+ conn = conn_entry->data;
+
+ if (event_get_base (&conn->timeout)) {
+ event_del (&conn->timeout);
+ }
+
+ conn->active = TRUE;
+ g_queue_push_tail_link (elt->active, conn_entry);
+ REF_RETAIN (conn);
+
+ }
+ else {
+ /* Need to create connection */
+ conn = rspamd_redis_pool_new_connection (pool, elt,
+ db, password, ip, port);
+ }
+
+ return conn->ctx;
+ }
+ else {
+ elt = rspamd_redis_pool_new_elt (pool);
+ elt->key = key;
+ g_hash_table_insert (pool->elts_by_key, &elt->key, elt);
+
+ conn = rspamd_redis_pool_new_connection (pool, elt,
+ db, password, ip, port);
+ }
+
+ return conn->ctx;
+}
+
+
+void
+rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool,
+ struct redisAsyncContext *ctx, gboolean is_fatal)
+{
+ struct rspamd_redis_pool_connection *conn;
+
+ g_assert (pool != NULL);
+ g_assert (ctx != NULL);
+
+ conn = g_hash_table_lookup (pool->elts_by_ctx, ctx);
+ if (conn != NULL) {
+ REF_RELEASE (conn);
+
+ if (is_fatal) {
+ /* We need to terminate connection forcefully */
+ REF_RELEASE (conn);
+ }
+ else {
+ /* Just move it to the inactive queue */
+ g_queue_unlink (conn->elt->active, conn->entry);
+ g_queue_push_head_link (conn->elt->inactive, conn->entry);
+ conn->active = FALSE;
+ rspamd_redis_pool_schedule_timeout (conn);
+ }
+ }
+ else {
+ g_assert_not_reached ();
+ }
+}
+
+
+void
+rspamd_redis_pool_destroy (struct rspamd_redis_pool *pool)
+{
+ struct rspamd_redis_pool_elt *elt;
+ GHashTableIter it;
+ gpointer k, v;
+
+ g_assert (pool != NULL);
+
+ g_hash_table_iter_init (&it, pool->elts_by_key);
+
+ while (g_hash_table_iter_next (&it, &k, &v)) {
+ elt = v;
+ rspamd_redis_pool_elt_dtor (elt);
+ g_hash_table_iter_steal (&it);
+ }
+
+ g_hash_table_unref (pool->elts_by_ctx);
+ g_hash_table_unref (pool->elts_by_key);
+
+ g_slice_free1 (sizeof (*pool), pool);
+}
diff --git a/src/libserver/redis_pool.h b/src/libserver/redis_pool.h
new file mode 100644
index 000000000..5e5dc0b5d
--- /dev/null
+++ b/src/libserver/redis_pool.h
@@ -0,0 +1,70 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef SRC_LIBSERVER_REDIS_POOL_H_
+#define SRC_LIBSERVER_REDIS_POOL_H_
+
+#include "config.h"
+
+struct rspamd_redis_pool;
+struct rspamd_config;
+struct redisAsyncContext;
+struct event_base;
+
+/**
+ * Creates new redis pool
+ * @return
+ */
+struct rspamd_redis_pool *rspamd_redis_pool_init (void);
+
+/**
+ * Configure redis pool and binds it to a specific event base
+ * @param cfg
+ * @param ev_base
+ */
+void rspamd_redis_pool_config (struct rspamd_redis_pool *pool,
+ struct rspamd_config *cfg,
+ struct event_base *ev_base);
+
+
+/**
+ * Create or reuse the specific redis connection
+ * @param pool
+ * @param db
+ * @param password
+ * @param ip
+ * @param port
+ * @return
+ */
+struct redisAsyncContext* rspamd_redis_pool_connect (
+ struct rspamd_redis_pool *pool,
+ const gchar *db, const gchar *password,
+ const char *ip, int port);
+
+/**
+ * Release a connection to the pool
+ * @param pool
+ * @param ctx
+ */
+void rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool,
+ struct redisAsyncContext *ctx, gboolean is_fatal);
+
+/**
+ * Stops redis pool and destroys it
+ * @param pool
+ */
+void rspamd_redis_pool_destroy (struct rspamd_redis_pool *pool);
+
+#endif /* SRC_LIBSERVER_REDIS_POOL_H_ */