diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2019-06-19 17:07:56 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2019-06-22 10:57:29 +0100 |
commit | 675b33dd2025cc1f8e732efa9ffc72d55e5a35d9 (patch) | |
tree | 1492d1527bd84c5b032ada128d88a1fb12449102 /src | |
parent | eeb0beb73d7769341d1b6aa8fac4f27f7dc76b2e (diff) | |
download | rspamd-675b33dd2025cc1f8e732efa9ffc72d55e5a35d9.tar.gz rspamd-675b33dd2025cc1f8e732efa9ffc72d55e5a35d9.zip |
[Project] Adopt normal worker and contorller
Diffstat (limited to 'src')
-rw-r--r-- | src/controller.c | 89 | ||||
-rw-r--r-- | src/libserver/task.c | 9 | ||||
-rw-r--r-- | src/libserver/task.h | 2 | ||||
-rw-r--r-- | src/libserver/worker_util.c | 8 | ||||
-rw-r--r-- | src/libstat/backends/redis_backend.c | 5 | ||||
-rw-r--r-- | src/rspamadm/control.c | 6 | ||||
-rw-r--r-- | src/worker.c | 96 | ||||
-rw-r--r-- | src/worker_private.h | 11 |
8 files changed, 87 insertions, 139 deletions
diff --git a/src/controller.c b/src/controller.c index 851087945..374880952 100644 --- a/src/controller.c +++ b/src/controller.c @@ -134,8 +134,7 @@ struct rspamd_controller_worker_ctx { /* Config */ struct rspamd_config *cfg; /* END OF COMMON PART */ - guint32 timeout; - struct timeval io_tv; + ev_tstamp timeout; /* Whether we use ssl for this server */ gboolean use_ssl; /* Webui password */ @@ -728,7 +727,7 @@ rspamd_controller_handle_auth (struct rspamd_http_connection_entry *conn_ent, data[4] = st->actions_stat[METRIC_ACTION_SOFT_REJECT]; /* Get uptime */ - uptime = time (NULL) - session->ctx->start_time; + uptime = ev_time () - session->ctx->start_time; ucl_object_insert_key (obj, ucl_object_fromstring ( RVERSION), "version", 0, false); @@ -996,7 +995,7 @@ rspamd_controller_handle_get_map (struct rspamd_http_connection_entry *conn_ent, struct rspamd_controller_session *session = conn_ent->ud; GList *cur; struct rspamd_map *map; - struct rspamd_map_backend *bk; + struct rspamd_map_backend *bk = NULL; const rspamd_ftok_t *idstr; struct stat st; gint fd; @@ -1037,7 +1036,7 @@ rspamd_controller_handle_get_map (struct rspamd_http_connection_entry *conn_ent, cur = g_list_next (cur); } - if (!found) { + if (!found || bk == NULL) { msg_info_session ("map not found"); rspamd_controller_send_error (conn_ent, 404, "Map not found"); return 0; @@ -1075,7 +1074,7 @@ rspamd_controller_handle_get_map (struct rspamd_http_connection_entry *conn_ent, rspamd_http_router_insert_headers (conn_ent->rt, reply); rspamd_http_connection_write_message (conn_ent->conn, reply, NULL, "text/plain", conn_ent, - conn_ent->rt->ptv); + conn_ent->rt->timeout); conn_ent->is_reply = TRUE; return 0; @@ -1385,13 +1384,13 @@ rspamd_controller_handle_legacy_history ( row = &copied_rows[row_num]; /* Get only completed rows */ if (row->completed) { - rspamd_localtime (row->tv.tv_sec, &tm); + rspamd_localtime (row->timestamp, &tm); strftime (timebuf, sizeof (timebuf) - 1, "%Y-%m-%d %H:%M:%S", &tm); obj = ucl_object_typed_new (UCL_OBJECT); ucl_object_insert_key (obj, ucl_object_fromstring ( timebuf), "time", 0, false); ucl_object_insert_key (obj, ucl_object_fromint ( - row->tv.tv_sec), "unix_time", 0, false); + row->timestamp), "unix_time", 0, false); ucl_object_insert_key (obj, ucl_object_fromstring ( row->message_id), "id", 0, false); ucl_object_insert_key (obj, ucl_object_fromstring (row->from_addr), @@ -1935,7 +1934,7 @@ rspamd_controller_scan_reply (struct rspamd_task *task) rspamd_http_connection_reset (conn_ent->conn); rspamd_http_router_insert_headers (conn_ent->rt, msg); rspamd_http_connection_write_message (conn_ent->conn, msg, NULL, - "application/json", conn_ent, conn_ent->rt->ptv); + "application/json", conn_ent, conn_ent->rt->timeout); conn_ent->is_reply = TRUE; } @@ -2125,13 +2124,10 @@ rspamd_controller_handle_scan (struct rspamd_http_connection_entry *conn_ent, } if (ctx->task_timeout > 0.0) { - struct timeval task_tv; - - event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout, - task); - event_base_set (ctx->event_loop, &task->timeout_ev); - double_to_tv (ctx->task_timeout, &task_tv); - event_add (&task->timeout_ev, &task_tv); + task->timeout_ev.data = task; + ev_timer_init (&task->timeout_ev, rspamd_task_timeout, + ctx->task_timeout, 0.0); + ev_timer_start (task->event_loop, &task->timeout_ev); } end: @@ -2210,6 +2206,7 @@ rspamd_controller_handle_saveactions ( switch (i) { case 0: + default: act = METRIC_ACTION_REJECT; break; case 1: @@ -2404,7 +2401,7 @@ rspamd_controller_handle_savemap (struct rspamd_http_connection_entry *conn_ent, { struct rspamd_controller_session *session = conn_ent->ud; GList *cur; - struct rspamd_map *map; + struct rspamd_map *map = NULL; struct rspamd_map_backend *bk; struct rspamd_controller_worker_ctx *ctx; const rspamd_ftok_t *idstr; @@ -2903,7 +2900,7 @@ rspamd_controller_handle_ping (struct rspamd_http_connection_entry *conn_ent, NULL, "text/plain", conn_ent, - conn_ent->rt->ptv); + conn_ent->rt->timeout); conn_ent->is_reply = TRUE; return 0; @@ -2937,7 +2934,7 @@ rspamd_controller_handle_unknown (struct rspamd_http_connection_entry *conn_ent, NULL, "text/plain", conn_ent, - conn_ent->rt->ptv); + conn_ent->rt->timeout); conn_ent->is_reply = TRUE; } else { @@ -2953,7 +2950,7 @@ rspamd_controller_handle_unknown (struct rspamd_http_connection_entry *conn_ent, NULL, "text/plain", conn_ent, - conn_ent->rt->ptv); + conn_ent->rt->timeout); conn_ent->is_reply = TRUE; } @@ -3077,9 +3074,9 @@ rspamd_controller_finish_handler (struct rspamd_http_connection_entry *conn_ent) } static void -rspamd_controller_accept_socket (gint fd, short what, void *arg) +rspamd_controller_accept_socket (EV_P_ ev_io *w, int revents) { - struct rspamd_worker *worker = (struct rspamd_worker *) arg; + struct rspamd_worker *worker = (struct rspamd_worker *)w->data; struct rspamd_controller_worker_ctx *ctx; struct rspamd_controller_session *session; rspamd_inet_addr_t *addr; @@ -3088,7 +3085,8 @@ rspamd_controller_accept_socket (gint fd, short what, void *arg) ctx = worker->ctx; if ((nfd = - rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) { + rspamd_accept_from_socket (w->fd, &addr, + rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) { msg_warn_ctx ("accept failed: %s", strerror (errno)); return; } @@ -3113,9 +3111,10 @@ rspamd_controller_accept_socket (gint fd, short what, void *arg) } static void -rspamd_controller_rrd_update (gint fd, short what, void *arg) +rspamd_controller_rrd_update (EV_P_ ev_timer *w, int revents) { - struct rspamd_controller_worker_ctx *ctx = arg; + struct rspamd_controller_worker_ctx *ctx = + (struct rspamd_controller_worker_ctx *)w->data; struct rspamd_stat *stat; GArray ar; gdouble points[METRIC_ACTION_MAX]; @@ -3139,8 +3138,7 @@ rspamd_controller_rrd_update (gint fd, short what, void *arg) } /* Plan new event */ - event_del (ctx->rrd_event); - evtimer_add (ctx->rrd_event, &rrd_update_time); + ev_timer_again (ctx->event_loop, &ctx->rrd_event); } static void @@ -3278,11 +3276,13 @@ rspamd_controller_store_saved_stats (struct rspamd_controller_worker_ctx *ctx) } static void -rspamd_controller_stats_save_periodic (int fd, short what, gpointer ud) +rspamd_controller_stats_save_periodic (EV_P_ ev_timer *w, int revents) { - struct rspamd_controller_worker_ctx *ctx = ud; + struct rspamd_controller_worker_ctx *ctx = + (struct rspamd_controller_worker_ctx *)w->data; rspamd_controller_store_saved_stats (ctx); + ev_timer_again (EV_A_ w); } static void @@ -3375,7 +3375,7 @@ init_controller_worker (struct rspamd_config *cfg) ctx, G_STRUCT_OFFSET (struct rspamd_controller_worker_ctx, timeout), - RSPAMD_CL_FLAG_TIME_INTEGER, + RSPAMD_CL_FLAG_TIME_FLOAT, "Protocol timeout"); rspamd_rcl_register_worker_option (cfg, @@ -3573,7 +3573,7 @@ rspamd_controller_on_terminate (struct rspamd_worker *worker) if (ctx->rrd) { msg_info ("closing rrd file: %s", ctx->rrd->filename); - event_del (ctx->rrd_event); + ev_timer_stop (ctx->event_loop, &ctx->rrd_event); rspamd_rrd_close (ctx->rrd); } @@ -3694,16 +3694,14 @@ start_controller_worker (struct rspamd_worker *worker) GHashTableIter iter; gpointer key, value; guint i; - struct timeval stv; - const guint save_stats_interval = 60 * 1000; /* 1 minute */ + const ev_tstamp save_stats_interval = 60; /* 1 minute */ gpointer m; ctx->event_loop = rspamd_prepare_worker (worker, "controller", rspamd_controller_accept_socket); - msec_to_tv (ctx->timeout, &ctx->io_tv); - ctx->start_time = time (NULL); + ctx->start_time = ev_time (); ctx->worker = worker; ctx->cfg = worker->srv->cfg; ctx->srv = worker->srv; @@ -3746,10 +3744,10 @@ start_controller_worker (struct rspamd_worker *worker) ctx->rrd = rspamd_rrd_file_default (ctx->cfg->rrd_file, &rrd_err); if (ctx->rrd) { - ctx->rrd_event = g_malloc0 (sizeof (*ctx->rrd_event)); - evtimer_set (ctx->rrd_event, rspamd_controller_rrd_update, ctx); - event_base_set (ctx->event_loop, ctx->rrd_event); - event_add (ctx->rrd_event, &rrd_update_time); + ctx->rrd_event.data = ctx; + ev_timer_init (&ctx->rrd_event, rspamd_controller_rrd_update, + rrd_update_time, rrd_update_time); + ev_timer_start (ctx->event_loop, &ctx->rrd_event); } else if (rrd_err) { msg_err ("cannot load rrd from %s: %e", ctx->cfg->rrd_file, @@ -3772,7 +3770,7 @@ start_controller_worker (struct rspamd_worker *worker) ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop, ctx->cfg->ups_ctx); ctx->http = rspamd_http_router_new (rspamd_controller_error_handler, - rspamd_controller_finish_handler, &ctx->io_tv, + rspamd_controller_finish_handler, ctx->timeout, ctx->static_files_dir, ctx->http_ctx); /* Add callbacks for different methods */ @@ -3903,12 +3901,11 @@ start_controller_worker (struct rspamd_worker *worker) ctx->resolver, worker, TRUE); /* Schedule periodic stats saving, see #1823 */ - event_set (&ctx->save_stats_event, -1, EV_PERSIST, + ctx->save_stats_event.data = ctx; + ev_timer_init (&ctx->save_stats_event, rspamd_controller_stats_save_periodic, - ctx); - event_base_set (ctx->event_loop, &ctx->save_stats_event); - msec_to_tv (save_stats_interval, &stv); - evtimer_add (&ctx->save_stats_event, &stv); + save_stats_interval, save_stats_interval); + ev_timer_start (ctx->event_loop, &ctx->save_stats_event); } else { rspamd_map_watch (worker->srv->cfg, ctx->event_loop, @@ -3918,7 +3915,7 @@ start_controller_worker (struct rspamd_worker *worker) rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop, worker); /* Start event loop */ - event_base_loop (ctx->event_loop, 0); + ev_loop (ctx->event_loop, 0); rspamd_worker_block_signals (); rspamd_stat_close (); diff --git a/src/libserver/task.c b/src/libserver/task.c index 84ea1417a..3c92e05b9 100644 --- a/src/libserver/task.c +++ b/src/libserver/task.c @@ -316,13 +316,8 @@ rspamd_task_free (struct rspamd_task *task) g_error_free (task->err); } - if (rspamd_event_pending (&task->timeout_ev, EV_TIMEOUT)) { - event_del (&task->timeout_ev); - } - - if (task->guard_ev) { - event_del (task->guard_ev); - } + ev_timer_stop (task->event_loop, &task->timeout_ev); + ev_io_stop (task->event_loop, &task->guard_ev); if (task->sock != -1) { close (task->sock); diff --git a/src/libserver/task.h b/src/libserver/task.h index a73102424..7b30f97cd 100644 --- a/src/libserver/task.h +++ b/src/libserver/task.h @@ -200,7 +200,7 @@ struct rspamd_task { struct rspamd_dns_resolver *resolver; /**< DNS resolver */ struct ev_loop *event_loop; /**< Event base */ struct ev_timer timeout_ev; /**< Global task timeout */ - struct ev_io *guard_ev; /**< Event for input sanity guard */ + struct ev_io guard_ev; /**< Event for input sanity guard */ gpointer checkpoint; /**< Opaque checkpoint data */ ucl_object_t *settings; /**< Settings applied to task */ diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index 0aa0c9cf3..70d349c2c 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -274,10 +274,6 @@ void rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *base) { struct sigaction signals; - /* We ignore these signals in the worker */ - rspamd_worker_ignore_signal (SIGPIPE); - rspamd_worker_ignore_signal (SIGALRM); - rspamd_worker_ignore_signal (SIGCHLD); /* A set of terminating signals */ rspamd_worker_set_signal_handler (SIGTERM, worker, base, @@ -298,11 +294,8 @@ rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *base) sigaddset (&signals.sa_mask, SIGTERM); sigaddset (&signals.sa_mask, SIGINT); sigaddset (&signals.sa_mask, SIGHUP); - sigaddset (&signals.sa_mask, SIGCHLD); sigaddset (&signals.sa_mask, SIGUSR1); sigaddset (&signals.sa_mask, SIGUSR2); - sigaddset (&signals.sa_mask, SIGALRM); - sigaddset (&signals.sa_mask, SIGPIPE); sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); } @@ -345,6 +338,7 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name, if (ls->fd != -1) { accept_ev = g_malloc0 (sizeof (*accept_ev)); accept_ev->event_loop = event_loop; + accept_ev->accept_ev.data = worker; ev_io_init (&accept_ev->accept_ev, hdl, ls->fd, EV_READ); ev_io_start (event_loop, &accept_ev->accept_ev); diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c index 4e0d806f9..5d8ccc065 100644 --- a/src/libstat/backends/redis_backend.c +++ b/src/libstat/backends/redis_backend.c @@ -1039,9 +1039,7 @@ rspamd_redis_fin_learn (gpointer data) rt->has_event = FALSE; /* Stop timeout */ - if (rspamd_event_pending (&rt->timeout_event, EV_TIMEOUT)) { - event_del (&rt->timeout_event); - } + ev_timer_stop (rt->task->event_loop, &rt->timeout_event); if (rt->redis) { redis = rt->redis; @@ -1654,7 +1652,6 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens, struct upstream *up; struct upstream_list *ups; rspamd_inet_addr_t *addr; - struct timeval tv; rspamd_fstring_t *query; const gchar *redis_cmd; rspamd_token_t *tok; diff --git a/src/rspamadm/control.c b/src/rspamadm/control.c index 8a42bdac1..754d874a2 100644 --- a/src/rspamadm/control.c +++ b/src/rspamadm/control.c @@ -173,7 +173,6 @@ rspamadm_control (gint argc, gchar **argv, const struct rspamadm_command *_cmd) struct rspamd_http_connection *conn; struct rspamd_http_message *msg; rspamd_inet_addr_t *addr; - struct timeval tv; static struct rspamadm_control_cbdata cbdata; context = g_option_context_new ( @@ -239,16 +238,15 @@ rspamadm_control (gint argc, gchar **argv, const struct rspamadm_command *_cmd) addr); msg = rspamd_http_new_message (HTTP_REQUEST); msg->url = rspamd_fstring_new_init (path, strlen (path)); - double_to_tv (timeout, &tv); cbdata.argc = argc; cbdata.argv = argv; cbdata.path = path; rspamd_http_connection_write_message (conn, msg, NULL, NULL, &cbdata, - &tv); + timeout); - event_base_loop (rspamd_main->event_loop, 0); + ev_loop (rspamd_main->event_loop, 0); rspamd_http_connection_unref (conn); rspamd_inet_address_free (addr); diff --git a/src/worker.c b/src/worker.c index 5a4adb325..ad0782b17 100644 --- a/src/worker.c +++ b/src/worker.c @@ -42,7 +42,7 @@ #include "lua/lua_common.h" /* 60 seconds for worker's IO */ -#define DEFAULT_WORKER_IO_TIMEOUT 60000 +#define DEFAULT_WORKER_IO_TIMEOUT 60.0 gpointer init_worker (struct rspamd_config *cfg); void start_worker (struct rspamd_worker *worker); @@ -73,11 +73,10 @@ static gboolean rspamd_worker_finalize (gpointer user_data) { struct rspamd_task *task = user_data; - struct timeval tv = {.tv_sec = 0, .tv_usec = 0}; if (!(task->flags & RSPAMD_TASK_FLAG_PROCESSING)) { msg_info_task ("finishing actions has been processed, terminating"); - event_base_loopexit (task->event_loop, &tv); + ev_break (task->event_loop, EVBREAK_ALL); rspamd_session_destroy (task->s); return TRUE; @@ -137,9 +136,9 @@ reduce_tasks_count (gpointer arg) } void -rspamd_task_timeout (gint fd, short what, gpointer ud) +rspamd_task_timeout (EV_P_ ev_timer *w, int revents) { - struct rspamd_task *task = (struct rspamd_task *) ud; + struct rspamd_task *task = (struct rspamd_task *)w->data; if (!(task->processed_stages & RSPAMD_TASK_STAGE_FILTERS)) { msg_info_task ("processing of task timed out, forced processing"); @@ -176,32 +175,13 @@ rspamd_task_timeout (gint fd, short what, gpointer ud) } void -rspamd_worker_guard_handler (gint fd, short what, void *data) +rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents) { - struct rspamd_task *task = data; + struct rspamd_task *task = (struct rspamd_task *)w->data; gchar fake_buf[1024]; gssize r; -#ifdef EV_CLOSED - if (what == EV_CLOSED) { - if (!(task->flags & RSPAMD_TASK_FLAG_JSON) && - task->cfg->enable_shutdown_workaround) { - msg_info_task ("workaround for shutdown enabled, please update " - "your client, this support might be removed in future"); - shutdown (fd, SHUT_RD); - event_del (task->guard_ev); - task->guard_ev = NULL; - } - else { - msg_err_task ("the peer has closed connection unexpectedly"); - rspamd_session_destroy (task->s); - } - - return; - } -#endif - - r = read (fd, fake_buf, sizeof (fake_buf)); + r = read (w->fd, fake_buf, sizeof (fake_buf)); if (r > 0) { msg_warn_task ("received extra data after task is loaded, ignoring"); @@ -218,9 +198,8 @@ rspamd_worker_guard_handler (gint fd, short what, void *data) task->cfg->enable_shutdown_workaround) { msg_info_task ("workaround for shutdown enabled, please update " "your client, this support might be removed in future"); - shutdown (fd, SHUT_RD); - event_del (task->guard_ev); - task->guard_ev = NULL; + shutdown (w->fd, SHUT_RD); + ev_io_stop (task->event_loop, &task->guard_ev); } else { msg_err_task ("the peer has closed connection unexpectedly"); @@ -245,8 +224,6 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, { struct rspamd_task *task = (struct rspamd_task *) conn->ud; struct rspamd_worker_ctx *ctx; - struct timeval task_tv; - struct event *guard_ev; ctx = task->worker->ctx; @@ -268,25 +245,16 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, /* Set global timeout for the task */ if (ctx->task_timeout > 0.0) { - event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout, - task); - event_base_set (ctx->ev_base, &task->timeout_ev); - double_to_tv (ctx->task_timeout, &task_tv); - event_add (&task->timeout_ev, &task_tv); + task->timeout_ev.data = task; + ev_timer_init (&task->timeout_ev, rspamd_task_timeout, + ctx->task_timeout, 0.0); + ev_timer_start (task->event_loop, &task->timeout_ev); } /* Set socket guard */ - guard_ev = rspamd_mempool_alloc (task->task_pool, sizeof (*guard_ev)); -#ifdef EV_CLOSED - event_set (guard_ev, task->sock, EV_READ|EV_PERSIST|EV_CLOSED, - rspamd_worker_guard_handler, task); -#else - event_set (guard_ev, task->sock, EV_READ|EV_PERSIST, - rspamd_worker_guard_handler, task); -#endif - event_base_set (task->event_loop, guard_ev); - event_add (guard_ev, NULL); - task->guard_ev = guard_ev; + task->guard_ev.data = task; + ev_io_init (&task->guard_ev, rspamd_worker_guard_handler, task->sock, EV_READ); + ev_io_start (task->event_loop, &task->guard_ev); rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL); @@ -359,9 +327,9 @@ rspamd_worker_finish_handler (struct rspamd_http_connection *conn, * Accept new connection and construct task */ static void -accept_socket (gint fd, short what, void *arg) +accept_socket (EV_P_ ev_io *w, int revents) { - struct rspamd_worker *worker = (struct rspamd_worker *) arg; + struct rspamd_worker *worker = (struct rspamd_worker *) w->data; struct rspamd_worker_ctx *ctx; struct rspamd_task *task; rspamd_inet_addr_t *addr; @@ -377,7 +345,8 @@ accept_socket (gint fd, short what, void *arg) } if ((nfd = - rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) { + rspamd_accept_from_socket (w->fd, &addr, + rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) { msg_warn_ctx ("accept failed: %s", strerror (errno)); return; } @@ -386,7 +355,7 @@ accept_socket (gint fd, short what, void *arg) return; } - task = rspamd_task_new (worker, ctx->cfg, NULL, ctx->lang_det, ctx->ev_base); + task = rspamd_task_new (worker, ctx->cfg, NULL, ctx->lang_det, ctx->event_loop); msg_info_task ("accepted connection from %s port %d, task ptr: %p", rspamd_inet_address_to_string (addr), @@ -435,7 +404,7 @@ accept_socket (gint fd, short what, void *arg) rspamd_http_connection_read_message (task->http_conn, task, - &ctx->io_tv); + ctx->timeout); } #ifdef WITH_HYPERSCAN @@ -587,7 +556,7 @@ init_worker (struct rspamd_config *cfg) ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, timeout), - RSPAMD_CL_FLAG_TIME_INTEGER, + RSPAMD_CL_FLAG_TIME_FLOAT, "Protocol IO timeout"); rspamd_rcl_register_worker_option (cfg, @@ -672,9 +641,8 @@ start_worker (struct rspamd_worker *worker) struct rspamd_worker_ctx *ctx = worker->ctx; ctx->cfg = worker->srv->cfg; - ctx->ev_base = rspamd_prepare_worker (worker, "normal", accept_socket); - msec_to_tv (ctx->timeout, &ctx->io_tv); - rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->ev_base, + ctx->event_loop = rspamd_prepare_worker (worker, "normal", accept_socket); + rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->event_loop, worker); if (isnan (ctx->task_timeout)) { @@ -687,20 +655,20 @@ start_worker (struct rspamd_worker *worker) } ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger, - ctx->ev_base, + ctx->event_loop, worker->srv->cfg); - rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver, worker, 0); + rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver, worker, 0); rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx, - ctx->ev_base, ctx->resolver->r); + ctx->event_loop, ctx->resolver->r); - ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->ev_base, + ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop, ctx->cfg->ups_ctx); - rspamd_worker_init_scanner (worker, ctx->ev_base, ctx->resolver, + rspamd_worker_init_scanner (worker, ctx->event_loop, ctx->resolver, &ctx->lang_det); - rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->ev_base, + rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop, worker); - event_base_loop (ctx->ev_base, 0); + ev_loop (ctx->event_loop, 0); rspamd_worker_block_signals (); rspamd_stat_close (); diff --git a/src/worker_private.h b/src/worker_private.h index 35a2b465b..6d0e763aa 100644 --- a/src/worker_private.h +++ b/src/worker_private.h @@ -30,14 +30,13 @@ struct rspamd_lang_detector; struct rspamd_worker_ctx { guint64 magic; /* Events base */ - struct ev_loop *ev_base; + struct ev_loop *event_loop; /* DNS resolver */ struct rspamd_dns_resolver *resolver; /* Config */ struct rspamd_config *cfg; - guint32 timeout; - struct timeval io_tv; + ev_tstamp timeout; /* Detect whether this worker is mime worker */ gboolean is_mime; /* Allow encrypted requests only using network */ @@ -45,7 +44,7 @@ struct rspamd_worker_ctx { /* Limit of tasks */ guint32 max_tasks; /* Maximum time for task processing */ - gdouble task_timeout; + ev_tstamp task_timeout; /* Encryption key */ struct rspamd_cryptobox_keypair *key; /* Keys cache */ @@ -64,11 +63,11 @@ void rspamd_worker_init_scanner (struct rspamd_worker *worker, /* * Called on forced timeout */ -void rspamd_task_timeout (gint fd, short what, gpointer ud); +void rspamd_task_timeout (EV_P_ ev_timer *w, int revents); /* * Called on unexpected IO error (e.g. ECONNRESET) */ -void rspamd_worker_guard_handler (gint fd, short what, void *data); +void rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents); #endif |