aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2019-06-19 13:16:25 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2019-06-22 10:57:29 +0100
commit45b60f8df79e6d5ecdb85725aa4841151bdcd853 (patch)
treee7ee4d6739eb37edf60a7e2105e7b3f31b7f600c /src
parent0334b8e433a45513c0087dda20f22a26b2e16ad1 (diff)
downloadrspamd-45b60f8df79e6d5ecdb85725aa4841151bdcd853.tar.gz
rspamd-45b60f8df79e6d5ecdb85725aa4841151bdcd853.zip
[Project] More libserver adoptions
Diffstat (limited to 'src')
-rw-r--r--src/libserver/rspamd_control.c154
-rw-r--r--src/plugins/fuzzy_check.c299
-rw-r--r--src/plugins/surbl.c5
3 files changed, 204 insertions, 254 deletions
diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c
index 9788d47ed..62ca24643 100644
--- a/src/libserver/rspamd_control.c
+++ b/src/libserver/rspamd_control.c
@@ -19,6 +19,7 @@
#include "worker_util.h"
#include "libutil/http_connection.h"
#include "libutil/http_private.h"
+#include "libutil/libev_helper.h"
#include "unix-std.h"
#include "utlist.h"
@@ -26,20 +27,14 @@
#include <sys/resource.h>
#endif
-static struct timeval io_timeout = {
- .tv_sec = 30,
- .tv_usec = 0
-};
-static struct timeval worker_io_timeout = {
- .tv_sec = 0,
- .tv_usec = 500000
-};
+static ev_tstamp io_timeout = 30.0;
+static ev_tstamp worker_io_timeout = 0.5;
struct rspamd_control_session;
struct rspamd_control_reply_elt {
struct rspamd_control_reply reply;
- struct event io_ev;
+ struct rspamd_io_ev ev;
struct rspamd_worker *wrk;
gpointer ud;
gint attached_fd;
@@ -48,6 +43,7 @@ struct rspamd_control_reply_elt {
struct rspamd_control_session {
gint fd;
+ struct ev_loop *event_loop;
struct rspamd_main *rspamd_main;
struct rspamd_http_connection *conn;
struct rspamd_control_command cmd;
@@ -131,7 +127,7 @@ rspamd_control_send_error (struct rspamd_control_session *session,
NULL,
"application/json",
session,
- &io_timeout);
+ io_timeout);
}
static void
@@ -154,7 +150,7 @@ rspamd_control_send_ucl (struct rspamd_control_session *session,
NULL,
"application/json",
session,
- &io_timeout);
+ io_timeout);
}
static void
@@ -168,7 +164,8 @@ rspamd_control_connection_close (struct rspamd_control_session *session)
rspamd_inet_address_to_string (session->addr));
DL_FOREACH_SAFE (session->replies, elt, telt) {
- event_del (&elt->io_ev);
+ rspamd_ev_watcher_stop (session->event_loop,
+ &elt->ev);
g_free (elt);
}
@@ -358,7 +355,8 @@ rspamd_control_wrk_io (gint fd, short what, gpointer ud)
}
session->replies_remain --;
- event_del (&elt->io_ev);
+ rspamd_ev_watcher_stop (session->event_loop,
+ &elt->ev);
if (session->replies_remain == 0) {
rspamd_control_write_reply (session);
@@ -434,12 +432,12 @@ rspamd_control_broadcast_cmd (struct rspamd_main *rspamd_main,
rep_elt = g_malloc0 (sizeof (*rep_elt));
rep_elt->wrk = wrk;
rep_elt->ud = ud;
- event_set (&rep_elt->io_ev, wrk->control_pipe[0],
- EV_READ | EV_PERSIST, handler,
+ rspamd_ev_watcher_init (&rep_elt->ev,
+ wrk->control_pipe[0],
+ EV_READ, handler,
rep_elt);
- event_base_set (rspamd_main->event_loop,
- &rep_elt->io_ev);
- event_add (&rep_elt->io_ev, &worker_io_timeout);
+ rspamd_ev_watcher_start (rspamd_main->event_loop,
+ &rep_elt->ev, worker_io_timeout);
DL_APPEND (res, rep_elt);
}
@@ -527,11 +525,11 @@ rspamd_control_process_client_socket (struct rspamd_main *rspamd_main,
session->rspamd_main = rspamd_main;
session->addr = addr;
rspamd_http_connection_read_message (session->conn, session,
- &io_timeout);
+ io_timeout);
}
struct rspamd_worker_control_data {
- struct event io_ev;
+ ev_io io_ev;
struct rspamd_worker *worker;
struct ev_loop *ev_base;
struct {
@@ -613,9 +611,10 @@ rspamd_control_default_cmd_handler (gint fd,
}
static void
-rspamd_control_default_worker_handler (gint fd, short what, gpointer ud)
+rspamd_control_default_worker_handler (EV_P_ ev_io *w, int revents)
{
- struct rspamd_worker_control_data *cd = ud;
+ struct rspamd_worker_control_data *cd =
+ (struct rspamd_worker_control_data *)w->data;
static struct rspamd_control_command cmd;
static struct msghdr msg;
static struct iovec iov;
@@ -631,15 +630,15 @@ rspamd_control_default_worker_handler (gint fd, short what, gpointer ud)
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
- r = recvmsg (fd, &msg, 0);
+ r = recvmsg (w->fd, &msg, 0);
if (r == -1) {
msg_err ("cannot read request from the control socket: %s",
strerror (errno));
if (errno != EAGAIN && errno != EINTR) {
- event_del (&cd->io_ev);
- close (fd);
+ ev_io_stop (cd->ev_base, &cd->io_ev);
+ close (w->fd);
}
}
else if (r < (gint)sizeof (cmd)) {
@@ -647,8 +646,8 @@ rspamd_control_default_worker_handler (gint fd, short what, gpointer ud)
(gint)sizeof (cmd));
if (r == 0) {
- event_del (&cd->io_ev);
- close (fd);
+ ev_io_stop (cd->ev_base, &cd->io_ev);
+ close (w->fd);
}
}
else if ((gint)cmd.type >= 0 && cmd.type < RSPAMD_CONTROL_MAX) {
@@ -660,13 +659,13 @@ rspamd_control_default_worker_handler (gint fd, short what, gpointer ud)
if (cd->handlers[cmd.type].handler) {
cd->handlers[cmd.type].handler (cd->worker->srv,
cd->worker,
- fd,
+ w->fd,
rfd,
&cmd,
cd->handlers[cmd.type].ud);
}
else {
- rspamd_control_default_cmd_handler (fd, rfd, cd, &cmd);
+ rspamd_control_default_cmd_handler (w->fd, rfd, cd, &cmd);
}
}
else {
@@ -684,10 +683,10 @@ rspamd_control_worker_add_default_handler (struct rspamd_worker *worker,
cd->worker = worker;
cd->ev_base = ev_base;
- event_set (&cd->io_ev, worker->control_pipe[1], EV_READ | EV_PERSIST,
- rspamd_control_default_worker_handler, cd);
- event_base_set (ev_base, &cd->io_ev);
- event_add (&cd->io_ev, NULL);
+ cd->io_ev.data = cd;
+ ev_io_init (&cd->io_ev, rspamd_control_default_worker_handler,
+ worker->control_pipe[1], EV_READ);
+ ev_io_start (ev_base, &cd->io_ev);
worker->control_data = cd;
}
@@ -720,26 +719,28 @@ struct rspamd_srv_reply_data {
};
static void
-rspamd_control_hs_io_handler (gint fd, short what, gpointer ud)
+rspamd_control_hs_io_handler (int fd, short what, void *ud)
{
- struct rspamd_control_reply_elt *elt = ud;
+ struct rspamd_control_reply_elt *elt =
+ (struct rspamd_control_reply_elt *)ud;
struct rspamd_control_reply rep;
/* At this point we just ignore replies from the workers */
(void)read (fd, &rep, sizeof (rep));
- event_del (&elt->io_ev);
+ rspamd_ev_watcher_stop (ev_default_loop (0), &elt->ev);
g_free (elt);
}
static void
-rspamd_control_log_pipe_io_handler (gint fd, short what, gpointer ud)
+rspamd_control_log_pipe_io_handler (int fd, short what, void *ud)
{
- struct rspamd_control_reply_elt *elt = ud;
+ struct rspamd_control_reply_elt *elt =
+ (struct rspamd_control_reply_elt *)ud;
struct rspamd_control_reply rep;
/* At this point we just ignore replies from the workers */
(void) read (fd, &rep, sizeof (rep));
- event_del (&elt->io_ev);
+ rspamd_ev_watcher_stop (ev_default_loop (0), &elt->ev);
g_free (elt);
}
@@ -794,7 +795,7 @@ rspamd_control_handle_on_fork (struct rspamd_srv_command *cmd,
}
static void
-rspamd_srv_handler (gint fd, short what, gpointer ud)
+rspamd_srv_handler (EV_P_ ev_io *w, int revents)
{
struct rspamd_worker *worker;
static struct rspamd_srv_command cmd;
@@ -809,8 +810,8 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
struct rspamd_control_command wcmd;
gssize r;
- if (what == EV_READ) {
- worker = ud;
+ if (revents == EV_READ) {
+ worker = (struct rspamd_worker *)w->data;
srv = worker->srv;
iov.iov_base = &cmd;
iov.iov_len = sizeof (cmd);
@@ -820,7 +821,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
- r = recvmsg (fd, &msg, 0);
+ r = recvmsg (w->fd, &msg, 0);
if (r == -1) {
msg_err ("cannot read from worker's srv pipe: %s",
@@ -831,7 +832,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
* Usually this means that a worker is dead, so do not try to read
* anything
*/
- event_del (&worker->srv_ev);
+ ev_io_stop (EV_A_ w);
}
else if (r != sizeof (cmd)) {
msg_err ("cannot read from worker's srv pipe incomplete command: %d",
@@ -919,17 +920,14 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
}
/* Now plan write event and send data back */
- event_del (&worker->srv_ev);
- event_set (&worker->srv_ev,
- worker->srv_pipe[0],
- EV_WRITE,
- rspamd_srv_handler,
- rdata);
- event_add (&worker->srv_ev, NULL);
+ w->data = rdata;
+ ev_io_stop (EV_A_ w);
+ ev_io_set (w, worker->srv_pipe[0], EV_WRITE);
+ ev_io_start (EV_A_ w);
}
}
- else if (what == EV_WRITE) {
- rdata = ud;
+ else if (revents == EV_WRITE) {
+ rdata = (struct rspamd_srv_reply_data *)w->data;
worker = rdata->worker;
worker->tmp_data = NULL; /* Avoid race */
srv = rdata->srv;
@@ -953,7 +951,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
- r = sendmsg (fd, &msg, 0);
+ r = sendmsg (w->fd, &msg, 0);
if (r == -1) {
msg_err ("cannot write to worker's srv pipe: %s",
@@ -961,13 +959,10 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
}
g_free (rdata);
- event_del (&worker->srv_ev);
- event_set (&worker->srv_ev,
- worker->srv_pipe[0],
- EV_READ | EV_PERSIST,
- rspamd_srv_handler,
- worker);
- event_add (&worker->srv_ev, NULL);
+ w->data = worker;
+ ev_io_stop (EV_A_ w);
+ ev_io_set (w, worker->srv_pipe[0], EV_READ);
+ ev_io_start (EV_A_ w);
}
}
@@ -979,10 +974,9 @@ rspamd_srv_start_watching (struct rspamd_main *srv,
g_assert (worker != NULL);
worker->tmp_data = NULL;
- event_set (&worker->srv_ev, worker->srv_pipe[0], EV_READ | EV_PERSIST,
- rspamd_srv_handler, worker);
- event_base_set (ev_base, &worker->srv_ev);
- event_add (&worker->srv_ev, NULL);
+ worker->srv_ev.data = worker;
+ ev_io_init (&worker->srv_ev, rspamd_srv_handler, worker->srv_pipe[0], EV_READ);
+ ev_io_start (ev_base, &worker->srv_ev);
}
struct rspamd_srv_request_data {
@@ -991,14 +985,14 @@ struct rspamd_srv_request_data {
gint attached_fd;
struct rspamd_srv_reply rep;
rspamd_srv_reply_handler handler;
- struct event io_ev;
+ ev_io io_ev;
gpointer ud;
};
static void
-rspamd_srv_request_handler (gint fd, short what, gpointer ud)
+rspamd_srv_request_handler (EV_P_ ev_io *w, int revents)
{
- struct rspamd_srv_request_data *rd = ud;
+ struct rspamd_srv_request_data *rd = (struct rspamd_srv_request_data *)w->data;
struct msghdr msg;
struct iovec iov;
guchar fdspace[CMSG_SPACE(sizeof (int))];
@@ -1006,7 +1000,7 @@ rspamd_srv_request_handler (gint fd, short what, gpointer ud)
gssize r;
gint rfd = -1;
- if (what == EV_WRITE) {
+ if (revents == EV_WRITE) {
/* Send request to server */
memset (&msg, 0, sizeof (msg));
@@ -1027,17 +1021,16 @@ rspamd_srv_request_handler (gint fd, short what, gpointer ud)
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
- r = sendmsg (fd, &msg, 0);
+ r = sendmsg (w->fd, &msg, 0);
if (r == -1) {
msg_err ("cannot write to server pipe: %s", strerror (errno));
goto cleanup;
}
- event_del (&rd->io_ev);
- event_set (&rd->io_ev, rd->worker->srv_pipe[1], EV_READ,
- rspamd_srv_request_handler, rd);
- event_add (&rd->io_ev, NULL);
+ ev_io_stop (EV_A_ w);
+ ev_io_set (w, rd->worker->srv_pipe[1], EV_READ);
+ ev_io_start (EV_A_ w);
}
else {
iov.iov_base = &rd->rep;
@@ -1048,7 +1041,7 @@ rspamd_srv_request_handler (gint fd, short what, gpointer ud)
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
- r = recvmsg (fd, &msg, 0);
+ r = recvmsg (w->fd, &msg, 0);
if (r == -1) {
msg_err ("cannot read from server pipe: %s", strerror (errno));
@@ -1075,7 +1068,8 @@ cleanup:
if (rd->handler) {
rd->handler (rd->worker, &rd->rep, rfd, rd->ud);
}
- event_del (&rd->io_ev);
+
+ ev_io_stop (EV_A_ w);
g_free (rd);
}
@@ -1102,8 +1096,8 @@ rspamd_srv_send_command (struct rspamd_worker *worker,
rd->rep.type = cmd->type;
rd->attached_fd = attached_fd;
- event_set (&rd->io_ev, worker->srv_pipe[1], EV_WRITE,
- rspamd_srv_request_handler, rd);
- event_base_set (ev_base, &rd->io_ev);
- event_add (&rd->io_ev, NULL);
+ rd->io_ev.data = rd;
+ ev_io_init (&rd->io_ev, rspamd_srv_request_handler,
+ rd->worker->srv_pipe[1], EV_WRITE);
+ ev_io_start (ev_base, &rd->io_ev);
}
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index 2c91869d6..75df2a645 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -47,6 +47,7 @@
#include "libstat/stat_api.h"
#include <math.h>
#include <src/libmime/message.h>
+#include "libutil/libev_helper.h"
#define DEFAULT_SYMBOL "R_FUZZY_HASH"
@@ -129,9 +130,8 @@ struct fuzzy_client_session {
struct rspamd_symcache_item *item;
struct upstream *server;
struct fuzzy_rule *rule;
- struct event ev;
- struct event timev;
- struct timeval tv;
+ struct ev_loop *event_loop;
+ struct rspamd_io_ev ev;
gint state;
gint fd;
guint retransmits;
@@ -146,9 +146,8 @@ struct fuzzy_learn_session {
struct upstream *server;
struct fuzzy_rule *rule;
struct rspamd_task *task;
- struct event ev;
- struct event timev;
- struct timeval tv;
+ struct ev_loop *event_loop;
+ struct rspamd_io_ev ev;
gint fd;
guint retransmits;
};
@@ -1185,8 +1184,7 @@ fuzzy_io_fin (void *ud)
g_ptr_array_free (session->results, TRUE);
}
- event_del (&session->ev);
- event_del (&session->timev);
+ rspamd_ev_watcher_stop (session->event_loop, &session->ev);
close (session->fd);
}
@@ -2181,13 +2179,49 @@ fuzzy_check_session_is_completed (struct fuzzy_client_session *session)
return FALSE;
}
+/* Fuzzy check timeout callback */
+static void
+fuzzy_check_timer_callback (gint fd, short what, void *arg)
+{
+ struct fuzzy_client_session *session = arg;
+ struct rspamd_task *task;
+
+ task = session->task;
+
+ /* We might be here because of other checks being slow */
+ if (fuzzy_check_try_read (session) > 0) {
+ if (fuzzy_check_session_is_completed (session)) {
+ return;
+ }
+ }
+
+ if (session->retransmits >= session->rule->ctx->retransmits) {
+ msg_err_task ("got IO timeout with server %s(%s), after %d retransmits",
+ rspamd_upstream_name (session->server),
+ rspamd_inet_address_to_string_pretty (
+ rspamd_upstream_addr_cur (session->server)),
+ session->retransmits);
+ rspamd_upstream_fail (session->server, TRUE);
+
+ if (session->item) {
+ rspamd_symcache_item_async_dec_check (session->task, session->item, M);
+ }
+ rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session);
+ }
+ else {
+ /* Plan write event */
+ rspamd_ev_watcher_reschedule (session->event_loop,
+ &session->ev, EV_READ|EV_WRITE);
+ session->retransmits ++;
+ }
+}
+
/* Fuzzy check callback */
static void
fuzzy_check_io_callback (gint fd, short what, void *arg)
{
struct fuzzy_client_session *session = arg;
struct rspamd_task *task;
- struct ev_loop *ev_base;
gint r;
enum {
@@ -2224,18 +2258,14 @@ fuzzy_check_io_callback (gint fd, short what, void *arg)
}
}
else {
- /* Should not happen */
- g_assert (0);
+ fuzzy_check_timer_callback (fd, what, arg);
+ return;
}
if (ret == return_want_more) {
/* Processed write, switch to reading */
- ev_base = event_get_base (&session->ev);
- event_del (&session->ev);
- event_set (&session->ev, fd, EV_READ,
- fuzzy_check_io_callback, session);
- event_base_set (ev_base, &session->ev);
- event_add (&session->ev, NULL);
+ rspamd_ev_watcher_reschedule (session->event_loop,
+ &session->ev, EV_READ);
}
else if (ret == return_error) {
/* Error state */
@@ -2258,78 +2288,82 @@ fuzzy_check_io_callback (gint fd, short what, void *arg)
/* Read something from network */
if (!fuzzy_check_session_is_completed (session)) {
/* Need to read more */
- ev_base = event_get_base (&session->ev);
- event_del (&session->ev);
- event_set (&session->ev, session->fd, EV_READ,
- fuzzy_check_io_callback, session);
- event_base_set (ev_base, &session->ev);
- event_add (&session->ev, NULL);
+ rspamd_ev_watcher_reschedule (session->event_loop,
+ &session->ev, EV_READ);
}
}
}
-/* Fuzzy check timeout callback */
+
static void
-fuzzy_check_timer_callback (gint fd, short what, void *arg)
+fuzzy_lua_fin (void *ud)
{
- struct fuzzy_client_session *session = arg;
+ struct fuzzy_learn_session *session = ud;
+
+ (*session->saved)--;
+
+ rspamd_ev_watcher_stop (session->event_loop, &session->ev);
+ close (session->fd);
+}
+
+/* Controller IO */
+
+static void
+fuzzy_controller_timer_callback (gint fd, short what, void *arg)
+{
+ struct fuzzy_learn_session *session = arg;
struct rspamd_task *task;
- struct ev_loop *ev_base;
task = session->task;
- /* We might be here because of other checks being slow */
- if (fuzzy_check_try_read (session) > 0) {
- if (fuzzy_check_session_is_completed (session)) {
- return;
- }
- }
-
if (session->retransmits >= session->rule->ctx->retransmits) {
- msg_err_task ("got IO timeout with server %s(%s), after %d retransmits",
+ rspamd_upstream_fail (session->server, TRUE);
+ msg_err_task_check ("got IO timeout with server %s(%s), "
+ "after %d retransmits",
rspamd_upstream_name (session->server),
rspamd_inet_address_to_string_pretty (
rspamd_upstream_addr_cur (session->server)),
session->retransmits);
- rspamd_upstream_fail (session->server, TRUE);
- if (session->item) {
- rspamd_symcache_item_async_dec_check (session->task, session->item, M);
+ if (session->session) {
+ rspamd_session_remove_event (session->session, fuzzy_lua_fin,
+ session);
+ }
+ else {
+ if (session->http_entry) {
+ rspamd_controller_send_error (session->http_entry,
+ 500, "IO timeout with fuzzy storage");
+ }
+
+ if (*session->saved > 0 ) {
+ (*session->saved)--;
+ if (*session->saved == 0) {
+ if (session->http_entry) {
+ rspamd_task_free (session->task);
+ }
+
+ session->task = NULL;
+ }
+ }
+
+ if (session->http_entry) {
+ rspamd_http_connection_unref (session->http_entry->conn);
+ }
+
+ rspamd_ev_watcher_stop (session->event_loop,
+ &session->ev);
+ close (session->fd);
}
- rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session);
}
else {
/* Plan write event */
- ev_base = event_get_base (&session->ev);
- event_del (&session->ev);
- event_set (&session->ev, fd, EV_WRITE|EV_READ,
- fuzzy_check_io_callback, session);
- event_base_set (ev_base, &session->ev);
- event_add (&session->ev, NULL);
-
- /* Plan new retransmit timer */
- ev_base = event_get_base (&session->timev);
- event_del (&session->timev);
- event_base_set (ev_base, &session->timev);
- event_add (&session->timev, &session->tv);
+ rspamd_ev_watcher_reschedule (session->event_loop,
+ &session->ev, EV_READ|EV_WRITE);
session->retransmits ++;
}
}
static void
-fuzzy_lua_fin (void *ud)
-{
- struct fuzzy_learn_session *session = ud;
-
- (*session->saved)--;
-
- event_del (&session->ev);
- event_del (&session->timev);
- close (session->fd);
-}
-
-/* Controller IO */
-static void
fuzzy_controller_io_callback (gint fd, short what, void *arg)
{
struct fuzzy_learn_session *session = arg;
@@ -2340,7 +2374,6 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg)
struct fuzzy_cmd_io *io;
struct rspamd_fuzzy_cmd *cmd = NULL;
const gchar *symbol, *ftype;
- struct ev_loop *ev_base;
gint r;
enum {
return_error = 0,
@@ -2355,7 +2388,8 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg)
if (what & EV_READ) {
if ((r = read (fd, buf, sizeof (buf) - 1)) == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
- event_add (&session->ev, NULL);
+ rspamd_ev_watcher_reschedule (session->event_loop,
+ &session->ev, EV_READ);
return;
}
@@ -2482,16 +2516,14 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg)
}
}
else {
- g_assert (0);
+ fuzzy_controller_timer_callback (fd, what, arg);
+
+ return;
}
if (ret == return_want_more) {
- ev_base = event_get_base (&session->ev);
- event_del (&session->ev);
- event_set (&session->ev, fd, EV_READ,
- fuzzy_controller_io_callback, session);
- event_base_set (ev_base, &session->ev);
- event_add (&session->ev, NULL);
+ rspamd_ev_watcher_reschedule (session->event_loop,
+ &session->ev, EV_READ);
return;
}
@@ -2518,8 +2550,7 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg)
rspamd_http_connection_unref (session->http_entry->conn);
}
- event_del (&session->ev);
- event_del (&session->timev);
+ rspamd_ev_watcher_stop (session->event_loop, &session->ev);
close (session->fd);
if (*session->saved == 0) {
@@ -2555,7 +2586,6 @@ cleanup:
if (session->http_entry) {
ucl_object_t *reply, *hashes;
- guint i;
gchar hexbuf[rspamd_cryptobox_HASHBYTES * 2 + 1];
reply = ucl_object_typed_new (UCL_OBJECT);
@@ -2588,72 +2618,6 @@ cleanup:
}
-static void
-fuzzy_controller_timer_callback (gint fd, short what, void *arg)
-{
- struct fuzzy_learn_session *session = arg;
- struct rspamd_task *task;
- struct ev_loop *ev_base;
-
- task = session->task;
-
- if (session->retransmits >= session->rule->ctx->retransmits) {
- rspamd_upstream_fail (session->server, TRUE);
- msg_err_task_check ("got IO timeout with server %s(%s), "
- "after %d retransmits",
- rspamd_upstream_name (session->server),
- rspamd_inet_address_to_string_pretty (
- rspamd_upstream_addr_cur (session->server)),
- session->retransmits);
-
- if (session->session) {
- rspamd_session_remove_event (session->session, fuzzy_lua_fin,
- session);
- }
- else {
- if (session->http_entry) {
- rspamd_controller_send_error (session->http_entry,
- 500, "IO timeout with fuzzy storage");
- }
-
- if (*session->saved > 0 ) {
- (*session->saved)--;
- if (*session->saved == 0) {
- if (session->http_entry) {
- rspamd_task_free (session->task);
- }
-
- session->task = NULL;
- }
- }
-
- if (session->http_entry) {
- rspamd_http_connection_unref (session->http_entry->conn);
- }
-
- event_del (&session->ev);
- event_del (&session->timev);
- close (session->fd);
- }
- }
- else {
- /* Plan write event */
- ev_base = event_get_base (&session->ev);
- event_del (&session->ev);
- event_set (&session->ev, fd, EV_WRITE|EV_READ,
- fuzzy_controller_io_callback, session);
- event_base_set (ev_base, &session->ev);
- event_add (&session->ev, NULL);
-
- /* Plan new retransmit timer */
- ev_base = event_get_base (&session->timev);
- event_del (&session->timev);
- event_base_set (ev_base, &session->timev);
- event_add (&session->timev, &session->tv);
- session->retransmits ++;
- }
-}
-
static GPtrArray *
fuzzy_generate_commands (struct rspamd_task *task, struct fuzzy_rule *rule,
gint c, gint flag, guint32 value, guint flags)
@@ -2774,7 +2738,6 @@ register_fuzzy_client_call (struct rspamd_task *task,
session =
rspamd_mempool_alloc0 (task->task_pool,
sizeof (struct fuzzy_client_session));
- msec_to_tv (rule->ctx->io_timeout, &session->tv);
session->state = 0;
session->commands = commands;
session->task = task;
@@ -2782,16 +2745,15 @@ register_fuzzy_client_call (struct rspamd_task *task,
session->server = selected;
session->rule = rule;
session->results = g_ptr_array_sized_new (32);
+ session->event_loop = task->event_loop;
- event_set (&session->ev, sock, EV_WRITE, fuzzy_check_io_callback,
- session);
- event_base_set (session->task->event_loop, &session->ev);
- event_add (&session->ev, NULL);
-
- evtimer_set (&session->timev, fuzzy_check_timer_callback,
+ rspamd_ev_watcher_init (&session->ev,
+ sock,
+ EV_WRITE,
+ fuzzy_check_io_callback,
session);
- event_base_set (session->task->event_loop, &session->timev);
- event_add (&session->timev, &session->tv);
+ rspamd_ev_watcher_start (session->event_loop, &session->ev,
+ ((double)rule->ctx->io_timeout) / 1000.0);
rspamd_session_add_event (task->s, fuzzy_io_fin, session, M);
session->item = rspamd_symcache_get_cur_item (task);
@@ -2881,7 +2843,6 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry,
struct rspamd_controller_session *session = entry->ud;
gint sock;
gint ret = -1;
- struct fuzzy_ctx *fuzzy_module_ctx = fuzzy_get_context (task->cfg);
/* Get upstream */
@@ -2899,7 +2860,6 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry,
rspamd_mempool_alloc0 (session->pool,
sizeof (struct fuzzy_learn_session));
- msec_to_tv (fuzzy_module_ctx->io_timeout, &s->tv);
s->task = task;
s->commands = commands;
s->http_entry = entry;
@@ -2908,17 +2868,17 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry,
s->fd = sock;
s->err = err;
s->rule = rule;
+ s->event_loop = task->event_loop;
/* We ref connection to avoid freeing before we process fuzzy rule */
rspamd_http_connection_ref (entry->conn);
- event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s);
- event_base_set (entry->rt->event_loop, &s->ev);
- event_add (&s->ev, NULL);
-
- evtimer_set (&s->timev, fuzzy_controller_timer_callback,
+ rspamd_ev_watcher_init (&s->ev,
+ sock,
+ EV_WRITE,
+ fuzzy_controller_io_callback,
s);
- event_base_set (s->task->event_loop, &s->timev);
- event_add (&s->timev, &s->tv);
+ rspamd_ev_watcher_start (s->event_loop, &s->ev,
+ ((double)rule->ctx->io_timeout) / 1000.0);
(*saved)++;
ret = 1;
@@ -3258,8 +3218,6 @@ fuzzy_check_send_lua_learn (struct fuzzy_rule *rule,
s =
rspamd_mempool_alloc0 (task->task_pool,
sizeof (struct fuzzy_learn_session));
-
- msec_to_tv (rule->ctx->io_timeout, &s->tv);
s->task = task;
s->commands = commands;
s->http_entry = NULL;
@@ -3269,14 +3227,15 @@ fuzzy_check_send_lua_learn (struct fuzzy_rule *rule,
s->err = err;
s->rule = rule;
s->session = task->s;
+ s->event_loop = task->event_loop;
- event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s);
- event_base_set (task->event_loop, &s->ev);
- event_add (&s->ev, NULL);
-
- evtimer_set (&s->timev, fuzzy_controller_timer_callback, s);
- event_base_set (s->task->event_loop, &s->timev);
- event_add (&s->timev, &s->tv);
+ rspamd_ev_watcher_init (&s->ev,
+ sock,
+ EV_WRITE,
+ fuzzy_controller_io_callback,
+ s);
+ rspamd_ev_watcher_start (s->event_loop, &s->ev,
+ ((double)rule->ctx->io_timeout) / 1000.0);
rspamd_session_add_event (task->s,
fuzzy_lua_fin,
@@ -3367,7 +3326,7 @@ static gint
fuzzy_lua_learn_handler (lua_State *L)
{
struct rspamd_task *task = lua_check_task (L, 1);
- guint flag = 0, weight = 1.0, send_flags = 0;
+ guint flag = 0, weight = 1, send_flags = 0;
const gchar *symbol;
struct fuzzy_ctx *fuzzy_module_ctx = fuzzy_get_context (task->cfg);
diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c
index 338bdaa24..3c467615c 100644
--- a/src/plugins/surbl.c
+++ b/src/plugins/surbl.c
@@ -1815,7 +1815,6 @@ register_redirector_call (struct rspamd_url *url, struct rspamd_task *task,
const gchar *rule)
{
struct redirector_param *param;
- struct timeval *timeout;
struct upstream *selected;
struct rspamd_http_message *msg;
struct surbl_ctx *surbl_module_ctx = surbl_get_context (task->cfg);
@@ -1851,8 +1850,6 @@ register_redirector_call (struct rspamd_url *url, struct rspamd_task *task,
msg = rspamd_http_new_message (HTTP_REQUEST);
msg->url = rspamd_fstring_assign (msg->url, url->string, url->urllen);
param->redirector = selected;
- timeout = rspamd_mempool_alloc (task->task_pool, sizeof (struct timeval));
- double_to_tv (surbl_module_ctx->read_timeout, timeout);
rspamd_session_add_event (task->s,
free_redirector_session, param,
@@ -1864,7 +1861,7 @@ register_redirector_call (struct rspamd_url *url, struct rspamd_task *task,
}
rspamd_http_connection_write_message (param->conn, msg, NULL,
- NULL, param, timeout);
+ NULL, param, surbl_module_ctx->read_timeout);
msg_info_surbl (
"<%s> registered redirector call for %*s to %s, according to rule: %s",