From 341a236aa614d66bd76764e3eb315b6df0688ef5 Mon Sep 17 00:00:00 2001 From: "cebka@lenovo-laptop" Date: Mon, 8 Feb 2010 19:23:53 +0300 Subject: [PATCH] * Add ability to add weight for fuzzy hashes, this can be very useful for autolearning fuzzy storage by users --- rspamc.pl.in | 20 ++++++++---- src/fuzzy_storage.c | 52 +++++++++++++++++++++++++++--- src/fuzzy_storage.h | 1 + src/plugins/fuzzy_check.c | 68 ++++++++++++++++++++++++++++++++------- 4 files changed, 119 insertions(+), 22 deletions(-) diff --git a/rspamc.pl.in b/rspamc.pl.in index ad2de90b8..40fad293b 100755 --- a/rspamc.pl.in +++ b/rspamc.pl.in @@ -22,6 +22,7 @@ my %cfg = ( 'control' => 0, 'statfile' => '', 'deliver_to'=> '', + 'weight' => 1, ); $main::VERSION = '@RSPAMD_VERSION@'; @@ -35,7 +36,8 @@ Usage: rspamc.pl [-h host] [-p port] [-P password] [-c conf_file] [-s statfile] -c config file to parse -s statfile to use for learn commands -d define deliver-to header -imap format: imap:user::password::host::mbox: +-w define weight for fuzzy operations +imap format: imap:user::password:[]:host::mbox: Version: @RSPAMD_VERSION@ EOD }; @@ -146,16 +148,19 @@ sub do_rspamc_command { sub do_ctrl_auth { my ($sock) = @_; + my $res = 0; syswrite $sock, "password $cfg{'password'}" . $CRLF; if (defined (my $reply = <$sock>)) { - my $end = <$sock>; if ($reply =~ /^password accepted/) { - return 1; + $res = 1; } } + + # END + return 0 unless <$sock>; - return 0; + return $res; } sub do_control_command { @@ -205,7 +210,7 @@ sub do_control_command { if (do_ctrl_auth ($sock)) { my $len = length ($input); print "Sending $len bytes...\n"; - syswrite $sock, $cfg{'command'} . " $len" . $CRLF; + syswrite $sock, $cfg{'command'} . " $len $cfg{'weight'}" . $CRLF; syswrite $sock, $input . $CRLF; if (defined (my $reply = <$sock>)) { if ($reply =~ /^OK/) { @@ -436,7 +441,7 @@ sub do_cmd { ############################# Main part ########################################### my %args; -getopt('c:h:p:P:s:d:', \%args); +getopt('c:h:p:P:s:d:w:', \%args); my $cmd = shift; my @path = shift; my $do_parse_config = 1; @@ -480,6 +485,9 @@ if (defined ($args{P})) { if (defined ($args{d})) { $cfg{'deliver_to'} = $args{d}; } +if (defined ($args{w})) { + $cfg{'weight'} = $args{w}; +} if ($cmd =~ /(SYMBOLS|SCAN|PROCESS|CHECK|REPORT_IFSPAM|REPORT|URLS|EMAILS)/i) { $cfg{'command'} = $1; diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index e05a479f6..c3a29e17f 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -62,6 +62,7 @@ static struct timeval tmv; static struct event tev; struct rspamd_fuzzy_node { + int32_t value; fuzzy_hash_t h; uint64_t time; }; @@ -239,7 +240,7 @@ read_hashes_file (struct rspamd_worker *wrk) return TRUE; } -static gboolean +static int process_check_command (struct fuzzy_cmd *cmd) { GList *cur; @@ -248,7 +249,7 @@ process_check_command (struct fuzzy_cmd *cmd) int prob = 0; if (!bloom_check (bf, cmd->hash)) { - return FALSE; + return 0; } memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe)); @@ -260,12 +261,38 @@ process_check_command (struct fuzzy_cmd *cmd) h = cur->data; if ((prob = fuzzy_compare_hashes (&h->h, &s)) > LEV_LIMIT) { msg_info ("fuzzy hash was found, probability %d%%", prob); - return TRUE; + return h->value; } cur = g_list_next (cur); } msg_debug ("fuzzy hash was NOT found, prob is %d%%", prob); + return 0; +} + +static gboolean +update_hash (struct fuzzy_cmd *cmd) +{ + GList *cur; + struct rspamd_fuzzy_node *h; + fuzzy_hash_t s; + int prob = 0; + + memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe)); + s.block_size = cmd->blocksize; + cur = hashes[cmd->blocksize % BUCKETS]->head; + + /* XXX: too slow way */ + while (cur) { + h = cur->data; + if ((prob = fuzzy_compare_hashes (&h->h, &s)) > LEV_LIMIT) { + h->value += cmd->value; + msg_info ("fuzzy hash was found, probability %d%%, set new value to %d", prob, h->value); + return TRUE; + } + cur = g_list_next (cur); + } + return FALSE; } @@ -275,7 +302,9 @@ process_write_command (struct fuzzy_cmd *cmd) struct rspamd_fuzzy_node *h; if (bloom_check (bf, cmd->hash)) { - return FALSE; + if (update_hash (cmd)) { + return TRUE; + } } h = g_malloc (sizeof (struct rspamd_fuzzy_node)); @@ -343,9 +372,22 @@ else { \ static void process_fuzzy_command (struct fuzzy_session *session) { + int r; + char buf[64]; + switch (session->cmd.cmd) { case FUZZY_CHECK: - CMD_PROCESS (check); + if ((r = process_check_command (&session->cmd))) { + r = snprintf (buf, sizeof (buf), "OK %d" CRLF, r); + if (sendto (session->fd, buf, r, 0, (struct sockaddr *)&session->sa, session->salen) == -1) { + msg_err ("error while writing reply: %s", strerror (errno)); + } + } + else { + if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, (struct sockaddr *)&session->sa, session->salen) == -1) { + msg_err ("error while writing reply: %s", strerror (errno)); + } + } break; case FUZZY_WRITE: CMD_PROCESS (write); diff --git a/src/fuzzy_storage.h b/src/fuzzy_storage.h index 1630171e4..aa3d50133 100644 --- a/src/fuzzy_storage.h +++ b/src/fuzzy_storage.h @@ -13,6 +13,7 @@ struct fuzzy_cmd { u_char cmd; uint32_t blocksize; + int32_t value; u_char hash[FUZZY_HASHLEN]; }; diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index 2ac7475bf..987a2cb40 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -74,6 +74,7 @@ struct fuzzy_learn_session { struct event ev; fuzzy_hash_t *h; int cmd; + int value; int *saved; struct timeval tv; struct controller_session *session; @@ -218,6 +219,7 @@ fuzzy_check_module_reconfig (struct config_file *cfg) return fuzzy_check_module_config (cfg); } +/* Finalize IO */ static void fuzzy_io_fin (void *ud) { @@ -232,12 +234,14 @@ fuzzy_io_fin (void *ud) } } +/* Call this whenever we got data from fuzzy storage */ static void fuzzy_io_callback (int fd, short what, void *arg) { struct fuzzy_client_session *session = arg; struct fuzzy_cmd cmd; - char buf[sizeof ("ERR")]; + char buf[62], *err_str; + int value; if (what == EV_WRITE) { /* Send command to storage */ @@ -253,11 +257,16 @@ fuzzy_io_callback (int fd, short what, void *arg) } } else if (what == EV_READ) { + /* Got reply */ if (read (fd, buf, sizeof (buf)) == -1) { goto err; } else if (buf[0] == 'O' && buf[1] == 'K') { - insert_result (session->task, fuzzy_module_ctx->metric, fuzzy_module_ctx->symbol, 1, NULL); + /* Now try to get value */ + value = strtol (buf + 3, &err_str, 10); + *err_str = '\0'; + insert_result (session->task, fuzzy_module_ctx->metric, fuzzy_module_ctx->symbol, value, g_list_prepend (NULL, + memory_pool_strdup (session->task->task_pool, buf + 3))); } goto ok; } @@ -289,13 +298,15 @@ fuzzy_learn_callback (int fd, short what, void *arg) { struct fuzzy_learn_session *session = arg; struct fuzzy_cmd cmd; - char buf[sizeof ("ERR" CRLF)]; + char buf[64]; + int r; if (what == EV_WRITE) { /* Send command to storage */ cmd.blocksize = session->h->block_size; memcpy (cmd.hash, session->h->hash_pipe, sizeof (cmd.hash)); cmd.cmd = session->cmd; + cmd.value = session->value; if (write (fd, &cmd, sizeof (struct fuzzy_cmd)) == -1) { goto err; } @@ -308,18 +319,26 @@ fuzzy_learn_callback (int fd, short what, void *arg) if (read (fd, buf, sizeof (buf)) == -1) { goto err; } - goto ok; + else if (buf[0] == 'O' && buf[1] == 'K') { + r = snprintf (buf, sizeof (buf), "OK" CRLF); + rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE); + goto ok; + } + goto err; } return; err: msg_err ("got error in IO with server %s:%d, %d, %s", session->server->name, session->server->port, errno, strerror (errno)); + r = snprintf (buf, sizeof (buf), "Error" CRLF); + rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE); ok: close (fd); remove_normal_event (session->session->s, fuzzy_learn_fin, session); } +/* This callback is called when we check message via fuzzy hashes storage */ static void fuzzy_symbol_callback (struct worker_task *task, void *unused) { @@ -337,6 +356,7 @@ fuzzy_symbol_callback (struct worker_task *task, void *unused) cur = g_list_next (cur); continue; } + /* Get upstream */ selected = (struct storage_server *)get_upstream_by_hash (fuzzy_module_ctx->servers, fuzzy_module_ctx->servers_num, sizeof (struct storage_server), task->ts.tv_sec, DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME, DEFAULT_UPSTREAM_MAXERRORS, part->fuzzy->hash_pipe, sizeof (part->fuzzy->hash_pipe)); @@ -345,6 +365,7 @@ fuzzy_symbol_callback (struct worker_task *task, void *unused) msg_warn ("cannot connect to %s, %d, %s", selected->name, errno, strerror (errno)); } else { + /* Create session for a socket */ session = memory_pool_alloc (task->task_pool, sizeof (struct fuzzy_client_session)); event_set (&session->ev, sock, EV_WRITE, fuzzy_io_callback, session); session->tv.tv_sec = IO_TIMEOUT; @@ -370,19 +391,26 @@ fuzzy_process_handler (struct controller_session *session, f_str_t * in) struct mime_text_part *part; struct storage_server *selected; GList *cur; - int sock, r, cmd = 0, *saved; + int sock, r, cmd = 0, value = 0, *saved, *sargs; char out_buf[BUFSIZ]; + /* Extract arguments */ if (session->other_data) { - cmd = GPOINTER_TO_SIZE (session->other_data); + sargs = session->other_data; + cmd = sargs[0]; + value = sargs[1]; } + + /* Prepare task */ task = construct_task (session->worker); session->other_data = task; session->state = STATE_WAIT; - + + /* Allocate message from string */ task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t)); task->msg->begin = in->begin; task->msg->len = in->len; + saved = memory_pool_alloc0 (session->session_pool, sizeof (int)); r = process_message (task); @@ -404,10 +432,12 @@ fuzzy_process_handler (struct controller_session *session, f_str_t * in) cur = g_list_next (cur); continue; } + /* Get upstream */ selected = (struct storage_server *)get_upstream_by_hash (fuzzy_module_ctx->servers, fuzzy_module_ctx->servers_num, sizeof (struct storage_server), task->ts.tv_sec, DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME, DEFAULT_UPSTREAM_MAXERRORS, part->fuzzy->hash_pipe, sizeof (part->fuzzy->hash_pipe)); if (selected) { + /* Create UDP socket */ if ((sock = make_udp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) { msg_warn ("cannot connect to %s, %d, %s", selected->name, errno, strerror (errno)); session->state = STATE_REPLY; @@ -417,6 +447,7 @@ fuzzy_process_handler (struct controller_session *session, f_str_t * in) return; } else { + /* Socket is made, create session */ s = memory_pool_alloc (session->session_pool, sizeof (struct fuzzy_learn_session)); event_set (&s->ev, sock, EV_WRITE, fuzzy_learn_callback, s); s->tv.tv_sec = IO_TIMEOUT; @@ -427,6 +458,7 @@ fuzzy_process_handler (struct controller_session *session, f_str_t * in) s->session = session; s->server = selected; s->cmd = cmd; + s->value = value; s->saved = saved; event_add (&s->ev, &s->tv); (*saved)++; @@ -434,6 +466,7 @@ fuzzy_process_handler (struct controller_session *session, f_str_t * in) } } else { + /* Cannot write hash */ session->state = STATE_REPLY; r = snprintf (out_buf, sizeof (out_buf), "cannot write fuzzy hash" CRLF); rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE); @@ -457,9 +490,10 @@ fuzzy_controller_handler (char **args, struct controller_session *session, int c { char *arg, out_buf[BUFSIZ], *err_str; uint32_t size; - int r; + int r, value, *sargs; - arg = *args; + /* Process size */ + arg = args[0]; if (!arg || *arg == '\0') { msg_info ("empty content length"); r = snprintf (out_buf, sizeof (out_buf), "fuzzy command requires length as argument" CRLF); @@ -467,7 +501,6 @@ fuzzy_controller_handler (char **args, struct controller_session *session, int c session->state = STATE_REPLY; return; } - size = strtoul (arg, &err_str, 10); if (err_str && *err_str != '\0') { r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF); @@ -475,11 +508,24 @@ fuzzy_controller_handler (char **args, struct controller_session *session, int c session->state = STATE_REPLY; return; } + /* Process value */ + arg = args[1]; + if (!arg || *arg == '\0') { + msg_info ("empty value, assume it 1"); + value = 1; + } + else { + value = strtol (arg, &err_str, 10); + } session->state = STATE_OTHER; rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_CHARACTER, size); session->other_handler = fuzzy_process_handler; - session->other_data = GSIZE_TO_POINTER (cmd); + /* Prepare args */ + sargs = memory_pool_alloc (session->session_pool, sizeof (int) * 2); + sargs[0] = cmd; + sargs[1] = value; + session->other_data = sargs; } static void -- 2.39.5