summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2009-07-31 19:42:21 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2009-07-31 19:42:21 +0400
commita9c4a7af9415b57397687bd3005dfcd96a01881d (patch)
tree3aba3246cfe4284ed40ed14c2e1a1d59518de30c /src
parentb227b759d6775b4ad3dc1a9739e16ed93e0e345e (diff)
downloadrspamd-a9c4a7af9415b57397687bd3005dfcd96a01881d.tar.gz
rspamd-a9c4a7af9415b57397687bd3005dfcd96a01881d.zip
* Make fuzzy storage working (tested checking, adding and deleting of fuzzy hashes from storage)
* Fix stupid bug in fuzzy distance calculations
Diffstat (limited to 'src')
-rw-r--r--src/controller.c5
-rw-r--r--src/fuzzy.c4
-rw-r--r--src/fuzzy_storage.c36
-rw-r--r--src/plugins/fuzzy_check.c86
4 files changed, 90 insertions, 41 deletions
diff --git a/src/controller.c b/src/controller.c
index a4c0a7bb4..30daa26fc 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -466,7 +466,7 @@ controller_read_socket (f_str_t *in, void *arg)
if (session->state == STATE_COMMAND) {
session->state = STATE_REPLY;
}
- if (session->state != STATE_LEARN) {
+ if (session->state != STATE_LEARN && session->state != STATE_OTHER) {
rspamd_dispatcher_write (session->dispatcher, END, sizeof (END) - 1, FALSE, TRUE);
}
@@ -639,7 +639,8 @@ void
register_custom_controller_command (const char *name, controller_func_t handler, gboolean privilleged, gboolean require_message)
{
struct custom_controller_command *cmd;
-
+
+ cmd = g_malloc (sizeof (struct custom_controller_command));
cmd->command = name;
cmd->handler = handler;
cmd->privilleged = privilleged;
diff --git a/src/fuzzy.c b/src/fuzzy.c
index 9e49649aa..cb595a9ab 100644
--- a/src/fuzzy.c
+++ b/src/fuzzy.c
@@ -268,7 +268,7 @@ fuzzy_compare_hashes (fuzzy_hash_t *h1, fuzzy_hash_t *h2)
/* If we have hashes of different size, input strings are too different */
if (h1->block_size != h2->block_size) {
- return 100;
+ return 0;
}
l1 = strlen (h1->hash_pipe);
@@ -279,7 +279,7 @@ fuzzy_compare_hashes (fuzzy_hash_t *h1, fuzzy_hash_t *h2)
}
res = lev_distance (h1->hash_pipe, l1, h2->hash_pipe, l2);
- res = (res * 100) / (l1 + l2);
+ res = 100 - (2 * res * 100) / (l1 + l2);
return res;
}
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index a0aa00e50..6abcbfa42 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -115,10 +115,10 @@ sync_cache (struct rspamd_worker *wrk)
node = cur->data;
if (now - node->time > expire) {
/* Remove expired item */
+ tmp = cur;
cur = g_list_next (cur);
- hashes->head = g_list_remove_link (hashes->head, cur);
+ g_queue_delete_link (hashes, tmp);
g_free (node);
- g_list_free1 (tmp);
continue;
}
if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) {
@@ -218,6 +218,7 @@ process_check_command (struct fuzzy_cmd *cmd)
GList *cur;
struct rspamd_fuzzy_node *h;
fuzzy_hash_t s;
+ int prob = 0;
memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe));
s.block_size = cmd->blocksize;
@@ -226,11 +227,13 @@ process_check_command (struct fuzzy_cmd *cmd)
/* XXX: too slow way */
while (cur) {
h = cur->data;
- if (fuzzy_compare_hashes (&h->h, &s) > LEV_LIMIT) {
+ if ((prob = fuzzy_compare_hashes (&h->h, &s)) > LEV_LIMIT) {
+ msg_info ("process_check_command: fuzzy hash was found, probability %d%%", prob);
return TRUE;
}
cur = g_list_next (cur);
}
+ msg_info ("process_check_command: fuzzy hash was NOT found, prob is %d%%", prob);
return FALSE;
}
@@ -246,6 +249,7 @@ process_write_command (struct fuzzy_cmd *cmd)
h->time = (uint64_t)time (NULL);
g_queue_push_head (hashes, h);
mods ++;
+ msg_info ("process_write_command: fuzzy hash was successfully added");
return TRUE;
}
@@ -253,9 +257,10 @@ process_write_command (struct fuzzy_cmd *cmd)
static gboolean
process_delete_command (struct fuzzy_cmd *cmd)
{
- GList *cur;
+ GList *cur, *tmp;
struct rspamd_fuzzy_node *h;
fuzzy_hash_t s;
+ gboolean res = FALSE;
memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe));
s.block_size = cmd->blocksize;
@@ -265,16 +270,19 @@ process_delete_command (struct fuzzy_cmd *cmd)
while (cur) {
h = cur->data;
if (fuzzy_compare_hashes (&h->h, &s) > LEV_LIMIT) {
- hashes->head = g_list_remove_link (hashes->head, cur);
g_free (h);
- g_list_free1 (cur);
+ tmp = cur;
+ cur = g_list_next (cur);
+ g_queue_delete_link (hashes, tmp);
+ msg_info ("process_delete_command: fuzzy hash was successfully deleted");
+ res = TRUE;
mods ++;
- return TRUE;
+ continue;
}
cur = g_list_next (cur);
}
- return FALSE;
+ return res;
}
#define CMD_PROCESS(x) \
@@ -346,7 +354,7 @@ fuzzy_io_callback (int fd, short what, void *arg)
* Accept new connection and construct task
*/
static void
-accept_socket (int fd, short what, void *arg)
+accept_fuzzy_socket (int fd, short what, void *arg)
{
struct rspamd_worker *worker = (struct rspamd_worker *)arg;
struct sockaddr_storage ss;
@@ -356,21 +364,21 @@ accept_socket (int fd, short what, void *arg)
int nfd;
if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
- msg_warn ("accept_socket: accept failed: %s", strerror (errno));
+ msg_warn ("accept_fuzzy_socket: accept failed: %s", strerror (errno));
return;
}
/* Check for EAGAIN */
if (nfd == 0) {
- msg_debug ("accept_socket: cannot accept socket as it was already accepted by other worker");
+ msg_debug ("accept_fuzzy_socket: cannot accept socket as it was already accepted by other worker");
return;
}
if (ss.ss_family == AF_UNIX) {
- msg_info ("accept_socket: accepted connection from unix socket");
+ msg_info ("accept_fuzzy_socket: accepted connection from unix socket");
}
else if (ss.ss_family == AF_INET) {
sin = (struct sockaddr_in *) &ss;
- msg_info ("accept_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port));
+ msg_info ("accept_fuzzy_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port));
}
session = g_malloc (sizeof (struct fuzzy_session));
@@ -439,7 +447,7 @@ start_fuzzy_storage (struct rspamd_worker *worker)
evtimer_add (&tev, &tmv);
/* Accept event */
- event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+ event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_fuzzy_socket, (void *)worker);
event_add(&worker->bind_ev, NULL);
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index e4fdaba93..c016530d0 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -46,6 +46,7 @@
#define DEFAULT_PORT 11335
struct storage_server {
+ struct upstream up;
char *name;
struct in_addr addr;
uint16_t port;
@@ -71,8 +72,10 @@ struct fuzzy_client_session {
struct fuzzy_learn_session {
struct event ev;
fuzzy_hash_t *h;
+ int cmd;
struct timeval tv;
struct controller_session *session;
+ struct storage_server *server;
struct worker_task *task;
};
@@ -80,11 +83,13 @@ static struct fuzzy_ctx *fuzzy_module_ctx = NULL;
static int fuzzy_mime_filter (struct worker_task *task);
static void fuzzy_symbol_callback (struct worker_task *task, void *unused);
+static void fuzzy_add_handler (char **args, struct controller_session *session);
+static void fuzzy_delete_handler (char **args, struct controller_session *session);
static void
parse_servers_string (char *str)
{
- char **strvec, *p, portbuf[5], *name;
+ char **strvec, *p, portbuf[6], *name;
int num, i, j, port;
struct hostent *hent;
struct in_addr addr;
@@ -94,14 +99,15 @@ parse_servers_string (char *str)
fuzzy_module_ctx->servers = memory_pool_alloc0 (fuzzy_module_ctx->fuzzy_pool, sizeof (struct storage_server) * num);
- for (i = 0; i <= num; i ++) {
+ for (i = 0; i < num; i ++) {
g_strstrip (strvec[i]);
if ((p = strchr (strvec[i], ':')) != NULL) {
j = 0;
p ++;
- while (g_ascii_isdigit (*p) && j < sizeof (portbuf) - 1) {
- portbuf[j ++] = *p ++;
+ while (g_ascii_isdigit (*(p + j)) && j < sizeof (portbuf) - 1) {
+ portbuf[j] = *(p + j);
+ j ++;
}
portbuf[j] = '\0';
port = atoi (portbuf);
@@ -110,8 +116,8 @@ parse_servers_string (char *str)
/* Default http port */
port = DEFAULT_PORT;
}
- name = memory_pool_alloc (fuzzy_module_ctx->fuzzy_pool, p - strvec[i] + 1);
- g_strlcpy (name, strvec[i], p - strvec[i] + 1);
+ name = memory_pool_alloc (fuzzy_module_ctx->fuzzy_pool, p - strvec[i]);
+ g_strlcpy (name, strvec[i], p - strvec[i]);
if (!inet_aton (name, &addr)) {
/* Resolve using dns */
hent = gethostbyname (name);
@@ -162,21 +168,21 @@ fuzzy_check_module_config (struct config_file *cfg)
struct metric *metric;
double *w;
- if ((value = get_module_opt (cfg, "fuzzy", "metric")) != NULL) {
+ if ((value = get_module_opt (cfg, "fuzzy_check", "metric")) != NULL) {
fuzzy_module_ctx->metric = memory_pool_strdup (fuzzy_module_ctx->fuzzy_pool, value);
g_free (value);
}
else {
fuzzy_module_ctx->metric = DEFAULT_METRIC;
}
- if ((value = get_module_opt (cfg, "fuzzy", "symbol")) != NULL) {
+ if ((value = get_module_opt (cfg, "fuzzy_check", "symbol")) != NULL) {
fuzzy_module_ctx->symbol = memory_pool_strdup (fuzzy_module_ctx->fuzzy_pool, value);
g_free (value);
}
else {
fuzzy_module_ctx->symbol = DEFAULT_SYMBOL;
}
- if ((value = get_module_opt (cfg, "fuzzy", "servers")) != NULL) {
+ if ((value = get_module_opt (cfg, "fuzzy_check", "servers")) != NULL) {
parse_servers_string (value);
}
@@ -195,6 +201,9 @@ fuzzy_check_module_config (struct config_file *cfg)
register_symbol (&metric->cache, fuzzy_module_ctx->symbol, *w, fuzzy_symbol_callback, NULL);
}
+ register_custom_controller_command ("fuzzy_add", fuzzy_add_handler, TRUE, TRUE);
+ register_custom_controller_command ("fuzzy_del", fuzzy_delete_handler, TRUE, TRUE);
+
return res;
}
@@ -258,18 +267,19 @@ fuzzy_learn_callback (int fd, short what, void *arg)
{
struct fuzzy_learn_session *session = arg;
struct fuzzy_cmd cmd;
- char buf[sizeof ("ERR")];
+ char buf[sizeof ("ERR" CRLF)];
+ int r;
if (what == EV_WRITE) {
/* Send command to storage */
cmd.blocksize = session->h->block_size;
memcpy (cmd.hash, session->h->hash_pipe, sizeof (cmd.hash));
- cmd.cmd = FUZZY_WRITE;
+ cmd.cmd = session->cmd;
if (write (fd, &cmd, sizeof (struct fuzzy_cmd)) == -1) {
goto err;
}
else {
- event_set (&session->ev, fd, EV_READ, fuzzy_io_callback, session);
+ event_set (&session->ev, fd, EV_READ, fuzzy_learn_callback, session);
event_add (&session->ev, &session->tv);
}
}
@@ -277,24 +287,22 @@ fuzzy_learn_callback (int fd, short what, void *arg)
if (read (fd, buf, sizeof (buf)) == -1) {
goto err;
}
- else if (buf[0] == 'O' && buf[1] == 'K') {
- insert_result (session->task, fuzzy_module_ctx->metric, fuzzy_module_ctx->symbol, 1, NULL);
- }
goto ok;
}
return;
err:
- msg_err ("fuzzy_io_callback: got error on IO, %d, %s", errno, strerror (errno));
+ msg_err ("fuzzy_learn_callback: got error in IO with server %s:%d, %d, %s", session->server->name,
+ session->server->port, errno, strerror (errno));
ok:
event_del (&session->ev);
close (fd);
session->task->save.saved --;
if (session->task->save.saved == 0) {
- /* Call other filters */
- session->task->save.saved = 1;
- process_filters (session->task);
+ session->session->state = WRITE_REPLY;
+ r = snprintf (buf, sizeof (buf), "OK" CRLF);
+ rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE);
}
}
@@ -344,8 +352,12 @@ fuzzy_process_handler (struct controller_session *session, f_str_t *in)
struct mime_text_part *part;
struct storage_server *selected;
GList *cur;
- int sock, r;
-
+ int sock, r, cmd = 0;
+ char out_buf[BUFSIZ];
+
+ if (session->other_data) {
+ cmd = GPOINTER_TO_SIZE (session->other_data);
+ }
task = construct_task (session->worker);
session->other_data = task;
session->state = STATE_WAIT;
@@ -381,17 +393,31 @@ fuzzy_process_handler (struct controller_session *session, f_str_t *in)
s->task = task;
s->h = part->fuzzy;
s->session = session;
+ s->server = selected;
+ s->cmd = cmd;
event_add (&s->ev, &s->tv);
+ task->save.saved ++;
}
}
+ else {
+ r = snprintf (out_buf, sizeof (out_buf), "cannot write fuzzy hash" CRLF);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
+ session->state = WRITE_REPLY;
+ return;
+ }
cur = g_list_next (cur);
}
}
+ if (task->save.saved == 0) {
+ r = snprintf (out_buf, sizeof (out_buf), "no hashes written" CRLF);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
+ session->state = WRITE_REPLY;
+ }
}
static void
-fuzzy_controller_handler (char **args, struct controller_session *session)
+fuzzy_controller_handler (char **args, struct controller_session *session, int cmd)
{
char *arg, out_buf[BUFSIZ], *err_str;
uint32_t size;
@@ -402,20 +428,34 @@ fuzzy_controller_handler (char **args, struct controller_session *session)
msg_info ("fuzzy_controller_handler: empty content length");
r = snprintf (out_buf, sizeof (out_buf), "fuzzy command requires length as argument" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
+ session->state = WRITE_REPLY;
return;
}
size = strtoul (arg, &err_str, 10);
if (err_str && *err_str != '\0') {
- msg_debug ("process_command: statfile size is invalid: %s", arg);
r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
+ session->state = WRITE_REPLY;
return;
}
session->state = STATE_OTHER;
rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_CHARACTER, size);
session->other_handler = fuzzy_process_handler;
+ session->other_data = GSIZE_TO_POINTER (cmd);
+}
+
+static void
+fuzzy_add_handler (char **args, struct controller_session *session)
+{
+ fuzzy_controller_handler (args, session, FUZZY_WRITE);
+}
+
+static void
+fuzzy_delete_handler (char **args, struct controller_session *session)
+{
+ fuzzy_controller_handler (args, session, FUZZY_DEL);
}
static int