aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2016-09-02 17:52:20 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2016-09-02 17:52:20 +0100
commit66a7ef2d1f692978549a72f01a99b7789d22005f (patch)
treec828f169ab306e254f4f26c3b9501c3085c82bea /src
parentf206fff3277fbda728a608653636072f1c8ba411 (diff)
downloadrspamd-66a7ef2d1f692978549a72f01a99b7789d22005f.tar.gz
rspamd-66a7ef2d1f692978549a72f01a99b7789d22005f.zip
[Minor] Implement some features of redis backend
Diffstat (limited to 'src')
-rw-r--r--src/libserver/fuzzy_backend.c6
-rw-r--r--src/libserver/fuzzy_backend.h2
-rw-r--r--src/libserver/fuzzy_backend_redis.c262
3 files changed, 269 insertions, 1 deletions
diff --git a/src/libserver/fuzzy_backend.c b/src/libserver/fuzzy_backend.c
index 12c506eb5..84f2289e8 100644
--- a/src/libserver/fuzzy_backend.c
+++ b/src/libserver/fuzzy_backend.c
@@ -436,3 +436,9 @@ rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *bk)
g_slice_free1 (sizeof (*bk), bk);
}
+
+struct event_base*
+rspamd_fuzzy_backend_event_base (struct rspamd_fuzzy_backend *backend)
+{
+ return backend->ev_base;
+}
diff --git a/src/libserver/fuzzy_backend.h b/src/libserver/fuzzy_backend.h
index 21b873581..1eaa0fe2b 100644
--- a/src/libserver/fuzzy_backend.h
+++ b/src/libserver/fuzzy_backend.h
@@ -101,6 +101,8 @@ void rspamd_fuzzy_backend_start_update (struct rspamd_fuzzy_backend *backend,
rspamd_fuzzy_periodic_cb cb,
void *ud);
+struct event_base* rspamd_fuzzy_backend_event_base (struct rspamd_fuzzy_backend *backend);
+
/**
* Closes backend
* @param backend
diff --git a/src/libserver/fuzzy_backend_redis.c b/src/libserver/fuzzy_backend_redis.c
index a5e9ea846..c5961a69c 100644
--- a/src/libserver/fuzzy_backend_redis.c
+++ b/src/libserver/fuzzy_backend_redis.c
@@ -15,9 +15,13 @@
*/
#include "config.h"
+#include "ref.h"
#include "fuzzy_backend.h"
#include "fuzzy_backend_redis.h"
#include "redis_pool.h"
+#include "cryptobox.h"
+#include "str_util.h"
+#include "upstream.h"
#include "contrib/hiredis/hiredis.h"
#include "contrib/hiredis/async.h"
@@ -31,11 +35,67 @@ struct rspamd_fuzzy_backend_redis {
const gchar *redis_object;
const gchar *password;
const gchar *dbname;
+ gchar *id;
+ struct rspamd_redis_pool *pool;
gdouble timeout;
+ ref_entry_t ref;
};
+struct rspamd_fuzzy_redis_session {
+ struct rspamd_fuzzy_backend_redis *backend;
+ redisAsyncContext *ctx;
+ struct event timeout;
+ enum {
+ RSPAMD_FUZZY_REDIS_COMMAND_COUNT,
+ RSPAMD_FUZZY_REDIS_COMMAND_VERSION,
+ RSPAMD_FUZZY_REDIS_COMMAND_UPDATES,
+ RSPAMD_FUZZY_REDIS_COMMAND_CHECK
+ } command;
+ guint nargs;
+
+ union {
+ rspamd_fuzzy_check_cb cb_check;
+ rspamd_fuzzy_update_cb cb_update;
+ rspamd_fuzzy_version_cb cb_version;
+ rspamd_fuzzy_count_cb cb_count;
+ } callback;
+ void *cbdata;
+
+ gchar **argv;
+ struct upstream *up;
+};
+
+static void
+rspamd_fuzzy_redis_session_dtor (struct rspamd_fuzzy_redis_session *session)
+{
+ redisAsyncContext *ac;
+ guint i;
+
+ if (session->ctx) {
+ ac = session->ctx;
+ session->ctx = NULL;
+ rspamd_redis_pool_release_connection (session->backend->pool,
+ ac, FALSE);
+ }
+
+ if (event_get_base (&session->timeout)) {
+ event_del (&session->timeout);
+ }
+
+ if (session->argv) {
+ for (i = 0; i < session->nargs; i ++) {
+ g_free (session->argv[i]);
+ }
+
+ g_free (session->argv);
+ }
+
+ REF_RELEASE (session->backend);
+ g_slice_free1 (sizeof (*session), session);
+}
+
static gboolean
-rspamd_redis_try_ucl (struct rspamd_fuzzy_backend_redis *backend,
+rspamd_fuzzy_backend_redis_try_ucl (struct rspamd_fuzzy_backend_redis *backend,
const ucl_object_t *obj,
struct rspamd_config *cfg)
{
@@ -112,11 +172,103 @@ rspamd_redis_try_ucl (struct rspamd_fuzzy_backend_redis *backend,
return TRUE;
}
+static void
+rspamd_fuzzy_backend_redis_dtor (struct rspamd_fuzzy_backend_redis *backend)
+{
+ if (backend->read_servers) {
+ rspamd_upstreams_destroy (backend->read_servers);
+ }
+ if (backend->write_servers) {
+ rspamd_upstreams_destroy (backend->read_servers);
+ }
+
+ if (backend->id) {
+ g_free (backend->id);
+ }
+
+ g_slice_free1 (sizeof (*backend), backend);
+}
+
void*
rspamd_fuzzy_backend_init_redis (struct rspamd_fuzzy_backend *bk,
const ucl_object_t *obj, struct rspamd_config *cfg, GError **err)
{
+ struct rspamd_fuzzy_backend_redis *backend;
+ const ucl_object_t *elt;
+ gboolean ret = FALSE;
+ guchar id_hash[rspamd_cryptobox_HASHBYTES];
+ rspamd_cryptobox_hash_state_t st;
+
+ backend = g_slice_alloc0 (sizeof (*backend));
+
+ backend->timeout = REDIS_DEFAULT_TIMEOUT;
+ backend->redis_object = REDIS_DEFAULT_OBJECT;
+
+ ret = rspamd_fuzzy_backend_redis_try_ucl (backend, obj, cfg);
+
+ /* Now try global redis settings */
+ if (!ret) {
+ elt = ucl_object_lookup (cfg->rcl_obj, "redis");
+
+ if (elt) {
+ const ucl_object_t *specific_obj;
+
+ specific_obj = ucl_object_lookup_any (elt, "fuzzy", "fuzzy_storage",
+ NULL);
+
+ if (specific_obj) {
+ ret = rspamd_fuzzy_backend_redis_try_ucl (backend, specific_obj,
+ cfg);
+ }
+ else {
+ ret = rspamd_fuzzy_backend_redis_try_ucl (backend, elt, cfg);
+ }
+ }
+ }
+
+ if (!ret) {
+ msg_err_config ("cannot init redis backend for fuzzy storage");
+ g_slice_free1 (sizeof (*backend), backend);
+ return NULL;
+ }
+
+ REF_INIT_RETAIN (backend, rspamd_fuzzy_backend_redis_dtor);
+ backend->pool = cfg->redis_pool;
+ rspamd_cryptobox_hash_init (&st, NULL, 0);
+ rspamd_cryptobox_hash_update (&st, backend->redis_object,
+ strlen (backend->redis_object));
+
+ if (backend->dbname) {
+ rspamd_cryptobox_hash_update (&st, backend->dbname,
+ strlen (backend->dbname));
+ }
+
+ if (backend->password) {
+ rspamd_cryptobox_hash_update (&st, backend->password,
+ strlen (backend->password));
+ }
+
+ rspamd_cryptobox_hash_final (&st, id_hash);
+ backend->id = rspamd_encode_base32 (id_hash, sizeof (id_hash));
+
+ return backend;
+}
+
+static void
+rspamd_fuzzy_redis_timeout (gint fd, short what, gpointer priv)
+{
+ struct rspamd_fuzzy_redis_session *session = priv;
+ redisAsyncContext *ac;
+
+ if (session->ctx) {
+ ac = session->ctx;
+ session->ctx = NULL;
+ ac->err = REDIS_ERR_IO;
+ /* This will cause session closing */
+ rspamd_redis_pool_release_connection (session->backend->pool,
+ ac, TRUE);
+ }
}
void
@@ -125,6 +277,9 @@ rspamd_fuzzy_backend_check_redis (struct rspamd_fuzzy_backend *bk,
rspamd_fuzzy_check_cb cb, void *ud,
void *subr_ud)
{
+ struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+
+ g_assert (backend != NULL);
}
@@ -134,7 +289,40 @@ rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk,
rspamd_fuzzy_update_cb cb, void *ud,
void *subr_ud)
{
+ struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+}
+
+static void
+rspamd_fuzzy_redis_count_callback (redisAsyncContext *c, gpointer r,
+ gpointer priv)
+{
+ struct rspamd_fuzzy_redis_session *session = priv;
+ redisReply *reply = r;
+
+ event_del (&session->timeout);
+
+ if (c->err == 0) {
+ rspamd_upstream_ok (session->up);
+
+ if (reply->type == REDIS_REPLY_INTEGER) {
+ if (session->callback.cb_count) {
+ session->callback.cb_count (reply->integer, session->cbdata);
+ }
+ }
+ else {
+ if (session->callback.cb_count) {
+ session->callback.cb_count (0, session->cbdata);
+ }
+ }
+ }
+ else {
+ if (session->callback.cb_count) {
+ session->callback.cb_count (0, session->cbdata);
+ }
+ rspamd_upstream_fail (session->up);
+ }
+ rspamd_fuzzy_redis_session_dtor (session);
}
void
@@ -142,34 +330,106 @@ rspamd_fuzzy_backend_count_redis (struct rspamd_fuzzy_backend *bk,
rspamd_fuzzy_count_cb cb, void *ud,
void *subr_ud)
{
+ struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+ struct rspamd_fuzzy_redis_session *session;
+ struct upstream *up;
+ struct timeval tv;
+ rspamd_inet_addr_t *addr;
+
+ g_assert (backend != NULL);
+
+ session = g_slice_alloc0 (sizeof (*session));
+ session->backend = backend;
+ REF_RETAIN (session->backend);
+
+ session->callback.cb_count = cb;
+ session->cbdata = ud;
+ session->command = RSPAMD_FUZZY_REDIS_COMMAND_COUNT;
+
+ session->nargs = 2;
+ session->argv = g_malloc (sizeof (gchar *) * 2);
+ session->argv[0] = g_strdup ("HLEN");
+ session->argv[1] = g_strdup (backend->redis_object);
+
+ up = rspamd_upstream_get (backend->read_servers,
+ RSPAMD_UPSTREAM_ROUND_ROBIN,
+ NULL,
+ 0);
+
+ session->up = up;
+ addr = rspamd_upstream_addr (up);
+ g_assert (addr != NULL);
+ session->ctx = rspamd_redis_pool_connect (backend->pool,
+ backend->dbname, backend->password,
+ rspamd_inet_address_to_string (addr),
+ rspamd_inet_address_get_port (addr));
+
+ if (session->ctx == NULL) {
+ rspamd_fuzzy_redis_session_dtor (session);
+ if (cb) {
+ cb (0, subr_ud);
+ }
+ }
+ else {
+ if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_count_callback,
+ session, session->nargs,
+ (const gchar **)session->argv, NULL) != REDIS_OK) {
+ rspamd_fuzzy_redis_session_dtor (session);
+
+ if (cb) {
+ cb (0, subr_ud);
+ }
+ }
+ else {
+ /* Add timeout */
+ event_set (&session->timeout, -1, EV_TIMEOUT, rspamd_fuzzy_redis_timeout,
+ session);
+ event_base_set (rspamd_fuzzy_backend_event_base (bk),
+ &session->timeout);
+ double_to_tv (backend->timeout, &tv);
+ event_add (&session->timeout, &tv);
+ }
+ }
}
+
void
rspamd_fuzzy_backend_version_redis (struct rspamd_fuzzy_backend *bk,
const gchar *src,
rspamd_fuzzy_version_cb cb, void *ud,
void *subr_ud)
{
+ struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+ g_assert (backend != NULL);
}
const gchar*
rspamd_fuzzy_backend_id_redis (struct rspamd_fuzzy_backend *bk,
void *subr_ud)
{
+ struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+ g_assert (backend != NULL);
+ return backend->id;
}
void
rspamd_fuzzy_backend_expire_redis (struct rspamd_fuzzy_backend *bk,
void *subr_ud)
{
+ struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+ g_assert (backend != NULL);
}
void
rspamd_fuzzy_backend_close_redis (struct rspamd_fuzzy_backend *bk,
void *subr_ud)
{
+ struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+
+ g_assert (backend != NULL);
+ REF_RELEASE (backend);
}