From c4bb71aba3a4b6c6f8ca88b52297979410c5c050 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 30 Nov 2015 11:08:45 +0000 Subject: [PATCH] More fixes for learning IO --- src/plugins/fuzzy_check.c | 56 ++++++++++++++++++++++++++++----------- 1 file changed, 40 insertions(+), 16 deletions(-) diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index cf9f2739d..656b4e77a 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -1051,17 +1051,21 @@ fuzzy_check_io_callback (gint fd, short what, void *arg) guint i; gint r; double nval; - gint ret = -1; + enum { + return_error = 0, + return_want_more, + return_finished + } ret = return_error; task = session->task; if (what == EV_WRITE) { if (!fuzzy_cmd_vector_to_wire (fd, session->commands)) { - ret = -1; + ret = return_error; } else { session->state = 1; - ret = 0; + ret = return_want_more; } } else if (session->state == 1) { @@ -1074,7 +1078,7 @@ fuzzy_check_io_callback (gint fd, short what, void *arg) } else { p = buf; - ret = 0; + ret = return_want_more; while ((rep = fuzzy_process_reply (&p, &r, session->commands, session->rule)) != NULL) { @@ -1116,7 +1120,7 @@ fuzzy_check_io_callback (gint fd, short what, void *arg) session->task->task_pool, buf))); } } - ret = 1; + ret = return_finished; } } } @@ -1125,14 +1129,14 @@ fuzzy_check_io_callback (gint fd, short what, void *arg) g_assert (0); } - if (ret == 0) { + if (ret == return_want_more) { /* Processed write, switch to reading */ event_del (&session->ev); event_set (&session->ev, fd, EV_READ, fuzzy_check_io_callback, session); event_add (&session->ev, NULL); } - else if (ret == -1) { + else if (ret == return_error) { /* Error state */ msg_err_task ("got error on IO with server %s, on %s, %d, %s", rspamd_upstream_name (session->server), @@ -1207,9 +1211,15 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg) struct fuzzy_mapping *map; struct rspamd_task *task; guchar buf[2048], *p; + struct fuzzy_cmd_io *io; const gchar *symbol; gint r; - gint ret = 0; + enum { + return_error = 0, + return_want_more, + return_finished + } ret = return_want_more; + guint i, nreplied; task = session->task; @@ -1221,7 +1231,7 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg) g_quark_from_static_string ("fuzzy check"), errno, "write socket error: %s", strerror (errno)); } - ret = -1; + ret = return_error; } } else if (what == EV_READ) { @@ -1238,11 +1248,11 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg) g_quark_from_static_string ("fuzzy check"), errno, "read socket error: %s", strerror (errno)); } - ret = -1; + ret = return_error; } else { p = buf; - ret = 0; + ret = return_want_more; while ((rep = fuzzy_process_reply (&p, &r, session->commands, session->rule)) != NULL) { @@ -1259,12 +1269,12 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg) } if (rep->prob > 0.5) { - msg_info_task ("processed fuzzy hash, list: %s:%d for " + msg_info_task ("processed fuzzy hash <%d>, list: %s:%d for " "message <%s>", + rep->tag, symbol, rep->flag, session->task->message_id); - ret = 1; } else { msg_info_task ("cannot process fuzzy hash for message " @@ -1279,23 +1289,37 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg) g_quark_from_static_string ("fuzzy check"), EINVAL, "process fuzzy error"); } - ret = 1; + ret = return_finished; + } + } + + nreplied = 0; + + for (i = 0; i < session->commands->len; i++) { + io = g_ptr_array_index (session->commands, i); + + if (io->flags & FUZZY_CMD_FLAG_REPLIED) { + nreplied++; } } + + if (nreplied == session->commands->len) { + ret = return_finished; + } } } else { g_assert (0); } - if (ret == 0) { + if (ret == return_want_more) { event_del (&session->ev); event_set (&session->ev, fd, EV_READ, fuzzy_controller_io_callback, session); event_add (&session->ev, NULL); return; } - else if (ret == -1) { + else if (ret == return_error) { msg_err_task ("got error in IO with server %s, %d, %s", rspamd_upstream_name (session->server), errno, strerror (errno)); rspamd_upstream_fail (session->server); -- 2.39.5