#include "config.h"
#include "dns.h"
#include "main.h"
+#include "utlist.h"
#ifdef HAVE_OPENSSL
#include <openssl/rand.h>
#endif
send_dns_request (struct rspamd_dns_request *req)
{
gint r;
+ struct rspamd_dns_server *serv = req->io->srv;
r = send (req->sock, req->packet, req->pos, 0);
if (r == -1) {
event_set (&req->io_event, req->sock, EV_WRITE, dns_retransmit_handler, req);
event_base_set (req->resolver->ev_base, &req->io_event);
event_add (&req->io_event, &req->tv);
- register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, g_quark_from_static_string ("dns resolver"));
+ register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event,
+ g_quark_from_static_string ("dns resolver"));
return 0;
}
else {
- msg_err ("send failed: %s for server %s", strerror (errno), req->server->name);
- upstream_fail (&req->server->up, req->time);
+ msg_err ("send failed: %s for server %s", strerror (errno), serv->name);
+ upstream_fail (&serv->up, req->time);
return -1;
}
}
event_set (&req->io_event, req->sock, EV_WRITE, dns_retransmit_handler, req);
event_base_set (req->resolver->ev_base, &req->io_event);
event_add (&req->io_event, &req->tv);
- register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event, g_quark_from_static_string ("dns resolver"));
+ register_async_event (req->session, (event_finalizer_t)event_del, &req->io_event,
+ g_quark_from_static_string ("dns resolver"));
return 0;
}
struct rspamd_dns_request *req = arg;
event_del (&req->timer_event);
- g_hash_table_remove (req->resolver->requests, &req->id);
+ g_hash_table_remove (req->io->requests, &req->id);
}
static guint8 *
}
static gboolean
-dns_parse_reply (guint8 *in, gint r, struct rspamd_dns_resolver *resolver,
+dns_parse_reply (gint sock, guint8 *in, gint r, struct rspamd_dns_resolver *resolver,
struct rspamd_dns_request **req_out, struct rspamd_dns_reply **_rep)
{
struct dns_header *header = (struct dns_header *)in;
struct rspamd_dns_request *req;
struct rspamd_dns_reply *rep;
+ struct rspamd_dns_io_channel *ioc;
union rspamd_reply_element *elt;
guint8 *pos;
guint16 id;
return FALSE;
}
+ /* Find io channel */
+ if ((ioc = g_hash_table_lookup (resolver->io_channels, GINT_TO_POINTER (sock))) == NULL) {
+ msg_err ("io channel is not found for this resolver: %d", sock);
+ return FALSE;
+ }
+
/* Now try to find corresponding request */
id = header->qid;
- if ((req = g_hash_table_lookup (resolver->requests, &id)) == NULL) {
+ if ((req = g_hash_table_lookup (ioc->requests, &id)) == NULL) {
/* No such requests found */
return FALSE;
}
* Now we have request and query data is now at the end of header, so compare
* request QR section and reply QR section
*/
- if ((pos = dns_request_reply_cmp (req, in + sizeof (struct dns_header), r - sizeof (struct dns_header))) == NULL) {
+ if ((pos = dns_request_reply_cmp (req, in + sizeof (struct dns_header),
+ r - sizeof (struct dns_header))) == NULL) {
return FALSE;
}
/*
/* First read packet from socket */
r = read (fd, in, sizeof (in));
if (r > (gint)(sizeof (struct dns_header) + sizeof (struct dns_query))) {
- if (dns_parse_reply (in, r, resolver, &req, &rep)) {
+ if (dns_parse_reply (fd, in, r, resolver, &req, &rep)) {
/* Decrease errors count */
if (rep->request->resolver->errors > 0) {
rep->request->resolver->errors --;
}
- upstream_ok (&rep->request->server->up, rep->request->time);
+ upstream_ok (&rep->request->io->srv->up, rep->request->time);
rep->request->func (rep, rep->request->arg);
remove_normal_event (req->session, dns_fin_cb, req);
}
{
struct rspamd_dns_request *req = arg;
struct rspamd_dns_reply *rep;
+ struct rspamd_dns_server *serv;
gint r;
/* Retransmit dns request */
req->retransmits ++;
+ serv = req->io->srv;
if (req->retransmits >= req->resolver->max_retransmits) {
msg_err ("maximum number of retransmits expired for resolving %s of type %s", req->requested_name, dns_strtype (req->type));
- rep = memory_pool_alloc0 (req->pool, sizeof (struct rspamd_dns_reply));
- rep->request = req;
- rep->code = DNS_RC_SERVFAIL;
- upstream_fail (&rep->request->server->up, rep->request->time);
dns_check_throttling (req->resolver);
req->resolver->errors ++;
-
- req->func (rep, req->arg);
- remove_normal_event (req->session, dns_fin_cb, req);
-
- return;
+ goto err;
}
/* Select other server */
if (req->resolver->is_master_slave) {
- req->server = (struct rspamd_dns_server *)get_upstream_master_slave (req->resolver->servers,
+ serv = (struct rspamd_dns_server *)get_upstream_master_slave (req->resolver->servers,
req->resolver->servers_num, sizeof (struct rspamd_dns_server),
req->time, DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME, DEFAULT_UPSTREAM_MAXERRORS);
}
else {
- req->server = (struct rspamd_dns_server *)get_upstream_round_robin (req->resolver->servers,
+ serv = (struct rspamd_dns_server *)get_upstream_round_robin (req->resolver->servers,
req->resolver->servers_num, sizeof (struct rspamd_dns_server),
req->time, DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME, DEFAULT_UPSTREAM_MAXERRORS);
}
- if (req->server == NULL) {
- rep = memory_pool_alloc0 (req->pool, sizeof (struct rspamd_dns_reply));
- rep->request = req;
- rep->code = DNS_RC_SERVFAIL;
-
- req->func (rep, req->arg);
- remove_normal_event (req->session, dns_fin_cb, req);
- return;
+ if (serv == NULL) {
+ goto err;
}
+
+ req->io = serv->cur_io_channel;
+ if (req->io == NULL) {
+ msg_err ("cannot find suitable io channel for the server %s", serv->name);
+ goto err;
+ }
+ serv->cur_io_channel = serv->cur_io_channel->next;
- if (req->server->sock == -1) {
- req->server->sock = make_universal_socket (req->server->name, dns_port, SOCK_DGRAM, TRUE, FALSE, FALSE);
+ if (req->io->sock == -1) {
+ req->io->sock = make_universal_socket (serv->name, dns_port, SOCK_DGRAM, TRUE, FALSE, FALSE);
}
- req->sock = req->server->sock;
+ req->sock = req->io->sock;
if (req->sock == -1) {
- rep = memory_pool_alloc0 (req->pool, sizeof (struct rspamd_dns_reply));
- rep->request = req;
- rep->code = DNS_RC_SERVFAIL;
- upstream_fail (&rep->request->server->up, rep->request->time);
-
- req->func (rep, req->arg);
- remove_normal_event (req->session, dns_fin_cb, req);
-
- return;
+ goto err;
}
/* Add other retransmit event */
r = send_dns_request (req);
if (r == -1) {
- rep = memory_pool_alloc0 (req->pool, sizeof (struct rspamd_dns_reply));
- rep->request = req;
- rep->code = DNS_RC_SERVFAIL;
- upstream_fail (&rep->request->server->up, rep->request->time);
- req->func (rep, req->arg);
- remove_normal_event (req->session, dns_fin_cb, req);
- return;
+ goto err;
}
evtimer_add (&req->timer_event, &req->tv);
+
+ return;
+err:
+ rep = memory_pool_alloc0 (req->pool, sizeof (struct rspamd_dns_reply));
+ rep->request = req;
+ rep->code = DNS_RC_SERVFAIL;
+ if (serv) {
+ upstream_fail (&serv->up, rep->request->time);
+ }
+ req->func (rep, req->arg);
+ remove_normal_event (req->session, dns_fin_cb, req);
+ return;
}
static void
{
struct rspamd_dns_request *req = arg;
struct rspamd_dns_reply *rep;
+ struct rspamd_dns_server *serv;
gint r;
remove_normal_event (req->session, (event_finalizer_t)event_del, &req->io_event);
+ serv = req->io->srv;
if (what == EV_WRITE) {
/* Retransmit dns request */
rep = memory_pool_alloc0 (req->pool, sizeof (struct rspamd_dns_reply));
rep->request = req;
rep->code = DNS_RC_SERVFAIL;
- upstream_fail (&rep->request->server->up, rep->request->time);
+ upstream_fail (&serv->up, rep->request->time);
req->resolver->errors ++;
dns_check_throttling (req->resolver);
rep = memory_pool_alloc0 (req->pool, sizeof (struct rspamd_dns_reply));
rep->request = req;
rep->code = DNS_RC_SERVFAIL;
- upstream_fail (&rep->request->server->up, rep->request->time);
+ upstream_fail (&serv->up, rep->request->time);
req->func (rep, req->arg);
}
evtimer_add (&req->timer_event, &req->tv);
/* Add request to hash table */
- g_hash_table_insert (req->resolver->requests, &req->id, req);
- register_async_event (req->session, (event_finalizer_t)dns_fin_cb, req, g_quark_from_static_string ("dns resolver"));
+ g_hash_table_insert (req->io->requests, &req->id, req);
+ register_async_event (req->session, (event_finalizer_t)dns_fin_cb,
+ req, g_quark_from_static_string ("dns resolver"));
}
}
}
{
va_list args;
struct rspamd_dns_request *req;
+ struct rspamd_dns_server *serv;
struct in_addr *addr;
const gchar *name, *service, *proto;
gint r;
+ const gint max_id_cycles = 32;
struct dns_header *header;
/* If no DNS servers defined silently return FALSE */
req->retransmits = 0;
req->time = time (NULL);
if (resolver->is_master_slave) {
- req->server = (struct rspamd_dns_server *)get_upstream_master_slave (resolver->servers,
+ serv = (struct rspamd_dns_server *)get_upstream_master_slave (resolver->servers,
resolver->servers_num, sizeof (struct rspamd_dns_server),
req->time, DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME, DEFAULT_UPSTREAM_MAXERRORS);
}
else {
- req->server = (struct rspamd_dns_server *)get_upstream_round_robin (resolver->servers,
+ serv = (struct rspamd_dns_server *)get_upstream_round_robin (resolver->servers,
resolver->servers_num, sizeof (struct rspamd_dns_server),
req->time, DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME, DEFAULT_UPSTREAM_MAXERRORS);
}
- if (req->server == NULL) {
+ if (serv == NULL) {
msg_err ("cannot find suitable server for request");
return FALSE;
}
- if (req->server->sock == -1) {
- req->server->sock = make_universal_socket (req->server->name, dns_port, SOCK_DGRAM, TRUE, FALSE, FALSE);
+ /* Now select IO channel */
+
+ req->io = serv->cur_io_channel;
+ if (req->io == NULL) {
+ msg_err ("cannot find suitable io channel for the server %s", serv->name);
+ return FALSE;
}
- req->sock = req->server->sock;
+ serv->cur_io_channel = serv->cur_io_channel->next;
+ req->sock = req->io->sock;
if (req->sock == -1) {
return FALSE;
evtimer_add (&req->timer_event, &req->tv);
/* Add request to hash table */
- while (g_hash_table_lookup (resolver->requests, &req->id)) {
+ r = 0;
+ while (g_hash_table_lookup (req->io->requests, &req->id)) {
/* Check for unique id */
header = (struct dns_header *)req->packet;
header->qid = dns_k_permutor_step (resolver->permutor);
req->id = header->qid;
+ if (++r > max_id_cycles) {
+ msg_err ("cannot generate new id for server %s", serv->name);
+ return FALSE;
+ }
}
- g_hash_table_insert (resolver->requests, &req->id, req);
- register_async_event (session, (event_finalizer_t)dns_fin_cb, req, g_quark_from_static_string ("dns resolver"));
+ g_hash_table_insert (req->io->requests, &req->id, req);
+ register_async_event (session, (event_finalizer_t)dns_fin_cb, req,
+ g_quark_from_static_string ("dns resolver"));
}
else if (r == -1) {
return FALSE;
GList *cur;
struct rspamd_dns_resolver *new;
gchar *begin, *p, *err, addr_holder[16];
- gint priority, i;
+ gint priority, i, j;
struct rspamd_dns_server *serv;
+ struct rspamd_dns_io_channel *ioc;
new = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct rspamd_dns_resolver));
new->ev_base = ev_base;
- new->requests = g_hash_table_new (dns_id_hash, dns_id_equal);
new->permutor = memory_pool_alloc (cfg->cfg_pool, sizeof (struct dns_k_permutor));
dns_k_permutor_init (new->permutor, 0, G_MAXUINT16);
+ new->io_channels = g_hash_table_new (g_direct_hash, g_direct_equal);
new->static_pool = cfg->cfg_pool;
new->request_timeout = cfg->dns_timeout;
new->max_retransmits = cfg->dns_retransmits;
}
}
- /* Now init all servers */
+ /* Now init io channels to all servers */
for (i = 0; i < new->servers_num; i ++) {
serv = &new->servers[i];
- serv->sock = make_universal_socket (serv->name, dns_port, SOCK_DGRAM, TRUE, FALSE, FALSE);
- if (serv->sock == -1) {
- msg_warn ("cannot create socket to server %s", serv->name);
- }
- else {
- event_set (&serv->ev, serv->sock, EV_READ | EV_PERSIST, dns_read_cb, new);
- event_base_set (new->ev_base, &serv->ev);
- event_add (&serv->ev, NULL);
+ for (j = 0; j < (gint)cfg->dns_io_per_server; j ++) {
+ ioc = memory_pool_alloc (new->static_pool, sizeof (struct rspamd_dns_io_channel));
+ ioc->sock = make_universal_socket (serv->name, dns_port, SOCK_DGRAM, TRUE, FALSE, FALSE);
+ if (ioc->sock == -1) {
+ msg_warn ("cannot create socket to server %s", serv->name);
+ }
+ else {
+ ioc->requests = g_hash_table_new (dns_id_hash, dns_id_equal);
+ memory_pool_add_destructor (new->static_pool, (pool_destruct_func)g_hash_table_unref,
+ ioc->requests);
+ ioc->srv = serv;
+ ioc->resolver = new;
+ event_set (&ioc->ev, ioc->sock, EV_READ | EV_PERSIST, dns_read_cb, new);
+ event_base_set (new->ev_base, &ioc->ev);
+ event_add (&ioc->ev, NULL);
+ CDL_PREPEND (serv->io_channels, ioc);
+ serv->cur_io_channel = ioc;
+ g_hash_table_insert (new->io_channels, GINT_TO_POINTER (ioc->sock), ioc);
+ }
}
}