aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2011-10-26 21:13:19 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2011-10-26 21:13:19 +0400
commit608432786ad77ce7ce071dd975d6c59d503d2302 (patch)
treed3991e93c04b0eef602afab272e7316490ba1d54
parent2e15cacc80101d91108be8aaa4ea722f31d22d6b (diff)
downloadrspamd-608432786ad77ce7ce071dd975d6c59d503d2302.tar.gz
rspamd-608432786ad77ce7ce071dd975d6c59d503d2302.zip
* Use event_base thread safe API to allow parallelism based on threads
-rw-r--r--src/buffer.c17
-rw-r--r--src/buffer.h3
-rw-r--r--src/controller.c21
-rw-r--r--src/dns.c9
-rw-r--r--src/dns.h3
-rw-r--r--src/lmtp.c9
-rw-r--r--src/lmtp_proto.c2
-rw-r--r--src/lua/lua_http.c4
-rw-r--r--src/main.h2
-rw-r--r--src/map.c9
-rw-r--r--src/map.h3
-rw-r--r--src/smtp.c17
-rw-r--r--src/smtp.h2
-rw-r--r--src/smtp_utils.c2
-rw-r--r--src/statfile_sync.c11
-rw-r--r--src/statfile_sync.h2
-rw-r--r--src/worker.c14
17 files changed, 95 insertions, 35 deletions
diff --git a/src/buffer.c b/src/buffer.c
index a1f4bee9b..c94d5f91c 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -66,6 +66,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
d->offset += off;
event_del (d->ev);
event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
}
}
@@ -78,6 +79,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
}
event_del (d->ev);
event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
d->in_sendfile = FALSE;
}
@@ -98,6 +100,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
/* Wait for other event */
event_del (d->ev);
event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
}
}
@@ -106,6 +109,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
/* Wait for other event */
event_del (d->ev);
event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
}
else {
@@ -117,6 +121,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
}
event_del (d->ev);
event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
d->in_sendfile = FALSE;
}
@@ -137,6 +142,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
/* Wait for other event */
event_del (d->ev);
event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
}
}
@@ -146,6 +152,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
/* Wait for other event */
event_del (d->ev);
event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
}
else {
@@ -157,6 +164,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
}
event_del (d->ev);
event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
d->in_sendfile = FALSE;
}
@@ -215,6 +223,7 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
/* Wait for other event */
event_del (d->ev);
event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
return TRUE;
}
@@ -237,12 +246,14 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
event_del (d->ev);
event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
}
else {
/* Plan other write event */
event_del (d->ev);
event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
}
@@ -464,6 +475,7 @@ dispatcher_cb (gint fd, short what, void *arg)
if (d->out_buffers == NULL) {
event_del (d->ev);
event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
if (d->is_restored && d->write_callback) {
if (!d->write_callback (d->user_data)) {
@@ -482,7 +494,7 @@ dispatcher_cb (gint fd, short what, void *arg)
rspamd_io_dispatcher_t *
-rspamd_create_dispatcher (gint fd, enum io_policy policy,
+rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy policy,
dispatcher_read_callback_t read_cb, dispatcher_write_callback_t write_cb, dispatcher_err_callback_t err_cb, struct timeval *tv, void *user_data)
{
rspamd_io_dispatcher_t *new;
@@ -513,8 +525,10 @@ rspamd_create_dispatcher (gint fd, enum io_policy policy,
new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event));
new->fd = fd;
+ new->ev_base = base;
event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new);
+ event_base_set (new->ev_base, new->ev);
event_add (new->ev, new->tv);
return new;
@@ -647,6 +661,7 @@ rspamd_dispatcher_restore (rspamd_io_dispatcher_t * d)
debug_ip ("restored dispatcher");
event_del (d->ev);
event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
d->is_restored = TRUE;
}
diff --git a/src/buffer.h b/src/buffer.h
index 16394ae7d..51b321833 100644
--- a/src/buffer.h
+++ b/src/buffer.h
@@ -52,6 +52,7 @@ typedef struct rspamd_io_dispatcher_s {
gboolean in_sendfile; /**< whether buffer is in sendfile mode */
gboolean strip_eol; /**< strip or not line ends in BUFFER_LINE policy */
gboolean is_restored; /**< call a callback when dispatcher is restored */
+ struct event_base *ev_base; /**< event base for io operations */
#ifndef HAVE_SENDFILE
void *map;
#endif
@@ -68,7 +69,7 @@ typedef struct rspamd_io_dispatcher_s {
* @param user_data pointer to user's data
* @return new dispatcher object or NULL in case of failure
*/
-rspamd_io_dispatcher_t* rspamd_create_dispatcher (gint fd,
+rspamd_io_dispatcher_t* rspamd_create_dispatcher (struct event_base *base, gint fd,
enum io_policy policy,
dispatcher_read_callback_t read_cb,
dispatcher_write_callback_t write_cb,
diff --git a/src/controller.c b/src/controller.c
index cb0c3cd22..07eeef0ff 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -77,6 +77,7 @@ struct rspamd_controller_ctx {
char *password;
guint32 timeout;
struct rspamd_dns_resolver *resolver;
+ struct event_base *ev_base;
};
static struct controller_command commands[] = {
@@ -880,6 +881,7 @@ controller_read_socket (f_str_t * in, void *arg)
task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
task->msg->begin = in->begin;
task->msg->len = in->len;
+ task->ev_base = session->ev_base;
r = process_message (task);
if (r == -1) {
@@ -925,6 +927,7 @@ controller_read_socket (f_str_t * in, void *arg)
task->msg->len = in->len;
task->resolver = session->resolver;
+ task->ev_base = session->ev_base;
r = process_message (task);
if (r == -1) {
@@ -983,6 +986,7 @@ controller_read_socket (f_str_t * in, void *arg)
task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
task->msg->begin = in->begin;
task->msg->len = in->len;
+ task->ev_base = session->ev_base;
r = process_message (task);
if (r == -1) {
@@ -1166,6 +1170,7 @@ accept_socket (gint fd, short what, void *arg)
new_session->state = STATE_COMMAND;
new_session->session_pool = memory_pool_new (memory_pool_get_size () - 1);
new_session->resolver = ctx->resolver;
+ new_session->ev_base = ctx->ev_base;
worker->srv->stat->control_connections_count++;
/* Set up dispatcher */
@@ -1175,7 +1180,9 @@ accept_socket (gint fd, short what, void *arg)
new_session->s = new_async_session (new_session->session_pool, free_session, new_session);
- new_session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, controller_read_socket, controller_write_socket, controller_err_socket, io_tv, (void *)new_session);
+ new_session->dispatcher = rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, controller_read_socket,
+ controller_write_socket, controller_err_socket, io_tv, (void *)new_session);
+
if (su.ss.ss_family == AF_UNIX) {
msg_info ("accepted connection from unix socket");
new_session->dispatcher->peer_addr = INADDR_LOOPBACK;
@@ -1217,7 +1224,7 @@ start_controller (struct rspamd_worker *worker)
worker->srv->pid = getpid ();
ctx = worker->ctx;
- event_init ();
+ ctx->ev_base = event_init ();
g_mime_init (0);
init_signals (&signals, sig_handler);
@@ -1225,13 +1232,14 @@ start_controller (struct rspamd_worker *worker)
/* SIGUSR2 handler */
signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker);
+ event_base_set (ctx->ev_base, &worker->sig_ev);
signal_add (&worker->sig_ev, NULL);
start_time = time (NULL);
/* Start statfile synchronization */
- if (!start_statfile_sync (worker->srv->statfile_pool, worker->srv->cfg)) {
+ if (!start_statfile_sync (worker->srv->statfile_pool, worker->srv->cfg, ctx->ev_base)) {
msg_info ("cannot start statfile synchronization, statfiles would not be synchronized");
}
@@ -1243,14 +1251,15 @@ start_controller (struct rspamd_worker *worker)
rspamd_snprintf (greetingbuf, sizeof (greetingbuf), "Rspamd version %s is running on %s" CRLF, RVERSION, hostbuf);
/* Accept event */
event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+ event_base_set (ctx->ev_base, &worker->bind_ev);
event_add (&worker->bind_ev, NULL);
- start_map_watch ();
- ctx->resolver = dns_resolver_init (worker->srv->cfg);
+ start_map_watch (ctx->ev_base);
+ ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
gperf_profiler_init (worker->srv->cfg, "controller");
- event_loop (0);
+ event_base_loop (ctx->ev_base, 0);
close_log (worker->srv->logger);
diff --git a/src/dns.c b/src/dns.c
index 2e9fd34c4..c3dd87a32 100644
--- a/src/dns.c
+++ b/src/dns.c
@@ -742,6 +742,7 @@ send_dns_request (struct rspamd_dns_request *req)
if (r == -1) {
if (errno == EAGAIN) {
event_set (&req->io_event, req->sock, EV_WRITE, dns_retransmit_handler, req);
+ event_base_set (req->resolver->ev_base, &req->io_event);
event_add (&req->io_event, &req->tv);
register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, FALSE);
return 0;
@@ -754,6 +755,7 @@ send_dns_request (struct rspamd_dns_request *req)
}
else if (r < req->pos) {
event_set (&req->io_event, req->sock, EV_WRITE, dns_retransmit_handler, req);
+ event_base_set (req->resolver->ev_base, &req->io_event);
event_add (&req->io_event, &req->tv);
register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, FALSE);
return 0;
@@ -1190,6 +1192,7 @@ dns_check_throttling (struct rspamd_dns_resolver *resolver)
/* Init throttling timeout */
resolver->throttling = TRUE;
evtimer_set (&resolver->throttling_event, dns_throttling_cb, resolver);
+ event_base_set (resolver->ev_base, &resolver->throttling_event);
event_add (&resolver->throttling_event, &resolver->throttling_time);
}
}
@@ -1329,6 +1332,7 @@ dns_retransmit_handler (gint fd, short what, void *arg)
/* Add timer event */
event_del (&req->timer_event);
evtimer_set (&req->timer_event, dns_timer_cb, req);
+ event_base_set (req->resolver->ev_base, &req->timer_event);
evtimer_add (&req->timer_event, &req->tv);
/* Add request to hash table */
@@ -1423,6 +1427,7 @@ make_dns_request (struct rspamd_dns_resolver *resolver,
/* Fill timeout */
msec_to_tv (resolver->request_timeout, &req->tv);
evtimer_set (&req->timer_event, dns_timer_cb, req);
+ event_base_set (req->resolver->ev_base, &req->timer_event);
/* Now send request to server */
r = send_dns_request (req);
@@ -1498,7 +1503,7 @@ parse_resolv_conf (struct rspamd_dns_resolver *resolver)
}
struct rspamd_dns_resolver *
-dns_resolver_init (struct config_file *cfg)
+dns_resolver_init (struct event_base *ev_base, struct config_file *cfg)
{
GList *cur;
struct rspamd_dns_resolver *new;
@@ -1507,6 +1512,7 @@ dns_resolver_init (struct config_file *cfg)
struct rspamd_dns_server *serv;
new = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct rspamd_dns_resolver));
+ new->ev_base = ev_base;
new->requests = g_hash_table_new (g_direct_hash, g_direct_equal);
new->permutor = memory_pool_alloc (cfg->cfg_pool, sizeof (struct dns_k_permutor));
dns_k_permutor_init (new->permutor, 0, G_MAXUINT16);
@@ -1588,6 +1594,7 @@ dns_resolver_init (struct config_file *cfg)
}
else {
event_set (&serv->ev, serv->sock, EV_READ | EV_PERSIST, dns_read_cb, new);
+ event_base_set (new->ev_base, &serv->ev);
event_add (&serv->ev, NULL);
}
}
diff --git a/src/dns.h b/src/dns.h
index 526598e6a..f174f5026 100644
--- a/src/dns.h
+++ b/src/dns.h
@@ -56,6 +56,7 @@ struct rspamd_dns_resolver {
guint errors; /**< resolver errors */
struct timeval throttling_time; /**< throttling time */
struct event throttling_event; /**< throttling event */
+ struct event_base *ev_base; /**< base for event ops */
};
struct dns_header;
@@ -231,7 +232,7 @@ struct dns_query {
/*
* Init DNS resolver, params are obtained from a config file or system file /etc/resolv.conf
*/
-struct rspamd_dns_resolver *dns_resolver_init (struct config_file *cfg);
+struct rspamd_dns_resolver *dns_resolver_init (struct event_base *ev_base, struct config_file *cfg);
/*
* Make a DNS request
diff --git a/src/lmtp.c b/src/lmtp.c
index 043c78485..96dbb2bab 100644
--- a/src/lmtp.c
+++ b/src/lmtp.c
@@ -250,13 +250,14 @@ accept_socket (gint fd, short what, void *arg)
/* Add destructor for recipients list (it would be better to use anonymous function here */
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func) rcpt_destruct, new_task);
new_task->results = g_hash_table_new (g_str_hash, g_str_equal);
+ new_task->ev_base = worker->ctx;
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func) g_hash_table_destroy, new_task->results);
worker->srv->stat->connections_count++;
lmtp->task = new_task;
lmtp->state = LMTP_READ_LHLO;
/* Set up dispatcher */
- new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, lmtp_read_socket, lmtp_write_socket, lmtp_err_socket, &io_tv, (void *)lmtp);
+ new_task->dispatcher = rspamd_create_dispatcher (new_task->ev_base, nfd, BUFFER_LINE, lmtp_read_socket, lmtp_write_socket, lmtp_err_socket, &io_tv, (void *)lmtp);
new_task->dispatcher->peer_addr = new_task->client_addr.s_addr;
if (! rspamd_dispatcher_write (lmtp->task->dispatcher, greetingbuf, strlen (greetingbuf), FALSE, FALSE)) {
msg_warn ("cannot write greeting");
@@ -276,7 +277,7 @@ start_lmtp_worker (struct rspamd_worker *worker)
worker->srv->pid = getpid ();
worker->srv->type = TYPE_LMTP;
- event_init ();
+ worker->ctx = event_init ();
g_mime_init (0);
init_signals (&signals, sig_handler);
@@ -284,10 +285,12 @@ start_lmtp_worker (struct rspamd_worker *worker)
/* SIGUSR2 handler */
signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker);
+ event_base_set (worker->ctx, &worker->sig_ev);
signal_add (&worker->sig_ev, NULL);
/* Accept event */
event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+ event_base_set (worker->ctx, &worker->bind_ev);
event_add (&worker->bind_ev, NULL);
/* Perform modules configuring */
@@ -307,7 +310,7 @@ start_lmtp_worker (struct rspamd_worker *worker)
gperf_profiler_init (worker->srv->cfg, "lmtp");
- event_loop (0);
+ event_base_loop (worker->ctx, 0);
exit (EXIT_SUCCESS);
}
diff --git a/src/lmtp_proto.c b/src/lmtp_proto.c
index 1ea64ddf4..9711b2ca5 100644
--- a/src/lmtp_proto.c
+++ b/src/lmtp_proto.c
@@ -464,7 +464,7 @@ lmtp_deliver_mta (struct worker_task *task)
cd = memory_pool_alloc (task->task_pool, sizeof (struct mta_callback_data));
cd->task = task;
cd->state = LMTP_WANT_GREETING;
- cd->dispatcher = rspamd_create_dispatcher (sock, BUFFER_LINE, mta_read_socket, NULL, mta_err_socket, NULL, (void *)cd);
+ cd->dispatcher = rspamd_create_dispatcher (task->ev_base, sock, BUFFER_LINE, mta_read_socket, NULL, mta_err_socket, NULL, (void *)cd);
return 0;
}
diff --git a/src/lua/lua_http.c b/src/lua/lua_http.c
index 4942b45b4..3764c96cc 100644
--- a/src/lua/lua_http.c
+++ b/src/lua/lua_http.c
@@ -305,8 +305,8 @@ lua_http_dns_callback (struct rspamd_dns_reply *reply, gpointer arg)
/* Create dispatcher for HTTP protocol */
msec_to_tv (ud->timeout, &tv);
- ud->io_dispatcher = rspamd_create_dispatcher (ud->fd, BUFFER_LINE, lua_http_read_cb, NULL, lua_http_err_cb,
- &tv, ud);
+ ud->io_dispatcher = rspamd_create_dispatcher (ud->task->ev_base, ud->fd, BUFFER_LINE, lua_http_read_cb, NULL,
+ lua_http_err_cb, &tv, ud);
/* Write request */
register_async_event (ud->task->s, lua_http_fin, ud, FALSE);
diff --git a/src/main.h b/src/main.h
index f979bf69a..fa72c7794 100644
--- a/src/main.h
+++ b/src/main.h
@@ -165,6 +165,7 @@ struct controller_session {
struct rspamd_async_session* s; /**< async session object */
struct worker_task *learn_task;
struct rspamd_dns_resolver *resolver; /**< DNS resolver */
+ struct event_base *ev_base; /**< Event base */
};
typedef void (*controller_func_t)(gchar **args, struct controller_session *session);
@@ -247,6 +248,7 @@ struct worker_task {
guint32 dns_requests; /**< number of DNS requests per this task */
struct rspamd_dns_resolver *resolver; /**< DNS resolver */
+ struct event_base *ev_base; /**< Event base */
};
/**
diff --git a/src/map.c b/src/map.c
index b37082bd0..c82bd1ceb 100644
--- a/src/map.c
+++ b/src/map.c
@@ -46,6 +46,7 @@ struct http_reply {
struct http_callback_data {
struct event ev;
+ struct event_base *ev_base;
struct timeval tv;
struct rspamd_map *map;
struct http_map_data *data;
@@ -912,6 +913,7 @@ http_async_callback (gint fd, short what, void *ud)
write_http_request (cbd->map, cbd->data, fd);
/* Plan reading */
event_set (&cbd->ev, cbd->fd, EV_READ | EV_PERSIST, http_async_callback, cbd);
+ event_base_set (cbd->ev_base, &cbd->ev);
cbd->tv.tv_sec = HTTP_READ_TIMEOUT;
cbd->tv.tv_usec = 0;
cbd->state = 1;
@@ -997,7 +999,9 @@ http_callback (gint fd, short what, void *ud)
else {
/* Plan event */
cbd = g_malloc (sizeof (struct http_callback_data));
+ cbd->ev_base = map->ev_base;
event_set (&cbd->ev, sock, EV_WRITE, http_async_callback, cbd);
+ event_base_set (cbd->ev_base, &cbd->ev);
cbd->tv.tv_sec = HTTP_CONNECT_TIMEOUT;
cbd->tv.tv_usec = 0;
cbd->map = map;
@@ -1011,7 +1015,7 @@ http_callback (gint fd, short what, void *ud)
/* Start watching event for all maps */
void
-start_map_watch (void)
+start_map_watch (struct event_base *ev_base)
{
GList *cur = maps;
struct rspamd_map *map;
@@ -1019,8 +1023,10 @@ start_map_watch (void)
/* First of all do synced read of data */
while (cur) {
map = cur->data;
+ map->ev_base = ev_base;
if (map->protocol == PROTO_FILE) {
evtimer_set (&map->ev, file_callback, map);
+ event_base_set (map->ev_base, &map->ev);
/* Read initial data */
read_map_file (map, map->map_data);
/* Plan event with jitter */
@@ -1030,6 +1036,7 @@ start_map_watch (void)
}
else if (map->protocol == PROTO_HTTP) {
evtimer_set (&map->ev, http_callback, map);
+ event_base_set (map->ev_base, &map->ev);
/* Read initial data */
read_http_sync (map, map->map_data);
/* Plan event with jitter */
diff --git a/src/map.h b/src/map.h
index 23e38a786..f4aae10ce 100644
--- a/src/map.h
+++ b/src/map.h
@@ -66,6 +66,7 @@ struct rspamd_map {
void **user_data;
struct event ev;
struct timeval tv;
+ struct event_base *ev_base;
void *map_data;
};
@@ -81,7 +82,7 @@ gboolean add_map (const gchar *map_line, map_cb_t read_callback, map_fin_cb_t fi
/**
* Start watching of maps by adding events to libevent event loop
*/
-void start_map_watch (void);
+void start_map_watch (struct event_base *ev_base);
/**
* Remove all maps watched (remove events)
diff --git a/src/smtp.c b/src/smtp.c
index 5e25fb568..3c6726d97 100644
--- a/src/smtp.c
+++ b/src/smtp.c
@@ -684,6 +684,7 @@ accept_socket (gint fd, short what, void *arg)
session->cfg = worker->srv->cfg;
session->session_time = time (NULL);
session->resolver = ctx->resolver;
+ session->ev_base = ctx->ev_base;
worker->srv->stat->connections_count++;
/* Resolve client's addr */
@@ -698,7 +699,7 @@ accept_socket (gint fd, short what, void *arg)
return;
}
else {
- session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE,
+ session->dispatcher = rspamd_create_dispatcher (session->ev_base, nfd, BUFFER_LINE,
smtp_read_socket, smtp_write_socket, smtp_err_socket, &session->ctx->smtp_timeout, session);
session->dispatcher->peer_addr = session->client_addr.s_addr;
}
@@ -963,8 +964,6 @@ config_smtp_worker (struct rspamd_worker *worker)
if ((value = ctx->smtp_capabilities_str) != NULL) {
make_capabilities (ctx, value);
}
-
- ctx->resolver = dns_resolver_init (worker->srv->cfg);
return TRUE;
}
@@ -977,11 +976,12 @@ void
start_smtp_worker (struct rspamd_worker *worker)
{
struct sigaction signals;
+ struct smtp_worker_ctx *ctx = worker->ctx;
gperf_profiler_init (worker->srv->cfg, "worker");
worker->srv->pid = getpid ();
- event_init ();
+ ctx->ev_base = event_init ();
/* Set smtp options */
if ( !config_smtp_worker (worker)) {
@@ -994,19 +994,24 @@ start_smtp_worker (struct rspamd_worker *worker)
/* SIGUSR2 handler */
signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker);
+ event_base_set (ctx->ev_base, &worker->sig_ev);
signal_add (&worker->sig_ev, NULL);
/* Accept event */
event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+ event_base_set (ctx->ev_base, &worker->bind_ev);
event_add (&worker->bind_ev, NULL);
/* Maps events */
- start_map_watch ();
+ start_map_watch (ctx->ev_base);
+
+ /* DNS resolver */
+ ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
/* Set umask */
umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP);
- event_loop (0);
+ event_base_loop (ctx->ev_base, 0);
close_log (rspamd_main->logger);
exit (EXIT_SUCCESS);
diff --git a/src/smtp.h b/src/smtp.h
index a5a9533bc..00d8c3abf 100644
--- a/src/smtp.h
+++ b/src/smtp.h
@@ -51,6 +51,7 @@ struct smtp_worker_ctx {
gchar *metric;
GList *smtp_filters[SMTP_STAGE_MAX];
struct rspamd_dns_resolver *resolver;
+ struct event_base *ev_base;
};
enum rspamd_smtp_state {
@@ -110,6 +111,7 @@ struct smtp_session {
gboolean resolved;
gboolean esmtp;
struct rspamd_dns_resolver *resolver;
+ struct event_base *ev_base;
};
typedef gboolean (*smtp_filter_t)(struct smtp_session *session, gpointer filter_data);
diff --git a/src/smtp_utils.c b/src/smtp_utils.c
index 1ed92ead6..c56397d17 100644
--- a/src/smtp_utils.c
+++ b/src/smtp_utils.c
@@ -89,7 +89,7 @@ create_smtp_upstream_connection (struct smtp_session *session)
return FALSE;
}
/* Create a dispatcher for upstream connection */
- session->upstream_dispatcher = rspamd_create_dispatcher (session->upstream_sock, BUFFER_LINE,
+ session->upstream_dispatcher = rspamd_create_dispatcher (session->ev_base, session->upstream_sock, BUFFER_LINE,
smtp_upstream_read_socket, smtp_upstream_write_socket, smtp_upstream_err_socket,
&session->ctx->smtp_timeout, session);
session->state = SMTP_STATE_WAIT_UPSTREAM;
diff --git a/src/statfile_sync.c b/src/statfile_sync.c
index e96555c15..4595af85f 100644
--- a/src/statfile_sync.c
+++ b/src/statfile_sync.c
@@ -44,6 +44,7 @@ struct rspamd_sync_ctx {
stat_file_t *real_statfile;
statfile_pool_t *pool;
rspamd_io_dispatcher_t *dispatcher;
+ struct event_base *ev_base;
struct event tm_ev;
@@ -268,7 +269,7 @@ sync_timer_callback (gint fd, short what, void *ud)
}
/* Now create and activate dispatcher */
msec_to_tv (ctx->timeout, &ctx->io_tv);
- ctx->dispatcher = rspamd_create_dispatcher (ctx->sock, BUFFER_LINE, sync_read, NULL, sync_err, &ctx->io_tv, ctx);
+ ctx->dispatcher = rspamd_create_dispatcher (ctx->ev_base, ctx->sock, BUFFER_LINE, sync_read, NULL, sync_err, &ctx->io_tv, ctx);
ctx->state = SYNC_STATE_GREETING;
ctx->is_busy = TRUE;
@@ -278,7 +279,7 @@ sync_timer_callback (gint fd, short what, void *ud)
}
static gboolean
-add_statfile_watch (statfile_pool_t *pool, struct statfile *st, struct config_file *cfg)
+add_statfile_watch (statfile_pool_t *pool, struct statfile *st, struct config_file *cfg, struct event_base *ev_base)
{
struct rspamd_sync_ctx *ctx;
guint32 jittered_interval;
@@ -289,6 +290,7 @@ add_statfile_watch (statfile_pool_t *pool, struct statfile *st, struct config_fi
ctx->st = st;
ctx->timeout = cfg->statfile_sync_timeout;
ctx->sync_interval = cfg->statfile_sync_interval;
+ ctx->ev_base = ev_base;
/* Add some jittering for synchronization */
jittered_interval = g_random_int_range (ctx->sync_interval, ctx->sync_interval * 2);
msec_to_tv (jittered_interval, &ctx->interval);
@@ -305,6 +307,7 @@ add_statfile_watch (statfile_pool_t *pool, struct statfile *st, struct config_fi
}
/* Now plan event for it's future executing */
evtimer_set (&ctx->tm_ev, sync_timer_callback, ctx);
+ event_base_set (ctx->ev_base, &ctx->tm_ev);
evtimer_add (&ctx->tm_ev, &ctx->interval);
log_next_sync (st->symbol, ctx->interval.tv_sec);
}
@@ -317,7 +320,7 @@ add_statfile_watch (statfile_pool_t *pool, struct statfile *st, struct config_fi
}
gboolean
-start_statfile_sync (statfile_pool_t *pool, struct config_file *cfg)
+start_statfile_sync (statfile_pool_t *pool, struct config_file *cfg, struct event_base *ev_base)
{
GList *cur, *l;
struct classifier_config *cl;
@@ -334,7 +337,7 @@ start_statfile_sync (statfile_pool_t *pool, struct config_file *cfg)
while (l) {
st = l->data;
if (st->binlog != NULL && st->binlog->affinity == AFFINITY_SLAVE) {
- if (!add_statfile_watch (pool, st, cfg)) {
+ if (!add_statfile_watch (pool, st, cfg, ev_base)) {
return FALSE;
}
}
diff --git a/src/statfile_sync.h b/src/statfile_sync.h
index fcc305b55..b3abb8b91 100644
--- a/src/statfile_sync.h
+++ b/src/statfile_sync.h
@@ -9,6 +9,6 @@
/*
* Start synchronization of statfiles. Must be called after event_init as it adds events
*/
-gboolean start_statfile_sync (statfile_pool_t *pool, struct config_file *cfg);
+gboolean start_statfile_sync (statfile_pool_t *pool, struct config_file *cfg, struct event_base *ev_base);
#endif
diff --git a/src/worker.c b/src/worker.c
index fb67b119e..bd2c5de2e 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -91,6 +91,7 @@ struct rspamd_worker_ctx {
guint32 tasks;
/* Limit of tasks */
guint32 max_tasks;
+ struct event_base *ev_base;
};
static gboolean write_socket (void *arg);
@@ -477,9 +478,10 @@ accept_socket (gint fd, short what, void *arg)
/* Set up dispatcher */
new_task->dispatcher =
- rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, write_socket,
+ rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, read_socket, write_socket,
err_socket, &ctx->io_tv, (void *) new_task);
new_task->dispatcher->peer_addr = new_task->client_addr.s_addr;
+ new_task->ev_base = ctx->ev_base;
ctx->tasks ++;
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)reduce_tasks_count, &ctx->tasks);
@@ -638,18 +640,20 @@ start_worker (struct rspamd_worker *worker)
worker->srv->pid = getpid ();
- event_init ();
+ ctx->ev_base = event_init ();
init_signals (&signals, sig_handler);
sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
/* SIGUSR2 handler */
signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker);
+ event_base_set (ctx->ev_base, &worker->sig_ev);
signal_add (&worker->sig_ev, NULL);
/* Accept event */
event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST,
accept_socket, (void *) worker);
+ event_base_set (ctx->ev_base, &worker->bind_ev);
event_add (&worker->bind_ev, NULL);
@@ -665,14 +669,14 @@ start_worker (struct rspamd_worker *worker)
else {
#endif
/* Maps events */
- start_map_watch ();
+ start_map_watch (ctx->ev_base);
#ifndef BUILD_STATIC
}
#endif
- ctx->resolver = dns_resolver_init (worker->srv->cfg);
+ ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
- event_loop (0);
+ event_base_loop (ctx->ev_base, 0);
#ifndef BUILD_STATIC
if (ctx->is_custom) {