Browse Source

[Feature] Rework librdns to interact with external upstreams

tags/1.2.0
Vsevolod Stakhov 8 years ago
parent
commit
844e34179a
3 changed files with 121 additions and 28 deletions
  1. 2
    1
      contrib/librdns/dns_private.h
  2. 28
    4
      contrib/librdns/rdns.h
  3. 91
    23
      contrib/librdns/resolver.c

+ 2
- 1
contrib/librdns/dns_private.h View File

@@ -51,6 +51,7 @@ struct rdns_server {
unsigned int io_cnt;

struct rdns_io_channel **io_channels;
void *ups_elt;
upstream_entry_t up;
};

@@ -112,7 +113,7 @@ struct rdns_resolver {
struct rdns_io_channel *io_channels; /**< hash of io chains indexed by socket */
struct rdns_async_context *async; /** async callbacks */
void *periodic; /** periodic event for resolver */
struct rdns_upstream_context *ups;
struct rdns_plugin *curve_plugin;

rdns_log_function logger;

+ 28
- 4
contrib/librdns/rdns.h View File

@@ -126,7 +126,7 @@ enum dns_rcode {
RDNS_RC_NETERR = 12,
RDNS_RC_NOREC = 13
};
struct rdns_reply {
struct rdns_request *request;
struct rdns_resolver *resolver;
@@ -152,11 +152,26 @@ struct rdns_async_context {
void (*cleanup)(void *priv_data);
};

struct rdns_upstream_elt {
void *server;
void *lib_data;
};

struct rdns_upstream_context {
void *data;
struct rdns_upstream_elt* (*select)(const char *name,
size_t len, void *ups_data);
struct rdns_upstream_elt* (*select_retransmit)(const char *name,
size_t len, void *ups_data);
void (*ok)(struct rdns_upstream_elt *elt, void *ups_data);
void (*fail)(struct rdns_upstream_elt *elt, void *ups_data);
};

/**
* Type of rdns plugin
*/
enum rdns_plugin_type {
RDNS_PLUGIN_CURVE = 0//!< use the specified plugin instead of send/recv functions
RDNS_PLUGIN_CURVE = 0
};

typedef ssize_t (*rdns_network_send_callback) (struct rdns_request *req, void *plugin_data);
@@ -226,9 +241,9 @@ void rdns_resolver_async_bind (struct rdns_resolver *resolver,
* @param name name of DNS server (should be ipv4 or ipv6 address)
* @param priority priority (can be 0 for fair round-robin)
* @param io_cnt a number of sockets that are simultaneously opened to this server
* @return true if a server has been added to resolver
* @return opaque pointer that could be used to select upstream
*/
bool rdns_resolver_add_server (struct rdns_resolver *resolver,
void* rdns_resolver_add_server (struct rdns_resolver *resolver,
const char *name, unsigned int port,
int priority, unsigned int io_cnt);

@@ -259,6 +274,15 @@ void rdns_resolver_set_logger (struct rdns_resolver *resolver,
void rdns_resolver_set_log_level (struct rdns_resolver *resolver,
enum rdns_log_level level);

/**
* Set upstream library for selecting DNS upstreams
* @param resolver resolver object
* @param ups_ctx upstream functions
* @param ups_data opaque data
*/
void rdns_resolver_set_upstream_lib (struct rdns_resolver *resolver,
struct rdns_upstream_context *ups_ctx,
void *ups_data);

/**
* Set maximum number of dns requests to be sent to a socket to be refreshed

+ 91
- 23
contrib/librdns/resolver.c View File

@@ -87,13 +87,13 @@ rdns_send_request (struct rdns_request *req, int fd, bool new_req)
* should take care about events processing
*/
return 0;
}
}
else {
rdns_debug ("send failed: %s for server %s", strerror (errno), serv->name);
return -1;
}
}
if (new_req) {
/* Add request to hash table */
HASH_ADD_INT (req->io->requests, id, req);
@@ -131,7 +131,7 @@ rdns_find_dns_request (uint8_t *in, struct rdns_io_channel *ioc)
struct rdns_request *req;
int id;
struct rdns_resolver *resolver = ioc->resolver;
id = header->qid;
HASH_FIND_INT (ioc->requests, &id, req);
if (req == NULL) {
@@ -170,7 +170,7 @@ rdns_parse_reply (uint8_t *in, int r, struct rdns_request *req,
return false;
}

/*
/*
* Now we have request and query data is now at the end of header, so compare
* request QR section and reply QR section
*/
@@ -221,7 +221,7 @@ rdns_parse_reply (uint8_t *in, int r, struct rdns_request *req,
}
}
}
if (!found && type != RDNS_REQUEST_ANY) {
/* We have not found the requested RR type */
rep->code = RDNS_RC_NOREC;
@@ -251,7 +251,7 @@ rdns_process_read (int fd, void *arg)
uint8_t in[UDP_PACKET_SIZE];

resolver = ioc->resolver;
/* First read packet from socket */
if (resolver->curve_plugin == NULL) {
r = read (fd, in, sizeof (in));
@@ -271,6 +271,12 @@ rdns_process_read (int fd, void *arg)
if (req != NULL) {
if (rdns_parse_reply (in, r, req, &rep)) {
UPSTREAM_OK (req->io->srv);

if (req->resolver->ups && req->io->srv->ups_elt) {
req->resolver->ups->ok (req->io->srv->ups_elt,
req->resolver->ups->data);
}

req->state = RDNS_REQUEST_REPLIED;
rdns_request_unschedule (req);
req->func (rep, req->arg);
@@ -297,7 +303,14 @@ rdns_process_timer (void *arg)
resolver = req->resolver;

if (req->retransmits == 0) {
UPSTREAM_FAIL (req->io->srv, time (NULL));
if (req->resolver->ups && req->io->srv->ups_elt) {
req->resolver->ups->fail (req->io->srv->ups_elt,
req->resolver->ups->data);
}
else {
UPSTREAM_FAIL (req->io->srv, time (NULL));
}

rep = rdns_make_reply (req, RDNS_RC_TIMEOUT);
req->state = RDNS_REQUEST_REPLIED;
rdns_request_unschedule (req);
@@ -307,13 +320,29 @@ rdns_process_timer (void *arg)
return;
}

if (!req->io->active) {
if (!req->io->active || req->retransmits == 1) {
/* Do not reschedule IO requests on inactive sockets */
rdns_debug ("reschedule request with id: %d", (int)req->id);
rdns_request_unschedule (req);
REF_RELEASE (req->io);

UPSTREAM_SELECT_ROUND_ROBIN (resolver->servers, serv);
if (resolver->ups) {
struct rdns_upstream_elt *elt;

elt = resolver->ups->select_retransmit (req->requested_names[0].name,
req->requested_names[0].len, resolver->ups->data);

if (elt) {
serv = elt->server;
serv->ups_elt = elt;
}
else {
UPSTREAM_SELECT_ROUND_ROBIN (resolver->servers, serv);
}
}
else {
UPSTREAM_SELECT_ROUND_ROBIN (resolver->servers, serv);
}

if (serv == NULL) {
rdns_warn ("cannot find suitable server for request");
@@ -340,7 +369,14 @@ rdns_process_timer (void *arg)
req->state = RDNS_REQUEST_REGISTERED;
}
else if (r == -1) {
UPSTREAM_FAIL (req->io->srv, time (NULL));
if (req->resolver->ups && req->io->srv->ups_elt) {
req->resolver->ups->fail (req->io->srv->ups_elt,
req->resolver->ups->data);
}
else {
UPSTREAM_FAIL (req->io->srv, time (NULL));
}

rep = rdns_make_reply (req, RDNS_RC_NETERR);
req->state = RDNS_REQUEST_REPLIED;
rdns_request_unschedule (req);
@@ -426,7 +462,14 @@ rdns_process_retransmit (int fd, void *arg)
req->state = RDNS_REQUEST_REGISTERED;
}
else if (r == -1) {
UPSTREAM_FAIL (req->io->srv, time (NULL));
if (req->resolver->ups && req->io->srv->ups_elt) {
req->resolver->ups->fail (req->io->srv->ups_elt,
req->resolver->ups->data);
}
else {
UPSTREAM_FAIL (req->io->srv, time (NULL));
}

rep = rdns_make_reply (req, RDNS_RC_NETERR);
req->state = RDNS_REQUEST_REPLIED;
req->func (rep, req->arg);
@@ -487,7 +530,7 @@ rdns_make_request_full (
req->curve_plugin_data = NULL;
#endif
REF_INIT_RETAIN (req, rdns_request_free);
/* Calculate packet's total length based on records count */
va_start (args, queries);
for (i = 0; i < queries * 2; i += 2) {
@@ -554,18 +597,34 @@ rdns_make_request_full (
req->state = RDNS_REQUEST_NEW;
req->async = resolver->async;

UPSTREAM_SELECT_ROUND_ROBIN (resolver->servers, serv);
if (resolver->ups) {
struct rdns_upstream_elt *elt;

elt = resolver->ups->select (req->requested_names[0].name,
req->requested_names[0].len, resolver->ups->data);

if (elt) {
serv = elt->server;
serv->ups_elt = elt;
}
else {
UPSTREAM_SELECT_ROUND_ROBIN (resolver->servers, serv);
}
}
else {
UPSTREAM_SELECT_ROUND_ROBIN (resolver->servers, serv);
}

if (serv == NULL) {
rdns_warn ("cannot find suitable server for request");
REF_RELEASE (req);
return NULL;
}
/* Select random IO channel */
req->io = serv->io_channels[ottery_rand_uint32 () % serv->io_cnt];
req->io->uses ++;
/* Now send request to server */
r = rdns_send_request (req, req->io->sock, true);

@@ -590,7 +649,7 @@ rdns_resolver_init (struct rdns_resolver *resolver)
if (!resolver->async_binded) {
return false;
}
if (resolver->servers == NULL) {
return false;
}
@@ -644,7 +703,7 @@ rdns_resolver_register_plugin (struct rdns_resolver *resolver,
}
}

bool
void *
rdns_resolver_add_server (struct rdns_resolver *resolver,
const char *name, unsigned int port,
int priority, unsigned int io_cnt)
@@ -658,24 +717,24 @@ rdns_resolver_add_server (struct rdns_resolver *resolver,
if (inet_pton (AF_INET, name, &addr) == 0 &&
inet_pton (AF_INET6, name, &addr) == 0) {
/* Invalid IP */
return false;
return NULL;
}

if (io_cnt == 0) {
return false;
return NULL;
}
if (port == 0 || port > UINT16_MAX) {
return false;
return NULL;
}

serv = calloc (1, sizeof (struct rdns_server));
if (serv == NULL) {
return false;
return NULL;
}
serv->name = strdup (name);
if (serv->name == NULL) {
free (serv);
return false;
return NULL;
}

serv->io_cnt = io_cnt;
@@ -683,7 +742,7 @@ rdns_resolver_add_server (struct rdns_resolver *resolver,

UPSTREAM_ADD (resolver->servers, serv, priority);

return true;
return serv;
}

void
@@ -701,6 +760,15 @@ rdns_resolver_set_log_level (struct rdns_resolver *resolver,
resolver->log_level = level;
}

void
rdns_resolver_set_upstream_lib (struct rdns_resolver *resolver,
struct rdns_upstream_context *ups_ctx,
void *ups_data)
{
resolver->ups = ups_ctx;
resolver->ups->data = ups_data;
}


void
rdns_resolver_set_max_io_uses (struct rdns_resolver *resolver,

Loading…
Cancel
Save