/* Config */
struct rspamd_config *cfg;
/* END OF COMMON PART */
- guint32 timeout;
- struct timeval io_tv;
+ ev_tstamp timeout;
/* Whether we use ssl for this server */
gboolean use_ssl;
/* Webui password */
data[4] = st->actions_stat[METRIC_ACTION_SOFT_REJECT];
/* Get uptime */
- uptime = time (NULL) - session->ctx->start_time;
+ uptime = ev_time () - session->ctx->start_time;
ucl_object_insert_key (obj, ucl_object_fromstring (
RVERSION), "version", 0, false);
struct rspamd_controller_session *session = conn_ent->ud;
GList *cur;
struct rspamd_map *map;
- struct rspamd_map_backend *bk;
+ struct rspamd_map_backend *bk = NULL;
const rspamd_ftok_t *idstr;
struct stat st;
gint fd;
cur = g_list_next (cur);
}
- if (!found) {
+ if (!found || bk == NULL) {
msg_info_session ("map not found");
rspamd_controller_send_error (conn_ent, 404, "Map not found");
return 0;
rspamd_http_router_insert_headers (conn_ent->rt, reply);
rspamd_http_connection_write_message (conn_ent->conn, reply, NULL,
"text/plain", conn_ent,
- conn_ent->rt->ptv);
+ conn_ent->rt->timeout);
conn_ent->is_reply = TRUE;
return 0;
row = &copied_rows[row_num];
/* Get only completed rows */
if (row->completed) {
- rspamd_localtime (row->tv.tv_sec, &tm);
+ rspamd_localtime (row->timestamp, &tm);
strftime (timebuf, sizeof (timebuf) - 1, "%Y-%m-%d %H:%M:%S", &tm);
obj = ucl_object_typed_new (UCL_OBJECT);
ucl_object_insert_key (obj, ucl_object_fromstring (
timebuf), "time", 0, false);
ucl_object_insert_key (obj, ucl_object_fromint (
- row->tv.tv_sec), "unix_time", 0, false);
+ row->timestamp), "unix_time", 0, false);
ucl_object_insert_key (obj, ucl_object_fromstring (
row->message_id), "id", 0, false);
ucl_object_insert_key (obj, ucl_object_fromstring (row->from_addr),
rspamd_http_connection_reset (conn_ent->conn);
rspamd_http_router_insert_headers (conn_ent->rt, msg);
rspamd_http_connection_write_message (conn_ent->conn, msg, NULL,
- "application/json", conn_ent, conn_ent->rt->ptv);
+ "application/json", conn_ent, conn_ent->rt->timeout);
conn_ent->is_reply = TRUE;
}
}
if (ctx->task_timeout > 0.0) {
- struct timeval task_tv;
-
- event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
- task);
- event_base_set (ctx->event_loop, &task->timeout_ev);
- double_to_tv (ctx->task_timeout, &task_tv);
- event_add (&task->timeout_ev, &task_tv);
+ task->timeout_ev.data = task;
+ ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
+ ctx->task_timeout, 0.0);
+ ev_timer_start (task->event_loop, &task->timeout_ev);
}
end:
switch (i) {
case 0:
+ default:
act = METRIC_ACTION_REJECT;
break;
case 1:
{
struct rspamd_controller_session *session = conn_ent->ud;
GList *cur;
- struct rspamd_map *map;
+ struct rspamd_map *map = NULL;
struct rspamd_map_backend *bk;
struct rspamd_controller_worker_ctx *ctx;
const rspamd_ftok_t *idstr;
NULL,
"text/plain",
conn_ent,
- conn_ent->rt->ptv);
+ conn_ent->rt->timeout);
conn_ent->is_reply = TRUE;
return 0;
NULL,
"text/plain",
conn_ent,
- conn_ent->rt->ptv);
+ conn_ent->rt->timeout);
conn_ent->is_reply = TRUE;
}
else {
NULL,
"text/plain",
conn_ent,
- conn_ent->rt->ptv);
+ conn_ent->rt->timeout);
conn_ent->is_reply = TRUE;
}
}
static void
-rspamd_controller_accept_socket (gint fd, short what, void *arg)
+rspamd_controller_accept_socket (EV_P_ ev_io *w, int revents)
{
- struct rspamd_worker *worker = (struct rspamd_worker *) arg;
+ struct rspamd_worker *worker = (struct rspamd_worker *)w->data;
struct rspamd_controller_worker_ctx *ctx;
struct rspamd_controller_session *session;
rspamd_inet_addr_t *addr;
ctx = worker->ctx;
if ((nfd =
- rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
+ rspamd_accept_from_socket (w->fd, &addr,
+ rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) {
msg_warn_ctx ("accept failed: %s", strerror (errno));
return;
}
}
static void
-rspamd_controller_rrd_update (gint fd, short what, void *arg)
+rspamd_controller_rrd_update (EV_P_ ev_timer *w, int revents)
{
- struct rspamd_controller_worker_ctx *ctx = arg;
+ struct rspamd_controller_worker_ctx *ctx =
+ (struct rspamd_controller_worker_ctx *)w->data;
struct rspamd_stat *stat;
GArray ar;
gdouble points[METRIC_ACTION_MAX];
}
/* Plan new event */
- event_del (ctx->rrd_event);
- evtimer_add (ctx->rrd_event, &rrd_update_time);
+ ev_timer_again (ctx->event_loop, &ctx->rrd_event);
}
static void
}
static void
-rspamd_controller_stats_save_periodic (int fd, short what, gpointer ud)
+rspamd_controller_stats_save_periodic (EV_P_ ev_timer *w, int revents)
{
- struct rspamd_controller_worker_ctx *ctx = ud;
+ struct rspamd_controller_worker_ctx *ctx =
+ (struct rspamd_controller_worker_ctx *)w->data;
rspamd_controller_store_saved_stats (ctx);
+ ev_timer_again (EV_A_ w);
}
static void
ctx,
G_STRUCT_OFFSET (struct rspamd_controller_worker_ctx,
timeout),
- RSPAMD_CL_FLAG_TIME_INTEGER,
+ RSPAMD_CL_FLAG_TIME_FLOAT,
"Protocol timeout");
rspamd_rcl_register_worker_option (cfg,
if (ctx->rrd) {
msg_info ("closing rrd file: %s", ctx->rrd->filename);
- event_del (ctx->rrd_event);
+ ev_timer_stop (ctx->event_loop, &ctx->rrd_event);
rspamd_rrd_close (ctx->rrd);
}
GHashTableIter iter;
gpointer key, value;
guint i;
- struct timeval stv;
- const guint save_stats_interval = 60 * 1000; /* 1 minute */
+ const ev_tstamp save_stats_interval = 60; /* 1 minute */
gpointer m;
ctx->event_loop = rspamd_prepare_worker (worker,
"controller",
rspamd_controller_accept_socket);
- msec_to_tv (ctx->timeout, &ctx->io_tv);
- ctx->start_time = time (NULL);
+ ctx->start_time = ev_time ();
ctx->worker = worker;
ctx->cfg = worker->srv->cfg;
ctx->srv = worker->srv;
ctx->rrd = rspamd_rrd_file_default (ctx->cfg->rrd_file, &rrd_err);
if (ctx->rrd) {
- ctx->rrd_event = g_malloc0 (sizeof (*ctx->rrd_event));
- evtimer_set (ctx->rrd_event, rspamd_controller_rrd_update, ctx);
- event_base_set (ctx->event_loop, ctx->rrd_event);
- event_add (ctx->rrd_event, &rrd_update_time);
+ ctx->rrd_event.data = ctx;
+ ev_timer_init (&ctx->rrd_event, rspamd_controller_rrd_update,
+ rrd_update_time, rrd_update_time);
+ ev_timer_start (ctx->event_loop, &ctx->rrd_event);
}
else if (rrd_err) {
msg_err ("cannot load rrd from %s: %e", ctx->cfg->rrd_file,
ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop,
ctx->cfg->ups_ctx);
ctx->http = rspamd_http_router_new (rspamd_controller_error_handler,
- rspamd_controller_finish_handler, &ctx->io_tv,
+ rspamd_controller_finish_handler, ctx->timeout,
ctx->static_files_dir, ctx->http_ctx);
/* Add callbacks for different methods */
ctx->resolver, worker, TRUE);
/* Schedule periodic stats saving, see #1823 */
- event_set (&ctx->save_stats_event, -1, EV_PERSIST,
+ ctx->save_stats_event.data = ctx;
+ ev_timer_init (&ctx->save_stats_event,
rspamd_controller_stats_save_periodic,
- ctx);
- event_base_set (ctx->event_loop, &ctx->save_stats_event);
- msec_to_tv (save_stats_interval, &stv);
- evtimer_add (&ctx->save_stats_event, &stv);
+ save_stats_interval, save_stats_interval);
+ ev_timer_start (ctx->event_loop, &ctx->save_stats_event);
}
else {
rspamd_map_watch (worker->srv->cfg, ctx->event_loop,
rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop, worker);
/* Start event loop */
- event_base_loop (ctx->event_loop, 0);
+ ev_loop (ctx->event_loop, 0);
rspamd_worker_block_signals ();
rspamd_stat_close ();
g_error_free (task->err);
}
- if (rspamd_event_pending (&task->timeout_ev, EV_TIMEOUT)) {
- event_del (&task->timeout_ev);
- }
-
- if (task->guard_ev) {
- event_del (task->guard_ev);
- }
+ ev_timer_stop (task->event_loop, &task->timeout_ev);
+ ev_io_stop (task->event_loop, &task->guard_ev);
if (task->sock != -1) {
close (task->sock);
struct rspamd_dns_resolver *resolver; /**< DNS resolver */
struct ev_loop *event_loop; /**< Event base */
struct ev_timer timeout_ev; /**< Global task timeout */
- struct ev_io *guard_ev; /**< Event for input sanity guard */
+ struct ev_io guard_ev; /**< Event for input sanity guard */
gpointer checkpoint; /**< Opaque checkpoint data */
ucl_object_t *settings; /**< Settings applied to task */
rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *base)
{
struct sigaction signals;
- /* We ignore these signals in the worker */
- rspamd_worker_ignore_signal (SIGPIPE);
- rspamd_worker_ignore_signal (SIGALRM);
- rspamd_worker_ignore_signal (SIGCHLD);
/* A set of terminating signals */
rspamd_worker_set_signal_handler (SIGTERM, worker, base,
sigaddset (&signals.sa_mask, SIGTERM);
sigaddset (&signals.sa_mask, SIGINT);
sigaddset (&signals.sa_mask, SIGHUP);
- sigaddset (&signals.sa_mask, SIGCHLD);
sigaddset (&signals.sa_mask, SIGUSR1);
sigaddset (&signals.sa_mask, SIGUSR2);
- sigaddset (&signals.sa_mask, SIGALRM);
- sigaddset (&signals.sa_mask, SIGPIPE);
sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
}
if (ls->fd != -1) {
accept_ev = g_malloc0 (sizeof (*accept_ev));
accept_ev->event_loop = event_loop;
+ accept_ev->accept_ev.data = worker;
ev_io_init (&accept_ev->accept_ev, hdl, ls->fd, EV_READ);
ev_io_start (event_loop, &accept_ev->accept_ev);
rt->has_event = FALSE;
/* Stop timeout */
- if (rspamd_event_pending (&rt->timeout_event, EV_TIMEOUT)) {
- event_del (&rt->timeout_event);
- }
+ ev_timer_stop (rt->task->event_loop, &rt->timeout_event);
if (rt->redis) {
redis = rt->redis;
struct upstream *up;
struct upstream_list *ups;
rspamd_inet_addr_t *addr;
- struct timeval tv;
rspamd_fstring_t *query;
const gchar *redis_cmd;
rspamd_token_t *tok;
struct rspamd_http_connection *conn;
struct rspamd_http_message *msg;
rspamd_inet_addr_t *addr;
- struct timeval tv;
static struct rspamadm_control_cbdata cbdata;
context = g_option_context_new (
addr);
msg = rspamd_http_new_message (HTTP_REQUEST);
msg->url = rspamd_fstring_new_init (path, strlen (path));
- double_to_tv (timeout, &tv);
cbdata.argc = argc;
cbdata.argv = argv;
cbdata.path = path;
rspamd_http_connection_write_message (conn, msg, NULL, NULL, &cbdata,
- &tv);
+ timeout);
- event_base_loop (rspamd_main->event_loop, 0);
+ ev_loop (rspamd_main->event_loop, 0);
rspamd_http_connection_unref (conn);
rspamd_inet_address_free (addr);
#include "lua/lua_common.h"
/* 60 seconds for worker's IO */
-#define DEFAULT_WORKER_IO_TIMEOUT 60000
+#define DEFAULT_WORKER_IO_TIMEOUT 60.0
gpointer init_worker (struct rspamd_config *cfg);
void start_worker (struct rspamd_worker *worker);
rspamd_worker_finalize (gpointer user_data)
{
struct rspamd_task *task = user_data;
- struct timeval tv = {.tv_sec = 0, .tv_usec = 0};
if (!(task->flags & RSPAMD_TASK_FLAG_PROCESSING)) {
msg_info_task ("finishing actions has been processed, terminating");
- event_base_loopexit (task->event_loop, &tv);
+ ev_break (task->event_loop, EVBREAK_ALL);
rspamd_session_destroy (task->s);
return TRUE;
}
void
-rspamd_task_timeout (gint fd, short what, gpointer ud)
+rspamd_task_timeout (EV_P_ ev_timer *w, int revents)
{
- struct rspamd_task *task = (struct rspamd_task *) ud;
+ struct rspamd_task *task = (struct rspamd_task *)w->data;
if (!(task->processed_stages & RSPAMD_TASK_STAGE_FILTERS)) {
msg_info_task ("processing of task timed out, forced processing");
}
void
-rspamd_worker_guard_handler (gint fd, short what, void *data)
+rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents)
{
- struct rspamd_task *task = data;
+ struct rspamd_task *task = (struct rspamd_task *)w->data;
gchar fake_buf[1024];
gssize r;
-#ifdef EV_CLOSED
- if (what == EV_CLOSED) {
- if (!(task->flags & RSPAMD_TASK_FLAG_JSON) &&
- task->cfg->enable_shutdown_workaround) {
- msg_info_task ("workaround for shutdown enabled, please update "
- "your client, this support might be removed in future");
- shutdown (fd, SHUT_RD);
- event_del (task->guard_ev);
- task->guard_ev = NULL;
- }
- else {
- msg_err_task ("the peer has closed connection unexpectedly");
- rspamd_session_destroy (task->s);
- }
-
- return;
- }
-#endif
-
- r = read (fd, fake_buf, sizeof (fake_buf));
+ r = read (w->fd, fake_buf, sizeof (fake_buf));
if (r > 0) {
msg_warn_task ("received extra data after task is loaded, ignoring");
task->cfg->enable_shutdown_workaround) {
msg_info_task ("workaround for shutdown enabled, please update "
"your client, this support might be removed in future");
- shutdown (fd, SHUT_RD);
- event_del (task->guard_ev);
- task->guard_ev = NULL;
+ shutdown (w->fd, SHUT_RD);
+ ev_io_stop (task->event_loop, &task->guard_ev);
}
else {
msg_err_task ("the peer has closed connection unexpectedly");
{
struct rspamd_task *task = (struct rspamd_task *) conn->ud;
struct rspamd_worker_ctx *ctx;
- struct timeval task_tv;
- struct event *guard_ev;
ctx = task->worker->ctx;
/* Set global timeout for the task */
if (ctx->task_timeout > 0.0) {
- event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
- task);
- event_base_set (ctx->ev_base, &task->timeout_ev);
- double_to_tv (ctx->task_timeout, &task_tv);
- event_add (&task->timeout_ev, &task_tv);
+ task->timeout_ev.data = task;
+ ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
+ ctx->task_timeout, 0.0);
+ ev_timer_start (task->event_loop, &task->timeout_ev);
}
/* Set socket guard */
- guard_ev = rspamd_mempool_alloc (task->task_pool, sizeof (*guard_ev));
-#ifdef EV_CLOSED
- event_set (guard_ev, task->sock, EV_READ|EV_PERSIST|EV_CLOSED,
- rspamd_worker_guard_handler, task);
-#else
- event_set (guard_ev, task->sock, EV_READ|EV_PERSIST,
- rspamd_worker_guard_handler, task);
-#endif
- event_base_set (task->event_loop, guard_ev);
- event_add (guard_ev, NULL);
- task->guard_ev = guard_ev;
+ task->guard_ev.data = task;
+ ev_io_init (&task->guard_ev, rspamd_worker_guard_handler, task->sock, EV_READ);
+ ev_io_start (task->event_loop, &task->guard_ev);
rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
* Accept new connection and construct task
*/
static void
-accept_socket (gint fd, short what, void *arg)
+accept_socket (EV_P_ ev_io *w, int revents)
{
- struct rspamd_worker *worker = (struct rspamd_worker *) arg;
+ struct rspamd_worker *worker = (struct rspamd_worker *) w->data;
struct rspamd_worker_ctx *ctx;
struct rspamd_task *task;
rspamd_inet_addr_t *addr;
}
if ((nfd =
- rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
+ rspamd_accept_from_socket (w->fd, &addr,
+ rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) {
msg_warn_ctx ("accept failed: %s", strerror (errno));
return;
}
return;
}
- task = rspamd_task_new (worker, ctx->cfg, NULL, ctx->lang_det, ctx->ev_base);
+ task = rspamd_task_new (worker, ctx->cfg, NULL, ctx->lang_det, ctx->event_loop);
msg_info_task ("accepted connection from %s port %d, task ptr: %p",
rspamd_inet_address_to_string (addr),
rspamd_http_connection_read_message (task->http_conn,
task,
- &ctx->io_tv);
+ ctx->timeout);
}
#ifdef WITH_HYPERSCAN
ctx,
G_STRUCT_OFFSET (struct rspamd_worker_ctx,
timeout),
- RSPAMD_CL_FLAG_TIME_INTEGER,
+ RSPAMD_CL_FLAG_TIME_FLOAT,
"Protocol IO timeout");
rspamd_rcl_register_worker_option (cfg,
struct rspamd_worker_ctx *ctx = worker->ctx;
ctx->cfg = worker->srv->cfg;
- ctx->ev_base = rspamd_prepare_worker (worker, "normal", accept_socket);
- msec_to_tv (ctx->timeout, &ctx->io_tv);
- rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->ev_base,
+ ctx->event_loop = rspamd_prepare_worker (worker, "normal", accept_socket);
+ rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->event_loop,
worker);
if (isnan (ctx->task_timeout)) {
}
ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
- ctx->ev_base,
+ ctx->event_loop,
worker->srv->cfg);
- rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver, worker, 0);
+ rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver, worker, 0);
rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
- ctx->ev_base, ctx->resolver->r);
+ ctx->event_loop, ctx->resolver->r);
- ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->ev_base,
+ ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop,
ctx->cfg->ups_ctx);
- rspamd_worker_init_scanner (worker, ctx->ev_base, ctx->resolver,
+ rspamd_worker_init_scanner (worker, ctx->event_loop, ctx->resolver,
&ctx->lang_det);
- rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->ev_base,
+ rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop,
worker);
- event_base_loop (ctx->ev_base, 0);
+ ev_loop (ctx->event_loop, 0);
rspamd_worker_block_signals ();
rspamd_stat_close ();
struct rspamd_worker_ctx {
guint64 magic;
/* Events base */
- struct ev_loop *ev_base;
+ struct ev_loop *event_loop;
/* DNS resolver */
struct rspamd_dns_resolver *resolver;
/* Config */
struct rspamd_config *cfg;
- guint32 timeout;
- struct timeval io_tv;
+ ev_tstamp timeout;
/* Detect whether this worker is mime worker */
gboolean is_mime;
/* Allow encrypted requests only using network */
/* Limit of tasks */
guint32 max_tasks;
/* Maximum time for task processing */
- gdouble task_timeout;
+ ev_tstamp task_timeout;
/* Encryption key */
struct rspamd_cryptobox_keypair *key;
/* Keys cache */
/*
* Called on forced timeout
*/
-void rspamd_task_timeout (gint fd, short what, gpointer ud);
+void rspamd_task_timeout (EV_P_ ev_timer *w, int revents);
/*
* Called on unexpected IO error (e.g. ECONNRESET)
*/
-void rspamd_worker_guard_handler (gint fd, short what, void *data);
+void rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents);
#endif