Browse Source

[Project] Adopt normal worker and contorller

tags/2.0
Vsevolod Stakhov 5 years ago
parent
commit
675b33dd20

+ 43
- 46
src/controller.c View File

@@ -134,8 +134,7 @@ struct rspamd_controller_worker_ctx {
/* Config */
struct rspamd_config *cfg;
/* END OF COMMON PART */
guint32 timeout;
struct timeval io_tv;
ev_tstamp timeout;
/* Whether we use ssl for this server */
gboolean use_ssl;
/* Webui password */
@@ -728,7 +727,7 @@ rspamd_controller_handle_auth (struct rspamd_http_connection_entry *conn_ent,
data[4] = st->actions_stat[METRIC_ACTION_SOFT_REJECT];

/* Get uptime */
uptime = time (NULL) - session->ctx->start_time;
uptime = ev_time () - session->ctx->start_time;

ucl_object_insert_key (obj, ucl_object_fromstring (
RVERSION), "version", 0, false);
@@ -996,7 +995,7 @@ rspamd_controller_handle_get_map (struct rspamd_http_connection_entry *conn_ent,
struct rspamd_controller_session *session = conn_ent->ud;
GList *cur;
struct rspamd_map *map;
struct rspamd_map_backend *bk;
struct rspamd_map_backend *bk = NULL;
const rspamd_ftok_t *idstr;
struct stat st;
gint fd;
@@ -1037,7 +1036,7 @@ rspamd_controller_handle_get_map (struct rspamd_http_connection_entry *conn_ent,
cur = g_list_next (cur);
}

if (!found) {
if (!found || bk == NULL) {
msg_info_session ("map not found");
rspamd_controller_send_error (conn_ent, 404, "Map not found");
return 0;
@@ -1075,7 +1074,7 @@ rspamd_controller_handle_get_map (struct rspamd_http_connection_entry *conn_ent,
rspamd_http_router_insert_headers (conn_ent->rt, reply);
rspamd_http_connection_write_message (conn_ent->conn, reply, NULL,
"text/plain", conn_ent,
conn_ent->rt->ptv);
conn_ent->rt->timeout);
conn_ent->is_reply = TRUE;

return 0;
@@ -1385,13 +1384,13 @@ rspamd_controller_handle_legacy_history (
row = &copied_rows[row_num];
/* Get only completed rows */
if (row->completed) {
rspamd_localtime (row->tv.tv_sec, &tm);
rspamd_localtime (row->timestamp, &tm);
strftime (timebuf, sizeof (timebuf) - 1, "%Y-%m-%d %H:%M:%S", &tm);
obj = ucl_object_typed_new (UCL_OBJECT);
ucl_object_insert_key (obj, ucl_object_fromstring (
timebuf), "time", 0, false);
ucl_object_insert_key (obj, ucl_object_fromint (
row->tv.tv_sec), "unix_time", 0, false);
row->timestamp), "unix_time", 0, false);
ucl_object_insert_key (obj, ucl_object_fromstring (
row->message_id), "id", 0, false);
ucl_object_insert_key (obj, ucl_object_fromstring (row->from_addr),
@@ -1935,7 +1934,7 @@ rspamd_controller_scan_reply (struct rspamd_task *task)
rspamd_http_connection_reset (conn_ent->conn);
rspamd_http_router_insert_headers (conn_ent->rt, msg);
rspamd_http_connection_write_message (conn_ent->conn, msg, NULL,
"application/json", conn_ent, conn_ent->rt->ptv);
"application/json", conn_ent, conn_ent->rt->timeout);
conn_ent->is_reply = TRUE;
}

@@ -2125,13 +2124,10 @@ rspamd_controller_handle_scan (struct rspamd_http_connection_entry *conn_ent,
}

if (ctx->task_timeout > 0.0) {
struct timeval task_tv;

event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
task);
event_base_set (ctx->event_loop, &task->timeout_ev);
double_to_tv (ctx->task_timeout, &task_tv);
event_add (&task->timeout_ev, &task_tv);
task->timeout_ev.data = task;
ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
ctx->task_timeout, 0.0);
ev_timer_start (task->event_loop, &task->timeout_ev);
}

end:
@@ -2210,6 +2206,7 @@ rspamd_controller_handle_saveactions (

switch (i) {
case 0:
default:
act = METRIC_ACTION_REJECT;
break;
case 1:
@@ -2404,7 +2401,7 @@ rspamd_controller_handle_savemap (struct rspamd_http_connection_entry *conn_ent,
{
struct rspamd_controller_session *session = conn_ent->ud;
GList *cur;
struct rspamd_map *map;
struct rspamd_map *map = NULL;
struct rspamd_map_backend *bk;
struct rspamd_controller_worker_ctx *ctx;
const rspamd_ftok_t *idstr;
@@ -2903,7 +2900,7 @@ rspamd_controller_handle_ping (struct rspamd_http_connection_entry *conn_ent,
NULL,
"text/plain",
conn_ent,
conn_ent->rt->ptv);
conn_ent->rt->timeout);
conn_ent->is_reply = TRUE;

return 0;
@@ -2937,7 +2934,7 @@ rspamd_controller_handle_unknown (struct rspamd_http_connection_entry *conn_ent,
NULL,
"text/plain",
conn_ent,
conn_ent->rt->ptv);
conn_ent->rt->timeout);
conn_ent->is_reply = TRUE;
}
else {
@@ -2953,7 +2950,7 @@ rspamd_controller_handle_unknown (struct rspamd_http_connection_entry *conn_ent,
NULL,
"text/plain",
conn_ent,
conn_ent->rt->ptv);
conn_ent->rt->timeout);
conn_ent->is_reply = TRUE;
}

@@ -3077,9 +3074,9 @@ rspamd_controller_finish_handler (struct rspamd_http_connection_entry *conn_ent)
}

static void
rspamd_controller_accept_socket (gint fd, short what, void *arg)
rspamd_controller_accept_socket (EV_P_ ev_io *w, int revents)
{
struct rspamd_worker *worker = (struct rspamd_worker *) arg;
struct rspamd_worker *worker = (struct rspamd_worker *)w->data;
struct rspamd_controller_worker_ctx *ctx;
struct rspamd_controller_session *session;
rspamd_inet_addr_t *addr;
@@ -3088,7 +3085,8 @@ rspamd_controller_accept_socket (gint fd, short what, void *arg)
ctx = worker->ctx;

if ((nfd =
rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
rspamd_accept_from_socket (w->fd, &addr,
rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) {
msg_warn_ctx ("accept failed: %s", strerror (errno));
return;
}
@@ -3113,9 +3111,10 @@ rspamd_controller_accept_socket (gint fd, short what, void *arg)
}

static void
rspamd_controller_rrd_update (gint fd, short what, void *arg)
rspamd_controller_rrd_update (EV_P_ ev_timer *w, int revents)
{
struct rspamd_controller_worker_ctx *ctx = arg;
struct rspamd_controller_worker_ctx *ctx =
(struct rspamd_controller_worker_ctx *)w->data;
struct rspamd_stat *stat;
GArray ar;
gdouble points[METRIC_ACTION_MAX];
@@ -3139,8 +3138,7 @@ rspamd_controller_rrd_update (gint fd, short what, void *arg)
}

/* Plan new event */
event_del (ctx->rrd_event);
evtimer_add (ctx->rrd_event, &rrd_update_time);
ev_timer_again (ctx->event_loop, &ctx->rrd_event);
}

static void
@@ -3278,11 +3276,13 @@ rspamd_controller_store_saved_stats (struct rspamd_controller_worker_ctx *ctx)
}

static void
rspamd_controller_stats_save_periodic (int fd, short what, gpointer ud)
rspamd_controller_stats_save_periodic (EV_P_ ev_timer *w, int revents)
{
struct rspamd_controller_worker_ctx *ctx = ud;
struct rspamd_controller_worker_ctx *ctx =
(struct rspamd_controller_worker_ctx *)w->data;

rspamd_controller_store_saved_stats (ctx);
ev_timer_again (EV_A_ w);
}

static void
@@ -3375,7 +3375,7 @@ init_controller_worker (struct rspamd_config *cfg)
ctx,
G_STRUCT_OFFSET (struct rspamd_controller_worker_ctx,
timeout),
RSPAMD_CL_FLAG_TIME_INTEGER,
RSPAMD_CL_FLAG_TIME_FLOAT,
"Protocol timeout");

rspamd_rcl_register_worker_option (cfg,
@@ -3573,7 +3573,7 @@ rspamd_controller_on_terminate (struct rspamd_worker *worker)

if (ctx->rrd) {
msg_info ("closing rrd file: %s", ctx->rrd->filename);
event_del (ctx->rrd_event);
ev_timer_stop (ctx->event_loop, &ctx->rrd_event);
rspamd_rrd_close (ctx->rrd);
}

@@ -3694,16 +3694,14 @@ start_controller_worker (struct rspamd_worker *worker)
GHashTableIter iter;
gpointer key, value;
guint i;
struct timeval stv;
const guint save_stats_interval = 60 * 1000; /* 1 minute */
const ev_tstamp save_stats_interval = 60; /* 1 minute */
gpointer m;

ctx->event_loop = rspamd_prepare_worker (worker,
"controller",
rspamd_controller_accept_socket);
msec_to_tv (ctx->timeout, &ctx->io_tv);

ctx->start_time = time (NULL);
ctx->start_time = ev_time ();
ctx->worker = worker;
ctx->cfg = worker->srv->cfg;
ctx->srv = worker->srv;
@@ -3746,10 +3744,10 @@ start_controller_worker (struct rspamd_worker *worker)
ctx->rrd = rspamd_rrd_file_default (ctx->cfg->rrd_file, &rrd_err);

if (ctx->rrd) {
ctx->rrd_event = g_malloc0 (sizeof (*ctx->rrd_event));
evtimer_set (ctx->rrd_event, rspamd_controller_rrd_update, ctx);
event_base_set (ctx->event_loop, ctx->rrd_event);
event_add (ctx->rrd_event, &rrd_update_time);
ctx->rrd_event.data = ctx;
ev_timer_init (&ctx->rrd_event, rspamd_controller_rrd_update,
rrd_update_time, rrd_update_time);
ev_timer_start (ctx->event_loop, &ctx->rrd_event);
}
else if (rrd_err) {
msg_err ("cannot load rrd from %s: %e", ctx->cfg->rrd_file,
@@ -3772,7 +3770,7 @@ start_controller_worker (struct rspamd_worker *worker)
ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop,
ctx->cfg->ups_ctx);
ctx->http = rspamd_http_router_new (rspamd_controller_error_handler,
rspamd_controller_finish_handler, &ctx->io_tv,
rspamd_controller_finish_handler, ctx->timeout,
ctx->static_files_dir, ctx->http_ctx);

/* Add callbacks for different methods */
@@ -3903,12 +3901,11 @@ start_controller_worker (struct rspamd_worker *worker)
ctx->resolver, worker, TRUE);

/* Schedule periodic stats saving, see #1823 */
event_set (&ctx->save_stats_event, -1, EV_PERSIST,
ctx->save_stats_event.data = ctx;
ev_timer_init (&ctx->save_stats_event,
rspamd_controller_stats_save_periodic,
ctx);
event_base_set (ctx->event_loop, &ctx->save_stats_event);
msec_to_tv (save_stats_interval, &stv);
evtimer_add (&ctx->save_stats_event, &stv);
save_stats_interval, save_stats_interval);
ev_timer_start (ctx->event_loop, &ctx->save_stats_event);
}
else {
rspamd_map_watch (worker->srv->cfg, ctx->event_loop,
@@ -3918,7 +3915,7 @@ start_controller_worker (struct rspamd_worker *worker)
rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop, worker);

/* Start event loop */
event_base_loop (ctx->event_loop, 0);
ev_loop (ctx->event_loop, 0);
rspamd_worker_block_signals ();

rspamd_stat_close ();

+ 2
- 7
src/libserver/task.c View File

@@ -316,13 +316,8 @@ rspamd_task_free (struct rspamd_task *task)
g_error_free (task->err);
}

if (rspamd_event_pending (&task->timeout_ev, EV_TIMEOUT)) {
event_del (&task->timeout_ev);
}

if (task->guard_ev) {
event_del (task->guard_ev);
}
ev_timer_stop (task->event_loop, &task->timeout_ev);
ev_io_stop (task->event_loop, &task->guard_ev);

if (task->sock != -1) {
close (task->sock);

+ 1
- 1
src/libserver/task.h View File

@@ -200,7 +200,7 @@ struct rspamd_task {
struct rspamd_dns_resolver *resolver; /**< DNS resolver */
struct ev_loop *event_loop; /**< Event base */
struct ev_timer timeout_ev; /**< Global task timeout */
struct ev_io *guard_ev; /**< Event for input sanity guard */
struct ev_io guard_ev; /**< Event for input sanity guard */

gpointer checkpoint; /**< Opaque checkpoint data */
ucl_object_t *settings; /**< Settings applied to task */

+ 1
- 7
src/libserver/worker_util.c View File

@@ -274,10 +274,6 @@ void
rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *base)
{
struct sigaction signals;
/* We ignore these signals in the worker */
rspamd_worker_ignore_signal (SIGPIPE);
rspamd_worker_ignore_signal (SIGALRM);
rspamd_worker_ignore_signal (SIGCHLD);

/* A set of terminating signals */
rspamd_worker_set_signal_handler (SIGTERM, worker, base,
@@ -298,11 +294,8 @@ rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *base)
sigaddset (&signals.sa_mask, SIGTERM);
sigaddset (&signals.sa_mask, SIGINT);
sigaddset (&signals.sa_mask, SIGHUP);
sigaddset (&signals.sa_mask, SIGCHLD);
sigaddset (&signals.sa_mask, SIGUSR1);
sigaddset (&signals.sa_mask, SIGUSR2);
sigaddset (&signals.sa_mask, SIGALRM);
sigaddset (&signals.sa_mask, SIGPIPE);

sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
}
@@ -345,6 +338,7 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
if (ls->fd != -1) {
accept_ev = g_malloc0 (sizeof (*accept_ev));
accept_ev->event_loop = event_loop;
accept_ev->accept_ev.data = worker;
ev_io_init (&accept_ev->accept_ev, hdl, ls->fd, EV_READ);
ev_io_start (event_loop, &accept_ev->accept_ev);


+ 1
- 4
src/libstat/backends/redis_backend.c View File

@@ -1039,9 +1039,7 @@ rspamd_redis_fin_learn (gpointer data)

rt->has_event = FALSE;
/* Stop timeout */
if (rspamd_event_pending (&rt->timeout_event, EV_TIMEOUT)) {
event_del (&rt->timeout_event);
}
ev_timer_stop (rt->task->event_loop, &rt->timeout_event);

if (rt->redis) {
redis = rt->redis;
@@ -1654,7 +1652,6 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
struct upstream *up;
struct upstream_list *ups;
rspamd_inet_addr_t *addr;
struct timeval tv;
rspamd_fstring_t *query;
const gchar *redis_cmd;
rspamd_token_t *tok;

+ 2
- 4
src/rspamadm/control.c View File

@@ -173,7 +173,6 @@ rspamadm_control (gint argc, gchar **argv, const struct rspamadm_command *_cmd)
struct rspamd_http_connection *conn;
struct rspamd_http_message *msg;
rspamd_inet_addr_t *addr;
struct timeval tv;
static struct rspamadm_control_cbdata cbdata;

context = g_option_context_new (
@@ -239,16 +238,15 @@ rspamadm_control (gint argc, gchar **argv, const struct rspamadm_command *_cmd)
addr);
msg = rspamd_http_new_message (HTTP_REQUEST);
msg->url = rspamd_fstring_new_init (path, strlen (path));
double_to_tv (timeout, &tv);

cbdata.argc = argc;
cbdata.argv = argv;
cbdata.path = path;

rspamd_http_connection_write_message (conn, msg, NULL, NULL, &cbdata,
&tv);
timeout);

event_base_loop (rspamd_main->event_loop, 0);
ev_loop (rspamd_main->event_loop, 0);

rspamd_http_connection_unref (conn);
rspamd_inet_address_free (addr);

+ 32
- 64
src/worker.c View File

@@ -42,7 +42,7 @@
#include "lua/lua_common.h"

/* 60 seconds for worker's IO */
#define DEFAULT_WORKER_IO_TIMEOUT 60000
#define DEFAULT_WORKER_IO_TIMEOUT 60.0

gpointer init_worker (struct rspamd_config *cfg);
void start_worker (struct rspamd_worker *worker);
@@ -73,11 +73,10 @@ static gboolean
rspamd_worker_finalize (gpointer user_data)
{
struct rspamd_task *task = user_data;
struct timeval tv = {.tv_sec = 0, .tv_usec = 0};

if (!(task->flags & RSPAMD_TASK_FLAG_PROCESSING)) {
msg_info_task ("finishing actions has been processed, terminating");
event_base_loopexit (task->event_loop, &tv);
ev_break (task->event_loop, EVBREAK_ALL);
rspamd_session_destroy (task->s);

return TRUE;
@@ -137,9 +136,9 @@ reduce_tasks_count (gpointer arg)
}

void
rspamd_task_timeout (gint fd, short what, gpointer ud)
rspamd_task_timeout (EV_P_ ev_timer *w, int revents)
{
struct rspamd_task *task = (struct rspamd_task *) ud;
struct rspamd_task *task = (struct rspamd_task *)w->data;

if (!(task->processed_stages & RSPAMD_TASK_STAGE_FILTERS)) {
msg_info_task ("processing of task timed out, forced processing");
@@ -176,32 +175,13 @@ rspamd_task_timeout (gint fd, short what, gpointer ud)
}

void
rspamd_worker_guard_handler (gint fd, short what, void *data)
rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents)
{
struct rspamd_task *task = data;
struct rspamd_task *task = (struct rspamd_task *)w->data;
gchar fake_buf[1024];
gssize r;

#ifdef EV_CLOSED
if (what == EV_CLOSED) {
if (!(task->flags & RSPAMD_TASK_FLAG_JSON) &&
task->cfg->enable_shutdown_workaround) {
msg_info_task ("workaround for shutdown enabled, please update "
"your client, this support might be removed in future");
shutdown (fd, SHUT_RD);
event_del (task->guard_ev);
task->guard_ev = NULL;
}
else {
msg_err_task ("the peer has closed connection unexpectedly");
rspamd_session_destroy (task->s);
}

return;
}
#endif

r = read (fd, fake_buf, sizeof (fake_buf));
r = read (w->fd, fake_buf, sizeof (fake_buf));

if (r > 0) {
msg_warn_task ("received extra data after task is loaded, ignoring");
@@ -218,9 +198,8 @@ rspamd_worker_guard_handler (gint fd, short what, void *data)
task->cfg->enable_shutdown_workaround) {
msg_info_task ("workaround for shutdown enabled, please update "
"your client, this support might be removed in future");
shutdown (fd, SHUT_RD);
event_del (task->guard_ev);
task->guard_ev = NULL;
shutdown (w->fd, SHUT_RD);
ev_io_stop (task->event_loop, &task->guard_ev);
}
else {
msg_err_task ("the peer has closed connection unexpectedly");
@@ -245,8 +224,6 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
{
struct rspamd_task *task = (struct rspamd_task *) conn->ud;
struct rspamd_worker_ctx *ctx;
struct timeval task_tv;
struct event *guard_ev;

ctx = task->worker->ctx;

@@ -268,25 +245,16 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,

/* Set global timeout for the task */
if (ctx->task_timeout > 0.0) {
event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
task);
event_base_set (ctx->ev_base, &task->timeout_ev);
double_to_tv (ctx->task_timeout, &task_tv);
event_add (&task->timeout_ev, &task_tv);
task->timeout_ev.data = task;
ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
ctx->task_timeout, 0.0);
ev_timer_start (task->event_loop, &task->timeout_ev);
}

/* Set socket guard */
guard_ev = rspamd_mempool_alloc (task->task_pool, sizeof (*guard_ev));
#ifdef EV_CLOSED
event_set (guard_ev, task->sock, EV_READ|EV_PERSIST|EV_CLOSED,
rspamd_worker_guard_handler, task);
#else
event_set (guard_ev, task->sock, EV_READ|EV_PERSIST,
rspamd_worker_guard_handler, task);
#endif
event_base_set (task->event_loop, guard_ev);
event_add (guard_ev, NULL);
task->guard_ev = guard_ev;
task->guard_ev.data = task;
ev_io_init (&task->guard_ev, rspamd_worker_guard_handler, task->sock, EV_READ);
ev_io_start (task->event_loop, &task->guard_ev);

rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);

@@ -359,9 +327,9 @@ rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
* Accept new connection and construct task
*/
static void
accept_socket (gint fd, short what, void *arg)
accept_socket (EV_P_ ev_io *w, int revents)
{
struct rspamd_worker *worker = (struct rspamd_worker *) arg;
struct rspamd_worker *worker = (struct rspamd_worker *) w->data;
struct rspamd_worker_ctx *ctx;
struct rspamd_task *task;
rspamd_inet_addr_t *addr;
@@ -377,7 +345,8 @@ accept_socket (gint fd, short what, void *arg)
}

if ((nfd =
rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
rspamd_accept_from_socket (w->fd, &addr,
rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) {
msg_warn_ctx ("accept failed: %s", strerror (errno));
return;
}
@@ -386,7 +355,7 @@ accept_socket (gint fd, short what, void *arg)
return;
}

task = rspamd_task_new (worker, ctx->cfg, NULL, ctx->lang_det, ctx->ev_base);
task = rspamd_task_new (worker, ctx->cfg, NULL, ctx->lang_det, ctx->event_loop);

msg_info_task ("accepted connection from %s port %d, task ptr: %p",
rspamd_inet_address_to_string (addr),
@@ -435,7 +404,7 @@ accept_socket (gint fd, short what, void *arg)

rspamd_http_connection_read_message (task->http_conn,
task,
&ctx->io_tv);
ctx->timeout);
}

#ifdef WITH_HYPERSCAN
@@ -587,7 +556,7 @@ init_worker (struct rspamd_config *cfg)
ctx,
G_STRUCT_OFFSET (struct rspamd_worker_ctx,
timeout),
RSPAMD_CL_FLAG_TIME_INTEGER,
RSPAMD_CL_FLAG_TIME_FLOAT,
"Protocol IO timeout");

rspamd_rcl_register_worker_option (cfg,
@@ -672,9 +641,8 @@ start_worker (struct rspamd_worker *worker)
struct rspamd_worker_ctx *ctx = worker->ctx;

ctx->cfg = worker->srv->cfg;
ctx->ev_base = rspamd_prepare_worker (worker, "normal", accept_socket);
msec_to_tv (ctx->timeout, &ctx->io_tv);
rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->ev_base,
ctx->event_loop = rspamd_prepare_worker (worker, "normal", accept_socket);
rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->event_loop,
worker);

if (isnan (ctx->task_timeout)) {
@@ -687,20 +655,20 @@ start_worker (struct rspamd_worker *worker)
}

ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
ctx->ev_base,
ctx->event_loop,
worker->srv->cfg);
rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver, worker, 0);
rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver, worker, 0);
rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
ctx->ev_base, ctx->resolver->r);
ctx->event_loop, ctx->resolver->r);

ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->ev_base,
ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop,
ctx->cfg->ups_ctx);
rspamd_worker_init_scanner (worker, ctx->ev_base, ctx->resolver,
rspamd_worker_init_scanner (worker, ctx->event_loop, ctx->resolver,
&ctx->lang_det);
rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->ev_base,
rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop,
worker);

event_base_loop (ctx->ev_base, 0);
ev_loop (ctx->event_loop, 0);
rspamd_worker_block_signals ();

rspamd_stat_close ();

+ 5
- 6
src/worker_private.h View File

@@ -30,14 +30,13 @@ struct rspamd_lang_detector;
struct rspamd_worker_ctx {
guint64 magic;
/* Events base */
struct ev_loop *ev_base;
struct ev_loop *event_loop;
/* DNS resolver */
struct rspamd_dns_resolver *resolver;
/* Config */
struct rspamd_config *cfg;

guint32 timeout;
struct timeval io_tv;
ev_tstamp timeout;
/* Detect whether this worker is mime worker */
gboolean is_mime;
/* Allow encrypted requests only using network */
@@ -45,7 +44,7 @@ struct rspamd_worker_ctx {
/* Limit of tasks */
guint32 max_tasks;
/* Maximum time for task processing */
gdouble task_timeout;
ev_tstamp task_timeout;
/* Encryption key */
struct rspamd_cryptobox_keypair *key;
/* Keys cache */
@@ -64,11 +63,11 @@ void rspamd_worker_init_scanner (struct rspamd_worker *worker,
/*
* Called on forced timeout
*/
void rspamd_task_timeout (gint fd, short what, gpointer ud);
void rspamd_task_timeout (EV_P_ ev_timer *w, int revents);

/*
* Called on unexpected IO error (e.g. ECONNRESET)
*/
void rspamd_worker_guard_handler (gint fd, short what, void *data);
void rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents);

#endif

Loading…
Cancel
Save