SOCK_DGRAM /* UDP socket */
};
-static GHashTable *static_hash;
-static rspamd_bloom_filter_t *bf;
-
-/* Number of cache modifications */
-static guint32 mods = 0;
/* For evtimer */
static struct timeval tmv;
static struct event tev;
struct fuzzy_session {
struct rspamd_worker *worker;
- struct rspamd_fuzzy_cmd cmd;
+ struct rspamd_fuzzy_cmd *cmd;
gint fd;
guint64 time;
gboolean legacy;
ctx = worker->ctx;
}
-/*
- * MDB Interface
- */
+static gboolean
+rspamd_fuzzy_check_client (struct fuzzy_session *session)
+{
+ if (session->ctx->update_ips != NULL) {
+ if (radix_find_compressed_addr (session->ctx->update_ips,
+ &session->addr) == RADIX_NO_VALUE) {
+ return FALSE;
+ }
+ }
+ return TRUE;
+}
+
+static void
+rspamd_fuzzy_write_reply (struct fuzzy_session *session,
+ struct rspamd_fuzzy_reply *rep)
+{
+ gint r;
+ gchar buf[64];
+
+ if (session->legacy) {
+ if (rep->prob > 0.5) {
+ if (session->cmd->cmd == FUZZY_CHECK) {
+ r = rspamd_snprintf (buf, sizeof (buf), "OK %d %d" CRLF,
+ rep->value, rep->flag);
+ }
+ else {
+ r = rspamd_snprintf (buf, sizeof (buf), "OK" CRLF);
+ }
+
+ }
+ else {
+ r = rspamd_snprintf (buf, sizeof (buf), "ERR" CRLF);
+ }
+ r = sendto (session->fd, buf, r, 0, &session->addr.addr.sa,
+ session->addr.slen);
+ }
+ else {
+ r = sendto (session->fd, rep, sizeof (*rep), 0, &session->addr.addr.sa,
+ session->addr.slen);
+ }
+
+ if (r == -1) {
+ if (errno == EINTR) {
+ rspamd_fuzzy_write_reply (session, rep);
+ }
+ else {
+ msg_err ("error while writing reply: %s", strerror (errno));
+ }
+ }
+}
static void
rspamd_fuzzy_process_command (struct fuzzy_session *session)
{
- struct rspamd_fuzzy_reply rep;
- guint64 value;
- int rc, match = 0, i;
+ struct rspamd_fuzzy_reply rep = {0, 0, 0.0};
+ gboolean res = FALSE;
+
+ if (session->cmd->cmd == FUZZY_CHECK) {
+ rep = rspamd_fuzzy_backend_check (session->ctx->backend, session->cmd,
+ session->ctx->expire);
+ }
+ else {
+ if (rspamd_fuzzy_check_client (session)) {
+ if (session->cmd->cmd == FUZZY_WRITE) {
+ res = rspamd_fuzzy_backend_add (session->ctx->backend,
+ session->cmd);
+ }
+ else {
+ res = rspamd_fuzzy_backend_del (session->ctx->backend,
+ session->cmd);
+ }
+ if (!res) {
+ rep.value = 404;
+ rep.prob = 0.0;
+ }
+ else {
+ rep.value = 0;
+ rep.prob = 1.0;
+ }
+ }
+ else {
+ rep.value = 403;
+ rep.prob = 0.0;
+ }
+ }
+
+ rspamd_fuzzy_write_reply (session, &rep);
}
+
+static gboolean
+rspamd_fuzzy_command_valid (struct rspamd_fuzzy_cmd *cmd, gint r)
+{
+ if (cmd->version == RSPAMD_FUZZY_VERSION) {
+ if (cmd->shingles_count > 0) {
+ if (r == sizeof (struct rspamd_fuzzy_shingle_cmd)) {
+ return TRUE;
+ }
+ }
+ else {
+ return (r == sizeof (*cmd));
+ }
+ }
+
+ return FALSE;
+}
/*
* Accept new connection and construct task
*/
struct rspamd_worker *worker = (struct rspamd_worker *)arg;
struct rspamd_fuzzy_storage_ctx *ctx;
struct fuzzy_session session;
- ssize_t r;
- struct rspamd_fuzzy_reply rep;
- struct {
- u_char cmd;
- guint32 blocksize;
- gint32 value;
- u_char hash[FUZZY_HASHLEN];
- } legacy_cmd;
+ gint r;
guint8 buf[2048];
+ struct rspamd_fuzzy_cmd *cmd = NULL, lcmd;
+ struct legacy_fuzzy_cmd *l;
ctx = worker->ctx;
session.worker = worker;
return;
}
session.addr.af = session.addr.addr.sa.sa_family;
- if (r == sizeof (struct legacy_fuzzy_cmd)) {
- /* Old command */
+ if ((guint)r == sizeof (struct legacy_fuzzy_cmd)) {
+ session.legacy = TRUE;
+ l = (struct legacy_fuzzy_cmd *)buf;
+ lcmd.version = 2;
+ memcpy (lcmd.digest, l->hash, sizeof (lcmd.digest));
+ lcmd.cmd = l->cmd;
+ lcmd.flag = l->flag;
+ lcmd.shingles_count = 0;
+ lcmd.value = l->value;
+ cmd = &lcmd;
}
- else if (r == sizeof (struct rspamd_fuzzy_cmd)) {
- /* New command */
+ else if ((guint)r >= sizeof (struct rspamd_fuzzy_cmd)) {
+ /* Check shingles count sanity */
+ cmd = (struct rspamd_fuzzy_cmd *)buf;
+ if (!rspamd_fuzzy_command_valid (cmd, r)) {
+ /* Bad input */
+ msg_debug ("invalid fuzzy command of size %d received", r);
+ }
}
else {
/* Discard input */
+ msg_debug ("invalid fuzzy command of size %d received", r);
+ }
+ if (cmd != NULL) {
+ session.cmd = cmd;
+ rspamd_fuzzy_process_command (&session);
}
}
}
evtimer_add (&tev, &tmv);
/* Call backend sync */
+ rspamd_fuzzy_backend_sync (ctx->backend);
}
gpointer
event_base_loop (ctx->ev_base, 0);
+ rspamd_fuzzy_backend_sync (ctx->backend);
+ rspamd_fuzzy_backend_close (ctx->backend);
rspamd_log_close (rspamd_main->logger);
exit (EXIT_SUCCESS);
}
"COMMIT;";
const char *create_index_sql =
"BEGIN;"
- "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest, flag);"
+ "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);"
"CREATE UNIQUE INDEX IF NOT EXISTS s ON shingles(value, number);"
"COMMIT;";
enum rspamd_fuzzy_statement_idx {
{
.idx = RSPAMD_FUZZY_BACKEND_UPDATE,
.sql = "UPDATE digests SET value = value + ?1 WHERE "
- "digest==?2 AND flag==?3;",
- .args = "IDI",
+ "digest==?2;",
+ .args = "ID",
.stmt = NULL,
.result = SQLITE_DONE
},
},
{
.idx = RSPAMD_FUZZY_BACKEND_CHECK,
- .sql = "SELECT value, time FROM digests WHERE digest==?1 AND flag==?2;",
- .args = "DS",
+ .sql = "SELECT value, time, flag FROM digests WHERE digest==?1;",
+ .args = "D",
.stmt = NULL,
.result = SQLITE_ROW
},
},
{
.idx = RSPAMD_FUZZY_BACKEND_DELETE,
- .sql = "DELETE FROM digests WHERE digest==?1 AND flag==?2;",
- .args = "DS",
+ .sql = "DELETE FROM digests WHERE digest==?1;",
+ .args = "D",
.stmt = NULL,
.result = SQLITE_DONE
}
if (retcode == prepared_stmts[idx].result) {
return SQLITE_OK;
}
+ else {
+ msg_debug ("failed to execute query %s: %s", prepared_stmts[idx].sql,
+ sqlite3_errmsg (bk->db));
+ }
return retcode;
}
rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *backend,
const struct rspamd_fuzzy_cmd *cmd, gint64 expire)
{
- struct rspamd_fuzzy_reply rep = {0, 0.0};
+ struct rspamd_fuzzy_reply rep = {0, 0, 0.0};
const struct rspamd_fuzzy_shingle_cmd *shcmd;
int rc;
gint64 timestamp;
/* Try direct match first of all */
rc = rspamd_fuzzy_backend_run_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK,
- cmd->digest, cmd->flag);
+ cmd->digest);
if (rc == SQLITE_OK) {
timestamp = sqlite3_column_int64 (
prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 1);
if (time (NULL) - timestamp > expire) {
/* Expire element */
+ msg_debug ("requested hash has been expired");
rspamd_fuzzy_backend_run_stmt (backend, RSPAMD_FUZZY_BACKEND_DELETE,
- cmd->digest, cmd->flag);
+ cmd->digest, (gint)cmd->flag);
}
else {
rep.value = sqlite3_column_int64 (
prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 0);
rep.prob = 1.0;
+ rep.flag = sqlite3_column_int (
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 2);
}
}
else if (cmd->shingles_count > 0) {
int rc;
rc = rspamd_fuzzy_backend_run_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK,
- cmd->digest, cmd->flag);
+ cmd->digest);
if (rc == SQLITE_OK) {
/* We need to increase weight */
rc = rspamd_fuzzy_backend_run_stmt (backend, RSPAMD_FUZZY_BACKEND_UPDATE,
- (gint64)cmd->value, cmd->digest, (gint)cmd->flag);
+ (gint64)cmd->value, cmd->digest);
}
else {
rc = rspamd_fuzzy_backend_run_stmt (backend, RSPAMD_FUZZY_BACKEND_INSERT,
int rc;
rc = rspamd_fuzzy_backend_run_stmt (backend, RSPAMD_FUZZY_BACKEND_DELETE,
- cmd->digest, cmd->flag);
+ cmd->digest);
return (rc == SQLITE_OK);
}