From 45b60f8df79e6d5ecdb85725aa4841151bdcd853 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Wed, 19 Jun 2019 13:16:25 +0100 Subject: [Project] More libserver adoptions --- src/libserver/rspamd_control.c | 154 ++++++++++----------- src/plugins/fuzzy_check.c | 299 ++++++++++++++++++----------------------- src/plugins/surbl.c | 5 +- 3 files changed, 204 insertions(+), 254 deletions(-) (limited to 'src') diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c index 9788d47ed..62ca24643 100644 --- a/src/libserver/rspamd_control.c +++ b/src/libserver/rspamd_control.c @@ -19,6 +19,7 @@ #include "worker_util.h" #include "libutil/http_connection.h" #include "libutil/http_private.h" +#include "libutil/libev_helper.h" #include "unix-std.h" #include "utlist.h" @@ -26,20 +27,14 @@ #include #endif -static struct timeval io_timeout = { - .tv_sec = 30, - .tv_usec = 0 -}; -static struct timeval worker_io_timeout = { - .tv_sec = 0, - .tv_usec = 500000 -}; +static ev_tstamp io_timeout = 30.0; +static ev_tstamp worker_io_timeout = 0.5; struct rspamd_control_session; struct rspamd_control_reply_elt { struct rspamd_control_reply reply; - struct event io_ev; + struct rspamd_io_ev ev; struct rspamd_worker *wrk; gpointer ud; gint attached_fd; @@ -48,6 +43,7 @@ struct rspamd_control_reply_elt { struct rspamd_control_session { gint fd; + struct ev_loop *event_loop; struct rspamd_main *rspamd_main; struct rspamd_http_connection *conn; struct rspamd_control_command cmd; @@ -131,7 +127,7 @@ rspamd_control_send_error (struct rspamd_control_session *session, NULL, "application/json", session, - &io_timeout); + io_timeout); } static void @@ -154,7 +150,7 @@ rspamd_control_send_ucl (struct rspamd_control_session *session, NULL, "application/json", session, - &io_timeout); + io_timeout); } static void @@ -168,7 +164,8 @@ rspamd_control_connection_close (struct rspamd_control_session *session) rspamd_inet_address_to_string (session->addr)); DL_FOREACH_SAFE (session->replies, elt, telt) { - event_del (&elt->io_ev); + rspamd_ev_watcher_stop (session->event_loop, + &elt->ev); g_free (elt); } @@ -358,7 +355,8 @@ rspamd_control_wrk_io (gint fd, short what, gpointer ud) } session->replies_remain --; - event_del (&elt->io_ev); + rspamd_ev_watcher_stop (session->event_loop, + &elt->ev); if (session->replies_remain == 0) { rspamd_control_write_reply (session); @@ -434,12 +432,12 @@ rspamd_control_broadcast_cmd (struct rspamd_main *rspamd_main, rep_elt = g_malloc0 (sizeof (*rep_elt)); rep_elt->wrk = wrk; rep_elt->ud = ud; - event_set (&rep_elt->io_ev, wrk->control_pipe[0], - EV_READ | EV_PERSIST, handler, + rspamd_ev_watcher_init (&rep_elt->ev, + wrk->control_pipe[0], + EV_READ, handler, rep_elt); - event_base_set (rspamd_main->event_loop, - &rep_elt->io_ev); - event_add (&rep_elt->io_ev, &worker_io_timeout); + rspamd_ev_watcher_start (rspamd_main->event_loop, + &rep_elt->ev, worker_io_timeout); DL_APPEND (res, rep_elt); } @@ -527,11 +525,11 @@ rspamd_control_process_client_socket (struct rspamd_main *rspamd_main, session->rspamd_main = rspamd_main; session->addr = addr; rspamd_http_connection_read_message (session->conn, session, - &io_timeout); + io_timeout); } struct rspamd_worker_control_data { - struct event io_ev; + ev_io io_ev; struct rspamd_worker *worker; struct ev_loop *ev_base; struct { @@ -613,9 +611,10 @@ rspamd_control_default_cmd_handler (gint fd, } static void -rspamd_control_default_worker_handler (gint fd, short what, gpointer ud) +rspamd_control_default_worker_handler (EV_P_ ev_io *w, int revents) { - struct rspamd_worker_control_data *cd = ud; + struct rspamd_worker_control_data *cd = + (struct rspamd_worker_control_data *)w->data; static struct rspamd_control_command cmd; static struct msghdr msg; static struct iovec iov; @@ -631,15 +630,15 @@ rspamd_control_default_worker_handler (gint fd, short what, gpointer ud) msg.msg_iov = &iov; msg.msg_iovlen = 1; - r = recvmsg (fd, &msg, 0); + r = recvmsg (w->fd, &msg, 0); if (r == -1) { msg_err ("cannot read request from the control socket: %s", strerror (errno)); if (errno != EAGAIN && errno != EINTR) { - event_del (&cd->io_ev); - close (fd); + ev_io_stop (cd->ev_base, &cd->io_ev); + close (w->fd); } } else if (r < (gint)sizeof (cmd)) { @@ -647,8 +646,8 @@ rspamd_control_default_worker_handler (gint fd, short what, gpointer ud) (gint)sizeof (cmd)); if (r == 0) { - event_del (&cd->io_ev); - close (fd); + ev_io_stop (cd->ev_base, &cd->io_ev); + close (w->fd); } } else if ((gint)cmd.type >= 0 && cmd.type < RSPAMD_CONTROL_MAX) { @@ -660,13 +659,13 @@ rspamd_control_default_worker_handler (gint fd, short what, gpointer ud) if (cd->handlers[cmd.type].handler) { cd->handlers[cmd.type].handler (cd->worker->srv, cd->worker, - fd, + w->fd, rfd, &cmd, cd->handlers[cmd.type].ud); } else { - rspamd_control_default_cmd_handler (fd, rfd, cd, &cmd); + rspamd_control_default_cmd_handler (w->fd, rfd, cd, &cmd); } } else { @@ -684,10 +683,10 @@ rspamd_control_worker_add_default_handler (struct rspamd_worker *worker, cd->worker = worker; cd->ev_base = ev_base; - event_set (&cd->io_ev, worker->control_pipe[1], EV_READ | EV_PERSIST, - rspamd_control_default_worker_handler, cd); - event_base_set (ev_base, &cd->io_ev); - event_add (&cd->io_ev, NULL); + cd->io_ev.data = cd; + ev_io_init (&cd->io_ev, rspamd_control_default_worker_handler, + worker->control_pipe[1], EV_READ); + ev_io_start (ev_base, &cd->io_ev); worker->control_data = cd; } @@ -720,26 +719,28 @@ struct rspamd_srv_reply_data { }; static void -rspamd_control_hs_io_handler (gint fd, short what, gpointer ud) +rspamd_control_hs_io_handler (int fd, short what, void *ud) { - struct rspamd_control_reply_elt *elt = ud; + struct rspamd_control_reply_elt *elt = + (struct rspamd_control_reply_elt *)ud; struct rspamd_control_reply rep; /* At this point we just ignore replies from the workers */ (void)read (fd, &rep, sizeof (rep)); - event_del (&elt->io_ev); + rspamd_ev_watcher_stop (ev_default_loop (0), &elt->ev); g_free (elt); } static void -rspamd_control_log_pipe_io_handler (gint fd, short what, gpointer ud) +rspamd_control_log_pipe_io_handler (int fd, short what, void *ud) { - struct rspamd_control_reply_elt *elt = ud; + struct rspamd_control_reply_elt *elt = + (struct rspamd_control_reply_elt *)ud; struct rspamd_control_reply rep; /* At this point we just ignore replies from the workers */ (void) read (fd, &rep, sizeof (rep)); - event_del (&elt->io_ev); + rspamd_ev_watcher_stop (ev_default_loop (0), &elt->ev); g_free (elt); } @@ -794,7 +795,7 @@ rspamd_control_handle_on_fork (struct rspamd_srv_command *cmd, } static void -rspamd_srv_handler (gint fd, short what, gpointer ud) +rspamd_srv_handler (EV_P_ ev_io *w, int revents) { struct rspamd_worker *worker; static struct rspamd_srv_command cmd; @@ -809,8 +810,8 @@ rspamd_srv_handler (gint fd, short what, gpointer ud) struct rspamd_control_command wcmd; gssize r; - if (what == EV_READ) { - worker = ud; + if (revents == EV_READ) { + worker = (struct rspamd_worker *)w->data; srv = worker->srv; iov.iov_base = &cmd; iov.iov_len = sizeof (cmd); @@ -820,7 +821,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud) msg.msg_iov = &iov; msg.msg_iovlen = 1; - r = recvmsg (fd, &msg, 0); + r = recvmsg (w->fd, &msg, 0); if (r == -1) { msg_err ("cannot read from worker's srv pipe: %s", @@ -831,7 +832,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud) * Usually this means that a worker is dead, so do not try to read * anything */ - event_del (&worker->srv_ev); + ev_io_stop (EV_A_ w); } else if (r != sizeof (cmd)) { msg_err ("cannot read from worker's srv pipe incomplete command: %d", @@ -919,17 +920,14 @@ rspamd_srv_handler (gint fd, short what, gpointer ud) } /* Now plan write event and send data back */ - event_del (&worker->srv_ev); - event_set (&worker->srv_ev, - worker->srv_pipe[0], - EV_WRITE, - rspamd_srv_handler, - rdata); - event_add (&worker->srv_ev, NULL); + w->data = rdata; + ev_io_stop (EV_A_ w); + ev_io_set (w, worker->srv_pipe[0], EV_WRITE); + ev_io_start (EV_A_ w); } } - else if (what == EV_WRITE) { - rdata = ud; + else if (revents == EV_WRITE) { + rdata = (struct rspamd_srv_reply_data *)w->data; worker = rdata->worker; worker->tmp_data = NULL; /* Avoid race */ srv = rdata->srv; @@ -953,7 +951,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud) msg.msg_iov = &iov; msg.msg_iovlen = 1; - r = sendmsg (fd, &msg, 0); + r = sendmsg (w->fd, &msg, 0); if (r == -1) { msg_err ("cannot write to worker's srv pipe: %s", @@ -961,13 +959,10 @@ rspamd_srv_handler (gint fd, short what, gpointer ud) } g_free (rdata); - event_del (&worker->srv_ev); - event_set (&worker->srv_ev, - worker->srv_pipe[0], - EV_READ | EV_PERSIST, - rspamd_srv_handler, - worker); - event_add (&worker->srv_ev, NULL); + w->data = worker; + ev_io_stop (EV_A_ w); + ev_io_set (w, worker->srv_pipe[0], EV_READ); + ev_io_start (EV_A_ w); } } @@ -979,10 +974,9 @@ rspamd_srv_start_watching (struct rspamd_main *srv, g_assert (worker != NULL); worker->tmp_data = NULL; - event_set (&worker->srv_ev, worker->srv_pipe[0], EV_READ | EV_PERSIST, - rspamd_srv_handler, worker); - event_base_set (ev_base, &worker->srv_ev); - event_add (&worker->srv_ev, NULL); + worker->srv_ev.data = worker; + ev_io_init (&worker->srv_ev, rspamd_srv_handler, worker->srv_pipe[0], EV_READ); + ev_io_start (ev_base, &worker->srv_ev); } struct rspamd_srv_request_data { @@ -991,14 +985,14 @@ struct rspamd_srv_request_data { gint attached_fd; struct rspamd_srv_reply rep; rspamd_srv_reply_handler handler; - struct event io_ev; + ev_io io_ev; gpointer ud; }; static void -rspamd_srv_request_handler (gint fd, short what, gpointer ud) +rspamd_srv_request_handler (EV_P_ ev_io *w, int revents) { - struct rspamd_srv_request_data *rd = ud; + struct rspamd_srv_request_data *rd = (struct rspamd_srv_request_data *)w->data; struct msghdr msg; struct iovec iov; guchar fdspace[CMSG_SPACE(sizeof (int))]; @@ -1006,7 +1000,7 @@ rspamd_srv_request_handler (gint fd, short what, gpointer ud) gssize r; gint rfd = -1; - if (what == EV_WRITE) { + if (revents == EV_WRITE) { /* Send request to server */ memset (&msg, 0, sizeof (msg)); @@ -1027,17 +1021,16 @@ rspamd_srv_request_handler (gint fd, short what, gpointer ud) msg.msg_iov = &iov; msg.msg_iovlen = 1; - r = sendmsg (fd, &msg, 0); + r = sendmsg (w->fd, &msg, 0); if (r == -1) { msg_err ("cannot write to server pipe: %s", strerror (errno)); goto cleanup; } - event_del (&rd->io_ev); - event_set (&rd->io_ev, rd->worker->srv_pipe[1], EV_READ, - rspamd_srv_request_handler, rd); - event_add (&rd->io_ev, NULL); + ev_io_stop (EV_A_ w); + ev_io_set (w, rd->worker->srv_pipe[1], EV_READ); + ev_io_start (EV_A_ w); } else { iov.iov_base = &rd->rep; @@ -1048,7 +1041,7 @@ rspamd_srv_request_handler (gint fd, short what, gpointer ud) msg.msg_iov = &iov; msg.msg_iovlen = 1; - r = recvmsg (fd, &msg, 0); + r = recvmsg (w->fd, &msg, 0); if (r == -1) { msg_err ("cannot read from server pipe: %s", strerror (errno)); @@ -1075,7 +1068,8 @@ cleanup: if (rd->handler) { rd->handler (rd->worker, &rd->rep, rfd, rd->ud); } - event_del (&rd->io_ev); + + ev_io_stop (EV_A_ w); g_free (rd); } @@ -1102,8 +1096,8 @@ rspamd_srv_send_command (struct rspamd_worker *worker, rd->rep.type = cmd->type; rd->attached_fd = attached_fd; - event_set (&rd->io_ev, worker->srv_pipe[1], EV_WRITE, - rspamd_srv_request_handler, rd); - event_base_set (ev_base, &rd->io_ev); - event_add (&rd->io_ev, NULL); + rd->io_ev.data = rd; + ev_io_init (&rd->io_ev, rspamd_srv_request_handler, + rd->worker->srv_pipe[1], EV_WRITE); + ev_io_start (ev_base, &rd->io_ev); } diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index 2c91869d6..75df2a645 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -47,6 +47,7 @@ #include "libstat/stat_api.h" #include #include +#include "libutil/libev_helper.h" #define DEFAULT_SYMBOL "R_FUZZY_HASH" @@ -129,9 +130,8 @@ struct fuzzy_client_session { struct rspamd_symcache_item *item; struct upstream *server; struct fuzzy_rule *rule; - struct event ev; - struct event timev; - struct timeval tv; + struct ev_loop *event_loop; + struct rspamd_io_ev ev; gint state; gint fd; guint retransmits; @@ -146,9 +146,8 @@ struct fuzzy_learn_session { struct upstream *server; struct fuzzy_rule *rule; struct rspamd_task *task; - struct event ev; - struct event timev; - struct timeval tv; + struct ev_loop *event_loop; + struct rspamd_io_ev ev; gint fd; guint retransmits; }; @@ -1185,8 +1184,7 @@ fuzzy_io_fin (void *ud) g_ptr_array_free (session->results, TRUE); } - event_del (&session->ev); - event_del (&session->timev); + rspamd_ev_watcher_stop (session->event_loop, &session->ev); close (session->fd); } @@ -2181,13 +2179,49 @@ fuzzy_check_session_is_completed (struct fuzzy_client_session *session) return FALSE; } +/* Fuzzy check timeout callback */ +static void +fuzzy_check_timer_callback (gint fd, short what, void *arg) +{ + struct fuzzy_client_session *session = arg; + struct rspamd_task *task; + + task = session->task; + + /* We might be here because of other checks being slow */ + if (fuzzy_check_try_read (session) > 0) { + if (fuzzy_check_session_is_completed (session)) { + return; + } + } + + if (session->retransmits >= session->rule->ctx->retransmits) { + msg_err_task ("got IO timeout with server %s(%s), after %d retransmits", + rspamd_upstream_name (session->server), + rspamd_inet_address_to_string_pretty ( + rspamd_upstream_addr_cur (session->server)), + session->retransmits); + rspamd_upstream_fail (session->server, TRUE); + + if (session->item) { + rspamd_symcache_item_async_dec_check (session->task, session->item, M); + } + rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session); + } + else { + /* Plan write event */ + rspamd_ev_watcher_reschedule (session->event_loop, + &session->ev, EV_READ|EV_WRITE); + session->retransmits ++; + } +} + /* Fuzzy check callback */ static void fuzzy_check_io_callback (gint fd, short what, void *arg) { struct fuzzy_client_session *session = arg; struct rspamd_task *task; - struct ev_loop *ev_base; gint r; enum { @@ -2224,18 +2258,14 @@ fuzzy_check_io_callback (gint fd, short what, void *arg) } } else { - /* Should not happen */ - g_assert (0); + fuzzy_check_timer_callback (fd, what, arg); + return; } if (ret == return_want_more) { /* Processed write, switch to reading */ - ev_base = event_get_base (&session->ev); - event_del (&session->ev); - event_set (&session->ev, fd, EV_READ, - fuzzy_check_io_callback, session); - event_base_set (ev_base, &session->ev); - event_add (&session->ev, NULL); + rspamd_ev_watcher_reschedule (session->event_loop, + &session->ev, EV_READ); } else if (ret == return_error) { /* Error state */ @@ -2258,77 +2288,81 @@ fuzzy_check_io_callback (gint fd, short what, void *arg) /* Read something from network */ if (!fuzzy_check_session_is_completed (session)) { /* Need to read more */ - ev_base = event_get_base (&session->ev); - event_del (&session->ev); - event_set (&session->ev, session->fd, EV_READ, - fuzzy_check_io_callback, session); - event_base_set (ev_base, &session->ev); - event_add (&session->ev, NULL); + rspamd_ev_watcher_reschedule (session->event_loop, + &session->ev, EV_READ); } } } -/* Fuzzy check timeout callback */ + static void -fuzzy_check_timer_callback (gint fd, short what, void *arg) +fuzzy_lua_fin (void *ud) { - struct fuzzy_client_session *session = arg; + struct fuzzy_learn_session *session = ud; + + (*session->saved)--; + + rspamd_ev_watcher_stop (session->event_loop, &session->ev); + close (session->fd); +} + +/* Controller IO */ + +static void +fuzzy_controller_timer_callback (gint fd, short what, void *arg) +{ + struct fuzzy_learn_session *session = arg; struct rspamd_task *task; - struct ev_loop *ev_base; task = session->task; - /* We might be here because of other checks being slow */ - if (fuzzy_check_try_read (session) > 0) { - if (fuzzy_check_session_is_completed (session)) { - return; - } - } - if (session->retransmits >= session->rule->ctx->retransmits) { - msg_err_task ("got IO timeout with server %s(%s), after %d retransmits", + rspamd_upstream_fail (session->server, TRUE); + msg_err_task_check ("got IO timeout with server %s(%s), " + "after %d retransmits", rspamd_upstream_name (session->server), rspamd_inet_address_to_string_pretty ( rspamd_upstream_addr_cur (session->server)), session->retransmits); - rspamd_upstream_fail (session->server, TRUE); - if (session->item) { - rspamd_symcache_item_async_dec_check (session->task, session->item, M); + if (session->session) { + rspamd_session_remove_event (session->session, fuzzy_lua_fin, + session); + } + else { + if (session->http_entry) { + rspamd_controller_send_error (session->http_entry, + 500, "IO timeout with fuzzy storage"); + } + + if (*session->saved > 0 ) { + (*session->saved)--; + if (*session->saved == 0) { + if (session->http_entry) { + rspamd_task_free (session->task); + } + + session->task = NULL; + } + } + + if (session->http_entry) { + rspamd_http_connection_unref (session->http_entry->conn); + } + + rspamd_ev_watcher_stop (session->event_loop, + &session->ev); + close (session->fd); } - rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session); } else { /* Plan write event */ - ev_base = event_get_base (&session->ev); - event_del (&session->ev); - event_set (&session->ev, fd, EV_WRITE|EV_READ, - fuzzy_check_io_callback, session); - event_base_set (ev_base, &session->ev); - event_add (&session->ev, NULL); - - /* Plan new retransmit timer */ - ev_base = event_get_base (&session->timev); - event_del (&session->timev); - event_base_set (ev_base, &session->timev); - event_add (&session->timev, &session->tv); + rspamd_ev_watcher_reschedule (session->event_loop, + &session->ev, EV_READ|EV_WRITE); session->retransmits ++; } } -static void -fuzzy_lua_fin (void *ud) -{ - struct fuzzy_learn_session *session = ud; - - (*session->saved)--; - - event_del (&session->ev); - event_del (&session->timev); - close (session->fd); -} - -/* Controller IO */ static void fuzzy_controller_io_callback (gint fd, short what, void *arg) { @@ -2340,7 +2374,6 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg) struct fuzzy_cmd_io *io; struct rspamd_fuzzy_cmd *cmd = NULL; const gchar *symbol, *ftype; - struct ev_loop *ev_base; gint r; enum { return_error = 0, @@ -2355,7 +2388,8 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg) if (what & EV_READ) { if ((r = read (fd, buf, sizeof (buf) - 1)) == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { - event_add (&session->ev, NULL); + rspamd_ev_watcher_reschedule (session->event_loop, + &session->ev, EV_READ); return; } @@ -2482,16 +2516,14 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg) } } else { - g_assert (0); + fuzzy_controller_timer_callback (fd, what, arg); + + return; } if (ret == return_want_more) { - ev_base = event_get_base (&session->ev); - event_del (&session->ev); - event_set (&session->ev, fd, EV_READ, - fuzzy_controller_io_callback, session); - event_base_set (ev_base, &session->ev); - event_add (&session->ev, NULL); + rspamd_ev_watcher_reschedule (session->event_loop, + &session->ev, EV_READ); return; } @@ -2518,8 +2550,7 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg) rspamd_http_connection_unref (session->http_entry->conn); } - event_del (&session->ev); - event_del (&session->timev); + rspamd_ev_watcher_stop (session->event_loop, &session->ev); close (session->fd); if (*session->saved == 0) { @@ -2555,7 +2586,6 @@ cleanup: if (session->http_entry) { ucl_object_t *reply, *hashes; - guint i; gchar hexbuf[rspamd_cryptobox_HASHBYTES * 2 + 1]; reply = ucl_object_typed_new (UCL_OBJECT); @@ -2588,72 +2618,6 @@ cleanup: } -static void -fuzzy_controller_timer_callback (gint fd, short what, void *arg) -{ - struct fuzzy_learn_session *session = arg; - struct rspamd_task *task; - struct ev_loop *ev_base; - - task = session->task; - - if (session->retransmits >= session->rule->ctx->retransmits) { - rspamd_upstream_fail (session->server, TRUE); - msg_err_task_check ("got IO timeout with server %s(%s), " - "after %d retransmits", - rspamd_upstream_name (session->server), - rspamd_inet_address_to_string_pretty ( - rspamd_upstream_addr_cur (session->server)), - session->retransmits); - - if (session->session) { - rspamd_session_remove_event (session->session, fuzzy_lua_fin, - session); - } - else { - if (session->http_entry) { - rspamd_controller_send_error (session->http_entry, - 500, "IO timeout with fuzzy storage"); - } - - if (*session->saved > 0 ) { - (*session->saved)--; - if (*session->saved == 0) { - if (session->http_entry) { - rspamd_task_free (session->task); - } - - session->task = NULL; - } - } - - if (session->http_entry) { - rspamd_http_connection_unref (session->http_entry->conn); - } - - event_del (&session->ev); - event_del (&session->timev); - close (session->fd); - } - } - else { - /* Plan write event */ - ev_base = event_get_base (&session->ev); - event_del (&session->ev); - event_set (&session->ev, fd, EV_WRITE|EV_READ, - fuzzy_controller_io_callback, session); - event_base_set (ev_base, &session->ev); - event_add (&session->ev, NULL); - - /* Plan new retransmit timer */ - ev_base = event_get_base (&session->timev); - event_del (&session->timev); - event_base_set (ev_base, &session->timev); - event_add (&session->timev, &session->tv); - session->retransmits ++; - } -} - static GPtrArray * fuzzy_generate_commands (struct rspamd_task *task, struct fuzzy_rule *rule, gint c, gint flag, guint32 value, guint flags) @@ -2774,7 +2738,6 @@ register_fuzzy_client_call (struct rspamd_task *task, session = rspamd_mempool_alloc0 (task->task_pool, sizeof (struct fuzzy_client_session)); - msec_to_tv (rule->ctx->io_timeout, &session->tv); session->state = 0; session->commands = commands; session->task = task; @@ -2782,16 +2745,15 @@ register_fuzzy_client_call (struct rspamd_task *task, session->server = selected; session->rule = rule; session->results = g_ptr_array_sized_new (32); + session->event_loop = task->event_loop; - event_set (&session->ev, sock, EV_WRITE, fuzzy_check_io_callback, - session); - event_base_set (session->task->event_loop, &session->ev); - event_add (&session->ev, NULL); - - evtimer_set (&session->timev, fuzzy_check_timer_callback, + rspamd_ev_watcher_init (&session->ev, + sock, + EV_WRITE, + fuzzy_check_io_callback, session); - event_base_set (session->task->event_loop, &session->timev); - event_add (&session->timev, &session->tv); + rspamd_ev_watcher_start (session->event_loop, &session->ev, + ((double)rule->ctx->io_timeout) / 1000.0); rspamd_session_add_event (task->s, fuzzy_io_fin, session, M); session->item = rspamd_symcache_get_cur_item (task); @@ -2881,7 +2843,6 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry, struct rspamd_controller_session *session = entry->ud; gint sock; gint ret = -1; - struct fuzzy_ctx *fuzzy_module_ctx = fuzzy_get_context (task->cfg); /* Get upstream */ @@ -2899,7 +2860,6 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry, rspamd_mempool_alloc0 (session->pool, sizeof (struct fuzzy_learn_session)); - msec_to_tv (fuzzy_module_ctx->io_timeout, &s->tv); s->task = task; s->commands = commands; s->http_entry = entry; @@ -2908,17 +2868,17 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry, s->fd = sock; s->err = err; s->rule = rule; + s->event_loop = task->event_loop; /* We ref connection to avoid freeing before we process fuzzy rule */ rspamd_http_connection_ref (entry->conn); - event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s); - event_base_set (entry->rt->event_loop, &s->ev); - event_add (&s->ev, NULL); - - evtimer_set (&s->timev, fuzzy_controller_timer_callback, + rspamd_ev_watcher_init (&s->ev, + sock, + EV_WRITE, + fuzzy_controller_io_callback, s); - event_base_set (s->task->event_loop, &s->timev); - event_add (&s->timev, &s->tv); + rspamd_ev_watcher_start (s->event_loop, &s->ev, + ((double)rule->ctx->io_timeout) / 1000.0); (*saved)++; ret = 1; @@ -3258,8 +3218,6 @@ fuzzy_check_send_lua_learn (struct fuzzy_rule *rule, s = rspamd_mempool_alloc0 (task->task_pool, sizeof (struct fuzzy_learn_session)); - - msec_to_tv (rule->ctx->io_timeout, &s->tv); s->task = task; s->commands = commands; s->http_entry = NULL; @@ -3269,14 +3227,15 @@ fuzzy_check_send_lua_learn (struct fuzzy_rule *rule, s->err = err; s->rule = rule; s->session = task->s; + s->event_loop = task->event_loop; - event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s); - event_base_set (task->event_loop, &s->ev); - event_add (&s->ev, NULL); - - evtimer_set (&s->timev, fuzzy_controller_timer_callback, s); - event_base_set (s->task->event_loop, &s->timev); - event_add (&s->timev, &s->tv); + rspamd_ev_watcher_init (&s->ev, + sock, + EV_WRITE, + fuzzy_controller_io_callback, + s); + rspamd_ev_watcher_start (s->event_loop, &s->ev, + ((double)rule->ctx->io_timeout) / 1000.0); rspamd_session_add_event (task->s, fuzzy_lua_fin, @@ -3367,7 +3326,7 @@ static gint fuzzy_lua_learn_handler (lua_State *L) { struct rspamd_task *task = lua_check_task (L, 1); - guint flag = 0, weight = 1.0, send_flags = 0; + guint flag = 0, weight = 1, send_flags = 0; const gchar *symbol; struct fuzzy_ctx *fuzzy_module_ctx = fuzzy_get_context (task->cfg); diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c index 338bdaa24..3c467615c 100644 --- a/src/plugins/surbl.c +++ b/src/plugins/surbl.c @@ -1815,7 +1815,6 @@ register_redirector_call (struct rspamd_url *url, struct rspamd_task *task, const gchar *rule) { struct redirector_param *param; - struct timeval *timeout; struct upstream *selected; struct rspamd_http_message *msg; struct surbl_ctx *surbl_module_ctx = surbl_get_context (task->cfg); @@ -1851,8 +1850,6 @@ register_redirector_call (struct rspamd_url *url, struct rspamd_task *task, msg = rspamd_http_new_message (HTTP_REQUEST); msg->url = rspamd_fstring_assign (msg->url, url->string, url->urllen); param->redirector = selected; - timeout = rspamd_mempool_alloc (task->task_pool, sizeof (struct timeval)); - double_to_tv (surbl_module_ctx->read_timeout, timeout); rspamd_session_add_event (task->s, free_redirector_session, param, @@ -1864,7 +1861,7 @@ register_redirector_call (struct rspamd_url *url, struct rspamd_task *task, } rspamd_http_connection_write_message (param->conn, msg, NULL, - NULL, param, timeout); + NULL, param, surbl_module_ctx->read_timeout); msg_info_surbl ( "<%s> registered redirector call for %*s to %s, according to rule: %s", -- cgit v1.2.3