d->offset += off;
event_del (d->ev);
event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
}
}
}
event_del (d->ev);
event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
d->in_sendfile = FALSE;
}
/* Wait for other event */
event_del (d->ev);
event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
}
}
/* Wait for other event */
event_del (d->ev);
event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
}
else {
}
event_del (d->ev);
event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
d->in_sendfile = FALSE;
}
/* Wait for other event */
event_del (d->ev);
event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
}
}
/* Wait for other event */
event_del (d->ev);
event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
}
else {
}
event_del (d->ev);
event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
d->in_sendfile = FALSE;
}
/* Wait for other event */
event_del (d->ev);
event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
return TRUE;
}
event_del (d->ev);
event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
}
else {
/* Plan other write event */
event_del (d->ev);
event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
}
if (d->out_buffers == NULL) {
event_del (d->ev);
event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
if (d->is_restored && d->write_callback) {
if (!d->write_callback (d->user_data)) {
rspamd_io_dispatcher_t *
-rspamd_create_dispatcher (gint fd, enum io_policy policy,
+rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy policy,
dispatcher_read_callback_t read_cb, dispatcher_write_callback_t write_cb, dispatcher_err_callback_t err_cb, struct timeval *tv, void *user_data)
{
rspamd_io_dispatcher_t *new;
new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event));
new->fd = fd;
+ new->ev_base = base;
event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new);
+ event_base_set (new->ev_base, new->ev);
event_add (new->ev, new->tv);
return new;
debug_ip ("restored dispatcher");
event_del (d->ev);
event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, d);
+ event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
d->is_restored = TRUE;
}
gboolean in_sendfile; /**< whether buffer is in sendfile mode */
gboolean strip_eol; /**< strip or not line ends in BUFFER_LINE policy */
gboolean is_restored; /**< call a callback when dispatcher is restored */
+ struct event_base *ev_base; /**< event base for io operations */
#ifndef HAVE_SENDFILE
void *map;
#endif
* @param user_data pointer to user's data
* @return new dispatcher object or NULL in case of failure
*/
-rspamd_io_dispatcher_t* rspamd_create_dispatcher (gint fd,
+rspamd_io_dispatcher_t* rspamd_create_dispatcher (struct event_base *base, gint fd,
enum io_policy policy,
dispatcher_read_callback_t read_cb,
dispatcher_write_callback_t write_cb,
char *password;
guint32 timeout;
struct rspamd_dns_resolver *resolver;
+ struct event_base *ev_base;
};
static struct controller_command commands[] = {
task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
task->msg->begin = in->begin;
task->msg->len = in->len;
+ task->ev_base = session->ev_base;
r = process_message (task);
if (r == -1) {
task->msg->len = in->len;
task->resolver = session->resolver;
+ task->ev_base = session->ev_base;
r = process_message (task);
if (r == -1) {
task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
task->msg->begin = in->begin;
task->msg->len = in->len;
+ task->ev_base = session->ev_base;
r = process_message (task);
if (r == -1) {
new_session->state = STATE_COMMAND;
new_session->session_pool = memory_pool_new (memory_pool_get_size () - 1);
new_session->resolver = ctx->resolver;
+ new_session->ev_base = ctx->ev_base;
worker->srv->stat->control_connections_count++;
/* Set up dispatcher */
new_session->s = new_async_session (new_session->session_pool, free_session, new_session);
- new_session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, controller_read_socket, controller_write_socket, controller_err_socket, io_tv, (void *)new_session);
+ new_session->dispatcher = rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, controller_read_socket,
+ controller_write_socket, controller_err_socket, io_tv, (void *)new_session);
+
if (su.ss.ss_family == AF_UNIX) {
msg_info ("accepted connection from unix socket");
new_session->dispatcher->peer_addr = INADDR_LOOPBACK;
worker->srv->pid = getpid ();
ctx = worker->ctx;
- event_init ();
+ ctx->ev_base = event_init ();
g_mime_init (0);
init_signals (&signals, sig_handler);
/* SIGUSR2 handler */
signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker);
+ event_base_set (ctx->ev_base, &worker->sig_ev);
signal_add (&worker->sig_ev, NULL);
start_time = time (NULL);
/* Start statfile synchronization */
- if (!start_statfile_sync (worker->srv->statfile_pool, worker->srv->cfg)) {
+ if (!start_statfile_sync (worker->srv->statfile_pool, worker->srv->cfg, ctx->ev_base)) {
msg_info ("cannot start statfile synchronization, statfiles would not be synchronized");
}
rspamd_snprintf (greetingbuf, sizeof (greetingbuf), "Rspamd version %s is running on %s" CRLF, RVERSION, hostbuf);
/* Accept event */
event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+ event_base_set (ctx->ev_base, &worker->bind_ev);
event_add (&worker->bind_ev, NULL);
- start_map_watch ();
- ctx->resolver = dns_resolver_init (worker->srv->cfg);
+ start_map_watch (ctx->ev_base);
+ ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
gperf_profiler_init (worker->srv->cfg, "controller");
- event_loop (0);
+ event_base_loop (ctx->ev_base, 0);
close_log (worker->srv->logger);
if (r == -1) {
if (errno == EAGAIN) {
event_set (&req->io_event, req->sock, EV_WRITE, dns_retransmit_handler, req);
+ event_base_set (req->resolver->ev_base, &req->io_event);
event_add (&req->io_event, &req->tv);
register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, FALSE);
return 0;
}
else if (r < req->pos) {
event_set (&req->io_event, req->sock, EV_WRITE, dns_retransmit_handler, req);
+ event_base_set (req->resolver->ev_base, &req->io_event);
event_add (&req->io_event, &req->tv);
register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, FALSE);
return 0;
/* Init throttling timeout */
resolver->throttling = TRUE;
evtimer_set (&resolver->throttling_event, dns_throttling_cb, resolver);
+ event_base_set (resolver->ev_base, &resolver->throttling_event);
event_add (&resolver->throttling_event, &resolver->throttling_time);
}
}
/* Add timer event */
event_del (&req->timer_event);
evtimer_set (&req->timer_event, dns_timer_cb, req);
+ event_base_set (req->resolver->ev_base, &req->timer_event);
evtimer_add (&req->timer_event, &req->tv);
/* Add request to hash table */
/* Fill timeout */
msec_to_tv (resolver->request_timeout, &req->tv);
evtimer_set (&req->timer_event, dns_timer_cb, req);
+ event_base_set (req->resolver->ev_base, &req->timer_event);
/* Now send request to server */
r = send_dns_request (req);
}
struct rspamd_dns_resolver *
-dns_resolver_init (struct config_file *cfg)
+dns_resolver_init (struct event_base *ev_base, struct config_file *cfg)
{
GList *cur;
struct rspamd_dns_resolver *new;
struct rspamd_dns_server *serv;
new = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct rspamd_dns_resolver));
+ new->ev_base = ev_base;
new->requests = g_hash_table_new (g_direct_hash, g_direct_equal);
new->permutor = memory_pool_alloc (cfg->cfg_pool, sizeof (struct dns_k_permutor));
dns_k_permutor_init (new->permutor, 0, G_MAXUINT16);
}
else {
event_set (&serv->ev, serv->sock, EV_READ | EV_PERSIST, dns_read_cb, new);
+ event_base_set (new->ev_base, &serv->ev);
event_add (&serv->ev, NULL);
}
}
guint errors; /**< resolver errors */
struct timeval throttling_time; /**< throttling time */
struct event throttling_event; /**< throttling event */
+ struct event_base *ev_base; /**< base for event ops */
};
struct dns_header;
/*
* Init DNS resolver, params are obtained from a config file or system file /etc/resolv.conf
*/
-struct rspamd_dns_resolver *dns_resolver_init (struct config_file *cfg);
+struct rspamd_dns_resolver *dns_resolver_init (struct event_base *ev_base, struct config_file *cfg);
/*
* Make a DNS request
/* Add destructor for recipients list (it would be better to use anonymous function here */
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func) rcpt_destruct, new_task);
new_task->results = g_hash_table_new (g_str_hash, g_str_equal);
+ new_task->ev_base = worker->ctx;
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func) g_hash_table_destroy, new_task->results);
worker->srv->stat->connections_count++;
lmtp->task = new_task;
lmtp->state = LMTP_READ_LHLO;
/* Set up dispatcher */
- new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, lmtp_read_socket, lmtp_write_socket, lmtp_err_socket, &io_tv, (void *)lmtp);
+ new_task->dispatcher = rspamd_create_dispatcher (new_task->ev_base, nfd, BUFFER_LINE, lmtp_read_socket, lmtp_write_socket, lmtp_err_socket, &io_tv, (void *)lmtp);
new_task->dispatcher->peer_addr = new_task->client_addr.s_addr;
if (! rspamd_dispatcher_write (lmtp->task->dispatcher, greetingbuf, strlen (greetingbuf), FALSE, FALSE)) {
msg_warn ("cannot write greeting");
worker->srv->pid = getpid ();
worker->srv->type = TYPE_LMTP;
- event_init ();
+ worker->ctx = event_init ();
g_mime_init (0);
init_signals (&signals, sig_handler);
/* SIGUSR2 handler */
signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker);
+ event_base_set (worker->ctx, &worker->sig_ev);
signal_add (&worker->sig_ev, NULL);
/* Accept event */
event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+ event_base_set (worker->ctx, &worker->bind_ev);
event_add (&worker->bind_ev, NULL);
/* Perform modules configuring */
gperf_profiler_init (worker->srv->cfg, "lmtp");
- event_loop (0);
+ event_base_loop (worker->ctx, 0);
exit (EXIT_SUCCESS);
}
cd = memory_pool_alloc (task->task_pool, sizeof (struct mta_callback_data));
cd->task = task;
cd->state = LMTP_WANT_GREETING;
- cd->dispatcher = rspamd_create_dispatcher (sock, BUFFER_LINE, mta_read_socket, NULL, mta_err_socket, NULL, (void *)cd);
+ cd->dispatcher = rspamd_create_dispatcher (task->ev_base, sock, BUFFER_LINE, mta_read_socket, NULL, mta_err_socket, NULL, (void *)cd);
return 0;
}
/* Create dispatcher for HTTP protocol */
msec_to_tv (ud->timeout, &tv);
- ud->io_dispatcher = rspamd_create_dispatcher (ud->fd, BUFFER_LINE, lua_http_read_cb, NULL, lua_http_err_cb,
- &tv, ud);
+ ud->io_dispatcher = rspamd_create_dispatcher (ud->task->ev_base, ud->fd, BUFFER_LINE, lua_http_read_cb, NULL,
+ lua_http_err_cb, &tv, ud);
/* Write request */
register_async_event (ud->task->s, lua_http_fin, ud, FALSE);
struct rspamd_async_session* s; /**< async session object */
struct worker_task *learn_task;
struct rspamd_dns_resolver *resolver; /**< DNS resolver */
+ struct event_base *ev_base; /**< Event base */
};
typedef void (*controller_func_t)(gchar **args, struct controller_session *session);
guint32 dns_requests; /**< number of DNS requests per this task */
struct rspamd_dns_resolver *resolver; /**< DNS resolver */
+ struct event_base *ev_base; /**< Event base */
};
/**
struct http_callback_data {
struct event ev;
+ struct event_base *ev_base;
struct timeval tv;
struct rspamd_map *map;
struct http_map_data *data;
write_http_request (cbd->map, cbd->data, fd);
/* Plan reading */
event_set (&cbd->ev, cbd->fd, EV_READ | EV_PERSIST, http_async_callback, cbd);
+ event_base_set (cbd->ev_base, &cbd->ev);
cbd->tv.tv_sec = HTTP_READ_TIMEOUT;
cbd->tv.tv_usec = 0;
cbd->state = 1;
else {
/* Plan event */
cbd = g_malloc (sizeof (struct http_callback_data));
+ cbd->ev_base = map->ev_base;
event_set (&cbd->ev, sock, EV_WRITE, http_async_callback, cbd);
+ event_base_set (cbd->ev_base, &cbd->ev);
cbd->tv.tv_sec = HTTP_CONNECT_TIMEOUT;
cbd->tv.tv_usec = 0;
cbd->map = map;
/* Start watching event for all maps */
void
-start_map_watch (void)
+start_map_watch (struct event_base *ev_base)
{
GList *cur = maps;
struct rspamd_map *map;
/* First of all do synced read of data */
while (cur) {
map = cur->data;
+ map->ev_base = ev_base;
if (map->protocol == PROTO_FILE) {
evtimer_set (&map->ev, file_callback, map);
+ event_base_set (map->ev_base, &map->ev);
/* Read initial data */
read_map_file (map, map->map_data);
/* Plan event with jitter */
}
else if (map->protocol == PROTO_HTTP) {
evtimer_set (&map->ev, http_callback, map);
+ event_base_set (map->ev_base, &map->ev);
/* Read initial data */
read_http_sync (map, map->map_data);
/* Plan event with jitter */
void **user_data;
struct event ev;
struct timeval tv;
+ struct event_base *ev_base;
void *map_data;
};
/**
* Start watching of maps by adding events to libevent event loop
*/
-void start_map_watch (void);
+void start_map_watch (struct event_base *ev_base);
/**
* Remove all maps watched (remove events)
session->cfg = worker->srv->cfg;
session->session_time = time (NULL);
session->resolver = ctx->resolver;
+ session->ev_base = ctx->ev_base;
worker->srv->stat->connections_count++;
/* Resolve client's addr */
return;
}
else {
- session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE,
+ session->dispatcher = rspamd_create_dispatcher (session->ev_base, nfd, BUFFER_LINE,
smtp_read_socket, smtp_write_socket, smtp_err_socket, &session->ctx->smtp_timeout, session);
session->dispatcher->peer_addr = session->client_addr.s_addr;
}
if ((value = ctx->smtp_capabilities_str) != NULL) {
make_capabilities (ctx, value);
}
-
- ctx->resolver = dns_resolver_init (worker->srv->cfg);
return TRUE;
}
start_smtp_worker (struct rspamd_worker *worker)
{
struct sigaction signals;
+ struct smtp_worker_ctx *ctx = worker->ctx;
gperf_profiler_init (worker->srv->cfg, "worker");
worker->srv->pid = getpid ();
- event_init ();
+ ctx->ev_base = event_init ();
/* Set smtp options */
if ( !config_smtp_worker (worker)) {
/* SIGUSR2 handler */
signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker);
+ event_base_set (ctx->ev_base, &worker->sig_ev);
signal_add (&worker->sig_ev, NULL);
/* Accept event */
event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+ event_base_set (ctx->ev_base, &worker->bind_ev);
event_add (&worker->bind_ev, NULL);
/* Maps events */
- start_map_watch ();
+ start_map_watch (ctx->ev_base);
+
+ /* DNS resolver */
+ ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
/* Set umask */
umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP);
- event_loop (0);
+ event_base_loop (ctx->ev_base, 0);
close_log (rspamd_main->logger);
exit (EXIT_SUCCESS);
gchar *metric;
GList *smtp_filters[SMTP_STAGE_MAX];
struct rspamd_dns_resolver *resolver;
+ struct event_base *ev_base;
};
enum rspamd_smtp_state {
gboolean resolved;
gboolean esmtp;
struct rspamd_dns_resolver *resolver;
+ struct event_base *ev_base;
};
typedef gboolean (*smtp_filter_t)(struct smtp_session *session, gpointer filter_data);
return FALSE;
}
/* Create a dispatcher for upstream connection */
- session->upstream_dispatcher = rspamd_create_dispatcher (session->upstream_sock, BUFFER_LINE,
+ session->upstream_dispatcher = rspamd_create_dispatcher (session->ev_base, session->upstream_sock, BUFFER_LINE,
smtp_upstream_read_socket, smtp_upstream_write_socket, smtp_upstream_err_socket,
&session->ctx->smtp_timeout, session);
session->state = SMTP_STATE_WAIT_UPSTREAM;
stat_file_t *real_statfile;
statfile_pool_t *pool;
rspamd_io_dispatcher_t *dispatcher;
+ struct event_base *ev_base;
struct event tm_ev;
}
/* Now create and activate dispatcher */
msec_to_tv (ctx->timeout, &ctx->io_tv);
- ctx->dispatcher = rspamd_create_dispatcher (ctx->sock, BUFFER_LINE, sync_read, NULL, sync_err, &ctx->io_tv, ctx);
+ ctx->dispatcher = rspamd_create_dispatcher (ctx->ev_base, ctx->sock, BUFFER_LINE, sync_read, NULL, sync_err, &ctx->io_tv, ctx);
ctx->state = SYNC_STATE_GREETING;
ctx->is_busy = TRUE;
}
static gboolean
-add_statfile_watch (statfile_pool_t *pool, struct statfile *st, struct config_file *cfg)
+add_statfile_watch (statfile_pool_t *pool, struct statfile *st, struct config_file *cfg, struct event_base *ev_base)
{
struct rspamd_sync_ctx *ctx;
guint32 jittered_interval;
ctx->st = st;
ctx->timeout = cfg->statfile_sync_timeout;
ctx->sync_interval = cfg->statfile_sync_interval;
+ ctx->ev_base = ev_base;
/* Add some jittering for synchronization */
jittered_interval = g_random_int_range (ctx->sync_interval, ctx->sync_interval * 2);
msec_to_tv (jittered_interval, &ctx->interval);
}
/* Now plan event for it's future executing */
evtimer_set (&ctx->tm_ev, sync_timer_callback, ctx);
+ event_base_set (ctx->ev_base, &ctx->tm_ev);
evtimer_add (&ctx->tm_ev, &ctx->interval);
log_next_sync (st->symbol, ctx->interval.tv_sec);
}
}
gboolean
-start_statfile_sync (statfile_pool_t *pool, struct config_file *cfg)
+start_statfile_sync (statfile_pool_t *pool, struct config_file *cfg, struct event_base *ev_base)
{
GList *cur, *l;
struct classifier_config *cl;
while (l) {
st = l->data;
if (st->binlog != NULL && st->binlog->affinity == AFFINITY_SLAVE) {
- if (!add_statfile_watch (pool, st, cfg)) {
+ if (!add_statfile_watch (pool, st, cfg, ev_base)) {
return FALSE;
}
}
/*
* Start synchronization of statfiles. Must be called after event_init as it adds events
*/
-gboolean start_statfile_sync (statfile_pool_t *pool, struct config_file *cfg);
+gboolean start_statfile_sync (statfile_pool_t *pool, struct config_file *cfg, struct event_base *ev_base);
#endif
guint32 tasks;
/* Limit of tasks */
guint32 max_tasks;
+ struct event_base *ev_base;
};
static gboolean write_socket (void *arg);
/* Set up dispatcher */
new_task->dispatcher =
- rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, write_socket,
+ rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, read_socket, write_socket,
err_socket, &ctx->io_tv, (void *) new_task);
new_task->dispatcher->peer_addr = new_task->client_addr.s_addr;
+ new_task->ev_base = ctx->ev_base;
ctx->tasks ++;
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)reduce_tasks_count, &ctx->tasks);
worker->srv->pid = getpid ();
- event_init ();
+ ctx->ev_base = event_init ();
init_signals (&signals, sig_handler);
sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
/* SIGUSR2 handler */
signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker);
+ event_base_set (ctx->ev_base, &worker->sig_ev);
signal_add (&worker->sig_ev, NULL);
/* Accept event */
event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST,
accept_socket, (void *) worker);
+ event_base_set (ctx->ev_base, &worker->bind_ev);
event_add (&worker->bind_ev, NULL);
else {
#endif
/* Maps events */
- start_map_watch ();
+ start_map_watch (ctx->ev_base);
#ifndef BUILD_STATIC
}
#endif
- ctx->resolver = dns_resolver_init (worker->srv->cfg);
+ ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
- event_loop (0);
+ event_base_loop (ctx->ev_base, 0);
#ifndef BUILD_STATIC
if (ctx->is_custom) {