]> source.dussan.org Git - rspamd.git/commitdiff
* Use event_base thread safe API to allow parallelism based on threads
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Wed, 26 Oct 2011 17:13:19 +0000 (21:13 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Wed, 26 Oct 2011 17:13:19 +0000 (21:13 +0400)
17 files changed:
src/buffer.c
src/buffer.h
src/controller.c
src/dns.c
src/dns.h
src/lmtp.c
src/lmtp_proto.c
src/lua/lua_http.c
src/main.h
src/map.c
src/map.h
src/smtp.c
src/smtp.h
src/smtp_utils.c
src/statfile_sync.c
src/statfile_sync.h
src/worker.c

index a1f4bee9b08b1dd0b4df043b698f0ade147ef02d..c94d5f91c305cdc45da9475173742ea1c251e1d7 100644 (file)
@@ -66,6 +66,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
                        d->offset += off;
                        event_del (d->ev);
                        event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+                       event_base_set (d->ev_base, d->ev);
                        event_add (d->ev, d->tv);
                }
        }
@@ -78,6 +79,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
                }
                event_del (d->ev);
                event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+               event_base_set (d->ev_base, d->ev);
                event_add (d->ev, d->tv);
                d->in_sendfile = FALSE;
        }
@@ -98,6 +100,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
                        /* Wait for other event */
                        event_del (d->ev);
                        event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+                       event_base_set (d->ev_base, d->ev);
                        event_add (d->ev, d->tv);
                }
        }
@@ -106,6 +109,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
                /* Wait for other event */
                event_del (d->ev);
                event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+               event_base_set (d->ev_base, d->ev);
                event_add (d->ev, d->tv);
        }
        else {
@@ -117,6 +121,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
                }
                event_del (d->ev);
                event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+               event_base_set (d->ev_base, d->ev);
                event_add (d->ev, d->tv);
                d->in_sendfile = FALSE;
        }
@@ -137,6 +142,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
                        /* Wait for other event */
                        event_del (d->ev);
                        event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+                       event_base_set (d->ev_base, d->ev);
                        event_add (d->ev, d->tv);
                }
        }
@@ -146,6 +152,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
                /* Wait for other event */
                event_del (d->ev);
                event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+               event_base_set (d->ev_base, d->ev);
                event_add (d->ev, d->tv);
        }
        else {
@@ -157,6 +164,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
                }
                event_del (d->ev);
                event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+               event_base_set (d->ev_base, d->ev);
                event_add (d->ev, d->tv);
                d->in_sendfile = FALSE;
        }
@@ -215,6 +223,7 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
                        /* Wait for other event */
                        event_del (d->ev);
                        event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
+                       event_base_set (d->ev_base, d->ev);
                        event_add (d->ev, d->tv);
                        return TRUE;
                }
@@ -237,12 +246,14 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
 
                event_del (d->ev);
                event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+               event_base_set (d->ev_base, d->ev);
                event_add (d->ev, d->tv);
        }
        else {
                /* Plan other write event */
                event_del (d->ev);
                event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
+               event_base_set (d->ev_base, d->ev);
                event_add (d->ev, d->tv);
        }
 
@@ -464,6 +475,7 @@ dispatcher_cb (gint fd, short what, void *arg)
                        if (d->out_buffers == NULL) {
                                event_del (d->ev);
                                event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+                               event_base_set (d->ev_base, d->ev);
                                event_add (d->ev, d->tv);
                                if (d->is_restored && d->write_callback) {
                                        if (!d->write_callback (d->user_data)) {
@@ -482,7 +494,7 @@ dispatcher_cb (gint fd, short what, void *arg)
 
 
 rspamd_io_dispatcher_t         *
-rspamd_create_dispatcher (gint fd, enum io_policy policy,
+rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy policy,
        dispatcher_read_callback_t read_cb, dispatcher_write_callback_t write_cb, dispatcher_err_callback_t err_cb, struct timeval *tv, void *user_data)
 {
        rspamd_io_dispatcher_t         *new;
@@ -513,8 +525,10 @@ rspamd_create_dispatcher (gint fd, enum io_policy policy,
 
        new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event));
        new->fd = fd;
+       new->ev_base = base;
 
        event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new);
+       event_base_set (new->ev_base, new->ev);
        event_add (new->ev, new->tv);
 
        return new;
@@ -647,6 +661,7 @@ rspamd_dispatcher_restore (rspamd_io_dispatcher_t * d)
                debug_ip ("restored dispatcher");
                event_del (d->ev);
                event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, d);
+               event_base_set (d->ev_base, d->ev);
                event_add (d->ev, d->tv);
                d->is_restored = TRUE;
        }
index 16394ae7d4c23b57f4740dad42b4b4b8ef8f6f72..51b321833c61a3b829a4e66afd154edc2abbfa30 100644 (file)
@@ -52,6 +52,7 @@ typedef struct rspamd_io_dispatcher_s {
        gboolean in_sendfile;                                                                                   /**< whether buffer is in sendfile mode */
        gboolean strip_eol;                                                                                             /**< strip or not line ends in BUFFER_LINE policy */
        gboolean is_restored;                                                                                   /**< call a callback when dispatcher is restored */
+       struct event_base *ev_base;                                                                             /**< event base for io operations */
 #ifndef HAVE_SENDFILE
        void *map;
 #endif
@@ -68,7 +69,7 @@ typedef struct rspamd_io_dispatcher_s {
  * @param user_data pointer to user's data
  * @return new dispatcher object or NULL in case of failure
  */
-rspamd_io_dispatcher_t* rspamd_create_dispatcher (gint fd, 
+rspamd_io_dispatcher_t* rspamd_create_dispatcher (struct event_base *base, gint fd,
                                                                                                  enum io_policy policy,
                                                                                                  dispatcher_read_callback_t read_cb,
                                                                                                  dispatcher_write_callback_t write_cb,
index cb0c3cd22fe65b58ea8f0fa6e6728a2b50129b05..07eeef0ff45b838ed00440e41613e3e56ab991e2 100644 (file)
@@ -77,6 +77,7 @@ struct rspamd_controller_ctx {
        char                                               *password;
        guint32                                                 timeout;
        struct rspamd_dns_resolver     *resolver;
+       struct event_base              *ev_base;
 };
 
 static struct controller_command commands[] = {
@@ -880,6 +881,7 @@ controller_read_socket (f_str_t * in, void *arg)
                task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
                task->msg->begin = in->begin;
                task->msg->len = in->len;
+               task->ev_base = session->ev_base;
 
                r = process_message (task);
                if (r == -1) {
@@ -925,6 +927,7 @@ controller_read_socket (f_str_t * in, void *arg)
                task->msg->len = in->len;
 
                task->resolver = session->resolver;
+               task->ev_base = session->ev_base;
 
                r = process_message (task);
                if (r == -1) {
@@ -983,6 +986,7 @@ controller_read_socket (f_str_t * in, void *arg)
                task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
                task->msg->begin = in->begin;
                task->msg->len = in->len;
+               task->ev_base = session->ev_base;
 
                r = process_message (task);
                if (r == -1) {
@@ -1166,6 +1170,7 @@ accept_socket (gint fd, short what, void *arg)
        new_session->state = STATE_COMMAND;
        new_session->session_pool = memory_pool_new (memory_pool_get_size () - 1);
        new_session->resolver = ctx->resolver;
+       new_session->ev_base = ctx->ev_base;
        worker->srv->stat->control_connections_count++;
 
        /* Set up dispatcher */
@@ -1175,7 +1180,9 @@ accept_socket (gint fd, short what, void *arg)
 
        new_session->s = new_async_session (new_session->session_pool, free_session, new_session);
 
-       new_session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, controller_read_socket, controller_write_socket, controller_err_socket, io_tv, (void *)new_session);
+       new_session->dispatcher = rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, controller_read_socket,
+                       controller_write_socket, controller_err_socket, io_tv, (void *)new_session);
+
        if (su.ss.ss_family == AF_UNIX) {
                msg_info ("accepted connection from unix socket");
                new_session->dispatcher->peer_addr = INADDR_LOOPBACK;
@@ -1217,7 +1224,7 @@ start_controller (struct rspamd_worker *worker)
        worker->srv->pid = getpid ();
        ctx = worker->ctx;
 
-       event_init ();
+       ctx->ev_base = event_init ();
        g_mime_init (0);
 
        init_signals (&signals, sig_handler);
@@ -1225,13 +1232,14 @@ start_controller (struct rspamd_worker *worker)
 
        /* SIGUSR2 handler */
        signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker);
+       event_base_set (ctx->ev_base, &worker->sig_ev);
        signal_add (&worker->sig_ev, NULL);
 
 
        start_time = time (NULL);
 
        /* Start statfile synchronization */
-       if (!start_statfile_sync (worker->srv->statfile_pool, worker->srv->cfg)) {
+       if (!start_statfile_sync (worker->srv->statfile_pool, worker->srv->cfg, ctx->ev_base)) {
                msg_info ("cannot start statfile synchronization, statfiles would not be synchronized");
        }
 
@@ -1243,14 +1251,15 @@ start_controller (struct rspamd_worker *worker)
        rspamd_snprintf (greetingbuf, sizeof (greetingbuf), "Rspamd version %s is running on %s" CRLF, RVERSION, hostbuf);
        /* Accept event */
        event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+       event_base_set (ctx->ev_base, &worker->bind_ev);
        event_add (&worker->bind_ev, NULL);
 
-       start_map_watch ();
-       ctx->resolver = dns_resolver_init (worker->srv->cfg);
+       start_map_watch (ctx->ev_base);
+       ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
 
        gperf_profiler_init (worker->srv->cfg, "controller");
 
-       event_loop (0);
+       event_base_loop (ctx->ev_base, 0);
 
        close_log (worker->srv->logger);
 
index 2e9fd34c45da144364ffa070fc35d8992101939e..c3dd87a3269465d8e3c17bbf61f97020e198d783 100644 (file)
--- a/src/dns.c
+++ b/src/dns.c
@@ -742,6 +742,7 @@ send_dns_request (struct rspamd_dns_request *req)
        if (r == -1) {
                if (errno == EAGAIN) {
                        event_set (&req->io_event, req->sock, EV_WRITE, dns_retransmit_handler, req);
+                       event_base_set (req->resolver->ev_base, &req->io_event);
                        event_add (&req->io_event, &req->tv);
                        register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, FALSE);
                        return 0;
@@ -754,6 +755,7 @@ send_dns_request (struct rspamd_dns_request *req)
        }
        else if (r < req->pos) {
                event_set (&req->io_event, req->sock, EV_WRITE, dns_retransmit_handler, req);
+               event_base_set (req->resolver->ev_base, &req->io_event);
                event_add (&req->io_event, &req->tv);
                register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, FALSE);
                return 0;
@@ -1190,6 +1192,7 @@ dns_check_throttling (struct rspamd_dns_resolver *resolver)
                /* Init throttling timeout */
                resolver->throttling = TRUE;
                evtimer_set (&resolver->throttling_event, dns_throttling_cb, resolver);
+               event_base_set (resolver->ev_base, &resolver->throttling_event);
                event_add (&resolver->throttling_event, &resolver->throttling_time);
        }
 }
@@ -1329,6 +1332,7 @@ dns_retransmit_handler (gint fd, short what, void *arg)
                        /* Add timer event */
                        event_del (&req->timer_event);
                        evtimer_set (&req->timer_event, dns_timer_cb, req);
+                       event_base_set (req->resolver->ev_base, &req->timer_event);
                        evtimer_add (&req->timer_event, &req->tv);
 
                        /* Add request to hash table */
@@ -1423,6 +1427,7 @@ make_dns_request (struct rspamd_dns_resolver *resolver,
        /* Fill timeout */
        msec_to_tv (resolver->request_timeout, &req->tv);
        evtimer_set (&req->timer_event, dns_timer_cb, req);
+       event_base_set (req->resolver->ev_base, &req->timer_event);
        
        /* Now send request to server */
        r = send_dns_request (req);
@@ -1498,7 +1503,7 @@ parse_resolv_conf (struct rspamd_dns_resolver *resolver)
 }
 
 struct rspamd_dns_resolver *
-dns_resolver_init (struct config_file *cfg)
+dns_resolver_init (struct event_base *ev_base, struct config_file *cfg)
 {
        GList                          *cur;
        struct rspamd_dns_resolver     *new;
@@ -1507,6 +1512,7 @@ dns_resolver_init (struct config_file *cfg)
        struct rspamd_dns_server       *serv;
        
        new = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct rspamd_dns_resolver));
+       new->ev_base = ev_base;
        new->requests = g_hash_table_new (g_direct_hash, g_direct_equal);
        new->permutor = memory_pool_alloc (cfg->cfg_pool, sizeof (struct dns_k_permutor));
        dns_k_permutor_init (new->permutor, 0, G_MAXUINT16);
@@ -1588,6 +1594,7 @@ dns_resolver_init (struct config_file *cfg)
                }
                else {
                        event_set (&serv->ev, serv->sock, EV_READ | EV_PERSIST, dns_read_cb, new);
+                       event_base_set (new->ev_base, &serv->ev);
                        event_add (&serv->ev, NULL);
                }
        }
index 526598e6abd67da2d4b0e473ea01c7a5d7aa9150..f174f5026703968c3040c678bb1c303a5f85291a 100644 (file)
--- a/src/dns.h
+++ b/src/dns.h
@@ -56,6 +56,7 @@ struct rspamd_dns_resolver {
        guint errors;                                           /**< resolver errors                                            */
        struct timeval throttling_time;         /**< throttling time                                            */
        struct event throttling_event;          /**< throttling event                                           */
+       struct event_base *ev_base;                     /**< base for event ops                                         */
 };
 
 struct dns_header;
@@ -231,7 +232,7 @@ struct dns_query {
 /*
  * Init DNS resolver, params are obtained from a config file or system file /etc/resolv.conf
  */
-struct rspamd_dns_resolver *dns_resolver_init (struct config_file *cfg);
+struct rspamd_dns_resolver *dns_resolver_init (struct event_base *ev_base, struct config_file *cfg);
 
 /*
  * Make a DNS request
index 043c78485e846a96d7ec72bdda63a9dbac0e127c..96dbb2bab419332fc5a7e2d54967dd7ec7ab02a1 100644 (file)
@@ -250,13 +250,14 @@ accept_socket (gint fd, short what, void *arg)
        /* Add destructor for recipients list (it would be better to use anonymous function here */
        memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func) rcpt_destruct, new_task);
        new_task->results = g_hash_table_new (g_str_hash, g_str_equal);
+       new_task->ev_base = worker->ctx;
        memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func) g_hash_table_destroy, new_task->results);
        worker->srv->stat->connections_count++;
        lmtp->task = new_task;
        lmtp->state = LMTP_READ_LHLO;
 
        /* Set up dispatcher */
-       new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, lmtp_read_socket, lmtp_write_socket, lmtp_err_socket, &io_tv, (void *)lmtp);
+       new_task->dispatcher = rspamd_create_dispatcher (new_task->ev_base, nfd, BUFFER_LINE, lmtp_read_socket, lmtp_write_socket, lmtp_err_socket, &io_tv, (void *)lmtp);
        new_task->dispatcher->peer_addr = new_task->client_addr.s_addr;
        if (! rspamd_dispatcher_write (lmtp->task->dispatcher, greetingbuf, strlen (greetingbuf), FALSE, FALSE)) {
                msg_warn ("cannot write greeting");
@@ -276,7 +277,7 @@ start_lmtp_worker (struct rspamd_worker *worker)
 
        worker->srv->pid = getpid ();
        worker->srv->type = TYPE_LMTP;
-       event_init ();
+       worker->ctx = event_init ();
        g_mime_init (0);
 
        init_signals (&signals, sig_handler);
@@ -284,10 +285,12 @@ start_lmtp_worker (struct rspamd_worker *worker)
 
        /* SIGUSR2 handler */
        signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker);
+       event_base_set (worker->ctx, &worker->sig_ev);
        signal_add (&worker->sig_ev, NULL);
 
        /* Accept event */
        event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+       event_base_set (worker->ctx, &worker->bind_ev);
        event_add (&worker->bind_ev, NULL);
 
        /* Perform modules configuring */
@@ -307,7 +310,7 @@ start_lmtp_worker (struct rspamd_worker *worker)
 
        gperf_profiler_init (worker->srv->cfg, "lmtp");
 
-       event_loop (0);
+       event_base_loop (worker->ctx, 0);
        exit (EXIT_SUCCESS);
 }
 
index 1ea64ddf4a2171f2e25f17757480e6990df13ad3..9711b2ca585f47c512f6f0899a1f423cd33f57cb 100644 (file)
@@ -464,7 +464,7 @@ lmtp_deliver_mta (struct worker_task *task)
        cd = memory_pool_alloc (task->task_pool, sizeof (struct mta_callback_data));
        cd->task = task;
        cd->state = LMTP_WANT_GREETING;
-       cd->dispatcher = rspamd_create_dispatcher (sock, BUFFER_LINE, mta_read_socket, NULL, mta_err_socket, NULL, (void *)cd);
+       cd->dispatcher = rspamd_create_dispatcher (task->ev_base, sock, BUFFER_LINE, mta_read_socket, NULL, mta_err_socket, NULL, (void *)cd);
        return 0;
 }
 
index 4942b45b4d09629c61db73f20bb338ecd62773d1..3764c96ccbeaf5f418ad593cfbea964f6645f72d 100644 (file)
@@ -305,8 +305,8 @@ lua_http_dns_callback (struct rspamd_dns_reply *reply, gpointer arg)
 
        /* Create dispatcher for HTTP protocol */
        msec_to_tv (ud->timeout, &tv);
-       ud->io_dispatcher = rspamd_create_dispatcher (ud->fd, BUFFER_LINE, lua_http_read_cb, NULL, lua_http_err_cb,
-                       &tv, ud);
+       ud->io_dispatcher = rspamd_create_dispatcher (ud->task->ev_base, ud->fd, BUFFER_LINE, lua_http_read_cb, NULL,
+                       lua_http_err_cb, &tv, ud);
        /* Write request */
        register_async_event (ud->task->s, lua_http_fin, ud, FALSE);
 
index f979bf69a8b95d6c4568284b930faf4ac18c1e55..fa72c7794062cddc4aef1571c8d8a438489c6602 100644 (file)
@@ -165,6 +165,7 @@ struct controller_session {
        struct rspamd_async_session* s;                                                         /**< async session object                                                       */
        struct worker_task *learn_task;
        struct rspamd_dns_resolver *resolver;                                           /**< DNS resolver                                                                       */
+       struct event_base *ev_base;                                                                     /**< Event base                                                                         */
 };
 
 typedef void (*controller_func_t)(gchar **args, struct controller_session *session);
@@ -247,6 +248,7 @@ struct worker_task {
        guint32 dns_requests;                                                                           /**< number of DNS requests per this task                       */
 
        struct rspamd_dns_resolver *resolver;                                           /**< DNS resolver                                                                       */
+       struct event_base *ev_base;                                                                     /**< Event base                                                                         */
 };
 
 /**
index b37082bd04e75830d57efa575d540f37474c2094..c82bd1ceba643fbcebc7815509de5c3d650e2112 100644 (file)
--- a/src/map.c
+++ b/src/map.c
@@ -46,6 +46,7 @@ struct http_reply {
 
 struct http_callback_data {
        struct event                    ev;
+       struct event_base                          *ev_base;
        struct timeval                  tv;
        struct rspamd_map              *map;
        struct http_map_data           *data;
@@ -912,6 +913,7 @@ http_async_callback (gint fd, short what, void *ud)
                        write_http_request (cbd->map, cbd->data, fd);
                        /* Plan reading */
                        event_set (&cbd->ev, cbd->fd, EV_READ | EV_PERSIST, http_async_callback, cbd);
+                       event_base_set (cbd->ev_base, &cbd->ev);
                        cbd->tv.tv_sec = HTTP_READ_TIMEOUT;
                        cbd->tv.tv_usec = 0;
                        cbd->state = 1;
@@ -997,7 +999,9 @@ http_callback (gint fd, short what, void *ud)
        else {
                /* Plan event */
                cbd = g_malloc (sizeof (struct http_callback_data));
+               cbd->ev_base = map->ev_base;
                event_set (&cbd->ev, sock, EV_WRITE, http_async_callback, cbd);
+               event_base_set (cbd->ev_base, &cbd->ev);
                cbd->tv.tv_sec = HTTP_CONNECT_TIMEOUT;
                cbd->tv.tv_usec = 0;
                cbd->map = map;
@@ -1011,7 +1015,7 @@ http_callback (gint fd, short what, void *ud)
 
 /* Start watching event for all maps */
 void
-start_map_watch (void)
+start_map_watch (struct event_base *ev_base)
 {
        GList                          *cur = maps;
        struct rspamd_map              *map;
@@ -1019,8 +1023,10 @@ start_map_watch (void)
        /* First of all do synced read of data */
        while (cur) {
                map = cur->data;
+               map->ev_base = ev_base;
                if (map->protocol == PROTO_FILE) {
                        evtimer_set (&map->ev, file_callback, map);
+                       event_base_set (map->ev_base, &map->ev);
                        /* Read initial data */
                        read_map_file (map, map->map_data);
                        /* Plan event with jitter */
@@ -1030,6 +1036,7 @@ start_map_watch (void)
                }
                else if (map->protocol == PROTO_HTTP) {
                        evtimer_set (&map->ev, http_callback, map);
+                       event_base_set (map->ev_base, &map->ev);
                        /* Read initial data */
                        read_http_sync (map, map->map_data);
                        /* Plan event with jitter */
index 23e38a786771c7b0dcab74e63295d456cab83e8a..f4aae10ceeb6bd5567c1b2446bc86d3e19f532cb 100644 (file)
--- a/src/map.h
+++ b/src/map.h
@@ -66,6 +66,7 @@ struct rspamd_map {
        void **user_data;
        struct event ev;
        struct timeval tv;
+       struct event_base *ev_base;
        void *map_data;
 };
 
@@ -81,7 +82,7 @@ gboolean add_map (const gchar *map_line, map_cb_t read_callback, map_fin_cb_t fi
 /**
  * Start watching of maps by adding events to libevent event loop
  */
-void start_map_watch (void);
+void start_map_watch (struct event_base *ev_base);
 
 /**
  * Remove all maps watched (remove events)
index 5e25fb5683cbbc1c2b5635d2de1e91872195020a..3c6726d97627a96c043b1814aa08b6405eb6d9e6 100644 (file)
@@ -684,6 +684,7 @@ accept_socket (gint fd, short what, void *arg)
        session->cfg = worker->srv->cfg;
        session->session_time = time (NULL);
        session->resolver = ctx->resolver;
+       session->ev_base = ctx->ev_base;
        worker->srv->stat->connections_count++;
 
        /* Resolve client's addr */
@@ -698,7 +699,7 @@ accept_socket (gint fd, short what, void *arg)
                return;
        }
        else {
-               session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, 
+               session->dispatcher = rspamd_create_dispatcher (session->ev_base, nfd, BUFFER_LINE,
                                                                smtp_read_socket, smtp_write_socket, smtp_err_socket, &session->ctx->smtp_timeout, session);
                session->dispatcher->peer_addr = session->client_addr.s_addr;
        }
@@ -963,8 +964,6 @@ config_smtp_worker (struct rspamd_worker *worker)
        if ((value = ctx->smtp_capabilities_str) != NULL) {
                make_capabilities (ctx, value);
        }
-
-       ctx->resolver = dns_resolver_init (worker->srv->cfg);
        
        return TRUE;
 }
@@ -977,11 +976,12 @@ void
 start_smtp_worker (struct rspamd_worker *worker)
 {
        struct sigaction                signals;
+       struct smtp_worker_ctx         *ctx = worker->ctx;
 
        gperf_profiler_init (worker->srv->cfg, "worker");
 
        worker->srv->pid = getpid ();
-       event_init ();
+       ctx->ev_base = event_init ();
 
        /* Set smtp options */
        if ( !config_smtp_worker (worker)) {
@@ -994,19 +994,24 @@ start_smtp_worker (struct rspamd_worker *worker)
 
        /* SIGUSR2 handler */
        signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker);
+       event_base_set (ctx->ev_base, &worker->sig_ev);
        signal_add (&worker->sig_ev, NULL);
 
        /* Accept event */
        event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+       event_base_set (ctx->ev_base, &worker->bind_ev);
        event_add (&worker->bind_ev, NULL);
 
        /* Maps events */
-       start_map_watch ();
+       start_map_watch (ctx->ev_base);
+
+       /* DNS resolver */
+       ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
 
        /* Set umask */
        umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP);
 
-       event_loop (0);
+       event_base_loop (ctx->ev_base, 0);
        
        close_log (rspamd_main->logger);
        exit (EXIT_SUCCESS);
index a5a9533bc9e5ff244facfd9cb3c3f9e2be38baad..00d8c3abfac32a635feabfc088218949ecde9820 100644 (file)
@@ -51,6 +51,7 @@ struct smtp_worker_ctx {
        gchar *metric;
        GList *smtp_filters[SMTP_STAGE_MAX];
        struct rspamd_dns_resolver *resolver;
+       struct event_base *ev_base;
 };
 
 enum rspamd_smtp_state {
@@ -110,6 +111,7 @@ struct smtp_session {
        gboolean resolved;
        gboolean esmtp;
        struct rspamd_dns_resolver *resolver;
+       struct event_base *ev_base;
 };
 
 typedef gboolean (*smtp_filter_t)(struct smtp_session *session, gpointer filter_data);
index 1ed92ead6550f3b5010229863fd96e2c72385847..c56397d171f9ad6d133385094f2813ae2bf7bf66 100644 (file)
@@ -89,7 +89,7 @@ create_smtp_upstream_connection (struct smtp_session *session)
                return FALSE;
        }
        /* Create a dispatcher for upstream connection */
-       session->upstream_dispatcher = rspamd_create_dispatcher (session->upstream_sock, BUFFER_LINE,
+       session->upstream_dispatcher = rspamd_create_dispatcher (session->ev_base, session->upstream_sock, BUFFER_LINE,
                                                        smtp_upstream_read_socket, smtp_upstream_write_socket, smtp_upstream_err_socket,
                                                        &session->ctx->smtp_timeout, session);
        session->state = SMTP_STATE_WAIT_UPSTREAM;
index e96555c159c44535cab7259af43638d3df6c4657..4595af85f1dddba0ea01ce1505b37d5da392aa0a 100644 (file)
@@ -44,6 +44,7 @@ struct rspamd_sync_ctx {
        stat_file_t *real_statfile;
        statfile_pool_t *pool;
        rspamd_io_dispatcher_t *dispatcher;
+       struct event_base *ev_base;
 
        struct event tm_ev;
 
@@ -268,7 +269,7 @@ sync_timer_callback (gint fd, short what, void *ud)
        }
        /* Now create and activate dispatcher */
        msec_to_tv (ctx->timeout, &ctx->io_tv);
-       ctx->dispatcher = rspamd_create_dispatcher (ctx->sock, BUFFER_LINE, sync_read, NULL, sync_err, &ctx->io_tv, ctx);
+       ctx->dispatcher = rspamd_create_dispatcher (ctx->ev_base, ctx->sock, BUFFER_LINE, sync_read, NULL, sync_err, &ctx->io_tv, ctx);
        
        ctx->state = SYNC_STATE_GREETING;
        ctx->is_busy = TRUE;
@@ -278,7 +279,7 @@ sync_timer_callback (gint fd, short what, void *ud)
 }
 
 static gboolean
-add_statfile_watch (statfile_pool_t *pool, struct statfile *st, struct config_file *cfg)
+add_statfile_watch (statfile_pool_t *pool, struct statfile *st, struct config_file *cfg, struct event_base *ev_base)
 {
        struct rspamd_sync_ctx *ctx;
        guint32 jittered_interval;
@@ -289,6 +290,7 @@ add_statfile_watch (statfile_pool_t *pool, struct statfile *st, struct config_fi
                ctx->st = st;
                ctx->timeout = cfg->statfile_sync_timeout;
                ctx->sync_interval = cfg->statfile_sync_interval;
+               ctx->ev_base = ev_base;
                /* Add some jittering for synchronization */
                jittered_interval = g_random_int_range (ctx->sync_interval, ctx->sync_interval * 2);
                msec_to_tv (jittered_interval, &ctx->interval);
@@ -305,6 +307,7 @@ add_statfile_watch (statfile_pool_t *pool, struct statfile *st, struct config_fi
                }
                /* Now plan event for it's future executing */
                evtimer_set (&ctx->tm_ev, sync_timer_callback, ctx);
+               event_base_set (ctx->ev_base, &ctx->tm_ev);
                evtimer_add (&ctx->tm_ev, &ctx->interval);
                log_next_sync (st->symbol, ctx->interval.tv_sec);
        }
@@ -317,7 +320,7 @@ add_statfile_watch (statfile_pool_t *pool, struct statfile *st, struct config_fi
 }
 
 gboolean 
-start_statfile_sync (statfile_pool_t *pool, struct config_file *cfg)
+start_statfile_sync (statfile_pool_t *pool, struct config_file *cfg, struct event_base *ev_base)
 {
        GList *cur, *l;
        struct classifier_config *cl;
@@ -334,7 +337,7 @@ start_statfile_sync (statfile_pool_t *pool, struct config_file *cfg)
                while (l) {
                        st = l->data;
                        if (st->binlog != NULL && st->binlog->affinity == AFFINITY_SLAVE) {
-                               if (!add_statfile_watch (pool, st, cfg)) {
+                               if (!add_statfile_watch (pool, st, cfg, ev_base)) {
                                        return FALSE;
                                }
                        }
index fcc305b55980a4020a755ed21eba660dbc2b3aa5..b3abb8b9166e67ec62b9b3a7933e494434e76f62 100644 (file)
@@ -9,6 +9,6 @@
 /*
  * Start synchronization of statfiles. Must be called after event_init as it adds events
  */
-gboolean start_statfile_sync (statfile_pool_t *pool, struct config_file *cfg);
+gboolean start_statfile_sync (statfile_pool_t *pool, struct config_file *cfg, struct event_base *ev_base);
 
 #endif
index fb67b119e5ed5af26d2baf2055b9e9915fbf71ff..bd2c5de2e0cc59d2f22c4b5b94a7dbdab0028570 100644 (file)
@@ -91,6 +91,7 @@ struct rspamd_worker_ctx {
        guint32                         tasks;
        /* Limit of tasks */
        guint32                         max_tasks;
+       struct event_base              *ev_base;
 };
 
 static gboolean                 write_socket (void *arg);
@@ -477,9 +478,10 @@ accept_socket (gint fd, short what, void *arg)
 
        /* Set up dispatcher */
        new_task->dispatcher =
-                       rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, write_socket,
+                       rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, read_socket, write_socket,
                                        err_socket, &ctx->io_tv, (void *) new_task);
        new_task->dispatcher->peer_addr = new_task->client_addr.s_addr;
+       new_task->ev_base = ctx->ev_base;
        ctx->tasks ++;
        memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)reduce_tasks_count, &ctx->tasks);
 
@@ -638,18 +640,20 @@ start_worker (struct rspamd_worker *worker)
 
        worker->srv->pid = getpid ();
 
-       event_init ();
+       ctx->ev_base = event_init ();
 
        init_signals (&signals, sig_handler);
        sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
 
        /* SIGUSR2 handler */
        signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker);
+       event_base_set (ctx->ev_base, &worker->sig_ev);
        signal_add (&worker->sig_ev, NULL);
 
        /* Accept event */
        event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST,
                        accept_socket, (void *) worker);
+       event_base_set (ctx->ev_base, &worker->bind_ev);
        event_add (&worker->bind_ev, NULL);
 
 
@@ -665,14 +669,14 @@ start_worker (struct rspamd_worker *worker)
        else {
 #endif
                /* Maps events */
-               start_map_watch ();
+               start_map_watch (ctx->ev_base);
 #ifndef BUILD_STATIC
        }
 #endif
 
-       ctx->resolver = dns_resolver_init (worker->srv->cfg);
+       ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
 
-       event_loop (0);
+       event_base_loop (ctx->ev_base, 0);
 
 #ifndef BUILD_STATIC
        if (ctx->is_custom) {