Browse Source

Fix simultaneous bind to ipv6 and ipv6 sockets.

tags/0.6.7
Vsevolod Stakhov 10 years ago
parent
commit
2883047899
8 changed files with 139 additions and 92 deletions
  1. 1
    0
      CMakeLists.txt
  2. 2
    0
      config.h.in
  3. 4
    14
      src/cfg_file.h
  4. 0
    2
      src/cfg_rcl.c
  5. 116
    64
      src/cfg_utils.c
  6. 9
    11
      src/main.c
  7. 2
    1
      src/settings.c
  8. 5
    0
      src/util.c

+ 1
- 0
CMakeLists.txt View File

@@ -790,6 +790,7 @@ CHECK_SYMBOL_EXISTS(MAP_SHARED sys/mman.h HAVE_MMAP_SHARED)
CHECK_SYMBOL_EXISTS(MAP_ANON sys/mman.h HAVE_MMAP_ANON)
CHECK_SYMBOL_EXISTS(MAP_NOCORE sys/mman.h HAVE_MMAP_NOCORE)
CHECK_SYMBOL_EXISTS(O_DIRECT fcntl.h HAVE_O_DIRECT)
CHECK_SYMBOL_EXISTS(IPV6_V6ONLY "sys/socket.h;netinet/in.h" HAVE_IPV6_V6ONLY)
CHECK_SYMBOL_EXISTS(posix_fadvise fcntl.h HAVE_FADVISE)
CHECK_SYMBOL_EXISTS(posix_fallocate fcntl.h HAVE_POSIX_FALLOCATE)
CHECK_SYMBOL_EXISTS(fallocate fcntl.h HAVE_FALLOCATE)

+ 2
- 0
config.h.in View File

@@ -207,6 +207,8 @@
#cmakedefine HAVE_READPASSPHRASE_H 1
#cmakedefine HAVE_TERMIOS_H 1

#cmakedefine HAVE_IPV6_V6ONLY 1

#if (defined(__i386__) || defined(__x86_64__) || defined(_M_IX86))
/* Use murmur hash for UTHash for these platforms */
#define HASH_FUNCTION HASH_MUR

+ 4
- 14
src/cfg_file.h View File

@@ -15,10 +15,9 @@
#include "utlist.h"
#include "ucl.h"

#define DEFAULT_BIND_PORT 768
#define DEFAULT_CONTROL_PORT 7608
#define DEFAULT_LMTP_PORT 7609
#define MAX_MEMCACHED_SERVERS 48
#define DEFAULT_BIND_PORT 11333
#define DEFAULT_CONTROL_PORT 11334
#define MAX_MEMCACHED_SERVERS 4
#define DEFAULT_MEMCACHED_PORT 11211
/* Memcached timeouts */
#define DEFAULT_MEMCACHED_CONNECT_TIMEOUT 1000
@@ -27,14 +26,6 @@
#define DEFAULT_UPSTREAM_ERROR_TIME 10
#define DEFAULT_UPSTREAM_DEAD_TIME 300
#define DEFAULT_UPSTREAM_MAXERRORS 10
/* Statfile pool size, 50Mb */
#define DEFAULT_STATFILE_SIZE 52428800L

/* 1 worker by default */
#define DEFAULT_WORKERS_NUM 1

#define DEFAULT_SCORE 10.0
#define DEFAULT_REJECT_SCORE 999.0

struct expression;
struct tokenizer;
@@ -235,7 +226,7 @@ struct classifier_config {
struct rspamd_worker_bind_conf {
gchar *bind_host;
guint16 bind_port;
gboolean is_unix;
gint ai;
struct rspamd_worker_bind_conf *next;
};

@@ -308,7 +299,6 @@ struct config_file {
gboolean log_color; /**< output colors for console output */
gboolean log_extended; /**< log extended information */

gsize max_statfile_size; /**< maximum size for statfile */
guint32 statfile_sync_interval; /**< synchronization interval */
guint32 statfile_sync_timeout; /**< synchronization timeout */
gboolean mlock_statfile_pool; /**< use mlock (2) for locking statfiles */

+ 0
- 2
src/cfg_rcl.c View File

@@ -1023,8 +1023,6 @@ rspamd_rcl_config_init (void)
*/
sub = rspamd_rcl_add_section (&new, "options", rspamd_rcl_options_handler, UCL_OBJECT,
FALSE, TRUE);
rspamd_rcl_add_default_handler (sub, "statfile_pool_size", rspamd_rcl_parse_struct_integer,
G_STRUCT_OFFSET (struct config_file, max_statfile_size), RSPAMD_CL_FLAG_INT_SIZE);
rspamd_rcl_add_default_handler (sub, "cache_file", rspamd_rcl_parse_struct_string,
G_STRUCT_OFFSET (struct config_file, cache_filename), RSPAMD_CL_FLAG_STRING_PATH);
rspamd_rcl_add_default_handler (sub, "dns_nameserver", rspamd_rcl_parse_struct_string_list,

+ 116
- 64
src/cfg_utils.c View File

@@ -49,11 +49,13 @@ struct rspamd_ucl_map_cbdata {
static gchar* rspamd_ucl_read_cb (memory_pool_t * pool, gchar * chunk, gint len, struct map_cb_data *data);
static void rspamd_ucl_fin_cb (memory_pool_t * pool, struct map_cb_data *data);

gboolean
parse_host_port_priority (memory_pool_t *pool, const gchar *str, gchar **addr, guint16 *port, guint *priority)
static gboolean
parse_host_port_priority_strv (memory_pool_t *pool, gchar **tokens,
gchar **addr, guint16 *port, guint *priority, guint default_port)
{
gchar **tokens, *err_str, *cur_tok;
struct addrinfo hints, *res;
gchar *err_str, portbuf[8];
const gchar *cur_tok, *cur_port;
struct addrinfo hints, *res;
guint port_parsed, priority_parsed, saved_errno = errno;
gint r;
union {
@@ -61,70 +63,46 @@ parse_host_port_priority (memory_pool_t *pool, const gchar *str, gchar **addr, g
struct sockaddr_in6 v6;
} addr_holder;

tokens = g_strsplit_set (str, ":", 0);
if (!tokens || !tokens[0]) {
return FALSE;
}
/* Now try to parse host and write address to ina */
memset (&hints, 0, sizeof (hints));
hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
hints.ai_socktype = SOCK_STREAM; /* Type of the socket */
hints.ai_flags = 0;
hints.ai_protocol = 0; /* Any protocol */
hints.ai_canonname = NULL;
hints.ai_addr = NULL;
hints.ai_next = NULL;

if (strcmp (tokens[0], "*") == 0) {
/* XXX: actually we still cannot listen on multiply protocols */
if (pool != NULL) {
*addr = memory_pool_alloc (pool, INET_ADDRSTRLEN + 1);
}
rspamd_strlcpy (*addr, "0.0.0.0", INET_ADDRSTRLEN + 1);
goto port_parse;
}
else {
cur_tok = tokens[0];
}
hints.ai_flags = AI_NUMERICSERV;

if ((r = getaddrinfo (cur_tok, NULL, &hints, &res)) == 0) {
memcpy (&addr_holder, res->ai_addr, MIN (sizeof (addr_holder), res->ai_addrlen));
if (res->ai_family == AF_INET) {
if (pool != NULL) {
*addr = memory_pool_alloc (pool, INET_ADDRSTRLEN + 1);
}
inet_ntop (res->ai_family, &addr_holder.v4.sin_addr, *addr, INET_ADDRSTRLEN + 1);
}
else {
if (pool != NULL) {
*addr = memory_pool_alloc (pool, INET6_ADDRSTRLEN + 1);
}
inet_ntop (res->ai_family, &addr_holder.v6.sin6_addr, *addr, INET6_ADDRSTRLEN + 1);
}
freeaddrinfo (res);
cur_tok = tokens[0];

if (strcmp (cur_tok, "*v6") == 0) {
hints.ai_family = AF_INET6;
hints.ai_flags |= AI_PASSIVE;
cur_tok = NULL;
}
else if (strcmp (cur_tok, "*v4") == 0) {
hints.ai_family = AF_INET;
hints.ai_flags |= AI_PASSIVE;
cur_tok = NULL;
}
else {
msg_err ("address resolution for %s failed: %s", tokens[0], gai_strerror (r));
goto err;
hints.ai_family = AF_UNSPEC;
}

port_parse:
if (tokens[1] != NULL) {
/* Port part */
rspamd_strlcpy (portbuf, tokens[1], sizeof (portbuf));
cur_port = portbuf;
if (port != NULL) {
errno = 0;
port_parsed = strtoul (tokens[1], &err_str, 10);
if (*err_str != '\0' || errno != 0) {
msg_warn ("cannot parse port: %s, at symbol %c, error: %s", tokens[1], *err_str, strerror (errno));
goto err;
hints.ai_flags ^= AI_NUMERICSERV;
}
if (port_parsed > G_MAXUINT16) {
else if (port_parsed > G_MAXUINT16) {
errno = ERANGE;
msg_warn ("cannot parse port: %s, error: %s", tokens[1], *err_str, strerror (errno));
goto err;
hints.ai_flags ^= AI_NUMERICSERV;
}
else {
*port = port_parsed;
}
*port = port_parsed;
}
if (priority != NULL) {
if (port != NULL) {
@@ -139,24 +117,69 @@ port_parse:
priority_parsed = strtoul (cur_tok, &err_str, 10);
if (*err_str != '\0' || errno != 0) {
msg_warn ("cannot parse priority: %s, at symbol %c, error: %s", tokens[1], *err_str, strerror (errno));
goto err;
}
*priority = priority_parsed;
else {
*priority = priority_parsed;
}
}
}
}
else if (default_port != 0) {
rspamd_snprintf (portbuf, sizeof (portbuf), "%u", default_port);
cur_port = portbuf;
}
else {
cur_port = NULL;
}

if ((r = getaddrinfo (cur_tok, cur_port, &hints, &res)) == 0) {
memcpy (&addr_holder, res->ai_addr, MIN (sizeof (addr_holder), res->ai_addrlen));
if (res->ai_family == AF_INET) {
if (pool != NULL) {
*addr = memory_pool_alloc (pool, INET_ADDRSTRLEN + 1);
}
inet_ntop (res->ai_family, &addr_holder.v4.sin_addr, *addr, INET_ADDRSTRLEN + 1);
}
else {
if (pool != NULL) {
*addr = memory_pool_alloc (pool, INET6_ADDRSTRLEN + 1);
}
inet_ntop (res->ai_family, &addr_holder.v6.sin6_addr, *addr, INET6_ADDRSTRLEN + 1);
}
freeaddrinfo (res);
}
else {
msg_err ("address resolution for %s failed: %s", tokens[0], gai_strerror (r));
goto err;
}

/* Restore errno */
errno = saved_errno;
g_strfreev (tokens);
return TRUE;

err:
errno = saved_errno;
g_strfreev (tokens);
return FALSE;
}

gboolean
parse_host_port_priority (memory_pool_t *pool, const gchar *str, gchar **addr, guint16 *port, guint *priority)
{
gchar **tokens;
gboolean ret;

tokens = g_strsplit_set (str, ":", 0);
if (!tokens || !tokens[0]) {
return FALSE;
}

ret = parse_host_port_priority_strv (pool, tokens, addr, port, priority, 0);

g_strfreev (tokens);

return ret;
}

gboolean
parse_host_port (memory_pool_t *pool, const gchar *str, gchar **addr, guint16 *port)
{
@@ -173,28 +196,58 @@ gboolean
parse_bind_line (struct config_file *cfg, struct worker_conf *cf, const gchar *str)
{
struct rspamd_worker_bind_conf *cnf;
gchar **tokens, *tmp;
gboolean ret;

if (str == NULL)
return 0;
if (str == NULL) {
return FALSE;
}

tokens = g_strsplit_set (str, ":", 0);
if (!tokens || !tokens[0]) {
return FALSE;
}

cnf = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct rspamd_worker_bind_conf));
cnf->bind_port = DEFAULT_BIND_PORT;
cnf->bind_host = memory_pool_strdup (cfg->cfg_pool, str);
cnf->ai = AF_UNSPEC;

if (str[0] == '/' || str[0] == '.') {
cnf->bind_host = memory_pool_strdup (cfg->cfg_pool, str);
cnf->is_unix = TRUE;
if (*tokens[0] == '/' || *tokens[0] == '.') {
cnf->ai = AF_UNIX;
LL_PREPEND (cf->bind_conf, cnf);
return TRUE;
}
else {
else if (strcmp (tokens[0], "*") == 0) {
/* We need to add two listen entries: one for ipv4 and one for ipv6 */
tmp = tokens[0];
tokens[0] = "*v4";
cnf->ai = AF_INET;
if ((ret = parse_host_port_priority_strv (cfg->cfg_pool, tokens,
&cnf->bind_host, &cnf->bind_port, NULL, DEFAULT_BIND_PORT))) {
LL_PREPEND (cf->bind_conf, cnf);
}
cnf = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct rspamd_worker_bind_conf));
cnf->bind_port = DEFAULT_BIND_PORT;
cnf->bind_host = memory_pool_strdup (cfg->cfg_pool, str);
if (parse_host_port (cfg->cfg_pool, str, &cnf->bind_host, &cnf->bind_port)) {
cnf->ai = AF_INET6;
tokens[0] = "*v6";
if ((ret &= parse_host_port_priority_strv (cfg->cfg_pool, tokens,
&cnf->bind_host, &cnf->bind_port, NULL, DEFAULT_BIND_PORT))) {
LL_PREPEND (cf->bind_conf, cnf);
}
tokens[0] = tmp;
}
else {
if ((ret = parse_host_port_priority_strv (cfg->cfg_pool, tokens,
&cnf->bind_host, &cnf->bind_port, NULL, DEFAULT_BIND_PORT))) {
LL_PREPEND (cf->bind_conf, cnf);
return TRUE;
}
}

return FALSE;
g_strfreev (tokens);

return ret;
}

void
@@ -220,7 +273,6 @@ init_defaults (struct config_file *cfg)
/* 20 Kb */
cfg->max_diff = 20480;

cfg->max_statfile_size = DEFAULT_STATFILE_SIZE;
cfg->metrics = g_hash_table_new (rspamd_str_hash, rspamd_str_equal);
cfg->c_modules = g_hash_table_new (rspamd_str_hash, rspamd_str_equal);
cfg->composite_symbols = g_hash_table_new (rspamd_str_hash, rspamd_str_equal);

+ 9
- 11
src/main.c View File

@@ -527,7 +527,7 @@ delay_fork (struct worker_conf *cf)
}

static GList *
create_listen_socket (const gchar *addr, gint port, gint family, gint listen_type)
create_listen_socket (const gchar *addr, gint port, gint listen_type)
{
gint listen_sock = -1;
GList *result, *cur;
@@ -565,12 +565,13 @@ fork_delayed (struct rspamd_main *rspamd)
}

static inline uintptr_t
make_listen_key (const gchar *addr, gint port, gint family)
make_listen_key (gint ai, const gchar *addr, gint port)
{
uintptr_t res = 0;

res = murmur32_hash (addr, strlen (addr));
res ^= murmur32_hash ((guchar *)&port, sizeof (gint));
res += murmur32_hash ((guchar *)&ai, sizeof (gint));
res += murmur32_hash ((guchar *)&port, sizeof (gint));

return res;
}
@@ -580,7 +581,7 @@ spawn_workers (struct rspamd_main *rspamd)
{
GList *cur, *ls;
struct worker_conf *cf;
gint i;
gint i, key;
gpointer p;
struct rspamd_worker_bind_conf *bcf;

@@ -595,24 +596,21 @@ spawn_workers (struct rspamd_main *rspamd)
else {
if (cf->worker->has_socket) {
LL_FOREACH (cf->bind_conf, bcf) {
if ((p = g_hash_table_lookup (listen_sockets, GINT_TO_POINTER (
make_listen_key (bcf->bind_host, bcf->bind_port, bcf->is_unix)))) == NULL) {
key = make_listen_key (bcf->ai, bcf->bind_host, bcf->bind_port);
if ((p = g_hash_table_lookup (listen_sockets, GINT_TO_POINTER (key))) == NULL) {
/* Create listen socket */
ls = create_listen_socket (bcf->bind_host, bcf->bind_port,
bcf->is_unix ? AF_UNIX : AF_INET,
cf->worker->listen_type);
if (ls == NULL) {
exit (-errno);
}
g_hash_table_insert (listen_sockets, GINT_TO_POINTER (
make_listen_key (bcf->bind_host, bcf->bind_port, bcf->is_unix)),
ls);
g_hash_table_insert (listen_sockets, GINT_TO_POINTER (key), ls);
}
else {
/* We had socket for this type of worker */
ls = p;
}
cf->listen_socks = ls;
cf->listen_socks = g_list_concat (cf->listen_socks, ls);
}
}


+ 2
- 1
src/settings.c View File

@@ -474,7 +474,8 @@ check_metric_settings (struct metric_result *res, double *score, double *rscore)
double *sc, *rs;
struct metric *metric = res->metric;

*rscore = DEFAULT_REJECT_SCORE;
/* XXX: what the fuck is that? */
*rscore = 10.0;

if (us != NULL) {
if ((rs = g_hash_table_lookup (us->reject_scores, metric->name)) != NULL) {

+ 5
- 0
src/util.c View File

@@ -116,6 +116,11 @@ make_inet_socket (gint type, struct addrinfo *addr, gboolean is_server, gboolean

if (is_server) {
setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, (const void *)&on, sizeof (gint));
#ifdef HAVE_IPV6_V6ONLY
if (cur->ai_family == AF_INET6) {
setsockopt (fd, IPPROTO_IPV6, IPV6_V6ONLY, (const void *)&on, sizeof (gint));
}
#endif
r = bind (fd, cur->ai_addr, cur->ai_addrlen);
}
else {

Loading…
Cancel
Save