};
struct rspamd_worker_bind_conf {
- gchar *bind_host;
- guint16 bind_port;
- gint ai;
+ rspamd_inet_addr_t *addrs;
+ guint cnt;
+ gchar *name;
gboolean is_systemd;
struct rspamd_worker_bind_conf *next;
};
const gchar *str)
{
struct rspamd_worker_bind_conf *cnf;
- gchar **tokens, *tmp, *err;
- gboolean ret = TRUE, rc;
+ gchar **tokens, *err;
+ gboolean ret = TRUE;
if (str == NULL) {
return FALSE;
cnf =
rspamd_mempool_alloc0 (cfg->cfg_pool,
sizeof (struct rspamd_worker_bind_conf));
- cnf->bind_port = DEFAULT_BIND_PORT;
- cnf->bind_host = rspamd_mempool_strdup (cfg->cfg_pool, str);
- cnf->ai = AF_UNSPEC;
-
- if (*tokens[0] == '/' || *tokens[0] == '.') {
- cnf->ai = AF_UNIX;
- LL_PREPEND (cf->bind_conf, cnf);
- }
- else if (strcmp (tokens[0], "*") == 0) {
- /* We need to add two listen entries: one for ipv4 and one for ipv6 */
- tmp = tokens[0];
- tokens[0] = "*v4";
- cnf->ai = AF_INET;
- if ((ret = parse_host_port_priority_strv (cfg->cfg_pool, tokens,
- &cnf->bind_host, &cnf->bind_port, NULL, DEFAULT_BIND_PORT))) {
- LL_PREPEND (cf->bind_conf, cnf);
- }
- cnf =
- rspamd_mempool_alloc0 (cfg->cfg_pool,
- sizeof (struct rspamd_worker_bind_conf));
- cnf->bind_port = DEFAULT_BIND_PORT;
- cnf->bind_host = rspamd_mempool_strdup (cfg->cfg_pool, str);
- cnf->ai = AF_INET6;
- tokens[0] = "*v6";
- if ((rc = parse_host_port_priority_strv (cfg->cfg_pool, tokens,
- &cnf->bind_host, &cnf->bind_port, NULL, DEFAULT_BIND_PORT))) {
- LL_PREPEND (cf->bind_conf, cnf);
- }
- tokens[0] = tmp;
- }
- else if (strcmp (tokens[0], "systemd") == 0) {
+
+ cnf->cnt = 1024;
+ if (strcmp (tokens[0], "systemd") == 0) {
/* The actual socket will be passed by systemd environment */
- cnf->bind_host = rspamd_mempool_strdup (cfg->cfg_pool, str);
- cnf->ai = strtoul (tokens[1], &err, 10);
cnf->is_systemd = TRUE;
+ cnf->cnt = strtoul (tokens[1], &err, 10);
+ cnf->addrs = NULL;
if (err == NULL || *err == '\0') {
LL_PREPEND (cf->bind_conf, cnf);
}
else {
+ msg_err ("cannot parse bind line: %s", str);
ret = FALSE;
}
}
else {
- cnf->bind_host = rspamd_mempool_strdup (cfg->cfg_pool, tokens[0]);
- if (tokens[1] == NULL) {
- cnf->bind_port = DEFAULT_BIND_PORT;
- err = NULL;
+ if (!rspamd_parse_host_port_priority_strv (tokens, &cnf->addrs,
+ &cnf->cnt, NULL, &cnf->name, DEFAULT_BIND_PORT, cfg->cfg_pool)) {
+ msg_err ("cannot parse bind line: %s", str);
+ ret = FALSE;
}
else {
- cnf->bind_port = strtoul (tokens[1], &err, 10);
- }
- if (err == NULL || *err == '\0') {
LL_PREPEND (cf->bind_conf, cnf);
}
- else {
- ret = FALSE;
- }
}
g_strfreev (tokens);
return fd;
}
+int
+rspamd_inet_address_listen (rspamd_inet_addr_t *addr, gint type,
+ gboolean async)
+{
+ gint fd, r;
+ gint on = 1;
+
+ if (addr == NULL) {
+ return -1;
+ }
+
+ fd = rspamd_socket_create (addr->af, type, 0, async);
+ if (fd == -1) {
+ return -1;
+ }
+
+ setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, (const void *)&on, sizeof (gint));
+ r = bind (fd, &addr->addr.sa, addr->slen);
+ if (r == -1) {
+ if (!async || errno != EINPROGRESS) {
+ close (fd);
+ msg_warn ("bind failed: %d, '%s'", errno,
+ strerror (errno));
+ return -1;
+ }
+ }
+
+ if (type != SOCK_DGRAM) {
+ r = listen (fd, -1);
+
+ if (r == -1) {
+ msg_warn ("listen failed: %d, '%s'", errno, strerror (errno));
+ close (fd);
+ return -1;
+ }
+ }
+
+ return fd;
+}
+
gboolean
rspamd_parse_host_port_priority_strv (gchar **tokens,
rspamd_inet_addr_t **addr,
cur_tok = tokens[0];
- if (strcmp (cur_tok, "*v6") == 0) {
- hints.ai_family = AF_INET6;
+ if (strcmp (cur_tok, "*") == 0) {
hints.ai_flags |= AI_PASSIVE;
cur_tok = NULL;
}
- else if (strcmp (cur_tok, "*v4") == 0) {
- hints.ai_family = AF_INET;
- hints.ai_flags |= AI_PASSIVE;
- cur_tok = NULL;
- }
- else {
- hints.ai_family = AF_UNSPEC;
- }
+
+ hints.ai_family = AF_UNSPEC;
if (tokens[1] != NULL) {
/* Port part */
int rspamd_inet_address_connect (rspamd_inet_addr_t *addr, gint type,
gboolean async);
+/**
+ * Listen on a specified inet address
+ * @param addr
+ * @param type
+ * @param async
+ * @return
+ */
+int rspamd_inet_address_listen (rspamd_inet_addr_t *addr, gint type,
+ gboolean async);
/**
* Check whether specified ip is valid (not INADDR_ANY or INADDR_NONE) for ipv4 or ipv6
* @param ptr pointer to struct in_addr or struct in6_addr
}
static GList *
-create_listen_socket (const gchar *addr, gint port, gint listen_type)
+create_listen_socket (rspamd_inet_addr_t *addrs, guint cnt, gint listen_type)
{
- gint listen_sock = -1;
- GList *result, *cur;
- /* Create listen sockets */
- result = make_universal_sockets_list (addr,
- port,
- listen_type,
- TRUE,
- TRUE,
- TRUE);
-
- cur = result;
- while (cur != NULL) {
- listen_sock = GPOINTER_TO_INT (cur->data);
- if (listen_sock != -1 && listen_type != SOCK_DGRAM) {
- if (listen (listen_sock, -1) == -1) {
- msg_err ("cannot listen on socket. %s", strerror (errno));
- }
+ GList *result = NULL;
+ gint fd;
+ guint i;
+
+ for (i = 0; i < cnt; i ++) {
+ fd = rspamd_inet_address_listen (&addrs[i], listen_type, TRUE);
+ if (fd != -1) {
+ result = g_list_prepend (result, GINT_TO_POINTER (fd));
}
- cur = g_list_next (cur);
}
return result;
}
static inline uintptr_t
-make_listen_key (gint ai, const gchar *addr, gint port)
+make_listen_key (struct rspamd_worker_bind_conf *cf)
{
gpointer xxh;
+ guint i;
- xxh = XXH32_init (0xbeef);
- XXH32_update (xxh, addr, strlen (addr));
- XXH32_update (xxh, (guchar *)&ai, sizeof (gint));
- XXH32_update (xxh, (guchar *)&port, sizeof (gint));
+ xxh = XXH32_init (0xdeadbeef);
+ if (cf->is_systemd) {
+ XXH32_update (xxh, "systemd", sizeof ("systemd"));
+ XXH32_update (xxh, &cf->cnt, sizeof (cf->cnt));
+ }
+ else {
+ XXH32_update (xxh, cf->name, strlen (cf->name));
+ for (i = 0; i < cf->cnt; i ++) {
+ XXH32_update (xxh, &cf->addrs[i].addr, cf->addrs[i].slen);
+ }
+ }
return XXH32_digest (xxh);
}
{
GList *cur, *ls;
struct rspamd_worker_conf *cf;
- gint i, key;
+ gint i;
gpointer p;
+ guintptr key;
struct rspamd_worker_bind_conf *bcf;
cur = rspamd->cfg->workers;
if (cf->worker->has_socket) {
LL_FOREACH (cf->bind_conf, bcf)
{
- key = make_listen_key (bcf->ai,
- bcf->bind_host,
- bcf->bind_port);
+ key = make_listen_key (bcf);
if ((p =
g_hash_table_lookup (listen_sockets,
GINT_TO_POINTER (key))) == NULL) {
if (!bcf->is_systemd) {
/* Create listen socket */
- ls = create_listen_socket (bcf->bind_host,
- bcf->bind_port,
+ ls = create_listen_socket (bcf->addrs, bcf->cnt,
cf->worker->listen_type);
}
else {
- ls = systemd_get_socket (bcf->ai);
+ ls = systemd_get_socket (bcf->cnt);
}
if (ls == NULL) {
msg_err ("cannot listen on socket %s: %s",
- bcf->bind_host,
+ bcf->name,
strerror (errno));
exit (-errno);
}
- g_hash_table_insert (listen_sockets,
- GINT_TO_POINTER (key), ls);
+ g_hash_table_insert (listen_sockets, (gpointer)key, ls);
}
else {
/* We had socket for this type of worker */