diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2011-10-26 21:13:19 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2011-10-26 21:13:19 +0400 |
commit | 608432786ad77ce7ce071dd975d6c59d503d2302 (patch) | |
tree | d3991e93c04b0eef602afab272e7316490ba1d54 /src/controller.c | |
parent | 2e15cacc80101d91108be8aaa4ea722f31d22d6b (diff) | |
download | rspamd-608432786ad77ce7ce071dd975d6c59d503d2302.tar.gz rspamd-608432786ad77ce7ce071dd975d6c59d503d2302.zip |
* Use event_base thread safe API to allow parallelism based on threads
Diffstat (limited to 'src/controller.c')
-rw-r--r-- | src/controller.c | 21 |
1 files changed, 15 insertions, 6 deletions
diff --git a/src/controller.c b/src/controller.c index cb0c3cd22..07eeef0ff 100644 --- a/src/controller.c +++ b/src/controller.c @@ -77,6 +77,7 @@ struct rspamd_controller_ctx { char *password; guint32 timeout; struct rspamd_dns_resolver *resolver; + struct event_base *ev_base; }; static struct controller_command commands[] = { @@ -880,6 +881,7 @@ controller_read_socket (f_str_t * in, void *arg) task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t)); task->msg->begin = in->begin; task->msg->len = in->len; + task->ev_base = session->ev_base; r = process_message (task); if (r == -1) { @@ -925,6 +927,7 @@ controller_read_socket (f_str_t * in, void *arg) task->msg->len = in->len; task->resolver = session->resolver; + task->ev_base = session->ev_base; r = process_message (task); if (r == -1) { @@ -983,6 +986,7 @@ controller_read_socket (f_str_t * in, void *arg) task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t)); task->msg->begin = in->begin; task->msg->len = in->len; + task->ev_base = session->ev_base; r = process_message (task); if (r == -1) { @@ -1166,6 +1170,7 @@ accept_socket (gint fd, short what, void *arg) new_session->state = STATE_COMMAND; new_session->session_pool = memory_pool_new (memory_pool_get_size () - 1); new_session->resolver = ctx->resolver; + new_session->ev_base = ctx->ev_base; worker->srv->stat->control_connections_count++; /* Set up dispatcher */ @@ -1175,7 +1180,9 @@ accept_socket (gint fd, short what, void *arg) new_session->s = new_async_session (new_session->session_pool, free_session, new_session); - new_session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, controller_read_socket, controller_write_socket, controller_err_socket, io_tv, (void *)new_session); + new_session->dispatcher = rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, controller_read_socket, + controller_write_socket, controller_err_socket, io_tv, (void *)new_session); + if (su.ss.ss_family == AF_UNIX) { msg_info ("accepted connection from unix socket"); new_session->dispatcher->peer_addr = INADDR_LOOPBACK; @@ -1217,7 +1224,7 @@ start_controller (struct rspamd_worker *worker) worker->srv->pid = getpid (); ctx = worker->ctx; - event_init (); + ctx->ev_base = event_init (); g_mime_init (0); init_signals (&signals, sig_handler); @@ -1225,13 +1232,14 @@ start_controller (struct rspamd_worker *worker) /* SIGUSR2 handler */ signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker); + event_base_set (ctx->ev_base, &worker->sig_ev); signal_add (&worker->sig_ev, NULL); start_time = time (NULL); /* Start statfile synchronization */ - if (!start_statfile_sync (worker->srv->statfile_pool, worker->srv->cfg)) { + if (!start_statfile_sync (worker->srv->statfile_pool, worker->srv->cfg, ctx->ev_base)) { msg_info ("cannot start statfile synchronization, statfiles would not be synchronized"); } @@ -1243,14 +1251,15 @@ start_controller (struct rspamd_worker *worker) rspamd_snprintf (greetingbuf, sizeof (greetingbuf), "Rspamd version %s is running on %s" CRLF, RVERSION, hostbuf); /* Accept event */ event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); + event_base_set (ctx->ev_base, &worker->bind_ev); event_add (&worker->bind_ev, NULL); - start_map_watch (); - ctx->resolver = dns_resolver_init (worker->srv->cfg); + start_map_watch (ctx->ev_base); + ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg); gperf_profiler_init (worker->srv->cfg, "controller"); - event_loop (0); + event_base_loop (ctx->ev_base, 0); close_log (worker->srv->logger); |