aboutsummaryrefslogtreecommitdiffstats
path: root/src/controller.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2011-10-26 21:13:19 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2011-10-26 21:13:19 +0400
commit608432786ad77ce7ce071dd975d6c59d503d2302 (patch)
treed3991e93c04b0eef602afab272e7316490ba1d54 /src/controller.c
parent2e15cacc80101d91108be8aaa4ea722f31d22d6b (diff)
downloadrspamd-608432786ad77ce7ce071dd975d6c59d503d2302.tar.gz
rspamd-608432786ad77ce7ce071dd975d6c59d503d2302.zip
* Use event_base thread safe API to allow parallelism based on threads
Diffstat (limited to 'src/controller.c')
-rw-r--r--src/controller.c21
1 files changed, 15 insertions, 6 deletions
diff --git a/src/controller.c b/src/controller.c
index cb0c3cd22..07eeef0ff 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -77,6 +77,7 @@ struct rspamd_controller_ctx {
char *password;
guint32 timeout;
struct rspamd_dns_resolver *resolver;
+ struct event_base *ev_base;
};
static struct controller_command commands[] = {
@@ -880,6 +881,7 @@ controller_read_socket (f_str_t * in, void *arg)
task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
task->msg->begin = in->begin;
task->msg->len = in->len;
+ task->ev_base = session->ev_base;
r = process_message (task);
if (r == -1) {
@@ -925,6 +927,7 @@ controller_read_socket (f_str_t * in, void *arg)
task->msg->len = in->len;
task->resolver = session->resolver;
+ task->ev_base = session->ev_base;
r = process_message (task);
if (r == -1) {
@@ -983,6 +986,7 @@ controller_read_socket (f_str_t * in, void *arg)
task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
task->msg->begin = in->begin;
task->msg->len = in->len;
+ task->ev_base = session->ev_base;
r = process_message (task);
if (r == -1) {
@@ -1166,6 +1170,7 @@ accept_socket (gint fd, short what, void *arg)
new_session->state = STATE_COMMAND;
new_session->session_pool = memory_pool_new (memory_pool_get_size () - 1);
new_session->resolver = ctx->resolver;
+ new_session->ev_base = ctx->ev_base;
worker->srv->stat->control_connections_count++;
/* Set up dispatcher */
@@ -1175,7 +1180,9 @@ accept_socket (gint fd, short what, void *arg)
new_session->s = new_async_session (new_session->session_pool, free_session, new_session);
- new_session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, controller_read_socket, controller_write_socket, controller_err_socket, io_tv, (void *)new_session);
+ new_session->dispatcher = rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, controller_read_socket,
+ controller_write_socket, controller_err_socket, io_tv, (void *)new_session);
+
if (su.ss.ss_family == AF_UNIX) {
msg_info ("accepted connection from unix socket");
new_session->dispatcher->peer_addr = INADDR_LOOPBACK;
@@ -1217,7 +1224,7 @@ start_controller (struct rspamd_worker *worker)
worker->srv->pid = getpid ();
ctx = worker->ctx;
- event_init ();
+ ctx->ev_base = event_init ();
g_mime_init (0);
init_signals (&signals, sig_handler);
@@ -1225,13 +1232,14 @@ start_controller (struct rspamd_worker *worker)
/* SIGUSR2 handler */
signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker);
+ event_base_set (ctx->ev_base, &worker->sig_ev);
signal_add (&worker->sig_ev, NULL);
start_time = time (NULL);
/* Start statfile synchronization */
- if (!start_statfile_sync (worker->srv->statfile_pool, worker->srv->cfg)) {
+ if (!start_statfile_sync (worker->srv->statfile_pool, worker->srv->cfg, ctx->ev_base)) {
msg_info ("cannot start statfile synchronization, statfiles would not be synchronized");
}
@@ -1243,14 +1251,15 @@ start_controller (struct rspamd_worker *worker)
rspamd_snprintf (greetingbuf, sizeof (greetingbuf), "Rspamd version %s is running on %s" CRLF, RVERSION, hostbuf);
/* Accept event */
event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+ event_base_set (ctx->ev_base, &worker->bind_ev);
event_add (&worker->bind_ev, NULL);
- start_map_watch ();
- ctx->resolver = dns_resolver_init (worker->srv->cfg);
+ start_map_watch (ctx->ev_base);
+ ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
gperf_profiler_init (worker->srv->cfg, "controller");
- event_loop (0);
+ event_base_loop (ctx->ev_base, 0);
close_log (worker->srv->logger);