summary | refs | log | tree | commit | diff | stats
path: root/src/plugins
diff options
context:
space:
mode:
author: cebka@lenovo-laptop <cebka@lenovo-laptop> 2010-02-08 19:23:53 +0300
committer: cebka@lenovo-laptop <cebka@lenovo-laptop> 2010-02-08 19:23:53 +0300
commit: 341a236aa614d66bd76764e3eb315b6df0688ef5 (patch)
tree: 8db27485dbf87bc338cf311d4b6c70070774a618 /src/plugins
parent: 10c8ad2246130d77b19ee7036e3f0a74c47425a1 (diff)
download: rspamd-341a236aa614d66bd76764e3eb315b6df0688ef5.tar.gz
download: rspamd-341a236aa614d66bd76764e3eb315b6df0688ef5.zip
* Add the ability to assign a weight to fuzzy hashes; this can be very useful for autolearning of the fuzzy storage by users
Diffstat (limited to 'src/plugins')
-rw-r--r-- src/plugins/fuzzy_check.c 68
1 file changed, 57 insertions, 11 deletions
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index 2ac7475bf..987a2cb40 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -74,6 +74,7 @@ struct fuzzy_learn_session {
struct event ev;
fuzzy_hash_t *h;
int cmd;
+ int value;
int *saved;
struct timeval tv;
struct controller_session *session;
@@ -218,6 +219,7 @@ fuzzy_check_module_reconfig (struct config_file *cfg)
return fuzzy_check_module_config (cfg);
}
+/* Finalize IO */
static void
fuzzy_io_fin (void *ud)
{
@@ -232,12 +234,14 @@ fuzzy_io_fin (void *ud)
}
}
+/* Call this whenever we got data from fuzzy storage */
static void
fuzzy_io_callback (int fd, short what, void *arg)
{
struct fuzzy_client_session *session = arg;
struct fuzzy_cmd cmd;
- char buf[sizeof ("ERR")];
+ char buf[62], *err_str;
+ int value;
if (what == EV_WRITE) {
/* Send command to storage */
@@ -253,11 +257,16 @@ fuzzy_io_callback (int fd, short what, void *arg)
}
}
else if (what == EV_READ) {
+ /* Got reply */
if (read (fd, buf, sizeof (buf)) == -1) {
goto err;
}
else if (buf[0] == 'O' && buf[1] == 'K') {
- insert_result (session->task, fuzzy_module_ctx->metric, fuzzy_module_ctx->symbol, 1, NULL);
+ /* Now try to get value */
+ value = strtol (buf + 3, &err_str, 10);
+ *err_str = '\0';
+ insert_result (session->task, fuzzy_module_ctx->metric, fuzzy_module_ctx->symbol, value, g_list_prepend (NULL,
+ memory_pool_strdup (session->task->task_pool, buf + 3)));
}
goto ok;
}
@@ -289,13 +298,15 @@ fuzzy_learn_callback (int fd, short what, void *arg)
{
struct fuzzy_learn_session *session = arg;
struct fuzzy_cmd cmd;
- char buf[sizeof ("ERR" CRLF)];
+ char buf[64];
+ int r;
if (what == EV_WRITE) {
/* Send command to storage */
cmd.blocksize = session->h->block_size;
memcpy (cmd.hash, session->h->hash_pipe, sizeof (cmd.hash));
cmd.cmd = session->cmd;
+ cmd.value = session->value;
if (write (fd, &cmd, sizeof (struct fuzzy_cmd)) == -1) {
goto err;
}
@@ -308,18 +319,26 @@ fuzzy_learn_callback (int fd, short what, void *arg)
if (read (fd, buf, sizeof (buf)) == -1) {
goto err;
}
- goto ok;
+ else if (buf[0] == 'O' && buf[1] == 'K') {
+ r = snprintf (buf, sizeof (buf), "OK" CRLF);
+ rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE);
+ goto ok;
+ }
+ goto err;
}
return;
err:
msg_err ("got error in IO with server %s:%d, %d, %s", session->server->name, session->server->port, errno, strerror (errno));
+ r = snprintf (buf, sizeof (buf), "Error" CRLF);
+ rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE);
ok:
close (fd);
remove_normal_event (session->session->s, fuzzy_learn_fin, session);
}
+/* This callback is called when we check message via fuzzy hashes storage */
static void
fuzzy_symbol_callback (struct worker_task *task, void *unused)
{
@@ -337,6 +356,7 @@ fuzzy_symbol_callback (struct worker_task *task, void *unused)
cur = g_list_next (cur);
continue;
}
+ /* Get upstream */
selected = (struct storage_server *)get_upstream_by_hash (fuzzy_module_ctx->servers, fuzzy_module_ctx->servers_num,
sizeof (struct storage_server), task->ts.tv_sec,
DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME, DEFAULT_UPSTREAM_MAXERRORS, part->fuzzy->hash_pipe, sizeof (part->fuzzy->hash_pipe));
@@ -345,6 +365,7 @@ fuzzy_symbol_callback (struct worker_task *task, void *unused)
msg_warn ("cannot connect to %s, %d, %s", selected->name, errno, strerror (errno));
}
else {
+ /* Create session for a socket */
session = memory_pool_alloc (task->task_pool, sizeof (struct fuzzy_client_session));
event_set (&session->ev, sock, EV_WRITE, fuzzy_io_callback, session);
session->tv.tv_sec = IO_TIMEOUT;
@@ -370,19 +391,26 @@ fuzzy_process_handler (struct controller_session *session, f_str_t * in)
struct mime_text_part *part;
struct storage_server *selected;
GList *cur;
- int sock, r, cmd = 0, *saved;
+ int sock, r, cmd = 0, value = 0, *saved, *sargs;
char out_buf[BUFSIZ];
+ /* Extract arguments */
if (session->other_data) {
- cmd = GPOINTER_TO_SIZE (session->other_data);
+ sargs = session->other_data;
+ cmd = sargs[0];
+ value = sargs[1];
}
+
+ /* Prepare task */
task = construct_task (session->worker);
session->other_data = task;
session->state = STATE_WAIT;
-
+
+ /* Allocate message from string */
task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
task->msg->begin = in->begin;
task->msg->len = in->len;
+
saved = memory_pool_alloc0 (session->session_pool, sizeof (int));
r = process_message (task);
@@ -404,10 +432,12 @@ fuzzy_process_handler (struct controller_session *session, f_str_t * in)
cur = g_list_next (cur);
continue;
}
+ /* Get upstream */
selected = (struct storage_server *)get_upstream_by_hash (fuzzy_module_ctx->servers, fuzzy_module_ctx->servers_num,
sizeof (struct storage_server), task->ts.tv_sec,
DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME, DEFAULT_UPSTREAM_MAXERRORS, part->fuzzy->hash_pipe, sizeof (part->fuzzy->hash_pipe));
if (selected) {
+ /* Create UDP socket */
if ((sock = make_udp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) {
msg_warn ("cannot connect to %s, %d, %s", selected->name, errno, strerror (errno));
session->state = STATE_REPLY;
@@ -417,6 +447,7 @@ fuzzy_process_handler (struct controller_session *session, f_str_t * in)
return;
}
else {
+ /* Socket is made, create session */
s = memory_pool_alloc (session->session_pool, sizeof (struct fuzzy_learn_session));
event_set (&s->ev, sock, EV_WRITE, fuzzy_learn_callback, s);
s->tv.tv_sec = IO_TIMEOUT;
@@ -427,6 +458,7 @@ fuzzy_process_handler (struct controller_session *session, f_str_t * in)
s->session = session;
s->server = selected;
s->cmd = cmd;
+ s->value = value;
s->saved = saved;
event_add (&s->ev, &s->tv);
(*saved)++;
@@ -434,6 +466,7 @@ fuzzy_process_handler (struct controller_session *session, f_str_t * in)
}
}
else {
+ /* Cannot write hash */
session->state = STATE_REPLY;
r = snprintf (out_buf, sizeof (out_buf), "cannot write fuzzy hash" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
@@ -457,9 +490,10 @@ fuzzy_controller_handler (char **args, struct controller_session *session, int c
{
char *arg, out_buf[BUFSIZ], *err_str;
uint32_t size;
- int r;
+ int r, value, *sargs;
- arg = *args;
+ /* Process size */
+ arg = args[0];
if (!arg || *arg == '\0') {
msg_info ("empty content length");
r = snprintf (out_buf, sizeof (out_buf), "fuzzy command requires length as argument" CRLF);
@@ -467,7 +501,6 @@ fuzzy_controller_handler (char **args, struct controller_session *session, int c
session->state = STATE_REPLY;
return;
}
-
size = strtoul (arg, &err_str, 10);
if (err_str && *err_str != '\0') {
r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF);
@@ -475,11 +508,24 @@ fuzzy_controller_handler (char **args, struct controller_session *session, int c
session->state = STATE_REPLY;
return;
}
+ /* Process value */
+ arg = args[1];
+ if (!arg || *arg == '\0') {
+ msg_info ("empty value, assume it 1");
+ value = 1;
+ }
+ else {
+ value = strtol (arg, &err_str, 10);
+ }
session->state = STATE_OTHER;
rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_CHARACTER, size);
session->other_handler = fuzzy_process_handler;
- session->other_data = GSIZE_TO_POINTER (cmd);
+ /* Prepare args */
+ sargs = memory_pool_alloc (session->session_pool, sizeof (int) * 2);
+ sargs[0] = cmd;
+ sargs[1] = value;
+ session->other_data = sargs;
}
static void