]> source.dussan.org Git - rspamd.git/commitdiff
* Make fuzzy storage working (tested checking, adding and deleting of fuzzy hashes...
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Fri, 31 Jul 2009 15:42:21 +0000 (19:42 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Fri, 31 Jul 2009 15:42:21 +0000 (19:42 +0400)
* Fix stupid bug in fuzzy distance calculations

rspamc.pl.in
src/controller.c
src/fuzzy.c
src/fuzzy_storage.c
src/plugins/fuzzy_check.c

index ccbc73cc4657f5bac1ed5e5d7d1431966e90885e..4185039de20d4024087a194dedcb35de542481ef 100755 (executable)
@@ -188,6 +188,30 @@ sub do_control_command {
             print "Authentication failed\n";
         }
     }
+    elsif ($cfg{'command'} =~ /(fuzzy_add|fuzzy_del)/i) {
+        while (defined (my $line = <>)) {
+            $input .= $line;
+        }
+        
+        if (do_ctrl_auth ($sock)) {
+            my $len = length ($input);
+            print "Sending $len bytes...\n";
+            syswrite $sock, $cfg{'command'} . " $len" . $CRLF;
+            syswrite $sock, $input . $CRLF;
+            if (defined (my $reply = <$sock>)) {
+                if ($reply =~ /^OK/) {
+                    print $cfg{'command'} . " succeed\n";
+                }
+                else {
+                    print $cfg{'command'} . " failed\n";
+                }
+            }
+        }
+        else {
+            print "Authentication failed\n";
+        }
+    
+    }
     else {
         syswrite $sock, $cfg{'command'} . $CRLF;
         while (defined (my $line = <$sock>)) {
@@ -240,7 +264,7 @@ if ($cmd =~ /(SYMBOLS|SCAN|PROCESS|CHECK|REPORT_IFSPAM|REPORT|URLS|EMAILS)/i) {
     $cfg{'command'} = $1;
     $cfg{'control'} = 0;
 }
-elsif ($cmd =~ /(STAT|LEARN|SHUTDOWN|RELOAD|UPTIME|COUNTERS)/i) {
+elsif ($cmd =~ /(STAT|LEARN|SHUTDOWN|RELOAD|UPTIME|COUNTERS|FUZZY_ADD|FUZZY_DEL)/i) {
     $cfg{'command'} = $1;
     $cfg{'control'} = 1;
 }
index a4c0a7bb482a4093f67c269a774a84f059c45e1a..30daa26fc0f497f66e969859d7d8c611fc3f5d6a 100644 (file)
@@ -466,7 +466,7 @@ controller_read_socket (f_str_t *in, void *arg)
                        if (session->state == STATE_COMMAND) {
                                session->state = STATE_REPLY;
                        }
-                       if (session->state != STATE_LEARN) {
+                       if (session->state != STATE_LEARN && session->state != STATE_OTHER) {
                                rspamd_dispatcher_write (session->dispatcher, END, sizeof (END) - 1, FALSE, TRUE);
                        }
 
@@ -639,7 +639,8 @@ void
 register_custom_controller_command (const char *name, controller_func_t handler, gboolean privilleged, gboolean require_message)
 {
        struct custom_controller_command *cmd;
-
+       
+       cmd = g_malloc (sizeof (struct custom_controller_command));
        cmd->command = name;
        cmd->handler = handler;
        cmd->privilleged = privilleged;
index 9e49649aa754ce288743d9a09cbd168bc4c336c6..cb595a9ab04fd57fbc58cd334f5dd7632c19c0dc 100644 (file)
@@ -268,7 +268,7 @@ fuzzy_compare_hashes (fuzzy_hash_t *h1, fuzzy_hash_t *h2)
 
        /* If we have hashes of different size, input strings are too different */
        if (h1->block_size != h2->block_size) {
-               return 100;
+               return 0;
        }
        
        l1 = strlen (h1->hash_pipe);
@@ -279,7 +279,7 @@ fuzzy_compare_hashes (fuzzy_hash_t *h1, fuzzy_hash_t *h2)
        }
 
        res = lev_distance (h1->hash_pipe, l1, h2->hash_pipe, l2);
-       res = (res * 100) / (l1 + l2);
+       res = 100 - (2 * res * 100) / (l1 + l2);
 
        return res;
 }
index a0aa00e500e1f73e6cfe12a48ab3825b5036b6fb..6abcbfa4201001b2fad511e805a68f612076a530 100644 (file)
@@ -115,10 +115,10 @@ sync_cache (struct rspamd_worker *wrk)
                node = cur->data;
                if (now - node->time > expire) {
                        /* Remove expired item */
+                       tmp = cur;
                        cur = g_list_next (cur);
-                       hashes->head = g_list_remove_link (hashes->head, cur);
+                       g_queue_delete_link (hashes, tmp);
                        g_free (node);
-                       g_list_free1 (tmp);
                        continue;
                }
                if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) {
@@ -218,6 +218,7 @@ process_check_command (struct fuzzy_cmd *cmd)
        GList *cur;
        struct rspamd_fuzzy_node *h;
        fuzzy_hash_t s;
+       int prob = 0;
        
        memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe));
        s.block_size = cmd->blocksize;
@@ -226,11 +227,13 @@ process_check_command (struct fuzzy_cmd *cmd)
        /* XXX: too slow way */
        while (cur) {
                h = cur->data;
-               if (fuzzy_compare_hashes (&h->h, &s) > LEV_LIMIT) {
+               if ((prob = fuzzy_compare_hashes (&h->h, &s)) > LEV_LIMIT) {
+                       msg_info ("process_check_command: fuzzy hash was found, probability %d%%", prob);
                        return TRUE;
                }
                cur = g_list_next (cur);
        }
+       msg_info ("process_check_command: fuzzy hash was NOT found, prob is %d%%", prob);
 
        return FALSE;
 }
@@ -246,6 +249,7 @@ process_write_command (struct fuzzy_cmd *cmd)
        h->time = (uint64_t)time (NULL);
        g_queue_push_head (hashes, h);
        mods ++;
+       msg_info ("process_write_command: fuzzy hash was successfully added");
        
        return TRUE;
 }
@@ -253,9 +257,10 @@ process_write_command (struct fuzzy_cmd *cmd)
 static gboolean
 process_delete_command (struct fuzzy_cmd *cmd)
 {
-       GList *cur;
+       GList *cur, *tmp;
        struct rspamd_fuzzy_node *h;
        fuzzy_hash_t s;
+       gboolean res = FALSE;
        
        memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe));
        s.block_size = cmd->blocksize;
@@ -265,16 +270,19 @@ process_delete_command (struct fuzzy_cmd *cmd)
        while (cur) {
                h = cur->data;
                if (fuzzy_compare_hashes (&h->h, &s) > LEV_LIMIT) {
-                       hashes->head = g_list_remove_link (hashes->head, cur);
                        g_free (h);
-                       g_list_free1 (cur);
+                       tmp = cur;
+                       cur = g_list_next (cur);
+                       g_queue_delete_link (hashes, tmp);
+                       msg_info ("process_delete_command: fuzzy hash was successfully deleted");
+                       res = TRUE;
                        mods ++;
-                       return TRUE;
+                       continue;
                }
                cur = g_list_next (cur);
        }
 
-       return FALSE;
+       return res;
 }
 
 #define CMD_PROCESS(x)                                                                                                                                                 \
@@ -346,7 +354,7 @@ fuzzy_io_callback (int fd, short what, void *arg)
  * Accept new connection and construct task
  */
 static void
-accept_socket (int fd, short what, void *arg)
+accept_fuzzy_socket (int fd, short what, void *arg)
 {
        struct rspamd_worker *worker = (struct rspamd_worker *)arg;
        struct sockaddr_storage ss;
@@ -356,21 +364,21 @@ accept_socket (int fd, short what, void *arg)
        int nfd;
        
        if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
-               msg_warn ("accept_socket: accept failed: %s", strerror (errno));
+               msg_warn ("accept_fuzzy_socket: accept failed: %s", strerror (errno));
                return;
        }
        /* Check for EAGAIN */
        if (nfd == 0) {
-               msg_debug ("accept_socket: cannot accept socket as it was already accepted by other worker");
+               msg_debug ("accept_fuzzy_socket: cannot accept socket as it was already accepted by other worker");
                return;
        }
 
     if (ss.ss_family == AF_UNIX) {
-        msg_info ("accept_socket: accepted connection from unix socket");
+        msg_info ("accept_fuzzy_socket: accepted connection from unix socket");
     }
     else if (ss.ss_family == AF_INET) {
         sin = (struct sockaddr_in *) &ss;
-        msg_info ("accept_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port));
+        msg_info ("accept_fuzzy_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port));
     }
        
        session = g_malloc (sizeof (struct fuzzy_session));
@@ -439,7 +447,7 @@ start_fuzzy_storage (struct rspamd_worker *worker)
        evtimer_add (&tev, &tmv);
 
        /* Accept event */
-       event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+       event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_fuzzy_socket, (void *)worker);
        event_add(&worker->bind_ev, NULL);
 
 
index e4fdaba9363fb401c1db467c80f2a21f08ae4f34..c016530d096c447b5f0611a9c6f592519fac503b 100644 (file)
@@ -46,6 +46,7 @@
 #define DEFAULT_PORT 11335
 
 struct storage_server {
+       struct upstream up;
        char *name;
        struct in_addr addr;
        uint16_t port;
@@ -71,8 +72,10 @@ struct fuzzy_client_session {
 struct fuzzy_learn_session {
        struct event ev;
        fuzzy_hash_t *h;
+       int cmd;
        struct timeval tv;
        struct controller_session *session;
+       struct storage_server *server;
        struct worker_task *task;
 };
 
@@ -80,11 +83,13 @@ static struct fuzzy_ctx *fuzzy_module_ctx = NULL;
 
 static int fuzzy_mime_filter (struct worker_task *task);
 static void fuzzy_symbol_callback (struct worker_task *task, void *unused);
+static void fuzzy_add_handler (char **args, struct controller_session *session);
+static void fuzzy_delete_handler (char **args, struct controller_session *session);
 
 static void
 parse_servers_string (char *str)
 {
-       char **strvec, *p, portbuf[5], *name;
+       char **strvec, *p, portbuf[6], *name;
        int num, i, j, port;
        struct hostent *hent;
        struct in_addr addr;
@@ -94,14 +99,15 @@ parse_servers_string (char *str)
 
        fuzzy_module_ctx->servers = memory_pool_alloc0 (fuzzy_module_ctx->fuzzy_pool, sizeof (struct storage_server) * num);
 
-       for (i = 0; i <= num; i ++) {
+       for (i = 0; i < num; i ++) {
                g_strstrip (strvec[i]);
 
                if ((p = strchr (strvec[i], ':')) != NULL) {
                        j = 0;
                        p ++;
-                       while (g_ascii_isdigit (*p) && j < sizeof (portbuf) - 1) {
-                               portbuf[j ++] = *p ++;
+                       while (g_ascii_isdigit (*(p + j)) && j < sizeof (portbuf) - 1) {
+                               portbuf[j] = *(p + j);
+                               j ++;
                        }
                        portbuf[j] = '\0';
                        port = atoi (portbuf);
@@ -110,8 +116,8 @@ parse_servers_string (char *str)
                        /* Default http port */
                        port = DEFAULT_PORT;
                }
-               name = memory_pool_alloc (fuzzy_module_ctx->fuzzy_pool, p - strvec[i] + 1);
-               g_strlcpy (name, strvec[i], p - strvec[i] + 1);
+               name = memory_pool_alloc (fuzzy_module_ctx->fuzzy_pool, p - strvec[i]);
+               g_strlcpy (name, strvec[i], p - strvec[i]);
                if (!inet_aton (name, &addr)) {
                        /* Resolve using dns */
                        hent = gethostbyname (name);
@@ -162,21 +168,21 @@ fuzzy_check_module_config (struct config_file *cfg)
        struct metric *metric;
        double *w;
 
-       if ((value = get_module_opt (cfg, "fuzzy", "metric")) != NULL) {
+       if ((value = get_module_opt (cfg, "fuzzy_check", "metric")) != NULL) {
                fuzzy_module_ctx->metric = memory_pool_strdup (fuzzy_module_ctx->fuzzy_pool, value);
                g_free (value);
        }
        else {
                fuzzy_module_ctx->metric = DEFAULT_METRIC;
        }
-       if ((value = get_module_opt (cfg, "fuzzy", "symbol")) != NULL) {
+       if ((value = get_module_opt (cfg, "fuzzy_check", "symbol")) != NULL) {
                fuzzy_module_ctx->symbol = memory_pool_strdup (fuzzy_module_ctx->fuzzy_pool, value);
                g_free (value);
        }
        else {
                fuzzy_module_ctx->symbol = DEFAULT_SYMBOL;
        }
-       if ((value = get_module_opt (cfg, "fuzzy", "servers")) != NULL) {
+       if ((value = get_module_opt (cfg, "fuzzy_check", "servers")) != NULL) {
                parse_servers_string (value);
        }       
 
@@ -195,6 +201,9 @@ fuzzy_check_module_config (struct config_file *cfg)
                register_symbol (&metric->cache, fuzzy_module_ctx->symbol, *w, fuzzy_symbol_callback, NULL);
        }
 
+       register_custom_controller_command ("fuzzy_add", fuzzy_add_handler, TRUE, TRUE);
+       register_custom_controller_command ("fuzzy_del", fuzzy_delete_handler, TRUE, TRUE);
+       
        return res;
 }
 
@@ -258,18 +267,19 @@ fuzzy_learn_callback (int fd, short what, void *arg)
 {
        struct fuzzy_learn_session *session = arg;
        struct fuzzy_cmd cmd;
-       char buf[sizeof ("ERR")];
+       char buf[sizeof ("ERR" CRLF)];
+       int r;
 
        if (what == EV_WRITE) {
                /* Send command to storage */
                cmd.blocksize = session->h->block_size;
                memcpy (cmd.hash, session->h->hash_pipe, sizeof (cmd.hash));
-               cmd.cmd = FUZZY_WRITE;
+               cmd.cmd = session->cmd;
                if (write (fd, &cmd, sizeof (struct fuzzy_cmd)) == -1) {
                        goto err;
                }
                else {
-                       event_set (&session->ev, fd, EV_READ, fuzzy_io_callback, session);
+                       event_set (&session->ev, fd, EV_READ, fuzzy_learn_callback, session);
                        event_add (&session->ev, &session->tv);
                }
        }
@@ -277,24 +287,22 @@ fuzzy_learn_callback (int fd, short what, void *arg)
                if (read (fd, buf, sizeof (buf)) == -1) {
                        goto err;
                }
-               else if (buf[0] == 'O' && buf[1] == 'K') {
-                       insert_result (session->task, fuzzy_module_ctx->metric, fuzzy_module_ctx->symbol, 1, NULL);
-               }
                goto ok;
        }
        
        return;
 
        err:
-               msg_err ("fuzzy_io_callback: got error on IO, %d, %s", errno, strerror (errno));
+               msg_err ("fuzzy_learn_callback: got error in IO with server %s:%d, %d, %s", session->server->name,
+                                       session->server->port, errno, strerror (errno));
        ok:
                event_del (&session->ev);
                close (fd);
                session->task->save.saved --;
                if (session->task->save.saved == 0) {
-                       /* Call other filters */
-                       session->task->save.saved = 1;
-                       process_filters (session->task);
+                       session->session->state = WRITE_REPLY;
+                       r = snprintf (buf, sizeof (buf), "OK" CRLF);
+                       rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE);
                }
 }
 
@@ -344,8 +352,12 @@ fuzzy_process_handler (struct controller_session *session, f_str_t *in)
        struct mime_text_part *part;
        struct storage_server *selected;
        GList *cur;
-       int sock, r;
-
+       int sock, r, cmd = 0;
+       char out_buf[BUFSIZ];
+       
+       if (session->other_data) {
+               cmd = GPOINTER_TO_SIZE (session->other_data);
+       }
        task = construct_task (session->worker);
        session->other_data = task;
        session->state = STATE_WAIT;
@@ -381,17 +393,31 @@ fuzzy_process_handler (struct controller_session *session, f_str_t *in)
                                        s->task = task;
                                        s->h = part->fuzzy;
                                        s->session = session;
+                                       s->server = selected;
+                                       s->cmd = cmd;
                                        event_add (&s->ev, &s->tv);
+                                       task->save.saved ++;
                                }
                        }
+                       else {
+                               r = snprintf (out_buf, sizeof (out_buf), "cannot write fuzzy hash" CRLF);
+                               rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
+                               session->state = WRITE_REPLY;
+                               return;
+                       }
                        cur = g_list_next (cur);
                }
        }
 
+       if (task->save.saved == 0) {
+               r = snprintf (out_buf, sizeof (out_buf), "no hashes written" CRLF);
+               rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
+               session->state = WRITE_REPLY;
+       }
 }
 
 static void
-fuzzy_controller_handler (char **args, struct controller_session *session)
+fuzzy_controller_handler (char **args, struct controller_session *session, int cmd)
 {
        char *arg, out_buf[BUFSIZ], *err_str;
        uint32_t size;
@@ -402,20 +428,34 @@ fuzzy_controller_handler (char **args, struct controller_session *session)
                msg_info ("fuzzy_controller_handler: empty content length");
                r = snprintf (out_buf, sizeof (out_buf), "fuzzy command requires length as argument" CRLF);
                rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
+               session->state = WRITE_REPLY;
                return;
        }
 
        size = strtoul (arg, &err_str, 10);
        if (err_str && *err_str != '\0') {
-               msg_debug ("process_command: statfile size is invalid: %s", arg);
                r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF);
                rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
+               session->state = WRITE_REPLY;
                return;
        }
 
        session->state = STATE_OTHER;
        rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_CHARACTER, size);
        session->other_handler = fuzzy_process_handler;
+       session->other_data = GSIZE_TO_POINTER (cmd);
+}
+
+static void
+fuzzy_add_handler (char **args, struct controller_session *session)
+{
+       fuzzy_controller_handler (args, session, FUZZY_WRITE);
+}
+
+static void
+fuzzy_delete_handler (char **args, struct controller_session *session)
+{
+       fuzzy_controller_handler (args, session, FUZZY_DEL);
 }
 
 static int