From a9c4a7af9415b57397687bd3005dfcd96a01881d Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Fri, 31 Jul 2009 19:42:21 +0400 Subject: [PATCH] * Make fuzzy storage working (tested checking, adding and deleting of fuzzy hashes from storage) * Fix stupid bug in fuzzy distance calculations --- rspamc.pl.in | 26 +++++++++++- src/controller.c | 5 ++- src/fuzzy.c | 4 +- src/fuzzy_storage.c | 36 +++++++++------- src/plugins/fuzzy_check.c | 86 ++++++++++++++++++++++++++++----------- 5 files changed, 115 insertions(+), 42 deletions(-) diff --git a/rspamc.pl.in b/rspamc.pl.in index ccbc73cc4..4185039de 100755 --- a/rspamc.pl.in +++ b/rspamc.pl.in @@ -188,6 +188,30 @@ sub do_control_command { print "Authentication failed\n"; } } + elsif ($cfg{'command'} =~ /(fuzzy_add|fuzzy_del)/i) { + while (defined (my $line = <>)) { + $input .= $line; + } + + if (do_ctrl_auth ($sock)) { + my $len = length ($input); + print "Sending $len bytes...\n"; + syswrite $sock, $cfg{'command'} . " $len" . $CRLF; + syswrite $sock, $input . $CRLF; + if (defined (my $reply = <$sock>)) { + if ($reply =~ /^OK/) { + print $cfg{'command'} . " succeed\n"; + } + else { + print $cfg{'command'} . " failed\n"; + } + } + } + else { + print "Authentication failed\n"; + } + + } else { syswrite $sock, $cfg{'command'} . $CRLF; while (defined (my $line = <$sock>)) { @@ -240,7 +264,7 @@ if ($cmd =~ /(SYMBOLS|SCAN|PROCESS|CHECK|REPORT_IFSPAM|REPORT|URLS|EMAILS)/i) { $cfg{'command'} = $1; $cfg{'control'} = 0; } -elsif ($cmd =~ /(STAT|LEARN|SHUTDOWN|RELOAD|UPTIME|COUNTERS)/i) { +elsif ($cmd =~ /(STAT|LEARN|SHUTDOWN|RELOAD|UPTIME|COUNTERS|FUZZY_ADD|FUZZY_DEL)/i) { $cfg{'command'} = $1; $cfg{'control'} = 1; } diff --git a/src/controller.c b/src/controller.c index a4c0a7bb4..30daa26fc 100644 --- a/src/controller.c +++ b/src/controller.c @@ -466,7 +466,7 @@ controller_read_socket (f_str_t *in, void *arg) if (session->state == STATE_COMMAND) { session->state = STATE_REPLY; } - if (session->state != STATE_LEARN) { + if (session->state != STATE_LEARN && session->state != STATE_OTHER) { rspamd_dispatcher_write (session->dispatcher, END, sizeof (END) - 1, FALSE, TRUE); } @@ -639,7 +639,8 @@ void register_custom_controller_command (const char *name, controller_func_t handler, gboolean privilleged, gboolean require_message) { struct custom_controller_command *cmd; - + + cmd = g_malloc (sizeof (struct custom_controller_command)); cmd->command = name; cmd->handler = handler; cmd->privilleged = privilleged; diff --git a/src/fuzzy.c b/src/fuzzy.c index 9e49649aa..cb595a9ab 100644 --- a/src/fuzzy.c +++ b/src/fuzzy.c @@ -268,7 +268,7 @@ fuzzy_compare_hashes (fuzzy_hash_t *h1, fuzzy_hash_t *h2) /* If we have hashes of different size, input strings are too different */ if (h1->block_size != h2->block_size) { - return 100; + return 0; } l1 = strlen (h1->hash_pipe); @@ -279,7 +279,7 @@ fuzzy_compare_hashes (fuzzy_hash_t *h1, fuzzy_hash_t *h2) } res = lev_distance (h1->hash_pipe, l1, h2->hash_pipe, l2); - res = (res * 100) / (l1 + l2); + res = 100 - (2 * res * 100) / (l1 + l2); return res; } diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index a0aa00e50..6abcbfa42 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -115,10 +115,10 @@ sync_cache (struct rspamd_worker *wrk) node = cur->data; if (now - node->time > expire) { /* Remove expired item */ + tmp = cur; cur = g_list_next (cur); - hashes->head = g_list_remove_link (hashes->head, cur); + g_queue_delete_link (hashes, tmp); g_free (node); - g_list_free1 (tmp); continue; } if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) { @@ -218,6 +218,7 @@ process_check_command (struct fuzzy_cmd *cmd) GList *cur; struct rspamd_fuzzy_node *h; fuzzy_hash_t s; + int prob = 0; memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe)); s.block_size = cmd->blocksize; @@ -226,11 +227,13 @@ process_check_command (struct fuzzy_cmd *cmd) /* XXX: too slow way */ while (cur) { h = cur->data; - if (fuzzy_compare_hashes (&h->h, &s) > LEV_LIMIT) { + if ((prob = fuzzy_compare_hashes (&h->h, &s)) > LEV_LIMIT) { + msg_info ("process_check_command: fuzzy hash was found, probability %d%%", prob); return TRUE; } cur = g_list_next (cur); } + msg_info ("process_check_command: fuzzy hash was NOT found, prob is %d%%", prob); return FALSE; } @@ -246,6 +249,7 @@ process_write_command (struct fuzzy_cmd *cmd) h->time = (uint64_t)time (NULL); g_queue_push_head (hashes, h); mods ++; + msg_info ("process_write_command: fuzzy hash was successfully added"); return TRUE; } @@ -253,9 +257,10 @@ process_write_command (struct fuzzy_cmd *cmd) static gboolean process_delete_command (struct fuzzy_cmd *cmd) { - GList *cur; + GList *cur, *tmp; struct rspamd_fuzzy_node *h; fuzzy_hash_t s; + gboolean res = FALSE; memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe)); s.block_size = cmd->blocksize; @@ -265,16 +270,19 @@ process_delete_command (struct fuzzy_cmd *cmd) while (cur) { h = cur->data; if (fuzzy_compare_hashes (&h->h, &s) > LEV_LIMIT) { - hashes->head = g_list_remove_link (hashes->head, cur); g_free (h); - g_list_free1 (cur); + tmp = cur; + cur = g_list_next (cur); + g_queue_delete_link (hashes, tmp); + msg_info ("process_delete_command: fuzzy hash was successfully deleted"); + res = TRUE; mods ++; - return TRUE; + continue; } cur = g_list_next (cur); } - return FALSE; + return res; } #define CMD_PROCESS(x) \ @@ -346,7 +354,7 @@ fuzzy_io_callback (int fd, short what, void *arg) * Accept new connection and construct task */ static void -accept_socket (int fd, short what, void *arg) +accept_fuzzy_socket (int fd, short what, void *arg) { struct rspamd_worker *worker = (struct rspamd_worker *)arg; struct sockaddr_storage ss; @@ -356,21 +364,21 @@ accept_socket (int fd, short what, void *arg) int nfd; if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) { - msg_warn ("accept_socket: accept failed: %s", strerror (errno)); + msg_warn ("accept_fuzzy_socket: accept failed: %s", strerror (errno)); return; } /* Check for EAGAIN */ if (nfd == 0) { - msg_debug ("accept_socket: cannot accept socket as it was already accepted by other worker"); + msg_debug ("accept_fuzzy_socket: cannot accept socket as it was already accepted by other worker"); return; } if (ss.ss_family == AF_UNIX) { - msg_info ("accept_socket: accepted connection from unix socket"); + msg_info ("accept_fuzzy_socket: accepted connection from unix socket"); } else if (ss.ss_family == AF_INET) { sin = (struct sockaddr_in *) &ss; - msg_info ("accept_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port)); + msg_info ("accept_fuzzy_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port)); } session = g_malloc (sizeof (struct fuzzy_session)); @@ -439,7 +447,7 @@ start_fuzzy_storage (struct rspamd_worker *worker) evtimer_add (&tev, &tmv); /* Accept event */ - event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); + event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_fuzzy_socket, (void *)worker); event_add(&worker->bind_ev, NULL); diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index e4fdaba93..c016530d0 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -46,6 +46,7 @@ #define DEFAULT_PORT 11335 struct storage_server { + struct upstream up; char *name; struct in_addr addr; uint16_t port; @@ -71,8 +72,10 @@ struct fuzzy_client_session { struct fuzzy_learn_session { struct event ev; fuzzy_hash_t *h; + int cmd; struct timeval tv; struct controller_session *session; + struct storage_server *server; struct worker_task *task; }; @@ -80,11 +83,13 @@ static struct fuzzy_ctx *fuzzy_module_ctx = NULL; static int fuzzy_mime_filter (struct worker_task *task); static void fuzzy_symbol_callback (struct worker_task *task, void *unused); +static void fuzzy_add_handler (char **args, struct controller_session *session); +static void fuzzy_delete_handler (char **args, struct controller_session *session); static void parse_servers_string (char *str) { - char **strvec, *p, portbuf[5], *name; + char **strvec, *p, portbuf[6], *name; int num, i, j, port; struct hostent *hent; struct in_addr addr; @@ -94,14 +99,15 @@ parse_servers_string (char *str) fuzzy_module_ctx->servers = memory_pool_alloc0 (fuzzy_module_ctx->fuzzy_pool, sizeof (struct storage_server) * num); - for (i = 0; i <= num; i ++) { + for (i = 0; i < num; i ++) { g_strstrip (strvec[i]); if ((p = strchr (strvec[i], ':')) != NULL) { j = 0; p ++; - while (g_ascii_isdigit (*p) && j < sizeof (portbuf) - 1) { - portbuf[j ++] = *p ++; + while (g_ascii_isdigit (*(p + j)) && j < sizeof (portbuf) - 1) { + portbuf[j] = *(p + j); + j ++; } portbuf[j] = '\0'; port = atoi (portbuf); @@ -110,8 +116,8 @@ parse_servers_string (char *str) /* Default http port */ port = DEFAULT_PORT; } - name = memory_pool_alloc (fuzzy_module_ctx->fuzzy_pool, p - strvec[i] + 1); - g_strlcpy (name, strvec[i], p - strvec[i] + 1); + name = memory_pool_alloc (fuzzy_module_ctx->fuzzy_pool, p - strvec[i]); + g_strlcpy (name, strvec[i], p - strvec[i]); if (!inet_aton (name, &addr)) { /* Resolve using dns */ hent = gethostbyname (name); @@ -162,21 +168,21 @@ fuzzy_check_module_config (struct config_file *cfg) struct metric *metric; double *w; - if ((value = get_module_opt (cfg, "fuzzy", "metric")) != NULL) { + if ((value = get_module_opt (cfg, "fuzzy_check", "metric")) != NULL) { fuzzy_module_ctx->metric = memory_pool_strdup (fuzzy_module_ctx->fuzzy_pool, value); g_free (value); } else { fuzzy_module_ctx->metric = DEFAULT_METRIC; } - if ((value = get_module_opt (cfg, "fuzzy", "symbol")) != NULL) { + if ((value = get_module_opt (cfg, "fuzzy_check", "symbol")) != NULL) { fuzzy_module_ctx->symbol = memory_pool_strdup (fuzzy_module_ctx->fuzzy_pool, value); g_free (value); } else { fuzzy_module_ctx->symbol = DEFAULT_SYMBOL; } - if ((value = get_module_opt (cfg, "fuzzy", "servers")) != NULL) { + if ((value = get_module_opt (cfg, "fuzzy_check", "servers")) != NULL) { parse_servers_string (value); } @@ -195,6 +201,9 @@ fuzzy_check_module_config (struct config_file *cfg) register_symbol (&metric->cache, fuzzy_module_ctx->symbol, *w, fuzzy_symbol_callback, NULL); } + register_custom_controller_command ("fuzzy_add", fuzzy_add_handler, TRUE, TRUE); + register_custom_controller_command ("fuzzy_del", fuzzy_delete_handler, TRUE, TRUE); + return res; } @@ -258,18 +267,19 @@ fuzzy_learn_callback (int fd, short what, void *arg) { struct fuzzy_learn_session *session = arg; struct fuzzy_cmd cmd; - char buf[sizeof ("ERR")]; + char buf[sizeof ("ERR" CRLF)]; + int r; if (what == EV_WRITE) { /* Send command to storage */ cmd.blocksize = session->h->block_size; memcpy (cmd.hash, session->h->hash_pipe, sizeof (cmd.hash)); - cmd.cmd = FUZZY_WRITE; + cmd.cmd = session->cmd; if (write (fd, &cmd, sizeof (struct fuzzy_cmd)) == -1) { goto err; } else { - event_set (&session->ev, fd, EV_READ, fuzzy_io_callback, session); + event_set (&session->ev, fd, EV_READ, fuzzy_learn_callback, session); event_add (&session->ev, &session->tv); } } @@ -277,24 +287,22 @@ fuzzy_learn_callback (int fd, short what, void *arg) if (read (fd, buf, sizeof (buf)) == -1) { goto err; } - else if (buf[0] == 'O' && buf[1] == 'K') { - insert_result (session->task, fuzzy_module_ctx->metric, fuzzy_module_ctx->symbol, 1, NULL); - } goto ok; } return; err: - msg_err ("fuzzy_io_callback: got error on IO, %d, %s", errno, strerror (errno)); + msg_err ("fuzzy_learn_callback: got error in IO with server %s:%d, %d, %s", session->server->name, + session->server->port, errno, strerror (errno)); ok: event_del (&session->ev); close (fd); session->task->save.saved --; if (session->task->save.saved == 0) { - /* Call other filters */ - session->task->save.saved = 1; - process_filters (session->task); + session->session->state = WRITE_REPLY; + r = snprintf (buf, sizeof (buf), "OK" CRLF); + rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE); } } @@ -344,8 +352,12 @@ fuzzy_process_handler (struct controller_session *session, f_str_t *in) struct mime_text_part *part; struct storage_server *selected; GList *cur; - int sock, r; - + int sock, r, cmd = 0; + char out_buf[BUFSIZ]; + + if (session->other_data) { + cmd = GPOINTER_TO_SIZE (session->other_data); + } task = construct_task (session->worker); session->other_data = task; session->state = STATE_WAIT; @@ -381,17 +393,31 @@ fuzzy_process_handler (struct controller_session *session, f_str_t *in) s->task = task; s->h = part->fuzzy; s->session = session; + s->server = selected; + s->cmd = cmd; event_add (&s->ev, &s->tv); + task->save.saved ++; } } + else { + r = snprintf (out_buf, sizeof (out_buf), "cannot write fuzzy hash" CRLF); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE); + session->state = WRITE_REPLY; + return; + } cur = g_list_next (cur); } } + if (task->save.saved == 0) { + r = snprintf (out_buf, sizeof (out_buf), "no hashes written" CRLF); + rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE); + session->state = WRITE_REPLY; + } } static void -fuzzy_controller_handler (char **args, struct controller_session *session) +fuzzy_controller_handler (char **args, struct controller_session *session, int cmd) { char *arg, out_buf[BUFSIZ], *err_str; uint32_t size; @@ -402,20 +428,34 @@ fuzzy_controller_handler (char **args, struct controller_session *session) msg_info ("fuzzy_controller_handler: empty content length"); r = snprintf (out_buf, sizeof (out_buf), "fuzzy command requires length as argument" CRLF); rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE); + session->state = WRITE_REPLY; return; } size = strtoul (arg, &err_str, 10); if (err_str && *err_str != '\0') { - msg_debug ("process_command: statfile size is invalid: %s", arg); r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF); rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE); + session->state = WRITE_REPLY; return; } session->state = STATE_OTHER; rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_CHARACTER, size); session->other_handler = fuzzy_process_handler; + session->other_data = GSIZE_TO_POINTER (cmd); +} + +static void +fuzzy_add_handler (char **args, struct controller_session *session) +{ + fuzzy_controller_handler (args, session, FUZZY_WRITE); +} + +static void +fuzzy_delete_handler (char **args, struct controller_session *session) +{ + fuzzy_controller_handler (args, session, FUZZY_DEL); } static int -- 2.39.5