diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2016-09-05 15:26:08 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2016-09-05 15:26:08 +0100 |
commit | 8d2d6d3d26098b94339500563dc5df92c4d26e45 (patch) | |
tree | 4775130a2b81d48e3ab6c014871fccff44c7f885 /src | |
parent | 55252c359cf6a20031ddfdab1bad31eb284dcd34 (diff) | |
download | rspamd-8d2d6d3d26098b94339500563dc5df92c4d26e45.tar.gz rspamd-8d2d6d3d26098b94339500563dc5df92c4d26e45.zip |
[Minor] Implement the rest functions for redis fuzzy backend
Diffstat (limited to 'src')
-rw-r--r-- | src/libserver/fuzzy_backend.c | 6 | ||||
-rw-r--r-- | src/libserver/fuzzy_backend.h | 1 | ||||
-rw-r--r-- | src/libserver/fuzzy_backend_redis.c | 449 |
3 files changed, 447 insertions, 9 deletions
diff --git a/src/libserver/fuzzy_backend.c b/src/libserver/fuzzy_backend.c index 84f2289e8..030028389 100644 --- a/src/libserver/fuzzy_backend.c +++ b/src/libserver/fuzzy_backend.c @@ -442,3 +442,9 @@ rspamd_fuzzy_backend_event_base (struct rspamd_fuzzy_backend *backend) { return backend->ev_base; } + +gdouble +rspamd_fuzzy_backend_get_expire (struct rspamd_fuzzy_backend *backend) +{ + return backend->expire; +} diff --git a/src/libserver/fuzzy_backend.h b/src/libserver/fuzzy_backend.h index 1eaa0fe2b..6c880d9c8 100644 --- a/src/libserver/fuzzy_backend.h +++ b/src/libserver/fuzzy_backend.h @@ -102,6 +102,7 @@ void rspamd_fuzzy_backend_start_update (struct rspamd_fuzzy_backend *backend, void *ud); struct event_base* rspamd_fuzzy_backend_event_base (struct rspamd_fuzzy_backend *backend); +gdouble rspamd_fuzzy_backend_get_expire (struct rspamd_fuzzy_backend *backend); /** * Closes backend diff --git a/src/libserver/fuzzy_backend_redis.c b/src/libserver/fuzzy_backend_redis.c index 007b75856..66490d386 100644 --- a/src/libserver/fuzzy_backend_redis.c +++ b/src/libserver/fuzzy_backend_redis.c @@ -634,15 +634,6 @@ rspamd_fuzzy_backend_check_redis (struct rspamd_fuzzy_backend *bk, } } -void -rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk, - GQueue *updates, const gchar *src, - rspamd_fuzzy_update_cb cb, void *ud, - void *subr_ud) -{ - struct rspamd_fuzzy_backend_redis *backend = subr_ud; -} - static void rspamd_fuzzy_redis_count_callback (redisAsyncContext *c, gpointer r, gpointer priv) @@ -759,6 +750,47 @@ rspamd_fuzzy_backend_count_redis (struct rspamd_fuzzy_backend *bk, } } +static void +rspamd_fuzzy_redis_version_callback (redisAsyncContext *c, gpointer r, + gpointer priv) +{ + struct rspamd_fuzzy_redis_session *session = priv; + redisReply *reply = r; + gulong nelts; + + event_del (&session->timeout); + + if (c->err == 0) { + rspamd_upstream_ok (session->up); + + if (reply->type == REDIS_REPLY_INTEGER) { + if (session->callback.cb_version) { + session->callback.cb_version (reply->integer, session->cbdata); + } + } + else if (reply->type == REDIS_REPLY_STRING) { + nelts = strtoul (reply->str, NULL, 10); + + if (session->callback.cb_version) { + session->callback.cb_version (nelts, session->cbdata); + } + } + else { + if (session->callback.cb_version) { + session->callback.cb_version (0, session->cbdata); + } + } + } + else { + if (session->callback.cb_version) { + session->callback.cb_version (0, session->cbdata); + } + rspamd_upstream_fail (session->up); + } + + rspamd_fuzzy_redis_session_dtor (session); +} + void rspamd_fuzzy_backend_version_redis (struct rspamd_fuzzy_backend *bk, const gchar *src, @@ -766,8 +798,73 @@ rspamd_fuzzy_backend_version_redis (struct rspamd_fuzzy_backend *bk, void *subr_ud) { struct rspamd_fuzzy_backend_redis *backend = subr_ud; + struct rspamd_fuzzy_redis_session *session; + struct upstream *up; + struct timeval tv; + rspamd_inet_addr_t *addr; + GString *key; g_assert (backend != NULL); + + session = g_slice_alloc0 (sizeof (*session)); + session->backend = backend; + REF_RETAIN (session->backend); + + session->callback.cb_version = cb; + session->cbdata = ud; + session->command = RSPAMD_FUZZY_REDIS_COMMAND_VERSION; + session->ev_base = rspamd_fuzzy_backend_event_base (bk); + + session->nargs = 2; + session->argv = g_malloc (sizeof (gchar *) * 2); + session->argv_lens = g_malloc (sizeof (gsize) * 2); + key = g_string_new (backend->redis_object); + g_string_append (key, src); + session->argv[0] = g_strdup ("GET"); + session->argv_lens[0] = 3; + session->argv[1] = key->str; + session->argv_lens[1] = key->len; + g_string_free (key, FALSE); /* Do not free underlying array */ + + up = rspamd_upstream_get (backend->read_servers, + RSPAMD_UPSTREAM_ROUND_ROBIN, + NULL, + 0); + + session->up = up; + addr = rspamd_upstream_addr (up); + g_assert (addr != NULL); + session->ctx = rspamd_redis_pool_connect (backend->pool, + backend->dbname, backend->password, + rspamd_inet_address_to_string (addr), + rspamd_inet_address_get_port (addr)); + + if (session->ctx == NULL) { + rspamd_fuzzy_redis_session_dtor (session); + + if (cb) { + cb (0, subr_ud); + } + } + else { + if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_version_callback, + session, session->nargs, + (const gchar **)session->argv, session->argv_lens) != REDIS_OK) { + rspamd_fuzzy_redis_session_dtor (session); + + if (cb) { + cb (0, subr_ud); + } + } + else { + /* Add timeout */ + event_set (&session->timeout, -1, EV_TIMEOUT, rspamd_fuzzy_redis_timeout, + session); + event_base_set (session->ev_base, &session->timeout); + double_to_tv (backend->timeout, &tv); + event_add (&session->timeout, &tv); + } + } } const gchar* @@ -789,6 +886,340 @@ rspamd_fuzzy_backend_expire_redis (struct rspamd_fuzzy_backend *bk, g_assert (backend != NULL); } +static gboolean +rspamd_fuzzy_update_append_command (struct rspamd_fuzzy_backend *bk, + struct rspamd_fuzzy_redis_session *session, + struct fuzzy_peer_cmd *io_cmd, guint *shift) +{ + GString *key, *value; + guint cur_shift = *shift; + struct rspamd_fuzzy_cmd *cmd; + + if (io_cmd->is_shingle) { + cmd = &io_cmd->cmd.shingle.basic; + + if (cmd->cmd == FUZZY_WRITE) { + + } + } + else { + cmd = &io_cmd->cmd.normal; + + } + + if (cmd->cmd == FUZZY_WRITE) { + /* + * For each normal hash addition we do 3 redis commands: + * HSET <key> F <flag> + * HINCRBY <key> V <weight> + * EXPIRE <key> <expire> + * Where <key> is <prefix> || <digest> + */ + + /* HSET */ + key = g_string_new (session->backend->redis_object); + g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); + value = g_string_sized_new (32); + rspamd_printf_gstring (value, "%d", cmd->flag); + session->argv[cur_shift] = g_strdup ("HSET"); + session->argv_lens[cur_shift++] = sizeof ("HSET") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + session->argv[cur_shift] = g_strdup ("F"); + session->argv_lens[cur_shift++] = sizeof ("F") - 1; + session->argv[cur_shift] = value->str; + session->argv_lens[cur_shift++] = value->len; + g_string_free (key, FALSE); + g_string_free (value, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 3, + (const gchar **)&session->argv[cur_shift - 4], + &session->argv_lens[cur_shift - 4]) != REDIS_OK) { + + return FALSE; + } + + /* HINCRBY */ + key = g_string_new (session->backend->redis_object); + g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); + value = g_string_sized_new (32); + rspamd_printf_gstring (value, "%d", cmd->value); + session->argv[cur_shift] = g_strdup ("HINCRBY"); + session->argv_lens[cur_shift++] = sizeof ("HINCRBY") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + session->argv[cur_shift] = g_strdup ("V"); + session->argv_lens[cur_shift++] = sizeof ("V") - 1; + session->argv[cur_shift] = value->str; + session->argv_lens[cur_shift++] = value->len; + g_string_free (key, FALSE); + g_string_free (value, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 3, + (const gchar **)&session->argv[cur_shift - 4], + &session->argv_lens[cur_shift - 4]) != REDIS_OK) { + + return FALSE; + } + + /* EXPIRE */ + key = g_string_new (session->backend->redis_object); + g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); + value = g_string_sized_new (32); + rspamd_printf_gstring (value, "%d", + (gint)rspamd_fuzzy_backend_get_expire (bk)); + session->argv[cur_shift] = g_strdup ("EXPIRE"); + session->argv_lens[cur_shift++] = sizeof ("EXPIRE") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + session->argv[cur_shift] = value->str; + session->argv_lens[cur_shift++] = value->len; + g_string_free (key, FALSE); + g_string_free (value, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 3, + (const gchar **)&session->argv[cur_shift - 3], + &session->argv_lens[cur_shift - 3]) != REDIS_OK) { + + return FALSE; + } + } + + *shift = cur_shift; + + return TRUE; +} + +static void +rspamd_fuzzy_redis_update_callback (redisAsyncContext *c, gpointer r, + gpointer priv) +{ + struct rspamd_fuzzy_redis_session *session = priv; + redisReply *reply = r; + event_del (&session->timeout); + + if (c->err == 0) { + rspamd_upstream_ok (session->up); + + if (reply->type == REDIS_REPLY_ARRAY) { + /* TODO: check all replies somehow */ + if (session->callback.cb_update) { + session->callback.cb_update (TRUE, session->cbdata); + } + } + else { + if (session->callback.cb_update) { + session->callback.cb_update (FALSE, session->cbdata); + } + } + } + else { + if (session->callback.cb_update) { + session->callback.cb_update (FALSE, session->cbdata); + } + + rspamd_upstream_fail (session->up); + } + + rspamd_fuzzy_redis_session_dtor (session); +} + +void +rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk, + GQueue *updates, const gchar *src, + rspamd_fuzzy_update_cb cb, void *ud, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_redis *backend = subr_ud; + struct rspamd_fuzzy_redis_session *session; + struct upstream *up; + struct timeval tv; + rspamd_inet_addr_t *addr; + GList *cur; + GString *key; + struct fuzzy_peer_cmd *io_cmd; + struct rspamd_fuzzy_cmd *cmd; + guint nargs, ncommands, cur_shift; + + g_assert (backend != NULL); + + session = g_slice_alloc0 (sizeof (*session)); + session->backend = backend; + REF_RETAIN (session->backend); + + /* + * For each normal hash addition we do 3 redis commands: + * HSET <key> F <flag> + * HINCRBY <key> V <weight> + * EXPIRE <key> <expire> + * + * Where <key> is <prefix> || <digest> + * + * For each command with shingles we additionally emit 32 commands: + * SETEX <prefix>_<number>_<value> <expire> <digest> + * + * For each delete command we emit: + * DEL <key> + * + * For each delete command with shingles we emit also 32 commands: + * DEL <prefix>_<number>_<value> + */ + + ncommands = 3; /* For MULTI + EXEC */ + nargs = 5; + + for (cur = updates->head; cur != NULL; cur = g_list_next (cur)) { + io_cmd = cur->data; + + if (io_cmd->is_shingle) { + cmd = &io_cmd->cmd.shingle.basic; + } + else { + cmd = &io_cmd->cmd.normal; + } + + if (cmd->cmd == FUZZY_WRITE) { + ncommands += 3; + nargs += 11; + + if (io_cmd->is_shingle) { + ncommands += RSPAMD_SHINGLE_SIZE; + nargs += RSPAMD_SHINGLE_SIZE * 4; + } + + } + else if (cmd->cmd == FUZZY_DEL) { + ncommands += 1; + nargs += 2; + + if (io_cmd->is_shingle) { + ncommands += RSPAMD_SHINGLE_SIZE; + nargs += RSPAMD_SHINGLE_SIZE * 2; + } + } + } + + /* Now we need to create a new request */ + session->callback.cb_update = cb; + session->cbdata = ud; + session->command = RSPAMD_FUZZY_REDIS_COMMAND_UPDATES; + session->cmd = cmd; + session->prob = 1.0; + session->ev_base = rspamd_fuzzy_backend_event_base (bk); + + /* First of all check digest */ + session->nargs = nargs; + session->argv = g_malloc (sizeof (gchar *) * session->nargs); + session->argv_lens = g_malloc (sizeof (gsize) * session->nargs); + + up = rspamd_upstream_get (backend->write_servers, + RSPAMD_UPSTREAM_MASTER_SLAVE, + NULL, + 0); + + session->up = up; + addr = rspamd_upstream_addr (up); + g_assert (addr != NULL); + session->ctx = rspamd_redis_pool_connect (backend->pool, + backend->dbname, backend->password, + rspamd_inet_address_to_string (addr), + rspamd_inet_address_get_port (addr)); + + if (session->ctx == NULL) { + rspamd_fuzzy_redis_session_dtor (session); + + if (cb) { + cb (FALSE, subr_ud); + } + } + else { + /* Start with MULTI command */ + session->argv[0] = g_strdup ("MULTI"); + session->argv_lens[0] = 5; + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 1, + (const gchar **)session->argv, + session->argv_lens) != REDIS_OK) { + + if (cb) { + cb (FALSE, subr_ud); + } + rspamd_fuzzy_redis_session_dtor (session); + + return; + } + + /* Now split the rest of commands in packs and emit them command by command */ + cur_shift = 1; + + for (cur = updates->head; cur != NULL; cur = g_list_next (cur)) { + io_cmd = cur->data; + + if (!rspamd_fuzzy_update_append_command (bk, session, io_cmd, + &cur_shift)) { + if (cb) { + cb (FALSE, subr_ud); + } + rspamd_fuzzy_redis_session_dtor (session); + + return; + } + } + + /* Now INCR command for the source */ + key = g_string_new (backend->redis_object); + g_string_append (key, src); + session->argv[cur_shift] = g_strdup ("INCR"); + session->argv_lens[cur_shift ++] = 4; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift ++] = key->len; + g_string_free (key, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 2, + (const gchar **)&session->argv[cur_shift - 2], + &session->argv_lens[cur_shift - 2]) != REDIS_OK) { + + if (cb) { + cb (FALSE, subr_ud); + } + rspamd_fuzzy_redis_session_dtor (session); + + return; + } + + /* Finally we call EXEC with a specific callback */ + session->argv[cur_shift] = g_strdup ("EXEC"); + session->argv_lens[cur_shift] = 4; + + if (redisAsyncCommandArgv (session->ctx, + rspamd_fuzzy_redis_update_callback, session, + 1, + (const gchar **)&session->argv[cur_shift], + &session->argv_lens[cur_shift]) != REDIS_OK) { + + if (cb) { + cb (FALSE, subr_ud); + } + rspamd_fuzzy_redis_session_dtor (session); + + return; + } + else { + /* Add timeout */ + event_set (&session->timeout, -1, EV_TIMEOUT, rspamd_fuzzy_redis_timeout, + session); + event_base_set (session->ev_base, &session->timeout); + double_to_tv (backend->timeout, &tv); + event_add (&session->timeout, &tv); + } + } +} + void rspamd_fuzzy_backend_close_redis (struct rspamd_fuzzy_backend *bk, void *subr_ud) |