summary | refs | log | tree | commit | diff | stats
path: root/src/plugins/fuzzy_check.c
diff options
context:
space:
mode:
author: Vsevolod Stakhov <vsevolod@highsecure.ru>, 2014-12-22 15:19:59 +0000
committer: Vsevolod Stakhov <vsevolod@highsecure.ru>, 2014-12-22 15:19:59 +0000
commit: 18984aadd0ca1dc0906a7c89c22c8a6cf5467717 (patch)
tree: ee69895198b5fbe2c5fb990c5a581f77c633c56b /src/plugins/fuzzy_check.c
parent: 324701717d9cb8f97f4e2565dde4fdd129e245c4 (diff)
download: rspamd-18984aadd0ca1dc0906a7c89c22c8a6cf5467717.tar.gz
download: rspamd-18984aadd0ca1dc0906a7c89c22c8a6cf5467717.zip
Rewrite fuzzy check plugin.
Diffstat (limited to 'src/plugins/fuzzy_check.c')
-rw-r--r-- src/plugins/fuzzy_check.c | 639
1 files changed, 307 insertions, 332 deletions
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index e2ef52d4b..a6c22096d 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -48,6 +48,7 @@
#include "utlist.h"
#include "main.h"
#include "blake2.h"
+#include "ottery.h"
#define DEFAULT_SYMBOL "R_FUZZY_HASH"
#define DEFAULT_UPSTREAM_ERROR_TIME 10
@@ -97,7 +98,7 @@ struct fuzzy_ctx {
struct fuzzy_client_session {
gint state;
- rspamd_fuzzy_t *h;
+ GPtrArray *commands;
struct event ev;
struct timeval tv;
struct rspamd_task *task;
@@ -108,13 +109,9 @@ struct fuzzy_client_session {
struct fuzzy_learn_session {
struct event ev;
- rspamd_fuzzy_t *h;
- gint cmd;
- gint value;
- gint flag;
+ GPtrArray *commands;
gint *saved;
GError **err;
- struct fuzzy_mapping *map;
struct timeval tv;
struct rspamd_http_connection_entry *http_entry;
struct upstream *server;
@@ -492,6 +489,9 @@ fuzzy_io_fin (void *ud)
{
struct fuzzy_client_session *session = ud;
+ if (session->commands) {
+ g_ptr_array_free (session->commands, FALSE);
+ }
event_del (&session->ev);
close (session->fd);
}
@@ -519,24 +519,16 @@ fuzzy_cmd_from_text_part (struct fuzzy_rule *rule,
if (legacy || part->words == NULL) {
cmd = rspamd_mempool_alloc0 (pool, sizeof (*cmd));
- cmd->cmd = c;
- cmd->version = RSPAMD_FUZZY_VERSION;
- if (c != FUZZY_CHECK) {
- cmd->flag = flag;
- cmd->value = weight;
- }
+
cmd->shingles_count = 0;
rspamd_strlcpy (cmd->digest, part->fuzzy->hash_pipe, sizeof (cmd->digest));
- *size = sizeof (struct rspamd_fuzzy_cmd);
+
+ if (size != NULL) {
+ *size = sizeof (struct rspamd_fuzzy_cmd);
+ }
}
else {
shcmd = rspamd_mempool_alloc0 (pool, sizeof (*shcmd));
- shcmd->basic.cmd = c;
- shcmd->basic.version = RSPAMD_FUZZY_VERSION;
- if (c != FUZZY_CHECK) {
- shcmd->basic.flag = flag;
- shcmd->basic.value = weight;
- }
/*
* Generate hash from all words in the part
@@ -557,7 +549,18 @@ fuzzy_cmd_from_text_part (struct fuzzy_rule *rule,
}
cmd = (struct rspamd_fuzzy_cmd *)shcmd;
- *size = sizeof (struct rspamd_fuzzy_shingle_cmd);
+
+ if (size != NULL) {
+ *size = sizeof (struct rspamd_fuzzy_shingle_cmd);
+ }
+ }
+
+ cmd->tag = ottery_rand_uint32 ();
+ cmd->cmd = c;
+ cmd->version = RSPAMD_FUZZY_VERSION;
+ if (c != FUZZY_CHECK) {
+ cmd->flag = flag;
+ cmd->value = weight;
}
return cmd;
@@ -584,6 +587,7 @@ fuzzy_cmd_from_data_part (struct fuzzy_rule *rule,
cmd->value = weight;
}
cmd->shingles_count = 0;
+ cmd->tag = ottery_rand_uint32 ();
if (legacy) {
GChecksum *cksum;
@@ -603,7 +607,9 @@ fuzzy_cmd_from_data_part (struct fuzzy_rule *rule,
blake2b_final (&st, cmd->digest, sizeof (cmd->digest));
}
- *size = sizeof (struct rspamd_fuzzy_cmd);
+ if (size != NULL) {
+ *size = sizeof (struct rspamd_fuzzy_cmd);
+ }
return cmd;
}
@@ -623,33 +629,81 @@ fuzzy_cmd_to_wire (gint fd, const struct rspamd_fuzzy_cmd *cmd, gsize len)
return TRUE;
}
+static gboolean
+fuzzy_cmd_vector_to_wire (gint fd, GPtrArray *v)
+{
+ guint i;
+ const struct rspamd_fuzzy_cmd *cmd;
+ gsize len;
+
+ for (i = 0; i < v->len; i ++) {
+ cmd = g_ptr_array_index (v, i);
+ len = cmd->shingles_count > 0 ? sizeof (struct rspamd_fuzzy_shingle_cmd) :
+ sizeof (struct rspamd_fuzzy_cmd);
+ if (!fuzzy_cmd_to_wire (fd, cmd, len)) {
+ return FALSE;
+ }
+ }
+
+ return TRUE;
+}
+
+/*
+ * Read replies one-by-one and remove them from req array
+ */
+static const struct rspamd_fuzzy_reply *
+fuzzy_process_reply (guchar **pos, gint *r, GPtrArray *req)
+{
+ const guchar *p = *pos;
+ gint remain = *r;
+ guint i;
+ const struct rspamd_fuzzy_cmd *cmd;
+ const struct rspamd_fuzzy_reply *rep;
+
+ if (remain == 0 || (guint)remain < sizeof (struct rspamd_fuzzy_reply)) {
+ return NULL;
+ }
+
+ rep = (const struct rspamd_fuzzy_reply *)p;
+ /*
+ * Search for tag
+ */
+ for (i = 0; i < req->len; i ++) {
+ cmd = g_ptr_array_index (req, i);
+ if (cmd->tag == rep->tag) {
+ g_ptr_array_remove_index (req, i);
+ *pos += sizeof (struct rspamd_fuzzy_reply);
+ *r -= sizeof (struct rspamd_fuzzy_reply);
+ return rep;
+ }
+ }
+
+ msg_info ("unexpected tag: %ud", rep->tag);
+
+ return NULL;
+}
+
/* Call this whenever we got data from fuzzy storage */
static void
fuzzy_io_callback (gint fd, short what, void *arg)
{
struct fuzzy_client_session *session = arg;
- struct legacy_fuzzy_cmd cmd;
+ const struct rspamd_fuzzy_reply *rep;
struct fuzzy_mapping *map;
- gchar buf[62], *err_str;
+ guchar buf[2048], *p;
const gchar *symbol;
- gint value = 0, flag = 0, r;
+ gint r;
double nval;
gint ret = 0;
if (what == EV_WRITE) {
- /* Send command to storage */
- memset (&cmd, 0, sizeof (cmd));
- cmd.blocksize = session->h->block_size;
- cmd.value = 0;
- memcpy (cmd.hash, session->h->hash_pipe, sizeof (cmd.hash));
- cmd.cmd = FUZZY_CHECK;
- cmd.flag = 0;
- if (write (fd, &cmd, sizeof (struct legacy_fuzzy_cmd)) == -1) {
+ if (!fuzzy_cmd_vector_to_wire (fd, session->commands)) {
ret = -1;
}
else {
event_del (&session->ev);
- event_set (&session->ev, fd, EV_READ, fuzzy_io_callback, session);
+ event_set (&session->ev, fd, EV_READ | EV_PERSIST,
+ fuzzy_io_callback, session);
event_add (&session->ev, &session->tv);
session->state = 1;
}
@@ -659,48 +713,44 @@ fuzzy_io_callback (gint fd, short what, void *arg)
if ((r = read (fd, buf, sizeof (buf) - 1)) == -1) {
ret = -1;
}
- else if (buf[0] == 'O' && buf[1] == 'K') {
- buf[r] = 0;
- /* Now try to get value */
- value = strtol (buf + 3, &err_str, 10);
- if (*err_str == ' ') {
- /* Now read flag */
- flag = strtol (err_str + 1, &err_str, 10);
- }
- *err_str = '\0';
- /* Get mapping by flag */
- if ((map =
- g_hash_table_lookup (session->rule->mappings,
- GINT_TO_POINTER (flag))) == NULL) {
- /* Default symbol and default weight */
- symbol = session->rule->symbol;
- nval = fuzzy_normalize (value, session->rule->max_score);
- }
- else {
- /* Get symbol and weight from map */
- symbol = map->symbol;
- nval = fuzzy_normalize (value, map->weight);
- }
- msg_info (
- "<%s>, found fuzzy hash '%s' with weight: %.2f, in list: %s:%d%s",
- session->task->message_id,
- fuzzy_to_string (session->h),
- nval,
- symbol,
- flag,
- map == NULL ? "(unknown)" : "");
- if (map != NULL || !session->rule->skip_unknown) {
- rspamd_snprintf (buf,
- sizeof (buf),
- "%d: %d / %.2f",
- flag,
- value,
- nval);
- rspamd_task_insert_result_single (session->task,
- symbol,
- nval,
- g_list_prepend (NULL,
- rspamd_mempool_strdup (session->task->task_pool, buf)));
+ else {
+ p = buf;
+ while ((rep = fuzzy_process_reply (&p, &r, session->commands)) != NULL) {
+ /* Get mapping by flag */
+ if ((map =
+ g_hash_table_lookup (session->rule->mappings,
+ GINT_TO_POINTER (rep->flag))) == NULL) {
+ /* Default symbol and default weight */
+ symbol = session->rule->symbol;
+
+ }
+ else {
+ /* Get symbol and weight from map */
+ symbol = map->symbol;
+ }
+
+ nval = fuzzy_normalize (rep->value, session->rule->max_score);
+ nval *= rep->prob;
+ msg_info (
+ "<%s>, found fuzzy hash with weight: %.2f, in list: %s:%d%s",
+ session->task->message_id,
+ nval,
+ symbol,
+ rep->flag,
+ map == NULL ? "(unknown)" : "");
+ if (map != NULL || !session->rule->skip_unknown) {
+ rspamd_snprintf (buf,
+ sizeof (buf),
+ "%d: %.2f / %.2f",
+ rep->flag,
+ rep->prob,
+ nval);
+ rspamd_task_insert_result_single (session->task,
+ symbol,
+ nval,
+ g_list_prepend (NULL,
+ rspamd_mempool_strdup (session->task->task_pool, buf)));
+ }
}
}
ret = 1;
@@ -719,32 +769,33 @@ fuzzy_io_callback (gint fd, short what, void *arg)
errno,
strerror (errno));
rspamd_upstream_fail (session->server);
+ remove_normal_event (session->task->s, fuzzy_io_fin, session);
}
else {
rspamd_upstream_ok (session->server);
+ if (session->commands->len == 0) {
+ /*
+ * All requests are processed now
+ */
+ remove_normal_event (session->task->s, fuzzy_io_fin, session);
+ }
}
-
- remove_normal_event (session->task->s, fuzzy_io_fin, session);
}
static void
fuzzy_learn_callback (gint fd, short what, void *arg)
{
struct fuzzy_learn_session *session = arg;
- struct legacy_fuzzy_cmd cmd;
- gchar buf[512];
- const gchar *cmd_name, *symbol;
+ const struct rspamd_fuzzy_reply *rep;
+ struct fuzzy_mapping *map;
+ guchar buf[2048], *p;
+ const gchar *symbol;
+ gint r;
gint ret = 0;
- cmd_name = (session->cmd == FUZZY_WRITE ? "add" : "delete");
if (what == EV_WRITE) {
- /* Send command to storage */
- cmd.blocksize = session->h->block_size;
- memcpy (cmd.hash, session->h->hash_pipe, sizeof (cmd.hash));
- cmd.cmd = session->cmd;
- cmd.value = session->value;
- cmd.flag = session->flag;
- if (write (fd, &cmd, sizeof (struct legacy_fuzzy_cmd)) == -1) {
+ /* Send commands to storage */
+ if (!fuzzy_cmd_vector_to_wire (fd, session->commands)) {
if (*(session->err) == NULL) {
g_set_error (session->err,
g_quark_from_static_string ("fuzzy check"),
@@ -754,61 +805,65 @@ fuzzy_learn_callback (gint fd, short what, void *arg)
}
else {
event_del (&session->ev);
- event_set (&session->ev, fd, EV_READ, fuzzy_learn_callback,
- session);
+ event_set (&session->ev, fd, EV_READ | EV_PERSIST,
+ fuzzy_learn_callback, session);
event_add (&session->ev, &session->tv);
}
}
else if (what == EV_READ) {
- if (session->map) {
- symbol = session->map->symbol;
- }
- else {
- symbol = session->rule->symbol;
- }
- if (read (fd, buf, sizeof (buf)) == -1) {
- msg_info ("cannot %s fuzzy hash for message <%s>, list %s:%d",
- cmd_name,
- session->task->message_id,
- symbol,
- session->flag);
+ if ((r = read (fd, buf, sizeof (buf) - 1)) == -1) {
+ msg_info ("cannot process fuzzy hash for message <%s>",
+ session->task->message_id);
if (*(session->err) == NULL) {
g_set_error (session->err,
- g_quark_from_static_string ("fuzzy check"),
- errno, "read socket error: %s", strerror (errno));
+ g_quark_from_static_string ("fuzzy check"),
+ errno, "read socket error: %s", strerror (errno));
}
ret = -1;
}
- else if (buf[0] == 'O' && buf[1] == 'K') {
- msg_info ("%s fuzzy hash '%s', list: %s:%d for message <%s>",
- cmd_name,
- fuzzy_to_string (session->h),
- symbol,
- session->flag,
- session->task->message_id);
- ret = 1;
- }
else {
- msg_info ("cannot %s fuzzy hash '%s' for message <%s>, list %s:%d",
- cmd_name,
- fuzzy_to_string (session->h),
- session->task->message_id,
- symbol,
- session->flag);
- if (*(session->err) == NULL) {
- g_set_error (session->err,
- g_quark_from_static_string (
- "fuzzy check"), EINVAL, "%s fuzzy error", cmd_name);
+ p = buf;
+ while ((rep = fuzzy_process_reply (&p, &r, session->commands)) != NULL) {
+ if ((map =
+ g_hash_table_lookup (session->rule->mappings,
+ GINT_TO_POINTER (rep->flag))) == NULL) {
+ /* Default symbol and default weight */
+ symbol = session->rule->symbol;
+
+ }
+ else {
+ /* Get symbol and weight from map */
+ symbol = map->symbol;
+ }
+
+ if (rep->prob > 0.5) {
+ msg_info ("processed fuzzy hash, list: %s:%d for message <%s>",
+ symbol,
+ rep->flag,
+ session->task->message_id);
+ ret = 1;
+ }
+ else {
+ msg_info ("cannot process fuzzy hash for message <%s>, list %s:%d",
+ session->task->message_id,
+ symbol,
+ rep->flag);
+ if (*(session->err) == NULL) {
+ g_set_error (session->err,
+ g_quark_from_static_string ("fuzzy check"),
+ EINVAL, "process fuzzy error");
+ }
+ ret = 1;
+ }
}
- ret = 1;
}
}
else {
errno = ETIMEDOUT;
if (*(session->err) == NULL) {
g_set_error (session->err,
- g_quark_from_static_string (
- "fuzzy check"), EINVAL, "%s fuzzy, IO timeout", cmd_name);
+ g_quark_from_static_string ("fuzzy check"), EINVAL,
+ "process fuzzy, IO timeout");
}
ret = -1;
}
@@ -825,11 +880,14 @@ fuzzy_learn_callback (gint fd, short what, void *arg)
rspamd_upstream_ok (session->server);
}
- rspamd_http_connection_unref (session->http_entry->conn);
- event_del (&session->ev);
- close (session->fd);
+ if (ret == -1 || session->commands->len == 0) {
+ (*session->saved) --;
+ rspamd_http_connection_unref (session->http_entry->conn);
+ event_del (&session->ev);
+ close (session->fd);
+ }
- if (--(*(session->saved)) == 0) {
+ if (*session->saved == 0) {
if (*(session->err) != NULL) {
rspamd_controller_send_error (session->http_entry,
(*session->err)->code, (*session->err)->message);
@@ -843,61 +901,20 @@ fuzzy_learn_callback (gint fd, short what, void *arg)
}
}
-static inline void
-register_fuzzy_call (struct rspamd_task *task,
- struct fuzzy_rule *rule,
- rspamd_fuzzy_t *h)
-{
- struct fuzzy_client_session *session;
- struct upstream *selected;
- gint sock;
-
- /* Get upstream */
- selected = rspamd_upstream_get (rule->servers, RSPAMD_UPSTREAM_HASHED,
- h->hash_pipe, sizeof (h->hash_pipe));
- if (selected) {
- if ((sock = rspamd_inet_address_connect (rspamd_upstream_addr (selected),
- SOCK_DGRAM, TRUE)) == -1) {
- msg_warn ("cannot connect to %s, %d, %s",
- rspamd_upstream_name (selected),
- errno,
- strerror (errno));
- }
- else {
- /* Create session for a socket */
- session =
- rspamd_mempool_alloc (task->task_pool,
- sizeof (struct fuzzy_client_session));
- event_set (&session->ev, sock, EV_WRITE, fuzzy_io_callback,
- session);
- msec_to_tv (fuzzy_module_ctx->io_timeout, &session->tv);
- session->state = 0;
- session->h = h;
- session->task = task;
- session->fd = sock;
- session->server = selected;
- session->rule = rule;
- event_add (&session->ev, &session->tv);
- register_async_event (task->s,
- fuzzy_io_fin,
- session,
- g_quark_from_static_string ("fuzzy check"));
- }
- }
-}
-
-static void
-fuzzy_check_rule (struct rspamd_task *task, struct fuzzy_rule *rule)
+static GPtrArray *
+fuzzy_generate_commands (struct rspamd_task *task, struct fuzzy_rule *rule,
+ gint c, gint flag, guint32 value)
{
struct mime_text_part *part;
struct mime_part *mime_part;
struct rspamd_image *image;
- gchar *checksum;
+ struct rspamd_fuzzy_cmd *cmd;
gsize hashlen;
GList *cur;
- rspamd_fuzzy_t *fake_fuzzy;
+ GPtrArray *res;
cur = task->text_parts;
+ res = g_ptr_array_new ();
while (cur) {
part = cur->data;
@@ -932,8 +949,19 @@ fuzzy_check_rule (struct rspamd_task *task, struct fuzzy_rule *rule)
continue;
}
- register_fuzzy_call (task, rule, part->fuzzy);
- register_fuzzy_call (task, rule, part->double_fuzzy);
+ /*
+ * Try legacy first
+ */
+ cmd = fuzzy_cmd_from_text_part (rule, c, flag, value, task->task_pool,
+ part, TRUE, NULL);
+ if (cmd) {
+ g_ptr_array_add (res, cmd);
+ }
+ cmd = fuzzy_cmd_from_text_part (rule, c, flag, value, task->task_pool,
+ part, FALSE, NULL);
+ if (cmd) {
+ g_ptr_array_add (res, cmd);
+ }
cur = g_list_next (cur);
}
@@ -946,16 +974,20 @@ fuzzy_check_rule (struct rspamd_task *task, struct fuzzy_rule *rule)
fuzzy_module_ctx->min_height) {
if (fuzzy_module_ctx->min_width <= 0 || image->width >=
fuzzy_module_ctx->min_width) {
- checksum = g_compute_checksum_for_data (G_CHECKSUM_MD5,
- image->data->data,
- image->data->len);
- /* Construct fake fuzzy hash */
- fake_fuzzy = rspamd_mempool_alloc0 (task->task_pool,
- sizeof (rspamd_fuzzy_t));
- rspamd_strlcpy (fake_fuzzy->hash_pipe, checksum,
- sizeof (fake_fuzzy->hash_pipe));
- register_fuzzy_call (task, rule, fake_fuzzy);
- g_free (checksum);
+ cmd = fuzzy_cmd_from_data_part (rule, c, flag, value,
+ task->task_pool,
+ image->data->data, image->data->len,
+ TRUE, NULL);
+ if (cmd) {
+ g_ptr_array_add (res, cmd);
+ }
+ cmd = fuzzy_cmd_from_data_part (rule, c, flag, value,
+ task->task_pool,
+ image->data->data, image->data->len,
+ FALSE, NULL);
+ if (cmd) {
+ g_ptr_array_add (res, cmd);
+ }
}
}
}
@@ -969,28 +1001,83 @@ fuzzy_check_rule (struct rspamd_task *task, struct fuzzy_rule *rule)
fuzzy_check_content_type (rule, mime_part->type)) {
if (fuzzy_module_ctx->min_bytes <= 0 || mime_part->content->len >=
fuzzy_module_ctx->min_bytes) {
- checksum = g_compute_checksum_for_data (G_CHECKSUM_MD5,
- mime_part->content->data, mime_part->content->len);
- /* Construct fake fuzzy hash */
- fake_fuzzy =
- rspamd_mempool_alloc0 (task->task_pool,
- sizeof (rspamd_fuzzy_t));
- rspamd_strlcpy (fake_fuzzy->hash_pipe, checksum,
- sizeof (fake_fuzzy->hash_pipe));
- register_fuzzy_call (task, rule, fake_fuzzy);
- g_free (checksum);
+ cmd = fuzzy_cmd_from_data_part (rule, c, flag, value,
+ task->task_pool,
+ mime_part->content->data, mime_part->content->len,
+ TRUE, NULL);
+ if (cmd) {
+ g_ptr_array_add (res, cmd);
+ }
+ cmd = fuzzy_cmd_from_data_part (rule, c, flag, value,
+ task->task_pool,
+ mime_part->content->data, mime_part->content->len,
+ FALSE, NULL);
+ if (cmd) {
+ g_ptr_array_add (res, cmd);
+ }
}
}
cur = g_list_next (cur);
}
+
+ if (res->len == 0) {
+ g_ptr_array_free (res, FALSE);
+ return NULL;
+ }
+
+ return res;
}
-/* This callback is called when we check message via fuzzy hashes storage */
+
+static inline void
+register_fuzzy_client_call (struct rspamd_task *task,
+ struct fuzzy_rule *rule,
+ GPtrArray *commands)
+{
+ struct fuzzy_client_session *session;
+ struct upstream *selected;
+ gint sock;
+
+ /* Get upstream */
+ selected = rspamd_upstream_get (rule->servers, RSPAMD_UPSTREAM_ROUND_ROBIN);
+ if (selected) {
+ if ((sock = rspamd_inet_address_connect (rspamd_upstream_addr (selected),
+ SOCK_DGRAM, TRUE)) == -1) {
+ msg_warn ("cannot connect to %s, %d, %s",
+ rspamd_upstream_name (selected),
+ errno,
+ strerror (errno));
+ }
+ else {
+ /* Create session for a socket */
+ session =
+ rspamd_mempool_alloc (task->task_pool,
+ sizeof (struct fuzzy_client_session));
+ event_set (&session->ev, sock, EV_WRITE, fuzzy_io_callback,
+ session);
+ msec_to_tv (fuzzy_module_ctx->io_timeout, &session->tv);
+ session->state = 0;
+ session->commands = commands;
+ session->task = task;
+ session->fd = sock;
+ session->server = selected;
+ session->rule = rule;
+ event_add (&session->ev, &session->tv);
+ register_async_event (task->s,
+ fuzzy_io_fin,
+ session,
+ g_quark_from_static_string ("fuzzy check"));
+ }
+ }
+}
+
+/* This callback is called when we check message in fuzzy hashes storage */
static void
fuzzy_symbol_callback (struct rspamd_task *task, void *unused)
{
struct fuzzy_rule *rule;
GList *cur;
+ GPtrArray *commands;
/* Check whitelist */
if (fuzzy_module_ctx->whitelist) {
@@ -1006,23 +1093,28 @@ fuzzy_symbol_callback (struct rspamd_task *task, void *unused)
cur = fuzzy_module_ctx->fuzzy_rules;
while (cur) {
rule = cur->data;
- fuzzy_check_rule (task, rule);
+ commands = fuzzy_generate_commands (task, rule, FUZZY_CHECK, 0, 0);
+ if (commands != NULL) {
+ register_fuzzy_client_call (task, rule, commands);
+ }
cur = g_list_next (cur);
}
}
static inline gboolean
register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry,
- struct fuzzy_rule *rule, struct rspamd_task *task, rspamd_fuzzy_t *h,
- gint cmd, gint value, gint flag, gint *saved, GError **err)
+ struct fuzzy_rule *rule,
+ struct rspamd_task *task,
+ GPtrArray *commands,
+ gint *saved,
+ GError **err)
{
struct fuzzy_learn_session *s;
struct upstream *selected;
gint sock;
/* Get upstream */
- selected = rspamd_upstream_get (rule->servers, RSPAMD_UPSTREAM_HASHED,
- h->hash_pipe, sizeof (h->hash_pipe));
+ selected = rspamd_upstream_get (rule->servers, RSPAMD_UPSTREAM_ROUND_ROBIN);
if (selected) {
/* Create UDP socket */
if ((sock = rspamd_inet_address_connect (rspamd_upstream_addr (selected),
@@ -1030,7 +1122,6 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry,
return FALSE;
}
else {
- /* Socket is made, create session */
s =
rspamd_mempool_alloc (task->task_pool,
sizeof (struct fuzzy_learn_session));
@@ -1038,20 +1129,13 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry,
event_base_set (entry->rt->ev_base, &s->ev);
msec_to_tv (fuzzy_module_ctx->io_timeout, &s->tv);
s->task = task;
- s->h =
- rspamd_mempool_alloc (task->task_pool, sizeof (rspamd_fuzzy_t));
- memcpy (s->h, h, sizeof (rspamd_fuzzy_t));
+ s->commands = commands;
s->http_entry = entry;
s->server = selected;
- s->cmd = cmd;
- s->value = value;
- s->flag = flag;
s->saved = saved;
s->fd = sock;
s->err = err;
s->rule = rule;
- s->map = g_hash_table_lookup (rule->mappings,
- GINT_TO_POINTER (flag));
/* We ref connection to avoid freeing before we process fuzzy rule */
rspamd_http_connection_ref (entry->conn);
event_add (&s->ev, &s->tv);
@@ -1063,131 +1147,17 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry,
return FALSE;
}
-static int
-fuzzy_process_rule (struct rspamd_http_connection_entry *entry,
- struct fuzzy_rule *rule,
- struct rspamd_task *task,
- GError **err,
- gint cmd,
- gint flag,
- gint value,
- gint *saved)
-{
- struct mime_text_part *part;
- struct mime_part *mime_part;
- struct rspamd_image *image;
- GList *cur;
- gchar *checksum;
- rspamd_fuzzy_t fake_fuzzy;
- gint processed = 0;
-
- /* Plan new event for writing */
- cur = task->text_parts;
-
- while (cur) {
- part = cur->data;
- if (part->is_empty || part->fuzzy == NULL ||
- part->fuzzy->hash_pipe[0] == '\0' ||
- (fuzzy_module_ctx->min_bytes > 0 && part->content->len <
- fuzzy_module_ctx->min_bytes)) {
- /* Skip empty parts */
- msg_info ("<%s>: part %Xd is too short for fuzzy process, skip it",
- task->message_id, part->fuzzy ? part->fuzzy->h : 0);
- cur = g_list_next (cur);
- continue;
- }
- if (!register_fuzzy_controller_call (entry, rule, task,
- part->fuzzy, cmd, value, flag, saved, err)) {
- goto err;
- }
- if (!register_fuzzy_controller_call (entry, rule, task,
- part->double_fuzzy, cmd, value, flag, saved, err)) {
- /* Cannot write hash */
- goto err;
- }
- processed++;
- cur = g_list_next (cur);
- }
-
- /* Process images */
- cur = task->images;
- while (cur) {
- image = cur->data;
- if (image->data->len > 0) {
- if (fuzzy_module_ctx->min_height <= 0 || image->height >=
- fuzzy_module_ctx->min_height) {
- if (fuzzy_module_ctx->min_width <= 0 || image->width >=
- fuzzy_module_ctx->min_width) {
- checksum = g_compute_checksum_for_data (G_CHECKSUM_MD5,
- image->data->data,
- image->data->len);
- /* Construct fake fuzzy hash */
- fake_fuzzy.block_size = 0;
- memset (fake_fuzzy.hash_pipe, 0,
- sizeof (fake_fuzzy.hash_pipe));
- rspamd_strlcpy (fake_fuzzy.hash_pipe, checksum,
- sizeof (fake_fuzzy.hash_pipe));
- if (!register_fuzzy_controller_call (entry, rule, task,
- &fake_fuzzy, cmd, value, flag, saved, err)) {
- g_free (checksum);
- goto err;
- }
-
- msg_info ("save hash of image: [%s] to list: %d",
- checksum,
- flag);
- g_free (checksum);
- processed++;
- }
- }
- }
- cur = g_list_next (cur);
- }
- /* Process other parts */
- cur = task->parts;
- while (cur) {
- mime_part = cur->data;
- if (mime_part->content->len > 0 &&
- fuzzy_check_content_type (rule, mime_part->type)) {
- if (fuzzy_module_ctx->min_bytes <= 0 || mime_part->content->len >=
- fuzzy_module_ctx->min_bytes) {
- checksum = g_compute_checksum_for_data (G_CHECKSUM_MD5,
- mime_part->content->data, mime_part->content->len);
- /* Construct fake fuzzy hash */
- fake_fuzzy.block_size = 0;
- memset (fake_fuzzy.hash_pipe, 0, sizeof (fake_fuzzy.hash_pipe));
- rspamd_strlcpy (fake_fuzzy.hash_pipe, checksum,
- sizeof (fake_fuzzy.hash_pipe));
- if (!register_fuzzy_controller_call (entry, rule, task,
- &fake_fuzzy, cmd, value, flag, saved, err)) {
- goto err;
- }
- msg_info ("save hash of part of type: %s/%s: [%s] to list %d",
- mime_part->type->type, mime_part->type->subtype,
- checksum, flag);
- g_free (checksum);
- processed++;
- }
- }
- cur = g_list_next (cur);
- }
-
- return processed;
-
-err:
- return -1;
-}
-
static void
fuzzy_process_handler (struct rspamd_http_connection_entry *conn_ent,
struct rspamd_http_message *msg, gint cmd, gint value, gint flag,
struct fuzzy_ctx *ctx)
{
struct fuzzy_rule *rule;
+ struct rspamd_task *task;
gboolean processed = FALSE, res = TRUE;
GList *cur;
- struct rspamd_task *task;
GError **err;
+ GPtrArray *commands;
gint r, *saved, rules = 0;
/* Prepare task */
@@ -1223,8 +1193,13 @@ fuzzy_process_handler (struct rspamd_http_connection_entry *conn_ent,
continue;
}
rules ++;
- res = fuzzy_process_rule (conn_ent, rule, task, err, cmd, flag,
- value, saved);
+
+ res = 0;
+ commands = fuzzy_generate_commands (task, rule, cmd, flag, value);
+ if (commands != NULL) {
+ res = register_fuzzy_controller_call (conn_ent, rule, task, commands,
+ saved, err);
+ }
if (res) {
processed = TRUE;