Browse Source

Restore functions of fuzzy_check.

tags/0.8.0
Vsevolod Stakhov 9 years ago
parent
commit
084e2e2a39
3 changed files with 150 additions and 37 deletions
  1. 129
    24
      src/fuzzy_storage.c
  2. 1
    0
      src/fuzzy_storage.h
  3. 20
    13
      src/libserver/fuzzy_backend.c

+ 129
- 24
src/fuzzy_storage.c View File

@@ -75,11 +75,6 @@ worker_t fuzzy_worker = {
SOCK_DGRAM /* UDP socket */
};

static GHashTable *static_hash;
static rspamd_bloom_filter_t *bf;

/* Number of cache modifications */
static guint32 mods = 0;
/* For evtimer */
static struct timeval tmv;
static struct event tev;
@@ -106,7 +101,7 @@ struct rspamd_legacy_fuzzy_node {

struct fuzzy_session {
struct rspamd_worker *worker;
struct rspamd_fuzzy_cmd cmd;
struct rspamd_fuzzy_cmd *cmd;
gint fd;
guint64 time;
gboolean legacy;
@@ -143,18 +138,112 @@ sigusr2_handler (void *arg)
ctx = worker->ctx;
}

/*
* MDB Interface
*/
static gboolean
rspamd_fuzzy_check_client (struct fuzzy_session *session)
{
if (session->ctx->update_ips != NULL) {
if (radix_find_compressed_addr (session->ctx->update_ips,
&session->addr) == RADIX_NO_VALUE) {
return FALSE;
}
}
return TRUE;
}

static void
rspamd_fuzzy_write_reply (struct fuzzy_session *session,
struct rspamd_fuzzy_reply *rep)
{
gint r;
gchar buf[64];

if (session->legacy) {
if (rep->prob > 0.5) {
if (session->cmd->cmd == FUZZY_CHECK) {
r = rspamd_snprintf (buf, sizeof (buf), "OK %d %d" CRLF,
rep->value, rep->flag);
}
else {
r = rspamd_snprintf (buf, sizeof (buf), "OK" CRLF);
}

}
else {
r = rspamd_snprintf (buf, sizeof (buf), "ERR" CRLF);
}
r = sendto (session->fd, buf, r, 0, &session->addr.addr.sa,
session->addr.slen);
}
else {
r = sendto (session->fd, rep, sizeof (*rep), 0, &session->addr.addr.sa,
session->addr.slen);
}

if (r == -1) {
if (errno == EINTR) {
rspamd_fuzzy_write_reply (session, rep);
}
else {
msg_err ("error while writing reply: %s", strerror (errno));
}
}
}

static void
rspamd_fuzzy_process_command (struct fuzzy_session *session)
{
struct rspamd_fuzzy_reply rep;
guint64 value;
int rc, match = 0, i;
struct rspamd_fuzzy_reply rep = {0, 0, 0.0};
gboolean res = FALSE;

if (session->cmd->cmd == FUZZY_CHECK) {
rep = rspamd_fuzzy_backend_check (session->ctx->backend, session->cmd,
session->ctx->expire);
}
else {
if (rspamd_fuzzy_check_client (session)) {
if (session->cmd->cmd == FUZZY_WRITE) {
res = rspamd_fuzzy_backend_add (session->ctx->backend,
session->cmd);
}
else {
res = rspamd_fuzzy_backend_del (session->ctx->backend,
session->cmd);
}
if (!res) {
rep.value = 404;
rep.prob = 0.0;
}
else {
rep.value = 0;
rep.prob = 1.0;
}
}
else {
rep.value = 403;
rep.prob = 0.0;
}
}

rspamd_fuzzy_write_reply (session, &rep);
}


static gboolean
rspamd_fuzzy_command_valid (struct rspamd_fuzzy_cmd *cmd, gint r)
{
if (cmd->version == RSPAMD_FUZZY_VERSION) {
if (cmd->shingles_count > 0) {
if (r == sizeof (struct rspamd_fuzzy_shingle_cmd)) {
return TRUE;
}
}
else {
return (r == sizeof (*cmd));
}
}

return FALSE;
}
/*
* Accept new connection and construct task
*/
@@ -164,15 +253,10 @@ accept_fuzzy_socket (gint fd, short what, void *arg)
struct rspamd_worker *worker = (struct rspamd_worker *)arg;
struct rspamd_fuzzy_storage_ctx *ctx;
struct fuzzy_session session;
ssize_t r;
struct rspamd_fuzzy_reply rep;
struct {
u_char cmd;
guint32 blocksize;
gint32 value;
u_char hash[FUZZY_HASHLEN];
} legacy_cmd;
gint r;
guint8 buf[2048];
struct rspamd_fuzzy_cmd *cmd = NULL, lcmd;
struct legacy_fuzzy_cmd *l;

ctx = worker->ctx;
session.worker = worker;
@@ -194,14 +278,32 @@ accept_fuzzy_socket (gint fd, short what, void *arg)
return;
}
session.addr.af = session.addr.addr.sa.sa_family;
if (r == sizeof (struct legacy_fuzzy_cmd)) {
/* Old command */
if ((guint)r == sizeof (struct legacy_fuzzy_cmd)) {
session.legacy = TRUE;
l = (struct legacy_fuzzy_cmd *)buf;
lcmd.version = 2;
memcpy (lcmd.digest, l->hash, sizeof (lcmd.digest));
lcmd.cmd = l->cmd;
lcmd.flag = l->flag;
lcmd.shingles_count = 0;
lcmd.value = l->value;
cmd = &lcmd;
}
else if (r == sizeof (struct rspamd_fuzzy_cmd)) {
/* New command */
else if ((guint)r >= sizeof (struct rspamd_fuzzy_cmd)) {
/* Check shingles count sanity */
cmd = (struct rspamd_fuzzy_cmd *)buf;
if (!rspamd_fuzzy_command_valid (cmd, r)) {
/* Bad input */
msg_debug ("invalid fuzzy command of size %d received", r);
}
}
else {
/* Discard input */
msg_debug ("invalid fuzzy command of size %d received", r);
}
if (cmd != NULL) {
session.cmd = cmd;
rspamd_fuzzy_process_command (&session);
}
}
}
@@ -222,6 +324,7 @@ sync_callback (gint fd, short what, void *arg)
evtimer_add (&tev, &tmv);

/* Call backend sync */
rspamd_fuzzy_backend_sync (ctx->backend);
}

gpointer
@@ -341,6 +444,8 @@ start_fuzzy (struct rspamd_worker *worker)

event_base_loop (ctx->ev_base, 0);

rspamd_fuzzy_backend_sync (ctx->backend);
rspamd_fuzzy_backend_close (ctx->backend);
rspamd_log_close (rspamd_main->logger);
exit (EXIT_SUCCESS);
}

+ 1
- 0
src/fuzzy_storage.h View File

@@ -37,6 +37,7 @@ struct rspamd_fuzzy_shingle_cmd {

struct rspamd_fuzzy_reply {
guint32 value;
guint32 flag;
gdouble prob;
};


+ 20
- 13
src/libserver/fuzzy_backend.c View File

@@ -60,7 +60,7 @@ const char *create_tables_sql =
"COMMIT;";
const char *create_index_sql =
"BEGIN;"
"CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest, flag);"
"CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);"
"CREATE UNIQUE INDEX IF NOT EXISTS s ON shingles(value, number);"
"COMMIT;";
enum rspamd_fuzzy_statement_idx {
@@ -115,8 +115,8 @@ static struct rspamd_fuzzy_stmts {
{
.idx = RSPAMD_FUZZY_BACKEND_UPDATE,
.sql = "UPDATE digests SET value = value + ?1 WHERE "
"digest==?2 AND flag==?3;",
.args = "IDI",
"digest==?2;",
.args = "ID",
.stmt = NULL,
.result = SQLITE_DONE
},
@@ -130,8 +130,8 @@ static struct rspamd_fuzzy_stmts {
},
{
.idx = RSPAMD_FUZZY_BACKEND_CHECK,
.sql = "SELECT value, time FROM digests WHERE digest==?1 AND flag==?2;",
.args = "DS",
.sql = "SELECT value, time, flag FROM digests WHERE digest==?1;",
.args = "D",
.stmt = NULL,
.result = SQLITE_ROW
},
@@ -144,8 +144,8 @@ static struct rspamd_fuzzy_stmts {
},
{
.idx = RSPAMD_FUZZY_BACKEND_DELETE,
.sql = "DELETE FROM digests WHERE digest==?1 AND flag==?2;",
.args = "DS",
.sql = "DELETE FROM digests WHERE digest==?1;",
.args = "D",
.stmt = NULL,
.result = SQLITE_DONE
}
@@ -237,6 +237,10 @@ rspamd_fuzzy_backend_run_stmt (struct rspamd_fuzzy_backend *bk, int idx, ...)
if (retcode == prepared_stmts[idx].result) {
return SQLITE_OK;
}
else {
msg_debug ("failed to execute query %s: %s", prepared_stmts[idx].sql,
sqlite3_errmsg (bk->db));
}

return retcode;
}
@@ -487,27 +491,30 @@ struct rspamd_fuzzy_reply
rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *backend,
const struct rspamd_fuzzy_cmd *cmd, gint64 expire)
{
struct rspamd_fuzzy_reply rep = {0, 0.0};
struct rspamd_fuzzy_reply rep = {0, 0, 0.0};
const struct rspamd_fuzzy_shingle_cmd *shcmd;
int rc;
gint64 timestamp;

/* Try direct match first of all */
rc = rspamd_fuzzy_backend_run_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK,
cmd->digest, cmd->flag);
cmd->digest);

if (rc == SQLITE_OK) {
timestamp = sqlite3_column_int64 (
prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 1);
if (time (NULL) - timestamp > expire) {
/* Expire element */
msg_debug ("requested hash has been expired");
rspamd_fuzzy_backend_run_stmt (backend, RSPAMD_FUZZY_BACKEND_DELETE,
cmd->digest, cmd->flag);
cmd->digest, (gint)cmd->flag);
}
else {
rep.value = sqlite3_column_int64 (
prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 0);
rep.prob = 1.0;
rep.flag = sqlite3_column_int (
prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 2);
}
}
else if (cmd->shingles_count > 0) {
@@ -524,12 +531,12 @@ rspamd_fuzzy_backend_add (struct rspamd_fuzzy_backend *backend,
int rc;

rc = rspamd_fuzzy_backend_run_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK,
cmd->digest, cmd->flag);
cmd->digest);

if (rc == SQLITE_OK) {
/* We need to increase weight */
rc = rspamd_fuzzy_backend_run_stmt (backend, RSPAMD_FUZZY_BACKEND_UPDATE,
(gint64)cmd->value, cmd->digest, (gint)cmd->flag);
(gint64)cmd->value, cmd->digest);
}
else {
rc = rspamd_fuzzy_backend_run_stmt (backend, RSPAMD_FUZZY_BACKEND_INSERT,
@@ -551,7 +558,7 @@ rspamd_fuzzy_backend_del (struct rspamd_fuzzy_backend *backend,
int rc;

rc = rspamd_fuzzy_backend_run_stmt (backend, RSPAMD_FUZZY_BACKEND_DELETE,
cmd->digest, cmd->flag);
cmd->digest);

return (rc == SQLITE_OK);
}

Loading…
Cancel
Save