aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/libserver/fuzzy_backend.c6
-rw-r--r--src/libserver/fuzzy_backend.h1
-rw-r--r--src/libserver/fuzzy_backend_redis.c449
3 files changed, 447 insertions, 9 deletions
diff --git a/src/libserver/fuzzy_backend.c b/src/libserver/fuzzy_backend.c
index 84f2289e8..030028389 100644
--- a/src/libserver/fuzzy_backend.c
+++ b/src/libserver/fuzzy_backend.c
@@ -442,3 +442,9 @@ rspamd_fuzzy_backend_event_base (struct rspamd_fuzzy_backend *backend)
{
return backend->ev_base;
}
+
+gdouble
+rspamd_fuzzy_backend_get_expire (struct rspamd_fuzzy_backend *backend)
+{
+ return backend->expire;
+}
diff --git a/src/libserver/fuzzy_backend.h b/src/libserver/fuzzy_backend.h
index 1eaa0fe2b..6c880d9c8 100644
--- a/src/libserver/fuzzy_backend.h
+++ b/src/libserver/fuzzy_backend.h
@@ -102,6 +102,7 @@ void rspamd_fuzzy_backend_start_update (struct rspamd_fuzzy_backend *backend,
void *ud);
struct event_base* rspamd_fuzzy_backend_event_base (struct rspamd_fuzzy_backend *backend);
+gdouble rspamd_fuzzy_backend_get_expire (struct rspamd_fuzzy_backend *backend);
/**
* Closes backend
diff --git a/src/libserver/fuzzy_backend_redis.c b/src/libserver/fuzzy_backend_redis.c
index 007b75856..66490d386 100644
--- a/src/libserver/fuzzy_backend_redis.c
+++ b/src/libserver/fuzzy_backend_redis.c
@@ -634,15 +634,6 @@ rspamd_fuzzy_backend_check_redis (struct rspamd_fuzzy_backend *bk,
}
}
-void
-rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk,
- GQueue *updates, const gchar *src,
- rspamd_fuzzy_update_cb cb, void *ud,
- void *subr_ud)
-{
- struct rspamd_fuzzy_backend_redis *backend = subr_ud;
-}
-
static void
rspamd_fuzzy_redis_count_callback (redisAsyncContext *c, gpointer r,
gpointer priv)
@@ -759,6 +750,47 @@ rspamd_fuzzy_backend_count_redis (struct rspamd_fuzzy_backend *bk,
}
}
+static void
+rspamd_fuzzy_redis_version_callback (redisAsyncContext *c, gpointer r,
+ gpointer priv)
+{
+ struct rspamd_fuzzy_redis_session *session = priv;
+ redisReply *reply = r;
+ gulong nelts;
+
+ event_del (&session->timeout);
+
+ if (c->err == 0) {
+ rspamd_upstream_ok (session->up);
+
+ if (reply->type == REDIS_REPLY_INTEGER) {
+ if (session->callback.cb_version) {
+ session->callback.cb_version (reply->integer, session->cbdata);
+ }
+ }
+ else if (reply->type == REDIS_REPLY_STRING) {
+ nelts = strtoul (reply->str, NULL, 10);
+
+ if (session->callback.cb_version) {
+ session->callback.cb_version (nelts, session->cbdata);
+ }
+ }
+ else {
+ if (session->callback.cb_version) {
+ session->callback.cb_version (0, session->cbdata);
+ }
+ }
+ }
+ else {
+ if (session->callback.cb_version) {
+ session->callback.cb_version (0, session->cbdata);
+ }
+ rspamd_upstream_fail (session->up);
+ }
+
+ rspamd_fuzzy_redis_session_dtor (session);
+}
+
void
rspamd_fuzzy_backend_version_redis (struct rspamd_fuzzy_backend *bk,
const gchar *src,
@@ -766,8 +798,73 @@ rspamd_fuzzy_backend_version_redis (struct rspamd_fuzzy_backend *bk,
void *subr_ud)
{
struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+ struct rspamd_fuzzy_redis_session *session;
+ struct upstream *up;
+ struct timeval tv;
+ rspamd_inet_addr_t *addr;
+ GString *key;
g_assert (backend != NULL);
+
+ session = g_slice_alloc0 (sizeof (*session));
+ session->backend = backend;
+ REF_RETAIN (session->backend);
+
+ session->callback.cb_version = cb;
+ session->cbdata = ud;
+ session->command = RSPAMD_FUZZY_REDIS_COMMAND_VERSION;
+ session->ev_base = rspamd_fuzzy_backend_event_base (bk);
+
+ session->nargs = 2;
+ session->argv = g_malloc (sizeof (gchar *) * 2);
+ session->argv_lens = g_malloc (sizeof (gsize) * 2);
+ key = g_string_new (backend->redis_object);
+ g_string_append (key, src);
+ session->argv[0] = g_strdup ("GET");
+ session->argv_lens[0] = 3;
+ session->argv[1] = key->str;
+ session->argv_lens[1] = key->len;
+ g_string_free (key, FALSE); /* Do not free underlying array */
+
+ up = rspamd_upstream_get (backend->read_servers,
+ RSPAMD_UPSTREAM_ROUND_ROBIN,
+ NULL,
+ 0);
+
+ session->up = up;
+ addr = rspamd_upstream_addr (up);
+ g_assert (addr != NULL);
+ session->ctx = rspamd_redis_pool_connect (backend->pool,
+ backend->dbname, backend->password,
+ rspamd_inet_address_to_string (addr),
+ rspamd_inet_address_get_port (addr));
+
+ if (session->ctx == NULL) {
+ rspamd_fuzzy_redis_session_dtor (session);
+
+ if (cb) {
+ cb (0, subr_ud);
+ }
+ }
+ else {
+ if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_version_callback,
+ session, session->nargs,
+ (const gchar **)session->argv, session->argv_lens) != REDIS_OK) {
+ rspamd_fuzzy_redis_session_dtor (session);
+
+ if (cb) {
+ cb (0, subr_ud);
+ }
+ }
+ else {
+ /* Add timeout */
+ event_set (&session->timeout, -1, EV_TIMEOUT, rspamd_fuzzy_redis_timeout,
+ session);
+ event_base_set (session->ev_base, &session->timeout);
+ double_to_tv (backend->timeout, &tv);
+ event_add (&session->timeout, &tv);
+ }
+ }
}
const gchar*
@@ -789,6 +886,340 @@ rspamd_fuzzy_backend_expire_redis (struct rspamd_fuzzy_backend *bk,
g_assert (backend != NULL);
}
+static gboolean
+rspamd_fuzzy_update_append_command (struct rspamd_fuzzy_backend *bk,
+ struct rspamd_fuzzy_redis_session *session,
+ struct fuzzy_peer_cmd *io_cmd, guint *shift)
+{
+ GString *key, *value;
+ guint cur_shift = *shift;
+ struct rspamd_fuzzy_cmd *cmd;
+
+ if (io_cmd->is_shingle) {
+ cmd = &io_cmd->cmd.shingle.basic;
+
+ if (cmd->cmd == FUZZY_WRITE) {
+
+ }
+ }
+ else {
+ cmd = &io_cmd->cmd.normal;
+
+ }
+
+ if (cmd->cmd == FUZZY_WRITE) {
+ /*
+ * For each normal hash addition we do 3 redis commands:
+ * HSET <key> F <flag>
+ * HINCRBY <key> V <weight>
+ * EXPIRE <key> <expire>
+ * Where <key> is <prefix> || <digest>
+ */
+
+ /* HSET */
+ key = g_string_new (session->backend->redis_object);
+ g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
+ value = g_string_sized_new (32);
+ rspamd_printf_gstring (value, "%d", cmd->flag);
+ session->argv[cur_shift] = g_strdup ("HSET");
+ session->argv_lens[cur_shift++] = sizeof ("HSET") - 1;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ session->argv[cur_shift] = g_strdup ("F");
+ session->argv_lens[cur_shift++] = sizeof ("F") - 1;
+ session->argv[cur_shift] = value->str;
+ session->argv_lens[cur_shift++] = value->len;
+ g_string_free (key, FALSE);
+ g_string_free (value, FALSE);
+
+ if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
+ 3,
+ (const gchar **)&session->argv[cur_shift - 4],
+ &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
+
+ return FALSE;
+ }
+
+ /* HINCRBY */
+ key = g_string_new (session->backend->redis_object);
+ g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
+ value = g_string_sized_new (32);
+ rspamd_printf_gstring (value, "%d", cmd->value);
+ session->argv[cur_shift] = g_strdup ("HINCRBY");
+ session->argv_lens[cur_shift++] = sizeof ("HINCRBY") - 1;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ session->argv[cur_shift] = g_strdup ("V");
+ session->argv_lens[cur_shift++] = sizeof ("V") - 1;
+ session->argv[cur_shift] = value->str;
+ session->argv_lens[cur_shift++] = value->len;
+ g_string_free (key, FALSE);
+ g_string_free (value, FALSE);
+
+ if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
+ 3,
+ (const gchar **)&session->argv[cur_shift - 4],
+ &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
+
+ return FALSE;
+ }
+
+ /* EXPIRE */
+ key = g_string_new (session->backend->redis_object);
+ g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
+ value = g_string_sized_new (32);
+ rspamd_printf_gstring (value, "%d",
+ (gint)rspamd_fuzzy_backend_get_expire (bk));
+ session->argv[cur_shift] = g_strdup ("EXPIRE");
+ session->argv_lens[cur_shift++] = sizeof ("EXPIRE") - 1;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ session->argv[cur_shift] = value->str;
+ session->argv_lens[cur_shift++] = value->len;
+ g_string_free (key, FALSE);
+ g_string_free (value, FALSE);
+
+ if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
+ 3,
+ (const gchar **)&session->argv[cur_shift - 3],
+ &session->argv_lens[cur_shift - 3]) != REDIS_OK) {
+
+ return FALSE;
+ }
+ }
+
+ *shift = cur_shift;
+
+ return TRUE;
+}
+
+static void
+rspamd_fuzzy_redis_update_callback (redisAsyncContext *c, gpointer r,
+ gpointer priv)
+{
+ struct rspamd_fuzzy_redis_session *session = priv;
+ redisReply *reply = r;
+ event_del (&session->timeout);
+
+ if (c->err == 0) {
+ rspamd_upstream_ok (session->up);
+
+ if (reply->type == REDIS_REPLY_ARRAY) {
+ /* TODO: check all replies somehow */
+ if (session->callback.cb_update) {
+ session->callback.cb_update (TRUE, session->cbdata);
+ }
+ }
+ else {
+ if (session->callback.cb_update) {
+ session->callback.cb_update (FALSE, session->cbdata);
+ }
+ }
+ }
+ else {
+ if (session->callback.cb_update) {
+ session->callback.cb_update (FALSE, session->cbdata);
+ }
+
+ rspamd_upstream_fail (session->up);
+ }
+
+ rspamd_fuzzy_redis_session_dtor (session);
+}
+
+void
+rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk,
+ GQueue *updates, const gchar *src,
+ rspamd_fuzzy_update_cb cb, void *ud,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+ struct rspamd_fuzzy_redis_session *session;
+ struct upstream *up;
+ struct timeval tv;
+ rspamd_inet_addr_t *addr;
+ GList *cur;
+ GString *key;
+ struct fuzzy_peer_cmd *io_cmd;
+ struct rspamd_fuzzy_cmd *cmd;
+ guint nargs, ncommands, cur_shift;
+
+ g_assert (backend != NULL);
+
+ session = g_slice_alloc0 (sizeof (*session));
+ session->backend = backend;
+ REF_RETAIN (session->backend);
+
+ /*
+ * For each normal hash addition we do 3 redis commands:
+ * HSET <key> F <flag>
+ * HINCRBY <key> V <weight>
+ * EXPIRE <key> <expire>
+ *
+ * Where <key> is <prefix> || <digest>
+ *
+ * For each command with shingles we additionally emit 32 commands:
+ * SETEX <prefix>_<number>_<value> <expire> <digest>
+ *
+ * For each delete command we emit:
+ * DEL <key>
+ *
+ * For each delete command with shingles we emit also 32 commands:
+ * DEL <prefix>_<number>_<value>
+ */
+
+ ncommands = 3; /* For MULTI + EXEC */
+ nargs = 5;
+
+ for (cur = updates->head; cur != NULL; cur = g_list_next (cur)) {
+ io_cmd = cur->data;
+
+ if (io_cmd->is_shingle) {
+ cmd = &io_cmd->cmd.shingle.basic;
+ }
+ else {
+ cmd = &io_cmd->cmd.normal;
+ }
+
+ if (cmd->cmd == FUZZY_WRITE) {
+ ncommands += 3;
+ nargs += 11;
+
+ if (io_cmd->is_shingle) {
+ ncommands += RSPAMD_SHINGLE_SIZE;
+ nargs += RSPAMD_SHINGLE_SIZE * 4;
+ }
+
+ }
+ else if (cmd->cmd == FUZZY_DEL) {
+ ncommands += 1;
+ nargs += 2;
+
+ if (io_cmd->is_shingle) {
+ ncommands += RSPAMD_SHINGLE_SIZE;
+ nargs += RSPAMD_SHINGLE_SIZE * 2;
+ }
+ }
+ }
+
+ /* Now we need to create a new request */
+ session->callback.cb_update = cb;
+ session->cbdata = ud;
+ session->command = RSPAMD_FUZZY_REDIS_COMMAND_UPDATES;
+ session->cmd = cmd;
+ session->prob = 1.0;
+ session->ev_base = rspamd_fuzzy_backend_event_base (bk);
+
+ /* First of all check digest */
+ session->nargs = nargs;
+ session->argv = g_malloc (sizeof (gchar *) * session->nargs);
+ session->argv_lens = g_malloc (sizeof (gsize) * session->nargs);
+
+ up = rspamd_upstream_get (backend->write_servers,
+ RSPAMD_UPSTREAM_MASTER_SLAVE,
+ NULL,
+ 0);
+
+ session->up = up;
+ addr = rspamd_upstream_addr (up);
+ g_assert (addr != NULL);
+ session->ctx = rspamd_redis_pool_connect (backend->pool,
+ backend->dbname, backend->password,
+ rspamd_inet_address_to_string (addr),
+ rspamd_inet_address_get_port (addr));
+
+ if (session->ctx == NULL) {
+ rspamd_fuzzy_redis_session_dtor (session);
+
+ if (cb) {
+ cb (FALSE, subr_ud);
+ }
+ }
+ else {
+ /* Start with MULTI command */
+ session->argv[0] = g_strdup ("MULTI");
+ session->argv_lens[0] = 5;
+
+ if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
+ 1,
+ (const gchar **)session->argv,
+ session->argv_lens) != REDIS_OK) {
+
+ if (cb) {
+ cb (FALSE, subr_ud);
+ }
+ rspamd_fuzzy_redis_session_dtor (session);
+
+ return;
+ }
+
+ /* Now split the rest of commands in packs and emit them command by command */
+ cur_shift = 1;
+
+ for (cur = updates->head; cur != NULL; cur = g_list_next (cur)) {
+ io_cmd = cur->data;
+
+ if (!rspamd_fuzzy_update_append_command (bk, session, io_cmd,
+ &cur_shift)) {
+ if (cb) {
+ cb (FALSE, subr_ud);
+ }
+ rspamd_fuzzy_redis_session_dtor (session);
+
+ return;
+ }
+ }
+
+ /* Now INCR command for the source */
+ key = g_string_new (backend->redis_object);
+ g_string_append (key, src);
+ session->argv[cur_shift] = g_strdup ("INCR");
+ session->argv_lens[cur_shift ++] = 4;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift ++] = key->len;
+ g_string_free (key, FALSE);
+
+ if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
+ 2,
+ (const gchar **)&session->argv[cur_shift - 2],
+ &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
+
+ if (cb) {
+ cb (FALSE, subr_ud);
+ }
+ rspamd_fuzzy_redis_session_dtor (session);
+
+ return;
+ }
+
+ /* Finally we call EXEC with a specific callback */
+ session->argv[cur_shift] = g_strdup ("EXEC");
+ session->argv_lens[cur_shift] = 4;
+
+ if (redisAsyncCommandArgv (session->ctx,
+ rspamd_fuzzy_redis_update_callback, session,
+ 1,
+ (const gchar **)&session->argv[cur_shift],
+ &session->argv_lens[cur_shift]) != REDIS_OK) {
+
+ if (cb) {
+ cb (FALSE, subr_ud);
+ }
+ rspamd_fuzzy_redis_session_dtor (session);
+
+ return;
+ }
+ else {
+ /* Add timeout */
+ event_set (&session->timeout, -1, EV_TIMEOUT, rspamd_fuzzy_redis_timeout,
+ session);
+ event_base_set (session->ev_base, &session->timeout);
+ double_to_tv (backend->timeout, &tv);
+ event_add (&session->timeout, &tv);
+ }
+ }
+}
+
void
rspamd_fuzzy_backend_close_redis (struct rspamd_fuzzy_backend *bk,
void *subr_ud)