aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/fuzzy_storage.c52
-rw-r--r--src/fuzzy_storage.h1
-rw-r--r--src/plugins/fuzzy_check.c68
3 files changed, 105 insertions, 16 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index e05a479f6..c3a29e17f 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -62,6 +62,7 @@ static struct timeval tmv;
static struct event tev;
struct rspamd_fuzzy_node {
+ int32_t value;
fuzzy_hash_t h;
uint64_t time;
};
@@ -239,7 +240,7 @@ read_hashes_file (struct rspamd_worker *wrk)
return TRUE;
}
-static gboolean
+static int
process_check_command (struct fuzzy_cmd *cmd)
{
GList *cur;
@@ -248,7 +249,7 @@ process_check_command (struct fuzzy_cmd *cmd)
int prob = 0;
if (!bloom_check (bf, cmd->hash)) {
- return FALSE;
+ return 0;
}
memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe));
@@ -260,12 +261,38 @@ process_check_command (struct fuzzy_cmd *cmd)
h = cur->data;
if ((prob = fuzzy_compare_hashes (&h->h, &s)) > LEV_LIMIT) {
msg_info ("fuzzy hash was found, probability %d%%", prob);
- return TRUE;
+ return h->value;
}
cur = g_list_next (cur);
}
msg_debug ("fuzzy hash was NOT found, prob is %d%%", prob);
+ return 0;
+}
+
+static gboolean
+update_hash (struct fuzzy_cmd *cmd)
+{
+ GList *cur;
+ struct rspamd_fuzzy_node *h;
+ fuzzy_hash_t s;
+ int prob = 0;
+
+ memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe));
+ s.block_size = cmd->blocksize;
+ cur = hashes[cmd->blocksize % BUCKETS]->head;
+
+ /* XXX: too slow way */
+ while (cur) {
+ h = cur->data;
+ if ((prob = fuzzy_compare_hashes (&h->h, &s)) > LEV_LIMIT) {
+ h->value += cmd->value;
+ msg_info ("fuzzy hash was found, probability %d%%, set new value to %d", prob, h->value);
+ return TRUE;
+ }
+ cur = g_list_next (cur);
+ }
+
return FALSE;
}
@@ -275,7 +302,9 @@ process_write_command (struct fuzzy_cmd *cmd)
struct rspamd_fuzzy_node *h;
if (bloom_check (bf, cmd->hash)) {
- return FALSE;
+ if (update_hash (cmd)) {
+ return TRUE;
+ }
}
h = g_malloc (sizeof (struct rspamd_fuzzy_node));
@@ -343,9 +372,22 @@ else { \
static void
process_fuzzy_command (struct fuzzy_session *session)
{
+ int r;
+ char buf[64];
+
switch (session->cmd.cmd) {
case FUZZY_CHECK:
- CMD_PROCESS (check);
+ if ((r = process_check_command (&session->cmd))) {
+ r = snprintf (buf, sizeof (buf), "OK %d" CRLF, r);
+ if (sendto (session->fd, buf, r, 0, (struct sockaddr *)&session->sa, session->salen) == -1) {
+ msg_err ("error while writing reply: %s", strerror (errno));
+ }
+ }
+ else {
+ if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, (struct sockaddr *)&session->sa, session->salen) == -1) {
+ msg_err ("error while writing reply: %s", strerror (errno));
+ }
+ }
break;
case FUZZY_WRITE:
CMD_PROCESS (write);
diff --git a/src/fuzzy_storage.h b/src/fuzzy_storage.h
index 1630171e4..aa3d50133 100644
--- a/src/fuzzy_storage.h
+++ b/src/fuzzy_storage.h
@@ -13,6 +13,7 @@
struct fuzzy_cmd {
u_char cmd;
uint32_t blocksize;
+ int32_t value;
u_char hash[FUZZY_HASHLEN];
};
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index 2ac7475bf..987a2cb40 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -74,6 +74,7 @@ struct fuzzy_learn_session {
struct event ev;
fuzzy_hash_t *h;
int cmd;
+ int value;
int *saved;
struct timeval tv;
struct controller_session *session;
@@ -218,6 +219,7 @@ fuzzy_check_module_reconfig (struct config_file *cfg)
return fuzzy_check_module_config (cfg);
}
+/* Finalize IO */
static void
fuzzy_io_fin (void *ud)
{
@@ -232,12 +234,14 @@ fuzzy_io_fin (void *ud)
}
}
+/* Call this whenever we got data from fuzzy storage */
static void
fuzzy_io_callback (int fd, short what, void *arg)
{
struct fuzzy_client_session *session = arg;
struct fuzzy_cmd cmd;
- char buf[sizeof ("ERR")];
+ char buf[62], *err_str;
+ int value;
if (what == EV_WRITE) {
/* Send command to storage */
@@ -253,11 +257,16 @@ fuzzy_io_callback (int fd, short what, void *arg)
}
}
else if (what == EV_READ) {
+ /* Got reply */
if (read (fd, buf, sizeof (buf)) == -1) {
goto err;
}
else if (buf[0] == 'O' && buf[1] == 'K') {
- insert_result (session->task, fuzzy_module_ctx->metric, fuzzy_module_ctx->symbol, 1, NULL);
+ /* Now try to get value */
+ value = strtol (buf + 3, &err_str, 10);
+ *err_str = '\0';
+ insert_result (session->task, fuzzy_module_ctx->metric, fuzzy_module_ctx->symbol, value, g_list_prepend (NULL,
+ memory_pool_strdup (session->task->task_pool, buf + 3)));
}
goto ok;
}
@@ -289,13 +298,15 @@ fuzzy_learn_callback (int fd, short what, void *arg)
{
struct fuzzy_learn_session *session = arg;
struct fuzzy_cmd cmd;
- char buf[sizeof ("ERR" CRLF)];
+ char buf[64];
+ int r;
if (what == EV_WRITE) {
/* Send command to storage */
cmd.blocksize = session->h->block_size;
memcpy (cmd.hash, session->h->hash_pipe, sizeof (cmd.hash));
cmd.cmd = session->cmd;
+ cmd.value = session->value;
if (write (fd, &cmd, sizeof (struct fuzzy_cmd)) == -1) {
goto err;
}
@@ -308,18 +319,26 @@ fuzzy_learn_callback (int fd, short what, void *arg)
if (read (fd, buf, sizeof (buf)) == -1) {
goto err;
}
- goto ok;
+ else if (buf[0] == 'O' && buf[1] == 'K') {
+ r = snprintf (buf, sizeof (buf), "OK" CRLF);
+ rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE);
+ goto ok;
+ }
+ goto err;
}
return;
err:
msg_err ("got error in IO with server %s:%d, %d, %s", session->server->name, session->server->port, errno, strerror (errno));
+ r = snprintf (buf, sizeof (buf), "Error" CRLF);
+ rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE);
ok:
close (fd);
remove_normal_event (session->session->s, fuzzy_learn_fin, session);
}
+/* This callback is called when we check message via fuzzy hashes storage */
static void
fuzzy_symbol_callback (struct worker_task *task, void *unused)
{
@@ -337,6 +356,7 @@ fuzzy_symbol_callback (struct worker_task *task, void *unused)
cur = g_list_next (cur);
continue;
}
+ /* Get upstream */
selected = (struct storage_server *)get_upstream_by_hash (fuzzy_module_ctx->servers, fuzzy_module_ctx->servers_num,
sizeof (struct storage_server), task->ts.tv_sec,
DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME, DEFAULT_UPSTREAM_MAXERRORS, part->fuzzy->hash_pipe, sizeof (part->fuzzy->hash_pipe));
@@ -345,6 +365,7 @@ fuzzy_symbol_callback (struct worker_task *task, void *unused)
msg_warn ("cannot connect to %s, %d, %s", selected->name, errno, strerror (errno));
}
else {
+ /* Create session for a socket */
session = memory_pool_alloc (task->task_pool, sizeof (struct fuzzy_client_session));
event_set (&session->ev, sock, EV_WRITE, fuzzy_io_callback, session);
session->tv.tv_sec = IO_TIMEOUT;
@@ -370,19 +391,26 @@ fuzzy_process_handler (struct controller_session *session, f_str_t * in)
struct mime_text_part *part;
struct storage_server *selected;
GList *cur;
- int sock, r, cmd = 0, *saved;
+ int sock, r, cmd = 0, value = 0, *saved, *sargs;
char out_buf[BUFSIZ];
+ /* Extract arguments */
if (session->other_data) {
- cmd = GPOINTER_TO_SIZE (session->other_data);
+ sargs = session->other_data;
+ cmd = sargs[0];
+ value = sargs[1];
}
+
+ /* Prepare task */
task = construct_task (session->worker);
session->other_data = task;
session->state = STATE_WAIT;
-
+
+ /* Allocate message from string */
task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
task->msg->begin = in->begin;
task->msg->len = in->len;
+
saved = memory_pool_alloc0 (session->session_pool, sizeof (int));
r = process_message (task);
@@ -404,10 +432,12 @@ fuzzy_process_handler (struct controller_session *session, f_str_t * in)
cur = g_list_next (cur);
continue;
}
+ /* Get upstream */
selected = (struct storage_server *)get_upstream_by_hash (fuzzy_module_ctx->servers, fuzzy_module_ctx->servers_num,
sizeof (struct storage_server), task->ts.tv_sec,
DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME, DEFAULT_UPSTREAM_MAXERRORS, part->fuzzy->hash_pipe, sizeof (part->fuzzy->hash_pipe));
if (selected) {
+ /* Create UDP socket */
if ((sock = make_udp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) {
msg_warn ("cannot connect to %s, %d, %s", selected->name, errno, strerror (errno));
session->state = STATE_REPLY;
@@ -417,6 +447,7 @@ fuzzy_process_handler (struct controller_session *session, f_str_t * in)
return;
}
else {
+ /* Socket is made, create session */
s = memory_pool_alloc (session->session_pool, sizeof (struct fuzzy_learn_session));
event_set (&s->ev, sock, EV_WRITE, fuzzy_learn_callback, s);
s->tv.tv_sec = IO_TIMEOUT;
@@ -427,6 +458,7 @@ fuzzy_process_handler (struct controller_session *session, f_str_t * in)
s->session = session;
s->server = selected;
s->cmd = cmd;
+ s->value = value;
s->saved = saved;
event_add (&s->ev, &s->tv);
(*saved)++;
@@ -434,6 +466,7 @@ fuzzy_process_handler (struct controller_session *session, f_str_t * in)
}
}
else {
+ /* Cannot write hash */
session->state = STATE_REPLY;
r = snprintf (out_buf, sizeof (out_buf), "cannot write fuzzy hash" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
@@ -457,9 +490,10 @@ fuzzy_controller_handler (char **args, struct controller_session *session, int c
{
char *arg, out_buf[BUFSIZ], *err_str;
uint32_t size;
- int r;
+ int r, value, *sargs;
- arg = *args;
+ /* Process size */
+ arg = args[0];
if (!arg || *arg == '\0') {
msg_info ("empty content length");
r = snprintf (out_buf, sizeof (out_buf), "fuzzy command requires length as argument" CRLF);
@@ -467,7 +501,6 @@ fuzzy_controller_handler (char **args, struct controller_session *session, int c
session->state = STATE_REPLY;
return;
}
-
size = strtoul (arg, &err_str, 10);
if (err_str && *err_str != '\0') {
r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF);
@@ -475,11 +508,24 @@ fuzzy_controller_handler (char **args, struct controller_session *session, int c
session->state = STATE_REPLY;
return;
}
+ /* Process value */
+ arg = args[1];
+ if (!arg || *arg == '\0') {
+ msg_info ("empty value, assume it 1");
+ value = 1;
+ }
+ else {
+ value = strtol (arg, &err_str, 10);
+ }
session->state = STATE_OTHER;
rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_CHARACTER, size);
session->other_handler = fuzzy_process_handler;
- session->other_data = GSIZE_TO_POINTER (cmd);
+ /* Prepare args */
+ sargs = memory_pool_alloc (session->session_pool, sizeof (int) * 2);
+ sargs[0] = cmd;
+ sargs[1] = value;
+ session->other_data = sargs;
}
static void