From 55252c359cf6a20031ddfdab1bad31eb284dcd34 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 5 Sep 2016 13:50:39 +0100 Subject: [PATCH] [Minor] Implement checks in redis fuzzy backend --- src/libserver/fuzzy_backend_redis.c | 392 +++++++++++++++++++++++++++- 1 file changed, 379 insertions(+), 13 deletions(-) diff --git a/src/libserver/fuzzy_backend_redis.c b/src/libserver/fuzzy_backend_redis.c index c5961a69c..007b75856 100644 --- a/src/libserver/fuzzy_backend_redis.c +++ b/src/libserver/fuzzy_backend_redis.c @@ -45,6 +45,11 @@ struct rspamd_fuzzy_redis_session { struct rspamd_fuzzy_backend_redis *backend; redisAsyncContext *ctx; struct event timeout; + const struct rspamd_fuzzy_cmd *cmd; + struct event_base *ev_base; + float prob; + gboolean shingles_checked; + enum { RSPAMD_FUZZY_REDIS_COMMAND_COUNT, RSPAMD_FUZZY_REDIS_COMMAND_VERSION, @@ -62,14 +67,29 @@ struct rspamd_fuzzy_redis_session { void *cbdata; gchar **argv; + gsize *argv_lens; struct upstream *up; }; +static inline void +rspamd_fuzzy_redis_session_free_args (struct rspamd_fuzzy_redis_session *session) +{ + guint i; + + if (session->argv) { + for (i = 0; i < session->nargs; i ++) { + g_free (session->argv[i]); + } + + g_free (session->argv); + g_free (session->argv_lens); + } +} static void rspamd_fuzzy_redis_session_dtor (struct rspamd_fuzzy_redis_session *session) { redisAsyncContext *ac; - guint i; + if (session->ctx) { ac = session->ctx; @@ -82,13 +102,7 @@ rspamd_fuzzy_redis_session_dtor (struct rspamd_fuzzy_redis_session *session) event_del (&session->timeout); } - if (session->argv) { - for (i = 0; i < session->nargs; i ++) { - g_free (session->argv[i]); - } - - g_free (session->argv); - } + rspamd_fuzzy_redis_session_free_args (session); REF_RELEASE (session->backend); g_slice_free1 (sizeof (*session), session); @@ -271,6 +285,268 @@ rspamd_fuzzy_redis_timeout (gint fd, short what, gpointer priv) } } +static void rspamd_fuzzy_redis_check_callback (redisAsyncContext *c, gpointer r, + gpointer priv); + +struct _rspamd_fuzzy_shingles_helper { + guchar digest[64]; + guint found; +}; + +static gint +rspamd_fuzzy_backend_redis_shingles_cmp (const void *a, const void *b) +{ + const struct _rspamd_fuzzy_shingles_helper *sha = a, + *shb = b; + + return memcmp (sha->digest, shb->digest, sizeof (sha->digest)); +} + +static void +rspamd_fuzzy_redis_shingles_callback (redisAsyncContext *c, gpointer r, + gpointer priv) +{ + struct rspamd_fuzzy_redis_session *session = priv; + redisReply *reply = r, *cur; + struct rspamd_fuzzy_reply rep; + struct timeval tv; + GString *key; + struct _rspamd_fuzzy_shingles_helper *shingles, *prev = NULL, *sel = NULL; + guint i, found = 0, max_found = 0, cur_found = 0; + + event_del (&session->timeout); + memset (&rep, 0, sizeof (rep)); + + if (c->err == 0) { + rspamd_upstream_ok (session->up); + + if (reply->type == REDIS_REPLY_ARRAY && + reply->elements == RSPAMD_SHINGLE_SIZE) { + shingles = g_alloca (sizeof (struct _rspamd_fuzzy_shingles_helper) * + RSPAMD_SHINGLE_SIZE); + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) { + cur = reply->element[i]; + + if (cur->type == REDIS_REPLY_STRING) { + shingles[i].found = 1; + memcpy (shingles[i].digest, cur->str, MIN (64, cur->len)); + found ++; + } + else { + memset (shingles[i].digest, 0, sizeof (shingles[i].digest)); + shingles[i].found = 0; + } + } + + session->prob = ((float)found) / RSPAMD_SHINGLE_SIZE; + rep.prob = session->prob; + + if (found > RSPAMD_SHINGLE_SIZE / 2) { + /* Now sort to find the most frequent element */ + qsort (shingles, RSPAMD_SHINGLE_SIZE, + sizeof (struct _rspamd_fuzzy_shingles_helper), + rspamd_fuzzy_backend_redis_shingles_cmp); + + prev = &shingles[0]; + + for (i = 1; i < RSPAMD_SHINGLE_SIZE; i ++) { + if (!shingles[i].found) { + continue; + } + + if (memcmp (shingles[i].digest, prev->digest, 64) == 0) { + cur_found ++; + + if (cur_found > max_found) { + max_found = cur_found; + sel = &shingles[i]; + } + } + else { + cur_found = 1; + prev = &shingles[i]; + } + } + + g_assert (sel != NULL); + + /* Prepare new check command */ + rspamd_fuzzy_redis_session_free_args (session); + session->nargs = 4; + session->argv = g_malloc (sizeof (gchar *) * session->nargs); + session->argv_lens = g_malloc (sizeof (gsize) * session->nargs); + + key = g_string_new (session->backend->redis_object); + g_string_append_len (key, sel->digest, sizeof (sel->digest)); + session->argv[0] = g_strdup ("HMGET"); + session->argv_lens[0] = 5; + session->argv[1] = key->str; + session->argv_lens[1] = key->len; + session->argv[2] = g_strdup ("V"); + session->argv_lens[2] = 1; + session->argv[3] = g_strdup ("F"); + session->argv_lens[3] = 1; + g_string_free (key, FALSE); /* Do not free underlying array */ + + g_assert (session->ctx != NULL); + if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_check_callback, + session, session->nargs, + (const gchar **)session->argv, session->argv_lens) != REDIS_OK) { + if (session->callback.cb_check) { + memset (&rep, 0, sizeof (rep)); + session->callback.cb_check (&rep, session->cbdata); + } + + rspamd_fuzzy_redis_session_dtor (session); + } + else { + /* Add timeout */ + event_set (&session->timeout, -1, EV_TIMEOUT, rspamd_fuzzy_redis_timeout, + session); + event_base_set (session->ev_base, &session->timeout); + double_to_tv (session->backend->timeout, &tv); + event_add (&session->timeout, &tv); + } + + return; + } + } + + if (session->callback.cb_check) { + session->callback.cb_check (&rep, session->cbdata); + } + } + else { + if (session->callback.cb_check) { + session->callback.cb_check (&rep, session->cbdata); + } + + rspamd_upstream_fail (session->up); + } + + rspamd_fuzzy_redis_session_dtor (session); +} + +static void +rspamd_fuzzy_backend_check_shingles (struct rspamd_fuzzy_redis_session *session) +{ + struct timeval tv; + struct rspamd_fuzzy_reply rep; + const struct rspamd_fuzzy_shingle_cmd *shcmd; + GString *key; + guint i; + + rspamd_fuzzy_redis_session_free_args (session); + /* First of all check digest */ + session->nargs = 2 * session->cmd->shingles_count; + session->argv = g_malloc (sizeof (gchar *) * session->nargs); + session->argv_lens = g_malloc (sizeof (gsize) * session->nargs); + shcmd = (const struct rspamd_fuzzy_shingle_cmd *)session->cmd; + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) { + key = g_string_new (session->backend->redis_object); + rspamd_printf_gstring (key, "_%d_%uL", i, shcmd->sgl.hashes[i]); + session->argv[i * 2] = g_strdup ("GET"); + session->argv_lens[i * 2] = 3; + session->argv[i * 2 + 1] = key->str; + session->argv_lens[i * 2 + 1] = key->len; + g_string_free (key, FALSE); /* Do not free underlying array */ + } + + session->shingles_checked = TRUE; + + g_assert (session->ctx != NULL); + + + if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_shingles_callback, + session, session->nargs, + (const gchar **)session->argv, session->argv_lens) != REDIS_OK) { + if (session->callback.cb_check) { + memset (&rep, 0, sizeof (rep)); + session->callback.cb_check (&rep, session->cbdata); + } + + rspamd_fuzzy_redis_session_dtor (session); + } + else { + /* Add timeout */ + event_set (&session->timeout, -1, EV_TIMEOUT, rspamd_fuzzy_redis_timeout, + session); + event_base_set (session->ev_base, &session->timeout); + double_to_tv (session->backend->timeout, &tv); + event_add (&session->timeout, &tv); + } +} + +static void +rspamd_fuzzy_redis_check_callback (redisAsyncContext *c, gpointer r, + gpointer priv) +{ + struct rspamd_fuzzy_redis_session *session = priv; + redisReply *reply = r, *cur; + struct rspamd_fuzzy_reply rep; + gulong value; + guint found_elts = 0; + + event_del (&session->timeout); + memset (&rep, 0, sizeof (rep)); + + if (c->err == 0) { + rspamd_upstream_ok (session->up); + + if (reply->type == REDIS_REPLY_ARRAY && reply->elements == 2) { + cur = reply->element[0]; + + if (cur->type == REDIS_REPLY_STRING) { + value = strtoul (cur->str, NULL, 10); + rep.value = value; + found_elts ++; + } + + cur = reply->element[1]; + + if (cur->type == REDIS_REPLY_STRING) { + value = strtoul (cur->str, NULL, 10); + rep.flag = value; + found_elts ++; + } + + if (found_elts == 2) { + rep.prob = session->prob; + } + } + + if (found_elts != 2) { + if (session->cmd->shingles_count > 0 && !session->shingles_checked) { + /* We also need to check all shingles here */ + rspamd_fuzzy_backend_check_shingles (session); + /* Do not free session */ + return; + } + else { + if (session->callback.cb_check) { + session->callback.cb_check (&rep, session->cbdata); + } + } + } + else { + if (session->callback.cb_check) { + session->callback.cb_check (&rep, session->cbdata); + } + } + } + else { + if (session->callback.cb_check) { + session->callback.cb_check (&rep, session->cbdata); + } + + rspamd_upstream_fail (session->up); + } + + rspamd_fuzzy_redis_session_dtor (session); +} + void rspamd_fuzzy_backend_check_redis (struct rspamd_fuzzy_backend *bk, const struct rspamd_fuzzy_cmd *cmd, @@ -278,9 +554,84 @@ rspamd_fuzzy_backend_check_redis (struct rspamd_fuzzy_backend *bk, void *subr_ud) { struct rspamd_fuzzy_backend_redis *backend = subr_ud; + struct rspamd_fuzzy_redis_session *session; + struct upstream *up; + struct timeval tv; + rspamd_inet_addr_t *addr; + struct rspamd_fuzzy_reply rep; + GString *key; g_assert (backend != NULL); + session = g_slice_alloc0 (sizeof (*session)); + session->backend = backend; + REF_RETAIN (session->backend); + + session->callback.cb_check = cb; + session->cbdata = ud; + session->command = RSPAMD_FUZZY_REDIS_COMMAND_CHECK; + session->cmd = cmd; + session->prob = 1.0; + session->ev_base = rspamd_fuzzy_backend_event_base (bk); + + /* First of all check digest */ + session->nargs = 4; + session->argv = g_malloc (sizeof (gchar *) * session->nargs); + session->argv_lens = g_malloc (sizeof (gsize) * session->nargs); + + key = g_string_new (backend->redis_object); + g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); + session->argv[0] = g_strdup ("HMGET"); + session->argv_lens[0] = 5; + session->argv[1] = key->str; + session->argv_lens[1] = key->len; + session->argv[2] = g_strdup ("V"); + session->argv_lens[2] = 1; + session->argv[3] = g_strdup ("F"); + session->argv_lens[3] = 1; + g_string_free (key, FALSE); /* Do not free underlying array */ + + up = rspamd_upstream_get (backend->read_servers, + RSPAMD_UPSTREAM_ROUND_ROBIN, + NULL, + 0); + + session->up = up; + addr = rspamd_upstream_addr (up); + g_assert (addr != NULL); + session->ctx = rspamd_redis_pool_connect (backend->pool, + backend->dbname, backend->password, + rspamd_inet_address_to_string (addr), + rspamd_inet_address_get_port (addr)); + + if (session->ctx == NULL) { + rspamd_fuzzy_redis_session_dtor (session); + + if (cb) { + memset (&rep, 0, sizeof (rep)); + cb (&rep, subr_ud); + } + } + else { + if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_check_callback, + session, session->nargs, + (const gchar **)session->argv, session->argv_lens) != REDIS_OK) { + rspamd_fuzzy_redis_session_dtor (session); + + if (cb) { + memset (&rep, 0, sizeof (rep)); + cb (&rep, subr_ud); + } + } + else { + /* Add timeout */ + event_set (&session->timeout, -1, EV_TIMEOUT, rspamd_fuzzy_redis_timeout, + session); + event_base_set (session->ev_base, &session->timeout); + double_to_tv (backend->timeout, &tv); + event_add (&session->timeout, &tv); + } + } } void @@ -298,6 +649,7 @@ rspamd_fuzzy_redis_count_callback (redisAsyncContext *c, gpointer r, { struct rspamd_fuzzy_redis_session *session = priv; redisReply *reply = r; + gulong nelts; event_del (&session->timeout); @@ -309,6 +661,13 @@ rspamd_fuzzy_redis_count_callback (redisAsyncContext *c, gpointer r, session->callback.cb_count (reply->integer, session->cbdata); } } + else if (reply->type == REDIS_REPLY_STRING) { + nelts = strtoul (reply->str, NULL, 10); + + if (session->callback.cb_count) { + session->callback.cb_count (nelts, session->cbdata); + } + } else { if (session->callback.cb_count) { session->callback.cb_count (0, session->cbdata); @@ -335,6 +694,7 @@ rspamd_fuzzy_backend_count_redis (struct rspamd_fuzzy_backend *bk, struct upstream *up; struct timeval tv; rspamd_inet_addr_t *addr; + GString *key; g_assert (backend != NULL); @@ -345,11 +705,18 @@ rspamd_fuzzy_backend_count_redis (struct rspamd_fuzzy_backend *bk, session->callback.cb_count = cb; session->cbdata = ud; session->command = RSPAMD_FUZZY_REDIS_COMMAND_COUNT; + session->ev_base = rspamd_fuzzy_backend_event_base (bk); session->nargs = 2; session->argv = g_malloc (sizeof (gchar *) * 2); - session->argv[0] = g_strdup ("HLEN"); - session->argv[1] = g_strdup (backend->redis_object); + session->argv_lens = g_malloc (sizeof (gsize) * 2); + key = g_string_new (backend->redis_object); + g_string_append (key, "_count"); + session->argv[0] = g_strdup ("GET"); + session->argv_lens[0] = 3; + session->argv[1] = key->str; + session->argv_lens[1] = key->len; + g_string_free (key, FALSE); /* Do not free underlying array */ up = rspamd_upstream_get (backend->read_servers, RSPAMD_UPSTREAM_ROUND_ROBIN, @@ -374,7 +741,7 @@ rspamd_fuzzy_backend_count_redis (struct rspamd_fuzzy_backend *bk, else { if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_count_callback, session, session->nargs, - (const gchar **)session->argv, NULL) != REDIS_OK) { + (const gchar **)session->argv, session->argv_lens) != REDIS_OK) { rspamd_fuzzy_redis_session_dtor (session); if (cb) { @@ -385,8 +752,7 @@ rspamd_fuzzy_backend_count_redis (struct rspamd_fuzzy_backend *bk, /* Add timeout */ event_set (&session->timeout, -1, EV_TIMEOUT, rspamd_fuzzy_redis_timeout, session); - event_base_set (rspamd_fuzzy_backend_event_base (bk), - &session->timeout); + event_base_set (session->ev_base, &session->timeout); double_to_tv (backend->timeout, &tv); event_add (&session->timeout, &tv); } -- 2.39.5