diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2020-02-10 21:03:29 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2020-02-10 21:12:13 +0000 |
commit | c607c36edda8eba5e073225ae923f80d5d22f9c6 (patch) | |
tree | 388eb673086a721aede6384cb4e935b058aae4de /src/libserver/fuzzy_backend/fuzzy_backend_redis.c | |
parent | 03a195e7b35f75b5f0ef81a5dc1580f2962cd5ab (diff) | |
download | rspamd-c607c36edda8eba5e073225ae923f80d5d22f9c6.tar.gz rspamd-c607c36edda8eba5e073225ae923f80d5d22f9c6.zip |
[Rework] Refactor libraries structure
* Move logger implementation to libserver
* Move fuzzy backend files to a separate subdir
TODO: Move HTTP code from libutil
Diffstat (limited to 'src/libserver/fuzzy_backend/fuzzy_backend_redis.c')
-rw-r--r-- | src/libserver/fuzzy_backend/fuzzy_backend_redis.c | 1564 |
1 files changed, 1564 insertions, 0 deletions
diff --git a/src/libserver/fuzzy_backend/fuzzy_backend_redis.c b/src/libserver/fuzzy_backend/fuzzy_backend_redis.c new file mode 100644 index 000000000..7070773b3 --- /dev/null +++ b/src/libserver/fuzzy_backend/fuzzy_backend_redis.c @@ -0,0 +1,1564 @@ +/*- + * Copyright 2016 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "config.h" +#include "ref.h" +#include "fuzzy_backend.h" +#include "fuzzy_backend_redis.h" +#include "redis_pool.h" +#include "cryptobox.h" +#include "str_util.h" +#include "upstream.h" +#include "contrib/hiredis/hiredis.h" +#include "contrib/hiredis/async.h" +#include "lua/lua_common.h" + +#define REDIS_DEFAULT_PORT 6379 +#define REDIS_DEFAULT_OBJECT "fuzzy" +#define REDIS_DEFAULT_TIMEOUT 2.0 + +#define msg_err_redis_session(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ + "fuzzy_redis", session->backend->id, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_warn_redis_session(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \ + "fuzzy_redis", session->backend->id, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_info_redis_session(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \ + "fuzzy_redis", session->backend->id, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_debug_redis_session(...) rspamd_conditional_debug_fast (NULL, NULL, \ + rspamd_fuzzy_redis_log_id, "fuzzy_redis", session->backend->id, \ + G_STRFUNC, \ + __VA_ARGS__) + +INIT_LOG_MODULE(fuzzy_redis) + +struct rspamd_fuzzy_backend_redis { + lua_State *L; + const gchar *redis_object; + const gchar *password; + const gchar *dbname; + gchar *id; + struct rspamd_redis_pool *pool; + gdouble timeout; + gint conf_ref; + ref_entry_t ref; +}; + +enum rspamd_fuzzy_redis_command { + RSPAMD_FUZZY_REDIS_COMMAND_COUNT, + RSPAMD_FUZZY_REDIS_COMMAND_VERSION, + RSPAMD_FUZZY_REDIS_COMMAND_UPDATES, + RSPAMD_FUZZY_REDIS_COMMAND_CHECK +}; + +struct rspamd_fuzzy_redis_session { + struct rspamd_fuzzy_backend_redis *backend; + redisAsyncContext *ctx; + ev_timer timeout; + const struct rspamd_fuzzy_cmd *cmd; + struct ev_loop *event_loop; + float prob; + gboolean shingles_checked; + + enum rspamd_fuzzy_redis_command command; + guint nargs; + + guint nadded; + guint ndeleted; + guint nextended; + guint nignored; + + union { + rspamd_fuzzy_check_cb cb_check; + rspamd_fuzzy_update_cb cb_update; + rspamd_fuzzy_version_cb cb_version; + rspamd_fuzzy_count_cb cb_count; + } callback; + void *cbdata; + + gchar **argv; + gsize *argv_lens; + struct upstream *up; + guchar found_digest[rspamd_cryptobox_HASHBYTES]; +}; + +static inline struct upstream_list * +rspamd_redis_get_servers (struct rspamd_fuzzy_backend_redis *ctx, + const gchar *what) +{ + lua_State *L = ctx->L; + struct upstream_list *res; + + lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->conf_ref); + lua_pushstring (L, what); + lua_gettable (L, -2); + res = *((struct upstream_list**)lua_touserdata (L, -1)); + lua_settop (L, 0); + + return res; +} + +static inline void +rspamd_fuzzy_redis_session_free_args (struct rspamd_fuzzy_redis_session *session) +{ + guint i; + + if (session->argv) { + for (i = 0; i < session->nargs; i ++) { + g_free (session->argv[i]); + } + + g_free (session->argv); + g_free (session->argv_lens); + } +} +static void +rspamd_fuzzy_redis_session_dtor (struct rspamd_fuzzy_redis_session *session, + gboolean is_fatal) +{ + redisAsyncContext *ac; + + + if (session->ctx) { + ac = session->ctx; + session->ctx = NULL; + rspamd_redis_pool_release_connection (session->backend->pool, + ac, + is_fatal ? RSPAMD_REDIS_RELEASE_FATAL : RSPAMD_REDIS_RELEASE_DEFAULT); + } + + ev_timer_stop (session->event_loop, &session->timeout); + rspamd_fuzzy_redis_session_free_args (session); + + REF_RELEASE (session->backend); + g_free (session); +} + +static void +rspamd_fuzzy_backend_redis_dtor (struct rspamd_fuzzy_backend_redis *backend) +{ + lua_State *L = backend->L; + + if (backend->conf_ref) { + luaL_unref (L, LUA_REGISTRYINDEX, backend->conf_ref); + } + + if (backend->id) { + g_free (backend->id); + } + + g_free (backend); +} + +void* +rspamd_fuzzy_backend_init_redis (struct rspamd_fuzzy_backend *bk, + const ucl_object_t *obj, struct rspamd_config *cfg, GError **err) +{ + struct rspamd_fuzzy_backend_redis *backend; + const ucl_object_t *elt; + gboolean ret = FALSE; + guchar id_hash[rspamd_cryptobox_HASHBYTES]; + rspamd_cryptobox_hash_state_t st; + lua_State *L = (lua_State *)cfg->lua_state; + gint conf_ref = -1; + + backend = g_malloc0 (sizeof (*backend)); + + backend->timeout = REDIS_DEFAULT_TIMEOUT; + backend->redis_object = REDIS_DEFAULT_OBJECT; + backend->L = L; + + ret = rspamd_lua_try_load_redis (L, obj, cfg, &conf_ref); + + /* Now try global redis settings */ + if (!ret) { + elt = ucl_object_lookup (cfg->rcl_obj, "redis"); + + if (elt) { + const ucl_object_t *specific_obj; + + specific_obj = ucl_object_lookup_any (elt, "fuzzy", "fuzzy_storage", + NULL); + + if (specific_obj) { + ret = rspamd_lua_try_load_redis (L, specific_obj, cfg, &conf_ref); + } + else { + ret = rspamd_lua_try_load_redis (L, elt, cfg, &conf_ref); + } + } + } + + if (!ret) { + msg_err_config ("cannot init redis backend for fuzzy storage"); + g_free (backend); + + return NULL; + } + + elt = ucl_object_lookup (obj, "prefix"); + if (elt == NULL || ucl_object_type (elt) != UCL_STRING) { + backend->redis_object = REDIS_DEFAULT_OBJECT; + } + else { + backend->redis_object = ucl_object_tostring (elt); + } + + backend->conf_ref = conf_ref; + + /* Check some common table values */ + lua_rawgeti (L, LUA_REGISTRYINDEX, conf_ref); + + lua_pushstring (L, "timeout"); + lua_gettable (L, -2); + if (lua_type (L, -1) == LUA_TNUMBER) { + backend->timeout = lua_tonumber (L, -1); + } + lua_pop (L, 1); + + lua_pushstring (L, "db"); + lua_gettable (L, -2); + if (lua_type (L, -1) == LUA_TSTRING) { + backend->dbname = rspamd_mempool_strdup (cfg->cfg_pool, + lua_tostring (L, -1)); + } + lua_pop (L, 1); + + lua_pushstring (L, "password"); + lua_gettable (L, -2); + if (lua_type (L, -1) == LUA_TSTRING) { + backend->password = rspamd_mempool_strdup (cfg->cfg_pool, + lua_tostring (L, -1)); + } + lua_pop (L, 1); + + lua_settop (L, 0); + + REF_INIT_RETAIN (backend, rspamd_fuzzy_backend_redis_dtor); + backend->pool = cfg->redis_pool; + rspamd_cryptobox_hash_init (&st, NULL, 0); + rspamd_cryptobox_hash_update (&st, backend->redis_object, + strlen (backend->redis_object)); + + if (backend->dbname) { + rspamd_cryptobox_hash_update (&st, backend->dbname, + strlen (backend->dbname)); + } + + if (backend->password) { + rspamd_cryptobox_hash_update (&st, backend->password, + strlen (backend->password)); + } + + rspamd_cryptobox_hash_final (&st, id_hash); + backend->id = rspamd_encode_base32 (id_hash, sizeof (id_hash)); + + return backend; +} + +static void +rspamd_fuzzy_redis_timeout (EV_P_ ev_timer *w, int revents) +{ + struct rspamd_fuzzy_redis_session *session = + (struct rspamd_fuzzy_redis_session *)w->data; + redisAsyncContext *ac; + static char errstr[128]; + + if (session->ctx) { + ac = session->ctx; + session->ctx = NULL; + ac->err = REDIS_ERR_IO; + /* Should be safe as in hiredis it is char[128] */ + rspamd_snprintf (errstr, sizeof (errstr), "%s", strerror (ETIMEDOUT)); + ac->errstr = errstr; + + /* This will cause session closing */ + rspamd_redis_pool_release_connection (session->backend->pool, + ac, RSPAMD_REDIS_RELEASE_FATAL); + } +} + +static void rspamd_fuzzy_redis_check_callback (redisAsyncContext *c, gpointer r, + gpointer priv); + +struct _rspamd_fuzzy_shingles_helper { + guchar digest[64]; + guint found; +}; + +static gint +rspamd_fuzzy_backend_redis_shingles_cmp (const void *a, const void *b) +{ + const struct _rspamd_fuzzy_shingles_helper *sha = a, + *shb = b; + + return memcmp (sha->digest, shb->digest, sizeof (sha->digest)); +} + +static void +rspamd_fuzzy_redis_shingles_callback (redisAsyncContext *c, gpointer r, + gpointer priv) +{ + struct rspamd_fuzzy_redis_session *session = priv; + redisReply *reply = r, *cur; + struct rspamd_fuzzy_reply rep; + GString *key; + struct _rspamd_fuzzy_shingles_helper *shingles, *prev = NULL, *sel = NULL; + guint i, found = 0, max_found = 0, cur_found = 0; + + ev_timer_stop (session->event_loop, &session->timeout); + memset (&rep, 0, sizeof (rep)); + + if (c->err == 0) { + rspamd_upstream_ok (session->up); + + if (reply->type == REDIS_REPLY_ARRAY && + reply->elements == RSPAMD_SHINGLE_SIZE) { + shingles = g_alloca (sizeof (struct _rspamd_fuzzy_shingles_helper) * + RSPAMD_SHINGLE_SIZE); + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) { + cur = reply->element[i]; + + if (cur->type == REDIS_REPLY_STRING) { + shingles[i].found = 1; + memcpy (shingles[i].digest, cur->str, MIN (64, cur->len)); + found ++; + } + else { + memset (shingles[i].digest, 0, sizeof (shingles[i].digest)); + shingles[i].found = 0; + } + } + + if (found > RSPAMD_SHINGLE_SIZE / 2) { + /* Now sort to find the most frequent element */ + qsort (shingles, RSPAMD_SHINGLE_SIZE, + sizeof (struct _rspamd_fuzzy_shingles_helper), + rspamd_fuzzy_backend_redis_shingles_cmp); + + prev = &shingles[0]; + + for (i = 1; i < RSPAMD_SHINGLE_SIZE; i ++) { + if (!shingles[i].found) { + continue; + } + + if (memcmp (shingles[i].digest, prev->digest, 64) == 0) { + cur_found ++; + + if (cur_found > max_found) { + max_found = cur_found; + sel = &shingles[i]; + } + } + else { + cur_found = 1; + prev = &shingles[i]; + } + } + + if (max_found > RSPAMD_SHINGLE_SIZE / 2) { + session->prob = ((float)max_found) / RSPAMD_SHINGLE_SIZE; + rep.v1.prob = session->prob; + + g_assert (sel != NULL); + + /* Prepare new check command */ + rspamd_fuzzy_redis_session_free_args (session); + session->nargs = 5; + session->argv = g_malloc (sizeof (gchar *) * session->nargs); + session->argv_lens = g_malloc (sizeof (gsize) * session->nargs); + + key = g_string_new (session->backend->redis_object); + g_string_append_len (key, sel->digest, sizeof (sel->digest)); + session->argv[0] = g_strdup ("HMGET"); + session->argv_lens[0] = 5; + session->argv[1] = key->str; + session->argv_lens[1] = key->len; + session->argv[2] = g_strdup ("V"); + session->argv_lens[2] = 1; + session->argv[3] = g_strdup ("F"); + session->argv_lens[3] = 1; + session->argv[4] = g_strdup ("C"); + session->argv_lens[4] = 1; + g_string_free (key, FALSE); /* Do not free underlying array */ + memcpy (session->found_digest, sel->digest, + sizeof (session->cmd->digest)); + + g_assert (session->ctx != NULL); + if (redisAsyncCommandArgv (session->ctx, + rspamd_fuzzy_redis_check_callback, + session, session->nargs, + (const gchar **)session->argv, + session->argv_lens) != REDIS_OK) { + + if (session->callback.cb_check) { + memset (&rep, 0, sizeof (rep)); + session->callback.cb_check (&rep, session->cbdata); + } + + rspamd_fuzzy_redis_session_dtor (session, TRUE); + } + else { + /* Add timeout */ + session->timeout.data = session; + ev_timer_init (&session->timeout, + rspamd_fuzzy_redis_timeout, + session->backend->timeout, 0.0); + ev_timer_start (session->event_loop, &session->timeout); + } + + return; + } + } + } + + if (session->callback.cb_check) { + session->callback.cb_check (&rep, session->cbdata); + } + } + else { + if (session->callback.cb_check) { + session->callback.cb_check (&rep, session->cbdata); + } + + if (c->errstr) { + msg_err_redis_session ("error getting shingles: %s", c->errstr); + } + + rspamd_upstream_fail (session->up, FALSE, strerror (errno)); + } + + rspamd_fuzzy_redis_session_dtor (session, FALSE); +} + +static void +rspamd_fuzzy_backend_check_shingles (struct rspamd_fuzzy_redis_session *session) +{ + struct rspamd_fuzzy_reply rep; + const struct rspamd_fuzzy_shingle_cmd *shcmd; + GString *key; + guint i, init_len; + + rspamd_fuzzy_redis_session_free_args (session); + /* First of all check digest */ + session->nargs = RSPAMD_SHINGLE_SIZE + 1; + session->argv = g_malloc (sizeof (gchar *) * session->nargs); + session->argv_lens = g_malloc (sizeof (gsize) * session->nargs); + shcmd = (const struct rspamd_fuzzy_shingle_cmd *)session->cmd; + + session->argv[0] = g_strdup ("MGET"); + session->argv_lens[0] = 4; + init_len = strlen (session->backend->redis_object); + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) { + + key = g_string_sized_new (init_len + 2 + 2 + sizeof ("18446744073709551616")); + rspamd_printf_gstring (key, "%s_%d_%uL", session->backend->redis_object, + i, shcmd->sgl.hashes[i]); + session->argv[i + 1] = key->str; + session->argv_lens[i + 1] = key->len; + g_string_free (key, FALSE); /* Do not free underlying array */ + } + + session->shingles_checked = TRUE; + + g_assert (session->ctx != NULL); + + if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_shingles_callback, + session, session->nargs, + (const gchar **)session->argv, session->argv_lens) != REDIS_OK) { + msg_err ("cannot execute redis command: %s", session->ctx->errstr); + + if (session->callback.cb_check) { + memset (&rep, 0, sizeof (rep)); + session->callback.cb_check (&rep, session->cbdata); + } + + rspamd_fuzzy_redis_session_dtor (session, TRUE); + } + else { + /* Add timeout */ + session->timeout.data = session; + ev_timer_init (&session->timeout, + rspamd_fuzzy_redis_timeout, + session->backend->timeout, 0.0); + ev_timer_start (session->event_loop, &session->timeout); + } +} + +static void +rspamd_fuzzy_redis_check_callback (redisAsyncContext *c, gpointer r, + gpointer priv) +{ + struct rspamd_fuzzy_redis_session *session = priv; + redisReply *reply = r, *cur; + struct rspamd_fuzzy_reply rep; + gulong value; + guint found_elts = 0; + + ev_timer_stop (session->event_loop, &session->timeout); + memset (&rep, 0, sizeof (rep)); + + if (c->err == 0) { + rspamd_upstream_ok (session->up); + + if (reply->type == REDIS_REPLY_ARRAY && reply->elements >= 2) { + cur = reply->element[0]; + + if (cur->type == REDIS_REPLY_STRING) { + value = strtoul (cur->str, NULL, 10); + rep.v1.value = value; + found_elts ++; + } + + cur = reply->element[1]; + + if (cur->type == REDIS_REPLY_STRING) { + value = strtoul (cur->str, NULL, 10); + rep.v1.flag = value; + found_elts ++; + } + + if (found_elts >= 2) { + rep.v1.prob = session->prob; + memcpy (rep.digest, session->found_digest, sizeof (rep.digest)); + } + + rep.ts = 0; + + if (reply->elements > 2) { + cur = reply->element[2]; + + if (cur->type == REDIS_REPLY_STRING) { + rep.ts = strtoul (cur->str, NULL, 10); + } + } + } + + if (found_elts != 2) { + if (session->cmd->shingles_count > 0 && !session->shingles_checked) { + /* We also need to check all shingles here */ + rspamd_fuzzy_backend_check_shingles (session); + /* Do not free session */ + return; + } + else { + if (session->callback.cb_check) { + session->callback.cb_check (&rep, session->cbdata); + } + } + } + else { + if (session->callback.cb_check) { + session->callback.cb_check (&rep, session->cbdata); + } + } + } + else { + if (session->callback.cb_check) { + session->callback.cb_check (&rep, session->cbdata); + } + + if (c->errstr) { + msg_err_redis_session ("error getting hashes: %s", c->errstr); + } + + rspamd_upstream_fail (session->up, FALSE, strerror (errno)); + } + + rspamd_fuzzy_redis_session_dtor (session, FALSE); +} + +void +rspamd_fuzzy_backend_check_redis (struct rspamd_fuzzy_backend *bk, + const struct rspamd_fuzzy_cmd *cmd, + rspamd_fuzzy_check_cb cb, void *ud, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_redis *backend = subr_ud; + struct rspamd_fuzzy_redis_session *session; + struct upstream *up; + struct upstream_list *ups; + rspamd_inet_addr_t *addr; + struct rspamd_fuzzy_reply rep; + GString *key; + + g_assert (backend != NULL); + + session = g_malloc0 (sizeof (*session)); + session->backend = backend; + REF_RETAIN (session->backend); + + session->callback.cb_check = cb; + session->cbdata = ud; + session->command = RSPAMD_FUZZY_REDIS_COMMAND_CHECK; + session->cmd = cmd; + session->prob = 1.0; + memcpy (rep.digest, session->cmd->digest, sizeof (rep.digest)); + memcpy (session->found_digest, session->cmd->digest, sizeof (rep.digest)); + session->event_loop = rspamd_fuzzy_backend_event_base (bk); + + /* First of all check digest */ + session->nargs = 5; + session->argv = g_malloc (sizeof (gchar *) * session->nargs); + session->argv_lens = g_malloc (sizeof (gsize) * session->nargs); + + key = g_string_new (backend->redis_object); + g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); + session->argv[0] = g_strdup ("HMGET"); + session->argv_lens[0] = 5; + session->argv[1] = key->str; + session->argv_lens[1] = key->len; + session->argv[2] = g_strdup ("V"); + session->argv_lens[2] = 1; + session->argv[3] = g_strdup ("F"); + session->argv_lens[3] = 1; + session->argv[4] = g_strdup ("C"); + session->argv_lens[4] = 1; + g_string_free (key, FALSE); /* Do not free underlying array */ + + ups = rspamd_redis_get_servers (backend, "read_servers"); + up = rspamd_upstream_get (ups, + RSPAMD_UPSTREAM_ROUND_ROBIN, + NULL, + 0); + + session->up = up; + addr = rspamd_upstream_addr_next (up); + g_assert (addr != NULL); + session->ctx = rspamd_redis_pool_connect (backend->pool, + backend->dbname, backend->password, + rspamd_inet_address_to_string (addr), + rspamd_inet_address_get_port (addr)); + + if (session->ctx == NULL) { + rspamd_upstream_fail (up, TRUE, strerror (errno)); + rspamd_fuzzy_redis_session_dtor (session, TRUE); + + if (cb) { + memset (&rep, 0, sizeof (rep)); + cb (&rep, ud); + } + } + else { + if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_check_callback, + session, session->nargs, + (const gchar **)session->argv, session->argv_lens) != REDIS_OK) { + rspamd_fuzzy_redis_session_dtor (session, TRUE); + + if (cb) { + memset (&rep, 0, sizeof (rep)); + cb (&rep, ud); + } + } + else { + /* Add timeout */ + session->timeout.data = session; + ev_timer_init (&session->timeout, + rspamd_fuzzy_redis_timeout, + session->backend->timeout, 0.0); + ev_timer_start (session->event_loop, &session->timeout); + } + } +} + +static void +rspamd_fuzzy_redis_count_callback (redisAsyncContext *c, gpointer r, + gpointer priv) +{ + struct rspamd_fuzzy_redis_session *session = priv; + redisReply *reply = r; + gulong nelts; + + ev_timer_stop (session->event_loop, &session->timeout); + + if (c->err == 0) { + rspamd_upstream_ok (session->up); + + if (reply->type == REDIS_REPLY_INTEGER) { + if (session->callback.cb_count) { + session->callback.cb_count (reply->integer, session->cbdata); + } + } + else if (reply->type == REDIS_REPLY_STRING) { + nelts = strtoul (reply->str, NULL, 10); + + if (session->callback.cb_count) { + session->callback.cb_count (nelts, session->cbdata); + } + } + else { + if (session->callback.cb_count) { + session->callback.cb_count (0, session->cbdata); + } + } + } + else { + if (session->callback.cb_count) { + session->callback.cb_count (0, session->cbdata); + } + + if (c->errstr) { + msg_err_redis_session ("error getting count: %s", c->errstr); + } + + rspamd_upstream_fail (session->up, FALSE, strerror (errno)); + } + + rspamd_fuzzy_redis_session_dtor (session, FALSE); +} + +void +rspamd_fuzzy_backend_count_redis (struct rspamd_fuzzy_backend *bk, + rspamd_fuzzy_count_cb cb, void *ud, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_redis *backend = subr_ud; + struct rspamd_fuzzy_redis_session *session; + struct upstream *up; + struct upstream_list *ups; + rspamd_inet_addr_t *addr; + GString *key; + + g_assert (backend != NULL); + + session = g_malloc0 (sizeof (*session)); + session->backend = backend; + REF_RETAIN (session->backend); + + session->callback.cb_count = cb; + session->cbdata = ud; + session->command = RSPAMD_FUZZY_REDIS_COMMAND_COUNT; + session->event_loop = rspamd_fuzzy_backend_event_base (bk); + + session->nargs = 2; + session->argv = g_malloc (sizeof (gchar *) * 2); + session->argv_lens = g_malloc (sizeof (gsize) * 2); + key = g_string_new (backend->redis_object); + g_string_append (key, "_count"); + session->argv[0] = g_strdup ("GET"); + session->argv_lens[0] = 3; + session->argv[1] = key->str; + session->argv_lens[1] = key->len; + g_string_free (key, FALSE); /* Do not free underlying array */ + + ups = rspamd_redis_get_servers (backend, "read_servers"); + up = rspamd_upstream_get (ups, + RSPAMD_UPSTREAM_ROUND_ROBIN, + NULL, + 0); + + session->up = up; + addr = rspamd_upstream_addr_next (up); + g_assert (addr != NULL); + session->ctx = rspamd_redis_pool_connect (backend->pool, + backend->dbname, backend->password, + rspamd_inet_address_to_string (addr), + rspamd_inet_address_get_port (addr)); + + if (session->ctx == NULL) { + rspamd_upstream_fail (up, TRUE, strerror (errno)); + rspamd_fuzzy_redis_session_dtor (session, TRUE); + + if (cb) { + cb (0, ud); + } + } + else { + if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_count_callback, + session, session->nargs, + (const gchar **)session->argv, session->argv_lens) != REDIS_OK) { + rspamd_fuzzy_redis_session_dtor (session, TRUE); + + if (cb) { + cb (0, ud); + } + } + else { + /* Add timeout */ + session->timeout.data = session; + ev_timer_init (&session->timeout, + rspamd_fuzzy_redis_timeout, + session->backend->timeout, 0.0); + ev_timer_start (session->event_loop, &session->timeout); + } + } +} + +static void +rspamd_fuzzy_redis_version_callback (redisAsyncContext *c, gpointer r, + gpointer priv) +{ + struct rspamd_fuzzy_redis_session *session = priv; + redisReply *reply = r; + gulong nelts; + + ev_timer_stop (session->event_loop, &session->timeout); + + if (c->err == 0) { + rspamd_upstream_ok (session->up); + + if (reply->type == REDIS_REPLY_INTEGER) { + if (session->callback.cb_version) { + session->callback.cb_version (reply->integer, session->cbdata); + } + } + else if (reply->type == REDIS_REPLY_STRING) { + nelts = strtoul (reply->str, NULL, 10); + + if (session->callback.cb_version) { + session->callback.cb_version (nelts, session->cbdata); + } + } + else { + if (session->callback.cb_version) { + session->callback.cb_version (0, session->cbdata); + } + } + } + else { + if (session->callback.cb_version) { + session->callback.cb_version (0, session->cbdata); + } + + if (c->errstr) { + msg_err_redis_session ("error getting version: %s", c->errstr); + } + + rspamd_upstream_fail (session->up, FALSE, strerror (errno)); + } + + rspamd_fuzzy_redis_session_dtor (session, FALSE); +} + +void +rspamd_fuzzy_backend_version_redis (struct rspamd_fuzzy_backend *bk, + const gchar *src, + rspamd_fuzzy_version_cb cb, void *ud, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_redis *backend = subr_ud; + struct rspamd_fuzzy_redis_session *session; + struct upstream *up; + struct upstream_list *ups; + rspamd_inet_addr_t *addr; + GString *key; + + g_assert (backend != NULL); + + session = g_malloc0 (sizeof (*session)); + session->backend = backend; + REF_RETAIN (session->backend); + + session->callback.cb_version = cb; + session->cbdata = ud; + session->command = RSPAMD_FUZZY_REDIS_COMMAND_VERSION; + session->event_loop = rspamd_fuzzy_backend_event_base (bk); + + session->nargs = 2; + session->argv = g_malloc (sizeof (gchar *) * 2); + session->argv_lens = g_malloc (sizeof (gsize) * 2); + key = g_string_new (backend->redis_object); + g_string_append (key, src); + session->argv[0] = g_strdup ("GET"); + session->argv_lens[0] = 3; + session->argv[1] = key->str; + session->argv_lens[1] = key->len; + g_string_free (key, FALSE); /* Do not free underlying array */ + + ups = rspamd_redis_get_servers (backend, "read_servers"); + up = rspamd_upstream_get (ups, + RSPAMD_UPSTREAM_ROUND_ROBIN, + NULL, + 0); + + session->up = up; + addr = rspamd_upstream_addr_next (up); + g_assert (addr != NULL); + session->ctx = rspamd_redis_pool_connect (backend->pool, + backend->dbname, backend->password, + rspamd_inet_address_to_string (addr), + rspamd_inet_address_get_port (addr)); + + if (session->ctx == NULL) { + rspamd_upstream_fail (up, FALSE, strerror (errno)); + rspamd_fuzzy_redis_session_dtor (session, TRUE); + + if (cb) { + cb (0, ud); + } + } + else { + if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_version_callback, + session, session->nargs, + (const gchar **)session->argv, session->argv_lens) != REDIS_OK) { + rspamd_fuzzy_redis_session_dtor (session, TRUE); + + if (cb) { + cb (0, ud); + } + } + else { + /* Add timeout */ + session->timeout.data = session; + ev_timer_init (&session->timeout, + rspamd_fuzzy_redis_timeout, + session->backend->timeout, 0.0); + ev_timer_start (session->event_loop, &session->timeout); + } + } +} + +const gchar* +rspamd_fuzzy_backend_id_redis (struct rspamd_fuzzy_backend *bk, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_redis *backend = subr_ud; + g_assert (backend != NULL); + + return backend->id; +} + +void +rspamd_fuzzy_backend_expire_redis (struct rspamd_fuzzy_backend *bk, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_redis *backend = subr_ud; + + g_assert (backend != NULL); +} + +static gboolean +rspamd_fuzzy_update_append_command (struct rspamd_fuzzy_backend *bk, + struct rspamd_fuzzy_redis_session *session, + struct fuzzy_peer_cmd *io_cmd, guint *shift) +{ + GString *key, *value; + guint cur_shift = *shift; + guint i, klen; + struct rspamd_fuzzy_cmd *cmd; + + if (io_cmd->is_shingle) { + cmd = &io_cmd->cmd.shingle.basic; + } + else { + cmd = &io_cmd->cmd.normal; + + } + + if (cmd->cmd == FUZZY_WRITE) { + /* + * For each normal hash addition we do 5 redis commands: + * HSET <key> F <flag> + * HSETNX <key> C <time> + * HINCRBY <key> V <weight> + * EXPIRE <key> <expire> + * Where <key> is <prefix> || <digest> + */ + + /* HSET */ + klen = strlen (session->backend->redis_object) + + sizeof (cmd->digest) + 1; + key = g_string_sized_new (klen); + g_string_append (key, session->backend->redis_object); + g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); + value = g_string_sized_new (sizeof ("4294967296")); + rspamd_printf_gstring (value, "%d", cmd->flag); + session->argv[cur_shift] = g_strdup ("HSET"); + session->argv_lens[cur_shift++] = sizeof ("HSET") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + session->argv[cur_shift] = g_strdup ("F"); + session->argv_lens[cur_shift++] = sizeof ("F") - 1; + session->argv[cur_shift] = value->str; + session->argv_lens[cur_shift++] = value->len; + g_string_free (key, FALSE); + g_string_free (value, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 4, + (const gchar **)&session->argv[cur_shift - 4], + &session->argv_lens[cur_shift - 4]) != REDIS_OK) { + + return FALSE; + } + + /* HSETNX */ + klen = strlen (session->backend->redis_object) + + sizeof (cmd->digest) + 1; + key = g_string_sized_new (klen); + g_string_append (key, session->backend->redis_object); + g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); + value = g_string_sized_new (sizeof ("18446744073709551616")); + rspamd_printf_gstring (value, "%L", (gint64)rspamd_get_calendar_ticks ()); + session->argv[cur_shift] = g_strdup ("HSETNX"); + session->argv_lens[cur_shift++] = sizeof ("HSETNX") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + session->argv[cur_shift] = g_strdup ("C"); + session->argv_lens[cur_shift++] = sizeof ("C") - 1; + session->argv[cur_shift] = value->str; + session->argv_lens[cur_shift++] = value->len; + g_string_free (key, FALSE); + g_string_free (value, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 4, + (const gchar **)&session->argv[cur_shift - 4], + &session->argv_lens[cur_shift - 4]) != REDIS_OK) { + + return FALSE; + } + + /* HINCRBY */ + key = g_string_sized_new (klen); + g_string_append (key, session->backend->redis_object); + g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); + value = g_string_sized_new (sizeof ("4294967296")); + rspamd_printf_gstring (value, "%d", cmd->value); + session->argv[cur_shift] = g_strdup ("HINCRBY"); + session->argv_lens[cur_shift++] = sizeof ("HINCRBY") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + session->argv[cur_shift] = g_strdup ("V"); + session->argv_lens[cur_shift++] = sizeof ("V") - 1; + session->argv[cur_shift] = value->str; + session->argv_lens[cur_shift++] = value->len; + g_string_free (key, FALSE); + g_string_free (value, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 4, + (const gchar **)&session->argv[cur_shift - 4], + &session->argv_lens[cur_shift - 4]) != REDIS_OK) { + + return FALSE; + } + + /* EXPIRE */ + key = g_string_sized_new (klen); + g_string_append (key, session->backend->redis_object); + g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); + value = g_string_sized_new (sizeof ("4294967296")); + rspamd_printf_gstring (value, "%d", + (gint)rspamd_fuzzy_backend_get_expire (bk)); + session->argv[cur_shift] = g_strdup ("EXPIRE"); + session->argv_lens[cur_shift++] = sizeof ("EXPIRE") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + session->argv[cur_shift] = value->str; + session->argv_lens[cur_shift++] = value->len; + g_string_free (key, FALSE); + g_string_free (value, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 3, + (const gchar **)&session->argv[cur_shift - 3], + &session->argv_lens[cur_shift - 3]) != REDIS_OK) { + + return FALSE; + } + + /* INCR */ + key = g_string_sized_new (klen); + g_string_append (key, session->backend->redis_object); + g_string_append (key, "_count"); + session->argv[cur_shift] = g_strdup ("INCR"); + session->argv_lens[cur_shift++] = sizeof ("INCR") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + g_string_free (key, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 2, + (const gchar **)&session->argv[cur_shift - 2], + &session->argv_lens[cur_shift - 2]) != REDIS_OK) { + + return FALSE; + } + } + else if (cmd->cmd == FUZZY_DEL) { + /* DEL */ + klen = strlen (session->backend->redis_object) + + sizeof (cmd->digest) + 1; + + key = g_string_sized_new (klen); + g_string_append (key, session->backend->redis_object); + g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); + session->argv[cur_shift] = g_strdup ("DEL"); + session->argv_lens[cur_shift++] = sizeof ("DEL") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + g_string_free (key, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 2, + (const gchar **)&session->argv[cur_shift - 2], + &session->argv_lens[cur_shift - 2]) != REDIS_OK) { + + return FALSE; + } + + /* DECR */ + key = g_string_sized_new (klen); + g_string_append (key, session->backend->redis_object); + g_string_append (key, "_count"); + session->argv[cur_shift] = g_strdup ("DECR"); + session->argv_lens[cur_shift++] = sizeof ("DECR") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + g_string_free (key, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 2, + (const gchar **)&session->argv[cur_shift - 2], + &session->argv_lens[cur_shift - 2]) != REDIS_OK) { + + return FALSE; + } + } + else if (cmd->cmd == FUZZY_REFRESH) { + /* + * Issue refresh command by just EXPIRE command + * EXPIRE <key> <expire> + * Where <key> is <prefix> || <digest> + */ + + klen = strlen (session->backend->redis_object) + + sizeof (cmd->digest) + 1; + + /* EXPIRE */ + key = g_string_sized_new (klen); + g_string_append (key, session->backend->redis_object); + g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); + value = g_string_sized_new (sizeof ("4294967296")); + rspamd_printf_gstring (value, "%d", + (gint)rspamd_fuzzy_backend_get_expire (bk)); + session->argv[cur_shift] = g_strdup ("EXPIRE"); + session->argv_lens[cur_shift++] = sizeof ("EXPIRE") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + session->argv[cur_shift] = value->str; + session->argv_lens[cur_shift++] = value->len; + g_string_free (key, FALSE); + g_string_free (value, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 3, + (const gchar **)&session->argv[cur_shift - 3], + &session->argv_lens[cur_shift - 3]) != REDIS_OK) { + + return FALSE; + } + } + else if (cmd->cmd == FUZZY_DUP) { + /* Ignore */ + } + else { + g_assert_not_reached (); + } + + if (io_cmd->is_shingle) { + if (cmd->cmd == FUZZY_WRITE) { + klen = strlen (session->backend->redis_object) + + 64 + 1; + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) { + guchar *hval; + /* + * For each command with shingles we additionally emit 32 commands: + * SETEX <prefix>_<number>_<value> <expire> <digest> + */ + + /* SETEX */ + key = g_string_sized_new (klen); + rspamd_printf_gstring (key, "%s_%d_%uL", + session->backend->redis_object, + i, + io_cmd->cmd.shingle.sgl.hashes[i]); + value = g_string_sized_new (sizeof ("4294967296")); + rspamd_printf_gstring (value, "%d", + (gint)rspamd_fuzzy_backend_get_expire (bk)); + hval = g_malloc (sizeof (io_cmd->cmd.shingle.basic.digest)); + memcpy (hval, io_cmd->cmd.shingle.basic.digest, + sizeof (io_cmd->cmd.shingle.basic.digest)); + session->argv[cur_shift] = g_strdup ("SETEX"); + session->argv_lens[cur_shift++] = sizeof ("SETEX") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + session->argv[cur_shift] = value->str; + session->argv_lens[cur_shift++] = value->len; + session->argv[cur_shift] = hval; + session->argv_lens[cur_shift++] = sizeof (io_cmd->cmd.shingle.basic.digest); + g_string_free (key, FALSE); + g_string_free (value, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 4, + (const gchar **)&session->argv[cur_shift - 4], + &session->argv_lens[cur_shift - 4]) != REDIS_OK) { + + return FALSE; + } + } + } + else if (cmd->cmd == FUZZY_DEL) { + klen = strlen (session->backend->redis_object) + + 64 + 1; + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) { + key = g_string_sized_new (klen); + rspamd_printf_gstring (key, "%s_%d_%uL", + session->backend->redis_object, + i, + io_cmd->cmd.shingle.sgl.hashes[i]); + session->argv[cur_shift] = g_strdup ("DEL"); + session->argv_lens[cur_shift++] = sizeof ("DEL") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + g_string_free (key, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 2, + (const gchar **)&session->argv[cur_shift - 2], + &session->argv_lens[cur_shift - 2]) != REDIS_OK) { + + return FALSE; + } + } + } + else if (cmd->cmd == FUZZY_REFRESH) { + klen = strlen (session->backend->redis_object) + + 64 + 1; + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) { + /* + * For each command with shingles we additionally emit 32 commands: + * EXPIRE <prefix>_<number>_<value> <expire> + */ + + /* Expire */ + key = g_string_sized_new (klen); + rspamd_printf_gstring (key, "%s_%d_%uL", + session->backend->redis_object, + i, + io_cmd->cmd.shingle.sgl.hashes[i]); + value = g_string_sized_new (sizeof ("18446744073709551616")); + rspamd_printf_gstring (value, "%d", + (gint)rspamd_fuzzy_backend_get_expire (bk)); + session->argv[cur_shift] = g_strdup ("EXPIRE"); + session->argv_lens[cur_shift++] = sizeof ("EXPIRE") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + session->argv[cur_shift] = value->str; + session->argv_lens[cur_shift++] = value->len; + g_string_free (key, FALSE); + g_string_free (value, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 3, + (const gchar **)&session->argv[cur_shift - 3], + &session->argv_lens[cur_shift - 3]) != REDIS_OK) { + + return FALSE; + } + } + } + else if (cmd->cmd == FUZZY_DUP) { + /* Ignore */ + } + else { + g_assert_not_reached (); + } + } + + *shift = cur_shift; + + return TRUE; +} + +static void +rspamd_fuzzy_redis_update_callback (redisAsyncContext *c, gpointer r, + gpointer priv) +{ + struct rspamd_fuzzy_redis_session *session = priv; + redisReply *reply = r; + + ev_timer_stop (session->event_loop, &session->timeout); + + if (c->err == 0) { + rspamd_upstream_ok (session->up); + + if (reply->type == REDIS_REPLY_ARRAY) { + /* TODO: check all replies somehow */ + if (session->callback.cb_update) { + session->callback.cb_update (TRUE, + session->nadded, + session->ndeleted, + session->nextended, + session->nignored, + session->cbdata); + } + } + else { + if (session->callback.cb_update) { + session->callback.cb_update (FALSE, 0, 0, 0, 0, session->cbdata); + } + } + } + else { + if (session->callback.cb_update) { + session->callback.cb_update (FALSE, 0, 0, 0, 0, session->cbdata); + } + + if (c->errstr) { + msg_err_redis_session ("error sending update to redis: %s", c->errstr); + } + + rspamd_upstream_fail (session->up, FALSE, strerror (errno)); + } + + rspamd_fuzzy_redis_session_dtor (session, FALSE); +} + +void +rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk, + GArray *updates, const gchar *src, + rspamd_fuzzy_update_cb cb, void *ud, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_redis *backend = subr_ud; + struct rspamd_fuzzy_redis_session *session; + struct upstream *up; + struct upstream_list *ups; + rspamd_inet_addr_t *addr; + guint i; + GString *key; + struct fuzzy_peer_cmd *io_cmd; + struct rspamd_fuzzy_cmd *cmd = NULL; + guint nargs, ncommands, cur_shift; + + g_assert (backend != NULL); + + session = g_malloc0 (sizeof (*session)); + session->backend = backend; + REF_RETAIN (session->backend); + + /* + * For each normal hash addition we do 3 redis commands: + * HSET <key> F <flag> + * HINCRBY <key> V <weight> + * EXPIRE <key> <expire> + * INCR <prefix||fuzzy_count> + * + * Where <key> is <prefix> || <digest> + * + * For each command with shingles we additionally emit 32 commands: + * SETEX <prefix>_<number>_<value> <expire> <digest> + * + * For each delete command we emit: + * DEL <key> + * + * For each delete command with shingles we emit also 32 commands: + * DEL <prefix>_<number>_<value> + * DECR <prefix||fuzzy_count> + */ + + ncommands = 3; /* For MULTI + EXEC + INCR <src> */ + nargs = 4; + + for (i = 0; i < updates->len; i ++) { + io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i); + + if (io_cmd->is_shingle) { + cmd = &io_cmd->cmd.shingle.basic; + } + else { + cmd = &io_cmd->cmd.normal; + } + + if (cmd->cmd == FUZZY_WRITE) { + ncommands += 5; + nargs += 17; + session->nadded ++; + + if (io_cmd->is_shingle) { + ncommands += RSPAMD_SHINGLE_SIZE; + nargs += RSPAMD_SHINGLE_SIZE * 4; + } + + } + else if (cmd->cmd == FUZZY_DEL) { + ncommands += 2; + nargs += 4; + session->ndeleted ++; + + if (io_cmd->is_shingle) { + ncommands += RSPAMD_SHINGLE_SIZE; + nargs += RSPAMD_SHINGLE_SIZE * 2; + } + } + else if (cmd->cmd == FUZZY_REFRESH) { + ncommands += 1; + nargs += 3; + session->nextended ++; + + if (io_cmd->is_shingle) { + ncommands += RSPAMD_SHINGLE_SIZE; + nargs += RSPAMD_SHINGLE_SIZE * 3; + } + } + else { + session->nignored ++; + } + } + + /* Now we need to create a new request */ + session->callback.cb_update = cb; + session->cbdata = ud; + session->command = RSPAMD_FUZZY_REDIS_COMMAND_UPDATES; + session->cmd = cmd; + session->prob = 1.0; + session->event_loop = rspamd_fuzzy_backend_event_base (bk); + + /* First of all check digest */ + session->nargs = nargs; + session->argv = g_malloc0 (sizeof (gchar *) * session->nargs); + session->argv_lens = g_malloc0 (sizeof (gsize) * session->nargs); + + ups = rspamd_redis_get_servers (backend, "write_servers"); + up = rspamd_upstream_get (ups, + RSPAMD_UPSTREAM_MASTER_SLAVE, + NULL, + 0); + + session->up = up; + addr = rspamd_upstream_addr_next (up); + g_assert (addr != NULL); + session->ctx = rspamd_redis_pool_connect (backend->pool, + backend->dbname, backend->password, + rspamd_inet_address_to_string (addr), + rspamd_inet_address_get_port (addr)); + + if (session->ctx == NULL) { + rspamd_upstream_fail (up, TRUE, strerror (errno)); + rspamd_fuzzy_redis_session_dtor (session, TRUE); + + if (cb) { + cb (FALSE, 0, 0, 0, 0, ud); + } + } + else { + /* Start with MULTI command */ + session->argv[0] = g_strdup ("MULTI"); + session->argv_lens[0] = 5; + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 1, + (const gchar **)session->argv, + session->argv_lens) != REDIS_OK) { + + if (cb) { + cb (FALSE, 0, 0, 0, 0, ud); + } + rspamd_fuzzy_redis_session_dtor (session, TRUE); + + return; + } + + /* Now split the rest of commands in packs and emit them command by command */ + cur_shift = 1; + + for (i = 0; i < updates->len; i ++) { + io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i); + + if (!rspamd_fuzzy_update_append_command (bk, session, io_cmd, + &cur_shift)) { + if (cb) { + cb (FALSE, 0, 0, 0, 0, ud); + } + rspamd_fuzzy_redis_session_dtor (session, TRUE); + + return; + } + } + + /* Now INCR command for the source */ + key = g_string_new (backend->redis_object); + g_string_append (key, src); + session->argv[cur_shift] = g_strdup ("INCR"); + session->argv_lens[cur_shift ++] = 4; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift ++] = key->len; + g_string_free (key, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 2, + (const gchar **)&session->argv[cur_shift - 2], + &session->argv_lens[cur_shift - 2]) != REDIS_OK) { + + if (cb) { + cb (FALSE, 0, 0, 0, 0, ud); + } + rspamd_fuzzy_redis_session_dtor (session, TRUE); + + return; + } + + /* Finally we call EXEC with a specific callback */ + session->argv[cur_shift] = g_strdup ("EXEC"); + session->argv_lens[cur_shift] = 4; + + if (redisAsyncCommandArgv (session->ctx, + rspamd_fuzzy_redis_update_callback, session, + 1, + (const gchar **)&session->argv[cur_shift], + &session->argv_lens[cur_shift]) != REDIS_OK) { + + if (cb) { + cb (FALSE, 0, 0, 0, 0, ud); + } + rspamd_fuzzy_redis_session_dtor (session, TRUE); + + return; + } + else { + /* Add timeout */ + session->timeout.data = session; + ev_timer_init (&session->timeout, + rspamd_fuzzy_redis_timeout, + session->backend->timeout, 0.0); + ev_timer_start (session->event_loop, &session->timeout); + } + } +} + +void +rspamd_fuzzy_backend_close_redis (struct rspamd_fuzzy_backend *bk, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_redis *backend = subr_ud; + + g_assert (backend != NULL); + + REF_RELEASE (backend); +} |