aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2019-06-19 17:07:56 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2019-06-22 10:57:29 +0100
commit675b33dd2025cc1f8e732efa9ffc72d55e5a35d9 (patch)
tree1492d1527bd84c5b032ada128d88a1fb12449102 /src
parenteeb0beb73d7769341d1b6aa8fac4f27f7dc76b2e (diff)
downloadrspamd-675b33dd2025cc1f8e732efa9ffc72d55e5a35d9.tar.gz
rspamd-675b33dd2025cc1f8e732efa9ffc72d55e5a35d9.zip
[Project] Adopt normal worker and contorller
Diffstat (limited to 'src')
-rw-r--r--src/controller.c89
-rw-r--r--src/libserver/task.c9
-rw-r--r--src/libserver/task.h2
-rw-r--r--src/libserver/worker_util.c8
-rw-r--r--src/libstat/backends/redis_backend.c5
-rw-r--r--src/rspamadm/control.c6
-rw-r--r--src/worker.c96
-rw-r--r--src/worker_private.h11
8 files changed, 87 insertions, 139 deletions
diff --git a/src/controller.c b/src/controller.c
index 851087945..374880952 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -134,8 +134,7 @@ struct rspamd_controller_worker_ctx {
/* Config */
struct rspamd_config *cfg;
/* END OF COMMON PART */
- guint32 timeout;
- struct timeval io_tv;
+ ev_tstamp timeout;
/* Whether we use ssl for this server */
gboolean use_ssl;
/* Webui password */
@@ -728,7 +727,7 @@ rspamd_controller_handle_auth (struct rspamd_http_connection_entry *conn_ent,
data[4] = st->actions_stat[METRIC_ACTION_SOFT_REJECT];
/* Get uptime */
- uptime = time (NULL) - session->ctx->start_time;
+ uptime = ev_time () - session->ctx->start_time;
ucl_object_insert_key (obj, ucl_object_fromstring (
RVERSION), "version", 0, false);
@@ -996,7 +995,7 @@ rspamd_controller_handle_get_map (struct rspamd_http_connection_entry *conn_ent,
struct rspamd_controller_session *session = conn_ent->ud;
GList *cur;
struct rspamd_map *map;
- struct rspamd_map_backend *bk;
+ struct rspamd_map_backend *bk = NULL;
const rspamd_ftok_t *idstr;
struct stat st;
gint fd;
@@ -1037,7 +1036,7 @@ rspamd_controller_handle_get_map (struct rspamd_http_connection_entry *conn_ent,
cur = g_list_next (cur);
}
- if (!found) {
+ if (!found || bk == NULL) {
msg_info_session ("map not found");
rspamd_controller_send_error (conn_ent, 404, "Map not found");
return 0;
@@ -1075,7 +1074,7 @@ rspamd_controller_handle_get_map (struct rspamd_http_connection_entry *conn_ent,
rspamd_http_router_insert_headers (conn_ent->rt, reply);
rspamd_http_connection_write_message (conn_ent->conn, reply, NULL,
"text/plain", conn_ent,
- conn_ent->rt->ptv);
+ conn_ent->rt->timeout);
conn_ent->is_reply = TRUE;
return 0;
@@ -1385,13 +1384,13 @@ rspamd_controller_handle_legacy_history (
row = &copied_rows[row_num];
/* Get only completed rows */
if (row->completed) {
- rspamd_localtime (row->tv.tv_sec, &tm);
+ rspamd_localtime (row->timestamp, &tm);
strftime (timebuf, sizeof (timebuf) - 1, "%Y-%m-%d %H:%M:%S", &tm);
obj = ucl_object_typed_new (UCL_OBJECT);
ucl_object_insert_key (obj, ucl_object_fromstring (
timebuf), "time", 0, false);
ucl_object_insert_key (obj, ucl_object_fromint (
- row->tv.tv_sec), "unix_time", 0, false);
+ row->timestamp), "unix_time", 0, false);
ucl_object_insert_key (obj, ucl_object_fromstring (
row->message_id), "id", 0, false);
ucl_object_insert_key (obj, ucl_object_fromstring (row->from_addr),
@@ -1935,7 +1934,7 @@ rspamd_controller_scan_reply (struct rspamd_task *task)
rspamd_http_connection_reset (conn_ent->conn);
rspamd_http_router_insert_headers (conn_ent->rt, msg);
rspamd_http_connection_write_message (conn_ent->conn, msg, NULL,
- "application/json", conn_ent, conn_ent->rt->ptv);
+ "application/json", conn_ent, conn_ent->rt->timeout);
conn_ent->is_reply = TRUE;
}
@@ -2125,13 +2124,10 @@ rspamd_controller_handle_scan (struct rspamd_http_connection_entry *conn_ent,
}
if (ctx->task_timeout > 0.0) {
- struct timeval task_tv;
-
- event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
- task);
- event_base_set (ctx->event_loop, &task->timeout_ev);
- double_to_tv (ctx->task_timeout, &task_tv);
- event_add (&task->timeout_ev, &task_tv);
+ task->timeout_ev.data = task;
+ ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
+ ctx->task_timeout, 0.0);
+ ev_timer_start (task->event_loop, &task->timeout_ev);
}
end:
@@ -2210,6 +2206,7 @@ rspamd_controller_handle_saveactions (
switch (i) {
case 0:
+ default:
act = METRIC_ACTION_REJECT;
break;
case 1:
@@ -2404,7 +2401,7 @@ rspamd_controller_handle_savemap (struct rspamd_http_connection_entry *conn_ent,
{
struct rspamd_controller_session *session = conn_ent->ud;
GList *cur;
- struct rspamd_map *map;
+ struct rspamd_map *map = NULL;
struct rspamd_map_backend *bk;
struct rspamd_controller_worker_ctx *ctx;
const rspamd_ftok_t *idstr;
@@ -2903,7 +2900,7 @@ rspamd_controller_handle_ping (struct rspamd_http_connection_entry *conn_ent,
NULL,
"text/plain",
conn_ent,
- conn_ent->rt->ptv);
+ conn_ent->rt->timeout);
conn_ent->is_reply = TRUE;
return 0;
@@ -2937,7 +2934,7 @@ rspamd_controller_handle_unknown (struct rspamd_http_connection_entry *conn_ent,
NULL,
"text/plain",
conn_ent,
- conn_ent->rt->ptv);
+ conn_ent->rt->timeout);
conn_ent->is_reply = TRUE;
}
else {
@@ -2953,7 +2950,7 @@ rspamd_controller_handle_unknown (struct rspamd_http_connection_entry *conn_ent,
NULL,
"text/plain",
conn_ent,
- conn_ent->rt->ptv);
+ conn_ent->rt->timeout);
conn_ent->is_reply = TRUE;
}
@@ -3077,9 +3074,9 @@ rspamd_controller_finish_handler (struct rspamd_http_connection_entry *conn_ent)
}
static void
-rspamd_controller_accept_socket (gint fd, short what, void *arg)
+rspamd_controller_accept_socket (EV_P_ ev_io *w, int revents)
{
- struct rspamd_worker *worker = (struct rspamd_worker *) arg;
+ struct rspamd_worker *worker = (struct rspamd_worker *)w->data;
struct rspamd_controller_worker_ctx *ctx;
struct rspamd_controller_session *session;
rspamd_inet_addr_t *addr;
@@ -3088,7 +3085,8 @@ rspamd_controller_accept_socket (gint fd, short what, void *arg)
ctx = worker->ctx;
if ((nfd =
- rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
+ rspamd_accept_from_socket (w->fd, &addr,
+ rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) {
msg_warn_ctx ("accept failed: %s", strerror (errno));
return;
}
@@ -3113,9 +3111,10 @@ rspamd_controller_accept_socket (gint fd, short what, void *arg)
}
static void
-rspamd_controller_rrd_update (gint fd, short what, void *arg)
+rspamd_controller_rrd_update (EV_P_ ev_timer *w, int revents)
{
- struct rspamd_controller_worker_ctx *ctx = arg;
+ struct rspamd_controller_worker_ctx *ctx =
+ (struct rspamd_controller_worker_ctx *)w->data;
struct rspamd_stat *stat;
GArray ar;
gdouble points[METRIC_ACTION_MAX];
@@ -3139,8 +3138,7 @@ rspamd_controller_rrd_update (gint fd, short what, void *arg)
}
/* Plan new event */
- event_del (ctx->rrd_event);
- evtimer_add (ctx->rrd_event, &rrd_update_time);
+ ev_timer_again (ctx->event_loop, &ctx->rrd_event);
}
static void
@@ -3278,11 +3276,13 @@ rspamd_controller_store_saved_stats (struct rspamd_controller_worker_ctx *ctx)
}
static void
-rspamd_controller_stats_save_periodic (int fd, short what, gpointer ud)
+rspamd_controller_stats_save_periodic (EV_P_ ev_timer *w, int revents)
{
- struct rspamd_controller_worker_ctx *ctx = ud;
+ struct rspamd_controller_worker_ctx *ctx =
+ (struct rspamd_controller_worker_ctx *)w->data;
rspamd_controller_store_saved_stats (ctx);
+ ev_timer_again (EV_A_ w);
}
static void
@@ -3375,7 +3375,7 @@ init_controller_worker (struct rspamd_config *cfg)
ctx,
G_STRUCT_OFFSET (struct rspamd_controller_worker_ctx,
timeout),
- RSPAMD_CL_FLAG_TIME_INTEGER,
+ RSPAMD_CL_FLAG_TIME_FLOAT,
"Protocol timeout");
rspamd_rcl_register_worker_option (cfg,
@@ -3573,7 +3573,7 @@ rspamd_controller_on_terminate (struct rspamd_worker *worker)
if (ctx->rrd) {
msg_info ("closing rrd file: %s", ctx->rrd->filename);
- event_del (ctx->rrd_event);
+ ev_timer_stop (ctx->event_loop, &ctx->rrd_event);
rspamd_rrd_close (ctx->rrd);
}
@@ -3694,16 +3694,14 @@ start_controller_worker (struct rspamd_worker *worker)
GHashTableIter iter;
gpointer key, value;
guint i;
- struct timeval stv;
- const guint save_stats_interval = 60 * 1000; /* 1 minute */
+ const ev_tstamp save_stats_interval = 60; /* 1 minute */
gpointer m;
ctx->event_loop = rspamd_prepare_worker (worker,
"controller",
rspamd_controller_accept_socket);
- msec_to_tv (ctx->timeout, &ctx->io_tv);
- ctx->start_time = time (NULL);
+ ctx->start_time = ev_time ();
ctx->worker = worker;
ctx->cfg = worker->srv->cfg;
ctx->srv = worker->srv;
@@ -3746,10 +3744,10 @@ start_controller_worker (struct rspamd_worker *worker)
ctx->rrd = rspamd_rrd_file_default (ctx->cfg->rrd_file, &rrd_err);
if (ctx->rrd) {
- ctx->rrd_event = g_malloc0 (sizeof (*ctx->rrd_event));
- evtimer_set (ctx->rrd_event, rspamd_controller_rrd_update, ctx);
- event_base_set (ctx->event_loop, ctx->rrd_event);
- event_add (ctx->rrd_event, &rrd_update_time);
+ ctx->rrd_event.data = ctx;
+ ev_timer_init (&ctx->rrd_event, rspamd_controller_rrd_update,
+ rrd_update_time, rrd_update_time);
+ ev_timer_start (ctx->event_loop, &ctx->rrd_event);
}
else if (rrd_err) {
msg_err ("cannot load rrd from %s: %e", ctx->cfg->rrd_file,
@@ -3772,7 +3770,7 @@ start_controller_worker (struct rspamd_worker *worker)
ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop,
ctx->cfg->ups_ctx);
ctx->http = rspamd_http_router_new (rspamd_controller_error_handler,
- rspamd_controller_finish_handler, &ctx->io_tv,
+ rspamd_controller_finish_handler, ctx->timeout,
ctx->static_files_dir, ctx->http_ctx);
/* Add callbacks for different methods */
@@ -3903,12 +3901,11 @@ start_controller_worker (struct rspamd_worker *worker)
ctx->resolver, worker, TRUE);
/* Schedule periodic stats saving, see #1823 */
- event_set (&ctx->save_stats_event, -1, EV_PERSIST,
+ ctx->save_stats_event.data = ctx;
+ ev_timer_init (&ctx->save_stats_event,
rspamd_controller_stats_save_periodic,
- ctx);
- event_base_set (ctx->event_loop, &ctx->save_stats_event);
- msec_to_tv (save_stats_interval, &stv);
- evtimer_add (&ctx->save_stats_event, &stv);
+ save_stats_interval, save_stats_interval);
+ ev_timer_start (ctx->event_loop, &ctx->save_stats_event);
}
else {
rspamd_map_watch (worker->srv->cfg, ctx->event_loop,
@@ -3918,7 +3915,7 @@ start_controller_worker (struct rspamd_worker *worker)
rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop, worker);
/* Start event loop */
- event_base_loop (ctx->event_loop, 0);
+ ev_loop (ctx->event_loop, 0);
rspamd_worker_block_signals ();
rspamd_stat_close ();
diff --git a/src/libserver/task.c b/src/libserver/task.c
index 84ea1417a..3c92e05b9 100644
--- a/src/libserver/task.c
+++ b/src/libserver/task.c
@@ -316,13 +316,8 @@ rspamd_task_free (struct rspamd_task *task)
g_error_free (task->err);
}
- if (rspamd_event_pending (&task->timeout_ev, EV_TIMEOUT)) {
- event_del (&task->timeout_ev);
- }
-
- if (task->guard_ev) {
- event_del (task->guard_ev);
- }
+ ev_timer_stop (task->event_loop, &task->timeout_ev);
+ ev_io_stop (task->event_loop, &task->guard_ev);
if (task->sock != -1) {
close (task->sock);
diff --git a/src/libserver/task.h b/src/libserver/task.h
index a73102424..7b30f97cd 100644
--- a/src/libserver/task.h
+++ b/src/libserver/task.h
@@ -200,7 +200,7 @@ struct rspamd_task {
struct rspamd_dns_resolver *resolver; /**< DNS resolver */
struct ev_loop *event_loop; /**< Event base */
struct ev_timer timeout_ev; /**< Global task timeout */
- struct ev_io *guard_ev; /**< Event for input sanity guard */
+ struct ev_io guard_ev; /**< Event for input sanity guard */
gpointer checkpoint; /**< Opaque checkpoint data */
ucl_object_t *settings; /**< Settings applied to task */
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index 0aa0c9cf3..70d349c2c 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -274,10 +274,6 @@ void
rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *base)
{
struct sigaction signals;
- /* We ignore these signals in the worker */
- rspamd_worker_ignore_signal (SIGPIPE);
- rspamd_worker_ignore_signal (SIGALRM);
- rspamd_worker_ignore_signal (SIGCHLD);
/* A set of terminating signals */
rspamd_worker_set_signal_handler (SIGTERM, worker, base,
@@ -298,11 +294,8 @@ rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *base)
sigaddset (&signals.sa_mask, SIGTERM);
sigaddset (&signals.sa_mask, SIGINT);
sigaddset (&signals.sa_mask, SIGHUP);
- sigaddset (&signals.sa_mask, SIGCHLD);
sigaddset (&signals.sa_mask, SIGUSR1);
sigaddset (&signals.sa_mask, SIGUSR2);
- sigaddset (&signals.sa_mask, SIGALRM);
- sigaddset (&signals.sa_mask, SIGPIPE);
sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
}
@@ -345,6 +338,7 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
if (ls->fd != -1) {
accept_ev = g_malloc0 (sizeof (*accept_ev));
accept_ev->event_loop = event_loop;
+ accept_ev->accept_ev.data = worker;
ev_io_init (&accept_ev->accept_ev, hdl, ls->fd, EV_READ);
ev_io_start (event_loop, &accept_ev->accept_ev);
diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c
index 4e0d806f9..5d8ccc065 100644
--- a/src/libstat/backends/redis_backend.c
+++ b/src/libstat/backends/redis_backend.c
@@ -1039,9 +1039,7 @@ rspamd_redis_fin_learn (gpointer data)
rt->has_event = FALSE;
/* Stop timeout */
- if (rspamd_event_pending (&rt->timeout_event, EV_TIMEOUT)) {
- event_del (&rt->timeout_event);
- }
+ ev_timer_stop (rt->task->event_loop, &rt->timeout_event);
if (rt->redis) {
redis = rt->redis;
@@ -1654,7 +1652,6 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
struct upstream *up;
struct upstream_list *ups;
rspamd_inet_addr_t *addr;
- struct timeval tv;
rspamd_fstring_t *query;
const gchar *redis_cmd;
rspamd_token_t *tok;
diff --git a/src/rspamadm/control.c b/src/rspamadm/control.c
index 8a42bdac1..754d874a2 100644
--- a/src/rspamadm/control.c
+++ b/src/rspamadm/control.c
@@ -173,7 +173,6 @@ rspamadm_control (gint argc, gchar **argv, const struct rspamadm_command *_cmd)
struct rspamd_http_connection *conn;
struct rspamd_http_message *msg;
rspamd_inet_addr_t *addr;
- struct timeval tv;
static struct rspamadm_control_cbdata cbdata;
context = g_option_context_new (
@@ -239,16 +238,15 @@ rspamadm_control (gint argc, gchar **argv, const struct rspamadm_command *_cmd)
addr);
msg = rspamd_http_new_message (HTTP_REQUEST);
msg->url = rspamd_fstring_new_init (path, strlen (path));
- double_to_tv (timeout, &tv);
cbdata.argc = argc;
cbdata.argv = argv;
cbdata.path = path;
rspamd_http_connection_write_message (conn, msg, NULL, NULL, &cbdata,
- &tv);
+ timeout);
- event_base_loop (rspamd_main->event_loop, 0);
+ ev_loop (rspamd_main->event_loop, 0);
rspamd_http_connection_unref (conn);
rspamd_inet_address_free (addr);
diff --git a/src/worker.c b/src/worker.c
index 5a4adb325..ad0782b17 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -42,7 +42,7 @@
#include "lua/lua_common.h"
/* 60 seconds for worker's IO */
-#define DEFAULT_WORKER_IO_TIMEOUT 60000
+#define DEFAULT_WORKER_IO_TIMEOUT 60.0
gpointer init_worker (struct rspamd_config *cfg);
void start_worker (struct rspamd_worker *worker);
@@ -73,11 +73,10 @@ static gboolean
rspamd_worker_finalize (gpointer user_data)
{
struct rspamd_task *task = user_data;
- struct timeval tv = {.tv_sec = 0, .tv_usec = 0};
if (!(task->flags & RSPAMD_TASK_FLAG_PROCESSING)) {
msg_info_task ("finishing actions has been processed, terminating");
- event_base_loopexit (task->event_loop, &tv);
+ ev_break (task->event_loop, EVBREAK_ALL);
rspamd_session_destroy (task->s);
return TRUE;
@@ -137,9 +136,9 @@ reduce_tasks_count (gpointer arg)
}
void
-rspamd_task_timeout (gint fd, short what, gpointer ud)
+rspamd_task_timeout (EV_P_ ev_timer *w, int revents)
{
- struct rspamd_task *task = (struct rspamd_task *) ud;
+ struct rspamd_task *task = (struct rspamd_task *)w->data;
if (!(task->processed_stages & RSPAMD_TASK_STAGE_FILTERS)) {
msg_info_task ("processing of task timed out, forced processing");
@@ -176,32 +175,13 @@ rspamd_task_timeout (gint fd, short what, gpointer ud)
}
void
-rspamd_worker_guard_handler (gint fd, short what, void *data)
+rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents)
{
- struct rspamd_task *task = data;
+ struct rspamd_task *task = (struct rspamd_task *)w->data;
gchar fake_buf[1024];
gssize r;
-#ifdef EV_CLOSED
- if (what == EV_CLOSED) {
- if (!(task->flags & RSPAMD_TASK_FLAG_JSON) &&
- task->cfg->enable_shutdown_workaround) {
- msg_info_task ("workaround for shutdown enabled, please update "
- "your client, this support might be removed in future");
- shutdown (fd, SHUT_RD);
- event_del (task->guard_ev);
- task->guard_ev = NULL;
- }
- else {
- msg_err_task ("the peer has closed connection unexpectedly");
- rspamd_session_destroy (task->s);
- }
-
- return;
- }
-#endif
-
- r = read (fd, fake_buf, sizeof (fake_buf));
+ r = read (w->fd, fake_buf, sizeof (fake_buf));
if (r > 0) {
msg_warn_task ("received extra data after task is loaded, ignoring");
@@ -218,9 +198,8 @@ rspamd_worker_guard_handler (gint fd, short what, void *data)
task->cfg->enable_shutdown_workaround) {
msg_info_task ("workaround for shutdown enabled, please update "
"your client, this support might be removed in future");
- shutdown (fd, SHUT_RD);
- event_del (task->guard_ev);
- task->guard_ev = NULL;
+ shutdown (w->fd, SHUT_RD);
+ ev_io_stop (task->event_loop, &task->guard_ev);
}
else {
msg_err_task ("the peer has closed connection unexpectedly");
@@ -245,8 +224,6 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
{
struct rspamd_task *task = (struct rspamd_task *) conn->ud;
struct rspamd_worker_ctx *ctx;
- struct timeval task_tv;
- struct event *guard_ev;
ctx = task->worker->ctx;
@@ -268,25 +245,16 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
/* Set global timeout for the task */
if (ctx->task_timeout > 0.0) {
- event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
- task);
- event_base_set (ctx->ev_base, &task->timeout_ev);
- double_to_tv (ctx->task_timeout, &task_tv);
- event_add (&task->timeout_ev, &task_tv);
+ task->timeout_ev.data = task;
+ ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
+ ctx->task_timeout, 0.0);
+ ev_timer_start (task->event_loop, &task->timeout_ev);
}
/* Set socket guard */
- guard_ev = rspamd_mempool_alloc (task->task_pool, sizeof (*guard_ev));
-#ifdef EV_CLOSED
- event_set (guard_ev, task->sock, EV_READ|EV_PERSIST|EV_CLOSED,
- rspamd_worker_guard_handler, task);
-#else
- event_set (guard_ev, task->sock, EV_READ|EV_PERSIST,
- rspamd_worker_guard_handler, task);
-#endif
- event_base_set (task->event_loop, guard_ev);
- event_add (guard_ev, NULL);
- task->guard_ev = guard_ev;
+ task->guard_ev.data = task;
+ ev_io_init (&task->guard_ev, rspamd_worker_guard_handler, task->sock, EV_READ);
+ ev_io_start (task->event_loop, &task->guard_ev);
rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
@@ -359,9 +327,9 @@ rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
* Accept new connection and construct task
*/
static void
-accept_socket (gint fd, short what, void *arg)
+accept_socket (EV_P_ ev_io *w, int revents)
{
- struct rspamd_worker *worker = (struct rspamd_worker *) arg;
+ struct rspamd_worker *worker = (struct rspamd_worker *) w->data;
struct rspamd_worker_ctx *ctx;
struct rspamd_task *task;
rspamd_inet_addr_t *addr;
@@ -377,7 +345,8 @@ accept_socket (gint fd, short what, void *arg)
}
if ((nfd =
- rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
+ rspamd_accept_from_socket (w->fd, &addr,
+ rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) {
msg_warn_ctx ("accept failed: %s", strerror (errno));
return;
}
@@ -386,7 +355,7 @@ accept_socket (gint fd, short what, void *arg)
return;
}
- task = rspamd_task_new (worker, ctx->cfg, NULL, ctx->lang_det, ctx->ev_base);
+ task = rspamd_task_new (worker, ctx->cfg, NULL, ctx->lang_det, ctx->event_loop);
msg_info_task ("accepted connection from %s port %d, task ptr: %p",
rspamd_inet_address_to_string (addr),
@@ -435,7 +404,7 @@ accept_socket (gint fd, short what, void *arg)
rspamd_http_connection_read_message (task->http_conn,
task,
- &ctx->io_tv);
+ ctx->timeout);
}
#ifdef WITH_HYPERSCAN
@@ -587,7 +556,7 @@ init_worker (struct rspamd_config *cfg)
ctx,
G_STRUCT_OFFSET (struct rspamd_worker_ctx,
timeout),
- RSPAMD_CL_FLAG_TIME_INTEGER,
+ RSPAMD_CL_FLAG_TIME_FLOAT,
"Protocol IO timeout");
rspamd_rcl_register_worker_option (cfg,
@@ -672,9 +641,8 @@ start_worker (struct rspamd_worker *worker)
struct rspamd_worker_ctx *ctx = worker->ctx;
ctx->cfg = worker->srv->cfg;
- ctx->ev_base = rspamd_prepare_worker (worker, "normal", accept_socket);
- msec_to_tv (ctx->timeout, &ctx->io_tv);
- rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->ev_base,
+ ctx->event_loop = rspamd_prepare_worker (worker, "normal", accept_socket);
+ rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->event_loop,
worker);
if (isnan (ctx->task_timeout)) {
@@ -687,20 +655,20 @@ start_worker (struct rspamd_worker *worker)
}
ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
- ctx->ev_base,
+ ctx->event_loop,
worker->srv->cfg);
- rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver, worker, 0);
+ rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver, worker, 0);
rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
- ctx->ev_base, ctx->resolver->r);
+ ctx->event_loop, ctx->resolver->r);
- ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->ev_base,
+ ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop,
ctx->cfg->ups_ctx);
- rspamd_worker_init_scanner (worker, ctx->ev_base, ctx->resolver,
+ rspamd_worker_init_scanner (worker, ctx->event_loop, ctx->resolver,
&ctx->lang_det);
- rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->ev_base,
+ rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop,
worker);
- event_base_loop (ctx->ev_base, 0);
+ ev_loop (ctx->event_loop, 0);
rspamd_worker_block_signals ();
rspamd_stat_close ();
diff --git a/src/worker_private.h b/src/worker_private.h
index 35a2b465b..6d0e763aa 100644
--- a/src/worker_private.h
+++ b/src/worker_private.h
@@ -30,14 +30,13 @@ struct rspamd_lang_detector;
struct rspamd_worker_ctx {
guint64 magic;
/* Events base */
- struct ev_loop *ev_base;
+ struct ev_loop *event_loop;
/* DNS resolver */
struct rspamd_dns_resolver *resolver;
/* Config */
struct rspamd_config *cfg;
- guint32 timeout;
- struct timeval io_tv;
+ ev_tstamp timeout;
/* Detect whether this worker is mime worker */
gboolean is_mime;
/* Allow encrypted requests only using network */
@@ -45,7 +44,7 @@ struct rspamd_worker_ctx {
/* Limit of tasks */
guint32 max_tasks;
/* Maximum time for task processing */
- gdouble task_timeout;
+ ev_tstamp task_timeout;
/* Encryption key */
struct rspamd_cryptobox_keypair *key;
/* Keys cache */
@@ -64,11 +63,11 @@ void rspamd_worker_init_scanner (struct rspamd_worker *worker,
/*
* Called on forced timeout
*/
-void rspamd_task_timeout (gint fd, short what, gpointer ud);
+void rspamd_task_timeout (EV_P_ ev_timer *w, int revents);
/*
* Called on unexpected IO error (e.g. ECONNRESET)
*/
-void rspamd_worker_guard_handler (gint fd, short what, void *data);
+void rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents);
#endif