diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/fuzzy_storage.c | 153 | ||||
-rw-r--r-- | src/fuzzy_storage.h | 1 | ||||
-rw-r--r-- | src/libserver/fuzzy_backend.c | 33 |
3 files changed, 150 insertions, 37 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index e4b0b4e6c..921f17964 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -75,11 +75,6 @@ worker_t fuzzy_worker = { SOCK_DGRAM /* UDP socket */ }; -static GHashTable *static_hash; -static rspamd_bloom_filter_t *bf; - -/* Number of cache modifications */ -static guint32 mods = 0; /* For evtimer */ static struct timeval tmv; static struct event tev; @@ -106,7 +101,7 @@ struct rspamd_legacy_fuzzy_node { struct fuzzy_session { struct rspamd_worker *worker; - struct rspamd_fuzzy_cmd cmd; + struct rspamd_fuzzy_cmd *cmd; gint fd; guint64 time; gboolean legacy; @@ -143,18 +138,112 @@ sigusr2_handler (void *arg) ctx = worker->ctx; } -/* - * MDB Interface - */ +static gboolean +rspamd_fuzzy_check_client (struct fuzzy_session *session) +{ + if (session->ctx->update_ips != NULL) { + if (radix_find_compressed_addr (session->ctx->update_ips, + &session->addr) == RADIX_NO_VALUE) { + return FALSE; + } + } + return TRUE; +} + +static void +rspamd_fuzzy_write_reply (struct fuzzy_session *session, + struct rspamd_fuzzy_reply *rep) +{ + gint r; + gchar buf[64]; + + if (session->legacy) { + if (rep->prob > 0.5) { + if (session->cmd->cmd == FUZZY_CHECK) { + r = rspamd_snprintf (buf, sizeof (buf), "OK %d %d" CRLF, + rep->value, rep->flag); + } + else { + r = rspamd_snprintf (buf, sizeof (buf), "OK" CRLF); + } + + } + else { + r = rspamd_snprintf (buf, sizeof (buf), "ERR" CRLF); + } + r = sendto (session->fd, buf, r, 0, &session->addr.addr.sa, + session->addr.slen); + } + else { + r = sendto (session->fd, rep, sizeof (*rep), 0, &session->addr.addr.sa, + session->addr.slen); + } + + if (r == -1) { + if (errno == EINTR) { + rspamd_fuzzy_write_reply (session, rep); + } + else { + msg_err ("error while writing reply: %s", strerror (errno)); + } + } +} static void rspamd_fuzzy_process_command (struct fuzzy_session *session) { - struct rspamd_fuzzy_reply rep; - guint64 value; - int rc, match = 0, i; + struct rspamd_fuzzy_reply rep = {0, 0, 0.0}; + gboolean res = FALSE; + + if (session->cmd->cmd == FUZZY_CHECK) { + rep = rspamd_fuzzy_backend_check (session->ctx->backend, session->cmd, + session->ctx->expire); + } + else { + if (rspamd_fuzzy_check_client (session)) { + if (session->cmd->cmd == FUZZY_WRITE) { + res = rspamd_fuzzy_backend_add (session->ctx->backend, + session->cmd); + } + else { + res = rspamd_fuzzy_backend_del (session->ctx->backend, + session->cmd); + } + if (!res) { + rep.value = 404; + rep.prob = 0.0; + } + else { + rep.value = 0; + rep.prob = 1.0; + } + } + else { + rep.value = 403; + rep.prob = 0.0; + } + } + + rspamd_fuzzy_write_reply (session, &rep); } + +static gboolean +rspamd_fuzzy_command_valid (struct rspamd_fuzzy_cmd *cmd, gint r) +{ + if (cmd->version == RSPAMD_FUZZY_VERSION) { + if (cmd->shingles_count > 0) { + if (r == sizeof (struct rspamd_fuzzy_shingle_cmd)) { + return TRUE; + } + } + else { + return (r == sizeof (*cmd)); + } + } + + return FALSE; +} /* * Accept new connection and construct task */ @@ -164,15 +253,10 @@ accept_fuzzy_socket (gint fd, short what, void *arg) struct rspamd_worker *worker = (struct rspamd_worker *)arg; struct rspamd_fuzzy_storage_ctx *ctx; struct fuzzy_session session; - ssize_t r; - struct rspamd_fuzzy_reply rep; - struct { - u_char cmd; - guint32 blocksize; - gint32 value; - u_char hash[FUZZY_HASHLEN]; - } legacy_cmd; + gint r; guint8 buf[2048]; + struct rspamd_fuzzy_cmd *cmd = NULL, lcmd; + struct legacy_fuzzy_cmd *l; ctx = worker->ctx; session.worker = worker; @@ -194,14 +278,32 @@ accept_fuzzy_socket (gint fd, short what, void *arg) return; } session.addr.af = session.addr.addr.sa.sa_family; - if (r == sizeof (struct legacy_fuzzy_cmd)) { - /* Old command */ + if ((guint)r == sizeof (struct legacy_fuzzy_cmd)) { + session.legacy = TRUE; + l = (struct legacy_fuzzy_cmd *)buf; + lcmd.version = 2; + memcpy (lcmd.digest, l->hash, sizeof (lcmd.digest)); + lcmd.cmd = l->cmd; + lcmd.flag = l->flag; + lcmd.shingles_count = 0; + lcmd.value = l->value; + cmd = &lcmd; } - else if (r == sizeof (struct rspamd_fuzzy_cmd)) { - /* New command */ + else if ((guint)r >= sizeof (struct rspamd_fuzzy_cmd)) { + /* Check shingles count sanity */ + cmd = (struct rspamd_fuzzy_cmd *)buf; + if (!rspamd_fuzzy_command_valid (cmd, r)) { + /* Bad input */ + msg_debug ("invalid fuzzy command of size %d received", r); + } } else { /* Discard input */ + msg_debug ("invalid fuzzy command of size %d received", r); + } + if (cmd != NULL) { + session.cmd = cmd; + rspamd_fuzzy_process_command (&session); } } } @@ -222,6 +324,7 @@ sync_callback (gint fd, short what, void *arg) evtimer_add (&tev, &tmv); /* Call backend sync */ + rspamd_fuzzy_backend_sync (ctx->backend); } gpointer @@ -341,6 +444,8 @@ start_fuzzy (struct rspamd_worker *worker) event_base_loop (ctx->ev_base, 0); + rspamd_fuzzy_backend_sync (ctx->backend); + rspamd_fuzzy_backend_close (ctx->backend); rspamd_log_close (rspamd_main->logger); exit (EXIT_SUCCESS); } diff --git a/src/fuzzy_storage.h b/src/fuzzy_storage.h index 5c659a7b9..1d1b05edb 100644 --- a/src/fuzzy_storage.h +++ b/src/fuzzy_storage.h @@ -37,6 +37,7 @@ struct rspamd_fuzzy_shingle_cmd { struct rspamd_fuzzy_reply { guint32 value; + guint32 flag; gdouble prob; }; diff --git a/src/libserver/fuzzy_backend.c b/src/libserver/fuzzy_backend.c index 9556e0447..2df970543 100644 --- a/src/libserver/fuzzy_backend.c +++ b/src/libserver/fuzzy_backend.c @@ -60,7 +60,7 @@ const char *create_tables_sql = "COMMIT;"; const char *create_index_sql = "BEGIN;" - "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest, flag);" + "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);" "CREATE UNIQUE INDEX IF NOT EXISTS s ON shingles(value, number);" "COMMIT;"; enum rspamd_fuzzy_statement_idx { @@ -115,8 +115,8 @@ static struct rspamd_fuzzy_stmts { { .idx = RSPAMD_FUZZY_BACKEND_UPDATE, .sql = "UPDATE digests SET value = value + ?1 WHERE " - "digest==?2 AND flag==?3;", - .args = "IDI", + "digest==?2;", + .args = "ID", .stmt = NULL, .result = SQLITE_DONE }, @@ -130,8 +130,8 @@ static struct rspamd_fuzzy_stmts { }, { .idx = RSPAMD_FUZZY_BACKEND_CHECK, - .sql = "SELECT value, time FROM digests WHERE digest==?1 AND flag==?2;", - .args = "DS", + .sql = "SELECT value, time, flag FROM digests WHERE digest==?1;", + .args = "D", .stmt = NULL, .result = SQLITE_ROW }, @@ -144,8 +144,8 @@ static struct rspamd_fuzzy_stmts { }, { .idx = RSPAMD_FUZZY_BACKEND_DELETE, - .sql = "DELETE FROM digests WHERE digest==?1 AND flag==?2;", - .args = "DS", + .sql = "DELETE FROM digests WHERE digest==?1;", + .args = "D", .stmt = NULL, .result = SQLITE_DONE } @@ -237,6 +237,10 @@ rspamd_fuzzy_backend_run_stmt (struct rspamd_fuzzy_backend *bk, int idx, ...) if (retcode == prepared_stmts[idx].result) { return SQLITE_OK; } + else { + msg_debug ("failed to execute query %s: %s", prepared_stmts[idx].sql, + sqlite3_errmsg (bk->db)); + } return retcode; } @@ -487,27 +491,30 @@ struct rspamd_fuzzy_reply rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *backend, const struct rspamd_fuzzy_cmd *cmd, gint64 expire) { - struct rspamd_fuzzy_reply rep = {0, 0.0}; + struct rspamd_fuzzy_reply rep = {0, 0, 0.0}; const struct rspamd_fuzzy_shingle_cmd *shcmd; int rc; gint64 timestamp; /* Try direct match first of all */ rc = rspamd_fuzzy_backend_run_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK, - cmd->digest, cmd->flag); + cmd->digest); if (rc == SQLITE_OK) { timestamp = sqlite3_column_int64 ( prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 1); if (time (NULL) - timestamp > expire) { /* Expire element */ + msg_debug ("requested hash has been expired"); rspamd_fuzzy_backend_run_stmt (backend, RSPAMD_FUZZY_BACKEND_DELETE, - cmd->digest, cmd->flag); + cmd->digest, (gint)cmd->flag); } else { rep.value = sqlite3_column_int64 ( prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 0); rep.prob = 1.0; + rep.flag = sqlite3_column_int ( + prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 2); } } else if (cmd->shingles_count > 0) { @@ -524,12 +531,12 @@ rspamd_fuzzy_backend_add (struct rspamd_fuzzy_backend *backend, int rc; rc = rspamd_fuzzy_backend_run_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK, - cmd->digest, cmd->flag); + cmd->digest); if (rc == SQLITE_OK) { /* We need to increase weight */ rc = rspamd_fuzzy_backend_run_stmt (backend, RSPAMD_FUZZY_BACKEND_UPDATE, - (gint64)cmd->value, cmd->digest, (gint)cmd->flag); + (gint64)cmd->value, cmd->digest); } else { rc = rspamd_fuzzy_backend_run_stmt (backend, RSPAMD_FUZZY_BACKEND_INSERT, @@ -551,7 +558,7 @@ rspamd_fuzzy_backend_del (struct rspamd_fuzzy_backend *backend, int rc; rc = rspamd_fuzzy_backend_run_stmt (backend, RSPAMD_FUZZY_BACKEND_DELETE, - cmd->digest, cmd->flag); + cmd->digest); return (rc == SQLITE_OK); } |