aboutsummaryrefslogtreecommitdiffstats
path: root/src/plugins/fuzzy_check.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugins/fuzzy_check.c')
-rw-r--r--src/plugins/fuzzy_check.c299
1 files changed, 129 insertions, 170 deletions
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index 2c91869d6..75df2a645 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -47,6 +47,7 @@
#include "libstat/stat_api.h"
#include <math.h>
#include <src/libmime/message.h>
+#include "libutil/libev_helper.h"
#define DEFAULT_SYMBOL "R_FUZZY_HASH"
@@ -129,9 +130,8 @@ struct fuzzy_client_session {
struct rspamd_symcache_item *item;
struct upstream *server;
struct fuzzy_rule *rule;
- struct event ev;
- struct event timev;
- struct timeval tv;
+ struct ev_loop *event_loop;
+ struct rspamd_io_ev ev;
gint state;
gint fd;
guint retransmits;
@@ -146,9 +146,8 @@ struct fuzzy_learn_session {
struct upstream *server;
struct fuzzy_rule *rule;
struct rspamd_task *task;
- struct event ev;
- struct event timev;
- struct timeval tv;
+ struct ev_loop *event_loop;
+ struct rspamd_io_ev ev;
gint fd;
guint retransmits;
};
@@ -1185,8 +1184,7 @@ fuzzy_io_fin (void *ud)
g_ptr_array_free (session->results, TRUE);
}
- event_del (&session->ev);
- event_del (&session->timev);
+ rspamd_ev_watcher_stop (session->event_loop, &session->ev);
close (session->fd);
}
@@ -2181,13 +2179,49 @@ fuzzy_check_session_is_completed (struct fuzzy_client_session *session)
return FALSE;
}
+/* Fuzzy check timeout callback */
+static void
+fuzzy_check_timer_callback (gint fd, short what, void *arg)
+{
+ struct fuzzy_client_session *session = arg;
+ struct rspamd_task *task;
+
+ task = session->task;
+
+ /* We might be here because of other checks being slow */
+ if (fuzzy_check_try_read (session) > 0) {
+ if (fuzzy_check_session_is_completed (session)) {
+ return;
+ }
+ }
+
+ if (session->retransmits >= session->rule->ctx->retransmits) {
+ msg_err_task ("got IO timeout with server %s(%s), after %d retransmits",
+ rspamd_upstream_name (session->server),
+ rspamd_inet_address_to_string_pretty (
+ rspamd_upstream_addr_cur (session->server)),
+ session->retransmits);
+ rspamd_upstream_fail (session->server, TRUE);
+
+ if (session->item) {
+ rspamd_symcache_item_async_dec_check (session->task, session->item, M);
+ }
+ rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session);
+ }
+ else {
+ /* Plan write event */
+ rspamd_ev_watcher_reschedule (session->event_loop,
+ &session->ev, EV_READ|EV_WRITE);
+ session->retransmits ++;
+ }
+}
+
/* Fuzzy check callback */
static void
fuzzy_check_io_callback (gint fd, short what, void *arg)
{
struct fuzzy_client_session *session = arg;
struct rspamd_task *task;
- struct ev_loop *ev_base;
gint r;
enum {
@@ -2224,18 +2258,14 @@ fuzzy_check_io_callback (gint fd, short what, void *arg)
}
}
else {
- /* Should not happen */
- g_assert (0);
+ fuzzy_check_timer_callback (fd, what, arg);
+ return;
}
if (ret == return_want_more) {
/* Processed write, switch to reading */
- ev_base = event_get_base (&session->ev);
- event_del (&session->ev);
- event_set (&session->ev, fd, EV_READ,
- fuzzy_check_io_callback, session);
- event_base_set (ev_base, &session->ev);
- event_add (&session->ev, NULL);
+ rspamd_ev_watcher_reschedule (session->event_loop,
+ &session->ev, EV_READ);
}
else if (ret == return_error) {
/* Error state */
@@ -2258,78 +2288,82 @@ fuzzy_check_io_callback (gint fd, short what, void *arg)
/* Read something from network */
if (!fuzzy_check_session_is_completed (session)) {
/* Need to read more */
- ev_base = event_get_base (&session->ev);
- event_del (&session->ev);
- event_set (&session->ev, session->fd, EV_READ,
- fuzzy_check_io_callback, session);
- event_base_set (ev_base, &session->ev);
- event_add (&session->ev, NULL);
+ rspamd_ev_watcher_reschedule (session->event_loop,
+ &session->ev, EV_READ);
}
}
}
-/* Fuzzy check timeout callback */
+
static void
-fuzzy_check_timer_callback (gint fd, short what, void *arg)
+fuzzy_lua_fin (void *ud)
{
- struct fuzzy_client_session *session = arg;
+ struct fuzzy_learn_session *session = ud;
+
+ (*session->saved)--;
+
+ rspamd_ev_watcher_stop (session->event_loop, &session->ev);
+ close (session->fd);
+}
+
+/* Controller IO */
+
+static void
+fuzzy_controller_timer_callback (gint fd, short what, void *arg)
+{
+ struct fuzzy_learn_session *session = arg;
struct rspamd_task *task;
- struct ev_loop *ev_base;
task = session->task;
- /* We might be here because of other checks being slow */
- if (fuzzy_check_try_read (session) > 0) {
- if (fuzzy_check_session_is_completed (session)) {
- return;
- }
- }
-
if (session->retransmits >= session->rule->ctx->retransmits) {
- msg_err_task ("got IO timeout with server %s(%s), after %d retransmits",
+ rspamd_upstream_fail (session->server, TRUE);
+ msg_err_task_check ("got IO timeout with server %s(%s), "
+ "after %d retransmits",
rspamd_upstream_name (session->server),
rspamd_inet_address_to_string_pretty (
rspamd_upstream_addr_cur (session->server)),
session->retransmits);
- rspamd_upstream_fail (session->server, TRUE);
- if (session->item) {
- rspamd_symcache_item_async_dec_check (session->task, session->item, M);
+ if (session->session) {
+ rspamd_session_remove_event (session->session, fuzzy_lua_fin,
+ session);
+ }
+ else {
+ if (session->http_entry) {
+ rspamd_controller_send_error (session->http_entry,
+ 500, "IO timeout with fuzzy storage");
+ }
+
+ if (*session->saved > 0 ) {
+ (*session->saved)--;
+ if (*session->saved == 0) {
+ if (session->http_entry) {
+ rspamd_task_free (session->task);
+ }
+
+ session->task = NULL;
+ }
+ }
+
+ if (session->http_entry) {
+ rspamd_http_connection_unref (session->http_entry->conn);
+ }
+
+ rspamd_ev_watcher_stop (session->event_loop,
+ &session->ev);
+ close (session->fd);
}
- rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session);
}
else {
/* Plan write event */
- ev_base = event_get_base (&session->ev);
- event_del (&session->ev);
- event_set (&session->ev, fd, EV_WRITE|EV_READ,
- fuzzy_check_io_callback, session);
- event_base_set (ev_base, &session->ev);
- event_add (&session->ev, NULL);
-
- /* Plan new retransmit timer */
- ev_base = event_get_base (&session->timev);
- event_del (&session->timev);
- event_base_set (ev_base, &session->timev);
- event_add (&session->timev, &session->tv);
+ rspamd_ev_watcher_reschedule (session->event_loop,
+ &session->ev, EV_READ|EV_WRITE);
session->retransmits ++;
}
}
static void
-fuzzy_lua_fin (void *ud)
-{
- struct fuzzy_learn_session *session = ud;
-
- (*session->saved)--;
-
- event_del (&session->ev);
- event_del (&session->timev);
- close (session->fd);
-}
-
-/* Controller IO */
-static void
fuzzy_controller_io_callback (gint fd, short what, void *arg)
{
struct fuzzy_learn_session *session = arg;
@@ -2340,7 +2374,6 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg)
struct fuzzy_cmd_io *io;
struct rspamd_fuzzy_cmd *cmd = NULL;
const gchar *symbol, *ftype;
- struct ev_loop *ev_base;
gint r;
enum {
return_error = 0,
@@ -2355,7 +2388,8 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg)
if (what & EV_READ) {
if ((r = read (fd, buf, sizeof (buf) - 1)) == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
- event_add (&session->ev, NULL);
+ rspamd_ev_watcher_reschedule (session->event_loop,
+ &session->ev, EV_READ);
return;
}
@@ -2482,16 +2516,14 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg)
}
}
else {
- g_assert (0);
+ fuzzy_controller_timer_callback (fd, what, arg);
+
+ return;
}
if (ret == return_want_more) {
- ev_base = event_get_base (&session->ev);
- event_del (&session->ev);
- event_set (&session->ev, fd, EV_READ,
- fuzzy_controller_io_callback, session);
- event_base_set (ev_base, &session->ev);
- event_add (&session->ev, NULL);
+ rspamd_ev_watcher_reschedule (session->event_loop,
+ &session->ev, EV_READ);
return;
}
@@ -2518,8 +2550,7 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg)
rspamd_http_connection_unref (session->http_entry->conn);
}
- event_del (&session->ev);
- event_del (&session->timev);
+ rspamd_ev_watcher_stop (session->event_loop, &session->ev);
close (session->fd);
if (*session->saved == 0) {
@@ -2555,7 +2586,6 @@ cleanup:
if (session->http_entry) {
ucl_object_t *reply, *hashes;
- guint i;
gchar hexbuf[rspamd_cryptobox_HASHBYTES * 2 + 1];
reply = ucl_object_typed_new (UCL_OBJECT);
@@ -2588,72 +2618,6 @@ cleanup:
}
-static void
-fuzzy_controller_timer_callback (gint fd, short what, void *arg)
-{
- struct fuzzy_learn_session *session = arg;
- struct rspamd_task *task;
- struct ev_loop *ev_base;
-
- task = session->task;
-
- if (session->retransmits >= session->rule->ctx->retransmits) {
- rspamd_upstream_fail (session->server, TRUE);
- msg_err_task_check ("got IO timeout with server %s(%s), "
- "after %d retransmits",
- rspamd_upstream_name (session->server),
- rspamd_inet_address_to_string_pretty (
- rspamd_upstream_addr_cur (session->server)),
- session->retransmits);
-
- if (session->session) {
- rspamd_session_remove_event (session->session, fuzzy_lua_fin,
- session);
- }
- else {
- if (session->http_entry) {
- rspamd_controller_send_error (session->http_entry,
- 500, "IO timeout with fuzzy storage");
- }
-
- if (*session->saved > 0 ) {
- (*session->saved)--;
- if (*session->saved == 0) {
- if (session->http_entry) {
- rspamd_task_free (session->task);
- }
-
- session->task = NULL;
- }
- }
-
- if (session->http_entry) {
- rspamd_http_connection_unref (session->http_entry->conn);
- }
-
- event_del (&session->ev);
- event_del (&session->timev);
- close (session->fd);
- }
- }
- else {
- /* Plan write event */
- ev_base = event_get_base (&session->ev);
- event_del (&session->ev);
- event_set (&session->ev, fd, EV_WRITE|EV_READ,
- fuzzy_controller_io_callback, session);
- event_base_set (ev_base, &session->ev);
- event_add (&session->ev, NULL);
-
- /* Plan new retransmit timer */
- ev_base = event_get_base (&session->timev);
- event_del (&session->timev);
- event_base_set (ev_base, &session->timev);
- event_add (&session->timev, &session->tv);
- session->retransmits ++;
- }
-}
-
static GPtrArray *
fuzzy_generate_commands (struct rspamd_task *task, struct fuzzy_rule *rule,
gint c, gint flag, guint32 value, guint flags)
@@ -2774,7 +2738,6 @@ register_fuzzy_client_call (struct rspamd_task *task,
session =
rspamd_mempool_alloc0 (task->task_pool,
sizeof (struct fuzzy_client_session));
- msec_to_tv (rule->ctx->io_timeout, &session->tv);
session->state = 0;
session->commands = commands;
session->task = task;
@@ -2782,16 +2745,15 @@ register_fuzzy_client_call (struct rspamd_task *task,
session->server = selected;
session->rule = rule;
session->results = g_ptr_array_sized_new (32);
+ session->event_loop = task->event_loop;
- event_set (&session->ev, sock, EV_WRITE, fuzzy_check_io_callback,
- session);
- event_base_set (session->task->event_loop, &session->ev);
- event_add (&session->ev, NULL);
-
- evtimer_set (&session->timev, fuzzy_check_timer_callback,
+ rspamd_ev_watcher_init (&session->ev,
+ sock,
+ EV_WRITE,
+ fuzzy_check_io_callback,
session);
- event_base_set (session->task->event_loop, &session->timev);
- event_add (&session->timev, &session->tv);
+ rspamd_ev_watcher_start (session->event_loop, &session->ev,
+ ((double)rule->ctx->io_timeout) / 1000.0);
rspamd_session_add_event (task->s, fuzzy_io_fin, session, M);
session->item = rspamd_symcache_get_cur_item (task);
@@ -2881,7 +2843,6 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry,
struct rspamd_controller_session *session = entry->ud;
gint sock;
gint ret = -1;
- struct fuzzy_ctx *fuzzy_module_ctx = fuzzy_get_context (task->cfg);
/* Get upstream */
@@ -2899,7 +2860,6 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry,
rspamd_mempool_alloc0 (session->pool,
sizeof (struct fuzzy_learn_session));
- msec_to_tv (fuzzy_module_ctx->io_timeout, &s->tv);
s->task = task;
s->commands = commands;
s->http_entry = entry;
@@ -2908,17 +2868,17 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry,
s->fd = sock;
s->err = err;
s->rule = rule;
+ s->event_loop = task->event_loop;
/* We ref connection to avoid freeing before we process fuzzy rule */
rspamd_http_connection_ref (entry->conn);
- event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s);
- event_base_set (entry->rt->event_loop, &s->ev);
- event_add (&s->ev, NULL);
-
- evtimer_set (&s->timev, fuzzy_controller_timer_callback,
+ rspamd_ev_watcher_init (&s->ev,
+ sock,
+ EV_WRITE,
+ fuzzy_controller_io_callback,
s);
- event_base_set (s->task->event_loop, &s->timev);
- event_add (&s->timev, &s->tv);
+ rspamd_ev_watcher_start (s->event_loop, &s->ev,
+ ((double)rule->ctx->io_timeout) / 1000.0);
(*saved)++;
ret = 1;
@@ -3258,8 +3218,6 @@ fuzzy_check_send_lua_learn (struct fuzzy_rule *rule,
s =
rspamd_mempool_alloc0 (task->task_pool,
sizeof (struct fuzzy_learn_session));
-
- msec_to_tv (rule->ctx->io_timeout, &s->tv);
s->task = task;
s->commands = commands;
s->http_entry = NULL;
@@ -3269,14 +3227,15 @@ fuzzy_check_send_lua_learn (struct fuzzy_rule *rule,
s->err = err;
s->rule = rule;
s->session = task->s;
+ s->event_loop = task->event_loop;
- event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s);
- event_base_set (task->event_loop, &s->ev);
- event_add (&s->ev, NULL);
-
- evtimer_set (&s->timev, fuzzy_controller_timer_callback, s);
- event_base_set (s->task->event_loop, &s->timev);
- event_add (&s->timev, &s->tv);
+ rspamd_ev_watcher_init (&s->ev,
+ sock,
+ EV_WRITE,
+ fuzzy_controller_io_callback,
+ s);
+ rspamd_ev_watcher_start (s->event_loop, &s->ev,
+ ((double)rule->ctx->io_timeout) / 1000.0);
rspamd_session_add_event (task->s,
fuzzy_lua_fin,
@@ -3367,7 +3326,7 @@ static gint
fuzzy_lua_learn_handler (lua_State *L)
{
struct rspamd_task *task = lua_check_task (L, 1);
- guint flag = 0, weight = 1.0, send_flags = 0;
+ guint flag = 0, weight = 1, send_flags = 0;
const gchar *symbol;
struct fuzzy_ctx *fuzzy_module_ctx = fuzzy_get_context (task->cfg);