From 608432786ad77ce7ce071dd975d6c59d503d2302 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Wed, 26 Oct 2011 21:13:19 +0400 Subject: [PATCH] * Use event_base thread safe API to allow parallelism based on threads --- src/buffer.c | 17 ++++++++++++++++- src/buffer.h | 3 ++- src/controller.c | 21 +++++++++++++++------ src/dns.c | 9 ++++++++- src/dns.h | 3 ++- src/lmtp.c | 9 ++++++--- src/lmtp_proto.c | 2 +- src/lua/lua_http.c | 4 ++-- src/main.h | 2 ++ src/map.c | 9 ++++++++- src/map.h | 3 ++- src/smtp.c | 17 +++++++++++------ src/smtp.h | 2 ++ src/smtp_utils.c | 2 +- src/statfile_sync.c | 11 +++++++---- src/statfile_sync.h | 2 +- src/worker.c | 14 +++++++++----- 17 files changed, 95 insertions(+), 35 deletions(-) diff --git a/src/buffer.c b/src/buffer.c index a1f4bee9b..c94d5f91c 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -66,6 +66,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d) d->offset += off; event_del (d->ev); event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); + event_base_set (d->ev_base, d->ev); event_add (d->ev, d->tv); } } @@ -78,6 +79,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d) } event_del (d->ev); event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); + event_base_set (d->ev_base, d->ev); event_add (d->ev, d->tv); d->in_sendfile = FALSE; } @@ -98,6 +100,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d) /* Wait for other event */ event_del (d->ev); event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); + event_base_set (d->ev_base, d->ev); event_add (d->ev, d->tv); } } @@ -106,6 +109,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d) /* Wait for other event */ event_del (d->ev); event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); + event_base_set (d->ev_base, d->ev); event_add (d->ev, d->tv); } else { @@ -117,6 +121,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d) } event_del (d->ev); event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); + event_base_set (d->ev_base, d->ev); event_add (d->ev, d->tv); d->in_sendfile = FALSE; } @@ -137,6 +142,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d) /* Wait for other event */ event_del (d->ev); event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); + event_base_set (d->ev_base, d->ev); event_add (d->ev, d->tv); } } @@ -146,6 +152,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d) /* Wait for other event */ event_del (d->ev); event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); + event_base_set (d->ev_base, d->ev); event_add (d->ev, d->tv); } else { @@ -157,6 +164,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d) } event_del (d->ev); event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); + event_base_set (d->ev_base, d->ev); event_add (d->ev, d->tv); d->in_sendfile = FALSE; } @@ -215,6 +223,7 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed) /* Wait for other event */ event_del (d->ev); event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d); + event_base_set (d->ev_base, d->ev); event_add (d->ev, d->tv); return TRUE; } @@ -237,12 +246,14 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed) event_del (d->ev); event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); + event_base_set (d->ev_base, d->ev); event_add (d->ev, d->tv); } else { /* Plan other write event */ event_del (d->ev); event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d); + event_base_set (d->ev_base, d->ev); event_add (d->ev, d->tv); } @@ -464,6 +475,7 @@ dispatcher_cb (gint fd, short what, void *arg) if (d->out_buffers == NULL) { event_del (d->ev); event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); + event_base_set (d->ev_base, d->ev); event_add (d->ev, d->tv); if (d->is_restored && d->write_callback) { if (!d->write_callback (d->user_data)) { @@ -482,7 +494,7 @@ dispatcher_cb (gint fd, short what, void *arg) rspamd_io_dispatcher_t * -rspamd_create_dispatcher (gint fd, enum io_policy policy, +rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy policy, dispatcher_read_callback_t read_cb, dispatcher_write_callback_t write_cb, dispatcher_err_callback_t err_cb, struct timeval *tv, void *user_data) { rspamd_io_dispatcher_t *new; @@ -513,8 +525,10 @@ rspamd_create_dispatcher (gint fd, enum io_policy policy, new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event)); new->fd = fd; + new->ev_base = base; event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new); + event_base_set (new->ev_base, new->ev); event_add (new->ev, new->tv); return new; @@ -647,6 +661,7 @@ rspamd_dispatcher_restore (rspamd_io_dispatcher_t * d) debug_ip ("restored dispatcher"); event_del (d->ev); event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, d); + event_base_set (d->ev_base, d->ev); event_add (d->ev, d->tv); d->is_restored = TRUE; } diff --git a/src/buffer.h b/src/buffer.h index 16394ae7d..51b321833 100644 --- a/src/buffer.h +++ b/src/buffer.h @@ -52,6 +52,7 @@ typedef struct rspamd_io_dispatcher_s { gboolean in_sendfile; /**< whether buffer is in sendfile mode */ gboolean strip_eol; /**< strip or not line ends in BUFFER_LINE policy */ gboolean is_restored; /**< call a callback when dispatcher is restored */ + struct event_base *ev_base; /**< event base for io operations */ #ifndef HAVE_SENDFILE void *map; #endif @@ -68,7 +69,7 @@ typedef struct rspamd_io_dispatcher_s { * @param user_data pointer to user's data * @return new dispatcher object or NULL in case of failure */ -rspamd_io_dispatcher_t* rspamd_create_dispatcher (gint fd, +rspamd_io_dispatcher_t* rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy policy, dispatcher_read_callback_t read_cb, dispatcher_write_callback_t write_cb, diff --git a/src/controller.c b/src/controller.c index cb0c3cd22..07eeef0ff 100644 --- a/src/controller.c +++ b/src/controller.c @@ -77,6 +77,7 @@ struct rspamd_controller_ctx { char *password; guint32 timeout; struct rspamd_dns_resolver *resolver; + struct event_base *ev_base; }; static struct controller_command commands[] = { @@ -880,6 +881,7 @@ controller_read_socket (f_str_t * in, void *arg) task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t)); task->msg->begin = in->begin; task->msg->len = in->len; + task->ev_base = session->ev_base; r = process_message (task); if (r == -1) { @@ -925,6 +927,7 @@ controller_read_socket (f_str_t * in, void *arg) task->msg->len = in->len; task->resolver = session->resolver; + task->ev_base = session->ev_base; r = process_message (task); if (r == -1) { @@ -983,6 +986,7 @@ controller_read_socket (f_str_t * in, void *arg) task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t)); task->msg->begin = in->begin; task->msg->len = in->len; + task->ev_base = session->ev_base; r = process_message (task); if (r == -1) { @@ -1166,6 +1170,7 @@ accept_socket (gint fd, short what, void *arg) new_session->state = STATE_COMMAND; new_session->session_pool = memory_pool_new (memory_pool_get_size () - 1); new_session->resolver = ctx->resolver; + new_session->ev_base = ctx->ev_base; worker->srv->stat->control_connections_count++; /* Set up dispatcher */ @@ -1175,7 +1180,9 @@ accept_socket (gint fd, short what, void *arg) new_session->s = new_async_session (new_session->session_pool, free_session, new_session); - new_session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, controller_read_socket, controller_write_socket, controller_err_socket, io_tv, (void *)new_session); + new_session->dispatcher = rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, controller_read_socket, + controller_write_socket, controller_err_socket, io_tv, (void *)new_session); + if (su.ss.ss_family == AF_UNIX) { msg_info ("accepted connection from unix socket"); new_session->dispatcher->peer_addr = INADDR_LOOPBACK; @@ -1217,7 +1224,7 @@ start_controller (struct rspamd_worker *worker) worker->srv->pid = getpid (); ctx = worker->ctx; - event_init (); + ctx->ev_base = event_init (); g_mime_init (0); init_signals (&signals, sig_handler); @@ -1225,13 +1232,14 @@ start_controller (struct rspamd_worker *worker) /* SIGUSR2 handler */ signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker); + event_base_set (ctx->ev_base, &worker->sig_ev); signal_add (&worker->sig_ev, NULL); start_time = time (NULL); /* Start statfile synchronization */ - if (!start_statfile_sync (worker->srv->statfile_pool, worker->srv->cfg)) { + if (!start_statfile_sync (worker->srv->statfile_pool, worker->srv->cfg, ctx->ev_base)) { msg_info ("cannot start statfile synchronization, statfiles would not be synchronized"); } @@ -1243,14 +1251,15 @@ start_controller (struct rspamd_worker *worker) rspamd_snprintf (greetingbuf, sizeof (greetingbuf), "Rspamd version %s is running on %s" CRLF, RVERSION, hostbuf); /* Accept event */ event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); + event_base_set (ctx->ev_base, &worker->bind_ev); event_add (&worker->bind_ev, NULL); - start_map_watch (); - ctx->resolver = dns_resolver_init (worker->srv->cfg); + start_map_watch (ctx->ev_base); + ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg); gperf_profiler_init (worker->srv->cfg, "controller"); - event_loop (0); + event_base_loop (ctx->ev_base, 0); close_log (worker->srv->logger); diff --git a/src/dns.c b/src/dns.c index 2e9fd34c4..c3dd87a32 100644 --- a/src/dns.c +++ b/src/dns.c @@ -742,6 +742,7 @@ send_dns_request (struct rspamd_dns_request *req) if (r == -1) { if (errno == EAGAIN) { event_set (&req->io_event, req->sock, EV_WRITE, dns_retransmit_handler, req); + event_base_set (req->resolver->ev_base, &req->io_event); event_add (&req->io_event, &req->tv); register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, FALSE); return 0; @@ -754,6 +755,7 @@ send_dns_request (struct rspamd_dns_request *req) } else if (r < req->pos) { event_set (&req->io_event, req->sock, EV_WRITE, dns_retransmit_handler, req); + event_base_set (req->resolver->ev_base, &req->io_event); event_add (&req->io_event, &req->tv); register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, FALSE); return 0; @@ -1190,6 +1192,7 @@ dns_check_throttling (struct rspamd_dns_resolver *resolver) /* Init throttling timeout */ resolver->throttling = TRUE; evtimer_set (&resolver->throttling_event, dns_throttling_cb, resolver); + event_base_set (resolver->ev_base, &resolver->throttling_event); event_add (&resolver->throttling_event, &resolver->throttling_time); } } @@ -1329,6 +1332,7 @@ dns_retransmit_handler (gint fd, short what, void *arg) /* Add timer event */ event_del (&req->timer_event); evtimer_set (&req->timer_event, dns_timer_cb, req); + event_base_set (req->resolver->ev_base, &req->timer_event); evtimer_add (&req->timer_event, &req->tv); /* Add request to hash table */ @@ -1423,6 +1427,7 @@ make_dns_request (struct rspamd_dns_resolver *resolver, /* Fill timeout */ msec_to_tv (resolver->request_timeout, &req->tv); evtimer_set (&req->timer_event, dns_timer_cb, req); + event_base_set (req->resolver->ev_base, &req->timer_event); /* Now send request to server */ r = send_dns_request (req); @@ -1498,7 +1503,7 @@ parse_resolv_conf (struct rspamd_dns_resolver *resolver) } struct rspamd_dns_resolver * -dns_resolver_init (struct config_file *cfg) +dns_resolver_init (struct event_base *ev_base, struct config_file *cfg) { GList *cur; struct rspamd_dns_resolver *new; @@ -1507,6 +1512,7 @@ dns_resolver_init (struct config_file *cfg) struct rspamd_dns_server *serv; new = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct rspamd_dns_resolver)); + new->ev_base = ev_base; new->requests = g_hash_table_new (g_direct_hash, g_direct_equal); new->permutor = memory_pool_alloc (cfg->cfg_pool, sizeof (struct dns_k_permutor)); dns_k_permutor_init (new->permutor, 0, G_MAXUINT16); @@ -1588,6 +1594,7 @@ dns_resolver_init (struct config_file *cfg) } else { event_set (&serv->ev, serv->sock, EV_READ | EV_PERSIST, dns_read_cb, new); + event_base_set (new->ev_base, &serv->ev); event_add (&serv->ev, NULL); } } diff --git a/src/dns.h b/src/dns.h index 526598e6a..f174f5026 100644 --- a/src/dns.h +++ b/src/dns.h @@ -56,6 +56,7 @@ struct rspamd_dns_resolver { guint errors; /**< resolver errors */ struct timeval throttling_time; /**< throttling time */ struct event throttling_event; /**< throttling event */ + struct event_base *ev_base; /**< base for event ops */ }; struct dns_header; @@ -231,7 +232,7 @@ struct dns_query { /* * Init DNS resolver, params are obtained from a config file or system file /etc/resolv.conf */ -struct rspamd_dns_resolver *dns_resolver_init (struct config_file *cfg); +struct rspamd_dns_resolver *dns_resolver_init (struct event_base *ev_base, struct config_file *cfg); /* * Make a DNS request diff --git a/src/lmtp.c b/src/lmtp.c index 043c78485..96dbb2bab 100644 --- a/src/lmtp.c +++ b/src/lmtp.c @@ -250,13 +250,14 @@ accept_socket (gint fd, short what, void *arg) /* Add destructor for recipients list (it would be better to use anonymous function here */ memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func) rcpt_destruct, new_task); new_task->results = g_hash_table_new (g_str_hash, g_str_equal); + new_task->ev_base = worker->ctx; memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func) g_hash_table_destroy, new_task->results); worker->srv->stat->connections_count++; lmtp->task = new_task; lmtp->state = LMTP_READ_LHLO; /* Set up dispatcher */ - new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, lmtp_read_socket, lmtp_write_socket, lmtp_err_socket, &io_tv, (void *)lmtp); + new_task->dispatcher = rspamd_create_dispatcher (new_task->ev_base, nfd, BUFFER_LINE, lmtp_read_socket, lmtp_write_socket, lmtp_err_socket, &io_tv, (void *)lmtp); new_task->dispatcher->peer_addr = new_task->client_addr.s_addr; if (! rspamd_dispatcher_write (lmtp->task->dispatcher, greetingbuf, strlen (greetingbuf), FALSE, FALSE)) { msg_warn ("cannot write greeting"); @@ -276,7 +277,7 @@ start_lmtp_worker (struct rspamd_worker *worker) worker->srv->pid = getpid (); worker->srv->type = TYPE_LMTP; - event_init (); + worker->ctx = event_init (); g_mime_init (0); init_signals (&signals, sig_handler); @@ -284,10 +285,12 @@ start_lmtp_worker (struct rspamd_worker *worker) /* SIGUSR2 handler */ signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker); + event_base_set (worker->ctx, &worker->sig_ev); signal_add (&worker->sig_ev, NULL); /* Accept event */ event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); + event_base_set (worker->ctx, &worker->bind_ev); event_add (&worker->bind_ev, NULL); /* Perform modules configuring */ @@ -307,7 +310,7 @@ start_lmtp_worker (struct rspamd_worker *worker) gperf_profiler_init (worker->srv->cfg, "lmtp"); - event_loop (0); + event_base_loop (worker->ctx, 0); exit (EXIT_SUCCESS); } diff --git a/src/lmtp_proto.c b/src/lmtp_proto.c index 1ea64ddf4..9711b2ca5 100644 --- a/src/lmtp_proto.c +++ b/src/lmtp_proto.c @@ -464,7 +464,7 @@ lmtp_deliver_mta (struct worker_task *task) cd = memory_pool_alloc (task->task_pool, sizeof (struct mta_callback_data)); cd->task = task; cd->state = LMTP_WANT_GREETING; - cd->dispatcher = rspamd_create_dispatcher (sock, BUFFER_LINE, mta_read_socket, NULL, mta_err_socket, NULL, (void *)cd); + cd->dispatcher = rspamd_create_dispatcher (task->ev_base, sock, BUFFER_LINE, mta_read_socket, NULL, mta_err_socket, NULL, (void *)cd); return 0; } diff --git a/src/lua/lua_http.c b/src/lua/lua_http.c index 4942b45b4..3764c96cc 100644 --- a/src/lua/lua_http.c +++ b/src/lua/lua_http.c @@ -305,8 +305,8 @@ lua_http_dns_callback (struct rspamd_dns_reply *reply, gpointer arg) /* Create dispatcher for HTTP protocol */ msec_to_tv (ud->timeout, &tv); - ud->io_dispatcher = rspamd_create_dispatcher (ud->fd, BUFFER_LINE, lua_http_read_cb, NULL, lua_http_err_cb, - &tv, ud); + ud->io_dispatcher = rspamd_create_dispatcher (ud->task->ev_base, ud->fd, BUFFER_LINE, lua_http_read_cb, NULL, + lua_http_err_cb, &tv, ud); /* Write request */ register_async_event (ud->task->s, lua_http_fin, ud, FALSE); diff --git a/src/main.h b/src/main.h index f979bf69a..fa72c7794 100644 --- a/src/main.h +++ b/src/main.h @@ -165,6 +165,7 @@ struct controller_session { struct rspamd_async_session* s; /**< async session object */ struct worker_task *learn_task; struct rspamd_dns_resolver *resolver; /**< DNS resolver */ + struct event_base *ev_base; /**< Event base */ }; typedef void (*controller_func_t)(gchar **args, struct controller_session *session); @@ -247,6 +248,7 @@ struct worker_task { guint32 dns_requests; /**< number of DNS requests per this task */ struct rspamd_dns_resolver *resolver; /**< DNS resolver */ + struct event_base *ev_base; /**< Event base */ }; /** diff --git a/src/map.c b/src/map.c index b37082bd0..c82bd1ceb 100644 --- a/src/map.c +++ b/src/map.c @@ -46,6 +46,7 @@ struct http_reply { struct http_callback_data { struct event ev; + struct event_base *ev_base; struct timeval tv; struct rspamd_map *map; struct http_map_data *data; @@ -912,6 +913,7 @@ http_async_callback (gint fd, short what, void *ud) write_http_request (cbd->map, cbd->data, fd); /* Plan reading */ event_set (&cbd->ev, cbd->fd, EV_READ | EV_PERSIST, http_async_callback, cbd); + event_base_set (cbd->ev_base, &cbd->ev); cbd->tv.tv_sec = HTTP_READ_TIMEOUT; cbd->tv.tv_usec = 0; cbd->state = 1; @@ -997,7 +999,9 @@ http_callback (gint fd, short what, void *ud) else { /* Plan event */ cbd = g_malloc (sizeof (struct http_callback_data)); + cbd->ev_base = map->ev_base; event_set (&cbd->ev, sock, EV_WRITE, http_async_callback, cbd); + event_base_set (cbd->ev_base, &cbd->ev); cbd->tv.tv_sec = HTTP_CONNECT_TIMEOUT; cbd->tv.tv_usec = 0; cbd->map = map; @@ -1011,7 +1015,7 @@ http_callback (gint fd, short what, void *ud) /* Start watching event for all maps */ void -start_map_watch (void) +start_map_watch (struct event_base *ev_base) { GList *cur = maps; struct rspamd_map *map; @@ -1019,8 +1023,10 @@ start_map_watch (void) /* First of all do synced read of data */ while (cur) { map = cur->data; + map->ev_base = ev_base; if (map->protocol == PROTO_FILE) { evtimer_set (&map->ev, file_callback, map); + event_base_set (map->ev_base, &map->ev); /* Read initial data */ read_map_file (map, map->map_data); /* Plan event with jitter */ @@ -1030,6 +1036,7 @@ start_map_watch (void) } else if (map->protocol == PROTO_HTTP) { evtimer_set (&map->ev, http_callback, map); + event_base_set (map->ev_base, &map->ev); /* Read initial data */ read_http_sync (map, map->map_data); /* Plan event with jitter */ diff --git a/src/map.h b/src/map.h index 23e38a786..f4aae10ce 100644 --- a/src/map.h +++ b/src/map.h @@ -66,6 +66,7 @@ struct rspamd_map { void **user_data; struct event ev; struct timeval tv; + struct event_base *ev_base; void *map_data; }; @@ -81,7 +82,7 @@ gboolean add_map (const gchar *map_line, map_cb_t read_callback, map_fin_cb_t fi /** * Start watching of maps by adding events to libevent event loop */ -void start_map_watch (void); +void start_map_watch (struct event_base *ev_base); /** * Remove all maps watched (remove events) diff --git a/src/smtp.c b/src/smtp.c index 5e25fb568..3c6726d97 100644 --- a/src/smtp.c +++ b/src/smtp.c @@ -684,6 +684,7 @@ accept_socket (gint fd, short what, void *arg) session->cfg = worker->srv->cfg; session->session_time = time (NULL); session->resolver = ctx->resolver; + session->ev_base = ctx->ev_base; worker->srv->stat->connections_count++; /* Resolve client's addr */ @@ -698,7 +699,7 @@ accept_socket (gint fd, short what, void *arg) return; } else { - session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, + session->dispatcher = rspamd_create_dispatcher (session->ev_base, nfd, BUFFER_LINE, smtp_read_socket, smtp_write_socket, smtp_err_socket, &session->ctx->smtp_timeout, session); session->dispatcher->peer_addr = session->client_addr.s_addr; } @@ -963,8 +964,6 @@ config_smtp_worker (struct rspamd_worker *worker) if ((value = ctx->smtp_capabilities_str) != NULL) { make_capabilities (ctx, value); } - - ctx->resolver = dns_resolver_init (worker->srv->cfg); return TRUE; } @@ -977,11 +976,12 @@ void start_smtp_worker (struct rspamd_worker *worker) { struct sigaction signals; + struct smtp_worker_ctx *ctx = worker->ctx; gperf_profiler_init (worker->srv->cfg, "worker"); worker->srv->pid = getpid (); - event_init (); + ctx->ev_base = event_init (); /* Set smtp options */ if ( !config_smtp_worker (worker)) { @@ -994,19 +994,24 @@ start_smtp_worker (struct rspamd_worker *worker) /* SIGUSR2 handler */ signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker); + event_base_set (ctx->ev_base, &worker->sig_ev); signal_add (&worker->sig_ev, NULL); /* Accept event */ event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); + event_base_set (ctx->ev_base, &worker->bind_ev); event_add (&worker->bind_ev, NULL); /* Maps events */ - start_map_watch (); + start_map_watch (ctx->ev_base); + + /* DNS resolver */ + ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg); /* Set umask */ umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP); - event_loop (0); + event_base_loop (ctx->ev_base, 0); close_log (rspamd_main->logger); exit (EXIT_SUCCESS); diff --git a/src/smtp.h b/src/smtp.h index a5a9533bc..00d8c3abf 100644 --- a/src/smtp.h +++ b/src/smtp.h @@ -51,6 +51,7 @@ struct smtp_worker_ctx { gchar *metric; GList *smtp_filters[SMTP_STAGE_MAX]; struct rspamd_dns_resolver *resolver; + struct event_base *ev_base; }; enum rspamd_smtp_state { @@ -110,6 +111,7 @@ struct smtp_session { gboolean resolved; gboolean esmtp; struct rspamd_dns_resolver *resolver; + struct event_base *ev_base; }; typedef gboolean (*smtp_filter_t)(struct smtp_session *session, gpointer filter_data); diff --git a/src/smtp_utils.c b/src/smtp_utils.c index 1ed92ead6..c56397d17 100644 --- a/src/smtp_utils.c +++ b/src/smtp_utils.c @@ -89,7 +89,7 @@ create_smtp_upstream_connection (struct smtp_session *session) return FALSE; } /* Create a dispatcher for upstream connection */ - session->upstream_dispatcher = rspamd_create_dispatcher (session->upstream_sock, BUFFER_LINE, + session->upstream_dispatcher = rspamd_create_dispatcher (session->ev_base, session->upstream_sock, BUFFER_LINE, smtp_upstream_read_socket, smtp_upstream_write_socket, smtp_upstream_err_socket, &session->ctx->smtp_timeout, session); session->state = SMTP_STATE_WAIT_UPSTREAM; diff --git a/src/statfile_sync.c b/src/statfile_sync.c index e96555c15..4595af85f 100644 --- a/src/statfile_sync.c +++ b/src/statfile_sync.c @@ -44,6 +44,7 @@ struct rspamd_sync_ctx { stat_file_t *real_statfile; statfile_pool_t *pool; rspamd_io_dispatcher_t *dispatcher; + struct event_base *ev_base; struct event tm_ev; @@ -268,7 +269,7 @@ sync_timer_callback (gint fd, short what, void *ud) } /* Now create and activate dispatcher */ msec_to_tv (ctx->timeout, &ctx->io_tv); - ctx->dispatcher = rspamd_create_dispatcher (ctx->sock, BUFFER_LINE, sync_read, NULL, sync_err, &ctx->io_tv, ctx); + ctx->dispatcher = rspamd_create_dispatcher (ctx->ev_base, ctx->sock, BUFFER_LINE, sync_read, NULL, sync_err, &ctx->io_tv, ctx); ctx->state = SYNC_STATE_GREETING; ctx->is_busy = TRUE; @@ -278,7 +279,7 @@ sync_timer_callback (gint fd, short what, void *ud) } static gboolean -add_statfile_watch (statfile_pool_t *pool, struct statfile *st, struct config_file *cfg) +add_statfile_watch (statfile_pool_t *pool, struct statfile *st, struct config_file *cfg, struct event_base *ev_base) { struct rspamd_sync_ctx *ctx; guint32 jittered_interval; @@ -289,6 +290,7 @@ add_statfile_watch (statfile_pool_t *pool, struct statfile *st, struct config_fi ctx->st = st; ctx->timeout = cfg->statfile_sync_timeout; ctx->sync_interval = cfg->statfile_sync_interval; + ctx->ev_base = ev_base; /* Add some jittering for synchronization */ jittered_interval = g_random_int_range (ctx->sync_interval, ctx->sync_interval * 2); msec_to_tv (jittered_interval, &ctx->interval); @@ -305,6 +307,7 @@ add_statfile_watch (statfile_pool_t *pool, struct statfile *st, struct config_fi } /* Now plan event for it's future executing */ evtimer_set (&ctx->tm_ev, sync_timer_callback, ctx); + event_base_set (ctx->ev_base, &ctx->tm_ev); evtimer_add (&ctx->tm_ev, &ctx->interval); log_next_sync (st->symbol, ctx->interval.tv_sec); } @@ -317,7 +320,7 @@ add_statfile_watch (statfile_pool_t *pool, struct statfile *st, struct config_fi } gboolean -start_statfile_sync (statfile_pool_t *pool, struct config_file *cfg) +start_statfile_sync (statfile_pool_t *pool, struct config_file *cfg, struct event_base *ev_base) { GList *cur, *l; struct classifier_config *cl; @@ -334,7 +337,7 @@ start_statfile_sync (statfile_pool_t *pool, struct config_file *cfg) while (l) { st = l->data; if (st->binlog != NULL && st->binlog->affinity == AFFINITY_SLAVE) { - if (!add_statfile_watch (pool, st, cfg)) { + if (!add_statfile_watch (pool, st, cfg, ev_base)) { return FALSE; } } diff --git a/src/statfile_sync.h b/src/statfile_sync.h index fcc305b55..b3abb8b91 100644 --- a/src/statfile_sync.h +++ b/src/statfile_sync.h @@ -9,6 +9,6 @@ /* * Start synchronization of statfiles. Must be called after event_init as it adds events */ -gboolean start_statfile_sync (statfile_pool_t *pool, struct config_file *cfg); +gboolean start_statfile_sync (statfile_pool_t *pool, struct config_file *cfg, struct event_base *ev_base); #endif diff --git a/src/worker.c b/src/worker.c index fb67b119e..bd2c5de2e 100644 --- a/src/worker.c +++ b/src/worker.c @@ -91,6 +91,7 @@ struct rspamd_worker_ctx { guint32 tasks; /* Limit of tasks */ guint32 max_tasks; + struct event_base *ev_base; }; static gboolean write_socket (void *arg); @@ -477,9 +478,10 @@ accept_socket (gint fd, short what, void *arg) /* Set up dispatcher */ new_task->dispatcher = - rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, write_socket, + rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, read_socket, write_socket, err_socket, &ctx->io_tv, (void *) new_task); new_task->dispatcher->peer_addr = new_task->client_addr.s_addr; + new_task->ev_base = ctx->ev_base; ctx->tasks ++; memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)reduce_tasks_count, &ctx->tasks); @@ -638,18 +640,20 @@ start_worker (struct rspamd_worker *worker) worker->srv->pid = getpid (); - event_init (); + ctx->ev_base = event_init (); init_signals (&signals, sig_handler); sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); /* SIGUSR2 handler */ signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker); + event_base_set (ctx->ev_base, &worker->sig_ev); signal_add (&worker->sig_ev, NULL); /* Accept event */ event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *) worker); + event_base_set (ctx->ev_base, &worker->bind_ev); event_add (&worker->bind_ev, NULL); @@ -665,14 +669,14 @@ start_worker (struct rspamd_worker *worker) else { #endif /* Maps events */ - start_map_watch (); + start_map_watch (ctx->ev_base); #ifndef BUILD_STATIC } #endif - ctx->resolver = dns_resolver_init (worker->srv->cfg); + ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg); - event_loop (0); + event_base_loop (ctx->ev_base, 0); #ifndef BUILD_STATIC if (ctx->is_custom) { -- 2.39.5