diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2014-10-28 17:27:59 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2014-10-28 17:27:59 +0000 |
commit | 3ca7de99e6194a89e65682449641bf53a447931c (patch) | |
tree | 0d5ee7a3256f95e0f778f1bf4eb02590554d9349 | |
parent | d366ea9c3111136b3757660bf21e345dc110b957 (diff) | |
parent | bbec275025aa9a61d8723be5b781269fe031adb2 (diff) | |
download | rspamd-3ca7de99e6194a89e65682449641bf53a447931c.tar.gz rspamd-3ca7de99e6194a89e65682449641bf53a447931c.zip |
Merge branch 'upstream-rework'
-rw-r--r-- | src/libserver/cfg_file.h | 33 | ||||
-rw-r--r-- | src/libserver/cfg_utils.c | 174 | ||||
-rw-r--r-- | src/libutil/CMakeLists.txt | 4 | ||||
-rw-r--r-- | src/libutil/addr.c | 352 | ||||
-rw-r--r-- | src/libutil/addr.h | 124 | ||||
-rw-r--r-- | src/libutil/upstream.c | 892 | ||||
-rw-r--r-- | src/libutil/upstream.h | 148 | ||||
-rw-r--r-- | src/libutil/util.c | 159 | ||||
-rw-r--r-- | src/libutil/util.h | 79 | ||||
-rw-r--r-- | src/lua/lua_upstream.c | 321 |
10 files changed, 947 insertions, 1339 deletions
diff --git a/src/libserver/cfg_file.h b/src/libserver/cfg_file.h index 9366b7718..f9e2aa46e 100644 --- a/src/libserver/cfg_file.h +++ b/src/libserver/cfg_file.h @@ -15,11 +15,6 @@ #define DEFAULT_BIND_PORT 11333 #define DEFAULT_CONTROL_PORT 11334 -/* Upstream timeouts */ -#define DEFAULT_UPSTREAM_ERROR_TIME 10 -#define DEFAULT_UPSTREAM_ERROR_TIME 10 -#define DEFAULT_UPSTREAM_DEAD_TIME 300 -#define DEFAULT_UPSTREAM_MAXERRORS 10 struct expression; struct tokenizer; @@ -331,34 +326,6 @@ struct rspamd_config { /** - * Parse host[:port[:priority]] line - * @param ina host address - * @param port port - * @param priority priority - * @return TRUE if string was parsed - */ -gboolean rspamd_parse_host_port_priority (rspamd_mempool_t *pool, - const gchar *str, gchar **addr, guint16 *port, guint *priority); - -/** - * Parse host:port line - * @param ina host address - * @param port port - * @return TRUE if string was parsed - */ -gboolean rspamd_parse_host_port (rspamd_mempool_t *pool, const gchar *str, - gchar **addr, guint16 *port); - -/** - * Parse host:priority line - * @param ina host address - * @param priority priority - * @return TRUE if string was parsed - */ -gboolean rspamd_parse_host_priority (rspamd_mempool_t *pool, const gchar *str, - gchar **addr, guint *priority); - -/** * Parse bind credits * @param cf config file to use * @param str line that presents bind line diff --git a/src/libserver/cfg_utils.c b/src/libserver/cfg_utils.c index d01fd2a0e..8d7988b01 100644 --- a/src/libserver/cfg_utils.c +++ b/src/libserver/cfg_utils.c @@ -53,180 +53,6 @@ static gchar * rspamd_ucl_read_cb (rspamd_mempool_t * pool, static void rspamd_ucl_fin_cb (rspamd_mempool_t * pool, struct map_cb_data *data); -static gboolean -parse_host_port_priority_strv (rspamd_mempool_t *pool, gchar **tokens, - gchar **addr, guint16 *port, guint *priority, guint default_port) -{ - gchar *err_str, portbuf[8]; - const gchar *cur_tok, *cur_port; - struct addrinfo hints, *res; - guint port_parsed, priority_parsed, saved_errno = errno; - gint r; - union { - struct sockaddr_in v4; - struct sockaddr_in6 v6; - } addr_holder; - - /* Now try to parse host and write address to ina */ - memset (&hints, 0, sizeof (hints)); - hints.ai_socktype = SOCK_STREAM; /* Type of the socket */ - hints.ai_flags = AI_NUMERICSERV; - - cur_tok = tokens[0]; - - if (strcmp (cur_tok, "*v6") == 0) { - hints.ai_family = AF_INET6; - hints.ai_flags |= AI_PASSIVE; - cur_tok = NULL; - } - else if (strcmp (cur_tok, "*v4") == 0) { - hints.ai_family = AF_INET; - hints.ai_flags |= AI_PASSIVE; - cur_tok = NULL; - } - else { - hints.ai_family = AF_UNSPEC; - } - - if (tokens[1] != NULL) { - /* Port part */ - rspamd_strlcpy (portbuf, tokens[1], sizeof (portbuf)); - cur_port = portbuf; - if (port != NULL) { - errno = 0; - port_parsed = strtoul (tokens[1], &err_str, 10); - if (*err_str != '\0' || errno != 0) { - msg_warn ("cannot parse port: %s, at symbol %c, error: %s", - tokens[1], - *err_str, - strerror (errno)); - hints.ai_flags ^= AI_NUMERICSERV; - } - else if (port_parsed > G_MAXUINT16) { - errno = ERANGE; - msg_warn ("cannot parse port: %s, error: %s", - tokens[1], - *err_str, - strerror (errno)); - hints.ai_flags ^= AI_NUMERICSERV; - } - else { - *port = port_parsed; - } - } - if (priority != NULL) { - const gchar *tok; - - if (port != NULL) { - tok = tokens[2]; - } - else { - tok = tokens[1]; - } - if (tok != NULL) { - /* Priority part */ - errno = 0; - priority_parsed = strtoul (tok, &err_str, 10); - if (*err_str != '\0' || errno != 0) { - msg_warn ( - "cannot parse priority: %s, at symbol %c, error: %s", - tok, - *err_str, - strerror (errno)); - } - else { - *priority = priority_parsed; - } - } - } - } - else if (default_port != 0) { - rspamd_snprintf (portbuf, sizeof (portbuf), "%ud", default_port); - cur_port = portbuf; - } - else { - cur_port = NULL; - } - - if ((r = getaddrinfo (cur_tok, cur_port, &hints, &res)) == 0) { - memcpy (&addr_holder, res->ai_addr, - MIN (sizeof (addr_holder), res->ai_addrlen)); - if (res->ai_family == AF_INET) { - if (pool != NULL) { - *addr = rspamd_mempool_alloc (pool, INET_ADDRSTRLEN + 1); - } - inet_ntop (res->ai_family, - &addr_holder.v4.sin_addr, - *addr, - INET_ADDRSTRLEN + 1); - } - else { - if (pool != NULL) { - *addr = rspamd_mempool_alloc (pool, INET6_ADDRSTRLEN + 1); - } - inet_ntop (res->ai_family, - &addr_holder.v6.sin6_addr, - *addr, - INET6_ADDRSTRLEN + 1); - } - freeaddrinfo (res); - } - else { - msg_err ("address resolution for %s failed: %s", - tokens[0], - gai_strerror (r)); - goto err; - } - - /* Restore errno */ - errno = saved_errno; - return TRUE; - -err: - errno = saved_errno; - return FALSE; -} - -gboolean -rspamd_parse_host_port_priority (rspamd_mempool_t *pool, - const gchar *str, - gchar **addr, - guint16 *port, - guint *priority) -{ - gchar **tokens; - gboolean ret; - - tokens = g_strsplit_set (str, ":", 0); - if (!tokens || !tokens[0]) { - return FALSE; - } - - ret = parse_host_port_priority_strv (pool, tokens, addr, port, priority, 0); - - g_strfreev (tokens); - - return ret; -} - -gboolean -rspamd_parse_host_port (rspamd_mempool_t *pool, - const gchar *str, - gchar **addr, - guint16 *port) -{ - return rspamd_parse_host_port_priority (pool, str, addr, port, NULL); -} - -gboolean -rspamd_parse_host_priority (rspamd_mempool_t *pool, - const gchar *str, - gchar **addr, - guint *priority) -{ - return rspamd_parse_host_port_priority (pool, str, addr, NULL, priority); -} - gboolean rspamd_parse_bind_line (struct rspamd_config *cfg, struct rspamd_worker_conf *cf, diff --git a/src/libutil/CMakeLists.txt b/src/libutil/CMakeLists.txt index 7a8b3add5..01c88769b 100644 --- a/src/libutil/CMakeLists.txt +++ b/src/libutil/CMakeLists.txt @@ -1,5 +1,7 @@ # Librspamd-util -SET(LIBRSPAMDUTILSRC aio_event.c +SET(LIBRSPAMDUTILSRC + addr.c + aio_event.c bloom.c diff.c fstring.c diff --git a/src/libutil/addr.c b/src/libutil/addr.c new file mode 100644 index 000000000..6aeed21e9 --- /dev/null +++ b/src/libutil/addr.c @@ -0,0 +1,352 @@ +/* Copyright (c) 2014, Vsevolod Stakhov + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "config.h" +#include "addr.h" +#include "util.h" +#include "logger.h" + +gboolean +rspamd_ip_is_valid (rspamd_inet_addr_t *addr) +{ + const struct in_addr ip4_any = { INADDR_ANY }, ip4_none = { INADDR_NONE }; + const struct in6_addr ip6_any = IN6ADDR_ANY_INIT; + + gboolean ret = FALSE; + + if (G_LIKELY (addr->af == AF_INET)) { + if (memcmp (&addr->addr.s4.sin_addr, &ip4_any, + sizeof (struct in_addr)) != 0 && + memcmp (&addr->addr.s4.sin_addr, &ip4_none, + sizeof (struct in_addr)) != 0) { + ret = TRUE; + } + } + else if (G_UNLIKELY (addr->af == AF_INET6)) { + if (memcmp (&addr->addr.s6.sin6_addr, &ip6_any, + sizeof (struct in6_addr)) != 0) { + ret = TRUE; + } + } + + return ret; +} + +gint +rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t *addr) +{ + gint nfd, serrno; + socklen_t len = sizeof (addr->addr.ss); + + if ((nfd = accept (sock, &addr->addr.sa, &len)) == -1) { + if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK) { + return 0; + } + return -1; + } + + addr->slen = len; + addr->af = addr->addr.sa.sa_family; + + if (make_socket_nonblocking (nfd) < 0) { + goto out; + } + + /* Set close on exec */ + if (fcntl (nfd, F_SETFD, FD_CLOEXEC) == -1) { + msg_warn ("fcntl failed: %d, '%s'", errno, strerror (errno)); + goto out; + } + + return (nfd); + +out: + serrno = errno; + close (nfd); + errno = serrno; + return (-1); + +} + +gboolean +rspamd_parse_inet_address (rspamd_inet_addr_t *target, const char *src) +{ + gboolean ret = FALSE; + + if (inet_pton (AF_INET6, src, &target->addr.s6.sin6_addr) == 1) { + target->af = AF_INET6; + target->slen = sizeof (target->addr.s6); + ret = TRUE; + } + else if (inet_pton (AF_INET, src, &target->addr.s4.sin_addr) == 1) { + target->af = AF_INET; + target->slen = sizeof (target->addr.s4); + ret = TRUE; + } + + target->addr.sa.sa_family = target->af; + + return ret; +} + +const char * +rspamd_inet_address_to_string (rspamd_inet_addr_t *addr) +{ + static char addr_str[INET6_ADDRSTRLEN + 1]; + + switch (addr->af) { + case AF_INET: + return inet_ntop (addr->af, &addr->addr.s4.sin_addr, addr_str, + sizeof (addr_str)); + case AF_INET6: + return inet_ntop (addr->af, &addr->addr.s6.sin6_addr, addr_str, + sizeof (addr_str)); + case AF_UNIX: + return addr->addr.su.sun_path; + } + + return "undefined"; +} + +uint16_t +rspamd_inet_address_get_port (rspamd_inet_addr_t *addr) +{ + switch (addr->af) { + case AF_INET: + return ntohs (addr->addr.s4.sin_port); + case AF_INET6: + return ntohs (addr->addr.s6.sin6_port); + } + + return 0; +} + +void +rspamd_inet_address_set_port (rspamd_inet_addr_t *addr, uint16_t port) +{ + switch (addr->af) { + case AF_INET: + addr->addr.s4.sin_port = htons (port); + break; + case AF_INET6: + addr->addr.s6.sin6_port = htons (port); + break; + } +} + +int +rspamd_inet_address_connect (rspamd_inet_addr_t *addr, gint type, + gboolean async) +{ + int fd, r; + + if (addr == NULL) { + return -1; + } + + fd = rspamd_socket_create (addr->af, type, 0, async); + if (fd == -1) { + return -1; + } + + r = connect (fd, &addr->addr.sa, addr->slen); + + if (r == -1) { + if (!async || errno != EINPROGRESS) { + close (fd); + msg_warn ("connect failed: %d, '%s'", errno, + strerror (errno)); + return -1; + } + } + + return fd; +} + +gboolean +rspamd_parse_host_port_priority_strv (gchar **tokens, + rspamd_inet_addr_t **addr, + guint *max_addrs, + guint *priority, + gchar **name, + guint default_port) +{ + gchar *err_str, portbuf[8]; + const gchar *cur_tok, *cur_port; + struct addrinfo hints, *res, *cur; + rspamd_inet_addr_t *cur_addr; + guint addr_cnt; + guint port_parsed, priority_parsed, saved_errno = errno; + gint r; + + /* Now try to parse host and write address to ina */ + memset (&hints, 0, sizeof (hints)); + hints.ai_socktype = SOCK_STREAM; /* Type of the socket */ + hints.ai_flags = AI_NUMERICSERV; + + cur_tok = tokens[0]; + + if (strcmp (cur_tok, "*v6") == 0) { + hints.ai_family = AF_INET6; + hints.ai_flags |= AI_PASSIVE; + cur_tok = NULL; + } + else if (strcmp (cur_tok, "*v4") == 0) { + hints.ai_family = AF_INET; + hints.ai_flags |= AI_PASSIVE; + cur_tok = NULL; + } + else { + hints.ai_family = AF_UNSPEC; + } + + if (tokens[1] != NULL) { + /* Port part */ + rspamd_strlcpy (portbuf, tokens[1], sizeof (portbuf)); + cur_port = portbuf; + errno = 0; + port_parsed = strtoul (tokens[1], &err_str, 10); + if (*err_str != '\0' || errno != 0) { + msg_warn ("cannot parse port: %s, at symbol %c, error: %s", + tokens[1], + *err_str, + strerror (errno)); + hints.ai_flags ^= AI_NUMERICSERV; + } + else if (port_parsed > G_MAXUINT16) { + errno = ERANGE; + msg_warn ("cannot parse port: %s, error: %s", + tokens[1], + *err_str, + strerror (errno)); + hints.ai_flags ^= AI_NUMERICSERV; + } + if (priority != NULL) { + const gchar *tok; + + tok = tokens[2]; + if (tok != NULL) { + /* Priority part */ + errno = 0; + priority_parsed = strtoul (tok, &err_str, 10); + if (*err_str != '\0' || errno != 0) { + msg_warn ( + "cannot parse priority: %s, at symbol %c, error: %s", + tok, + *err_str, + strerror (errno)); + } + else { + *priority = priority_parsed; + } + } + } + } + else if (default_port != 0) { + rspamd_snprintf (portbuf, sizeof (portbuf), "%ud", default_port); + cur_port = portbuf; + } + else { + cur_port = NULL; + } + + if ((r = getaddrinfo (cur_tok, cur_port, &hints, &res)) == 0) { + /* Now copy up to max_addrs of addresses */ + addr_cnt = 0; + cur = res; + while (cur && addr_cnt < *max_addrs) { + cur = cur->ai_next; + addr_cnt ++; + } + + *addr = g_new (rspamd_inet_addr_t, addr_cnt); + + cur = res; + addr_cnt = 0; + while (cur && addr_cnt < *max_addrs) { + cur_addr = &(*addr)[addr_cnt]; + memcpy (&cur_addr->addr, cur->ai_addr, + MIN (sizeof (cur_addr->addr), cur->ai_addrlen)); + cur_addr->af = cur->ai_family; + cur = cur->ai_next; + addr_cnt ++; + } + + *max_addrs = addr_cnt; + + freeaddrinfo (res); + } + else { + msg_err ("address resolution for %s failed: %s", + tokens[0], + gai_strerror (r)); + goto err; + } + + /* Restore errno */ + if (name != NULL) { + *name = g_strdup (tokens[0]); + } + errno = saved_errno; + return TRUE; + +err: + errno = saved_errno; + return FALSE; +} + +gboolean +rspamd_parse_host_port_priority ( + const gchar *str, + rspamd_inet_addr_t **addr, + guint *max_addrs, + guint *priority, + gchar **name, + guint default_port) +{ + gchar **tokens; + gboolean ret; + + tokens = g_strsplit_set (str, ":", 0); + if (!tokens || !tokens[0]) { + return FALSE; + } + + ret = rspamd_parse_host_port_priority_strv (tokens, addr, max_addrs, + priority, name, default_port); + + g_strfreev (tokens); + + return ret; +} + +gboolean +rspamd_parse_host_port (const gchar *str, + rspamd_inet_addr_t **addr, + guint *max_addrs, + gchar **name, + guint default_port) +{ + return rspamd_parse_host_port_priority (str, addr, max_addrs, NULL, + name, default_port); +} diff --git a/src/libutil/addr.h b/src/libutil/addr.h new file mode 100644 index 000000000..5989b740b --- /dev/null +++ b/src/libutil/addr.h @@ -0,0 +1,124 @@ +/* Copyright (c) 2014, Vsevolod Stakhov + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +#ifndef ADDR_H_ +#define ADDR_H_ + +#include "config.h" + +/** + * Union that is used for storing sockaddrs + */ +union sa_union { + struct sockaddr_storage ss; + struct sockaddr sa; + struct sockaddr_in s4; + struct sockaddr_in6 s6; + struct sockaddr_un su; +}; + +typedef struct _rspamd_inet_addr_s { + union sa_union addr; + socklen_t slen; + int af; +} rspamd_inet_addr_t; + +/** + * Try to parse address from string + * @param target target to fill + * @param src IP string representation + * @return TRUE if addr has been parsed + */ +gboolean rspamd_parse_inet_address (rspamd_inet_addr_t *target, + const char *src); + +/** + * Returns string representation of inet address + * @param addr + * @return statically allocated string pointer (not thread safe) + */ +const char * rspamd_inet_address_to_string (rspamd_inet_addr_t *addr); + +/** + * Returns port number for the specified inet address in host byte order + * @param addr + * @return + */ +uint16_t rspamd_inet_address_get_port (rspamd_inet_addr_t *addr); + +/** + * Set port for inet address + */ +void rspamd_inet_address_set_port (rspamd_inet_addr_t *addr, uint16_t port); + +/** + * Connect to inet_addr address + * @param addr + * @param async perform operations asynchronously + * @return newly created and connected socket + */ +int rspamd_inet_address_connect (rspamd_inet_addr_t *addr, gint type, + gboolean async); + +/** + * Check whether specified ip is valid (not INADDR_ANY or INADDR_NONE) for ipv4 or ipv6 + * @param ptr pointer to struct in_addr or struct in6_addr + * @param af address family (AF_INET or AF_INET6) + * @return TRUE if the address is valid + */ +gboolean rspamd_ip_is_valid (rspamd_inet_addr_t *addr); + +/** + * Accept from listening socket filling addr structure + * @param sock listening socket + * @param addr + * @return + */ +gint rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t *addr); + +gboolean rspamd_parse_host_port_priority_strv (gchar **tokens, + rspamd_inet_addr_t **addr, guint *max_addrs, guint *priority, + gchar **name, guint default_port); + +/** + * Parse host[:port[:priority]] line + * @param ina host address + * @param port port + * @param priority priority + * @return TRUE if string was parsed + */ +gboolean rspamd_parse_host_port_priority (const gchar *str, + rspamd_inet_addr_t **addr, guint *max_addrs, + guint *priority, gchar **name, guint default_port); + +/** + * Parse host:port line + * @param ina host address + * @param port port + * @return TRUE if string was parsed + */ +gboolean rspamd_parse_host_port (const gchar *str, + rspamd_inet_addr_t **addr, guint *max_addrs, + gchar **name, guint default_port); + + +#endif /* ADDR_H_ */ diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c index ee21a5ec3..ca3c4a947 100644 --- a/src/libutil/upstream.c +++ b/src/libutil/upstream.c @@ -24,616 +24,460 @@ #include "config.h" #include "upstream.h" +#include "ottery.h" +#include "ref.h" +#include "rdns.h" +#include "xxhash.h" +#include "utlist.h" + +struct upstream_inet_addr_entry { + rspamd_inet_addr_t addr; + struct upstream_inet_addr_entry *next; +}; +struct upstream { + guint weight; + guint cur_weight; + guint errors; + gint active_idx; + gchar *name; + struct event ev; + struct timeval tv; + gpointer ud; + struct upstream_list *ls; + + struct { + rspamd_inet_addr_t *addr; + guint count; + guint cur; + } addrs; + + struct upstream_inet_addr_entry *new_addrs; + rspamd_mutex_t *lock; + + ref_entry_t ref; +}; -#ifdef _THREAD_SAFE -pthread_rwlock_t upstream_mtx = PTHREAD_RWLOCK_INITIALIZER; -# define U_RLOCK() do { pthread_rwlock_rdlock (&upstream_mtx); } while (0) -# define U_WLOCK() do { pthread_rwlock_wrlock (&upstream_mtx); } while (0) -# define U_UNLOCK() do { pthread_rwlock_unlock (&upstream_mtx); } while (0) -#else -# define U_RLOCK() do {} while (0) -# define U_WLOCK() do {} while (0) -# define U_UNLOCK() do {} while (0) -#endif +struct upstream_list { + GPtrArray *ups; + GPtrArray *alive; + rspamd_mutex_t *lock; + guint64 hash_seed; +}; -#define MAX_TRIES 20 -#define HASH_COMPAT +static struct rdns_resolver *res = NULL; +static struct event_base *ev_base = NULL; +/* 4 errors in 10 seconds */ +const guint default_max_errors = 4; +const guint default_revive_time = 60; +const guint default_error_time = 10; +const gdouble default_dns_timeout = 1.0; +const guint default_dns_retransmits = 2; +const guint default_max_addresses = 1024; -/* - * Poly: 0xedb88320 - * Init: 0x0 - */ +static void +rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up) +{ + rspamd_mutex_lock (ls->lock); + g_ptr_array_add (ls->alive, up); + up->active_idx = ls->alive->len - 1; + rspamd_mutex_unlock (ls->lock); +} -static const guint32 crc32lookup[256] = { - 0x00000000U, 0x77073096U, 0xee0e612cU, 0x990951baU, 0x076dc419U, - 0x706af48fU, - 0xe963a535U, 0x9e6495a3U, 0x0edb8832U, 0x79dcb8a4U, 0xe0d5e91eU, - 0x97d2d988U, - 0x09b64c2bU, 0x7eb17cbdU, 0xe7b82d07U, 0x90bf1d91U, 0x1db71064U, - 0x6ab020f2U, - 0xf3b97148U, 0x84be41deU, 0x1adad47dU, 0x6ddde4ebU, 0xf4d4b551U, - 0x83d385c7U, - 0x136c9856U, 0x646ba8c0U, 0xfd62f97aU, 0x8a65c9ecU, 0x14015c4fU, - 0x63066cd9U, - 0xfa0f3d63U, 0x8d080df5U, 0x3b6e20c8U, 0x4c69105eU, 0xd56041e4U, - 0xa2677172U, - 0x3c03e4d1U, 0x4b04d447U, 0xd20d85fdU, 0xa50ab56bU, 0x35b5a8faU, - 0x42b2986cU, - 0xdbbbc9d6U, 0xacbcf940U, 0x32d86ce3U, 0x45df5c75U, 0xdcd60dcfU, - 0xabd13d59U, - 0x26d930acU, 0x51de003aU, 0xc8d75180U, 0xbfd06116U, 0x21b4f4b5U, - 0x56b3c423U, - 0xcfba9599U, 0xb8bda50fU, 0x2802b89eU, 0x5f058808U, 0xc60cd9b2U, - 0xb10be924U, - 0x2f6f7c87U, 0x58684c11U, 0xc1611dabU, 0xb6662d3dU, 0x76dc4190U, - 0x01db7106U, - 0x98d220bcU, 0xefd5102aU, 0x71b18589U, 0x06b6b51fU, 0x9fbfe4a5U, - 0xe8b8d433U, - 0x7807c9a2U, 0x0f00f934U, 0x9609a88eU, 0xe10e9818U, 0x7f6a0dbbU, - 0x086d3d2dU, - 0x91646c97U, 0xe6635c01U, 0x6b6b51f4U, 0x1c6c6162U, 0x856530d8U, - 0xf262004eU, - 0x6c0695edU, 0x1b01a57bU, 0x8208f4c1U, 0xf50fc457U, 0x65b0d9c6U, - 0x12b7e950U, - 0x8bbeb8eaU, 0xfcb9887cU, 0x62dd1ddfU, 0x15da2d49U, 0x8cd37cf3U, - 0xfbd44c65U, - 0x4db26158U, 0x3ab551ceU, 0xa3bc0074U, 0xd4bb30e2U, 0x4adfa541U, - 0x3dd895d7U, - 0xa4d1c46dU, 0xd3d6f4fbU, 0x4369e96aU, 0x346ed9fcU, 0xad678846U, - 0xda60b8d0U, - 0x44042d73U, 0x33031de5U, 0xaa0a4c5fU, 0xdd0d7cc9U, 0x5005713cU, - 0x270241aaU, - 0xbe0b1010U, 0xc90c2086U, 0x5768b525U, 0x206f85b3U, 0xb966d409U, - 0xce61e49fU, - 0x5edef90eU, 0x29d9c998U, 0xb0d09822U, 0xc7d7a8b4U, 0x59b33d17U, - 0x2eb40d81U, - 0xb7bd5c3bU, 0xc0ba6cadU, 0xedb88320U, 0x9abfb3b6U, 0x03b6e20cU, - 0x74b1d29aU, - 0xead54739U, 0x9dd277afU, 0x04db2615U, 0x73dc1683U, 0xe3630b12U, - 0x94643b84U, - 0x0d6d6a3eU, 0x7a6a5aa8U, 0xe40ecf0bU, 0x9309ff9dU, 0x0a00ae27U, - 0x7d079eb1U, - 0xf00f9344U, 0x8708a3d2U, 0x1e01f268U, 0x6906c2feU, 0xf762575dU, - 0x806567cbU, - 0x196c3671U, 0x6e6b06e7U, 0xfed41b76U, 0x89d32be0U, 0x10da7a5aU, - 0x67dd4accU, - 0xf9b9df6fU, 0x8ebeeff9U, 0x17b7be43U, 0x60b08ed5U, 0xd6d6a3e8U, - 0xa1d1937eU, - 0x38d8c2c4U, 0x4fdff252U, 0xd1bb67f1U, 0xa6bc5767U, 0x3fb506ddU, - 0x48b2364bU, - 0xd80d2bdaU, 0xaf0a1b4cU, 0x36034af6U, 0x41047a60U, 0xdf60efc3U, - 0xa867df55U, - 0x316e8eefU, 0x4669be79U, 0xcb61b38cU, 0xbc66831aU, 0x256fd2a0U, - 0x5268e236U, - 0xcc0c7795U, 0xbb0b4703U, 0x220216b9U, 0x5505262fU, 0xc5ba3bbeU, - 0xb2bd0b28U, - 0x2bb45a92U, 0x5cb36a04U, 0xc2d7ffa7U, 0xb5d0cf31U, 0x2cd99e8bU, - 0x5bdeae1dU, - 0x9b64c2b0U, 0xec63f226U, 0x756aa39cU, 0x026d930aU, 0x9c0906a9U, - 0xeb0e363fU, - 0x72076785U, 0x05005713U, 0x95bf4a82U, 0xe2b87a14U, 0x7bb12baeU, - 0x0cb61b38U, - 0x92d28e9bU, 0xe5d5be0dU, 0x7cdcefb7U, 0x0bdbdf21U, 0x86d3d2d4U, - 0xf1d4e242U, - 0x68ddb3f8U, 0x1fda836eU, 0x81be16cdU, 0xf6b9265bU, 0x6fb077e1U, - 0x18b74777U, - 0x88085ae6U, 0xff0f6a70U, 0x66063bcaU, 0x11010b5cU, 0x8f659effU, - 0xf862ae69U, - 0x616bffd3U, 0x166ccf45U, 0xa00ae278U, 0xd70dd2eeU, 0x4e048354U, - 0x3903b3c2U, - 0xa7672661U, 0xd06016f7U, 0x4969474dU, 0x3e6e77dbU, 0xaed16a4aU, - 0xd9d65adcU, - 0x40df0b66U, 0x37d83bf0U, 0xa9bcae53U, 0xdebb9ec5U, 0x47b2cf7fU, - 0x30b5ffe9U, - 0xbdbdf21cU, 0xcabac28aU, 0x53b39330U, 0x24b4a3a6U, 0xbad03605U, - 0xcdd70693U, - 0x54de5729U, 0x23d967bfU, 0xb3667a2eU, 0xc4614ab8U, 0x5d681b02U, - 0x2a6f2b94U, - 0xb40bbe37U, 0xc30c8ea1U, 0x5a05df1bU, 0x2d02ef8dU -}; +static void +rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg) +{ + struct upstream *up = (struct upstream *)arg; + struct rdns_reply_entry *entry; + struct upstream_inet_addr_entry *up_ent; + + if (reply->code == RDNS_RC_NOERROR) { + entry = reply->entries; + + rspamd_mutex_lock (up->lock); + while (entry) { + + if (entry->type == RDNS_REQUEST_A) { + up_ent = g_malloc (sizeof (*up_ent)); + + up_ent->addr.addr.s4.sin_addr = entry->content.a.addr; + up_ent->addr.af = AF_INET; + up_ent->addr.slen = sizeof (up_ent->addr.addr.s4); + LL_PREPEND (up->new_addrs, up_ent); + } + else if (entry->type == RDNS_REQUEST_AAAA) { + up_ent = g_malloc (sizeof (*up_ent)); + + memcpy (&up_ent->addr.addr.s6.sin6_addr, + &entry->content.aaa.addr, sizeof (struct in6_addr)); + up_ent->addr.af = AF_INET6; + up_ent->addr.slen = sizeof (up_ent->addr.addr.s6); + LL_PREPEND (up->new_addrs, up_ent); + } + entry = entry->next; + } + rspamd_mutex_unlock (up->lock); + } + + REF_RELEASE (up); +} -/* - * Check upstream parameters and mark it whether valid or dead - */ static void -check_upstream (struct upstream *up, - time_t now, - time_t error_timeout, - time_t revive_timeout, - size_t max_errors) +rspamd_upstream_update_addrs (struct upstream *up) { - if (up->dead) { - if (now - up->time >= revive_timeout) { - U_WLOCK (); - up->dead = 0; - up->errors = 0; - up->time = 0; - up->weight = up->priority; - U_UNLOCK (); + guint16 port; + guint addr_cnt; + struct upstream_inet_addr_entry *cur, *tmp; + rspamd_inet_addr_t *new_addrs, *old; + + /* + * We need first of all get the saved port, since DNS gives us no + * idea about what port has been used previously + */ + if (up->addrs.count > 0 && up->new_addrs) { + port = rspamd_inet_address_get_port (&up->addrs.addr[0]); + + /* Now calculate new addrs count */ + addr_cnt = 0; + LL_FOREACH (up->new_addrs, cur) { + addr_cnt ++; } + new_addrs = g_new (rspamd_inet_addr_t, addr_cnt); + + /* Copy addrs back */ + addr_cnt = 0; + LL_FOREACH (up->new_addrs, cur) { + memcpy (&new_addrs[addr_cnt], cur, sizeof (rspamd_inet_addr_t)); + rspamd_inet_address_set_port (&new_addrs[addr_cnt], port); + addr_cnt ++; + } + + old = up->addrs.addr; + up->addrs.cur = 0; + up->addrs.count = addr_cnt; + up->addrs.addr = new_addrs; + g_free (old); } - else { - if (now - up->time >= error_timeout && up->errors >= max_errors) { - U_WLOCK (); - up->dead = 1; - up->time = now; - up->weight = 0; - U_UNLOCK (); + + LL_FOREACH_SAFE (up->new_addrs, cur, tmp) { + g_free (cur); + } + up->new_addrs = NULL; +} + +static void +rspamd_upstream_revive_cb (int fd, short what, void *arg) +{ + struct upstream *up = (struct upstream *)arg; + + rspamd_mutex_lock (up->lock); + event_del (&up->ev); + if (up->ls) { + rspamd_upstream_set_active (up->ls, up); + + if (up->new_addrs) { + rspamd_upstream_update_addrs (up); } } + + rspamd_mutex_unlock (up->lock); + REF_RELEASE (up); +} + +static void +rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up) +{ + rspamd_mutex_lock (ls->lock); + g_ptr_array_remove_index (ls->alive, up->active_idx); + up->active_idx = -1; + + /* Resolve name of the upstream one more time */ + if (up->name[0] != '/') { + REF_RETAIN (up); + rdns_make_request_full (res, rspamd_upstream_dns_cb, up, + default_dns_timeout, default_dns_retransmits, + RDNS_REQUEST_A, up->name); + REF_RETAIN (up); + rdns_make_request_full (res, rspamd_upstream_dns_cb, up, + default_dns_timeout, default_dns_retransmits, + RDNS_REQUEST_AAAA, up->name); + } + + REF_RETAIN (up); + evtimer_set (&up->ev, rspamd_upstream_revive_cb, up); + event_base_set (ev_base, &up->ev); + up->tv.tv_sec = default_revive_time; + up->tv.tv_usec = 0; + event_add (&up->ev, &up->tv); + + rspamd_mutex_unlock (ls->lock); +} + +void +rspamd_upstreams_library_init (struct rdns_resolver *resolver, + struct event_base *base) +{ + res = resolver; + ev_base = base; } -/* - * Call this function after failed upstream request - */ void -upstream_fail (struct upstream *up, time_t now) +rspamd_upstream_fail (struct upstream *up) { - if (up->time != 0) { - up->errors++; + struct timeval tv; + gdouble error_rate, max_error_rate; + + rspamd_mutex_lock (up->lock); + if (g_atomic_int_compare_and_exchange (&up->errors, 0, 1)) { + gettimeofday (&up->tv, NULL); + up->errors ++; } else { - U_WLOCK (); - up->time = now; - up->errors++; - U_UNLOCK (); + g_atomic_int_inc (&up->errors); } + + gettimeofday (&tv, NULL); + + error_rate = ((gdouble)up->errors) / (tv.tv_sec - up->tv.tv_sec); + max_error_rate = (gdouble)default_max_errors / (gdouble)default_error_time; + + if (error_rate > max_error_rate) { + /* Remove upstream from the active list */ + rspamd_upstream_set_inactive (up->ls, up); + } + rspamd_mutex_unlock (up->lock); } -/* - * Call this function after successfull upstream request - */ void -upstream_ok (struct upstream *up, time_t now) +rspamd_upstream_ok (struct upstream *up) { - if (up->errors != 0) { - U_WLOCK (); + rspamd_mutex_lock (up->lock); + if (up->errors > 0) { up->errors = 0; - up->time = 0; - U_UNLOCK (); + rspamd_upstream_set_active (up->ls, up); } - up->weight--; + rspamd_mutex_unlock (up->lock); } -/* - * Mark all upstreams as active. This function is used when all upstreams are marked as inactive - */ -void -revive_all_upstreams (void *ups, size_t members, size_t msize) +struct upstream_list* +rspamd_upstreams_create (void) { - guint i; - struct upstream *cur; - guchar *p; - - U_WLOCK (); - p = ups; - for (i = 0; i < members; i++) { - cur = (struct upstream *)p; - cur->time = 0; - cur->errors = 0; - cur->dead = 0; - cur->weight = cur->priority; - p += msize; - } - U_UNLOCK (); + struct upstream_list *ls; + + ls = g_slice_alloc (sizeof (*ls)); + ottery_rand_bytes (&ls->hash_seed, sizeof (ls->hash_seed)); + ls->ups = g_ptr_array_new (); + ls->alive = g_ptr_array_new (); + ls->lock = rspamd_mutex_new (); + + return ls; } -/* - * Scan all upstreams for errors and mark upstreams dead or alive depends on conditions, - * return number of alive upstreams - */ -static gint -rescan_upstreams (void *ups, - size_t members, - size_t msize, - time_t now, - time_t error_timeout, - time_t revive_timeout, - size_t max_errors) +static void +rspamd_upstream_dtor (struct upstream *up) { - guint i, alive; - struct upstream *cur; - guchar *p; - - /* Recheck all upstreams */ - p = ups; - alive = members; - for (i = 0; i < members; i++) { - cur = (struct upstream *)p; - check_upstream (cur, now, error_timeout, revive_timeout, max_errors); - alive -= cur->dead; - p += msize; - } + struct upstream_inet_addr_entry *cur, *tmp; - /* All upstreams are dead */ - if (alive == 0) { - revive_all_upstreams (ups, members, msize); - alive = members; + if (up->new_addrs) { + LL_FOREACH_SAFE(up->new_addrs, cur, tmp) { + g_free (cur); + } } + rspamd_mutex_free (up->lock); + g_free (up->name); + g_slice_free1 (sizeof (*up), up); +} - return alive; - +rspamd_inet_addr_t* +rspamd_upstream_addr (struct upstream *up) +{ + return &up->addrs.addr[up->addrs.cur++ % up->addrs.count]; } -/* Return alive upstream by its number */ -static struct upstream * -get_upstream_by_number (void *ups, size_t members, size_t msize, gint selected) +gboolean +rspamd_upstreams_add_upstream (struct upstream_list *ups, + const gchar *str, guint16 def_port, void *data) { - guint i; - u_char *p, *c; - struct upstream *cur; - - i = 0; - p = ups; - c = ups; - U_RLOCK (); - for (;; ) { - /* Out of range, return NULL */ - if (p > c + members * msize) { - break; - } + struct upstream *up; - cur = (struct upstream *)p; - p += msize; + up = g_slice_alloc0 (sizeof (*up)); - if (cur->dead) { - /* Skip inactive upstreams */ - continue; - } - /* Return selected upstream */ - if ((gint)i == selected) { - U_UNLOCK (); - return cur; - } - i++; + up->addrs.count = default_max_addresses; + if (!rspamd_parse_host_port_priority (str, &up->addrs.addr, + &up->addrs.count, &up->weight, + &up->name, def_port)) { + g_slice_free1 (sizeof (*up), up); + return FALSE; } - U_UNLOCK (); - /* Error */ - return NULL; + g_ptr_array_add (ups->ups, up); + up->ud = data; + up->cur_weight = up->weight; + REF_INIT_RETAIN (up, rspamd_upstream_dtor); + up->lock = rspamd_mutex_new (); + + rspamd_upstream_set_active (ups, up); + return TRUE; } -/* - * Get hash key for specified key (perl hash) - */ -static guint32 -get_hash_for_key (guint32 hash, const gchar *key, size_t keylen) +void +rspamd_upstreams_destroy (struct upstream_list *ups) { - guint32 h, index; - const gchar *end = key + keylen; + guint i; + struct upstream *up; - h = ~hash; + g_ptr_array_free (ups->alive, TRUE); - if (end != key) { - while (key < end) { - index = (h ^ (u_char) * key) & 0x000000ffU; - h = (h >> 8) ^ crc32lookup[index]; - ++key; - } - } - else { - while (*key) { - index = (h ^ (u_char) * key) & 0x000000ffU; - h = (h >> 8) ^ crc32lookup[index]; - ++key; - } + for (i = 0; i < ups->ups->len; i ++) { + up = g_ptr_array_index (ups->ups, i); + up->ls = NULL; + REF_RELEASE (up); } - return (~h); + g_ptr_array_free (ups->ups, TRUE); + rspamd_mutex_free (ups->lock); + g_slice_free1 (sizeof (*ups), ups); } -/* - * Recheck all upstreams and return random active upstream - */ -struct upstream * -get_random_upstream (void *ups, - size_t members, - size_t msize, - time_t now, - time_t error_timeout, - time_t revive_timeout, - size_t max_errors) +static void +rspamd_upstream_restore_cb (gpointer elt, gpointer ls) { - gint alive, selected; - - alive = rescan_upstreams (ups, - members, - msize, - now, - error_timeout, - revive_timeout, - max_errors); - selected = rand () % alive; - - return get_upstream_by_number (ups, members, msize, selected); -} + struct upstream *up = (struct upstream *)elt; + struct upstream_list *ups = (struct upstream_list *)ls; -/* - * Return upstream by hash, that is calculated from active upstreams number - */ -struct upstream * -get_upstream_by_hash (void *ups, - size_t members, - size_t msize, - time_t now, - time_t error_timeout, - time_t revive_timeout, - size_t max_errors, - const gchar *key, - size_t keylen) -{ - gint alive, tries = 0, r; - guint32 h = 0, ht; - gchar *p, numbuf[4]; - struct upstream *cur; - - alive = rescan_upstreams (ups, - members, - msize, - now, - error_timeout, - revive_timeout, - max_errors); - - if (alive == 0) { - return NULL; - } + /* Here the upstreams list is already locked */ + rspamd_mutex_lock (up->lock); + event_del (&up->ev); - h = get_hash_for_key (0, key, keylen); -#ifdef HASH_COMPAT - h = (h >> 16) & 0x7fff; -#endif - h %= members; - - for (;; ) { - p = (gchar *)ups + msize * h; - cur = (struct upstream *)p; - if (!cur->dead) { - break; - } - r = snprintf (numbuf, sizeof (numbuf), "%d", tries); - ht = get_hash_for_key (0, numbuf, r); - ht = get_hash_for_key (ht, key, keylen); -#ifdef HASH_COMPAT - h += (ht >> 16) & 0x7fff; -#else - h += ht; -#endif - h %= members; - tries++; - if (tries > MAX_TRIES) { - return NULL; - } + if (up->new_addrs) { + rspamd_upstream_update_addrs (up); } - U_RLOCK (); - p = ups; - U_UNLOCK (); - return cur; + g_ptr_array_add (ups->alive, up); + up->active_idx = ups->alive->len - 1; + rspamd_mutex_lock (up->lock); + /* For revive event */ + REF_RELEASE (up); } -/* - * Recheck all upstreams and return upstream in round-robin order according to weight and priority - */ -struct upstream * -get_upstream_round_robin (void *ups, - size_t members, - size_t msize, - time_t now, - time_t error_timeout, - time_t revive_timeout, - size_t max_errors) +static struct upstream* +rspamd_upstream_get_random (struct upstream_list *ups) +{ + guint idx = ottery_rand_range (ups->alive->len - 1); + + return g_ptr_array_index (ups->alive, idx); +} + +static struct upstream* +rspamd_upstream_get_round_robin (struct upstream_list *ups, gboolean use_cur) { - guint max_weight, i; - struct upstream *cur, *selected = NULL; - u_char *p; - - /* Recheck all upstreams */ - (void)rescan_upstreams (ups, - members, - msize, - now, - error_timeout, - revive_timeout, - max_errors); - - p = ups; - max_weight = 0; - selected = (struct upstream *)p; - U_RLOCK (); - for (i = 0; i < members; i++) { - cur = (struct upstream *)p; - if (!cur->dead) { - if (max_weight < (guint)cur->weight) { - max_weight = cur->weight; - selected = cur; + guint max_weight = 0; + struct upstream *up, *selected; + guint i; + + /* Select upstream with the maximum cur_weight */ + rspamd_mutex_lock (ups->lock); + for (i = 0; i < ups->alive->len; i ++) { + up = g_ptr_array_index (ups->alive, i); + if (use_cur) { + if (up->cur_weight > max_weight) { + selected = up; + max_weight = up->cur_weight; } } - p += msize; - } - U_UNLOCK (); - - if (max_weight == 0) { - p = ups; - U_WLOCK (); - for (i = 0; i < members; i++) { - cur = (struct upstream *)p; - cur->weight = cur->priority; - if (!cur->dead) { - if (max_weight < cur->priority) { - max_weight = cur->priority; - selected = cur; - } + else { + if (up->weight > max_weight) { + selected = up; + max_weight = up->weight; } - p += msize; } - U_UNLOCK (); } - return selected; -} - -/* - * Recheck all upstreams and return upstream in round-robin order according to only priority (master-slaves) - */ -struct upstream * -get_upstream_master_slave (void *ups, - size_t members, - size_t msize, - time_t now, - time_t error_timeout, - time_t revive_timeout, - size_t max_errors) -{ - guint max_weight, i; - struct upstream *cur, *selected = NULL; - u_char *p; - - /* Recheck all upstreams */ - (void)rescan_upstreams (ups, - members, - msize, - now, - error_timeout, - revive_timeout, - max_errors); - - p = ups; - max_weight = 0; - selected = (struct upstream *)p; - U_RLOCK (); - for (i = 0; i < members; i++) { - cur = (struct upstream *)p; - if (!cur->dead) { - if (max_weight < cur->priority) { - max_weight = cur->priority; - selected = cur; - } + if (use_cur) { + if (selected->cur_weight > 0) { + selected->cur_weight--; + } + else { + selected->cur_weight = selected->weight; } - p += msize; } - U_UNLOCK (); + rspamd_mutex_unlock (ups->lock); return selected; } /* - * Ketama manipulation functions + * The key idea of this function is obtained from the following paper: + * A Fast, Minimal Memory, Consistent Hash Algorithm + * John Lamping, Eric Veach + * + * http://arxiv.org/abs/1406.2294 */ - -static gint -ketama_sort_cmp (const void *a1, const void *a2) +static guint32 +rspamd_consistent_hash (guint64 key, guint32 nbuckets) { - return *((guint32 *) a1) - *((guint32 *) a2); + gint64 b = -1, j = 0; + + while (j < nbuckets) { + b = j; + key *= 2862933555777941757ULL + 1; + j = (b + 1) * (double)(1ULL << 31) / (double)((key >> 33) + 1ULL); + } + + return b; } -/* - * Add ketama points for specified upstream - */ -gint -upstream_ketama_add (struct upstream *up, - gchar *up_key, - size_t keylen, - size_t keypoints) +static struct upstream* +rspamd_upstream_get_hashed (struct upstream_list *ups, const guint8 *key, guint keylen) { - guint32 h = 0; - gchar tmp[4]; - guint i; + union { + guint64 k64; + guint32 k32[2]; + } h; - /* Allocate ketama points array */ - if (up->ketama_points == NULL) { - up->ketama_points_size = keypoints; - up->ketama_points = malloc (sizeof (guint32) * up->ketama_points_size); - if (up->ketama_points == NULL) { - return -1; - } - } + guint32 idx; - h = get_hash_for_key (h, up_key, keylen); + /* Generate 64 bits input key */ + h.k32[0] = XXH32 (key, keylen, ((guint32*)&ups->hash_seed)[0]); + h.k32[1] = XXH32 (key, keylen, ((guint32*)&ups->hash_seed)[1]); - for (i = 0; i < keypoints; i++) { - tmp[0] = i & 0xff; - tmp[1] = (i >> 8) & 0xff; - tmp[2] = (i >> 16) & 0xff; - tmp[3] = (i >> 24) & 0xff; - - h = get_hash_for_key (h, tmp, sizeof (tmp) * sizeof (gchar)); - up->ketama_points[i] = h; - } - /* Keep points sorted */ - qsort (up->ketama_points, keypoints, sizeof (guint32), ketama_sort_cmp); + rspamd_mutex_lock (ups->lock); + idx = rspamd_consistent_hash (h.k64, ups->alive->len); + rspamd_mutex_unlock (ups->lock); - return 0; + return g_ptr_array_index (ups->alive, idx); } -/* - * Return upstream by hash and find nearest ketama point in some server - */ -struct upstream * -get_upstream_by_hash_ketama (void *ups, - size_t members, - size_t msize, - time_t now, - time_t error_timeout, - time_t revive_timeout, - size_t max_errors, - const gchar *key, - size_t keylen) +struct upstream* +rspamd_upstream_get (struct upstream_list *ups, + enum rspamd_upstream_rotation type, ...) { - guint alive, i; - guint32 h = 0, step, middle, d, min_diff = UINT_MAX; - gchar *p; - struct upstream *cur = NULL, *nearest = NULL; - - alive = rescan_upstreams (ups, - members, - msize, - now, - error_timeout, - revive_timeout, - max_errors); - - if (alive == 0) { - return NULL; + va_list ap; + const guint8 *key; + guint keylen; + + rspamd_mutex_lock (ups->lock); + if (ups->alive->len == 0) { + /* We have no upstreams alive */ + g_ptr_array_foreach (ups->ups, rspamd_upstream_restore_cb, ups); } - - h = get_hash_for_key (h, key, keylen); - - U_RLOCK (); - p = ups; - nearest = (struct upstream *)p; - for (i = 0; i < members; i++) { - cur = (struct upstream *)p; - if (!cur->dead && cur->ketama_points != NULL) { - /* Find nearest ketama point for this key */ - step = cur->ketama_points_size / 2; - middle = step; - while (step != 1) { - d = cur->ketama_points[middle] - h; - if (abs (d) < (gint)min_diff) { - min_diff = abs (d); - nearest = cur; - } - step /= 2; - if (d > 0) { - middle -= step; - } - else { - middle += step; - } - } - } + rspamd_mutex_unlock (ups->lock); + + switch (type) { + case RSPAMD_UPSTREAM_RANDOM: + return rspamd_upstream_get_random (ups); + case RSPAMD_UPSTREAM_HASHED: + va_start (ap, type); + key = va_arg (ap, const guint8 *); + keylen = va_arg (ap, guint); + va_end (ap); + return rspamd_upstream_get_hashed (ups, key, keylen); + case RSPAMD_UPSTREAM_ROUND_ROBIN: + return rspamd_upstream_get_round_robin (ups, TRUE); + case RSPAMD_UPSTREAM_MASTER_SLAVE: + return rspamd_upstream_get_round_robin (ups, FALSE); } - U_UNLOCK (); - return nearest; } - -#undef U_LOCK -#undef U_UNLOCK -/* - * vi:ts=4 - */ diff --git a/src/libutil/upstream.h b/src/libutil/upstream.h index bc11ebf77..7adad8d22 100644 --- a/src/libutil/upstream.h +++ b/src/libutil/upstream.h @@ -1,21 +1,27 @@ #ifndef UPSTREAM_H #define UPSTREAM_H -#include <sys/types.h> -#include <stdint.h> +#include "config.h" +#include "util.h" +#include "rdns.h" + +enum rspamd_upstream_rotation { + RSPAMD_UPSTREAM_RANDOM, + RSPAMD_UPSTREAM_HASHED, + RSPAMD_UPSTREAM_ROUND_ROBIN, + RSPAMD_UPSTREAM_MASTER_SLAVE +}; + +/* Opaque upstream structures */ +struct upstream; +struct upstream_list; /** - * Structure of generic upstream + * Init upstreams library + * @param resolver */ -struct upstream { - guint errors; /**< Errors for this upstream */ - time_t time; /**< Time of marking */ - guint dead; /**< Dead flag */ - guint priority; /**< Fixed priority */ - gint16 weight; /**< Dynamic weight */ - guint32 *ketama_points; /**< Ketama points array */ - size_t ketama_points_size; /**< Ketama array size */ -}; +void rspamd_upstreams_library_init (struct rdns_resolver *resolver, + struct event_base *base); /** * Upstream error logic @@ -28,115 +34,49 @@ struct upstream { /** * Add an error to an upstream */ -void upstream_fail (struct upstream *up, time_t now); +void rspamd_upstream_fail (struct upstream *up); /** * Increase upstream successes count */ -void upstream_ok (struct upstream *up, time_t now); +void rspamd_upstream_ok (struct upstream *up); /** - * Make all upstreams alive + * Create new list of upstreams + * @return */ -void revive_all_upstreams (void *ups, size_t members, size_t msize); - +struct upstream_list* rspamd_upstreams_create (void); /** - * Add ketama points for upstream + * Destroy list of upstreams + * @param ups */ -gint upstream_ketama_add (struct upstream *up, - gchar *up_key, - size_t keylen, - size_t keypoints); - +void rspamd_upstreams_destroy (struct upstream_list *ups); /** - * Get a random upstream from array of upstreams - * @param ups array of structures that contains struct upstream as their first element - * @param members number of elements in array - * @param msize size of each member - * @param now current time - * @param error_timeout time during which we are counting errors - * @param revive_timeout time during which we counts upstream dead - * @param max_errors maximum errors during error_timeout to mark upstream dead + * Add upstream from the string + * @param ups upstream list + * @param str string in format "name[:port[:priority]] + * @param def_port default port number + * @param data optional userdata + * @return TRUE if upstream has been added */ -struct upstream * get_random_upstream (void *ups, size_t members, size_t msize, - time_t now, time_t error_timeout, - time_t revive_timeout, size_t max_errors); +gboolean rspamd_upstreams_add_upstream (struct upstream_list *ups, + const gchar *str, guint16 def_port, void *data); /** - * Get upstream based on hash from array of upstreams - * @param ups array of structures that contains struct upstream as their first element - * @param members number of elements in array - * @param msize size of each member - * @param now current time - * @param error_timeout time during which we are counting errors - * @param revive_timeout time during which we counts upstream dead - * @param max_errors maximum errors during error_timeout to mark upstream dead - * @param key key for hashing - * @param keylen length of the key + * Returns the current IP address of the upstream + * @param up + * @return */ -struct upstream * get_upstream_by_hash (void *ups, size_t members, size_t msize, - time_t now, time_t error_timeout, - time_t revive_timeout, size_t max_errors, - const gchar *key, size_t keylen); +rspamd_inet_addr_t* rspamd_upstream_addr (struct upstream *up); /** - * Get an upstream from array of upstreams based on its current weight - * @param ups array of structures that contains struct upstream as their first element - * @param members number of elements in array - * @param msize size of each member - * @param now current time - * @param error_timeout time during which we are counting errors - * @param revive_timeout time during which we counts upstream dead - * @param max_errors maximum errors during error_timeout to mark upstream dead + * Get new upstream from the list + * @param ups upstream list + * @param type type of rotation algorithm, for `RSPAMD_UPSTREAM_HASHED` it is required to specify `key` and `keylen` as arguments + * @return */ -struct upstream * get_upstream_round_robin (void *ups, - size_t members, - size_t msize, - time_t now, - time_t error_timeout, - time_t revive_timeout, - size_t max_errors); - -/** - * Get upstream based on hash from array of upstreams, this functions is using ketama algorithm - * @param ups array of structures that contains struct upstream as their first element - * @param members number of elements in array - * @param msize size of each member - * @param now current time - * @param error_timeout time during which we are counting errors - * @param revive_timeout time during which we counts upstream dead - * @param max_errors maximum errors during error_timeout to mark upstream dead - * @param key key for hashing - * @param keylen length of the key - */ -struct upstream * get_upstream_by_hash_ketama (void *ups, - size_t members, - size_t msize, - time_t now, - time_t error_timeout, - time_t revive_timeout, - size_t max_errors, - const gchar *key, - size_t keylen); - -/** - * Get an upstream from array of upstreams based on its current priority (not weight) - * @param ups array of structures that contains struct upstream as their first element - * @param members number of elements in array - * @param msize size of each member - * @param now current time - * @param error_timeout time during which we are counting errors - * @param revive_timeout time during which we counts upstream dead - * @param max_errors maximum errors during error_timeout to mark upstream dead - */ -struct upstream * get_upstream_master_slave (void *ups, - size_t members, - size_t msize, - time_t now, - time_t error_timeout, - time_t revive_timeout, - size_t max_errors); - +struct upstream* rspamd_upstream_get (struct upstream_list *ups, + enum rspamd_upstream_rotation type, ...); #endif /* UPSTREAM_H */ /* diff --git a/src/libutil/util.c b/src/libutil/util.c index 73ab64453..ec125704f 100644 --- a/src/libutil/util.c +++ b/src/libutil/util.c @@ -98,7 +98,7 @@ poll_sync_socket (gint fd, gint timeout, short events) return r; } -static gint +gint rspamd_socket_create (gint af, gint type, gint protocol, gboolean async) { gint fd; @@ -1973,32 +1973,6 @@ restart: #endif } -gboolean -rspamd_ip_is_valid (rspamd_inet_addr_t *addr) -{ - const struct in_addr ip4_any = { INADDR_ANY }, ip4_none = { INADDR_NONE }; - const struct in6_addr ip6_any = IN6ADDR_ANY_INIT; - - gboolean ret = FALSE; - - if (G_LIKELY (addr->af == AF_INET)) { - if (memcmp (&addr->addr.s4.sin_addr, &ip4_any, - sizeof (struct in_addr)) != 0 && - memcmp (&addr->addr.s4.sin_addr, &ip4_none, - sizeof (struct in_addr)) != 0) { - ret = TRUE; - } - } - else if (G_UNLIKELY (addr->af == AF_INET6)) { - if (memcmp (&addr->addr.s6.sin6_addr, &ip6_any, - sizeof (struct in6_addr)) != 0) { - ret = TRUE; - } - } - - return ret; -} - /* * GString ucl emitting functions */ @@ -2079,137 +2053,6 @@ rspamd_ucl_emit_gstring (ucl_object_t *obj, ucl_object_emit_full (obj, emit_type, &func); } -gint -rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t *addr) -{ - gint nfd, serrno; - socklen_t len = sizeof (addr->addr.ss); - - if ((nfd = accept (sock, &addr->addr.sa, &len)) == -1) { - if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK) { - return 0; - } - return -1; - } - - addr->slen = len; - addr->af = addr->addr.sa.sa_family; - - if (make_socket_nonblocking (nfd) < 0) { - goto out; - } - - /* Set close on exec */ - if (fcntl (nfd, F_SETFD, FD_CLOEXEC) == -1) { - msg_warn ("fcntl failed: %d, '%s'", errno, strerror (errno)); - goto out; - } - - return (nfd); - -out: - serrno = errno; - close (nfd); - errno = serrno; - return (-1); - -} - -gboolean -rspamd_parse_inet_address (rspamd_inet_addr_t *target, const char *src) -{ - gboolean ret = FALSE; - - if (inet_pton (AF_INET6, src, &target->addr.s6.sin6_addr) == 1) { - target->af = AF_INET6; - target->slen = sizeof (target->addr.s6); - ret = TRUE; - } - else if (inet_pton (AF_INET, src, &target->addr.s4.sin_addr) == 1) { - target->af = AF_INET; - target->slen = sizeof (target->addr.s4); - ret = TRUE; - } - - target->addr.sa.sa_family = target->af; - - return ret; -} - -const char * -rspamd_inet_address_to_string (rspamd_inet_addr_t *addr) -{ - static char addr_str[INET6_ADDRSTRLEN + 1]; - - switch (addr->af) { - case AF_INET: - return inet_ntop (addr->af, &addr->addr.s4.sin_addr, addr_str, - sizeof (addr_str)); - case AF_INET6: - return inet_ntop (addr->af, &addr->addr.s6.sin6_addr, addr_str, - sizeof (addr_str)); - case AF_UNIX: - return addr->addr.su.sun_path; - } - - return "undefined"; -} - -uint16_t -rspamd_inet_address_get_port (rspamd_inet_addr_t *addr) -{ - switch (addr->af) { - case AF_INET: - return ntohs (addr->addr.s4.sin_port); - case AF_INET6: - return ntohs (addr->addr.s6.sin6_port); - } - - return 0; -} - -void -rspamd_inet_address_set_port (rspamd_inet_addr_t *addr, uint16_t port) -{ - switch (addr->af) { - case AF_INET: - addr->addr.s4.sin_port = htons (port); - break; - case AF_INET6: - addr->addr.s6.sin6_port = htons (port); - break; - } -} - -int -rspamd_inet_address_connect (rspamd_inet_addr_t *addr, gint type, - gboolean async) -{ - int fd, r; - - if (addr == NULL) { - return -1; - } - - fd = rspamd_socket_create (addr->af, type, 0, async); - if (fd == -1) { - return -1; - } - - r = connect (fd, &addr->addr.sa, addr->slen); - - if (r == -1) { - if (!async || errno != EINPROGRESS) { - close (fd); - msg_warn ("connect failed: %d, '%s'", errno, - strerror (errno)); - return -1; - } - } - - return fd; -} - /* * We use here z-base32 encoding described here: * http://philzimmermann.com/docs/human-oriented-base-32-encoding.txt diff --git a/src/libutil/util.h b/src/libutil/util.h index 40d8004e3..ed4e6fcca 100644 --- a/src/libutil/util.h +++ b/src/libutil/util.h @@ -7,6 +7,7 @@ #include "printf.h" #include "fstring.h" #include "ucl.h" +#include "addr.h" struct rspamd_config; struct rspamd_main; @@ -15,23 +16,14 @@ struct rspamd_statfile_config; struct rspamd_classifier_config; /** - * Union that is used for storing sockaddrs - */ -union sa_union { - struct sockaddr_storage ss; - struct sockaddr sa; - struct sockaddr_in s4; - struct sockaddr_in6 s6; - struct sockaddr_un su; -}; - -typedef struct _rspamd_inet_addr_s { - union sa_union addr; - socklen_t slen; - int af; -} rspamd_inet_addr_t; - - + * Create generic socket + * @param af address family + * @param type socket type + * @param protocol socket protocol + * @param async set non-blocking on a socket + * @return socket FD or -1 in case of error + */ +gint rspamd_socket_create (gint af, gint type, gint protocol, gboolean async); /* * Create socket and bind or connect it to specified address and port */ @@ -418,14 +410,6 @@ gpointer rspamd_str_pool_copy (gconstpointer data, gpointer ud); gint rspamd_read_passphrase (gchar *buf, gint size, gint rwflag, gpointer key); /** - * Check whether specified ip is valid (not INADDR_ANY or INADDR_NONE) for ipv4 or ipv6 - * @param ptr pointer to struct in_addr or struct in6_addr - * @param af address family (AF_INET or AF_INET6) - * @return TRUE if the address is valid - */ -gboolean rspamd_ip_is_valid (rspamd_inet_addr_t *addr); - -/** * Emit UCL object to gstring * @param obj object to emit * @param emit_type emitter type @@ -436,51 +420,6 @@ void rspamd_ucl_emit_gstring (ucl_object_t *obj, GString *target); /** - * Accept from listening socket filling addr structure - * @param sock listening socket - * @param addr - * @return - */ -gint rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t *addr); - -/** - * Try to parse address from string - * @param target target to fill - * @param src IP string representation - * @return TRUE if addr has been parsed - */ -gboolean rspamd_parse_inet_address (rspamd_inet_addr_t *target, - const char *src); - -/** - * Returns string representation of inet address - * @param addr - * @return statically allocated string pointer (not thread safe) - */ -const char * rspamd_inet_address_to_string (rspamd_inet_addr_t *addr); - -/** - * Returns port number for the specified inet address in host byte order - * @param addr - * @return - */ -uint16_t rspamd_inet_address_get_port (rspamd_inet_addr_t *addr); - -/** - * Set port for inet address - */ -void rspamd_inet_address_set_port (rspamd_inet_addr_t *addr, uint16_t port); - -/** - * Connect to inet_addr address - * @param addr - * @param async perform operations asynchronously - * @return newly created and connected socket - */ -int rspamd_inet_address_connect (rspamd_inet_addr_t *addr, gint type, - gboolean async); - -/** * Encode string using base32 encoding * @param in input * @param inlen input length diff --git a/src/lua/lua_upstream.c b/src/lua/lua_upstream.c index e2d2aad3d..380397823 100644 --- a/src/lua/lua_upstream.c +++ b/src/lua/lua_upstream.c @@ -26,10 +26,6 @@ #include "upstream.h" #include "cfg_file.h" -/* Upstream timeouts */ -#define DEFAULT_UPSTREAM_ERROR_TIME 10 -#define DEFAULT_UPSTREAM_DEAD_TIME 300 -#define DEFAULT_UPSTREAM_MAXERRORS 10 /** * This module implements upstreams manipulation from lua @@ -56,97 +52,27 @@ static const struct luaL_reg upstream_list_f[] = { }; /* Upstream functions */ -LUA_FUNCTION_DEF (upstream, create); -LUA_FUNCTION_DEF (upstream, destroy); LUA_FUNCTION_DEF (upstream, ok); LUA_FUNCTION_DEF (upstream, fail); -LUA_FUNCTION_DEF (upstream, get_ip); -LUA_FUNCTION_DEF (upstream, get_port); -LUA_FUNCTION_DEF (upstream, get_ip_string); -LUA_FUNCTION_DEF (upstream, get_priority); +LUA_FUNCTION_DEF (upstream, get_addr); static const struct luaL_reg upstream_m[] = { LUA_INTERFACE_DEF (upstream, ok), LUA_INTERFACE_DEF (upstream, fail), - LUA_INTERFACE_DEF (upstream, get_ip), - LUA_INTERFACE_DEF (upstream, get_ip_string), - LUA_INTERFACE_DEF (upstream, get_port), - LUA_INTERFACE_DEF (upstream, get_priority), - LUA_INTERFACE_DEF (upstream, destroy), + LUA_INTERFACE_DEF (upstream, get_addr), {"__tostring", rspamd_lua_class_tostring}, {NULL, NULL} }; -static const struct luaL_reg upstream_f[] = { - LUA_INTERFACE_DEF (upstream, create), - {NULL, NULL} -}; /* Upstream class */ -struct lua_upstream { - struct upstream up; - gchar *def; - guint16 port; - gchar *addr; -}; -static struct lua_upstream * +static struct upstream * lua_check_upstream (lua_State * L) { void *ud = luaL_checkudata (L, 1, "rspamd{upstream}"); luaL_argcheck (L, ud != NULL, 1, "'upstream' expected"); - return ud ? *((struct lua_upstream **)ud) : NULL; -} - -/** - * Create new upstream from its string definition like 'ip[:port[:priority]]' or 'host[:port[:priority]]' - * @param L - * @return upstream structure - */ -static gint -lua_upstream_create (lua_State *L) -{ - struct lua_upstream *new, **pnew; - const gchar *def; - - def = luaL_checkstring (L, 1); - if (def) { - new = g_slice_alloc0 (sizeof (struct lua_upstream)); - new->def = g_strdup (def); - new->addr = g_malloc (INET6_ADDRSTRLEN); - if (!rspamd_parse_host_port_priority (NULL, new->def, &new->addr, - &new->port, &new->up.priority)) { - g_free (new->def); - g_slice_free1 (sizeof (struct lua_upstream), new); - lua_pushnil (L); - } - else { - pnew = lua_newuserdata (L, sizeof (struct lua_upstream *)); - rspamd_lua_setclass (L, "rspamd{upstream}", -1); - *pnew = new; - } - } - - return 1; -} - -/** - * Destroy a single upstream object - * @param L - * @return - */ -static gint -lua_upstream_destroy (lua_State *L) -{ - struct lua_upstream *up = lua_check_upstream (L); - - if (up) { - g_free (up->def); - g_free (up->addr); - g_slice_free1 (sizeof (struct lua_upstream), up); - } - - return 0; + return ud ? *((struct upstream **)ud) : NULL; } /** @@ -155,72 +81,12 @@ lua_upstream_destroy (lua_State *L) * @return */ static gint -lua_upstream_get_ip (lua_State *L) +lua_upstream_get_addr (lua_State *L) { - struct lua_upstream *up = lua_check_upstream (L); + struct upstream *up = lua_check_upstream (L); if (up) { - lua_pushstring (L, up->addr); - } - else { - lua_pushnil (L); - } - - return 1; -} - -/** - * Get ip of upstream in string form - * @param L - * @return - */ -static gint -lua_upstream_get_ip_string (lua_State *L) -{ - struct lua_upstream *up = lua_check_upstream (L); - - if (up) { - lua_pushstring (L, up->addr); - } - else { - lua_pushnil (L); - } - - return 1; -} - -/** - * Get port of upstream in numeric form - * @param L - * @return - */ -static gint -lua_upstream_get_port (lua_State *L) -{ - struct lua_upstream *up = lua_check_upstream (L); - - if (up) { - lua_pushinteger (L, up->port); - } - else { - lua_pushnil (L); - } - - return 1; -} - -/** - * Get port of upstream in numeric form - * @param L - * @return - */ -static gint -lua_upstream_get_priority (lua_State *L) -{ - struct lua_upstream *up = lua_check_upstream (L); - - if (up) { - lua_pushinteger (L, up->up.priority); + rspamd_lua_ip_push (L, rspamd_upstream_addr (up)); } else { lua_pushnil (L); @@ -237,17 +103,10 @@ lua_upstream_get_priority (lua_State *L) static gint lua_upstream_fail (lua_State *L) { - struct lua_upstream *up = lua_check_upstream (L); - time_t now; + struct upstream *up = lua_check_upstream (L); if (up) { - if (lua_gettop (L) >= 2) { - now = luaL_checkinteger (L, 2); - } - else { - now = time (NULL); - } - upstream_fail (&up->up, now); + rspamd_upstream_fail (up); } return 0; @@ -261,35 +120,24 @@ lua_upstream_fail (lua_State *L) static gint lua_upstream_ok (lua_State *L) { - struct lua_upstream *up = lua_check_upstream (L); - time_t now; + struct upstream *up = lua_check_upstream (L); if (up) { - if (lua_gettop (L) >= 2) { - now = luaL_checkinteger (L, 2); - } - else { - now = time (NULL); - } - upstream_ok (&up->up, now); + rspamd_upstream_ok (up); } return 0; } /* Upstream list class */ -struct lua_upstream_list { - struct lua_upstream *upstreams; - guint count; -}; -static struct lua_upstream_list * +static struct upstream_list * lua_check_upstream_list (lua_State * L) { void *ud = luaL_checkudata (L, 1, "rspamd{upstream_list}"); luaL_argcheck (L, ud != NULL, 1, "'upstream_list' expected"); - return ud ? *((struct lua_upstream_list **)ud) : NULL; + return ud ? *((struct upstream_list **)ud) : NULL; } /** @@ -300,40 +148,34 @@ lua_check_upstream_list (lua_State * L) static gint lua_upstream_list_create (lua_State *L) { - struct lua_upstream_list *new, **pnew; - struct lua_upstream *cur; + struct upstream_list *new = NULL, **pnew; const gchar *def; - char **tokens; - guint i, default_port = 0; + char **tokens, **t; + guint default_port = 0; def = luaL_checkstring (L, 1); if (def) { if (lua_gettop (L) >= 2) { default_port = luaL_checkinteger (L, 2); } - new = g_slice_alloc0 (sizeof (struct lua_upstream_list)); tokens = g_strsplit_set (def, ",;", 0); if (!tokens || !tokens[0]) { goto err; } - new->count = g_strv_length (tokens); - new->upstreams = - g_slice_alloc0 (new->count * sizeof (struct lua_upstream)); - - for (i = 0; i < new->count; i++) { - cur = &new->upstreams[i]; - cur->addr = g_malloc (INET6_ADDRSTRLEN); - if (!rspamd_parse_host_port_priority (NULL, tokens[i], &cur->addr, - &cur->port, &cur->up.priority)) { + t = tokens; + + new = rspamd_upstreams_create (); + while (*t != NULL) { + if (!rspamd_upstreams_add_upstream (new, *t, default_port, NULL)) { goto err; } - if (cur->port == 0) { - cur->port = default_port; - } + t ++; } pnew = lua_newuserdata (L, sizeof (struct upstream_list *)); rspamd_lua_setclass (L, "rspamd{upstream_list}", -1); + + g_strfreev (tokens); *pnew = new; } @@ -342,18 +184,11 @@ err: if (tokens) { g_strfreev (tokens); } - if (new->upstreams) { - for (i = 0; i < new->count; i++) { - cur = &new->upstreams[i]; - if (cur->addr) { - g_free (cur->addr); - } - } - g_slice_free1 (new->count * sizeof (struct lua_upstream), - new->upstreams); + if (new) { + rspamd_upstreams_destroy (new); } - g_slice_free1 (sizeof (struct lua_upstream_list), new); lua_pushnil (L); + return 1; } @@ -365,23 +200,9 @@ err: static gint lua_upstream_list_destroy (lua_State *L) { - struct lua_upstream_list *upl = lua_check_upstream_list (L); - struct lua_upstream *cur; - guint i; + struct upstream_list *upl = lua_check_upstream_list (L); - if (upl) { - if (upl->upstreams) { - for (i = 0; i < upl->count; i++) { - cur = &upl->upstreams[i]; - if (cur->addr) { - g_free (cur->addr); - } - } - g_slice_free1 (upl->count * sizeof (struct lua_upstream), - upl->upstreams); - } - g_slice_free1 (sizeof (struct lua_upstream_list), upl); - } + rspamd_upstreams_destroy (upl); return 0; } @@ -394,33 +215,19 @@ lua_upstream_list_destroy (lua_State *L) static gint lua_upstream_list_get_upstream_by_hash (lua_State *L) { - struct lua_upstream_list *upl; - struct lua_upstream *selected, **pselected; - time_t now; + struct upstream_list *upl; + struct upstream *selected, **pselected; const gchar *key; + gsize keyl; upl = lua_check_upstream_list (L); if (upl) { - key = luaL_checkstring (L, 2); + key = luaL_checklstring (L, 2, &keyl); if (key) { - if (lua_gettop (L) >= 3) { - now = luaL_checkinteger (L, 3); - } - else { - now = time (NULL); - } - selected = (struct lua_upstream *)get_upstream_by_hash ( - upl->upstreams, - upl->count, - sizeof (struct lua_upstream), - now, - DEFAULT_UPSTREAM_ERROR_TIME, - DEFAULT_UPSTREAM_DEAD_TIME, - DEFAULT_UPSTREAM_MAXERRORS, - key, - 0); + selected = rspamd_upstream_get (upl, RSPAMD_UPSTREAM_HASHED, key, + (guint)keyl); if (selected) { - pselected = lua_newuserdata (L, sizeof (struct lua_upstream *)); + pselected = lua_newuserdata (L, sizeof (struct upstream *)); rspamd_lua_setclass (L, "rspamd{upstream}", -1); *pselected = selected; } @@ -447,28 +254,15 @@ lua_upstream_list_get_upstream_by_hash (lua_State *L) static gint lua_upstream_list_get_upstream_round_robin (lua_State *L) { - struct lua_upstream_list *upl; - struct lua_upstream *selected, **pselected; - time_t now; + struct upstream_list *upl; + struct upstream *selected, **pselected; upl = lua_check_upstream_list (L); if (upl) { - if (lua_gettop (L) >= 2) { - now = luaL_checkinteger (L, 2); - } - else { - now = time (NULL); - } - selected = (struct lua_upstream *)get_upstream_round_robin ( - upl->upstreams, - upl->count, - sizeof (struct lua_upstream), - now, - DEFAULT_UPSTREAM_ERROR_TIME, - DEFAULT_UPSTREAM_DEAD_TIME, - DEFAULT_UPSTREAM_MAXERRORS); + + selected = rspamd_upstream_get (upl, RSPAMD_UPSTREAM_ROUND_ROBIN); if (selected) { - pselected = lua_newuserdata (L, sizeof (struct lua_upstream *)); + pselected = lua_newuserdata (L, sizeof (struct upstream *)); rspamd_lua_setclass (L, "rspamd{upstream}", -1); *pselected = selected; } @@ -491,28 +285,15 @@ lua_upstream_list_get_upstream_round_robin (lua_State *L) static gint lua_upstream_list_get_upstream_master_slave (lua_State *L) { - struct lua_upstream_list *upl; - struct lua_upstream *selected, **pselected; - time_t now; + struct upstream_list *upl; + struct upstream *selected, **pselected; upl = lua_check_upstream_list (L); if (upl) { - if (lua_gettop (L) >= 2) { - now = luaL_checkinteger (L, 2); - } - else { - now = time (NULL); - } - selected = (struct lua_upstream *)get_upstream_master_slave ( - upl->upstreams, - upl->count, - sizeof (struct lua_upstream), - now, - DEFAULT_UPSTREAM_ERROR_TIME, - DEFAULT_UPSTREAM_DEAD_TIME, - DEFAULT_UPSTREAM_MAXERRORS); + + selected = rspamd_upstream_get (upl, RSPAMD_UPSTREAM_MASTER_SLAVE); if (selected) { - pselected = lua_newuserdata (L, sizeof (struct lua_upstream *)); + pselected = lua_newuserdata (L, sizeof (struct upstream *)); rspamd_lua_setclass (L, "rspamd{upstream}", -1); *pselected = selected; } @@ -528,15 +309,6 @@ lua_upstream_list_get_upstream_master_slave (lua_State *L) } static gint -lua_load_upstream (lua_State * L) -{ - lua_newtable (L); - luaL_register (L, NULL, upstream_f); - - return 1; -} - -static gint lua_load_upstream_list (lua_State * L) { lua_newtable (L); @@ -572,7 +344,6 @@ luaopen_upstream (lua_State * L) lua_rawset (L, -3); luaL_register (L, NULL, upstream_m); - rspamd_lua_add_preload (L, "rspamd_upstream", lua_load_upstream); lua_pop (L, 1); /* remove metatable from stack */ } |