Browse Source

[Rework] Implement new version of fuzzy replies

tags/1.7.0
Vsevolod Stakhov 6 years ago
parent
commit
4854a5a405
4 changed files with 128 additions and 45 deletions
  1. 45
    21
      src/fuzzy_storage.c
  2. 55
    11
      src/libserver/fuzzy_backend_redis.c
  3. 18
    11
      src/libserver/fuzzy_backend_sqlite.c
  4. 10
    2
      src/libserver/fuzzy_wire.h

+ 45
- 21
src/fuzzy_storage.c View File

@@ -591,11 +591,23 @@ rspamd_fuzzy_write_reply (struct fuzzy_session *session)
session->cmd_type == CMD_ENCRYPTED_SHINGLE) {
/* Encrypted reply */
data = &session->reply;
len = sizeof (session->reply);

if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
len = sizeof (session->reply);
}
else {
len = sizeof (session->reply.hdr + session->reply.rep.v1);
}
}
else {
data = &session->reply.rep;
len = sizeof (session->reply.rep);

if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
len = sizeof (session->reply.rep);
}
else {
len = sizeof (session->reply.rep.v1);
}
}

r = rspamd_inet_address_sendto (session->fd, data, len, 0,
@@ -700,18 +712,18 @@ rspamd_fuzzy_make_reply (struct rspamd_fuzzy_cmd *cmd,
gboolean encrypted, gboolean is_shingle)
{
if (cmd) {
result->tag = cmd->tag;
result->v1.tag = cmd->tag;

memcpy (&session->reply.rep, result, sizeof (*result));

rspamd_fuzzy_update_stats (session->ctx,
session->epoch,
result->prob > 0.5,
result->v1.prob > 0.5,
is_shingle,
session->key_stat,
session->ip_stat,
cmd->cmd,
result->value);
result->v1.value);

if (encrypted) {
/* We need also to encrypt reply */
@@ -825,13 +837,13 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
session->ip_stat = ip_stat;
}

result.flag = cmd->flag;
result.v1.flag = cmd->flag;

if (cmd->cmd == FUZZY_CHECK) {
if (G_UNLIKELY (session->ctx->collection_mode)) {
result.prob = 0;
result.value = 500;
result.flag = 0;
result.v1.prob = 0;
result.v1.value = 500;
result.v1.flag = 0;
rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
}
else {
@@ -842,15 +854,15 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
}
else if (cmd->cmd == FUZZY_STAT) {
if (G_UNLIKELY (session->ctx->collection_mode)) {
result.prob = 0;
result.value = 500;
result.flag = 0;
result.v1.prob = 0;
result.v1.value = 500;
result.v1.flag = 0;
rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
}
else {
result.prob = 1.0;
result.value = 0;
result.flag = session->ctx->stat.fuzzy_hashes;
result.v1.prob = 1.0;
result.v1.value = 0;
result.v1.flag = session->ctx->stat.fuzzy_hashes;
rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
}
}
@@ -863,8 +875,8 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
hexbuf[sizeof (hexbuf) - 1] = '\0';

if (g_hash_table_lookup (session->ctx->skip_hashes, hexbuf)) {
result.value = 401;
result.prob = 0.0;
result.v1.value = 401;
result.v1.prob = 0.0;

goto reply;
}
@@ -893,12 +905,12 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
event_add (&up_req->io_ev, NULL);
}

result.value = 0;
result.prob = 1.0;
result.v1.value = 0;
result.v1.prob = 1.0;
}
else {
result.value = 403;
result.prob = 0.0;
result.v1.value = 403;
result.v1.prob = 0.0;
}
reply:
rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
@@ -912,6 +924,18 @@ rspamd_fuzzy_command_valid (struct rspamd_fuzzy_cmd *cmd, gint r)
enum rspamd_fuzzy_epoch ret = RSPAMD_FUZZY_EPOCH_MAX;

switch (cmd->version) {
case 4:
if (cmd->shingles_count > 0) {
if (r == sizeof (struct rspamd_fuzzy_shingle_cmd)) {
ret = RSPAMD_FUZZY_EPOCH11;
}
}
else {
if (r == sizeof (*cmd)) {
ret = RSPAMD_FUZZY_EPOCH11;
}
}
break;
case 3:
if (cmd->shingles_count > 0) {
if (r == sizeof (struct rspamd_fuzzy_shingle_cmd)) {

+ 55
- 11
src/libserver/fuzzy_backend_redis.c View File

@@ -396,13 +396,13 @@ rspamd_fuzzy_redis_shingles_callback (redisAsyncContext *c, gpointer r,

if (max_found > RSPAMD_SHINGLE_SIZE / 2) {
session->prob = ((float)max_found) / RSPAMD_SHINGLE_SIZE;
rep.prob = session->prob;
rep.v1.prob = session->prob;

g_assert (sel != NULL);

/* Prepare new check command */
rspamd_fuzzy_redis_session_free_args (session);
session->nargs = 4;
session->nargs = 5;
session->argv = g_malloc (sizeof (gchar *) * session->nargs);
session->argv_lens = g_malloc (sizeof (gsize) * session->nargs);

@@ -416,7 +416,10 @@ rspamd_fuzzy_redis_shingles_callback (redisAsyncContext *c, gpointer r,
session->argv_lens[2] = 1;
session->argv[3] = g_strdup ("F");
session->argv_lens[3] = 1;
session->argv[3] = g_strdup ("C");
session->argv_lens[3] = 1;
g_string_free (key, FALSE); /* Do not free underlying array */
memcpy (rep.digest, sel->digest, sizeof (rep.digest));

g_assert (session->ctx != NULL);
if (redisAsyncCommandArgv (session->ctx,
@@ -535,12 +538,12 @@ rspamd_fuzzy_redis_check_callback (redisAsyncContext *c, gpointer r,
if (c->err == 0) {
rspamd_upstream_ok (session->up);

if (reply->type == REDIS_REPLY_ARRAY && reply->elements == 2) {
if (reply->type == REDIS_REPLY_ARRAY && reply->elements >= 2) {
cur = reply->element[0];

if (cur->type == REDIS_REPLY_STRING) {
value = strtoul (cur->str, NULL, 10);
rep.value = value;
rep.v1.value = value;
found_elts ++;
}

@@ -548,12 +551,23 @@ rspamd_fuzzy_redis_check_callback (redisAsyncContext *c, gpointer r,

if (cur->type == REDIS_REPLY_STRING) {
value = strtoul (cur->str, NULL, 10);
rep.flag = value;
rep.v1.flag = value;
found_elts ++;
}

if (found_elts == 2) {
rep.prob = session->prob;
if (found_elts >= 2) {
rep.v1.prob = session->prob;
memcpy (rep.digest, session->cmd->digest, sizeof (rep.digest));
}

rep.ts = 0;

if (reply->elements > 2) {
cur = reply->element[2];

if (cur->type == REDIS_REPLY_STRING) {
rep.ts = strtoul (cur->str, NULL, 10);
}
}
}

@@ -619,7 +633,7 @@ rspamd_fuzzy_backend_check_redis (struct rspamd_fuzzy_backend *bk,
session->ev_base = rspamd_fuzzy_backend_event_base (bk);

/* First of all check digest */
session->nargs = 4;
session->nargs = 5;
session->argv = g_malloc (sizeof (gchar *) * session->nargs);
session->argv_lens = g_malloc (sizeof (gsize) * session->nargs);

@@ -633,6 +647,8 @@ rspamd_fuzzy_backend_check_redis (struct rspamd_fuzzy_backend *bk,
session->argv_lens[2] = 1;
session->argv[3] = g_strdup ("F");
session->argv_lens[3] = 1;
session->argv[4] = g_strdup ("C");
session->argv_lens[4] = 1;
g_string_free (key, FALSE); /* Do not free underlying array */

up = rspamd_upstream_get (backend->read_servers,
@@ -963,8 +979,9 @@ rspamd_fuzzy_update_append_command (struct rspamd_fuzzy_backend *bk,

if (cmd->cmd == FUZZY_WRITE) {
/*
* For each normal hash addition we do 3 redis commands:
* For each normal hash addition we do 5 redis commands:
* HSET <key> F <flag>
* HSETNX <key> C <time>
* HINCRBY <key> V <weight>
* EXPIRE <key> <expire>
* Where <key> is <prefix> || <digest>
@@ -997,6 +1014,33 @@ rspamd_fuzzy_update_append_command (struct rspamd_fuzzy_backend *bk,
return FALSE;
}

/* HSETNX */
klen = strlen (session->backend->redis_object) +
sizeof (cmd->digest) + 1;
key = g_string_sized_new (klen);
g_string_append (key, session->backend->redis_object);
g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
value = g_string_sized_new (30);
rspamd_printf_gstring (value, "%L", (gint64)rspamd_get_calendar_ticks ());
session->argv[cur_shift] = g_strdup ("HSETNX");
session->argv_lens[cur_shift++] = sizeof ("HSETNX") - 1;
session->argv[cur_shift] = key->str;
session->argv_lens[cur_shift++] = key->len;
session->argv[cur_shift] = g_strdup ("C");
session->argv_lens[cur_shift++] = sizeof ("C") - 1;
session->argv[cur_shift] = value->str;
session->argv_lens[cur_shift++] = value->len;
g_string_free (key, FALSE);
g_string_free (value, FALSE);

if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
4,
(const gchar **)&session->argv[cur_shift - 4],
&session->argv_lens[cur_shift - 4]) != REDIS_OK) {

return FALSE;
}

/* HINCRBY */
key = g_string_sized_new (klen);
g_string_append (key, session->backend->redis_object);
@@ -1282,8 +1326,8 @@ rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk,
}

if (cmd->cmd == FUZZY_WRITE) {
ncommands += 4;
nargs += 13;
ncommands += 5;
nargs += 17;

if (io_cmd->is_shingle) {
ncommands += RSPAMD_SHINGLE_SIZE;

+ 18
- 11
src/libserver/fuzzy_backend_sqlite.c View File

@@ -496,13 +496,16 @@ struct rspamd_fuzzy_reply
rspamd_fuzzy_backend_sqlite_check (struct rspamd_fuzzy_backend_sqlite *backend,
const struct rspamd_fuzzy_cmd *cmd, gint64 expire)
{
struct rspamd_fuzzy_reply rep = {0, 0, 0, 0.0};
struct rspamd_fuzzy_reply rep;
const struct rspamd_fuzzy_shingle_cmd *shcmd;
int rc;
gint64 timestamp;
gint64 shingle_values[RSPAMD_SHINGLE_SIZE], i, sel_id, cur_id,
cur_cnt, max_cnt;

memset (&rep, 0, sizeof (rep));
memcpy (rep.digest, cmd->digest, sizeof (rep.digest));

if (backend == NULL) {
return rep;
}
@@ -522,10 +525,10 @@ rspamd_fuzzy_backend_sqlite_check (struct rspamd_fuzzy_backend_sqlite *backend,
msg_debug_fuzzy_backend ("requested hash has been expired");
}
else {
rep.value = sqlite3_column_int64 (
rep.v1.value = sqlite3_column_int64 (
prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 0);
rep.prob = 1.0;
rep.flag = sqlite3_column_int (
rep.v1.prob = 1.0;
rep.v1.flag = sqlite3_column_int (
prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 2);
}
}
@@ -586,12 +589,12 @@ rspamd_fuzzy_backend_sqlite_check (struct rspamd_fuzzy_backend_sqlite *backend,

if (sel_id != -1) {
/* We have some id selected here */
rep.prob = (float)max_cnt / (float)RSPAMD_SHINGLE_SIZE;
rep.v1.prob = (float)max_cnt / (float)RSPAMD_SHINGLE_SIZE;

if (rep.prob > 0.5) {
if (rep.v1.prob > 0.5) {
msg_debug_fuzzy_backend (
"found fuzzy hash with probability %.2f",
rep.prob);
rep.v1.prob);
rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE,
RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID, sel_id);
if (rc == SQLITE_OK) {
@@ -602,13 +605,17 @@ rspamd_fuzzy_backend_sqlite_check (struct rspamd_fuzzy_backend_sqlite *backend,
/* Expire element */
msg_debug_fuzzy_backend (
"requested hash has been expired");
rep.prob = 0.0;
rep.v1.prob = 0.0;
}
else {
rep.value = sqlite3_column_int64 (
rep.ts = timestamp;
memcpy (rep.digest, sqlite3_column_blob (
prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt,
0), sizeof (rep.digest));
rep.v1.value = sqlite3_column_int64 (
prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt,
1);
rep.flag = sqlite3_column_int (
rep.v1.flag = sqlite3_column_int (
prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt,
3);
}
@@ -616,7 +623,7 @@ rspamd_fuzzy_backend_sqlite_check (struct rspamd_fuzzy_backend_sqlite *backend,
}
else {
/* Otherwise we assume that as error */
rep.value = 0;
rep.v1.value = 0;
}

rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend,

+ 10
- 2
src/libserver/fuzzy_wire.h View File

@@ -6,7 +6,7 @@
#include "shingles.h"
#include "cryptobox.h"

#define RSPAMD_FUZZY_VERSION 3
#define RSPAMD_FUZZY_VERSION 4
#define RSPAMD_FUZZY_KEYLEN 8

/* Commands for fuzzy storage */
@@ -24,6 +24,7 @@ enum rspamd_fuzzy_epoch {
RSPAMD_FUZZY_EPOCH8, /**< 0.8 till 0.9 */
RSPAMD_FUZZY_EPOCH9, /**< 0.9 + */
RSPAMD_FUZZY_EPOCH10, /**< 1.0+ encryption */
RSPAMD_FUZZY_EPOCH11, /**< 1.7+ extended reply */
RSPAMD_FUZZY_EPOCH_MAX
};

@@ -42,13 +43,20 @@ RSPAMD_PACKED(rspamd_fuzzy_shingle_cmd) {
struct rspamd_shingle sgl;
};

RSPAMD_PACKED(rspamd_fuzzy_reply) {
RSPAMD_PACKED(rspamd_fuzzy_reply_v1) {
gint32 value;
guint32 flag;
guint32 tag;
float prob;
};

RSPAMD_PACKED(rspamd_fuzzy_reply) {
struct rspamd_fuzzy_reply_v1 v1;
gchar digest[rspamd_cryptobox_HASHBYTES];
guint32 ts;
guchar reserved[12];
};

RSPAMD_PACKED(rspamd_fuzzy_encrypted_req_hdr) {
guchar magic[4];
guchar key_id[RSPAMD_FUZZY_KEYLEN];

Loading…
Cancel
Save