aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2015-10-05 12:08:32 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2015-10-05 12:08:32 +0100
commitb7de31588d06d254754fcb46381181d9cb91e88e (patch)
tree7ade58da4b841a674c886923a34d8be6397c405c
parent96229b2523c9df05b514504c5e4ef202c334248f (diff)
downloadrspamd-b7de31588d06d254754fcb46381181d9cb91e88e.tar.gz
rspamd-b7de31588d06d254754fcb46381181d9cb91e88e.zip
Implement retransmits for fuzzy_check plugin.
-rw-r--r--src/plugins/fuzzy_check.c222
1 files changed, 172 insertions, 50 deletions
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index 5b2190208..cdd575136 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -60,6 +60,7 @@
#define DEFAULT_UPSTREAM_MAXERRORS 10
#define DEFAULT_IO_TIMEOUT 500
+#define DEFAULT_RETRANSMITS 3
#define DEFAULT_PORT 11335
struct fuzzy_mapping {
@@ -100,30 +101,35 @@ struct fuzzy_ctx {
guint32 min_height;
guint32 min_width;
guint32 io_timeout;
+ guint32 retransmits;
};
struct fuzzy_client_session {
- gint state;
GPtrArray *commands;
- struct event ev;
- struct timeval tv;
struct rspamd_task *task;
struct upstream *server;
struct fuzzy_rule *rule;
+ struct event ev;
+ struct event timev;
+ struct timeval tv;
+ gint state;
gint fd;
+ guint retransmits;
};
struct fuzzy_learn_session {
- struct event ev;
GPtrArray *commands;
gint *saved;
GError **err;
- struct timeval tv;
struct rspamd_http_connection_entry *http_entry;
struct upstream *server;
struct fuzzy_rule *rule;
struct rspamd_task *task;
+ struct event ev;
+ struct event timev;
+ struct timeval tv;
gint fd;
+ guint retransmits;
};
struct fuzzy_cmd_io {
@@ -509,6 +515,16 @@ fuzzy_check_module_config (struct rspamd_config *cfg)
}
if ((value =
+ rspamd_config_get_module_opt (cfg,
+ "fuzzy_check",
+ "retransmits")) != NULL) {
+ fuzzy_module_ctx->retransmits = ucl_obj_toint (value);
+ }
+ else {
+ fuzzy_module_ctx->retransmits = DEFAULT_RETRANSMITS;
+ }
+
+ if ((value =
rspamd_config_get_module_opt (cfg, "fuzzy_check",
"whitelist")) != NULL) {
fuzzy_module_ctx->whitelist = radix_create_compressed ();
@@ -567,7 +583,9 @@ fuzzy_io_fin (void *ud)
if (session->commands) {
g_ptr_array_free (session->commands, TRUE);
}
+
event_del (&session->ev);
+ event_del (&session->timev);
close (session->fd);
}
@@ -767,16 +785,20 @@ fuzzy_cmd_vector_to_wire (gint fd, GPtrArray *v)
{
guint i;
struct fuzzy_cmd_io *io;
+ gboolean processed = FALSE;
for (i = 0; i < v->len; i ++) {
io = g_ptr_array_index (v, i);
- if (!fuzzy_cmd_to_wire (fd, &io->io)) {
- return FALSE;
+ if (!io->replied) {
+ if (!fuzzy_cmd_to_wire (fd, &io->io)) {
+ return FALSE;
+ }
+ processed = TRUE;
}
}
- return TRUE;
+ return processed;
}
/*
@@ -852,9 +874,9 @@ fuzzy_process_reply (guchar **pos, gint *r, GPtrArray *req,
return NULL;
}
-/* Call this whenever we got data from fuzzy storage */
+/* Fuzzy check callback */
static void
-fuzzy_io_callback (gint fd, short what, void *arg)
+fuzzy_check_io_callback (gint fd, short what, void *arg)
{
struct fuzzy_client_session *session = arg;
const struct rspamd_fuzzy_reply *rep;
@@ -875,10 +897,6 @@ fuzzy_io_callback (gint fd, short what, void *arg)
ret = -1;
}
else {
- event_del (&session->ev);
- event_set (&session->ev, fd, EV_READ | EV_PERSIST,
- fuzzy_io_callback, session);
- event_add (&session->ev, &session->tv);
session->state = 1;
ret = 0;
}
@@ -935,14 +953,19 @@ fuzzy_io_callback (gint fd, short what, void *arg)
}
}
else {
- errno = ETIMEDOUT;
- ret = -1;
+ /* Should not happen */
+ g_assert (0);
}
if (ret == 0) {
- return;
+ /* Processed write, switch to reading */
+ event_del (&session->ev);
+ event_set (&session->ev, fd, EV_READ,
+ fuzzy_check_io_callback, session);
+ event_add (&session->ev, NULL);
}
else if (ret == -1) {
+ /* Error state */
msg_err_task ("got error on IO with server %s, on %s, %d, %s",
rspamd_upstream_name (session->server),
session->state == 1 ? "read" : "write",
@@ -952,6 +975,7 @@ fuzzy_io_callback (gint fd, short what, void *arg)
rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session);
}
else {
+ /* Read something from network */
rspamd_upstream_ok (session->server);
guint nreplied = 0;
@@ -966,11 +990,49 @@ fuzzy_io_callback (gint fd, short what, void *arg)
if (nreplied == session->commands->len) {
rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session);
}
+ else {
+ /* Need to read more */
+ event_del (&session->ev);
+ event_set (&session->ev, fd, EV_READ,
+ fuzzy_check_io_callback, session);
+ event_add (&session->ev, NULL);
+ }
+ }
+}
+
+/* Fuzzy check timeout callback */
+static void
+fuzzy_check_timer_callback (gint fd, short what, void *arg)
+{
+ struct fuzzy_client_session *session = arg;
+ struct rspamd_task *task;
+
+ task = session->task;
+
+ if (session->retransmits >= fuzzy_module_ctx->retransmits) {
+ msg_err_task ("got IO timeout with server %s, after %d retransmits",
+ rspamd_upstream_name (session->server),
+ session->retransmits);
+ rspamd_upstream_fail (session->server);
+ rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session);
+ }
+ else {
+ /* Plan write event */
+ event_del (&session->ev);
+ event_set (&session->ev, fd, EV_WRITE,
+ fuzzy_check_io_callback, session);
+ event_add (&session->ev, NULL);
+
+ /* Plan new retransmit timer */
+ event_del (&session->timev);
+ event_add (&session->timev, &session->tv);
+ session->retransmits ++;
}
}
+/* Controller IO */
static void
-fuzzy_learn_callback (gint fd, short what, void *arg)
+fuzzy_controller_io_callback (gint fd, short what, void *arg)
{
struct fuzzy_learn_session *session = arg;
const struct rspamd_fuzzy_reply *rep;
@@ -993,12 +1055,6 @@ fuzzy_learn_callback (gint fd, short what, void *arg)
}
ret = -1;
}
- else {
- event_del (&session->ev);
- event_set (&session->ev, fd, EV_READ | EV_PERSIST,
- fuzzy_learn_callback, session);
- event_add (&session->ev, &session->tv);
- }
}
else if (what == EV_READ) {
if ((r = read (fd, buf, sizeof (buf) - 1)) == -1) {
@@ -1053,43 +1109,90 @@ fuzzy_learn_callback (gint fd, short what, void *arg)
}
}
else {
- errno = ETIMEDOUT;
- if (*(session->err) == NULL) {
- g_set_error (session->err,
- g_quark_from_static_string ("fuzzy check"), EINVAL,
- "process fuzzy, IO timeout");
- }
- ret = -1;
+ g_assert (0);
}
if (ret == 0) {
- return;
+ event_del (&session->ev);
+ event_set (&session->ev, fd, EV_READ,
+ fuzzy_controller_io_callback, session);
+ event_add (&session->ev, NULL);
}
else if (ret == -1) {
msg_err_task ("got error in IO with server %s, %d, %s",
rspamd_upstream_name (session->server), errno, strerror (errno));
+
+ goto cleanup;
+ }
+ else {
+ (*session->saved)--;
+
+ if (*session->saved == 0) {
+ goto cleanup;
+ }
+ else {
+ /* Read more */
+ event_del (&session->ev);
+ event_set (&session->ev, fd, EV_READ,
+ fuzzy_controller_io_callback, session);
+ event_add (&session->ev, NULL);
+ }
+ }
+
+ return;
+
+cleanup:
+ if (*(session->err) != NULL) {
rspamd_upstream_fail (session->server);
+ rspamd_controller_send_error (session->http_entry,
+ (*session->err)->code, (*session->err)->message);
+ g_error_free (*session->err);
}
else {
rspamd_upstream_ok (session->server);
+ rspamd_controller_send_string (session->http_entry,
+ "{\"success\":true}");
}
- (*session->saved) --;
+ rspamd_task_free (session->task, TRUE);
rspamd_http_connection_unref (session->http_entry->conn);
event_del (&session->ev);
+ event_del (&session->timev);
close (session->fd);
+}
- if (*session->saved == 0) {
- if (*(session->err) != NULL) {
- rspamd_controller_send_error (session->http_entry,
- (*session->err)->code, (*session->err)->message);
- g_error_free (*session->err);
- }
- else {
- rspamd_controller_send_string (session->http_entry,
- "{\"success\":true}");
- }
+static void
+fuzzy_controller_timer_callback (gint fd, short what, void *arg)
+{
+ struct fuzzy_learn_session *session = arg;
+ struct rspamd_task *task;
+
+ task = session->task;
+
+ if (session->retransmits >= fuzzy_module_ctx->retransmits) {
+ rspamd_upstream_fail (session->server);
+ rspamd_controller_send_error (session->http_entry,
+ 500, "IO timeout with fuzzy storage");
+ msg_err_task ("got IO timeout with server %s, after %d retransmits",
+ rspamd_upstream_name (session->server),
+ session->retransmits);
rspamd_task_free (session->task, TRUE);
+ rspamd_http_connection_unref (session->http_entry->conn);
+ event_del (&session->ev);
+ event_del (&session->timev);
+ close (session->fd);
+ }
+ else {
+ /* Plan write event */
+ event_del (&session->ev);
+ event_set (&session->ev, fd, EV_WRITE,
+ fuzzy_controller_io_callback, session);
+ event_add (&session->ev, NULL);
+
+ /* Plan new retransmit timer */
+ event_del (&session->timev);
+ event_add (&session->timev, &session->tv);
+ session->retransmits ++;
}
}
@@ -1223,7 +1326,7 @@ register_fuzzy_client_call (struct rspamd_task *task,
else {
/* Create session for a socket */
session =
- rspamd_mempool_alloc (task->task_pool,
+ rspamd_mempool_alloc0 (task->task_pool,
sizeof (struct fuzzy_client_session));
msec_to_tv (fuzzy_module_ctx->io_timeout, &session->tv);
session->state = 0;
@@ -1232,10 +1335,17 @@ register_fuzzy_client_call (struct rspamd_task *task,
session->fd = sock;
session->server = selected;
session->rule = rule;
- event_set (&session->ev, sock, EV_WRITE, fuzzy_io_callback,
+
+ event_set (&session->ev, sock, EV_WRITE, fuzzy_check_io_callback,
session);
event_base_set (session->task->ev_base, &session->ev);
- event_add (&session->ev, &session->tv);
+ event_add (&session->ev, NULL);
+
+ evtimer_set (&session->timev, fuzzy_check_timer_callback,
+ session);
+ event_base_set (session->task->ev_base, &session->timev);
+ event_add (&session->timev, &session->tv);
+
rspamd_session_add_event (task->s,
fuzzy_io_fin,
session,
@@ -1298,10 +1408,9 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry,
}
else {
s =
- rspamd_mempool_alloc (task->task_pool,
+ rspamd_mempool_alloc0 (task->task_pool,
sizeof (struct fuzzy_learn_session));
- event_set (&s->ev, sock, EV_WRITE, fuzzy_learn_callback, s);
- event_base_set (entry->rt->ev_base, &s->ev);
+
msec_to_tv (fuzzy_module_ctx->io_timeout, &s->tv);
s->task = task;
s->commands = commands;
@@ -1313,7 +1422,16 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry,
s->rule = rule;
/* We ref connection to avoid freeing before we process fuzzy rule */
rspamd_http_connection_ref (entry->conn);
- event_add (&s->ev, &s->tv);
+
+ event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s);
+ event_base_set (entry->rt->ev_base, &s->ev);
+ event_add (&s->ev, NULL);
+
+ evtimer_set (&s->timev, fuzzy_controller_timer_callback,
+ s);
+ event_base_set (s->task->ev_base, &s->timev);
+ event_add (&s->timev, &s->tv);
+
(*saved)++;
ret = TRUE;
}
@@ -1338,6 +1456,7 @@ fuzzy_process_handler (struct rspamd_http_connection_entry *conn_ent,
/* Prepare task */
task = rspamd_task_new (NULL);
task->cfg = ctx->cfg;
+ task->ev_base = conn_ent->rt->ev_base;
/* Allocate message from string */
/* XXX: what about encrypted messsages ? */
@@ -1347,6 +1466,7 @@ fuzzy_process_handler (struct rspamd_http_connection_entry *conn_ent,
saved = rspamd_mempool_alloc0 (task->task_pool, sizeof (gint));
err = rspamd_mempool_alloc0 (task->task_pool, sizeof (GError *));
r = rspamd_message_parse (task);
+
if (r == -1) {
msg_warn_task ("<%s>: cannot process message for fuzzy",
task->message_id);
@@ -1355,7 +1475,9 @@ fuzzy_process_handler (struct rspamd_http_connection_entry *conn_ent,
"Message processing error");
return;
}
+
cur = fuzzy_module_ctx->fuzzy_rules;
+
while (cur && res) {
rule = cur->data;