aboutsummaryrefslogtreecommitdiffstats
path: root/src/rspamd_proxy.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2019-06-19 18:23:46 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2019-06-22 10:57:29 +0100
commitdc96f9b37ff98c12b7aeacd348162950c129098f (patch)
tree22b6f950bd71a90b020cdd708de5d1bd7f3e1dde /src/rspamd_proxy.c
parent63f823eb9d6b4cfed6c3014ab350dfc61f33cb28 (diff)
downloadrspamd-dc96f9b37ff98c12b7aeacd348162950c129098f.tar.gz
rspamd-dc96f9b37ff98c12b7aeacd348162950c129098f.zip
[Project] Make it compileable again...
Diffstat (limited to 'src/rspamd_proxy.c')
-rw-r--r--src/rspamd_proxy.c86
1 files changed, 36 insertions, 50 deletions
diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c
index 5907a7ba4..b14bd086a 100644
--- a/src/rspamd_proxy.c
+++ b/src/rspamd_proxy.c
@@ -86,7 +86,6 @@ struct rspamd_http_upstream {
struct upstream_list *u;
struct rspamd_cryptobox_pubkey *key;
gdouble timeout;
- struct timeval io_tv;
gint parser_from_ref;
gint parser_to_ref;
gboolean local;
@@ -101,7 +100,6 @@ struct rspamd_http_mirror {
struct rspamd_cryptobox_pubkey *key;
gdouble prob;
gdouble timeout;
- struct timeval io_tv;
gint parser_from_ref;
gint parser_to_ref;
gboolean local;
@@ -113,14 +111,13 @@ static const guint64 rspamd_rspamd_proxy_magic = 0xcdeb4fd1fc351980ULL;
struct rspamd_proxy_ctx {
guint64 magic;
/* Events base */
- struct ev_loop *ev_base;
+ struct ev_loop *event_loop;
/* DNS resolver */
struct rspamd_dns_resolver *resolver;
/* Config */
struct rspamd_config *cfg;
/* END OF COMMON PART */
gdouble timeout;
- struct timeval io_tv;
/* Encryption key for clients */
struct rspamd_cryptobox_keypair *key;
/* HTTP context */
@@ -174,8 +171,8 @@ struct rspamd_proxy_backend_connection {
ucl_object_t *results;
const gchar *err;
struct rspamd_proxy_session *s;
- struct timeval *io_tv;
gint backend_sock;
+ ev_tstamp timeout;
enum rspamd_backend_flags flags;
gint parser_from_ref;
gint parser_to_ref;
@@ -464,8 +461,6 @@ rspamd_proxy_parse_upstream (rspamd_mempool_t *pool,
rspamd_lua_add_ref_dtor (L, pool, up->parser_to_ref);
}
- double_to_tv (up->timeout, &up->io_tv);
-
g_hash_table_insert (ctx->upstreams, up->name, up);
return TRUE;
@@ -617,8 +612,6 @@ rspamd_proxy_parse_mirror (rspamd_mempool_t *pool,
up->settings_id = rspamd_mempool_strdup (pool, ucl_object_tostring (elt));
}
- double_to_tv (up->timeout, &up->io_tv);
-
g_ptr_array_add (ctx->mirrors, up);
return TRUE;
@@ -1144,8 +1137,6 @@ proxy_request_decompress (struct rspamd_http_message *msg)
rspamd_http_message_set_body_from_fstring_steal (msg, body);
rspamd_http_message_remove_header (msg, "Compression");
}
-
- return;
}
static struct rspamd_proxy_session *
@@ -1350,7 +1341,7 @@ proxy_open_mirror_connections (struct rspamd_proxy_session *session)
sizeof (*bk_conn));
bk_conn->s = session;
bk_conn->name = m->name;
- bk_conn->io_tv = &m->io_tv;
+ bk_conn->timeout = m->timeout;
bk_conn->up = rspamd_upstream_get (m->u,
RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
@@ -1415,7 +1406,7 @@ proxy_open_mirror_connections (struct rspamd_proxy_session *session)
msg->method = HTTP_GET;
rspamd_http_connection_write_message_shared (bk_conn->backend_conn,
msg, NULL, NULL, bk_conn,
- bk_conn->io_tv);
+ bk_conn->timeout);
}
else {
if (session->fname) {
@@ -1442,7 +1433,7 @@ proxy_open_mirror_connections (struct rspamd_proxy_session *session)
rspamd_http_connection_write_message (bk_conn->backend_conn,
msg, NULL, NULL, bk_conn,
- bk_conn->io_tv);
+ bk_conn->timeout);
}
g_ptr_array_add (session->mirror_conns, bk_conn);
@@ -1468,7 +1459,7 @@ proxy_client_write_error (struct rspamd_proxy_session *session, gint code,
reply->status = rspamd_fstring_new_init (status, strlen (status));
rspamd_http_connection_write_message (session->client_conn,
reply, NULL, NULL, session,
- &session->ctx->io_tv);
+ session->ctx->timeout);
}
}
@@ -1566,7 +1557,7 @@ proxy_backend_master_finish_handler (struct rspamd_http_connection *conn,
else {
rspamd_http_connection_write_message (session->client_conn,
msg, NULL, NULL, session,
- bk_conn->io_tv);
+ bk_conn->timeout);
}
return 0;
@@ -1625,7 +1616,7 @@ rspamd_proxy_scan_self_reply (struct rspamd_task *task)
NULL,
ctype,
session,
- NULL);
+ 0);
}
}
@@ -1666,7 +1657,7 @@ rspamd_proxy_self_scan (struct rspamd_proxy_session *session)
msg = session->client_message;
task = rspamd_task_new (session->worker, session->ctx->cfg,
session->pool, session->ctx->lang_det,
- session->ctx->ev_base);
+ session->ctx->event_loop);
task->flags |= RSPAMD_TASK_FLAG_MIME;
task->sock = -1;
@@ -1711,23 +1702,18 @@ rspamd_proxy_self_scan (struct rspamd_proxy_session *session)
/* Set global timeout for the task */
if (session->ctx->default_upstream->timeout > 0.0) {
- struct timeval task_tv;
+ task->timeout_ev.data = task;
+ ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
+ session->ctx->default_upstream->timeout, 0.0);
+ ev_timer_start (task->event_loop, &task->timeout_ev);
- event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
- task);
- event_base_set (session->ctx->ev_base, &task->timeout_ev);
- double_to_tv (session->ctx->default_upstream->timeout, &task_tv);
- event_add (&task->timeout_ev, &task_tv);
}
else if (session->ctx->has_self_scan) {
if (session->ctx->cfg->task_timeout > 0) {
- struct timeval task_tv;
-
- event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
- task);
- event_base_set (session->ctx->ev_base, &task->timeout_ev);
- double_to_tv (session->ctx->cfg->task_timeout, &task_tv);
- event_add (&task->timeout_ev, &task_tv);
+ task->timeout_ev.data = task;
+ ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
+ session->ctx->cfg->task_timeout, 0.0);
+ ev_timer_start (task->event_loop, &task->timeout_ev);
}
}
@@ -1783,7 +1769,7 @@ retry:
session->master_conn->up = rspamd_upstream_get (backend->u,
RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
- session->master_conn->io_tv = &backend->io_tv;
+ session->master_conn->timeout = backend->timeout;
if (session->master_conn->up == NULL) {
msg_err_session ("cannot select upstream for %s",
@@ -1853,7 +1839,7 @@ retry:
rspamd_http_connection_write_message_shared (
session->master_conn->backend_conn,
msg, NULL, NULL, session->master_conn,
- session->master_conn->io_tv);
+ session->master_conn->timeout);
}
else {
if (session->fname) {
@@ -1881,7 +1867,7 @@ retry:
rspamd_http_connection_write_message (
session->master_conn->backend_conn,
msg, NULL, NULL, session->master_conn,
- session->master_conn->io_tv);
+ session->master_conn->timeout);
}
}
@@ -2031,9 +2017,9 @@ proxy_milter_error_handler (gint fd,
}
static void
-proxy_accept_socket (gint fd, short what, void *arg)
+proxy_accept_socket (EV_P_ ev_io *w, int revents)
{
- struct rspamd_worker *worker = (struct rspamd_worker *) arg;
+ struct rspamd_worker *worker = (struct rspamd_worker *)w->data;
struct rspamd_proxy_ctx *ctx;
rspamd_inet_addr_t *addr;
struct rspamd_proxy_session *session;
@@ -2042,7 +2028,8 @@ proxy_accept_socket (gint fd, short what, void *arg)
ctx = worker->ctx;
if ((nfd =
- rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
+ rspamd_accept_from_socket (w->fd, &addr,
+ rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) {
msg_warn ("accept failed: %s", strerror (errno));
return;
}
@@ -2086,7 +2073,7 @@ proxy_accept_socket (gint fd, short what, void *arg)
rspamd_http_connection_read_message_shared (session->client_conn,
session,
- &ctx->io_tv);
+ session->ctx->timeout);
}
else {
msg_info_session ("accepted milter connection from %s port %d",
@@ -2110,9 +2097,9 @@ proxy_accept_socket (gint fd, short what, void *arg)
}
#endif
- rspamd_milter_handle_socket (nfd, NULL,
+ rspamd_milter_handle_socket (nfd, 0.0,
session->pool,
- ctx->ev_base,
+ ctx->event_loop,
proxy_milter_finish_handler,
proxy_milter_error_handler,
session);
@@ -2153,30 +2140,29 @@ start_rspamd_proxy (struct rspamd_worker *worker)
struct rspamd_proxy_ctx *ctx = worker->ctx;
ctx->cfg = worker->srv->cfg;
- ctx->ev_base = rspamd_prepare_worker (worker, "rspamd_proxy",
+ ctx->event_loop = rspamd_prepare_worker (worker, "rspamd_proxy",
proxy_accept_socket);
ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
- ctx->ev_base,
+ ctx->event_loop,
worker->srv->cfg);
- double_to_tv (ctx->timeout, &ctx->io_tv);
- rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver, worker, 0);
+ rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver, worker, 0);
rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
- ctx->ev_base, ctx->resolver->r);
+ ctx->event_loop, ctx->resolver->r);
- ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->ev_base,
+ ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop,
ctx->cfg->ups_ctx);
if (ctx->has_self_scan) {
/* Additional initialisation needed */
- rspamd_worker_init_scanner (worker, ctx->ev_base, ctx->resolver,
+ rspamd_worker_init_scanner (worker, ctx->event_loop, ctx->resolver,
&ctx->lang_det);
}
if (worker->srv->cfg->enable_sessions_cache) {
ctx->sessions_cache = rspamd_worker_session_cache_new (worker,
- ctx->ev_base);
+ ctx->event_loop);
}
ctx->milter_ctx.spam_header = ctx->spam_header;
@@ -2188,11 +2174,11 @@ start_rspamd_proxy (struct rspamd_worker *worker)
ctx->milter_ctx.cfg = ctx->cfg;
rspamd_milter_init_library (&ctx->milter_ctx);
- rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->ev_base,
+ rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop,
worker);
adjust_upstreams_limits (ctx);
- event_base_loop (ctx->ev_base, 0);
+ ev_loop (ctx->event_loop, 0);
rspamd_worker_block_signals ();
if (ctx->has_self_scan) {